# Source code for pyrseus.executors.inline

"""
Provides a simple serial executor that captures exceptions in the standard way.

This is useful:

- for running executor-based code serially without having to rewrite all of the
  control flow, and
- for making it easy to use a debugger to trace into task functions.
"""

from concurrent.futures import Executor, Future
from typing import Callable, TypeVar

# Names exported by ``from <this module> import *``; only the executor class
# is public.
__all__ = ["InlineExecutor"]

# Represents the generic return type of a submitted callable. It ties the
# return type of the callable given to `InlineExecutor.submit` to the type
# parameter of the `~concurrent.futures.Future` that `submit` returns.
Ret = TypeVar("Ret")


class InlineExecutor(Executor):
    def __init__(self) -> None:
        """
        An `~concurrent.futures.Executor` that evaluates the task immediately
        upon submission, trapping exceptions like normal executors do.

        Summary
        -------

        - *Common Use Cases:*

          - Light workloads: this executor is useful for avoiding concurrency
            overhead when running small batches of tasks. This lets developers
            avoid the alternative of rewriting all of their control flow to
            not use executors at all, just to get serial execution.

          - Troubleshooting: since tasks are executed immediately and within
            the same thread, tracing through the task code in a debugger is
            trivially easy.

        - *Concurrency:* This is a non-concurrent, serial-only executor. All
          tasks are immediately run in the same process and thread they were
          submitted in.

        - *Exceptions:* This executor has standard exception-handling
          semantics: all task-related exceptions are captured in the task's
          future.

        - *Default max_workers:* Not applicable.

        - *Pickling:* This executor does not perform any pickling.

        Details
        -------

        As with the only built-in within-process executor,
        `~concurrent.futures.ThreadPoolExecutor`, arbitrary callables can be
        used, including non-picklable ones like lambdas:

        >>> import os
        >>> with InlineExecutor() as exe:
        ...     non_picklable = lambda: os.getpid()    # can't be pickled
        ...     fut = exe.submit(non_picklable)        # but works anyway
        ...     worker_pid_is_my_pid = fut.result()    # no pickling error
        ...     assert os.getpid() == worker_pid_is_my_pid

        See :doc:`../executors` for a list of related executors.
        """
        # Flipped to True by `shutdown`; `submit` checks it to reject late
        # submissions.
        self._closing = False

    def submit(self, fcn: Callable[..., Ret], /, *args, **kwargs) -> Future[Ret]:
        """
        Immediately evaluates ``fcn(*args, **kwargs)`` and embeds the result
        in a `~concurrent.futures.Future`. As with standard executors, any
        exceptions that occur are caught and recorded in that returned
        `~concurrent.futures.Future`.

        Parameters
        ----------
        fcn
            The callable to run. It is positional-only, so ``fcn`` may also
            be used as a keyword argument name for the callable itself.
        *args, **kwargs
            Forwarded unchanged to ``fcn``.

        Returns
        -------
        concurrent.futures.Future
            A future that already holds either the return value of ``fcn`` or
            the exception it raised.

        Raises
        ------
        RuntimeError
            If `shutdown` has already been called on this executor.
        """
        # This implementation is the same as NoCatchExecutor.submit, except this
        # one captures all exceptions in the returned
        # `~concurrent.futures.Future` object, like the standard
        # `~concurrent.futures` executors do.

        # To be consistent with `~concurrent.futures.ProcessPoolExecutor`,
        # disallow new submissions once shutdown has started.
        if self._closing:
            raise RuntimeError(
                "Submissions are not allowed to an executor that is shutting down."
            )

        fut: Future[Ret] = Future()
        try:
            # Eagerly execute the function. This makes it easy for users to use
            # `pdb.set_trace` and similar troubleshooting techniques to debug
            # `fcn`.
            result = fcn(*args, **kwargs)
        except BaseException as exc:
            # BaseException (not just Exception) is trapped on purpose, so
            # even SystemExit and KeyboardInterrupt raised by the task end up
            # in the future instead of propagating to the submitter.
            fut.set_exception(exc)
        else:
            fut.set_result(result)
        return fut

    def shutdown(self, *args, **kwargs) -> None:
        """
        As a minor improvement on the base class' method, this override
        disallows submissions after a shutdown has been started. This can
        assist with finding bugs in user code.

        Parameters
        ----------
        *args, **kwargs
            Forwarded unchanged to `concurrent.futures.Executor.shutdown`.
        """
        # Set the flag before delegating so that `submit` starts rejecting new
        # work as soon as shutdown begins.
        self._closing = True
        super().shutdown(*args, **kwargs)