from typing import Any
from collections.abc import Iterable
import pandas
from ppc_robot_lib import accounts
from ppc_robot_lib import WorkSet
from ppc_robot_lib.steps import AbstractStep
from ppc_robot_lib.tasks import TaskContextInterface, StepPerformance
from ppc_robot_lib.steps.control.for_each_client import ForEachClientStep
[docs]
class ForEachClientUnionStep(ForEachClientStep):
    """
    Run ``steps`` once per client account and union one table from every run into a single table.

    The client accounts are chosen by ``selector``. After all runs have finished, the table named
    ``inner_table`` is taken from each client and the tables are concatenated (union) into ``output_table``.

    ``selector`` can be an instance of the following classes:

    * :py:class:`ppc_robot_lib.client_selector.PredefinedClientSelector`: iterates over a given set of Client IDs.
      Useful when the user has to select for which accounts the report should run.

    Every per-client execution starts with an empty :py:class:`ppc_robot_lib.work_set.WorkSet`, and the steps
    are expected to produce a table of the given name (e.g. ``account_performance``) in that WorkSet.
    Suppose the executions produced the following two tables:

    ==================== ============= ========
    ExternalCustomerId   Impressions   Clicks
    ==================== ============= ========
    111222333            256           16
    ==================== ============= ========

    ==================== ============= ========
    ExternalCustomerId   Impressions   Clicks
    ==================== ============= ========
    444555666            128           32
    ==================== ============= ========

    Once all executions are done, the tables are merged together (like SQL's UNION):

    ==================== ============= ========
    ExternalCustomerId   Impressions   Clicks
    ==================== ============= ========
    111222333            256           16
    444555666            128           32
    ==================== ============= ========
    """

    def __init__(
        self,
        selector: 'accounts.ClientSelectorInterface',
        inner_table: str,
        output_table: str = None,
        steps: Iterable[AbstractStep] = None,
        parallel_workers=None,
    ):
        """
        :param selector: Account selector.
        :param inner_table: Name of the table that is merged from each client. The table must be produced
            by ``steps``.
        :param output_table: Name of the output table. When omitted (``None``), the name given in
            ``inner_table`` is used for the output as well.
        :param steps: Steps to execute for each client.
        :param parallel_workers: Passed through unchanged to :py:class:`ForEachClientStep`; presumably the
            number of workers used to process clients in parallel -- confirm against the base class.
        """
        super().__init__(selector, steps, parallel_workers)
        self.inner_table = inner_table
        # The merged table keeps the inner table's name unless an explicit output name was given.
        self.output_table = inner_table if output_table is None else output_table

    def gather(self, task_ctx: TaskContextInterface, work_sets: dict[Any, WorkSet]):
        """
        Concatenate the ``inner_table`` of every client work set and store the result as ``output_table``.

        :param task_ctx: Task context; the merged table is written into its ``work_set``.
        :param work_sets: Mapping whose values are the per-client work sets. The keys are not used here.
        :return: ``StepPerformance`` created from the merged table, with ``rows_out`` set to its row count.
        """
        client_frames = [client_ws.get_table(self.inner_table) for client_ws in work_sets.values()]

        if client_frames:
            # ignore_index=True renumbers the rows of the result from zero; sort=True asks pandas to sort
            # the column axis of the result.
            merged: pandas.DataFrame = pandas.concat(client_frames, ignore_index=True, sort=True)
        else:
            # pandas.concat raises on an empty sequence, so an empty table is produced instead.
            merged = pandas.DataFrame()

        task_ctx.work_set.set_table(self.output_table, merged)
        return StepPerformance(merged, rows_out=len(merged.index))