From 94416028b7b46348d1c50cab8c291562683374a6 Mon Sep 17 00:00:00 2001
From: Allan Bowe
Date: Sat, 16 Jan 2021 19:08:38 +0200
Subject: [PATCH] fix: adding ACTION parameter to mv_jobwaitfor - can now wait for ANY or ALL jobs to finish

---
Reviewer notes (text between the "---" line and the diffstat is not part of
the commit message):

 * mv_jobwaitfor gains a leading positional ACTION parameter.  ANY sets
   runcnt=1; ALL, blank or any other value sets runcnt=&uricnt.  Polling
   repeats while the row count of &outds is lower than runcnt.
 * ACTION is compared through %upcase() so that any/all are accepted in
   lower or mixed case (a lower-case "any" would otherwise fall through to
   the ALL branch).
 * mv_jobflow: the maxconcurrency default changes from 0 to 8 and values
   below 1 now abort; the input dataset is checked for the FLOW_ID column
   and for missing FLOW_ID / _PROGRAM values.
 * mv_jobflow: jparams&jid is now written with explicit local scope ('l'),
   matching the other symputx calls in the macro.
 * NOTE(review): line breaks and indentation were restored from a
   whitespace-collapsed copy of this patch.  Hunk line counts agree with the
   hunk headers and the diffstat, but leading whitespace and the position of
   blank context lines should be confirmed against the repository.

 all.sas                | 161 ++++++++++++++++++++++++++++++-----------
 viya/mv_jobwaitfor.sas |  26 ++++---
 2 files changed, 135 insertions(+), 52 deletions(-)

diff --git a/all.sas b/all.sas
index 26b73f3..14c7598 100644
--- a/all.sas
+++ b/all.sas
@@ -12986,7 +12986,7 @@ libname &libref;
 
   | variable| description |
   |---|---|---|
-  |FLOW_ID| Provides the sequential ordering capability|
+  |FLOW_ID| Numeric value, provides sequential ordering capability|
   |_CONTEXTNAME|Dictates which context should be used to run the job. If blank, will default to `SAS Job Execution compute context`.|
   |_PROGRAM|Provides the path to the job itself|
 
@@ -13023,15 +13023,16 @@ libname &libref;
         _contextName='SAS Job Execution compute context';
         do flow_id=1 to 3;
           do job=1 to 10;
-            _program='/Public/temp/name/demo1';
+            _program='/Public/temp/demo1';
             macrovar1=10*job;
             macrovar2=4*job;
             output;
-            _program='/Public/temp/name/demo2';
+            _program='/Public/temp/demo2';
             macrovar1=40*job;
             macrovar2=44*job;
             output;
           end;
+          drop job;
         end;
       run;
 
@@ -13042,14 +13043,13 @@ libname &libref;
   @param [in] access_token_var= The global macro variable to contain the access
     token
   @param [in] grant_type= valid values:
-      * password
-      * authorization_code
-      * detect - will check if access_token exists, if not will use sas_services if
+      @li password
+      @li authorization_code
+      @li detect - will check if access_token exists, if not will use sas_services if
         a SASStudioV session else authorization_code. Default option.
-      * sas_services - will use oauth_bearer=sas_services
+      @li sas_services - will use oauth_bearer=sas_services
   @param [in] inds= The input dataset containing a list of jobs and parameters
-  @param [in] maxconcurrency= The max number of parallel jobs to run. If set to
-    0 (default) then there is no limit applied.
+  @param [in] maxconcurrency= The max number of parallel jobs to run. Default=8.
   @param [out] outds= The output dataset containing the results
 
   @version VIYA V.03.05
@@ -13064,7 +13064,7 @@ libname &libref;
 **/
 
 %macro mv_jobflow(inds=0,outds=work.mv_jobflow
-    ,maxconcurrency=0
+    ,maxconcurrency=8
     ,access_token_var=ACCESS_TOKEN
     ,grant_type=sas_services
   );
@@ -13089,10 +13089,24 @@ libname &libref;
   ,mac=&sysmacroname
   ,msg=%str(Input dataset was not provided)
 )
