Skip to content

Cachify Object

The cachify object is the wrapper around each individual cached function that is managed by the cachify context.

The following example demonstrates usage of the Cachify class to cache objects.

objects.py
"""
A Very basic demonstration of how to use cachify with objects

Without any arguments, cachify will use the default settings and initialize
a default session.

Similar to registering functions, you can register an object for caching.

- The object's functions that are explicitly registered will be cached, and any that are not
    will not be affected.

Run this example:

    # cwd: examples/caching
    $ python objects.py
"""

import time
import abc
import asyncio
from lazyops.utils.times import Timer
from kvdb.io import cachify
from kvdb.utils.logs import logger

DEBUG_ENABLED = False

@cachify.register_object()
class TestObject(abc.ABC):
    """
    Demo object registered with cachify

    Only the methods decorated with `cachify.register` are cached; the
    `*_nc` ("no cache") twins are plain methods used as a baseline.
    """

    def __init__(self, *args, **kwargs):
        logger.info('running init')

    @cachify.register(ttl = 10, verbosity = 2 if DEBUG_ENABLED else None, cache_max_size = 15)
    async def async_fibonacci(self, number: int):
        """
        Cached async recursive fibonacci
        """
        if number == 0:
            return 0
        if number == 1:
            return 1
        # Awaited left-to-right, exactly like a single `a + b` expression
        previous = await self.async_fibonacci(number - 1)
        before_previous = await self.async_fibonacci(number - 2)
        return previous + before_previous

    @cachify.register(ttl = 10, verbosity = 2 if DEBUG_ENABLED else None)
    def fibonacci(self, number: int):
        """
        Cached sync recursive fibonacci
        """
        if number == 0:
            return 0
        if number == 1:
            return 1
        previous = self.fibonacci(number - 1)
        before_previous = self.fibonacci(number - 2)
        return previous + before_previous

    # No Cache Versions
    async def async_fibonacci_nc(self, number: int):
        """
        Uncached async recursive fibonacci
        """
        if number == 0:
            return 0
        if number == 1:
            return 1
        previous = await self.async_fibonacci_nc(number - 1)
        before_previous = await self.async_fibonacci_nc(number - 2)
        return previous + before_previous

    def fibonacci_nc(self, number: int):
        """
        Uncached sync recursive fibonacci
        """
        if number == 0:
            return 0
        if number == 1:
            return 1
        previous = self.fibonacci_nc(number - 1)
        before_previous = self.fibonacci_nc(number - 2)
        return previous + before_previous


async def run_tests(
    start_n: int = 1,
    runs: int = 10,
    print_every: int = 5,
):

    """
    Runs the cached and then the non-cached fibonacci methods and logs timings.

    For each of `runs` iterations the input is `start_n + i`. A progress line
    is logged whenever `i % print_every == 0`.

    Order of operations:
    - cached sync, then cached async, each followed by its cache info
    - both caches are cleared and their cache info is logged again
    - non-cached sync, then non-cached async

    NOTE(review): results are only logged; nothing here asserts that the
    cached and non-cached results are equal.
    """

    # `t` spans the whole cached section; `st` / `at` time the sync / async loops
    t = Timer(format_ms=True)
    o = TestObject()

    # Test Sync
    st = Timer(format_ms=True)
    for i in range(runs):
        r = o.fibonacci(start_n+i)
        # presumably the time elapsed since the previous reading; confirm against Timer
        d = st.duration_s
        if i % print_every == 0:
            logger.info(f'[Sync - {i}/{runs}] Result: {r} | Time: {d}')
    logger.info(f'[Sync] Cache Average Time: {st.total_average_s(runs)} | Total Time: {st.total_s}')
    logger.info(o.fibonacci.cache_info(), prefix = '[Sync] Cache Info')

    # Test Async
    at = Timer(format_ms=True)
    for i in range(runs):
        r = await o.async_fibonacci(start_n+i)
        d = at.duration_s
        if i % print_every == 0:
            logger.info(f'[Async - {i}/{runs}] Result: {r} | Time: {d}')
    logger.info(f'[Async] Cache Average Time: {at.total_average_s(runs)} | Total Time: {at.total_s}')
    # The async wrapper's helpers are awaited, unlike the sync wrapper's
    logger.info(await o.async_fibonacci.cache_info(), prefix = '[Async] Cache Info')
    logger.info(t.total_s, prefix = 'Total Time')

    # Clear the Cache
    o.fibonacci.clear()
    logger.info(o.fibonacci.cache_info(), prefix = '[Sync] Cache Info')

    await o.async_fibonacci.clear()
    logger.info(await o.async_fibonacci.cache_info(), prefix = '[Async] Cache Info')

    logger.info('Testing Non-Cached Functions')
    # Fresh overall timer for the non-cached baseline
    t = Timer(format_ms=True)

    # Test Sync
    st = Timer(format_ms=True)
    for i in range(runs):
        r = o.fibonacci_nc(start_n+i)
        d = st.duration_s
        if i % print_every == 0:
            logger.info(f'[Sync - {i}/{runs}] Result: {r} | Time: {d}')
    # NOTE(review): the label still says "Cache Average Time" although this is the non-cached run
    logger.info(f'[Sync] Cache Average Time: {st.total_average_s(runs)} | Total Time: {st.total_s}')

    # Test Async
    at = Timer(format_ms=True)
    for i in range(runs):
        r = await o.async_fibonacci_nc(start_n+i)
        d = at.duration_s
        if i % print_every == 0:
            logger.info(f'[Async - {i}/{runs}] Result: {r} | Time: {d}')
    logger.info(f'[Async] Cache Average Time: {at.total_average_s(runs)} | Total Time: {at.total_s}')
    logger.info(t.total_s, prefix = 'Total Time')



if __name__ == '__main__':
    # Demo settings: fibonacci inputs 5..44, one progress line every 5 runs
    demo_settings = dict(start_n = 5, runs = 40, print_every = 5)
    asyncio.run(run_tests(**demo_settings))

API Reference

Bases: kvdb.configs.caching.KVDBCachifyConfig

The Cachify Config

acache_expirations async property

acache_expirations: typing.Dict[str, float]

Returns the expirations of the cache

acache_info async property

acache_info: typing.Dict[str, typing.Any]

Returns the info for the cache

acache_items async property

acache_items: typing.Dict[str, typing.Any]

Returns the items

acache_keyhits async property

acache_keyhits: typing.Dict[str, int]

Returns the keyhits of the cache

acache_keys async property

acache_keys: typing.List[str]

Returns the keys

acache_timestamps async property

acache_timestamps: typing.Dict[str, float]

Returns the timestamps of the cache

acache_values async property

acache_values: typing.List[typing.Any]

Returns the values

anum_hits async property

anum_hits: int

Returns the number of hits

anum_keys async property

anum_keys: int

Returns the number of keys

cache_expirations property

cache_expirations: typing.Dict[str, float]

Returns the expirations of the cache

cache_info property

cache_info: typing.Dict[str, typing.Any]

Returns the info for the cache

cache_items property

cache_items: typing.Dict[str, typing.Any]

Returns the items

cache_keyhits property

cache_keyhits: typing.Dict[str, int]

Returns the keyhits of the cache

cache_keys property

cache_keys: typing.List[str]

Returns the keys

cache_timestamps property

cache_timestamps: typing.Dict[str, float]

Returns the timestamps of the cache

cache_values property

cache_values: typing.List[typing.Any]

Returns the values

client property

client: 'ClientT'

Returns the client

data property

data: lazyops.libs.persistence.PersistentDict

Returns the persistent data

has_async_loop property

has_async_loop: bool

Checks if the current process is running in an async loop

has_post_call_hook property

has_post_call_hook: bool

Returns whether or not there is a post call hook

has_post_init_hook property

has_post_init_hook: bool

Returns whether or not there is a post init hook

is_enabled property

is_enabled: bool

Returns whether or not the cache is enabled [session is available]

num_default_keys property

num_default_keys: int

Returns the number of default keys

num_hits property

num_hits: int

Returns the number of hits

num_keys property

num_keys: int

Returns the number of keys

super_verbose property

super_verbose: bool

Returns whether or not the cache is super verbose

__call__

__call__(
    function: typing.Callable[
        kvdb.io.cachify.base.FuncP,
        kvdb.io.cachify.base.FuncT,
    ]
) -> typing.Callable[
    kvdb.io.cachify.base.FuncP,
    typing.Union[
        kvdb.io.cachify.base.FuncT,
        typing.Awaitable[kvdb.io.cachify.base.FuncT],
    ],
]

Performs the decorator

Source code in kvdb/io/cachify/base.py
def __call__(
    self,
    function: Callable[FuncP, FuncT]
) -> Callable[FuncP, Union[FuncT, Awaitable[FuncT]]]:
    """
    Performs the decorator

    - Returns `function` unwrapped when the cache is not enabled, or when
      reading `is_enabled` raises (the error is logged).
    - Coroutine functions get the async wrapper; everything else gets the
      sync wrapper.
    """
    try:
        cache_enabled = self.is_enabled
    except Exception as e:
        logger.error(f'Error in Cachify Function {function.__name__}: {e}')
        return function
    if not cache_enabled: return function
    if self.verbosity and self.verbosity > 4:
        logger.info(f'[{self.cache_field}] Cachifying Function: {get_function_name(function)} ({is_coro_func(function)}), {self.model_dump()}')
    build_wrapper = self.create_async_decorator if is_coro_func(function) else self.create_sync_decorator
    return build_wrapper(function)

aadd_hit async

aadd_hit()

Adds a hit to the cache

Source code in kvdb/io/cachify/base.py
async def aadd_hit(self):
    """
    Adds a hit to the cache

    Seeds the `hits` counter with 0 when it is missing, then increments it.
    If the increment fails for any reason the counter is reset to 1.
    """
    with self.safely():
        store = self.data
        counter_exists = await store.acontains('hits')
        if not counter_exists:
            await store.aset('hits', 0)
        try:
            store['hits'] += 1
        except Exception:
            await store.aset('hits', 1)

aadd_key_expiration async

aadd_key_expiration(key: str, ttl: int)

Adds an expiration to the cache key

Source code in kvdb/io/cachify/base.py
async def aadd_key_expiration(self, key: str, ttl: int):
    """
    Adds an expiration to the cache key

    Does nothing when `ttl` is None. With `hset_enabled` the absolute expiry
    time (`time.time() + ttl`) is recorded under the `expirations` entry of
    `self.data`; otherwise the work is delegated to `self._expire`.
    """
    if ttl is None: return
    with self.safely():
        if not self.hset_enabled:
            await self._expire(key, ttl)
            return
        expiry_by_key = await self.data.aget('expirations', {})
        expiry_by_key[key] = time.time() + ttl
        await self.data.aset('expirations', expiry_by_key)

aadd_key_hit async

aadd_key_hit(key: str)

Adds a hit to the cache key

Source code in kvdb/io/cachify/base.py
async def aadd_key_hit(self, key: str):
    """
    Adds a hit to the cache key

    Increments the per-key counter kept under the `keyhits` entry of
    `self.data`, starting from 0 for a key that has no counter yet.
    """
    with self.safely():
        hit_counts = await self.data.aget('keyhits', {})
        hit_counts[key] = hit_counts.get(key, 0) + 1
        await self.data.aset('keyhits', hit_counts)

aadd_key_timestamp async

