import logging
import pickle
from typing import Any
import pandas
from ppc_robot_lib import tasks
from ppc_robot_lib.reporting.exceptions import ClientNotAvailableError
from ppc_robot_lib.reporting.input.account_info import get_client_id
from ppc_robot_lib.tasks import job_storage
from ppc_robot_lib.tasks.job_storage.exceptions import ObjectDoesNotExist, StorageNotAvailableError
PICKLE_PROTOCOL = pickle.HIGHEST_PROTOCOL
PICKLE_PANDAS_COMPRESSION = None
PICKLE_MIME_TYPE = 'application/octet-stream'
logger = logging.getLogger(__name__)
def save_job_object(key: str, obj: Any) -> None:
    """
    Saves a given object to persistent storage of the current job.

    The ``pickle`` module is used to store the object, therefore the object to store must be `picklable`_.

    Example::

        >>> obj = {'key': 'value'}
        >>> save_job_object('example', obj)

    .. _picklable: https://docs.python.org/3/library/pickle.html#what-can-be-pickled-and-unpickled

    :param key: Object key, can be any string that forms a valid file-system path.
    :param obj: Object to store. The object must be `picklable`_.
    """
    storage = _get_job_storage()
    _save_object(storage, key, obj)
def save_job_dataframe(key: str, df: pandas.DataFrame) -> None:
    """
    Persist a DataFrame in the storage that belongs to the current job.

    Example::

        >>> import pandas
        >>> df = pandas.DataFrame(data=[('a', 1), ('b', 2)], columns=['key', 'value'])
        >>> save_job_dataframe('example_df', df)

    :param key: Object key, can be any string that forms a valid file-system path.
    :param df: DataFrame to store. The DataFrame is stored as pickle object.
    """
    _save_dataframe(_get_job_storage(), key, df)
def load_job_object(key: str) -> Any:
    """
    Retrieve an object previously saved in the current job's persistent storage.

    Example::

        >>> from ppc_robot_lib.tasks.job_storage.exceptions import ObjectDoesNotExist
        >>> try:
        ...     obj = load_job_object('example')
        ... except ObjectDoesNotExist:
        ...     obj = {}  # Create empty object.

    :param key: Key of the previously stored object.
    :return: Previously stored object.
    :raises ObjectDoesNotExist: if the object does not exist.
    """
    job_store = _get_job_storage()
    obj = _load_object(job_store, key)
    return obj
def load_job_dataframe(key: str) -> pandas.DataFrame:
    """
    Retrieve a DataFrame previously saved in the current job's persistent storage.

    Example::

        >>> from ppc_robot_lib.tasks.job_storage.exceptions import ObjectDoesNotExist
        >>> try:
        ...     df = load_job_dataframe('example')
        ... except ObjectDoesNotExist:
        ...     df = pandas.DataFrame(data=[], columns=['key', 'value'])  # Create a new DataFrame

    :param key: Key of the previously stored DataFrame.
    :return: Previously stored DataFrame.
    :raises ObjectDoesNotExist: if the object does not exist.
    """
    job_store = _get_job_storage()
    return _load_dataframe(job_store, key)
def save_client_object(key: str, obj: Any) -> None:
    """
    Saves a given object to persistent storage of the current client.

    If the job is being executed for a client account, the client to which the account is linked to will be used.

    The ``pickle`` module is used to store the object, therefore the object to store must be `picklable`_.

    Example::

        >>> obj = {'key': 'value'}
        >>> save_client_object('example', obj)

    .. _picklable: https://docs.python.org/3/library/pickle.html#what-can-be-pickled-and-unpickled

    :param key: Object key, can be any string that forms a valid file-system path.
    :param obj: Object to store. The object must be `picklable`_.
    :raises StorageNotAvailableError: if the client storage is not available for the current job.
    """
    storage = _get_client_storage()
    _save_object(storage, key, obj)
def save_client_dataframe(key: str, df: pandas.DataFrame) -> None:
    """
    Persist a DataFrame in the storage that belongs to the current client.

    Example::

        >>> import pandas
        >>> df = pandas.DataFrame(data=[('a', 1), ('b', 2)], columns=['key', 'value'])
        >>> save_client_dataframe('example_df', df)

    :param key: Object key, can be any string that forms a valid file-system path.
    :param df: DataFrame to store. The DataFrame is stored as pickle object.
    :raises StorageNotAvailableError: if the client storage is not available for the current job.
    """
    _save_dataframe(_get_client_storage(), key, df)
def load_client_object(key: str) -> Any:
    """
    Retrieve an object previously saved in the current client's persistent storage.

    Example::

        >>> from ppc_robot_lib.tasks.job_storage.exceptions import ObjectDoesNotExist
        >>> try:
        ...     obj = load_client_object('example')
        ... except ObjectDoesNotExist:
        ...     obj = {}  # Create empty object.

    :param key: Key of the previously stored object.
    :return: Previously stored object.
    :raises ObjectDoesNotExist: if the object does not exist.
    :raises StorageNotAvailableError: if the client storage is not available for the current job.
    """
    client_store = _get_client_storage()
    obj = _load_object(client_store, key)
    return obj
