Source code for ppc_robot_lib.tasks.task_factory

import uuid
from typing import Any

from ppc_robot_lib.platform import PlatformType
from ppc_robot_lib.tasks.abstract_task_type import AbstractTaskType
from ppc_robot_lib.tasks.task import Task
from ppc_robot_lib.tasks.types import TaskLevel
from ppc_robot_lib.tasks.task_id_allocator import TaskIdAllocator


class Error(Exception):
    """
    Common ancestor of every exception raised by TaskFactory.

    Catch this type to handle any TaskFactory failure with a single ``except`` clause.
    """


class TaskTypeNotFound(Error):
    """
    Signals that the task type with given name does not exist.

    The lookup key is available on the instance as ``platform``, ``name`` and ``version``.
    """

    def __init__(self, platform: PlatformType, task_type_name: str, task_type_version: int):
        """
        :param platform: Platform the task type was looked up for.
        :param task_type_name: Name of the requested task type.
        :param task_type_version: Requested version of the task type.
        """
        # Forward the constructor arguments unchanged: ``args`` keeps its three-element shape,
        # so the exception can still be re-created via ``cls(*args)`` (e.g. by copy or pickle).
        super().__init__(platform, task_type_name, task_type_version)
        self.platform = platform
        self.name = task_type_name
        self.version = task_type_version

    def __str__(self) -> str:
        """
        Return a human-readable description instead of the raw argument tuple.
        """
        return f"Task type {self.name!r} (version {self.version}) does not exist for platform {self.platform}."


class TaskFactory:
    """
    Creates task from job definition. Contains dictionary of named task types.
    """

    def __init__(self, task_id_allocator: TaskIdAllocator | None = None):
        """
        :param task_id_allocator: Deprecated service for allocating task IDs. Never used, kept for backwards
            compatibility.
        """
        # Imported at construction time rather than at module import time.
        # NOTE(review): presumably done to avoid a circular import -- confirm before moving it to the top.
        from ppc_robot_task_types import task_types

        self._task_types = task_types.supported_types

    def create_task(
        self,
        task_id: uuid.UUID | None,
        platform: PlatformType,
        task_type_name: str,
        version: int,
        parameters: dict[str, Any],
    ) -> Task:
        """
        Creates a task of given type with given parameters.

        :param task_id: Assigned Task ID. If ``None``, a new one is generated.
        :param platform: Platform.
        :param task_type_name: Name of the task type.
        :param version: Task type version
        :param parameters: Task parameters.
        :return: Created task.
        :raises TaskTypeNotFound: If the requested task type cannot be resolved.
        """
        task_type_cls = self.get_task_type(platform, task_type_name, version)
        task_type = task_type_cls()
        return self.create_task_from_instance(task_type, parameters, task_id)

    @staticmethod
    def create_task_from_instance(
        task_type: AbstractTaskType, parameters: dict[str, Any], task_id: uuid.UUID | None = None
    ) -> Task:
        """
        Creates a task from an already instantiated task type.

        :param task_type: Task type instance used to convert the parameters and to build the task.
        :param parameters: Raw task parameters; passed through ``task_type.convert_parameters``.
        :param task_id: Assigned Task ID. If ``None``, a random UUID is generated.
        :return: Created task.
        """
        if task_id is None:
            task_id = uuid.uuid4()

        if task_type.is_service_account_level():
            level = TaskLevel.SERVICE_ACCOUNT
        else:
            level = TaskLevel.CLIENT_ACCOUNT

        converted_parameters = task_type.convert_parameters(parameters)

        if task_type.uses_function_execution_model:
            # Function execution model: the task gets the ``execute`` callable and the converted parameters.
            return Task(task_id, level, report_function=task_type.execute, parameters=converted_parameters)

        # Otherwise the task type builds the steps from the converted parameters up front.
        steps = task_type.create_steps(converted_parameters)
        return Task(task_id, level, steps)

    def get_task_type(self, platform: PlatformType, task_type_name: str, version: int) -> type[AbstractTaskType]:
        """
        Gets task type with given name.

        :param platform: Platform.
        :param task_type_name: Name of the task type.
        :param version: Task type version.
        :return: Task type.
        :raises TaskTypeNotFound: If the lookup raises ``ValueError``.
        """
        try:
            return self._task_types.get(platform, task_type_name).get_task_type(version)
        except ValueError as err:
            # Chain explicitly so the original lookup error is preserved as ``__cause__``.
            raise TaskTypeNotFound(platform, task_type_name, version) from err