queue
– Queue utilities¶
Common functionalities related with queue and queue listeners
QueueListener
is a abstract base class that provides a threaded
listener that extract items from a queue and handle them. The only method that
needs to be implemented is QueueListener.handle()
. The implementation is
adapted from cPython 3.5 branch commit
9aee273bf8b7
The SetupQueueListener
is a helper class that accept a class derived
from QueueListener
, a queue and a number of other arguments, and
starts the queue listener, optionally in a separate process. The class also
provides a context manager interfaces, that stop the thread, and the process,
where the QueueListener
is running. The implementation is based on
logging cookbook.
QueueContext
– A queue object with context manager¶
-
class
pyhetdex.tools.queue.
QueueContext
(maxsize=0)[source]¶ Bases:
multiprocessing.queues.Queue
Add a context manager to the
multiprocessing.queues.Queue
Parameters: - maxsize : int, optional
sets the upper bound limit on the number of items that can be placed in the queue. Insertion will block once this size has been reached, until queue items are consumed. If
maxsize
is less than or equal to zero, the queue size is infinite.
QueueListener
– Log records from a queue¶
-
class
pyhetdex.tools.queue.
QueueListener
(queue_)[source]¶ This class implements an internal threaded listener which watches for entries being added to a queue, removes them and passes them to a handler method, that subclasses must re-implement.
Parameters: - queue_ : queue-like instance
-
_monitor
()[source]¶ Monitor the queue for records, and ask the handler to deal with them.
This method runs on a separate, internal thread. The thread will terminate if it sees a sentinel object in the queue.
-
dequeue
(block)[source]¶ Dequeue a record and return it, optionally blocking.
The base implementation uses get. You may want to override this method if you want to use timeouts or work with custom queue implementations.
-
enqueue_sentinel
()[source]¶ This is used to enqueue the sentinel record.
The base implementation uses
put_nowait
. You may want to override this method if you want to use timeouts or work with custom queue implementations.
-
handle
(record)[source]¶ Handle a record.
Abstract method: this method must be implemented in derived classes.
Parameters: - record :
record to handle
Returns: - record :
record after passing through
prepare()
.
-
prepare
(record)[source]¶ Prepare a record for handling.
This method just returns the passed-in record. You may want to override this method if you need to do any custom marshalling or manipulation of the record before passing it to the handlers.
-
start
()[source]¶ Start the listener.
This starts up a background thread to monitor the queue for records to process
-
stop
()[source]¶ Stop the listener.
This asks the thread to terminate, and then waits for it to do so. Note that if you don’t call this before your application exits, there may be some records still left on the queue, which won’t be processed.
-
_abc_cache
= <_weakrefset.WeakSet object>¶
-
_abc_negative_cache
= <_weakrefset.WeakSet object>¶
-
_abc_negative_cache_version
= 54¶
-
_abc_registry
= <_weakrefset.WeakSet object>¶
-
_sentinel
= None¶
SetupQueueListener
– Setup the QueueListener
¶
-
class
pyhetdex.tools.queue.
SetupQueueListener
(qlc, queue_, use_process=True, qlc_args=(), qlc_kwargs={})[source]¶ Start the
qlc
, in a separate process if required.The
SetupQueueListener
instance can be used as a context manager for awith
statement.Parameters: - qlc :
QueueListener
child class to instantiate in the setup
- queue_ : queue-like object
qeueu to pass as first argument to
qlc
- qlc_args : list
arguments to pass to the
qlc
when instantiating it- qlc_kwargs : dict
keyword arguments to pass to the
qlc
when instantiating it- use_process : bool, optional
if
True
start the listener in a separate process
Attributes: - qlc, queue, qlc_args, qlc_kwargs :
as above
- stop_event :
multiprocessing.Event
instance event used to signal to stop the listener
- lp : :class multiprocessing.Process instance
process running the listener, if
use_process
isTrue
- listener :
qlc
instance
-
__exit__
(exc_type, exc_value, traceback)[source]¶ Exit point for the with statement
The base implementation call
stop()
- qlc :