1
0
mirror of https://github.com/sasjs/adapter.git synced 2025-12-11 01:14:36 +00:00

Compare commits

...

49 Commits

Author SHA1 Message Date
Allan Bowe
507722da0d Merge pull request #465 from sasjs/stream-job-logs
feat(stream-logs): Save logs to file during job status poll
2021-07-21 18:49:50 +03:00
Krishna Acondy
c8e029cff4 chore(deps): bump utils 2021-07-21 08:37:45 +01:00
Krishna Acondy
7bd2e31f3b chore(cleanup): remove console logs 2021-07-21 08:13:45 +01:00
Krishna Acondy
cfa0c8b9af chore(refactor): only fetch job if streaming logs, fix tests, add JSDoc comments 2021-07-21 08:12:34 +01:00
Krishna Acondy
df9c1c643f chore(merge): pull in changes from master 2021-07-20 09:26:34 +01:00
Krishna Acondy
5c8d311ae8 chore(streamlog): optimise polling mechanism 2021-07-20 09:25:39 +01:00
Muhammad Saad
bec4180dcf Merge pull request #467 from sasjs/removed-url-package
fix: removed url package
2021-07-16 17:02:24 +05:00
Saad Jutt
1bb7807c25 chore(merge): Merge branch 'master' into removed-url-package 2021-07-16 04:12:20 +05:00
Allan Bowe
816f1d19d4 Merge pull request #471 from sasjs/windows-tests
Windows tests
2021-07-15 16:22:49 +03:00
d38d032309 chore: readme updates 2021-07-15 13:01:12 +02:00
d2a90c77fd chore: readme update 2021-07-15 10:57:44 +02:00
8a0f14b780 chore: windows fallback 2021-07-15 10:43:30 +02:00
f6cb2c4fac chore: sasjs-tests windows 2021-07-15 10:41:10 +02:00
Krishna Acondy
1594f0c7db chore(merge): pull in changes from master 2021-07-15 07:33:44 +01:00
Allan Bowe
7cb2a43f95 Merge pull request #462 from sasjs/json-fix
fix: invalid json checking
2021-07-14 20:46:16 +03:00
Allan Bowe
6e85c7a588 Merge branch 'master' into json-fix 2021-07-14 20:44:06 +03:00
Allan Bowe
a68f6962fd Merge pull request #450 from sasjs/allanbowe-patch-1
Update README.md
2021-07-14 20:42:09 +03:00
Allan Bowe
a650ba15dd Merge branch 'master' into allanbowe-patch-1 2021-07-14 20:41:51 +03:00
Allan Bowe
6ca1b489fc Merge pull request #466 from sasjs/session-state-fix
fix(session): provide more info if could not get session state
2021-07-14 20:41:25 +03:00
Yury Shkoda
a5c9f11c75 test(session): cover case when could not get session state 2021-07-14 14:17:20 +03:00
Krishna Acondy
1ff3937d11 chore(deps): update dependencies 2021-07-14 08:03:54 +01:00
Krishna Acondy
d4725d2e54 chore(refactor): change property name in PollOptions 2021-07-14 07:50:25 +01:00
Saad Jutt
db578564ba fix: removed url package 2021-07-13 17:11:49 +05:00
Yury Shkoda
d4ebef4290 fix(session): provide more info if could not get session state 2021-07-13 14:50:46 +03:00
Krishna Acondy
b9f368193d chore(refactor): add more tests 2021-07-13 08:12:15 +01:00
Krishna Acondy
4257ec78aa chore(ci): add coverage report to build workflow 2021-07-12 20:45:09 +01:00
Krishna Acondy
a0fbe1a740 chore(ci): add coverage report action 2021-07-12 20:42:49 +01:00
Krishna Acondy
123b9fb535 chore(refactor): split up and add tests for core functionality 2021-07-12 20:31:17 +01:00
Krishna Acondy
f57c7b8f7d chore(deps): up utils version 2021-07-12 20:30:42 +01:00
89590f9a37 chore: only removed 2021-07-12 14:42:19 +02:00
5d61bebc9e fix: isValidJson function returns the JSON parsed 2021-07-12 14:29:43 +02:00
99afa6e7e4 style: lint 2021-07-12 12:58:27 +02:00
b590a9f41b chore(tests): testing the isValidJson function 2021-07-12 12:58:04 +02:00
4466ee30d2 chore: preventing double parse of invalid json check 2021-07-12 11:02:01 +02:00
db372950b4 chore: type fix 2021-07-09 15:17:33 +02:00
46f5e07f11 fix: invalid json checking 2021-07-09 13:36:12 +02:00
Krishna Acondy
1c90f4f455 chore(*): remove log 2021-07-09 09:29:57 +01:00
Krishna Acondy
0114a80e38 chore(execute): add tests for executeScript 2021-07-09 09:17:49 +01:00
Krishna Acondy
13be2f9c70 chore(*): remove unused dependencies and variables, fix imports 2021-07-09 09:17:26 +01:00
Krishna Acondy
e396091aa7 chore(merge): pull in changes from master 2021-07-08 09:04:49 +01:00
Krishna Acondy
a00cb1ebec Merge pull request #461 from sasjs/fix-utils-imports
fix(imports): change imports from main barrel into internal barrels
2021-07-08 08:58:14 +01:00
Krishna Acondy
7b1264d140 fix(imports): change imports from main barrel into internal barrels 2021-07-08 08:46:28 +01:00
Krishna Acondy
04ccbf6843 feat(log): write logs to file when polling for job status 2021-07-07 10:02:14 +01:00
Allan Bowe
369b9fb023 Merge branch 'master' into allanbowe-patch-1 2021-07-05 12:02:01 +03:00
Allan Bowe
76487b00e9 Merge pull request #453 from sasjs/support-contribute-on-windows
fix: updated 'prepare' + using copyfiles instead of cp
2021-07-05 12:01:38 +03:00
Saad Jutt
2d0515e25b chore(merge): Merged master branch 2021-07-05 13:17:08 +05:00
Saad Jutt
b132b99586 fix: globstars support on mac with copyfiles 2021-07-05 13:11:40 +05:00
Saad Jutt
6cac008b61 fix: updated 'prepare' + using copyfiles instead of cp 2021-07-05 04:06:04 +05:00
Allan Bowe
929ec6eb1c Update README.md 2021-07-02 12:50:03 +03:00
37 changed files with 4554 additions and 5185 deletions

View File

@@ -27,6 +27,10 @@ jobs:
run: npm run lint
- name: Run unit tests
run: npm test
- name: Generate coverage report
uses: artiomtr/jest-coverage-report-action@v2.0-rc.2
with:
github-token: ${{ secrets.GITHUB_TOKEN }}
- name: Build Package
run: npm run package:lib
env:

View File

@@ -194,6 +194,8 @@ Note - to use the web approach, the `useComputeApi` property must be `undefined`
### Using the JES API
Here we are running Jobs using the Job Execution Service except this time we are making the requests directly using the REST API instead of through the JES Web App. This is helpful when we need to call web services outside of a browser (eg with the SASjs CLI or other commandline tools). To save one network request, the adapter prefetches the JOB URIs and passes them in the `__job` parameter. Depending on your network bandwidth, it may or may not be faster than the JES Web approach.
This approach (`useComputeApi: false`) also ensures that jobs are displayed in Environment Manager.
```
{
appLoc:"/Your/Path",
@@ -206,6 +208,8 @@ Here we are running Jobs using the Job Execution Service except this time we are
### Using the Compute API
This approach is by far the fastest, as a result of the optimisations we have built into the adapter. With this configuration, in the first sasjs request, we take a URI map of the services in the target folder, and create a session manager. This manager will spawn a additional session every time a request is made. Subsequent requests will use the existing 'hot' session, if it exists. Sessions are always deleted after every use, which actually makes this _less_ resource intensive than a typical JES web app, in which all sessions are kept alive by default for 15 minutes.
With this approach (`useComputeApi: true`), the requests/logs will _not_ appear in the list in Environment manager.
```
{
appLoc:"/Your/Path",

6681
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -3,8 +3,8 @@
"description": "JavaScript adapter for SAS",
"homepage": "https://adapter.sasjs.io",
"scripts": {
"build": "rimraf build && rimraf node && mkdir node && cp -r src/* node && webpack && rimraf build/src && rimraf node",
"package:lib": "npm run build && cp ./package.json build && cd build && npm version \"5.0.0\" && npm pack",
"build": "rimraf build && rimraf node && mkdir node && copyfiles -u 1 \"./src/**/*\" ./node && webpack && rimraf build/src && rimraf node",
"package:lib": "npm run build && copyfiles ./package.json build && cd build && npm version \"5.0.0\" && npm pack",
"publish:lib": "npm run build && cd build && npm publish",
"lint:fix": "npx prettier --write \"src/**/*.{ts,tsx,js,jsx,html,css,sass,less,json,yml,md,graphql}\" && npx prettier --write \"sasjs-tests/src/**/*.{ts,tsx,js,jsx,html,css,sass,less,json,yml,md,graphql}\"",
"lint": "npx prettier --check \"src/**/*.{ts,tsx,js,jsx,html,css,sass,less,json,yml,md,graphql}\" && npx prettier --check \"sasjs-tests/src/**/*.{ts,tsx,js,jsx,html,css,sass,less,json,yml,md,graphql}\"",
@@ -13,7 +13,7 @@
"postpublish": "git clean -fd",
"semantic-release": "semantic-release",
"typedoc": "typedoc",
"prepare": "[ -d .git ] && git config core.hooksPath ./.git-hooks || true"
"prepare": "git rev-parse --git-dir && git config core.hooksPath ./.git-hooks && git config core.autocrlf false || true"
},
"publishConfig": {
"access": "public"
@@ -38,14 +38,16 @@
},
"license": "ISC",
"devDependencies": {
"@types/jest": "^26.0.23",
"@types/axios": "^0.14.0",
"@types/form-data": "^2.5.0",
"@types/jest": "^26.0.24",
"@types/mime": "^2.0.3",
"@types/tough-cookie": "^4.0.0",
"@types/tough-cookie": "^4.0.1",
"copyfiles": "^2.4.1",
"cp": "^0.2.0",
"dotenv": "^10.0.0",
"jest": "^27.0.6",
"jest-extended": "^0.11.5",
"mime": "^2.5.2",
"node-polyfill-webpack-plugin": "^1.1.4",
"path": "^0.12.7",
"process": "^0.11.10",
@@ -56,21 +58,20 @@
"ts-loader": "^9.2.2",
"tslint": "^6.1.3",
"tslint-config-prettier": "^1.18.0",
"typedoc": "^0.21.2",
"typedoc": "^0.21.4",
"typedoc-neo-theme": "^1.1.1",
"typedoc-plugin-external-module-name": "^4.0.6",
"typescript": "^4.3.4",
"webpack": "^5.41.1",
"typescript": "^4.3.5",
"webpack": "^5.44.0",
"webpack-cli": "^4.7.2"
},
"main": "index.js",
"dependencies": {
"@sasjs/utils": "^2.22.0",
"@sasjs/utils": "^2.25.4",
"axios": "^0.21.1",
"axios-cookiejar-support": "^1.0.1",
"form-data": "^4.0.0",
"https": "^1.0.0",
"tough-cookie": "^4.0.0",
"url": "^0.11.0"
"tough-cookie": "^4.0.0"
}
}

View File

