Skip to content

Pickle

Pickle is a binary serialization format that is specific to Python. It is a very powerful serialization format that can serialize almost any Python object. However, it is not recommended to use pickle to serialize and deserialize data from untrusted sources, as it can lead to security vulnerabilities.

It is only used as the default serializer for Jobs to ensure that the Job can be serialized and deserialized across different Python environments.

By default, the pickle serializer uses the the first available sub-library in the following order of priority: - cloudpickle - dill - pickle

References


API Reference

Bases: kvdb.io.serializers.base.BinaryBaseSerializer

Source code in kvdb/io/serializers/_pickle.py
def __init__(
    self, 
    picklelib: Optional[Union[str, Any]] = None,
    compression: Optional[str] = None,
    compression_level: int | None = None, 
    encoding: str | None = None, 
    **kwargs
):
    super().__init__(compression, compression_level, encoding, **kwargs)
    if picklelib is not None:
        if isinstance(picklelib, str):
            picklelib = lazy_import(picklelib, is_module=True)
        assert hasattr(picklelib, "dumps") and hasattr(picklelib, "loads"), f"Invalid Pickle Library: {picklelib}"
        self.picklelib = picklelib
    self.picklelib_name = self.picklelib.__name__

compression_enabled property

compression_enabled: bool

Returns if compression is enabled

compression_level property

compression_level: typing.Optional[int]

Returns the compression level

adecode async

adecode(
    value: typing.Union[str, bytes], **kwargs
) -> kvdb.io.serializers.base.ObjectValue

Decodes the value asynchronously

Source code in kvdb/io/serializers/base.py
async def adecode(self, value: Union[str, bytes], **kwargs) -> ObjectValue:
    """
    Decodes the value asynchronously
    """
    return await Pooler.arun(self.decode, value, **kwargs)

adumps async

adumps(
    value: kvdb.io.serializers.base.ObjectValue, **kwargs
) -> typing.Union[str, bytes]

Dumps the value asynchronously

Source code in kvdb/io/serializers/base.py
async def adumps(self, value: ObjectValue, **kwargs) -> Union[str, bytes]:
    """
    Dumps the value asynchronously
    """
    return await Pooler.arun(self.dumps, value, **kwargs)

aencode async

aencode(
    value: kvdb.io.serializers.base.ObjectValue, **kwargs
) -> typing.Union[str, bytes]

Encodes the value asynchronously

Source code in kvdb/io/serializers/base.py
async def aencode(self, value: ObjectValue, **kwargs) -> Union[str, bytes]:
    """
    Encodes the value asynchronously
    """
    return await Pooler.arun(self.encode, value, **kwargs)

aloads async

aloads(
    value: typing.Union[str, bytes], **kwargs
) -> kvdb.io.serializers.base.ObjectValue

Loads the value asynchronously

Source code in kvdb/io/serializers/base.py
async def aloads(self, value: Union[str, bytes], **kwargs) -> ObjectValue:
    """
    Loads the value asynchronously
    """
    return await Pooler.arun(self.loads, value, **kwargs)

compress_value

compress_value(
    value: typing.Union[str, bytes], **kwargs
) -> typing.Union[str, bytes]

Compresses the value

Source code in kvdb/io/serializers/base.py
def compress_value(self, value: Union[str, bytes], **kwargs) -> Union[str, bytes]:
    """
    Compresses the value
    """
    if self.compression_enabled:
        if isinstance(value, str): value = value.encode(self.encoding)
        return self.compressor.compress(value)
    return value

copy

copy(**kwargs) -> kvdb.io.serializers.base.BaseSerializer

Copies the serializer

Source code in kvdb/io/serializers/base.py
def copy(self, **kwargs) -> BaseSerializer:
    """
    Copies the serializer
    """
    new = deepcopy(self)
    for k, v in kwargs.items():
        if hasattr(new, k):
            setattr(new, k, v)
        else:
            new._kwargs[k] = v
        # setattr(new, k, v)
    return new

create_hash

create_hash(
    obj: kvdb.io.serializers.base.ObjectValue,
) -> str

Creates a hash for the object

Source code in kvdb/io/serializers/base.py
def create_hash(self, obj: ObjectValue) -> str:
    """
    Creates a hash for the object
    """
    return create_obj_hash(obj)

decode

decode(
    value: typing.Union[str, bytes], **kwargs
) -> kvdb.io.serializers.base.ObjectValue

Decodes the value

Source code in kvdb/io/serializers/base.py
def decode(self, value: Union[str, bytes], **kwargs) -> ObjectValue:
    """
    Decodes the value
    """
    try:
        decompressed_value = self.decompress_value(value, **kwargs)
        if decompressed_value is not None:
            value = decompressed_value
    except Exception as e:
        if self.raise_errors: raise DataError(f"[{self.name}] Error in Decompression: {str(value)[:100]}") from e
        # return self.decode_value(value, **kwargs)
    return self.decode_value(value, **kwargs)

decode_value

decode_value(value: bytes, **kwargs) -> typing.Union[
    kvdb.io.serializers.base.SchemaType,
    typing.Dict,
    typing.Any,
]

Decode the value with the Pickle Library

Source code in kvdb/io/serializers/_pickle.py
def decode_value(self, value: bytes, **kwargs) -> Union[SchemaType, Dict, Any]:
    """
    Decode the value with the Pickle Library
    """
    try:
        if self.picklelib_name == 'cloudpickle':
            if 'encoding' not in kwargs:
                kwargs['encoding'] = self.encoding
            if 'fix_imports' not in kwargs:
                kwargs['fix_imports'] = False
        return self.picklelib.loads(value, **kwargs)
    except Exception as e:
        if not self.is_encoder: 
            logger.trace(f'Error Deserializing Object: ({type(value)}) {str(value)[:1000]}', e, prefix = self.picklelib_name)
            # logger.info(f'Error Decoding Value: |r|({type(value)}) {e}|e| {value}', colored = True, prefix = self.picklelib_name)
        if self.raise_errors: raise e
    return None

