labthings_fastapi.message_broker

Handle pub-sub style events.

Both properties and actions can emit events that may be observed. This module handles all the pub-sub messaging in LabThings.

Attributes

LOGGER

Classes

Message

A pub-sub event message.

MessageBroker

A class that relays pub/sub messages.

Module Contents

labthings_fastapi.message_broker.LOGGER

class labthings_fastapi.message_broker.Message

A pub-sub event message.

This is the message that is sent when a property or action generates an event.

Message is a pydantic dataclass, so its fields are validated when a message is created. This might change in the future for performance reasons.

Parameters:
  • thing – The name of the Thing generating the event.

  • affordance – The name of the affordance generating the event.

  • message_type – The kind of affordance from which the event originates.

  • payload – Data specific to the event (e.g. property value, action status).
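The fields above can be illustrated with a small stand-in. This is a sketch only: MessageSketch is a hypothetical standard-library dataclass, not the library's Message class, and its manual check merely imitates the effect of pydantic validation for message_type. The Thing and affordance names are made up for illustration.

```python
from dataclasses import dataclass
from typing import Any, Literal, get_args

MessageType = Literal["property", "action"]


@dataclass
class MessageSketch:
    """Stand-in with the same fields as Message (not the library class)."""

    thing: str
    affordance: str
    message_type: MessageType
    payload: Any

    def __post_init__(self) -> None:
        # The real class relies on pydantic for validation; this manual
        # check only imitates the effect for the message_type field.
        if self.message_type not in get_args(MessageType):
            raise ValueError(f"invalid message_type: {self.message_type!r}")


# Hypothetical Thing and affordance names, chosen for illustration.
msg = MessageSketch(
    thing="stage",
    affordance="position",
    message_type="property",
    payload={"x": 1.0, "y": 2.0},
)
```

Passing a message_type other than 'property' or 'action' to this stand-in raises ValueError. The exception type raised by the real class may differ.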

thing: str
affordance: str
message_type: Literal['property', 'action']
payload: Any

class labthings_fastapi.message_broker.MessageBroker

A class that relays pub/sub messages.

This class relays messages to the streams that have subscribed to them. It does not format messages or handle protocol details such as websockets.

Each subscription requires an ObjectSendStream[Message]. Each time a Message matching the subscription parameters (thing and affordance) is published, it is sent on that stream.

The broker does not validate thing or affordance names: that’s up to the code calling MessageBroker.subscribe.
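The relay behaviour described above might be sketched as follows. This is not the library's implementation: BrokerSketch, MessageSketch and QueueStream are hypothetical stand-ins built on the standard library, with an asyncio.Queue wrapper taking the place of an anyio send stream. The nested mapping of thing name to affordance name to a weak set of streams follows the documented type of _subscriptions.

```python
import asyncio
import weakref
from collections import defaultdict
from dataclasses import dataclass
from typing import Any


@dataclass
class MessageSketch:
    """Stand-in for Message, without validation."""

    thing: str
    affordance: str
    message_type: str
    payload: Any


class QueueStream:
    """Stand-in for a send stream: anything with an async send()."""

    def __init__(self) -> None:
        self.queue: asyncio.Queue = asyncio.Queue()

    async def send(self, item: Any) -> None:
        await self.queue.put(item)


class BrokerSketch:
    """Illustrative relay, assumed to resemble MessageBroker only loosely."""

    def __init__(self) -> None:
        # thing name -> affordance name -> weakly referenced streams
        self._subscriptions = defaultdict(lambda: defaultdict(weakref.WeakSet))

    async def subscribe(self, thing: str, affordance: str, stream: QueueStream) -> None:
        self._subscriptions[thing][affordance].add(stream)

    async def publish(self, message: MessageSketch) -> None:
        # Copy to a list so the set cannot change size while we await.
        streams = list(self._subscriptions[message.thing][message.affordance])
        for stream in streams:
            await stream.send(message)


async def demo() -> list:
    broker = BrokerSketch()
    stream = QueueStream()
    await broker.subscribe("stage", "position", stream)
    await broker.publish(MessageSketch("stage", "position", "property", 1.5))
    # Published for a different affordance, so this stream never sees it.
    await broker.publish(MessageSketch("stage", "move", "action", "running"))
    return [stream.queue.get_nowait() for _ in range(stream.queue.qsize())]


received = asyncio.run(demo())
```

Only the message whose thing and affordance both match the subscription reaches the stream. Like the real broker, the sketch does not check that the names refer to anything that exists.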

Initialise the message broker.

_subscriptions: dict[str, dict[str, weakref.WeakSet[anyio.streams.memory.MemoryObjectSendStream[Message]]]]

async subscribe(thing: str, affordance: str, stream: anyio.streams.memory.MemoryObjectSendStream[Message]) → None

Subscribe to messages from a particular affordance.

This method must be called from the event loop, as the message broker is deliberately not thread-safe.

Parameters:
  • thing – The name of the Thing being subscribed to.

  • affordance – The name of the affordance being subscribed to.

  • stream – A stream to send the messages to.

Raises:
  • TypeError – if the thing or affordance argument is not a string.

async unsubscribe(thing: str, affordance: str, stream: anyio.streams.memory.MemoryObjectSendStream[Message]) → None

Unsubscribe a stream from messages from a particular affordance.

This function is often not necessary: streams are unsubscribed automatically when they are closed or finalised. The message broker only keeps a weak reference to each stream, so a stream is finalised and unsubscribed once the code that created it no longer holds a reference to it.

Parameters:
  • thing – The name of the Thing being unsubscribed from.

  • affordance – The name of the affordance being unsubscribed from.

  • stream – The stream to unsubscribe.

Raises:
  • KeyError – if there is no such subscription.

  • TypeError – if the thing or affordance argument is not a string.
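The automatic unsubscription described above rests on standard weak-reference behaviour, which the following sketch demonstrates with a bare weakref.WeakSet. StreamSketch is a hypothetical stand-in for a stream object. The last part shows that removing an absent item from a WeakSet raises KeyError; whether the library produces its KeyError in exactly this way is an assumption.

```python
import gc
import weakref


class StreamSketch:
    """Stand-in for a stream object; plain classes support weak references."""


subscribers = weakref.WeakSet()

stream = StreamSketch()
subscribers.add(stream)
count_while_alive = len(subscribers)

# Dropping the last strong reference lets the stream be finalised,
# and the WeakSet forgets it without an explicit unsubscribe.
del stream
gc.collect()  # only needed on interpreters without reference counting
count_after_drop = len(subscribers)

# Explicitly removing a stream that is not present raises KeyError.
try:
    subscribers.remove(StreamSketch())
    raised_key_error = False
except KeyError:
    raised_key_error = True
```

A practical consequence is that subscribing code must keep its own reference to the stream for as long as it wants to receive messages.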

async publish(message: Message) → None

Publish a message.

This async method will relay the message to any subscriber streams.

Parameters:
  • message – The message to send.

async close_streams() → None

Close all streams that are subscribed to receive messages.

This should be called when the server shuts down.
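As a rough illustration of what closing all subscribed streams involves, the sketch below walks a nested mapping shaped like _subscriptions and closes each stream it finds. The close_all helper and ClosableStream class are hypothetical. The sketch assumes that each stream exposes an async aclose() method, as anyio send streams do, and it does not claim to match how close_streams is implemented.

```python
import asyncio
import weakref


class ClosableStream:
    """Stand-in for a send stream with an async aclose() method."""

    def __init__(self) -> None:
        self.closed = False

    async def aclose(self) -> None:
        self.closed = True


async def close_all(subscriptions: dict) -> None:
    """Close every stream in a thing -> affordance -> WeakSet mapping."""
    for affordances in subscriptions.values():
        for streams in affordances.values():
            # Iterate over a copy: closing may drop references mid-loop.
            for stream in list(streams):
                await stream.aclose()


a, b = ClosableStream(), ClosableStream()
subscriptions = {
    "stage": {"position": weakref.WeakSet([a]), "move": weakref.WeakSet([b])},
}
asyncio.run(close_all(subscriptions))
```

Closing the send side at shutdown lets code that is waiting on the matching receive side finish, rather than wait for messages that will never arrive.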