From 03d9d805ffb54ccc1cbb75200fceec6926285fdd Mon Sep 17 00:00:00 2001 From: Allan Bowe Date: Sat, 16 Jan 2021 20:43:15 +0200 Subject: [PATCH] fix: adding support for jobparams in output table for mv_jobwaitfor --- all.sas | 89 +++++++++++++++++++++++++++++++++++------- viya/mv_jobwaitfor.sas | 7 +++- 2 files changed, 80 insertions(+), 16 deletions(-) diff --git a/all.sas b/all.sas index 14c7598..185911b 100644 --- a/all.sas +++ b/all.sas @@ -12984,6 +12984,8 @@ libname &libref; The input table is formed as per below. Each observation represents one job. Each variable is converted into a macro variable with the same name. + ## Input table (minimum variables needed) + | variable| description | |---|---|---| |FLOW_ID| Numeric value, provides sequential ordering capability| @@ -12991,6 +12993,14 @@ libname &libref; blank, will default to `SAS Job Execution compute context`.| |_PROGRAM|Provides the path to the job itself| + ## Output table (minimum variables produced) + + | variable| description | + |---|---|---| + |FLOW_ID| Numeric value, provides sequential ordering capability| + |_CONTEXTNAME|Dictates which context should be used to run the job. If + blank, will default to `SAS Job Execution compute context`.| + |_PROGRAM|Provides the path to the job itself| ## Example @@ -13010,6 +13020,7 @@ libname &libref; rand=ranuni(0)*¯ovar1; do x=1 to rand; y=rand*¯ovar2; + if y=100 then abort; output; end; run; @@ -13022,17 +13033,17 @@ libname &libref; data work.inputjobs; _contextName='SAS Job Execution compute context'; do flow_id=1 to 3; - do job=1 to 10; + do i=1 to 20; _program='/Public/temp/demo1'; - macrovar1=10*job; - macrovar2=4*job; + macrovar1=10*i; + macrovar2=4*i; output; + i+1; _program='/Public/temp/demo2'; - macrovar1=40*job; - macrovar2=44*job; + macrovar1=40*i; + macrovar2=44*i; output; end; - drop job; end; run; @@ -13056,10 +13067,13 @@ libname &libref; @author Allan Bowe, source: https://github.com/sasjs/core

SAS Macros

