1
0
mirror of https://github.com/sasjs/core.git synced 2025-12-12 15:04:36 +00:00

Compare commits

...

17 Commits

Author SHA1 Message Date
79c81aa8a4 feat: mf_existfileref macro 2021-01-26 16:00:23 +01:00
bbbcf7d550 chore: updating the docs for mf_getquotedstr 2021-01-23 13:37:15 +02:00
82184bc6be fix: adding quit statement so that exit loop would work on step boundary 2021-01-21 21:55:07 +02:00
efc731cfaa feat: mp_testjob macro for running arbitrary long jobs 2021-01-21 21:48:05 +02:00
da9a74ee14 chore: updating doxy headers 2021-01-21 21:47:41 +02:00
94762d9381 feat: mv_jobflow macro - enables a SAS program to kick off multiple waves of SAS Viya jobs, and to limit those waves by a maximum number of parallel (concurrent) running jobs. 2021-01-16 21:34:17 +02:00
03d9d805ff fix: adding support for jobparams in output table for mv_jobwaitfor 2021-01-16 20:43:15 +02:00
94416028b7 fix: adding ACTION parameter to mv_jobwaitfor - can now wait for ANY or ALL jobs to finish 2021-01-16 19:08:38 +02:00
6cf5d4ef28 chore: updating the header description 2021-01-15 23:12:38 +02:00
e4ceaecfb2 feat: adding mv_getjobstate macro to fetch the state of a running SAS Viya job 2021-01-15 13:02:53 +02:00
Allan Bowe
2eb246c543 fix: removing favicon file 2021-01-14 18:07:42 +01:00
d9954ae777 fix: renegade comma 2021-01-14 16:55:17 +02:00
364dc9f07f feat: adding _program value to mv_jobexecute.sas 2021-01-14 16:37:58 +02:00
d96125c3cf fix: mv_jobwaitfor 2021-01-05 17:14:03 +00:00
506695be56 feat: mv_jobwaitfor macro, similar to waitfor statement (in concept) - will wait for ALL of a set of viya jobs to finish executing 2021-01-05 14:41:27 +00:00
Allan Bowe
45f858db15 fix: scope of json var, brining the %inc _inside_ the macro 2021-01-03 22:35:56 +00:00
Allan Bowe
b4d97a063a fix: doc update for lua files, plus leftover reference in code 2021-01-03 22:30:41 +00:00
16 changed files with 1661 additions and 43 deletions

822
all.sas
View File