-%mp_abort(iftrue=(%mf_existVarList(&inds,_CONTEXTNAME FLOWID _PROGRAM)=0)
+%mp_abort(iftrue=(%mf_existVarList(&inds,_CONTEXTNAME FLOW_ID _PROGRAM)=0)
   ,mac=&sysmacroname
-  ,msg=%str(The following columns must exist on the input dataset(&inds):
-    _CONTEXTNAME FLOWID _PROGRAM)
+  ,msg=%str(The following columns must exist on input dataset &inds:
+    _CONTEXTNAME FLOW_ID _PROGRAM)
+)
+%mp_abort(iftrue=(&maxconcurrency<1)
+  ,mac=&sysmacroname
+  ,msg=%str(The maxconcurrency variable should be a positive integer)
+)
+
+%local missings;
+proc sql noprint;
+select count(*) into: missings
+  from &inds
+  where flow_id is null or _program is null;
+%mp_abort(iftrue=(&missings>0)
+  ,mac=&sysmacroname
+  ,msg=%str(input dataset contains &missings missing values for FLOW_ID or _PROGRAM)
 )
 
 options noquotelenmax;
@@ -13102,43 +13116,100 @@ options noquotelenmax;
 
 /* get flows */
 proc sort data=&inds;
-by flow_id;
+  by flow_id;
 run;
-
 data _null_;
-  set &inds keep=flow_id end=last;
+  set &inds (keep=flow_id) end=last;
   by flow_id;
   if last.flow_id then do;
     cnt+1;
-    call symputx(cats('flow',_n_),flow_id,'l');
+    call symputx(cats('flow',cnt),flow_id,'l');
   end;
   if last then call symputx('flowcnt',cnt,'l');
 run;
 
+/* prepare temporary datasets and frefs */
+%local fid jid jds jjson jdsapp jdsrunning jdswaitfor jfref;
+data;run;%let jds=&syslast;
+data;run;%let jjson=&syslast;
+data;run;%let jdsapp=&syslast;
+data;run;%let jdsrunning=&syslast;
+data;run;%let jdswaitfor=&syslast;
+%let jfref=%mf_getuniquefileref();
+
+/* start loop */
+%do fid=1 %to &flowcnt;
+  %put preparing job attributes for flow &&flow&fid;
+  %local jds jcnt;
+  data &jds(drop=_contextName _program);
+    set &inds(where=(flow_id=&&flow&fid));
+    if _contextName='' then _contextName="SAS Job Execution compute context";
+    call symputx(cats('job',_n_),_program,'l');
+    call symputx(cats('context',_n_),_contextName,'l');
+    call symputx('jcnt',_n_,'l');
+  run;
+  %put exporting job variables in json format;
+  %do jid=1 %to &jcnt;
+    data &jjson;
+      set &jds;
+      if _n_=&jid then do;
+        output;
+        stop;
+      end;
+    run;
+    proc json out=&jfref;
+      export &jjson / nosastags fmtnumeric;
+    run;
+    data _null_;
+      infile &jfref lrecl=32767;
+      input;
+      call symputx("jparams&jid",substr(_infile_,3,length(_infile_)-4),'l');
+    run;
+    %local joburi&jid;
+    %let joburi&jid=0; /* used in next loop */
+  %end;
+  %local concurrency;
+  %let concurrency=0;
+  proc sql; drop table &jdsrunning;
+  %do jid=1 %to &jcnt;
+    /**
+      * now we can execute the jobs up to the maxconcurrency setting
+      */
+    %if "&&job&jid" ne "0" %then %do; /* this var is zero if job finished */
+      %if "&&joburi&jid"="0" and &concurrency<&maxconcurrency %then %do;
+        /* job has not been triggered and we have free slots */
+        %local jobname jobpath;
+        %let jobname=%scan(&&job&jid,-1,/);
+        %let jobpath=%substr(&&job&jid,1,%length(&&job&jid)-%length(&jobname)-1);
+        %put executing &jobpath/&jobname with paramstring &&jparams&jid;
+        %mv_jobexecute(path=&jobpath
+          ,name=&jobname
+          ,paramstring=%superq(jparams&jid)
+          ,outds=&jdsapp
+        )
+        proc append base=&jdsrunning
+          data=&jdsapp(where=(method='GET' and rel='state'));
+        run;
+        %let concurrency=%eval(&concurrency+1);
+        %let joburi&jid=1;
+      %end;
+    %end;
+    %if &jid=&jcnt %then %do;
+      /* we are at the end of the loop - time to see which jobs have finished */
+      %mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor)
+    %end;
+  %end;
+  /* back up and execute the next flow */
+%end;
 
-%inc "&fpath3..lua";
-/* export to desired destination */
-data _null_;
-  %if &outref=0 %then %do;
-    file "&outfile" lrecl=32767;
-  %end;
-  %else %do;
-    file &outref;
-  %end;
-  infile &fname2;
-  input;
-  put _infile_;
-run;
-filename &fname1 clear;
-filename &fname2 clear;
 
 %mend;
 /**
   @file
-  @brief Takes a dataset of running jobs and waits for them to complete
-  @details Will poll `/jobs/{jobId}/state` at set intervals until they are all
-  completed. Completion is determined by reference to the returned _state_, as
-  per the following table:
+  @brief Takes a dataset of running jobs and waits for ANY or ALL of them to complete
+  @details Will poll `/jobs/{jobId}/state` at set intervals until ANY or ALL
+  jobs are completed. Completion is determined by reference to the returned
+  _state_, as per the following table:
 
   | state | Wait? | Notes|
   |-----------|-------|------|
@@ -13184,8 +13255,7 @@ filename &fname2 clear;
         where method='GET' and rel='state';
       run;
 
-      %mv_jobwaitfor(inds=work.jobs,outds=work.jobstates)
-
+      %mv_jobwaitfor(ALL,inds=work.jobs,outds=work.jobstates)
 
   Delete the job:
 
@@ -13200,6 +13270,8 @@ filename &fname2 clear;
         a SASStudioV session else authorization_code. Default option.
       - sas_services - will use oauth_bearer=sas_services
 
+  @param [in] action=Either ALL (to wait for every job) or ANY (if one job
+    completes, processing will continue). Default=ALL.
   @param [in] inds= The input dataset containing the list of job uris, in the
     following format: `/jobExecution/jobs/&JOBID./state` and the corresponding
     job name. The uri should be in a `uri` variable, and the job path/name
@@ -13220,8 +13292,8 @@ filename &fname2 clear;
 
 **/
 
