// task_thread.js
// Authored by Christophe Guillon
/**
*
* Implements an interruptible Task and Task utility functions
* on top of worker threads with timeout (ref to `WorkerTimeout`).
*
* @module task_thread
*/
"use strict";
const assert = require("assert");
const { isMainThread, parentPort, workerData } = require("worker_threads");
const { WorkerTimeout } = require("./worker_timeout.js");
const { console } = require("./log_console.js");
/**
 * Task is an asynchronous and interruptible task (termination or timeout).
 *
 * It calls back the given `nodeback(err, res)` exactly as follows when the
 * underlying worker emits its `exit` event:
 *
 * - if `err` is not `null`, an error/terminate/timeout occurred and
 *   `err` contains:
 *   - `.taskname`: the task name passed in the constructor
 *   - `.exit_code`: a non 0 exit code, either the exit code passed to
 *     `terminate()`, or the code reported by the worker `exit` event
 *     (the task failed, called `process.exit(code)` with `code` not 0,
 *     or was terminated from the parent or by the timeout)
 *   - `.error`: the value received from the worker `error` event, the
 *     `error` argument passed to `terminate()`, or a generated description
 *     (`timeout: after N msecs` / `terminated or unknown error`)
 *   - `.timeout`: a boolean specifying if the task was terminated due to
 *     the timeout passed in the constructor
 * - otherwise, on successful completion `res` contains:
 *   - `.taskname`: the task name passed in the constructor
 *   - `.result`: the task result as passed from the task through
 *     `postTaskResult(result)`, or `undefined` if the task does not
 *     generate a result (if several results are posted, the last one wins)
 *
 * Use it for instance as shown below:
 *
 *     const nodeback = (err, res) => {
 *       if (err && err.timeout) {
 *         console.log(`Timeout`);
 *       } else if (err) {
 *         console.log(`Error (${err.exit_code}): ${err.error}`);
 *       } else {
 *         console.log(`Result:${res.result}`);
 *       }
 *     };
 *     const task = new Task("TASK1", "task1", [1, 2], "tasks.js", { timeout: 1000 }, nodeback);
 *     task.run();
 *
 * Where for instance in `tasks.js` we have:
 *
 *     ...
 *     assert(isTask);
 *     const data = getTaskData();
 *     if (data.funcname == "task1") {
 *       const res = task1(...data.funcargs);
 *       postTaskResult(res);
 *     } else {
 *       throw new Error(`Unexpected task function name: ${data.funcname}`);
 *     }
 *
 */
class Task {
  /**
   * Task construction, no execution happens at this point.
   *
   * @param {string} taskname name of the task for identification in the returned response/error
   * @param {string} funcname name of the function to call,
   *   it is up to the task filename to interpret this argument
   * @param {Array} funcargs arguments to pass to the function (or `[]` or `undefined`),
   *   it is up to the task filename to interpret this argument
   * @param {string} filename file to execute for the task (this file must get the `funcname`/`funcargs`
   *   from the `getTaskData()` object, execute the request and optionally return a result
   *   with `postTaskResult(result)`)
   * @param {object} [options] passed to the underlying `WorkerTimeout` object,
   *   for instance: `{ timeout: 1000 }` for setting a task termination timeout of 1 sec
   * @param {callback} [nodeback] the user callback in nodeback style, i.e. called as `nodeback(err, res)`,
   *   see the class usage above for details
   */
  constructor(taskname, funcname, funcargs, filename, options, nodeback) {
    this.taskname = taskname;
    this.funcname = funcname;
    this.funcargs = funcargs;
    this.filename = filename;
    this.options = options;
    this.nodeback = nodeback;
  }

