mirror of
https://github.com/sasjs/server.git
synced 2025-12-10 19:34:34 +00:00
feat: add support for R stored programs
This commit is contained in:
@@ -69,11 +69,11 @@ MODE=
|
||||
# Possible options at the moment are sas and js
|
||||
|
||||
# This string sets the priority of the available analytic runtimes
|
||||
# Valid runtimes are SAS (sas), JavaScript (js) and Python (py)
|
||||
# Valid runtimes are SAS (sas), JavaScript (js), Python (py) and R (r)
|
||||
# For each option provided, there should be a corresponding path,
|
||||
# eg SAS_PATH, NODE_PATH or PYTHON_PATH
|
||||
# eg SAS_PATH, NODE_PATH, PYTHON_PATH or RSCRIPT_PATH
|
||||
# Priority is given to runtimes earlier in the string
|
||||
# Example options: [sas,js,py | js,py | sas | sas,js]
|
||||
# Example options: [sas,js,py | js,py | sas | sas,js | r | sas,r]
|
||||
RUN_TIMES=
|
||||
|
||||
# Path to SAS executable (sas.exe / sas.sh)
|
||||
@@ -85,6 +85,9 @@ NODE_PATH=~/.nvm/versions/node/v16.14.0/bin/node
|
||||
# Path to Python executable
|
||||
PYTHON_PATH=/usr/bin/python
|
||||
|
||||
# Path to Rscript
|
||||
RSCRIPT_PATH=/usr/bin/Rscript
|
||||
|
||||
# Path to working directory
|
||||
# This location is for SAS WORK, staged files, DRIVE, configuration etc
|
||||
SASJS_ROOT=./sasjs_root
|
||||
|
||||
@@ -18,6 +18,7 @@ RUN_TIMES=[sas,js,py | js,py | sas | sas,js] default considered as sas
|
||||
SAS_PATH=/opt/sas/sas9/SASHome/SASFoundation/9.4/sas
|
||||
NODE_PATH=~/.nvm/versions/node/v16.14.0/bin/node
|
||||
PYTHON_PATH=/usr/bin/python
|
||||
RSCRIPT_PATH=/usr/bin/Rscript
|
||||
|
||||
SASJS_ROOT=./sasjs_root
|
||||
|
||||
|
||||
@@ -231,9 +231,45 @@ export class PythonSessionController extends SessionController {
|
||||
}
|
||||
}
|
||||
|
||||
export class RSessionController extends SessionController {
|
||||
protected async createSession(): Promise<Session> {
|
||||
const sessionId = generateUniqueFileName(generateTimestamp())
|
||||
const sessionFolder = path.join(getSessionsFolder(), sessionId)
|
||||
|
||||
const creationTimeStamp = sessionId.split('-').pop() as string
|
||||
// death time of session is 15 mins from creation
|
||||
const deathTimeStamp = (
|
||||
parseInt(creationTimeStamp) +
|
||||
15 * 60 * 1000 -
|
||||
1000
|
||||
).toString()
|
||||
|
||||
const session: Session = {
|
||||
id: sessionId,
|
||||
ready: true,
|
||||
inUse: true,
|
||||
consumed: false,
|
||||
completed: false,
|
||||
creationTimeStamp,
|
||||
deathTimeStamp,
|
||||
path: sessionFolder
|
||||
}
|
||||
|
||||
const headersPath = path.join(session.path, 'stpsrv_header.txt')
|
||||
await createFile(headersPath, 'Content-type: text/plain')
|
||||
|
||||
this.sessions.push(session)
|
||||
return session
|
||||
}
|
||||
}
|
||||
|
||||
export const getSessionController = (
|
||||
runTime: RunTimeType
|
||||
): SASSessionController | JSSessionController | PythonSessionController => {
|
||||
):
|
||||
| SASSessionController
|
||||
| JSSessionController
|
||||
| PythonSessionController
|
||||
| RSessionController => {
|
||||
if (runTime === RunTimeType.SAS) {
|
||||
return getSASSessionController()
|
||||
}
|
||||
@@ -246,6 +282,10 @@ export const getSessionController = (
|
||||
return getPythonSessionController()
|
||||
}
|
||||
|
||||
if (runTime === RunTimeType.R) {
|
||||
return getRSessionController()
|
||||
}
|
||||
|
||||
throw new Error('No Runtime is configured')
|
||||
}
|
||||
|
||||
@@ -273,6 +313,14 @@ const getPythonSessionController = (): PythonSessionController => {
|
||||
return process.pythonSessionController
|
||||
}
|
||||
|
||||
const getRSessionController = (): RSessionController => {
|
||||
if (process.rSessionController) return process.rSessionController
|
||||
|
||||
process.rSessionController = new RSessionController()
|
||||
|
||||
return process.rSessionController
|
||||
}
|
||||
|
||||
const autoExecContent = `
|
||||
data _null_;
|
||||
/* remove the dummy SYSIN */
|
||||
|
||||
68
api/src/controllers/internal/createRProgram.ts
Normal file
68
api/src/controllers/internal/createRProgram.ts
Normal file
@@ -0,0 +1,68 @@
|
||||
import { isWindows } from '@sasjs/utils'
|
||||
import { PreProgramVars, Session } from '../../types'
|
||||
import { generateFileUploadRCode } from '../../utils'
|
||||
import { ExecutionVars } from '.'
|
||||
|
||||
export const createRProgram = async (
|
||||
program: string,
|
||||
preProgramVariables: PreProgramVars,
|
||||
vars: ExecutionVars,
|
||||
session: Session,
|
||||
weboutPath: string,
|
||||
headersPath: string,
|
||||
tokenFile: string,
|
||||
otherArgs?: any
|
||||
) => {
|
||||
const varStatments = Object.keys(vars).reduce(
|
||||
(computed: string, key: string) => `${computed}.${key} <- '${vars[key]}'\n`,
|
||||
''
|
||||
)
|
||||
|
||||
const preProgramVarStatments = `
|
||||
._SASJS_SESSION_PATH <- '${
|
||||
isWindows() ? session.path.replace(/\\/g, '\\\\') : session.path
|
||||
}';
|
||||
._WEBOUT <- '${isWindows() ? weboutPath.replace(/\\/g, '\\\\') : weboutPath}';
|
||||
._SASJS_WEBOUT_HEADERS <- '${headersPath}';
|
||||
._SASJS_TOKENFILE <- '${
|
||||
isWindows() ? tokenFile.replace(/\\/g, '\\\\') : tokenFile
|
||||
}';
|
||||
._SASJS_USERNAME <- '${preProgramVariables?.username}';
|
||||
._SASJS_USERID <- '${preProgramVariables?.userId}';
|
||||
._SASJS_DISPLAYNAME <- '${preProgramVariables?.displayName}';
|
||||
._METAPERSON <- ._SASJS_DISPLAYNAME;
|
||||
._METAUSER <- ._SASJS_USERNAME;
|
||||
SASJSPROCESSMODE <- 'Stored Program';
|
||||
`
|
||||
|
||||
const requiredModules = ``
|
||||
|
||||
program = `
|
||||
# runtime vars
|
||||
${varStatments}
|
||||
|
||||
# dynamic user-provided vars
|
||||
${preProgramVarStatments}
|
||||
|
||||
# change working directory to session folder
|
||||
setwd(._SASJS_SESSION_PATH)
|
||||
|
||||
# actual job code
|
||||
${program}
|
||||
|
||||
`
|
||||
// if no files are uploaded filesNamesMap will be undefined
|
||||
if (otherArgs?.filesNamesMap) {
|
||||
const uploadRCode = await generateFileUploadRCode(
|
||||
otherArgs.filesNamesMap,
|
||||
session.path
|
||||
)
|
||||
|
||||
// If any files are uploaded, the program needs to be updated with some
|
||||
// dynamically generated variables (pointers) for ease of ingestion
|
||||
if (uploadRCode.length > 0) {
|
||||
program = `${uploadRCode}\n` + program
|
||||
}
|
||||
}
|
||||
return requiredModules + program
|
||||
}
|
||||
@@ -5,4 +5,5 @@ export * from './FileUploadController'
|
||||
export * from './createSASProgram'
|
||||
export * from './createJSProgram'
|
||||
export * from './createPythonProgram'
|
||||
export * from './createRProgram'
|
||||
export * from './processProgram'
|
||||
|
||||
@@ -9,7 +9,8 @@ import {
|
||||
ExecutionVars,
|
||||
createSASProgram,
|
||||
createJSProgram,
|
||||
createPythonProgram
|
||||
createPythonProgram,
|
||||
createRProgram
|
||||
} from './'
|
||||
|
||||
export const processProgram = async (
|
||||
@@ -24,81 +25,7 @@ export const processProgram = async (
|
||||
logPath: string,
|
||||
otherArgs?: any
|
||||
) => {
|
||||
if (runTime === RunTimeType.JS) {
|
||||
program = await createJSProgram(
|
||||
program,
|
||||
preProgramVariables,
|
||||
vars,
|
||||
session,
|
||||
weboutPath,
|
||||
headersPath,
|
||||
tokenFile,
|
||||
otherArgs
|
||||
)
|
||||
|
||||
const codePath = path.join(session.path, 'code.js')
|
||||
|
||||
try {
|
||||
await createFile(codePath, program)
|
||||
|
||||
// create a stream that will write to console outputs to log file
|
||||
const writeStream = fs.createWriteStream(logPath)
|
||||
|
||||
// waiting for the open event so that we can have underlying file descriptor
|
||||
await once(writeStream, 'open')
|
||||
|
||||
execFileSync(process.nodeLoc!, [codePath], {
|
||||
stdio: ['ignore', writeStream, writeStream]
|
||||
})
|
||||
|
||||
// copy the code.js program to log and end write stream
|
||||
writeStream.end(program)
|
||||
|
||||
session.completed = true
|
||||
console.log('session completed', session)
|
||||
} catch (err: any) {
|
||||
session.completed = true
|
||||
session.crashed = err.toString()
|
||||
console.log('session crashed', session.id, session.crashed)
|
||||
}
|
||||
} else if (runTime === RunTimeType.PY) {
|
||||
program = await createPythonProgram(
|
||||
program,
|
||||
preProgramVariables,
|
||||
vars,
|
||||
session,
|
||||
weboutPath,
|
||||
headersPath,
|
||||
tokenFile,
|
||||
otherArgs
|
||||
)
|
||||
|
||||
const codePath = path.join(session.path, 'code.py')
|
||||
|
||||
try {
|
||||
await createFile(codePath, program)
|
||||
|
||||
// create a stream that will write to console outputs to log file
|
||||
const writeStream = fs.createWriteStream(logPath)
|
||||
|
||||
// waiting for the open event so that we can have underlying file descriptor
|
||||
await once(writeStream, 'open')
|
||||
|
||||
execFileSync(process.pythonLoc!, [codePath], {
|
||||
stdio: ['ignore', writeStream, writeStream]
|
||||
})
|
||||
|
||||
// copy the code.py program to log and end write stream
|
||||
writeStream.end(program)
|
||||
|
||||
session.completed = true
|
||||
console.log('session completed', session)
|
||||
} catch (err: any) {
|
||||
session.completed = true
|
||||
session.crashed = err.toString()
|
||||
console.log('session crashed', session.id, session.crashed)
|
||||
}
|
||||
} else {
|
||||
if (runTime === RunTimeType.SAS) {
|
||||
program = await createSASProgram(
|
||||
program,
|
||||
preProgramVariables,
|
||||
@@ -124,6 +51,82 @@ export const processProgram = async (
|
||||
while (!session.completed) {
|
||||
await delay(50)
|
||||
}
|
||||
} else {
|
||||
let codePath: string
|
||||
let executablePath: string
|
||||
switch (runTime) {
|
||||
case RunTimeType.JS:
|
||||
program = await createJSProgram(
|
||||
program,
|
||||
preProgramVariables,
|
||||
vars,
|
||||
session,
|
||||
weboutPath,
|
||||
headersPath,
|
||||
tokenFile,
|
||||
otherArgs
|
||||
)
|
||||
codePath = path.join(session.path, 'code.js')
|
||||
executablePath = process.nodeLoc!
|
||||
|
||||
break
|
||||
case RunTimeType.PY:
|
||||
program = await createPythonProgram(
|
||||
program,
|
||||
preProgramVariables,
|
||||
vars,
|
||||
session,
|
||||
weboutPath,
|
||||
headersPath,
|
||||
tokenFile,
|
||||
otherArgs
|
||||
)
|
||||
codePath = path.join(session.path, 'code.py')
|
||||
executablePath = process.pythonLoc!
|
||||
|
||||
break
|
||||
case RunTimeType.R:
|
||||
program = await createRProgram(
|
||||
program,
|
||||
preProgramVariables,
|
||||
vars,
|
||||
session,
|
||||
weboutPath,
|
||||
headersPath,
|
||||
tokenFile,
|
||||
otherArgs
|
||||
)
|
||||
codePath = path.join(session.path, 'code.r')
|
||||
executablePath = process.rscriptLoc!
|
||||
|
||||
break
|
||||
default:
|
||||
throw new Error('Invalid runtime!')
|
||||
}
|
||||
|
||||
try {
|
||||
await createFile(codePath, program)
|
||||
|
||||
// create a stream that will write to console outputs to log file
|
||||
const writeStream = fs.createWriteStream(logPath)
|
||||
|
||||
// waiting for the open event so that we can have underlying file descriptor
|
||||
await once(writeStream, 'open')
|
||||
|
||||
execFileSync(executablePath, [codePath], {
|
||||
stdio: ['ignore', writeStream, writeStream]
|
||||
})
|
||||
|
||||
// copy the code file to log and end write stream
|
||||
writeStream.end(program)
|
||||
|
||||
session.completed = true
|
||||
console.log('session completed', session)
|
||||
} catch (err: any) {
|
||||
session.completed = true
|
||||
session.crashed = err.toString()
|
||||
console.log('session crashed', session.id, session.crashed)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
2
api/src/types/system/process.d.ts
vendored
2
api/src/types/system/process.d.ts
vendored
@@ -3,12 +3,14 @@ declare namespace NodeJS {
|
||||
sasLoc?: string
|
||||
nodeLoc?: string
|
||||
pythonLoc?: string
|
||||
rscriptLoc?: string
|
||||
driveLoc: string
|
||||
logsLoc: string
|
||||
logsUUID: string
|
||||
sasSessionController?: import('../../controllers/internal').SASSessionController
|
||||
jsSessionController?: import('../../controllers/internal').JSSessionController
|
||||
pythonSessionController?: import('../../controllers/internal').PythonSessionController
|
||||
rSessionController?: import('../../controllers/internal').RSessionController
|
||||
appStreamConfig: import('../').AppStreamConfig
|
||||
logger: import('@sasjs/utils/logger').Logger
|
||||
runTimes: import('../../utils').RunTimeType[]
|
||||
|
||||
@@ -4,9 +4,9 @@ import { createFolder, fileExists, folderExists, isWindows } from '@sasjs/utils'
|
||||
import { RunTimeType } from './verifyEnvVariables'
|
||||
|
||||
export const getDesktopFields = async () => {
|
||||
const { SAS_PATH, NODE_PATH, PYTHON_PATH } = process.env
|
||||
const { SAS_PATH, NODE_PATH, PYTHON_PATH, RSCRIPT_PATH } = process.env
|
||||
|
||||
let sasLoc, nodeLoc, pythonLoc
|
||||
let sasLoc, nodeLoc, pythonLoc, rscriptLoc
|
||||
|
||||
if (process.runTimes.includes(RunTimeType.SAS)) {
|
||||
sasLoc = SAS_PATH ?? (await getSASLocation())
|
||||
@@ -20,7 +20,11 @@ export const getDesktopFields = async () => {
|
||||
pythonLoc = PYTHON_PATH ?? (await getPythonLocation())
|
||||
}
|
||||
|
||||
return { sasLoc, nodeLoc, pythonLoc }
|
||||
if (process.runTimes.includes(RunTimeType.R)) {
|
||||
rscriptLoc = RSCRIPT_PATH ?? (await getRScriptLocation())
|
||||
}
|
||||
|
||||
return { sasLoc, nodeLoc, pythonLoc, rscriptLoc }
|
||||
}
|
||||
|
||||
const getDriveLocation = async (): Promise<string> => {
|
||||
@@ -117,3 +121,25 @@ const getPythonLocation = async (): Promise<string> => {
|
||||
|
||||
return targetName
|
||||
}
|
||||
|
||||
const getRScriptLocation = async (): Promise<string> => {
|
||||
const validator = async (filePath: string) => {
|
||||
if (!filePath) return 'Path to RScript executable is required.'
|
||||
|
||||
if (!(await fileExists(filePath))) {
|
||||
return 'No file found at provided path.'
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
const defaultLocation = isWindows() ? 'C:\\Rscript' : '/usr/bin/Rscript'
|
||||
|
||||
const targetName = await getString(
|
||||
'Please enter full path to a Rscript executable: ',
|
||||
validator,
|
||||
defaultLocation
|
||||
)
|
||||
|
||||
return targetName
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@ import { RunTimeType } from '.'
|
||||
|
||||
export const getRunTimeAndFilePath = async (programPath: string) => {
|
||||
const ext = path.extname(programPath)
|
||||
// If programPath (_program) is provided with a ".sas", ".js" or ".py" extension
|
||||
// If programPath (_program) is provided with a ".sas", ".js", ".py" or ".r" extension
|
||||
// we should use that extension to determine the appropriate runTime
|
||||
if (ext && Object.values(RunTimeType).includes(ext.slice(1) as RunTimeType)) {
|
||||
const runTime = ext.slice(1)
|
||||
|
||||
@@ -29,12 +29,14 @@ export const setProcessVariables = async () => {
|
||||
process.sasLoc = process.env.SAS_PATH
|
||||
process.nodeLoc = process.env.NODE_PATH
|
||||
process.pythonLoc = process.env.PYTHON_PATH
|
||||
process.rscriptLoc = process.env.RSCRIPT_PATH
|
||||
} else {
|
||||
const { sasLoc, nodeLoc, pythonLoc } = await getDesktopFields()
|
||||
const { sasLoc, nodeLoc, pythonLoc, rscriptLoc } = await getDesktopFields()
|
||||
|
||||
process.sasLoc = sasLoc
|
||||
process.nodeLoc = nodeLoc
|
||||
process.pythonLoc = pythonLoc
|
||||
process.rscriptLoc = rscriptLoc
|
||||
}
|
||||
|
||||
const { SASJS_ROOT } = process.env
|
||||
|
||||
@@ -157,3 +157,30 @@ export const generateFileUploadPythonCode = async (
|
||||
|
||||
return uploadCode
|
||||
}
|
||||
|
||||
/**
|
||||
* Generates the R code that references uploaded files in the concurrent request
|
||||
* @param filesNamesMap object that maps hashed file names and original file names
|
||||
* @param sessionFolder name of the folder that is created for the purpose of files in concurrent request
|
||||
* @returns generated python code
|
||||
*/
|
||||
export const generateFileUploadRCode = async (
|
||||
filesNamesMap: FilenamesMap,
|
||||
sessionFolder: string
|
||||
) => {
|
||||
let uploadCode = ''
|
||||
let fileCount = 0
|
||||
|
||||
const sessionFolderList: string[] = await listFilesInFolder(sessionFolder)
|
||||
sessionFolderList.forEach(async (fileName) => {
|
||||
if (fileName.includes('req_file')) {
|
||||
fileCount++
|
||||
uploadCode += `\n._WEBIN_FILENAME${fileCount} <- '${filesNamesMap[fileName].originalName}'`
|
||||
uploadCode += `\n._WEBIN_NAME${fileCount} <- '${filesNamesMap[fileName].fieldName}'`
|
||||
}
|
||||
})
|
||||
|
||||
uploadCode += `\n._WEBIN_FILE_COUNT <- ${fileCount}`
|
||||
|
||||
return uploadCode
|
||||
}
|
||||
|
||||
@@ -34,7 +34,8 @@ export enum LOG_FORMAT_MORGANType {
|
||||
export enum RunTimeType {
|
||||
SAS = 'sas',
|
||||
JS = 'js',
|
||||
PY = 'py'
|
||||
PY = 'py',
|
||||
R = 'r'
|
||||
}
|
||||
|
||||
export enum ReturnCode {
|
||||
@@ -253,7 +254,8 @@ const verifyRUN_TIMES = (): string[] => {
|
||||
|
||||
const verifyExecutablePaths = () => {
|
||||
const errors: string[] = []
|
||||
const { RUN_TIMES, SAS_PATH, NODE_PATH, PYTHON_PATH, MODE } = process.env
|
||||
const { RUN_TIMES, SAS_PATH, NODE_PATH, PYTHON_PATH, RSCRIPT_PATH, MODE } =
|
||||
process.env
|
||||
|
||||
if (MODE === ModeType.Server) {
|
||||
const runTimes = RUN_TIMES?.split(',')
|
||||
@@ -269,6 +271,10 @@ const verifyExecutablePaths = () => {
|
||||
if (runTimes?.includes(RunTimeType.PY) && !PYTHON_PATH) {
|
||||
errors.push(`- PYTHON_PATH is required for ${RunTimeType.PY} run time`)
|
||||
}
|
||||
|
||||
if (runTimes?.includes(RunTimeType.R) && !RSCRIPT_PATH) {
|
||||
errors.push(`- RSCRIPT_PATH is required for ${RunTimeType.R} run time`)
|
||||
}
|
||||
}
|
||||
|
||||
return errors
|
||||
|
||||
@@ -16,7 +16,9 @@ export enum ModeType {
|
||||
|
||||
export enum RunTimeType {
|
||||
SAS = 'sas',
|
||||
JS = 'js'
|
||||
JS = 'js',
|
||||
PY = 'py',
|
||||
R = 'r'
|
||||
}
|
||||
|
||||
interface AppContextProps {
|
||||
|
||||
Reference in New Issue
Block a user