aadd_key_timestamp(key: str)

Adds a timestamp to the cache key

Source code in kvdb/io/cachify/base.py
async def aadd_key_timestamp(self, key: str):
    """
    Adds a timestamp to the cache key

    Stores the current `time.time()` for `key` under the `timestamps`
    entry of `self.data`, replacing any earlier value.
    """
    with self.safely():
        stamp_by_key = await self.data.aget('timestamps', {})
        current_time = time.time()
        stamp_by_key[key] = current_time
        await self.data.aset('timestamps', stamp_by_key)

abuild_hash_key async

abuild_hash_key(*args, **kwargs) -> str

Builds the key for the function

Source code in kvdb/io/cachify/base.py
async def abuild_hash_key(self, *args, **kwargs) -> str:
    """
    Builds the key for the function

    Uses the configured `keybuilder` when one is set and falls back to
    `create_cache_key_from_kwargs` otherwise. The builder is invoked
    through `ThreadPooler.asyncish` with the call's args/kwargs and the
    hashing options taken from this config.
    """
    key_func = self.keybuilder or create_cache_key_from_kwargs
    key_options = {
        'args': args,
        'kwargs': kwargs,
        'typed': self.typed,
        'exclude_keys': self.exclude_keys,
        'exclude_null': self.exclude_null_values_in_hash,
        'exclude_defaults': self.exclude_default_values_in_hash,
        'is_classmethod': self.is_class_method,
    }
    return await ThreadPooler.asyncish(key_func, **key_options)

abuild_hash_name async

abuild_hash_name(
    func: typing.Callable, *args, **kwargs
) -> str

Builds the name for the function

Source code in kvdb/io/cachify/base.py
async def abuild_hash_name(self, func: Callable, *args, **kwargs) -> str:
    """
    Builds the name for the function

    The result is stored in `self.cache_field` and reused on later calls.
    - With `self.name` set: a callable name is resolved through
      `ThreadPooler.asyncish`, a plain value is used as-is.
    - Without a name: the function's full name is used, prefixed with
      `self.prefix` when one is configured.
    """
    if self.cache_field is not None:
        return self.cache_field
    if self.name:
        if callable(self.name):
            self.cache_field = await ThreadPooler.asyncish(self.name, func, *args, **kwargs)
        else:
            self.cache_field = self.name
        return self.cache_field
    self.cache_field = full_name(func)
    if self.prefix:
        self.cache_field = f'{self.prefix}:{self.cache_field}'
    return self.cache_field

acheck_cache_policies async

acheck_cache_policies(
    key: str,
    *args,
    cache_kwargs: typing.Dict[
        str, typing.Union[bool, int, float, typing.Any]
    ] = None,
    **kwargs
) -> None

Runs the cache policies

Source code in kvdb/io/cachify/base.py
async def acheck_cache_policies(self, key: str, *args, cache_kwargs: Dict[str, Union[bool, int, float, Any]] = None, **kwargs) -> None:
    # sourcery skip: low-code-quality
    """
    Runs the cache policies

    When the number of keys exceeds `cache_max_size`, evicts the overflow
    according to `cache_max_size_policy`. The key currently being accessed
    (`key`) is never evicted.

    - LRU: evicts the keys with the oldest timestamps
    - LFU: evicts the keys with the fewest key hits
    - FIFO / LIFO: ordering kept as in the original implementation

    `args`, `cache_kwargs` and `kwargs` are accepted for signature
    compatibility and are not used here.
    """
    # Read the key count once so the size check, the log line and the
    # number of evictions all agree
    num_keys = await self.anum_keys
    if num_keys <= self.cache_max_size: return
    overflow = num_keys - self.cache_max_size
    should_log = self.verbosity and not self.is_silenced('cache_policy')
    if should_log: logger.info(f'[{self.cache_field}] Cache Max Size Reached: {num_keys}/{self.cache_max_size}. Running Cache Policy: {self.cache_max_size_policy}')

    policy = self.cache_max_size_policy
    if policy == CachePolicy.LRU:
        # Least Recently Used: oldest timestamps first (ascending), matching
        # the sync `check_cache_policies`
        label, metric, descending = 'LRU', 'timestamps', False
    elif policy == CachePolicy.LFU:
        # Least Frequently Used: fewest hits first
        label, metric, descending = 'LFU', 'keyhits', False
    elif policy == CachePolicy.FIFO:
        # First In First Out
        # NOTE(review): newest-first looks inverted for FIFO; kept because the
        # sync variant does the same - confirm the intended ordering
        label, metric, descending = 'FIFO', 'timestamps', True
    elif policy == CachePolicy.LIFO:
        # Last In First Out
        # NOTE(review): oldest-first looks inverted for LIFO; kept because the
        # sync variant does the same - confirm the intended ordering
        label, metric, descending = 'LIFO', 'timestamps', False
    else:
        return

    ranking = await self.data.aget(metric, {})
    keys_to_delete = sorted(ranking, key = ranking.get, reverse = descending)[:overflow]
    if key in keys_to_delete: keys_to_delete.remove(key)
    # `clear` treats an empty key list as "clear everything", so bail out
    # instead of wiping the whole cache when nothing is left to evict
    if not keys_to_delete: return
    if should_log: logger.info(f'[{self.cache_field} - {label}] Deleting {len(keys_to_delete)} Keys: {keys_to_delete}')
    await self.clear(keys_to_delete)

add_hit

add_hit()

Adds a hit to the cache

Source code in kvdb/io/cachify/base.py
def add_hit(self):
    """
    Adds a hit to the cache

    Seeds the `hits` counter with 0 when it is missing, then increments it.
    If the increment fails for any reason the counter is reset to 1.
    """
    with self.safely():
        store = self.data
        counter_exists = store.contains('hits')
        if not counter_exists:
            store.set('hits', 0)
        try:
            store['hits'] += 1
        except Exception:
            store.set('hits', 1)

add_key_expiration

add_key_expiration(key: str, ttl: int)

Adds an expiration to the cache key

Source code in kvdb/io/cachify/base.py
def add_key_expiration(self, key: str, ttl: int):
    """
    Adds an expiration to the cache key

    Does nothing when `ttl` is None. With `hset_enabled` the absolute expiry
    time (`time.time() + ttl`) is recorded under the `expirations` entry of
    `self.data`; otherwise the work is delegated to `self._expire`.
    """
    if ttl is None: return
    with self.safely():
        if not self.hset_enabled:
            self._expire(key, ttl)
            return
        expiry_by_key = self.data.get('expirations', {})
        expiry_by_key[key] = time.time() + ttl
        self.data['expirations'] = expiry_by_key

add_key_hit

add_key_hit(key: str)

Adds a hit to the cache key

Source code in kvdb/io/cachify/base.py
def add_key_hit(self, key: str):
    """
    Adds a hit to the cache key

    Increments the per-key counter kept under the `keyhits` entry of
    `self.data`, starting from 0 for a key that has no counter yet.
    """
    with self.safely():
        hit_counts = self.data.get('keyhits', {})
        hit_counts[key] = hit_counts.get(key, 0) + 1
        self.data['keyhits'] = hit_counts

add_key_timestamp

add_key_timestamp(key: str)

Adds a timestamp to the cache key

Source code in kvdb/io/cachify/base.py
def add_key_timestamp(self, key: str):
    """
    Adds a timestamp to the cache key

    Stores the current `time.time()` for `key` under the `timestamps`
    entry of `self.data`, replacing any earlier value.
    """
    with self.safely():
        stamp_by_key = self.data.get('timestamps', {})
        current_time = time.time()
        stamp_by_key[key] = current_time
        self.data['timestamps'] = stamp_by_key

aexpire_cache_expired_keys async

aexpire_cache_expired_keys()

Expires the cache keys

Source code in kvdb/io/cachify/base.py
async def aexpire_cache_expired_keys(self):
    """
    Expires the cache keys

    Collects every key whose recorded expiration is in the past, drops its
    bookkeeping (`expirations`, `keyhits`, `timestamps`) from `self.data`,
    and then removes the keys themselves through `self.clear`.
    """
    with self.safely():
        expirations = await self.data.aget('expirations', {})
        if not isinstance(expirations, dict): expirations = {}
        to_delete = []
        for cache_key, deadline in expirations.items():
            if time.time() > deadline:
                to_delete.append(cache_key)
        if not to_delete: return

        keyhits = await self.data.aget('keyhits', {})
        if not isinstance(keyhits, dict): keyhits = {}
        timestamps = await self.data.aget('timestamps', {})
        if not isinstance(timestamps, dict): timestamps = {}
        for cache_key in to_delete:
            for bookkeeping in (keyhits, timestamps, expirations):
                bookkeeping.pop(cache_key, None)
        await self.data.aset('expirations', expirations)
        await self.data.aset('keyhits', keyhits)
        await self.data.aset('timestamps', timestamps)
        if self.verbosity and not self.is_silenced('expire'): logger.info(f'[{self.cache_field}] Deleting {len(to_delete)} Expired Keys: {to_delete}')
        await self.clear(to_delete)

aretrieve async

aretrieve(
    key: str,
    *args,
    cache_kwargs: typing.Dict[
        str, typing.Union[bool, int, float, typing.Any]
    ] = None,
    **kwargs
) -> typing.Any

Retrieves the value from the cache

Source code in kvdb/io/cachify/base.py
async def aretrieve(self, key: str, *args, cache_kwargs: Dict[str, Union[bool, int, float, Any]] = None, **kwargs) -> Any:
    """
    Retrieves the value from the cache

    Returns the decoded cached value, or the `ENOVAL` sentinel when the
    caller should compute a fresh value. `ENOVAL` is returned when:
    - `ashould_overwrite` says the entry must be overwritten
    - the key does not exist or its stored value is None
    - fetching times out or raises
    - decoding raises or yields None

    After a successful fetch the cache policies are validated, either as a
    background task or inline when `disable_background_tasks` is set.
    """
    if await self.ashould_overwrite(*args, cache_kwargs = cache_kwargs, **kwargs): 
        # NOTE(review): 'cache_ovewrite' looks like a typo of 'cache_overwrite';
        # it is a runtime key, so confirm against `is_silenced` users before changing
        if self.super_verbose and not self.is_silenced('cache_ovewrite', 'cache', 'retrieve'): logger.info(f'[{self.cache_field}:{key}] Overwriting Cache')
        return ENOVAL
    value = None
    try:
        with self.safely():
            if not await self._exists(key):
                if self.super_verbose and not self.is_silenced('cache_miss', 'cache', 'retrieve'): logger.info(f'[{self.cache_field}:{key}] Not Found')
                return ENOVAL
            value = await self._get(key)
        # A stored None is treated the same as a miss
        if value is None: return ENOVAL

    except TimeoutError:
        if self.super_verbose: logger.error(f'[{self.cache_field}:{key}] Retrieve Timeout')
        return ENOVAL

    except Exception as e:
        # Any other retrieval failure is treated as a miss rather than raised
        if self.verbosity: logger.trace(f'[{self.cache_field}:{key}] Retrieve Exception', error = e)
        return ENOVAL

    # Hit bookkeeping and eviction run before decoding
    if not self.disable_background_tasks:
        ThreadPooler.background_task(self.avalidate_cache_policies, key, *args, cache_kwargs = cache_kwargs, **kwargs)
    else:
        await self.avalidate_cache_policies(key, *args, cache_kwargs = cache_kwargs, **kwargs)
    try:
        result = self.decode_hit(value, *args, **kwargs)
        if result is not None: return result
    except Exception as e:
        if self.verbosity: logger.trace(f'[{self.cache_field}:{key}] Decode Exception', error = e)
    # Decode failed or produced None
    return ENOVAL

