Pickle
Pickle is a Python-specific binary serialization format. It is very powerful and can serialize almost any Python object. However, pickle must never be used to deserialize data from untrusted sources: unpickling can execute arbitrary code, which makes it a security vulnerability.
It is used only as the default serializer for Jobs, to ensure that a Job can be serialized and deserialized across different Python environments.
By default, the pickle serializer uses the first available sub-library, in the following order of priority:
- cloudpickle
- dill
- pickle
References
API Reference
Bases: kvdb.io.serializers.base.BinaryBaseSerializer
Source code in kvdb/io/serializers/_pickle.py
def __init__(
    self,
    picklelib: Optional[Union[str, Any]] = None,
    compression: Optional[str] = None,
    compression_level: int | None = None,
    encoding: str | None = None,
    **kwargs
):
    """
    Initialize the Pickle serializer

    Args:
        picklelib: Optional override for the pickle implementation. A string is
            resolved with ``lazy_import(..., is_module=True)``; any other object
            is used as-is. The result must expose ``dumps`` and ``loads``. When
            ``None``, ``self.picklelib`` must already resolve (e.g. the class
            attribute assigned by ``set_default_lib``).
        compression: Forwarded to the base serializer.
        compression_level: Forwarded to the base serializer.
        encoding: Forwarded to the base serializer.
        **kwargs: Forwarded to the base serializer.

    Raises:
        AssertionError: If the resolved library lacks ``dumps`` or ``loads``
            (only while assertions are enabled, i.e. not under ``python -O``).
    """
    super().__init__(compression, compression_level, encoding, **kwargs)
    if picklelib is not None:
        library = (
            lazy_import(picklelib, is_module=True)
            if isinstance(picklelib, str)
            else picklelib
        )
        assert hasattr(library, "dumps") and hasattr(library, "loads"), f"Invalid Pickle Library: {library}"
        self.picklelib = library
    # Cached once here; decode_value branches on this name ('cloudpickle').
    self.picklelib_name = self.picklelib.__name__
|
compression_enabled
property
compression_enabled: bool
Returns if compression is enabled
compression_level
property
compression_level: typing.Optional[int]
Returns the compression level
adecode
async
adecode(
value: typing.Union[str, bytes], **kwargs
) -> kvdb.io.serializers.base.ObjectValue
Decodes the value asynchronously
Source code in kvdb/io/serializers/base.py
async def adecode(self, value: Union[str, bytes], **kwargs) -> ObjectValue:
    """
    Asynchronously decode ``value``

    Delegates the synchronous ``self.decode`` call to ``Pooler.arun``,
    forwarding ``value`` and any keyword arguments unchanged.
    """
    sync_decode = self.decode
    decoded = await Pooler.arun(sync_decode, value, **kwargs)
    return decoded
|
adumps
async
adumps(
value: kvdb.io.serializers.base.ObjectValue, **kwargs
) -> typing.Union[str, bytes]
Dumps the value asynchronously
Source code in kvdb/io/serializers/base.py
async def adumps(self, value: ObjectValue, **kwargs) -> Union[str, bytes]:
    """
    Asynchronously dump ``value``

    Delegates the synchronous ``self.dumps`` call to ``Pooler.arun``,
    forwarding ``value`` and any keyword arguments unchanged.
    """
    sync_dumps = self.dumps
    dumped = await Pooler.arun(sync_dumps, value, **kwargs)
    return dumped
|
aencode
async
aencode(
value: kvdb.io.serializers.base.ObjectValue, **kwargs
) -> typing.Union[str, bytes]
Encodes the value asynchronously
Source code in kvdb/io/serializers/base.py
async def aencode(self, value: ObjectValue, **kwargs) -> Union[str, bytes]:
    """
    Asynchronously encode ``value``

    Delegates the synchronous ``self.encode`` call to ``Pooler.arun``,
    forwarding ``value`` and any keyword arguments unchanged.
    """
    sync_encode = self.encode
    encoded = await Pooler.arun(sync_encode, value, **kwargs)
    return encoded
