Mentions légales du service

Skip to content
Snippets Groups Projects
task_thread.js 8.58 KiB
/**
 *
 * Implements an interruptible Task and Task utility functions
 * on top of worker threads with timeout (ref to `WorkerTimeout`).
 *
 * @module task_thread
 */

"use strict";

const assert = require("assert");
const { isMainThread, parentPort, workerData } = require("worker_threads");
const { WorkerTimeout } = require("./worker_timeout.js");
const { console } = require("./log_console.js");


/**
 * Task is an asynchroneous and interruptible task (termination or timeout).
 *
 * It calls back the given `nodeback(err, res)` with:
 *
 * - if `err` is not `null`, an error/terminate/timeout occured and
 *   `err` contains:
 *     - `.taskname`: the task name passed in the constructor
 *     - `.exit_code`: a non 0 exit code, either the exit code from the task itself
 *       if it caused an error or was exited with `process.exit(code)` with `code`
 *       not 0, or 1 if it was terminated from the parent with the `terminate()`
 *       method, or due to the timeout (ref to `.timeout` below)
 *     - `.error`: the error string
 *     - `.timeout`: a boolean specifying if the task was terminated due to
 *       the timeout passed in the constructor
 * - otherwise, on succesful completion `res` contains:
 *     - `.taskname`: the task name passed in the constructor
 *     -  `.result`: the task result as passed fron the task through `postTaskResult(result)`
 *      or `undefined` if the task does not generate result
 *
 * Use it for instance as shown below:
 *
 *     const nodeback = (err, res) => {
 *       if (err && err.timeout) {
 *         console.log(`Timeout`);
 *       } else if (err) {
 *         console.log(`Error (${err.exit_code}): ${err.error}`);
 *       } else {
 *         console.log(`Result:${res.result}`);
 *       }
 *     var task = Task("TASK1", "task1", [1, 2], "tasks.js", { timeout: 1000 }, nodeback)
 *     task.run()
 *
 * Where for intance in `tasks.js` we have:
 *
 *     ...
 *     assert(isTask);
 *     const data = getTaskData();
 *     if (data.funcname == "task1") {
 *       const res = task1(...data.funcargs);
 *       postTaskResult(res);
 *     } else {
 *       throw new Error(`Unexpeted task function name: ${data.funcname}`)
 *     }
 *
 */
class Task {
    /**
     * Task construction, no execution happens at this point.
     *
     * @param {string} taskname name of the task for identification in the returned response/error
     * @param {string} funcname name of the function to call,
     *                 it is up to the task filename to interpret this argument
     * @param {Array} funcargs arguments to pass to the funciton (or `[]` or `undefined`),
     *                it is up to the task filename to interpret this argument
     * @param {string} filename file to execute for the task (this file must get the `funcname`/`funcargs`
     *                 from the `getTaskData()` object, execute the request and optionally return a result
     *                 with `postTaskResult(result)`
     * @param {object} [options] passed to the underlying `WorkerTimeout` object,
     *        for instance: `{ timeout: 1000 }` for setting a task termination timeout of 1 sec
     * @param {callback} [nodeback] the user callback in nodeback style, i.e. called as `nodeback(err, res)`,
     *                              see the class usage above for details
     */
    constructor(taskname, funcname, funcargs, filename, options, nodeback) {
        this.taskname = taskname;
        this.funcname = funcname;
        this.funcargs = funcargs;
        this.filename = filename;
        this.options = options;
        this.nodeback = nodeback;
    }