arun_cache_operation async

arun_cache_operation(
    cache_key: str,
    value: typing.Any,
    *args,
    cachify_kwargs: typing.Dict[str, typing.Any] = None,
    **kwargs
) -> None

Runs the cache operation

Source code in kvdb/io/cachify/base.py
async def arun_cache_operation(self, cache_key: str, value: Any, *args, cachify_kwargs: Dict[str, Any] = None, **kwargs) -> None:
    """
    Runs the cache operation

    By default `_arun_cache_operation` is scheduled as a background task and
    nothing is returned. With `disable_background_tasks` set it is awaited
    inline and its result is returned.
    """
    if not self.disable_background_tasks:
        ThreadPooler.background_task(self._arun_cache_operation, cache_key, value, *args, cachify_kwargs = cachify_kwargs, **kwargs)
        return
    return await self._arun_cache_operation(cache_key, value, *args, cachify_kwargs = cachify_kwargs, **kwargs)

arun_post_call_hook async

arun_post_call_hook(
    result: typing.Any,
    *args,
    is_hit: typing.Optional[bool] = None,
    **kwargs
) -> None

Runs the post call hook which fires after the function is called

Source code in kvdb/io/cachify/base.py
async def arun_post_call_hook(self, result: Any, *args, is_hit: Optional[bool] = None, **kwargs) -> None:
    """
    Runs the post call hook which fires after the function is called

    No-op when no post call hook is configured; otherwise the hook is
    scheduled as a background task with the call's result and arguments.
    """
    if self.has_post_call_hook:
        if self.super_verbose and not self.is_silenced('post_call'):
            logger.info(f'[{self.cache_field}] Running Post Call Hook')
        ThreadPooler.background_task(self.post_call_hook, result, *args, is_hit = is_hit, **kwargs)

arun_post_init_hook async

arun_post_init_hook(
    func: typing.Callable, *args, **kwargs
) -> None

Runs the post init hook which fires once after the function is initialized

Source code in kvdb/io/cachify/base.py
async def arun_post_init_hook(self, func: Callable, *args, **kwargs) -> None:
    """
    Runs the post init hook which fires once after the function is initialized

    No-op when no hook is configured or when it has already run. Otherwise
    the hook is scheduled as a background task and marked as having run.
    """
    if not self.has_post_init_hook or self.has_ran_post_init_hook: return
    if self.verbosity and not self.is_silenced('post_init'):
        logger.info(f'[{self.cache_field}] Running Post Init Hook')
    ThreadPooler.background_task(self.post_init_hook, func, *args, **kwargs)
    self.has_ran_post_init_hook = True

aset async

aset(
    key: str,
    value: typing.Any,
    *args,
    cache_kwargs: typing.Dict[
        str, typing.Union[bool, int, float, typing.Any]
    ] = None,
    **kwargs
) -> None

Sets the value in the cache

Source code in kvdb/io/cachify/base.py
async def aset(self, key: str, value: Any, *args, cache_kwargs: Dict[str, Union[bool, int, float, Any]] = None, **kwargs) -> None:
    """
    Sets the value in the cache

    Stores the encoded `value` under `key` and then registers its
    expiration. The ttl comes from `cache_kwargs['ttl']` when provided and
    truthy, otherwise from `self.ttl`.

    Failures are best-effort: timeouts and other exceptions are logged
    (depending on verbosity) and never raised to the caller.
    """
    try:
        with self.safely():
            await self._set(key, self.encode_hit(value, *args, **kwargs))
            # `cache_kwargs` defaults to None; without the fallback the
            # lookup raised AttributeError, which the handler below swallowed,
            # leaving the key stored without its expiration
            await self.aadd_key_expiration(key, ((cache_kwargs or {}).get('ttl') or self.ttl))
    except TimeoutError:
        if self.super_verbose: logger.error(f'[{self.cache_field}:{key}] Set Timeout')
    except Exception as e:
        if self.verbosity: logger.trace(f'[{self.cache_field}:{key}] Set Exception: {value}', error = e)

ashould_cache_value async

ashould_cache_value(
    val: typing.Any,
    *args,
    cache_kwargs: typing.Dict[str, typing.Any] = None,
    **kwargs
) -> bool

Returns whether or not the value should be cached

Source code in kvdb/io/cachify/base.py
async def ashould_cache_value(self, val: Any,  *args, cache_kwargs: Dict[str, Any] = None, **kwargs) -> bool:
    """
    Returns whether or not the value should be cached

    The value is rejected (False) when any of these holds:
    - `exclude_null` is set and `val` is None
    - `exclude_exceptions` is a list and `val` is an instance of one of its
      types, or it is otherwise truthy and `val` is any `Exception`
    - `exclude_kws` is set and `cache_kwargs['exclude']` is True
    - `exclude_if` is set and returns a truthy result for the value
    """
    if self.exclude_null and val is None: return False
    if self.exclude_exceptions:
        if isinstance(self.exclude_exceptions, list):
            # Only reject on a match; a non-matching value must still go
            # through the `exclude_kws` / `exclude_if` checks below
            if isinstance(val, tuple(self.exclude_exceptions)): return False
        elif isinstance(val, Exception): return False
    # `cache_kwargs` defaults to None, so guard before reading from it
    if self.exclude_kws and cache_kwargs and cache_kwargs.get('exclude') is True: return False
    if self.exclude_if is not None: return not (await ThreadPooler.asyncish(self.exclude_if, val, *args, **kwargs))
    return True

ashould_disable async

ashould_disable(
    *args,
    cache_kwargs: typing.Dict[str, typing.Any] = None,
    **kwargs
) -> bool

Returns whether or not caching should be disabled for this call

Source code in kvdb/io/cachify/base.py
async def ashould_disable(self, *args, cache_kwargs: Dict[str, Any] = None, **kwargs) -> bool:
    """
    Returns whether or not caching should be disabled for this call

    - True when `disabled_kws` is set and `cache_kwargs['disabled']` is True
    - the result of calling `self.disabled` (through `ThreadPooler.asyncish`)
      when it is callable
    - otherwise the truthiness of the static `self.disabled` value
      (None counts as "not disabled")
    """
    # `cache_kwargs` defaults to None, so guard before reading from it
    if self.disabled_kws and cache_kwargs and cache_kwargs.get('disabled') is True: return True
    if callable(self.disabled):
        return await ThreadPooler.asyncish(self.disabled, *args, **kwargs)
    # The original evaluated `self.disabled` as a bare statement without
    # returning it, so a static `disabled = True` was silently ignored
    return bool(self.disabled)

ashould_invalidate async

ashould_invalidate(
    *args,
    cache_kwargs: typing.Dict[str, typing.Any] = None,
    **kwargs
) -> bool

Returns whether or not the function should be invalidated

Source code in kvdb/io/cachify/base.py
async def ashould_invalidate(self, *args, cache_kwargs: Dict[str, Any] = None, **kwargs) -> bool:
    """
    Returns whether or not the function should be invalidated

    Checks, in order:
    - `invalidate_if`: when set, its result decides on its own
    - `invalidate_kws`: True when `cache_kwargs['invalidate']` is True
    - `invalidate_after`: an int is compared against the number of hits
      (`hits >= invalidate_after`); any other value is called with the
      call's arguments plus `_hits`
    """
    if self.invalidate_if is not None: return await ThreadPooler.asyncish(self.invalidate_if, *args, **kwargs)
    # `cache_kwargs` defaults to None, so guard before reading from it
    if self.invalidate_kws and cache_kwargs and cache_kwargs.get('invalidate') is True: return True
    if self.invalidate_after is not None:
        _hits = await self.anum_hits
        if isinstance(self.invalidate_after, int):
            return _hits is not None and _hits >= self.invalidate_after
        return await ThreadPooler.asyncish(self.invalidate_after, *args, _hits = _hits, **kwargs)
    return False

ashould_overwrite async

ashould_overwrite(
    *args,
    cache_kwargs: typing.Dict[str, typing.Any] = None,
    **kwargs
) -> bool

Returns whether or not the value should be overwritten which is based on the overwrite_if function

Source code in kvdb/io/cachify/base.py
async def ashould_overwrite(self, *args, cache_kwargs: Dict[str, Any] = None, **kwargs) -> bool:
    """
    Returns whether or not the value should be overwritten
    which is based on the overwrite_if function

    When `overwrite_if` is set its result decides on its own. Otherwise the
    answer is True only if `overwrite_kws` is set and
    `cache_kwargs['overwrite']` is True.
    """
    if self.overwrite_if is not None:
        return await ThreadPooler.asyncish(self.overwrite_if, *args, **kwargs)
    # `cache_kwargs` defaults to None, so guard before reading from it
    if self.overwrite_kws and cache_kwargs and cache_kwargs.get('overwrite') is True: return True
    return False

avalidate_cache_policies async

avalidate_cache_policies(
    key: str,
    *args,
    cache_kwargs: typing.Dict[
        str, typing.Union[bool, int, float, typing.Any]
    ] = None,
    **kwargs
) -> None

Runs the cache policies

Source code in kvdb/io/cachify/base.py
async def avalidate_cache_policies(self, key: str, *args, cache_kwargs: Dict[str, Union[bool, int, float, Any]] = None, **kwargs) -> None:
    """
    Runs the cache policies

    Always records a cache hit and purges expired keys. The per-key
    bookkeeping (timestamp, key hit) and the max-size policy check only run
    when `hset_enabled` is set and `cache_max_size` is configured.
    """
    await self.aadd_hit()
    await self.aexpire_cache_expired_keys()
    size_limited = self.hset_enabled and self.cache_max_size is not None
    if size_limited:
        await self.aadd_key_timestamp(key)
        await self.aadd_key_hit(key)
        await self.acheck_cache_policies(key, *args, cache_kwargs = cache_kwargs, **kwargs)

build_hash_key

build_hash_key(*args, **kwargs) -> str

Builds the key for the function

Source code in kvdb/io/cachify/base.py
def build_hash_key(self, *args, **kwargs) -> str:
    """
    Builds the key for the function

    Uses the configured `keybuilder` when one is set and falls back to
    `create_cache_key_from_kwargs` otherwise. The builder receives the
    call's args/kwargs and the hashing options taken from this config.
    """
    key_func = self.keybuilder or create_cache_key_from_kwargs
    key_options = {
        'args': args,
        'kwargs': kwargs,
        'typed': self.typed,
        'exclude_keys': self.exclude_keys,
        'exclude_null': self.exclude_null_values_in_hash,
        'exclude_defaults': self.exclude_default_values_in_hash,
        'is_classmethod': self.is_class_method,
    }
    return key_func(**key_options)

build_hash_name

build_hash_name(
    func: typing.Callable, *args, **kwargs
) -> str

Builds the name for the function

