1
0
mirror of https://github.com/sasjs/adapter.git synced 2026-01-06 20:10:05 +00:00

feat(session-manager): Manage a pool of sessions for job execution

This commit is contained in:
Krishna Acondy
2020-07-17 08:31:27 +01:00
parent a579f481c5
commit a12244cf78
3 changed files with 252 additions and 140 deletions

View File

@@ -9,6 +9,7 @@ import * as path from "path";
import { Job, Session, Context, Folder, CsrfToken } from "./types";
import { JobDefinition } from "./types/JobDefinition";
import { formatDataForRequest } from "./utils/formatDataForRequest";
import { SessionManager } from "./SessionManager";
/**
* A client for interfacing with the SAS Viya REST API
@@ -18,15 +19,16 @@ export class SASViyaApiClient {
constructor(
private serverUrl: string,
private rootFolderName: string,
private contextName: string,
private rootFolderMap = new Map<string, Job[]>()
) {
if (!rootFolderName) {
throw new Error("Root folder must be provided.");
}
}
private csrfToken: { headerName: string; value: string } | null = null;
private csrfToken: CsrfToken | null = null;
private rootFolder: Folder | null = null;
private contexts: Context[] = [];
private sessionManager = new SessionManager(this.serverUrl, this.contextName);
/**
* Returns a map containing the directory structure in the currently set root folder.
@@ -209,147 +211,114 @@ export class SASViyaApiClient {
if (accessToken) {
headers.Authorization = `Bearer ${accessToken}`;
}
if (this.csrfToken) {
headers[this.csrfToken.headerName] = this.csrfToken.value;
}
if (!this.contexts.length) {
const { result: contexts } = await this.request<{ items: Context[] }>(
`${this.serverUrl}/compute/contexts`,
{ headers }
);
let executionSessionId: string;
const session = await this.sessionManager.getSession(accessToken);
executionSessionId = session!.id;
this.contexts =
contexts && contexts.items && contexts.items.length
? contexts.items
: [];
const jobArguments: { [key: string]: any } = {
_contextName: contextName,
_OMITJSONLISTING: true,
_OMITJSONLOG: true,
_OMITSESSIONRESULTS: true,
_OMITTEXTLISTING: true,
_OMITTEXTLOG: true,
};
if (debug) {
jobArguments["_OMITTEXTLOG"] = false;
jobArguments["_OMITSESSIONRESULTS"] = false;
jobArguments["_DEBUG"] = 131;
}
const executionContext = this.contexts.find(
(c: any) => c.name === contextName
const fileName = `exec-${
jobName.includes("/") ? jobName.split("/")[1] : jobName
}`;
let jobVariables: any = {
SYS_JES_JOB_URI: "",
_program: this.rootFolderName + "/" + jobName,
};
let files: any[] = [];
if (data) {
if (JSON.stringify(data).includes(";")) {
files = await this.uploadTables(data, accessToken);
jobVariables["_webin_file_count"] = files.length;
files.forEach((fileInfo, index) => {
jobVariables[
`_webin_fileuri${index + 1}`
] = `/files/files/${fileInfo.file.id}`;
jobVariables[`_webin_name${index + 1}`] = fileInfo.tableName;
});
} else {
jobVariables = { ...jobVariables, ...formatDataForRequest(data) };
}
}
// Execute job in session
const postJobRequest = {
method: "POST",
headers,
body: JSON.stringify({
name: fileName,
description: "Powered by SASjs",
code: linesOfCode,
variables: jobVariables,
arguments: jobArguments,
}),
};
const { result: postedJob, etag } = await this.request<Job>(
`${this.serverUrl}/compute/sessions/${executionSessionId}/jobs`,
postJobRequest
);
if (!silent) {
console.log(`Job has been submitted for ${fileName}`);
console.log(
`You can monitor the job progress at ${this.serverUrl}${
postedJob.links.find((l: any) => l.rel === "state")!.href
}`
);
}
const jobStatus = await this.pollJobState(
postedJob,
etag,
accessToken,
silent
);
const { result: currentJob } = await this.request<Job>(
`${this.serverUrl}/compute/sessions/${executionSessionId}/jobs/${postedJob.id}`,
{ headers }
);
if (executionContext) {
// Request new session in context or use the ID passed in
let executionSessionId: string;
if (sessionId) {
executionSessionId = sessionId;
} else {
const createSessionRequest = {
method: "POST",
headers,
};
const { result: createdSession, etag } = await this.request<Session>(
`${this.serverUrl}/compute/contexts/${executionContext.id}/sessions`,
createSessionRequest
);
await this.waitForSession(createdSession, etag);
executionSessionId = createdSession.id;
}
let jobArguments: { [key: string]: any } = {
_contextName: contextName,
_OMITJSONLISTING: true,
_OMITJSONLOG: true,
_OMITSESSIONRESULTS: true,
_OMITTEXTLISTING: true,
_OMITTEXTLOG: true,
};
if (debug) {
jobArguments["_OMITTEXTLOG"] = false;
jobArguments["_OMITSESSIONRESULTS"] = false;
jobArguments["_DEBUG"] = 131;
}
const fileName = `exec-${
jobName.includes("/") ? jobName.split("/")[1] : jobName
}`;
let jobVariables: any = { SYS_JES_JOB_URI: "", _program: jobName };
let files: any[] = [];
if (data) {
if (JSON.stringify(data).includes(";")) {
files = await this.uploadTables(data, accessToken);
jobVariables["_webin_file_count"] = files.length;
files.forEach((fileInfo, index) => {
jobVariables[
`_webin_fileuri${index + 1}`
] = `/files/files/${fileInfo.file.id}`;
jobVariables[`_webin_name${index + 1}`] = fileInfo.tableName;
});
} else {
jobVariables = { ...jobVariables, ...formatDataForRequest(data) };
}
}
// Execute job in session
const postJobRequest = {
method: "POST",
headers,
body: JSON.stringify({
name: fileName,
description: "Powered by SASjs",
code: linesOfCode,
variables: jobVariables,
arguments: jobArguments,
}),
};
const { result: postedJob, etag } = await this.request<Job>(
`${this.serverUrl}/compute/sessions/${executionSessionId}/jobs`,
postJobRequest
);
if (!silent) {
console.log(`Job has been submitted for ${fileName}`);
console.log(
`You can monitor the job progress at ${this.serverUrl}${
postedJob.links.find((l: any) => l.rel === "state")!.href
}`
);
}
const jobStatus = await this.pollJobState(
postedJob,
etag,
accessToken,
silent
);
const { result: currentJob } = await this.request<Job>(
`${this.serverUrl}/compute/sessions/${executionSessionId}/jobs/${postedJob.id}`,
{ headers }
);
let jobResult, log;
if (jobStatus === "failed" || jobStatus === "error") {
return Promise.reject(currentJob.error);
}
const resultLink = `/compute/sessions/${executionSessionId}/filerefs/_webout/content`;
const logLink = currentJob.links.find((l) => l.rel === "log");
if (resultLink) {
jobResult = await this.request<any>(
`${this.serverUrl}${resultLink}`,
{ headers },
"text"
);
}
if (true && logLink) {
log = await this.request<any>(
`${this.serverUrl}${logLink.href}/content`,
{
headers,
}
).then((res: any) =>
res.result.items.map((i: any) => i.line).join("\n")
);
}
return { result: jobResult?.result, log };
} else {
console.error(
`Unable to find execution context ${contextName}.\nPlease check the contextName in the tgtDeployVars and try again.`
);
console.error("Response from server: ", JSON.stringify(this.contexts));
let jobResult, log;
if (jobStatus === "failed" || jobStatus === "error") {
return Promise.reject(currentJob.error);
}
const resultLink = `/compute/sessions/${executionSessionId}/filerefs/_webout/content`;
const logLink = currentJob.links.find((l) => l.rel === "log");
if (resultLink) {
jobResult = await this.request<any>(
`${this.serverUrl}${resultLink}`,
{ headers },
"text"
);
}
if (true && logLink) {
log = await this.request<any>(
`${this.serverUrl}${logLink.href}/content`,
{
headers,
}
).then((res: any) => res.result.items.map((i: any) => i.line).join("\n"));
}
return { result: jobResult?.result, log };
// } else {
// console.error(
// `Unable to find execution context ${contextName}.\nPlease check the contextName in the tgtDeployVars and try again.`
// );
// console.error("Response from server: ", JSON.stringify(this.contexts));
// }
}
/**

View File

@@ -448,7 +448,11 @@ export default class SASjs {
appLoc = this.sasjsConfig.appLoc;
}
if (this.sasjsConfig.serverType === ServerType.SASViya) {
sasApiClient = new SASViyaApiClient(serverUrl, appLoc);
sasApiClient = new SASViyaApiClient(
serverUrl,
appLoc,
this.sasjsConfig.contextName
);
} else if (this.sasjsConfig.serverType === ServerType.SAS9) {
sasApiClient = new SAS9ApiClient(serverUrl);
}
@@ -1066,7 +1070,8 @@ export default class SASjs {
else
this.sasViyaApiClient = new SASViyaApiClient(
this.sasjsConfig.serverUrl,
this.sasjsConfig.appLoc
this.sasjsConfig.appLoc,
this.sasjsConfig.contextName
);
}
if (this.sasjsConfig.serverType === ServerType.SAS9) {

138
src/SessionManager.ts Normal file
View File

@@ -0,0 +1,138 @@
import { Session, Context, CsrfToken } from "./types";
import { asyncForEach, makeRequest } from "./utils";
const MAX_SESSION_COUNT = 1;
export class SessionManager {
constructor(private serverUrl: string, private contextName: string) {}
private sessions: Session[] = [];
private currentContext: Context | null = null;
private csrfToken: CsrfToken | null = null;
async getSession(accessToken?: string) {
await this.createSessions(accessToken);
this.createAndWaitForSession(accessToken);
return this.sessions.pop();
}
private async createSessions(accessToken?: string) {
if (!this.sessions.length) {
if (!this.currentContext) {
await this.setCurrentContext(accessToken);
}
await asyncForEach(new Array(MAX_SESSION_COUNT), async () => {
const createdSession = await this.createAndWaitForSession(accessToken);
this.sessions.push(createdSession);
});
}
}
private async createAndWaitForSession(accessToken?: string) {
const createSessionRequest = {
method: "POST",
headers: this.getHeaders(accessToken),
};
const { result: createdSession, etag } = await this.request<Session>(
`${this.serverUrl}/compute/contexts/${this.currentContext!.id}/sessions`,
createSessionRequest
);
await this.waitForSession(createdSession, etag);
return createdSession;
}
private async setCurrentContext(accessToken?: string) {
if (!this.currentContext) {
const { result: contexts } = await this.request<{
items: Context[];
}>(`${this.serverUrl}/compute/contexts`, {
headers: this.getHeaders(accessToken),
});
const contextsList =
contexts && contexts.items && contexts.items.length
? contexts.items
: [];
const currentContext = contextsList.find(
(c: any) => c.name === this.contextName
);
if (!currentContext) {
throw new Error(
`The context ${this.contextName} was not found on the server ${this.serverUrl}`
);
}
this.currentContext = currentContext;
}
}
private getHeaders(accessToken?: string) {
const headers: any = {
"Content-Type": "application/json",
};
if (accessToken) {
headers.Authorization = `Bearer ${accessToken}`;
}
return headers;
}
private async waitForSession(
session: Session,
etag: string | null,
accessToken?: string,
silent = false
) {
let sessionState = session.state;
const headers: any = {
...this.getHeaders(accessToken),
"If-None-Match": etag,
};
const stateLink = session.links.find((l: any) => l.rel === "state");
return new Promise(async (resolve, _) => {
if (sessionState === "pending") {
if (stateLink) {
if (!silent) {
console.log("Polling session status... \n");
}
const { result: state } = await this.request<string>(
`${this.serverUrl}${stateLink.href}?wait=30`,
{
headers,
},
"text"
);
sessionState = state.trim();
if (!silent) {
console.log(`Current state: ${sessionState}\n`);
}
resolve(sessionState);
}
} else {
resolve(sessionState);
}
});
}
private async request<T>(
url: string,
options: RequestInit,
contentType: "text" | "json" = "json"
) {
if (this.csrfToken) {
options.headers = {
...options.headers,
[this.csrfToken.headerName]: this.csrfToken.value,
};
}
return await makeRequest<T>(
url,
options,
(token) => (this.csrfToken = token),
contentType
);
}
}