from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Any
from collections.abc import Callable
from uuid import UUID
import pandas
from ppc_robot_lib import output
from ppc_robot_lib.credentials import AccountCredentials, UserCredentials
from ppc_robot_lib import models
from ppc_robot_lib.platform import PlatformType
from ppc_robot_lib.steps import abstract_step
from ppc_robot_lib.tasks import task_context_factory
from ppc_robot_lib.throttling import ServiceContainerInterface
from ppc_robot_lib.work_set import WorkSet
class StepPerformance:
    """
    Performance record of a single step.

    Stores the step name, a size metric, input/output row counts, a time value and the client ID
    of the context in which the step was executed.
    """

    def __init__(self, size=None, rows_in=None, rows_out=None, deep_size=True):
        """
        :param size: Size metric of the step. When a pandas ``Series`` or ``DataFrame`` is given,
            its length is stored instead of the object itself.
        :param rows_in: Number of input rows.
        :param rows_out: Number of output rows.
        :param deep_size: Not used by this implementation; kept so that existing callers keep working.
        """
        self.name = None
        # For pandas objects only the length is recorded, any other value is stored as given.
        if isinstance(size, (pandas.Series, pandas.DataFrame)):
            self.size = len(size)
        else:
            self.size = size
        self.rows_in = rows_in
        self.rows_out = rows_out
        self.time = 0
        self.client_id = None

    def assign_step(self, task_ctx: TaskContextInterface, step: abstract_step.AbstractStep, time=None):
        """
        Binds this record to a step and registers it in the performance metrics of the task.

        :param task_ctx: Context of the task. When ``None``, the record is only filled in and it is not
            registered anywhere.
        :param step: Step whose name is stored in this record.
        :param time: Time value to store. The current value is kept when ``None`` is given.
        """
        self.name = step.get_name()
        if time is not None:
            self.time = time

        if task_ctx is None:
            # Without a context there is no client ID to copy and no task performance to append to.
            return

        # An already assigned client ID is never overwritten.
        if self.client_id is None and task_ctx.client_id is not None:
            self.client_id = task_ctx.client_id
        task_ctx.performance.append_step_performance(self)
class TaskPerformance:
    """
    Performance metrics collected for a whole task: total time, total input and output row counts
    and the list of per-step records.
    """

    def __init__(self):
        # All counters start at zero.
        self.total_time = self.total_rows_input = self.total_rows_output = 0
        # Step records in the order in which they were appended.
        self.step_performance: list[StepPerformance] = []

    def add_input_rows(self, rows):
        """
        Increases the total number of input rows.

        :param rows: Number of rows to add.
        """
        self.total_rows_input += rows

    def add_output_rows(self, rows):
        """
        Increases the total number of output rows.

        :param rows: Number of rows to add.
        """
        self.total_rows_output += rows

    def append_step_performance(self, step_performance: StepPerformance):
        """
        Stores the performance record of a single step at the end of the list.

        :param step_performance: Record to store.
        """
        records = self.step_performance
        records.append(step_performance)
class TaskStatusReporterInterface(ABC):
    """
    Interface of a reporter that receives the stack of step labels when a step starts and when it ends.
    """

    @abstractmethod
    def on_step_start(self, step_stack: list[str]):
        """
        Hook for the start of a step.

        :param step_stack: Stack of step labels.
        """

    @abstractmethod
    def on_step_end(self, step_stack: list[str]):
        """
        Hook for the end of a step.

        :param step_stack: Stack of step labels.
        """
