import json
import numpy
import datetime
import threading
import decimal
from ppc_robot_lib.utils import types
ENCODE_MAP = {
numpy.integer: lambda x: int(x) if numpy.isfinite(x) else None,
numpy.int16: lambda x: int(x) if numpy.isfinite(x) else None,
numpy.int32: lambda x: int(x) if numpy.isfinite(x) else None,
numpy.int64: lambda x: int(x) if numpy.isfinite(x) else None,
numpy.floating: lambda x: float(x) if numpy.isfinite(x) else None,
numpy.ndarray: lambda x: x.tolist(),
datetime.date: lambda x: x.strftime('%Y-%m-%d'),
datetime.datetime: lambda x: x.strftime('%Y-%m-%dT%H:%M:%S'),
decimal.Decimal: lambda x: float(x) if x.is_finite() else None,
types.ResolvedValue: lambda x: x.get_value(),
}
[docs]
class JSONEncoder(json.JSONEncoder):
"""
JSON encoder with support for numpy types, :py:class:``decimal.Decimal``, :py:class:``datetime.date``, and
:py:class:``datetime.datetime``.
"""
[docs]
def default(self, obj):
tp = type(obj)
if tp in ENCODE_MAP:
return ENCODE_MAP[tp](obj)
else:
return super().default(obj)
encoder = JSONEncoder()
_tls = threading.local()
[docs]
def get_encoder() -> JSONEncoder:
"""
Gets a JSON encoder with numpy support.
:return: JSON ecnoder instance.
"""
if hasattr(_tls, 'encoder'):
return _tls.encoder
else:
enc = JSONEncoder()
_tls.encoder = enc
return enc
[docs]
def get_decoder() -> json.JSONDecoder:
"""
Gets a default JSON decoder. In the future, this might be replaced with a decoder implementation
that provides additional processing.
:return: JSON decoder instance.
"""
if hasattr(_tls, 'decoder'):
return _tls.decoder
else:
dec = json.JSONDecoder()
_tls.decoder = dec
return dec