Skip to content

Cache Layer

The Cache Layer provides high-performance in-memory caching implementations optimized for different data types.

Cache Implementations

MemoryCache

MemoryCache

MemoryCache(max_size: int = 1000, default_ttl: int = 0)

In-memory LRU cache implementation.

Provides LRU eviction policy, TTL support, max size enforcement, and basic thread-safe operations.

Attributes:

Name Type Description
max_size int

Maximum number of entries in cache

default_ttl int

Default TTL in seconds (0 = no expiration)

_cache OrderedDict

Ordered dictionary storing cached values

_expiry dict[str, Optional[datetime]]

Expiry timestamps for cache entries

Methods:

Name Description
get

Get value from cache

set

Set value in cache

delete

Delete entry from cache

clear

Clear all cache entries

exists

Check if key exists in cache

size

Get current cache size

get_stats

Get cache statistics

Initialize memory cache.

Parameters:

Name Type Description Default
max_size int

Maximum number of cache entries.

1000
default_ttl int

Default TTL in seconds (0 = no expiration).

0
Source code in src/infrastructure/cache/memory_cache.py
def __init__(
    self,
    max_size: int = 1000,
    default_ttl: int = 0
):
    """
    Create an empty cache with the given capacity and default lifetime.

    Parameters
    ----------
    max_size : int, default=1000
        Maximum number of cache entries.
    default_ttl : int, default=0
        Default TTL in seconds (0 = no expiration).
    """
    # Caller-supplied configuration.
    self.max_size, self.default_ttl = max_size, default_ttl

    # Storage: ordered value store plus a parallel table of expiry times.
    self._cache: OrderedDict = OrderedDict()
    self._expiry: dict[str, Optional[datetime]] = {}

    # Value passed as the ``cache_type`` label when metrics are recorded.
    self._cache_type = self.CACHE_TYPE

    # Lookup counters, updated by ``get``.
    self._hits = self._misses = 0

    init_context = {'max_size': max_size, 'default_ttl': default_ttl}
    logger.info(f"Initialized {self.__class__.__name__}", extra=init_context)

    # Emit the initial size and hit-ratio metrics.
    self._emit_size_metric()
    self._emit_hit_ratio_metric()

Functions

get
get(key: str) -> Optional[Any]

Get value from cache.

Parameters:

Name Type Description Default
key str

Cache key

required

Returns:

Type Description
Optional[Any]

Cached value or None if not found/expired

Source code in src/infrastructure/cache/memory_cache.py
def get(self, key: str) -> Optional[Any]:
    """
    Look up a key and return its live value.

    A successful lookup moves the entry to the end of the ordered store
    (LRU bookkeeping). Every call updates the hit/miss counters and emits
    an operation metric and a hit-ratio metric.

    Parameters
    ----------
    key : str
        Cache key

    Returns
    -------
    Optional[Any]
        Cached value, or None when the key is absent or its entry has
        expired (an expired entry is deleted as a side effect)
    """
    # Unknown key: count a plain miss.
    if key not in self._cache:
        logger.debug(f"Cache miss: {key}")
        self._misses += 1
        self._emit_operation_metric("get", "miss")
        self._emit_hit_ratio_metric()
        return None

    if not self._is_expired(key):
        # Live entry: refresh its LRU position before handing it back.
        self._cache.move_to_end(key)

        logger.debug(f"Cache hit: {key}")
        self._hits += 1
        self._emit_operation_metric("get", "hit")
        self._emit_hit_ratio_metric()
        return self._cache[key]

    # Stale entry: drop it and count the lookup as a miss.
    logger.debug(f"Cache expired: {key}")
    self.delete(key)
    self._misses += 1
    self._emit_operation_metric("get", "expired")
    self._emit_hit_ratio_metric()
    return None
set
set(key: str, value: Any, ttl: Optional[int] = None) -> None

Set value in cache.

Parameters:

Name Type Description Default
key str

Cache key

required
value Any

Value to cache

required
ttl Optional[int]

TTL in seconds (if None, uses default_ttl)

None
Source code in src/infrastructure/cache/memory_cache.py
def set(
    self,
    key: str,
    value: Any,
    ttl: Optional[int] = None
) -> None:
    """
    Store a value under a key, replacing any existing entry.

    When a new key is added to a cache that is already at ``max_size``,
    ``_evict_oldest()`` is called first. The entry is placed at the end of
    the ordered store and its expiry is (re)computed.

    Parameters
    ----------
    key : str
        Cache key
    value : Any
        Value to cache
    ttl : Optional[int], default=None
        TTL in seconds (if None, uses default_ttl); a value that is not
        positive means the entry never expires
    """
    # Only a brand-new key can push the cache over capacity.
    if key not in self._cache:
        if len(self._cache) >= self.max_size:
            self._evict_oldest()

    # Write the value and mark it as the most recent entry.
    self._cache[key] = value
    self._cache.move_to_end(key)

    # Resolve the effective TTL and record the matching expiry time.
    effective_ttl = self.default_ttl if ttl is None else ttl
    self._expiry[key] = (
        datetime.now() + timedelta(seconds=effective_ttl)
        if effective_ttl > 0
        else None
    )

    logger.debug(f"Cache set: {key}", extra={'ttl': effective_ttl})

    # Metrics: operation counter, entry-size observation, current size.
    self._emit_operation_metric("set", "ok")
    entry_size = self._estimate_value_size_bytes(value)
    CACHE_ENTRY_SIZE_BYTES.labels(cache_type=self._cache_type).observe(entry_size)
    self._emit_size_metric()
