1
0
mirror of https://github.com/sasjs/core.git synced 2025-12-21 10:11:19 +00:00

Compare commits

...

4 Commits

6 changed files with 667 additions and 43 deletions

359
all.sas
View File

@@ -229,6 +229,32 @@ options noquotelenmax;
%mend; %mend;
/** @endcond *//** /** @endcond *//**
@file
@brief Checks whether a fileref exists
@details You can probably do without this macro as it is just a one liner.
Mainly it is here as a convenient way to remember the syntax!
@param fref the fileref to detect
@return output Returns 1 if found and 0 if not found. Note - it is possible
that the fileref is found, but the file does not (yet) exist. If you need
to test for this, you may as well use the fileref function directly.
@version 8
@author [Allan Bowe](https://www.linkedin.com/in/allanbowe/)
**/
%macro mf_existfileref(fref
)/*/STORE SOURCE*/;
%if %sysfunc(fileref(&fref))=0 %then %do;
1
%end;
%else %do;
0
%end;
%mend;/**
@file @file
@brief Checks if a variable exists in a data set. @brief Checks if a variable exists in a data set.
@details Returns 0 if the variable does NOT exist, and return the position of @details Returns 0 if the variable does NOT exist, and return the position of
@@ -12461,10 +12487,10 @@ run;
%let fname3=%mf_getuniquefileref(); %let fname3=%mf_getuniquefileref();
%let fpath1=%sysfunc(pathname(&fname1)); %let fpath1=%sysfunc(pathname(&fname1));
%let fpath2=%sysfunc(pathname(&fname2)); %let fpath2=%sysfunc(pathname(&fname2));
%let fpath3=%sysfunc(pathname(&fname2)); %let fpath3=%sysfunc(pathname(&fname3));
/* compile the lua JSON module */ /* compile the lua JSON module */
%ml_json() %ml_json()
/* read using LUA - this allows the code to be of any length */ /* read using LUA - this allows the code to be of any length */
data _null_; data _null_;
file "&fpath3..lua"; file "&fpath3..lua";
@@ -12494,7 +12520,275 @@ data _null_;
run; run;
filename &fname1 clear; filename &fname1 clear;
filename &fname2 clear; filename &fname2 clear;
filename &fname3 clear;
%mend; %mend;
/**
@file
@brief Extract the log from a completed SAS Viya Job
@details Extracts log from a Viya job and writes it out to a fileref
To query the job, you need the URI. Sample code for achieving this
is provided below.
## Example
First, compile the macros:
filename mc url "https://raw.githubusercontent.com/sasjs/core/main/all.sas";
%inc mc;
Next, create a job (in this case, a web service):
filename ft15f001 temp;
parmcards4;
data ;
rand=ranuni(0)*1000;
do x=1 to rand;
y=rand*4;
output;
end;
run;
proc sort data=&syslast
by descending y;
run;
;;;;
%mv_createwebservice(path=/Public/temp,name=demo)
Execute it:
%mv_jobexecute(path=/Public/temp
,name=demo
,outds=work.info
)
Wait for it to finish, and grab the uri:
data _null_;
set work.info;
if method='GET' and rel='self';
call symputx('uri',uri);
run;
Finally, fetch the log:
%mv_getjoblog(uri=&uri,outref=mylog)
This macro is used by the mv_jobwaitfor.sas macro, which is generally a more
convenient way to wait for the job to finish before fetching the log.
@param [in] access_token_var= The global macro variable to contain the access token
@param [in] mdebug= set to 1 to enable DEBUG messages
@param [in] grant_type= valid values:
@li password
@li authorization_code
@li detect - will check if access_token exists, if not will use sas_services if
a SASStudioV session else authorization_code. Default option.
@li sas_services - will use oauth_bearer=sas_services.
@param [in] uri= The uri of the running job for which to fetch the status,
in the format `/jobExecution/jobs/$UUID/state` (unquoted).
@param [out] outref= The output fileref to which to APPEND the log (is always
appended).
@version VIYA V.03.04
@author Allan Bowe, source: https://github.com/sasjs/core
<h4> SAS Macros </h4>
@li mp_abort.sas
@li mf_getplatform.sas
@li mf_existfileref.sas
@li ml_json.sas
**/
%macro mv_getjoblog(uri=0,outref=0
,contextName=SAS Job Execution compute context
,access_token_var=ACCESS_TOKEN
,grant_type=sas_services
,mdebug=0
);
%local oauth_bearer;
%if &grant_type=detect %then %do;
%if %symexist(&access_token_var) %then %let grant_type=authorization_code;
%else %let grant_type=sas_services;
%end;
%if &grant_type=sas_services %then %do;
%let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=;
%end;
%put &sysmacroname: grant_type=&grant_type;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services
)
,mac=&sysmacroname
,msg=%str(Invalid value for grant_type: &grant_type)
)
/* validation in datastep for better character safety */
%local errmsg errflg;
data _null_;
uri=symget('uri');
if length(uri)<12 then do;
call symputx('errflg',1);
call symputx('errmsg',"URI is invalid (too short) - '&uri'",'l');
end;
if scan(uri,-1)='state' or scan(uri,1) ne 'jobExecution' then do;
call symputx('errflg',1);
call symputx('errmsg',
"URI should be in format /jobExecution/jobs/$$$$UUID$$$$"
!!" but is actually like: &uri",'l');
end;
run;
%mp_abort(iftrue=(&errflg=1)
,mac=&sysmacroname
,msg=%str(&errmsg)
)
%mp_abort(iftrue=(&outref=0)
,mac=&sysmacroname
,msg=%str(Output fileref should be provided)
)
%if %mf_existfileref(&outref) ne 1 %then %do;
filename &outref temp;
%end;
options noquotelenmax;
%local base_uri; /* location of rest apis */
%let base_uri=%mf_getplatform(VIYARESTAPI);
/* prepare request*/
%local fname1;
%let fname1=%mf_getuniquefileref();
proc http method='GET' out=&fname1 &oauth_bearer
url="&base_uri&uri";
headers
%if &grant_type=authorization_code %then %do;
"Authorization"="Bearer &&&access_token_var"
%end;
;
run;
%if &SYS_PROCHTTP_STATUS_CODE ne 200 and &SYS_PROCHTTP_STATUS_CODE ne 201 %then
%do;
data _null_;infile &fname1;input;putlog _infile_;run;
%mp_abort(mac=&sysmacroname
,msg=%str(&SYS_PROCHTTP_STATUS_CODE &SYS_PROCHTTP_STATUS_PHRASE)
)
%end;
%local fname2 fname3 fpath1 fpath2 fpath3;
%let fname2=%mf_getuniquefileref();
%let fname3=%mf_getuniquefileref();
%let fpath1=%sysfunc(pathname(&fname1));
%let fpath2=%sysfunc(pathname(&fname2));
%let fpath3=%sysfunc(pathname(&fname3));
/* compile the lua JSON module */
%ml_json()
/* read using LUA - this allows the code to be of any length */
data _null_;
file "&fpath3..lua";
put '
infile = io.open (sas.symget("fpath1"), "r")
outfile = io.open (sas.symget("fpath2"), "w")
io.input(infile)
local resp=json.decode(io.read())
local logloc=resp["logLocation"]
outfile:write(logloc)
io.close(infile)
io.close(outfile)
';
run;
%inc "&fpath3..lua";
/* get log path*/
%let errflg=1;
%let errmsg=No entry in &fname2 fileref;
data _null_;
infile &fname2;
input;
uri=_infile_;
if length(uri)<12 then do;
call symputx('errflg',1);
call symputx('errmsg',"URI is invalid (too short) - '&uri'",'l');
end;
if scan(uri,1) ne 'files' or scan(uri,2) ne 'files' then do;
call symputx('errflg',1);
call symputx('errmsg',
"URI should be in format /files/files/$$$$UUID$$$$"
!!" but is actually like: &uri",'l');
end;
call symputx('errflg',0,'l');
call symputx('logloc',uri,'l');
run;
%mp_abort(iftrue=(&errflg=1)
,mac=&sysmacroname
,msg=%str(&errmsg)
)
/* we have a log uri - now fetch the log */
proc http method='GET' out=&fname1 &oauth_bearer
url="&base_uri&logloc/content";
headers
%if &grant_type=authorization_code %then %do;
"Authorization"="Bearer &&&access_token_var"
%end;
;
run;
%if &SYS_PROCHTTP_STATUS_CODE ne 200 and &SYS_PROCHTTP_STATUS_CODE ne 201 %then
%do;
data _null_;infile &fname1;input;putlog _infile_;run;
%mp_abort(mac=&sysmacroname
,msg=%str(logfetch: &SYS_PROCHTTP_STATUS_CODE &SYS_PROCHTTP_STATUS_PHRASE)
)
%end;
data _null_;
file "&fpath3..lua";
put '
infile = io.open (sas.symget("fpath1"), "r")
outfile = io.open (sas.symget("fpath2"), "w")
io.input(infile)
local resp=json.decode(io.read())
for i, v in pairs(resp["items"]) do
outfile:write(v.line,"\n")
end
io.close(infile)
io.close(outfile)
';
run;
%inc "&fpath3..lua";
/* write log out to the specified fileref */
data _null_;
infile &fname2 end=last;
file &outref mod;
if _n_=1 then do;
put "/** SASJS Viya Job Log Extract start: &uri **/";
end;
input;
put _infile_;
%if &mdebug=0 %then %do;
putlog _infile_;
%end;
if last then do;
put "/** SASJS Viya Job Log Extract end: &uri **/";
end;
run;
%if &mdebug=0 %then %do;
filename &fname1 clear;
filename &fname2 clear;
filename &fname3 clear;
%end;
%else %do;
%put _local_;
%end;
%mend;
/** /**
@file @file
@brief Extract the status from a running SAS Viya job @brief Extract the status from a running SAS Viya job
@@ -13084,18 +13378,19 @@ libname &libref;
## Input table (minimum variables needed) ## Input table (minimum variables needed)
@li FLOW_ID - Numeric value, provides sequential ordering capability
@li _CONTEXTNAME - Dictates which context should be used to run the job. If
blank, will default to `SAS Job Execution compute context`.
@li _PROGRAM - Provides the path to the job itself @li _PROGRAM - Provides the path to the job itself
@li FLOW_ID - Numeric value, provides sequential ordering capability. Is
optional, will default to 0 if not provided.
@li _CONTEXTNAME - Dictates which context should be used to run the job. If
blank (or not provided), will default to `SAS Job Execution compute context`.
Any additional variables provided in this table are converted into macro Any additional variables provided in this table are converted into macro
variables and passed into the relevant job. variables and passed into the relevant job.
| FLOW_ID| _CONTEXTNAME |_PROGRAM| |_PROGRAM| FLOW_ID (optional)| _CONTEXTNAME (optional) |
|---|---|---| |---|---|---|
|0|SAS Job Execution compute context|/Public/jobs/somejob1| |/Public/jobs/somejob1|0|SAS Job Execution compute context|
|0|SAS Job Execution compute context|/Public/jobs/somejob2| |/Public/jobs/somejob2|0|SAS Job Execution compute context|
## Output table (minimum variables produced) ## Output table (minimum variables produced)
@@ -13156,7 +13451,16 @@ libname &libref;
Trigger the flow Trigger the flow
%mv_jobflow(inds=work.inputjobs,outds=work.results,maxconcurrency=4) %mv_jobflow(inds=work.inputjobs
,maxconcurrency=4
,outds=work.results
,outref=myjoblog
)
data _null_;
infile myjoblog;
input; put _infile_;
run;
@param [in] access_token_var= The global macro variable to contain the access token @param [in] access_token_var= The global macro variable to contain the access token
@@ -13169,6 +13473,7 @@ libname &libref;
@param [in] inds= The input dataset containing a list of jobs and parameters @param [in] inds= The input dataset containing a list of jobs and parameters
@param [in] maxconcurrency= The max number of parallel jobs to run. Default=8. @param [in] maxconcurrency= The max number of parallel jobs to run. Default=8.
@param [out] outds= The output dataset containing the results @param [out] outds= The output dataset containing the results
@param [out] outref= The output fileref to which to append the log file(s).
@version VIYA V.03.05 @version VIYA V.03.05
@author Allan Bowe, source: https://github.com/sasjs/core @author Allan Bowe, source: https://github.com/sasjs/core
@@ -13188,6 +13493,7 @@ libname &libref;
,maxconcurrency=8 ,maxconcurrency=8
,access_token_var=ACCESS_TOKEN ,access_token_var=ACCESS_TOKEN
,grant_type=sas_services ,grant_type=sas_services
,outref=0
); );
%local oauth_bearer; %local oauth_bearer;
%if &grant_type=detect %then %do; %if &grant_type=detect %then %do;
@@ -13210,16 +13516,29 @@ libname &libref;
,mac=&sysmacroname ,mac=&sysmacroname
,msg=%str(Input dataset was not provided) ,msg=%str(Input dataset was not provided)
) )
%mp_abort(iftrue=(%mf_existVarList(&inds,_CONTEXTNAME FLOW_ID _PROGRAM)=0) %mp_abort(iftrue=(%mf_existVarList(&inds,_PROGRAM)=0)
,mac=&sysmacroname ,mac=&sysmacroname
,msg=%str(The following columns must exist on input dataset &inds: ,msg=%str(The _PROGRAM column must exist on input dataset &inds)
_CONTEXTNAME FLOW_ID _PROGRAM)
) )
%mp_abort(iftrue=(&maxconcurrency<1) %mp_abort(iftrue=(&maxconcurrency<1)
,mac=&sysmacroname ,mac=&sysmacroname
,msg=%str(The maxconcurrency variable should be a positive integer) ,msg=%str(The maxconcurrency variable should be a positive integer)
) )
/* set defaults if not provided */
%if %mf_existVarList(&inds,_CONTEXTNAME FLOW_ID)=0 %then %do;
data &inds;
%if %mf_existvarList(&inds,_CONTEXTNAME)=0 %then %do;
length _CONTEXTNAME $128;
retain _CONTEXTNAME "SAS Job Execution compute context";
%end;
%if %mf_existvarList(&inds,FLOW_ID)=0 %then %do;
retain FLOW_ID 0;
%end;
set &inds;
run;
%end;
%local missings; %local missings;
proc sql noprint; proc sql noprint;
select count(*) into: missings select count(*) into: missings
@@ -13344,7 +13663,7 @@ data;run;%let jdswaitfor=&syslast;
%end; %end;
%if &jid=&jcnt %then %do; %if &jid=&jcnt %then %do;
/* we are at the end of the loop - time to see which jobs have finished */ /* we are at the end of the loop - time to see which jobs have finished */
%mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor) %mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor,outref=&outref)
%local done; %local done;
%let done=%mf_nobs(&jdswaitfor); %let done=%mf_nobs(&jdswaitfor);
%if &done>0 %then %do; %if &done>0 %then %do;
@@ -13449,7 +13768,7 @@ data;run;%let jdswaitfor=&syslast;
should be in a `_program` variable. should be in a `_program` variable.
@param [out] outds= The output dataset containing the list of states by job @param [out] outds= The output dataset containing the list of states by job
(default=work.mv_jobexecute) (default=work.mv_jobexecute)
@param [out] outref= A fileref to which the spawned job logs should be appended.
@version VIYA V.03.04 @version VIYA V.03.04
@author Allan Bowe, source: https://github.com/sasjs/core @author Allan Bowe, source: https://github.com/sasjs/core
@@ -13460,6 +13779,7 @@ data;run;%let jdswaitfor=&syslast;
@li mf_getuniquefileref.sas @li mf_getuniquefileref.sas
@li mf_existvar.sas @li mf_existvar.sas
@li mf_nobs.sas @li mf_nobs.sas
@li mv_getjoblog.sas
**/ **/
@@ -13468,6 +13788,7 @@ data;run;%let jdswaitfor=&syslast;
,grant_type=sas_services ,grant_type=sas_services
,inds=0 ,inds=0
,outds=work.mv_jobwaitfor ,outds=work.mv_jobwaitfor
,outref=0
); );
%local oauth_bearer; %local oauth_bearer;
%if &grant_type=detect %then %do; %if &grant_type=detect %then %do;
@@ -13511,7 +13832,7 @@ options noquotelenmax;
data _null_; data _null_;
length jobparams $32767; length jobparams $32767;
set &inds end=last; set &inds end=last;
call symputx(cats('joburi',_n_),uri,'l'); call symputx(cats('joburi',_n_),substr(uri,1,55)!!'/state','l');
call symputx(cats('jobname',_n_),_program,'l'); call symputx(cats('jobname',_n_),_program,'l');
call symputx(cats('jobparams',_n_),jobparams,'l'); call symputx(cats('jobparams',_n_),jobparams,'l');
if last then call symputx('uricnt',_n_,'l'); if last then call symputx('uricnt',_n_,'l');
@@ -13555,14 +13876,20 @@ run;
run; run;
%if &status=completed or &status=failed or &status=canceled %then %do; %if &status=completed or &status=failed or &status=canceled %then %do;
%local plainuri;
%let plainuri=%substr(&&joburi&i,1,55);
proc sql; proc sql;
insert into &outds set insert into &outds set
_program="&&jobname&i", _program="&&jobname&i",
uri="&&joburi&i", uri="&plainuri",
state="&status", state="&status",
timestamp=datetime(), timestamp=datetime(),
jobparams=symget("jobparams&i"); jobparams=symget("jobparams&i");
%let joburi&i=0; /* do not re-check */ %let joburi&i=0; /* do not re-check */
/* fetch log */
%if %str(&outref) ne 0 %then %do;
%mv_getjoblog(uri=&plainuri,outref=&outref)
%end;
%end; %end;
%else %if &status=idle or &status=pending or &status=running %then %do; %else %if &status=idle or &status=pending or &status=running %then %do;
data _null_; data _null_;

