JSON

JSON is the most commonly used serialization format in the KVDB library. It is a good choice for data that will be sent over the network or stored in a file, and the serializer is extended to handle most common data types and objects.

Additionally, the serializer can be configured to use different JSON backends, such as simdjson, orjson, ujson, or the standard-library json module.
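
For example, an individual serializer can be pointed at a specific backend through the jsonlib constructor argument (a module object or an importable module name), as documented in the API reference below. A minimal sketch, assuming the class is exposed as JsonSerializer under kvdb.io.serializers; the exact import name is an assumption:

from kvdb.io.serializers import JsonSerializer  # hypothetical import path; this page does not name the class explicitly

# `jsonlib` accepts a module object or an importable module name
fast = JsonSerializer(jsonlib = "orjson")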

References

JSON Encoder Utility

The following helper function is used to convert arbitrary objects into JSON-serializable structures:

Helper to serialize an object

PARAMETER DESCRIPTION
obj

the object to serialize

TYPE: kvdb.io.serializers.utils.SerializableObject

RETURNS DESCRIPTION
typing.Union[typing.Dict[str, typing.Any], typing.List[typing.Dict[str, typing.Any]], typing.Any]

the serialized object as a dict of the form { "__type__": ..., "value": ... }

Source code in kvdb/io/serializers/utils.py
def serialize_object(
    obj: SerializableObject,
    **kwargs
) -> Union[Dict[str, Any], List[Dict[str, Any]], Any]:
    """
    Helper to serialize an object

    Args:
        obj: the object to serialize

    Returns:
        the serialized object in dict
        {
            "__type__": ...,
            "value": ...,
        }
    """
    if obj is None: return None

    if isinstance(obj, BaseModel) or hasattr(obj, 'model_dump'):
        obj_class_name = register_object_class(obj)

        obj_value = obj.model_dump(mode = 'json', round_trip = True, **kwargs)
        # for k,v in obj_value.items():
        #     # if isinstance(v, BaseModel) or hasattr(v, 'model_dump'):
        #     #     obj_value[k] = serialize_object(v)
        #     if not is_primitive(v):
        #         obj_value[k] = serialize_object(v)
        # logger.info(f'Pydantic Serializing Object: |r|({type(obj)})|e| {str(obj_value)[:1000]}', colored = True)
        return {
            "__type__": "pydantic",
            "__class__": obj_class_name,
            "value": obj_value,
        }

    # Move this to the top before primitives
    if np is not None:
        # if isinstance(obj, (np.int_, np.intc, np.intp, np.int8, np.int16, np.int32, np.int64, np.uint8, np.uint16, np.uint32, np.uint64)):
        if isinstance(obj, np_int_types):
            obj_class_name = register_object_class(obj)
            return {
                "__type__": "numpy",
                "__class__": obj_class_name,
                "value": int(obj),
            }


        # if isinstance(obj, (np.float_, np.float16, np.float32, np.float64)):
        if isinstance(obj, np_float_types):
            obj_class_name = register_object_class(obj)
            return {
                "__type__": "numpy",
                "__class__": obj_class_name,
                "value": float(obj),
            }


    if is_primitive(obj, exclude_bytes = True):
        return obj

    if isinstance(obj, (list, tuple)):
        return [serialize_object(item) for item in obj]

    if isinstance(obj, dict):
        if "__type__" in obj: return obj
        return {key: serialize_object(value) for key, value in obj.items()}

    if isinstance(obj, (datetime.datetime, datetime.date, datetime.time)):
        return {
            "__type__": "datetime",
            "value": obj.isoformat(),
        }

    if isinstance(obj, datetime.timedelta):
        return {
            "__type__": "timedelta",
            "value": obj.total_seconds(),
        }

    if isinstance(obj, dataclasses.InitVar) or dataclasses.is_dataclass(obj):
        obj_class_name = register_object_class(obj)
        return {
            "__type__": "dataclass",
            "__class__": obj_class_name,
            "value": dataclasses.asdict(obj),
        }

    if hasattr(obj, 'as_posix'):
        obj_class_name = register_object_class(obj)
        return {
            "__type__": "path",
            "__class__": obj_class_name,
            "value": obj.as_posix(),
        }

    if isinstance(obj, (bytes, bytearray)):
        return {
            "__type__": "bytes",
            "value": obj.hex(),
        }

    if isinstance(obj, (set, frozenset)):
        return {
            "__type__": "set",
            "value": list(obj),
        }

    if isinstance(obj, Enum):
        obj_class_name = register_object_class(obj)
        return {
            "__type__": "enum",
            "__class__": obj_class_name,
            "value": obj.value,
        }

    if isinstance(obj, UUID):
        return {
            "__type__": "uuid",
            "value": str(obj),
        }

    if isinstance(obj, abc.ABC):
        logger.info(f'Pickle Serializing Object: |r|({type(obj)}) {str(obj)[:1000]}', colored = True)
        obj_bytes = default_pickle.dumps(obj)
        return {
            "__type__": "pickle",
            "value": obj_bytes.hex(),
        }


    if hasattr(obj, "numpy"):  # Checks for TF tensors without needing the import
        return {
            "__type__": "tensor",
            "value": obj.numpy().tolist(),
        }

    if hasattr(obj, 'tolist'): # Checks for torch tensors without importing
        return {
            "__type__": "tensor",
            "value": obj.tolist(),
        }

    # Try one shot encoding objects
    # with contextlib.suppress(Exception):

    try:
        logger.info(f'Pickle Serializing Object: |r|({type(obj)}) {str(obj)[:1000]}', colored = True)
        obj_bytes = default_pickle.dumps(obj)
        return {
            "__type__": "pickle",
            "value": obj_bytes.hex(),
        }
    except Exception as e:

        logger.info(f'Error Serializing Object: |r|({type(obj)}) {e}|e| {str(obj)[:1000]}', colored = True)

    raise TypeError(f"Cannot serialize object of type {type(obj)}")

API Reference

Bases: kvdb.io.serializers.base.BaseSerializer

Source code in kvdb/io/serializers/_json.py
def __init__(
    self, 
    jsonlib: Optional[Union[str, Any]] = None,
    compression: Optional[str] = None,
    compression_level: int | None = None, 
    encoding: str | None = None, 
    serialization_obj: Optional[Type[BaseModel]] = None,
    serialization_obj_kwargs: Optional[Dict[str, Any]] = None,
    disable_object_serialization: Optional[bool] = None,
    **kwargs
):
    super().__init__(compression, compression_level, encoding, **kwargs)
    self.serialization_obj = serialization_obj
    self.serialization_obj_kwargs = serialization_obj_kwargs or {}
    self.serialization_schemas: Dict[str, Type[BaseModel]] = {}
    if disable_object_serialization is not None:
        self.disable_object_serialization = disable_object_serialization
    if jsonlib is not None:
        if isinstance(jsonlib, str):
            jsonlib = lazy_import(jsonlib, is_module=True)
        assert hasattr(jsonlib, "dumps") and hasattr(jsonlib, "loads"), f"Invalid JSON Library: {jsonlib}"
        self.jsonlib = jsonlib
    self.jsonlib_name = self.jsonlib.__name__
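
A hedged construction sketch showing the main options above; the JsonSerializer import path, the MyRecord model, and the compression name are illustrative assumptions:

from pydantic import BaseModel

from kvdb.io.serializers import JsonSerializer  # hypothetical import path

class MyRecord(BaseModel):
    id: int
    name: str

serializer = JsonSerializer(
    jsonlib = "orjson",            # str or module exposing dumps/loads
    compression = "zstd",          # assumption: any compression name BaseSerializer accepts
    compression_level = 3,
    serialization_obj = MyRecord,  # decoded dicts are validated into MyRecord
)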

compression_enabled property

compression_enabled: bool

Returns whether compression is enabled

compression_level property

compression_level: typing.Optional[int]

Returns the compression level

adecode async

adecode(
    value: typing.Union[str, bytes],
    schema_map: typing.Optional[
        typing.Dict[str, str]
    ] = None,
    raise_errors: typing.Optional[bool] = None,
    **kwargs
) -> kvdb.io.serializers.base.ObjectValue

Decodes the value asynchronously

Source code in kvdb/io/serializers/_json.py
async def adecode(self, value: Union[str, bytes], schema_map: Optional[Dict[str, str]] = None, raise_errors: Optional[bool] = None, **kwargs) -> ObjectValue:
    """
    Decodes the value asynchronously
    """
    # try:
    return await Pooler.arun(self.decode, value, schema_map = schema_map, raise_errors = raise_errors, **kwargs)

adumps async

adumps(
    value: kvdb.io.serializers.base.ObjectValue, **kwargs
) -> typing.Union[str, bytes]

Dumps the value asynchronously

Source code in kvdb/io/serializers/base.py
async def adumps(self, value: ObjectValue, **kwargs) -> Union[str, bytes]:
    """
    Dumps the value asynchronously
    """
    return await Pooler.arun(self.dumps, value, **kwargs)

aencode async

aencode(
    value: kvdb.io.serializers.base.ObjectValue, **kwargs
) -> typing.Union[str, bytes]

Encodes the value asynchronously

Source code in kvdb/io/serializers/base.py
async def aencode(self, value: ObjectValue, **kwargs) -> Union[str, bytes]:
    """
    Encodes the value asynchronously
    """
    return await Pooler.arun(self.encode, value, **kwargs)

aloads async

aloads(
    value: typing.Union[str, bytes], **kwargs
) -> kvdb.io.serializers.base.ObjectValue

Loads the value asynchronously

Source code in kvdb/io/serializers/base.py
async def aloads(self, value: Union[str, bytes], **kwargs) -> ObjectValue:
    """
    Loads the value asynchronously
    """
    return await Pooler.arun(self.loads, value, **kwargs)
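
Each async variant simply offloads its synchronous counterpart to the worker pool via Pooler.arun, so the signatures mirror dumps/loads/encode/decode. A minimal round-trip sketch (the serializer instance is assumed to have been constructed as shown earlier):

import asyncio

async def roundtrip(serializer, obj):
    # adumps/aloads run the sync dumps/loads in the pool
    payload = await serializer.adumps(obj)
    return await serializer.aloads(payload)

# asyncio.run(roundtrip(serializer, {"hello": "world"}))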

check_encoded_value

check_encoded_value(
    value: typing.Union[str, bytes]
) -> typing.Union[str, bytes]

Check the encoded value to remove the prefix

Source code in kvdb/io/serializers/_json.py
def check_encoded_value(self, value: Union[str, bytes]) -> Union[str, bytes]:
    """
    Check the encoded value to remove the prefix
    """
    if isinstance(value, bytes):
        logger.info(f'Value Bytes: {value}')
        if value.startswith(ENCODER_SERIALIZER_PREFIX_BYTES):
            value = value[ENCODER_SERIALIZER_PREFIX_BYTES_LEN:]
    elif isinstance(value, str):
        logger.info(f'Value Str: {value}')
        if value.startswith(ENCODER_SERIALIZER_PREFIX):
            value = value[ENCODER_SERIALIZER_PREFIX_LEN:]
    return value

compress_value

compress_value(
    value: typing.Union[str, bytes], **kwargs
) -> typing.Union[str, bytes]

Compresses the value

Source code in kvdb/io/serializers/base.py
def compress_value(self, value: Union[str, bytes], **kwargs) -> Union[str, bytes]:
    """
    Compresses the value
    """
    if self.compression_enabled:
        if isinstance(value, str): value = value.encode(self.encoding)
        return self.compressor.compress(value)
    return value

copy

copy(**kwargs) -> kvdb.io.serializers.base.BaseSerializer

Copies the serializer

Source code in kvdb/io/serializers/base.py
def copy(self, **kwargs) -> BaseSerializer:
    """
    Copies the serializer
    """
    new = deepcopy(self)
    for k, v in kwargs.items():
        if hasattr(new, k):
            setattr(new, k, v)
        else:
            new._kwargs[k] = v
        # setattr(new, k, v)
    return new
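
Because copy deep-copies the serializer and then applies keyword overrides (onto existing attributes where possible, otherwise into the internal _kwargs mapping), it is an easy way to derive per-use variants without mutating the shared instance:

# Derive a stricter variant that raises instead of returning None on failure
strict = serializer.copy(raise_errors = True)

# Keys that are not attributes land in the copy's _kwargs mapping
tagged = serializer.copy(custom_flag = True)  # `custom_flag` is a hypothetical key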

create_hash

create_hash(
    obj: kvdb.io.serializers.base.ObjectValue,
) -> str

Creates a hash for the object

Source code in kvdb/io/serializers/base.py
def create_hash(self, obj: ObjectValue) -> str:
    """
    Creates a hash for the object
    """
    return create_obj_hash(obj)

decode

decode(
    value: typing.Union[str, bytes],
    schema_map: typing.Optional[
        typing.Dict[str, str]
    ] = None,
    raise_errors: typing.Optional[bool] = None,
    **kwargs
) -> kvdb.io.serializers.base.ObjectValue

Decodes the value

Source code in kvdb/io/serializers/_json.py
def decode(self, value: Union[str, bytes], schema_map: Optional[Dict[str, str]] = None, raise_errors: Optional[bool] = None, **kwargs) -> ObjectValue:
    """
    Decodes the value
    """
    try:
        decompressed_value = self.decompress_value(value, **kwargs)
        if decompressed_value is not None:
            value = decompressed_value
    except Exception as e:
        if raise_errors or self.raise_errors: raise DataError(f"[{self.name}] Error in Decompression: {str(value)[:100]}") from e
        # return self.decode_value(value, **kwargs)
    return self.decode_value(value, schema_map = schema_map, raise_errors = raise_errors, **kwargs)

decode_one

decode_one(value: str, **kwargs) -> typing.Union[
    kvdb.io.serializers.base.SchemaType,
    typing.Dict,
    typing.Any,
]

Decode the value with the JSON Library

Source code in kvdb/io/serializers/_json.py
def decode_one(self, value: str, **kwargs) -> Union[SchemaType, Dict, Any]:
    """
    Decode the value with the JSON Library
    """
    if value is None: return None
    try:
        value = self.jsonlib.loads(value, **kwargs)
        if not self.disable_object_serialization and isinstance(value, dict) and '__class__' in value:
            obj_class_name = value.pop('__class__')
            if obj_class_name not in self.serialization_schemas:
                self.serialization_schemas[obj_class_name] = lazy_import(obj_class_name)
            obj_class = self.serialization_schemas[obj_class_name]
            value = obj_class.model_validate(value)
        elif self.serialization_obj is not None:
            value = self.serialization_obj.model_validate(value)
        return value
    except Exception as e:
        if not self.is_encoder: 
            logger.info(f'Error Decoding Value: |r|({type(value)}) {e}|e| {str(value)[:1000]}', colored = True, prefix = self.jsonlib_name)
            # logger.trace(f'Error Decoding Value: ({type(value)}) {str(value)[:1000]}', e, prefix = self.jsonlib_name)
        if self.raise_errors: raise e
    return None

decode_value

decode_value(
    value: str,
    schema_map: typing.Optional[
        typing.Dict[str, str]
    ] = None,
    raise_errors: typing.Optional[bool] = None,
    **kwargs
) -> typing.Union[
    kvdb.io.serializers.base.SchemaType,
    typing.Dict,
    typing.Any,
]

Decode the value with the JSON Library

Source code in kvdb/io/serializers/_json.py
def decode_value(self, value: str, schema_map: Optional[Dict[str, str]] = None, raise_errors: Optional[bool] = None, **kwargs) -> Union[SchemaType, Dict, Any]:
    """
    Decode the value with the JSON Library
    """
    if value is None: return None
    if isinstance(value, (str, bytes)):
        try:
            # value = self.check_encoded_value(value)
            value = self.jsonlib.loads(value, **kwargs)
        except Exception as e:
            if isinstance(value, str) and 'Exception' in value or 'Traceback (most recent call last):' in value:
                return value
            if not self.is_encoder: 
                str_value = str(value)
                if not schema_map: str_value = str_value[:1000]
                logger.info(f'Error JSON Decoding Value: |r|({type(value)}) {e}|e| {str_value}', colored = True, prefix = self.jsonlib_name)
                # logger.trace(f'Error JSON Decoding Value: ({type(value)}) {str(value)[:1000]}', e, prefix = self.jsonlib_name)
            if raise_errors or self.raise_errors: raise e
    try:
        return deserialize_object(value, schema_map = schema_map, allow_failed_import = self.allow_failed_import)
    except Exception as e:
        if not self.is_encoder: 
            str_value = str(value)
            if not schema_map: str_value = str_value[:1000]
            logger.trace(f'Error Deserializing Object: ({type(value)}) {str_value}', e, prefix = self.jsonlib_name)
            # logger.info(f'Error Decoding Value: |r|({type(value)}) {e}|e| {str(value)[:1000]}', colored = True, prefix = self.jsonlib_name)
        if raise_errors or self.raise_errors: raise e
    return None

decompress_value

decompress_value(
    value: typing.Union[str, bytes], **kwargs
) -> typing.Union[str, bytes]

Decompresses the value

Source code in kvdb/io/serializers/base.py
def decompress_value(self, value: Union[str, bytes], **kwargs) -> Union[str, bytes]:
    # sourcery skip: extract-duplicate-method
    """
    Decompresses the value
    """
    if not self.compression_enabled: return value
    try:
        value = self.compressor.decompress(value, **kwargs)
    except Exception as e:
        if self.enable_deprecation_support or self.previous_compressor is not None:
            value = self.deprecated_decompress_value(value, **kwargs)
    if value is not None and not self.binary: value = value.decode(self.encoding)
    return value

deprecated_decompress_value

deprecated_decompress_value(
    value: typing.Union[str, bytes], **kwargs
) -> typing.Optional[typing.Union[str, bytes]]

Attempts to decompress the value using the deprecated compressor

Source code in kvdb/io/serializers/base.py
def deprecated_decompress_value(self, value: Union[str, bytes], **kwargs) -> Optional[Union[str, bytes]]:
    """
    Attempts to decompress the value using the deprecated compressor
    """
    e = None
    attempt_msg = f"{self.name}"
    if self.previous_compressor is not None:
        try:
            return self.previous_compressor.decompress(value)
        except Exception as e:
            attempt_msg += f"-> {self.previous_compressor.name}"
    try:
        return zlib.decompress(value)
    except Exception as e:
        attempt_msg += " -> ZLib"
        if self.raise_errors: raise DataError(f"[{attempt_msg}] Error in Decompression: {str(value)[:100]}") from e
        return None

dumps

dumps(
    value: kvdb.io.serializers.base.ObjectValue, **kwargs
) -> typing.Union[str, bytes]

Dumps the value

Source code in kvdb/io/serializers/base.py
def dumps(self, value: ObjectValue, **kwargs) -> Union[str, bytes]:
    # sourcery skip: class-extract-method
    """
    Dumps the value
    """
    try:
        return self.encode(value, **kwargs)
    except Exception as e:
        if self.raise_errors: raise DataError(f"[{self.name}] Error in Encoding: {str(value)[:500]}") from e
        return None

encode

encode(
    value: kvdb.io.serializers.base.ObjectValue, **kwargs
) -> typing.Union[str, bytes]

Encodes the value

Source code in kvdb/io/serializers/base.py
def encode(self, value: ObjectValue, **kwargs) -> Union[str, bytes]:
    """
    Encodes the value
    """
    return self.compress_value(self.encode_value(value, **kwargs))

encode_value

encode_value(
    value: typing.Union[
        typing.Any, kvdb.io.serializers.base.SchemaType
    ],
    **kwargs
) -> str

Encode the value with the JSON Library

Source code in kvdb/io/serializers/_json.py
def encode_value(self, value: Union[Any, SchemaType], **kwargs) -> str:
    """
    Encode the value with the JSON Library
    """
    try:
        value_dict = serialize_object(value, **self.serialization_obj_kwargs)
        # logger.info(f'Value Dict: {value_dict}')
        encoded = self.jsonlib.dumps(value_dict, **kwargs)
        if self.ensure_string_value and isinstance(encoded, bytes):
            encoded = encoded.decode(self.encoding)
        return encoded

    except Exception as e:
        if not self.is_encoder: logger.trace(f'Error Encoding Value: |r|({type(value)})|e| {str(value)[:1000]}', e, colored = True)
    try:
        encoded = self.jsonlib.dumps(value, **kwargs)
        if self.ensure_string_value and isinstance(encoded, bytes):
            encoded = encoded.decode(self.encoding)
        return encoded
    except Exception as e:
        if not self.is_encoder: 
            logger.info(f'Error Encoding Value: |r|({type(value)}) {e}|e| {str(value)[:1000]}', colored = True, prefix = self.jsonlib_name)
        if self.raise_errors: raise e
    return None
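
Taken together, encode_value and decode_value / decode_one give Pydantic models a transparent round trip: on encode the model is dumped into a {"__type__": "pydantic", "__class__": ..., "value": ...} envelope, and on decode the named class is lazily re-imported and validated. A hedged sketch (the User model and the JsonSerializer import path are illustrative; the model class must be importable by its dotted path for the decode side to rebuild it):

from pydantic import BaseModel

from kvdb.io.serializers import JsonSerializer  # hypothetical import path

class User(BaseModel):
    id: int
    email: str

serializer = JsonSerializer()
payload = serializer.dumps(User(id = 1, email = "a@b.co"))
# payload is JSON containing {"__type__": "pydantic", "__class__": "...User", "value": {...}}

restored = serializer.loads(payload)
# restored is a User instance when the class can be re-imported by name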

fetch_object_classname

fetch_object_classname(
    obj: kvdb.io.serializers.base.ObjectValue,
) -> str

Fetches the object classname

Source code in kvdb/io/serializers/base.py
def fetch_object_classname(self, obj: ObjectValue) -> str:
    """
    Fetches the object classname
    """
    return f"{obj.__class__.__module__}.{obj.__class__.__name__}"

loads

loads(
    value: typing.Union[str, bytes], **kwargs
) -> kvdb.io.serializers.base.ObjectValue

Loads the value

Source code in kvdb/io/serializers/base.py
def loads(self, value: Union[str, bytes], **kwargs) -> ObjectValue:
    """
    Loads the value
    """
    try:
        return self.decode(value, **kwargs)
    except Exception as e:
        if not self.is_encoder: logger.trace(f'[{self.name}] Error in Decoding: {str(value)[:500]}', e)
        if self.raise_errors: raise DataError(f"[{self.name}] Error in Decoding: {str(value)[:500]}") from e
        return None

set_default_lib classmethod

set_default_lib(
    lib: typing.Union[
        str,
        kvdb.io.serializers._json.JsonLibT,
        kvdb.io.serializers.base.ModuleType,
    ]
) -> None

Sets the default JSON library

Source code in kvdb/io/serializers/_json.py
@classmethod
def set_default_lib(cls, lib: Union[str, JsonLibT, ModuleType]) -> None:
    """
    Sets the default JSON library
    """
    global default_json
    if isinstance(lib, str):
        lib = lazy_import(lib, is_module=True)
    assert hasattr(lib, "dumps") and hasattr(lib, "loads"), f"Invalid JSON Library: {lib}"
    cls.jsonlib = lib
    default_json = lib
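
Since the classmethod accepts either a module or an importable module name and asserts that it exposes dumps and loads, switching the process-wide default backend is a one-liner (the JsonSerializer import path is again an assumption):

import orjson

from kvdb.io.serializers import JsonSerializer  # hypothetical import path

JsonSerializer.set_default_lib("ujson")  # by importable module name
JsonSerializer.set_default_lib(orjson)   # or by module object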