1
0
mirror of https://github.com/sasjs/core.git synced 2025-12-28 13:30:04 +00:00

Compare commits

...

8 Commits

Author SHA1 Message Date
Allan Bowe
779e4942c7 Merge pull request #26 from tmoody/fix/clean_exit_mv_jobflow_on_syscc
fix: early exit, with syscc, when submitted jobs fail within a flow
2021-05-12 16:31:06 +03:00
Trevor Moody
a69a1ac7f0 fix: removed invisible hexchars on blank lines 2021-05-12 14:18:44 +01:00
Trevor Moody
2a644d6c2b fix: corrected asser description 2021-05-12 14:01:49 +01:00
Trevor Moody
843930c666 chore: added tests for mv_jobflow 2021-05-12 13:59:21 +01:00
Trevor Moody
90d69af7ee feat: early exit, with syscc, when submitted jobs fail within a flow 2021-05-12 12:06:02 +01:00
Allan Bowe
b7bafb49f4 Merge pull request #25 from sasjs/dcfixes
fix: more logging in mp_abort, fixing job test, better return values …
2021-05-11 23:37:25 +03:00
Allan Bowe
2fa9e48286 chore: automated commit 2021-05-11 23:36:40 +03:00
Allan Bowe
5cee93c7bd fix: more logging in mp_abort, fixing job test, better return values in mp_filtervalidate and mp_filtercheck, further fixes in mp_jsonout 2021-05-11 23:08:54 +03:00
8 changed files with 329 additions and 139 deletions

View File

@@ -74,7 +74,7 @@
input;
i=1;
stoploop=0;
if _n_ ge &logline-5 and stoploop=0 then do until (i>12);
if _n_ ge &logline-15 and stoploop=0 then do until (i>22);
call symputx('logmsg',catx('\n',symget('logmsg'),_infile_));
input;
i+1;

View File