@@ -41,6 +41,19 @@ So you can run the script like so:
SSH_ACCOUNT=me@my-sas-server.com DEPLOY_PATH=/var/www/html/my-folder/sasjs-tests npm run deploy
```
If you are on `WINDOWS`, you will first need to install one dependency:
```bash
npm i -g copyfiles
```
and then run to build:
```bash
npm run update:adapter && npm run build
```
when it finishes run to deploy:
```bash
scp -rp ./build/* me@my-sas-server.com:/var/www/html/my-folder/sasjs-tests
```
If you'd like to deploy just `sasjs-tests` without changing the adapter version, you can use the `deploy:tests` script, while also setting the same environment variables as above.
## 3. Creating the required SAS services

View File

@@ -23,7 +23,8 @@
"test": "react-scripts test",
"eject": "react-scripts eject",
"update:adapter": "cd .. && npm run package:lib && cd sasjs-tests && npm i ../build/sasjs-adapter-5.0.0.tgz",
"deploy:tests": "rsync -avhe ssh ./build/* --delete $SSH_ACCOUNT:$DEPLOY_PATH",
"deploy:tests": "rsync -avhe ssh ./build/* --delete $SSH_ACCOUNT:$DEPLOY_PATH || npm run deploy:tests-win",
"deploy:tests-win": "scp %DEPLOY_PATH% ./build/*",
"deploy": "npm run update:adapter && npm run build && npm run deploy:tests"
},
"eslintConfig": {

View File

@@ -1,10 +1,4 @@
import {
convertToCSV,
isRelativePath,
isUri,
isUrl,
fetchLogByChunks
} from './utils'
import { isRelativePath, isUri, isUrl } from './utils'
import * as NodeFormData from 'form-data'
import {
Job,
@@ -17,28 +11,19 @@ import {
JobDefinition,
PollOptions
} from './types'
import {
ComputeJobExecutionError,
JobExecutionError,
NotFoundError
} from './types/errors'
import { formatDataForRequest } from './utils/formatDataForRequest'
import { JobExecutionError } from './types/errors'
import { SessionManager } from './SessionManager'
import { ContextManager } from './ContextManager'
import {
timestampToYYYYMMDDHHMMSS,
isAccessTokenExpiring,
isRefreshTokenExpiring,
Logger,
LogLevel,
SasAuthResponse,
MacroVar,
AuthConfig
} from '@sasjs/utils'
import { SasAuthResponse, MacroVar, AuthConfig } from '@sasjs/utils/types'
import { isAuthorizeFormRequired } from './auth/isAuthorizeFormRequired'
import { RequestClient } from './request/RequestClient'
import { prefixMessage } from '@sasjs/utils/error'
import * as mime from 'mime'
import { pollJobState } from './api/viya/pollJobState'
import { getTokens } from './auth/getTokens'
import { uploadTables } from './api/viya/uploadTables'
import { executeScript } from './api/viya/executeScript'
import { getAccessToken } from './auth/getAccessToken'
import { refreshTokens } from './auth/refreshTokens'
/**
* A client for interfacing with the SAS Viya REST API.
@@ -174,13 +159,6 @@ export class SASViyaApiClient {
throw new Error(`Execution context ${contextName} not found.`)
}
const createSessionRequest = {
method: 'POST',
headers: {
Authorization: `Bearer ${accessToken}`,
'Content-Type': 'application/json'
}
}
const { result: createdSession } = await this.requestClient.post<Session>(
`/compute/contexts/${executionContext.id}/sessions`,
{},
@@ -295,249 +273,22 @@ export class SASViyaApiClient {
printPid = false,
variables?: MacroVar
): Promise<any> {
let access_token = (authConfig || {}).access_token
if (authConfig) {
;({ access_token } = await this.getTokens(authConfig))
}
const logger = process.logger || console
try {
let executionSessionId: string
const session = await this.sessionManager
.getSession(access_token)
.catch((err) => {
throw prefixMessage(err, 'Error while getting session. ')
})
executionSessionId = session!.id
if (printPid) {
const { result: jobIdVariable } = await this.sessionManager
.getVariable(executionSessionId, 'SYSJOBID', access_token)
.catch((err) => {
throw prefixMessage(err, 'Error while getting session variable. ')
})
if (jobIdVariable && jobIdVariable.value) {
const relativeJobPath = this.rootFolderName
? jobPath.split(this.rootFolderName).join('').replace(/^\//, '')
: jobPath
const logger = new Logger(debug ? LogLevel.Debug : LogLevel.Info)
logger.info(
`Triggered '${relativeJobPath}' with PID ${
jobIdVariable.value
} at ${timestampToYYYYMMDDHHMMSS()}`
)
}
}
const jobArguments: { [key: string]: any } = {
_contextName: contextName,
_OMITJSONLISTING: true,
_OMITJSONLOG: true,
_OMITSESSIONRESULTS: true,
_OMITTEXTLISTING: true,
_OMITTEXTLOG: true
}
if (debug) {
jobArguments['_OMITTEXTLOG'] = false
jobArguments['_OMITSESSIONRESULTS'] = false
}
let fileName
if (isRelativePath(jobPath)) {
fileName = `exec-${
jobPath.includes('/') ? jobPath.split('/')[1] : jobPath
}`
} else {
const jobPathParts = jobPath.split('/')
fileName = jobPathParts.pop()
}
let jobVariables: any = {
SYS_JES_JOB_URI: '',
_program: isRelativePath(jobPath)
? this.rootFolderName + '/' + jobPath
: jobPath
}
if (variables) jobVariables = { ...jobVariables, ...variables }
if (debug) jobVariables = { ...jobVariables, _DEBUG: 131 }
let files: any[] = []
if (data) {
if (JSON.stringify(data).includes(';')) {
files = await this.uploadTables(data, access_token).catch((err) => {
throw prefixMessage(err, 'Error while uploading tables. ')
})
jobVariables['_webin_file_count'] = files.length
files.forEach((fileInfo, index) => {
jobVariables[
`_webin_fileuri${index + 1}`
] = `/files/files/${fileInfo.file.id}`
jobVariables[`_webin_name${index + 1}`] = fileInfo.tableName
})
} else {
jobVariables = { ...jobVariables, ...formatDataForRequest(data) }
}
}
// Execute job in session
const jobRequestBody = {
name: fileName,
description: 'Powered by SASjs',
code: linesOfCode,
variables: jobVariables,
arguments: jobArguments
}
const { result: postedJob, etag } = await this.requestClient
.post<Job>(
`/compute/sessions/${executionSessionId}/jobs`,
jobRequestBody,
access_token
)
.catch((err) => {
throw prefixMessage(err, 'Error while posting job. ')
})
if (!waitForResult) return session
if (debug) {
logger.info(`Job has been submitted for '${fileName}'.`)
logger.info(
`You can monitor the job progress at '${this.serverUrl}${
postedJob.links.find((l: any) => l.rel === 'state')!.href
}'.`
)
}
const jobStatus = await this.pollJobState(
postedJob,
etag,
authConfig,
pollOptions
).catch(async (err) => {
const error = err?.response?.data
const result = /err=[0-9]*,/.exec(error)
const errorCode = '5113'
if (result?.[0]?.slice(4, -1) === errorCode) {
const sessionLogUrl =
postedJob.links.find((l: any) => l.rel === 'up')!.href + '/log'
const logCount = 1000000
err.log = await fetchLogByChunks(
this.requestClient,
access_token!,
sessionLogUrl,
logCount
)
}
throw prefixMessage(err, 'Error while polling job status. ')
})
if (authConfig) {
;({ access_token } = await this.getTokens(authConfig))
}
const { result: currentJob } = await this.requestClient
.get<Job>(
`/compute/sessions/${executionSessionId}/jobs/${postedJob.id}`,
access_token
)
.catch((err) => {
throw prefixMessage(err, 'Error while getting job. ')
})
let jobResult
let log = ''
const logLink = currentJob.links.find((l) => l.rel === 'log')
if (debug && logLink) {
const logUrl = `${logLink.href}/content`
const logCount = currentJob.logStatistics?.lineCount ?? 1000000
log = await fetchLogByChunks(
this.requestClient,
access_token!,
logUrl,
logCount
)
}
if (jobStatus === 'failed' || jobStatus === 'error') {
return Promise.reject(new ComputeJobExecutionError(currentJob, log))
}
let resultLink
if (expectWebout) {
resultLink = `/compute/sessions/${executionSessionId}/filerefs/_webout/content`
} else {
return { job: currentJob, log }
}
if (resultLink) {
jobResult = await this.requestClient
.get<any>(resultLink, access_token, 'text/plain')
.catch(async (e) => {
if (e instanceof NotFoundError) {
if (logLink) {
const logUrl = `${logLink.href}/content`
const logCount = currentJob.logStatistics?.lineCount ?? 1000000
log = await fetchLogByChunks(
this.requestClient,
access_token!,
logUrl,
logCount
)
return Promise.reject({
status: 500,
log
})
}
}
return {
result: JSON.stringify(e)
}
})
}
await this.sessionManager
.clearSession(executionSessionId, access_token)
.catch((err) => {
throw prefixMessage(err, 'Error while clearing session. ')
})
return { result: jobResult?.result, log }
} catch (e) {
if (e && e.status === 404) {
return this.executeScript(
jobPath,
linesOfCode,
contextName,
authConfig,
data,
debug,
false,
true
)
} else {
throw prefixMessage(e, 'Error while executing script. ')
}
}
return executeScript(
this.requestClient,
this.sessionManager,
this.rootFolderName,
jobPath,
linesOfCode,
contextName,
authConfig,
data,
debug,
expectWebout,
waitForResult,
pollOptions,
printPid,
variables
)
}
/**
@@ -584,9 +335,6 @@ export class SASViyaApiClient {
const formData = new NodeFormData()
formData.append('file', contentBuffer, fileName)
const mimeType =
mime.getType(fileName.match(/\.[0-9a-z]+$/i)?.[0] || '') ?? 'text/plain'
return (
await this.requestClient.post<File>(
`/files/files?parentFolderUri=${parentFolderUri}&typeDefName=file#rawUpload`,
@@ -772,37 +520,7 @@ export class SASViyaApiClient {
clientSecret: string,
authCode: string
): Promise<SasAuthResponse> {
const url = this.serverUrl + '/SASLogon/oauth/token'
let token
if (typeof Buffer === 'undefined') {
token = btoa(clientId + ':' + clientSecret)
} else {
token = Buffer.from(clientId + ':' + clientSecret).toString('base64')
}
const headers = {
Authorization: 'Basic ' + token
}
let formData
if (typeof FormData === 'undefined') {
formData = new NodeFormData()
} else {
formData = new FormData()
}
formData.append('grant_type', 'authorization_code')
formData.append('code', authCode)
const authResponse = await this.requestClient
.post(
url,
formData,
undefined,
'multipart/form-data; boundary=' + (formData as any)._boundary,
headers
)
.then((res) => res.result as SasAuthResponse)
return authResponse
return getAccessToken(this.requestClient, clientId, clientSecret, authCode)
}
/**
@@ -816,39 +534,12 @@ export class SASViyaApiClient {
clientSecret: string,
refreshToken: string
) {
const url = this.serverUrl + '/SASLogon/oauth/token'
let token
if (typeof Buffer === 'undefined') {
token = btoa(clientId + ':' + clientSecret)
} else {
token = Buffer.from(clientId + ':' + clientSecret).toString('base64')
}
const headers = {
Authorization: 'Basic ' + token
}
let formData
if (typeof FormData === 'undefined') {
formData = new NodeFormData()
formData.append('grant_type', 'refresh_token')
formData.append('refresh_token', refreshToken)
} else {
formData = new FormData()
formData.append('grant_type', 'refresh_token')
formData.append('refresh_token', refreshToken)
}
const authResponse = await this.requestClient
.post<SasAuthResponse>(
url,
formData,
undefined,
'multipart/form-data; boundary=' + (formData as any)._boundary,
headers
)
.then((res) => res.result)
return authResponse
return refreshTokens(
this.requestClient,
clientId,
clientSecret,
refreshToken
)
}
/**
@@ -895,7 +586,7 @@ export class SASViyaApiClient {
) {
let access_token = (authConfig || {}).access_token
if (authConfig) {
;({ access_token } = await this.getTokens(authConfig))
;({ access_token } = await getTokens(this.requestClient, authConfig))
}
if (isRelativePath(sasJob) && !this.rootFolderName) {
@@ -991,7 +682,7 @@ export class SASViyaApiClient {
) {
let access_token = (authConfig || {}).access_token
if (authConfig) {
;({ access_token } = await this.getTokens(authConfig))
;({ access_token } = await getTokens(this.requestClient, authConfig))
}
if (isRelativePath(sasJob) && !this.rootFolderName) {
throw new Error(
@@ -1063,18 +754,16 @@ export class SASViyaApiClient {
jobDefinition,
arguments: jobArguments
}
const { result: postedJob, etag } = await this.requestClient.post<Job>(
const { result: postedJob } = await this.requestClient.post<Job>(
`${this.serverUrl}/jobExecution/jobs?_action=wait`,
postJobRequestBody,
access_token
)
const jobStatus = await this.pollJobState(
postedJob,
etag,
authConfig
).catch((err) => {
throw prefixMessage(err, 'Error while polling job status. ')
})
const jobStatus = await this.pollJobState(postedJob, authConfig).catch(
(err) => {
throw prefixMessage(err, 'Error while polling job status. ')
}
)
const { result: currentJob } = await this.requestClient.get<Job>(
`${this.serverUrl}/jobExecution/jobs/${postedJob.id}`,
access_token
@@ -1140,157 +829,22 @@ export class SASViyaApiClient {
this.folderMap.set(path, itemsAtRoot)
}
// REFACTOR: set default value for 'pollOptions' attribute
private async pollJobState(
postedJob: any,
etag: string | null,
postedJob: Job,
authConfig?: AuthConfig,
pollOptions?: PollOptions
) {
const logger = process.logger || console
let POLL_INTERVAL = 300
let MAX_POLL_COUNT = 1000
let MAX_ERROR_COUNT = 5
let access_token = (authConfig || {}).access_token
if (authConfig) {
;({ access_token } = await this.getTokens(authConfig))
}
if (pollOptions) {
POLL_INTERVAL = pollOptions.POLL_INTERVAL || POLL_INTERVAL
MAX_POLL_COUNT = pollOptions.MAX_POLL_COUNT || MAX_POLL_COUNT
}
let postedJobState = ''
let pollCount = 0
let errorCount = 0
const headers: any = {
'Content-Type': 'application/json',
'If-None-Match': etag
}
if (access_token) {
headers.Authorization = `Bearer ${access_token}`
}
const stateLink = postedJob.links.find((l: any) => l.rel === 'state')
if (!stateLink) {
Promise.reject(`Job state link was not found.`)
}
const { result: state } = await this.requestClient
.get<string>(
`${this.serverUrl}${stateLink.href}?_action=wait&wait=300`,
access_token,
'text/plain',
{},
this.debug
)
.catch((err) => {
console.error(
`Error fetching job state from ${this.serverUrl}${stateLink.href}. Starting poll, assuming job to be running.`,
err
)
return { result: 'unavailable' }
})
const currentState = state.trim()
if (currentState === 'completed') {
return Promise.resolve(currentState)
}
return new Promise(async (resolve, _) => {
let printedState = ''
const interval = setInterval(async () => {
if (
postedJobState === 'running' ||
postedJobState === '' ||
postedJobState === 'pending' ||
postedJobState === 'unavailable'
) {
if (authConfig) {
;({ access_token } = await this.getTokens(authConfig))
}
if (stateLink) {
const { result: jobState } = await this.requestClient
.get<string>(
`${this.serverUrl}${stateLink.href}?_action=wait&wait=300`,
access_token,
'text/plain',
{},
this.debug
)
.catch((err) => {
errorCount++
if (
pollCount >= MAX_POLL_COUNT ||
errorCount >= MAX_ERROR_COUNT
) {
throw prefixMessage(
err,
'Error while getting job state after interval. '
)
}
console.error(
`Error fetching job state from ${this.serverUrl}${stateLink.href}. Resuming poll, assuming job to be running.`,
err
)
return { result: 'unavailable' }
})
postedJobState = jobState.trim()
if (postedJobState != 'unavailable' && errorCount > 0) {
errorCount = 0
}
if (this.debug && printedState !== postedJobState) {
logger.info('Polling job status...')
logger.info(`Current job state: ${postedJobState}`)
printedState = postedJobState
}
pollCount++
if (pollCount >= MAX_POLL_COUNT) {
resolve(postedJobState)
}
}
} else {
clearInterval(interval)
resolve(postedJobState)
}
}, POLL_INTERVAL)
})
return pollJobState(
this.requestClient,
postedJob,
this.debug,
authConfig,
pollOptions
)
}
private async uploadTables(data: any, accessToken?: string) {
const uploadedFiles = []
const headers: any = {
'Content-Type': 'application/json'
}
if (accessToken) {
headers.Authorization = `Bearer ${accessToken}`
}
for (const tableName in data) {
const csv = convertToCSV(data[tableName])
if (csv === 'ERROR: LARGE STRING LENGTH') {
throw new Error(
'The max length of a string value in SASjs is 32765 characters.'
)
}
const uploadResponse = await this.requestClient
.uploadFile(`${this.serverUrl}/files/files#rawUpload`, csv, accessToken)
.catch((err) => {
throw prefixMessage(err, 'Error while uploading file. ')
})
uploadedFiles.push({ tableName, file: uploadResponse.result })
}
return uploadedFiles
return uploadTables(this.requestClient, data, accessToken)
}
private async getFolderDetails(
@@ -1379,14 +933,6 @@ export class SASViyaApiClient {
? sourceFolder
: await this.getFolderUri(sourceFolder, accessToken)
const requestInfo = {
method: 'GET',
headers: {
'Content-Type': 'application/json',
Authorization: 'Bearer ' + accessToken
}
}
const { result: members } = await this.requestClient.get<{ items: any[] }>(
`${this.serverUrl}${sourceFolderUri}/members?limit=${limit}`,
accessToken
@@ -1493,21 +1039,4 @@ export class SASViyaApiClient {
return movedFolder
}
private async getTokens(authConfig: AuthConfig): Promise<AuthConfig> {
const logger = process.logger || console
let { access_token, refresh_token, client, secret } = authConfig
if (
isAccessTokenExpiring(access_token) ||
isRefreshTokenExpiring(refresh_token)
) {
logger.info('Refreshing access and refresh tokens.')
;({ access_token, refresh_token } = await this.refreshTokens(
client,
secret,
refresh_token
))
}
return { access_token, refresh_token, client, secret }
}
}

View File

@@ -4,7 +4,12 @@ import { SASViyaApiClient } from './SASViyaApiClient'
import { SAS9ApiClient } from './SAS9ApiClient'
import { FileUploader } from './FileUploader'
import { AuthManager } from './auth'
import { ServerType, MacroVar, AuthConfig } from '@sasjs/utils/types'
import {
ServerType,
MacroVar,
AuthConfig,
ExtraResponseAttributes
} from '@sasjs/utils/types'
import { RequestClient } from './request/RequestClient'
import {
JobExecutor,
@@ -14,7 +19,6 @@ import {
Sas9JobExecutor
} from './job-execution'
import { ErrorResponse } from './types/errors'
import { ExtraResponseAttributes } from '@sasjs/utils/types'
const defaultConfig: SASjsConfig = {
serverUrl: '',

View File

@@ -1,4 +1,5 @@
import { Session, Context, CsrfToken, SessionVariable } from './types'
import { Session, Context, SessionVariable } from './types'
import { NoSessionStateError } from './types/errors'
import { asyncForEach, isUrl } from './utils'
import { prefixMessage } from '@sasjs/utils/error'
import { RequestClient } from './request/RequestClient'
@@ -173,13 +174,14 @@ export class SessionManager {
this.printedSessionState.printed = true
}
const state = await this.getSessionState(
`${this.serverUrl}${stateLink.href}?wait=30`,
etag!,
accessToken
).catch((err) => {
throw prefixMessage(err, 'Error while getting session state.')
})
const { result: state, responseStatus: responseStatus } =
await this.getSessionState(
`${this.serverUrl}${stateLink.href}?wait=30`,
etag!,
accessToken
).catch((err) => {
throw prefixMessage(err, 'Error while getting session state.')
})
sessionState = state.trim()
@@ -198,7 +200,14 @@ export class SessionManager {
resolve(this.waitForSession(session, etag, accessToken))
} else {
reject('Could not get session state.')
reject(
new NoSessionStateError(
responseStatus,
this.serverUrl + stateLink.href,
session.links.find((l: any) => l.rel === 'log')
?.href as string
)
)
}
}
@@ -217,7 +226,10 @@ export class SessionManager {
) {
return await this.requestClient
.get(url, accessToken, 'text/plain', { 'If-None-Match': etag })
.then((res) => res.result as string)
.then((res) => ({
result: res.result as string,
responseStatus: res.status
}))
.catch((err) => {
throw err
})

View File

@@ -0,0 +1,293 @@
import { timestampToYYYYMMDDHHMMSS } from '@sasjs/utils/time'
import { AuthConfig, MacroVar } from '@sasjs/utils/types'
import { prefixMessage } from '@sasjs/utils/error'
import {
PollOptions,
Job,
ComputeJobExecutionError,
NotFoundError
} from '../..'
import { getTokens } from '../../auth/getTokens'
import { RequestClient } from '../../request/RequestClient'
import { SessionManager } from '../../SessionManager'
import { isRelativePath, fetchLogByChunks } from '../../utils'
import { formatDataForRequest } from '../../utils/formatDataForRequest'
import { pollJobState } from './pollJobState'
import { uploadTables } from './uploadTables'
/**
* Executes code on the current SAS Viya server.
* @param jobPath - the path to the file being submitted for execution.
* @param linesOfCode - an array of code lines to execute.
* @param contextName - the context to execute the code in.
* @param authConfig - an object containing an access token, refresh token, client ID and secret.
* @param data - execution data.
* @param debug - when set to true, the log will be returned.
* @param expectWebout - when set to true, the automatic _webout fileref will be checked for content, and that content returned. This fileref is used when the Job contains a SASjs web request (as opposed to executing arbitrary SAS code).
* @param waitForResult - when set to true, function will return the session
* @param pollOptions - an object that represents poll interval(milliseconds) and maximum amount of attempts. Object example: { MAX_POLL_COUNT: 24 * 60 * 60, POLL_INTERVAL: 1000 }.
* @param printPid - a boolean that indicates whether the function should print (PID) of the started job.
* @param variables - an object that represents macro variables.
*/
export async function executeScript(
requestClient: RequestClient,
sessionManager: SessionManager,
rootFolderName: string,
jobPath: string,
linesOfCode: string[],
contextName: string,
authConfig?: AuthConfig,
data: any = null,
debug: boolean = false,
expectWebout = false,
waitForResult = true,
pollOptions?: PollOptions,
printPid = false,
variables?: MacroVar
): Promise<any> {
let access_token = (authConfig || {}).access_token
if (authConfig) {
;({ access_token } = await getTokens(requestClient, authConfig))
}
const logger = process.logger || console
try {
let executionSessionId: string
const session = await sessionManager
.getSession(access_token)
.catch((err) => {
throw prefixMessage(err, 'Error while getting session. ')
})
executionSessionId = session!.id
if (printPid) {
const { result: jobIdVariable } = await sessionManager
.getVariable(executionSessionId, 'SYSJOBID', access_token)
.catch((err) => {
throw prefixMessage(err, 'Error while getting session variable. ')
})
if (jobIdVariable && jobIdVariable.value) {
const relativeJobPath = rootFolderName
? jobPath.split(rootFolderName).join('').replace(/^\//, '')
: jobPath
const logger = process.logger || console
logger.info(
`Triggered '${relativeJobPath}' with PID ${
jobIdVariable.value
} at ${timestampToYYYYMMDDHHMMSS()}`
)
}
}
const jobArguments: { [key: string]: any } = {
_contextName: contextName,
_OMITJSONLISTING: true,
_OMITJSONLOG: true,
_OMITSESSIONRESULTS: true,
_OMITTEXTLISTING: true,
_OMITTEXTLOG: true
}
if (debug) {
jobArguments['_OMITTEXTLOG'] = false
jobArguments['_OMITSESSIONRESULTS'] = false
}
let fileName
if (isRelativePath(jobPath)) {
fileName = `exec-${
jobPath.includes('/') ? jobPath.split('/')[1] : jobPath
}`
} else {
const jobPathParts = jobPath.split('/')
fileName = jobPathParts.pop()
}
let jobVariables: any = {
SYS_JES_JOB_URI: '',
_program: isRelativePath(jobPath)
? rootFolderName + '/' + jobPath
: jobPath
}
if (variables) jobVariables = { ...jobVariables, ...variables }
if (debug) jobVariables = { ...jobVariables, _DEBUG: 131 }
let files: any[] = []
if (data) {
if (JSON.stringify(data).includes(';')) {
files = await uploadTables(requestClient, data, access_token).catch(
(err) => {
throw prefixMessage(err, 'Error while uploading tables. ')
}
)
jobVariables['_webin_file_count'] = files.length
files.forEach((fileInfo, index) => {
jobVariables[
`_webin_fileuri${index + 1}`
] = `/files/files/${fileInfo.file.id}`
jobVariables[`_webin_name${index + 1}`] = fileInfo.tableName
})
} else {
jobVariables = { ...jobVariables, ...formatDataForRequest(data) }
}
}
// Execute job in session
const jobRequestBody = {
name: fileName,
description: 'Powered by SASjs',
code: linesOfCode,
variables: jobVariables,
arguments: jobArguments
}
const { result: postedJob, etag } = await requestClient
.post<Job>(
`/compute/sessions/${executionSessionId}/jobs`,
jobRequestBody,
access_token
)
.catch((err) => {
throw prefixMessage(err, 'Error while posting job. ')
})
if (!waitForResult) return session
if (debug) {
logger.info(`Job has been submitted for '${fileName}'.`)
logger.info(
`You can monitor the job progress at '${requestClient.getBaseUrl()}${
postedJob.links.find((l: any) => l.rel === 'state')!.href
}'.`
)
}
const jobStatus = await pollJobState(
requestClient,
postedJob,
debug,
authConfig,
pollOptions
).catch(async (err) => {
const error = err?.response?.data
const result = /err=[0-9]*,/.exec(error)
const errorCode = '5113'
if (result?.[0]?.slice(4, -1) === errorCode) {
const sessionLogUrl =
postedJob.links.find((l: any) => l.rel === 'up')!.href + '/log'
const logCount = 1000000
err.log = await fetchLogByChunks(
requestClient,
access_token!,
sessionLogUrl,
logCount
)
}
throw prefixMessage(err, 'Error while polling job status. ')
})
if (authConfig) {
;({ access_token } = await getTokens(requestClient, authConfig))
}
const { result: currentJob } = await requestClient
.get<Job>(
`/compute/sessions/${executionSessionId}/jobs/${postedJob.id}`,
access_token
)
.catch((err) => {
throw prefixMessage(err, 'Error while getting job. ')
})
let jobResult
let log = ''
const logLink = currentJob.links.find((l) => l.rel === 'log')
if (debug && logLink) {
const logUrl = `${logLink.href}/content`
const logCount = currentJob.logStatistics?.lineCount ?? 1000000
log = await fetchLogByChunks(
requestClient,
access_token!,
logUrl,
logCount
)
}
if (jobStatus === 'failed' || jobStatus === 'error') {
throw new ComputeJobExecutionError(currentJob, log)
}
if (!expectWebout) {
return { job: currentJob, log }
}
const resultLink = `/compute/sessions/${executionSessionId}/filerefs/_webout/content`
jobResult = await requestClient
.get<any>(resultLink, access_token, 'text/plain')
.catch(async (e) => {
if (e instanceof NotFoundError) {
if (logLink) {
const logUrl = `${logLink.href}/content`
const logCount = currentJob.logStatistics?.lineCount ?? 1000000
log = await fetchLogByChunks(
requestClient,
access_token!,
logUrl,
logCount
)
return Promise.reject({
status: 500,
log
})
}
}
return {
result: JSON.stringify(e)
}
})
await sessionManager
.clearSession(executionSessionId, access_token)
.catch((err) => {
throw prefixMessage(err, 'Error while clearing session. ')
})
return { result: jobResult?.result, log }
} catch (e) {
if (e && e.status === 404) {
return executeScript(
requestClient,
sessionManager,
rootFolderName,
jobPath,
linesOfCode,
contextName,
authConfig,
data,
debug,
false,
true
)
} else {
throw prefixMessage(e, 'Error while executing script. ')
}
}
}

View File

@@ -0,0 +1,246 @@
import { AuthConfig } from '@sasjs/utils/types'
import { Job, PollOptions } from '../..'
import { getTokens } from '../../auth/getTokens'
import { RequestClient } from '../../request/RequestClient'
import { JobStatePollError } from '../../types/errors'
import { generateTimestamp } from '@sasjs/utils/time'
import { saveLog } from './saveLog'
import { createWriteStream } from '@sasjs/utils/file'
import { WriteStream } from 'fs'
import { Link } from '../../types'
export async function pollJobState(
requestClient: RequestClient,
postedJob: Job,
debug: boolean,
authConfig?: AuthConfig,
pollOptions?: PollOptions
) {
const logger = process.logger || console
let pollInterval = 300
let maxPollCount = 1000
if (pollOptions) {
pollInterval = pollOptions.pollInterval || pollInterval
maxPollCount = pollOptions.maxPollCount || maxPollCount
}
const stateLink = postedJob.links.find((l: any) => l.rel === 'state')
if (!stateLink) {
throw new Error(`Job state link was not found.`)
}
let currentState = await getJobState(
requestClient,
postedJob,
'',
debug,
authConfig
).catch((err) => {
logger.error(
`Error fetching job state from ${stateLink.href}. Starting poll, assuming job to be running.`,
err
)
return 'unavailable'
})
let pollCount = 0
if (currentState === 'completed') {
return Promise.resolve(currentState)
}
let logFileStream
if (pollOptions?.streamLog) {
const logFileName = `${postedJob.name || 'job'}-${generateTimestamp()}.log`
const logFilePath = `${
pollOptions?.logFolderPath || process.cwd()
}/${logFileName}`
logFileStream = await createWriteStream(logFilePath)
}
let result = await doPoll(
requestClient,
postedJob,
currentState,
debug,
pollCount,
authConfig,
pollOptions,
logFileStream
)
currentState = result.state
pollCount = result.pollCount
if (!needsRetry(currentState) || pollCount >= maxPollCount) {
return currentState
}
// If we get to this point, this is a long-running job that needs longer polling.
// We will resume polling with a bigger interval of 1 minute
let longJobPollOptions: PollOptions = {
maxPollCount: 24 * 60,
pollInterval: 60000,
streamLog: false
}
if (pollOptions) {
longJobPollOptions.streamLog = pollOptions.streamLog
longJobPollOptions.logFolderPath = pollOptions.logFolderPath
}
result = await doPoll(
requestClient,
postedJob,
currentState,
debug,
pollCount,
authConfig,
longJobPollOptions,
logFileStream
)
currentState = result.state
pollCount = result.pollCount
if (logFileStream) {
logFileStream.end()
}
return currentState
}
const getJobState = async (
requestClient: RequestClient,
job: Job,
currentState: string,
debug: boolean,
authConfig?: AuthConfig
) => {
const stateLink = job.links.find((l: any) => l.rel === 'state')
if (!stateLink) {
throw new Error(`Job state link was not found.`)
}
if (needsRetry(currentState)) {
let tokens
if (authConfig) {
tokens = await getTokens(requestClient, authConfig)
}
const { result: jobState } = await requestClient
.get<string>(
`${stateLink.href}?_action=wait&wait=300`,
tokens?.access_token,
'text/plain',
{},
debug
)
.catch((err) => {
throw new JobStatePollError(job.id, err)
})
return jobState.trim()
} else {
return currentState
}
}
const needsRetry = (state: string) =>
state === 'running' ||
state === '' ||
state === 'pending' ||
state === 'unavailable'
const doPoll = async (
requestClient: RequestClient,
postedJob: Job,
currentState: string,
debug: boolean,
pollCount: number,
authConfig?: AuthConfig,
pollOptions?: PollOptions,
logStream?: WriteStream
): Promise<{ state: string; pollCount: number }> => {
let pollInterval = 300
let maxPollCount = 1000
let maxErrorCount = 5
let errorCount = 0
let state = currentState
let printedState = ''
let startLogLine = 0
const logger = process.logger || console
if (pollOptions) {
pollInterval = pollOptions.pollInterval || pollInterval
maxPollCount = pollOptions.maxPollCount || maxPollCount
}
const stateLink = postedJob.links.find((l: Link) => l.rel === 'state')
if (!stateLink) {
throw new Error(`Job state link was not found.`)
}
while (needsRetry(state) && pollCount <= 100 && pollCount <= maxPollCount) {
state = await getJobState(
requestClient,
postedJob,
state,
debug,
authConfig
).catch((err) => {
errorCount++
if (pollCount >= maxPollCount || errorCount >= maxErrorCount) {
throw err
}
logger.error(
`Error fetching job state from ${stateLink.href}. Resuming poll, assuming job to be running.`,
err
)
return 'unavailable'
})
pollCount++
if (pollOptions?.streamLog) {
const jobUrl = postedJob.links.find((l: Link) => l.rel === 'self')
const { result: job } = await requestClient.get<Job>(
jobUrl!.href,
authConfig?.access_token
)
const endLogLine = job.logStatistics?.lineCount ?? 1000000
await saveLog(
postedJob,
requestClient,
startLogLine,
endLogLine,
logStream,
authConfig?.access_token
)
startLogLine += endLogLine
}
if (debug && printedState !== state) {
logger.info('Polling job status...')
logger.info(`Current job state: ${state}`)
printedState = state
}
if (state != 'unavailable' && errorCount > 0) {
errorCount = 0
}
await delay(pollInterval)
}
return { state, pollCount }
}
const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms))

