labthings_fastapi.message_broker
================================

.. py:module:: labthings_fastapi.message_broker

.. autoapi-nested-parse::

   Handle pub-sub style events.

   Both properties and actions can emit events that may be observed. This module
   handles all the pub-sub messaging in LabThings.


Attributes
----------

.. autoapisummary::

   labthings_fastapi.message_broker.LOGGER


Classes
-------

.. autoapisummary::

   labthings_fastapi.message_broker.Message
   labthings_fastapi.message_broker.MessageBroker


Module Contents
---------------

.. py:data:: LOGGER

.. py:class:: Message

   A pub-sub event message.

   This is the message that is sent when a property or action generates an event.

   This is a pydantic dataclass, so we validate the message. This might change
   in the future for performance reasons.

   :param thing: The name of the Thing generating the event.
   :param affordance: The name of the affordance generating the event.
   :param message_type: The kind of affordance from which the event originates.
   :param payload: Data specific to the event (e.g. property value, action status).


   .. py:attribute:: thing
      :type: str


   .. py:attribute:: affordance
      :type: str


   .. py:attribute:: message_type
      :type: Literal['property', 'action']


   .. py:attribute:: payload
      :type: Any


.. py:class:: MessageBroker

   A class that relays pub/sub messages.

   This class takes care of relaying messages to streams that have subscribed
   to them. It does not format messages or handle any details of e.g. websocket
   protocol.

   Subscriptions require an `ObjectSendStream[Message]` and each time a `Message`
   matching the subscription parameters (``thing`` and ``affordance``) is
   published, it will be sent on that stream.

   The broker does not validate thing or affordance names: that's up to the
   code calling `MessageBroker.subscribe`\ .

   Initialise the message broker.


   .. py:attribute:: _subscriptions
      :type: dict[str, dict[str, weakref.WeakSet[anyio.streams.memory.MemoryObjectSendStream[Message]]]]


   .. py:method:: subscribe(thing: str, affordance: str, stream: anyio.streams.memory.MemoryObjectSendStream[Message]) -> None
      :async:

      Subscribe to messages from a particular affordance.

      Note that this method must be called from the event loop, as the message
      broker is deliberately not thread safe.

      :param thing: The name of the `.Thing` being subscribed to.
      :param affordance: The name of the affordance being subscribed to.
      :param stream: A stream to send the messages to.

      :raises TypeError: if the `thing` or `affordance` argument is not a string.


   .. py:method:: unsubscribe(thing: str, affordance: str, stream: anyio.streams.memory.MemoryObjectSendStream[Message]) -> None
      :async:

      Unsubscribe a stream from messages from a particular affordance.

      This function is often not necessary: streams will be unsubscribed
      automatically if they are closed or finalised. As the message broker only
      keeps a weak reference to the stream, that means it will be finalised and
      unsubscribed when the code that created it goes out of scope.

      :param thing: The name of the `.Thing` being unsubscribed from.
      :param affordance: The name of the affordance being unsubscribed from.
      :param stream: The stream to unsubscribe.

      :raises KeyError: if there is no such subscription.
      :raises TypeError: if the `thing` or `affordance` argument is not a string.


   .. py:method:: publish(message: Message) -> None
      :async:

      Publish a message.

      This async method will relay the message to any subscriber streams.

      :param message: the message to send.


   .. py:method:: close_streams() -> None
      :async:

      Close all streams that are subscribed to receive messages.

      This should be called when the server shuts down.
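
The subscribe, publish and unsubscribe flow documented above can be illustrated with the standard library alone. This is a minimal sketch, not the LabThings implementation: ``SketchMessage`` and ``SketchBroker`` are hypothetical names, ``asyncio.Queue`` stands in for the ``MemoryObjectSendStream[Message]`` that the real broker expects, and the thing and affordance names (``"stage"``, ``"position"``, ``"speed"``) are invented for the demonstration. It mirrors only the documented behaviour: a nested mapping of thing name to affordance name to a weak set of subscribers, a ``TypeError`` for non-string names, and a ``KeyError`` when unsubscribing something that was never subscribed.

```python
import asyncio
import weakref
from dataclasses import dataclass
from typing import Any, Literal


@dataclass
class SketchMessage:
    """Hypothetical stand-in for Message (plain dataclass, no validation)."""

    thing: str
    affordance: str
    message_type: Literal["property", "action"]
    payload: Any


class SketchBroker:
    """Hypothetical stand-in for MessageBroker; queues replace anyio streams."""

    def __init__(self) -> None:
        # thing name -> affordance name -> weakly referenced subscriber queues
        self._subscriptions: dict[str, dict[str, weakref.WeakSet]] = {}

    @staticmethod
    def _check_names(thing: Any, affordance: Any) -> None:
        if not isinstance(thing, str) or not isinstance(affordance, str):
            raise TypeError("thing and affordance must be strings")

    async def subscribe(self, thing: str, affordance: str, queue: asyncio.Queue) -> None:
        self._check_names(thing, affordance)
        by_affordance = self._subscriptions.setdefault(thing, {})
        by_affordance.setdefault(affordance, weakref.WeakSet()).add(queue)

    async def unsubscribe(self, thing: str, affordance: str, queue: asyncio.Queue) -> None:
        self._check_names(thing, affordance)
        # Raises KeyError if the thing, affordance or queue is not registered.
        self._subscriptions[thing][affordance].remove(queue)

    async def publish(self, message: SketchMessage) -> None:
        subscribers = self._subscriptions.get(message.thing, {}).get(
            message.affordance, ()
        )
        # Copy to a list so the weak set cannot change size while we iterate.
        for queue in list(subscribers):
            await queue.put(message)


async def demo() -> tuple:
    broker = SketchBroker()
    queue: asyncio.Queue = asyncio.Queue()
    await broker.subscribe("stage", "position", queue)

    # Delivered: thing and affordance both match the subscription.
    await broker.publish(SketchMessage("stage", "position", "property", 1.5))
    # Not delivered: nobody subscribed to the "speed" affordance.
    await broker.publish(SketchMessage("stage", "speed", "property", 9.0))
    received = [queue.get_nowait().payload]

    # After unsubscribing, further messages no longer reach the queue.
    await broker.unsubscribe("stage", "position", queue)
    await broker.publish(SketchMessage("stage", "position", "property", 2.5))
    return received, queue.empty()


received, drained = asyncio.run(demo())
print(received, drained)
```

Because the sketch holds subscribers in a ``weakref.WeakSet``, a queue that is no longer referenced anywhere else drops out of the set once it is garbage collected. This matches the reason the documentation gives for ``unsubscribe`` often being unnecessary.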