1
0
mirror of https://github.com/sasjs/core.git synced 2025-12-31 14:40:05 +00:00

feat: early exit, with syscc, when submitted jobs fail within a flow

This commit is contained in:
Trevor Moody
2021-05-12 12:06:02 +01:00
parent 6fc8408988
commit 90d69af7ee

View File

@@ -13,7 +13,7 @@
@li FLOW_ID - Numeric value, provides sequential ordering capability. Is @li FLOW_ID - Numeric value, provides sequential ordering capability. Is
optional, will default to 0 if not provided. optional, will default to 0 if not provided.
@li _CONTEXTNAME - Dictates which context should be used to run the job. If @li _CONTEXTNAME - Dictates which context should be used to run the job. If
blank (or not provided), will default to `SAS Job Execution compute context`. blank (or not provided), it defaults to `SAS Job Execution compute context`.
Any additional variables provided in this table are converted into macro Any additional variables provided in this table are converted into macro
variables and passed into the relevant job. variables and passed into the relevant job.
@@ -97,17 +97,21 @@
run; run;
@param [in] access_token_var= The global macro variable to contain the access token @param [in] access_token_var= The global macro variable to contain the
access token
@param [in] grant_type= valid values: @param [in] grant_type= valid values:
@li password @li password
@li authorization_code @li authorization_code
@li detect - will check if access_token exists, if not will use sas_services if @li detect - will check if access_token exists, if not will use
a SASStudioV session else authorization_code. Default option. sas_services if a SASStudioV session else authorization_code.
Default option.
@li sas_services - will use oauth_bearer=sas_services @li sas_services - will use oauth_bearer=sas_services
@param [in] inds= The input dataset containing a list of jobs and parameters @param [in] inds= The input dataset containing a list of jobs and
@param [in] maxconcurrency= The max number of parallel jobs to run. Default=8. parameters
@param [in] raise_err=0 Set to 1 to raise SYSCC when a job does not complete @param [in] maxconcurrency= The max number of parallel jobs to run.
succcessfully Default=8.
@param [in] raise_err=0 Set to 1 to raise SYSCC when a job does not
complete succcessfully
@param [in] mdebug= set to 1 to enable DEBUG messages @param [in] mdebug= set to 1 to enable DEBUG messages
@param [out] outds= The output dataset containing the results @param [out] outds= The output dataset containing the results
@param [out] outref= The output fileref to which to append the log file(s). @param [out] outref= The output fileref to which to append the log file(s).
@@ -228,115 +232,143 @@ data;run;%let jdswaitfor=&syslast;
/* start loop */ /* start loop */
%do fid=1 %to &flowcnt; %do fid=1 %to &flowcnt;
%put preparing job attributes for flow &&flow&fid;
%local jds jcnt;
data &jds(drop=_contextName _program);
set &inds(where=(flow_id=&&flow&fid));
if _contextName='' then _contextName="SAS Job Execution compute context";
call symputx(cats('job',_n_),_program,'l');
call symputx(cats('context',_n_),_contextName,'l');
call symputx('jcnt',_n_,'l');
run;
%put exporting job variables in json format;
%do jid=1 %to &jcnt;
data &jjson;
set &jds;
if _n_=&jid then do;
output;
stop;
end;
run;
proc json out=&jfref;
export &jjson / nosastags fmtnumeric;
run;
data _null_;
infile &jfref lrecl=32767;
input;
jparams='jparams'!!left(symget('jid'));
call symputx(jparams,substr(_infile_,3,length(_infile_)-4));
run;
%local jobuid&jid;
%let jobuid&jid=0; /* used in next loop */
%end;
%local concurrency completed;
%let concurrency=0;
%let completed=0;
proc sql; drop table &jdsrunning;
%do jid=1 %to &jcnt;
/**
* now we can execute the jobs up to the maxconcurrency setting
*/
%if "&&job&jid" ne "0" %then %do; /* this var is zero if job finished */
/* check to see if the job finished in the previous round */ %if not ( &raise_err and &syscc ) %then %do;
%if %sysfunc(exist(&outds))=1 %then %do;
%local jobcheck; %let jobcheck=0; %put preparing job attributes for flow &&flow&fid;
proc sql noprint; %local jds jcnt;
select count(*) into: jobcheck data &jds(drop=_contextName _program);
from &outds where uuid="&&jobuid&jid"; set &inds(where=(flow_id=&&flow&fid));
%if &jobcheck>0 %then %do; if _contextName='' then _contextName="SAS Job Execution compute context";
%put &&job&jid in flow &fid with uid &&jobuid&jid completed!; call symputx(cats('job',_n_),_program,'l');
%let job&jid=0; call symputx(cats('context',_n_),_contextName,'l');
call symputx('jcnt',_n_,'l');
run;
%put exporting job variables in json format;
%do jid=1 %to &jcnt;
data &jjson;
set &jds;
if _n_=&jid then do;
output;
stop;
end;
run;
proc json out=&jfref;
export &jjson / nosastags fmtnumeric;
run;
data _null_;
infile &jfref lrecl=32767;
input;
jparams='jparams'!!left(symget('jid'));
call symputx(jparams,substr(_infile_,3,length(_infile_)-4));
run;
%local jobuid&jid;
%let jobuid&jid=0; /* used in next loop */
%end;
%local concurrency completed;
%let concurrency=0;
%let completed=0;
proc sql; drop table &jdsrunning;
%do jid=1 %to &jcnt;
/**
* now we can execute the jobs up to the maxconcurrency setting
*/
%if "&&job&jid" ne "0" %then %do; /* this var is zero if job finished */
/* check to see if the job finished in the previous round */
%if %sysfunc(exist(&outds))=1 %then %do;
%local jobcheck; %let jobcheck=0;
proc sql noprint;
select count(*) into: jobcheck
from &outds where uuid="&&jobuid&jid";
%if &jobcheck>0 %then %do;
%put &&job&jid in flow &fid with uid &&jobuid&jid completed!;
%let job&jid=0;
%end;
%end;
/* check if job was triggered and, if
so, if we have enough slots to run? */
%if ("&&jobuid&jid"="0") and (&concurrency<&maxconcurrency) %then %do;
/* But only start if no issues detected so far */
%if not ( &raise_err and &syscc ) %then %do;
%local jobname jobpath;
%let jobname=%scan(&&job&jid,-1,/);
%let jobpath=
%substr(&&job&jid,1,%length(&&job&jid)-%length(&jobname)-1);
%put executing &jobpath/&jobname with paramstring &&jparams&jid;
%mv_jobexecute(path=&jobpath
,name=&jobname
,paramstring=%superq(jparams&jid)
,outds=&jdsapp
)
data &jdsapp;
format jobparams $32767.;
set &jdsapp(where=(method='GET' and rel='state'));
jobparams=symget("jparams&jid");
/* uri here has the /state suffix */
uuid=scan(uri,-2,'/');
call symputx("jobuid&jid",uuid,'l');
run;
proc append base=&jdsrunning data=&jdsapp;
run;
%let concurrency=%eval(&concurrency+1);
/* sleep one second after every request to smooth the impact */
data _null_;
call sleep(1,1);
run;
%end;
%else %do; /* Job was skipped due to problems */
%put jobid &&job&jid in flow &fid skipped due to SYSCC (&syscc);
%let completed = %eval(&completed+1);
%let job&jid=0; /* Indicate job has finished */
%end;
%end; %end;
%end; %end;
%if &jid=&jcnt %then %do;
/* we are at the end of the loop - check which jobs have finished */
%mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor,outref=&outref
,raise_err=&raise_err,mdebug=&mdebug)
%local done;
%let done=%mf_nobs(&jdswaitfor);
%if &done>0 %then %do;
%let completed=%eval(&completed+&done);
%let concurrency=%eval(&concurrency-&done);
data &jdsapp;
set &jdswaitfor;
flow_id=&&flow&fid;
uuid=scan(uri,-1,'/');
run;
proc append base=&outds data=&jdsapp;
run;
%end;
proc sql;
delete from &jdsrunning
where uuid in (select uuid from &outds
where state in ('canceled','completed','failed')
);
/* check if job was triggered and if so, if we have enough slots to run */ /* loop again if jobs are left */
%if "&&jobuid&jid"="0" and &concurrency<&maxconcurrency %then %do; %if &completed < &jcnt %then %do;
%local jobname jobpath; %let jid=0;
%let jobname=%scan(&&job&jid,-1,/); %put looping flow &fid again;
%let jobpath=%substr(&&job&jid,1,%length(&&job&jid)-%length(&jobname)-1); %put &completed of &jcnt jobs completed, &concurrency jobs running;
%put executing &jobpath/&jobname with paramstring &&jparams&jid; %end;
%mv_jobexecute(path=&jobpath
,name=&jobname
,paramstring=%superq(jparams&jid)
,outds=&jdsapp
)
data &jdsapp;
format jobparams $32767.;
set &jdsapp(where=(method='GET' and rel='state'));
jobparams=symget("jparams&jid");
/* uri here has the /state suffix */
uuid=scan(uri,-2,'/');
call symputx("jobuid&jid",uuid,'l');
run;
proc append base=&jdsrunning data=&jdsapp;
run;
%let concurrency=%eval(&concurrency+1);
/* sleep one second after every request to smooth the impact */
data _null_;
call sleep(1,1);
run;
%end; %end;
%end; %end;
%if &jid=&jcnt %then %do;
/* we are at the end of the loop - time to see which jobs have finished */
%mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor,outref=&outref
,raise_err=&raise_err)
%local done;
%let done=%mf_nobs(&jdswaitfor);
%if &done>0 %then %do;
%let completed=%eval(&completed+&done);
%let concurrency=%eval(&concurrency-&done);
data &jdsapp;
set &jdswaitfor;
flow_id=&&flow&fid;
uuid=scan(uri,-1,'/');
run;
proc append base=&outds data=&jdsapp;
run;
%end;
proc sql;
delete from &jdsrunning
where uuid in (select uuid from &outds
where state in ('canceled','completed','failed')
);
/* loop again if jobs are left */ %end;
%if &completed < &jcnt %then %do; %else %do;
%let jid=0;
%put looping flow &fid again - &completed of &jcnt jobs completed, &concurrency jobs running; %put Flow &&flow&fid skipped due to SYSCC (&syscc);
%end;
%end;
%end; %end;
/* back up and execute the next flow */ /* back up and execute the next flow */
%end; %end;
@@ -345,4 +377,4 @@ data;run;%let jdswaitfor=&syslast;
%put _local_; %put _local_;
%end; %end;
%mend; %mend mv_jobflow;