1
0
mirror of https://github.com/sasjs/core.git synced 2025-12-11 14:34:35 +00:00

Compare commits

...

25 Commits

Author SHA1 Message Date
b5c86e7031 fix: mv_jobflow param mixup, not all jobs were running (fixed now). Also fixed doc formatting, removed unnecessary logging, and fixed a debug switch. 2021-01-31 17:58:13 +01:00
9783edd0e3 feat: adding outref option to mv_jobflow so that logs of submitted jobs can be captured. Also making the context name and flow id optional in the input table, for ease of use. 2021-01-29 12:56:12 +01:00
961728a987 chore: updating header link 2021-01-27 00:28:26 +01:00
4b34322d94 feat: mv_getjoblog.sas macro - will fetch a SAS log from an executed SAS Viya log and append it to a fileref.
mv_jobwaitfor is updated to allow the log to be fetched for all the submitted jobs.
2021-01-27 00:14:21 +01:00
8bb83deede fix: updating return codes 2021-01-26 16:04:44 +01:00
79c81aa8a4 feat: mf_existfileref macro 2021-01-26 16:00:23 +01:00
bbbcf7d550 chore: updating the docs for mf_getquotedstr 2021-01-23 13:37:15 +02:00
82184bc6be fix: adding quit statement so that exit loop would work on step boundary 2021-01-21 21:55:07 +02:00
efc731cfaa feat: mp_testjob macro for running arbitrary long jobs 2021-01-21 21:48:05 +02:00
da9a74ee14 chore: updating doxy headers 2021-01-21 21:47:41 +02:00
94762d9381 feat: mv_jobflow macro - enables a SAS program to kick off multiple waves of SAS Viya jobs, and to limit those waves by a maximum number of parallel (concurrent) running jobs. 2021-01-16 21:34:17 +02:00
03d9d805ff fix: adding support for jobparams in output table for mv_jobwaitfor 2021-01-16 20:43:15 +02:00
94416028b7 fix: adding ACTION parameter to mv_jobwaitfor - can now wait for ANY or ALL jobs to finish 2021-01-16 19:08:38 +02:00
6cf5d4ef28 chore: updating the header description 2021-01-15 23:12:38 +02:00
e4ceaecfb2 feat: adding mv_getjobstate macro to fetch the state of a running SAS Viya job 2021-01-15 13:02:53 +02:00
Allan Bowe
2eb246c543 fix: removing favicon file 2021-01-14 18:07:42 +01:00
d9954ae777 fix: renegade comma 2021-01-14 16:55:17 +02:00
364dc9f07f feat: adding _program value to mv_jobexecute.sas 2021-01-14 16:37:58 +02:00
d96125c3cf fix: mv_jobwaitfor 2021-01-05 17:14:03 +00:00
506695be56 feat: mv_jobwaitfor macro, similar to waitfor statement (in concept) - will wait for ALL of a set of viya jobs to finish executing 2021-01-05 14:41:27 +00:00
Allan Bowe
45f858db15 fix: scope of json var, brining the %inc _inside_ the macro 2021-01-03 22:35:56 +00:00
Allan Bowe
b4d97a063a fix: doc update for lua files, plus leftover reference in code 2021-01-03 22:30:41 +00:00
Allan Bowe
4df8f3b4c2 feat: mv_getjobcode macro, introducing LUA macros 2021-01-03 22:16:11 +00:00
Allan Bowe
11aa484996 chore: documentation 2020-12-30 17:06:04 +00:00
Allan Bowe
b9fd79bd5e fix: debugging in mm_assignlib 2020-12-25 16:58:30 +00:00
29 changed files with 2688 additions and 356 deletions

View File

@@ -40,6 +40,27 @@ Documentation: https://sasjs.github.io/core.github.io/files.html
- X command enabled
- Prefixes: _mmw_,_mmu_,_mmx_
**lua** library
Wait - this is a macro library - what is LUA doing here? Well, it is a little known fact that you CAN run LUA within a SAS Macro. It has to be written to a text file with a `.lua` extension, from where you can `%include` it. So, without using the `proc lua` wrapper.
To contribute, simply write your freeform LUA in the LUA folder. Then run the `build.py`, which will convert your LUA into a data step with put statements, and create the macro wrapper with a `ml_` prefix. You can then use your module in any program by running:
```
/* compile the lua module */
%ml_yourmodule()
/* Execute. Do not use the restart keyword! */
proc lua;
submit;
print(yourStuff);
endsubmit;
run;
```
- X command enabled
- Prefixes: _mmw_,_mmu_,_mmx_
# Installation
First, download the repo to a location your SAS system can access. Then update your sasautos path to include the components you wish to have available,eg:
@@ -72,6 +93,7 @@ filename mc url "https://raw.githubusercontent.com/sasjs/core/main/all.sas";
- _mm_ for metadata macros (interface with the metadata server).
- _mmx_ for macros that use metadata and are XCMD enabled
- _mx_ for macros that are XCMD enabled
- _ml_ for macros that are used to compile LUA modules
- _mv_ for macros that will only work in Viya
- follow verb-noun convention
- unix style line endings (lf)

1452
all.sas

File diff suppressed because it is too large Load Diff

27
base/mf_existfileref.sas Normal file
View File

@@ -0,0 +1,27 @@
/**
@file
@brief Checks whether a fileref exists
@details You can probably do without this macro as it is just a one liner.
Mainly it is here as a convenient way to remember the syntax!
@param fref the fileref to detect
@return output Returns 1 if found and 0 if not found. Note - it is possible
that the fileref is found, but the file does not (yet) exist. If you need
to test for this, you may as well use the fileref function directly.
@version 8
@author [Allan Bowe](https://www.linkedin.com/in/allanbowe/)
**/
%macro mf_existfileref(fref
)/*/STORE SOURCE*/;
%if %sysfunc(fileref(&fref))=0 %then %do;
1
%end;
%else %do;
0
%end;
%mend;

View File

@@ -3,8 +3,8 @@
@brief retrieves a key value pair from a control dataset
@details By default, control dataset is work.mp_setkeyvalue. Usage:
%mp_setkeyvalue(someindex,22,type=N)
%put %mf_getkeyvalue(someindex)
%mp_setkeyvalue(someindex,22,type=N)
%put %mf_getkeyvalue(someindex)
@param key Provide a key on which to perform the lookup

View File

@@ -3,11 +3,18 @@
@brief Adds custom quotes / delimiters to a delimited string
@details Can be used in open code, eg as follows:
%put %mf_getquotedstr(blah blah blah);
%put %mf_getquotedstr(blah blah blah);
which returns:
> 'blah','blah','blah'
Alternatively:
%put %mf_getquotedstr(these words are double quoted,quote=D)
for:
> "these","words","are","double","quoted"
@param in_str the unquoted, spaced delimited string to transform
@param dlm= the delimeter to be applied to the output (default comma)
@param indlm= the delimeter used for the input (default is space)

View File

@@ -3,8 +3,8 @@
@brief Logs a key value pair a control dataset
@details If the dataset does not exist, it is created. Usage:
%mp_setkeyvalue(someindex,22,type=N)
%mp_setkeyvalue(somenewindex,somevalue)
%mp_setkeyvalue(someindex,22,type=N)
%mp_setkeyvalue(somenewindex,somevalue)
<h4> SAS Macros </h4>
@li mf_existds.sas
@@ -26,7 +26,7 @@
%if not (%mf_existds(&libds)) %then %do;
data &libds (index=(key/unique));
length key $32 valc $256 valn 8 type $1;
length key $64 valc $2048 valn 8 type $1;
call missing(of _all_);
stop;
run;

