1
0
mirror of https://github.com/sasjs/core.git synced 2026-01-10 02:40:05 +00:00

Compare commits

...

3 Commits

5 changed files with 455 additions and 39 deletions

161
all.sas
View File

@@ -12487,7 +12487,7 @@ run;
%let fname3=%mf_getuniquefileref(); %let fname3=%mf_getuniquefileref();
%let fpath1=%sysfunc(pathname(&fname1)); %let fpath1=%sysfunc(pathname(&fname1));
%let fpath2=%sysfunc(pathname(&fname2)); %let fpath2=%sysfunc(pathname(&fname2));
%let fpath3=%sysfunc(pathname(&fname2)); %let fpath3=%sysfunc(pathname(&fname3));
/* compile the lua JSON module */ /* compile the lua JSON module */
%ml_json() %ml_json()
@@ -12520,6 +12520,7 @@ data _null_;
run; run;
filename &fname1 clear; filename &fname1 clear;
filename &fname2 clear; filename &fname2 clear;
filename &fname3 clear;
%mend; %mend;
/** /**
@file @file
@@ -12572,11 +12573,12 @@ filename &fname2 clear;
%mv_getjoblog(uri=&uri,outref=mylog) %mv_getjoblog(uri=&uri,outref=mylog)
This macro is used by the mv_jobwaitfor macro, which is generally a more This macro is used by the mv_jobwaitfor.sas macro, which is generally a more
convenient way to wait for the job to finish before fetching the log. convenient way to wait for the job to finish before fetching the log.
@param [in] access_token_var= The global macro variable to contain the access token @param [in] access_token_var= The global macro variable to contain the access token
@param [in] mdebug= set to 1 to enable DEBUG messages
@param [in] grant_type= valid values: @param [in] grant_type= valid values:
@li password @li password
@li authorization_code @li authorization_code
@@ -12604,6 +12606,7 @@ filename &fname2 clear;
,contextName=SAS Job Execution compute context ,contextName=SAS Job Execution compute context
,access_token_var=ACCESS_TOKEN ,access_token_var=ACCESS_TOKEN
,grant_type=sas_services ,grant_type=sas_services
,mdebug=0
); );
%local oauth_bearer; %local oauth_bearer;
%if &grant_type=detect %then %do; %if &grant_type=detect %then %do;
@@ -12660,7 +12663,7 @@ options noquotelenmax;
%local fname1; %local fname1;
%let fname1=%mf_getuniquefileref(); %let fname1=%mf_getuniquefileref();
proc http method='GET' out=&fname1 &oauth_bearer proc http method='GET' out=&fname1 &oauth_bearer
url="&base_uri&joburi"; url="&base_uri&uri";
headers headers
%if &grant_type=authorization_code %then %do; %if &grant_type=authorization_code %then %do;
"Authorization"="Bearer &&&access_token_var" "Authorization"="Bearer &&&access_token_var"
@@ -12679,7 +12682,7 @@ run;
%let fname3=%mf_getuniquefileref(); %let fname3=%mf_getuniquefileref();
%let fpath1=%sysfunc(pathname(&fname1)); %let fpath1=%sysfunc(pathname(&fname1));
%let fpath2=%sysfunc(pathname(&fname2)); %let fpath2=%sysfunc(pathname(&fname2));
%let fpath3=%sysfunc(pathname(&fname2)); %let fpath3=%sysfunc(pathname(&fname3));
/* compile the lua JSON module */ /* compile the lua JSON module */
%ml_json() %ml_json()
@@ -12699,15 +12702,94 @@ data _null_;
run; run;
%inc "&fpath3..lua"; %inc "&fpath3..lua";
/* get log path*/ /* get log path*/
%let errflg=1;
%let errmsg=No entry in &fname2 fileref;
data _null_; data _null_;
infile &fname2; infile &fname2;
input; input;
call symputx('logloc',_infile_,'l'); uri=_infile_;
if length(uri)<12 then do;
call symputx('errflg',1);
call symputx('errmsg',"URI is invalid (too short) - '&uri'",'l');
end;
if scan(uri,1) ne 'files' or scan(uri,2) ne 'files' then do;
call symputx('errflg',1);
call symputx('errmsg',
"URI should be in format /files/files/$$$$UUID$$$$"
!!" but is actually like: &uri",'l');
end;
call symputx('errflg',0,'l');
call symputx('logloc',uri,'l');
run; run;
%put &=logloc;
filename &fname1 clear; %mp_abort(iftrue=(&errflg=1)
filename &fname2 clear; ,mac=&sysmacroname
%mend;/** ,msg=%str(&errmsg)
)
/* we have a log uri - now fetch the log */
proc http method='GET' out=&fname1 &oauth_bearer
url="&base_uri&logloc/content";
headers
%if &grant_type=authorization_code %then %do;
"Authorization"="Bearer &&&access_token_var"
%end;
;
run;
%if &SYS_PROCHTTP_STATUS_CODE ne 200 and &SYS_PROCHTTP_STATUS_CODE ne 201 %then
%do;
data _null_;infile &fname1;input;putlog _infile_;run;
%mp_abort(mac=&sysmacroname
,msg=%str(logfetch: &SYS_PROCHTTP_STATUS_CODE &SYS_PROCHTTP_STATUS_PHRASE)
)
%end;
data _null_;
file "&fpath3..lua";
put '
infile = io.open (sas.symget("fpath1"), "r")
outfile = io.open (sas.symget("fpath2"), "w")
io.input(infile)
local resp=json.decode(io.read())
for i, v in pairs(resp["items"]) do
outfile:write(v.line,"\n")
end
io.close(infile)
io.close(outfile)
';
run;
%inc "&fpath3..lua";
/* write log out to the specified fileref */
data _null_;
infile &fname2 end=last;
file &outref mod;
if _n_=1 then do;
put "/** SASJS Viya Job Log Extract start: &uri **/";
end;
input;
put _infile_;
%if &mdebug=0 %then %do;
putlog _infile_;
%end;
if last then do;
put "/** SASJS Viya Job Log Extract end: &uri **/";
end;
run;
%if &mdebug=0 %then %do;
filename &fname1 clear;
filename &fname2 clear;
filename &fname3 clear;
%end;
%else %do;
%put _local_;
%end;
%mend;
/**
@file @file
@brief Extract the status from a running SAS Viya job @brief Extract the status from a running SAS Viya job
@details Extracts the status from a running job and appends it to an output @details Extracts the status from a running job and appends it to an output
@@ -13296,18 +13378,19 @@ libname &libref;
## Input table (minimum variables needed) ## Input table (minimum variables needed)
@li FLOW_ID - Numeric value, provides sequential ordering capability
@li _CONTEXTNAME - Dictates which context should be used to run the job. If
blank, will default to `SAS Job Execution compute context`.
@li _PROGRAM - Provides the path to the job itself @li _PROGRAM - Provides the path to the job itself
@li FLOW_ID - Numeric value, provides sequential ordering capability. Is
optional, will default to 0 if not provided.
@li _CONTEXTNAME - Dictates which context should be used to run the job. If
blank (or not provided), will default to `SAS Job Execution compute context`.
Any additional variables provided in this table are converted into macro Any additional variables provided in this table are converted into macro
variables and passed into the relevant job. variables and passed into the relevant job.
| FLOW_ID| _CONTEXTNAME |_PROGRAM| |_PROGRAM| FLOW_ID (optional)| _CONTEXTNAME (optional) |
|---|---|---| |---|---|---|
|0|SAS Job Execution compute context|/Public/jobs/somejob1| |/Public/jobs/somejob1|0|SAS Job Execution compute context|
|0|SAS Job Execution compute context|/Public/jobs/somejob2| |/Public/jobs/somejob2|0|SAS Job Execution compute context|
## Output table (minimum variables produced) ## Output table (minimum variables produced)
@@ -13368,7 +13451,16 @@ libname &libref;
Trigger the flow Trigger the flow
%mv_jobflow(inds=work.inputjobs,outds=work.results,maxconcurrency=4) %mv_jobflow(inds=work.inputjobs
,maxconcurrency=4
,outds=work.results
,outref=myjoblog
)
data _null_;
infile myjoblog;
input; put _infile_;
run;
@param [in] access_token_var= The global macro variable to contain the access token @param [in] access_token_var= The global macro variable to contain the access token
@@ -13381,6 +13473,7 @@ libname &libref;
@param [in] inds= The input dataset containing a list of jobs and parameters @param [in] inds= The input dataset containing a list of jobs and parameters
@param [in] maxconcurrency= The max number of parallel jobs to run. Default=8. @param [in] maxconcurrency= The max number of parallel jobs to run. Default=8.
@param [out] outds= The output dataset containing the results @param [out] outds= The output dataset containing the results
@param [out] outref= The output fileref to which to append the log file(s).
@version VIYA V.03.05 @version VIYA V.03.05
@author Allan Bowe, source: https://github.com/sasjs/core @author Allan Bowe, source: https://github.com/sasjs/core
@@ -13400,6 +13493,7 @@ libname &libref;
,maxconcurrency=8 ,maxconcurrency=8
,access_token_var=ACCESS_TOKEN ,access_token_var=ACCESS_TOKEN
,grant_type=sas_services ,grant_type=sas_services
,outref=0
); );
%local oauth_bearer; %local oauth_bearer;
%if &grant_type=detect %then %do; %if &grant_type=detect %then %do;
@@ -13422,16 +13516,29 @@ libname &libref;
,mac=&sysmacroname ,mac=&sysmacroname
,msg=%str(Input dataset was not provided) ,msg=%str(Input dataset was not provided)
) )
%mp_abort(iftrue=(%mf_existVarList(&inds,_CONTEXTNAME FLOW_ID _PROGRAM)=0) %mp_abort(iftrue=(%mf_existVarList(&inds,_PROGRAM)=0)
,mac=&sysmacroname ,mac=&sysmacroname
,msg=%str(The following columns must exist on input dataset &inds: ,msg=%str(The _PROGRAM column must exist on input dataset &inds)
_CONTEXTNAME FLOW_ID _PROGRAM)
) )
%mp_abort(iftrue=(&maxconcurrency<1) %mp_abort(iftrue=(&maxconcurrency<1)
,mac=&sysmacroname ,mac=&sysmacroname
,msg=%str(The maxconcurrency variable should be a positive integer) ,msg=%str(The maxconcurrency variable should be a positive integer)
) )
/* set defaults if not provided */
%if %mf_existVarList(&inds,_CONTEXTNAME FLOW_ID)=0 %then %do;
data &inds;
%if %mf_existvarList(&inds,_CONTEXTNAME)=0 %then %do;
length _CONTEXTNAME $128;
retain _CONTEXTNAME "SAS Job Execution compute context";
%end;
%if %mf_existvarList(&inds,FLOW_ID)=0 %then %do;
retain FLOW_ID 0;
%end;
set &inds;
run;
%end;
%local missings; %local missings;
proc sql noprint; proc sql noprint;
select count(*) into: missings select count(*) into: missings
@@ -13556,7 +13663,7 @@ data;run;%let jdswaitfor=&syslast;
%end; %end;
%if &jid=&jcnt %then %do; %if &jid=&jcnt %then %do;
/* we are at the end of the loop - time to see which jobs have finished */ /* we are at the end of the loop - time to see which jobs have finished */
%mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor) %mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor,outref=&outref)
%local done; %local done;
%let done=%mf_nobs(&jdswaitfor); %let done=%mf_nobs(&jdswaitfor);
%if &done>0 %then %do; %if &done>0 %then %do;
@@ -13661,7 +13768,7 @@ data;run;%let jdswaitfor=&syslast;
should be in a `_program` variable. should be in a `_program` variable.
@param [out] outds= The output dataset containing the list of states by job @param [out] outds= The output dataset containing the list of states by job
(default=work.mv_jobexecute) (default=work.mv_jobexecute)
@param [out] outref= A fileref to which the spawned job logs should be appended.
@version VIYA V.03.04 @version VIYA V.03.04
@author Allan Bowe, source: https://github.com/sasjs/core @author Allan Bowe, source: https://github.com/sasjs/core
@@ -13672,6 +13779,7 @@ data;run;%let jdswaitfor=&syslast;
@li mf_getuniquefileref.sas @li mf_getuniquefileref.sas
@li mf_existvar.sas @li mf_existvar.sas
@li mf_nobs.sas @li mf_nobs.sas
@li mv_getjoblog.sas
**/ **/
@@ -13680,6 +13788,7 @@ data;run;%let jdswaitfor=&syslast;
,grant_type=sas_services ,grant_type=sas_services
,inds=0 ,inds=0
,outds=work.mv_jobwaitfor ,outds=work.mv_jobwaitfor
,outref=0
); );
%local oauth_bearer; %local oauth_bearer;
%if &grant_type=detect %then %do; %if &grant_type=detect %then %do;
@@ -13723,7 +13832,7 @@ options noquotelenmax;
data _null_; data _null_;
length jobparams $32767; length jobparams $32767;
set &inds end=last; set &inds end=last;
call symputx(cats('joburi',_n_),uri,'l'); call symputx(cats('joburi',_n_),substr(uri,1,55)!!'/state','l');
call symputx(cats('jobname',_n_),_program,'l'); call symputx(cats('jobname',_n_),_program,'l');
call symputx(cats('jobparams',_n_),jobparams,'l'); call symputx(cats('jobparams',_n_),jobparams,'l');
if last then call symputx('uricnt',_n_,'l'); if last then call symputx('uricnt',_n_,'l');
@@ -13767,14 +13876,20 @@ run;
run; run;
%if &status=completed or &status=failed or &status=canceled %then %do; %if &status=completed or &status=failed or &status=canceled %then %do;
%local plainuri;
%let plainuri=%substr(&&joburi&i,1,55);
proc sql; proc sql;
insert into &outds set insert into &outds set
_program="&&jobname&i", _program="&&jobname&i",
uri="&&joburi&i", uri="&plainuri",
state="&status", state="&status",
timestamp=datetime(), timestamp=datetime(),
jobparams=symget("jobparams&i"); jobparams=symget("jobparams&i");
%let joburi&i=0; /* do not re-check */ %let joburi&i=0; /* do not re-check */
/* fetch log */
%if %str(&outref) ne 0 %then %do;
%mv_getjoblog(uri=&plainuri,outref=&outref)
%end;
%end; %end;
%else %if &status=idle or &status=pending or &status=running %then %do; %else %if &status=idle or &status=pending or &status=running %then %do;
data _null_; data _null_;