@@ -577,11 +577,18 @@ options noquotelenmax;
@brief Adds custom quotes / delimiters to a delimited string
@details Can be used in open code, eg as follows:
%put %mf_getquotedstr(blah blah blah);
%put %mf_getquotedstr(blah blah blah);
which returns:
> 'blah','blah','blah'
Alternatively:
%put %mf_getquotedstr(these words are double quoted,quote=D)
for:
> "these","words","are","double","quoted"
@param in_str the unquoted, spaced delimited string to transform
@param dlm= the delimeter to be applied to the output (default comma)
@param indlm= the delimeter used for the input (default is space)
@@ -4754,6 +4761,97 @@ proc sql
%mp_binarycopy(inloc="&inloc",outref=_webout)
%end;
%mend;/**
@file
@brief Runs arbitrary code for a specified amount of time
@details Executes a series of procs and data steps to enable performance
testing of arbitrary jobs.
%mp_testjob(
duration=60*5
)
@param [in] duration= the time in seconds which the job should run for. Actual
time may vary, as the check is done in between steps. Default = 30 (seconds).
<h4> SAS Macros </h4>
@li mf_getuniquelibref.sas
@li mf_getuniquename.sas
@li mf_mkdir.sas
@version 9.4
@author Allan Bowe
**/
%macro mp_testjob(duration=30
)/*/STORE SOURCE*/;
%local lib dir ds1 ds2 ds3 start_tm i;
%let start_tm=%sysfunc(datetime());
%let duration=%sysevalf(&duration);
/* create a temporary library in WORK */
%let lib=%mf_getuniquelibref();
%let dir=%mf_getuniquename();
%mf_mkdir(%sysfunc(pathname(work))/&dir)
libname &lib "%sysfunc(pathname(work))/&dir";
/* loop through until time expires */
%let ds1=%mf_getuniquename();
%let ds2=%mf_getuniquename();
%let ds3=%mf_getuniquename();
%do i=0 %to 1;
/* create big dataset */
data &lib..&ds1(compress=no );
do x=1 to 1000000;
randnum0=ranuni(0)*3;
randnum1=ranuni(0)*2;
bigchar=repeat('A',300);
output;
end;
run;
%if %sysevalf( (%sysfunc(datetime())-&start_tm)>&duration ) %then %goto gate;
proc summary ;
class randnum0 randnum1;
output out=&lib..&ds2;
run;quit;
%if %sysevalf( (%sysfunc(datetime())-&start_tm)>&duration ) %then %goto gate;
/* add more data */
proc sql;
create table &lib..&ds3 as
select *, ranuni(0)*10 as randnum2
from &lib..&ds1
order by randnum1;
quit;
%if %sysevalf( (%sysfunc(datetime())-&start_tm)>&duration ) %then %goto gate;
proc sort data=&lib..&ds3;
by descending x;
run;
%if %sysevalf( (%sysfunc(datetime())-&start_tm)>&duration ) %then %goto gate;
/* wait 5 seconds */
data _null_;
call sleep(5,1);
run;
%if %sysevalf( (%sysfunc(datetime())-&start_tm)>&duration ) %then %goto gate;
%let i=0;
%end;
%gate:
%put time is up!;
proc datasets lib=&lib kill;
run;
quit;
libname &lib clear;
%mend;/**
@file mp_testwritespeedlibrary.sas
@brief Tests the write speed of a new table in a SAS library
@@ -12374,7 +12472,7 @@ data _null_;
infile = io.open (sas.symget("fpath1"), "r")
outfile = io.open (sas.symget("fpath2"), "w")
io.input(infile)
local resp=json2sas.decode(io.read())
local resp=json.decode(io.read())
local job=resp["code"]
outfile:write(job)
io.close(infile)
@@ -12396,6 +12494,175 @@ data _null_;
run;
filename &fname1 clear;
filename &fname2 clear;
%mend;
/**
@file
@brief Extract the status from a running SAS Viya job
@details Extracts the status from a running job and appends it to an output
dataset with the following structure:
| uri | state | timestamp |
|---------------------------------------------------------------|---------|--------------------|
| /jobExecution/jobs/5cebd840-2063-42c1-be0c-421ec3e1c175/state | running | 15JAN2021:12:35:08 |
To query the running job, you need the URI. Sample code for achieving this
is provided below.
## Example
First, compile the macros:
filename mc url "https://raw.githubusercontent.com/sasjs/core/main/all.sas";
%inc mc;
Next, create a long running job (in this case, a web service):
filename ft15f001 temp;
parmcards4;
data ;
rand=ranuni(0)*1000;
do x=1 to rand;
y=rand*4;
output;
end;
run;
data _null_;
call sleep(5,1);
run;
;;;;
%mv_createwebservice(path=/Public/temp,name=demo)
Execute it, grab the uri, and finally, check the job status:
%mv_jobexecute(path=/Public/temp
,name=demo
,outds=work.info
)
data _null_;
set work.info;
if method='GET' and rel='state';
call symputx('uri',uri);
run;
%mv_getjobstate(uri=&uri,outds=results)
You can run this macro as part of a loop to await the final 'completed' status.
The full list of status values is:
@li idle
@li pending
@li running
@li canceled
@li completed
@li failed
If you have one or more jobs that you'd like to wait for completion you can
also use the [mv_jobwaitfor](/mv__jobwaitfor_8sas.html) macro.
@param [in] access_token_var= The global macro variable to contain the access token
@param [in] grant_type= valid values:
@li password
@li authorization_code
@li detect - will check if access_token exists, if not will use sas_services if
a SASStudioV session else authorization_code. Default option.
@li sas_services - will use oauth_bearer=sas_services.
@param [in] uri= The uri of the running job for which to fetch the status,
in the format `/jobExecution/jobs/$UUID/state` (unquoted).
@param [out] outds= The output dataset in which to APPEND the status. Three
fields are appended: `CHECK_TM`, `URI` and `STATE`. If the dataset does not
exist, it is created.
@version VIYA V.03.04
@author Allan Bowe, source: https://github.com/sasjs/core
<h4> SAS Macros </h4>
@li mp_abort.sas
@li mf_getplatform.sas
@li mf_getuniquefileref.sas
**/
%macro mv_getjobstate(uri=0,outds=work.mv_getjobstate
,contextName=SAS Job Execution compute context
,access_token_var=ACCESS_TOKEN
,grant_type=sas_services
);
%local oauth_bearer;
%if &grant_type=detect %then %do;
%if %symexist(&access_token_var) %then %let grant_type=authorization_code;
%else %let grant_type=sas_services;
%end;
%if &grant_type=sas_services %then %do;
%let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=;
%end;
%put &sysmacroname: grant_type=&grant_type;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services
)
,mac=&sysmacroname
,msg=%str(Invalid value for grant_type: &grant_type)
)
/* validation in datastep for better character safety */
%local errmsg errflg;
data _null_;
uri=symget('uri');
if length(uri)<12 then do;
call symputx('errflg',1);
call symputx('errmsg',"URI is invalid (too short) - '&uri'",'l');
end;
if scan(uri,-1) ne 'state' or scan(uri,1) ne 'jobExecution' then do;
call symputx('errflg',1);
call symputx('errmsg',
"URI should be in format /jobExecution/jobs/$$$$UUID$$$$/state"
!!" but is actually like: &uri",'l');
end;
run;
%mp_abort(iftrue=(&errflg=1)
,mac=&sysmacroname
,msg=%str(&errmsg)
)
options noquotelenmax;
%local base_uri; /* location of rest apis */
%let base_uri=%mf_getplatform(VIYARESTAPI);
%local fname0;
%let fname0=%mf_getuniquefileref();
proc http method='GET' out=&fname0 &oauth_bearer url="&base_uri/&uri";
headers "Accept"="text/plain"
%if &grant_type=authorization_code %then %do;
"Authorization"="Bearer &&&access_token_var"
%end; ;
run;
%if &SYS_PROCHTTP_STATUS_CODE ne 200 and &SYS_PROCHTTP_STATUS_CODE ne 201 %then
%do;
data _null_;infile &fname0;input;putlog _infile_;run;
%mp_abort(mac=&sysmacroname
,msg=%str(&SYS_PROCHTTP_STATUS_CODE &SYS_PROCHTTP_STATUS_PHRASE)
)
%end;
data;
format uri $128. state $32. timestamp datetime19.;
infile &fname0;
uri="&uri";
timestamp=datetime();
input;
state=_infile_;
run;
proc append base=&outds data=&syslast;
run;
filename &fname0 clear;
%mend;
/**
@file mv_getrefreshtoken.sas
@@ -12660,23 +12927,24 @@ libname &libref1 clear;
,paramstring=%str("macvarname":"macvarvalue","answer":42)
)
@param access_token_var= The global macro variable to contain the access token
@param grant_type= valid values:
* password
* authorization_code
* detect - will check if access_token exists, if not will use sas_services if
a SASStudioV session else authorization_code. Default option.
* sas_services - will use oauth_bearer=sas_services
@param [in] access_token_var= The global macro variable to contain the access token
@param [in] grant_type= valid values:
@param path= The SAS Drive path to the job being executed
@param name= The name of the job to execute
@param paramstring= A JSON fragment with name:value pairs, eg: `"name":"value"`
* password
* authorization_code
* detect - will check if access_token exists, if not will use sas_services if
a SASStudioV session else authorization_code. Default option.
* sas_services - will use oauth_bearer=sas_services
@param [in] path= The SAS Drive path to the job being executed
@param [in] name= The name of the job to execute
@param [in] paramstring= A JSON fragment with name:value pairs, eg: `"name":"value"`
or "name":"value","name2":42`. This will need to be wrapped in `%str()`.
@param contextName= Context name with which to run the job.
@param [in] contextName= Context name with which to run the job.
Default = `SAS Job Execution compute context`
@param outds= The output dataset containing links (Default=work.mv_jobexecute)
@param [out] outds= The output dataset containing links (Default=work.mv_jobexecute)
@version VIYA V.03.04
@@ -12708,7 +12976,7 @@ libname &libref1 clear;
%let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=;
%end;
%put &sysmacroname: grant_type=&grant_type;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services
)
@@ -12762,9 +13030,11 @@ data _null_;
length joburi contextname $128 paramstring $32765;
joburi=quote(trim(symget('joburi')));
contextname=quote(trim(symget('contextname')));
_program=quote("&path/&name");
paramstring=symget('paramstring');
put '{"jobDefinitionUri":' joburi ;
put ' ,"arguments":{"_contextName":' contextname;
put ' ,"_program":' _program;
if paramstring ne "0" then do;
put ' ,' paramstring;
end;
@@ -12795,6 +13065,7 @@ libname &libref JSON fileref=&fname1;
data &outds;
set &libref..links;
_program="&path/&name";
run;
/* clear refs */
@@ -12802,6 +13073,520 @@ filename &fname0 clear;
filename &fname1 clear;
libname &libref;
%mend;/**
@file
@brief Execute a series of job flows
@details Very (very) simple flow manager. Jobs execute in sequential waves,
all previous waves must finish successfully.
The input table is formed as per below. Each observation represents one job.
Each variable is converted into a macro variable with the same name.
## Input table (minimum variables needed)
@li FLOW_ID - Numeric value, provides sequential ordering capability
@li _CONTEXTNAME - Dictates which context should be used to run the job. If
blank, will default to `SAS Job Execution compute context`.
@li _PROGRAM - Provides the path to the job itself
Any additional variables provided in this table are converted into macro
variables and passed into the relevant job.
| FLOW_ID| _CONTEXTNAME |_PROGRAM|
|---|---|---|
|0|SAS Job Execution compute context|/Public/jobs/somejob1|
|0|SAS Job Execution compute context|/Public/jobs/somejob2|
## Output table (minimum variables produced)
@li _PROGRAM - the SAS Drive path of the job
@li URI - the URI of the executed job
@li STATE - the completed state of the job
@li TIMESTAMP - the datetime that the job completed
@li JOBPARAMS - the parameters that were passed to the job
@li FLOW_ID - the id of the flow in which the job was executed
![https://i.imgur.com/nZE9PvT.png](https://i.imgur.com/nZE9PvT.png)
## Example
First, compile the macros:
filename mc url
"https://raw.githubusercontent.com/sasjs/core/main/all.sas";
%inc mc;
Next, create some jobs (in this case, as web services):
filename ft15f001 temp;
parmcards4;
%put this is job: &_program;
%put this was run in flow &flow_id;
data ;
rand=ranuni(0)*&macrovar1;
do x=1 to rand;
y=rand*&macrovar2;
if y=100 then abort;
output;
end;
run;
;;;;
%mv_createwebservice(path=/Public/temp,name=demo1)
%mv_createwebservice(path=/Public/temp,name=demo2)
Prepare an input table with 60 executions:
data work.inputjobs;
_contextName='SAS Job Execution compute context';
do flow_id=1 to 3;
do i=1 to 20;
_program='/Public/temp/demo1';
macrovar1=10*i;
macrovar2=4*i;
output;
i+1;
_program='/Public/temp/demo2';
macrovar1=40*i;
macrovar2=44*i;
output;
end;
end;
run;
Trigger the flow
%mv_jobflow(inds=work.inputjobs,outds=work.results,maxconcurrency=4)
@param [in] access_token_var= The global macro variable to contain the access token
@param [in] grant_type= valid values:
@li password
@li authorization_code
@li detect - will check if access_token exists, if not will use sas_services if
a SASStudioV session else authorization_code. Default option.
@li sas_services - will use oauth_bearer=sas_services
@param [in] inds= The input dataset containing a list of jobs and parameters
@param [in] maxconcurrency= The max number of parallel jobs to run. Default=8.
@param [out] outds= The output dataset containing the results
@version VIYA V.03.05
@author Allan Bowe, source: https://github.com/sasjs/core
<h4> SAS Macros </h4>
@li mf_nobs.sas
@li mp_abort.sas
@li mf_getplatform.sas
@li mf_getuniquefileref.sas
@li mf_existvarlist.sas
@li mv_jobwaitfor.sas
@li mv_jobexecute.sas
**/
%macro mv_jobflow(inds=0,outds=work.mv_jobflow
,maxconcurrency=8
,access_token_var=ACCESS_TOKEN
,grant_type=sas_services
);
%local oauth_bearer;
%if &grant_type=detect %then %do;
%if %symexist(&access_token_var) %then %let grant_type=authorization_code;
%else %let grant_type=sas_services;
%end;
%if &grant_type=sas_services %then %do;
%let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=;
%end;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services
)
,mac=&sysmacroname
,msg=%str(Invalid value for grant_type: &grant_type)
)
%mp_abort(iftrue=("&inds"="0")
,mac=&sysmacroname
,msg=%str(Input dataset was not provided)
)
%mp_abort(iftrue=(%mf_existVarList(&inds,_CONTEXTNAME FLOW_ID _PROGRAM)=0)
,mac=&sysmacroname
,msg=%str(The following columns must exist on input dataset &inds:
_CONTEXTNAME FLOW_ID _PROGRAM)
)
%mp_abort(iftrue=(&maxconcurrency<1)
,mac=&sysmacroname
,msg=%str(The maxconcurrency variable should be a positive integer)
)
%local missings;
proc sql noprint;
select count(*) into: missings
from &inds
where flow_id is null or _program is null;
%mp_abort(iftrue=(&missings>0)
,mac=&sysmacroname
,msg=%str(input dataset contains &missings missing values for FLOW_ID or _PROGRAM)
)
%if %mf_nobs(&inds)=0 %then %do;
%put No observations in &inds! Leaving macro &sysmacroname;
%return;
%end;
/* ensure output table is available */
data &outds;run;
proc sql;
drop table &outds;
options noquotelenmax;
%local base_uri; /* location of rest apis */
%let base_uri=%mf_getplatform(VIYARESTAPI);
/* get flows */
proc sort data=&inds;
by flow_id;
run;
data _null_;
set &inds (keep=flow_id) end=last;
by flow_id;
if last.flow_id then do;
cnt+1;
call symputx(cats('flow',cnt),flow_id,'l');
end;
if last then call symputx('flowcnt',cnt,'l');
run;
/* prepare temporary datasets and frefs */
%local fid jid jds jjson jdsapp jdsrunning jdswaitfor jfref;
data;run;%let jds=&syslast;
data;run;%let jjson=&syslast;
data;run;%let jdsapp=&syslast;
data;run;%let jdsrunning=&syslast;
data;run;%let jdswaitfor=&syslast;
%let jfref=%mf_getuniquefileref();
/* start loop */
%do fid=1 %to &flowcnt;
%put preparing job attributes for flow &&flow&fid;
%local jds jcnt;
data &jds(drop=_contextName _program);
set &inds(where=(flow_id=&&flow&fid));
if _contextName='' then _contextName="SAS Job Execution compute context";
call symputx(cats('job',_n_),_program,'l');
call symputx(cats('context',_n_),_contextName,'l');
call symputx('jcnt',_n_,'l');
run;
%put exporting job variables in json format;
%do jid=1 %to &jcnt;
data &jjson;
set &jds;
if _n_=&jid then do;
output;
stop;
end;
run;
proc json out=&jfref;
export &jjson / nosastags fmtnumeric;
run;
data _null_;
infile &jfref lrecl=32767;
input;
jparams='jparams'!!left(symget('jid'));
call symputx(jparams,substr(_infile_,3,length(_infile_)-4));
run;
%local joburi&jid;
%let joburi&jid=0; /* used in next loop */
%end;
%local concurrency completed;
%let concurrency=0;
%let completed=0;
proc sql; drop table &jdsrunning;
%do jid=1 %to &jcnt;
/**
* now we can execute the jobs up to the maxconcurrency setting
*/
%if "&&job&jid" ne "0" %then %do; /* this var is zero if job finished */
%if "&&joburi&jid"="0" and &concurrency<&maxconcurrency %then %do;
/* job has not been triggered and we have free slots */
%local jobname jobpath;
%let jobname=%scan(&&job&jid,-1,/);
%let jobpath=%substr(&&job&jid,1,%length(&&job&jid)-%length(&jobname)-1);
%put executing &jobpath/&jobname with paramstring &&jparams&jid;
%mv_jobexecute(path=&jobpath
,name=&jobname
,paramstring=%superq(jparams&jid)
,outds=&jdsapp
)
data &jdsapp;
format jobparams $32767.;
set &jdsapp(where=(method='GET' and rel='state'));
jobparams=symget("jparams&jid");
call symputx("joburi&jid",uri,'l');
run;
proc append base=&jdsrunning data=&jdsapp;
run;
%let concurrency=%eval(&concurrency+1);
%end;
%else %if %sysfunc(exist(&outds))=1 %then %do;
/* check to see if the job has finished as was previously executed */
%local jobcheck; %let jobcheck=0;
proc sql noprint;
select count(*) into: jobcheck
from &outds where uri="&&joburi&jid";
%if &jobcheck>0 %then %do;
%put &&job&jid in flow &fid with uri &&joburi&jid completed!;
%let job&jid=0;
%end;
%end;
%end;
%if &jid=&jcnt %then %do;
/* we are at the end of the loop - time to see which jobs have finished */
%mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor)
%local done;
%let done=%mf_nobs(&jdswaitfor);
%if &done>0 %then %do;
%let completed=%eval(&completed+&done);
%let concurrency=%eval(&concurrency-&done);
data &jdsapp;
set &jdswaitfor;
flow_id=&&flow&fid;
run;
proc append base=&outds data=&jdsapp;
run;
%end;
proc sql;
delete from &jdsrunning
where uri in (select uri from &outds
where state in ('canceled','completed','failed')
);
/* loop again if jobs are left */
%if &completed < &jcnt %then %do;
%let jid=0;
%put looping flow &fid again - &completed of &jcnt jobs completed, &concurrency jobs running;
%end;
%end;
%end;
/* back up and execute the next flow */
%end;
%mend;
/**
@file
@brief Takes a dataset of running jobs and waits for ANY or ALL of them to complete
@details Will poll `/jobs/{jobId}/state` at set intervals until ANY or ALL
jobs are completed. Completion is determined by reference to the returned
_state_, as per the following table:
| state | Wait? | Notes|
|-----------|-------|------|
| idle | yes | We assume processing will continue. Beware of idle sessions with no code submitted! |
| pending | yes | Job is preparing to run |
| running | yes | Job is running|
| canceled | no | Job was cancelled|
| completed | no | Job finished - does not mean it was successful. Check stateDetails|
| failed | no | Job failed to execute, could be a problem when calling the apis|
## Example
First, compile the macros:
filename mc url
"https://raw.githubusercontent.com/sasjs/core/main/all.sas";
%inc mc;
Next, create a job (in this case, as a web service):
filename ft15f001 temp;
parmcards4;
data ;
rand=ranuni(0)*1000000;
do x=1 to rand;
y=rand*x;
output;
end;
run;
;;;;
%mv_createwebservice(path=/Public/temp,name=demo)
Then, execute the job,multiple times, and wait for them all to finish:
%mv_jobexecute(path=/Public/temp,name=demo,outds=work.ds1)
%mv_jobexecute(path=/Public/temp,name=demo,outds=work.ds2)
%mv_jobexecute(path=/Public/temp,name=demo,outds=work.ds3)
%mv_jobexecute(path=/Public/temp,name=demo,outds=work.ds4)
data work.jobs;
set work.ds1 work.ds2 work.ds3 work.ds4;
where method='GET' and rel='state';
run;
%mv_jobwaitfor(ALL,inds=work.jobs,outds=work.jobstates)
Delete the job:
%mv_deletejes(path=/Public/temp,name=demo)
@param [in] access_token_var= The global macro variable to contain the access token
@param [in] grant_type= valid values:
- password
- authorization_code
- detect - will check if access_token exists, if not will use sas_services if
a SASStudioV session else authorization_code. Default option.
- sas_services - will use oauth_bearer=sas_services
@param [in] action=Either ALL (to wait for every job) or ANY (if one job
completes, processing will continue). Default=ALL.
@param [in] inds= The input dataset containing the list of job uris, in the
following format: `/jobExecution/jobs/&JOBID./state` and the corresponding
job name. The uri should be in a `uri` variable, and the job path/name
should be in a `_program` variable.
@param [out] outds= The output dataset containing the list of states by job
(default=work.mv_jobexecute)
@version VIYA V.03.04
@author Allan Bowe, source: https://github.com/sasjs/core
<h4> Dependencies </h4>
@li mp_abort.sas
@li mf_getplatform.sas
@li mf_getuniquefileref.sas
@li mf_existvar.sas
@li mf_nobs.sas
**/
%macro mv_jobwaitfor(action
,access_token_var=ACCESS_TOKEN
,grant_type=sas_services
,inds=0
,outds=work.mv_jobwaitfor
);
%local oauth_bearer;
%if &grant_type=detect %then %do;
%if %symexist(&access_token_var) %then %let grant_type=authorization_code;
%else %let grant_type=sas_services;
%end;
%if &grant_type=sas_services %then %do;
%let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=;
%end;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services
)
,mac=&sysmacroname
,msg=%str(Invalid value for grant_type: &grant_type)
)
%mp_abort(iftrue=("&inds"="0")
,mac=&sysmacroname
,msg=%str(input dataset not provided)
)
%mp_abort(iftrue=(%mf_existvar(&inds,uri)=0)
,mac=&sysmacroname
,msg=%str(The URI variable was not found in the input dataset(&inds))
)
%mp_abort(iftrue=(%mf_existvar(&inds,_program)=0)
,mac=&sysmacroname
,msg=%str(The _PROGRAM variable was not found in the input dataset(&inds))
)
%if %mf_nobs(&inds)=0 %then %do;
%put NOTE: Zero observations in &inds, &sysmacroname will now exit;
%return;
%end;
options noquotelenmax;
%local base_uri; /* location of rest apis */
%let base_uri=%mf_getplatform(VIYARESTAPI);
data _null_;
length jobparams $32767;
set &inds end=last;
call symputx(cats('joburi',_n_),uri,'l');
call symputx(cats('jobname',_n_),_program,'l');
call symputx(cats('jobparams',_n_),jobparams,'l');
if last then call symputx('uricnt',_n_,'l');
run;
%local runcnt;
%if &action=ALL %then %let runcnt=&uricnt;
%else %if &action=ANY %then %let runcnt=1;
%else %let runcnt=&uricnt;
%local fname0 ;
%let fname0=%mf_getuniquefileref();
data &outds;
format _program uri $128. state $32. timestamp datetime19. jobparams $32767.;
stop;
run;
%local i;
%do i=1 %to &uricnt;
%if "&&joburi&i" ne "0" %then %do;
proc http method='GET' out=&fname0 &oauth_bearer url="&base_uri/&&joburi&i";
headers "Accept"="text/plain"
%if &grant_type=authorization_code %then %do;
"Authorization"="Bearer &&&access_token_var"
%end; ;
run;
%if &SYS_PROCHTTP_STATUS_CODE ne 200 and &SYS_PROCHTTP_STATUS_CODE ne 201 %then
%do;
data _null_;infile &fname0;input;putlog _infile_;run;
%mp_abort(mac=&sysmacroname
,msg=%str(&SYS_PROCHTTP_STATUS_CODE &SYS_PROCHTTP_STATUS_PHRASE)
)
%end;
%let status=notset;
data _null_;
infile &fname0;
input;
call symputx('status',_infile_,'l');
run;
%if &status=completed or &status=failed or &status=canceled %then %do;
proc sql;
insert into &outds set
_program="&&jobname&i",
uri="&&joburi&i",
state="&status",
timestamp=datetime(),
jobparams=symget("jobparams&i");
%let joburi&i=0; /* do not re-check */
%end;
%else %if &status=idle or &status=pending or &status=running %then %do;
data _null_;
call sleep(1,1);
run;
%end;
%else %do;
%mp_abort(mac=&sysmacroname
,msg=%str(status &status not expected!!)
)
%end;
%end;
%if &i=&uricnt %then %do;
%local goback;
%let goback=0;
proc sql noprint;
select count(*) into:goback from &outds;
%if &goback lt &runcnt %then %let i=0;
%end;
%end;
/* clear refs */
filename &fname0 clear;
%mend;/**
@file mv_registerclient.sas
@brief Register Client and Secret (admin task)
@@ -13549,7 +14334,7 @@ filename &fref1 clear;
data _null_;
file "%sysfunc(pathname(work))/ml_json.lua";
put '-- ';
put '-- json.lua (modified from json.lua) ';
put '-- json.lua ';
put '-- ';
put '-- Copyright (c) 2019 rxi ';
put '-- ';
@@ -13572,7 +14357,7 @@ data _null_;
put '-- SOFTWARE. ';
put '-- ';
put ' ';
put 'local json = { _version = "0.1.2" } ';
put 'json = { _version = "0.1.2" } ';
put ' ';
put '------------------------------------------------------------------------------- ';
put '-- Encode ';
@@ -13920,6 +14705,7 @@ data _null_;
put ' ';
put 'return json ';
run;
%mend;
%inc "%sysfunc(pathname(work))/ml_json.lua";
%mend;

31
base/mf_existfileref.sas Normal file
View File

@@ -0,0 +1,31 @@
/**
@file
@brief Checks whether a fileref exists
@details You can probably do without this macro as it is just a one liner.
Mainly it is here as a convenient way to remember the syntax!
For this macro, if the fileref exists but the underlying file does not exist
@param fref the fileref to detect
@return output returns 1 if found AND the file exists. 0 is returned if not
found, and -1 is returned if the fileref is found but the file does not exist.
@version 8
@author [Allan Bowe](https://www.linkedin.com/in/allanbowe/)
**/
%macro mf_existfileref(fref
)/*/STORE SOURCE*/;
%local result;
%let result=%sysfunc(fileref(&fref));
%if &result>0 %then %do;
0
%end;
%else %if &result=0 %then %do;
1
%end;
%else %do;
-1
%end;
%mend;

