mirror of
https://github.com/sasjs/adapter.git
synced 2026-04-21 21:21:31 +00:00
feat(compute-api): implement job execution via compute API
This commit is contained in:
+222
-37
@@ -7,6 +7,8 @@ import {
|
||||
import * as NodeFormData from "form-data";
|
||||
import * as path from "path";
|
||||
import { Job, Session, Context, Folder, CsrfToken } from "./types";
|
||||
import { JobDefinition } from "./types/JobDefinition";
|
||||
import { formatDataForRequest } from "./utils/formatDataForRequest";
|
||||
|
||||
/**
|
||||
* A client for interfacing with the SAS Viya REST API
|
||||
@@ -68,7 +70,7 @@ export class SASViyaApiClient {
|
||||
if (accessToken) {
|
||||
headers.Authorization = `Bearer ${accessToken}`;
|
||||
}
|
||||
const contexts = await this.request<{ items: Context[] }>(
|
||||
const { result: contexts } = await this.request<{ items: Context[] }>(
|
||||
`${this.serverUrl}/compute/contexts`,
|
||||
{ headers }
|
||||
);
|
||||
@@ -93,7 +95,7 @@ export class SASViyaApiClient {
|
||||
if (accessToken) {
|
||||
headers.Authorization = `Bearer ${accessToken}`;
|
||||
}
|
||||
const contexts = await this.request<{ items: Context[] }>(
|
||||
const { result: contexts } = await this.request<{ items: Context[] }>(
|
||||
`${this.serverUrl}/compute/contexts`,
|
||||
{ headers }
|
||||
);
|
||||
@@ -153,7 +155,7 @@ export class SASViyaApiClient {
|
||||
headers.Authorization = `Bearer ${accessToken}`;
|
||||
}
|
||||
|
||||
const contexts = await this.request<{ items: Context[] }>(
|
||||
const { result: contexts } = await this.request<{ items: Context[] }>(
|
||||
`${this.serverUrl}/compute/contexts`,
|
||||
{ headers }
|
||||
);
|
||||
@@ -172,7 +174,7 @@ export class SASViyaApiClient {
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
};
|
||||
const createdSession = this.request<Session>(
|
||||
const { result: createdSession } = await this.request<Session>(
|
||||
`${this.serverUrl}/compute/contexts/${executionContext.id}/sessions`,
|
||||
createSessionRequest
|
||||
);
|
||||
@@ -190,12 +192,14 @@ export class SASViyaApiClient {
|
||||
* @param silent - optional flag to turn of logging.
|
||||
*/
|
||||
public async executeScript(
|
||||
fileName: string,
|
||||
jobName: string,
|
||||
linesOfCode: string[],
|
||||
contextName: string,
|
||||
accessToken?: string,
|
||||
sessionId = "",
|
||||
silent = false
|
||||
silent = false,
|
||||
data = null,
|
||||
debug = false
|
||||
) {
|
||||
const headers: any = {
|
||||
"Content-Type": "application/json",
|
||||
@@ -206,7 +210,7 @@ export class SASViyaApiClient {
|
||||
if (this.csrfToken) {
|
||||
headers[this.csrfToken.headerName] = this.csrfToken.value;
|
||||
}
|
||||
const contexts = await this.request<{ items: Context[] }>(
|
||||
const { result: contexts } = await this.request<{ items: Context[] }>(
|
||||
`${this.serverUrl}/compute/contexts`,
|
||||
{ headers }
|
||||
);
|
||||
@@ -225,13 +229,52 @@ export class SASViyaApiClient {
|
||||
method: "POST",
|
||||
headers,
|
||||
};
|
||||
const createdSession = await this.request<Session>(
|
||||
const { result: createdSession, etag } = await this.request<Session>(
|
||||
`${this.serverUrl}/compute/contexts/${executionContext.id}/sessions`,
|
||||
createSessionRequest
|
||||
);
|
||||
|
||||
await this.waitForSession(createdSession, etag);
|
||||
|
||||
executionSessionId = createdSession.id;
|
||||
}
|
||||
|
||||
let jobArguments: { [key: string]: any } = {
|
||||
_contextName: contextName,
|
||||
_OMITJSONLISTING: true,
|
||||
_OMITJSONLOG: true,
|
||||
_OMITSESSIONRESULTS: true,
|
||||
_OMITTEXTLISTING: true,
|
||||
_OMITTEXTLOG: true,
|
||||
};
|
||||
|
||||
if (debug) {
|
||||
jobArguments["_OMITTEXTLOG"] = false;
|
||||
jobArguments["_OMITSESSIONRESULTS"] = false;
|
||||
jobArguments["_DEBUG"] = 131;
|
||||
}
|
||||
|
||||
const fileName = `exec-${
|
||||
jobName.includes("/") ? jobName.split("/")[1] : jobName
|
||||
}`;
|
||||
|
||||
let jobVariables: any = { SYS_JES_JOB_URI: "", _program: jobName };
|
||||
let files: any[] = [];
|
||||
if (data) {
|
||||
if (JSON.stringify(data).includes(";")) {
|
||||
files = await this.uploadTables(data, accessToken);
|
||||
jobVariables["_webin_file_count"] = files.length;
|
||||
files.forEach((fileInfo, index) => {
|
||||
jobVariables[
|
||||
`_webin_fileuri${index + 1}`
|
||||
] = `/files/files/${fileInfo.file.id}`;
|
||||
jobVariables[`_webin_name${index + 1}`] = fileInfo.tableName;
|
||||
});
|
||||
} else {
|
||||
jobVariables = { ...jobVariables, ...formatDataForRequest(data) };
|
||||
}
|
||||
}
|
||||
|
||||
// Execute job in session
|
||||
const postJobRequest = {
|
||||
method: "POST",
|
||||
@@ -240,9 +283,11 @@ export class SASViyaApiClient {
|
||||
name: fileName,
|
||||
description: "Powered by SASjs",
|
||||
code: linesOfCode,
|
||||
variables: jobVariables,
|
||||
arguments: jobArguments,
|
||||
}),
|
||||
};
|
||||
const postedJob = await this.request<Job>(
|
||||
const { result: postedJob, etag } = await this.request<Job>(
|
||||
`${this.serverUrl}/compute/sessions/${executionSessionId}/jobs`,
|
||||
postJobRequest
|
||||
);
|
||||
@@ -255,18 +300,42 @@ export class SASViyaApiClient {
|
||||
);
|
||||
}
|
||||
|
||||
const jobStatus = await this.pollJobState(postedJob, accessToken, silent);
|
||||
const logLink = postedJob.links.find((l: any) => l.rel === "log");
|
||||
if (logLink) {
|
||||
const log = await this.request(
|
||||
`${this.serverUrl}${logLink.href}?limit=100000`,
|
||||
const jobStatus = await this.pollJobState(
|
||||
postedJob,
|
||||
etag,
|
||||
accessToken,
|
||||
silent
|
||||
);
|
||||
const { result: currentJob } = await this.request<Job>(
|
||||
`${this.serverUrl}/compute/sessions/${executionSessionId}/jobs/${postedJob.id}`,
|
||||
{ headers }
|
||||
);
|
||||
|
||||
let jobResult, log;
|
||||
if (jobStatus === "failed" || jobStatus === "error") {
|
||||
return Promise.reject(currentJob.error);
|
||||
}
|
||||
const resultLink = `/compute/sessions/${executionSessionId}/filerefs/_webout/content`;
|
||||
const logLink = currentJob.links.find((l) => l.rel === "log");
|
||||
if (resultLink) {
|
||||
jobResult = await this.request<any>(
|
||||
`${this.serverUrl}${resultLink}`,
|
||||
{ headers },
|
||||
"text"
|
||||
);
|
||||
}
|
||||
|
||||
if (true && logLink) {
|
||||
log = await this.request<any>(
|
||||
`${this.serverUrl}${logLink.href}/content`,
|
||||
{
|
||||
headers,
|
||||
}
|
||||
).then((res: any) =>
|
||||
res.result.items.map((i: any) => i.line).join("\n")
|
||||
);
|
||||
|
||||
return { jobStatus, log };
|
||||
}
|
||||
return { result: jobResult?.result, log };
|
||||
} else {
|
||||
console.error(
|
||||
`Unable to find execution context ${contextName}.\nPlease check the contextName in the tgtDeployVars and try again.`
|
||||
@@ -334,7 +403,7 @@ export class SASViyaApiClient {
|
||||
createFolderRequest.headers.Authorization = `Bearer ${accessToken}`;
|
||||
}
|
||||
|
||||
const createFolderResponse = await this.request<Folder>(
|
||||
const { result: createFolderResponse } = await this.request<Folder>(
|
||||
`${this.serverUrl}/folders/folders?parentFolderUri=${parentFolderUri}`,
|
||||
createFolderRequest
|
||||
);
|
||||
@@ -556,6 +625,71 @@ export class SASViyaApiClient {
|
||||
return deleteResponse;
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes a job via the SAS Viya Compute API
|
||||
* @param sasJob - the relative path to the job.
|
||||
* @param contextName - the name of the context where the job is to be executed.
|
||||
* @param debug - sets the _debug flag in the job arguments.
|
||||
* @param data - any data to be passed in as input to the job.
|
||||
* @param accessToken - an optional access token for an authorized user.
|
||||
*/
|
||||
public async executeComputeJob(
|
||||
sasJob: string,
|
||||
contextName: string,
|
||||
debug: boolean,
|
||||
data?: any,
|
||||
accessToken?: string
|
||||
) {
|
||||
if (!this.rootFolder) {
|
||||
await this.populateRootFolder(accessToken);
|
||||
}
|
||||
|
||||
if (!this.rootFolder) {
|
||||
throw new Error("Root folder was not found");
|
||||
}
|
||||
if (!this.rootFolderMap.size) {
|
||||
await this.populateRootFolderMap(accessToken);
|
||||
}
|
||||
if (!this.rootFolderMap.size) {
|
||||
throw new Error(
|
||||
`The job ${sasJob} was not found in ${this.rootFolderName}`
|
||||
);
|
||||
}
|
||||
|
||||
const headers: any = { "Content-Type": "application/json" };
|
||||
if (!!accessToken) {
|
||||
headers.Authorization = `Bearer ${accessToken}`;
|
||||
}
|
||||
|
||||
const folderName = sasJob.split("/")[0];
|
||||
const jobName = sasJob.split("/")[1];
|
||||
const jobFolder = this.rootFolderMap.get(folderName);
|
||||
const jobToExecute = jobFolder?.find((item) => item.name === jobName);
|
||||
const jobDefinitionLink = jobToExecute?.links.find(
|
||||
(l) => l.rel === "getResource"
|
||||
);
|
||||
if (!jobDefinitionLink) {
|
||||
throw new Error("Job definition URI was not found.");
|
||||
}
|
||||
const { result: jobDefinition } = await this.request<JobDefinition>(
|
||||
`${this.serverUrl}${jobDefinitionLink.href}`,
|
||||
headers
|
||||
);
|
||||
const linesToExecute = jobDefinition.code
|
||||
.replace(/\r\n/g, "\n")
|
||||
.split("\n");
|
||||
return await this.executeScript(
|
||||
sasJob,
|
||||
linesToExecute,
|
||||
contextName,
|
||||
accessToken,
|
||||
"",
|
||||
false,
|
||||
data,
|
||||
debug
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes a job via the SAS Viya Job Execution API
|
||||
* @param sasJob - the relative path to the job.
|
||||
@@ -590,7 +724,6 @@ export class SASViyaApiClient {
|
||||
let files: any[] = [];
|
||||
if (data && Object.keys(data).length) {
|
||||
files = await this.uploadTables(data, accessToken);
|
||||
console.log("Uploaded table files: ", files);
|
||||
}
|
||||
const jobName = path.basename(sasJob);
|
||||
const jobFolder = sasJob.replace(`/${jobName}`, "");
|
||||
@@ -608,7 +741,7 @@ export class SASViyaApiClient {
|
||||
headers.Authorization = `Bearer ${accessToken}`;
|
||||
}
|
||||
requestInfo.headers = headers;
|
||||
const jobDefinition = await this.request<Job>(
|
||||
const { result: jobDefinition } = await this.request<Job>(
|
||||
`${this.serverUrl}${jobDefinitionLink}`,
|
||||
requestInfo
|
||||
);
|
||||
@@ -647,24 +780,29 @@ export class SASViyaApiClient {
|
||||
arguments: jobArguments,
|
||||
}),
|
||||
};
|
||||
const postedJob = await this.request<Job>(
|
||||
const { result: postedJob, etag } = await this.request<Job>(
|
||||
`${this.serverUrl}/jobExecution/jobs?_action=wait`,
|
||||
postJobRequest
|
||||
);
|
||||
const jobStatus = await this.pollJobState(postedJob, accessToken, true);
|
||||
const currentJob = await this.request<Job>(
|
||||
const jobStatus = await this.pollJobState(
|
||||
postedJob,
|
||||
etag,
|
||||
accessToken,
|
||||
true
|
||||
);
|
||||
const { result: currentJob } = await this.request<Job>(
|
||||
`${this.serverUrl}/jobExecution/jobs/${postedJob.id}`,
|
||||
{ headers }
|
||||
);
|
||||
|
||||
let result, log;
|
||||
let jobResult, log;
|
||||
if (jobStatus === "failed") {
|
||||
return Promise.reject(currentJob.error);
|
||||
}
|
||||
const resultLink = currentJob.results["_webout.json"];
|
||||
const logLink = currentJob.links.find((l) => l.rel === "log");
|
||||
if (resultLink) {
|
||||
result = await this.request<any>(
|
||||
jobResult = await this.request<any>(
|
||||
`${this.serverUrl}${resultLink}/content`,
|
||||
{ headers },
|
||||
"text"
|
||||
@@ -676,9 +814,11 @@ export class SASViyaApiClient {
|
||||
{
|
||||
headers,
|
||||
}
|
||||
).then((res: any) => res.items.map((i: any) => i.line).join("\n"));
|
||||
).then((res: any) =>
|
||||
res.result.items.map((i: any) => i.line).join("\n")
|
||||
);
|
||||
}
|
||||
return { result, log };
|
||||
return { result: jobResult?.result, log };
|
||||
} else {
|
||||
throw new Error(
|
||||
`The job ${sasJob} was not found at the location ${this.rootFolderName}`
|
||||
@@ -695,14 +835,14 @@ export class SASViyaApiClient {
|
||||
if (accessToken) {
|
||||
requestInfo.headers = { Authorization: `Bearer ${accessToken}` };
|
||||
}
|
||||
const folder = await this.request<Folder>(
|
||||
const { result: folder } = await this.request<Folder>(
|
||||
`${this.serverUrl}${url}`,
|
||||
requestInfo
|
||||
);
|
||||
if (!folder) {
|
||||
throw new Error("Cannot populate RootFolderMap unless rootFolder exists");
|
||||
}
|
||||
const members = await this.request<{ items: any[] }>(
|
||||
const { result: members } = await this.request<{ items: any[] }>(
|
||||
`${this.serverUrl}/folders/folders/${folder.id}/members`,
|
||||
requestInfo
|
||||
);
|
||||
@@ -717,7 +857,7 @@ export class SASViyaApiClient {
|
||||
this.rootFolderName +
|
||||
"/" +
|
||||
member.name;
|
||||
const memberDetail = await this.request<Folder>(
|
||||
const { result: memberDetail } = await this.request<Folder>(
|
||||
`${this.serverUrl}${subFolderUrl}`,
|
||||
requestInfo
|
||||
);
|
||||
@@ -726,7 +866,7 @@ export class SASViyaApiClient {
|
||||
(l: any) => l.rel === "members"
|
||||
);
|
||||
|
||||
const memberContents = await this.request<{ items: any[] }>(
|
||||
const { result: memberContents } = await this.request<{ items: any[] }>(
|
||||
`${this.serverUrl}${membersLink!.href}`,
|
||||
requestInfo
|
||||
);
|
||||
@@ -752,26 +892,28 @@ export class SASViyaApiClient {
|
||||
requestInfo
|
||||
).catch(() => null);
|
||||
|
||||
this.rootFolder = rootFolder;
|
||||
this.rootFolder = rootFolder?.result || null;
|
||||
}
|
||||
|
||||
private async pollJobState(
|
||||
postedJob: any,
|
||||
etag: string | null,
|
||||
accessToken?: string,
|
||||
silent = false
|
||||
) {
|
||||
const MAX_POLL_COUNT = 1000;
|
||||
const POLL_INTERVAL = 300;
|
||||
const POLL_INTERVAL = 100;
|
||||
let postedJobState = "";
|
||||
let pollCount = 0;
|
||||
const headers: any = {
|
||||
"Content-Type": "application/json",
|
||||
"If-None-Match": etag,
|
||||
};
|
||||
if (accessToken) {
|
||||
headers.Authorization = `Bearer ${accessToken}`;
|
||||
}
|
||||
const stateLink = postedJob.links.find((l: any) => l.rel === "state");
|
||||
return new Promise((resolve, _) => {
|
||||
return new Promise(async (resolve, _) => {
|
||||
const interval = setInterval(async () => {
|
||||
if (
|
||||
postedJobState === "running" ||
|
||||
@@ -782,8 +924,8 @@ export class SASViyaApiClient {
|
||||
if (!silent) {
|
||||
console.log("Polling job status... \n");
|
||||
}
|
||||
const jobState = await this.request<string>(
|
||||
`${this.serverUrl}${stateLink.href}?wait=30`,
|
||||
const { result: jobState } = await this.request<string>(
|
||||
`${this.serverUrl}${stateLink.href}?_action=wait&wait=30`,
|
||||
{
|
||||
headers,
|
||||
},
|
||||
@@ -807,6 +949,49 @@ export class SASViyaApiClient {
|
||||
});
|
||||
}
|
||||
|
||||
private async waitForSession(
|
||||
session: Session,
|
||||
etag: string | null,
|
||||
accessToken?: string,
|
||||
silent = false
|
||||
) {
|
||||
let sessionState = session.state;
|
||||
let pollCount = 0;
|
||||
const headers: any = {
|
||||
"Content-Type": "application/json",
|
||||
"If-None-Match": etag,
|
||||
};
|
||||
if (accessToken) {
|
||||
headers.Authorization = `Bearer ${accessToken}`;
|
||||
}
|
||||
const stateLink = session.links.find((l: any) => l.rel === "state");
|
||||
return new Promise(async (resolve, _) => {
|
||||
if (sessionState === "pending") {
|
||||
if (stateLink) {
|
||||
if (!silent) {
|
||||
console.log("Polling session status... \n");
|
||||
}
|
||||
const { result: state } = await this.request<string>(
|
||||
`${this.serverUrl}${stateLink.href}?wait=30`,
|
||||
{
|
||||
headers,
|
||||
},
|
||||
"text"
|
||||
);
|
||||
|
||||
sessionState = state.trim();
|
||||
if (!silent) {
|
||||
console.log(`Current state: ${sessionState}\n`);
|
||||
}
|
||||
pollCount++;
|
||||
resolve(sessionState);
|
||||
}
|
||||
} else {
|
||||
resolve(sessionState);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private async uploadTables(data: any, accessToken?: string) {
|
||||
const uploadedFiles = [];
|
||||
const headers: any = {
|
||||
@@ -830,7 +1015,7 @@ export class SASViyaApiClient {
|
||||
headers,
|
||||
};
|
||||
|
||||
const file = await this.request<any>(
|
||||
const { result: file } = await this.request<any>(
|
||||
`${this.serverUrl}/files/files#rawUpload`,
|
||||
createFileRequest
|
||||
);
|
||||
@@ -848,7 +1033,7 @@ export class SASViyaApiClient {
|
||||
if (accessToken) {
|
||||
requestInfo.headers = { Authorization: `Bearer ${accessToken}` };
|
||||
}
|
||||
const folder = await this.request<Folder>(
|
||||
const { result: folder } = await this.request<Folder>(
|
||||
`${this.serverUrl}${url}`,
|
||||
requestInfo
|
||||
);
|
||||
|
||||
Reference in New Issue
Block a user