mirror of
https://github.com/sasjs/core.git
synced 2025-12-11 14:34:35 +00:00
Compare commits
9 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 79c81aa8a4 | |||
| bbbcf7d550 | |||
| 82184bc6be | |||
| efc731cfaa | |||
| da9a74ee14 | |||
| 94762d9381 | |||
| 03d9d805ff | |||
| 94416028b7 | |||
| 6cf5d4ef28 |
476
all.sas
476
all.sas
@@ -577,11 +577,18 @@ options noquotelenmax;
|
||||
@brief Adds custom quotes / delimiters to a delimited string
|
||||
@details Can be used in open code, eg as follows:
|
||||
|
||||
%put %mf_getquotedstr(blah blah blah);
|
||||
%put %mf_getquotedstr(blah blah blah);
|
||||
|
||||
which returns:
|
||||
> 'blah','blah','blah'
|
||||
|
||||
Alternatively:
|
||||
|
||||
%put %mf_getquotedstr(these words are double quoted,quote=D)
|
||||
|
||||
for:
|
||||
> "these","words","are","double","quoted"
|
||||
|
||||
@param in_str the unquoted, spaced delimited string to transform
|
||||
@param dlm= the delimeter to be applied to the output (default comma)
|
||||
@param indlm= the delimeter used for the input (default is space)
|
||||
@@ -4754,6 +4761,97 @@ proc sql
|
||||
%mp_binarycopy(inloc="&inloc",outref=_webout)
|
||||
%end;
|
||||
|
||||
%mend;/**
|
||||
@file
|
||||
@brief Runs arbitrary code for a specified amount of time
|
||||
@details Executes a series of procs and data steps to enable performance
|
||||
testing of arbitrary jobs.
|
||||
|
||||
%mp_testjob(
|
||||
duration=60*5
|
||||
)
|
||||
|
||||
@param [in] duration= the time in seconds which the job should run for. Actual
|
||||
time may vary, as the check is done in between steps. Default = 30 (seconds).
|
||||
|
||||
<h4> SAS Macros </h4>
|
||||
@li mf_getuniquelibref.sas
|
||||
@li mf_getuniquename.sas
|
||||
@li mf_mkdir.sas
|
||||
|
||||
@version 9.4
|
||||
@author Allan Bowe
|
||||
|
||||
**/
|
||||
|
||||
%macro mp_testjob(duration=30
|
||||
)/*/STORE SOURCE*/;
|
||||
%local lib dir ds1 ds2 ds3 start_tm i;
|
||||
|
||||
%let start_tm=%sysfunc(datetime());
|
||||
%let duration=%sysevalf(&duration);
|
||||
|
||||
/* create a temporary library in WORK */
|
||||
%let lib=%mf_getuniquelibref();
|
||||
%let dir=%mf_getuniquename();
|
||||
%mf_mkdir(%sysfunc(pathname(work))/&dir)
|
||||
libname &lib "%sysfunc(pathname(work))/&dir";
|
||||
|
||||
/* loop through until time expires */
|
||||
%let ds1=%mf_getuniquename();
|
||||
%let ds2=%mf_getuniquename();
|
||||
%let ds3=%mf_getuniquename();
|
||||
%do i=0 %to 1;
|
||||
|
||||
/* create big dataset */
|
||||
data &lib..&ds1(compress=no );
|
||||
do x=1 to 1000000;
|
||||
randnum0=ranuni(0)*3;
|
||||
randnum1=ranuni(0)*2;
|
||||
bigchar=repeat('A',300);
|
||||
output;
|
||||
end;
|
||||
run;
|
||||
%if %sysevalf( (%sysfunc(datetime())-&start_tm)>&duration ) %then %goto gate;
|
||||
|
||||
proc summary ;
|
||||
class randnum0 randnum1;
|
||||
output out=&lib..&ds2;
|
||||
run;quit;
|
||||
%if %sysevalf( (%sysfunc(datetime())-&start_tm)>&duration ) %then %goto gate;
|
||||
|
||||
/* add more data */
|
||||
proc sql;
|
||||
create table &lib..&ds3 as
|
||||
select *, ranuni(0)*10 as randnum2
|
||||
from &lib..&ds1
|
||||
order by randnum1;
|
||||
quit;
|
||||
%if %sysevalf( (%sysfunc(datetime())-&start_tm)>&duration ) %then %goto gate;
|
||||
|
||||
proc sort data=&lib..&ds3;
|
||||
by descending x;
|
||||
run;
|
||||
%if %sysevalf( (%sysfunc(datetime())-&start_tm)>&duration ) %then %goto gate;
|
||||
|
||||
/* wait 5 seconds */
|
||||
data _null_;
|
||||
call sleep(5,1);
|
||||
run;
|
||||
%if %sysevalf( (%sysfunc(datetime())-&start_tm)>&duration ) %then %goto gate;
|
||||
|
||||
%let i=0;
|
||||
|
||||
%end;
|
||||
|
||||
%gate:
|
||||
%put time is up!;
|
||||
proc datasets lib=&lib kill;
|
||||
run;
|
||||
quit;
|
||||
libname &lib clear;
|
||||
|
||||
|
||||
%mend;/**
|
||||
@file mp_testwritespeedlibrary.sas
|
||||
@brief Tests the write speed of a new table in a SAS library
|
||||
@@ -12400,22 +12498,24 @@ filename &fname2 clear;
|
||||
/**
|
||||
@file
|
||||
@brief Extract the status from a running SAS Viya job
|
||||
@details Extracts the status from a running job and writes it to a fileref.
|
||||
An output dataset is created like this:
|
||||
@details Extracts the status from a running job and appends it to an output
|
||||
dataset with the following structure:
|
||||
|
||||
| uri | state | timestamp |
|
||||
|---------------------------------------------------------------|---------|--------------------|
|
||||
| /jobExecution/jobs/5cebd840-2063-42c1-be0c-421ec3e1c175/state | running | 15JAN2021:12:35:08 |
|
||||
|
||||
To query the running job, you need the URI. Sample code for achieving this
|
||||
is provided below.
|
||||
|
||||
## Example
|
||||
|
||||
First, compile the macros:
|
||||
|
||||
filename mc url
|
||||
"https://raw.githubusercontent.com/sasjs/core/main/all.sas";
|
||||
filename mc url "https://raw.githubusercontent.com/sasjs/core/main/all.sas";
|
||||
%inc mc;
|
||||
|
||||
Create a long running job (in this case, a web service):
|
||||
Next, create a long running job (in this case, a web service):
|
||||
|
||||
filename ft15f001 temp;
|
||||
parmcards4;
|
||||
@@ -12432,7 +12532,7 @@ filename &fname2 clear;
|
||||
;;;;
|
||||
%mv_createwebservice(path=/Public/temp,name=demo)
|
||||
|
||||
Execute it, grab the uri, and check status:
|
||||
Execute it, grab the uri, and finally, check the job status:
|
||||
|
||||
%mv_jobexecute(path=/Public/temp
|
||||
,name=demo
|
||||
@@ -12447,14 +12547,26 @@ filename &fname2 clear;
|
||||
|
||||
%mv_getjobstate(uri=&uri,outds=results)
|
||||
|
||||
You can run this macro as part of a loop to await the final 'completed' status.
|
||||
The full list of status values is:
|
||||
|
||||
@li idle
|
||||
@li pending
|
||||
@li running
|
||||
@li canceled
|
||||
@li completed
|
||||
@li failed
|
||||
|
||||
If you have one or more jobs that you'd like to wait for completion you can
|
||||
also use the [mv_jobwaitfor](/mv__jobwaitfor_8sas.html) macro.
|
||||
|
||||
@param [in] access_token_var= The global macro variable to contain the access token
|
||||
@param [in] grant_type= valid values:
|
||||
* password
|
||||
* authorization_code
|
||||
* detect - will check if access_token exists, if not will use sas_services if
|
||||
@li password
|
||||
@li authorization_code
|
||||
@li detect - will check if access_token exists, if not will use sas_services if
|
||||
a SASStudioV session else authorization_code. Default option.
|
||||
* sas_services - will use oauth_bearer=sas_services
|
||||
@li sas_services - will use oauth_bearer=sas_services.
|
||||
@param [in] uri= The uri of the running job for which to fetch the status,
|
||||
in the format `/jobExecution/jobs/$UUID/state` (unquoted).
|
||||
@param [out] outds= The output dataset in which to APPEND the status. Three
|
||||
@@ -12864,7 +12976,7 @@ libname &libref1 clear;
|
||||
%let oauth_bearer=oauth_bearer=sas_services;
|
||||
%let &access_token_var=;
|
||||
%end;
|
||||
%put &sysmacroname: grant_type=&grant_type;
|
||||
|
||||
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||
and &grant_type ne sas_services
|
||||
)
|
||||
@@ -12969,7 +13081,32 @@ libname &libref;
|
||||
|
||||
The input table is formed as per below. Each observation represents one job.
|
||||
Each variable is converted into a macro variable with the same name.
|
||||
The FLOW column provides the sequential ordering capability.
|
||||
|
||||
## Input table (minimum variables needed)
|
||||
|
||||
@li FLOW_ID - Numeric value, provides sequential ordering capability
|
||||
@li _CONTEXTNAME - Dictates which context should be used to run the job. If
|
||||
blank, will default to `SAS Job Execution compute context`.
|
||||
@li _PROGRAM - Provides the path to the job itself
|
||||
|
||||
Any additional variables provided in this table are converted into macro
|
||||
variables and passed into the relevant job.
|
||||
|
||||
| FLOW_ID| _CONTEXTNAME |_PROGRAM|
|
||||
|---|---|---|
|
||||
|0|SAS Job Execution compute context|/Public/jobs/somejob1|
|
||||
|0|SAS Job Execution compute context|/Public/jobs/somejob2|
|
||||
|
||||
## Output table (minimum variables produced)
|
||||
|
||||
@li _PROGRAM - the SAS Drive path of the job
|
||||
@li URI - the URI of the executed job
|
||||
@li STATE - the completed state of the job
|
||||
@li TIMESTAMP - the datetime that the job completed
|
||||
@li JOBPARAMS - the parameters that were passed to the job
|
||||
@li FLOW_ID - the id of the flow in which the job was executed
|
||||
|
||||

|
||||
|
||||
|
||||
## Example
|
||||
@@ -12984,10 +13121,13 @@ libname &libref;
|
||||
|
||||
filename ft15f001 temp;
|
||||
parmcards4;
|
||||
%put this is job: &_program;
|
||||
%put this was run in flow &flow_id;
|
||||
data ;
|
||||
rand=ranuni(0)*¯ovar1;
|
||||
do x=1 to rand;
|
||||
y=rand*¯ovar2;
|
||||
if y=100 then abort;
|
||||
output;
|
||||
end;
|
||||
run;
|
||||
@@ -12998,15 +13138,17 @@ libname &libref;
|
||||
Prepare an input table with 60 executions:
|
||||
|
||||
data work.inputjobs;
|
||||
do flow=1 to 3;
|
||||
do job=1 to 10;
|
||||
_program='/Public/temp/name/demo1';
|
||||
macrovar1=10*job;
|
||||
macrovar2=4*job;
|
||||
_contextName='SAS Job Execution compute context';
|
||||
do flow_id=1 to 3;
|
||||
do i=1 to 20;
|
||||
_program='/Public/temp/demo1';
|
||||
macrovar1=10*i;
|
||||
macrovar2=4*i;
|
||||
output;
|
||||
_program='/Public/temp/name/demo2';
|
||||
macrovar1=40*job;
|
||||
macrovar2=44*job;
|
||||
i+1;
|
||||
_program='/Public/temp/demo2';
|
||||
macrovar1=40*i;
|
||||
macrovar2=44*i;
|
||||
output;
|
||||
end;
|
||||
end;
|
||||
@@ -13019,30 +13161,31 @@ libname &libref;
|
||||
|
||||
@param [in] access_token_var= The global macro variable to contain the access token
|
||||
@param [in] grant_type= valid values:
|
||||
* password
|
||||
* authorization_code
|
||||
* detect - will check if access_token exists, if not will use sas_services if
|
||||
@li password
|
||||
@li authorization_code
|
||||
@li detect - will check if access_token exists, if not will use sas_services if
|
||||
a SASStudioV session else authorization_code. Default option.
|
||||
* sas_services - will use oauth_bearer=sas_services
|
||||
@li sas_services - will use oauth_bearer=sas_services
|
||||
@param [in] inds= The input dataset containing a list of jobs and parameters
|
||||
@param [in] maxconcurrency= The max number of parallel jobs to run
|
||||
@param [in] maxconcurrency= The max number of parallel jobs to run. Default=8.
|
||||
@param [out] outds= The output dataset containing the results
|
||||
|
||||
@version VIYA V.03.05
|
||||
@author Allan Bowe, source: https://github.com/sasjs/core
|
||||
|
||||
<h4> SAS Macros </h4>
|
||||
@li mf_nobs.sas
|
||||
@li mp_abort.sas
|
||||
@li mf_getplatform.sas
|
||||
@li mf_getuniquefileref.sas
|
||||
@li mv_getfoldermembers.sas
|
||||
@li ml_json.sas
|
||||
@li mf_existvarlist.sas
|
||||
@li mv_jobwaitfor.sas
|
||||
@li mv_jobexecute.sas
|
||||
|
||||
**/
|
||||
|
||||
%macro mv_jobflow(outref=0,outfile=0
|
||||
,name=0,path=0
|
||||
,contextName=SAS Job Execution compute context
|
||||
%macro mv_jobflow(inds=0,outds=work.mv_jobflow
|
||||
,maxconcurrency=8
|
||||
,access_token_var=ACCESS_TOKEN
|
||||
,grant_type=sas_services
|
||||
);
|
||||
@@ -13055,111 +13198,189 @@ libname &libref;
|
||||
%let oauth_bearer=oauth_bearer=sas_services;
|
||||
%let &access_token_var=;
|
||||
%end;
|
||||
%put &sysmacroname: grant_type=&grant_type;
|
||||
|
||||
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||
and &grant_type ne sas_services
|
||||
)
|
||||
,mac=&sysmacroname
|
||||
,msg=%str(Invalid value for grant_type: &grant_type)
|
||||
)
|
||||
%mp_abort(iftrue=("&path"="0")
|
||||
|
||||
%mp_abort(iftrue=("&inds"="0")
|
||||
,mac=&sysmacroname
|
||||
,msg=%str(Job Path not provided)
|
||||
,msg=%str(Input dataset was not provided)
|
||||
)
|
||||
%mp_abort(iftrue=("&name"="0")
|
||||
%mp_abort(iftrue=(%mf_existVarList(&inds,_CONTEXTNAME FLOW_ID _PROGRAM)=0)
|
||||
,mac=&sysmacroname
|
||||
,msg=%str(Job Name not provided)
|
||||
,msg=%str(The following columns must exist on input dataset &inds:
|
||||
_CONTEXTNAME FLOW_ID _PROGRAM)
|
||||
)
|
||||
%mp_abort(iftrue=("&outfile"="0" and "&outref"="0")
|
||||
%mp_abort(iftrue=(&maxconcurrency<1)
|
||||
,mac=&sysmacroname
|
||||
,msg=%str(Output destination (file or fileref) must be provided)
|
||||
,msg=%str(The maxconcurrency variable should be a positive integer)
|
||||
)
|
||||
|
||||
%local missings;
|
||||
proc sql noprint;
|
||||
select count(*) into: missings
|
||||
from &inds
|
||||
where flow_id is null or _program is null;
|
||||
%mp_abort(iftrue=(&missings>0)
|
||||
,mac=&sysmacroname
|
||||
,msg=%str(input dataset contains &missings missing values for FLOW_ID or _PROGRAM)
|
||||
)
|
||||
|
||||
%if %mf_nobs(&inds)=0 %then %do;
|
||||
%put No observations in &inds! Leaving macro &sysmacroname;
|
||||
%return;
|
||||
%end;
|
||||
|
||||
/* ensure output table is available */
|
||||
data &outds;run;
|
||||
proc sql;
|
||||
drop table &outds;
|
||||
|
||||
options noquotelenmax;
|
||||
%local base_uri; /* location of rest apis */
|
||||
%let base_uri=%mf_getplatform(VIYARESTAPI);
|
||||
data;run;
|
||||
%local foldermembers;
|
||||
%let foldermembers=&syslast;
|
||||
%mv_getfoldermembers(root=&path
|
||||
,access_token_var=&access_token_var
|
||||
,grant_type=&grant_type
|
||||
,outds=&foldermembers
|
||||
)
|
||||
%local joburi;
|
||||
%let joburi=0;
|
||||
data _null_;
|
||||
set &foldermembers;
|
||||
if name="&name" and uri=:'/jobDefinitions/definitions'
|
||||
then call symputx('joburi',uri);
|
||||
run;
|
||||
%mp_abort(iftrue=("&joburi"="0")
|
||||
,mac=&sysmacroname
|
||||
,msg=%str(Job &path/&name not found)
|
||||
)
|
||||
|
||||
/* prepare request*/
|
||||
%local fname1;
|
||||
%let fname1=%mf_getuniquefileref();
|
||||
proc http method='GET' out=&fname1 &oauth_bearer
|
||||
url="&base_uri&joburi";
|
||||
headers "Accept"="application/vnd.sas.job.definition+json"
|
||||
%if &grant_type=authorization_code %then %do;
|
||||
"Authorization"="Bearer &&&access_token_var"
|
||||
%end;
|
||||
;
|
||||
|
||||
/* get flows */
|
||||
proc sort data=&inds;
|
||||
by flow_id;
|
||||
run;
|
||||
%if &SYS_PROCHTTP_STATUS_CODE ne 200 and &SYS_PROCHTTP_STATUS_CODE ne 201 %then
|
||||
%do;
|
||||
data _null_;infile &fname1;input;putlog _infile_;run;
|
||||
%mp_abort(mac=&sysmacroname
|
||||
,msg=%str(&SYS_PROCHTTP_STATUS_CODE &SYS_PROCHTTP_STATUS_PHRASE)
|
||||
)
|
||||
data _null_;
|
||||
set &inds (keep=flow_id) end=last;
|
||||
by flow_id;
|
||||
if last.flow_id then do;
|
||||
cnt+1;
|
||||
call symputx(cats('flow',cnt),flow_id,'l');
|
||||
end;
|
||||
if last then call symputx('flowcnt',cnt,'l');
|
||||
run;
|
||||
|
||||
/* prepare temporary datasets and frefs */
|
||||
%local fid jid jds jjson jdsapp jdsrunning jdswaitfor jfref;
|
||||
data;run;%let jds=&syslast;
|
||||
data;run;%let jjson=&syslast;
|
||||
data;run;%let jdsapp=&syslast;
|
||||
data;run;%let jdsrunning=&syslast;
|
||||
data;run;%let jdswaitfor=&syslast;
|
||||
%let jfref=%mf_getuniquefileref();
|
||||
|
||||
/* start loop */
|
||||
%do fid=1 %to &flowcnt;
|
||||
%put preparing job attributes for flow &&flow&fid;
|
||||
%local jds jcnt;
|
||||
data &jds(drop=_contextName _program);
|
||||
set &inds(where=(flow_id=&&flow&fid));
|
||||
if _contextName='' then _contextName="SAS Job Execution compute context";
|
||||
call symputx(cats('job',_n_),_program,'l');
|
||||
call symputx(cats('context',_n_),_contextName,'l');
|
||||
call symputx('jcnt',_n_,'l');
|
||||
run;
|
||||
%put exporting job variables in json format;
|
||||
%do jid=1 %to &jcnt;
|
||||
data &jjson;
|
||||
set &jds;
|
||||
if _n_=&jid then do;
|
||||
output;
|
||||
stop;
|
||||
end;
|
||||
run;
|
||||
proc json out=&jfref;
|
||||
export &jjson / nosastags fmtnumeric;
|
||||
run;
|
||||
data _null_;
|
||||
infile &jfref lrecl=32767;
|
||||
input;
|
||||
jparams='jparams'!!left(symget('jid'));
|
||||
call symputx(jparams,substr(_infile_,3,length(_infile_)-4));
|
||||
run;
|
||||
%local joburi&jid;
|
||||
%let joburi&jid=0; /* used in next loop */
|
||||
%end;
|
||||
%local concurrency completed;
|
||||
%let concurrency=0;
|
||||
%let completed=0;
|
||||
proc sql; drop table &jdsrunning;
|
||||
%do jid=1 %to &jcnt;
|
||||
/**
|
||||
* now we can execute the jobs up to the maxconcurrency setting
|
||||
*/
|
||||
%if "&&job&jid" ne "0" %then %do; /* this var is zero if job finished */
|
||||
%if "&&joburi&jid"="0" and &concurrency<&maxconcurrency %then %do;
|
||||
/* job has not been triggered and we have free slots */
|
||||
%local jobname jobpath;
|
||||
%let jobname=%scan(&&job&jid,-1,/);
|
||||
%let jobpath=%substr(&&job&jid,1,%length(&&job&jid)-%length(&jobname)-1);
|
||||
%put executing &jobpath/&jobname with paramstring &&jparams&jid;
|
||||
%mv_jobexecute(path=&jobpath
|
||||
,name=&jobname
|
||||
,paramstring=%superq(jparams&jid)
|
||||
,outds=&jdsapp
|
||||
)
|
||||
data &jdsapp;
|
||||
format jobparams $32767.;
|
||||
set &jdsapp(where=(method='GET' and rel='state'));
|
||||
jobparams=symget("jparams&jid");
|
||||
call symputx("joburi&jid",uri,'l');
|
||||
run;
|
||||
proc append base=&jdsrunning data=&jdsapp;
|
||||
run;
|
||||
%let concurrency=%eval(&concurrency+1);
|
||||
%end;
|
||||
%else %if %sysfunc(exist(&outds))=1 %then %do;
|
||||
/* check to see if the job has finished as was previously executed */
|
||||
%local jobcheck; %let jobcheck=0;
|
||||
proc sql noprint;
|
||||
select count(*) into: jobcheck
|
||||
from &outds where uri="&&joburi&jid";
|
||||
%if &jobcheck>0 %then %do;
|
||||
%put &&job&jid in flow &fid with uri &&joburi&jid completed!;
|
||||
%let job&jid=0;
|
||||
%end;
|
||||
%end;
|
||||
%end;
|
||||
%if &jid=&jcnt %then %do;
|
||||
/* we are at the end of the loop - time to see which jobs have finished */
|
||||
%mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor)
|
||||
%local done;
|
||||
%let done=%mf_nobs(&jdswaitfor);
|
||||
%if &done>0 %then %do;
|
||||
%let completed=%eval(&completed+&done);
|
||||
%let concurrency=%eval(&concurrency-&done);
|
||||
data &jdsapp;
|
||||
set &jdswaitfor;
|
||||
flow_id=&&flow&fid;
|
||||
run;
|
||||
proc append base=&outds data=&jdsapp;
|
||||
run;
|
||||
%end;
|
||||
proc sql;
|
||||
delete from &jdsrunning
|
||||
where uri in (select uri from &outds
|
||||
where state in ('canceled','completed','failed')
|
||||
);
|
||||
|
||||
/* loop again if jobs are left */
|
||||
%if &completed < &jcnt %then %do;
|
||||
%let jid=0;
|
||||
%put looping flow &fid again - &completed of &jcnt jobs completed, &concurrency jobs running;
|
||||
%end;
|
||||
%end;
|
||||
%end;
|
||||
/* back up and execute the next flow */
|
||||
%end;
|
||||
%local fname2 fname3 fpath1 fpath2 fpath3;
|
||||
%let fname2=%mf_getuniquefileref();
|
||||
%let fname3=%mf_getuniquefileref();
|
||||
%let fpath1=%sysfunc(pathname(&fname1));
|
||||
%let fpath2=%sysfunc(pathname(&fname2));
|
||||
%let fpath3=%sysfunc(pathname(&fname2));
|
||||
|
||||
/* compile the lua JSON module */
|
||||
%ml_json()
|
||||
/* read using LUA - this allows the code to be of any length */
|
||||
data _null_;
|
||||
file "&fpath3..lua";
|
||||
put '
|
||||
infile = io.open (sas.symget("fpath1"), "r")
|
||||
outfile = io.open (sas.symget("fpath2"), "w")
|
||||
io.input(infile)
|
||||
local resp=json.decode(io.read())
|
||||
local job=resp["code"]
|
||||
outfile:write(job)
|
||||
io.close(infile)
|
||||
io.close(outfile)
|
||||
';
|
||||
run;
|
||||
%inc "&fpath3..lua";
|
||||
/* export to desired destination */
|
||||
data _null_;
|
||||
%if &outref=0 %then %do;
|
||||
file "&outfile" lrecl=32767;
|
||||
%end;
|
||||
%else %do;
|
||||
file &outref;
|
||||
%end;
|
||||
infile &fname2;
|
||||
input;
|
||||
put _infile_;
|
||||
run;
|
||||
filename &fname1 clear;
|
||||
filename &fname2 clear;
|
||||
|
||||
%mend;
|
||||
/**
|
||||
@file
|
||||
@brief Takes a dataset of running jobs and waits for them to complete
|
||||
@details Will poll `/jobs/{jobId}/state` at set intervals until they are all
|
||||
completed. Completion is determined by reference to the returned _state_, as
|
||||
per the following table:
|
||||
@brief Takes a dataset of running jobs and waits for ANY or ALL of them to complete
|
||||
@details Will poll `/jobs/{jobId}/state` at set intervals until ANY or ALL
|
||||
jobs are completed. Completion is determined by reference to the returned
|
||||
_state_, as per the following table:
|
||||
|
||||
| state | Wait? | Notes|
|
||||
|-----------|-------|------|
|
||||
@@ -13205,8 +13426,7 @@ filename &fname2 clear;
|
||||
where method='GET' and rel='state';
|
||||
run;
|
||||
|
||||
%mv_jobwaitfor(inds=work.jobs,outds=work.jobstates)
|
||||
|
||||
%mv_jobwaitfor(ALL,inds=work.jobs,outds=work.jobstates)
|
||||
|
||||
Delete the job:
|
||||
|
||||
@@ -13221,6 +13441,8 @@ filename &fname2 clear;
|
||||
a SASStudioV session else authorization_code. Default option.
|
||||
- sas_services - will use oauth_bearer=sas_services
|
||||
|
||||
@param [in] action=Either ALL (to wait for every job) or ANY (if one job
|
||||
completes, processing will continue). Default=ALL.
|
||||
@param [in] inds= The input dataset containing the list of job uris, in the
|
||||
following format: `/jobExecution/jobs/&JOBID./state` and the corresponding
|
||||
job name. The uri should be in a `uri` variable, and the job path/name
|
||||
@@ -13241,8 +13463,8 @@ filename &fname2 clear;
|
||||
|
||||
**/
|
||||
|
||||
%macro mv_jobwaitfor(
|
||||
access_token_var=ACCESS_TOKEN
|
||||
%macro mv_jobwaitfor(action
|
||||
,access_token_var=ACCESS_TOKEN
|
||||
,grant_type=sas_services
|
||||
,inds=0
|
||||
,outds=work.mv_jobwaitfor
|
||||
@@ -13256,7 +13478,7 @@ filename &fname2 clear;
|
||||
%let oauth_bearer=oauth_bearer=sas_services;
|
||||
%let &access_token_var=;
|
||||
%end;
|
||||
%put &sysmacroname: grant_type=&grant_type;
|
||||
|
||||
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||
and &grant_type ne sas_services
|
||||
)
|
||||
@@ -13287,17 +13509,24 @@ options noquotelenmax;
|
||||
%let base_uri=%mf_getplatform(VIYARESTAPI);
|
||||
|
||||
data _null_;
|
||||
length jobparams $32767;
|
||||
set &inds end=last;
|
||||
call symputx(cats('joburi',_n_),uri,'l');
|
||||
call symputx(cats('jobname',_n_),_program,'l');
|
||||
call symputx(cats('jobparams',_n_),jobparams,'l');
|
||||
if last then call symputx('uricnt',_n_,'l');
|
||||
run;
|
||||
|
||||
%local runcnt;
|
||||
%if &action=ALL %then %let runcnt=&uricnt;
|
||||
%else %if &action=ANY %then %let runcnt=1;
|
||||
%else %let runcnt=&uricnt;
|
||||
|
||||
%local fname0 ;
|
||||
%let fname0=%mf_getuniquefileref();
|
||||
|
||||
data &outds;
|
||||
format _program uri $128. state $32. timestamp datetime19.;
|
||||
format _program uri $128. state $32. timestamp datetime19. jobparams $32767.;
|
||||
stop;
|
||||
run;
|
||||
|
||||
@@ -13331,8 +13560,9 @@ run;
|
||||
_program="&&jobname&i",
|
||||
uri="&&joburi&i",
|
||||
state="&status",
|
||||
timestamp=datetime();
|
||||
%let joburi&i=0;
|
||||
timestamp=datetime(),
|
||||
jobparams=symget("jobparams&i");
|
||||
%let joburi&i=0; /* do not re-check */
|
||||
%end;
|
||||
%else %if &status=idle or &status=pending or &status=running %then %do;
|
||||
data _null_;
|
||||
@@ -13350,7 +13580,7 @@ run;
|
||||
%let goback=0;
|
||||
proc sql noprint;
|
||||
select count(*) into:goback from &outds;
|
||||
%if &goback ne &uricnt %then %let i=0;
|
||||
%if &goback lt &runcnt %then %let i=0;
|
||||
%end;
|
||||
%end;
|
||||
|
||||
|
||||
31
base/mf_existfileref.sas
Normal file
31
base/mf_existfileref.sas
Normal file
@@ -0,0 +1,31 @@
|
||||
/**
|
||||
@file
|
||||
@brief Checks whether a fileref exists
|
||||
@details You can probably do without this macro as it is just a one liner.
|
||||
Mainly it is here as a convenient way to remember the syntax!
|
||||
|
||||
For this macro, if the fileref exists but the underlying file does not exist
|
||||
|
||||
@param fref the fileref to detect
|
||||
|
||||
@return output returns 1 if found AND the file exists. 0 is returned if not
|
||||
found, and -1 is returned if the fileref is found but the file does not exist.
|
||||
|
||||
@version 8
|
||||
@author [Allan Bowe](https://www.linkedin.com/in/allanbowe/)
|
||||
**/
|
||||
|
||||
%macro mf_existfileref(fref
|
||||
)/*/STORE SOURCE*/;
|
||||
%local result;
|
||||
%let result=%sysfunc(fileref(&fref));
|
||||
%if &result>0 %then %do;
|
||||
0
|
||||
%end;
|
||||
%else %if &result=0 %then %do;
|
||||
1
|
||||
%end;
|
||||
%else %do;
|
||||
-1
|
||||
%end;
|
||||
%mend;
|
||||
@@ -3,11 +3,18 @@
|
||||
@brief Adds custom quotes / delimiters to a delimited string
|
||||
@details Can be used in open code, eg as follows:
|
||||
|
||||
%put %mf_getquotedstr(blah blah blah);
|
||||
%put %mf_getquotedstr(blah blah blah);
|
||||
|
||||
which returns:
|
||||
> 'blah','blah','blah'
|
||||
|
||||
Alternatively:
|
||||
|
||||
%put %mf_getquotedstr(these words are double quoted,quote=D)
|
||||
|
||||
for:
|
||||
> "these","words","are","double","quoted"
|
||||
|
||||
@param in_str the unquoted, spaced delimited string to transform
|
||||
@param dlm= the delimeter to be applied to the output (default comma)
|
||||
@param indlm= the delimeter used for the input (default is space)
|
||||
|
||||
92
base/mp_testjob.sas
Normal file
92
base/mp_testjob.sas
Normal file
@@ -0,0 +1,92 @@
|
||||
/**
|
||||
@file
|
||||
@brief Runs arbitrary code for a specified amount of time
|
||||
@details Executes a series of procs and data steps to enable performance
|
||||
testing of arbitrary jobs.
|
||||
|
||||
%mp_testjob(
|
||||
duration=60*5
|
||||
)
|
||||
|
||||
@param [in] duration= the time in seconds which the job should run for. Actual
|
||||
time may vary, as the check is done in between steps. Default = 30 (seconds).
|
||||
|
||||
<h4> SAS Macros </h4>
|
||||
@li mf_getuniquelibref.sas
|
||||
@li mf_getuniquename.sas
|
||||
@li mf_mkdir.sas
|
||||
|
||||
@version 9.4
|
||||
@author Allan Bowe
|
||||
|
||||
**/
|
||||
|
||||
%macro mp_testjob(duration=30
|
||||
)/*/STORE SOURCE*/;
|
||||
%local lib dir ds1 ds2 ds3 start_tm i;
|
||||
|
||||
%let start_tm=%sysfunc(datetime());
|
||||
%let duration=%sysevalf(&duration);
|
||||
|
||||
/* create a temporary library in WORK */
|
||||
%let lib=%mf_getuniquelibref();
|
||||
%let dir=%mf_getuniquename();
|
||||
%mf_mkdir(%sysfunc(pathname(work))/&dir)
|
||||
libname &lib "%sysfunc(pathname(work))/&dir";
|
||||
|
||||
/* loop through until time expires */
|
||||
%let ds1=%mf_getuniquename();
|
||||
%let ds2=%mf_getuniquename();
|
||||
%let ds3=%mf_getuniquename();
|
||||
%do i=0 %to 1;
|
||||
|
||||
/* create big dataset */
|
||||
data &lib..&ds1(compress=no );
|
||||
do x=1 to 1000000;
|
||||
randnum0=ranuni(0)*3;
|
||||
randnum1=ranuni(0)*2;
|
||||
bigchar=repeat('A',300);
|
||||
output;
|
||||
end;
|
||||
run;
|
||||
%if %sysevalf( (%sysfunc(datetime())-&start_tm)>&duration ) %then %goto gate;
|
||||
|
||||
proc summary ;
|
||||
class randnum0 randnum1;
|
||||
output out=&lib..&ds2;
|
||||
run;quit;
|
||||
%if %sysevalf( (%sysfunc(datetime())-&start_tm)>&duration ) %then %goto gate;
|
||||
|
||||
/* add more data */
|
||||
proc sql;
|
||||
create table &lib..&ds3 as
|
||||
select *, ranuni(0)*10 as randnum2
|
||||
from &lib..&ds1
|
||||
order by randnum1;
|
||||
quit;
|
||||
%if %sysevalf( (%sysfunc(datetime())-&start_tm)>&duration ) %then %goto gate;
|
||||
|
||||
proc sort data=&lib..&ds3;
|
||||
by descending x;
|
||||
run;
|
||||
%if %sysevalf( (%sysfunc(datetime())-&start_tm)>&duration ) %then %goto gate;
|
||||
|
||||
/* wait 5 seconds */
|
||||
data _null_;
|
||||
call sleep(5,1);
|
||||
run;
|
||||
%if %sysevalf( (%sysfunc(datetime())-&start_tm)>&duration ) %then %goto gate;
|
||||
|
||||
%let i=0;
|
||||
|
||||
%end;
|
||||
|
||||
%gate:
|
||||
%put time is up!;
|
||||
proc datasets lib=&lib kill;
|
||||
run;
|
||||
quit;
|
||||
libname &lib clear;
|
||||
|
||||
|
||||
%mend;
|
||||
@@ -1,22 +1,24 @@
|
||||
/**
|
||||
@file
|
||||
@brief Extract the status from a running SAS Viya job
|
||||
@details Extracts the status from a running job and writes it to a fileref.
|
||||
An output dataset is created like this:
|
||||
@details Extracts the status from a running job and appends it to an output
|
||||
dataset with the following structure:
|
||||
|
||||
| uri | state | timestamp |
|
||||
|---------------------------------------------------------------|---------|--------------------|
|
||||
| /jobExecution/jobs/5cebd840-2063-42c1-be0c-421ec3e1c175/state | running | 15JAN2021:12:35:08 |
|
||||
|
||||
To query the running job, you need the URI. Sample code for achieving this
|
||||
is provided below.
|
||||
|
||||
## Example
|
||||
|
||||
First, compile the macros:
|
||||
|
||||
filename mc url
|
||||
"https://raw.githubusercontent.com/sasjs/core/main/all.sas";
|
||||
filename mc url "https://raw.githubusercontent.com/sasjs/core/main/all.sas";
|
||||
%inc mc;
|
||||
|
||||
Create a long running job (in this case, a web service):
|
||||
Next, create a long running job (in this case, a web service):
|
||||
|
||||
filename ft15f001 temp;
|
||||
parmcards4;
|
||||
@@ -33,7 +35,7 @@
|
||||
;;;;
|
||||
%mv_createwebservice(path=/Public/temp,name=demo)
|
||||
|
||||
Execute it, grab the uri, and check status:
|
||||
Execute it, grab the uri, and finally, check the job status:
|
||||
|
||||
%mv_jobexecute(path=/Public/temp
|
||||
,name=demo
|
||||
@@ -48,14 +50,26 @@
|
||||
|
||||
%mv_getjobstate(uri=&uri,outds=results)
|
||||
|
||||
You can run this macro as part of a loop to await the final 'completed' status.
|
||||
The full list of status values is:
|
||||
|
||||
@li idle
|
||||
@li pending
|
||||
@li running
|
||||
@li canceled
|
||||
@li completed
|
||||
@li failed
|
||||
|
||||
If you have one or more jobs that you'd like to wait for completion you can
|
||||
also use the [mv_jobwaitfor](/mv__jobwaitfor_8sas.html) macro.
|
||||
|
||||
@param [in] access_token_var= The global macro variable to contain the access token
|
||||
@param [in] grant_type= valid values:
|
||||
* password
|
||||
* authorization_code
|
||||
* detect - will check if access_token exists, if not will use sas_services if
|
||||
@li password
|
||||
@li authorization_code
|
||||
@li detect - will check if access_token exists, if not will use sas_services if
|
||||
a SASStudioV session else authorization_code. Default option.
|
||||
* sas_services - will use oauth_bearer=sas_services
|
||||
@li sas_services - will use oauth_bearer=sas_services.
|
||||
@param [in] uri= The uri of the running job for which to fetch the status,
|
||||
in the format `/jobExecution/jobs/$UUID/state` (unquoted).
|
||||
@param [out] outds= The output dataset in which to APPEND the status. Three
|
||||
|
||||
@@ -72,7 +72,7 @@
|
||||
%let oauth_bearer=oauth_bearer=sas_services;
|
||||
%let &access_token_var=;
|
||||
%end;
|
||||
%put &sysmacroname: grant_type=&grant_type;
|
||||
|
||||
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||
and &grant_type ne sas_services
|
||||
)
|
||||
|
||||
302
viya/mv_jobflow.sas
Normal file
302
viya/mv_jobflow.sas
Normal file
@@ -0,0 +1,302 @@
|
||||
/**
|
||||
@file
|
||||
@brief Execute a series of job flows
|
||||
@details Very (very) simple flow manager. Jobs execute in sequential waves,
|
||||
all previous waves must finish successfully.
|
||||
|
||||
The input table is formed as per below. Each observation represents one job.
|
||||
Each variable is converted into a macro variable with the same name.
|
||||
|
||||
## Input table (minimum variables needed)
|
||||
|
||||
@li FLOW_ID - Numeric value, provides sequential ordering capability
|
||||
@li _CONTEXTNAME - Dictates which context should be used to run the job. If
|
||||
blank, will default to `SAS Job Execution compute context`.
|
||||
@li _PROGRAM - Provides the path to the job itself
|
||||
|
||||
Any additional variables provided in this table are converted into macro
|
||||
variables and passed into the relevant job.
|
||||
|
||||
| FLOW_ID| _CONTEXTNAME |_PROGRAM|
|
||||
|---|---|---|
|
||||
|0|SAS Job Execution compute context|/Public/jobs/somejob1|
|
||||
|0|SAS Job Execution compute context|/Public/jobs/somejob2|
|
||||
|
||||
## Output table (minimum variables produced)
|
||||
|
||||
@li _PROGRAM - the SAS Drive path of the job
|
||||
@li URI - the URI of the executed job
|
||||
@li STATE - the completed state of the job
|
||||
@li TIMESTAMP - the datetime that the job completed
|
||||
@li JOBPARAMS - the parameters that were passed to the job
|
||||
@li FLOW_ID - the id of the flow in which the job was executed
|
||||
|
||||

|
||||
|
||||
|
||||
## Example
|
||||
|
||||
First, compile the macros:
|
||||
|
||||
filename mc url
|
||||
"https://raw.githubusercontent.com/sasjs/core/main/all.sas";
|
||||
%inc mc;
|
||||
|
||||
Next, create some jobs (in this case, as web services):
|
||||
|
||||
filename ft15f001 temp;
|
||||
parmcards4;
|
||||
%put this is job: &_program;
|
||||
%put this was run in flow &flow_id;
|
||||
data ;
|
||||
rand=ranuni(0)*¯ovar1;
|
||||
do x=1 to rand;
|
||||
y=rand*¯ovar2;
|
||||
if y=100 then abort;
|
||||
output;
|
||||
end;
|
||||
run;
|
||||
;;;;
|
||||
%mv_createwebservice(path=/Public/temp,name=demo1)
|
||||
%mv_createwebservice(path=/Public/temp,name=demo2)
|
||||
|
||||
Prepare an input table with 60 executions:
|
||||
|
||||
data work.inputjobs;
|
||||
_contextName='SAS Job Execution compute context';
|
||||
do flow_id=1 to 3;
|
||||
do i=1 to 20;
|
||||
_program='/Public/temp/demo1';
|
||||
macrovar1=10*i;
|
||||
macrovar2=4*i;
|
||||
output;
|
||||
i+1;
|
||||
_program='/Public/temp/demo2';
|
||||
macrovar1=40*i;
|
||||
macrovar2=44*i;
|
||||
output;
|
||||
end;
|
||||
end;
|
||||
run;
|
||||
|
||||
Trigger the flow
|
||||
|
||||
%mv_jobflow(inds=work.inputjobs,outds=work.results,maxconcurrency=4)
|
||||
|
||||
|
||||
@param [in] access_token_var= The global macro variable to contain the access token
|
||||
@param [in] grant_type= valid values:
|
||||
@li password
|
||||
@li authorization_code
|
||||
@li detect - will check if access_token exists, if not will use sas_services if
|
||||
a SASStudioV session else authorization_code. Default option.
|
||||
@li sas_services - will use oauth_bearer=sas_services
|
||||
@param [in] inds= The input dataset containing a list of jobs and parameters
|
||||
@param [in] maxconcurrency= The max number of parallel jobs to run. Default=8.
|
||||
@param [out] outds= The output dataset containing the results
|
||||
|
||||
@version VIYA V.03.05
|
||||
@author Allan Bowe, source: https://github.com/sasjs/core
|
||||
|
||||
<h4> SAS Macros </h4>
|
||||
@li mf_nobs.sas
|
||||
@li mp_abort.sas
|
||||
@li mf_getplatform.sas
|
||||
@li mf_getuniquefileref.sas
|
||||
@li mf_existvarlist.sas
|
||||
@li mv_jobwaitfor.sas
|
||||
@li mv_jobexecute.sas
|
||||
|
||||
**/
|
||||
|
||||
%macro mv_jobflow(inds=0,outds=work.mv_jobflow
|
||||
,maxconcurrency=8
|
||||
,access_token_var=ACCESS_TOKEN
|
||||
,grant_type=sas_services
|
||||
);
|
||||
%local oauth_bearer;
|
||||
%if &grant_type=detect %then %do;
|
||||
%if %symexist(&access_token_var) %then %let grant_type=authorization_code;
|
||||
%else %let grant_type=sas_services;
|
||||
%end;
|
||||
%if &grant_type=sas_services %then %do;
|
||||
%let oauth_bearer=oauth_bearer=sas_services;
|
||||
%let &access_token_var=;
|
||||
%end;
|
||||
|
||||
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||
and &grant_type ne sas_services
|
||||
)
|
||||
,mac=&sysmacroname
|
||||
,msg=%str(Invalid value for grant_type: &grant_type)
|
||||
)
|
||||
|
||||
%mp_abort(iftrue=("&inds"="0")
|
||||
,mac=&sysmacroname
|
||||
,msg=%str(Input dataset was not provided)
|
||||
)
|
||||
%mp_abort(iftrue=(%mf_existVarList(&inds,_CONTEXTNAME FLOW_ID _PROGRAM)=0)
|
||||
,mac=&sysmacroname
|
||||
,msg=%str(The following columns must exist on input dataset &inds:
|
||||
_CONTEXTNAME FLOW_ID _PROGRAM)
|
||||
)
|
||||
%mp_abort(iftrue=(&maxconcurrency<1)
|
||||
,mac=&sysmacroname
|
||||
,msg=%str(The maxconcurrency variable should be a positive integer)
|
||||
)
|
||||
|
||||
%local missings;
|
||||
proc sql noprint;
|
||||
select count(*) into: missings
|
||||
from &inds
|
||||
where flow_id is null or _program is null;
|
||||
%mp_abort(iftrue=(&missings>0)
|
||||
,mac=&sysmacroname
|
||||
,msg=%str(input dataset contains &missings missing values for FLOW_ID or _PROGRAM)
|
||||
)
|
||||
|
||||
%if %mf_nobs(&inds)=0 %then %do;
|
||||
%put No observations in &inds! Leaving macro &sysmacroname;
|
||||
%return;
|
||||
%end;
|
||||
|
||||
/* ensure output table is available */
|
||||
data &outds;run;
|
||||
proc sql;
|
||||
drop table &outds;
|
||||
|
||||
options noquotelenmax;
|
||||
%local base_uri; /* location of rest apis */
|
||||
%let base_uri=%mf_getplatform(VIYARESTAPI);
|
||||
|
||||
|
||||
/* get flows */
|
||||
proc sort data=&inds;
|
||||
by flow_id;
|
||||
run;
|
||||
data _null_;
|
||||
set &inds (keep=flow_id) end=last;
|
||||
by flow_id;
|
||||
if last.flow_id then do;
|
||||
cnt+1;
|
||||
call symputx(cats('flow',cnt),flow_id,'l');
|
||||
end;
|
||||
if last then call symputx('flowcnt',cnt,'l');
|
||||
run;
|
||||
|
||||
/* prepare temporary datasets and frefs */
|
||||
%local fid jid jds jjson jdsapp jdsrunning jdswaitfor jfref;
|
||||
data;run;%let jds=&syslast;
|
||||
data;run;%let jjson=&syslast;
|
||||
data;run;%let jdsapp=&syslast;
|
||||
data;run;%let jdsrunning=&syslast;
|
||||
data;run;%let jdswaitfor=&syslast;
|
||||
%let jfref=%mf_getuniquefileref();
|
||||
|
||||
/* start loop */
|
||||
%do fid=1 %to &flowcnt;
|
||||
%put preparing job attributes for flow &&flow&fid;
|
||||
%local jds jcnt;
|
||||
data &jds(drop=_contextName _program);
|
||||
set &inds(where=(flow_id=&&flow&fid));
|
||||
if _contextName='' then _contextName="SAS Job Execution compute context";
|
||||
call symputx(cats('job',_n_),_program,'l');
|
||||
call symputx(cats('context',_n_),_contextName,'l');
|
||||
call symputx('jcnt',_n_,'l');
|
||||
run;
|
||||
%put exporting job variables in json format;
|
||||
%do jid=1 %to &jcnt;
|
||||
data &jjson;
|
||||
set &jds;
|
||||
if _n_=&jid then do;
|
||||
output;
|
||||
stop;
|
||||
end;
|
||||
run;
|
||||
proc json out=&jfref;
|
||||
export &jjson / nosastags fmtnumeric;
|
||||
run;
|
||||
data _null_;
|
||||
infile &jfref lrecl=32767;
|
||||
input;
|
||||
jparams='jparams'!!left(symget('jid'));
|
||||
call symputx(jparams,substr(_infile_,3,length(_infile_)-4));
|
||||
run;
|
||||
%local joburi&jid;
|
||||
%let joburi&jid=0; /* used in next loop */
|
||||
%end;
|
||||
%local concurrency completed;
|
||||
%let concurrency=0;
|
||||
%let completed=0;
|
||||
proc sql; drop table &jdsrunning;
|
||||
%do jid=1 %to &jcnt;
|
||||
/**
|
||||
* now we can execute the jobs up to the maxconcurrency setting
|
||||
*/
|
||||
%if "&&job&jid" ne "0" %then %do; /* this var is zero if job finished */
|
||||
%if "&&joburi&jid"="0" and &concurrency<&maxconcurrency %then %do;
|
||||
/* job has not been triggered and we have free slots */
|
||||
%local jobname jobpath;
|
||||
%let jobname=%scan(&&job&jid,-1,/);
|
||||
%let jobpath=%substr(&&job&jid,1,%length(&&job&jid)-%length(&jobname)-1);
|
||||
%put executing &jobpath/&jobname with paramstring &&jparams&jid;
|
||||
%mv_jobexecute(path=&jobpath
|
||||
,name=&jobname
|
||||
,paramstring=%superq(jparams&jid)
|
||||
,outds=&jdsapp
|
||||
)
|
||||
data &jdsapp;
|
||||
format jobparams $32767.;
|
||||
set &jdsapp(where=(method='GET' and rel='state'));
|
||||
jobparams=symget("jparams&jid");
|
||||
call symputx("joburi&jid",uri,'l');
|
||||
run;
|
||||
proc append base=&jdsrunning data=&jdsapp;
|
||||
run;
|
||||
%let concurrency=%eval(&concurrency+1);
|
||||
%end;
|
||||
%else %if %sysfunc(exist(&outds))=1 %then %do;
|
||||
/* check to see if the job has finished as was previously executed */
|
||||
%local jobcheck; %let jobcheck=0;
|
||||
proc sql noprint;
|
||||
select count(*) into: jobcheck
|
||||
from &outds where uri="&&joburi&jid";
|
||||
%if &jobcheck>0 %then %do;
|
||||
%put &&job&jid in flow &fid with uri &&joburi&jid completed!;
|
||||
%let job&jid=0;
|
||||
%end;
|
||||
%end;
|
||||
%end;
|
||||
%if &jid=&jcnt %then %do;
|
||||
/* we are at the end of the loop - time to see which jobs have finished */
|
||||
%mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor)
|
||||
%local done;
|
||||
%let done=%mf_nobs(&jdswaitfor);
|
||||
%if &done>0 %then %do;
|
||||
%let completed=%eval(&completed+&done);
|
||||
%let concurrency=%eval(&concurrency-&done);
|
||||
data &jdsapp;
|
||||
set &jdswaitfor;
|
||||
flow_id=&&flow&fid;
|
||||
run;
|
||||
proc append base=&outds data=&jdsapp;
|
||||
run;
|
||||
%end;
|
||||
proc sql;
|
||||
delete from &jdsrunning
|
||||
where uri in (select uri from &outds
|
||||
where state in ('canceled','completed','failed')
|
||||
);
|
||||
|
||||
/* loop again if jobs are left */
|
||||
%if &completed < &jcnt %then %do;
|
||||
%let jid=0;
|
||||
%put looping flow &fid again - &completed of &jcnt jobs completed, &concurrency jobs running;
|
||||
%end;
|
||||
%end;
|
||||
%end;
|
||||
/* back up and execute the next flow */
|
||||
%end;
|
||||
|
||||
|
||||
%mend;
|
||||
@@ -1,9 +1,9 @@
|
||||
/**
|
||||
@file
|
||||
@brief Takes a dataset of running jobs and waits for them to complete
|
||||
@details Will poll `/jobs/{jobId}/state` at set intervals until they are all
|
||||
completed. Completion is determined by reference to the returned _state_, as
|
||||
per the following table:
|
||||
@brief Takes a dataset of running jobs and waits for ANY or ALL of them to complete
|
||||
@details Will poll `/jobs/{jobId}/state` at set intervals until ANY or ALL
|
||||
jobs are completed. Completion is determined by reference to the returned
|
||||
_state_, as per the following table:
|
||||
|
||||
| state | Wait? | Notes|
|
||||
|-----------|-------|------|
|
||||
@@ -49,8 +49,7 @@
|
||||
where method='GET' and rel='state';
|
||||
run;
|
||||
|
||||
%mv_jobwaitfor(inds=work.jobs,outds=work.jobstates)
|
||||
|
||||
%mv_jobwaitfor(ALL,inds=work.jobs,outds=work.jobstates)
|
||||
|
||||
Delete the job:
|
||||
|
||||
@@ -65,6 +64,8 @@
|
||||
a SASStudioV session else authorization_code. Default option.
|
||||
- sas_services - will use oauth_bearer=sas_services
|
||||
|
||||
@param [in] action=Either ALL (to wait for every job) or ANY (if one job
|
||||
completes, processing will continue). Default=ALL.
|
||||
@param [in] inds= The input dataset containing the list of job uris, in the
|
||||
following format: `/jobExecution/jobs/&JOBID./state` and the corresponding
|
||||
job name. The uri should be in a `uri` variable, and the job path/name
|
||||
@@ -85,8 +86,8 @@
|
||||
|
||||
**/
|
||||
|
||||
%macro mv_jobwaitfor(
|
||||
access_token_var=ACCESS_TOKEN
|
||||
%macro mv_jobwaitfor(action
|
||||
,access_token_var=ACCESS_TOKEN
|
||||
,grant_type=sas_services
|
||||
,inds=0
|
||||
,outds=work.mv_jobwaitfor
|
||||
@@ -100,7 +101,7 @@
|
||||
%let oauth_bearer=oauth_bearer=sas_services;
|
||||
%let &access_token_var=;
|
||||
%end;
|
||||
%put &sysmacroname: grant_type=&grant_type;
|
||||
|
||||
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||
and &grant_type ne sas_services
|
||||
)
|
||||
@@ -131,17 +132,24 @@ options noquotelenmax;
|
||||
%let base_uri=%mf_getplatform(VIYARESTAPI);
|
||||
|
||||
data _null_;
|
||||
length jobparams $32767;
|
||||
set &inds end=last;
|
||||
call symputx(cats('joburi',_n_),uri,'l');
|
||||
call symputx(cats('jobname',_n_),_program,'l');
|
||||
call symputx(cats('jobparams',_n_),jobparams,'l');
|
||||
if last then call symputx('uricnt',_n_,'l');
|
||||
run;
|
||||
|
||||
%local runcnt;
|
||||
%if &action=ALL %then %let runcnt=&uricnt;
|
||||
%else %if &action=ANY %then %let runcnt=1;
|
||||
%else %let runcnt=&uricnt;
|
||||
|
||||
%local fname0 ;
|
||||
%let fname0=%mf_getuniquefileref();
|
||||
|
||||
data &outds;
|
||||
format _program uri $128. state $32. timestamp datetime19.;
|
||||
format _program uri $128. state $32. timestamp datetime19. jobparams $32767.;
|
||||
stop;
|
||||
run;
|
||||
|
||||
@@ -175,8 +183,9 @@ run;
|
||||
_program="&&jobname&i",
|
||||
uri="&&joburi&i",
|
||||
state="&status",
|
||||
timestamp=datetime();
|
||||
%let joburi&i=0;
|
||||
timestamp=datetime(),
|
||||
jobparams=symget("jobparams&i");
|
||||
%let joburi&i=0; /* do not re-check */
|
||||
%end;
|
||||
%else %if &status=idle or &status=pending or &status=running %then %do;
|
||||
data _null_;
|
||||
@@ -194,7 +203,7 @@ run;
|
||||
%let goback=0;
|
||||
proc sql noprint;
|
||||
select count(*) into:goback from &outds;
|
||||
%if &goback ne &uricnt %then %let i=0;
|
||||
%if &goback lt &runcnt %then %let i=0;
|
||||
%end;
|
||||
%end;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user