IMessageQueue

class carb.eventdispatcher.IMessageQueue

Bases: pybind11_object

An instance of a message queue.

A message queue is a one-way, weakly coupled device that allows “messages” to be sent from a variety of senders and received by a specific target. A message queue has a push-side and a pop-side. The push-side can be accessed by any thread. The pop-side can only be accessed by the owning thread, or by a group of threads if no owning thread is specified. Violations of this policy raise errors.

Can be created with IMessageQueueFactory.create_message_queue(), or an existing message queue can be found with IMessageQueueFactory.get_message_queue().

Methods

__init__(*args, **kwargs)

get_name(self)

Retrieves the unique name of this message queue.

get_owning_thread(self)

Retrieves the thread ID of the thread which owns this message queue.

has_messages(self)

Returns whether this message queue has pending messages.

peek(self, fn)

Inspects the message at the front of the queue, if any, without removing it.

pop(fn)

Waits until a message has been pushed to the queue and might be available.

push(event_name[, payload])

Posts a message to the message queue without waiting for the message to be processed.

push_and_wait(event_name[, payload])

Pushes a message to the message queue and does not return until the message is processed.

stop(self)

Stops the message queue before destruction.

try_pop(self, fn)

Pops the message at the front of the queue and calls a function with the message.

__init__(*args, **kwargs)
get_name(self: carb.eventdispatcher._eventdispatcher.IMessageQueue) → str

Retrieves the unique name of this message queue.

Parameters

None

Returns

The unique name of this message queue.

Return type

str

get_owning_thread(self: carb.eventdispatcher._eventdispatcher.IMessageQueue) → int

Retrieves the thread ID of the thread which owns this message queue.

This value should be comparable with threading.get_ident().

Parameters

None

Returns

The thread identifier that owns the queue, or 0 if no single thread owns the queue.

Return type

int
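As a minimal sketch of the comparison with threading.get_ident() described above, the helper below checks whether the calling thread is the single owner of a queue. The name is_owning_thread is hypothetical, and queue is assumed to be an IMessageQueue obtained elsewhere (for example from IMessageQueueFactory).

```python
import threading


def is_owning_thread(queue) -> bool:
    """Return True if the calling thread is the single owner of `queue`.

    `queue` is assumed to expose get_owning_thread() as documented above.
    """
    owner = queue.get_owning_thread()
    # 0 means no single thread owns the queue, so no thread is "the" owner.
    return owner != 0 and owner == threading.get_ident()
```

A result of False does not by itself mean that pop-side calls will fail: when get_owning_thread() returns 0, the pop-side is available to a group of threads instead.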

has_messages(self: carb.eventdispatcher._eventdispatcher.IMessageQueue) → bool

Returns whether this message queue has pending messages.

Parameters

None

Returns

True if there are messages in the queue; False if there are no messages in the queue.

Return type

bool

peek(self: carb.eventdispatcher._eventdispatcher.IMessageQueue, fn: Callable[[carb.eventdispatcher._eventdispatcher.Event], None]) → bool

Inspects the message at the front of the queue, if any, without removing it.

The message stays at the front of the queue after this function returns. Call pop() or try_pop() to process and remove the message.

This function only works in a single-owner-thread situation; otherwise a RuntimeError is raised.

Parameters

fn – (function) A function that is called with a carb.eventdispatcher.Event as the only parameter.

Returns

True if the given function was called with a message; False if there are no messages in the queue.

Return type

bool

Raises
  • IndexError – The message queue has been stopped.

  • RuntimeError – The calling thread is not the owner thread, the queue is not in single-thread mode, or another error.
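One possible way to use peek() is to test the front message against a condition before deciding whether to pop it. The sketch below assumes a caller-supplied predicate that receives the Event; the helper name front_matches is hypothetical. It treats a stopped queue as "no match" and lets RuntimeError propagate, since that usually indicates a threading mistake.

```python
def front_matches(queue, predicate) -> bool:
    """Return True if a front message exists and `predicate(event)` is truthy.

    The message is left in the queue. `queue` is assumed to expose peek()
    as documented above.
    """
    results = []

    def inspect(event):
        # Record only the predicate's verdict, not the event itself.
        results.append(bool(predicate(event)))

    try:
        called = queue.peek(inspect)
    except IndexError:
        # The queue has been stopped, so there is nothing to inspect.
        return False
    return bool(called and results and results[0])
```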

async pop(fn)

Waits until a message has been pushed to the queue and might be available. Returns after the message is handled.

Note: this function must be awaited. The message is removed from the queue atomically before processing. In a single-owner-thread situation, this function must be called within the context of the owning thread.

If a task or thread is waiting on the message, the message will be considered ‘completed’ and unblock the waiting task/thread as soon as fn() returns, even if the carb.eventdispatcher.Event passed to the function is retained.

This function returns once a message has been processed. If the queue is stopped with stop() while this function is awaiting, it stops waiting and raises IndexError.

Parameters

fn – (function) A function that is called with a carb.eventdispatcher.Event as the only parameter.

Returns

None

Raises
  • IndexError – The message queue has been stopped.

  • RuntimeError – The calling thread is not the owner thread, or another error.
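A consumer coroutine built on pop() might look like the following sketch. It awaits one message at a time and treats the IndexError raised after stop() as the signal to finish. The name consume_until_stopped is hypothetical, and queue is assumed to be an IMessageQueue whose pop-side the calling thread is allowed to use.

```python
async def consume_until_stopped(queue, handler) -> int:
    """Await messages from `queue` until it is stopped.

    `handler` is called with each Event. Returns the number of messages
    that were handled before the queue was stopped.
    """
    handled = 0
    while True:
        try:
            # Returns after `handler` has been called with one message.
            await queue.pop(handler)
        except IndexError:
            # Raised once the queue has been stopped.
            return handled
        handled += 1
```

RuntimeError is deliberately not caught here, so calling this from a thread that does not own the queue surfaces as an error rather than a silent exit.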

push(event_name, payload: Optional[dict] = None)

Posts a message to the message queue without waiting for the message to be processed.

Parameters
  • event_name – (str) The event name for the message.

  • payload – (dict) (optional) A dictionary that functions as the payload for the message.

Returns

None

Raises
  • IndexError – The message queue has been stopped.

  • MemoryError – Failed to allocate memory for the message.

  • RuntimeError – Any other error.
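Because push() raises IndexError once the queue has been stopped, a sender that may outlive the queue can wrap the call. The helper below is a sketch only; try_push is a hypothetical name, and the event name and payload are supplied by the caller.

```python
from typing import Optional


def try_push(queue, event_name: str, payload: Optional[dict] = None) -> bool:
    """Post a message without waiting; return False if the queue is stopped.

    `queue` is assumed to expose push() as documented above.
    """
    try:
        queue.push(event_name, payload)
    except IndexError:
        # The queue has been stopped and no longer accepts messages.
        return False
    return True
```

MemoryError and RuntimeError are left to propagate, since they do not indicate a normal shutdown.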

async push_and_wait(event_name, payload: Optional[dict] = None)

Pushes a message to the message queue and does not return until the message is processed.

Note: this function must be awaited. For a function that pushes a message without waiting, use push().

Parameters
  • event_name – (str) The event name for the message.

  • payload – (dict) (optional) A dictionary that functions as the payload for the message.

Returns

None

Raises
  • IndexError – The message queue has been stopped.

  • MemoryError – Failed to allocate memory for the message.

  • RuntimeError – Any other error.
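Since push_and_wait() does not return until the message is processed, it can be used to send messages strictly one after another. The following sketch assumes that some consumer is popping from the queue; without one, the await would not complete. The name push_in_order and the (event_name, payload) pairs are hypothetical.

```python
async def push_in_order(queue, steps) -> int:
    """Push each (event_name, payload) pair and wait for it to be processed.

    The next message is pushed only after the previous one has been handled.
    Returns the number of messages processed before the queue was stopped.
    """
    completed = 0
    for event_name, payload in steps:
        try:
            await queue.push_and_wait(event_name, payload)
        except IndexError:
            # The queue was stopped; remaining steps are not sent.
            break
        completed += 1
    return completed
```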

stop(self: carb.eventdispatcher._eventdispatcher.IMessageQueue) → None

Stops the message queue before destruction. This is a one-time, irreversible command that tells the message queue to stop processing messages. Calling this function is not required before the last reference is released and the queue object is destroyed.

When stop() returns it is guaranteed that:

  • Any future attempts to push messages or pop messages will result in an IndexError being raised.

  • All threads or tasks that are awaiting a message or waiting for a message to be processed have been unblocked. Both those pending calls and any future calls raise IndexError.

  • The message queue is removed from IMessageQueueFactory; attempts to retrieve the message queue by name will return None and a new message queue with the same name can be created again.

Subsequent calls to this function will raise IndexError as the message queue has already been stopped.

It is undefined behavior to call this function from within the functions passed to pop() or peek(). Instead, have the handler set a flag, check that flag after pop() or peek() returns, and call stop() from there.

Parameters

None

Returns

None

Raises
  • IndexError – The message queue has already been stopped.

  • RuntimeError – The calling thread does not have permission to stop the queue, or another error.
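The flag pattern recommended above can be sketched as follows, here using try_pop() to pump pending messages. The handler only records that a shutdown was requested, and stop() is called after the pop call has returned. The names pump_until_shutdown, handle, and is_shutdown are hypothetical; is_shutdown is a caller-supplied predicate that decides whether an Event is a shutdown request.

```python
def pump_until_shutdown(queue, handle, is_shutdown) -> bool:
    """Process pending messages; stop the queue if a shutdown message is seen.

    Returns True if the queue was stopped by this call.
    """
    shutdown_requested = False

    def on_event(event):
        nonlocal shutdown_requested
        if is_shutdown(event):
            # Only set a flag here; stop() must not be called in the handler.
            shutdown_requested = True
        else:
            handle(event)

    # Pop until the queue is empty or a shutdown message has been handled.
    while not shutdown_requested and queue.try_pop(on_event):
        pass

    if shutdown_requested:
        # Safe: no pop handler is running at this point.
        queue.stop()
    return shutdown_requested
```

Messages that are still queued behind the shutdown request are left unprocessed in this sketch.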

try_pop(self: carb.eventdispatcher._eventdispatcher.IMessageQueue, fn: Callable[[carb.eventdispatcher._eventdispatcher.Event], None]) → bool

Pops the message at the front of the queue and calls a function with the message.

The message is removed from the queue atomically before processing. In a single-owner-thread situation, this function must be called within the context of the owning thread.

If a task or thread is waiting on the message, the message will be considered ‘completed’ and unblock the waiting task/thread as soon as fn() returns, even if the carb.eventdispatcher.Event passed to the function is retained.

Parameters

fn – (function) A function that is called with a carb.eventdispatcher.Event as the only parameter.

Returns

True if the given function was called with a message; False if there are no messages in the queue.

Return type

bool

Raises
  • IndexError – The message queue has been stopped.

  • RuntimeError – The calling thread is not the owner thread, or another error.
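Because try_pop() returns False when the queue is empty rather than waiting, it lends itself to a non-blocking drain loop, for example once per frame or update tick. The sketch below is one way to write such a loop; drain and its limit parameter are hypothetical.

```python
def drain(queue, handler, limit=None) -> int:
    """Pop and handle pending messages without blocking.

    Stops when the queue is empty, when `limit` messages have been handled
    (if a limit is given), or when the queue has been stopped.
    Returns the number of messages handled.
    """
    count = 0
    try:
        while (limit is None or count < limit) and queue.try_pop(handler):
            count += 1
    except IndexError:
        # The queue has been stopped; report what was handled so far.
        pass
    return count
```

Passing a limit bounds the time spent in a single call when senders push faster than the handler can process.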