"""Sklik custom report input: ppc_robot_lib.reporting.input.sklik_custom."""

from collections import OrderedDict
from typing import Any
from collections.abc import Callable

from ppc_robot_lib import tasks
from ppc_robot_lib import sklik
from ppc_robot_lib.reporting.transformation.compute import compute
from ppc_robot_lib.reporting.transformation.filtering import filter_table
from ppc_robot_lib.reporting.transformation.joining import join
from ppc_robot_lib.sklik import factories
from ppc_robot_lib.reporting.input.sklik import download_sklik_report

import pandas

from ppc_robot_lib.utils.types import BehaviourType, JoinType


def download_sklik_custom_report(
    custom_report: type[sklik.CustomReport],
    columns: list[dict[str, Any]],
    filters: list[dict[str, Any]],
    during: sklik.DuringClause,
    allow_empty_statistics: bool = False,
) -> pandas.DataFrame:
    """
    Download a Sklik custom report and post-process it into a DataFrame.

    Builds the field lists from the requested ``columns``, downloads the base
    report, derives virtual date segments, performs aggregation when required,
    joins in fixed-date-range metrics, evaluates computed fields in dependency
    order, and finally applies in-memory filters.

    :param custom_report: Report class; its ``all_fields`` mapping and
        ``get_row_header()`` drive column resolution.
    :param columns: Column definitions; each dict must contain a ``'field'`` key.
    :param filters: Filter definitions; each dict contains ``'field'``, ``'op'``,
        ``'value'`` and optionally ``'value_ref'`` keys.
    :param during: Date range clause used for the base query.
    :param allow_empty_statistics: Forwarded to :func:`download_sklik_report`.
    :return: The fully post-processed report table.
    :raises ValueError: If a filter references a field that is not filterable.
    """
    task_ctx = tasks.get_context()
    row_header = custom_report.get_row_header()
    column_names = [col['field'] for col in columns]
    # Prepend any row-header columns that the caller did not request explicitly,
    # preserving the header's own ordering (hence the reversed iteration).
    for header_col in reversed(row_header):
        if header_col not in column_names:
            column_names.insert(0, header_col)

    # Buckets for each kind of field; filled by add_field() below.
    attributes: list[tuple[str, sklik.Field]] = []
    segments: list[tuple[str, sklik.Field]] = []
    metrics: list[tuple[str, sklik.Field]] = []
    computed_fields: list[tuple[str, sklik.ComputedField]] = []
    # Fixed-range metrics are grouped by their date range so that each range
    # can be fetched with a single extra query.
    fixed_range_fields: dict[sklik.DuringClause, list[tuple[str, sklik.FixedRangeMetricsField]]] = {}
    virtual_segments: list[tuple[str, sklik.VirtualDateSegment]] = []
    all_fields = set()

    def add_field(report_field_name):
        # Resolve a field by name and sort it into the appropriate bucket.
        # Returns the field definition so callers can inspect it.
        report_field = custom_report.all_fields[report_field_name]
        all_fields.add(report_field_name)
        field_record = (report_field_name, report_field)
        if isinstance(report_field, sklik.Field):
            if report_field.behaviour == BehaviourType.METRIC:
                metrics.append(field_record)
            elif report_field.behaviour == BehaviourType.ATTRIBUTE:
                attributes.append(field_record)
            else:
                segments.append(field_record)
        elif isinstance(report_field, sklik.ComputedField):
            computed_fields.append(field_record)
        elif isinstance(report_field, sklik.VirtualDateSegment):
            virtual_segments.append(field_record)
        elif isinstance(report_field, sklik.FixedRangeMetricsField):
            date_range = report_field.date_range
            if date_range not in fixed_range_fields:
                fixed_range_fields[date_range] = []
            fixed_range_fields[date_range].append(field_record)
        return report_field

    #
    # Build list of fields.
    #
    for col in column_names:
        add_field(col)

    # Add Date field if required.
    # Virtual segments are derived from the raw 'Date' column, so it must be
    # part of the base query even if the caller did not ask for it.
    if virtual_segments and 'Date' not in all_fields:
        add_field('Date')

    #
    # Determine the granularity and decide whether aggregation is required.
    #
    granularity, aggregate = factories.resolve_granularity(virtual_segments)

    #
    # Create filters.
    #
    # Filters are split into two groups: conditions the API can evaluate
    # server-side (query_conditions) and predicates evaluated in memory
    # after download (filter_functions).
    query_conditions: list[tuple[BehaviourType, sklik.Condition]] = []
    filter_functions: list[tuple[str, Callable[[pandas.DataFrame], bool | pandas.Series]]] = []
    for filter_def in filters:
        field_name = filter_def['field']
        op = sklik.Op(filter_def['op'])
        value = filter_def['value']
        value_ref = filter_def.get('value_ref', None)
        field = custom_report.all_fields[field_name]
        if value is None:
            value = factories.get_empty_value(field)
        # Add referenced field to the list.
        if value_ref is not None:
            if value_ref not in all_fields:
                add_field(value_ref)
        if field.filterable:
            if isinstance(field, sklik.Field) and not value_ref:
                # Metrics filtered after aggregation must be filtered in memory,
                # as must any field the API cannot filter on directly.
                if not field.filterable_in_api or (aggregate and field.behaviour == BehaviourType.METRIC):
                    # Add field if not yet present in the list.
                    if field_name not in all_fields:
                        add_field(field_name)
                    filter_function = factories.create_filter_function(field_name, op, value)
                    filter_functions.append((field_name, filter_function))
                else:
                    query_conditions.append((field.behaviour, sklik.Condition(field_name, op, value)))
            elif isinstance(field, sklik.ComputedField) or value_ref:
                # Computed fields and field-to-field comparisons can only be
                # evaluated in memory.
                # Add computed field if not yet present in the list.
                if field_name not in all_fields:
                    add_field(field_name)
                filter_function = factories.create_filter_function(field_name, op, value, value_ref)
                filter_functions.append((field_name, filter_function))
            else:
                raise ValueError(f'Field {field_name} is not filterable!')
        else:
            raise ValueError(f'Field {field_name} is not filterable!')

    #
    # Add requirements of computed metrics to the list.
    #
    for field_name, comp_metric in computed_fields:
        for requirement in comp_metric.requires:
            if requirement not in all_fields:
                add_field(requirement)

    #
    # Compute aggregation map
    #
    if not aggregate:
        aggregations = None
        post_agg_computed = None
    else:
        # Row-header attributes become grouping keys, so only the remaining
        # attributes and the metrics need an aggregation function.
        base_columns = [(name, field_def) for name, field_def in attributes if name not in row_header] + metrics
        aggregations, post_agg_computed = factories.resolve_aggregation_map(base_columns, all_fields, add_field)

    # Group by the row header, the real (non-Date) segments, and the derived
    # virtual segments.
    group_by = list(row_header) + [c for c, _ in segments if c != 'Date'] + [c for c, _ in virtual_segments]

    #
    # Fetch base report and perform aggregations.
    #
    base_columns = attributes + segments + metrics
    base_query = sklik.Query(
        select=[col for col, _ in base_columns],
        from_report=custom_report.report_name,
        where=[cond for _, cond in query_conditions],
        during=during,
        granularity=granularity,
    )
    # Expose the query context to the surrounding task (cleared at the end).
    task_ctx.work_set.context.update({
        'report': custom_report,
        'all_fields': all_fields,
        'base_query': base_query,
    })
    base_df = download_sklik_report(base_query, allow_empty_statistics)
    _derive_virtual_segments(base_df, virtual_segments)
    base_df = _perform_aggregations(base_df, group_by, aggregations, post_agg_computed)

    #
    # Fetch and join fixed range fields.
    #
    if fixed_range_fields:
        # Reports without a row header have no natural join key, so a synthetic
        # UID column (the client id) is used instead.
        add_uid_column = not custom_report.get_row_header()
        # Metric conditions do not apply to the fixed-range queries.
        segment_query_conditions = [cond for behaviour, cond in query_conditions if behaviour != BehaviourType.METRIC]
        select_columns = list(custom_report.get_row_header()) + [c for c, _ in segments if c != 'Date']
        join_fields = select_columns.copy()
        if add_uid_column:
            join_fields.append('UID')
            base_df['UID'] = task_ctx.client_id
        for range_part, fields in fixed_range_fields.items():
            range_query = sklik.Query(
                select=select_columns + [d.field_name for _, d in fields],
                from_report=custom_report.report_name,
                where=segment_query_conditions,
                during=range_part,
                granularity=sklik.Granularity.TOTAL,
            )
            # Fetch report
            fixed_range_df = download_sklik_report(range_query, allow_empty_statistics)
            # Rename table and join it into the main table.
            fixed_range_df.rename(columns={f.field_name: n for n, f in fields}, inplace=True)
            if add_uid_column:
                fixed_range_df['UID'] = task_ctx.client_id
            base_df = join(base_df, fixed_range_df, on=join_fields, join_type=JoinType.LEFT, do_sort=False)
            del fixed_range_df

    #
    # Create dependency tree of computed fields and compute them in correct order.
    #
    ordered_compute_fields = factories.get_ordered_computed_fields(computed_fields, custom_report)
    # Compute metrics only if required.
    if len(ordered_compute_fields) > 0:
        compute(base_df, ordered_compute_fields)

    #
    # Perform in-memory filtering.
    #
    if len(filter_functions) > 0:
        base_df = filter_table(base_df, [fn for n, fn in filter_functions])

    # Clear the context
    task_ctx.work_set.context.clear()

    return base_df
def _derive_virtual_segments(base_df: pandas.DataFrame, virtual_segments: list[tuple[str, sklik.VirtualDateSegment]]): def wrap_segment_formatter(formatter): return lambda df: df['Date'].apply(formatter) if virtual_segments: computed_segments = OrderedDict() for name, segment in virtual_segments: computed_segments[name] = wrap_segment_formatter(segment.formatter) compute(base_df, computed_segments) def _perform_aggregations(base_df: pandas.DataFrame, group_by: list[str], aggregations, post_agg_computed): if aggregations: base_df = base_df.groupby(by=group_by, as_index=False, sort=False).agg(aggregations) if post_agg_computed: compute(base_df, post_agg_computed) return base_df