View File

@@ -114,10 +114,10 @@ run;
%let fname3=%mf_getuniquefileref(); %let fname3=%mf_getuniquefileref();
%let fpath1=%sysfunc(pathname(&fname1)); %let fpath1=%sysfunc(pathname(&fname1));
%let fpath2=%sysfunc(pathname(&fname2)); %let fpath2=%sysfunc(pathname(&fname2));
%let fpath3=%sysfunc(pathname(&fname2)); %let fpath3=%sysfunc(pathname(&fname3));
/* compile the lua JSON module */ /* compile the lua JSON module */
%ml_json() %ml_json()
/* read using LUA - this allows the code to be of any length */ /* read using LUA - this allows the code to be of any length */
data _null_; data _null_;
file "&fpath3..lua"; file "&fpath3..lua";
@@ -147,4 +147,5 @@ data _null_;
run; run;
filename &fname1 clear; filename &fname1 clear;
filename &fname2 clear; filename &fname2 clear;
filename &fname3 clear;
%mend; %mend;

267
viya/mv_getjoblog.sas Normal file
View File

@@ -0,0 +1,267 @@
/**
@file
@brief Extract the log from a completed SAS Viya Job
@details Extracts log from a Viya job and writes it out to a fileref
To query the job, you need the URI. Sample code for achieving this
is provided below.
## Example
First, compile the macros:
filename mc url "https://raw.githubusercontent.com/sasjs/core/main/all.sas";
%inc mc;
Next, create a job (in this case, a web service):
filename ft15f001 temp;
parmcards4;
data ;
rand=ranuni(0)*1000;
do x=1 to rand;
y=rand*4;
output;
end;
run;
proc sort data=&syslast
by descending y;
run;
;;;;
%mv_createwebservice(path=/Public/temp,name=demo)
Execute it:
%mv_jobexecute(path=/Public/temp
,name=demo
,outds=work.info
)
Wait for it to finish, and grab the uri:
data _null_;
set work.info;
if method='GET' and rel='self';
call symputx('uri',uri);
run;
Finally, fetch the log:
%mv_getjoblog(uri=&uri,outref=mylog)
This macro is used by the mv_jobwaitfor.sas macro, which is generally a more
convenient way to wait for the job to finish before fetching the log.
@param [in] access_token_var= The global macro variable to contain the access token
@param [in] mdebug= set to 1 to enable DEBUG messages
@param [in] grant_type= valid values:
@li password
@li authorization_code
@li detect - will check if access_token exists, if not will use sas_services if
a SASStudioV session else authorization_code. Default option.
@li sas_services - will use oauth_bearer=sas_services.
@param [in] uri= The uri of the running job for which to fetch the status,
in the format `/jobExecution/jobs/$UUID/state` (unquoted).
@param [out] outref= The output fileref to which to APPEND the log (is always
appended).
@version VIYA V.03.04
@author Allan Bowe, source: https://github.com/sasjs/core
<h4> SAS Macros </h4>
@li mp_abort.sas
@li mf_getplatform.sas
@li mf_existfileref.sas
@li ml_json.sas
**/
%macro mv_getjoblog(uri=0,outref=0
,contextName=SAS Job Execution compute context
,access_token_var=ACCESS_TOKEN
,grant_type=sas_services
,mdebug=0
);
%local oauth_bearer;
%if &grant_type=detect %then %do;
%if %symexist(&access_token_var) %then %let grant_type=authorization_code;
%else %let grant_type=sas_services;
%end;
%if &grant_type=sas_services %then %do;
%let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=;
%end;
%put &sysmacroname: grant_type=&grant_type;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services
)
,mac=&sysmacroname
,msg=%str(Invalid value for grant_type: &grant_type)
)
/* validation in datastep for better character safety */
%local errmsg errflg;
data _null_;
uri=symget('uri');
if length(uri)<12 then do;
call symputx('errflg',1);
call symputx('errmsg',"URI is invalid (too short) - '&uri'",'l');
end;
if scan(uri,-1)='state' or scan(uri,1) ne 'jobExecution' then do;
call symputx('errflg',1);
call symputx('errmsg',
"URI should be in format /jobExecution/jobs/$$$$UUID$$$$"
!!" but is actually like: &uri",'l');
end;
run;
%mp_abort(iftrue=(&errflg=1)
,mac=&sysmacroname
,msg=%str(&errmsg)
)
%mp_abort(iftrue=(&outref=0)
,mac=&sysmacroname
,msg=%str(Output fileref should be provided)
)
%if %mf_existfileref(&outref) ne 1 %then %do;
filename &outref temp;
%end;
options noquotelenmax;
%local base_uri; /* location of rest apis */
%let base_uri=%mf_getplatform(VIYARESTAPI);
/* prepare request*/
%local fname1;
%let fname1=%mf_getuniquefileref();
proc http method='GET' out=&fname1 &oauth_bearer
url="&base_uri&uri";
headers
%if &grant_type=authorization_code %then %do;
"Authorization"="Bearer &&&access_token_var"
%end;
;
run;
%if &SYS_PROCHTTP_STATUS_CODE ne 200 and &SYS_PROCHTTP_STATUS_CODE ne 201 %then
%do;
data _null_;infile &fname1;input;putlog _infile_;run;
%mp_abort(mac=&sysmacroname
,msg=%str(&SYS_PROCHTTP_STATUS_CODE &SYS_PROCHTTP_STATUS_PHRASE)
)
%end;
%local fname2 fname3 fpath1 fpath2 fpath3;
%let fname2=%mf_getuniquefileref();
%let fname3=%mf_getuniquefileref();
%let fpath1=%sysfunc(pathname(&fname1));
%let fpath2=%sysfunc(pathname(&fname2));
%let fpath3=%sysfunc(pathname(&fname3));
/* compile the lua JSON module */
%ml_json()
/* read using LUA - this allows the code to be of any length */
data _null_;
file "&fpath3..lua";
put '
infile = io.open (sas.symget("fpath1"), "r")
outfile = io.open (sas.symget("fpath2"), "w")
io.input(infile)
local resp=json.decode(io.read())
local logloc=resp["logLocation"]
outfile:write(logloc)
io.close(infile)
io.close(outfile)
';
run;
%inc "&fpath3..lua";
/* get log path*/
%let errflg=1;
%let errmsg=No entry in &fname2 fileref;
data _null_;
infile &fname2;
input;
uri=_infile_;
if length(uri)<12 then do;
call symputx('errflg',1);
call symputx('errmsg',"URI is invalid (too short) - '&uri'",'l');
end;
if scan(uri,1) ne 'files' or scan(uri,2) ne 'files' then do;
call symputx('errflg',1);
call symputx('errmsg',
"URI should be in format /files/files/$$$$UUID$$$$"
!!" but is actually like: &uri",'l');
end;
call symputx('errflg',0,'l');
call symputx('logloc',uri,'l');
run;
%mp_abort(iftrue=(&errflg=1)
,mac=&sysmacroname
,msg=%str(&errmsg)
)
/* we have a log uri - now fetch the log */
proc http method='GET' out=&fname1 &oauth_bearer
url="&base_uri&logloc/content";
headers
%if &grant_type=authorization_code %then %do;
"Authorization"="Bearer &&&access_token_var"
%end;
;
run;
%if &SYS_PROCHTTP_STATUS_CODE ne 200 and &SYS_PROCHTTP_STATUS_CODE ne 201 %then
%do;
data _null_;infile &fname1;input;putlog _infile_;run;
%mp_abort(mac=&sysmacroname
,msg=%str(logfetch: &SYS_PROCHTTP_STATUS_CODE &SYS_PROCHTTP_STATUS_PHRASE)
)
%end;
data _null_;
file "&fpath3..lua";
put '
infile = io.open (sas.symget("fpath1"), "r")
outfile = io.open (sas.symget("fpath2"), "w")
io.input(infile)
local resp=json.decode(io.read())
for i, v in pairs(resp["items"]) do
outfile:write(v.line,"\n")
end
io.close(infile)
io.close(outfile)
';
run;
%inc "&fpath3..lua";
/* write log out to the specified fileref */
data _null_;
infile &fname2 end=last;
file &outref mod;
if _n_=1 then do;
put "/** SASJS Viya Job Log Extract start: &uri **/";
end;
input;
put _infile_;
%if &mdebug=0 %then %do;
putlog _infile_;
%end;
if last then do;
put "/** SASJS Viya Job Log Extract end: &uri **/";
end;
run;
%if &mdebug=0 %then %do;
filename &fname1 clear;
filename &fname2 clear;
filename &fname3 clear;
%end;
%else %do;
%put _local_;
%end;
%mend;