delete
delete(key: str) -> bool

Delete entry from cache.

Parameters:

Name Type Description Default
key str

Cache key

required

Returns:

Type Description
bool

True if deleted, False if key not found

Source code in src/infrastructure/cache/memory_cache.py
def delete(self, key: str) -> bool:
    """
    Remove a key together with its expiry record.

    Parameters
    ----------
    key : str
        Cache key

    Returns
    -------
    bool
        True if deleted, False if key not found
    """
    # Nothing stored under this key: no logging, no metrics.
    if key not in self._cache:
        return False

    del self._cache[key]
    del self._expiry[key]

    logger.debug(f"Cache delete: {key}")
    self._emit_operation_metric("delete", "ok")
    self._emit_size_metric()
    return True
clear
clear() -> None

Clear all cache entries.

Source code in src/infrastructure/cache/memory_cache.py
def clear(self) -> None:
    """Remove every entry and expiry record, then log and emit metrics."""
    # Capture the count first so the log reports how many were dropped.
    removed = len(self._cache)

    for table in (self._cache, self._expiry):
        table.clear()

    logger.info(f"Cache cleared: {removed} entries removed")
    self._emit_operation_metric("clear", "ok")
    self._emit_size_metric()
exists
exists(key: str) -> bool

Check if key exists in cache (and not expired).

Parameters:

Name Type Description Default
key str

Cache key

required

Returns:

Type Description
bool

True if exists and not expired

Source code in src/infrastructure/cache/memory_cache.py
def exists(self, key: str) -> bool:
    """
    Report whether a key holds a live (non-expired) entry.

    An entry found to be expired is deleted as a side effect.

    Parameters
    ----------
    key : str
        Cache key

    Returns
    -------
    bool
        True if exists and not expired
    """
    present = key in self._cache

    # A stored-but-stale entry is purged and reported as absent.
    if present and self._is_expired(key):
        self.delete(key)
        present = False

    return present
size
size() -> int

Get current cache size.

Returns:

Type Description
int

Number of entries in cache

Source code in src/infrastructure/cache/memory_cache.py
def size(self) -> int:
    """
    Count the entries currently held in the cache.

    Returns
    -------
    int
        Number of entries in cache
    """
    entry_count = len(self._cache)
    return entry_count
get_stats
get_stats() -> dict

Get cache statistics.

Returns:

Type Description
dict

Statistics including size, max_size, usage percentage

Source code in src/infrastructure/cache/memory_cache.py
def get_stats(self) -> dict:
    """
    Get cache statistics.

    Returns
    -------
    dict
        ``current_size``, ``max_size``, ``default_ttl`` and
        ``usage_percent`` (current size as a percentage of ``max_size``;
        reported as 0.0 when ``max_size`` is not positive)
    """
    current_size = len(self._cache)

    # Guard the division: a zero capacity would otherwise raise
    # ZeroDivisionError just from asking for statistics.
    if self.max_size > 0:
        usage_percent = (current_size / self.max_size) * 100
    else:
        usage_percent = 0.0

    return {
        'current_size': current_size,
        'max_size': self.max_size,
        'default_ttl': self.default_ttl,
        'usage_percent': usage_percent
    }

DataFrameCache

DataFrameCache

DataFrameCache(max_size: int = 50, default_ttl: int = 3600, compress_threshold: int = 1024 * 1024, compression_level: int = 6)

Bases: MemoryCache

Specialized cache for pandas DataFrames.

Provides compression (gzip + pickle), DataFrame-specific hash generation, memory usage tracking, and automatic compression for large DataFrames.

Attributes:

Name Type Description
compress_threshold int

DataFrame size (bytes) above which compression is used

compression_level int

Gzip compression level (1-9)

Initialize DataFrame cache.

Parameters:

Name Type Description Default
max_size int

Maximum number of DataFrames to cache.

50
default_ttl int

Default TTL in seconds.

3600
compress_threshold int

Size threshold (bytes) for compression.

1048576
compression_level int

Gzip compression level (1-9).

