# Source code for ppc_robot_lib.tasks.task_context

from __future__ import annotations

from abc import ABC, abstractmethod
from typing import Any
from collections.abc import Callable
from uuid import UUID

import pandas

from ppc_robot_lib import output
from ppc_robot_lib.credentials import AccountCredentials, UserCredentials
from ppc_robot_lib import models
from ppc_robot_lib.platform import PlatformType
from ppc_robot_lib.steps import abstract_step
from ppc_robot_lib.tasks import task_context_factory
from ppc_robot_lib.throttling import ServiceContainerInterface
from ppc_robot_lib.work_set import WorkSet


class StepPerformance:
    """
    Performance record of a single step.

    Holds the step name, a size metric, input/output row counts, the elapsed time and the client ID
    the step was executed for.
    """

    def __init__(self, size=None, rows_in=None, rows_out=None, deep_size=True):
        """
        :param size: Size metric of the step. When a pandas ``Series`` or ``DataFrame`` is given,
            its length (``len(size)``) is stored instead of the object itself.
        :param rows_in: Number of input rows, stored as given.
        :param rows_out: Number of output rows, stored as given.
        :param deep_size: Not used by this implementation; kept so that existing callers passing it
            keep working.
        """
        self.name = None
        if isinstance(size, (pandas.Series, pandas.DataFrame)):
            self.size = len(size)
        else:
            self.size = size
        self.rows_in = rows_in
        self.rows_out = rows_out
        self.time = 0
        self.client_id = None

    def assign_step(self, task_ctx: TaskContextInterface | None, step: abstract_step.AbstractStep, time=None):
        """
        Binds this record to a step and registers it in the performance metrics of the task.

        :param task_ctx: Context of the running task. When ``None``, only the step name and time are
            stored and the record is not registered anywhere.
        :param step: Step this record belongs to; its ``get_name()`` result is stored as the name.
        :param time: Elapsed time of the step. The current value is kept when ``None`` is given.
        """
        self.name = step.get_name()
        if time is not None:
            self.time = time

        # Without a context there is no client ID to copy and no task-level metrics to append to.
        if task_ctx is None:
            return

        # A client ID that was already set on the record is never overwritten.
        if self.client_id is None and task_ctx.client_id is not None:
            self.client_id = task_ctx.client_id
        task_ctx.performance.append_step_performance(self)


class TaskPerformance:
    """
    Performance metrics of a whole task: running totals plus the list of per-step records.
    """

    def __init__(self):
        # All counters start at zero; step records are collected in the order they are appended.
        self.total_time, self.total_rows_input, self.total_rows_output = 0, 0, 0
        self.step_performance: list[StepPerformance] = []

    def add_input_rows(self, rows):
        """
        Increases the total number of input rows.

        :param rows: Number of rows to add.
        """
        self.total_rows_input += rows

    def add_output_rows(self, rows):
        """
        Increases the total number of output rows.

        :param rows: Number of rows to add.
        """
        self.total_rows_output += rows

    def append_step_performance(self, step_performance: StepPerformance):
        """
        Stores the performance record of one step.

        :param step_performance: Record to append to :py:attr:`step_performance`.
        """
        records = self.step_performance
        records.append(step_performance)


class TaskStatusReporterInterface(ABC):
    """
    Interface of an object that receives notifications about the progress of steps.
    """

    @abstractmethod
    def on_step_start(self, step_stack: list[str]):
        """
        Hook for the start of a step.

        :param step_stack: Stack of step labels.
        """

    @abstractmethod
    def on_step_end(self, step_stack: list[str]):
        """
        Hook for the end of a step.

        :param step_stack: Stack of step labels.
        """


