diff --git a/tests/viya/mv_jobflow.test.1.sas b/tests/viya/mv_jobflow.test.1.sas new file mode 100644 index 0000000..56df6a3 --- /dev/null +++ b/tests/viya/mv_jobflow.test.1.sas @@ -0,0 +1,78 @@ +/** + + @file + @brief Testing mv_jobflow macro + @details One of the remote jobs aborts with syscc>0 - test to + make sure this comes back to the calling session +​ +

SAS Macros

+ @li mp_assert.sas + @li mv_createjob.sas + @li mv_jobflow.sas + +**/ + +/** + * Test Case 1 + */ + +filename testprog temp; +data _null_; + file testprog; + put '%put this is job: &_program;' + / '%put this was run in flow &flow_id;' + / 'data ;' + / ' rval=rand("uniform");' + / ' rand=rval*¯ovar1;' + / ' do x=1 to rand;' + / ' y=rand*¯ovar2;' + / ' if (rval>0.50) then abort;' + / ' else output;' + / ' end;' + / 'run;' + ; +run; + +%mv_createjob(path=/Public/temp,name=demo1,code=testprog) +%mv_createjob(path=/Public/temp,name=demo2,code=testprog) + +data work.inputjobs; + _contextName='SAS Job Execution compute context'; + do flow_id=1 to 2; + do i=1 to 4; + _program='/Public/temp/demo1'; + macrovar1=10*i; + macrovar2=4*i; + output; + i+1; + _program='/Public/temp/demo2'; + macrovar1=40*i; + macrovar2=44*i; + output; + end; + end; +run; + +* Trigger the flow ; + +%put NOTE: &=syscc; + +%mv_jobflow(inds=work.inputjobs + ,maxconcurrency=2 + ,outds=work.results + ,outref=myjoblog + ,raise_err=1 + ,mdebug=1 +) + +%put NOTE: &=syscc; + +data _null_; + infile myjoblog; + input; put _infile_; +run; + +%mp_assert( + iftrue=(&syscc ne 0), + desc=Check that non zero return code is returned if called job fails +) diff --git a/tests/viya/mv_jobflow.test.2.sas b/tests/viya/mv_jobflow.test.2.sas new file mode 100644 index 0000000..e94fe98 --- /dev/null +++ b/tests/viya/mv_jobflow.test.2.sas @@ -0,0 +1,74 @@ +/** + @file + @brief Testing mv_jobflow macro + @details All jobs complete successfully with syscc = 0 - test to + make sure this comes back to the calling session +​ +

SAS Macros

+ @li mp_assert.sas + @li mv_createjob.sas + @li mv_jobflow.sas +**/ + +/** + * Test Case 1 + */ +filename testprog temp; +data _null_; + file testprog; + put '%put this is job: &_program;' + / '%put this was run in flow &flow_id;' + / 'data ;' + / ' rval=rand("uniform");' + / ' rand=rval*¯ovar1;' + / ' do x=1 to rand;' + / ' y=rand*¯ovar2;' + / ' output;' + / ' end;' + / 'run;' + ; +run; + +%mv_createjob(path=/Public/temp,name=demo1,code=testprog) +%mv_createjob(path=/Public/temp,name=demo2,code=testprog) + +data work.inputjobs; + _contextName='SAS Job Execution compute context'; + do flow_id=1 to 2; + do i=1 to 4; + _program='/Public/temp/demo1'; + macrovar1=10*i; + macrovar2=4*i; + output; + i+1; + _program='/Public/temp/demo2'; + macrovar1=40*i; + macrovar2=44*i; + output; + end; + end; +run; + +* Trigger the flow ; + +%put NOTE: &=syscc; + +%mv_jobflow(inds=work.inputjobs + ,maxconcurrency=2 + ,outds=work.results + ,outref=myjoblog + ,raise_err=1 + ,mdebug=1 +) + +%put NOTE: &=syscc; + +data _null_; + infile myjoblog; + input; put _infile_; +run; + +%mp_assert( + iftrue=(&syscc eq 0), + desc=Check that a zero return code is returned if no called job fails +) diff --git a/viya/mv_jobflow.sas b/viya/mv_jobflow.sas index fab42bb..46237e8 100644 --- a/viya/mv_jobflow.sas +++ b/viya/mv_jobflow.sas @@ -97,8 +97,8 @@ run; - @param [in] access_token_var= The global macro variable to contain the access - token + @param [in] access_token_var= The global macro variable to contain the + access token @param [in] grant_type= valid values: @li password @li authorization_code @@ -237,123 +237,143 @@ data;run;%let jdswaitfor=&syslast; /* start loop */ %do fid=1 %to &flowcnt; - %put preparing job attributes for flow &&flow&fid; - %local jds jcnt; - data &jds(drop=_contextName _program); - set &inds(where=(flow_id=&&flow&fid)); - if _contextName='' then _contextName="SAS Job Execution compute context"; - call symputx(cats('job',_n_),_program,'l'); - call symputx(cats('context',_n_),_contextName,'l'); - call symputx('jcnt',_n_,'l'); - run; - %put exporting job variables in json format; - %do jid=1 %to &jcnt; - data &jjson; - set &jds; - if _n_=&jid then do; - output; - stop; - end; - run; - proc json out=&jfref; - export &jjson / nosastags fmtnumeric; - run; - data _null_; - infile &jfref lrecl=32767; - input; - jparams='jparams'!!left(symget('jid')); - call symputx(jparams,substr(_infile_,3,length(_infile_)-4)); - run; - %local jobuid&jid; - %let jobuid&jid=0; /* used in next loop */ - %end; - %local concurrency completed; - %let concurrency=0; - %let completed=0; - proc sql; drop table &jdsrunning; - %do jid=1 %to &jcnt; - /** - * now we can execute the jobs up to the maxconcurrency setting - */ - %if "&&job&jid" ne "0" %then %do; /* this var is zero if job finished */ - /* check to see if the job finished in the previous round */ - %if %sysfunc(exist(&outds))=1 %then %do; - %local jobcheck; %let jobcheck=0; - proc sql noprint; - select count(*) into: jobcheck - from &outds where uuid="&&jobuid&jid"; - %if &jobcheck>0 %then %do; - %put &&job&jid in flow &fid with uid &&jobuid&jid completed!; - %let job&jid=0; + %if not ( &raise_err and &syscc ) %then %do; + + %put preparing job attributes for flow &&flow&fid; + %local jds jcnt; + data &jds(drop=_contextName _program); + set &inds(where=(flow_id=&&flow&fid)); + if _contextName='' then _contextName="SAS Job Execution compute context"; + call symputx(cats('job',_n_),_program,'l'); + call symputx(cats('context',_n_),_contextName,'l'); + call symputx('jcnt',_n_,'l'); + run; + %put exporting job variables in json format; + %do jid=1 %to &jcnt; + data &jjson; + set &jds; + if _n_=&jid then do; + output; + stop; + end; + run; + proc json out=&jfref; + export &jjson / nosastags fmtnumeric; + run; + data _null_; + infile &jfref lrecl=32767; + input; + jparams='jparams'!!left(symget('jid')); + call symputx(jparams,substr(_infile_,3,length(_infile_)-4)); + run; + %local jobuid&jid; + %let jobuid&jid=0; /* used in next loop */ + %end; + %local concurrency completed; + %let concurrency=0; + %let completed=0; + proc sql; drop table &jdsrunning; + %do jid=1 %to &jcnt; + /** + * now we can execute the jobs up to the maxconcurrency setting + */ + %if "&&job&jid" ne "0" %then %do; /* this var is zero if job finished */ + + /* check to see if the job finished in the previous round */ + %if %sysfunc(exist(&outds))=1 %then %do; + %local jobcheck; %let jobcheck=0; + proc sql noprint; + select count(*) into: jobcheck + from &outds where uuid="&&jobuid&jid"; + %if &jobcheck>0 %then %do; + %put &&job&jid in flow &fid with uid &&jobuid&jid completed!; + %let job&jid=0; + %end; + %end; + + /* check if job was triggered and, if + so, if we have enough slots to run? */ + %if ("&&jobuid&jid"="0") and (&concurrency<&maxconcurrency) %then %do; + + /* But only start if no issues detected so far */ + %if not ( &raise_err and &syscc ) %then %do; + + %local jobname jobpath; + %let jobname=%scan(&&job&jid,-1,/); + %let jobpath= + %substr(&&job&jid,1,%length(&&job&jid)-%length(&jobname)-1); + + %put executing &jobpath/&jobname with paramstring &&jparams&jid; + %mv_jobexecute(path=&jobpath + ,name=&jobname + ,paramstring=%superq(jparams&jid) + ,outds=&jdsapp + ) + data &jdsapp; + format jobparams $32767.; + set &jdsapp(where=(method='GET' and rel='state')); + jobparams=symget("jparams&jid"); + /* uri here has the /state suffix */ + uuid=scan(uri,-2,'/'); + call symputx("jobuid&jid",uuid,'l'); + run; + proc append base=&jdsrunning data=&jdsapp; + run; + %let concurrency=%eval(&concurrency+1); + /* sleep one second after every request to smooth the impact */ + data _null_; + call sleep(1,1); + run; + + %end; + %else %do; /* Job was skipped due to problems */ + + %put jobid &&job&jid in flow &fid skipped due to SYSCC (&syscc); + %let completed = %eval(&completed+1); + %let job&jid=0; /* Indicate job has finished */ + + %end; + %end; %end; + %if &jid=&jcnt %then %do; + /* we are at the end of the loop - check which jobs have finished */ + %mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor,outref=&outref + ,raise_err=&raise_err,mdebug=&mdebug) + %local done; + %let done=%mf_nobs(&jdswaitfor); + %if &done>0 %then %do; + %let completed=%eval(&completed+&done); + %let concurrency=%eval(&concurrency-&done); + data &jdsapp; + set &jdswaitfor; + flow_id=&&flow&fid; + uuid=scan(uri,-1,'/'); + run; + proc append base=&outds data=&jdsapp; + run; + %end; + proc sql; + delete from &jdsrunning + where uuid in (select uuid from &outds + where state in ('canceled','completed','failed') + ); - /* check if job was triggered and if so, if we have enough slots to run */ - %if "&&jobuid&jid"="0" and &concurrency<&maxconcurrency %then %do; - %local jobname jobpath; - %let jobname=%scan(&&job&jid,-1,/); - %let jobpath= - %substr(&&job&jid,1,%length(&&job&jid)-%length(&jobname)-1); - %put executing &jobpath/&jobname with paramstring &&jparams&jid; - %mv_jobexecute(path=&jobpath - ,name=&jobname - ,paramstring=%superq(jparams&jid) - ,outds=&jdsapp - ,mdebug=&mdebug - ) - data &jdsapp; - format jobparams $32767.; - set &jdsapp(where=(method='GET' and rel='state')); - jobparams=symget("jparams&jid"); - /* uri here has the /state suffix */ - uuid=scan(uri,-2,'/'); - call symputx("jobuid&jid",uuid,'l'); - run; - proc append base=&jdsrunning data=&jdsapp; - run; - %let concurrency=%eval(&concurrency+1); - /* sleep one second after every request to smooth the impact */ - data _null_; - call sleep(1,1); - run; + /* loop again if jobs are left */ + %if &completed < &jcnt %then %do; + %let jid=0; + %put looping flow &fid again; + %put &completed of &jcnt jobs completed, &concurrency jobs running; + %end; %end; %end; - %if &jid=&jcnt %then %do; - /* we are at the end of the loop - time to see which jobs have finished */ - %mv_jobwaitfor(ANY - ,inds=&jdsrunning - ,outds=&jdswaitfor - ,outref=&outref - ,raise_err=&raise_err - ,mdebug=&mdebug - ) - %local done; - %let done=%mf_nobs(&jdswaitfor); - %if &done>0 %then %do; - %let completed=%eval(&completed+&done); - %let concurrency=%eval(&concurrency-&done); - data &jdsapp; - set &jdswaitfor; - flow_id=&&flow&fid; - uuid=scan(uri,-1,'/'); - run; - proc append base=&outds data=&jdsapp; - run; - %end; - proc sql; - delete from &jdsrunning - where uuid in (select uuid from &outds - where state in ('canceled','completed','failed') - ); - /* loop again if jobs are left */ - %if &completed < &jcnt %then %do; - %let jid=0; - %put looping flow &fid again - &completed of &jcnt jobs completed, - &concurrency jobs running; - %end; - %end; + %end; + %else %do; + + %put Flow &&flow&fid skipped due to SYSCC (&syscc); + %end; /* back up and execute the next flow */ %end;