|
aloads
async
aloads(
value: typing.Union[str, bytes], **kwargs
) -> kvdb.io.serializers.base.ObjectValue
Loads the value asynchronously
Source code in kvdb/io/serializers/base.py
async def aloads(self, value: Union[str, bytes], **kwargs) -> ObjectValue:
    """
    Asynchronously load ``value``

    Delegates the synchronous ``self.loads`` call to ``Pooler.arun``,
    forwarding ``value`` and any keyword arguments unchanged.
    """
    sync_loads = self.loads
    loaded = await Pooler.arun(sync_loads, value, **kwargs)
    return loaded
|
compress_value
compress_value(
value: typing.Union[str, bytes], **kwargs
) -> typing.Union[str, bytes]
Compresses the value
Source code in kvdb/io/serializers/base.py
def compress_value(self, value: Union[str, bytes], **kwargs) -> Union[str, bytes]:
    """
    Compress ``value`` when compression is enabled

    When ``self.compression_enabled`` is falsy, ``value`` is returned
    untouched. Otherwise a ``str`` is first encoded with ``self.encoding`` and
    the bytes are passed to ``self.compressor.compress``. ``kwargs`` is
    accepted for interface compatibility but not used.
    """
    if not self.compression_enabled:
        return value
    raw = value.encode(self.encoding) if isinstance(value, str) else value
    return self.compressor.compress(raw)
|
copy
Copies the serializer
Source code in kvdb/io/serializers/base.py
def copy(self, **kwargs) -> BaseSerializer:
    """
    Return a deep copy of this serializer with optional overrides

    Each keyword naming an existing attribute on the copy is set via
    ``setattr``; any other keyword is stored in the copy's ``_kwargs`` dict.
    The original serializer is not modified.
    """
    clone = deepcopy(self)
    for attr_name, attr_value in kwargs.items():
        if not hasattr(clone, attr_name):
            clone._kwargs[attr_name] = attr_value
            continue
        setattr(clone, attr_name, attr_value)
    return clone
|
create_hash
create_hash(
obj: kvdb.io.serializers.base.ObjectValue,
) -> str
Creates a hash for the object
Source code in kvdb/io/serializers/base.py
def create_hash(self, obj: ObjectValue) -> str:
    """
    Return a hash string for ``obj``

    Thin delegation to the module-level ``create_obj_hash`` helper; the
    serializer's own state is not consulted.
    """
    digest = create_obj_hash(obj)
    return digest
|
decode
decode(
value: typing.Union[str, bytes], **kwargs
) -> kvdb.io.serializers.base.ObjectValue
Decodes the value
Source code in kvdb/io/serializers/base.py
def decode(self, value: Union[str, bytes], **kwargs) -> ObjectValue:
    """
    Decompress (if possible) and then decode ``value``

    ``self.decompress_value`` is tried first. A non-``None`` result replaces
    the payload; a ``None`` result keeps the original ``value``. If
    decompression raises, the error is re-raised as ``DataError`` when
    ``self.raise_errors`` is set; otherwise the original ``value`` is decoded
    as a best-effort fallback. ``kwargs`` go to both steps.
    """
    payload = value
    try:
        candidate = self.decompress_value(value, **kwargs)
    except Exception as err:
        if self.raise_errors:
            raise DataError(f"[{self.name}] Error in Decompression: {str(value)[:100]}") from err
    else:
        if candidate is not None:
            payload = candidate
    return self.decode_value(payload, **kwargs)
|
decode_value
decode_value(value: bytes, **kwargs) -> typing.Union[
kvdb.io.serializers.base.SchemaType,
typing.Dict,
typing.Any,
]
Decode the value with the Pickle Library
Source code in kvdb/io/serializers/_pickle.py
def decode_value(self, value: bytes, **kwargs) -> Union[SchemaType, Dict, Any]:
    """
    Unpickle ``value`` with the configured pickle library

    ``kwargs`` are forwarded to ``self.picklelib.loads``. When the library is
    ``cloudpickle``, ``encoding`` defaults to ``self.encoding`` and
    ``fix_imports`` defaults to ``False`` unless the caller supplied them.

    Security: ``loads`` on attacker-controlled bytes can execute arbitrary
    code; only feed this data from trusted stores.

    Returns:
        The unpickled object, or ``None`` if loading failed and
        ``self.raise_errors`` is falsy.
    """
    if self.picklelib_name == 'cloudpickle':
        kwargs.setdefault('encoding', self.encoding)
        kwargs.setdefault('fix_imports', False)
    try:
        return self.picklelib.loads(value, **kwargs)
    except Exception as exc:
        # Encoder-mode serializers skip the trace log.
        if not self.is_encoder:
            logger.trace(f'Error Deserializing Object: ({type(value)}) {str(value)[:1000]}', exc, prefix = self.picklelib_name)
        if self.raise_errors:
            raise
    return None
