From eeb25fa5bc05d8cf65dcba2f7a99e305261ba3c5 Mon Sep 17 00:00:00 2001 From: Trevor Moody Date: Wed, 17 Mar 2021 16:27:10 +0000 Subject: [PATCH 1/2] fix: Closes issue #8. Optionally raise SYSCC and exit early on job not completing successfully --- viya/mv_jobflow.sas | 6 +++++- viya/mv_jobwaitfor.sas | 35 +++++++++++++++++++++++++++++------ 2 files changed, 34 insertions(+), 7 deletions(-) diff --git a/viya/mv_jobflow.sas b/viya/mv_jobflow.sas index aa4f1af..9ae375a 100644 --- a/viya/mv_jobflow.sas +++ b/viya/mv_jobflow.sas @@ -106,6 +106,8 @@ @li sas_services - will use oauth_bearer=sas_services @param [in] inds= The input dataset containing a list of jobs and parameters @param [in] maxconcurrency= The max number of parallel jobs to run. Default=8. + @param [in] raise_err=0 Set to 1 to raise SYSCC when a job does not complete + succcessfully @param [in] mdebug= set to 1 to enable DEBUG messages @param [out] outds= The output dataset containing the results @param [out] outref= The output fileref to which to append the log file(s). @@ -129,6 +131,7 @@ ,access_token_var=ACCESS_TOKEN ,grant_type=sas_services ,outref=0 + ,raise_error=0 ,mdebug=0 ); %local oauth_bearer; @@ -307,7 +310,8 @@ data;run;%let jdswaitfor=&syslast; %end; %if &jid=&jcnt %then %do; /* we are at the end of the loop - time to see which jobs have finished */ - %mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor,outref=&outref) + %mv_jobwaitfor(ANY,inds=&jdsrunning,outds=&jdswaitfor,outref=&outref + ,raise_err=&raise_err) %local done; %let done=%mf_nobs(&jdswaitfor); %if &done>0 %then %do; diff --git a/viya/mv_jobwaitfor.sas b/viya/mv_jobwaitfor.sas index 5875368..9820903 100644 --- a/viya/mv_jobwaitfor.sas +++ b/viya/mv_jobwaitfor.sas @@ -70,6 +70,8 @@ following format: `/jobExecution/jobs/&JOBID./state` and the corresponding job name. The uri should be in a `uri` variable, and the job path/name should be in a `_program` variable. + @param [in] raise_err=0 Set to 1 to raise SYSCC when a job does not complete + succcessfully @param [out] outds= The output dataset containing the list of states by job (default=work.mv_jobexecute) @param [out] outref= A fileref to which the spawned job logs should be appended. @@ -81,6 +83,7 @@ @li mp_abort.sas @li mf_getplatform.sas @li mf_getuniquefileref.sas + @li mf_getuniquelibref.sas @li mf_existvar.sas @li mf_nobs.sas @li mv_getjoblog.sas @@ -93,6 +96,7 @@ ,inds=0 ,outds=work.mv_jobwaitfor ,outref=0 + ,raise_err=0 ); %local oauth_bearer; %if &grant_type=detect %then %do; @@ -136,7 +140,7 @@ options noquotelenmax; data _null_; length jobparams $32767; set &inds end=last; - call symputx(cats('joburi',_n_),substr(uri,1,55)!!'/state','l'); + call symputx(cats('joburi',_n_),substr(uri,1,55),'l'); call symputx(cats('jobname',_n_),_program,'l'); call symputx(cats('jobparams',_n_),jobparams,'l'); if last then call symputx('uricnt',_n_,'l'); @@ -151,7 +155,7 @@ run; %let fname0=%mf_getuniquefileref(); data &outds; - format _program uri $128. state $32. timestamp datetime19. jobparams $32767.; + format _program uri $128. state $32. stateDetails $32. timestamp datetime19. jobparams $32767.; stop; run; @@ -159,7 +163,7 @@ run; %do i=1 %to &uricnt; %if "&&joburi&i" ne "0" %then %do; proc http method='GET' out=&fname0 &oauth_bearer url="&base_uri/&&joburi&i"; - headers "Accept"="text/plain" + headers "Accept"="application/json" %if &grant_type=authorization_code %then %do; "Authorization"="Bearer &&&access_token_var" %end; ; @@ -173,12 +177,20 @@ run; %end; %let status=notset; + + %local libref1; + %let libref1=%mf_getuniquelibref(); + libname &libref1 json fileref=&fname0; + data _null_; - infile &fname0; - input; - call symputx('status',_infile_,'l'); + length state stateDetails $32; + set &libref1..root; + call symputx('status',state,'l'); + call symputx('stateDetails',stateDetails,'l'); run; + libname &libref1 clear; + %if &status=completed or &status=failed or &status=canceled %then %do; %local plainuri; %let plainuri=%substr(&&joburi&i,1,55); @@ -187,6 +199,7 @@ run; _program="&&jobname&i", uri="&plainuri", state="&status", + stateDetails=symget("stateDetails"), timestamp=datetime(), jobparams=symget("jobparams&i"); %let joburi&i=0; /* do not re-check */ @@ -205,6 +218,16 @@ run; ,msg=%str(status &status not expected!!) ) %end; + + %if (&raise_err) %then %do; + %if (&status = canceled or &status = failed or %length(&stateDetails)>0) %then %do; + %if ("&stateDetails" = "%str(war)ning") %then %let SYSCC=4; + %else %let SYSCC=5; + %put %str(ERR)OR: Job &&jobname&i. did not complete successfully. &stateDetails; + %return; + %end; + %end; + %end; %if &i=&uricnt %then %do; %local goback; From 6fc8408988da4d1068bea0cf5cefd3c9b608c4d3 Mon Sep 17 00:00:00 2001 From: Trevor Moody Date: Wed, 17 Mar 2021 16:31:47 +0000 Subject: [PATCH 2/2] fix: Syntax correction for raise_err parameter --- viya/mv_jobflow.sas | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/viya/mv_jobflow.sas b/viya/mv_jobflow.sas index 9ae375a..6b4142a 100644 --- a/viya/mv_jobflow.sas +++ b/viya/mv_jobflow.sas @@ -131,7 +131,7 @@ ,access_token_var=ACCESS_TOKEN ,grant_type=sas_services ,outref=0 - ,raise_error=0 + ,raise_err=0 ,mdebug=0 ); %local oauth_bearer;