As you start to do more computationally heavy tasks, you will find that calculations can take a very long time. Often, this can invole applying some function to every object in a list or other container. If these operations are all independent of each other (for example running a simulation with different sets of initial conditions) you can parallelise these tasks to speed things up, performing multiple computations simultaneously on different processors/cores on your device.

One way you can run parallel computations in Python is using the multiprocessing module (this should come bundled in with your Python installation).

In this tutorial I don’t want to dive deep into the details of parallel processing - all you need to know is that if I have 8 cores, then if all goes well I can speed up my calculations by a factor of 8x. Instead, I want to give practical implementation details for realistic scenarios - something I have found is lacking in many online multiprocessing tutorials.

For reference, I’m using Python 3.11.4.

Canonical example

Before we dive into the fun stuff, let’s do a very simple example to see how things work. We will apply a function compute_stuff() to every number in a list - this is to simulate some heavy computation. We will parallelise the computation, performing n_processes calculations simultaneously, where n_processes is the number of cores my PC has (using mp.Pool with any more processes won’t speed up your computation).

The if __name__ == "__main__" line controls the entry point of the script, and is only run when you run call the script from your main process, not from any of the subprocesses. For more details see the ‘Safe importing of main module’ note here.

import multiprocessing as mp # to do the heavy lifting
import time

def compute_stuff(x):
    # to simulate a long computation we will just wait for a second
    time.sleep(1)
    return x

if __name__ == "__main__":
    # Find out how many CPUs I have on my computer
    n_processes = mp.cpu_count()

    with mp.Pool(n_processes) as pool:
        # Iteratively apply our compute_stuff() function to 
        # each number 1,...,10, and save the results.
        results = pool.map(compute_stuff, range(10))

    print(results)

We’ll now save this script as multiprocessing_test.py. To run it we go to our shell (I’m using Powershell on Windows but any should work) and run the script. Here is the first part where you might trip up - you can’t use multiprocessing in a Jupyter notebook or in the Python REPL shell, you need to run your file as a script.

> python multiprocessing_test.py
[0,1,2,3,4,5,6,7,8,9]

Note: the results of pool.map are returned in the correct order but computations are not necessarily done in that order. See appendix.

By the way, we can check that parallelisation is in fact speeding things up. With my 8-core laptop it ran computations about 6.1x faster - see details here.

The real deal

Now suppose you want to do something more hefty, and you want to actually use the output. Good luck finding examples online! My situation was that I had a function tgt_and_dim_estimate_1point that would compute some quantity for a point in a point cloud. The point cloud was represented as a numpy array with a row for each data point in \(D\) dimensions.

If you want me to shut up and get to the point, skip to here. Otherwise, I’ll talk you through my thought process: my function took multiple parameters but for my purposes these parameters would stay the same every run and only the index of the data point would change. Easy enough - just use a lambda function (or so I thought):

import multiprocessing as mp
from estimators import tgt_and_dim_estimate_1point()
import numpy as np

if __name__ == "__main__":
    X = np.load("X_small.npy")
    m = len(X) # number of data points
    output_filename = "estims_small.npy"
    
    n_processes = mp.cpu_count()
    with mp.Pool(n_processes) as p:
        # Create a lambda function based off of my estimator that only takes i as an argument
        dim_estimate_1point = lambda i: tgt_and_dim_estimate_1point(i, X=X, r=1, eta=1, dim_only=True)
        dim_estims = p.map(dim_estimate_1point, range(m))

    # The object returned by pool map isn't actually a list so we need to convert it first
    np.save(output_filename, np.array(list(dim_estims)))
    print("Dimension estimates computed and saved to", output_filename)

Now I run the script:

> python run_estimation.py
Traceback (most recent call last):
  File "C:\...\run_estimation.py", line 14, in <module>
    dim_estims = p.map(dim_estimate_1point, range(m))
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
...
_pickle.PicklingError: Can't pickle <function <lambda> at 0x000001E7F497AA20>: attribute lookup <lambda> on __main__ failed

Disaster! Why is multiprocessing complaining about pickles? Has it gone mad?? Looking up a solution says that I should define my function at the top level of my script. I can do this, but now even if I make X global, it still doesn’t seem to get passed to the subprocesses:

def dim_estimate_1point(i):
    global X
    return tgt_and_dim_estimate_1point(i, X=X, r=1, eta=1, dim_only=True)

if __name__ == "__main__":
    global X
    X = np.load("X_S3_small.npy")
    ...
    with mp.Pool(n_processes) as p:
        dim_estims = p.map(dim_estimate_1point, range(m))
    ...
