diff --git a/src/api/viya/executeScript.ts b/src/api/viya/executeScript.ts index 9ae65a5..2bfb9d1 100644 --- a/src/api/viya/executeScript.ts +++ b/src/api/viya/executeScript.ts @@ -12,7 +12,7 @@ import { RequestClient } from '../../request/RequestClient' import { SessionManager } from '../../SessionManager' import { isRelativePath, fetchLogByChunks } from '../../utils' import { formatDataForRequest } from '../../utils/formatDataForRequest' -import { pollJobState } from './pollJobState' +import { pollJobState, JobState } from './pollJobState' import { uploadTables } from './uploadTables' /** @@ -228,7 +228,7 @@ export async function executeScript( ) } - if (jobStatus === 'failed' || jobStatus === 'error') { + if (jobStatus === JobState.Failed || jobStatus === JobState.Error) { throw new ComputeJobExecutionError(currentJob, log) } diff --git a/src/api/viya/pollJobState.ts b/src/api/viya/pollJobState.ts index 0ac2597..1a7a1ea 100644 --- a/src/api/viya/pollJobState.ts +++ b/src/api/viya/pollJobState.ts @@ -6,23 +6,39 @@ import { JobStatePollError } from '../../types/errors' import { Link, WriteStream } from '../../types' import { delay, isNode } from '../../utils' +export enum JobState { + Completed = 'completed', + Running = 'running', + Pending = 'pending', + Unavailable = 'unavailable', + NoState = '', + Failed = 'failed', + Error = 'error' +} + +type PollStrategies = PollOptions[] + export async function pollJobState( requestClient: RequestClient, postedJob: Job, debug: boolean, authConfig?: AuthConfig, - pollOptions?: PollOptions + pollOptions?: PollOptions, + pollStrategies?: PollStrategies ) { const logger = process.logger || console - let pollInterval = 300 - let maxPollCount = 1000 + const defaultPollStrategies: PollStrategies = [ + { maxPollCount: 200, pollInterval: 300, streamLog: false }, + { maxPollCount: 300, pollInterval: 3000, streamLog: false }, + { maxPollCount: 400, pollInterval: 30000, streamLog: false }, + { maxPollCount: 3400, pollInterval: 60000, streamLog: false } + ] - const defaultPollOptions: PollOptions = { - maxPollCount, - pollInterval, - streamLog: false - } + if (pollStrategies === undefined) pollStrategies = defaultPollStrategies + else validatePollStrategies(pollStrategies) + + let defaultPollOptions: PollOptions = pollStrategies.splice(0, 1)[0] pollOptions = { ...defaultPollOptions, ...(pollOptions || {}) } @@ -31,10 +47,10 @@ export async function pollJobState( throw new Error(`Job state link was not found.`) } - let currentState = await getJobState( + let currentState: JobState = await getJobState( requestClient, postedJob, - '', + JobState.NoState, debug, authConfig ).catch((err) => { @@ -42,12 +58,13 @@ export async function pollJobState( `Error fetching job state from ${stateLink.href}. Starting poll, assuming job to be running.`, err ) - return 'unavailable' + + return JobState.Unavailable }) let pollCount = 0 - if (currentState === 'completed') { + if (currentState === JobState.Completed) { return Promise.resolve(currentState) } @@ -57,58 +74,53 @@ export async function pollJobState( logFileStream = await getFileStream(postedJob, pollOptions.logFolderPath) } - // Poll up to the first 100 times with the specified poll interval let result = await doPoll( requestClient, postedJob, currentState, debug, pollCount, + pollOptions, authConfig, - { - ...pollOptions, - maxPollCount: - pollOptions.maxPollCount <= 100 ? pollOptions.maxPollCount : 100 - }, logFileStream ) currentState = result.state pollCount = result.pollCount - if (!needsRetry(currentState) || pollCount >= pollOptions.maxPollCount) { + if ( + !needsRetry(currentState) || + (pollCount >= pollOptions.maxPollCount && !pollStrategies.length) + ) { return currentState } - // If we get to this point, this is a long-running job that needs longer polling. - // We will resume polling with a bigger interval of 1 minute - let longJobPollOptions: PollOptions = { - maxPollCount: 24 * 60, - pollInterval: 60000, - streamLog: false - } - if (pollOptions) { - longJobPollOptions.streamLog = pollOptions.streamLog - longJobPollOptions.logFolderPath = pollOptions.logFolderPath + // INFO: If we get to this point, this is a long-running job that needs longer polling. + // We will resume polling with a bigger interval according to the next polling strategy + while (pollStrategies.length && needsRetry(currentState)) { + defaultPollOptions = pollStrategies.splice(0, 1)[0] + + if (pollOptions) { + defaultPollOptions.streamLog = pollOptions.streamLog + defaultPollOptions.logFolderPath = pollOptions.logFolderPath + } + + result = await doPoll( + requestClient, + postedJob, + currentState, + debug, + pollCount, + defaultPollOptions, + authConfig, + logFileStream + ) + + currentState = result.state + pollCount = result.pollCount } - result = await doPoll( - requestClient, - postedJob, - currentState, - debug, - pollCount, - authConfig, - longJobPollOptions, - logFileStream - ) - - currentState = result.state - pollCount = result.pollCount - - if (logFileStream) { - logFileStream.end() - } + if (logFileStream) logFileStream.end() return currentState } @@ -119,17 +131,13 @@ const getJobState = async ( currentState: string, debug: boolean, authConfig?: AuthConfig -) => { - const stateLink = job.links.find((l: any) => l.rel === 'state') - if (!stateLink) { - throw new Error(`Job state link was not found.`) - } +): Promise => { + const stateLink = job.links.find((l: any) => l.rel === 'state')! if (needsRetry(currentState)) { let tokens - if (authConfig) { - tokens = await getTokens(requestClient, authConfig) - } + + if (authConfig) tokens = await getTokens(requestClient, authConfig) const { result: jobState } = await requestClient .get( @@ -143,48 +151,37 @@ const getJobState = async ( throw new JobStatePollError(job.id, err) }) - return jobState.trim() + return jobState.trim() as JobState } else { - return currentState + return currentState as JobState } } const needsRetry = (state: string) => - state === 'running' || - state === '' || - state === 'pending' || - state === 'unavailable' + state === JobState.Running || + state === JobState.NoState || + state === JobState.Pending || + state === JobState.Unavailable const doPoll = async ( requestClient: RequestClient, postedJob: Job, - currentState: string, + currentState: JobState, debug: boolean, pollCount: number, + pollOptions: PollOptions, authConfig?: AuthConfig, - pollOptions?: PollOptions, logStream?: WriteStream -): Promise<{ state: string; pollCount: number }> => { - let pollInterval = 300 - let maxPollCount = 1000 +): Promise<{ state: JobState; pollCount: number }> => { + const { maxPollCount, pollInterval } = pollOptions + const logger = process.logger || console + const stateLink = postedJob.links.find((l: Link) => l.rel === 'state')! let maxErrorCount = 5 let errorCount = 0 let state = currentState - let printedState = '' + let printedState = JobState.NoState let startLogLine = 0 - const logger = process.logger || console - - if (pollOptions) { - pollInterval = pollOptions.pollInterval || pollInterval - maxPollCount = pollOptions.maxPollCount || maxPollCount - } - - const stateLink = postedJob.links.find((l: Link) => l.rel === 'state') - if (!stateLink) { - throw new Error(`Job state link was not found.`) - } - while (needsRetry(state) && pollCount <= maxPollCount) { state = await getJobState( requestClient, @@ -194,14 +191,17 @@ const doPoll = async ( authConfig ).catch((err) => { errorCount++ + if (pollCount >= maxPollCount || errorCount >= maxErrorCount) { throw err } + logger.error( `Error fetching job state from ${stateLink.href}. Resuming poll, assuming job to be running.`, err ) - return 'unavailable' + + return JobState.Unavailable }) pollCount++ @@ -238,12 +238,47 @@ const doPoll = async ( printedState = state } - if (state != 'unavailable' && errorCount > 0) { + if (state !== JobState.Unavailable && errorCount > 0) { errorCount = 0 } - await delay(pollInterval) + if (state !== JobState.Completed) { + await delay(pollInterval) + } } return { state, pollCount } } + +const validatePollStrategies = (strategies: PollStrategies) => { + const throwError = (message?: string, strategy?: PollOptions) => { + throw new Error( + `Poll strategies are not valid.${message ? ` ${message}` : ''}${ + strategy + ? ` Invalid poll strategy: \n${JSON.stringify(strategy, null, 2)}` + : '' + }` + ) + } + + if (!strategies.length) throwError('No strategies provided.') + + strategies.forEach((strategy: PollOptions, i: number) => { + const { maxPollCount, pollInterval } = strategy + + if (maxPollCount < 1) { + throwError(`'maxPollCount' has to be greater than 0.`, strategy) + } else if (i !== 0) { + const previousStrategy = strategies[i - 1] + + if (maxPollCount <= previousStrategy.maxPollCount) { + throwError( + `'maxPollCount' has to be greater than 'maxPollCount' in previous poll strategy.`, + strategy + ) + } + } else if (pollInterval < 1) { + throwError(`'pollInterval' has to be greater than 0.`, strategy) + } + }) +} diff --git a/src/api/viya/spec/executeScript.spec.ts b/src/api/viya/spec/executeScript.spec.ts index 0df1512..5d5b0ec 100644 --- a/src/api/viya/spec/executeScript.spec.ts +++ b/src/api/viya/spec/executeScript.spec.ts @@ -452,7 +452,9 @@ describe('executeScript', () => { it('should throw a ComputeJobExecutionError if the job has failed', async () => { jest .spyOn(pollJobStateModule, 'pollJobState') - .mockImplementation(() => Promise.resolve('failed')) + .mockImplementation(() => + Promise.resolve(pollJobStateModule.JobState.Failed) + ) const error: ComputeJobExecutionError = await executeScript( requestClient, @@ -485,7 +487,9 @@ describe('executeScript', () => { it('should throw a ComputeJobExecutionError if the job has errored out', async () => { jest .spyOn(pollJobStateModule, 'pollJobState') - .mockImplementation(() => Promise.resolve('error')) + .mockImplementation(() => + Promise.resolve(pollJobStateModule.JobState.Error) + ) const error: ComputeJobExecutionError = await executeScript( requestClient, @@ -654,7 +658,9 @@ const setupMocks = () => { .mockImplementation(() => Promise.resolve(mockAuthConfig)) jest .spyOn(pollJobStateModule, 'pollJobState') - .mockImplementation(() => Promise.resolve('completed')) + .mockImplementation(() => + Promise.resolve(pollJobStateModule.JobState.Completed) + ) jest .spyOn(sessionManager, 'getVariable') .mockImplementation(() => diff --git a/src/api/viya/spec/pollJobState.spec.ts b/src/api/viya/spec/pollJobState.spec.ts index 9a07741..e9bc09b 100644 --- a/src/api/viya/spec/pollJobState.spec.ts +++ b/src/api/viya/spec/pollJobState.spec.ts @@ -6,6 +6,7 @@ import * as getTokensModule from '../../../auth/getTokens' import * as saveLogModule from '../saveLog' import * as getFileStreamModule from '../getFileStream' import * as isNodeModule from '../../../utils/isNode' +import * as delayModule from '../../../utils/delay' import { PollOptions } from '../../../types' import { WriteStream } from 'fs' @@ -136,15 +137,19 @@ describe('pollJobState', () => { it('should return the current status when the max poll count is reached', async () => { mockRunningPoll() + const pollOptions = { + ...defaultPollOptions, + maxPollCount: 1 + } + const pollStrategies = [pollOptions] + const state = await pollJobState( requestClient, mockJob, false, mockAuthConfig, - { - ...defaultPollOptions, - maxPollCount: 1 - } + pollOptions, + pollStrategies ) expect(state).toEqual('running') @@ -244,6 +249,148 @@ describe('pollJobState', () => { 'Error while polling job state for job j0b: Status Error' ) }) + + it('should change poll strategies', async () => { + mockSimplePoll(6) + + const delays: number[] = [] + + jest.spyOn(delayModule, 'delay').mockImplementation((ms: number) => { + delays.push(ms) + + return Promise.resolve() + }) + + const pollIntervals = [3, 4, 5, 6] + + const pollStrategies = [ + { maxPollCount: 1, pollInterval: pollIntervals[0], streamLog: false }, + { maxPollCount: 2, pollInterval: pollIntervals[1], streamLog: false }, + { maxPollCount: 3, pollInterval: pollIntervals[2], streamLog: false }, + { maxPollCount: 4, pollInterval: pollIntervals[3], streamLog: false } + ] + + await pollJobState( + requestClient, + mockJob, + false, + undefined, + undefined, + pollStrategies + ) + + expect(delays).toEqual([pollIntervals[0], ...pollIntervals]) + }) + + it('should throw an error if not valid poll strategies provided', async () => { + // INFO: No strategies provided. + let expectedError = new Error( + 'Poll strategies are not valid. No strategies provided.' + ) + + let pollStrategies: PollOptions[] = [] + + await expect( + pollJobState( + requestClient, + mockJob, + false, + undefined, + undefined, + pollStrategies + ) + ).rejects.toThrow(expectedError) + + // INFO: 'maxPollCount' has to be > 0 + let invalidPollStrategy = { + maxPollCount: 0, + pollInterval: 3, + streamLog: false + } + + pollStrategies.push(invalidPollStrategy) + + expectedError = new Error( + `Poll strategies are not valid. 'maxPollCount' has to be greater than 0. Invalid poll strategy: \n${JSON.stringify( + invalidPollStrategy, + null, + 2 + )}` + ) + + await expect( + pollJobState( + requestClient, + mockJob, + false, + undefined, + undefined, + pollStrategies + ) + ).rejects.toThrow(expectedError) + + // INFO: 'maxPollCount' has to be > than 'maxPollCount' of the previous strategy + const validPollStrategy = { + maxPollCount: 5, + pollInterval: 2, + streamLog: false + } + + invalidPollStrategy = { + maxPollCount: validPollStrategy.maxPollCount, + pollInterval: 3, + streamLog: false + } + + pollStrategies = [validPollStrategy, invalidPollStrategy] + + expectedError = new Error( + `Poll strategies are not valid. 'maxPollCount' has to be greater than 'maxPollCount' in previous poll strategy. Invalid poll strategy: \n${JSON.stringify( + invalidPollStrategy, + null, + 2 + )}` + ) + + await expect( + pollJobState( + requestClient, + mockJob, + false, + undefined, + undefined, + pollStrategies + ) + ).rejects.toThrow(expectedError) + + // INFO: invalid 'pollInterval' + invalidPollStrategy = { + maxPollCount: 1, + pollInterval: 0, + streamLog: false + } + + pollStrategies = [invalidPollStrategy] + + expectedError = new Error( + `Poll strategies are not valid. 'pollInterval' has to be greater than 0. Invalid poll strategy: \n${JSON.stringify( + invalidPollStrategy, + null, + 2 + )}` + ) + + await expect( + pollJobState( + requestClient, + mockJob, + false, + undefined, + undefined, + pollStrategies + ) + ).rejects.toThrow(expectedError) + }) }) const setupMocks = () => { @@ -273,11 +420,14 @@ const setupMocks = () => { const mockSimplePoll = (runningCount = 2) => { let count = 0 + jest.spyOn(requestClient, 'get').mockImplementation((url) => { count++ + if (url.includes('job')) { return Promise.resolve({ result: mockJob, etag: '', status: 200 }) } + return Promise.resolve({ result: count === 0 @@ -293,11 +443,14 @@ const mockSimplePoll = (runningCount = 2) => { const mockRunningPoll = () => { let count = 0 + jest.spyOn(requestClient, 'get').mockImplementation((url) => { count++ + if (url.includes('job')) { return Promise.resolve({ result: mockJob, etag: '', status: 200 }) } + return Promise.resolve({ result: count === 0 ? 'pending' : 'running', etag: '', @@ -308,11 +461,14 @@ const mockRunningPoll = () => { const mockLongPoll = () => { let count = 0 + jest.spyOn(requestClient, 'get').mockImplementation((url) => { count++ + if (url.includes('job')) { return Promise.resolve({ result: mockJob, etag: '', status: 200 }) } + return Promise.resolve({ result: count <= 102 ? 'running' : 'completed', etag: '', @@ -323,14 +479,18 @@ const mockLongPoll = () => { const mockPollWithSingleError = () => { let count = 0 + jest.spyOn(requestClient, 'get').mockImplementation((url) => { count++ + if (url.includes('job')) { return Promise.resolve({ result: mockJob, etag: '', status: 200 }) } + if (count === 1) { return Promise.reject('Status Error') } + return Promise.resolve({ result: count === 0 ? 'pending' : 'completed', etag: '', @@ -344,6 +504,7 @@ const mockErroredPoll = () => { if (url.includes('job')) { return Promise.resolve({ result: mockJob, etag: '', status: 200 }) } + return Promise.reject('Status Error') }) }