1
0
mirror of https://github.com/sasjs/core.git synced 2026-01-05 00:20:05 +00:00

Compare commits

...

10 Commits

17 changed files with 797 additions and 96 deletions

434
all.sas
View File

@@ -229,6 +229,32 @@ options noquotelenmax;
%mend; %mend;
/** @endcond *//** /** @endcond *//**
@file
@brief Checks whether a fileref exists
@details You can probably do without this macro as it is just a one liner.
Mainly it is here as a convenient way to remember the syntax!
@param fref the fileref to detect
@return output Returns 1 if found and 0 if not found. Note - it is possible
that the fileref is found, but the file does not (yet) exist. If you need
to test for this, you may as well use the fileref function directly.
@version 8
@author [Allan Bowe](https://www.linkedin.com/in/allanbowe/)
**/
%macro mf_existfileref(fref
)/*/STORE SOURCE*/;
%if %sysfunc(fileref(&fref))=0 %then %do;
1
%end;
%else %do;
0
%end;
%mend;/**
@file @file
@brief Checks if a variable exists in a data set. @brief Checks if a variable exists in a data set.
@details Returns 0 if the variable does NOT exist, and return the position of @details Returns 0 if the variable does NOT exist, and return the position of
@@ -378,7 +404,7 @@ options noquotelenmax;
%local dsid rc; %local dsid rc;
%let dsid=%sysfunc(open(&libds,is)); %let dsid=%sysfunc(open(&libds,is));
%if &dsid = 0 %then %do; %if &dsid = 0 %then %do;
%put WARNING: Cannot open %trim(&libds), system message below; %put %str(WARN)ING: Cannot open %trim(&libds), system message below;
%put %sysfunc(sysmsg()); %put %sysfunc(sysmsg());
-1 -1
%end; %end;
@@ -582,6 +608,13 @@ options noquotelenmax;
which returns: which returns:
> 'blah','blah','blah' > 'blah','blah','blah'
Alternatively:
%put %mf_getquotedstr(these words are double quoted,quote=D)
for:
> "these","words","are","double","quoted"
@param in_str the unquoted, spaced delimited string to transform @param in_str the unquoted, spaced delimited string to transform
@param dlm= the delimeter to be applied to the output (default comma) @param dlm= the delimeter to be applied to the output (default comma)
@param indlm= the delimeter used for the input (default is space) @param indlm= the delimeter used for the input (default is space)
@@ -4538,7 +4571,7 @@ proc sql
%if not (%mf_existds(&libds)) %then %do; %if not (%mf_existds(&libds)) %then %do;
data &libds (index=(key/unique)); data &libds (index=(key/unique));
length key $32 valc $256 valn 8 type $1; length key $64 valc $2048 valn 8 type $1;
call missing(of _all_); call missing(of _all_);
stop; stop;
run; run;
@@ -4782,6 +4815,7 @@ proc sql
%local lib dir ds1 ds2 ds3 start_tm i; %local lib dir ds1 ds2 ds3 start_tm i;
%let start_tm=%sysfunc(datetime()); %let start_tm=%sysfunc(datetime());
%let duration=%sysevalf(&duration);
/* create a temporary library in WORK */ /* create a temporary library in WORK */
%let lib=%mf_getuniquelibref(); %let lib=%mf_getuniquelibref();
@@ -4809,7 +4843,7 @@ libname &lib "%sysfunc(pathname(work))/&dir";
proc summary ; proc summary ;
class randnum0 randnum1; class randnum0 randnum1;
output out=&lib..&ds2; output out=&lib..&ds2;
run; run;quit;
%if %sysevalf( (%sysfunc(datetime())-&start_tm)>&duration ) %then %goto gate; %if %sysevalf( (%sysfunc(datetime())-&start_tm)>&duration ) %then %goto gate;
/* add more data */ /* add more data */
@@ -4818,6 +4852,7 @@ libname &lib "%sysfunc(pathname(work))/&dir";
select *, ranuni(0)*10 as randnum2 select *, ranuni(0)*10 as randnum2
from &lib..&ds1 from &lib..&ds1
order by randnum1; order by randnum1;
quit;
%if %sysevalf( (%sysfunc(datetime())-&start_tm)>&duration ) %then %goto gate; %if %sysevalf( (%sysfunc(datetime())-&start_tm)>&duration ) %then %goto gate;
proc sort data=&lib..&ds3; proc sort data=&lib..&ds3;
@@ -10624,7 +10659,6 @@ run;
@details Expects oauth token in a global macro variable (default @details Expects oauth token in a global macro variable (default
ACCESS_TOKEN). ACCESS_TOKEN).
options mprint;
%mv_createfolder(path=/Public) %mv_createfolder(path=/Public)
@@ -11474,7 +11508,7 @@ run;
%let oauth_bearer=oauth_bearer=sas_services; %let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=; %let &access_token_var=;
%end; %end;
%put &sysmacroname: grant_type=&grant_type;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password %mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services and &grant_type ne sas_services
) )
@@ -11729,7 +11763,6 @@ libname &libref1a clear;
@details If not running in Studo 5 +, will expect an oauth token in a global @details If not running in Studo 5 +, will expect an oauth token in a global
macro variable (default ACCESS_TOKEN). macro variable (default ACCESS_TOKEN).
options mprint;
%mv_createfolder(path=/Public/test/blah) %mv_createfolder(path=/Public/test/blah)
%mv_deleteviyafolder(path=/Public/test) %mv_deleteviyafolder(path=/Public/test)
@@ -12059,7 +12092,7 @@ libname &libref1 clear;
%let oauth_bearer=oauth_bearer=sas_services; %let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=; %let &access_token_var=;
%end; %end;
%put &sysmacroname: grant_type=&grant_type;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password %mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services and &grant_type ne sas_services
) )
@@ -12294,7 +12327,7 @@ filename &fname1 clear;
%let oauth_bearer=oauth_bearer=sas_services; %let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=; %let &access_token_var=;
%end; %end;
%put &sysmacroname: grant_type=&grant_type;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password %mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services and &grant_type ne sas_services
) )
@@ -12387,7 +12420,6 @@ libname &libref1 clear;
%let oauth_bearer=oauth_bearer=sas_services; %let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=; %let &access_token_var=;
%end; %end;
%put &sysmacroname: grant_type=&grant_type;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password %mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services and &grant_type ne sas_services
) )
@@ -12452,7 +12484,7 @@ run;
%let fname3=%mf_getuniquefileref(); %let fname3=%mf_getuniquefileref();
%let fpath1=%sysfunc(pathname(&fname1)); %let fpath1=%sysfunc(pathname(&fname1));
%let fpath2=%sysfunc(pathname(&fname2)); %let fpath2=%sysfunc(pathname(&fname2));
%let fpath3=%sysfunc(pathname(&fname2)); %let fpath3=%sysfunc(pathname(&fname3));
/* compile the lua JSON module */ /* compile the lua JSON module */
%ml_json() %ml_json()
@@ -12485,7 +12517,275 @@ data _null_;
run; run;
filename &fname1 clear; filename &fname1 clear;
filename &fname2 clear; filename &fname2 clear;
filename &fname3 clear;
%mend; %mend;
/**
@file
@brief Extract the log from a completed SAS Viya Job
@details Extracts log from a Viya job and writes it out to a fileref
To query the job, you need the URI. Sample code for achieving this
is provided below.
## Example
First, compile the macros:
filename mc url "https://raw.githubusercontent.com/sasjs/core/main/all.sas";
%inc mc;
Next, create a job (in this case, a web service):
filename ft15f001 temp;
parmcards4;
data ;
rand=ranuni(0)*1000;
do x=1 to rand;
y=rand*4;
output;
end;
run;
proc sort data=&syslast
by descending y;
run;
;;;;
%mv_createwebservice(path=/Public/temp,name=demo)
Execute it:
%mv_jobexecute(path=/Public/temp
,name=demo
,outds=work.info
)
Wait for it to finish, and grab the uri:
data _null_;
set work.info;
if method='GET' and rel='self';
call symputx('uri',uri);
run;
Finally, fetch the log:
%mv_getjoblog(uri=&uri,outref=mylog)
This macro is used by the mv_jobwaitfor.sas macro, which is generally a more
convenient way to wait for the job to finish before fetching the log.
@param [in] access_token_var= The global macro variable to contain the access token
@param [in] mdebug= set to 1 to enable DEBUG messages
@param [in] grant_type= valid values:
@li password
@li authorization_code
@li detect - will check if access_token exists, if not will use sas_services if
a SASStudioV session else authorization_code. Default option.
@li sas_services - will use oauth_bearer=sas_services.
@param [in] uri= The uri of the running job for which to fetch the status,
in the format `/jobExecution/jobs/$UUID/state` (unquoted).
@param [out] outref= The output fileref to which to APPEND the log (is always
appended).
@version VIYA V.03.04
@author Allan Bowe, source: https://github.com/sasjs/core
<h4> SAS Macros </h4>
@li mp_abort.sas
@li mf_getplatform.sas
@li mf_existfileref.sas
@li ml_json.sas
**/
%macro mv_getjoblog(uri=0,outref=0
,contextName=SAS Job Execution compute context
,access_token_var=ACCESS_TOKEN
,grant_type=sas_services
,mdebug=0
);
%local oauth_bearer;
%if &grant_type=detect %then %do;
%if %symexist(&access_token_var) %then %let grant_type=authorization_code;
%else %let grant_type=sas_services;
%end;
%if &grant_type=sas_services %then %do;
%let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=;
%end;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services
)
,mac=&sysmacroname
,msg=%str(Invalid value for grant_type: &grant_type)
)
/* validation in datastep for better character safety */
%local errmsg errflg;
data _null_;
uri=symget('uri');
if length(uri)<12 then do;
call symputx('errflg',1);
call symputx('errmsg',"URI is invalid (too short) - '&uri'",'l');
end;
if scan(uri,-1)='state' or scan(uri,1) ne 'jobExecution' then do;
call symputx('errflg',1);
call symputx('errmsg',
"URI should be in format /jobExecution/jobs/$$$$UUID$$$$"
!!" but is actually like: &uri",'l');
end;
run;
%mp_abort(iftrue=(&errflg=1)
,mac=&sysmacroname
,msg=%str(&errmsg)
)
%mp_abort(iftrue=(&outref=0)
,mac=&sysmacroname
,msg=%str(Output fileref should be provided)
)
%if %mf_existfileref(&outref) ne 1 %then %do;
filename &outref temp;
%end;
options noquotelenmax;
%local base_uri; /* location of rest apis */
%let base_uri=%mf_getplatform(VIYARESTAPI);
/* prepare request*/
%local fname1;
%let fname1=%mf_getuniquefileref();
proc http method='GET' out=&fname1 &oauth_bearer
url="&base_uri&uri";
headers
%if &grant_type=authorization_code %then %do;
"Authorization"="Bearer &&&access_token_var"
%end;
;
run;
%if &SYS_PROCHTTP_STATUS_CODE ne 200 and &SYS_PROCHTTP_STATUS_CODE ne 201 %then
%do;
data _null_;infile &fname1;input;putlog _infile_;run;
%mp_abort(mac=&sysmacroname
,msg=%str(&SYS_PROCHTTP_STATUS_CODE &SYS_PROCHTTP_STATUS_PHRASE)
)
%end;
%local fname2 fname3 fpath1 fpath2 fpath3;
%let fname2=%mf_getuniquefileref();
%let fname3=%mf_getuniquefileref();
%let fpath1=%sysfunc(pathname(&fname1));
%let fpath2=%sysfunc(pathname(&fname2));
%let fpath3=%sysfunc(pathname(&fname3));
/* compile the lua JSON module */
%ml_json()
/* read using LUA - this allows the code to be of any length */
data _null_;
file "&fpath3..lua";
put '
infile = io.open (sas.symget("fpath1"), "r")
outfile = io.open (sas.symget("fpath2"), "w")
io.input(infile)
local resp=json.decode(io.read())
local logloc=resp["logLocation"]
outfile:write(logloc)
io.close(infile)
io.close(outfile)
';
run;
%inc "&fpath3..lua";
/* get log path*/
%let errflg=1;
%let errmsg=No entry in &fname2 fileref;
data _null_;
infile &fname2;
input;
uri=_infile_;
if length(uri)<12 then do;
call symputx('errflg',1);
call symputx('errmsg',"URI is invalid (too short) - '&uri'",'l');
end;
if scan(uri,1) ne 'files' or scan(uri,2) ne 'files' then do;
call symputx('errflg',1);
call symputx('errmsg',
"URI should be in format /files/files/$$$$UUID$$$$"
!!" but is actually like: &uri",'l');
end;
call symputx('errflg',0,'l');
call symputx('logloc',uri,'l');
run;
%mp_abort(iftrue=(&errflg=1)
,mac=&sysmacroname
,msg=%str(&errmsg)
)
/* we have a log uri - now fetch the log */
proc http method='GET' out=&fname1 &oauth_bearer
url="&base_uri&logloc/content";
headers
%if &grant_type=authorization_code %then %do;
"Authorization"="Bearer &&&access_token_var"
%end;
;
run;
%if &SYS_PROCHTTP_STATUS_CODE ne 200 and &SYS_PROCHTTP_STATUS_CODE ne 201 %then
%do;
data _null_;infile &fname1;input;putlog _infile_;run;
%mp_abort(mac=&sysmacroname
,msg=%str(logfetch: &SYS_PROCHTTP_STATUS_CODE &SYS_PROCHTTP_STATUS_PHRASE)
)
%end;
data _null_;
file "&fpath3..lua";
put '
infile = io.open (sas.symget("fpath1"), "r")
outfile = io.open (sas.symget("fpath2"), "w")
io.input(infile)
local resp=json.decode(io.read())
for i, v in pairs(resp["items"]) do
outfile:write(v.line,"\n")
end
io.close(infile)
io.close(outfile)
';
run;
%inc "&fpath3..lua";
/* write log out to the specified fileref */
data _null_;
infile &fname2 end=last;
file &outref mod;
if _n_=1 then do;
put "/** SASJS Viya Job Log Extract start: &uri **/";
end;
input;
put _infile_;
%if &mdebug=1 %then %do;
putlog _infile_;
%end;
if last then do;
put "/** SASJS Viya Job Log Extract end: &uri **/";
end;
run;
%if &mdebug=0 %then %do;
filename &fname1 clear;
filename &fname2 clear;
filename &fname3 clear;
%end;
%else %do;
%put _local_;
%end;
%mend;
/** /**
@file @file
@brief Extract the status from a running SAS Viya job @brief Extract the status from a running SAS Viya job
@@ -12589,7 +12889,7 @@ filename &fname2 clear;
%let oauth_bearer=oauth_bearer=sas_services; %let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=; %let &access_token_var=;
%end; %end;
%put &sysmacroname: grant_type=&grant_type;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password %mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services and &grant_type ne sas_services
) )
@@ -13075,18 +13375,19 @@ libname &libref;
## Input table (minimum variables needed) ## Input table (minimum variables needed)
@li FLOW_ID - Numeric value, provides sequential ordering capability
@li _CONTEXTNAME - Dictates which context should be used to run the job. If
blank, will default to `SAS Job Execution compute context`.
@li _PROGRAM - Provides the path to the job itself @li _PROGRAM - Provides the path to the job itself
@li FLOW_ID - Numeric value, provides sequential ordering capability. Is
optional, will default to 0 if not provided.
@li _CONTEXTNAME - Dictates which context should be used to run the job. If
blank (or not provided), will default to `SAS Job Execution compute context`.
Any additional variables provided in this table are converted into macro Any additional variables provided in this table are converted into macro
variables and passed into the relevant job. variables and passed into the relevant job.
| FLOW_ID| _CONTEXTNAME |_PROGRAM| |_PROGRAM| FLOW_ID (optional)| _CONTEXTNAME (optional) |
|---|---|---| |---|---|---|
|0|SAS Job Execution compute context|/Public/jobs/somejob1| |/Public/jobs/somejob1|0|SAS Job Execution compute context|
|0|SAS Job Execution compute context|/Public/jobs/somejob2| |/Public/jobs/somejob2|0|SAS Job Execution compute context|
## Output table (minimum variables produced) ## Output table (minimum variables produced)
@@ -13099,6 +13400,9 @@ libname &libref;
![https://i.imgur.com/nZE9PvT.png](https://i.imgur.com/nZE9PvT.png) ![https://i.imgur.com/nZE9PvT.png](https://i.imgur.com/nZE9PvT.png)
To avoid hammering the box with many hits in rapid succession, a one
second pause is made between every request.
## Example ## Example
@@ -13147,7 +13451,16 @@ libname &libref;
Trigger the flow Trigger the flow
%mv_jobflow(inds=work.inputjobs,outds=work.results,maxconcurrency=4) %mv_jobflow(inds=work.inputjobs
,maxconcurrency=4
,outds=work.results
,outref=myjoblog
)
data _null_;
infile myjoblog;
input; put _infile_;
run;
@param [in] access_token_var= The global macro variable to contain the access token @param [in] access_token_var= The global macro variable to contain the access token
@@ -13159,7 +13472,9 @@ libname &libref;
@li sas_services - will use oauth_bearer=sas_services @li sas_services - will use oauth_bearer=sas_services
@param [in] inds= The input dataset containing a list of jobs and parameters @param [in] inds= The input dataset containing a list of jobs and parameters
@param [in] maxconcurrency= The max number of parallel jobs to run. Default=8. @param [in] maxconcurrency= The max number of parallel jobs to run. Default=8.
@param [in] mdebug= set to 1 to enable DEBUG messages
@param [out] outds= The output dataset containing the results @param [out] outds= The output dataset containing the results
@param [out] outref= The output fileref to which to append the log file(s).
@version VIYA V.03.05 @version VIYA V.03.05
@author Allan Bowe, source: https://github.com/sasjs/core @author Allan Bowe, source: https://github.com/sasjs/core
@@ -13179,6 +13494,8 @@ libname &libref;
,maxconcurrency=8 ,maxconcurrency=8
,access_token_var=ACCESS_TOKEN ,access_token_var=ACCESS_TOKEN
,grant_type=sas_services ,grant_type=sas_services
,outref=0
,mdebug=0
); );
%local oauth_bearer; %local oauth_bearer;
%if &grant_type=detect %then %do; %if &grant_type=detect %then %do;
@@ -13201,16 +13518,29 @@ libname &libref;
,mac=&sysmacroname ,mac=&sysmacroname
,msg=%str(Input dataset was not provided) ,msg=%str(Input dataset was not provided)
) )
%mp_abort(iftrue=(%mf_existVarList(&inds,_CONTEXTNAME FLOW_ID _PROGRAM)=0) %mp_abort(iftrue=(%mf_existVarList(&inds,_PROGRAM)=0)
,mac=&sysmacroname ,mac=&sysmacroname
,msg=%str(The following columns must exist on input dataset &inds: ,msg=%str(The _PROGRAM column must exist on input dataset &inds)
_CONTEXTNAME FLOW_ID _PROGRAM)
) )
%mp_abort(iftrue=(&maxconcurrency<1) %mp_abort(iftrue=(&maxconcurrency<1)
,mac=&sysmacroname ,mac=&sysmacroname
,msg=%str(The maxconcurrency variable should be a positive integer) ,msg=%str(The maxconcurrency variable should be a positive integer)
) )
/* set defaults if not provided */
%if %mf_existVarList(&inds,_CONTEXTNAME FLOW_ID)=0 %then %do;
data &inds;
%if %mf_existvarList(&inds,_CONTEXTNAME)=0 %then %do;
length _CONTEXTNAME $128;
retain _CONTEXTNAME "SAS Job Execution compute context";
%end;
%if %mf_existvarList(&inds,FLOW_ID)=0 %then %do;
retain FLOW_ID 0;
%end;
set &inds;
run;
%end;
%local missings; %local missings;
proc sql noprint; proc sql noprint;
select count(*) into: missings select count(*) into: missings
@@ -13288,8 +13618,8 @@ data;run;%let jdswaitfor=&syslast;
jparams='jparams'!!left(symget('jid')); jparams='jparams'!!left(symget('jid'));
call symputx(jparams,substr(_infile_,3,length(_infile_)-4)); call symputx(jparams,substr(_infile_,3,length(_infile_)-4));
run; run;
%local joburi&jid; %local jobuid&jid;
%let joburi&jid=0; /* used in next loop */ %let jobuid&jid=0; /* used in next loop */
%end; %end;
%local concurrency completed; %local concurrency completed;
%let concurrency=0; %let concurrency=0;
@@ -13300,8 +13630,21 @@ data;run;%let jdswaitfor=&syslast;
* now we can execute the jobs up to the maxconcurrency setting * now we can execute the jobs up to the maxconcurrency setting
*/ */
%if "&&job&jid" ne "0" %then %do; /* this var is zero if job finished */ %if "&&job&jid" ne "0" %then %do; /* this var is zero if job finished */
%if "&&joburi&jid"="0" and &concurrency<&maxconcurrency %then %do;
/* job has not been triggered and we have free slots */ /* check to see if the job finished in the previous round */
%if %sysfunc(exist(&outds))=1 %then %do;
%local jobcheck; %let jobcheck=0;
proc sql noprint;
select count(*) into: jobcheck
from &outds where uuid="&&jobuid&jid";
%if &jobcheck>0 %then %do;
%put &&job&jid in flow &fid with uid &&jobuid&jid completed!;
%let job&jid=0;
%end;
%end;
/* check if job was triggered and if so, if we have enough slots to run */
%if "&&jobuid&jid"="0" and &concurrency<&maxconcurrency %then %do;
%local jobname jobpath; %local jobname jobpath;
%let jobname=%scan(&&job&jid,-1,/); %let jobname=%scan(&&job&jid,-1,/);
%let jobpath=%substr(&&job&jid,1,%length(&&job&jid)-%length(&jobname)-1); %let jobpath=%substr(&&job&jid,1,%length(&&job&jid)-%length(&jobname)-1);
@@ -13315,27 +13658,22 @@ data;run;%let jdswaitfor=&syslast;
format jobparams $32767.; format jobparams $32767.;
set &jdsapp(where=(method='GET' and rel='state')); set &jdsapp(where=(method='GET' and rel='state'));
jobparams=symget("jparams&jid"); jobparams=symget("jparams&jid");
call symputx("joburi&jid",uri,'l'); /* uri here has the /state suffix */
uuid=scan(uri,-2,'/');
call symputx("jobuid&jid",uuid,'l');
run; run;
proc append base=&jdsrunning data=&jdsapp; proc append base=&jdsrunning data=&jdsapp;
run; run;
%let concurrency=%eval(&concurrency+1); %let concurrency=%eval(&concurrency+1);
%end; /* sleep one second after every request to smooth the impact */
%else %if %sysfunc(exist(&outds))=1 %then %do; data _null_;
/* check to see if the job has finished as was previously executed */ call sleep(1,1);
%local jobcheck; %let jobcheck=0; run;
proc sql noprint;
select count(*) into: jobcheck
from &outds where uri="&&joburi&jid";
%if &jobcheck>0 %then %do;
%put &&job&jid in flow &fid with uri &&joburi&jid completed!;
%let job&jid=0;
%end;
%end; %end;
%end; %end;
%if &jid=&jcnt %then %do; %if &jid=&jcnt %then %do;
/* we are at the end of the loop - time to see which jobs have finished */ /* we are at the end of the loop - time to see which jobs have finished */
%mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor) %mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor,outref=&outref)
%local done; %local done;
%let done=%mf_nobs(&jdswaitfor); %let done=%mf_nobs(&jdswaitfor);
%if &done>0 %then %do; %if &done>0 %then %do;
@@ -13344,13 +13682,14 @@ data;run;%let jdswaitfor=&syslast;
data &jdsapp; data &jdsapp;
set &jdswaitfor; set &jdswaitfor;
flow_id=&&flow&fid; flow_id=&&flow&fid;
uuid=scan(uri,-1,'/');
run; run;
proc append base=&outds data=&jdsapp; proc append base=&outds data=&jdsapp;
run; run;
%end; %end;
proc sql; proc sql;
delete from &jdsrunning delete from &jdsrunning
where uri in (select uri from &outds where uuid in (select uuid from &outds
where state in ('canceled','completed','failed') where state in ('canceled','completed','failed')
); );
@@ -13364,6 +13703,9 @@ data;run;%let jdswaitfor=&syslast;
/* back up and execute the next flow */ /* back up and execute the next flow */
%end; %end;
%if &mdebug=1 %then %do;
%put _local_;
%end;
%mend; %mend;
/** /**
@@ -13440,7 +13782,7 @@ data;run;%let jdswaitfor=&syslast;
should be in a `_program` variable. should be in a `_program` variable.
@param [out] outds= The output dataset containing the list of states by job @param [out] outds= The output dataset containing the list of states by job
(default=work.mv_jobexecute) (default=work.mv_jobexecute)
@param [out] outref= A fileref to which the spawned job logs should be appended.
@version VIYA V.03.04 @version VIYA V.03.04
@author Allan Bowe, source: https://github.com/sasjs/core @author Allan Bowe, source: https://github.com/sasjs/core
@@ -13451,6 +13793,7 @@ data;run;%let jdswaitfor=&syslast;
@li mf_getuniquefileref.sas @li mf_getuniquefileref.sas
@li mf_existvar.sas @li mf_existvar.sas
@li mf_nobs.sas @li mf_nobs.sas
@li mv_getjoblog.sas
**/ **/
@@ -13459,6 +13802,7 @@ data;run;%let jdswaitfor=&syslast;
,grant_type=sas_services ,grant_type=sas_services
,inds=0 ,inds=0
,outds=work.mv_jobwaitfor ,outds=work.mv_jobwaitfor
,outref=0
); );
%local oauth_bearer; %local oauth_bearer;
%if &grant_type=detect %then %do; %if &grant_type=detect %then %do;
@@ -13502,7 +13846,7 @@ options noquotelenmax;
data _null_; data _null_;
length jobparams $32767; length jobparams $32767;
set &inds end=last; set &inds end=last;
call symputx(cats('joburi',_n_),uri,'l'); call symputx(cats('joburi',_n_),substr(uri,1,55)!!'/state','l');
call symputx(cats('jobname',_n_),_program,'l'); call symputx(cats('jobname',_n_),_program,'l');
call symputx(cats('jobparams',_n_),jobparams,'l'); call symputx(cats('jobparams',_n_),jobparams,'l');
if last then call symputx('uricnt',_n_,'l'); if last then call symputx('uricnt',_n_,'l');
@@ -13546,14 +13890,20 @@ run;
run; run;
%if &status=completed or &status=failed or &status=canceled %then %do; %if &status=completed or &status=failed or &status=canceled %then %do;
%local plainuri;
%let plainuri=%substr(&&joburi&i,1,55);
proc sql; proc sql;
insert into &outds set insert into &outds set
_program="&&jobname&i", _program="&&jobname&i",
uri="&&joburi&i", uri="&plainuri",
state="&status", state="&status",
timestamp=datetime(), timestamp=datetime(),
jobparams=symget("jobparams&i"); jobparams=symget("jobparams&i");
%let joburi&i=0; /* do not re-check */ %let joburi&i=0; /* do not re-check */
/* fetch log */
%if %str(&outref) ne 0 %then %do;
%mv_getjoblog(uri=&plainuri,outref=&outref)
%end;
%end; %end;
%else %if &status=idle or &status=pending or &status=running %then %do; %else %if &status=idle or &status=pending or &status=running %then %do;
data _null_; data _null_;