    /**
     * Run the Task
     *
     * After calling this functionm the task should be considered running in a backgrounbd thread.
     * The user callback may be called as soon as the event loop is available.
     * A running task can be interrupted from the launcher either due to:
     *
     * - the `timeout` defined in the constructors `options`
     * - a call to `terminate()` on this object
     *
     * Otherwise, the task itself may terminate from the executed `filename` either due to:
     *
     * - a succesful end of the execution
     * - an error while executing
     * - an explicit call to `process.exit(code)`
     *
     */
    run() {
        assert.ok(this.worker === undefined, "Task.run() called twice");
        this.worker = new WorkerTimeout(this.filename, {
            ...this.options,
            name: this.taskname,
            workerData: {
                taskname: this.taskname,
                funcname: this.funcname,
                funcargs: this.funcargs
            }});
        this.taskId = `${this.taskname}-${this.worker.threadId}`;
        console.debug(`START WORKER: ${this.taskId}: func_name: ${this.funcname}, funcargs: ${this.funcargs}`);
        this.res = { taskname: this.taskname, result: undefined };
        this.err = { taskname: this.taskname, exit_code: undefined, error: undefined, timeout: false };
        this.worker.on("message", resp => {
            console.debug(`RESULT: WORKER: ${this.taskId}: result received: ${resp}`);
            this.res.result = resp;
        });
        this.worker.on("error", err =>  {
            console.debug(`ERROR: WORKER: ${this.taskId}: message received: ${err}`);
            this.err.error = err;
        });
        this.worker.on("exit", code => {
            if (this.err.exit_code === undefined) {
                this.err.exit_code = code;
            }
            if (this.worker.is_timeout()) {
                this.err.timeout = true;
                this.err.error = `timeout: after ${this.options.timeout} msecs`;
            } else  if (this.err.exit_code != 0 && this.err.error === undefined) {
                this.err.error = `terminated or unknown error`;
            }
            if (this.err.timeout) {
                console.debug(`TIMEOUT: WORKER: ${this.taskId}: worker terminated after ${this.options.timeout} msecs.`);
            } else {
                console.debug(`EXIT: WORKER: ${this.taskId} exited with code ${code}.`);
            }
            if (this.nodeback) {
                if (this.err.exit_code == 0) {
                    this.nodeback(null, this.res);
                } else {
                    this.nodeback(this.err, null);
                }
            }
        });
    }

    /**
     * Terminate the task
     *
     * Terminates a running task with optional exit code and error message.
     *
     * @param {number} [exit_code] optional exit code to pass to the task `err`, or 1 if undefined.
     *                             When 0 the task will be seen as succesful from the parent
     * @param {string} [error] optional error string if `exit_code` is not 0
     */
    terminate(exit_code, error) {
        assert.ok(this.worker !== undefined, "Task.terminate() called while task is not running");
        this.err.exit_code = exit_code
        this.err.error = error
        this.worker.terminate()
    }
}


/**
 * Create a task and call its `run()` method.
 * This function is actually a shortcut for:
 *
 *     var task = Task(...taskArgs);
 *     task.run();
 *
 * Optionally one may promisify this function and run a task as follow:
 *
 *     const util = require("util");
 *     const resolved = (res) =>  { console.log(`Result: ${res.result}`); };
 *     const rejected = (err) =>  { console.log(`Error: ${err.error}`); };
 *     const promise = util.promisify(runTask);
 *     await promise(...taskArgs).then(resolved, rejected);
 *
 * @return {Task} the created task
 *
*/
function runTask(taskname, funcname, funcargs, filename, options, nodeback) {
    var task = new Task(taskname, funcname, funcargs, filename, options, nodeback);
    task.run();
    return task;
}


/**
 * True only in the main thread (i.e. not in a Task).
 */
const isMain = isMainThread;


/**
 * True only in a task thread (i.e. not in the main thread).
 */
const isTask = !isMainThread;


/**
 * Gets the task input data which contains at least:
 *
 * - `.taskname`: the task name as passed to the `Task` constructor
 * - `.funcname`: the function name as passed to the `Task` constructor
 * - `.funcargs`: the funciton args as passed to the `Task` constructor
 *
 */
function getTaskData() {
    assert.ok(isTask, "getTaskData() must be called from a Task");
    return workerData;
}


/**
 * Returns the task result to the parent, can be anything including undefined
 * if no result is generated. Refer to the class documentation above for details.
 */
function postTaskResult(res) {
    assert.ok(isTask, "postTaskResult(res) must be called from a Task");
    return parentPort.postMessage(res);
}


module.exports = { Task, runTask, isMain, isTask, getTaskData, postTaskResult };