PYTHON ADVANCED: PROFESSIONAL ENGINEERING MASTERY
Lesson 07 of 12 · Advanced · 55 min

Multiprocessing: True Parallelism in Python

Bypass the GIL with multiprocessing. Use process pools, manage shared state safely, and understand inter-process communication.

TIP

Learning Objectives: After this lesson, you'll understand how to bypass the GIL with multiprocessing, use process pools efficiently, manage shared state safely, and implement inter-process communication patterns.

Why Multiprocessing?

Multiprocessing creates separate Python processes, each with its own GIL, enabling true parallelism.

Fig. 02: Flow diagram

Multiprocessing vs Threading

With multiprocessing, each process gets its own memory space, its own Python interpreter, and its own GIL. That is what allows CPU-bound work to run in parallel on multiple cores.

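| Aspect | Threading | Multiprocessing |
|---|---|---|
| Memory | Shared | Separate |
| GIL | Shared (only one thread runs Python bytecode at a time) | One per process (no contention) |
| Overhead | Low | Higher |
| Communication | Direct | IPC needed |
| Best for | I/O-bound | CPU-bound |
| Crash isolation | Shared risk | Isolated |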
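To make the "Memory: Separate" row concrete, here is a minimal sketch using only the standard library. The module-level `counter` and the function `bump` are hypothetical names chosen for illustration. The child changes its own copy of a global variable and reports the value back through a Queue, while the parent's copy stays untouched:

```python
from multiprocessing import Process, Queue

# Hypothetical module-level global; every process ends up with its own copy.
counter = 0

def bump(results):
    # Hypothetical worker: modifies only this child's copy of the global
    global counter
    counter += 100
    results.put(counter)  # report the child's view back to the parent

if __name__ == "__main__":
    results = Queue()
    p = Process(target=bump, args=(results,))
    p.start()
    child_value = results.get()  # read before join() so the child can flush its queue
    p.join()
    parent_value = counter
    print(f"child saw {child_value}, parent still has {parent_value}")
    # child saw 100, parent still has 0
```

This holds for every start method: with fork the child works on a copy of the parent's memory, and with spawn it re-imports the module from scratch, so in neither case does the write reach the parent.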

Basic Process Creation

Fig. 04: Interactive Python code executor
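A minimal sketch of creating processes directly with multiprocessing.Process. The function `greet`, its arguments, and the `Worker-N` names are hypothetical placeholders. Each child is launched with start(), waited on with join(), and reports an exitcode of 0 when it exits normally:

```python
import os
import time
from multiprocessing import Process, current_process

def greet(name, delay):
    # Hypothetical task: runs in a separate process with its own PID
    time.sleep(delay)
    print(f"{current_process().name} (pid {os.getpid()}) says hello to {name}")

if __name__ == "__main__":
    procs = [
        Process(target=greet, args=(f"task-{i}", 0.1), name=f"Worker-{i}")
        for i in range(3)
    ]
    for p in procs:
        p.start()   # launch the child process
    for p in procs:
        p.join()    # block until that child exits
    exit_codes = [p.exitcode for p in procs]
    print(exit_codes)  # [0, 0, 0]
```

The greeting lines can appear in any order, because the three children run concurrently.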

Process Pools

Process pools manage a fixed number of worker processes for efficient task distribution.

Fig. 06: Flow diagram

Using Pool

Fig. 08: Interactive Python code executor
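A minimal Pool sketch, assuming a CPU-bound toy function (`sum_of_squares` is a hypothetical example, not a library function). pool.map() splits the inputs across the worker processes and returns the results in input order:

```python
import os
from multiprocessing import Pool

def sum_of_squares(n):
    # Hypothetical CPU-bound task: pure-Python arithmetic holds the GIL,
    # so only separate processes can run several of these at once
    return sum(i * i for i in range(n))

if __name__ == "__main__":
    inputs = [10, 100, 1000, 10_000]
    # Pool() with no argument uses os.cpu_count() workers; cap it at 4 here
    with Pool(processes=min(4, os.cpu_count() or 1)) as pool:
        results = pool.map(sum_of_squares, inputs)
    print(results)  # [285, 328350, 332833500, 333283335000]
```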

Pool Methods

multiprocessing.Pool provides several methods for distributing work:

| Method | Blocking? | Returns | Notes |
|---|---|---|---|
| pool.map(func, iterable) | Yes | Ordered list | Like [func(x) for x in iterable] |
| pool.map_async(func, iterable) | No | AsyncResult | Call .get() to retrieve results |
| pool.apply(func, args) | Yes | Single value | Like func(*args) |
| pool.apply_async(func, args) | No | AsyncResult | Call .get(), or pass callback= to handle the result |
| pool.starmap(func, iterable) | Yes | Ordered list | Unpacks arguments: func(a, b) from [(a1, b1), ...] |
| pool.imap(func, iterable) | Lazy | Iterator | Results in input order, consumed one at a time instead of as one big list |
| pool.imap_unordered(func, iterable) | Lazy | Iterator | Yields results as they complete, so a slow task never holds back finished ones |

A representative pattern using several of these methods together:

    from multiprocessing import Pool

    # Worker functions live at module top level, outside the __main__ guard,
    # so worker processes can look them up by name when unpickling tasks
    def process_item(item):
        return item * 2

    def add(a, b):
        return a + b

    def handle_result(result):
        # Runs in the parent process once the async task completes
        print(f"Got result: {result}")

    if __name__ == "__main__":
        with Pool(processes=4) as pool:
            # Blocking map
            results = pool.map(process_item, range(10))  # [0, 2, 4, ..., 18]

            # Async with callback
            async_result = pool.apply_async(
                process_item, (42,), callback=handle_result
            )
            async_result.wait()

            # Starmap for multiple arguments
            pairs = [(1, 2), (3, 4), (5, 6)]
            sums = pool.starmap(add, pairs)  # [3, 7, 11]
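The two lazy methods from the table, imap() and imap_unordered(), are easiest to understand side by side. In this sketch the hypothetical `slow_square` deliberately sleeps longer for smaller inputs, so tasks finish out of order; imap() still yields results in input order, while imap_unordered() yields them as they complete. chunksize=1 sends one item per task so the effect is visible:

```python
import time
from multiprocessing import Pool

def slow_square(x):
    # Hypothetical task: smaller inputs sleep longer, so completion order
    # differs from input order
    time.sleep(0.05 * (5 - x))
    return x * x

if __name__ == "__main__":
    with Pool(processes=3) as pool:
        # imap: lazy iterator; results arrive in input order
        ordered = []
        for value in pool.imap(slow_square, range(5), chunksize=1):
            ordered.append(value)
        # imap_unordered: each result is yielded as soon as its task finishes
        completion_order = list(pool.imap_unordered(slow_square, range(5), chunksize=1))
    print(ordered)                   # [0, 1, 4, 9, 16]
    print(sorted(completion_order))  # [0, 1, 4, 9, 16]; raw order depends on timing
```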

Shared State and Communication

Processes don't share memory by default. Here's how to share data.

Fig. 10: Flow diagram

Shared Memory Values

multiprocessing provides Value and Array for shared memory. This pattern needs real OS processes, so read it rather than run it:

    from multiprocessing import Process, Value, Array

    def increment_counter(counter, n):
        for _ in range(n):
            # += is a read-modify-write, so hold the value's lock
            with counter.get_lock():
                counter.value += 1

    def modify_array(arr, idx, value):
        arr[idx] = value

    if __name__ == "__main__":
        # Create shared objects inside the guard so spawned children
        # don't allocate their own unused copies when they re-import this module
        counter = Value('i', 0)  # 'i' = signed int; Value(ctypes.c_int, 0) is equivalent
        arr = Array('d', [1.0, 2.0, 3.0])  # 'd' = double

        processes = [
            Process(target=increment_counter, args=(counter, 1000))
            for _ in range(4)
        ]
        processes.append(Process(target=modify_array, args=(arr, 0, 99.0)))
        for p in processes:
            p.start()
        for p in processes:
            p.join()

        print(f"Counter: {counter.value}")  # 4000
        print(f"Array: {arr[:]}")           # [99.0, 2.0, 3.0]

The type code passed to Value/Array selects the C type backing the shared memory:

| Code | C type | Code | C type |
|---|---|---|---|
| 'b' | signed char | 'l' | signed long |
| 'B' | unsigned char | 'L' | unsigned long |
| 'i' | signed int | 'f' | float |
| 'I' | unsigned int | 'd' | double |
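As a sketch of the 'd' type code in use, the following assumes four hypothetical workers that each fill a disjoint slice of one shared Array. Array('d', 8) allocates eight zero-initialized C doubles. Because no two processes touch the same index, this particular pattern does not need the array's lock:

```python
from multiprocessing import Process, Array

def fill_slice(shared, start, stop):
    # Hypothetical worker: writes only its own index range, so no lock is needed
    for i in range(start, stop):
        shared[i] = i * 0.5

if __name__ == "__main__":
    size, workers = 8, 4
    shared = Array('d', size)  # 8 doubles, zero-initialized
    step = size // workers
    procs = [
        Process(target=fill_slice, args=(shared, w * step, (w + 1) * step))
        for w in range(workers)
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    values = shared[:]  # slicing copies the data out into a plain list
    print(values)  # [0.0, 0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5]
```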

Manager for Complex Shared Objects

A Manager starts a separate server process that owns Python objects such as dicts and lists; other processes read and modify them through proxy objects. Because it spawns a manager process, run this example as a script rather than in the sandbox:

    from multiprocessing import Manager, Process

    def worker(shared_dict, shared_list, worker_id):
        """Worker that modifies shared data"""
        shared_dict[f"worker_{worker_id}"] = f"result_{worker_id}"
        shared_list.append(worker_id)

    if __name__ == "__main__":
        with Manager() as manager:
            # Manager-created objects are proxies to shared state
            shared_dict = manager.dict()
            shared_list = manager.list()

            processes = [
                Process(target=worker, args=(shared_dict, shared_list, i))
                for i in range(4)
            ]
            for p in processes:
                p.start()
            for p in processes:
                p.join()

            print(f"Dict: {dict(shared_dict)}")
            print(f"List: {list(shared_list)}")

Expected output (the order of entries can differ between runs, because the four workers run concurrently):

    Dict: {'worker_0': 'result_0', 'worker_1': 'result_1', ...}
    List: [0, 1, 2, 3]

A Manager can create proxies for dict(), list(), Value(), Array(), and Namespace(), for the synchronization primitives (Lock(), RLock(), Semaphore(), BoundedSemaphore(), Event(), Condition(), Barrier()), and for Queue().
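Proxy operations are forwarded to the manager one call at a time, so a read-modify-write such as `ns.total += 1` on a Namespace proxy is two separate round trips (read, then write) and can lose updates when several processes race. A minimal sketch, assuming a hypothetical `add_points` worker, guards it with a Lock proxy created by the same manager:

```python
from multiprocessing import Manager, Process

def add_points(ns, lock, points):
    # Hypothetical worker: increments a shared Namespace attribute
    for _ in range(points):
        # Hold the manager's lock so the read and the write happen together
        with lock:
            ns.total += 1

if __name__ == "__main__":
    with Manager() as manager:
        ns = manager.Namespace()
        ns.total = 0
        lock = manager.Lock()
        procs = [Process(target=add_points, args=(ns, lock, 50)) for _ in range(4)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        final_total = ns.total  # read while the manager is still running
    print(final_total)  # 200
```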

Queues for Communication

Queues provide safe inter-process communication. A producer/consumer setup with a sentinel value to signal completion looks like this (run it as a script):

    from multiprocessing import Process, Queue
    import time

    def producer(queue, items):
        """Produce items into queue"""
        for item in items:
            print(f"Producing: {item}")
            queue.put(item)
            time.sleep(0.1)
        queue.put(None)  # Sentinel

    def consumer(queue, name):
        """Consume items from queue"""
        while True:
            item = queue.get()
            if item is None:
                queue.put(None)  # Pass sentinel on so the other consumers stop too
                break
            print(f"{name}: Got {item}")

    if __name__ == "__main__":
        queue = Queue()
        producer_p = Process(target=producer, args=(queue, list(range(5))))
        consumers = [
            Process(target=consumer, args=(queue, f"Consumer-{i}"))
            for i in range(2)
        ]
        producer_p.start()
        for c in consumers:
            c.start()
        producer_p.join()
        for c in consumers:
            c.join()

multiprocessing offers three queue variants:

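| Type | Description |
|---|---|
| Queue() | Standard FIFO queue; put() and get() accept optional block/timeout arguments |
| SimpleQueue() | Lighter-weight queue offering only basic put(), get(), and empty(); no timeouts or qsize() |
| JoinableQueue() | Queue subclass that adds task_done() and join() |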
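JoinableQueue is the variant to reach for when the producer must wait until every item has been processed, not merely taken off the queue. A sketch assuming two hypothetical workers and one sentinel per worker; every get() is paired with a task_done() (via finally) so that tasks.join() can return:

```python
from multiprocessing import JoinableQueue, Process, Queue

def worker(tasks, results):
    # Hypothetical worker: multiplies each task by 10
    while True:
        item = tasks.get()
        try:
            if item is None:  # sentinel: stop this worker
                break
            results.put(item * 10)
        finally:
            tasks.task_done()  # runs even on break, so the sentinel is counted too

if __name__ == "__main__":
    tasks, results = JoinableQueue(), Queue()
    workers = [Process(target=worker, args=(tasks, results)) for _ in range(2)]
    for w in workers:
        w.start()
    for n in range(5):
        tasks.put(n)
    for _ in workers:
        tasks.put(None)  # one sentinel per worker
    tasks.join()  # blocks until task_done() has been called for every item
    collected = sorted(results.get() for _ in range(5))
    for w in workers:
        w.join()
    print(collected)  # [0, 10, 20, 30, 40]
```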

Pipes for Two-Way Communication

A Pipe returns two connection objects for direct two-way messaging between processes:

    from multiprocessing import Process, Pipe

    def sender(conn, messages):
        """Send messages through pipe"""
        for msg in messages:
            conn.send(msg)
        conn.send(None)  # Signal completion
        conn.close()

    def receiver(conn):
        """Receive and process messages"""
        while True:
            msg = conn.recv()
            if msg is None:
                break
            print(f"Received: {msg}")
        conn.close()

    if __name__ == "__main__":
        # Pipe returns two connection objects
        parent_conn, child_conn = Pipe()
        sender_p = Process(target=sender, args=(parent_conn, ["Hello", "World", "!"]))
        receiver_p = Process(target=receiver, args=(child_conn,))
        sender_p.start()
        receiver_p.start()
        sender_p.join()
        receiver_p.join()

By default a pipe is duplex (Pipe(duplex=True)), so both ends can send and receive. With conn1, conn2 = Pipe(duplex=False), conn1 can only receive and conn2 can only send.

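| Method | Description |
|---|---|
| conn.send(obj) | Send an object (it is pickled automatically) |
| conn.recv() | Receive an object (blocks until one arrives) |
| conn.poll(timeout) | Return whether data is available, optionally waiting up to timeout seconds |
| conn.close() | Close the connection |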
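Because the default pipe is duplex, one connection can carry both requests and replies. This sketch assumes a hypothetical `squarer` child that answers each number with its square; the parent calls poll() with a timeout before recv() so it cannot block forever if the child stops responding:

```python
from multiprocessing import Pipe, Process

def squarer(conn):
    # Hypothetical child: receive a number, send back its square
    while True:
        msg = conn.recv()
        if msg is None:  # sentinel: stop
            break
        conn.send(msg * msg)
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()  # duplex=True by default
    p = Process(target=squarer, args=(child_conn,))
    p.start()
    replies = []
    for n in [2, 3, 4]:
        parent_conn.send(n)
        # Wait up to 5 seconds for the reply before calling recv()
        if parent_conn.poll(5):
            replies.append(parent_conn.recv())
    parent_conn.send(None)  # tell the child to stop
    p.join()
    parent_conn.close()
    print(replies)  # [4, 9, 16]
```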

Practical Patterns

Map-Reduce Pattern

Fig. 12: Flow diagram

Fig. 14: Interactive Python code executor
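A map-reduce sketch, assuming a word-count task, a tiny hypothetical input text, and a chunk size of two lines. The map step counts words per chunk in worker processes; the reduce step merges the partial collections.Counter objects in the parent with functools.reduce:

```python
from collections import Counter
from functools import reduce
from multiprocessing import Pool

def map_count(chunk):
    # Map step (hypothetical helper): count words in one chunk, in a worker
    counts = Counter()
    for line in chunk:
        counts.update(line.lower().split())
    return counts

if __name__ == "__main__":
    lines = [
        "the quick brown fox",
        "the lazy dog",
        "The fox jumps",
        "over the dog",
    ]
    chunk_size = 2  # illustrative; real jobs use much larger chunks
    chunks = [lines[i:i + chunk_size] for i in range(0, len(lines), chunk_size)]

    with Pool(processes=2) as pool:
        partials = pool.map(map_count, chunks)  # one Counter per chunk

    # Reduce step in the parent: merge the partial counts
    totals = reduce(lambda a, b: a + b, partials, Counter())
    print(totals.most_common(1))  # [('the', 4)]
```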

Parallel Pipeline

A pipeline chains processing stages, each a separate process connected by queues (an ETL-style extract → transform → load). Run this as a script:

    from multiprocessing import Process, Queue

    def stage1_extract(input_queue, output_queue):
        """Extract: Read and parse data"""
        while True:
            item = input_queue.get()
            if item is None:
                output_queue.put(None)
                break
            # Parse/extract
            result = {"raw": item, "extracted": item.upper()}
            output_queue.put(result)

    def stage2_transform(input_queue, output_queue):
        """Transform: Process data"""
        while True:
            item = input_queue.get()
            if item is None:
                output_queue.put(None)
                break
            # Transform
            item["transformed"] = len(item["extracted"])
            output_queue.put(item)

    def stage3_load(input_queue, results):
        """Load: Store results"""
        while True:
            item = input_queue.get()
            if item is None:
                break
            results.append(item)

    if __name__ == "__main__":
        from multiprocessing import Manager

        q1 = Queue()  # raw -> stage1
        q2 = Queue()  # stage1 -> stage2
        q3 = Queue()  # stage2 -> stage3

        with Manager() as manager:
            results = manager.list()

            # Start pipeline stages
            p1 = Process(target=stage1_extract, args=(q1, q2))
            p2 = Process(target=stage2_transform, args=(q2, q3))
            p3 = Process(target=stage3_load, args=(q3, results))
            p1.start()
            p2.start()
            p3.start()

            # Feed data
            for item in ["hello", "world", "python"]:
                q1.put(item)
            q1.put(None)  # Signal end

            # Wait for completion
            p1.join()
            p2.join()
            p3.join()

            print(list(results))

Expected output:

[{'raw': 'hello', 'extracted': 'HELLO', 'transformed': 5}, {'raw': 'world', 'extracted': 'WORLD', 'transformed': 5}, {'raw': 'python', 'extracted': 'PYTHON', 'transformed': 6}]

Worker Pool with Callbacks

Fig. 16: Interactive Python code executor
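A sketch of a worker pool with callbacks, using the callback and error_callback parameters of apply_async(). The task `risky_divide` is hypothetical and fails on zero on purpose. Callbacks run in a result-handler thread of the parent process, so they are never pickled and should stay short:

```python
from multiprocessing import Pool

def risky_divide(x):
    # Hypothetical task that raises ZeroDivisionError when x == 0
    return 100 // x

if __name__ == "__main__":
    successes, failures = [], []

    def on_success(result):
        # Runs in the parent's result-handler thread; keep it quick
        successes.append(result)

    def on_error(exc):
        # Receives the exception raised inside the worker
        failures.append(type(exc).__name__)

    with Pool(processes=2) as pool:
        for x in [1, 2, 0, 5]:
            pool.apply_async(
                risky_divide, (x,), callback=on_success, error_callback=on_error
            )
        pool.close()  # no more tasks will be submitted
        pool.join()   # wait for all tasks and their callbacks

    print(sorted(successes))  # [20, 50, 100]
    print(failures)           # ['ZeroDivisionError']
```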

Best Practices

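  1. Always guard process-launching code with if __name__ == "__main__": it is required whenever the spawn or forkserver start method is used (spawn is the default on Windows and macOS), and it stops child processes from recursively spawning more processes when they re-import the main module.

  2. Use context managers for pools so they are cleaned up automatically. Leaving the with block calls pool.terminate(), so collect all results inside it:

    with Pool() as pool:
        results = pool.map(func, data)
  3. Prefer ProcessPoolExecutor for simple cases: it has a cleaner API, re-raises worker exceptions through future.result(), and plugs into asyncio via loop.run_in_executor().

  4. Be mindful of data serialization: everything passed to processes must be picklable, large data means slow serialization, and shared memory is worth considering for large arrays.

  5. Use appropriate chunk sizes: too small and overhead dominates, too large and you get poor load balancing. For example: pool.map(func, data, chunksize=100).

  6. Handle exceptions properly: an exception raised in a worker stays silent until you retrieve the result, so always call future.result() (or AsyncResult.get()), pass an error_callback to apply_async(), and use try/except inside worker functions where a failure should be handled locally.

  7. Clean up resources: close queues and pipes, join() finished processes so they do not linger as zombies, and shut pools down with pool.close() followed by pool.join() (or pool.terminate() to stop workers immediately).

  8. Consider memory usage: each process has its own interpreter and its own copy of the data it works on, so use generators for large datasets and imap() to consume results one at a time instead of building one large list.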
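Items 3 and 6 fit together in one short sketch with concurrent.futures.ProcessPoolExecutor. The task `parse_number` is hypothetical and fails on non-numeric input; future.result() re-raises that ValueError in the parent, where it can be handled per input:

```python
from concurrent.futures import ProcessPoolExecutor, as_completed

def parse_number(text):
    # Hypothetical task: int() raises ValueError for non-numeric text
    return int(text)

if __name__ == "__main__":
    inputs = ["10", "20", "oops", "40"]
    parsed, errors = {}, {}

    with ProcessPoolExecutor(max_workers=2) as executor:
        # Map each future back to the input that produced it
        futures = {executor.submit(parse_number, s): s for s in inputs}
        for future in as_completed(futures):
            source = futures[future]
            try:
                parsed[source] = future.result()  # re-raises the worker's exception
            except ValueError as exc:
                errors[source] = str(exc)

    print(sorted(parsed.values()))  # [10, 20, 40]
    print(list(errors))             # ['oops']
```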

When to use what:

| Approach | Use for |
|---|---|
| Threading | I/O-bound tasks (network, file, database); need shared memory (carefully); low overhead required |
| Multiprocessing | CPU-bound tasks (calculations); need true parallelism; memory isolation beneficial |
| asyncio (next lesson) | Many concurrent I/O operations; event-driven programming; single-threaded concurrency |

Key Takeaways

| Concept | Description |
|---|---|
| Process | Separate Python interpreter, own GIL |
| Pool | Reusable worker processes |
| ProcessPoolExecutor | Modern, clean pool interface |
| Value/Array | Shared memory primitives |
| Manager | Shared complex objects (dict, list) |
| Queue | Thread/process-safe communication |
| Pipe | Two-way process communication |

Multiprocessing vs Alternatives

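| Scenario | Best Choice |
|---|---|
| CPU-heavy calculation | Multiprocessing |
| Network I/O | Threading or asyncio |
| File I/O | Threading |
| Many small tasks | Pool with chunksize |
| Large shared data | SharedMemory (Python 3.8+) |
| NumPy operations | Vectorized NumPy first (many operations release the GIL, and BLAS-backed routines such as matrix multiplication are often already multithreaded) |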
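For the "Large shared data" row, a minimal sketch with multiprocessing.shared_memory (Python 3.8+). The child attaches to the block by name and modifies it in place, so the bytes are never pickled or copied between processes; the worker `double_in_place` is hypothetical. Every process calls close() on its own handle, and only the creating process calls unlink():

```python
from multiprocessing import Process
from multiprocessing import shared_memory

def double_in_place(name, length):
    # Hypothetical worker: attach to the existing block by name
    shm = shared_memory.SharedMemory(name=name)
    for i in range(length):
        shm.buf[i] = (shm.buf[i] * 2) % 256  # each slot is one byte
    shm.close()  # detach in this process only

if __name__ == "__main__":
    data = bytes([1, 2, 3, 4, 5])
    shm = shared_memory.SharedMemory(create=True, size=len(data))
    try:
        shm.buf[:len(data)] = data
        p = Process(target=double_in_place, args=(shm.name, len(data)))
        p.start()
        p.join()
        result = bytes(shm.buf[:len(data)])  # copy out before closing
        print(list(result))  # [2, 4, 6, 8, 10]
    finally:
        shm.close()
        shm.unlink()  # free the block; done once, by the creator
```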

Next Steps

In the next lesson, we'll explore Async/Await Fundamentals—understand event loops, write coroutines, use asyncio for concurrent I/O, and build async patterns for production code.


Ready for non-blocking Python? Async/await awaits!


Further Reading

Modern Parallel Python

  • joblib — the parallelism library used by scikit-learn. Cleaner API than raw multiprocessing for embarrassingly-parallel work.
  • ray — distributed Python. Scales from one machine to a cluster with @ray.remote.
  • dask — parallel computing for analytics; familiar pandas/numpy APIs.
  • mpire — modern multiprocessing wrapper with progress bars and worker reuse.

Books

  • Book: Fluent Python (2nd ed.) — Chapter 20 ("Concurrent Executors").
  • Book: High Performance Python — Gorelick & Ozsvald (2nd ed., 2020). Covers when multiprocessing actually helps vs. just adding overhead.
Related concepts: multiprocessing · parallelism · process-pool · shared-memory