from collections.abc import Callable
from ppc_robot_lib.steps import AbstractStep
from ppc_robot_lib.tasks import TaskContextInterface, StepPerformance
import inspect
import pandas
import numpy
[docs]
class ComputeStep(AbstractStep):
    """
    Computes one or more columns and adds them to the table.

    Columns are given as a dictionary, where key is a name of a new column and value is an expression.
    Expression can be either callable, or string with an expression for :py:func:`pandas.eval`. Syntax is briefly
    described at :ref:`pandas:enhancingperf.eval`. Please note that callable is strongly preferred.

    The callable receives a :py:class:`pandas.DataFrame` as the first positional argument. If the callable also contains
    the ``task_ctx`` argument, it will receive an :py:class:`ppc_robot_lib.tasks.task_context.TaskContextInterface`
    as a keyword argument.

    The callable should return a value that is directly assignable to the column: :py:class:`pandas.Series` or a single
    scalar value. If the callable returns an empty non-scalar value (its ``len()`` is zero), ``None`` is assigned to
    the column instead.

    String expressions are executed as ``<column name> = <expression>`` through :py:meth:`pandas.DataFrame.eval`,
    so the column name is interpolated into the evaluated statement verbatim.

    **Example:**

    >>> from ppc_robot_lib.steps.transformations import ComputeStep
    >>> ComputeStep("table", {
    ...     'Ctr1': lambda df: df['Clicks'] / df['Impressions'],
    ...     'Ctr2': 'Clicks / Impressions',
    ... })
    """

    def __init__(self, table: str, columns: dict[str, Callable[[pandas.DataFrame], pandas.Series]]):
        """
        :param table: Table to use.
        :param columns: Dictionary, where key is a name of a new column and value is an expression
            (either a callable, or a string expression for eval).
        :raises ValueError: If any expression is neither a callable nor a string.
        """
        # Validate everything up-front so that a misconfigured step fails at construction time,
        # before any attribute is set.
        for new_col, expr in columns.items():
            if not callable(expr) and not isinstance(expr, str):
                raise ValueError(
                    f'Invalid expression for column {new_col}: must be a callable or a string (expression for eval), '
                    f'{type(expr)} given'
                )

        self.table = table
        self.columns = columns

    def execute(self, task_ctx: TaskContextInterface) -> StepPerformance:
        """
        Evaluates all configured expressions against the table and stores the results as columns.

        Expressions are processed in the iteration order of the ``columns`` dictionary, and each one sees
        the columns assigned by the expressions before it.

        :param task_ctx: Task context; the table is read from and written back to its ``work_set``.
        :return: Step performance with identical input and output row counts.
        """
        table = task_ctx.work_set.get_table(self.table)

        for new_column, expr in self.columns.items():
            if callable(expr):
                result = self._call_expression(expr, table, task_ctx)
                # An empty non-scalar result is replaced by None before the assignment.
                if self._is_empty_result(result):
                    result = None
                table[new_column] = result
            elif isinstance(expr, str):
                table.eval(f'{new_column} = {expr}', inplace=True)

        # The table is removed and registered again under the same name.
        task_ctx.work_set.delete_table(self.table)
        task_ctx.work_set.set_table(self.table, table)

        rows = len(table.index)
        return StepPerformance(table, rows_in=rows, rows_out=rows)

    def get_label_args(self):
        """
        :return: Set with names of all computed columns.
        """
        return set(self.columns)

    @staticmethod
    def _call_expression(expr, table, task_ctx):
        """
        Calls the expression with the table, passing ``task_ctx`` as a keyword argument only if the callable
        declares a parameter of that name.

        :param expr: Callable expression.
        :param table: Table passed as the first positional argument.
        :param task_ctx: Task context to pass on request.
        :return: Whatever the callable returns.
        """
        try:
            wants_task_ctx = 'task_ctx' in inspect.signature(expr).parameters
        except (TypeError, ValueError):
            # Some callables (e.g. certain builtins) expose no introspectable signature; they cannot have
            # asked for task_ctx by name, so call them with the table only.
            wants_task_ctx = False

        if wants_task_ctx:
            return expr(table, task_ctx=task_ctx)
        return expr(table)

    @staticmethod
    def _is_empty_result(result) -> bool:
        """
        Checks whether the result is a sized, non-scalar value of zero length.

        :param result: Value returned by a callable expression.
        :return: ``True`` only for non-``None``, non-scalar values whose ``len()`` is zero.
        """
        if result is None or numpy.isscalar(result):
            return False
        try:
            return len(result) == 0
        except TypeError:
            # numpy.isscalar() rejects some scalar-like values that have no usable length (datetime.date,
            # pandas.Timestamp, 0-d arrays, ...); these are assignable as-is and are never "empty".
            return False