Source code for ppc_robot_lib.steps.transformations.group_by_and_aggregate

from collections import OrderedDict
from typing import Any
from collections.abc import Callable

import pandas
from ppc_robot_lib.steps.abstract_step import AbstractStep
from ppc_robot_lib.tasks import StepPerformance, TaskContextInterface


# Aggregation specification for a single column: either a string naming the
# aggregation, or a callable that receives the whole DataFrame and returns the
# callable that is then applied to each grouped column slice (Series).
Agg = str | Callable[[pandas.DataFrame], Callable[[pandas.Series], Any]]


class GroupByAndAggregateStep(AbstractStep):
    """
    Groups the given table by one or more columns and performs aggregations on the columns.

    The ``group_by`` argument is a name of the column or a list (or tuple) of columns that should be used
    for the group-by operation.

    The ``aggregations`` is a dictionary, key is a name of the column, value is either a name of the aggregation
    (method of :py:class:`pandas.core.groupby.DataFrameGroupBy`, see
    `Pandas GroupBy Documentation <https://pandas.pydata.org/pandas-docs/stable/api.html#groupby>`_ for list),
    or a callable.

    If you use callable, the callable must return another callable. The outer callable will receive
    a :py:class:`pandas.DataFrame`, the inner will receive only a column slice to aggregate. You can call methods
    like ``sum``, ``mean`` on the slice, or use it in functions provided by numpy. This is useful when you need,
    for instance, a weighted average.

    Please note that columns that are present in neither ``group_by`` nor ``aggregations`` will not be present
    in the output table.

    Unlike :py:class:`ppc_robot_lib.steps.transformations.aggregate_by_column.AggregateByColumnStep` and
    :py:class:`ppc_robot_lib.steps.transformations.group_by_column.GroupByColumnStep`, this step returns a
    :py:class:`pandas.DataFrame`, not a :py:class:`pandas.core.groupby.DataFrameGroupBy`.

    **Example:**

    Assume we have the following table:

    .. csv-table:: in_table
       :header: "Campaign", "AdGroup", "Tag", "Impressions", "Quality"
       :widths: 20, 20, 20, 20, 20

       "Campaign A", "Group 1", "product", 10, 1
       "Campaign A", "Group 2", "product", 20, 10
       "Campaign B", "Group 1", "remarketing", 14, 7
       "Campaign B", "Group 2", "remarketing", 36, 5
       "Campaign B", "Group 3", "remarketing", 52, 10
       "Campaign C", "Group 1", "dsa", 14, 5
       "Campaign D", "Group 1", "product", 0, 10
       "Campaign D", "Group 2", "product", 10, 3
       "Campaign E", "Group 1", "banner", 35, 8

    We can use the following code to get weighted quality score::

        >>> import numpy
        >>> import pandas
        >>> from ppc_robot_lib.steps.transformations import GroupByAndAggregateStep
        >>> def agg_weighted_avg(df: pandas.DataFrame):
        ...     return lambda quality: numpy.average(quality, weights=df.loc[quality.index, 'Impressions'])
        >>> GroupByAndAggregateStep("in_table", "Campaign", {
        ...     "Tag": "first",
        ...     "Impressions": "sum",
        ...     "Quality": agg_weighted_avg,
        ... })

    .. note::
        The aggregation function receives only a slice of the ``Quality`` column in the argument ``quality``.
        However, we can use ``quality.index`` to access the corresponding columns in dataframe ``df``.
        Expression ::

            df.loc[quality.index, 'Impressions']

        will get only the related values in the ``Impressions`` columns from the original, non-grouped dataframe.

    The resulting table:

    .. csv-table:: in_table
       :header: "Campaign", "Tag", "Impressions", "Quality"
       :widths: 20, 20, 20, 20

       "Campaign A", "product", 30, 7
       "Campaign B", "remarketing", 102, "7.8235..."
       "Campaign C", "dsa", 14, 5
       "Campaign D", "product", 10, 3
       "Campaign E", "banner", 35, 8
    """

    def __init__(
        self,
        input_table: str,
        group_by: str | list[str] | tuple[str, ...],
        aggregations: dict[str, Agg],
        output_table: str | None = None,
        sort: bool = False,
    ):
        """
        :param input_table: Input table.
        :param group_by: Column name or list (or tuple) of columns to group by.
        :param aggregations: Dictionary of aggregations. Key is name of the column and value is either name of
            the aggregation, or a callable. See the class documentation for explanation.
        :param output_table: Name of the output table. When ``None``, the input table name is used, so the
            result replaces the input table.
        :param sort: Set to ``True`` if you would like to sort the result by the columns used for grouping.
        :raises ValueError: If any aggregation is neither a string nor a callable.
        """
        # Fail early, at construction time, rather than in the middle of a running task.
        for col, agg_func in aggregations.items():
            if not isinstance(agg_func, str) and not callable(agg_func):
                raise ValueError(f'Aggregation must be either a string or callable, {type(agg_func)} given for {col}.')

        self.input_table = input_table
        self.group_by = group_by
        self.aggregations = aggregations
        self.output_table = output_table if output_table is not None else input_table
        self.sort = sort

    def execute(self, task_ctx: TaskContextInterface) -> StepPerformance:
        """
        Groups the input table, applies the aggregations and stores the result in the work set.

        :param task_ctx: Task context; its ``work_set`` is used to read the input table and store the output.
        :return: Performance record created from the input table and the row counts before and after the step.
        """
        table = task_ctx.work_set.get_table(self.input_table)
        rows_in = len(table.index)

        # Strings are passed to pandas as-is; callables are factories that are given the
        # full, non-grouped table and return the actual per-column aggregation function.
        aggs = {}
        for col, agg_func in self.aggregations.items():
            if isinstance(agg_func, str):
                aggs[col] = agg_func
            elif callable(agg_func):
                aggs[col] = agg_func(table)

        # pandas interprets a tuple passed as ``by`` as a single key, not as a sequence of keys.
        # Convert it to a list so a tuple of column names behaves like a list of column names.
        # A tuple that is itself an existing column label is left untouched.
        group_keys = self.group_by
        if isinstance(group_keys, tuple) and group_keys not in table.columns:
            group_keys = list(group_keys)

        grouped = table.groupby(by=group_keys, sort=self.sort, as_index=False)
        new_table = grouped.agg(aggs)

        # Replace an already existing output table (this includes the in-place case
        # where the output table is the input table).
        if self.output_table in task_ctx.work_set:
            task_ctx.work_set.delete_table(self.output_table)
        task_ctx.work_set.set_table(self.output_table, new_table)

        rows_out = len(new_table.index)
        return StepPerformance(table, rows_in=rows_in, rows_out=rows_out)