1
0
mirror of https://github.com/sasjs/core.git synced 2026-01-03 23:50:06 +00:00

Compare commits

...

1 Commits

2 changed files with 80 additions and 16 deletions

89
all.sas
View File

@@ -12984,6 +12984,8 @@ libname &libref;
The input table is formed as per below. Each observation represents one job. The input table is formed as per below. Each observation represents one job.
Each variable is converted into a macro variable with the same name. Each variable is converted into a macro variable with the same name.
## Input table (minimum variables needed)
| variable| description | | variable| description |
|---|---|---| |---|---|---|
|FLOW_ID| Numeric value, provides sequential ordering capability| |FLOW_ID| Numeric value, provides sequential ordering capability|
@@ -12991,6 +12993,14 @@ libname &libref;
blank, will default to `SAS Job Execution compute context`.| blank, will default to `SAS Job Execution compute context`.|
|_PROGRAM|Provides the path to the job itself| |_PROGRAM|Provides the path to the job itself|
## Output table (minimum variables produced)
| variable| description |
|---|---|---|
|FLOW_ID| Numeric value, provides sequential ordering capability|
|_CONTEXTNAME|Dictates which context should be used to run the job. If
blank, will default to `SAS Job Execution compute context`.|
|_PROGRAM|Provides the path to the job itself|
## Example ## Example
@@ -13010,6 +13020,7 @@ libname &libref;
rand=ranuni(0)*&macrovar1; rand=ranuni(0)*&macrovar1;
do x=1 to rand; do x=1 to rand;
y=rand*&macrovar2; y=rand*&macrovar2;
if y=100 then abort;
output; output;
end; end;
run; run;
@@ -13022,17 +13033,17 @@ libname &libref;
data work.inputjobs; data work.inputjobs;
_contextName='SAS Job Execution compute context'; _contextName='SAS Job Execution compute context';
do flow_id=1 to 3; do flow_id=1 to 3;
do job=1 to 10; do i=1 to 20;
_program='/Public/temp/demo1'; _program='/Public/temp/demo1';
macrovar1=10*job; macrovar1=10*i;
macrovar2=4*job; macrovar2=4*i;
output; output;
i+1;
_program='/Public/temp/demo2'; _program='/Public/temp/demo2';
macrovar1=40*job; macrovar1=40*i;
macrovar2=44*job; macrovar2=44*i;
output; output;
end; end;
drop job;
end; end;
run; run;
@@ -13056,10 +13067,13 @@ libname &libref;
@author Allan Bowe, source: https://github.com/sasjs/core @author Allan Bowe, source: https://github.com/sasjs/core
<h4> SAS Macros </h4> <h4> SAS Macros </h4>
@li mf_nobs.sas
@li mp_abort.sas @li mp_abort.sas
@li mf_getplatform.sas @li mf_getplatform.sas
@li mf_getuniquefileref.sas @li mf_getuniquefileref.sas
@li mf_existvarlist.sas @li mf_existvarlist.sas
@li mv_jobwaitfor.sas
@li mv_jobexecute.sas
**/ **/
@@ -13077,7 +13091,7 @@ libname &libref;
%let oauth_bearer=oauth_bearer=sas_services; %let oauth_bearer=oauth_bearer=sas_services;
%let &access_token_var=; %let &access_token_var=;
%end; %end;
%put &sysmacroname: grant_type=&grant_type;
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password %mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
and &grant_type ne sas_services and &grant_type ne sas_services
) )
@@ -13109,6 +13123,16 @@ select count(*) into: missings
,msg=%str(input dataset contains &missings missing values for FLOW_ID or _PROGRAM) ,msg=%str(input dataset contains &missings missing values for FLOW_ID or _PROGRAM)
) )
%if %mf_nobs(&inds)=0 %then %do;
%put No observations in &inds! Leaving macro &sysmacroname;
%return;
%end;
/* ensure output table is available */
data &outds;run;
proc sql;
drop table &outds;
options noquotelenmax; options noquotelenmax;
%local base_uri; /* location of rest apis */ %local base_uri; /* location of rest apis */
%let base_uri=%mf_getplatform(VIYARESTAPI); %let base_uri=%mf_getplatform(VIYARESTAPI);
@@ -13163,13 +13187,15 @@ data;run;%let jdswaitfor=&syslast;
data _null_; data _null_;
infile &jfref lrecl=32767; infile &jfref lrecl=32767;
input; input;
call symputx("jparams&jid",substr(_infile_,3,length(_infile_)-4)); jparams='jparams'!!left(symget('jid'));
call symputx(jparams,substr(_infile_,3,length(_infile_)-4));
run; run;
%local joburi&jid; %local joburi&jid;
%let joburi&jid=0; /* used in next loop */ %let joburi&jid=0; /* used in next loop */
%end; %end;
%local concurrency; %local concurrency completed;
%let concurrency=0; %let concurrency=0;
%let completed=0;
proc sql; drop table &jdsrunning; proc sql; drop table &jdsrunning;
%do jid=1 %to &jcnt; %do jid=1 %to &jcnt;
/** /**
@@ -13187,16 +13213,48 @@ data;run;%let jdswaitfor=&syslast;
,paramstring=%superq(jparams&jid) ,paramstring=%superq(jparams&jid)
,outds=&jdsapp ,outds=&jdsapp
) )
proc append base=&jdsrunning data &jdsapp;
data=&jdsapp(where=(method='GET' and rel='state')); format jobparams $32767.;
set &jdsapp(where=(method='GET' and rel='state'));
jobparams=symget("jparams&jid");
call symputx("joburi&jid",uri,'l');
run;
proc append base=&jdsrunning data=&jdsapp;
run; run;
%let concurrency=%eval(&concurrency+1); %let concurrency=%eval(&concurrency+1);
%let joburi&jid=1; %end;
%else %if %sysfunc(exist(&outds))=1 %then %do;
/* check to see if the job has finished as was previously executed */
%local jobcheck; %let jobcheck=0;
proc sql noprint;
select count(*) into: jobcheck
from &outds where uri="&&joburi&jid";
%if &jobcheck>0 %then %do;
%let job&jid=0;
%put &&job&jid in flow &fid completed!;
%end;
%end; %end;
%end; %end;
%if &jid=&jcnt %then %do; %if &jid=&jcnt %then %do;
/* we are at the end of the loop - time to see which jobs have finished */ /* we are at the end of the loop - time to see which jobs have finished */
%mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor) %mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor)
%local done;
%let done=%mf_nobs(&jdswaitfor);
%if &done>0 %then %do;
%let completed=%eval(&completed+&done);
%let concurrency=%eval(&concurrency-&done);
data &jdsapp;
set &jdswaitfor;
flow_id=&&flow&fid;
run;
proc append base=&outds data=&jdsapp;
run;
%end;
/* loop again if jobs are left */
%if &completed < &jcnt %then %do;
%let jid=0;
%put looping flow &fid again - &completed jobs completed, &concurrency jobs running;
%end;
%end; %end;
%end; %end;
/* back up and execute the next flow */ /* back up and execute the next flow */
@@ -13338,9 +13396,11 @@ options noquotelenmax;
%let base_uri=%mf_getplatform(VIYARESTAPI); %let base_uri=%mf_getplatform(VIYARESTAPI);
data _null_; data _null_;
length jobparams $32767;
set &inds end=last; set &inds end=last;
call symputx(cats('joburi',_n_),uri,'l'); call symputx(cats('joburi',_n_),uri,'l');
call symputx(cats('jobname',_n_),_program,'l'); call symputx(cats('jobname',_n_),_program,'l');
call symputx(cats('jobparams',_n_),jobparams,'l');
if last then call symputx('uricnt',_n_,'l'); if last then call symputx('uricnt',_n_,'l');
run; run;
@@ -13353,7 +13413,7 @@ run;
%let fname0=%mf_getuniquefileref(); %let fname0=%mf_getuniquefileref();
data &outds; data &outds;
format _program uri $128. state $32. timestamp datetime19.; format _program uri $128. state $32. timestamp datetime19. jobparams $32767.;
stop; stop;
run; run;
@@ -13387,7 +13447,8 @@ run;
_program="&&jobname&i", _program="&&jobname&i",
uri="&&joburi&i", uri="&&joburi&i",
state="&status", state="&status",
timestamp=datetime(); timestamp=datetime(),
jobparams=symget("jobparams&i");
%let joburi&i=0; /* do not re-check */ %let joburi&i=0; /* do not re-check */
%end; %end;
%else %if &status=idle or &status=pending or &status=running %then %do; %else %if &status=idle or &status=pending or &status=running %then %do;