55
src/api/viya/saveLog.ts Normal file
View File

@@ -0,0 +1,55 @@
import { Job } from '../..'
import { RequestClient } from '../../request/RequestClient'
import { fetchLog } from '../../utils'
import { WriteStream } from 'fs'
import { writeStream } from './writeStream'
/**
* Appends logs to a supplied write stream.
* This is useful for getting quick feedback on longer running jobs.
* @param job - the job to fetch logs for
* @param requestClient - the pre-configured HTTP request client
* @param startLine - the line at which to start fetching the log
* @param endLine - the line at which to stop fetching the log
* @param logFileStream - the write stream to which the log is appended
* @accessToken - an optional access token for authentication/authorization
* The access token is not required when fetching logs from the browser.
*/
export async function saveLog(
job: Job,
requestClient: RequestClient,
startLine: number,
endLine: number,
logFileStream?: WriteStream,
accessToken?: string
) {
if (!accessToken) {
throw new Error(
`Logs for job ${job.id} cannot be fetched without a valid access token.`
)
}
if (!logFileStream) {
throw new Error(
`Logs for job ${job.id} cannot be written without a valid write stream.`
)
}
const logger = process.logger || console
const jobLogUrl = job.links.find((l) => l.rel === 'log')
if (!jobLogUrl) {
throw new Error(`Log URL for job ${job.id} was not found.`)
}
const log = await fetchLog(
requestClient,
accessToken,
`${jobLogUrl.href}/content`,
startLine,
endLine
)
logger.info(`Writing logs to ${logFileStream.path}`)
await writeStream(logFileStream, log || '')
}

