
2 posts tagged with "python"


Unraveling the Hang in Python's Multiprocessing Pool

· 4 min read

Python's multiprocessing.Pool is an incredibly powerful tool for parallelizing tasks, but it can occasionally lead to frustrating "hangs" that are notoriously difficult to debug. This article explores common scenarios where Pool might get stuck and provides robust solutions to prevent such issues.

The Silent Killer: Abnormal Child Process Termination

Consider the following seemingly innocuous program:

import sys
import multiprocessing as mp

def run(rank):
    if rank == 0:
        print("hi from rank 0", flush=True)
    else:
        sys.exit(1)  # Exits abnormally for rank != 0

with mp.Pool(processes=2) as p:
    p.map(run, range(2))  # map passes each item as a single argument to run

When a child process exits abnormally (e.g., via sys.exit(1) as seen above), the Pool internally waits indefinitely for results from that process. Since the process terminated unexpectedly, those results will never arrive, leading to a hang. The map function blocks until all results are collected, creating a deadlock.

Solution: Leveraging map_async with a Timeout

A common and effective strategy to mitigate indefinite hangs is to use Pool's asynchronous API, map_async, in conjunction with a timeout. This allows you to set an upper limit on how long you're willing to wait for results.

import sys
import multiprocessing as mp

def run(rank):
    if rank == 0:
        print("hi from rank 0", flush=True)
    else:
        sys.exit(1)

with mp.Pool(processes=2) as p:
    async_result = p.map_async(run, range(2))
    try:
        result = async_result.get(timeout=5)
    except mp.TimeoutError:
        print("Timed out while waiting for results.")
    finally:
        p.terminate()  # forcefully terminates worker processes
        p.join()       # waits for worker processes to exit

Key Improvements:

  • map_async: This function returns an AsyncResult object immediately, allowing your main program to continue execution.
  • async_result.get(timeout=5): This is crucial. It attempts to retrieve the results but will raise a multiprocessing.TimeoutError if the results aren't available within 5 seconds.
  • p.terminate(): Unlike p.close(), which merely stops accepting new tasks and lets the workers exit once their outstanding work finishes, p.terminate() stops the worker processes immediately. This is essential when you've hit a timeout or detected an issue.
  • p.join(): This waits until the worker processes have actually exited. It's good practice to call join() after terminate() to ensure a clean shutdown.

When terminate() Isn't Enough: The Stubborn Process

Even with p.terminate(), you might encounter scenarios where a child process remains unresponsive. This often happens when the process is executing low-level code (like C extensions) that doesn't immediately respond to signals. Consider this example:

import time
import signal
import multiprocessing as mp

def run(rank):
    if rank == 0:
        print("hi from rank 0", flush=True)
    else:
        # Simulate a process stuck in a blocking operation (e.g., C extension, I/O)
        # and explicitly ignore SIGTERM to demonstrate stubbornness
        signal.signal(signal.SIGTERM, lambda signum, frame: None)
        time.sleep(600)  # Long sleep to simulate blocking I/O

with mp.Pool(processes=2) as p:
    async_result = p.map_async(run, range(2))
    try:
        result = async_result.get(timeout=5)
    except mp.TimeoutError:
        print("Timed out, but process might still be stuck.")
    finally:
        p.terminate()
        p.join()

Why it still hangs (or appears to): p.terminate() sends SIGTERM (signal 15) to its child processes. However, a process can choose to ignore SIGTERM, as demonstrated by signal.signal(signal.SIGTERM, lambda signum, frame: None). If the process is deep within a blocking C extension or a system call, it might not even have the opportunity to process the signal, effectively making it unresponsive.

The Last Resort: Forceful Termination with SIGKILL

In situations where SIGTERM fails to stop a process, the only remaining option is to send SIGKILL (signal 9). This signal cannot be caught or ignored by the process, forcing its immediate termination.

While multiprocessing.Pool doesn't directly expose a mechanism to send SIGKILL to individual worker processes, you can implement a more robust shutdown yourself: track the child processes, give them a grace period after p.terminate(), and kill any that are still alive. (Note that Pool.join() accepts no timeout argument, so the waiting has to be done explicitly.)
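Here is a minimal sketch of that idea, assuming a Unix-like system. It relies on multiprocessing.active_children(), which returns every live child of the current process (not only the pool's workers), so in a larger application you may prefer to track the worker PIDs yourself:

import time
import multiprocessing as mp

def shutdown_pool(pool, grace_period=5.0):
    # Ask the pool to stop: this sends SIGTERM to the worker processes.
    pool.terminate()

    # Give the workers a short grace period to exit on their own.
    deadline = time.monotonic() + grace_period
    while mp.active_children() and time.monotonic() < deadline:
        time.sleep(0.1)

    # Anything still alive ignored (or never saw) SIGTERM; escalate.
    for child in mp.active_children():
        child.kill()  # sends SIGKILL on Unix; cannot be caught or ignored

    pool.join()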

The key takeaway is that while multiprocessing.Pool is convenient, highly resilient applications dealing with potentially unstable or long-running child processes might benefit from a more explicit process management strategy using multiprocessing.Process objects directly, which allows for individual process tracking and more aggressive termination if necessary.
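As a rough illustration of that approach (reusing the run function from the examples above; run_with_deadline is just an illustrative helper), each worker gets its own Process object, a bounded join, and an explicit kill if it refuses to exit:

import multiprocessing as mp

def run_with_deadline(target, items, timeout=5.0):
    # One Process per task, tracked individually.
    procs = [mp.Process(target=target, args=(item,)) for item in items]
    for proc in procs:
        proc.start()
    for proc in procs:
        # Unlike Pool.join(), Process.join() accepts a timeout.
        proc.join(timeout=timeout)
    for proc in procs:
        if proc.is_alive():
            proc.kill()  # SIGKILL: last resort for an unresponsive worker
            proc.join()

run_with_deadline(run, range(2))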

Patching with Python's Import System

· 2 min read

In many scenarios, we need to apply patches to third-party libraries in Python. A common approach is "monkey patching." However, monkey patching is not a perfect solution: it rebinds attributes at runtime, after the target module has been imported. If other code imported that module earlier, or copied references to its attributes with from ... import ..., the patch may arrive too late to take effect where it matters.
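A toy illustration of this failure mode (the module and function names here are hypothetical):

# consumer.py -- imported before the patch is applied
from third_party import helper  # grabs a reference to the original function

# patches.py -- monkey patch applied later
import third_party
third_party.helper = lambda *args, **kwargs: "patched"

# Rebinding third_party.helper does not update consumer.helper:
# consumer already holds a reference to the original function,
# so calls made through that name are not patched.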

We therefore need a way to modify modules as early as possible. A better method is to leverage Python's import system itself. For details, see the official documentation on the import system; in short, Python imports a module in three steps:

  1. Search for the module using a Finder.
  2. Create the module using a Loader.
  3. Bind the module in the current namespace.

In step 1, we can hook into sys.meta_path to create a custom finder, which can return a different module specification (module spec) based on a given module name. In step 2, we can create a new loader for a specific module, which replaces certain attributes (functions, classes, variables) of the module before the created module is returned.

With this approach, we can replace an entire module, or individual attributes of it, at the moment the module is first imported. Since sys.modules acts as a cache, each module is created only once; once patched, it stays patched, which is exactly what we want.

Below is an example code:

import importlib
import importlib.abc
import importlib.util
import sys

# Importing "d" will load "patch.d" instead.
ALIAS_MAP = {"d": "patch.d"}
# After "pkg.a.a_1" executes, its a_1_func1 is wrapped by
# patch.pkg.a.a_1.a_1_func1_wrapper.
REPLACE_MAP = {"pkg.a.a_1": {"a_1_func1": ("patch.pkg.a.a_1", "a_1_func1_wrapper")}}


class AttrReplacingLoader(importlib.abc.Loader):
    def __init__(self, original_loader, replace_map):
        self.original_loader = original_loader
        self.replace_map = replace_map

    def create_module(self, spec):
        if hasattr(self.original_loader, "create_module"):
            return self.original_loader.create_module(spec)
        return None

    def exec_module(self, module):
        # Execute the original module first, then wrap the listed attributes.
        self.original_loader.exec_module(module)
        to_replace = self.replace_map[module.__spec__.name]
        for attr_name, (new_module_name, new_attr_name) in to_replace.items():
            new_module = importlib.import_module(new_module_name)
            if hasattr(module, attr_name):
                # Replace the attribute with wrapper(original_attribute).
                wrapper = getattr(new_module, new_attr_name)
                setattr(module, attr_name, wrapper(getattr(module, attr_name)))


class PatchModuleFinder(importlib.abc.MetaPathFinder):
    def __init__(self, alias_map, replace_map):
        self.alias_map = alias_map
        self.replace_map = replace_map

    def find_spec(self, fullname, path, target=None):
        if fullname in self.alias_map:
            original_name = self.alias_map[fullname]
            original_spec = importlib.util.find_spec(original_name)
            if original_spec:
                return original_spec

        if fullname in self.replace_map:
            # Temporarily remove ourselves so find_spec() doesn't recurse
            # back into this finder for the same module.
            if self in sys.meta_path:
                sys.meta_path.remove(self)
            original_spec = None
            try:
                original_spec = importlib.util.find_spec(fullname, path)
            finally:
                if self not in sys.meta_path:
                    sys.meta_path.insert(0, self)

            if original_spec and original_spec.loader:
                return importlib.util.spec_from_loader(
                    fullname,
                    AttrReplacingLoader(original_spec.loader, self.replace_map),
                    origin=original_spec.origin,
                    is_package=original_spec.submodule_search_locations is not None,
                )
        return None


# Install the finder at the front of sys.meta_path so it runs before the default finders.
sys.meta_path.insert(0, PatchModuleFinder(ALIAS_MAP, REPLACE_MAP))
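To make the mappings concrete, a hypothetical patch.pkg.a.a_1 module (the name comes from REPLACE_MAP above) could define its wrapper like this:

# patch/pkg/a/a_1.py  (hypothetical patch package)
def a_1_func1_wrapper(original_func):
    def wrapped(*args, **kwargs):
        print("calling patched a_1_func1")
        return original_func(*args, **kwargs)
    return wrapped

As long as the finder is installed before anything else is imported (for example at the very top of the application's entry point), import d transparently loads patch.d, and the first import pkg.a.a_1 yields a module whose a_1_func1 has already been wrapped.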