Source code in kvdb/io/cachify/base.py
def build_hash_name(self, func: Callable, *args, **kwargs) -> str:
    """
    Builds the name for the function

    The result is stored in `self.cache_field` and reused on later calls.
    - With `self.name` set: a callable name is called with the function and
      its arguments, a plain value is used as-is.
    - Without a name: the function's full name is used, prefixed with
      `self.prefix` when one is configured.
    """
    if self.cache_field is not None:
        return self.cache_field
    if not self.name:
        self.cache_field = full_name(func)
        if self.prefix:
            self.cache_field = f'{self.prefix}:{self.cache_field}'
        return self.cache_field
    if callable(self.name):
        self.cache_field = self.name(func, *args, **kwargs)
    else:
        self.cache_field = self.name
    return self.cache_field

check_cache_policies

check_cache_policies(
    key: str,
    *args,
    cache_kwargs: typing.Dict[
        str, typing.Union[bool, int, float, typing.Any]
    ] = None,
    **kwargs
) -> None

Runs the cache policies

Source code in kvdb/io/cachify/base.py
def check_cache_policies(self, key: str, *args, cache_kwargs: Dict[str, Union[bool, int, float, Any]] = None, **kwargs) -> None:
    # sourcery skip: low-code-quality
    """
    Runs the cache policies

    When the number of keys exceeds `cache_max_size`, evicts the overflow
    according to `cache_max_size_policy`. The key currently being accessed
    (`key`) is never evicted.

    - LRU: evicts the keys with the oldest timestamps
    - LFU: evicts the keys with the fewest key hits
    - FIFO / LIFO: ordering kept as in the original implementation

    `args`, `cache_kwargs` and `kwargs` are accepted for signature
    compatibility and are not used here.
    """
    # Read the key count once so the size check, the log line and the
    # number of evictions all agree
    num_keys = self.num_keys
    if num_keys <= self.cache_max_size: return
    overflow = num_keys - self.cache_max_size
    should_log = self.verbosity and not self.is_silenced('cache_policy')
    if should_log: logger.info(f'[{self.cache_field}] Cache Max Size Reached: {num_keys}/{self.cache_max_size}. Running Cache Policy: {self.cache_max_size_policy}')

    policy = self.cache_max_size_policy
    if policy == CachePolicy.LRU:
        # Least Recently Used: oldest timestamps first
        # (the LRU log tag has no space before the dash; kept byte-identical)
        tag, metric, descending = f'{self.cache_field}- LRU', 'timestamps', False
    elif policy == CachePolicy.LFU:
        # Least Frequently Used: fewest hits first
        tag, metric, descending = f'{self.cache_field} - LFU', 'keyhits', False
    elif policy == CachePolicy.FIFO:
        # First In First Out
        # NOTE(review): newest-first looks inverted for FIFO - confirm the
        # intended ordering before changing it
        tag, metric, descending = f'{self.cache_field} - FIFO', 'timestamps', True
    elif policy == CachePolicy.LIFO:
        # Last In First Out
        # NOTE(review): oldest-first looks inverted for LIFO - confirm the
        # intended ordering before changing it
        tag, metric, descending = f'{self.cache_field} - LIFO', 'timestamps', False
    else:
        return

    ranking = self.data.get(metric, {})
    keys_to_delete = sorted(ranking, key = ranking.get, reverse = descending)[:overflow]
    if key in keys_to_delete: keys_to_delete.remove(key)
    # `clear` treats an empty key list as "clear everything", so bail out
    # instead of wiping the whole cache when nothing is left to evict
    if not keys_to_delete: return
    if should_log: logger.info(f'[{tag}] Deleting {len(keys_to_delete)} Keys: {keys_to_delete}')
    self.clear(keys_to_delete)

clear

clear(
    keys: typing.Union[str, typing.List[str]] = None
) -> typing.Optional[int]

Clears the cache

Source code in kvdb/io/cachify/base.py
def clear(self, keys: Union[str, List[str]] = None) -> Optional[int]:
    """
    Clears the cache

    - With `keys`: removes only those keys. A single key may be passed as a
      plain string.
    - Without `keys` (None or empty): clears `self.data` and removes every
      entry of this cache.

    Returns whatever the underlying client delete call returns, or 0 when a
    full clear in non-hash mode finds no keys to delete.
    """
    # A bare string is one key; splatting it would delete one entry per character
    if keys and isinstance(keys, str): keys = [keys]
    with self.safely():
        if keys:
            if self.hset_enabled: return self.client.hdel(self.cache_field, *keys)
            return self.client.delete(*[self.get_key(k) for k in keys])
        # NOTE(review): an empty list lands here as well and clears everything
        self.data.clear()
        if self.hset_enabled:
            return self.client.delete(self.cache_field)
        keys = self.client.keys(self.get_key(self.cache_field, '*'))
        # Nothing matched: skip the delete call, which needs at least one name
        if not keys: return 0
        return self.client.delete(*keys)

create_async_decorator

create_async_decorator(
    func: typing.Callable[
        kvdb.io.cachify.base.FuncP,
        kvdb.io.cachify.base.FuncT,
    ]
) -> typing.Callable[
    kvdb.io.cachify.base.FuncP,
    typing.Awaitable[kvdb.io.cachify.base.FuncT],
]

Creates the async wrapper

Source code in kvdb/io/cachify/base.py
def create_async_decorator(
    self,
    func: Callable[FuncP, FuncT]
) -> Callable[FuncP, Awaitable[FuncT]]:
    """
    Creates the async wrapper

    Marks this cachify object as async, optionally wraps `func` with the
    configured retry/backoff policy and returns a coroutine function that
    serves results from the cache when possible.

    The returned wrapper also exposes helper callables as attributes
    (`clear`, `num_hits`, `cache_info`, `was_cached`, ...) that operate on
    this cachify object.

    Args:
        func: The coroutine function to wrap.

    Returns:
        The caching wrapper around `func`.
    """
    import inspect

    self.is_async = True
    if self.retry_enabled:
        _retry_func_wrapper = functools.partial(
            backoff.on_exception,
            backoff.expo, 
            exception = Exception, 
            giveup = self.retry_giveup_callable,
            factor = 5,
        )
        func = _retry_func_wrapper(max_tries = self.retry_max_attempts + 1)(func)

    # State shared between the wrapper and the introspection helpers below
    _current_cache_key = None
    _current_was_cached = False

    async def _maybe_await(result: Any) -> Any:
        """
        Awaits the result only when it is awaitable

        `self.clear` and `self.invalidate_cache` may be synchronous methods
        that return a plain value; awaiting such a value raises TypeError.
        """
        return (await result) if inspect.isawaitable(result) else result

    async def is_session_available():
        # Probe the session at most until it has answered once; failures are swallowed
        if self.session_available is None:
            with contextlib.suppress(Exception):
                with anyio.move_on_after(1.0):
                    if await self.session.aping():
                        self.session_available = True
        return self.session_available

    @functools.wraps(func)
    async def wrapper(*args: FuncP.args, **kwargs: FuncP.kwargs) -> FuncT:
        """
        Inner wrapper
        """
        nonlocal _current_cache_key, _current_was_cached
        # Each call starts as "not served from cache"; only the hit path flips it
        _current_was_cached = False
        if not await is_session_available():
            # No session: best-effort in-process cache, then the plain function.
            # NOTE(review): if `func` itself raises inside the suppressed block it
            # is invoked a second time below - confirm this is intended.
            with contextlib.suppress(Exception):
                return await timed_cache(secs = self.ttl)(func)(*args, **kwargs)
            return await func(*args, **kwargs)

        # Set the cache field
        await self.abuild_hash_name(func, *args, **kwargs)
        self.validate_is_class_method(func)
        await self.arun_post_init_hook(func, *args, **kwargs)
        cachify_kwargs, kwargs = self.extract_cache_kwargs(**kwargs)

        # Check if we should disable the cache
        if await self.ashould_disable(*args, cache_kwargs = cachify_kwargs, **kwargs):
            if self.super_verbose and not self.is_silenced('cache_disable', 'cache'): logger.info('Disabling', prefix = self.cache_field, colored = True)
            return await func(*args, **kwargs)

        # Get the cache key
        cache_key = await wrapper.__cache_key__(*args, **kwargs)
        _current_cache_key = cache_key

        # Check if we should invalidate
        if await self.ashould_invalidate(*args, cache_kwargs = cachify_kwargs, **kwargs):
            if self.verbosity and not self.is_silenced('cache_invalidate', 'cache'): logger.info('Invalidating', prefix = f'{self.cache_field}:{cache_key}', colored = True)
            await _maybe_await(self.invalidate_cache(cache_key))

        # Check if we have a cache hit
        value = await self.aretrieve(cache_key, *args, cache_kwargs = cachify_kwargs, **kwargs)
        if value == ENOVAL:
            if self.super_verbose and not self.is_silenced('cache_miss', 'cache'): logger.info('Cache Miss', prefix = f'{self.cache_field}:{cache_key}', colored = True)
            try:
                value = await func(*args, **kwargs)
                # A nested (recursive) call may have recorded a hit; this call was a miss
                _current_was_cached = False
                await self.arun_cache_operation(cache_key, value, *args, cachify_kwargs = cachify_kwargs, **kwargs)
                return value

            except Exception as e:
                if self.verbosity: logger.trace(f'[{self.cache_field}:{cache_key}] Exception', error = e)
                # Bare raise keeps the original traceback intact
                if self.raise_exceptions: raise
                return None

        _current_was_cached = True
        if self.super_verbose and not self.is_silenced('cache_hit', 'cache'): logger.info('Cache Hit', prefix = f'{self.cache_field}:{cache_key}', colored = True)
        await self.arun_post_call_hook(value, *args, is_hit = True, **kwargs)
        return value

    async def __cache_key__(*args, **kwargs) -> str:
        """
        Returns the cache key
        """
        return await self.abuild_hash_key(*args, **kwargs)

    def is_cached() -> bool:
        """
        Returns whether or not the most recently computed cache key exists
        """
        return self._exists(_current_cache_key)

    def was_cached() -> bool:
        """
        Returns whether or not the most recent call was served from the cache
        """
        return _current_was_cached

    async def clear(keys: Optional[Union[str, List[str]]] = None, **kwargs) -> Optional[int]:
        """
        Clears the cache
        """
        return await _maybe_await(self.clear(keys = keys))

    async def num_hits(*args, **kwargs) -> int:
        """
        Returns the number of hits
        """
        return await self.anum_hits

    async def num_keys(**kwargs) -> int:
        """
        Returns the number of keys
        """
        return await self.anum_keys

    async def cache_keys(**kwargs) -> List[str]:
        """
        Returns the keys
        """
        return await self.acache_keys

    async def cache_values(**kwargs) -> List[Any]:
        """
        Returns the values
        """
        return await self.acache_values

    async def cache_items(**kwargs) -> Dict[str, Any]:
        """
        Returns the items
        """
        return await self.acache_items

    async def invalidate_key(key: str, **kwargs) -> int:
        """
        Invalidates the cache
        """
        return await _maybe_await(self.invalidate_cache(key))

    async def cache_timestamps(**kwargs) -> Dict[str, float]:
        """
        Returns the timestamps
        """
        return await self.acache_timestamps

    async def cache_keyhits(**kwargs) -> Dict[str, int]:
        """
        Returns the keyhits
        """
        return await self.acache_keyhits

    async def cache_policy(**kwargs) -> Dict[str, Union[int, CachePolicy]]:
        """
        Returns the cache policy
        """
        return {
            'max_size': self.cache_max_size,
            'max_size_policy': self.cache_max_size_policy,
        }

    async def cache_config(**kwargs) -> Dict[str, Any]:
        """
        Returns the cache config
        """
        values = self.model_dump(exclude = {'session'})
        # Callables are replaced by their name so the config is printable
        for k, v in values.items():
            if callable(v): values[k] = get_function_name(v)
        return values

    async def cache_info(**kwargs) -> Dict[str, Any]:
        """
        Returns the info for the cache
        """
        return await self.acache_info

    async def cache_update(**kwargs) -> Dict[str, Any]:
        """
        Updates the cache config
        """
        self.update(**kwargs)
        return await cache_config(**kwargs)

    # Expose the helpers on the wrapper so callers can reach them via the decorated function
    wrapper.__cache_key__ = __cache_key__
    wrapper.is_cached = is_cached
    wrapper.was_cached = was_cached
    wrapper.clear = clear
    wrapper.num_hits = num_hits
    wrapper.num_keys = num_keys
    wrapper.cache_keys = cache_keys
    wrapper.cache_values = cache_values
    wrapper.cache_items = cache_items
    wrapper.invalidate_key = invalidate_key
    wrapper.cache_timestamps = cache_timestamps
    wrapper.cache_keyhits = cache_keyhits
    wrapper.cache_policy = cache_policy
    wrapper.cache_config = cache_config
    wrapper.cache_info = cache_info
    wrapper.cache_update = cache_update
    return wrapper