92
base/mp_testjob.sas Normal file
View File

@@ -0,0 +1,92 @@
/**
@file
@brief Runs arbitrary code for a specified amount of time
@details Executes a series of procs and data steps to enable performance
testing of arbitrary jobs.
%mp_testjob(
duration=60*5
)
@param [in] duration= the time in seconds which the job should run for. Actual
time may vary, as the check is done in between steps. Default = 30 (seconds).
<h4> SAS Macros </h4>
@li mf_getuniquelibref.sas
@li mf_getuniquename.sas
@li mf_mkdir.sas
@version 9.4
@author Allan Bowe
**/
%macro mp_testjob(duration=30
)/*/STORE SOURCE*/;
%local lib dir ds1 ds2 ds3 start_tm i;
%let start_tm=%sysfunc(datetime());
%let duration=%sysevalf(&duration);
/* create a temporary library in WORK */
%let lib=%mf_getuniquelibref();
%let dir=%mf_getuniquename();
%mf_mkdir(%sysfunc(pathname(work))/&dir)
libname &lib "%sysfunc(pathname(work))/&dir";
/* loop through until time expires */
%let ds1=%mf_getuniquename();
%let ds2=%mf_getuniquename();
%let ds3=%mf_getuniquename();
%do i=0 %to 1;
/* create big dataset */
data &lib..&ds1(compress=no );
do x=1 to 1000000;
randnum0=ranuni(0)*3;
randnum1=ranuni(0)*2;
bigchar=repeat('A',300);
output;
end;
run;
%if %sysevalf( (%sysfunc(datetime())-&start_tm)>&duration ) %then %goto gate;
proc summary ;
class randnum0 randnum1;
output out=&lib..&ds2;
run;quit;
%if %sysevalf( (%sysfunc(datetime())-&start_tm)>&duration ) %then %goto gate;
/* add more data */
proc sql;
create table &lib..&ds3 as
select *, ranuni(0)*10 as randnum2
from &lib..&ds1
order by randnum1;
quit;
%if %sysevalf( (%sysfunc(datetime())-&start_tm)>&duration ) %then %goto gate;
proc sort data=&lib..&ds3;
by descending x;
run;
%if %sysevalf( (%sysfunc(datetime())-&start_tm)>&duration ) %then %goto gate;
/* wait 5 seconds */
data _null_;
call sleep(5,1);
run;
%if %sysevalf( (%sysfunc(datetime())-&start_tm)>&duration ) %then %goto gate;
%let i=0;
%end;
%gate:
%put time is up!;
proc datasets lib=&lib kill;
run;
quit;
libname &lib clear;
%mend;

View File

@@ -9,19 +9,22 @@ for file in files:
ml = open('lua/' + name + '.sas', "w")
ml.write("/**\n")
ml.write(" @file " + name + '.sas\n')
ml.write(" @brief Creates the " + basename + " file\n")
ml.write(" @brief Compiles the " + basename + " lua file\n")
ml.write(" @details Writes " + basename + " to the work directory\n")
ml.write(" and then includes it.\n")
ml.write(" Usage:\n\n")
ml.write(" %" + name + "()\n\n")
ml.write("**/\n\n")
ml.write("%macro " + name + "();\n")
ml.write("data _null_;\n")
ml.write(" file \"%sysfunc(pathname(work))/" + basename + "\";\n")
ml.write(" file \"%sysfunc(pathname(work))/" + name + ".lua\";\n")
with open(file) as infile:
for line in infile:
ml.write(" put '" + line.rstrip().replace("'","''") + " ';\n")
ml.write("run;\n")
ml.write("run;\n\n")
ml.write("%inc \"%sysfunc(pathname(work))/" + name + ".lua\";\n\n")
ml.write("%mend;\n")
ml.close()
# prepare web files

View File

@@ -19,7 +19,7 @@ HTML_FOOTER = ./doxy/new_footer.html
HTML_EXTRA_STYLESHEET = ./doxy/new_stylesheet.css
INHERIT_DOCS = NO
INLINE_INFO = NO
INPUT = base meta metax viya
INPUT = base meta metax viya lua
LAYOUT_FILE = ./doxy/DoxygenLayout.xml
MAX_INITIALIZER_LINES = 0
PROJECT_NAME = Macro Core

View File

@@ -16,6 +16,7 @@ mkdir $BUILD_FOLDER
cp -r base $BUILD_FOLDER
cp -r meta $BUILD_FOLDER
cp -r metax $BUILD_FOLDER
cp -r lua $BUILD_FOLDER
cp -r viya $BUILD_FOLDER
cp -r doxy $BUILD_FOLDER
cp main.dox $BUILD_FOLDER

Binary file not shown.

Before

Width:  |  Height:  |  Size: 1.1 KiB

View File

@@ -1,5 +1,5 @@
--
-- json2sas.lua (modified from json.lua)
-- json.lua
--
-- Copyright (c) 2019 rxi
--
@@ -22,7 +22,7 @@
-- SOFTWARE.
--
local json2sas = { _version = "0.1.2" }
json = { _version = "0.1.2" }
-------------------------------------------------------------------------------
-- Encode
@@ -122,7 +122,7 @@ encode = function(val, stack)
error("unexpected type '" .. t .. "'")
end
function json2sas.encode(val)
function json.encode(val)
return ( encode(val) )
end
@@ -356,7 +356,7 @@ parse = function(str, idx)
decode_error(str, idx, "unexpected character '" .. chr .. "'")
end
function json2sas.decode(str)
function json.decode(str)
if type(str) ~= "string" then
error("expected argument of type string, got " .. type(str))
end
@@ -368,88 +368,4 @@ function json2sas.decode(str)
return res
end
-- convert macro variable array into one variable and decode
function json2sas.go(macvar)
local x=1
local cnt=0
local mac=sas.symget(macvar..'0')
local newstr=''
if mac and mac ~= '' then
cnt=mac
for x=1,cnt,1 do
mac=sas.symget(macvar..x)
if mac and mac ~= '' then
newstr=newstr..mac
else
return print(macvar..x..' NOT FOUND!!')
end
end
else
return print(macvar..'0 NOT FOUND!!')
end
-- print('mac:'..mac..'cnt:'..cnt..'newstr'..newstr)
local oneVar=json2sas.decode(newstr)
local jsdata=oneVar["data"]
local meta={}
local attrs={}
for tablename, data in pairs(jsdata) do -- each table
print("Processing table: "..tablename)
attrs[tablename]={}
for k, v in ipairs(data) do -- each row
if(k==1) then -- column names
for a, b in pairs(v) do
attrs[tablename][a]={}
attrs[tablename][a]["name"]=b
end
elseif(k==2) then -- get types
for a, b in pairs(v) do
if type(b)=='number' then
attrs[tablename][a]["type"]="N"
attrs[tablename][a]["length"]=8
else
attrs[tablename][a]["type"]="C"
attrs[tablename][a]["length"]=string.len(b)
end
end
else --update lengths
for a, b in pairs(v) do
if (type(b)=='string' and string.len(b)>attrs[tablename][a]["length"])
then
attrs[tablename][a]["length"]=string.len(b)
end
end
end
end
print(json2sas.encode(attrs[tablename])) -- show results
-- Now create the SAS table
sas.new_table("work."..tablename,attrs[tablename])
local dsid=sas.open("work."..tablename, "u")
for k, v in ipairs(data) do
if k>1 then
sas.append(dsid)
for a, b in pairs(v) do
sas.put_value(dsid, attrs[tablename][a]["name"], b)
end
sas.update(dsid)
end
end
sas.close(dsid)
end
return json2sas.decode(newstr)
end
function quote(str)
return sas.quote(str)
end
function sasvar(str)
print("processing: "..str)
print(sas.symexist(str))
if sas.symexist(str)==1 then
return quote(str)..':'..quote(sas.symget(str))..','
end
return ''
end
return json2sas
return json