View File

@@ -3,11 +3,18 @@
@brief Adds custom quotes / delimiters to a delimited string
@details Can be used in open code, eg as follows:
%put %mf_getquotedstr(blah blah blah);
%put %mf_getquotedstr(blah blah blah);
which returns:
> 'blah','blah','blah'
Alternatively:
%put %mf_getquotedstr(these words are double quoted,quote=D)
for:
> "these","words","are","double","quoted"
@param in_str the unquoted, spaced delimited string to transform
@param dlm= the delimeter to be applied to the output (default comma)
@param indlm= the delimeter used for the input (default is space)

92
base/mp_testjob.sas Normal file
View File

@@ -0,0 +1,92 @@
/**
@file
@brief Runs arbitrary code for a specified amount of time
@details Executes a series of procs and data steps to enable performance
testing of arbitrary jobs.
%mp_testjob(
duration=60*5
)
@param [in] duration= the time in seconds which the job should run for. Actual
time may vary, as the check is done in between steps. Default = 30 (seconds).
<h4> SAS Macros </h4>
@li mf_getuniquelibref.sas
@li mf_getuniquename.sas
@li mf_mkdir.sas
@version 9.4
@author Allan Bowe
**/
%macro mp_testjob(duration=30
)/*/STORE SOURCE*/;
%local lib dir ds1 ds2 ds3 start_tm i;
%let start_tm=%sysfunc(datetime());
%let duration=%sysevalf(&duration);
/* create a temporary library in WORK */
%let lib=%mf_getuniquelibref();
%let dir=%mf_getuniquename();
%mf_mkdir(%sysfunc(pathname(work))/&dir)
libname &lib "%sysfunc(pathname(work))/&dir";
/* loop through until time expires */
%let ds1=%mf_getuniquename();
%let ds2=%mf_getuniquename();
%let ds3=%mf_getuniquename();
%do i=0 %to 1;
/* create big dataset */
data &lib..&ds1(compress=no );
do x=1 to 1000000;
randnum0=ranuni(0)*3;
randnum1=ranuni(0)*2;
bigchar=repeat('A',300);
output;
end;
run;
%if %sysevalf( (%sysfunc(datetime())-&start_tm)>&duration ) %then %goto gate;
proc summary ;
class randnum0 randnum1;
output out=&lib..&ds2;
run;quit;
%if %sysevalf( (%sysfunc(datetime())-&start_tm)>&duration ) %then %goto gate;
/* add more data */
proc sql;
create table &lib..&ds3 as
select *, ranuni(0)*10 as randnum2
from &lib..&ds1
order by randnum1;
quit;
%if %sysevalf( (%sysfunc(datetime())-&start_tm)>&duration ) %then %goto gate;
proc sort data=&lib..&ds3;
by descending x;
run;
%if %sysevalf( (%sysfunc(datetime())-&start_tm)>&duration ) %then %goto gate;
/* wait 5 seconds */
data _null_;
call sleep(5,1);
run;
%if %sysevalf( (%sysfunc(datetime())-&start_tm)>&duration ) %then %goto gate;
%let i=0;
%end;
%gate:
%put time is up!;
proc datasets lib=&lib kill;
run;
quit;
libname &lib clear;
%mend;