def load_client_dataframe(key: str) -> pandas.DataFrame:
    """
    Loads a previously saved DataFrame from the persistent storage of the current client.

    Example::

        >>> from ppc_robot_lib.tasks.job_storage.exceptions import ObjectDoesNotExist
        >>> try:
        ...     df = load_client_dataframe('example')
        ... except ObjectDoesNotExist:
        ...     df = pandas.DataFrame(data=[], columns=['key', 'value'])  # Create a new DataFrame

    :param key: Key of the previously stored DataFrame.
    :return: Previously stored DataFrame.
    :raises ObjectDoesNotExist: if the object does not exist.
    :raises StorageNotAvailableError: if the client storage is not available for the current job.
    """
    storage = _get_client_storage()
    return _load_dataframe(storage, key)
def _get_job_storage() -> 'job_storage.ObjectNamespacedJobStorage':
    """
    Resolve the storage namespace of the job that is currently executing.

    :return: Job storage for the current job. The current Job ID is retrieved from Task Context.
    """
    current_job_id = tasks.get_context().job.id
    return job_storage.get_job_storage().get_storage_for_object(current_job_id)
def _get_client_storage() -> 'job_storage.ObjectNamespacedJobStorage':
    """
    Resolve the storage namespace of the client associated with the current job.

    :return: Job storage for client set for the current job. The current Client ID is retrieved from Task Context.
    :raises StorageNotAvailableError: if no client is available for the current job.
    """
    try:
        current_client_id = get_client_id()
    except ClientNotAvailableError as exc:
        # Surface the missing client as a storage-layer error so callers only
        # need to handle storage exceptions.
        raise StorageNotAvailableError('Client-level storage is not available for the current job.') from exc
    client_store = job_storage.get_client_job_storage()
    return client_store.get_storage_for_object(current_client_id)
def _save_object(storage: 'job_storage.ObjectNamespacedJobStorage', key: str, obj: Any) -> None:
    """
    Pickle the given object and write it to the given storage under the specified key.

    :param storage: Namespaced storage instance.
    :param key: Object key.
    :param obj: Picklable object to save.
    """
    stream = _open_for_write(storage, key, PICKLE_MIME_TYPE)
    with stream:
        pickle.dump(obj, stream, PICKLE_PROTOCOL)
def _load_object(storage: 'job_storage.ObjectNamespacedJobStorage', key: str) -> Any:
    """
    Read the object stored under the specified key and unpickle it.

    NOTE: ``pickle.load`` must only be fed data this application wrote itself;
    unpickling untrusted data can execute arbitrary code.

    :param storage: Namespaced storage instance.
    :param key: Object key.
    :return: The loaded object.
    """
    stream = _open_for_read(storage, key)
    with stream:
        return pickle.load(stream)
def _save_dataframe(storage: 'job_storage.ObjectNamespacedJobStorage', key: str, df: pandas.DataFrame):
    """
    Pickle the given dataframe and write it to the given storage under the specified key.

    :param storage: Namespaced storage instance.
    :param key: Object key.
    :param df: Dataframe to save.
    """
    stream = _open_for_write(storage, key, PICKLE_MIME_TYPE)
    with stream:
        df.to_pickle(stream, compression=PICKLE_PANDAS_COMPRESSION, protocol=PICKLE_PROTOCOL)
def _load_dataframe(storage: 'job_storage.ObjectNamespacedJobStorage', key: str) -> pandas.DataFrame:
    """
    Read the dataframe stored under the specified key and unpickle it.

    :param storage: Namespaced storage instance.
    :param key: Object key.
    :return: The loaded dataframe.
    """
    stream = _open_for_read(storage, key)
    with stream:
        return pandas.read_pickle(stream, compression=PICKLE_PANDAS_COMPRESSION)
def _open_for_read(storage: 'job_storage.ObjectNamespacedJobStorage', key: str):
"""
Opens the stream of a previously stored object for reading.
:param key: Object key.
:return: Stream opened for reading.
:raises ObjectDoesNotExist: if the object does not exist.
"""
try:
return storage.open_for_read(key)
except ObjectDoesNotExist:
logger.warning(f'Object with key {key!r} does not exist in storage for {storage.format_prefix()}')
raise
def _open_for_write(storage: 'job_storage.ObjectNamespacedJobStorage', key: str, content_type: str | None = None):
"""
Opens the stream of the object. If the object does not exist, a new object will be created.
If the object does exist, it will be overwritten.
:param key: Object key.
:param content_type: Content-Type of the object. Will be used by the Google Cloud Storage adapter.
:return: Stream opened for writing.
"""
return storage.open_for_write(key, content_type)