27
base/mf_existfileref.sas Normal file
View File

@@ -0,0 +1,27 @@
/**
@file
@brief Checks whether a fileref exists
@details You can probably do without this macro as it is just a one liner.
Mainly it is here as a convenient way to remember the syntax!
@param fref the fileref to detect
@return output Returns 1 if found and 0 if not found. Note - it is possible
that the fileref is found, but the file does not (yet) exist. If you need
to test for this, you may as well use the fileref function directly.
@version 8
@author [Allan Bowe](https://www.linkedin.com/in/allanbowe/)
**/
%macro mf_existfileref(fref
)/*/STORE SOURCE*/;
%if %sysfunc(fileref(&fref))=0 %then %do;
1
%end;
%else %do;
0
%end;
%mend;

View File

@@ -23,7 +23,7 @@
%local dsid rc; %local dsid rc;
%let dsid=%sysfunc(open(&libds,is)); %let dsid=%sysfunc(open(&libds,is));
%if &dsid = 0 %then %do; %if &dsid = 0 %then %do;
%put WARNING: Cannot open %trim(&libds), system message below; %put %str(WARN)ING: Cannot open %trim(&libds), system message below;
%put %sysfunc(sysmsg()); %put %sysfunc(sysmsg());
-1 -1
%end; %end;

