from collections import OrderedDict
from typing import Any
from collections.abc import Callable
import pandas
from ppc_robot_lib.steps.abstract_step import AbstractStep
from ppc_robot_lib.tasks import StepPerformance, TaskContextInterface
Agg = str | Callable[[pandas.DataFrame], Callable[[pandas.Series], Any]]
[docs]
class GroupByAndAggregateStep(AbstractStep):
    """
    Groups the given table by one or more columns and performs aggregations on the columns.

    The ``group_by`` argument is a column name, or a list (or tuple) of column names, that should be used
    for the group-by operation.

    The ``aggregations`` is a dictionary, key is a name of the column, value is either a name of the aggregation
    (method of :py:class:`pandas.core.groupby.DataFrameGroupBy`, see
    `Pandas GroupBy Documentation <https://pandas.pydata.org/docs/reference/groupby.html>`_ for list),
    or a callable.

    If you use a callable, the callable must return another callable. The outer callable will receive
    the whole input :py:class:`pandas.DataFrame`, the inner one will receive only the column slice to aggregate.
    You can call methods like ``sum`` or ``mean`` on the slice, or pass it to functions provided by numpy.
    This is useful when you need, for instance, a weighted average.

    Please note that columns present in neither ``group_by`` nor ``aggregations`` will not be present
    in the output table.

    Unlike :py:class:`ppc_robot_lib.steps.transformations.aggregate_by_column.AggregateByColumnStep` and
    :py:class:`ppc_robot_lib.steps.transformations.group_by_column.GroupByColumnStep`, this
    step returns a :py:class:`pandas.DataFrame`, not a :py:class:`pandas.core.groupby.DataFrameGroupBy`.

    **Example:**

    Assume we have the following table:

    .. csv-table:: in_table
        :header: "Campaign", "AdGroup", "Tag", "Impressions", "Quality"
        :widths: 20, 20, 20, 20, 20

        "Campaign A", "Group 1", "product", 10, 1
        "Campaign A", "Group 2", "product", 20, 10
        "Campaign B", "Group 1", "remarketing", 14, 7
        "Campaign B", "Group 2", "remarketing", 36, 5
        "Campaign B", "Group 3", "remarketing", 52, 10
        "Campaign C", "Group 1", "dsa", 14, 5
        "Campaign D", "Group 1", "product", 0, 10
        "Campaign D", "Group 2", "product", 10, 3
        "Campaign E", "Group 1", "banner", 35, 8

    We can use the following code to get a weighted quality score::

        >>> import numpy
        >>> from ppc_robot_lib.steps.transformations import GroupByAndAggregateStep
        >>> def agg_weighted_avg(df: pandas.DataFrame):
        ...     return lambda quality: numpy.average(quality, weights=df.loc[quality.index, 'Impressions'])
        >>> GroupByAndAggregateStep("in_table", "Campaign", {
        ...     "Tag": "first",
        ...     "Impressions": "sum",
        ...     "Quality": agg_weighted_avg,
        ... })

    .. note::

        The aggregation function receives only a slice of the ``Quality`` column in the argument ``quality``.
        However, we can use ``quality.index`` to access the corresponding rows of the dataframe ``df``.
        Expression ::

            df.loc[quality.index, 'Impressions']

        will get only the related values in the ``Impressions`` column from the original, non-grouped dataframe.

    The resulting table (stored back in ``in_table``, because no ``output_table`` was given):

    .. csv-table:: in_table
        :header: "Campaign", "Tag", "Impressions", "Quality"
        :widths: 20, 20, 20, 20

        "Campaign A", "product", 30, 7
        "Campaign B", "remarketing", 102, "7.8235..."
        "Campaign C", "dsa", 14, 5
        "Campaign D", "product", 10, 3
        "Campaign E", "banner", 35, 8
    """

    def __init__(
        self,
        input_table: str,
        group_by: str | list[str] | tuple[str, ...],
        aggregations: dict[str, Agg],
        output_table: str | None = None,
        sort: bool = False,
    ):
        """
        :param input_table: Name of the input table in the work set.
        :param group_by: Column name, or list/tuple of column names, to group by.
        :param aggregations: Dictionary of aggregations. Key is name of the column and value is either name
            of the aggregation, or a callable. See the class documentation for explanation.
        :param output_table: Name of the output table. Defaults to ``input_table``, i.e. the input table
            is replaced by the result.
        :param sort: Set to ``True`` if you would like to sort the result by the columns used for grouping.
        :raises ValueError: If any aggregation is neither a string nor a callable.
        """
        # Validate eagerly so misconfiguration fails at construction, not in the middle of a task run.
        for col, agg_func in aggregations.items():
            if not isinstance(agg_func, str) and not callable(agg_func):
                raise ValueError(f'Aggregation must be either a string or callable, {type(agg_func)} given for {col}.')

        self.input_table = input_table
        self.group_by = group_by
        self.aggregations = aggregations
        self.output_table = output_table if output_table is not None else input_table
        self.sort = sort

    def execute(self, task_ctx: TaskContextInterface) -> StepPerformance:
        """
        Groups the input table, aggregates each group and stores the result as the output table.

        :param task_ctx: Task context whose ``work_set`` provides the input table and receives the output table.
        :return: :py:class:`StepPerformance` built from the input table and the row counts
            before and after the aggregation.
        """
        table = task_ctx.work_set.get_table(self.input_table)
        rows_in = len(table.index)

        aggs = self._resolve_aggregations(table)
        # as_index=False keeps the group-by columns as regular columns of the resulting DataFrame.
        grouped = table.groupby(by=self._group_keys(), sort=self.sort, as_index=False)
        new_table = grouped.agg(aggs)

        # Drop any existing table of the same name first (by default the output overwrites the input).
        if self.output_table in task_ctx.work_set:
            task_ctx.work_set.delete_table(self.output_table)
        task_ctx.work_set.set_table(self.output_table, new_table)

        rows_out = len(new_table.index)
        return StepPerformance(table, rows_in=rows_in, rows_out=rows_out)

    def _group_keys(self) -> str | list[str]:
        """Return ``group_by`` in a form pandas always interprets as one or more column names."""
        # A tuple passed as ``by`` is ambiguous to pandas. Depending on the version it is treated as a
        # single (MultiIndex) key, or reinterpreted as a list with a FutureWarning. A list is unambiguous.
        if isinstance(self.group_by, tuple):
            return list(self.group_by)
        return self.group_by

    def _resolve_aggregations(self, table: pandas.DataFrame) -> dict[str, Any]:
        """Map each column to its pandas aggregation: names pass through, factories are called with ``table``."""
        aggs = OrderedDict()
        for col, agg_func in self.aggregations.items():
            if isinstance(agg_func, str):
                aggs[col] = agg_func
            elif callable(agg_func):
                # The factory sees the whole non-grouped table, so the returned function can look up
                # other columns via the slice's index (e.g. weights for a weighted average).
                aggs[col] = agg_func(table)
        return aggs