Skip to content

MsgPack

MsgPack is a binary serialization format that is similar to JSON, but more efficient. It is a good choice for serializing data that will be sent over the network or stored in a file.

References


API Reference

Bases: kvdb.io.serializers.base.BinaryBaseSerializer

Source code in kvdb/io/serializers/_msgpack.py
def __init__(
    self, 
    msgpacklib: Optional[Union[str, Any]] = None,
    compression: Optional[str] = None,
    compression_level: int | None = None, 
    encoding: str | None = None, 
    serialization_obj: Optional[Type[BaseModel]] = None,
    serialization_obj_kwargs: Optional[Dict[str, Any]] = None,
    disable_object_serialization: Optional[bool] = None,
    jsonlib: Optional[Union[str, Any]] = None,
    **kwargs
):
    if not default_msgpack:
        raise ImportError("MsgPack Serializer is not available. Please install `msgpack`")
    super().__init__(compression, compression_level, encoding, **kwargs)
    self.serialization_obj = serialization_obj
    self.serialization_obj_kwargs = serialization_obj_kwargs or {}
    self.serialization_schemas: Dict[str, Type[BaseModel]] = {}
    if disable_object_serialization is not None:
        self.disable_object_serialization = disable_object_serialization

    if msgpacklib is not None:
        if isinstance(msgpacklib, str):
            msgpacklib = lazy_import(msgpacklib, is_module=True)
        assert hasattr(msgpacklib, "packb") and hasattr(msgpacklib, "unpackb"), f"Invalid MsgPack Library: {picklelib}"
        self.msgpacklib = msgpacklib
    self.msgpacklib_name = self.msgpacklib.__name__
    if jsonlib is not None:
        if isinstance(jsonlib, str):
            jsonlib = lazy_import(jsonlib, is_module=True)
        assert hasattr(jsonlib, "dumps") and hasattr(jsonlib, "loads"), f"Invalid JSON Library: {jsonlib}"
        self.jsonlib = jsonlib
    self.jsonlib_name = self.jsonlib.__name__

compression_enabled property

compression_enabled: bool

Returns if compression is enabled

compression_level property

compression_level: typing.Optional[int]

Returns the compression level

adecode async

adecode(
    value: typing.Union[str, bytes], **kwargs
) -> kvdb.io.serializers.base.ObjectValue

Decodes the value asynchronously

Source code in kvdb/io/serializers/base.py
async def adecode(self, value: Union[str, bytes], **kwargs) -> ObjectValue:
    """
    Decodes the value asynchronously
    """
    return await Pooler.arun(self.decode, value, **kwargs)

adumps async

adumps(
    value: kvdb.io.serializers.base.ObjectValue, **kwargs
) -> typing.Union[str, bytes]

Dumps the value asynchronously

Source code in kvdb/io/serializers/base.py
async def adumps(self, value: ObjectValue, **kwargs) -> Union[str, bytes]:
    """
    Dumps the value asynchronously
    """
    return await Pooler.arun(self.dumps, value, **kwargs)

aencode async

aencode(
    value: kvdb.io.serializers.base.ObjectValue, **kwargs
) -> typing.Union[str, bytes]

Encodes the value asynchronously

Source code in kvdb/io/serializers/base.py
async def aencode(self, value: ObjectValue, **kwargs) -> Union[str, bytes]:
    """
    Encodes the value asynchronously
    """
    return await Pooler.arun(self.encode, value, **kwargs)

aloads async

aloads(
    value: typing.Union[str, bytes], **kwargs
) -> kvdb.io.serializers.base.ObjectValue

Loads the value asynchronously

Source code in kvdb/io/serializers/base.py
async def aloads(self, value: Union[str, bytes], **kwargs) -> ObjectValue:
    """
    Loads the value asynchronously
    """
    return await Pooler.arun(self.loads, value, **kwargs)

compress_value

compress_value(
    value: typing.Union[str, bytes], **kwargs
) -> typing.Union[str, bytes]

Compresses the value

Source code in kvdb/io/serializers/base.py
def compress_value(self, value: Union[str, bytes], **kwargs) -> Union[str, bytes]:
    """
    Compresses the value
    """
    if self.compression_enabled:
        if isinstance(value, str): value = value.encode(self.encoding)
        return self.compressor.compress(value)
    return value

copy

copy(**kwargs) -> kvdb.io.serializers.base.BaseSerializer

Copies the serializer

Source code in kvdb/io/serializers/base.py
def copy(self, **kwargs) -> BaseSerializer:
    """
    Copies the serializer
    """
    new = deepcopy(self)
    for k, v in kwargs.items():
        if hasattr(new, k):
            setattr(new, k, v)
        else:
            new._kwargs[k] = v
        # setattr(new, k, v)
    return new