6
Source code in src/infrastructure/cache/dataframe_cache.py
def __init__(
    self,
    max_size: int = 50,
    default_ttl: int = 3600,
    compress_threshold: int = 1024 * 1024,  # 1 MB
    compression_level: int = 6
):
    """
    Create a DataFrame cache on top of the base cache initializer.

    Parameters
    ----------
    max_size : int, default=50
        Maximum number of DataFrames to cache.
    default_ttl : int, default=3600
        Default TTL in seconds.
    compress_threshold : int, default=1048576
        Size threshold (bytes) for compression.
    compression_level : int, default=6
        Gzip compression level (1-9).
    """
    # Base-class setup first (storage, counters, initial metrics).
    super().__init__(max_size=max_size, default_ttl=default_ttl)

    self.compress_threshold = compress_threshold
    self.compression_level = compression_level

    # Log the threshold in megabytes for readability.
    threshold_mb = round(compress_threshold / 1024**2, 2)
    init_context = {
        'max_size': max_size,
        'compress_threshold_mb': threshold_mb,
        'compression_level': compression_level
    }
    logger.info(f"Initialized {self.__class__.__name__}", extra=init_context)

Functions

get
get(key: str) -> Optional[Any]

Get value from cache.

Parameters:

Name Type Description Default
key str

Cache key

required

Returns:

Type Description
Optional[Any]

Cached value or None if not found/expired

Source code in src/infrastructure/cache/memory_cache.py
def get(self, key: str) -> Optional[Any]:
    """
    Look up a key and return its live value.

    A successful lookup moves the entry to the end of the ordered store
    (LRU bookkeeping). Every call updates the hit/miss counters and emits
    an operation metric and a hit-ratio metric.

    Parameters
    ----------
    key : str
        Cache key

    Returns
    -------
    Optional[Any]
        Cached value, or None when the key is absent or its entry has
        expired (an expired entry is deleted as a side effect)
    """
    # Unknown key: count a plain miss.
    if key not in self._cache:
        logger.debug(f"Cache miss: {key}")
        self._misses += 1
        self._emit_operation_metric("get", "miss")
        self._emit_hit_ratio_metric()
        return None

    if not self._is_expired(key):
        # Live entry: refresh its LRU position before handing it back.
        self._cache.move_to_end(key)

        logger.debug(f"Cache hit: {key}")
        self._hits += 1
        self._emit_operation_metric("get", "hit")
        self._emit_hit_ratio_metric()
        return self._cache[key]

    # Stale entry: drop it and count the lookup as a miss.
    logger.debug(f"Cache expired: {key}")
    self.delete(key)
    self._misses += 1
    self._emit_operation_metric("get", "expired")
    self._emit_hit_ratio_metric()
    return None
set
set(key: str, value: Any, ttl: Optional[int] = None) -> None

Set value in cache.

Parameters:

Name Type Description Default
key str

Cache key

required
value Any

Value to cache

required
ttl Optional[int]

TTL in seconds (if None, uses default_ttl)

None
Source code in src/infrastructure/cache/memory_cache.py
def set(
    self,
    key: str,
    value: Any,
    ttl: Optional[int] = None
) -> None:
    """
    Store a value under a key, replacing any existing entry.

    When a new key is added to a cache that is already at ``max_size``,
    ``_evict_oldest()`` is called first. The entry is placed at the end of
    the ordered store and its expiry is (re)computed.

    Parameters
    ----------
    key : str
        Cache key
    value : Any
        Value to cache
    ttl : Optional[int], default=None
        TTL in seconds (if None, uses default_ttl); a value that is not
        positive means the entry never expires
    """
    # Only a brand-new key can push the cache over capacity.
    if key not in self._cache:
        if len(self._cache) >= self.max_size:
            self._evict_oldest()

    # Write the value and mark it as the most recent entry.
    self._cache[key] = value
    self._cache.move_to_end(key)

    # Resolve the effective TTL and record the matching expiry time.
    effective_ttl = self.default_ttl if ttl is None else ttl
    self._expiry[key] = (
        datetime.now() + timedelta(seconds=effective_ttl)
        if effective_ttl > 0
        else None
    )

    logger.debug(f"Cache set: {key}", extra={'ttl': effective_ttl})

    # Metrics: operation counter, entry-size observation, current size.
    self._emit_operation_metric("set", "ok")
    entry_size = self._estimate_value_size_bytes(value)
    CACHE_ENTRY_SIZE_BYTES.labels(cache_type=self._cache_type).observe(entry_size)
    self._emit_size_metric()
delete
delete(key: str) -> bool

Delete entry from cache.

Parameters:

Name Type Description Default
key str

Cache key

required

Returns:

Type Description
bool

True if deleted, False if key not found

Source code in src/infrastructure/cache/memory_cache.py
def delete(self, key: str) -> bool:
    """
    Remove a key together with its expiry record.

    Parameters
    ----------
    key : str
        Cache key

    Returns
    -------
    bool
        True if deleted, False if key not found
    """
    # Nothing stored under this key: no logging, no metrics.
    if key not in self._cache:
        return False

    del self._cache[key]
    del self._expiry[key]

    logger.debug(f"Cache delete: {key}")
    self._emit_operation_metric("delete", "ok")
    self._emit_size_metric()
    return True