View File

@@ -1,18 +1,19 @@
/**
@file ml_json2sas.sas
@brief Creates the json2sas.lua file
@details Writes json2sas.lua to the work directory
@file ml_json.sas
@brief Compiles the json.lua lua file
@details Writes json.lua to the work directory
and then includes it.
Usage:
%ml_json2sas()
%ml_json()
**/
%macro ml_json2sas();
%macro ml_json();
data _null_;
file "%sysfunc(pathname(work))/json2sas.lua";
file "%sysfunc(pathname(work))/ml_json.lua";
put '-- ';
put '-- json2sas.lua (modified from json.lua) ';
put '-- json.lua ';
put '-- ';
put '-- Copyright (c) 2019 rxi ';
put '-- ';
@@ -35,7 +36,7 @@ data _null_;
put '-- SOFTWARE. ';
put '-- ';
put ' ';
put 'local json2sas = { _version = "0.1.2" } ';
put 'json = { _version = "0.1.2" } ';
put ' ';
put '------------------------------------------------------------------------------- ';
put '-- Encode ';
@@ -135,7 +136,7 @@ data _null_;
put ' error("unexpected type ''" .. t .. "''") ';
put 'end ';
put ' ';
put 'function json2sas.encode(val) ';
put 'function json.encode(val) ';
put ' return ( encode(val) ) ';
put 'end ';
put ' ';
@@ -369,7 +370,7 @@ data _null_;
put ' decode_error(str, idx, "unexpected character ''" .. chr .. "''") ';
put 'end ';
put ' ';
put 'function json2sas.decode(str) ';
put 'function json.decode(str) ';
put ' if type(str) ~= "string" then ';
put ' error("expected argument of type string, got " .. type(str)) ';
put ' end ';
@@ -381,90 +382,9 @@ data _null_;
put ' return res ';
put 'end ';
put ' ';
put '-- convert macro variable array into one variable and decode ';
put 'function json2sas.go(macvar) ';
put ' local x=1 ';
put ' local cnt=0 ';
put ' local mac=sas.symget(macvar..''0'') ';
put ' local newstr='''' ';
put ' if mac and mac ~= '''' then ';
put ' cnt=mac ';
put ' for x=1,cnt,1 do ';
put ' mac=sas.symget(macvar..x) ';
put ' if mac and mac ~= '''' then ';
put ' newstr=newstr..mac ';
put ' else ';
put ' return print(macvar..x..'' NOT FOUND!!'') ';
put ' end ';
put ' end ';
put ' else ';
put ' return print(macvar..''0 NOT FOUND!!'') ';
put ' end ';
put ' -- print(''mac:''..mac..''cnt:''..cnt..''newstr''..newstr) ';
put ' local oneVar=json2sas.decode(newstr) ';
put ' local jsdata=oneVar["data"] ';
put ' local meta={} ';
put ' local attrs={} ';
put ' for tablename, data in pairs(jsdata) do -- each table ';
put ' print("Processing table: "..tablename) ';
put ' attrs[tablename]={} ';
put ' for k, v in ipairs(data) do -- each row ';
put ' if(k==1) then -- column names ';
put ' for a, b in pairs(v) do ';
put ' attrs[tablename][a]={} ';
put ' attrs[tablename][a]["name"]=b ';
put ' end ';
put ' elseif(k==2) then -- get types ';
put ' for a, b in pairs(v) do ';
put ' if type(b)==''number'' then ';
put ' attrs[tablename][a]["type"]="N" ';
put ' attrs[tablename][a]["length"]=8 ';
put ' else ';
put ' attrs[tablename][a]["type"]="C" ';
put ' attrs[tablename][a]["length"]=string.len(b) ';
put ' end ';
put ' end ';
put ' else --update lengths ';
put ' for a, b in pairs(v) do ';
put ' if (type(b)==''string'' and string.len(b)>attrs[tablename][a]["length"]) ';
put ' then ';
put ' attrs[tablename][a]["length"]=string.len(b) ';
put ' end ';
put ' end ';
put ' end ';
put ' end ';
put ' print(json2sas.encode(attrs[tablename])) -- show results ';
put ' ';
put ' -- Now create the SAS table ';
put ' sas.new_table("work."..tablename,attrs[tablename]) ';
put ' local dsid=sas.open("work."..tablename, "u") ';
put ' for k, v in ipairs(data) do ';
put ' if k>1 then ';
put ' sas.append(dsid) ';
put ' for a, b in pairs(v) do ';
put ' sas.put_value(dsid, attrs[tablename][a]["name"], b) ';
put ' end ';
put ' sas.update(dsid) ';
put ' end ';
put ' end ';
put ' sas.close(dsid) ';
put ' end ';
put ' return json2sas.decode(newstr) ';
put 'end ';
put ' ';
put ' ';
put 'function quote(str) ';
put ' return sas.quote(str) ';
put 'end ';
put 'function sasvar(str) ';
put ' print("processing: "..str) ';
put ' print(sas.symexist(str)) ';
put ' if sas.symexist(str)==1 then ';
put ' return quote(str)..'':''..quote(sas.symget(str))..'','' ';
put ' end ';
put ' return '''' ';
put 'end ';
put ' ';
put 'return json2sas ';
put 'return json ';
run;
%inc "%sysfunc(pathname(work))/ml_json.lua";
%mend;

View File

@@ -50,4 +50,15 @@
* No X command
* Prefixes: _mv_
*/
/*! \dir lua
* \brief Lua macros
* \details These macros have the following attributes:
* OS independent
* Work as LUA functions (they are immediately executed/compiled)
* Auto-generated from the plain source `.lua` files in the same directory
* Prefixes: _ml_
*/

View File

@@ -38,6 +38,7 @@
rc=metadata_getattr(liburi,"Name",LibName);
/* now try and assign it */
if libname("&libref",,'meta',cats('liburi="',liburi,'";')) ne 0 then do;
putlog "&libref could not be assigned";
call symputx('msg',sysmsg(),'l');
if "&mabort"='HARD' then call symputx('mp_abort',1,'l');
end;

View File

@@ -5,17 +5,15 @@
blank to return all groups.
Usage:
- all groups
%mm_getGroups()
- all groups: `%mm_getGroups()`
- all groups for a particular user
%mm_getgroups(user=&sysuserid)
- all groups for a particular user: `%mm_getgroups(user=&sysuserid)`
@param user= the metadata user to return groups for. Leave blank for all
@param [in] user= the metadata user to return groups for. Leave blank for all
groups.
@param outds= the dataset to create that contains the list of groups
@param repo= the metadata repository that contains the user/group information
@param mDebug= set to 1 to show debug messages in the log
@param [in] repo= the metadata repository that contains the user/group information
@param [in] mDebug= set to 1 to show debug messages in the log
@param [out] outds= the dataset to create that contains the list of groups
@returns outds dataset containing all groups in a column named "metagroup"
- groupuri

View File

@@ -5,9 +5,9 @@
Usage:
%mm_getroles()
%mm_getroles()
@param outds the dataset to create that contains the list of roles
@param [out] outds the dataset to create that contains the list of roles
@returns outds dataset containing all roles, with the following columns:
- uri