View File

@@ -0,0 +1,675 @@
import { RequestClient } from '../../../request/RequestClient'
import { SessionManager } from '../../../SessionManager'
import { executeScript } from '../executeScript'
import { mockSession, mockAuthConfig, mockJob } from './mockResponses'
import * as pollJobStateModule from '../pollJobState'
import * as uploadTablesModule from '../uploadTables'
import * as getTokensModule from '../../../auth/getTokens'
import * as formatDataModule from '../../../utils/formatDataForRequest'
import * as fetchLogsModule from '../../../utils/fetchLogByChunks'
import { PollOptions } from '../../../types'
import { ComputeJobExecutionError, NotFoundError } from '../../../types/errors'
import { Logger, LogLevel } from '@sasjs/utils'
const sessionManager = new (<jest.Mock<SessionManager>>SessionManager)()
const requestClient = new (<jest.Mock<RequestClient>>RequestClient)()
const defaultPollOptions: PollOptions = {
maxPollCount: 100,
pollInterval: 500,
streamLog: false
}
describe('executeScript', () => {
beforeEach(() => {
;(process as any).logger = new Logger(LogLevel.Off)
setupMocks()
})
it('should not try to get fresh tokens if an authConfig is not provided', async () => {
await executeScript(
requestClient,
sessionManager,
'test',
'test',
['%put hello'],
'test context'
)
expect(getTokensModule.getTokens).not.toHaveBeenCalled()
})
it('should try to get fresh tokens if an authConfig is provided', async () => {
await executeScript(
requestClient,
sessionManager,
'test',
'test',
['%put hello'],
'test context',
mockAuthConfig
)
expect(getTokensModule.getTokens).toHaveBeenCalledWith(
requestClient,
mockAuthConfig
)
})
it('should get a session from the session manager before executing', async () => {
await executeScript(
requestClient,
sessionManager,
'test',
'test',
['%put hello'],
'test context'
)
expect(sessionManager.getSession).toHaveBeenCalledWith(undefined)
})
it('should handle errors while getting a session', async () => {
jest
.spyOn(sessionManager, 'getSession')
.mockImplementation(() => Promise.reject('Test Error'))
const error = await executeScript(
requestClient,
sessionManager,
'test',
'test',
['%put hello'],
'test context'
).catch((e) => e)
expect(error).toContain('Error while getting session.')
})
it('should fetch the PID when printPid is true', async () => {
await executeScript(
requestClient,
sessionManager,
'test',
'test',
['%put hello'],
'test context',
mockAuthConfig,
null,
false,
false,
false,
defaultPollOptions,
true
)
expect(sessionManager.getVariable).toHaveBeenCalledWith(
mockSession.id,
'SYSJOBID',
mockAuthConfig.access_token
)
})
it('should handle errors while getting the job PID', async () => {
jest
.spyOn(sessionManager, 'getVariable')
.mockImplementation(() => Promise.reject('Test Error'))
const error = await executeScript(
requestClient,
sessionManager,
'test',
'test',
['%put hello'],
'test context',
mockAuthConfig,
null,
false,
false,
false,
defaultPollOptions,
true
).catch((e) => e)
expect(error).toContain('Error while getting session variable.')
})
it('should use the file upload approach when data contains semicolons', async () => {
jest
.spyOn(uploadTablesModule, 'uploadTables')
.mockImplementation(() =>
Promise.resolve([{ tableName: 'test', file: { id: 1 } }])
)
await executeScript(
requestClient,
sessionManager,
'test',
'test',
['%put hello'],
'test context',
mockAuthConfig,
{ foo: 'bar;' },
false,
false,
false,
defaultPollOptions,
true
)
expect(uploadTablesModule.uploadTables).toHaveBeenCalledWith(
requestClient,
{ foo: 'bar;' },
mockAuthConfig.access_token
)
})
it('should format data as CSV when it does not contain semicolons', async () => {
await executeScript(
requestClient,
sessionManager,
'test',
'test',
['%put hello'],
'test context',
mockAuthConfig,
{ foo: 'bar' },
false,
false,
false,
defaultPollOptions,
true
)
expect(formatDataModule.formatDataForRequest).toHaveBeenCalledWith({
foo: 'bar'
})
})
it('should submit a job for execution via the compute API', async () => {
jest
.spyOn(formatDataModule, 'formatDataForRequest')
.mockImplementation(() => ({ sasjs_tables: 'foo', sasjs0data: 'bar' }))
await executeScript(
requestClient,
sessionManager,
'test',
'test',
['%put "hello";'],
'test context',
mockAuthConfig,
{ foo: 'bar' },
false,
false,
false,
defaultPollOptions,
true
)
expect(requestClient.post).toHaveBeenCalledWith(
`/compute/sessions/${mockSession.id}/jobs`,
{
name: 'exec-test',
description: 'Powered by SASjs',
code: ['%put "hello";'],
variables: {
SYS_JES_JOB_URI: '',
_program: 'test/test',
sasjs_tables: 'foo',
sasjs0data: 'bar'
},
arguments: {
_contextName: 'test context',
_OMITJSONLISTING: true,
_OMITJSONLOG: true,
_OMITSESSIONRESULTS: true,
_OMITTEXTLISTING: true,
_OMITTEXTLOG: true
}
},
mockAuthConfig.access_token
)
})
it('should set the correct variables when debug is true', async () => {
jest
.spyOn(formatDataModule, 'formatDataForRequest')
.mockImplementation(() => ({ sasjs_tables: 'foo', sasjs0data: 'bar' }))
await executeScript(
requestClient,
sessionManager,
'test',
'test',
['%put "hello";'],
'test context',
mockAuthConfig,
{ foo: 'bar' },
true,
false,
false,
defaultPollOptions,
true
)
expect(requestClient.post).toHaveBeenCalledWith(
`/compute/sessions/${mockSession.id}/jobs`,
{
name: 'exec-test',
description: 'Powered by SASjs',
code: ['%put "hello";'],
variables: {
SYS_JES_JOB_URI: '',
_program: 'test/test',
sasjs_tables: 'foo',
sasjs0data: 'bar',
_DEBUG: 131
},
arguments: {
_contextName: 'test context',
_OMITJSONLISTING: true,
_OMITJSONLOG: true,
_OMITSESSIONRESULTS: false,
_OMITTEXTLISTING: true,
_OMITTEXTLOG: false
}
},
mockAuthConfig.access_token
)
})
it('should handle errors during job submission', async () => {
jest
.spyOn(requestClient, 'post')
.mockImplementation(() => Promise.reject('Test Error'))
const error = await executeScript(
requestClient,
sessionManager,
'test',
'test',
['%put "hello";'],
'test context',
mockAuthConfig,
{ foo: 'bar' },
true,
false,
false,
defaultPollOptions,
true
).catch((e) => e)
expect(error).toContain('Error while posting job')
})
it('should immediately return the session when waitForResult is false', async () => {
const result = await executeScript(
requestClient,
sessionManager,
'test',
'test',
['%put "hello";'],
'test context',
mockAuthConfig,
{ foo: 'bar' },
true,
false,
false,
defaultPollOptions,
true
)
expect(result).toEqual(mockSession)
})
it('should poll for job completion when waitForResult is true', async () => {
await executeScript(
requestClient,
sessionManager,
'test',
'test',
['%put "hello";'],
'test context',
mockAuthConfig,
{ foo: 'bar' },
false,
false,
true,
defaultPollOptions,
true
)
expect(pollJobStateModule.pollJobState).toHaveBeenCalledWith(
requestClient,
mockJob,
false,
mockAuthConfig,
defaultPollOptions
)
})
it('should handle general errors when polling for job status', async () => {
jest
.spyOn(pollJobStateModule, 'pollJobState')
.mockImplementation(() => Promise.reject('Poll Error'))
const error = await executeScript(
requestClient,
sessionManager,
'test',
'test',
['%put "hello";'],
'test context',
mockAuthConfig,
{ foo: 'bar' },
false,
false,
true,
defaultPollOptions,
true
).catch((e) => e)
expect(error).toContain('Error while polling job status.')
})
it('should fetch the log and append it to the error in case of a 5113 error code', async () => {
jest
.spyOn(pollJobStateModule, 'pollJobState')
.mockImplementation(() =>
Promise.reject({ response: { data: 'err=5113,' } })
)
const error = await executeScript(
requestClient,
sessionManager,
'test',
'test',
['%put "hello";'],
'test context',
mockAuthConfig,
{ foo: 'bar' },
false,
false,
true,
defaultPollOptions,
true
).catch((e) => e)
expect(fetchLogsModule.fetchLogByChunks).toHaveBeenCalledWith(
requestClient,
mockAuthConfig.access_token,
mockJob.links.find((l) => l.rel === 'up')!.href + '/log',
1000000
)
expect(error.log).toEqual('Test Log')
})
it('should fetch the logs for the job if debug is true and a log URL is available', async () => {
await executeScript(
requestClient,
sessionManager,
'test',
'test',
['%put "hello";'],
'test context',
mockAuthConfig,
{ foo: 'bar' },
true,
false,
true,
defaultPollOptions,
true
)
expect(fetchLogsModule.fetchLogByChunks).toHaveBeenCalledWith(
requestClient,
mockAuthConfig.access_token,
mockJob.links.find((l) => l.rel === 'log')!.href + '/content',
mockJob.logStatistics.lineCount
)
})
it('should not fetch the logs for the job if debug is false', async () => {
await executeScript(
requestClient,
sessionManager,
'test',
'test',
['%put "hello";'],
'test context',
mockAuthConfig,
{ foo: 'bar' },
false,
false,
true,
defaultPollOptions,
true
)
expect(fetchLogsModule.fetchLogByChunks).not.toHaveBeenCalled()
})
it('should throw a ComputeJobExecutionError if the job has failed', async () => {
jest
.spyOn(pollJobStateModule, 'pollJobState')
.mockImplementation(() => Promise.resolve('failed'))
const error: ComputeJobExecutionError = await executeScript(
requestClient,
sessionManager,
'test',
'test',
['%put "hello";'],
'test context',
mockAuthConfig,
{ foo: 'bar' },
true,
false,
true,
defaultPollOptions,
true
).catch((e) => e)
expect(fetchLogsModule.fetchLogByChunks).toHaveBeenCalledWith(
requestClient,
mockAuthConfig.access_token,
mockJob.links.find((l) => l.rel === 'log')!.href + '/content',
mockJob.logStatistics.lineCount
)
expect(error).toBeInstanceOf(ComputeJobExecutionError)
expect(error.log).toEqual('Test Log')
expect(error.job).toEqual(mockJob)
})
it('should throw a ComputeJobExecutionError if the job has errored out', async () => {
jest
.spyOn(pollJobStateModule, 'pollJobState')
.mockImplementation(() => Promise.resolve('error'))
const error: ComputeJobExecutionError = await executeScript(
requestClient,
sessionManager,
'test',
'test',
['%put "hello";'],
'test context',
mockAuthConfig,
{ foo: 'bar' },
true,
false,
true,
defaultPollOptions,
true
).catch((e) => e)
expect(fetchLogsModule.fetchLogByChunks).toHaveBeenCalledWith(
requestClient,
mockAuthConfig.access_token,
mockJob.links.find((l) => l.rel === 'log')!.href + '/content',
mockJob.logStatistics.lineCount
)
expect(error).toBeInstanceOf(ComputeJobExecutionError)
expect(error.log).toEqual('Test Log')
expect(error.job).toEqual(mockJob)
})
it('should fetch the result if expectWebout is true', async () => {
await executeScript(
requestClient,
sessionManager,
'test',
'test',
['%put "hello";'],
'test context',
mockAuthConfig,
{ foo: 'bar' },
false,
true,
true,
defaultPollOptions,
true
)
expect(requestClient.get).toHaveBeenCalledWith(
`/compute/sessions/${mockSession.id}/filerefs/_webout/content`,
mockAuthConfig.access_token,
'text/plain'
)
})
it('should fetch the logs if the webout file was not found', async () => {
jest.spyOn(requestClient, 'get').mockImplementation((url, ...rest) => {
if (url.includes('_webout')) {
return Promise.reject(new NotFoundError(url))
}
return Promise.resolve({ result: mockJob, etag: '', status: 200 })
})
const error = await executeScript(
requestClient,
sessionManager,
'test',
'test',
['%put "hello";'],
'test context',
mockAuthConfig,
{ foo: 'bar' },
false,
true,
true,
defaultPollOptions,
true
).catch((e) => e)
expect(requestClient.get).toHaveBeenCalledWith(
`/compute/sessions/${mockSession.id}/filerefs/_webout/content`,
mockAuthConfig.access_token,
'text/plain'
)
expect(fetchLogsModule.fetchLogByChunks).toHaveBeenCalledWith(
requestClient,
mockAuthConfig.access_token,
mockJob.links.find((l) => l.rel === 'log')!.href + '/content',
mockJob.logStatistics.lineCount
)
expect(error.status).toEqual(500)
expect(error.log).toEqual('Test Log')
})
it('should clear the session after execution is complete', async () => {
await executeScript(
requestClient,
sessionManager,
'test',
'test',
['%put "hello";'],
'test context',
mockAuthConfig,
{ foo: 'bar' },
false,
true,
true,
defaultPollOptions,
true
)
expect(sessionManager.clearSession).toHaveBeenCalledWith(
mockSession.id,
mockAuthConfig.access_token
)
})
it('should handle errors while clearing a session', async () => {
jest
.spyOn(sessionManager, 'clearSession')
.mockImplementation(() => Promise.reject('Clear Session Error'))
const error = await executeScript(
requestClient,
sessionManager,
'test',
'test',
['%put "hello";'],
'test context',
mockAuthConfig,
{ foo: 'bar' },
false,
true,
true,
defaultPollOptions,
true
).catch((e) => e)
expect(error).toContain('Error while clearing session.')
})
})
const setupMocks = () => {
jest.restoreAllMocks()
jest.mock('../../../request/RequestClient')
jest.mock('../../../SessionManager')
jest.mock('../../../auth/getTokens')
jest.mock('../pollJobState')
jest.mock('../uploadTables')
jest.mock('../../../utils/formatDataForRequest')
jest.mock('../../../utils/fetchLogByChunks')
jest
.spyOn(requestClient, 'post')
.mockImplementation(() => Promise.resolve({ result: mockJob, etag: '' }))
jest
.spyOn(requestClient, 'get')
.mockImplementation(() =>
Promise.resolve({ result: mockJob, etag: '', status: 200 })
)
jest
.spyOn(requestClient, 'delete')
.mockImplementation(() => Promise.resolve({ result: {}, etag: '' }))
jest
.spyOn(getTokensModule, 'getTokens')
.mockImplementation(() => Promise.resolve(mockAuthConfig))
jest
.spyOn(pollJobStateModule, 'pollJobState')
.mockImplementation(() => Promise.resolve('completed'))
jest
.spyOn(sessionManager, 'getVariable')
.mockImplementation(() =>
Promise.resolve({ result: { value: 'test' }, etag: 'test', status: 200 })
)
jest
.spyOn(sessionManager, 'getSession')
.mockImplementation(() => Promise.resolve(mockSession))
jest
.spyOn(sessionManager, 'clearSession')
.mockImplementation(() => Promise.resolve())
jest
.spyOn(formatDataModule, 'formatDataForRequest')
.mockImplementation(() => ({ sasjs_tables: 'test', sasjs0data: 'test' }))
jest
.spyOn(fetchLogsModule, 'fetchLogByChunks')
.mockImplementation(() => Promise.resolve('Test Log'))
}