View File

@@ -21,9 +21,10 @@ for file in files:
with open(file) as infile:
for line in infile:
ml.write(" put '" + line.rstrip().replace("'","''") + " ';\n")
ml.write("run;\n")
ml.write("%mend;\n\n")
ml.write("%inc \"%sysfunc(pathname(work))/" + name + ".lua\";\n")
ml.write("run;\n\n")
ml.write("%inc \"%sysfunc(pathname(work))/" + name + ".lua\";\n\n")
ml.write("%mend;\n")
ml.close()
# prepare web files

View File

@@ -19,7 +19,7 @@ HTML_FOOTER = ./doxy/new_footer.html
HTML_EXTRA_STYLESHEET = ./doxy/new_stylesheet.css
INHERIT_DOCS = NO
INLINE_INFO = NO
INPUT = base meta metax viya
INPUT = base meta metax viya lua
LAYOUT_FILE = ./doxy/DoxygenLayout.xml
MAX_INITIALIZER_LINES = 0
PROJECT_NAME = Macro Core

View File

@@ -16,6 +16,7 @@ mkdir $BUILD_FOLDER
cp -r base $BUILD_FOLDER
cp -r meta $BUILD_FOLDER
cp -r metax $BUILD_FOLDER
cp -r lua $BUILD_FOLDER
cp -r viya $BUILD_FOLDER
cp -r doxy $BUILD_FOLDER
cp main.dox $BUILD_FOLDER

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.1 KiB