+ @li mf_nobs.sas @li mp_abort.sas @li mf_getplatform.sas @li mf_getuniquefileref.sas @li mf_existvarlist.sas + @li mv_jobwaitfor.sas + @li mv_jobexecute.sas **/ @@ -13077,7 +13091,7 @@ libname &libref; %let oauth_bearer=oauth_bearer=sas_services; %let &access_token_var=; %end; -%put &sysmacroname: grant_type=&grant_type; + %mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password and &grant_type ne sas_services ) @@ -13109,6 +13123,16 @@ select count(*) into: missings ,msg=%str(input dataset contains &missings missing values for FLOW_ID or _PROGRAM) ) +%if %mf_nobs(&inds)=0 %then %do; + %put No observations in &inds! Leaving macro &sysmacroname; + %return; +%end; + +/* ensure output table is available */ +data &outds;run; +proc sql; +drop table &outds; + options noquotelenmax; %local base_uri; /* location of rest apis */ %let base_uri=%mf_getplatform(VIYARESTAPI); @@ -13163,13 +13187,15 @@ data;run;%let jdswaitfor=&syslast; data _null_; infile &jfref lrecl=32767; input; - call symputx("jparams&jid",substr(_infile_,3,length(_infile_)-4)); + jparams='jparams'!!left(symget('jid')); + call symputx(jparams,substr(_infile_,3,length(_infile_)-4)); run; %local joburi&jid; %let joburi&jid=0; /* used in next loop */ %end; - %local concurrency; + %local concurrency completed; %let concurrency=0; + %let completed=0; proc sql; drop table &jdsrunning; %do jid=1 %to &jcnt; /** @@ -13187,16 +13213,48 @@ data;run;%let jdswaitfor=&syslast; ,paramstring=%superq(jparams&jid) ,outds=&jdsapp ) - proc append base=&jdsrunning - data=&jdsapp(where=(method='GET' and rel='state')); + data &jdsapp; + format jobparams $32767.; + set &jdsapp(where=(method='GET' and rel='state')); + jobparams=symget("jparams&jid"); + call symputx("joburi&jid",uri,'l'); + run; + proc append base=&jdsrunning data=&jdsapp; run; %let concurrency=%eval(&concurrency+1); - %let joburi&jid=1; + %end; + %else %if %sysfunc(exist(&outds))=1 %then %do; + /* check to see if the job has finished as was previously executed */ + %local jobcheck; %let jobcheck=0; + proc sql noprint; + select count(*) into: jobcheck + from &outds where uri="&&joburi&jid"; + %if &jobcheck>0 %then %do; + %let job&jid=0; + %put &&job&jid in flow &fid completed!; + %end; %end; %end; %if &jid=&jcnt %then %do; /* we are at the end of the loop - time to see which jobs have finished */ %mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor) + %local done; + %let done=%mf_nobs(&jdswaitfor); + %if &done>0 %then %do; + %let completed=%eval(&completed+&done); + %let concurrency=%eval(&concurrency-&done); + data &jdsapp; + set &jdswaitfor; + flow_id=&&flow&fid; + run; + proc append base=&outds data=&jdsapp; + run; + %end; + /* loop again if jobs are left */ + %if &completed < &jcnt %then %do; + %let jid=0; + %put looping flow &fid again - &completed jobs completed, &concurrency jobs running; + %end; %end; %end; /* back up and execute the next flow */ @@ -13338,9 +13396,11 @@ options noquotelenmax; %let base_uri=%mf_getplatform(VIYARESTAPI); data _null_; + length jobparams $32767; set &inds end=last; call symputx(cats('joburi',_n_),uri,'l'); call symputx(cats('jobname',_n_),_program,'l'); + call symputx(cats('jobparams',_n_),jobparams,'l'); if last then call symputx('uricnt',_n_,'l'); run; @@ -13353,7 +13413,7 @@ run; %let fname0=%mf_getuniquefileref(); data &outds; - format _program uri $128. state $32. timestamp datetime19.; + format _program uri $128. state $32. timestamp datetime19. jobparams $32767.; stop; run; @@ -13387,7 +13447,8 @@ run; _program="&&jobname&i", uri="&&joburi&i", state="&status", - timestamp=datetime(); + timestamp=datetime(), + jobparams=symget("jobparams&i"); %let joburi&i=0; /* do not re-check */ %end; %else %if &status=idle or &status=pending or &status=running %then %do; diff --git a/viya/mv_jobwaitfor.sas b/viya/mv_jobwaitfor.sas index 8c353b9..a01186a 100644 --- a/viya/mv_jobwaitfor.sas +++ b/viya/mv_jobwaitfor.sas @@ -132,9 +132,11 @@ options noquotelenmax; %let base_uri=%mf_getplatform(VIYARESTAPI); data _null_; + length jobparams $32767; set &inds end=last; call symputx(cats('joburi',_n_),uri,'l'); call symputx(cats('jobname',_n_),_program,'l'); + call symputx(cats('jobparams',_n_),jobparams,'l'); if last then call symputx('uricnt',_n_,'l'); run; @@ -147,7 +149,7 @@ run; %let fname0=%mf_getuniquefileref(); data &outds; - format _program uri $128. state $32. timestamp datetime19.; + format _program uri $128. state $32. timestamp datetime19. jobparams $32767.; stop; run; @@ -181,7 +183,8 @@ run; _program="&&jobname&i", uri="&&joburi&i", state="&status", - timestamp=datetime(); + timestamp=datetime(), + jobparams=symget("jobparams&i"); %let joburi&i=0; /* do not re-check */ %end; %else %if &status=idle or &status=pending or &status=running %then %do;