DataFrameCache(max_size: int = 50, default_ttl: int = 3600, compress_threshold: int = 1024 * 1024, compression_level: int = 6)
Bases: MemoryCache
Specialized cache for pandas DataFrames.
Provides automatic gzip + pickle compression for large DataFrames, DataFrame-specific cache-key (hash) generation, and memory-usage tracking.
Attributes:
| Name | Type | Description |
compress_threshold | int | DataFrame size (bytes) above which compression is used |
compression_level | int | Gzip compression level (1-9) |
Initialize DataFrame cache.
Parameters:
| Name | Type | Description | Default |
max_size | int | Maximum number of DataFrames to cache. | 50 |
default_ttl | int | Default TTL in seconds. | 3600 |
compress_threshold | int | Size threshold (bytes) for compression. | 1048576 |
compression_level | int | Gzip compression level (1-9). | 6 |
Source code in src/infrastructure/cache/dataframe_cache.py
def __init__(
    self,
    max_size: int = 50,
    default_ttl: int = 3600,
    compress_threshold: int = 1024 * 1024,  # 1 MB
    compression_level: int = 6
):
    """
    Initialize DataFrame cache.

    Parameters
    ----------
    max_size : int, default=50
        Maximum number of DataFrames to cache.
    default_ttl : int, default=3600
        Default TTL in seconds.
    compress_threshold : int, default=1048576
        Size threshold (bytes) for compression. DataFrames whose deep
        memory usage exceeds this value are compressed before caching.
    compression_level : int, default=6
        Gzip compression level (1-9).
    """
    # Capacity and default TTL are owned by the base MemoryCache.
    super().__init__(max_size=max_size, default_ttl=default_ttl)
    self.compress_threshold = compress_threshold
    self.compression_level = compression_level

    # Report the threshold in MB (2 decimals) to keep the log readable.
    threshold_mb = round(compress_threshold / 1024**2, 2)
    init_details = {
        'max_size': max_size,
        'compress_threshold_mb': threshold_mb,
        'compression_level': compression_level,
    }
    logger.info(f"Initialized {self.__class__.__name__}", extra=init_details)
|
Functions
cache_dataframe
cache_dataframe(key: str, df: DataFrame, ttl: Optional[int] = None) -> None
Cache a DataFrame.
Parameters:
| Name | Type | Description | Default |
key | str | Cache key. | required |
df | DataFrame | DataFrame to cache. | required |
ttl | Optional[int] | TTL in seconds (if None, uses default_ttl). | None |
Source code in src/infrastructure/cache/dataframe_cache.py
def cache_dataframe(
    self,
    key: str,
    df: pd.DataFrame,
    ttl: Optional[int] = None
) -> None:
    """
    Cache a DataFrame.

    DataFrames whose deep memory usage exceeds ``compress_threshold`` bytes
    are stored compressed. Smaller ones are stored as a copy, so later
    mutation of ``df`` by the caller does not affect the cached entry.
    The stored value is a dict holding the data plus metadata
    (``is_compressed``, ``original_size_mb``, ``shape``).

    Parameters
    ----------
    key : str
        Cache key
    df : pd.DataFrame
        DataFrame to cache
    ttl : Optional[int], default=None
        TTL in seconds (None falls back to the cache's default TTL)
    """
    df_size = df.memory_usage(deep=True).sum()
    size_mb = round(df_size / 1024**2, 2)
    is_compressed = bool(df_size > self.compress_threshold)

    if is_compressed:
        logger.debug(
            f"Compressing large DataFrame: {key}",
            extra={'size_mb': size_mb}
        )
        cached_data = self._compress_dataframe(df)
    else:
        # Copy so the caller's later edits to ``df`` cannot leak into the cache.
        cached_data = df.copy()

    CACHE_OPERATIONS_TOTAL.labels(
        cache_type=self._cache_type,
        operation="compress",
        outcome="applied" if is_compressed else "skipped",
    ).inc()

    # Metadata travels with the payload so retrieval knows how to decode it.
    self.set(
        key,
        {
            'data': cached_data,
            'is_compressed': is_compressed,
            'original_size_mb': size_mb,
            'shape': df.shape,
        },
        ttl=ttl,
    )
    # NOTE(review): set() also observes CACHE_ENTRY_SIZE_BYTES for this entry,
    # so every cached DataFrame records two observations -- confirm intended.
    CACHE_ENTRY_SIZE_BYTES.labels(cache_type=self._cache_type).observe(float(df_size))
    logger.info(
        f"Cached DataFrame: {key}",
        extra={
            'shape': df.shape,
            'size_mb': size_mb,
            'compressed': is_compressed,
        }
    )
