
Verified commit 4cff0d37, authored by ANDREY Paul

Refactor 'run_as_processes' backend code.

parent 861d559b
@@ -22,7 +22,7 @@ import multiprocessing as mp
 import sys
 import traceback
 from queue import Queue
-from typing import Any, Callable, Dict, List, Tuple, Union
+from typing import Any, Callable, Dict, Iterable, List, Tuple, Union
 
 __all__ = [
     "run_as_processes",
@@ -38,19 +38,18 @@ def run_as_processes(
     Parameters
     ----------
     *routines: tuple(function, tuple(any, ...))
-        Sequence of routines that need running concurrently,
-        each formatted as a 2-elements tuple containing the
-        function to run, and a tuple storing its arguments.
+        Sequence of routines that need running concurrently, each
+        formatted as a 2-elements tuple containing the function
+        to run, and a tuple storing its arguments.
     auto_stop: bool, default=True
-        Whether to automatically interrupt all running routines
-        as soon as one failed and raised an exception. This can
-        avoid infinite runtime (e.g. if one awaits for a failed
-        routine to send information), but may also prevent some
-        exceptions from being caught due to the early stopping
-        of routines that would have failed later. Hence it may
-        be disabled in contexts where it is interesting to wait
-        for all routines to fail rather than assume that they
-        are co-dependent.
+        Whether to automatically interrupt all running routines as
+        soon as one failed and raised an exception. This can avoid
+        infinite runtime (e.g. if one awaits for a failed routine
+        to send information), but may also prevent some exceptions
+        from being caught due to the early stopping of routines that
+        would have failed later. Hence it may be disabled in contexts
+        where it is interesting to wait for all routines to fail rather
+        than assume that they are co-dependent.
 
     Returns
     -------
@@ -64,7 +63,39 @@ def run_as_processes(
     # Wrap routines into named processes and set up exceptions catching.
     queue = (
         mp.Manager().Queue()
-    )  # type: Queue[Tuple[str, Union[Any, RuntimeError]]]
+    )  # type: Queue  # Queue[Tuple[str, Union[Any, RuntimeError]]] (py >=3.9)
+    processes, names = prepare_routine_processes(routines, queue)
+    # Run the processes concurrently.
+    run_processes(processes, auto_stop)
+    # Return success flag and re-ordered outputs and exceptions.
+    success = all(process.exitcode == 0 for process in processes)
+    dequeue = dict([queue.get_nowait() for _ in range(queue.qsize())])
+    int_err = RuntimeError("Process was interrupted while running.")
+    outputs = [dequeue.get(name, int_err) for name in names]
+    return success, outputs
+
+
+def prepare_routine_processes(
+    routines: Iterable[Tuple[Callable[..., Any], Tuple[Any, ...]]],
+    queue: Queue,  # Queue[Tuple[str, Union[Any, RuntimeError]]] (py >=3.9)
+) -> Tuple[List[mp.Process], List[str]]:
+    """Wrap up routines into named unstarted processes.
+
+    Parameters
+    ----------
+    routines:
+        Iterators of (function, args) tuples to wrap as processes.
+    queue:
+        Queue where to put the routines' return value or raised exception
+        (always wrapped into a RuntimeError), together with their name.
+
+    Returns
+    -------
+    processes:
+        List of `multiprocessing.Process` instances wrapping `routines`.
+    names:
+        List of names identifying the processes (used for results collection).
+    """
     names = []  # type: List[str]
     count = {}  # type: Dict[str, int]
     processes = []  # type: List[mp.Process]
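
Note: the following is a minimal usage sketch of `run_as_processes`, based solely on the signature and docstring shown above; the import path and the two routine functions are illustrative, not part of this commit.

import sys

from mymodule import run_as_processes  # hypothetical import path

def add(x: int, y: int) -> int:
    return x + y

def fail() -> None:
    raise ValueError("boom")

if __name__ == "__main__":
    # Each routine is a (function, args) tuple; both run as processes.
    success, outputs = run_as_processes((add, (1, 2)), (fail, ()))
    assert not success  # `fail` raised, hence its process exited non-zero
    # `outputs` matches the routines' input order; a failed or interrupted
    # routine is represented by a RuntimeError instead of a return value.
    print(outputs)
    sys.exit(0 if success else 1)
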
@@ -75,46 +106,24 @@ def run_as_processes(
         func = add_exception_catching(func, queue, name)
         names.append(name)
         processes.append(mp.Process(target=func, args=args, name=name))
-    # Run the processes concurrently.
-    try:
-        # Start all processes.
-        for process in processes:
-            process.start()
-        # Regularly check for any failed process and exit if so.
-        while any(process.is_alive() for process in processes):
-            if auto_stop and any(process.exitcode for process in processes):
-                break
-            # Wait for at most 1 second on the first alive process.
-            for process in processes:
-                if process.is_alive():
-                    process.join(timeout=1)
-                    break
-    # Ensure not to leave processes running in the background.
-    finally:
-        for process in processes:
-            if process.is_alive():
-                process.terminate()
-    # Return success flag and re-ordered outputs and exceptions.
-    success = all(process.exitcode == 0 for process in processes)
-    dequeue = dict([queue.get_nowait() for _ in range(queue.qsize())])
-    int_err = RuntimeError("Process was interrupted while running.")
-    outputs = [dequeue.get(name, int_err) for name in names]
-    return success, outputs
+    return processes, names
 
 
 def add_exception_catching(
     func: Callable[..., Any],
-    queue: Queue,
+    queue: Queue,  # Queue[Tuple[str, Union[Any, RuntimeError]]] (py >=3.9)
     name: str,
 ) -> Callable[..., Any]:
     """Wrap a function to catch exceptions and put them in a Queue."""
-    return functools.partial(wrapped, func=func, queue=queue, name=name)
+    return functools.partial(
+        _run_with_exception_catching, func=func, queue=queue, name=name
+    )
 
 
-def wrapped(
+def _run_with_exception_catching(
     *args: Any,
     func: Callable[..., Any],
-    queue: Queue,
+    queue: Queue,  # Queue[Tuple[str, Union[Any, RuntimeError]]] (py >=3.9)
     name: str,
 ) -> Any:
     """Call the wrapped function and catch exceptions or results."""
@@ -130,3 +139,28 @@ def wrapped(
     else:
         queue.put((name, result))
         sys.exit(0)
+
+
+def run_processes(
+    processes: List[mp.Process],
+    auto_stop: bool,
+) -> None:
+    """Run parallel processes, optionally interrupting all if any fails."""
+    try:
+        # Start all processes.
+        for process in processes:
+            process.start()
+        # Regularly check for any failed process and exit if so.
+        while any(process.is_alive() for process in processes):
+            if auto_stop and any(process.exitcode for process in processes):
+                break
+            # Wait for at most 1 second on the first alive process.
+            for process in processes:
+                if process.is_alive():
+                    process.join(timeout=1)
+                    break
+    # Ensure not to leave processes running in the background.
+    finally:
+        for process in processes:
+            if process.is_alive():
+                process.terminate()
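
For illustration, a sketch of how the two refactored helpers compose, mirroring the new body of `run_as_processes`; it assumes they can be imported from the module this diff touches (path hypothetical), and `slow_echo` is an illustrative routine:

import multiprocessing as mp
import time

from mymodule import prepare_routine_processes, run_processes  # hypothetical path

def slow_echo(seconds: int) -> int:
    time.sleep(seconds)
    return seconds

if __name__ == "__main__":
    queue = mp.Manager().Queue()
    processes, names = prepare_routine_processes(
        [(slow_echo, (1,)), (slow_echo, (2,))], queue
    )
    run_processes(processes, auto_stop=True)
    # Re-order collected outputs by process name, as `run_as_processes` does.
    results = dict(queue.get_nowait() for _ in range(queue.qsize()))
    print([results.get(name) for name in names])  # [1, 2] on success

Design-wise, joining only the first alive process with a one-second timeout bounds the latency of failure detection to roughly a second while avoiding a busy-wait loop over all processes.
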