|
decompress_value
decompress_value(
value: typing.Union[str, bytes], **kwargs
) -> typing.Union[str, bytes]
Decompresses the value
Source code in kvdb/io/serializers/base.py
def decompress_value(self, value: Union[str, bytes], **kwargs) -> Union[str, bytes]:
    # sourcery skip: extract-duplicate-method
    """
    Decompress ``value`` with the configured compressor

    If compression is disabled, ``value`` is returned unchanged. If
    ``self.compressor.decompress`` raises, ``deprecated_decompress_value`` is
    tried only when deprecation support is enabled or a previous compressor is
    configured. Otherwise the original ``value`` is kept as a best-effort
    fallback. For non-binary serializers, a non-``None`` result is decoded
    with ``self.encoding`` before being returned.
    """
    if not self.compression_enabled:
        return value
    try:
        result = self.compressor.decompress(value, **kwargs)
    except Exception:
        has_fallback = self.enable_deprecation_support or self.previous_compressor is not None
        result = self.deprecated_decompress_value(value, **kwargs) if has_fallback else value
    if result is None or self.binary:
        return result
    return result.decode(self.encoding)
|
deprecated_decompress_value
deprecated_decompress_value(
value: typing.Union[str, bytes], **kwargs
) -> typing.Optional[typing.Union[str, bytes]]
Attempts to decompress the value using the deprecated compressor
Source code in kvdb/io/serializers/base.py
def deprecated_decompress_value(self, value: Union[str, bytes], **kwargs) -> Optional[Union[str, bytes]]:
    """
    Attempts to decompress the value using the deprecated compressor

    Tries ``self.previous_compressor`` (when configured) and then plain
    ``zlib`` as fallbacks for data written with an older compression setting.
    ``kwargs`` is accepted for interface compatibility but not used.

    Returns:
        The decompressed bytes from the first fallback that succeeds, or
        ``None`` if all fail and ``self.raise_errors`` is falsy.

    Raises:
        DataError: If every fallback fails and ``self.raise_errors`` is set.
            The message lists the attempted chain and the final ``zlib``
            error is chained as ``__cause__``.
    """
    attempt_msg = f"{self.name}"
    if self.previous_compressor is not None:
        try:
            return self.previous_compressor.decompress(value)
        except Exception:
            attempt_msg += f" -> {self.previous_compressor.name}"
    try:
        return zlib.decompress(value)
    except Exception as zlib_error:
        attempt_msg += " -> ZLib"
        # Raise inside the handler: ``except ... as name`` unbinds ``name`` when
        # the clause ends, so chaining from it afterwards would raise UnboundLocalError.
        if self.raise_errors:
            raise DataError(f"[{attempt_msg}] Error in Decompression: {str(value)[:100]}") from zlib_error
    return None
|
dumps
dumps(
value: kvdb.io.serializers.base.ObjectValue, **kwargs
) -> typing.Union[str, bytes]
Dumps the value
Source code in kvdb/io/serializers/base.py
def dumps(self, value: ObjectValue, **kwargs) -> Union[str, bytes]:
    # sourcery skip: class-extract-method
    """
    Serialize ``value`` via ``self.encode``

    Returns:
        The encoded payload, or ``None`` when encoding fails and
        ``self.raise_errors`` is falsy.

    Raises:
        DataError: When encoding fails and ``self.raise_errors`` is set.
    """
    try:
        encoded = self.encode(value, **kwargs)
    except Exception as exc:
        if self.raise_errors:
            raise DataError(f"[{self.name}] Error in Encoding: {str(value)[:500]}") from exc
        return None
    return encoded