|
get_cached_dataframe
get_cached_dataframe(key: str) -> Optional[pd.DataFrame]
Retrieve cached DataFrame.
Parameters:
| Name | Type | Description | Default |
key | str | Cache key. | required |
Returns:
| Type | Description |
Optional[DataFrame] | Cached DataFrame or None if not found |
Source code in src/infrastructure/cache/dataframe_cache.py
def get_cached_dataframe(self, key: str) -> Optional[pd.DataFrame]:
    """
    Retrieve cached DataFrame.

    Parameters
    ----------
    key : str
        Cache key

    Returns
    -------
    Optional[pd.DataFrame]
        Cached DataFrame or None if not found (or expired).

    Notes
    -----
    Uncompressed entries are returned as a copy, so callers cannot mutate
    the cached object. Compressed entries are decoded with
    ``_decompress_dataframe``. The "decompress" operation metric is only
    incremented once decoding has succeeded.
    """
    cache_entry = self.get(key)
    if cache_entry is None:
        return None

    if not cache_entry['is_compressed']:
        # Hand out a copy so the cached DataFrame stays untouched.
        return cache_entry['data'].copy()

    logger.debug(f"Decompressing DataFrame: {key}")
    df = self._decompress_dataframe(cache_entry['data'])
    # Record the "ok" outcome only after decompression actually succeeded;
    # previously it was counted before the call, even if it then raised.
    CACHE_OPERATIONS_TOTAL.labels(
        cache_type=self._cache_type,
        operation="decompress",
        outcome="ok",
    ).inc()
    return df
|
generate_dataframe_key
generate_dataframe_key(df: DataFrame, prefix: str = '') -> str
Generate unique cache key for DataFrame.
Parameters:
| Name | Type | Description | Default |
df | DataFrame | DataFrame to generate key for | required |
prefix | str | Optional prefix for key. | '' |
Returns:
| Type | Description |
str | Cache key based on DataFrame content hash |
Notes
- Key based on columns, shape, and sample data
Source code in src/infrastructure/cache/dataframe_cache.py
def generate_dataframe_key(
    self,
    df: pd.DataFrame,
    prefix: str = ''
) -> str:
    """
    Generate unique cache key for DataFrame.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame to generate key for
    prefix : str, default=''
        Optional prefix for key

    Returns
    -------
    str
        Cache key based on DataFrame content hash: a 16-character hex
        digest, prefixed with ``f"{prefix}_"`` when ``prefix`` is non-empty.

    Notes
    -----
    - Key based on shape, column labels (in order), dtypes, and a row-wise
      hash of every value including the index
      (``pd.util.hash_pandas_object``).
    - Hashing every row is O(n) but vectorised. It prevents DataFrames that
      differ only outside a head/tail sample from sharing a key.
    - Object (e.g. string) columns are hashed by value, so equal
      DataFrames built independently map to the same key.
    """
    # Value-based uint64 hash per row (index included). Unlike
    # ndarray.tobytes(), which for object-dtype data serialises Python
    # object pointers, this depends only on the contents.
    row_hashes = pd.util.hash_pandas_object(df, index=True).values
    hash_components = [
        str(df.shape),
        # Preserve label order (it matches the data layout) and avoid
        # sorting, which raises TypeError for mixed labels like [0, 'a'].
        str(list(df.columns)),
        str(df.dtypes.to_dict()),
        hashlib.md5(row_hashes.tobytes()).hexdigest(),
    ]
    hash_string = '|'.join(hash_components)
    hash_value = hashlib.md5(
        hash_string.encode()
    ).hexdigest()[:16]
    if prefix:
        return f"{prefix}_{hash_value}"
    return hash_value
|
get
get(key: str) -> Optional[Any]
Get value from cache.
Parameters:
| Name | Type | Description | Default |
key | str | Cache key. | required |
Returns:
| Type | Description |
Optional[Any] | Cached value or None if not found/expired |
Source code in src/infrastructure/cache/memory_cache.py
def get(self, key: str) -> Optional[Any]:
    """
    Get value from cache.

    A hit moves the key to the most-recently-used end of the ordered store.
    A present-but-expired key is deleted and counted as a miss. Every call
    emits a "get" operation metric and refreshes the hit-ratio metric.

    Parameters
    ----------
    key : str
        Cache key

    Returns
    -------
    Optional[Any]
        Cached value or None if not found/expired
    """
    if key in self._cache:
        if not self._is_expired(key):
            # Hit: refresh LRU position before reporting.
            self._cache.move_to_end(key)
            logger.debug(f"Cache hit: {key}")
            self._hits += 1
            outcome = "hit"
        else:
            # Stale entry: purge it lazily and treat as a miss.
            logger.debug(f"Cache expired: {key}")
            self.delete(key)
            self._misses += 1
            outcome = "expired"
    else:
        logger.debug(f"Cache miss: {key}")
        self._misses += 1
        outcome = "miss"

    self._emit_operation_metric("get", outcome)
    self._emit_hit_ratio_metric()
    return self._cache[key] if outcome == "hit" else None
