from collections import OrderedDict
from collections.abc import Callable
from typing import Any

import pandas

from ppc_robot_lib import sklik
from ppc_robot_lib import tasks
from ppc_robot_lib.reporting.input.sklik import download_sklik_report
from ppc_robot_lib.reporting.transformation.compute import compute
from ppc_robot_lib.reporting.transformation.filtering import filter_table
from ppc_robot_lib.reporting.transformation.joining import join
from ppc_robot_lib.sklik import factories
from ppc_robot_lib.utils.types import BehaviourType, JoinType


def download_sklik_custom_report(
    custom_report: type[sklik.CustomReport],
    columns: list[dict[str, Any]],
    filters: list[dict[str, Any]],
    during: sklik.DuringClause,
    allow_empty_statistics: bool = False,
) -> pandas.DataFrame:
    """
    Download a custom Sklik report defined by ``custom_report`` and return it as a DataFrame.

    Each item of ``columns`` is a dict with a ``field`` key; each item of ``filters`` is a dict
    with ``field``, ``op`` and ``value`` keys and an optional ``value_ref`` key. Fields are
    resolved against ``custom_report.all_fields`` and split into attributes, segments, metrics,
    computed fields, fixed-range metrics and virtual date segments. Filters are pushed into the
    Sklik query where the API supports them and applied in memory otherwise. Fixed-range metrics
    are fetched by separate queries and joined onto the base table; computed fields are evaluated
    last, in dependency order.
    """
    task_ctx = tasks.get_context()

    row_header = custom_report.get_row_header()
    column_names = [col['field'] for col in columns]
    # Make sure all row-header columns are present, in order, at the beginning of the selection.
    for header_col in reversed(row_header):
        if header_col not in column_names:
            column_names.insert(0, header_col)

    attributes: list[tuple[str, sklik.Field]] = []
    segments: list[tuple[str, sklik.Field]] = []
    metrics: list[tuple[str, sklik.Field]] = []
    computed_fields: list[tuple[str, sklik.ComputedField]] = []
    fixed_range_fields: dict[sklik.DuringClause, list[tuple[str, sklik.FixedRangeMetricsField]]] = {}
    virtual_segments: list[tuple[str, sklik.VirtualDateSegment]] = []
    all_fields = set()

    def add_field(report_field_name: str):
        """Resolve a field by name and sort it into the appropriate bucket above."""
        report_field = custom_report.all_fields[report_field_name]
        all_fields.add(report_field_name)
        field_record = (report_field_name, report_field)
        if isinstance(report_field, sklik.Field):
            if report_field.behaviour == BehaviourType.METRIC:
                metrics.append(field_record)
            elif report_field.behaviour == BehaviourType.ATTRIBUTE:
                attributes.append(field_record)
            else:
                segments.append(field_record)
        elif isinstance(report_field, sklik.ComputedField):
            computed_fields.append(field_record)
        elif isinstance(report_field, sklik.VirtualDateSegment):
            virtual_segments.append(field_record)
        elif isinstance(report_field, sklik.FixedRangeMetricsField):
            date_range = report_field.date_range
            if date_range not in fixed_range_fields:
                fixed_range_fields[date_range] = []
            fixed_range_fields[date_range].append(field_record)

        return report_field

    #
    # Build the list of fields.
    #
    for col in column_names:
        add_field(col)

    # Add the Date field if it is required by a virtual date segment.
    if virtual_segments and 'Date' not in all_fields:
        add_field('Date')

    #
    # Determine the granularity and decide whether aggregation is required.
    #
    granularity, aggregate = factories.resolve_granularity(virtual_segments)

    #
    # Create filters.
    #
    query_conditions: list[tuple[BehaviourType, sklik.Condition]] = []
    filter_functions: list[tuple[str, Callable[[pandas.DataFrame], bool | pandas.Series]]] = []
    for filter_def in filters:
        field_name = filter_def['field']
        op = sklik.Op(filter_def['op'])
        value = filter_def['value']
        value_ref = filter_def.get('value_ref', None)
        field = custom_report.all_fields[field_name]
        if value is None:
            value = factories.get_empty_value(field)

        # Add the referenced field to the list.
        if value_ref is not None and value_ref not in all_fields:
            add_field(value_ref)

        if field.filterable:
            if isinstance(field, sklik.Field) and not value_ref:
                if not field.filterable_in_api or (aggregate and field.behaviour == BehaviourType.METRIC):
                    # Filter in memory: add the field if it is not yet present in the list.
                    if field_name not in all_fields:
                        add_field(field_name)
                    filter_function = factories.create_filter_function(field_name, op, value)
                    filter_functions.append((field_name, filter_function))
                else:
                    # Filter directly in the Sklik query.
                    query_conditions.append((field.behaviour, sklik.Condition(field_name, op, value)))
            elif isinstance(field, sklik.ComputedField) or value_ref:
                # Add the computed field if it is not yet present in the list.
                if field_name not in all_fields:
                    add_field(field_name)
                filter_function = factories.create_filter_function(field_name, op, value, value_ref)
                filter_functions.append((field_name, filter_function))
            else:
                raise ValueError(f'Field {field_name} is not filterable!')
        else:
            raise ValueError(f'Field {field_name} is not filterable!')

    #
    # Add requirements of computed metrics to the list.
    #
    for field_name, comp_metric in computed_fields:
        for requirement in comp_metric.requires:
            if requirement not in all_fields:
                add_field(requirement)

    #
    # Compute the aggregation map.
    #
    if not aggregate:
        aggregations = None
        post_agg_computed = None
    else:
        base_columns = [(name, field_def) for name, field_def in attributes if name not in row_header] + metrics
        aggregations, post_agg_computed = factories.resolve_aggregation_map(base_columns, all_fields, add_field)
    group_by = list(row_header) + [c for c, _ in segments if c != 'Date'] + [c for c, _ in virtual_segments]

    #
    # Fetch the base report and perform aggregations.
    #
    base_columns = attributes + segments + metrics
    base_query = sklik.Query(
        select=[col for col, _ in base_columns],
        from_report=custom_report.report_name,
        where=[cond for _, cond in query_conditions],
        during=during,
        granularity=granularity,
    )
    task_ctx.work_set.context.update({
        'report': custom_report,
        'all_fields': all_fields,
        'base_query': base_query,
    })
    base_df = download_sklik_report(base_query, allow_empty_statistics)
    _derive_virtual_segments(base_df, virtual_segments)
    base_df = _perform_aggregations(base_df, group_by, aggregations, post_agg_computed)

    #
    # Fetch and join fixed-range fields.
    #
    if fixed_range_fields:
        # Without a row header there is no join key, so join on a constant UID column instead.
        add_uid_column = not row_header
        segment_query_conditions = [cond for behaviour, cond in query_conditions if behaviour != BehaviourType.METRIC]
        select_columns = list(row_header) + [c for c, _ in segments if c != 'Date']
        join_fields = select_columns.copy()
        if add_uid_column:
            join_fields.append('UID')
            base_df['UID'] = task_ctx.client_id
        for range_part, fields in fixed_range_fields.items():
            range_query = sklik.Query(
                select=select_columns + [d.field_name for _, d in fields],
                from_report=custom_report.report_name,
                where=segment_query_conditions,
                during=range_part,
                granularity=sklik.Granularity.TOTAL,
            )
            # Fetch the report.
            fixed_range_df = download_sklik_report(range_query, allow_empty_statistics)
            # Rename the fetched columns and join them into the main table.
            fixed_range_df.rename(columns={f.field_name: n for n, f in fields}, inplace=True)
            if add_uid_column:
                fixed_range_df['UID'] = task_ctx.client_id
            base_df = join(base_df, fixed_range_df, on=join_fields, join_type=JoinType.LEFT, do_sort=False)
            del fixed_range_df

    #
    # Create a dependency tree of computed fields and compute them in the correct order.
    #
    ordered_compute_fields = factories.get_ordered_computed_fields(computed_fields, custom_report)
    # Compute the metrics only if required.
    if ordered_compute_fields:
        compute(base_df, ordered_compute_fields)

    #
    # Perform in-memory filtering.
    #
    if filter_functions:
        base_df = filter_table(base_df, [fn for _, fn in filter_functions])

    # Clear the context.
    task_ctx.work_set.context.clear()

    return base_df


def _derive_virtual_segments(base_df: pandas.DataFrame, virtual_segments: list[tuple[str, sklik.VirtualDateSegment]]):
    """Derive the virtual date segment columns of ``base_df`` from its ``Date`` column."""
    def wrap_segment_formatter(formatter):
        return lambda df: df['Date'].apply(formatter)

    if virtual_segments:
        computed_segments = OrderedDict()
        for name, segment in virtual_segments:
            computed_segments[name] = wrap_segment_formatter(segment.formatter)
        compute(base_df, computed_segments)


def _perform_aggregations(base_df: pandas.DataFrame, group_by: list[str], aggregations, post_agg_computed):
    """Group ``base_df`` by ``group_by``, apply the aggregation map, and evaluate post-aggregation computed columns."""
    if aggregations:
        base_df = base_df.groupby(by=group_by, as_index=False, sort=False).agg(aggregations)
    if post_agg_computed:
        compute(base_df, post_agg_computed)

    return base_df
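

# Illustrative usage sketch. The report class, field names, operator value and date range below
# are hypothetical placeholders (none of them are defined in this module); they only demonstrate
# the expected shapes of the ``columns``, ``filters`` and ``during`` arguments:
#
#     df = download_sklik_custom_report(
#         custom_report=MyCampaignsReport,                          # a sklik.CustomReport subclass
#         columns=[{'field': 'CampaignName'}, {'field': 'Clicks'}],
#         filters=[{'field': 'Clicks', 'op': 'GT', 'value': 0}],    # 'op' must be a valid sklik.Op value
#         during=some_during_clause,                                # a sklik.DuringClause instance
#     )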