mirror of
https://github.com/sasjs/adapter.git
synced 2026-01-09 05:20:05 +00:00
fix(job-execution): refresh access token if it has expired during job status checks
This commit is contained in:
@@ -3,7 +3,9 @@ import {
|
||||
isRelativePath,
|
||||
isUri,
|
||||
isUrl,
|
||||
fetchLogByChunks
|
||||
fetchLogByChunks,
|
||||
isAccessTokenExpiring,
|
||||
isRefreshTokenExpiring
|
||||
} from './utils'
|
||||
import * as NodeFormData from 'form-data'
|
||||
import {
|
||||
@@ -29,7 +31,7 @@ import { timestampToYYYYMMDDHHMMSS } from '@sasjs/utils/time'
|
||||
import { Logger, LogLevel } from '@sasjs/utils/logger'
|
||||
import { isAuthorizeFormRequired } from './auth/isAuthorizeFormRequired'
|
||||
import { RequestClient } from './request/RequestClient'
|
||||
import { SasAuthResponse, MacroVar } from '@sasjs/utils/types'
|
||||
import { SasAuthResponse, MacroVar, AuthConfig } from '@sasjs/utils/types'
|
||||
import { prefixMessage } from '@sasjs/utils/error'
|
||||
import * as mime from 'mime'
|
||||
|
||||
@@ -266,7 +268,7 @@ export class SASViyaApiClient {
|
||||
* @param jobPath - the path to the file being submitted for execution.
|
||||
* @param linesOfCode - an array of code lines to execute.
|
||||
* @param contextName - the context to execute the code in.
|
||||
* @param accessToken - an access token for an authorized user.
|
||||
* @param authConfig - an object containing an access token, refresh token, client ID and secret.
|
||||
* @param data - execution data.
|
||||
* @param debug - when set to true, the log will be returned.
|
||||
* @param expectWebout - when set to true, the automatic _webout fileref will be checked for content, and that content returned. This fileref is used when the Job contains a SASjs web request (as opposed to executing arbitrary SAS code).
|
||||
@@ -279,7 +281,7 @@ export class SASViyaApiClient {
|
||||
jobPath: string,
|
||||
linesOfCode: string[],
|
||||
contextName: string,
|
||||
accessToken?: string,
|
||||
authConfig?: AuthConfig,
|
||||
data = null,
|
||||
debug: boolean = false,
|
||||
expectWebout = false,
|
||||
@@ -288,17 +290,20 @@ export class SASViyaApiClient {
|
||||
printPid = false,
|
||||
variables?: MacroVar
|
||||
): Promise<any> {
|
||||
const { access_token } = authConfig || {}
|
||||
const logger = process.logger || console
|
||||
|
||||
try {
|
||||
const headers: any = {
|
||||
'Content-Type': 'application/json'
|
||||
}
|
||||
|
||||
if (accessToken) headers.Authorization = `Bearer ${accessToken}`
|
||||
if (access_token) headers.Authorization = `Bearer ${access_token}`
|
||||
|
||||
let executionSessionId: string
|
||||
|
||||
const session = await this.sessionManager
|
||||
.getSession(accessToken)
|
||||
.getSession(access_token)
|
||||
.catch((err) => {
|
||||
throw prefixMessage(err, 'Error while getting session. ')
|
||||
})
|
||||
@@ -307,7 +312,7 @@ export class SASViyaApiClient {
|
||||
|
||||
if (printPid) {
|
||||
const { result: jobIdVariable } = await this.sessionManager
|
||||
.getVariable(executionSessionId, 'SYSJOBID', accessToken)
|
||||
.getVariable(executionSessionId, 'SYSJOBID', access_token)
|
||||
.catch((err) => {
|
||||
throw prefixMessage(err, 'Error while getting session variable. ')
|
||||
})
|
||||
@@ -366,7 +371,7 @@ export class SASViyaApiClient {
|
||||
|
||||
if (data) {
|
||||
if (JSON.stringify(data).includes(';')) {
|
||||
files = await this.uploadTables(data, accessToken).catch((err) => {
|
||||
files = await this.uploadTables(data, access_token).catch((err) => {
|
||||
throw prefixMessage(err, 'Error while uploading tables. ')
|
||||
})
|
||||
|
||||
@@ -396,7 +401,7 @@ export class SASViyaApiClient {
|
||||
.post<Job>(
|
||||
`/compute/sessions/${executionSessionId}/jobs`,
|
||||
jobRequestBody,
|
||||
accessToken
|
||||
access_token
|
||||
)
|
||||
.catch((err) => {
|
||||
throw prefixMessage(err, 'Error while posting job. ')
|
||||
@@ -405,8 +410,8 @@ export class SASViyaApiClient {
|
||||
if (!waitForResult) return session
|
||||
|
||||
if (debug) {
|
||||
console.log(`Job has been submitted for '${fileName}'.`)
|
||||
console.log(
|
||||
logger.info(`Job has been submitted for '${fileName}'.`)
|
||||
logger.info(
|
||||
`You can monitor the job progress at '${this.serverUrl}${
|
||||
postedJob.links.find((l: any) => l.rel === 'state')!.href
|
||||
}'.`
|
||||
@@ -416,7 +421,7 @@ export class SASViyaApiClient {
|
||||
const jobStatus = await this.pollJobState(
|
||||
postedJob,
|
||||
etag,
|
||||
accessToken,
|
||||
authConfig,
|
||||
pollOptions
|
||||
).catch(async (err) => {
|
||||
const error = err?.response?.data
|
||||
@@ -429,7 +434,7 @@ export class SASViyaApiClient {
|
||||
const logCount = 1000000
|
||||
err.log = await fetchLogByChunks(
|
||||
this.requestClient,
|
||||
accessToken!,
|
||||
access_token!,
|
||||
sessionLogUrl,
|
||||
logCount
|
||||
)
|
||||
@@ -440,7 +445,7 @@ export class SASViyaApiClient {
|
||||
const { result: currentJob } = await this.requestClient
|
||||
.get<Job>(
|
||||
`/compute/sessions/${executionSessionId}/jobs/${postedJob.id}`,
|
||||
accessToken
|
||||
access_token
|
||||
)
|
||||
.catch((err) => {
|
||||
throw prefixMessage(err, 'Error while getting job. ')
|
||||
@@ -456,7 +461,7 @@ export class SASViyaApiClient {
|
||||
const logCount = currentJob.logStatistics?.lineCount ?? 1000000
|
||||
log = await fetchLogByChunks(
|
||||
this.requestClient,
|
||||
accessToken!,
|
||||
access_token!,
|
||||
logUrl,
|
||||
logCount
|
||||
)
|
||||
@@ -476,7 +481,7 @@ export class SASViyaApiClient {
|
||||
|
||||
if (resultLink) {
|
||||
jobResult = await this.requestClient
|
||||
.get<any>(resultLink, accessToken, 'text/plain')
|
||||
.get<any>(resultLink, access_token, 'text/plain')
|
||||
.catch(async (e) => {
|
||||
if (e instanceof NotFoundError) {
|
||||
if (logLink) {
|
||||
@@ -484,7 +489,7 @@ export class SASViyaApiClient {
|
||||
const logCount = currentJob.logStatistics?.lineCount ?? 1000000
|
||||
log = await fetchLogByChunks(
|
||||
this.requestClient,
|
||||
accessToken!,
|
||||
access_token!,
|
||||
logUrl,
|
||||
logCount
|
||||
)
|
||||
@@ -503,7 +508,7 @@ export class SASViyaApiClient {
|
||||
}
|
||||
|
||||
await this.sessionManager
|
||||
.clearSession(executionSessionId, accessToken)
|
||||
.clearSession(executionSessionId, access_token)
|
||||
.catch((err) => {
|
||||
throw prefixMessage(err, 'Error while clearing session. ')
|
||||
})
|
||||
@@ -515,7 +520,7 @@ export class SASViyaApiClient {
|
||||
jobPath,
|
||||
linesOfCode,
|
||||
contextName,
|
||||
accessToken,
|
||||
authConfig,
|
||||
data,
|
||||
debug,
|
||||
false,
|
||||
@@ -872,13 +877,15 @@ export class SASViyaApiClient {
|
||||
contextName: string,
|
||||
debug?: boolean,
|
||||
data?: any,
|
||||
accessToken?: string,
|
||||
authConfig?: AuthConfig,
|
||||
waitForResult = true,
|
||||
expectWebout = false,
|
||||
pollOptions?: PollOptions,
|
||||
printPid = false,
|
||||
variables?: MacroVar
|
||||
) {
|
||||
let { access_token, refresh_token, client, secret } = authConfig || {}
|
||||
|
||||
if (isRelativePath(sasJob) && !this.rootFolderName) {
|
||||
throw new Error(
|
||||
'Relative paths cannot be used without specifying a root folder name'
|
||||
@@ -892,7 +899,7 @@ export class SASViyaApiClient {
|
||||
? `${this.rootFolderName}/${folderPath}`
|
||||
: folderPath
|
||||
|
||||
await this.populateFolderMap(fullFolderPath, accessToken).catch((err) => {
|
||||
await this.populateFolderMap(fullFolderPath, access_token).catch((err) => {
|
||||
throw prefixMessage(err, 'Error while populating folder map. ')
|
||||
})
|
||||
|
||||
@@ -906,8 +913,8 @@ export class SASViyaApiClient {
|
||||
|
||||
const headers: any = { 'Content-Type': 'application/json' }
|
||||
|
||||
if (!!accessToken) {
|
||||
headers.Authorization = `Bearer ${accessToken}`
|
||||
if (!!access_token) {
|
||||
headers.Authorization = `Bearer ${access_token}`
|
||||
}
|
||||
|
||||
const jobToExecute = jobFolder?.find((item) => item.name === jobName)
|
||||
@@ -930,7 +937,7 @@ export class SASViyaApiClient {
|
||||
const { result: jobDefinition } = await this.requestClient
|
||||
.get<JobDefinition>(
|
||||
`${this.serverUrl}${jobDefinitionLink.href}`,
|
||||
accessToken
|
||||
access_token
|
||||
)
|
||||
.catch((err) => {
|
||||
throw prefixMessage(err, 'Error while getting job definition. ')
|
||||
@@ -950,7 +957,7 @@ export class SASViyaApiClient {
|
||||
sasJob,
|
||||
linesToExecute,
|
||||
contextName,
|
||||
accessToken,
|
||||
authConfig,
|
||||
data,
|
||||
debug,
|
||||
expectWebout,
|
||||
@@ -974,8 +981,9 @@ export class SASViyaApiClient {
|
||||
contextName: string,
|
||||
debug: boolean,
|
||||
data?: any,
|
||||
accessToken?: string
|
||||
authConfig?: AuthConfig
|
||||
) {
|
||||
let { access_token, refresh_token, client, secret } = authConfig || {}
|
||||
if (isRelativePath(sasJob) && !this.rootFolderName) {
|
||||
throw new Error(
|
||||
'Relative paths cannot be used without specifying a root folder name.'
|
||||
@@ -988,7 +996,7 @@ export class SASViyaApiClient {
|
||||
const fullFolderPath = isRelativePath(sasJob)
|
||||
? `${this.rootFolderName}/${folderPath}`
|
||||
: folderPath
|
||||
await this.populateFolderMap(fullFolderPath, accessToken)
|
||||
await this.populateFolderMap(fullFolderPath, access_token)
|
||||
|
||||
const jobFolder = this.folderMap.get(fullFolderPath)
|
||||
if (!jobFolder) {
|
||||
@@ -1001,7 +1009,7 @@ export class SASViyaApiClient {
|
||||
|
||||
let files: any[] = []
|
||||
if (data && Object.keys(data).length) {
|
||||
files = await this.uploadTables(data, accessToken)
|
||||
files = await this.uploadTables(data, access_token)
|
||||
}
|
||||
|
||||
if (!jobToExecute) {
|
||||
@@ -1013,7 +1021,7 @@ export class SASViyaApiClient {
|
||||
|
||||
const { result: jobDefinition } = await this.requestClient.get<Job>(
|
||||
`${this.serverUrl}${jobDefinitionLink}`,
|
||||
accessToken
|
||||
access_token
|
||||
)
|
||||
|
||||
const jobArguments: { [key: string]: any } = {
|
||||
@@ -1049,18 +1057,18 @@ export class SASViyaApiClient {
|
||||
const { result: postedJob, etag } = await this.requestClient.post<Job>(
|
||||
`${this.serverUrl}/jobExecution/jobs?_action=wait`,
|
||||
postJobRequestBody,
|
||||
accessToken
|
||||
access_token
|
||||
)
|
||||
const jobStatus = await this.pollJobState(
|
||||
postedJob,
|
||||
etag,
|
||||
accessToken
|
||||
authConfig
|
||||
).catch((err) => {
|
||||
throw prefixMessage(err, 'Error while polling job status. ')
|
||||
})
|
||||
const { result: currentJob } = await this.requestClient.get<Job>(
|
||||
`${this.serverUrl}/jobExecution/jobs/${postedJob.id}`,
|
||||
accessToken
|
||||
access_token
|
||||
)
|
||||
|
||||
let jobResult
|
||||
@@ -1071,13 +1079,13 @@ export class SASViyaApiClient {
|
||||
if (resultLink) {
|
||||
jobResult = await this.requestClient.get<any>(
|
||||
`${this.serverUrl}${resultLink}/content`,
|
||||
accessToken,
|
||||
access_token,
|
||||
'text/plain'
|
||||
)
|
||||
}
|
||||
if (debug && logLink) {
|
||||
log = await this.requestClient
|
||||
.get<any>(`${this.serverUrl}${logLink.href}/content`, accessToken)
|
||||
.get<any>(`${this.serverUrl}${logLink.href}/content`, access_token)
|
||||
.then((res: any) => res.result.items.map((i: any) => i.line).join('\n'))
|
||||
}
|
||||
if (jobStatus === 'failed') {
|
||||
@@ -1127,12 +1135,28 @@ export class SASViyaApiClient {
|
||||
private async pollJobState(
|
||||
postedJob: any,
|
||||
etag: string | null,
|
||||
accessToken?: string,
|
||||
authConfig?: AuthConfig,
|
||||
pollOptions?: PollOptions
|
||||
) {
|
||||
let POLL_INTERVAL = 300
|
||||
let MAX_POLL_COUNT = 1000
|
||||
let MAX_ERROR_COUNT = 5
|
||||
let { access_token, refresh_token, client, secret } = authConfig || {}
|
||||
if (access_token && refresh_token) {
|
||||
if (
|
||||
client &&
|
||||
secret &&
|
||||
refresh_token &&
|
||||
(isAccessTokenExpiring(access_token) ||
|
||||
isRefreshTokenExpiring(refresh_token))
|
||||
) {
|
||||
;({ access_token, refresh_token } = await this.refreshTokens(
|
||||
client,
|
||||
secret,
|
||||
refresh_token
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
if (pollOptions) {
|
||||
POLL_INTERVAL = pollOptions.POLL_INTERVAL || POLL_INTERVAL
|
||||
@@ -1146,8 +1170,8 @@ export class SASViyaApiClient {
|
||||
'Content-Type': 'application/json',
|
||||
'If-None-Match': etag
|
||||
}
|
||||
if (accessToken) {
|
||||
headers.Authorization = `Bearer ${accessToken}`
|
||||
if (access_token) {
|
||||
headers.Authorization = `Bearer ${access_token}`
|
||||
}
|
||||
const stateLink = postedJob.links.find((l: any) => l.rel === 'state')
|
||||
if (!stateLink) {
|
||||
@@ -1157,7 +1181,7 @@ export class SASViyaApiClient {
|
||||
const { result: state } = await this.requestClient
|
||||
.get<string>(
|
||||
`${this.serverUrl}${stateLink.href}?_action=wait&wait=300`,
|
||||
accessToken,
|
||||
access_token,
|
||||
'text/plain',
|
||||
{},
|
||||
this.debug
|
||||
@@ -1185,11 +1209,27 @@ export class SASViyaApiClient {
|
||||
postedJobState === 'pending' ||
|
||||
postedJobState === 'unavailable'
|
||||
) {
|
||||
if (access_token && refresh_token) {
|
||||
if (
|
||||
client &&
|
||||
secret &&
|
||||
refresh_token &&
|
||||
(isAccessTokenExpiring(access_token) ||
|
||||
isRefreshTokenExpiring(refresh_token))
|
||||
) {
|
||||
;({ access_token, refresh_token } = await this.refreshTokens(
|
||||
client,
|
||||
secret,
|
||||
refresh_token
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
if (stateLink) {
|
||||
const { result: jobState } = await this.requestClient
|
||||
.get<string>(
|
||||
`${this.serverUrl}${stateLink.href}?_action=wait&wait=300`,
|
||||
accessToken,
|
||||
access_token,
|
||||
'text/plain',
|
||||
{},
|
||||
this.debug
|
||||
|
||||
Reference in New Issue
Block a user