Source code for ppc_robot_lib.steps.input.join_account_info

import numpy
import pandas
from django.db import transaction

from ppc_robot_lib.models import ClientAccount, Job
from ppc_robot_lib.steps.abstract_step import AbstractStep
from ppc_robot_lib.tasks import StepPerformance, TaskContextInterface


class JoinAccountInfoStep(AbstractStep):
    """
    Fetches account information and settings and joins them to the specified table.

    The accounts are fetched either by the values in column ``client_id_column``, or by the client ID
    specified in the task context. Lookups are always performed by the ``ext_id`` column in the
    :py:class:`ppc_robot_lib.models.client_account.ClientAccount` model.

    ``properties`` is a list of columns to be fetched; any field from
    :py:class:`ppc_robot_lib.models.client_account.ClientAccount` can be used. If a column already exists
    in the table, the specified suffix is appended to the result name (``_account`` by default).
    """

    def __init__(
        self,
        table: str,
        properties: list[str],
        client_id_column: str = None,
        context_client_id: bool = False,
        suffix='_account',
    ):
        """
        :param table: Table name to which the account information should be added.
        :param properties: Properties of the :py:class:`ppc_robot_lib.models.client_account.ClientAccount`
            model which should be added as columns to the table.
        :param client_id_column: Use the specified column values as input for the lookup.
        :param context_client_id: Set to ``True`` if you would like to use client_id stored in the task
            context.
        :param suffix: Suffix to add to the columns if they already exist in the table.
        :raises ValueError: If neither ``client_id_column`` nor ``context_client_id`` is given.
        """
        if client_id_column is None and context_client_id is False:
            raise ValueError('You must either specify client_id_column, or set context_client_id to True.')

        self.table = table
        self.properties = properties
        self.client_id_column = client_id_column
        self.context_client_id = context_client_id
        self.suffix = suffix

    def execute(self, task_ctx: TaskContextInterface) -> StepPerformance:
        """
        Loads the account data and adds the requested properties to the table.

        When ``client_id_column`` is set, the table is left-joined with the matching accounts and replaced
        in the work set. Otherwise the single account identified by ``task_ctx.client_id`` is loaded and
        its properties are written to the existing table as constant columns.

        :param task_ctx: Task context providing the work set, credentials and target object.
        :return: Step performance; ``rows_out`` is the row count of the joined table in column mode,
            and ``1`` in context mode.
        :raises ValueError: In context mode, if the client ID is unknown or the account does not exist.
        """
        table = task_ctx.work_set.get_table(self.table)

        # Work on private copies so that the configured properties are never mutated and any
        # iterable of names (not only a list) is accepted.
        keys = list(self.properties)
        select_columns = [*keys, 'ext_id']

        with transaction.atomic():
            user_credentials = task_ctx.user_credentials
            service_account_id = task_ctx.target_object.service_account.id

            if self.client_id_column is not None:
                return self._join_by_column(task_ctx, table, keys, select_columns, service_account_id)

            return self._join_from_context(
                task_ctx, table, keys, select_columns, user_credentials.user_id, service_account_id
            )

    def _join_by_column(self, task_ctx, table, keys, select_columns, service_account_id) -> StepPerformance:
        """Left-joins accounts matched by the values of ``client_id_column`` and replaces the table."""
        ext_ids = table[self.client_id_column].unique().tolist()

        # Materialise the query once, so the index and the data are built from the same rows.
        account_rows = list(
            ClientAccount.objects.filter(service_account_id=service_account_id, ext_id__in=ext_ids)
            .only(*select_columns)
            .values()
        )

        # Determine join column type for the conversion. The pandas helpers are used because they
        # also understand extension dtypes (e.g. nullable ``Int64``).
        col_type = table[self.client_id_column].dtype
        if pandas.api.types.is_integer_dtype(col_type):
            transform = int
        elif pandas.api.types.is_float_dtype(col_type):
            transform = float
        else:
            transform = str

        # Create index from ext_ids, converted to the type of the join column.
        idx = [transform(row['ext_id']) for row in account_rows]

        # Import results to DataFrame; only the requested properties are kept as columns.
        client_list_table = pandas.DataFrame(data=account_rows, index=idx, columns=keys)

        # Join the account information to the main table.
        new_table = pandas.merge(
            table,
            client_list_table,
            how='left',
            left_on=self.client_id_column,
            right_index=True,
            sort=False,
            suffixes=('', self.suffix),
        )

        # Replace the table in the workset.
        task_ctx.work_set.delete_table(self.table)
        task_ctx.work_set.set_table(self.table, new_table)

        return StepPerformance(rows_out=len(new_table.index))

    def _join_from_context(
        self, task_ctx, table, keys, select_columns, user_id, service_account_id
    ) -> StepPerformance:
        """Writes the properties of the account given by ``task_ctx.client_id`` to the table in place."""
        client_id = task_ctx.client_id
        if client_id is None:
            raise ValueError('Client ID is unknown, cannot fetch account information!')

        client_account = (
            ClientAccount.objects.filter(
                user_id=user_id, service_account_id=service_account_id, ext_id=client_id
            )
            .only(*select_columns)
            .first()
        )
        if client_account is None:
            raise ValueError(f'Client account with ID {client_id} for user {user_id} was not found!')

        for column in keys:
            # Avoid overwriting an existing column: fall back to the suffixed name.
            target = column if column not in table else column + self.suffix
            table[target] = getattr(client_account, column)

        return StepPerformance(rows_out=1)