View File

@@ -9,18 +9,19 @@
## Input table (minimum variables needed) ## Input table (minimum variables needed)
@li FLOW_ID - Numeric value, provides sequential ordering capability
@li _CONTEXTNAME - Dictates which context should be used to run the job. If
blank, will default to `SAS Job Execution compute context`.
@li _PROGRAM - Provides the path to the job itself @li _PROGRAM - Provides the path to the job itself
@li FLOW_ID - Numeric value, provides sequential ordering capability. Is
optional, will default to 0 if not provided.
@li _CONTEXTNAME - Dictates which context should be used to run the job. If
blank (or not provided), will default to `SAS Job Execution compute context`.
Any additional variables provided in this table are converted into macro Any additional variables provided in this table are converted into macro
variables and passed into the relevant job. variables and passed into the relevant job.
| FLOW_ID| _CONTEXTNAME |_PROGRAM| |_PROGRAM| FLOW_ID (optional)| _CONTEXTNAME (optional) |
|---|---|---| |---|---|---|
|0|SAS Job Execution compute context|/Public/jobs/somejob1| |/Public/jobs/somejob1|0|SAS Job Execution compute context|
|0|SAS Job Execution compute context|/Public/jobs/somejob2| |/Public/jobs/somejob2|0|SAS Job Execution compute context|
## Output table (minimum variables produced) ## Output table (minimum variables produced)
@@ -81,7 +82,16 @@
Trigger the flow Trigger the flow
%mv_jobflow(inds=work.inputjobs,outds=work.results,maxconcurrency=4) %mv_jobflow(inds=work.inputjobs
,maxconcurrency=4
,outds=work.results
,outref=myjoblog
)
data _null_;
infile myjoblog;
input; put _infile_;
run;
@param [in] access_token_var= The global macro variable to contain the access token @param [in] access_token_var= The global macro variable to contain the access token
@@ -94,6 +104,7 @@
@param [in] inds= The input dataset containing a list of jobs and parameters @param [in] inds= The input dataset containing a list of jobs and parameters
@param [in] maxconcurrency= The max number of parallel jobs to run. Default=8. @param [in] maxconcurrency= The max number of parallel jobs to run. Default=8.
@param [out] outds= The output dataset containing the results @param [out] outds= The output dataset containing the results
@param [out] outref= The output fileref to which to append the log file(s).
@version VIYA V.03.05 @version VIYA V.03.05
@author Allan Bowe, source: https://github.com/sasjs/core @author Allan Bowe, source: https://github.com/sasjs/core
@@ -113,6 +124,7 @@
,maxconcurrency=8 ,maxconcurrency=8
,access_token_var=ACCESS_TOKEN ,access_token_var=ACCESS_TOKEN
,grant_type=sas_services ,grant_type=sas_services
,outref=0
); );
%local oauth_bearer; %local oauth_bearer;
%if &grant_type=detect %then %do; %if &grant_type=detect %then %do;
@@ -135,16 +147,29 @@
,mac=&sysmacroname ,mac=&sysmacroname
,msg=%str(Input dataset was not provided) ,msg=%str(Input dataset was not provided)
) )
%mp_abort(iftrue=(%mf_existVarList(&inds,_CONTEXTNAME FLOW_ID _PROGRAM)=0) %mp_abort(iftrue=(%mf_existVarList(&inds,_PROGRAM)=0)
,mac=&sysmacroname ,mac=&sysmacroname
,msg=%str(The following columns must exist on input dataset &inds: ,msg=%str(The _PROGRAM column must exist on input dataset &inds)
_CONTEXTNAME FLOW_ID _PROGRAM)
) )
%mp_abort(iftrue=(&maxconcurrency<1) %mp_abort(iftrue=(&maxconcurrency<1)
,mac=&sysmacroname ,mac=&sysmacroname
,msg=%str(The maxconcurrency variable should be a positive integer) ,msg=%str(The maxconcurrency variable should be a positive integer)
) )
/* set defaults if not provided */
%if %mf_existVarList(&inds,_CONTEXTNAME FLOW_ID)=0 %then %do;
data &inds;
%if %mf_existvarList(&inds,_CONTEXTNAME)=0 %then %do;
length _CONTEXTNAME $128;
retain _CONTEXTNAME "SAS Job Execution compute context";
%end;
%if %mf_existvarList(&inds,FLOW_ID)=0 %then %do;
retain FLOW_ID 0;
%end;
set &inds;
run;
%end;
%local missings; %local missings;
proc sql noprint; proc sql noprint;
select count(*) into: missings select count(*) into: missings
@@ -269,7 +294,7 @@ data;run;%let jdswaitfor=&syslast;
%end; %end;
%if &jid=&jcnt %then %do; %if &jid=&jcnt %then %do;
/* we are at the end of the loop - time to see which jobs have finished */ /* we are at the end of the loop - time to see which jobs have finished */
%mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor) %mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor,outref=&outref)
%local done; %local done;
%let done=%mf_nobs(&jdswaitfor); %let done=%mf_nobs(&jdswaitfor);
%if &done>0 %then %do; %if &done>0 %then %do;