class TargetObject:
    """
    Describes the object a task is currently aimed at.

    A target holds up to three references: a service account, a client account and a client.
    Which of them are set decides the kind of the target, see :py:meth:`is_client_account`,
    :py:meth:`is_service_account` and :py:meth:`is_client`.
    """

    def __init__(
        self,
        *,
        service_account: models.ServiceAccount = None,
        client_account: models.ClientAccount = None,
        client: models.Client = None,
    ):
        """
        :param service_account: Targeted service account, if any.
        :param client_account: Targeted client account, if any.
        :param client: Targeted client, if any.
        """
        self._service_account = service_account
        self._client_account = client_account
        self._client = client

    @classmethod
    def create_for_client_account(cls, client_account: models.ClientAccount) -> TargetObject:
        """
        Builds a target for a client account; the service account is read from ``client_account.service_account``.
        """
        owning_service_account = client_account.service_account
        return cls(service_account=owning_service_account, client_account=client_account)

    @classmethod
    def create_for_service_account(cls, service_account: models.ServiceAccount) -> TargetObject:
        """
        Builds a target that references only the given service account.
        """
        return cls(service_account=service_account)

    @classmethod
    def create_for_client(cls, client: models.Client) -> TargetObject:
        """
        Builds a target that references only the given client.
        """
        return cls(client=client)

    @classmethod
    def create_undefined(cls):
        """
        Builds a target with no reference set.
        """
        return cls()

    def _presence(self) -> tuple[bool, bool, bool]:
        # Flags telling which references are set, ordered (service account, client account, client).
        return (
            self._service_account is not None,
            self._client_account is not None,
            self._client is not None,
        )

    def is_client_account(self):
        """
        :return: ``True`` when service account and client account are set and client is not.
        """
        return self._presence() == (True, True, False)

    def is_service_account(self):
        """
        :return: ``True`` when only the service account is set.
        """
        return self._presence() == (True, False, False)

    def is_client(self):
        """
        :return: ``True`` when only the client is set.
        """
        return self._presence() == (False, False, True)

    @property
    def service_account(self) -> models.ServiceAccount | None:
        """
        :return: Service account of the target, or ``None``.
        """
        return self._service_account

    @property
    def client_account(self) -> models.ClientAccount | None:
        """
        :return: Client account of the target, or ``None``.
        """
        return self._client_account

    @property
    def client(self) -> models.Client | None:
        """
        :return: Client of the target, or ``None``.
        """
        return self._client

    @property
    def platform(self) -> PlatformType | None:
        """
        :return: ``type`` of the service account, or ``None`` when the service account is missing (falsy).
        """
        return self._service_account.type if self._service_account else None