@@ -33,12 +33,13 @@
@returns The &outds table containing any bad rows, plus a REASON_CD column.
@param [in] inds The table to be checked, with the format above
@param [in] targetds= The target dataset against which to verify VARIABLE_NM
@param [in] targetds= The target dataset against which to verify VARIABLE_NM.
This must be available (ie, the library must be assigned).
@param [out] abort= (YES) If YES will call mp_abort.sas on any exceptions
@param [out] outds= The output table, which is a copy of the &inds. table
plus a REASON_CD column, containing only bad records. If bad records found,
the SYSCC value will be set to 1008 (general data problem). Downstream
processes should check this table (and return code) before continuing.
plus a REASON_CD column, containing only bad records. If bad records found,
the SYSCC value will be set to 1008 (general data problem). Downstream
processes should check this table (and return code) before continuing.
<h4> SAS Macros </h4>
@li mp_abort.sas
@@ -84,41 +85,52 @@
* quotes, commas, periods and spaces.
* Only numeric values should remain
*/
%local reason_cd;
%local reason_cd nobs;
%let nobs=0;
data &outds;
/*length GROUP_LOGIC SUBGROUP_LOGIC $3 SUBGROUP_ID 8 VARIABLE_NM $32
OPERATOR_NM $10 RAW_VALUE $4000;*/
set &inds;
length reason_cd $32;
length reason_cd $4032;
/* closed list checks */
if GROUP_LOGIC not in ('AND','OR') then do;
REASON_CD='GROUP_LOGIC should be either AND or OR';
REASON_CD='GROUP_LOGIC should be AND/OR, not:'!!cats(GROUP_LOGIC);
putlog REASON_CD= GROUP_LOGIC=;
call symputx('reason_cd',reason_cd,'l');
call symputx('nobs',_n_,'l');
output;
end;
if SUBGROUP_LOGIC not in ('AND','OR') then do;
REASON_CD='SUBGROUP_LOGIC should be either AND or OR';
REASON_CD='SUBGROUP_LOGIC should be AND/OR, not:'!!cats(SUBGROUP_LOGIC);
putlog REASON_CD= SUBGROUP_LOGIC=;
call symputx('reason_cd',reason_cd,'l');
call symputx('nobs',_n_,'l');
output;
end;
if mod(SUBGROUP_ID,1) ne 0 then do;
REASON_CD='SUBGROUP_ID should be integer';
REASON_CD='SUBGROUP_ID should be integer, not '!!left(subgroup_id);
putlog REASON_CD= SUBGROUP_ID=;
call symputx('reason_cd',reason_cd,'l');
call symputx('nobs',_n_,'l');
output;
end;
if upcase(VARIABLE_NM) not in
(%upcase(%mf_getvarlist(&targetds,dlm=%str(,),quote=SINGLE)))
then do;
REASON_CD="VARIABLE_NM not in &targetds";
REASON_CD="Variable "!!cats(variable_nm)!!" not in &targetds";
putlog REASON_CD= VARIABLE_NM=;
call symputx('reason_cd',reason_cd,'l');
call symputx('nobs',_n_,'l');
output;
end;
if OPERATOR_NM not in
('=','>','<','<=','>=','BETWEEN','IN','NOT IN','NE','CONTAINS')
then do;
REASON_CD='Invalid OPERATOR_NM';
REASON_CD='Invalid OPERATOR_NM: '!!left(OPERATOR_NM);
putlog REASON_CD= OPERATOR_NM=;
call symputx('reason_cd',reason_cd,'l');
call symputx('nobs',_n_,'l');
output;
end;
@@ -128,8 +140,10 @@ data &outds;
if substr(raw_value,1,1) ne '('
or substr(cats(reverse(raw_value)),1,1) ne ')'
then do;
REASON_CD='Missing brackets in RAW_VALUE';
REASON_CD='Missing start/end bracket in RAW_VALUE';
putlog REASON_CD= OPERATOR_NM= raw_value= raw_value1= ;
call symputx('reason_cd',reason_cd,'l');
call symputx('nobs',_n_,'l');
output;
end;
else raw_value1=substr(raw_value,2,max(length(raw_value)-2,0));
@@ -150,27 +164,24 @@ data &outds;
/* output records that contain values other than digits and spaces */
if notdigit(compress(raw_value3,' '))>0 then do;
putlog raw_value3= $hex32.;
REASON_CD='Invalid RAW_VALUE';
REASON_CD=cats('Invalid RAW_VALUE:',raw_value);
putlog REASON_CD= raw_value= raw_value1= raw_value2= raw_value3=;
call symputx('reason_cd',reason_cd,'l');
call symputx('nobs',_n_,'l');
output;
end;
run;
%local nobs;
%let nobs=0;
data _null_;
set &outds end=last;
putlog (_all_)(=);
if last then do;
call symputx('REASON_CD',reason_cd,'l');
call symputx('nobs',_n_,'l');
end;
run;
%mp_abort(iftrue=(&abort=YES and &nobs>0),
mac=&sysmacroname,
msg=%str(&nobs filter issues in &inds, reason: &reason_cd, details in &outds)
msg=%str(Data issue: %superq(reason_cd))
)
%if &nobs>0 %then %do;

View File

@@ -95,7 +95,8 @@ filename &fref1 clear;
run;
%mp_abort(
mac=&sysmacroname,
msg=%str(Filter issues in &inref: %quote(&reason_cd))
msg=%str(Filter validation issues. ERR=%superq(SYSERRORTEXT)
, WARN=%superq(SYSWARNINGTEXT) )
)
%end;
%let syscc=1008;

View File

@@ -61,7 +61,7 @@
)/*/STORE SOURCE*/;
%put output location=&jref;
%if &action=OPEN %then %do;
OPTIONS NOBOMFILE;
options nobomfile;
data _null_;file &jref encoding='utf-8';
put '{"START_DTTM" : "' "%sysfunc(datetime(),datetime20.3)" '"';
run;

View File