View File

@@ -1,5 +1,5 @@
--
-- json.lua (modified from json.lua)
-- json.lua
--
-- Copyright (c) 2019 rxi
--
@@ -22,7 +22,7 @@
-- SOFTWARE.
--
local json = { _version = "0.1.2" }
json = { _version = "0.1.2" }
-------------------------------------------------------------------------------
-- Encode
@@ -368,4 +368,4 @@ function json.decode(str)
return res
end
return json
return json

View File

@@ -13,7 +13,7 @@
data _null_;
file "%sysfunc(pathname(work))/ml_json.lua";
put '-- ';
put '-- json.lua (modified from json.lua) ';
put '-- json.lua ';
put '-- ';
put '-- Copyright (c) 2019 rxi ';
put '-- ';
@@ -36,7 +36,7 @@ data _null_;
put '-- SOFTWARE. ';
put '-- ';
put ' ';
put 'local json = { _version = "0.1.2" } ';
put 'json = { _version = "0.1.2" } ';
put ' ';
put '------------------------------------------------------------------------------- ';
put '-- Encode ';
@@ -384,6 +384,7 @@ data _null_;
put ' ';
put 'return json ';
run;
%mend;
%inc "%sysfunc(pathname(work))/ml_json.lua";
%mend;

View File

@@ -50,4 +50,15 @@
* No X command
* Prefixes: _mv_
*/
/*! \dir lua
* \brief Lua macros
* \details These macros have the following attributes:
* OS independent
* Work as LUA functions (they are immediately executed/compiled)
* Auto-generated from the plain source `.lua` files in the same directory
* Prefixes: _ml_
*/

View File

@@ -125,7 +125,7 @@ data _null_;
infile = io.open (sas.symget("fpath1"), "r")
outfile = io.open (sas.symget("fpath2"), "w")
io.input(infile)
local resp=json2sas.decode(io.read())
local resp=json.decode(io.read())
local job=resp["code"]
outfile:write(job)
io.close(infile)

169
viya/mv_getjobstate.sas Normal file
View File

