Multiprocessing is possible in Python. Silas S. Brown shows us various ways.
It’s surprisingly easy to use more than one CPU core in Python. You can’t do it with straightforward threads: the C implementation of Python has a Global Interpreter Lock (GIL), which means only one thread can be performing active calculations at any one time, so threads in Python are generally useful only for waiting on I/O or handling GUIs and servers, not for processing in parallel across multiple CPU cores. (The Java implementation has no GIL and really can run on multiple cores in parallel, but I’m assuming you have an existing Python project and want to stick with the C implementation.) But there are now ways of multiprocessing in standard C Python, and they’re not too difficult, even to add in to legacy Python code.
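As a quick illustration of the difference (a sketch of my own, not from the article; it assumes Python 3, or Python 2.7 with the futures backport), the following runs the same CPU-bound loop through a thread pool and then a process pool. On a multi-core machine only the process version comes close to halving the wall-clock time, because the threads merely take turns holding the GIL:

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def busy(n):
    # deliberately CPU-bound: no I/O around which the GIL could be released
    total = 0
    for i in range(n):
        total += i * i
    return total

if __name__ == "__main__":
    for pool_class in (ThreadPoolExecutor, ProcessPoolExecutor):
        start = time.time()
        with pool_class(max_workers=2) as pool:
            list(pool.map(busy, [5 * 10**6] * 2))
        print("%s: %.2f seconds" % (pool_class.__name__,
                                    time.time() - start))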
Python 3.2 introduced the concurrent.futures module as standard [Python], and there’s a backport for Python 2.7 which can usually be installed on Unix or GNU/Linux via sudo pip install futures (in Debian or Ubuntu you might need sudo apt-get install python-pip first; on a Mac try sudo easy_install pip). One nice thing about this module is that it’s quite straightforward to roll your own ‘dummy version’ for when parallelism is not available: see Listing 1.
try:
    import concurrent.futures
    executor = \
        concurrent.futures.ProcessPoolExecutor()
except:
    class DummyExecutor:
        def submit(self, fn, *args, **kwargs):
            class Future:
                def result(self, *_):
                    return fn(*args, **kwargs)
            return Future()
        def map(self, func, *iterables, **kwargs):
            for n in map(func, *iterables):
                yield n
    executor = DummyExecutor()

Listing 1
This gives you an object called executor which supports submit(function, arguments), returning an object that will, when asked for its result() later, give you either the result of the calculation or the exception raised by it, as appropriate. (Java programmers should recognise these semantics.) The executor object also has a map(function, iterables) which works like the built-in map(). If you’re on a multi-core machine and the real concurrent.futures is available in your Python installation, then some of the work will be done asynchronously on other CPU cores in between the calls to submit() and result(), so you can parallelise programs simply by looking for situations where independent calculations can be started ahead of when their results are needed, or even just by parallelising a few calls to map(), as long as your map functions are not so trivial that the overhead of parallelising them would outweigh any benefit. But if your script is run on an older machine with no concurrent.futures available, it will fall back to the ‘dummy’ code, which simply runs the function sequentially when its result is called for. (And if that result turns out not to be required after all and is not asked for, then the function won’t run, so even without parallelism you can at least benefit from lazy evaluation. But this applies only if your algorithm involves speculative computations, i.e. ones you start before knowing whether you’ll really need them.)
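A minimal usage sketch of that executor object (my own example; slow_square is a made-up stand-in, defined at module level so that the real library can pickle it):

def slow_square(x):
    return x * x    # imagine something genuinely expensive here

# start two independent calculations ahead of needing their results
future_a = executor.submit(slow_square, 1234)
future_b = executor.submit(slow_square, 5678)
# ...other sequential work can happen here while other cores are busy...
print(future_a.result() + future_b.result())

# map() works like the built-in, but the calls may run on other cores
squares = list(executor.map(slow_square, range(10)))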
I like the idea of ‘if it’s there, use it; if not, do without’: it means users of my scripts don’t have to make sure concurrent.futures is available in their Python installation. If they don’t have whatever it takes to install it, they’ll simply get the sequential version of my script rather than an ImportError (ImportErrors in your scripts can be bad PR). Note I’m not specifically catching ImportError around the concurrent.futures import, because it’s also possible for this import to succeed but still fail to make ProcessPoolExecutor available. This can be seen by reading __init__.py in the source code of concurrent.futures: if ProcessPoolExecutor cannot be loaded, then the module will just give you ThreadPoolExecutor. But there’s no point using ThreadPoolExecutor for multiprocessing, because ThreadPoolExecutor is subject to the GIL, so we want to verify that ProcessPoolExecutor is available before going ahead.
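If you prefer to spell that check out rather than rely on Listing 1’s bare except, something like this (my own variant; the have_real_pool flag is not in Listing 1) makes it explicit that it is ProcessPoolExecutor, not merely the module, that is being tested for:

try:
    import concurrent.futures
    # the backport quietly omits ProcessPoolExecutor when it can't be
    # loaded, leaving only ThreadPoolExecutor, so test for it explicitly
    have_real_pool = hasattr(concurrent.futures, "ProcessPoolExecutor")
except ImportError:
    have_real_pool = False

Listing 1’s bare except additionally guards against the ProcessPoolExecutor() constructor itself failing, which this simpler check does not.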
The interface to the ‘dummy’ object is actually quite a far cry from that of the real thing. With the real concurrent.futures, you can’t pass lambda or locally-defined functions to submit() or map(), but the dummy object lets you get away with doing this. Also, the real concurrent.futures has extra functionality, such as add_done_callback and polling for completion status, and does not run a function twice if you call its result() twice. All of this can be worked around by writing a more complex dummy object, but if all you’re going to do anyway is call submit() and result() then there’s not a lot of point making the fallback that complicated: if a few lines of script are supposed to be a ‘poor man’s’ fallback for a large library, then we don’t want to make the substitute so big and complicated that we almost might as well bundle the library itself into our script. Just make sure to test your code at least once with the real concurrent.futures to make sure you haven’t accidentally tried to give it a lambda function or something (the dummy object won’t pick up on this error). You can of course insert print statements into the code to tell you which branch it’s using, to make sure you’re testing the right one; you may even want to leave something in there for the production version (i.e. ‘this script should run faster if you install futures’).
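For instance (a sketch of my own, assuming Listing 1 has already run and bound executor), that production-version hint could be produced like this:

import sys
if executor.__class__.__name__ != "ProcessPoolExecutor":
    # we ended up with the dummy fallback, so mention the faster option
    sys.stderr.write(
        "Note: this script should run faster if you install futures\n")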
Oversized data
From this point on, I’ll assume the real concurrent.futures is present on the system and you are doing real multiprocessing.
You don’t have to worry about causing too many context switches if too many tasks are launched at once, since ProcessPoolExecutor defaults to queuing up tasks when all CPU cores are already occupied with them. But you might sometimes be worried about what kind of data you are passing in to each task, since serialisation overheads can be a serious slow-down if that data is large.
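A rough way to get a feel for that overhead (my own suggestion) is simply to measure how many bytes an argument occupies once pickled, since that is roughly what has to cross the process boundary for every task:

import pickle
payload = list(range(1000000))   # stand-in for the data you were about to pass
print("%d bytes per task" % len(pickle.dumps(payload, -1)))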
If you’re on Unix, Python’s underlying ‘multiprocessing’ module will start the new processes with fork(), which means they each get a copy of the parent process’s memory (with copy-on-write semantics if supported by the kernel, so that no copying occurs until a memory page is actually changed). That means your functions can read module-level global variables that have been set up at runtime before the parallel work started (just don’t try to change these during the parallel work, unless you want to cope with such changes affecting some future calculations but not others depending on which CPU or process ID happens to run what). fork() does, however, mean you’d better be careful if you’re also using threads in the same program, such as for a GUI; there are ways of working around this, but I’d suggest concentrating on making a command-line tool and letting somebody else wrap it in a GUI in a different process if they must.
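A sketch of relying on those fork() semantics (my own illustration; the lookup table here is a trivial stand-in for a genuinely expensive setup): build the large read-only structure at module level before the parallel work starts, and let the worker read it as a global rather than receive it as a pickled argument.

import concurrent.futures

# built once in the parent; forked workers see it via copy-on-write
lookup_table = dict((i, i * i) for i in range(10**6))

def worker(key):
    return lookup_table[key]   # read-only access, no pickling of the table

if __name__ == "__main__":
    executor = concurrent.futures.ProcessPoolExecutor()
    print(list(executor.map(worker, [3, 14, 159])))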
But you can’t rely on having fork() if your script might be run on Windows, nor if you might eventually use multiple machines in a cluster via mpi4py.futures (more on this below), SCOOP [SCOOP], or a similar tool that gives you the same API as concurrent.futures. In these cases, it’s likely that your script will be separately imported on each core, so it had better not run unless __name__ == "__main__". You can set up a few module-level variables when that happens; the subprocesses should still have the same sys.argv and os.environ if that’s any help. However, you probably won’t want to repeat a long precalculation when doing this.
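A sketch of that structure (my own illustration; the names are made up): everything the workers need is defined at module level, and the work that only the master should do is kept behind the __name__ check.

import os, sys

def worker(item):
    # workers can also consult sys.argv and os.environ, where forwarded
    return (item, os.environ.get("MYSCRIPT_MODE", "default"))

def main():
    import concurrent.futures
    executor = concurrent.futures.ProcessPoolExecutor()
    for result in executor.map(worker, sys.argv[1:]):
        print(result)

if __name__ == "__main__":
    main()   # re-imports of this module on other cores skip this call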
Since most multiprocessing environments, even across multiple machines in a cluster, assume a shared filesystem, one fairly portable way of sharing such large precalculated data is to do it via the filesystem, as in Listing 2. To avoid the obvious race condition, this must be done before initialising the parallelism.
from cPickle import Pickler, Unpickler
if __name__ == "__main__":
    data = our_precalculation()
    Pickler(open('precalc','wb'),-1).dump(data)
else:
    try: data
    except NameError:
        data = Unpickler(open('precalc','rb')).load()

Listing 2
Listing 2 can detect the case where fork() has been used and the data does not need to be read back from the filesystem, although without further low-level inspection it won’t be able to detect when it can avoid writing the data to the filesystem at all (but that might not be an issue if you want to write it anyway). There are other ways of passing data to non-fork()ed subprocesses without using the filesystem, but they involve going to a lower level than concurrent.futures (you can’t get away with simply passing the data into a special ‘initialiser’ function to be run on each core, since the concurrent.futures API by itself offers no guarantee that all cores in use will be reached with it).
MPI
Message Passing Interface (MPI) is a standard traditionally used on high-performance computing (HPC) clusters, and you can access it from Python using a number of libraries for interacting with one of the underlying C implementations of MPI (typically MPICH or OpenMPI). Now that we have concurrent.futures, it’s a good idea to look for libraries supporting that API so we won’t have to write anything MPI-specific (if it’s there, we can use it; if not, we can use something else). mpi4py [MPI] plans to add an mpi4py.futures module in its version 2.1, but, at the time this article was written, version 2.1 was not yet a stable release (and standard pip commands were fetching version 2.0), so if you want to experiment with mpi4py.futures, you’ll have to download the in-development version of mpi4py.
On a typical GNU/Linux box, you can do this as follows: become root (sudo su), make the mpicc command available (on RedHat-based systems that requires typing something like module add mpi/mpich-x86_64 after installing MPICH, or the equivalent after installing OpenMPI; Debian/Ubuntu systems make it available by default when one of these packages is installed), make sure the python-dev or python-devel package is installed (apt-get install python-dev or yum install python-devel), and then try:
pip install https://bitbucket.org/mpi4py/mpi4py/get/master.tar.gz
At this point Listing 1 can be changed (after adding extra indentation to each line) by putting Listing 3 before the beginning. Here, we check if we are being run under MPI, and, if so, we use it; otherwise we drop back to the previous Listing 1 behaviour (use concurrent.futures if available, otherwise our ‘dummy’ object). A subtlety is that mpi4py.futures will work only if it is run with a command like this:
mpiexec -n 4 python -m mpi4py.futures script.py args...
and that in an MPI environment too (i.e. the above module add command will need to have been run in the same shell, if appropriate). Some versions of mpiexec also have options for forwarding standard input and environment variables to processes, but not all do, so you’ll probably have to arrange for the script to run without these. Also, any script that uses sys.stdout.isatty() to determine whether or not output is being redirected will need to be updated for running under MPI, because MPI always redirects the output from the program’s point of view, even when it’s still being sent to the terminal.
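One possible workaround (my own suggestion, not from the article) is to let a command-line flag override the check when you know the output really is going to a terminal:

import sys

FORCE_TTY = "--assume-tty" in sys.argv   # pass this when launching via mpiexec

def output_is_terminal():
    return FORCE_TTY or sys.stdout.isatty()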
If you want MPI to use other machines in a cluster, then how to do this depends on your MPI version: it may involve extra setup steps before starting your program, as is the case with mpd in older versions of MPICH2 such as version 1.2. But in MPICH2 version 1.5 (the current mpich2 package in Debian Jessie), and in MPICH 3.1 (Jessie’s current mpich package), the default process manager is hydra, and you simply create a text file listing the host names (or IP addresses) of the cluster machines, ensure they can all ssh into each other without a password and share the filesystem, and pass this text file to mpiexec using the -f parameter or the HYDRA_HOST_FILE environment variable. (In OpenMPI you use the --hostfile parameter.) Modern MPI implementations are also able to checkpoint and restart processes in the event of failure of one or more machines in the cluster; refer to each implementation’s documentation for how to set this up.
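For example (the file name and host names here are my own illustration), a Hydra host file can be as simple as one machine name per line, say node1 through node4 in a file called hosts.txt, and the job is then started with something like:

mpiexec -f hosts.txt -n 16 python -m mpi4py.futures script.py args...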
If our script is run outside of MPI, then our detecting and handling of ‘no MPI’ is a little subtle, because mpi4py.futures (if installed) will still successfully import, and it will even let you instantiate an MPIPoolExecutor(), but it will then likely crash after you submit a job, and catching that crash from your Python script is very awkward (normal try/except won’t cut it). So we need to look at the command line to check we’re being run in the right way for MPI first. But we can’t just inspect sys.argv, because that will have been rewritten before control is passed to our script, so we have to get the original command line from the ps command. The ps parameters in Listing 3 were tested on both GNU/Linux and Mac OS X; if any system does not support them then we should just fall back to the safety of not using MPI.
try:
    import os, commands
    commands.getoutput(
        "ps -p " + str(os.getpid()) + " -o args") \
        .index("-m mpi4py.futures") # ValueError
                                    # if not found
    import mpi4py.futures
    executor = mpi4py.futures.MPIPoolExecutor()
except: # etc (as Listing 1, extra indent)

Listing 3
A pattern for moving long-running functions to other CPUs
If you have a function that normally runs quite quickly but can take a long time on certain inputs, it might not pay to have every call run on a different CPU, since in fast cases the overheads of doing so would outweigh the savings. But it might be useful if the program could determine for itself whether or not to run this particular call on a different CPU.
Since, in Python, any function can be turned into a generator by replacing return x with yield x ; return (giving a generator that yields a single item), the pattern shown in Listing 4 seems natural as a way to refactor existing sequential code into multiprocessing. The part marked ‘first part of function goes here’ will be repeated on the other CPU, which seems wasteful but could be faster than passing variables across if they are large; it is assumed that this part of the function does what is necessary for us to be able to figure out whether the function is likely to take a long time, e.g. if the first part of the function shows that we are now generating a LOT of intermediate data (which is why we probably don’t want to pass it all across once we’ve decided we’re better off running in the background). The my_function_wrapped part is necessary because submit() takes only functions, not generators.
def my_function(param, can_background = True):
    # first part of function goes here
    if (can_background and likely_to_take_a_long_time()):
        job = executor.submit(my_function_wrapped, param)
        yield "backgrounded"
        yield job.result() ; return
    # rest of function goes here
    # change all 'return x' to 'yield x ; return'

def my_function_wrapped(param):
    return my_function(param, False).next()

def caller():
    gen = my_function(param)
    result = gen.next()
    if result == "backgrounded":
        # Do something else for a while...
        result = gen.next() # get actual result

Listing 4
I’m not suggesting writing new programs like Listing 4, but it might be a useful pattern for refactoring legacy sequential code.
Avoiding CPU overload
The above pattern for moving long-running functions to other CPUs should work as-is on MPI, but with concurrent.futures it will result in one too many processes, because ProcessPoolExecutor defaults to running as many parallel processes as there are CPU cores, on the assumption that the control program won’t need much CPU itself, an assumption that is likely to break down when using this pattern. The Linux and BSD kernels are of course perfectly capable of multiplexing a load that’s greater than the number of available CPU cores, but it might be more efficient to reduce the number of ‘slave’ processes by 1 to allow the master to have a CPU to itself. This can be accomplished using code like that in Listing 5.
import multiprocessing
num_cpus = multiprocessing.cpu_count()
if num_cpus < 2:
    raise Exception("Not enough CPUs")
from concurrent.futures import \
    ProcessPoolExecutor
executor = ProcessPoolExecutor(num_cpus - 1)

Listing 5
Evaluation
The above methods were used to partially parallelise Annotator Generator [Brown12], resulting in a 15% overall speed increase when using concurrent.futures as compared to the unmodified code. This could almost certainly be improved with more parallelisation (recall Amdahl’s Law: the speedup is limited by the fraction of the program that must be sequential). Only a fraction of a percent was saved by subtracting 1 from the number of CPUs to achieve a more even load.
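To put illustrative numbers on that (my own figures, not measurements from Annotator Generator): if a fraction p of the run time is parallelised over n cores, Amdahl’s Law gives a speedup of 1 / ((1 - p) + p/n), so even parallelising half the program (p = 0.5) across 4 cores yields only 1 / (0.5 + 0.125) = 1.6; ignoring overheads, the 15% figure is consistent with somewhat less than a fifth of the run time having been parallelised so far.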
Results using MPI were not so satisfactory. When running with 4 processes on a single quad-core machine using MPI, the program was actually slowed down by 8% compared with running single-core, which in turn was 6% slower than the unmodified code. I believe that 6% represents the overhead of converting functions into generators, and it could be eliminated by duplicating and modifying the code for the single-core case, but that would introduce a maintenance issue unless it could somehow be automated. Given Annotator Generator’s desktop usage scenario, the prevalence of multi-core CPUs on desktops, and the speedup using concurrent.futures, it doesn’t seem very high-priority to invest code complexity in saving that 6% in the single-core case. MPI’s poor performance is more worrisome, but I later discovered it was due to the system running low on RAM (and therefore being slowed down by more page faults) while running four separate MPI processes: concurrent.futures was able to share the data structures, but MPI wasn’t (even though it could use shared memory for some message passing). Once I reduced the size of the input, MPI was 14% faster than the single-core case and concurrent.futures was 18% faster than the single-core case. Perhaps MPI would perform better on a real cluster, which I have not yet had an opportunity to test. A cluster of virtual machines with OpenMPI ran 5% faster than the single-core case, but because these machines were virtual and all running on the same actual machine, I do not believe that result to be meaningful, other than as a demonstration that the underlying protocols were working. Still, I suspect a greater degree of parallelisation is required to outweigh the overheads of MPI beyond those of concurrent.futures. But since MPI can now be driven through the same API as concurrent.futures (not to mention SCOOP), it is now possible to write for a single concurrency API and experiment to see which framework gives the best speed improvement to your particular application.
References
[Brown12] Silas S. Brown. Web Annotation with Modified-Yarowsky and Other Algorithms. Overload issue 112 (December 2012) page 4. The modified code is now at http://people.ds.cam.ac.uk/ssb22/adjuster/annogen.html
[MPI] MPI for Python http://mpi4py.scipy.org/
[Python] Python library documentation https://docs.python.org/3/library/concurrent.futures.html
[SCOOP] SCOOP (Scalable COncurrent Operations in Python) http://scoop.readthedocs.io/