create_hash

create_hash(
    obj: kvdb.io.serializers.base.ObjectValue,
) -> str

Creates a hash for the object

Source code in kvdb/io/serializers/base.py
def create_hash(self, obj: ObjectValue) -> str:
    """
    Creates a hash for the object
    """
    return create_obj_hash(obj)

decode

decode(
    value: typing.Union[str, bytes], **kwargs
) -> kvdb.io.serializers.base.ObjectValue

Decodes the value

Source code in kvdb/io/serializers/base.py
def decode(self, value: Union[str, bytes], **kwargs) -> ObjectValue:
    """
    Decodes the value
    """
    try:
        decompressed_value = self.decompress_value(value, **kwargs)
        if decompressed_value is not None:
            value = decompressed_value
    except Exception as e:
        if self.raise_errors: raise DataError(f"[{self.name}] Error in Decompression: {str(value)[:100]}") from e
        # return self.decode_value(value, **kwargs)
    return self.decode_value(value, **kwargs)

decode_value

decode_value(value: bytes, **kwargs) -> typing.Union[
    kvdb.io.serializers.base.SchemaType,
    typing.Dict,
    typing.Any,
]

Decode the value with the Pickle Library

Source code in kvdb/io/serializers/_msgpack.py
def decode_value(self, value: bytes, **kwargs) -> Union[SchemaType, Dict, Any]:
    """
    Decode the value with the Pickle Library
    """
    try:
        if 'raw' not in kwargs: kwargs['raw'] = False
        if 'ext_hook' not in kwargs: kwargs['ext_hook'] = self.default_deserialization_hook
        return self.msgpacklib.unpackb(value, **kwargs)
    except Exception as e:
        if not self.is_encoder: logger.info(f'Error Decoding Value: |r|({type(value)}) {e}|e| {str(value)[:500]}', colored = True, prefix = "msgpack")
        if self.raise_errors: raise e
    return None

decompress_value

decompress_value(
    value: typing.Union[str, bytes], **kwargs
) -> typing.Union[str, bytes]

Decompresses the value

Source code in kvdb/io/serializers/base.py
def decompress_value(self, value: Union[str, bytes], **kwargs) -> Union[str, bytes]:
    # sourcery skip: extract-duplicate-method
    """
    Decompresses the value
    """
    if not self.compression_enabled: return value
    try:
        value = self.compressor.decompress(value, **kwargs)
    except Exception as e:
        if self.enable_deprecation_support or self.previous_compressor is not None:
            value = self.deprecated_decompress_value(value, **kwargs)
    if value is not None and not self.binary: value = value.decode(self.encoding)
    return value

default_deserialization_hook

default_deserialization_hook(
    code: int, data: typing.Union[str, bytes]
) -> kvdb.io.serializers.base.ObjectValue

Default Deserialization Hook

Source code in kvdb/io/serializers/_msgpack.py
def default_deserialization_hook(self, code: int, data: Union[str, bytes]) -> ObjectValue:
    """
    Default Deserialization Hook
    """
    if code != 2: return data
    if isinstance(data, bytes): data = data.decode(self.encoding)
    try:
        data = self.jsonlib.loads(data)
    except Exception as e:
        if not self.is_encoder: logger.info(f'Error Decoding Value: |r|({type(data)}) {e}|e| {str(data)[:500]}', colored = True, prefix = "msgpack")
        if self.raise_errors: raise e
        return data
    if not self.disable_object_serialization:
        _class = data.pop('__class__', None)
        if _class is not None:
            if _class not in self.serialization_schemas:
                self.serialization_schemas[_class] = lazy_import(_class)
            _class = self.serialization_schemas[_class]
            return _class.model_validate(data, **self.serialization_obj_kwargs)
    elif self.serialization_obj is not None:
        return self.serialization_obj.model_validate(data, **self.serialization_obj_kwargs)
    return data

default_serialization_hook

default_serialization_hook(
    obj: kvdb.io.serializers.base.ObjectValue,
)

Default Serialization Hook

Source code in kvdb/io/serializers/_msgpack.py
def default_serialization_hook(self, obj: ObjectValue):
    """
    Default Serialization Hook
    """
    if not isinstance(obj, BaseModel) and not hasattr(obj, 'model_dump'):
        if not self.is_encoder: logger.info(f'Invalid Object Type: |r|{type(obj)}|e| {obj}', colored = True, prefix = "msgpack")
        return obj

    if self.disable_object_serialization: 
        return obj.model_dump_json(**self.serialization_obj_kwargs)

    obj_class_name = self.fetch_object_classname(obj)
    if obj_class_name not in self.serialization_schemas:
        self.serialization_schemas[obj_class_name] = obj.__class__
    data = obj.model_dump(mode = 'json', **self.serialization_obj_kwargs)
    data['__class__'] = obj_class_name
    return self.msgpacklib.ExtType(2, self.jsonlib.dumps(data).encode(self.encoding))