create_sync_decorator

create_sync_decorator(
    func: typing.Callable[
        kvdb.io.cachify.base.FuncP,
        kvdb.io.cachify.base.FuncT,
    ]
) -> typing.Callable[
    kvdb.io.cachify.base.FuncP, kvdb.io.cachify.base.FuncT
]

Creates the sync wrapper

Source code in kvdb/io/cachify/base.py
def create_sync_decorator(self, func: Callable[FuncP, FuncT]) -> Callable[FuncP, FuncT]:
    """
    Creates the sync wrapper

    Marks this cachify object as sync, optionally wraps `func` with the
    configured retry/backoff policy and returns a function that serves
    results from the cache when possible.

    The returned wrapper also exposes helper callables as attributes
    (`clear`, `num_hits`, `cache_info`, `was_cached`, ...) that operate on
    this cachify object.

    Args:
        func: The function to wrap.

    Returns:
        The caching wrapper around `func`.
    """
    self.is_async = False
    if self.retry_enabled:
        _retry_func_wrapper = functools.partial(
            backoff.on_exception,
            backoff.expo, 
            exception = Exception, 
            giveup = self.retry_giveup_callable,
            factor = 5,
        )
        func = _retry_func_wrapper(max_tries = self.retry_max_attempts + 1)(func)

    # State shared between the wrapper and the introspection helpers below
    _current_cache_key = None
    _current_was_cached = False

    def is_session_available():
        # Probe the session until it has answered once; failures are swallowed
        if self.session_available is None:
            with contextlib.suppress(Exception):
                self.session.ping()
                self.session_available = True
        return self.session_available

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        """
        Inner wrapper
        """
        nonlocal _current_cache_key, _current_was_cached
        # Each call starts as "not served from cache"; only the hit path flips it
        _current_was_cached = False

        if not is_session_available():
            # No session: best-effort in-process cache, then the plain function.
            # NOTE(review): if `func` itself raises inside the suppressed block it
            # is invoked a second time below - confirm this is intended.
            with contextlib.suppress(Exception):
                return timed_cache(secs = self.ttl)(func)(*args, **kwargs)
            return func(*args, **kwargs)

        # Set the cache field
        self.build_hash_name(func, *args, **kwargs)
        self.validate_is_class_method(func)
        self.run_post_init_hook(func, *args, **kwargs)

        cachify_kwargs, kwargs = self.extract_cache_kwargs(**kwargs)

        # Check if we should disable the cache
        if self.should_disable(*args, cache_kwargs = cachify_kwargs, **kwargs):
            if self.super_verbose and not self.is_silenced('cache_disable', 'cache'): logger.info('Disabling', prefix = self.cache_field, colored = True)
            return func(*args, **kwargs)

        # Get the cache key
        cache_key = wrapper.__cache_key__(*args, **kwargs)
        _current_cache_key = cache_key

        # Check if we should invalidate
        if self.should_invalidate(*args, cache_kwargs = cachify_kwargs, **kwargs):
            if self.verbosity and not self.is_silenced('cache_invalidate', 'cache'): logger.info('Invalidating', prefix = f'{self.cache_field}:{cache_key}', colored = True)
            self.invalidate_cache(cache_key)

        # Check if we have a cache hit
        value = self.retrieve(cache_key, *args, cache_kwargs = cachify_kwargs, **kwargs)
        if value == ENOVAL:
            if self.super_verbose and not self.is_silenced('cache_miss', 'cache'): logger.info('Cache Miss', prefix = f'{self.cache_field}:{cache_key}', colored = True)
            try:
                value = func(*args, **kwargs)
                # A nested (recursive) call may have recorded a hit; this call was a miss
                _current_was_cached = False
                if self.should_cache_value(value, *args, cache_kwargs = cachify_kwargs, **kwargs):
                    if self.super_verbose and not self.is_silenced('cache_value', 'cache'): logger.info('Caching Value', prefix = f'{self.cache_field}:{cache_key}', colored = True)
                    self.set(cache_key, value, *args, cache_kwargs = cachify_kwargs, **kwargs)

                self.run_post_call_hook(value, *args, is_hit = False, **kwargs)
                return value

            except Exception as e:
                if self.verbosity: logger.trace(f'[{self.cache_field}:{cache_key}] Exception', error = e)
                # Bare raise keeps the original traceback intact
                if self.raise_exceptions: raise
                return None
        _current_was_cached = True
        if self.super_verbose and not self.is_silenced('cache_hit', 'cache'): logger.info('Cache Hit', prefix = f'{self.cache_field}:{cache_key}', colored = True)
        self.run_post_call_hook(value, *args, is_hit = True, **kwargs)
        return value

    def __cache_key__(*args, **kwargs) -> str:
        """
        Returns the cache key
        """
        return self.build_hash_key(*args, **kwargs)

    def is_cached() -> bool:
        """
        Returns whether or not the most recently computed cache key exists
        """
        return self._exists(_current_cache_key)

    def was_cached() -> bool:
        """
        Returns whether or not the most recent call was served from the cache
        """
        return _current_was_cached

    def clear(keys: Optional[Union[str, List[str]]] = None, **kwargs) -> Optional[int]:
        """
        Clears the cache
        """
        return self.clear(keys = keys)

    def num_hits(*args, **kwargs) -> int:
        """
        Returns the number of hits
        """
        return self.num_hits

    def num_keys(**kwargs) -> int:
        """
        Returns the number of keys
        """
        return self.num_keys

    def cache_keys(**kwargs) -> List[str]:
        """
        Returns the keys
        """
        return self.cache_keys

    def cache_values(**kwargs) -> List[Any]:
        """
        Returns the values
        """
        return self.cache_values

    def cache_items(**kwargs) -> Dict[str, Any]:
        """
        Returns the items
        """
        return self.cache_items

    def invalidate_key(key: str, **kwargs) -> int:
        """
        Invalidates the cache
        """
        return self.invalidate_cache(key)

    def cache_timestamps(**kwargs) -> Dict[str, float]:
        """
        Returns the timestamps
        """
        return self.cache_timestamps

    def cache_keyhits(**kwargs) -> Dict[str, int]:
        """
        Returns the keyhits
        """
        return self.cache_keyhits

    def cache_policy(**kwargs) -> Dict[str, Union[int, CachePolicy]]:
        """
        Returns the cache policy
        """
        return {
            'max_size': self.cache_max_size,
            'max_size_policy': self.cache_max_size_policy,
        }

    def cache_config(**kwargs) -> Dict[str, Any]:
        """
        Returns the cache config
        """
        values = self.model_dump(exclude = {'session'})
        # Callables are replaced by their name so the config is printable
        for k, v in values.items():
            if callable(v): values[k] = get_function_name(v)
        return values

    def cache_info(**kwargs) -> Dict[str, Any]:
        """
        Returns the info for the cache
        """
        return self.cache_info

    def cache_update(**kwargs) -> Dict[str, Any]:
        """
        Updates the cache config
        """
        self.update(**kwargs)
        return cache_config(**kwargs)

    # Expose the helpers on the wrapper so callers can reach them via the decorated function
    wrapper.__cache_key__ = __cache_key__
    wrapper.is_cached = is_cached
    wrapper.was_cached = was_cached
    wrapper.clear = clear
    wrapper.num_hits = num_hits
    wrapper.num_keys = num_keys
    wrapper.cache_keys = cache_keys
    wrapper.cache_values = cache_values
    wrapper.cache_items = cache_items
    wrapper.invalidate_key = invalidate_key
    wrapper.cache_timestamps = cache_timestamps
    wrapper.cache_keyhits = cache_keyhits
    wrapper.cache_policy = cache_policy
    wrapper.cache_config = cache_config
    wrapper.cache_info = cache_info
    wrapper.cache_update = cache_update
    return wrapper

decode

decode(value: bytes) -> typing.Any

Decodes the value

Source code in kvdb/io/cachify/base.py
def decode(self, value: bytes) -> Any:
    """
    Decodes the value

    The value is handed back untouched when the session already performs
    serialization; otherwise it is passed through `self.decoder`.
    """
    if not self.session.session_serialization_enabled:
        return self.decoder(value)
    return value

decode_hit

decode_hit(value: bytes, *args, **kwargs) -> typing.Any

Decodes the hit

Source code in kvdb/io/cachify/base.py
def decode_hit(self, value: bytes, *args, **kwargs) -> Any:
    """
    Decodes the hit

    Runs `self.decode` on the raw value and, when a `hit_getter` is
    configured, passes the decoded value (plus the call arguments) through it.
    """
    decoded = self.decode(value)
    getter = self.hit_getter
    if getter is None:
        return decoded
    return getter(decoded, *args, **kwargs)

encode

encode(value: typing.Any) -> bytes

Encodes the value

Source code in kvdb/io/cachify/base.py
def encode(self, value: Any) -> bytes:
    """
    Encodes the value

    The value is handed back untouched when the session already performs
    serialization; otherwise it is passed through `self.encoder`.
    """
    if not self.session.session_serialization_enabled:
        return self.encoder(value)
    return value

encode_hit

encode_hit(value: typing.Any, *args, **kwargs) -> bytes

Encodes the hit

Source code in kvdb/io/cachify/base.py
def encode_hit(self, value: Any, *args, **kwargs) -> bytes:
    """
    Encodes the hit

    When a `hit_setter` is configured the value (plus the call arguments)
    is passed through it first; the outcome is then run through `self.encode`.
    """
    setter = self.hit_setter
    prepared = value if setter is None else setter(value, *args, **kwargs)
    return self.encode(prepared)

expire_cache_expired_keys

expire_cache_expired_keys()

Expires the cache keys

