Joblib provides a simple helper class to write parallel for loops using multiprocessing. The core idea is to write the code to be executed as a generator expression and to convert it to parallel computing. For instance, the following sequential computation:
>>> from math import sqrt
>>> [sqrt(i ** 2) for i in range(10)]
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
can be spread over 2 CPUs using the following:
>>> from math import sqrt
>>> from joblib import Parallel, delayed
>>> Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in range(10))
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
Under the hood, the Parallel object creates a multiprocessing pool that forks the Python interpreter into multiple processes to execute each of the items of the list. The delayed function is a simple trick to be able to create a tuple (function, args, kwargs) with a function-call syntax.
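To make the trick concrete, here is a sketch of what a delayed call produces (illustrative; the internals may vary across joblib versions):

>>> from math import sqrt
>>> from joblib import delayed
>>> task = delayed(sqrt)(16)         # nothing is computed yet
>>> function, args, kwargs = task    # a plain (function, args, kwargs) tuple
>>> function(*args, **kwargs)        # this is the call a worker performs
4.0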
Warning
Under Windows, it is important to protect the main loop of code to avoid recursive spawning of subprocesses when using joblib.Parallel. In other words, you should be writing code like this:
import ....

def function1(...):
    ...

def function2(...):
    ...

...
if __name__ == '__main__':
    # do stuff with imports and functions defined above
    ...
No code should run outside of the "if __name__ == '__main__'" block: only imports and definitions should appear at module level.
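For example, a minimal Windows-safe script following this pattern could look as follows (the function name and layout are illustrative, not mandated by joblib):

from math import sqrt
from joblib import Parallel, delayed

def compute(i):
    # top-level function: picklable, so it can be dispatched to workers
    return sqrt(i ** 2)

if __name__ == '__main__':
    # the parallel call runs only in the main process,
    # never in the subprocesses spawned by joblib
    print(Parallel(n_jobs=2)(delayed(compute)(i) for i in range(10)))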
By default Parallel uses the Python multiprocessing module to fork separate Python worker processes to execute tasks concurrently on separate CPUs. This is a reasonable default for generic Python programs but it induces some overhead as the input and output data need to be serialized in a queue for communication with the worker processes.
If you know that the function you are calling is based on a compiled extension that releases the Python Global Interpreter Lock (GIL) during most of its computation, then it may be more efficient to use threads rather than Python processes as concurrent workers. This is the case, for instance, if you write the CPU-intensive part of your code inside a "with nogil" block of a Cython function.
To use threads, pass "threading" as the value of the backend parameter of the Parallel constructor:
>>> Parallel(n_jobs=2, backend="threading")(
... delayed(sqrt)(i ** 2) for i in range(10))
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
Some algorithms require making several consecutive calls to a parallel function, interleaved with processing of the intermediate results. Calling Parallel several times in a loop is sub-optimal because it will create and destroy a pool of workers (threads or processes) each time, which can cause significant overhead.
For this case it is more efficient to use the context manager API of the Parallel class to re-use the same pool of workers for several calls to the Parallel object:
>>> with Parallel(n_jobs=2) as parallel:
... accumulator = 0.
... n_iter = 0
... while accumulator < 1000:
... results = parallel(delayed(sqrt)(accumulator + i ** 2)
... for i in range(5))
... accumulator += sum(results) # synchronization barrier
... n_iter += 1
...
>>> (accumulator, n_iter)
(1136.596..., 14)
Prior to Python 3.4, the 'multiprocessing' backend of joblib could only use the fork strategy to create worker processes under non-Windows systems. This can cause some third-party libraries to crash or freeze. Such libraries include Apple vecLib / Accelerate (used by NumPy under OSX), some old versions of OpenBLAS (prior to 0.2.10) and the OpenMP runtime implementation from GCC.
To avoid this problem, joblib.Parallel can be configured to use the 'forkserver' start method on Python 3.4 and later. The start method is configured by setting the JOBLIB_START_METHOD environment variable to 'forkserver' instead of the default 'fork'. However, users should be aware that using 'forkserver' prevents joblib.Parallel from calling functions interactively defined in a shell session.
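As a sketch, the start method can be selected from within a script by setting the environment variable before joblib is imported (this assumes the variable is consulted at import time, as in recent joblib versions; exporting JOBLIB_START_METHOD=forkserver in the shell before launching Python achieves the same):

import os
os.environ['JOBLIB_START_METHOD'] = 'forkserver'  # must be set before importing joblib

from math import sqrt
from joblib import Parallel, delayed

if __name__ == '__main__':
    print(Parallel(n_jobs=2)(delayed(sqrt)(i ** 2) for i in range(10)))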
You can read more on this topic in the multiprocessing documentation.
Under Windows the fork system call does not exist at all, so this problem does not arise there (but multiprocessing has more overhead).
joblib.Parallel: Helper class for readable parallel mapping.
Parameters:

n_jobs: int, default: 1
    The maximum number of concurrently running jobs (the number of
    worker processes, or the size of the thread pool when
    backend="threading"). If -1 is given, all CPUs are used; n_jobs=1
    disables parallel computing entirely, which is useful for debugging.
backend: str or None, default: 'multiprocessing'
    The parallelization backend implementation: "multiprocessing" uses
    worker processes, "threading" uses threads (see above).
verbose: int, optional
    The verbosity level: if non-zero, progress messages are printed.
    The frequency of the messages increases with the verbosity level.
pre_dispatch: {'all', integer, or expression, as in '3*n_jobs'}
    The number of tasks to pre-dispatch to the workers. Bounding
    pre-dispatch is useful when the input data is generated lazily, to
    avoid exhausting memory (see the producer/consumer example below).
batch_size: int or 'auto', default: 'auto'
    The number of atomic tasks dispatched at once to each worker. When
    individual tasks are very fast, batching them amortizes the
    dispatching overhead; 'auto' adjusts the batch size dynamically.
temp_folder: str, optional
    Folder used by the pool to memmap large arrays shared with the
    worker processes.
max_nbytes: int, str, or None, optional, 1M by default
    Threshold on the size of arrays passed to the workers that triggers
    automated memmapping in temp_folder. Can be an int in bytes or a
    human-readable string such as '1M'. Use None to disable memmapping
    of large arrays.
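As an illustration of the memmapping parameters, the following sketch dispatches slices of a large numpy array to worker processes; arrays bigger than max_nbytes are dumped to temp_folder and memmapped rather than pickled (the threshold chosen here is arbitrary):

>>> import numpy as np
>>> from joblib import Parallel, delayed
>>> data = np.arange(1e6)   # ~8 MB of float64, above the '1M' threshold
>>> sums = Parallel(n_jobs=2, max_nbytes='1M')(
...     delayed(np.sum)(data[i::4]) for i in range(4))
>>> float(sum(sums)) == float(np.sum(data))
True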
Notes
This object uses the multiprocessing module to compute in parallel the application of a function to many different arguments. The main features it brings, in addition to using the raw multiprocessing API, are (see the examples for details):
- More readable code, in particular since it avoids constructing lists of arguments.
- Easier debugging:
  - informative tracebacks even when the error happens on the client side
  - using 'n_jobs=1' makes it possible to turn off parallel computing for debugging without changing the codepath
  - early capture of pickling errors (see the sketch after this list)
- An optional progress meter.
- Interruption of multiprocessing jobs with 'Ctrl-C'.
- Flexible pickling control for the communication to and from the worker processes.
- Ability to use shared memory efficiently with worker processes for large numpy-based datastructures.
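As an illustration of the early capture of pickling errors, delayed tries to pickle the function up front, so an unpicklable callable such as a lambda fails immediately rather than deep inside a worker (the exact exception and message depend on the Python and joblib versions):

>>> from joblib import delayed
>>> delayed(lambda x: x ** 2)                    # doctest: +SKIP
Traceback (most recent call last):
    ...
PicklingError: Can't pickle <function <lambda> at ...>: it's not found as __main__.<lambda>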
Examples
A simple example:
>>> from math import sqrt
>>> from joblib import Parallel, delayed
>>> Parallel(n_jobs=1)(delayed(sqrt)(i**2) for i in range(10))
[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]
Reshaping the output when the function has several return values:
>>> from math import modf
>>> from joblib import Parallel, delayed
>>> r = Parallel(n_jobs=1)(delayed(modf)(i/2.) for i in range(10))
>>> res, i = zip(*r)
>>> res
(0.0, 0.5, 0.0, 0.5, 0.0, 0.5, 0.0, 0.5, 0.0, 0.5)
>>> i
(0.0, 0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0, 4.0, 4.0)
The progress meter: the higher the value of verbose, the more messages:
>>> from time import sleep
>>> from joblib import Parallel, delayed
>>> r = Parallel(n_jobs=2, verbose=5)(delayed(sleep)(.1) for _ in range(10))
[Parallel(n_jobs=2)]: Done 1 out of 10 | elapsed: 0.1s remaining: 0.9s
[Parallel(n_jobs=2)]: Done 3 out of 10 | elapsed: 0.2s remaining: 0.5s
[Parallel(n_jobs=2)]: Done 6 out of 10 | elapsed: 0.3s remaining: 0.2s
[Parallel(n_jobs=2)]: Done 9 out of 10 | elapsed: 0.5s remaining: 0.1s
[Parallel(n_jobs=2)]: Done 10 out of 10 | elapsed: 0.5s finished
Traceback example: note how the line of the error is indicated, as well as the values of the parameters passed to the function that triggered the exception, even though the traceback happens in the child process:
>>> from heapq import nlargest
>>> from joblib import Parallel, delayed
>>> Parallel(n_jobs=2)(delayed(nlargest)(2, n) for n in (range(4), 'abcde', 3))
#...
---------------------------------------------------------------------------
Sub-process traceback:
---------------------------------------------------------------------------
TypeError Mon Nov 12 11:37:46 2012
PID: 12934 Python 2.7.3: /usr/bin/python
...........................................................................
/usr/lib/python2.7/heapq.pyc in nlargest(n=2, iterable=3, key=None)
419 if n >= size:
420 return sorted(iterable, key=key, reverse=True)[:n]
421
422 # When key is none, use simpler decoration
423 if key is None:
--> 424 it = izip(iterable, count(0,-1)) # decorate
425 result = _nlargest(n, it)
426 return map(itemgetter(0), result) # undecorate
427
428 # General case, slowest method
TypeError: izip argument #1 must support iteration
___________________________________________________________________________
Using pre_dispatch in a producer/consumer situation, where the data is generated on the fly. Note how the producer is first called 3 times before the parallel loop is initiated, and then called to generate new data on the fly. In this case the total number of iterations cannot be reported in the progress messages:
>>> from math import sqrt
>>> from joblib import Parallel, delayed
>>> def producer():
... for i in range(6):
... print('Produced %s' % i)
... yield i
>>> out = Parallel(n_jobs=2, verbose=100, pre_dispatch='1.5*n_jobs')(
... delayed(sqrt)(i) for i in producer())
Produced 0
Produced 1
Produced 2
[Parallel(n_jobs=2)]: Done 1 jobs | elapsed: 0.0s
Produced 3
[Parallel(n_jobs=2)]: Done 2 jobs | elapsed: 0.0s
Produced 4
[Parallel(n_jobs=2)]: Done 3 jobs | elapsed: 0.0s
Produced 5
[Parallel(n_jobs=2)]: Done 4 jobs | elapsed: 0.0s
[Parallel(n_jobs=2)]: Done 5 out of 6 | elapsed: 0.0s remaining: 0.0s
[Parallel(n_jobs=2)]: Done 6 out of 6 | elapsed: 0.0s finished