mirror of
https://github.com/sasjs/adapter.git
synced 2025-12-11 01:14:36 +00:00
Merge pull request #465 from sasjs/stream-job-logs
feat(stream-logs): Save logs to file during job status poll
This commit is contained in:
4
.github/workflows/build.yml
vendored
4
.github/workflows/build.yml
vendored
@@ -27,6 +27,10 @@ jobs:
|
||||
run: npm run lint
|
||||
- name: Run unit tests
|
||||
run: npm test
|
||||
- name: Generate coverage report
|
||||
uses: artiomtr/jest-coverage-report-action@v2.0-rc.2
|
||||
with:
|
||||
github-token: ${{ secrets.GITHUB_TOKEN }}
|
||||
- name: Build Package
|
||||
run: npm run package:lib
|
||||
env:
|
||||
|
||||
16272
package-lock.json
generated
16272
package-lock.json
generated
File diff suppressed because it is too large
Load Diff
15
package.json
15
package.json
@@ -38,15 +38,16 @@
|
||||
},
|
||||
"license": "ISC",
|
||||
"devDependencies": {
|
||||
"@types/jest": "^26.0.23",
|
||||
"@types/axios": "^0.14.0",
|
||||
"@types/form-data": "^2.5.0",
|
||||
"@types/jest": "^26.0.24",
|
||||
"@types/mime": "^2.0.3",
|
||||
"@types/tough-cookie": "^4.0.0",
|
||||
"@types/tough-cookie": "^4.0.1",
|
||||
"copyfiles": "^2.4.1",
|
||||
"cp": "^0.2.0",
|
||||
"dotenv": "^10.0.0",
|
||||
"jest": "^27.0.6",
|
||||
"jest-extended": "^0.11.5",
|
||||
"mime": "^2.5.2",
|
||||
"node-polyfill-webpack-plugin": "^1.1.4",
|
||||
"path": "^0.12.7",
|
||||
"process": "^0.11.10",
|
||||
@@ -57,16 +58,16 @@
|
||||
"ts-loader": "^9.2.2",
|
||||
"tslint": "^6.1.3",
|
||||
"tslint-config-prettier": "^1.18.0",
|
||||
"typedoc": "^0.21.2",
|
||||
"typedoc": "^0.21.4",
|
||||
"typedoc-neo-theme": "^1.1.1",
|
||||
"typedoc-plugin-external-module-name": "^4.0.6",
|
||||
"typescript": "^4.3.4",
|
||||
"webpack": "^5.41.1",
|
||||
"typescript": "^4.3.5",
|
||||
"webpack": "^5.44.0",
|
||||
"webpack-cli": "^4.7.2"
|
||||
},
|
||||
"main": "index.js",
|
||||
"dependencies": {
|
||||
"@sasjs/utils": "^2.23.2",
|
||||
"@sasjs/utils": "^2.25.4",
|
||||
"axios": "^0.21.1",
|
||||
"axios-cookiejar-support": "^1.0.1",
|
||||
"form-data": "^4.0.0",
|
||||
|
||||
@@ -1,10 +1,4 @@
|
||||
import {
|
||||
convertToCSV,
|
||||
isRelativePath,
|
||||
isUri,
|
||||
isUrl,
|
||||
fetchLogByChunks
|
||||
} from './utils'
|
||||
import { isRelativePath, isUri, isUrl } from './utils'
|
||||
import * as NodeFormData from 'form-data'
|
||||
import {
|
||||
Job,
|
||||
@@ -17,25 +11,19 @@ import {
|
||||
JobDefinition,
|
||||
PollOptions
|
||||
} from './types'
|
||||
import {
|
||||
ComputeJobExecutionError,
|
||||
JobExecutionError,
|
||||
NotFoundError
|
||||
} from './types/errors'
|
||||
import { formatDataForRequest } from './utils/formatDataForRequest'
|
||||
import { JobExecutionError } from './types/errors'
|
||||
import { SessionManager } from './SessionManager'
|
||||
import { ContextManager } from './ContextManager'
|
||||
import { timestampToYYYYMMDDHHMMSS } from '@sasjs/utils/time'
|
||||
import {
|
||||
isAccessTokenExpiring,
|
||||
isRefreshTokenExpiring
|
||||
} from '@sasjs/utils/auth'
|
||||
import { Logger, LogLevel } from '@sasjs/utils/logger'
|
||||
import { SasAuthResponse, MacroVar, AuthConfig } from '@sasjs/utils/types'
|
||||
import { isAuthorizeFormRequired } from './auth/isAuthorizeFormRequired'
|
||||
import { RequestClient } from './request/RequestClient'
|
||||
import { prefixMessage } from '@sasjs/utils/error'
|
||||
import * as mime from 'mime'
|
||||
import { pollJobState } from './api/viya/pollJobState'
|
||||
import { getTokens } from './auth/getTokens'
|
||||
import { uploadTables } from './api/viya/uploadTables'
|
||||
import { executeScript } from './api/viya/executeScript'
|
||||
import { getAccessToken } from './auth/getAccessToken'
|
||||
import { refreshTokens } from './auth/refreshTokens'
|
||||
|
||||
/**
|
||||
* A client for interfacing with the SAS Viya REST API.
|
||||
@@ -171,13 +159,6 @@ export class SASViyaApiClient {
|
||||
throw new Error(`Execution context ${contextName} not found.`)
|
||||
}
|
||||
|
||||
const createSessionRequest = {
|
||||
method: 'POST',
|
||||
headers: {
|
||||
Authorization: `Bearer ${accessToken}`,
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
}
|
||||
const { result: createdSession } = await this.requestClient.post<Session>(
|
||||
`/compute/contexts/${executionContext.id}/sessions`,
|
||||
{},
|
||||
@@ -292,249 +273,22 @@ export class SASViyaApiClient {
|
||||
printPid = false,
|
||||
variables?: MacroVar
|
||||
): Promise<any> {
|
||||
let access_token = (authConfig || {}).access_token
|
||||
if (authConfig) {
|
||||
;({ access_token } = await this.getTokens(authConfig))
|
||||
}
|
||||
|
||||
const logger = process.logger || console
|
||||
|
||||
try {
|
||||
let executionSessionId: string
|
||||
|
||||
const session = await this.sessionManager
|
||||
.getSession(access_token)
|
||||
.catch((err) => {
|
||||
throw prefixMessage(err, 'Error while getting session. ')
|
||||
})
|
||||
|
||||
executionSessionId = session!.id
|
||||
|
||||
if (printPid) {
|
||||
const { result: jobIdVariable } = await this.sessionManager
|
||||
.getVariable(executionSessionId, 'SYSJOBID', access_token)
|
||||
.catch((err) => {
|
||||
throw prefixMessage(err, 'Error while getting session variable. ')
|
||||
})
|
||||
|
||||
if (jobIdVariable && jobIdVariable.value) {
|
||||
const relativeJobPath = this.rootFolderName
|
||||
? jobPath.split(this.rootFolderName).join('').replace(/^\//, '')
|
||||
: jobPath
|
||||
|
||||
const logger = new Logger(debug ? LogLevel.Debug : LogLevel.Info)
|
||||
|
||||
logger.info(
|
||||
`Triggered '${relativeJobPath}' with PID ${
|
||||
jobIdVariable.value
|
||||
} at ${timestampToYYYYMMDDHHMMSS()}`
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
const jobArguments: { [key: string]: any } = {
|
||||
_contextName: contextName,
|
||||
_OMITJSONLISTING: true,
|
||||
_OMITJSONLOG: true,
|
||||
_OMITSESSIONRESULTS: true,
|
||||
_OMITTEXTLISTING: true,
|
||||
_OMITTEXTLOG: true
|
||||
}
|
||||
|
||||
if (debug) {
|
||||
jobArguments['_OMITTEXTLOG'] = false
|
||||
jobArguments['_OMITSESSIONRESULTS'] = false
|
||||
}
|
||||
|
||||
let fileName
|
||||
|
||||
if (isRelativePath(jobPath)) {
|
||||
fileName = `exec-${
|
||||
jobPath.includes('/') ? jobPath.split('/')[1] : jobPath
|
||||
}`
|
||||
} else {
|
||||
const jobPathParts = jobPath.split('/')
|
||||
fileName = jobPathParts.pop()
|
||||
}
|
||||
|
||||
let jobVariables: any = {
|
||||
SYS_JES_JOB_URI: '',
|
||||
_program: isRelativePath(jobPath)
|
||||
? this.rootFolderName + '/' + jobPath
|
||||
: jobPath
|
||||
}
|
||||
|
||||
if (variables) jobVariables = { ...jobVariables, ...variables }
|
||||
|
||||
if (debug) jobVariables = { ...jobVariables, _DEBUG: 131 }
|
||||
|
||||
let files: any[] = []
|
||||
|
||||
if (data) {
|
||||
if (JSON.stringify(data).includes(';')) {
|
||||
files = await this.uploadTables(data, access_token).catch((err) => {
|
||||
throw prefixMessage(err, 'Error while uploading tables. ')
|
||||
})
|
||||
|
||||
jobVariables['_webin_file_count'] = files.length
|
||||
|
||||
files.forEach((fileInfo, index) => {
|
||||
jobVariables[
|
||||
`_webin_fileuri${index + 1}`
|
||||
] = `/files/files/${fileInfo.file.id}`
|
||||
jobVariables[`_webin_name${index + 1}`] = fileInfo.tableName
|
||||
})
|
||||
} else {
|
||||
jobVariables = { ...jobVariables, ...formatDataForRequest(data) }
|
||||
}
|
||||
}
|
||||
|
||||
// Execute job in session
|
||||
const jobRequestBody = {
|
||||
name: fileName,
|
||||
description: 'Powered by SASjs',
|
||||
code: linesOfCode,
|
||||
variables: jobVariables,
|
||||
arguments: jobArguments
|
||||
}
|
||||
|
||||
const { result: postedJob, etag } = await this.requestClient
|
||||
.post<Job>(
|
||||
`/compute/sessions/${executionSessionId}/jobs`,
|
||||
jobRequestBody,
|
||||
access_token
|
||||
)
|
||||
.catch((err) => {
|
||||
throw prefixMessage(err, 'Error while posting job. ')
|
||||
})
|
||||
|
||||
if (!waitForResult) return session
|
||||
|
||||
if (debug) {
|
||||
logger.info(`Job has been submitted for '${fileName}'.`)
|
||||
logger.info(
|
||||
`You can monitor the job progress at '${this.serverUrl}${
|
||||
postedJob.links.find((l: any) => l.rel === 'state')!.href
|
||||
}'.`
|
||||
)
|
||||
}
|
||||
|
||||
const jobStatus = await this.pollJobState(
|
||||
postedJob,
|
||||
etag,
|
||||
authConfig,
|
||||
pollOptions
|
||||
).catch(async (err) => {
|
||||
const error = err?.response?.data
|
||||
const result = /err=[0-9]*,/.exec(error)
|
||||
|
||||
const errorCode = '5113'
|
||||
if (result?.[0]?.slice(4, -1) === errorCode) {
|
||||
const sessionLogUrl =
|
||||
postedJob.links.find((l: any) => l.rel === 'up')!.href + '/log'
|
||||
const logCount = 1000000
|
||||
err.log = await fetchLogByChunks(
|
||||
this.requestClient,
|
||||
access_token!,
|
||||
sessionLogUrl,
|
||||
logCount
|
||||
)
|
||||
}
|
||||
throw prefixMessage(err, 'Error while polling job status. ')
|
||||
})
|
||||
|
||||
if (authConfig) {
|
||||
;({ access_token } = await this.getTokens(authConfig))
|
||||
}
|
||||
|
||||
const { result: currentJob } = await this.requestClient
|
||||
.get<Job>(
|
||||
`/compute/sessions/${executionSessionId}/jobs/${postedJob.id}`,
|
||||
access_token
|
||||
)
|
||||
.catch((err) => {
|
||||
throw prefixMessage(err, 'Error while getting job. ')
|
||||
})
|
||||
|
||||
let jobResult
|
||||
let log = ''
|
||||
|
||||
const logLink = currentJob.links.find((l) => l.rel === 'log')
|
||||
|
||||
if (debug && logLink) {
|
||||
const logUrl = `${logLink.href}/content`
|
||||
const logCount = currentJob.logStatistics?.lineCount ?? 1000000
|
||||
log = await fetchLogByChunks(
|
||||
this.requestClient,
|
||||
access_token!,
|
||||
logUrl,
|
||||
logCount
|
||||
)
|
||||
}
|
||||
|
||||
if (jobStatus === 'failed' || jobStatus === 'error') {
|
||||
return Promise.reject(new ComputeJobExecutionError(currentJob, log))
|
||||
}
|
||||
|
||||
let resultLink
|
||||
|
||||
if (expectWebout) {
|
||||
resultLink = `/compute/sessions/${executionSessionId}/filerefs/_webout/content`
|
||||
} else {
|
||||
return { job: currentJob, log }
|
||||
}
|
||||
|
||||
if (resultLink) {
|
||||
jobResult = await this.requestClient
|
||||
.get<any>(resultLink, access_token, 'text/plain')
|
||||
.catch(async (e) => {
|
||||
if (e instanceof NotFoundError) {
|
||||
if (logLink) {
|
||||
const logUrl = `${logLink.href}/content`
|
||||
const logCount = currentJob.logStatistics?.lineCount ?? 1000000
|
||||
log = await fetchLogByChunks(
|
||||
this.requestClient,
|
||||
access_token!,
|
||||
logUrl,
|
||||
logCount
|
||||
)
|
||||
|
||||
return Promise.reject({
|
||||
status: 500,
|
||||
log
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
result: JSON.stringify(e)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
await this.sessionManager
|
||||
.clearSession(executionSessionId, access_token)
|
||||
.catch((err) => {
|
||||
throw prefixMessage(err, 'Error while clearing session. ')
|
||||
})
|
||||
|
||||
return { result: jobResult?.result, log }
|
||||
} catch (e) {
|
||||
if (e && e.status === 404) {
|
||||
return this.executeScript(
|
||||
jobPath,
|
||||
linesOfCode,
|
||||
contextName,
|
||||
authConfig,
|
||||
data,
|
||||
debug,
|
||||
false,
|
||||
true
|
||||
)
|
||||
} else {
|
||||
throw prefixMessage(e, 'Error while executing script. ')
|
||||
}
|
||||
}
|
||||
return executeScript(
|
||||
this.requestClient,
|
||||
this.sessionManager,
|
||||
this.rootFolderName,
|
||||
jobPath,
|
||||
linesOfCode,
|
||||
contextName,
|
||||
authConfig,
|
||||
data,
|
||||
debug,
|
||||
expectWebout,
|
||||
waitForResult,
|
||||
pollOptions,
|
||||
printPid,
|
||||
variables
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -581,9 +335,6 @@ export class SASViyaApiClient {
|
||||
const formData = new NodeFormData()
|
||||
formData.append('file', contentBuffer, fileName)
|
||||
|
||||
const mimeType =
|
||||
mime.getType(fileName.match(/\.[0-9a-z]+$/i)?.[0] || '') ?? 'text/plain'
|
||||
|
||||
return (
|
||||
await this.requestClient.post<File>(
|
||||
`/files/files?parentFolderUri=${parentFolderUri}&typeDefName=file#rawUpload`,
|
||||
@@ -769,37 +520,7 @@ export class SASViyaApiClient {
|
||||
clientSecret: string,
|
||||
authCode: string
|
||||
): Promise<SasAuthResponse> {
|
||||
const url = this.serverUrl + '/SASLogon/oauth/token'
|
||||
let token
|
||||
if (typeof Buffer === 'undefined') {
|
||||
token = btoa(clientId + ':' + clientSecret)
|
||||
} else {
|
||||
token = Buffer.from(clientId + ':' + clientSecret).toString('base64')
|
||||
}
|
||||
const headers = {
|
||||
Authorization: 'Basic ' + token
|
||||
}
|
||||
|
||||
let formData
|
||||
if (typeof FormData === 'undefined') {
|
||||
formData = new NodeFormData()
|
||||
} else {
|
||||
formData = new FormData()
|
||||
}
|
||||
formData.append('grant_type', 'authorization_code')
|
||||
formData.append('code', authCode)
|
||||
|
||||
const authResponse = await this.requestClient
|
||||
.post(
|
||||
url,
|
||||
formData,
|
||||
undefined,
|
||||
'multipart/form-data; boundary=' + (formData as any)._boundary,
|
||||
headers
|
||||
)
|
||||
.then((res) => res.result as SasAuthResponse)
|
||||
|
||||
return authResponse
|
||||
return getAccessToken(this.requestClient, clientId, clientSecret, authCode)
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -813,39 +534,12 @@ export class SASViyaApiClient {
|
||||
clientSecret: string,
|
||||
refreshToken: string
|
||||
) {
|
||||
const url = this.serverUrl + '/SASLogon/oauth/token'
|
||||
let token
|
||||
if (typeof Buffer === 'undefined') {
|
||||
token = btoa(clientId + ':' + clientSecret)
|
||||
} else {
|
||||
token = Buffer.from(clientId + ':' + clientSecret).toString('base64')
|
||||
}
|
||||
const headers = {
|
||||
Authorization: 'Basic ' + token
|
||||
}
|
||||
|
||||
let formData
|
||||
if (typeof FormData === 'undefined') {
|
||||
formData = new NodeFormData()
|
||||
formData.append('grant_type', 'refresh_token')
|
||||
formData.append('refresh_token', refreshToken)
|
||||
} else {
|
||||
formData = new FormData()
|
||||
formData.append('grant_type', 'refresh_token')
|
||||
formData.append('refresh_token', refreshToken)
|
||||
}
|
||||
|
||||
const authResponse = await this.requestClient
|
||||
.post<SasAuthResponse>(
|
||||
url,
|
||||
formData,
|
||||
undefined,
|
||||
'multipart/form-data; boundary=' + (formData as any)._boundary,
|
||||
headers
|
||||
)
|
||||
.then((res) => res.result)
|
||||
|
||||
return authResponse
|
||||
return refreshTokens(
|
||||
this.requestClient,
|
||||
clientId,
|
||||
clientSecret,
|
||||
refreshToken
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -892,7 +586,7 @@ export class SASViyaApiClient {
|
||||
) {
|
||||
let access_token = (authConfig || {}).access_token
|
||||
if (authConfig) {
|
||||
;({ access_token } = await this.getTokens(authConfig))
|
||||
;({ access_token } = await getTokens(this.requestClient, authConfig))
|
||||
}
|
||||
|
||||
if (isRelativePath(sasJob) && !this.rootFolderName) {
|
||||
@@ -988,7 +682,7 @@ export class SASViyaApiClient {
|
||||
) {
|
||||
let access_token = (authConfig || {}).access_token
|
||||
if (authConfig) {
|
||||
;({ access_token } = await this.getTokens(authConfig))
|
||||
;({ access_token } = await getTokens(this.requestClient, authConfig))
|
||||
}
|
||||
if (isRelativePath(sasJob) && !this.rootFolderName) {
|
||||
throw new Error(
|
||||
@@ -1060,18 +754,16 @@ export class SASViyaApiClient {
|
||||
jobDefinition,
|
||||
arguments: jobArguments
|
||||
}
|
||||
const { result: postedJob, etag } = await this.requestClient.post<Job>(
|
||||
const { result: postedJob } = await this.requestClient.post<Job>(
|
||||
`${this.serverUrl}/jobExecution/jobs?_action=wait`,
|
||||
postJobRequestBody,
|
||||
access_token
|
||||
)
|
||||
const jobStatus = await this.pollJobState(
|
||||
postedJob,
|
||||
etag,
|
||||
authConfig
|
||||
).catch((err) => {
|
||||
throw prefixMessage(err, 'Error while polling job status. ')
|
||||
})
|
||||
const jobStatus = await this.pollJobState(postedJob, authConfig).catch(
|
||||
(err) => {
|
||||
throw prefixMessage(err, 'Error while polling job status. ')
|
||||
}
|
||||
)
|
||||
const { result: currentJob } = await this.requestClient.get<Job>(
|
||||
`${this.serverUrl}/jobExecution/jobs/${postedJob.id}`,
|
||||
access_token
|
||||
@@ -1137,157 +829,22 @@ export class SASViyaApiClient {
|
||||
this.folderMap.set(path, itemsAtRoot)
|
||||
}
|
||||
|
||||
// REFACTOR: set default value for 'pollOptions' attribute
|
||||
private async pollJobState(
|
||||
postedJob: any,
|
||||
etag: string | null,
|
||||
postedJob: Job,
|
||||
authConfig?: AuthConfig,
|
||||
pollOptions?: PollOptions
|
||||
) {
|
||||
const logger = process.logger || console
|
||||
|
||||
let POLL_INTERVAL = 300
|
||||
let MAX_POLL_COUNT = 1000
|
||||
let MAX_ERROR_COUNT = 5
|
||||
let access_token = (authConfig || {}).access_token
|
||||
if (authConfig) {
|
||||
;({ access_token } = await this.getTokens(authConfig))
|
||||
}
|
||||
|
||||
if (pollOptions) {
|
||||
POLL_INTERVAL = pollOptions.POLL_INTERVAL || POLL_INTERVAL
|
||||
MAX_POLL_COUNT = pollOptions.MAX_POLL_COUNT || MAX_POLL_COUNT
|
||||
}
|
||||
|
||||
let postedJobState = ''
|
||||
let pollCount = 0
|
||||
let errorCount = 0
|
||||
const headers: any = {
|
||||
'Content-Type': 'application/json',
|
||||
'If-None-Match': etag
|
||||
}
|
||||
if (access_token) {
|
||||
headers.Authorization = `Bearer ${access_token}`
|
||||
}
|
||||
const stateLink = postedJob.links.find((l: any) => l.rel === 'state')
|
||||
if (!stateLink) {
|
||||
Promise.reject(`Job state link was not found.`)
|
||||
}
|
||||
|
||||
const { result: state } = await this.requestClient
|
||||
.get<string>(
|
||||
`${this.serverUrl}${stateLink.href}?_action=wait&wait=300`,
|
||||
access_token,
|
||||
'text/plain',
|
||||
{},
|
||||
this.debug
|
||||
)
|
||||
.catch((err) => {
|
||||
console.error(
|
||||
`Error fetching job state from ${this.serverUrl}${stateLink.href}. Starting poll, assuming job to be running.`,
|
||||
err
|
||||
)
|
||||
return { result: 'unavailable' }
|
||||
})
|
||||
|
||||
const currentState = state.trim()
|
||||
if (currentState === 'completed') {
|
||||
return Promise.resolve(currentState)
|
||||
}
|
||||
|
||||
return new Promise(async (resolve, _) => {
|
||||
let printedState = ''
|
||||
|
||||
const interval = setInterval(async () => {
|
||||
if (
|
||||
postedJobState === 'running' ||
|
||||
postedJobState === '' ||
|
||||
postedJobState === 'pending' ||
|
||||
postedJobState === 'unavailable'
|
||||
) {
|
||||
if (authConfig) {
|
||||
;({ access_token } = await this.getTokens(authConfig))
|
||||
}
|
||||
|
||||
if (stateLink) {
|
||||
const { result: jobState } = await this.requestClient
|
||||
.get<string>(
|
||||
`${this.serverUrl}${stateLink.href}?_action=wait&wait=300`,
|
||||
access_token,
|
||||
'text/plain',
|
||||
{},
|
||||
this.debug
|
||||
)
|
||||
.catch((err) => {
|
||||
errorCount++
|
||||
if (
|
||||
pollCount >= MAX_POLL_COUNT ||
|
||||
errorCount >= MAX_ERROR_COUNT
|
||||
) {
|
||||
throw prefixMessage(
|
||||
err,
|
||||
'Error while getting job state after interval. '
|
||||
)
|
||||
}
|
||||
console.error(
|
||||
`Error fetching job state from ${this.serverUrl}${stateLink.href}. Resuming poll, assuming job to be running.`,
|
||||
err
|
||||
)
|
||||
return { result: 'unavailable' }
|
||||
})
|
||||
|
||||
postedJobState = jobState.trim()
|
||||
if (postedJobState != 'unavailable' && errorCount > 0) {
|
||||
errorCount = 0
|
||||
}
|
||||
|
||||
if (this.debug && printedState !== postedJobState) {
|
||||
logger.info('Polling job status...')
|
||||
logger.info(`Current job state: ${postedJobState}`)
|
||||
|
||||
printedState = postedJobState
|
||||
}
|
||||
|
||||
pollCount++
|
||||
|
||||
if (pollCount >= MAX_POLL_COUNT) {
|
||||
resolve(postedJobState)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
clearInterval(interval)
|
||||
resolve(postedJobState)
|
||||
}
|
||||
}, POLL_INTERVAL)
|
||||
})
|
||||
return pollJobState(
|
||||
this.requestClient,
|
||||
postedJob,
|
||||
this.debug,
|
||||
authConfig,
|
||||
pollOptions
|
||||
)
|
||||
}
|
||||
|
||||
private async uploadTables(data: any, accessToken?: string) {
|
||||
const uploadedFiles = []
|
||||
const headers: any = {
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
if (accessToken) {
|
||||
headers.Authorization = `Bearer ${accessToken}`
|
||||
}
|
||||
|
||||
for (const tableName in data) {
|
||||
const csv = convertToCSV(data[tableName])
|
||||
if (csv === 'ERROR: LARGE STRING LENGTH') {
|
||||
throw new Error(
|
||||
'The max length of a string value in SASjs is 32765 characters.'
|
||||
)
|
||||
}
|
||||
|
||||
const uploadResponse = await this.requestClient
|
||||
.uploadFile(`${this.serverUrl}/files/files#rawUpload`, csv, accessToken)
|
||||
.catch((err) => {
|
||||
throw prefixMessage(err, 'Error while uploading file. ')
|
||||
})
|
||||
|
||||
uploadedFiles.push({ tableName, file: uploadResponse.result })
|
||||
}
|
||||
return uploadedFiles
|
||||
return uploadTables(this.requestClient, data, accessToken)
|
||||
}
|
||||
|
||||
private async getFolderDetails(
|
||||
@@ -1376,14 +933,6 @@ export class SASViyaApiClient {
|
||||
? sourceFolder
|
||||
: await this.getFolderUri(sourceFolder, accessToken)
|
||||
|
||||
const requestInfo = {
|
||||
method: 'GET',
|
||||
headers: {
|
||||
'Content-Type': 'application/json',
|
||||
Authorization: 'Bearer ' + accessToken
|
||||
}
|
||||
}
|
||||
|
||||
const { result: members } = await this.requestClient.get<{ items: any[] }>(
|
||||
`${this.serverUrl}${sourceFolderUri}/members?limit=${limit}`,
|
||||
accessToken
|
||||
@@ -1490,21 +1039,4 @@ export class SASViyaApiClient {
|
||||
|
||||
return movedFolder
|
||||
}
|
||||
|
||||
private async getTokens(authConfig: AuthConfig): Promise<AuthConfig> {
|
||||
const logger = process.logger || console
|
||||
let { access_token, refresh_token, client, secret } = authConfig
|
||||
if (
|
||||
isAccessTokenExpiring(access_token) ||
|
||||
isRefreshTokenExpiring(refresh_token)
|
||||
) {
|
||||
logger.info('Refreshing access and refresh tokens.')
|
||||
;({ access_token, refresh_token } = await this.refreshTokens(
|
||||
client,
|
||||
secret,
|
||||
refresh_token
|
||||
))
|
||||
}
|
||||
return { access_token, refresh_token, client, secret }
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,7 +4,12 @@ import { SASViyaApiClient } from './SASViyaApiClient'
|
||||
import { SAS9ApiClient } from './SAS9ApiClient'
|
||||
import { FileUploader } from './FileUploader'
|
||||
import { AuthManager } from './auth'
|
||||
import { ServerType, MacroVar, AuthConfig } from '@sasjs/utils/types'
|
||||
import {
|
||||
ServerType,
|
||||
MacroVar,
|
||||
AuthConfig,
|
||||
ExtraResponseAttributes
|
||||
} from '@sasjs/utils/types'
|
||||
import { RequestClient } from './request/RequestClient'
|
||||
import {
|
||||
JobExecutor,
|
||||
@@ -14,7 +19,6 @@ import {
|
||||
Sas9JobExecutor
|
||||
} from './job-execution'
|
||||
import { ErrorResponse } from './types/errors'
|
||||
import { ExtraResponseAttributes } from '@sasjs/utils/types'
|
||||
|
||||
const defaultConfig: SASjsConfig = {
|
||||
serverUrl: '',
|
||||
|
||||
293
src/api/viya/executeScript.ts
Normal file
293
src/api/viya/executeScript.ts
Normal file
@@ -0,0 +1,293 @@
|
||||
import { timestampToYYYYMMDDHHMMSS } from '@sasjs/utils/time'
|
||||
import { AuthConfig, MacroVar } from '@sasjs/utils/types'
|
||||
import { prefixMessage } from '@sasjs/utils/error'
|
||||
import {
|
||||
PollOptions,
|
||||
Job,
|
||||
ComputeJobExecutionError,
|
||||
NotFoundError
|
||||
} from '../..'
|
||||
import { getTokens } from '../../auth/getTokens'
|
||||
import { RequestClient } from '../../request/RequestClient'
|
||||
import { SessionManager } from '../../SessionManager'
|
||||
import { isRelativePath, fetchLogByChunks } from '../../utils'
|
||||
import { formatDataForRequest } from '../../utils/formatDataForRequest'
|
||||
import { pollJobState } from './pollJobState'
|
||||
import { uploadTables } from './uploadTables'
|
||||
|
||||
/**
|
||||
* Executes code on the current SAS Viya server.
|
||||
* @param jobPath - the path to the file being submitted for execution.
|
||||
* @param linesOfCode - an array of code lines to execute.
|
||||
* @param contextName - the context to execute the code in.
|
||||
* @param authConfig - an object containing an access token, refresh token, client ID and secret.
|
||||
* @param data - execution data.
|
||||
* @param debug - when set to true, the log will be returned.
|
||||
* @param expectWebout - when set to true, the automatic _webout fileref will be checked for content, and that content returned. This fileref is used when the Job contains a SASjs web request (as opposed to executing arbitrary SAS code).
|
||||
* @param waitForResult - when set to true, function will return the session
|
||||
* @param pollOptions - an object that represents poll interval(milliseconds) and maximum amount of attempts. Object example: { MAX_POLL_COUNT: 24 * 60 * 60, POLL_INTERVAL: 1000 }.
|
||||
* @param printPid - a boolean that indicates whether the function should print (PID) of the started job.
|
||||
* @param variables - an object that represents macro variables.
|
||||
*/
|
||||
export async function executeScript(
|
||||
requestClient: RequestClient,
|
||||
sessionManager: SessionManager,
|
||||
rootFolderName: string,
|
||||
jobPath: string,
|
||||
linesOfCode: string[],
|
||||
contextName: string,
|
||||
authConfig?: AuthConfig,
|
||||
data: any = null,
|
||||
debug: boolean = false,
|
||||
expectWebout = false,
|
||||
waitForResult = true,
|
||||
pollOptions?: PollOptions,
|
||||
printPid = false,
|
||||
variables?: MacroVar
|
||||
): Promise<any> {
|
||||
let access_token = (authConfig || {}).access_token
|
||||
if (authConfig) {
|
||||
;({ access_token } = await getTokens(requestClient, authConfig))
|
||||
}
|
||||
|
||||
const logger = process.logger || console
|
||||
|
||||
try {
|
||||
let executionSessionId: string
|
||||
|
||||
const session = await sessionManager
|
||||
.getSession(access_token)
|
||||
.catch((err) => {
|
||||
throw prefixMessage(err, 'Error while getting session. ')
|
||||
})
|
||||
|
||||
executionSessionId = session!.id
|
||||
|
||||
if (printPid) {
|
||||
const { result: jobIdVariable } = await sessionManager
|
||||
.getVariable(executionSessionId, 'SYSJOBID', access_token)
|
||||
.catch((err) => {
|
||||
throw prefixMessage(err, 'Error while getting session variable. ')
|
||||
})
|
||||
|
||||
if (jobIdVariable && jobIdVariable.value) {
|
||||
const relativeJobPath = rootFolderName
|
||||
? jobPath.split(rootFolderName).join('').replace(/^\//, '')
|
||||
: jobPath
|
||||
|
||||
const logger = process.logger || console
|
||||
|
||||
logger.info(
|
||||
`Triggered '${relativeJobPath}' with PID ${
|
||||
jobIdVariable.value
|
||||
} at ${timestampToYYYYMMDDHHMMSS()}`
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
const jobArguments: { [key: string]: any } = {
|
||||
_contextName: contextName,
|
||||
_OMITJSONLISTING: true,
|
||||
_OMITJSONLOG: true,
|
||||
_OMITSESSIONRESULTS: true,
|
||||
_OMITTEXTLISTING: true,
|
||||
_OMITTEXTLOG: true
|
||||
}
|
||||
|
||||
if (debug) {
|
||||
jobArguments['_OMITTEXTLOG'] = false
|
||||
jobArguments['_OMITSESSIONRESULTS'] = false
|
||||
}
|
||||
|
||||
let fileName
|
||||
|
||||
if (isRelativePath(jobPath)) {
|
||||
fileName = `exec-${
|
||||
jobPath.includes('/') ? jobPath.split('/')[1] : jobPath
|
||||
}`
|
||||
} else {
|
||||
const jobPathParts = jobPath.split('/')
|
||||
fileName = jobPathParts.pop()
|
||||
}
|
||||
|
||||
let jobVariables: any = {
|
||||
SYS_JES_JOB_URI: '',
|
||||
_program: isRelativePath(jobPath)
|
||||
? rootFolderName + '/' + jobPath
|
||||
: jobPath
|
||||
}
|
||||
|
||||
if (variables) jobVariables = { ...jobVariables, ...variables }
|
||||
|
||||
if (debug) jobVariables = { ...jobVariables, _DEBUG: 131 }
|
||||
|
||||
let files: any[] = []
|
||||
|
||||
if (data) {
|
||||
if (JSON.stringify(data).includes(';')) {
|
||||
files = await uploadTables(requestClient, data, access_token).catch(
|
||||
(err) => {
|
||||
throw prefixMessage(err, 'Error while uploading tables. ')
|
||||
}
|
||||
)
|
||||
|
||||
jobVariables['_webin_file_count'] = files.length
|
||||
|
||||
files.forEach((fileInfo, index) => {
|
||||
jobVariables[
|
||||
`_webin_fileuri${index + 1}`
|
||||
] = `/files/files/${fileInfo.file.id}`
|
||||
jobVariables[`_webin_name${index + 1}`] = fileInfo.tableName
|
||||
})
|
||||
} else {
|
||||
jobVariables = { ...jobVariables, ...formatDataForRequest(data) }
|
||||
}
|
||||
}
|
||||
|
||||
// Execute job in session
|
||||
const jobRequestBody = {
|
||||
name: fileName,
|
||||
description: 'Powered by SASjs',
|
||||
code: linesOfCode,
|
||||
variables: jobVariables,
|
||||
arguments: jobArguments
|
||||
}
|
||||
|
||||
const { result: postedJob, etag } = await requestClient
|
||||
.post<Job>(
|
||||
`/compute/sessions/${executionSessionId}/jobs`,
|
||||
jobRequestBody,
|
||||
access_token
|
||||
)
|
||||
.catch((err) => {
|
||||
throw prefixMessage(err, 'Error while posting job. ')
|
||||
})
|
||||
|
||||
if (!waitForResult) return session
|
||||
|
||||
if (debug) {
|
||||
logger.info(`Job has been submitted for '${fileName}'.`)
|
||||
logger.info(
|
||||
`You can monitor the job progress at '${requestClient.getBaseUrl()}${
|
||||
postedJob.links.find((l: any) => l.rel === 'state')!.href
|
||||
}'.`
|
||||
)
|
||||
}
|
||||
|
||||
const jobStatus = await pollJobState(
|
||||
requestClient,
|
||||
postedJob,
|
||||
debug,
|
||||
authConfig,
|
||||
pollOptions
|
||||
).catch(async (err) => {
|
||||
const error = err?.response?.data
|
||||
const result = /err=[0-9]*,/.exec(error)
|
||||
|
||||
const errorCode = '5113'
|
||||
if (result?.[0]?.slice(4, -1) === errorCode) {
|
||||
const sessionLogUrl =
|
||||
postedJob.links.find((l: any) => l.rel === 'up')!.href + '/log'
|
||||
const logCount = 1000000
|
||||
err.log = await fetchLogByChunks(
|
||||
requestClient,
|
||||
access_token!,
|
||||
sessionLogUrl,
|
||||
logCount
|
||||
)
|
||||
}
|
||||
throw prefixMessage(err, 'Error while polling job status. ')
|
||||
})
|
||||
|
||||
if (authConfig) {
|
||||
;({ access_token } = await getTokens(requestClient, authConfig))
|
||||
}
|
||||
|
||||
const { result: currentJob } = await requestClient
|
||||
.get<Job>(
|
||||
`/compute/sessions/${executionSessionId}/jobs/${postedJob.id}`,
|
||||
access_token
|
||||
)
|
||||
.catch((err) => {
|
||||
throw prefixMessage(err, 'Error while getting job. ')
|
||||
})
|
||||
|
||||
let jobResult
|
||||
let log = ''
|
||||
|
||||
const logLink = currentJob.links.find((l) => l.rel === 'log')
|
||||
|
||||
if (debug && logLink) {
|
||||
const logUrl = `${logLink.href}/content`
|
||||
const logCount = currentJob.logStatistics?.lineCount ?? 1000000
|
||||
log = await fetchLogByChunks(
|
||||
requestClient,
|
||||
access_token!,
|
||||
logUrl,
|
||||
logCount
|
||||
)
|
||||
}
|
||||
|
||||
if (jobStatus === 'failed' || jobStatus === 'error') {
|
||||
throw new ComputeJobExecutionError(currentJob, log)
|
||||
}
|
||||
|
||||
if (!expectWebout) {
|
||||
return { job: currentJob, log }
|
||||
}
|
||||
|
||||
const resultLink = `/compute/sessions/${executionSessionId}/filerefs/_webout/content`
|
||||
|
||||
jobResult = await requestClient
|
||||
.get<any>(resultLink, access_token, 'text/plain')
|
||||
.catch(async (e) => {
|
||||
if (e instanceof NotFoundError) {
|
||||
if (logLink) {
|
||||
const logUrl = `${logLink.href}/content`
|
||||
const logCount = currentJob.logStatistics?.lineCount ?? 1000000
|
||||
log = await fetchLogByChunks(
|
||||
requestClient,
|
||||
access_token!,
|
||||
logUrl,
|
||||
logCount
|
||||
)
|
||||
|
||||
return Promise.reject({
|
||||
status: 500,
|
||||
log
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return {
|
||||
result: JSON.stringify(e)
|
||||
}
|
||||
})
|
||||
|
||||
await sessionManager
|
||||
.clearSession(executionSessionId, access_token)
|
||||
.catch((err) => {
|
||||
throw prefixMessage(err, 'Error while clearing session. ')
|
||||
})
|
||||
|
||||
return { result: jobResult?.result, log }
|
||||
} catch (e) {
|
||||
if (e && e.status === 404) {
|
||||
return executeScript(
|
||||
requestClient,
|
||||
sessionManager,
|
||||
rootFolderName,
|
||||
jobPath,
|
||||
linesOfCode,
|
||||
contextName,
|
||||
authConfig,
|
||||
data,
|
||||
debug,
|
||||
false,
|
||||
true
|
||||
)
|
||||
} else {
|
||||
throw prefixMessage(e, 'Error while executing script. ')
|
||||
}
|
||||
}
|
||||
}
|
||||
246
src/api/viya/pollJobState.ts
Normal file
246
src/api/viya/pollJobState.ts
Normal file
@@ -0,0 +1,246 @@
|
||||
import { AuthConfig } from '@sasjs/utils/types'
|
||||
import { Job, PollOptions } from '../..'
|
||||
import { getTokens } from '../../auth/getTokens'
|
||||
import { RequestClient } from '../../request/RequestClient'
|
||||
import { JobStatePollError } from '../../types/errors'
|
||||
import { generateTimestamp } from '@sasjs/utils/time'
|
||||
import { saveLog } from './saveLog'
|
||||
import { createWriteStream } from '@sasjs/utils/file'
|
||||
import { WriteStream } from 'fs'
|
||||
import { Link } from '../../types'
|
||||
|
||||
export async function pollJobState(
|
||||
requestClient: RequestClient,
|
||||
postedJob: Job,
|
||||
debug: boolean,
|
||||
authConfig?: AuthConfig,
|
||||
pollOptions?: PollOptions
|
||||
) {
|
||||
const logger = process.logger || console
|
||||
|
||||
let pollInterval = 300
|
||||
let maxPollCount = 1000
|
||||
|
||||
if (pollOptions) {
|
||||
pollInterval = pollOptions.pollInterval || pollInterval
|
||||
maxPollCount = pollOptions.maxPollCount || maxPollCount
|
||||
}
|
||||
|
||||
const stateLink = postedJob.links.find((l: any) => l.rel === 'state')
|
||||
if (!stateLink) {
|
||||
throw new Error(`Job state link was not found.`)
|
||||
}
|
||||
|
||||
let currentState = await getJobState(
|
||||
requestClient,
|
||||
postedJob,
|
||||
'',
|
||||
debug,
|
||||
authConfig
|
||||
).catch((err) => {
|
||||
logger.error(
|
||||
`Error fetching job state from ${stateLink.href}. Starting poll, assuming job to be running.`,
|
||||
err
|
||||
)
|
||||
return 'unavailable'
|
||||
})
|
||||
|
||||
let pollCount = 0
|
||||
|
||||
if (currentState === 'completed') {
|
||||
return Promise.resolve(currentState)
|
||||
}
|
||||
|
||||
let logFileStream
|
||||
if (pollOptions?.streamLog) {
|
||||
const logFileName = `${postedJob.name || 'job'}-${generateTimestamp()}.log`
|
||||
const logFilePath = `${
|
||||
pollOptions?.logFolderPath || process.cwd()
|
||||
}/${logFileName}`
|
||||
|
||||
logFileStream = await createWriteStream(logFilePath)
|
||||
}
|
||||
|
||||
let result = await doPoll(
|
||||
requestClient,
|
||||
postedJob,
|
||||
currentState,
|
||||
debug,
|
||||
pollCount,
|
||||
authConfig,
|
||||
pollOptions,
|
||||
logFileStream
|
||||
)
|
||||
|
||||
currentState = result.state
|
||||
pollCount = result.pollCount
|
||||
|
||||
if (!needsRetry(currentState) || pollCount >= maxPollCount) {
|
||||
return currentState
|
||||
}
|
||||
|
||||
// If we get to this point, this is a long-running job that needs longer polling.
|
||||
// We will resume polling with a bigger interval of 1 minute
|
||||
let longJobPollOptions: PollOptions = {
|
||||
maxPollCount: 24 * 60,
|
||||
pollInterval: 60000,
|
||||
streamLog: false
|
||||
}
|
||||
if (pollOptions) {
|
||||
longJobPollOptions.streamLog = pollOptions.streamLog
|
||||
longJobPollOptions.logFolderPath = pollOptions.logFolderPath
|
||||
}
|
||||
|
||||
result = await doPoll(
|
||||
requestClient,
|
||||
postedJob,
|
||||
currentState,
|
||||
debug,
|
||||
pollCount,
|
||||
authConfig,
|
||||
longJobPollOptions,
|
||||
logFileStream
|
||||
)
|
||||
|
||||
currentState = result.state
|
||||
pollCount = result.pollCount
|
||||
|
||||
if (logFileStream) {
|
||||
logFileStream.end()
|
||||
}
|
||||
|
||||
return currentState
|
||||
}
|
||||
|
||||
const getJobState = async (
|
||||
requestClient: RequestClient,
|
||||
job: Job,
|
||||
currentState: string,
|
||||
debug: boolean,
|
||||
authConfig?: AuthConfig
|
||||
) => {
|
||||
const stateLink = job.links.find((l: any) => l.rel === 'state')
|
||||
if (!stateLink) {
|
||||
throw new Error(`Job state link was not found.`)
|
||||
}
|
||||
|
||||
if (needsRetry(currentState)) {
|
||||
let tokens
|
||||
if (authConfig) {
|
||||
tokens = await getTokens(requestClient, authConfig)
|
||||
}
|
||||
|
||||
const { result: jobState } = await requestClient
|
||||
.get<string>(
|
||||
`${stateLink.href}?_action=wait&wait=300`,
|
||||
tokens?.access_token,
|
||||
'text/plain',
|
||||
{},
|
||||
debug
|
||||
)
|
||||
.catch((err) => {
|
||||
throw new JobStatePollError(job.id, err)
|
||||
})
|
||||
|
||||
return jobState.trim()
|
||||
} else {
|
||||
return currentState
|
||||
}
|
||||
}
|
||||
|
||||
const needsRetry = (state: string) =>
|
||||
state === 'running' ||
|
||||
state === '' ||
|
||||
state === 'pending' ||
|
||||
state === 'unavailable'
|
||||
|
||||
const doPoll = async (
|
||||
requestClient: RequestClient,
|
||||
postedJob: Job,
|
||||
currentState: string,
|
||||
debug: boolean,
|
||||
pollCount: number,
|
||||
authConfig?: AuthConfig,
|
||||
pollOptions?: PollOptions,
|
||||
logStream?: WriteStream
|
||||
): Promise<{ state: string; pollCount: number }> => {
|
||||
let pollInterval = 300
|
||||
let maxPollCount = 1000
|
||||
let maxErrorCount = 5
|
||||
let errorCount = 0
|
||||
let state = currentState
|
||||
let printedState = ''
|
||||
let startLogLine = 0
|
||||
|
||||
const logger = process.logger || console
|
||||
|
||||
if (pollOptions) {
|
||||
pollInterval = pollOptions.pollInterval || pollInterval
|
||||
maxPollCount = pollOptions.maxPollCount || maxPollCount
|
||||
}
|
||||
|
||||
const stateLink = postedJob.links.find((l: Link) => l.rel === 'state')
|
||||
if (!stateLink) {
|
||||
throw new Error(`Job state link was not found.`)
|
||||
}
|
||||
|
||||
while (needsRetry(state) && pollCount <= 100 && pollCount <= maxPollCount) {
|
||||
state = await getJobState(
|
||||
requestClient,
|
||||
postedJob,
|
||||
state,
|
||||
debug,
|
||||
authConfig
|
||||
).catch((err) => {
|
||||
errorCount++
|
||||
if (pollCount >= maxPollCount || errorCount >= maxErrorCount) {
|
||||
throw err
|
||||
}
|
||||
logger.error(
|
||||
`Error fetching job state from ${stateLink.href}. Resuming poll, assuming job to be running.`,
|
||||
err
|
||||
)
|
||||
return 'unavailable'
|
||||
})
|
||||
|
||||
pollCount++
|
||||
|
||||
if (pollOptions?.streamLog) {
|
||||
const jobUrl = postedJob.links.find((l: Link) => l.rel === 'self')
|
||||
const { result: job } = await requestClient.get<Job>(
|
||||
jobUrl!.href,
|
||||
authConfig?.access_token
|
||||
)
|
||||
|
||||
const endLogLine = job.logStatistics?.lineCount ?? 1000000
|
||||
|
||||
await saveLog(
|
||||
postedJob,
|
||||
requestClient,
|
||||
startLogLine,
|
||||
endLogLine,
|
||||
logStream,
|
||||
authConfig?.access_token
|
||||
)
|
||||
|
||||
startLogLine += endLogLine
|
||||
}
|
||||
|
||||
if (debug && printedState !== state) {
|
||||
logger.info('Polling job status...')
|
||||
logger.info(`Current job state: ${state}`)
|
||||
|
||||
printedState = state
|
||||
}
|
||||
|
||||
if (state != 'unavailable' && errorCount > 0) {
|
||||
errorCount = 0
|
||||
}
|
||||
|
||||
await delay(pollInterval)
|
||||
}
|
||||
|
||||
return { state, pollCount }
|
||||
}
|
||||
|
||||
const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms))
|
||||
55
src/api/viya/saveLog.ts
Normal file
55
src/api/viya/saveLog.ts
Normal file
@@ -0,0 +1,55 @@
|
||||
import { Job } from '../..'
|
||||
import { RequestClient } from '../../request/RequestClient'
|
||||
import { fetchLog } from '../../utils'
|
||||
import { WriteStream } from 'fs'
|
||||
import { writeStream } from './writeStream'
|
||||
|
||||
/**
|
||||
* Appends logs to a supplied write stream.
|
||||
* This is useful for getting quick feedback on longer running jobs.
|
||||
* @param job - the job to fetch logs for
|
||||
* @param requestClient - the pre-configured HTTP request client
|
||||
* @param startLine - the line at which to start fetching the log
|
||||
* @param endLine - the line at which to stop fetching the log
|
||||
* @param logFileStream - the write stream to which the log is appended
|
||||
* @accessToken - an optional access token for authentication/authorization
|
||||
* The access token is not required when fetching logs from the browser.
|
||||
*/
|
||||
export async function saveLog(
|
||||
job: Job,
|
||||
requestClient: RequestClient,
|
||||
startLine: number,
|
||||
endLine: number,
|
||||
logFileStream?: WriteStream,
|
||||
accessToken?: string
|
||||
) {
|
||||
if (!accessToken) {
|
||||
throw new Error(
|
||||
`Logs for job ${job.id} cannot be fetched without a valid access token.`
|
||||
)
|
||||
}
|
||||
|
||||
if (!logFileStream) {
|
||||
throw new Error(
|
||||
`Logs for job ${job.id} cannot be written without a valid write stream.`
|
||||
)
|
||||
}
|
||||
|
||||
const logger = process.logger || console
|
||||
const jobLogUrl = job.links.find((l) => l.rel === 'log')
|
||||
|
||||
if (!jobLogUrl) {
|
||||
throw new Error(`Log URL for job ${job.id} was not found.`)
|
||||
}
|
||||
|
||||
const log = await fetchLog(
|
||||
requestClient,
|
||||
accessToken,
|
||||
`${jobLogUrl.href}/content`,
|
||||
startLine,
|
||||
endLine
|
||||
)
|
||||
|
||||
logger.info(`Writing logs to ${logFileStream.path}`)
|
||||
await writeStream(logFileStream, log || '')
|
||||
}
|
||||
675
src/api/viya/spec/executeScript.spec.ts
Normal file
675
src/api/viya/spec/executeScript.spec.ts
Normal file
@@ -0,0 +1,675 @@
|
||||
import { RequestClient } from '../../../request/RequestClient'
|
||||
import { SessionManager } from '../../../SessionManager'
|
||||
import { executeScript } from '../executeScript'
|
||||
import { mockSession, mockAuthConfig, mockJob } from './mockResponses'
|
||||
import * as pollJobStateModule from '../pollJobState'
|
||||
import * as uploadTablesModule from '../uploadTables'
|
||||
import * as getTokensModule from '../../../auth/getTokens'
|
||||
import * as formatDataModule from '../../../utils/formatDataForRequest'
|
||||
import * as fetchLogsModule from '../../../utils/fetchLogByChunks'
|
||||
import { PollOptions } from '../../../types'
|
||||
import { ComputeJobExecutionError, NotFoundError } from '../../../types/errors'
|
||||
import { Logger, LogLevel } from '@sasjs/utils'
|
||||
|
||||
const sessionManager = new (<jest.Mock<SessionManager>>SessionManager)()
|
||||
const requestClient = new (<jest.Mock<RequestClient>>RequestClient)()
|
||||
const defaultPollOptions: PollOptions = {
|
||||
maxPollCount: 100,
|
||||
pollInterval: 500,
|
||||
streamLog: false
|
||||
}
|
||||
|
||||
describe('executeScript', () => {
|
||||
beforeEach(() => {
|
||||
;(process as any).logger = new Logger(LogLevel.Off)
|
||||
setupMocks()
|
||||
})
|
||||
|
||||
it('should not try to get fresh tokens if an authConfig is not provided', async () => {
|
||||
await executeScript(
|
||||
requestClient,
|
||||
sessionManager,
|
||||
'test',
|
||||
'test',
|
||||
['%put hello'],
|
||||
'test context'
|
||||
)
|
||||
|
||||
expect(getTokensModule.getTokens).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('should try to get fresh tokens if an authConfig is provided', async () => {
|
||||
await executeScript(
|
||||
requestClient,
|
||||
sessionManager,
|
||||
'test',
|
||||
'test',
|
||||
['%put hello'],
|
||||
'test context',
|
||||
mockAuthConfig
|
||||
)
|
||||
|
||||
expect(getTokensModule.getTokens).toHaveBeenCalledWith(
|
||||
requestClient,
|
||||
mockAuthConfig
|
||||
)
|
||||
})
|
||||
|
||||
it('should get a session from the session manager before executing', async () => {
|
||||
await executeScript(
|
||||
requestClient,
|
||||
sessionManager,
|
||||
'test',
|
||||
'test',
|
||||
['%put hello'],
|
||||
'test context'
|
||||
)
|
||||
|
||||
expect(sessionManager.getSession).toHaveBeenCalledWith(undefined)
|
||||
})
|
||||
|
||||
it('should handle errors while getting a session', async () => {
|
||||
jest
|
||||
.spyOn(sessionManager, 'getSession')
|
||||
.mockImplementation(() => Promise.reject('Test Error'))
|
||||
|
||||
const error = await executeScript(
|
||||
requestClient,
|
||||
sessionManager,
|
||||
'test',
|
||||
'test',
|
||||
['%put hello'],
|
||||
'test context'
|
||||
).catch((e) => e)
|
||||
|
||||
expect(error).toContain('Error while getting session.')
|
||||
})
|
||||
|
||||
it('should fetch the PID when printPid is true', async () => {
|
||||
await executeScript(
|
||||
requestClient,
|
||||
sessionManager,
|
||||
'test',
|
||||
'test',
|
||||
['%put hello'],
|
||||
'test context',
|
||||
mockAuthConfig,
|
||||
null,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
defaultPollOptions,
|
||||
true
|
||||
)
|
||||
|
||||
expect(sessionManager.getVariable).toHaveBeenCalledWith(
|
||||
mockSession.id,
|
||||
'SYSJOBID',
|
||||
mockAuthConfig.access_token
|
||||
)
|
||||
})
|
||||
|
||||
it('should handle errors while getting the job PID', async () => {
|
||||
jest
|
||||
.spyOn(sessionManager, 'getVariable')
|
||||
.mockImplementation(() => Promise.reject('Test Error'))
|
||||
|
||||
const error = await executeScript(
|
||||
requestClient,
|
||||
sessionManager,
|
||||
'test',
|
||||
'test',
|
||||
['%put hello'],
|
||||
'test context',
|
||||
mockAuthConfig,
|
||||
null,
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
defaultPollOptions,
|
||||
true
|
||||
).catch((e) => e)
|
||||
|
||||
expect(error).toContain('Error while getting session variable.')
|
||||
})
|
||||
|
||||
it('should use the file upload approach when data contains semicolons', async () => {
|
||||
jest
|
||||
.spyOn(uploadTablesModule, 'uploadTables')
|
||||
.mockImplementation(() =>
|
||||
Promise.resolve([{ tableName: 'test', file: { id: 1 } }])
|
||||
)
|
||||
|
||||
await executeScript(
|
||||
requestClient,
|
||||
sessionManager,
|
||||
'test',
|
||||
'test',
|
||||
['%put hello'],
|
||||
'test context',
|
||||
mockAuthConfig,
|
||||
{ foo: 'bar;' },
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
defaultPollOptions,
|
||||
true
|
||||
)
|
||||
|
||||
expect(uploadTablesModule.uploadTables).toHaveBeenCalledWith(
|
||||
requestClient,
|
||||
{ foo: 'bar;' },
|
||||
mockAuthConfig.access_token
|
||||
)
|
||||
})
|
||||
|
||||
it('should format data as CSV when it does not contain semicolons', async () => {
|
||||
await executeScript(
|
||||
requestClient,
|
||||
sessionManager,
|
||||
'test',
|
||||
'test',
|
||||
['%put hello'],
|
||||
'test context',
|
||||
mockAuthConfig,
|
||||
{ foo: 'bar' },
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
defaultPollOptions,
|
||||
true
|
||||
)
|
||||
|
||||
expect(formatDataModule.formatDataForRequest).toHaveBeenCalledWith({
|
||||
foo: 'bar'
|
||||
})
|
||||
})
|
||||
|
||||
it('should submit a job for execution via the compute API', async () => {
|
||||
jest
|
||||
.spyOn(formatDataModule, 'formatDataForRequest')
|
||||
.mockImplementation(() => ({ sasjs_tables: 'foo', sasjs0data: 'bar' }))
|
||||
|
||||
await executeScript(
|
||||
requestClient,
|
||||
sessionManager,
|
||||
'test',
|
||||
'test',
|
||||
['%put "hello";'],
|
||||
'test context',
|
||||
mockAuthConfig,
|
||||
{ foo: 'bar' },
|
||||
false,
|
||||
false,
|
||||
false,
|
||||
defaultPollOptions,
|
||||
true
|
||||
)
|
||||
|
||||
expect(requestClient.post).toHaveBeenCalledWith(
|
||||
`/compute/sessions/${mockSession.id}/jobs`,
|
||||
{
|
||||
name: 'exec-test',
|
||||
description: 'Powered by SASjs',
|
||||
code: ['%put "hello";'],
|
||||
variables: {
|
||||
SYS_JES_JOB_URI: '',
|
||||
_program: 'test/test',
|
||||
sasjs_tables: 'foo',
|
||||
sasjs0data: 'bar'
|
||||
},
|
||||
arguments: {
|
||||
_contextName: 'test context',
|
||||
_OMITJSONLISTING: true,
|
||||
_OMITJSONLOG: true,
|
||||
_OMITSESSIONRESULTS: true,
|
||||
_OMITTEXTLISTING: true,
|
||||
_OMITTEXTLOG: true
|
||||
}
|
||||
},
|
||||
mockAuthConfig.access_token
|
||||
)
|
||||
})
|
||||
|
||||
it('should set the correct variables when debug is true', async () => {
|
||||
jest
|
||||
.spyOn(formatDataModule, 'formatDataForRequest')
|
||||
.mockImplementation(() => ({ sasjs_tables: 'foo', sasjs0data: 'bar' }))
|
||||
|
||||
await executeScript(
|
||||
requestClient,
|
||||
sessionManager,
|
||||
'test',
|
||||
'test',
|
||||
['%put "hello";'],
|
||||
'test context',
|
||||
mockAuthConfig,
|
||||
{ foo: 'bar' },
|
||||
true,
|
||||
false,
|
||||
false,
|
||||
defaultPollOptions,
|
||||
true
|
||||
)
|
||||
|
||||
expect(requestClient.post).toHaveBeenCalledWith(
|
||||
`/compute/sessions/${mockSession.id}/jobs`,
|
||||
{
|
||||
name: 'exec-test',
|
||||
description: 'Powered by SASjs',
|
||||
code: ['%put "hello";'],
|
||||
variables: {
|
||||
SYS_JES_JOB_URI: '',
|
||||
_program: 'test/test',
|
||||
sasjs_tables: 'foo',
|
||||
sasjs0data: 'bar',
|
||||
_DEBUG: 131
|
||||
},
|
||||
arguments: {
|
||||
_contextName: 'test context',
|
||||
_OMITJSONLISTING: true,
|
||||
_OMITJSONLOG: true,
|
||||
_OMITSESSIONRESULTS: false,
|
||||
_OMITTEXTLISTING: true,
|
||||
_OMITTEXTLOG: false
|
||||
}
|
||||
},
|
||||
mockAuthConfig.access_token
|
||||
)
|
||||
})
|
||||
|
||||
it('should handle errors during job submission', async () => {
|
||||
jest
|
||||
.spyOn(requestClient, 'post')
|
||||
.mockImplementation(() => Promise.reject('Test Error'))
|
||||
|
||||
const error = await executeScript(
|
||||
requestClient,
|
||||
sessionManager,
|
||||
'test',
|
||||
'test',
|
||||
['%put "hello";'],
|
||||
'test context',
|
||||
mockAuthConfig,
|
||||
{ foo: 'bar' },
|
||||
true,
|
||||
false,
|
||||
false,
|
||||
defaultPollOptions,
|
||||
true
|
||||
).catch((e) => e)
|
||||
|
||||
expect(error).toContain('Error while posting job')
|
||||
})
|
||||
|
||||
it('should immediately return the session when waitForResult is false', async () => {
|
||||
const result = await executeScript(
|
||||
requestClient,
|
||||
sessionManager,
|
||||
'test',
|
||||
'test',
|
||||
['%put "hello";'],
|
||||
'test context',
|
||||
mockAuthConfig,
|
||||
{ foo: 'bar' },
|
||||
true,
|
||||
false,
|
||||
false,
|
||||
defaultPollOptions,
|
||||
true
|
||||
)
|
||||
|
||||
expect(result).toEqual(mockSession)
|
||||
})
|
||||
|
||||
it('should poll for job completion when waitForResult is true', async () => {
|
||||
await executeScript(
|
||||
requestClient,
|
||||
sessionManager,
|
||||
'test',
|
||||
'test',
|
||||
['%put "hello";'],
|
||||
'test context',
|
||||
mockAuthConfig,
|
||||
{ foo: 'bar' },
|
||||
false,
|
||||
false,
|
||||
true,
|
||||
defaultPollOptions,
|
||||
true
|
||||
)
|
||||
|
||||
expect(pollJobStateModule.pollJobState).toHaveBeenCalledWith(
|
||||
requestClient,
|
||||
mockJob,
|
||||
false,
|
||||
mockAuthConfig,
|
||||
defaultPollOptions
|
||||
)
|
||||
})
|
||||
|
||||
it('should handle general errors when polling for job status', async () => {
|
||||
jest
|
||||
.spyOn(pollJobStateModule, 'pollJobState')
|
||||
.mockImplementation(() => Promise.reject('Poll Error'))
|
||||
|
||||
const error = await executeScript(
|
||||
requestClient,
|
||||
sessionManager,
|
||||
'test',
|
||||
'test',
|
||||
['%put "hello";'],
|
||||
'test context',
|
||||
mockAuthConfig,
|
||||
{ foo: 'bar' },
|
||||
false,
|
||||
false,
|
||||
true,
|
||||
defaultPollOptions,
|
||||
true
|
||||
).catch((e) => e)
|
||||
|
||||
expect(error).toContain('Error while polling job status.')
|
||||
})
|
||||
|
||||
it('should fetch the log and append it to the error in case of a 5113 error code', async () => {
|
||||
jest
|
||||
.spyOn(pollJobStateModule, 'pollJobState')
|
||||
.mockImplementation(() =>
|
||||
Promise.reject({ response: { data: 'err=5113,' } })
|
||||
)
|
||||
|
||||
const error = await executeScript(
|
||||
requestClient,
|
||||
sessionManager,
|
||||
'test',
|
||||
'test',
|
||||
['%put "hello";'],
|
||||
'test context',
|
||||
mockAuthConfig,
|
||||
{ foo: 'bar' },
|
||||
false,
|
||||
false,
|
||||
true,
|
||||
defaultPollOptions,
|
||||
true
|
||||
).catch((e) => e)
|
||||
|
||||
expect(fetchLogsModule.fetchLogByChunks).toHaveBeenCalledWith(
|
||||
requestClient,
|
||||
mockAuthConfig.access_token,
|
||||
mockJob.links.find((l) => l.rel === 'up')!.href + '/log',
|
||||
1000000
|
||||
)
|
||||
expect(error.log).toEqual('Test Log')
|
||||
})
|
||||
|
||||
it('should fetch the logs for the job if debug is true and a log URL is available', async () => {
|
||||
await executeScript(
|
||||
requestClient,
|
||||
sessionManager,
|
||||
'test',
|
||||
'test',
|
||||
['%put "hello";'],
|
||||
'test context',
|
||||
mockAuthConfig,
|
||||
{ foo: 'bar' },
|
||||
true,
|
||||
false,
|
||||
true,
|
||||
defaultPollOptions,
|
||||
true
|
||||
)
|
||||
|
||||
expect(fetchLogsModule.fetchLogByChunks).toHaveBeenCalledWith(
|
||||
requestClient,
|
||||
mockAuthConfig.access_token,
|
||||
mockJob.links.find((l) => l.rel === 'log')!.href + '/content',
|
||||
mockJob.logStatistics.lineCount
|
||||
)
|
||||
})
|
||||
|
||||
it('should not fetch the logs for the job if debug is false', async () => {
|
||||
await executeScript(
|
||||
requestClient,
|
||||
sessionManager,
|
||||
'test',
|
||||
'test',
|
||||
['%put "hello";'],
|
||||
'test context',
|
||||
mockAuthConfig,
|
||||
{ foo: 'bar' },
|
||||
false,
|
||||
false,
|
||||
true,
|
||||
defaultPollOptions,
|
||||
true
|
||||
)
|
||||
|
||||
expect(fetchLogsModule.fetchLogByChunks).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('should throw a ComputeJobExecutionError if the job has failed', async () => {
|
||||
jest
|
||||
.spyOn(pollJobStateModule, 'pollJobState')
|
||||
.mockImplementation(() => Promise.resolve('failed'))
|
||||
|
||||
const error: ComputeJobExecutionError = await executeScript(
|
||||
requestClient,
|
||||
sessionManager,
|
||||
'test',
|
||||
'test',
|
||||
['%put "hello";'],
|
||||
'test context',
|
||||
mockAuthConfig,
|
||||
{ foo: 'bar' },
|
||||
true,
|
||||
false,
|
||||
true,
|
||||
defaultPollOptions,
|
||||
true
|
||||
).catch((e) => e)
|
||||
|
||||
expect(fetchLogsModule.fetchLogByChunks).toHaveBeenCalledWith(
|
||||
requestClient,
|
||||
mockAuthConfig.access_token,
|
||||
mockJob.links.find((l) => l.rel === 'log')!.href + '/content',
|
||||
mockJob.logStatistics.lineCount
|
||||
)
|
||||
|
||||
expect(error).toBeInstanceOf(ComputeJobExecutionError)
|
||||
expect(error.log).toEqual('Test Log')
|
||||
expect(error.job).toEqual(mockJob)
|
||||
})
|
||||
|
||||
it('should throw a ComputeJobExecutionError if the job has errored out', async () => {
|
||||
jest
|
||||
.spyOn(pollJobStateModule, 'pollJobState')
|
||||
.mockImplementation(() => Promise.resolve('error'))
|
||||
|
||||
const error: ComputeJobExecutionError = await executeScript(
|
||||
requestClient,
|
||||
sessionManager,
|
||||
'test',
|
||||
'test',
|
||||
['%put "hello";'],
|
||||
'test context',
|
||||
mockAuthConfig,
|
||||
{ foo: 'bar' },
|
||||
true,
|
||||
false,
|
||||
true,
|
||||
defaultPollOptions,
|
||||
true
|
||||
).catch((e) => e)
|
||||
|
||||
expect(fetchLogsModule.fetchLogByChunks).toHaveBeenCalledWith(
|
||||
requestClient,
|
||||
mockAuthConfig.access_token,
|
||||
mockJob.links.find((l) => l.rel === 'log')!.href + '/content',
|
||||
mockJob.logStatistics.lineCount
|
||||
)
|
||||
|
||||
expect(error).toBeInstanceOf(ComputeJobExecutionError)
|
||||
expect(error.log).toEqual('Test Log')
|
||||
expect(error.job).toEqual(mockJob)
|
||||
})
|
||||
|
||||
it('should fetch the result if expectWebout is true', async () => {
|
||||
await executeScript(
|
||||
requestClient,
|
||||
sessionManager,
|
||||
'test',
|
||||
'test',
|
||||
['%put "hello";'],
|
||||
'test context',
|
||||
mockAuthConfig,
|
||||
{ foo: 'bar' },
|
||||
false,
|
||||
true,
|
||||
true,
|
||||
defaultPollOptions,
|
||||
true
|
||||
)
|
||||
|
||||
expect(requestClient.get).toHaveBeenCalledWith(
|
||||
`/compute/sessions/${mockSession.id}/filerefs/_webout/content`,
|
||||
mockAuthConfig.access_token,
|
||||
'text/plain'
|
||||
)
|
||||
})
|
||||
|
||||
it('should fetch the logs if the webout file was not found', async () => {
|
||||
jest.spyOn(requestClient, 'get').mockImplementation((url, ...rest) => {
|
||||
if (url.includes('_webout')) {
|
||||
return Promise.reject(new NotFoundError(url))
|
||||
}
|
||||
return Promise.resolve({ result: mockJob, etag: '', status: 200 })
|
||||
})
|
||||
|
||||
const error = await executeScript(
|
||||
requestClient,
|
||||
sessionManager,
|
||||
'test',
|
||||
'test',
|
||||
['%put "hello";'],
|
||||
'test context',
|
||||
mockAuthConfig,
|
||||
{ foo: 'bar' },
|
||||
false,
|
||||
true,
|
||||
true,
|
||||
defaultPollOptions,
|
||||
true
|
||||
).catch((e) => e)
|
||||
|
||||
expect(requestClient.get).toHaveBeenCalledWith(
|
||||
`/compute/sessions/${mockSession.id}/filerefs/_webout/content`,
|
||||
mockAuthConfig.access_token,
|
||||
'text/plain'
|
||||
)
|
||||
|
||||
expect(fetchLogsModule.fetchLogByChunks).toHaveBeenCalledWith(
|
||||
requestClient,
|
||||
mockAuthConfig.access_token,
|
||||
mockJob.links.find((l) => l.rel === 'log')!.href + '/content',
|
||||
mockJob.logStatistics.lineCount
|
||||
)
|
||||
|
||||
expect(error.status).toEqual(500)
|
||||
expect(error.log).toEqual('Test Log')
|
||||
})
|
||||
|
||||
it('should clear the session after execution is complete', async () => {
|
||||
await executeScript(
|
||||
requestClient,
|
||||
sessionManager,
|
||||
'test',
|
||||
'test',
|
||||
['%put "hello";'],
|
||||
'test context',
|
||||
mockAuthConfig,
|
||||
{ foo: 'bar' },
|
||||
false,
|
||||
true,
|
||||
true,
|
||||
defaultPollOptions,
|
||||
true
|
||||
)
|
||||
|
||||
expect(sessionManager.clearSession).toHaveBeenCalledWith(
|
||||
mockSession.id,
|
||||
mockAuthConfig.access_token
|
||||
)
|
||||
})
|
||||
|
||||
it('should handle errors while clearing a session', async () => {
|
||||
jest
|
||||
.spyOn(sessionManager, 'clearSession')
|
||||
.mockImplementation(() => Promise.reject('Clear Session Error'))
|
||||
|
||||
const error = await executeScript(
|
||||
requestClient,
|
||||
sessionManager,
|
||||
'test',
|
||||
'test',
|
||||
['%put "hello";'],
|
||||
'test context',
|
||||
mockAuthConfig,
|
||||
{ foo: 'bar' },
|
||||
false,
|
||||
true,
|
||||
true,
|
||||
defaultPollOptions,
|
||||
true
|
||||
).catch((e) => e)
|
||||
|
||||
expect(error).toContain('Error while clearing session.')
|
||||
})
|
||||
})
|
||||
|
||||
const setupMocks = () => {
|
||||
jest.restoreAllMocks()
|
||||
jest.mock('../../../request/RequestClient')
|
||||
jest.mock('../../../SessionManager')
|
||||
jest.mock('../../../auth/getTokens')
|
||||
jest.mock('../pollJobState')
|
||||
jest.mock('../uploadTables')
|
||||
jest.mock('../../../utils/formatDataForRequest')
|
||||
jest.mock('../../../utils/fetchLogByChunks')
|
||||
|
||||
jest
|
||||
.spyOn(requestClient, 'post')
|
||||
.mockImplementation(() => Promise.resolve({ result: mockJob, etag: '' }))
|
||||
jest
|
||||
.spyOn(requestClient, 'get')
|
||||
.mockImplementation(() =>
|
||||
Promise.resolve({ result: mockJob, etag: '', status: 200 })
|
||||
)
|
||||
jest
|
||||
.spyOn(requestClient, 'delete')
|
||||
.mockImplementation(() => Promise.resolve({ result: {}, etag: '' }))
|
||||
jest
|
||||
.spyOn(getTokensModule, 'getTokens')
|
||||
.mockImplementation(() => Promise.resolve(mockAuthConfig))
|
||||
jest
|
||||
.spyOn(pollJobStateModule, 'pollJobState')
|
||||
.mockImplementation(() => Promise.resolve('completed'))
|
||||
jest
|
||||
.spyOn(sessionManager, 'getVariable')
|
||||
.mockImplementation(() =>
|
||||
Promise.resolve({ result: { value: 'test' }, etag: 'test', status: 200 })
|
||||
)
|
||||
jest
|
||||
.spyOn(sessionManager, 'getSession')
|
||||
.mockImplementation(() => Promise.resolve(mockSession))
|
||||
jest
|
||||
.spyOn(sessionManager, 'clearSession')
|
||||
.mockImplementation(() => Promise.resolve())
|
||||
jest
|
||||
.spyOn(formatDataModule, 'formatDataForRequest')
|
||||
.mockImplementation(() => ({ sasjs_tables: 'test', sasjs0data: 'test' }))
|
||||
jest
|
||||
.spyOn(fetchLogsModule, 'fetchLogByChunks')
|
||||
.mockImplementation(() => Promise.resolve('Test Log'))
|
||||
}
|
||||
73
src/api/viya/spec/mockResponses.ts
Normal file
73
src/api/viya/spec/mockResponses.ts
Normal file
@@ -0,0 +1,73 @@
|
||||
import { AuthConfig } from '@sasjs/utils/types'
|
||||
import { Job, Session } from '../../../types'
|
||||
|
||||
export const mockSession: Session = {
|
||||
id: 's35510n',
|
||||
state: 'idle',
|
||||
links: [],
|
||||
attributes: {
|
||||
sessionInactiveTimeout: 1
|
||||
},
|
||||
creationTimeStamp: new Date().valueOf().toString()
|
||||
}
|
||||
|
||||
export const mockJob: Job = {
|
||||
id: 'j0b',
|
||||
name: 'test job',
|
||||
uri: '/j0b',
|
||||
createdBy: 'test user',
|
||||
results: {
|
||||
'_webout.json': 'test'
|
||||
},
|
||||
logStatistics: {
|
||||
lineCount: 100,
|
||||
modifiedTimeStamp: new Date().valueOf().toString()
|
||||
},
|
||||
links: [
|
||||
{
|
||||
rel: 'log',
|
||||
href: '/log',
|
||||
method: 'GET',
|
||||
type: 'log',
|
||||
uri: 'log'
|
||||
},
|
||||
{
|
||||
rel: 'self',
|
||||
href: '/job',
|
||||
method: 'GET',
|
||||
type: 'job',
|
||||
uri: 'job'
|
||||
},
|
||||
{
|
||||
rel: 'state',
|
||||
href: '/state',
|
||||
method: 'GET',
|
||||
type: 'state',
|
||||
uri: 'state'
|
||||
},
|
||||
{
|
||||
rel: 'up',
|
||||
href: '/job',
|
||||
method: 'GET',
|
||||
type: 'up',
|
||||
uri: 'job'
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
export const mockAuthConfig: AuthConfig = {
|
||||
client: 'cl13nt',
|
||||
secret: '53cr3t',
|
||||
access_token: 'acc355',
|
||||
refresh_token: 'r3fr35h'
|
||||
}
|
||||
|
||||
export class MockStream {
|
||||
_write(chunk: string, _: any, next: Function) {
|
||||
next()
|
||||
}
|
||||
|
||||
reset() {}
|
||||
|
||||
destroy() {}
|
||||
}
|
||||
313
src/api/viya/spec/pollJobState.spec.ts
Normal file
313
src/api/viya/spec/pollJobState.spec.ts
Normal file
@@ -0,0 +1,313 @@
|
||||
import { Logger, LogLevel } from '@sasjs/utils'
|
||||
import * as fileModule from '@sasjs/utils/file'
|
||||
import { RequestClient } from '../../../request/RequestClient'
|
||||
import { mockAuthConfig, mockJob } from './mockResponses'
|
||||
import { pollJobState } from '../pollJobState'
|
||||
import * as getTokensModule from '../../../auth/getTokens'
|
||||
import * as saveLogModule from '../saveLog'
|
||||
import { PollOptions } from '../../../types'
|
||||
import { WriteStream } from 'fs'
|
||||
|
||||
const requestClient = new (<jest.Mock<RequestClient>>RequestClient)()
|
||||
const defaultPollOptions: PollOptions = {
|
||||
maxPollCount: 100,
|
||||
pollInterval: 500,
|
||||
streamLog: false
|
||||
}
|
||||
|
||||
describe('pollJobState', () => {
|
||||
beforeEach(() => {
|
||||
;(process as any).logger = new Logger(LogLevel.Off)
|
||||
setupMocks()
|
||||
})
|
||||
|
||||
it('should get valid tokens if the authConfig has been provided', async () => {
|
||||
await pollJobState(
|
||||
requestClient,
|
||||
mockJob,
|
||||
false,
|
||||
mockAuthConfig,
|
||||
defaultPollOptions
|
||||
)
|
||||
|
||||
expect(getTokensModule.getTokens).toHaveBeenCalledWith(
|
||||
requestClient,
|
||||
mockAuthConfig
|
||||
)
|
||||
})
|
||||
|
||||
it('should not attempt to get tokens if the authConfig has not been provided', async () => {
|
||||
await pollJobState(
|
||||
requestClient,
|
||||
mockJob,
|
||||
false,
|
||||
undefined,
|
||||
defaultPollOptions
|
||||
)
|
||||
|
||||
expect(getTokensModule.getTokens).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('should throw an error if the job does not have a state link', async () => {
|
||||
const error = await pollJobState(
|
||||
requestClient,
|
||||
{ ...mockJob, links: mockJob.links.filter((l) => l.rel !== 'state') },
|
||||
false,
|
||||
undefined,
|
||||
defaultPollOptions
|
||||
).catch((e) => e)
|
||||
|
||||
expect((error as Error).message).toContain('Job state link was not found.')
|
||||
})
|
||||
|
||||
it('should attempt to refresh tokens before each poll', async () => {
|
||||
mockSimplePoll()
|
||||
|
||||
await pollJobState(
|
||||
requestClient,
|
||||
mockJob,
|
||||
false,
|
||||
mockAuthConfig,
|
||||
defaultPollOptions
|
||||
)
|
||||
|
||||
expect(getTokensModule.getTokens).toHaveBeenCalledTimes(3)
|
||||
})
|
||||
|
||||
it('should attempt to fetch and save the log after each poll when streamLog is true', async () => {
|
||||
mockSimplePoll()
|
||||
|
||||
await pollJobState(requestClient, mockJob, false, mockAuthConfig, {
|
||||
...defaultPollOptions,
|
||||
streamLog: true
|
||||
})
|
||||
|
||||
expect(saveLogModule.saveLog).toHaveBeenCalledTimes(2)
|
||||
})
|
||||
|
||||
it('should not attempt to fetch and save the log after each poll when streamLog is false', async () => {
|
||||
mockSimplePoll()
|
||||
|
||||
await pollJobState(
|
||||
requestClient,
|
||||
mockJob,
|
||||
false,
|
||||
mockAuthConfig,
|
||||
defaultPollOptions
|
||||
)
|
||||
|
||||
expect(saveLogModule.saveLog).not.toHaveBeenCalled()
|
||||
})
|
||||
|
||||
it('should return the current status when the max poll count is reached', async () => {
|
||||
mockRunningPoll()
|
||||
|
||||
const state = await pollJobState(
|
||||
requestClient,
|
||||
mockJob,
|
||||
false,
|
||||
mockAuthConfig,
|
||||
{
|
||||
...defaultPollOptions,
|
||||
maxPollCount: 1
|
||||
}
|
||||
)
|
||||
|
||||
expect(state).toEqual('running')
|
||||
})
|
||||
|
||||
it('should poll with a larger interval for longer running jobs', async () => {
|
||||
mockLongPoll()
|
||||
|
||||
const state = await pollJobState(
|
||||
requestClient,
|
||||
mockJob,
|
||||
false,
|
||||
mockAuthConfig,
|
||||
{
|
||||
...defaultPollOptions,
|
||||
maxPollCount: 200,
|
||||
pollInterval: 10
|
||||
}
|
||||
)
|
||||
|
||||
expect(state).toEqual('completed')
|
||||
}, 200000)
|
||||
|
||||
it('should continue polling until the job completes or errors', async () => {
|
||||
mockSimplePoll(1)
|
||||
|
||||
const state = await pollJobState(
|
||||
requestClient,
|
||||
mockJob,
|
||||
false,
|
||||
undefined,
|
||||
defaultPollOptions
|
||||
)
|
||||
|
||||
expect(requestClient.get).toHaveBeenCalledTimes(2)
|
||||
expect(state).toEqual('completed')
|
||||
})
|
||||
|
||||
it('should print the state to the console when debug is on', async () => {
|
||||
jest.spyOn((process as any).logger, 'info')
|
||||
mockSimplePoll()
|
||||
|
||||
await pollJobState(
|
||||
requestClient,
|
||||
mockJob,
|
||||
true,
|
||||
undefined,
|
||||
defaultPollOptions
|
||||
)
|
||||
|
||||
expect((process as any).logger.info).toHaveBeenCalledTimes(4)
|
||||
expect((process as any).logger.info).toHaveBeenNthCalledWith(
|
||||
1,
|
||||
'Polling job status...'
|
||||
)
|
||||
expect((process as any).logger.info).toHaveBeenNthCalledWith(
|
||||
2,
|
||||
'Current job state: running'
|
||||
)
|
||||
expect((process as any).logger.info).toHaveBeenNthCalledWith(
|
||||
3,
|
||||
'Polling job status...'
|
||||
)
|
||||
expect((process as any).logger.info).toHaveBeenNthCalledWith(
|
||||
4,
|
||||
'Current job state: completed'
|
||||
)
|
||||
})
|
||||
|
||||
it('should continue polling when there is a single error in between', async () => {
|
||||
mockPollWithSingleError()
|
||||
|
||||
const state = await pollJobState(
|
||||
requestClient,
|
||||
mockJob,
|
||||
false,
|
||||
undefined,
|
||||
defaultPollOptions
|
||||
)
|
||||
|
||||
expect(requestClient.get).toHaveBeenCalledTimes(2)
|
||||
expect(state).toEqual('completed')
|
||||
})
|
||||
|
||||
it('should throw an error when the error count exceeds the set value of 5', async () => {
|
||||
mockErroredPoll()
|
||||
|
||||
const error = await pollJobState(
|
||||
requestClient,
|
||||
mockJob,
|
||||
false,
|
||||
undefined,
|
||||
defaultPollOptions
|
||||
).catch((e) => e)
|
||||
|
||||
expect(error.message).toEqual(
|
||||
'Error while polling job state for job j0b: Status Error'
|
||||
)
|
||||
})
|
||||
})
|
||||
|
||||
const setupMocks = () => {
|
||||
jest.restoreAllMocks()
|
||||
jest.mock('../../../request/RequestClient')
|
||||
jest.mock('../../../auth/getTokens')
|
||||
jest.mock('../saveLog')
|
||||
jest.mock('@sasjs/utils/file')
|
||||
|
||||
jest
|
||||
.spyOn(requestClient, 'get')
|
||||
.mockImplementation(() =>
|
||||
Promise.resolve({ result: 'completed', etag: '', status: 200 })
|
||||
)
|
||||
jest
|
||||
.spyOn(getTokensModule, 'getTokens')
|
||||
.mockImplementation(() => Promise.resolve(mockAuthConfig))
|
||||
jest
|
||||
.spyOn(saveLogModule, 'saveLog')
|
||||
.mockImplementation(() => Promise.resolve())
|
||||
jest
|
||||
.spyOn(fileModule, 'createWriteStream')
|
||||
.mockImplementation(() => Promise.resolve({} as unknown as WriteStream))
|
||||
}
|
||||
|
||||
const mockSimplePoll = (runningCount = 2) => {
|
||||
let count = 0
|
||||
jest.spyOn(requestClient, 'get').mockImplementation((url) => {
|
||||
count++
|
||||
if (url.includes('job')) {
|
||||
return Promise.resolve({ result: mockJob, etag: '', status: 200 })
|
||||
}
|
||||
return Promise.resolve({
|
||||
result:
|
||||
count === 0
|
||||
? 'pending'
|
||||
: count <= runningCount
|
||||
? 'running'
|
||||
: 'completed',
|
||||
etag: '',
|
||||
status: 200
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
const mockRunningPoll = () => {
|
||||
let count = 0
|
||||
jest.spyOn(requestClient, 'get').mockImplementation((url) => {
|
||||
count++
|
||||
if (url.includes('job')) {
|
||||
return Promise.resolve({ result: mockJob, etag: '', status: 200 })
|
||||
}
|
||||
return Promise.resolve({
|
||||
result: count === 0 ? 'pending' : 'running',
|
||||
etag: '',
|
||||
status: 200
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
const mockLongPoll = () => {
|
||||
let count = 0
|
||||
jest.spyOn(requestClient, 'get').mockImplementation((url) => {
|
||||
count++
|
||||
if (url.includes('job')) {
|
||||
return Promise.resolve({ result: mockJob, etag: '', status: 200 })
|
||||
}
|
||||
return Promise.resolve({
|
||||
result: count <= 101 ? 'running' : 'completed',
|
||||
etag: '',
|
||||
status: 200
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
const mockPollWithSingleError = () => {
|
||||
let count = 0
|
||||
jest.spyOn(requestClient, 'get').mockImplementation((url) => {
|
||||
count++
|
||||
if (url.includes('job')) {
|
||||
return Promise.resolve({ result: mockJob, etag: '', status: 200 })
|
||||
}
|
||||
if (count === 1) {
|
||||
return Promise.reject('Status Error')
|
||||
}
|
||||
return Promise.resolve({
|
||||
result: count === 0 ? 'pending' : 'completed',
|
||||
etag: '',
|
||||
status: 200
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
const mockErroredPoll = () => {
|
||||
jest.spyOn(requestClient, 'get').mockImplementation((url) => {
|
||||
if (url.includes('job')) {
|
||||
return Promise.resolve({ result: mockJob, etag: '', status: 200 })
|
||||
}
|
||||
return Promise.reject('Status Error')
|
||||
})
|
||||
}
|
||||
73
src/api/viya/spec/saveLog.spec.ts
Normal file
73
src/api/viya/spec/saveLog.spec.ts
Normal file
@@ -0,0 +1,73 @@
|
||||
import { Logger, LogLevel } from '@sasjs/utils'
|
||||
import { RequestClient } from '../../../request/RequestClient'
|
||||
import * as fetchLogsModule from '../../../utils/fetchLogByChunks'
|
||||
import * as writeStreamModule from '../writeStream'
|
||||
import { saveLog } from '../saveLog'
|
||||
import { mockJob } from './mockResponses'
|
||||
import { WriteStream } from 'fs'
|
||||
|
||||
const requestClient = new (<jest.Mock<RequestClient>>RequestClient)()
|
||||
const stream = {} as unknown as WriteStream
|
||||
|
||||
describe('saveLog', () => {
|
||||
beforeEach(() => {
|
||||
;(process as any).logger = new Logger(LogLevel.Off)
|
||||
setupMocks()
|
||||
})
|
||||
|
||||
it('should throw an error when a valid access token is not provided', async () => {
|
||||
const error = await saveLog(mockJob, requestClient, 0, 100, stream).catch(
|
||||
(e) => e
|
||||
)
|
||||
|
||||
expect(error.message).toContain(
|
||||
`Logs for job ${mockJob.id} cannot be fetched without a valid access token.`
|
||||
)
|
||||
})
|
||||
|
||||
it('should throw an error when the log URL is not available', async () => {
|
||||
const error = await saveLog(
|
||||
{ ...mockJob, links: mockJob.links.filter((l) => l.rel !== 'log') },
|
||||
requestClient,
|
||||
0,
|
||||
100,
|
||||
stream,
|
||||
't0k3n'
|
||||
).catch((e) => e)
|
||||
|
||||
expect(error.message).toContain(
|
||||
`Log URL for job ${mockJob.id} was not found.`
|
||||
)
|
||||
})
|
||||
|
||||
it('should fetch and save logs to the given path', async () => {
|
||||
await saveLog(mockJob, requestClient, 0, 100, stream, 't0k3n')
|
||||
|
||||
expect(fetchLogsModule.fetchLog).toHaveBeenCalledWith(
|
||||
requestClient,
|
||||
't0k3n',
|
||||
'/log/content',
|
||||
0,
|
||||
100
|
||||
)
|
||||
expect(writeStreamModule.writeStream).toHaveBeenCalledWith(
|
||||
stream,
|
||||
'Test Log'
|
||||
)
|
||||
})
|
||||
})
|
||||
|
||||
const setupMocks = () => {
|
||||
jest.restoreAllMocks()
|
||||
jest.mock('../../../request/RequestClient')
|
||||
jest.mock('../../../utils/fetchLogByChunks')
|
||||
jest.mock('@sasjs/utils')
|
||||
jest.mock('../writeStream')
|
||||
|
||||
jest
|
||||
.spyOn(fetchLogsModule, 'fetchLog')
|
||||
.mockImplementation(() => Promise.resolve('Test Log'))
|
||||
jest
|
||||
.spyOn(writeStreamModule, 'writeStream')
|
||||
.mockImplementation(() => Promise.resolve())
|
||||
}
|
||||
67
src/api/viya/spec/uploadTables.spec.ts
Normal file
67
src/api/viya/spec/uploadTables.spec.ts
Normal file
@@ -0,0 +1,67 @@
|
||||
import { RequestClient } from '../../../request/RequestClient'
|
||||
import * as convertToCsvModule from '../../../utils/convertToCsv'
|
||||
import { uploadTables } from '../uploadTables'
|
||||
|
||||
const requestClient = new (<jest.Mock<RequestClient>>RequestClient)()
|
||||
|
||||
describe('uploadTables', () => {
|
||||
beforeEach(() => {
|
||||
setupMocks()
|
||||
})
|
||||
|
||||
it('should return a list of uploaded files', async () => {
|
||||
const data = { foo: 'bar' }
|
||||
|
||||
const files = await uploadTables(requestClient, data, 't0k3n')
|
||||
|
||||
expect(files).toEqual([{ tableName: 'foo', file: 'test-file' }])
|
||||
expect(requestClient.uploadFile).toHaveBeenCalledTimes(1)
|
||||
expect(requestClient.uploadFile).toHaveBeenCalledWith(
|
||||
'/files/files#rawUpload',
|
||||
'Test CSV',
|
||||
't0k3n'
|
||||
)
|
||||
})
|
||||
|
||||
it('should throw an error when the CSV exceeds the maximum length', async () => {
|
||||
const data = { foo: 'bar' }
|
||||
jest
|
||||
.spyOn(convertToCsvModule, 'convertToCSV')
|
||||
.mockImplementation(() => 'ERROR: LARGE STRING LENGTH')
|
||||
|
||||
const error = await uploadTables(requestClient, data, 't0k3n').catch(
|
||||
(e) => e
|
||||
)
|
||||
|
||||
expect(requestClient.uploadFile).not.toHaveBeenCalled()
|
||||
expect(error.message).toEqual(
|
||||
'The max length of a string value in SASjs is 32765 characters.'
|
||||
)
|
||||
})
|
||||
|
||||
it('should throw an error when the file upload fails', async () => {
|
||||
const data = { foo: 'bar' }
|
||||
jest
|
||||
.spyOn(requestClient, 'uploadFile')
|
||||
.mockImplementation(() => Promise.reject('Upload Error'))
|
||||
|
||||
const error = await uploadTables(requestClient, data, 't0k3n').catch(
|
||||
(e) => e
|
||||
)
|
||||
|
||||
expect(error).toContain('Error while uploading file.')
|
||||
})
|
||||
})
|
||||
|
||||
const setupMocks = () => {
|
||||
jest.restoreAllMocks()
|
||||
jest.mock('../../../utils/convertToCsv')
|
||||
jest
|
||||
.spyOn(convertToCsvModule, 'convertToCSV')
|
||||
.mockImplementation(() => 'Test CSV')
|
||||
jest
|
||||
.spyOn(requestClient, 'uploadFile')
|
||||
.mockImplementation(() =>
|
||||
Promise.resolve({ result: 'test-file', etag: '' })
|
||||
)
|
||||
}
|
||||
37
src/api/viya/uploadTables.ts
Normal file
37
src/api/viya/uploadTables.ts
Normal file
@@ -0,0 +1,37 @@
|
||||
import { prefixMessage } from '@sasjs/utils/error'
|
||||
import { RequestClient } from '../../request/RequestClient'
|
||||
import { convertToCSV } from '../../utils/convertToCsv'
|
||||
|
||||
/**
|
||||
* Uploads tables to SAS as specially formatted CSVs.
|
||||
* This is more compact than JSON, and easier to read within SAS.
|
||||
* @param requestClient - the pre-configured HTTP request client
|
||||
* @param data - the JSON representation of the data to be uploaded
|
||||
* @param accessToken - an optional access token for authentication/authorization
|
||||
* The access token is not required when uploading tables from the browser.
|
||||
*/
|
||||
export async function uploadTables(
|
||||
requestClient: RequestClient,
|
||||
data: any,
|
||||
accessToken?: string
|
||||
) {
|
||||
const uploadedFiles = []
|
||||
|
||||
for (const tableName in data) {
|
||||
const csv = convertToCSV(data[tableName])
|
||||
if (csv === 'ERROR: LARGE STRING LENGTH') {
|
||||
throw new Error(
|
||||
'The max length of a string value in SASjs is 32765 characters.'
|
||||
)
|
||||
}
|
||||
|
||||
const uploadResponse = await requestClient
|
||||
.uploadFile(`/files/files#rawUpload`, csv, accessToken)
|
||||
.catch((err) => {
|
||||
throw prefixMessage(err, 'Error while uploading file. ')
|
||||
})
|
||||
|
||||
uploadedFiles.push({ tableName, file: uploadResponse.result })
|
||||
}
|
||||
return uploadedFiles
|
||||
}
|
||||
15
src/api/viya/writeStream.ts
Normal file
15
src/api/viya/writeStream.ts
Normal file
@@ -0,0 +1,15 @@
|
||||
import { WriteStream } from 'fs'
|
||||
|
||||
export const writeStream = async (
|
||||
stream: WriteStream,
|
||||
content: string
|
||||
): Promise<void> => {
|
||||
return new Promise((resolve, reject) => {
|
||||
stream.write(content + '\n\nnext chunk\n\n', (e) => {
|
||||
if (e) {
|
||||
return reject(e)
|
||||
}
|
||||
return resolve()
|
||||
})
|
||||
})
|
||||
}
|
||||
53
src/auth/getAccessToken.ts
Normal file
53
src/auth/getAccessToken.ts
Normal file
@@ -0,0 +1,53 @@
|
||||
import { SasAuthResponse } from '@sasjs/utils/types'
|
||||
import { prefixMessage } from '@sasjs/utils/error'
|
||||
import * as NodeFormData from 'form-data'
|
||||
import { RequestClient } from '../request/RequestClient'
|
||||
|
||||
/**
|
||||
* Exchanges the auth code for an access token for the given client.
|
||||
* @param requestClient - the pre-configured HTTP request client
|
||||
* @param clientId - the client ID to authenticate with.
|
||||
* @param clientSecret - the client secret to authenticate with.
|
||||
* @param authCode - the auth code received from the server.
|
||||
*/
|
||||
export async function getAccessToken(
|
||||
requestClient: RequestClient,
|
||||
clientId: string,
|
||||
clientSecret: string,
|
||||
authCode: string
|
||||
): Promise<SasAuthResponse> {
|
||||
const url = '/SASLogon/oauth/token'
|
||||
let token
|
||||
if (typeof Buffer === 'undefined') {
|
||||
token = btoa(clientId + ':' + clientSecret)
|
||||
} else {
|
||||
token = Buffer.from(clientId + ':' + clientSecret).toString('base64')
|
||||
}
|
||||
const headers = {
|
||||
Authorization: 'Basic ' + token
|
||||
}
|
||||
|
||||
let formData
|
||||
if (typeof FormData === 'undefined') {
|
||||
formData = new NodeFormData()
|
||||
} else {
|
||||
formData = new FormData()
|
||||
}
|
||||
formData.append('grant_type', 'authorization_code')
|
||||
formData.append('code', authCode)
|
||||
|
||||
const authResponse = await requestClient
|
||||
.post(
|
||||
url,
|
||||
formData,
|
||||
undefined,
|
||||
'multipart/form-data; boundary=' + (formData as any)._boundary,
|
||||
headers
|
||||
)
|
||||
.then((res) => res.result as SasAuthResponse)
|
||||
.catch((err) => {
|
||||
throw prefixMessage(err, 'Error while getting access token')
|
||||
})
|
||||
|
||||
return authResponse
|
||||
}
|
||||
40
src/auth/getTokens.ts
Normal file
40
src/auth/getTokens.ts
Normal file
@@ -0,0 +1,40 @@
|
||||
import {
|
||||
AuthConfig,
|
||||
isAccessTokenExpiring,
|
||||
isRefreshTokenExpiring,
|
||||
hasTokenExpired
|
||||
} from '@sasjs/utils'
|
||||
import { RequestClient } from '../request/RequestClient'
|
||||
import { refreshTokens } from './refreshTokens'
|
||||
|
||||
/**
|
||||
* Returns the auth configuration, refreshing the tokens if necessary.
|
||||
* @param requestClient - the pre-configured HTTP request client
|
||||
* @param authConfig - an object containing a client ID, secret, access token and refresh token
|
||||
*/
|
||||
export async function getTokens(
|
||||
requestClient: RequestClient,
|
||||
authConfig: AuthConfig
|
||||
): Promise<AuthConfig> {
|
||||
const logger = process.logger || console
|
||||
let { access_token, refresh_token, client, secret } = authConfig
|
||||
if (
|
||||
isAccessTokenExpiring(access_token) ||
|
||||
isRefreshTokenExpiring(refresh_token)
|
||||
) {
|
||||
if (hasTokenExpired(refresh_token)) {
|
||||
const error =
|
||||
'Unable to obtain new access token. Your refresh token has expired.'
|
||||
logger.error(error)
|
||||
throw new Error(error)
|
||||
}
|
||||
logger.info('Refreshing access and refresh tokens.')
|
||||
;({ access_token, refresh_token } = await refreshTokens(
|
||||
requestClient,
|
||||
client,
|
||||
secret,
|
||||
refresh_token
|
||||
))
|
||||
}
|
||||
return { access_token, refresh_token, client, secret }
|
||||
}
|
||||
49
src/auth/refreshTokens.ts
Normal file
49
src/auth/refreshTokens.ts
Normal file
@@ -0,0 +1,49 @@
|
||||
import { SasAuthResponse } from '@sasjs/utils/types'
|
||||
import { prefixMessage } from '@sasjs/utils/error'
|
||||
import * as NodeFormData from 'form-data'
|
||||
import { RequestClient } from '../request/RequestClient'
|
||||
|
||||
/**
|
||||
* Exchanges the refresh token for an access token for the given client.
|
||||
* @param requestClient - the pre-configured HTTP request client
|
||||
* @param clientId - the client ID to authenticate with.
|
||||
* @param clientSecret - the client secret to authenticate with.
|
||||
* @param authCode - the refresh token received from the server.
|
||||
*/
|
||||
export async function refreshTokens(
|
||||
requestClient: RequestClient,
|
||||
clientId: string,
|
||||
clientSecret: string,
|
||||
refreshToken: string
|
||||
) {
|
||||
const url = '/SASLogon/oauth/token'
|
||||
let token
|
||||
token =
|
||||
typeof Buffer === 'undefined'
|
||||
? btoa(clientId + ':' + clientSecret)
|
||||
: Buffer.from(clientId + ':' + clientSecret).toString('base64')
|
||||
|
||||
const headers = {
|
||||
Authorization: 'Basic ' + token
|
||||
}
|
||||
|
||||
const formData =
|
||||
typeof FormData === 'undefined' ? new NodeFormData() : new FormData()
|
||||
formData.append('grant_type', 'refresh_token')
|
||||
formData.append('refresh_token', refreshToken)
|
||||
|
||||
const authResponse = await requestClient
|
||||
.post<SasAuthResponse>(
|
||||
url,
|
||||
formData,
|
||||
undefined,
|
||||
'multipart/form-data; boundary=' + (formData as any)._boundary,
|
||||
headers
|
||||
)
|
||||
.then((res) => res.result)
|
||||
.catch((err) => {
|
||||
throw prefixMessage(err, 'Error while refreshing tokens')
|
||||
})
|
||||
|
||||
return authResponse
|
||||
}
|
||||
75
src/auth/spec/getAccessToken.spec.ts
Normal file
75
src/auth/spec/getAccessToken.spec.ts
Normal file
@@ -0,0 +1,75 @@
|
||||
import { AuthConfig } from '@sasjs/utils'
|
||||
import * as NodeFormData from 'form-data'
|
||||
import { generateToken, mockAuthResponse } from './mockResponses'
|
||||
import { RequestClient } from '../../request/RequestClient'
|
||||
import { getAccessToken } from '../getAccessToken'
|
||||
|
||||
const requestClient = new (<jest.Mock<RequestClient>>RequestClient)()
|
||||
|
||||
describe('getAccessToken', () => {
|
||||
it('should attempt to refresh tokens', async () => {
|
||||
setupMocks()
|
||||
const access_token = generateToken(30)
|
||||
const refresh_token = generateToken(30)
|
||||
const authConfig: AuthConfig = {
|
||||
access_token,
|
||||
refresh_token,
|
||||
client: 'cl13nt',
|
||||
secret: 's3cr3t'
|
||||
}
|
||||
jest
|
||||
.spyOn(requestClient, 'post')
|
||||
.mockImplementation(() =>
|
||||
Promise.resolve({ result: mockAuthResponse, etag: '' })
|
||||
)
|
||||
const token = Buffer.from(
|
||||
authConfig.client + ':' + authConfig.secret
|
||||
).toString('base64')
|
||||
|
||||
await getAccessToken(
|
||||
requestClient,
|
||||
authConfig.client,
|
||||
authConfig.secret,
|
||||
authConfig.refresh_token
|
||||
)
|
||||
|
||||
expect(requestClient.post).toHaveBeenCalledWith(
|
||||
'/SASLogon/oauth/token',
|
||||
expect.any(NodeFormData),
|
||||
undefined,
|
||||
expect.stringContaining('multipart/form-data; boundary='),
|
||||
{
|
||||
Authorization: 'Basic ' + token
|
||||
}
|
||||
)
|
||||
})
|
||||
|
||||
it('should handle errors while refreshing tokens', async () => {
|
||||
setupMocks()
|
||||
const access_token = generateToken(30)
|
||||
const refresh_token = generateToken(30)
|
||||
const authConfig: AuthConfig = {
|
||||
access_token,
|
||||
refresh_token,
|
||||
client: 'cl13nt',
|
||||
secret: 's3cr3t'
|
||||
}
|
||||
jest
|
||||
.spyOn(requestClient, 'post')
|
||||
.mockImplementation(() => Promise.reject('Token Error'))
|
||||
|
||||
const error = await getAccessToken(
|
||||
requestClient,
|
||||
authConfig.client,
|
||||
authConfig.secret,
|
||||
authConfig.refresh_token
|
||||
).catch((e) => e)
|
||||
|
||||
expect(error).toContain('Error while getting access token')
|
||||
})
|
||||
})
|
||||
|
||||
const setupMocks = () => {
|
||||
jest.restoreAllMocks()
|
||||
jest.mock('../../request/RequestClient')
|
||||
}
|
||||
79
src/auth/spec/getTokens.spec.ts
Normal file
79
src/auth/spec/getTokens.spec.ts
Normal file
@@ -0,0 +1,79 @@
|
||||
import { AuthConfig } from '@sasjs/utils'
|
||||
import * as refreshTokensModule from '../refreshTokens'
|
||||
import { generateToken, mockAuthResponse } from './mockResponses'
|
||||
import { getTokens } from '../getTokens'
|
||||
import { RequestClient } from '../../request/RequestClient'
|
||||
|
||||
const requestClient = new (<jest.Mock<RequestClient>>RequestClient)()
|
||||
|
||||
describe('getTokens', () => {
|
||||
it('should attempt to refresh tokens if the access token is expiring', async () => {
|
||||
setupMocks()
|
||||
const access_token = generateToken(30)
|
||||
const refresh_token = generateToken(86400000)
|
||||
const authConfig: AuthConfig = {
|
||||
access_token,
|
||||
refresh_token,
|
||||
client: 'cl13nt',
|
||||
secret: 's3cr3t'
|
||||
}
|
||||
|
||||
await getTokens(requestClient, authConfig)
|
||||
|
||||
expect(refreshTokensModule.refreshTokens).toHaveBeenCalledWith(
|
||||
requestClient,
|
||||
authConfig.client,
|
||||
authConfig.secret,
|
||||
authConfig.refresh_token
|
||||
)
|
||||
})
|
||||
|
||||
it('should attempt to refresh tokens if the refresh token is expiring', async () => {
|
||||
setupMocks()
|
||||
const access_token = generateToken(86400000)
|
||||
const refresh_token = generateToken(30)
|
||||
const authConfig: AuthConfig = {
|
||||
access_token,
|
||||
refresh_token,
|
||||
client: 'cl13nt',
|
||||
secret: 's3cr3t'
|
||||
}
|
||||
|
||||
await getTokens(requestClient, authConfig)
|
||||
|
||||
expect(refreshTokensModule.refreshTokens).toHaveBeenCalledWith(
|
||||
requestClient,
|
||||
authConfig.client,
|
||||
authConfig.secret,
|
||||
authConfig.refresh_token
|
||||
)
|
||||
})
|
||||
|
||||
it('should throw an error if the refresh token has already expired', async () => {
|
||||
setupMocks()
|
||||
const access_token = generateToken(86400000)
|
||||
const refresh_token = generateToken(-36000)
|
||||
const authConfig: AuthConfig = {
|
||||
access_token,
|
||||
refresh_token,
|
||||
client: 'cl13nt',
|
||||
secret: 's3cr3t'
|
||||
}
|
||||
const expectedError =
|
||||
'Unable to obtain new access token. Your refresh token has expired.'
|
||||
|
||||
const error = await getTokens(requestClient, authConfig).catch((e) => e)
|
||||
|
||||
expect(error.message).toEqual(expectedError)
|
||||
})
|
||||
})
|
||||
|
||||
const setupMocks = () => {
|
||||
jest.restoreAllMocks()
|
||||
jest.mock('../../request/RequestClient')
|
||||
jest.mock('../refreshTokens')
|
||||
|
||||
jest
|
||||
.spyOn(refreshTokensModule, 'refreshTokens')
|
||||
.mockImplementation(() => Promise.resolve(mockAuthResponse))
|
||||
}
|
||||
@@ -1,2 +1,24 @@
|
||||
import { SasAuthResponse } from '@sasjs/utils/types'
|
||||
|
||||
export const mockLoginAuthoriseRequiredResponse = `<form id="application_authorization" action="/SASLogon/oauth/authorize" method="POST"><input type="hidden" name="X-Uaa-Csrf" value="2nfuxIn6WaOURWL7tzTXCe"/>`
|
||||
export const mockLoginSuccessResponse = `You have signed in`
|
||||
|
||||
export const mockAuthResponse: SasAuthResponse = {
|
||||
access_token: 'acc355',
|
||||
refresh_token: 'r3fr35h',
|
||||
id_token: 'id',
|
||||
token_type: 'bearer',
|
||||
expires_in: new Date().valueOf(),
|
||||
scope: 'default',
|
||||
jti: 'test'
|
||||
}
|
||||
|
||||
export const generateToken = (timeToLiveSeconds: number): string => {
|
||||
const exp =
|
||||
new Date(new Date().getTime() + timeToLiveSeconds * 1000).getTime() / 1000
|
||||
const header = 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9'
|
||||
const payload = Buffer.from(JSON.stringify({ exp })).toString('base64')
|
||||
const signature = '4-iaDojEVl0pJQMjrbM1EzUIfAZgsbK_kgnVyVxFSVo'
|
||||
const token = `${header}.${payload}.${signature}`
|
||||
return token
|
||||
}
|
||||
|
||||
75
src/auth/spec/refreshTokens.spec.ts
Normal file
75
src/auth/spec/refreshTokens.spec.ts
Normal file
@@ -0,0 +1,75 @@
|
||||
import { AuthConfig } from '@sasjs/utils'
|
||||
import * as NodeFormData from 'form-data'
|
||||
import { generateToken, mockAuthResponse } from './mockResponses'
|
||||
import { RequestClient } from '../../request/RequestClient'
|
||||
import { refreshTokens } from '../refreshTokens'
|
||||
|
||||
const requestClient = new (<jest.Mock<RequestClient>>RequestClient)()
|
||||
|
||||
describe('refreshTokens', () => {
|
||||
it('should attempt to refresh tokens', async () => {
|
||||
setupMocks()
|
||||
const access_token = generateToken(30)
|
||||
const refresh_token = generateToken(30)
|
||||
const authConfig: AuthConfig = {
|
||||
access_token,
|
||||
refresh_token,
|
||||
client: 'cl13nt',
|
||||
secret: 's3cr3t'
|
||||
}
|
||||
jest
|
||||
.spyOn(requestClient, 'post')
|
||||
.mockImplementation(() =>
|
||||
Promise.resolve({ result: mockAuthResponse, etag: '' })
|
||||
)
|
||||
const token = Buffer.from(
|
||||
authConfig.client + ':' + authConfig.secret
|
||||
).toString('base64')
|
||||
|
||||
await refreshTokens(
|
||||
requestClient,
|
||||
authConfig.client,
|
||||
authConfig.secret,
|
||||
authConfig.refresh_token
|
||||
)
|
||||
|
||||
expect(requestClient.post).toHaveBeenCalledWith(
|
||||
'/SASLogon/oauth/token',
|
||||
expect.any(NodeFormData),
|
||||
undefined,
|
||||
expect.stringContaining('multipart/form-data; boundary='),
|
||||
{
|
||||
Authorization: 'Basic ' + token
|
||||
}
|
||||
)
|
||||
})
|
||||
|
||||
it('should handle errors while refreshing tokens', async () => {
|
||||
setupMocks()
|
||||
const access_token = generateToken(30)
|
||||
const refresh_token = generateToken(30)
|
||||
const authConfig: AuthConfig = {
|
||||
access_token,
|
||||
refresh_token,
|
||||
client: 'cl13nt',
|
||||
secret: 's3cr3t'
|
||||
}
|
||||
jest
|
||||
.spyOn(requestClient, 'post')
|
||||
.mockImplementation(() => Promise.reject('Token Error'))
|
||||
|
||||
const error = await refreshTokens(
|
||||
requestClient,
|
||||
authConfig.client,
|
||||
authConfig.secret,
|
||||
authConfig.refresh_token
|
||||
).catch((e) => e)
|
||||
|
||||
expect(error).toContain('Error while refreshing tokens')
|
||||
})
|
||||
})
|
||||
|
||||
const setupMocks = () => {
|
||||
jest.restoreAllMocks()
|
||||
jest.mock('../../request/RequestClient')
|
||||
}
|
||||
@@ -43,6 +43,7 @@ export interface HttpClient {
|
||||
|
||||
getCsrfToken(type: 'general' | 'file'): CsrfToken | undefined
|
||||
clearCsrfTokens(): void
|
||||
getBaseUrl(): string
|
||||
}
|
||||
|
||||
export class RequestClient implements HttpClient {
|
||||
@@ -78,6 +79,10 @@ export class RequestClient implements HttpClient {
|
||||
this.fileUploadCsrfToken = { headerName: '', value: '' }
|
||||
}
|
||||
|
||||
public getBaseUrl() {
|
||||
return this.httpClient.defaults.baseURL || ''
|
||||
}
|
||||
|
||||
public async get<T>(
|
||||
url: string,
|
||||
accessToken: string | undefined,
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
export interface PollOptions {
|
||||
MAX_POLL_COUNT?: number
|
||||
POLL_INTERVAL?: number
|
||||
maxPollCount: number
|
||||
pollInterval: number
|
||||
streamLog: boolean
|
||||
logFolderPath?: string
|
||||
}
|
||||
|
||||
11
src/types/errors/JobStatePollError.ts
Normal file
11
src/types/errors/JobStatePollError.ts
Normal file
@@ -0,0 +1,11 @@
|
||||
export class JobStatePollError extends Error {
|
||||
constructor(id: string, public originalError: Error) {
|
||||
super(
|
||||
`Error while polling job state for job ${id}: ${
|
||||
originalError.message || originalError
|
||||
}`
|
||||
)
|
||||
this.name = 'JobStatePollError'
|
||||
Object.setPrototypeOf(this, JobStatePollError.prototype)
|
||||
}
|
||||
}
|
||||
@@ -2,6 +2,7 @@ export * from './AuthorizeError'
|
||||
export * from './ComputeJobExecutionError'
|
||||
export * from './InternalServerError'
|
||||
export * from './JobExecutionError'
|
||||
export * from './JobStatePollError'
|
||||
export * from './LoginRequiredError'
|
||||
export * from './NotFoundError'
|
||||
export * from './ErrorResponse'
|
||||
|
||||
@@ -14,18 +14,36 @@ export const fetchLogByChunks = async (
|
||||
accessToken: string,
|
||||
logUrl: string,
|
||||
logCount: number
|
||||
): Promise<string> => {
|
||||
return await fetchLog(requestClient, accessToken, logUrl, 0, logCount)
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetches a section of the log file delineated by start and end lines
|
||||
* @param {object} requestClient - client object of Request Client.
|
||||
* @param {string} accessToken - an access token for an authorized user.
|
||||
* @param {string} logUrl - url of the log file.
|
||||
* @param {number} start - the line at which to start fetching the log.
|
||||
* @param {number} end - the line at which to stop fetching the log.
|
||||
* @returns an string containing log lines.
|
||||
*/
|
||||
export const fetchLog = async (
|
||||
requestClient: RequestClient,
|
||||
accessToken: string,
|
||||
logUrl: string,
|
||||
start: number,
|
||||
end: number
|
||||
): Promise<string> => {
|
||||
const logger = process.logger || console
|
||||
|
||||
let log: string = ''
|
||||
|
||||
const loglimit = logCount < 10000 ? logCount : 10000
|
||||
let start = 0
|
||||
const loglimit = end < 10000 ? end : 10000
|
||||
do {
|
||||
logger.info(
|
||||
`Fetching logs from line no: ${start + 1} to ${
|
||||
start + loglimit
|
||||
} of ${logCount}.`
|
||||
} of ${end}.`
|
||||
)
|
||||
const logChunkJson = await requestClient!
|
||||
.get<any>(`${logUrl}?start=${start}&limit=${loglimit}`, accessToken)
|
||||
@@ -40,6 +58,6 @@ export const fetchLogByChunks = async (
|
||||
log += logChunk
|
||||
|
||||
start += loglimit
|
||||
} while (start < logCount)
|
||||
} while (start < end)
|
||||
return log
|
||||
}
|
||||
|
||||
@@ -9,5 +9,5 @@
|
||||
"sourceMap": true
|
||||
},
|
||||
"include": ["src"],
|
||||
"exclude": ["node_modules", "**/*.spec.ts"]
|
||||
"exclude": ["node_modules"]
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user