View File

@@ -0,0 +1,73 @@
import { AuthConfig } from '@sasjs/utils/types'
import { Job, Session } from '../../../types'
export const mockSession: Session = {
id: 's35510n',
state: 'idle',
links: [],
attributes: {
sessionInactiveTimeout: 1
},
creationTimeStamp: new Date().valueOf().toString()
}
export const mockJob: Job = {
id: 'j0b',
name: 'test job',
uri: '/j0b',
createdBy: 'test user',
results: {
'_webout.json': 'test'
},
logStatistics: {
lineCount: 100,
modifiedTimeStamp: new Date().valueOf().toString()
},
links: [
{
rel: 'log',
href: '/log',
method: 'GET',
type: 'log',
uri: 'log'
},
{
rel: 'self',
href: '/job',
method: 'GET',
type: 'job',
uri: 'job'
},
{
rel: 'state',
href: '/state',
method: 'GET',
type: 'state',
uri: 'state'
},
{
rel: 'up',
href: '/job',
method: 'GET',
type: 'up',
uri: 'job'
}
]
}
export const mockAuthConfig: AuthConfig = {
client: 'cl13nt',
secret: '53cr3t',
access_token: 'acc355',
refresh_token: 'r3fr35h'
}
export class MockStream {
_write(chunk: string, _: any, next: Function) {
next()
}
reset() {}
destroy() {}
}

