labthings_fastapi.outputs

Support for additional output formats.

Currently, this submodule provides an MJPEG Stream output. See MJPEGStreamDescriptor.

Submodules

Classes

MJPEGStream

Manage streaming images over HTTP as an MJPEG stream.

MJPEGStreamDescriptor

A descriptor that returns a MJPEGStream object when accessed.

Package Contents

class labthings_fastapi.outputs.MJPEGStream(thing_server_interface: labthings_fastapi.thing_server_interface.ThingServerInterface, ringbuffer_size: int = 10)

Manage streaming images over HTTP as an MJPEG stream.

An MJPEGStream object handles accepting images (already in JPEG format) and streaming them to HTTP clients as a multipart response.

The minimum needed to make the stream work is to periodically call add_frame with JPEG image data.

To add a stream to a Thing, use the MJPEGStreamDescriptor which will handle creating an MJPEGStream object on first access, and will also add it to the HTTP API.

The MJPEG stream buffers the last few frames (10 by default) and also has a hook to notify the size of each frame as it is added. The latter is used by OpenFlexure’s autofocus routine. The ringbuffer is intended to support clients receiving notification of new frames, and then retrieving the frame (shortly) afterwards.

Initialise an MJPEG stream.

See the class docstring for MJPEGStream. Note that it will often be initialised by MJPEGStreamDescriptor.

Parameters:
  • thing_server_interface – the ThingServerInterface of the Thing associated with this stream. It’s used to run the async code that relays frames to open connections.

  • ringbuffer_size – The number of frames to retain in memory, to allow retrieval after the frame has been sent.

_lock
condition
_streaming = False
_ringbuffer: list[RingbufferEntry] = []
_thing_server_interface
reset(ringbuffer_size: int | None = None) None

Reset the stream and optionally change the ringbuffer size.

Discard all frames from the ringbuffer and reset the frame index.

Parameters:

ringbuffer_size – the number of frames to keep in memory.

stop() None

Stop the stream.

Stop the stream and cause all clients to disconnect.

async ringbuffer_entry(i: int) RingbufferEntry

Return the ith frame acquired by the camera.

The ringbuffer means we can retrieve frames even if they are not the latest frame. Specifying i also makes it simple to ensure that every frame in a stream is acquired.

Parameters:

i – The index of the frame to read.

Returns:

the frame, together with a timestamp and its index.

Raises:

ValueError – if the frame is not available.

async buffer_for_reading(i: int) AsyncIterator[bytes]

Yield the ith frame as a bytes object.

Retrieve frame i from the ringbuffer.

This allows async code access to a frame in the ringbuffer. The frame will not be copied, and should not be written to. The frame may not exist after the function has completed (i.e. after any with statement has finished).

Using a context manager is intended to allow future versions of this code to manage access to the ringbuffer (e.g. allowing buffer reuse). Currently, buffers are always created as fresh bytes objects, so this context manager does not provide additional functionality over MJPEGStream.ringbuffer_entry.

Parameters:

i – The index of the frame to read

Yield:

The frame’s data as bytes, along with timestamp and index.

async next_frame() int

Wait for the next frame, and return its index.

This async function will yield until a new frame arrives, then return its index. The index may then be used to retrieve the new frame with MJPEGStream.buffer_for_reading.

Returns:

the index of the next frame to arrive.

Raises:

StopAsyncIteration – if the stream has stopped.

async grab_frame() bytes

Wait for the next frame, and return it.

This copies the frame for safety, so there is no need to release or return the buffer.

Returns:

The next JPEG frame, as a bytes object.

async next_frame_size() int

Wait for the next frame and return its size.

This is useful if you want to use JPEG size as a sharpness metric.

Returns:

The size of the next JPEG frame, in bytes.

async frame_async_generator() AsyncGenerator[bytes, None]

Yield frames as bytes objects.

