Source code for ppc_robot_lib.tasks.task_executor

import time
from typing import Any
from collections.abc import Callable, Iterable

import contextvars
from celery.exceptions import SoftTimeLimitExceeded

from ppc_robot_lib import models
from ppc_robot_lib import tasks
from ppc_robot_lib.generic_error import GenericError
from ppc_robot_lib.steps import abstract_step
from ppc_robot_lib.tasks.exceptions import TaskTypeDisabledError
from ppc_robot_lib.tasks.task import ReportFunc, Task
from ppc_robot_lib.tasks.task_context import TargetObject, TaskContext, TaskStatusReporterInterface
from ppc_robot_lib.tasks.task_context_factory import TaskContextFactory
from ppc_robot_lib.throttling import ServiceContainerInterface


class TaskExecutionException(Exception):
    def __init__(self, *args: Any, **kwargs) -> None:
        super().__init__(*args)
        if 'task_ctx' in kwargs:
            self.task_ctx: TaskContext = kwargs['task_ctx']
        else:
            self.task_ctx: TaskContext = None
        self.message = GenericError.UNKNOWN_MESSAGE


[docs] class TaskExecutor: """ Executes tasks with the given account credentials: creates a working set, prepares context and iterates through task steps and executes them. """ def __init__(self, context_factory: TaskContextFactory): self._context_factory = context_factory def execute( self, job: 'models.Job', task: Task, user_id: int, target_object: TargetObject, locale: str = None, throttling_service_container: ServiceContainerInterface = None, error_reporter: Callable[[Exception], None] = None, status_reporter: TaskStatusReporterInterface = None, ) -> TaskContext: task_ctx = self._context_factory.create_root_context( job, task, user_id, locale, target_object, throttling_service_container, error_reporter, status_reporter ) execution_exception = None start = time.perf_counter() try: self.execute_task(task, task_ctx) except TaskTypeDisabledError: # Raise the exception as-is without wrapping it into TaskExecutionException. raise except SoftTimeLimitExceeded as exc: # Raise the execution exception with a different error message. execution_exception = exc raise TaskExecutionException('Time-limit reached when executing the report.', task_ctx=task_ctx) from exc except Exception as exc: execution_exception = exc raise TaskExecutionException('Error during execution of the report.', task_ctx=task_ctx) from exc finally: output_exception = None try: self.finalize_output(job, task_ctx, execution_exception) except Exception as exc: output_exception = exc from ppc_robot_lib.report_fields import registry registry.clear() end = time.perf_counter() task_ctx.performance.total_time = end - start if output_exception: raise output_exception return task_ctx def execute_task(self, task: Task, task_ctx: TaskContext): if task.uses_function_execution_model: execution_ctx = contextvars.copy_context() execution_ctx.run( self.execute_function, report_function=task.report_function, parameters=task.parameters, task_ctx=task_ctx, ) else: self.execute_steps(task.steps, task_ctx) def execute_steps(self, steps: Iterable['abstract_step.AbstractStep'], task_ctx: TaskContext): for step in steps: step_label = step.get_label() task_ctx.step_begin(step_label) step.execute(task_ctx) task_ctx.step_end() def execute_function(self, report_function: ReportFunc, parameters: dict[str, Any], task_ctx: TaskContext): # Initialize global context: tasks.context.set(task_ctx) # Run the report function: report_function(parameters) def finalize_output(self, job: 'models.Job', task_ctx: TaskContext, exc: BaseException): if task_ctx.output_adapter: task_ctx.output_adapter.finalize(task_ctx, exc) output_ctx = task_ctx.output_adapter.output_ctx job.output_path = output_ctx.output_path job.output_state_json = output_ctx.state job.output_state_type = job.output_type job.save(update_fields=['output_path', 'output_state_json', 'output_state_type'])