mirror of
https://github.com/sasjs/core.git
synced 2026-01-03 23:50:06 +00:00
Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 4a6c8ffbb3 | |||
| b5c86e7031 | |||
| 9783edd0e3 | |||
| 961728a987 |
120
all.sas
120
all.sas
@@ -404,7 +404,7 @@ options noquotelenmax;
|
|||||||
%local dsid rc;
|
%local dsid rc;
|
||||||
%let dsid=%sysfunc(open(&libds,is));
|
%let dsid=%sysfunc(open(&libds,is));
|
||||||
%if &dsid = 0 %then %do;
|
%if &dsid = 0 %then %do;
|
||||||
%put WARNING: Cannot open %trim(&libds), system message below;
|
%put %str(WARN)ING: Cannot open %trim(&libds), system message below;
|
||||||
%put %sysfunc(sysmsg());
|
%put %sysfunc(sysmsg());
|
||||||
-1
|
-1
|
||||||
%end;
|
%end;
|
||||||
@@ -510,8 +510,8 @@ options noquotelenmax;
|
|||||||
@brief retrieves a key value pair from a control dataset
|
@brief retrieves a key value pair from a control dataset
|
||||||
@details By default, control dataset is work.mp_setkeyvalue. Usage:
|
@details By default, control dataset is work.mp_setkeyvalue. Usage:
|
||||||
|
|
||||||
%mp_setkeyvalue(someindex,22,type=N)
|
%mp_setkeyvalue(someindex,22,type=N)
|
||||||
%put %mf_getkeyvalue(someindex)
|
%put %mf_getkeyvalue(someindex)
|
||||||
|
|
||||||
|
|
||||||
@param key Provide a key on which to perform the lookup
|
@param key Provide a key on which to perform the lookup
|
||||||
@@ -4548,8 +4548,8 @@ proc sql
|
|||||||
@brief Logs a key value pair a control dataset
|
@brief Logs a key value pair a control dataset
|
||||||
@details If the dataset does not exist, it is created. Usage:
|
@details If the dataset does not exist, it is created. Usage:
|
||||||
|
|
||||||
%mp_setkeyvalue(someindex,22,type=N)
|
%mp_setkeyvalue(someindex,22,type=N)
|
||||||
%mp_setkeyvalue(somenewindex,somevalue)
|
%mp_setkeyvalue(somenewindex,somevalue)
|
||||||
|
|
||||||
<h4> SAS Macros </h4>
|
<h4> SAS Macros </h4>
|
||||||
@li mf_existds.sas
|
@li mf_existds.sas
|
||||||
@@ -4571,7 +4571,7 @@ proc sql
|
|||||||
|
|
||||||
%if not (%mf_existds(&libds)) %then %do;
|
%if not (%mf_existds(&libds)) %then %do;
|
||||||
data &libds (index=(key/unique));
|
data &libds (index=(key/unique));
|
||||||
length key $32 valc $256 valn 8 type $1;
|
length key $64 valc $2048 valn 8 type $1;
|
||||||
call missing(of _all_);
|
call missing(of _all_);
|
||||||
stop;
|
stop;
|
||||||
run;
|
run;
|
||||||
@@ -10659,7 +10659,6 @@ run;
|
|||||||
@details Expects oauth token in a global macro variable (default
|
@details Expects oauth token in a global macro variable (default
|
||||||
ACCESS_TOKEN).
|
ACCESS_TOKEN).
|
||||||
|
|
||||||
options mprint;
|
|
||||||
%mv_createfolder(path=/Public)
|
%mv_createfolder(path=/Public)
|
||||||
|
|
||||||
|
|
||||||
@@ -11509,7 +11508,7 @@ run;
|
|||||||
%let oauth_bearer=oauth_bearer=sas_services;
|
%let oauth_bearer=oauth_bearer=sas_services;
|
||||||
%let &access_token_var=;
|
%let &access_token_var=;
|
||||||
%end;
|
%end;
|
||||||
%put &sysmacroname: grant_type=&grant_type;
|
|
||||||
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||||
and &grant_type ne sas_services
|
and &grant_type ne sas_services
|
||||||
)
|
)
|
||||||
@@ -11764,7 +11763,6 @@ libname &libref1a clear;
|
|||||||
@details If not running in Studo 5 +, will expect an oauth token in a global
|
@details If not running in Studo 5 +, will expect an oauth token in a global
|
||||||
macro variable (default ACCESS_TOKEN).
|
macro variable (default ACCESS_TOKEN).
|
||||||
|
|
||||||
options mprint;
|
|
||||||
%mv_createfolder(path=/Public/test/blah)
|
%mv_createfolder(path=/Public/test/blah)
|
||||||
%mv_deleteviyafolder(path=/Public/test)
|
%mv_deleteviyafolder(path=/Public/test)
|
||||||
|
|
||||||
@@ -12094,7 +12092,7 @@ libname &libref1 clear;
|
|||||||
%let oauth_bearer=oauth_bearer=sas_services;
|
%let oauth_bearer=oauth_bearer=sas_services;
|
||||||
%let &access_token_var=;
|
%let &access_token_var=;
|
||||||
%end;
|
%end;
|
||||||
%put &sysmacroname: grant_type=&grant_type;
|
|
||||||
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||||
and &grant_type ne sas_services
|
and &grant_type ne sas_services
|
||||||
)
|
)
|
||||||
@@ -12329,7 +12327,7 @@ filename &fname1 clear;
|
|||||||
%let oauth_bearer=oauth_bearer=sas_services;
|
%let oauth_bearer=oauth_bearer=sas_services;
|
||||||
%let &access_token_var=;
|
%let &access_token_var=;
|
||||||
%end;
|
%end;
|
||||||
%put &sysmacroname: grant_type=&grant_type;
|
|
||||||
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||||
and &grant_type ne sas_services
|
and &grant_type ne sas_services
|
||||||
)
|
)
|
||||||
@@ -12422,7 +12420,6 @@ libname &libref1 clear;
|
|||||||
%let oauth_bearer=oauth_bearer=sas_services;
|
%let oauth_bearer=oauth_bearer=sas_services;
|
||||||
%let &access_token_var=;
|
%let &access_token_var=;
|
||||||
%end;
|
%end;
|
||||||
%put &sysmacroname: grant_type=&grant_type;
|
|
||||||
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||||
and &grant_type ne sas_services
|
and &grant_type ne sas_services
|
||||||
)
|
)
|
||||||
@@ -12573,7 +12570,7 @@ filename &fname3 clear;
|
|||||||
|
|
||||||
%mv_getjoblog(uri=&uri,outref=mylog)
|
%mv_getjoblog(uri=&uri,outref=mylog)
|
||||||
|
|
||||||
This macro is used by the mv_jobwaitfor macro, which is generally a more
|
This macro is used by the mv_jobwaitfor.sas macro, which is generally a more
|
||||||
convenient way to wait for the job to finish before fetching the log.
|
convenient way to wait for the job to finish before fetching the log.
|
||||||
|
|
||||||
|
|
||||||
@@ -12617,7 +12614,7 @@ filename &fname3 clear;
|
|||||||
%let oauth_bearer=oauth_bearer=sas_services;
|
%let oauth_bearer=oauth_bearer=sas_services;
|
||||||
%let &access_token_var=;
|
%let &access_token_var=;
|
||||||
%end;
|
%end;
|
||||||
%put &sysmacroname: grant_type=&grant_type;
|
|
||||||
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||||
and &grant_type ne sas_services
|
and &grant_type ne sas_services
|
||||||
)
|
)
|
||||||
@@ -12769,7 +12766,7 @@ data _null_;
|
|||||||
end;
|
end;
|
||||||
input;
|
input;
|
||||||
put _infile_;
|
put _infile_;
|
||||||
%if &mdebug=0 %then %do;
|
%if &mdebug=1 %then %do;
|
||||||
putlog _infile_;
|
putlog _infile_;
|
||||||
%end;
|
%end;
|
||||||
if last then do;
|
if last then do;
|
||||||
@@ -12892,7 +12889,7 @@ run;
|
|||||||
%let oauth_bearer=oauth_bearer=sas_services;
|
%let oauth_bearer=oauth_bearer=sas_services;
|
||||||
%let &access_token_var=;
|
%let &access_token_var=;
|
||||||
%end;
|
%end;
|
||||||
%put &sysmacroname: grant_type=&grant_type;
|
|
||||||
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||||
and &grant_type ne sas_services
|
and &grant_type ne sas_services
|
||||||
)
|
)
|
||||||
@@ -13378,18 +13375,19 @@ libname &libref;
|
|||||||
|
|
||||||
## Input table (minimum variables needed)
|
## Input table (minimum variables needed)
|
||||||
|
|
||||||
@li FLOW_ID - Numeric value, provides sequential ordering capability
|
|
||||||
@li _CONTEXTNAME - Dictates which context should be used to run the job. If
|
|
||||||
blank, will default to `SAS Job Execution compute context`.
|
|
||||||
@li _PROGRAM - Provides the path to the job itself
|
@li _PROGRAM - Provides the path to the job itself
|
||||||
|
@li FLOW_ID - Numeric value, provides sequential ordering capability. Is
|
||||||
|
optional, will default to 0 if not provided.
|
||||||
|
@li _CONTEXTNAME - Dictates which context should be used to run the job. If
|
||||||
|
blank (or not provided), will default to `SAS Job Execution compute context`.
|
||||||
|
|
||||||
Any additional variables provided in this table are converted into macro
|
Any additional variables provided in this table are converted into macro
|
||||||
variables and passed into the relevant job.
|
variables and passed into the relevant job.
|
||||||
|
|
||||||
| FLOW_ID| _CONTEXTNAME |_PROGRAM|
|
|_PROGRAM| FLOW_ID (optional)| _CONTEXTNAME (optional) |
|
||||||
|---|---|---|
|
|---|---|---|
|
||||||
|0|SAS Job Execution compute context|/Public/jobs/somejob1|
|
|/Public/jobs/somejob1|0|SAS Job Execution compute context|
|
||||||
|0|SAS Job Execution compute context|/Public/jobs/somejob2|
|
|/Public/jobs/somejob2|0|SAS Job Execution compute context|
|
||||||
|
|
||||||
## Output table (minimum variables produced)
|
## Output table (minimum variables produced)
|
||||||
|
|
||||||
@@ -13450,7 +13448,16 @@ libname &libref;
|
|||||||
|
|
||||||
Trigger the flow
|
Trigger the flow
|
||||||
|
|
||||||
%mv_jobflow(inds=work.inputjobs,outds=work.results,maxconcurrency=4)
|
%mv_jobflow(inds=work.inputjobs
|
||||||
|
,maxconcurrency=4
|
||||||
|
,outds=work.results
|
||||||
|
,outref=myjoblog
|
||||||
|
)
|
||||||
|
|
||||||
|
data _null_;
|
||||||
|
infile myjoblog;
|
||||||
|
input; put _infile_;
|
||||||
|
run;
|
||||||
|
|
||||||
|
|
||||||
@param [in] access_token_var= The global macro variable to contain the access token
|
@param [in] access_token_var= The global macro variable to contain the access token
|
||||||
@@ -13462,7 +13469,9 @@ libname &libref;
|
|||||||
@li sas_services - will use oauth_bearer=sas_services
|
@li sas_services - will use oauth_bearer=sas_services
|
||||||
@param [in] inds= The input dataset containing a list of jobs and parameters
|
@param [in] inds= The input dataset containing a list of jobs and parameters
|
||||||
@param [in] maxconcurrency= The max number of parallel jobs to run. Default=8.
|
@param [in] maxconcurrency= The max number of parallel jobs to run. Default=8.
|
||||||
|
@param [in] mdebug= set to 1 to enable DEBUG messages
|
||||||
@param [out] outds= The output dataset containing the results
|
@param [out] outds= The output dataset containing the results
|
||||||
|
@param [out] outref= The output fileref to which to append the log file(s).
|
||||||
|
|
||||||
@version VIYA V.03.05
|
@version VIYA V.03.05
|
||||||
@author Allan Bowe, source: https://github.com/sasjs/core
|
@author Allan Bowe, source: https://github.com/sasjs/core
|
||||||
@@ -13482,6 +13491,8 @@ libname &libref;
|
|||||||
,maxconcurrency=8
|
,maxconcurrency=8
|
||||||
,access_token_var=ACCESS_TOKEN
|
,access_token_var=ACCESS_TOKEN
|
||||||
,grant_type=sas_services
|
,grant_type=sas_services
|
||||||
|
,outref=0
|
||||||
|
,mdebug=0
|
||||||
);
|
);
|
||||||
%local oauth_bearer;
|
%local oauth_bearer;
|
||||||
%if &grant_type=detect %then %do;
|
%if &grant_type=detect %then %do;
|
||||||
@@ -13504,16 +13515,29 @@ libname &libref;
|
|||||||
,mac=&sysmacroname
|
,mac=&sysmacroname
|
||||||
,msg=%str(Input dataset was not provided)
|
,msg=%str(Input dataset was not provided)
|
||||||
)
|
)
|
||||||
%mp_abort(iftrue=(%mf_existVarList(&inds,_CONTEXTNAME FLOW_ID _PROGRAM)=0)
|
%mp_abort(iftrue=(%mf_existVarList(&inds,_PROGRAM)=0)
|
||||||
,mac=&sysmacroname
|
,mac=&sysmacroname
|
||||||
,msg=%str(The following columns must exist on input dataset &inds:
|
,msg=%str(The _PROGRAM column must exist on input dataset &inds)
|
||||||
_CONTEXTNAME FLOW_ID _PROGRAM)
|
|
||||||
)
|
)
|
||||||
%mp_abort(iftrue=(&maxconcurrency<1)
|
%mp_abort(iftrue=(&maxconcurrency<1)
|
||||||
,mac=&sysmacroname
|
,mac=&sysmacroname
|
||||||
,msg=%str(The maxconcurrency variable should be a positive integer)
|
,msg=%str(The maxconcurrency variable should be a positive integer)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
/* set defaults if not provided */
|
||||||
|
%if %mf_existVarList(&inds,_CONTEXTNAME FLOW_ID)=0 %then %do;
|
||||||
|
data &inds;
|
||||||
|
%if %mf_existvarList(&inds,_CONTEXTNAME)=0 %then %do;
|
||||||
|
length _CONTEXTNAME $128;
|
||||||
|
retain _CONTEXTNAME "SAS Job Execution compute context";
|
||||||
|
%end;
|
||||||
|
%if %mf_existvarList(&inds,FLOW_ID)=0 %then %do;
|
||||||
|
retain FLOW_ID 0;
|
||||||
|
%end;
|
||||||
|
set &inds;
|
||||||
|
run;
|
||||||
|
%end;
|
||||||
|
|
||||||
%local missings;
|
%local missings;
|
||||||
proc sql noprint;
|
proc sql noprint;
|
||||||
select count(*) into: missings
|
select count(*) into: missings
|
||||||
@@ -13591,8 +13615,8 @@ data;run;%let jdswaitfor=&syslast;
|
|||||||
jparams='jparams'!!left(symget('jid'));
|
jparams='jparams'!!left(symget('jid'));
|
||||||
call symputx(jparams,substr(_infile_,3,length(_infile_)-4));
|
call symputx(jparams,substr(_infile_,3,length(_infile_)-4));
|
||||||
run;
|
run;
|
||||||
%local joburi&jid;
|
%local jobuid&jid;
|
||||||
%let joburi&jid=0; /* used in next loop */
|
%let jobuid&jid=0; /* used in next loop */
|
||||||
%end;
|
%end;
|
||||||
%local concurrency completed;
|
%local concurrency completed;
|
||||||
%let concurrency=0;
|
%let concurrency=0;
|
||||||
@@ -13603,8 +13627,21 @@ data;run;%let jdswaitfor=&syslast;
|
|||||||
* now we can execute the jobs up to the maxconcurrency setting
|
* now we can execute the jobs up to the maxconcurrency setting
|
||||||
*/
|
*/
|
||||||
%if "&&job&jid" ne "0" %then %do; /* this var is zero if job finished */
|
%if "&&job&jid" ne "0" %then %do; /* this var is zero if job finished */
|
||||||
%if "&&joburi&jid"="0" and &concurrency<&maxconcurrency %then %do;
|
|
||||||
/* job has not been triggered and we have free slots */
|
/* check to see if the job finished in the previous round */
|
||||||
|
%if %sysfunc(exist(&outds))=1 %then %do;
|
||||||
|
%local jobcheck; %let jobcheck=0;
|
||||||
|
proc sql noprint;
|
||||||
|
select count(*) into: jobcheck
|
||||||
|
from &outds where uuid="&&jobuid&jid";
|
||||||
|
%if &jobcheck>0 %then %do;
|
||||||
|
%put &&job&jid in flow &fid with uid &&jobuid&jid completed!;
|
||||||
|
%let job&jid=0;
|
||||||
|
%end;
|
||||||
|
%end;
|
||||||
|
|
||||||
|
/* check if job was triggered and if so, if we have enough slots to run */
|
||||||
|
%if "&&jobuid&jid"="0" and &concurrency<&maxconcurrency %then %do;
|
||||||
%local jobname jobpath;
|
%local jobname jobpath;
|
||||||
%let jobname=%scan(&&job&jid,-1,/);
|
%let jobname=%scan(&&job&jid,-1,/);
|
||||||
%let jobpath=%substr(&&job&jid,1,%length(&&job&jid)-%length(&jobname)-1);
|
%let jobpath=%substr(&&job&jid,1,%length(&&job&jid)-%length(&jobname)-1);
|
||||||
@@ -13618,27 +13655,18 @@ data;run;%let jdswaitfor=&syslast;
|
|||||||
format jobparams $32767.;
|
format jobparams $32767.;
|
||||||
set &jdsapp(where=(method='GET' and rel='state'));
|
set &jdsapp(where=(method='GET' and rel='state'));
|
||||||
jobparams=symget("jparams&jid");
|
jobparams=symget("jparams&jid");
|
||||||
call symputx("joburi&jid",uri,'l');
|
/* uri here has the /state suffix */
|
||||||
|
uuid=scan(uri,-2,'/');
|
||||||
|
call symputx("jobuid&jid",uuid,'l');
|
||||||
run;
|
run;
|
||||||
proc append base=&jdsrunning data=&jdsapp;
|
proc append base=&jdsrunning data=&jdsapp;
|
||||||
run;
|
run;
|
||||||
%let concurrency=%eval(&concurrency+1);
|
%let concurrency=%eval(&concurrency+1);
|
||||||
%end;
|
%end;
|
||||||
%else %if %sysfunc(exist(&outds))=1 %then %do;
|
|
||||||
/* check to see if the job has finished as was previously executed */
|
|
||||||
%local jobcheck; %let jobcheck=0;
|
|
||||||
proc sql noprint;
|
|
||||||
select count(*) into: jobcheck
|
|
||||||
from &outds where uri="&&joburi&jid";
|
|
||||||
%if &jobcheck>0 %then %do;
|
|
||||||
%put &&job&jid in flow &fid with uri &&joburi&jid completed!;
|
|
||||||
%let job&jid=0;
|
|
||||||
%end;
|
|
||||||
%end;
|
|
||||||
%end;
|
%end;
|
||||||
%if &jid=&jcnt %then %do;
|
%if &jid=&jcnt %then %do;
|
||||||
/* we are at the end of the loop - time to see which jobs have finished */
|
/* we are at the end of the loop - time to see which jobs have finished */
|
||||||
%mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor)
|
%mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor,outref=&outref)
|
||||||
%local done;
|
%local done;
|
||||||
%let done=%mf_nobs(&jdswaitfor);
|
%let done=%mf_nobs(&jdswaitfor);
|
||||||
%if &done>0 %then %do;
|
%if &done>0 %then %do;
|
||||||
@@ -13647,13 +13675,14 @@ data;run;%let jdswaitfor=&syslast;
|
|||||||
data &jdsapp;
|
data &jdsapp;
|
||||||
set &jdswaitfor;
|
set &jdswaitfor;
|
||||||
flow_id=&&flow&fid;
|
flow_id=&&flow&fid;
|
||||||
|
uuid=scan(uri,-1,'/');
|
||||||
run;
|
run;
|
||||||
proc append base=&outds data=&jdsapp;
|
proc append base=&outds data=&jdsapp;
|
||||||
run;
|
run;
|
||||||
%end;
|
%end;
|
||||||
proc sql;
|
proc sql;
|
||||||
delete from &jdsrunning
|
delete from &jdsrunning
|
||||||
where uri in (select uri from &outds
|
where uuid in (select uuid from &outds
|
||||||
where state in ('canceled','completed','failed')
|
where state in ('canceled','completed','failed')
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -13667,6 +13696,9 @@ data;run;%let jdswaitfor=&syslast;
|
|||||||
/* back up and execute the next flow */
|
/* back up and execute the next flow */
|
||||||
%end;
|
%end;
|
||||||
|
|
||||||
|
%if &mdebug=1 %then %do;
|
||||||
|
%put _local_;
|
||||||
|
%end;
|
||||||
|
|
||||||
%mend;
|
%mend;
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -23,7 +23,7 @@
|
|||||||
%local dsid rc;
|
%local dsid rc;
|
||||||
%let dsid=%sysfunc(open(&libds,is));
|
%let dsid=%sysfunc(open(&libds,is));
|
||||||
%if &dsid = 0 %then %do;
|
%if &dsid = 0 %then %do;
|
||||||
%put WARNING: Cannot open %trim(&libds), system message below;
|
%put %str(WARN)ING: Cannot open %trim(&libds), system message below;
|
||||||
%put %sysfunc(sysmsg());
|
%put %sysfunc(sysmsg());
|
||||||
-1
|
-1
|
||||||
%end;
|
%end;
|
||||||
|
|||||||
@@ -3,8 +3,8 @@
|
|||||||
@brief retrieves a key value pair from a control dataset
|
@brief retrieves a key value pair from a control dataset
|
||||||
@details By default, control dataset is work.mp_setkeyvalue. Usage:
|
@details By default, control dataset is work.mp_setkeyvalue. Usage:
|
||||||
|
|
||||||
%mp_setkeyvalue(someindex,22,type=N)
|
%mp_setkeyvalue(someindex,22,type=N)
|
||||||
%put %mf_getkeyvalue(someindex)
|
%put %mf_getkeyvalue(someindex)
|
||||||
|
|
||||||
|
|
||||||
@param key Provide a key on which to perform the lookup
|
@param key Provide a key on which to perform the lookup
|
||||||
|
|||||||
@@ -3,8 +3,8 @@
|
|||||||
@brief Logs a key value pair a control dataset
|
@brief Logs a key value pair a control dataset
|
||||||
@details If the dataset does not exist, it is created. Usage:
|
@details If the dataset does not exist, it is created. Usage:
|
||||||
|
|
||||||
%mp_setkeyvalue(someindex,22,type=N)
|
%mp_setkeyvalue(someindex,22,type=N)
|
||||||
%mp_setkeyvalue(somenewindex,somevalue)
|
%mp_setkeyvalue(somenewindex,somevalue)
|
||||||
|
|
||||||
<h4> SAS Macros </h4>
|
<h4> SAS Macros </h4>
|
||||||
@li mf_existds.sas
|
@li mf_existds.sas
|
||||||
@@ -26,7 +26,7 @@
|
|||||||
|
|
||||||
%if not (%mf_existds(&libds)) %then %do;
|
%if not (%mf_existds(&libds)) %then %do;
|
||||||
data &libds (index=(key/unique));
|
data &libds (index=(key/unique));
|
||||||
length key $32 valc $256 valn 8 type $1;
|
length key $64 valc $2048 valn 8 type $1;
|
||||||
call missing(of _all_);
|
call missing(of _all_);
|
||||||
stop;
|
stop;
|
||||||
run;
|
run;
|
||||||
|
|||||||
@@ -4,7 +4,6 @@
|
|||||||
@details Expects oauth token in a global macro variable (default
|
@details Expects oauth token in a global macro variable (default
|
||||||
ACCESS_TOKEN).
|
ACCESS_TOKEN).
|
||||||
|
|
||||||
options mprint;
|
|
||||||
%mv_createfolder(path=/Public)
|
%mv_createfolder(path=/Public)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -46,7 +46,7 @@
|
|||||||
%let oauth_bearer=oauth_bearer=sas_services;
|
%let oauth_bearer=oauth_bearer=sas_services;
|
||||||
%let &access_token_var=;
|
%let &access_token_var=;
|
||||||
%end;
|
%end;
|
||||||
%put &sysmacroname: grant_type=&grant_type;
|
|
||||||
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||||
and &grant_type ne sas_services
|
and &grant_type ne sas_services
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -4,7 +4,6 @@
|
|||||||
@details If not running in Studo 5 +, will expect an oauth token in a global
|
@details If not running in Studo 5 +, will expect an oauth token in a global
|
||||||
macro variable (default ACCESS_TOKEN).
|
macro variable (default ACCESS_TOKEN).
|
||||||
|
|
||||||
options mprint;
|
|
||||||
%mv_createfolder(path=/Public/test/blah)
|
%mv_createfolder(path=/Public/test/blah)
|
||||||
%mv_deleteviyafolder(path=/Public/test)
|
%mv_deleteviyafolder(path=/Public/test)
|
||||||
|
|
||||||
|
|||||||
@@ -40,7 +40,7 @@
|
|||||||
%let oauth_bearer=oauth_bearer=sas_services;
|
%let oauth_bearer=oauth_bearer=sas_services;
|
||||||
%let &access_token_var=;
|
%let &access_token_var=;
|
||||||
%end;
|
%end;
|
||||||
%put &sysmacroname: grant_type=&grant_type;
|
|
||||||
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||||
and &grant_type ne sas_services
|
and &grant_type ne sas_services
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -52,7 +52,7 @@
|
|||||||
%let oauth_bearer=oauth_bearer=sas_services;
|
%let oauth_bearer=oauth_bearer=sas_services;
|
||||||
%let &access_token_var=;
|
%let &access_token_var=;
|
||||||
%end;
|
%end;
|
||||||
%put &sysmacroname: grant_type=&grant_type;
|
|
||||||
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||||
and &grant_type ne sas_services
|
and &grant_type ne sas_services
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -49,7 +49,6 @@
|
|||||||
%let oauth_bearer=oauth_bearer=sas_services;
|
%let oauth_bearer=oauth_bearer=sas_services;
|
||||||
%let &access_token_var=;
|
%let &access_token_var=;
|
||||||
%end;
|
%end;
|
||||||
%put &sysmacroname: grant_type=&grant_type;
|
|
||||||
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||||
and &grant_type ne sas_services
|
and &grant_type ne sas_services
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -49,7 +49,7 @@
|
|||||||
|
|
||||||
%mv_getjoblog(uri=&uri,outref=mylog)
|
%mv_getjoblog(uri=&uri,outref=mylog)
|
||||||
|
|
||||||
This macro is used by the mv_jobwaitfor macro, which is generally a more
|
This macro is used by the mv_jobwaitfor.sas macro, which is generally a more
|
||||||
convenient way to wait for the job to finish before fetching the log.
|
convenient way to wait for the job to finish before fetching the log.
|
||||||
|
|
||||||
|
|
||||||
@@ -93,7 +93,7 @@
|
|||||||
%let oauth_bearer=oauth_bearer=sas_services;
|
%let oauth_bearer=oauth_bearer=sas_services;
|
||||||
%let &access_token_var=;
|
%let &access_token_var=;
|
||||||
%end;
|
%end;
|
||||||
%put &sysmacroname: grant_type=&grant_type;
|
|
||||||
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||||
and &grant_type ne sas_services
|
and &grant_type ne sas_services
|
||||||
)
|
)
|
||||||
@@ -245,7 +245,7 @@ data _null_;
|
|||||||
end;
|
end;
|
||||||
input;
|
input;
|
||||||
put _infile_;
|
put _infile_;
|
||||||
%if &mdebug=0 %then %do;
|
%if &mdebug=1 %then %do;
|
||||||
putlog _infile_;
|
putlog _infile_;
|
||||||
%end;
|
%end;
|
||||||
if last then do;
|
if last then do;
|
||||||
|
|||||||
@@ -101,7 +101,7 @@
|
|||||||
%let oauth_bearer=oauth_bearer=sas_services;
|
%let oauth_bearer=oauth_bearer=sas_services;
|
||||||
%let &access_token_var=;
|
%let &access_token_var=;
|
||||||
%end;
|
%end;
|
||||||
%put &sysmacroname: grant_type=&grant_type;
|
|
||||||
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||||
and &grant_type ne sas_services
|
and &grant_type ne sas_services
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -9,18 +9,19 @@
|
|||||||
|
|
||||||
## Input table (minimum variables needed)
|
## Input table (minimum variables needed)
|
||||||
|
|
||||||
@li FLOW_ID - Numeric value, provides sequential ordering capability
|
|
||||||
@li _CONTEXTNAME - Dictates which context should be used to run the job. If
|
|
||||||
blank, will default to `SAS Job Execution compute context`.
|
|
||||||
@li _PROGRAM - Provides the path to the job itself
|
@li _PROGRAM - Provides the path to the job itself
|
||||||
|
@li FLOW_ID - Numeric value, provides sequential ordering capability. Is
|
||||||
|
optional, will default to 0 if not provided.
|
||||||
|
@li _CONTEXTNAME - Dictates which context should be used to run the job. If
|
||||||
|
blank (or not provided), will default to `SAS Job Execution compute context`.
|
||||||
|
|
||||||
Any additional variables provided in this table are converted into macro
|
Any additional variables provided in this table are converted into macro
|
||||||
variables and passed into the relevant job.
|
variables and passed into the relevant job.
|
||||||
|
|
||||||
| FLOW_ID| _CONTEXTNAME |_PROGRAM|
|
|_PROGRAM| FLOW_ID (optional)| _CONTEXTNAME (optional) |
|
||||||
|---|---|---|
|
|---|---|---|
|
||||||
|0|SAS Job Execution compute context|/Public/jobs/somejob1|
|
|/Public/jobs/somejob1|0|SAS Job Execution compute context|
|
||||||
|0|SAS Job Execution compute context|/Public/jobs/somejob2|
|
|/Public/jobs/somejob2|0|SAS Job Execution compute context|
|
||||||
|
|
||||||
## Output table (minimum variables produced)
|
## Output table (minimum variables produced)
|
||||||
|
|
||||||
@@ -81,7 +82,16 @@
|
|||||||
|
|
||||||
Trigger the flow
|
Trigger the flow
|
||||||
|
|
||||||
%mv_jobflow(inds=work.inputjobs,outds=work.results,maxconcurrency=4)
|
%mv_jobflow(inds=work.inputjobs
|
||||||
|
,maxconcurrency=4
|
||||||
|
,outds=work.results
|
||||||
|
,outref=myjoblog
|
||||||
|
)
|
||||||
|
|
||||||
|
data _null_;
|
||||||
|
infile myjoblog;
|
||||||
|
input; put _infile_;
|
||||||
|
run;
|
||||||
|
|
||||||
|
|
||||||
@param [in] access_token_var= The global macro variable to contain the access token
|
@param [in] access_token_var= The global macro variable to contain the access token
|
||||||
@@ -93,7 +103,9 @@
|
|||||||
@li sas_services - will use oauth_bearer=sas_services
|
@li sas_services - will use oauth_bearer=sas_services
|
||||||
@param [in] inds= The input dataset containing a list of jobs and parameters
|
@param [in] inds= The input dataset containing a list of jobs and parameters
|
||||||
@param [in] maxconcurrency= The max number of parallel jobs to run. Default=8.
|
@param [in] maxconcurrency= The max number of parallel jobs to run. Default=8.
|
||||||
|
@param [in] mdebug= set to 1 to enable DEBUG messages
|
||||||
@param [out] outds= The output dataset containing the results
|
@param [out] outds= The output dataset containing the results
|
||||||
|
@param [out] outref= The output fileref to which to append the log file(s).
|
||||||
|
|
||||||
@version VIYA V.03.05
|
@version VIYA V.03.05
|
||||||
@author Allan Bowe, source: https://github.com/sasjs/core
|
@author Allan Bowe, source: https://github.com/sasjs/core
|
||||||
@@ -113,6 +125,8 @@
|
|||||||
,maxconcurrency=8
|
,maxconcurrency=8
|
||||||
,access_token_var=ACCESS_TOKEN
|
,access_token_var=ACCESS_TOKEN
|
||||||
,grant_type=sas_services
|
,grant_type=sas_services
|
||||||
|
,outref=0
|
||||||
|
,mdebug=0
|
||||||
);
|
);
|
||||||
%local oauth_bearer;
|
%local oauth_bearer;
|
||||||
%if &grant_type=detect %then %do;
|
%if &grant_type=detect %then %do;
|
||||||
@@ -135,16 +149,29 @@
|
|||||||
,mac=&sysmacroname
|
,mac=&sysmacroname
|
||||||
,msg=%str(Input dataset was not provided)
|
,msg=%str(Input dataset was not provided)
|
||||||
)
|
)
|
||||||
%mp_abort(iftrue=(%mf_existVarList(&inds,_CONTEXTNAME FLOW_ID _PROGRAM)=0)
|
%mp_abort(iftrue=(%mf_existVarList(&inds,_PROGRAM)=0)
|
||||||
,mac=&sysmacroname
|
,mac=&sysmacroname
|
||||||
,msg=%str(The following columns must exist on input dataset &inds:
|
,msg=%str(The _PROGRAM column must exist on input dataset &inds)
|
||||||
_CONTEXTNAME FLOW_ID _PROGRAM)
|
|
||||||
)
|
)
|
||||||
%mp_abort(iftrue=(&maxconcurrency<1)
|
%mp_abort(iftrue=(&maxconcurrency<1)
|
||||||
,mac=&sysmacroname
|
,mac=&sysmacroname
|
||||||
,msg=%str(The maxconcurrency variable should be a positive integer)
|
,msg=%str(The maxconcurrency variable should be a positive integer)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
/* set defaults if not provided */
|
||||||
|
%if %mf_existVarList(&inds,_CONTEXTNAME FLOW_ID)=0 %then %do;
|
||||||
|
data &inds;
|
||||||
|
%if %mf_existvarList(&inds,_CONTEXTNAME)=0 %then %do;
|
||||||
|
length _CONTEXTNAME $128;
|
||||||
|
retain _CONTEXTNAME "SAS Job Execution compute context";
|
||||||
|
%end;
|
||||||
|
%if %mf_existvarList(&inds,FLOW_ID)=0 %then %do;
|
||||||
|
retain FLOW_ID 0;
|
||||||
|
%end;
|
||||||
|
set &inds;
|
||||||
|
run;
|
||||||
|
%end;
|
||||||
|
|
||||||
%local missings;
|
%local missings;
|
||||||
proc sql noprint;
|
proc sql noprint;
|
||||||
select count(*) into: missings
|
select count(*) into: missings
|
||||||
@@ -222,8 +249,8 @@ data;run;%let jdswaitfor=&syslast;
|
|||||||
jparams='jparams'!!left(symget('jid'));
|
jparams='jparams'!!left(symget('jid'));
|
||||||
call symputx(jparams,substr(_infile_,3,length(_infile_)-4));
|
call symputx(jparams,substr(_infile_,3,length(_infile_)-4));
|
||||||
run;
|
run;
|
||||||
%local joburi&jid;
|
%local jobuid&jid;
|
||||||
%let joburi&jid=0; /* used in next loop */
|
%let jobuid&jid=0; /* used in next loop */
|
||||||
%end;
|
%end;
|
||||||
%local concurrency completed;
|
%local concurrency completed;
|
||||||
%let concurrency=0;
|
%let concurrency=0;
|
||||||
@@ -234,8 +261,21 @@ data;run;%let jdswaitfor=&syslast;
|
|||||||
* now we can execute the jobs up to the maxconcurrency setting
|
* now we can execute the jobs up to the maxconcurrency setting
|
||||||
*/
|
*/
|
||||||
%if "&&job&jid" ne "0" %then %do; /* this var is zero if job finished */
|
%if "&&job&jid" ne "0" %then %do; /* this var is zero if job finished */
|
||||||
%if "&&joburi&jid"="0" and &concurrency<&maxconcurrency %then %do;
|
|
||||||
/* job has not been triggered and we have free slots */
|
/* check to see if the job finished in the previous round */
|
||||||
|
%if %sysfunc(exist(&outds))=1 %then %do;
|
||||||
|
%local jobcheck; %let jobcheck=0;
|
||||||
|
proc sql noprint;
|
||||||
|
select count(*) into: jobcheck
|
||||||
|
from &outds where uuid="&&jobuid&jid";
|
||||||
|
%if &jobcheck>0 %then %do;
|
||||||
|
%put &&job&jid in flow &fid with uid &&jobuid&jid completed!;
|
||||||
|
%let job&jid=0;
|
||||||
|
%end;
|
||||||
|
%end;
|
||||||
|
|
||||||
|
/* check if job was triggered and if so, if we have enough slots to run */
|
||||||
|
%if "&&jobuid&jid"="0" and &concurrency<&maxconcurrency %then %do;
|
||||||
%local jobname jobpath;
|
%local jobname jobpath;
|
||||||
%let jobname=%scan(&&job&jid,-1,/);
|
%let jobname=%scan(&&job&jid,-1,/);
|
||||||
%let jobpath=%substr(&&job&jid,1,%length(&&job&jid)-%length(&jobname)-1);
|
%let jobpath=%substr(&&job&jid,1,%length(&&job&jid)-%length(&jobname)-1);
|
||||||
@@ -249,27 +289,18 @@ data;run;%let jdswaitfor=&syslast;
|
|||||||
format jobparams $32767.;
|
format jobparams $32767.;
|
||||||
set &jdsapp(where=(method='GET' and rel='state'));
|
set &jdsapp(where=(method='GET' and rel='state'));
|
||||||
jobparams=symget("jparams&jid");
|
jobparams=symget("jparams&jid");
|
||||||
call symputx("joburi&jid",uri,'l');
|
/* uri here has the /state suffix */
|
||||||
|
uuid=scan(uri,-2,'/');
|
||||||
|
call symputx("jobuid&jid",uuid,'l');
|
||||||
run;
|
run;
|
||||||
proc append base=&jdsrunning data=&jdsapp;
|
proc append base=&jdsrunning data=&jdsapp;
|
||||||
run;
|
run;
|
||||||
%let concurrency=%eval(&concurrency+1);
|
%let concurrency=%eval(&concurrency+1);
|
||||||
%end;
|
%end;
|
||||||
%else %if %sysfunc(exist(&outds))=1 %then %do;
|
|
||||||
/* check to see if the job has finished as was previously executed */
|
|
||||||
%local jobcheck; %let jobcheck=0;
|
|
||||||
proc sql noprint;
|
|
||||||
select count(*) into: jobcheck
|
|
||||||
from &outds where uri="&&joburi&jid";
|
|
||||||
%if &jobcheck>0 %then %do;
|
|
||||||
%put &&job&jid in flow &fid with uri &&joburi&jid completed!;
|
|
||||||
%let job&jid=0;
|
|
||||||
%end;
|
|
||||||
%end;
|
|
||||||
%end;
|
%end;
|
||||||
%if &jid=&jcnt %then %do;
|
%if &jid=&jcnt %then %do;
|
||||||
/* we are at the end of the loop - time to see which jobs have finished */
|
/* we are at the end of the loop - time to see which jobs have finished */
|
||||||
%mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor)
|
%mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor,outref=&outref)
|
||||||
%local done;
|
%local done;
|
||||||
%let done=%mf_nobs(&jdswaitfor);
|
%let done=%mf_nobs(&jdswaitfor);
|
||||||
%if &done>0 %then %do;
|
%if &done>0 %then %do;
|
||||||
@@ -278,13 +309,14 @@ data;run;%let jdswaitfor=&syslast;
|
|||||||
data &jdsapp;
|
data &jdsapp;
|
||||||
set &jdswaitfor;
|
set &jdswaitfor;
|
||||||
flow_id=&&flow&fid;
|
flow_id=&&flow&fid;
|
||||||
|
uuid=scan(uri,-1,'/');
|
||||||
run;
|
run;
|
||||||
proc append base=&outds data=&jdsapp;
|
proc append base=&outds data=&jdsapp;
|
||||||
run;
|
run;
|
||||||
%end;
|
%end;
|
||||||
proc sql;
|
proc sql;
|
||||||
delete from &jdsrunning
|
delete from &jdsrunning
|
||||||
where uri in (select uri from &outds
|
where uuid in (select uuid from &outds
|
||||||
where state in ('canceled','completed','failed')
|
where state in ('canceled','completed','failed')
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -298,5 +330,8 @@ data;run;%let jdswaitfor=&syslast;
|
|||||||
/* back up and execute the next flow */
|
/* back up and execute the next flow */
|
||||||
%end;
|
%end;
|
||||||
|
|
||||||
|
%if &mdebug=1 %then %do;
|
||||||
|
%put _local_;
|
||||||
|
%end;
|
||||||
|
|
||||||
%mend;
|
%mend;
|
||||||
|
|||||||
Reference in New Issue
Block a user