View File

@@ -0,0 +1,313 @@
import { Logger, LogLevel } from '@sasjs/utils'
import * as fileModule from '@sasjs/utils/file'
import { RequestClient } from '../../../request/RequestClient'
import { mockAuthConfig, mockJob } from './mockResponses'
import { pollJobState } from '../pollJobState'
import * as getTokensModule from '../../../auth/getTokens'
import * as saveLogModule from '../saveLog'
import { PollOptions } from '../../../types'
import { WriteStream } from 'fs'
const requestClient = new (<jest.Mock<RequestClient>>RequestClient)()
const defaultPollOptions: PollOptions = {
maxPollCount: 100,
pollInterval: 500,
streamLog: false
}
describe('pollJobState', () => {
beforeEach(() => {
;(process as any).logger = new Logger(LogLevel.Off)
setupMocks()
})
it('should get valid tokens if the authConfig has been provided', async () => {
await pollJobState(
requestClient,
mockJob,
false,
mockAuthConfig,
defaultPollOptions
)
expect(getTokensModule.getTokens).toHaveBeenCalledWith(
requestClient,
mockAuthConfig
)
})
it('should not attempt to get tokens if the authConfig has not been provided', async () => {
await pollJobState(
requestClient,
mockJob,
false,
undefined,
defaultPollOptions
)
expect(getTokensModule.getTokens).not.toHaveBeenCalled()
})
it('should throw an error if the job does not have a state link', async () => {
const error = await pollJobState(
requestClient,
{ ...mockJob, links: mockJob.links.filter((l) => l.rel !== 'state') },
false,
undefined,
defaultPollOptions
).catch((e) => e)
expect((error as Error).message).toContain('Job state link was not found.')
})
it('should attempt to refresh tokens before each poll', async () => {
mockSimplePoll()
await pollJobState(
requestClient,
mockJob,
false,
mockAuthConfig,
defaultPollOptions
)
expect(getTokensModule.getTokens).toHaveBeenCalledTimes(3)
})
it('should attempt to fetch and save the log after each poll when streamLog is true', async () => {
mockSimplePoll()
await pollJobState(requestClient, mockJob, false, mockAuthConfig, {
...defaultPollOptions,
streamLog: true
})
expect(saveLogModule.saveLog).toHaveBeenCalledTimes(2)
})
it('should not attempt to fetch and save the log after each poll when streamLog is false', async () => {
mockSimplePoll()
await pollJobState(
requestClient,
mockJob,
false,
mockAuthConfig,
defaultPollOptions
)
expect(saveLogModule.saveLog).not.toHaveBeenCalled()
})
it('should return the current status when the max poll count is reached', async () => {
mockRunningPoll()
const state = await pollJobState(
requestClient,
mockJob,
false,
mockAuthConfig,
{
...defaultPollOptions,
maxPollCount: 1
}
)
expect(state).toEqual('running')
})
it('should poll with a larger interval for longer running jobs', async () => {
mockLongPoll()
const state = await pollJobState(
requestClient,
mockJob,
false,
mockAuthConfig,
{
...defaultPollOptions,
maxPollCount: 200,
pollInterval: 10
}
)
expect(state).toEqual('completed')
}, 200000)
it('should continue polling until the job completes or errors', async () => {
mockSimplePoll(1)
const state = await pollJobState(
requestClient,
mockJob,
false,
undefined,
defaultPollOptions
)
expect(requestClient.get).toHaveBeenCalledTimes(2)
expect(state).toEqual('completed')
})
it('should print the state to the console when debug is on', async () => {
jest.spyOn((process as any).logger, 'info')
mockSimplePoll()
await pollJobState(
requestClient,
mockJob,
true,
undefined,
defaultPollOptions
)
expect((process as any).logger.info).toHaveBeenCalledTimes(4)
expect((process as any).logger.info).toHaveBeenNthCalledWith(
1,
'Polling job status...'
)
expect((process as any).logger.info).toHaveBeenNthCalledWith(
2,
'Current job state: running'
)
expect((process as any).logger.info).toHaveBeenNthCalledWith(
3,
'Polling job status...'
)
expect((process as any).logger.info).toHaveBeenNthCalledWith(
4,
'Current job state: completed'
)
})
it('should continue polling when there is a single error in between', async () => {
mockPollWithSingleError()
const state = await pollJobState(
requestClient,
mockJob,
false,
undefined,
defaultPollOptions
)
expect(requestClient.get).toHaveBeenCalledTimes(2)
expect(state).toEqual('completed')
})
it('should throw an error when the error count exceeds the set value of 5', async () => {
mockErroredPoll()
const error = await pollJobState(
requestClient,
mockJob,
false,
undefined,
defaultPollOptions
).catch((e) => e)
expect(error.message).toEqual(
'Error while polling job state for job j0b: Status Error'
)
})
})
const setupMocks = () => {
jest.restoreAllMocks()
jest.mock('../../../request/RequestClient')
jest.mock('../../../auth/getTokens')
jest.mock('../saveLog')
jest.mock('@sasjs/utils/file')
jest
.spyOn(requestClient, 'get')
.mockImplementation(() =>
Promise.resolve({ result: 'completed', etag: '', status: 200 })
)
jest
.spyOn(getTokensModule, 'getTokens')
.mockImplementation(() => Promise.resolve(mockAuthConfig))
jest
.spyOn(saveLogModule, 'saveLog')
.mockImplementation(() => Promise.resolve())
jest
.spyOn(fileModule, 'createWriteStream')
.mockImplementation(() => Promise.resolve({} as unknown as WriteStream))
}
const mockSimplePoll = (runningCount = 2) => {
let count = 0
jest.spyOn(requestClient, 'get').mockImplementation((url) => {
count++
if (url.includes('job')) {
return Promise.resolve({ result: mockJob, etag: '', status: 200 })
}
return Promise.resolve({
result:
count === 0
? 'pending'
: count <= runningCount
? 'running'
: 'completed',
etag: '',
status: 200
})
})
}
const mockRunningPoll = () => {
let count = 0
jest.spyOn(requestClient, 'get').mockImplementation((url) => {
count++
if (url.includes('job')) {
return Promise.resolve({ result: mockJob, etag: '', status: 200 })
}
return Promise.resolve({
result: count === 0 ? 'pending' : 'running',
etag: '',
status: 200
})
})
}
const mockLongPoll = () => {
let count = 0
jest.spyOn(requestClient, 'get').mockImplementation((url) => {
count++
if (url.includes('job')) {
return Promise.resolve({ result: mockJob, etag: '', status: 200 })
}
return Promise.resolve({
result: count <= 101 ? 'running' : 'completed',
etag: '',
status: 200
})
})
}
const mockPollWithSingleError = () => {
let count = 0
jest.spyOn(requestClient, 'get').mockImplementation((url) => {
count++
if (url.includes('job')) {
return Promise.resolve({ result: mockJob, etag: '', status: 200 })
}
if (count === 1) {
return Promise.reject('Status Error')
}
return Promise.resolve({
result: count === 0 ? 'pending' : 'completed',
etag: '',
status: 200
})
})
}
const mockErroredPoll = () => {
jest.spyOn(requestClient, 'get').mockImplementation((url) => {
if (url.includes('job')) {
return Promise.resolve({ result: mockJob, etag: '', status: 200 })
}
return Promise.reject('Status Error')
})
}

View File

