import numpy
import pandas
from django.db import transaction
from ppc_robot_lib.models import ClientAccount, Job
from ppc_robot_lib.steps.abstract_step import AbstractStep
from ppc_robot_lib.tasks import StepPerformance, TaskContextInterface
[docs]
class JoinAccountInfoStep(AbstractStep):
    """
    Fetches account information and settings and joins them to the specified table. The accounts are fetched
    either by value in column ``client_id_column``, or by client ID specified in task context. Lookups are
    always performed by the ``ext_id`` column in the :py:class:`ppc_robot_lib.models.client_account.ClientAccount`
    model.

    ``properties`` is a list of columns to be fetched; any field of
    :py:class:`ppc_robot_lib.models.client_account.ClientAccount` can be used. If a column already exists in the
    table, the specified suffix is appended to the result name (``_account`` by default).
    """

    def __init__(
        self,
        table: str,
        properties: list[str] = None,
        client_id_column: str = None,
        context_client_id: bool = False,
        suffix='_account',
    ):
        """
        :param table: Table name to which the account information should be added.
        :param properties: Properties of the :py:class:`ppc_robot_lib.models.client_account.ClientAccount` model which
            should be added as columns to the table. Required; a single string is treated as a one-item list.
        :param client_id_column: Use the specified column values as input for the lookup.
        :param context_client_id: Set to ``True`` if you would like to use client_id stored in the task context.
        :param suffix: Suffix to add to the columns if they already exist in the table.
        :raises ValueError: If neither lookup source is configured, or if ``properties`` is not given.
        """
        if client_id_column is None and context_client_id is False:
            raise ValueError('You must either specify client_id_column, or set context_client_id to True.')
        # There is no sensible default for the properties; reject a missing value here instead of failing
        # later inside execute().
        if properties is None:
            raise ValueError('You must specify the properties to fetch.')
        if isinstance(properties, str):
            properties = [properties]
        self.table = table
        # Keep a private list copy so the caller's sequence (list, tuple, ...) is never shared or mutated.
        self.properties = list(properties)
        self.client_id_column = client_id_column
        self.context_client_id = context_client_id
        self.suffix = suffix

    def execute(self, task_ctx: TaskContextInterface) -> StepPerformance:
        """
        Add the configured account properties as columns of the table named ``self.table``.

        :param task_ctx: Task context; provides the work set, the target object (and its service account),
            the user credentials and, in context mode, the client ID.
        :return: Step performance carrying the number of rows of the resulting table.
        :raises ValueError: In context mode, when the client ID is unknown or the account is not found.
        """
        table = task_ctx.work_set.get_table(self.table)
        # ``ext_id`` is always needed because it is the lookup key; dict.fromkeys drops a duplicate
        # while preserving the order of the requested properties.
        select_columns = list(dict.fromkeys([*self.properties, 'ext_id']))

        with transaction.atomic():
            service_account_id = task_ctx.target_object.service_account.id
            if self.client_id_column is not None:
                new_table = self._join_by_column(table, service_account_id, select_columns)
                # Replace the table in the work set.
                task_ctx.work_set.delete_table(self.table)
                task_ctx.work_set.set_table(self.table, new_table)
                return StepPerformance(rows_out=len(new_table.index))

            self._assign_from_context(task_ctx, table, service_account_id, select_columns)
            # Report the number of rows in the output table, the same way the column-join mode does.
            return StepPerformance(rows_out=len(table.index))

    def _join_by_column(
        self, table: pandas.DataFrame, service_account_id, select_columns: list[str]
    ) -> pandas.DataFrame:
        """Left-join account properties onto ``table``, matching ``client_id_column`` against ``ext_id``."""
        join_column = table[self.client_id_column]
        # Missing keys can never match an account, so keep NaN/None out of the IN (...) lookup.
        ext_ids = join_column.dropna().unique().tolist()
        # Materialize once. The field restriction is passed to values() directly, because values()
        # without arguments returns every model field.
        rows = list(
            ClientAccount.objects.filter(service_account_id=service_account_id, ext_id__in=ext_ids)
            .order_by('pk')
            .values(*select_columns)
        )
        # Index the accounts by ext_id converted to the join column's type so that merge keys compare equal.
        to_key = self._key_converter(join_column.dtype)
        accounts = pandas.DataFrame(
            data=rows,
            index=[to_key(row['ext_id']) for row in rows],
            columns=self.properties,
        )
        # Several accounts may share an ext_id within a service account. Keep one per key (the lowest pk,
        # like QuerySet.first() in context mode) so the left join cannot duplicate rows of the input table.
        accounts = accounts.loc[~accounts.index.duplicated(keep='first')]
        return pandas.merge(
            table,
            accounts,
            how='left',
            left_on=self.client_id_column,
            right_index=True,
            sort=False,
            suffixes=('', self.suffix),
        )

    @staticmethod
    def _key_converter(dtype):
        """Return the callable converting ``ext_id`` values to the Python type matching ``dtype``."""
        # pandas.api.types also understands pandas extension dtypes (e.g. nullable ``Int64``), which
        # numpy.issubdtype rejects with a TypeError.
        if pandas.api.types.is_integer_dtype(dtype):
            return int
        if pandas.api.types.is_float_dtype(dtype):
            return float
        return str

    def _assign_from_context(self, task_ctx, table: pandas.DataFrame, service_account_id, select_columns: list[str]):
        """Set the properties of the task-context client account on every row of ``table`` (in place)."""
        client_id = task_ctx.client_id
        if client_id is None:
            raise ValueError('Client ID is unknown, cannot fetch account information!')
        user_credentials = task_ctx.user_credentials
        client_account: ClientAccount = (
            ClientAccount.objects.filter(
                user_id=user_credentials.user_id, service_account_id=service_account_id, ext_id=client_id
            )
            .only(*select_columns)
            .first()
        )
        if client_account is None:
            raise ValueError(
                f'Client account with ID {client_id} for user {user_credentials.user_id} was not found!'
            )
        for column in self.properties:
            # Mirror merge's suffix behaviour: only clashing column names get the suffix.
            target = column if column not in table else column + self.suffix
            table[target] = getattr(client_account, column)