Short Answer
Pool's chunksize-algorithm is a heuristic. It provides a simple solution for all imaginable problem scenarios you are trying to stuff into Pool's methods. As a consequence, it cannot be optimized for any specific scenario.
The algorithm arbitrarily divides the iterable into approximately four times as many chunks as the naive approach. More chunks mean more overhead, but increased scheduling flexibility. As this answer will show, this leads to a higher worker-utilization on average, but without the guarantee of a shorter overall computation time for every case.
"That's nice to know" you might think, "but how does knowing this help me with my concrete multiprocessing problems?" Well, it doesn't. The more honest short answer is, "there is no short answer", "multiprocessing is complex" and "it depends". An observed symptom can have different roots, even for similar scenarios.
This answer tries to provide you with basic concepts that help you get a clearer picture of Pool's scheduling black box. It also tries to give you some basic tools for recognizing and avoiding potential cliffs as far as they are related to chunksize.
Table of Contents
Part I
1. Definitions
2. Parallelization Goals
3. Parallelization Scenarios
4. Risks of Chunksize > 1
5. Pool's Chunksize-Algorithm
6. Quantifying Algorithm Efficiency
6.1 Models
6.2 Parallel Schedule
6.3 Efficiencies
6.3.1 Absolute Distribution Efficiency (ADE)
6.3.2 Relative Distribution Efficiency (RDE)
Part II
7. Naive vs. Pool's Chunksize-Algorithm
8. Reality Check
9. Conclusion
It is necessary to clarify some important terms first.
1. Definitions
Chunk
A chunk here is a share of the iterable-argument specified in a pool-method call. How the chunksize gets calculated and what effects this can have is the topic of this answer.
Task
A task's physical representation in a worker-process in terms of data can be seen in the figure below.
The figure shows an example call to pool.map(), displayed along a line of code taken from the multiprocessing.pool.worker function, where a task read from the inqueue gets unpacked. worker is the underlying main-function in the MainThread of a pool-worker-process. The func-argument specified in the pool-method will only match the func-variable inside the worker-function for single-call methods like apply_async and for imap with chunksize=1. For the rest of the pool-methods with a chunksize-parameter, the processing-function func will be a mapper-function (mapstar or starmapstar). This function maps the user-specified func-parameter on every element of the transmitted chunk of the iterable (--> "map-tasks"). The time this takes defines a task also as a unit of work.
Taskel
While the usage of the word "task" for the whole processing of one chunk is matched by code within multiprocessing.pool, there is no indication how a single call to the user-specified func, with one element of the chunk as argument(s), should be referred to. To avoid confusion emerging from naming conflicts (think of the maxtasksperchild-parameter for Pool's __init__-method), this answer will refer to the single units of work within a task as taskel.
A taskel (from task + element) is the smallest unit of work within a task. It is the single execution of the function specified with the func-parameter of a Pool-method, called with arguments obtained from a single element of the transmitted chunk. A task consists of chunksize taskels.
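The relation between chunks, tasks and taskels can be sketched in plain Python. The helper names get_tasks and run_task below are hypothetical; they only resemble what Pool._get_tasks and mapstar do inside multiprocessing.pool, and no worker processes are involved.

```python
import itertools


def get_tasks(func, iterable, chunksize):
    """Yield (func, chunk) tasks; hypothetical helper modeled on Pool._get_tasks."""
    it = iter(iterable)
    while True:
        chunk = tuple(itertools.islice(it, chunksize))
        if not chunk:
            return
        yield (func, chunk)


def run_task(task):
    """Process one task: one call of func (one taskel) per chunk element."""
    func, chunk = task
    return [func(element) for element in chunk]


def square(x):
    return x * x


tasks = list(get_tasks(square, range(10), chunksize=4))
results = [run_task(task) for task in tasks]
print(len(tasks))  # number of tasks (chunks)
print(results)     # one result list per task
```

With ten elements and chunksize=4, the last task holds only the two leftover taskels, so a task contains at most chunksize taskels.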
Parallelization Overhead (PO)
PO consists of Python-internal overhead and overhead for inter-process communication (IPC). The per-task overhead within Python comes with the code needed for packaging and unpacking the tasks and their results. IPC-overhead comes with the necessary synchronization of threads and the copying of data between different address spaces (two copy steps needed: parent -> queue -> child). The amount of IPC-overhead is OS-, hardware- and data-size dependent, which makes generalizations about the impact difficult.
2. Parallelization Goals
When using multiprocessing, our overall goal (obviously) is to minimize total processing time for all tasks. To reach this overall goal, our technical goal needs to be optimizing the utilization of hardware resources.
Some important sub-goals for achieving the technical goal are:
- minimize parallelization overhead (most famously, but not only: IPC)
- keep utilization high across all cpu-cores
- keep memory usage limited to prevent the OS from excessive paging (thrashing)
First, the tasks need to be computationally heavy (intensive) enough to earn back the PO we have to pay for parallelization. The relevance of PO decreases with increasing absolute computation time per taskel. Or, to put it the other way around, the bigger the absolute computation time per taskel for your problem, the less relevant the need for reducing PO becomes. If your computation takes hours per taskel, the IPC overhead will be negligible in comparison. The primary concern then is to prevent idling worker processes after all tasks have been distributed. Keeping all cores loaded means we are parallelizing as much as possible.
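To put rough numbers on this, here is a toy model of the share of a task's time that goes into overhead. The function overhead_share and the 1 ms per-task overhead are assumptions made up for illustration; real PO depends on OS, hardware and data size.

```python
def overhead_share(po_per_task, taskel_time, chunksize=1):
    """Fraction of a task's wall time spent on overhead (toy model)."""
    compute_time = chunksize * taskel_time
    return po_per_task / (po_per_task + compute_time)


PO = 0.001  # assumed 1 ms of overhead per task, purely illustrative

for taskel_time in (0.001, 1.0, 3600.0):
    share = overhead_share(PO, taskel_time)
    print(f"{taskel_time:>8} s per taskel -> {share:.6%} overhead")
```

In this model, a 1 ms taskel spends half of its task time on overhead, while for a taskel running an hour the overhead share is far below one millionth.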
3. Parallelization Scenarios
What factors determine an optimal chunksize argument to methods like multiprocessing.Pool.map()?
The major factor in question is how much computation time may vary across our single taskels. More precisely, the choice for an optimal chunksize is determined by the Coefficient of Variation (CV) for computation times per taskel.
The two extreme scenarios on a scale, following from the extent of this variation, are:
- All taskels need exactly the same computation time.
- A taskel could take seconds or days to finish.
For better memorability, I will refer to these scenarios as:
- Dense Scenario
- Wide Scenario
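As a minimal sketch, the CV of measured taskel durations could be computed as below. The function name and both sample lists are assumptions for illustration: one list with identical durations and one that mixes one-minute and one-day taskels.

```python
import statistics


def coefficient_of_variation(durations):
    """Population standard deviation divided by the mean."""
    return statistics.pstdev(durations) / statistics.mean(durations)


dense = [60, 60, 60, 60, 60, 60, 60, 60]          # identical taskel times
wide = [60, 60, 86400, 60, 86400, 60, 60, 86400]  # one minute or one day

print(coefficient_of_variation(dense))  # 0.0 for identical durations
print(coefficient_of_variation(wide))
```

A CV of zero sits at the Dense end of the scale; the larger the CV, the closer a problem is to the Wide Scenario.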
Dense Scenario
In a Dense Scenario it would be desirable to distribute all taskels at once, to keep necessary IPC and context switching at a minimum. This means we want to create only as many chunks as there are worker processes. As already stated above, the weight of PO increases with shorter computation times per taskel.
For maximal throughput, we also want all worker processes busy until all tasks are processed (no idling workers). For this goal, the distributed chunks should be of equal size or close to it.
Wide Scenario
The prime example for a Wide Scenario would be an optimization problem, where results either converge quickly or computation can take hours, if not days. Usually it is not predictable what mixture of "light taskels" and "heavy taskels" a task will contain in such a case, hence it's not advisable to distribute too many taskels in a task-batch at once. Distributing fewer taskels at once than possible means increasing scheduling flexibility. This is needed here to reach our sub-goal of high utilization of all cores.
If Pool methods were, by default, totally optimized for the Dense Scenario, they would increasingly create suboptimal timings for every problem located closer to the Wide Scenario.
4. Risks of Chunksize > 1
Consider this simplified pseudo-code example of a Wide Scenario-iterable, which we want to pass into a pool-method:
good_luck_iterable = [60, 60, 86400, 60, 86400, 60, 60, 86400]
Instead of the actual values, we pretend to see the needed computation time in seconds, for simplicity only 1 minute or 1 day.
We assume the pool has four worker processes (on four cores) and chunksize is set to 2. Because the order will be kept, the chunks sent to the workers will be these:
[(60, 60), (86400, 60), (86400, 60), (60, 86400)]
Since we have enough workers and the computation time is high enough, we can say that every worker process will get a chunk to work on in the first place. (This does not have to be the case for fast-completing tasks.) Further, we can say the whole processing will take about 86400+60 seconds, because that's the highest total computation time for a chunk in this artificial scenario and we distribute chunks only once.
Now consider this iterable, which has only one element switching its position compared to the previous iterable:
bad_luck_iterable = [60, 60, 86400, 86400, 60, 60, 60, 86400]
...and the corresponding chunks:
[(60, 60), (86400, 86400), (60, 60), (60, 86400)]
Just bad luck with the sorting of our iterable nearly doubled (86400+86400) our total processing time! The worker getting the vicious (86400, 86400)-chunk is blocking the second heavy taskel in its task from getting distributed to one of the idling workers already finished with their (60, 60)-chunks. We obviously would not risk such an unpleasant outcome if we set chunksize=1.
This is the risk of bigger chunksizes. With higher chunksizes we trade scheduling flexibility for less overhead and in cases like above, that's a bad deal.
As we will see in chapter 6. Quantifying Algorithm Efficiency, bigger chunksizes can also lead to suboptimal results for Dense Scenarios.
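The good-luck/bad-luck comparison from this chapter can be replayed with a small simulation. This is a toy model under stated assumptions, not Pool's actual scheduler: the hypothetical makespan function hands chunks, in order, to whichever worker becomes idle first, ignores all overhead, and treats a chunk's cost as the sum of its taskel times (one minute or one day).

```python
import heapq

MINUTE, DAY = 60, 86400


def makespan(durations, n_workers, chunksize):
    """Total time when chunks go, in order, to whichever worker is free first.

    Toy model: overhead is ignored and a chunk costs the sum of its taskels.
    """
    chunks = [durations[i:i + chunksize]
              for i in range(0, len(durations), chunksize)]
    free_at = [0] * n_workers           # time at which each worker becomes idle
    heapq.heapify(free_at)
    for chunk in chunks:
        start = heapq.heappop(free_at)  # earliest idle worker takes the chunk
        heapq.heappush(free_at, start + sum(chunk))
    return max(free_at)


good_luck = [MINUTE, MINUTE, DAY, MINUTE, DAY, MINUTE, MINUTE, DAY]
bad_luck = [MINUTE, MINUTE, DAY, DAY, MINUTE, MINUTE, MINUTE, DAY]

print(makespan(good_luck, n_workers=4, chunksize=2))
print(makespan(bad_luck, n_workers=4, chunksize=2))
print(makespan(bad_luck, n_workers=4, chunksize=1))
```

In this model the bad-luck ordering with chunksize=2 takes two full days, while the same ordering with chunksize=1 finishes two minutes after the first day.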
5. Pool's Chunksize-Algorithm
Below you will find a slightly modified version of the algorithm inside the source code. As you can see, I cut off the lower part and wrapped it into a function for calculating the chunksize argument externally. I also replaced 4 with a factor parameter and outsourced the len() calls.
# mp_utils.py
def calc_chunksize(n_workers, len_iterable, factor=4):
    """Calculate chunksize argument for Pool-methods.

    Resembles source-code within `multiprocessing.pool.Pool._map_async`.
    """
    chunksize, extra = divmod(len_iterable, n_workers * factor)
    if extra:
        chunksize += 1
    return chunksize
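A short usage sketch shows the effect of the factor. The helper n_chunks and the example numbers (4 workers, 500 elements) are assumptions for illustration; factor=1 stands in for the naive approach of one chunk per worker.

```python
import math


def calc_chunksize(n_workers, len_iterable, factor=4):
    """Same calculation as above, repeated so the snippet runs on its own."""
    chunksize, extra = divmod(len_iterable, n_workers * factor)
    if extra:
        chunksize += 1
    return chunksize


def n_chunks(len_iterable, chunksize):
    """Number of chunks an iterable of this length is split into."""
    return math.ceil(len_iterable / chunksize)


for factor in (1, 4):
    cs = calc_chunksize(n_workers=4, len_iterable=500, factor=factor)
    print(f"factor={factor}: chunksize={cs}, chunks={n_chunks(500, cs)}")
```

For these numbers, factor=1 gives 4 chunks of 125 taskels, while factor=4 gives a chunksize of 32 and therefore 16 chunks, the last one being smaller than the rest.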