1
0
mirror of https://github.com/sasjs/core.git synced 2026-01-03 15:40:05 +00:00

Compare commits

...

6 Commits

5 changed files with 669 additions and 26 deletions

491
all.sas
View File

@@ -12396,6 +12396,175 @@ data _null_;
run; run;
filename &fname1 clear; filename &fname1 clear;
filename &fname2 clear; filename &fname2 clear;
%mend;
/**
@file
@brief Extract the status from a running SAS Viya job
@details Extracts the status from a running job and appends it to an output
dataset with the following structure:
| uri | state | timestamp |
|---------------------------------------------------------------|---------|--------------------|
| /jobExecution/jobs/5cebd840-2063-42c1-be0c-421ec3e1c175/state | running | 15JAN2021:12:35:08 |
To query the running job, you need the URI. Sample code for achieving this
is provided below.
## Example
First, compile the macros:
filename mc url "https://raw.githubusercontent.com/sasjs/core/main/all.sas";
%inc mc;
Next, create a long running job (in this case, a web service):
filename ft15f001 temp;
parmcards4;
data ;
rand=ranuni(0)*1000;
do x=1 to rand;
y=rand*4;
output;
end;
run;
data _null_;
call sleep(5,1);
run;
;;;;
%mv_createwebservice(path=/Public/temp,name=demo)
Execute it, grab the uri, and finally, check the job status:
%mv_jobexecute(path=/Public/temp
,name=demo
,outds=work.info
)
data _null_;
set work.info;
if method='GET' and rel='state';
call symputx('uri',uri);
run;
%mv_getjobstate(uri=&uri,outds=results)
You can run this macro as part of a loop to await the final 'completed' status.
The full list of status values is:
@li idle
@li pending
@li running
@li canceled
@li completed
@li failed
If you have one or more jobs that you'd like to wait for completion you can
also use the [mv_jobwaitfor](/mv__jobwaitfor_8sas.html) macro.
@param [in] access_token_var= The global macro variable to contain the access token
@param [in] grant_type= valid values:
@li password
@li authorization_code
@li detect - will check if access_token exists, if not will use sas_services if
a SASStudioV session else authorization_code. Default option.
@li sas_services - will use oauth_bearer=sas_services.
@param [in] uri= The uri of the running job for which to fetch the status,
in the format `/jobExecution/jobs/$UUID/state` (unquoted).
@param [out] outds= The output dataset in which to APPEND the status. Three
fields are appended: `CHECK_TM`, `URI` and `STATE`. If the dataset does not
exist, it is created.
@version VIYA V.03.04
@author Allan Bowe, source: https://github.com/sasjs/core
<h4> SAS Macros </h4>
@li mp_abort.sas
@li mf_getplatform.sas
@li mf_getuniquefileref.sas
**/
%macro mv_getjobstate(uri=0,outds=work.mv_getjobstate
,contextName=SAS Job Execution compute context
,access_token_var=ACCESS_TOKEN
,grant_type=sas_services
);
%local oauth_bearer;
%if &grant_type=detect %then %do;
%if %symexist(&access_token_var) %then %let grant_type=authorization_code;
%else %let grant_type=sas_services;
%end;
%if &grant_type=sas_services %then %do;
%let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=;
%end;
%put &sysmacroname: grant_type=&grant_type;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services
)
,mac=&sysmacroname
,msg=%str(Invalid value for grant_type: &grant_type)
)
/* validation in datastep for better character safety */
%local errmsg errflg;
data _null_;
uri=symget('uri');
if length(uri)<12 then do;
call symputx('errflg',1);
call symputx('errmsg',"URI is invalid (too short) - '&uri'",'l');
end;
if scan(uri,-1) ne 'state' or scan(uri,1) ne 'jobExecution' then do;
call symputx('errflg',1);
call symputx('errmsg',
"URI should be in format /jobExecution/jobs/$$$$UUID$$$$/state"
!!" but is actually like: &uri",'l');
end;
run;
%mp_abort(iftrue=(&errflg=1)
,mac=&sysmacroname
,msg=%str(&errmsg)
)
options noquotelenmax;
%local base_uri; /* location of rest apis */
%let base_uri=%mf_getplatform(VIYARESTAPI);
%local fname0;
%let fname0=%mf_getuniquefileref();
proc http method='GET' out=&fname0 &oauth_bearer url="&base_uri/&uri";
headers "Accept"="text/plain"
%if &grant_type=authorization_code %then %do;
"Authorization"="Bearer &&&access_token_var"
%end; ;
run;
%if &SYS_PROCHTTP_STATUS_CODE ne 200 and &SYS_PROCHTTP_STATUS_CODE ne 201 %then
%do;
data _null_;infile &fname0;input;putlog _infile_;run;
%mp_abort(mac=&sysmacroname
,msg=%str(&SYS_PROCHTTP_STATUS_CODE &SYS_PROCHTTP_STATUS_PHRASE)
)
%end;
data;
format uri $128. state $32. timestamp datetime19.;
infile &fname0;
uri="&uri";
timestamp=datetime();
input;
state=_infile_;
run;
proc append base=&outds data=&syslast;
run;
filename &fname0 clear;
%mend; %mend;
/** /**
@file mv_getrefreshtoken.sas @file mv_getrefreshtoken.sas
@@ -12767,7 +12936,7 @@ data _null_;
paramstring=symget('paramstring'); paramstring=symget('paramstring');
put '{"jobDefinitionUri":' joburi ; put '{"jobDefinitionUri":' joburi ;
put ' ,"arguments":{"_contextName":' contextname; put ' ,"arguments":{"_contextName":' contextname;
put ' "_program":' _program; put ' ,"_program":' _program;
if paramstring ne "0" then do; if paramstring ne "0" then do;
put ' ,' paramstring; put ' ,' paramstring;
end; end;
@@ -12808,10 +12977,297 @@ libname &libref;
%mend;/** %mend;/**
@file @file
@brief Takes a dataset of running jobs and waits for them to complete @brief Execute a series of job flows
@details Will poll `/jobs/{jobId}/state` at set intervals until they are all @details Very (very) simple flow manager. Jobs execute in sequential waves,
completed. Completion is determined by reference to the returned _state_, as all previous waves must finish successfully.
per the following table:
The input table is formed as per below. Each observation represents one job.
Each variable is converted into a macro variable with the same name.
## Input table (minimum variables needed)
| variable| description |
|---|---|---|
|FLOW_ID| Numeric value, provides sequential ordering capability|
|_CONTEXTNAME|Dictates which context should be used to run the job. If
blank, will default to `SAS Job Execution compute context`.|
|_PROGRAM|Provides the path to the job itself|
## Output table (minimum variables produced)
| variable| description |
|---|---|---|
|FLOW_ID| Numeric value, provides sequential ordering capability|
|_CONTEXTNAME|Dictates which context should be used to run the job. If
blank, will default to `SAS Job Execution compute context`.|
|_PROGRAM|Provides the path to the job itself|
## Example
First, compile the macros:
filename mc url
"https://raw.githubusercontent.com/sasjs/core/main/all.sas";
%inc mc;
Next, create some jobs (in this case, as web services):
filename ft15f001 temp;
parmcards4;
%put this is job: &_program;
%put this was run in flow &flow_id;
data ;
rand=ranuni(0)*&macrovar1;
do x=1 to rand;
y=rand*&macrovar2;
if y=100 then abort;
output;
end;
run;
;;;;
%mv_createwebservice(path=/Public/temp,name=demo1)
%mv_createwebservice(path=/Public/temp,name=demo2)
Prepare an input table with 60 executions:
data work.inputjobs;
_contextName='SAS Job Execution compute context';
do flow_id=1 to 3;
do i=1 to 20;
_program='/Public/temp/demo1';
macrovar1=10*i;
macrovar2=4*i;
output;
i+1;
_program='/Public/temp/demo2';
macrovar1=40*i;
macrovar2=44*i;
output;
end;
end;
run;
Trigger the flow
%mv_jobflow(inds=work.inputjobs,outds=work.results,maxconcurrency=4)
@param [in] access_token_var= The global macro variable to contain the access token
@param [in] grant_type= valid values:
@li password
@li authorization_code
@li detect - will check if access_token exists, if not will use sas_services if
a SASStudioV session else authorization_code. Default option.
@li sas_services - will use oauth_bearer=sas_services
@param [in] inds= The input dataset containing a list of jobs and parameters
@param [in] maxconcurrency= The max number of parallel jobs to run. Default=8.
@param [out] outds= The output dataset containing the results
@version VIYA V.03.05
@author Allan Bowe, source: https://github.com/sasjs/core
<h4> SAS Macros </h4>
@li mf_nobs.sas
@li mp_abort.sas
@li mf_getplatform.sas
@li mf_getuniquefileref.sas
@li mf_existvarlist.sas
@li mv_jobwaitfor.sas
@li mv_jobexecute.sas
**/
%macro mv_jobflow(inds=0,outds=work.mv_jobflow
,maxconcurrency=8
,access_token_var=ACCESS_TOKEN
,grant_type=sas_services
);
%local oauth_bearer;
%if &grant_type=detect %then %do;
%if %symexist(&access_token_var) %then %let grant_type=authorization_code;
%else %let grant_type=sas_services;
%end;
%if &grant_type=sas_services %then %do;
%let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=;
%end;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services
)
,mac=&sysmacroname
,msg=%str(Invalid value for grant_type: &grant_type)
)
%mp_abort(iftrue=("&inds"="0")
,mac=&sysmacroname
,msg=%str(Input dataset was not provided)
)
%mp_abort(iftrue=(%mf_existVarList(&inds,_CONTEXTNAME FLOW_ID _PROGRAM)=0)
,mac=&sysmacroname
,msg=%str(The following columns must exist on input dataset &inds:
_CONTEXTNAME FLOW_ID _PROGRAM)
)
%mp_abort(iftrue=(&maxconcurrency<1)
,mac=&sysmacroname
,msg=%str(The maxconcurrency variable should be a positive integer)
)
%local missings;
proc sql noprint;
select count(*) into: missings
from &inds
where flow_id is null or _program is null;
%mp_abort(iftrue=(&missings>0)
,mac=&sysmacroname
,msg=%str(input dataset contains &missings missing values for FLOW_ID or _PROGRAM)
)
%if %mf_nobs(&inds)=0 %then %do;
%put No observations in &inds! Leaving macro &sysmacroname;
%return;
%end;
/* ensure output table is available */
data &outds;run;
proc sql;
drop table &outds;
options noquotelenmax;
%local base_uri; /* location of rest apis */
%let base_uri=%mf_getplatform(VIYARESTAPI);
/* get flows */
proc sort data=&inds;
by flow_id;
run;
data _null_;
set &inds (keep=flow_id) end=last;
by flow_id;
if last.flow_id then do;
cnt+1;
call symputx(cats('flow',cnt),flow_id,'l');
end;
if last then call symputx('flowcnt',cnt,'l');
run;
/* prepare temporary datasets and frefs */
%local fid jid jds jjson jdsapp jdsrunning jdswaitfor jfref;
data;run;%let jds=&syslast;
data;run;%let jjson=&syslast;
data;run;%let jdsapp=&syslast;
data;run;%let jdsrunning=&syslast;
data;run;%let jdswaitfor=&syslast;
%let jfref=%mf_getuniquefileref();
/* start loop */
%do fid=1 %to &flowcnt;
%put preparing job attributes for flow &&flow&fid;
%local jds jcnt;
data &jds(drop=_contextName _program);
set &inds(where=(flow_id=&&flow&fid));
if _contextName='' then _contextName="SAS Job Execution compute context";
call symputx(cats('job',_n_),_program,'l');
call symputx(cats('context',_n_),_contextName,'l');
call symputx('jcnt',_n_,'l');
run;
%put exporting job variables in json format;
%do jid=1 %to &jcnt;
data &jjson;
set &jds;
if _n_=&jid then do;
output;
stop;
end;
run;
proc json out=&jfref;
export &jjson / nosastags fmtnumeric;
run;
data _null_;
infile &jfref lrecl=32767;
input;
jparams='jparams'!!left(symget('jid'));
call symputx(jparams,substr(_infile_,3,length(_infile_)-4));
run;
%local joburi&jid;
%let joburi&jid=0; /* used in next loop */
%end;
%local concurrency completed;
%let concurrency=0;
%let completed=0;
proc sql; drop table &jdsrunning;
%do jid=1 %to &jcnt;
/**
* now we can execute the jobs up to the maxconcurrency setting
*/
%if "&&job&jid" ne "0" %then %do; /* this var is zero if job finished */
%if "&&joburi&jid"="0" and &concurrency<&maxconcurrency %then %do;
/* job has not been triggered and we have free slots */
%local jobname jobpath;
%let jobname=%scan(&&job&jid,-1,/);
%let jobpath=%substr(&&job&jid,1,%length(&&job&jid)-%length(&jobname)-1);
%put executing &jobpath/&jobname with paramstring &&jparams&jid;
%mv_jobexecute(path=&jobpath
,name=&jobname
,paramstring=%superq(jparams&jid)
,outds=&jdsapp
)
data &jdsapp;
format jobparams $32767.;
set &jdsapp(where=(method='GET' and rel='state'));
jobparams=symget("jparams&jid");
call symputx("joburi&jid",uri,'l');
run;
proc append base=&jdsrunning data=&jdsapp;
run;
%let concurrency=%eval(&concurrency+1);
%end;
%else %if %sysfunc(exist(&outds))=1 %then %do;
/* check to see if the job has finished as was previously executed */
%local jobcheck; %let jobcheck=0;
proc sql noprint;
select count(*) into: jobcheck
from &outds where uri="&&joburi&jid";
%if &jobcheck>0 %then %do;
%let job&jid=0;
%put &&job&jid in flow &fid completed!;
%end;
%end;
%end;
%if &jid=&jcnt %then %do;
/* we are at the end of the loop - time to see which jobs have finished */
%mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor)
%local done;
%let done=%mf_nobs(&jdswaitfor);
%if &done>0 %then %do;
%let completed=%eval(&completed+&done);
%let concurrency=%eval(&concurrency-&done);
data &jdsapp;
set &jdswaitfor;
flow_id=&&flow&fid;
run;
proc append base=&outds data=&jdsapp;
run;
%end;
/* loop again if jobs are left */
%if &completed < &jcnt %then %do;
%let jid=0;
%put looping flow &fid again - &completed jobs completed, &concurrency jobs running;
%end;
%end;
%end;
/* back up and execute the next flow */
%end;
%mend;
/**
@file
@brief Takes a dataset of running jobs and waits for ANY or ALL of them to complete
@details Will poll `/jobs/{jobId}/state` at set intervals until ANY or ALL
jobs are completed. Completion is determined by reference to the returned
_state_, as per the following table:
| state | Wait? | Notes| | state | Wait? | Notes|
|-----------|-------|------| |-----------|-------|------|
@@ -12857,8 +13313,7 @@ libname &libref;
where method='GET' and rel='state'; where method='GET' and rel='state';
run; run;
%mv_jobwaitfor(inds=work.jobs,outds=work.jobstates) %mv_jobwaitfor(ALL,inds=work.jobs,outds=work.jobstates)
Delete the job: Delete the job:
@@ -12873,6 +13328,8 @@ libname &libref;
a SASStudioV session else authorization_code. Default option. a SASStudioV session else authorization_code. Default option.
- sas_services - will use oauth_bearer=sas_services - sas_services - will use oauth_bearer=sas_services
@param [in] action=Either ALL (to wait for every job) or ANY (if one job
completes, processing will continue). Default=ALL.
@param [in] inds= The input dataset containing the list of job uris, in the @param [in] inds= The input dataset containing the list of job uris, in the
following format: `/jobExecution/jobs/&JOBID./state` and the corresponding following format: `/jobExecution/jobs/&JOBID./state` and the corresponding
job name. The uri should be in a `uri` variable, and the job path/name job name. The uri should be in a `uri` variable, and the job path/name
@@ -12893,8 +13350,8 @@ libname &libref;
**/ **/
%macro mv_jobwaitfor( %macro mv_jobwaitfor(action
access_token_var=ACCESS_TOKEN ,access_token_var=ACCESS_TOKEN
,grant_type=sas_services ,grant_type=sas_services
,inds=0 ,inds=0
,outds=work.mv_jobwaitfor ,outds=work.mv_jobwaitfor
@@ -12939,17 +13396,24 @@ options noquotelenmax;
%let base_uri=%mf_getplatform(VIYARESTAPI); %let base_uri=%mf_getplatform(VIYARESTAPI);
data _null_; data _null_;
length jobparams $32767;
set &inds end=last; set &inds end=last;
call symputx(cats('joburi',_n_),uri,'l'); call symputx(cats('joburi',_n_),uri,'l');
call symputx(cats('jobname',_n_),_program,'l'); call symputx(cats('jobname',_n_),_program,'l');
call symputx(cats('jobparams',_n_),jobparams,'l');
if last then call symputx('uricnt',_n_,'l'); if last then call symputx('uricnt',_n_,'l');
run; run;
%local runcnt;
%if &action=ALL %then %let runcnt=&uricnt;
%else %if &action=ANY %then %let runcnt=1;
%else %let runcnt=&uricnt;
%local fname0 ; %local fname0 ;
%let fname0=%mf_getuniquefileref(); %let fname0=%mf_getuniquefileref();
data &outds; data &outds;
format _program uri $128. state $32. timestamp datetime19.; format _program uri $128. state $32. timestamp datetime19. jobparams $32767.;
stop; stop;
run; run;
@@ -12983,8 +13447,9 @@ run;
_program="&&jobname&i", _program="&&jobname&i",
uri="&&joburi&i", uri="&&joburi&i",
state="&status", state="&status",
timestamp=datetime(); timestamp=datetime(),
%let joburi&i=0; jobparams=symget("jobparams&i");
%let joburi&i=0; /* do not re-check */
%end; %end;
%else %if &status=idle or &status=pending or &status=running %then %do; %else %if &status=idle or &status=pending or &status=running %then %do;
data _null_; data _null_;
@@ -13002,7 +13467,7 @@ run;
%let goback=0; %let goback=0;
proc sql noprint; proc sql noprint;
select count(*) into:goback from &outds; select count(*) into:goback from &outds;
%if &goback ne &uricnt %then %let i=0; %if &goback lt &runcnt %then %let i=0;
%end; %end;
%end; %end;

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.1 KiB

169
viya/mv_getjobstate.sas Normal file
View File

@@ -0,0 +1,169 @@
/**
@file
@brief Extract the status from a running SAS Viya job
@details Extracts the status from a running job and appends it to an output
dataset with the following structure:
| uri | state | timestamp |
|---------------------------------------------------------------|---------|--------------------|
| /jobExecution/jobs/5cebd840-2063-42c1-be0c-421ec3e1c175/state | running | 15JAN2021:12:35:08 |
To query the running job, you need the URI. Sample code for achieving this
is provided below.
## Example
First, compile the macros:
filename mc url "https://raw.githubusercontent.com/sasjs/core/main/all.sas";
%inc mc;
Next, create a long running job (in this case, a web service):
filename ft15f001 temp;
parmcards4;
data ;
rand=ranuni(0)*1000;
do x=1 to rand;
y=rand*4;
output;
end;
run;
data _null_;
call sleep(5,1);
run;
;;;;
%mv_createwebservice(path=/Public/temp,name=demo)
Execute it, grab the uri, and finally, check the job status:
%mv_jobexecute(path=/Public/temp
,name=demo
,outds=work.info
)
data _null_;
set work.info;
if method='GET' and rel='state';
call symputx('uri',uri);
run;
%mv_getjobstate(uri=&uri,outds=results)
You can run this macro as part of a loop to await the final 'completed' status.
The full list of status values is:
@li idle
@li pending
@li running
@li canceled
@li completed
@li failed
If you have one or more jobs that you'd like to wait for completion you can
also use the [mv_jobwaitfor](/mv__jobwaitfor_8sas.html) macro.
@param [in] access_token_var= The global macro variable to contain the access token
@param [in] grant_type= valid values:
@li password
@li authorization_code
@li detect - will check if access_token exists, if not will use sas_services if
a SASStudioV session else authorization_code. Default option.
@li sas_services - will use oauth_bearer=sas_services.
@param [in] uri= The uri of the running job for which to fetch the status,
in the format `/jobExecution/jobs/$UUID/state` (unquoted).
@param [out] outds= The output dataset in which to APPEND the status. Three
fields are appended: `CHECK_TM`, `URI` and `STATE`. If the dataset does not
exist, it is created.
@version VIYA V.03.04
@author Allan Bowe, source: https://github.com/sasjs/core
<h4> SAS Macros </h4>
@li mp_abort.sas
@li mf_getplatform.sas
@li mf_getuniquefileref.sas
**/
%macro mv_getjobstate(uri=0,outds=work.mv_getjobstate
,contextName=SAS Job Execution compute context
,access_token_var=ACCESS_TOKEN
,grant_type=sas_services
);
%local oauth_bearer;
%if &grant_type=detect %then %do;
%if %symexist(&access_token_var) %then %let grant_type=authorization_code;
%else %let grant_type=sas_services;
%end;
%if &grant_type=sas_services %then %do;
%let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=;
%end;
%put &sysmacroname: grant_type=&grant_type;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services
)
,mac=&sysmacroname
,msg=%str(Invalid value for grant_type: &grant_type)
)
/* validation in datastep for better character safety */
%local errmsg errflg;
data _null_;
uri=symget('uri');
if length(uri)<12 then do;
call symputx('errflg',1);
call symputx('errmsg',"URI is invalid (too short) - '&uri'",'l');
end;
if scan(uri,-1) ne 'state' or scan(uri,1) ne 'jobExecution' then do;
call symputx('errflg',1);
call symputx('errmsg',
"URI should be in format /jobExecution/jobs/$$$$UUID$$$$/state"
!!" but is actually like: &uri",'l');
end;
run;
%mp_abort(iftrue=(&errflg=1)
,mac=&sysmacroname
,msg=%str(&errmsg)
)
options noquotelenmax;
%local base_uri; /* location of rest apis */
%let base_uri=%mf_getplatform(VIYARESTAPI);
%local fname0;
%let fname0=%mf_getuniquefileref();
proc http method='GET' out=&fname0 &oauth_bearer url="&base_uri/&uri";
headers "Accept"="text/plain"
%if &grant_type=authorization_code %then %do;
"Authorization"="Bearer &&&access_token_var"
%end; ;
run;
%if &SYS_PROCHTTP_STATUS_CODE ne 200 and &SYS_PROCHTTP_STATUS_CODE ne 201 %then
%do;
data _null_;infile &fname0;input;putlog _infile_;run;
%mp_abort(mac=&sysmacroname
,msg=%str(&SYS_PROCHTTP_STATUS_CODE &SYS_PROCHTTP_STATUS_PHRASE)
)
%end;
data;
format uri $128. state $32. timestamp datetime19.;
infile &fname0;
uri="&uri";
timestamp=datetime();
input;
state=_infile_;
run;
proc append base=&outds data=&syslast;
run;
filename &fname0 clear;
%mend;