This generator will return frames from the MJPEG stream. These are taken from the ringbuffer by MJPEGStream.buffer_for_reading and so should have any buffer-management considerations taken care of.

Code using this generator should complete as quickly as possible, because future implementations may hold a lock while this function yields. If lengthy processing is required, please copy the buffer and continue processing elsewhere.

Note that this will wait for a new frame each time. There is no guarantee that we won’t skip frames.

Yield:

the frames in sequence, as a bytes object containing JPEG data.

async mjpeg_stream_response() MJPEGStreamResponse

Return a StreamingResponse that streams an MJPEG stream.

This wraps each frame with the required header to make the multipart stream work, and sends it to the client via a streaming response. It is sufficient to show up as a video in an img tag, or to be streamed to disk as an MJPEG format video.

Returns:

a streaming response in MJPEG format.

add_frame(frame: bytes) None

Add a JPEG to the MJPEG stream.

This function adds a frame to the stream. It may be called from threaded code, but uses an anyio.from_thread.BlockingPortal to call code in the anyio event loop, which is where notifications are handled.

Parameters:

frame – The frame to add

Raises:

ValueError – if the supplied frame does not start with the JPEG start bytes and end with the end bytes.

async notify_new_frame(i: int) None

Notify any waiting tasks that a new frame is available.

Parameters:

i – The number of the frame (which counts up since the server starts)

async notify_stream_stopped() None

Raise an exception in any waiting tasks to signal the stream has stopped.

This should be run only when streaming has stopped, i.e. self._streaming is False and an error will be raised if this isn’t the case.

Raises:

RuntimeError – if the stream is still streaming.

class labthings_fastapi.outputs.MJPEGStreamDescriptor(**kwargs: Any)

A descriptor that returns a MJPEGStream object when accessed.

If this descriptor is added to a Thing, it will create an MJPEGStream object when it is first accessed. It will also add two HTTP endpoints, one with the name of the descriptor serving the MJPEG stream, and another with /viewer appended, which serves a basic HTML page that views the stream.

This descriptor does not currently show up in the Thing Description.

Initialise an MJPEGStreamDescriptor.

Parameters:

**kwargs – keyword arguments are passed to the initialiser of MJPEGStream.

_kwargs: Any
__set_name__(_owner: labthings_fastapi.thing.Thing, name: str) None

Remember the name to which we are assigned.

The name is important, as it will set the URL of the HTTP endpoint used to access the stream.

Parameters:
  • _owner – the Thing to which we are attached.

  • name – the name to which this descriptor is assigned.

__get__(obj: Literal[None], type: MJPEGStreamDescriptor.__get__.type | None = None) Self
__get__(obj: labthings_fastapi.thing.Thing, type: MJPEGStreamDescriptor.__get__.type | None = None) MJPEGStream

Return the MJPEG Stream, or the descriptor object.

When accessed on the class, this __get__ method will return the descriptor object. This allows LabThings to add it to the HTTP API.

When accessed on the object, an MJPEGStream is returned.

Parameters:
  • obj – the host Thing, or None if accessed on the class.

  • type – the class on which we are defined.

Returns:

an MJPEGStream, or this descriptor.

async viewer_page(url: str) fastapi.responses.HTMLResponse

Generate a trivial viewer page for the stream.

Parameters:

url – the URL of the stream.

Returns:

a trivial HTML page that views the stream.

add_to_fastapi(app: fastapi.FastAPI, thing: labthings_fastapi.thing.Thing) None

Add the stream to the FastAPI app.

We create two endpoints, one for the MJPEG stream (using the name of the descriptor, relative to the host Thing) and one serving a basic viewer.

The example code below would create endpoints at /camera/stream and /camera/stream/viewer.

import labthings_fastapi as lt


class Camera(lt.Thing):
    stream = MJPEGStreamDescriptor()


server = lt.ThingServer.from_things({"camera": Camera})
Parameters:
  • app – the fastapi.FastAPI application to which we are being added.

  • thing – the host Thing instance.