class TargetObject:
    """
    Describes the object that the task currently targets: a client account (together with its
    service account), a service account alone, a client, or nothing at all.
    """

    def __init__(
        self,
        *,
        service_account: models.ServiceAccount = None,
        client_account: models.ClientAccount = None,
        client: models.Client = None,
    ):
        """
        All arguments are keyword-only and optional. The ``create_*`` class methods build the
        supported combinations.

        :param service_account: Targeted service account.
        :param client_account: Targeted client account.
        :param client: Targeted client.
        """
        self._service_account = service_account
        self._client_account = client_account
        self._client = client

    @classmethod
    def create_for_client_account(cls, client_account: models.ClientAccount) -> TargetObject:
        """
        :param client_account: Client account to target.
        :return: Target with the client account and the service account taken from it.
        """
        owner = client_account.service_account
        return cls(service_account=owner, client_account=client_account)

    @classmethod
    def create_for_service_account(cls, service_account: models.ServiceAccount) -> TargetObject:
        """
        :param service_account: Service account to target.
        :return: Target with only the service account set.
        """
        return cls(service_account=service_account)

    @classmethod
    def create_for_client(cls, client: models.Client) -> TargetObject:
        """
        :param client: Client to target.
        :return: Target with only the client set.
        """
        return cls(client=client)

    @classmethod
    def create_undefined(cls):
        """
        :return: Target with no object set.
        """
        return cls()

    def is_client_account(self):
        """
        :return: ``True`` when both the service account and the client account are set and the client is not.
        """
        if self._client is not None:
            return False
        return self._service_account is not None and self._client_account is not None

    def is_service_account(self):
        """
        :return: ``True`` when the service account is the only object set.
        """
        if self._service_account is None:
            return False
        return self._client_account is None and self._client is None

    def is_client(self):
        """
        :return: ``True`` when the client is the only object set.
        """
        if self._client is None:
            return False
        return self._service_account is None and self._client_account is None

    @property
    def service_account(self) -> models.ServiceAccount | None:
        """
        :return: Targeted service account, if any.
        """
        return self._service_account

    @property
    def client_account(self) -> models.ClientAccount | None:
        """
        :return: Targeted client account, if any.
        """
        return self._client_account

    @property
    def client(self) -> models.Client | None:
        """
        :return: Targeted client, if any.
        """
        return self._client

    @property
    def platform(self) -> PlatformType | None:
        """
        :return: ``type`` of the service account, or ``None`` when there is no service account.
        """
        return self._service_account.type if self._service_account else None
class TaskContextInterface(ABC):
    """
    Contract of the task context that every step receives when it is executed.
    """

    @property
    @abstractmethod
    def job(self) -> models.Job:
        """
        :return: Descriptor of the job.
        """

    @property
    @abstractmethod
    def task_id(self) -> UUID:
        """
        :return: UUID identifying the task.
        """

    @property
    @abstractmethod
    def work_set(self) -> WorkSet:
        """
        :return: WorkSet of the task; it is mutated during the execution.
        """

    @property
    @abstractmethod
    def credentials(self) -> AccountCredentials:
        """
        :return: Credentials for the given :term:`Service Account`.
        """

    @property
    @abstractmethod
    def user_credentials(self) -> UserCredentials:
        """
        :return: OAuth2 credentials of the user -- can be used to access the user's Google Drive.
        """

    @property
    @abstractmethod
    def user_locale(self) -> str:
        """
        :return: Locale of the user.
        """

    @property
    @abstractmethod
    def client_id(self) -> Any:
        """
        :return: Client ID of :term:`Client Account`. Can be null for service account-level jobs.
        """

    @property
    @abstractmethod
    def client_login_id(self) -> Any:
        """
        :return: Login Client ID of :term:`Client Account`. Can be null for service account-level jobs.
        """

    @property
    @abstractmethod
    def currency(self) -> str | None:
        """
        :return: Currency code of :term:`Client Account`. Can be null for service account-level jobs.
        """

    @property
    @abstractmethod
    def performance(self) -> TaskPerformance:
        """
        :return: Performance metrics of the whole task.
        """

    @property
    @abstractmethod
    def throttling_service_container(self) -> ServiceContainerInterface:
        """
        :return: Optional container that handles throttling of various services.
        """

    @property
    @abstractmethod
    def error_reporter(self) -> Callable[[Exception], None]:
        """
        :return: Callable that stores or logs an exception that might occur during the execution.
        """

    @property
    @abstractmethod
    def notifications(self) -> list[models.Notification]:
        """
        :return: List of all notifications.
        """

    @property
    @abstractmethod
    def metric_cards(self) -> list[models.JobMetricCard]:
        """
        :return: List of all metric cards.
        """

    @property
    @abstractmethod
    def output_adapter(self) -> output.abstract_adapter.AbstractOutputAdapter:
        """
        :return: Output adapter of the context.
        """

    @output_adapter.setter
    @abstractmethod
    def output_adapter(self, value: output.abstract_adapter.AbstractOutputAdapter):
        """
        :param value: Output adapter to set.
        """

    @property
    @abstractmethod
    def target_object(self) -> TargetObject:
        """
        :return: Object that is the target of this context.
        """

    @property
    @abstractmethod
    def context_factory(self) -> task_context_factory.TaskContextFactory:
        """
        :return: Factory that :py:meth:`create_derived_ctx` uses to build derived contexts.
        """

    @abstractmethod
    def add_notification(self, notification: models.Notification):
        """
        Adds a new notification.

        :param ppc_robot_lib.models.Notification notification: Notification to add.
        """

    @abstractmethod
    def clear_notifications(self):
        """
        Clears all notifications.
        """

    def add_metric_card(self, metric_card: models.JobMetricCard) -> None:
        """
        Adds a new metric card to current job. These cards will be saved if the job completes successfully.

        This method is not abstract: the implementation in this interface does nothing, subclasses override it.

        :param ppc_robot_lib.models.JobMetricCard metric_card: Metric card to add.
        """

    @abstractmethod
    def step_begin(self, step_label: str):
        """
        Should be called by the executor when a step is about to begin.

        Can be used for reporting of the current state. Implementation should handle nesting of steps and store
        the full stack.

        :param step_label: String representation of the current step.
        """

    @abstractmethod
    def step_end(self):
        """
        Should be called by the executor when a step has ended. It has to be paired with call to :py:meth:`step_begin`.
        """

    def create_derived_ctx(self, target_object: TargetObject) -> TaskContextInterface:
        """
        Creates a new derived context by delegating to :py:attr:`context_factory`.

        :param target_object: Object that will be set as target in the derived context.
        :return: New derived context that inherits from the current context.
        """
        factory = self.context_factory
        return factory.create_derived_context(self, target_object)