View File

@@ -4,7 +4,6 @@
@details Expects oauth token in a global macro variable (default
ACCESS_TOKEN).
options mprint;
%mv_createfolder(path=/Public)

View File

@@ -46,7 +46,7 @@
%let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=;
%end;
%put &sysmacroname: grant_type=&grant_type;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services
)

View File

@@ -1,6 +1,6 @@
/**
@file mv_deletejes.sas
@brief Creates a job execution service if it does not already exist
@file
@brief Deletes a Viya Job, if it exists
@details If not executed in Studio 5+ will expect oauth token in a global
macro variable (default ACCESS_TOKEN).

View File

@@ -4,7 +4,6 @@
@details If not running in Studo 5 +, will expect an oauth token in a global
macro variable (default ACCESS_TOKEN).
options mprint;
%mv_createfolder(path=/Public/test/blah)
%mv_deleteviyafolder(path=/Public/test)

View File

@@ -40,7 +40,7 @@
%let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=;
%end;
%put &sysmacroname: grant_type=&grant_type;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services
)

View File

@@ -52,7 +52,7 @@
%let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=;
%end;
%put &sysmacroname: grant_type=&grant_type;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services
)

150
viya/mv_getjobcode.sas Normal file
View File

@@ -0,0 +1,150 @@
/**
@file
@brief Extract the source code from a SAS Viya Job
@details Extracts the SAS code from a Job into a fileref or physical file.
Example:
%mv_getjobcode(
path=/Public/jobs
,name=some_job
,outfile=/tmp/some_job.sas
)
@param [in] access_token_var= The global macro variable to contain the access token
@param [in] grant_type= valid values:
* password
* authorization_code
* detect - will check if access_token exists, if not will use sas_services if
a SASStudioV session else authorization_code. Default option.
* sas_services - will use oauth_bearer=sas_services
@param [in] path= The SAS Drive path of the job
@param [in] name= The name of the job
@param [out] outref= A fileref to which to write the source code
@param [out] outfile= A file to which to write the source code
@version VIYA V.03.04
@author Allan Bowe, source: https://github.com/sasjs/core
<h4> SAS Macros </h4>
@li mp_abort.sas
@li mf_getplatform.sas
@li mf_getuniquefileref.sas
@li mv_getfoldermembers.sas
@li ml_json.sas
**/
%macro mv_getjobcode(outref=0,outfile=0
,name=0,path=0
,contextName=SAS Job Execution compute context
,access_token_var=ACCESS_TOKEN
,grant_type=sas_services
);
%local oauth_bearer;
%if &grant_type=detect %then %do;
%if %symexist(&access_token_var) %then %let grant_type=authorization_code;
%else %let grant_type=sas_services;
%end;
%if &grant_type=sas_services %then %do;
%let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=;
%end;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services
)
,mac=&sysmacroname
,msg=%str(Invalid value for grant_type: &grant_type)
)
%mp_abort(iftrue=("&path"="0")
,mac=&sysmacroname
,msg=%str(Job Path not provided)
)
%mp_abort(iftrue=("&name"="0")
,mac=&sysmacroname
,msg=%str(Job Name not provided)
)
%mp_abort(iftrue=("&outfile"="0" and "&outref"="0")
,mac=&sysmacroname
,msg=%str(Output destination (file or fileref) must be provided)
)
options noquotelenmax;
%local base_uri; /* location of rest apis */
%let base_uri=%mf_getplatform(VIYARESTAPI);
data;run;
%local foldermembers;
%let foldermembers=&syslast;
%mv_getfoldermembers(root=&path
,access_token_var=&access_token_var
,grant_type=&grant_type
,outds=&foldermembers
)
%local joburi;
%let joburi=0;
data _null_;
set &foldermembers;
if name="&name" and uri=:'/jobDefinitions/definitions'
then call symputx('joburi',uri);
run;
%mp_abort(iftrue=("&joburi"="0")
,mac=&sysmacroname
,msg=%str(Job &path/&name not found)
)
/* prepare request*/
%local fname1;
%let fname1=%mf_getuniquefileref();
proc http method='GET' out=&fname1 &oauth_bearer
url="&base_uri&joburi";
headers "Accept"="application/vnd.sas.job.definition+json"
%if &grant_type=authorization_code %then %do;
"Authorization"="Bearer &&&access_token_var"
%end;
;
run;
%if &SYS_PROCHTTP_STATUS_CODE ne 200 and &SYS_PROCHTTP_STATUS_CODE ne 201 %then
%do;
data _null_;infile &fname1;input;putlog _infile_;run;
%mp_abort(mac=&sysmacroname
,msg=%str(&SYS_PROCHTTP_STATUS_CODE &SYS_PROCHTTP_STATUS_PHRASE)
)
%end;
%local fname2 fname3 fpath1 fpath2 fpath3;
%let fname2=%mf_getuniquefileref();
%let fname3=%mf_getuniquefileref();
%let fpath1=%sysfunc(pathname(&fname1));
%let fpath2=%sysfunc(pathname(&fname2));
%let fpath3=%sysfunc(pathname(&fname3));
/* compile the lua JSON module */
%ml_json()
/* read using LUA - this allows the code to be of any length */
data _null_;
file "&fpath3..lua";
put '
infile = io.open (sas.symget("fpath1"), "r")
outfile = io.open (sas.symget("fpath2"), "w")
io.input(infile)
local resp=json.decode(io.read())
local job=resp["code"]
outfile:write(job)
io.close(infile)
io.close(outfile)
';
run;
%inc "&fpath3..lua";
/* export to desired destination */
data _null_;
%if &outref=0 %then %do;
file "&outfile" lrecl=32767;
%end;
%else %do;
file &outref;
%end;
infile &fname2;
input;
put _infile_;
run;
filename &fname1 clear;
filename &fname2 clear;
filename &fname3 clear;
%mend;

267
viya/mv_getjoblog.sas Normal file
View File

