Source code for ppc_robot_lib.steps.transformations.split_by_column

from typing import Any
from collections.abc import Callable
import pandas
from ppc_robot_lib.steps import AbstractStep
from ppc_robot_lib.tasks import TaskContextInterface, StepPerformance


class SplitByColumnStep(AbstractStep):
    """
    Enables you to split one table to multiple tables according to value of a specified column.

    ``output_tables`` is a dictionary, key is the name of resulting table and value can be either:

    * Value of the column to match.
    * Callable receiving a single value and returning :py:class:`bool`. If the callable returns ``True``,
      the row will be inserted into the given table.
    * List of values or callables with the same semantics as in the previous items. The row will be
      inserted if at least one of the values or callables match.

    Matching rules:

    * Exact values are checked first; callables are only consulted when no exact value matches.
    * Callables are tried in the order in which they were given and the first one returning ``True`` wins.
    * If the same exact value is listed for several tables, the table listed last wins.
    * Callables are invoked once per distinct value of the column, not once per row.

    Please note that row order might not be preserved, as this operation internally uses group by and then
    operates on disjunct segments of table.

    Please also note that the group by is executed with the pandas defaults, which exclude null keys:
    rows whose value in the split column is null are not routed to any table, not even to the default one.

    **Example:**

    Let's assume we have the following table:

    .. csv-table:: in_table
        :header: "A", "X"
        :widths: 20, 20

        "A1", 1
        "A3", 3
        "A2", 2
        "A4", 4
        "A7", 7
        "A6", 6
        "A5", 5
        "A9", 9
        "A0", 10
        "A8", 8
        "AA", 11

    By executing step::

        >>> from ppc_robot_lib.steps.transformations import SplitByColumnStep
        >>> SplitByColumnStep('in_table', 'X', {
        ...     'o1': [1, 2, 3, lambda x: 8 <= x <= 10], 'o2': lambda x: 4 <= x <= 7
        ... }, default_table='default')

    We would get the following tables:

    .. csv-table:: o1
        :header: "A", "X"
        :widths: 20, 20

        "A1", 1
        "A3", 3
        "A2", 2
        "A9", 9
        "A0", 10
        "A8", 8

    .. csv-table:: o2
        :header: "A", "X"
        :widths: 20, 20

        "A4", 4
        "A7", 7
        "A6", 6
        "A5", 5

    .. csv-table:: default
        :header: "A", "X"
        :widths: 20, 20

        "AA", 11
    """

    def __init__(
        self,
        table: str,
        column: str,
        output_tables: dict[str, Any | list[Any] | Callable[[Any], bool]],
        default_table: str | None = None,
    ):
        """
        :param table: Input table.
        :param column: Column to use for split.
        :param output_tables: Dictionary of output tables. Key is name of the result table, value is either
            value to match, callable or list of values/callables to filter by.
        :param default_table: Table to be used when the value does not match any of the tables specified
            in the ``output_tables`` argument. If none is given, these rows are dropped.
        """
        self.table = table
        self.column = column
        self.output_tables = output_tables
        self.default_table = default_table

    def execute(self, task_ctx: TaskContextInterface) -> StepPerformance:
        """
        Split the input table and store every resulting table in the work set.

        Every table named in ``output_tables`` (and the default table, if configured) is always created,
        even when no rows were routed to it. An existing table of the same name is replaced.

        :param task_ctx: Task context whose work set holds the input table and receives the outputs.
        :return: Number of rows read from the input table and total number of rows written to the outputs.
        """
        value_map, callable_list = self._build_matchers()

        # One bucket of data-frame parts per output table, in declaration order; the default table
        # goes last unless it is already one of the explicitly listed tables.
        result_tables: dict[str, list[pandas.DataFrame]] = {name: [] for name in self.output_tables}
        if self.default_table is not None:
            result_tables.setdefault(self.default_table, [])

        table = task_ctx.work_set.get_table(self.table)
        rows_in = len(table.index)

        for value, frame in table.groupby(self.column, sort=False):
            table_name = self._resolve_table(value, value_map, callable_list)
            # ``None`` means nothing matched and no default table is configured: drop the segment.
            if table_name is not None:
                result_tables[table_name].append(frame)

        rows_out = 0
        for new_table_name, frames in result_tables.items():
            if not frames:
                # No rows matched: create an empty table that keeps the columns *and dtypes*
                # of the input, so that the outputs have a consistent schema.
                new_table = table.iloc[:0].copy()
            elif len(frames) == 1:
                new_table = frames[0]
            else:
                new_table = pandas.concat(frames, copy=False)

            if new_table_name in task_ctx.work_set:
                task_ctx.work_set.delete_table(new_table_name)

            rows_out += len(new_table.index)
            task_ctx.work_set.set_table(new_table_name, new_table)

        return StepPerformance(rows_in=rows_in, rows_out=rows_out)

    def _build_matchers(self) -> tuple[dict[Any, str], list[tuple[Callable[[Any], bool], str]]]:
        """Translate ``output_tables`` to a map of value -> table and a list of (callback, table)."""
        value_map: dict[Any, str] = {}
        callable_list: list[tuple[Callable[[Any], bool], str]] = []

        for table_name, spec in self.output_tables.items():
            # A single value or callable is handled as a one-item list.
            matchers = spec if isinstance(spec, list) else [spec]
            for matcher in matchers:
                if callable(matcher):
                    callable_list.append((matcher, table_name))
                else:
                    value_map[matcher] = table_name

        return value_map, callable_list

    def _resolve_table(
        self,
        value: Any,
        value_map: dict[Any, str],
        callable_list: list[tuple[Callable[[Any], bool], str]],
    ) -> str | None:
        """Return name of the table for the given column value, or ``None`` if its rows should be dropped."""
        # Direct mapping by value takes precedence over the callbacks.
        if value in value_map:
            return value_map[value]

        # First callback that returns a truthy value wins.
        for callable_fn, table_name in callable_list:
            if callable_fn(value):
                return table_name

        # Nothing matched: use the default table, which is ``None`` when not configured.
        return self.default_table