View File

@@ -4,12 +4,11 @@
@details You can probably do without this macro as it is just a one liner. @details You can probably do without this macro as it is just a one liner.
Mainly it is here as a convenient way to remember the syntax! Mainly it is here as a convenient way to remember the syntax!
For this macro, if the fileref exists but the underlying file does not exist
@param fref the fileref to detect @param fref the fileref to detect
@return output returns 1 if found AND the file exists. 0 is returned if not @return output Returns 1 if found and 0 if not found. Note - it is possible
found, and -1 is returned if the fileref is found but the file does not exist. that the fileref is found, but the file does not (yet) exist. If you need
to test for this, you may as well use the fileref function directly.
@version 8 @version 8
@author [Allan Bowe](https://www.linkedin.com/in/allanbowe/) @author [Allan Bowe](https://www.linkedin.com/in/allanbowe/)
@@ -17,15 +16,12 @@
%macro mf_existfileref(fref %macro mf_existfileref(fref
)/*/STORE SOURCE*/; )/*/STORE SOURCE*/;
%local result;
%let result=%sysfunc(fileref(&fref)); %if %sysfunc(fileref(&fref))=0 %then %do;
%if &result>0 %then %do;
0
%end;
%else %if &result=0 %then %do;
1 1
%end; %end;
%else %do; %else %do;
-1 0
%end; %end;
%mend; %mend;

View File

@@ -114,10 +114,10 @@ run;
%let fname3=%mf_getuniquefileref(); %let fname3=%mf_getuniquefileref();
%let fpath1=%sysfunc(pathname(&fname1)); %let fpath1=%sysfunc(pathname(&fname1));
%let fpath2=%sysfunc(pathname(&fname2)); %let fpath2=%sysfunc(pathname(&fname2));
%let fpath3=%sysfunc(pathname(&fname2)); %let fpath3=%sysfunc(pathname(&fname3));
/* compile the lua JSON module */ /* compile the lua JSON module */
%ml_json() %ml_json()
/* read using LUA - this allows the code to be of any length */ /* read using LUA - this allows the code to be of any length */
data _null_; data _null_;
file "&fpath3..lua"; file "&fpath3..lua";
@@ -147,4 +147,5 @@ data _null_;
run; run;
filename &fname1 clear; filename &fname1 clear;
filename &fname2 clear; filename &fname2 clear;
filename &fname3 clear;
%mend; %mend;

267
viya/mv_getjoblog.sas Normal file
View File

@@ -0,0 +1,267 @@
/**
@file
@brief Extract the log from a completed SAS Viya Job
@details Extracts log from a Viya job and writes it out to a fileref
To query the job, you need the URI. Sample code for achieving this
is provided below.
## Example
First, compile the macros:
filename mc url "https://raw.githubusercontent.com/sasjs/core/main/all.sas";
%inc mc;
Next, create a job (in this case, a web service):
filename ft15f001 temp;
parmcards4;
data ;
rand=ranuni(0)*1000;
do x=1 to rand;
y=rand*4;
output;
end;
run;
proc sort data=&syslast
by descending y;
run;
;;;;
%mv_createwebservice(path=/Public/temp,name=demo)
Execute it:
%mv_jobexecute(path=/Public/temp
,name=demo
,outds=work.info
)
Wait for it to finish, and grab the uri:
data _null_;
set work.info;
if method='GET' and rel='self';
call symputx('uri',uri);
run;
Finally, fetch the log:
%mv_getjoblog(uri=&uri,outref=mylog)
This macro is used by the mv_jobwaitfor.sas macro, which is generally a more
convenient way to wait for the job to finish before fetching the log.
@param [in] access_token_var= The global macro variable to contain the access token
@param [in] mdebug= set to 1 to enable DEBUG messages
@param [in] grant_type= valid values:
@li password
@li authorization_code
@li detect - will check if access_token exists, if not will use sas_services if
a SASStudioV session else authorization_code. Default option.
@li sas_services - will use oauth_bearer=sas_services.
@param [in] uri= The uri of the running job for which to fetch the status,
in the format `/jobExecution/jobs/$UUID/state` (unquoted).
@param [out] outref= The output fileref to which to APPEND the log (is always
appended).
@version VIYA V.03.04
@author Allan Bowe, source: https://github.com/sasjs/core
<h4> SAS Macros </h4>
@li mp_abort.sas
@li mf_getplatform.sas
@li mf_existfileref.sas
@li ml_json.sas
**/
%macro mv_getjoblog(uri=0,outref=0
,contextName=SAS Job Execution compute context
,access_token_var=ACCESS_TOKEN
,grant_type=sas_services
,mdebug=0
);
%local oauth_bearer;
%if &grant_type=detect %then %do;
%if %symexist(&access_token_var) %then %let grant_type=authorization_code;
%else %let grant_type=sas_services;
%end;
%if &grant_type=sas_services %then %do;
%let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=;
%end;
%put &sysmacroname: grant_type=&grant_type;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services
)
,mac=&sysmacroname
,msg=%str(Invalid value for grant_type: &grant_type)
)
/* validation in datastep for better character safety */
%local errmsg errflg;
data _null_;
uri=symget('uri');
if length(uri)<12 then do;
call symputx('errflg',1);
call symputx('errmsg',"URI is invalid (too short) - '&uri'",'l');
end;
if scan(uri,-1)='state' or scan(uri,1) ne 'jobExecution' then do;
call symputx('errflg',1);
call symputx('errmsg',
"URI should be in format /jobExecution/jobs/$$$$UUID$$$$"
!!" but is actually like: &uri",'l');
end;
run;
%mp_abort(iftrue=(&errflg=1)
,mac=&sysmacroname
,msg=%str(&errmsg)
)
%mp_abort(iftrue=(&outref=0)
,mac=&sysmacroname
,msg=%str(Output fileref should be provided)
)
%if %mf_existfileref(&outref) ne 1 %then %do;
filename &outref temp;
%end;
options noquotelenmax;
%local base_uri; /* location of rest apis */
%let base_uri=%mf_getplatform(VIYARESTAPI);
/* prepare request*/
%local fname1;
%let fname1=%mf_getuniquefileref();
proc http method='GET' out=&fname1 &oauth_bearer
url="&base_uri&uri";
headers
%if &grant_type=authorization_code %then %do;
"Authorization"="Bearer &&&access_token_var"
%end;
;
run;
%if &SYS_PROCHTTP_STATUS_CODE ne 200 and &SYS_PROCHTTP_STATUS_CODE ne 201 %then
%do;
data _null_;infile &fname1;input;putlog _infile_;run;
%mp_abort(mac=&sysmacroname
,msg=%str(&SYS_PROCHTTP_STATUS_CODE &SYS_PROCHTTP_STATUS_PHRASE)
)
%end;
%local fname2 fname3 fpath1 fpath2 fpath3;
%let fname2=%mf_getuniquefileref();
%let fname3=%mf_getuniquefileref();
%let fpath1=%sysfunc(pathname(&fname1));
%let fpath2=%sysfunc(pathname(&fname2));
%let fpath3=%sysfunc(pathname(&fname3));
/* compile the lua JSON module */
%ml_json()
/* read using LUA - this allows the code to be of any length */
data _null_;
file "&fpath3..lua";
put '
infile = io.open (sas.symget("fpath1"), "r")
outfile = io.open (sas.symget("fpath2"), "w")
io.input(infile)
local resp=json.decode(io.read())
local logloc=resp["logLocation"]
outfile:write(logloc)
io.close(infile)
io.close(outfile)
';
run;
%inc "&fpath3..lua";
/* get log path*/
%let errflg=1;
%let errmsg=No entry in &fname2 fileref;
data _null_;
infile &fname2;
input;
uri=_infile_;
if length(uri)<12 then do;
call symputx('errflg',1);
call symputx('errmsg',"URI is invalid (too short) - '&uri'",'l');
end;
if scan(uri,1) ne 'files' or scan(uri,2) ne 'files' then do;
call symputx('errflg',1);
call symputx('errmsg',
"URI should be in format /files/files/$$$$UUID$$$$"
!!" but is actually like: &uri",'l');
end;
call symputx('errflg',0,'l');
call symputx('logloc',uri,'l');
run;
%mp_abort(iftrue=(&errflg=1)
,mac=&sysmacroname
,msg=%str(&errmsg)
)
/* we have a log uri - now fetch the log */
proc http method='GET' out=&fname1 &oauth_bearer
url="&base_uri&logloc/content";
headers
%if &grant_type=authorization_code %then %do;
"Authorization"="Bearer &&&access_token_var"
%end;
;
run;
%if &SYS_PROCHTTP_STATUS_CODE ne 200 and &SYS_PROCHTTP_STATUS_CODE ne 201 %then
%do;
data _null_;infile &fname1;input;putlog _infile_;run;
%mp_abort(mac=&sysmacroname
,msg=%str(logfetch: &SYS_PROCHTTP_STATUS_CODE &SYS_PROCHTTP_STATUS_PHRASE)
)
%end;
data _null_;
file "&fpath3..lua";
put '
infile = io.open (sas.symget("fpath1"), "r")
outfile = io.open (sas.symget("fpath2"), "w")
io.input(infile)
local resp=json.decode(io.read())
for i, v in pairs(resp["items"]) do
outfile:write(v.line,"\n")
end
io.close(infile)
io.close(outfile)
';
run;
%inc "&fpath3..lua";
/* write log out to the specified fileref */
data _null_;
infile &fname2 end=last;
file &outref mod;
if _n_=1 then do;
put "/** SASJS Viya Job Log Extract start: &uri **/";
end;
input;
put _infile_;
%if &mdebug=0 %then %do;
putlog _infile_;
%end;
if last then do;
put "/** SASJS Viya Job Log Extract end: &uri **/";
end;
run;
%if &mdebug=0 %then %do;
filename &fname1 clear;
filename &fname2 clear;
filename &fname3 clear;
%end;
%else %do;
%put _local_;
%end;
%mend;