@@ -0,0 +1,267 @@
/**
@file
@brief Extract the log from a completed SAS Viya Job
@details Extracts log from a Viya job and writes it out to a fileref
To query the job, you need the URI. Sample code for achieving this
is provided below.
## Example
First, compile the macros:
filename mc url "https://raw.githubusercontent.com/sasjs/core/main/all.sas";
%inc mc;
Next, create a job (in this case, a web service):
filename ft15f001 temp;
parmcards4;
data ;
rand=ranuni(0)*1000;
do x=1 to rand;
y=rand*4;
output;
end;
run;
proc sort data=&syslast
by descending y;
run;
;;;;
%mv_createwebservice(path=/Public/temp,name=demo)
Execute it:
%mv_jobexecute(path=/Public/temp
,name=demo
,outds=work.info
)
Wait for it to finish, and grab the uri:
data _null_;
set work.info;
if method='GET' and rel='self';
call symputx('uri',uri);
run;
Finally, fetch the log:
%mv_getjoblog(uri=&uri,outref=mylog)
This macro is used by the mv_jobwaitfor.sas macro, which is generally a more
convenient way to wait for the job to finish before fetching the log.
@param [in] access_token_var= The global macro variable to contain the access token
@param [in] mdebug= set to 1 to enable DEBUG messages
@param [in] grant_type= valid values:
@li password
@li authorization_code
@li detect - will check if access_token exists, if not will use sas_services if
a SASStudioV session else authorization_code. Default option.
@li sas_services - will use oauth_bearer=sas_services.
@param [in] uri= The uri of the running job for which to fetch the status,
in the format `/jobExecution/jobs/$UUID/state` (unquoted).
@param [out] outref= The output fileref to which to APPEND the log (is always
appended).
@version VIYA V.03.04
@author Allan Bowe, source: https://github.com/sasjs/core
<h4> SAS Macros </h4>
@li mp_abort.sas
@li mf_getplatform.sas
@li mf_existfileref.sas
@li ml_json.sas
**/
%macro mv_getjoblog(uri=0,outref=0
,contextName=SAS Job Execution compute context
,access_token_var=ACCESS_TOKEN
,grant_type=sas_services
,mdebug=0
);
%local oauth_bearer;
%if &grant_type=detect %then %do;
%if %symexist(&access_token_var) %then %let grant_type=authorization_code;
%else %let grant_type=sas_services;
%end;
%if &grant_type=sas_services %then %do;
%let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=;
%end;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services
)
,mac=&sysmacroname
,msg=%str(Invalid value for grant_type: &grant_type)
)
/* validation in datastep for better character safety */
%local errmsg errflg;
data _null_;
uri=symget('uri');
if length(uri)<12 then do;
call symputx('errflg',1);
call symputx('errmsg',"URI is invalid (too short) - '&uri'",'l');
end;
if scan(uri,-1)='state' or scan(uri,1) ne 'jobExecution' then do;
call symputx('errflg',1);
call symputx('errmsg',
"URI should be in format /jobExecution/jobs/$$$$UUID$$$$"
!!" but is actually like: &uri",'l');
end;
run;
%mp_abort(iftrue=(&errflg=1)
,mac=&sysmacroname
,msg=%str(&errmsg)
)
%mp_abort(iftrue=(&outref=0)
,mac=&sysmacroname
,msg=%str(Output fileref should be provided)
)
%if %mf_existfileref(&outref) ne 1 %then %do;
filename &outref temp;
%end;
options noquotelenmax;
%local base_uri; /* location of rest apis */
%let base_uri=%mf_getplatform(VIYARESTAPI);
/* prepare request*/
%local fname1;
%let fname1=%mf_getuniquefileref();
proc http method='GET' out=&fname1 &oauth_bearer
url="&base_uri&uri";
headers
%if &grant_type=authorization_code %then %do;
"Authorization"="Bearer &&&access_token_var"
%end;
;
run;
%if &SYS_PROCHTTP_STATUS_CODE ne 200 and &SYS_PROCHTTP_STATUS_CODE ne 201 %then
%do;
data _null_;infile &fname1;input;putlog _infile_;run;
%mp_abort(mac=&sysmacroname
,msg=%str(&SYS_PROCHTTP_STATUS_CODE &SYS_PROCHTTP_STATUS_PHRASE)
)
%end;
%local fname2 fname3 fpath1 fpath2 fpath3;
%let fname2=%mf_getuniquefileref();
%let fname3=%mf_getuniquefileref();
%let fpath1=%sysfunc(pathname(&fname1));
%let fpath2=%sysfunc(pathname(&fname2));
%let fpath3=%sysfunc(pathname(&fname3));
/* compile the lua JSON module */
%ml_json()
/* read using LUA - this allows the code to be of any length */
data _null_;
file "&fpath3..lua";
put '
infile = io.open (sas.symget("fpath1"), "r")
outfile = io.open (sas.symget("fpath2"), "w")
io.input(infile)
local resp=json.decode(io.read())
local logloc=resp["logLocation"]
outfile:write(logloc)
io.close(infile)
io.close(outfile)
';
run;
%inc "&fpath3..lua";
/* get log path*/
%let errflg=1;
%let errmsg=No entry in &fname2 fileref;
data _null_;
infile &fname2;
input;
uri=_infile_;
if length(uri)<12 then do;
call symputx('errflg',1);
call symputx('errmsg',"URI is invalid (too short) - '&uri'",'l');
end;
if scan(uri,1) ne 'files' or scan(uri,2) ne 'files' then do;
call symputx('errflg',1);
call symputx('errmsg',
"URI should be in format /files/files/$$$$UUID$$$$"
!!" but is actually like: &uri",'l');
end;
call symputx('errflg',0,'l');
call symputx('logloc',uri,'l');
run;
%mp_abort(iftrue=(&errflg=1)
,mac=&sysmacroname
,msg=%str(&errmsg)
)
/* we have a log uri - now fetch the log */
proc http method='GET' out=&fname1 &oauth_bearer
url="&base_uri&logloc/content";
headers
%if &grant_type=authorization_code %then %do;
"Authorization"="Bearer &&&access_token_var"
%end;
;
run;
%if &SYS_PROCHTTP_STATUS_CODE ne 200 and &SYS_PROCHTTP_STATUS_CODE ne 201 %then
%do;
data _null_;infile &fname1;input;putlog _infile_;run;
%mp_abort(mac=&sysmacroname
,msg=%str(logfetch: &SYS_PROCHTTP_STATUS_CODE &SYS_PROCHTTP_STATUS_PHRASE)
)
%end;
data _null_;
file "&fpath3..lua";
put '
infile = io.open (sas.symget("fpath1"), "r")
outfile = io.open (sas.symget("fpath2"), "w")
io.input(infile)
local resp=json.decode(io.read())
for i, v in pairs(resp["items"]) do
outfile:write(v.line,"\n")
end
io.close(infile)
io.close(outfile)
';
run;
%inc "&fpath3..lua";
/* write log out to the specified fileref */
data _null_;
infile &fname2 end=last;
file &outref mod;
if _n_=1 then do;
put "/** SASJS Viya Job Log Extract start: &uri **/";
end;
input;
put _infile_;
%if &mdebug=1 %then %do;
putlog _infile_;
%end;
if last then do;
put "/** SASJS Viya Job Log Extract end: &uri **/";
end;
run;
%if &mdebug=0 %then %do;
filename &fname1 clear;
filename &fname2 clear;
filename &fname3 clear;
%end;
%else %do;
%put _local_;
%end;
%mend;

169
viya/mv_getjobstate.sas Normal file
View File

