mirror of
https://github.com/sasjs/core.git
synced 2025-12-15 08:14:34 +00:00
Compare commits
10 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 94762d9381 | |||
| 03d9d805ff | |||
| 94416028b7 | |||
| 6cf5d4ef28 | |||
| e4ceaecfb2 | |||
|
|
2eb246c543 | ||
| d9954ae777 | |||
| 364dc9f07f | |||
| d96125c3cf | |||
| 506695be56 |
704
all.sas
704
all.sas
@@ -12396,6 +12396,175 @@ data _null_;
|
|||||||
run;
|
run;
|
||||||
filename &fname1 clear;
|
filename &fname1 clear;
|
||||||
filename &fname2 clear;
|
filename &fname2 clear;
|
||||||
|
%mend;
|
||||||
|
/**
|
||||||
|
@file
|
||||||
|
@brief Extract the status from a running SAS Viya job
|
||||||
|
@details Extracts the status from a running job and appends it to an output
|
||||||
|
dataset with the following structure:
|
||||||
|
|
||||||
|
| uri | state | timestamp |
|
||||||
|
|---------------------------------------------------------------|---------|--------------------|
|
||||||
|
| /jobExecution/jobs/5cebd840-2063-42c1-be0c-421ec3e1c175/state | running | 15JAN2021:12:35:08 |
|
||||||
|
|
||||||
|
To query the running job, you need the URI. Sample code for achieving this
|
||||||
|
is provided below.
|
||||||
|
|
||||||
|
## Example
|
||||||
|
|
||||||
|
First, compile the macros:
|
||||||
|
|
||||||
|
filename mc url "https://raw.githubusercontent.com/sasjs/core/main/all.sas";
|
||||||
|
%inc mc;
|
||||||
|
|
||||||
|
Next, create a long running job (in this case, a web service):
|
||||||
|
|
||||||
|
filename ft15f001 temp;
|
||||||
|
parmcards4;
|
||||||
|
data ;
|
||||||
|
rand=ranuni(0)*1000;
|
||||||
|
do x=1 to rand;
|
||||||
|
y=rand*4;
|
||||||
|
output;
|
||||||
|
end;
|
||||||
|
run;
|
||||||
|
data _null_;
|
||||||
|
call sleep(5,1);
|
||||||
|
run;
|
||||||
|
;;;;
|
||||||
|
%mv_createwebservice(path=/Public/temp,name=demo)
|
||||||
|
|
||||||
|
Execute it, grab the uri, and finally, check the job status:
|
||||||
|
|
||||||
|
%mv_jobexecute(path=/Public/temp
|
||||||
|
,name=demo
|
||||||
|
,outds=work.info
|
||||||
|
)
|
||||||
|
|
||||||
|
data _null_;
|
||||||
|
set work.info;
|
||||||
|
if method='GET' and rel='state';
|
||||||
|
call symputx('uri',uri);
|
||||||
|
run;
|
||||||
|
|
||||||
|
%mv_getjobstate(uri=&uri,outds=results)
|
||||||
|
|
||||||
|
You can run this macro as part of a loop to await the final 'completed' status.
|
||||||
|
The full list of status values is:
|
||||||
|
|
||||||
|
@li idle
|
||||||
|
@li pending
|
||||||
|
@li running
|
||||||
|
@li canceled
|
||||||
|
@li completed
|
||||||
|
@li failed
|
||||||
|
|
||||||
|
If you have one or more jobs that you'd like to wait for completion you can
|
||||||
|
also use the [mv_jobwaitfor](/mv__jobwaitfor_8sas.html) macro.
|
||||||
|
|
||||||
|
@param [in] access_token_var= The global macro variable to contain the access token
|
||||||
|
@param [in] grant_type= valid values:
|
||||||
|
@li password
|
||||||
|
@li authorization_code
|
||||||
|
@li detect - will check if access_token exists, if not will use sas_services if
|
||||||
|
a SASStudioV session else authorization_code. Default option.
|
||||||
|
@li sas_services - will use oauth_bearer=sas_services.
|
||||||
|
@param [in] uri= The uri of the running job for which to fetch the status,
|
||||||
|
in the format `/jobExecution/jobs/$UUID/state` (unquoted).
|
||||||
|
@param [out] outds= The output dataset in which to APPEND the status. Three
|
||||||
|
fields are appended: `CHECK_TM`, `URI` and `STATE`. If the dataset does not
|
||||||
|
exist, it is created.
|
||||||
|
|
||||||
|
|
||||||
|
@version VIYA V.03.04
|
||||||
|
@author Allan Bowe, source: https://github.com/sasjs/core
|
||||||
|
|
||||||
|
<h4> SAS Macros </h4>
|
||||||
|
@li mp_abort.sas
|
||||||
|
@li mf_getplatform.sas
|
||||||
|
@li mf_getuniquefileref.sas
|
||||||
|
|
||||||
|
**/
|
||||||
|
|
||||||
|
%macro mv_getjobstate(uri=0,outds=work.mv_getjobstate
|
||||||
|
,contextName=SAS Job Execution compute context
|
||||||
|
,access_token_var=ACCESS_TOKEN
|
||||||
|
,grant_type=sas_services
|
||||||
|
);
|
||||||
|
%local oauth_bearer;
|
||||||
|
%if &grant_type=detect %then %do;
|
||||||
|
%if %symexist(&access_token_var) %then %let grant_type=authorization_code;
|
||||||
|
%else %let grant_type=sas_services;
|
||||||
|
%end;
|
||||||
|
%if &grant_type=sas_services %then %do;
|
||||||
|
%let oauth_bearer=oauth_bearer=sas_services;
|
||||||
|
%let &access_token_var=;
|
||||||
|
%end;
|
||||||
|
%put &sysmacroname: grant_type=&grant_type;
|
||||||
|
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||||
|
and &grant_type ne sas_services
|
||||||
|
)
|
||||||
|
,mac=&sysmacroname
|
||||||
|
,msg=%str(Invalid value for grant_type: &grant_type)
|
||||||
|
)
|
||||||
|
|
||||||
|
/* validation in datastep for better character safety */
|
||||||
|
%local errmsg errflg;
|
||||||
|
data _null_;
|
||||||
|
uri=symget('uri');
|
||||||
|
if length(uri)<12 then do;
|
||||||
|
call symputx('errflg',1);
|
||||||
|
call symputx('errmsg',"URI is invalid (too short) - '&uri'",'l');
|
||||||
|
end;
|
||||||
|
if scan(uri,-1) ne 'state' or scan(uri,1) ne 'jobExecution' then do;
|
||||||
|
|
||||||
|
call symputx('errflg',1);
|
||||||
|
call symputx('errmsg',
|
||||||
|
"URI should be in format /jobExecution/jobs/$$$$UUID$$$$/state"
|
||||||
|
!!" but is actually like: &uri",'l');
|
||||||
|
end;
|
||||||
|
run;
|
||||||
|
|
||||||
|
%mp_abort(iftrue=(&errflg=1)
|
||||||
|
,mac=&sysmacroname
|
||||||
|
,msg=%str(&errmsg)
|
||||||
|
)
|
||||||
|
|
||||||
|
options noquotelenmax;
|
||||||
|
%local base_uri; /* location of rest apis */
|
||||||
|
%let base_uri=%mf_getplatform(VIYARESTAPI);
|
||||||
|
|
||||||
|
%local fname0;
|
||||||
|
%let fname0=%mf_getuniquefileref();
|
||||||
|
|
||||||
|
proc http method='GET' out=&fname0 &oauth_bearer url="&base_uri/&uri";
|
||||||
|
headers "Accept"="text/plain"
|
||||||
|
%if &grant_type=authorization_code %then %do;
|
||||||
|
"Authorization"="Bearer &&&access_token_var"
|
||||||
|
%end; ;
|
||||||
|
run;
|
||||||
|
%if &SYS_PROCHTTP_STATUS_CODE ne 200 and &SYS_PROCHTTP_STATUS_CODE ne 201 %then
|
||||||
|
%do;
|
||||||
|
data _null_;infile &fname0;input;putlog _infile_;run;
|
||||||
|
%mp_abort(mac=&sysmacroname
|
||||||
|
,msg=%str(&SYS_PROCHTTP_STATUS_CODE &SYS_PROCHTTP_STATUS_PHRASE)
|
||||||
|
)
|
||||||
|
%end;
|
||||||
|
|
||||||
|
data;
|
||||||
|
format uri $128. state $32. timestamp datetime19.;
|
||||||
|
infile &fname0;
|
||||||
|
uri="&uri";
|
||||||
|
timestamp=datetime();
|
||||||
|
input;
|
||||||
|
state=_infile_;
|
||||||
|
run;
|
||||||
|
|
||||||
|
proc append base=&outds data=&syslast;
|
||||||
|
run;
|
||||||
|
|
||||||
|
filename &fname0 clear;
|
||||||
|
|
||||||
%mend;
|
%mend;
|
||||||
/**
|
/**
|
||||||
@file mv_getrefreshtoken.sas
|
@file mv_getrefreshtoken.sas
|
||||||
@@ -12660,23 +12829,24 @@ libname &libref1 clear;
|
|||||||
,paramstring=%str("macvarname":"macvarvalue","answer":42)
|
,paramstring=%str("macvarname":"macvarvalue","answer":42)
|
||||||
)
|
)
|
||||||
|
|
||||||
@param access_token_var= The global macro variable to contain the access token
|
@param [in] access_token_var= The global macro variable to contain the access token
|
||||||
@param grant_type= valid values:
|
@param [in] grant_type= valid values:
|
||||||
* password
|
|
||||||
* authorization_code
|
|
||||||
* detect - will check if access_token exists, if not will use sas_services if
|
|
||||||
a SASStudioV session else authorization_code. Default option.
|
|
||||||
* sas_services - will use oauth_bearer=sas_services
|
|
||||||
|
|
||||||
@param path= The SAS Drive path to the job being executed
|
* password
|
||||||
@param name= The name of the job to execute
|
* authorization_code
|
||||||
@param paramstring= A JSON fragment with name:value pairs, eg: `"name":"value"`
|
* detect - will check if access_token exists, if not will use sas_services if
|
||||||
|
a SASStudioV session else authorization_code. Default option.
|
||||||
|
* sas_services - will use oauth_bearer=sas_services
|
||||||
|
|
||||||
|
@param [in] path= The SAS Drive path to the job being executed
|
||||||
|
@param [in] name= The name of the job to execute
|
||||||
|
@param [in] paramstring= A JSON fragment with name:value pairs, eg: `"name":"value"`
|
||||||
or "name":"value","name2":42`. This will need to be wrapped in `%str()`.
|
or "name":"value","name2":42`. This will need to be wrapped in `%str()`.
|
||||||
|
|
||||||
@param contextName= Context name with which to run the job.
|
@param [in] contextName= Context name with which to run the job.
|
||||||
Default = `SAS Job Execution compute context`
|
Default = `SAS Job Execution compute context`
|
||||||
|
|
||||||
@param outds= The output dataset containing links (Default=work.mv_jobexecute)
|
@param [out] outds= The output dataset containing links (Default=work.mv_jobexecute)
|
||||||
|
|
||||||
|
|
||||||
@version VIYA V.03.04
|
@version VIYA V.03.04
|
||||||
@@ -12708,7 +12878,7 @@ libname &libref1 clear;
|
|||||||
%let oauth_bearer=oauth_bearer=sas_services;
|
%let oauth_bearer=oauth_bearer=sas_services;
|
||||||
%let &access_token_var=;
|
%let &access_token_var=;
|
||||||
%end;
|
%end;
|
||||||
%put &sysmacroname: grant_type=&grant_type;
|
|
||||||
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||||
and &grant_type ne sas_services
|
and &grant_type ne sas_services
|
||||||
)
|
)
|
||||||
@@ -12762,9 +12932,11 @@ data _null_;
|
|||||||
length joburi contextname $128 paramstring $32765;
|
length joburi contextname $128 paramstring $32765;
|
||||||
joburi=quote(trim(symget('joburi')));
|
joburi=quote(trim(symget('joburi')));
|
||||||
contextname=quote(trim(symget('contextname')));
|
contextname=quote(trim(symget('contextname')));
|
||||||
|
_program=quote("&path/&name");
|
||||||
paramstring=symget('paramstring');
|
paramstring=symget('paramstring');
|
||||||
put '{"jobDefinitionUri":' joburi ;
|
put '{"jobDefinitionUri":' joburi ;
|
||||||
put ' ,"arguments":{"_contextName":' contextname;
|
put ' ,"arguments":{"_contextName":' contextname;
|
||||||
|
put ' ,"_program":' _program;
|
||||||
if paramstring ne "0" then do;
|
if paramstring ne "0" then do;
|
||||||
put ' ,' paramstring;
|
put ' ,' paramstring;
|
||||||
end;
|
end;
|
||||||
@@ -12795,6 +12967,7 @@ libname &libref JSON fileref=&fname1;
|
|||||||
|
|
||||||
data &outds;
|
data &outds;
|
||||||
set &libref..links;
|
set &libref..links;
|
||||||
|
_program="&path/&name";
|
||||||
run;
|
run;
|
||||||
|
|
||||||
/* clear refs */
|
/* clear refs */
|
||||||
@@ -12802,6 +12975,511 @@ filename &fname0 clear;
|
|||||||
filename &fname1 clear;
|
filename &fname1 clear;
|
||||||
libname &libref;
|
libname &libref;
|
||||||
|
|
||||||
|
%mend;/**
|
||||||
|
@file
|
||||||
|
@brief Execute a series of job flows
|
||||||
|
@details Very (very) simple flow manager. Jobs execute in sequential waves,
|
||||||
|
all previous waves must finish successfully.
|
||||||
|
|
||||||
|
The input table is formed as per below. Each observation represents one job.
|
||||||
|
Each variable is converted into a macro variable with the same name.
|
||||||
|
|
||||||
|
## Input table (minimum variables needed)
|
||||||
|
|
||||||
|
| variable| description |
|
||||||
|
|---|---|---|
|
||||||
|
|FLOW_ID| Numeric value, provides sequential ordering capability|
|
||||||
|
|_CONTEXTNAME|Dictates which context should be used to run the job. If
|
||||||
|
blank, will default to `SAS Job Execution compute context`.|
|
||||||
|
|_PROGRAM|Provides the path to the job itself|
|
||||||
|
|
||||||
|
## Output table (minimum variables produced)
|
||||||
|
|
||||||
|
| variable| description |
|
||||||
|
|---|---|---|
|
||||||
|
|FLOW_ID| Numeric value, provides sequential ordering capability|
|
||||||
|
|_CONTEXTNAME|Dictates which context should be used to run the job. If
|
||||||
|
blank, will default to `SAS Job Execution compute context`.|
|
||||||
|
|_PROGRAM|Provides the path to the job itself|
|
||||||
|
|
||||||
|
## Example
|
||||||
|
|
||||||
|
First, compile the macros:
|
||||||
|
|
||||||
|
filename mc url
|
||||||
|
"https://raw.githubusercontent.com/sasjs/core/main/all.sas";
|
||||||
|
%inc mc;
|
||||||
|
|
||||||
|
Next, create some jobs (in this case, as web services):
|
||||||
|
|
||||||
|
filename ft15f001 temp;
|
||||||
|
parmcards4;
|
||||||
|
%put this is job: &_program;
|
||||||
|
%put this was run in flow &flow_id;
|
||||||
|
data ;
|
||||||
|
rand=ranuni(0)*¯ovar1;
|
||||||
|
do x=1 to rand;
|
||||||
|
y=rand*¯ovar2;
|
||||||
|
if y=100 then abort;
|
||||||
|
output;
|
||||||
|
end;
|
||||||
|
run;
|
||||||
|
;;;;
|
||||||
|
%mv_createwebservice(path=/Public/temp,name=demo1)
|
||||||
|
%mv_createwebservice(path=/Public/temp,name=demo2)
|
||||||
|
|
||||||
|
Prepare an input table with 60 executions:
|
||||||
|
|
||||||
|
data work.inputjobs;
|
||||||
|
_contextName='SAS Job Execution compute context';
|
||||||
|
do flow_id=1 to 3;
|
||||||
|
do i=1 to 20;
|
||||||
|
_program='/Public/temp/demo1';
|
||||||
|
macrovar1=10*i;
|
||||||
|
macrovar2=4*i;
|
||||||
|
output;
|
||||||
|
i+1;
|
||||||
|
_program='/Public/temp/demo2';
|
||||||
|
macrovar1=40*i;
|
||||||
|
macrovar2=44*i;
|
||||||
|
output;
|
||||||
|
end;
|
||||||
|
end;
|
||||||
|
run;
|
||||||
|
|
||||||
|
Trigger the flow
|
||||||
|
|
||||||
|
%mv_jobflow(inds=work.inputjobs,outds=work.results,maxconcurrency=4)
|
||||||
|
|
||||||
|
|
||||||
|
@param [in] access_token_var= The global macro variable to contain the access token
|
||||||
|
@param [in] grant_type= valid values:
|
||||||
|
@li password
|
||||||
|
@li authorization_code
|
||||||
|
@li detect - will check if access_token exists, if not will use sas_services if
|
||||||
|
a SASStudioV session else authorization_code. Default option.
|
||||||
|
@li sas_services - will use oauth_bearer=sas_services
|
||||||
|
@param [in] inds= The input dataset containing a list of jobs and parameters
|
||||||
|
@param [in] maxconcurrency= The max number of parallel jobs to run. Default=8.
|
||||||
|
@param [out] outds= The output dataset containing the results
|
||||||
|
|
||||||
|
@version VIYA V.03.05
|
||||||
|
@author Allan Bowe, source: https://github.com/sasjs/core
|
||||||
|
|
||||||
|
<h4> SAS Macros </h4>
|
||||||
|
@li mf_nobs.sas
|
||||||
|
@li mp_abort.sas
|
||||||
|
@li mf_getplatform.sas
|
||||||
|
@li mf_getuniquefileref.sas
|
||||||
|
@li mf_existvarlist.sas
|
||||||
|
@li mv_jobwaitfor.sas
|
||||||
|
@li mv_jobexecute.sas
|
||||||
|
|
||||||
|
**/
|
||||||
|
|
||||||
|
%macro mv_jobflow(inds=0,outds=work.mv_jobflow
|
||||||
|
,maxconcurrency=8
|
||||||
|
,access_token_var=ACCESS_TOKEN
|
||||||
|
,grant_type=sas_services
|
||||||
|
);
|
||||||
|
%local oauth_bearer;
|
||||||
|
%if &grant_type=detect %then %do;
|
||||||
|
%if %symexist(&access_token_var) %then %let grant_type=authorization_code;
|
||||||
|
%else %let grant_type=sas_services;
|
||||||
|
%end;
|
||||||
|
%if &grant_type=sas_services %then %do;
|
||||||
|
%let oauth_bearer=oauth_bearer=sas_services;
|
||||||
|
%let &access_token_var=;
|
||||||
|
%end;
|
||||||
|
|
||||||
|
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||||
|
and &grant_type ne sas_services
|
||||||
|
)
|
||||||
|
,mac=&sysmacroname
|
||||||
|
,msg=%str(Invalid value for grant_type: &grant_type)
|
||||||
|
)
|
||||||
|
|
||||||
|
%mp_abort(iftrue=("&inds"="0")
|
||||||
|
,mac=&sysmacroname
|
||||||
|
,msg=%str(Input dataset was not provided)
|
||||||
|
)
|
||||||
|
%mp_abort(iftrue=(%mf_existVarList(&inds,_CONTEXTNAME FLOW_ID _PROGRAM)=0)
|
||||||
|
,mac=&sysmacroname
|
||||||
|
,msg=%str(The following columns must exist on input dataset &inds:
|
||||||
|
_CONTEXTNAME FLOW_ID _PROGRAM)
|
||||||
|
)
|
||||||
|
%mp_abort(iftrue=(&maxconcurrency<1)
|
||||||
|
,mac=&sysmacroname
|
||||||
|
,msg=%str(The maxconcurrency variable should be a positive integer)
|
||||||
|
)
|
||||||
|
|
||||||
|
%local missings;
|
||||||
|
proc sql noprint;
|
||||||
|
select count(*) into: missings
|
||||||
|
from &inds
|
||||||
|
where flow_id is null or _program is null;
|
||||||
|
%mp_abort(iftrue=(&missings>0)
|
||||||
|
,mac=&sysmacroname
|
||||||
|
,msg=%str(input dataset contains &missings missing values for FLOW_ID or _PROGRAM)
|
||||||
|
)
|
||||||
|
|
||||||
|
%if %mf_nobs(&inds)=0 %then %do;
|
||||||
|
%put No observations in &inds! Leaving macro &sysmacroname;
|
||||||
|
%return;
|
||||||
|
%end;
|
||||||
|
|
||||||
|
/* ensure output table is available */
|
||||||
|
data &outds;run;
|
||||||
|
proc sql;
|
||||||
|
drop table &outds;
|
||||||
|
|
||||||
|
options noquotelenmax;
|
||||||
|
%local base_uri; /* location of rest apis */
|
||||||
|
%let base_uri=%mf_getplatform(VIYARESTAPI);
|
||||||
|
|
||||||
|
|
||||||
|
/* get flows */
|
||||||
|
proc sort data=&inds;
|
||||||
|
by flow_id;
|
||||||
|
run;
|
||||||
|
data _null_;
|
||||||
|
set &inds (keep=flow_id) end=last;
|
||||||
|
by flow_id;
|
||||||
|
if last.flow_id then do;
|
||||||
|
cnt+1;
|
||||||
|
call symputx(cats('flow',cnt),flow_id,'l');
|
||||||
|
end;
|
||||||
|
if last then call symputx('flowcnt',cnt,'l');
|
||||||
|
run;
|
||||||
|
|
||||||
|
/* prepare temporary datasets and frefs */
|
||||||
|
%local fid jid jds jjson jdsapp jdsrunning jdswaitfor jfref;
|
||||||
|
data;run;%let jds=&syslast;
|
||||||
|
data;run;%let jjson=&syslast;
|
||||||
|
data;run;%let jdsapp=&syslast;
|
||||||
|
data;run;%let jdsrunning=&syslast;
|
||||||
|
data;run;%let jdswaitfor=&syslast;
|
||||||
|
%let jfref=%mf_getuniquefileref();
|
||||||
|
|
||||||
|
/* start loop */
|
||||||
|
%do fid=1 %to &flowcnt;
|
||||||
|
%put preparing job attributes for flow &&flow&fid;
|
||||||
|
%local jds jcnt;
|
||||||
|
data &jds(drop=_contextName _program);
|
||||||
|
set &inds(where=(flow_id=&&flow&fid));
|
||||||
|
if _contextName='' then _contextName="SAS Job Execution compute context";
|
||||||
|
call symputx(cats('job',_n_),_program,'l');
|
||||||
|
call symputx(cats('context',_n_),_contextName,'l');
|
||||||
|
call symputx('jcnt',_n_,'l');
|
||||||
|
run;
|
||||||
|
%put exporting job variables in json format;
|
||||||
|
%do jid=1 %to &jcnt;
|
||||||
|
data &jjson;
|
||||||
|
set &jds;
|
||||||
|
if _n_=&jid then do;
|
||||||
|
output;
|
||||||
|
stop;
|
||||||
|
end;
|
||||||
|
run;
|
||||||
|
proc json out=&jfref;
|
||||||
|
export &jjson / nosastags fmtnumeric;
|
||||||
|
run;
|
||||||
|
data _null_;
|
||||||
|
infile &jfref lrecl=32767;
|
||||||
|
input;
|
||||||
|
jparams='jparams'!!left(symget('jid'));
|
||||||
|
call symputx(jparams,substr(_infile_,3,length(_infile_)-4));
|
||||||
|
run;
|
||||||
|
%local joburi&jid;
|
||||||
|
%let joburi&jid=0; /* used in next loop */
|
||||||
|
%end;
|
||||||
|
%local concurrency completed;
|
||||||
|
%let concurrency=0;
|
||||||
|
%let completed=0;
|
||||||
|
proc sql; drop table &jdsrunning;
|
||||||
|
%do jid=1 %to &jcnt;
|
||||||
|
/**
|
||||||
|
* now we can execute the jobs up to the maxconcurrency setting
|
||||||
|
*/
|
||||||
|
%if "&&job&jid" ne "0" %then %do; /* this var is zero if job finished */
|
||||||
|
%if "&&joburi&jid"="0" and &concurrency<&maxconcurrency %then %do;
|
||||||
|
/* job has not been triggered and we have free slots */
|
||||||
|
%local jobname jobpath;
|
||||||
|
%let jobname=%scan(&&job&jid,-1,/);
|
||||||
|
%let jobpath=%substr(&&job&jid,1,%length(&&job&jid)-%length(&jobname)-1);
|
||||||
|
%put executing &jobpath/&jobname with paramstring &&jparams&jid;
|
||||||
|
%mv_jobexecute(path=&jobpath
|
||||||
|
,name=&jobname
|
||||||
|
,paramstring=%superq(jparams&jid)
|
||||||
|
,outds=&jdsapp
|
||||||
|
)
|
||||||
|
data &jdsapp;
|
||||||
|
format jobparams $32767.;
|
||||||
|
set &jdsapp(where=(method='GET' and rel='state'));
|
||||||
|
jobparams=symget("jparams&jid");
|
||||||
|
call symputx("joburi&jid",uri,'l');
|
||||||
|
run;
|
||||||
|
proc append base=&jdsrunning data=&jdsapp;
|
||||||
|
run;
|
||||||
|
%let concurrency=%eval(&concurrency+1);
|
||||||
|
%end;
|
||||||
|
%else %if %sysfunc(exist(&outds))=1 %then %do;
|
||||||
|
/* check to see if the job has finished as was previously executed */
|
||||||
|
%local jobcheck; %let jobcheck=0;
|
||||||
|
proc sql noprint;
|
||||||
|
select count(*) into: jobcheck
|
||||||
|
from &outds where uri="&&joburi&jid";
|
||||||
|
%if &jobcheck>0 %then %do;
|
||||||
|
%put &&job&jid in flow &fid with uri &&joburi&jid completed!;
|
||||||
|
%let job&jid=0;
|
||||||
|
%end;
|
||||||
|
%end;
|
||||||
|
%end;
|
||||||
|
%if &jid=&jcnt %then %do;
|
||||||
|
/* we are at the end of the loop - time to see which jobs have finished */
|
||||||
|
%mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor)
|
||||||
|
%local done;
|
||||||
|
%let done=%mf_nobs(&jdswaitfor);
|
||||||
|
%if &done>0 %then %do;
|
||||||
|
%let completed=%eval(&completed+&done);
|
||||||
|
%let concurrency=%eval(&concurrency-&done);
|
||||||
|
data &jdsapp;
|
||||||
|
set &jdswaitfor;
|
||||||
|
flow_id=&&flow&fid;
|
||||||
|
run;
|
||||||
|
proc append base=&outds data=&jdsapp;
|
||||||
|
run;
|
||||||
|
%end;
|
||||||
|
proc sql;
|
||||||
|
delete from &jdsrunning
|
||||||
|
where uri in (select uri from &outds
|
||||||
|
where state in ('canceled','completed','failed')
|
||||||
|
);
|
||||||
|
|
||||||
|
/* loop again if jobs are left */
|
||||||
|
%if &completed < &jcnt %then %do;
|
||||||
|
%let jid=0;
|
||||||
|
%put looping flow &fid again - &completed of &jcnt jobs completed, &concurrency jobs running;
|
||||||
|
%end;
|
||||||
|
%end;
|
||||||
|
%end;
|
||||||
|
/* back up and execute the next flow */
|
||||||
|
%end;
|
||||||
|
|
||||||
|
|
||||||
|
%mend;
|
||||||
|
/**
|
||||||
|
@file
|
||||||
|
@brief Takes a dataset of running jobs and waits for ANY or ALL of them to complete
|
||||||
|
@details Will poll `/jobs/{jobId}/state` at set intervals until ANY or ALL
|
||||||
|
jobs are completed. Completion is determined by reference to the returned
|
||||||
|
_state_, as per the following table:
|
||||||
|
|
||||||
|
| state | Wait? | Notes|
|
||||||
|
|-----------|-------|------|
|
||||||
|
| idle | yes | We assume processing will continue. Beware of idle sessions with no code submitted! |
|
||||||
|
| pending | yes | Job is preparing to run |
|
||||||
|
| running | yes | Job is running|
|
||||||
|
| canceled | no | Job was cancelled|
|
||||||
|
| completed | no | Job finished - does not mean it was successful. Check stateDetails|
|
||||||
|
| failed | no | Job failed to execute, could be a problem when calling the apis|
|
||||||
|
|
||||||
|
|
||||||
|
## Example
|
||||||
|
|
||||||
|
First, compile the macros:
|
||||||
|
|
||||||
|
filename mc url
|
||||||
|
"https://raw.githubusercontent.com/sasjs/core/main/all.sas";
|
||||||
|
%inc mc;
|
||||||
|
|
||||||
|
Next, create a job (in this case, as a web service):
|
||||||
|
|
||||||
|
filename ft15f001 temp;
|
||||||
|
parmcards4;
|
||||||
|
data ;
|
||||||
|
rand=ranuni(0)*1000000;
|
||||||
|
do x=1 to rand;
|
||||||
|
y=rand*x;
|
||||||
|
output;
|
||||||
|
end;
|
||||||
|
run;
|
||||||
|
;;;;
|
||||||
|
%mv_createwebservice(path=/Public/temp,name=demo)
|
||||||
|
|
||||||
|
Then, execute the job,multiple times, and wait for them all to finish:
|
||||||
|
|
||||||
|
%mv_jobexecute(path=/Public/temp,name=demo,outds=work.ds1)
|
||||||
|
%mv_jobexecute(path=/Public/temp,name=demo,outds=work.ds2)
|
||||||
|
%mv_jobexecute(path=/Public/temp,name=demo,outds=work.ds3)
|
||||||
|
%mv_jobexecute(path=/Public/temp,name=demo,outds=work.ds4)
|
||||||
|
|
||||||
|
data work.jobs;
|
||||||
|
set work.ds1 work.ds2 work.ds3 work.ds4;
|
||||||
|
where method='GET' and rel='state';
|
||||||
|
run;
|
||||||
|
|
||||||
|
%mv_jobwaitfor(ALL,inds=work.jobs,outds=work.jobstates)
|
||||||
|
|
||||||
|
Delete the job:
|
||||||
|
|
||||||
|
%mv_deletejes(path=/Public/temp,name=demo)
|
||||||
|
|
||||||
|
@param [in] access_token_var= The global macro variable to contain the access token
|
||||||
|
@param [in] grant_type= valid values:
|
||||||
|
|
||||||
|
- password
|
||||||
|
- authorization_code
|
||||||
|
- detect - will check if access_token exists, if not will use sas_services if
|
||||||
|
a SASStudioV session else authorization_code. Default option.
|
||||||
|
- sas_services - will use oauth_bearer=sas_services
|
||||||
|
|
||||||
|
@param [in] action=Either ALL (to wait for every job) or ANY (if one job
|
||||||
|
completes, processing will continue). Default=ALL.
|
||||||
|
@param [in] inds= The input dataset containing the list of job uris, in the
|
||||||
|
following format: `/jobExecution/jobs/&JOBID./state` and the corresponding
|
||||||
|
job name. The uri should be in a `uri` variable, and the job path/name
|
||||||
|
should be in a `_program` variable.
|
||||||
|
@param [out] outds= The output dataset containing the list of states by job
|
||||||
|
(default=work.mv_jobexecute)
|
||||||
|
|
||||||
|
|
||||||
|
@version VIYA V.03.04
|
||||||
|
@author Allan Bowe, source: https://github.com/sasjs/core
|
||||||
|
|
||||||
|
<h4> Dependencies </h4>
|
||||||
|
@li mp_abort.sas
|
||||||
|
@li mf_getplatform.sas
|
||||||
|
@li mf_getuniquefileref.sas
|
||||||
|
@li mf_existvar.sas
|
||||||
|
@li mf_nobs.sas
|
||||||
|
|
||||||
|
**/
|
||||||
|
|
||||||
|
%macro mv_jobwaitfor(action
|
||||||
|
,access_token_var=ACCESS_TOKEN
|
||||||
|
,grant_type=sas_services
|
||||||
|
,inds=0
|
||||||
|
,outds=work.mv_jobwaitfor
|
||||||
|
);
|
||||||
|
%local oauth_bearer;
|
||||||
|
%if &grant_type=detect %then %do;
|
||||||
|
%if %symexist(&access_token_var) %then %let grant_type=authorization_code;
|
||||||
|
%else %let grant_type=sas_services;
|
||||||
|
%end;
|
||||||
|
%if &grant_type=sas_services %then %do;
|
||||||
|
%let oauth_bearer=oauth_bearer=sas_services;
|
||||||
|
%let &access_token_var=;
|
||||||
|
%end;
|
||||||
|
|
||||||
|
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||||
|
and &grant_type ne sas_services
|
||||||
|
)
|
||||||
|
,mac=&sysmacroname
|
||||||
|
,msg=%str(Invalid value for grant_type: &grant_type)
|
||||||
|
)
|
||||||
|
|
||||||
|
%mp_abort(iftrue=("&inds"="0")
|
||||||
|
,mac=&sysmacroname
|
||||||
|
,msg=%str(input dataset not provided)
|
||||||
|
)
|
||||||
|
%mp_abort(iftrue=(%mf_existvar(&inds,uri)=0)
|
||||||
|
,mac=&sysmacroname
|
||||||
|
,msg=%str(The URI variable was not found in the input dataset(&inds))
|
||||||
|
)
|
||||||
|
%mp_abort(iftrue=(%mf_existvar(&inds,_program)=0)
|
||||||
|
,mac=&sysmacroname
|
||||||
|
,msg=%str(The _PROGRAM variable was not found in the input dataset(&inds))
|
||||||
|
)
|
||||||
|
|
||||||
|
%if %mf_nobs(&inds)=0 %then %do;
|
||||||
|
%put NOTE: Zero observations in &inds, &sysmacroname will now exit;
|
||||||
|
%return;
|
||||||
|
%end;
|
||||||
|
|
||||||
|
options noquotelenmax;
|
||||||
|
%local base_uri; /* location of rest apis */
|
||||||
|
%let base_uri=%mf_getplatform(VIYARESTAPI);
|
||||||
|
|
||||||
|
data _null_;
|
||||||
|
length jobparams $32767;
|
||||||
|
set &inds end=last;
|
||||||
|
call symputx(cats('joburi',_n_),uri,'l');
|
||||||
|
call symputx(cats('jobname',_n_),_program,'l');
|
||||||
|
call symputx(cats('jobparams',_n_),jobparams,'l');
|
||||||
|
if last then call symputx('uricnt',_n_,'l');
|
||||||
|
run;
|
||||||
|
|
||||||
|
%local runcnt;
|
||||||
|
%if &action=ALL %then %let runcnt=&uricnt;
|
||||||
|
%else %if &action=ANY %then %let runcnt=1;
|
||||||
|
%else %let runcnt=&uricnt;
|
||||||
|
|
||||||
|
%local fname0 ;
|
||||||
|
%let fname0=%mf_getuniquefileref();
|
||||||
|
|
||||||
|
data &outds;
|
||||||
|
format _program uri $128. state $32. timestamp datetime19. jobparams $32767.;
|
||||||
|
stop;
|
||||||
|
run;
|
||||||
|
|
||||||
|
%local i;
|
||||||
|
%do i=1 %to &uricnt;
|
||||||
|
%if "&&joburi&i" ne "0" %then %do;
|
||||||
|
proc http method='GET' out=&fname0 &oauth_bearer url="&base_uri/&&joburi&i";
|
||||||
|
headers "Accept"="text/plain"
|
||||||
|
%if &grant_type=authorization_code %then %do;
|
||||||
|
"Authorization"="Bearer &&&access_token_var"
|
||||||
|
%end; ;
|
||||||
|
run;
|
||||||
|
%if &SYS_PROCHTTP_STATUS_CODE ne 200 and &SYS_PROCHTTP_STATUS_CODE ne 201 %then
|
||||||
|
%do;
|
||||||
|
data _null_;infile &fname0;input;putlog _infile_;run;
|
||||||
|
%mp_abort(mac=&sysmacroname
|
||||||
|
,msg=%str(&SYS_PROCHTTP_STATUS_CODE &SYS_PROCHTTP_STATUS_PHRASE)
|
||||||
|
)
|
||||||
|
%end;
|
||||||
|
|
||||||
|
%let status=notset;
|
||||||
|
data _null_;
|
||||||
|
infile &fname0;
|
||||||
|
input;
|
||||||
|
call symputx('status',_infile_,'l');
|
||||||
|
run;
|
||||||
|
|
||||||
|
%if &status=completed or &status=failed or &status=canceled %then %do;
|
||||||
|
proc sql;
|
||||||
|
insert into &outds set
|
||||||
|
_program="&&jobname&i",
|
||||||
|
uri="&&joburi&i",
|
||||||
|
state="&status",
|
||||||
|
timestamp=datetime(),
|
||||||
|
jobparams=symget("jobparams&i");
|
||||||
|
%let joburi&i=0; /* do not re-check */
|
||||||
|
%end;
|
||||||
|
%else %if &status=idle or &status=pending or &status=running %then %do;
|
||||||
|
data _null_;
|
||||||
|
call sleep(1,1);
|
||||||
|
run;
|
||||||
|
%end;
|
||||||
|
%else %do;
|
||||||
|
%mp_abort(mac=&sysmacroname
|
||||||
|
,msg=%str(status &status not expected!!)
|
||||||
|
)
|
||||||
|
%end;
|
||||||
|
%end;
|
||||||
|
%if &i=&uricnt %then %do;
|
||||||
|
%local goback;
|
||||||
|
%let goback=0;
|
||||||
|
proc sql noprint;
|
||||||
|
select count(*) into:goback from &outds;
|
||||||
|
%if &goback lt &runcnt %then %let i=0;
|
||||||
|
%end;
|
||||||
|
%end;
|
||||||
|
|
||||||
|
/* clear refs */
|
||||||
|
filename &fname0 clear;
|
||||||
|
|
||||||
%mend;/**
|
%mend;/**
|
||||||
@file mv_registerclient.sas
|
@file mv_registerclient.sas
|
||||||
@brief Register Client and Secret (admin task)
|
@brief Register Client and Secret (admin task)
|
||||||
|
|||||||
BIN
doxy/favicon.ico
BIN
doxy/favicon.ico
Binary file not shown.
|
Before Width: | Height: | Size: 1.1 KiB |
169
viya/mv_getjobstate.sas
Normal file
169
viya/mv_getjobstate.sas
Normal file
@@ -0,0 +1,169 @@
|
|||||||
|
/**
|
||||||
|
@file
|
||||||
|
@brief Extract the status from a running SAS Viya job
|
||||||
|
@details Extracts the status from a running job and appends it to an output
|
||||||
|
dataset with the following structure:
|
||||||
|
|
||||||
|
| uri | state | timestamp |
|
||||||
|
|---------------------------------------------------------------|---------|--------------------|
|
||||||
|
| /jobExecution/jobs/5cebd840-2063-42c1-be0c-421ec3e1c175/state | running | 15JAN2021:12:35:08 |
|
||||||
|
|
||||||
|
To query the running job, you need the URI. Sample code for achieving this
|
||||||
|
is provided below.
|
||||||
|
|
||||||
|
## Example
|
||||||
|
|
||||||
|
First, compile the macros:
|
||||||
|
|
||||||
|
filename mc url "https://raw.githubusercontent.com/sasjs/core/main/all.sas";
|
||||||
|
%inc mc;
|
||||||
|
|
||||||
|
Next, create a long running job (in this case, a web service):
|
||||||
|
|
||||||
|
filename ft15f001 temp;
|
||||||
|
parmcards4;
|
||||||
|
data ;
|
||||||
|
rand=ranuni(0)*1000;
|
||||||
|
do x=1 to rand;
|
||||||
|
y=rand*4;
|
||||||
|
output;
|
||||||
|
end;
|
||||||
|
run;
|
||||||
|
data _null_;
|
||||||
|
call sleep(5,1);
|
||||||
|
run;
|
||||||
|
;;;;
|
||||||
|
%mv_createwebservice(path=/Public/temp,name=demo)
|
||||||
|
|
||||||
|
Execute it, grab the uri, and finally, check the job status:
|
||||||
|
|
||||||
|
%mv_jobexecute(path=/Public/temp
|
||||||
|
,name=demo
|
||||||
|
,outds=work.info
|
||||||
|
)
|
||||||
|
|
||||||
|
data _null_;
|
||||||
|
set work.info;
|
||||||
|
if method='GET' and rel='state';
|
||||||
|
call symputx('uri',uri);
|
||||||
|
run;
|
||||||
|
|
||||||
|
%mv_getjobstate(uri=&uri,outds=results)
|
||||||
|
|
||||||
|
You can run this macro as part of a loop to await the final 'completed' status.
|
||||||
|
The full list of status values is:
|
||||||
|
|
||||||
|
@li idle
|
||||||
|
@li pending
|
||||||
|
@li running
|
||||||
|
@li canceled
|
||||||
|
@li completed
|
||||||
|
@li failed
|
||||||
|
|
||||||
|
If you have one or more jobs that you'd like to wait for completion you can
|
||||||
|
also use the [mv_jobwaitfor](/mv__jobwaitfor_8sas.html) macro.
|
||||||
|
|
||||||
|
@param [in] access_token_var= The global macro variable to contain the access token
|
||||||
|
@param [in] grant_type= valid values:
|
||||||
|
@li password
|
||||||
|
@li authorization_code
|
||||||
|
@li detect - will check if access_token exists, if not will use sas_services if
|
||||||
|
a SASStudioV session else authorization_code. Default option.
|
||||||
|
@li sas_services - will use oauth_bearer=sas_services.
|
||||||
|
@param [in] uri= The uri of the running job for which to fetch the status,
|
||||||
|
in the format `/jobExecution/jobs/$UUID/state` (unquoted).
|
||||||
|
@param [out] outds= The output dataset in which to APPEND the status. Three
|
||||||
|
fields are appended: `CHECK_TM`, `URI` and `STATE`. If the dataset does not
|
||||||
|
exist, it is created.
|
||||||
|
|
||||||
|
|
||||||
|
@version VIYA V.03.04
|
||||||
|
@author Allan Bowe, source: https://github.com/sasjs/core
|
||||||
|
|
||||||
|
<h4> SAS Macros </h4>
|
||||||
|
@li mp_abort.sas
|
||||||
|
@li mf_getplatform.sas
|
||||||
|
@li mf_getuniquefileref.sas
|
||||||
|
|
||||||
|
**/
|
||||||
|
|
||||||
|
%macro mv_getjobstate(uri=0,outds=work.mv_getjobstate
|
||||||
|
,contextName=SAS Job Execution compute context
|
||||||
|
,access_token_var=ACCESS_TOKEN
|
||||||
|
,grant_type=sas_services
|
||||||
|
);
|
||||||
|
%local oauth_bearer;
|
||||||
|
%if &grant_type=detect %then %do;
|
||||||
|
%if %symexist(&access_token_var) %then %let grant_type=authorization_code;
|
||||||
|
%else %let grant_type=sas_services;
|
||||||
|
%end;
|
||||||
|
%if &grant_type=sas_services %then %do;
|
||||||
|
%let oauth_bearer=oauth_bearer=sas_services;
|
||||||
|
%let &access_token_var=;
|
||||||
|
%end;
|
||||||
|
%put &sysmacroname: grant_type=&grant_type;
|
||||||
|
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||||
|
and &grant_type ne sas_services
|
||||||
|
)
|
||||||
|
,mac=&sysmacroname
|
||||||
|
,msg=%str(Invalid value for grant_type: &grant_type)
|
||||||
|
)
|
||||||
|
|
||||||
|
/* validation in datastep for better character safety */
|
||||||
|
%local errmsg errflg;
|
||||||
|
data _null_;
|
||||||
|
uri=symget('uri');
|
||||||
|
if length(uri)<12 then do;
|
||||||
|
call symputx('errflg',1);
|
||||||
|
call symputx('errmsg',"URI is invalid (too short) - '&uri'",'l');
|
||||||
|
end;
|
||||||
|
if scan(uri,-1) ne 'state' or scan(uri,1) ne 'jobExecution' then do;
|
||||||
|
|
||||||
|
call symputx('errflg',1);
|
||||||
|
call symputx('errmsg',
|
||||||
|
"URI should be in format /jobExecution/jobs/$$$$UUID$$$$/state"
|
||||||
|
!!" but is actually like: &uri",'l');
|
||||||
|
end;
|
||||||
|
run;
|
||||||
|
|
||||||
|
%mp_abort(iftrue=(&errflg=1)
|
||||||
|
,mac=&sysmacroname
|
||||||
|
,msg=%str(&errmsg)
|
||||||
|
)
|
||||||
|
|
||||||
|
options noquotelenmax;
|
||||||
|
%local base_uri; /* location of rest apis */
|
||||||
|
%let base_uri=%mf_getplatform(VIYARESTAPI);
|
||||||
|
|
||||||
|
%local fname0;
|
||||||
|
%let fname0=%mf_getuniquefileref();
|
||||||
|
|
||||||
|
proc http method='GET' out=&fname0 &oauth_bearer url="&base_uri/&uri";
|
||||||
|
headers "Accept"="text/plain"
|
||||||
|
%if &grant_type=authorization_code %then %do;
|
||||||
|
"Authorization"="Bearer &&&access_token_var"
|
||||||
|
%end; ;
|
||||||
|
run;
|
||||||
|
%if &SYS_PROCHTTP_STATUS_CODE ne 200 and &SYS_PROCHTTP_STATUS_CODE ne 201 %then
|
||||||
|
%do;
|
||||||
|
data _null_;infile &fname0;input;putlog _infile_;run;
|
||||||
|
%mp_abort(mac=&sysmacroname
|
||||||
|
,msg=%str(&SYS_PROCHTTP_STATUS_CODE &SYS_PROCHTTP_STATUS_PHRASE)
|
||||||
|
)
|
||||||
|
%end;
|
||||||
|
|
||||||
|
data;
|
||||||
|
format uri $128. state $32. timestamp datetime19.;
|
||||||
|
infile &fname0;
|
||||||
|
uri="&uri";
|
||||||
|
timestamp=datetime();
|
||||||
|
input;
|
||||||
|
state=_infile_;
|
||||||
|
run;
|
||||||
|
|
||||||
|
proc append base=&outds data=&syslast;
|
||||||
|
run;
|
||||||
|
|
||||||
|
filename &fname0 clear;
|
||||||
|
|
||||||
|
%mend;
|
||||||
@@ -23,23 +23,24 @@
|
|||||||
,paramstring=%str("macvarname":"macvarvalue","answer":42)
|
,paramstring=%str("macvarname":"macvarvalue","answer":42)
|
||||||
)
|
)
|
||||||
|
|
||||||
@param access_token_var= The global macro variable to contain the access token
|
@param [in] access_token_var= The global macro variable to contain the access token
|
||||||
@param grant_type= valid values:
|
@param [in] grant_type= valid values:
|
||||||
* password
|
|
||||||
* authorization_code
|
|
||||||
* detect - will check if access_token exists, if not will use sas_services if
|
|
||||||
a SASStudioV session else authorization_code. Default option.
|
|
||||||
* sas_services - will use oauth_bearer=sas_services
|
|
||||||
|
|
||||||
@param path= The SAS Drive path to the job being executed
|
* password
|
||||||
@param name= The name of the job to execute
|
* authorization_code
|
||||||
@param paramstring= A JSON fragment with name:value pairs, eg: `"name":"value"`
|
* detect - will check if access_token exists, if not will use sas_services if
|
||||||
|
a SASStudioV session else authorization_code. Default option.
|
||||||
|
* sas_services - will use oauth_bearer=sas_services
|
||||||
|
|
||||||
|
@param [in] path= The SAS Drive path to the job being executed
|
||||||
|
@param [in] name= The name of the job to execute
|
||||||
|
@param [in] paramstring= A JSON fragment with name:value pairs, eg: `"name":"value"`
|
||||||
or "name":"value","name2":42`. This will need to be wrapped in `%str()`.
|
or "name":"value","name2":42`. This will need to be wrapped in `%str()`.
|
||||||
|
|
||||||
@param contextName= Context name with which to run the job.
|
@param [in] contextName= Context name with which to run the job.
|
||||||
Default = `SAS Job Execution compute context`
|
Default = `SAS Job Execution compute context`
|
||||||
|
|
||||||
@param outds= The output dataset containing links (Default=work.mv_jobexecute)
|
@param [out] outds= The output dataset containing links (Default=work.mv_jobexecute)
|
||||||
|
|
||||||
|
|
||||||
@version VIYA V.03.04
|
@version VIYA V.03.04
|
||||||
@@ -71,7 +72,7 @@
|
|||||||
%let oauth_bearer=oauth_bearer=sas_services;
|
%let oauth_bearer=oauth_bearer=sas_services;
|
||||||
%let &access_token_var=;
|
%let &access_token_var=;
|
||||||
%end;
|
%end;
|
||||||
%put &sysmacroname: grant_type=&grant_type;
|
|
||||||
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||||
and &grant_type ne sas_services
|
and &grant_type ne sas_services
|
||||||
)
|
)
|
||||||
@@ -125,9 +126,11 @@ data _null_;
|
|||||||
length joburi contextname $128 paramstring $32765;
|
length joburi contextname $128 paramstring $32765;
|
||||||
joburi=quote(trim(symget('joburi')));
|
joburi=quote(trim(symget('joburi')));
|
||||||
contextname=quote(trim(symget('contextname')));
|
contextname=quote(trim(symget('contextname')));
|
||||||
|
_program=quote("&path/&name");
|
||||||
paramstring=symget('paramstring');
|
paramstring=symget('paramstring');
|
||||||
put '{"jobDefinitionUri":' joburi ;
|
put '{"jobDefinitionUri":' joburi ;
|
||||||
put ' ,"arguments":{"_contextName":' contextname;
|
put ' ,"arguments":{"_contextName":' contextname;
|
||||||
|
put ' ,"_program":' _program;
|
||||||
if paramstring ne "0" then do;
|
if paramstring ne "0" then do;
|
||||||
put ' ,' paramstring;
|
put ' ,' paramstring;
|
||||||
end;
|
end;
|
||||||
@@ -158,6 +161,7 @@ libname &libref JSON fileref=&fname1;
|
|||||||
|
|
||||||
data &outds;
|
data &outds;
|
||||||
set &libref..links;
|
set &libref..links;
|
||||||
|
_program="&path/&name";
|
||||||
run;
|
run;
|
||||||
|
|
||||||
/* clear refs */
|
/* clear refs */
|
||||||
|
|||||||
293
viya/mv_jobflow.sas
Normal file
293
viya/mv_jobflow.sas
Normal file
@@ -0,0 +1,293 @@
|
|||||||
|
/**
|
||||||
|
@file
|
||||||
|
@brief Execute a series of job flows
|
||||||
|
@details Very (very) simple flow manager. Jobs execute in sequential waves,
|
||||||
|
all previous waves must finish successfully.
|
||||||
|
|
||||||
|
The input table is formed as per below. Each observation represents one job.
|
||||||
|
Each variable is converted into a macro variable with the same name.
|
||||||
|
|
||||||
|
## Input table (minimum variables needed)
|
||||||
|
|
||||||
|
| variable| description |
|
||||||
|
|---|---|---|
|
||||||
|
|FLOW_ID| Numeric value, provides sequential ordering capability|
|
||||||
|
|_CONTEXTNAME|Dictates which context should be used to run the job. If
|
||||||
|
blank, will default to `SAS Job Execution compute context`.|
|
||||||
|
|_PROGRAM|Provides the path to the job itself|
|
||||||
|
|
||||||
|
## Output table (minimum variables produced)
|
||||||
|
|
||||||
|
| variable| description |
|
||||||
|
|---|---|---|
|
||||||
|
|FLOW_ID| Numeric value, provides sequential ordering capability|
|
||||||
|
|_CONTEXTNAME|Dictates which context should be used to run the job. If
|
||||||
|
blank, will default to `SAS Job Execution compute context`.|
|
||||||
|
|_PROGRAM|Provides the path to the job itself|
|
||||||
|
|
||||||
|
## Example
|
||||||
|
|
||||||
|
First, compile the macros:
|
||||||
|
|
||||||
|
filename mc url
|
||||||
|
"https://raw.githubusercontent.com/sasjs/core/main/all.sas";
|
||||||
|
%inc mc;
|
||||||
|
|
||||||
|
Next, create some jobs (in this case, as web services):
|
||||||
|
|
||||||
|
filename ft15f001 temp;
|
||||||
|
parmcards4;
|
||||||
|
%put this is job: &_program;
|
||||||
|
%put this was run in flow &flow_id;
|
||||||
|
data ;
|
||||||
|
rand=ranuni(0)*¯ovar1;
|
||||||
|
do x=1 to rand;
|
||||||
|
y=rand*¯ovar2;
|
||||||
|
if y=100 then abort;
|
||||||
|
output;
|
||||||
|
end;
|
||||||
|
run;
|
||||||
|
;;;;
|
||||||
|
%mv_createwebservice(path=/Public/temp,name=demo1)
|
||||||
|
%mv_createwebservice(path=/Public/temp,name=demo2)
|
||||||
|
|
||||||
|
Prepare an input table with 60 executions:
|
||||||
|
|
||||||
|
data work.inputjobs;
|
||||||
|
_contextName='SAS Job Execution compute context';
|
||||||
|
do flow_id=1 to 3;
|
||||||
|
do i=1 to 20;
|
||||||
|
_program='/Public/temp/demo1';
|
||||||
|
macrovar1=10*i;
|
||||||
|
macrovar2=4*i;
|
||||||
|
output;
|
||||||
|
i+1;
|
||||||
|
_program='/Public/temp/demo2';
|
||||||
|
macrovar1=40*i;
|
||||||
|
macrovar2=44*i;
|
||||||
|
output;
|
||||||
|
end;
|
||||||
|
end;
|
||||||
|
run;
|
||||||
|
|
||||||
|
Trigger the flow
|
||||||
|
|
||||||
|
%mv_jobflow(inds=work.inputjobs,outds=work.results,maxconcurrency=4)
|
||||||
|
|
||||||
|
|
||||||
|
@param [in] access_token_var= The global macro variable to contain the access token
|
||||||
|
@param [in] grant_type= valid values:
|
||||||
|
@li password
|
||||||
|
@li authorization_code
|
||||||
|
@li detect - will check if access_token exists, if not will use sas_services if
|
||||||
|
a SASStudioV session else authorization_code. Default option.
|
||||||
|
@li sas_services - will use oauth_bearer=sas_services
|
||||||
|
@param [in] inds= The input dataset containing a list of jobs and parameters
|
||||||
|
@param [in] maxconcurrency= The max number of parallel jobs to run. Default=8.
|
||||||
|
@param [out] outds= The output dataset containing the results
|
||||||
|
|
||||||
|
@version VIYA V.03.05
|
||||||
|
@author Allan Bowe, source: https://github.com/sasjs/core
|
||||||
|
|
||||||
|
<h4> SAS Macros </h4>
|
||||||
|
@li mf_nobs.sas
|
||||||
|
@li mp_abort.sas
|
||||||
|
@li mf_getplatform.sas
|
||||||
|
@li mf_getuniquefileref.sas
|
||||||
|
@li mf_existvarlist.sas
|
||||||
|
@li mv_jobwaitfor.sas
|
||||||
|
@li mv_jobexecute.sas
|
||||||
|
|
||||||
|
**/
|
||||||
|
|
||||||
|
%macro mv_jobflow(inds=0,outds=work.mv_jobflow
|
||||||
|
,maxconcurrency=8
|
||||||
|
,access_token_var=ACCESS_TOKEN
|
||||||
|
,grant_type=sas_services
|
||||||
|
);
|
||||||
|
%local oauth_bearer;
|
||||||
|
%if &grant_type=detect %then %do;
|
||||||
|
%if %symexist(&access_token_var) %then %let grant_type=authorization_code;
|
||||||
|
%else %let grant_type=sas_services;
|
||||||
|
%end;
|
||||||
|
%if &grant_type=sas_services %then %do;
|
||||||
|
%let oauth_bearer=oauth_bearer=sas_services;
|
||||||
|
%let &access_token_var=;
|
||||||
|
%end;
|
||||||
|
|
||||||
|
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||||
|
and &grant_type ne sas_services
|
||||||
|
)
|
||||||
|
,mac=&sysmacroname
|
||||||
|
,msg=%str(Invalid value for grant_type: &grant_type)
|
||||||
|
)
|
||||||
|
|
||||||
|
%mp_abort(iftrue=("&inds"="0")
|
||||||
|
,mac=&sysmacroname
|
||||||
|
,msg=%str(Input dataset was not provided)
|
||||||
|
)
|
||||||
|
%mp_abort(iftrue=(%mf_existVarList(&inds,_CONTEXTNAME FLOW_ID _PROGRAM)=0)
|
||||||
|
,mac=&sysmacroname
|
||||||
|
,msg=%str(The following columns must exist on input dataset &inds:
|
||||||
|
_CONTEXTNAME FLOW_ID _PROGRAM)
|
||||||
|
)
|
||||||
|
%mp_abort(iftrue=(&maxconcurrency<1)
|
||||||
|
,mac=&sysmacroname
|
||||||
|
,msg=%str(The maxconcurrency variable should be a positive integer)
|
||||||
|
)
|
||||||
|
|
||||||
|
%local missings;
|
||||||
|
proc sql noprint;
|
||||||
|
select count(*) into: missings
|
||||||
|
from &inds
|
||||||
|
where flow_id is null or _program is null;
|
||||||
|
%mp_abort(iftrue=(&missings>0)
|
||||||
|
,mac=&sysmacroname
|
||||||
|
,msg=%str(input dataset contains &missings missing values for FLOW_ID or _PROGRAM)
|
||||||
|
)
|
||||||
|
|
||||||
|
%if %mf_nobs(&inds)=0 %then %do;
|
||||||
|
%put No observations in &inds! Leaving macro &sysmacroname;
|
||||||
|
%return;
|
||||||
|
%end;
|
||||||
|
|
||||||
|
/* ensure output table is available */
|
||||||
|
data &outds;run;
|
||||||
|
proc sql;
|
||||||
|
drop table &outds;
|
||||||
|
|
||||||
|
options noquotelenmax;
|
||||||
|
%local base_uri; /* location of rest apis */
|
||||||
|
%let base_uri=%mf_getplatform(VIYARESTAPI);
|
||||||
|
|
||||||
|
|
||||||
|
/* get flows */
|
||||||
|
proc sort data=&inds;
|
||||||
|
by flow_id;
|
||||||
|
run;
|
||||||
|
data _null_;
|
||||||
|
set &inds (keep=flow_id) end=last;
|
||||||
|
by flow_id;
|
||||||
|
if last.flow_id then do;
|
||||||
|
cnt+1;
|
||||||
|
call symputx(cats('flow',cnt),flow_id,'l');
|
||||||
|
end;
|
||||||
|
if last then call symputx('flowcnt',cnt,'l');
|
||||||
|
run;
|
||||||
|
|
||||||
|
/* prepare temporary datasets and frefs */
|
||||||
|
%local fid jid jds jjson jdsapp jdsrunning jdswaitfor jfref;
|
||||||
|
data;run;%let jds=&syslast;
|
||||||
|
data;run;%let jjson=&syslast;
|
||||||
|
data;run;%let jdsapp=&syslast;
|
||||||
|
data;run;%let jdsrunning=&syslast;
|
||||||
|
data;run;%let jdswaitfor=&syslast;
|
||||||
|
%let jfref=%mf_getuniquefileref();
|
||||||
|
|
||||||
|
/* start loop */
|
||||||
|
%do fid=1 %to &flowcnt;
|
||||||
|
%put preparing job attributes for flow &&flow&fid;
|
||||||
|
%local jds jcnt;
|
||||||
|
data &jds(drop=_contextName _program);
|
||||||
|
set &inds(where=(flow_id=&&flow&fid));
|
||||||
|
if _contextName='' then _contextName="SAS Job Execution compute context";
|
||||||
|
call symputx(cats('job',_n_),_program,'l');
|
||||||
|
call symputx(cats('context',_n_),_contextName,'l');
|
||||||
|
call symputx('jcnt',_n_,'l');
|
||||||
|
run;
|
||||||
|
%put exporting job variables in json format;
|
||||||
|
%do jid=1 %to &jcnt;
|
||||||
|
data &jjson;
|
||||||
|
set &jds;
|
||||||
|
if _n_=&jid then do;
|
||||||
|
output;
|
||||||
|
stop;
|
||||||
|
end;
|
||||||
|
run;
|
||||||
|
proc json out=&jfref;
|
||||||
|
export &jjson / nosastags fmtnumeric;
|
||||||
|
run;
|
||||||
|
data _null_;
|
||||||
|
infile &jfref lrecl=32767;
|
||||||
|
input;
|
||||||
|
jparams='jparams'!!left(symget('jid'));
|
||||||
|
call symputx(jparams,substr(_infile_,3,length(_infile_)-4));
|
||||||
|
run;
|
||||||
|
%local joburi&jid;
|
||||||
|
%let joburi&jid=0; /* used in next loop */
|
||||||
|
%end;
|
||||||
|
%local concurrency completed;
|
||||||
|
%let concurrency=0;
|
||||||
|
%let completed=0;
|
||||||
|
proc sql; drop table &jdsrunning;
|
||||||
|
%do jid=1 %to &jcnt;
|
||||||
|
/**
|
||||||
|
* now we can execute the jobs up to the maxconcurrency setting
|
||||||
|
*/
|
||||||
|
%if "&&job&jid" ne "0" %then %do; /* this var is zero if job finished */
|
||||||
|
%if "&&joburi&jid"="0" and &concurrency<&maxconcurrency %then %do;
|
||||||
|
/* job has not been triggered and we have free slots */
|
||||||
|
%local jobname jobpath;
|
||||||
|
%let jobname=%scan(&&job&jid,-1,/);
|
||||||
|
%let jobpath=%substr(&&job&jid,1,%length(&&job&jid)-%length(&jobname)-1);
|
||||||
|
%put executing &jobpath/&jobname with paramstring &&jparams&jid;
|
||||||
|
%mv_jobexecute(path=&jobpath
|
||||||
|
,name=&jobname
|
||||||
|
,paramstring=%superq(jparams&jid)
|
||||||
|
,outds=&jdsapp
|
||||||
|
)
|
||||||
|
data &jdsapp;
|
||||||
|
format jobparams $32767.;
|
||||||
|
set &jdsapp(where=(method='GET' and rel='state'));
|
||||||
|
jobparams=symget("jparams&jid");
|
||||||
|
call symputx("joburi&jid",uri,'l');
|
||||||
|
run;
|
||||||
|
proc append base=&jdsrunning data=&jdsapp;
|
||||||
|
run;
|
||||||
|
%let concurrency=%eval(&concurrency+1);
|
||||||
|
%end;
|
||||||
|
%else %if %sysfunc(exist(&outds))=1 %then %do;
|
||||||
|
/* check to see if the job has finished as was previously executed */
|
||||||
|
%local jobcheck; %let jobcheck=0;
|
||||||
|
proc sql noprint;
|
||||||
|
select count(*) into: jobcheck
|
||||||
|
from &outds where uri="&&joburi&jid";
|
||||||
|
%if &jobcheck>0 %then %do;
|
||||||
|
%put &&job&jid in flow &fid with uri &&joburi&jid completed!;
|
||||||
|
%let job&jid=0;
|
||||||
|
%end;
|
||||||
|
%end;
|
||||||
|
%end;
|
||||||
|
%if &jid=&jcnt %then %do;
|
||||||
|
/* we are at the end of the loop - time to see which jobs have finished */
|
||||||
|
%mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor)
|
||||||
|
%local done;
|
||||||
|
%let done=%mf_nobs(&jdswaitfor);
|
||||||
|
%if &done>0 %then %do;
|
||||||
|
%let completed=%eval(&completed+&done);
|
||||||
|
%let concurrency=%eval(&concurrency-&done);
|
||||||
|
data &jdsapp;
|
||||||
|
set &jdswaitfor;
|
||||||
|
flow_id=&&flow&fid;
|
||||||
|
run;
|
||||||
|
proc append base=&outds data=&jdsapp;
|
||||||
|
run;
|
||||||
|
%end;
|
||||||
|
proc sql;
|
||||||
|
delete from &jdsrunning
|
||||||
|
where uri in (select uri from &outds
|
||||||
|
where state in ('canceled','completed','failed')
|
||||||
|
);
|
||||||
|
|
||||||
|
/* loop again if jobs are left */
|
||||||
|
%if &completed < &jcnt %then %do;
|
||||||
|
%let jid=0;
|
||||||
|
%put looping flow &fid again - &completed of &jcnt jobs completed, &concurrency jobs running;
|
||||||
|
%end;
|
||||||
|
%end;
|
||||||
|
%end;
|
||||||
|
/* back up and execute the next flow */
|
||||||
|
%end;
|
||||||
|
|
||||||
|
|
||||||
|
%mend;
|
||||||
213
viya/mv_jobwaitfor.sas
Normal file
213
viya/mv_jobwaitfor.sas
Normal file
@@ -0,0 +1,213 @@
|
|||||||
|
/**
|
||||||
|
@file
|
||||||
|
@brief Takes a dataset of running jobs and waits for ANY or ALL of them to complete
|
||||||
|
@details Will poll `/jobs/{jobId}/state` at set intervals until ANY or ALL
|
||||||
|
jobs are completed. Completion is determined by reference to the returned
|
||||||
|
_state_, as per the following table:
|
||||||
|
|
||||||
|
| state | Wait? | Notes|
|
||||||
|
|-----------|-------|------|
|
||||||
|
| idle | yes | We assume processing will continue. Beware of idle sessions with no code submitted! |
|
||||||
|
| pending | yes | Job is preparing to run |
|
||||||
|
| running | yes | Job is running|
|
||||||
|
| canceled | no | Job was cancelled|
|
||||||
|
| completed | no | Job finished - does not mean it was successful. Check stateDetails|
|
||||||
|
| failed | no | Job failed to execute, could be a problem when calling the apis|
|
||||||
|
|
||||||
|
|
||||||
|
## Example
|
||||||
|
|
||||||
|
First, compile the macros:
|
||||||
|
|
||||||
|
filename mc url
|
||||||
|
"https://raw.githubusercontent.com/sasjs/core/main/all.sas";
|
||||||
|
%inc mc;
|
||||||
|
|
||||||
|
Next, create a job (in this case, as a web service):
|
||||||
|
|
||||||
|
filename ft15f001 temp;
|
||||||
|
parmcards4;
|
||||||
|
data ;
|
||||||
|
rand=ranuni(0)*1000000;
|
||||||
|
do x=1 to rand;
|
||||||
|
y=rand*x;
|
||||||
|
output;
|
||||||
|
end;
|
||||||
|
run;
|
||||||
|
;;;;
|
||||||
|
%mv_createwebservice(path=/Public/temp,name=demo)
|
||||||
|
|
||||||
|
Then, execute the job,multiple times, and wait for them all to finish:
|
||||||
|
|
||||||
|
%mv_jobexecute(path=/Public/temp,name=demo,outds=work.ds1)
|
||||||
|
%mv_jobexecute(path=/Public/temp,name=demo,outds=work.ds2)
|
||||||
|
%mv_jobexecute(path=/Public/temp,name=demo,outds=work.ds3)
|
||||||
|
%mv_jobexecute(path=/Public/temp,name=demo,outds=work.ds4)
|
||||||
|
|
||||||
|
data work.jobs;
|
||||||
|
set work.ds1 work.ds2 work.ds3 work.ds4;
|
||||||
|
where method='GET' and rel='state';
|
||||||
|
run;
|
||||||
|
|
||||||
|
%mv_jobwaitfor(ALL,inds=work.jobs,outds=work.jobstates)
|
||||||
|
|
||||||
|
Delete the job:
|
||||||
|
|
||||||
|
%mv_deletejes(path=/Public/temp,name=demo)
|
||||||
|
|
||||||
|
@param [in] access_token_var= The global macro variable to contain the access token
|
||||||
|
@param [in] grant_type= valid values:
|
||||||
|
|
||||||
|
- password
|
||||||
|
- authorization_code
|
||||||
|
- detect - will check if access_token exists, if not will use sas_services if
|
||||||
|
a SASStudioV session else authorization_code. Default option.
|
||||||
|
- sas_services - will use oauth_bearer=sas_services
|
||||||
|
|
||||||
|
@param [in] action=Either ALL (to wait for every job) or ANY (if one job
|
||||||
|
completes, processing will continue). Default=ALL.
|
||||||
|
@param [in] inds= The input dataset containing the list of job uris, in the
|
||||||
|
following format: `/jobExecution/jobs/&JOBID./state` and the corresponding
|
||||||
|
job name. The uri should be in a `uri` variable, and the job path/name
|
||||||
|
should be in a `_program` variable.
|
||||||
|
@param [out] outds= The output dataset containing the list of states by job
|
||||||
|
(default=work.mv_jobexecute)
|
||||||
|
|
||||||
|
|
||||||
|
@version VIYA V.03.04
|
||||||
|
@author Allan Bowe, source: https://github.com/sasjs/core
|
||||||
|
|
||||||
|
<h4> Dependencies </h4>
|
||||||
|
@li mp_abort.sas
|
||||||
|
@li mf_getplatform.sas
|
||||||
|
@li mf_getuniquefileref.sas
|
||||||
|
@li mf_existvar.sas
|
||||||
|
@li mf_nobs.sas
|
||||||
|
|
||||||
|
**/
|
||||||
|
|
||||||
|
%macro mv_jobwaitfor(action
|
||||||
|
,access_token_var=ACCESS_TOKEN
|
||||||
|
,grant_type=sas_services
|
||||||
|
,inds=0
|
||||||
|
,outds=work.mv_jobwaitfor
|
||||||
|
);
|
||||||
|
%local oauth_bearer;
|
||||||
|
%if &grant_type=detect %then %do;
|
||||||
|
%if %symexist(&access_token_var) %then %let grant_type=authorization_code;
|
||||||
|
%else %let grant_type=sas_services;
|
||||||
|
%end;
|
||||||
|
%if &grant_type=sas_services %then %do;
|
||||||
|
%let oauth_bearer=oauth_bearer=sas_services;
|
||||||
|
%let &access_token_var=;
|
||||||
|
%end;
|
||||||
|
|
||||||
|
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||||
|
and &grant_type ne sas_services
|
||||||
|
)
|
||||||
|
,mac=&sysmacroname
|
||||||
|
,msg=%str(Invalid value for grant_type: &grant_type)
|
||||||
|
)
|
||||||
|
|
||||||
|
%mp_abort(iftrue=("&inds"="0")
|
||||||
|
,mac=&sysmacroname
|
||||||
|
,msg=%str(input dataset not provided)
|
||||||
|
)
|
||||||
|
%mp_abort(iftrue=(%mf_existvar(&inds,uri)=0)
|
||||||
|
,mac=&sysmacroname
|
||||||
|
,msg=%str(The URI variable was not found in the input dataset(&inds))
|
||||||
|
)
|
||||||
|
%mp_abort(iftrue=(%mf_existvar(&inds,_program)=0)
|
||||||
|
,mac=&sysmacroname
|
||||||
|
,msg=%str(The _PROGRAM variable was not found in the input dataset(&inds))
|
||||||
|
)
|
||||||
|
|
||||||
|
%if %mf_nobs(&inds)=0 %then %do;
|
||||||
|
%put NOTE: Zero observations in &inds, &sysmacroname will now exit;
|
||||||
|
%return;
|
||||||
|
%end;
|
||||||
|
|
||||||
|
options noquotelenmax;
|
||||||
|
%local base_uri; /* location of rest apis */
|
||||||
|
%let base_uri=%mf_getplatform(VIYARESTAPI);
|
||||||
|
|
||||||
|
data _null_;
|
||||||
|
length jobparams $32767;
|
||||||
|
set &inds end=last;
|
||||||
|
call symputx(cats('joburi',_n_),uri,'l');
|
||||||
|
call symputx(cats('jobname',_n_),_program,'l');
|
||||||
|
call symputx(cats('jobparams',_n_),jobparams,'l');
|
||||||
|
if last then call symputx('uricnt',_n_,'l');
|
||||||
|
run;
|
||||||
|
|
||||||
|
%local runcnt;
|
||||||
|
%if &action=ALL %then %let runcnt=&uricnt;
|
||||||
|
%else %if &action=ANY %then %let runcnt=1;
|
||||||
|
%else %let runcnt=&uricnt;
|
||||||
|
|
||||||
|
%local fname0 ;
|
||||||
|
%let fname0=%mf_getuniquefileref();
|
||||||
|
|
||||||
|
data &outds;
|
||||||
|
format _program uri $128. state $32. timestamp datetime19. jobparams $32767.;
|
||||||
|
stop;
|
||||||
|
run;
|
||||||
|
|
||||||
|
%local i;
|
||||||
|
%do i=1 %to &uricnt;
|
||||||
|
%if "&&joburi&i" ne "0" %then %do;
|
||||||
|
proc http method='GET' out=&fname0 &oauth_bearer url="&base_uri/&&joburi&i";
|
||||||
|
headers "Accept"="text/plain"
|
||||||
|
%if &grant_type=authorization_code %then %do;
|
||||||
|
"Authorization"="Bearer &&&access_token_var"
|
||||||
|
%end; ;
|
||||||
|
run;
|
||||||
|
%if &SYS_PROCHTTP_STATUS_CODE ne 200 and &SYS_PROCHTTP_STATUS_CODE ne 201 %then
|
||||||
|
%do;
|
||||||
|
data _null_;infile &fname0;input;putlog _infile_;run;
|
||||||
|
%mp_abort(mac=&sysmacroname
|
||||||
|
,msg=%str(&SYS_PROCHTTP_STATUS_CODE &SYS_PROCHTTP_STATUS_PHRASE)
|
||||||
|
)
|
||||||
|
%end;
|
||||||
|
|
||||||
|
%let status=notset;
|
||||||
|
data _null_;
|
||||||
|
infile &fname0;
|
||||||
|
input;
|
||||||
|
call symputx('status',_infile_,'l');
|
||||||
|
run;
|
||||||
|
|
||||||
|
%if &status=completed or &status=failed or &status=canceled %then %do;
|
||||||
|
proc sql;
|
||||||
|
insert into &outds set
|
||||||
|
_program="&&jobname&i",
|
||||||
|
uri="&&joburi&i",
|
||||||
|
state="&status",
|
||||||
|
timestamp=datetime(),
|
||||||
|
jobparams=symget("jobparams&i");
|
||||||
|
%let joburi&i=0; /* do not re-check */
|
||||||
|
%end;
|
||||||
|
%else %if &status=idle or &status=pending or &status=running %then %do;
|
||||||
|
data _null_;
|
||||||
|
call sleep(1,1);
|
||||||
|
run;
|
||||||
|
%end;
|
||||||
|
%else %do;
|
||||||
|
%mp_abort(mac=&sysmacroname
|
||||||
|
,msg=%str(status &status not expected!!)
|
||||||
|
)
|
||||||
|
%end;
|
||||||
|
%end;
|
||||||
|
%if &i=&uricnt %then %do;
|
||||||
|
%local goback;
|
||||||
|
%let goback=0;
|
||||||
|
proc sql noprint;
|
||||||
|
select count(*) into:goback from &outds;
|
||||||
|
%if &goback lt &runcnt %then %let i=0;
|
||||||
|
%end;
|
||||||
|
%end;
|
||||||
|
|
||||||
|
/* clear refs */
|
||||||
|
filename &fname0 clear;
|
||||||
|
|
||||||
|
%mend;
|
||||||
Reference in New Issue
Block a user