1
0
mirror of https://github.com/sasjs/adapter.git synced 2026-01-10 22:00:05 +00:00

chore(streamlog): optimise polling mechanism

This commit is contained in:
Krishna Acondy
2021-07-20 09:25:39 +01:00
parent 1594f0c7db
commit 5c8d311ae8
14 changed files with 624 additions and 230 deletions

View File

@@ -1,16 +1,19 @@
import { AuthConfig } from '@sasjs/utils/types'
import { prefixMessage } from '@sasjs/utils/error'
import { generateTimestamp } from '@sasjs/utils/time'
import { Job, PollOptions } from '../..'
import { getTokens } from '../../auth/getTokens'
import { RequestClient } from '../../request/RequestClient'
import { JobStatePollError } from '../../types/errors'
import { generateTimestamp } from '@sasjs/utils/time'
import { saveLog } from './saveLog'
import { createWriteStream } from '@sasjs/utils/file'
import { WriteStream } from 'fs'
import { Link } from '../../types'
import { prefixMessage } from '@sasjs/utils/error'
export async function pollJobState(
requestClient: RequestClient,
postedJob: Job,
debug: boolean,
etag: string | null,
authConfig?: AuthConfig,
pollOptions?: PollOptions
) {
@@ -18,130 +21,226 @@ export async function pollJobState(
let pollInterval = 300
let maxPollCount = 1000
let maxErrorCount = 5
let access_token = (authConfig || {}).access_token
const logFileName = `${postedJob.name || 'job'}-${generateTimestamp()}.log`
const logFilePath = `${
pollOptions?.logFolderPath || process.cwd()
}/${logFileName}`
if (authConfig) {
;({ access_token } = await getTokens(requestClient, authConfig))
}
if (pollOptions) {
pollInterval = pollOptions.pollInterval || pollInterval
maxPollCount = pollOptions.maxPollCount || maxPollCount
}
let postedJobState = ''
let pollCount = 0
let errorCount = 0
const headers: any = {
'Content-Type': 'application/json',
'If-None-Match': etag
}
if (access_token) {
headers.Authorization = `Bearer ${access_token}`
}
const stateLink = postedJob.links.find((l: any) => l.rel === 'state')
if (!stateLink) {
throw new Error(`Job state link was not found.`)
}
const { result: state } = await requestClient
.get<string>(
`${stateLink.href}?_action=wait&wait=300`,
access_token,
'text/plain',
{},
debug
let currentState = await getJobState(
requestClient,
postedJob,
'',
debug,
authConfig
).catch((err) => {
logger.error(
`Error fetching job state from ${stateLink.href}. Starting poll, assuming job to be running.`,
err
)
.catch((err) => {
logger.error(
`Error fetching job state from ${stateLink.href}. Starting poll, assuming job to be running.`,
err
)
return { result: 'unavailable' }
})
return 'unavailable'
})
let pollCount = 0
const currentState = state.trim()
if (currentState === 'completed') {
return Promise.resolve(currentState)
}
return new Promise(async (resolve, reject) => {
let printedState = ''
let logFileStream
if (pollOptions?.streamLog) {
const logFileName = `${postedJob.name || 'job'}-${generateTimestamp()}.log`
const logFilePath = `${
pollOptions?.logFolderPath || process.cwd()
}/${logFileName}`
const interval = setInterval(async () => {
if (
postedJobState === 'running' ||
postedJobState === '' ||
postedJobState === 'pending' ||
postedJobState === 'unavailable'
) {
if (authConfig) {
;({ access_token } = await getTokens(requestClient, authConfig))
}
logFileStream = await createWriteStream(logFilePath)
}
if (stateLink) {
const { result: jobState } = await requestClient
.get<string>(
`${stateLink.href}?_action=wait&wait=300`,
access_token,
'text/plain',
{},
debug
)
.catch((err) => {
errorCount++
if (pollCount >= maxPollCount || errorCount >= maxErrorCount) {
clearInterval(interval)
reject(
prefixMessage(
err,
'Error while getting job state after interval. '
)
)
}
logger.error(
`Error fetching job state from ${stateLink.href}. Resuming poll, assuming job to be running.`,
err
)
return { result: 'unavailable' }
})
let result = await doPoll(
requestClient,
postedJob,
currentState,
debug,
pollCount,
authConfig,
pollOptions,
logFileStream
)
postedJobState = jobState.trim()
if (postedJobState != 'unavailable' && errorCount > 0) {
errorCount = 0
}
currentState = result.state
pollCount = result.pollCount
if (debug && printedState !== postedJobState) {
logger.info('Polling job status...')
logger.info(`Current job state: ${postedJobState}`)
if (!needsRetry(currentState) || pollCount >= maxPollCount) {
return currentState
}
printedState = postedJobState
}
// If we get to this point, this is a long-running job that needs longer polling.
// We will resume polling with a bigger interval of 1 minute
let longJobPollOptions: PollOptions = {
maxPollCount: 24 * 60,
pollInterval: 60000,
streamLog: false
}
if (pollOptions) {
longJobPollOptions.streamLog = pollOptions.streamLog
longJobPollOptions.logFolderPath = pollOptions.logFolderPath
}
pollCount++
result = await doPoll(
requestClient,
postedJob,
currentState,
debug,
pollCount,
authConfig,
longJobPollOptions,
logFileStream
)
await saveLog(
postedJob,
requestClient,
pollOptions?.streamLog || false,
logFilePath,
access_token
)
currentState = result.state
pollCount = result.pollCount
if (pollCount >= maxPollCount) {
resolve(postedJobState)
}
}
} else {
clearInterval(interval)
resolve(postedJobState)
}
}, pollInterval)
})
if (logFileStream) {
logFileStream.end()
}
return currentState
}
const getJobState = async (
requestClient: RequestClient,
job: Job,
currentState: string,
debug: boolean,
authConfig?: AuthConfig
) => {
const stateLink = job.links.find((l: any) => l.rel === 'state')
if (!stateLink) {
throw new Error(`Job state link was not found.`)
}
if (needsRetry(currentState)) {
let tokens
if (authConfig) {
tokens = await getTokens(requestClient, authConfig)
}
const { result: jobState } = await requestClient
.get<string>(
`${stateLink.href}?_action=wait&wait=300`,
tokens?.access_token,
'text/plain',
{},
debug
)
.catch((err) => {
throw new JobStatePollError(job.id, err)
})
return jobState.trim()
} else {
return currentState
}
}
const needsRetry = (state: string) =>
state === 'running' ||
state === '' ||
state === 'pending' ||
state === 'unavailable'
const doPoll = async (
requestClient: RequestClient,
postedJob: Job,
currentState: string,
debug: boolean,
pollCount: number,
authConfig?: AuthConfig,
pollOptions?: PollOptions,
logStream?: WriteStream
): Promise<{ state: string; pollCount: number }> => {
let pollInterval = 300
let maxPollCount = 1000
let maxErrorCount = 5
let errorCount = 0
let state = currentState
let printedState = ''
let startLogLine = 0
const logger = process.logger || console
if (pollOptions) {
pollInterval = pollOptions.pollInterval || pollInterval
maxPollCount = pollOptions.maxPollCount || maxPollCount
}
const stateLink = postedJob.links.find((l: Link) => l.rel === 'state')
if (!stateLink) {
throw new Error(`Job state link was not found.`)
}
while (needsRetry(state) && pollCount <= 100 && pollCount <= maxPollCount) {
state = await getJobState(
requestClient,
postedJob,
state,
debug,
authConfig
).catch((err) => {
errorCount++
if (pollCount >= maxPollCount || errorCount >= maxErrorCount) {
throw err
}
logger.error(
`Error fetching job state from ${stateLink.href}. Resuming poll, assuming job to be running.`,
err
)
return 'unavailable'
})
pollCount++
const jobUrl = postedJob.links.find((l: Link) => l.rel === 'self')
const { result: job } = await requestClient.get<Job>(
jobUrl!.href,
authConfig?.access_token
)
const endLogLine = job.logStatistics?.lineCount ?? 1000000
await saveLog(
postedJob,
requestClient,
pollOptions?.streamLog || false,
startLogLine,
endLogLine,
logStream,
authConfig?.access_token
)
startLogLine += job.logStatistics.lineCount
if (debug && printedState !== state) {
logger.info('Polling job status...')
logger.info(`Current job state: ${state}`)
printedState = state
}
if (state != 'unavailable' && errorCount > 0) {
errorCount = 0
}
await delay(pollInterval)
}
return { state, pollCount }
}
const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms))