# Source code for ppc_robot_lib.reporting.control.for_each_client

import contextvars
import functools
from concurrent.futures import ThreadPoolExecutor
from typing import Any
from collections.abc import Callable

import pandas

from ppc_robot_lib import accounts
from ppc_robot_lib import tasks
from ppc_robot_lib.models import ClientAccount
from ppc_robot_lib.reporting.transformation.concat import concat
from ppc_robot_lib.tasks.task_context import TargetObject


def run_for_each_client(
    selector: 'accounts.ClientSelectorInterface', fn: Callable[[], Any], parallel_workers=None
) -> dict[Any, Any]:
    """
    Executes ``fn`` once for every client account returned by ``selector``.

    Each call is made through :py:func:`_execute_for_client` with the current task context, which derives
    a client-specific task context for the call.

    :param selector: selector providing the client accounts; it is queried with the current task context.
    :param fn: zero-argument callable executed once per client.
    :param parallel_workers: when greater than 1, the calls are spread over a thread pool with this many
        workers; otherwise (including ``None``) they run sequentially in the calling thread.
    :return: dict mapping each client's ``ext_id`` to the value returned by ``fn`` for that client.
        If several clients share an ``ext_id``, the result of the last one wins.
    """
    task_ctx = tasks.get_context()

    do_execute = functools.partial(_execute_for_client, fn, task_ctx)

    clients = list(selector.get_clients(task_ctx))
    client_ids = [client.ext_id for client in clients]

    if parallel_workers is not None and parallel_workers > 1:
        # ThreadPoolExecutor does not carry the submitting thread's context variables over to its
        # workers, so without this the parallel path would see different context variables than the
        # sequential one. Take one snapshot of the caller's context per client (a single Context cannot
        # be entered by two threads at once) and run each call inside its own snapshot.
        def run_in_caller_ctx(caller_ctx: contextvars.Context, client: ClientAccount) -> Any:
            return caller_ctx.run(do_execute, client)

        caller_contexts = [contextvars.copy_context() for _ in clients]
        with ThreadPoolExecutor(max_workers=parallel_workers) as executor:
            outputs = list(executor.map(run_in_caller_ctx, caller_contexts, clients))
    else:
        outputs = [do_execute(client) for client in clients]

    return dict(zip(client_ids, outputs))


def _execute_for_client(fn: Callable[[], Any], task_ctx: 'tasks.TaskContext', client: ClientAccount) -> Any:
    """
    Runs ``fn`` under a task context derived from ``task_ctx`` and targeted at ``client``.

    The derived context is activated inside a fresh copy of the current :py:mod:`contextvars` context,
    so activating it does not affect the caller's context.

    :param fn: zero-argument callable to execute.
    :param task_ctx: parent task context the client-specific context is derived from.
    :param client: client account the derived context targets.
    :return: whatever ``fn`` returns.
    """
    client_task_ctx = task_ctx.create_derived_ctx(TargetObject.create_for_client_account(client))
    isolated_ctx = contextvars.copy_context()
    return isolated_ctx.run(_execute_with_ctx, fn=fn, task_ctx=client_task_ctx)


def _execute_with_ctx(fn: Callable[[], Any], task_ctx: 'tasks.TaskContextInterface') -> Any:
    """
    Makes ``task_ctx`` the current task context, then calls ``fn`` and returns its result.

    Meant to be invoked through :py:meth:`contextvars.Context.run` on a copied context, so the
    assignment to ``tasks.context`` stays local to that copy.
    """
    current_task_var = tasks.context
    current_task_var.set(task_ctx)
    outcome = fn()
    return outcome


def for_each_client_union(
    selector: 'accounts.ClientSelectorInterface', fn: Callable[[], pandas.DataFrame], parallel_workers=None
) -> pandas.DataFrame:
    """
    Executes the given ``fn`` for each client account specified by the ``selector``. The function must return
    a DataFrame. These DataFrames are then collected and concatenated into one DataFrame and returned.

    ``selector`` can be an instance of the following classes:

    * :py:class:`ppc_robot_lib.client_selector.PredefinedClientSelector`: iterates over a given set of Client IDs.
      Useful when the user has to select for which accounts the report should run.

    Example::

        >>> import pandas
        >>> from ppc_robot_lib import tasks
        >>> from ppc_robot_lib.reporting.control import for_each_client_union
        >>> from ppc_robot_lib.accounts.client_selectors.selected import SelectedClientSelector
        >>> # Client IDs were selected by user.
        >>> selector = SelectedClientSelector({'accounts': ['111', '222', '333']})
        >>> def client_fn():
        ...     ctx = tasks.get_context()
        ...     return pandas.DataFrame(columns=['ExternalCustomerId'], data=[(ctx.client_id,)])
        >>> for_each_client_union(selector, client_fn)

    When executed, the function will return the following results:

    +--------------------+
    | ExternalCustomerId |
    +--------------------+
    | 111                |
    | 222                |
    | 333                |
    +--------------------+

    :param selector: selector providing the client accounts to iterate over.
    :param fn: zero-argument callable returning a DataFrame; executed once per client.
    :param parallel_workers: when greater than 1, clients are processed by a thread pool with this many
        workers; otherwise they are processed sequentially.
    :return: the concatenation of all per-client DataFrames, or an empty ``pandas.DataFrame()`` when the
        selector yields no clients.
    """
    results = run_for_each_client(selector, fn, parallel_workers)

    if not results:
        # No per-client frames: avoid calling concat() with no arguments and return an empty frame instead.
        return pandas.DataFrame()

    return concat(*results.values())