queue – Queue utilities

Common functionality related to queues and queue listeners.

QueueListener is an abstract base class that provides a threaded listener that extracts items from a queue and handles them. The only method that needs to be implemented is QueueListener.handle(). The implementation is adapted from the CPython 3.5 branch, commit 9aee273bf8b7.

SetupQueueListener is a helper class that accepts a class derived from QueueListener, a queue and a number of other arguments, and starts the queue listener, optionally in a separate process. The class also provides a context manager interface that, on exit, stops the thread (and the process, if one is used) in which the QueueListener is running. The implementation is based on the Python logging cookbook.

QueueContext – A queue object with context manager

class pyhetdex.tools.queue.QueueContext(maxsize=0)[source]

Bases: multiprocessing.queues.Queue

Add a context manager to the multiprocessing.queues.Queue

Parameters:
maxsize : int, optional

Upper bound on the number of items that can be placed in the queue. Once this size has been reached, insertion blocks until queue items are consumed. If maxsize is less than or equal to zero, the queue size is infinite.

__enter__()[source]

Returns an instance of the queue

__exit__(exc_type, exc_value, traceback)[source]

Close and join the thread where the queue lives
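The behaviour described above can be sketched with the standard library alone. This is a minimal illustration, not the pyhetdex source: the class name QueueContextSketch is hypothetical, and passing ctx explicitly is an assumption needed to subclass multiprocessing.queues.Queue on Python 3.

```python
import multiprocessing
import multiprocessing.queues


class QueueContextSketch(multiprocessing.queues.Queue):
    """Hypothetical stand-in for QueueContext (not the pyhetdex code)."""

    def __init__(self, maxsize=0):
        # On Python 3 the base class requires an explicit context.
        super().__init__(maxsize, ctx=multiprocessing.get_context())

    def __enter__(self):
        # Hand the queue itself to the body of the with statement.
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()        # this process will put no more items
        self.join_thread()  # wait for the feeder thread to flush its buffer


with QueueContextSketch() as q:
    q.put("hello")
    received = q.get(timeout=5)
```

Closing and joining in __exit__ means the feeder thread is cleaned up even if the body of the with statement raises.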

QueueListener – Log records from a queue

class pyhetdex.tools.queue.QueueListener(queue_)[source]

This class implements an internal threaded listener which watches for entries being added to a queue, removes them and passes them to a handler method that subclasses must implement.

Parameters:
queue_ : queue-like instance

_monitor()[source]

Monitor the queue for records, and ask the handler to deal with them.

This method runs on a separate, internal thread. The thread will terminate if it sees a sentinel object in the queue.
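A sentinel-terminated monitor loop of this kind can be sketched as follows. The function name monitor and the variable names are illustrative assumptions; only the use of None as the sentinel mirrors the documented _sentinel attribute.

```python
import queue
import threading

_SENTINEL = None  # same value as the documented _sentinel attribute


def monitor(q, handle):
    """Pass records from q to handle until the sentinel is seen."""
    while True:
        record = q.get()      # block until a record is available
        if record is _SENTINEL:
            break             # sentinel seen: let the thread terminate
        handle(record)


handled = []
q = queue.Queue()
worker = threading.Thread(target=monitor, args=(q, handled.append), daemon=True)
worker.start()

for item in ("a", "b", "c"):
    q.put(item)
q.put(_SENTINEL)  # ask the monitor thread to terminate
worker.join()
```

Because the sentinel is enqueued after the real records, everything put before it is handled before the thread exits.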

dequeue(block)[source]

Dequeue a record and return it, optionally blocking.

The base implementation uses get. You may want to override this method if you want to use timeouts or work with custom queue implementations.

enqueue_sentinel()[source]

This is used to enqueue the sentinel record.

The base implementation uses put_nowait. You may want to override this method if you want to use timeouts or work with custom queue implementations.

handle(record)[source]

Handle a record.

Abstract method: this method must be implemented in derived classes.

Parameters:
record :

record to handle

Returns:
record :

record after passing through prepare().
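As an illustration of implementing handle(), the sketch below subclasses the standard library's logging.handlers.QueueListener, from which the documented class is adapted. Using it as a stand-in for pyhetdex.tools.queue.QueueListener is an assumption, and the class name CollectingListener is hypothetical.

```python
import logging.handlers
import queue


class CollectingListener(logging.handlers.QueueListener):
    """Stand-in listener that stores every record it receives."""

    def __init__(self, queue_):
        super().__init__(queue_)  # no logging handlers are needed here
        self.seen = []

    def handle(self, record):
        # The only method a subclass has to provide.
        self.seen.append(record)


q = queue.Queue()
listener = CollectingListener(q)
listener.start()              # background thread starts monitoring q
for item in ("spam", "eggs"):
    q.put(item)
listener.stop()               # enqueue the sentinel and join the thread
```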

prepare(record)[source]

Prepare a record for handling.

This method just returns the passed-in record. You may want to override this method if you need to do any custom marshalling or manipulation of the record before passing it to the handlers.
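A custom prepare() might look like the sketch below. It again uses the standard library's logging.handlers.QueueListener as a stand-in (an assumption), and calls prepare() explicitly from handle(); the class name NormalisingListener is hypothetical.

```python
import logging.handlers
import queue


class NormalisingListener(logging.handlers.QueueListener):
    """Stand-in listener that normalises text records before storing them."""

    def __init__(self, queue_):
        super().__init__(queue_)
        self.seen = []

    def prepare(self, record):
        # Custom manipulation: strip whitespace and lower-case the text.
        return record.strip().lower()

    def handle(self, record):
        # Run the record through prepare() before handling it.
        self.seen.append(self.prepare(record))


q = queue.Queue()
listener = NormalisingListener(q)
listener.start()
q.put("  Alpha ")
q.put("BETA\n")
listener.stop()
```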

start()[source]

Start the listener.

This starts up a background thread to monitor the queue for records to process.

stop()[source]

Stop the listener.

This asks the thread to terminate, and then waits for it to do so. Note that if you don’t call this before your application exits, there may be some records still left on the queue, which won’t be processed.

_sentinel = None

SetupQueueListener – Setup the QueueListener

class pyhetdex.tools.queue.SetupQueueListener(qlc, queue_, use_process=True, qlc_args=(), qlc_kwargs={})[source]

Start the qlc, in a separate process if required.

The SetupQueueListener instance can be used as a context manager for a with statement.

Parameters:
qlc : QueueListener child

class to instantiate in the setup

queue_ : queue-like object

queue to pass as first argument to qlc

qlc_args : list

arguments to pass to the qlc when instantiating it

qlc_kwargs : dict

keyword arguments to pass to the qlc when instantiating it

use_process : bool, optional

if True start the listener in a separate process

Attributes:
qlc, queue, qlc_args, qlc_kwargs :

as above

stop_event : multiprocessing.Event instance

event used to signal to stop the listener

lp : multiprocessing.Process instance

process running the listener, if use_process is True

listener : qlc instance

__enter__()[source]

Entry point for the with statement

Returns:
self

__exit__(exc_type, exc_value, traceback)[source]

Exit point for the with statement

The base implementation calls stop().
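The context manager behaviour can be sketched for the in-thread case (the equivalent of use_process=False). SetupSketch and Collector are hypothetical names, and the standard library's logging.handlers.QueueListener stands in for a QueueListener child; none of this is the pyhetdex source.

```python
import logging.handlers
import queue


class Collector(logging.handlers.QueueListener):
    """Stand-in for a QueueListener child: stores prefixed records."""

    def __init__(self, queue_, prefix=""):
        super().__init__(queue_)
        self.prefix = prefix
        self.seen = []

    def handle(self, record):
        self.seen.append(self.prefix + record)


class SetupSketch:
    """Hypothetical helper: instantiate and start qlc, stop it on exit."""

    def __init__(self, qlc, queue_, qlc_args=(), qlc_kwargs=None):
        self.listener = qlc(queue_, *qlc_args, **(qlc_kwargs or {}))
        self.listener.start()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.stop()

    def stop(self):
        self.listener.stop()


q = queue.Queue()
with SetupSketch(Collector, q, qlc_kwargs={"prefix": "> "}) as setup:
    q.put("first")
    q.put("second")
# Leaving the with block stops the listener after the queue is drained.
```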

_listener_process()[source]

This initialises logging with the given handlers.

To be used in a separate process.

Starts the listener and waits for the main process to signal completion via the event. The listener is then stopped.
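The start, wait for the event, then stop sequence can be sketched as follows. The names Doubler, listener_process and run_demo are hypothetical, the standard library's logging.handlers.QueueListener stands in for the listener class, and the "fork" start method (POSIX only) is an assumption chosen so that the sketch needs no main guard.

```python
import logging.handlers
import multiprocessing

# Assumption: "fork" is available (POSIX). With "spawn", the code below
# would have to sit behind an `if __name__ == "__main__":` guard.
ctx = multiprocessing.get_context("fork")


class Doubler(logging.handlers.QueueListener):
    """Stand-in listener: doubles each record and reports it on out_q."""

    def __init__(self, queue_, out_q):
        super().__init__(queue_)
        self.out_q = out_q

    def handle(self, record):
        self.out_q.put(record * 2)


def listener_process(queue_, out_q, stop_event):
    """Run in the child: start the listener, wait for the event, stop."""
    listener = Doubler(queue_, out_q)
    listener.start()
    stop_event.wait()   # the main process signals completion here
    listener.stop()


def run_demo():
    work_q, out_q, stop_event = ctx.Queue(), ctx.Queue(), ctx.Event()
    proc = ctx.Process(target=listener_process,
                       args=(work_q, out_q, stop_event))
    proc.start()
    for i in range(3):
        work_q.put(i)
    results = [out_q.get(timeout=10) for _ in range(3)]
    stop_event.set()    # tell the child to stop its listener
    proc.join(timeout=10)
    return results, proc.exitcode


results, exitcode = run_demo()
```

Setting the event only after the results have been collected ensures that no record is lost when the child stops its listener.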

_start_listener()[source]

Create, start and return the listener

stop()[source]

Stop the listener and, if it’s running in a process, join it. Should be called before the main process finishes to avoid losing messages.