Source code for pyrseus.core.pickle

"""
Helpers for writing `~concurrent.futures.Executor` classes that need to interact
with `pickle`-based serialization.

There are two main use cases:

- To change the serializer to a more powerful one like |cloudpickle|_. For
  examples, see the source code of `pyrseus.executors.cpprocess`. It is a
  wrapper for `~concurrent.futures.ProcessPoolExecutor` that uses |cloudpickle|_
  instead of `pickle` for task and result serialization.

- To help users troubleshoot serialization problems. For examples, see the
  source code of:

  - `pyrseus.executors.pinline`: serializes tasks and results in the same thread
    and process as the ``submit`` call, using `pickle`.

  - `pyrseus.executors.cpinline`:  serializes tasks and results in the same
    thread and process as the ``submit`` call, using |cloudpickle|_.
"""

import inspect
import pickle
import sys
from functools import cache
from threading import Lock
from typing import Any, Callable, Dict, Tuple, TypeVar

Ret = TypeVar("Ret")
"""
Represents the generic return type of a submitted callable.
"""


class _HideMain:
    """
    Helper for `.try_pickle_round_trip` that temporarily replaces
    ``sys.modules['__main__']``, causing any attribute access to raise an
    exception. This can make troubleshooting pickling problems easier.
    """

    def __init__(self, hide_from):
        self._hide_from = hide_from

    def __getattr__(self, name):
        raise RuntimeError(
            f"Blocked attempt to read {name} from __main__ while pickling "
            f"{self._hide_from}."
        )


_hide_main_lock = Lock()
"""
Protects against concurrent calls to `.try_pickle_round_trip` from permanently
corrupting ``sys.modules``.
"""


