labthings_fastapi.outputs
Support for additional output formats.
Currently, this submodule provides an MJPEG Stream output. See
MJPEGStreamDescriptor.
Submodules
Classes
MJPEGStream | Manage streaming images over HTTP as an MJPEG stream.
MJPEGStreamDescriptor | A descriptor that returns an MJPEGStream object when accessed.
Package Contents
- class labthings_fastapi.outputs.MJPEGStream(thing_server_interface: labthings_fastapi.thing_server_interface.ThingServerInterface, ringbuffer_size: int = 10)
Manage streaming images over HTTP as an MJPEG stream.
An MJPEGStream object handles accepting images (already in JPEG format) and streaming them to HTTP clients as a multipart response.
The minimum needed to make the stream work is to periodically call add_frame with JPEG image data.
To add a stream to a Thing, use the MJPEGStreamDescriptor, which will handle creating an MJPEGStream object on first access, and will also add it to the HTTP API.
The MJPEG stream buffers the last few frames (10 by default) and also has a hook to notify the size of each frame as it is added. The latter is used by OpenFlexure’s autofocus routine. The ringbuffer is intended to support clients receiving notification of new frames, and then retrieving the frame (shortly) afterwards.
Initialise an MJPEG stream.
See the class docstring for MJPEGStream. Note that it will often be initialised by MJPEGStreamDescriptor.
- Parameters:
thing_server_interface – the ThingServerInterface of the Thing associated with this stream. It’s used to run the async code that relays frames to open connections.
ringbuffer_size – The number of frames to retain in memory, to allow retrieval after the frame has been sent.
- _lock
- condition
- _streaming = False
- _ringbuffer: list[RingbufferEntry] = []
- _thing_server_interface
- reset(ringbuffer_size: int | None = None) None
Reset the stream and optionally change the ringbuffer size.
Discard all frames from the ringbuffer and reset the frame index.
- Parameters:
ringbuffer_size – the number of frames to keep in memory.
- async ringbuffer_entry(i: int) RingbufferEntry
Return the ith frame acquired by the camera.
The ringbuffer means we can retrieve frames even if they are not the latest frame. Specifying i also makes it simple to ensure that every frame in a stream is acquired.
- Parameters:
i – The index of the frame to read.
- Returns:
the frame, together with a timestamp and its index.
- Raises:
ValueError – if the frame is not available.
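The indexing behaviour described above can be illustrated with a small standalone ringbuffer. This is only a sketch of the general technique, not the library's implementation: the names Entry and FrameRing are hypothetical, and the real RingbufferEntry may hold different fields.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import List, Optional


@dataclass
class Entry:
    """Hypothetical stand-in for RingbufferEntry: a frame, a timestamp, an index."""

    frame: bytes
    timestamp: datetime
    index: int


class FrameRing:
    """Keep the most recent `size` frames, addressed by a running frame index."""

    def __init__(self, size: int = 10) -> None:
        self._slots: List[Optional[Entry]] = [None] * size
        self._next_index = 0

    def add(self, frame: bytes) -> int:
        """Store a frame, overwriting the oldest slot, and return its index."""
        i = self._next_index
        self._slots[i % len(self._slots)] = Entry(frame, datetime.now(), i)
        self._next_index += 1
        return i

    def entry(self, i: int) -> Entry:
        """Return frame `i`, or raise ValueError if it is not in the buffer."""
        if i < 0:
            raise ValueError(f"Frame {i} is not available")
        slot = self._slots[i % len(self._slots)]
        # The slot may be empty, or may already hold a newer (or older) frame.
        if slot is None or slot.index != i:
            raise ValueError(f"Frame {i} is not available")
        return slot
```

With a buffer of three slots and five frames added, frames 2, 3 and 4 can still be retrieved, while asking for frame 1 raises ValueError because its slot has been overwritten.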
- async buffer_for_reading(i: int) AsyncIterator[bytes]
Yield the ith frame as a bytes object.
Retrieve frame i from the ringbuffer.
This allows async code access to a frame in the ringbuffer. The frame will not be copied, and should not be written to. The frame may not exist after the function has completed (i.e. after any with statement has finished).
Using a context manager is intended to allow future versions of this code to manage access to the ringbuffer (e.g. allowing buffer reuse). Currently, buffers are always created as fresh bytes objects, so this context manager does not provide additional functionality over MJPEGStream.ringbuffer_entry.
- Parameters:
i – The index of the frame to read
- Yield:
The frame’s data as
bytes, along with timestamp and index.
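The context-manager pattern described for buffer_for_reading can be sketched with contextlib.asynccontextmanager. This is an illustration under stated assumptions, not the library's code: FrameStore and its put method are hypothetical, and frames are held in a plain dict rather than a ringbuffer.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, Dict


class FrameStore:
    """Hypothetical frame store that hands out frames inside a `with` block."""

    def __init__(self) -> None:
        self._frames: Dict[int, bytes] = {}

    def put(self, i: int, frame: bytes) -> None:
        """Store frame `i` (a stand-in for adding to a ringbuffer)."""
        self._frames[i] = frame

    @asynccontextmanager
    async def buffer_for_reading(self, i: int) -> AsyncIterator[bytes]:
        """Yield frame `i` without copying; raise ValueError if it is missing."""
        try:
            frame = self._frames[i]
        except KeyError:
            raise ValueError(f"Frame {i} is not available") from None
        # A future version could take a lock here and release it on exit.
        yield frame


async def demo() -> int:
    """Read one frame inside the context manager and return its size."""
    store = FrameStore()
    store.put(3, b"\xff\xd8x\xff\xd9")
    async with store.buffer_for_reading(3) as frame:
        # Only use `frame` inside this block.
        return len(frame)
```

Calling asyncio.run(demo()) reads the stored frame and returns its length. Callers that need the data after the block ends should copy it before leaving the with statement.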
- async next_frame() int
Wait for the next frame, and return its index.
This async function will yield until a new frame arrives, then return its index. The index may then be used to retrieve the new frame with MJPEGStream.buffer_for_reading.
- Returns:
the index of the next frame to arrive.
- Raises:
StopAsyncIteration – if the stream has stopped.
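Waiting for a new frame and being woken when the stream stops can be sketched with an asyncio.Condition. This is a minimal illustration of the pattern, assuming asyncio rather than the anyio machinery the library uses; FrameNotifier and its stop method are hypothetical names.

```python
import asyncio


class FrameNotifier:
    """Let async tasks wait for the index of the next frame."""

    def __init__(self) -> None:
        self._condition = asyncio.Condition()
        self._last_index = -1
        self._streaming = True

    async def notify_new_frame(self, i: int) -> None:
        """Record the newest frame index and wake every waiting task."""
        async with self._condition:
            self._last_index = i
            self._condition.notify_all()

    async def stop(self) -> None:
        """Mark the stream as stopped and wake every waiting task."""
        async with self._condition:
            self._streaming = False
            self._condition.notify_all()

    async def next_frame(self) -> int:
        """Wait for a frame newer than the current one and return its index."""
        async with self._condition:
            seen = self._last_index
            await self._condition.wait_for(
                lambda: self._last_index != seen or not self._streaming
            )
            if self._last_index == seen:
                # Woken because the stream stopped, not because a frame arrived.
                raise StopAsyncIteration("The stream has stopped")
            return self._last_index


async def demo() -> int:
    """Start a waiter, publish frame 7, and return the index the waiter saw."""
    notifier = FrameNotifier()
    waiter = asyncio.create_task(notifier.next_frame())
    await asyncio.sleep(0)  # give the waiter a chance to start waiting
    await notifier.notify_new_frame(7)
    return await asyncio.wait_for(waiter, timeout=1)
```

Running asyncio.run(demo()) starts one waiter, publishes a frame, and returns the index delivered to the waiter.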
- async grab_frame() bytes
Wait for the next frame, and return it.
This copies the frame for safety, so there is no need to release or return the buffer.
- Returns:
The next JPEG frame, as a bytes object.
- async next_frame_size() int
Wait for the next frame and return its size.
This is useful if you want to use JPEG size as a sharpness metric.
- Returns:
The size of the next JPEG frame, in bytes.
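As an illustration of using JPEG size as a sharpness metric, the sketch below picks the focus position whose frame compressed to the largest size. It assumes that, at fixed resolution and quality settings, a larger JPEG indicates more fine detail. The helper name sharpest_position and the example numbers are hypothetical, and this is not OpenFlexure's autofocus routine.

```python
from typing import Dict


def sharpest_position(sizes_by_position: Dict[float, int]) -> float:
    """Return the position whose frame had the largest JPEG size."""
    if not sizes_by_position:
        raise ValueError("No measurements supplied")
    return max(sizes_by_position, key=sizes_by_position.__getitem__)


# Hypothetical measurements: focus position mapped to frame size in bytes.
measurements = {-10.0: 41_000, 0.0: 52_000, 10.0: 47_000}
best = sharpest_position(measurements)
```

In practice each size would come from awaiting next_frame_size after moving the stage to the corresponding position.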
- async frame_async_generator() AsyncGenerator[bytes, None]
Yield frames as bytes objects.
This generator will return frames from the MJPEG stream. These are taken from the ringbuffer by MJPEGStream.buffer_for_reading and so should have any buffer-management considerations taken care of.
Code using this generator should complete as quickly as possible, because future implementations may hold a lock while this function yields. If lengthy processing is required, please copy the buffer and continue processing elsewhere.
Note that this will wait for a new frame each time. There is no guarantee that we won’t skip frames.
- Yield:
the frames in sequence, as a bytes object containing JPEG data.
- async mjpeg_stream_response() MJPEGStreamResponse
Return a StreamingResponse that streams an MJPEG stream.
This wraps each frame with the required header to make the multipart stream work, and sends it to the client via a streaming response. It is sufficient to show up as a video in an img tag, or to be streamed to disk as an MJPEG format video.
- Returns:
a streaming response in MJPEG format.
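The per-frame wrapping can be sketched as follows. This shows the common way of framing MJPEG over HTTP, where the response is served with a multipart/x-mixed-replace content type naming the boundary. The boundary string "frame", the exact headers, and the function name multipart_chunks are assumptions and may differ from what MJPEGStreamResponse actually sends.

```python
from typing import Iterable, Iterator


def multipart_chunks(
    frames: Iterable[bytes], boundary: bytes = b"frame"
) -> Iterator[bytes]:
    """Wrap each JPEG frame in a multipart part header (assumed layout)."""
    for frame in frames:
        header = b"".join(
            [
                b"--", boundary, b"\r\n",
                b"Content-Type: image/jpeg\r\n",
                b"Content-Length: ", str(len(frame)).encode("ascii"), b"\r\n",
                b"\r\n",  # blank line separates the part headers from the body
            ]
        )
        yield header + frame + b"\r\n"
```

Each yielded chunk is one complete part, so a client such as an img tag can replace the displayed image as soon as a part has been received.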
- add_frame(frame: bytes) None
Add a JPEG to the MJPEG stream.
This function adds a frame to the stream. It may be called from threaded code, but uses an anyio.from_thread.BlockingPortal to call code in the anyio event loop, which is where notifications are handled.
- Parameters:
frame – The frame to add
- Raises:
ValueError – if the supplied frame does not start with the JPEG start bytes and end with the end bytes.
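A check of the kind described in the Raises entry can be sketched using the standard JPEG markers: a JPEG file begins with the start-of-image marker FF D8 and ends with the end-of-image marker FF D9. The function name check_jpeg is hypothetical, and the library's own validation may differ in detail.

```python
JPEG_START = b"\xff\xd8"  # start-of-image (SOI) marker
JPEG_END = b"\xff\xd9"  # end-of-image (EOI) marker


def check_jpeg(frame: bytes) -> None:
    """Raise ValueError unless `frame` starts and ends with the JPEG markers."""
    if not (frame.startswith(JPEG_START) and frame.endswith(JPEG_END)):
        raise ValueError("Frame does not look like a complete JPEG image")
```

This only inspects the first and last two bytes, so it catches truncated or non-JPEG data cheaply but does not verify that the image decodes.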
- async notify_new_frame(i: int) None
Notify any waiting tasks that a new frame is available.
- Parameters:
i – The number of the frame (counted up from when the server starts)
- async notify_stream_stopped() None
Raise an exception in any waiting tasks to signal the stream has stopped.
This should be run only when streaming has stopped, i.e. self._streaming is False, and an error will be raised if this isn’t the case.
- Raises:
RuntimeError – if the stream is still streaming.
- class labthings_fastapi.outputs.MJPEGStreamDescriptor(**kwargs: Any)
A descriptor that returns an MJPEGStream object when accessed.
If this descriptor is added to a Thing, it will create an MJPEGStream object when it is first accessed. It will also add two HTTP endpoints, one with the name of the descriptor serving the MJPEG stream, and another with /viewer appended, which serves a basic HTML page that views the stream.
This descriptor does not currently show up in the Thing Description.
Initialise an MJPEGStreamDescriptor.
- Parameters:
**kwargs – keyword arguments are passed to the initialiser of
MJPEGStream.
- _kwargs: Any
- __set_name__(_owner: labthings_fastapi.thing.Thing, name: str) None
Remember the name to which we are assigned.
The name is important, as it will set the URL of the HTTP endpoint used to access the stream.
- Parameters:
_owner – the Thing to which we are attached.
name – the name to which this descriptor is assigned.
- __get__(obj: Literal[None], type: MJPEGStreamDescriptor.__get__.type | None = None) Self
- __get__(obj: labthings_fastapi.thing.Thing, type: MJPEGStreamDescriptor.__get__.type | None = None) MJPEGStream
Return the MJPEG Stream, or the descriptor object.
When accessed on the class, this __get__ method will return the descriptor object. This allows LabThings to add it to the HTTP API.
When accessed on the object, an MJPEGStream is returned.
- Parameters:
obj – the host Thing, or None if accessed on the class.
type – the class on which we are defined.
- Returns:
an
MJPEGStream, or this descriptor.
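The behaviour of __set_name__ and __get__ described above follows Python's descriptor protocol, and can be sketched generically. This is not the library's code: LazyDescriptor, FakeStream and Camera are hypothetical names, and caching the created object in the instance's __dict__ is an assumption about how "create on first access" might be done.

```python
from typing import Any, Callable


class LazyDescriptor:
    """Create an object per instance on first access (illustrative only)."""

    def __init__(self, factory: Callable[..., Any], **kwargs: Any) -> None:
        self._factory = factory
        self._kwargs = kwargs
        self.name = ""

    def __set_name__(self, owner: type, name: str) -> None:
        # Called when the class body is executed; remembers the attribute name.
        self.name = name

    def __get__(self, obj: Any, objtype: Any = None) -> Any:
        if obj is None:
            # Accessed on the class: return the descriptor itself.
            return self
        value = self._factory(**self._kwargs)
        # Cache on the instance. Because this descriptor defines no __set__,
        # later lookups find the instance attribute first and skip __get__.
        obj.__dict__[self.name] = value
        return value


class FakeStream:
    """Hypothetical stand-in for MJPEGStream."""

    def __init__(self, ringbuffer_size: int = 10) -> None:
        self.ringbuffer_size = ringbuffer_size


class Camera:
    stream = LazyDescriptor(FakeStream, ringbuffer_size=5)
```

Accessing Camera.stream returns the descriptor, while accessing stream on a Camera instance returns a FakeStream that is created once and then reused for that instance.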
- async viewer_page(url: str) fastapi.responses.HTMLResponse
Generate a trivial viewer page for the stream.
- Parameters:
url – the URL of the stream.
- Returns:
a trivial HTML page that views the stream.
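A trivial viewer of this kind needs little more than an img tag pointing at the stream URL. The sketch below builds such a page as a string. The markup and the name viewer_html are assumptions, and the real method returns a fastapi.responses.HTMLResponse rather than a plain string.

```python
from html import escape


def viewer_html(url: str) -> str:
    """Return a minimal HTML page that displays the stream at `url`."""
    safe_url = escape(url, quote=True)  # avoid breaking out of the attribute
    return (
        "<!DOCTYPE html>"
        "<html><head><title>Stream viewer</title></head>"
        f'<body><img src="{safe_url}" alt="MJPEG stream"></body></html>'
    )
```

Browsers render a multipart MJPEG response inside an img element as a continuously updating image, which is why no script is needed.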
- add_to_fastapi(app: fastapi.FastAPI, thing: labthings_fastapi.thing.Thing) None
Add the stream to the FastAPI app.
We create two endpoints, one for the MJPEG stream (using the name of the descriptor, relative to the host Thing) and one serving a basic viewer.
The example code below would create endpoints at /camera/stream and /camera/stream/viewer.

    import labthings_fastapi as lt
    from labthings_fastapi.outputs import MJPEGStreamDescriptor


    class Camera(lt.Thing):
        stream = MJPEGStreamDescriptor()


    server = lt.ThingServer.from_things({"camera": Camera})

- Parameters:
app – the fastapi.FastAPI application to which we are being added.
thing – the host Thing instance.