labthings_fastapi.actions
=========================

.. py:module:: labthings_fastapi.actions

.. autoapi-nested-parse::

   Actions module.

   :ref:`wot_actions` are represented by methods, decorated with the `.thing_action`
   decorator.

   See the :ref:`actions` documentation for a top-level overview of actions in
   LabThings-FastAPI.

   Developer notes
   ---------------

   Currently much of the code related to Actions is in `.thing_action` and the
   underlying `.ActionDescriptor`. This is likely to be refactored in the near
   future.


Submodules
----------

.. toctree::
   :maxdepth: 1

   /autoapi/labthings_fastapi/actions/invocation_model/index


Attributes
----------

.. autoapisummary::

   labthings_fastapi.actions.blobdata_to_url_ctx
   labthings_fastapi.actions.ACTION_INVOCATIONS_PATH


Exceptions
----------

.. autoapisummary::

   labthings_fastapi.actions.InvocationCancelledError
   labthings_fastapi.actions.NoBlobManagerError


Classes
-------

.. autoapisummary::

   labthings_fastapi.actions.EmptyInput
   labthings_fastapi.actions.LinkElement
   labthings_fastapi.actions.InvocationStatus
   labthings_fastapi.actions.LogRecordModel
   labthings_fastapi.actions.Invocation
   labthings_fastapi.actions.DequeLogHandler
   labthings_fastapi.actions.ActionManager


Functions
---------

.. autoapisummary::

   labthings_fastapi.actions.model_to_dict


Package Contents
----------------

.. py:function:: model_to_dict(model: Optional[pydantic.BaseModel]) -> Dict[str, Any]

   Convert a pydantic model to a dictionary, non-recursively.

   We convert only the top level model, i.e. we do not recurse into submodels.
   This is important to avoid serialising Blob objects in action inputs.

   This function returns `dict(model)`, with exceptions for the case of `None`
   (converted to an empty dictionary) and `pydantic.RootModel` (checked to see
   if they correspond to empty input). If `pydantic.RootModel` with non-empty
   input is allowed, this function will need to be updated to handle them.

   :param model: A Pydantic model (usually the input of an action).
   :return: A dictionary with string keys, which are the fields of the model.
       This should be suitable for using as ``**kwargs`` to an action.

   :raise ValueError: if we are given a root model that isn't empty.


.. py:class:: EmptyInput(/, root=PydanticUndefined, **data)

   Bases: :py:obj:`pydantic.RootModel`

   Represent the input of an action that has no required parameters.

   This may be either a dictionary or ``None``.

   Create a new model by parsing and validating input data from keyword arguments.

   Raises `pydantic_core.ValidationError` if the input data cannot be
   validated to form a valid model.

   ``self`` is explicitly positional-only to allow ``self`` as a field name.

   .. py:attribute:: root
      :type: Optional[EmptyObject]
      :value: None


.. py:class:: LinkElement(/, **data: Any)

   Bases: :py:obj:`pydantic.BaseModel`

   See https://www.w3.org/TR/wot-thing-description11/#link.

   Create a new model by parsing and validating input data from keyword arguments.

   Raises `pydantic_core.ValidationError` if the input data cannot be
   validated to form a valid model.

   ``self`` is explicitly positional-only to allow ``self`` as a field name.

   .. py:attribute:: model_config

      Configuration for the model, should be a dictionary conforming to
      `pydantic.config.ConfigDict`.

   .. py:attribute:: href
      :type: AnyUri

   .. py:attribute:: type
      :type: Optional[str]
      :value: None

   .. py:attribute:: rel
      :type: Optional[str]
      :value: None

   .. py:attribute:: anchor
      :type: Optional[AnyUri]
      :value: None


.. py:class:: InvocationStatus(*args, **kwds)

   Bases: :py:obj:`enum.Enum`

   The current status of an `.Invocation`.

   .. py:attribute:: PENDING
      :value: 'pending'

      The `.Invocation` has not yet been started.

   .. py:attribute:: RUNNING
      :value: 'running'

      The `.Invocation` is running in its thread.

   .. py:attribute:: COMPLETED
      :value: 'completed'

      The `.Invocation` finished successfully. A return value may be available.

   .. py:attribute:: CANCELLED
      :value: 'cancelled'

      The `.Invocation` was cancelled and has finished.

   .. py:attribute:: ERROR
      :value: 'error'

      The `.Invocation` terminated unexpectedly due to an error.


.. py:class:: LogRecordModel(/, **data: Any)

   Bases: :py:obj:`pydantic.BaseModel`

   A model to serialise `logging.LogRecord` objects.

   Create a new model by parsing and validating input data from keyword arguments.

   Raises `pydantic_core.ValidationError` if the input data cannot be
   validated to form a valid model.

   ``self`` is explicitly positional-only to allow ``self`` as a field name.

   .. py:attribute:: model_config

      Configuration for the model, should be a dictionary conforming to
      `pydantic.config.ConfigDict`.

   .. py:attribute:: message
      :type: str

   .. py:attribute:: levelname
      :type: str

   .. py:attribute:: levelno
      :type: int

   .. py:attribute:: lineno
      :type: int

   .. py:attribute:: filename
      :type: str

   .. py:attribute:: created
      :type: datetime.datetime

   .. py:method:: generate_message(data: Any) -> Any
      :classmethod:

      Ensure LogRecord objects have constructed their message.

      :param data: The LogRecord to process.

      :return: The LogRecord, with a message constructed.


.. py:exception:: InvocationCancelledError

   Bases: :py:obj:`BaseException`

   An invocation was cancelled by the user.

   Note that this inherits from BaseException, so it won't be caught by
   ``except Exception``: it must be handled specifically.

   Action code may want to handle cancellation gracefully. This exception
   should be propagated if the action's status should be reported as
   ``cancelled``, or it may be handled so that the action finishes, returns
   a value, and is marked as ``completed``.

   If this exception is handled, the `.CancelEvent` should be reset to allow
   another `.InvocationCancelledError` to be raised if the invocation receives
   a second cancellation signal.


.. py:data:: blobdata_to_url_ctx

   This context variable gives access to a function that makes BlobData objects
   downloadable, by assigning a URL and adding them to the
   `labthings_fastapi.outputs.blob.BlobDataManager`.

   It is only available within a
   `labthings_fastapi.outputs.blob.blob_serialisation_context_manager`
   because it requires access to the ``BlobDataManager`` and the ``url_for``
   function from the FastAPI app.


.. py:data:: ACTION_INVOCATIONS_PATH
   :value: '/action_invocations'

   The API route used to list `.Invocation` objects.


.. py:exception:: NoBlobManagerError

   Bases: :py:obj:`RuntimeError`

   Raised if an API route accesses Invocation outputs without a BlobIOContextDep.

   Any access to an invocation output must have BlobIOContextDep as a dependency,
   as the output may be a blob, and the blob needs this context to resolve its URL.


.. py:class:: Invocation(action: labthings_fastapi.descriptors.ActionDescriptor, thing: labthings_fastapi.thing.Thing, id: uuid.UUID, input: Optional[pydantic.BaseModel] = None, dependencies: Optional[dict[str, Any]] = None, log_len: int = 1000, cancel_hook: Optional[labthings_fastapi.dependencies.invocation.CancelHook] = None)

   Bases: :py:obj:`threading.Thread`

   A Thread subclass that retains output values and tracks progress.

   `.Invocation` threads add several bits of functionality compared to the base
   `threading.Thread`.

   * They are instantiated with an `.ActionDescriptor` and a `.Thing`
     rather than a target function (see ``__init__``).
   * Each invocation is assigned a unique ``ID`` to allow it to be polled over HTTP.
   * A `.CancelHook` is provided to allow the invocation to stop gracefully if it
     is cancelled by the user.

   Create a thread to run an action and track its outputs.

   :param action: provides the function that we run, as well as metadata and type information.
       The descriptor is not bound to an object, so we supply the `.Thing` it's
       bound to when the function is run.
   :param thing: is the object on which we are running the ``action``, i.e.
       it is supplied to the function wrapped by ``action`` as the ``self``
       argument.
   :param id: is a `uuid.UUID` used to identify the invocation, for example
       when polling its status via HTTP.
   :param input: is a `pydantic.BaseModel` representing the body of the HTTP
       request that invoked the action. It is supplied to the function as
       keyword arguments.
   :param dependencies: is a dictionary of keyword arguments, supplied by
       FastAPI by its dependency injection mechanism.
   :param log_len: sets the number of log entries that will be held in memory
       by the invocation's logger.
   :param cancel_hook: is a `threading.Event` subclass that tells the
       invocation it's time to stop. See `.CancelHook`.

   .. py:attribute:: action_ref

   .. py:attribute:: thing_ref

   .. py:attribute:: input
      :value: None

   .. py:attribute:: dependencies
      :value: None

   .. py:attribute:: cancel_hook
      :value: None

   .. py:attribute:: _ID

   .. py:attribute:: retention_time

   .. py:attribute:: expiry_time
      :type: Optional[datetime.datetime]
      :value: None

   .. py:attribute:: _status_lock

   .. py:attribute:: _status
      :type: invocation_model.InvocationStatus

   .. py:attribute:: _return_value
      :type: Optional[Any]
      :value: None

   .. py:attribute:: _request_time
      :type: datetime.datetime

   .. py:attribute:: _start_time
      :type: Optional[datetime.datetime]
      :value: None

   .. py:attribute:: _end_time
      :type: Optional[datetime.datetime]
      :value: None

   .. py:attribute:: _exception
      :type: Optional[Exception]
      :value: None

   .. py:attribute:: _log
      :type: collections.deque

   .. py:property:: id
      :type: uuid.UUID

      UUID for the thread. Note this is not the same as the native thread ident.

   .. py:property:: output
      :type: Any

      Return value of the Action. If the Action is still running, returns None.
      :raise NoBlobManagerError: If this is called in a context where the blob
          manager context variables are not available. This stops errors from
          being raised later, when the blob is returned and serialised. If the
          errors happen during serialisation, the stack trace will not clearly
          identify the route with the missing dependency.

   .. py:property:: log
      :type: list[invocation_model.LogRecordModel]

      A list of log items generated by the Action.

   .. py:property:: status
      :type: invocation_model.InvocationStatus

      Current running status of the thread.

      See `.InvocationStatus` for the values and their meanings.

   .. py:property:: action

      The `.ActionDescriptor` object running in this thread.

   .. py:property:: thing
      :type: labthings_fastapi.thing.Thing

      The `.Thing` to which the action is bound, i.e. this is ``self``.

   .. py:method:: cancel() -> None

      Cancel the task by requesting the code to stop.

      This is an opt-in feature: the action must use a `.CancelHook` dependency
      and periodically check it.

   .. py:method:: response(request: Optional[fastapi.Request] = None) -> invocation_model.InvocationModel

      Generate a representation of the invocation suitable for HTTP.

      When an invocation is polled, we return a JSON object that includes
      its status, any log entries, a return value (if completed), and a link
      to poll for updates.

      :param request: is used to generate the ``href`` in the response, which
          should retrieve an updated version of this response.

      :return: an `.InvocationModel` representing this `.Invocation`.

   .. py:method:: run() -> None

      Run the action and track progress.

      `.Invocation` overrides the default `threading.Thread.run` method to add
      ways to track its progress and capture the return value.

      The code to be run is the function wrapped in the `.ActionDescriptor`
      that is passed in as ``action``. Its arguments are the associated
      `.Thing` (the first argument, i.e. ``self``), the ``input`` model
      (split into keyword arguments for each field), and any ``dependencies``
      (also as keyword arguments).
      We update the status of the action by setting ``self._status`` and
      emitting a changed event. This runs async code in the event loop that
      informs any clients listening over websockets that the invocation's
      status has changed.

      Logs are retained by a custom log handler, and are included when the
      `.Invocation` is serialised over HTTP.

      If exceptions are raised by the action code, these are caught and stored.
      The status is then set to ERROR and the thread terminates.

      See `.Invocation.status` for status values.

      :raise Exception: any exception raised in the action function will
          propagate through this method. Usually, this will just cause the
          thread to terminate after setting ``status`` to ``ERROR`` and
          saving the exception to ``self._exception``.


.. py:class:: DequeLogHandler(dest: MutableSequence, level: int = logging.INFO)

   Bases: :py:obj:`logging.Handler`

   A log handler that stores entries in memory.

   Set up a log handler that appends messages to a deque.

   .. warning::
      This log handler does not currently rotate or truncate the list, so if
      you use it on a thread that produces a lot of log messages, you may run
      into memory problems.

      Using a `.deque` with a finite capacity helps to mitigate this.

   :param dest: should specify a deque, to which we will append each log
       entry as it comes in. This is assumed to be thread safe.
   :param level: sets the level of the logger. For most invocations, a log
       level of `logging.INFO` is appropriate.

   .. py:attribute:: dest

   .. py:method:: emit(record: logging.LogRecord) -> None

      Save a log record to the destination deque.

      :param record: the `logging.LogRecord` object to add.


.. py:class:: ActionManager

   A class to manage a collection of actions.

   Set up an `.ActionManager`.

   .. py:attribute:: _invocations

   .. py:attribute:: _invocations_lock

   .. py:property:: invocations
      :type: list[Invocation]

      A list of all the `.Invocation` objects running or recently completed.

   .. py:method:: append_invocation(invocation: Invocation) -> None

      Add an `.Invocation` to the `.ActionManager`.

      :param invocation: The `.Invocation` to add.

   .. py:method:: invoke_action(action: labthings_fastapi.descriptors.ActionDescriptor, thing: labthings_fastapi.thing.Thing, id: uuid.UUID, input: Any, dependencies: dict[str, Any], cancel_hook: labthings_fastapi.dependencies.invocation.CancelHook) -> Invocation

      Invoke an action, returning the thread where it's running.

      See `.Invocation` for more details.

      :param action: provides the function that we run, as well as metadata
          and type information. The descriptor is not bound to an object, so we
          supply the `.Thing` it's bound to when the function is run.
      :param thing: is the object on which we are running the ``action``, i.e.
          it is supplied to the function wrapped by ``action`` as the ``self``
          argument.
      :param id: is a `uuid.UUID` used to identify the invocation, for example
          when polling its status via HTTP.
      :param input: is a `pydantic.BaseModel` representing the body of the HTTP
          request that invoked the action. It is supplied to the function as
          keyword arguments.
      :param dependencies: is a dictionary of keyword arguments, supplied by
          FastAPI by its dependency injection mechanism.
      :param cancel_hook: is a `threading.Event` subclass that tells the
          invocation it's time to stop. See `.CancelHook`.

      :return: an `.Invocation` object that has been started.

   .. py:method:: get_invocation(id: uuid.UUID) -> Invocation

      Retrieve an invocation by ID.

      :param id: the unique ID of the action to retrieve.

      :return: the `.Invocation` object.

   .. py:method:: list_invocations(action: Optional[labthings_fastapi.descriptors.ActionDescriptor] = None, thing: Optional[labthings_fastapi.thing.Thing] = None, request: Optional[fastapi.Request] = None) -> list[invocation_model.InvocationModel]

      All of the invocations currently managed.
      Returns a list of `.InvocationModel` instances representing all the
      invocations that are currently running, or have recently completed
      and not yet expired.

      :param action: returns only the invocations of a particular
          `.ActionDescriptor`. Note that if there are two Things of the same
          subclass, filtering by action will return invocations on either
          `.Thing`.
      :param thing: returns only invocations of actions on a particular
          `.Thing`. This will often be combined with filtering by ``action``
          to give the list of invocations returned by a GET request on an
          action endpoint.
      :param request: is used to pass a `fastapi.Request` object to the
          `.Invocation.response` method. Doing so ensures the URL returned as
          ``href`` in the response matches the address used to communicate
          with the server (i.e. it uses `fastapi.Request.url_for` instead of
          a path generated from a string).

      :return: A list of invocations, optionally filtered by Thing and/or Action.

   .. py:method:: expire_invocations()

      Delete invocations that have passed their expiry time.

   .. py:method:: attach_to_app(app: fastapi.FastAPI) -> None

      Add ``/action_invocations`` and ``/action_invocation/{id}`` endpoints to FastAPI.

      :param app: The `fastapi.FastAPI` application to which we add the endpoints.
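
The filtering and expiry behaviour described for ``list_invocations`` and ``expire_invocations`` can be illustrated with a minimal standard-library sketch. This is not the LabThings-FastAPI implementation: ``SketchInvocation`` and ``SketchManager`` are hypothetical stand-ins, actions and Things are represented by plain strings, and it is an assumption of this sketch that an ``expiry_time`` of ``None`` marks a record that must not be expired yet.

```python
import datetime
import threading
from dataclasses import dataclass
from typing import Optional


@dataclass
class SketchInvocation:
    """Hypothetical stand-in for an invocation record (not the real class)."""

    action: str
    thing: str
    # Assumption: None means "no expiry set yet", so the record is kept.
    expiry_time: Optional[datetime.datetime] = None


class SketchManager:
    """Hypothetical manager that filters and expires records under a lock."""

    def __init__(self):
        self._invocations = []
        self._invocations_lock = threading.Lock()

    def append(self, invocation):
        with self._invocations_lock:
            self._invocations.append(invocation)

    def select(self, action=None, thing=None):
        """Return records matching the filters; None matches everything."""
        with self._invocations_lock:
            return [
                i
                for i in self._invocations
                if (action is None or i.action == action)
                and (thing is None or i.thing == thing)
            ]

    def expire(self, now):
        """Drop records whose expiry time is set and is not after ``now``."""
        with self._invocations_lock:
            self._invocations = [
                i
                for i in self._invocations
                if i.expiry_time is None or i.expiry_time > now
            ]


def summary(records):
    """Reduce records to (action, thing) pairs for easy comparison."""
    return [(i.action, i.thing) for i in records]


now = datetime.datetime(2024, 1, 1, 12, 0, 0)
five_minutes = datetime.timedelta(minutes=5)

manager = SketchManager()
manager.append(SketchInvocation("move", "stage_a", now - five_minutes))
manager.append(SketchInvocation("move", "stage_b", now + five_minutes))
manager.append(SketchInvocation("snap", "camera"))  # no expiry time yet

# Filtering by action alone matches the same action on both "Things".
both_moves = summary(manager.select(action="move"))

# After expiry, the record whose expiry time has passed is gone.
manager.expire(now)
remaining = summary(manager.select())
only_stage_b = summary(manager.select(action="move", thing="stage_b"))
```

Combining the ``action`` and ``thing`` filters, as in the last line, mirrors the case the documentation describes for a GET request on an action endpoint, where invocations of the same action on a different Thing should not be listed.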