@@ -19,6 +19,7 @@
filename testref temp;
data _null_;
file testref;
put 'data;run;';
put 'endsas;';
run;
%mv_createjob(
@@ -31,7 +32,7 @@ run;
%mv_jobexecute(
path=&mcTestAppLoc/jobs/temp,
name=testjob,
outds=work.info,
outds=work.info
)
%* Wait for it to finish;
@@ -52,10 +53,15 @@ run;
data _null_;
infile mylog;
infile mylog end=eof;
input;
if index(_infile_,'endsas;') then call symputx('found',1);
else call symputx('found',0);
putlog _infile_;
retain found 0;
if index(_infile_,'endsas;') then do;
found=1;
call symputx('found',found);
end;
else if eof and found ne 1 then call symputx('found',0);
run;
%mp_assert(

View File

@@ -0,0 +1,78 @@
/**
@file
@brief Testing mv_jobflow macro
@details One of the remote jobs aborts with syscc>0 - test to
make sure this comes back to the calling session
<h4> SAS Macros </h4>
@li mp_assert.sas
@li mv_createjob.sas
@li mv_jobflow.sas
**/
/**
* Test Case 1
*/
filename testprog temp;
data _null_;
file testprog;
put '%put this is job: &_program;'
/ '%put this was run in flow &flow_id;'
/ 'data ;'
/ ' rval=rand("uniform");'
/ ' rand=rval*&macrovar1;'
/ ' do x=1 to rand;'
/ ' y=rand*&macrovar2;'
/ ' if (rval>0.50) then abort;'
/ ' else output;'
/ ' end;'
/ 'run;'
;
run;
%mv_createjob(path=/Public/temp,name=demo1,code=testprog)
%mv_createjob(path=/Public/temp,name=demo2,code=testprog)
data work.inputjobs;
_contextName='SAS Job Execution compute context';
do flow_id=1 to 2;
do i=1 to 4;
_program='/Public/temp/demo1';
macrovar1=10*i;
macrovar2=4*i;
output;
i+1;
_program='/Public/temp/demo2';
macrovar1=40*i;
macrovar2=44*i;
output;
end;
end;
run;
* Trigger the flow ;
%put NOTE: &=syscc;
%mv_jobflow(inds=work.inputjobs
,maxconcurrency=2
,outds=work.results
,outref=myjoblog
,raise_err=1
,mdebug=1
)
%put NOTE: &=syscc;
data _null_;
infile myjoblog;
input; put _infile_;
run;
%mp_assert(
iftrue=(&syscc ne 0),
desc=Check that non zero return code is returned if called job fails
)

View File

@@ -0,0 +1,74 @@
/**
@file
@brief Testing mv_jobflow macro
@details All jobs complete successfully with syscc = 0 - test to
make sure this comes back to the calling session
<h4> SAS Macros </h4>
@li mp_assert.sas
@li mv_createjob.sas
@li mv_jobflow.sas
**/
/**
* Test Case 1
*/
filename testprog temp;
data _null_;
file testprog;
put '%put this is job: &_program;'
/ '%put this was run in flow &flow_id;'
/ 'data ;'
/ ' rval=rand("uniform");'
/ ' rand=rval*&macrovar1;'
/ ' do x=1 to rand;'
/ ' y=rand*&macrovar2;'
/ ' output;'
/ ' end;'
/ 'run;'
;
run;
%mv_createjob(path=/Public/temp,name=demo1,code=testprog)
%mv_createjob(path=/Public/temp,name=demo2,code=testprog)
data work.inputjobs;
_contextName='SAS Job Execution compute context';
do flow_id=1 to 2;
do i=1 to 4;
_program='/Public/temp/demo1';
macrovar1=10*i;
macrovar2=4*i;
output;
i+1;
_program='/Public/temp/demo2';
macrovar1=40*i;
macrovar2=44*i;
output;
end;
end;
run;
* Trigger the flow ;
%put NOTE: &=syscc;
%mv_jobflow(inds=work.inputjobs
,maxconcurrency=2
,outds=work.results
,outref=myjoblog
,raise_err=1
,mdebug=1
)
%put NOTE: &=syscc;
data _null_;
infile myjoblog;
input; put _infile_;
run;
%mp_assert(
iftrue=(&syscc eq 0),
desc=Check that a zero return code is returned if no called job fails
)

View File

@@ -97,8 +97,8 @@
run;
@param [in] access_token_var= The global macro variable to contain the access
token
@param [in] access_token_var= The global macro variable to contain the
access token
@param [in] grant_type= valid values:
@li password
@li authorization_code
@@ -237,123 +237,143 @@ data;run;%let jdswaitfor=&syslast;
/* start loop */
%do fid=1 %to &flowcnt;
%put preparing job attributes for flow &&flow&fid;
%local jds jcnt;
data &jds(drop=_contextName _program);
set &inds(where=(flow_id=&&flow&fid));
if _contextName='' then _contextName="SAS Job Execution compute context";
call symputx(cats('job',_n_),_program,'l');
call symputx(cats('context',_n_),_contextName,'l');
call symputx('jcnt',_n_,'l');
run;
%put exporting job variables in json format;
%do jid=1 %to &jcnt;
data &jjson;
set &jds;
if _n_=&jid then do;
output;
stop;
end;
run;
proc json out=&jfref;
export &jjson / nosastags fmtnumeric;
run;
data _null_;
infile &jfref lrecl=32767;
input;
jparams='jparams'!!left(symget('jid'));
call symputx(jparams,substr(_infile_,3,length(_infile_)-4));
run;
%local jobuid&jid;
%let jobuid&jid=0; /* used in next loop */
%end;
%local concurrency completed;
%let concurrency=0;
%let completed=0;
proc sql; drop table &jdsrunning;
%do jid=1 %to &jcnt;
/**
* now we can execute the jobs up to the maxconcurrency setting
*/
%if "&&job&jid" ne "0" %then %do; /* this var is zero if job finished */
/* check to see if the job finished in the previous round */
%if %sysfunc(exist(&outds))=1 %then %do;
%local jobcheck; %let jobcheck=0;
proc sql noprint;
select count(*) into: jobcheck
from &outds where uuid="&&jobuid&jid";
%if &jobcheck>0 %then %do;
%put &&job&jid in flow &fid with uid &&jobuid&jid completed!;
%let job&jid=0;
%if not ( &raise_err and &syscc ) %then %do;
%put preparing job attributes for flow &&flow&fid;
%local jds jcnt;
data &jds(drop=_contextName _program);
set &inds(where=(flow_id=&&flow&fid));
if _contextName='' then _contextName="SAS Job Execution compute context";
call symputx(cats('job',_n_),_program,'l');
call symputx(cats('context',_n_),_contextName,'l');
call symputx('jcnt',_n_,'l');
run;
%put exporting job variables in json format;
%do jid=1 %to &jcnt;
data &jjson;
set &jds;
if _n_=&jid then do;
output;
stop;
end;
run;
proc json out=&jfref;
export &jjson / nosastags fmtnumeric;
run;
data _null_;
infile &jfref lrecl=32767;
input;
jparams='jparams'!!left(symget('jid'));
call symputx(jparams,substr(_infile_,3,length(_infile_)-4));
run;
%local jobuid&jid;
%let jobuid&jid=0; /* used in next loop */
%end;
%local concurrency completed;
%let concurrency=0;
%let completed=0;
proc sql; drop table &jdsrunning;
%do jid=1 %to &jcnt;
/**
* now we can execute the jobs up to the maxconcurrency setting
*/
%if "&&job&jid" ne "0" %then %do; /* this var is zero if job finished */
/* check to see if the job finished in the previous round */
%if %sysfunc(exist(&outds))=1 %then %do;
%local jobcheck; %let jobcheck=0;
proc sql noprint;
select count(*) into: jobcheck
from &outds where uuid="&&jobuid&jid";
%if &jobcheck>0 %then %do;
%put &&job&jid in flow &fid with uid &&jobuid&jid completed!;
%let job&jid=0;
%end;
%end;
/* check if job was triggered and, if
so, if we have enough slots to run? */
%if ("&&jobuid&jid"="0") and (&concurrency<&maxconcurrency) %then %do;
/* But only start if no issues detected so far */
%if not ( &raise_err and &syscc ) %then %do;
%local jobname jobpath;
%let jobname=%scan(&&job&jid,-1,/);
%let jobpath=
%substr(&&job&jid,1,%length(&&job&jid)-%length(&jobname)-1);
%put executing &jobpath/&jobname with paramstring &&jparams&jid;
%mv_jobexecute(path=&jobpath
,name=&jobname
,paramstring=%superq(jparams&jid)
,outds=&jdsapp
)
data &jdsapp;
format jobparams $32767.;
set &jdsapp(where=(method='GET' and rel='state'));
jobparams=symget("jparams&jid");
/* uri here has the /state suffix */
uuid=scan(uri,-2,'/');
call symputx("jobuid&jid",uuid,'l');
run;
proc append base=&jdsrunning data=&jdsapp;
run;
%let concurrency=%eval(&concurrency+1);
/* sleep one second after every request to smooth the impact */
data _null_;
call sleep(1,1);
run;
%end;
%else %do; /* Job was skipped due to problems */
%put jobid &&job&jid in flow &fid skipped due to SYSCC (&syscc);
%let completed = %eval(&completed+1);
%let job&jid=0; /* Indicate job has finished */
%end;
%end;
%end;
%if &jid=&jcnt %then %do;
/* we are at the end of the loop - check which jobs have finished */
%mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor,outref=&outref
,raise_err=&raise_err,mdebug=&mdebug)
%local done;
%let done=%mf_nobs(&jdswaitfor);
%if &done>0 %then %do;
%let completed=%eval(&completed+&done);
%let concurrency=%eval(&concurrency-&done);
data &jdsapp;
set &jdswaitfor;
flow_id=&&flow&fid;
uuid=scan(uri,-1,'/');
run;
proc append base=&outds data=&jdsapp;
run;
%end;
proc sql;
delete from &jdsrunning
where uuid in (select uuid from &outds
where state in ('canceled','completed','failed')
);
/* check if job was triggered and if so, if we have enough slots to run */
%if "&&jobuid&jid"="0" and &concurrency<&maxconcurrency %then %do;
%local jobname jobpath;
%let jobname=%scan(&&job&jid,-1,/);
%let jobpath=
%substr(&&job&jid,1,%length(&&job&jid)-%length(&jobname)-1);
%put executing &jobpath/&jobname with paramstring &&jparams&jid;
%mv_jobexecute(path=&jobpath
,name=&jobname
,paramstring=%superq(jparams&jid)
,outds=&jdsapp
,mdebug=&mdebug
)
data &jdsapp;
format jobparams $32767.;
set &jdsapp(where=(method='GET' and rel='state'));
jobparams=symget("jparams&jid");
/* uri here has the /state suffix */
uuid=scan(uri,-2,'/');
call symputx("jobuid&jid",uuid,'l');
run;
proc append base=&jdsrunning data=&jdsapp;
run;
%let concurrency=%eval(&concurrency+1);
/* sleep one second after every request to smooth the impact */
data _null_;
call sleep(1,1);
run;
/* loop again if jobs are left */
%if &completed < &jcnt %then %do;
%let jid=0;
%put looping flow &fid again;
%put &completed of &jcnt jobs completed, &concurrency jobs running;
%end;
%end;
%end;
%if &jid=&jcnt %then %do;
/* we are at the end of the loop - time to see which jobs have finished */
%mv_jobwaitfor(ANY
,inds=&jdsrunning
,outds=&jdswaitfor
,outref=&outref
,raise_err=&raise_err
,mdebug=&mdebug
)
%local done;
%let done=%mf_nobs(&jdswaitfor);
%if &done>0 %then %do;
%let completed=%eval(&completed+&done);
%let concurrency=%eval(&concurrency-&done);
data &jdsapp;
set &jdswaitfor;
flow_id=&&flow&fid;
uuid=scan(uri,-1,'/');
run;
proc append base=&outds data=&jdsapp;
run;
%end;
proc sql;
delete from &jdsrunning
where uuid in (select uuid from &outds
where state in ('canceled','completed','failed')
);
/* loop again if jobs are left */
%if &completed < &jcnt %then %do;
%let jid=0;
%put looping flow &fid again - &completed of &jcnt jobs completed,
&concurrency jobs running;
%end;
%end;
%end;
%else %do;
%put Flow &&flow&fid skipped due to SYSCC (&syscc);
%end;
/* back up and execute the next flow */
%end;