From b5c86e7031b798c99542363eeab32cf5142a4b7f Mon Sep 17 00:00:00 2001 From: Allan Bowe Date: Sun, 31 Jan 2021 17:58:13 +0100 Subject: [PATCH] fix: mv_jobflow param mixup, not all jobs were running (fixed now). Also fixed doc formatting, removed unnecessary logging, and fixed a debug switch. --- all.sas | 69 +++++++++++++++++++--------------- base/mf_getkeyvalue.sas | 4 +- base/mp_setkeyvalue.sas | 6 +-- viya/mv_createfolder.sas | 1 - viya/mv_deletefoldermember.sas | 2 +- viya/mv_deleteviyafolder.sas | 1 - viya/mv_getfoldermembers.sas | 2 +- viya/mv_getgroups.sas | 2 +- viya/mv_getjobcode.sas | 1 - viya/mv_getjoblog.sas | 4 +- viya/mv_getjobstate.sas | 2 +- viya/mv_jobflow.sas | 44 +++++++++++++--------- 12 files changed, 76 insertions(+), 62 deletions(-) diff --git a/all.sas b/all.sas index b4aabc3..d0a2f32 100644 --- a/all.sas +++ b/all.sas @@ -510,8 +510,8 @@ options noquotelenmax; @brief retrieves a key value pair from a control dataset @details By default, control dataset is work.mp_setkeyvalue. Usage: - %mp_setkeyvalue(someindex,22,type=N) - %put %mf_getkeyvalue(someindex) + %mp_setkeyvalue(someindex,22,type=N) + %put %mf_getkeyvalue(someindex) @param key Provide a key on which to perform the lookup @@ -4548,8 +4548,8 @@ proc sql @brief Logs a key value pair a control dataset @details If the dataset does not exist, it is created. Usage: - %mp_setkeyvalue(someindex,22,type=N) - %mp_setkeyvalue(somenewindex,somevalue) + %mp_setkeyvalue(someindex,22,type=N) + %mp_setkeyvalue(somenewindex,somevalue)

SAS Macros

