Source code for ppc_robot_lib.steps.control.for_each_client_union

from typing import Any
from collections.abc import Iterable
import pandas
from ppc_robot_lib import accounts
from ppc_robot_lib import WorkSet
from ppc_robot_lib.steps import AbstractStep
from ppc_robot_lib.tasks import TaskContextInterface, StepPerformance
from ppc_robot_lib.steps.control.for_each_client import ForEachClientStep


class ForEachClientUnionStep(ForEachClientStep):
    """
    Runs ``steps`` once per client account chosen by ``selector`` and unions
    one table from every client into a single output table.

    After the per-client runs finish, the table named ``inner_table`` is read
    from each client's work set and all of them are concatenated (union) into
    ``output_table``.

    ``selector`` can be an instance of the following classes:

    * :py:class:`ppc_robot_lib.client_selector.PredefinedClientSelector`:
      iterates over a given set of Client IDs. Useful when the user has to
      select for which accounts the report should run.

    When executed, the given set of steps is run once for each client. Each
    execution begins with an empty :py:class:`ppc_robot_lib.work_set.WorkSet`.
    A table of the given name (e.g. ``account_performance``) should be
    produced in each of these WorkSets. Suppose that the executions produced
    the following two tables:

    ==================== ============= ========
     ExternalCustomerId   Impressions   Clicks
    ==================== ============= ========
     111222333            256           16
    ==================== ============= ========

    ==================== ============= ========
     ExternalCustomerId   Impressions   Clicks
    ==================== ============= ========
     444555666            128           32
    ==================== ============= ========

    After all executions are done, the tables are merged together (like SQL's
    ``UNION``):

    ==================== ============= ========
     ExternalCustomerId   Impressions   Clicks
    ==================== ============= ========
     111222333            256           16
     444555666            128           32
    ==================== ============= ========
    """

    def __init__(
        self,
        selector: 'accounts.ClientSelectorInterface',
        inner_table: str,
        output_table: str = None,
        steps: Iterable[AbstractStep] = None,
        parallel_workers=None,
    ):
        """
        :param selector: Account selector.
        :param inner_table: Name of the table that is merged from each client.
            The table must be produced by ``steps``.
        :param output_table: Name of the output table. When omitted (``None``),
            the name given in ``inner_table`` is used.
        :param steps: Steps to execute for each client.
        :param parallel_workers: Passed through unchanged to
            :py:class:`ForEachClientStep`.
        """
        super().__init__(selector, steps, parallel_workers)
        self.inner_table = inner_table
        # Without an explicit output name, the union is stored under the same
        # name as the per-client table.
        self.output_table = output_table if output_table is not None else inner_table

    def gather(self, task_ctx: TaskContextInterface, work_sets: dict[Any, WorkSet]):
        """
        Union the per-client tables and store the result in the task's work set.

        :param task_ctx: Task context; the merged table is written to its
            ``work_set`` under ``self.output_table``.
        :param work_sets: Per-client work sets; ``self.inner_table`` is read
            from each of them.
        :return: :py:class:`StepPerformance` built from the merged table, with
            ``rows_out`` set to its number of rows.
        """
        per_client_tables = [
            client_work_set.get_table(self.inner_table)
            for client_work_set in work_sets.values()
        ]

        # pandas.concat() rejects an empty list, so with no work sets an
        # empty frame is stored instead.
        merged: pandas.DataFrame = (
            pandas.concat(per_client_tables, ignore_index=True, sort=True)
            if per_client_tables
            else pandas.DataFrame()
        )

        task_ctx.work_set.set_table(self.output_table, merged)
        return StepPerformance(merged, rows_out=len(merged.index))