labthings_fastapi.outputs
=========================

.. py:module:: labthings_fastapi.outputs

.. autoapi-nested-parse::

   Support for additional output formats.

   Currently, this submodule provides an MJPEG Stream output. See `.MJPEGStreamDescriptor`.


Submodules
----------

.. toctree::
   :maxdepth: 1

   /autoapi/labthings_fastapi/outputs/blob/index
   /autoapi/labthings_fastapi/outputs/mjpeg_stream/index


Classes
-------

.. autoapisummary::

   labthings_fastapi.outputs.MJPEGStream
   labthings_fastapi.outputs.MJPEGStreamDescriptor


Package Contents
----------------

.. py:class:: MJPEGStream(thing_server_interface: labthings_fastapi.thing_server_interface.ThingServerInterface, ringbuffer_size: int = 10)

   Manage streaming images over HTTP as an MJPEG stream.

   An MJPEGStream object handles accepting images (already in
   JPEG format) and streaming them to HTTP clients as a multipart
   response.

   The minimum needed to make the stream work is to periodically
   call `add_frame` with JPEG image data.

   To add a stream to a `~lt.Thing`, use the `.MJPEGStreamDescriptor`
   which will handle creating an `.MJPEGStream` object on first access,
   and will also add it to the HTTP API.

   The MJPEG stream buffers the last few frames (10 by default) and
   also provides a hook that reports the size of each frame as it is
   added. The latter is used by OpenFlexure's autofocus routine. The
   ringbuffer is intended to support clients receiving notification of
   new frames, and then retrieving the frame (shortly) afterwards.

   Initialise an MJPEG stream.

   See the class docstring for `.MJPEGStream`. Note that it will
   often be initialised by `.MJPEGStreamDescriptor`.

   :param thing_server_interface: the `~lt.ThingServerInterface` of the
       `~lt.Thing` associated with this stream. It's used to run the async
       code that relays frames to open connections.
   :param ringbuffer_size: The number of frames to retain in memory,
       to allow retrieval after the frame has been sent.


   .. py:attribute:: _lock


   .. py:attribute:: condition


   .. py:attribute:: _streaming
      :value: False



   .. py:attribute:: _ringbuffer
      :type:  list[RingbufferEntry]
      :value: []



   .. py:attribute:: _thing_server_interface


   .. py:method:: reset(ringbuffer_size: Optional[int] = None) -> None

      Reset the stream and optionally change the ringbuffer size.

      Discard all frames from the ringbuffer and reset the frame index.

      :param ringbuffer_size: the number of frames to keep in memory.



   .. py:method:: stop() -> None

      Stop the stream.

      Stop the stream and cause all clients to disconnect.



   .. py:method:: ringbuffer_entry(i: int) -> RingbufferEntry
      :async:


      Return the ith frame acquired by the camera.

      The ringbuffer means we can retrieve frames even if they are not
      the latest frame. Specifying ``i`` also makes it simple to ensure
      that every frame in a stream is acquired.

      :param i: The index of the frame to read.

      :return: the frame, together with a timestamp and its index.

      :raise ValueError: if the frame is not available.



   .. py:method:: buffer_for_reading(i: int) -> AsyncIterator[bytes]
      :async:


      Yield the ith frame as a bytes object.

      Retrieve frame ``i`` from the ringbuffer. This allows async code
      access to a frame in the ringbuffer. The frame will not be copied,
      and should not be written to. The frame may not exist after the
      function has completed (i.e. after any ``with`` statement has
      finished).

      Using a context manager is intended to allow future versions of this
      code to manage access to the ringbuffer (e.g. allowing buffer reuse).
      Currently, buffers are always created as fresh `bytes` objects, so
      this context manager does not provide additional functionality
      over `.MJPEGStream.ringbuffer_entry`.

      :param i: The index of the frame to read

      :yield: The frame's data as `bytes`, along with timestamp and index.



   .. py:method:: next_frame() -> int
      :async:


      Wait for the next frame, and return its index.

      This async function will yield until a new frame arrives, then return
      its index. The index may then be used to retrieve the new frame with
      `.MJPEGStream.buffer_for_reading`.

      :return: the index of the next frame to arrive.

      :raise StopAsyncIteration: if the stream has stopped.



   .. py:method:: grab_frame() -> bytes
      :async:


      Wait for the next frame, and return it.

      This copies the frame for safety, so there is no need to release
      or return the buffer.

      :return: The next JPEG frame, as a `bytes` object.



   .. py:method:: next_frame_size() -> int
      :async:


      Wait for the next frame and return its size.

      This is useful if you want to use JPEG size as a sharpness metric.

      :return: The size of the next JPEG frame, in bytes.



   .. py:method:: frame_async_generator() -> AsyncGenerator[bytes, None]
      :async:


      Yield frames as bytes objects.

      This generator yields frames from the MJPEG stream. These are
      taken from the ringbuffer by `.MJPEGStream.buffer_for_reading` and so
      should have any buffer-management considerations taken care of.

      Code using this generator should complete as quickly as possible,
      because future implementations may hold a lock while this function
      yields. If lengthy processing is required, please copy the buffer
      and continue processing elsewhere.

      Note that this will wait for a new frame each time. There is no
      guarantee that frames will not be skipped.

      :yield: the frames in sequence, as a `bytes` object containing JPEG data.



   .. py:method:: mjpeg_stream_response() -> MJPEGStreamResponse
      :async:


      Return a StreamingResponse that streams an MJPEG stream.

      This wraps each frame with the required header to make the
      multipart stream work, and sends it to the client via a
      streaming response. It is sufficient to show up as a video in an
      ``img`` tag, or to be streamed to disk as an MJPEG format video.

      :return: a streaming response in MJPEG format.



   .. py:method:: add_frame(frame: bytes) -> None

      Add a JPEG to the MJPEG stream.

      This function adds a frame to the stream. It may be called from
      threaded code, but uses an `anyio.from_thread.BlockingPortal` to
      call code in the `anyio` event loop, which is where notifications
      are handled.

      :param frame: The frame to add

      :raise ValueError: if the supplied frame does not start with the JPEG
          start bytes and end with the end bytes.



   .. py:method:: notify_new_frame(i: int) -> None
      :async:


      Notify any waiting tasks that a new frame is available.

      :param i: The number of the frame (which counts up from when the server starts)



   .. py:method:: notify_stream_stopped() -> None
      :async:


      Raise an exception in any waiting tasks to signal the stream has stopped.

      This should be run only when streaming has stopped, i.e.
      ``self._streaming`` is ``False``; an error will be raised if this
      isn't the case.

      :raises RuntimeError: if the stream is still streaming.



.. py:class:: MJPEGStreamDescriptor(**kwargs: Any)

   A descriptor that returns an MJPEGStream object when accessed.

   If this descriptor is added to a `~lt.Thing`, it will create an
   `.MJPEGStream` object when it is first accessed. It will also add two
   HTTP endpoints, one with the name of the descriptor serving the MJPEG
   stream, and another with ``/viewer`` appended, which serves a basic
   HTML page that views the stream.

   This descriptor does not currently show up in the :ref:`wot_td`.

   Initialise an MJPEGStreamDescriptor.

   :param \**kwargs: keyword arguments are passed to the initialiser of
       `.MJPEGStream`.


   .. py:attribute:: _kwargs
      :type:  Any


   .. py:method:: __set_name__(_owner: labthings_fastapi.thing.Thing, name: str) -> None

      Remember the name to which we are assigned.

      The name is important, as it will set the URL of the HTTP endpoint used
      to access the stream.

      :param _owner: the `~lt.Thing` to which we are attached.
      :param name: the name to which this descriptor is assigned.



   .. py:method:: __get__(obj: Literal[None], type: MJPEGStreamDescriptor.__get__.type | None = None) -> Self
                  __get__(obj: labthings_fastapi.thing.Thing, type: MJPEGStreamDescriptor.__get__.type | None = None) -> MJPEGStream

      Return the MJPEG Stream, or the descriptor object.

      When accessed on the class, this ``__get__`` method will return the
      descriptor object. This allows LabThings to add it to the HTTP API.

      When accessed on the object, an `.MJPEGStream` is returned.

      :param obj: the host `~lt.Thing`, or ``None`` if accessed on the class.
      :param type: the class on which we are defined.

      :return: an `.MJPEGStream`, or this descriptor.



   .. py:method:: viewer_page(url: str) -> fastapi.responses.HTMLResponse
      :async:


      Generate a trivial viewer page for the stream.

      :param url: the URL of the stream.

      :return: a trivial HTML page that views the stream.



   .. py:method:: add_to_fastapi(app: fastapi.FastAPI, thing: labthings_fastapi.thing.Thing) -> None

      Add the stream to the FastAPI app.

      We create two endpoints, one for the MJPEG stream (using the name of
      the descriptor, relative to the host `~lt.Thing`) and one serving a
      basic viewer.

      The example code below would create endpoints at ``/camera/stream``
      and ``/camera/stream/viewer``.

      .. code-block:: python

         import labthings_fastapi as lt
         from labthings_fastapi.outputs import MJPEGStreamDescriptor


         class Camera(lt.Thing):
             stream = MJPEGStreamDescriptor()


         server = lt.ThingServer.from_things({"camera": Camera})

      :param app: the `fastapi.FastAPI` application to which we are being added.
      :param thing: the host `~lt.Thing` instance.
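
The frame handling described for `.MJPEGStream` above (JPEG marker checking in ``add_frame``, lookup by index in the ringbuffer, and per-frame multipart headers) can be illustrated with a minimal, standalone sketch. This is not the LabThings implementation: ``Entry``, ``TinyRingbuffer`` and ``multipart_chunk`` are hypothetical names introduced here, the ``frame`` boundary string is an assumption, and the real classes add locking, async notification and HTTP integration that the sketch leaves out.

.. code-block:: python

   import time
   from collections import deque
   from dataclasses import dataclass

   JPEG_START = b"\xff\xd8"  # JPEG start-of-image (SOI) marker
   JPEG_END = b"\xff\xd9"  # JPEG end-of-image (EOI) marker


   @dataclass
   class Entry:
       """Hypothetical stand-in for ``RingbufferEntry``."""

       frame: bytes
       timestamp: float
       index: int


   class TinyRingbuffer:
       """Toy model of the ringbuffer (assumed behaviour, not LabThings code)."""

       def __init__(self, size: int = 10) -> None:
           # A deque with maxlen silently discards the oldest entry when full.
           self._entries = deque(maxlen=size)
           self._next_index = 0

       def add_frame(self, frame: bytes) -> int:
           """Store a JPEG frame and return the index assigned to it."""
           if not (frame.startswith(JPEG_START) and frame.endswith(JPEG_END)):
               raise ValueError("frame is not delimited by JPEG start/end markers")
           index = self._next_index
           self._entries.append(Entry(frame, time.time(), index))
           self._next_index += 1
           return index

       def entry(self, i: int) -> Entry:
           """Return frame ``i``, or raise ValueError if it is not retained."""
           for candidate in self._entries:
               if candidate.index == i:
                   return candidate
           raise ValueError(f"frame {i} is not in the ringbuffer")


   def multipart_chunk(frame: bytes, boundary: bytes = b"frame") -> bytes:
       """Wrap one JPEG as a single part of a multipart response body."""
       header = b"--" + boundary + b"\r\nContent-Type: image/jpeg\r\n\r\n"
       return header + frame + b"\r\n"

With ``size=2``, adding three frames leaves only indices 1 and 2 retrievable, and asking for index 0 raises ``ValueError``. This mirrors the behaviour documented for ``ringbuffer_entry``, where a client must fetch a frame shortly after being notified of it.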