from collections.abc import Callable, Iterable
from ppc_robot_lib.sklik import Query, SklikCredentials, ReportDownloader, Condition, Op
from ppc_robot_lib.steps import AbstractStep
from ppc_robot_lib import tasks
import pandas
import numpy
from copy import deepcopy
from ppc_robot_lib.utils.iter import chunks
class SklikReportDetailsBatchStep(AbstractStep):
    """
    Downloads additional details for a given table. This step takes a column from the source table, takes the base
    query and adds a new IN[] condition for the specified column (defaults to the source column name) to the query.
    Then it uses this query to fetch report data. The data is fetched in multiple batches and then concatenated.

    Example::

        >>> from ppc_robot_lib.sklik import Query, During
        >>> from ppc_robot_lib.steps.input import SklikReportDetailsBatchStep
        >>> base_query = Query(
        ...     select=['Id', 'Name', 'Impressions', 'Clicks'],
        ...     from_report='campaign',
        ...     during=During.LAST_30_DAYS,
        ... )
        >>> step = SklikReportDetailsBatchStep(
        ...     source_table='Campaigns',
        ...     source_column='CampaignId',
        ...     base_query=base_query,
        ...     cond_column='Id',
        ...     output_table='campaign_data',
        ...     batch_size=5000,
        ... )
    """

    def __init__(
        self,
        source_table: str,
        source_column: str,
        base_query: Query,
        output_table: str,
        cond_column: str = None,
        allow_empty_statistics=False,
        transformation: Callable[[Iterable], Iterable] = None,
        custom_columns: list[str] = None,
        batch_size: int = 10000,
    ):
        """
        :param source_table: Name of the source table.
        :param source_column: Name of the source column - values of this column will be used for the IN[] condition.
        :param base_query: Base Query used to build a query for each batch. It is copied, never modified.
        :param output_table: Name of the output table.
        :param cond_column: Name of the column that will be used in the condition. Defaults to ``source_column``.
        :param allow_empty_statistics: Should we include rows with zero impressions?
        :param transformation: Custom transformation function. The function receives an iterable (a generator, to be
                               more precise, but this behaviour can change in the future) and must return another
                               iterable that can be passed to :py:class:`pandas.DataFrame` as the ``data`` argument.
        :param custom_columns: Custom column names for the output table. If no columns are given, names from the
                               query are used. Useful if you use a custom transformation that can change the columns.
        :param batch_size: Number of values in the IN[] condition for each batch. Must be at least 1.
        :raises ValueError: If ``batch_size`` is smaller than 1.
        """
        # Fail fast: a non-positive batch size can never produce a valid batching of the source values.
        if batch_size < 1:
            raise ValueError(f'batch_size must be a positive integer, got {batch_size!r}.')

        self.source_table = source_table
        self.source_column = source_column
        self.base_query = base_query
        self.output_table = output_table
        self.cond_column = cond_column if cond_column is not None else source_column
        self.allow_empty_statistics = allow_empty_statistics
        self.transformation = transformation
        self.custom_columns = custom_columns
        self.batch_size = batch_size

    def execute(self, task_ctx: 'tasks.TaskContextInterface') -> tasks.StepPerformance:
        """
        Download report rows in batches and store the concatenated result as ``output_table`` in the work set.

        :param task_ctx: Task context providing the credentials, the work set and the performance counters.
        :return: Step performance carrying the output table and its number of rows.
        """
        credentials: SklikCredentials = task_ctx.credentials
        downloader = ReportDownloader(credentials.get_client(task_ctx))

        # Distinct, non-null values of the source column; every batch of them becomes one IN[] condition.
        src_table = task_ctx.work_set.get_table(self.source_table)
        values: numpy.ndarray = src_table[self.source_column].unique()
        values = values[pandas.notnull(values)]

        partial_tables = []

        # Work on a copy so the caller's base query is never modified. The condition object is attached once and
        # only its value is swapped for every batch below.
        # NOTE(review): Op.EQ with a list value is used to express the IN[] condition - confirm against the
        # Condition/Op semantics of ppc_robot_lib.sklik.
        query = deepcopy(self.base_query)
        in_cond = Condition(self.cond_column, Op.EQ, [])
        query.add_condition(in_cond)

        columns = self.custom_columns if self.custom_columns else query.get_select()

        for in_expr in chunks(values, self.batch_size):
            in_cond.value = list(in_expr)
            # The report definition is rebuilt per batch so it picks up the current condition value.
            report = downloader.create_report_definition(query)
            records = downloader.download(report, self.allow_empty_statistics)

            if self.transformation:
                records = self.transformation(records)

            partial_tables.append(pandas.DataFrame(data=records, columns=columns))

        if partial_tables:
            new_table = pandas.concat(partial_tables, ignore_index=True)
        else:
            # No non-null source values: still produce a table with the expected columns.
            new_table = pandas.DataFrame(data=[], columns=columns)

        task_ctx.work_set.set_table(self.output_table, new_table)

        rows_in = len(new_table.index)
        task_ctx.performance.add_input_rows(rows_in)

        return tasks.StepPerformance(new_table, rows_out=rows_in)

    def get_label_args(self):
        """
        Return the arguments describing this step, presumably used by the base step to render its label.

        :return: List of source table, source column, base query (as string), condition column, output table,
                 the empty-statistics flag and the batch size.
        """
        return [
            self.source_table,
            self.source_column,
            str(self.base_query),
            self.cond_column,
            self.output_table,
            self.allow_empty_statistics,
            self.batch_size,
        ]