diff --git a/src/SASViyaApiClient.ts b/src/SASViyaApiClient.ts index 07d5ad6..a56a17c 100644 --- a/src/SASViyaApiClient.ts +++ b/src/SASViyaApiClient.ts @@ -9,6 +9,7 @@ import * as path from "path"; import { Job, Session, Context, Folder, CsrfToken } from "./types"; import { JobDefinition } from "./types/JobDefinition"; import { formatDataForRequest } from "./utils/formatDataForRequest"; +import { SessionManager } from "./SessionManager"; /** * A client for interfacing with the SAS Viya REST API @@ -18,15 +19,16 @@ export class SASViyaApiClient { constructor( private serverUrl: string, private rootFolderName: string, + private contextName: string, private rootFolderMap = new Map() ) { if (!rootFolderName) { throw new Error("Root folder must be provided."); } } - private csrfToken: { headerName: string; value: string } | null = null; + private csrfToken: CsrfToken | null = null; private rootFolder: Folder | null = null; - private contexts: Context[] = []; + private sessionManager = new SessionManager(this.serverUrl, this.contextName); /** * Returns a map containing the directory structure in the currently set root folder. @@ -209,147 +211,114 @@ export class SASViyaApiClient { if (accessToken) { headers.Authorization = `Bearer ${accessToken}`; } - if (this.csrfToken) { - headers[this.csrfToken.headerName] = this.csrfToken.value; - } - if (!this.contexts.length) { - const { result: contexts } = await this.request<{ items: Context[] }>( - `${this.serverUrl}/compute/contexts`, - { headers } - ); + let executionSessionId: string; + const session = await this.sessionManager.getSession(accessToken); + executionSessionId = session!.id; - this.contexts = - contexts && contexts.items && contexts.items.length - ? contexts.items - : []; + const jobArguments: { [key: string]: any } = { + _contextName: contextName, + _OMITJSONLISTING: true, + _OMITJSONLOG: true, + _OMITSESSIONRESULTS: true, + _OMITTEXTLISTING: true, + _OMITTEXTLOG: true, + }; + + if (debug) { + jobArguments["_OMITTEXTLOG"] = false; + jobArguments["_OMITSESSIONRESULTS"] = false; + jobArguments["_DEBUG"] = 131; } - const executionContext = this.contexts.find( - (c: any) => c.name === contextName + + const fileName = `exec-${ + jobName.includes("/") ? jobName.split("/")[1] : jobName + }`; + + let jobVariables: any = { + SYS_JES_JOB_URI: "", + _program: this.rootFolderName + "/" + jobName, + }; + let files: any[] = []; + if (data) { + if (JSON.stringify(data).includes(";")) { + files = await this.uploadTables(data, accessToken); + jobVariables["_webin_file_count"] = files.length; + files.forEach((fileInfo, index) => { + jobVariables[ + `_webin_fileuri${index + 1}` + ] = `/files/files/${fileInfo.file.id}`; + jobVariables[`_webin_name${index + 1}`] = fileInfo.tableName; + }); + } else { + jobVariables = { ...jobVariables, ...formatDataForRequest(data) }; + } + } + + // Execute job in session + const postJobRequest = { + method: "POST", + headers, + body: JSON.stringify({ + name: fileName, + description: "Powered by SASjs", + code: linesOfCode, + variables: jobVariables, + arguments: jobArguments, + }), + }; + const { result: postedJob, etag } = await this.request( + `${this.serverUrl}/compute/sessions/${executionSessionId}/jobs`, + postJobRequest + ); + if (!silent) { + console.log(`Job has been submitted for ${fileName}`); + console.log( + `You can monitor the job progress at ${this.serverUrl}${ + postedJob.links.find((l: any) => l.rel === "state")!.href + }` + ); + } + + const jobStatus = await this.pollJobState( + postedJob, + etag, + accessToken, + silent + ); + const { result: currentJob } = await this.request( + `${this.serverUrl}/compute/sessions/${executionSessionId}/jobs/${postedJob.id}`, + { headers } ); - if (executionContext) { - // Request new session in context or use the ID passed in - let executionSessionId: string; - if (sessionId) { - executionSessionId = sessionId; - } else { - const createSessionRequest = { - method: "POST", - headers, - }; - const { result: createdSession, etag } = await this.request( - `${this.serverUrl}/compute/contexts/${executionContext.id}/sessions`, - createSessionRequest - ); - - await this.waitForSession(createdSession, etag); - - executionSessionId = createdSession.id; - } - - let jobArguments: { [key: string]: any } = { - _contextName: contextName, - _OMITJSONLISTING: true, - _OMITJSONLOG: true, - _OMITSESSIONRESULTS: true, - _OMITTEXTLISTING: true, - _OMITTEXTLOG: true, - }; - - if (debug) { - jobArguments["_OMITTEXTLOG"] = false; - jobArguments["_OMITSESSIONRESULTS"] = false; - jobArguments["_DEBUG"] = 131; - } - - const fileName = `exec-${ - jobName.includes("/") ? jobName.split("/")[1] : jobName - }`; - - let jobVariables: any = { SYS_JES_JOB_URI: "", _program: jobName }; - let files: any[] = []; - if (data) { - if (JSON.stringify(data).includes(";")) { - files = await this.uploadTables(data, accessToken); - jobVariables["_webin_file_count"] = files.length; - files.forEach((fileInfo, index) => { - jobVariables[ - `_webin_fileuri${index + 1}` - ] = `/files/files/${fileInfo.file.id}`; - jobVariables[`_webin_name${index + 1}`] = fileInfo.tableName; - }); - } else { - jobVariables = { ...jobVariables, ...formatDataForRequest(data) }; - } - } - - // Execute job in session - const postJobRequest = { - method: "POST", - headers, - body: JSON.stringify({ - name: fileName, - description: "Powered by SASjs", - code: linesOfCode, - variables: jobVariables, - arguments: jobArguments, - }), - }; - const { result: postedJob, etag } = await this.request( - `${this.serverUrl}/compute/sessions/${executionSessionId}/jobs`, - postJobRequest - ); - if (!silent) { - console.log(`Job has been submitted for ${fileName}`); - console.log( - `You can monitor the job progress at ${this.serverUrl}${ - postedJob.links.find((l: any) => l.rel === "state")!.href - }` - ); - } - - const jobStatus = await this.pollJobState( - postedJob, - etag, - accessToken, - silent - ); - const { result: currentJob } = await this.request( - `${this.serverUrl}/compute/sessions/${executionSessionId}/jobs/${postedJob.id}`, - { headers } - ); - - let jobResult, log; - if (jobStatus === "failed" || jobStatus === "error") { - return Promise.reject(currentJob.error); - } - const resultLink = `/compute/sessions/${executionSessionId}/filerefs/_webout/content`; - const logLink = currentJob.links.find((l) => l.rel === "log"); - if (resultLink) { - jobResult = await this.request( - `${this.serverUrl}${resultLink}`, - { headers }, - "text" - ); - } - - if (true && logLink) { - log = await this.request( - `${this.serverUrl}${logLink.href}/content`, - { - headers, - } - ).then((res: any) => - res.result.items.map((i: any) => i.line).join("\n") - ); - } - return { result: jobResult?.result, log }; - } else { - console.error( - `Unable to find execution context ${contextName}.\nPlease check the contextName in the tgtDeployVars and try again.` - ); - console.error("Response from server: ", JSON.stringify(this.contexts)); + let jobResult, log; + if (jobStatus === "failed" || jobStatus === "error") { + return Promise.reject(currentJob.error); } + const resultLink = `/compute/sessions/${executionSessionId}/filerefs/_webout/content`; + const logLink = currentJob.links.find((l) => l.rel === "log"); + if (resultLink) { + jobResult = await this.request( + `${this.serverUrl}${resultLink}`, + { headers }, + "text" + ); + } + + if (true && logLink) { + log = await this.request( + `${this.serverUrl}${logLink.href}/content`, + { + headers, + } + ).then((res: any) => res.result.items.map((i: any) => i.line).join("\n")); + } + return { result: jobResult?.result, log }; + // } else { + // console.error( + // `Unable to find execution context ${contextName}.\nPlease check the contextName in the tgtDeployVars and try again.` + // ); + // console.error("Response from server: ", JSON.stringify(this.contexts)); + // } } /** diff --git a/src/SASjs.ts b/src/SASjs.ts index 2434b98..5d3d911 100644 --- a/src/SASjs.ts +++ b/src/SASjs.ts @@ -448,7 +448,11 @@ export default class SASjs { appLoc = this.sasjsConfig.appLoc; } if (this.sasjsConfig.serverType === ServerType.SASViya) { - sasApiClient = new SASViyaApiClient(serverUrl, appLoc); + sasApiClient = new SASViyaApiClient( + serverUrl, + appLoc, + this.sasjsConfig.contextName + ); } else if (this.sasjsConfig.serverType === ServerType.SAS9) { sasApiClient = new SAS9ApiClient(serverUrl); } @@ -1066,7 +1070,8 @@ export default class SASjs { else this.sasViyaApiClient = new SASViyaApiClient( this.sasjsConfig.serverUrl, - this.sasjsConfig.appLoc + this.sasjsConfig.appLoc, + this.sasjsConfig.contextName ); } if (this.sasjsConfig.serverType === ServerType.SAS9) { diff --git a/src/SessionManager.ts b/src/SessionManager.ts new file mode 100644 index 0000000..cda6018 --- /dev/null +++ b/src/SessionManager.ts @@ -0,0 +1,138 @@ +import { Session, Context, CsrfToken } from "./types"; +import { asyncForEach, makeRequest } from "./utils"; + +const MAX_SESSION_COUNT = 1; + +export class SessionManager { + constructor(private serverUrl: string, private contextName: string) {} + private sessions: Session[] = []; + private currentContext: Context | null = null; + private csrfToken: CsrfToken | null = null; + + async getSession(accessToken?: string) { + await this.createSessions(accessToken); + this.createAndWaitForSession(accessToken); + return this.sessions.pop(); + } + + private async createSessions(accessToken?: string) { + if (!this.sessions.length) { + if (!this.currentContext) { + await this.setCurrentContext(accessToken); + } + await asyncForEach(new Array(MAX_SESSION_COUNT), async () => { + const createdSession = await this.createAndWaitForSession(accessToken); + this.sessions.push(createdSession); + }); + } + } + + private async createAndWaitForSession(accessToken?: string) { + const createSessionRequest = { + method: "POST", + headers: this.getHeaders(accessToken), + }; + const { result: createdSession, etag } = await this.request( + `${this.serverUrl}/compute/contexts/${this.currentContext!.id}/sessions`, + createSessionRequest + ); + + await this.waitForSession(createdSession, etag); + return createdSession; + } + + private async setCurrentContext(accessToken?: string) { + if (!this.currentContext) { + const { result: contexts } = await this.request<{ + items: Context[]; + }>(`${this.serverUrl}/compute/contexts`, { + headers: this.getHeaders(accessToken), + }); + + const contextsList = + contexts && contexts.items && contexts.items.length + ? contexts.items + : []; + + const currentContext = contextsList.find( + (c: any) => c.name === this.contextName + ); + + if (!currentContext) { + throw new Error( + `The context ${this.contextName} was not found on the server ${this.serverUrl}` + ); + } + + this.currentContext = currentContext; + } + } + + private getHeaders(accessToken?: string) { + const headers: any = { + "Content-Type": "application/json", + }; + if (accessToken) { + headers.Authorization = `Bearer ${accessToken}`; + } + + return headers; + } + + private async waitForSession( + session: Session, + etag: string | null, + accessToken?: string, + silent = false + ) { + let sessionState = session.state; + const headers: any = { + ...this.getHeaders(accessToken), + "If-None-Match": etag, + }; + const stateLink = session.links.find((l: any) => l.rel === "state"); + return new Promise(async (resolve, _) => { + if (sessionState === "pending") { + if (stateLink) { + if (!silent) { + console.log("Polling session status... \n"); + } + const { result: state } = await this.request( + `${this.serverUrl}${stateLink.href}?wait=30`, + { + headers, + }, + "text" + ); + + sessionState = state.trim(); + if (!silent) { + console.log(`Current state: ${sessionState}\n`); + } + resolve(sessionState); + } + } else { + resolve(sessionState); + } + }); + } + + private async request( + url: string, + options: RequestInit, + contentType: "text" | "json" = "json" + ) { + if (this.csrfToken) { + options.headers = { + ...options.headers, + [this.csrfToken.headerName]: this.csrfToken.value, + }; + } + return await makeRequest( + url, + options, + (token) => (this.csrfToken = token), + contentType + ); + } +}