@@ -0,0 +1,169 @@
/**
@file
@brief Extract the status from a running SAS Viya job
@details Extracts the status from a running job and appends it to an output
dataset with the following structure:
| uri | state | timestamp |
|---------------------------------------------------------------|---------|--------------------|
| /jobExecution/jobs/5cebd840-2063-42c1-be0c-421ec3e1c175/state | running | 15JAN2021:12:35:08 |
To query the running job, you need the URI. Sample code for achieving this
is provided below.
## Example
First, compile the macros:
filename mc url "https://raw.githubusercontent.com/sasjs/core/main/all.sas";
%inc mc;
Next, create a long running job (in this case, a web service):
filename ft15f001 temp;
parmcards4;
data ;
rand=ranuni(0)*1000;
do x=1 to rand;
y=rand*4;
output;
end;
run;
data _null_;
call sleep(5,1);
run;
;;;;
%mv_createwebservice(path=/Public/temp,name=demo)
Execute it, grab the uri, and finally, check the job status:
%mv_jobexecute(path=/Public/temp
,name=demo
,outds=work.info
)
data _null_;
set work.info;
if method='GET' and rel='state';
call symputx('uri',uri);
run;
%mv_getjobstate(uri=&uri,outds=results)
You can run this macro as part of a loop to await the final 'completed' status.
The full list of status values is:
@li idle
@li pending
@li running
@li canceled
@li completed
@li failed
If you have one or more jobs that you'd like to wait for completion you can
also use the [mv_jobwaitfor](/mv__jobwaitfor_8sas.html) macro.
@param [in] access_token_var= The global macro variable to contain the access token
@param [in] grant_type= valid values:
@li password
@li authorization_code
@li detect - will check if access_token exists, if not will use sas_services if
a SASStudioV session else authorization_code. Default option.
@li sas_services - will use oauth_bearer=sas_services.
@param [in] uri= The uri of the running job for which to fetch the status,
in the format `/jobExecution/jobs/$UUID/state` (unquoted).
@param [out] outds= The output dataset in which to APPEND the status. Three
fields are appended: `CHECK_TM`, `URI` and `STATE`. If the dataset does not
exist, it is created.
@version VIYA V.03.04
@author Allan Bowe, source: https://github.com/sasjs/core
<h4> SAS Macros </h4>
@li mp_abort.sas
@li mf_getplatform.sas
@li mf_getuniquefileref.sas
**/
%macro mv_getjobstate(uri=0,outds=work.mv_getjobstate
,contextName=SAS Job Execution compute context
,access_token_var=ACCESS_TOKEN
,grant_type=sas_services
);
%local oauth_bearer;
%if &grant_type=detect %then %do;
%if %symexist(&access_token_var) %then %let grant_type=authorization_code;
%else %let grant_type=sas_services;
%end;
%if &grant_type=sas_services %then %do;
%let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=;
%end;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services
)
,mac=&sysmacroname
,msg=%str(Invalid value for grant_type: &grant_type)
)
/* validation in datastep for better character safety */
%local errmsg errflg;
data _null_;
uri=symget('uri');
if length(uri)<12 then do;
call symputx('errflg',1);
call symputx('errmsg',"URI is invalid (too short) - '&uri'",'l');
end;
if scan(uri,-1) ne 'state' or scan(uri,1) ne 'jobExecution' then do;
call symputx('errflg',1);
call symputx('errmsg',
"URI should be in format /jobExecution/jobs/$$$$UUID$$$$/state"
!!" but is actually like: &uri",'l');
end;
run;
%mp_abort(iftrue=(&errflg=1)
,mac=&sysmacroname
,msg=%str(&errmsg)
)
options noquotelenmax;
%local base_uri; /* location of rest apis */
%let base_uri=%mf_getplatform(VIYARESTAPI);
%local fname0;
%let fname0=%mf_getuniquefileref();
proc http method='GET' out=&fname0 &oauth_bearer url="&base_uri/&uri";
headers "Accept"="text/plain"
%if &grant_type=authorization_code %then %do;
"Authorization"="Bearer &&&access_token_var"
%end; ;
run;
%if &SYS_PROCHTTP_STATUS_CODE ne 200 and &SYS_PROCHTTP_STATUS_CODE ne 201 %then
%do;
data _null_;infile &fname0;input;putlog _infile_;run;
%mp_abort(mac=&sysmacroname
,msg=%str(&SYS_PROCHTTP_STATUS_CODE &SYS_PROCHTTP_STATUS_PHRASE)
)
%end;
data;
format uri $128. state $32. timestamp datetime19.;
infile &fname0;
uri="&uri";
timestamp=datetime();
input;
state=_infile_;
run;
proc append base=&outds data=&syslast;
run;
filename &fname0 clear;
%mend;

View File

