processes – Single/Multiprocessing abstraction

Multi/single processing abstraction.

Execute functions while hiding whether they are executed in parallel, using multiprocessing.pool.Pool, or serially.

The module provides a high level factory function, get_worker(), to create and retrieve worker instances. The first call with a given name creates the worker; subsequent calls with the same name return the same instance, and all arguments besides the name are ignored.

If you need or want to reuse a name after closing the worker, e.g. when using the worker in a with statement, you can remove it with remove_worker().

Examples

>>> import pyhetdex.tools.processes as p
>>> def func(args):
...     print(", ".join(args))
...     return " ".join(args)

Run single processor jobs

>>> worker_sp = p.get_worker(name="singlep")
>>> job = worker_sp(func, ["from", "single", "processor"])
from, single, processor
>>> job
<pyhetdex.tools.processes.Result object at ...>
>>> job.get()
'from single processor'
>>> worker_sp.get_results()
['from single processor']
>>> worker_sp.close()

Or multiprocessor jobs

>>> worker_mp = p.get_worker(name="multip", multiprocessing=True)
>>> job = worker_mp(func, ["from", "multi", "processor"])
>>> job
<multiprocessing.pool.ApplyResult object at ...>
>>> job.get()
'from multi processor'
>>> worker_mp.get_results()  
['from multi processor']
>>> worker_mp.close()

Run some initialisation function when creating the multiprocessing pool

>>> def init_func(message):
...     print(message)
>>> worker_mp_init = p.get_worker(name="multip_init", multiprocessing=True,
...                               initializer=init_func,
...                               initargs=("docstring",))
>>> worker_mp_init.close()

Alternatively, you can use the worker within a with statement

>>> def func1(args):
...     return " ".join(args[::-1])
>>> with p.get_worker(name="inwith", multiprocessing=True) as wworker:
...     wworker(func1, ["in", "with", "statement"])
...     wworker.get_results()  
['in with statement']
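
To reuse a name, first remove the closed worker; a minimal sketch (the name "inwith" comes from the example above):

>>> p.remove_worker(name="inwith")
>>> worker_reused = p.get_worker(name="inwith")
>>> worker_reused.close()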

Public interface

pyhetdex.tools.processes.get_worker(name='default', multiprocessing=False, always_wait=False, poolclass=<bound method BaseContext.Pool of <multiprocessing.context.DefaultContext object>>, result_class=<class 'pyhetdex.tools.processes.Result'>, **kwargs)[source]

Returns a worker with the specified name.

At the first call with a given name, the worker is created using the remaining arguments. By default the worker runs in a single process. Subsequent calls with the same name always return the same worker instance, and the remaining options are ignored. This means that worker instances never need to be passed between different parts of the application.

Parameters:
name : string, optional

name to associate to the _Worker object. If it does not exist, a new object is created, stored and returned

multiprocessing : bool, optional

use multiprocessing

always_wait : bool, optional

if False, terminate the jobs when exiting the with statement upon an error; if True, wait for the running jobs to finish before closing

poolclass : class

class implementing the multiprocessing.Pool interface

result_class : class, optional

Result-like class to use for the single processor execution

kwargs : dictionary

options passed to multiprocessing.Pool; ignored if multiprocessing is False

Returns:
worker : _Worker instance
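
For example, a second call with the same name returns the cached instance and ignores the new options (a sketch; the name "cached" is illustrative):

>>> import pyhetdex.tools.processes as p
>>> w1 = p.get_worker(name="cached")
>>> w2 = p.get_worker(name="cached", multiprocessing=True)
>>> w1 is w2
True
>>> w1.close()
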
pyhetdex.tools.processes.remove_worker(name='default')[source]

Remove the worker with the given name.

Parameters:
name : string, optional

name of the worker to remove

Raises:
WorkerNameException

if the name does not exist
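
A sketch of the failure mode (the name "never_created" is illustrative):

>>> import pyhetdex.tools.processes as p
>>> try:
...     p.remove_worker(name="never_created")
... except p.WorkerNameException:
...     print("unknown name")
unknown name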

pyhetdex.tools.processes.ignore_keyboard_interrupt()[source]

Ignore the KeyboardInterrupt signal
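
A typical use, assumed here, is as the initializer of a multiprocessing worker, so that the child processes leave KeyboardInterrupt handling to the parent (the name "no_interrupt" is illustrative):

>>> import pyhetdex.tools.processes as p
>>> w = p.get_worker(name="no_interrupt", multiprocessing=True,
...                  initializer=p.ignore_keyboard_interrupt)
>>> w.close()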

Result class

class pyhetdex.tools.processes.Result(func, *args, **kwargs)[source]

Bases: object

Implements the same interface as multiprocessing.pool.AsyncResult and executes the function at instantiation time.

Used to abstract single/multi processor cases in _Worker and to postpone error handling.

Parameters:
func : callable

function to execute

args : list

positional arguments to pass to the function

kwargs : dict

keyword arguments to pass to the function

get(timeout=None)[source]

Return the result. If the call raised an exception then that exception will be reraised.

timeout is ignored.

ready()[source]
Returns:
bool

whether the call has completed: always True

successful()[source]
Returns:
bool

True if the call completed without raising an exception

wait(timeout=None)[source]

Do-nothing method. Provided for compatibility.
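
A short sketch of the interface; the function runs as soon as the instance is created, so ready() is always True:

>>> from pyhetdex.tools.processes import Result
>>> r = Result(divmod, 7, 2)  # divmod(7, 2) executes here
>>> r.ready()
True
>>> r.successful()
True
>>> r.get()
(3, 1)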

class pyhetdex.tools.processes.DeferredResult(func, *args, **kwargs)[source]

Bases: pyhetdex.tools.processes.Result

Reimplements Result, executing the function in the get() method.

Used to abstract single/multi processor cases in _Worker to postpone function execution.

Parameters:
func : callable

function to execute

args : list

positional arguments to pass to the function

kwargs : dict

keyword arguments to pass to the function

get(timeout=None)[source]

Execute the function and return its return value(s).

timeout is ignored.
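
In contrast with Result, the function does not run until get() is called; a minimal sketch:

>>> from pyhetdex.tools.processes import DeferredResult
>>> d = DeferredResult(divmod, 7, 2)  # nothing executed yet
>>> d.get()  # divmod(7, 2) runs now
(3, 1)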

Exceptions

exception pyhetdex.tools.processes.WorkerException[source]

Bases: Exception

Generic exception

exception pyhetdex.tools.processes.WorkerNameException[source]

Bases: KeyError, pyhetdex.tools.processes.WorkerException

The requested name does not exist

The worker

Although the worker should be instantiated and retrieved via get_worker(), its attributes are public.

class pyhetdex.tools.processes._Worker(multiprocessing=False, always_wait=False, poolclass=<bound method BaseContext.Pool of <multiprocessing.context.DefaultContext object>>, result_class=<class 'pyhetdex.tools.processes.Result'>, **kwargs)[source]

Class hiding the details of single or multiprocessor execution. The class declaration should be considered private; instances should be created and retrieved through the get_worker() factory function.

The instance can be used once in a with statement. Upon exiting, it closes/terminates and joins the pool, if multiprocessing is used.

Parameters:
multiprocessing : bool, optional

use multiprocessing

always_wait : bool, optional

if False, terminate the jobs when exiting the with statement upon an error; if True, wait for the running jobs to finish before closing

poolclass : class, optional

class implementing the multiprocessing.Pool interface

result_class : class, optional

Result-like class to use for the single processor execution

kwargs : dictionary

options passed to multiprocessing.Pool; ignored if multiprocessing is False

__call__(func, *args, **kwargs)[source]

Apply func on args and kwargs (asynchronously if multiprocessing is on).

Parameters:
func: callable

function to execute

args: list

positional arguments to pass to the function

kwargs: dict

keyword arguments to pass to the function

Returns:
job : AsyncResult or Result

stores the result of the computation; it can be recovered with the multiprocessing.pool.AsyncResult.get() or the Result.get() method, respectively. The jobs are also stored in an internal list and can be accessed with jobs and get_results().
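
Positional and keyword arguments are both forwarded to the function; a sketch with a single processor worker (the name "call_demo" is illustrative):

>>> import pyhetdex.tools.processes as p
>>> w = p.get_worker(name="call_demo")
>>> job = w(sorted, [3, 1, 2], reverse=True)
>>> job.get()
[3, 2, 1]
>>> w.close()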

__enter__()[source]

Entry point for the with statement

clear_jobs()[source]

Clear the list of jobs

close()[source]

Close and join the pool: normal termination

get_results(fail_safe=False)[source]

Wait for all the processes to finish and return the results.

Parameters:
fail_safe : bool

if any of the jobs raises an exception, capture it and add the exception instance to the output list in place of the result. By default the exception is re-raised.

Returns:
list

list of the actual results from the computations
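
A sketch of the fail safe mode, using an illustrative failing function and worker name:

>>> import pyhetdex.tools.processes as p
>>> def fail(arg):
...     raise ValueError(arg)
>>> w = p.get_worker(name="failsafe_demo")
>>> job = w(fail, "boom")
>>> results = w.get_results(fail_safe=True)
>>> isinstance(results[0], ValueError)
True
>>> w.close()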

jobs

list of AsyncResult or Result instances

jobs_stat()[source]

Return the number of completed jobs, of failed jobs and the total number of submitted jobs

Returns:
n_completed : int

number of completed jobs

n_error : int

number of jobs that raised an exception

n_tot : int

total number of submitted jobs
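
A sketch, assuming the three counters are returned as a tuple (the worker name and the failing function are illustrative):

>>> import pyhetdex.tools.processes as p
>>> def fail(arg):
...     raise ValueError(arg)
>>> w = p.get_worker(name="stats_demo")
>>> job1 = w(sum, [1, 2, 3])
>>> job2 = w(fail, "boom")
>>> n_completed, n_error, n_tot = w.jobs_stat()
>>> n_tot
2
>>> w.close()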

multiprocessing

Whether multiprocessing is enabled or not

Returns:
bool
pool

multiprocessing.Pool instance, or None for single processor computations

terminate()[source]

Terminate and join the pool: emergency exit

wait(timeout=None)[source]

Wait for all the jobs to finish, or until timeout seconds have passed

Parameters:
timeout : float, optional

seconds for the timeout
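
A sketch of waiting for submitted jobs before collecting the results (the name "wait_demo" and the 5 second timeout are illustrative):

>>> import pyhetdex.tools.processes as p
>>> w = p.get_worker(name="wait_demo", multiprocessing=True)
>>> job = w(sum, [1, 2, 3])
>>> w.wait(timeout=5)
>>> w.get_results()
[6]
>>> w.close()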