Application Services

Application Layer - Services Package.

This package contains application services that provide high-level business operations and coordinate between different layers of the application.

Modules

Package Overview

Application Services Module.

This module contains application-level services that provide cross-cutting concerns and coordinate complex operations across multiple domain entities. Services handle caching, progress tracking, and analysis orchestration.

Classes:

Name Description
CacheService

Manages application-level caching with hash-based keys

ProgressTracker

Tracks processing progress with weighted stages (simplified, no threading)

AnalysisOrchestrator

Orchestrates complex multi-step analysis workflows

Notes

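Services in this module receive their collaborators through dependency injection. AnalysisOrchestrator and CacheService keep in-memory state (session records and cache entries), so they are not fully stateless. Progress tracking is simplified without threading, because the Dash application is treated as single-threaded.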


AnalysisOrchestrator

AnalysisOrchestrator

AnalysisOrchestrator(upload_handler: UploadHandler, data_processor: DataProcessor, result_exporter: ResultExporter, cache_service: CacheService, progress_tracker: ProgressTracker)

Orchestrate the complete analysis workflow.

Coordinates the entire analysis pipeline from file upload through processing to export. Uses composition and dependency injection to maintain clean architecture principles.

Parameters:

Name Type Description Default
upload_handler UploadHandler

Handler for file uploads

required
data_processor DataProcessor

Processor for data merging and analysis

required
result_exporter ResultExporter

Exporter for results

required
cache_service CacheService

Service for caching data

required
progress_tracker ProgressTracker

Tracker for progress updates

required

Methods:

Name Description
execute_workflow

Execute complete workflow from upload to export

process_upload

Process file upload step

process_data

Process data merging step

export_results

Export results in multiple formats

get_session_state

Retrieve current session state

get_progress

Get current progress for a session

clear_session

Clear session data and cache

Notes
  • Uses dependency injection for all operations
  • Maintains session state via DTOs
  • Handles errors gracefully with detailed messages
  • Updates progress at each workflow step
  • Caches intermediate results

Initialize the AnalysisOrchestrator with dependencies.

Parameters:

Name Type Description Default
upload_handler UploadHandler

Handler for file uploads

required
data_processor DataProcessor

Processor for data merging

required
result_exporter ResultExporter

Exporter for results

required
cache_service CacheService

Service for caching

required
progress_tracker ProgressTracker

Tracker for progress

required
Notes

All dependencies are injected for testability.

Source code in src/application/services/analysis_orchestrator.py
def __init__(
    self,
    upload_handler: UploadHandler,
    data_processor: DataProcessor,
    result_exporter: ResultExporter,
    cache_service: CacheService,
    progress_tracker: ProgressTracker,
):
    """
    Initialize the AnalysisOrchestrator with dependencies.

    Parameters
    ----------
    upload_handler : UploadHandler
        Handler for file uploads
    data_processor : DataProcessor
        Processor for data merging
    result_exporter : ResultExporter
        Exporter for results
    cache_service : CacheService
        Service for caching
    progress_tracker : ProgressTracker
        Tracker for progress

    Notes
    -----
    All dependencies are injected for testability.
    """
    self._upload_handler = upload_handler
    self._data_processor = data_processor
    self._result_exporter = result_exporter
    self._cache_service = cache_service
    self._progress_tracker = progress_tracker
    self._sessions: dict[str, AnalysisSessionDTO] = {}
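
Because every collaborator is passed to the constructor, a test can swap in a lightweight double. The following is a minimal sketch of that idea, not the project's own test code: FakeUploadResult, FakeUploadHandler, and MiniOrchestrator are hypothetical stand-ins that mirror only the process_upload delegation shown on this page.

```python
from dataclasses import dataclass
from typing import Any, List, Optional, Tuple


@dataclass
class FakeUploadResult:
    """Hypothetical stand-in for UploadResultDTO (only the fields used here)."""

    success: bool
    dataset: Optional[Any] = None


class FakeUploadHandler:
    """Test double that records calls instead of parsing a real file."""

    def __init__(self, result: FakeUploadResult) -> None:
        self.result = result
        self.calls: List[Tuple[str, str]] = []

    def process_upload(self, content: str, filename: str) -> FakeUploadResult:
        self.calls.append((content, filename))
        return self.result


class MiniOrchestrator:
    """Reduced stand-in for AnalysisOrchestrator with a single dependency."""

    def __init__(self, upload_handler: FakeUploadHandler) -> None:
        self._upload_handler = upload_handler

    def process_upload(self, content: str, filename: str) -> FakeUploadResult:
        # Delegate to the injected collaborator, as the documented method does.
        return self._upload_handler.process_upload(content, filename)


handler = FakeUploadHandler(FakeUploadResult(success=False))
orchestrator = MiniOrchestrator(upload_handler=handler)
result = orchestrator.process_upload("ZmFrZQ==", "samples.txt")
```

The double lets a test assert both the returned result and the arguments the orchestrator forwarded, without touching the file system.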

Functions

execute_workflow
execute_workflow(content: str, filename: str, session_id: str, export_formats: Optional[List[ExportFormat]] = None) -> AnalysisSessionDTO

Execute the complete analysis workflow.

Coordinates the entire pipeline:

1. Upload and validate file
2. Process and merge data
3. Export results in requested formats
4. Update session state

Parameters:

Name Type Description Default
content str

Base64-encoded file content

required
filename str

Original filename

required
session_id str

Unique session identifier

required
export_formats Optional[List[ExportFormat]]

Formats to export (CSV, Excel, JSON), defaults to [CSV]

None

Returns:

Type Description
AnalysisSessionDTO

Complete session state with results

Notes
  • Creates a new session if one doesn't exist
  • Updates progress at each step
  • Returns early with an incomplete session (is_complete=False) if the upload fails
  • Caches results for performance
Source code in src/application/services/analysis_orchestrator.py
def execute_workflow(
    self,
    content: str,
    filename: str,
    session_id: str,
    export_formats: Optional[List[ExportFormat]] = None,
) -> AnalysisSessionDTO:
    """
    Execute the complete analysis workflow.

    Coordinates the entire pipeline:
    1. Upload and validate file
    2. Process and merge data
    3. Export results in requested formats
    4. Update session state

    Parameters
    ----------
    content : str
        Base64-encoded file content
    filename : str
        Original filename
    session_id : str
        Unique session identifier
    export_formats : Optional[List[ExportFormat]], default=None
        Formats to export (CSV, Excel, JSON), defaults to [CSV]

    Returns
    -------
    AnalysisSessionDTO
        Complete session state with results

    Notes
    -----
    - Creates a new session if one doesn't exist
    - Updates progress at each step
    - Returns early with an incomplete session (is_complete=False) if the upload fails
    - Caches results for performance
    """
    if export_formats is None:
        export_formats = [ExportFormat.CSV]

    # Initialize session
    created_at = datetime.now().isoformat()

    # Step 1: Process upload
    self._progress_tracker.update_progress(
        session_id=session_id,
        current_stage="upload",
        stage_number=1,
        total_stages=8,
        message="Processing file upload...",
    )

    upload_result = self._upload_handler.process_upload(content, filename)

    if not upload_result.success:
        # Upload failed - return early
        session = AnalysisSessionDTO(
            session_id=session_id,
            upload_result=upload_result,
            processing_result=None,
            export_results=[],
            created_at=created_at,
            is_complete=False,
        )
        self._sessions[session_id] = session
        return session

    # Step 2: Process data
    self._progress_tracker.update_progress(
        session_id=session_id,
        current_stage="processing",
        stage_number=2,
        total_stages=8,
        message="Processing and merging data...",
    )

    processing_result = self._data_processor.process(
        dataset=upload_result.dataset, session_id=session_id
    )

    # Step 3: Export results
    self._progress_tracker.update_progress(
        session_id=session_id,
        current_stage="exporting",
        stage_number=7,
        total_stages=8,
        message="Exporting results...",
    )

    export_results = []
    for fmt in export_formats:
        export_result = self._export_result_by_format(
            processing_result=processing_result, format=fmt, session_id=session_id
        )
        export_results.append(export_result)

    # Step 4: Finalize
    self._progress_tracker.update_progress(
        session_id=session_id,
        current_stage="complete",
        stage_number=8,
        total_stages=8,
        message="Analysis complete!",
    )

    session = AnalysisSessionDTO(
        session_id=session_id,
        upload_result=upload_result,
        processing_result=processing_result,
        export_results=export_results,
        created_at=created_at,
        is_complete=True,
    )

    self._sessions[session_id] = session
    return session
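
The content parameter is documented as Base64-encoded file content. The sketch below shows one way a caller might produce and decode such a string with the standard library. Whether the real UploadHandler expects a "data:...;base64," prefix (the form a Dash upload component delivers) is not stated on this page, so the prefix handling is an assumption, and encode_upload / decode_upload are hypothetical helper names.

```python
import base64


def encode_upload(raw: bytes) -> str:
    """Return the Base64 text form of raw file bytes."""
    return base64.b64encode(raw).decode("ascii")


def decode_upload(content: str) -> bytes:
    """Decode Base64 text, dropping a 'data:...;base64,' prefix if present."""
    # Assumption: the prefix may or may not be there, so strip it defensively.
    if content.startswith("data:") and "," in content:
        content = content.split(",", 1)[1]
    return base64.b64decode(content)


raw = b"example file bytes\n"  # made-up payload for illustration
content = encode_upload(raw)
plain_roundtrip = decode_upload(content)
prefixed_roundtrip = decode_upload("data:text/plain;base64," + content)
```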
process_upload
process_upload(content: str, filename: str) -> UploadResultDTO

Process file upload independently.

Parameters:

Name Type Description Default
content str

Base64-encoded file content

required
filename str

Original filename

required

Returns:

Type Description
UploadResultDTO

Upload processing result

Notes

Can be used for upload-only operations without full workflow execution.

Source code in src/application/services/analysis_orchestrator.py
def process_upload(self, content: str, filename: str) -> UploadResultDTO:
    """
    Process file upload independently.

    Parameters
    ----------
    content : str
        Base64-encoded file content
    filename : str
        Original filename

    Returns
    -------
    UploadResultDTO
        Upload processing result

    Notes
    -----
    Can be used for upload-only operations without full workflow execution.
    """
    return self._upload_handler.process_upload(content, filename)
process_data
process_data(dataset, session_id: str) -> MergedDataDTO

Process data merging independently.

Parameters:

Name Type Description Default
dataset Dataset

Domain entity with samples

required
session_id str

Session identifier

required

Returns:

Type Description
MergedDataDTO

Processing result

Notes

Can be used when upload is already complete and only processing is needed.

Source code in src/application/services/analysis_orchestrator.py
def process_data(self, dataset, session_id: str) -> MergedDataDTO:
    """
    Process data merging independently.

    Parameters
    ----------
    dataset : Dataset
        Domain entity with samples
    session_id : str
        Session identifier

    Returns
    -------
    MergedDataDTO
        Processing result

    Notes
    -----
    Can be used when upload is already complete and only processing is
    needed.
    """
    return self._data_processor.process(dataset, session_id)
export_results
export_results(data: DataFrame, session_id: str, formats: Optional[List[ExportFormat]] = None) -> List[ExportResultDTO]

Export results in multiple formats.

Parameters:

Name Type Description Default
data DataFrame

Data to export

required
session_id str

Session identifier for filenames

required
formats Optional[List[ExportFormat]]

Export formats, defaults to [CSV]

None

Returns:

Type Description
List[ExportResultDTO]

List of export results

Notes

Can be used for exporting existing data without running the full workflow.

Source code in src/application/services/analysis_orchestrator.py
def export_results(
    self,
    data: pd.DataFrame,
    session_id: str,
    formats: Optional[List[ExportFormat]] = None,
) -> List[ExportResultDTO]:
    """
    Export results in multiple formats.

    Parameters
    ----------
    data : pd.DataFrame
        Data to export
    session_id : str
        Session identifier for filenames
    formats : Optional[List[ExportFormat]], default=None
        Export formats, defaults to [CSV]

    Returns
    -------
    List[ExportResultDTO]
        List of export results

    Notes
    -----
    Can be used for exporting existing data without running the full
    workflow.
    """
    if formats is None:
        formats = [ExportFormat.CSV]

    results = []
    for fmt in formats:
        filename = f"biorempp_results_{session_id}"
        result = self._result_exporter.export(data, fmt, filename)
        results.append(result)

    return results
get_session_state
get_session_state(session_id: str) -> Optional[AnalysisSessionDTO]

Retrieve current session state.

Parameters:

Name Type Description Default
session_id str

Session identifier

required

Returns:

Type Description
Optional[AnalysisSessionDTO]

Session state or None if not found

Notes

Returns None if session doesn't exist.

Source code in src/application/services/analysis_orchestrator.py
def get_session_state(self, session_id: str) -> Optional[AnalysisSessionDTO]:
    """
    Retrieve current session state.

    Parameters
    ----------
    session_id : str
        Session identifier

    Returns
    -------
    Optional[AnalysisSessionDTO]
        Session state or None if not found

    Notes
    -----
    Returns None if session doesn't exist.
    """
    return self._sessions.get(session_id)
get_progress
get_progress(session_id: str) -> Optional[ProcessingProgressDTO]

Get current progress for a session.

Parameters:

Name Type Description Default
session_id str

Session identifier

required

Returns:

Type Description
Optional[ProcessingProgressDTO]

Current progress or None

Source code in src/application/services/analysis_orchestrator.py
def get_progress(self, session_id: str) -> Optional[ProcessingProgressDTO]:
    """
    Get current progress for a session.

    Parameters
    ----------
    session_id : str
        Session identifier

    Returns
    -------
    Optional[ProcessingProgressDTO]
        Current progress or None
    """
    return self._progress_tracker.get_progress(session_id)
clear_session
clear_session(session_id: str) -> None

Clear session data and cache.

Parameters:

Name Type Description Default
session_id str

Session identifier to clear

required
Notes

Removes the session from memory and clears the entire cache (CacheService.clear() is not scoped to a single session).

Source code in src/application/services/analysis_orchestrator.py
def clear_session(self, session_id: str) -> None:
    """
    Clear session data and cache.

    Parameters
    ----------
    session_id : str
        Session identifier to clear

    Notes
    -----
    Removes the session from memory and clears the entire cache.
    """
    if session_id in self._sessions:
        del self._sessions[session_id]

    # CacheService.clear() empties the whole cache, not only this session's entries
    self._cache_service.clear()

AnalysisSessionDTO

AnalysisSessionDTO dataclass

AnalysisSessionDTO(session_id: str, upload_result: Optional[UploadResultDTO], processing_result: Optional[MergedDataDTO], export_results: List[ExportResultDTO], created_at: str, is_complete: bool)

Data Transfer Object for analysis session state.

Attributes:

Name Type Description
session_id str

Unique identifier for the session

upload_result Optional[UploadResultDTO]

Result of file upload

processing_result Optional[MergedDataDTO]

Result of data processing

export_results List[ExportResultDTO]

List of export operations performed

created_at str

ISO format timestamp of session creation

is_complete bool

Whether analysis is complete
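
Callers typically receive this DTO from get_session_state, which may return None, and from execute_workflow, which may return an incomplete session after a failed upload. The sketch below illustrates handling those three cases. SessionSketch is a hypothetical stand-in with the same field names as AnalysisSessionDTO (nested DTOs are typed as Any), and summarize is an illustrative helper, not part of the package.

```python
from dataclasses import dataclass
from typing import Any, List, Optional


@dataclass
class SessionSketch:
    """Stand-in mirroring the documented AnalysisSessionDTO fields."""

    session_id: str
    upload_result: Optional[Any]
    processing_result: Optional[Any]
    export_results: List[Any]
    created_at: str
    is_complete: bool


def summarize(session: Optional[SessionSketch]) -> str:
    """Describe a session lookup result in one short phrase."""
    if session is None:
        return "unknown session"
    if not session.is_complete:
        return "incomplete"
    return f"complete with {len(session.export_results)} export(s)"


failed = SessionSketch("s1", None, None, [], "2024-01-01T00:00:00", False)
done = SessionSketch("s2", object(), object(), ["csv"], "2024-01-01T00:00:00", True)
summaries = [summarize(None), summarize(failed), summarize(done)]
```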


CacheService

CacheService

CacheService(max_size: int = 100, default_ttl_seconds: int = 3600)

Manage application-level caching with hash-based keys.

Provides caching for expensive operations with automatic expiration, size limits, and hash-based key generation.

Parameters:

Name Type Description Default
max_size int

Maximum number of cached items

100
default_ttl_seconds int

Default time-to-live in seconds (1 hour)

3600

Attributes:

Name Type Description
_cache Dict[str, Dict[str, Any]]

Internal cache storage

_max_size int

Maximum cache size

_default_ttl int

Default TTL

Methods:

Name Description
set

Store value in cache

get

Retrieve value from cache

delete

Remove value from cache

clear

Clear entire cache

generate_hash_key

Generate hash-based cache key

has

Check if key exists and is valid

size

Get current cache size

Notes

Cache entries structure:

    {
        'key': {
            'value': cached_value,
            'timestamp': creation_time,
            'ttl': time_to_live_seconds
        }
    }

Uses SHA256 for hash key generation.

Initialize cache service.

Parameters:

Name Type Description Default
max_size int

Maximum cached items

100
default_ttl_seconds int

Default TTL (1 hour)

3600
Source code in src/application/services/cache_service.py
def __init__(self, max_size: int = 100, default_ttl_seconds: int = 3600) -> None:
    """
    Initialize cache service.

    Parameters
    ----------
    max_size : int, default=100
        Maximum cached items
    default_ttl_seconds : int, default=3600
        Default TTL (1 hour)
    """
    self._cache: Dict[str, Dict[str, Any]] = {}
    self._max_size = max_size
    self._default_ttl = default_ttl_seconds

Functions

set
set(key: str, value: Any, ttl_seconds: Optional[int] = None) -> None

Store value in cache.

Parameters:

Name Type Description Default
key str

Cache key

required
value Any

Value to cache

required
ttl_seconds Optional[int]

Time-to-live (uses default if None)

None
Notes

If cache is full, removes oldest entry (FIFO eviction).

Source code in src/application/services/cache_service.py
def set(self, key: str, value: Any, ttl_seconds: Optional[int] = None) -> None:
    """
    Store value in cache.

    Parameters
    ----------
    key : str
        Cache key
    value : Any
        Value to cache
    ttl_seconds : Optional[int]
        Time-to-live (uses default if None)

    Notes
    -----
    If cache is full, removes oldest entry (FIFO eviction).
    """
    # Check size limit and evict if needed
    if len(self._cache) >= self._max_size:
        self._evict_oldest()

    ttl = ttl_seconds if ttl_seconds is not None else self._default_ttl

    self._cache[key] = {
        "value": value,
        "timestamp": time.time(),
        "ttl": ttl,
    }
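
The set method calls _evict_oldest, whose source is not shown on this page. A minimal sketch of FIFO eviction over the documented entry structure follows, assuming "oldest" means the entry with the smallest timestamp. The free function evict_oldest is a hypothetical illustration, not the package's private method.

```python
from typing import Any, Dict


def evict_oldest(cache: Dict[str, Dict[str, Any]]) -> None:
    """Remove the entry with the smallest creation timestamp, if any."""
    if not cache:
        return
    oldest_key = min(cache, key=lambda k: cache[k]["timestamp"])
    del cache[oldest_key]


cache = {
    "a": {"value": 1, "timestamp": 10.0, "ttl": 3600},
    "b": {"value": 2, "timestamp": 20.0, "ttl": 3600},
}
evict_oldest(cache)
remaining = sorted(cache)
```

Since Python dictionaries preserve insertion order, removing the first inserted key would behave the same way as long as existing keys are never rewritten. The timestamp-based version stays correct when a key is overwritten in place.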
get
get(key: str) -> Optional[Any]

Retrieve value from cache.

Parameters:

Name Type Description Default
key str

Cache key

required

Returns:

Type Description
Optional[Any]

Cached value or None if not found/expired

Notes

Automatically removes expired entries.

Source code in src/application/services/cache_service.py
def get(self, key: str) -> Optional[Any]:
    """
    Retrieve value from cache.

    Parameters
    ----------
    key : str
        Cache key

    Returns
    -------
    Optional[Any]
        Cached value or None if not found/expired

    Notes
    -----
    Automatically removes expired entries.
    """
    if key not in self._cache:
        return None

    entry = self._cache[key]

    # Check expiration
    if self._is_expired(entry):
        del self._cache[key]
        return None

    return entry["value"]
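
Both get and has rely on _is_expired, which is not listed here. The sketch below shows a plausible expiry rule for the documented entry structure, assuming an entry expires once its age exceeds its ttl. The function name is_expired and the optional now parameter (added so the check can be exercised with fixed times) are assumptions.

```python
import time
from typing import Any, Dict, Optional


def is_expired(entry: Dict[str, Any], now: Optional[float] = None) -> bool:
    """Return True once the entry has outlived its TTL (assumed rule)."""
    current = time.time() if now is None else now
    return current - entry["timestamp"] > entry["ttl"]


entry = {"value": "cached", "timestamp": 1_000.0, "ttl": 60}
fresh = is_expired(entry, now=1_030.0)  # entry is 30 seconds old
stale = is_expired(entry, now=1_061.0)  # entry is 61 seconds old
```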
delete
delete(key: str) -> bool

Remove value from cache.

Parameters:

Name Type Description Default
key str

Cache key

required

Returns:

Type Description
bool

True if key was deleted, False if not found

Source code in src/application/services/cache_service.py
def delete(self, key: str) -> bool:
    """
    Remove value from cache.

    Parameters
    ----------
    key : str
        Cache key

    Returns
    -------
    bool
        True if key was deleted, False if not found
    """
    if key in self._cache:
        del self._cache[key]
        return True
    return False
clear
clear() -> None

Clear entire cache.

Source code in src/application/services/cache_service.py
def clear(self) -> None:
    """Clear entire cache."""
    self._cache.clear()
generate_hash_key
generate_hash_key(content: str) -> str

Generate SHA256 hash-based cache key.

Parameters:

Name Type Description Default
content str

Content to hash (e.g., upload content or dataset identifier)

required

Returns:

Type Description
str

SHA256 hash hexadecimal string

Notes
  • Same content always generates same key (deterministic)
  • Compatible with legacy cache key generation
Source code in src/application/services/cache_service.py
def generate_hash_key(self, content: str) -> str:
    """
    Generate SHA256 hash-based cache key.

    Parameters
    ----------
    content : str
        Content to hash (e.g., upload content or dataset identifier)

    Returns
    -------
    str
        SHA256 hash hexadecimal string

    Notes
    -----
    - Same content always generates same key (deterministic)
    - Compatible with legacy cache key generation
    """
    return hashlib.sha256(content.encode("utf-8")).hexdigest()
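
The determinism noted above can be checked directly. This standalone sketch repeats the one-line body of generate_hash_key as a free function, so it runs without the package installed.

```python
import hashlib


def generate_hash_key(content: str) -> str:
    """Return the SHA256 hex digest of the UTF-8 encoded content."""
    return hashlib.sha256(content.encode("utf-8")).hexdigest()


key_a = generate_hash_key("abc")
key_b = generate_hash_key("abc")  # same input, same key
key_c = generate_hash_key("abd")  # one character changed, different key
```

Every key is a 64-character hexadecimal string, whatever the length of the input.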
has
has(key: str) -> bool

Check if key exists and is valid (not expired).

Parameters:

Name Type Description Default
key str

Cache key

required

Returns:

Type Description
bool

True if key exists and not expired

Source code in src/application/services/cache_service.py
def has(self, key: str) -> bool:
    """
    Check if key exists and is valid (not expired).

    Parameters
    ----------
    key : str
        Cache key

    Returns
    -------
    bool
        True if key exists and not expired
    """
    if key not in self._cache:
        return False

    entry = self._cache[key]
    if self._is_expired(entry):
        del self._cache[key]
        return False

    return True
size
size() -> int

Get current cache size (number of entries).

Returns:

Type Description
int

Number of cached entries

Source code in src/application/services/cache_service.py
def size(self) -> int:
    """
    Get current cache size (number of entries).

    Returns
    -------
    int
        Number of cached entries
    """
    # Clean expired entries first
    self._clean_expired()
    return len(self._cache)

ProgressTracker

ProgressTracker

ProgressTracker(session_id: str)

Simplified progress tracker for multi-stage processing.

This class tracks progress across 8 weighted processing stages without threading complexity, designed specifically for single-threaded Dash.

Parameters:

Name Type Description Default
session_id str

Unique session identifier for tracking

required

Attributes:

Name Type Description
session_id str

Session identifier

_stages Dict[int, str]

Stage number to description mapping

_stage_weights Dict[int, float]

Stage number to weight mapping (sums to 100.0)

_current_stage int

Current stage number (1-8)

_current_message str

Current processing message

_start_time float

Timestamp when processing started

_error Optional[str]

Error message if processing failed

Methods:

Name Description
start_stage

Start a new processing stage

update_progress

Update progress within current stage

complete

Mark processing as complete

set_error

Set error message and halt progress

get_progress

Get current progress as DTO

calculate_overall_progress

Calculate weighted overall progress

Notes

8 Processing Stages with Weights:

1. Input Validation (5%)
2. Data Parsing (10%)
3. BioRemPP Merge (30%)
4. KEGG Merge (20%)
5. HADEG Merge (15%)
6. ToxCSM Merge (10%)
7. Result Preparation (5%)
8. Finalization (5%)

Simplifications from Legacy:

  • No threading.Lock (single-threaded Dash)
  • No callback notifications (handled externally)
  • Simple dict storage instead of complex state machine
  • Weighted progress calculation preserved

Examples:

>>> tracker = ProgressTracker("session_1")
>>> tracker.start_stage(1, "Validation", "Validating input")
>>> tracker.update_progress(50.0, "Checked 3/5 samples")
>>> progress = tracker.get_progress()
>>> progress.stage_number
1

Initialize simplified progress tracker.

Parameters:

Name Type Description Default
session_id str

Unique session identifier

required
Source code in src/application/services/progress_tracker.py
def __init__(self, session_id: str) -> None:
    """
    Initialize simplified progress tracker.

    Parameters
    ----------
    session_id : str
        Unique session identifier
    """
    self.session_id = session_id
    self._stages = self.STAGES
    self._stage_weights = self.STAGE_WEIGHTS
    self._current_stage = 0
    self._current_message = ""
    self._stage_progress = 0.0  # Progress within current stage (0-100)
    self._start_time = time.time()
    self._error: Optional[str] = None

Functions

start_stage
start_stage(stage: int, stage_name: str, message: str = '') -> None

Start a new processing stage.

Parameters:

Name Type Description Default
stage int

Stage number (1-8)

required
stage_name str

Stage description (required argument; the listed implementation does not store it, and get_progress reports the name from the internal stage mapping)

required
message str

Initial stage message

""

Raises:

Type Description
ValueError

If stage number is invalid

Examples:

>>> tracker = ProgressTracker("session_1")
>>> tracker.start_stage(1, "Validation", "Starting validation")
Source code in src/application/services/progress_tracker.py
def start_stage(self, stage: int, stage_name: str, message: str = "") -> None:
    """
    Start a new processing stage.

    Parameters
    ----------
    stage : int
        Stage number (1-8)
    stage_name : str
        Stage description (optional, uses default if empty)
    message : str, default=""
        Initial stage message

    Raises
    ------
    ValueError
        If stage number is invalid

    Examples
    --------
    >>> tracker = ProgressTracker("session_1")
    >>> tracker.start_stage(1, "Validation", "Starting validation")
    """
    if stage < 1 or stage > self.TOTAL_STAGES:
        raise ValueError(
            f"Invalid stage number: {stage} (must be 1-{self.TOTAL_STAGES})"
        )

    self._current_stage = stage
    self._current_message = message
    self._stage_progress = 0.0
update_progress
update_progress(progress: float, message: Optional[str] = None) -> None

Update progress within current stage.

Parameters:

Name Type Description Default
progress float

Progress percentage within stage (0.0-100.0)

required
message Optional[str]

Optional progress message

None

Raises:

Type Description
ValueError

If progress is out of range

Examples:

>>> tracker.start_stage(3, "Merge", "Merging data")
>>> tracker.update_progress(50.0, "Processed 500/1000 records")
Source code in src/application/services/progress_tracker.py
def update_progress(self, progress: float, message: Optional[str] = None) -> None:
    """
    Update progress within current stage.

    Parameters
    ----------
    progress : float
        Progress percentage within stage (0.0-100.0)
    message : Optional[str]
        Optional progress message

    Raises
    ------
    ValueError
        If progress is out of range

    Examples
    --------
    >>> tracker.start_stage(3, "Merge", "Merging data")
    >>> tracker.update_progress(50.0, "Processed 500/1000 records")
    """
    if not 0.0 <= progress <= 100.0:
        raise ValueError("Progress must be between 0.0 and 100.0")

    self._stage_progress = progress
    if message is not None:
        self._current_message = message
complete
complete() -> None

Mark processing as complete (100%).

Examples:

>>> tracker.complete()
>>> tracker.get_progress().is_complete()
True
Source code in src/application/services/progress_tracker.py
def complete(self) -> None:
    """
    Mark processing as complete (100%).

    Examples
    --------
    >>> tracker.complete()
    >>> tracker.get_progress().is_complete()
    True
    """
    self._current_stage = self.TOTAL_STAGES
    self._stage_progress = 100.0
    self._current_message = "Processing complete"
set_error
set_error(error: str) -> None

Set error message and halt progress.

Parameters:

Name Type Description Default
error str

Error description

required

Examples:

>>> tracker.set_error("Database connection failed")
>>> tracker.get_progress().has_error()
True
Source code in src/application/services/progress_tracker.py
def set_error(self, error: str) -> None:
    """
    Set error message and halt progress.

    Parameters
    ----------
    error : str
        Error description

    Examples
    --------
    >>> tracker.set_error("Database connection failed")
    >>> tracker.get_progress().has_error()
    True
    """
    self._error = error
    self._current_message = f"Error: {error}"
get_progress
get_progress() -> ProcessingProgressDTO

Get current progress as DTO.

Returns:

Type Description
ProcessingProgressDTO

Current progress information

Examples:

>>> tracker.start_stage(3, "Merge", "Processing")
>>> tracker.update_progress(50.0)
>>> dto = tracker.get_progress()
>>> dto.current_stage
'BioRemPP Database Merge'
Source code in src/application/services/progress_tracker.py
def get_progress(self) -> ProcessingProgressDTO:
    """
    Get current progress as DTO.

    Returns
    -------
    ProcessingProgressDTO
        Current progress information

    Examples
    --------
    >>> tracker.start_stage(3, "Merge", "Processing")
    >>> tracker.update_progress(50.0)
    >>> dto = tracker.get_progress()
    >>> dto.current_stage
    'BioRemPP Database Merge'
    """
    overall_progress = self.calculate_overall_progress()
    estimated_time = self._calculate_estimated_time(overall_progress)

    stage_name = (
        self._stages.get(self._current_stage, "Unknown")
        if self._current_stage > 0
        else "Not Started"
    )

    return ProcessingProgressDTO(
        current_stage=stage_name,
        stage_number=max(1, self._current_stage),
        total_stages=self.TOTAL_STAGES,
        progress_percentage=overall_progress,
        message=self._current_message,
        estimated_time_remaining=estimated_time,
        error=self._error,
    )
calculate_overall_progress
calculate_overall_progress() -> float

Calculate weighted overall progress across all stages.

Returns:

Type Description
float

Overall progress percentage (0.0-100.0)

Notes

Formula: Σ(completed_stages_weights) + (current_stage_weight * stage_progress%)

Examples:

>>> tracker.start_stage(3, "Merge", "Processing")
>>> tracker.update_progress(50.0)  # 50% of stage 3
>>> overall = tracker.calculate_overall_progress()
>>> # Stages 1-2 complete (15%) + 50% of stage 3 (50% of 30% = 15%)
>>> round(overall, 1)
30.0
Source code in src/application/services/progress_tracker.py
def calculate_overall_progress(self) -> float:
    """
    Calculate weighted overall progress across all stages.

    Returns
    -------
    float
        Overall progress percentage (0.0-100.0)

    Notes
    -----
    Formula: Σ(completed_stages_weights) + (current_stage_weight * stage_progress%)

    Examples
    --------
    >>> tracker.start_stage(3, "Merge", "Processing")
    >>> tracker.update_progress(50.0)  # 50% of stage 3
    >>> overall = tracker.calculate_overall_progress()
    >>> # Stages 1-2 complete (15%) + 50% of stage 3 (50% of 30% = 15%)
    >>> round(overall, 1)
    30.0
    """
    if self._current_stage == 0:
        return 0.0

    # Sum weights of completed stages
    completed_weight = sum(
        self._stage_weights[s]
        for s in range(1, self._current_stage)
        if s in self._stage_weights
    )

    # Add progress of current stage
    current_stage_weight = self._stage_weights.get(self._current_stage, 0.0)
    current_contribution = current_stage_weight * self._stage_progress / 100.0

    total_progress = completed_weight + current_contribution

    return min(100.0, total_progress)  # Cap at 100%
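
The weighted formula can be tried outside the class. The sketch below reimplements the same arithmetic as a free function. The STAGE_WEIGHTS values are copied from the stage list in the ProgressTracker notes, since the class constant itself is not shown here, and the name overall_progress is hypothetical.

```python
from typing import Dict

# Weights per stage number, taken from the documented stage list (assumed
# to match the class-level STAGE_WEIGHTS constant).
STAGE_WEIGHTS: Dict[int, float] = {
    1: 5.0, 2: 10.0, 3: 30.0, 4: 20.0,
    5: 15.0, 6: 10.0, 7: 5.0, 8: 5.0,
}


def overall_progress(
    current_stage: int,
    stage_progress: float,
    weights: Dict[int, float] = STAGE_WEIGHTS,
) -> float:
    """Sum the completed stage weights plus the current stage's partial share."""
    if current_stage == 0:
        return 0.0
    completed = sum(weights[s] for s in range(1, current_stage) if s in weights)
    current = weights.get(current_stage, 0.0) * stage_progress / 100.0
    return min(100.0, completed + current)


halfway_stage_3 = overall_progress(3, 50.0)  # 5 + 10 + (30 * 0.5)
finished = overall_progress(8, 100.0)
```

With these weights, halfway through stage 3 gives 30.0 and a finished stage 8 gives 100.0, matching the docstring example.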