mirror of
https://github.com/sasjs/core.git
synced 2026-01-19 14:30:05 +00:00
fix: adding ACTION parameter to mv_jobwaitfor - can now wait for ANY or ALL jobs to finish
This commit is contained in:
161
all.sas
161
all.sas
@@ -12986,7 +12986,7 @@ libname &libref;
|
|||||||
|
|
||||||
| variable| description |
|
| variable| description |
|
||||||
|---|---|---|
|
|---|---|---|
|
||||||
|FLOW_ID| Provides the sequential ordering capability|
|
|FLOW_ID| Numeric value, provides sequential ordering capability|
|
||||||
|_CONTEXTNAME|Dictates which context should be used to run the job. If
|
|_CONTEXTNAME|Dictates which context should be used to run the job. If
|
||||||
blank, will default to `SAS Job Execution compute context`.|
|
blank, will default to `SAS Job Execution compute context`.|
|
||||||
|_PROGRAM|Provides the path to the job itself|
|
|_PROGRAM|Provides the path to the job itself|
|
||||||
@@ -13023,15 +13023,16 @@ libname &libref;
|
|||||||
_contextName='SAS Job Execution compute context';
|
_contextName='SAS Job Execution compute context';
|
||||||
do flow_id=1 to 3;
|
do flow_id=1 to 3;
|
||||||
do job=1 to 10;
|
do job=1 to 10;
|
||||||
_program='/Public/temp/name/demo1';
|
_program='/Public/temp/demo1';
|
||||||
macrovar1=10*job;
|
macrovar1=10*job;
|
||||||
macrovar2=4*job;
|
macrovar2=4*job;
|
||||||
output;
|
output;
|
||||||
_program='/Public/temp/name/demo2';
|
_program='/Public/temp/demo2';
|
||||||
macrovar1=40*job;
|
macrovar1=40*job;
|
||||||
macrovar2=44*job;
|
macrovar2=44*job;
|
||||||
output;
|
output;
|
||||||
end;
|
end;
|
||||||
|
drop job;
|
||||||
end;
|
end;
|
||||||
run;
|
run;
|
||||||
|
|
||||||
@@ -13042,14 +13043,13 @@ libname &libref;
|
|||||||
|
|
||||||
@param [in] access_token_var= The global macro variable to contain the access token
|
@param [in] access_token_var= The global macro variable to contain the access token
|
||||||
@param [in] grant_type= valid values:
|
@param [in] grant_type= valid values:
|
||||||
* password
|
@li password
|
||||||
* authorization_code
|
@li authorization_code
|
||||||
* detect - will check if access_token exists, if not will use sas_services if
|
@li detect - will check if access_token exists, if not will use sas_services if
|
||||||
a SASStudioV session else authorization_code. Default option.
|
a SASStudioV session else authorization_code. Default option.
|
||||||
* sas_services - will use oauth_bearer=sas_services
|
@li sas_services - will use oauth_bearer=sas_services
|
||||||
@param [in] inds= The input dataset containing a list of jobs and parameters
|
@param [in] inds= The input dataset containing a list of jobs and parameters
|
||||||
@param [in] maxconcurrency= The max number of parallel jobs to run. If set to
|
@param [in] maxconcurrency= The max number of parallel jobs to run. Default=8.
|
||||||
0 (default) then there is no limit applied.
|
|
||||||
@param [out] outds= The output dataset containing the results
|
@param [out] outds= The output dataset containing the results
|
||||||
|
|
||||||
@version VIYA V.03.05
|
@version VIYA V.03.05
|
||||||
@@ -13064,7 +13064,7 @@ libname &libref;
|
|||||||
**/
|
**/
|
||||||
|
|
||||||
%macro mv_jobflow(inds=0,outds=work.mv_jobflow
|
%macro mv_jobflow(inds=0,outds=work.mv_jobflow
|
||||||
,maxconcurrency=0
|
,maxconcurrency=8
|
||||||
,access_token_var=ACCESS_TOKEN
|
,access_token_var=ACCESS_TOKEN
|
||||||
,grant_type=sas_services
|
,grant_type=sas_services
|
||||||
);
|
);
|
||||||
@@ -13089,10 +13089,24 @@ libname &libref;
|
|||||||
,mac=&sysmacroname
|
,mac=&sysmacroname
|
||||||
,msg=%str(Input dataset was not provided)
|
,msg=%str(Input dataset was not provided)
|
||||||
)
|
)
|
||||||
%mp_abort(iftrue=(%mf_existVarList(&inds,_CONTEXTNAME FLOWID _PROGRAM)=0)
|
%mp_abort(iftrue=(%mf_existVarList(&inds,_CONTEXTNAME FLOW_ID _PROGRAM)=0)
|
||||||
,mac=&sysmacroname
|
,mac=&sysmacroname
|
||||||
,msg=%str(The following columns must exist on the input dataset(&inds):
|
,msg=%str(The following columns must exist on input dataset &inds:
|
||||||
_CONTEXTNAME FLOWID _PROGRAM)
|
_CONTEXTNAME FLOW_ID _PROGRAM)
|
||||||
|
)
|
||||||
|
%mp_abort(iftrue=(&maxconcurrency<1)
|
||||||
|
,mac=&sysmacroname
|
||||||
|
,msg=%str(The maxconcurrency variable should be a positive integer)
|
||||||
|
)
|
||||||
|
|
||||||
|
%local missings;
|
||||||
|
proc sql noprint;
|
||||||
|
select count(*) into: missings
|
||||||
|
from &inds
|
||||||
|
where flow_id is null or _program is null;
|
||||||
|
%mp_abort(iftrue=(&missings>0)
|
||||||
|
,mac=&sysmacroname
|
||||||
|
,msg=%str(input dataset contains &missings missing values for FLOW_ID or _PROGRAM)
|
||||||
)
|
)
|
||||||
|
|
||||||
options noquotelenmax;
|
options noquotelenmax;
|
||||||
@@ -13102,43 +13116,100 @@ options noquotelenmax;
|
|||||||
|
|
||||||
/* get flows */
|
/* get flows */
|
||||||
proc sort data=&inds;
|
proc sort data=&inds;
|
||||||
by flow_id;
|
by flow_id;
|
||||||
run;
|
run;
|
||||||
|
|
||||||
data _null_;
|
data _null_;
|
||||||
set &inds keep=flow_id end=last;
|
set &inds (keep=flow_id) end=last;
|
||||||
by flow_id;
|
by flow_id;
|
||||||
if last.flow_id then do;
|
if last.flow_id then do;
|
||||||
cnt+1;
|
cnt+1;
|
||||||
call symputx(cats('flow',_n_),flow_id,'l');
|
call symputx(cats('flow',cnt),flow_id,'l');
|
||||||
end;
|
end;
|
||||||
if last then call symputx('flowcnt',cnt,'l');
|
if last then call symputx('flowcnt',cnt,'l');
|
||||||
run;
|
run;
|
||||||
|
|
||||||
|
/* prepare temporary datasets and frefs */
|
||||||
|
%local fid jid jds jjson jdsapp jdsrunning jdswaitfor jfref;
|
||||||
|
data;run;%let jds=&syslast;
|
||||||
|
data;run;%let jjson=&syslast;
|
||||||
|
data;run;%let jdsapp=&syslast;
|
||||||
|
data;run;%let jdsrunning=&syslast;
|
||||||
|
data;run;%let jdswaitfor=&syslast;
|
||||||
|
%let jfref=%mf_getuniquefileref();
|
||||||
|
|
||||||
|
/* start loop */
|
||||||
|
%do fid=1 %to &flowcnt;
|
||||||
|
%put preparing job attributes for flow &&flow&fid;
|
||||||
|
%local jds jcnt;
|
||||||
|
data &jds(drop=_contextName _program);
|
||||||
|
set &inds(where=(flow_id=&&flow&fid));
|
||||||
|
if _contextName='' then _contextName="SAS Job Execution compute context";
|
||||||
|
call symputx(cats('job',_n_),_program,'l');
|
||||||
|
call symputx(cats('context',_n_),_contextName,'l');
|
||||||
|
call symputx('jcnt',_n_,'l');
|
||||||
|
run;
|
||||||
|
%put exporting job variables in json format;
|
||||||
|
%do jid=1 %to &jcnt;
|
||||||
|
data &jjson;
|
||||||
|
set &jds;
|
||||||
|
if _n_=&jid then do;
|
||||||
|
output;
|
||||||
|
stop;
|
||||||
|
end;
|
||||||
|
run;
|
||||||
|
proc json out=&jfref;
|
||||||
|
export &jjson / nosastags fmtnumeric;
|
||||||
|
run;
|
||||||
|
data _null_;
|
||||||
|
infile &jfref lrecl=32767;
|
||||||
|
input;
|
||||||
|
call symputx("jparams&jid",substr(_infile_,3,length(_infile_)-4));
|
||||||
|
run;
|
||||||
|
%local joburi&jid;
|
||||||
|
%let joburi&jid=0; /* used in next loop */
|
||||||
|
%end;
|
||||||
|
%local concurrency;
|
||||||
|
%let concurrency=0;
|
||||||
|
proc sql; drop table &jdsrunning;
|
||||||
|
%do jid=1 %to &jcnt;
|
||||||
|
/**
|
||||||
|
* now we can execute the jobs up to the maxconcurrency setting
|
||||||
|
*/
|
||||||
|
%if "&&job&jid" ne "0" %then %do; /* this var is zero if job finished */
|
||||||
|
%if "&&joburi&jid"="0" and &concurrency<&maxconcurrency %then %do;
|
||||||
|
/* job has not been triggered and we have free slots */
|
||||||
|
%local jobname jobpath;
|
||||||
|
%let jobname=%scan(&&job&jid,-1,/);
|
||||||
|
%let jobpath=%substr(&&job&jid,1,%length(&&job&jid)-%length(&jobname)-1);
|
||||||
|
%put executing &jobpath/&jobname with paramstring &&jparams&jid;
|
||||||
|
%mv_jobexecute(path=&jobpath
|
||||||
|
,name=&jobname
|
||||||
|
,paramstring=%superq(jparams&jid)
|
||||||
|
,outds=&jdsapp
|
||||||
|
)
|
||||||
|
proc append base=&jdsrunning
|
||||||
|
data=&jdsapp(where=(method='GET' and rel='state'));
|
||||||
|
run;
|
||||||
|
%let concurrency=%eval(&concurrency+1);
|
||||||
|
%let joburi&jid=1;
|
||||||
|
%end;
|
||||||
|
%end;
|
||||||
|
%if &jid=&jcnt %then %do;
|
||||||
|
/* we are at the end of the loop - time to see which jobs have finished */
|
||||||
|
%mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor)
|
||||||
|
%end;
|
||||||
|
%end;
|
||||||
|
/* back up and execute the next flow */
|
||||||
|
%end;
|
||||||
|
|
||||||
|
|
||||||
%inc "&fpath3..lua";
|
|
||||||
/* export to desired destination */
|
|
||||||
data _null_;
|
|
||||||
%if &outref=0 %then %do;
|
|
||||||
file "&outfile" lrecl=32767;
|
|
||||||
%end;
|
|
||||||
%else %do;
|
|
||||||
file &outref;
|
|
||||||
%end;
|
|
||||||
infile &fname2;
|
|
||||||
input;
|
|
||||||
put _infile_;
|
|
||||||
run;
|
|
||||||
filename &fname1 clear;
|
|
||||||
filename &fname2 clear;
|
|
||||||
%mend;
|
%mend;
|
||||||
/**
|
/**
|
||||||
@file
|
@file
|
||||||
@brief Takes a dataset of running jobs and waits for them to complete
|
@brief Takes a dataset of running jobs and waits for ANY or ALL of them to complete
|
||||||
@details Will poll `/jobs/{jobId}/state` at set intervals until they are all
|
@details Will poll `/jobs/{jobId}/state` at set intervals until ANY or ALL
|
||||||
completed. Completion is determined by reference to the returned _state_, as
|
jobs are completed. Completion is determined by reference to the returned
|
||||||
per the following table:
|
_state_, as per the following table:
|
||||||
|
|
||||||
| state | Wait? | Notes|
|
| state | Wait? | Notes|
|
||||||
|-----------|-------|------|
|
|-----------|-------|------|
|
||||||
@@ -13184,8 +13255,7 @@ filename &fname2 clear;
|
|||||||
where method='GET' and rel='state';
|
where method='GET' and rel='state';
|
||||||
run;
|
run;
|
||||||
|
|
||||||
%mv_jobwaitfor(inds=work.jobs,outds=work.jobstates)
|
%mv_jobwaitfor(ALL,inds=work.jobs,outds=work.jobstates)
|
||||||
|
|
||||||
|
|
||||||
Delete the job:
|
Delete the job:
|
||||||
|
|
||||||
@@ -13200,6 +13270,8 @@ filename &fname2 clear;
|
|||||||
a SASStudioV session else authorization_code. Default option.
|
a SASStudioV session else authorization_code. Default option.
|
||||||
- sas_services - will use oauth_bearer=sas_services
|
- sas_services - will use oauth_bearer=sas_services
|
||||||
|
|
||||||
|
@param [in] action=Either ALL (to wait for every job) or ANY (if one job
|
||||||
|
completes, processing will continue). Default=ALL.
|
||||||
@param [in] inds= The input dataset containing the list of job uris, in the
|
@param [in] inds= The input dataset containing the list of job uris, in the
|
||||||
following format: `/jobExecution/jobs/&JOBID./state` and the corresponding
|
following format: `/jobExecution/jobs/&JOBID./state` and the corresponding
|
||||||
job name. The uri should be in a `uri` variable, and the job path/name
|
job name. The uri should be in a `uri` variable, and the job path/name
|
||||||
@@ -13220,8 +13292,8 @@ filename &fname2 clear;
|
|||||||
|
|
||||||
**/
|
**/
|
||||||
|
|
||||||
%macro mv_jobwaitfor(
|
%macro mv_jobwaitfor(action
|
||||||
access_token_var=ACCESS_TOKEN
|
,access_token_var=ACCESS_TOKEN
|
||||||
,grant_type=sas_services
|
,grant_type=sas_services
|
||||||
,inds=0
|
,inds=0
|
||||||
,outds=work.mv_jobwaitfor
|
,outds=work.mv_jobwaitfor
|
||||||
@@ -13272,6 +13344,11 @@ data _null_;
|
|||||||
if last then call symputx('uricnt',_n_,'l');
|
if last then call symputx('uricnt',_n_,'l');
|
||||||
run;
|
run;
|
||||||
|
|
||||||
|
%local runcnt;
|
||||||
|
%if &action=ALL %then %let runcnt=&uricnt;
|
||||||
|
%else %if &action=ANY %then %let runcnt=1;
|
||||||
|
%else %let runcnt=&uricnt;
|
||||||
|
|
||||||
%local fname0 ;
|
%local fname0 ;
|
||||||
%let fname0=%mf_getuniquefileref();
|
%let fname0=%mf_getuniquefileref();
|
||||||
|
|
||||||
@@ -13311,7 +13388,7 @@ run;
|
|||||||
uri="&&joburi&i",
|
uri="&&joburi&i",
|
||||||
state="&status",
|
state="&status",
|
||||||
timestamp=datetime();
|
timestamp=datetime();
|
||||||
%let joburi&i=0;
|
%let joburi&i=0; /* do not re-check */
|
||||||
%end;
|
%end;
|
||||||
%else %if &status=idle or &status=pending or &status=running %then %do;
|
%else %if &status=idle or &status=pending or &status=running %then %do;
|
||||||
data _null_;
|
data _null_;
|
||||||
@@ -13329,7 +13406,7 @@ run;
|
|||||||
%let goback=0;
|
%let goback=0;
|
||||||
proc sql noprint;
|
proc sql noprint;
|
||||||
select count(*) into:goback from &outds;
|
select count(*) into:goback from &outds;
|
||||||
%if &goback ne &uricnt %then %let i=0;
|
%if &goback lt &runcnt %then %let i=0;
|
||||||
%end;
|
%end;
|
||||||
%end;
|
%end;
|
||||||
|
|
||||||
|
|||||||
@@ -1,9 +1,9 @@
|
|||||||
/**
|
/**
|
||||||
@file
|
@file
|
||||||
@brief Takes a dataset of running jobs and waits for them to complete
|
@brief Takes a dataset of running jobs and waits for ANY or ALL of them to complete
|
||||||
@details Will poll `/jobs/{jobId}/state` at set intervals until they are all
|
@details Will poll `/jobs/{jobId}/state` at set intervals until ANY or ALL
|
||||||
completed. Completion is determined by reference to the returned _state_, as
|
jobs are completed. Completion is determined by reference to the returned
|
||||||
per the following table:
|
_state_, as per the following table:
|
||||||
|
|
||||||
| state | Wait? | Notes|
|
| state | Wait? | Notes|
|
||||||
|-----------|-------|------|
|
|-----------|-------|------|
|
||||||
@@ -49,8 +49,7 @@
|
|||||||
where method='GET' and rel='state';
|
where method='GET' and rel='state';
|
||||||
run;
|
run;
|
||||||
|
|
||||||
%mv_jobwaitfor(inds=work.jobs,outds=work.jobstates)
|
%mv_jobwaitfor(ALL,inds=work.jobs,outds=work.jobstates)
|
||||||
|
|
||||||
|
|
||||||
Delete the job:
|
Delete the job:
|
||||||
|
|
||||||
@@ -65,6 +64,8 @@
|
|||||||
a SASStudioV session else authorization_code. Default option.
|
a SASStudioV session else authorization_code. Default option.
|
||||||
- sas_services - will use oauth_bearer=sas_services
|
- sas_services - will use oauth_bearer=sas_services
|
||||||
|
|
||||||
|
@param [in] action=Either ALL (to wait for every job) or ANY (if one job
|
||||||
|
completes, processing will continue). Default=ALL.
|
||||||
@param [in] inds= The input dataset containing the list of job uris, in the
|
@param [in] inds= The input dataset containing the list of job uris, in the
|
||||||
following format: `/jobExecution/jobs/&JOBID./state` and the corresponding
|
following format: `/jobExecution/jobs/&JOBID./state` and the corresponding
|
||||||
job name. The uri should be in a `uri` variable, and the job path/name
|
job name. The uri should be in a `uri` variable, and the job path/name
|
||||||
@@ -85,8 +86,8 @@
|
|||||||
|
|
||||||
**/
|
**/
|
||||||
|
|
||||||
%macro mv_jobwaitfor(
|
%macro mv_jobwaitfor(action
|
||||||
access_token_var=ACCESS_TOKEN
|
,access_token_var=ACCESS_TOKEN
|
||||||
,grant_type=sas_services
|
,grant_type=sas_services
|
||||||
,inds=0
|
,inds=0
|
||||||
,outds=work.mv_jobwaitfor
|
,outds=work.mv_jobwaitfor
|
||||||
@@ -137,6 +138,11 @@ data _null_;
|
|||||||
if last then call symputx('uricnt',_n_,'l');
|
if last then call symputx('uricnt',_n_,'l');
|
||||||
run;
|
run;
|
||||||
|
|
||||||
|
%local runcnt;
|
||||||
|
%if &action=ALL %then %let runcnt=&uricnt;
|
||||||
|
%else %if &action=ANY %then %let runcnt=1;
|
||||||
|
%else %let runcnt=&uricnt;
|
||||||
|
|
||||||
%local fname0 ;
|
%local fname0 ;
|
||||||
%let fname0=%mf_getuniquefileref();
|
%let fname0=%mf_getuniquefileref();
|
||||||
|
|
||||||
@@ -176,7 +182,7 @@ run;
|
|||||||
uri="&&joburi&i",
|
uri="&&joburi&i",
|
||||||
state="&status",
|
state="&status",
|
||||||
timestamp=datetime();
|
timestamp=datetime();
|
||||||
%let joburi&i=0;
|
%let joburi&i=0; /* do not re-check */
|
||||||
%end;
|
%end;
|
||||||
%else %if &status=idle or &status=pending or &status=running %then %do;
|
%else %if &status=idle or &status=pending or &status=running %then %do;
|
||||||
data _null_;
|
data _null_;
|
||||||
@@ -194,7 +200,7 @@ run;
|
|||||||
%let goback=0;
|
%let goback=0;
|
||||||
proc sql noprint;
|
proc sql noprint;
|
||||||
select count(*) into:goback from &outds;
|
select count(*) into:goback from &outds;
|
||||||
%if &goback ne &uricnt %then %let i=0;
|
%if &goback lt &runcnt %then %let i=0;
|
||||||
%end;
|
%end;
|
||||||
%end;
|
%end;
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user