import time
from typing import Any
from collections.abc import Callable, Iterable
import contextvars
from celery.exceptions import SoftTimeLimitExceeded
from ppc_robot_lib import models
from ppc_robot_lib import tasks
from ppc_robot_lib.generic_error import GenericError
from ppc_robot_lib.steps import abstract_step
from ppc_robot_lib.tasks.exceptions import TaskTypeDisabledError
from ppc_robot_lib.tasks.task import ReportFunc, Task
from ppc_robot_lib.tasks.task_context import TargetObject, TaskContext, TaskStatusReporterInterface
from ppc_robot_lib.tasks.task_context_factory import TaskContextFactory
from ppc_robot_lib.throttling import ServiceContainerInterface
class TaskExecutionException(Exception):
    """
    Error raised by :class:`TaskExecutor` when running a task fails. It optionally carries the context of the
    failed run in ``task_ctx`` and exposes a generic ``message``.
    """

    def __init__(self, *args: Any, **kwargs) -> None:
        # Only the positional arguments are forwarded to Exception; of the keyword arguments
        # just ``task_ctx`` is read, any other keyword is ignored.
        super().__init__(*args)
        # Context of the failed run, or None when the caller did not supply one.
        self.task_ctx: TaskContext = kwargs.get('task_ctx')
        # The message is always the generic one, independent of the arguments passed in.
        self.message = GenericError.UNKNOWN_MESSAGE
[docs]
class TaskExecutor:
    """
    Runs a task for a job: obtains a root context from the supplied factory, runs the task either through its
    report function or through its list of steps, and finalizes the output afterwards.
    """

    def __init__(self, context_factory: TaskContextFactory):
        # Factory used by execute() to build the root context of every run.
        self._context_factory = context_factory

    def execute(
        self,
        job: 'models.Job',
        task: Task,
        user_id: int,
        target_object: TargetObject,
        locale: str = None,
        throttling_service_container: ServiceContainerInterface = None,
        error_reporter: Callable[[Exception], None] = None,
        status_reporter: TaskStatusReporterInterface = None,
    ) -> TaskContext:
        """
        Execute the task for the given job and return the context it ran in.

        A root context is created through the context factory, the task is run via :meth:`execute_task` and the
        output is finalized whether or not the run succeeded. The measured ``performance.total_time`` covers the
        run, the output finalization and the registry clean-up.

        :raises TaskTypeDisabledError: propagated unchanged from the task.
        :raises TaskExecutionException: wraps ``SoftTimeLimitExceeded`` and any other ``Exception`` raised by
            the task; the task context is attached to it.
        :raises Exception: an error raised while finalizing the output is re-raised after the clean-up and
            replaces a pending execution error.
        """
        task_ctx = self._context_factory.create_root_context(
            job,
            task,
            user_id,
            locale,
            target_object,
            throttling_service_container,
            error_reporter,
            status_reporter,
        )

        # Error handed over to the output finalization; stays None for a clean run.
        failure = None
        started_at = time.perf_counter()
        try:
            self.execute_task(task, task_ctx)
        except TaskTypeDisabledError:
            # Deliberately not wrapped into TaskExecutionException.
            raise
        except SoftTimeLimitExceeded as exc:
            # Same wrapping as for generic errors, only with a time-limit specific message.
            failure = exc
            raise TaskExecutionException('Time-limit reached when executing the report.', task_ctx=task_ctx) from exc
        except Exception as exc:
            failure = exc
            raise TaskExecutionException('Error during execution of the report.', task_ctx=task_ctx) from exc
        finally:
            # Remember a finalization error instead of raising at once, so the clean-up below always runs.
            finalize_failure = None
            try:
                self.finalize_output(job, task_ctx, failure)
            except Exception as exc:
                finalize_failure = exc

            # NOTE(review): imported locally rather than at module level, presumably to avoid an import
            # cycle -- confirm before moving it.
            from ppc_robot_lib.report_fields import registry

            registry.clear()

            task_ctx.performance.total_time = time.perf_counter() - started_at

            if finalize_failure:
                raise finalize_failure

        return task_ctx

    def execute_task(self, task: Task, task_ctx: TaskContext):
        """
        Run the task using the execution model it declares: its report function, invoked inside a copy of the
        current ``contextvars`` context, or otherwise its steps.
        """
        if not task.uses_function_execution_model:
            self.execute_steps(task.steps, task_ctx)
            return

        # Run inside a copied context so context variables set by execute_function stay within that copy.
        isolated_ctx = contextvars.copy_context()
        isolated_ctx.run(
            self.execute_function,
            report_function=task.report_function,
            parameters=task.parameters,
            task_ctx=task_ctx,
        )

    def execute_steps(self, steps: Iterable['abstract_step.AbstractStep'], task_ctx: TaskContext):
        """
        Execute the steps one after another, announcing the begin and the end of each step on the context.
        When a step raises, the error propagates and ``step_end`` is not called for that step.
        """
        for current_step in steps:
            label = current_step.get_label()
            task_ctx.step_begin(label)
            current_step.execute(task_ctx)
            task_ctx.step_end()

    def execute_function(self, report_function: ReportFunc, parameters: dict[str, Any], task_ctx: TaskContext):
        """
        Store the task context in ``tasks.context`` and call the report function with the given parameters.
        """
        # Publish the context first, so the report function can reach it through ``tasks.context``
        # (presumably a ContextVar -- confirm).
        tasks.context.set(task_ctx)
        report_function(parameters)

    def finalize_output(self, job: 'models.Job', task_ctx: TaskContext, exc: BaseException):
        """
        Finalize the output adapter of the context, if there is one, and store the resulting output path and
        state on the job.

        :param exc: error raised during the execution; ``execute`` passes ``None`` when it recorded no error.
        """
        adapter = task_ctx.output_adapter
        if not adapter:
            # Nothing to finalize and nothing to store on the job.
            return

        adapter.finalize(task_ctx, exc)

        result_ctx = adapter.output_ctx
        job.output_path = result_ctx.output_path
        job.output_state_json = result_ctx.state
        job.output_state_type = job.output_type
        # Save only the three fields that were changed above.
        job.save(update_fields=['output_path', 'output_state_json', 'output_state_type'])