> python run_estimation.py
Traceback (most recent call last):
  File "C:\...\run_estimation.py", line 18, in <module>
    dim_estims = p.map(dim_estimate_1point, range(m))
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\...\Python\Python311\Lib\multiprocessing\pool.py", line 367, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\...\Python\Python311\Lib\multiprocessing\pool.py", line 774, in get
    raise self._value
NameError: name 'X' is not defined

We’re cooking

Just when I thought all hope was lost, I found out about functools.partial, which creates something like my lambda function from earlier but makes it picklable:

import multiprocessing as mp
from estimators import tgt_and_dim_estimate_1point
import numpy as np
from functools import partial

if __name__ == "__main__":
    global X
    X = np.load("X_S3_small.npy")
    m = len(X) # number of data points
    output_filename = "estims_small.npy"
    
    n_processes = mp.cpu_count()
    with mp.Pool(n_processes) as p:
        # Create a partial function based off of my estimator that only takes i as an argument
        dim_estimate_1point = partial(tgt_and_dim_estimate_1point, X=X, r=1, eta=1, dim_only = True)
        dim_estims = p.map(dim_estimate_1point, range(m))

    # The object returned by pool map isn't actually a list so we need to convert it first
    np.save(output_filename, np.array(list(dim_estims)))
    print("Dimension estimates computed and saved to", output_filename)
> python run_estimation.py
Dimension estimates computed and saved to estims_small.npy

Looks like it worked! I can double check my output and indeed I get what I expected with these parameters:

>>> np.load("estims_small.npy")
array([1, 1, 1, ..., 1, 1])

My final victory lap involved adding some bells and whistles - allowing me to more easily run the script with different parameters, input matrices, etc. For more details on sys.argv see this short article.

...
import sys

if __name__ == "__main__":
    nargs = len(sys.argv)

    if nargs < 4:
        print("Usage: ", sys.argv[0], " <matrix_input_filename.npy> <r> <eta> <output_filename.npy>")
    else:
        global X, r, eta, output_filename

        matrix_filename = sys.argv[1]
        r = float(sys.argv[2])
        eta = float(sys.argv[3])
        output_filename = sys.argv[4]

        X = np.load(matrix_filename)
        m = len(X)

        ...

        np.save(output_filename, np.array(list(dim_estims)))
        print("Dimension estimates computed and saved to ", output_filename)

So now my previous run would look like this:

> python run_estimation_script.py X_S3_small.npy 1 1 estims_small.npy
Dimension estimates computed and saved to estims_small.npy

If you’re curious you can see the actual code I ran on GitHub.

Conclusion

Parallel processing in Python isn’t too tricky, but you’ll have to get used to calling Python files as scripts, thinking a bit more careful about how you input parameters and output results, and how you prepare your function to be passed to subprocesses. I hope this was helpful!

p.s. If anyone knows how to avoid the annoying highlighting e.g. in the console blocks where I have error messages, or getting the Python REPL prompt >>> to look normal then please let me know!

Appendix

Order of computation

Note that although Pool.map() returns the results in the same order as your iterable, the computations aren’t necessarily done in that order. This may be important if your function changes its input. Upping the ante and running our function over 100 numbers (results = pool.map(compute_stuff, range(100))) and putting a print(x) line in compute_stuff() gives the following output:

> python .\multiprocessing_test.py
0
4
8
12
...
1
5
9
...
98
99
[0,1,2,3,4,...,99]

The proof is in the pudding

Running things serially (compute_stuff() still just waits for a second):

if __name__ == "__main__":
    t0 = time.time()
    results = [compute_stuff(x) for x in range(100)]
    time_taken = time.time() - t0
    
    print(results)
    print(f"Time taken: {time_taken:.2f}")
> python multiprocessing_test.py
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, ..., 98, 99]
Time taken: 100.05

Now in parallel:

if __name__ == "__main__":
    n_processes = mp.cpu_count()

    t0 = time.time()
    with mp.Pool(n_processes) as pool:
        results = pool.map(compute_stuff, range(20))
    time_taken = time.time() - t0

    print(f"Using {n_processes=}")
    print(results)
    print(f"Time taken: {time_taken:.2f}")
> python multiprocessing_test.py
Using n_processes=8
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, ..., 98, 99]
Time taken: 16.31

In the end the speedup is about 6.1x - not quite 8x for any number of reasons (other processes running on the background of my computer, some time for multiprocessing to spin up, the solar wind, etc) but it’s pretty good.