|
encode
encode(
value: kvdb.io.serializers.base.ObjectValue, **kwargs
) -> typing.Union[str, bytes]
Encodes the value
Source code in kvdb/io/serializers/base.py
def encode(self, value: ObjectValue, **kwargs) -> Union[str, bytes]:
    """
    Serialize ``value`` and then compress the result

    ``kwargs`` reach ``encode_value`` only; ``compress_value`` is called
    without them.
    """
    serialized = self.encode_value(value, **kwargs)
    return self.compress_value(serialized)
|
encode_value
encode_value(
value: typing.Union[
typing.Any, kvdb.io.serializers.base.SchemaType
],
**kwargs
) -> bytes
Encode the value with the Pickle Library
Source code in kvdb/io/serializers/_pickle.py
def encode_value(self, value: Union[Any, SchemaType], **kwargs) -> bytes:
    """
    Pickle ``value`` with the configured pickle library

    ``kwargs`` are forwarded to ``self.picklelib.dumps`` (e.g. ``protocol``).

    Returns:
        The pickled bytes, or ``None`` if serialization failed and
        ``self.raise_errors`` is falsy.
    """
    try:
        payload = self.picklelib.dumps(value, **kwargs)
    except Exception as exc:
        # Encoder-mode serializers skip the trace log.
        if not self.is_encoder:
            logger.trace(f'Error Encoding Object: ({type(value)}) {str(value)[:1000]}', exc, prefix = self.picklelib_name)
        if self.raise_errors:
            raise
        return None
    return payload
|
fetch_object_classname
fetch_object_classname(
obj: kvdb.io.serializers.base.ObjectValue,
) -> str
Fetches the object classname
Source code in kvdb/io/serializers/base.py
def fetch_object_classname(self, obj: ObjectValue) -> str:
    """
    Return the dotted ``module.ClassName`` of ``obj``'s class

    Uses ``obj.__class__``, so proxies that override ``__class__`` report the
    class they advertise.
    """
    klass = obj.__class__
    return "{}.{}".format(klass.__module__, klass.__name__)
|
loads
loads(
value: typing.Union[str, bytes], **kwargs
) -> kvdb.io.serializers.base.ObjectValue
Loads the value
Source code in kvdb/io/serializers/base.py
def loads(self, value: Union[str, bytes], **kwargs) -> ObjectValue:
    """
    Deserialize ``value`` via ``self.decode``

    Returns:
        The decoded object, or ``None`` when decoding fails and
        ``self.raise_errors`` is falsy.

    Raises:
        DataError: When decoding fails and ``self.raise_errors`` is set.
    """
    try:
        decoded = self.decode(value, **kwargs)
    except Exception as exc:
        # Encoder-mode serializers skip the trace log.
        if not self.is_encoder:
            logger.trace(f'[{self.name}] Error in Decoding: {str(value)[:500]}', exc)
        if self.raise_errors:
            raise DataError(f"[{self.name}] Error in Decoding: {str(value)[:500]}") from exc
        return None
    return decoded
|
set_default_lib
classmethod
set_default_lib(
lib: typing.Union[
str,
kvdb.io.serializers._pickle.PickleLibT,
kvdb.io.serializers.base.ModuleType,
]
) -> None
Sets the default Pickle library
Source code in kvdb/io/serializers/_pickle.py
@classmethod
def set_default_lib(cls, lib: Union[str, PickleLibT, ModuleType]) -> None:
    """
    Set the default pickle library for the class and the module

    A string is resolved with ``lazy_import(..., is_module=True)``; any other
    object is used as-is and must expose ``loads`` and ``dumps``. The result
    is stored both on ``cls.picklelib`` and in the module-level
    ``default_pickle``.

    NOTE(review): ``picklelib_name`` is cached per instance in ``__init__``
    and is not refreshed here, so existing instances may keep a stale name.

    Raises:
        AssertionError: If the library lacks ``loads`` or ``dumps`` (only
            while assertions are enabled).
    """
    global default_pickle
    resolved = lazy_import(lib, is_module=True) if isinstance(lib, str) else lib
    assert hasattr(resolved, "loads") and hasattr(resolved, "dumps"), f"Invalid Pickle Library: `{resolved}`"
    default_pickle = resolved
    cls.picklelib = resolved
|