decompress_value

decompress_value(
    value: typing.Union[str, bytes], **kwargs
) -> typing.Union[str, bytes]

Decompresses the value

Source code in kvdb/io/serializers/base.py
def decompress_value(self, value: Union[str, bytes], **kwargs) -> Union[str, bytes]:
    # sourcery skip: extract-duplicate-method
    """
    Decompresses the value
    """
    if not self.compression_enabled: return value
    try:
        value = self.compressor.decompress(value, **kwargs)
    except Exception as e:
        if self.enable_deprecation_support or self.previous_compressor is not None:
            value = self.deprecated_decompress_value(value, **kwargs)
    if value is not None and not self.binary: value = value.decode(self.encoding)
    return value

deprecated_decompress_value

deprecated_decompress_value(
    value: typing.Union[str, bytes], **kwargs
) -> typing.Optional[typing.Union[str, bytes]]

Attempts to decompress the value using the deprecated compressor

Source code in kvdb/io/serializers/base.py
def deprecated_decompress_value(self, value: Union[str, bytes], **kwargs) -> Optional[Union[str, bytes]]:
    """
    Attempts to decompress the value using the deprecated compressor
    """
    e = None
    attempt_msg = f"{self.name}"
    if self.previous_compressor is not None:
        try:
            return self.previous_compressor.decompress(value)
        except Exception as e:
            attempt_msg += f"-> {self.previous_compressor.name}"
    try:
        return zlib.decompress(value)
    except Exception as e:
        attempt_msg += " -> ZLib"
        if self.raise_errors: raise DataError(f"[{attempt_msg}] Error in Decompression: {str(value)[:100]}") from e
        return None

dumps

dumps(
    value: kvdb.io.serializers.base.ObjectValue, **kwargs
) -> typing.Union[str, bytes]

Dumps the value

Source code in kvdb/io/serializers/base.py
def dumps(self, value: ObjectValue, **kwargs) -> Union[str, bytes]:
    # sourcery skip: class-extract-method
    """
    Dumps the value
    """
    try:
        return self.encode(value, **kwargs)
    except Exception as e:
        if self.raise_errors: raise DataError(f"[{self.name}] Error in Encoding: {str(value)[:500]}") from e
        return None

encode

encode(
    value: kvdb.io.serializers.base.ObjectValue, **kwargs
) -> typing.Union[str, bytes]

Encodes the value

Source code in kvdb/io/serializers/base.py
def encode(self, value: ObjectValue, **kwargs) -> Union[str, bytes]:
    """
    Encodes the value
    """
    return self.compress_value(self.encode_value(value, **kwargs))

encode_value

encode_value(
    value: typing.Union[
        typing.Any, kvdb.io.serializers.base.SchemaType
    ],
    **kwargs
) -> bytes

Encode the value with the Pickle Library

Source code in kvdb/io/serializers/_pickle.py
def encode_value(self, value: Union[Any, SchemaType], **kwargs) -> bytes:
    """
    Encode the value with the Pickle Library
    """
    try:
        return self.picklelib.dumps(value, **kwargs)
    except Exception as e:
        if not self.is_encoder: 
            logger.trace(f'Error Encoding Object: ({type(value)}) {str(value)[:1000]}', e, prefix = self.picklelib_name)
            # logger.info(f'Error Encoding Value: |r|({type(value)}) {e}|e| {value}', colored = True, prefix = self.picklelib_name)
        if self.raise_errors: raise e
    return None

fetch_object_classname

fetch_object_classname(
    obj: kvdb.io.serializers.base.ObjectValue,
) -> str

Fetches the object classname

Source code in kvdb/io/serializers/base.py
def fetch_object_classname(self, obj: ObjectValue) -> str:
    """
    Fetches the object classname
    """
    return f"{obj.__class__.__module__}.{obj.__class__.__name__}"

loads

loads(
    value: typing.Union[str, bytes], **kwargs
) -> kvdb.io.serializers.base.ObjectValue

Loads the value

Source code in kvdb/io/serializers/base.py
def loads(self, value: Union[str, bytes], **kwargs) -> ObjectValue:
    """
    Loads the value
    """
    try:
        return self.decode(value, **kwargs)
    except Exception as e:
        if not self.is_encoder: logger.trace(f'[{self.name}] Error in Decoding: {str(value)[:500]}', e)
        if self.raise_errors: raise DataError(f"[{self.name}] Error in Decoding: {str(value)[:500]}") from e
        return None

set_default_lib classmethod

set_default_lib(
    lib: typing.Union[
        str,
        kvdb.io.serializers._pickle.PickleLibT,
        kvdb.io.serializers.base.ModuleType,
    ]
) -> None

Sets the default Pickle library

Source code in kvdb/io/serializers/_pickle.py
@classmethod
def set_default_lib(cls, lib: Union[str, PickleLibT, ModuleType]) -> None:
    """
    Sets the default Pickle library
    """
    global default_pickle
    if isinstance(lib, str):
        lib = lazy_import(lib, is_module=True)
    assert hasattr(lib, "loads") and hasattr(lib, "dumps"), f"Invalid Pickle Library: `{lib}`"
    cls.picklelib = lib
    default_pickle = lib