import uuid
from typing import Any
from ppc_robot_lib.platform import PlatformType
from ppc_robot_lib.tasks.abstract_task_type import AbstractTaskType
from ppc_robot_lib.tasks.task import Task
from ppc_robot_lib.tasks.types import TaskLevel
from ppc_robot_lib.tasks.task_id_allocator import TaskIdAllocator
class Error(Exception):
    """Root of the exception hierarchy raised by TaskFactory."""
class TaskTypeNotFound(Error):
    """
    Signals that the task type with given name does not exist.

    The lookup key is stored on the instance as ``platform``, ``name`` and ``version``, so handlers
    can inspect it without parsing the message.
    """

    def __init__(self, platform: PlatformType, task_type_name: str, task_type_version: int):
        """
        :param platform: Platform the task type was looked up for.
        :param task_type_name: Name of the requested task type.
        :param task_type_version: Requested version of the task type.
        """
        # Forward the arguments explicitly so that ``args`` holds exactly the three constructor
        # arguments, in signature order.
        super().__init__(platform, task_type_name, task_type_version)
        self.platform = platform
        self.name = task_type_name
        self.version = task_type_version

    def __str__(self) -> str:
        # Without this override the message would be the raw ``args`` tuple.
        return f'Task type {self.name!r} (version {self.version}) was not found for platform {self.platform}.'
class TaskFactory:
    """
    Creates task from job definition. Contains dictionary of named task types.
    """

    def __init__(self, task_id_allocator: TaskIdAllocator | None = None):
        """
        :param task_id_allocator: Deprecated service for allocating task IDs. Never used, kept for
            backwards compatibility.
        """
        # Imported at call time instead of at module level.
        # NOTE(review): presumably this avoids a circular import -- confirm before moving it.
        from ppc_robot_task_types import task_types

        self._task_types = task_types.supported_types

    def create_task(
        self,
        task_id: uuid.UUID | None,
        platform: PlatformType,
        task_type_name: str,
        version: int,
        parameters: dict[str, Any],
    ) -> Task:
        """
        Creates a task of given type with given parameters.

        :param task_id: Assigned Task ID. If ``None`` is given, a new one is generated.
        :param platform: Platform.
        :param task_type_name: Name of the task type.
        :param version: Task type version
        :param parameters: Task parameters.
        :return: Created task.
        :raises TaskTypeNotFound: When the lookup of the task type fails with ``ValueError``.
        """
        task_type_cls = self.get_task_type(platform, task_type_name, version)
        task_type = task_type_cls()
        return self.create_task_from_instance(task_type, parameters, task_id)

    @staticmethod
    def create_task_from_instance(
        task_type: AbstractTaskType, parameters: dict[str, Any], task_id: uuid.UUID | None = None
    ) -> Task:
        """
        Creates a task from an already instantiated task type.

        :param task_type: Task type instance that describes the task.
        :param parameters: Raw task parameters; they are passed through
            ``task_type.convert_parameters`` before use.
        :param task_id: Assigned Task ID. If not given, a random one is generated with ``uuid.uuid4()``.
        :return: Created task.
        """
        if task_id is None:
            task_id = uuid.uuid4()

        if task_type.is_service_account_level():
            level = TaskLevel.SERVICE_ACCOUNT
        else:
            level = TaskLevel.CLIENT_ACCOUNT

        converted_parameters = task_type.convert_parameters(parameters)

        if task_type.uses_function_execution_model:
            # Function execution model: the task receives the callable and the converted
            # parameters directly, no steps are built.
            function = task_type.execute
            return Task(task_id, level, report_function=function, parameters=converted_parameters)
        else:
            # Step-based model: the task type builds the steps from the converted parameters.
            steps = task_type.create_steps(converted_parameters)
            return Task(task_id, level, steps)

    def get_task_type(self, platform: PlatformType, task_type_name: str, version: int) -> type[AbstractTaskType]:
        """
        Gets task type with given name.

        :param platform: Platform.
        :param task_type_name: Name of the task type.
        :param version: Task type version.
        :return: Task type.
        :raises TaskTypeNotFound: When the lookup fails with ``ValueError``.
        """
        try:
            return self._task_types.get(platform, task_type_name).get_task_type(version)
        except ValueError as err:
            # Chain explicitly so the original lookup error is recorded as the cause.
            raise TaskTypeNotFound(platform, task_type_name, version) from err