View File

@@ -130,7 +130,7 @@ data _null_;
paramstring=symget('paramstring'); paramstring=symget('paramstring');
put '{"jobDefinitionUri":' joburi ; put '{"jobDefinitionUri":' joburi ;
put ' ,"arguments":{"_contextName":' contextname; put ' ,"arguments":{"_contextName":' contextname;
put ' "_program":' _program; put ' ,"_program":' _program;
if paramstring ne "0" then do; if paramstring ne "0" then do;
put ' ,' paramstring; put ' ,' paramstring;
end; end;

View File

@@ -1,9 +1,9 @@
/** /**
@file @file
@brief Takes a dataset of running jobs and waits for them to complete @brief Takes a dataset of running jobs and waits for ANY or ALL of them to complete
@details Will poll `/jobs/{jobId}/state` at set intervals until they are all @details Will poll `/jobs/{jobId}/state` at set intervals until ANY or ALL
completed. Completion is determined by reference to the returned _state_, as jobs are completed. Completion is determined by reference to the returned
per the following table: _state_, as per the following table:
| state | Wait? | Notes| | state | Wait? | Notes|
|-----------|-------|------| |-----------|-------|------|
@@ -49,8 +49,7 @@
where method='GET' and rel='state'; where method='GET' and rel='state';
run; run;
%mv_jobwaitfor(inds=work.jobs,outds=work.jobstates) %mv_jobwaitfor(ALL,inds=work.jobs,outds=work.jobstates)
Delete the job: Delete the job:
@@ -65,6 +64,8 @@
a SASStudioV session else authorization_code. Default option. a SASStudioV session else authorization_code. Default option.
- sas_services - will use oauth_bearer=sas_services - sas_services - will use oauth_bearer=sas_services
@param [in] action=Either ALL (to wait for every job) or ANY (if one job
completes, processing will continue). Default=ALL.
@param [in] inds= The input dataset containing the list of job uris, in the @param [in] inds= The input dataset containing the list of job uris, in the
following format: `/jobExecution/jobs/&JOBID./state` and the corresponding following format: `/jobExecution/jobs/&JOBID./state` and the corresponding
job name. The uri should be in a `uri` variable, and the job path/name job name. The uri should be in a `uri` variable, and the job path/name
@@ -85,8 +86,8 @@
**/ **/
%macro mv_jobwaitfor( %macro mv_jobwaitfor(action
access_token_var=ACCESS_TOKEN ,access_token_var=ACCESS_TOKEN
,grant_type=sas_services ,grant_type=sas_services
,inds=0 ,inds=0
,outds=work.mv_jobwaitfor ,outds=work.mv_jobwaitfor
@@ -131,17 +132,24 @@ options noquotelenmax;
%let base_uri=%mf_getplatform(VIYARESTAPI); %let base_uri=%mf_getplatform(VIYARESTAPI);
data _null_; data _null_;
length jobparams $32767;
set &inds end=last; set &inds end=last;
call symputx(cats('joburi',_n_),uri,'l'); call symputx(cats('joburi',_n_),uri,'l');
call symputx(cats('jobname',_n_),_program,'l'); call symputx(cats('jobname',_n_),_program,'l');
call symputx(cats('jobparams',_n_),jobparams,'l');
if last then call symputx('uricnt',_n_,'l'); if last then call symputx('uricnt',_n_,'l');
run; run;
%local runcnt;
%if &action=ALL %then %let runcnt=&uricnt;
%else %if &action=ANY %then %let runcnt=1;
%else %let runcnt=&uricnt;
%local fname0 ; %local fname0 ;
%let fname0=%mf_getuniquefileref(); %let fname0=%mf_getuniquefileref();
data &outds; data &outds;
format _program uri $128. state $32. timestamp datetime19.; format _program uri $128. state $32. timestamp datetime19. jobparams $32767.;
stop; stop;
run; run;
@@ -175,8 +183,9 @@ run;
_program="&&jobname&i", _program="&&jobname&i",
uri="&&joburi&i", uri="&&joburi&i",
state="&status", state="&status",
timestamp=datetime(); timestamp=datetime(),
%let joburi&i=0; jobparams=symget("jobparams&i");
%let joburi&i=0; /* do not re-check */
%end; %end;
%else %if &status=idle or &status=pending or &status=running %then %do; %else %if &status=idle or &status=pending or &status=running %then %do;
data _null_; data _null_;
@@ -194,7 +203,7 @@ run;
%let goback=0; %let goback=0;
proc sql noprint; proc sql noprint;
select count(*) into:goback from &outds; select count(*) into:goback from &outds;
%if &goback ne &uricnt %then %let i=0; %if &goback lt &runcnt %then %let i=0;
%end; %end;
%end; %end;