1
0
mirror of https://github.com/sasjs/adapter.git synced 2025-12-11 01:14:36 +00:00

Merge pull request #793 from sasjs/session-manager-improvements

Session manager improvements
This commit is contained in:
Allan Bowe
2023-04-19 15:08:42 +01:00
committed by GitHub
5 changed files with 573 additions and 76 deletions

View File

@@ -153,6 +153,14 @@ The response object will contain returned tables and columns. Table names are a
The adapter will also cache the logs (if debug enabled) and even the work tables. For performance, it is best to keep debug mode off.
### Session Manager
To execute a script on Viya a session has to be created first which is time-consuming (~15sec). That is why a Session Manager has been created which is implementing the following logic:
1. When the first session is requested, we also create one more session (hot session) for future requests. Please notice two pending POST requests to create a session within the same context: ![the first session request](./screenshots/session-manager-first-request.png)
2. When a subsequent request for a session is received and there is a hot session available (not expired), this session is returned and an asynchronous request to create another hot session is sent. Please notice that there is a pending POST request to create a new session while a job has been already finished execution (POST request with status 201): ![subsequent session request](./screenshots/subsequent-session-request.png)
3. When a subsequent request for a session is received and there is no available hot session, 2 requests are sent asynchronously to create a session. The first created session will be returned and another session will be reserved for future requests.
### Variable Types
The SAS type (char/numeric) of the values is determined according to a set of rules:

Binary file not shown.

After

Width:  |  Height:  |  Size: 164 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 212 KiB

View File