View File

@@ -132,9 +132,11 @@ options noquotelenmax;
%let base_uri=%mf_getplatform(VIYARESTAPI); %let base_uri=%mf_getplatform(VIYARESTAPI);
data _null_; data _null_;
length jobparams $32767;
set &inds end=last; set &inds end=last;
call symputx(cats('joburi',_n_),uri,'l'); call symputx(cats('joburi',_n_),uri,'l');
call symputx(cats('jobname',_n_),_program,'l'); call symputx(cats('jobname',_n_),_program,'l');
call symputx(cats('jobparams',_n_),jobparams,'l');
if last then call symputx('uricnt',_n_,'l'); if last then call symputx('uricnt',_n_,'l');
run; run;
@@ -147,7 +149,7 @@ run;
%let fname0=%mf_getuniquefileref(); %let fname0=%mf_getuniquefileref();
data &outds; data &outds;
format _program uri $128. state $32. timestamp datetime19.; format _program uri $128. state $32. timestamp datetime19. jobparams $32767.;
stop; stop;
run; run;
@@ -181,7 +183,8 @@ run;
_program="&&jobname&i", _program="&&jobname&i",
uri="&&joburi&i", uri="&&joburi&i",
state="&status", state="&status",
timestamp=datetime(); timestamp=datetime(),
jobparams=symget("jobparams&i");
%let joburi&i=0; /* do not re-check */ %let joburi&i=0; /* do not re-check */
%end; %end;
%else %if &status=idle or &status=pending or &status=running %then %do; %else %if &status=idle or &status=pending or &status=running %then %do;