1
0
mirror of https://github.com/sasjs/adapter.git synced 2026-01-09 13:30:04 +00:00

fix(*): separate job execution code from main SASjs class

This commit is contained in:
Krishna Acondy
2021-01-27 20:30:13 +00:00
parent e0d85f458b
commit d7ecaf5932
18 changed files with 798 additions and 714 deletions

View File

@@ -1,30 +1,16 @@
import {
convertToCSV,
compareTimestamps,
splitChunks,
parseSourceCode,
parseGeneratedCode,
parseWeboutResponse,
needsRetry,
asyncForEach,
isRelativePath
} from './utils'
import {
SASjsConfig,
SASjsRequest,
SASjsWaitingRequest,
CsrfToken,
UploadFile,
EditContextInput,
ErrorResponse,
PollOptions
} from './types'
import { compareTimestamps, asyncForEach } from './utils'
import { SASjsConfig, UploadFile, EditContextInput, PollOptions } from './types'
import { SASViyaApiClient } from './SASViyaApiClient'
import { SAS9ApiClient } from './SAS9ApiClient'
import { FileUploader } from './FileUploader'
import { isLogInRequired, AuthManager } from './auth'
import { AuthManager } from './auth'
import { ServerType } from '@sasjs/utils/types'
import { RequestClient } from './request/client'
import { RequestClient } from './request/RequestClient'
import {
JobExecutor,
WebJobExecutor,
ComputeJobExecutor
} from './job-execution'
const defaultConfig: SASjsConfig = {
serverUrl: '',
@@ -37,8 +23,6 @@ const defaultConfig: SASjsConfig = {
useComputeApi: false
}
const requestRetryLimit = 5
/**
* SASjs is a JavaScript adapter for SAS.
*
@@ -46,17 +30,14 @@ const requestRetryLimit = 5
export default class SASjs {
private sasjsConfig: SASjsConfig = new SASjsConfig()
private jobsPath: string = ''
private csrfTokenWeb: CsrfToken | null = null
private retryCountWeb: number = 0
private retryCountComputeApi: number = 0
private retryCountJeseApi: number = 0
private sasjsRequests: SASjsRequest[] = []
private sasjsWaitingRequests: SASjsWaitingRequest[] = []
private sasViyaApiClient: SASViyaApiClient | null = null
private sas9ApiClient: SAS9ApiClient | null = null
private fileUploader: FileUploader | null = null
private authManager: AuthManager | null = null
private requestClient: RequestClient | null = null
private webJobExecutor: JobExecutor | null = null
private computeJobExecutor: JobExecutor | null = null
private jesJobExecutor: JobExecutor | null = null
constructor(config?: any) {
this.sasjsConfig = {
@@ -260,6 +241,11 @@ export default class SASjs {
debug?: boolean
) {
this.isMethodSupported('executeScriptSASViya', ServerType.SasViya)
if (!contextName) {
throw new Error(
'Context name is undefined. Please set a `contextName` in your SASjs or override config.'
)
}
return await this.sasViyaApiClient!.executeScript(
fileName,
@@ -514,8 +500,6 @@ export default class SASjs {
loginRequiredCallback?: any,
accessToken?: string
) {
let requestResponse
config = {
...this.sasjsConfig,
...config
@@ -523,36 +507,30 @@ export default class SASjs {
if (config.serverType === ServerType.SasViya && config.contextName) {
if (config.useComputeApi) {
requestResponse = await this.executeJobViaComputeApi(
return await this.computeJobExecutor!.execute(
sasJob,
data,
config,
loginRequiredCallback,
accessToken
)
this.retryCountComputeApi = 0
} else {
requestResponse = await this.executeJobViaJesApi(
return await this.jesJobExecutor!.execute(
sasJob,
data,
config,
loginRequiredCallback,
accessToken
)
this.retryCountJeseApi = 0
}
} else {
requestResponse = await this.executeJobViaWeb(
return await this.webJobExecutor!.execute(
sasJob,
data,
config,
loginRequiredCallback
)
}
return requestResponse
}
/**
@@ -674,576 +652,10 @@ export default class SASjs {
)
}
private async executeJobViaComputeApi(
sasJob: string,
data: any,
config: any,
loginRequiredCallback?: any,
accessToken?: string
) {
const sasjsWaitingRequest: SASjsWaitingRequest = {
requestPromise: {
promise: null,
resolve: null,
reject: null
},
SASjob: sasJob,
data
}
sasjsWaitingRequest.requestPromise.promise = new Promise(
async (resolve, reject) => {
const waitForResult = true
const expectWebout = true
this.sasViyaApiClient
?.executeComputeJob(
sasJob,
config.contextName,
config.debug,
data,
accessToken,
waitForResult,
expectWebout
)
.then((response) => {
if (!config.debug) {
this.appendSasjsRequest(null, sasJob, null)
} else {
this.appendSasjsRequest(response, sasJob, null)
}
let responseJson
try {
if (typeof response!.result === 'string') {
responseJson = JSON.parse(response!.result)
} else {
responseJson = response!.result
}
} catch {
responseJson = JSON.parse(parseWeboutResponse(response!.result))
}
resolve(responseJson)
})
.catch(async (response) => {
let error = response.error || response
if (needsRetry(JSON.stringify(error))) {
if (this.retryCountComputeApi < requestRetryLimit) {
let retryResponse = await this.executeJobViaComputeApi(
sasJob,
data,
config,
loginRequiredCallback,
accessToken
)
this.retryCountComputeApi++
resolve(retryResponse)
} else {
this.retryCountComputeApi = 0
reject(
new ErrorResponse('Compute API retry requests limit reached.')
)
}
}
if (response?.log) {
this.appendSasjsRequest(response.log, sasJob, null)
}
if (error.toString().includes('Job was not found')) {
reject(
new ErrorResponse('Service not found on the server.', {
sasJob: sasJob
})
)
}
if (error && error.status === 401) {
if (loginRequiredCallback) loginRequiredCallback(true)
sasjsWaitingRequest.requestPromise.resolve = resolve
sasjsWaitingRequest.requestPromise.reject = reject
sasjsWaitingRequest.config = config
this.sasjsWaitingRequests.push(sasjsWaitingRequest)
} else {
reject(new ErrorResponse('Job execution failed.', error))
}
})
}
)
return sasjsWaitingRequest.requestPromise.promise
}
private async executeJobViaJesApi(
sasJob: string,
data: any,
config: any,
loginRequiredCallback?: any,
accessToken?: string
) {
const sasjsWaitingRequest: SASjsWaitingRequest = {
requestPromise: {
promise: null,
resolve: null,
reject: null
},
SASjob: sasJob,
data
}
sasjsWaitingRequest.requestPromise.promise = new Promise(
async (resolve, reject) => {
const session = await this.checkSession()
if (!session.isLoggedIn && !accessToken) {
if (loginRequiredCallback) loginRequiredCallback(true)
sasjsWaitingRequest.requestPromise.resolve = resolve
sasjsWaitingRequest.requestPromise.reject = reject
sasjsWaitingRequest.config = config
this.sasjsWaitingRequests.push(sasjsWaitingRequest)
} else {
resolve(
await this.sasViyaApiClient
?.executeJob(
sasJob,
config.contextName,
config.debug,
data,
accessToken
)
.then((response) => {
if (!config.debug) {
this.appendSasjsRequest(null, sasJob, null)
} else {
this.appendSasjsRequest(response, sasJob, null)
}
let responseJson
try {
if (typeof response!.result === 'string') {
responseJson = JSON.parse(response!.result)
} else {
responseJson = response!.result
}
} catch {
responseJson = JSON.parse(
parseWeboutResponse(response!.result)
)
}
return responseJson
})
.catch(async (response) => {
if (needsRetry(JSON.stringify(response))) {
if (this.retryCountJeseApi < requestRetryLimit) {
let retryResponse = await this.executeJobViaJesApi(
sasJob,
data,
config,
loginRequiredCallback,
accessToken
)
this.retryCountJeseApi++
resolve(retryResponse)
} else {
this.retryCountJeseApi = 0
reject(
new ErrorResponse('Jes API retry requests limit reached.')
)
}
}
if (response?.log) {
this.appendSasjsRequest(response.log, sasJob, null)
}
if (response.toString().includes('Job was not found')) {
reject(
new ErrorResponse('Service not found on the server.', {
sasJob: sasJob
})
)
}
reject(new ErrorResponse('Job execution failed.', response))
})
)
}
}
)
return sasjsWaitingRequest.requestPromise.promise
}
private async executeJobViaWeb(
sasJob: string,
data: any,
config: any,
loginRequiredCallback?: any
) {
const sasjsWaitingRequest: SASjsWaitingRequest = {
requestPromise: {
promise: null,
resolve: null,
reject: null
},
SASjob: sasJob,
data
}
const program = isRelativePath(sasJob)
? config.appLoc
? config.appLoc.replace(/\/?$/, '/') + sasJob.replace(/^\//, '')
: sasJob
: sasJob
const jobUri =
config.serverType === ServerType.SasViya
? await this.getJobUri(sasJob)
: ''
const apiUrl = `${config.serverUrl}${this.jobsPath}/?${
jobUri.length > 0
? '__program=' + program + '&_job=' + jobUri
: '_program=' + program
}`
const requestParams = {
...this.getRequestParamsWeb(config)
}
const formData = new FormData()
let isError = false
let errorMsg = ''
if (data) {
const stringifiedData = JSON.stringify(data)
if (
config.serverType === ServerType.Sas9 ||
stringifiedData.length > 500000 ||
stringifiedData.includes(';')
) {
// file upload approach
for (const tableName in data) {
if (isError) {
return
}
const name = tableName
const csv = convertToCSV(data[tableName])
if (csv === 'ERROR: LARGE STRING LENGTH') {
isError = true
errorMsg =
'The max length of a string value in SASjs is 32765 characters.'
}
const file = new Blob([csv], {
type: 'application/csv'
})
formData.append(name, file, `${name}.csv`)
}
} else {
// param based approach
const sasjsTables = []
let tableCounter = 0
for (const tableName in data) {
if (isError) {
return
}
tableCounter++
sasjsTables.push(tableName)
const csv = convertToCSV(data[tableName])
if (csv === 'ERROR: LARGE STRING LENGTH') {
isError = true
errorMsg =
'The max length of a string value in SASjs is 32765 characters.'
}
// if csv has length more then 16k, send in chunks
if (csv.length > 16000) {
const csvChunks = splitChunks(csv)
// append chunks to form data with same key
csvChunks.map((chunk) => {
formData.append(`sasjs${tableCounter}data`, chunk)
})
} else {
requestParams[`sasjs${tableCounter}data`] = csv
}
}
requestParams['sasjs_tables'] = sasjsTables.join(' ')
}
}
for (const key in requestParams) {
if (requestParams.hasOwnProperty(key)) {
formData.append(key, requestParams[key])
}
}
let isRedirected = false
sasjsWaitingRequest.requestPromise.promise = new Promise(
(resolve, reject) => {
if (isError) {
reject(new ErrorResponse(errorMsg))
}
const headers: any = {}
if (this.csrfTokenWeb) {
headers[this.csrfTokenWeb.headerName] = this.csrfTokenWeb.value
}
fetch(apiUrl, {
method: 'POST',
body: formData,
referrerPolicy: 'same-origin',
headers
})
.then(async (response) => {
if (!response.ok) {
if (response.status === 403) {
const tokenHeader = response.headers.get('X-CSRF-HEADER')
if (tokenHeader) {
const token = response.headers.get(tokenHeader)
this.csrfTokenWeb = {
headerName: tokenHeader,
value: token || ''
}
}
}
}
if (response.redirected && config.serverType === ServerType.Sas9) {
isRedirected = true
}
return response.text()
})
.then((responseText) => {
if (
(needsRetry(responseText) || isRedirected) &&
!isLogInRequired(responseText)
) {
if (this.retryCountWeb < requestRetryLimit) {
this.retryCountWeb++
this.request(sasJob, data, config, loginRequiredCallback).then(
(res: any) => resolve(res),
(err: any) => reject(err)
)
} else {
this.retryCountWeb = 0
reject(responseText)
}
} else {
this.retryCountWeb = 0
this.parseLogFromResponse(responseText, program)
if (isLogInRequired(responseText)) {
if (loginRequiredCallback) loginRequiredCallback(true)
sasjsWaitingRequest.requestPromise.resolve = resolve
sasjsWaitingRequest.requestPromise.reject = reject
sasjsWaitingRequest.config = config
this.sasjsWaitingRequests.push(sasjsWaitingRequest)
} else {
if (config.serverType === ServerType.Sas9 && config.debug) {
const jsonResponseText = parseWeboutResponse(responseText)
if (jsonResponseText !== '') {
resolve(JSON.parse(jsonResponseText))
} else {
reject(
new ErrorResponse(
'Job WEB execution failed.',
this.parseSAS9ErrorResponse(responseText)
)
)
}
} else if (
config.serverType === ServerType.SasViya &&
config.debug
) {
try {
this.parseSASVIYADebugResponse(responseText).then(
(resText: any) => {
try {
resolve(JSON.parse(resText))
} catch (e) {
reject(
new ErrorResponse(
'Job WEB debug response parsing failed.',
{ response: resText, exception: e }
)
)
}
},
(err: any) => {
reject(
new ErrorResponse(
'Job WEB debug response parsing failed.',
err
)
)
}
)
} catch (e) {
reject(
new ErrorResponse(
'Job WEB debug response parsing failed.',
{ response: responseText, exception: e }
)
)
}
} else {
if (
responseText.includes(
'The requested URL /SASStoredProcess/do/ was not found on this server.'
) ||
responseText.includes('Stored process not found')
) {
reject(
new ErrorResponse(
'Service not found on the server.',
{ service: sasJob },
responseText
)
)
}
try {
const parsedJson = JSON.parse(responseText)
resolve(parsedJson)
} catch (e) {
reject(
new ErrorResponse('Job WEB response parsing failed.', {
response: responseText,
exception: e
})
)
}
}
}
}
})
.catch((e: Error) => {
reject(new ErrorResponse('Job WEB request failed.', e))
})
}
)
return sasjsWaitingRequest.requestPromise.promise
}
private resendWaitingRequests = async () => {
for (const sasjsWaitingRequest of this.sasjsWaitingRequests) {
this.request(sasjsWaitingRequest.SASjob, sasjsWaitingRequest.data).then(
(res: any) => {
sasjsWaitingRequest.requestPromise.resolve(res)
},
(err: any) => {
sasjsWaitingRequest.requestPromise.reject(err)
}
)
}
this.sasjsWaitingRequests = []
}
private getRequestParamsWeb(config: any): any {
const requestParams: any = {}
if (this.csrfTokenWeb) {
requestParams['_csrf'] = this.csrfTokenWeb.value
}
if (config.debug) {
requestParams['_omittextlog'] = 'false'
requestParams['_omitsessionresults'] = 'false'
requestParams['_debug'] = 131
}
return requestParams
}
private parseSASVIYADebugResponse(response: string) {
return new Promise((resolve, reject) => {
const iframeStart = response.split(
'<iframe style="width: 99%; height: 500px" src="'
)[1]
const jsonUrl = iframeStart ? iframeStart.split('"></iframe>')[0] : null
if (jsonUrl) {
fetch(this.sasjsConfig.serverUrl + jsonUrl)
.then((res) => res.text())
.then((resText) => {
resolve(resText)
})
} else {
reject('No debug info found in response.')
}
})
}
private async getJobUri(sasJob: string) {
if (!this.sasViyaApiClient) return ''
let uri = ''
let folderPath
let jobName: string
if (isRelativePath(sasJob)) {
folderPath = sasJob.split('/')[0]
jobName = sasJob.split('/')[1]
} else {
const folderPathParts = sasJob.split('/')
jobName = folderPathParts.pop() || ''
folderPath = folderPathParts.join('/')
}
const locJobs = await this.sasViyaApiClient.getJobsInFolder(folderPath)
if (locJobs) {
const job = locJobs.find(
(el: any) => el.name === jobName && el.contentType === 'jobDefinition'
)
if (job) {
uri = job.uri
}
}
return uri
}
private parseSAS9ErrorResponse(response: string) {
const logLines = response.split('\n')
const parsedLines: string[] = []
let firstErrorLineIndex: number = -1
logLines.map((line: string, index: number) => {
if (
line.toLowerCase().includes('error') &&
!line.toLowerCase().includes('this request completed with errors.') &&
firstErrorLineIndex === -1
) {
firstErrorLineIndex = index
}
})
for (let i = firstErrorLineIndex - 10; i <= firstErrorLineIndex + 10; i++) {
parsedLines.push(logLines[i])
}
return parsedLines.join(', ')
}
private parseLogFromResponse(response: any, program: string) {
if (this.sasjsConfig.serverType === ServerType.Sas9) {
this.appendSasjsRequest(response, program, null)
} else {
if (!this.sasjsConfig.debug) {
this.appendSasjsRequest(null, program, null)
} else {
this.appendSasjsRequest(response, program, null)
}
}
await this.webJobExecutor?.resendWaitingRequests()
await this.computeJobExecutor?.resendWaitingRequests()
await this.jesJobExecutor?.resendWaitingRequests()
}
/**
@@ -1267,89 +679,20 @@ export default class SASjs {
})
}
private async appendSasjsRequest(
response: any,
program: string,
pgmData: any
) {
let sourceCode = ''
let generatedCode = ''
let sasWork = null
if (response && response.result && response.log) {
sourceCode = parseSourceCode(response.log)
generatedCode = parseGeneratedCode(response.log)
if (this.sasjsConfig.debug) {
if (response.log) {
sasWork = response.log
} else {
sasWork = JSON.parse(parseWeboutResponse(response.result)).WORK
}
} else {
sasWork = JSON.parse(response.result).WORK
}
} else {
if (response) {
sourceCode = parseSourceCode(response)
generatedCode = parseGeneratedCode(response)
sasWork = await this.parseSasWork(response)
}
}
this.sasjsRequests.push({
logFile: (response && response.log) || response,
serviceLink: program,
timestamp: new Date(),
sourceCode,
generatedCode,
SASWORK: sasWork
})
if (this.sasjsRequests.length > 20) {
this.sasjsRequests.splice(0, 1)
}
}
private async parseSasWork(response: any) {
if (this.sasjsConfig.debug) {
let jsonResponse
if (this.sasjsConfig.serverType === ServerType.Sas9) {
try {
jsonResponse = JSON.parse(parseWeboutResponse(response))
} catch (e) {
console.error(e)
}
} else {
await this.parseSASVIYADebugResponse(response).then(
(resText: any) => {
try {
jsonResponse = JSON.parse(resText)
} catch (e) {
console.error(e)
}
},
(err: any) => {
console.error(err)
}
)
}
if (jsonResponse) {
return jsonResponse.WORK
}
}
return null
}
public getSasRequests() {
const sortedRequests = this.sasjsRequests.sort(compareTimestamps)
const requests = [
...this.webJobExecutor!.getRequests(),
...this.computeJobExecutor!.getRequests(),
...this.jesJobExecutor!.getRequests()
]
const sortedRequests = requests.sort(compareTimestamps)
return sortedRequests
}
public clearSasRequests() {
this.sasjsRequests = []
this.webJobExecutor!.clearRequests()
this.computeJobExecutor!.clearRequests()
this.jesJobExecutor!.clearRequests()
}
private setupConfiguration() {
@@ -1413,6 +756,19 @@ export default class SASjs {
this.jobsPath,
this.requestClient
)
this.webJobExecutor = new WebJobExecutor(
this.sasjsConfig.serverUrl,
this.sasjsConfig.serverType!,
this.jobsPath,
this.requestClient,
this.sasViyaApiClient!
)
this.computeJobExecutor = new ComputeJobExecutor(
this.sasjsConfig.serverUrl,
this.sasViyaApiClient!
)
}
private async createFoldersAndServices(