@@ -23,23 +23,24 @@
,paramstring=%str("macvarname":"macvarvalue","answer":42)
)
@param access_token_var= The global macro variable to contain the access token
@param grant_type= valid values:
* password
* authorization_code
* detect - will check if access_token exists, if not will use sas_services if
a SASStudioV session else authorization_code. Default option.
* sas_services - will use oauth_bearer=sas_services
@param [in] access_token_var= The global macro variable to contain the access token
@param [in] grant_type= valid values:
@param path= The SAS Drive path to the job being executed
@param name= The name of the job to execute
@param paramstring= A JSON fragment with name:value pairs, eg: `"name":"value"`
* password
* authorization_code
* detect - will check if access_token exists, if not will use sas_services if
a SASStudioV session else authorization_code. Default option.
* sas_services - will use oauth_bearer=sas_services
@param [in] path= The SAS Drive path to the job being executed
@param [in] name= The name of the job to execute
@param [in] paramstring= A JSON fragment with name:value pairs, eg: `"name":"value"`
or "name":"value","name2":42`. This will need to be wrapped in `%str()`.
@param contextName= Context name with which to run the job.
@param [in] contextName= Context name with which to run the job.
Default = `SAS Job Execution compute context`
@param outds= The output dataset containing links (Default=work.mv_jobexecute)
@param [out] outds= The output dataset containing links (Default=work.mv_jobexecute)
@version VIYA V.03.04
@@ -71,7 +72,7 @@
%let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=;
%end;
%put &sysmacroname: grant_type=&grant_type;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services
)
@@ -125,9 +126,11 @@ data _null_;
length joburi contextname $128 paramstring $32765;
joburi=quote(trim(symget('joburi')));
contextname=quote(trim(symget('contextname')));
_program=quote("&path/&name");
paramstring=symget('paramstring');
put '{"jobDefinitionUri":' joburi ;
put ' ,"arguments":{"_contextName":' contextname;
put ' ,"_program":' _program;
if paramstring ne "0" then do;
put ' ,' paramstring;
end;
@@ -158,6 +161,7 @@ libname &libref JSON fileref=&fname1;
data &outds;
set &libref..links;
_program="&path/&name";
run;
/* clear refs */

337
viya/mv_jobflow.sas Normal file
View File

@@ -0,0 +1,337 @@
/**
@file
@brief Execute a series of job flows
@details Very (very) simple flow manager. Jobs execute in sequential waves,
all previous waves must finish successfully.
The input table is formed as per below. Each observation represents one job.
Each variable is converted into a macro variable with the same name.
## Input table (minimum variables needed)
@li _PROGRAM - Provides the path to the job itself
@li FLOW_ID - Numeric value, provides sequential ordering capability. Is
optional, will default to 0 if not provided.
@li _CONTEXTNAME - Dictates which context should be used to run the job. If
blank (or not provided), will default to `SAS Job Execution compute context`.
Any additional variables provided in this table are converted into macro
variables and passed into the relevant job.
|_PROGRAM| FLOW_ID (optional)| _CONTEXTNAME (optional) |
|---|---|---|
|/Public/jobs/somejob1|0|SAS Job Execution compute context|
|/Public/jobs/somejob2|0|SAS Job Execution compute context|
## Output table (minimum variables produced)
@li _PROGRAM - the SAS Drive path of the job
@li URI - the URI of the executed job
@li STATE - the completed state of the job
@li TIMESTAMP - the datetime that the job completed
@li JOBPARAMS - the parameters that were passed to the job
@li FLOW_ID - the id of the flow in which the job was executed
![https://i.imgur.com/nZE9PvT.png](https://i.imgur.com/nZE9PvT.png)
## Example
First, compile the macros:
filename mc url
"https://raw.githubusercontent.com/sasjs/core/main/all.sas";
%inc mc;
Next, create some jobs (in this case, as web services):
filename ft15f001 temp;
parmcards4;
%put this is job: &_program;
%put this was run in flow &flow_id;
data ;
rand=ranuni(0)*&macrovar1;
do x=1 to rand;
y=rand*&macrovar2;
if y=100 then abort;
output;
end;
run;
;;;;
%mv_createwebservice(path=/Public/temp,name=demo1)
%mv_createwebservice(path=/Public/temp,name=demo2)
Prepare an input table with 60 executions:
data work.inputjobs;
_contextName='SAS Job Execution compute context';
do flow_id=1 to 3;
do i=1 to 20;
_program='/Public/temp/demo1';
macrovar1=10*i;
macrovar2=4*i;
output;
i+1;
_program='/Public/temp/demo2';
macrovar1=40*i;
macrovar2=44*i;
output;
end;
end;
run;
Trigger the flow
%mv_jobflow(inds=work.inputjobs
,maxconcurrency=4
,outds=work.results
,outref=myjoblog
)
data _null_;
infile myjoblog;
input; put _infile_;
run;
@param [in] access_token_var= The global macro variable to contain the access token
@param [in] grant_type= valid values:
@li password
@li authorization_code
@li detect - will check if access_token exists, if not will use sas_services if
a SASStudioV session else authorization_code. Default option.
@li sas_services - will use oauth_bearer=sas_services
@param [in] inds= The input dataset containing a list of jobs and parameters
@param [in] maxconcurrency= The max number of parallel jobs to run. Default=8.
@param [in] mdebug= set to 1 to enable DEBUG messages
@param [out] outds= The output dataset containing the results
@param [out] outref= The output fileref to which to append the log file(s).
@version VIYA V.03.05
@author Allan Bowe, source: https://github.com/sasjs/core
<h4> SAS Macros </h4>
@li mf_nobs.sas
@li mp_abort.sas
@li mf_getplatform.sas
@li mf_getuniquefileref.sas
@li mf_existvarlist.sas
@li mv_jobwaitfor.sas
@li mv_jobexecute.sas
**/
%macro mv_jobflow(inds=0,outds=work.mv_jobflow
,maxconcurrency=8
,access_token_var=ACCESS_TOKEN
,grant_type=sas_services
,outref=0
,mdebug=0
);
%local oauth_bearer;
%if &grant_type=detect %then %do;
%if %symexist(&access_token_var) %then %let grant_type=authorization_code;
%else %let grant_type=sas_services;
%end;
%if &grant_type=sas_services %then %do;
%let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=;
%end;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services
)
,mac=&sysmacroname
,msg=%str(Invalid value for grant_type: &grant_type)
)
%mp_abort(iftrue=("&inds"="0")
,mac=&sysmacroname
,msg=%str(Input dataset was not provided)
)
%mp_abort(iftrue=(%mf_existVarList(&inds,_PROGRAM)=0)
,mac=&sysmacroname
,msg=%str(The _PROGRAM column must exist on input dataset &inds)
)
%mp_abort(iftrue=(&maxconcurrency<1)
,mac=&sysmacroname
,msg=%str(The maxconcurrency variable should be a positive integer)
)
/* set defaults if not provided */
%if %mf_existVarList(&inds,_CONTEXTNAME FLOW_ID)=0 %then %do;
data &inds;
%if %mf_existvarList(&inds,_CONTEXTNAME)=0 %then %do;
length _CONTEXTNAME $128;
retain _CONTEXTNAME "SAS Job Execution compute context";
%end;
%if %mf_existvarList(&inds,FLOW_ID)=0 %then %do;
retain FLOW_ID 0;
%end;
set &inds;
run;
%end;
%local missings;
proc sql noprint;
select count(*) into: missings
from &inds
where flow_id is null or _program is null;
%mp_abort(iftrue=(&missings>0)
,mac=&sysmacroname
,msg=%str(input dataset contains &missings missing values for FLOW_ID or _PROGRAM)
)
%if %mf_nobs(&inds)=0 %then %do;
%put No observations in &inds! Leaving macro &sysmacroname;
%return;
%end;
/* ensure output table is available */
data &outds;run;
proc sql;
drop table &outds;
options noquotelenmax;
%local base_uri; /* location of rest apis */
%let base_uri=%mf_getplatform(VIYARESTAPI);
/* get flows */
proc sort data=&inds;
by flow_id;
run;
data _null_;
set &inds (keep=flow_id) end=last;
by flow_id;
if last.flow_id then do;
cnt+1;
call symputx(cats('flow',cnt),flow_id,'l');
end;
if last then call symputx('flowcnt',cnt,'l');
run;
/* prepare temporary datasets and frefs */
%local fid jid jds jjson jdsapp jdsrunning jdswaitfor jfref;
data;run;%let jds=&syslast;
data;run;%let jjson=&syslast;
data;run;%let jdsapp=&syslast;
data;run;%let jdsrunning=&syslast;
data;run;%let jdswaitfor=&syslast;
%let jfref=%mf_getuniquefileref();
/* start loop */
%do fid=1 %to &flowcnt;
%put preparing job attributes for flow &&flow&fid;
%local jds jcnt;
data &jds(drop=_contextName _program);
set &inds(where=(flow_id=&&flow&fid));
if _contextName='' then _contextName="SAS Job Execution compute context";
call symputx(cats('job',_n_),_program,'l');
call symputx(cats('context',_n_),_contextName,'l');
call symputx('jcnt',_n_,'l');
run;
%put exporting job variables in json format;
%do jid=1 %to &jcnt;
data &jjson;
set &jds;
if _n_=&jid then do;
output;
stop;
end;
run;
proc json out=&jfref;
export &jjson / nosastags fmtnumeric;
run;
data _null_;
infile &jfref lrecl=32767;
input;
jparams='jparams'!!left(symget('jid'));
call symputx(jparams,substr(_infile_,3,length(_infile_)-4));
run;
%local jobuid&jid;
%let jobuid&jid=0; /* used in next loop */
%end;
%local concurrency completed;
%let concurrency=0;
%let completed=0;
proc sql; drop table &jdsrunning;
%do jid=1 %to &jcnt;
/**
* now we can execute the jobs up to the maxconcurrency setting
*/
%if "&&job&jid" ne "0" %then %do; /* this var is zero if job finished */
/* check to see if the job finished in the previous round */
%if %sysfunc(exist(&outds))=1 %then %do;
%local jobcheck; %let jobcheck=0;
proc sql noprint;
select count(*) into: jobcheck
from &outds where uuid="&&jobuid&jid";
%if &jobcheck>0 %then %do;
%put &&job&jid in flow &fid with uid &&jobuid&jid completed!;
%let job&jid=0;
%end;
%end;
/* check if job was triggered and if so, if we have enough slots to run */
%if "&&jobuid&jid"="0" and &concurrency<&maxconcurrency %then %do;
%local jobname jobpath;
%let jobname=%scan(&&job&jid,-1,/);
%let jobpath=%substr(&&job&jid,1,%length(&&job&jid)-%length(&jobname)-1);
%put executing &jobpath/&jobname with paramstring &&jparams&jid;
%mv_jobexecute(path=&jobpath
,name=&jobname
,paramstring=%superq(jparams&jid)
,outds=&jdsapp
)
data &jdsapp;
format jobparams $32767.;
set &jdsapp(where=(method='GET' and rel='state'));
jobparams=symget("jparams&jid");
/* uri here has the /state suffix */
uuid=scan(uri,-2,'/');
call symputx("jobuid&jid",uuid,'l');
run;
proc append base=&jdsrunning data=&jdsapp;
run;
%let concurrency=%eval(&concurrency+1);
%end;
%end;
%if &jid=&jcnt %then %do;
/* we are at the end of the loop - time to see which jobs have finished */
%mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor,outref=&outref)
%local done;
%let done=%mf_nobs(&jdswaitfor);
%if &done>0 %then %do;
%let completed=%eval(&completed+&done);
%let concurrency=%eval(&concurrency-&done);
data &jdsapp;
set &jdswaitfor;
flow_id=&&flow&fid;
uuid=scan(uri,-1,'/');
run;
proc append base=&outds data=&jdsapp;
run;
%end;
proc sql;
delete from &jdsrunning
where uuid in (select uuid from &outds
where state in ('canceled','completed','failed')
);
/* loop again if jobs are left */
%if &completed < &jcnt %then %do;
%let jid=0;
%put looping flow &fid again - &completed of &jcnt jobs completed, &concurrency jobs running;
%end;
%end;
%end;
/* back up and execute the next flow */
%end;
%if &mdebug=1 %then %do;
%put _local_;
%end;
%mend;

221
viya/mv_jobwaitfor.sas Normal file
View File

@@ -0,0 +1,221 @@
/**
@file
@brief Takes a dataset of running jobs and waits for ANY or ALL of them to complete
@details Will poll `/jobs/{jobId}/state` at set intervals until ANY or ALL
jobs are completed. Completion is determined by reference to the returned
_state_, as per the following table:
| state | Wait? | Notes|
|-----------|-------|------|
| idle | yes | We assume processing will continue. Beware of idle sessions with no code submitted! |
| pending | yes | Job is preparing to run |
| running | yes | Job is running|
| canceled | no | Job was cancelled|
| completed | no | Job finished - does not mean it was successful. Check stateDetails|
| failed | no | Job failed to execute, could be a problem when calling the apis|
## Example
First, compile the macros:
filename mc url
"https://raw.githubusercontent.com/sasjs/core/main/all.sas";
%inc mc;
Next, create a job (in this case, as a web service):
filename ft15f001 temp;
parmcards4;
data ;
rand=ranuni(0)*1000000;
do x=1 to rand;
y=rand*x;
output;
end;
run;
;;;;
%mv_createwebservice(path=/Public/temp,name=demo)
Then, execute the job,multiple times, and wait for them all to finish:
%mv_jobexecute(path=/Public/temp,name=demo,outds=work.ds1)
%mv_jobexecute(path=/Public/temp,name=demo,outds=work.ds2)
%mv_jobexecute(path=/Public/temp,name=demo,outds=work.ds3)
%mv_jobexecute(path=/Public/temp,name=demo,outds=work.ds4)
data work.jobs;
set work.ds1 work.ds2 work.ds3 work.ds4;
where method='GET' and rel='state';
run;
%mv_jobwaitfor(ALL,inds=work.jobs,outds=work.jobstates)
Delete the job:
%mv_deletejes(path=/Public/temp,name=demo)
@param [in] access_token_var= The global macro variable to contain the access token
@param [in] grant_type= valid values:
- password
- authorization_code
- detect - will check if access_token exists, if not will use sas_services if
a SASStudioV session else authorization_code. Default option.
- sas_services - will use oauth_bearer=sas_services
@param [in] action=Either ALL (to wait for every job) or ANY (if one job
completes, processing will continue). Default=ALL.
@param [in] inds= The input dataset containing the list of job uris, in the
following format: `/jobExecution/jobs/&JOBID./state` and the corresponding
job name. The uri should be in a `uri` variable, and the job path/name
should be in a `_program` variable.
@param [out] outds= The output dataset containing the list of states by job
(default=work.mv_jobexecute)
@param [out] outref= A fileref to which the spawned job logs should be appended.
@version VIYA V.03.04
@author Allan Bowe, source: https://github.com/sasjs/core
<h4> Dependencies </h4>
@li mp_abort.sas
@li mf_getplatform.sas
@li mf_getuniquefileref.sas
@li mf_existvar.sas
@li mf_nobs.sas
@li mv_getjoblog.sas
**/
%macro mv_jobwaitfor(action
,access_token_var=ACCESS_TOKEN
,grant_type=sas_services
,inds=0
,outds=work.mv_jobwaitfor
,outref=0
);
%local oauth_bearer;
%if &grant_type=detect %then %do;
%if %symexist(&access_token_var) %then %let grant_type=authorization_code;
%else %let grant_type=sas_services;
%end;
%if &grant_type=sas_services %then %do;
%let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=;
%end;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services
)
,mac=&sysmacroname
,msg=%str(Invalid value for grant_type: &grant_type)
)
%mp_abort(iftrue=("&inds"="0")
,mac=&sysmacroname
,msg=%str(input dataset not provided)
)
%mp_abort(iftrue=(%mf_existvar(&inds,uri)=0)
,mac=&sysmacroname
,msg=%str(The URI variable was not found in the input dataset(&inds))
)
%mp_abort(iftrue=(%mf_existvar(&inds,_program)=0)
,mac=&sysmacroname
,msg=%str(The _PROGRAM variable was not found in the input dataset(&inds))
)
%if %mf_nobs(&inds)=0 %then %do;
%put NOTE: Zero observations in &inds, &sysmacroname will now exit;
%return;
%end;
options noquotelenmax;
%local base_uri; /* location of rest apis */
%let base_uri=%mf_getplatform(VIYARESTAPI);
data _null_;
length jobparams $32767;
set &inds end=last;
call symputx(cats('joburi',_n_),substr(uri,1,55)!!'/state','l');
call symputx(cats('jobname',_n_),_program,'l');
call symputx(cats('jobparams',_n_),jobparams,'l');
if last then call symputx('uricnt',_n_,'l');
run;
%local runcnt;
%if &action=ALL %then %let runcnt=&uricnt;
%else %if &action=ANY %then %let runcnt=1;
%else %let runcnt=&uricnt;
%local fname0 ;
%let fname0=%mf_getuniquefileref();
data &outds;
format _program uri $128. state $32. timestamp datetime19. jobparams $32767.;
stop;
run;
%local i;
%do i=1 %to &uricnt;
%if "&&joburi&i" ne "0" %then %do;
proc http method='GET' out=&fname0 &oauth_bearer url="&base_uri/&&joburi&i";
headers "Accept"="text/plain"
%if &grant_type=authorization_code %then %do;
"Authorization"="Bearer &&&access_token_var"
%end; ;
run;
%if &SYS_PROCHTTP_STATUS_CODE ne 200 and &SYS_PROCHTTP_STATUS_CODE ne 201 %then
%do;
data _null_;infile &fname0;input;putlog _infile_;run;
%mp_abort(mac=&sysmacroname
,msg=%str(&SYS_PROCHTTP_STATUS_CODE &SYS_PROCHTTP_STATUS_PHRASE)
)
%end;
%let status=notset;
data _null_;
infile &fname0;
input;
call symputx('status',_infile_,'l');
run;
%if &status=completed or &status=failed or &status=canceled %then %do;
%local plainuri;
%let plainuri=%substr(&&joburi&i,1,55);
proc sql;
insert into &outds set
_program="&&jobname&i",
uri="&plainuri",
state="&status",
timestamp=datetime(),
jobparams=symget("jobparams&i");
%let joburi&i=0; /* do not re-check */
/* fetch log */
%if %str(&outref) ne 0 %then %do;
%mv_getjoblog(uri=&plainuri,outref=&outref)
%end;
%end;
%else %if &status=idle or &status=pending or &status=running %then %do;
data _null_;
call sleep(1,1);
run;
%end;
%else %do;
%mp_abort(mac=&sysmacroname
,msg=%str(status &status not expected!!)
)
%end;
%end;
%if &i=&uricnt %then %do;
%local goback;
%let goback=0;
proc sql noprint;
select count(*) into:goback from &outds;
%if &goback lt &runcnt %then %let i=0;
%end;
%end;
/* clear refs */
filename &fname0 clear;
%mend;