View File

@@ -8,6 +8,13 @@
which returns: which returns:
> 'blah','blah','blah' > 'blah','blah','blah'
Alternatively:
%put %mf_getquotedstr(these words are double quoted,quote=D)
for:
> "these","words","are","double","quoted"
@param in_str the unquoted, spaced delimited string to transform @param in_str the unquoted, spaced delimited string to transform
@param dlm= the delimeter to be applied to the output (default comma) @param dlm= the delimeter to be applied to the output (default comma)
@param indlm= the delimeter used for the input (default is space) @param indlm= the delimeter used for the input (default is space)

View File

@@ -26,7 +26,7 @@
%if not (%mf_existds(&libds)) %then %do; %if not (%mf_existds(&libds)) %then %do;
data &libds (index=(key/unique)); data &libds (index=(key/unique));
length key $32 valc $256 valn 8 type $1; length key $64 valc $2048 valn 8 type $1;
call missing(of _all_); call missing(of _all_);
stop; stop;
run; run;

View File

@@ -26,6 +26,7 @@
%local lib dir ds1 ds2 ds3 start_tm i; %local lib dir ds1 ds2 ds3 start_tm i;
%let start_tm=%sysfunc(datetime()); %let start_tm=%sysfunc(datetime());
%let duration=%sysevalf(&duration);
/* create a temporary library in WORK */ /* create a temporary library in WORK */
%let lib=%mf_getuniquelibref(); %let lib=%mf_getuniquelibref();
@@ -53,7 +54,7 @@ libname &lib "%sysfunc(pathname(work))/&dir";
proc summary ; proc summary ;
class randnum0 randnum1; class randnum0 randnum1;
output out=&lib..&ds2; output out=&lib..&ds2;
run; run;quit;
%if %sysevalf( (%sysfunc(datetime())-&start_tm)>&duration ) %then %goto gate; %if %sysevalf( (%sysfunc(datetime())-&start_tm)>&duration ) %then %goto gate;
/* add more data */ /* add more data */
@@ -62,6 +63,7 @@ libname &lib "%sysfunc(pathname(work))/&dir";
select *, ranuni(0)*10 as randnum2 select *, ranuni(0)*10 as randnum2
from &lib..&ds1 from &lib..&ds1
order by randnum1; order by randnum1;
quit;
%if %sysevalf( (%sysfunc(datetime())-&start_tm)>&duration ) %then %goto gate; %if %sysevalf( (%sysfunc(datetime())-&start_tm)>&duration ) %then %goto gate;
proc sort data=&lib..&ds3; proc sort data=&lib..&ds3;