@@ -0,0 +1,169 @@
/**
@file
@brief Extract the status from a running SAS Viya job
@details Extracts the status from a running job and appends it to an output
dataset with the following structure:
| uri | state | timestamp |
|---------------------------------------------------------------|---------|--------------------|
| /jobExecution/jobs/5cebd840-2063-42c1-be0c-421ec3e1c175/state | running | 15JAN2021:12:35:08 |
To query the running job, you need the URI. Sample code for achieving this
is provided below.
## Example
First, compile the macros:
filename mc url "https://raw.githubusercontent.com/sasjs/core/main/all.sas";
%inc mc;
Next, create a long running job (in this case, a web service):
filename ft15f001 temp;
parmcards4;
data ;
rand=ranuni(0)*1000;
do x=1 to rand;
y=rand*4;
output;
end;
run;
data _null_;
call sleep(5,1);
run;
;;;;
%mv_createwebservice(path=/Public/temp,name=demo)
Execute it, grab the uri, and finally, check the job status:
%mv_jobexecute(path=/Public/temp
,name=demo
,outds=work.info
)
data _null_;
set work.info;
if method='GET' and rel='state';
call symputx('uri',uri);
run;
%mv_getjobstate(uri=&uri,outds=results)
You can run this macro as part of a loop to await the final 'completed' status.
The full list of status values is:
@li idle
@li pending
@li running
@li canceled
@li completed
@li failed
If you have one or more jobs that you'd like to wait for completion you can
also use the [mv_jobwaitfor](/mv__jobwaitfor_8sas.html) macro.
@param [in] access_token_var= The global macro variable to contain the access token
@param [in] grant_type= valid values:
@li password
@li authorization_code
@li detect - will check if access_token exists, if not will use sas_services if
a SASStudioV session else authorization_code. Default option.
@li sas_services - will use oauth_bearer=sas_services.
@param [in] uri= The uri of the running job for which to fetch the status,
in the format `/jobExecution/jobs/$UUID/state` (unquoted).
@param [out] outds= The output dataset in which to APPEND the status. Three
fields are appended: `CHECK_TM`, `URI` and `STATE`. If the dataset does not
exist, it is created.
@version VIYA V.03.04
@author Allan Bowe, source: https://github.com/sasjs/core
<h4> SAS Macros </h4>
@li mp_abort.sas
@li mf_getplatform.sas
@li mf_getuniquefileref.sas
**/
%macro mv_getjobstate(uri=0,outds=work.mv_getjobstate
,contextName=SAS Job Execution compute context
,access_token_var=ACCESS_TOKEN
,grant_type=sas_services
);
%local oauth_bearer;
%if &grant_type=detect %then %do;
%if %symexist(&access_token_var) %then %let grant_type=authorization_code;
%else %let grant_type=sas_services;
%end;
%if &grant_type=sas_services %then %do;
%let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=;
%end;
%put &sysmacroname: grant_type=&grant_type;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services
)
,mac=&sysmacroname
,msg=%str(Invalid value for grant_type: &grant_type)
)
/* validation in datastep for better character safety */
%local errmsg errflg;
data _null_;
uri=symget('uri');
if length(uri)<12 then do;
call symputx('errflg',1);
call symputx('errmsg',"URI is invalid (too short) - '&uri'",'l');
end;
if scan(uri,-1) ne 'state' or scan(uri,1) ne 'jobExecution' then do;
call symputx('errflg',1);
call symputx('errmsg',
"URI should be in format /jobExecution/jobs/$$$$UUID$$$$/state"
!!" but is actually like: &uri",'l');
end;
run;
%mp_abort(iftrue=(&errflg=1)
,mac=&sysmacroname
,msg=%str(&errmsg)
)
options noquotelenmax;
%local base_uri; /* location of rest apis */
%let base_uri=%mf_getplatform(VIYARESTAPI);
%local fname0;
%let fname0=%mf_getuniquefileref();
proc http method='GET' out=&fname0 &oauth_bearer url="&base_uri/&uri";
headers "Accept"="text/plain"
%if &grant_type=authorization_code %then %do;
"Authorization"="Bearer &&&access_token_var"
%end; ;
run;
%if &SYS_PROCHTTP_STATUS_CODE ne 200 and &SYS_PROCHTTP_STATUS_CODE ne 201 %then
%do;
data _null_;infile &fname0;input;putlog _infile_;run;
%mp_abort(mac=&sysmacroname
,msg=%str(&SYS_PROCHTTP_STATUS_CODE &SYS_PROCHTTP_STATUS_PHRASE)
)
%end;
data;
format uri $128. state $32. timestamp datetime19.;
infile &fname0;
uri="&uri";
timestamp=datetime();
input;
state=_infile_;
run;
proc append base=&outds data=&syslast;
run;
filename &fname0 clear;
%mend;

View File