class TaskContext(TaskContextInterface):
    """
    Class used to store context of the currently executing task. The same instance is passed to each step.
    """

    def __init__(
        self,
        job: models.Job,
        task_id: UUID,
        work_set: WorkSet,
        credentials: AccountCredentials | None,
        user_credentials: UserCredentials,
        locale: str = None,
        client_id: Any = None,
        currency: str = None,
        throttling_service_container: ServiceContainerInterface = None,
        error_reporter: Callable[[Exception], None] = None,
        status_reporter: TaskStatusReporterInterface = None,
        client_login_id: Any = None,
        target_object: TargetObject = None,
        context_factory: task_context_factory.TaskContextFactory = None,
    ):
        """
        :param job: Job descriptor.
        :param task_id: UUID of the task.
        :param work_set: Work set of the task.
        :param credentials: Account credentials, may be ``None``.
        :param user_credentials: Credentials of the user.
        :param locale: Locale of the user.
        :param client_id: Client ID.
        :param currency: Currency code.
        :param throttling_service_container: Container that handles throttling of services.
        :param error_reporter: Callable that receives exceptions.
        :param status_reporter: Optional reporter notified from :py:meth:`step_begin` and :py:meth:`step_end`.
        :param client_login_id: Login client ID.
        :param target_object: Target of the task. Required even though it has a default value.
        :param context_factory: Factory used to create derived contexts.
        :raises ValueError: When ``target_object`` is ``None``.
        """
        if target_object is None:
            raise ValueError('target_object must be set on TaskContext.')

        self._job: models.Job = job
        self._task_id: UUID = task_id
        self._work_set: WorkSet = work_set
        self._credentials: AccountCredentials | None = credentials
        self._user_credentials = user_credentials
        self._locale = locale
        self._client_id = client_id
        self._client_login_id = client_login_id
        self._currency = currency
        # Each context starts with fresh metrics and empty notification/metric card lists.
        self._performance = TaskPerformance()
        self._throttling_service_container = throttling_service_container
        self._error_reporter = error_reporter
        self._status_reporter = status_reporter
        self._notifications: list[models.Notification] = []
        self._metric_cards: list[models.JobMetricCard] = []
        self._output_adapter = None
        # Labels of the steps that have begun and not ended yet, innermost step last.
        self._step_stack = []
        self._target_object = target_object
        self._context_factory = context_factory

    @property
    def job(self) -> models.Job:
        """
        :return: Job descriptor.
        """
        return self._job

    @property
    def task_id(self) -> UUID:
        """
        :return: Task ID.
        """
        return self._task_id

    @property
    def work_set(self) -> WorkSet:
        """
        :return: Work Set.
        """
        return self._work_set

    @property
    def credentials(self) -> AccountCredentials | None:
        """
        :return: Account credentials.
        """
        return self._credentials

    @property
    def user_credentials(self) -> UserCredentials:
        """
        :return: Credentials of the user.
        """
        return self._user_credentials

    @property
    def user_locale(self) -> str:
        """
        :return: Locale given to the constructor.
        """
        return self._locale

    @property
    def client_id(self) -> Any:
        """
        :return: Client ID given to the constructor.
        """
        return self._client_id

    @property
    def client_login_id(self) -> Any:
        """
        :return: Login client ID given to the constructor.
        """
        return self._client_login_id

    @property
    def currency(self) -> str | None:
        """
        :return: Currency code given to the constructor, ``None`` when it was not set.
        """
        return self._currency

    @property
    def performance(self) -> TaskPerformance:
        """
        :return: Performance metrics owned by this context.
        """
        return self._performance

    @property
    def throttling_service_container(self) -> ServiceContainerInterface:
        """
        :return: Throttling service container given to the constructor.
        """
        return self._throttling_service_container

    @property
    def error_reporter(self) -> Callable[[Exception], None]:
        """
        :return: Error reporter given to the constructor.
        """
        return self._error_reporter

    @property
    def notifications(self) -> list[models.Notification]:
        """
        :return: The internal list of notifications (not a copy).
        """
        return self._notifications

    @property
    def metric_cards(self) -> list[models.JobMetricCard]:
        """
        :return: The internal list of metric cards (not a copy).
        """
        return self._metric_cards

    @property
    def output_adapter(self) -> output.abstract_adapter.AbstractOutputAdapter:
        """
        :return: Current output adapter, ``None`` until one is assigned.
        """
        return self._output_adapter

    @output_adapter.setter
    def output_adapter(self, value: output.abstract_adapter.AbstractOutputAdapter):
        self._output_adapter = value

    @property
    def target_object(self) -> TargetObject:
        """
        :return: Target object given to the constructor.
        """
        return self._target_object

    @property
    def context_factory(self) -> task_context_factory.TaskContextFactory:
        """
        :return: Context factory given to the constructor.
        """
        return self._context_factory

    def add_notification(self, notification):
        """
        Appends a notification to the list of notifications.

        :param ppc_robot_lib.models.Notification notification: Notification to add.
        """
        self._notifications.append(notification)

    def clear_notifications(self):
        """
        Removes all notifications. The list is cleared in place.
        """
        self._notifications.clear()

    def add_metric_card(self, metric_card: models.JobMetricCard) -> None:
        """
        Appends a metric card to the list of metric cards.

        :param ppc_robot_lib.models.JobMetricCard metric_card: Metric card to add.
        """
        self._metric_cards.append(metric_card)

    def step_begin(self, step_label: str):
        """
        Pushes the label on the step stack and passes the stack to the status reporter, if there is one.

        :param step_label: String representation of the step that begins.
        """
        self._step_stack.append(step_label)
        if self._status_reporter is not None:
            self._status_reporter.on_step_start(self._step_stack)

    def step_end(self):
        """
        Passes the step stack to the status reporter, if there is one, and pops the innermost step.

        Calling it with an empty stack is tolerated: nothing is popped.
        """
        if self._status_reporter is not None:
            # The reporter is called before the pop, so the stack still contains the step that is ending.
            self._status_reporter.on_step_end(self._step_stack)
        if self._step_stack:
            self._step_stack.pop()
