{ "cells": [ { "cell_type": "markdown", "id": "52cc8fc5", "metadata": { "editable": true, "slideshow": { "slide_type": "" }, "tags": [ "pyflyby-cell" ] }, "source": [ "# Debugging Tasks with Pyrseus\n", "\n", "In this notebook, we show how Pyrseus can help troubleshoot problems with\n", "tasks, especially problems with pickling." ] }, { "cell_type": "markdown", "id": "9857872f", "metadata": {}, "source": [ "## Setup\n", "\n", "Before running this notebook, make sure everything it depends on is installed:\n", "\n", "    # Modify the docs/requirements.txt path if you're running this\n", "    # command from anything except the repository's root directory.\n", "    python -m pip install -r docs/requirements.txt\n", "\n", "These are some imports that we'll use throughout this notebook:" ] }, { "cell_type": "code", "execution_count": null, "id": "73723476", "metadata": {}, "outputs": [], "source": [ "import sys\n", "\n", "sys.path.append(\"../../../src\")  # assume we're running it from a Pyrseus source clone\n", "import pickle\n", "import random\n", "from concurrent.futures import ProcessPoolExecutor\n", "from multiprocessing import get_context\n", "\n", "import cloudpickle\n", "\n", "from pyrseus import CpProcessPoolExecutor, InlineExecutor, PInlineExecutor\n", "from pyrseus.core.pickle import call_with_round_trip_pickling, try_pickle_round_trip" ] }, { "cell_type": "markdown", "id": "1842f15d", "metadata": { "lines_to_next_cell": 2 }, "source": [ "And here's a simple custom function we'll be experimenting with. It works like\n", "`sorted`, but it uses the (slow) Selection Sort algorithm."
] }, { "cell_type": "code", "execution_count": null, "id": "1f7b6480", "metadata": {}, "outputs": [], "source": [ "def selection_sort(data):\n", "    \"\"\"\n", "    Simple selection sort.\n", "\n", "    Adapted from: https://en.wikipedia.org/wiki/Selection_sort\n", "    \"\"\"\n", "    # Make a shallow copy of the data so that this can be a\n", "    # non-mutating function that accepts any iterable.\n", "    ret = list(data)\n", "\n", "    # Now perform the selection sort on the copy.\n", "    for i in range(len(ret)):\n", "        j_min = i\n", "        for j in range(i + 1, len(ret)):\n", "            if ret[j] < ret[j_min]:\n", "                j_min = j\n", "        if j_min != i:\n", "            ret[j_min], ret[i] = ret[i], ret[j_min]\n", "\n", "    return ret" ] }, { "cell_type": "markdown", "id": "d03c35d7", "metadata": {}, "source": [ "## Non-executor Usage\n", "\n", "Let's first try out the function by calling it directly with a few\n", "hand-crafted test cases." ] }, { "cell_type": "code", "execution_count": null, "id": "554a60af", "metadata": { "lines_to_next_cell": 2 }, "outputs": [], "source": [ "for data in (\n", "    (),\n", "    (1,),\n", "    (1, 2),\n", "    (1, 2, 3),\n", "    (1, 2, 3, 4),\n", "    (1, 2, 3, 4, 5),\n", "    (1, 5, 2, 4, 3),\n", "    (5, 4, 3, 2, 1),\n", "    (4, 3, 2, 1),\n", "    (3, 2, 1),\n", "):\n", "    expected = sorted(data)\n", "    actual = selection_sort(data)\n", "    assert actual == expected, (data, actual, expected)\n", "    print(f\"{str(data):<15s} -> {actual}\")" ] }, { "cell_type": "markdown", "id": "f5b20bbf", "metadata": { "lines_to_next_cell": 2 }, "source": [ "Let's also create a randomized test helper function and run it on a few\n", "different inputs."
] }, { "cell_type": "code", "execution_count": null, "id": "4bc2456a", "metadata": {}, "outputs": [], "source": [ "def sorting_test_with_big_random_list(seed, n=1000, min_int=0, max_int=500):\n", "    # Make this test repeatable.\n", "    random.seed(seed)\n", "    # Generate some random data.\n", "    data = [random.randint(min_int, max_int) for _ in range(n)]\n", "    # Sort with our method.\n", "    actual = selection_sort(data)\n", "    # Sort with a known good implementation.\n", "    expected = sorted(data)\n", "    # Tell whether the two match.\n", "    return actual == expected\n", "\n", "\n", "assert sorting_test_with_big_random_list(0)\n", "assert sorting_test_with_big_random_list(42)" ] }, { "cell_type": "markdown", "id": "689d7ed0", "metadata": {}, "source": [ "## Failures with ProcessPoolExecutor\n", "\n", "Now suppose we want to run that test helper many times in parallel, using\n", "`ProcessPoolExecutor`. Unfortunately, we quickly run into trouble. Depending\n", "on your Python version, your platform, and exactly what was submitted, this\n", "will result in at least one of the following:\n", " - dead workers (undesirable),\n", " - workers printing messages to stderr (undesirable),\n", " - `BrokenProcessPool` exceptions (a symptom),\n", " - exceptions talking about `'__main__'` (a symptom),\n", " - exceptions talking about pickling (the real problem), and/or\n", " - exceptions talking about unpickling (a symptom).\n", "\n", "For the sake of this notebook, we force it to use the most widely supported\n", "start method (`\"spawn\"`). At least on Python 3.10--3.12, this start method\n", "also results in the most verbose and confusing output."
] }, { "cell_type": "code", "execution_count": null, "id": "fb0f7fcc", "metadata": {}, "outputs": [], "source": [ "# In this cell, we first encounter a problem running our function in some\n", "# multiprocessing workers.\n", "try:\n", "    print(\n", "        \"This test should print out many stderr lines from the workers, \"\n", "        \"and ultimately fail.\"\n", "    )\n", "    sys.stdout.flush()\n", "\n", "    with ProcessPoolExecutor(4, mp_context=get_context(\"spawn\")) as exe:\n", "        futs = [\n", "            exe.submit(sorting_test_with_big_random_list, seed) for seed in range(25)\n", "        ]\n", "        for seed, fut in enumerate(futs):\n", "            if not fut.result():\n", "                print(f\"Seed {seed} failed.\")\n", "\n", "except Exception as ex:\n", "    sys.stderr.flush()\n", "    print(\"CAUGHT EXCEPTION (expected):\", ex)\n", "else:\n", "    sys.stderr.flush()\n", "    raise RuntimeError(\"An exception should have been thrown.\")" ] }, { "cell_type": "markdown", "id": "7c0f2aaa", "metadata": {}, "source": [ "## Troubleshooting with Serial Executors\n", "\n", "In cases where the exception message doesn't make it clear to the user what to\n", "do, a common strategy is to run the code serially." ] }, { "cell_type": "code", "execution_count": null, "id": "37e3dd2e", "metadata": {}, "outputs": [], "source": [ "# In this cell, we'll try reproducing the problem with an InlineExecutor, since\n", "# that's often a good first thing to try. We fail to do so: this cell works\n", "# fine.\n", "with InlineExecutor() as exe:\n", "    futs = [exe.submit(sorting_test_with_big_random_list, seed) for seed in range(25)]\n", "    for seed, fut in enumerate(futs):\n", "        if not fut.result():\n", "            print(f\"Seed {seed} failed.\")" ] }, { "cell_type": "markdown", "id": "fc0c34d1", "metadata": {}, "source": [ "Unfortunately, the above snippet doesn't reproduce the problem. 
Let's assume\n", "this led to us doing some more experiments and/or web searches, making us\n", "think this could be related to pickling and/or unpickling.\n", "\n", "At this point, we may try using `PInlineExecutor`, since it advertises itself\n", "as a tool for troubleshooting pickling problems. And indeed we now have a\n", "reproducer." ] }, { "cell_type": "code", "execution_count": null, "id": "da8ec6e5", "metadata": {}, "outputs": [], "source": [ "# In this cell, we have successfully replicated the problem with a serial\n", "# executor that performs a pickling test for each task.\n", "try:\n", "    with PInlineExecutor() as exe:\n", "        futs = [\n", "            exe.submit(sorting_test_with_big_random_list, seed) for seed in range(25)\n", "        ]\n", "        for seed, fut in enumerate(futs):\n", "            if not fut.result():\n", "                print(f\"Seed {seed} failed.\")\n", "\n", "except Exception as ex:\n", "    print(\"CAUGHT EXCEPTION (expected):\", ex)" ] }, { "cell_type": "markdown", "id": "bf834bf8", "metadata": {}, "source": [ "Additionally, this test was done with the pure Python pickler, so we could\n", "even trace into it with `ipdb` if we want." ] }, { "cell_type": "code", "execution_count": null, "id": "43f1d93e", "metadata": {}, "outputs": [], "source": [ "# If you'd like to try debugging it yourself, then\n", "# (a) remove the try-except wrapper around the previous cell,\n", "# (b) uncomment the %debug line from this cell, and\n", "# (c) run both cells, one at a time.\n", "\n", "# %debug" ] }, { "cell_type": "markdown", "id": "35de3482", "metadata": {}, "source": [ "## Testing Cloudpickle Serially\n", "\n", "At this point, we have figured out it's a picklability problem. The error\n", "message suggests that we're using a function that's defined in `__main__`\n", "instead of in an imported module. Additionally, we've hopefully heard about\n", "the `cloudpickle` library being a solution to this kind of problem.\n", "\n", "We could test this last hypothesis in a few ways. 
First, let's verify whether\n", "`cloudpickle` works at all on our function. It does:" ] }, { "cell_type": "code", "execution_count": null, "id": "bd548623", "metadata": {}, "outputs": [], "source": [ "# cloudpickle says it can handle our function, so we have a chance.\n", "pickled = cloudpickle.dumps(sorting_test_with_big_random_list, -1)\n", "reconstructed = cloudpickle.loads(pickled)\n", "assert reconstructed(0)" ] }, { "cell_type": "markdown", "id": "5d897007", "metadata": {}, "source": [ "That said, we probably shouldn't trust `cloudpickle` that much since `pickle`\n", "thought it could handle our function too (and technically it can, but only\n", "when we don't send the pickled bytestring to another process for unpickling).\n", "\n", "Fortunately, Pyrseus ships with a simple test function that simulates this\n", "situation. First let's show that we can replicate the problem with it when\n", "using `pickle`." ] }, { "cell_type": "code", "execution_count": null, "id": "1c33e4c5", "metadata": {}, "outputs": [], "source": [ "# First, make sure that try_pickle_round_trip can replicate our problem when using pickle.\n", "try:\n", "    try_pickle_round_trip(\n", "        sorting_test_with_big_random_list,\n", "        dumps=pickle.dumps,  # Reproduce the problem by using the built-in pickler\n", "        loads=pickle.loads,\n", "        hide_main=True,  # The default is True. We include it here for emphasis.\n", "    )\n", "except Exception:\n", "    print(\"try_pickle_round_trip successfully replicated the problem with pickle.\")\n", "else:\n", "    raise RuntimeError(\"try_pickle_round_trip failed to replicate the problem.\")" ] }, { "cell_type": "markdown", "id": "8d4e7855", "metadata": {}, "source": [ "Now let's use that function to see whether it thinks `cloudpickle` will fix\n", "our problems."
] }, { "cell_type": "code", "execution_count": null, "id": "6559432b", "metadata": {}, "outputs": [], "source": [ "# Indeed, try_pickle_round_trip tells us that if we use cloudpickle, then our\n", "# pickling problems will likely go away.\n", "reconstructed = try_pickle_round_trip(\n", "    sorting_test_with_big_random_list,\n", "    dumps=cloudpickle.dumps,  # Fix the problem by using cloudpickle instead of pickle\n", "    loads=cloudpickle.loads,\n", ")\n", "assert reconstructed(0)" ] }, { "cell_type": "markdown", "id": "de24ddbe", "metadata": {}, "source": [ "We might also try using an even more complete tester that internally:\n", "- runs `try_pickle_round_trip` on the function (similar to above),\n", "- calls the function (so we see if the call itself is a problem), and\n", "- runs `try_pickle_round_trip` on the function result (in case there's a\n", "  picklability problem with it)." ] }, { "cell_type": "code", "execution_count": null, "id": "4219f414", "metadata": {}, "outputs": [], "source": [ "# call_with_round_trip_pickling also thinks that everything's good if we\n", "# switch to using cloudpickle.\n", "assert call_with_round_trip_pickling(\n", "    sorting_test_with_big_random_list,\n", "    args=(0,),\n", "    kwargs={},\n", "    dumps=cloudpickle.dumps,\n", "    loads=cloudpickle.loads,\n", ")" ] }, { "cell_type": "markdown", "id": "a86fd319", "metadata": {}, "source": [ "## Trying a Cloudpickle-enabled Executor\n", "\n", "So now let's try some `cloudpickle`-enabled executors.\n", "\n", "First, we see that `CpProcessPoolExecutor` works fine. It's just a thin\n", "wrapper around `ProcessPoolExecutor` that uses `cloudpickle` for pickling\n", "tasks and their results."
] }, { "cell_type": "code", "execution_count": null, "id": "1aa4bfe4", "metadata": {}, "outputs": [], "source": [ "# CpProcessPoolExecutor works!\n", "with CpProcessPoolExecutor(4, mp_context=get_context(\"spawn\")) as exe:\n", "    futs = [exe.submit(sorting_test_with_big_random_list, seed) for seed in range(25)]\n", "    for seed, fut in enumerate(futs):\n", "        if not fut.result():\n", "            print(f\"Seed {seed} failed.\")" ] }, { "cell_type": "markdown", "id": "04cbcf4f", "metadata": {}, "source": [ "Now, we're done debugging. We know that we just need to make sure we use a\n", "`cloudpickle`-enabled executor like `CpProcessPoolExecutor`." ] } ], "metadata": { "jupytext": { "formats": "ipynb,py:percent" }, "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" } }, "nbformat": 4, "nbformat_minor": 5 }