diff --git a/package-lock.json b/package-lock.json index 928b41e..b62d148 100644 --- a/package-lock.json +++ b/package-lock.json @@ -7,7 +7,7 @@ "name": "@sasjs/adapter", "license": "ISC", "dependencies": { - "@sasjs/utils": "^2.24.0", + "@sasjs/utils": "^2.25.1", "axios": "^0.21.1", "axios-cookiejar-support": "^1.0.1", "form-data": "^4.0.0", @@ -1188,8 +1188,9 @@ } }, "node_modules/@sasjs/utils": { - "version": "2.24.0", - "license": "ISC", + "version": "2.25.1", + "resolved": "https://registry.npmjs.org/@sasjs/utils/-/utils-2.25.1.tgz", + "integrity": "sha512-lwkPE+QsB81b8/1M8X2zLdhpuiA8pIjgOwJH57zhcsliuDnNs4uijSYu40aYSc8tH98jtSuqWMjfGq8CT9o1Dw==", "dependencies": { "@types/prompts": "^2.0.13", "chalk": "^4.1.1", @@ -7932,7 +7933,182 @@ "treeverse", "validate-npm-package-name", "which", - "write-file-atomic" + "write-file-atomic", + "@npmcli/disparity-colors", + "@npmcli/git", + "@npmcli/installed-package-contents", + "@npmcli/map-workspaces", + "@npmcli/metavuln-calculator", + "@npmcli/move-file", + "@npmcli/name-from-folder", + "@npmcli/node-gyp", + "@npmcli/promise-spawn", + "@tootallnate/once", + "agent-base", + "agentkeepalive", + "aggregate-error", + "ajv", + "ansi-regex", + "ansi-styles", + "aproba", + "are-we-there-yet", + "asap", + "asn1", + "assert-plus", + "asynckit", + "aws-sign2", + "aws4", + "balanced-match", + "bcrypt-pbkdf", + "bin-links", + "binary-extensions", + "brace-expansion", + "builtins", + "caseless", + "cidr-regex", + "clean-stack", + "clone", + "cmd-shim", + "code-point-at", + "color-convert", + "color-name", + "colors", + "combined-stream", + "common-ancestor-path", + "concat-map", + "console-control-strings", + "core-util-is", + "dashdash", + "debug", + "debuglog", + "defaults", + "delayed-stream", + "delegates", + "depd", + "dezalgo", + "diff", + "ecc-jsbn", + "emoji-regex", + "encoding", + "env-paths", + "err-code", + "extend", + "extsprintf", + "fast-deep-equal", + "fast-json-stable-stringify", + "forever-agent", + "fs-minipass", + "fs.realpath", + "function-bind", + "gauge", + "getpass", + "har-schema", + "har-validator", + "has", + "has-flag", + "has-unicode", + "http-cache-semantics", + "http-proxy-agent", + "http-signature", + "https-proxy-agent", + "humanize-ms", + "iconv-lite", + "ignore-walk", + "imurmurhash", + "indent-string", + "infer-owner", + "inflight", + "inherits", + "ip", + "ip-regex", + "is-core-module", + "is-fullwidth-code-point", + "is-lambda", + "is-typedarray", + "isarray", + "isexe", + "isstream", + "jsbn", + "json-schema", + "json-schema-traverse", + "json-stringify-nice", + "json-stringify-safe", + "jsonparse", + "jsprim", + "just-diff", + "just-diff-apply", + "lru-cache", + "mime-db", + "mime-types", + "minimatch", + "minipass-collect", + "minipass-fetch", + "minipass-flush", + "minipass-json-stream", + "minipass-sized", + "minizlib", + "mute-stream", + "negotiator", + "normalize-package-data", + "npm-bundled", + "npm-install-checks", + "npm-normalize-package-bin", + "npm-packlist", + "number-is-nan", + "oauth-sign", + "object-assign", + "once", + "p-map", + "path-is-absolute", + "path-parse", + "performance-now", + "proc-log", + "process-nextick-args", + "promise-all-reject-late", + "promise-call-limit", + "promise-inflight", + "promise-retry", + "promzard", + "psl", + "punycode", + "qs", + "read-cmd-shim", + "readable-stream", + "request", + "resolve", + "retry", + "safe-buffer", + "safer-buffer", + "set-blocking", + "signal-exit", + "smart-buffer", + "socks", + "socks-proxy-agent", + "spdx-correct", + "spdx-exceptions", + "spdx-expression-parse", + "spdx-license-ids", + "sshpk", + "string_decoder", + "string-width", + "stringify-package", + "strip-ansi", + "supports-color", + "tunnel-agent", + "tweetnacl", + "typedarray-to-buffer", + "unique-filename", + "unique-slug", + "uri-js", + "util-deprecate", + "uuid", + "validate-npm-package-license", + "verror", + "walk-up-path", + "wcwidth", + "wide-align", + "wrappy", + "yallist" ], "dev": true, "license": "Artistic-2.0", @@ -14604,7 +14780,9 @@ } }, "@sasjs/utils": { - "version": "2.24.0", + "version": "2.25.1", + "resolved": "https://registry.npmjs.org/@sasjs/utils/-/utils-2.25.1.tgz", + "integrity": "sha512-lwkPE+QsB81b8/1M8X2zLdhpuiA8pIjgOwJH57zhcsliuDnNs4uijSYu40aYSc8tH98jtSuqWMjfGq8CT9o1Dw==", "requires": { "@types/prompts": "^2.0.13", "chalk": "^4.1.1", diff --git a/package.json b/package.json index 95e7deb..332b564 100644 --- a/package.json +++ b/package.json @@ -67,7 +67,7 @@ }, "main": "index.js", "dependencies": { - "@sasjs/utils": "^2.24.0", + "@sasjs/utils": "^2.25.1", "axios": "^0.21.1", "axios-cookiejar-support": "^1.0.1", "form-data": "^4.0.0", diff --git a/src/SASViyaApiClient.ts b/src/SASViyaApiClient.ts index ee439f2..e68082c 100644 --- a/src/SASViyaApiClient.ts +++ b/src/SASViyaApiClient.ts @@ -754,18 +754,16 @@ export class SASViyaApiClient { jobDefinition, arguments: jobArguments } - const { result: postedJob, etag } = await this.requestClient.post( + const { result: postedJob } = await this.requestClient.post( `${this.serverUrl}/jobExecution/jobs?_action=wait`, postJobRequestBody, access_token ) - const jobStatus = await this.pollJobState( - postedJob, - etag, - authConfig - ).catch((err) => { - throw prefixMessage(err, 'Error while polling job status. ') - }) + const jobStatus = await this.pollJobState(postedJob, authConfig).catch( + (err) => { + throw prefixMessage(err, 'Error while polling job status. ') + } + ) const { result: currentJob } = await this.requestClient.get( `${this.serverUrl}/jobExecution/jobs/${postedJob.id}`, access_token @@ -833,7 +831,6 @@ export class SASViyaApiClient { private async pollJobState( postedJob: Job, - etag: string | null, authConfig?: AuthConfig, pollOptions?: PollOptions ) { @@ -841,7 +838,6 @@ export class SASViyaApiClient { this.requestClient, postedJob, this.debug, - etag, authConfig, pollOptions ) diff --git a/src/api/viya/executeScript.ts b/src/api/viya/executeScript.ts index c75a3e5..e54143f 100644 --- a/src/api/viya/executeScript.ts +++ b/src/api/viya/executeScript.ts @@ -178,7 +178,6 @@ export async function executeScript( requestClient, postedJob, debug, - etag, authConfig, pollOptions ).catch(async (err) => { diff --git a/src/api/viya/pollJobState.ts b/src/api/viya/pollJobState.ts index 5e74dda..b777382 100644 --- a/src/api/viya/pollJobState.ts +++ b/src/api/viya/pollJobState.ts @@ -1,16 +1,19 @@ import { AuthConfig } from '@sasjs/utils/types' -import { prefixMessage } from '@sasjs/utils/error' -import { generateTimestamp } from '@sasjs/utils/time' import { Job, PollOptions } from '../..' import { getTokens } from '../../auth/getTokens' import { RequestClient } from '../../request/RequestClient' +import { JobStatePollError } from '../../types/errors' +import { generateTimestamp } from '@sasjs/utils/time' import { saveLog } from './saveLog' +import { createWriteStream } from '@sasjs/utils/file' +import { WriteStream } from 'fs' +import { Link } from '../../types' +import { prefixMessage } from '@sasjs/utils/error' export async function pollJobState( requestClient: RequestClient, postedJob: Job, debug: boolean, - etag: string | null, authConfig?: AuthConfig, pollOptions?: PollOptions ) { @@ -18,130 +21,226 @@ export async function pollJobState( let pollInterval = 300 let maxPollCount = 1000 - let maxErrorCount = 5 - let access_token = (authConfig || {}).access_token - - const logFileName = `${postedJob.name || 'job'}-${generateTimestamp()}.log` - const logFilePath = `${ - pollOptions?.logFolderPath || process.cwd() - }/${logFileName}` - - if (authConfig) { - ;({ access_token } = await getTokens(requestClient, authConfig)) - } if (pollOptions) { pollInterval = pollOptions.pollInterval || pollInterval maxPollCount = pollOptions.maxPollCount || maxPollCount } - let postedJobState = '' - let pollCount = 0 - let errorCount = 0 - const headers: any = { - 'Content-Type': 'application/json', - 'If-None-Match': etag - } - if (access_token) { - headers.Authorization = `Bearer ${access_token}` - } const stateLink = postedJob.links.find((l: any) => l.rel === 'state') if (!stateLink) { throw new Error(`Job state link was not found.`) } - const { result: state } = await requestClient - .get( - `${stateLink.href}?_action=wait&wait=300`, - access_token, - 'text/plain', - {}, - debug + let currentState = await getJobState( + requestClient, + postedJob, + '', + debug, + authConfig + ).catch((err) => { + logger.error( + `Error fetching job state from ${stateLink.href}. Starting poll, assuming job to be running.`, + err ) - .catch((err) => { - logger.error( - `Error fetching job state from ${stateLink.href}. Starting poll, assuming job to be running.`, - err - ) - return { result: 'unavailable' } - }) + return 'unavailable' + }) + + let pollCount = 0 - const currentState = state.trim() if (currentState === 'completed') { return Promise.resolve(currentState) } - return new Promise(async (resolve, reject) => { - let printedState = '' + let logFileStream + if (pollOptions?.streamLog) { + const logFileName = `${postedJob.name || 'job'}-${generateTimestamp()}.log` + const logFilePath = `${ + pollOptions?.logFolderPath || process.cwd() + }/${logFileName}` - const interval = setInterval(async () => { - if ( - postedJobState === 'running' || - postedJobState === '' || - postedJobState === 'pending' || - postedJobState === 'unavailable' - ) { - if (authConfig) { - ;({ access_token } = await getTokens(requestClient, authConfig)) - } + logFileStream = await createWriteStream(logFilePath) + } - if (stateLink) { - const { result: jobState } = await requestClient - .get( - `${stateLink.href}?_action=wait&wait=300`, - access_token, - 'text/plain', - {}, - debug - ) - .catch((err) => { - errorCount++ - if (pollCount >= maxPollCount || errorCount >= maxErrorCount) { - clearInterval(interval) - reject( - prefixMessage( - err, - 'Error while getting job state after interval. ' - ) - ) - } - logger.error( - `Error fetching job state from ${stateLink.href}. Resuming poll, assuming job to be running.`, - err - ) - return { result: 'unavailable' } - }) + let result = await doPoll( + requestClient, + postedJob, + currentState, + debug, + pollCount, + authConfig, + pollOptions, + logFileStream + ) - postedJobState = jobState.trim() - if (postedJobState != 'unavailable' && errorCount > 0) { - errorCount = 0 - } + currentState = result.state + pollCount = result.pollCount - if (debug && printedState !== postedJobState) { - logger.info('Polling job status...') - logger.info(`Current job state: ${postedJobState}`) + if (!needsRetry(currentState) || pollCount >= maxPollCount) { + return currentState + } - printedState = postedJobState - } + // If we get to this point, this is a long-running job that needs longer polling. + // We will resume polling with a bigger interval of 1 minute + let longJobPollOptions: PollOptions = { + maxPollCount: 24 * 60, + pollInterval: 60000, + streamLog: false + } + if (pollOptions) { + longJobPollOptions.streamLog = pollOptions.streamLog + longJobPollOptions.logFolderPath = pollOptions.logFolderPath + } - pollCount++ + result = await doPoll( + requestClient, + postedJob, + currentState, + debug, + pollCount, + authConfig, + longJobPollOptions, + logFileStream + ) - await saveLog( - postedJob, - requestClient, - pollOptions?.streamLog || false, - logFilePath, - access_token - ) + currentState = result.state + pollCount = result.pollCount - if (pollCount >= maxPollCount) { - resolve(postedJobState) - } - } - } else { - clearInterval(interval) - resolve(postedJobState) - } - }, pollInterval) - }) + if (logFileStream) { + logFileStream.end() + } + + return currentState } + +const getJobState = async ( + requestClient: RequestClient, + job: Job, + currentState: string, + debug: boolean, + authConfig?: AuthConfig +) => { + const stateLink = job.links.find((l: any) => l.rel === 'state') + if (!stateLink) { + throw new Error(`Job state link was not found.`) + } + + if (needsRetry(currentState)) { + let tokens + if (authConfig) { + tokens = await getTokens(requestClient, authConfig) + } + + const { result: jobState } = await requestClient + .get( + `${stateLink.href}?_action=wait&wait=300`, + tokens?.access_token, + 'text/plain', + {}, + debug + ) + .catch((err) => { + throw new JobStatePollError(job.id, err) + }) + + return jobState.trim() + } else { + return currentState + } +} + +const needsRetry = (state: string) => + state === 'running' || + state === '' || + state === 'pending' || + state === 'unavailable' + +const doPoll = async ( + requestClient: RequestClient, + postedJob: Job, + currentState: string, + debug: boolean, + pollCount: number, + authConfig?: AuthConfig, + pollOptions?: PollOptions, + logStream?: WriteStream +): Promise<{ state: string; pollCount: number }> => { + let pollInterval = 300 + let maxPollCount = 1000 + let maxErrorCount = 5 + let errorCount = 0 + let state = currentState + let printedState = '' + let startLogLine = 0 + + const logger = process.logger || console + + if (pollOptions) { + pollInterval = pollOptions.pollInterval || pollInterval + maxPollCount = pollOptions.maxPollCount || maxPollCount + } + + const stateLink = postedJob.links.find((l: Link) => l.rel === 'state') + if (!stateLink) { + throw new Error(`Job state link was not found.`) + } + + while (needsRetry(state) && pollCount <= 100 && pollCount <= maxPollCount) { + state = await getJobState( + requestClient, + postedJob, + state, + debug, + authConfig + ).catch((err) => { + errorCount++ + if (pollCount >= maxPollCount || errorCount >= maxErrorCount) { + throw err + } + logger.error( + `Error fetching job state from ${stateLink.href}. Resuming poll, assuming job to be running.`, + err + ) + return 'unavailable' + }) + + pollCount++ + + const jobUrl = postedJob.links.find((l: Link) => l.rel === 'self') + const { result: job } = await requestClient.get( + jobUrl!.href, + authConfig?.access_token + ) + + const endLogLine = job.logStatistics?.lineCount ?? 1000000 + + await saveLog( + postedJob, + requestClient, + pollOptions?.streamLog || false, + startLogLine, + endLogLine, + logStream, + authConfig?.access_token + ) + + startLogLine += job.logStatistics.lineCount + + if (debug && printedState !== state) { + logger.info('Polling job status...') + logger.info(`Current job state: ${state}`) + + printedState = state + } + + if (state != 'unavailable' && errorCount > 0) { + errorCount = 0 + } + + await delay(pollInterval) + } + + return { state, pollCount } +} + +const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)) diff --git a/src/api/viya/saveLog.ts b/src/api/viya/saveLog.ts index 9200930..94fbbfd 100644 --- a/src/api/viya/saveLog.ts +++ b/src/api/viya/saveLog.ts @@ -1,15 +1,19 @@ -import { createFile } from '@sasjs/utils/file' import { Job } from '../..' import { RequestClient } from '../../request/RequestClient' -import { fetchLogByChunks } from '../../utils' +import { fetchLog } from '../../utils' +import { WriteStream } from 'fs' +import { writeStream } from './writeStream' export async function saveLog( job: Job, requestClient: RequestClient, shouldSaveLog: boolean, - logFilePath: string, + startLine: number, + endLine: number, + logFileStream?: WriteStream, accessToken?: string ) { + console.log('startLine: ', startLine, ' endLine: ', endLine) if (!shouldSaveLog) { return } @@ -20,6 +24,12 @@ export async function saveLog( ) } + if (!logFileStream) { + throw new Error( + `Logs for job ${job.id} cannot be written without a valid write stream.` + ) + } + const logger = process.logger || console const jobLogUrl = job.links.find((l) => l.rel === 'log') @@ -27,14 +37,14 @@ export async function saveLog( throw new Error(`Log URL for job ${job.id} was not found.`) } - const logCount = job.logStatistics?.lineCount ?? 1000000 - const log = await fetchLogByChunks( + const log = await fetchLog( requestClient, accessToken, `${jobLogUrl.href}/content`, - logCount - ) + startLine, + endLine + ).catch((e) => console.log(e)) - logger.info(`Writing logs to ${logFilePath}`) - await createFile(logFilePath, log) + logger.info(`Writing logs to ${logFileStream.path}`) + await writeStream(logFileStream, log || '').catch((e) => console.log(e)) } diff --git a/src/api/viya/spec/executeScript.spec.ts b/src/api/viya/spec/executeScript.spec.ts index 314c70b..5e2fb11 100644 --- a/src/api/viya/spec/executeScript.spec.ts +++ b/src/api/viya/spec/executeScript.spec.ts @@ -344,7 +344,6 @@ describe('executeScript', () => { requestClient, mockJob, false, - '', mockAuthConfig, defaultPollOptions ) @@ -546,7 +545,7 @@ describe('executeScript', () => { if (url.includes('_webout')) { return Promise.reject(new NotFoundError(url)) } - return Promise.resolve({ result: mockJob, etag: '' }) + return Promise.resolve({ result: mockJob, etag: '', status: 200 }) }) const error = await executeScript( @@ -645,7 +644,9 @@ const setupMocks = () => { .mockImplementation(() => Promise.resolve({ result: mockJob, etag: '' })) jest .spyOn(requestClient, 'get') - .mockImplementation(() => Promise.resolve({ result: mockJob, etag: '' })) + .mockImplementation(() => + Promise.resolve({ result: mockJob, etag: '', status: 200 }) + ) jest .spyOn(requestClient, 'delete') .mockImplementation(() => Promise.resolve({ result: {}, etag: '' })) @@ -658,7 +659,7 @@ const setupMocks = () => { jest .spyOn(sessionManager, 'getVariable') .mockImplementation(() => - Promise.resolve({ result: { value: 'test' }, etag: 'test' }) + Promise.resolve({ result: { value: 'test' }, etag: 'test', status: 200 }) ) jest .spyOn(sessionManager, 'getSession') diff --git a/src/api/viya/spec/mockResponses.ts b/src/api/viya/spec/mockResponses.ts index e85028c..22580f7 100644 --- a/src/api/viya/spec/mockResponses.ts +++ b/src/api/viya/spec/mockResponses.ts @@ -31,6 +31,13 @@ export const mockJob: Job = { type: 'log', uri: 'log' }, + { + rel: 'self', + href: '/job', + method: 'GET', + type: 'job', + uri: 'job' + }, { rel: 'state', href: '/state', @@ -54,3 +61,13 @@ export const mockAuthConfig: AuthConfig = { access_token: 'acc355', refresh_token: 'r3fr35h' } + +export class MockStream { + _write(chunk: string, _: any, next: Function) { + next() + } + + reset() {} + + destroy() {} +} diff --git a/src/api/viya/spec/pollJobState.spec.ts b/src/api/viya/spec/pollJobState.spec.ts index 23ac560..dcbd2b9 100644 --- a/src/api/viya/spec/pollJobState.spec.ts +++ b/src/api/viya/spec/pollJobState.spec.ts @@ -1,10 +1,11 @@ +import * as fs from 'fs' +import { Logger, LogLevel } from '@sasjs/utils' import { RequestClient } from '../../../request/RequestClient' import { mockAuthConfig, mockJob } from './mockResponses' import { pollJobState } from '../pollJobState' import * as getTokensModule from '../../../auth/getTokens' import * as saveLogModule from '../saveLog' import { PollOptions } from '../../../types' -import { Logger, LogLevel } from '@sasjs/utils' const requestClient = new (>RequestClient)() const defaultPollOptions: PollOptions = { @@ -24,7 +25,6 @@ describe('pollJobState', () => { requestClient, mockJob, false, - 'test', mockAuthConfig, defaultPollOptions ) @@ -40,7 +40,6 @@ describe('pollJobState', () => { requestClient, mockJob, false, - 'test', undefined, defaultPollOptions ) @@ -53,7 +52,6 @@ describe('pollJobState', () => { requestClient, { ...mockJob, links: mockJob.links.filter((l) => l.rel !== 'state') }, false, - 'test', undefined, defaultPollOptions ).catch((e) => e) @@ -62,23 +60,12 @@ describe('pollJobState', () => { }) it('should attempt to refresh tokens before each poll', async () => { - jest - .spyOn(requestClient, 'get') - .mockImplementationOnce(() => - Promise.resolve({ result: 'pending', etag: '' }) - ) - .mockImplementationOnce(() => - Promise.resolve({ result: 'running', etag: '' }) - ) - .mockImplementation(() => - Promise.resolve({ result: 'completed', etag: '' }) - ) + mockSimplePoll() await pollJobState( requestClient, mockJob, false, - 'test', mockAuthConfig, defaultPollOptions ) @@ -87,23 +74,12 @@ describe('pollJobState', () => { }) it('should attempt to fetch and save the log after each poll', async () => { - jest - .spyOn(requestClient, 'get') - .mockImplementationOnce(() => - Promise.resolve({ result: 'pending', etag: '' }) - ) - .mockImplementationOnce(() => - Promise.resolve({ result: 'running', etag: '' }) - ) - .mockImplementation(() => - Promise.resolve({ result: 'completed', etag: '' }) - ) + mockSimplePoll() await pollJobState( requestClient, mockJob, false, - 'test', mockAuthConfig, defaultPollOptions ) @@ -112,20 +88,12 @@ describe('pollJobState', () => { }) it('should return the current status when the max poll count is reached', async () => { - jest - .spyOn(requestClient, 'get') - .mockImplementationOnce(() => - Promise.resolve({ result: 'pending', etag: '' }) - ) - .mockImplementationOnce(() => - Promise.resolve({ result: 'running', etag: '' }) - ) + mockRunningPoll() const state = await pollJobState( requestClient, mockJob, false, - 'test', mockAuthConfig, { ...defaultPollOptions, @@ -136,51 +104,47 @@ describe('pollJobState', () => { expect(state).toEqual('running') }) - it('should continue polling until the job completes or errors', async () => { - jest - .spyOn(requestClient, 'get') - .mockImplementationOnce(() => - Promise.resolve({ result: 'pending', etag: '' }) - ) - .mockImplementationOnce(() => - Promise.resolve({ result: 'running', etag: '' }) - ) - .mockImplementation(() => - Promise.resolve({ result: 'completed', etag: '' }) - ) + it('should poll with a larger interval for longer running jobs', async () => { + mockLongPoll() + + const state = await pollJobState( + requestClient, + mockJob, + false, + mockAuthConfig, + { + ...defaultPollOptions, + maxPollCount: 200, + pollInterval: 10 + } + ) + + expect(state).toEqual('completed') + }, 200000) + + it('should continue polling until the job completes or errors', async () => { + mockSimplePoll(1) const state = await pollJobState( requestClient, mockJob, false, - 'test', undefined, defaultPollOptions ) - expect(requestClient.get).toHaveBeenCalledTimes(4) + expect(requestClient.get).toHaveBeenCalledTimes(3) expect(state).toEqual('completed') }) it('should print the state to the console when debug is on', async () => { jest.spyOn((process as any).logger, 'info') - jest - .spyOn(requestClient, 'get') - .mockImplementationOnce(() => - Promise.resolve({ result: 'pending', etag: '' }) - ) - .mockImplementationOnce(() => - Promise.resolve({ result: 'running', etag: '' }) - ) - .mockImplementation(() => - Promise.resolve({ result: 'completed', etag: '' }) - ) + mockSimplePoll() await pollJobState( requestClient, mockJob, true, - 'test', undefined, defaultPollOptions ) @@ -205,21 +169,12 @@ describe('pollJobState', () => { }) it('should continue polling when there is a single error in between', async () => { - jest - .spyOn(requestClient, 'get') - .mockImplementationOnce(() => - Promise.resolve({ result: 'pending', etag: '' }) - ) - .mockImplementationOnce(() => Promise.reject('Status Error')) - .mockImplementationOnce(() => - Promise.resolve({ result: 'completed', etag: '' }) - ) + mockPollWithSingleError() const state = await pollJobState( requestClient, mockJob, false, - 'test', undefined, defaultPollOptions ) @@ -229,20 +184,19 @@ describe('pollJobState', () => { }) it('should throw an error when the error count exceeds the set value of 5', async () => { - jest - .spyOn(requestClient, 'get') - .mockImplementation(() => Promise.reject('Status Error')) + mockErroredPoll() const error = await pollJobState( requestClient, mockJob, false, - 'test', undefined, defaultPollOptions ).catch((e) => e) - expect(error).toContain('Error while getting job state after interval.') + expect(error.message).toEqual( + 'Error while polling job state for job j0b: Status Error' + ) }) }) @@ -251,11 +205,12 @@ const setupMocks = () => { jest.mock('../../../request/RequestClient') jest.mock('../../../auth/getTokens') jest.mock('../saveLog') + jest.mock('fs') jest .spyOn(requestClient, 'get') .mockImplementation(() => - Promise.resolve({ result: 'completed', etag: '' }) + Promise.resolve({ result: 'completed', etag: '', status: 200 }) ) jest .spyOn(getTokensModule, 'getTokens') @@ -263,4 +218,84 @@ const setupMocks = () => { jest .spyOn(saveLogModule, 'saveLog') .mockImplementation(() => Promise.resolve()) + jest + .spyOn(fs, 'createWriteStream') + .mockImplementation(() => ({} as unknown as fs.WriteStream)) +} + +const mockSimplePoll = (runningCount = 2) => { + let count = 0 + jest.spyOn(requestClient, 'get').mockImplementation((url) => { + count++ + if (url.includes('job')) { + return Promise.resolve({ result: mockJob, etag: '', status: 200 }) + } + return Promise.resolve({ + result: + count === 0 + ? 'pending' + : count <= runningCount + ? 'running' + : 'completed', + etag: '', + status: 200 + }) + }) +} + +const mockRunningPoll = () => { + let count = 0 + jest.spyOn(requestClient, 'get').mockImplementation((url) => { + count++ + if (url.includes('job')) { + return Promise.resolve({ result: mockJob, etag: '', status: 200 }) + } + return Promise.resolve({ + result: count === 0 ? 'pending' : 'running', + etag: '', + status: 200 + }) + }) +} + +const mockLongPoll = () => { + let count = 0 + jest.spyOn(requestClient, 'get').mockImplementation((url) => { + count++ + if (url.includes('job')) { + return Promise.resolve({ result: mockJob, etag: '', status: 200 }) + } + return Promise.resolve({ + result: count <= 101 ? 'running' : 'completed', + etag: '', + status: 200 + }) + }) +} + +const mockPollWithSingleError = () => { + let count = 0 + jest.spyOn(requestClient, 'get').mockImplementation((url) => { + count++ + if (url.includes('job')) { + return Promise.resolve({ result: mockJob, etag: '', status: 200 }) + } + if (count === 1) { + return Promise.reject('Status Error') + } + return Promise.resolve({ + result: count === 0 ? 'pending' : 'completed', + etag: '', + status: 200 + }) + }) +} + +const mockErroredPoll = () => { + jest.spyOn(requestClient, 'get').mockImplementation((url) => { + if (url.includes('job')) { + return Promise.resolve({ result: mockJob, etag: '', status: 200 }) + } + return Promise.reject('Status Error') + }) } diff --git a/src/api/viya/spec/saveLog.spec.ts b/src/api/viya/spec/saveLog.spec.ts index c4b8b9d..4d35c6f 100644 --- a/src/api/viya/spec/saveLog.spec.ts +++ b/src/api/viya/spec/saveLog.spec.ts @@ -1,11 +1,13 @@ import { Logger, LogLevel } from '@sasjs/utils' -import * as fileModule from '@sasjs/utils/file' import { RequestClient } from '../../../request/RequestClient' import * as fetchLogsModule from '../../../utils/fetchLogByChunks' +import * as writeStreamModule from '../writeStream' import { saveLog } from '../saveLog' import { mockJob } from './mockResponses' +import { WriteStream } from 'fs' const requestClient = new (>RequestClient)() +const stream = {} as unknown as WriteStream describe('saveLog', () => { beforeEach(() => { @@ -14,16 +16,21 @@ describe('saveLog', () => { }) it('should return immediately if shouldSaveLog is false', async () => { - await saveLog(mockJob, requestClient, false, '/test', 't0k3n') + await saveLog(mockJob, requestClient, false, 0, 100, stream, 't0k3n') - expect(fetchLogsModule.fetchLogByChunks).not.toHaveBeenCalled() - expect(fileModule.createFile).not.toHaveBeenCalled() + expect(fetchLogsModule.fetchLog).not.toHaveBeenCalled() + expect(writeStreamModule.writeStream).not.toHaveBeenCalled() }) it('should throw an error when a valid access token is not provided', async () => { - const error = await saveLog(mockJob, requestClient, true, '/test').catch( - (e) => e - ) + const error = await saveLog( + mockJob, + requestClient, + true, + 0, + 100, + stream + ).catch((e) => e) expect(error.message).toContain( `Logs for job ${mockJob.id} cannot be fetched without a valid access token.` @@ -35,7 +42,9 @@ describe('saveLog', () => { { ...mockJob, links: mockJob.links.filter((l) => l.rel !== 'log') }, requestClient, true, - '/test', + 0, + 100, + stream, 't0k3n' ).catch((e) => e) @@ -45,15 +54,19 @@ describe('saveLog', () => { }) it('should fetch and save logs to the given path', async () => { - await saveLog(mockJob, requestClient, true, '/test', 't0k3n') + await saveLog(mockJob, requestClient, true, 0, 100, stream, 't0k3n') - expect(fetchLogsModule.fetchLogByChunks).toHaveBeenCalledWith( + expect(fetchLogsModule.fetchLog).toHaveBeenCalledWith( requestClient, 't0k3n', '/log/content', + 0, 100 ) - expect(fileModule.createFile).toHaveBeenCalledWith('/test', 'Test Log') + expect(writeStreamModule.writeStream).toHaveBeenCalledWith( + stream, + 'Test Log' + ) }) }) @@ -62,11 +75,12 @@ const setupMocks = () => { jest.mock('../../../request/RequestClient') jest.mock('../../../utils/fetchLogByChunks') jest.mock('@sasjs/utils') + jest.mock('../writeStream') jest - .spyOn(fetchLogsModule, 'fetchLogByChunks') + .spyOn(fetchLogsModule, 'fetchLog') .mockImplementation(() => Promise.resolve('Test Log')) jest - .spyOn(fileModule, 'createFile') + .spyOn(writeStreamModule, 'writeStream') .mockImplementation(() => Promise.resolve()) } diff --git a/src/api/viya/writeStream.ts b/src/api/viya/writeStream.ts new file mode 100644 index 0000000..dc09885 --- /dev/null +++ b/src/api/viya/writeStream.ts @@ -0,0 +1,15 @@ +import { WriteStream } from 'fs' + +export const writeStream = async ( + stream: WriteStream, + content: string +): Promise => { + return new Promise((resolve, reject) => { + stream.write(content + '\n\nnext chunk\n\n', (e) => { + if (e) { + return reject(e) + } + return resolve() + }) + }) +} diff --git a/src/types/errors/JobStatePollError.ts b/src/types/errors/JobStatePollError.ts new file mode 100644 index 0000000..2584b1c --- /dev/null +++ b/src/types/errors/JobStatePollError.ts @@ -0,0 +1,11 @@ +export class JobStatePollError extends Error { + constructor(id: string, public originalError: Error) { + super( + `Error while polling job state for job ${id}: ${ + originalError.message || originalError + }` + ) + this.name = 'JobStatePollError' + Object.setPrototypeOf(this, JobStatePollError.prototype) + } +} diff --git a/src/types/errors/index.ts b/src/types/errors/index.ts index 72b63cc..f8595d4 100644 --- a/src/types/errors/index.ts +++ b/src/types/errors/index.ts @@ -2,6 +2,7 @@ export * from './AuthorizeError' export * from './ComputeJobExecutionError' export * from './InternalServerError' export * from './JobExecutionError' +export * from './JobStatePollError' export * from './LoginRequiredError' export * from './NotFoundError' export * from './ErrorResponse' diff --git a/src/utils/fetchLogByChunks.ts b/src/utils/fetchLogByChunks.ts index a149c4c..22e888a 100644 --- a/src/utils/fetchLogByChunks.ts +++ b/src/utils/fetchLogByChunks.ts @@ -14,18 +14,36 @@ export const fetchLogByChunks = async ( accessToken: string, logUrl: string, logCount: number +): Promise => { + return await fetchLog(requestClient, accessToken, logUrl, 0, logCount) +} + +/** + * Fetches a section of the log file delineated by start and end lines + * @param {object} requestClient - client object of Request Client. + * @param {string} accessToken - an access token for an authorized user. + * @param {string} logUrl - url of the log file. + * @param {number} start - the line at which to start fetching the log. + * @param {number} end - the line at which to stop fetching the log. + * @returns an string containing log lines. + */ +export const fetchLog = async ( + requestClient: RequestClient, + accessToken: string, + logUrl: string, + start: number, + end: number ): Promise => { const logger = process.logger || console let log: string = '' - const loglimit = logCount < 10000 ? logCount : 10000 - let start = 0 + const loglimit = end < 10000 ? end : 10000 do { logger.info( `Fetching logs from line no: ${start + 1} to ${ start + loglimit - } of ${logCount}.` + } of ${end}.` ) const logChunkJson = await requestClient! .get(`${logUrl}?start=${start}&limit=${loglimit}`, accessToken) @@ -40,6 +58,6 @@ export const fetchLogByChunks = async ( log += logChunk start += loglimit - } while (start < logCount) + } while (start < end) return log }