processes – Single/multiprocessing abstraction¶
Execute functions while hiding whether they are run in parallel, using
multiprocessing.pool.Pool, or serially.
The module provides a high level factory function, get_worker(), to
create and retrieve worker instances. The first call with a given name
creates the worker; subsequent calls return the same worker, and all
arguments besides the name are ignored.
If you need or want to reuse a name after closing the worker, e.g. when
using the worker in a with statement, you can remove it with
remove_worker().
Examples¶
>>> import pyhetdex.tools.processes as p
>>> def func(args):
... print(", ".join(args))
... return " ".join(args)
Run single processor jobs
>>> worker_sp = p.get_worker(name="singlep")
>>> job = worker_sp(func, ["from", "single", "processor"])
from, single, processor
>>> job
<pyhetdex.tools.processes.Result object at ...>
>>> job.get()
'from single processor'
>>> worker_sp.get_results()
['from single processor']
>>> worker_sp.close()
Or multiprocessor jobs
>>> worker_mp = p.get_worker(name="multip", multiprocessing=True)
>>> job = worker_mp(func, ["from", "multi", "processor"])
>>> job
<multiprocessing.pool.ApplyResult object at ...>
>>> job.get()
'from multi processor'
>>> worker_mp.get_results()
['from multi processor']
>>> worker_mp.close()
Run some initialisation function when creating the multiprocessing pool
>>> def init_func(message):
... print(message)
>>> worker_mp_init = p.get_worker(name="multip_init", multiprocessing=True,
... initializer=init_func,
... initargs=("docstring",))
>>> worker_mp_init.close()
Alternatively, you can use the worker within a with
statement
>>> def func1(args):
... return " ".join(args[::-1])
>>> with p.get_worker(name="inwith", multiprocessing=True) as wworker:
... wworker(func1, ["in", "with", "statement"])
... wworker.get_results()
['in with statement']
Public interface¶
-
pyhetdex.tools.processes.
get_worker
(name='default', multiprocessing=False, always_wait=False, poolclass=<bound method BaseContext.Pool of <multiprocessing.context.DefaultContext object>>, result_class=<class 'pyhetdex.tools.processes.Result'>, **kwargs)[source]¶ Returns a worker with the specified name.
At the first call with a given name, the worker is created using the
remaining arguments. By default the worker is for a single process.
Subsequent calls with the same name always return the same worker
instance and the remaining options are ignored. This means that the
worker instance never needs to be passed between different parts of the
application.
Parameters: - name : string, optional
name to associate to the _Worker object. If it does not exist, a new
object is created, stored and returned
- multiprocessing : bool, optional
use multi processing
- always_wait : bool, optional
if False, terminate the jobs when exiting the with statement upon an
error; if True, wait for the running jobs to finish before closing
- poolclass : class
class implementing the multiprocessing.Pool interface
- result_class : class, optional
Result-like class used for single processor execution
- kwargs : dictionary
options passed to multiprocessing.Pool; ignored if multiprocessing is
False
Returns: - worker : _Worker instance
-
pyhetdex.tools.processes.
remove_worker
(name='default')[source]¶ Remove the worker called name
Parameters: - name : string, optional
name of the worker to remove
Raises: - WorkerNameException
if the name does not exist
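The create-or-retrieve and remove semantics amount to a keyed registry. A minimal, illustrative sketch of that pattern (not the pyhetdex source; a plain object and KeyError stand in for the _Worker instance and WorkerNameException):

```python
# Illustrative sketch of the get/remove registry semantics described
# above; the real pyhetdex functions manage _Worker instances and raise
# WorkerNameException (a KeyError subclass) instead.
_workers = {}


def get_worker(name="default", **kwargs):
    # First call with a name creates and stores the entry; later calls
    # return it, ignoring all other arguments.
    if name not in _workers:
        _workers[name] = object()  # stand-in for a _Worker instance
    return _workers[name]


def remove_worker(name="default"):
    try:
        del _workers[name]  # frees the name for reuse
    except KeyError:
        raise KeyError("no worker called {!r}".format(name))


w1 = get_worker("singlep")
w2 = get_worker("singlep", multiprocessing=True)  # extra args ignored
print(w1 is w2)  # True: the same instance is returned
remove_worker("singlep")
w3 = get_worker("singlep")
print(w3 is w1)  # False: the name was freed and a new worker created
```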
Result class¶
-
class
pyhetdex.tools.processes.
Result
(func, *args, **kwargs)[source]¶ Bases: object
Implements the same interface as multiprocessing.pool.AsyncResult and
executes the function at instantiation time. Used to abstract
single/multi processor cases in _Worker and to postpone error handling.
Parameters: - func : callable
function to execute
- args : list
positional arguments to pass to the function
- kwargs : dict
keyword arguments to pass to the function
-
class
pyhetdex.tools.processes.
DeferredResult
(func, *args, **kwargs)[source]¶ Bases: pyhetdex.tools.processes.Result
Reimplements Result, executing the function in the get() method. Used
to abstract single/multi processor cases in _Worker and to postpone
function execution.
Parameters: - func : callable
function to execute
- args : list
positional arguments to pass to the function
- kwargs : dict
keyword arguments to pass to the function
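The difference between the two result classes can be pictured with minimal stand-ins (illustrative only, not the pyhetdex implementation): a Result-like class runs the function eagerly and postpones only the error handling, while a DeferredResult-like class postpones the call itself until get().

```python
# Illustrative stand-ins for the Result / DeferredResult behaviour
# described above; not the pyhetdex code.
class EagerResult:
    """Execute func at instantiation time; get() returns or re-raises."""

    def __init__(self, func, *args, **kwargs):
        try:
            self._value, self._error = func(*args, **kwargs), None
        except Exception as exc:
            self._value, self._error = None, exc

    def get(self):
        if self._error is not None:
            raise self._error  # error handling postponed to get()
        return self._value


class LazyResult(EagerResult):
    """Execute func only when get() is called."""

    def __init__(self, func, *args, **kwargs):
        self._call = lambda: func(*args, **kwargs)

    def get(self):
        return self._call()


calls = []
eager = EagerResult(lambda: calls.append("eager") or "done")
lazy = LazyResult(lambda: calls.append("lazy") or "done")
print(calls)       # ['eager']: the eager function has already run
print(lazy.get())  # 'done': the lazy one only runs here
```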
Exceptions¶
-
exception
pyhetdex.tools.processes.
WorkerNameException
[source]¶ Bases: KeyError, pyhetdex.tools.processes.WorkerException
The required name does not exist
The worker¶
Although the instantiation and retrieval of the class should be done
via get_worker(), its attributes are public.
-
class
pyhetdex.tools.processes.
_Worker
(multiprocessing=False, always_wait=False, poolclass=<bound method BaseContext.Pool of <multiprocessing.context.DefaultContext object>>, result_class=<class 'pyhetdex.tools.processes.Result'>, **kwargs)[source]¶ Class that hides the details of single or multiprocessor execution. The class should be considered private; instances should be created and retrieved through the get_worker() factory function.
The instance can be used once in a with statement. Upon exiting, it
closes/terminates and joins the pool, if multiprocessing is used.
Parameters: - multiprocessing : bool, optional
use multi processing
- always_wait : bool, optional
if False, terminate the jobs when exiting the with statement upon an
error; if True, wait for the running jobs to finish before closing
- poolclass : class, optional
class implementing the multiprocessing.Pool interface
- result_class : class, optional
Result-like class used for single processor execution
- kwargs : dictionary
options passed to multiprocessing.Pool; ignored if multiprocessing is
False
-
__call__
(func, *args, **kwargs)[source]¶ Apply func to args and kwargs
(asynchronously if multiprocessing is on).
Parameters: - func : callable
function to execute
- args : list
positional arguments to pass to the function
- kwargs : dict
keyword arguments to pass to the function
Returns: - job : AsyncResult or Result
stores the result of the computation, which can be recovered with the
multiprocessing.pool.AsyncResult.get() or the Result.get() method,
respectively. The jobs are also stored in an internal list and can be
accessed with jobs and get_results().
-
get_results
(fail_safe=False)[source]¶ Wait for all the processes to finish and return the results.
Parameters: - fail_safe : bool
if any of the jobs raises an exception, capture it and add the
exception instance to the output list in place of the result; by
default the exception is re-raised
Returns: - list
list of the actual results from the computations
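The fail_safe behaviour can be sketched as a loop over Result-like jobs (illustrative only; FakeJob and collect are hypothetical stand-ins, not the pyhetdex loop):

```python
# Sketch of the fail_safe collection logic described above: try each
# job's get(); with fail_safe=True keep the exception instance in place
# of the missing result instead of re-raising it.
def collect(jobs, fail_safe=False):
    results = []
    for job in jobs:
        try:
            results.append(job.get())
        except Exception as exc:
            if not fail_safe:
                raise
            results.append(exc)
    return results


class FakeJob:
    """Minimal Result-like object for the sketch."""

    def __init__(self, value=None, error=None):
        self._value, self._error = value, error

    def get(self):
        if self._error is not None:
            raise self._error
        return self._value


jobs = [FakeJob(value=1), FakeJob(error=ValueError("boom")), FakeJob(value=3)]
out = collect(jobs, fail_safe=True)
print(out[0], out[2])               # the successful results
print(isinstance(out[1], ValueError))  # the captured exception instance
```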
-
jobs
¶ list of
AsyncResult
orResult
instances
-
jobs_stat
()[source]¶ Return the numbers of completed, failed and submitted jobs
Returns: - n_completed : int
number of completed jobs
- n_error : int
number of jobs that raised an exception
- n_tot : int
total number of submitted jobs
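One way these counters could be computed, assuming jobs expose the AsyncResult-style ready()/successful() methods (an illustrative guess at the logic, not the pyhetdex source):

```python
# Sketch of the jobs_stat counters described above, over jobs exposing
# ready()/successful() as multiprocessing.pool.AsyncResult does.
def jobs_stat(jobs):
    # Count finished jobs and, among those, the ones that failed.
    n_completed = n_error = 0
    for job in jobs:
        if job.ready():  # job has finished
            n_completed += 1
            if not job.successful():
                n_error += 1
    return n_completed, n_error, len(jobs)


class FakeJob:
    """Minimal AsyncResult-like object for the sketch."""

    def __init__(self, done, ok):
        self._done, self._ok = done, ok

    def ready(self):
        return self._done

    def successful(self):
        return self._ok


jobs = [FakeJob(True, True), FakeJob(True, False), FakeJob(False, True)]
print(jobs_stat(jobs))  # (2, 1, 3)
```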
-
multiprocessing
¶ Whether multiprocessing is enabled or not
Returns: - bool
-
pool
multiprocessing.Pool
instance, or None for single processor computations