diff --git a/src/SASViyaApiClient.ts b/src/SASViyaApiClient.ts index 068e780..3c1b324 100644 --- a/src/SASViyaApiClient.ts +++ b/src/SASViyaApiClient.ts @@ -1,10 +1,4 @@ -import { - convertToCSV, - isRelativePath, - isUri, - isUrl, - fetchLogByChunks -} from './utils' +import { isRelativePath, isUri, isUrl } from './utils' import * as NodeFormData from 'form-data' import { Job, @@ -17,28 +11,18 @@ import { JobDefinition, PollOptions } from './types' -import { - ComputeJobExecutionError, - JobExecutionError, - NotFoundError -} from './types/errors' -import { formatDataForRequest } from './utils/formatDataForRequest' +import { JobExecutionError } from './types/errors' import { SessionManager } from './SessionManager' import { ContextManager } from './ContextManager' -import { - timestampToYYYYMMDDHHMMSS, - isAccessTokenExpiring, - isRefreshTokenExpiring, - Logger, - LogLevel, - SasAuthResponse, - MacroVar, - AuthConfig -} from '@sasjs/utils' +import { SasAuthResponse, MacroVar, AuthConfig } from '@sasjs/utils' import { isAuthorizeFormRequired } from './auth/isAuthorizeFormRequired' import { RequestClient } from './request/RequestClient' import { prefixMessage } from '@sasjs/utils/error' import * as mime from 'mime' +import { pollJobState } from './api/viya/pollJobState' +import { getAccessToken, getTokens, refreshTokens } from './auth/tokens' +import { uploadTables } from './api/viya/uploadTables' +import { executeScript } from './api/viya/executeScript' /** * A client for interfacing with the SAS Viya REST API. @@ -174,13 +158,6 @@ export class SASViyaApiClient { throw new Error(`Execution context ${contextName} not found.`) } - const createSessionRequest = { - method: 'POST', - headers: { - Authorization: `Bearer ${accessToken}`, - 'Content-Type': 'application/json' - } - } const { result: createdSession } = await this.requestClient.post( `/compute/contexts/${executionContext.id}/sessions`, {}, @@ -295,249 +272,22 @@ export class SASViyaApiClient { printPid = false, variables?: MacroVar ): Promise { - let access_token = (authConfig || {}).access_token - if (authConfig) { - ;({ access_token } = await this.getTokens(authConfig)) - } - - const logger = process.logger || console - - try { - let executionSessionId: string - - const session = await this.sessionManager - .getSession(access_token) - .catch((err) => { - throw prefixMessage(err, 'Error while getting session. ') - }) - - executionSessionId = session!.id - - if (printPid) { - const { result: jobIdVariable } = await this.sessionManager - .getVariable(executionSessionId, 'SYSJOBID', access_token) - .catch((err) => { - throw prefixMessage(err, 'Error while getting session variable. ') - }) - - if (jobIdVariable && jobIdVariable.value) { - const relativeJobPath = this.rootFolderName - ? jobPath.split(this.rootFolderName).join('').replace(/^\//, '') - : jobPath - - const logger = new Logger(debug ? LogLevel.Debug : LogLevel.Info) - - logger.info( - `Triggered '${relativeJobPath}' with PID ${ - jobIdVariable.value - } at ${timestampToYYYYMMDDHHMMSS()}` - ) - } - } - - const jobArguments: { [key: string]: any } = { - _contextName: contextName, - _OMITJSONLISTING: true, - _OMITJSONLOG: true, - _OMITSESSIONRESULTS: true, - _OMITTEXTLISTING: true, - _OMITTEXTLOG: true - } - - if (debug) { - jobArguments['_OMITTEXTLOG'] = false - jobArguments['_OMITSESSIONRESULTS'] = false - } - - let fileName - - if (isRelativePath(jobPath)) { - fileName = `exec-${ - jobPath.includes('/') ? jobPath.split('/')[1] : jobPath - }` - } else { - const jobPathParts = jobPath.split('/') - fileName = jobPathParts.pop() - } - - let jobVariables: any = { - SYS_JES_JOB_URI: '', - _program: isRelativePath(jobPath) - ? this.rootFolderName + '/' + jobPath - : jobPath - } - - if (variables) jobVariables = { ...jobVariables, ...variables } - - if (debug) jobVariables = { ...jobVariables, _DEBUG: 131 } - - let files: any[] = [] - - if (data) { - if (JSON.stringify(data).includes(';')) { - files = await this.uploadTables(data, access_token).catch((err) => { - throw prefixMessage(err, 'Error while uploading tables. ') - }) - - jobVariables['_webin_file_count'] = files.length - - files.forEach((fileInfo, index) => { - jobVariables[ - `_webin_fileuri${index + 1}` - ] = `/files/files/${fileInfo.file.id}` - jobVariables[`_webin_name${index + 1}`] = fileInfo.tableName - }) - } else { - jobVariables = { ...jobVariables, ...formatDataForRequest(data) } - } - } - - // Execute job in session - const jobRequestBody = { - name: fileName, - description: 'Powered by SASjs', - code: linesOfCode, - variables: jobVariables, - arguments: jobArguments - } - - const { result: postedJob, etag } = await this.requestClient - .post( - `/compute/sessions/${executionSessionId}/jobs`, - jobRequestBody, - access_token - ) - .catch((err) => { - throw prefixMessage(err, 'Error while posting job. ') - }) - - if (!waitForResult) return session - - if (debug) { - logger.info(`Job has been submitted for '${fileName}'.`) - logger.info( - `You can monitor the job progress at '${this.serverUrl}${ - postedJob.links.find((l: any) => l.rel === 'state')!.href - }'.` - ) - } - - const jobStatus = await this.pollJobState( - postedJob, - etag, - authConfig, - pollOptions - ).catch(async (err) => { - const error = err?.response?.data - const result = /err=[0-9]*,/.exec(error) - - const errorCode = '5113' - if (result?.[0]?.slice(4, -1) === errorCode) { - const sessionLogUrl = - postedJob.links.find((l: any) => l.rel === 'up')!.href + '/log' - const logCount = 1000000 - err.log = await fetchLogByChunks( - this.requestClient, - access_token!, - sessionLogUrl, - logCount - ) - } - throw prefixMessage(err, 'Error while polling job status. ') - }) - - if (authConfig) { - ;({ access_token } = await this.getTokens(authConfig)) - } - - const { result: currentJob } = await this.requestClient - .get( - `/compute/sessions/${executionSessionId}/jobs/${postedJob.id}`, - access_token - ) - .catch((err) => { - throw prefixMessage(err, 'Error while getting job. ') - }) - - let jobResult - let log = '' - - const logLink = currentJob.links.find((l) => l.rel === 'log') - - if (debug && logLink) { - const logUrl = `${logLink.href}/content` - const logCount = currentJob.logStatistics?.lineCount ?? 1000000 - log = await fetchLogByChunks( - this.requestClient, - access_token!, - logUrl, - logCount - ) - } - - if (jobStatus === 'failed' || jobStatus === 'error') { - return Promise.reject(new ComputeJobExecutionError(currentJob, log)) - } - - let resultLink - - if (expectWebout) { - resultLink = `/compute/sessions/${executionSessionId}/filerefs/_webout/content` - } else { - return { job: currentJob, log } - } - - if (resultLink) { - jobResult = await this.requestClient - .get(resultLink, access_token, 'text/plain') - .catch(async (e) => { - if (e instanceof NotFoundError) { - if (logLink) { - const logUrl = `${logLink.href}/content` - const logCount = currentJob.logStatistics?.lineCount ?? 1000000 - log = await fetchLogByChunks( - this.requestClient, - access_token!, - logUrl, - logCount - ) - - return Promise.reject({ - status: 500, - log - }) - } - } - - return { - result: JSON.stringify(e) - } - }) - } - - await this.sessionManager - .clearSession(executionSessionId, access_token) - .catch((err) => { - throw prefixMessage(err, 'Error while clearing session. ') - }) - - return { result: jobResult?.result, log } - } catch (e) { - if (e && e.status === 404) { - return this.executeScript( - jobPath, - linesOfCode, - contextName, - authConfig, - data, - debug, - false, - true - ) - } else { - throw prefixMessage(e, 'Error while executing script. ') - } - } + return executeScript( + this.requestClient, + this.sessionManager, + this.rootFolderName, + jobPath, + linesOfCode, + contextName, + authConfig, + data, + debug, + expectWebout, + waitForResult, + pollOptions, + printPid, + variables + ) } /** @@ -772,37 +522,7 @@ export class SASViyaApiClient { clientSecret: string, authCode: string ): Promise { - const url = this.serverUrl + '/SASLogon/oauth/token' - let token - if (typeof Buffer === 'undefined') { - token = btoa(clientId + ':' + clientSecret) - } else { - token = Buffer.from(clientId + ':' + clientSecret).toString('base64') - } - const headers = { - Authorization: 'Basic ' + token - } - - let formData - if (typeof FormData === 'undefined') { - formData = new NodeFormData() - } else { - formData = new FormData() - } - formData.append('grant_type', 'authorization_code') - formData.append('code', authCode) - - const authResponse = await this.requestClient - .post( - url, - formData, - undefined, - 'multipart/form-data; boundary=' + (formData as any)._boundary, - headers - ) - .then((res) => res.result as SasAuthResponse) - - return authResponse + return getAccessToken(this.requestClient, clientId, clientSecret, authCode) } /** @@ -816,39 +536,12 @@ export class SASViyaApiClient { clientSecret: string, refreshToken: string ) { - const url = this.serverUrl + '/SASLogon/oauth/token' - let token - if (typeof Buffer === 'undefined') { - token = btoa(clientId + ':' + clientSecret) - } else { - token = Buffer.from(clientId + ':' + clientSecret).toString('base64') - } - const headers = { - Authorization: 'Basic ' + token - } - - let formData - if (typeof FormData === 'undefined') { - formData = new NodeFormData() - formData.append('grant_type', 'refresh_token') - formData.append('refresh_token', refreshToken) - } else { - formData = new FormData() - formData.append('grant_type', 'refresh_token') - formData.append('refresh_token', refreshToken) - } - - const authResponse = await this.requestClient - .post( - url, - formData, - undefined, - 'multipart/form-data; boundary=' + (formData as any)._boundary, - headers - ) - .then((res) => res.result) - - return authResponse + return refreshTokens( + this.requestClient, + clientId, + clientSecret, + refreshToken + ) } /** @@ -895,7 +588,7 @@ export class SASViyaApiClient { ) { let access_token = (authConfig || {}).access_token if (authConfig) { - ;({ access_token } = await this.getTokens(authConfig)) + ;({ access_token } = await getTokens(this.requestClient, authConfig)) } if (isRelativePath(sasJob) && !this.rootFolderName) { @@ -991,7 +684,7 @@ export class SASViyaApiClient { ) { let access_token = (authConfig || {}).access_token if (authConfig) { - ;({ access_token } = await this.getTokens(authConfig)) + ;({ access_token } = await getTokens(this.requestClient, authConfig)) } if (isRelativePath(sasJob) && !this.rootFolderName) { throw new Error( @@ -1140,157 +833,24 @@ export class SASViyaApiClient { this.folderMap.set(path, itemsAtRoot) } - // REFACTOR: set default value for 'pollOptions' attribute private async pollJobState( - postedJob: any, + postedJob: Job, etag: string | null, authConfig?: AuthConfig, pollOptions?: PollOptions ) { - const logger = process.logger || console - - let POLL_INTERVAL = 300 - let MAX_POLL_COUNT = 1000 - let MAX_ERROR_COUNT = 5 - let access_token = (authConfig || {}).access_token - if (authConfig) { - ;({ access_token } = await this.getTokens(authConfig)) - } - - if (pollOptions) { - POLL_INTERVAL = pollOptions.POLL_INTERVAL || POLL_INTERVAL - MAX_POLL_COUNT = pollOptions.MAX_POLL_COUNT || MAX_POLL_COUNT - } - - let postedJobState = '' - let pollCount = 0 - let errorCount = 0 - const headers: any = { - 'Content-Type': 'application/json', - 'If-None-Match': etag - } - if (access_token) { - headers.Authorization = `Bearer ${access_token}` - } - const stateLink = postedJob.links.find((l: any) => l.rel === 'state') - if (!stateLink) { - Promise.reject(`Job state link was not found.`) - } - - const { result: state } = await this.requestClient - .get( - `${this.serverUrl}${stateLink.href}?_action=wait&wait=300`, - access_token, - 'text/plain', - {}, - this.debug - ) - .catch((err) => { - console.error( - `Error fetching job state from ${this.serverUrl}${stateLink.href}. Starting poll, assuming job to be running.`, - err - ) - return { result: 'unavailable' } - }) - - const currentState = state.trim() - if (currentState === 'completed') { - return Promise.resolve(currentState) - } - - return new Promise(async (resolve, _) => { - let printedState = '' - - const interval = setInterval(async () => { - if ( - postedJobState === 'running' || - postedJobState === '' || - postedJobState === 'pending' || - postedJobState === 'unavailable' - ) { - if (authConfig) { - ;({ access_token } = await this.getTokens(authConfig)) - } - - if (stateLink) { - const { result: jobState } = await this.requestClient - .get( - `${this.serverUrl}${stateLink.href}?_action=wait&wait=300`, - access_token, - 'text/plain', - {}, - this.debug - ) - .catch((err) => { - errorCount++ - if ( - pollCount >= MAX_POLL_COUNT || - errorCount >= MAX_ERROR_COUNT - ) { - throw prefixMessage( - err, - 'Error while getting job state after interval. ' - ) - } - console.error( - `Error fetching job state from ${this.serverUrl}${stateLink.href}. Resuming poll, assuming job to be running.`, - err - ) - return { result: 'unavailable' } - }) - - postedJobState = jobState.trim() - if (postedJobState != 'unavailable' && errorCount > 0) { - errorCount = 0 - } - - if (this.debug && printedState !== postedJobState) { - logger.info('Polling job status...') - logger.info(`Current job state: ${postedJobState}`) - - printedState = postedJobState - } - - pollCount++ - - if (pollCount >= MAX_POLL_COUNT) { - resolve(postedJobState) - } - } - } else { - clearInterval(interval) - resolve(postedJobState) - } - }, POLL_INTERVAL) - }) + return pollJobState( + this.requestClient, + postedJob, + this.debug, + etag, + authConfig, + pollOptions + ) } private async uploadTables(data: any, accessToken?: string) { - const uploadedFiles = [] - const headers: any = { - 'Content-Type': 'application/json' - } - if (accessToken) { - headers.Authorization = `Bearer ${accessToken}` - } - - for (const tableName in data) { - const csv = convertToCSV(data[tableName]) - if (csv === 'ERROR: LARGE STRING LENGTH') { - throw new Error( - 'The max length of a string value in SASjs is 32765 characters.' - ) - } - - const uploadResponse = await this.requestClient - .uploadFile(`${this.serverUrl}/files/files#rawUpload`, csv, accessToken) - .catch((err) => { - throw prefixMessage(err, 'Error while uploading file. ') - }) - - uploadedFiles.push({ tableName, file: uploadResponse.result }) - } - return uploadedFiles + return uploadTables(this.requestClient, data, accessToken) } private async getFolderDetails( @@ -1493,21 +1053,4 @@ export class SASViyaApiClient { return movedFolder } - - private async getTokens(authConfig: AuthConfig): Promise { - const logger = process.logger || console - let { access_token, refresh_token, client, secret } = authConfig - if ( - isAccessTokenExpiring(access_token) || - isRefreshTokenExpiring(refresh_token) - ) { - logger.info('Refreshing access and refresh tokens.') - ;({ access_token, refresh_token } = await this.refreshTokens( - client, - secret, - refresh_token - )) - } - return { access_token, refresh_token, client, secret } - } } diff --git a/src/api/viya/executeScript.ts b/src/api/viya/executeScript.ts new file mode 100644 index 0000000..0ea25c6 --- /dev/null +++ b/src/api/viya/executeScript.ts @@ -0,0 +1,303 @@ +import { + AuthConfig, + MacroVar, + Logger, + LogLevel, + timestampToYYYYMMDDHHMMSS +} from '@sasjs/utils' +import { prefixMessage } from '@sasjs/utils/error' +import { + PollOptions, + Job, + ComputeJobExecutionError, + NotFoundError +} from '../..' +import { getTokens } from '../../auth/tokens' +import { RequestClient } from '../../request/RequestClient' +import { SessionManager } from '../../SessionManager' +import { isRelativePath, fetchLogByChunks } from '../../utils' +import { formatDataForRequest } from '../../utils/formatDataForRequest' +import { pollJobState } from './pollJobState' +import { uploadTables } from './uploadTables' + +/** + * Executes code on the current SAS Viya server. + * @param jobPath - the path to the file being submitted for execution. + * @param linesOfCode - an array of code lines to execute. + * @param contextName - the context to execute the code in. + * @param authConfig - an object containing an access token, refresh token, client ID and secret. + * @param data - execution data. + * @param debug - when set to true, the log will be returned. + * @param expectWebout - when set to true, the automatic _webout fileref will be checked for content, and that content returned. This fileref is used when the Job contains a SASjs web request (as opposed to executing arbitrary SAS code). + * @param waitForResult - when set to true, function will return the session + * @param pollOptions - an object that represents poll interval(milliseconds) and maximum amount of attempts. Object example: { MAX_POLL_COUNT: 24 * 60 * 60, POLL_INTERVAL: 1000 }. + * @param printPid - a boolean that indicates whether the function should print (PID) of the started job. + * @param variables - an object that represents macro variables. + */ +export async function executeScript( + requestClient: RequestClient, + sessionManager: SessionManager, + rootFolderName: string, + jobPath: string, + linesOfCode: string[], + contextName: string, + authConfig?: AuthConfig, + data = null, + debug: boolean = false, + expectWebout = false, + waitForResult = true, + pollOptions?: PollOptions, + printPid = false, + variables?: MacroVar +): Promise { + let access_token = (authConfig || {}).access_token + if (authConfig) { + ;({ access_token } = await getTokens(requestClient, authConfig)) + } + + const logger = process.logger || console + + try { + let executionSessionId: string + + const session = await sessionManager + .getSession(access_token) + .catch((err) => { + throw prefixMessage(err, 'Error while getting session. ') + }) + + executionSessionId = session!.id + + if (printPid) { + const { result: jobIdVariable } = await sessionManager + .getVariable(executionSessionId, 'SYSJOBID', access_token) + .catch((err) => { + throw prefixMessage(err, 'Error while getting session variable. ') + }) + + if (jobIdVariable && jobIdVariable.value) { + const relativeJobPath = rootFolderName + ? jobPath.split(rootFolderName).join('').replace(/^\//, '') + : jobPath + + const logger = new Logger(debug ? LogLevel.Debug : LogLevel.Info) + + logger.info( + `Triggered '${relativeJobPath}' with PID ${ + jobIdVariable.value + } at ${timestampToYYYYMMDDHHMMSS()}` + ) + } + } + + const jobArguments: { [key: string]: any } = { + _contextName: contextName, + _OMITJSONLISTING: true, + _OMITJSONLOG: true, + _OMITSESSIONRESULTS: true, + _OMITTEXTLISTING: true, + _OMITTEXTLOG: true + } + + if (debug) { + jobArguments['_OMITTEXTLOG'] = false + jobArguments['_OMITSESSIONRESULTS'] = false + } + + let fileName + + if (isRelativePath(jobPath)) { + fileName = `exec-${ + jobPath.includes('/') ? jobPath.split('/')[1] : jobPath + }` + } else { + const jobPathParts = jobPath.split('/') + fileName = jobPathParts.pop() + } + + let jobVariables: any = { + SYS_JES_JOB_URI: '', + _program: isRelativePath(jobPath) + ? rootFolderName + '/' + jobPath + : jobPath + } + + if (variables) jobVariables = { ...jobVariables, ...variables } + + if (debug) jobVariables = { ...jobVariables, _DEBUG: 131 } + + let files: any[] = [] + + if (data) { + if (JSON.stringify(data).includes(';')) { + files = await uploadTables(requestClient, data, access_token).catch( + (err) => { + throw prefixMessage(err, 'Error while uploading tables. ') + } + ) + + jobVariables['_webin_file_count'] = files.length + + files.forEach((fileInfo, index) => { + jobVariables[ + `_webin_fileuri${index + 1}` + ] = `/files/files/${fileInfo.file.id}` + jobVariables[`_webin_name${index + 1}`] = fileInfo.tableName + }) + } else { + jobVariables = { ...jobVariables, ...formatDataForRequest(data) } + } + } + + // Execute job in session + const jobRequestBody = { + name: fileName, + description: 'Powered by SASjs', + code: linesOfCode, + variables: jobVariables, + arguments: jobArguments + } + + const { result: postedJob, etag } = await requestClient + .post( + `/compute/sessions/${executionSessionId}/jobs`, + jobRequestBody, + access_token + ) + .catch((err) => { + throw prefixMessage(err, 'Error while posting job. ') + }) + + if (!waitForResult) return session + + if (debug) { + logger.info(`Job has been submitted for '${fileName}'.`) + logger.info( + `You can monitor the job progress at '${requestClient.getBaseUrl()}${ + postedJob.links.find((l: any) => l.rel === 'state')!.href + }'.` + ) + } + + const jobStatus = await pollJobState( + requestClient, + postedJob, + debug, + etag, + authConfig, + pollOptions + ).catch(async (err) => { + const error = err?.response?.data + const result = /err=[0-9]*,/.exec(error) + + const errorCode = '5113' + if (result?.[0]?.slice(4, -1) === errorCode) { + const sessionLogUrl = + postedJob.links.find((l: any) => l.rel === 'up')!.href + '/log' + const logCount = 1000000 + err.log = await fetchLogByChunks( + requestClient, + access_token!, + sessionLogUrl, + logCount + ) + } + throw prefixMessage(err, 'Error while polling job status. ') + }) + + if (authConfig) { + ;({ access_token } = await getTokens(requestClient, authConfig)) + } + + const { result: currentJob } = await requestClient + .get( + `/compute/sessions/${executionSessionId}/jobs/${postedJob.id}`, + access_token + ) + .catch((err) => { + throw prefixMessage(err, 'Error while getting job. ') + }) + + let jobResult + let log = '' + + const logLink = currentJob.links.find((l) => l.rel === 'log') + + if (debug && logLink) { + const logUrl = `${logLink.href}/content` + const logCount = currentJob.logStatistics?.lineCount ?? 1000000 + log = await fetchLogByChunks( + requestClient, + access_token!, + logUrl, + logCount + ) + } + + if (jobStatus === 'failed' || jobStatus === 'error') { + return Promise.reject(new ComputeJobExecutionError(currentJob, log)) + } + + let resultLink + + if (expectWebout) { + resultLink = `/compute/sessions/${executionSessionId}/filerefs/_webout/content` + } else { + return { job: currentJob, log } + } + + if (resultLink) { + jobResult = await requestClient + .get(resultLink, access_token, 'text/plain') + .catch(async (e) => { + if (e instanceof NotFoundError) { + if (logLink) { + const logUrl = `${logLink.href}/content` + const logCount = currentJob.logStatistics?.lineCount ?? 1000000 + log = await fetchLogByChunks( + requestClient, + access_token!, + logUrl, + logCount + ) + + return Promise.reject({ + status: 500, + log + }) + } + } + + return { + result: JSON.stringify(e) + } + }) + } + + await sessionManager + .clearSession(executionSessionId, access_token) + .catch((err) => { + throw prefixMessage(err, 'Error while clearing session. ') + }) + + return { result: jobResult?.result, log } + } catch (e) { + if (e && e.status === 404) { + return executeScript( + requestClient, + sessionManager, + rootFolderName, + jobPath, + linesOfCode, + contextName, + authConfig, + data, + debug, + false, + true + ) + } else { + throw prefixMessage(e, 'Error while executing script. ') + } + } +} diff --git a/src/api/viya/pollJobState.ts b/src/api/viya/pollJobState.ts new file mode 100644 index 0000000..1bd85f8 --- /dev/null +++ b/src/api/viya/pollJobState.ts @@ -0,0 +1,178 @@ +import { AuthConfig, createFile, generateTimestamp } from '@sasjs/utils' +import { prefixMessage } from '@sasjs/utils/error' +import { Job, PollOptions } from '../..' +import { getTokens } from '../../auth/tokens' +import { RequestClient } from '../../request/RequestClient' +import { fetchLogByChunks } from '../../utils' + +export async function pollJobState( + requestClient: RequestClient, + postedJob: Job, + debug: boolean, + etag: string | null, + authConfig?: AuthConfig, + pollOptions?: PollOptions +) { + const logger = process.logger || console + + let POLL_INTERVAL = 300 + let MAX_POLL_COUNT = 1000 + let MAX_ERROR_COUNT = 5 + let access_token = (authConfig || {}).access_token + if (authConfig) { + ;({ access_token } = await getTokens(requestClient, authConfig)) + } + + if (pollOptions) { + POLL_INTERVAL = pollOptions.pollInterval || POLL_INTERVAL + MAX_POLL_COUNT = pollOptions.maxPollCount || MAX_POLL_COUNT + } + + let postedJobState = '' + let pollCount = 0 + let errorCount = 0 + const headers: any = { + 'Content-Type': 'application/json', + 'If-None-Match': etag + } + if (access_token) { + headers.Authorization = `Bearer ${access_token}` + } + const stateLink = postedJob.links.find((l: any) => l.rel === 'state') + if (!stateLink) { + return Promise.reject(`Job state link was not found.`) + } + + const { result: state } = await requestClient + .get( + `${stateLink.href}?_action=wait&wait=300`, + access_token, + 'text/plain', + {}, + debug + ) + .catch((err) => { + logger.error( + `Error fetching job state from ${stateLink.href}. Starting poll, assuming job to be running.`, + err + ) + return { result: 'unavailable' } + }) + + const currentState = state.trim() + if (currentState === 'completed') { + return Promise.resolve(currentState) + } + + return new Promise(async (resolve, _) => { + let printedState = '' + + const interval = setInterval(async () => { + if ( + postedJobState === 'running' || + postedJobState === '' || + postedJobState === 'pending' || + postedJobState === 'unavailable' + ) { + if (authConfig) { + ;({ access_token } = await getTokens(requestClient, authConfig)) + } + + if (stateLink) { + const { result: jobState } = await requestClient + .get( + `${stateLink.href}?_action=wait&wait=300`, + access_token, + 'text/plain', + {}, + debug + ) + .catch((err) => { + errorCount++ + if ( + pollCount >= MAX_POLL_COUNT || + errorCount >= MAX_ERROR_COUNT + ) { + throw prefixMessage( + err, + 'Error while getting job state after interval. ' + ) + } + logger.error( + `Error fetching job state from ${stateLink.href}. Resuming poll, assuming job to be running.`, + err + ) + return { result: 'unavailable' } + }) + + postedJobState = jobState.trim() + if (postedJobState != 'unavailable' && errorCount > 0) { + errorCount = 0 + } + + if (debug && printedState !== postedJobState) { + logger.info('Polling job status...') + logger.info(`Current job state: ${postedJobState}`) + + printedState = postedJobState + } + + pollCount++ + + await saveLog( + postedJob, + requestClient, + pollOptions?.streamLog || false, + pollOptions?.logFilePath, + access_token + ) + + if (pollCount >= MAX_POLL_COUNT) { + resolve(postedJobState) + } + } + } else { + clearInterval(interval) + resolve(postedJobState) + } + }, POLL_INTERVAL) + }) +} + +async function saveLog( + job: Job, + requestClient: RequestClient, + shouldSaveLog: boolean, + logFilePath?: string, + accessToken?: string +) { + if (!shouldSaveLog) { + return + } + + if (!accessToken) { + throw new Error( + `Logs for job ${job.id} cannot be fetched without a valid access token.` + ) + } + + const logger = process.logger || console + const logFileName = `${job.name || 'job'}-${generateTimestamp()}.log` + const logPath = `${logFilePath || process.cwd()}/${logFileName}` + const jobLogUrl = job.links.find((l) => l.rel === 'log') + + if (!jobLogUrl) { + throw new Error(`Log URL for job ${job.id} was not found.`) + } + + const logCount = job.logStatistics?.lineCount ?? 1000000 + const log = await fetchLogByChunks( + requestClient, + accessToken, + `${jobLogUrl.href}/content`, + logCount + ) + + logger.info(`Writing logs to ${logPath}`) + await createFile(logPath, log) +} diff --git a/src/api/viya/uploadTables.ts b/src/api/viya/uploadTables.ts new file mode 100644 index 0000000..13bf7b8 --- /dev/null +++ b/src/api/viya/uploadTables.ts @@ -0,0 +1,35 @@ +import { prefixMessage } from '@sasjs/utils/error' +import { RequestClient } from '../../request/RequestClient' +import { convertToCSV } from '../../utils/convertToCsv' + +export async function uploadTables( + requestClient: RequestClient, + data: any, + accessToken?: string +) { + const uploadedFiles = [] + const headers: any = { + 'Content-Type': 'application/json' + } + if (accessToken) { + headers.Authorization = `Bearer ${accessToken}` + } + + for (const tableName in data) { + const csv = convertToCSV(data[tableName]) + if (csv === 'ERROR: LARGE STRING LENGTH') { + throw new Error( + 'The max length of a string value in SASjs is 32765 characters.' + ) + } + + const uploadResponse = await requestClient + .uploadFile(`/files/files#rawUpload`, csv, accessToken) + .catch((err) => { + throw prefixMessage(err, 'Error while uploading file. ') + }) + + uploadedFiles.push({ tableName, file: uploadResponse.result }) + } + return uploadedFiles +} diff --git a/src/auth/tokens.ts b/src/auth/tokens.ts new file mode 100644 index 0000000..bc940e2 --- /dev/null +++ b/src/auth/tokens.ts @@ -0,0 +1,122 @@ +import { + AuthConfig, + isAccessTokenExpiring, + isRefreshTokenExpiring, + SasAuthResponse +} from '@sasjs/utils' +import * as NodeFormData from 'form-data' +import { RequestClient } from '../request/RequestClient' + +/** + * Exchanges the auth code for an access token for the given client. + * @param requestClient - the pre-configured HTTP request client + * @param clientId - the client ID to authenticate with. + * @param clientSecret - the client secret to authenticate with. + * @param authCode - the auth code received from the server. + */ +export async function getAccessToken( + requestClient: RequestClient, + clientId: string, + clientSecret: string, + authCode: string +): Promise { + const url = '/SASLogon/oauth/token' + let token + if (typeof Buffer === 'undefined') { + token = btoa(clientId + ':' + clientSecret) + } else { + token = Buffer.from(clientId + ':' + clientSecret).toString('base64') + } + const headers = { + Authorization: 'Basic ' + token + } + + let formData + if (typeof FormData === 'undefined') { + formData = new NodeFormData() + } else { + formData = new FormData() + } + formData.append('grant_type', 'authorization_code') + formData.append('code', authCode) + + const authResponse = await requestClient + .post( + url, + formData, + undefined, + 'multipart/form-data; boundary=' + (formData as any)._boundary, + headers + ) + .then((res) => res.result as SasAuthResponse) + + return authResponse +} + +/** + * Returns the auth configuration, refreshing the tokens if necessary. + * @param requestClient - the pre-configured HTTP request client + * @param authConfig - an object containing a client ID, secret, access token and refresh token + */ +export async function getTokens( + requestClient: RequestClient, + authConfig: AuthConfig +): Promise { + const logger = process.logger || console + let { access_token, refresh_token, client, secret } = authConfig + if ( + isAccessTokenExpiring(access_token) || + isRefreshTokenExpiring(refresh_token) + ) { + logger.info('Refreshing access and refresh tokens.') + ;({ access_token, refresh_token } = await refreshTokens( + requestClient, + client, + secret, + refresh_token + )) + } + return { access_token, refresh_token, client, secret } +} + +/** + * Exchanges the refresh token for an access token for the given client. + * @param requestClient - the pre-configured HTTP request client + * @param clientId - the client ID to authenticate with. + * @param clientSecret - the client secret to authenticate with. + * @param authCode - the refresh token received from the server. + */ +export async function refreshTokens( + requestClient: RequestClient, + clientId: string, + clientSecret: string, + refreshToken: string +) { + const url = '/SASLogon/oauth/token' + let token + if (typeof Buffer === 'undefined') { + token = btoa(clientId + ':' + clientSecret) + } else { + token = Buffer.from(clientId + ':' + clientSecret).toString('base64') + } + const headers = { + Authorization: 'Basic ' + token + } + + const formData = + typeof FormData === 'undefined' ? new NodeFormData() : new FormData() + formData.append('grant_type', 'refresh_token') + formData.append('refresh_token', refreshToken) + + const authResponse = await requestClient + .post( + url, + formData, + undefined, + 'multipart/form-data; boundary=' + (formData as any)._boundary, + headers + ) + .then((res) => res.result) + + return authResponse +} diff --git a/src/request/RequestClient.ts b/src/request/RequestClient.ts index eb6aac2..2701986 100644 --- a/src/request/RequestClient.ts +++ b/src/request/RequestClient.ts @@ -43,6 +43,7 @@ export interface HttpClient { getCsrfToken(type: 'general' | 'file'): CsrfToken | undefined clearCsrfTokens(): void + getBaseUrl(): string } export class RequestClient implements HttpClient { @@ -78,6 +79,10 @@ export class RequestClient implements HttpClient { this.fileUploadCsrfToken = { headerName: '', value: '' } } + public getBaseUrl() { + return this.httpClient.defaults.baseURL || '' + } + public async get( url: string, accessToken: string | undefined, diff --git a/src/types/PollOptions.ts b/src/types/PollOptions.ts index 0472f0c..6daa1b7 100644 --- a/src/types/PollOptions.ts +++ b/src/types/PollOptions.ts @@ -1,4 +1,6 @@ export interface PollOptions { - MAX_POLL_COUNT?: number - POLL_INTERVAL?: number + maxPollCount: number + pollInterval: number + streamLog: boolean + logFilePath?: string }