labthings_fastapi.actions

Submodules

Package Contents

Classes

Invocation

A Thread subclass that retains output values and tracks progress

DequeLogHandler

ActionManager

A class to manage a collection of actions

Data

ACTION_INVOCATIONS_PATH

API

labthings_fastapi.actions.ACTION_INVOCATIONS_PATH

'/action_invocations'

class labthings_fastapi.actions.Invocation(action: labthings_fastapi.descriptors.ActionDescriptor, thing: labthings_fastapi.thing.Thing, input: Optional[pydantic.BaseModel] = None, dependencies: Optional[dict[str, Any]] = None, default_stop_timeout: float = 5, log_len: int = 1000, id: Optional[uuid.UUID] = None, cancel_hook: Optional[labthings_fastapi.dependencies.invocation.CancelHook] = None)

Bases: threading.Thread

A Thread subclass that retains output values and tracks progress

TODO: In the future this should probably not be a Thread subclass, but might run in a thread anyway.

__init__(action: labthings_fastapi.descriptors.ActionDescriptor, thing: labthings_fastapi.thing.Thing, input: Optional[pydantic.BaseModel] = None, dependencies: Optional[dict[str, Any]] = None, default_stop_timeout: float = 5, log_len: int = 1000, id: Optional[uuid.UUID] = None, cancel_hook: Optional[labthings_fastapi.dependencies.invocation.CancelHook] = None)
property id: uuid.UUID

UUID for the thread. Note that this is not the same as the native thread ident.

property output: Any

Return value of the Action. If the Action is still running, returns None.

property log

A list of log items generated by the Action.

property status: labthings_fastapi.actions.invocation_model.InvocationStatus

Current running status of the thread.

============== =============================================
Status         Meaning
============== =============================================
pending        Not yet started
running        Currently in-progress
completed      Finished without error
cancelled      Thread stopped after a cancel request
error          Exception occurred in thread
============== =============================================

property action
property thing
cancel()

Cancel the task by requesting the code to stop

Cancellation is not guaranteed to work: the action must use a CancelHook dependency and check it periodically, otherwise the request has no effect.
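The following is a minimal sketch of why cancellation only works when the action checks for it. It uses only the standard library. `CancelFlag`, `request_cancel`, `raise_if_set` and `long_action` are hypothetical names invented for illustration, and they do not reproduce the CancelHook interface.

```python
import threading
import time


class CancelFlag:
    """Hypothetical stand-in for a cancel hook (NOT the labthings_fastapi CancelHook)."""

    def __init__(self) -> None:
        self._event = threading.Event()

    def request_cancel(self) -> None:
        # Called from another thread to ask the action to stop.
        self._event.set()

    def raise_if_set(self) -> None:
        # Called by the action itself; this is the "periodic check".
        if self._event.is_set():
            raise InterruptedError("cancel requested")


def long_action(flag: CancelFlag, steps: int = 1000) -> int:
    """Do work in small steps, checking the flag before each step."""
    done = 0
    for _ in range(steps):
        flag.raise_if_set()  # without this check, a cancel request is never noticed
        time.sleep(0.001)    # stand-in for one unit of real work
        done += 1
    return done


flag = CancelFlag()
result = {}


def target() -> None:
    try:
        result["value"] = long_action(flag)
        result["status"] = "completed"
    except InterruptedError:
        result["status"] = "cancelled"


worker = threading.Thread(target=target)
worker.start()
flag.request_cancel()  # the full action needs at least a second, so this arrives first
worker.join(timeout=5)
```

If `long_action` never called `raise_if_set()`, the thread would simply run to completion, which is the failure mode the note above warns about.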

response(request: Optional[fastapi.Request] = None)
run()

Overrides default threading.Thread run() method
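As an illustration of the general pattern described above (a Thread subclass whose run() stores the return value and a status string), here is a minimal standard-library sketch. `ResultThread` and its attributes are hypothetical, and the sketch makes no claim about how Invocation is actually implemented. It omits logging, cancellation and the `cancelled` status.

```python
import threading
from typing import Any, Callable, Optional


class ResultThread(threading.Thread):
    """Hypothetical illustration only; not the Invocation implementation."""

    def __init__(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> None:
        super().__init__()
        self._func = func
        self._func_args = args
        self._func_kwargs = kwargs
        self._state_lock = threading.Lock()  # guards the three fields below
        self._output: Any = None
        self._status = "pending"
        self.exception: Optional[BaseException] = None

    @property
    def output(self) -> Any:
        # None until the function has returned successfully.
        with self._state_lock:
            return self._output

    @property
    def status(self) -> str:
        with self._state_lock:
            return self._status

    def run(self) -> None:
        # Overrides threading.Thread.run() so the return value is kept.
        with self._state_lock:
            self._status = "running"
        try:
            value = self._func(*self._func_args, **self._func_kwargs)
        except Exception as exc:
            with self._state_lock:
                self.exception = exc
                self._status = "error"
        else:
            with self._state_lock:
                self._output = value
                self._status = "completed"


ok = ResultThread(lambda a, b: a + b, 2, 3)
status_before_start = ok.status
ok.start()
ok.join()

bad = ResultThread(lambda: 1 / 0)
bad.start()
bad.join()
```

Because `output` is `None` both while the function is running and when it legitimately returns `None`, callers in this sketch would need to consult `status` to tell the two cases apart.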

class labthings_fastapi.actions.DequeLogHandler(dest: MutableSequence, level=logging.INFO)

Bases: logging.Handler

__init__(dest: MutableSequence, level=logging.INFO)

Set up a log handler that appends messages to a list.

This log handler will first filter by thread, if one is supplied. This should be a threading.Thread object. Only log entries from the specified thread will be saved.

dest should be a deque, to which each log entry is appended as it comes in. Appending to dest is assumed to be thread safe.

NB this log handler does not currently rotate or truncate the destination itself, so if you use it on a thread that produces a lot of log messages, you may run into memory problems.
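One way to avoid unbounded growth with a handler of this kind is to bound the destination rather than the handler. The sketch below uses only the standard library. `AppendingHandler` and the logger name are hypothetical and are not the DequeLogHandler source. It assumes the caller passes a `collections.deque` created with `maxlen`, which discards the oldest entries automatically.

```python
import logging
from collections import deque


class AppendingHandler(logging.Handler):
    """Hypothetical illustration only; not the DequeLogHandler source."""

    def __init__(self, dest, level=logging.INFO):
        super().__init__(level=level)
        self.dest = dest

    def emit(self, record):
        # The handler never truncates; any bound comes from `dest` itself.
        self.dest.append(record)


records = deque(maxlen=3)  # keeps only the three most recent records
logger = logging.getLogger("sketch.bounded_log")
logger.setLevel(logging.DEBUG)
logger.propagate = False  # keep the example's output away from the root logger
logger.addHandler(AppendingHandler(records))

logger.debug("below the handler's INFO level, so never stored")
for i in range(5):
    logger.info("message %d", i)

messages = [record.getMessage() for record in records]
```

With a plain list as `dest`, the same handler would keep all five INFO records, and the list would keep growing for as long as the thread logs.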

emit(record)

Save a log record to the destination deque

class labthings_fastapi.actions.ActionManager

A class to manage a collection of actions

__init__()
property invocations
append_invocation(invocation: labthings_fastapi.actions.Invocation)
invoke_action(action: labthings_fastapi.descriptors.ActionDescriptor, thing: labthings_fastapi.thing.Thing, id: uuid.UUID, input: Any, dependencies: dict[str, Any], cancel_hook: labthings_fastapi.dependencies.invocation.CancelHook) labthings_fastapi.actions.Invocation

Invoke an action, returning the thread where it’s running

get_invocation(id: uuid.UUID) labthings_fastapi.actions.Invocation

Retrieve an invocation by ID

list_invocations(action: Optional[labthings_fastapi.descriptors.ActionDescriptor] = None, thing: Optional[labthings_fastapi.thing.Thing] = None, as_responses: bool = False, request: Optional[fastapi.Request] = None) list[labthings_fastapi.actions.invocation_model.InvocationModel]

All of the invocations currently managed

expire_invocations()

Delete invocations that have passed their expiry time
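The idea of expiring stored entries by time can be sketched as follows. `Record`, `expiry_time` and `expire` are hypothetical names, and the rule that an entry with no expiry time is kept is an assumption made for this example, not documented behaviour of ActionManager.

```python
import datetime
import uuid
from dataclasses import dataclass
from typing import Optional


@dataclass
class Record:
    """Hypothetical stand-in for a stored invocation."""

    id: uuid.UUID
    expiry_time: Optional[datetime.datetime]  # None means "no expiry set" (assumption)


def expire(records: dict, now: datetime.datetime) -> list:
    """Remove records whose expiry time has passed and return their IDs."""
    # Collect the keys first so the dict is not modified while iterating over it.
    expired = [
        key
        for key, rec in records.items()
        if rec.expiry_time is not None and rec.expiry_time < now
    ]
    for key in expired:
        del records[key]
    return expired


now = datetime.datetime(2024, 1, 1, 12, 0, 0)
old = Record(uuid.uuid4(), now - datetime.timedelta(seconds=1))
fresh = Record(uuid.uuid4(), now + datetime.timedelta(hours=1))
no_expiry = Record(uuid.uuid4(), None)
store = {rec.id: rec for rec in (old, fresh, no_expiry)}

removed = expire(store, now)
```

Passing `now` in explicitly, rather than reading the clock inside the function, keeps the sketch deterministic and easy to test.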

attach_to_app(app: fastapi.FastAPI)

Add /action_invocations and /action_invocations/{id} endpoints to FastAPI