View File

@@ -9,18 +9,19 @@
## Input table (minimum variables needed) ## Input table (minimum variables needed)
@li FLOW_ID - Numeric value, provides sequential ordering capability
@li _CONTEXTNAME - Dictates which context should be used to run the job. If
blank, will default to `SAS Job Execution compute context`.
@li _PROGRAM - Provides the path to the job itself @li _PROGRAM - Provides the path to the job itself
@li FLOW_ID - Numeric value, provides sequential ordering capability. Is
optional, will default to 0 if not provided.
@li _CONTEXTNAME - Dictates which context should be used to run the job. If
blank (or not provided), will default to `SAS Job Execution compute context`.
Any additional variables provided in this table are converted into macro Any additional variables provided in this table are converted into macro
variables and passed into the relevant job. variables and passed into the relevant job.
| FLOW_ID| _CONTEXTNAME |_PROGRAM| |_PROGRAM| FLOW_ID (optional)| _CONTEXTNAME (optional) |
|---|---|---| |---|---|---|
|0|SAS Job Execution compute context|/Public/jobs/somejob1| |/Public/jobs/somejob1|0|SAS Job Execution compute context|
|0|SAS Job Execution compute context|/Public/jobs/somejob2| |/Public/jobs/somejob2|0|SAS Job Execution compute context|
## Output table (minimum variables produced) ## Output table (minimum variables produced)
@@ -81,7 +82,16 @@
Trigger the flow Trigger the flow
%mv_jobflow(inds=work.inputjobs,outds=work.results,maxconcurrency=4) %mv_jobflow(inds=work.inputjobs
,maxconcurrency=4
,outds=work.results
,outref=myjoblog
)
data _null_;
infile myjoblog;
input; put _infile_;
run;
@param [in] access_token_var= The global macro variable to contain the access token @param [in] access_token_var= The global macro variable to contain the access token
@@ -94,6 +104,7 @@
@param [in] inds= The input dataset containing a list of jobs and parameters @param [in] inds= The input dataset containing a list of jobs and parameters
@param [in] maxconcurrency= The max number of parallel jobs to run. Default=8. @param [in] maxconcurrency= The max number of parallel jobs to run. Default=8.
@param [out] outds= The output dataset containing the results @param [out] outds= The output dataset containing the results
@param [out] outref= The output fileref to which to append the log file(s).
@version VIYA V.03.05 @version VIYA V.03.05
@author Allan Bowe, source: https://github.com/sasjs/core @author Allan Bowe, source: https://github.com/sasjs/core
@@ -113,6 +124,7 @@
,maxconcurrency=8 ,maxconcurrency=8
,access_token_var=ACCESS_TOKEN ,access_token_var=ACCESS_TOKEN
,grant_type=sas_services ,grant_type=sas_services
,outref=0
); );
%local oauth_bearer; %local oauth_bearer;
%if &grant_type=detect %then %do; %if &grant_type=detect %then %do;
@@ -135,16 +147,29 @@
,mac=&sysmacroname ,mac=&sysmacroname
,msg=%str(Input dataset was not provided) ,msg=%str(Input dataset was not provided)
) )
%mp_abort(iftrue=(%mf_existVarList(&inds,_CONTEXTNAME FLOW_ID _PROGRAM)=0) %mp_abort(iftrue=(%mf_existVarList(&inds,_PROGRAM)=0)
,mac=&sysmacroname ,mac=&sysmacroname
,msg=%str(The following columns must exist on input dataset &inds: ,msg=%str(The _PROGRAM column must exist on input dataset &inds)
_CONTEXTNAME FLOW_ID _PROGRAM)
) )
%mp_abort(iftrue=(&maxconcurrency<1) %mp_abort(iftrue=(&maxconcurrency<1)
,mac=&sysmacroname ,mac=&sysmacroname
,msg=%str(The maxconcurrency variable should be a positive integer) ,msg=%str(The maxconcurrency variable should be a positive integer)
) )
/* set defaults if not provided */
%if %mf_existVarList(&inds,_CONTEXTNAME FLOW_ID)=0 %then %do;
data &inds;
%if %mf_existvarList(&inds,_CONTEXTNAME)=0 %then %do;
length _CONTEXTNAME $128;
retain _CONTEXTNAME "SAS Job Execution compute context";
%end;
%if %mf_existvarList(&inds,FLOW_ID)=0 %then %do;
retain FLOW_ID 0;
%end;
set &inds;
run;
%end;
%local missings; %local missings;
proc sql noprint; proc sql noprint;
select count(*) into: missings select count(*) into: missings
@@ -269,7 +294,7 @@ data;run;%let jdswaitfor=&syslast;
%end; %end;
%if &jid=&jcnt %then %do; %if &jid=&jcnt %then %do;
/* we are at the end of the loop - time to see which jobs have finished */ /* we are at the end of the loop - time to see which jobs have finished */
%mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor) %mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor,outref=&outref)
%local done; %local done;
%let done=%mf_nobs(&jdswaitfor); %let done=%mf_nobs(&jdswaitfor);
%if &done>0 %then %do; %if &done>0 %then %do;

