"""Persistence helpers for storing pickled objects and DataFrames in job- and client-level storage (``ppc_robot_lib.reporting.persistence.job_storage``)."""

import logging
import pickle
from typing import Any

import pandas

from ppc_robot_lib import tasks
from ppc_robot_lib.reporting.exceptions import ClientNotAvailableError
from ppc_robot_lib.reporting.input.account_info import get_client_id
from ppc_robot_lib.tasks import job_storage
from ppc_robot_lib.tasks.job_storage.exceptions import ObjectDoesNotExist, StorageNotAvailableError


# Pickle protocol used for every stored object; the highest protocol gives the
# most compact and fastest format supported by the running interpreter.
PICKLE_PROTOCOL = pickle.HIGHEST_PROTOCOL
# Compression argument passed to pandas to_pickle/read_pickle; None = uncompressed.
PICKLE_PANDAS_COMPRESSION = None
# Content-Type reported to the storage backend for pickled payloads
# (used e.g. by the Google Cloud Storage adapter).
PICKLE_MIME_TYPE = 'application/octet-stream'


# Module-level logger, named after this module per the standard logging convention.
logger = logging.getLogger(__name__)


def save_job_object(key: str, obj: Any) -> None:
    """
    Saves a given object to persistent storage of the current job.

    The ``pickle`` module is used to store the object, therefore the object to store
    must be `picklable`_.

    Example::

        >>> obj = {'key': 'value'}
        >>> save_job_object('example', obj)

    .. _picklable: https://docs.python.org/3/library/pickle.html#what-can-be-pickled-and-unpickled

    :param key: Object key, can be any string that forms a valid file-system path.
    :param obj: Object to store. The object must be `picklable`_.
    """
    storage = _get_job_storage()
    _save_object(storage, key, obj)
def save_job_dataframe(key: str, df: pandas.DataFrame) -> None:
    """
    Saves a dataframe to persistent storage of the current job.

    Example::

        >>> import pandas
        >>> df = pandas.DataFrame(data=[('a', 1), ('b', 2)], columns=['key', 'value'])
        >>> save_job_dataframe('example_df', df)

    :param key: Object key, can be any string that forms a valid file-system path.
    :param df: DataFrame to store. The DataFrame is stored as pickle object.
    """
    storage = _get_job_storage()
    _save_dataframe(storage, key, df)
def load_job_object(key: str) -> Any:
    """
    Loads a previously saved object from the persistent storage of the current job.

    Example::

        >>> from ppc_robot_lib.tasks.job_storage.exceptions import ObjectDoesNotExist
        >>> try:
        ...     obj = load_job_object('example')
        ... except ObjectDoesNotExist:
        ...     obj = {}  # Create empty object.

    :param key: Key of the previously stored object.
    :return: Previously stored object.
    :raises ObjectDoesNotExist: if the object does not exist.
    """
    storage = _get_job_storage()
    return _load_object(storage, key)
def load_job_dataframe(key: str) -> pandas.DataFrame:
    """
    Loads a previously saved DataFrame from the persistent storage of the current job.

    Example::

        >>> from ppc_robot_lib.tasks.job_storage.exceptions import ObjectDoesNotExist
        >>> try:
        ...     df = load_job_dataframe('example')
        ... except ObjectDoesNotExist:
        ...     df = pandas.DataFrame(data=[], columns=['key', 'value'])  # Create a new DataFrame

    :param key: Key of the previously stored DataFrame.
    :return: Previously stored DataFrame.
    :raises ObjectDoesNotExist: if the object does not exist.
    """
    storage = _get_job_storage()
    return _load_dataframe(storage, key)
def save_client_object(key: str, obj: Any) -> None:
    """
    Saves a given object to persistent storage of the current client.

    If the job is being executed for a client account, the client to which the account
    is linked to will be used.

    The ``pickle`` module is used to store the object, therefore the object to store
    must be `picklable`_.

    Example::

        >>> obj = {'key': 'value'}
        >>> save_client_object('example', obj)

    .. _picklable: https://docs.python.org/3/library/pickle.html#what-can-be-pickled-and-unpickled

    :param key: Object key, can be any string that forms a valid file-system path.
    :param obj: Object to store. The object must be `picklable`_.
    :raises StorageNotAvailableError: if the client storage is not available for the current job.
    """
    storage = _get_client_storage()
    _save_object(storage, key, obj)


def save_client_dataframe(key: str, df: pandas.DataFrame) -> None:
    """
    Saves a dataframe to persistent storage of the current client.

    Example::

        >>> import pandas
        >>> df = pandas.DataFrame(data=[('a', 1), ('b', 2)], columns=['key', 'value'])
        >>> save_client_dataframe('example_df', df)

    :param key: Object key, can be any string that forms a valid file-system path.
    :param df: DataFrame to store. The DataFrame is stored as pickle object.
    :raises StorageNotAvailableError: if the client storage is not available for the current job.
    """
    storage = _get_client_storage()
    _save_dataframe(storage, key, df)