View File

@@ -4,7 +4,6 @@
@details Expects oauth token in a global macro variable (default @details Expects oauth token in a global macro variable (default
ACCESS_TOKEN). ACCESS_TOKEN).
options mprint;
%mv_createfolder(path=/Public) %mv_createfolder(path=/Public)

View File

@@ -46,7 +46,7 @@
%let oauth_bearer=oauth_bearer=sas_services; %let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=; %let &access_token_var=;
%end; %end;
%put &sysmacroname: grant_type=&grant_type;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password %mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services and &grant_type ne sas_services
) )

View File

@@ -4,7 +4,6 @@
@details If not running in Studo 5 +, will expect an oauth token in a global @details If not running in Studo 5 +, will expect an oauth token in a global
macro variable (default ACCESS_TOKEN). macro variable (default ACCESS_TOKEN).
options mprint;
%mv_createfolder(path=/Public/test/blah) %mv_createfolder(path=/Public/test/blah)
%mv_deleteviyafolder(path=/Public/test) %mv_deleteviyafolder(path=/Public/test)

View File

@@ -40,7 +40,7 @@
%let oauth_bearer=oauth_bearer=sas_services; %let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=; %let &access_token_var=;
%end; %end;
%put &sysmacroname: grant_type=&grant_type;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password %mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services and &grant_type ne sas_services
) )