@@ -0,0 +1,73 @@
import { Logger, LogLevel } from '@sasjs/utils'
import { RequestClient } from '../../../request/RequestClient'
import * as fetchLogsModule from '../../../utils/fetchLogByChunks'
import * as writeStreamModule from '../writeStream'
import { saveLog } from '../saveLog'
import { mockJob } from './mockResponses'
import { WriteStream } from 'fs'
const requestClient = new (<jest.Mock<RequestClient>>RequestClient)()
const stream = {} as unknown as WriteStream
describe('saveLog', () => {
beforeEach(() => {
;(process as any).logger = new Logger(LogLevel.Off)
setupMocks()
})
it('should throw an error when a valid access token is not provided', async () => {
const error = await saveLog(mockJob, requestClient, 0, 100, stream).catch(
(e) => e
)
expect(error.message).toContain(
`Logs for job ${mockJob.id} cannot be fetched without a valid access token.`
)
})
it('should throw an error when the log URL is not available', async () => {
const error = await saveLog(
{ ...mockJob, links: mockJob.links.filter((l) => l.rel !== 'log') },
requestClient,
0,
100,
stream,
't0k3n'
).catch((e) => e)
expect(error.message).toContain(
`Log URL for job ${mockJob.id} was not found.`
)
})
it('should fetch and save logs to the given path', async () => {
await saveLog(mockJob, requestClient, 0, 100, stream, 't0k3n')
expect(fetchLogsModule.fetchLog).toHaveBeenCalledWith(
requestClient,
't0k3n',
'/log/content',
0,
100
)
expect(writeStreamModule.writeStream).toHaveBeenCalledWith(
stream,
'Test Log'
)
})
})
const setupMocks = () => {
jest.restoreAllMocks()
jest.mock('../../../request/RequestClient')
jest.mock('../../../utils/fetchLogByChunks')
jest.mock('@sasjs/utils')
jest.mock('../writeStream')
jest
.spyOn(fetchLogsModule, 'fetchLog')
.mockImplementation(() => Promise.resolve('Test Log'))
jest
.spyOn(writeStreamModule, 'writeStream')
.mockImplementation(() => Promise.resolve())
}

View File

@@ -0,0 +1,67 @@
import { RequestClient } from '../../../request/RequestClient'
import * as convertToCsvModule from '../../../utils/convertToCsv'
import { uploadTables } from '../uploadTables'
const requestClient = new (<jest.Mock<RequestClient>>RequestClient)()
describe('uploadTables', () => {
beforeEach(() => {
setupMocks()
})
it('should return a list of uploaded files', async () => {
const data = { foo: 'bar' }
const files = await uploadTables(requestClient, data, 't0k3n')
expect(files).toEqual([{ tableName: 'foo', file: 'test-file' }])
expect(requestClient.uploadFile).toHaveBeenCalledTimes(1)
expect(requestClient.uploadFile).toHaveBeenCalledWith(
'/files/files#rawUpload',
'Test CSV',
't0k3n'
)
})
it('should throw an error when the CSV exceeds the maximum length', async () => {
const data = { foo: 'bar' }
jest
.spyOn(convertToCsvModule, 'convertToCSV')
.mockImplementation(() => 'ERROR: LARGE STRING LENGTH')
const error = await uploadTables(requestClient, data, 't0k3n').catch(
(e) => e
)
expect(requestClient.uploadFile).not.toHaveBeenCalled()
expect(error.message).toEqual(
'The max length of a string value in SASjs is 32765 characters.'
)
})
it('should throw an error when the file upload fails', async () => {
const data = { foo: 'bar' }
jest
.spyOn(requestClient, 'uploadFile')
.mockImplementation(() => Promise.reject('Upload Error'))
const error = await uploadTables(requestClient, data, 't0k3n').catch(
(e) => e
)
expect(error).toContain('Error while uploading file.')
})
})
const setupMocks = () => {
jest.restoreAllMocks()
jest.mock('../../../utils/convertToCsv')
jest
.spyOn(convertToCsvModule, 'convertToCSV')
.mockImplementation(() => 'Test CSV')
jest
.spyOn(requestClient, 'uploadFile')
.mockImplementation(() =>
Promise.resolve({ result: 'test-file', etag: '' })
)
}

View File

@@ -0,0 +1,37 @@
import { prefixMessage } from '@sasjs/utils/error'
import { RequestClient } from '../../request/RequestClient'
import { convertToCSV } from '../../utils/convertToCsv'
/**
* Uploads tables to SAS as specially formatted CSVs.
* This is more compact than JSON, and easier to read within SAS.
* @param requestClient - the pre-configured HTTP request client
* @param data - the JSON representation of the data to be uploaded
* @param accessToken - an optional access token for authentication/authorization
* The access token is not required when uploading tables from the browser.
*/
export async function uploadTables(
requestClient: RequestClient,
data: any,
accessToken?: string
) {
const uploadedFiles = []
for (const tableName in data) {
const csv = convertToCSV(data[tableName])
if (csv === 'ERROR: LARGE STRING LENGTH') {
throw new Error(
'The max length of a string value in SASjs is 32765 characters.'
)
}
const uploadResponse = await requestClient
.uploadFile(`/files/files#rawUpload`, csv, accessToken)
.catch((err) => {
throw prefixMessage(err, 'Error while uploading file. ')
})
uploadedFiles.push({ tableName, file: uploadResponse.result })
}
return uploadedFiles
}

View File

@@ -0,0 +1,15 @@
import { WriteStream } from 'fs'
export const writeStream = async (
stream: WriteStream,
content: string
): Promise<void> => {
return new Promise((resolve, reject) => {
stream.write(content + '\n\nnext chunk\n\n', (e) => {
if (e) {
return reject(e)
}
return resolve()
})
})
}

View File

@@ -0,0 +1,53 @@
import { SasAuthResponse } from '@sasjs/utils/types'
import { prefixMessage } from '@sasjs/utils/error'
import * as NodeFormData from 'form-data'
import { RequestClient } from '../request/RequestClient'
/**
* Exchanges the auth code for an access token for the given client.
* @param requestClient - the pre-configured HTTP request client
* @param clientId - the client ID to authenticate with.
* @param clientSecret - the client secret to authenticate with.
* @param authCode - the auth code received from the server.
*/
export async function getAccessToken(
requestClient: RequestClient,
clientId: string,
clientSecret: string,
authCode: string
): Promise<SasAuthResponse> {
const url = '/SASLogon/oauth/token'
let token
if (typeof Buffer === 'undefined') {
token = btoa(clientId + ':' + clientSecret)
} else {
token = Buffer.from(clientId + ':' + clientSecret).toString('base64')
}
const headers = {
Authorization: 'Basic ' + token
}
let formData
if (typeof FormData === 'undefined') {
formData = new NodeFormData()
} else {
formData = new FormData()
}
formData.append('grant_type', 'authorization_code')
formData.append('code', authCode)
const authResponse = await requestClient
.post(
url,
formData,
undefined,
'multipart/form-data; boundary=' + (formData as any)._boundary,
headers
)
.then((res) => res.result as SasAuthResponse)
.catch((err) => {
throw prefixMessage(err, 'Error while getting access token')
})
return authResponse
}

40
src/auth/getTokens.ts Normal file
View File

@@ -0,0 +1,40 @@
import {
AuthConfig,
isAccessTokenExpiring,
isRefreshTokenExpiring,
hasTokenExpired
} from '@sasjs/utils'
import { RequestClient } from '../request/RequestClient'
import { refreshTokens } from './refreshTokens'
/**
* Returns the auth configuration, refreshing the tokens if necessary.
* @param requestClient - the pre-configured HTTP request client
* @param authConfig - an object containing a client ID, secret, access token and refresh token
*/
export async function getTokens(
requestClient: RequestClient,
authConfig: AuthConfig
): Promise<AuthConfig> {
const logger = process.logger || console
let { access_token, refresh_token, client, secret } = authConfig
if (
isAccessTokenExpiring(access_token) ||
isRefreshTokenExpiring(refresh_token)
) {
if (hasTokenExpired(refresh_token)) {
const error =
'Unable to obtain new access token. Your refresh token has expired.'
logger.error(error)
throw new Error(error)
}
logger.info('Refreshing access and refresh tokens.')
;({ access_token, refresh_token } = await refreshTokens(
requestClient,
client,
secret,
refresh_token
))
}
return { access_token, refresh_token, client, secret }
}

49
src/auth/refreshTokens.ts Normal file
View File

@@ -0,0 +1,49 @@
import { SasAuthResponse } from '@sasjs/utils/types'
import { prefixMessage } from '@sasjs/utils/error'
import * as NodeFormData from 'form-data'
import { RequestClient } from '../request/RequestClient'
/**
* Exchanges the refresh token for an access token for the given client.
* @param requestClient - the pre-configured HTTP request client
* @param clientId - the client ID to authenticate with.
* @param clientSecret - the client secret to authenticate with.
* @param authCode - the refresh token received from the server.
*/
export async function refreshTokens(
requestClient: RequestClient,
clientId: string,
clientSecret: string,
refreshToken: string
) {
const url = '/SASLogon/oauth/token'
let token
token =
typeof Buffer === 'undefined'
? btoa(clientId + ':' + clientSecret)
: Buffer.from(clientId + ':' + clientSecret).toString('base64')
const headers = {
Authorization: 'Basic ' + token
}
const formData =
typeof FormData === 'undefined' ? new NodeFormData() : new FormData()
formData.append('grant_type', 'refresh_token')
formData.append('refresh_token', refreshToken)
const authResponse = await requestClient
.post<SasAuthResponse>(
url,
formData,
undefined,
'multipart/form-data; boundary=' + (formData as any)._boundary,
headers
)
.then((res) => res.result)
.catch((err) => {
throw prefixMessage(err, 'Error while refreshing tokens')
})
return authResponse
}

View File

@@ -0,0 +1,75 @@
import { AuthConfig } from '@sasjs/utils'
import * as NodeFormData from 'form-data'
import { generateToken, mockAuthResponse } from './mockResponses'
import { RequestClient } from '../../request/RequestClient'
import { getAccessToken } from '../getAccessToken'
const requestClient = new (<jest.Mock<RequestClient>>RequestClient)()
describe('getAccessToken', () => {
it('should attempt to refresh tokens', async () => {
setupMocks()
const access_token = generateToken(30)
const refresh_token = generateToken(30)
const authConfig: AuthConfig = {
access_token,
refresh_token,
client: 'cl13nt',
secret: 's3cr3t'
}
jest
.spyOn(requestClient, 'post')
.mockImplementation(() =>
Promise.resolve({ result: mockAuthResponse, etag: '' })
)
const token = Buffer.from(
authConfig.client + ':' + authConfig.secret
).toString('base64')
await getAccessToken(
requestClient,
authConfig.client,
authConfig.secret,
authConfig.refresh_token
)
expect(requestClient.post).toHaveBeenCalledWith(
'/SASLogon/oauth/token',
expect.any(NodeFormData),
undefined,
expect.stringContaining('multipart/form-data; boundary='),
{
Authorization: 'Basic ' + token
}
)
})
it('should handle errors while refreshing tokens', async () => {
setupMocks()
const access_token = generateToken(30)
const refresh_token = generateToken(30)
const authConfig: AuthConfig = {
access_token,
refresh_token,
client: 'cl13nt',
secret: 's3cr3t'
}
jest
.spyOn(requestClient, 'post')
.mockImplementation(() => Promise.reject('Token Error'))
const error = await getAccessToken(
requestClient,
authConfig.client,
authConfig.secret,
authConfig.refresh_token
).catch((e) => e)
expect(error).toContain('Error while getting access token')
})
})
const setupMocks = () => {
jest.restoreAllMocks()
jest.mock('../../request/RequestClient')
}

View File