class TaskContextInterface(ABC):
    """
    Interface describing the task context, which is passed to each step during its execution.
    """

    @property
    @abstractmethod
    def job(self) -> models.Job:
        """
        :return: Job descriptor.
        """
        pass

    @property
    @abstractmethod
    def task_id(self) -> UUID:
        """
        :return: UUID of the task.
        """
        pass

    @property
    @abstractmethod
    def work_set(self) -> WorkSet:
        """
        :return: WorkSet of the task that is mutated during the execution.
        """
        pass

    @property
    @abstractmethod
    def credentials(self) -> AccountCredentials | None:
        """
        :return: Credentials for the given :term:`Service Account`. Implementations in this module
            may return ``None`` when no credentials were supplied.
        """
        pass

    @property
    @abstractmethod
    def user_credentials(self) -> UserCredentials:
        """
        :return: OAuth2 credentials of the user -- can be used to access their Google Drive.
        """
        pass

    @property
    @abstractmethod
    def user_locale(self) -> str:
        """
        :return: User's locale.
        """
        pass

    @property
    @abstractmethod
    def client_id(self) -> Any:
        """
        :return: Client ID of :term:`Client Account`. Can be null for service account-level jobs.
        """
        pass

    @property
    @abstractmethod
    def client_login_id(self) -> Any:
        """
        :return: Login Client ID of :term:`Client Account`. Can be null for service account-level jobs.
        """
        pass

    @property
    @abstractmethod
    def currency(self) -> str | None:
        """
        :return: Currency code of :term:`Client Account`. Can be null for service account-level jobs.
        """
        pass

    @property
    @abstractmethod
    def performance(self) -> TaskPerformance:
        """
        :return: Performance metrics for the whole task.
        """
        pass

    @property
    @abstractmethod
    def throttling_service_container(self) -> ServiceContainerInterface | None:
        """
        :return: Optional container that handles throttling of various services.
        """
        pass

    @property
    @abstractmethod
    def error_reporter(self) -> Callable[[Exception], None]:
        """
        :return: Callable that stores or logs an exception that might occur during the execution.
        """
        pass

    @property
    @abstractmethod
    def notifications(self) -> list[models.Notification]:
        """
        :return: List of all notifications.
        """
        pass

    @property
    @abstractmethod
    def metric_cards(self) -> list[models.JobMetricCard]:
        """
        :return: List of all metric cards.
        """
        pass

    @property
    @abstractmethod
    def output_adapter(self) -> output.abstract_adapter.AbstractOutputAdapter:
        """
        :return: Output adapter currently set on the context.
        """
        pass

    @output_adapter.setter
    @abstractmethod
    def output_adapter(self, value: output.abstract_adapter.AbstractOutputAdapter):
        """
        :param value: Output adapter to set on the context.
        """
        pass

    @property
    @abstractmethod
    def target_object(self) -> TargetObject:
        """
        :return: Object that is the target of this context.
        """
        pass

    @property
    @abstractmethod
    def context_factory(self) -> task_context_factory.TaskContextFactory:
        """
        :return: Factory used by :py:meth:`create_derived_ctx` to build derived contexts.
        """
        pass

    @abstractmethod
    def add_notification(self, notification: models.Notification):
        """
        Adds a new notification.

        :param ppc_robot_lib.models.Notification notification: Notification to add.
        """
        pass

    @abstractmethod
    def clear_notifications(self):
        """
        Clears all notifications.
        """
        pass

    # NOTE: unlike its siblings this method is not abstract, so the base implementation is a no-op
    # and subclasses are not forced to override it.
    def add_metric_card(self, metric_card: models.JobMetricCard) -> None:
        """
        Adds a new metric card to current job. These cards will be saved if the job completes successfully.

        :param ppc_robot_lib.models.JobMetricCard metric_card: Metric card to add.
        """
        pass

    @abstractmethod
    def step_begin(self, step_label: str):
        """
        Should be called by the executor when a step is about to begin. Can be used for reporting
        of the current state. Implementation should handle nesting of steps and store the full stack.

        :param step_label: String representation of the current step.
        """
        pass

    @abstractmethod
    def step_end(self):
        """
        Should be called by the executor when a step has ended. It has to be paired with call
        to :py:meth:`step_begin`.
        """
        pass

    def create_derived_ctx(self, target_object: TargetObject) -> TaskContextInterface:
        """
        Creates a new derived context by delegating to :py:attr:`context_factory`.

        :param target_object: Object that will be set as target in the derived context.
        :return: New derived context that inherits from the current context.
        """
        return self.context_factory.create_derived_context(self, target_object)


