Skip to content

DataFrame Cache

dataframe_cache

DataFrame Cache - Pandas DataFrame Caching.

Provides specialized caching for pandas DataFrames with compression support for memory efficiency.

Classes:

Name Description
DataFrameCache

Specialized cache for pandas DataFrames with gzip compression

Classes

DataFrameCache

DataFrameCache(max_size: int = 50, default_ttl: int = 3600, compress_threshold: int = 1024 * 1024, compression_level: int = 6)

Bases: MemoryCache

Specialized cache for pandas DataFrames.

Provides gzip + pickle compression (applied automatically to DataFrames larger than a size threshold), DataFrame-specific cache-key generation, and memory-usage tracking.

Attributes:

Name Type Description
compress_threshold int

DataFrame size (bytes) above which compression is used

compression_level int

Gzip compression level (1-9)

Initialize DataFrame cache.

Parameters:

Name Type Description Default
max_size int

Maximum number of DataFrames to cache.

50
default_ttl int

Default TTL in seconds.

3600
compress_threshold int

Size threshold (bytes) for compression.

1048576
compression_level int

Gzip compression level (1-9).

6
Source code in src/infrastructure/cache/dataframe_cache.py
def __init__(
    self,
    max_size: int = 50,
    default_ttl: int = 3600,
    compress_threshold: int = 1024 * 1024,  # 1 MB
    compression_level: int = 6
):
    """
    Set up a DataFrame cache on top of the base in-memory cache.

    Parameters
    ----------
    max_size : int, default=50
        Maximum number of DataFrames to cache (forwarded to the base cache).
    default_ttl : int, default=3600
        Default TTL in seconds (forwarded to the base cache).
    compress_threshold : int, default=1048576
        DataFrames whose deep memory usage exceeds this many bytes are
        compressed before being stored.
    compression_level : int, default=6
        Gzip compression level (1-9).
    """
    super().__init__(max_size=max_size, default_ttl=default_ttl)
    self.compression_level = compression_level
    self.compress_threshold = compress_threshold

    # The threshold is logged in MB for readability.
    threshold_mb = round(compress_threshold / 1024**2, 2)
    init_details = {
        'max_size': max_size,
        'compress_threshold_mb': threshold_mb,
        'compression_level': compression_level
    }
    logger.info(f"Initialized {self.__class__.__name__}", extra=init_details)
Functions
cache_dataframe
cache_dataframe(key: str, df: DataFrame, ttl: Optional[int] = None) -> None

Cache a DataFrame.

Parameters:

Name Type Description Default
key str

Cache key

required
df DataFrame

DataFrame to cache

required
ttl Optional[int]

TTL in seconds

None
Source code in src/infrastructure/cache/dataframe_cache.py
def cache_dataframe(
    self,
    key: str,
    df: pd.DataFrame,
    ttl: Optional[int] = None
) -> None:
    """
    Store a DataFrame under ``key``, compressing it when it is large.

    DataFrames whose deep memory usage exceeds ``self.compress_threshold``
    bytes go through ``self._compress_dataframe``; smaller ones are stored
    as a copy. Either way the value handed to ``self.set`` is a dict with
    the keys ``'data'``, ``'is_compressed'``, ``'original_size_mb'`` and
    ``'shape'``.

    Parameters
    ----------
    key : str
        Cache key
    df : pd.DataFrame
        DataFrame to cache
    ttl : Optional[int], default=None
        TTL in seconds; ``None`` defers to the base cache's default TTL.
    """
    size_bytes = df.memory_usage(deep=True).sum()
    size_mb = round(size_bytes / 1024**2, 2)
    # bool() keeps the stored flag a plain Python bool rather than numpy.bool_.
    should_compress = bool(size_bytes > self.compress_threshold)

    if not should_compress:
        # Copy so later mutations of the caller's frame don't leak into the cache.
        payload = df.copy()
        compress_outcome = "skipped"
    else:
        logger.debug(
            f"Compressing large DataFrame: {key}",
            extra={'size_mb': size_mb}
        )
        payload = self._compress_dataframe(df)
        compress_outcome = "applied"

    CACHE_OPERATIONS_TOTAL.labels(
        cache_type=self._cache_type,
        operation="compress",
        outcome=compress_outcome,
    ).inc()

    entry = {
        'data': payload,
        'is_compressed': should_compress,
        'original_size_mb': size_mb,
        'shape': df.shape
    }
    self.set(key, entry, ttl=ttl)
    # NOTE(review): self.set() (MemoryCache.set) already observes
    # CACHE_ENTRY_SIZE_BYTES for this entry, so every call records two
    # samples in the histogram -- confirm whether this second one is intended.
    CACHE_ENTRY_SIZE_BYTES.labels(cache_type=self._cache_type).observe(float(size_bytes))

    logger.info(
        f"Cached DataFrame: {key}",
        extra={
            'shape': df.shape,
            'size_mb': size_mb,
            'compressed': should_compress
        }
    )
get_cached_dataframe
get_cached_dataframe(key: str) -> Optional[pd.DataFrame]