clear
clear() -> None

Clear all cache entries.

Source code in src/infrastructure/cache/memory_cache.py
def clear(self) -> None:
    """Remove every entry and expiry record, then log and emit metrics."""
    # Capture the count first so the log reports how many were dropped.
    removed = len(self._cache)

    for table in (self._cache, self._expiry):
        table.clear()

    logger.info(f"Cache cleared: {removed} entries removed")
    self._emit_operation_metric("clear", "ok")
    self._emit_size_metric()
exists
exists(key: str) -> bool

Check if key exists in cache (and not expired).

Parameters:

Name Type Description Default
key str

Cache key

required

Returns:

Type Description
bool

True if exists and not expired

Source code in src/infrastructure/cache/memory_cache.py
def exists(self, key: str) -> bool:
    """
    Report whether a key holds a live (non-expired) entry.

    An entry found to be expired is deleted as a side effect.

    Parameters
    ----------
    key : str
        Cache key

    Returns
    -------
    bool
        True if exists and not expired
    """
    present = key in self._cache

    # A stored-but-stale entry is purged and reported as absent.
    if present and self._is_expired(key):
        self.delete(key)
        present = False

    return present
size
size() -> int

Get current cache size.

Returns:

Type Description
int

Number of entries in cache

Source code in src/infrastructure/cache/memory_cache.py
def size(self) -> int:
    """
    Count the entries currently held in the cache.

    Returns
    -------
    int
        Number of entries in cache
    """
    entry_count = len(self._cache)
    return entry_count
get_stats
get_stats() -> dict

Get cache statistics.

Returns:

Type Description
dict

Statistics including size, max_size, usage percentage

Source code in src/infrastructure/cache/memory_cache.py
def get_stats(self) -> dict:
    """
    Get cache statistics.

    Returns
    -------
    dict
        ``current_size``, ``max_size``, ``default_ttl`` and
        ``usage_percent`` (current size as a percentage of ``max_size``;
        reported as 0.0 when ``max_size`` is not positive)
    """
    current_size = len(self._cache)

    # Guard the division: a zero capacity would otherwise raise
    # ZeroDivisionError just from asking for statistics.
    if self.max_size > 0:
        usage_percent = (current_size / self.max_size) * 100
    else:
        usage_percent = 0.0

    return {
        'current_size': current_size,
        'max_size': self.max_size,
        'default_ttl': self.default_ttl,
        'usage_percent': usage_percent
    }
cache_dataframe
cache_dataframe(key: str, df: DataFrame, ttl: Optional[int] = None) -> None

Cache a DataFrame.

Parameters:

Name Type Description Default
key str

Cache key

required
df DataFrame

DataFrame to cache

required
ttl Optional[int]

TTL in seconds

None
Source code in src/infrastructure/cache/dataframe_cache.py
def cache_dataframe(
    self,
    key: str,
    df: pd.DataFrame,
    ttl: Optional[int] = None
) -> None:
    """
    Store a DataFrame, compressing it when it exceeds the size threshold.

    The stored entry is a dict holding the payload (compressed data or a
    copy of the frame) plus metadata: compression flag, original size in
    MB and shape.

    Parameters
    ----------
    key : str
        Cache key
    df : pd.DataFrame
        DataFrame to cache
    ttl : Optional[int], default=None
        TTL in seconds
    """
    # Deep memory footprint in bytes, and the same figure in MB for
    # logging and metadata.
    df_size = df.memory_usage(deep=True).sum()
    size_mb = round(df_size / 1024**2, 2)

    is_compressed = bool(df_size > self.compress_threshold)

    if is_compressed:
        logger.debug(
            f"Compressing large DataFrame: {key}",
            extra={'size_mb': size_mb}
        )
        cached_data = self._compress_dataframe(df)
    else:
        # Keep a private copy so later caller-side mutation of ``df``
        # does not reach the stored frame.
        cached_data = df.copy()

    # Count whether compression was applied or skipped for this entry.
    CACHE_OPERATIONS_TOTAL.labels(
        cache_type=self._cache_type,
        operation="compress",
        outcome="applied" if is_compressed else "skipped",
    ).inc()

    # Payload and metadata travel together as one cache entry.
    cache_entry = {
        'data': cached_data,
        'is_compressed': is_compressed,
        'original_size_mb': size_mb,
        'shape': df.shape
    }
    self.set(key, cache_entry, ttl=ttl)

    # Record the original (uncompressed) DataFrame size.
    CACHE_ENTRY_SIZE_BYTES.labels(cache_type=self._cache_type).observe(float(df_size))

    logger.info(
        f"Cached DataFrame: {key}",
        extra={
            'shape': df.shape,
            'size_mb': size_mb,
            'compressed': is_compressed
        }
    )