class TaskContext(TaskContextInterface):
    """
    Class used to store context of the currently executing task. The same instance is passed to each step.
    """

    def __init__(
        self,
        job: models.Job,
        task_id: UUID,
        work_set: WorkSet,
        credentials: AccountCredentials | None,
        user_credentials: UserCredentials,
        locale: str | None = None,
        client_id: Any = None,
        currency: str | None = None,
        throttling_service_container: ServiceContainerInterface | None = None,
        error_reporter: Callable[[Exception], None] | None = None,
        status_reporter: TaskStatusReporterInterface | None = None,
        client_login_id: Any = None,
        target_object: TargetObject | None = None,
        context_factory: task_context_factory.TaskContextFactory | None = None,
    ):
        """
        :param job: Job descriptor.
        :param task_id: UUID of the task.
        :param work_set: Work set of the task.
        :param credentials: Account credentials, may be ``None``.
        :param user_credentials: Credentials of the user.
        :param locale: User's locale.
        :param client_id: Client ID.
        :param currency: Currency code.
        :param throttling_service_container: Optional throttling container.
        :param error_reporter: Optional callable receiving exceptions.
        :param status_reporter: Optional reporter notified from :py:meth:`step_begin` and :py:meth:`step_end`.
        :param client_login_id: Login client ID.
        :param target_object: Target of the task; required despite the ``None`` default.
        :param context_factory: Factory used to create derived contexts.
        :raises ValueError: When ``target_object`` is ``None``.
        """
        if target_object is None:
            raise ValueError('target_object must be set on TaskContext.')

        self._job: models.Job = job
        self._task_id: UUID = task_id
        self._work_set: WorkSet = work_set
        self._credentials: AccountCredentials | None = credentials
        self._user_credentials = user_credentials
        self._locale = locale
        self._client_id = client_id
        self._client_login_id = client_login_id
        self._currency = currency
        self._performance = TaskPerformance()
        self._throttling_service_container = throttling_service_container
        self._error_reporter = error_reporter
        self._status_reporter = status_reporter
        self._notifications: list[models.Notification] = []
        self._metric_cards: list[models.JobMetricCard] = []
        self._output_adapter = None
        # Labels of the currently running (possibly nested) steps, outermost first.
        self._step_stack: list[str] = []
        self._target_object = target_object
        self._context_factory = context_factory

    @property
    def job(self) -> models.Job:
        """
        :return: Job descriptor.
        """
        return self._job

    @property
    def task_id(self) -> UUID:
        """
        :return: Task ID.
        """
        return self._task_id

    @property
    def work_set(self) -> WorkSet:
        """
        :return: Work Set.
        """
        return self._work_set

    @property
    def credentials(self) -> AccountCredentials | None:
        """
        :return: Account credentials.
        """
        return self._credentials

    @property
    def user_credentials(self):
        return self._user_credentials

    @property
    def user_locale(self):
        return self._locale

    @property
    def client_id(self) -> Any:
        return self._client_id

    @property
    def client_login_id(self) -> Any:
        return self._client_login_id

    @property
    def currency(self) -> str | None:
        return self._currency

    @property
    def performance(self) -> TaskPerformance:
        return self._performance

    @property
    def throttling_service_container(self) -> ServiceContainerInterface | None:
        return self._throttling_service_container

    @property
    def error_reporter(self) -> Callable[[Exception], None]:
        return self._error_reporter

    @property
    def notifications(self) -> list[models.Notification]:
        return self._notifications

    @property
    def metric_cards(self) -> list[models.JobMetricCard]:
        return self._metric_cards

    @property
    def output_adapter(self) -> output.abstract_adapter.AbstractOutputAdapter:
        return self._output_adapter

    @output_adapter.setter
    def output_adapter(self, value: output.abstract_adapter.AbstractOutputAdapter):
        self._output_adapter = value

    @property
    def target_object(self) -> TargetObject:
        return self._target_object

    @property
    def context_factory(self) -> task_context_factory.TaskContextFactory:
        return self._context_factory

    def add_notification(self, notification):
        """
        :param ppc_robot_lib.models.Notification notification: Notification to add.
        """
        self._notifications.append(notification)

    def clear_notifications(self):
        """
        Removes all stored notifications.
        """
        self._notifications.clear()

    def add_metric_card(self, metric_card: models.JobMetricCard) -> None:
        """
        :param metric_card: Metric card to append to the list of metric cards.
        """
        self._metric_cards.append(metric_card)

    def step_begin(self, step_label: str):
        """
        Pushes the label on the step stack and notifies the status reporter, if one is set.

        :param step_label: String representation of the step that is about to begin.
        """
        self._step_stack.append(step_label)
        if self._status_reporter is not None:
            # The live stack list is handed over; it is mutated by later step_begin/step_end calls.
            self._status_reporter.on_step_start(self._step_stack)

    def step_end(self):
        """
        Notifies the status reporter, if one is set, that the current step ended, and pops it from the stack.
        """
        if self._status_reporter is not None:
            # Report the end of the step (not another start) while its label is still on the stack.
            self._status_reporter.on_step_end(self._step_stack)
        if self._step_stack:
            self._step_stack.pop()