Source code in kvdb/io/cachify/base.py
def expire_cache_expired_keys(self):  # sourcery skip: extract-method
    """
    Expires the cache keys

    Collects every key whose recorded expiration lies in the past, drops
    it from the `expirations`, `keyhits` and `timestamps` bookkeeping in
    `self.data`, and finally removes the keys via `self.clear`.
    """
    with self.safely():
        expirations = self.data.get('expirations', {})
        if not isinstance(expirations, dict): expirations = {}

        expired = []
        for name, deadline in expirations.items():
            if time.time() > deadline: expired.append(name)
        if not expired: return

        # Anything that is not a dict is replaced by a fresh, empty mapping
        keyhits = self.data.get('keyhits', {})
        if not isinstance(keyhits, dict): keyhits = {}
        timestamps = self.data.get('timestamps', {})
        if not isinstance(timestamps, dict): timestamps = {}

        for name in expired:
            keyhits.pop(name, None)
            timestamps.pop(name, None)
            expirations.pop(name, None)

        # Write the pruned mappings back into the store
        self.data['expirations'] = expirations
        self.data['keyhits'] = keyhits
        self.data['timestamps'] = timestamps
        if self.verbosity and not self.is_silenced('expire'):
            logger.info(f'[{self.cache_field}] Deleting {len(expired)} Expired Keys: {expired}')
        self.clear(expired)
extract_cache_kwargs

extract_cache_kwargs(
    **kwargs,
) -> typing.Tuple[
    typing.Dict[
        str, typing.Union[bool, int, float, typing.Any]
    ],
    typing.Dict[str, typing.Any],
]

Extracts the cache kwargs from the kwargs

Returns the cache kwargs and the remaining kwargs

Source code in kvdb/io/cachify/base.py
def extract_cache_kwargs(self, **kwargs) -> Tuple[Dict[str, Union[bool, int, float, Any]], Dict[str, Any]]:
    """
    Extracts the cache kwargs from the kwargs

    For each control group (disabled, invalidate, overwrite, ttl, exclude)
    the first configured keyword alias present in `kwargs` is popped and
    stored under the group's name; any further aliases are left in place.

    Returns the cache kwargs and the remaining kwargs
    """
    extracted = {}
    # (result key, attribute holding the accepted keyword aliases) - order matters
    groups = (
        ('disabled', 'disabled_kws'),
        ('invalidate', 'invalidate_kws'),
        ('overwrite', 'overwrite_kws'),
        ('ttl', 'ttl_kws'),
        ('exclude', 'exclude_kws'),
    )
    for target, attr in groups:
        aliases = getattr(self, attr)
        if not aliases: continue
        found = next((kw for kw in aliases if kw in kwargs), None)
        if found is not None:
            extracted[target] = kwargs.pop(found)
    return extracted, kwargs

extract_config_and_kwargs classmethod

extract_config_and_kwargs(
    _prefix: typing.Optional[str] = None,
    _include_prefix: typing.Optional[bool] = None,
    _include: typing.Optional[set[str]] = None,
    _exclude: typing.Optional[set[str]] = None,
    _exclude_none: typing.Optional[bool] = True,
    **kwargs
) -> typing.Tuple[
    typing.Dict[str, typing.Any],
    typing.Dict[str, typing.Any],
]

Extracts the config values that are valid for this model and returns both the config and the remaining kwargs

RETURNS DESCRIPTION
typing.Tuple[typing.Dict[str, typing.Any], typing.Dict[str, typing.Any]]

Tuple[Dict[str, Any], Dict[str, Any]]: The config and deduplicated kwargs

Source code in kvdb/types/base.py
@classmethod
def extract_config_and_kwargs(
    cls,
    _prefix: Optional[str] = None,
    _include_prefix: Optional[bool] = None,
    _include: Optional[set[str]] = None,
    _exclude: Optional[set[str]] = None,
    _exclude_none: Optional[bool] = True,
    **kwargs
) -> Tuple[Dict[str, Any], Dict[str, Any]]:
    """
    Splits the kwargs into the model config and the leftover kwargs

    The config is whatever `extract_kwargs` selects. A keyword is dropped
    from the leftovers when its name - or, if a prefix is given, its
    prefixed name - is a key of that config.

    Returns:
        Tuple[Dict[str, Any], Dict[str, Any]]: The config and deduplicated kwargs
    """
    config = cls.extract_kwargs(
        _prefix = _prefix,
        _include_prefix = _include_prefix,
        _include = _include,
        _exclude = _exclude,
        _exclude_none = _exclude_none,
        **kwargs
    )
    leftovers = {}
    for name, val in kwargs.items():
        if name in config: continue
        if _prefix and f'{_prefix}{name}' in config: continue
        leftovers[name] = val
    return config, leftovers

extract_kwargs classmethod

extract_kwargs(
    _prefix: typing.Optional[str] = None,
    _include_prefix: typing.Optional[bool] = None,
    _include: typing.Optional[set[str]] = None,
    _exclude: typing.Optional[set[str]] = None,
    _exclude_none: typing.Optional[bool] = True,
    **kwargs
) -> typing.Dict[str, typing.Any]

Extract the kwargs that are valid for this model

Source code in kvdb/types/base.py
@classmethod
def extract_kwargs(
    cls, 
    _prefix: Optional[str] = None,
    _include_prefix: Optional[bool] = None,
    _include: Optional[set[str]] = None,
    _exclude: Optional[set[str]] = None,
    _exclude_none: Optional[bool] = True,
    **kwargs
) -> Dict[str, Any]:
    """
    Extract the kwargs that are valid for this model

    Args:
        _prefix: When given, only keywords starting with this prefix are
            considered, and the remainder of the name must be a model field.
        _include_prefix: Keep the prefix on the returned keys instead of
            stripping it.
        _include: NOTE(review): this option has never changed the result -
            the previous code only merged already-selected keys back into
            the selection. Confirm the intended behaviour before relying on it.
        _exclude: Keys to drop from the result.
        _exclude_none: Drop entries whose value is `None`.

    Returns:
        The selected keyword arguments.
    """
    if _prefix:
        _kwargs = {}
        for k, v in kwargs.items():
            if not k.startswith(_prefix): continue
            # Strip only the leading prefix; `str.replace` would also remove
            # later occurrences inside the field name.
            field = k[len(_prefix):]
            if field not in cls.__fields__: continue
            _kwargs[k if _include_prefix else field] = v
    else:
        _kwargs = {k: v for k, v in kwargs.items() if k in cls.__fields__}
    if _exclude_none: _kwargs = {k: v for k, v in _kwargs.items() if v is not None}
    if _exclude is not None: _kwargs = {k: v for k, v in _kwargs.items() if k not in _exclude}
    return _kwargs

get_encoder

get_encoder(
    serializer: typing.Optional[str] = None,
    serializer_enabled: typing.Optional[bool] = True,
    serializer_kwargs: typing.Optional[
        typing.Dict[str, typing.Any]
    ] = None,
    compression: typing.Optional[str] = None,
    compression_level: typing.Optional[int] = None,
    raise_errors: typing.Optional[bool] = False,
    encoding: typing.Optional[str] = None,
    decode_responses: typing.Optional[bool] = None,
    **kwargs
) -> "Encoder"

Returns the encoder

Source code in kvdb/configs/base.py
def get_encoder(
    self,
    serializer: Optional[str] = None,
    serializer_enabled: Optional[bool] = True,
    serializer_kwargs: Optional[Dict[str, Any]] = None,
    compression: Optional[str] = None,
    compression_level: Optional[int] = None,
    raise_errors: Optional[bool] = False,
    encoding: Optional[str] = None,
    decode_responses: Optional[bool] = None,
    **kwargs
) -> 'Encoder':
    """
    Returns the encoder

    A serializer is built through `self.get_serializer` unless
    `serializer_enabled` is falsy, in which case the encoder receives no
    serializer. `encoding` and `decode_responses` fall back to the values
    stored on this config when they are not supplied.
    """
    _serializer = None
    if serializer_enabled:
        # The caller's raw `encoding` is forwarded; get_serializer applies its own default
        _serializer = self.get_serializer(
            serializer = serializer,
            serializer_kwargs = serializer_kwargs,
            compression = compression,
            compression_level = compression_level,
            raise_errors = raise_errors,
            encoding = encoding,
            **kwargs
        )
    from kvdb.io.encoder import Encoder
    if encoding is None: encoding = self.encoding
    if decode_responses is None: decode_responses = self.decode_responses
    return Encoder(
        encoding = encoding,
        serializer = _serializer,
        decode_responses = decode_responses,
    )

get_key

get_key(key: str) -> str

Gets the Key

Source code in kvdb/io/cachify/base.py
def get_key(self, key: str) -> str:
    """
    Gets the Key

    With hset storage the key is used as-is; otherwise it is prefixed
    with the cache field and a colon.
    """
    if self.hset_enabled:
        return key
    return f'{self.cache_field}:{key}'

get_serializer

get_serializer(
    serializer: typing.Optional[str] = None,
    serializer_kwargs: typing.Optional[
        typing.Dict[str, typing.Any]
    ] = None,
    compression: typing.Optional[str] = None,
    compression_level: typing.Optional[int] = None,
    raise_errors: typing.Optional[bool] = False,
    encoding: typing.Optional[str] = None,
    **kwargs
) -> "SerializerT"

Returns the serializer

Source code in kvdb/configs/base.py
def get_serializer(
    self,
    serializer: Optional[str] = None,
    serializer_kwargs: Optional[Dict[str, Any]] = None,
    compression: Optional[str] = None,
    compression_level: Optional[int] = None,
    raise_errors: Optional[bool] = False,
    encoding: Optional[str] = None,
    **kwargs
) -> 'SerializerT':
    """
    Returns the serializer

    Every argument left as `None` falls back to the value stored on this
    config. `compression` / `compression_level` entries found inside
    `serializer_kwargs` take precedence over the explicit arguments.

    Returns:
        The serializer, or `None` when no serializer name is configured.
    """
    serializer = self.serializer if serializer is None else serializer
    if serializer is None:
        return None
    from kvdb.io.serializers import get_serializer
    if serializer_kwargs is None: serializer_kwargs = self.serializer_kwargs
    # Work on a copy: popping from the config's (or the caller's) dict would
    # silently drop the compression settings for every later call.
    serializer_kwargs = dict(serializer_kwargs) if serializer_kwargs else {}
    if 'compression' in serializer_kwargs:
        compression = serializer_kwargs.pop('compression')
    if 'compression_level' in serializer_kwargs:
        compression_level = serializer_kwargs.pop('compression_level')
    compression = self.compression if compression is None else compression
    compression_level = self.compression_level if compression_level is None else compression_level
    encoding = self.encoding if encoding is None else encoding
    return get_serializer(
        serializer = serializer,
        serializer_kwargs = serializer_kwargs,
        compression = compression,
        compression_level = compression_level,
        encoding = encoding,
        raise_errors = raise_errors,
        **kwargs
    )

invalidate_cache

invalidate_cache(key: str) -> int

Invalidates the cache

Source code in kvdb/io/cachify/base.py
def invalidate_cache(self, key: str) -> int:
    """
    Invalidates the cache

    Removes the key from the `keyhits`, `timestamps` and `expirations`
    bookkeeping (where present), flushes `self.data` and deletes the
    cached entry itself.

    Returns:
        The result of `self._delete(key)`.
    """
    # (attribute used for the membership check, field in self.data to prune)
    tracked = (
        ('cache_keyhits', 'keyhits'),
        ('cache_timestamps', 'timestamps'),
        ('cache_expirations', 'expirations'),
    )
    with self.safely():
        for view_name, field in tracked:
            if key in getattr(self, view_name):
                self.data[field].pop(key, None)
        self.data.flush()
        return self._delete(key)

