import contextvars
import functools
from concurrent.futures import ThreadPoolExecutor
from typing import Any
from collections.abc import Callable
import pandas
from ppc_robot_lib import accounts
from ppc_robot_lib import tasks
from ppc_robot_lib.models import ClientAccount
from ppc_robot_lib.reporting.transformation.concat import concat
from ppc_robot_lib.tasks.task_context import TargetObject
def run_for_each_client(
    selector: 'accounts.ClientSelectorInterface', fn: Callable[[], Any], parallel_workers=None
) -> dict[Any, Any]:
    """
    Runs ``fn`` once for every client account provided by the ``selector`` and collects the results.

    The clients are obtained by calling ``selector.get_clients`` with the current task context. Each call
    is delegated to ``_execute_for_client``, which receives ``fn``, the current task context and the client.

    :param selector: Client selector; its ``get_clients`` method is called with the current task context.
    :param fn: Callable without arguments that is executed once per client.
    :param parallel_workers: When greater than 1, the calls are dispatched to a ``ThreadPoolExecutor``
        with this many workers. With ``None`` or a value of at most 1, the clients are processed
        one after another in the calling thread.
    :return: Dictionary mapping the ``ext_id`` of each client to the value produced for that client.
    """
    parent_ctx = tasks.get_context()
    client_list = list(selector.get_clients(parent_ctx))
    # Keys are collected up-front, before any client function is executed.
    keys = [account.ext_id for account in client_list]
    runner = functools.partial(_execute_for_client, fn, parent_ctx)

    use_threads = parallel_workers is not None and parallel_workers > 1
    if use_threads:
        with ThreadPoolExecutor(max_workers=parallel_workers) as pool:
            # Executor.map yields results in the order of the input, so they line up with ``keys``.
            outputs = list(pool.map(runner, client_list))
    else:
        outputs = [runner(account) for account in client_list]

    return dict(zip(keys, outputs))
def _execute_for_client(fn: Callable[[], Any], task_ctx: 'tasks.TaskContext', client: ClientAccount) -> Any:
    """
    Executes ``fn`` with a task context derived for the given client account.

    A target object is created for ``client`` and used to derive a new context from ``task_ctx``.
    ``fn`` is then invoked through ``_execute_with_ctx`` inside a copy of the current ``contextvars``
    context, so context variables set during the call are set in that copy.

    :param fn: Callable without arguments to execute.
    :param task_ctx: Task context from which the client-specific context is derived.
    :param client: Client account for which ``fn`` is executed.
    :return: Whatever ``fn`` returns.
    """
    client_ctx = task_ctx.create_derived_ctx(TargetObject.create_for_client_account(client))
    return contextvars.copy_context().run(_execute_with_ctx, fn=fn, task_ctx=client_ctx)
def _execute_with_ctx(fn: Callable[[], Any], task_ctx: 'tasks.TaskContextInterface') -> Any:
    """
    Stores ``task_ctx`` in ``tasks.context`` and then calls ``fn``.

    :param fn: Callable without arguments to execute.
    :param task_ctx: Task context to set before ``fn`` is called.
    :return: Whatever ``fn`` returns.
    """
    tasks.context.set(task_ctx)
    outcome = fn()
    return outcome
# [docs]
def for_each_client_union(
    selector: 'accounts.ClientSelectorInterface', fn: Callable[[], pandas.DataFrame], parallel_workers=None
) -> pandas.DataFrame:
    """
    Executes the given ``fn`` for each client account specified by the ``selector`` and merges the results.

    ``fn`` must return a DataFrame. The DataFrames produced for the individual clients are collected
    and concatenated into a single DataFrame, which is returned. If there are no results, an empty
    DataFrame is returned.

    ``selector`` can be an instance of the following classes:

    * :py:class:`ppc_robot_lib.client_selector.PredefinedClientSelector`: iterates over a given set of Client IDs.
      Useful when the user has to select for which accounts the report should run.

    Example::

        >>> import pandas
        >>> from ppc_robot_lib import tasks
        >>> from ppc_robot_lib.reporting.control import for_each_client_union
        >>> from ppc_robot_lib.accounts.client_selectors.selected import SelectedClientSelector
        >>> # Client IDs were selected by user.
        >>> selector = SelectedClientsSelector({'accounts': ['111', '222', '333']})
        >>> def client_fn():
        ...     ctx = tasks.get_context()
        ...     return pandas.DataFrame(columns=['ExternalCustomerId'], data=[(ctx.client_id,)])
        >>> for_each_client_union(selector, client_fn)

    When executed, the function will return the following results:

    +--------------------+
    | ExternalCustomerId |
    +--------------------+
    | 111                |
    | 222                |
    | 333                |
    +--------------------+

    :param selector: Client selector that determines the accounts to iterate over.
    :param fn: Callable without arguments that returns a DataFrame for the current client.
    :param parallel_workers: Passed through to ``run_for_each_client``.
    :return: Concatenation of all per-client DataFrames, or an empty DataFrame when there are none.
    """
    # NOTE(review): the docstring example imports ``SelectedClientSelector`` but instantiates
    # ``SelectedClientsSelector`` -- confirm the real class name and make the example consistent.
    per_client = run_for_each_client(selector, fn, parallel_workers)
    if not per_client:
        return pandas.DataFrame()
    return concat(*per_client.values())