@@ -6,6 +6,10 @@ import { RequestClient } from './request/RequestClient'
const MAX_SESSION_COUNT = 1
interface ApiErrorResponse {
response: { status: number | string; data: { message: string } }
}
export class SessionManager {
private loggedErrors: NoSessionStateError[] = []
@@ -17,8 +21,10 @@ export class SessionManager {
if (serverUrl) isUrl(serverUrl)
}
// INFO: session pool
private sessions: Session[] = []
private currentContext: Context | null = null
private settingContext: boolean = false
private _debug: boolean = false
private printedSessionState = {
printed: false,
@@ -33,71 +39,230 @@ export class SessionManager {
this._debug = value
}
async getSession(accessToken?: string) {
await this.createSessions(accessToken)
await this.createAndWaitForSession(accessToken)
const session = this.sessions.pop()
/**
* Checks if session is valid. Session is considered valid if time since it's creation is less than 'sessionInactiveTimeout' attribute.
* @param session - session object.
* @returns - boolean indicating if session is valid.
*/
private isSessionValid(session: Session): boolean {
if (!session) return false
const secondsSinceSessionCreation =
(new Date().getTime() - new Date(session!.creationTimeStamp).getTime()) /
(new Date().getTime() - new Date(session.creationTimeStamp).getTime()) /
1000
if (
!session!.attributes ||
secondsSinceSessionCreation >= session!.attributes.sessionInactiveTimeout
) {
await this.createSessions(accessToken)
const freshSession = this.sessions.pop()
return freshSession
return false
} else {
return true
}
return session
}
async clearSession(id: string, accessToken?: string) {
/**
* Removes session from pool of hot sessions.
* @param session - session object.
* @returns - void.
*/
private removeSessionFromPool(session: Session): void {
this.sessions = this.sessions.filter((ses) => ses.id !== session.id)
}
/**
* Filters session pool to keep only valid sessions.
* @param session - session object.
* @returns - void.
*/
private removeExpiredSessions(): void {
this.sessions = this.sessions.filter((session) =>
this.isSessionValid(session)
)
}
/**
* Throws set of errors as a single error.
* @param errors - array of errors or string.
* @param prefix - an optional final error prefix.
* @returns - never.
*/
private throwErrors(errors: (Error | string)[], prefix?: string): never {
throw prefix
? prefixMessage(new Error(errors.join('. ')), prefix)
: new Error(
errors
.map((err) =>
(err as Error).message ? (err as Error).message : err
)
.join('. ')
)
}
/**
* Returns session.
* If there is a hot session available, it will be returned immediately and an asynchronous request to create new hot session will be submitted.
* If there is no available session, 2 session creation requests will be submitted. The session is returned once it is created and ready.
* @param accessToken - an optional access token.
* @returns - a promise which resolves with a session.
*/
async getSession(accessToken?: string) {
const errors: (Error | string)[] = []
let isErrorThrown = false
const throwIfError = () => {
if (errors.length && !isErrorThrown) {
isErrorThrown = true
this.throwErrors(errors)
}
}
this.removeExpiredSessions()
if (this.sessions.length) {
const session = this.sessions[0]
this.removeSessionFromPool(session)
this.createSessions(accessToken).catch((err) => {
errors.push(err)
})
this.createAndWaitForSession(accessToken).catch((err) => {
errors.push(err)
})
throwIfError()
return session
} else {
this.createSessions(accessToken).catch((err) => {
errors.push(err)
})
await this.createAndWaitForSession(accessToken).catch((err) => {
errors.push(err)
})
this.removeExpiredSessions()
const session = this.sessions.pop()!
this.removeSessionFromPool(session)
throwIfError()
return session
}
}
/**
* Returns error message based on the response from SAS API.
* @param err - an optional access token.
* @param accessToken - an optional access token.
* @returns - an error message.
*/
private getErrorMessage(
err: ApiErrorResponse,
url: string,
method: 'GET' | 'POST' | 'DELETE'
) {
return (
`${method} request to ${url} failed with status code ${
err.response.status || 'unknown'
}. ` + err.response.data.message || ''
)
}
/**
* Deletes session.
* @param id - a session id.
* @param accessToken - an optional access token.
* @returns - a promise which resolves when session is deleted.
*/
async clearSession(id: string, accessToken?: string): Promise<void> {
const url = `/compute/sessions/${id}`
return await this.requestClient
.delete<Session>(`/compute/sessions/${id}`, accessToken)
.delete<Session>(url, accessToken)
.then(() => {
this.sessions = this.sessions.filter((s) => s.id !== id)
})
.catch((err) => {
throw prefixMessage(err, 'Error while deleting session. ')
.catch((err: ApiErrorResponse) => {
throw prefixMessage(
this.getErrorMessage(err, url, 'DELETE'),
'Error while deleting session. '
)
})
}
private async createSessions(accessToken?: string) {
/**
* Creates sessions in amount equal to MAX_SESSION_COUNT.
* @param accessToken - an optional access token.
* @returns - a promise which resolves when required amount of sessions is created.
*/
private async createSessions(accessToken?: string): Promise<void> {
const errors: (Error | string)[] = []
if (!this.sessions.length) {
if (!this.currentContext) {
await this.setCurrentContext(accessToken).catch((err) => {
throw err
})
}
await asyncForEach(new Array(MAX_SESSION_COUNT), async () => {
const createdSession = await this.createAndWaitForSession(
accessToken
).catch((err) => {
throw err
await this.createAndWaitForSession(accessToken).catch((err) => {
errors.push(err)
})
this.sessions.push(createdSession)
}).catch((err) => {
throw err
})
}
if (errors.length) {
this.throwErrors(errors, 'Error while creating session. ')
}
}
private async createAndWaitForSession(accessToken?: string) {
/**
* Waits for the current context to be set.
* @returns - a promise which resolves when current context is set.
*/
private async waitForCurrentContext(): Promise<void> {
return new Promise((resolve) => {
const timer = setInterval(() => {
if (this.currentContext) {
this.settingContext = false
clearInterval(timer)
resolve()
}
}, 100)
})
}
/**
* Creates and waits for session to be ready.
* @param accessToken - an optional access token.
* @returns - a promise which resolves with a session.
*/
private async createAndWaitForSession(
accessToken?: string
): Promise<Session> {
if (!this.currentContext) {
if (!this.settingContext) {
await this.setCurrentContext(accessToken)
} else {
await this.waitForCurrentContext()
}
}
const url = `${this.serverUrl}/compute/contexts/${
this.currentContext!.id
}/sessions`
const { result: createdSession, etag } = await this.requestClient
.post<Session>(
`${this.serverUrl}/compute/contexts/${
this.currentContext!.id
}/sessions`,
{},
accessToken
)
.catch((err) => {
throw err
.post<Session>(url, {}, accessToken)
.catch((err: ApiErrorResponse) => {
throw prefixMessage(
this.getErrorMessage(err, url, 'POST'),
`Error while creating session. `
)
})
await this.waitForSession(createdSession, etag, accessToken)
@@ -107,14 +272,26 @@ export class SessionManager {
return createdSession
}
private async setCurrentContext(accessToken?: string) {
/**
* Sets current context.
* @param accessToken - an optional access token.
* @returns - a promise which resolves when current context is set.
*/
private async setCurrentContext(accessToken?: string): Promise<void> {
if (!this.currentContext) {
const url = `${this.serverUrl}/compute/contexts?limit=10000`
this.settingContext = true
const { result: contexts } = await this.requestClient
.get<{
items: Context[]
}>(`${this.serverUrl}/compute/contexts?limit=10000`, accessToken)
.catch((err) => {
throw err
}>(url, accessToken)
.catch((err: ApiErrorResponse) => {
throw prefixMessage(
this.getErrorMessage(err, url, 'GET'),
`Error while getting list of contexts. `
)
})
const contextsList =
@@ -138,18 +315,13 @@ export class SessionManager {
}
}
private getHeaders(accessToken?: string) {
const headers: any = {
'Content-Type': 'application/json'
}
if (accessToken) {
headers.Authorization = `Bearer ${accessToken}`
}
return headers
}
/**
* Waits for session to be ready.
* @param session - a session object.
* @param etag - an etag that can be a string or null.
* @param accessToken - an optional access token.
* @returns - a promise which resolves with a session state.
*/
private async waitForSession(
session: Session,
etag: string | null,
@@ -173,13 +345,11 @@ export class SessionManager {
this.printedSessionState.printed = true
}
const url = `${this.serverUrl}${stateLink.href}?wait=30`
const { result: state, responseStatus: responseStatus } =
await this.getSessionState(
`${this.serverUrl}${stateLink.href}?wait=30`,
etag!,
accessToken
).catch((err) => {
throw prefixMessage(err, 'Error while getting session state.')
await this.getSessionState(url, etag!, accessToken).catch((err) => {
throw prefixMessage(err, 'Error while waiting for session. ')
})
sessionState = state.trim()
@@ -216,7 +386,7 @@ export class SessionManager {
return sessionState
} else {
throw 'Error while getting session state link.'
throw 'Error while getting session state link. '
}
} else {
this.loggedErrors = []
@@ -225,11 +395,21 @@ export class SessionManager {
}
}
/**
* Gets session state.
* @param url - a URL to get session state.
* @param etag - an etag string.
* @param accessToken - an optional access token.
* @returns - a promise which resolves with a result string and response status.
*/
private async getSessionState(
url: string,
etag: string,
accessToken?: string
) {
): Promise<{
result: string
responseStatus: number
}> {
return await this.requestClient
.get(url, accessToken, 'text/plain', { 'If-None-Match': etag })
.then((res) => ({
@@ -237,20 +417,37 @@ export class SessionManager {
responseStatus: res.status
}))
.catch((err) => {
throw err
throw prefixMessage(
this.getErrorMessage(err, url, 'GET'),
'Error while getting session state. '
)
})
}
async getVariable(sessionId: string, variable: string, accessToken?: string) {
/**
* Gets variable.
* @param sessionId - a session id.
* @param variable - a variable string.
* @param accessToken - an optional access token.
* @returns - a promise which resolves with a result that confirms to SessionVariable interface, etag string and status code.
*/
async getVariable(
sessionId: string,
variable: string,
accessToken?: string
): Promise<{
result: SessionVariable
etag: string
status: number
}> {
const url = `${this.serverUrl}/compute/sessions/${sessionId}/variables/${variable}`
return await this.requestClient
.get<SessionVariable>(
`${this.serverUrl}/compute/sessions/${sessionId}/variables/${variable}`,
accessToken
)
.get<SessionVariable>(url, accessToken)
.catch((err) => {
throw prefixMessage(
err,
`Error while fetching session variable '${variable}'.`
this.getErrorMessage(err, url, 'GET'),
`Error while fetching session variable '${variable}'. `
)
})
}

View File

@@ -3,10 +3,11 @@ import { RequestClient } from '../request/RequestClient'
import * as dotenv from 'dotenv'
import axios from 'axios'
import { Logger, LogLevel } from '@sasjs/utils'
import { Session } from '../types'
import { Session, Context } from '../types'
jest.mock('axios')
const mockedAxios = axios as jest.Mocked<typeof axios>
const requestClient = new (<jest.Mock<RequestClient>>RequestClient)()
describe('SessionManager', () => {
dotenv.config()
@@ -14,9 +15,23 @@ describe('SessionManager', () => {
const sessionManager = new SessionManager(
process.env.SERVER_URL as string,
process.env.DEFAULT_COMPUTE_CONTEXT as string,
new RequestClient('https://sample.server.com')
requestClient
)
const getMockSession = () => ({
id: ['id', new Date().getTime(), Math.random()].join('-'),
state: '',
links: [{ rel: 'state', href: '', uri: '', type: '', method: 'GET' }],
attributes: {
sessionInactiveTimeout: 900
},
creationTimeStamp: `${new Date(new Date().getTime()).toISOString()}`
})
afterEach(() => {
jest.restoreAllMocks()
})
describe('getVariable', () => {
it('should fetch session variable', async () => {
const sampleResponse = {
@@ -45,6 +60,30 @@ describe('SessionManager', () => {
)
).resolves.toEqual(expectedResponse)
})
it('should throw an error if GET request failed', async () => {
const responseStatus = 500
const responseErrorMessage = `The process timed out after 60 seconds. Request failed with status code ${responseStatus}`
const response = {
status: responseStatus,
data: {
message: responseErrorMessage
}
}
const testVariable = 'testVariable'
jest.spyOn(requestClient, 'get').mockImplementation(() =>
Promise.reject({
response
})
)
const expectedError = `Error while fetching session variable '${testVariable}'. GET request to ${process.env.SERVER_URL}/compute/sessions/testId/variables/${testVariable} failed with status code ${responseStatus}. ${responseErrorMessage}`
await expect(
sessionManager.getVariable('testId', testVariable)
).rejects.toEqual(expectedError)
})
})
describe('waitForSession', () => {
@@ -115,11 +154,25 @@ describe('SessionManager', () => {
})
it('should throw an error if could not get session state', async () => {
mockedAxios.get.mockImplementation(() => Promise.reject('Mocked error'))
const gettingSessionStatus = 500
const sessionStatusError = `Getting session status timed out after 60 seconds. Request failed with status code ${gettingSessionStatus}`
mockedAxios.get.mockImplementation(() =>
Promise.reject({
response: {
status: gettingSessionStatus,
data: {
message: sessionStatusError
}
}
})
)
const expectedError = `Error while waiting for session. Error while getting session state. GET request to ${process.env.SERVER_URL}?wait=30 failed with status code ${gettingSessionStatus}. ${sessionStatusError}`
await expect(
sessionManager['waitForSession'](session, null, 'access_token')
).rejects.toContain('Error while getting session state.')
).rejects.toEqual(expectedError)
})
it('should return session state', async () => {
@@ -135,4 +188,243 @@ describe('SessionManager', () => {
).resolves.toEqual(customSession.state)
})
})
describe('isSessionValid', () => {
const session: Session = getMockSession()
it('should return false if not a session provided', () => {
expect(sessionManager['isSessionValid'](undefined as any)).toEqual(false)
})
it('should return true if session is not expired', () => {
expect(sessionManager['isSessionValid'](session)).toEqual(true)
})
it('should return false if session is expired', () => {
session.creationTimeStamp = `${new Date(
new Date().getTime() -
(session.attributes.sessionInactiveTimeout * 1000 + 1000)
).toISOString()}`
expect(sessionManager['isSessionValid'](session)).toEqual(false)
})
})
describe('removeSessionFromPool', () => {
it('should remove session from the pool of sessions', () => {
const session: Session = getMockSession()
const sessions: Session[] = [getMockSession(), session]
sessionManager['sessions'] = sessions
sessionManager['removeSessionFromPool'](session)
expect(sessionManager['sessions'].length).toEqual(1)
})
})
describe('getSession', () => {
it('should return session if there is a valid session and create new session', async () => {
jest
.spyOn(sessionManager as any, 'createAndWaitForSession')
.mockImplementation(async () => Promise.resolve(getMockSession()))
const session = getMockSession()
sessionManager['sessions'] = [session]
await expect(sessionManager.getSession()).resolves.toEqual(session)
expect(sessionManager['createAndWaitForSession']).toHaveBeenCalled()
})
it('should return a session and keep one session if there is no sessions available', async () => {
jest
.spyOn(sessionManager as any, 'createAndWaitForSession')
.mockImplementation(async () => {
const session = getMockSession()
sessionManager['sessions'].push(session)
return Promise.resolve(session)
})
const session = await sessionManager.getSession()
expect(Object.keys(session)).toEqual(Object.keys(getMockSession()))
expect(sessionManager['createAndWaitForSession']).toHaveBeenCalledTimes(2)
expect(sessionManager['sessions'].length).toEqual(1)
})
it.concurrent(
'should throw an error if session creation request returned 500',
async () => {
const sessionCreationStatus = 500
const sessionCreationError = `The process initialization for the Compute server with the ID 'ed40398a-ec8a-422b-867a-61493ee8a57f' timed out after 60 seconds. Request failed with status code ${sessionCreationStatus}`
jest.spyOn(requestClient, 'post').mockImplementation(() =>
Promise.reject({
response: {
status: sessionCreationStatus,
data: {
message: sessionCreationError
}
}
})
)
const contextId = 'testContextId'
const context: Context = {
name: 'testContext',
id: contextId,
createdBy: 'createdBy',
version: 1
}
sessionManager['currentContext'] = context
const expectedError = new Error(
`Error while creating session. POST request to ${process.env.SERVER_URL}/compute/contexts/${contextId}/sessions failed with status code ${sessionCreationStatus}. ${sessionCreationError}`
)
await expect(sessionManager.getSession()).rejects.toEqual(expectedError)
}
)
})
describe('clearSession', () => {
it('should clear session', async () => {
jest
.spyOn(requestClient, 'delete')
.mockImplementation(() =>
Promise.resolve({ result: '', etag: '', status: 200 })
)
const sessionToBeCleared = getMockSession()
const sessionToStay = getMockSession()
sessionManager['sessions'] = [sessionToBeCleared, sessionToStay]
await sessionManager.clearSession(sessionToBeCleared.id)
expect(sessionManager['sessions']).toEqual([sessionToStay])
})
it('should throw error if DELETE request failed', async () => {
const sessionCreationStatus = 500
const sessionDeleteError = `The process timed out after 60 seconds. Request failed with status code ${sessionCreationStatus}`
jest.spyOn(requestClient, 'delete').mockImplementation(() =>
Promise.reject({
response: {
status: sessionCreationStatus,
data: {
message: sessionDeleteError
}
}
})
)
const session = getMockSession()
sessionManager['sessions'] = [session]
const expectedError = `Error while deleting session. DELETE request to /compute/sessions/${session.id} failed with status code ${sessionCreationStatus}. ${sessionDeleteError}`
await expect(sessionManager.clearSession(session.id)).rejects.toEqual(
expectedError
)
})
})
describe('waitForCurrentContext', () => {
it('should resolve when current context is ready', async () => {
sessionManager['settingContext'] = true
sessionManager['contextName'] = 'test context'
await expect(sessionManager['waitForCurrentContext']()).toResolve()
expect(sessionManager['settingContext']).toEqual(false)
})
})
describe('setCurrentContext', () => {
it('should set current context', async () => {
const contextName = 'test context'
const testContext: Context = {
name: contextName,
id: 'string',
createdBy: 'string',
version: 1
}
jest.spyOn(requestClient, 'get').mockImplementation(() => {
return Promise.resolve({
result: {
items: [testContext]
},
etag: '',
status: 200
})
})
sessionManager['currentContext'] = null
sessionManager['contextName'] = contextName
sessionManager['settingContext'] = false
await expect(sessionManager['setCurrentContext']()).toResolve()
expect(sessionManager['currentContext']).toEqual(testContext)
})
it('should throw error if GET request failed', async () => {
const responseStatus = 500
const responseErrorMessage = `The process timed out after 60 seconds. Request failed with status code ${responseStatus}`
const response = {
status: responseStatus,
data: {
message: responseErrorMessage
}
}
jest.spyOn(requestClient, 'get').mockImplementation(() =>
Promise.reject({
response
})
)
const expectedError = `Error while getting list of contexts. GET request to ${process.env.SERVER_URL}/compute/contexts?limit=10000 failed with status code ${responseStatus}. ${responseErrorMessage}`
sessionManager['currentContext'] = null
await expect(sessionManager['setCurrentContext']()).rejects.toEqual(
expectedError
)
})
it('should throw an error if current context is not in the list of contexts', async () => {
const contextName = 'test context'
const testContext: Context = {
name: `${contextName} does not exist`,
id: 'string',
createdBy: 'string',
version: 1
}
jest.spyOn(requestClient, 'get').mockImplementation(() => {
return Promise.resolve({
result: {
items: [testContext]
},
etag: '',
status: 200
})
})
sessionManager['currentContext'] = null
sessionManager['contextName'] = contextName
sessionManager['settingContext'] = false
const expectedError = new Error(
`The context '${contextName}' was not found on the server ${process.env.SERVER_URL}.`
)
await expect(sessionManager['setCurrentContext']()).rejects.toEqual(
expectedError
)
})
})
})