1
0
mirror of https://github.com/sasjs/core.git synced 2026-01-03 23:50:06 +00:00

Compare commits

...

4 Commits

13 changed files with 152 additions and 88 deletions

120
all.sas
View File

@@ -404,7 +404,7 @@ options noquotelenmax;
%local dsid rc; %local dsid rc;
%let dsid=%sysfunc(open(&libds,is)); %let dsid=%sysfunc(open(&libds,is));
%if &dsid = 0 %then %do; %if &dsid = 0 %then %do;
%put WARNING: Cannot open %trim(&libds), system message below; %put %str(WARN)ING: Cannot open %trim(&libds), system message below;
%put %sysfunc(sysmsg()); %put %sysfunc(sysmsg());
-1 -1
%end; %end;
@@ -510,8 +510,8 @@ options noquotelenmax;
@brief retrieves a key value pair from a control dataset @brief retrieves a key value pair from a control dataset
@details By default, control dataset is work.mp_setkeyvalue. Usage: @details By default, control dataset is work.mp_setkeyvalue. Usage:
%mp_setkeyvalue(someindex,22,type=N) %mp_setkeyvalue(someindex,22,type=N)
%put %mf_getkeyvalue(someindex) %put %mf_getkeyvalue(someindex)
@param key Provide a key on which to perform the lookup @param key Provide a key on which to perform the lookup
@@ -4548,8 +4548,8 @@ proc sql
@brief Logs a key value pair a control dataset @brief Logs a key value pair a control dataset
@details If the dataset does not exist, it is created. Usage: @details If the dataset does not exist, it is created. Usage:
%mp_setkeyvalue(someindex,22,type=N) %mp_setkeyvalue(someindex,22,type=N)
%mp_setkeyvalue(somenewindex,somevalue) %mp_setkeyvalue(somenewindex,somevalue)
<h4> SAS Macros </h4> <h4> SAS Macros </h4>
@li mf_existds.sas @li mf_existds.sas
@@ -4571,7 +4571,7 @@ proc sql
%if not (%mf_existds(&libds)) %then %do; %if not (%mf_existds(&libds)) %then %do;
data &libds (index=(key/unique)); data &libds (index=(key/unique));
length key $32 valc $256 valn 8 type $1; length key $64 valc $2048 valn 8 type $1;
call missing(of _all_); call missing(of _all_);
stop; stop;
run; run;
@@ -10659,7 +10659,6 @@ run;
@details Expects oauth token in a global macro variable (default @details Expects oauth token in a global macro variable (default
ACCESS_TOKEN). ACCESS_TOKEN).
options mprint;
%mv_createfolder(path=/Public) %mv_createfolder(path=/Public)
@@ -11509,7 +11508,7 @@ run;
%let oauth_bearer=oauth_bearer=sas_services; %let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=; %let &access_token_var=;
%end; %end;
%put &sysmacroname: grant_type=&grant_type;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password %mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services and &grant_type ne sas_services
) )
@@ -11764,7 +11763,6 @@ libname &libref1a clear;
@details If not running in Studo 5 +, will expect an oauth token in a global @details If not running in Studo 5 +, will expect an oauth token in a global
macro variable (default ACCESS_TOKEN). macro variable (default ACCESS_TOKEN).
options mprint;
%mv_createfolder(path=/Public/test/blah) %mv_createfolder(path=/Public/test/blah)
%mv_deleteviyafolder(path=/Public/test) %mv_deleteviyafolder(path=/Public/test)
@@ -12094,7 +12092,7 @@ libname &libref1 clear;
%let oauth_bearer=oauth_bearer=sas_services; %let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=; %let &access_token_var=;
%end; %end;
%put &sysmacroname: grant_type=&grant_type;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password %mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services and &grant_type ne sas_services
) )
@@ -12329,7 +12327,7 @@ filename &fname1 clear;
%let oauth_bearer=oauth_bearer=sas_services; %let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=; %let &access_token_var=;
%end; %end;
%put &sysmacroname: grant_type=&grant_type;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password %mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services and &grant_type ne sas_services
) )
@@ -12422,7 +12420,6 @@ libname &libref1 clear;
%let oauth_bearer=oauth_bearer=sas_services; %let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=; %let &access_token_var=;
%end; %end;
%put &sysmacroname: grant_type=&grant_type;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password %mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services and &grant_type ne sas_services
) )
@@ -12573,7 +12570,7 @@ filename &fname3 clear;
%mv_getjoblog(uri=&uri,outref=mylog) %mv_getjoblog(uri=&uri,outref=mylog)
This macro is used by the mv_jobwaitfor macro, which is generally a more This macro is used by the mv_jobwaitfor.sas macro, which is generally a more
convenient way to wait for the job to finish before fetching the log. convenient way to wait for the job to finish before fetching the log.
@@ -12617,7 +12614,7 @@ filename &fname3 clear;
%let oauth_bearer=oauth_bearer=sas_services; %let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=; %let &access_token_var=;
%end; %end;
%put &sysmacroname: grant_type=&grant_type;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password %mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services and &grant_type ne sas_services
) )
@@ -12769,7 +12766,7 @@ data _null_;
end; end;
input; input;
put _infile_; put _infile_;
%if &mdebug=0 %then %do; %if &mdebug=1 %then %do;
putlog _infile_; putlog _infile_;
%end; %end;
if last then do; if last then do;
@@ -12892,7 +12889,7 @@ run;
%let oauth_bearer=oauth_bearer=sas_services; %let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=; %let &access_token_var=;
%end; %end;
%put &sysmacroname: grant_type=&grant_type;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password %mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services and &grant_type ne sas_services
) )
@@ -13378,18 +13375,19 @@ libname &libref;
## Input table (minimum variables needed) ## Input table (minimum variables needed)
@li FLOW_ID - Numeric value, provides sequential ordering capability
@li _CONTEXTNAME - Dictates which context should be used to run the job. If
blank, will default to `SAS Job Execution compute context`.
@li _PROGRAM - Provides the path to the job itself @li _PROGRAM - Provides the path to the job itself
@li FLOW_ID - Numeric value, provides sequential ordering capability. Is
optional, will default to 0 if not provided.
@li _CONTEXTNAME - Dictates which context should be used to run the job. If
blank (or not provided), will default to `SAS Job Execution compute context`.
Any additional variables provided in this table are converted into macro Any additional variables provided in this table are converted into macro
variables and passed into the relevant job. variables and passed into the relevant job.
| FLOW_ID| _CONTEXTNAME |_PROGRAM| |_PROGRAM| FLOW_ID (optional)| _CONTEXTNAME (optional) |
|---|---|---| |---|---|---|
|0|SAS Job Execution compute context|/Public/jobs/somejob1| |/Public/jobs/somejob1|0|SAS Job Execution compute context|
|0|SAS Job Execution compute context|/Public/jobs/somejob2| |/Public/jobs/somejob2|0|SAS Job Execution compute context|
## Output table (minimum variables produced) ## Output table (minimum variables produced)
@@ -13450,7 +13448,16 @@ libname &libref;
Trigger the flow Trigger the flow
%mv_jobflow(inds=work.inputjobs,outds=work.results,maxconcurrency=4) %mv_jobflow(inds=work.inputjobs
,maxconcurrency=4
,outds=work.results
,outref=myjoblog
)
data _null_;
infile myjoblog;
input; put _infile_;
run;
@param [in] access_token_var= The global macro variable to contain the access token @param [in] access_token_var= The global macro variable to contain the access token
@@ -13462,7 +13469,9 @@ libname &libref;
@li sas_services - will use oauth_bearer=sas_services @li sas_services - will use oauth_bearer=sas_services
@param [in] inds= The input dataset containing a list of jobs and parameters @param [in] inds= The input dataset containing a list of jobs and parameters
@param [in] maxconcurrency= The max number of parallel jobs to run. Default=8. @param [in] maxconcurrency= The max number of parallel jobs to run. Default=8.
@param [in] mdebug= set to 1 to enable DEBUG messages
@param [out] outds= The output dataset containing the results @param [out] outds= The output dataset containing the results
@param [out] outref= The output fileref to which to append the log file(s).
@version VIYA V.03.05 @version VIYA V.03.05
@author Allan Bowe, source: https://github.com/sasjs/core @author Allan Bowe, source: https://github.com/sasjs/core
@@ -13482,6 +13491,8 @@ libname &libref;
,maxconcurrency=8 ,maxconcurrency=8
,access_token_var=ACCESS_TOKEN ,access_token_var=ACCESS_TOKEN
,grant_type=sas_services ,grant_type=sas_services
,outref=0
,mdebug=0
); );
%local oauth_bearer; %local oauth_bearer;
%if &grant_type=detect %then %do; %if &grant_type=detect %then %do;
@@ -13504,16 +13515,29 @@ libname &libref;
,mac=&sysmacroname ,mac=&sysmacroname
,msg=%str(Input dataset was not provided) ,msg=%str(Input dataset was not provided)
) )
%mp_abort(iftrue=(%mf_existVarList(&inds,_CONTEXTNAME FLOW_ID _PROGRAM)=0) %mp_abort(iftrue=(%mf_existVarList(&inds,_PROGRAM)=0)
,mac=&sysmacroname ,mac=&sysmacroname
,msg=%str(The following columns must exist on input dataset &inds: ,msg=%str(The _PROGRAM column must exist on input dataset &inds)
_CONTEXTNAME FLOW_ID _PROGRAM)
) )
%mp_abort(iftrue=(&maxconcurrency<1) %mp_abort(iftrue=(&maxconcurrency<1)
,mac=&sysmacroname ,mac=&sysmacroname
,msg=%str(The maxconcurrency variable should be a positive integer) ,msg=%str(The maxconcurrency variable should be a positive integer)
) )
/* set defaults if not provided */
%if %mf_existVarList(&inds,_CONTEXTNAME FLOW_ID)=0 %then %do;
data &inds;
%if %mf_existvarList(&inds,_CONTEXTNAME)=0 %then %do;
length _CONTEXTNAME $128;
retain _CONTEXTNAME "SAS Job Execution compute context";
%end;
%if %mf_existvarList(&inds,FLOW_ID)=0 %then %do;
retain FLOW_ID 0;
%end;
set &inds;
run;
%end;
%local missings; %local missings;
proc sql noprint; proc sql noprint;
select count(*) into: missings select count(*) into: missings
@@ -13591,8 +13615,8 @@ data;run;%let jdswaitfor=&syslast;
jparams='jparams'!!left(symget('jid')); jparams='jparams'!!left(symget('jid'));
call symputx(jparams,substr(_infile_,3,length(_infile_)-4)); call symputx(jparams,substr(_infile_,3,length(_infile_)-4));
run; run;
%local joburi&jid; %local jobuid&jid;
%let joburi&jid=0; /* used in next loop */ %let jobuid&jid=0; /* used in next loop */
%end; %end;
%local concurrency completed; %local concurrency completed;
%let concurrency=0; %let concurrency=0;
@@ -13603,8 +13627,21 @@ data;run;%let jdswaitfor=&syslast;
* now we can execute the jobs up to the maxconcurrency setting * now we can execute the jobs up to the maxconcurrency setting
*/ */
%if "&&job&jid" ne "0" %then %do; /* this var is zero if job finished */ %if "&&job&jid" ne "0" %then %do; /* this var is zero if job finished */
%if "&&joburi&jid"="0" and &concurrency<&maxconcurrency %then %do;
/* job has not been triggered and we have free slots */ /* check to see if the job finished in the previous round */
%if %sysfunc(exist(&outds))=1 %then %do;
%local jobcheck; %let jobcheck=0;
proc sql noprint;
select count(*) into: jobcheck
from &outds where uuid="&&jobuid&jid";
%if &jobcheck>0 %then %do;
%put &&job&jid in flow &fid with uid &&jobuid&jid completed!;
%let job&jid=0;
%end;
%end;
/* check if job was triggered and if so, if we have enough slots to run */
%if "&&jobuid&jid"="0" and &concurrency<&maxconcurrency %then %do;
%local jobname jobpath; %local jobname jobpath;
%let jobname=%scan(&&job&jid,-1,/); %let jobname=%scan(&&job&jid,-1,/);
%let jobpath=%substr(&&job&jid,1,%length(&&job&jid)-%length(&jobname)-1); %let jobpath=%substr(&&job&jid,1,%length(&&job&jid)-%length(&jobname)-1);
@@ -13618,27 +13655,18 @@ data;run;%let jdswaitfor=&syslast;
format jobparams $32767.; format jobparams $32767.;
set &jdsapp(where=(method='GET' and rel='state')); set &jdsapp(where=(method='GET' and rel='state'));
jobparams=symget("jparams&jid"); jobparams=symget("jparams&jid");
call symputx("joburi&jid",uri,'l'); /* uri here has the /state suffix */
uuid=scan(uri,-2,'/');
call symputx("jobuid&jid",uuid,'l');
run; run;
proc append base=&jdsrunning data=&jdsapp; proc append base=&jdsrunning data=&jdsapp;
run; run;
%let concurrency=%eval(&concurrency+1); %let concurrency=%eval(&concurrency+1);
%end; %end;
%else %if %sysfunc(exist(&outds))=1 %then %do;
/* check to see if the job has finished as was previously executed */
%local jobcheck; %let jobcheck=0;
proc sql noprint;
select count(*) into: jobcheck
from &outds where uri="&&joburi&jid";
%if &jobcheck>0 %then %do;
%put &&job&jid in flow &fid with uri &&joburi&jid completed!;
%let job&jid=0;
%end;
%end;
%end; %end;
%if &jid=&jcnt %then %do; %if &jid=&jcnt %then %do;
/* we are at the end of the loop - time to see which jobs have finished */ /* we are at the end of the loop - time to see which jobs have finished */
%mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor) %mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor,outref=&outref)
%local done; %local done;
%let done=%mf_nobs(&jdswaitfor); %let done=%mf_nobs(&jdswaitfor);
%if &done>0 %then %do; %if &done>0 %then %do;
@@ -13647,13 +13675,14 @@ data;run;%let jdswaitfor=&syslast;
data &jdsapp; data &jdsapp;
set &jdswaitfor; set &jdswaitfor;
flow_id=&&flow&fid; flow_id=&&flow&fid;
uuid=scan(uri,-1,'/');
run; run;
proc append base=&outds data=&jdsapp; proc append base=&outds data=&jdsapp;
run; run;
%end; %end;
proc sql; proc sql;
delete from &jdsrunning delete from &jdsrunning
where uri in (select uri from &outds where uuid in (select uuid from &outds
where state in ('canceled','completed','failed') where state in ('canceled','completed','failed')
); );
@@ -13667,6 +13696,9 @@ data;run;%let jdswaitfor=&syslast;
/* back up and execute the next flow */ /* back up and execute the next flow */
%end; %end;
%if &mdebug=1 %then %do;
%put _local_;
%end;
%mend; %mend;
/** /**

View File

@@ -23,7 +23,7 @@
%local dsid rc; %local dsid rc;
%let dsid=%sysfunc(open(&libds,is)); %let dsid=%sysfunc(open(&libds,is));
%if &dsid = 0 %then %do; %if &dsid = 0 %then %do;
%put WARNING: Cannot open %trim(&libds), system message below; %put %str(WARN)ING: Cannot open %trim(&libds), system message below;
%put %sysfunc(sysmsg()); %put %sysfunc(sysmsg());
-1 -1
%end; %end;

View File

@@ -3,8 +3,8 @@
@brief retrieves a key value pair from a control dataset @brief retrieves a key value pair from a control dataset
@details By default, control dataset is work.mp_setkeyvalue. Usage: @details By default, control dataset is work.mp_setkeyvalue. Usage:
%mp_setkeyvalue(someindex,22,type=N) %mp_setkeyvalue(someindex,22,type=N)
%put %mf_getkeyvalue(someindex) %put %mf_getkeyvalue(someindex)
@param key Provide a key on which to perform the lookup @param key Provide a key on which to perform the lookup

View File

@@ -3,8 +3,8 @@
@brief Logs a key value pair a control dataset @brief Logs a key value pair a control dataset
@details If the dataset does not exist, it is created. Usage: @details If the dataset does not exist, it is created. Usage:
%mp_setkeyvalue(someindex,22,type=N) %mp_setkeyvalue(someindex,22,type=N)
%mp_setkeyvalue(somenewindex,somevalue) %mp_setkeyvalue(somenewindex,somevalue)
<h4> SAS Macros </h4> <h4> SAS Macros </h4>
@li mf_existds.sas @li mf_existds.sas
@@ -26,7 +26,7 @@
%if not (%mf_existds(&libds)) %then %do; %if not (%mf_existds(&libds)) %then %do;
data &libds (index=(key/unique)); data &libds (index=(key/unique));
length key $32 valc $256 valn 8 type $1; length key $64 valc $2048 valn 8 type $1;
call missing(of _all_); call missing(of _all_);
stop; stop;
run; run;

View File

@@ -4,7 +4,6 @@
@details Expects oauth token in a global macro variable (default @details Expects oauth token in a global macro variable (default
ACCESS_TOKEN). ACCESS_TOKEN).
options mprint;
%mv_createfolder(path=/Public) %mv_createfolder(path=/Public)

View File

@@ -46,7 +46,7 @@
%let oauth_bearer=oauth_bearer=sas_services; %let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=; %let &access_token_var=;
%end; %end;
%put &sysmacroname: grant_type=&grant_type;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password %mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services and &grant_type ne sas_services
) )

View File

@@ -4,7 +4,6 @@
@details If not running in Studo 5 +, will expect an oauth token in a global @details If not running in Studo 5 +, will expect an oauth token in a global
macro variable (default ACCESS_TOKEN). macro variable (default ACCESS_TOKEN).
options mprint;
%mv_createfolder(path=/Public/test/blah) %mv_createfolder(path=/Public/test/blah)
%mv_deleteviyafolder(path=/Public/test) %mv_deleteviyafolder(path=/Public/test)

View File

@@ -40,7 +40,7 @@
%let oauth_bearer=oauth_bearer=sas_services; %let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=; %let &access_token_var=;
%end; %end;
%put &sysmacroname: grant_type=&grant_type;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password %mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services and &grant_type ne sas_services
) )

View File

@@ -52,7 +52,7 @@
%let oauth_bearer=oauth_bearer=sas_services; %let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=; %let &access_token_var=;
%end; %end;
%put &sysmacroname: grant_type=&grant_type;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password %mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services and &grant_type ne sas_services
) )

View File

@@ -49,7 +49,6 @@
%let oauth_bearer=oauth_bearer=sas_services; %let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=; %let &access_token_var=;
%end; %end;
%put &sysmacroname: grant_type=&grant_type;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password %mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services and &grant_type ne sas_services
) )

View File

@@ -49,7 +49,7 @@
%mv_getjoblog(uri=&uri,outref=mylog) %mv_getjoblog(uri=&uri,outref=mylog)
This macro is used by the mv_jobwaitfor macro, which is generally a more This macro is used by the mv_jobwaitfor.sas macro, which is generally a more
convenient way to wait for the job to finish before fetching the log. convenient way to wait for the job to finish before fetching the log.
@@ -93,7 +93,7 @@
%let oauth_bearer=oauth_bearer=sas_services; %let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=; %let &access_token_var=;
%end; %end;
%put &sysmacroname: grant_type=&grant_type;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password %mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services and &grant_type ne sas_services
) )
@@ -245,7 +245,7 @@ data _null_;
end; end;
input; input;
put _infile_; put _infile_;
%if &mdebug=0 %then %do; %if &mdebug=1 %then %do;
putlog _infile_; putlog _infile_;
%end; %end;
if last then do; if last then do;

View File

@@ -101,7 +101,7 @@
%let oauth_bearer=oauth_bearer=sas_services; %let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=; %let &access_token_var=;
%end; %end;
%put &sysmacroname: grant_type=&grant_type;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password %mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services and &grant_type ne sas_services
) )

View File

@@ -9,18 +9,19 @@
## Input table (minimum variables needed) ## Input table (minimum variables needed)
@li FLOW_ID - Numeric value, provides sequential ordering capability
@li _CONTEXTNAME - Dictates which context should be used to run the job. If
blank, will default to `SAS Job Execution compute context`.
@li _PROGRAM - Provides the path to the job itself @li _PROGRAM - Provides the path to the job itself
@li FLOW_ID - Numeric value, provides sequential ordering capability. Is
optional, will default to 0 if not provided.
@li _CONTEXTNAME - Dictates which context should be used to run the job. If
blank (or not provided), will default to `SAS Job Execution compute context`.
Any additional variables provided in this table are converted into macro Any additional variables provided in this table are converted into macro
variables and passed into the relevant job. variables and passed into the relevant job.
| FLOW_ID| _CONTEXTNAME |_PROGRAM| |_PROGRAM| FLOW_ID (optional)| _CONTEXTNAME (optional) |
|---|---|---| |---|---|---|
|0|SAS Job Execution compute context|/Public/jobs/somejob1| |/Public/jobs/somejob1|0|SAS Job Execution compute context|
|0|SAS Job Execution compute context|/Public/jobs/somejob2| |/Public/jobs/somejob2|0|SAS Job Execution compute context|
## Output table (minimum variables produced) ## Output table (minimum variables produced)
@@ -81,7 +82,16 @@
Trigger the flow Trigger the flow
%mv_jobflow(inds=work.inputjobs,outds=work.results,maxconcurrency=4) %mv_jobflow(inds=work.inputjobs
,maxconcurrency=4
,outds=work.results
,outref=myjoblog
)
data _null_;
infile myjoblog;
input; put _infile_;
run;
@param [in] access_token_var= The global macro variable to contain the access token @param [in] access_token_var= The global macro variable to contain the access token
@@ -93,7 +103,9 @@
@li sas_services - will use oauth_bearer=sas_services @li sas_services - will use oauth_bearer=sas_services
@param [in] inds= The input dataset containing a list of jobs and parameters @param [in] inds= The input dataset containing a list of jobs and parameters
@param [in] maxconcurrency= The max number of parallel jobs to run. Default=8. @param [in] maxconcurrency= The max number of parallel jobs to run. Default=8.
@param [in] mdebug= set to 1 to enable DEBUG messages
@param [out] outds= The output dataset containing the results @param [out] outds= The output dataset containing the results
@param [out] outref= The output fileref to which to append the log file(s).
@version VIYA V.03.05 @version VIYA V.03.05
@author Allan Bowe, source: https://github.com/sasjs/core @author Allan Bowe, source: https://github.com/sasjs/core
@@ -113,6 +125,8 @@
,maxconcurrency=8 ,maxconcurrency=8
,access_token_var=ACCESS_TOKEN ,access_token_var=ACCESS_TOKEN
,grant_type=sas_services ,grant_type=sas_services
,outref=0
,mdebug=0
); );
%local oauth_bearer; %local oauth_bearer;
%if &grant_type=detect %then %do; %if &grant_type=detect %then %do;
@@ -135,16 +149,29 @@
,mac=&sysmacroname ,mac=&sysmacroname
,msg=%str(Input dataset was not provided) ,msg=%str(Input dataset was not provided)
) )
%mp_abort(iftrue=(%mf_existVarList(&inds,_CONTEXTNAME FLOW_ID _PROGRAM)=0) %mp_abort(iftrue=(%mf_existVarList(&inds,_PROGRAM)=0)
,mac=&sysmacroname ,mac=&sysmacroname
,msg=%str(The following columns must exist on input dataset &inds: ,msg=%str(The _PROGRAM column must exist on input dataset &inds)
_CONTEXTNAME FLOW_ID _PROGRAM)
) )
%mp_abort(iftrue=(&maxconcurrency<1) %mp_abort(iftrue=(&maxconcurrency<1)
,mac=&sysmacroname ,mac=&sysmacroname
,msg=%str(The maxconcurrency variable should be a positive integer) ,msg=%str(The maxconcurrency variable should be a positive integer)
) )
/* set defaults if not provided */
%if %mf_existVarList(&inds,_CONTEXTNAME FLOW_ID)=0 %then %do;
data &inds;
%if %mf_existvarList(&inds,_CONTEXTNAME)=0 %then %do;
length _CONTEXTNAME $128;
retain _CONTEXTNAME "SAS Job Execution compute context";
%end;
%if %mf_existvarList(&inds,FLOW_ID)=0 %then %do;
retain FLOW_ID 0;
%end;
set &inds;
run;
%end;
%local missings; %local missings;
proc sql noprint; proc sql noprint;
select count(*) into: missings select count(*) into: missings
@@ -222,8 +249,8 @@ data;run;%let jdswaitfor=&syslast;
jparams='jparams'!!left(symget('jid')); jparams='jparams'!!left(symget('jid'));
call symputx(jparams,substr(_infile_,3,length(_infile_)-4)); call symputx(jparams,substr(_infile_,3,length(_infile_)-4));
run; run;
%local joburi&jid; %local jobuid&jid;
%let joburi&jid=0; /* used in next loop */ %let jobuid&jid=0; /* used in next loop */
%end; %end;
%local concurrency completed; %local concurrency completed;
%let concurrency=0; %let concurrency=0;
@@ -234,8 +261,21 @@ data;run;%let jdswaitfor=&syslast;
* now we can execute the jobs up to the maxconcurrency setting * now we can execute the jobs up to the maxconcurrency setting
*/ */
%if "&&job&jid" ne "0" %then %do; /* this var is zero if job finished */ %if "&&job&jid" ne "0" %then %do; /* this var is zero if job finished */
%if "&&joburi&jid"="0" and &concurrency<&maxconcurrency %then %do;
/* job has not been triggered and we have free slots */ /* check to see if the job finished in the previous round */
%if %sysfunc(exist(&outds))=1 %then %do;
%local jobcheck; %let jobcheck=0;
proc sql noprint;
select count(*) into: jobcheck
from &outds where uuid="&&jobuid&jid";
%if &jobcheck>0 %then %do;
%put &&job&jid in flow &fid with uid &&jobuid&jid completed!;
%let job&jid=0;
%end;
%end;
/* check if job was triggered and if so, if we have enough slots to run */
%if "&&jobuid&jid"="0" and &concurrency<&maxconcurrency %then %do;
%local jobname jobpath; %local jobname jobpath;
%let jobname=%scan(&&job&jid,-1,/); %let jobname=%scan(&&job&jid,-1,/);
%let jobpath=%substr(&&job&jid,1,%length(&&job&jid)-%length(&jobname)-1); %let jobpath=%substr(&&job&jid,1,%length(&&job&jid)-%length(&jobname)-1);
@@ -249,27 +289,18 @@ data;run;%let jdswaitfor=&syslast;
format jobparams $32767.; format jobparams $32767.;
set &jdsapp(where=(method='GET' and rel='state')); set &jdsapp(where=(method='GET' and rel='state'));
jobparams=symget("jparams&jid"); jobparams=symget("jparams&jid");
call symputx("joburi&jid",uri,'l'); /* uri here has the /state suffix */
uuid=scan(uri,-2,'/');
call symputx("jobuid&jid",uuid,'l');
run; run;
proc append base=&jdsrunning data=&jdsapp; proc append base=&jdsrunning data=&jdsapp;
run; run;
%let concurrency=%eval(&concurrency+1); %let concurrency=%eval(&concurrency+1);
%end; %end;
%else %if %sysfunc(exist(&outds))=1 %then %do;
/* check to see if the job has finished as was previously executed */
%local jobcheck; %let jobcheck=0;
proc sql noprint;
select count(*) into: jobcheck
from &outds where uri="&&joburi&jid";
%if &jobcheck>0 %then %do;
%put &&job&jid in flow &fid with uri &&joburi&jid completed!;
%let job&jid=0;
%end;
%end;
%end; %end;
%if &jid=&jcnt %then %do; %if &jid=&jcnt %then %do;
/* we are at the end of the loop - time to see which jobs have finished */ /* we are at the end of the loop - time to see which jobs have finished */
%mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor) %mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor,outref=&outref)
%local done; %local done;
%let done=%mf_nobs(&jdswaitfor); %let done=%mf_nobs(&jdswaitfor);
%if &done>0 %then %do; %if &done>0 %then %do;
@@ -278,13 +309,14 @@ data;run;%let jdswaitfor=&syslast;
data &jdsapp; data &jdsapp;
set &jdswaitfor; set &jdswaitfor;
flow_id=&&flow&fid; flow_id=&&flow&fid;
uuid=scan(uri,-1,'/');
run; run;
proc append base=&outds data=&jdsapp; proc append base=&outds data=&jdsapp;
run; run;
%end; %end;
proc sql; proc sql;
delete from &jdsrunning delete from &jdsrunning
where uri in (select uri from &outds where uuid in (select uuid from &outds
where state in ('canceled','completed','failed') where state in ('canceled','completed','failed')
); );
@@ -298,5 +330,8 @@ data;run;%let jdswaitfor=&syslast;
/* back up and execute the next flow */ /* back up and execute the next flow */
%end; %end;
%if &mdebug=1 %then %do;
%put _local_;
%end;
%mend; %mend;