@@ -23,23 +23,24 @@
,paramstring=%str("macvarname":"macvarvalue","answer":42)
)
@param access_token_var= The global macro variable to contain the access token
@param grant_type= valid values:
* password
* authorization_code
* detect - will check if access_token exists, if not will use sas_services if
a SASStudioV session else authorization_code. Default option.
* sas_services - will use oauth_bearer=sas_services
@param [in] access_token_var= The global macro variable to contain the access token
@param [in] grant_type= valid values:
@param path= The SAS Drive path to the job being executed
@param name= The name of the job to execute
@param paramstring= A JSON fragment with name:value pairs, eg: `"name":"value"`
* password
* authorization_code
* detect - will check if access_token exists, if not will use sas_services if
a SASStudioV session else authorization_code. Default option.
* sas_services - will use oauth_bearer=sas_services
@param [in] path= The SAS Drive path to the job being executed
@param [in] name= The name of the job to execute
@param [in] paramstring= A JSON fragment with name:value pairs, eg: `"name":"value"`
or "name":"value","name2":42`. This will need to be wrapped in `%str()`.
@param contextName= Context name with which to run the job.
@param [in] contextName= Context name with which to run the job.
Default = `SAS Job Execution compute context`
@param outds= The output dataset containing links (Default=work.mv_jobexecute)
@param [out] outds= The output dataset containing links (Default=work.mv_jobexecute)
@version VIYA V.03.04
@@ -71,7 +72,7 @@
%let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=;
%end;
%put &sysmacroname: grant_type=&grant_type;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services
)
@@ -125,9 +126,11 @@ data _null_;
length joburi contextname $128 paramstring $32765;
joburi=quote(trim(symget('joburi')));
contextname=quote(trim(symget('contextname')));
_program=quote("&path/&name");
paramstring=symget('paramstring');
put '{"jobDefinitionUri":' joburi ;
put ' ,"arguments":{"_contextName":' contextname;
put ' ,"_program":' _program;
if paramstring ne "0" then do;
put ' ,' paramstring;
end;
@@ -158,6 +161,7 @@ libname &libref JSON fileref=&fname1;
data &outds;
set &libref..links;
_program="&path/&name";
run;
/* clear refs */

302
viya/mv_jobflow.sas Normal file
View File

@@ -0,0 +1,302 @@
/**
@file
@brief Execute a series of job flows
@details Very (very) simple flow manager. Jobs execute in sequential waves,
all previous waves must finish successfully.
The input table is formed as per below. Each observation represents one job.
Each variable is converted into a macro variable with the same name.
## Input table (minimum variables needed)
@li FLOW_ID - Numeric value, provides sequential ordering capability
@li _CONTEXTNAME - Dictates which context should be used to run the job. If
blank, will default to `SAS Job Execution compute context`.
@li _PROGRAM - Provides the path to the job itself
Any additional variables provided in this table are converted into macro
variables and passed into the relevant job.
| FLOW_ID| _CONTEXTNAME |_PROGRAM|
|---|---|---|
|0|SAS Job Execution compute context|/Public/jobs/somejob1|
|0|SAS Job Execution compute context|/Public/jobs/somejob2|
## Output table (minimum variables produced)
@li _PROGRAM - the SAS Drive path of the job
@li URI - the URI of the executed job
@li STATE - the completed state of the job
@li TIMESTAMP - the datetime that the job completed
@li JOBPARAMS - the parameters that were passed to the job
@li FLOW_ID - the id of the flow in which the job was executed
![https://i.imgur.com/nZE9PvT.png](https://i.imgur.com/nZE9PvT.png)
## Example
First, compile the macros:
filename mc url
"https://raw.githubusercontent.com/sasjs/core/main/all.sas";
%inc mc;
Next, create some jobs (in this case, as web services):
filename ft15f001 temp;
parmcards4;
%put this is job: &_program;
%put this was run in flow &flow_id;
data ;
rand=ranuni(0)*&macrovar1;
do x=1 to rand;
y=rand*&macrovar2;
if y=100 then abort;
output;
end;
run;
;;;;
%mv_createwebservice(path=/Public/temp,name=demo1)
%mv_createwebservice(path=/Public/temp,name=demo2)
Prepare an input table with 60 executions:
data work.inputjobs;
_contextName='SAS Job Execution compute context';
do flow_id=1 to 3;
do i=1 to 20;
_program='/Public/temp/demo1';
macrovar1=10*i;
macrovar2=4*i;
output;
i+1;
_program='/Public/temp/demo2';
macrovar1=40*i;
macrovar2=44*i;
output;
end;
end;
run;
Trigger the flow
%mv_jobflow(inds=work.inputjobs,outds=work.results,maxconcurrency=4)
@param [in] access_token_var= The global macro variable to contain the access token
@param [in] grant_type= valid values:
@li password
@li authorization_code
@li detect - will check if access_token exists, if not will use sas_services if
a SASStudioV session else authorization_code. Default option.
@li sas_services - will use oauth_bearer=sas_services
@param [in] inds= The input dataset containing a list of jobs and parameters
@param [in] maxconcurrency= The max number of parallel jobs to run. Default=8.
@param [out] outds= The output dataset containing the results
@version VIYA V.03.05
@author Allan Bowe, source: https://github.com/sasjs/core
<h4> SAS Macros </h4>
@li mf_nobs.sas
@li mp_abort.sas
@li mf_getplatform.sas
@li mf_getuniquefileref.sas
@li mf_existvarlist.sas
@li mv_jobwaitfor.sas
@li mv_jobexecute.sas
**/
%macro mv_jobflow(inds=0,outds=work.mv_jobflow
,maxconcurrency=8
,access_token_var=ACCESS_TOKEN
,grant_type=sas_services
);
%local oauth_bearer;
%if &grant_type=detect %then %do;
%if %symexist(&access_token_var) %then %let grant_type=authorization_code;
%else %let grant_type=sas_services;
%end;
%if &grant_type=sas_services %then %do;
%let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=;
%end;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services
)
,mac=&sysmacroname
,msg=%str(Invalid value for grant_type: &grant_type)
)
%mp_abort(iftrue=("&inds"="0")
,mac=&sysmacroname
,msg=%str(Input dataset was not provided)
)
%mp_abort(iftrue=(%mf_existVarList(&inds,_CONTEXTNAME FLOW_ID _PROGRAM)=0)
,mac=&sysmacroname
,msg=%str(The following columns must exist on input dataset &inds:
_CONTEXTNAME FLOW_ID _PROGRAM)
)
%mp_abort(iftrue=(&maxconcurrency<1)
,mac=&sysmacroname
,msg=%str(The maxconcurrency variable should be a positive integer)
)
%local missings;
proc sql noprint;
select count(*) into: missings
from &inds
where flow_id is null or _program is null;
%mp_abort(iftrue=(&missings>0)
,mac=&sysmacroname
,msg=%str(input dataset contains &missings missing values for FLOW_ID or _PROGRAM)
)
%if %mf_nobs(&inds)=0 %then %do;
%put No observations in &inds! Leaving macro &sysmacroname;
%return;
%end;
/* ensure output table is available */
data &outds;run;
proc sql;
drop table &outds;
options noquotelenmax;
%local base_uri; /* location of rest apis */
%let base_uri=%mf_getplatform(VIYARESTAPI);
/* get flows */
proc sort data=&inds;
by flow_id;
run;
data _null_;
set &inds (keep=flow_id) end=last;
by flow_id;
if last.flow_id then do;
cnt+1;
call symputx(cats('flow',cnt),flow_id,'l');
end;
if last then call symputx('flowcnt',cnt,'l');
run;
/* prepare temporary datasets and frefs */
%local fid jid jds jjson jdsapp jdsrunning jdswaitfor jfref;
data;run;%let jds=&syslast;
data;run;%let jjson=&syslast;
data;run;%let jdsapp=&syslast;
data;run;%let jdsrunning=&syslast;
data;run;%let jdswaitfor=&syslast;
%let jfref=%mf_getuniquefileref();
/* start loop */
%do fid=1 %to &flowcnt;
%put preparing job attributes for flow &&flow&fid;
%local jds jcnt;
data &jds(drop=_contextName _program);
set &inds(where=(flow_id=&&flow&fid));
if _contextName='' then _contextName="SAS Job Execution compute context";
call symputx(cats('job',_n_),_program,'l');
call symputx(cats('context',_n_),_contextName,'l');
call symputx('jcnt',_n_,'l');
run;
%put exporting job variables in json format;
%do jid=1 %to &jcnt;
data &jjson;
set &jds;
if _n_=&jid then do;
output;
stop;
end;
run;
proc json out=&jfref;
export &jjson / nosastags fmtnumeric;
run;
data _null_;
infile &jfref lrecl=32767;
input;
jparams='jparams'!!left(symget('jid'));
call symputx(jparams,substr(_infile_,3,length(_infile_)-4));
run;
%local joburi&jid;
%let joburi&jid=0; /* used in next loop */
%end;
%local concurrency completed;
%let concurrency=0;
%let completed=0;
proc sql; drop table &jdsrunning;
%do jid=1 %to &jcnt;
/**
* now we can execute the jobs up to the maxconcurrency setting
*/
%if "&&job&jid" ne "0" %then %do; /* this var is zero if job finished */
%if "&&joburi&jid"="0" and &concurrency<&maxconcurrency %then %do;
/* job has not been triggered and we have free slots */
%local jobname jobpath;
%let jobname=%scan(&&job&jid,-1,/);
%let jobpath=%substr(&&job&jid,1,%length(&&job&jid)-%length(&jobname)-1);
%put executing &jobpath/&jobname with paramstring &&jparams&jid;
%mv_jobexecute(path=&jobpath
,name=&jobname
,paramstring=%superq(jparams&jid)
,outds=&jdsapp
)
data &jdsapp;
format jobparams $32767.;
set &jdsapp(where=(method='GET' and rel='state'));
jobparams=symget("jparams&jid");
call symputx("joburi&jid",uri,'l');
run;
proc append base=&jdsrunning data=&jdsapp;
run;
%let concurrency=%eval(&concurrency+1);
%end;
%else %if %sysfunc(exist(&outds))=1 %then %do;
/* check to see if the job has finished as was previously executed */
%local jobcheck; %let jobcheck=0;
proc sql noprint;
select count(*) into: jobcheck
from &outds where uri="&&joburi&jid";
%if &jobcheck>0 %then %do;
%put &&job&jid in flow &fid with uri &&joburi&jid completed!;
%let job&jid=0;
%end;
%end;
%end;
%if &jid=&jcnt %then %do;
/* we are at the end of the loop - time to see which jobs have finished */
%mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor)
%local done;
%let done=%mf_nobs(&jdswaitfor);
%if &done>0 %then %do;
%let completed=%eval(&completed+&done);
%let concurrency=%eval(&concurrency-&done);
data &jdsapp;
set &jdswaitfor;
flow_id=&&flow&fid;
run;
proc append base=&outds data=&jdsapp;
run;
%end;
proc sql;
delete from &jdsrunning
where uri in (select uri from &outds
where state in ('canceled','completed','failed')
);
/* loop again if jobs are left */
%if &completed < &jcnt %then %do;
%let jid=0;
%put looping flow &fid again - &completed of &jcnt jobs completed, &concurrency jobs running;
%end;
%end;
%end;
/* back up and execute the next flow */
%end;
%mend;

213
viya/mv_jobwaitfor.sas Normal file
View File

@@ -0,0 +1,213 @@
/**
@file
@brief Takes a dataset of running jobs and waits for ANY or ALL of them to complete
@details Will poll `/jobs/{jobId}/state` at set intervals until ANY or ALL
jobs are completed. Completion is determined by reference to the returned
_state_, as per the following table:
| state | Wait? | Notes|
|-----------|-------|------|
| idle | yes | We assume processing will continue. Beware of idle sessions with no code submitted! |
| pending | yes | Job is preparing to run |
| running | yes | Job is running|
| canceled | no | Job was cancelled|
| completed | no | Job finished - does not mean it was successful. Check stateDetails|
| failed | no | Job failed to execute, could be a problem when calling the apis|
## Example
First, compile the macros:
filename mc url
"https://raw.githubusercontent.com/sasjs/core/main/all.sas";
%inc mc;
Next, create a job (in this case, as a web service):
filename ft15f001 temp;
parmcards4;
data ;
rand=ranuni(0)*1000000;
do x=1 to rand;
y=rand*x;
output;
end;
run;
;;;;
%mv_createwebservice(path=/Public/temp,name=demo)
Then, execute the job,multiple times, and wait for them all to finish:
%mv_jobexecute(path=/Public/temp,name=demo,outds=work.ds1)
%mv_jobexecute(path=/Public/temp,name=demo,outds=work.ds2)
%mv_jobexecute(path=/Public/temp,name=demo,outds=work.ds3)
%mv_jobexecute(path=/Public/temp,name=demo,outds=work.ds4)
data work.jobs;
set work.ds1 work.ds2 work.ds3 work.ds4;
where method='GET' and rel='state';
run;
%mv_jobwaitfor(ALL,inds=work.jobs,outds=work.jobstates)
Delete the job:
%mv_deletejes(path=/Public/temp,name=demo)
@param [in] access_token_var= The global macro variable to contain the access token
@param [in] grant_type= valid values:
- password
- authorization_code
- detect - will check if access_token exists, if not will use sas_services if
a SASStudioV session else authorization_code. Default option.
- sas_services - will use oauth_bearer=sas_services
@param [in] action=Either ALL (to wait for every job) or ANY (if one job
completes, processing will continue). Default=ALL.
@param [in] inds= The input dataset containing the list of job uris, in the
following format: `/jobExecution/jobs/&JOBID./state` and the corresponding
job name. The uri should be in a `uri` variable, and the job path/name
should be in a `_program` variable.
@param [out] outds= The output dataset containing the list of states by job
(default=work.mv_jobexecute)
@version VIYA V.03.04
@author Allan Bowe, source: https://github.com/sasjs/core
<h4> Dependencies </h4>
@li mp_abort.sas
@li mf_getplatform.sas
@li mf_getuniquefileref.sas
@li mf_existvar.sas
@li mf_nobs.sas
**/
%macro mv_jobwaitfor(action
,access_token_var=ACCESS_TOKEN
,grant_type=sas_services
,inds=0
,outds=work.mv_jobwaitfor
);
%local oauth_bearer;
%if &grant_type=detect %then %do;
%if %symexist(&access_token_var) %then %let grant_type=authorization_code;
%else %let grant_type=sas_services;
%end;
%if &grant_type=sas_services %then %do;
%let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=;
%end;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services
)
,mac=&sysmacroname
,msg=%str(Invalid value for grant_type: &grant_type)
)
%mp_abort(iftrue=("&inds"="0")
,mac=&sysmacroname
,msg=%str(input dataset not provided)
)
%mp_abort(iftrue=(%mf_existvar(&inds,uri)=0)
,mac=&sysmacroname
,msg=%str(The URI variable was not found in the input dataset(&inds))
)
%mp_abort(iftrue=(%mf_existvar(&inds,_program)=0)
,mac=&sysmacroname
,msg=%str(The _PROGRAM variable was not found in the input dataset(&inds))
)
%if %mf_nobs(&inds)=0 %then %do;
%put NOTE: Zero observations in &inds, &sysmacroname will now exit;
%return;
%end;
options noquotelenmax;
%local base_uri; /* location of rest apis */
%let base_uri=%mf_getplatform(VIYARESTAPI);
data _null_;
length jobparams $32767;
set &inds end=last;
call symputx(cats('joburi',_n_),uri,'l');
call symputx(cats('jobname',_n_),_program,'l');
call symputx(cats('jobparams',_n_),jobparams,'l');
if last then call symputx('uricnt',_n_,'l');
run;
%local runcnt;
%if &action=ALL %then %let runcnt=&uricnt;
%else %if &action=ANY %then %let runcnt=1;
%else %let runcnt=&uricnt;
%local fname0 ;
%let fname0=%mf_getuniquefileref();
data &outds;
format _program uri $128. state $32. timestamp datetime19. jobparams $32767.;
stop;
run;
%local i;
%do i=1 %to &uricnt;
%if "&&joburi&i" ne "0" %then %do;
proc http method='GET' out=&fname0 &oauth_bearer url="&base_uri/&&joburi&i";
headers "Accept"="text/plain"
%if &grant_type=authorization_code %then %do;
"Authorization"="Bearer &&&access_token_var"
%end; ;
run;
%if &SYS_PROCHTTP_STATUS_CODE ne 200 and &SYS_PROCHTTP_STATUS_CODE ne 201 %then
%do;
data _null_;infile &fname0;input;putlog _infile_;run;
%mp_abort(mac=&sysmacroname
,msg=%str(&SYS_PROCHTTP_STATUS_CODE &SYS_PROCHTTP_STATUS_PHRASE)
)
%end;
%let status=notset;
data _null_;
infile &fname0;
input;
call symputx('status',_infile_,'l');
run;
%if &status=completed or &status=failed or &status=canceled %then %do;
proc sql;
insert into &outds set
_program="&&jobname&i",
uri="&&joburi&i",
state="&status",
timestamp=datetime(),
jobparams=symget("jobparams&i");
%let joburi&i=0; /* do not re-check */
%end;
%else %if &status=idle or &status=pending or &status=running %then %do;
data _null_;
call sleep(1,1);
run;
%end;
%else %do;
%mp_abort(mac=&sysmacroname
,msg=%str(status &status not expected!!)
)
%end;
%end;
%if &i=&uricnt %then %do;
%local goback;
%let goback=0;
proc sql noprint;
select count(*) into:goback from &outds;
%if &goback lt &runcnt %then %let i=0;
%end;
%end;
/* clear refs */
filename &fname0 clear;
%mend;