  /**
   * Run the Task
   *
   * After calling this function the task should be considered running in a background thread.
   * The user callback may be called as soon as the event loop is available.
   * A running task can be interrupted from the launcher either due to:
   *
   * - the `timeout` defined in the constructor's `options`
   * - a call to `terminate()` on this object
   *
   * Otherwise, the task itself may terminate from the executed `filename` either due to:
   *
   * - a successful end of the execution
   * - an error while executing
   * - an explicit call to `process.exit(code)`
   *
   * @throws {AssertionError} if `run()` was already called on this object
   */
  run() {
    assert.ok(this.worker === undefined, "Task.run() called twice");
    this.worker = new WorkerTimeout(this.filename, {
      ...this.options,
      // `name` and `workerData` are set after the spread so that they always
      // override same-named entries of the user options.
      name: this.taskname,
      workerData: {
        taskname: this.taskname,
        funcname: this.funcname,
        funcargs: this.funcargs,
      },
    });
    this.taskId = `${this.taskname}-${this.worker.threadId}`;
    console.debug(`START WORKER: ${this.taskId}: func_name: ${this.funcname}, funcargs: ${this.funcargs}`);
    // Both outcome objects are prepared up front; the `exit` handler picks
    // which one is handed to the nodeback.
    this.res = { taskname: this.taskname, result: undefined };
    this.err = { taskname: this.taskname, exit_code: undefined, error: undefined, timeout: false };
    this.worker.on("message", (resp) => {
      console.debug(`RESULT: WORKER: ${this.taskId}: result received: ${resp}`);
      this.res.result = resp;
    });
    this.worker.on("error", (err) => {
      console.debug(`ERROR: WORKER: ${this.taskId}: message received: ${err}`);
      this.err.error = err;
    });
    this.worker.on("exit", (code) => {
      // `options` is optional: never dereference it blindly, otherwise a
      // reported timeout would throw here and the nodeback would be lost.
      const timeoutMsecs = this.options ? this.options.timeout : undefined;
      // An exit code forced through terminate() takes precedence over the
      // code reported by the worker.
      if (this.err.exit_code === undefined) {
        this.err.exit_code = code;
      }
      if (this.worker.is_timeout()) {
        this.err.timeout = true;
        this.err.error = `timeout: after ${timeoutMsecs} msecs`;
      } else if (this.err.exit_code !== 0 && this.err.error === undefined) {
        this.err.error = `terminated or unknown error`;
      }
      if (this.err.timeout) {
        console.debug(`TIMEOUT: WORKER: ${this.taskId}: worker terminated after ${timeoutMsecs} msecs.`);
      } else {
        console.debug(`EXIT: WORKER: ${this.taskId} exited with code ${code}.`);
      }
      if (this.nodeback) {
        if (this.err.exit_code === 0) {
          this.nodeback(null, this.res);
        } else {
          this.nodeback(this.err, null);
        }
      }
    });
  }

  /**
   * Terminate the task
   *
   * Terminates a running task with optional exit code and error message.
   * The nodeback is still invoked from the worker `exit` event, not from here.
   *
   * @param {number} [exit_code] optional exit code to report in the task `err`.
   *   When omitted, the code reported by the worker `exit` event is used
   *   (expected to be 1 for a terminated worker).
   *   When 0 the task will be seen as successful from the parent
   * @param {string} [error] optional error string if `exit_code` is not 0
   * @throws {AssertionError} if the task was never started with `run()`
   */
  terminate(exit_code, error) {
    assert.ok(this.worker !== undefined, "Task.terminate() called while task is not running");
    this.err.exit_code = exit_code;
    this.err.error = error;
    this.worker.terminate();
  }
}
/**
 * Build a `Task` from the given arguments and start it immediately.
 *
 * This is a convenience shortcut for:
 *
 *     const task = new Task(...taskArgs);
 *     task.run();
 *
 * The function follows the nodeback convention (callback as last argument),
 * so it can optionally be promisified and used as follows, in which case
 * `taskArgs` holds the five arguments preceding `nodeback`:
 *
 *     const util = require("util");
 *     const resolved = (res) => { console.log(`Result: ${res.result}`); };
 *     const rejected = (err) => { console.log(`Error: ${err.error}`); };
 *     const promise = util.promisify(runTask);
 *     await promise(...taskArgs).then(resolved, rejected);
 *
 * All parameters are forwarded unchanged to the `Task` constructor.
 *
 * @return {Task} the created (and already started) task
 *
 */
function runTask(taskname, funcname, funcargs, filename, options, nodeback) {
  const started = new Task(taskname, funcname, funcargs, filename, options, nodeback);
  started.run();
  return started;
}
/**
 * True only in the main thread (i.e. not in a Task).
 *
 * Alias of `isMainThread` from `worker_threads`, exported under a
 * task-oriented name.
 */
const isMain = isMainThread;
/**
 * True only in a task thread (i.e. not in the main thread).
 *
 * Always the negation of `isMainThread`; used by `getTaskData()` and
 * `postTaskResult()` to check that they are called from a task.
 */
const isTask = !isMainThread;
/**
 * Return the input data of the current task, i.e. the `workerData`
 * the thread was started with. When started through `Task.run()` it
 * contains at least:
 *
 * - `.taskname`: the task name as passed to the `Task` constructor
 * - `.funcname`: the function name as passed to the `Task` constructor
 * - `.funcargs`: the function args as passed to the `Task` constructor
 *
 * Must only be called from a task thread; an assertion fails otherwise.
 *
 * @return {object} the task input data
 */
function getTaskData() {
  const insideTask = isTask;
  assert.ok(insideTask, "getTaskData() must be called from a Task");
  return workerData;
}
/**
 * Send the task result to the parent thread through `parentPort`.
 *
 * `res` can be anything, including `undefined` when the task generates
 * no result. On the parent side the value ends up in `res.result` of the
 * `Task` nodeback.
 *
 * Must only be called from a task thread; an assertion fails otherwise.
 *
 * @param {*} res the result to hand over to the parent
 * @return whatever `parentPort.postMessage(res)` returns
 */
function postTaskResult(res) {
  assert.ok(isTask, "postTaskResult(res) must be called from a Task");
  const posted = parentPort.postMessage(res);
  return posted;
}
// Public API: the Task class and its runTask() shortcut for the launcher side,
// plus the thread predicates and the helpers used from inside a task file.
module.exports = { Task, runTask, isMain, isTask, getTaskData, postTaskResult };