@@ -0,0 +1,79 @@
import { AuthConfig } from '@sasjs/utils'
import * as refreshTokensModule from '../refreshTokens'
import { generateToken, mockAuthResponse } from './mockResponses'
import { getTokens } from '../getTokens'
import { RequestClient } from '../../request/RequestClient'
const requestClient = new (<jest.Mock<RequestClient>>RequestClient)()
describe('getTokens', () => {
it('should attempt to refresh tokens if the access token is expiring', async () => {
setupMocks()
const access_token = generateToken(30)
const refresh_token = generateToken(86400000)
const authConfig: AuthConfig = {
access_token,
refresh_token,
client: 'cl13nt',
secret: 's3cr3t'
}
await getTokens(requestClient, authConfig)
expect(refreshTokensModule.refreshTokens).toHaveBeenCalledWith(
requestClient,
authConfig.client,
authConfig.secret,
authConfig.refresh_token
)
})
it('should attempt to refresh tokens if the refresh token is expiring', async () => {
setupMocks()
const access_token = generateToken(86400000)
const refresh_token = generateToken(30)
const authConfig: AuthConfig = {
access_token,
refresh_token,
client: 'cl13nt',
secret: 's3cr3t'
}
await getTokens(requestClient, authConfig)
expect(refreshTokensModule.refreshTokens).toHaveBeenCalledWith(
requestClient,
authConfig.client,
authConfig.secret,
authConfig.refresh_token
)
})
it('should throw an error if the refresh token has already expired', async () => {
setupMocks()
const access_token = generateToken(86400000)
const refresh_token = generateToken(-36000)
const authConfig: AuthConfig = {
access_token,
refresh_token,
client: 'cl13nt',
secret: 's3cr3t'
}
const expectedError =
'Unable to obtain new access token. Your refresh token has expired.'
const error = await getTokens(requestClient, authConfig).catch((e) => e)
expect(error.message).toEqual(expectedError)
})
})
const setupMocks = () => {
jest.restoreAllMocks()
jest.mock('../../request/RequestClient')
jest.mock('../refreshTokens')
jest
.spyOn(refreshTokensModule, 'refreshTokens')
.mockImplementation(() => Promise.resolve(mockAuthResponse))
}

View File

@@ -1,2 +1,24 @@
import { SasAuthResponse } from '@sasjs/utils/types'
export const mockLoginAuthoriseRequiredResponse = `<form id="application_authorization" action="/SASLogon/oauth/authorize" method="POST"><input type="hidden" name="X-Uaa-Csrf" value="2nfuxIn6WaOURWL7tzTXCe"/>`
export const mockLoginSuccessResponse = `You have signed in`
export const mockAuthResponse: SasAuthResponse = {
access_token: 'acc355',
refresh_token: 'r3fr35h',
id_token: 'id',
token_type: 'bearer',
expires_in: new Date().valueOf(),
scope: 'default',
jti: 'test'
}
export const generateToken = (timeToLiveSeconds: number): string => {
const exp =
new Date(new Date().getTime() + timeToLiveSeconds * 1000).getTime() / 1000
const header = 'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9'
const payload = Buffer.from(JSON.stringify({ exp })).toString('base64')
const signature = '4-iaDojEVl0pJQMjrbM1EzUIfAZgsbK_kgnVyVxFSVo'
const token = `${header}.${payload}.${signature}`
return token
}

View File

@@ -0,0 +1,75 @@
import { AuthConfig } from '@sasjs/utils'
import * as NodeFormData from 'form-data'
import { generateToken, mockAuthResponse } from './mockResponses'
import { RequestClient } from '../../request/RequestClient'
import { refreshTokens } from '../refreshTokens'
const requestClient = new (<jest.Mock<RequestClient>>RequestClient)()
describe('refreshTokens', () => {
it('should attempt to refresh tokens', async () => {
setupMocks()
const access_token = generateToken(30)
const refresh_token = generateToken(30)
const authConfig: AuthConfig = {
access_token,
refresh_token,
client: 'cl13nt',
secret: 's3cr3t'
}
jest
.spyOn(requestClient, 'post')
.mockImplementation(() =>
Promise.resolve({ result: mockAuthResponse, etag: '' })
)
const token = Buffer.from(
authConfig.client + ':' + authConfig.secret
).toString('base64')
await refreshTokens(
requestClient,
authConfig.client,
authConfig.secret,
authConfig.refresh_token
)
expect(requestClient.post).toHaveBeenCalledWith(
'/SASLogon/oauth/token',
expect.any(NodeFormData),
undefined,
expect.stringContaining('multipart/form-data; boundary='),
{
Authorization: 'Basic ' + token
}
)
})
it('should handle errors while refreshing tokens', async () => {
setupMocks()
const access_token = generateToken(30)
const refresh_token = generateToken(30)
const authConfig: AuthConfig = {
access_token,
refresh_token,
client: 'cl13nt',
secret: 's3cr3t'
}
jest
.spyOn(requestClient, 'post')
.mockImplementation(() => Promise.reject('Token Error'))
const error = await refreshTokens(
requestClient,
authConfig.client,
authConfig.secret,
authConfig.refresh_token
).catch((e) => e)
expect(error).toContain('Error while refreshing tokens')
})
})
const setupMocks = () => {
jest.restoreAllMocks()
jest.mock('../../request/RequestClient')
}

View File

@@ -43,6 +43,7 @@ export interface HttpClient {
getCsrfToken(type: 'general' | 'file'): CsrfToken | undefined
clearCsrfTokens(): void
getBaseUrl(): string
}
export class RequestClient implements HttpClient {
@@ -78,13 +79,17 @@ export class RequestClient implements HttpClient {
this.fileUploadCsrfToken = { headerName: '', value: '' }
}
public getBaseUrl() {
return this.httpClient.defaults.baseURL || ''
}
public async get<T>(
url: string,
accessToken: string | undefined,
contentType: string = 'application/json',
overrideHeaders: { [key: string]: string | number } = {},
debug: boolean = false
): Promise<{ result: T; etag: string }> {
): Promise<{ result: T; etag: string; status: number }> {
const headers = {
...this.getHeaders(accessToken, contentType),
...overrideHeaders
@@ -429,8 +434,8 @@ export class RequestClient implements HttpClient {
throw new Error('Valid JSON could not be extracted from response.')
}
isValidJson(weboutResponse)
parsedResponse = JSON.parse(weboutResponse)
const jsonResponse = isValidJson(weboutResponse)
parsedResponse = jsonResponse
} catch {
parsedResponse = response.data
}
@@ -438,9 +443,15 @@ export class RequestClient implements HttpClient {
includeSAS9Log = true
}
let responseToReturn: { result: T; etag: any; log?: string } = {
let responseToReturn: {
result: T
etag: any
log?: string
status: number
} = {
result: parsedResponse as T,
etag
etag,
status: response.status
}
if (includeSAS9Log) {

View File

@@ -39,7 +39,7 @@ export class Sas9RequestClient extends RequestClient {
contentType: string = 'application/json',
overrideHeaders: { [key: string]: string | number } = {},
debug: boolean = false
): Promise<{ result: T; etag: string }> {
): Promise<{ result: T; etag: string; status: number }> {
const headers = {
...this.getHeaders(accessToken, contentType),
...overrideHeaders

View File

@@ -1,7 +1,9 @@
import { SessionManager } from '../SessionManager'
import * as dotenv from 'dotenv'
import { RequestClient } from '../request/RequestClient'
import { NoSessionStateError } from '../types/errors'
import * as dotenv from 'dotenv'
import axios from 'axios'
jest.mock('axios')
const mockedAxios = axios as jest.Mocked<typeof axios>
@@ -43,4 +45,38 @@ describe('SessionManager', () => {
).resolves.toEqual(expectedResponse)
})
})
describe('waitForSession', () => {
it('should reject with NoSessionStateError if SAS server did not provide session state', async () => {
const responseStatus = 304
mockedAxios.get.mockImplementation(() =>
Promise.resolve({ data: '', status: responseStatus })
)
await expect(
sessionManager['waitForSession'](
{
id: 'id',
state: '',
links: [
{ rel: 'state', href: '', uri: '', type: '', method: 'GET' }
],
attributes: {
sessionInactiveTimeout: 0
},
creationTimeStamp: ''
},
null,
'access_token'
)
).rejects.toEqual(
new NoSessionStateError(
responseStatus,
process.env.SERVER_URL as string,
'logUrl'
)
)
})
})
})

View File

@@ -0,0 +1,31 @@
import { isValidJson } from '../../utils'
describe('jsonValidator', () => {
it('should not throw an error with an valid json', () => {
const json = {
test: 'test'
}
expect(isValidJson(json)).toBe(json)
})
it('should not throw an error with an valid json string', () => {
const json = {
test: 'test'
}
expect(isValidJson(JSON.stringify(json))).toStrictEqual(json)
})
it('should throw an error with an invalid json', () => {
const json = `{\"test\":\"test\"\"test2\":\"test\"}`
expect(() => {
try {
isValidJson(json)
} catch (err) {
throw new Error()
}
}).toThrowError
})
})

View File

@@ -1,4 +1,6 @@
export interface PollOptions {
MAX_POLL_COUNT?: number
POLL_INTERVAL?: number
maxPollCount: number
pollInterval: number
streamLog: boolean
logFolderPath?: string
}

View File

@@ -0,0 +1,11 @@
export class JobStatePollError extends Error {
constructor(id: string, public originalError: Error) {
super(
`Error while polling job state for job ${id}: ${
originalError.message || originalError
}`
)
this.name = 'JobStatePollError'
Object.setPrototypeOf(this, JobStatePollError.prototype)
}
}

View File

@@ -0,0 +1,15 @@
export class NoSessionStateError extends Error {
constructor(
public serverResponseStatus: number,
public sessionStateUrl: string,
public logUrl: string
) {
super(
`Could not get session state. Server responded with ${serverResponseStatus} whilst checking state: ${sessionStateUrl}`
)
this.name = 'NoSessionStatus'
Object.setPrototypeOf(this, NoSessionStateError.prototype)
}
}

View File

@@ -2,6 +2,8 @@ export * from './AuthorizeError'
export * from './ComputeJobExecutionError'
export * from './InternalServerError'
export * from './JobExecutionError'
export * from './JobStatePollError'
export * from './LoginRequiredError'
export * from './NotFoundError'
export * from './ErrorResponse'
export * from './NoSessionStateError'

View File

@@ -14,18 +14,36 @@ export const fetchLogByChunks = async (
accessToken: string,
logUrl: string,
logCount: number
): Promise<string> => {
return await fetchLog(requestClient, accessToken, logUrl, 0, logCount)
}
/**
* Fetches a section of the log file delineated by start and end lines
* @param {object} requestClient - client object of Request Client.
* @param {string} accessToken - an access token for an authorized user.
* @param {string} logUrl - url of the log file.
* @param {number} start - the line at which to start fetching the log.
* @param {number} end - the line at which to stop fetching the log.
* @returns an string containing log lines.
*/
export const fetchLog = async (
requestClient: RequestClient,
accessToken: string,
logUrl: string,
start: number,
end: number
): Promise<string> => {
const logger = process.logger || console
let log: string = ''
const loglimit = logCount < 10000 ? logCount : 10000
let start = 0
const loglimit = end < 10000 ? end : 10000
do {
logger.info(
`Fetching logs from line no: ${start + 1} to ${
start + loglimit
} of ${logCount}.`
} of ${end}.`
)
const logChunkJson = await requestClient!
.get<any>(`${logUrl}?start=${start}&limit=${loglimit}`, accessToken)
@@ -40,6 +58,6 @@ export const fetchLogByChunks = async (
log += logChunk
start += loglimit
} while (start < logCount)
} while (start < end)
return log
}

View File

@@ -2,9 +2,11 @@
* Checks if string is in valid JSON format else throw error.
* @param str - string to check.
*/
export const isValidJson = (str: string) => {
export const isValidJson = (str: string | object) => {
try {
JSON.parse(str)
if (typeof str === 'object') return str
return JSON.parse(str)
} catch (e) {
throw new Error('Invalid JSON response.')
}

View File

@@ -9,5 +9,5 @@
"sourceMap": true
},
"include": ["src"],
"exclude": ["node_modules", "**/*.spec.ts"]
"exclude": ["node_modules"]
}