class DerivedTaskContext(TaskContextInterface):
    """
    Context derived from a parent context.

    It has its own work set, client ID, login client ID, currency and target object. Credentials may be
    overridden, otherwise they are taken from the parent. Everything else (job, task ID, user credentials,
    locale, performance, throttling, error reporter, notifications, metric cards, output adapter,
    context factory and step tracking) is delegated to the parent.
    """

    def __init__(
        self,
        parent: TaskContextInterface,
        work_set: WorkSet,
        client_id: Any = None,
        currency: str = None,
        client_login_id: str = None,
        target_object: TargetObject = None,
        credentials: AccountCredentials | None = None,
    ):
        """
        :param parent: Context to which the shared state is delegated.
        :param work_set: Work set of this context.
        :param client_id: Client ID of this context.
        :param currency: Currency code of this context.
        :param client_login_id: Login client ID of this context.
        :param target_object: Target of this context. Required even though it has a default value.
        :param credentials: Credentials that replace those of the parent when given.
        :raises ValueError: When ``target_object`` is ``None``.
        """
        if target_object is None:
            raise ValueError('target_object must be set on DerivedTaskContext')

        self._parent = parent
        self._work_set = work_set
        self._client_id = client_id
        self._currency = currency
        self._client_login_id = client_login_id
        self._target_object = target_object
        self._credentials = credentials

    @property
    def parent(self) -> TaskContextInterface:
        """
        :return: Parent context.
        """
        return self._parent

    # ---- values owned by this context ----

    @property
    def work_set(self) -> WorkSet:
        """
        :return: Work set of this context.
        """
        return self._work_set

    @property
    def client_id(self) -> Any:
        """
        :return: Client ID of this context.
        """
        return self._client_id

    @property
    def client_login_id(self) -> Any:
        """
        :return: Login client ID of this context.
        """
        return self._client_login_id

    @property
    def currency(self) -> str:
        """
        :return: Currency code of this context.
        """
        return self._currency

    @property
    def target_object(self) -> TargetObject:
        """
        :return: Target object of this context.
        """
        return self._target_object

    @property
    def credentials(self) -> AccountCredentials:
        """
        :return: Credentials given to this context when they are truthy, credentials of the parent otherwise.
        """
        own = self._credentials
        return own if own else self._parent.credentials

    # ---- values delegated to the parent ----

    @property
    def job(self) -> models.Job:
        """
        :return: Job of the parent context.
        """
        return self._parent.job

    @property
    def task_id(self) -> UUID:
        """
        :return: Task ID of the parent context.
        """
        return self._parent.task_id

    @property
    def user_credentials(self):
        """
        :return: User credentials of the parent context.
        """
        return self._parent.user_credentials

    @property
    def user_locale(self):
        """
        :return: User locale of the parent context.
        """
        return self._parent.user_locale

    @property
    def performance(self):
        """
        :return: Performance metrics of the parent context.
        """
        return self._parent.performance

    @property
    def throttling_service_container(self) -> ServiceContainerInterface:
        """
        :return: Throttling service container of the parent context.
        """
        return self._parent.throttling_service_container

    @property
    def error_reporter(self) -> Callable[[Exception], None]:
        """
        :return: Error reporter of the parent context.
        """
        return self._parent.error_reporter

    @property
    def notifications(self):
        """
        :return: Notifications of the parent context.
        """
        return self._parent.notifications

    @property
    def metric_cards(self) -> list[models.JobMetricCard]:
        """
        :return: Metric cards of the parent context.
        """
        return self._parent.metric_cards

    @property
    def output_adapter(self) -> output.abstract_adapter.AbstractOutputAdapter:
        """
        :return: Output adapter of the parent context.
        """
        return self._parent.output_adapter

    @output_adapter.setter
    def output_adapter(self, value: output.abstract_adapter.AbstractOutputAdapter):
        # The adapter is stored on the parent, not on this context.
        self._parent.output_adapter = value

    @property
    def context_factory(self) -> task_context_factory.TaskContextFactory:
        """
        :return: Context factory of the parent context.
        """
        return self._parent.context_factory

    def add_notification(self, notification):
        """
        Adds the notification through the parent context.

        :param notification: Notification to add.
        """
        self._parent.add_notification(notification)

    def clear_notifications(self):
        """
        Clears the notifications through the parent context.
        """
        self._parent.clear_notifications()

    def add_metric_card(self, metric_card: models.JobMetricCard) -> None:
        """
        Adds the metric card through the parent context.

        :param metric_card: Metric card to add.
        """
        self._parent.add_metric_card(metric_card)

    def step_begin(self, step_label: str):
        """
        Forwards the beginning of a step to the parent context.

        :param step_label: String representation of the step that begins.
        """
        self._parent.step_begin(step_label)

    def step_end(self):
        """
        Forwards the end of a step to the parent context.
        """
        self._parent.step_end()