View File

@@ -52,7 +52,7 @@
%let oauth_bearer=oauth_bearer=sas_services; %let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=; %let &access_token_var=;
%end; %end;
%put &sysmacroname: grant_type=&grant_type;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password %mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services and &grant_type ne sas_services
) )

View File

@@ -49,7 +49,6 @@
%let oauth_bearer=oauth_bearer=sas_services; %let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=; %let &access_token_var=;
%end; %end;
%put &sysmacroname: grant_type=&grant_type;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password %mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services and &grant_type ne sas_services
) )
@@ -114,7 +113,7 @@ run;
%let fname3=%mf_getuniquefileref(); %let fname3=%mf_getuniquefileref();
%let fpath1=%sysfunc(pathname(&fname1)); %let fpath1=%sysfunc(pathname(&fname1));
%let fpath2=%sysfunc(pathname(&fname2)); %let fpath2=%sysfunc(pathname(&fname2));
%let fpath3=%sysfunc(pathname(&fname2)); %let fpath3=%sysfunc(pathname(&fname3));
/* compile the lua JSON module */ /* compile the lua JSON module */
%ml_json() %ml_json()
@@ -147,4 +146,5 @@ data _null_;
run; run;
filename &fname1 clear; filename &fname1 clear;
filename &fname2 clear; filename &fname2 clear;
filename &fname3 clear;
%mend; %mend;