View File

@@ -72,7 +72,7 @@
should be in a `_program` variable. should be in a `_program` variable.
@param [out] outds= The output dataset containing the list of states by job @param [out] outds= The output dataset containing the list of states by job
(default=work.mv_jobexecute) (default=work.mv_jobexecute)
@param [out] outref= A fileref to which the spawned job logs should be appended.
@version VIYA V.03.04 @version VIYA V.03.04
@author Allan Bowe, source: https://github.com/sasjs/core @author Allan Bowe, source: https://github.com/sasjs/core
@@ -83,6 +83,7 @@
@li mf_getuniquefileref.sas @li mf_getuniquefileref.sas
@li mf_existvar.sas @li mf_existvar.sas
@li mf_nobs.sas @li mf_nobs.sas
@li mv_getjoblog.sas
**/ **/
@@ -91,6 +92,7 @@
,grant_type=sas_services ,grant_type=sas_services
,inds=0 ,inds=0
,outds=work.mv_jobwaitfor ,outds=work.mv_jobwaitfor
,outref=0
); );
%local oauth_bearer; %local oauth_bearer;
%if &grant_type=detect %then %do; %if &grant_type=detect %then %do;
@@ -134,7 +136,7 @@ options noquotelenmax;
data _null_; data _null_;
length jobparams $32767; length jobparams $32767;
set &inds end=last; set &inds end=last;
call symputx(cats('joburi',_n_),uri,'l'); call symputx(cats('joburi',_n_),substr(uri,1,55)!!'/state','l');
call symputx(cats('jobname',_n_),_program,'l'); call symputx(cats('jobname',_n_),_program,'l');
call symputx(cats('jobparams',_n_),jobparams,'l'); call symputx(cats('jobparams',_n_),jobparams,'l');
if last then call symputx('uricnt',_n_,'l'); if last then call symputx('uricnt',_n_,'l');
@@ -178,14 +180,20 @@ run;
run; run;
%if &status=completed or &status=failed or &status=canceled %then %do; %if &status=completed or &status=failed or &status=canceled %then %do;
%local plainuri;
%let plainuri=%substr(&&joburi&i,1,55);
proc sql; proc sql;
insert into &outds set insert into &outds set
_program="&&jobname&i", _program="&&jobname&i",
uri="&&joburi&i", uri="&plainuri",
state="&status", state="&status",
timestamp=datetime(), timestamp=datetime(),
jobparams=symget("jobparams&i"); jobparams=symget("jobparams&i");
%let joburi&i=0; /* do not re-check */ %let joburi&i=0; /* do not re-check */
/* fetch log */
%if %str(&outref) ne 0 %then %do;
%mv_getjoblog(uri=&plainuri,outref=&outref)
%end;
%end; %end;
%else %if &status=idle or &status=pending or &status=running %then %do; %else %if &status=idle or &status=pending or &status=running %then %do;
data _null_; data _null_;