diff --git a/README.md b/README.md index e3baeb7..b08ef9c 100644 --- a/README.md +++ b/README.md @@ -153,6 +153,14 @@ The response object will contain returned tables and columns. Table names are a The adapter will also cache the logs (if debug enabled) and even the work tables. For performance, it is best to keep debug mode off. +### Session Manager + +To execute a script on Viya a session has to be created first which is time-consuming (~15sec). That is why a Session Manager has been created which is implementing the following logic: + +1. When the first session is requested, we also create one more session (hot session) for future requests. Please notice two pending POST requests to create a session within the same context: ![the first session request](./screenshots/session-manager-first-request.png) +2. When a subsequent request for a session is received and there is a hot session available (not expired), this session is returned and an asynchronous request to create another hot session is sent. Please notice that there is a pending POST request to create a new session while a job has been already finished execution (POST request with status 201): ![subsequent session request](./screenshots/subsequent-session-request.png) +3. When a subsequent request for a session is received and there is no available hot session, 2 requests are sent asynchronously to create a session. The first created session will be returned and another session will be reserved for future requests. + ### Variable Types The SAS type (char/numeric) of the values is determined according to a set of rules: diff --git a/screenshots/session-manager-first-request.png b/screenshots/session-manager-first-request.png new file mode 100644 index 0000000..0556504 Binary files /dev/null and b/screenshots/session-manager-first-request.png differ diff --git a/screenshots/subsequent-session-request.png b/screenshots/subsequent-session-request.png new file mode 100644 index 0000000..57055d5 Binary files /dev/null and b/screenshots/subsequent-session-request.png differ diff --git a/src/SessionManager.ts b/src/SessionManager.ts index 1f3ff5f..bd7c535 100644 --- a/src/SessionManager.ts +++ b/src/SessionManager.ts @@ -6,6 +6,10 @@ import { RequestClient } from './request/RequestClient' const MAX_SESSION_COUNT = 1 +interface ApiErrorResponse { + response: { status: number | string; data: { message: string } } +} + export class SessionManager { private loggedErrors: NoSessionStateError[] = [] @@ -17,8 +21,10 @@ export class SessionManager { if (serverUrl) isUrl(serverUrl) } + // INFO: session pool private sessions: Session[] = [] private currentContext: Context | null = null + private settingContext: boolean = false private _debug: boolean = false private printedSessionState = { printed: false, @@ -33,71 +39,230 @@ export class SessionManager { this._debug = value } - async getSession(accessToken?: string) { - await this.createSessions(accessToken) - await this.createAndWaitForSession(accessToken) - const session = this.sessions.pop() + /** + * Checks if session is valid. Session is considered valid if time since it's creation is less than 'sessionInactiveTimeout' attribute. + * @param session - session object. + * @returns - boolean indicating if session is valid. + */ + private isSessionValid(session: Session): boolean { + if (!session) return false + const secondsSinceSessionCreation = - (new Date().getTime() - new Date(session!.creationTimeStamp).getTime()) / + (new Date().getTime() - new Date(session.creationTimeStamp).getTime()) / 1000 if ( !session!.attributes || secondsSinceSessionCreation >= session!.attributes.sessionInactiveTimeout ) { - await this.createSessions(accessToken) - const freshSession = this.sessions.pop() - - return freshSession + return false + } else { + return true } - - return session } - async clearSession(id: string, accessToken?: string) { + /** + * Removes session from pool of hot sessions. + * @param session - session object. + * @returns - void. + */ + private removeSessionFromPool(session: Session): void { + this.sessions = this.sessions.filter((ses) => ses.id !== session.id) + } + + /** + * Filters session pool to keep only valid sessions. + * @param session - session object. + * @returns - void. + */ + private removeExpiredSessions(): void { + this.sessions = this.sessions.filter((session) => + this.isSessionValid(session) + ) + } + + /** + * Throws set of errors as a single error. + * @param errors - array of errors or string. + * @param prefix - an optional final error prefix. + * @returns - never. + */ + private throwErrors(errors: (Error | string)[], prefix?: string): never { + throw prefix + ? prefixMessage(new Error(errors.join('. ')), prefix) + : new Error( + errors + .map((err) => + (err as Error).message ? (err as Error).message : err + ) + .join('. ') + ) + } + + /** + * Returns session. + * If there is a hot session available, it will be returned immediately and an asynchronous request to create new hot session will be submitted. + * If there is no available session, 2 session creation requests will be submitted. The session is returned once it is created and ready. + * @param accessToken - an optional access token. + * @returns - a promise which resolves with a session. + */ + async getSession(accessToken?: string) { + const errors: (Error | string)[] = [] + let isErrorThrown = false + + const throwIfError = () => { + if (errors.length && !isErrorThrown) { + isErrorThrown = true + + this.throwErrors(errors) + } + } + + this.removeExpiredSessions() + + if (this.sessions.length) { + const session = this.sessions[0] + + this.removeSessionFromPool(session) + + this.createSessions(accessToken).catch((err) => { + errors.push(err) + }) + + this.createAndWaitForSession(accessToken).catch((err) => { + errors.push(err) + }) + + throwIfError() + + return session + } else { + this.createSessions(accessToken).catch((err) => { + errors.push(err) + }) + + await this.createAndWaitForSession(accessToken).catch((err) => { + errors.push(err) + }) + + this.removeExpiredSessions() + + const session = this.sessions.pop()! + + this.removeSessionFromPool(session) + + throwIfError() + + return session + } + } + + /** + * Returns error message based on the response from SAS API. + * @param err - an optional access token. + * @param accessToken - an optional access token. + * @returns - an error message. + */ + private getErrorMessage( + err: ApiErrorResponse, + url: string, + method: 'GET' | 'POST' | 'DELETE' + ) { + return ( + `${method} request to ${url} failed with status code ${ + err.response.status || 'unknown' + }. ` + err.response.data.message || '' + ) + } + + /** + * Deletes session. + * @param id - a session id. + * @param accessToken - an optional access token. + * @returns - a promise which resolves when session is deleted. + */ + async clearSession(id: string, accessToken?: string): Promise { + const url = `/compute/sessions/${id}` + return await this.requestClient - .delete(`/compute/sessions/${id}`, accessToken) + .delete(url, accessToken) .then(() => { this.sessions = this.sessions.filter((s) => s.id !== id) }) - .catch((err) => { - throw prefixMessage(err, 'Error while deleting session. ') + .catch((err: ApiErrorResponse) => { + throw prefixMessage( + this.getErrorMessage(err, url, 'DELETE'), + 'Error while deleting session. ' + ) }) } - private async createSessions(accessToken?: string) { + /** + * Creates sessions in amount equal to MAX_SESSION_COUNT. + * @param accessToken - an optional access token. + * @returns - a promise which resolves when required amount of sessions is created. + */ + private async createSessions(accessToken?: string): Promise { + const errors: (Error | string)[] = [] + if (!this.sessions.length) { - if (!this.currentContext) { - await this.setCurrentContext(accessToken).catch((err) => { - throw err - }) - } - await asyncForEach(new Array(MAX_SESSION_COUNT), async () => { - const createdSession = await this.createAndWaitForSession( - accessToken - ).catch((err) => { - throw err + await this.createAndWaitForSession(accessToken).catch((err) => { + errors.push(err) }) - - this.sessions.push(createdSession) - }).catch((err) => { - throw err }) } + + if (errors.length) { + this.throwErrors(errors, 'Error while creating session. ') + } } - private async createAndWaitForSession(accessToken?: string) { + /** + * Waits for the current context to be set. + * @returns - a promise which resolves when current context is set. + */ + private async waitForCurrentContext(): Promise { + return new Promise((resolve) => { + const timer = setInterval(() => { + if (this.currentContext) { + this.settingContext = false + + clearInterval(timer) + + resolve() + } + }, 100) + }) + } + + /** + * Creates and waits for session to be ready. + * @param accessToken - an optional access token. + * @returns - a promise which resolves with a session. + */ + private async createAndWaitForSession( + accessToken?: string + ): Promise { + if (!this.currentContext) { + if (!this.settingContext) { + await this.setCurrentContext(accessToken) + } else { + await this.waitForCurrentContext() + } + } + + const url = `${this.serverUrl}/compute/contexts/${ + this.currentContext!.id + }/sessions` + const { result: createdSession, etag } = await this.requestClient - .post( - `${this.serverUrl}/compute/contexts/${ - this.currentContext!.id - }/sessions`, - {}, - accessToken - ) - .catch((err) => { - throw err + .post(url, {}, accessToken) + .catch((err: ApiErrorResponse) => { + throw prefixMessage( + this.getErrorMessage(err, url, 'POST'), + `Error while creating session. ` + ) }) await this.waitForSession(createdSession, etag, accessToken) @@ -107,14 +272,26 @@ export class SessionManager { return createdSession } - private async setCurrentContext(accessToken?: string) { + /** + * Sets current context. + * @param accessToken - an optional access token. + * @returns - a promise which resolves when current context is set. + */ + private async setCurrentContext(accessToken?: string): Promise { if (!this.currentContext) { + const url = `${this.serverUrl}/compute/contexts?limit=10000` + + this.settingContext = true + const { result: contexts } = await this.requestClient .get<{ items: Context[] - }>(`${this.serverUrl}/compute/contexts?limit=10000`, accessToken) - .catch((err) => { - throw err + }>(url, accessToken) + .catch((err: ApiErrorResponse) => { + throw prefixMessage( + this.getErrorMessage(err, url, 'GET'), + `Error while getting list of contexts. ` + ) }) const contextsList = @@ -138,18 +315,13 @@ export class SessionManager { } } - private getHeaders(accessToken?: string) { - const headers: any = { - 'Content-Type': 'application/json' - } - - if (accessToken) { - headers.Authorization = `Bearer ${accessToken}` - } - - return headers - } - + /** + * Waits for session to be ready. + * @param session - a session object. + * @param etag - an etag that can be a string or null. + * @param accessToken - an optional access token. + * @returns - a promise which resolves with a session state. + */ private async waitForSession( session: Session, etag: string | null, @@ -173,13 +345,11 @@ export class SessionManager { this.printedSessionState.printed = true } + const url = `${this.serverUrl}${stateLink.href}?wait=30` + const { result: state, responseStatus: responseStatus } = - await this.getSessionState( - `${this.serverUrl}${stateLink.href}?wait=30`, - etag!, - accessToken - ).catch((err) => { - throw prefixMessage(err, 'Error while getting session state.') + await this.getSessionState(url, etag!, accessToken).catch((err) => { + throw prefixMessage(err, 'Error while waiting for session. ') }) sessionState = state.trim() @@ -216,7 +386,7 @@ export class SessionManager { return sessionState } else { - throw 'Error while getting session state link.' + throw 'Error while getting session state link. ' } } else { this.loggedErrors = [] @@ -225,11 +395,21 @@ export class SessionManager { } } + /** + * Gets session state. + * @param url - a URL to get session state. + * @param etag - an etag string. + * @param accessToken - an optional access token. + * @returns - a promise which resolves with a result string and response status. + */ private async getSessionState( url: string, etag: string, accessToken?: string - ) { + ): Promise<{ + result: string + responseStatus: number + }> { return await this.requestClient .get(url, accessToken, 'text/plain', { 'If-None-Match': etag }) .then((res) => ({ @@ -237,20 +417,37 @@ export class SessionManager { responseStatus: res.status })) .catch((err) => { - throw err + throw prefixMessage( + this.getErrorMessage(err, url, 'GET'), + 'Error while getting session state. ' + ) }) } - async getVariable(sessionId: string, variable: string, accessToken?: string) { + /** + * Gets variable. + * @param sessionId - a session id. + * @param variable - a variable string. + * @param accessToken - an optional access token. + * @returns - a promise which resolves with a result that confirms to SessionVariable interface, etag string and status code. + */ + async getVariable( + sessionId: string, + variable: string, + accessToken?: string + ): Promise<{ + result: SessionVariable + etag: string + status: number + }> { + const url = `${this.serverUrl}/compute/sessions/${sessionId}/variables/${variable}` + return await this.requestClient - .get( - `${this.serverUrl}/compute/sessions/${sessionId}/variables/${variable}`, - accessToken - ) + .get(url, accessToken) .catch((err) => { throw prefixMessage( - err, - `Error while fetching session variable '${variable}'.` + this.getErrorMessage(err, url, 'GET'), + `Error while fetching session variable '${variable}'. ` ) }) } diff --git a/src/test/SessionManager.spec.ts b/src/test/SessionManager.spec.ts index f40205a..b77c6da 100644 --- a/src/test/SessionManager.spec.ts +++ b/src/test/SessionManager.spec.ts @@ -3,10 +3,11 @@ import { RequestClient } from '../request/RequestClient' import * as dotenv from 'dotenv' import axios from 'axios' import { Logger, LogLevel } from '@sasjs/utils' -import { Session } from '../types' +import { Session, Context } from '../types' jest.mock('axios') const mockedAxios = axios as jest.Mocked +const requestClient = new (>RequestClient)() describe('SessionManager', () => { dotenv.config() @@ -14,9 +15,23 @@ describe('SessionManager', () => { const sessionManager = new SessionManager( process.env.SERVER_URL as string, process.env.DEFAULT_COMPUTE_CONTEXT as string, - new RequestClient('https://sample.server.com') + requestClient ) + const getMockSession = () => ({ + id: ['id', new Date().getTime(), Math.random()].join('-'), + state: '', + links: [{ rel: 'state', href: '', uri: '', type: '', method: 'GET' }], + attributes: { + sessionInactiveTimeout: 900 + }, + creationTimeStamp: `${new Date(new Date().getTime()).toISOString()}` + }) + + afterEach(() => { + jest.restoreAllMocks() + }) + describe('getVariable', () => { it('should fetch session variable', async () => { const sampleResponse = { @@ -45,6 +60,30 @@ describe('SessionManager', () => { ) ).resolves.toEqual(expectedResponse) }) + + it('should throw an error if GET request failed', async () => { + const responseStatus = 500 + const responseErrorMessage = `The process timed out after 60 seconds. Request failed with status code ${responseStatus}` + const response = { + status: responseStatus, + data: { + message: responseErrorMessage + } + } + const testVariable = 'testVariable' + + jest.spyOn(requestClient, 'get').mockImplementation(() => + Promise.reject({ + response + }) + ) + + const expectedError = `Error while fetching session variable '${testVariable}'. GET request to ${process.env.SERVER_URL}/compute/sessions/testId/variables/${testVariable} failed with status code ${responseStatus}. ${responseErrorMessage}` + + await expect( + sessionManager.getVariable('testId', testVariable) + ).rejects.toEqual(expectedError) + }) }) describe('waitForSession', () => { @@ -115,11 +154,25 @@ describe('SessionManager', () => { }) it('should throw an error if could not get session state', async () => { - mockedAxios.get.mockImplementation(() => Promise.reject('Mocked error')) + const gettingSessionStatus = 500 + const sessionStatusError = `Getting session status timed out after 60 seconds. Request failed with status code ${gettingSessionStatus}` + + mockedAxios.get.mockImplementation(() => + Promise.reject({ + response: { + status: gettingSessionStatus, + data: { + message: sessionStatusError + } + } + }) + ) + + const expectedError = `Error while waiting for session. Error while getting session state. GET request to ${process.env.SERVER_URL}?wait=30 failed with status code ${gettingSessionStatus}. ${sessionStatusError}` await expect( sessionManager['waitForSession'](session, null, 'access_token') - ).rejects.toContain('Error while getting session state.') + ).rejects.toEqual(expectedError) }) it('should return session state', async () => { @@ -135,4 +188,243 @@ describe('SessionManager', () => { ).resolves.toEqual(customSession.state) }) }) + + describe('isSessionValid', () => { + const session: Session = getMockSession() + + it('should return false if not a session provided', () => { + expect(sessionManager['isSessionValid'](undefined as any)).toEqual(false) + }) + + it('should return true if session is not expired', () => { + expect(sessionManager['isSessionValid'](session)).toEqual(true) + }) + + it('should return false if session is expired', () => { + session.creationTimeStamp = `${new Date( + new Date().getTime() - + (session.attributes.sessionInactiveTimeout * 1000 + 1000) + ).toISOString()}` + expect(sessionManager['isSessionValid'](session)).toEqual(false) + }) + }) + + describe('removeSessionFromPool', () => { + it('should remove session from the pool of sessions', () => { + const session: Session = getMockSession() + const sessions: Session[] = [getMockSession(), session] + + sessionManager['sessions'] = sessions + sessionManager['removeSessionFromPool'](session) + + expect(sessionManager['sessions'].length).toEqual(1) + }) + }) + + describe('getSession', () => { + it('should return session if there is a valid session and create new session', async () => { + jest + .spyOn(sessionManager as any, 'createAndWaitForSession') + .mockImplementation(async () => Promise.resolve(getMockSession())) + + const session = getMockSession() + sessionManager['sessions'] = [session] + + await expect(sessionManager.getSession()).resolves.toEqual(session) + expect(sessionManager['createAndWaitForSession']).toHaveBeenCalled() + }) + + it('should return a session and keep one session if there is no sessions available', async () => { + jest + .spyOn(sessionManager as any, 'createAndWaitForSession') + .mockImplementation(async () => { + const session = getMockSession() + sessionManager['sessions'].push(session) + + return Promise.resolve(session) + }) + + const session = await sessionManager.getSession() + + expect(Object.keys(session)).toEqual(Object.keys(getMockSession())) + expect(sessionManager['createAndWaitForSession']).toHaveBeenCalledTimes(2) + expect(sessionManager['sessions'].length).toEqual(1) + }) + + it.concurrent( + 'should throw an error if session creation request returned 500', + async () => { + const sessionCreationStatus = 500 + const sessionCreationError = `The process initialization for the Compute server with the ID 'ed40398a-ec8a-422b-867a-61493ee8a57f' timed out after 60 seconds. Request failed with status code ${sessionCreationStatus}` + + jest.spyOn(requestClient, 'post').mockImplementation(() => + Promise.reject({ + response: { + status: sessionCreationStatus, + data: { + message: sessionCreationError + } + } + }) + ) + + const contextId = 'testContextId' + const context: Context = { + name: 'testContext', + id: contextId, + createdBy: 'createdBy', + version: 1 + } + + sessionManager['currentContext'] = context + + const expectedError = new Error( + `Error while creating session. POST request to ${process.env.SERVER_URL}/compute/contexts/${contextId}/sessions failed with status code ${sessionCreationStatus}. ${sessionCreationError}` + ) + + await expect(sessionManager.getSession()).rejects.toEqual(expectedError) + } + ) + }) + + describe('clearSession', () => { + it('should clear session', async () => { + jest + .spyOn(requestClient, 'delete') + .mockImplementation(() => + Promise.resolve({ result: '', etag: '', status: 200 }) + ) + + const sessionToBeCleared = getMockSession() + const sessionToStay = getMockSession() + + sessionManager['sessions'] = [sessionToBeCleared, sessionToStay] + + await sessionManager.clearSession(sessionToBeCleared.id) + + expect(sessionManager['sessions']).toEqual([sessionToStay]) + }) + + it('should throw error if DELETE request failed', async () => { + const sessionCreationStatus = 500 + const sessionDeleteError = `The process timed out after 60 seconds. Request failed with status code ${sessionCreationStatus}` + + jest.spyOn(requestClient, 'delete').mockImplementation(() => + Promise.reject({ + response: { + status: sessionCreationStatus, + data: { + message: sessionDeleteError + } + } + }) + ) + + const session = getMockSession() + + sessionManager['sessions'] = [session] + + const expectedError = `Error while deleting session. DELETE request to /compute/sessions/${session.id} failed with status code ${sessionCreationStatus}. ${sessionDeleteError}` + + await expect(sessionManager.clearSession(session.id)).rejects.toEqual( + expectedError + ) + }) + }) + + describe('waitForCurrentContext', () => { + it('should resolve when current context is ready', async () => { + sessionManager['settingContext'] = true + sessionManager['contextName'] = 'test context' + + await expect(sessionManager['waitForCurrentContext']()).toResolve() + expect(sessionManager['settingContext']).toEqual(false) + }) + }) + + describe('setCurrentContext', () => { + it('should set current context', async () => { + const contextName = 'test context' + const testContext: Context = { + name: contextName, + id: 'string', + createdBy: 'string', + version: 1 + } + + jest.spyOn(requestClient, 'get').mockImplementation(() => { + return Promise.resolve({ + result: { + items: [testContext] + }, + etag: '', + status: 200 + }) + }) + + sessionManager['currentContext'] = null + sessionManager['contextName'] = contextName + sessionManager['settingContext'] = false + + await expect(sessionManager['setCurrentContext']()).toResolve() + expect(sessionManager['currentContext']).toEqual(testContext) + }) + + it('should throw error if GET request failed', async () => { + const responseStatus = 500 + const responseErrorMessage = `The process timed out after 60 seconds. Request failed with status code ${responseStatus}` + const response = { + status: responseStatus, + data: { + message: responseErrorMessage + } + } + + jest.spyOn(requestClient, 'get').mockImplementation(() => + Promise.reject({ + response + }) + ) + + const expectedError = `Error while getting list of contexts. GET request to ${process.env.SERVER_URL}/compute/contexts?limit=10000 failed with status code ${responseStatus}. ${responseErrorMessage}` + + sessionManager['currentContext'] = null + + await expect(sessionManager['setCurrentContext']()).rejects.toEqual( + expectedError + ) + }) + + it('should throw an error if current context is not in the list of contexts', async () => { + const contextName = 'test context' + const testContext: Context = { + name: `${contextName} does not exist`, + id: 'string', + createdBy: 'string', + version: 1 + } + + jest.spyOn(requestClient, 'get').mockImplementation(() => { + return Promise.resolve({ + result: { + items: [testContext] + }, + etag: '', + status: 200 + }) + }) + + sessionManager['currentContext'] = null + sessionManager['contextName'] = contextName + sessionManager['settingContext'] = false + + const expectedError = new Error( + `The context '${contextName}' was not found on the server ${process.env.SERVER_URL}.` + ) + + await expect(sessionManager['setCurrentContext']()).rejects.toEqual( + expectedError + ) + }) + }) })