labthings_fastapi.outputs.mjpeg_stream

Module Contents

Classes

RingbufferEntry

A single entry in a ringbuffer

MJPEGStreamResponse

MJPEGStream

Manage streaming images over HTTP as an MJPEG stream

MJPEGStreamDescriptor

A descriptor that returns a MJPEGStream object when accessed

API

class labthings_fastapi.outputs.mjpeg_stream.RingbufferEntry

A single entry in a ringbuffer

frame: bytes

None

timestamp: datetime.datetime

None

index: int

None

class labthings_fastapi.outputs.mjpeg_stream.MJPEGStreamResponse(gen: AsyncGenerator[bytes, None], status_code: int = 200)

Bases: fastapi.responses.StreamingResponse

media_type

‘multipart/x-mixed-replace; boundary=frame’

__init__(gen: AsyncGenerator[bytes, None], status_code: int = 200)

A StreamingResponse that streams an MJPEG stream

This response is initialised with an async generator that yields bytes objects, each of which is a JPEG file. We add the –frame markers and mime types that mark it as an MJPEG stream. This is sufficient to enable it to work in an img tag, with the src set to the MJPEG stream’s endpoint.

It expects an async generator that supplies individual JPEGs to be streamed, such as the one provided by .MJPEGStream.

NB the status_code argument is used by FastAPI to set the status code of the response in OpenAPI.

async mjpeg_async_generator() AsyncGenerator[bytes, None]

A generator yielding an MJPEG stream

class labthings_fastapi.outputs.mjpeg_stream.MJPEGStream(ringbuffer_size: int = 10)

Manage streaming images over HTTP as an MJPEG stream

An MJPEGStream object handles accepting images (already in JPEG format) and streaming them to HTTP clients as a multipart response.

The minimum needed to make the stream work is to periodically call add_frame with JPEG image data.

To add a stream to a .Thing, use the .MJPEGStreamDescriptor which will handle creating an MJPEGStream object on first access, and will also add it to the HTTP API.

The MJPEG stream buffers the last few frames (10 by default) and also has a hook to notify the size of each frame as it is added. The latter is used by OpenFlexure’s autofocus routine.

__init__(ringbuffer_size: int = 10)
reset(ringbuffer_size: Optional[int] = None)

Reset the stream and optionally change the ringbuffer size

stop(portal: anyio.from_thread.BlockingPortal)

Stop the stream

async ringbuffer_entry(i: int) labthings_fastapi.outputs.mjpeg_stream.RingbufferEntry

Return the ith frame acquired by the camera

Parameters:

i – The index of the frame to read

async buffer_for_reading(i: int) AsyncIterator[bytes]

Yields the ith frame as a bytes object

Parameters:

i – The index of the frame to read

async next_frame() int

Wait for the next frame, and return its index

Raises:

StopAsyncIteration – if the stream has stopped.

async grab_frame() bytes

Wait for the next frame, and return it

This copies the frame for safety, so we can release the read lock on the buffer.

async next_frame_size() int

Wait for the next frame and return its size

This is useful if you want to use JPEG size as a sharpness metric.

async frame_async_generator() AsyncGenerator[bytes, None]

A generator that yields frames as bytes

async mjpeg_stream_response() labthings_fastapi.outputs.mjpeg_stream.MJPEGStreamResponse

Return a StreamingResponse that streams an MJPEG stream

add_frame(frame: bytes, portal: anyio.from_thread.BlockingPortal) None

Return the next buffer in the ringbuffer to write to

Parameters:
  • frame – The frame to add

  • portal – The blocking portal to use for scheduling tasks. This is necessary because tasks are handled asynchronously. The blocking portal may be obtained with a dependency, in labthings_fastapi.dependencies.blocking_portal.BlockingPortal.

async notify_new_frame(i: int) None

Notify any waiting tasks that a new frame is available.

Parameters:

i – The number of the frame (which counts up since the server starts)

async notify_stream_stopped() None

Raise an exception in any waiting tasks to signal the stream has stopped.

class labthings_fastapi.outputs.mjpeg_stream.MJPEGStreamDescriptor(**kwargs)

A descriptor that returns a MJPEGStream object when accessed

If this descriptor is added to a .Thing, it will create an .MJPEGStream object when it is first accessed. It will also add two HTTP endpoints, one with the name of the descriptor serving the MJPEG stream, and another with /viewer appended, which serves a basic HTML page that views the stream.

__init__(**kwargs)
__set_name__(owner, name)
__get__(obj: Optional[labthings_fastapi.thing.Thing], type=None) Union[labthings_fastapi.outputs.mjpeg_stream.MJPEGStream, typing_extensions.Self]

The value of the property

If obj is none (i.e. we are getting the attribute of the class), we return the descriptor.

If no getter is set, we’ll return either the initial value, or the value from the object’s __dict__, i.e. we behave like a variable.

If a getter is set, we will use it, unless the property is observable, at which point the getter is only ever used once, to set the initial value.

async viewer_page(url: str) fastapi.responses.HTMLResponse
add_to_fastapi(app: fastapi.FastAPI, thing: labthings_fastapi.thing.Thing)

Add the stream to the FastAPI app