[docs] def try_pickle_round_trip( obj, *, dumps=pickle._dumps, protocol=-1, loads=pickle._loads, hide_main=True ): """ Attempts to pickle and unpickle ``obj`` using ``dumps`` and ``loads``, to help with testing and/or troubleshooting. Example ------- Here's an example of an object that can be pickled and unpickled without trouble: >>> try_pickle_round_trip(42) 42 Here's one that acts like it's picklable and unpicklable: >>> # simulate defining a trivial function in __main__, as happens >>> # for scripts and notebooks. >>> exec( ... 'def _func_in_main_for_try_pickle_round_trip(): return 42', ... sys.modules['__main__'].__dict__, ... ) >>> # retrieve it >>> func = sys.modules['__main__']._func_in_main_for_try_pickle_round_trip >>> # technically, it's picklable and unpicklable, as long as the >>> # unpickling happens in the same process: >>> pickle.loads(pickle.dumps(func, -1))() 42 But if the unpickling happens in another process that doesn't replicate the original's ``__main__`` module, then unpickling will fail, often with a confusing error message. But with this function, we can simulate those unpickling problems, but at pickling time when they're easier to troubleshoot: >>> # Here it fails with a slightly better error message, and by >>> # default using the pure Python pickler that's easier to debug >>> # than the C one. >>> try_pickle_round_trip(func) Traceback (most recent call last): ... File ...pickle.py... in save_global ... RuntimeError: Blocked attempt to read _func_in_main_for_try_pickle_round_trip from __main__ while pickling <function _func_in_main_for_try_pickle_round_trip at ...>. >>> # Clean up >>> del sys.modules['__main__']._func_in_main_for_try_pickle_round_trip .. warning:: When ``hide_main=True``, this function mutates global state by temporarily replacing ``sys.modules['__main__']``. This function is threadsafe with respect to itself, but if some other concurrent code requires access to the ``__main__`` module *via* ``sys.modules``, then they may see unexpected exceptions. Such situations are rare. :param obj: the object whose picklability is being tested :param protocol: pickle protocol number to use. This is passed to ``dumps`` as a keyword argument. :param dumps: a function that serializes objects to a pickle bytestring. This defaults to the pure Python `pickle._dumps` function. It is slower than the C `pickle.dumps` function, but it's easier to troubleshoot with a Python debugger. :param loads: a function that loads objects from a pickle bytestring. This defaults to the pure Python `pickle._loads` function. It is slower than the C `pickle.loads` function, but it's easier to troubleshoot with a Python debugger. :param hide_main: whether to hide the ``__main__`` module during the pickle operation. This is done by temporarily replacing ``sys.modules['__main__']`` with a proxy object that raises an exception on any attribute access. This makes it much easier to detect and troubleshoot pickling problems that arise from objects that belong to a script or notebook instead of a Python module, without needing to perform the unpickling in a separate process where it's harder to debug. """ if hide_main: with _hide_main_lock: old_main = sys.modules["__main__"] try: sys.modules["__main__"] = _HideMain(obj) pickled = dumps(obj, protocol=protocol) reconstructed = loads(pickled) finally: sys.modules["__main__"] = old_main else: pickled = dumps(obj, protocol=protocol) reconstructed = loads(pickled) return reconstructed
[docs] @cache def get_round_trip_keywords(): """ Helper for creating executors that wrap `try_pickle_round_trip` and/or `call_with_round_trip_pickling`. >>> sorted(get_round_trip_keywords()) ['dumps', 'hide_main', 'loads', 'protocol'] """ return { p.name for p in inspect.signature(try_pickle_round_trip).parameters.values() if p.kind is inspect.Parameter.KEYWORD_ONLY }
[docs] def call_with_round_trip_pickling(func, args, kwargs, **round_trip_kwargs): """ Calls ``func(*args, **kwargs)``, but (a) running those three things through `try_pickle_round_trip` first, and then (b) running the result through `try_pickle_round_trip` too. This function is designed for serial executors that help detect picklability problems in submitted tasks. :param func: function to call after round-trip pickling it :param args: positional arguments to pass to ``func`` after round trip pickling :param kwargs: keyword arguments to pass to ``func`` after round trip pickling :param round_trip_kwargs: passed to `try_pickle_round_trip` to override the round trip pickling settings :return: the return value of ``func(*args, **kwargs)``, after doing the two round trip pickling tests """ r_func, r_args, r_kwargs = try_pickle_round_trip( (func, args, kwargs), **round_trip_kwargs ) orig_ret = r_func(*r_args, **r_kwargs) ret = try_pickle_round_trip(orig_ret, **round_trip_kwargs) return ret
[docs] class OncePickledObject: """ Creates a wrapper around an arbitrary Python object that forces it to be pickled with the chosen pickler, but a copy of the original unwrapped object is returned at unpickling time. This wrapper does not provide any proxying of attributes or methods. It is only intended to be used when users know the object will be pickled and unpickled exactly once before use. >>> opo123 = OncePickledObject(123, pickle.dumps, pickle.loads) >>> opo123 <pyrseus.core.pickle.OncePickledObject object at ...> >>> pickle.loads(pickle.dumps(opo123, -1)) 123 """
[docs] def __init__(self, obj, dumps, loads): self._obj = obj self._dumps = dumps self._loads = loads
[docs] def __reduce_ex__(self, protocol: int): # Pickle the object we're wrapping, using the chosen pickler, returning # a bytestring. pickled = self._dumps(self._obj, protocol=protocol) # Tell the pickler that's calling us that it can reconstruct the # original unwrapped object by calling the chosen unpickler on the # bytestring created from the original object we wrapped. return self._loads, (pickled,)
[docs] class CustomPickledClosure:
[docs] def __init__( self, func: Callable, args: Tuple, kwargs: Dict, dumps: Callable[..., bytes], loads: Callable[[bytes], Any], ): """ Wraps a function and its arguments into a nullary closure that forces a chosen pickler to be used for the closure and its return value. - The closed function will remain wrapped by this class when it undergoes round trip pickling. - The return value of the closed function will be wrapped in a `.OncePickledObject` that uses the same ``dumps`` and ``loads`` functions as the wrapper. This is useful for wrapping functions that will be submitted to executors like `~concurrent.futures.ProcessPoolExecutor` with workers in other processes. This forces the chosen pickler to be used for serialization, instead of whatever default pickler the executor normally uses. :param func: the function whose pickling behavior we're overriding :param args: the positional arguments to ``func`` that we're closing over :param kwargs: the keyword arguments to ``func`` that we're closing over :param dumps: a `pickle.dumps`-like function that will be used for serializing the closure contents when this wrapper is pickled. This function must accept a ``protocol`` keyword argument. :param loads: a `pickle.loads`-like function that will be used for deserializing the closure contents when the pickled form of this wrapper is unpickled. To avoid bootstrapping problems, this function must itself be picklable by the built-in `pickle.dumps`. """ self._func = func self._args = args self._kwargs = kwargs self._dumps = dumps self._loads = loads
[docs] def __reduce_ex__(self, protocol: int): # First, we need to pickle up the unpickler. To avoid bootstrapping # problems if the unpickling happens in another process, this is done # with pickle.dumps itself. pickled_loads = pickle.dumps(self._loads, protocol=protocol) # Now pickle up the whole closure, using the requested serializer. For # efficiency, we don't re-serialize self._loads args = [self._func, self._args, self._kwargs, self._dumps] dumped_args = self._dumps(args, protocol=protocol) # This instructs the unpickler to use our special factory that undoes # the extra nested pickling. return type(self)._hydrate, (pickled_loads, dumped_args)
@classmethod def _hydrate(cls, pickled_loads: bytes, dumped_args: bytes): loads = pickle.loads(pickled_loads) hydrated_args = loads(dumped_args) hydrated_args.append(loads) return cls(*hydrated_args)
[docs] def __call__(self): # Call the underlying (reconstructed) function. raw_ret = self._func(*self._args, *self._kwargs) # Return a wrapper object that will produce a copy of the underlying # object when pickled and unpickled. return OncePickledObject(raw_ret, self._dumps, self._loads)