API Reference

pydase.data_service

DataService

DataService()

Bases: AbstractDataService

Source code in src/pydase/data_service/data_service.py
def __init__(self) -> None:
    super().__init__()
    self.__check_instance_classes()

serialize

serialize() -> SerializedObject

Serializes the instance into a dictionary, preserving the structure of the instance.

For each attribute, method, and property, the resulting dictionary includes its name, type, value, readonly status, and documentation (if any). Attributes and methods starting with an underscore are ignored.

For nested DataService instances, the method serializes recursively. For attributes of type list, each item in the list is serialized individually. If an item in the list is an instance of DataService, it is serialized recursively.

Returns:

Name Type Description
dict SerializedObject

The serialized instance.

Source code in src/pydase/data_service/data_service.py
def serialize(self) -> SerializedObject:
    """
    Serializes the instance into a dictionary, preserving the structure of the
    instance.

    For each attribute, method, and property, the method includes its name, type,
    value, readonly status, and documentation if any in the resulting dictionary.
    Attributes and methods starting with an underscore are ignored.

    For nested DataService instances, the method serializes recursively.
    For attributes of type list, each item in the list is serialized individually.
    If an item in the list is an instance of DataService, it is serialized
    recursively.

    Returns:
        dict: The serialized instance.
    """
    return Serializer.serialize_object(self)
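The recursion described above can be illustrated with a small stand-in. This is a sketch only, not pydase's `Serializer`: the function name `serialize_sketch`, the reduced set of keys, the `[index]` path notation for list items, and the use of `type(obj).__name__` as the type label are all assumptions made for illustration.

```python
from typing import Any


def serialize_sketch(obj: Any, path: str = "") -> dict[str, Any]:
    """Simplified recursive serializer (hypothetical, not pydase's Serializer)."""
    if isinstance(obj, list):
        # Each list item is serialized individually.
        value: Any = [
            serialize_sketch(item, f"{path}[{i}]") for i, item in enumerate(obj)
        ]
    elif hasattr(obj, "__dict__"):
        # Nested object: recurse into public attributes, skip underscore names.
        value = {
            name: serialize_sketch(attr, f"{path}.{name}" if path else name)
            for name, attr in vars(obj).items()
            if not name.startswith("_")
        }
    else:
        # Plain value (int, float, str, ...): store as-is.
        value = obj
    return {"full_access_path": path, "type": type(obj).__name__, "value": value}


class Channel:
    def __init__(self) -> None:
        self.gain = 1.0
        self._secret = 2  # ignored: starts with an underscore


class Device:
    def __init__(self) -> None:
        self.name = "dev"
        self.channels = [Channel(), Channel()]


result = serialize_sketch(Device())
```

The nested structure of `result` mirrors the structure of the instance, which is the property the docstring emphasizes.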

pydase.data_service.data_service_cache

DataServiceCache

DataServiceCache(service: DataService)

Maintains a serialized cache of the current state of a DataService instance.

This class is responsible for storing and updating a representation of the service’s public attributes and properties. It is primarily used by the StateManager and the web server to serve consistent state to clients without accessing the DataService attributes directly.

The cache is initialized once upon construction by serializing the full state of the service. After that, it can be incrementally updated using attribute paths and values as notified by the DataServiceObserver.

Parameters:

Name Type Description Default
service DataService

The DataService instance whose state should be cached.

required
Source code in src/pydase/data_service/data_service_cache.py
def __init__(self, service: "DataService") -> None:
    self._cache: SerializedObject
    self.service = service
    self._initialize_cache()

pydase.data_service.data_service_observer

DataServiceObserver

DataServiceObserver(state_manager: StateManager)

Bases: PropertyObserver

Source code in src/pydase/data_service/data_service_observer.py
def __init__(self, state_manager: StateManager) -> None:
    self.state_manager = state_manager
    self._notification_callbacks: list[
        Callable[[str, Any, SerializedObject], None]
    ] = []
    super().__init__(state_manager.service)

add_notification_callback

add_notification_callback(callback: Callable[[str, Any, SerializedObject], None]) -> None

Registers a callback function to be invoked upon attribute changes in the observed object.

This method allows for the addition of custom callback functions that will be executed whenever there is a change in the value of an observed attribute. The callback function is called with detailed information about the change, enabling external logic to respond to specific state changes within the observable object.

Parameters:

Name Type Description Default
callback Callable[[str, Any, SerializedObject], None]

The callback function to be registered. The function should have the following signature:

  • full_access_path (str): The full dot-notation access path of the changed attribute. This path indicates the location of the changed attribute within the observable object’s structure.
  • value (Any): The new value of the changed attribute.
  • cached_value_dict (dict[str, Any]): A dictionary representing the cached state of the attribute prior to the change. This can be useful for understanding the nature of the change and for historical comparison.
required
Source code in src/pydase/data_service/data_service_observer.py
def add_notification_callback(
    self, callback: Callable[[str, Any, SerializedObject], None]
) -> None:
    """
    Registers a callback function to be invoked upon attribute changes in the
    observed object.

    This method allows for the addition of custom callback functions that will be
    executed whenever there is a change in the value of an observed attribute. The
    callback function is called with detailed information about the change, enabling
    external logic to respond to specific state changes within the observable
    object.

    Args:
        callback:
            The callback function to be registered. The function should have the
            following signature:

            - full_access_path (str): The full dot-notation access path of the
              changed attribute. This path indicates the location of the changed
              attribute within the observable object's structure.
            - value (Any): The new value of the changed attribute.
            - cached_value_dict (dict[str, Any]): A dictionary representing the
              cached state of the attribute prior to the change. This can be useful
              for understanding the nature of the change and for historical
              comparison.
    """
    self._notification_callbacks.append(callback)
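As a rough illustration of the callback signature, the sketch below defines a callback that records each change. The names `log_change` and `changes`, the example path, and the contents of the cached dictionary are hypothetical; the callback is invoked by hand here, whereas with a real observer it would be registered through `add_notification_callback`.

```python
from typing import Any

# Collected (path, new value, previously cached value) tuples.
changes: list[tuple[str, Any, Any]] = []


def log_change(
    full_access_path: str, value: Any, cached_value_dict: dict[str, Any]
) -> None:
    """Record the changed path, the new value, and the previously cached value."""
    previous = cached_value_dict.get("value")
    changes.append((full_access_path, value, previous))


# With a real observer: observer.add_notification_callback(log_change)
# Here the callback is called directly to show the arguments it receives.
log_change("device.voltage", 5.0, {"type": "float", "value": 4.2})
```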

pydase.data_service.state_manager

StateManager

StateManager(service: DataService, filename: str | Path | None = None, autosave_interval: float | None = None)

Manages the state of a DataService instance, serving as both a cache and a persistence layer. It provides fast access to the most recently known state of the service and ensures consistent state updates across connected clients and service restarts.

The StateManager is used by the web server to apply updates to service attributes and to serve the current state to newly connected clients. Internally, it creates a DataServiceCache instance to track the state of public attributes and properties.

The StateManager also handles state persistence: it can load a previously saved state from disk at startup and periodically autosave the current state to a file during runtime.

Parameters:

Name Type Description Default
service DataService

The DataService instance whose state is being managed.

required
filename str | Path | None

The file name used for loading and storing the DataService’s state. If provided, the state is loaded from this file at startup and saved to it on shutdown or at regular intervals.

None
autosave_interval float | None

Interval in seconds between automatic state save events. If set to None, automatic saving is disabled.

None
Note

The StateManager does not autonomously poll hardware state. It relies on the service to perform such updates. The cache maintained by DataServiceCache reflects the last known state as notified by the DataServiceObserver, and is used by the web interface to provide fast and accurate state rendering for connected clients.

Source code in src/pydase/data_service/state_manager.py
def __init__(
    self,
    service: "DataService",
    filename: str | Path | None = None,
    autosave_interval: float | None = None,
) -> None:
    self.filename = getattr(service, "_filename", None)

    if filename is not None:
        if self.filename is not None:
            logger.warning(
                "Overwriting filename '%s' with '%s'.", self.filename, filename
            )
        self.filename = filename

    self.service = service
    self.cache_manager = DataServiceCache(self.service)
    self.autosave_interval = autosave_interval

cache_value property

cache_value: dict[str, SerializedObject]

Returns the “value” entry of the cached DataService serialization, i.e. the dictionary of serialized attributes.

__is_loadable_state_attribute

__is_loadable_state_attribute(full_access_path: str) -> bool

Checks if an attribute defined by a dot-separated path should be loaded from storage.

For properties, it verifies the presence of the ‘@load_state’ decorator. Regular attributes default to being loadable.

Source code in src/pydase/data_service/state_manager.py
def __is_loadable_state_attribute(self, full_access_path: str) -> bool:
    """Checks if an attribute defined by a dot-separated path should be loaded from
    storage.

    For properties, it verifies the presence of the '@load_state' decorator. Regular
    attributes default to being loadable.
    """

    path_parts = parse_full_access_path(full_access_path)
    parent_object = get_object_by_path_parts(self.service, path_parts[:-1])

    if is_property_attribute(parent_object, path_parts[-1]):
        prop = getattr(type(parent_object), path_parts[-1])
        has_decorator = has_load_state_decorator(prop)
        if not has_decorator:
            logger.debug(
                "Property '%s' has no '@load_state' decorator. "
                "Ignoring value from JSON file...",
                path_parts[-1],
            )
        return has_decorator

    try:
        cached_serialization_dict = self.cache_manager.get_value_dict_from_cache(
            full_access_path
        )

        if cached_serialization_dict["value"] == "method":
            return False

        # nested objects cannot be loaded
        return not serialized_dict_is_nested_object(cached_serialization_dict)
    except SerializationPathError:
        logger.debug(
            "Path %a could not be loaded. It does not correspond to an attribute of"
            " the class. Ignoring value from JSON file...",
            path_parts[-1],
        )
        return False

autosave async

autosave() -> None

Periodically saves the current service state to the configured file.

This coroutine is automatically started by the pydase.Server when a filename is provided. It runs in the background and writes the latest known state of the service to disk every autosave_interval seconds.

If autosave_interval is set to None, autosaving is disabled and this coroutine exits immediately.

Source code in src/pydase/data_service/state_manager.py
async def autosave(self) -> None:
    """Periodically saves the current service state to the configured file.

    This coroutine is automatically started by the [`pydase.Server`][pydase.Server]
    when a filename is provided. It runs in the background and writes the latest
    known state of the service to disk every `autosave_interval` seconds.

    If `autosave_interval` is set to `None`, autosaving is disabled and this
    coroutine exits immediately.
    """

    if self.autosave_interval is None:
        return

    while True:
        try:
            if self.filename is not None:
                self.save_state()
            await asyncio.sleep(self.autosave_interval)
        except Exception as e:
            logger.exception(e)

load_state

load_state() -> None

Loads the DataService’s state from a JSON file defined by self.filename. Updates the service’s attributes, respecting type and read-only constraints.

Source code in src/pydase/data_service/state_manager.py
def load_state(self) -> None:
    """Loads the DataService's state from a JSON file defined by `self.filename`.
    Updates the service's attributes, respecting type and read-only constraints.
    """

    # Traverse the serialized representation and set the attributes of the class
    json_dict = self._get_state_dict_from_json_file()
    if json_dict == {}:
        logger.debug("Could not load the service state.")
        return

    for path in generate_serialized_data_paths(json_dict):
        if self.__is_loadable_state_attribute(path):
            nested_json_dict = get_nested_dict_by_path(json_dict, path)
            try:
                nested_class_dict = self.cache_manager.get_value_dict_from_cache(
                    path
                )
            except (SerializationPathError, KeyError):
                nested_class_dict = {
                    "full_access_path": path,
                    "value": None,
                    "type": "None",
                    "doc": None,
                    "readonly": False,
                }

            value_type = nested_json_dict["type"]
            class_attr_value_type = nested_class_dict.get("type", None)

            if class_attr_value_type == value_type:
                self.set_service_attribute_value_by_path(path, nested_json_dict)
            else:
                logger.info(
                    "Attribute type of '%s' changed from '%s' to "
                    "'%s'. Ignoring value from JSON file...",
                    path,
                    value_type,
                    class_attr_value_type,
                )
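The type check in the loop above can be sketched without a running service. In this simplified illustration the state is a flat dictionary (the real serialization is nested), and the attribute names and values are made up; a stored value is applied only when its `"type"` matches the type currently reported for the same path.

```python
import json
import tempfile
from pathlib import Path

# State as it might have been saved by an earlier run (flat, illustrative).
saved = {
    "voltage": {"type": "float", "value": 4.2},
    "name": {"type": "str", "value": "old"},
}
# Current state of the service: `name` has since changed its type.
current = {
    "voltage": {"type": "float", "value": 0.0},
    "name": {"type": "int", "value": 7},
}

with tempfile.TemporaryDirectory() as tmp:
    state_file = Path(tmp) / "state.json"
    state_file.write_text(json.dumps(saved, indent=4))
    loaded = json.loads(state_file.read_text())

applied: dict[str, object] = {}
skipped: list[str] = []
for path, entry in loaded.items():
    if current.get(path, {}).get("type") == entry["type"]:
        applied[path] = entry["value"]  # types agree: take the stored value
    else:
        skipped.append(path)  # type changed: ignore the stored value
```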

save_state

save_state() -> None

Saves the DataService’s current state to a JSON file defined by self.filename.

Source code in src/pydase/data_service/state_manager.py
def save_state(self) -> None:
    """Saves the DataService's current state to a JSON file defined by
    `self.filename`.
    """

    if self.filename is not None:
        with open(self.filename, "w") as f:
            json.dump(self.cache_value, f, indent=4)
    else:
        logger.debug(
            "State manager was not initialised with a filename. Skipping "
            "'save_state'..."
        )

set_service_attribute_value_by_path

set_service_attribute_value_by_path(path: str, serialized_value: SerializedObject) -> None

Sets the value of an attribute in the service managed by the StateManager given its path as a dot-separated string.

This method updates the attribute specified by ‘path’ with the value given in ‘serialized_value’ only if the attribute is not read-only and the new value differs from the current one. It also handles type-specific conversions for the new value before setting it.

Parameters:

Name Type Description Default
path str

A dot-separated string indicating the hierarchical path to the attribute.

required
serialized_value SerializedObject

The serialized representation of the new value to set for the attribute.

required
Source code in src/pydase/data_service/state_manager.py
def set_service_attribute_value_by_path(
    self,
    path: str,
    serialized_value: SerializedObject,
) -> None:
    """Sets the value of an attribute in the service managed by the `StateManager`
    given its path as a dot-separated string.

    This method updates the attribute specified by 'path' with 'value' only if the
    attribute is not read-only and the new value differs from the current one.
    It also handles type-specific conversions for the new value before setting it.

    Args:
        path:
            A dot-separated string indicating the hierarchical path to the
            attribute.
        serialized_value:
            The serialized representation of the new value to set for the attribute.
    """

    try:
        current_value_dict = self.cache_manager.get_value_dict_from_cache(path)
    except (SerializationPathError, KeyError):
        current_value_dict = {
            "full_access_path": path,
            "value": None,
            "type": "None",
            "doc": None,
            "readonly": False,
        }

    if "full_access_path" not in serialized_value:
        # Backwards compatibility for JSON files not containing the
        # full_access_path
        logger.warning(
            "The format of your JSON file is out-of-date. This might lead "
            "to unexpected errors. Please consider updating it."
        )
        serialized_value["full_access_path"] = current_value_dict[
            "full_access_path"
        ]

    # only set value when it has changed
    if self.__attr_value_has_changed(serialized_value, current_value_dict):
        self.__update_attribute_by_path(path, serialized_value)
    else:
        logger.debug("Value of attribute '%s' has not changed...", path)

has_load_state_decorator

has_load_state_decorator(prop: property) -> bool

Determines if the property’s setter method is decorated with the @load_state decorator.

Source code in src/pydase/data_service/state_manager.py
def has_load_state_decorator(prop: property) -> bool:
    """Determines if the property's setter method is decorated with the `@load_state`
    decorator.
    """

    try:
        return prop.fset._load_state  # type: ignore[union-attr]
    except AttributeError:
        return False
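To see how the marker check behaves, the sketch below copies `load_state` and `has_load_state_decorator` from the listings in this section and applies them to a plain class, so it runs without pydase. The `Service` class and its `name`, `mode`, and `version` properties are hypothetical.

```python
from typing import Any, Callable


def load_state(func: Callable[..., Any]) -> Callable[..., Any]:
    """Mark a property setter so its value is restored from the state file."""
    func._load_state = True  # type: ignore[attr-defined]
    return func


def has_load_state_decorator(prop: property) -> bool:
    """Return True if the property's setter carries the marker."""
    try:
        return prop.fset._load_state  # type: ignore[union-attr]
    except AttributeError:
        return False


class Service:
    def __init__(self) -> None:
        self._name = "Service"
        self._mode = "idle"

    @property
    def name(self) -> str:
        return self._name

    @name.setter
    @load_state
    def name(self, value: str) -> None:
        self._name = value

    @property
    def mode(self) -> str:
        return self._mode

    @mode.setter
    def mode(self, value: str) -> None:  # setter without the marker
        self._mode = value

    @property
    def version(self) -> str:  # read-only property: no setter at all
        return "1.0"
```

A property without a setter has `fset` set to `None`, so the attribute lookup raises `AttributeError` and the check returns `False`.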

load_state

load_state(func: Callable[..., Any]) -> Callable[..., Any]

This function should be used as a decorator on property setters to indicate that the value should be loaded from the JSON file.

Example
class Service(pydase.DataService):
    _name = "Service"

    @property
    def name(self) -> str:
        return self._name

    @name.setter
    @load_state
    def name(self, value: str) -> None:
        self._name = value
Source code in src/pydase/data_service/state_manager.py
def load_state(func: Callable[..., Any]) -> Callable[..., Any]:
    """This function should be used as a decorator on property setters to indicate that
    the value should be loaded from the JSON file.

    Example:
        ```python
        class Service(pydase.DataService):
            _name = "Service"

            @property
            def name(self) -> str:
                return self._name

            @name.setter
            @load_state
            def name(self, value: str) -> None:
                self._name = value
        ```
    """

    func._load_state = True  # type: ignore[attr-defined]
    return func

pydase.server.server

AdditionalServer

Bases: TypedDict

A TypedDict that represents the configuration for an additional server to be run alongside the main server.

kwargs instance-attribute

kwargs: dict[str, Any]

Additional keyword arguments that will be passed to the server’s constructor

port instance-attribute

port: int

Port on which the server should run.

server instance-attribute

Server adhering to the AdditionalServerProtocol.

AdditionalServerProtocol

AdditionalServerProtocol(data_service_observer: DataServiceObserver, host: str, port: int, **kwargs: Any)

Bases: Protocol

A Protocol that defines the interface for additional servers.

This protocol sets the standard for how additional servers should be implemented to ensure compatibility with the main Server class. Any server implementing the protocol must provide an __init__ method for initialization and a serve method for starting the server.

Parameters:

Name Type Description Default
data_service_observer DataServiceObserver

Observer for the DataService, handling state updates and communication to connected clients through injected callbacks. Can be utilized to access the service and state manager, and to add custom state-update callbacks.

required
host str

Hostname or IP address where the server is accessible. Commonly ‘0.0.0.0’ to bind to all network interfaces.

required
port int

Port number on which the server listens. Typically in the range 1024-65535 (non-standard ports).

required
**kwargs Any

Any additional parameters required for initializing the server. These parameters are specific to the server’s implementation.

{}
Source code in src/pydase/server/server.py
def __init__(
    self,
    data_service_observer: DataServiceObserver,
    host: str,
    port: int,
    **kwargs: Any,
) -> None: ...

serve async

serve() -> Any

Starts the server. This method should be implemented as an asynchronous method, which means that it should be able to run concurrently with other tasks.
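A class satisfying this interface needs only the constructor signature and an asynchronous `serve` method. The sketch below is a toy example: `HeartbeatServer`, its `beats` keyword argument, and its `log` attribute are hypothetical, and `None` stands in for the observer so the block runs without pydase.

```python
import asyncio
from typing import Any


class HeartbeatServer:
    """Toy server with the __init__/serve shape described by the protocol."""

    def __init__(
        self,
        data_service_observer: Any,  # a DataServiceObserver in real use
        host: str,
        port: int,
        **kwargs: Any,
    ) -> None:
        self.observer = data_service_observer
        self.host = host
        self.port = port
        self.beats = kwargs.get("beats", 3)  # hypothetical extra argument
        self.log: list[str] = []

    async def serve(self) -> Any:
        for i in range(self.beats):
            self.log.append(f"{self.host}:{self.port} beat {i}")
            await asyncio.sleep(0)  # yield control so other tasks can run
        return len(self.log)


server = HeartbeatServer(
    data_service_observer=None, host="127.0.0.1", port=12345, beats=2
)
result = asyncio.run(server.serve())
```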

Source code in src/pydase/server/server.py
async def serve(self) -> Any:
    """Starts the server. This method should be implemented as an asynchronous
    method, which means that it should be able to run concurrently with other tasks.
    """

Server

Server(service: DataService, host: str = '0.0.0.0', web_port: int | None = None, enable_web: bool = True, filename: str | Path | None = None, additional_servers: list[AdditionalServer] | None = None, autosave_interval: float = 30.0, **kwargs: Any)

The Server class provides a flexible server implementation for the DataService.

Parameters:

Name Type Description Default
service DataService

The DataService instance that this server will manage.

required
host str

The host address for the server. Defaults to '0.0.0.0', which means all available network interfaces.

'0.0.0.0'
web_port int | None

The port number for the web server. If set to None, it will use the port defined in ServiceConfig().web_port. Defaults to None.

None
enable_web bool

Whether to enable the web server.

True
filename str | Path | None

Filename of the file managing the service state persistence.

None
additional_servers list[AdditionalServer] | None

A list of additional servers to run alongside the main server. Here’s an example of how you might define an additional server:

class MyCustomServer:
    def __init__(
        self,
        data_service_observer: DataServiceObserver,
        host: str,
        port: int,
        **kwargs: Any,
    ) -> None:
        self.observer = data_service_observer
        self.state_manager = self.observer.state_manager
        self.service = self.state_manager.service
        self.port = port
        self.host = host
        # handle any additional arguments...

    async def serve(self):
        # code to start the server...
        ...

And here’s how you might add it to the additional_servers list when creating a Server instance:

server = Server(
    service=my_data_service,
    additional_servers=[
        {
            "server": MyCustomServer,
            "port": 12345,
            "kwargs": {"some_arg": "some_value"}
        }
    ],
)
server.run()
None
autosave_interval float

Interval in seconds between automatic state save events. If set to None, automatic saving is disabled. Defaults to 30 seconds.

30.0
**kwargs Any

Additional keyword arguments.

{}

Advanced

  • post_startup hook:

    This method is intended to be overridden in subclasses. It runs immediately after all servers (web and additional) are initialized and before entering the main event loop. You can use this hook to register custom logic after the server is fully started.
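The override pattern can be sketched with a stand-in base class. `SketchServer` below is not `pydase.Server`: its `serve` method is an assumption that merely awaits the hook once, since the real startup sequence is not shown in this section. `MyServer` and its `events` list are likewise hypothetical.

```python
import asyncio


class SketchServer:
    """Stand-in base class for illustration; NOT pydase.Server."""

    async def post_startup(self) -> None:
        """Override this in a subclass to register custom logic after startup."""

    async def serve(self) -> None:
        # Assumption: servers would be started here, then the hook is awaited.
        await self.post_startup()


class MyServer(SketchServer):
    def __init__(self) -> None:
        self.events: list[str] = []

    async def post_startup(self) -> None:
        # Custom logic that should run once startup has finished.
        self.events.append("post_startup")


server = MyServer()
asyncio.run(server.serve())
```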

Source code in src/pydase/server/server.py
def __init__(  # noqa: PLR0913
    self,
    service: DataService,
    host: str = "0.0.0.0",
    web_port: int | None = None,
    enable_web: bool = True,
    filename: str | Path | None = None,
    additional_servers: list[AdditionalServer] | None = None,
    autosave_interval: float = 30.0,
    **kwargs: Any,
) -> None:
    if additional_servers is None:
        additional_servers = []
    self._service = service
    self._host = host
    if web_port is None:
        self._web_port = ServiceConfig().web_port
    else:
        self._web_port = web_port
    self._enable_web = enable_web
    self._kwargs = kwargs
    self._additional_servers = additional_servers
    self.should_exit = False
    self.servers: dict[str, asyncio.Future[Any]] = {}

    self._loop = asyncio.new_event_loop()
    asyncio.set_event_loop(self._loop)

    self._state_manager = StateManager(
        service=self._service,
        filename=filename,
        autosave_interval=autosave_interval,
    )
    self._observer = DataServiceObserver(self._state_manager)
    self._state_manager.load_state()
    autostart_service_tasks(self._service)

    self._web_server = WebServer(
        data_service_observer=self._observer,
        host=self._host,
        port=self._web_port,
        enable_frontend=self._enable_web,
        **self._kwargs,
    )

post_startup async

post_startup() -> None

Override this in a subclass to register custom logic after startup.

Source code in src/pydase/server/server.py
async def post_startup(self) -> None:
    """Override this in a subclass to register custom logic after startup."""

run

run() -> None

Initializes the asyncio event loop and starts the server.

This method should be called to start the server after it’s been instantiated.

Source code in src/pydase/server/server.py
def run(self) -> None:
    """
    Initializes the asyncio event loop and starts the server.

    This method should be called to start the server after it's been instantiated.
    """
    try:
        self._loop.run_until_complete(self.serve())
    finally:
        self._loop.close()

pydase.server.web_server

WebServer

WebServer(data_service_observer: DataServiceObserver, host: str, port: int, *, enable_frontend: bool = True, css: str | Path | None = None, favicon_path: str | Path | None = None, enable_cors: bool = True, config_dir: Path = ServiceConfig().config_dir, generate_web_settings: bool = WebServerConfig().generate_web_settings, frontend_src: Path = Path(__file__).parent.parent.parent / 'frontend')

Represents a web server that adheres to the AdditionalServerProtocol, designed to work with a DataService instance. This server facilitates client-server communication and state management through web protocols and socket connections.

The WebServer class initializes and manages a web server environment using aiohttp and Socket.IO, allowing for HTTP and Socket.IO communications. It incorporates CORS (Cross-Origin Resource Sharing) support, custom CSS, and serves a static files directory. It also initializes web server settings based on configuration files or generates default settings if necessary.

Configuration for the web server (like service configuration directory and whether to generate new web settings) is determined in the following order of precedence:

  1. Values provided directly to the constructor.
  2. Environment variable settings (via configuration classes like ServiceConfig and WebServerConfig).
  3. Default values defined in the configuration classes.

Parameters:

Name Type Description Default
data_service_observer DataServiceObserver

Observer for the DataService, handling state updates and communication to connected clients.

required
host str

Hostname or IP address where the server is accessible. Commonly ‘0.0.0.0’ to bind to all network interfaces.

required
port int

Port number on which the server listens. Typically in the range 1024-65535 (non-standard ports).

required
css str | Path | None

Path to a custom CSS file for styling the frontend. If None, no custom styles are applied. Defaults to None.

None
favicon_path str | Path | None

Path to a custom favicon.ico file. Defaults to None.

None
enable_cors bool

Flag to enable or disable CORS policy. When True, CORS is enabled, allowing cross-origin requests. Defaults to True.

True
config_dir Path

Path to the configuration directory where the web settings will be stored. Defaults to ServiceConfig().config_dir.

config_dir
generate_web_settings bool

Flag to enable or disable generation of new web settings if the configuration file is missing. Defaults to WebServerConfig().generate_web_settings.

generate_web_settings
Source code in src/pydase/server/web_server/web_server.py
def __init__(  # noqa: PLR0913
    self,
    data_service_observer: DataServiceObserver,
    host: str,
    port: int,
    *,
    enable_frontend: bool = True,
    css: str | Path | None = None,
    favicon_path: str | Path | None = None,
    enable_cors: bool = True,
    config_dir: Path = ServiceConfig().config_dir,
    generate_web_settings: bool = WebServerConfig().generate_web_settings,
    frontend_src: Path = Path(__file__).parent.parent.parent / "frontend",
) -> None:
    self.observer = data_service_observer
    self.state_manager = self.observer.state_manager
    self.service = self.state_manager.service
    self.port = port
    self.host = host
    self.css = css
    self.enable_cors = enable_cors
    self.frontend_src = frontend_src
    self.favicon_path: Path | str = favicon_path  # type: ignore
    self.enable_frontend = enable_frontend

    if self.favicon_path is None:
        self.favicon_path = self.frontend_src / "favicon.ico"

    self._service_config_dir = config_dir
    self._generate_web_settings = generate_web_settings
    self._loop = asyncio.get_event_loop()
    self._sio = setup_sio_server(self.observer, self.enable_cors, self._loop)
    self._initialise_configuration()

pydase.client

Client

Client(*, url: str, block_until_connected: bool = True, sio_client_kwargs: dict[str, Any] = {}, client_id: str | None = None, proxy_url: str | None = None, auto_update_proxy: bool = True)

A client for connecting to a remote pydase service using Socket.IO. This client handles asynchronous communication with a service, manages events such as connection, disconnection, and updates, and ensures that the proxy object is up-to-date with the server state.

Parameters:

Name Type Description Default
url str

The URL of the pydase Socket.IO server. This should always contain the protocol (e.g., ws or wss) and the hostname, and can optionally include a path prefix (e.g., ws://localhost:8001/service).

required
block_until_connected bool

If set to True, the constructor will block until the connection to the service has been established. This is useful for ensuring the client is ready to use immediately after instantiation. Default is True.

True
sio_client_kwargs dict[str, Any]

Additional keyword arguments passed to the underlying [AsyncClient][socketio.AsyncClient]. This allows fine-tuning of the client’s behaviour (e.g., reconnection attempts or reconnection delay).

{}
client_id str | None

An optional client identifier. This ID is sent to the server as the X-Client-Id HTTP header. It can be used for logging or authentication purposes on the server side. If not provided, it defaults to the hostname of the machine running the client.

None
proxy_url str | None

An optional proxy URL to route the connection through. This is useful if the service is only reachable via an SSH tunnel or behind a firewall (e.g., socks5://localhost:2222).

None
auto_update_proxy bool

If False, disables automatic updates from the server. Useful for request-only clients where real-time synchronization is not needed.

True
Example

Connect to a service directly:

client = pydase.Client(url="ws://localhost:8001")

Connect over a secure connection:

client = pydase.Client(url="wss://my-service.example.com")

Connect using a SOCKS5 proxy (e.g., through an SSH tunnel):

ssh -D 2222 user@gateway.example.com

client = pydase.Client(
    url="ws://remote-server:8001",
    proxy_url="socks5://localhost:2222"
)
Source code in src/pydase/client/client.py
def __init__(  # noqa: PLR0913
    self,
    *,
    url: str,
    block_until_connected: bool = True,
    sio_client_kwargs: dict[str, Any] = {},
    client_id: str | None = None,
    proxy_url: str | None = None,
    auto_update_proxy: bool = True,  # new argument
):
    # Parse the URL to separate base URL and path prefix
    parsed_url = urllib.parse.urlparse(url)

    # Construct the base URL without the path
    self._base_url = urllib.parse.urlunparse(
        (parsed_url.scheme, parsed_url.netloc, "", "", "", "")
    )

    # Store the path prefix (e.g., "/service" in "ws://localhost:8081/service")
    self._path_prefix = parsed_url.path.rstrip("/")  # Remove trailing slash if any
    self._url = url
    self._proxy_url = proxy_url
    self._client_id = client_id or socket.gethostname()
    self._sio_client_kwargs = sio_client_kwargs
    self._loop: asyncio.AbstractEventLoop | None = None
    self._thread: threading.Thread | None = None
    self._auto_update_proxy = auto_update_proxy
    self.proxy: ProxyClass
    """A proxy object representing the remote service, facilitating interaction as
    if it were local."""
    self.connect(block_until_connected=block_until_connected)
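The URL handling at the top of the constructor can be tried on its own. The helper name `split_service_url` is hypothetical; the body mirrors the `urllib.parse` calls in the listing above, separating the base URL from an optional path prefix.

```python
import urllib.parse


def split_service_url(url: str) -> tuple[str, str]:
    """Split a service URL into (base URL without path, path prefix)."""
    parsed = urllib.parse.urlparse(url)
    # Keep only scheme and host[:port] for the base URL.
    base_url = urllib.parse.urlunparse(
        (parsed.scheme, parsed.netloc, "", "", "", "")
    )
    # Drop a trailing slash so "/service/" and "/service" behave the same.
    path_prefix = parsed.path.rstrip("/")
    return base_url, path_prefix


with_prefix = split_service_url("ws://localhost:8001/service/")
without_prefix = split_service_url("wss://my-service.example.com")
```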

proxy instance-attribute

proxy: ProxyClass

A proxy object representing the remote service, facilitating interaction as if it were local.

get_value

get_value(access_path: str) -> Any

Retrieve the current value of a remote attribute.

Parameters:

Name Type Description Default
access_path str

The dot-separated path to the attribute in the remote service.

required

Returns:

Type Description
Any

The deserialized value of the remote attribute, or None if the client is not connected.

Example
value = client.get_value("my_device.temperature")
print(value)
Source code in src/pydase/client/client.py
def get_value(self, access_path: str) -> Any:
    """Retrieve the current value of a remote attribute.

    Args:
        access_path: The dot-separated path to the attribute in the remote service.

    Returns:
        The deserialized value of the remote attribute, or None if the client is not
        connected.

    Example:
        ```python
        value = client.get_value("my_device.temperature")
        print(value)
        ```
    """

    if self._loop is not None:
        return get_value(
            sio_client=self._sio,
            loop=self._loop,
            access_path=access_path,
        )
    return None
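
To make the idea of a dot-separated access path concrete, here is a minimal local sketch that follows such a path with `getattr`. It is an illustration only: the real client sends the path to the server rather than resolving it locally, and real access paths may contain elements (such as list indices) that this sketch does not handle. `resolve_access_path` and the `service` object are hypothetical.

```python
from functools import reduce
from types import SimpleNamespace
from typing import Any


def resolve_access_path(root: Any, access_path: str) -> Any:
    """Follow a dot-separated attribute path starting at `root`."""
    if not access_path:
        # An empty path refers to the root object itself.
        return root
    # Apply getattr once per path segment, left to right.
    return reduce(getattr, access_path.split("."), root)


# Stand-in for a service with one nested device.
service = SimpleNamespace(my_device=SimpleNamespace(temperature=21.5))
print(resolve_access_path(service, "my_device.temperature"))
```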

trigger_method

trigger_method(access_path: str, *args: Any, **kwargs: Any) -> Any

Trigger a remote method with optional arguments.

Parameters:

Name Type Description Default
access_path str

The dot-separated path to the method in the remote service.

required
*args Any

Positional arguments to pass to the method.

()
**kwargs Any

Keyword arguments to pass to the method.

{}

Returns:

Type Description
Any

The return value of the method call, if any.

Example
result = client.trigger_method("my_device.calibrate", timeout=5)
print(result)
Source code in src/pydase/client/client.py
def trigger_method(self, access_path: str, *args: Any, **kwargs: Any) -> Any:
    """Trigger a remote method with optional arguments.

    Args:
        access_path: The dot-separated path to the method in the remote service.
        *args: Positional arguments to pass to the method.
        **kwargs: Keyword arguments to pass to the method.

    Returns:
        The return value of the method call, if any.

    Example:
        ```python
        result = client.trigger_method("my_device.calibrate", timeout=5)
        print(result)
        ```
    """

    if self._loop is not None:
        return trigger_method(
            sio_client=self._sio,
            loop=self._loop,
            access_path=access_path,
            args=list(args),
            kwargs=kwargs,
        )
    return None
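
The wrapper above forwards positional arguments as a list and keyword arguments as a dict. The following sketch shows that packing step and a purely local dispatch, assuming a payload with the keys `access_path`, `args`, and `kwargs`. The payload layout, `build_call`, `dispatch`, and the `Device` and `Service` classes are illustrative assumptions, not the library's wire format.

```python
from typing import Any


class Device:
    def calibrate(self, offset: float = 0.0, *, timeout: int = 1) -> str:
        return f"calibrated offset={offset} timeout={timeout}"


class Service:
    def __init__(self) -> None:
        self.my_device = Device()


def build_call(access_path: str, *args: Any, **kwargs: Any) -> dict[str, Any]:
    """Pack a call: positional arguments as a list, keyword arguments as a dict."""
    return {"access_path": access_path, "args": list(args), "kwargs": kwargs}


def dispatch(root: Any, call: dict[str, Any]) -> Any:
    """Resolve the dotted path on a local object and invoke the method."""
    target = root
    for name in call["access_path"].split("."):
        target = getattr(target, name)
    return target(*call["args"], **call["kwargs"])


call = build_call("my_device.calibrate", 0.5, timeout=5)
print(dispatch(Service(), call))
```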

update_value

update_value(access_path: str, new_value: Any) -> Any

Set a new value for a remote attribute.

Parameters:

Name Type Description Default
access_path str

The dot-separated path to the attribute in the remote service.

required
new_value Any

The new value to assign to the attribute.

required
Example
client.update_value("my_device.power", True)
Source code in src/pydase/client/client.py
def update_value(self, access_path: str, new_value: Any) -> Any:
    """Set a new value for a remote attribute.

    Args:
        access_path: The dot-separated path to the attribute in the remote service.
        new_value: The new value to assign to the attribute.

    Example:
        ```python
        client.update_value("my_device.power", True)
        ```
    """

    if self._loop is not None:
        update_value(
            sio_client=self._sio,
            loop=self._loop,
            access_path=access_path,
            value=new_value,
        )

pydase.components

The components module is a collection of specialized subclasses of the DataService class that are designed to model different types of user interface components. These classes can be used to represent the state of various UI elements in a data interface, and provide a simple way to interact with these elements programmatically.

Each class in the components module corresponds to a specific type of UI element, such as a slider, a file upload, a graph, etc. The state of these UI elements is maintained by the instance variables of the respective classes. This allows you to keep track of the user’s interactions with the UI elements and update your application’s state accordingly.

You can use the classes in the components module as attributes of a DataService subclass to model the state of your application’s UI. Here is an example of how to use the NumberSlider class:

from pydase import DataService
from pydase.components import NumberSlider

class MyService(DataService):
    voltage = NumberSlider(1, 0, 10, 0.1)

# Then, you can modify or access the voltage value like this:
my_service = MyService()
my_service.voltage.value = 5
print(my_service.voltage.value)  # Output: 5

ColouredEnum

Bases: Enum

Represents a UI element that can display colour-coded text based on its value.

This class extends the standard Enum but requires its values to be valid CSS colour codes. Supported colour formats include:

  • Hexadecimal colours
  • Hexadecimal colours with transparency
  • RGB colours
  • RGBA colours
  • HSL colours
  • HSLA colours
  • Predefined/Cross-browser colour names

Refer to this website for more details on colour formats: https://www.w3schools.com/cssref/css_colours_legal.php

The behavior of this component in the UI depends on how it’s defined in the data service:

  • As property with a setter or as attribute: Renders as a dropdown menu, allowing users to select and change its value from the frontend.
  • As property without a setter: Displays as a coloured box with the key of the ColouredEnum as text inside, serving as a visual indicator without user interaction.
Example
import pydase.components as pyc
import pydase

class MyStatus(pyc.ColouredEnum):
    PENDING = "#FFA500"  # Orange
    RUNNING = "#0000FF80"  # Transparent Blue
    PAUSED = "rgb(169, 169, 169)"  # Dark Gray
    RETRYING = "rgba(255, 255, 0, 0.3)"  # Transparent Yellow
    COMPLETED = "hsl(120, 100%, 50%)"  # Green
    FAILED = "hsla(0, 100%, 50%, 0.7)"  # Transparent Red
    CANCELLED = "SlateGray"  # Slate Gray

class StatusExample(pydase.DataService):
    _status = MyStatus.RUNNING

    @property
    def status(self) -> MyStatus:
        return self._status

    @status.setter
    def status(self, value: MyStatus) -> None:
        # Custom logic here...
        self._status = value

# Example usage:
my_service = StatusExample()
my_service.status = MyStatus.FAILED
Note

Each enumeration name and value must be unique. This means that you should use different colour formats when you want to use a colour multiple times.
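
The uniqueness requirement matches how Python's standard Enum treats duplicate values: a second member with the same value becomes an alias of the first instead of a separate member. The sketch below uses plain `enum.Enum` as a stand-in for ColouredEnum, and the class names are illustrative.

```python
from enum import Enum


class AliasedStatus(Enum):
    RUNNING = "#00FF00"
    COMPLETED = "#00FF00"  # same value: becomes an alias of RUNNING


class DistinctStatus(Enum):
    RUNNING = "#00FF00"
    COMPLETED = "rgb(0, 255, 0)"  # same colour, different format: separate member


print([member.name for member in AliasedStatus])
print([member.name for member in DistinctStatus])
```

Writing the same colour in two different formats keeps both members distinct, which is what the note above recommends.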

DeviceConnection

DeviceConnection()

Bases: DataService

Base class for device connection management within the pydase framework.

This class serves as the foundation for subclasses that manage connections to specific devices. It implements automatic reconnection logic that periodically checks the device’s availability and attempts to reconnect if the connection is lost. The frequency of these checks is controlled by the _reconnection_wait_time attribute.

Subclassing

Users should primarily override the connect method to establish a connection to the device. This method should update the self._connected attribute to reflect the connection status:

class MyDeviceConnection(DeviceConnection):
    def connect(self) -> None:
        # Implementation to connect to the device
        # Update self._connected to `True` if connection is successful,
        # `False` otherwise
        ...

Optionally, if additional logic is needed to determine the connection status, the connected property can also be overridden:

class MyDeviceConnection(DeviceConnection):
    @property
    def connected(self) -> bool:
        # Custom logic to determine connection status
        return some_custom_condition
Frontend Representation

In the frontend, this class is represented without directly exposing the connect method and connected attribute. Instead, user-defined attributes, methods, and properties are displayed. When self.connected is False, the frontend component shows an overlay that allows manual triggering of the connect() method. This overlay disappears once the connection is successfully re-established.

Source code in src/pydase/components/device_connection.py
def __init__(self) -> None:
    super().__init__()
    self._connected = False
    self._reconnection_wait_time = 10.0

connected property

connected: bool

Indicates if the device is currently connected or was recently connected. Users may override this property to incorporate custom logic for determining the connection status.

connect

connect() -> None

Tries connecting to the device and changes self._connected status accordingly. This method is called every self._reconnection_wait_time seconds when self.connected is False. Users should override this method to implement device-specific connection logic.
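
The reconnection behaviour described here can be pictured as a simple retry loop. The following is a sketch under stated assumptions, not pydase's implementation: `FakeDevice` and `reconnect_until_connected` are hypothetical, and the device is made to fail a fixed number of times so the loop terminates.

```python
import asyncio


class FakeDevice:
    """Stand-in device whose first `fail_times` connection attempts fail."""

    def __init__(self, fail_times: int) -> None:
        self._connected = False
        self._reconnection_wait_time = 0.01  # seconds between attempts
        self._remaining_failures = fail_times
        self.attempts = 0

    @property
    def connected(self) -> bool:
        return self._connected

    def connect(self) -> None:
        self.attempts += 1
        if self._remaining_failures > 0:
            self._remaining_failures -= 1
            self._connected = False
        else:
            self._connected = True


async def reconnect_until_connected(device: FakeDevice) -> None:
    """Call connect() repeatedly, waiting between attempts, until connected."""
    while not device.connected:
        device.connect()
        if not device.connected:
            await asyncio.sleep(device._reconnection_wait_time)


device = FakeDevice(fail_times=2)
asyncio.run(reconnect_until_connected(device))
print(device.connected, device.attempts)
```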

Source code in src/pydase/components/device_connection.py
def connect(self) -> None:
    """Tries connecting to the device and changes `self._connected` status
    accordingly. This method is called every `self._reconnection_wait_time` seconds
    when `self.connected` is False. Users should override this method to implement
    device-specific connection logic.
    """

serialize

serialize() -> SerializedObject

Serializes the instance into a dictionary, preserving the structure of the instance.

For each attribute, method, and property, the method includes its name, type, value, readonly status, and documentation if any in the resulting dictionary. Attributes and methods starting with an underscore are ignored.

For nested DataService instances, the method serializes recursively. For attributes of type list, each item in the list is serialized individually. If an item in the list is an instance of DataService, it is serialized recursively.

Returns:

Name Type Description
dict SerializedObject

The serialized instance.

Source code in src/pydase/data_service/data_service.py
def serialize(self) -> SerializedObject:
    """
    Serializes the instance into a dictionary, preserving the structure of the
    instance.

    For each attribute, method, and property, the method includes its name, type,
    value, readonly status, and documentation if any in the resulting dictionary.
    Attributes and methods starting with an underscore are ignored.

    For nested DataService instances, the method serializes recursively.
    For attributes of type list, each item in the list is serialized individually.
    If an item in the list is an instance of DataService, it is serialized
    recursively.

    Returns:
        dict: The serialized instance.
    """
    return Serializer.serialize_object(self)

NumberSlider

NumberSlider(value: Any = 0.0, min_: Any = 0.0, max_: Any = 100.0, step_size: Any = 1.0)

Bases: DataService

This class models a UI slider for a data service, allowing for adjustments of a parameter within a specified range and increments.

Parameters:

Name Type Description Default
value Any

The initial value of the slider. Defaults to 0.0.

0.0
min_ Any

The minimum value of the slider. Defaults to 0.0.

0.0
max_ Any

The maximum value of the slider. Defaults to 100.0.

100.0
step_size Any

The increment/decrement step size of the slider. Defaults to 1.0.

1.0
Example
import pydase
import pydase.components


class MySlider(pydase.components.NumberSlider):
    def __init__(
        self,
        value: float = 0.0,
        min_: float = 0.0,
        max_: float = 100.0,
        step_size: float = 1.0,
    ) -> None:
        super().__init__(value, min_, max_, step_size)

    @property
    def min(self) -> float:
        return self._min

    @min.setter
    def min(self, value: float) -> None:
        self._min = value

    @property
    def max(self) -> float:
        return self._max

    @max.setter
    def max(self, value: float) -> None:
        self._max = value

    @property
    def step_size(self) -> float:
        return self._step_size

    @step_size.setter
    def step_size(self, value: float) -> None:
        self._step_size = value

    @property
    def value(self) -> float:
        return self._value

    @value.setter
    def value(self, value: float) -> None:
        if value < self._min or value > self._max:
            raise ValueError(
                "Value is either below allowed min or above max value."
            )

        self._value = value

class MyService(pydase.DataService):
    def __init__(self) -> None:
        super().__init__()
        # Use the slider subclass defined above, not MyService itself
        self.voltage = MySlider()

# Modifying or accessing the voltage value:
my_service = MyService()
my_service.voltage.value = 5
print(my_service.voltage.value)  # Output: 5
Source code in src/pydase/components/number_slider.py
def __init__(
    self,
    value: Any = 0.0,
    min_: Any = 0.0,
    max_: Any = 100.0,
    step_size: Any = 1.0,
) -> None:
    super().__init__()
    self._step_size = step_size
    self._value = value
    self._min = min_
    self._max = max_

max property

max: Any

The max property.

min property

min: Any

The min property.

step_size property

step_size: Any

The step_size property.

value property writable

value: Any

The value property.

serialize

serialize() -> SerializedObject

Serializes the instance into a dictionary, preserving the structure of the instance.

For each attribute, method, and property, the method includes its name, type, value, readonly status, and documentation if any in the resulting dictionary. Attributes and methods starting with an underscore are ignored.

For nested DataService instances, the method serializes recursively. For attributes of type list, each item in the list is serialized individually. If an item in the list is an instance of DataService, it is serialized recursively.

Returns:

Name Type Description
dict SerializedObject

The serialized instance.

Source code in src/pydase/data_service/data_service.py
def serialize(self) -> SerializedObject:
    """
    Serializes the instance into a dictionary, preserving the structure of the
    instance.

    For each attribute, method, and property, the method includes its name, type,
    value, readonly status, and documentation if any in the resulting dictionary.
    Attributes and methods starting with an underscore are ignored.

    For nested DataService instances, the method serializes recursively.
    For attributes of type list, each item in the list is serialized individually.
    If an item in the list is an instance of DataService, it is serialized
    recursively.

    Returns:
        dict: The serialized instance.
    """
    return Serializer.serialize_object(self)

pydase.task

autostart

autostart_service_tasks

autostart_service_tasks(service: DataService) -> None

Starts the service tasks defined with the autostart keyword argument.

This function walks the attributes of the given service and of its nested DataService instances and calls start() on every task that was defined with autostart=True and is not already running.

Source code in src/pydase/task/autostart.py
def autostart_service_tasks(
    service: pydase.data_service.data_service.DataService,
) -> None:
    """Starts the service tasks defined with the `autostart` keyword argument.

    This method goes through the attributes of the passed service and its nested
    [`DataService`][pydase.DataService] instances and calls the start method on
    autostart-tasks.
    """

    for attr in dir(service):
        if is_property_attribute(service, attr) or attr in {
            "_observers",
            "__dict__",
        }:  # prevent eval of property attrs and recursion
            continue

        val = getattr(service, attr)
        if isinstance(val, pydase.task.task.Task):
            if val.autostart and val.status == TaskStatus.NOT_RUNNING:
                val.start()
            else:
                continue
        else:
            autostart_nested_service_tasks(val)
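
The traversal can be sketched without pydase as a recursive walk over instance attributes. The sketch below is simplified and rests on assumptions: `FakeTask` and `Node` are hypothetical stand-ins, it iterates `vars()` instead of `dir()`, and it does not guard against reference cycles or handle containers.

```python
class FakeTask:
    """Stand-in for a task object with an autostart flag."""

    def __init__(self, autostart: bool) -> None:
        self.autostart = autostart
        self.running = False

    def start(self) -> None:
        self.running = True


class Node:
    """Stand-in for a service that may hold tasks and nested services."""


def autostart_tasks(service: Node) -> None:
    """Start every autostart task on `service` and on its nested services."""
    for value in vars(service).values():
        if isinstance(value, FakeTask):
            if value.autostart and not value.running:
                value.start()
        elif isinstance(value, Node):
            # Recurse into nested services.
            autostart_tasks(value)


root = Node()
root.poll = FakeTask(autostart=True)
root.manual = FakeTask(autostart=False)
root.child = Node()
root.child.log = FakeTask(autostart=True)

autostart_tasks(root)
print(root.poll.running, root.manual.running, root.child.log.running)
```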

decorator

PerInstanceTaskDescriptor

PerInstanceTaskDescriptor(func: Callable[[Any], Coroutine[None, None, R]] | Callable[[], Coroutine[None, None, R]], autostart: bool, restart_on_exception: bool, restart_sec: float, start_limit_interval_sec: float | None, start_limit_burst: int, exit_on_failure: bool)

Bases: Generic[R]

A descriptor class that provides a unique Task object for each instance of a DataService class.

The PerInstanceTaskDescriptor is used to transform an asynchronous function into a task that is managed independently for each instance of a DataService subclass. This allows tasks to be initialized, started, and stopped on a per-instance basis, providing better control over task execution within the service.

The PerInstanceTaskDescriptor is not intended to be used directly. Instead, it is used internally by the @task decorator to manage task objects for each instance of the service class.

Source code in src/pydase/task/decorator.py
def __init__(  # noqa: PLR0913
    self,
    func: Callable[[Any], Coroutine[None, None, R]]
    | Callable[[], Coroutine[None, None, R]],
    autostart: bool,
    restart_on_exception: bool,
    restart_sec: float,
    start_limit_interval_sec: float | None,
    start_limit_burst: int,
    exit_on_failure: bool,
) -> None:
    self.__func = func
    self.__autostart = autostart
    self.__task_instances: dict[object, Task[R]] = {}
    self.__restart_on_exception = restart_on_exception
    self.__restart_sec = restart_sec
    self.__start_limit_interval_sec = start_limit_interval_sec
    self.__start_limit_burst = start_limit_burst
    self.__exit_on_failure = exit_on_failure

__set_name__

__set_name__(owner: type[DataService], name: str) -> None

Stores the name of the task within the owning class. This method is called automatically when the descriptor is assigned to a class attribute.

Source code in src/pydase/task/decorator.py
def __set_name__(self, owner: type[DataService], name: str) -> None:
    """Stores the name of the task within the owning class. This method is called
    automatically when the descriptor is assigned to a class attribute.
    """

    self.__task_name = name
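
For readers unfamiliar with the descriptor protocol, the following sketch shows the general pattern of handing out one object per instance and learning the attribute name through `__set_name__`. It is a simplified illustration, not the library's code: `PerInstance` and `Counter` are hypothetical, and `Counter` merely stands in for the Task object.

```python
from __future__ import annotations

from typing import Any, Callable


class Counter:
    """Stand-in for the per-instance object (a Task in pydase)."""

    def __init__(self, func: Callable[[], Any], name: str) -> None:
        self.func = func
        self.name = name
        self.calls = 0

    def run(self) -> Any:
        self.calls += 1
        return self.func()


class PerInstance:
    """Descriptor that lazily creates one Counter per owning instance."""

    def __init__(self, func: Callable[..., Any]) -> None:
        self._func = func
        self._name = ""
        self._instances: dict[object, Counter] = {}

    def __set_name__(self, owner: type, name: str) -> None:
        # Called automatically when the class body is evaluated.
        self._name = name

    def __get__(self, instance: object, owner: type | None = None) -> Any:
        if instance is None:
            return self  # accessed on the class, not on an instance
        if instance not in self._instances:
            bound = self._func.__get__(instance, owner)  # bind `self`
            self._instances[instance] = Counter(bound, self._name)
        return self._instances[instance]


class Service:
    @PerInstance
    def greet(self) -> str:
        return f"hello from {type(self).__name__}"


a, b = Service(), Service()
a.greet.run()
a.greet.run()
print(a.greet.name, a.greet.calls, b.greet.calls)
```

Each instance gets its own object, so state such as the call count is not shared between `a` and `b`.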

task

task(*, autostart: bool = False, restart_on_exception: bool = True, restart_sec: float = 1.0, start_limit_interval_sec: float | None = None, start_limit_burst: int = 3, exit_on_failure: bool = False) -> Callable[[Callable[[Any], Coroutine[None, None, R]] | Callable[[], Coroutine[None, None, R]]], PerInstanceTaskDescriptor[R]]

A decorator to define an asynchronous function as a per-instance task within a DataService class.

This decorator transforms an asynchronous function into a Task object that is unique to each instance of the DataService class. The resulting Task object provides methods like start() and stop() to control the execution of the task, and manages the task’s lifecycle independently for each instance of the service.

The decorator is particularly useful for defining tasks that need to run periodically or perform asynchronous operations, such as polling data sources, updating databases, or any recurring job that should be managed within the context of a DataService.

The keyword arguments that can be passed to this decorator are inspired by systemd unit services.

Parameters:

Name Type Description Default
autostart bool

If set to True, the task will automatically start when the service is initialized. Defaults to False.

False
restart_on_exception bool

Configures whether the task shall be restarted when it exits with an exception other than asyncio.CancelledError.

True
restart_sec float

Configures the time to sleep before restarting a task. Defaults to 1.0.

1.0
start_limit_interval_sec float | None

Configures start rate limiting. Tasks that are started more than start_limit_burst times within a start_limit_interval_sec time span are not permitted to start again. Defaults to None (rate limiting disabled).

None
start_limit_burst int

Sets the burst size for start rate limiting. Tasks that are started more than start_limit_burst times within a start_limit_interval_sec time span are not permitted to start again. Defaults to 3.

3
exit_on_failure bool

If True, exit the service if the task fails and restart_on_exception is False or burst limits are exceeded.

False

Returns: A decorator that wraps an asynchronous function in a PerInstanceTaskDescriptor object, which, when accessed, provides an instance-specific Task object.

Example
import asyncio

import pydase
from pydase.task.decorator import task


class MyService(pydase.DataService):
    @task(autostart=True)
    async def my_task(self) -> None:
        while True:
            # Perform some periodic work
            await asyncio.sleep(1)


if __name__ == "__main__":
    service = MyService()
    pydase.Server(service=service).run()

In this example, my_task is defined as a task using the @task decorator, and it will start automatically when the service is initialized because autostart=True is set. You can manually start or stop the task using service.my_task.start() and service.my_task.stop(), respectively.
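
The interaction of start_limit_interval_sec and start_limit_burst can be illustrated with a small sliding-window limiter. This is a sketch that follows the wording of the parameter descriptions literally, and the library's own bookkeeping may differ. `StartRateLimiter` is a hypothetical class, and timestamps are passed in explicitly so the behaviour is deterministic.

```python
from __future__ import annotations

from collections import deque


class StartRateLimiter:
    """Allow at most `burst` starts within any `interval_sec` window."""

    def __init__(self, interval_sec: float | None, burst: int) -> None:
        self._interval_sec = interval_sec
        self._burst = burst
        self._starts: deque[float] = deque()  # timestamps of permitted starts

    def allow_start(self, now: float) -> bool:
        if self._interval_sec is None:
            return True  # rate limiting disabled
        # Forget starts that have fallen out of the window.
        while self._starts and now - self._starts[0] >= self._interval_sec:
            self._starts.popleft()
        if len(self._starts) >= self._burst:
            return False
        self._starts.append(now)
        return True


limiter = StartRateLimiter(interval_sec=10.0, burst=3)
decisions = [limiter.allow_start(t) for t in (0.0, 1.0, 2.0, 3.0, 11.0)]
print(decisions)
```

In this run the fourth start is refused because three starts already fall inside the window, and the fifth is allowed again once the earliest starts have aged out.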

Source code in src/pydase/task/decorator.py
def task(  # noqa: PLR0913
    *,
    autostart: bool = False,
    restart_on_exception: bool = True,
    restart_sec: float = 1.0,
    start_limit_interval_sec: float | None = None,
    start_limit_burst: int = 3,
    exit_on_failure: bool = False,
) -> Callable[
    [
        Callable[[Any], Coroutine[None, None, R]]
        | Callable[[], Coroutine[None, None, R]]
    ],
    PerInstanceTaskDescriptor[R],
]:
    """
    A decorator to define an asynchronous function as a per-instance task within a
    [`DataService`][pydase.DataService] class.

    This decorator transforms an asynchronous function into a
    [`Task`][pydase.task.task.Task] object that is unique to each instance of the
    `DataService` class. The resulting `Task` object provides methods like `start()`
    and `stop()` to control the execution of the task, and manages the task's lifecycle
    independently for each instance of the service.

    The decorator is particularly useful for defining tasks that need to run
    periodically or perform asynchronous operations, such as polling data sources,
    updating databases, or any recurring job that should be managed within the context
    of a `DataService`.

    The keyword arguments that can be passed to this decorator are inspired by systemd
    unit services.

    Args:
        autostart:
            If set to True, the task will automatically start when the service is
            initialized. Defaults to False.
        restart_on_exception:
            Configures whether the task shall be restarted when it exits with an
            exception other than [`asyncio.CancelledError`][asyncio.CancelledError].
        restart_sec:
            Configures the time to sleep before restarting a task. Defaults to 1.0.
        start_limit_interval_sec:
            Configures start rate limiting. Tasks which are started more than
            `start_limit_burst` times within an `start_limit_interval_sec` time span are
            not permitted to start any more. Defaults to None (disabled rate limiting).
        start_limit_burst:
            Configures unit start rate limiting. Tasks which are started more than
            `start_limit_burst` times within an `start_limit_interval_sec` time span are
            not permitted to start any more. Defaults to 3.
        exit_on_failure:
            If True, exit the service if the task fails and restart_on_exception is
            False or burst limits are exceeded.
    Returns:
        A decorator that wraps an asynchronous function in a
        [`PerInstanceTaskDescriptor`][pydase.task.decorator.PerInstanceTaskDescriptor]
        object, which, when accessed, provides an instance-specific
        [`Task`][pydase.task.task.Task] object.

    Example:
        ```python
        import asyncio

        import pydase
        from pydase.task.decorator import task


        class MyService(pydase.DataService):
            @task(autostart=True)
            async def my_task(self) -> None:
                while True:
                    # Perform some periodic work
                    await asyncio.sleep(1)


        if __name__ == "__main__":
            service = MyService()
            pydase.Server(service=service).run()
        ```

        In this example, `my_task` is defined as a task using the `@task` decorator, and
        it will start automatically when the service is initialized because
        `autostart=True` is set. You can manually start or stop the task using
        `service.my_task.start()` and `service.my_task.stop()`, respectively.
    """

    def decorator(
        func: Callable[[Any], Coroutine[None, None, R]]
        | Callable[[], Coroutine[None, None, R]],
    ) -> PerInstanceTaskDescriptor[R]:
        return PerInstanceTaskDescriptor(
            func,
            autostart=autostart,
            restart_on_exception=restart_on_exception,
            restart_sec=restart_sec,
            start_limit_interval_sec=start_limit_interval_sec,
            start_limit_burst=start_limit_burst,
            exit_on_failure=exit_on_failure,
        )

    return decorator

task

Task

Task(func: Callable[[], Coroutine[None, None, R | None]], *, autostart: bool, restart_on_exception: bool, restart_sec: float, start_limit_interval_sec: float | None, start_limit_burst: int, exit_on_failure: bool)

Bases: DataService, Generic[R]

A class representing a task within the pydase framework.

The Task class wraps an asynchronous function and provides methods to manage its lifecycle, such as start() and stop(). It is typically used to perform periodic or recurring jobs in a DataService, like reading sensor data, updating databases, or executing other background tasks.

When a function is decorated with the @task decorator, it is replaced by a Task instance that controls the execution of the original function.

The keyword arguments that can be passed to this class are inspired by systemd unit services.

Parameters:

Name Type Description Default
func Callable[[], Coroutine[None, None, R | None]]

The asynchronous function that this task wraps. It must be a coroutine without arguments.

required
autostart bool

If set to True, the task will automatically start when the service is initialized. The @task decorator passes False by default.

required
restart_on_exception bool

Configures whether the task shall be restarted when it exits with an exception other than asyncio.CancelledError.

required
restart_sec float

Configures the time to sleep before restarting a task. The @task decorator passes 1.0 by default.

required
start_limit_interval_sec float | None

Configures start rate limiting. Tasks that are started more than start_limit_burst times within a start_limit_interval_sec time span are not permitted to start again. None disables rate limiting; the @task decorator passes None by default.

required
start_limit_burst int

Sets the burst size for start rate limiting. Tasks that are started more than start_limit_burst times within a start_limit_interval_sec time span are not permitted to start again. The @task decorator passes 3 by default.

required
exit_on_failure bool

If True, exit the service if the task fails and restart_on_exception is False or burst limits are exceeded.

required
Example
import asyncio

import pydase
from pydase.task.decorator import task


class MyService(pydase.DataService):
    @task(autostart=True)
    async def my_task(self) -> None:
        while True:
            # Perform some periodic work
            await asyncio.sleep(1)


if __name__ == "__main__":
    service = MyService()
    pydase.Server(service=service).run()

In this example, my_task is defined as a task using the @task decorator, and it will start automatically when the service is initialized because autostart=True is set. You can manually start or stop the task using service.my_task.start() and service.my_task.stop(), respectively.

Source code in src/pydase/task/task.py
def __init__(  # noqa: PLR0913
    self,
    func: Callable[[], Coroutine[None, None, R | None]],
    *,
    autostart: bool,
    restart_on_exception: bool,
    restart_sec: float,
    start_limit_interval_sec: float | None,
    start_limit_burst: int,
    exit_on_failure: bool,
) -> None:
    super().__init__()
    self._autostart = autostart
    self._restart_on_exception = restart_on_exception
    self._restart_sec = restart_sec
    self._start_limit_interval_sec = start_limit_interval_sec
    self._start_limit_burst = start_limit_burst
    self._exit_on_failure = exit_on_failure
    self._func_name = func.__name__
    self._func = func
    self._task: asyncio.Task[R | None] | None = None
    self._status = TaskStatus.NOT_RUNNING
    self._result: R | None = None

    if not current_event_loop_exists():
        self._loop = asyncio.new_event_loop()
        asyncio.set_event_loop(self._loop)
    else:
        self._loop = asyncio.get_event_loop()

autostart property

autostart: bool

Defines if the task should be started automatically when the Server starts.

status property

status: TaskStatus

Returns the current status of the task.

start

start() -> None

Starts the asynchronous task if it is not already running.

Source code in src/pydase/task/task.py
def start(self) -> None:
    """Starts the asynchronous task if it is not already running."""
    if self._task:
        return

    def task_done_callback(task: asyncio.Task[R | None]) -> None:
        """Handles tasks that have finished.

        Updates the task status, calls the defined callbacks, and logs and re-raises
        exceptions.
        """

        self._task = None
        self._status = TaskStatus.NOT_RUNNING

        exception = None
        try:
            exception = task.exception()
        except asyncio.CancelledError:
            return

        if exception is not None:
            logger.error(
                "Task '%s' encountered an exception: %r",
                self._func_name,
                exception,
            )
            os.kill(os.getpid(), signal.SIGTERM)
        else:
            self._result = task.result()

    logger.info("Creating task %r", self._func_name)
    self._task = self._loop.create_task(self.__running_task_loop())
    self._task.add_done_callback(task_done_callback)
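
The done-callback pattern used in start() distinguishes three outcomes: cancellation, failure, and normal completion. The sketch below shows the same branching with plain asyncio, recording the outcome in a dict instead of logging or signalling the process. `make_done_callback` and `run_and_report` are hypothetical helpers for illustration.

```python
import asyncio
from typing import Any, Callable


def make_done_callback(outcome: dict[str, Any]) -> Callable[[asyncio.Task], None]:
    """Build a callback that records how the task ended."""

    def on_done(task: asyncio.Task) -> None:
        try:
            exception = task.exception()
        except asyncio.CancelledError:
            # exception() raises if the task was cancelled.
            outcome["status"] = "cancelled"
            return
        if exception is not None:
            outcome["status"] = "failed"
            outcome["error"] = repr(exception)
        else:
            outcome["status"] = "finished"
            outcome["result"] = task.result()

    return on_done


async def run_and_report(func: Callable[[], Any], *, cancel: bool = False) -> dict[str, Any]:
    outcome: dict[str, Any] = {}
    task = asyncio.get_running_loop().create_task(func())
    task.add_done_callback(make_done_callback(outcome))
    if cancel:
        await asyncio.sleep(0)  # let the task start before cancelling it
        task.cancel()
    # Wait for the task without re-raising its exception here.
    await asyncio.gather(task, return_exceptions=True)
    return outcome


async def succeed() -> int:
    return 42


async def fail() -> None:
    raise ValueError("boom")


async def forever() -> None:
    await asyncio.sleep(3600)


print(asyncio.run(run_and_report(succeed)))
print(asyncio.run(run_and_report(fail)))
print(asyncio.run(run_and_report(forever, cancel=True)))
```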

stop

stop() -> None

Stops the running asynchronous task by cancelling it.

Source code in src/pydase/task/task.py
def stop(self) -> None:
    """Stops the running asynchronous task by cancelling it."""

    if self._task:
        self._task.cancel()

task_status

TaskStatus

Bases: Enum

Possible statuses of a Task.

pydase.utils.serialization.serializer

Serializer

Serializes objects into SerializedObject representations.

serialize_object classmethod

serialize_object(obj: Any, access_path: str = '') -> SerializedObject

Serialize obj to a SerializedObject.

Parameters:

Name Type Description Default
obj Any

Object to be serialized.

required
access_path str

String corresponding to the full access path of the object. This will be prepended to the full_access_path in the SerializedObject entries.

''

Returns:

Type Description
SerializedObject

Dictionary representation of obj.

Source code in src/pydase/utils/serialization/serializer.py
@classmethod
def serialize_object(cls, obj: Any, access_path: str = "") -> SerializedObject:  # noqa: C901
    """Serialize `obj` to a
    [`SerializedObject`][pydase.utils.serialization.types.SerializedObject].

    Args:
        obj:
            Object to be serialized.
        access_path:
            String corresponding to the full access path of the object. This will be
            prepended to the full_access_path in the SerializedObject entries.

    Returns:
        Dictionary representation of `obj`.
    """
    from pydase.client.client import ProxyClass

    result: SerializedObject

    if isinstance(obj, Exception):
        result = cls._serialize_exception(obj)

    elif isinstance(obj, datetime):
        result = cls._serialize_datetime(obj, access_path=access_path)

    elif isinstance(obj, ProxyClass):
        result = cls._serialize_proxy_class(obj, access_path=access_path)

    elif isinstance(obj, AbstractDataService):
        result = cls._serialize_data_service(obj, access_path=access_path)

    elif isinstance(obj, list):
        result = cls._serialize_list(obj, access_path=access_path)

    elif isinstance(obj, dict):
        result = cls._serialize_dict(obj, access_path=access_path)

    # Special handling for u.Quantity
    elif isinstance(obj, u.Quantity):
        result = cls._serialize_quantity(obj, access_path=access_path)

    # Handling for Enums
    elif isinstance(obj, Enum):
        result = cls._serialize_enum(obj, access_path=access_path)

    # Methods and coroutines
    elif inspect.isfunction(obj) or inspect.ismethod(obj):
        result = cls._serialize_method(obj, access_path=access_path)

    elif isinstance(obj, int | float | bool | str | None):
        result = cls._serialize_primitive(obj, access_path=access_path)

    try:
        return result
    except UnboundLocalError:
        raise SerializationError(
            f"Could not serialize object of type {type(obj)}."
        )
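
The `isinstance` chain above can be illustrated with a much smaller standalone sketch. It is not pydase's serializer: `serialize_sketch` and `SketchSerializationError` are hypothetical names, only a few types are handled, and the output keeps just three keys.

```python
from datetime import datetime
from enum import Enum
from typing import Any


class SketchSerializationError(Exception):
    """Hypothetical stand-in for pydase's SerializationError."""


def serialize_sketch(obj: Any, access_path: str = "") -> dict[str, Any]:
    """Simplified isinstance dispatch; not pydase's real output format."""
    if isinstance(obj, datetime):
        return {"full_access_path": access_path, "type": "datetime", "value": obj.isoformat()}
    if isinstance(obj, list):
        return {
            "full_access_path": access_path,
            "type": "list",
            # Each item gets an indexed access path, e.g. "items[0]".
            "value": [
                serialize_sketch(item, f"{access_path}[{index}]")
                for index, item in enumerate(obj)
            ],
        }
    if isinstance(obj, Enum):
        return {"full_access_path": access_path, "type": "Enum", "value": obj.name}
    if obj is None or isinstance(obj, (int, float, bool, str)):
        return {"full_access_path": access_path, "type": type(obj).__name__, "value": obj}
    # No branch matched: report the unsupported type.
    raise SketchSerializationError(f"Could not serialize object of type {type(obj)}.")


serialized_items = serialize_sketch([1, "a"], "items")
```

Raising explicitly in the final branch has the same effect as the `try`/`except UnboundLocalError` construct in the source: an object that matches no branch produces a serialization error.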

add_prefix_to_full_access_path

add_prefix_to_full_access_path(serialized_obj: SerializedObject, prefix: str) -> Any

Recursively adds a specified prefix to all full access paths of the serialized object.

Parameters:

Name Type Description Default
serialized_obj SerializedObject

The serialized object to process.

required
prefix str

The prefix string to prepend to each full access path.

required

Returns:

Type Description
Any

The modified serialized object with the prefix added to all full access paths.

Example
>>> serialized_obj = {
...     "full_access_path": "",
...     "value": {
...         "item": {
...             "full_access_path": "some_item_path",
...             "value": 1.0
...         }
...     }
... }
>>> add_prefix_to_full_access_path(serialized_obj, 'prefix')
{'full_access_path': 'prefix', 'value': {'item': {'full_access_path':
'prefix.some_item_path', 'value': 1.0}}}
Source code in src/pydase/utils/serialization/serializer.py
def add_prefix_to_full_access_path(
    serialized_obj: SerializedObject, prefix: str
) -> Any:
    """Recursively adds a specified prefix to all full access paths of the serialized
    object.

    Args:
        serialized_obj:
            The serialized object to process.
        prefix:
            The prefix string to prepend to each full access path.

    Returns:
        The modified serialized object with the prefix added to all full access paths.

    Example:
        ```python
        >>> serialized_obj = {
        ...     "full_access_path": "",
        ...     "value": {
        ...         "item": {
        ...             "full_access_path": "some_item_path",
        ...             "value": 1.0
        ...         }
        ...     }
        ... }
        >>> add_prefix_to_full_access_path(serialized_obj, 'prefix')
        {'full_access_path': 'prefix', 'value': {'item': {'full_access_path':
        'prefix.some_item_path', 'value': 1.0}}}
        ```
    """

    try:
        if serialized_obj.get("full_access_path", None) is not None:
            serialized_obj["full_access_path"] = (
                prefix + "." + serialized_obj["full_access_path"]
                if serialized_obj["full_access_path"] != ""
                else prefix
            )

        if isinstance(serialized_obj["value"], list):
            for value in serialized_obj["value"]:
                add_prefix_to_full_access_path(cast("SerializedObject", value), prefix)

        elif isinstance(serialized_obj["value"], dict):
            for value in cast(
                "dict[str, SerializedObject]", serialized_obj["value"]
            ).values():
                add_prefix_to_full_access_path(cast("SerializedObject", value), prefix)
    except (TypeError, KeyError, AttributeError):
        # passed dictionary is not a serialized object
        pass
    return serialized_obj
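
Note that the function modifies the passed dictionary in place and returns that same object. The standalone copy below (named `add_prefix` here, with the typing casts dropped so that it runs without pydase) shows this on the example data:

```python
from typing import Any


def add_prefix(serialized_obj: dict[str, Any], prefix: str) -> dict[str, Any]:
    """Standalone copy of the logic above, with the typing casts dropped."""
    try:
        if serialized_obj.get("full_access_path") is not None:
            current = serialized_obj["full_access_path"]
            serialized_obj["full_access_path"] = (
                f"{prefix}.{current}" if current != "" else prefix
            )

        value = serialized_obj["value"]
        if isinstance(value, list):
            for item in value:
                add_prefix(item, prefix)
        elif isinstance(value, dict):
            for item in value.values():
                add_prefix(item, prefix)
    except (TypeError, KeyError, AttributeError):
        pass  # not a serialized object; leave it untouched
    return serialized_obj


data = {
    "full_access_path": "",
    "value": {"item": {"full_access_path": "some_item_path", "value": 1.0}},
}
result = add_prefix(data, "prefix")
```

Because `result` and `data` are the same object, callers that need the original untouched should pass a deep copy.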

create_empty_serialized_object

create_empty_serialized_object() -> SerializedObject

Create a new empty serialized object.

Source code in src/pydase/utils/serialization/serializer.py
def create_empty_serialized_object() -> SerializedObject:
    """Create a new empty serialized object."""

    return {
        "full_access_path": "",
        "value": None,
        "type": "None",
        "doc": None,
        "readonly": False,
    }

dump

dump(obj: Any) -> SerializedObject

Serialize obj to a SerializedObject.

The Serializer is used for encoding.

Parameters:

Name Type Description Default
obj Any

Object to be serialized.

required

Returns:

Type Description
SerializedObject

Dictionary representation of obj.

Source code in src/pydase/utils/serialization/serializer.py
def dump(obj: Any) -> SerializedObject:
    """Serialize `obj` to a
    [`SerializedObject`][pydase.utils.serialization.types.SerializedObject].

    The [`Serializer`][pydase.utils.serialization.serializer.Serializer] is used for
    encoding.

    Args:
        obj:
            Object to be serialized.

    Returns:
        Dictionary representation of `obj`.
    """
    return Serializer.serialize_object(obj)

generate_serialized_data_paths

generate_serialized_data_paths(data: dict[str, SerializedObject]) -> list[str]

Recursively extracts full access paths from a serialized DataService class instance.

Parameters:

Name Type Description Default
data dict[str, SerializedObject]

The value of the “value” key of a serialized DataService class instance.

required

Returns:

Type Description
list[str]

A list of strings, each representing a full access path in the serialized object.

Source code in src/pydase/utils/serialization/serializer.py
def generate_serialized_data_paths(
    data: dict[str, SerializedObject],
) -> list[str]:
    """
    Recursively extracts full access paths from a serialized DataService class instance.

    Args:
        data:
            The value of the "value" key of a serialized DataService class instance.

    Returns:
        A list of strings, each representing a full access path in the serialized
            object.
    """

    paths: list[str] = []

    for key, value in data.items():
        paths.append(key)

        if serialized_dict_is_nested_object(value):
            paths.extend(get_data_paths_from_serialized_object(value, key))
    return paths

get_container_item_by_key

get_container_item_by_key(container: dict[Any, SerializedObject] | list[SerializedObject], key: str, *, allow_append: bool = False) -> SerializedObject

Retrieve an item from a container specified by the passed key. Add an item to the container if allow_append is set to True.

If specified keys or indexes do not exist, the function can append new elements to dictionaries and to lists if allow_append is True and the missing element is exactly the next sequential index (for lists).

Parameters:

Name Type Description Default
container dict[Any, SerializedObject] | list[SerializedObject]

The container representing serialized data.

required
key str

The key name representing the attribute in the dictionary, which may include direct keys or indexes (e.g., ‘attr_name’, ‘[“key”]’ or ‘[0]’).

required
allow_append bool

Flag to allow appending a new entry if the specified index is out of range by exactly one position.

False

Returns:

Type Description
SerializedObject

The dictionary or list item corresponding to the specified attribute and index.

Raises:

Type Description
SerializationPathError

If key is invalid or leads to an IndexError or KeyError. This error is also raised when a nonexistent key or index is accessed and appending is not allowed.

Source code in src/pydase/utils/serialization/serializer.py
def get_container_item_by_key(
    container: dict[Any, SerializedObject] | list[SerializedObject],
    key: str,
    *,
    allow_append: bool = False,
) -> SerializedObject:
    """
    Retrieve an item from a container specified by the passed key. Add an item to the
    container if `allow_append` is set to `True`.

    If specified keys or indexes do not exist, the function can append new elements to
    dictionaries and to lists if `allow_append` is True and the missing element is
    exactly the next sequential index (for lists).

    Args:
        container:
            The container representing serialized data.
        key:
            The key name representing the attribute in the dictionary, which may include
            direct keys or indexes (e.g., 'attr_name', '["key"]' or '[0]').
        allow_append:
            Flag to allow appending a new entry if the specified index is out of range
            by exactly one position.

    Returns:
        The dictionary or list item corresponding to the specified attribute and index.

    Raises:
        SerializationPathError:
            If `key` is invalid or leads to an IndexError or KeyError. This error is
            also raised when a nonexistent key or index is accessed and appending is
            not allowed.
    """
    processed_key = parse_serialized_key(key)

    try:
        return get_or_create_item_in_container(
            container, processed_key, allow_add_key=allow_append
        )
    except IndexError as e:
        raise SerializationPathError(f"Index '{processed_key}': {e}")
    except KeyError as e:
        raise SerializationPathError(f"Key '{processed_key}': {e}")

get_data_paths_from_serialized_object

get_data_paths_from_serialized_object(serialized_obj: SerializedObject, parent_path: str = '') -> list[str]

Recursively extracts full access paths from a serialized object.

Parameters:

Name Type Description Default
serialized_obj SerializedObject

The dictionary representing the serialization of an object. Produced by pydase.utils.serialization.serializer.Serializer.

required

Returns:

Type Description
list[str]

A list of strings, each representing a full access path in the serialized object.

Source code in src/pydase/utils/serialization/serializer.py
def get_data_paths_from_serialized_object(  # noqa: C901
    serialized_obj: SerializedObject,
    parent_path: str = "",
) -> list[str]:
    """
    Recursively extracts full access paths from a serialized object.

    Args:
        serialized_obj:
            The dictionary representing the serialization of an object. Produced by
            `pydase.utils.serialization.serializer.Serializer`.

    Returns:
        A list of strings, each representing a full access path in the serialized
            object.
    """

    paths: list[str] = []

    if isinstance(serialized_obj["value"], list):
        for index, value in enumerate(serialized_obj["value"]):
            new_path = f"{parent_path}[{index}]"
            paths.append(new_path)
            if serialized_dict_is_nested_object(value):
                paths.extend(get_data_paths_from_serialized_object(value, new_path))

    elif serialized_dict_is_nested_object(serialized_obj):
        for key, value in cast(
            "dict[str, SerializedObject]", serialized_obj["value"]
        ).items():
            # Serialized dictionaries need to have a different new_path than nested
            # classes
            if serialized_obj["type"] == "dict":
                processed_key = key
                if isinstance(key, str):
                    processed_key = f'"{key}"'
                new_path = f"{parent_path}[{processed_key}]"
            else:
                new_path = f"{parent_path}.{key}" if parent_path != "" else key

            paths.append(new_path)
            if serialized_dict_is_nested_object(value):
                paths.extend(get_data_paths_from_serialized_object(value, new_path))

    return paths
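
The three path styles produced above (attribute access with a dot, list items with `[index]`, dictionary entries with `["key"]`) can be seen in a standalone sketch. The helper `is_nested` is an assumption standing in for `serialized_dict_is_nested_object`, whose source is not shown in this section, and the `device` data is made up for illustration.

```python
from typing import Any


def is_nested(serialized: dict[str, Any]) -> bool:
    """Assumed stand-in for serialized_dict_is_nested_object."""
    return isinstance(serialized["value"], (dict, list))


def data_paths(serialized: dict[str, Any], parent_path: str = "") -> list[str]:
    value = serialized["value"]
    children: list[tuple[str, dict[str, Any]]] = []

    if isinstance(value, list):
        children = [(f"{parent_path}[{i}]", item) for i, item in enumerate(value)]
    elif isinstance(value, dict):
        for key, item in value.items():
            if serialized["type"] == "dict":
                # Dictionary keys are quoted when they are strings.
                key_repr = f'"{key}"' if isinstance(key, str) else key
                children.append((f"{parent_path}[{key_repr}]", item))
            else:
                # Attributes of nested objects use dot notation.
                children.append((f"{parent_path}.{key}" if parent_path else key, item))

    paths: list[str] = []
    for path, child in children:
        paths.append(path)
        if is_nested(child):
            paths.extend(data_paths(child, path))
    return paths


device = {
    "type": "DataService",
    "value": {
        "channels": {
            "type": "list",
            "value": [{"type": "float", "value": 1.0}, {"type": "float", "value": 2.0}],
        },
        "limits": {"type": "dict", "value": {"max": {"type": "int", "value": 5}}},
    },
}
paths = data_paths(device, "device")
```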

get_or_create_item_in_container

get_or_create_item_in_container(container: dict[Any, SerializedObject] | list[SerializedObject], key: Any, *, allow_add_key: bool) -> SerializedObject

Ensure the key exists in the dictionary, append if necessary and allowed.

Source code in src/pydase/utils/serialization/serializer.py
def get_or_create_item_in_container(
    container: dict[Any, SerializedObject] | list[SerializedObject],
    key: Any,
    *,
    allow_add_key: bool,
) -> SerializedObject:
    """Ensure the key exists in the dictionary, append if necessary and allowed."""

    try:
        return container[key]
    except IndexError:
        if allow_add_key and key == len(container):
            cast("list[SerializedObject]", container).append(
                create_empty_serialized_object()
            )
            return container[key]
        raise
    except KeyError:
        if allow_add_key:
            container[key] = create_empty_serialized_object()
            return container[key]
        raise
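
The append rules differ for lists and dictionaries: a list only grows when the requested index equals its current length, while a dictionary accepts any new key. A standalone copy of the logic (named `get_or_create` and `empty_entry` here, without the typing casts) demonstrates both cases:

```python
from typing import Any


def empty_entry() -> dict[str, Any]:
    """Same shape as create_empty_serialized_object()."""
    return {
        "full_access_path": "",
        "value": None,
        "type": "None",
        "doc": None,
        "readonly": False,
    }


def get_or_create(container: Any, key: Any, *, allow_add_key: bool) -> dict[str, Any]:
    try:
        return container[key]
    except IndexError:
        # Lists: only the next sequential index may be appended.
        if allow_add_key and key == len(container):
            container.append(empty_entry())
            return container[key]
        raise
    except KeyError:
        # Dictionaries: any missing key may be added.
        if allow_add_key:
            container[key] = empty_entry()
            return container[key]
        raise


items: list[dict[str, Any]] = [empty_entry()]
appended = get_or_create(items, 1, allow_add_key=True)  # index == len(items)

try:
    get_or_create(items, 5, allow_add_key=True)  # would leave a gap in the list
    gap_rejected = False
except IndexError:
    gap_rejected = True

mapping: dict[str, dict[str, Any]] = {}
created = get_or_create(mapping, "new_key", allow_add_key=True)
```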

set_nested_value_by_path

set_nested_value_by_path(serialization_dict: dict[Any, SerializedObject], path: str, value: Any) -> None

Set a value in a nested dictionary structure, which conforms to the serialization format used by Serializer, using a dot-notation path.

Parameters:

Name Type Description Default
serialization_dict dict[Any, SerializedObject]

The base dictionary representing data serialized with Serializer.

required
path str

The dot-notation path (e.g., ‘attr1.attr2[0].attr3’) indicating where to set the value.

required
value Any

The new value to set at the specified path.

required
Note

If the index equals the length of the list, the function will append the serialized representation of the ‘value’ to the list.

Source code in src/pydase/utils/serialization/serializer.py
def set_nested_value_by_path(
    serialization_dict: dict[Any, SerializedObject], path: str, value: Any
) -> None:
    """
    Set a value in a nested dictionary structure, which conforms to the serialization
    format used by [`Serializer`][pydase.utils.serialization.serializer.Serializer],
    using a dot-notation path.

    Args:
        serialization_dict:
            The base dictionary representing data serialized with
            [`Serializer`][pydase.utils.serialization.serializer.Serializer].
        path:
            The dot-notation path (e.g., 'attr1.attr2[0].attr3') indicating where to
            set the value.
        value:
            The new value to set at the specified path.

    Note:
        If the index equals the length of the list, the function will append the
        serialized representation of the 'value' to the list.
    """

    path_parts = parse_full_access_path(path)
    current_dict: dict[Any, SerializedObject] = serialization_dict

    try:
        for path_part in path_parts[:-1]:
            next_level_serialized_object = get_container_item_by_key(
                current_dict, path_part, allow_append=False
            )
            current_dict = cast(
                "dict[Any, SerializedObject]",
                next_level_serialized_object["value"],
            )

        next_level_serialized_object = get_container_item_by_key(
            current_dict, path_parts[-1], allow_append=True
        )
    except (SerializationPathError, KeyError) as e:
        logger.exception("Error occurred trying to change %a: %s", path, e)
        return

    if next_level_serialized_object["type"] == "method":  # state change of task
        next_level_serialized_object["value"] = (
            "RUNNING" if isinstance(value, TaskStatus) else None
        )
    else:
        serialized_value = Serializer.serialize_object(value, access_path=path)
        serialized_value["readonly"] = next_level_serialized_object["readonly"]

        keys_to_keep = set(serialized_value.keys())

        next_level_serialized_object.update(serialized_value)  # type: ignore

        # removes keys that are not present in the serialized new value
        for key in list(next_level_serialized_object.keys()):
            if key not in keys_to_keep:
                next_level_serialized_object.pop(key, None)  # type: ignore
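
The final update-then-prune step keeps the existing dictionary object (so other references to it stay valid) while removing keys that only made sense for the old value. The sketch below replays that step on hand-written data; both dictionaries are hypothetical examples, not output captured from pydase.

```python
from typing import Any

# Existing cache entry (hypothetical): carries an extra "enum" key.
entry: dict[str, Any] = {
    "full_access_path": "mode",
    "type": "Enum",
    "value": "FAST",
    "enum": {"FAST": 0, "SLOW": 1},
    "readonly": True,
    "doc": None,
}
same_entry = entry  # a second reference, e.g. held by a parent container

# Serialized form of the new value (hypothetical): a plain float.
new_value: dict[str, Any] = {
    "full_access_path": "mode",
    "type": "float",
    "value": 1.5,
    "readonly": False,
    "doc": None,
}

# The readonly flag is carried over from the existing entry.
new_value["readonly"] = entry["readonly"]
keys_to_keep = set(new_value)

# Overwrite in place, then drop keys the new value does not define.
entry.update(new_value)
for key in list(entry):
    if key not in keys_to_keep:
        entry.pop(key, None)
```

After this runs, the stale `"enum"` key is gone and `same_entry` reflects the new value as well.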

pydase.utils.serialization.deserializer

Deserializer

deserialize classmethod

deserialize(serialized_object: SerializedObject) -> Any

Deserialize serialized_object (a dict) to a Python object.

Source code in src/pydase/utils/serialization/deserializer.py
@classmethod
def deserialize(cls, serialized_object: SerializedObject) -> Any:
    """Deserialize `serialized_object` (a `dict`) to a Python object."""
    type_handler: dict[str | None, None | Callable[..., Any]] = {
        None: None,
        "int": cls.deserialize_primitive,
        "float": cls.deserialize_primitive,
        "bool": cls.deserialize_primitive,
        "str": cls.deserialize_primitive,
        "NoneType": cls.deserialize_primitive,
        "Quantity": cls.deserialize_quantity,
        "Enum": cls.deserialize_enum,
        "ColouredEnum": lambda serialized_object: cls.deserialize_enum(
            serialized_object, enum_class=pydase.components.ColouredEnum
        ),
        "list": cls.deserialize_list,
        "dict": cls.deserialize_dict,
        "method": cls.deserialize_method,
        "Exception": cls.deserialize_exception,
        "datetime": cls.deserialize_datetime,
    }

    # First go through handled types (as ColouredEnum is also within the components)
    handler = type_handler.get(serialized_object["type"])
    if handler:
        return handler(serialized_object)

    # Custom types like Components or DataService classes
    service_base_class = cls.get_service_base_class(serialized_object["type"])
    if service_base_class:
        return cls.deserialize_data_service(serialized_object, service_base_class)

    return None
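
The handler table above is a plain dictionary dispatch on the `"type"` key. A reduced standalone sketch follows; the names `load`, `load_primitive`, `load_list`, `load_dict` and `HANDLERS` are hypothetical, only a few types are covered, and the lookup of custom service classes is omitted.

```python
from typing import Any, Callable


def load_primitive(serialized: dict[str, Any]) -> Any:
    return serialized["value"]


def load_list(serialized: dict[str, Any]) -> list[Any]:
    return [load(item) for item in serialized["value"]]


def load_dict(serialized: dict[str, Any]) -> dict[Any, Any]:
    return {key: load(item) for key, item in serialized["value"].items()}


HANDLERS: dict[str, Callable[[dict[str, Any]], Any]] = {
    "int": load_primitive,
    "float": load_primitive,
    "bool": load_primitive,
    "str": load_primitive,
    "NoneType": load_primitive,
    "list": load_list,
    "dict": load_dict,
}


def load(serialized: dict[str, Any]) -> Any:
    handler = HANDLERS.get(serialized["type"])
    if handler:
        return handler(serialized)
    return None  # unknown type: mirrors the final fallback above


restored = load(
    {
        "type": "list",
        "value": [
            {"type": "int", "value": 1},
            {"type": "dict", "value": {"name": {"type": "str", "value": "a"}}},
        ],
    }
)
```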

loads

loads(serialized_object: SerializedObject) -> Any

Deserialize serialized_object (a dict) to a Python object.

Source code in src/pydase/utils/serialization/deserializer.py
def loads(serialized_object: SerializedObject) -> Any:
    """Deserialize `serialized_object` (a `dict`) to a Python object."""
    return Deserializer.deserialize(serialized_object)

pydase.utils.serialization.types

SerializedObject module-attribute

SerializedObject = SerializedBool | SerializedFloat | SerializedInteger | SerializedString | SerializedDatetime | SerializedList | SerializedDict | SerializedNoneType | SerializedMethod | SerializedException | SerializedDataService | SerializedEnum | SerializedQuantity | SerializedNoValue

This type can be any of the following:

  • SerializedBool
  • SerializedFloat
  • SerializedInteger
  • SerializedString
  • SerializedDatetime
  • SerializedList
  • SerializedDict
  • SerializedNoneType
  • SerializedMethod
  • SerializedException
  • SerializedDataService
  • SerializedEnum
  • SerializedQuantity
  • SerializedNoValue

pydase.utils.decorators

frontend

frontend(func: Callable[..., Any]) -> Callable[..., Any]

Decorator to mark a DataService method for frontend rendering. Ensures that the method does not contain arguments, as they are not supported for frontend rendering.

Source code in src/pydase/utils/decorators.py
def frontend(func: Callable[..., Any]) -> Callable[..., Any]:
    """Decorator to mark a [`DataService`][pydase.DataService] method for frontend
    rendering. Ensures that the method does not contain arguments, as they are not
    supported for frontend rendering.
    """

    if function_has_arguments(func):
        raise FunctionDefinitionError(
            "The @frontend decorator requires functions without arguments. Function "
            f"'{func.__name__}' has at least one argument. "
            "Please remove the argument(s) from this function to use it with the "
            "@frontend decorator."
        )

    # Mark the function for frontend display.
    func._display_in_frontend = True  # type: ignore
    return func
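
The source of `function_has_arguments` is not shown in this section. The sketch below assumes it treats every parameter other than `self` as an argument and uses `inspect.signature` for the check; `has_arguments` and `frontend_sketch` are hypothetical names, and a plain `TypeError` stands in for `FunctionDefinitionError`.

```python
import inspect
from typing import Any, Callable


def has_arguments(func: Callable[..., Any]) -> bool:
    """Assumed check: any parameter other than `self` counts as an argument."""
    params = [name for name in inspect.signature(func).parameters if name != "self"]
    return len(params) > 0


def frontend_sketch(func: Callable[..., Any]) -> Callable[..., Any]:
    if has_arguments(func):
        raise TypeError(f"'{func.__name__}' must not take arguments.")
    func._display_in_frontend = True  # marker attribute, as in the source above
    return func


class Device:
    @frontend_sketch
    def reset(self) -> None:
        """No arguments besides self: accepted."""


try:

    @frontend_sketch
    def set_level(level: int) -> None:
        """Takes an argument: rejected at decoration time."""

    rejected = False
except TypeError:
    rejected = True
```

The check runs when the decorator is applied, so an unsupported method fails at class definition time rather than when the frontend first tries to render it.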

pydase.utils.logging

DefaultFormatter

DefaultFormatter(fmt: str | None = None, datefmt: str | None = None, style: Literal['%', '{', '$'] = '%', use_colors: bool | None = None)

Bases: Formatter

A custom log formatter class that:

  • Outputs the LOG_LEVEL with an appropriate color.
  • If a log call includes an extra={"color_message": ...} it will be used for formatting the output, instead of the plain text message.
Source code in src/pydase/utils/logging.py
def __init__(
    self,
    fmt: str | None = None,
    datefmt: str | None = None,
    style: Literal["%", "{", "$"] = "%",
    use_colors: bool | None = None,
):
    if use_colors in (True, False):
        self.use_colors = use_colors
    else:
        self.use_colors = sys.stdout.isatty()
    super().__init__(fmt=fmt, datefmt=datefmt, style=style)

NameFilter

NameFilter(match: str, invert: bool = False)

Bases: Filter

Logging filter that allows filtering logs based on the logger name. Can either include or exclude a specific logger.

Source code in src/pydase/utils/logging.py
def __init__(self, match: str, invert: bool = False):
    super().__init__()
    self.match = match
    self.invert = invert
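
Only the constructor is shown here. As a rough sketch of how `match` and `invert` could be applied, the class below assumes that a record passes when its logger name starts with `match` (and the opposite when `invert` is set); the real `filter` method may compare names differently.

```python
import logging


class NameFilterSketch(logging.Filter):
    """Hypothetical re-implementation; matching semantics are an assumption."""

    def __init__(self, match: str, invert: bool = False) -> None:
        super().__init__()
        self.match = match
        self.invert = invert

    def filter(self, record: logging.LogRecord) -> bool:
        # Assumed semantics: prefix match on the logger name.
        matches = record.name.startswith(self.match)
        return not matches if self.invert else matches


pydase_record = logging.makeLogRecord({"name": "pydase.server"})
other_record = logging.makeLogRecord({"name": "asyncio"})

include_filter = NameFilterSketch("pydase")
exclude_filter = NameFilterSketch("pydase", invert=True)
```

A filter like this would typically be attached with `handler.addFilter(...)` so that only the selected loggers reach that handler.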

SocketIOHandler

SocketIOHandler(sio: AsyncServer)

Bases: Handler

Custom logging handler that emits ERROR and CRITICAL log records to a Socket.IO server, allowing for real-time logging in applications that use Socket.IO for communication.

Source code in src/pydase/utils/logging.py
def __init__(self, sio: socketio.AsyncServer) -> None:
    super().__init__(logging.ERROR)
    self._sio = sio

configure_logging_with_pydase_formatter

configure_logging_with_pydase_formatter(name: str | None = None, level: int = logging.INFO, stream: TextIO | None = None) -> None

Configure a logger with the pydase DefaultFormatter.

This sets up a StreamHandler with the custom DefaultFormatter, which includes timestamp, log level with color (if supported), logger name, function, and line number. It can be used to configure the root logger or any named logger.

Parameters:

Name Type Description Default
name str | None

The name of the logger to configure. If None, the root logger is used.

None
level int

The logging level to set on the logger (e.g., logging.DEBUG, logging.INFO). Defaults to logging.INFO.

INFO
stream TextIO | None

The output stream for the log messages (e.g., sys.stdout or sys.stderr). If None, defaults to sys.stderr.

None
Example

Configure logging in your service:

import logging
import sys
from pydase.utils.logging import configure_logging_with_pydase_formatter

configure_logging_with_pydase_formatter(
    name="my_service",      # Use the package/module name or None for the root logger
    level=logging.DEBUG,    # Set the desired logging level (defaults to INFO)
    stream=sys.stdout       # Set the output stream (stderr by default)
)
Notes
  • This function adds a new handler each time it’s called. Use carefully to avoid duplicate logs.
  • Colors are enabled if the stream supports TTY (e.g., in terminal).
Source code in src/pydase/utils/logging.py
def configure_logging_with_pydase_formatter(
    name: str | None = None, level: int = logging.INFO, stream: TextIO | None = None
) -> None:
    """Configure a logger with the pydase `DefaultFormatter`.

    This sets up a `StreamHandler` with the custom `DefaultFormatter`, which includes
    timestamp, log level with color (if supported), logger name, function, and line
    number. It can be used to configure the root logger or any named logger.

    Args:
        name: The name of the logger to configure. If None, the root logger is used.
        level: The logging level to set on the logger (e.g., logging.DEBUG,
            logging.INFO). Defaults to logging.INFO.
        stream: The output stream for the log messages (e.g., sys.stdout or sys.stderr).
            If None, defaults to sys.stderr.

    Example:
        Configure logging in your service:

        ```python
        import logging
        import sys
        from pydase.utils.logging import configure_logging_with_pydase_formatter

        configure_logging_with_pydase_formatter(
            name="my_service",      # Use the package/module name or None for the root logger
            level=logging.DEBUG,    # Set the desired logging level (defaults to INFO)
            stream=sys.stdout       # Set the output stream (stderr by default)
        )
        ```

    Notes:
        - This function adds a new handler each time it's called.
          Use carefully to avoid duplicate logs.
        - Colors are enabled if the stream supports TTY (e.g., in terminal).
    """  # noqa: E501

    logger = logging.getLogger(name=name)
    handler = logging.StreamHandler(stream=stream)
    formatter = DefaultFormatter(
        fmt="%(asctime)s.%(msecs)03d | %(levelprefix)s | "
        "%(name)s:%(funcName)s:%(lineno)d - %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(level)
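
Because a new handler is added on every call, calling the function twice for the same logger duplicates each log line. One way to guard against that in calling code is sketched below with the standard library only; `add_stream_handler_once` is a hypothetical helper, and a plain `logging.Formatter` stands in for `DefaultFormatter`.

```python
import io
import logging
from typing import TextIO


def add_stream_handler_once(name: str, stream: TextIO) -> logging.Logger:
    """Attach a StreamHandler for `stream` unless one is already attached."""
    logger = logging.getLogger(name)
    for handler in logger.handlers:
        if isinstance(handler, logging.StreamHandler) and handler.stream is stream:
            return logger  # already configured; avoid duplicate output

    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter("%(levelname)s | %(name)s - %(message)s"))
    logger.addHandler(handler)
    return logger


buffer = io.StringIO()
log = add_stream_handler_once("sketch_service", buffer)
add_stream_handler_once("sketch_service", buffer)  # second call is a no-op

log.setLevel(logging.INFO)
log.propagate = False  # keep the example output out of the root logger
log.info("started")
```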

setup_logging

setup_logging() -> None

Configures the logging settings for the application.

This function sets up logging with specific formatting and colorization of log messages. The log level is determined based on the application’s operation mode. By default, in a development environment, the log level is set to DEBUG, whereas in other environments, it is set to INFO.

Source code in src/pydase/utils/logging.py
def setup_logging() -> None:
    """
    Configures the logging settings for the application.

    This function sets up logging with specific formatting and colorization of log
    messages. The log level is determined based on the application's operation mode. By
    default, in a development environment, the log level is set to DEBUG, whereas in
    other environments, it is set to INFO.
    """

    logger.debug("Configuring pydase logging.")

    logging.config.dictConfig(LOGGING_CONFIG)

pydase.units

convert_to_quantity

convert_to_quantity(value: QuantityDict | float | Quantity, unit: str = '') -> Quantity

Convert a given value into a pint.Quantity object with the specified unit.

Parameters:

Name Type Description Default
value QuantityDict | float | Quantity

The value to be converted into a Quantity object.

  • If value is a float or int, it will be directly converted to the specified unit.
  • If value is a dict, it must have keys ‘magnitude’ and ‘unit’ to represent the value and unit.
  • If value is a Quantity object, it will remain unchanged.
required
unit str

The target unit for conversion. If empty and value is not a Quantity object, it will assume a unitless quantity.

''

Returns:

Type Description
Quantity

The converted value as a pint.Quantity object with the specified unit.

Examples:

>>> convert_to_quantity(5, 'm')
<Quantity(5.0, 'meter')>
>>> convert_to_quantity({'magnitude': 10, 'unit': 'mV'})
<Quantity(10.0, 'millivolt')>
>>> convert_to_quantity(10.0 * u.units.V)
<Quantity(10.0, 'volt')>
Note

If unit is not provided and value is a float or int, the resulting Quantity will be unitless.

Source code in src/pydase/units.py
def convert_to_quantity(
    value: QuantityDict | float | Quantity, unit: str = ""
) -> Quantity:
    """
    Convert a given value into a pint.Quantity object with the specified unit.

    Args:
        value:
            The value to be converted into a Quantity object.

            - If value is a float or int, it will be directly converted to the specified
              unit.
            - If value is a dict, it must have keys 'magnitude' and 'unit' to represent
              the value and unit.
            - If value is a Quantity object, it will remain unchanged.\n
        unit:
            The target unit for conversion. If empty and value is not a Quantity object,
            it will assume a unitless quantity.

    Returns:
        The converted value as a pint.Quantity object with the specified unit.

    Examples:
        >>> convert_to_quantity(5, 'm')
        <Quantity(5.0, 'meter')>
        >>> convert_to_quantity({'magnitude': 10, 'unit': 'mV'})
        <Quantity(10.0, 'millivolt')>
        >>> convert_to_quantity(10.0 * u.units.V)
        <Quantity(10.0, 'volt')>

    Note:
        If unit is not provided and value is a float or int, the resulting Quantity will
        be unitless.
    """

    if isinstance(value, int | float):
        quantity = float(value) * Unit(unit)
    elif isinstance(value, dict):
        quantity = float(value["magnitude"]) * Unit(value["unit"])
    else:
        quantity = value
    return quantity

pydase.config

OperationMode

Bases: BaseConfig

environment class-attribute instance-attribute

environment: Literal['testing', 'development', 'production'] = 'development'

The service’s operation mode.

ServiceConfig

Bases: BaseConfig

Service configuration.

Variables can be set through environment variables prefixed with SERVICE_ or an .env file containing those variables.

config_dir class-attribute instance-attribute

config_dir: Path = Path('config')

Configuration directory

web_port class-attribute instance-attribute

web_port: int = 8001

Web server port
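
To illustrate the `SERVICE_` prefix convention, here is a standalone sketch that does not use pydase's `BaseConfig`. It assumes the variable names are the prefix plus the upper-cased field name (`SERVICE_CONFIG_DIR`, `SERVICE_WEB_PORT`); `ServiceConfigSketch` and `from_env` are hypothetical names, and `.env` file handling is left out.

```python
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Mapping, Optional


@dataclass
class ServiceConfigSketch:
    config_dir: Path = Path("config")
    web_port: int = 8001

    @classmethod
    def from_env(cls, env: Optional[Mapping[str, str]] = None) -> "ServiceConfigSketch":
        # Fall back to the process environment when no mapping is passed.
        source = os.environ if env is None else env
        config = cls()
        if "SERVICE_CONFIG_DIR" in source:
            config.config_dir = Path(source["SERVICE_CONFIG_DIR"])
        if "SERVICE_WEB_PORT" in source:
            config.web_port = int(source["SERVICE_WEB_PORT"])
        return config


defaults = ServiceConfigSketch.from_env({})
overridden = ServiceConfigSketch.from_env({"SERVICE_WEB_PORT": "9000"})
```

Passing an explicit mapping, as done here, keeps the example independent of whatever is set in the real environment.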

WebServerConfig

Bases: BaseConfig

The service’s web server configuration.

generate_web_settings class-attribute instance-attribute

generate_web_settings: bool = False

Whether to generate the web_settings.json file.