From 9783edd0e31e8f0861f9aaa16ca5eeb88eb35041 Mon Sep 17 00:00:00 2001 From: Allan Bowe Date: Fri, 29 Jan 2021 12:56:12 +0100 Subject: [PATCH] feat: adding outref option to mv_jobflow so that logs of submitted jobs can be captured. Also making the context name and flow id optional in the input table, for ease of use. --- all.sas | 47 ++++++++++++++++++++++++++++++++++----------- viya/mv_jobflow.sas | 47 ++++++++++++++++++++++++++++++++++----------- 2 files changed, 72 insertions(+), 22 deletions(-) diff --git a/all.sas b/all.sas index 663e94e..b4aabc3 100644 --- a/all.sas +++ b/all.sas @@ -13378,18 +13378,19 @@ libname &libref; ## Input table (minimum variables needed) - @li FLOW_ID - Numeric value, provides sequential ordering capability - @li _CONTEXTNAME - Dictates which context should be used to run the job. If - blank, will default to `SAS Job Execution compute context`. @li _PROGRAM - Provides the path to the job itself + @li FLOW_ID - Numeric value, provides sequential ordering capability. Is + optional, will default to 0 if not provided. + @li _CONTEXTNAME - Dictates which context should be used to run the job. If + blank (or not provided), will default to `SAS Job Execution compute context`. Any additional variables provided in this table are converted into macro variables and passed into the relevant job. - | FLOW_ID| _CONTEXTNAME |_PROGRAM| + |_PROGRAM| FLOW_ID (optional)| _CONTEXTNAME (optional) | |---|---|---| - |0|SAS Job Execution compute context|/Public/jobs/somejob1| - |0|SAS Job Execution compute context|/Public/jobs/somejob2| + |/Public/jobs/somejob1|0|SAS Job Execution compute context| + |/Public/jobs/somejob2|0|SAS Job Execution compute context| ## Output table (minimum variables produced) @@ -13450,7 +13451,16 @@ libname &libref; Trigger the flow - %mv_jobflow(inds=work.inputjobs,outds=work.results,maxconcurrency=4) + %mv_jobflow(inds=work.inputjobs + ,maxconcurrency=4 + ,outds=work.results + ,outref=myjoblog + ) + + data _null_; + infile myjoblog; + input; put _infile_; + run; @param [in] access_token_var= The global macro variable to contain the access token @@ -13463,6 +13473,7 @@ libname &libref; @param [in] inds= The input dataset containing a list of jobs and parameters @param [in] maxconcurrency= The max number of parallel jobs to run. Default=8. @param [out] outds= The output dataset containing the results + @param [out] outref= The output fileref to which to append the log file(s). @version VIYA V.03.05 @author Allan Bowe, source: https://github.com/sasjs/core @@ -13482,6 +13493,7 @@ libname &libref; ,maxconcurrency=8 ,access_token_var=ACCESS_TOKEN ,grant_type=sas_services + ,outref=0 ); %local oauth_bearer; %if &grant_type=detect %then %do; @@ -13504,16 +13516,29 @@ libname &libref; ,mac=&sysmacroname ,msg=%str(Input dataset was not provided) ) -%mp_abort(iftrue=(%mf_existVarList(&inds,_CONTEXTNAME FLOW_ID _PROGRAM)=0) +%mp_abort(iftrue=(%mf_existVarList(&inds,_PROGRAM)=0) ,mac=&sysmacroname - ,msg=%str(The following columns must exist on input dataset &inds: - _CONTEXTNAME FLOW_ID _PROGRAM) + ,msg=%str(The _PROGRAM column must exist on input dataset &inds) ) %mp_abort(iftrue=(&maxconcurrency<1) ,mac=&sysmacroname ,msg=%str(The maxconcurrency variable should be a positive integer) ) +/* set defaults if not provided */ +%if %mf_existVarList(&inds,_CONTEXTNAME FLOW_ID)=0 %then %do; + data &inds; + %if %mf_existvarList(&inds,_CONTEXTNAME)=0 %then %do; + length _CONTEXTNAME $128; + retain _CONTEXTNAME "SAS Job Execution compute context"; + %end; + %if %mf_existvarList(&inds,FLOW_ID)=0 %then %do; + retain FLOW_ID 0; + %end; + set &inds; + run; +%end; + %local missings; proc sql noprint; select count(*) into: missings @@ -13638,7 +13663,7 @@ data;run;%let jdswaitfor=&syslast; %end; %if &jid=&jcnt %then %do; /* we are at the end of the loop - time to see which jobs have finished */ - %mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor) + %mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor,outref=&outref) %local done; %let done=%mf_nobs(&jdswaitfor); %if &done>0 %then %do; diff --git a/viya/mv_jobflow.sas b/viya/mv_jobflow.sas index f283b7c..73c3f96 100644 --- a/viya/mv_jobflow.sas +++ b/viya/mv_jobflow.sas @@ -9,18 +9,19 @@ ## Input table (minimum variables needed) - @li FLOW_ID - Numeric value, provides sequential ordering capability - @li _CONTEXTNAME - Dictates which context should be used to run the job. If - blank, will default to `SAS Job Execution compute context`. @li _PROGRAM - Provides the path to the job itself + @li FLOW_ID - Numeric value, provides sequential ordering capability. Is + optional, will default to 0 if not provided. + @li _CONTEXTNAME - Dictates which context should be used to run the job. If + blank (or not provided), will default to `SAS Job Execution compute context`. Any additional variables provided in this table are converted into macro variables and passed into the relevant job. - | FLOW_ID| _CONTEXTNAME |_PROGRAM| + |_PROGRAM| FLOW_ID (optional)| _CONTEXTNAME (optional) | |---|---|---| - |0|SAS Job Execution compute context|/Public/jobs/somejob1| - |0|SAS Job Execution compute context|/Public/jobs/somejob2| + |/Public/jobs/somejob1|0|SAS Job Execution compute context| + |/Public/jobs/somejob2|0|SAS Job Execution compute context| ## Output table (minimum variables produced) @@ -81,7 +82,16 @@ Trigger the flow - %mv_jobflow(inds=work.inputjobs,outds=work.results,maxconcurrency=4) + %mv_jobflow(inds=work.inputjobs + ,maxconcurrency=4 + ,outds=work.results + ,outref=myjoblog + ) + + data _null_; + infile myjoblog; + input; put _infile_; + run; @param [in] access_token_var= The global macro variable to contain the access token @@ -94,6 +104,7 @@ @param [in] inds= The input dataset containing a list of jobs and parameters @param [in] maxconcurrency= The max number of parallel jobs to run. Default=8. @param [out] outds= The output dataset containing the results + @param [out] outref= The output fileref to which to append the log file(s). @version VIYA V.03.05 @author Allan Bowe, source: https://github.com/sasjs/core @@ -113,6 +124,7 @@ ,maxconcurrency=8 ,access_token_var=ACCESS_TOKEN ,grant_type=sas_services + ,outref=0 ); %local oauth_bearer; %if &grant_type=detect %then %do; @@ -135,16 +147,29 @@ ,mac=&sysmacroname ,msg=%str(Input dataset was not provided) ) -%mp_abort(iftrue=(%mf_existVarList(&inds,_CONTEXTNAME FLOW_ID _PROGRAM)=0) +%mp_abort(iftrue=(%mf_existVarList(&inds,_PROGRAM)=0) ,mac=&sysmacroname - ,msg=%str(The following columns must exist on input dataset &inds: - _CONTEXTNAME FLOW_ID _PROGRAM) + ,msg=%str(The _PROGRAM column must exist on input dataset &inds) ) %mp_abort(iftrue=(&maxconcurrency<1) ,mac=&sysmacroname ,msg=%str(The maxconcurrency variable should be a positive integer) ) +/* set defaults if not provided */ +%if %mf_existVarList(&inds,_CONTEXTNAME FLOW_ID)=0 %then %do; + data &inds; + %if %mf_existvarList(&inds,_CONTEXTNAME)=0 %then %do; + length _CONTEXTNAME $128; + retain _CONTEXTNAME "SAS Job Execution compute context"; + %end; + %if %mf_existvarList(&inds,FLOW_ID)=0 %then %do; + retain FLOW_ID 0; + %end; + set &inds; + run; +%end; + %local missings; proc sql noprint; select count(*) into: missings @@ -269,7 +294,7 @@ data;run;%let jdswaitfor=&syslast; %end; %if &jid=&jcnt %then %do; /* we are at the end of the loop - time to see which jobs have finished */ - %mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor) + %mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor,outref=&outref) %local done; %let done=%mf_nobs(&jdswaitfor); %if &done>0 %then %do;