mirror of
https://github.com/sasjs/adapter.git
synced 2026-01-08 13:00:05 +00:00
feat(log): write logs to file when polling for job status
This commit is contained in:
303
src/api/viya/executeScript.ts
Normal file
303
src/api/viya/executeScript.ts
Normal file
@@ -0,0 +1,303 @@
|
||||
import {
|
||||
AuthConfig,
|
||||
MacroVar,
|
||||
Logger,
|
||||
LogLevel,
|
||||
timestampToYYYYMMDDHHMMSS
|
||||
} from '@sasjs/utils'
|
||||
import { prefixMessage } from '@sasjs/utils/error'
|
||||
import {
|
||||
PollOptions,
|
||||
Job,
|
||||
ComputeJobExecutionError,
|
||||
NotFoundError
|
||||
} from '../..'
|
||||
import { getTokens } from '../../auth/tokens'
|
||||
import { RequestClient } from '../../request/RequestClient'
|
||||
import { SessionManager } from '../../SessionManager'
|
||||
import { isRelativePath, fetchLogByChunks } from '../../utils'
|
||||
import { formatDataForRequest } from '../../utils/formatDataForRequest'
|
||||
import { pollJobState } from './pollJobState'
|
||||
import { uploadTables } from './uploadTables'
|
||||
|
||||
/**
|
||||
* Executes code on the current SAS Viya server.
|
||||
* @param jobPath - the path to the file being submitted for execution.
|
||||
* @param linesOfCode - an array of code lines to execute.
|
||||
* @param contextName - the context to execute the code in.
|
||||
* @param authConfig - an object containing an access token, refresh token, client ID and secret.
|
||||
* @param data - execution data.
|
||||
* @param debug - when set to true, the log will be returned.
|
||||
* @param expectWebout - when set to true, the automatic _webout fileref will be checked for content, and that content returned. This fileref is used when the Job contains a SASjs web request (as opposed to executing arbitrary SAS code).
|
||||
* @param waitForResult - when set to true, function will return the session
|
||||
* @param pollOptions - an object that represents poll interval(milliseconds) and maximum amount of attempts. Object example: { MAX_POLL_COUNT: 24 * 60 * 60, POLL_INTERVAL: 1000 }.
|
||||
* @param printPid - a boolean that indicates whether the function should print (PID) of the started job.
|
||||
* @param variables - an object that represents macro variables.
|
||||
*/
|
||||
export async function executeScript(
|
||||
requestClient: RequestClient,
|
||||
sessionManager: SessionManager,
|
||||
rootFolderName: string,
|
||||
jobPath: string,
|
||||
linesOfCode: string[],
|
||||
contextName: string,
|
||||
authConfig?: AuthConfig,
|
||||
data = null,
|
||||
debug: boolean = false,
|
||||
expectWebout = false,
|
||||
waitForResult = true,
|
||||
pollOptions?: PollOptions,
|
||||
printPid = false,
|
||||
variables?: MacroVar
|
||||
): Promise<any> {
|
||||
let access_token = (authConfig || {}).access_token
|
||||
if (authConfig) {
|
||||
;({ access_token } = await getTokens(requestClient, authConfig))
|
||||
}
|
||||
|
||||
const logger = process.logger || console
|
||||
|
||||
try {
|
||||
let executionSessionId: string
|
||||
|
||||
const session = await sessionManager
|
||||
.getSession(access_token)
|
||||
.catch((err) => {
|
||||
throw prefixMessage(err, 'Error while getting session. ')
|
||||
})
|
||||
|
||||
executionSessionId = session!.id
|
||||
|
||||
if (printPid) {
|
||||
const { result: jobIdVariable } = await sessionManager
|
||||
.getVariable(executionSessionId, 'SYSJOBID', access_token)
|
||||
.catch((err) => {
|
||||
throw prefixMessage(err, 'Error while getting session variable. ')
|
||||
})
|
||||
|
||||
if (jobIdVariable && jobIdVariable.value) {
|
||||
const relativeJobPath = rootFolderName
|
||||
? jobPath.split(rootFolderName).join('').replace(/^\//, '')
|
||||
: jobPath
|
||||
|
||||
const logger = new Logger(debug ? LogLevel.Debug : LogLevel.Info)
|
||||
|
||||
logger.info(
|
||||
`Triggered '${relativeJobPath}' with PID ${
|
||||
jobIdVariable.value
|
||||
} at ${timestampToYYYYMMDDHHMMSS()}`
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
const jobArguments: { [key: string]: any } = {
|
||||
_contextName: contextName,
|
||||
_OMITJSONLISTING: true,
|
||||
_OMITJSONLOG: true,
|
||||
_OMITSESSIONRESULTS: true,
|
||||
_OMITTEXTLISTING: true,
|
||||
_OMITTEXTLOG: true
|
||||
}
|
||||
|
||||
if (debug) {
|
||||
jobArguments['_OMITTEXTLOG'] = false
|
||||
jobArguments['_OMITSESSIONRESULTS'] = false
|
||||
}
|
||||
|
||||
let fileName
|
||||
|
||||
if (isRelativePath(jobPath)) {
|
||||
fileName = `exec-${
|
||||
jobPath.includes('/') ? jobPath.split('/')[1] : jobPath
|
||||
}`
|
||||
} else {
|
||||
const jobPathParts = jobPath.split('/')
|
||||
fileName = jobPathParts.pop()
|
||||
}
|
||||
|
||||
let jobVariables: any = {
|
||||
SYS_JES_JOB_URI: '',
|
||||
_program: isRelativePath(jobPath)
|
||||
? rootFolderName + '/' + jobPath
|
||||
: jobPath
|
||||
}
|
||||
|
||||
if (variables) jobVariables = { ...jobVariables, ...variables }
|
||||
|
||||
if (debug) jobVariables = { ...jobVariables, _DEBUG: 131 }
|
||||
|
||||
let files: any[] = []
|
||||
|
||||
if (data) {
|
||||
if (JSON.stringify(data).includes(';')) {
|
||||
files = await uploadTables(requestClient, data, access_token).catch(
|
||||
(err) => {
|
||||
throw prefixMessage(err, 'Error while uploading tables. ')
|
||||
}
|
||||
)
|
||||
|
||||
jobVariables['_webin_file_count'] = files.length
|
||||
|
||||
files.forEach((fileInfo, index) => {
|
||||
jobVariables[
|
||||
`_webin_fileuri${index + 1}`
|
||||
] = `/files/files/${fileInfo.file.id}`
|
||||
jobVariables[`_webin_name${index + 1}`] = fileInfo.tableName
|
||||
})
|
||||
} else {
|
||||
jobVariables = { ...jobVariables, ...formatDataForRequest(data) }
|
||||
}
|
||||
}
|
||||
|
||||
// Execute job in session
|
||||
const jobRequestBody = {
|
||||
name: fileName,
|
||||
description: 'Powered by SASjs',
|
||||
code: linesOfCode,
|
||||
variables: jobVariables,
|
||||
arguments: jobArguments
|
||||
}
|
||||
|
||||
const { result: postedJob, etag } = await requestClient
|
||||
.post<Job>(
|
||||
`/compute/sessions/${executionSessionId}/jobs`,
|
||||
jobRequestBody,
|
||||
access_token
|
||||
)
|
||||
.catch((err) => {
|
||||
throw prefixMessage(err, 'Error while posting job. ')
|
||||
})
|
||||
|
||||
if (!waitForResult) return session
|
||||
|
||||
if (debug) {
|
||||
logger.info(`Job has been submitted for '${fileName}'.`)
|
||||
logger.info(
|
||||
`You can monitor the job progress at '${requestClient.getBaseUrl()}${
|
||||
postedJob.links.find((l: any) => l.rel === 'state')!.href
|
||||
}'.`
|
||||
)
|
||||
}
|
||||
|
||||
const jobStatus = await pollJobState(
|
||||
requestClient,
|
||||
postedJob,
|
||||
debug,
|
||||
etag,
|
||||
authConfig,
|
||||
pollOptions
|
||||
).catch(async (err) => {
|
||||
const error = err?.response?.data
|
||||
const result = /err=[0-9]*,/.exec(error)
|
||||
|
||||
const errorCode = '5113'
|
||||
if (result?.[0]?.slice(4, -1) === errorCode) {
|
||||
const sessionLogUrl =
|
||||
postedJob.links.find((l: any) => l.rel === 'up')!.href + '/log'
|
||||
const logCount = 1000000
|
||||
err.log = await fetchLogByChunks(
|
||||
requestClient,
|
||||
access_token!,
|
||||
sessionLogUrl,
|
||||
logCount
|
||||
)
|
||||
}
|
||||
throw prefixMessage(err, 'Error while polling job status. ')
|
||||
})
|
||||
|
||||
if (authConfig) {
|
||||
;({ access_token } = await getTokens(requestClient, authConfig))
|
||||
}
|
||||
|
||||
const { result: currentJob } = await requestClient
|
||||
.get<Job>(
|
||||
`/compute/sessions/${executionSessionId}/jobs/${postedJob.id}`,
|
||||
access_token
|
||||
)
|
||||
.catch((err) => {
|
||||
throw prefixMessage(err, 'Error while getting job. ')
|
||||
})
|
||||
|
||||
let jobResult
|
||||
let log = ''
|
||||
|
||||
const logLink = currentJob.links.find((l) => l.rel === 'log')
|
||||
|
||||
if (debug && logLink) {
|
||||
const logUrl = `${logLink.href}/content`
|
||||
const logCount = currentJob.logStatistics?.lineCount ?? 1000000
|
||||
log = await fetchLogByChunks(
|
||||
requestClient,
|
||||
access_token!,
|
||||
logUrl,
|
||||
logCount
|
||||
)
|
||||
}
|
||||
|
||||
if (jobStatus === 'failed' || jobStatus === 'error') {
|
||||
return Promise.reject(new ComputeJobExecutionError(currentJob, log))
|
||||
}
|
||||
|
||||
let resultLink
|
||||
|
||||
if (expectWebout) {
|
||||
resultLink = `/compute/sessions/${executionSessionId}/filerefs/_webout/content`
|
||||
} else {
|
||||
return { job: currentJob, log }
|
||||
}
|
||||
|
||||
if (resultLink) {
|
||||
jobResult = await requestClient
|
||||
.get<any>(resultLink, access_token, 'text/plain')
|
||||
.catch(async (e) => {
|
||||
if (e instanceof NotFoundError) {
|
||||
if (logLink) {
|
||||
const logUrl = `${logLink.href}/content`
|
||||
const logCount = currentJob.logStatistics?.lineCount ?? 1000000
|
||||
log = await fetchLogByChunks(
|
||||
requestClient,
|
||||
access_token!,
|
||||
logUrl,
|
||||
logCount
|
||||
)
|
||||
|
||||
return Promise.reject({
|
||||
status: 500,
|
||||
log
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
result: JSON.stringify(e)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
await sessionManager
|
||||
.clearSession(executionSessionId, access_token)
|
||||
.catch((err) => {
|
||||
throw prefixMessage(err, 'Error while clearing session. ')
|
||||
})
|
||||
|
||||
return { result: jobResult?.result, log }
|
||||
} catch (e) {
|
||||
if (e && e.status === 404) {
|
||||
return executeScript(
|
||||
requestClient,
|
||||
sessionManager,
|
||||
rootFolderName,
|
||||
jobPath,
|
||||
linesOfCode,
|
||||
contextName,
|
||||
authConfig,
|
||||
data,
|
||||
debug,
|
||||
false,
|
||||
true
|
||||
)
|
||||
} else {
|
||||
throw prefixMessage(e, 'Error while executing script. ')
|
||||
}
|
||||
}
|
||||
}
|
||||
178
src/api/viya/pollJobState.ts
Normal file
178
src/api/viya/pollJobState.ts
Normal file
@@ -0,0 +1,178 @@
|
||||
import { AuthConfig, createFile, generateTimestamp } from '@sasjs/utils'
|
||||
import { prefixMessage } from '@sasjs/utils/error'
|
||||
import { Job, PollOptions } from '../..'
|
||||
import { getTokens } from '../../auth/tokens'
|
||||
import { RequestClient } from '../../request/RequestClient'
|
||||
import { fetchLogByChunks } from '../../utils'
|
||||
|
||||
export async function pollJobState(
|
||||
requestClient: RequestClient,
|
||||
postedJob: Job,
|
||||
debug: boolean,
|
||||
etag: string | null,
|
||||
authConfig?: AuthConfig,
|
||||
pollOptions?: PollOptions
|
||||
) {
|
||||
const logger = process.logger || console
|
||||
|
||||
let POLL_INTERVAL = 300
|
||||
let MAX_POLL_COUNT = 1000
|
||||
let MAX_ERROR_COUNT = 5
|
||||
let access_token = (authConfig || {}).access_token
|
||||
if (authConfig) {
|
||||
;({ access_token } = await getTokens(requestClient, authConfig))
|
||||
}
|
||||
|
||||
if (pollOptions) {
|
||||
POLL_INTERVAL = pollOptions.pollInterval || POLL_INTERVAL
|
||||
MAX_POLL_COUNT = pollOptions.maxPollCount || MAX_POLL_COUNT
|
||||
}
|
||||
|
||||
let postedJobState = ''
|
||||
let pollCount = 0
|
||||
let errorCount = 0
|
||||
const headers: any = {
|
||||
'Content-Type': 'application/json',
|
||||
'If-None-Match': etag
|
||||
}
|
||||
if (access_token) {
|
||||
headers.Authorization = `Bearer ${access_token}`
|
||||
}
|
||||
const stateLink = postedJob.links.find((l: any) => l.rel === 'state')
|
||||
if (!stateLink) {
|
||||
return Promise.reject(`Job state link was not found.`)
|
||||
}
|
||||
|
||||
const { result: state } = await requestClient
|
||||
.get<string>(
|
||||
`${stateLink.href}?_action=wait&wait=300`,
|
||||
access_token,
|
||||
'text/plain',
|
||||
{},
|
||||
debug
|
||||
)
|
||||
.catch((err) => {
|
||||
logger.error(
|
||||
`Error fetching job state from ${stateLink.href}. Starting poll, assuming job to be running.`,
|
||||
err
|
||||
)
|
||||
return { result: 'unavailable' }
|
||||
})
|
||||
|
||||
const currentState = state.trim()
|
||||
if (currentState === 'completed') {
|
||||
return Promise.resolve(currentState)
|
||||
}
|
||||
|
||||
return new Promise(async (resolve, _) => {
|
||||
let printedState = ''
|
||||
|
||||
const interval = setInterval(async () => {
|
||||
if (
|
||||
postedJobState === 'running' ||
|
||||
postedJobState === '' ||
|
||||
postedJobState === 'pending' ||
|
||||
postedJobState === 'unavailable'
|
||||
) {
|
||||
if (authConfig) {
|
||||
;({ access_token } = await getTokens(requestClient, authConfig))
|
||||
}
|
||||
|
||||
if (stateLink) {
|
||||
const { result: jobState } = await requestClient
|
||||
.get<string>(
|
||||
`${stateLink.href}?_action=wait&wait=300`,
|
||||
access_token,
|
||||
'text/plain',
|
||||
{},
|
||||
debug
|
||||
)
|
||||
.catch((err) => {
|
||||
errorCount++
|
||||
if (
|
||||
pollCount >= MAX_POLL_COUNT ||
|
||||
errorCount >= MAX_ERROR_COUNT
|
||||
) {
|
||||
throw prefixMessage(
|
||||
err,
|
||||
'Error while getting job state after interval. '
|
||||
)
|
||||
}
|
||||
logger.error(
|
||||
`Error fetching job state from ${stateLink.href}. Resuming poll, assuming job to be running.`,
|
||||
err
|
||||
)
|
||||
return { result: 'unavailable' }
|
||||
})
|
||||
|
||||
postedJobState = jobState.trim()
|
||||
if (postedJobState != 'unavailable' && errorCount > 0) {
|
||||
errorCount = 0
|
||||
}
|
||||
|
||||
if (debug && printedState !== postedJobState) {
|
||||
logger.info('Polling job status...')
|
||||
logger.info(`Current job state: ${postedJobState}`)
|
||||
|
||||
printedState = postedJobState
|
||||
}
|
||||
|
||||
pollCount++
|
||||
|
||||
await saveLog(
|
||||
postedJob,
|
||||
requestClient,
|
||||
pollOptions?.streamLog || false,
|
||||
pollOptions?.logFilePath,
|
||||
access_token
|
||||
)
|
||||
|
||||
if (pollCount >= MAX_POLL_COUNT) {
|
||||
resolve(postedJobState)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
clearInterval(interval)
|
||||
resolve(postedJobState)
|
||||
}
|
||||
}, POLL_INTERVAL)
|
||||
})
|
||||
}
|
||||
|
||||
async function saveLog(
|
||||
job: Job,
|
||||
requestClient: RequestClient,
|
||||
shouldSaveLog: boolean,
|
||||
logFilePath?: string,
|
||||
accessToken?: string
|
||||
) {
|
||||
if (!shouldSaveLog) {
|
||||
return
|
||||
}
|
||||
|
||||
if (!accessToken) {
|
||||
throw new Error(
|
||||
`Logs for job ${job.id} cannot be fetched without a valid access token.`
|
||||
)
|
||||
}
|
||||
|
||||
const logger = process.logger || console
|
||||
const logFileName = `${job.name || 'job'}-${generateTimestamp()}.log`
|
||||
const logPath = `${logFilePath || process.cwd()}/${logFileName}`
|
||||
const jobLogUrl = job.links.find((l) => l.rel === 'log')
|
||||
|
||||
if (!jobLogUrl) {
|
||||
throw new Error(`Log URL for job ${job.id} was not found.`)
|
||||
}
|
||||
|
||||
const logCount = job.logStatistics?.lineCount ?? 1000000
|
||||
const log = await fetchLogByChunks(
|
||||
requestClient,
|
||||
accessToken,
|
||||
`${jobLogUrl.href}/content`,
|
||||
logCount
|
||||
)
|
||||
|
||||
logger.info(`Writing logs to ${logPath}`)
|
||||
await createFile(logPath, log)
|
||||
}
|
||||
35
src/api/viya/uploadTables.ts
Normal file
35
src/api/viya/uploadTables.ts
Normal file
@@ -0,0 +1,35 @@
|
||||
import { prefixMessage } from '@sasjs/utils/error'
|
||||
import { RequestClient } from '../../request/RequestClient'
|
||||
import { convertToCSV } from '../../utils/convertToCsv'
|
||||
|
||||
export async function uploadTables(
|
||||
requestClient: RequestClient,
|
||||
data: any,
|
||||
accessToken?: string
|
||||
) {
|
||||
const uploadedFiles = []
|
||||
const headers: any = {
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
if (accessToken) {
|
||||
headers.Authorization = `Bearer ${accessToken}`
|
||||
}
|
||||
|
||||
for (const tableName in data) {
|
||||
const csv = convertToCSV(data[tableName])
|
||||
if (csv === 'ERROR: LARGE STRING LENGTH') {
|
||||
throw new Error(
|
||||
'The max length of a string value in SASjs is 32765 characters.'
|
||||
)
|
||||
}
|
||||
|
||||
const uploadResponse = await requestClient
|
||||
.uploadFile(`/files/files#rawUpload`, csv, accessToken)
|
||||
.catch((err) => {
|
||||
throw prefixMessage(err, 'Error while uploading file. ')
|
||||
})
|
||||
|
||||
uploadedFiles.push({ tableName, file: uploadResponse.result })
|
||||
}
|
||||
return uploadedFiles
|
||||
}
|
||||
Reference in New Issue
Block a user