is_silenced

is_silenced(*stages: str) -> bool

Returns whether or not the stage is silenced

Source code in kvdb/io/cachify/base.py
def is_silenced(self, *stages: str) -> bool:
    """
    Returns whether or not the stage is silenced

    True as soon as any of the given stages appears in
    `self.silenced_stages`; False when nothing is silenced at all.
    """
    if self.silenced_stages:
        for stage in stages:
            if stage in self.silenced_stages:
                return True
    return False

model_dump

model_dump(
    *,
    mode: str = "python",
    include: (
        set[int]
        | set[str]
        | dict[int, typing.Any]
        | dict[str, typing.Any]
        | None
    ) = None,
    exclude: (
        set[int]
        | set[str]
        | dict[int, typing.Any]
        | dict[str, typing.Any]
        | None
    ) = None,
    by_alias: bool = False,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False,
    round_trip: bool = False,
    warnings: bool = True
) -> typing.Dict[str, typing.Any]

[v1] Support for model_dump

Source code in kvdb/types/base.py
def model_dump(self, *, mode: str = 'python', include: set[int] | set[str] | dict[int, Any] | dict[str, Any] | None = None, exclude: set[int] | set[str] | dict[int, Any] | dict[str, Any] | None = None, by_alias: bool = False, exclude_unset: bool = False, exclude_defaults: bool = False, exclude_none: bool = False, round_trip: bool = False, warnings: bool = True) -> Dict[str, Any]:
    """
    [v1] Support for model_dump
    """
    return self.dict(
        include = include,
        exclude = exclude,
        by_alias = by_alias,
        exclude_unset = exclude_unset,
        exclude_defaults = exclude_defaults,
        exclude_none = exclude_none,

    )

model_validate classmethod

model_validate(
    value: typing.Any, *args, **kwargs
) -> "BaseModel"

[v1] Support for model_validate

Source code in kvdb/types/base.py
@classmethod
def model_validate(cls: type['BaseModel'], value: Any, *args, **kwargs) -> 'BaseModel':
    """
    [v1] Support for model_validate

    Delegates to `parse_obj`; additional positional and keyword arguments
    are accepted but ignored.
    """
    parsed = cls.parse_obj(value)
    return parsed

model_validate_json classmethod

model_validate_json(
    value: typing.Any, *args, **kwargs
) -> "BaseModel"

[v1] Support for model_validate_json

Source code in kvdb/types/base.py
@classmethod
def model_validate_json(cls: type['BaseModel'], value: Any, *args, **kwargs) -> 'BaseModel':
    """
    [v1] Support for model_validate_json

    Delegates to `parse_raw`; additional positional and keyword arguments
    are accepted but ignored.
    """
    parsed = cls.parse_raw(value)
    return parsed

retrieve

retrieve(
    key: str,
    *args,
    cache_kwargs: typing.Dict[
        str, typing.Union[bool, int, float, typing.Any]
    ] = None,
    **kwargs
) -> typing.Any

Retrieves the value from the cache

Source code in kvdb/io/cachify/base.py
def retrieve(self, key: str, *args, cache_kwargs: Dict[str, Union[bool, int, float, Any]] = None, **kwargs) -> Any:
    """
    Retrieves the value from the cache

    Returns `ENOVAL` when the entry must be overwritten, is missing, could
    not be fetched, or cannot be decoded into a non-None value; otherwise
    returns the decoded value.
    """
    if self.should_overwrite(*args, cache_kwargs = cache_kwargs, **kwargs):
        if self.super_verbose and not self.is_silenced('cache_overwrite', 'cache', 'retrieve'):
            logger.info(f'[{self.cache_field}:{key}] Overwriting Cache')
        return ENOVAL

    raw = None
    try:
        with self.safely():
            if not self._exists(key):
                if self.super_verbose and not self.is_silenced('cache_miss', 'cache', 'retrieve'):
                    logger.info(f'[{self.cache_field}:{key}] Not Found')
                return ENOVAL
            raw = self._get(key)
        # Still None here means nothing was fetched inside the guarded block
        if raw is None: return ENOVAL

    except TimeoutError:
        if self.super_verbose and not self.is_silenced('retrieve'):
            logger.error(f'[{self.cache_field}:{key}] Retrieve Timeout')
        return ENOVAL

    except Exception as exc:
        if self.verbosity:
            logger.trace(f'[{self.cache_field}:{key}] Retrieve Exception', error = exc)
        return ENOVAL

    # Policies are validated only once a raw value has actually been fetched
    self.validate_cache_policies(key, *args, cache_kwargs = cache_kwargs, **kwargs)
    try:
        decoded = self.decode_hit(raw, *args, **kwargs)
    except Exception as exc:
        if self.verbosity:
            logger.trace(f'[{self.cache_field}:{key}] Decode Exception', error = exc)
        return ENOVAL
    return ENOVAL if decoded is None else decoded

run_post_call_hook

run_post_call_hook(
    result: typing.Any,
    *args,
    is_hit: typing.Optional[bool] = None,
    **kwargs
) -> None

Runs the post call hook which fires after the function is called

Source code in kvdb/io/cachify/base.py
def run_post_call_hook(self, result: Any, *args, is_hit: Optional[bool] = None, **kwargs) -> None:
    """
    Runs the post call hook which fires after the function is called

    Does nothing when no post call hook is configured. Otherwise the hook
    receives the result, the call arguments and whether it was a cache hit.
    """
    if self.has_post_call_hook:
        if self.super_verbose and not self.is_silenced('post_call'):
            logger.info(f'[{self.cache_field}] Running Post Call Hook')
        self.post_call_hook(result, *args, is_hit = is_hit, **kwargs)

run_post_init_hook

run_post_init_hook(
    func: typing.Callable, *args, **kwargs
) -> None

Runs the post init hook which fires once after the function is initialized

Source code in kvdb/io/cachify/base.py
def run_post_init_hook(self, func: Callable, *args, **kwargs) -> None:
    """
    Runs the post init hook which fires once after the function is initialized

    Skipped when no hook is configured or when it has already run; after a
    successful run `has_ran_post_init_hook` is set so it never fires again.
    """
    if not self.has_post_init_hook or self.has_ran_post_init_hook:
        return
    if self.verbosity and not self.is_silenced('post_init'):
        logger.info(f'[{self.cache_field}] Running Post Init Hook')
    self.post_init_hook(func, *args, **kwargs)
    self.has_ran_post_init_hook = True

safely

safely()

Context manager that runs the enclosed block under the cache's configured timeout

Source code in kvdb/io/cachify/base.py
@contextlib.contextmanager
def safely(self):
    """
    Runs the enclosed block under this cache's timeout guard

    Uses `anyio.move_on_after` when the cachify object is async and an
    async loop is available; otherwise falls back to the `timeout` helper
    with the timeout truncated to an int and `raise_errors` disabled.
    """
    if self.is_async and self.has_async_loop:
        guard = anyio.move_on_after(self.timeout)
    else:
        guard = timeout(int(self.timeout), raise_errors = False)
    with guard:
        yield

set

set(
    key: str,
    value: typing.Any,
    *args,
    cache_kwargs: typing.Dict[
        str, typing.Union[bool, int, float, typing.Any]
    ] = None,
    **kwargs
) -> None

Sets the value in the cache

Source code in kvdb/io/cachify/base.py
def set(self, key: str, value: Any, *args, cache_kwargs: Optional[Dict[str, Union[bool, int, float, Any]]] = None, **kwargs) -> None:
    """
    Sets the value in the cache

    - `key`: the cache key to write
    - `value`: the value, which is passed through `encode_hit` (with `*args` / `**kwargs`) before being stored
    - `cache_kwargs`: optional per-call overrides; a truthy `ttl` entry takes precedence over `self.ttl`

    Timeouts and any other exceptions are logged (depending on verbosity) and swallowed,
    so a failed write never propagates to the caller.
    """
    # `cache_kwargs` defaults to None; without this guard the `.get` below raised an
    # AttributeError that was swallowed, leaving the key stored without an expiration.
    ttl = (cache_kwargs or {}).get('ttl') or self.ttl
    try:
        with self.safely():
            self._set(key, self.encode_hit(value, *args, **kwargs))
            self.add_key_expiration(key, ttl)
    except TimeoutError:
        if self.super_verbose: logger.error(f'[{self.cache_field}:{key}] Set Timeout')
    except Exception as e:
        if self.verbosity: logger.trace(f'[{self.cache_field}:{key}] Set Exception: {value}', error = e)

should_cache_value

should_cache_value(
    val: typing.Any,
    *args,
    cache_kwargs: typing.Dict[str, typing.Any] = None,
    **kwargs
) -> bool

Returns whether or not the value should be cached

Source code in kvdb/io/cachify/base.py
def should_cache_value(self, val: Any, *args, cache_kwargs: Optional[Dict[str, Any]] = None, **kwargs) -> bool:
    """
    Returns whether or not the value should be cached

    The checks run in order and the first one that excludes the value wins:

    - `exclude_null`: `None` values are not cached
    - `exclude_exceptions`: a list restricts the exclusion to instances of those types,
      any other truthy value excludes every `Exception` instance
    - `exclude_kws`: the caller asked for exclusion via `cache_kwargs['exclude'] is True`
    - `exclude_if`: a callable receiving `(val, *args, **kwargs)`; a truthy result excludes the value
    """
    if self.exclude_null and val is None: return False
    if self.exclude_exceptions:
        # A list narrows the exclusion to specific types. Only bail out on a match so
        # that the remaining checks (`exclude_kws`, `exclude_if`) are still consulted.
        if isinstance(self.exclude_exceptions, list):
            if isinstance(val, tuple(self.exclude_exceptions)): return False
        elif isinstance(val, Exception): return False
    # `cache_kwargs` defaults to None, so guard before reading from it
    if self.exclude_kws and (cache_kwargs or {}).get('exclude') is True: return False
    if self.exclude_if is not None: return not self.exclude_if(val, *args, **kwargs)
    return True

should_disable

should_disable(
    *args,
    cache_kwargs: typing.Dict[str, typing.Any] = None,
    **kwargs
) -> bool

Returns whether or not cache should be disabled for the function

Source code in kvdb/io/cachify/base.py
def should_disable(self, *args, cache_kwargs: Optional[Dict[str, Any]] = None, **kwargs) -> bool:
    """
    Returns whether or not cache should be disabled for the function

    - `self.disabled is True` disables the cache unconditionally
    - otherwise `cache_kwargs['disabled'] is True` disables it when `disabled_kws` is configured
    - otherwise a callable `self.disabled` is invoked with `(*args, **kwargs)` and its result returned
    - in every other case the cache stays enabled
    """
    # The original statement here was a bare expression with no effect, so a boolean
    # `disabled = True` fell through to the final line and was reported as "not disabled".
    if self.disabled is True: return True
    # `cache_kwargs` defaults to None, so guard before reading from it
    if self.disabled_kws and (cache_kwargs or {}).get('disabled') is True: return True
    return self.disabled(*args, **kwargs) if callable(self.disabled) else False

should_invalidate

should_invalidate(
    *args,
    _hits: typing.Optional[int] = None,
    cache_kwargs: typing.Dict[str, typing.Any] = None,
    **kwargs
) -> bool

Returns whether or not the function should be invalidated