@li mf_existds.sas @@ -4571,7 +4571,7 @@ proc sql %if not (%mf_existds(&libds)) %then %do; data &libds (index=(key/unique)); - length key $32 valc $256 valn 8 type $1; + length key $64 valc $2048 valn 8 type $1; call missing(of _all_); stop; run; @@ -10659,7 +10659,6 @@ run; @details Expects oauth token in a global macro variable (default ACCESS_TOKEN). - options mprint; %mv_createfolder(path=/Public) @@ -11509,7 +11508,7 @@ run; %let oauth_bearer=oauth_bearer=sas_services; %let &access_token_var=; %end; -%put &sysmacroname: grant_type=&grant_type; + %mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password and &grant_type ne sas_services ) @@ -11764,7 +11763,6 @@ libname &libref1a clear; @details If not running in Studo 5 +, will expect an oauth token in a global macro variable (default ACCESS_TOKEN). - options mprint; %mv_createfolder(path=/Public/test/blah) %mv_deleteviyafolder(path=/Public/test) @@ -12094,7 +12092,7 @@ libname &libref1 clear; %let oauth_bearer=oauth_bearer=sas_services; %let &access_token_var=; %end; -%put &sysmacroname: grant_type=&grant_type; + %mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password and &grant_type ne sas_services ) @@ -12329,7 +12327,7 @@ filename &fname1 clear; %let oauth_bearer=oauth_bearer=sas_services; %let &access_token_var=; %end; -%put &sysmacroname: grant_type=&grant_type; + %mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password and &grant_type ne sas_services ) @@ -12422,7 +12420,6 @@ libname &libref1 clear; %let oauth_bearer=oauth_bearer=sas_services; %let &access_token_var=; %end; -%put &sysmacroname: grant_type=&grant_type; %mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password and &grant_type ne sas_services ) @@ -12617,7 +12614,7 @@ filename &fname3 clear; %let oauth_bearer=oauth_bearer=sas_services; %let &access_token_var=; %end; -%put &sysmacroname: grant_type=&grant_type; + %mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password and &grant_type ne sas_services ) @@ -12769,7 +12766,7 @@ data _null_; end; input; put _infile_; - %if &mdebug=0 %then %do; + %if &mdebug=1 %then %do; putlog _infile_; %end; if last then do; @@ -12892,7 +12889,7 @@ run; %let oauth_bearer=oauth_bearer=sas_services; %let &access_token_var=; %end; -%put &sysmacroname: grant_type=&grant_type; + %mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password and &grant_type ne sas_services ) @@ -13472,6 +13469,7 @@ libname &libref; @li sas_services - will use oauth_bearer=sas_services @param [in] inds= The input dataset containing a list of jobs and parameters @param [in] maxconcurrency= The max number of parallel jobs to run. Default=8. + @param [in] mdebug= set to 1 to enable DEBUG messages @param [out] outds= The output dataset containing the results @param [out] outref= The output fileref to which to append the log file(s). @@ -13494,6 +13492,7 @@ libname &libref; ,access_token_var=ACCESS_TOKEN ,grant_type=sas_services ,outref=0 + ,mdebug=0 ); %local oauth_bearer; %if &grant_type=detect %then %do; @@ -13616,8 +13615,8 @@ data;run;%let jdswaitfor=&syslast; jparams='jparams'!!left(symget('jid')); call symputx(jparams,substr(_infile_,3,length(_infile_)-4)); run; - %local joburi&jid; - %let joburi&jid=0; /* used in next loop */ + %local jobuid&jid; + %let jobuid&jid=0; /* used in next loop */ %end; %local concurrency completed; %let concurrency=0; @@ -13628,8 +13627,21 @@ data;run;%let jdswaitfor=&syslast; * now we can execute the jobs up to the maxconcurrency setting */ %if "&&job&jid" ne "0" %then %do; /* this var is zero if job finished */ - %if "&&joburi&jid"="0" and &concurrency<&maxconcurrency %then %do; - /* job has not been triggered and we have free slots */ + + /* check to see if the job finished in the previous round */ + %if %sysfunc(exist(&outds))=1 %then %do; + %local jobcheck; %let jobcheck=0; + proc sql noprint; + select count(*) into: jobcheck + from &outds where uuid="&&jobuid&jid"; + %if &jobcheck>0 %then %do; + %put &&job&jid in flow &fid with uid &&jobuid&jid completed!; + %let job&jid=0; + %end; + %end; + + /* check if job was triggered and if so, if we have enough slots to run */ + %if "&&jobuid&jid"="0" and &concurrency<&maxconcurrency %then %do; %local jobname jobpath; %let jobname=%scan(&&job&jid,-1,/); %let jobpath=%substr(&&job&jid,1,%length(&&job&jid)-%length(&jobname)-1); @@ -13643,23 +13655,14 @@ data;run;%let jdswaitfor=&syslast; format jobparams $32767.; set &jdsapp(where=(method='GET' and rel='state')); jobparams=symget("jparams&jid"); - call symputx("joburi&jid",uri,'l'); + /* uri here has the /state suffix */ + uuid=scan(uri,-2,'/'); + call symputx("jobuid&jid",uuid,'l'); run; proc append base=&jdsrunning data=&jdsapp; run; %let concurrency=%eval(&concurrency+1); %end; - %else %if %sysfunc(exist(&outds))=1 %then %do; - /* check to see if the job has finished as was previously executed */ - %local jobcheck; %let jobcheck=0; - proc sql noprint; - select count(*) into: jobcheck - from &outds where uri="&&joburi&jid"; - %if &jobcheck>0 %then %do; - %put &&job&jid in flow &fid with uri &&joburi&jid completed!; - %let job&jid=0; - %end; - %end; %end; %if &jid=&jcnt %then %do; /* we are at the end of the loop - time to see which jobs have finished */ @@ -13672,13 +13675,14 @@ data;run;%let jdswaitfor=&syslast; data &jdsapp; set &jdswaitfor; flow_id=&&flow&fid; + uuid=scan(uri,-1,'/'); run; proc append base=&outds data=&jdsapp; run; %end; proc sql; delete from &jdsrunning - where uri in (select uri from &outds + where uuid in (select uuid from &outds where state in ('canceled','completed','failed') ); @@ -13692,6 +13696,9 @@ data;run;%let jdswaitfor=&syslast; /* back up and execute the next flow */ %end; +%if &mdebug=1 %then %do; + %put _local_; +%end; %mend; /** diff --git a/base/mf_getkeyvalue.sas b/base/mf_getkeyvalue.sas index 465e7c9..b3e9136 100644 --- a/base/mf_getkeyvalue.sas +++ b/base/mf_getkeyvalue.sas @@ -3,8 +3,8 @@ @brief retrieves a key value pair from a control dataset @details By default, control dataset is work.mp_setkeyvalue. Usage: - %mp_setkeyvalue(someindex,22,type=N) - %put %mf_getkeyvalue(someindex) + %mp_setkeyvalue(someindex,22,type=N) + %put %mf_getkeyvalue(someindex) @param key Provide a key on which to perform the lookup diff --git a/base/mp_setkeyvalue.sas b/base/mp_setkeyvalue.sas index a9e75a0..b01a5e3 100644 --- a/base/mp_setkeyvalue.sas +++ b/base/mp_setkeyvalue.sas @@ -3,8 +3,8 @@ @brief Logs a key value pair a control dataset @details If the dataset does not exist, it is created. Usage: - %mp_setkeyvalue(someindex,22,type=N) - %mp_setkeyvalue(somenewindex,somevalue) + %mp_setkeyvalue(someindex,22,type=N) + %mp_setkeyvalue(somenewindex,somevalue)

SAS Macros

@li mf_existds.sas @@ -26,7 +26,7 @@ %if not (%mf_existds(&libds)) %then %do; data &libds (index=(key/unique)); - length key $32 valc $256 valn 8 type $1; + length key $64 valc $2048 valn 8 type $1; call missing(of _all_); stop; run; diff --git a/viya/mv_createfolder.sas b/viya/mv_createfolder.sas index 6f9cb68..582ac66 100644 --- a/viya/mv_createfolder.sas +++ b/viya/mv_createfolder.sas @@ -4,7 +4,6 @@ @details Expects oauth token in a global macro variable (default ACCESS_TOKEN). - options mprint; %mv_createfolder(path=/Public) diff --git a/viya/mv_deletefoldermember.sas b/viya/mv_deletefoldermember.sas index e3c6e5f..aa2bf73 100644 --- a/viya/mv_deletefoldermember.sas +++ b/viya/mv_deletefoldermember.sas @@ -46,7 +46,7 @@ %let oauth_bearer=oauth_bearer=sas_services; %let &access_token_var=; %end; -%put &sysmacroname: grant_type=&grant_type; + %mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password and &grant_type ne sas_services ) diff --git a/viya/mv_deleteviyafolder.sas b/viya/mv_deleteviyafolder.sas index 3eae200..2b5e640 100644 --- a/viya/mv_deleteviyafolder.sas +++ b/viya/mv_deleteviyafolder.sas @@ -4,7 +4,6 @@ @details If not running in Studo 5 +, will expect an oauth token in a global macro variable (default ACCESS_TOKEN). - options mprint; %mv_createfolder(path=/Public/test/blah) %mv_deleteviyafolder(path=/Public/test) diff --git a/viya/mv_getfoldermembers.sas b/viya/mv_getfoldermembers.sas index 4659bc8..0d492d3 100644 --- a/viya/mv_getfoldermembers.sas +++ b/viya/mv_getfoldermembers.sas @@ -40,7 +40,7 @@ %let oauth_bearer=oauth_bearer=sas_services; %let &access_token_var=; %end; -%put &sysmacroname: grant_type=&grant_type; + %mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password and &grant_type ne sas_services ) diff --git a/viya/mv_getgroups.sas b/viya/mv_getgroups.sas index 5611ef1..b292d40 100644 --- a/viya/mv_getgroups.sas +++ b/viya/mv_getgroups.sas @@ -52,7 +52,7 @@ %let oauth_bearer=oauth_bearer=sas_services; %let &access_token_var=; %end; -%put &sysmacroname: grant_type=&grant_type; + %mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password and &grant_type ne sas_services ) diff --git a/viya/mv_getjobcode.sas b/viya/mv_getjobcode.sas index 996168e..e01a339 100644 --- a/viya/mv_getjobcode.sas +++ b/viya/mv_getjobcode.sas @@ -49,7 +49,6 @@ %let oauth_bearer=oauth_bearer=sas_services; %let &access_token_var=; %end; -%put &sysmacroname: grant_type=&grant_type; %mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password and &grant_type ne sas_services ) diff --git a/viya/mv_getjoblog.sas b/viya/mv_getjoblog.sas index de641a5..860928e 100644 --- a/viya/mv_getjoblog.sas +++ b/viya/mv_getjoblog.sas @@ -93,7 +93,7 @@ %let oauth_bearer=oauth_bearer=sas_services; %let &access_token_var=; %end; -%put &sysmacroname: grant_type=&grant_type; + %mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password and &grant_type ne sas_services ) @@ -245,7 +245,7 @@ data _null_; end; input; put _infile_; - %if &mdebug=0 %then %do; + %if &mdebug=1 %then %do; putlog _infile_; %end; if last then do; diff --git a/viya/mv_getjobstate.sas b/viya/mv_getjobstate.sas index dc95415..77ccb35 100644 --- a/viya/mv_getjobstate.sas +++ b/viya/mv_getjobstate.sas @@ -101,7 +101,7 @@ %let oauth_bearer=oauth_bearer=sas_services; %let &access_token_var=; %end; -%put &sysmacroname: grant_type=&grant_type; + %mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password and &grant_type ne sas_services ) diff --git a/viya/mv_jobflow.sas b/viya/mv_jobflow.sas index 73c3f96..f49d99d 100644 --- a/viya/mv_jobflow.sas +++ b/viya/mv_jobflow.sas @@ -103,6 +103,7 @@ @li sas_services - will use oauth_bearer=sas_services @param [in] inds= The input dataset containing a list of jobs and parameters @param [in] maxconcurrency= The max number of parallel jobs to run. Default=8. + @param [in] mdebug= set to 1 to enable DEBUG messages @param [out] outds= The output dataset containing the results @param [out] outref= The output fileref to which to append the log file(s). @@ -125,6 +126,7 @@ ,access_token_var=ACCESS_TOKEN ,grant_type=sas_services ,outref=0 + ,mdebug=0 ); %local oauth_bearer; %if &grant_type=detect %then %do; @@ -247,8 +249,8 @@ data;run;%let jdswaitfor=&syslast; jparams='jparams'!!left(symget('jid')); call symputx(jparams,substr(_infile_,3,length(_infile_)-4)); run; - %local joburi&jid; - %let joburi&jid=0; /* used in next loop */ + %local jobuid&jid; + %let jobuid&jid=0; /* used in next loop */ %end; %local concurrency completed; %let concurrency=0; @@ -259,8 +261,21 @@ data;run;%let jdswaitfor=&syslast; * now we can execute the jobs up to the maxconcurrency setting */ %if "&&job&jid" ne "0" %then %do; /* this var is zero if job finished */ - %if "&&joburi&jid"="0" and &concurrency<&maxconcurrency %then %do; - /* job has not been triggered and we have free slots */ + + /* check to see if the job finished in the previous round */ + %if %sysfunc(exist(&outds))=1 %then %do; + %local jobcheck; %let jobcheck=0; + proc sql noprint; + select count(*) into: jobcheck + from &outds where uuid="&&jobuid&jid"; + %if &jobcheck>0 %then %do; + %put &&job&jid in flow &fid with uid &&jobuid&jid completed!; + %let job&jid=0; + %end; + %end; + + /* check if job was triggered and if so, if we have enough slots to run */ + %if "&&jobuid&jid"="0" and &concurrency<&maxconcurrency %then %do; %local jobname jobpath; %let jobname=%scan(&&job&jid,-1,/); %let jobpath=%substr(&&job&jid,1,%length(&&job&jid)-%length(&jobname)-1); @@ -274,23 +289,14 @@ data;run;%let jdswaitfor=&syslast; format jobparams $32767.; set &jdsapp(where=(method='GET' and rel='state')); jobparams=symget("jparams&jid"); - call symputx("joburi&jid",uri,'l'); + /* uri here has the /state suffix */ + uuid=scan(uri,-2,'/'); + call symputx("jobuid&jid",uuid,'l'); run; proc append base=&jdsrunning data=&jdsapp; run; %let concurrency=%eval(&concurrency+1); %end; - %else %if %sysfunc(exist(&outds))=1 %then %do; - /* check to see if the job has finished as was previously executed */ - %local jobcheck; %let jobcheck=0; - proc sql noprint; - select count(*) into: jobcheck - from &outds where uri="&&joburi&jid"; - %if &jobcheck>0 %then %do; - %put &&job&jid in flow &fid with uri &&joburi&jid completed!; - %let job&jid=0; - %end; - %end; %end; %if &jid=&jcnt %then %do; /* we are at the end of the loop - time to see which jobs have finished */ @@ -303,13 +309,14 @@ data;run;%let jdswaitfor=&syslast; data &jdsapp; set &jdswaitfor; flow_id=&&flow&fid; + uuid=scan(uri,-1,'/'); run; proc append base=&outds data=&jdsapp; run; %end; proc sql; delete from &jdsrunning - where uri in (select uri from &outds + where uuid in (select uuid from &outds where state in ('canceled','completed','failed') ); @@ -323,5 +330,8 @@ data;run;%let jdswaitfor=&syslast; /* back up and execute the next flow */ %end; +%if &mdebug=1 %then %do; + %put _local_; +%end; %mend;