
Analysis Orchestrator

analysis_orchestrator

Application Layer - Analysis Orchestrator.

This module orchestrates the complete workflow from upload to export, coordinating between multiple application services and core operations.

Classes:

Name Description
AnalysisOrchestrator

High-level coordinator for the complete analysis workflow.

AnalysisSessionDTO

Data Transfer Object for analysis session state.

Notes
  • Follows Clean Architecture (depends on abstractions)
  • Coordinates but doesn't duplicate logic
  • Uses Dependency Injection for testability
  • Maintains workflow state through DTOs

Classes

AnalysisSessionDTO dataclass

AnalysisSessionDTO(session_id: str, upload_result: Optional[UploadResultDTO], processing_result: Optional[MergedDataDTO], export_results: List[ExportResultDTO], created_at: str, is_complete: bool)

Data Transfer Object for analysis session state.

Attributes:

Name Type Description
session_id str

Unique identifier for the session

upload_result Optional[UploadResultDTO]

Result of file upload

processing_result Optional[MergedDataDTO]

Result of data processing

export_results List[ExportResultDTO]

List of export operations performed

created_at str

ISO format timestamp of session creation

is_complete bool

Whether analysis is complete
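The attribute list above maps directly onto a dataclass. The following is a minimal sketch that re-creates it from the documented fields only. The placeholder aliases for UploadResultDTO, MergedDataDTO and ExportResultDTO are hypothetical stand-ins, and the listing does not show whether the real class is frozen or declares defaults.

```python
from dataclasses import dataclass, replace
from datetime import datetime
from typing import Any, List, Optional

# Hypothetical placeholders for the real DTO types used by the module.
UploadResultDTO = Any
MergedDataDTO = Any
ExportResultDTO = Any


@dataclass
class AnalysisSessionDTO:
    """Illustrative re-creation of the session DTO from its documented fields."""

    session_id: str
    upload_result: Optional[UploadResultDTO]
    processing_result: Optional[MergedDataDTO]
    export_results: List[ExportResultDTO]
    created_at: str
    is_complete: bool


# A session as it would look right after a failed upload.
failed = AnalysisSessionDTO(
    session_id="abc123",
    upload_result={"success": False},
    processing_result=None,
    export_results=[],
    created_at=datetime.now().isoformat(),
    is_complete=False,
)

# dataclasses.replace builds a modified copy instead of mutating the original.
completed = replace(failed, is_complete=True)
print(failed.is_complete, completed.is_complete)
```

Using dataclasses.replace leaves the original record untouched, and it works whether or not the real dataclass is frozen.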

AnalysisOrchestrator

AnalysisOrchestrator(upload_handler: UploadHandler, data_processor: DataProcessor, result_exporter: ResultExporter, cache_service: CacheService, progress_tracker: ProgressTracker)

Orchestrate the complete analysis workflow.

Coordinates the entire analysis pipeline from file upload through processing to export. Uses composition and dependency injection to maintain clean architecture principles.

Parameters:

Name Type Description Default
upload_handler UploadHandler

Handler for file uploads

required
data_processor DataProcessor

Processor for data merging and analysis

required
result_exporter ResultExporter

Exporter for results

required
cache_service CacheService

Service for caching data

required
progress_tracker ProgressTracker

Tracker for progress updates

required

Methods:

Name Description
execute_workflow

Execute complete workflow from upload to export

process_upload

Process file upload step

process_data

Process data merging step

export_results

Export results in multiple formats

get_session_state

Retrieve current session state

get_progress

Get current progress for a session

clear_session

Clear session data and cache

Notes
  • Uses dependency injection for all operations
  • Maintains session state via DTOs
  • Handles errors gracefully with detailed messages
  • Updates progress at each workflow step
  • Caches intermediate results

Initialize the AnalysisOrchestrator with dependencies.

Parameters:

Name Type Description Default
upload_handler UploadHandler

Handler for file uploads

required
data_processor DataProcessor

Processor for data merging

required
result_exporter ResultExporter

Exporter for results

required
cache_service CacheService

Service for caching

required
progress_tracker ProgressTracker

Tracker for progress

required
Notes

All dependencies are injected for testability.

Source code in src/application/services/analysis_orchestrator.py
def __init__(
    self,
    upload_handler: UploadHandler,
    data_processor: DataProcessor,
    result_exporter: ResultExporter,
    cache_service: CacheService,
    progress_tracker: ProgressTracker,
):
    """
    Initialize the AnalysisOrchestrator with dependencies.

    Parameters
    ----------
    upload_handler : UploadHandler
        Handler for file uploads
    data_processor : DataProcessor
        Processor for data merging
    result_exporter : ResultExporter
        Exporter for results
    cache_service : CacheService
        Service for caching
    progress_tracker : ProgressTracker
        Tracker for progress

    Notes
    -----
    All dependencies are injected for testability.
    """
    self._upload_handler = upload_handler
    self._data_processor = data_processor
    self._result_exporter = result_exporter
    self._cache_service = cache_service
    self._progress_tracker = progress_tracker
    self._sessions: dict[str, AnalysisSessionDTO] = {}
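Because all five collaborators arrive through the constructor, a test can pass unittest.mock.Mock objects instead of real services. The sketch below uses a trimmed-down, hypothetical MiniOrchestrator that copies only the constructor and the process_upload delegation, so it runs without the application package; with the real class you would pass the same mocks to AnalysisOrchestrator(...).

```python
from unittest.mock import Mock


class MiniOrchestrator:
    """Hypothetical stand-in that mirrors AnalysisOrchestrator.__init__."""

    def __init__(self, upload_handler, data_processor, result_exporter,
                 cache_service, progress_tracker):
        # Store every injected dependency, exactly as the real constructor does.
        self._upload_handler = upload_handler
        self._data_processor = data_processor
        self._result_exporter = result_exporter
        self._cache_service = cache_service
        self._progress_tracker = progress_tracker
        self._sessions = {}

    def process_upload(self, content, filename):
        # Same one-line delegation as the real process_upload.
        return self._upload_handler.process_upload(content, filename)


upload_handler = Mock()
upload_handler.process_upload.return_value = "fake-upload-result"

orch = MiniOrchestrator(upload_handler, Mock(), Mock(), Mock(), Mock())
result = orch.process_upload("ZGF0YQ==", "sample.txt")

# The mock records the call, so the test can verify the delegation.
upload_handler.process_upload.assert_called_once_with("ZGF0YQ==", "sample.txt")
print(result)
```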
Functions
execute_workflow
execute_workflow(content: str, filename: str, session_id: str, export_formats: Optional[List[ExportFormat]] = None) -> AnalysisSessionDTO

Execute the complete analysis workflow.

Coordinates the entire pipeline:

1. Upload and validate the file
2. Process and merge the data
3. Export results in the requested formats
4. Update the session state

Parameters:

Name Type Description Default
content str

Base64-encoded file content

required
filename str

Original filename

required
session_id str

Unique session identifier

required
export_formats Optional[List[ExportFormat]]

Formats to export (CSV, Excel, JSON), defaults to [CSV]

None

Returns:

Type Description
AnalysisSessionDTO

Complete session state with results

Notes
  • Creates a new session record, replacing any existing record with the same session_id
  • Updates progress at each step
  • Returns early with is_complete=False if the upload fails
  • Stores the resulting session in memory (see get_session_state)
Source code in src/application/services/analysis_orchestrator.py
def execute_workflow(
    self,
    content: str,
    filename: str,
    session_id: str,
    export_formats: Optional[List[ExportFormat]] = None,
) -> AnalysisSessionDTO:
    """
    Execute the complete analysis workflow.

    Coordinates the entire pipeline:
    1. Upload and validate file
    2. Process and merge data
    3. Export results in requested formats
    4. Update session state

    Parameters
    ----------
    content : str
        Base64-encoded file content
    filename : str
        Original filename
    session_id : str
        Unique session identifier
    export_formats : Optional[List[ExportFormat]], default=None
        Formats to export (CSV, Excel, JSON), defaults to [CSV]

    Returns
    -------
    AnalysisSessionDTO
        Complete session state with results

    Notes
    -----
    - Creates a new session record, replacing any existing record with
      the same session_id
    - Updates progress at each step
    - Returns early with is_complete=False if the upload fails
    - Stores the resulting session in memory (see get_session_state)
    """
    if export_formats is None:
        export_formats = [ExportFormat.CSV]

    # Record the session creation timestamp
    created_at = datetime.now().isoformat()

    # Step 1: Process upload
    self._progress_tracker.update_progress(
        session_id=session_id,
        current_stage="upload",
        stage_number=1,
        total_stages=8,
        message="Processing file upload...",
    )

    upload_result = self._upload_handler.process_upload(content, filename)

    if not upload_result.success:
        # Upload failed - return early
        session = AnalysisSessionDTO(
            session_id=session_id,
            upload_result=upload_result,
            processing_result=None,
            export_results=[],
            created_at=created_at,
            is_complete=False,
        )
        self._sessions[session_id] = session
        return session

    # Step 2: Process data
    self._progress_tracker.update_progress(
        session_id=session_id,
        current_stage="processing",
        stage_number=2,
        total_stages=8,
        message="Processing and merging data...",
    )

    processing_result = self._data_processor.process(
        dataset=upload_result.dataset, session_id=session_id
    )

    # Step 3: Export results
    self._progress_tracker.update_progress(
        session_id=session_id,
        current_stage="exporting",
        stage_number=7,
        total_stages=8,
        message="Exporting results...",
    )

    export_results = []
    for fmt in export_formats:
        export_result = self._export_result_by_format(
            processing_result=processing_result, format=fmt, session_id=session_id
        )
        export_results.append(export_result)

    # Step 4: Finalize
    self._progress_tracker.update_progress(
        session_id=session_id,
        current_stage="complete",
        stage_number=8,
        total_stages=8,
        message="Analysis complete!",
    )

    session = AnalysisSessionDTO(
        session_id=session_id,
        upload_result=upload_result,
        processing_result=processing_result,
        export_results=export_results,
        created_at=created_at,
        is_complete=True,
    )

    self._sessions[session_id] = session
    return session
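The control flow of execute_workflow can be condensed into the sketch below, which keeps only the progress reporting and the early return on a failed upload. FakeTracker and run_workflow are hypothetical names used for illustration; they record the (stage, stage_number) pairs that the real method passes to ProgressTracker.update_progress.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class FakeTracker:
    """Records (stage, stage_number) pairs instead of publishing progress."""

    calls: List[Tuple[str, int]] = field(default_factory=list)

    def update_progress(self, session_id, current_stage, stage_number,
                        total_stages, message):
        self.calls.append((current_stage, stage_number))


def run_workflow(tracker: FakeTracker, upload_ok: bool) -> bool:
    """Mirror of execute_workflow's step order; returns is_complete."""

    def report(stage: str, number: int, message: str) -> None:
        tracker.update_progress(session_id="s1", current_stage=stage,
                                stage_number=number, total_stages=8,
                                message=message)

    report("upload", 1, "Processing file upload...")
    if not upload_ok:
        return False  # same early return as the real method
    report("processing", 2, "Processing and merging data...")
    # Stages 3-6 are not reported by execute_workflow itself.
    report("exporting", 7, "Exporting results...")
    report("complete", 8, "Analysis complete!")
    return True


ok = FakeTracker()
print(run_workflow(ok, upload_ok=True), ok.calls)
failed = FakeTracker()
print(run_workflow(failed, upload_ok=False), failed.calls)
```

Running it with upload_ok=False shows that a failed upload produces a single progress update and no processing or export steps.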
process_upload
process_upload(content: str, filename: str) -> UploadResultDTO

Process file upload independently.

Parameters:

Name Type Description Default
content str

Base64-encoded file content

required
filename str

Original filename

required

Returns:

Type Description
UploadResultDTO

Upload processing result

Notes

Can be used for upload-only operations without full workflow execution.

Source code in src/application/services/analysis_orchestrator.py
def process_upload(self, content: str, filename: str) -> UploadResultDTO:
    """
    Process file upload independently.

    Parameters
    ----------
    content : str
        Base64-encoded file content
    filename : str
        Original filename

    Returns
    -------
    UploadResultDTO
        Upload processing result

    Notes
    -----
    Can be used for upload-only operations without full workflow execution.
    """
    return self._upload_handler.process_upload(content, filename)
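Both process_upload and execute_workflow expect content as Base64 text rather than raw bytes. A minimal way to produce it is shown below, assuming the handler accepts plain Base64 without a data-URI prefix (such as data:text/plain;base64,); check UploadHandler before relying on that. encode_for_upload is a hypothetical helper.

```python
import base64


def encode_for_upload(raw: bytes) -> str:
    """Return Base64 text for the content parameter (plain Base64 assumed)."""
    return base64.b64encode(raw).decode("ascii")


raw = b"example file contents\n"  # e.g. bytes read with open(path, "rb").read()
content = encode_for_upload(raw)
print(content)

# Round-trip check: decoding gives back the original bytes.
assert base64.b64decode(content) == raw

# With a configured orchestrator:
# upload_result = orchestrator.process_upload(content, "samples.txt")
```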
process_data
process_data(dataset, session_id: str) -> MergedDataDTO

Process data merging independently.

Parameters:

Name Type Description Default
dataset Dataset

Domain entity with samples

required
session_id str

Session identifier

required

Returns:

Type Description
MergedDataDTO

Processing result

Notes

Can be used when upload is already complete and only processing is needed.

Source code in src/application/services/analysis_orchestrator.py
def process_data(self, dataset, session_id: str) -> MergedDataDTO:
    """
    Process data merging independently.

    Parameters
    ----------
    dataset : Dataset
        Domain entity with samples
    session_id : str
        Session identifier

    Returns
    -------
    MergedDataDTO
        Processing result

    Notes
    -----
    Can be used when upload is already complete and only processing is
    needed.
    """
    return self._data_processor.process(dataset, session_id)
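process_upload and process_data can be chained to run the first two steps without exporting. The sketch below mirrors the success check in execute_workflow. The Mock-based orchestrator and the upload_then_process helper are hypothetical, standing in for a configured AnalysisOrchestrator whose upload result exposes success and dataset, as execute_workflow uses them.

```python
from types import SimpleNamespace
from unittest.mock import Mock

# Hypothetical stand-in for a configured AnalysisOrchestrator.
orchestrator = Mock()
orchestrator.process_upload.return_value = SimpleNamespace(
    success=True, dataset="dataset-entity"
)
orchestrator.process_data.return_value = "merged-data-dto"


def upload_then_process(orch, content: str, filename: str, session_id: str):
    """Run the upload and processing steps separately, skipping export."""
    upload_result = orch.process_upload(content, filename)
    if not upload_result.success:
        return None  # mirror execute_workflow: stop when the upload fails
    return orch.process_data(upload_result.dataset, session_id)


merged = upload_then_process(orchestrator, "ZXhhbXBsZQ==", "sample.txt", "s1")
orchestrator.process_data.assert_called_once_with("dataset-entity", "s1")
print(merged)
```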
export_results
export_results(data: DataFrame, session_id: str, formats: Optional[List[ExportFormat]] = None) -> List[ExportResultDTO]

Export results in multiple formats.

Parameters:

Name Type Description Default
data DataFrame

Data to export

required
session_id str

Session identifier for filenames

required
formats Optional[List[ExportFormat]]

Export formats, defaults to [CSV]

None

Returns:

Type Description
List[ExportResultDTO]

List of export results

Notes

Can be used for exporting existing data without running the full workflow.

Source code in src/application/services/analysis_orchestrator.py
def export_results(
    self,
    data: pd.DataFrame,
    session_id: str,
    formats: Optional[List[ExportFormat]] = None,
) -> List[ExportResultDTO]:
    """
    Export results in multiple formats.

    Parameters
    ----------
    data : pd.DataFrame
        Data to export
    session_id : str
        Session identifier for filenames
    formats : Optional[List[ExportFormat]], default=None
        Export formats, defaults to [CSV]

    Returns
    -------
    List[ExportResultDTO]
        List of export results

    Notes
    -----
    Can be used for exporting existing data without running the full
    workflow.
    """
    if formats is None:
        formats = [ExportFormat.CSV]

    # All formats share the same base filename
    filename = f"biorempp_results_{session_id}"
    results = []
    for fmt in formats:
        result = self._result_exporter.export(data, fmt, filename)
        results.append(result)

    return results
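All requested formats share the same base filename, biorempp_results_{session_id}, so any format-specific suffix has to come from the exporter. The sketch below assumes the exporter appends an extension. The ExportFormat values and FakeExporter are hypothetical stand-ins, and export_results here is a standalone copy of the method's loop.

```python
from enum import Enum
from typing import List, Optional, Sequence


class ExportFormat(Enum):
    """Hypothetical stand-in for the real ExportFormat enum."""

    CSV = "csv"
    EXCEL = "xlsx"
    JSON = "json"


class FakeExporter:
    """Assumed behaviour: append the format's extension to the base name."""

    def export(self, data, fmt: ExportFormat, filename: str) -> str:
        return f"{filename}.{fmt.value}"


def export_results(exporter, data, session_id: str,
                   formats: Optional[Sequence[ExportFormat]] = None) -> List[str]:
    if formats is None:
        formats = [ExportFormat.CSV]  # same default as the real method
    base = f"biorempp_results_{session_id}"  # one base name for every format
    return [exporter.export(data, fmt, base) for fmt in formats]


print(export_results(FakeExporter(), data=None, session_id="s1"))
print(export_results(FakeExporter(), None, "s1",
                     [ExportFormat.CSV, ExportFormat.JSON]))
```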
get_session_state
get_session_state(session_id: str) -> Optional[AnalysisSessionDTO]

Retrieve current session state.

Parameters:

Name Type Description Default
session_id str

Session identifier

required

Returns:

Type Description
Optional[AnalysisSessionDTO]

Session state or None if not found

Notes

Returns None if session doesn't exist.

Source code in src/application/services/analysis_orchestrator.py
def get_session_state(self, session_id: str) -> Optional[AnalysisSessionDTO]:
    """
    Retrieve current session state.

    Parameters
    ----------
    session_id : str
        Session identifier

    Returns
    -------
    Optional[AnalysisSessionDTO]
        Session state or None if not found

    Notes
    -----
    Returns None if session doesn't exist.
    """
    return self._sessions.get(session_id)
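Session state lives in a plain dict on the orchestrator instance, so a lookup is a dict.get and unknown identifiers return None. The sketch below shows the corresponding check on the caller's side; the simplified session objects and the describe helper are hypothetical.

```python
from types import SimpleNamespace
from typing import Dict, Optional

# Simplified registry, standing in for AnalysisOrchestrator._sessions.
sessions: Dict[str, SimpleNamespace] = {"s1": SimpleNamespace(is_complete=True)}


def describe(session_id: str) -> str:
    state: Optional[SimpleNamespace] = sessions.get(session_id)
    if state is None:
        # Unknown id, or a session already removed by clear_session.
        return "not found"
    return "complete" if state.is_complete else "incomplete"


print(describe("s1"), describe("missing"))
```

Because _sessions is an instance attribute, session state is lost when the process restarts and is not shared between worker processes that each hold their own orchestrator.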
get_progress
get_progress(session_id: str) -> Optional[ProcessingProgressDTO]

Get current progress for a session.

Parameters:

Name Type Description Default
session_id str

Session identifier

required

Returns:

Type Description
Optional[ProcessingProgressDTO]

Current progress or None

Source code in src/application/services/analysis_orchestrator.py
def get_progress(self, session_id: str) -> Optional[ProcessingProgressDTO]:
    """
    Get current progress for a session.

    Parameters
    ----------
    session_id : str
        Session identifier

    Returns
    -------
    Optional[ProcessingProgressDTO]
        Current progress or None
    """
    return self._progress_tracker.get_progress(session_id)
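get_progress simply forwards to the injected ProgressTracker. That class is not shown here, so the sketch below is a hypothetical in-memory tracker that keeps only the latest update per session, using the same update_progress keyword names that execute_workflow passes. ProgressSnapshot stands in for ProcessingProgressDTO.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class ProgressSnapshot:
    """Hypothetical stand-in for ProcessingProgressDTO."""

    current_stage: str
    stage_number: int
    total_stages: int
    message: str

    @property
    def fraction(self) -> float:
        return self.stage_number / self.total_stages


class InMemoryProgressTracker:
    """Keeps only the most recent update for each session (assumed semantics)."""

    def __init__(self) -> None:
        self._latest: Dict[str, ProgressSnapshot] = {}

    def update_progress(self, session_id, current_stage, stage_number,
                        total_stages, message) -> None:
        self._latest[session_id] = ProgressSnapshot(
            current_stage, stage_number, total_stages, message
        )

    def get_progress(self, session_id) -> Optional[ProgressSnapshot]:
        return self._latest.get(session_id)


tracker = InMemoryProgressTracker()
tracker.update_progress(session_id="s1", current_stage="exporting",
                        stage_number=7, total_stages=8,
                        message="Exporting results...")
snap = tracker.get_progress("s1")
print(snap.current_stage, f"{snap.fraction:.0%}")
print(tracker.get_progress("unknown"))
```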
clear_session
clear_session(session_id: str) -> None

Clear session data and cache.

Parameters:

Name Type Description Default
session_id str

Session identifier to clear

required
Notes

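Removes the session from memory and calls CacheService.clear(). Because clear() receives no session identifier, it presumably empties the entire cache rather than only this session's entries.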

Source code in src/application/services/analysis_orchestrator.py
def clear_session(self, session_id: str) -> None:
    """
    Clear session data and cache.

    Parameters
    ----------
    session_id : str
        Session identifier to clear

    Notes
    -----
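    Removes the session from memory and clears the cache. The cache
    clear is not session-scoped: clear() receives no session_id.
    """
    # Drop the in-memory session record if present
    self._sessions.pop(session_id, None)

    # clear() takes no session_id, so this presumably affects all sessions
    self._cache_service.clear()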
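clear_session calls CacheService.clear() with no arguments, which presumably empties the cache for every session. If per-session clearing is wanted, one option is to namespace cache keys by session and delete by prefix. DictCache, its key scheme, and its set/clear_prefix/keys methods are hypothetical and not the real CacheService API.

```python
from typing import Any, Dict, List


class DictCache:
    """Hypothetical cache; the real CacheService API may differ."""

    def __init__(self) -> None:
        self._data: Dict[str, Any] = {}

    def set(self, key: str, value: Any) -> None:
        self._data[key] = value

    def clear(self) -> None:
        # Mirrors the no-argument clear() used by clear_session: removes everything.
        self._data.clear()

    def clear_prefix(self, prefix: str) -> None:
        # Session-scoped alternative: drop only keys that start with the prefix.
        for key in [k for k in self._data if k.startswith(prefix)]:
            del self._data[key]

    def keys(self) -> List[str]:
        return sorted(self._data)


cache = DictCache()
cache.set("s1:merged", "...")
cache.set("s2:merged", "...")
cache.clear_prefix("s1:")
print(cache.keys())  # ['s2:merged']
cache.clear()
print(cache.keys())  # []
```

The colon separator matters: with a bare prefix such as s1, entries for session s10 would be removed as well.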