Retrieve cached DataFrame.

Parameters:

Name Type Description Default
key str

Cache key

required

Returns:

Type Description
Optional[DataFrame]

Cached DataFrame or None if not found

Source code in src/infrastructure/cache/dataframe_cache.py
def get_cached_dataframe(self, key: str) -> Optional[pd.DataFrame]:
    """
    Retrieve cached DataFrame.

    Parameters
    ----------
    key : str
        Cache key

    Returns
    -------
    Optional[pd.DataFrame]
        The DataFrame produced by ``self._decompress_dataframe`` for a
        compressed entry, or a copy of the stored DataFrame otherwise;
        ``None`` if the key is missing or expired.
    """
    cache_entry = self.get(key)
    if cache_entry is None:
        return None

    if not cache_entry['is_compressed']:
        # Hand back a copy so callers cannot mutate the cached DataFrame.
        return cache_entry['data'].copy()

    logger.debug(f"Decompressing DataFrame: {key}")
    df = self._decompress_dataframe(cache_entry['data'])
    # Count the decompression as "ok" only after it has returned without
    # raising, so failures are not reported as successes.
    CACHE_OPERATIONS_TOTAL.labels(
        cache_type=self._cache_type,
        operation="decompress",
        outcome="ok",
    ).inc()
    return df
generate_dataframe_key
generate_dataframe_key(df: DataFrame, prefix: str = '') -> str

Generate unique cache key for DataFrame.

Parameters:

Name Type Description Default
df DataFrame

DataFrame to generate key for

required
prefix str

Optional prefix for key

''

Returns:

Type Description
str

Cache key derived from a hash of the DataFrame's structure and a sample of its rows

Notes
  • Key based on shape, column labels, dtypes, and the first/last 5 rows (other rows do not affect the key)
Source code in src/infrastructure/cache/dataframe_cache.py
def generate_dataframe_key(
    self,
    df: pd.DataFrame,
    prefix: str = ''
) -> str:
    """
    Generate a cache key for a DataFrame.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame to generate key for
    prefix : str, default=''
        Optional prefix for key

    Returns
    -------
    str
        ``f"{prefix}_{hash}"`` when ``prefix`` is non-empty, otherwise the
        bare hash (first 16 hex characters of an MD5 digest).

    Notes
    -----
    - The key covers the shape, column labels, dtypes, and the index and
      cell values of the first 5 rows (plus the last 5 when there are more
      than 5 rows). Rows outside that sample do not affect the key, so
      DataFrames that differ only in their middle rows share a key.
    - Sampled cells are serialised through ``repr`` of plain Python
      objects, so object-dtype columns (e.g. strings) are fingerprinted by
      content. ``ndarray.tobytes()`` on an object array would serialise
      pointers instead.
    - MD5 serves only as a compact fingerprint for the key.
    """
    def _sample_fingerprint(sample: pd.DataFrame) -> str:
        # The index is included so frames that differ only in their index
        # receive distinct keys.
        return repr((sample.index.tolist(), sample.to_numpy().tolist()))

    hash_components = [
        str(df.shape),
        # Labels are kept in column order; sorting them would raise
        # TypeError for mixed-type labels such as 1 and 'a'.
        str(df.columns.tolist()),
        str(df.dtypes.to_dict()),
        _sample_fingerprint(df.head(5)) if len(df) > 0 else '',
        # The tail adds nothing when head(5) already covers every row.
        _sample_fingerprint(df.tail(5)) if len(df) > 5 else '',
    ]

    hash_string = '|'.join(hash_components)
    hash_value = hashlib.md5(hash_string.encode()).hexdigest()[:16]

    if prefix:
        return f"{prefix}_{hash_value}"
    return hash_value
get
get(key: str) -> Optional[Any]

Get value from cache.

Parameters:

Name Type Description Default
key str

Cache key

required

Returns:

Type Description
Optional[Any]

Cached value or None if not found/expired

Source code in src/infrastructure/cache/memory_cache.py
def get(self, key: str) -> Optional[Any]:
    """
    Look up ``key`` and return its value if present and not expired.

    On a hit the key is moved to the end of ``self._cache`` (LRU order) and
    ``self._hits`` is incremented. An absent key or an expired one (which
    is removed via ``self.delete``) increments ``self._misses``. Every
    lookup emits an operation metric and refreshes the hit-ratio metric.

    Parameters
    ----------
    key : str
        Cache key

    Returns
    -------
    Optional[Any]
        The cached value, or None if the key is absent or has expired.
    """
    if key in self._cache:
        if not self._is_expired(key):
            # Refresh recency so this key is evicted last.
            self._cache.move_to_end(key)
            logger.debug(f"Cache hit: {key}")
            self._hits += 1
            self._emit_operation_metric("get", "hit")
            self._emit_hit_ratio_metric()
            return self._cache[key]

        logger.debug(f"Cache expired: {key}")
        self.delete(key)
        outcome = "expired"
    else:
        logger.debug(f"Cache miss: {key}")
        outcome = "miss"

    self._misses += 1
    self._emit_operation_metric("get", outcome)
    self._emit_hit_ratio_metric()
    return None
