mirror of
https://github.com/sasjs/core.git
synced 2025-12-10 14:04:36 +00:00
feat: adding mv_getjobstate macro to fetch the state of a running SAS Viya job
This commit is contained in:
348
all.sas
348
all.sas
@@ -12396,6 +12396,161 @@ data _null_;
|
||||
run;
|
||||
filename &fname1 clear;
|
||||
filename &fname2 clear;
|
||||
%mend;
|
||||
/**
|
||||
@file
|
||||
@brief Extract the status from a running SAS Viya job
|
||||
@details Extracts the status from a running job and writes it to a fileref.
|
||||
An output dataset is created like this:
|
||||
|
||||
| uri | state | timestamp |
|
||||
|---------------------------------------------------------------|---------|--------------------|
|
||||
| /jobExecution/jobs/5cebd840-2063-42c1-be0c-421ec3e1c175/state | running | 15JAN2021:12:35:08 |
|
||||
|
||||
## Example
|
||||
|
||||
First, compile the macros:
|
||||
|
||||
filename mc url
|
||||
"https://raw.githubusercontent.com/sasjs/core/main/all.sas";
|
||||
%inc mc;
|
||||
|
||||
Create a long running job (in this case, a web service):
|
||||
|
||||
filename ft15f001 temp;
|
||||
parmcards4;
|
||||
data ;
|
||||
rand=ranuni(0)*1000;
|
||||
do x=1 to rand;
|
||||
y=rand*4;
|
||||
output;
|
||||
end;
|
||||
run;
|
||||
data _null_;
|
||||
call sleep(5,1);
|
||||
run;
|
||||
;;;;
|
||||
%mv_createwebservice(path=/Public/temp,name=demo)
|
||||
|
||||
Execute it, grab the uri, and check status:
|
||||
|
||||
%mv_jobexecute(path=/Public/temp
|
||||
,name=demo
|
||||
,outds=work.info
|
||||
)
|
||||
|
||||
data _null_;
|
||||
set work.info;
|
||||
if method='GET' and rel='state';
|
||||
call symputx('uri',uri);
|
||||
run;
|
||||
|
||||
%mv_getjobstate(uri=&uri,outds=results)
|
||||
|
||||
|
||||
@param [in] access_token_var= The global macro variable to contain the access token
|
||||
@param [in] grant_type= valid values:
|
||||
* password
|
||||
* authorization_code
|
||||
* detect - will check if access_token exists, if not will use sas_services if
|
||||
a SASStudioV session else authorization_code. Default option.
|
||||
* sas_services - will use oauth_bearer=sas_services
|
||||
@param [in] uri= The uri of the running job for which to fetch the status,
|
||||
in the format `/jobExecution/jobs/$UUID/state` (unquoted).
|
||||
@param [out] outds= The output dataset in which to APPEND the status. Three
|
||||
fields are appended: `CHECK_TM`, `URI` and `STATE`. If the dataset does not
|
||||
exist, it is created.
|
||||
|
||||
|
||||
@version VIYA V.03.04
|
||||
@author Allan Bowe, source: https://github.com/sasjs/core
|
||||
|
||||
<h4> SAS Macros </h4>
|
||||
@li mp_abort.sas
|
||||
@li mf_getplatform.sas
|
||||
@li mf_getuniquefileref.sas
|
||||
|
||||
**/
|
||||
|
||||
%macro mv_getjobstate(uri=0,outds=work.mv_getjobstate
|
||||
,contextName=SAS Job Execution compute context
|
||||
,access_token_var=ACCESS_TOKEN
|
||||
,grant_type=sas_services
|
||||
);
|
||||
%local oauth_bearer;
|
||||
%if &grant_type=detect %then %do;
|
||||
%if %symexist(&access_token_var) %then %let grant_type=authorization_code;
|
||||
%else %let grant_type=sas_services;
|
||||
%end;
|
||||
%if &grant_type=sas_services %then %do;
|
||||
%let oauth_bearer=oauth_bearer=sas_services;
|
||||
%let &access_token_var=;
|
||||
%end;
|
||||
%put &sysmacroname: grant_type=&grant_type;
|
||||
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||
and &grant_type ne sas_services
|
||||
)
|
||||
,mac=&sysmacroname
|
||||
,msg=%str(Invalid value for grant_type: &grant_type)
|
||||
)
|
||||
|
||||
/* validation in datastep for better character safety */
|
||||
%local errmsg errflg;
|
||||
data _null_;
|
||||
uri=symget('uri');
|
||||
if length(uri)<12 then do;
|
||||
call symputx('errflg',1);
|
||||
call symputx('errmsg',"URI is invalid (too short) - '&uri'",'l');
|
||||
end;
|
||||
if scan(uri,-1) ne 'state' or scan(uri,1) ne 'jobExecution' then do;
|
||||
|
||||
call symputx('errflg',1);
|
||||
call symputx('errmsg',
|
||||
"URI should be in format /jobExecution/jobs/$$$$UUID$$$$/state"
|
||||
!!" but is actually like: &uri",'l');
|
||||
end;
|
||||
run;
|
||||
|
||||
%mp_abort(iftrue=(&errflg=1)
|
||||
,mac=&sysmacroname
|
||||
,msg=%str(&errmsg)
|
||||
)
|
||||
|
||||
options noquotelenmax;
|
||||
%local base_uri; /* location of rest apis */
|
||||
%let base_uri=%mf_getplatform(VIYARESTAPI);
|
||||
|
||||
%local fname0;
|
||||
%let fname0=%mf_getuniquefileref();
|
||||
|
||||
proc http method='GET' out=&fname0 &oauth_bearer url="&base_uri/&uri";
|
||||
headers "Accept"="text/plain"
|
||||
%if &grant_type=authorization_code %then %do;
|
||||
"Authorization"="Bearer &&&access_token_var"
|
||||
%end; ;
|
||||
run;
|
||||
%if &SYS_PROCHTTP_STATUS_CODE ne 200 and &SYS_PROCHTTP_STATUS_CODE ne 201 %then
|
||||
%do;
|
||||
data _null_;infile &fname0;input;putlog _infile_;run;
|
||||
%mp_abort(mac=&sysmacroname
|
||||
,msg=%str(&SYS_PROCHTTP_STATUS_CODE &SYS_PROCHTTP_STATUS_PHRASE)
|
||||
)
|
||||
%end;
|
||||
|
||||
data;
|
||||
format uri $128. state $32. timestamp datetime19.;
|
||||
infile &fname0;
|
||||
uri="&uri";
|
||||
timestamp=datetime();
|
||||
input;
|
||||
state=_infile_;
|
||||
run;
|
||||
|
||||
proc append base=&outds data=&syslast;
|
||||
run;
|
||||
|
||||
filename &fname0 clear;
|
||||
|
||||
%mend;
|
||||
/**
|
||||
@file mv_getrefreshtoken.sas
|
||||
@@ -12807,6 +12962,199 @@ filename &fname1 clear;
|
||||
libname &libref;
|
||||
|
||||
%mend;/**
|
||||
@file
|
||||
@brief Execute a series of job flows
|
||||
@details Very (very) simple flow manager. Jobs execute in sequential waves,
|
||||
all previous waves must finish successfully.
|
||||
|
||||
The input table is formed as per below. Each observation represents one job.
|
||||
Each variable is converted into a macro variable with the same name.
|
||||
The FLOW column provides the sequential ordering capability.
|
||||
|
||||
|
||||
## Example
|
||||
|
||||
First, compile the macros:
|
||||
|
||||
filename mc url
|
||||
"https://raw.githubusercontent.com/sasjs/core/main/all.sas";
|
||||
%inc mc;
|
||||
|
||||
Next, create some jobs (in this case, as web services):
|
||||
|
||||
filename ft15f001 temp;
|
||||
parmcards4;
|
||||
data ;
|
||||
rand=ranuni(0)*¯ovar1;
|
||||
do x=1 to rand;
|
||||
y=rand*¯ovar2;
|
||||
output;
|
||||
end;
|
||||
run;
|
||||
;;;;
|
||||
%mv_createwebservice(path=/Public/temp,name=demo1)
|
||||
%mv_createwebservice(path=/Public/temp,name=demo2)
|
||||
|
||||
Prepare an input table with 60 executions:
|
||||
|
||||
data work.inputjobs;
|
||||
do flow=1 to 3;
|
||||
do job=1 to 10;
|
||||
_program='/Public/temp/name/demo1';
|
||||
macrovar1=10*job;
|
||||
macrovar2=4*job;
|
||||
output;
|
||||
_program='/Public/temp/name/demo2';
|
||||
macrovar1=40*job;
|
||||
macrovar2=44*job;
|
||||
output;
|
||||
end;
|
||||
end;
|
||||
run;
|
||||
|
||||
Trigger the flow
|
||||
|
||||
%mv_jobflow(inds=work.inputjobs,outds=work.results,maxconcurrency=4)
|
||||
|
||||
|
||||
@param [in] access_token_var= The global macro variable to contain the access token
|
||||
@param [in] grant_type= valid values:
|
||||
* password
|
||||
* authorization_code
|
||||
* detect - will check if access_token exists, if not will use sas_services if
|
||||
a SASStudioV session else authorization_code. Default option.
|
||||
* sas_services - will use oauth_bearer=sas_services
|
||||
@param [in] inds= The input dataset containing a list of jobs and parameters
|
||||
@param [in] maxconcurrency= The max number of parallel jobs to run
|
||||
@param [out] outds= The output dataset containing the results
|
||||
|
||||
@version VIYA V.03.05
|
||||
@author Allan Bowe, source: https://github.com/sasjs/core
|
||||
|
||||
<h4> SAS Macros </h4>
|
||||
@li mp_abort.sas
|
||||
@li mf_getplatform.sas
|
||||
@li mf_getuniquefileref.sas
|
||||
@li mv_getfoldermembers.sas
|
||||
@li ml_json.sas
|
||||
|
||||
**/
|
||||
|
||||
%macro mv_jobflow(outref=0,outfile=0
|
||||
,name=0,path=0
|
||||
,contextName=SAS Job Execution compute context
|
||||
,access_token_var=ACCESS_TOKEN
|
||||
,grant_type=sas_services
|
||||
);
|
||||
%local oauth_bearer;
|
||||
%if &grant_type=detect %then %do;
|
||||
%if %symexist(&access_token_var) %then %let grant_type=authorization_code;
|
||||
%else %let grant_type=sas_services;
|
||||
%end;
|
||||
%if &grant_type=sas_services %then %do;
|
||||
%let oauth_bearer=oauth_bearer=sas_services;
|
||||
%let &access_token_var=;
|
||||
%end;
|
||||
%put &sysmacroname: grant_type=&grant_type;
|
||||
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||
and &grant_type ne sas_services
|
||||
)
|
||||
,mac=&sysmacroname
|
||||
,msg=%str(Invalid value for grant_type: &grant_type)
|
||||
)
|
||||
%mp_abort(iftrue=("&path"="0")
|
||||
,mac=&sysmacroname
|
||||
,msg=%str(Job Path not provided)
|
||||
)
|
||||
%mp_abort(iftrue=("&name"="0")
|
||||
,mac=&sysmacroname
|
||||
,msg=%str(Job Name not provided)
|
||||
)
|
||||
%mp_abort(iftrue=("&outfile"="0" and "&outref"="0")
|
||||
,mac=&sysmacroname
|
||||
,msg=%str(Output destination (file or fileref) must be provided)
|
||||
)
|
||||
options noquotelenmax;
|
||||
%local base_uri; /* location of rest apis */
|
||||
%let base_uri=%mf_getplatform(VIYARESTAPI);
|
||||
data;run;
|
||||
%local foldermembers;
|
||||
%let foldermembers=&syslast;
|
||||
%mv_getfoldermembers(root=&path
|
||||
,access_token_var=&access_token_var
|
||||
,grant_type=&grant_type
|
||||
,outds=&foldermembers
|
||||
)
|
||||
%local joburi;
|
||||
%let joburi=0;
|
||||
data _null_;
|
||||
set &foldermembers;
|
||||
if name="&name" and uri=:'/jobDefinitions/definitions'
|
||||
then call symputx('joburi',uri);
|
||||
run;
|
||||
%mp_abort(iftrue=("&joburi"="0")
|
||||
,mac=&sysmacroname
|
||||
,msg=%str(Job &path/&name not found)
|
||||
)
|
||||
|
||||
/* prepare request*/
|
||||
%local fname1;
|
||||
%let fname1=%mf_getuniquefileref();
|
||||
proc http method='GET' out=&fname1 &oauth_bearer
|
||||
url="&base_uri&joburi";
|
||||
headers "Accept"="application/vnd.sas.job.definition+json"
|
||||
%if &grant_type=authorization_code %then %do;
|
||||
"Authorization"="Bearer &&&access_token_var"
|
||||
%end;
|
||||
;
|
||||
run;
|
||||
%if &SYS_PROCHTTP_STATUS_CODE ne 200 and &SYS_PROCHTTP_STATUS_CODE ne 201 %then
|
||||
%do;
|
||||
data _null_;infile &fname1;input;putlog _infile_;run;
|
||||
%mp_abort(mac=&sysmacroname
|
||||
,msg=%str(&SYS_PROCHTTP_STATUS_CODE &SYS_PROCHTTP_STATUS_PHRASE)
|
||||
)
|
||||
%end;
|
||||
%local fname2 fname3 fpath1 fpath2 fpath3;
|
||||
%let fname2=%mf_getuniquefileref();
|
||||
%let fname3=%mf_getuniquefileref();
|
||||
%let fpath1=%sysfunc(pathname(&fname1));
|
||||
%let fpath2=%sysfunc(pathname(&fname2));
|
||||
%let fpath3=%sysfunc(pathname(&fname2));
|
||||
|
||||
/* compile the lua JSON module */
|
||||
%ml_json()
|
||||
/* read using LUA - this allows the code to be of any length */
|
||||
data _null_;
|
||||
file "&fpath3..lua";
|
||||
put '
|
||||
infile = io.open (sas.symget("fpath1"), "r")
|
||||
outfile = io.open (sas.symget("fpath2"), "w")
|
||||
io.input(infile)
|
||||
local resp=json.decode(io.read())
|
||||
local job=resp["code"]
|
||||
outfile:write(job)
|
||||
io.close(infile)
|
||||
io.close(outfile)
|
||||
';
|
||||
run;
|
||||
%inc "&fpath3..lua";
|
||||
/* export to desired destination */
|
||||
data _null_;
|
||||
%if &outref=0 %then %do;
|
||||
file "&outfile" lrecl=32767;
|
||||
%end;
|
||||
%else %do;
|
||||
file &outref;
|
||||
%end;
|
||||
infile &fname2;
|
||||
input;
|
||||
put _infile_;
|
||||
run;
|
||||
filename &fname1 clear;
|
||||
filename &fname2 clear;
|
||||
%mend;
|
||||
/**
|
||||
@file
|
||||
@brief Takes a dataset of running jobs and waits for them to complete
|
||||
@details Will poll `/jobs/{jobId}/state` at set intervals until they are all
|
||||
|
||||
155
viya/mv_getjobstate.sas
Normal file
155
viya/mv_getjobstate.sas
Normal file
@@ -0,0 +1,155 @@
|
||||
/**
|
||||
@file
|
||||
@brief Extract the status from a running SAS Viya job
|
||||
@details Extracts the status from a running job and writes it to a fileref.
|
||||
An output dataset is created like this:
|
||||
|
||||
| uri | state | timestamp |
|
||||
|---------------------------------------------------------------|---------|--------------------|
|
||||
| /jobExecution/jobs/5cebd840-2063-42c1-be0c-421ec3e1c175/state | running | 15JAN2021:12:35:08 |
|
||||
|
||||
## Example
|
||||
|
||||
First, compile the macros:
|
||||
|
||||
filename mc url
|
||||
"https://raw.githubusercontent.com/sasjs/core/main/all.sas";
|
||||
%inc mc;
|
||||
|
||||
Create a long running job (in this case, a web service):
|
||||
|
||||
filename ft15f001 temp;
|
||||
parmcards4;
|
||||
data ;
|
||||
rand=ranuni(0)*1000;
|
||||
do x=1 to rand;
|
||||
y=rand*4;
|
||||
output;
|
||||
end;
|
||||
run;
|
||||
data _null_;
|
||||
call sleep(5,1);
|
||||
run;
|
||||
;;;;
|
||||
%mv_createwebservice(path=/Public/temp,name=demo)
|
||||
|
||||
Execute it, grab the uri, and check status:
|
||||
|
||||
%mv_jobexecute(path=/Public/temp
|
||||
,name=demo
|
||||
,outds=work.info
|
||||
)
|
||||
|
||||
data _null_;
|
||||
set work.info;
|
||||
if method='GET' and rel='state';
|
||||
call symputx('uri',uri);
|
||||
run;
|
||||
|
||||
%mv_getjobstate(uri=&uri,outds=results)
|
||||
|
||||
|
||||
@param [in] access_token_var= The global macro variable to contain the access token
|
||||
@param [in] grant_type= valid values:
|
||||
* password
|
||||
* authorization_code
|
||||
* detect - will check if access_token exists, if not will use sas_services if
|
||||
a SASStudioV session else authorization_code. Default option.
|
||||
* sas_services - will use oauth_bearer=sas_services
|
||||
@param [in] uri= The uri of the running job for which to fetch the status,
|
||||
in the format `/jobExecution/jobs/$UUID/state` (unquoted).
|
||||
@param [out] outds= The output dataset in which to APPEND the status. Three
|
||||
fields are appended: `CHECK_TM`, `URI` and `STATE`. If the dataset does not
|
||||
exist, it is created.
|
||||
|
||||
|
||||
@version VIYA V.03.04
|
||||
@author Allan Bowe, source: https://github.com/sasjs/core
|
||||
|
||||
<h4> SAS Macros </h4>
|
||||
@li mp_abort.sas
|
||||
@li mf_getplatform.sas
|
||||
@li mf_getuniquefileref.sas
|
||||
|
||||
**/
|
||||
|
||||
%macro mv_getjobstate(uri=0,outds=work.mv_getjobstate
|
||||
,contextName=SAS Job Execution compute context
|
||||
,access_token_var=ACCESS_TOKEN
|
||||
,grant_type=sas_services
|
||||
);
|
||||
%local oauth_bearer;
|
||||
%if &grant_type=detect %then %do;
|
||||
%if %symexist(&access_token_var) %then %let grant_type=authorization_code;
|
||||
%else %let grant_type=sas_services;
|
||||
%end;
|
||||
%if &grant_type=sas_services %then %do;
|
||||
%let oauth_bearer=oauth_bearer=sas_services;
|
||||
%let &access_token_var=;
|
||||
%end;
|
||||
%put &sysmacroname: grant_type=&grant_type;
|
||||
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||
and &grant_type ne sas_services
|
||||
)
|
||||
,mac=&sysmacroname
|
||||
,msg=%str(Invalid value for grant_type: &grant_type)
|
||||
)
|
||||
|
||||
/* validation in datastep for better character safety */
|
||||
%local errmsg errflg;
|
||||
data _null_;
|
||||
uri=symget('uri');
|
||||
if length(uri)<12 then do;
|
||||
call symputx('errflg',1);
|
||||
call symputx('errmsg',"URI is invalid (too short) - '&uri'",'l');
|
||||
end;
|
||||
if scan(uri,-1) ne 'state' or scan(uri,1) ne 'jobExecution' then do;
|
||||
|
||||
call symputx('errflg',1);
|
||||
call symputx('errmsg',
|
||||
"URI should be in format /jobExecution/jobs/$$$$UUID$$$$/state"
|
||||
!!" but is actually like: &uri",'l');
|
||||
end;
|
||||
run;
|
||||
|
||||
%mp_abort(iftrue=(&errflg=1)
|
||||
,mac=&sysmacroname
|
||||
,msg=%str(&errmsg)
|
||||
)
|
||||
|
||||
options noquotelenmax;
|
||||
%local base_uri; /* location of rest apis */
|
||||
%let base_uri=%mf_getplatform(VIYARESTAPI);
|
||||
|
||||
%local fname0;
|
||||
%let fname0=%mf_getuniquefileref();
|
||||
|
||||
proc http method='GET' out=&fname0 &oauth_bearer url="&base_uri/&uri";
|
||||
headers "Accept"="text/plain"
|
||||
%if &grant_type=authorization_code %then %do;
|
||||
"Authorization"="Bearer &&&access_token_var"
|
||||
%end; ;
|
||||
run;
|
||||
%if &SYS_PROCHTTP_STATUS_CODE ne 200 and &SYS_PROCHTTP_STATUS_CODE ne 201 %then
|
||||
%do;
|
||||
data _null_;infile &fname0;input;putlog _infile_;run;
|
||||
%mp_abort(mac=&sysmacroname
|
||||
,msg=%str(&SYS_PROCHTTP_STATUS_CODE &SYS_PROCHTTP_STATUS_PHRASE)
|
||||
)
|
||||
%end;
|
||||
|
||||
data;
|
||||
format uri $128. state $32. timestamp datetime19.;
|
||||
infile &fname0;
|
||||
uri="&uri";
|
||||
timestamp=datetime();
|
||||
input;
|
||||
state=_infile_;
|
||||
run;
|
||||
|
||||
proc append base=&outds data=&syslast;
|
||||
run;
|
||||
|
||||
filename &fname0 clear;
|
||||
|
||||
%mend;
|
||||
Reference in New Issue
Block a user