get_cached_dataframe
get_cached_dataframe(key: str) -> Optional[pd.DataFrame]

Retrieve cached DataFrame.

Parameters:

Name Type Description Default
key str

Cache key

required

Returns:

Type Description
Optional[DataFrame]

Cached DataFrame or None if not found

Source code in src/infrastructure/cache/dataframe_cache.py
def get_cached_dataframe(self, key: str) -> Optional[pd.DataFrame]:
    """
    Fetch a DataFrame previously stored with ``cache_dataframe``.

    Parameters
    ----------
    key : str
        Cache key

    Returns
    -------
    Optional[pd.DataFrame]
        Cached DataFrame or None if not found
    """
    entry = self.get(key)
    if entry is None:
        return None

    # Uncompressed entries hold the frame itself; return a copy so the
    # caller cannot mutate the cached object.
    if not entry['is_compressed']:
        return entry['data'].copy()

    logger.debug(f"Decompressing DataFrame: {key}")
    CACHE_OPERATIONS_TOTAL.labels(
        cache_type=self._cache_type,
        operation="decompress",
        outcome="ok",
    ).inc()
    return self._decompress_dataframe(entry['data'])
generate_dataframe_key
generate_dataframe_key(df: DataFrame, prefix: str = '') -> str

Generate unique cache key for DataFrame.

Parameters:

Name Type Description Default
df DataFrame

DataFrame to generate key for

required
prefix str

Optional prefix for key

''

Returns:

Type Description
str

Cache key based on DataFrame content hash

Notes
  • Key is based on shape, column names, dtypes, and a sample of the first and last five rows (rows in between are not hashed)
Source code in src/infrastructure/cache/dataframe_cache.py
def generate_dataframe_key(
    self,
    df: pd.DataFrame,
    prefix: str = ''
) -> str:
    """
    Generate a deterministic cache key for a DataFrame.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame to generate key for
    prefix : str, default=''
        Optional prefix for key

    Returns
    -------
    str
        16 hex characters of an MD5 digest, preceded by ``"<prefix>_"``
        when a prefix is given

    Notes
    -----
    - Key is based on shape, column labels, dtypes and a value hash of the
      first and last five rows. Rows in between are not inspected, so
      frames that differ only there share a key.
    - Sampled rows are hashed with ``pd.util.hash_pandas_object`` instead
      of the raw ndarray buffer: for object-dtype data (e.g. strings) the
      raw buffer contains object pointers rather than values, which makes
      keys depend on memory layout instead of content.
    - Column labels are sorted by their string form so frames with
      mixed-type labels (e.g. ``1`` and ``'a'``) do not raise on sorting.
    - The row index is not part of the key.
    """
    def _sample_digest(sample: pd.DataFrame) -> str:
        # Nothing to hash when the sample has no rows or no columns.
        if sample.empty:
            return ''
        # categorize=False: the sample is tiny, and this path falls back
        # to string conversion for values that cannot be hashed directly.
        row_hashes = pd.util.hash_pandas_object(
            sample, index=False, categorize=False
        )
        return row_hashes.to_numpy().tobytes().hex()

    hash_components = [
        str(df.shape),
        str(sorted(map(str, df.columns))),
        str(df.dtypes.to_dict()),
        # Sample first/last rows for content hash; the tail is only added
        # when it is not fully covered by the head.
        _sample_digest(df.head(5)),
        _sample_digest(df.tail(5)) if len(df) > 5 else '',
    ]

    hash_string = '|'.join(hash_components)
    # MD5 is used only as a fast fingerprint here, not for security.
    hash_value = hashlib.md5(
        hash_string.encode()
    ).hexdigest()[:16]

    if prefix:
        return f"{prefix}_{hash_value}"
    return hash_value

GraphCache

GraphCache

GraphCache(max_size: int = 100, default_ttl: int = 1800)

Bases: MemoryCache

Specialized cache for Plotly graph objects.

Provides JSON serialization for Plotly figures, graph-specific hash generation, and efficient storage of figure configurations.

Attributes:

Name Type Description
max_size int

Maximum number of graphs to cache

default_ttl int

Default TTL in seconds

Initialize graph cache.

Parameters:

Name Type Description Default
max_size int

Maximum number of figures to cache.

100
default_ttl int

Default TTL in seconds (30 minutes).

1800
Source code in src/infrastructure/cache/graph_cache.py
def __init__(
    self,
    max_size: int = 100,
    default_ttl: int = 1800  # 30 minutes
):
    """
    Create a figure cache on top of the base cache initializer.

    Parameters
    ----------
    max_size : int, default=100
        Maximum number of figures to cache.
    default_ttl : int, default=1800
        Default TTL in seconds (30 minutes).
    """
    # All storage and metric setup is done by the base class.
    super().__init__(max_size=max_size, default_ttl=default_ttl)

    init_context = {'max_size': max_size, 'default_ttl': default_ttl}
    logger.info(f"Initialized {self.__class__.__name__}", extra=init_context)