-%macro mv_jobwaitfor(
-    access_token_var=ACCESS_TOKEN
+%macro mv_jobwaitfor(action
+    ,access_token_var=ACCESS_TOKEN
     ,grant_type=sas_services
     ,inds=0
     ,outds=work.mv_jobwaitfor
@@ -13272,6 +13344,11 @@ data _null_;
   if last then call symputx('uricnt',_n_,'l');
 run;
 
+%local runcnt;
+%if %upcase(&action)=ALL %then %let runcnt=&uricnt;
+%else %if %upcase(&action)=ANY %then %let runcnt=1;
+%else %let runcnt=&uricnt;
+
 %local fname0 ;
 %let fname0=%mf_getuniquefileref();
 
@@ -13311,7 +13388,7 @@ run;
         uri="&&joburi&i",
         state="&status",
         timestamp=datetime();
-      %let joburi&i=0;
+      %let joburi&i=0; /* do not re-check */
     %end;
     %else %if &status=idle or &status=pending or &status=running %then %do;
       data _null_;
@@ -13329,7 +13406,7 @@ run;
     %let goback=0;
     proc sql noprint;
     select count(*) into:goback from &outds;
-    %if &goback ne &uricnt %then %let i=0;
+    %if &goback lt &runcnt %then %let i=0;
   %end;
 %end;
 
diff --git a/viya/mv_jobwaitfor.sas b/viya/mv_jobwaitfor.sas
index 8e3248f..8c353b9 100644
--- a/viya/mv_jobwaitfor.sas
+++ b/viya/mv_jobwaitfor.sas
@@ -1,9 +1,9 @@
 /**
   @file
-  @brief Takes a dataset of running jobs and waits for them to complete
-  @details Will poll `/jobs/{jobId}/state` at set intervals until they are all
-  completed. Completion is determined by reference to the returned _state_, as
-  per the following table:
+  @brief Takes a dataset of running jobs and waits for ANY or ALL of them to complete
+  @details Will poll `/jobs/{jobId}/state` at set intervals until ANY or ALL
+  jobs are completed. Completion is determined by reference to the returned
+  _state_, as per the following table:
 
   | state | Wait? | Notes|
   |-----------|-------|------|
@@ -49,8 +49,7 @@
         where method='GET' and rel='state';
       run;
 
-      %mv_jobwaitfor(inds=work.jobs,outds=work.jobstates)
-
+      %mv_jobwaitfor(ALL,inds=work.jobs,outds=work.jobstates)
 
   Delete the job:
 
@@ -65,6 +64,8 @@
         a SASStudioV session else authorization_code. Default option.
       - sas_services - will use oauth_bearer=sas_services
 
+  @param [in] action=Either ALL (to wait for every job) or ANY (if one job
+    completes, processing will continue). Default=ALL.
   @param [in] inds= The input dataset containing the list of job uris, in the
     following format: `/jobExecution/jobs/&JOBID./state` and the corresponding
     job name. The uri should be in a `uri` variable, and the job path/name
@@ -85,8 +86,8 @@
 
 **/
 
-%macro mv_jobwaitfor(
-    access_token_var=ACCESS_TOKEN
+%macro mv_jobwaitfor(action
+    ,access_token_var=ACCESS_TOKEN
     ,grant_type=sas_services
     ,inds=0
     ,outds=work.mv_jobwaitfor
@@ -137,6 +138,11 @@ data _null_;
   if last then call symputx('uricnt',_n_,'l');
 run;
 
+%local runcnt;
+%if %upcase(&action)=ALL %then %let runcnt=&uricnt;
+%else %if %upcase(&action)=ANY %then %let runcnt=1;
+%else %let runcnt=&uricnt;
+
 %local fname0 ;
 %let fname0=%mf_getuniquefileref();
 
@@ -176,7 +182,7 @@ run;
         uri="&&joburi&i",
         state="&status",
         timestamp=datetime();
-      %let joburi&i=0;
+      %let joburi&i=0; /* do not re-check */
     %end;
     %else %if &status=idle or &status=pending or &status=running %then %do;
       data _null_;
@@ -194,7 +200,7 @@ run;
     %let goback=0;
     proc sql noprint;
     select count(*) into:goback from &outds;
-    %if &goback ne &uricnt %then %let i=0;
+    %if &goback lt &runcnt %then %let i=0;
   %end;
 %end;
 