# Source code for ppc_robot_lib.reporting.transformation.joining

from collections.abc import Iterable

import numpy
import pandas
import logging

from ppc_robot_lib.utils.types import JoinType


logger = logging.getLogger(__name__)


JoinSetSpec = str | list[str]


def join(
    left_table: pandas.DataFrame,
    right_table: pandas.DataFrame,
    on: JoinSetSpec | None = None,
    left_on: JoinSetSpec | None = None,
    right_on: JoinSetSpec | None = None,
    join_type: JoinType = JoinType.INNER,
    do_sort: bool = True,
    ambiguous_column_suffixes: tuple[str, str] = ('', '_right'),
    left_index: bool = False,
    right_index: bool = False,
) -> pandas.DataFrame:
    """
    Performs an SQL-like join on two tables. This step supports 4 types of joins:

    * :py:attr:`ppc_robot_lib.utils.types.JoinType.INNER`
    * :py:attr:`ppc_robot_lib.utils.types.JoinType.OUTER`
    * :py:attr:`ppc_robot_lib.utils.types.JoinType.LEFT`
    * :py:attr:`ppc_robot_lib.utils.types.JoinType.RIGHT`

    You can join either by columns with same name from both tables, or you can provide list of columns
    in both left and right table. The lists have to be of equal length. When performing the join, values
    of columns on corresponding indexes must be equal in order to match the rows.

    When one of the tables is empty, its join keys of ``object`` dtype are cast to the dtype of the
    matching keys in the other table before merging. The cast is applied to a copy of the empty table,
    so the tables passed in by the caller are not modified.

    For details on joins in Pandas, see :ref:`pandas:merging.join`.

    **Example:**

    >>> from ppc_robot_lib.steps.transformations import JoinOnColumnStep, JoinType
    >>> JoinOnColumnStep("keywords", "adgroups",
    ...                  left_on=["CampaignName", "AdGroupName"],
    ...                  right_on=['CampaignName', "Name"],
    ...                  join_type=JoinType.INNER,
    ...                  output_table="keywords_with_adgroup_data")

    :param left_table: Left table.
    :param right_table: Right table.
    :param on: Column names that must match in both tables. Exclusive with ``left_on`` and ``right_on``.
    :param left_on: List of columns to match in the left table. Must be the same length as
        ``right_on``. Exclusive with ``on``.
    :param right_on: List of columns to match in the right table. Must be the same length as
        ``left_on``. Exclusive with ``on``.
    :param join_type: Type of the join. See :ref:`pandas:merging.join` for explanation of individual types.
    :param do_sort: Set to ``True`` if you would like to sort the tables by join columns.
    :param ambiguous_column_suffixes: Pair of column suffixes that is used when tables with conflicting
        column names are used. The first element is used for the left table, the second one for
        the right table.
    :param left_index: Use index from the left table when matching the values.
    :param right_index: Use index from the right table when matching the values.
    :return: Resulting table.
    :raises ValueError: When the combination of ``on``, ``left_on``/``left_index`` and
        ``right_on``/``right_index`` is invalid, or when the underlying merge fails (the error is
        logged together with a summary of both tables and re-raised).
    """
    # Validate the input parameters
    if (on is None) and ((left_on is None and left_index is False) or (right_on is None and right_index is False)):
        raise ValueError('Either on or both left_on/left_index and right_on/right_index must be specified.')
    elif on is not None and (left_on is not None or left_index is True or right_on is not None or right_index is True):
        raise ValueError('When on is specified, left_on/left_index and right_on/right_index must not be given.')

    # Fix object dtypes on empty tables. The fix assigns into the table, so it is applied to a copy
    # (cheap, because the table has no rows) to keep the caller's DataFrame untouched.
    if len(left_table.index) == 0:
        left_table = left_table.copy()
        _fix_object_dtypes(left_table, on, left_on, left_index, right_table, right_on, right_index)

    if len(right_table.index) == 0:
        right_table = right_table.copy()
        _fix_object_dtypes(right_table, on, right_on, right_index, left_table, left_on, left_index)

    try:
        return pandas.merge(
            left_table,
            right_table,
            how=join_type.value,
            on=on,
            left_on=left_on,
            right_on=right_on,
            left_index=left_index,
            right_index=right_index,
            sort=do_sort,
            suffixes=ambiguous_column_suffixes,
        )
    except ValueError:
        # Log the shape and dtypes of both inputs to make dtype mismatches diagnosable, then re-raise.
        logger.exception(
            'Error while merging tables.\nleft_table: \n%s\n\nright_table: \n%s',
            _format_table_info(left_table),
            _format_table_info(right_table),
        )
        raise
def _fix_object_dtypes( table: pandas.DataFrame, on: JoinSetSpec, table_on: JoinSetSpec, table_index: bool, foreign_table: pandas.DataFrame, foreign_on: JoinSetSpec, foreign_index: bool, ) -> None: if on: source_cols, source_cols_names = _get_columns(table, on) foreign_cols, _ = _get_columns(foreign_table, on) else: if table_on: source_cols, source_cols_names = _get_columns(table, table_on) elif table_index: source_cols = _get_index(table) source_cols_names = None else: raise ValueError('No join columns/indexes given!') if foreign_on: foreign_cols, _ = _get_columns(foreign_table, foreign_on) elif foreign_index: foreign_cols = _get_index(foreign_table) else: raise ValueError('No join columns/indexes given!') if len(source_cols) != len(foreign_cols): raise ValueError( 'Cannot fix columns on invalid join conditions - main table contains ' f'{len(source_cols)} columns, foreign table contains {len(foreign_cols)} columns.' ) modified = False for i, source_col in enumerate(source_cols): foreign_col = foreign_cols[i] if source_col.dtype == numpy.object_ and foreign_col.dtype != numpy.object_: modified = True source_cols[i] = source_col.astype(foreign_col.dtype) if modified: if source_cols_names is not None: for i, col_name in enumerate(source_cols_names): table[col_name] = source_cols[i] elif len(source_cols) == 1: new_index = source_cols[0] if not isinstance(new_index, pandas.Index): new_index = pandas.Index(new_index) table.index = new_index else: new_index = pandas.MultiIndex.from_arrays(source_cols) table.index = new_index def _get_columns(table: pandas.DataFrame, columns: str | list[str]) -> tuple[list, list[str]]: if not isinstance(columns, str) and isinstance(columns, Iterable): return [table[column] for column in columns], [column for column in columns] else: return [table[columns]], [columns] def _get_index(table: pandas.DataFrame): index = table.index return [index.get_level_values(i) for i in range(index.nlevels)] def _format_table_info(table: pandas.DataFrame): 
index_repr = '\n'.join(_format_index_dtype(table.index.get_level_values(i)) for i in range(table.index.nlevels)) columns_repr = str(table.dtypes) if columns_repr.endswith('\ndtype: object'): columns_repr = columns_repr[: -len('\ndtype: object')] return f'Row Count: {len(table)}\nIndex:\n{index_repr}\n\nColumns:\n{columns_repr}' def _format_index_dtype(index: pandas.Index): return f'{index.name}\t{index.dtype_str}'