
Data Processor

data_processor

Data Processor - Orchestrate Data Processing and Merge Operations.

Provides the main orchestrator for processing user data. It coordinates merging the dataset with several reference databases (BioRemPP, KEGG, HADEG, and ToxCSM) and prepares the results, acting as the central coordinator for the entire processing pipeline.

Implements defensive programming through per-stage try/except handling, a circuit breaker per database, DataFrame validation, timeout handling, retry logic, and structured logging. All settings are loaded from config.settings and can be overridden via environment variables.
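The override mechanism in config.settings is not shown on this page. A minimal sketch of the usual pattern, assuming each setting is an integer read from an environment variable of the same name, is below. The variable names, the `env_int` helper, and the threshold defaults are hypothetical; only the 60-second processing timeout comes from this page.

```python
import os


def env_int(name: str, default: int) -> int:
    """Return an integer setting from the environment, or `default` if unset or invalid."""
    raw = os.environ.get(name)
    if raw is None or raw.strip() == "":
        return default
    try:
        return int(raw)
    except ValueError:
        # Fall back instead of failing at import time on a malformed value.
        return default


# Hypothetical names and defaults; the real ones live in config.settings.
DEFAULT_TIMEOUT_SECONDS = env_int("DEFAULT_TIMEOUT_SECONDS", 60)
CIRCUIT_BREAKER_THRESHOLD = env_int("CIRCUIT_BREAKER_THRESHOLD", 5)
CIRCUIT_BREAKER_TIMEOUT = env_int("CIRCUIT_BREAKER_TIMEOUT", 60)
```

Reading the values once at import time keeps call sites simple, at the cost of requiring a restart (or module reload) to pick up changed variables.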

Classes:

DataProcessor: Orchestrates the multi-stage data processing workflow
CircuitBreaker: Simple circuit breaker for fault tolerance

Classes

CircuitBreaker

CircuitBreaker(failure_threshold: int = CIRCUIT_BREAKER_THRESHOLD, timeout: int = CIRCUIT_BREAKER_TIMEOUT)

Simple circuit breaker to prevent cascading failures.

Attributes:

failure_threshold (int): Number of failures before the circuit opens
timeout (int): Seconds before attempting to close the circuit
failures (int): Current failure count
last_failure_time (Optional[float]): Timestamp of the last failure, or None if no failure has been recorded
is_open (bool): Whether the circuit is currently open

Initialize circuit breaker.

Source code in src/application/core/data_processor.py
def __init__(
    self,
    failure_threshold: int = CIRCUIT_BREAKER_THRESHOLD,
    timeout: int = CIRCUIT_BREAKER_TIMEOUT,
):
    """Initialize circuit breaker."""
    self.failure_threshold = failure_threshold
    self.timeout = timeout
    self.failures = 0
    self.last_failure_time: Optional[float] = None
    self._is_open = False
Attributes
is_open property
is_open: bool

Check if circuit is open.
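The source of the is_open property is not included here, and record_failure as shown never consults timeout. One plausible way the timeout attribute could be honoured is sketched below. The cooldown behaviour (closing the circuit for a single trial call), the `CooldownCircuitBreaker` name, its defaults, and the injectable clock are all assumptions, not the project's implementation.

```python
import time
from typing import Callable, Optional


class CooldownCircuitBreaker:
    """Hypothetical breaker whose is_open check honours the cooldown timeout."""

    def __init__(
        self,
        failure_threshold: int = 5,
        timeout: float = 60.0,
        clock: Callable[[], float] = time.time,
    ) -> None:
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = 0
        self.last_failure_time: Optional[float] = None
        self._is_open = False
        self._clock = clock  # injectable so tests can control time

    @property
    def is_open(self) -> bool:
        # After `timeout` seconds, close the circuit to let a trial call through.
        if self._is_open and self.last_failure_time is not None:
            if self._clock() - self.last_failure_time >= self.timeout:
                self._is_open = False
                # Keep the count one short of the threshold so that a failed
                # trial call reopens the circuit immediately.
                self.failures = max(self.failure_threshold - 1, 0)
        return self._is_open

    def record_failure(self) -> None:
        self.failures += 1
        self.last_failure_time = self._clock()
        if self.failures >= self.failure_threshold:
            self._is_open = True

    def record_success(self) -> None:
        self.failures = 0
        self._is_open = False
```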

Functions
record_failure
record_failure() -> None

Record a failure.

Source code in src/application/core/data_processor.py
def record_failure(self) -> None:
    """Record a failure."""
    self.failures += 1
    self.last_failure_time = time.time()

    if self.failures >= self.failure_threshold:
        self._is_open = True
        logger.error(
            f"Circuit breaker opened after {self.failures} failures",
            extra={"failures": self.failures},
        )
record_success
record_success() -> None

Record a success.

Source code in src/application/core/data_processor.py
def record_success(self) -> None:
    """Record a success."""
    self.failures = 0
    self._is_open = False
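The three members above are meant to wrap each call to a downstream database: check is_open first, then record the outcome. A minimal sketch of that calling pattern follows. `guarded_call`, `CircuitOpenError`, and the simplified `SimpleBreaker` (which mirrors the counters shown here but omits the cooldown timeout) are hypothetical names.

```python
from typing import Any, Callable, TypeVar

T = TypeVar("T")


class CircuitOpenError(RuntimeError):
    """Hypothetical error raised when an open circuit rejects a call."""


class SimpleBreaker:
    """Stand-in mirroring the documented failure counting (no cooldown)."""

    def __init__(self, failure_threshold: int = 3) -> None:
        self.failure_threshold = failure_threshold
        self.failures = 0
        self.is_open = False

    def record_failure(self) -> None:
        self.failures += 1
        if self.failures >= self.failure_threshold:
            self.is_open = True

    def record_success(self) -> None:
        self.failures = 0
        self.is_open = False


def guarded_call(breaker: SimpleBreaker, func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
    # Fail fast without touching the downstream service while the circuit is open.
    if breaker.is_open:
        raise CircuitOpenError("circuit is open; call rejected")
    try:
        result = func(*args, **kwargs)
    except Exception:
        breaker.record_failure()  # count the failure, then let the caller handle it
        raise
    breaker.record_success()  # any success resets the failure count
    return result
```

Failing fast while the circuit is open keeps one unavailable database from consuming the whole processing timeout.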

DataProcessor

DataProcessor(cache_service: CacheService, progress_tracker: ProgressTracker, merge_service: Optional[MergeService] = None, timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS)

Orchestrate data processing with robust error handling.

Coordinates the entire processing pipeline (cache checking, progress tracking, database merging, and result preparation) using defensive programming practices.

Parameters:

cache_service (CacheService, required): Service for caching merge results
progress_tracker (ProgressTracker, required): Tracker for progress updates
merge_service (Optional[MergeService], default None): Domain merge service; a default is used if None
timeout_seconds (int, default DEFAULT_TIMEOUT_SECONDS): Timeout for processing operations, in seconds (60 by default)

Attributes:

_cache (CacheService): Cache service instance
_tracker (ProgressTracker): Progress tracker instance
_merge_service (MergeService): Domain merge service
_timeout_seconds (int): Timeout for operations, in seconds
_circuit_breakers (dict): Circuit breakers keyed by database name ("biorempp", "kegg", "hadeg", "toxcsm")

Methods:

process: Process a dataset through the full pipeline
_merge_with_retry: Execute a merge with retry logic
_validate_dataframe: Validate that a DataFrame is not empty
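The source of _merge_with_retry is not shown, so its attempt count and delay policy are unknown. As a hypothetical illustration only, a generic retry helper with exponential backoff might look like this; `retry_call`, its parameters, and the 0.5 s / 1 s / 2 s schedule are assumptions.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def retry_call(
    func: Callable[[], T],
    max_attempts: int = 3,
    base_delay: float = 0.5,
    retry_on: Tuple[Type[BaseException], ...] = (Exception,),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying on `retry_on` with delays base_delay * 2**n."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except retry_on:
            if attempt == max_attempts:
                raise  # out of attempts: propagate the last error
            sleep(base_delay * (2 ** (attempt - 1)))
    raise AssertionError("unreachable")
```

Injecting `sleep` lets tests verify the backoff schedule without waiting; in production the retries also count against the overall processing timeout.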

Notes

Processing pipeline with error handling:

1. Cache check
2. BioRemPP merge (with retry + circuit breaker)
3. KEGG merge (with retry + circuit breaker)
4. HADEG merge (with retry + circuit breaker)
5. ToxCSM merge (with retry + circuit breaker)
6. Result preparation + validation
7. Cache storage

Uses dependency injection for testability.

Initialize DataProcessor with dependencies.

Parameters:

cache_service (CacheService, required): Cache service for merge results
progress_tracker (ProgressTracker, required): Progress tracker for UI updates
merge_service (Optional[MergeService], default None): Merge service; a default is used if None
timeout_seconds (int, default DEFAULT_TIMEOUT_SECONDS): Timeout for operations
Source code in src/application/core/data_processor.py
def __init__(
    self,
    cache_service: CacheService,
    progress_tracker: ProgressTracker,
    merge_service: Optional[MergeService] = None,
    timeout_seconds: int = DEFAULT_TIMEOUT_SECONDS,
) -> None:
    """
    Initialize DataProcessor with dependencies.

    Parameters
    ----------
    cache_service : CacheService
        Cache service for merge results
    progress_tracker : ProgressTracker
        Progress tracker for UI updates
    merge_service : Optional[MergeService]
        Merge service (uses default if None)
    timeout_seconds : int
        Timeout for operations
    """
    self._cache = cache_service
    self._tracker = progress_tracker
    # Instantiate the default service so _merge_service is always an instance
    self._merge_service = (
        merge_service if merge_service is not None else MergeService()
    )
    self._timeout_seconds = timeout_seconds

    # Circuit breakers for each database
    self._circuit_breakers = {
        "biorempp": CircuitBreaker(),
        "kegg": CircuitBreaker(),
        "hadeg": CircuitBreaker(),
        "toxcsm": CircuitBreaker(),
    }

    logger.info("DataProcessor initialized", extra={"timeout": timeout_seconds})
Functions
process
process(dataset: Dataset, session_id: str) -> MergedDataDTO

Process dataset through complete pipeline with error handling.

Parameters:

dataset (Dataset, required): Validated dataset to process
session_id (str, required): Session identifier for tracking

Returns:

MergedDataDTO: Result DTO with merged data

Raises:

DataProcessingTimeoutError: If processing exceeds the timeout
StageProcessingError: If a critical stage fails

Notes

Pipeline stages with error handling:

1. Check cache
2. BioRemPP merge (critical, with retry)
3. KEGG merge (with retry)
4. HADEG merge (with retry)
5. ToxCSM merge (with retry)
6. Result preparation + validation
7. Cache storage
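process() delegates each merge to _process_stage, whose source is not included on this page. Judging only from its call sites (stage_num, stage_name, stage_desc, merge_func, breaker_key, plus either dataset= or biorempp_df=) and from the stage_name and original_error attributes that process() reads from StageProcessingError, it might look roughly like the hypothetical sketch below. Retries and any critical versus non-critical handling are deliberately left out, and `process_stage`, `RecordingTracker`, and `Breaker` are illustrative stand-ins.

```python
from typing import Any, Callable, Dict, List, Tuple


class StageProcessingError(Exception):
    """Carries the stage_name and original_error attributes that process() logs."""

    def __init__(self, stage_name: str, original_error: BaseException) -> None:
        super().__init__(f"{stage_name} failed: {original_error}")
        self.stage_name = stage_name
        self.original_error = original_error


class RecordingTracker:
    """Test double exposing the tracker calls that appear in process()."""

    def __init__(self) -> None:
        self.events: List[Tuple[str, Any]] = []

    def start_stage(self, num: int, name: str, desc: str) -> None:
        self.events.append(("start", num))

    def update_progress(self, pct: float) -> None:
        self.events.append(("progress", pct))


class Breaker:
    def __init__(self, threshold: int = 3) -> None:
        self.threshold, self.failures, self.is_open = threshold, 0, False

    def record_failure(self) -> None:
        self.failures += 1
        self.is_open = self.failures >= self.threshold

    def record_success(self) -> None:
        self.failures, self.is_open = 0, False


def process_stage(
    tracker: RecordingTracker,
    breakers: Dict[str, Breaker],
    stage_num: int,
    stage_name: str,
    stage_desc: str,
    merge_func: Callable[..., Any],
    breaker_key: str,
    **merge_kwargs: Any,
) -> Any:
    tracker.start_stage(stage_num, stage_name, stage_desc)
    breaker = breakers[breaker_key]
    if breaker.is_open:
        raise StageProcessingError(stage_name, RuntimeError("circuit open"))
    try:
        result = merge_func(**merge_kwargs)  # e.g. dataset=... or biorempp_df=...
    except Exception as exc:
        breaker.record_failure()
        # Wrap so the caller can log which stage failed and why.
        raise StageProcessingError(stage_name, exc) from exc
    breaker.record_success()
    tracker.update_progress(100.0)
    return result
```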

Source code in src/application/core/data_processor.py
@log_performance()
def process(self, dataset: Dataset, session_id: str) -> MergedDataDTO:
    """
    Process dataset through complete pipeline with error handling.

    Parameters
    ----------
    dataset : Dataset
        Validated dataset to process
    session_id : str
        Session identifier for tracking

    Returns
    -------
    MergedDataDTO
        Result DTO with merged data

    Raises
    ------
    DataProcessingTimeoutError
        If processing exceeds timeout
    StageProcessingError
        If a critical stage fails

    Notes
    -----
    Pipeline stages with error handling:
    1. Check cache
    2. BioRemPP merge (critical - with retry)
    3. KEGG merge (with retry)
    4. HADEG merge (with retry)
    5. ToxCSM merge (with retry)
    6. Result preparation + validation
    7. Cache storage
    """
    logger.info(
        f"Starting processing for session: {session_id}",
        extra={"session_id": session_id},
    )
    start_time = time.time()

    try:
        # Use timeout context
        with timeout_handler(self._timeout_seconds):
            # Generate cache key
            cache_key = self._generate_cache_key(dataset)

            # Check cache
            cached_result = self._cache.get(cache_key)
            if cached_result is not None:
                logger.info(
                    "Cache hit - returning cached result",
                    extra={"cache_key": str(cache_key)[:16]},
                )
                self._tracker.complete()
                return cached_result

            logger.info("Cache miss - proceeding with processing")

            # Initialize result dictionary
            merge_results = {}

            # Stage 3: BioRemPP Merge (CRITICAL)
            merge_results["biorempp_df"] = self._process_stage(
                stage_num=3,
                stage_name="BioRemPP Database Merge",
                stage_desc="Merging with main database",
                merge_func=self._merge_biorempp,
                dataset=dataset,
                breaker_key="biorempp",
            )

            # Stage 4: KEGG Merge
            merge_results["kegg_df"] = self._process_stage(
                stage_num=4,
                stage_name="KEGG Database Merge",
                stage_desc="Merging with KEGG pathways",
                merge_func=self._merge_kegg,
                biorempp_df=merge_results["biorempp_df"],
                breaker_key="kegg",
            )

            # Stage 5: HADEG Merge
            merge_results["hadeg_df"] = self._process_stage(
                stage_num=5,
                stage_name="HADEG Database Merge",
                stage_desc="Merging with HADEG",
                merge_func=self._merge_hadeg,
                dataset=dataset,
                breaker_key="hadeg",
            )

            # Stage 6: ToxCSM Merge
            merge_results["toxcsm_df"] = self._process_stage(
                stage_num=6,
                stage_name="ToxCSM Database Merge",
                stage_desc="Merging with ToxCSM",
                merge_func=self._merge_toxcsm,
                biorempp_df=merge_results["biorempp_df"],
                breaker_key="toxcsm",
            )

            # Stage 7: Result Preparation
            logger.info("Starting result preparation")
            self._tracker.start_stage(7, "Result Preparation", "Preparing results")
            self._tracker.update_progress(50.0)

            processing_time = time.time() - start_time
            result_dto = self.prepare_result(
                merge_results, cache_key, processing_time
            )

            self._tracker.update_progress(100.0)

            # Stage 8: Finalization
            logger.info("Finalizing and caching results")
            self._tracker.start_stage(8, "Finalization", "Caching results")
            self._cache.set(cache_key, result_dto, ttl_seconds=3600)
            self._tracker.complete()

            logger.info(
                f"Processing completed successfully in {processing_time:.2f}s",
                extra={
                    "session_id": session_id,
                    "processing_time": processing_time,
                    "match_count": result_dto.match_count,
                },
            )

            return result_dto

    except DataProcessingTimeoutError as e:
        logger.error(
            f"Processing timeout: {str(e)}", extra={"session_id": session_id}
        )
        self._tracker.complete()
        raise

    except StageProcessingError as e:
        logger.error(
            f"Stage processing failed: {e.stage_name}",
            extra={
                "session_id": session_id,
                "stage": e.stage_name,
                "error": str(e.original_error),
            },
        )
        self._tracker.complete()
        raise

    except Exception:
        # logger.exception already attaches the traceback
        logger.exception(
            "Unexpected error during processing",
            extra={"session_id": session_id},
        )
        self._tracker.complete()
        raise
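process() relies on _generate_cache_key(dataset) and on a cache exposing get(key), which returns None on a miss, and set(key, value, ttl_seconds=...). Neither implementation is shown. A minimal in-memory sketch that satisfies those call sites is below; hashing an ordered list of identifiers with SHA-256, plus the `make_cache_key` and `TTLCache` names, are assumptions and not the project's CacheService.

```python
import hashlib
import time
from typing import Any, Callable, Dict, Iterable, Optional, Tuple


def make_cache_key(identifiers: Iterable[str]) -> str:
    """Hypothetical content hash: the same identifiers in the same order give the same key."""
    canonical = "\n".join(identifiers)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


class TTLCache:
    """In-memory cache with the get / set(ttl_seconds=...) calls used by process()."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._items: Dict[str, Tuple[float, Any]] = {}
        self._clock = clock

    def get(self, key: str) -> Optional[Any]:
        entry = self._items.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._items[key]  # lazily evict expired entries
            return None
        return value

    def set(self, key: str, value: Any, ttl_seconds: float = 3600) -> None:
        self._items[key] = (self._clock() + ttl_seconds, value)
```

Keying on dataset content rather than session_id means two sessions uploading the same data can share one cached result.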
prepare_result
prepare_result(merge_results: dict, cache_key: str, processing_time: float) -> MergedDataDTO

Prepare final result DTO from merge results.

Parameters:

merge_results (dict, required): Dictionary with merge results
cache_key (str, required): Cache key for this result
processing_time (float, required): Time taken for processing, in seconds

Returns:

MergedDataDTO: Final result DTO

Source code in src/application/core/data_processor.py
def prepare_result(
    self, merge_results: dict, cache_key: str, processing_time: float
) -> MergedDataDTO:
    """
    Prepare final result DTO from merge results.

    Parameters
    ----------
    merge_results : dict
        Dictionary with merge results
    cache_key : str
        Cache key for this result
    processing_time : float
        Time taken for processing

    Returns
    -------
    MergedDataDTO
        Final result DTO
    """
    biorempp_df = merge_results["biorempp_df"]
    match_count = len(biorempp_df)
    total_records = len(biorempp_df)  # After merge, these are matched records

    return MergedDataDTO(
        biorempp_data=biorempp_df,
        hadeg_data=merge_results.get("hadeg_df"),
        toxcsm_data=merge_results.get("toxcsm_df"),
        match_count=match_count,
        total_records=total_records,
        cache_key=cache_key,
        processing_time_seconds=processing_time,
    )

Functions

timeout_handler

timeout_handler(seconds: int = DEFAULT_TIMEOUT_SECONDS)

Context manager for timeout handling.

Parameters:

seconds (int, default DEFAULT_TIMEOUT_SECONDS): Timeout in seconds

Raises:

DataProcessingTimeoutError: If the operation exceeds the timeout

Notes
  • Uses signal.alarm on Unix-like systems
  • On Windows (no SIGALRM), or when called outside the main thread, no timeout is enforced; a warning is logged instead
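Because the SIGALRM path enforces nothing on Windows or in worker threads, one portable alternative is to wait on a worker thread with Future.result(timeout=...). This is a different technique from timeout_handler, offered only as a sketch: `run_with_timeout` and `OperationTimeoutError` are hypothetical names. Unlike SIGALRM, it cannot interrupt the work itself; it only bounds how long the caller waits.

```python
import concurrent.futures
from typing import Any, Callable


class OperationTimeoutError(Exception):
    """Hypothetical stand-in for DataProcessingTimeoutError."""


def run_with_timeout(func: Callable[..., Any], seconds: float, *args: Any, **kwargs: Any) -> Any:
    """Run func in a worker thread and stop waiting after `seconds`."""
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = executor.submit(func, *args, **kwargs)
    try:
        return future.result(timeout=seconds)
    except concurrent.futures.TimeoutError as exc:
        raise OperationTimeoutError(
            f"Operation timed out after {seconds} seconds"
        ) from exc
    finally:
        # Return immediately; a still-running worker finishes in the background
        # because Python threads cannot be forcibly stopped.
        executor.shutdown(wait=False)
```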
Source code in src/application/core/data_processor.py
@contextmanager
def timeout_handler(seconds: int = DEFAULT_TIMEOUT_SECONDS):
    """
    Context manager for timeout handling.

    Parameters
    ----------
    seconds : int
        Timeout in seconds

    Raises
    ------
    DataProcessingTimeoutError
        If operation exceeds timeout

    Notes
    -----
    - Uses signal.alarm for Unix-like systems
    - On Windows (no SIGALRM) or outside the main thread, no timeout is
      enforced; a warning is logged instead
    """

    def timeout_signal_handler(signum, frame):
        raise DataProcessingTimeoutError(f"Operation timed out after {seconds} seconds")

    # Set timeout (Unix-like systems only)
    try:
        old_handler = signal.signal(signal.SIGALRM, timeout_signal_handler)
        signal.alarm(seconds)
    except (AttributeError, ValueError):
        # No SIGALRM (e.g. Windows) or not in the main thread: run without a timeout
        logger.warning("Signal-based timeout not available on this platform")
        old_handler = None

    try:
        yield
    finally:
        # Cancel alarm
        try:
            signal.alarm(0)
            # SIG_DFL equals 0 (falsy), so compare against None explicitly
            if old_handler is not None:
                signal.signal(signal.SIGALRM, old_handler)
        except (AttributeError, ValueError):
            pass
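A trimmed, self-contained copy of the same SIGALRM pattern is handy for checking its behaviour on a Unix host (it must run in the main thread). `alarm_timeout` and the local exception class are stand-ins for the project's names. Note that signal.SIG_DFL equals 0 and is therefore falsy, so the previous handler is tested with `is not None` before it is restored.

```python
import signal
import time
from contextlib import contextmanager
from typing import Iterator


class DataProcessingTimeoutError(Exception):
    """Local stand-in for the project's exception class."""


@contextmanager
def alarm_timeout(seconds: int) -> Iterator[None]:
    def _on_alarm(signum, frame):
        raise DataProcessingTimeoutError(f"Operation timed out after {seconds} seconds")

    # Unix only; raises ValueError if called outside the main thread.
    old_handler = signal.signal(signal.SIGALRM, _on_alarm)
    signal.alarm(seconds)
    try:
        yield
    finally:
        signal.alarm(0)  # cancel any pending alarm before restoring the handler
        if old_handler is not None:
            signal.signal(signal.SIGALRM, old_handler)
```

A typical use is `with alarm_timeout(60): run_pipeline()`, where `run_pipeline` is whatever blocking work needs a deadline.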