|
set
set(key: str, value: Any, ttl: Optional[int] = None) -> None
Set value in cache.
Parameters:
| Name | Type | Description | Default |
key | str | Cache key. | required |
value | Any | Value to cache. | required |
ttl | Optional[int] | TTL in seconds (if None, uses default_ttl) | None |
Source code in src/infrastructure/cache/memory_cache.py
def set(
    self,
    key: str,
    value: Any,
    ttl: Optional[int] = None
) -> None:
    """
    Set value in cache.

    Inserting a new key into a full cache first evicts the oldest entry.
    The key is then placed at the most-recently-used end.

    Parameters
    ----------
    key : str
        Cache key
    value : Any
        Value to cache
    ttl : Optional[int], default=None
        TTL in seconds (if None, uses default_ttl). A value <= 0 stores the
        entry with no expiry time.
    """
    is_new_key = key not in self._cache
    if is_new_key and len(self._cache) >= self.max_size:
        # Full and adding a new key: make room first.
        self._evict_oldest()

    self._cache[key] = value
    self._cache.move_to_end(key)

    effective_ttl = self.default_ttl if ttl is None else ttl
    self._expiry[key] = (
        datetime.now() + timedelta(seconds=effective_ttl)
        if effective_ttl > 0
        else None
    )

    logger.debug(f"Cache set: {key}", extra={'ttl': effective_ttl})
    self._emit_operation_metric("set", "ok")
    size_histogram = CACHE_ENTRY_SIZE_BYTES.labels(cache_type=self._cache_type)
    size_histogram.observe(self._estimate_value_size_bytes(value))
    self._emit_size_metric()
|
delete
Delete entry from cache.
Parameters:
| Name | Type | Description | Default |
key | str | Cache key. | required |
Returns:
| Type | Description |
bool | True if deleted, False if key not found |
Source code in src/infrastructure/cache/memory_cache.py
def delete(self, key: str) -> bool:
    """
    Delete entry from cache.

    Removes both the value and its expiry record, then emits the "delete"
    operation and size metrics.

    Parameters
    ----------
    key : str
        Cache key

    Returns
    -------
    bool
        True if deleted, False if key not found
    """
    if key not in self._cache:
        return False

    del self._cache[key]
    del self._expiry[key]
    logger.debug(f"Cache delete: {key}")
    self._emit_operation_metric("delete", "ok")
    self._emit_size_metric()
    return True
|
clear
Clear all cache entries.
Source code in src/infrastructure/cache/memory_cache.py
def clear(self) -> None:
    """Clear all cache entries (values and their expiry records)."""
    removed = len(self._cache)
    for store in (self._cache, self._expiry):
        store.clear()
    logger.info(f"Cache cleared: {removed} entries removed")
    self._emit_operation_metric("clear", "ok")
    self._emit_size_metric()
|
exists
Check if key exists in cache (and not expired).
Parameters:
| Name | Type | Description | Default |
key | str | Cache key. | required |
Returns:
| Type | Description |
bool | True if exists and not expired |
Source code in src/infrastructure/cache/memory_cache.py
def exists(self, key: str) -> bool:
    """
    Check if key exists in cache (and not expired).

    An expired entry found here is deleted as a side effect. Unlike ``get``,
    this does not change LRU order or hit/miss counters.

    Parameters
    ----------
    key : str
        Cache key

    Returns
    -------
    bool
        True if exists and not expired
    """
    present = key in self._cache
    if present and self._is_expired(key):
        # Lazily purge the stale entry.
        self.delete(key)
        return False
    return present
|
size
Get current cache size.
Returns:
| Type | Description |
int | Number of entries in cache |
Source code in src/infrastructure/cache/memory_cache.py
def size(self) -> int:
    """
    Get current cache size.

    No expiry check is done here, so entries that have expired but not yet
    been purged are still counted.

    Returns
    -------
    int
        Number of entries in cache
    """
    entry_count = len(self._cache)
    return entry_count
|
get_stats
Get cache statistics.
Returns:
| Type | Description |
dict | Statistics including size, max_size, usage percentage |
Source code in src/infrastructure/cache/memory_cache.py
def get_stats(self) -> dict:
    """
    Get cache statistics.

    Returns
    -------
    dict
        Statistics with keys ``current_size``, ``max_size``,
        ``default_ttl`` and ``usage_percent`` (current_size / max_size
        * 100). ``usage_percent`` is 0.0 when ``max_size`` is 0, instead
        of raising ZeroDivisionError.
    """
    current_size = len(self._cache)
    # Guard the zero-capacity case so a diagnostics call cannot crash.
    if self.max_size == 0:
        usage_percent = 0.0
    else:
        usage_percent = (current_size / self.max_size) * 100
    return {
        'current_size': current_size,
        'max_size': self.max_size,
        'default_ttl': self.default_ttl,
        'usage_percent': usage_percent
    }
|