267
viya/mv_getjoblog.sas Normal file
View File

@@ -0,0 +1,267 @@
/**
@file
@brief Extract the log from a completed SAS Viya Job
@details Extracts log from a Viya job and writes it out to a fileref
To query the job, you need the URI. Sample code for achieving this
is provided below.
## Example
First, compile the macros:
filename mc url "https://raw.githubusercontent.com/sasjs/core/main/all.sas";
%inc mc;
Next, create a job (in this case, a web service):
filename ft15f001 temp;
parmcards4;
data ;
rand=ranuni(0)*1000;
do x=1 to rand;
y=rand*4;
output;
end;
run;
proc sort data=&syslast
by descending y;
run;
;;;;
%mv_createwebservice(path=/Public/temp,name=demo)
Execute it:
%mv_jobexecute(path=/Public/temp
,name=demo
,outds=work.info
)
Wait for it to finish, and grab the uri:
data _null_;
set work.info;
if method='GET' and rel='self';
call symputx('uri',uri);
run;
Finally, fetch the log:
%mv_getjoblog(uri=&uri,outref=mylog)
This macro is used by the mv_jobwaitfor.sas macro, which is generally a more
convenient way to wait for the job to finish before fetching the log.
@param [in] access_token_var= The global macro variable to contain the access token
@param [in] mdebug= set to 1 to enable DEBUG messages
@param [in] grant_type= valid values:
@li password
@li authorization_code
@li detect - will check if access_token exists, if not will use sas_services if
a SASStudioV session else authorization_code. Default option.
@li sas_services - will use oauth_bearer=sas_services.
@param [in] uri= The uri of the running job for which to fetch the status,
in the format `/jobExecution/jobs/$UUID/state` (unquoted).
@param [out] outref= The output fileref to which to APPEND the log (is always
appended).
@version VIYA V.03.04
@author Allan Bowe, source: https://github.com/sasjs/core
<h4> SAS Macros </h4>
@li mp_abort.sas
@li mf_getplatform.sas
@li mf_existfileref.sas
@li ml_json.sas
**/
%macro mv_getjoblog(uri=0,outref=0
,contextName=SAS Job Execution compute context
,access_token_var=ACCESS_TOKEN
,grant_type=sas_services
,mdebug=0
);
%local oauth_bearer;
%if &grant_type=detect %then %do;
%if %symexist(&access_token_var) %then %let grant_type=authorization_code;
%else %let grant_type=sas_services;
%end;
%if &grant_type=sas_services %then %do;
%let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=;
%end;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services
)
,mac=&sysmacroname
,msg=%str(Invalid value for grant_type: &grant_type)
)
/* validation in datastep for better character safety */
%local errmsg errflg;
data _null_;
uri=symget('uri');
if length(uri)<12 then do;
call symputx('errflg',1);
call symputx('errmsg',"URI is invalid (too short) - '&uri'",'l');
end;
if scan(uri,-1)='state' or scan(uri,1) ne 'jobExecution' then do;
call symputx('errflg',1);
call symputx('errmsg',
"URI should be in format /jobExecution/jobs/$$$$UUID$$$$"
!!" but is actually like: &uri",'l');
end;
run;
%mp_abort(iftrue=(&errflg=1)
,mac=&sysmacroname
,msg=%str(&errmsg)
)
%mp_abort(iftrue=(&outref=0)
,mac=&sysmacroname
,msg=%str(Output fileref should be provided)
)
%if %mf_existfileref(&outref) ne 1 %then %do;
filename &outref temp;
%end;
options noquotelenmax;
%local base_uri; /* location of rest apis */
%let base_uri=%mf_getplatform(VIYARESTAPI);
/* prepare request*/
%local fname1;
%let fname1=%mf_getuniquefileref();
proc http method='GET' out=&fname1 &oauth_bearer
url="&base_uri&uri";
headers
%if &grant_type=authorization_code %then %do;
"Authorization"="Bearer &&&access_token_var"
%end;
;
run;
%if &SYS_PROCHTTP_STATUS_CODE ne 200 and &SYS_PROCHTTP_STATUS_CODE ne 201 %then
%do;
data _null_;infile &fname1;input;putlog _infile_;run;
%mp_abort(mac=&sysmacroname
,msg=%str(&SYS_PROCHTTP_STATUS_CODE &SYS_PROCHTTP_STATUS_PHRASE)
)
%end;
%local fname2 fname3 fpath1 fpath2 fpath3;
%let fname2=%mf_getuniquefileref();
%let fname3=%mf_getuniquefileref();
%let fpath1=%sysfunc(pathname(&fname1));
%let fpath2=%sysfunc(pathname(&fname2));
%let fpath3=%sysfunc(pathname(&fname3));
/* compile the lua JSON module */
%ml_json()
/* read using LUA - this allows the code to be of any length */
data _null_;
file "&fpath3..lua";
put '
infile = io.open (sas.symget("fpath1"), "r")
outfile = io.open (sas.symget("fpath2"), "w")
io.input(infile)
local resp=json.decode(io.read())
local logloc=resp["logLocation"]
outfile:write(logloc)
io.close(infile)
io.close(outfile)
';
run;
%inc "&fpath3..lua";
/* get log path*/
%let errflg=1;
%let errmsg=No entry in &fname2 fileref;
data _null_;
infile &fname2;
input;
uri=_infile_;
if length(uri)<12 then do;
call symputx('errflg',1);
call symputx('errmsg',"URI is invalid (too short) - '&uri'",'l');
end;
if scan(uri,1) ne 'files' or scan(uri,2) ne 'files' then do;
call symputx('errflg',1);
call symputx('errmsg',
"URI should be in format /files/files/$$$$UUID$$$$"
!!" but is actually like: &uri",'l');
end;
call symputx('errflg',0,'l');
call symputx('logloc',uri,'l');
run;
%mp_abort(iftrue=(&errflg=1)
,mac=&sysmacroname
,msg=%str(&errmsg)
)
/* we have a log uri - now fetch the log */
proc http method='GET' out=&fname1 &oauth_bearer
url="&base_uri&logloc/content";
headers
%if &grant_type=authorization_code %then %do;
"Authorization"="Bearer &&&access_token_var"
%end;
;
run;
%if &SYS_PROCHTTP_STATUS_CODE ne 200 and &SYS_PROCHTTP_STATUS_CODE ne 201 %then
%do;
data _null_;infile &fname1;input;putlog _infile_;run;
%mp_abort(mac=&sysmacroname
,msg=%str(logfetch: &SYS_PROCHTTP_STATUS_CODE &SYS_PROCHTTP_STATUS_PHRASE)
)
%end;
data _null_;
file "&fpath3..lua";
put '
infile = io.open (sas.symget("fpath1"), "r")
outfile = io.open (sas.symget("fpath2"), "w")
io.input(infile)
local resp=json.decode(io.read())
for i, v in pairs(resp["items"]) do
outfile:write(v.line,"\n")
end
io.close(infile)
io.close(outfile)
';
run;
%inc "&fpath3..lua";
/* write log out to the specified fileref */
data _null_;
infile &fname2 end=last;
file &outref mod;
if _n_=1 then do;
put "/** SASJS Viya Job Log Extract start: &uri **/";
end;
input;
put _infile_;
%if &mdebug=1 %then %do;
putlog _infile_;
%end;
if last then do;
put "/** SASJS Viya Job Log Extract end: &uri **/";
end;
run;
%if &mdebug=0 %then %do;
filename &fname1 clear;
filename &fname2 clear;
filename &fname3 clear;
%end;
%else %do;
%put _local_;
%end;
%mend;

