1
0
mirror of https://github.com/sasjs/server.git synced 2026-01-03 21:10:05 +00:00

feat: add support for R stored programs

This commit is contained in:
2022-09-06 21:52:21 +05:00
parent b9d032f148
commit d6651bbdbe
13 changed files with 277 additions and 88 deletions

View File

@@ -231,9 +231,45 @@ export class PythonSessionController extends SessionController {
}
}
export class RSessionController extends SessionController {
protected async createSession(): Promise<Session> {
const sessionId = generateUniqueFileName(generateTimestamp())
const sessionFolder = path.join(getSessionsFolder(), sessionId)
const creationTimeStamp = sessionId.split('-').pop() as string
// death time of session is 15 mins from creation
const deathTimeStamp = (
parseInt(creationTimeStamp) +
15 * 60 * 1000 -
1000
).toString()
const session: Session = {
id: sessionId,
ready: true,
inUse: true,
consumed: false,
completed: false,
creationTimeStamp,
deathTimeStamp,
path: sessionFolder
}
const headersPath = path.join(session.path, 'stpsrv_header.txt')
await createFile(headersPath, 'Content-type: text/plain')
this.sessions.push(session)
return session
}
}
export const getSessionController = (
runTime: RunTimeType
): SASSessionController | JSSessionController | PythonSessionController => {
):
| SASSessionController
| JSSessionController
| PythonSessionController
| RSessionController => {
if (runTime === RunTimeType.SAS) {
return getSASSessionController()
}
@@ -246,6 +282,10 @@ export const getSessionController = (
return getPythonSessionController()
}
if (runTime === RunTimeType.R) {
return getRSessionController()
}
throw new Error('No Runtime is configured')
}
@@ -273,6 +313,14 @@ const getPythonSessionController = (): PythonSessionController => {
return process.pythonSessionController
}
const getRSessionController = (): RSessionController => {
if (process.rSessionController) return process.rSessionController
process.rSessionController = new RSessionController()
return process.rSessionController
}
const autoExecContent = `
data _null_;
/* remove the dummy SYSIN */

View File

@@ -0,0 +1,68 @@
import { isWindows } from '@sasjs/utils'
import { PreProgramVars, Session } from '../../types'
import { generateFileUploadRCode } from '../../utils'
import { ExecutionVars } from '.'
export const createRProgram = async (
program: string,
preProgramVariables: PreProgramVars,
vars: ExecutionVars,
session: Session,
weboutPath: string,
headersPath: string,
tokenFile: string,
otherArgs?: any
) => {
const varStatments = Object.keys(vars).reduce(
(computed: string, key: string) => `${computed}.${key} <- '${vars[key]}'\n`,
''
)
const preProgramVarStatments = `
._SASJS_SESSION_PATH <- '${
isWindows() ? session.path.replace(/\\/g, '\\\\') : session.path
}';
._WEBOUT <- '${isWindows() ? weboutPath.replace(/\\/g, '\\\\') : weboutPath}';
._SASJS_WEBOUT_HEADERS <- '${headersPath}';
._SASJS_TOKENFILE <- '${
isWindows() ? tokenFile.replace(/\\/g, '\\\\') : tokenFile
}';
._SASJS_USERNAME <- '${preProgramVariables?.username}';
._SASJS_USERID <- '${preProgramVariables?.userId}';
._SASJS_DISPLAYNAME <- '${preProgramVariables?.displayName}';
._METAPERSON <- ._SASJS_DISPLAYNAME;
._METAUSER <- ._SASJS_USERNAME;
SASJSPROCESSMODE <- 'Stored Program';
`
const requiredModules = ``
program = `
# runtime vars
${varStatments}
# dynamic user-provided vars
${preProgramVarStatments}
# change working directory to session folder
setwd(._SASJS_SESSION_PATH)
# actual job code
${program}
`
// if no files are uploaded filesNamesMap will be undefined
if (otherArgs?.filesNamesMap) {
const uploadRCode = await generateFileUploadRCode(
otherArgs.filesNamesMap,
session.path
)
// If any files are uploaded, the program needs to be updated with some
// dynamically generated variables (pointers) for ease of ingestion
if (uploadRCode.length > 0) {
program = `${uploadRCode}\n` + program
}
}
return requiredModules + program
}

View File

@@ -5,4 +5,5 @@ export * from './FileUploadController'
export * from './createSASProgram'
export * from './createJSProgram'
export * from './createPythonProgram'
export * from './createRProgram'
export * from './processProgram'

View File

@@ -9,7 +9,8 @@ import {
ExecutionVars,
createSASProgram,
createJSProgram,
createPythonProgram
createPythonProgram,
createRProgram
} from './'
export const processProgram = async (
@@ -24,81 +25,7 @@ export const processProgram = async (
logPath: string,
otherArgs?: any
) => {
if (runTime === RunTimeType.JS) {
program = await createJSProgram(
program,
preProgramVariables,
vars,
session,
weboutPath,
headersPath,
tokenFile,
otherArgs
)
const codePath = path.join(session.path, 'code.js')
try {
await createFile(codePath, program)
// create a stream that will write to console outputs to log file
const writeStream = fs.createWriteStream(logPath)
// waiting for the open event so that we can have underlying file descriptor
await once(writeStream, 'open')
execFileSync(process.nodeLoc!, [codePath], {
stdio: ['ignore', writeStream, writeStream]
})
// copy the code.js program to log and end write stream
writeStream.end(program)
session.completed = true
console.log('session completed', session)
} catch (err: any) {
session.completed = true
session.crashed = err.toString()
console.log('session crashed', session.id, session.crashed)
}
} else if (runTime === RunTimeType.PY) {
program = await createPythonProgram(
program,
preProgramVariables,
vars,
session,
weboutPath,
headersPath,
tokenFile,
otherArgs
)
const codePath = path.join(session.path, 'code.py')
try {
await createFile(codePath, program)
// create a stream that will write to console outputs to log file
const writeStream = fs.createWriteStream(logPath)
// waiting for the open event so that we can have underlying file descriptor
await once(writeStream, 'open')
execFileSync(process.pythonLoc!, [codePath], {
stdio: ['ignore', writeStream, writeStream]
})
// copy the code.py program to log and end write stream
writeStream.end(program)
session.completed = true
console.log('session completed', session)
} catch (err: any) {
session.completed = true
session.crashed = err.toString()
console.log('session crashed', session.id, session.crashed)
}
} else {
if (runTime === RunTimeType.SAS) {
program = await createSASProgram(
program,
preProgramVariables,
@@ -124,6 +51,82 @@ export const processProgram = async (
while (!session.completed) {
await delay(50)
}
} else {
let codePath: string
let executablePath: string
switch (runTime) {
case RunTimeType.JS:
program = await createJSProgram(
program,
preProgramVariables,
vars,
session,
weboutPath,
headersPath,
tokenFile,
otherArgs
)
codePath = path.join(session.path, 'code.js')
executablePath = process.nodeLoc!
break
case RunTimeType.PY:
program = await createPythonProgram(
program,
preProgramVariables,
vars,
session,
weboutPath,
headersPath,
tokenFile,
otherArgs
)
codePath = path.join(session.path, 'code.py')
executablePath = process.pythonLoc!
break
case RunTimeType.R:
program = await createRProgram(
program,
preProgramVariables,
vars,
session,
weboutPath,
headersPath,
tokenFile,
otherArgs
)
codePath = path.join(session.path, 'code.r')
executablePath = process.rscriptLoc!
break
default:
throw new Error('Invalid runtime!')
}
try {
await createFile(codePath, program)
// create a stream that will write to console outputs to log file
const writeStream = fs.createWriteStream(logPath)
// waiting for the open event so that we can have underlying file descriptor
await once(writeStream, 'open')
execFileSync(executablePath, [codePath], {
stdio: ['ignore', writeStream, writeStream]
})
// copy the code file to log and end write stream
writeStream.end(program)
session.completed = true
console.log('session completed', session)
} catch (err: any) {
session.completed = true
session.crashed = err.toString()
console.log('session crashed', session.id, session.crashed)
}
}
}