Functions

get
get(key: str) -> Optional[Any]

Get value from cache.

Parameters:

Name Type Description Default
key str

Cache key

required

Returns:

Type Description
Optional[Any]

Cached value or None if not found/expired

Source code in src/infrastructure/cache/memory_cache.py
def get(self, key: str) -> Optional[Any]:
    """
    Look up a key and return its live value.

    A successful lookup moves the entry to the end of the ordered store
    (LRU bookkeeping). Every call updates the hit/miss counters and emits
    an operation metric and a hit-ratio metric.

    Parameters
    ----------
    key : str
        Cache key

    Returns
    -------
    Optional[Any]
        Cached value, or None when the key is absent or its entry has
        expired (an expired entry is deleted as a side effect)
    """
    # Unknown key: count a plain miss.
    if key not in self._cache:
        logger.debug(f"Cache miss: {key}")
        self._misses += 1
        self._emit_operation_metric("get", "miss")
        self._emit_hit_ratio_metric()
        return None

    if not self._is_expired(key):
        # Live entry: refresh its LRU position before handing it back.
        self._cache.move_to_end(key)

        logger.debug(f"Cache hit: {key}")
        self._hits += 1
        self._emit_operation_metric("get", "hit")
        self._emit_hit_ratio_metric()
        return self._cache[key]

    # Stale entry: drop it and count the lookup as a miss.
    logger.debug(f"Cache expired: {key}")
    self.delete(key)
    self._misses += 1
    self._emit_operation_metric("get", "expired")
    self._emit_hit_ratio_metric()
    return None
set
set(key: str, value: Any, ttl: Optional[int] = None) -> None

Set value in cache.

Parameters:

Name Type Description Default
key str

Cache key

required
value Any

Value to cache

required
ttl Optional[int]

TTL in seconds (if None, uses default_ttl)

None
Source code in src/infrastructure/cache/memory_cache.py
def set(
    self,
    key: str,
    value: Any,
    ttl: Optional[int] = None
) -> None:
    """
    Store a value under a key, replacing any existing entry.

    When a new key is added to a cache that is already at ``max_size``,
    ``_evict_oldest()`` is called first. The entry is placed at the end of
    the ordered store and its expiry is (re)computed.

    Parameters
    ----------
    key : str
        Cache key
    value : Any
        Value to cache
    ttl : Optional[int], default=None
        TTL in seconds (if None, uses default_ttl); a value that is not
        positive means the entry never expires
    """
    # Only a brand-new key can push the cache over capacity.
    if key not in self._cache:
        if len(self._cache) >= self.max_size:
            self._evict_oldest()

    # Write the value and mark it as the most recent entry.
    self._cache[key] = value
    self._cache.move_to_end(key)

    # Resolve the effective TTL and record the matching expiry time.
    effective_ttl = self.default_ttl if ttl is None else ttl
    self._expiry[key] = (
        datetime.now() + timedelta(seconds=effective_ttl)
        if effective_ttl > 0
        else None
    )

    logger.debug(f"Cache set: {key}", extra={'ttl': effective_ttl})

    # Metrics: operation counter, entry-size observation, current size.
    self._emit_operation_metric("set", "ok")
    entry_size = self._estimate_value_size_bytes(value)
    CACHE_ENTRY_SIZE_BYTES.labels(cache_type=self._cache_type).observe(entry_size)
    self._emit_size_metric()
delete
delete(key: str) -> bool

Delete entry from cache.

Parameters:

Name Type Description Default
key str

Cache key

required

Returns:

Type Description
bool

True if deleted, False if key not found

Source code in src/infrastructure/cache/memory_cache.py
def delete(self, key: str) -> bool:
    """
    Remove a key together with its expiry record.

    Parameters
    ----------
    key : str
        Cache key

    Returns
    -------
    bool
        True if deleted, False if key not found
    """
    # Nothing stored under this key: no logging, no metrics.
    if key not in self._cache:
        return False

    del self._cache[key]
    del self._expiry[key]

    logger.debug(f"Cache delete: {key}")
    self._emit_operation_metric("delete", "ok")
    self._emit_size_metric()
    return True
clear
clear() -> None

Clear all cache entries.

Source code in src/infrastructure/cache/memory_cache.py
def clear(self) -> None:
    """Remove every entry and expiry record, then log and emit metrics."""
    # Capture the count first so the log reports how many were dropped.
    removed = len(self._cache)

    for table in (self._cache, self._expiry):
        table.clear()

    logger.info(f"Cache cleared: {removed} entries removed")
    self._emit_operation_metric("clear", "ok")
    self._emit_size_metric()
exists
exists(key: str) -> bool

Check if key exists in cache (and not expired).