deprecated_decompress_value

deprecated_decompress_value(
    value: typing.Union[str, bytes], **kwargs
) -> typing.Optional[typing.Union[str, bytes]]

Attempts to decompress the value using the deprecated compressor

Source code in kvdb/io/serializers/base.py
def deprecated_decompress_value(self, value: Union[str, bytes], **kwargs) -> Optional[Union[str, bytes]]:
    """
    Attempts to decompress the value using the deprecated compressor
    """
    e = None
    attempt_msg = f"{self.name}"
    if self.previous_compressor is not None:
        try:
            return self.previous_compressor.decompress(value)
        except Exception as e:
            attempt_msg += f"-> {self.previous_compressor.name}"
    try:
        return zlib.decompress(value)
    except Exception as e:
        attempt_msg += " -> ZLib"
        if self.raise_errors: raise DataError(f"[{attempt_msg}] Error in Decompression: {str(value)[:100]}") from e
        return None

dumps

dumps(
    value: kvdb.io.serializers.base.ObjectValue, **kwargs
) -> typing.Union[str, bytes]

Dumps the value

Source code in kvdb/io/serializers/base.py
def dumps(self, value: ObjectValue, **kwargs) -> Union[str, bytes]:
    # sourcery skip: class-extract-method
    """
    Dumps the value
    """
    try:
        return self.encode(value, **kwargs)
    except Exception as e:
        if self.raise_errors: raise DataError(f"[{self.name}] Error in Encoding: {str(value)[:500]}") from e
        return None

encode

encode(
    value: kvdb.io.serializers.base.ObjectValue, **kwargs
) -> typing.Union[str, bytes]

Encodes the value

Source code in kvdb/io/serializers/base.py
def encode(self, value: ObjectValue, **kwargs) -> Union[str, bytes]:
    """
    Encodes the value
    """
    return self.compress_value(self.encode_value(value, **kwargs))

encode_value

encode_value(
    value: typing.Union[
        typing.Any, kvdb.io.serializers.base.SchemaType
    ],
    **kwargs
) -> bytes

Encode the value with the Pickle Library

Source code in kvdb/io/serializers/_msgpack.py
def encode_value(self, value: Union[Any, SchemaType], **kwargs) -> bytes:
    """
    Encode the value with the Pickle Library
    """
    if 'use_bin_type' not in kwargs: kwargs['use_bin_type'] = True
    if 'default' not in kwargs: kwargs['default'] = self.default_serialization_hook
    try:
        return self.msgpacklib.packb(value, **kwargs)
    except Exception as e:
        if not self.is_encoder: logger.info(f'Error Encoding Value: |r|({type(value)}) {e}|e| {str(value)[:500]}', colored = True, prefix = "msgpack")
        if self.raise_errors: raise e
    return None

fetch_object_classname

fetch_object_classname(
    obj: kvdb.io.serializers.base.ObjectValue,
) -> str

Fetches the object classname

Source code in kvdb/io/serializers/base.py
def fetch_object_classname(self, obj: ObjectValue) -> str:
    """
    Fetches the object classname
    """
    return f"{obj.__class__.__module__}.{obj.__class__.__name__}"

loads

loads(
    value: typing.Union[str, bytes], **kwargs
) -> kvdb.io.serializers.base.ObjectValue

Loads the value

Source code in kvdb/io/serializers/base.py
def loads(self, value: Union[str, bytes], **kwargs) -> ObjectValue:
    """
    Loads the value
    """
    try:
        return self.decode(value, **kwargs)
    except Exception as e:
        if not self.is_encoder: logger.trace(f'[{self.name}] Error in Decoding: {str(value)[:500]}', e)
        if self.raise_errors: raise DataError(f"[{self.name}] Error in Decoding: {str(value)[:500]}") from e
        return None

set_default_lib classmethod

set_default_lib(
    lib: typing.Union[
        str,
        kvdb.io.serializers._msgpack.MsgPackLibT,
        kvdb.io.serializers.base.ModuleType,
    ]
) -> None

Sets the default MsgPack library

Source code in kvdb/io/serializers/_msgpack.py
@classmethod
def set_default_lib(cls, lib: Union[str, MsgPackLibT, ModuleType]) -> None:
    """
    Sets the default MsgPack library
    """
    global default_msgpack
    if isinstance(lib, str):
        lib = lazy_import(lib, is_module=True)
    assert hasattr(lib, "packb") and hasattr(lib, "unpackb"), f"Invalid Msgpack Library: `{lib}`"
    cls.msgpacklib = lib
    default_msgpack = lib