View File

@@ -101,7 +101,7 @@
%let oauth_bearer=oauth_bearer=sas_services; %let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=; %let &access_token_var=;
%end; %end;
%put &sysmacroname: grant_type=&grant_type;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password %mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services and &grant_type ne sas_services
) )

View File

@@ -9,18 +9,19 @@
## Input table (minimum variables needed) ## Input table (minimum variables needed)
@li FLOW_ID - Numeric value, provides sequential ordering capability
@li _CONTEXTNAME - Dictates which context should be used to run the job. If
blank, will default to `SAS Job Execution compute context`.
@li _PROGRAM - Provides the path to the job itself @li _PROGRAM - Provides the path to the job itself
@li FLOW_ID - Numeric value, provides sequential ordering capability. Is
optional, will default to 0 if not provided.
@li _CONTEXTNAME - Dictates which context should be used to run the job. If
blank (or not provided), will default to `SAS Job Execution compute context`.
Any additional variables provided in this table are converted into macro Any additional variables provided in this table are converted into macro
variables and passed into the relevant job. variables and passed into the relevant job.
| FLOW_ID| _CONTEXTNAME |_PROGRAM| |_PROGRAM| FLOW_ID (optional)| _CONTEXTNAME (optional) |
|---|---|---| |---|---|---|
|0|SAS Job Execution compute context|/Public/jobs/somejob1| |/Public/jobs/somejob1|0|SAS Job Execution compute context|
|0|SAS Job Execution compute context|/Public/jobs/somejob2| |/Public/jobs/somejob2|0|SAS Job Execution compute context|
## Output table (minimum variables produced) ## Output table (minimum variables produced)
@@ -33,6 +34,9 @@
![https://i.imgur.com/nZE9PvT.png](https://i.imgur.com/nZE9PvT.png) ![https://i.imgur.com/nZE9PvT.png](https://i.imgur.com/nZE9PvT.png)
To avoid hammering the box with many hits in rapid succession, a one
second pause is made between every request.
## Example ## Example
@@ -81,7 +85,16 @@
Trigger the flow Trigger the flow
%mv_jobflow(inds=work.inputjobs,outds=work.results,maxconcurrency=4) %mv_jobflow(inds=work.inputjobs
,maxconcurrency=4
,outds=work.results
,outref=myjoblog
)
data _null_;
infile myjoblog;
input; put _infile_;
run;
@param [in] access_token_var= The global macro variable to contain the access token @param [in] access_token_var= The global macro variable to contain the access token
@@ -93,7 +106,9 @@
@li sas_services - will use oauth_bearer=sas_services @li sas_services - will use oauth_bearer=sas_services
@param [in] inds= The input dataset containing a list of jobs and parameters @param [in] inds= The input dataset containing a list of jobs and parameters
@param [in] maxconcurrency= The max number of parallel jobs to run. Default=8. @param [in] maxconcurrency= The max number of parallel jobs to run. Default=8.
@param [in] mdebug= set to 1 to enable DEBUG messages
@param [out] outds= The output dataset containing the results @param [out] outds= The output dataset containing the results
@param [out] outref= The output fileref to which to append the log file(s).
@version VIYA V.03.05 @version VIYA V.03.05
@author Allan Bowe, source: https://github.com/sasjs/core @author Allan Bowe, source: https://github.com/sasjs/core
@@ -113,6 +128,8 @@
,maxconcurrency=8 ,maxconcurrency=8
,access_token_var=ACCESS_TOKEN ,access_token_var=ACCESS_TOKEN
,grant_type=sas_services ,grant_type=sas_services
,outref=0
,mdebug=0
); );
%local oauth_bearer; %local oauth_bearer;
%if &grant_type=detect %then %do; %if &grant_type=detect %then %do;
@@ -135,16 +152,29 @@
,mac=&sysmacroname ,mac=&sysmacroname
,msg=%str(Input dataset was not provided) ,msg=%str(Input dataset was not provided)
) )
%mp_abort(iftrue=(%mf_existVarList(&inds,_CONTEXTNAME FLOW_ID _PROGRAM)=0) %mp_abort(iftrue=(%mf_existVarList(&inds,_PROGRAM)=0)
,mac=&sysmacroname ,mac=&sysmacroname
,msg=%str(The following columns must exist on input dataset &inds: ,msg=%str(The _PROGRAM column must exist on input dataset &inds)
_CONTEXTNAME FLOW_ID _PROGRAM)
) )
%mp_abort(iftrue=(&maxconcurrency<1) %mp_abort(iftrue=(&maxconcurrency<1)
,mac=&sysmacroname ,mac=&sysmacroname
,msg=%str(The maxconcurrency variable should be a positive integer) ,msg=%str(The maxconcurrency variable should be a positive integer)
) )
/* set defaults if not provided */
%if %mf_existVarList(&inds,_CONTEXTNAME FLOW_ID)=0 %then %do;
data &inds;
%if %mf_existvarList(&inds,_CONTEXTNAME)=0 %then %do;
length _CONTEXTNAME $128;
retain _CONTEXTNAME "SAS Job Execution compute context";
%end;
%if %mf_existvarList(&inds,FLOW_ID)=0 %then %do;
retain FLOW_ID 0;
%end;
set &inds;
run;
%end;
%local missings; %local missings;
proc sql noprint; proc sql noprint;
select count(*) into: missings select count(*) into: missings
@@ -222,8 +252,8 @@ data;run;%let jdswaitfor=&syslast;
jparams='jparams'!!left(symget('jid')); jparams='jparams'!!left(symget('jid'));
call symputx(jparams,substr(_infile_,3,length(_infile_)-4)); call symputx(jparams,substr(_infile_,3,length(_infile_)-4));
run; run;
%local joburi&jid; %local jobuid&jid;
%let joburi&jid=0; /* used in next loop */ %let jobuid&jid=0; /* used in next loop */
%end; %end;
%local concurrency completed; %local concurrency completed;
%let concurrency=0; %let concurrency=0;
@@ -234,8 +264,21 @@ data;run;%let jdswaitfor=&syslast;
* now we can execute the jobs up to the maxconcurrency setting * now we can execute the jobs up to the maxconcurrency setting
*/ */
%if "&&job&jid" ne "0" %then %do; /* this var is zero if job finished */ %if "&&job&jid" ne "0" %then %do; /* this var is zero if job finished */
%if "&&joburi&jid"="0" and &concurrency<&maxconcurrency %then %do;
/* job has not been triggered and we have free slots */ /* check to see if the job finished in the previous round */
%if %sysfunc(exist(&outds))=1 %then %do;
%local jobcheck; %let jobcheck=0;
proc sql noprint;
select count(*) into: jobcheck
from &outds where uuid="&&jobuid&jid";
%if &jobcheck>0 %then %do;
%put &&job&jid in flow &fid with uid &&jobuid&jid completed!;
%let job&jid=0;
%end;
%end;
/* check if job was triggered and if so, if we have enough slots to run */
%if "&&jobuid&jid"="0" and &concurrency<&maxconcurrency %then %do;
%local jobname jobpath; %local jobname jobpath;
%let jobname=%scan(&&job&jid,-1,/); %let jobname=%scan(&&job&jid,-1,/);
%let jobpath=%substr(&&job&jid,1,%length(&&job&jid)-%length(&jobname)-1); %let jobpath=%substr(&&job&jid,1,%length(&&job&jid)-%length(&jobname)-1);
@@ -249,27 +292,22 @@ data;run;%let jdswaitfor=&syslast;
format jobparams $32767.; format jobparams $32767.;
set &jdsapp(where=(method='GET' and rel='state')); set &jdsapp(where=(method='GET' and rel='state'));
jobparams=symget("jparams&jid"); jobparams=symget("jparams&jid");
call symputx("joburi&jid",uri,'l'); /* uri here has the /state suffix */
uuid=scan(uri,-2,'/');
call symputx("jobuid&jid",uuid,'l');
run; run;
proc append base=&jdsrunning data=&jdsapp; proc append base=&jdsrunning data=&jdsapp;
run; run;
%let concurrency=%eval(&concurrency+1); %let concurrency=%eval(&concurrency+1);
%end; /* sleep one second after every request to smooth the impact */
%else %if %sysfunc(exist(&outds))=1 %then %do; data _null_;
/* check to see if the job has finished as was previously executed */ call sleep(1,1);
%local jobcheck; %let jobcheck=0; run;
proc sql noprint;
select count(*) into: jobcheck
from &outds where uri="&&joburi&jid";
%if &jobcheck>0 %then %do;
%put &&job&jid in flow &fid with uri &&joburi&jid completed!;
%let job&jid=0;
%end;
%end; %end;
%end; %end;
%if &jid=&jcnt %then %do; %if &jid=&jcnt %then %do;
/* we are at the end of the loop - time to see which jobs have finished */ /* we are at the end of the loop - time to see which jobs have finished */
%mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor) %mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor,outref=&outref)
%local done; %local done;
%let done=%mf_nobs(&jdswaitfor); %let done=%mf_nobs(&jdswaitfor);
%if &done>0 %then %do; %if &done>0 %then %do;
@@ -278,13 +316,14 @@ data;run;%let jdswaitfor=&syslast;
data &jdsapp; data &jdsapp;
set &jdswaitfor; set &jdswaitfor;
flow_id=&&flow&fid; flow_id=&&flow&fid;
uuid=scan(uri,-1,'/');
run; run;
proc append base=&outds data=&jdsapp; proc append base=&outds data=&jdsapp;
run; run;
%end; %end;
proc sql; proc sql;
delete from &jdsrunning delete from &jdsrunning
where uri in (select uri from &outds where uuid in (select uuid from &outds
where state in ('canceled','completed','failed') where state in ('canceled','completed','failed')
); );
@@ -298,5 +337,8 @@ data;run;%let jdswaitfor=&syslast;
/* back up and execute the next flow */ /* back up and execute the next flow */
%end; %end;
%if &mdebug=1 %then %do;
%put _local_;
%end;
%mend; %mend;

