
Overview

Serialization plays an important role in the kvdb library. It lets you store almost any type of object through any of the set/get methods without serializing or deserializing it yourself first.

The kvdb library ships with several default serializers and compression algorithms, and can be extended with custom serializers and compression algorithms.

Default Supported Serializers:

A session can be configured to use a serializer that serializes and deserializes data in-flight.

Even with json, almost all data types and objects can be serialized. On serialization, the object's type and any additional metadata required to reconstruct it are stored deterministically alongside the data; on deserialization, that metadata is used to rebuild the original object.
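
The idea of storing type metadata next to the data can be illustrated with a minimal, standard-library-only sketch. It is not kvdb's actual wire format: the `__type__` and `__data__` keys, the `TYPE_REGISTRY` dict, and the helper names are all hypothetical and exist only for this example.

```python
import json
from dataclasses import asdict, dataclass, is_dataclass
from typing import Any, Dict

# Hypothetical registry mapping a stored type name back to its class.
TYPE_REGISTRY: Dict[str, type] = {}


def register(cls: type) -> type:
    """Record a class so it can be rebuilt on deserialization."""
    TYPE_REGISTRY[cls.__name__] = cls
    return cls


@register
@dataclass
class User:
    name: str
    age: int


def encode_with_type(obj: Any) -> str:
    """Serialize to JSON, tagging dataclass instances with their type name."""
    if is_dataclass(obj) and not isinstance(obj, type):
        return json.dumps({"__type__": obj.__class__.__name__, "__data__": asdict(obj)})
    return json.dumps(obj)


def decode_with_type(raw: str) -> Any:
    """Deserialize JSON, rebuilding tagged objects from the registry."""
    data = json.loads(raw)
    if isinstance(data, dict) and "__type__" in data:
        cls = TYPE_REGISTRY[data["__type__"]]
        return cls(**data["__data__"])
    return data
```

Plain JSON values pass through unchanged, while a registered dataclass instance survives a round trip as the same type rather than as a bare dict.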

Serialization has first-class support for pydantic models and dataclasses.

To enable serialization, pass a serializer to the get_session() method. Mixing serialization with sub-dependent libraries that encode and decode data themselves before passing it to kvdb is currently not recommended, as it can lead to serialization errors.

The following is a list of the arguments that can be passed to the get_session() method:

  • serializer: The serializer to use when serializing and deserializing data. Defaults to None. Set it to auto to pick a serializer automatically based on the data type; auto falls back to json. Each serializer type supports several sub-libraries, which are used if they are installed. You can also pass the listed kwarg to request a specific sub-library to import. Supported serializers (with sub-libraries in priority order):

    • json: The JSON serializer. Kwarg: jsonlib
      • [x] simdjson
      • [x] orjson
      • [x] ujson
      • [x] json
    • msgpack: The MessagePack serializer.
    • pickle: The pickle serializer. Kwarg: picklelib
      • [x] cloudpickle
      • [x] dill
      • [x] pickle
  • serializer_kwargs: The keyword arguments to pass to the serializer when serializing and deserializing data. Defaults to None.

  • compression: The compression algorithm to use when compressing and decompressing data. Defaults to None. Supported compression algorithms:
    • zstd
    • lz4
    • gzip
    • zlib
  • compression_level: The compression level to use when compressing data. Defaults to None, which will use the default compression level for the compression algorithm.
  • compression_enabled: Whether or not compression should be enabled. Defaults to None. If True and compression is None, then it will be determined based on which compression algorithms are available. If False, then compression will be disabled.
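
The priority lists above imply a "first installed library wins" lookup. How kvdb performs this internally is not shown here, so the following is only a sketch of the general technique; `resolve_lib` and `JSON_LIB_PRIORITY` are hypothetical names, not part of the kvdb API.

```python
import importlib
from types import ModuleType
from typing import Optional, Sequence

# Priority order taken from the list above.
JSON_LIB_PRIORITY = ("simdjson", "orjson", "ujson", "json")


def resolve_lib(candidates: Sequence[str], preferred: Optional[str] = None) -> ModuleType:
    """Return the first importable module, trying `preferred` first if given."""
    names = [preferred] if preferred else []
    names += [name for name in candidates if name != preferred]
    for name in names:
        try:
            return importlib.import_module(name)
        except ImportError:
            # Not installed: fall through to the next candidate.
            continue
    raise ImportError(f"None of the candidate libraries are installed: {names}")


if __name__ == "__main__":
    lib = resolve_lib(JSON_LIB_PRIORITY)
    print(f"Resolved JSON library: {lib.__name__}")
```

Because the standard-library json module is last in the list, the lookup always succeeds even when no optional library is installed.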

Serialization Classes

The following is the base class that serializers inherit from:

Bases: abc.ABC

The Base Serializer Class

Initializes the serializer

Source code in kvdb/io/serializers/base.py
def __init__(
    self,
    compression: Optional[str] = None,
    compression_level: Optional[int] = None,
    encoding: Optional[str] = None,
    raise_errors: bool = False,
    enable_deprecation_support: bool = True,
    is_encoder: Optional[bool] = None,
    **kwargs,
):
    """
    Initializes the serializer
    """
    # Add support for copying serializers
    if 'compressor' in kwargs: 
        self.compressor = kwargs.pop('compressor')
        if 'previous_compressor' in kwargs: 
            self.previous_compressor = kwargs.pop('previous_compressor')
    elif compression is not None or compression_level is not None:
        from ..compression import get_compression
        compression_kwargs = kwargs.pop("compression_kwargs", None)
        decompression_kwargs = kwargs.pop("decompression_kwargs", None)
        deprecated_compression = kwargs.pop("deprecated_compression", None)
        self.compressor = get_compression(
            compression, 
            compression_level = compression_level, 
            compression_kwargs = compression_kwargs, 
            decompression_kwargs = decompression_kwargs,
        )
        if deprecated_compression is not None and deprecated_compression != compression:
            self.previous_compressor = get_compression(deprecated_compression)
    if encoding is not None: self.encoding = encoding
    # logger.info(f"Initializing Serializer: {self.name}: {self.compressor}")
    self.raise_errors = raise_errors
    self.enable_deprecation_support = enable_deprecation_support
    self.is_encoder = is_encoder
    self._kwargs = kwargs

compression_enabled property

compression_enabled: bool

Returns if compression is enabled

compression_level property

compression_level: typing.Optional[int]

Returns the compression level

adecode async

adecode(
    value: typing.Union[str, bytes], **kwargs
) -> kvdb.io.serializers.base.ObjectValue

Decodes the value asynchronously

Source code in kvdb/io/serializers/base.py
async def adecode(self, value: Union[str, bytes], **kwargs) -> ObjectValue:
    """
    Decodes the value asynchronously
    """
    return await Pooler.arun(self.decode, value, **kwargs)

adumps async

adumps(
    value: kvdb.io.serializers.base.ObjectValue, **kwargs
) -> typing.Union[str, bytes]

Dumps the value asynchronously

Source code in kvdb/io/serializers/base.py
async def adumps(self, value: ObjectValue, **kwargs) -> Union[str, bytes]:
    """
    Dumps the value asynchronously
    """
    return await Pooler.arun(self.dumps, value, **kwargs)

aencode async

aencode(
    value: kvdb.io.serializers.base.ObjectValue, **kwargs
) -> typing.Union[str, bytes]

Encodes the value asynchronously

Source code in kvdb/io/serializers/base.py
async def aencode(self, value: ObjectValue, **kwargs) -> Union[str, bytes]:
    """
    Encodes the value asynchronously
    """
    return await Pooler.arun(self.encode, value, **kwargs)

aloads async

aloads(
    value: typing.Union[str, bytes], **kwargs
) -> kvdb.io.serializers.base.ObjectValue

Loads the value asynchronously

Source code in kvdb/io/serializers/base.py
async def aloads(self, value: Union[str, bytes], **kwargs) -> ObjectValue:
    """
    Loads the value asynchronously
    """
    return await Pooler.arun(self.loads, value, **kwargs)
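
Each of the four async methods above delegates to its synchronous counterpart through Pooler.arun. Pooler's implementation is not shown on this page; assuming it offloads the blocking call to a worker thread, the pattern can be sketched with asyncio alone. The `loads` and `aloads` functions below are stand-ins, not the kvdb methods.

```python
import asyncio
import functools
import json
from typing import Any, Union


def loads(value: Union[str, bytes], **kwargs) -> Any:
    """Stand-in for a blocking, synchronous deserializer."""
    return json.loads(value, **kwargs)


async def aloads(value: Union[str, bytes], **kwargs) -> Any:
    """Run the synchronous loads() in the default thread pool executor."""
    loop = asyncio.get_running_loop()
    # run_in_executor only forwards positional args, so bind kwargs with partial.
    return await loop.run_in_executor(None, functools.partial(loads, value, **kwargs))


if __name__ == "__main__":
    print(asyncio.run(aloads('{"a": 1}')))
```

Offloading keeps the event loop responsive while a large payload is being decoded or decompressed.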

compress_value

compress_value(
    value: typing.Union[str, bytes], **kwargs
) -> typing.Union[str, bytes]

Compresses the value

Source code in kvdb/io/serializers/base.py
def compress_value(self, value: Union[str, bytes], **kwargs) -> Union[str, bytes]:
    """
    Compresses the value
    """
    if self.compression_enabled:
        if isinstance(value, str): value = value.encode(self.encoding)
        return self.compressor.compress(value)
    return value

copy

copy(**kwargs) -> kvdb.io.serializers.base.BaseSerializer

Copies the serializer

Source code in kvdb/io/serializers/base.py
def copy(self, **kwargs) -> BaseSerializer:
    """
    Copies the serializer
    """
    new = deepcopy(self)
    for k, v in kwargs.items():
        if hasattr(new, k):
            setattr(new, k, v)
        else:
            new._kwargs[k] = v
        # setattr(new, k, v)
    return new
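
The override rule in copy() (existing attributes are replaced, unknown keys are stored in _kwargs) can be seen in isolation with a small stand-in class. `MiniSerializer` is hypothetical and reproduces only the copy logic shown above.

```python
from copy import deepcopy


class MiniSerializer:
    """Hypothetical stand-in that mirrors only the copy() behavior."""

    def __init__(self, raise_errors: bool = False, **kwargs):
        self.raise_errors = raise_errors
        self._kwargs = kwargs

    def copy(self, **kwargs) -> "MiniSerializer":
        new = deepcopy(self)
        for k, v in kwargs.items():
            if hasattr(new, k):
                # Known attribute: override it on the copy.
                setattr(new, k, v)
            else:
                # Unknown key: keep it with the extra keyword arguments.
                new._kwargs[k] = v
        return new


base = MiniSerializer(indent=2)
strict = base.copy(raise_errors=True, sort_keys=True)
```

Because the copy is a deep copy, `strict` ends up with `raise_errors=True` and an extra `sort_keys` entry, while `base` and its `_kwargs` stay unchanged.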

create_hash

create_hash(
    obj: kvdb.io.serializers.base.ObjectValue,
) -> str

Creates a hash for the object

Source code in kvdb/io/serializers/base.py
def create_hash(self, obj: ObjectValue) -> str:
    """
    Creates a hash for the object
    """
    return create_obj_hash(obj)

decode

decode(
    value: typing.Union[str, bytes], **kwargs
) -> kvdb.io.serializers.base.ObjectValue

Decodes the value

Source code in kvdb/io/serializers/base.py
def decode(self, value: Union[str, bytes], **kwargs) -> ObjectValue:
    """
    Decodes the value
    """
    try:
        decompressed_value = self.decompress_value(value, **kwargs)
        if decompressed_value is not None:
            value = decompressed_value
    except Exception as e:
        if self.raise_errors: raise DataError(f"[{self.name}] Error in Decompression: {str(value)[:100]}") from e
        # return self.decode_value(value, **kwargs)
    return self.decode_value(value, **kwargs)

decode_value

decode_value(
    value: typing.Union[str, bytes], **kwargs
) -> kvdb.io.serializers.base.ObjectValue

Decodes the value

Source code in kvdb/io/serializers/base.py
def decode_value(self, value: Union[str, bytes], **kwargs) -> ObjectValue:
    """
    Decodes the value
    """
    raise NotImplementedError

decompress_value

decompress_value(
    value: typing.Union[str, bytes], **kwargs
) -> typing.Union[str, bytes]

Decompresses the value

Source code in kvdb/io/serializers/base.py
def decompress_value(self, value: Union[str, bytes], **kwargs) -> Union[str, bytes]:
    # sourcery skip: extract-duplicate-method
    """
    Decompresses the value
    """
    if not self.compression_enabled: return value
    try:
        value = self.compressor.decompress(value, **kwargs)
    except Exception as e:
        if self.enable_deprecation_support or self.previous_compressor is not None:
            value = self.deprecated_decompress_value(value, **kwargs)
    if value is not None and not self.binary: value = value.decode(self.encoding)
    return value

deprecated_decompress_value

deprecated_decompress_value(
    value: typing.Union[str, bytes], **kwargs
) -> typing.Optional[typing.Union[str, bytes]]

Attempts to decompress the value using the deprecated compressor

Source code in kvdb/io/serializers/base.py
def deprecated_decompress_value(self, value: Union[str, bytes], **kwargs) -> Optional[Union[str, bytes]]:
    """
    Attempts to decompress the value using the deprecated compressor
    """
    e = None
    attempt_msg = f"{self.name}"
    if self.previous_compressor is not None:
        try:
            return self.previous_compressor.decompress(value)
        except Exception as e:
            attempt_msg += f"-> {self.previous_compressor.name}"
    try:
        return zlib.decompress(value)
    except Exception as e:
        attempt_msg += " -> ZLib"
        if self.raise_errors: raise DataError(f"[{attempt_msg}] Error in Decompression: {str(value)[:100]}") from e
        return None
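
The fallback order used above (current compressor, then the previous compressor, then zlib) can be sketched with standard-library codecs. The choice of gzip as the "current" codec and bz2 as the "previous" one is an assumption made for this example only, and `decompress_with_fallback` is a hypothetical helper.

```python
import bz2
import gzip
import zlib
from typing import Callable, Optional


def decompress_with_fallback(
    value: bytes,
    current: Callable[[bytes], bytes] = gzip.decompress,
    previous: Optional[Callable[[bytes], bytes]] = bz2.decompress,
) -> Optional[bytes]:
    """Try each decompressor in order and return None if all of them fail."""
    for decompress in (current, previous, zlib.decompress):
        if decompress is None:
            continue
        try:
            return decompress(value)
        except Exception:
            # Wrong format for this codec: try the next one.
            continue
    return None
```

This is what lets data written with an older compression setting stay readable after the session's compression algorithm changes.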

dumps

dumps(
    value: kvdb.io.serializers.base.ObjectValue, **kwargs
) -> typing.Union[str, bytes]

Dumps the value

Source code in kvdb/io/serializers/base.py
def dumps(self, value: ObjectValue, **kwargs) -> Union[str, bytes]:
    # sourcery skip: class-extract-method
    """
    Dumps the value
    """
    try:
        return self.encode(value, **kwargs)
    except Exception as e:
        if self.raise_errors: raise DataError(f"[{self.name}] Error in Encoding: {str(value)[:500]}") from e
        return None

encode

encode(
    value: kvdb.io.serializers.base.ObjectValue, **kwargs
) -> typing.Union[str, bytes]

Encodes the value

Source code in kvdb/io/serializers/base.py
def encode(self, value: ObjectValue, **kwargs) -> Union[str, bytes]:
    """
    Encodes the value
    """
    return self.compress_value(self.encode_value(value, **kwargs))

encode_value

encode_value(
    value: kvdb.io.serializers.base.ObjectValue, **kwargs
) -> typing.Union[str, bytes]

Encodes the value

Source code in kvdb/io/serializers/base.py
def encode_value(self, value: ObjectValue, **kwargs) -> Union[str, bytes]:
    """
    Encodes the value
    """
    raise NotImplementedError
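
Since encode_value and decode_value raise NotImplementedError in the base class, a concrete serializer has to supply both. The sketch below uses a simplified stand-in base class instead of importing kvdb, so `SketchBaseSerializer`, `SketchJsonSerializer`, and the `compress` flag are hypothetical; only the encode/decode pipeline shape follows the source above.

```python
import json
import zlib
from abc import ABC
from typing import Any, Union


class SketchBaseSerializer(ABC):
    """Simplified stand-in: encode = compress(encode_value), decode = decode_value(decompress)."""

    name = "base"
    encoding = "utf-8"

    def __init__(self, compress: bool = False):
        self.compress = compress

    def encode_value(self, value: Any, **kwargs) -> Union[str, bytes]:
        raise NotImplementedError

    def decode_value(self, value: Union[str, bytes], **kwargs) -> Any:
        raise NotImplementedError

    def encode(self, value: Any, **kwargs) -> Union[str, bytes]:
        encoded = self.encode_value(value, **kwargs)
        if not self.compress:
            return encoded
        # Compression works on bytes, so text is encoded first.
        if isinstance(encoded, str):
            encoded = encoded.encode(self.encoding)
        return zlib.compress(encoded)

    def decode(self, value: Union[str, bytes], **kwargs) -> Any:
        if self.compress:
            value = zlib.decompress(value).decode(self.encoding)
        return self.decode_value(value, **kwargs)


class SketchJsonSerializer(SketchBaseSerializer):
    """Concrete serializer: only the two *_value hooks are implemented."""

    name = "json"

    def encode_value(self, value: Any, **kwargs) -> str:
        return json.dumps(value, **kwargs)

    def decode_value(self, value: Union[str, bytes], **kwargs) -> Any:
        return json.loads(value, **kwargs)
```

Keeping compression in the base class means a custom serializer only has to describe how one value becomes text or bytes and back.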

fetch_object_classname

fetch_object_classname(
    obj: kvdb.io.serializers.base.ObjectValue,
) -> str

Fetches the object classname

Source code in kvdb/io/serializers/base.py
def fetch_object_classname(self, obj: ObjectValue) -> str:
    """
    Fetches the object classname
    """
    return f"{obj.__class__.__module__}.{obj.__class__.__name__}"
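
As a quick illustration, the same expression applied to a standard-library object yields a dotted import path, and that path can be turned back into the class with importlib. `resolve_classname` is a hypothetical helper added for this example; it is not part of the class documented here.

```python
import importlib
from fractions import Fraction
from typing import Any


def fetch_object_classname(obj: Any) -> str:
    """Same expression as the method above, written as a free function."""
    return f"{obj.__class__.__module__}.{obj.__class__.__name__}"


def resolve_classname(path: str) -> type:
    """Hypothetical inverse: import the module and look up the class by name."""
    module_name, _, class_name = path.rpartition(".")
    return getattr(importlib.import_module(module_name), class_name)


path = fetch_object_classname(Fraction(3, 4))
print(path)  # fractions.Fraction
```

The inverse lookup only works for classes defined at module level in an importable module.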

loads

loads(
    value: typing.Union[str, bytes], **kwargs
) -> kvdb.io.serializers.base.ObjectValue

Loads the value

Source code in kvdb/io/serializers/base.py
def loads(self, value: Union[str, bytes], **kwargs) -> ObjectValue:
    """
    Loads the value
    """
    try:
        return self.decode(value, **kwargs)
    except Exception as e:
        if not self.is_encoder: logger.trace(f'[{self.name}] Error in Decoding: {str(value)[:500]}', e)
        if self.raise_errors: raise DataError(f"[{self.name}] Error in Decoding: {str(value)[:500]}") from e
        return None
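
The effect of raise_errors on loads (and, symmetrically, on dumps) is easiest to see side by side. The sketch below uses the standard json module and a local `DataError` stand-in; `safe_loads` is a hypothetical function, not the kvdb method.

```python
import json
from typing import Any, Union


class DataError(Exception):
    """Local stand-in for the DataError raised in the source above."""


def safe_loads(value: Union[str, bytes], raise_errors: bool = False) -> Any:
    """Return the decoded value; on failure return None or raise DataError."""
    try:
        return json.loads(value)
    except Exception as e:
        if raise_errors:
            raise DataError(f"Error in Decoding: {str(value)[:500]}") from e
        return None
```

With the default `raise_errors=False`, a failed decode and a stored JSON `null` both come back as None, so callers that need to tell the two apart may prefer `raise_errors=True`.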

set_default_lib classmethod

set_default_lib(
    lib: typing.Union[str, types.ModuleType]
) -> None

Sets the default library

Source code in kvdb/io/serializers/base.py
@classmethod
def set_default_lib(cls, lib: Union[str, ModuleType]) -> None:
    """
    Sets the default library
    """
    pass