Parameters:

Name Type Description Default
key str

Cache key

required

Returns:

Type Description
bool

True if exists and not expired

Source code in src/infrastructure/cache/memory_cache.py
def exists(self, key: str) -> bool:
    """
    Report whether a key holds a live (non-expired) entry.

    An entry found to be expired is deleted as a side effect.

    Parameters
    ----------
    key : str
        Cache key

    Returns
    -------
    bool
        True if exists and not expired
    """
    present = key in self._cache

    # A stored-but-stale entry is purged and reported as absent.
    if present and self._is_expired(key):
        self.delete(key)
        present = False

    return present
size
size() -> int

Get current cache size.

Returns:

Type Description
int

Number of entries in cache

Source code in src/infrastructure/cache/memory_cache.py
def size(self) -> int:
    """
    Count the entries currently held in the cache.

    Returns
    -------
    int
        Number of entries in cache
    """
    entry_count = len(self._cache)
    return entry_count
get_stats
get_stats() -> dict

Get cache statistics.

Returns:

Type Description
dict

Statistics including size, max_size, usage percentage

Source code in src/infrastructure/cache/memory_cache.py
def get_stats(self) -> dict:
    """
    Get cache statistics.

    Returns
    -------
    dict
        ``current_size``, ``max_size``, ``default_ttl`` and
        ``usage_percent`` (current size as a percentage of ``max_size``;
        reported as 0.0 when ``max_size`` is not positive)
    """
    current_size = len(self._cache)

    # Guard the division: a zero capacity would otherwise raise
    # ZeroDivisionError just from asking for statistics.
    if self.max_size > 0:
        usage_percent = (current_size / self.max_size) * 100
    else:
        usage_percent = 0.0

    return {
        'current_size': current_size,
        'max_size': self.max_size,
        'default_ttl': self.default_ttl,
        'usage_percent': usage_percent
    }
cache_figure
cache_figure(key: str, figure: Figure, ttl: Optional[int] = None) -> None

Cache a Plotly figure.

Parameters:

Name Type Description Default
key str

Cache key

required
figure Figure

Plotly figure to cache

required
ttl Optional[int]

TTL in seconds

None
Source code in src/infrastructure/cache/graph_cache.py
def cache_figure(
    self,
    key: str,
    figure: go.Figure,
    ttl: Optional[int] = None
) -> None:
    """
    Store a Plotly figure as its dict representation.

    Parameters
    ----------
    key : str
        Cache key
    figure : go.Figure
        Plotly figure to cache
    ttl : Optional[int], default=None
        TTL in seconds
    """
    # The cache keeps the figure's dict form, not the Figure object;
    # ``get_cached_figure`` rebuilds a Figure from it.
    self.set(key, figure.to_dict(), ttl=ttl)

    logger.info(f"Cached figure: {key}")
get_cached_figure
get_cached_figure(key: str) -> Optional[go.Figure]

Retrieve cached Plotly figure.

Parameters:

Name Type Description Default
key str

Cache key

required

Returns:

Type Description
Optional[Figure]

Cached figure or None if not found

Source code in src/infrastructure/cache/graph_cache.py
def get_cached_figure(self, key: str) -> Optional[go.Figure]:
    """
    Fetch a figure previously stored with ``cache_figure``.

    Parameters
    ----------
    key : str
        Cache key

    Returns
    -------
    Optional[go.Figure]
        Cached figure or None if not found
    """
    stored = self.get(key)

    if stored is not None:
        # Entries are stored as dicts; rebuild a Figure for the caller.
        rebuilt = go.Figure(stored)
        logger.debug(f"Retrieved cached figure: {key}")
        return rebuilt

    return None
generate_figure_key
generate_figure_key(analysis_id: str, filter_values: dict, config: Optional[dict] = None) -> str

Generate unique cache key for figure.

Parameters:

Name Type Description Default
analysis_id str

Analysis ID (e.g., 'UC1_1')

required
filter_values dict

Filter values used to generate figure

required
config Optional[dict]

Additional configuration parameters

None

Returns:

Type Description
str

Cache key

