labthings_fastapi.message_broker
Handle pub-sub style events.
Both properties and actions can emit events that may be observed. This module handles all the pub-sub messaging in LabThings.
Attributes
LOGGER
Classes
Message – A pub-sub event message.
MessageBroker – A class that relays pub/sub messages.
Module Contents
- labthings_fastapi.message_broker.LOGGER
- class labthings_fastapi.message_broker.Message
A pub-sub event message.
This is the message that is sent when a property or action generates an event.
This is a pydantic dataclass, so each message is validated when it is created. This might change in the future for performance reasons.
- Parameters:
thing – The name of the Thing generating the event.
affordance – The name of the affordance generating the event.
message_type – The kind of affordance from which the event originates.
payload – Data specific to the event (e.g. property value, action status).
- message_type: Literal['property', 'action']
- payload: Any
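The shape of a message can be sketched with a standard-library dataclass. This is a stand-in rather than the library's class: `SketchMessage`, the `str` types for `thing` and `affordance`, and the Thing and property names are assumptions made for illustration, and the hand-written check only approximates what pydantic validation would do.

```python
from dataclasses import dataclass
from typing import Any, Literal, get_args

MessageType = Literal["property", "action"]


@dataclass
class SketchMessage:
    """Stand-in with the four documented fields (not the library class)."""

    thing: str
    affordance: str
    message_type: MessageType
    payload: Any

    def __post_init__(self) -> None:
        # The real class relies on pydantic for validation; this stand-in
        # only checks message_type by hand.
        if self.message_type not in get_args(MessageType):
            raise ValueError(f"invalid message_type: {self.message_type!r}")


# Hypothetical Thing and property names, chosen for illustration only.
msg = SketchMessage(
    thing="thermometer",
    affordance="temperature",
    message_type="property",
    payload=21.5,
)
```

A `message_type` other than `"property"` or `"action"` raises `ValueError` in this sketch.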
- class labthings_fastapi.message_broker.MessageBroker
A class that relays pub/sub messages.
This class takes care of relaying messages to streams that have subscribed to them. It does not format messages or handle any details of e.g. websocket protocol.
Subscriptions require an ObjectSendStream[Message], and each time a Message matching the subscription parameters (thing and affordance) is published, it will be sent on that stream.
The broker does not validate thing or affordance names: that’s up to the code calling MessageBroker.subscribe.
Initialise the message broker.
- _subscriptions: dict[str, dict[str, weakref.WeakSet[anyio.streams.memory.MemoryObjectSendStream[Message]]]]
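The relaying behaviour described above, together with the nested `_subscriptions` mapping, can be illustrated with a minimal sketch. It is not the library's implementation: `SketchBroker`, its `publish` method and `RecordingStream` are hypothetical names, the methods are synchronous for brevity (the documented `subscribe` is a coroutine), and the stand-in stream only records what it is sent.

```python
import weakref
from collections import defaultdict
from typing import Any


class RecordingStream:
    """Hypothetical stand-in for a send stream: it records what it is sent."""

    def __init__(self) -> None:
        self.received: list[Any] = []

    def send_nowait(self, item: Any) -> None:
        self.received.append(item)


class SketchBroker:
    """Illustrative relay keyed by thing name, then affordance name."""

    def __init__(self) -> None:
        # thing -> affordance -> weak set of subscribed streams
        self._subscriptions: dict[str, dict[str, weakref.WeakSet]] = defaultdict(
            lambda: defaultdict(weakref.WeakSet)
        )

    def subscribe(self, thing: str, affordance: str, stream: Any) -> None:
        # No validation of the names, mirroring the documented behaviour.
        self._subscriptions[thing][affordance].add(stream)

    def publish(self, thing: str, affordance: str, message: Any) -> int:
        """Send message to every matching stream; return how many got it."""
        # .get avoids creating empty entries for unknown names.
        streams = self._subscriptions.get(thing, {}).get(affordance, ())
        delivered = 0
        for stream in list(streams):
            stream.send_nowait(message)
            delivered += 1
        return delivered


broker = SketchBroker()
temperature_stream = RecordingStream()
broker.subscribe("thermometer", "temperature", temperature_stream)
delivered = broker.publish("thermometer", "temperature", 21.5)
missed = broker.publish("thermometer", "humidity", 40)
```

Only the stream subscribed to the matching thing and affordance receives the message; publishing to a pair with no subscribers delivers nothing.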
- async subscribe(thing: str, affordance: str, stream: anyio.streams.memory.MemoryObjectSendStream[Message]) None
Subscribe to messages from a particular affordance.
Note that this method must be called from the event loop, as the message broker is deliberately not thread-safe.
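One way to respect this constraint from a worker thread is to hand the coroutine to the event loop instead of calling into the broker directly. The sketch below uses the standard library's `asyncio.run_coroutine_threadsafe`; `LoopBoundBroker` is a hypothetical stand-in that only records which thread ran `subscribe`, and the names passed to it are illustrative. Code built on anyio could use the helpers in `anyio.from_thread` to the same effect.

```python
import asyncio
import threading


class LoopBoundBroker:
    """Hypothetical stand-in whose subscribe records the thread it ran on."""

    def __init__(self) -> None:
        self.subscribe_thread_ids: list[int] = []

    async def subscribe(self, thing: str, affordance: str, stream: object) -> None:
        self.subscribe_thread_ids.append(threading.get_ident())


async def main() -> tuple[int, list[int]]:
    broker = LoopBoundBroker()
    loop = asyncio.get_running_loop()

    def worker() -> None:
        # Running in a worker thread: schedule the coroutine on the event
        # loop and wait for it, rather than touching the broker directly.
        future = asyncio.run_coroutine_threadsafe(
            broker.subscribe("thermometer", "temperature", object()), loop
        )
        future.result(timeout=5)

    # to_thread keeps the event loop free to run the scheduled coroutine.
    await asyncio.to_thread(worker)
    return threading.get_ident(), broker.subscribe_thread_ids


loop_thread_id, seen_thread_ids = asyncio.run(main())
```

Although the request originates in a worker thread, `subscribe` itself executes on the event loop's thread.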
- async unsubscribe(thing: str, affordance: str, stream: anyio.streams.memory.MemoryObjectSendStream[Message]) None
Unsubscribe a stream from messages from a particular affordance.
This function is often not necessary: streams are unsubscribed automatically when they are closed or finalised. Because the message broker keeps only a weak reference to each stream, a stream is finalised and unsubscribed once the code that created it no longer holds a reference to it, for example when it goes out of scope.
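The weak-reference behaviour can be demonstrated with the standard library alone. In this sketch `DummyStream` and `subscribers` are hypothetical stand-ins for a real stream and for one of the broker's subscription sets.

```python
import gc
import weakref


class DummyStream:
    """Hypothetical stand-in for a send stream; it only needs to be weak-referenceable."""


subscribers: weakref.WeakSet = weakref.WeakSet()


def subscribe_temporarily() -> int:
    stream = DummyStream()
    subscribers.add(stream)
    return len(subscribers)  # the stream is still alive here


count_inside = subscribe_temporarily()
# CPython frees the stream as soon as the function returns; collect() is
# there for interpreters that finalise objects later.
gc.collect()
count_after = len(subscribers)
```

The practical consequence is that a subscriber must keep its own reference to the stream for as long as it wants to receive messages.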