mirror of
https://github.com/sasjs/core.git
synced 2025-12-24 11:41:20 +00:00
Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 4a6c8ffbb3 | |||
| b5c86e7031 | |||
| 9783edd0e3 | |||
| 961728a987 | |||
| 4b34322d94 | |||
| 8bb83deede | |||
| 79c81aa8a4 | |||
| bbbcf7d550 | |||
| 82184bc6be |
439
all.sas
439
all.sas
@@ -229,6 +229,32 @@ options noquotelenmax;
|
||||
%mend;
|
||||
|
||||
/** @endcond *//**
|
||||
@file
|
||||
@brief Checks whether a fileref exists
|
||||
@details You can probably do without this macro as it is just a one liner.
|
||||
Mainly it is here as a convenient way to remember the syntax!
|
||||
|
||||
@param fref the fileref to detect
|
||||
|
||||
@return output Returns 1 if found and 0 if not found. Note - it is possible
|
||||
that the fileref is found, but the file does not (yet) exist. If you need
|
||||
to test for this, you may as well use the fileref function directly.
|
||||
|
||||
@version 8
|
||||
@author [Allan Bowe](https://www.linkedin.com/in/allanbowe/)
|
||||
**/
|
||||
|
||||
%macro mf_existfileref(fref
|
||||
)/*/STORE SOURCE*/;
|
||||
|
||||
%if %sysfunc(fileref(&fref))=0 %then %do;
|
||||
1
|
||||
%end;
|
||||
%else %do;
|
||||
0
|
||||
%end;
|
||||
|
||||
%mend;/**
|
||||
@file
|
||||
@brief Checks if a variable exists in a data set.
|
||||
@details Returns 0 if the variable does NOT exist, and return the position of
|
||||
@@ -378,7 +404,7 @@ options noquotelenmax;
|
||||
%local dsid rc;
|
||||
%let dsid=%sysfunc(open(&libds,is));
|
||||
%if &dsid = 0 %then %do;
|
||||
%put WARNING: Cannot open %trim(&libds), system message below;
|
||||
%put %str(WARN)ING: Cannot open %trim(&libds), system message below;
|
||||
%put %sysfunc(sysmsg());
|
||||
-1
|
||||
%end;
|
||||
@@ -484,8 +510,8 @@ options noquotelenmax;
|
||||
@brief retrieves a key value pair from a control dataset
|
||||
@details By default, control dataset is work.mp_setkeyvalue. Usage:
|
||||
|
||||
%mp_setkeyvalue(someindex,22,type=N)
|
||||
%put %mf_getkeyvalue(someindex)
|
||||
%mp_setkeyvalue(someindex,22,type=N)
|
||||
%put %mf_getkeyvalue(someindex)
|
||||
|
||||
|
||||
@param key Provide a key on which to perform the lookup
|
||||
@@ -577,11 +603,18 @@ options noquotelenmax;
|
||||
@brief Adds custom quotes / delimiters to a delimited string
|
||||
@details Can be used in open code, eg as follows:
|
||||
|
||||
%put %mf_getquotedstr(blah blah blah);
|
||||
%put %mf_getquotedstr(blah blah blah);
|
||||
|
||||
which returns:
|
||||
> 'blah','blah','blah'
|
||||
|
||||
Alternatively:
|
||||
|
||||
%put %mf_getquotedstr(these words are double quoted,quote=D)
|
||||
|
||||
for:
|
||||
> "these","words","are","double","quoted"
|
||||
|
||||
@param in_str the unquoted, spaced delimited string to transform
|
||||
@param dlm= the delimeter to be applied to the output (default comma)
|
||||
@param indlm= the delimeter used for the input (default is space)
|
||||
@@ -4515,8 +4548,8 @@ proc sql
|
||||
@brief Logs a key value pair a control dataset
|
||||
@details If the dataset does not exist, it is created. Usage:
|
||||
|
||||
%mp_setkeyvalue(someindex,22,type=N)
|
||||
%mp_setkeyvalue(somenewindex,somevalue)
|
||||
%mp_setkeyvalue(someindex,22,type=N)
|
||||
%mp_setkeyvalue(somenewindex,somevalue)
|
||||
|
||||
<h4> SAS Macros </h4>
|
||||
@li mf_existds.sas
|
||||
@@ -4538,7 +4571,7 @@ proc sql
|
||||
|
||||
%if not (%mf_existds(&libds)) %then %do;
|
||||
data &libds (index=(key/unique));
|
||||
length key $32 valc $256 valn 8 type $1;
|
||||
length key $64 valc $2048 valn 8 type $1;
|
||||
call missing(of _all_);
|
||||
stop;
|
||||
run;
|
||||
@@ -4782,6 +4815,7 @@ proc sql
|
||||
%local lib dir ds1 ds2 ds3 start_tm i;
|
||||
|
||||
%let start_tm=%sysfunc(datetime());
|
||||
%let duration=%sysevalf(&duration);
|
||||
|
||||
/* create a temporary library in WORK */
|
||||
%let lib=%mf_getuniquelibref();
|
||||
@@ -4809,7 +4843,7 @@ libname &lib "%sysfunc(pathname(work))/&dir";
|
||||
proc summary ;
|
||||
class randnum0 randnum1;
|
||||
output out=&lib..&ds2;
|
||||
run;
|
||||
run;quit;
|
||||
%if %sysevalf( (%sysfunc(datetime())-&start_tm)>&duration ) %then %goto gate;
|
||||
|
||||
/* add more data */
|
||||
@@ -4818,6 +4852,7 @@ libname &lib "%sysfunc(pathname(work))/&dir";
|
||||
select *, ranuni(0)*10 as randnum2
|
||||
from &lib..&ds1
|
||||
order by randnum1;
|
||||
quit;
|
||||
%if %sysevalf( (%sysfunc(datetime())-&start_tm)>&duration ) %then %goto gate;
|
||||
|
||||
proc sort data=&lib..&ds3;
|
||||
@@ -10624,7 +10659,6 @@ run;
|
||||
@details Expects oauth token in a global macro variable (default
|
||||
ACCESS_TOKEN).
|
||||
|
||||
options mprint;
|
||||
%mv_createfolder(path=/Public)
|
||||
|
||||
|
||||
@@ -11474,7 +11508,7 @@ run;
|
||||
%let oauth_bearer=oauth_bearer=sas_services;
|
||||
%let &access_token_var=;
|
||||
%end;
|
||||
%put &sysmacroname: grant_type=&grant_type;
|
||||
|
||||
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||
and &grant_type ne sas_services
|
||||
)
|
||||
@@ -11729,7 +11763,6 @@ libname &libref1a clear;
|
||||
@details If not running in Studo 5 +, will expect an oauth token in a global
|
||||
macro variable (default ACCESS_TOKEN).
|
||||
|
||||
options mprint;
|
||||
%mv_createfolder(path=/Public/test/blah)
|
||||
%mv_deleteviyafolder(path=/Public/test)
|
||||
|
||||
@@ -12059,7 +12092,7 @@ libname &libref1 clear;
|
||||
%let oauth_bearer=oauth_bearer=sas_services;
|
||||
%let &access_token_var=;
|
||||
%end;
|
||||
%put &sysmacroname: grant_type=&grant_type;
|
||||
|
||||
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||
and &grant_type ne sas_services
|
||||
)
|
||||
@@ -12294,7 +12327,7 @@ filename &fname1 clear;
|
||||
%let oauth_bearer=oauth_bearer=sas_services;
|
||||
%let &access_token_var=;
|
||||
%end;
|
||||
%put &sysmacroname: grant_type=&grant_type;
|
||||
|
||||
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||
and &grant_type ne sas_services
|
||||
)
|
||||
@@ -12387,7 +12420,6 @@ libname &libref1 clear;
|
||||
%let oauth_bearer=oauth_bearer=sas_services;
|
||||
%let &access_token_var=;
|
||||
%end;
|
||||
%put &sysmacroname: grant_type=&grant_type;
|
||||
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||
and &grant_type ne sas_services
|
||||
)
|
||||
@@ -12452,10 +12484,10 @@ run;
|
||||
%let fname3=%mf_getuniquefileref();
|
||||
%let fpath1=%sysfunc(pathname(&fname1));
|
||||
%let fpath2=%sysfunc(pathname(&fname2));
|
||||
%let fpath3=%sysfunc(pathname(&fname2));
|
||||
%let fpath3=%sysfunc(pathname(&fname3));
|
||||
|
||||
/* compile the lua JSON module */
|
||||
%ml_json()
|
||||
%ml_json()
|
||||
/* read using LUA - this allows the code to be of any length */
|
||||
data _null_;
|
||||
file "&fpath3..lua";
|
||||
@@ -12485,7 +12517,275 @@ data _null_;
|
||||
run;
|
||||
filename &fname1 clear;
|
||||
filename &fname2 clear;
|
||||
filename &fname3 clear;
|
||||
%mend;
|
||||
/**
|
||||
@file
|
||||
@brief Extract the log from a completed SAS Viya Job
|
||||
@details Extracts log from a Viya job and writes it out to a fileref
|
||||
|
||||
To query the job, you need the URI. Sample code for achieving this
|
||||
is provided below.
|
||||
|
||||
## Example
|
||||
|
||||
First, compile the macros:
|
||||
|
||||
filename mc url "https://raw.githubusercontent.com/sasjs/core/main/all.sas";
|
||||
%inc mc;
|
||||
|
||||
Next, create a job (in this case, a web service):
|
||||
|
||||
filename ft15f001 temp;
|
||||
parmcards4;
|
||||
data ;
|
||||
rand=ranuni(0)*1000;
|
||||
do x=1 to rand;
|
||||
y=rand*4;
|
||||
output;
|
||||
end;
|
||||
run;
|
||||
proc sort data=&syslast
|
||||
by descending y;
|
||||
run;
|
||||
;;;;
|
||||
%mv_createwebservice(path=/Public/temp,name=demo)
|
||||
|
||||
Execute it:
|
||||
|
||||
%mv_jobexecute(path=/Public/temp
|
||||
,name=demo
|
||||
,outds=work.info
|
||||
)
|
||||
|
||||
Wait for it to finish, and grab the uri:
|
||||
|
||||
data _null_;
|
||||
set work.info;
|
||||
if method='GET' and rel='self';
|
||||
call symputx('uri',uri);
|
||||
run;
|
||||
|
||||
Finally, fetch the log:
|
||||
|
||||
%mv_getjoblog(uri=&uri,outref=mylog)
|
||||
|
||||
This macro is used by the mv_jobwaitfor.sas macro, which is generally a more
|
||||
convenient way to wait for the job to finish before fetching the log.
|
||||
|
||||
|
||||
@param [in] access_token_var= The global macro variable to contain the access token
|
||||
@param [in] mdebug= set to 1 to enable DEBUG messages
|
||||
@param [in] grant_type= valid values:
|
||||
@li password
|
||||
@li authorization_code
|
||||
@li detect - will check if access_token exists, if not will use sas_services if
|
||||
a SASStudioV session else authorization_code. Default option.
|
||||
@li sas_services - will use oauth_bearer=sas_services.
|
||||
@param [in] uri= The uri of the running job for which to fetch the status,
|
||||
in the format `/jobExecution/jobs/$UUID/state` (unquoted).
|
||||
@param [out] outref= The output fileref to which to APPEND the log (is always
|
||||
appended).
|
||||
|
||||
|
||||
@version VIYA V.03.04
|
||||
@author Allan Bowe, source: https://github.com/sasjs/core
|
||||
|
||||
<h4> SAS Macros </h4>
|
||||
@li mp_abort.sas
|
||||
@li mf_getplatform.sas
|
||||
@li mf_existfileref.sas
|
||||
@li ml_json.sas
|
||||
|
||||
**/
|
||||
|
||||
%macro mv_getjoblog(uri=0,outref=0
|
||||
,contextName=SAS Job Execution compute context
|
||||
,access_token_var=ACCESS_TOKEN
|
||||
,grant_type=sas_services
|
||||
,mdebug=0
|
||||
);
|
||||
%local oauth_bearer;
|
||||
%if &grant_type=detect %then %do;
|
||||
%if %symexist(&access_token_var) %then %let grant_type=authorization_code;
|
||||
%else %let grant_type=sas_services;
|
||||
%end;
|
||||
%if &grant_type=sas_services %then %do;
|
||||
%let oauth_bearer=oauth_bearer=sas_services;
|
||||
%let &access_token_var=;
|
||||
%end;
|
||||
|
||||
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||
and &grant_type ne sas_services
|
||||
)
|
||||
,mac=&sysmacroname
|
||||
,msg=%str(Invalid value for grant_type: &grant_type)
|
||||
)
|
||||
|
||||
/* validation in datastep for better character safety */
|
||||
%local errmsg errflg;
|
||||
data _null_;
|
||||
uri=symget('uri');
|
||||
if length(uri)<12 then do;
|
||||
call symputx('errflg',1);
|
||||
call symputx('errmsg',"URI is invalid (too short) - '&uri'",'l');
|
||||
end;
|
||||
if scan(uri,-1)='state' or scan(uri,1) ne 'jobExecution' then do;
|
||||
call symputx('errflg',1);
|
||||
call symputx('errmsg',
|
||||
"URI should be in format /jobExecution/jobs/$$$$UUID$$$$"
|
||||
!!" but is actually like: &uri",'l');
|
||||
end;
|
||||
run;
|
||||
|
||||
%mp_abort(iftrue=(&errflg=1)
|
||||
,mac=&sysmacroname
|
||||
,msg=%str(&errmsg)
|
||||
)
|
||||
|
||||
%mp_abort(iftrue=(&outref=0)
|
||||
,mac=&sysmacroname
|
||||
,msg=%str(Output fileref should be provided)
|
||||
)
|
||||
|
||||
%if %mf_existfileref(&outref) ne 1 %then %do;
|
||||
filename &outref temp;
|
||||
%end;
|
||||
|
||||
options noquotelenmax;
|
||||
%local base_uri; /* location of rest apis */
|
||||
%let base_uri=%mf_getplatform(VIYARESTAPI);
|
||||
|
||||
/* prepare request*/
|
||||
%local fname1;
|
||||
%let fname1=%mf_getuniquefileref();
|
||||
proc http method='GET' out=&fname1 &oauth_bearer
|
||||
url="&base_uri&uri";
|
||||
headers
|
||||
%if &grant_type=authorization_code %then %do;
|
||||
"Authorization"="Bearer &&&access_token_var"
|
||||
%end;
|
||||
;
|
||||
run;
|
||||
%if &SYS_PROCHTTP_STATUS_CODE ne 200 and &SYS_PROCHTTP_STATUS_CODE ne 201 %then
|
||||
%do;
|
||||
data _null_;infile &fname1;input;putlog _infile_;run;
|
||||
%mp_abort(mac=&sysmacroname
|
||||
,msg=%str(&SYS_PROCHTTP_STATUS_CODE &SYS_PROCHTTP_STATUS_PHRASE)
|
||||
)
|
||||
%end;
|
||||
%local fname2 fname3 fpath1 fpath2 fpath3;
|
||||
%let fname2=%mf_getuniquefileref();
|
||||
%let fname3=%mf_getuniquefileref();
|
||||
%let fpath1=%sysfunc(pathname(&fname1));
|
||||
%let fpath2=%sysfunc(pathname(&fname2));
|
||||
%let fpath3=%sysfunc(pathname(&fname3));
|
||||
|
||||
/* compile the lua JSON module */
|
||||
%ml_json()
|
||||
/* read using LUA - this allows the code to be of any length */
|
||||
data _null_;
|
||||
file "&fpath3..lua";
|
||||
put '
|
||||
infile = io.open (sas.symget("fpath1"), "r")
|
||||
outfile = io.open (sas.symget("fpath2"), "w")
|
||||
io.input(infile)
|
||||
local resp=json.decode(io.read())
|
||||
local logloc=resp["logLocation"]
|
||||
outfile:write(logloc)
|
||||
io.close(infile)
|
||||
io.close(outfile)
|
||||
';
|
||||
run;
|
||||
%inc "&fpath3..lua";
|
||||
/* get log path*/
|
||||
%let errflg=1;
|
||||
%let errmsg=No entry in &fname2 fileref;
|
||||
data _null_;
|
||||
infile &fname2;
|
||||
input;
|
||||
uri=_infile_;
|
||||
if length(uri)<12 then do;
|
||||
call symputx('errflg',1);
|
||||
call symputx('errmsg',"URI is invalid (too short) - '&uri'",'l');
|
||||
end;
|
||||
if scan(uri,1) ne 'files' or scan(uri,2) ne 'files' then do;
|
||||
call symputx('errflg',1);
|
||||
call symputx('errmsg',
|
||||
"URI should be in format /files/files/$$$$UUID$$$$"
|
||||
!!" but is actually like: &uri",'l');
|
||||
end;
|
||||
call symputx('errflg',0,'l');
|
||||
call symputx('logloc',uri,'l');
|
||||
run;
|
||||
|
||||
%mp_abort(iftrue=(&errflg=1)
|
||||
,mac=&sysmacroname
|
||||
,msg=%str(&errmsg)
|
||||
)
|
||||
|
||||
/* we have a log uri - now fetch the log */
|
||||
proc http method='GET' out=&fname1 &oauth_bearer
|
||||
url="&base_uri&logloc/content";
|
||||
headers
|
||||
%if &grant_type=authorization_code %then %do;
|
||||
"Authorization"="Bearer &&&access_token_var"
|
||||
%end;
|
||||
;
|
||||
run;
|
||||
%if &SYS_PROCHTTP_STATUS_CODE ne 200 and &SYS_PROCHTTP_STATUS_CODE ne 201 %then
|
||||
%do;
|
||||
data _null_;infile &fname1;input;putlog _infile_;run;
|
||||
%mp_abort(mac=&sysmacroname
|
||||
,msg=%str(logfetch: &SYS_PROCHTTP_STATUS_CODE &SYS_PROCHTTP_STATUS_PHRASE)
|
||||
)
|
||||
%end;
|
||||
|
||||
data _null_;
|
||||
file "&fpath3..lua";
|
||||
put '
|
||||
infile = io.open (sas.symget("fpath1"), "r")
|
||||
outfile = io.open (sas.symget("fpath2"), "w")
|
||||
io.input(infile)
|
||||
local resp=json.decode(io.read())
|
||||
for i, v in pairs(resp["items"]) do
|
||||
outfile:write(v.line,"\n")
|
||||
end
|
||||
io.close(infile)
|
||||
io.close(outfile)
|
||||
';
|
||||
run;
|
||||
%inc "&fpath3..lua";
|
||||
|
||||
/* write log out to the specified fileref */
|
||||
data _null_;
|
||||
infile &fname2 end=last;
|
||||
file &outref mod;
|
||||
if _n_=1 then do;
|
||||
put "/** SASJS Viya Job Log Extract start: &uri **/";
|
||||
end;
|
||||
input;
|
||||
put _infile_;
|
||||
%if &mdebug=1 %then %do;
|
||||
putlog _infile_;
|
||||
%end;
|
||||
if last then do;
|
||||
put "/** SASJS Viya Job Log Extract end: &uri **/";
|
||||
end;
|
||||
run;
|
||||
|
||||
%if &mdebug=0 %then %do;
|
||||
filename &fname1 clear;
|
||||
filename &fname2 clear;
|
||||
filename &fname3 clear;
|
||||
%end;
|
||||
%else %do;
|
||||
%put _local_;
|
||||
%end;
|
||||
%mend;
|
||||
|
||||
|
||||
|
||||
/**
|
||||
@file
|
||||
@brief Extract the status from a running SAS Viya job
|
||||
@@ -12589,7 +12889,7 @@ filename &fname2 clear;
|
||||
%let oauth_bearer=oauth_bearer=sas_services;
|
||||
%let &access_token_var=;
|
||||
%end;
|
||||
%put &sysmacroname: grant_type=&grant_type;
|
||||
|
||||
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||
and &grant_type ne sas_services
|
||||
)
|
||||
@@ -13075,18 +13375,19 @@ libname &libref;
|
||||
|
||||
## Input table (minimum variables needed)
|
||||
|
||||
@li FLOW_ID - Numeric value, provides sequential ordering capability
|
||||
@li _CONTEXTNAME - Dictates which context should be used to run the job. If
|
||||
blank, will default to `SAS Job Execution compute context`.
|
||||
@li _PROGRAM - Provides the path to the job itself
|
||||
@li FLOW_ID - Numeric value, provides sequential ordering capability. Is
|
||||
optional, will default to 0 if not provided.
|
||||
@li _CONTEXTNAME - Dictates which context should be used to run the job. If
|
||||
blank (or not provided), will default to `SAS Job Execution compute context`.
|
||||
|
||||
Any additional variables provided in this table are converted into macro
|
||||
variables and passed into the relevant job.
|
||||
|
||||
| FLOW_ID| _CONTEXTNAME |_PROGRAM|
|
||||
|_PROGRAM| FLOW_ID (optional)| _CONTEXTNAME (optional) |
|
||||
|---|---|---|
|
||||
|0|SAS Job Execution compute context|/Public/jobs/somejob1|
|
||||
|0|SAS Job Execution compute context|/Public/jobs/somejob2|
|
||||
|/Public/jobs/somejob1|0|SAS Job Execution compute context|
|
||||
|/Public/jobs/somejob2|0|SAS Job Execution compute context|
|
||||
|
||||
## Output table (minimum variables produced)
|
||||
|
||||
@@ -13147,7 +13448,16 @@ libname &libref;
|
||||
|
||||
Trigger the flow
|
||||
|
||||
%mv_jobflow(inds=work.inputjobs,outds=work.results,maxconcurrency=4)
|
||||
%mv_jobflow(inds=work.inputjobs
|
||||
,maxconcurrency=4
|
||||
,outds=work.results
|
||||
,outref=myjoblog
|
||||
)
|
||||
|
||||
data _null_;
|
||||
infile myjoblog;
|
||||
input; put _infile_;
|
||||
run;
|
||||
|
||||
|
||||
@param [in] access_token_var= The global macro variable to contain the access token
|
||||
@@ -13159,7 +13469,9 @@ libname &libref;
|
||||
@li sas_services - will use oauth_bearer=sas_services
|
||||
@param [in] inds= The input dataset containing a list of jobs and parameters
|
||||
@param [in] maxconcurrency= The max number of parallel jobs to run. Default=8.
|
||||
@param [in] mdebug= set to 1 to enable DEBUG messages
|
||||
@param [out] outds= The output dataset containing the results
|
||||
@param [out] outref= The output fileref to which to append the log file(s).
|
||||
|
||||
@version VIYA V.03.05
|
||||
@author Allan Bowe, source: https://github.com/sasjs/core
|
||||
@@ -13179,6 +13491,8 @@ libname &libref;
|
||||
,maxconcurrency=8
|
||||
,access_token_var=ACCESS_TOKEN
|
||||
,grant_type=sas_services
|
||||
,outref=0
|
||||
,mdebug=0
|
||||
);
|
||||
%local oauth_bearer;
|
||||
%if &grant_type=detect %then %do;
|
||||
@@ -13201,16 +13515,29 @@ libname &libref;
|
||||
,mac=&sysmacroname
|
||||
,msg=%str(Input dataset was not provided)
|
||||
)
|
||||
%mp_abort(iftrue=(%mf_existVarList(&inds,_CONTEXTNAME FLOW_ID _PROGRAM)=0)
|
||||
%mp_abort(iftrue=(%mf_existVarList(&inds,_PROGRAM)=0)
|
||||
,mac=&sysmacroname
|
||||
,msg=%str(The following columns must exist on input dataset &inds:
|
||||
_CONTEXTNAME FLOW_ID _PROGRAM)
|
||||
,msg=%str(The _PROGRAM column must exist on input dataset &inds)
|
||||
)
|
||||
%mp_abort(iftrue=(&maxconcurrency<1)
|
||||
,mac=&sysmacroname
|
||||
,msg=%str(The maxconcurrency variable should be a positive integer)
|
||||
)
|
||||
|
||||
/* set defaults if not provided */
|
||||
%if %mf_existVarList(&inds,_CONTEXTNAME FLOW_ID)=0 %then %do;
|
||||
data &inds;
|
||||
%if %mf_existvarList(&inds,_CONTEXTNAME)=0 %then %do;
|
||||
length _CONTEXTNAME $128;
|
||||
retain _CONTEXTNAME "SAS Job Execution compute context";
|
||||
%end;
|
||||
%if %mf_existvarList(&inds,FLOW_ID)=0 %then %do;
|
||||
retain FLOW_ID 0;
|
||||
%end;
|
||||
set &inds;
|
||||
run;
|
||||
%end;
|
||||
|
||||
%local missings;
|
||||
proc sql noprint;
|
||||
select count(*) into: missings
|
||||
@@ -13288,8 +13615,8 @@ data;run;%let jdswaitfor=&syslast;
|
||||
jparams='jparams'!!left(symget('jid'));
|
||||
call symputx(jparams,substr(_infile_,3,length(_infile_)-4));
|
||||
run;
|
||||
%local joburi&jid;
|
||||
%let joburi&jid=0; /* used in next loop */
|
||||
%local jobuid&jid;
|
||||
%let jobuid&jid=0; /* used in next loop */
|
||||
%end;
|
||||
%local concurrency completed;
|
||||
%let concurrency=0;
|
||||
@@ -13300,8 +13627,21 @@ data;run;%let jdswaitfor=&syslast;
|
||||
* now we can execute the jobs up to the maxconcurrency setting
|
||||
*/
|
||||
%if "&&job&jid" ne "0" %then %do; /* this var is zero if job finished */
|
||||
%if "&&joburi&jid"="0" and &concurrency<&maxconcurrency %then %do;
|
||||
/* job has not been triggered and we have free slots */
|
||||
|
||||
/* check to see if the job finished in the previous round */
|
||||
%if %sysfunc(exist(&outds))=1 %then %do;
|
||||
%local jobcheck; %let jobcheck=0;
|
||||
proc sql noprint;
|
||||
select count(*) into: jobcheck
|
||||
from &outds where uuid="&&jobuid&jid";
|
||||
%if &jobcheck>0 %then %do;
|
||||
%put &&job&jid in flow &fid with uid &&jobuid&jid completed!;
|
||||
%let job&jid=0;
|
||||
%end;
|
||||
%end;
|
||||
|
||||
/* check if job was triggered and if so, if we have enough slots to run */
|
||||
%if "&&jobuid&jid"="0" and &concurrency<&maxconcurrency %then %do;
|
||||
%local jobname jobpath;
|
||||
%let jobname=%scan(&&job&jid,-1,/);
|
||||
%let jobpath=%substr(&&job&jid,1,%length(&&job&jid)-%length(&jobname)-1);
|
||||
@@ -13315,27 +13655,18 @@ data;run;%let jdswaitfor=&syslast;
|
||||
format jobparams $32767.;
|
||||
set &jdsapp(where=(method='GET' and rel='state'));
|
||||
jobparams=symget("jparams&jid");
|
||||
call symputx("joburi&jid",uri,'l');
|
||||
/* uri here has the /state suffix */
|
||||
uuid=scan(uri,-2,'/');
|
||||
call symputx("jobuid&jid",uuid,'l');
|
||||
run;
|
||||
proc append base=&jdsrunning data=&jdsapp;
|
||||
run;
|
||||
%let concurrency=%eval(&concurrency+1);
|
||||
%end;
|
||||
%else %if %sysfunc(exist(&outds))=1 %then %do;
|
||||
/* check to see if the job has finished as was previously executed */
|
||||
%local jobcheck; %let jobcheck=0;
|
||||
proc sql noprint;
|
||||
select count(*) into: jobcheck
|
||||
from &outds where uri="&&joburi&jid";
|
||||
%if &jobcheck>0 %then %do;
|
||||
%put &&job&jid in flow &fid with uri &&joburi&jid completed!;
|
||||
%let job&jid=0;
|
||||
%end;
|
||||
%end;
|
||||
%end;
|
||||
%if &jid=&jcnt %then %do;
|
||||
/* we are at the end of the loop - time to see which jobs have finished */
|
||||
%mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor)
|
||||
%mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor,outref=&outref)
|
||||
%local done;
|
||||
%let done=%mf_nobs(&jdswaitfor);
|
||||
%if &done>0 %then %do;
|
||||
@@ -13344,13 +13675,14 @@ data;run;%let jdswaitfor=&syslast;
|
||||
data &jdsapp;
|
||||
set &jdswaitfor;
|
||||
flow_id=&&flow&fid;
|
||||
uuid=scan(uri,-1,'/');
|
||||
run;
|
||||
proc append base=&outds data=&jdsapp;
|
||||
run;
|
||||
%end;
|
||||
proc sql;
|
||||
delete from &jdsrunning
|
||||
where uri in (select uri from &outds
|
||||
where uuid in (select uuid from &outds
|
||||
where state in ('canceled','completed','failed')
|
||||
);
|
||||
|
||||
@@ -13364,6 +13696,9 @@ data;run;%let jdswaitfor=&syslast;
|
||||
/* back up and execute the next flow */
|
||||
%end;
|
||||
|
||||
%if &mdebug=1 %then %do;
|
||||
%put _local_;
|
||||
%end;
|
||||
|
||||
%mend;
|
||||
/**
|
||||
@@ -13440,7 +13775,7 @@ data;run;%let jdswaitfor=&syslast;
|
||||
should be in a `_program` variable.
|
||||
@param [out] outds= The output dataset containing the list of states by job
|
||||
(default=work.mv_jobexecute)
|
||||
|
||||
@param [out] outref= A fileref to which the spawned job logs should be appended.
|
||||
|
||||
@version VIYA V.03.04
|
||||
@author Allan Bowe, source: https://github.com/sasjs/core
|
||||
@@ -13451,6 +13786,7 @@ data;run;%let jdswaitfor=&syslast;
|
||||
@li mf_getuniquefileref.sas
|
||||
@li mf_existvar.sas
|
||||
@li mf_nobs.sas
|
||||
@li mv_getjoblog.sas
|
||||
|
||||
**/
|
||||
|
||||
@@ -13459,6 +13795,7 @@ data;run;%let jdswaitfor=&syslast;
|
||||
,grant_type=sas_services
|
||||
,inds=0
|
||||
,outds=work.mv_jobwaitfor
|
||||
,outref=0
|
||||
);
|
||||
%local oauth_bearer;
|
||||
%if &grant_type=detect %then %do;
|
||||
@@ -13502,7 +13839,7 @@ options noquotelenmax;
|
||||
data _null_;
|
||||
length jobparams $32767;
|
||||
set &inds end=last;
|
||||
call symputx(cats('joburi',_n_),uri,'l');
|
||||
call symputx(cats('joburi',_n_),substr(uri,1,55)!!'/state','l');
|
||||
call symputx(cats('jobname',_n_),_program,'l');
|
||||
call symputx(cats('jobparams',_n_),jobparams,'l');
|
||||
if last then call symputx('uricnt',_n_,'l');
|
||||
@@ -13546,14 +13883,20 @@ run;
|
||||
run;
|
||||
|
||||
%if &status=completed or &status=failed or &status=canceled %then %do;
|
||||
%local plainuri;
|
||||
%let plainuri=%substr(&&joburi&i,1,55);
|
||||
proc sql;
|
||||
insert into &outds set
|
||||
_program="&&jobname&i",
|
||||
uri="&&joburi&i",
|
||||
uri="&plainuri",
|
||||
state="&status",
|
||||
timestamp=datetime(),
|
||||
jobparams=symget("jobparams&i");
|
||||
%let joburi&i=0; /* do not re-check */
|
||||
/* fetch log */
|
||||
%if %str(&outref) ne 0 %then %do;
|
||||
%mv_getjoblog(uri=&plainuri,outref=&outref)
|
||||
%end;
|
||||
%end;
|
||||
%else %if &status=idle or &status=pending or &status=running %then %do;
|
||||
data _null_;
|
||||
|
||||
27
base/mf_existfileref.sas
Normal file
27
base/mf_existfileref.sas
Normal file
@@ -0,0 +1,27 @@
|
||||
/**
|
||||
@file
|
||||
@brief Checks whether a fileref exists
|
||||
@details You can probably do without this macro as it is just a one liner.
|
||||
Mainly it is here as a convenient way to remember the syntax!
|
||||
|
||||
@param fref the fileref to detect
|
||||
|
||||
@return output Returns 1 if found and 0 if not found. Note - it is possible
|
||||
that the fileref is found, but the file does not (yet) exist. If you need
|
||||
to test for this, you may as well use the fileref function directly.
|
||||
|
||||
@version 8
|
||||
@author [Allan Bowe](https://www.linkedin.com/in/allanbowe/)
|
||||
**/
|
||||
|
||||
%macro mf_existfileref(fref
|
||||
)/*/STORE SOURCE*/;
|
||||
|
||||
%if %sysfunc(fileref(&fref))=0 %then %do;
|
||||
1
|
||||
%end;
|
||||
%else %do;
|
||||
0
|
||||
%end;
|
||||
|
||||
%mend;
|
||||
@@ -23,7 +23,7 @@
|
||||
%local dsid rc;
|
||||
%let dsid=%sysfunc(open(&libds,is));
|
||||
%if &dsid = 0 %then %do;
|
||||
%put WARNING: Cannot open %trim(&libds), system message below;
|
||||
%put %str(WARN)ING: Cannot open %trim(&libds), system message below;
|
||||
%put %sysfunc(sysmsg());
|
||||
-1
|
||||
%end;
|
||||
|
||||
@@ -3,8 +3,8 @@
|
||||
@brief retrieves a key value pair from a control dataset
|
||||
@details By default, control dataset is work.mp_setkeyvalue. Usage:
|
||||
|
||||
%mp_setkeyvalue(someindex,22,type=N)
|
||||
%put %mf_getkeyvalue(someindex)
|
||||
%mp_setkeyvalue(someindex,22,type=N)
|
||||
%put %mf_getkeyvalue(someindex)
|
||||
|
||||
|
||||
@param key Provide a key on which to perform the lookup
|
||||
|
||||
@@ -3,11 +3,18 @@
|
||||
@brief Adds custom quotes / delimiters to a delimited string
|
||||
@details Can be used in open code, eg as follows:
|
||||
|
||||
%put %mf_getquotedstr(blah blah blah);
|
||||
%put %mf_getquotedstr(blah blah blah);
|
||||
|
||||
which returns:
|
||||
> 'blah','blah','blah'
|
||||
|
||||
Alternatively:
|
||||
|
||||
%put %mf_getquotedstr(these words are double quoted,quote=D)
|
||||
|
||||
for:
|
||||
> "these","words","are","double","quoted"
|
||||
|
||||
@param in_str the unquoted, spaced delimited string to transform
|
||||
@param dlm= the delimeter to be applied to the output (default comma)
|
||||
@param indlm= the delimeter used for the input (default is space)
|
||||
|
||||
@@ -3,8 +3,8 @@
|
||||
@brief Logs a key value pair a control dataset
|
||||
@details If the dataset does not exist, it is created. Usage:
|
||||
|
||||
%mp_setkeyvalue(someindex,22,type=N)
|
||||
%mp_setkeyvalue(somenewindex,somevalue)
|
||||
%mp_setkeyvalue(someindex,22,type=N)
|
||||
%mp_setkeyvalue(somenewindex,somevalue)
|
||||
|
||||
<h4> SAS Macros </h4>
|
||||
@li mf_existds.sas
|
||||
@@ -26,7 +26,7 @@
|
||||
|
||||
%if not (%mf_existds(&libds)) %then %do;
|
||||
data &libds (index=(key/unique));
|
||||
length key $32 valc $256 valn 8 type $1;
|
||||
length key $64 valc $2048 valn 8 type $1;
|
||||
call missing(of _all_);
|
||||
stop;
|
||||
run;
|
||||
|
||||
@@ -26,6 +26,7 @@
|
||||
%local lib dir ds1 ds2 ds3 start_tm i;
|
||||
|
||||
%let start_tm=%sysfunc(datetime());
|
||||
%let duration=%sysevalf(&duration);
|
||||
|
||||
/* create a temporary library in WORK */
|
||||
%let lib=%mf_getuniquelibref();
|
||||
@@ -53,7 +54,7 @@ libname &lib "%sysfunc(pathname(work))/&dir";
|
||||
proc summary ;
|
||||
class randnum0 randnum1;
|
||||
output out=&lib..&ds2;
|
||||
run;
|
||||
run;quit;
|
||||
%if %sysevalf( (%sysfunc(datetime())-&start_tm)>&duration ) %then %goto gate;
|
||||
|
||||
/* add more data */
|
||||
@@ -62,6 +63,7 @@ libname &lib "%sysfunc(pathname(work))/&dir";
|
||||
select *, ranuni(0)*10 as randnum2
|
||||
from &lib..&ds1
|
||||
order by randnum1;
|
||||
quit;
|
||||
%if %sysevalf( (%sysfunc(datetime())-&start_tm)>&duration ) %then %goto gate;
|
||||
|
||||
proc sort data=&lib..&ds3;
|
||||
|
||||
@@ -4,7 +4,6 @@
|
||||
@details Expects oauth token in a global macro variable (default
|
||||
ACCESS_TOKEN).
|
||||
|
||||
options mprint;
|
||||
%mv_createfolder(path=/Public)
|
||||
|
||||
|
||||
|
||||
@@ -46,7 +46,7 @@
|
||||
%let oauth_bearer=oauth_bearer=sas_services;
|
||||
%let &access_token_var=;
|
||||
%end;
|
||||
%put &sysmacroname: grant_type=&grant_type;
|
||||
|
||||
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||
and &grant_type ne sas_services
|
||||
)
|
||||
|
||||
@@ -4,7 +4,6 @@
|
||||
@details If not running in Studo 5 +, will expect an oauth token in a global
|
||||
macro variable (default ACCESS_TOKEN).
|
||||
|
||||
options mprint;
|
||||
%mv_createfolder(path=/Public/test/blah)
|
||||
%mv_deleteviyafolder(path=/Public/test)
|
||||
|
||||
|
||||
@@ -40,7 +40,7 @@
|
||||
%let oauth_bearer=oauth_bearer=sas_services;
|
||||
%let &access_token_var=;
|
||||
%end;
|
||||
%put &sysmacroname: grant_type=&grant_type;
|
||||
|
||||
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||
and &grant_type ne sas_services
|
||||
)
|
||||
|
||||
@@ -52,7 +52,7 @@
|
||||
%let oauth_bearer=oauth_bearer=sas_services;
|
||||
%let &access_token_var=;
|
||||
%end;
|
||||
%put &sysmacroname: grant_type=&grant_type;
|
||||
|
||||
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||
and &grant_type ne sas_services
|
||||
)
|
||||
|
||||
@@ -49,7 +49,6 @@
|
||||
%let oauth_bearer=oauth_bearer=sas_services;
|
||||
%let &access_token_var=;
|
||||
%end;
|
||||
%put &sysmacroname: grant_type=&grant_type;
|
||||
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||
and &grant_type ne sas_services
|
||||
)
|
||||
@@ -114,10 +113,10 @@ run;
|
||||
%let fname3=%mf_getuniquefileref();
|
||||
%let fpath1=%sysfunc(pathname(&fname1));
|
||||
%let fpath2=%sysfunc(pathname(&fname2));
|
||||
%let fpath3=%sysfunc(pathname(&fname2));
|
||||
%let fpath3=%sysfunc(pathname(&fname3));
|
||||
|
||||
/* compile the lua JSON module */
|
||||
%ml_json()
|
||||
%ml_json()
|
||||
/* read using LUA - this allows the code to be of any length */
|
||||
data _null_;
|
||||
file "&fpath3..lua";
|
||||
@@ -147,4 +146,5 @@ data _null_;
|
||||
run;
|
||||
filename &fname1 clear;
|
||||
filename &fname2 clear;
|
||||
filename &fname3 clear;
|
||||
%mend;
|
||||
|
||||
267
viya/mv_getjoblog.sas
Normal file
267
viya/mv_getjoblog.sas
Normal file
@@ -0,0 +1,267 @@
|
||||
/**
|
||||
@file
|
||||
@brief Extract the log from a completed SAS Viya Job
|
||||
@details Extracts log from a Viya job and writes it out to a fileref
|
||||
|
||||
To query the job, you need the URI. Sample code for achieving this
|
||||
is provided below.
|
||||
|
||||
## Example
|
||||
|
||||
First, compile the macros:
|
||||
|
||||
filename mc url "https://raw.githubusercontent.com/sasjs/core/main/all.sas";
|
||||
%inc mc;
|
||||
|
||||
Next, create a job (in this case, a web service):
|
||||
|
||||
filename ft15f001 temp;
|
||||
parmcards4;
|
||||
data ;
|
||||
rand=ranuni(0)*1000;
|
||||
do x=1 to rand;
|
||||
y=rand*4;
|
||||
output;
|
||||
end;
|
||||
run;
|
||||
proc sort data=&syslast
|
||||
by descending y;
|
||||
run;
|
||||
;;;;
|
||||
%mv_createwebservice(path=/Public/temp,name=demo)
|
||||
|
||||
Execute it:
|
||||
|
||||
%mv_jobexecute(path=/Public/temp
|
||||
,name=demo
|
||||
,outds=work.info
|
||||
)
|
||||
|
||||
Wait for it to finish, and grab the uri:
|
||||
|
||||
data _null_;
|
||||
set work.info;
|
||||
if method='GET' and rel='self';
|
||||
call symputx('uri',uri);
|
||||
run;
|
||||
|
||||
Finally, fetch the log:
|
||||
|
||||
%mv_getjoblog(uri=&uri,outref=mylog)
|
||||
|
||||
This macro is used by the mv_jobwaitfor.sas macro, which is generally a more
|
||||
convenient way to wait for the job to finish before fetching the log.
|
||||
|
||||
|
||||
@param [in] access_token_var= The global macro variable to contain the access token
|
||||
@param [in] mdebug= set to 1 to enable DEBUG messages
|
||||
@param [in] grant_type= valid values:
|
||||
@li password
|
||||
@li authorization_code
|
||||
@li detect - will check if access_token exists, if not will use sas_services if
|
||||
a SASStudioV session else authorization_code. Default option.
|
||||
@li sas_services - will use oauth_bearer=sas_services.
|
||||
@param [in] uri= The uri of the running job for which to fetch the status,
|
||||
in the format `/jobExecution/jobs/$UUID/state` (unquoted).
|
||||
@param [out] outref= The output fileref to which to APPEND the log (is always
|
||||
appended).
|
||||
|
||||
|
||||
@version VIYA V.03.04
|
||||
@author Allan Bowe, source: https://github.com/sasjs/core
|
||||
|
||||
<h4> SAS Macros </h4>
|
||||
@li mp_abort.sas
|
||||
@li mf_getplatform.sas
|
||||
@li mf_existfileref.sas
|
||||
@li ml_json.sas
|
||||
|
||||
**/
|
||||
|
||||
%macro mv_getjoblog(uri=0,outref=0
|
||||
,contextName=SAS Job Execution compute context
|
||||
,access_token_var=ACCESS_TOKEN
|
||||
,grant_type=sas_services
|
||||
,mdebug=0
|
||||
);
|
||||
%local oauth_bearer;
|
||||
%if &grant_type=detect %then %do;
|
||||
%if %symexist(&access_token_var) %then %let grant_type=authorization_code;
|
||||
%else %let grant_type=sas_services;
|
||||
%end;
|
||||
%if &grant_type=sas_services %then %do;
|
||||
%let oauth_bearer=oauth_bearer=sas_services;
|
||||
%let &access_token_var=;
|
||||
%end;
|
||||
|
||||
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||
and &grant_type ne sas_services
|
||||
)
|
||||
,mac=&sysmacroname
|
||||
,msg=%str(Invalid value for grant_type: &grant_type)
|
||||
)
|
||||
|
||||
/* validation in datastep for better character safety */
|
||||
%local errmsg errflg;
|
||||
data _null_;
|
||||
uri=symget('uri');
|
||||
if length(uri)<12 then do;
|
||||
call symputx('errflg',1);
|
||||
call symputx('errmsg',"URI is invalid (too short) - '&uri'",'l');
|
||||
end;
|
||||
if scan(uri,-1)='state' or scan(uri,1) ne 'jobExecution' then do;
|
||||
call symputx('errflg',1);
|
||||
call symputx('errmsg',
|
||||
"URI should be in format /jobExecution/jobs/$$$$UUID$$$$"
|
||||
!!" but is actually like: &uri",'l');
|
||||
end;
|
||||
run;
|
||||
|
||||
%mp_abort(iftrue=(&errflg=1)
|
||||
,mac=&sysmacroname
|
||||
,msg=%str(&errmsg)
|
||||
)
|
||||
|
||||
%mp_abort(iftrue=(&outref=0)
|
||||
,mac=&sysmacroname
|
||||
,msg=%str(Output fileref should be provided)
|
||||
)
|
||||
|
||||
%if %mf_existfileref(&outref) ne 1 %then %do;
|
||||
filename &outref temp;
|
||||
%end;
|
||||
|
||||
options noquotelenmax;
|
||||
%local base_uri; /* location of rest apis */
|
||||
%let base_uri=%mf_getplatform(VIYARESTAPI);
|
||||
|
||||
/* prepare request*/
|
||||
%local fname1;
|
||||
%let fname1=%mf_getuniquefileref();
|
||||
proc http method='GET' out=&fname1 &oauth_bearer
|
||||
url="&base_uri&uri";
|
||||
headers
|
||||
%if &grant_type=authorization_code %then %do;
|
||||
"Authorization"="Bearer &&&access_token_var"
|
||||
%end;
|
||||
;
|
||||
run;
|
||||
%if &SYS_PROCHTTP_STATUS_CODE ne 200 and &SYS_PROCHTTP_STATUS_CODE ne 201 %then
|
||||
%do;
|
||||
data _null_;infile &fname1;input;putlog _infile_;run;
|
||||
%mp_abort(mac=&sysmacroname
|
||||
,msg=%str(&SYS_PROCHTTP_STATUS_CODE &SYS_PROCHTTP_STATUS_PHRASE)
|
||||
)
|
||||
%end;
|
||||
%local fname2 fname3 fpath1 fpath2 fpath3;
|
||||
%let fname2=%mf_getuniquefileref();
|
||||
%let fname3=%mf_getuniquefileref();
|
||||
%let fpath1=%sysfunc(pathname(&fname1));
|
||||
%let fpath2=%sysfunc(pathname(&fname2));
|
||||
%let fpath3=%sysfunc(pathname(&fname3));
|
||||
|
||||
/* compile the lua JSON module */
|
||||
%ml_json()
|
||||
/* read using LUA - this allows the code to be of any length */
|
||||
data _null_;
|
||||
file "&fpath3..lua";
|
||||
put '
|
||||
infile = io.open (sas.symget("fpath1"), "r")
|
||||
outfile = io.open (sas.symget("fpath2"), "w")
|
||||
io.input(infile)
|
||||
local resp=json.decode(io.read())
|
||||
local logloc=resp["logLocation"]
|
||||
outfile:write(logloc)
|
||||
io.close(infile)
|
||||
io.close(outfile)
|
||||
';
|
||||
run;
|
||||
%inc "&fpath3..lua";
|
||||
/* get log path*/
|
||||
%let errflg=1;
|
||||
%let errmsg=No entry in &fname2 fileref;
|
||||
data _null_;
|
||||
infile &fname2;
|
||||
input;
|
||||
uri=_infile_;
|
||||
if length(uri)<12 then do;
|
||||
call symputx('errflg',1);
|
||||
call symputx('errmsg',"URI is invalid (too short) - '&uri'",'l');
|
||||
end;
|
||||
if scan(uri,1) ne 'files' or scan(uri,2) ne 'files' then do;
|
||||
call symputx('errflg',1);
|
||||
call symputx('errmsg',
|
||||
"URI should be in format /files/files/$$$$UUID$$$$"
|
||||
!!" but is actually like: &uri",'l');
|
||||
end;
|
||||
call symputx('errflg',0,'l');
|
||||
call symputx('logloc',uri,'l');
|
||||
run;
|
||||
|
||||
%mp_abort(iftrue=(&errflg=1)
|
||||
,mac=&sysmacroname
|
||||
,msg=%str(&errmsg)
|
||||
)
|
||||
|
||||
/* we have a log uri - now fetch the log */
|
||||
proc http method='GET' out=&fname1 &oauth_bearer
|
||||
url="&base_uri&logloc/content";
|
||||
headers
|
||||
%if &grant_type=authorization_code %then %do;
|
||||
"Authorization"="Bearer &&&access_token_var"
|
||||
%end;
|
||||
;
|
||||
run;
|
||||
%if &SYS_PROCHTTP_STATUS_CODE ne 200 and &SYS_PROCHTTP_STATUS_CODE ne 201 %then
|
||||
%do;
|
||||
data _null_;infile &fname1;input;putlog _infile_;run;
|
||||
%mp_abort(mac=&sysmacroname
|
||||
,msg=%str(logfetch: &SYS_PROCHTTP_STATUS_CODE &SYS_PROCHTTP_STATUS_PHRASE)
|
||||
)
|
||||
%end;
|
||||
|
||||
data _null_;
|
||||
file "&fpath3..lua";
|
||||
put '
|
||||
infile = io.open (sas.symget("fpath1"), "r")
|
||||
outfile = io.open (sas.symget("fpath2"), "w")
|
||||
io.input(infile)
|
||||
local resp=json.decode(io.read())
|
||||
for i, v in pairs(resp["items"]) do
|
||||
outfile:write(v.line,"\n")
|
||||
end
|
||||
io.close(infile)
|
||||
io.close(outfile)
|
||||
';
|
||||
run;
|
||||
%inc "&fpath3..lua";
|
||||
|
||||
/* write log out to the specified fileref */
|
||||
data _null_;
|
||||
infile &fname2 end=last;
|
||||
file &outref mod;
|
||||
if _n_=1 then do;
|
||||
put "/** SASJS Viya Job Log Extract start: &uri **/";
|
||||
end;
|
||||
input;
|
||||
put _infile_;
|
||||
%if &mdebug=1 %then %do;
|
||||
putlog _infile_;
|
||||
%end;
|
||||
if last then do;
|
||||
put "/** SASJS Viya Job Log Extract end: &uri **/";
|
||||
end;
|
||||
run;
|
||||
|
||||
%if &mdebug=0 %then %do;
|
||||
filename &fname1 clear;
|
||||
filename &fname2 clear;
|
||||
filename &fname3 clear;
|
||||
%end;
|
||||
%else %do;
|
||||
%put _local_;
|
||||
%end;
|
||||
%mend;
|
||||
|
||||
|
||||
|
||||
@@ -101,7 +101,7 @@
|
||||
%let oauth_bearer=oauth_bearer=sas_services;
|
||||
%let &access_token_var=;
|
||||
%end;
|
||||
%put &sysmacroname: grant_type=&grant_type;
|
||||
|
||||
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||
and &grant_type ne sas_services
|
||||
)
|
||||
|
||||
@@ -9,18 +9,19 @@
|
||||
|
||||
## Input table (minimum variables needed)
|
||||
|
||||
@li FLOW_ID - Numeric value, provides sequential ordering capability
|
||||
@li _CONTEXTNAME - Dictates which context should be used to run the job. If
|
||||
blank, will default to `SAS Job Execution compute context`.
|
||||
@li _PROGRAM - Provides the path to the job itself
|
||||
@li FLOW_ID - Numeric value, provides sequential ordering capability. Is
|
||||
optional, will default to 0 if not provided.
|
||||
@li _CONTEXTNAME - Dictates which context should be used to run the job. If
|
||||
blank (or not provided), will default to `SAS Job Execution compute context`.
|
||||
|
||||
Any additional variables provided in this table are converted into macro
|
||||
variables and passed into the relevant job.
|
||||
|
||||
| FLOW_ID| _CONTEXTNAME |_PROGRAM|
|
||||
|_PROGRAM| FLOW_ID (optional)| _CONTEXTNAME (optional) |
|
||||
|---|---|---|
|
||||
|0|SAS Job Execution compute context|/Public/jobs/somejob1|
|
||||
|0|SAS Job Execution compute context|/Public/jobs/somejob2|
|
||||
|/Public/jobs/somejob1|0|SAS Job Execution compute context|
|
||||
|/Public/jobs/somejob2|0|SAS Job Execution compute context|
|
||||
|
||||
## Output table (minimum variables produced)
|
||||
|
||||
@@ -81,7 +82,16 @@
|
||||
|
||||
Trigger the flow
|
||||
|
||||
%mv_jobflow(inds=work.inputjobs,outds=work.results,maxconcurrency=4)
|
||||
%mv_jobflow(inds=work.inputjobs
|
||||
,maxconcurrency=4
|
||||
,outds=work.results
|
||||
,outref=myjoblog
|
||||
)
|
||||
|
||||
data _null_;
|
||||
infile myjoblog;
|
||||
input; put _infile_;
|
||||
run;
|
||||
|
||||
|
||||
@param [in] access_token_var= The global macro variable to contain the access token
|
||||
@@ -93,7 +103,9 @@
|
||||
@li sas_services - will use oauth_bearer=sas_services
|
||||
@param [in] inds= The input dataset containing a list of jobs and parameters
|
||||
@param [in] maxconcurrency= The max number of parallel jobs to run. Default=8.
|
||||
@param [in] mdebug= set to 1 to enable DEBUG messages
|
||||
@param [out] outds= The output dataset containing the results
|
||||
@param [out] outref= The output fileref to which to append the log file(s).
|
||||
|
||||
@version VIYA V.03.05
|
||||
@author Allan Bowe, source: https://github.com/sasjs/core
|
||||
@@ -113,6 +125,8 @@
|
||||
,maxconcurrency=8
|
||||
,access_token_var=ACCESS_TOKEN
|
||||
,grant_type=sas_services
|
||||
,outref=0
|
||||
,mdebug=0
|
||||
);
|
||||
%local oauth_bearer;
|
||||
%if &grant_type=detect %then %do;
|
||||
@@ -135,16 +149,29 @@
|
||||
,mac=&sysmacroname
|
||||
,msg=%str(Input dataset was not provided)
|
||||
)
|
||||
%mp_abort(iftrue=(%mf_existVarList(&inds,_CONTEXTNAME FLOW_ID _PROGRAM)=0)
|
||||
%mp_abort(iftrue=(%mf_existVarList(&inds,_PROGRAM)=0)
|
||||
,mac=&sysmacroname
|
||||
,msg=%str(The following columns must exist on input dataset &inds:
|
||||
_CONTEXTNAME FLOW_ID _PROGRAM)
|
||||
,msg=%str(The _PROGRAM column must exist on input dataset &inds)
|
||||
)
|
||||
%mp_abort(iftrue=(&maxconcurrency<1)
|
||||
,mac=&sysmacroname
|
||||
,msg=%str(The maxconcurrency variable should be a positive integer)
|
||||
)
|
||||
|
||||
/* set defaults if not provided */
|
||||
%if %mf_existVarList(&inds,_CONTEXTNAME FLOW_ID)=0 %then %do;
|
||||
data &inds;
|
||||
%if %mf_existvarList(&inds,_CONTEXTNAME)=0 %then %do;
|
||||
length _CONTEXTNAME $128;
|
||||
retain _CONTEXTNAME "SAS Job Execution compute context";
|
||||
%end;
|
||||
%if %mf_existvarList(&inds,FLOW_ID)=0 %then %do;
|
||||
retain FLOW_ID 0;
|
||||
%end;
|
||||
set &inds;
|
||||
run;
|
||||
%end;
|
||||
|
||||
%local missings;
|
||||
proc sql noprint;
|
||||
select count(*) into: missings
|
||||
@@ -222,8 +249,8 @@ data;run;%let jdswaitfor=&syslast;
|
||||
jparams='jparams'!!left(symget('jid'));
|
||||
call symputx(jparams,substr(_infile_,3,length(_infile_)-4));
|
||||
run;
|
||||
%local joburi&jid;
|
||||
%let joburi&jid=0; /* used in next loop */
|
||||
%local jobuid&jid;
|
||||
%let jobuid&jid=0; /* used in next loop */
|
||||
%end;
|
||||
%local concurrency completed;
|
||||
%let concurrency=0;
|
||||
@@ -234,8 +261,21 @@ data;run;%let jdswaitfor=&syslast;
|
||||
* now we can execute the jobs up to the maxconcurrency setting
|
||||
*/
|
||||
%if "&&job&jid" ne "0" %then %do; /* this var is zero if job finished */
|
||||
%if "&&joburi&jid"="0" and &concurrency<&maxconcurrency %then %do;
|
||||
/* job has not been triggered and we have free slots */
|
||||
|
||||
/* check to see if the job finished in the previous round */
|
||||
%if %sysfunc(exist(&outds))=1 %then %do;
|
||||
%local jobcheck; %let jobcheck=0;
|
||||
proc sql noprint;
|
||||
select count(*) into: jobcheck
|
||||
from &outds where uuid="&&jobuid&jid";
|
||||
%if &jobcheck>0 %then %do;
|
||||
%put &&job&jid in flow &fid with uid &&jobuid&jid completed!;
|
||||
%let job&jid=0;
|
||||
%end;
|
||||
%end;
|
||||
|
||||
/* check if job was triggered and if so, if we have enough slots to run */
|
||||
%if "&&jobuid&jid"="0" and &concurrency<&maxconcurrency %then %do;
|
||||
%local jobname jobpath;
|
||||
%let jobname=%scan(&&job&jid,-1,/);
|
||||
%let jobpath=%substr(&&job&jid,1,%length(&&job&jid)-%length(&jobname)-1);
|
||||
@@ -249,27 +289,18 @@ data;run;%let jdswaitfor=&syslast;
|
||||
format jobparams $32767.;
|
||||
set &jdsapp(where=(method='GET' and rel='state'));
|
||||
jobparams=symget("jparams&jid");
|
||||
call symputx("joburi&jid",uri,'l');
|
||||
/* uri here has the /state suffix */
|
||||
uuid=scan(uri,-2,'/');
|
||||
call symputx("jobuid&jid",uuid,'l');
|
||||
run;
|
||||
proc append base=&jdsrunning data=&jdsapp;
|
||||
run;
|
||||
%let concurrency=%eval(&concurrency+1);
|
||||
%end;
|
||||
%else %if %sysfunc(exist(&outds))=1 %then %do;
|
||||
/* check to see if the job has finished as was previously executed */
|
||||
%local jobcheck; %let jobcheck=0;
|
||||
proc sql noprint;
|
||||
select count(*) into: jobcheck
|
||||
from &outds where uri="&&joburi&jid";
|
||||
%if &jobcheck>0 %then %do;
|
||||
%put &&job&jid in flow &fid with uri &&joburi&jid completed!;
|
||||
%let job&jid=0;
|
||||
%end;
|
||||
%end;
|
||||
%end;
|
||||
%if &jid=&jcnt %then %do;
|
||||
/* we are at the end of the loop - time to see which jobs have finished */
|
||||
%mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor)
|
||||
%mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor,outref=&outref)
|
||||
%local done;
|
||||
%let done=%mf_nobs(&jdswaitfor);
|
||||
%if &done>0 %then %do;
|
||||
@@ -278,13 +309,14 @@ data;run;%let jdswaitfor=&syslast;
|
||||
data &jdsapp;
|
||||
set &jdswaitfor;
|
||||
flow_id=&&flow&fid;
|
||||
uuid=scan(uri,-1,'/');
|
||||
run;
|
||||
proc append base=&outds data=&jdsapp;
|
||||
run;
|
||||
%end;
|
||||
proc sql;
|
||||
delete from &jdsrunning
|
||||
where uri in (select uri from &outds
|
||||
where uuid in (select uuid from &outds
|
||||
where state in ('canceled','completed','failed')
|
||||
);
|
||||
|
||||
@@ -298,5 +330,8 @@ data;run;%let jdswaitfor=&syslast;
|
||||
/* back up and execute the next flow */
|
||||
%end;
|
||||
|
||||
%if &mdebug=1 %then %do;
|
||||
%put _local_;
|
||||
%end;
|
||||
|
||||
%mend;
|
||||
|
||||
@@ -72,7 +72,7 @@
|
||||
should be in a `_program` variable.
|
||||
@param [out] outds= The output dataset containing the list of states by job
|
||||
(default=work.mv_jobexecute)
|
||||
|
||||
@param [out] outref= A fileref to which the spawned job logs should be appended.
|
||||
|
||||
@version VIYA V.03.04
|
||||
@author Allan Bowe, source: https://github.com/sasjs/core
|
||||
@@ -83,6 +83,7 @@
|
||||
@li mf_getuniquefileref.sas
|
||||
@li mf_existvar.sas
|
||||
@li mf_nobs.sas
|
||||
@li mv_getjoblog.sas
|
||||
|
||||
**/
|
||||
|
||||
@@ -91,6 +92,7 @@
|
||||
,grant_type=sas_services
|
||||
,inds=0
|
||||
,outds=work.mv_jobwaitfor
|
||||
,outref=0
|
||||
);
|
||||
%local oauth_bearer;
|
||||
%if &grant_type=detect %then %do;
|
||||
@@ -134,7 +136,7 @@ options noquotelenmax;
|
||||
data _null_;
|
||||
length jobparams $32767;
|
||||
set &inds end=last;
|
||||
call symputx(cats('joburi',_n_),uri,'l');
|
||||
call symputx(cats('joburi',_n_),substr(uri,1,55)!!'/state','l');
|
||||
call symputx(cats('jobname',_n_),_program,'l');
|
||||
call symputx(cats('jobparams',_n_),jobparams,'l');
|
||||
if last then call symputx('uricnt',_n_,'l');
|
||||
@@ -178,14 +180,20 @@ run;
|
||||
run;
|
||||
|
||||
%if &status=completed or &status=failed or &status=canceled %then %do;
|
||||
%local plainuri;
|
||||
%let plainuri=%substr(&&joburi&i,1,55);
|
||||
proc sql;
|
||||
insert into &outds set
|
||||
_program="&&jobname&i",
|
||||
uri="&&joburi&i",
|
||||
uri="&plainuri",
|
||||
state="&status",
|
||||
timestamp=datetime(),
|
||||
jobparams=symget("jobparams&i");
|
||||
%let joburi&i=0; /* do not re-check */
|
||||
/* fetch log */
|
||||
%if %str(&outref) ne 0 %then %do;
|
||||
%mv_getjoblog(uri=&plainuri,outref=&outref)
|
||||
%end;
|
||||
%end;
|
||||
%else %if &status=idle or &status=pending or &status=running %then %do;
|
||||
data _null_;
|
||||
|
||||
Reference in New Issue
Block a user