Source code in src/infrastructure/cache/graph_cache.py
def generate_figure_key(
    self,
    analysis_id: str,
    filter_values: dict,
    config: Optional[dict] = None
) -> str:
    """
    Generate a deterministic cache key for a figure.

    Parameters
    ----------
    analysis_id : str
        Analysis ID (e.g., 'UC1_1')
    filter_values : dict
        Filter values used to generate figure
    config : Optional[dict], default=None
        Additional configuration parameters

    Returns
    -------
    str
        ``"<analysis_id>_<hash>"`` where ``<hash>`` is 16 hex characters
        of an MD5 digest over the analysis id, filters and config

    Notes
    -----
    - Values that JSON cannot encode natively (dates, sets, numpy
      scalars, ...) are folded into the hash via their ``repr`` instead
      of raising ``TypeError``. Keys for inputs that were already
      JSON-serializable are unchanged.
    """
    def _encode_unknown(obj):
        # json.dumps calls this only for objects it cannot serialize.
        if isinstance(obj, (set, frozenset)):
            # Sets have no defined order; sort so equal sets hash equally.
            return sorted(repr(item) for item in obj)
        return repr(obj)

    # Create hash from analysis + filters + config
    hash_components = {
        'analysis_id': analysis_id,
        'filters': filter_values,
        'config': config or {}
    }

    # sort_keys makes the key independent of dict insertion order.
    hash_string = json.dumps(
        hash_components,
        sort_keys=True,
        default=_encode_unknown
    )
    # MD5 is used only as a fast fingerprint here, not for security.
    hash_value = hashlib.md5(
        hash_string.encode()
    ).hexdigest()[:16]

    return f"{analysis_id}_{hash_value}"

GraphCacheManager

GraphCacheManager

GraphCacheManager(max_size: int = 100, default_ttl: int = 3600)

Manager for graph caching operations.

Wraps GraphCache with additional management capabilities and metadata support.

Attributes:

Name Type Description
cache GraphCache

Underlying cache instance

Methods:

Name Description
cache_graph

Cache a Plotly figure

get_cached_graph

Retrieve cached Plotly figure

clear

Clear all cached graphs

get_stats

Get cache statistics

Initialize graph cache manager.

Parameters:

Name Type Description Default
max_size int

Maximum number of graphs to cache.

100
default_ttl int

Default TTL in seconds (1 hour).

3600
Source code in src/infrastructure/cache/graph_cache_manager.py
def __init__(
    self,
    max_size: int = 100,
    default_ttl: int = 3600
):
    """
    Create the manager and the GraphCache it delegates to.

    Parameters
    ----------
    max_size : int, default=100
        Maximum number of graphs to cache.
    default_ttl : int, default=3600
        Default TTL in seconds (1 hour).
    """
    # Both settings are forwarded unchanged to the underlying cache.
    cache_settings = {'max_size': max_size, 'default_ttl': default_ttl}
    self.cache = GraphCache(**cache_settings)

    logger.info("GraphCacheManager initialized")

Functions

cache_graph
cache_graph(key: str, figure: Figure, metadata: Optional[dict] = None, ttl: Optional[int] = None) -> None

Cache a Plotly figure.

Parameters:

Name Type Description Default
key str

Cache key

required
figure Figure

Plotly figure to cache

required
metadata Optional[dict]

Optional metadata (for logging/tracking)

None
ttl Optional[int]

TTL in seconds (if None, uses default)

None
Source code in src/infrastructure/cache/graph_cache_manager.py
def cache_graph(
    self,
    key: str,
    figure: go.Figure,
    metadata: Optional[dict] = None,
    ttl: Optional[int] = None
) -> None:
    """
    Store a Plotly figure in the underlying cache.

    Parameters
    ----------
    key : str
        Cache key
    figure : go.Figure
        Plotly figure to cache
    metadata : Optional[dict], default=None
        Optional metadata; it is only written to the debug log, not stored
    ttl : Optional[int], default=None
        TTL in seconds (if None, uses default)
    """
    self.cache.cache_figure(key, figure, ttl=ttl)

    # Missing or empty metadata produces no log line.
    if not metadata:
        return
    logger.debug(f"Cached graph with metadata: {metadata}")
get_cached_graph
get_cached_graph(key: str) -> Optional[go.Figure]

Retrieve cached Plotly figure.

Parameters:

Name Type Description Default
key str

Cache key

required

Returns:

Type Description
Optional[Figure]

Cached figure or None if not found

Source code in src/infrastructure/cache/graph_cache_manager.py
def get_cached_graph(self, key: str) -> Optional[go.Figure]:
    """
    Fetch a figure from the underlying cache.

    Parameters
    ----------
    key : str
        Cache key

    Returns
    -------
    Optional[go.Figure]
        Cached figure or None if not found
    """
    # Pure delegation to the wrapped cache.
    figure = self.cache.get_cached_figure(key)
    return figure
clear
clear() -> None

Clear all cached graphs.

Source code in src/infrastructure/cache/graph_cache_manager.py
def clear(self) -> None:
    """Empty the underlying cache and log that all graphs were removed."""
    backing_cache = self.cache
    backing_cache.clear()
    logger.info("All cached graphs cleared")
get_stats
get_stats() -> dict

Get cache statistics.

Returns:

Type Description
dict

Statistics including size, max_size, etc.

Source code in src/infrastructure/cache/graph_cache_manager.py
def get_stats(self) -> dict:
    """
    Return the statistics reported by the underlying cache.

    Returns
    -------
    dict
        Statistics including size, max_size, etc.
    """
    # Pure delegation to the wrapped cache.
    stats = self.cache.get_stats()
    return stats