Source code for ppc_robot_lib.steps.input.sklik_report_details_batch

from collections.abc import Callable, Iterable

from ppc_robot_lib.sklik import Query, SklikCredentials, ReportDownloader, Condition, Op
from ppc_robot_lib.steps import AbstractStep
from ppc_robot_lib import tasks

import pandas
import numpy

from copy import deepcopy

from ppc_robot_lib.utils.iter import chunks


class SklikReportDetailsBatchStep(AbstractStep):
    """
    Downloads additional details for a given table.

    This step takes a column from source table, takes the base query and adds a new IN[] condition
    for specified column (defaults to the source column name) to the query. Then it uses this query
    to fetch report data. The data is fetched in multiple batches and then concatenated.

    Example::

        >>> from ppc_robot_lib.sklik import Query, During
        >>> from ppc_robot_lib.steps.input import SklikReportDetailsBatchStep
        >>> base_query = Query(
        ...     select=['Id', 'Name', 'Impressions', 'Clicks'],
        ...     from_report='campaign',
        ...     during=During.LAST_30_DAYS,
        ... )
        >>> SklikReportDetailsBatchStep(
        ...     source_table='Campaigns',
        ...     source_column='CampaignId',
        ...     base_query=base_query,
        ...     cond_column='Id',
        ...     output_table='campaign_data',
        ...     batch_size=5000,
        ... )
    """

    def __init__(
        self,
        source_table: str,
        source_column: str,
        base_query: Query,
        output_table: str,
        cond_column: str = None,
        allow_empty_statistics=False,
        transformation: Callable[[Iterable], Iterable] = None,
        custom_columns: list[str] = None,
        batch_size: int = 10000,
    ):
        """
        :param source_table: Name of the source table.
        :param source_column: Name of the source column - values of this column will be used for
            the IN[] condition.
        :param base_query: Base Query used to build a query for each batch.
        :param cond_column: Name of the column that will be used in the condition.
            Defaults to ``source_column``.
        :param output_table: Name of the output table.
        :param allow_empty_statistics: Should we include rows with zero impressions?
        :param transformation: Custom transformation function. The function receives an iterable
            (a generator, to be more precise, but this behaviour can change in the future) and must
            return another iterable, that can be passed to :py:class:`pandas.DataFrame` as
            the ``data`` argument.
        :param custom_columns: Custom column names for the output table. If no columns are given,
            names from the query are used. Useful if you use a custom transformation that can
            change the columns.
        :param batch_size: Number of values in the IN[] condition for each batch.
        """
        self.source_table = source_table
        self.source_column = source_column
        self.base_query = base_query
        self.output_table = output_table
        # Fall back to the source column name when no explicit condition column is given.
        self.cond_column = cond_column if cond_column is not None else source_column
        self.allow_empty_statistics = allow_empty_statistics
        self.transformation = transformation
        self.custom_columns = custom_columns
        self.batch_size = batch_size

    def execute(self, task_ctx: 'tasks.TaskContextInterface') -> tasks.StepPerformance:
        """
        Download the report in batches and store the concatenated result in the work set.

        The distinct non-null values of ``source_column`` in ``source_table`` are split into chunks
        of at most ``batch_size`` items. For each chunk a copy of the base query, extended with
        a condition on ``cond_column``, is used to download one partial table. The partial tables
        are concatenated and stored under ``output_table``.

        :param task_ctx: Task context providing the credentials, the work set and the performance
            counters.
        :return: Step performance object built from the resulting table.
        """
        credentials: SklikCredentials = task_ctx.credentials
        downloader = ReportDownloader(credentials.get_client(task_ctx))

        src_table = task_ctx.work_set.get_table(self.source_table)
        # Only distinct, non-null values are sent to the API.
        values: numpy.ndarray = src_table[self.source_column].unique()
        values = values[pandas.notnull(values)]

        # Work on a copy so that the caller's base query is never modified.
        query = deepcopy(self.base_query)
        # The condition is added once; only its value is replaced for each batch below.
        # NOTE(review): the documentation describes an IN[] condition, but the operator used
        # here is Op.EQ with a list value - presumably the query layer treats this as
        # an "is one of" match; confirm against the Sklik query implementation.
        in_cond = Condition(self.cond_column, Op.EQ, [])
        query.add_condition(in_cond)

        if self.custom_columns:
            columns = self.custom_columns
        else:
            columns = query.get_select()

        partial_tables = []
        for in_expr in chunks(values, self.batch_size):
            in_cond.value = list(in_expr)
            report = downloader.create_report_definition(query)
            records = downloader.download(report, self.allow_empty_statistics)

            # Apply transformation.
            if self.transformation:
                records = self.transformation(records)

            partial_tables.append(pandas.DataFrame(data=records, columns=columns))

        if partial_tables:
            new_table = pandas.concat(partial_tables, ignore_index=True)
        else:
            # No values to query for: produce an empty table that still has the expected columns.
            new_table = pandas.DataFrame(data=[], columns=columns)

        task_ctx.work_set.set_table(self.output_table, new_table)

        rows_in = len(new_table.index)
        task_ctx.performance.add_input_rows(rows_in)
        return tasks.StepPerformance(new_table, rows_out=rows_in)

    def get_label_args(self):
        """
        Return the configuration values of this step as a list.

        :return: Source table, source column, string form of the base query, condition column,
            output table, the ``allow_empty_statistics`` flag and the batch size, in this order.
        """
        return [
            self.source_table,
            # Previously the source table was listed twice and the source column was missing.
            self.source_column,
            str(self.base_query),
            self.cond_column,
            self.output_table,
            self.allow_empty_statistics,
            self.batch_size,
        ]