def load_client_object(key: str) -> Any:
    """
    Loads a previously saved object from the persistent storage of the current client.

    Example::

        >>> from ppc_robot_lib.tasks.job_storage.exceptions import ObjectDoesNotExist
        >>> try:
        ...     obj = load_client_object('example')
        ... except ObjectDoesNotExist:
        ...     obj = {}  # Create empty object.

    :param key: Key of the previously stored object.
    :return: Previously stored object.
    :raises ObjectDoesNotExist: if the object does not exist.
    :raises StorageNotAvailableError: if the client storage is not available for the current job.
    """
    storage = _get_client_storage()
    return _load_object(storage, key)


def load_client_dataframe(key: str) -> pandas.DataFrame:
    """
    Loads a previously saved DataFrame from the persistent storage of the current client.

    Example::

        >>> from ppc_robot_lib.tasks.job_storage.exceptions import ObjectDoesNotExist
        >>> try:
        ...     df = load_client_dataframe('example')
        ... except ObjectDoesNotExist:
        ...     df = pandas.DataFrame(data=[], columns=['key', 'value'])  # Create a new DataFrame

    :param key: Key of the previously stored DataFrame.
    :return: Previously stored DataFrame.
    :raises ObjectDoesNotExist: if the object does not exist.
    :raises StorageNotAvailableError: if the client storage is not available for the current job.
    """
    storage = _get_client_storage()
    return _load_dataframe(storage, key)


def _get_job_storage() -> 'job_storage.ObjectNamespacedJobStorage':
    """
    :return: Job storage for the current job. The current Job ID is retrieved from Task Context.
    """
    task_ctx = tasks.get_context()
    job_id = task_ctx.job.id
    storage = job_storage.get_job_storage()
    return storage.get_storage_for_object(job_id)


def _get_client_storage() -> 'job_storage.ObjectNamespacedJobStorage':
    """
    :return: Job storage for client set for the current job. The current Client ID is retrieved
        from Task Context.
    :raises StorageNotAvailableError: if no client is available for the current job.
    """
    try:
        client_id = get_client_id()
    except ClientNotAvailableError as exc:
        # Translate the lower-level error into the storage-specific exception callers catch.
        raise StorageNotAvailableError('Client-level storage is not available for the current job.') from exc

    storage = job_storage.get_client_job_storage()
    return storage.get_storage_for_object(client_id)


def _save_object(storage: 'job_storage.ObjectNamespacedJobStorage', key: str, obj: Any) -> None:
    """
    Serializes a given object with pickle and stores in the given storage under the specified key.

    :param storage: Namespaced storage instance.
    :param key: Object key.
    :param obj: Picklable object to save.
    """
    with _open_for_write(storage, key, PICKLE_MIME_TYPE) as stream:
        pickle.dump(obj, stream, PICKLE_PROTOCOL)


def _load_object(storage: 'job_storage.ObjectNamespacedJobStorage', key: str) -> Any:
    """
    Loads an object with specified key from given storage and deserializes it into
    a Python representation.

    :param storage: Namespaced storage instance.
    :param key: Object key.
    :return: The loaded object.
    """
    with _open_for_read(storage, key) as stream:
        return pickle.load(stream)


def _save_dataframe(storage: 'job_storage.ObjectNamespacedJobStorage', key: str, df: pandas.DataFrame):
    """
    Serializes a given dataframe and stores in the given storage under the specified key.

    :param storage: Namespaced storage instance.
    :param key: Object key.
    :param df: Dataframe to save.
    """
    with _open_for_write(storage, key, PICKLE_MIME_TYPE) as stream:
        df.to_pickle(stream, compression=PICKLE_PANDAS_COMPRESSION, protocol=PICKLE_PROTOCOL)


def _load_dataframe(storage: 'job_storage.ObjectNamespacedJobStorage', key: str) -> pandas.DataFrame:
    """
    Loads a dataframe with specified key from the given storage and deserializes it.

    :param storage: Namespaced storage instance.
    :param key: Object key.
    :return: The loaded dataframe.
    """
    with _open_for_read(storage, key) as stream:
        return pandas.read_pickle(stream, compression=PICKLE_PANDAS_COMPRESSION)


def _open_for_read(storage: 'job_storage.ObjectNamespacedJobStorage', key: str):
    """
    Opens the stream of a previously stored object for reading.

    :param key: Object key.
    :return: Stream opened for reading.
    :raises ObjectDoesNotExist: if the object does not exist.
    """
    try:
        return storage.open_for_read(key)
    except ObjectDoesNotExist:
        # Log at warning level for diagnostics, but let the caller decide how to handle it.
        logger.warning(f'Object with key {key!r} does not exist in storage for {storage.format_prefix()}')
        raise


def _open_for_write(storage: 'job_storage.ObjectNamespacedJobStorage', key: str, content_type: str | None = None):
    """
    Opens the stream of the object. If the object does not exist, a new object will be created.
    If the object does exist, it will be overwritten.

    :param key: Object key.
    :param content_type: Content-Type of the object. Will be used by the Google Cloud Storage adapter.
    :return: Stream opened for writing.
    """
    return storage.open_for_write(key, content_type)