View File

@@ -72,7 +72,7 @@
should be in a `_program` variable. should be in a `_program` variable.
@param [out] outds= The output dataset containing the list of states by job @param [out] outds= The output dataset containing the list of states by job
(default=work.mv_jobexecute) (default=work.mv_jobexecute)
@param [out] outref= A fileref to which the spawned job logs should be appended.
@version VIYA V.03.04 @version VIYA V.03.04
@author Allan Bowe, source: https://github.com/sasjs/core @author Allan Bowe, source: https://github.com/sasjs/core
@@ -83,6 +83,7 @@
@li mf_getuniquefileref.sas @li mf_getuniquefileref.sas
@li mf_existvar.sas @li mf_existvar.sas
@li mf_nobs.sas @li mf_nobs.sas
@li mv_getjoblog.sas
**/ **/
@@ -91,6 +92,7 @@
,grant_type=sas_services ,grant_type=sas_services
,inds=0 ,inds=0
,outds=work.mv_jobwaitfor ,outds=work.mv_jobwaitfor
,outref=0
); );
%local oauth_bearer; %local oauth_bearer;
%if &grant_type=detect %then %do; %if &grant_type=detect %then %do;
@@ -134,7 +136,7 @@ options noquotelenmax;
data _null_; data _null_;
length jobparams $32767; length jobparams $32767;
set &inds end=last; set &inds end=last;
call symputx(cats('joburi',_n_),uri,'l'); call symputx(cats('joburi',_n_),substr(uri,1,55)!!'/state','l');
call symputx(cats('jobname',_n_),_program,'l'); call symputx(cats('jobname',_n_),_program,'l');
call symputx(cats('jobparams',_n_),jobparams,'l'); call symputx(cats('jobparams',_n_),jobparams,'l');
if last then call symputx('uricnt',_n_,'l'); if last then call symputx('uricnt',_n_,'l');
@@ -178,14 +180,20 @@ run;
run; run;
%if &status=completed or &status=failed or &status=canceled %then %do; %if &status=completed or &status=failed or &status=canceled %then %do;
%local plainuri;
%let plainuri=%substr(&&joburi&i,1,55);
proc sql; proc sql;
insert into &outds set insert into &outds set
_program="&&jobname&i", _program="&&jobname&i",
uri="&&joburi&i", uri="&plainuri",
state="&status", state="&status",
timestamp=datetime(), timestamp=datetime(),
jobparams=symget("jobparams&i"); jobparams=symget("jobparams&i");
%let joburi&i=0; /* do not re-check */ %let joburi&i=0; /* do not re-check */
/* fetch log */
%if %str(&outref) ne 0 %then %do;
%mv_getjoblog(uri=&plainuri,outref=&outref)
%end;
%end; %end;
%else %if &status=idle or &status=pending or &status=running %then %do; %else %if &status=idle or &status=pending or &status=running %then %do;
data _null_; data _null_;