set
set(key: str, value: Any, ttl: Optional[int] = None) -> None

Set value in cache.

Parameters:

Name Type Description Default
key str

Cache key

required
value Any

Value to cache

required
ttl Optional[int]

TTL in seconds (if None, uses default_ttl)

None
Source code in src/infrastructure/cache/memory_cache.py
def set(
    self,
    key: str,
    value: Any,
    ttl: Optional[int] = None
) -> None:
    """
    Insert or overwrite ``key`` with ``value``.

    Adding a new key while the cache holds ``self.max_size`` or more
    entries first calls ``self._evict_oldest``. The entry is then moved to
    the end of ``self._cache`` and given an expiry ``ttl`` seconds from now
    (naive local time via ``datetime.now()``). A TTL of 0 or less stores
    ``None`` as the expiry; presumably ``_is_expired`` treats that as
    "never expires" (not visible here -- confirm).

    Parameters
    ----------
    key : str
        Cache key
    value : Any
        Value to cache
    ttl : Optional[int], default=None
        TTL in seconds (if None, uses default_ttl)
    """
    is_new_key = key not in self._cache
    if is_new_key and len(self._cache) >= self.max_size:
        self._evict_oldest()

    self._cache[key] = value
    self._cache.move_to_end(key)

    effective_ttl = self.default_ttl if ttl is None else ttl
    self._expiry[key] = (
        datetime.now() + timedelta(seconds=effective_ttl)
        if effective_ttl > 0
        else None
    )

    logger.debug(f"Cache set: {key}", extra={'ttl': effective_ttl})
    self._emit_operation_metric("set", "ok")
    size_histogram = CACHE_ENTRY_SIZE_BYTES.labels(cache_type=self._cache_type)
    size_histogram.observe(self._estimate_value_size_bytes(value))
    self._emit_size_metric()
delete
delete(key: str) -> bool

Delete entry from cache.

Parameters:

Name Type Description Default
key str

Cache key

required

Returns:

Type Description
bool

True if deleted, False if key not found

Source code in src/infrastructure/cache/memory_cache.py
def delete(self, key: str) -> bool:
    """
    Remove ``key`` and its expiry record from the cache.

    Parameters
    ----------
    key : str
        Cache key

    Returns
    -------
    bool
        True if an entry was removed, False if ``key`` was not cached.
    """
    if key not in self._cache:
        return False

    del self._cache[key]
    del self._expiry[key]
    logger.debug(f"Cache delete: {key}")
    self._emit_operation_metric("delete", "ok")
    self._emit_size_metric()
    return True
clear
clear() -> None

Clear all cache entries.

Source code in src/infrastructure/cache/memory_cache.py
def clear(self) -> None:
    """Drop every entry and expiry record, logging how many entries were removed."""
    removed = len(self._cache)
    for store in (self._cache, self._expiry):
        store.clear()
    logger.info(f"Cache cleared: {removed} entries removed")
    self._emit_operation_metric("clear", "ok")
    self._emit_size_metric()
exists
exists(key: str) -> bool

Check if key exists in cache (and not expired).

Parameters:

Name Type Description Default
key str

Cache key

required

Returns:

Type Description
bool

True if exists and not expired

Source code in src/infrastructure/cache/memory_cache.py
def exists(self, key: str) -> bool:
    """
    Report whether ``key`` holds a non-expired entry.

    Unlike ``get``, this leaves LRU order and hit/miss counters untouched,
    but an entry found to be expired is deleted via ``self.delete``.

    Parameters
    ----------
    key : str
        Cache key

    Returns
    -------
    bool
        True if the key is cached and not expired.
    """
    present = key in self._cache
    if present and self._is_expired(key):
        self.delete(key)
        return False
    return present
size
size() -> int

Get current cache size.

Returns:

Type Description
int

Number of entries in cache

Source code in src/infrastructure/cache/memory_cache.py
def size(self) -> int:
    """
    Return how many entries ``self._cache`` currently holds.

    This does not check expiry; entries are counted as stored.

    Returns
    -------
    int
        Number of entries in cache
    """
    entry_count = len(self._cache)
    return entry_count
get_stats
get_stats() -> dict

Get cache statistics.

Returns:

Type Description
dict

Statistics including size, max_size, usage percentage

Source code in src/infrastructure/cache/memory_cache.py
def get_stats(self) -> dict:
    """
    Get cache statistics.

    Returns
    -------
    dict
        ``current_size`` (number of stored entries), ``max_size``,
        ``default_ttl`` and ``usage_percent``
        (``current_size / max_size * 100``). ``usage_percent`` is reported
        as 0.0 when ``max_size`` is 0 instead of raising ZeroDivisionError.
    """
    current_size = len(self._cache)
    usage_percent = (
        (current_size / self.max_size) * 100 if self.max_size else 0.0
    )
    return {
        'current_size': current_size,
        'max_size': self.max_size,
        'default_ttl': self.default_ttl,
        'usage_percent': usage_percent
    }

Functions