1
0
mirror of https://github.com/sasjs/adapter.git synced 2026-01-12 06:40:06 +00:00

feat(job-state): added session state check to doPoll func

This commit is contained in:
Yury Shkoda
2023-09-11 11:17:05 +03:00
parent 0359fcb6be
commit 3a186bc55c
9 changed files with 423 additions and 53 deletions

View File

@@ -170,16 +170,21 @@ export async function executeOnComputeApi(
postedJob,
debug,
authConfig,
pollOptions
pollOptions,
{
session,
sessionManager
}
).catch(async (err) => {
const error = err?.response?.data
const result = /err=[0-9]*,/.exec(error)
const errorCode = '5113'
if (result?.[0]?.slice(4, -1) === errorCode) {
const logCount = 1000000
const sessionLogUrl =
postedJob.links.find((l: any) => l.rel === 'up')!.href + '/log'
const logCount = 1000000
err.log = await fetchLogByChunks(
requestClient,
access_token!,
@@ -187,6 +192,7 @@ export async function executeOnComputeApi(
logCount
)
}
throw prefixMessage(err, 'Error while polling job status. ')
})
@@ -205,12 +211,12 @@ export async function executeOnComputeApi(
let jobResult
let log = ''
const logLink = currentJob.links.find((l) => l.rel === 'log')
if (debug && logLink) {
const logUrl = `${logLink.href}/content`
const logCount = currentJob.logStatistics?.lineCount ?? 1000000
log = await fetchLogByChunks(
requestClient,
access_token!,
@@ -223,9 +229,7 @@ export async function executeOnComputeApi(
throw new ComputeJobExecutionError(currentJob, log)
}
if (!expectWebout) {
return { job: currentJob, log }
}
if (!expectWebout) return { job: currentJob, log }
const resultLink = `/compute/sessions/${executionSessionId}/filerefs/_webout/content`
@@ -236,6 +240,7 @@ export async function executeOnComputeApi(
if (logLink) {
const logUrl = `${logLink.href}/content`
const logCount = currentJob.logStatistics?.lineCount ?? 1000000
log = await fetchLogByChunks(
requestClient,
access_token!,

View File

@@ -3,7 +3,7 @@ import { Job, PollOptions, PollStrategy } from '../..'
import { getTokens } from '../../auth/getTokens'
import { RequestClient } from '../../request/RequestClient'
import { JobStatePollError } from '../../types/errors'
import { Link, WriteStream } from '../../types'
import { Link, WriteStream, SessionState, JobSessionManager } from '../../types'
import { delay, isNode } from '../../utils'
export enum JobState {
@@ -37,6 +37,7 @@ export enum JobState {
* { maxPollCount: 500, pollInterval: 30000 }, // approximately ~50.5 mins (including time to get response (~300ms))
* { maxPollCount: 3400, pollInterval: 60000 } // approximately ~3015 mins (~125 hours) (including time to get response (~300ms))
* ]
* @param jobSessionManager - job session object containing session object and an instance of Session Manager. Job session object is used to periodically (every 10th job state poll) check parent session state.
* @returns - a promise which resolves with a job state
*/
export async function pollJobState(
@@ -44,7 +45,8 @@ export async function pollJobState(
postedJob: Job,
debug: boolean,
authConfig?: AuthConfig,
pollOptions?: PollOptions
pollOptions?: PollOptions,
jobSessionManager?: JobSessionManager
): Promise<JobState> {
const logger = process.logger || console
@@ -127,7 +129,8 @@ export async function pollJobState(
pollOptions,
authConfig,
streamLog,
logFileStream
logFileStream,
jobSessionManager
)
currentState = result.state
@@ -158,7 +161,8 @@ export async function pollJobState(
defaultPollOptions,
authConfig,
streamLog,
logFileStream
logFileStream,
jobSessionManager
)
currentState = result.state
@@ -208,7 +212,21 @@ const needsRetry = (state: string) =>
state === JobState.Pending ||
state === JobState.Unavailable
const doPoll = async (
/**
* Polls job state.
* @param requestClient - the pre-configured HTTP request client.
* @param postedJob - the relative or absolute path to the job.
* @param currentState - current job state.
* @param debug - sets the _debug flag in the job arguments.
* @param pollCount - current poll count.
* @param pollOptions - an object containing maxPollCount, pollInterval, streamLog and logFolderPath.
* @param authConfig - an access token, refresh token, client and secret for an authorized user.
* @param streamLog - indicates if job log should be streamed.
* @param logStream - job log stream.
* @param jobSessionManager - job session object containing session object and an instance of Session Manager. Job session object is used to periodically (every 10th job state poll) check parent session state.
* @returns - a promise which resolves with a job state
*/
export const doPoll = async (
requestClient: RequestClient,
postedJob: Job,
currentState: JobState,
@@ -217,7 +235,8 @@ const doPoll = async (
pollOptions: PollOptions,
authConfig?: AuthConfig,
streamLog?: boolean,
logStream?: WriteStream
logStream?: WriteStream,
jobSessionManager?: JobSessionManager
): Promise<{ state: JobState; pollCount: number }> => {
const { maxPollCount, pollInterval } = pollOptions
const logger = process.logger || console
@@ -229,6 +248,35 @@ const doPoll = async (
let startLogLine = 0
while (needsRetry(state) && pollCount <= maxPollCount) {
// Check parent session state on every 10th job state poll.
if (jobSessionManager && pollCount && pollCount % 10 === 0 && authConfig) {
const { session, sessionManager } = jobSessionManager
const { stateUrl, etag, id: sessionId } = session
const { access_token } = authConfig
const { id: jobId } = postedJob
// Get session state.
const { result: sessionState, responseStatus } = await sessionManager[
'getSessionState'
](stateUrl, etag, access_token).catch((err) => {
// Handle error while getting session state.
throw new JobStatePollError(jobId, err)
})
// Clear parent session and throw an error if session state is not
// 'running' or response status is not 200.
if (sessionState !== SessionState.Running || responseStatus !== 200) {
sessionManager.clearSession(sessionId, access_token)
const sessionError =
sessionState !== SessionState.Running
? `Session state of the job is not 'running'. Session state is '${sessionState}'`
: `Session response status is not 200. Session response status is ${responseStatus}.`
throw new JobStatePollError(jobId, new Error(sessionError))
}
}
state = await getJobState(
requestClient,
postedJob,

View File

@@ -7,7 +7,7 @@ import * as uploadTablesModule from '../uploadTables'
import * as getTokensModule from '../../../auth/getTokens'
import * as formatDataModule from '../../../utils/formatDataForRequest'
import * as fetchLogsModule from '../../../utils/fetchLogByChunks'
import { PollOptions } from '../../../types'
import { PollOptions, JobSessionManager } from '../../../types'
import { ComputeJobExecutionError, NotFoundError } from '../../../types/errors'
import { Logger, LogLevel } from '@sasjs/utils/logger'
@@ -308,6 +308,11 @@ describe('executeScript', () => {
})
it('should poll for job completion when waitForResult is true', async () => {
const jobSessionManager: JobSessionManager = {
session: mockSession,
sessionManager: sessionManager
}
await executeOnComputeApi(
requestClient,
sessionManager,
@@ -329,7 +334,8 @@ describe('executeScript', () => {
mockJob,
false,
mockAuthConfig,
defaultPollOptions
defaultPollOptions,
jobSessionManager
)
})

View File

@@ -1,14 +1,16 @@
import { AuthConfig } from '@sasjs/utils/types'
import { Job, Session } from '../../../types'
import { Job, Session, SessionState } from '../../../types'
export const mockSession: Session = {
id: 's35510n',
state: 'idle',
state: SessionState.Idle,
stateUrl: '',
links: [],
attributes: {
sessionInactiveTimeout: 1
},
creationTimeStamp: new Date().valueOf().toString()
creationTimeStamp: new Date().valueOf().toString(),
etag: 'etag-string'
}
export const mockJob: Job = {

View File

@@ -1,17 +1,25 @@
import { Logger, LogLevel } from '@sasjs/utils/logger'
import { RequestClient } from '../../../request/RequestClient'
import { mockAuthConfig, mockJob } from './mockResponses'
import { pollJobState } from '../pollJobState'
import { pollJobState, doPoll, JobState } from '../pollJobState'
import * as getTokensModule from '../../../auth/getTokens'
import * as saveLogModule from '../saveLog'
import * as getFileStreamModule from '../getFileStream'
import * as isNodeModule from '../../../utils/isNode'
import * as delayModule from '../../../utils/delay'
import { PollOptions, PollStrategy } from '../../../types'
import {
PollOptions,
PollStrategy,
SessionState,
JobSessionManager
} from '../../../types'
import { WriteStream } from 'fs'
import { SessionManager } from '../../../SessionManager'
import { JobStatePollError } from '../../../types'
const baseUrl = 'http://localhost'
const requestClient = new (<jest.Mock<RequestClient>>RequestClient)()
const sessionManager = new (<jest.Mock<SessionManager>>SessionManager)()
requestClient['httpClient'].defaults.baseURL = baseUrl
const defaultStreamLog = false
@@ -423,6 +431,218 @@ describe('pollJobState', () => {
})
})
describe('doPoll', () => {
const sessionStateLink = '/compute/sessions/session-id-ses0000/state'
const jobSessionManager: JobSessionManager = {
sessionManager,
session: {
id: ['id', new Date().getTime(), Math.random()].join('-'),
state: SessionState.NoState,
links: [
{
href: sessionStateLink,
method: 'GET',
rel: 'state',
type: 'text/plain',
uri: sessionStateLink
}
],
attributes: {
sessionInactiveTimeout: 900
},
creationTimeStamp: `${new Date(new Date().getTime()).toISOString()}`,
stateUrl: '',
etag: ''
}
}
beforeEach(() => {
setupMocks()
})
it('should check session state on every 10th job state poll', async () => {
const mockedGetSessionState = jest
.spyOn(sessionManager as any, 'getSessionState')
.mockImplementation(() => {
return Promise.resolve({
result: SessionState.Running,
responseStatus: 200
})
})
let getSessionStateCount = 0
jest.spyOn(requestClient, 'get').mockImplementation(() => {
getSessionStateCount++
return Promise.resolve({
result:
getSessionStateCount < 20 ? JobState.Running : JobState.Completed,
etag: 'etag-string',
status: 200
})
})
await doPoll(
requestClient,
mockJob,
JobState.Running,
false,
1,
defaultPollStrategy,
mockAuthConfig,
undefined,
undefined,
jobSessionManager
)
expect(mockedGetSessionState).toHaveBeenCalledTimes(2)
})
it('should handle error while checking session state', async () => {
const sessionStateError = 'Error while getting session state.'
jest
.spyOn(sessionManager as any, 'getSessionState')
.mockImplementation(() => {
return Promise.reject(sessionStateError)
})
jest.spyOn(requestClient, 'get').mockImplementation(() => {
return Promise.resolve({
result: JobState.Running,
etag: 'etag-string',
status: 200
})
})
await expect(
doPoll(
requestClient,
mockJob,
JobState.Running,
false,
1,
defaultPollStrategy,
mockAuthConfig,
undefined,
undefined,
jobSessionManager
)
).rejects.toEqual(
new JobStatePollError(mockJob.id, new Error(sessionStateError))
)
})
it('should throw an error if session is not in running state', async () => {
const filteredSessionStates = Object.values(SessionState).filter(
(state) => state !== SessionState.Running
)
const randomSessionState =
filteredSessionStates[
Math.floor(Math.random() * filteredSessionStates.length)
]
jest
.spyOn(sessionManager as any, 'getSessionState')
.mockImplementation(() => {
return Promise.resolve({
result: randomSessionState,
responseStatus: 200
})
})
jest.spyOn(requestClient, 'get').mockImplementation(() => {
return Promise.resolve({
result: JobState.Running,
etag: 'etag-string',
status: 200
})
})
const mockedClearSession = jest
.spyOn(sessionManager, 'clearSession')
.mockImplementation(() => Promise.resolve())
await expect(
doPoll(
requestClient,
mockJob,
JobState.Running,
false,
1,
defaultPollStrategy,
mockAuthConfig,
undefined,
undefined,
jobSessionManager
)
).rejects.toEqual(
new JobStatePollError(
mockJob.id,
new Error(
`Session state of the job is not 'running'. Session state is '${randomSessionState}'`
)
)
)
expect(mockedClearSession).toHaveBeenCalledWith(
jobSessionManager.session.id,
mockAuthConfig.access_token
)
})
it('should handle throw an error if response status of session state is not 200', async () => {
const sessionStateResponseStatus = 500
jest
.spyOn(sessionManager as any, 'getSessionState')
.mockImplementation(() => {
return Promise.resolve({
result: SessionState.Running,
responseStatus: sessionStateResponseStatus
})
})
jest.spyOn(requestClient, 'get').mockImplementation(() => {
return Promise.resolve({
result: JobState.Running,
etag: 'etag-string',
status: 200
})
})
const mockedClearSession = jest
.spyOn(sessionManager, 'clearSession')
.mockImplementation(() => Promise.resolve())
await expect(
doPoll(
requestClient,
mockJob,
JobState.Running,
false,
1,
defaultPollStrategy,
mockAuthConfig,
undefined,
undefined,
jobSessionManager
)
).rejects.toEqual(
new JobStatePollError(
mockJob.id,
new Error(
`Session response status is not 200. Session response status is ${sessionStateResponseStatus}.`
)
)
)
expect(mockedClearSession).toHaveBeenCalledWith(
jobSessionManager.session.id,
mockAuthConfig.access_token
)
})
})
const setupMocks = () => {
jest.restoreAllMocks()
jest.mock('../../../request/RequestClient')