1
0
mirror of https://github.com/sasjs/adapter.git synced 2025-12-11 01:14:36 +00:00

feat(poll-job-state): implemented polling strategies

This commit is contained in:
Yury Shkoda
2023-05-15 14:23:20 +03:00
parent bd872e0e75
commit 615c9d012e
4 changed files with 292 additions and 90 deletions

View File

@@ -12,7 +12,7 @@ import { RequestClient } from '../../request/RequestClient'
import { SessionManager } from '../../SessionManager'
import { isRelativePath, fetchLogByChunks } from '../../utils'
import { formatDataForRequest } from '../../utils/formatDataForRequest'
import { pollJobState } from './pollJobState'
import { pollJobState, JobState } from './pollJobState'
import { uploadTables } from './uploadTables'
/**
@@ -228,7 +228,7 @@ export async function executeScript(
)
}
if (jobStatus === 'failed' || jobStatus === 'error') {
if (jobStatus === JobState.Failed || jobStatus === JobState.Error) {
throw new ComputeJobExecutionError(currentJob, log)
}

View File

@@ -6,23 +6,39 @@ import { JobStatePollError } from '../../types/errors'
import { Link, WriteStream } from '../../types'
import { delay, isNode } from '../../utils'
export enum JobState {
Completed = 'completed',
Running = 'running',
Pending = 'pending',
Unavailable = 'unavailable',
NoState = '',
Failed = 'failed',
Error = 'error'
}
type PollStrategies = PollOptions[]
export async function pollJobState(
requestClient: RequestClient,
postedJob: Job,
debug: boolean,
authConfig?: AuthConfig,
pollOptions?: PollOptions
pollOptions?: PollOptions,
pollStrategies?: PollStrategies
) {
const logger = process.logger || console
let pollInterval = 300
let maxPollCount = 1000
const defaultPollStrategies: PollStrategies = [
{ maxPollCount: 200, pollInterval: 300, streamLog: false },
{ maxPollCount: 300, pollInterval: 3000, streamLog: false },
{ maxPollCount: 400, pollInterval: 30000, streamLog: false },
{ maxPollCount: 3400, pollInterval: 60000, streamLog: false }
]
const defaultPollOptions: PollOptions = {
maxPollCount,
pollInterval,
streamLog: false
}
if (pollStrategies === undefined) pollStrategies = defaultPollStrategies
else validatePollStrategies(pollStrategies)
let defaultPollOptions: PollOptions = pollStrategies.splice(0, 1)[0]
pollOptions = { ...defaultPollOptions, ...(pollOptions || {}) }
@@ -31,10 +47,10 @@ export async function pollJobState(
throw new Error(`Job state link was not found.`)
}
let currentState = await getJobState(
let currentState: JobState = await getJobState(
requestClient,
postedJob,
'',
JobState.NoState,
debug,
authConfig
).catch((err) => {
@@ -42,12 +58,13 @@ export async function pollJobState(
`Error fetching job state from ${stateLink.href}. Starting poll, assuming job to be running.`,
err
)
return 'unavailable'
return JobState.Unavailable
})
let pollCount = 0
if (currentState === 'completed') {
if (currentState === JobState.Completed) {
return Promise.resolve(currentState)
}
@@ -57,58 +74,53 @@ export async function pollJobState(
logFileStream = await getFileStream(postedJob, pollOptions.logFolderPath)
}
// Poll up to the first 100 times with the specified poll interval
let result = await doPoll(
requestClient,
postedJob,
currentState,
debug,
pollCount,
pollOptions,
authConfig,
{
...pollOptions,
maxPollCount:
pollOptions.maxPollCount <= 100 ? pollOptions.maxPollCount : 100
},
logFileStream
)
currentState = result.state
pollCount = result.pollCount
if (!needsRetry(currentState) || pollCount >= pollOptions.maxPollCount) {
if (
!needsRetry(currentState) ||
(pollCount >= pollOptions.maxPollCount && !pollStrategies.length)
) {
return currentState
}
// If we get to this point, this is a long-running job that needs longer polling.
// We will resume polling with a bigger interval of 1 minute
let longJobPollOptions: PollOptions = {
maxPollCount: 24 * 60,
pollInterval: 60000,
streamLog: false
}
if (pollOptions) {
longJobPollOptions.streamLog = pollOptions.streamLog
longJobPollOptions.logFolderPath = pollOptions.logFolderPath
// INFO: If we get to this point, this is a long-running job that needs longer polling.
// We will resume polling with a bigger interval according to the next polling strategy
while (pollStrategies.length && needsRetry(currentState)) {
defaultPollOptions = pollStrategies.splice(0, 1)[0]
if (pollOptions) {
defaultPollOptions.streamLog = pollOptions.streamLog
defaultPollOptions.logFolderPath = pollOptions.logFolderPath
}
result = await doPoll(
requestClient,
postedJob,
currentState,
debug,
pollCount,
defaultPollOptions,
authConfig,
logFileStream
)
currentState = result.state
pollCount = result.pollCount
}
result = await doPoll(
requestClient,
postedJob,
currentState,
debug,
pollCount,
authConfig,
longJobPollOptions,
logFileStream
)
currentState = result.state
pollCount = result.pollCount
if (logFileStream) {
logFileStream.end()
}
if (logFileStream) logFileStream.end()
return currentState
}
@@ -119,17 +131,13 @@ const getJobState = async (
currentState: string,
debug: boolean,
authConfig?: AuthConfig
) => {
const stateLink = job.links.find((l: any) => l.rel === 'state')
if (!stateLink) {
throw new Error(`Job state link was not found.`)
}
): Promise<JobState> => {
const stateLink = job.links.find((l: any) => l.rel === 'state')!
if (needsRetry(currentState)) {
let tokens
if (authConfig) {
tokens = await getTokens(requestClient, authConfig)
}
if (authConfig) tokens = await getTokens(requestClient, authConfig)
const { result: jobState } = await requestClient
.get<string>(
@@ -143,48 +151,37 @@ const getJobState = async (
throw new JobStatePollError(job.id, err)
})
return jobState.trim()
return jobState.trim() as JobState
} else {
return currentState
return currentState as JobState
}
}
const needsRetry = (state: string) =>
state === 'running' ||
state === '' ||
state === 'pending' ||
state === 'unavailable'
state === JobState.Running ||
state === JobState.NoState ||
state === JobState.Pending ||
state === JobState.Unavailable
const doPoll = async (
requestClient: RequestClient,
postedJob: Job,
currentState: string,
currentState: JobState,
debug: boolean,
pollCount: number,
pollOptions: PollOptions,
authConfig?: AuthConfig,
pollOptions?: PollOptions,
logStream?: WriteStream
): Promise<{ state: string; pollCount: number }> => {
let pollInterval = 300
let maxPollCount = 1000
): Promise<{ state: JobState; pollCount: number }> => {
const { maxPollCount, pollInterval } = pollOptions
const logger = process.logger || console
const stateLink = postedJob.links.find((l: Link) => l.rel === 'state')!
let maxErrorCount = 5
let errorCount = 0
let state = currentState
let printedState = ''
let printedState = JobState.NoState
let startLogLine = 0
const logger = process.logger || console
if (pollOptions) {
pollInterval = pollOptions.pollInterval || pollInterval
maxPollCount = pollOptions.maxPollCount || maxPollCount
}
const stateLink = postedJob.links.find((l: Link) => l.rel === 'state')
if (!stateLink) {
throw new Error(`Job state link was not found.`)
}
while (needsRetry(state) && pollCount <= maxPollCount) {
state = await getJobState(
requestClient,
@@ -194,14 +191,17 @@ const doPoll = async (
authConfig
).catch((err) => {
errorCount++
if (pollCount >= maxPollCount || errorCount >= maxErrorCount) {
throw err
}
logger.error(
`Error fetching job state from ${stateLink.href}. Resuming poll, assuming job to be running.`,
err
)
return 'unavailable'
return JobState.Unavailable
})
pollCount++
@@ -238,12 +238,47 @@ const doPoll = async (
printedState = state
}
if (state != 'unavailable' && errorCount > 0) {
if (state !== JobState.Unavailable && errorCount > 0) {
errorCount = 0
}
await delay(pollInterval)
if (state !== JobState.Completed) {
await delay(pollInterval)
}
}
return { state, pollCount }
}
const validatePollStrategies = (strategies: PollStrategies) => {
const throwError = (message?: string, strategy?: PollOptions) => {
throw new Error(
`Poll strategies are not valid.${message ? ` ${message}` : ''}${
strategy
? ` Invalid poll strategy: \n${JSON.stringify(strategy, null, 2)}`
: ''
}`
)
}
if (!strategies.length) throwError('No strategies provided.')
strategies.forEach((strategy: PollOptions, i: number) => {
const { maxPollCount, pollInterval } = strategy
if (maxPollCount < 1) {
throwError(`'maxPollCount' has to be greater than 0.`, strategy)
} else if (i !== 0) {
const previousStrategy = strategies[i - 1]
if (maxPollCount <= previousStrategy.maxPollCount) {
throwError(
`'maxPollCount' has to be greater than 'maxPollCount' in previous poll strategy.`,
strategy
)
}
} else if (pollInterval < 1) {
throwError(`'pollInterval' has to be greater than 0.`, strategy)
}
})
}

View File

@@ -452,7 +452,9 @@ describe('executeScript', () => {
it('should throw a ComputeJobExecutionError if the job has failed', async () => {
jest
.spyOn(pollJobStateModule, 'pollJobState')
.mockImplementation(() => Promise.resolve('failed'))
.mockImplementation(() =>
Promise.resolve(pollJobStateModule.JobState.Failed)
)
const error: ComputeJobExecutionError = await executeScript(
requestClient,
@@ -485,7 +487,9 @@ describe('executeScript', () => {
it('should throw a ComputeJobExecutionError if the job has errored out', async () => {
jest
.spyOn(pollJobStateModule, 'pollJobState')
.mockImplementation(() => Promise.resolve('error'))
.mockImplementation(() =>
Promise.resolve(pollJobStateModule.JobState.Error)
)
const error: ComputeJobExecutionError = await executeScript(
requestClient,
@@ -654,7 +658,9 @@ const setupMocks = () => {
.mockImplementation(() => Promise.resolve(mockAuthConfig))
jest
.spyOn(pollJobStateModule, 'pollJobState')
.mockImplementation(() => Promise.resolve('completed'))
.mockImplementation(() =>
Promise.resolve(pollJobStateModule.JobState.Completed)
)
jest
.spyOn(sessionManager, 'getVariable')
.mockImplementation(() =>

View File

@@ -6,6 +6,7 @@ import * as getTokensModule from '../../../auth/getTokens'
import * as saveLogModule from '../saveLog'
import * as getFileStreamModule from '../getFileStream'
import * as isNodeModule from '../../../utils/isNode'
import * as delayModule from '../../../utils/delay'
import { PollOptions } from '../../../types'
import { WriteStream } from 'fs'
@@ -136,15 +137,19 @@ describe('pollJobState', () => {
it('should return the current status when the max poll count is reached', async () => {
mockRunningPoll()
const pollOptions = {
...defaultPollOptions,
maxPollCount: 1
}
const pollStrategies = [pollOptions]
const state = await pollJobState(
requestClient,
mockJob,
false,
mockAuthConfig,
{
...defaultPollOptions,
maxPollCount: 1
}
pollOptions,
pollStrategies
)
expect(state).toEqual('running')
@@ -244,6 +249,148 @@ describe('pollJobState', () => {
'Error while polling job state for job j0b: Status Error'
)
})
it('should change poll strategies', async () => {
mockSimplePoll(6)
const delays: number[] = []
jest.spyOn(delayModule, 'delay').mockImplementation((ms: number) => {
delays.push(ms)
return Promise.resolve()
})
const pollIntervals = [3, 4, 5, 6]
const pollStrategies = [
{ maxPollCount: 1, pollInterval: pollIntervals[0], streamLog: false },
{ maxPollCount: 2, pollInterval: pollIntervals[1], streamLog: false },
{ maxPollCount: 3, pollInterval: pollIntervals[2], streamLog: false },
{ maxPollCount: 4, pollInterval: pollIntervals[3], streamLog: false }
]
await pollJobState(
requestClient,
mockJob,
false,
undefined,
undefined,
pollStrategies
)
expect(delays).toEqual([pollIntervals[0], ...pollIntervals])
})
it('should throw an error if not valid poll strategies provided', async () => {
// INFO: No strategies provided.
let expectedError = new Error(
'Poll strategies are not valid. No strategies provided.'
)
let pollStrategies: PollOptions[] = []
await expect(
pollJobState(
requestClient,
mockJob,
false,
undefined,
undefined,
pollStrategies
)
).rejects.toThrow(expectedError)
// INFO: 'maxPollCount' has to be > 0
let invalidPollStrategy = {
maxPollCount: 0,
pollInterval: 3,
streamLog: false
}
pollStrategies.push(invalidPollStrategy)
expectedError = new Error(
`Poll strategies are not valid. 'maxPollCount' has to be greater than 0. Invalid poll strategy: \n${JSON.stringify(
invalidPollStrategy,
null,
2
)}`
)
await expect(
pollJobState(
requestClient,
mockJob,
false,
undefined,
undefined,
pollStrategies
)
).rejects.toThrow(expectedError)
// INFO: 'maxPollCount' has to be > than 'maxPollCount' of the previous strategy
const validPollStrategy = {
maxPollCount: 5,
pollInterval: 2,
streamLog: false
}
invalidPollStrategy = {
maxPollCount: validPollStrategy.maxPollCount,
pollInterval: 3,
streamLog: false
}
pollStrategies = [validPollStrategy, invalidPollStrategy]
expectedError = new Error(
`Poll strategies are not valid. 'maxPollCount' has to be greater than 'maxPollCount' in previous poll strategy. Invalid poll strategy: \n${JSON.stringify(
invalidPollStrategy,
null,
2
)}`
)
await expect(
pollJobState(
requestClient,
mockJob,
false,
undefined,
undefined,
pollStrategies
)
).rejects.toThrow(expectedError)
// INFO: invalid 'pollInterval'
invalidPollStrategy = {
maxPollCount: 1,
pollInterval: 0,
streamLog: false
}
pollStrategies = [invalidPollStrategy]
expectedError = new Error(
`Poll strategies are not valid. 'pollInterval' has to be greater than 0. Invalid poll strategy: \n${JSON.stringify(
invalidPollStrategy,
null,
2
)}`
)
await expect(
pollJobState(
requestClient,
mockJob,
false,
undefined,
undefined,
pollStrategies
)
).rejects.toThrow(expectedError)
})
})
const setupMocks = () => {
@@ -273,11 +420,14 @@ const setupMocks = () => {
const mockSimplePoll = (runningCount = 2) => {
let count = 0
jest.spyOn(requestClient, 'get').mockImplementation((url) => {
count++
if (url.includes('job')) {
return Promise.resolve({ result: mockJob, etag: '', status: 200 })
}
return Promise.resolve({
result:
count === 0
@@ -293,11 +443,14 @@ const mockSimplePoll = (runningCount = 2) => {
const mockRunningPoll = () => {
let count = 0
jest.spyOn(requestClient, 'get').mockImplementation((url) => {
count++
if (url.includes('job')) {
return Promise.resolve({ result: mockJob, etag: '', status: 200 })
}
return Promise.resolve({
result: count === 0 ? 'pending' : 'running',
etag: '',
@@ -308,11 +461,14 @@ const mockRunningPoll = () => {
const mockLongPoll = () => {
let count = 0
jest.spyOn(requestClient, 'get').mockImplementation((url) => {
count++
if (url.includes('job')) {
return Promise.resolve({ result: mockJob, etag: '', status: 200 })
}
return Promise.resolve({
result: count <= 102 ? 'running' : 'completed',
etag: '',
@@ -323,14 +479,18 @@ const mockLongPoll = () => {
const mockPollWithSingleError = () => {
let count = 0
jest.spyOn(requestClient, 'get').mockImplementation((url) => {
count++
if (url.includes('job')) {
return Promise.resolve({ result: mockJob, etag: '', status: 200 })
}
if (count === 1) {
return Promise.reject('Status Error')
}
return Promise.resolve({
result: count === 0 ? 'pending' : 'completed',
etag: '',
@@ -344,6 +504,7 @@ const mockErroredPoll = () => {
if (url.includes('job')) {
return Promise.resolve({ result: mockJob, etag: '', status: 200 })
}
return Promise.reject('Status Error')
})
}