Overview
Serialization plays an important role in the KVDB library. It lets you store almost any type of object with any set/get method, without having to serialize and deserialize it yourself beforehand.
The kvdb library provides several default serializers and compression algorithms to choose from, and can be extended with custom serializers and compression algorithms.
Default Supported Serializers:
A session can be configured to use a serializer when serializing and deserializing data in-flight.
Even with json, almost all data types and objects can be serialized. On serialization, the object's type and any additional metadata required to reconstruct it are stored deterministically alongside the data; on deserialization, the object is rebuilt from that metadata.
Serialization has first-class support for pydantic models and dataclasses.
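The idea of storing type metadata next to the data can be illustrated with a minimal sketch. This is not the kvdb implementation: the `__type__` key, the `REGISTRY` mapping, and the `to_json`/`from_json` helpers are hypothetical names chosen for illustration, and only the standard library is used.

```python
import json
from dataclasses import asdict, dataclass, is_dataclass


@dataclass
class User:
    name: str
    age: int


# Hypothetical registry that maps a stored type name back to its class.
REGISTRY = {"User": User}


def to_json(obj) -> str:
    """Serialize a value and record its type name when it is a dataclass instance."""
    if is_dataclass(obj) and not isinstance(obj, type):
        payload = {"__type__": type(obj).__name__, "data": asdict(obj)}
    else:
        payload = {"__type__": None, "data": obj}
    # sort_keys keeps the output identical for identical inputs
    return json.dumps(payload, sort_keys=True)


def from_json(raw: str):
    """Rebuild the original object from the stored type name, if one is known."""
    payload = json.loads(raw)
    cls = REGISTRY.get(payload["__type__"])
    return cls(**payload["data"]) if cls is not None else payload["data"]


restored = from_json(to_json(User(name="ada", age=36)))
```

Here `restored` is a `User` instance again rather than a plain dict, because the type name travelled with the payload.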
To configure a serializer for a session, pass it to the get_session() method.
It is currently not recommended to mix serialization with sub-dependent libraries that may do encoding and decoding prior to passing the data to KV Store, as it can lead to serialization errors.
The following arguments can be passed to the get_session() method:
- serializer: The serializer to use when serializing and deserializing data. Defaults to None. Set to auto (which defaults to json) to automatically use a serializer based on the data type. Each serializer type supports sub-libraries, which are used if they are installed. Additionally, you can pass a custom kwarg (such as jsonlib or picklelib) to try to import a specific library.
  Supported serializers (and sub-libraries, in priority order):
  - json: The JSON serializer.
    Kwarg: jsonlib
    - [x] simdjson
    - [x] orjson
    - [x] ujson
    - [x] json
  - msgpack: The MessagePack serializer.
  - pickle: The pickle serializer.
    Kwarg: picklelib
    - [x] cloudpickle
    - [x] dill
    - [x] pickle
- serializer_kwargs: The keyword arguments to pass to the serializer when serializing and deserializing data. Defaults to None.
- compression: The compression algorithm to use when compressing and decompressing data. Defaults to None.
  Supported compression algorithms:
- compression_level: The compression level to use when compressing data. Defaults to None, which uses the default compression level of the compression algorithm.
- compression_enabled: Whether or not compression should be enabled. Defaults to None. If True and compression is None, the algorithm is chosen based on which compression algorithms are available. If False, compression is disabled.
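The priority-based choice of a sub-library can be sketched roughly as follows. This is an illustration only, assuming the listed order is the order in which imports are attempted; `resolve_lib` and `JSON_LIB_PRIORITY` are hypothetical names and are not part of kvdb.

```python
import importlib
from types import ModuleType
from typing import Optional, Sequence

# Assumed priority order, taken from the list above.
JSON_LIB_PRIORITY = ("simdjson", "orjson", "ujson", "json")


def resolve_lib(priority: Sequence[str], preferred: Optional[str] = None) -> ModuleType:
    """Return the first importable module, trying `preferred` before the priority list."""
    candidates = ([preferred] if preferred else []) + list(priority)
    for name in candidates:
        try:
            return importlib.import_module(name)
        except ImportError:
            # Not installed: fall through to the next candidate
            continue
    raise ImportError(f"None of these libraries could be imported: {candidates}")


# Picks the fastest installed option, falling back to the standard library.
jsonlib = resolve_lib(JSON_LIB_PRIORITY)
```

Because the standard `json` module is always last in the list, the lookup cannot fail for JSON; an explicit `preferred` name plays the role that a kwarg such as jsonlib plays in the description above.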
Serialization Classes
The following is the base class that serializers inherit from:
Bases: abc.ABC
The Base Serializer Class
Initializes the serializer
Source code in kvdb/io/serializers/base.py
def __init__(
    self,
    compression: Optional[str] = None,
    compression_level: Optional[int] = None,
    encoding: Optional[str] = None,
    raise_errors: bool = False,
    enable_deprecation_support: bool = True,
    is_encoder: Optional[bool] = None,
    **kwargs,
):
    """
    Initializes the serializer
    """
    # Add support for copying serializers
    if 'compressor' in kwargs:
        self.compressor = kwargs.pop('compressor')
        if 'previous_compressor' in kwargs:
            self.previous_compressor = kwargs.pop('previous_compressor')
    elif compression is not None or compression_level is not None:
        from ..compression import get_compression
        compression_kwargs = kwargs.pop("compression_kwargs", None)
        decompression_kwargs = kwargs.pop("decompression_kwargs", None)
        deprecated_compression = kwargs.pop("deprecated_compression", None)
        self.compressor = get_compression(
            compression,
            compression_level = compression_level,
            compression_kwargs = compression_kwargs,
            decompression_kwargs = decompression_kwargs,
        )
        if deprecated_compression is not None and deprecated_compression != compression:
            self.previous_compressor = get_compression(deprecated_compression)
    if encoding is not None: self.encoding = encoding
    # logger.info(f"Initializing Serializer: {self.name}: {self.compressor}")
    self.raise_errors = raise_errors
    self.enable_deprecation_support = enable_deprecation_support
    self.is_encoder = is_encoder
    self._kwargs = kwargs
compression_enabled
property
compression_enabled: bool
Returns if compression is enabled
compression_level
property
compression_level: typing.Optional[int]
Returns the compression level
adecode
async
adecode(
value: typing.Union[str, bytes], **kwargs
) -> kvdb.io.serializers.base.ObjectValue
Decodes the value asynchronously
Source code in kvdb/io/serializers/base.py
async def adecode(self, value: Union[str, bytes], **kwargs) -> ObjectValue:
    """
    Decodes the value asynchronously
    """
    return await Pooler.arun(self.decode, value, **kwargs)
adumps
async
adumps(
value: kvdb.io.serializers.base.ObjectValue, **kwargs
) -> typing.Union[str, bytes]
Dumps the value asynchronously
Source code in kvdb/io/serializers/base.py
async def adumps(self, value: ObjectValue, **kwargs) -> Union[str, bytes]:
    """
    Dumps the value asynchronously
    """
    return await Pooler.arun(self.dumps, value, **kwargs)
aencode
async
aencode(
value: kvdb.io.serializers.base.ObjectValue, **kwargs
) -> typing.Union[str, bytes]
Encodes the value asynchronously
Source code in kvdb/io/serializers/base.py
async def aencode(self, value: ObjectValue, **kwargs) -> Union[str, bytes]:
    """
    Encodes the value asynchronously
    """
    return await Pooler.arun(self.encode, value, **kwargs)
aloads
async
aloads(
value: typing.Union[str, bytes], **kwargs
) -> kvdb.io.serializers.base.ObjectValue
Loads the value asynchronously
Source code in kvdb/io/serializers/base.py
async def aloads(self, value: Union[str, bytes], **kwargs) -> ObjectValue:
    """
    Loads the value asynchronously
    """
    return await Pooler.arun(self.loads, value, **kwargs)
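The four async methods above all delegate their synchronous counterparts to Pooler.arun. The implementation of Pooler is not shown here, so the following is only a sketch under the assumption that it runs the blocking call off the event loop; `run_in_thread` and `TinySerializer` are hypothetical names built on the standard library.

```python
import asyncio
import functools
import json
from typing import Any, Callable


async def run_in_thread(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    """Run a blocking callable in the default thread pool and await its result."""
    loop = asyncio.get_running_loop()
    # run_in_executor only forwards positional arguments, so bind kwargs first
    return await loop.run_in_executor(None, functools.partial(func, *args, **kwargs))


class TinySerializer:
    """Stand-in serializer with sync methods and async wrappers around them."""

    def dumps(self, value: Any, **kwargs: Any) -> str:
        return json.dumps(value, **kwargs)

    def loads(self, value: str, **kwargs: Any) -> Any:
        return json.loads(value, **kwargs)

    async def adumps(self, value: Any, **kwargs: Any) -> str:
        return await run_in_thread(self.dumps, value, **kwargs)

    async def aloads(self, value: str, **kwargs: Any) -> Any:
        return await run_in_thread(self.loads, value, **kwargs)


async def demo() -> Any:
    serializer = TinySerializer()
    raw = await serializer.adumps({"a": 1})
    return await serializer.aloads(raw)
```

Calling `asyncio.run(demo())` round-trips the dictionary without blocking the event loop while the encoding work runs.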
compress_value
compress_value(
value: typing.Union[str, bytes], **kwargs
) -> typing.Union[str, bytes]
Compresses the value
Source code in kvdb/io/serializers/base.py
def compress_value(self, value: Union[str, bytes], **kwargs) -> Union[str, bytes]:
    """
    Compresses the value
    """
    if self.compression_enabled:
        if isinstance(value, str): value = value.encode(self.encoding)
        return self.compressor.compress(value)
    return value
copy
Copies the serializer
Source code in kvdb/io/serializers/base.py
def copy(self, **kwargs) -> BaseSerializer:
    """
    Copies the serializer
    """
    new = deepcopy(self)
    for k, v in kwargs.items():
        if hasattr(new, k):
            setattr(new, k, v)
        else:
            new._kwargs[k] = v
            # setattr(new, k, v)
    return new
create_hash
create_hash(
obj: kvdb.io.serializers.base.ObjectValue,
) -> str
Creates a hash for the object
Source code in kvdb/io/serializers/base.py
def create_hash(self, obj: ObjectValue) -> str:
    """
    Creates a hash for the object
    """
    return create_obj_hash(obj)
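The create_obj_hash helper is not shown on this page, so its actual algorithm is unknown. Purely as an illustration of what a deterministic object hash can look like, the sketch below hashes a canonical JSON rendering of the object; `sketch_obj_hash` is a hypothetical name and may differ substantially from what kvdb does.

```python
import hashlib
import json
from typing import Any


def sketch_obj_hash(obj: Any) -> str:
    """Hash a canonical JSON form of the object (illustrative assumption only)."""
    # sort_keys makes dicts with the same items hash identically regardless of order.
    # default=repr is a crude fallback for non-JSON types; a repr that embeds a
    # memory address would not be stable across runs.
    canonical = json.dumps(obj, sort_keys=True, default=repr)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


same = sketch_obj_hash({"a": 1, "b": 2}) == sketch_obj_hash({"b": 2, "a": 1})
```

With this approach `same` is True, since key order does not affect the canonical form.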
decode
decode(
value: typing.Union[str, bytes], **kwargs
) -> kvdb.io.serializers.base.ObjectValue
Decodes the value
Source code in kvdb/io/serializers/base.py
def decode(self, value: Union[str, bytes], **kwargs) -> ObjectValue:
    """
    Decodes the value
    """
    try:
        decompressed_value = self.decompress_value(value, **kwargs)
        if decompressed_value is not None:
            value = decompressed_value
    except Exception as e:
        if self.raise_errors: raise DataError(f"[{self.name}] Error in Decompression: {str(value)[:100]}") from e
        # return self.decode_value(value, **kwargs)
    return self.decode_value(value, **kwargs)
decode_value
decode_value(
value: typing.Union[str, bytes], **kwargs
) -> kvdb.io.serializers.base.ObjectValue
Decodes the value
Source code in kvdb/io/serializers/base.py
def decode_value(self, value: Union[str, bytes], **kwargs) -> ObjectValue:
    """
    Decodes the value
    """
    raise NotImplementedError
decompress_value
decompress_value(
value: typing.Union[str, bytes], **kwargs
) -> typing.Union[str, bytes]
Decompresses the value
Source code in kvdb/io/serializers/base.py
def decompress_value(self, value: Union[str, bytes], **kwargs) -> Union[str, bytes]:
    # sourcery skip: extract-duplicate-method
    """
    Decompresses the value
    """
    if not self.compression_enabled: return value
    try:
        value = self.compressor.decompress(value, **kwargs)
    except Exception as e:
        if self.enable_deprecation_support or self.previous_compressor is not None:
            value = self.deprecated_decompress_value(value, **kwargs)
    if value is not None and not self.binary: value = value.decode(self.encoding)
    return value
deprecated_decompress_value
deprecated_decompress_value(
value: typing.Union[str, bytes], **kwargs
) -> typing.Optional[typing.Union[str, bytes]]
Attempts to decompress the value using the deprecated compressor
Source code in kvdb/io/serializers/base.py
def deprecated_decompress_value(self, value: Union[str, bytes], **kwargs) -> Optional[Union[str, bytes]]:
    """
    Attempts to decompress the value using the deprecated compressor
    """
    e = None
    attempt_msg = f"{self.name}"
    if self.previous_compressor is not None:
        try:
            return self.previous_compressor.decompress(value)
        except Exception as e:
            attempt_msg += f"-> {self.previous_compressor.name}"
    try:
        return zlib.decompress(value)
    except Exception as e:
        attempt_msg += " -> ZLib"
        if self.raise_errors: raise DataError(f"[{attempt_msg}] Error in Decompression: {str(value)[:100]}") from e
    return None
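The fallback order used above (current compressor, then the previous compressor, then zlib) can be shown in isolation. The following is a simplified sketch rather than the kvdb code: `decompress_with_fallback` is a hypothetical helper, and lzma stands in for whichever compressor is currently configured.

```python
import lzma
import zlib
from typing import Callable, Optional, Sequence

Decompressor = Callable[[bytes], bytes]


def decompress_with_fallback(
    value: bytes, decompressors: Sequence[Decompressor]
) -> Optional[bytes]:
    """Try each decompressor in order and return the first successful result."""
    for decompress in decompressors:
        try:
            return decompress(value)
        except Exception:
            # Wrong format for this decompressor: try the next one
            continue
    # Nothing could read the payload
    return None


# Data written earlier with zlib, read by a session now configured for lzma.
old_payload = zlib.compress(b"hello")
result = decompress_with_fallback(old_payload, [lzma.decompress, zlib.decompress])
```

This pattern lets data written with an older compression setting remain readable after the configuration changes, at the cost of one failed attempt per mismatched decompressor.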
dumps
dumps(
value: kvdb.io.serializers.base.ObjectValue, **kwargs
) -> typing.Union[str, bytes]
Dumps the value
Source code in kvdb/io/serializers/base.py
def dumps(self, value: ObjectValue, **kwargs) -> Union[str, bytes]:
    # sourcery skip: class-extract-method
    """
    Dumps the value
    """
    try:
        return self.encode(value, **kwargs)
    except Exception as e:
        if self.raise_errors: raise DataError(f"[{self.name}] Error in Encoding: {str(value)[:500]}") from e
    return None
encode
encode(
value: kvdb.io.serializers.base.ObjectValue, **kwargs
) -> typing.Union[str, bytes]
Encodes the value
Source code in kvdb/io/serializers/base.py
def encode(self, value: ObjectValue, **kwargs) -> Union[str, bytes]:
    """
    Encodes the value
    """
    return self.compress_value(self.encode_value(value, **kwargs))
encode_value
encode_value(
value: kvdb.io.serializers.base.ObjectValue, **kwargs
) -> typing.Union[str, bytes]
Encodes the value
Source code in kvdb/io/serializers/base.py
def encode_value(self, value: ObjectValue, **kwargs) -> Union[str, bytes]:
    """
    Encodes the value
    """
    raise NotImplementedError
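Since encode_value and decode_value raise NotImplementedError in the base class, a concrete serializer is expected to override them while the base class handles compression and error policy. The sketch below mirrors that division of labor with a self-contained stand-in; `SketchSerializer` and `SketchJsonSerializer` are hypothetical classes, not the kvdb base class, and zlib is hard-coded in place of a configurable compressor.

```python
import json
import zlib
from abc import ABC
from typing import Any, Optional, Union


class SketchSerializer(ABC):
    """Hypothetical stand-in for the base class; not the kvdb implementation."""

    encoding = "utf-8"

    def __init__(self, compress: bool = False, raise_errors: bool = False):
        self.compress = compress
        self.raise_errors = raise_errors

    def encode_value(self, value: Any, **kwargs: Any) -> Union[str, bytes]:
        raise NotImplementedError

    def decode_value(self, value: Union[str, bytes], **kwargs: Any) -> Any:
        raise NotImplementedError

    def encode(self, value: Any, **kwargs: Any) -> Union[str, bytes]:
        """Encode first, then optionally compress the encoded form."""
        encoded = self.encode_value(value, **kwargs)
        if not self.compress:
            return encoded
        if isinstance(encoded, str):
            encoded = encoded.encode(self.encoding)
        return zlib.compress(encoded)

    def decode(self, value: Union[str, bytes], **kwargs: Any) -> Any:
        """Decompress first (if enabled), then decode."""
        if self.compress:
            value = zlib.decompress(value).decode(self.encoding)
        return self.decode_value(value, **kwargs)

    def loads(self, value: Union[str, bytes], **kwargs: Any) -> Optional[Any]:
        """Return None on failure unless raise_errors is set."""
        try:
            return self.decode(value, **kwargs)
        except Exception:
            if self.raise_errors:
                raise
            return None


class SketchJsonSerializer(SketchSerializer):
    """Concrete serializer: only the two value hooks are overridden."""

    def encode_value(self, value: Any, **kwargs: Any) -> str:
        return json.dumps(value, **kwargs)

    def decode_value(self, value: Union[str, bytes], **kwargs: Any) -> Any:
        return json.loads(value, **kwargs)


serializer = SketchJsonSerializer(compress=True)
blob = serializer.encode({"k": [1, 2]})
roundtrip = serializer.decode(blob)
```

Keeping compression in the base class means every concrete serializer gets it without extra code, and the `loads` wrapper shows the same "raise or return None" policy that raise_errors controls in the methods documented here.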
fetch_object_classname
fetch_object_classname(
obj: kvdb.io.serializers.base.ObjectValue,
) -> str
Fetches the object classname
Source code in kvdb/io/serializers/base.py
def fetch_object_classname(self, obj: ObjectValue) -> str:
    """
    Fetches the object classname
    """
    return f"{obj.__class__.__module__}.{obj.__class__.__name__}"
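As a small illustration of the string this method produces, the sketch below applies the same expression to a standard-library object and then resolves the name back to a class. `object_classname` and `resolve_classname` are hypothetical free functions written for this example; how kvdb itself resolves stored class names is not shown on this page.

```python
import importlib
from fractions import Fraction


def object_classname(obj) -> str:
    """Same expression as fetch_object_classname, as a free function."""
    return f"{obj.__class__.__module__}.{obj.__class__.__name__}"


def resolve_classname(path: str):
    """Import the module part of the dotted path and look up the class on it."""
    module_name, _, class_name = path.rpartition(".")
    # Works for top-level classes only: __name__ drops the outer scope of
    # nested classes, so those cannot be found this way.
    return getattr(importlib.import_module(module_name), class_name)


name = object_classname(Fraction(1, 3))  # "fractions.Fraction"
cls = resolve_classname(name)
```

The dotted name is enough to locate the class again at load time, provided the module is importable in the process doing the deserialization.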
loads
loads(
value: typing.Union[str, bytes], **kwargs
) -> kvdb.io.serializers.base.ObjectValue
Loads the value
Source code in kvdb/io/serializers/base.py
def loads(self, value: Union[str, bytes], **kwargs) -> ObjectValue:
    """
    Loads the value
    """
    try:
        return self.decode(value, **kwargs)
    except Exception as e:
        if not self.is_encoder: logger.trace(f'[{self.name}] Error in Decoding: {str(value)[:500]}', e)
        if self.raise_errors: raise DataError(f"[{self.name}] Error in Decoding: {str(value)[:500]}") from e
    return None
set_default_lib
classmethod
set_default_lib(
lib: typing.Union[str, types.ModuleType]
) -> None
Sets the default library
Source code in kvdb/io/serializers/base.py
@classmethod
def set_default_lib(cls, lib: Union[str, ModuleType]) -> None:
    """
    Sets the default library
    """
    pass