# {py:mod}`labthings_fastapi.outputs.mjpeg_stream`

```{py:module} labthings_fastapi.outputs.mjpeg_stream
```

```{autodoc2-docstring} labthings_fastapi.outputs.mjpeg_stream
:allowtitles:
```

## Module Contents

### Classes

````{list-table}
:class: autosummary longtable
:align: left

* - {py:obj}`RingbufferEntry`
  - ```{autodoc2-docstring} labthings_fastapi.outputs.mjpeg_stream.RingbufferEntry
    :summary:
    ```
* - {py:obj}`MJPEGStreamResponse`
  - ```{autodoc2-docstring} labthings_fastapi.outputs.mjpeg_stream.MJPEGStreamResponse
    :summary:
    ```
* - {py:obj}`MJPEGStream`
  - ```{autodoc2-docstring} labthings_fastapi.outputs.mjpeg_stream.MJPEGStream
    :summary:
    ```
* - {py:obj}`MJPEGStreamDescriptor`
  - ```{autodoc2-docstring} labthings_fastapi.outputs.mjpeg_stream.MJPEGStreamDescriptor
    :summary:
    ```
````

### API

`````{py:class} RingbufferEntry
:canonical: labthings_fastapi.outputs.mjpeg_stream.RingbufferEntry

```{autodoc2-docstring} labthings_fastapi.outputs.mjpeg_stream.RingbufferEntry
```

````{py:attribute} frame
:canonical: labthings_fastapi.outputs.mjpeg_stream.RingbufferEntry.frame
:type: bytes
:value: >
   None

```{autodoc2-docstring} labthings_fastapi.outputs.mjpeg_stream.RingbufferEntry.frame
```

````

````{py:attribute} timestamp
:canonical: labthings_fastapi.outputs.mjpeg_stream.RingbufferEntry.timestamp
:type: datetime.datetime
:value: >
   None

```{autodoc2-docstring} labthings_fastapi.outputs.mjpeg_stream.RingbufferEntry.timestamp
```

````

````{py:attribute} index
:canonical: labthings_fastapi.outputs.mjpeg_stream.RingbufferEntry.index
:type: int
:value: >
   None

```{autodoc2-docstring} labthings_fastapi.outputs.mjpeg_stream.RingbufferEntry.index
```

````

`````

`````{py:class} MJPEGStreamResponse(gen: typing.AsyncGenerator[bytes, None], status_code: int = 200)
:canonical: labthings_fastapi.outputs.mjpeg_stream.MJPEGStreamResponse

Bases: {py:obj}`fastapi.responses.StreamingResponse`

```{autodoc2-docstring} labthings_fastapi.outputs.mjpeg_stream.MJPEGStreamResponse
```

````{py:attribute} media_type
:canonical: labthings_fastapi.outputs.mjpeg_stream.MJPEGStreamResponse.media_type
:value: >
   'multipart/x-mixed-replace; boundary=frame'

```{autodoc2-docstring} labthings_fastapi.outputs.mjpeg_stream.MJPEGStreamResponse.media_type
```

````

````{py:method} __init__(gen: typing.AsyncGenerator[bytes, None], status_code: int = 200)
:canonical: labthings_fastapi.outputs.mjpeg_stream.MJPEGStreamResponse.__init__

```{autodoc2-docstring} labthings_fastapi.outputs.mjpeg_stream.MJPEGStreamResponse.__init__
```

````

````{py:method} mjpeg_async_generator() -> typing.AsyncGenerator[bytes, None]
:canonical: labthings_fastapi.outputs.mjpeg_stream.MJPEGStreamResponse.mjpeg_async_generator
:async:

```{autodoc2-docstring} labthings_fastapi.outputs.mjpeg_stream.MJPEGStreamResponse.mjpeg_async_generator
```

````

`````

`````{py:class} MJPEGStream(ringbuffer_size: int = 10)
:canonical: labthings_fastapi.outputs.mjpeg_stream.MJPEGStream

```{autodoc2-docstring} labthings_fastapi.outputs.mjpeg_stream.MJPEGStream
```

````{py:method} __init__(ringbuffer_size: int = 10)
:canonical: labthings_fastapi.outputs.mjpeg_stream.MJPEGStream.__init__

```{autodoc2-docstring} labthings_fastapi.outputs.mjpeg_stream.MJPEGStream.__init__
```

````

````{py:method} reset(ringbuffer_size: typing.Optional[int] = None)
:canonical: labthings_fastapi.outputs.mjpeg_stream.MJPEGStream.reset

```{autodoc2-docstring} labthings_fastapi.outputs.mjpeg_stream.MJPEGStream.reset
```

````

````{py:method} stop(portal: anyio.from_thread.BlockingPortal)
:canonical: labthings_fastapi.outputs.mjpeg_stream.MJPEGStream.stop

```{autodoc2-docstring} labthings_fastapi.outputs.mjpeg_stream.MJPEGStream.stop
```

````

````{py:method} ringbuffer_entry(i: int) -> labthings_fastapi.outputs.mjpeg_stream.RingbufferEntry
:canonical: labthings_fastapi.outputs.mjpeg_stream.MJPEGStream.ringbuffer_entry
:async:

```{autodoc2-docstring} labthings_fastapi.outputs.mjpeg_stream.MJPEGStream.ringbuffer_entry
```

````

````{py:method} buffer_for_reading(i: int) -> typing.AsyncIterator[bytes]
:canonical: labthings_fastapi.outputs.mjpeg_stream.MJPEGStream.buffer_for_reading
:async:

```{autodoc2-docstring} labthings_fastapi.outputs.mjpeg_stream.MJPEGStream.buffer_for_reading
```

````

````{py:method} next_frame() -> int
:canonical: labthings_fastapi.outputs.mjpeg_stream.MJPEGStream.next_frame
:async:

```{autodoc2-docstring} labthings_fastapi.outputs.mjpeg_stream.MJPEGStream.next_frame
```

````

````{py:method} grab_frame() -> bytes
:canonical: labthings_fastapi.outputs.mjpeg_stream.MJPEGStream.grab_frame
:async:

```{autodoc2-docstring} labthings_fastapi.outputs.mjpeg_stream.MJPEGStream.grab_frame
```

````

````{py:method} next_frame_size() -> int
:canonical: labthings_fastapi.outputs.mjpeg_stream.MJPEGStream.next_frame_size
:async:

```{autodoc2-docstring} labthings_fastapi.outputs.mjpeg_stream.MJPEGStream.next_frame_size
```

````

````{py:method} frame_async_generator() -> typing.AsyncGenerator[bytes, None]
:canonical: labthings_fastapi.outputs.mjpeg_stream.MJPEGStream.frame_async_generator
:async:

```{autodoc2-docstring} labthings_fastapi.outputs.mjpeg_stream.MJPEGStream.frame_async_generator
```

````

````{py:method} mjpeg_stream_response() -> labthings_fastapi.outputs.mjpeg_stream.MJPEGStreamResponse
:canonical: labthings_fastapi.outputs.mjpeg_stream.MJPEGStream.mjpeg_stream_response
:async:

```{autodoc2-docstring} labthings_fastapi.outputs.mjpeg_stream.MJPEGStream.mjpeg_stream_response
```

````

````{py:method} add_frame(frame: bytes, portal: anyio.from_thread.BlockingPortal) -> None
:canonical: labthings_fastapi.outputs.mjpeg_stream.MJPEGStream.add_frame

```{autodoc2-docstring} labthings_fastapi.outputs.mjpeg_stream.MJPEGStream.add_frame
```

````

````{py:method} notify_new_frame(i: int) -> None
:canonical: labthings_fastapi.outputs.mjpeg_stream.MJPEGStream.notify_new_frame
:async:

```{autodoc2-docstring} labthings_fastapi.outputs.mjpeg_stream.MJPEGStream.notify_new_frame
```

````

````{py:method} notify_stream_stopped() -> None
:canonical: labthings_fastapi.outputs.mjpeg_stream.MJPEGStream.notify_stream_stopped
:async:

```{autodoc2-docstring} labthings_fastapi.outputs.mjpeg_stream.MJPEGStream.notify_stream_stopped
```

````

`````

`````{py:class} MJPEGStreamDescriptor(**kwargs)
:canonical: labthings_fastapi.outputs.mjpeg_stream.MJPEGStreamDescriptor

```{autodoc2-docstring} labthings_fastapi.outputs.mjpeg_stream.MJPEGStreamDescriptor
```

````{py:method} __init__(**kwargs)
:canonical: labthings_fastapi.outputs.mjpeg_stream.MJPEGStreamDescriptor.__init__

```{autodoc2-docstring} labthings_fastapi.outputs.mjpeg_stream.MJPEGStreamDescriptor.__init__
```

````

````{py:method} __set_name__(owner, name)
:canonical: labthings_fastapi.outputs.mjpeg_stream.MJPEGStreamDescriptor.__set_name__

```{autodoc2-docstring} labthings_fastapi.outputs.mjpeg_stream.MJPEGStreamDescriptor.__set_name__
```

````

````{py:method} __get__(obj: typing.Optional[labthings_fastapi.thing.Thing], type=None) -> typing.Union[labthings_fastapi.outputs.mjpeg_stream.MJPEGStream, typing_extensions.Self]
:canonical: labthings_fastapi.outputs.mjpeg_stream.MJPEGStreamDescriptor.__get__

```{autodoc2-docstring} labthings_fastapi.outputs.mjpeg_stream.MJPEGStreamDescriptor.__get__
```

````

````{py:method} viewer_page(url: str) -> fastapi.responses.HTMLResponse
:canonical: labthings_fastapi.outputs.mjpeg_stream.MJPEGStreamDescriptor.viewer_page
:async:

```{autodoc2-docstring} labthings_fastapi.outputs.mjpeg_stream.MJPEGStreamDescriptor.viewer_page
```

````

````{py:method} add_to_fastapi(app: fastapi.FastAPI, thing: labthings_fastapi.thing.Thing)
:canonical: labthings_fastapi.outputs.mjpeg_stream.MJPEGStreamDescriptor.add_to_fastapi

```{autodoc2-docstring} labthings_fastapi.outputs.mjpeg_stream.MJPEGStreamDescriptor.add_to_fastapi
```

````

`````
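
The `media_type` listed for `MJPEGStreamResponse`, `multipart/x-mixed-replace; boundary=frame`, suggests that each JPEG frame is sent as one multipart part delimited by `--frame`. The following is a minimal sketch of that framing using only the standard library; it is not this module's implementation. The names `mjpeg_part`, `wrap_frames`, `fake_frames` and `collect` are hypothetical, and the `Content-Type: image/jpeg` part header is an assumption about typical MJPEG-over-HTTP framing.

```python
import asyncio
from typing import AsyncGenerator, AsyncIterator, List

# Boundary token taken from the media_type value documented above.
BOUNDARY = b"frame"


def mjpeg_part(frame: bytes, boundary: bytes = BOUNDARY) -> bytes:
    """Wrap one JPEG frame as a single multipart/x-mixed-replace part.

    Hypothetical helper: the part header used here is an assumption,
    not taken from the library source.
    """
    header = b"--" + boundary + b"\r\nContent-Type: image/jpeg\r\n\r\n"
    return header + frame + b"\r\n"


async def wrap_frames(frames: AsyncIterator[bytes]) -> AsyncGenerator[bytes, None]:
    """Yield one multipart part per incoming frame."""
    async for frame in frames:
        yield mjpeg_part(frame)


async def fake_frames() -> AsyncGenerator[bytes, None]:
    """Stand-in frame source: two tiny payloads with JPEG start/end markers."""
    for payload in (b"\xff\xd8one\xff\xd9", b"\xff\xd8two\xff\xd9"):
        yield payload


async def collect() -> List[bytes]:
    """Gather every multipart part produced from the fake frame source."""
    return [part async for part in wrap_frames(fake_frames())]


if __name__ == "__main__":
    for part in asyncio.run(collect()):
        print(len(part), "bytes in part")
```

In a real server, an async generator shaped like `wrap_frames` would be the kind of object passed as the `gen` argument of a streaming response, so the browser replaces the displayed image each time a new part arrives.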