class DerivedTaskContext(TaskContextInterface):
    """
    Represents a derived context. Shares job, task ID and credentials with the parent context
    and has its own work set and client ID.

    Work set, client ID, login client ID, currency and target object are stored on the derived context.
    Credentials are taken from the derived context when set, otherwise from the parent. Everything else
    (user credentials, locale, performance, throttling container, error reporter, notifications,
    metric cards, output adapter, context factory and step reporting) is delegated to the parent.
    """

    def __init__(
        self,
        parent: TaskContextInterface,
        work_set: WorkSet,
        client_id: Any = None,
        currency: str | None = None,
        client_login_id: str | None = None,
        target_object: TargetObject | None = None,
        credentials: AccountCredentials | None = None,
    ):
        """
        :param parent: Context this one is derived from.
        :param work_set: Work set of the derived context.
        :param client_id: Client ID of the derived context.
        :param currency: Currency code of the derived context.
        :param client_login_id: Login client ID of the derived context.
        :param target_object: Target of the derived context; required despite the ``None`` default.
        :param credentials: Credentials overriding those of the parent; parent's are used when falsy.
        :raises ValueError: When ``target_object`` is ``None``.
        """
        if target_object is None:
            raise ValueError('target_object must be set on DerivedTaskContext')

        self._parent = parent
        self._work_set = work_set
        self._client_id = client_id
        self._currency = currency
        self._client_login_id = client_login_id
        self._target_object = target_object
        self._credentials = credentials

    @property
    def parent(self) -> TaskContextInterface:
        """
        :return: Context this one was derived from.
        """
        return self._parent

    @property
    def job(self) -> models.Job:
        return self._parent.job

    @property
    def task_id(self) -> UUID:
        return self._parent.task_id

    @property
    def work_set(self) -> WorkSet:
        return self._work_set

    @property
    def credentials(self) -> AccountCredentials | None:
        """
        :return: Own credentials when set, otherwise credentials of the parent context.
        """
        if self._credentials:
            return self._credentials
        else:
            return self._parent.credentials

    @property
    def user_credentials(self):
        return self._parent.user_credentials

    @property
    def user_locale(self):
        return self._parent.user_locale

    @property
    def client_id(self) -> Any:
        return self._client_id

    @property
    def client_login_id(self) -> Any:
        return self._client_login_id

    @property
    def currency(self) -> str | None:
        return self._currency

    @property
    def performance(self):
        return self._parent.performance

    @property
    def throttling_service_container(self) -> ServiceContainerInterface:
        return self._parent.throttling_service_container

    @property
    def error_reporter(self) -> Callable[[Exception], None]:
        return self._parent.error_reporter

    @property
    def notifications(self):
        return self._parent.notifications

    @property
    def metric_cards(self) -> list[models.JobMetricCard]:
        return self._parent.metric_cards

    @property
    def output_adapter(self) -> output.abstract_adapter.AbstractOutputAdapter:
        return self._parent.output_adapter

    @output_adapter.setter
    def output_adapter(self, value: output.abstract_adapter.AbstractOutputAdapter):
        # The adapter lives on the parent, so setting it here changes it for the parent as well.
        self._parent.output_adapter = value

    @property
    def target_object(self) -> TargetObject:
        return self._target_object

    @property
    def context_factory(self) -> task_context_factory.TaskContextFactory:
        return self._parent.context_factory

    def add_notification(self, notification):
        """
        Adds the notification to the parent context.
        """
        self._parent.add_notification(notification)

    def clear_notifications(self):
        """
        Clears notifications of the parent context.
        """
        self._parent.clear_notifications()

    def add_metric_card(self, metric_card: models.JobMetricCard) -> None:
        """
        Adds the metric card to the parent context.
        """
        self._parent.add_metric_card(metric_card)

    def step_begin(self, step_label: str):
        """
        Forwards the beginning of a step to the parent context.
        """
        self._parent.step_begin(step_label)

    def step_end(self):
        """
        Forwards the end of a step to the parent context.
        """
        self._parent.step_end()