View File

@@ -72,7 +72,7 @@
should be in a `_program` variable. should be in a `_program` variable.
@param [out] outds= The output dataset containing the list of states by job @param [out] outds= The output dataset containing the list of states by job
(default=work.mv_jobexecute) (default=work.mv_jobexecute)
@param [out] outref= A fileref to which the spawned job logs should be appended.
@version VIYA V.03.04 @version VIYA V.03.04
@author Allan Bowe, source: https://github.com/sasjs/core @author Allan Bowe, source: https://github.com/sasjs/core
@@ -83,6 +83,7 @@
@li mf_getuniquefileref.sas @li mf_getuniquefileref.sas
@li mf_existvar.sas @li mf_existvar.sas
@li mf_nobs.sas @li mf_nobs.sas
@li mv_getjoblog.sas
**/ **/
@@ -91,6 +92,7 @@
,grant_type=sas_services ,grant_type=sas_services
,inds=0 ,inds=0
,outds=work.mv_jobwaitfor ,outds=work.mv_jobwaitfor
,outref=0
); );
%local oauth_bearer; %local oauth_bearer;
%if &grant_type=detect %then %do; %if &grant_type=detect %then %do;
@@ -134,7 +136,7 @@ options noquotelenmax;
data _null_; data _null_;
length jobparams $32767; length jobparams $32767;
set &inds end=last; set &inds end=last;
call symputx(cats('joburi',_n_),uri,'l'); call symputx(cats('joburi',_n_),substr(uri,1,55)!!'/state','l');
call symputx(cats('jobname',_n_),_program,'l'); call symputx(cats('jobname',_n_),_program,'l');
call symputx(cats('jobparams',_n_),jobparams,'l'); call symputx(cats('jobparams',_n_),jobparams,'l');
if last then call symputx('uricnt',_n_,'l'); if last then call symputx('uricnt',_n_,'l');
@@ -178,14 +180,20 @@ run;
run; run;
%if &status=completed or &status=failed or &status=canceled %then %do; %if &status=completed or &status=failed or &status=canceled %then %do;
%local plainuri;
%let plainuri=%substr(&&joburi&i,1,55);
proc sql; proc sql;
insert into &outds set insert into &outds set
_program="&&jobname&i", _program="&&jobname&i",
uri="&&joburi&i", uri="&plainuri",
state="&status", state="&status",
timestamp=datetime(), timestamp=datetime(),
jobparams=symget("jobparams&i"); jobparams=symget("jobparams&i");
%let joburi&i=0; /* do not re-check */ %let joburi&i=0; /* do not re-check */
/* fetch log */
%if %str(&outref) ne 0 %then %do;
%mv_getjoblog(uri=&plainuri,outref=&outref)
%end;
%end; %end;
%else %if &status=idle or &status=pending or &status=running %then %do; %else %if &status=idle or &status=pending or &status=running %then %do;
data _null_; data _null_;