Application Services¶
Application Layer - Services Package.
This package contains application services that provide high-level business operations and coordinate between different layers of the application.
Modules¶
- Cache Service: Manages caching operations for improved performance
- Analysis Orchestrator: Orchestrates complex analysis workflows
- Progress Tracker: Tracks and reports processing progress
Package Overview¶
Application Services Module.
This module contains application-level services that provide cross-cutting concerns and coordinate complex operations across multiple domain entities. Services handle caching, progress tracking, and analysis orchestration.
Classes:
| Name | Description |
|---|---|
CacheService | Manages application-level caching with hash-based keys |
ProgressTracker | Tracks processing progress with weighted stages (simplified, no threading) |
AnalysisOrchestrator | Orchestrates complex multi-step analysis workflows |
Notes
Services in this module are stateless and use dependency injection. Progress tracking is simplified without threading, as Dash is single-threaded.
AnalysisOrchestrator¶
AnalysisOrchestrator ¶
AnalysisOrchestrator(upload_handler: UploadHandler, data_processor: DataProcessor, result_exporter: ResultExporter, cache_service: CacheService, progress_tracker: ProgressTracker)
Orchestrate the complete analysis workflow.
Coordinates the entire analysis pipeline from file upload through processing to export. Uses composition and dependency injection to maintain clean architecture principles.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
upload_handler | UploadHandler | Handler for file uploads | required |
data_processor | DataProcessor | Processor for data merging and analysis | required |
result_exporter | ResultExporter | Exporter for results | required |
cache_service | CacheService | Service for caching data | required |
progress_tracker | ProgressTracker | Tracker for progress updates | required |
Methods:
| Name | Description |
|---|---|
execute_workflow | Execute complete workflow from upload to export |
process_upload | Process file upload step |
process_data | Process data merging step |
export_results | Export results in multiple formats |
get_session_state | Retrieve current session state |
Notes
- Uses dependency injection for all operations
- Maintains session state via DTOs
- Handles errors gracefully with detailed messages
- Updates progress at each workflow step
- Caches intermediate results
Initialize the AnalysisOrchestrator with dependencies.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
upload_handler | UploadHandler | Handler for file uploads | required |
data_processor | DataProcessor | Processor for data merging | required |
result_exporter | ResultExporter | Exporter for results | required |
cache_service | CacheService | Service for caching | required |
progress_tracker | ProgressTracker | Tracker for progress | required |
Notes
All dependencies are injected for testability.
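As a rough illustration of why constructor injection helps testability, the sketch below swaps a real handler for a test double. Everything in it is hypothetical: `MiniOrchestrator`, `UploadHandlerLike`, `FakeUploadHandler`, and the `handle` method name are stand-ins chosen for the example, and only one of the five documented dependencies is modelled.

```python
from typing import List, Protocol


class UploadHandlerLike(Protocol):
    """Assumed minimal interface; the real UploadHandler API is not shown here."""

    def handle(self, content: str, filename: str) -> dict: ...


class MiniOrchestrator:
    """Hypothetical orchestrator reduced to a single injected dependency."""

    def __init__(self, upload_handler: UploadHandlerLike) -> None:
        # The collaborator is passed in, never constructed internally.
        self._upload_handler = upload_handler

    def process_upload(self, content: str, filename: str) -> dict:
        return self._upload_handler.handle(content, filename)


class FakeUploadHandler:
    """Test double that records calls instead of parsing files."""

    def __init__(self) -> None:
        self.calls: List[str] = []

    def handle(self, content: str, filename: str) -> dict:
        self.calls.append(filename)
        return {"filename": filename, "ok": True}
```

Because the orchestrator only depends on the interface, a test can pass `FakeUploadHandler()` and assert on `calls` without touching the file system.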
Source code in src/application/services/analysis_orchestrator.py
Functions¶
execute_workflow ¶
execute_workflow(content: str, filename: str, session_id: str, export_formats: Optional[List[ExportFormat]] = None) -> AnalysisSessionDTO
Execute the complete analysis workflow.
Coordinates the entire pipeline:
1. Upload and validate file
2. Process and merge data
3. Export results in requested formats
4. Update session state
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
content | str | Base64-encoded file content | required |
filename | str | Original filename | required |
session_id | str | Unique session identifier | required |
export_formats | Optional[List[ExportFormat]] | Formats to export (CSV, Excel, JSON), defaults to [CSV] | None |
Returns:
| Type | Description |
|---|---|
AnalysisSessionDTO | Complete session state with results |
Notes
- Creates a new session if one doesn't exist
- Updates progress at each step
- Handles errors without stopping entire workflow
- Caches results for performance
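The four pipeline steps and the `None` default for `export_formats` can be sketched roughly as follows. This is not the package's implementation: `run_workflow`, `decode_upload`, the stand-in `ExportFormat` enum, and the dict-based session are assumptions made for illustration, and the "processing" step is a placeholder that only counts non-empty lines.

```python
import base64
from enum import Enum
from typing import Any, Dict, List, Optional


class ExportFormat(Enum):
    """Hypothetical stand-in for the package's ExportFormat."""
    CSV = "csv"
    EXCEL = "xlsx"
    JSON = "json"


def decode_upload(content: str) -> str:
    """Decode Base64 content, dropping an optional 'data:...;base64,' prefix."""
    if content.startswith("data:") and "," in content:
        content = content.split(",", 1)[1]
    return base64.b64decode(content).decode("utf-8")


def run_workflow(
    content: str,
    filename: str,
    session_id: str,
    export_formats: Optional[List[ExportFormat]] = None,
) -> Dict[str, Any]:
    # None (rather than a mutable default list) signals "use [CSV]".
    formats = export_formats if export_formats is not None else [ExportFormat.CSV]
    session: Dict[str, Any] = {
        "session_id": session_id,
        "errors": [],
        "is_complete": False,
    }
    try:
        text = decode_upload(content)                        # 1. upload and validate
        rows = [line for line in text.splitlines() if line]  # 2. process (placeholder)
        session["exports"] = [f"{session_id}.{f.value}" for f in formats]  # 3. export
        session["row_count"] = len(rows)
        session["is_complete"] = True                        # 4. update session state
    except (ValueError, UnicodeDecodeError) as exc:
        # Record the failure in the session instead of raising to the caller.
        session["errors"].append(f"{filename}: {exc}")
    return session
```

Returning the session in both the success and failure paths mirrors the documented behaviour of reporting errors through the session state rather than aborting.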
Source code in src/application/services/analysis_orchestrator.py
process_upload ¶
Process file upload independently.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
content | str | Base64-encoded file content | required |
filename | str | Original filename | required |
Returns:
| Type | Description |
|---|---|
UploadResultDTO | Upload processing result |
Notes
Can be used for upload-only operations without full workflow execution.
Source code in src/application/services/analysis_orchestrator.py
process_data ¶
Process data merging independently.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
dataset | Dataset | Domain entity with samples | required |
session_id | str | Session identifier | required |
Returns:
| Type | Description |
|---|---|
MergedDataDTO | Processing result |
Notes
Can be used when upload is already complete and only processing is needed.
Source code in src/application/services/analysis_orchestrator.py
export_results ¶
export_results(data: DataFrame, session_id: str, formats: Optional[List[ExportFormat]] = None) -> List[ExportResultDTO]
Export results in multiple formats.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
data | DataFrame | Data to export | required |
session_id | str | Session identifier for filenames | required |
formats | Optional[List[ExportFormat]] | Export formats, defaults to [CSV] | None |
Returns:
| Type | Description |
|---|---|
List[ExportResultDTO] | List of export results |
Notes
Can be used for exporting existing data without running the full workflow.
Source code in src/application/services/analysis_orchestrator.py
get_session_state ¶
Retrieve current session state.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session_id | str | Session identifier | required |
Returns:
| Type | Description |
|---|---|
Optional[AnalysisSessionDTO] | Session state or None if not found |
Notes
Returns None if session doesn't exist.
Source code in src/application/services/analysis_orchestrator.py
get_progress ¶
Get current progress for a session.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session_id | str | Session identifier | required |
Returns:
| Type | Description |
|---|---|
Optional[ProcessingProgressDTO] | Current progress or None |
Source code in src/application/services/analysis_orchestrator.py
clear_session ¶
Clear session data and cache.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session_id | str | Session identifier to clear | required |
Notes
Removes session from memory and clears cache.
Source code in src/application/services/analysis_orchestrator.py
AnalysisSessionDTO¶
AnalysisSessionDTO dataclass ¶
AnalysisSessionDTO(session_id: str, upload_result: Optional[UploadResultDTO], processing_result: Optional[MergedDataDTO], export_results: List[ExportResultDTO], created_at: str, is_complete: bool)
Data Transfer Object for analysis session state.
Attributes:
| Name | Type | Description |
|---|---|---|
session_id | str | Unique identifier for the session |
upload_result | Optional[UploadResultDTO] | Result of file upload |
processing_result | Optional[MergedDataDTO] | Result of data processing |
export_results | List[ExportResultDTO] | List of export operations performed |
created_at | str | ISO format timestamp of session creation |
is_complete | bool | Whether analysis is complete |
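A dataclass with these attributes might look like the sketch below. The class name `AnalysisSessionSketch` is hypothetical, the nested DTO types are replaced by `Any`, and the default values are assumptions added for convenience (the documented signature lists no defaults).

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, List, Optional


def _now_iso() -> str:
    """Current UTC time as an ISO 8601 string."""
    return datetime.now(timezone.utc).isoformat()


@dataclass
class AnalysisSessionSketch:
    """Illustrative mirror of the documented fields; nested DTOs are typed as Any."""

    session_id: str
    upload_result: Optional[Any] = None
    processing_result: Optional[Any] = None
    # default_factory gives every instance its own list.
    export_results: List[Any] = field(default_factory=list)
    created_at: str = field(default_factory=_now_iso)
    is_complete: bool = False
```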
CacheService¶
CacheService ¶
Manage application-level caching with hash-based keys.
Provides caching for expensive operations with automatic expiration, size limits, and hash-based key generation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
max_size | int | Maximum number of cached items | 100 |
default_ttl_seconds | int | Default time-to-live in seconds (1 hour) | 3600 |
Attributes:
| Name | Type | Description |
|---|---|---|
_cache | Dict[str, Dict[str, Any]] | Internal cache storage |
_max_size | int | Maximum cache size |
_default_ttl | int | Default TTL |
Methods:
| Name | Description |
|---|---|
set | Store value in cache |
get | Retrieve value from cache |
delete | Remove value from cache |
clear | Clear entire cache |
generate_hash_key | Generate hash-based cache key |
has | Check if key exists and is valid |
size | Get current cache size |
Notes
Cache entries structure:

    {
        'key': {
            'value': cached_value,
            'timestamp': creation_time,
            'ttl': time_to_live_seconds
        }
    }
Uses SHA256 for hash key generation.
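To make the entry structure concrete, here is a minimal sketch of a cache that stores entries in that shape, expires them on read, and evicts the oldest entry when full (the FIFO behaviour described for `set`). `TinyCache` is a hypothetical class written for this example, not the package's `CacheService`; in particular, treating "oldest" as "first inserted" is an assumption.

```python
import time
from typing import Any, Dict, Optional


class TinyCache:
    """Hypothetical sketch of a TTL cache with FIFO eviction."""

    def __init__(self, max_size: int = 100, default_ttl_seconds: int = 3600) -> None:
        self._cache: Dict[str, Dict[str, Any]] = {}
        self._max_size = max_size
        self._default_ttl = default_ttl_seconds

    def set(self, key: str, value: Any, ttl_seconds: Optional[int] = None) -> None:
        if key not in self._cache and len(self._cache) >= self._max_size:
            # dicts preserve insertion order, so the first key is the oldest.
            oldest = next(iter(self._cache))
            del self._cache[oldest]
        self._cache[key] = {
            "value": value,
            "timestamp": time.time(),
            "ttl": self._default_ttl if ttl_seconds is None else ttl_seconds,
        }

    def get(self, key: str) -> Optional[Any]:
        entry = self._cache.get(key)
        if entry is None:
            return None
        if time.time() - entry["timestamp"] > entry["ttl"]:
            # Expired entries are removed lazily, on read.
            del self._cache[key]
            return None
        return entry["value"]
```

Lazy expiry keeps the sketch free of timers or background threads, which fits the single-threaded design described in the package notes.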
Initialize cache service.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
max_size | int | Maximum cached items | 100 |
default_ttl_seconds | int | Default TTL (1 hour) | 3600 |
Source code in src/application/services/cache_service.py
Functions¶
set ¶
Store value in cache.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
key | str | Cache key | required |
value | Any | Value to cache | required |
ttl_seconds | Optional[int] | Time-to-live (uses default if None) | None |
Notes
If cache is full, removes oldest entry (FIFO eviction).
Source code in src/application/services/cache_service.py
get ¶
Retrieve value from cache.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
key | str | Cache key | required |
Returns:
| Type | Description |
|---|---|
Optional[Any] | Cached value or None if not found/expired |
Notes
Automatically removes expired entries.
Source code in src/application/services/cache_service.py
delete ¶
Remove value from cache.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
key | str | Cache key | required |
Returns:
| Type | Description |
|---|---|
bool | True if key was deleted, False if not found |
Source code in src/application/services/cache_service.py
clear ¶
Clear entire cache.
generate_hash_key ¶
Generate SHA256 hash-based cache key.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
content | str | Content to hash (e.g., upload content or dataset identifier) | required |
Returns:
| Type | Description |
|---|---|
str | SHA256 hash hexadecimal string |
Notes
- Same content always generates same key (deterministic)
- Compatible with legacy cache key generation
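A deterministic SHA256 key of this kind can be produced with the standard library's `hashlib`. The sketch below is a standalone function rather than the method itself, and encoding the content as UTF-8 before hashing is an assumption.

```python
import hashlib


def generate_hash_key(content: str) -> str:
    """Return the SHA-256 hex digest of content (UTF-8 encoding is an assumption)."""
    return hashlib.sha256(content.encode("utf-8")).hexdigest()
```

The digest is always 64 hexadecimal characters, so keys have a fixed length regardless of how large the uploaded content is.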
Source code in src/application/services/cache_service.py
has ¶
Check if key exists and is valid (not expired).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
key | str | Cache key | required |
Returns:
| Type | Description |
|---|---|
bool | True if key exists and not expired |
Source code in src/application/services/cache_service.py
size ¶
Get current cache size (number of entries).
Returns:
| Type | Description |
|---|---|
int | Number of cached entries |
ProgressTracker¶
ProgressTracker ¶
Simplified progress tracker for multi-stage processing.
This class tracks progress across 8 weighted processing stages without threading complexity, designed specifically for single-threaded Dash.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session_id | str | Unique session identifier for tracking | required |
Attributes:
| Name | Type | Description |
|---|---|---|
session_id | str | Session identifier |
_stages | Dict[int, str] | Stage number to description mapping |
_stage_weights | Dict[int, float] | Stage number to weight mapping (sums to 100.0) |
_current_stage | int | Current stage number (1-8) |
_current_message | str | Current processing message |
_start_time | float | Timestamp when processing started |
_error | Optional[str] | Error message if processing failed |
Methods:
| Name | Description |
|---|---|
start_stage | Start a new processing stage |
update_progress | Update progress within current stage |
complete | Mark processing as complete |
set_error | Set error message and halt progress |
get_progress | Get current progress as DTO |
calculate_overall_progress | Calculate weighted overall progress |
Notes
8 Processing Stages with Weights:
1. Input Validation (5%)
2. Data Parsing (10%)
3. BioRemPP Merge (30%)
4. KEGG Merge (20%)
5. HADEG Merge (15%)
6. ToxCSM Merge (10%)
7. Result Preparation (5%)
8. Finalization (5%)

Simplifications from Legacy:
- NO threading.Lock (single-threaded Dash)
- NO callback notifications (handled externally)
- Simple dict storage instead of complex state machine
- Weighted progress calculation preserved
Examples:
>>> tracker = ProgressTracker("session_1")
>>> tracker.start_stage(1, "Validation", "Validating input")
>>> tracker.update_progress(50.0, "Checked 3/5 samples")
>>> progress = tracker.get_progress()
>>> progress.current_stage
'Validation'
Initialize simplified progress tracker.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
session_id | str | Unique session identifier | required |
Source code in src/application/services/progress_tracker.py
Functions¶
start_stage ¶
Start a new processing stage.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
stage | int | Stage number (1-8) | required |
stage_name | str | Stage description (optional, uses default if empty) | required |
message | str | Initial stage message | "" |
Raises:
| Type | Description |
|---|---|
ValueError | If stage number is invalid |
Examples:
>>> tracker = ProgressTracker("session_1")
>>> tracker.start_stage(1, "Validation", "Starting validation")
Source code in src/application/services/progress_tracker.py
update_progress ¶
Update progress within current stage.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
progress | float | Progress percentage within stage (0.0-100.0) | required |
message | Optional[str] | Optional progress message | None |
Raises:
| Type | Description |
|---|---|
ValueError | If progress is out of range |
Examples:
>>> tracker.start_stage(3, "Merge", "Merging data")
>>> tracker.update_progress(50.0, "Processed 500/1000 records")
Source code in src/application/services/progress_tracker.py
complete ¶
Mark processing as complete (100%).
Source code in src/application/services/progress_tracker.py
set_error ¶
Set error message and halt progress.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
error | str | Error description | required |
Source code in src/application/services/progress_tracker.py
get_progress ¶
Get current progress as DTO.
Returns:
| Type | Description |
|---|---|
ProcessingProgressDTO | Current progress information |
Examples:
>>> tracker.start_stage(3, "Merge", "Processing")
>>> tracker.update_progress(50.0)
>>> dto = tracker.get_progress()
>>> dto.current_stage
'BioRemPP Database Merge'
Source code in src/application/services/progress_tracker.py
calculate_overall_progress ¶
Calculate weighted overall progress across all stages.
Returns:
| Type | Description |
|---|---|
float | Overall progress percentage (0.0-100.0) |
Notes
Formula: Σ(completed_stages_weights) + (current_stage_weight * stage_progress%)
Examples:
>>> tracker.start_stage(3, "Merge", "Processing")
>>> tracker.update_progress(50.0) # 50% of stage 3
>>> overall = tracker.calculate_overall_progress()
>>> # Stages 1-2 complete (5% + 10% = 15%) + 50% of stage 3 (50% of 30% = 15%)
>>> round(overall, 1)
30.0
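The formula can be checked with a small standalone function. This is a sketch, not the tracker's method: `overall_progress` and `STAGE_WEIGHTS` are hypothetical names, the weights are copied from the stage list in the class notes, and the range checks are assumed to match the documented `ValueError` conditions.

```python
from typing import Dict

# Weights copied from the documented stage list; they sum to 100.
STAGE_WEIGHTS: Dict[int, float] = {
    1: 5.0, 2: 10.0, 3: 30.0, 4: 20.0,
    5: 15.0, 6: 10.0, 7: 5.0, 8: 5.0,
}


def overall_progress(current_stage: int, stage_progress: float) -> float:
    """Sum of completed stage weights plus the weighted share of the current stage."""
    if current_stage not in STAGE_WEIGHTS:
        raise ValueError(f"invalid stage: {current_stage}")
    if not 0.0 <= stage_progress <= 100.0:
        raise ValueError(f"progress out of range: {stage_progress}")
    # Every stage before the current one counts as fully complete.
    completed = sum(w for s, w in STAGE_WEIGHTS.items() if s < current_stage)
    return completed + STAGE_WEIGHTS[current_stage] * stage_progress / 100.0
```

For stage 3 at 50%, this gives 5 + 10 from the completed stages plus half of 30, which matches the 30.0 in the example above.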