mirror of
https://github.com/sasjs/core.git
synced 2026-01-03 23:50:06 +00:00
Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 03d9d805ff |
89
all.sas
89
all.sas
@@ -12984,6 +12984,8 @@ libname &libref;
|
|||||||
The input table is formed as per below. Each observation represents one job.
|
The input table is formed as per below. Each observation represents one job.
|
||||||
Each variable is converted into a macro variable with the same name.
|
Each variable is converted into a macro variable with the same name.
|
||||||
|
|
||||||
|
## Input table (minimum variables needed)
|
||||||
|
|
||||||
| variable| description |
|
| variable| description |
|
||||||
|---|---|---|
|
|---|---|---|
|
||||||
|FLOW_ID| Numeric value, provides sequential ordering capability|
|
|FLOW_ID| Numeric value, provides sequential ordering capability|
|
||||||
@@ -12991,6 +12993,14 @@ libname &libref;
|
|||||||
blank, will default to `SAS Job Execution compute context`.|
|
blank, will default to `SAS Job Execution compute context`.|
|
||||||
|_PROGRAM|Provides the path to the job itself|
|
|_PROGRAM|Provides the path to the job itself|
|
||||||
|
|
||||||
|
## Output table (minimum variables produced)
|
||||||
|
|
||||||
|
| variable| description |
|
||||||
|
|---|---|---|
|
||||||
|
|FLOW_ID| Numeric value, provides sequential ordering capability|
|
||||||
|
|_CONTEXTNAME|Dictates which context should be used to run the job. If
|
||||||
|
blank, will default to `SAS Job Execution compute context`.|
|
||||||
|
|_PROGRAM|Provides the path to the job itself|
|
||||||
|
|
||||||
## Example
|
## Example
|
||||||
|
|
||||||
@@ -13010,6 +13020,7 @@ libname &libref;
|
|||||||
rand=ranuni(0)*¯ovar1;
|
rand=ranuni(0)*¯ovar1;
|
||||||
do x=1 to rand;
|
do x=1 to rand;
|
||||||
y=rand*¯ovar2;
|
y=rand*¯ovar2;
|
||||||
|
if y=100 then abort;
|
||||||
output;
|
output;
|
||||||
end;
|
end;
|
||||||
run;
|
run;
|
||||||
@@ -13022,17 +13033,17 @@ libname &libref;
|
|||||||
data work.inputjobs;
|
data work.inputjobs;
|
||||||
_contextName='SAS Job Execution compute context';
|
_contextName='SAS Job Execution compute context';
|
||||||
do flow_id=1 to 3;
|
do flow_id=1 to 3;
|
||||||
do job=1 to 10;
|
do i=1 to 20;
|
||||||
_program='/Public/temp/demo1';
|
_program='/Public/temp/demo1';
|
||||||
macrovar1=10*job;
|
macrovar1=10*i;
|
||||||
macrovar2=4*job;
|
macrovar2=4*i;
|
||||||
output;
|
output;
|
||||||
|
i+1;
|
||||||
_program='/Public/temp/demo2';
|
_program='/Public/temp/demo2';
|
||||||
macrovar1=40*job;
|
macrovar1=40*i;
|
||||||
macrovar2=44*job;
|
macrovar2=44*i;
|
||||||
output;
|
output;
|
||||||
end;
|
end;
|
||||||
drop job;
|
|
||||||
end;
|
end;
|
||||||
run;
|
run;
|
||||||
|
|
||||||
@@ -13056,10 +13067,13 @@ libname &libref;
|
|||||||
@author Allan Bowe, source: https://github.com/sasjs/core
|
@author Allan Bowe, source: https://github.com/sasjs/core
|
||||||
|
|
||||||
<h4> SAS Macros </h4>
|
<h4> SAS Macros </h4>
|
||||||
|
@li mf_nobs.sas
|
||||||
@li mp_abort.sas
|
@li mp_abort.sas
|
||||||
@li mf_getplatform.sas
|
@li mf_getplatform.sas
|
||||||
@li mf_getuniquefileref.sas
|
@li mf_getuniquefileref.sas
|
||||||
@li mf_existvarlist.sas
|
@li mf_existvarlist.sas
|
||||||
|
@li mv_jobwaitfor.sas
|
||||||
|
@li mv_jobexecute.sas
|
||||||
|
|
||||||
**/
|
**/
|
||||||
|
|
||||||
@@ -13077,7 +13091,7 @@ libname &libref;
|
|||||||
%let oauth_bearer=oauth_bearer=sas_services;
|
%let oauth_bearer=oauth_bearer=sas_services;
|
||||||
%let &access_token_var=;
|
%let &access_token_var=;
|
||||||
%end;
|
%end;
|
||||||
%put &sysmacroname: grant_type=&grant_type;
|
|
||||||
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
%mp_abort(iftrue=(&grant_type ne authorization_code and &grant_type ne password
|
||||||
and &grant_type ne sas_services
|
and &grant_type ne sas_services
|
||||||
)
|
)
|
||||||
@@ -13109,6 +13123,16 @@ select count(*) into: missings
|
|||||||
,msg=%str(input dataset contains &missings missing values for FLOW_ID or _PROGRAM)
|
,msg=%str(input dataset contains &missings missing values for FLOW_ID or _PROGRAM)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
%if %mf_nobs(&inds)=0 %then %do;
|
||||||
|
%put No observations in &inds! Leaving macro &sysmacroname;
|
||||||
|
%return;
|
||||||
|
%end;
|
||||||
|
|
||||||
|
/* ensure output table is available */
|
||||||
|
data &outds;run;
|
||||||
|
proc sql;
|
||||||
|
drop table &outds;
|
||||||
|
|
||||||
options noquotelenmax;
|
options noquotelenmax;
|
||||||
%local base_uri; /* location of rest apis */
|
%local base_uri; /* location of rest apis */
|
||||||
%let base_uri=%mf_getplatform(VIYARESTAPI);
|
%let base_uri=%mf_getplatform(VIYARESTAPI);
|
||||||
@@ -13163,13 +13187,15 @@ data;run;%let jdswaitfor=&syslast;
|
|||||||
data _null_;
|
data _null_;
|
||||||
infile &jfref lrecl=32767;
|
infile &jfref lrecl=32767;
|
||||||
input;
|
input;
|
||||||
call symputx("jparams&jid",substr(_infile_,3,length(_infile_)-4));
|
jparams='jparams'!!left(symget('jid'));
|
||||||
|
call symputx(jparams,substr(_infile_,3,length(_infile_)-4));
|
||||||
run;
|
run;
|
||||||
%local joburi&jid;
|
%local joburi&jid;
|
||||||
%let joburi&jid=0; /* used in next loop */
|
%let joburi&jid=0; /* used in next loop */
|
||||||
%end;
|
%end;
|
||||||
%local concurrency;
|
%local concurrency completed;
|
||||||
%let concurrency=0;
|
%let concurrency=0;
|
||||||
|
%let completed=0;
|
||||||
proc sql; drop table &jdsrunning;
|
proc sql; drop table &jdsrunning;
|
||||||
%do jid=1 %to &jcnt;
|
%do jid=1 %to &jcnt;
|
||||||
/**
|
/**
|
||||||
@@ -13187,16 +13213,48 @@ data;run;%let jdswaitfor=&syslast;
|
|||||||
,paramstring=%superq(jparams&jid)
|
,paramstring=%superq(jparams&jid)
|
||||||
,outds=&jdsapp
|
,outds=&jdsapp
|
||||||
)
|
)
|
||||||
proc append base=&jdsrunning
|
data &jdsapp;
|
||||||
data=&jdsapp(where=(method='GET' and rel='state'));
|
format jobparams $32767.;
|
||||||
|
set &jdsapp(where=(method='GET' and rel='state'));
|
||||||
|
jobparams=symget("jparams&jid");
|
||||||
|
call symputx("joburi&jid",uri,'l');
|
||||||
|
run;
|
||||||
|
proc append base=&jdsrunning data=&jdsapp;
|
||||||
run;
|
run;
|
||||||
%let concurrency=%eval(&concurrency+1);
|
%let concurrency=%eval(&concurrency+1);
|
||||||
%let joburi&jid=1;
|
%end;
|
||||||
|
%else %if %sysfunc(exist(&outds))=1 %then %do;
|
||||||
|
/* check to see if the job has finished as was previously executed */
|
||||||
|
%local jobcheck; %let jobcheck=0;
|
||||||
|
proc sql noprint;
|
||||||
|
select count(*) into: jobcheck
|
||||||
|
from &outds where uri="&&joburi&jid";
|
||||||
|
%if &jobcheck>0 %then %do;
|
||||||
|
%let job&jid=0;
|
||||||
|
%put &&job&jid in flow &fid completed!;
|
||||||
|
%end;
|
||||||
%end;
|
%end;
|
||||||
%end;
|
%end;
|
||||||
%if &jid=&jcnt %then %do;
|
%if &jid=&jcnt %then %do;
|
||||||
/* we are at the end of the loop - time to see which jobs have finished */
|
/* we are at the end of the loop - time to see which jobs have finished */
|
||||||
%mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor)
|
%mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor)
|
||||||
|
%local done;
|
||||||
|
%let done=%mf_nobs(&jdswaitfor);
|
||||||
|
%if &done>0 %then %do;
|
||||||
|
%let completed=%eval(&completed+&done);
|
||||||
|
%let concurrency=%eval(&concurrency-&done);
|
||||||
|
data &jdsapp;
|
||||||
|
set &jdswaitfor;
|
||||||
|
flow_id=&&flow&fid;
|
||||||
|
run;
|
||||||
|
proc append base=&outds data=&jdsapp;
|
||||||
|
run;
|
||||||
|
%end;
|
||||||
|
/* loop again if jobs are left */
|
||||||
|
%if &completed < &jcnt %then %do;
|
||||||
|
%let jid=0;
|
||||||
|
%put looping flow &fid again - &completed jobs completed, &concurrency jobs running;
|
||||||
|
%end;
|
||||||
%end;
|
%end;
|
||||||
%end;
|
%end;
|
||||||
/* back up and execute the next flow */
|
/* back up and execute the next flow */
|
||||||
@@ -13338,9 +13396,11 @@ options noquotelenmax;
|
|||||||
%let base_uri=%mf_getplatform(VIYARESTAPI);
|
%let base_uri=%mf_getplatform(VIYARESTAPI);
|
||||||
|
|
||||||
data _null_;
|
data _null_;
|
||||||
|
length jobparams $32767;
|
||||||
set &inds end=last;
|
set &inds end=last;
|
||||||
call symputx(cats('joburi',_n_),uri,'l');
|
call symputx(cats('joburi',_n_),uri,'l');
|
||||||
call symputx(cats('jobname',_n_),_program,'l');
|
call symputx(cats('jobname',_n_),_program,'l');
|
||||||
|
call symputx(cats('jobparams',_n_),jobparams,'l');
|
||||||
if last then call symputx('uricnt',_n_,'l');
|
if last then call symputx('uricnt',_n_,'l');
|
||||||
run;
|
run;
|
||||||
|
|
||||||
@@ -13353,7 +13413,7 @@ run;
|
|||||||
%let fname0=%mf_getuniquefileref();
|
%let fname0=%mf_getuniquefileref();
|
||||||
|
|
||||||
data &outds;
|
data &outds;
|
||||||
format _program uri $128. state $32. timestamp datetime19.;
|
format _program uri $128. state $32. timestamp datetime19. jobparams $32767.;
|
||||||
stop;
|
stop;
|
||||||
run;
|
run;
|
||||||
|
|
||||||
@@ -13387,7 +13447,8 @@ run;
|
|||||||
_program="&&jobname&i",
|
_program="&&jobname&i",
|
||||||
uri="&&joburi&i",
|
uri="&&joburi&i",
|
||||||
state="&status",
|
state="&status",
|
||||||
timestamp=datetime();
|
timestamp=datetime(),
|
||||||
|
jobparams=symget("jobparams&i");
|
||||||
%let joburi&i=0; /* do not re-check */
|
%let joburi&i=0; /* do not re-check */
|
||||||
%end;
|
%end;
|
||||||
%else %if &status=idle or &status=pending or &status=running %then %do;
|
%else %if &status=idle or &status=pending or &status=running %then %do;
|
||||||
|
|||||||
@@ -132,9 +132,11 @@ options noquotelenmax;
|
|||||||
%let base_uri=%mf_getplatform(VIYARESTAPI);
|
%let base_uri=%mf_getplatform(VIYARESTAPI);
|
||||||
|
|
||||||
data _null_;
|
data _null_;
|
||||||
|
length jobparams $32767;
|
||||||
set &inds end=last;
|
set &inds end=last;
|
||||||
call symputx(cats('joburi',_n_),uri,'l');
|
call symputx(cats('joburi',_n_),uri,'l');
|
||||||
call symputx(cats('jobname',_n_),_program,'l');
|
call symputx(cats('jobname',_n_),_program,'l');
|
||||||
|
call symputx(cats('jobparams',_n_),jobparams,'l');
|
||||||
if last then call symputx('uricnt',_n_,'l');
|
if last then call symputx('uricnt',_n_,'l');
|
||||||
run;
|
run;
|
||||||
|
|
||||||
@@ -147,7 +149,7 @@ run;
|
|||||||
%let fname0=%mf_getuniquefileref();
|
%let fname0=%mf_getuniquefileref();
|
||||||
|
|
||||||
data &outds;
|
data &outds;
|
||||||
format _program uri $128. state $32. timestamp datetime19.;
|
format _program uri $128. state $32. timestamp datetime19. jobparams $32767.;
|
||||||
stop;
|
stop;
|
||||||
run;
|
run;
|
||||||
|
|
||||||
@@ -181,7 +183,8 @@ run;
|
|||||||
_program="&&jobname&i",
|
_program="&&jobname&i",
|
||||||
uri="&&joburi&i",
|
uri="&&joburi&i",
|
||||||
state="&status",
|
state="&status",
|
||||||
timestamp=datetime();
|
timestamp=datetime(),
|
||||||
|
jobparams=symget("jobparams&i");
|
||||||
%let joburi&i=0; /* do not re-check */
|
%let joburi&i=0; /* do not re-check */
|
||||||
%end;
|
%end;
|
||||||
%else %if &status=idle or &status=pending or &status=running %then %do;
|
%else %if &status=idle or &status=pending or &status=running %then %do;
|
||||||
|
|||||||
Reference in New Issue
Block a user