Source code in kvdb/io/cachify/base.py
def should_invalidate(self, *args, _hits: Optional[int] = None, cache_kwargs: Optional[Dict[str, Any]] = None, **kwargs) -> bool:
    """
    Returns whether or not the function should be invalidated

    - `invalidate_if`: when configured, its result for `(*args, **kwargs)` is returned directly
    - `invalidate_kws`: `cache_kwargs['invalidate'] is True` forces invalidation
    - `invalidate_after`: an `int` invalidates once `_hits` reaches that count;
      any other value is called with `(*args, _hits = _hits, **kwargs)`
    """
    if self.invalidate_if is not None: return self.invalidate_if(*args, **kwargs)
    # `cache_kwargs` defaults to None, so guard before reading from it
    if self.invalidate_kws and (cache_kwargs or {}).get('invalidate') is True: return True
    if self.invalidate_after is not None:
        if isinstance(self.invalidate_after, int):
            # An unknown hit count never triggers a count-based invalidation
            return _hits is not None and _hits >= self.invalidate_after
        return self.invalidate_after(*args, _hits = _hits, **kwargs)
    return False

should_overwrite

should_overwrite(
    *args,
    cache_kwargs: typing.Dict[str, typing.Any] = None,
    **kwargs
) -> bool

Returns whether or not the value should be overwritten, based on the overwrite_if function or the overwrite keyword in cache_kwargs

Source code in kvdb/io/cachify/base.py
def should_overwrite(self, *args, cache_kwargs: Optional[Dict[str, Any]] = None, **kwargs) -> bool:
    """
    Returns whether or not the value should be overwritten

    - `overwrite_if`: when configured, its result for `(*args, **kwargs)` is returned directly
    - `overwrite_kws`: otherwise `cache_kwargs['overwrite'] is True` forces an overwrite
    """
    if self.overwrite_if is not None: return self.overwrite_if(*args, **kwargs)
    # `cache_kwargs` defaults to None, so guard before reading from it
    if self.overwrite_kws and (cache_kwargs or {}).get('overwrite') is True: return True
    return False

update

update(**kwargs)

Validates and updates the kwargs

Source code in kvdb/io/cachify/base.py
def update(self, **kwargs):
    """
    Runs the kwargs through `validate_kws` (as an update) and applies them

    Only attributes that already exist on the object are assigned; unknown keys are ignored.
    """
    validated = self.validate_kws(kwargs, is_update = True)
    for attr_name in validated:
        if hasattr(self, attr_name):
            setattr(self, attr_name, validated[attr_name])

update_config

update_config(**kwargs)

Updates the config with the given settings, recursing into nested models

Source code in kvdb/types/base.py
def update_config(self, **kwargs):
    """
    Applies the given settings to the config

    - Keys that are not existing attributes are ignored
    - When the current attribute is a `BaseModel`, it is updated through its own
      `update_config` if it has one, otherwise rebuilt from the given mapping
    - Any other attribute is simply replaced
    """
    for name, incoming in kwargs.items():
        if not hasattr(self, name): continue
        current: 'BaseModel' = getattr(self, name)
        if not isinstance(current, BaseModel):
            setattr(self, name, incoming)
            continue
        if hasattr(current, 'update_config'):
            current.update_config(**incoming)
        else:
            current = current.__class__(**incoming)
        setattr(self, name, current)

validate_attrs

validate_attrs(
    values: typing.Dict[str, typing.Any]
) -> typing.Dict[str, typing.Any]

Validates the attributes

Source code in kvdb/io/cachify/base.py
@root_validator(pre = True)
def validate_attrs(cls, values: Dict[str, Any]) -> Dict[str, Any]:
    """
    Pre-validation hook that passes the raw values through `validate_kws`
    """
    validated = cls.validate_kws(values)
    return validated

validate_cache_policies

validate_cache_policies(
    key: str,
    *args,
    cache_kwargs: typing.Dict[
        str, typing.Union[bool, int, float, typing.Any]
    ] = None,
    **kwargs
) -> None

Runs the cache policies

Source code in kvdb/io/cachify/base.py
def validate_cache_policies(self, key: str, *args, cache_kwargs: Dict[str, Union[bool, int, float, Any]] = None, **kwargs) -> None:
    """
    Records the hit, expires stale keys and enforces the cache size policy

    The per-key bookkeeping and the policy check only run when hset is enabled
    and a `cache_max_size` is configured.
    """
    self.add_hit()
    self.expire_cache_expired_keys()
    size_limited = self.hset_enabled and self.cache_max_size is not None
    if size_limited:
        self.add_key_hit(key)
        self.add_key_timestamp(key)
        self.check_cache_policies(key, *args, cache_kwargs = cache_kwargs, **kwargs)

validate_callable classmethod

validate_callable(
    v: typing.Optional[
        typing.Union[str, int, typing.Callable]
    ]
) -> typing.Optional[
    typing.Union[typing.Callable, typing.Any]
]

Validates the callable

Source code in kvdb/io/cachify/base.py
@classmethod
def validate_callable(cls, v: Optional[Union[str, int, Callable]]) -> Optional[Union[Callable, Any]]:
    """
    Resolves string values through `lazy_import`; every other value is returned unchanged
    """
    if isinstance(v, str):
        return lazy_import(v)
    return v

validate_is_class_method

validate_is_class_method(func: typing.Callable)

Validates if the function is a class method

Source code in kvdb/io/cachify/base.py
def validate_is_class_method(self, func: Callable):
    """
    Determines once whether `func` is a class method and stores it on `is_class_method`

    An already-set `is_class_method` is left untouched.
    """
    if self.is_class_method is None:
        has_class = hasattr(func, '__class__') and inspect.isclass(func.__class__)
        self.is_class_method = has_class and is_classmethod(func)

validate_kws classmethod

validate_kws(
    values: typing.Dict[str, typing.Any],
    is_update: typing.Optional[bool] = None,
) -> typing.Dict[str, typing.Any]

Validates the config

Source code in kvdb/io/cachify/base.py
@classmethod
def validate_kws(cls, values: Dict[str, Any], is_update: Optional[bool] = None) -> Dict[str, Any]:
    """
    Validates and normalizes the config values in place and returns them

    - `serializer`: a string is resolved through `get_serializer` (using `serializer_kwargs`,
      `compression` and `compression_level`); the result must be a `BaseSerializer` and its
      `dumps` / `loads` become the `encoder` / `decoder`
    - callable-like keys (hooks, `keybuilder`, `encoder`, ...) are passed through `validate_callable`
    - `encoder` / `decoder` that are not callable may be objects exposing a callable
      `dumps` / `loads`, which is used instead
    - `cache_max_size` is cast to `int` (falsy values become `None`) and
      `cache_max_size_policy` to `CachePolicy`, defaulting to LFU when a max size is
      given without a policy outside of an update

    Raises:
        ValueError: if the serializer is not a `BaseSerializer`, or an encoder/decoder
            is neither callable nor exposes the expected callable method
    """
    _validated_serializer = False
    if 'serializer' in values:
        serializer = values.pop('serializer')
        from kvdb.io.serializers import get_serializer, BaseSerializer
        if isinstance(serializer, str):
            # Copy so the caller's dict is not mutated, and tolerate an explicit None
            serializer_kwargs = dict(values.pop('serializer_kwargs', None) or {})
            if 'compression' not in serializer_kwargs:
                serializer_kwargs['compression'] = values.pop('compression', None)
            if 'compression_level' not in serializer_kwargs:
                serializer_kwargs['compression_level'] = values.pop('compression_level', None)
            serializer = get_serializer(serializer, **serializer_kwargs)
        if not isinstance(serializer, BaseSerializer):
            raise ValueError('`serializer` must be an instance of `BaseSerializer`')
        values['encoder'] = serializer.dumps
        values['decoder'] = serializer.loads
        _validated_serializer = True

    # NOTE(review): 'name' is resolved like the callables below, so a string name is
    # handed to `lazy_import` - confirm that this is intended.
    for key in (
        'name',
        'keybuilder',
        'encoder',
        'decoder',
        'hit_setter',
        'hit_getter',
        'disabled',
        'invalidate_if',
        'invalidate_after',
        'overwrite_if',
        'exclude_if',
        # 'bypass_if',
        'post_init_hook',
        'post_call_hook',
    ):
        if key not in values: continue
        values[key] = cls.validate_callable(values[key])
        if key not in {'encoder', 'decoder'}: continue
        # Encoder/decoder taken from a validated serializer need no further checks
        if _validated_serializer or callable(values[key]): continue
        func_value = 'loads' if key == 'decoder' else 'dumps'
        # Use `callable` rather than `inspect.isfunction`: builtins such as `pickle.dumps`
        # and bound methods are valid encoders but are not plain Python functions.
        method = getattr(values[key], func_value, None)
        if not callable(method):
            raise ValueError(f'`{key}` must be callable or have a callable "{func_value}" method')
        values[key] = method

    if 'cache_max_size' in values:
        values['cache_max_size'] = int(values['cache_max_size']) if values['cache_max_size'] else None
    if 'cache_max_size_policy' in values:
        values['cache_max_size_policy'] = CachePolicy(values['cache_max_size_policy'])
    elif 'cache_max_size' in values and not is_update:
        # A max size without an explicit policy defaults to LFU on initial validation only
        values['cache_max_size_policy'] = CachePolicy.LFU
    return values

validate_serializer

validate_serializer()

Validate the serializer config

Source code in kvdb/configs/base.py
@model_validator(mode = 'after')
def validate_serializer(self):
    """
    Fills in the serializer defaults after the model is validated

    - When neither `compression` nor `compression_level` is set, both are taken from
      `get_default_compression`, and the chosen default is logged unless
      `temp_data.has_logged('kvdb_default_compression')` reports it was logged before
    - A missing `serializer_kwargs` becomes an empty dict
    """
    needs_default = self.compression is None and self.compression_level is None
    if needs_default:
        from kvdb.io.compression import get_default_compression
        self.compression, self.compression_level = get_default_compression(enabled = self.compression_enabled)
        if self.compression:
            if not temp_data.has_logged('kvdb_default_compression'):
                logger.info(f'Setting default compression to {self.compression} with level {self.compression_level}')
    if self.serializer_kwargs is None:
        self.serializer_kwargs = {}
    return self

validateself_config

validateself_config()

Validates the cachify config

Source code in kvdb/io/cachify/base.py
@model_validator(mode = 'after')
def validateself_config(self):
    """
    Finalizes the cachify config after the model is validated

    - Falls back to the serializer's `dumps` / `loads` when the encoder or decoder is missing
    - Prepends `kwarg_override_prefix` to every configured override keyword
    - Attaches the global kvdb settings
    """
    if self.encoder is None or self.decoder is None:
        fallback = self.get_serializer()
        self.encoder = fallback.dumps
        self.decoder = fallback.loads
    if self.kwarg_override_prefix:
        # Same treatment for every override keyword list; empty/unset lists are left as-is
        for field_name in ('disabled_kws', 'invalidate_kws', 'overwrite_kws', 'ttl_kws', 'exclude_kws'):
            if getattr(self, field_name):
                setattr(self, field_name, [f'{self.kwarg_override_prefix}{kw}' for kw in getattr(self, field_name)])
    from kvdb.configs import settings
    self.settings = settings
    # self.has_async_loop = self.settings.is_in_async_loop()
    return self