Skip to content

Domain Services

Domain Services encapsulate business logic that doesn't naturally fit within entities or value objects. They orchestrate complex operations across multiple domain objects.


MergeService

MergeService

MergeService(biorempp_repo: DatabaseRepository, kegg_repo: DatabaseRepository, hadeg_repo: DatabaseRepository, toxcsm_repo: DatabaseRepository)

Domain service to orchestrate merges with databases.

Coordinates the process of merging the input dataset with the 4 system databases: BioRemPP, KEGG, HADEG, and ToxCSM.

Parameters:

Name Type Description Default
biorempp_repo DatabaseRepository

Repository for the BioRemPP database

required
kegg_repo DatabaseRepository

Repository for the KEGG database

required
hadeg_repo DatabaseRepository

Repository for the HADEG database

required
toxcsm_repo DatabaseRepository

Repository for the ToxCSM database

required
Notes

This service depends on repositories that will be injected, following the Dependency Inversion Principle (SOLID).

Initialize the service with the necessary repositories.

Parameters:

Name Type Description Default
biorempp_repo DatabaseRepository

BioRemPP repository

required
kegg_repo DatabaseRepository

KEGG repository

required
hadeg_repo DatabaseRepository

HADEG repository

required
toxcsm_repo DatabaseRepository

ToxCSM repository

required
Source code in src/domain/services/merge_service.py
def __init__(
    self,
    biorempp_repo: DatabaseRepository,
    kegg_repo: DatabaseRepository,
    hadeg_repo: DatabaseRepository,
    toxcsm_repo: DatabaseRepository,
):
    """
    Initialize the service with the necessary repositories.

    Parameters
    ----------
    biorempp_repo : DatabaseRepository
        BioRemPP repository
    kegg_repo : DatabaseRepository
        KEGG repository
    hadeg_repo : DatabaseRepository
        HADEG repository
    toxcsm_repo : DatabaseRepository
        ToxCSM repository
    """
    self.biorempp_repo = biorempp_repo
    self.kegg_repo = kegg_repo
    self.hadeg_repo = hadeg_repo
    self.toxcsm_repo = toxcsm_repo

    logger.info(
        "MergeService initialized",
        extra={"repositories": ["biorempp", "kegg", "hadeg", "toxcsm"]},
    )

Functions

merge_all
merge_all(dataset: Dataset) -> MergedData

Execute all merges sequentially.

Parameters:

Name Type Description Default
dataset Dataset

Input dataset with samples and KOs

required

Returns:

Type Description
MergedData

Entity with all merge results

Raises:

Type Description
ValueError

If any mandatory merge fails

Notes

The process follows this order: 1. Merge with BioRemPP (mandatory) 2. Merge with KEGG (mandatory) 3. Merge with HADEG (mandatory) 4. Merge with ToxCSM (optional, depends on compounds)

Source code in src/domain/services/merge_service.py
@log_execution(level=logging.INFO)
@log_performance(threshold_ms=1000.0)
def merge_all(self, dataset: Dataset) -> MergedData:
    """
    Execute all merges sequentially.

    Parameters
    ----------
    dataset : Dataset
        Input dataset with samples and KOs

    Returns
    -------
    MergedData
        Entity with all merge results

    Raises
    ------
    ValueError
        If any mandatory merge fails

    Notes
    -----
    The process follows this order:
    1. Merge with BioRemPP (mandatory)
    2. Merge with KEGG (mandatory)
    3. Merge with HADEG (mandatory)
    4. Merge with ToxCSM (optional, depends on compounds)
    """
    logger.info(
        "Starting merge process",
        extra={
            "sample_count": dataset.total_samples,
            "ko_count": dataset.total_kos,
        },
    )

    # Convert dataset to dictionary format
    input_data = dataset.to_dict()

    # Merge 1: BioRemPP (main base)
    logger.debug("Starting BioRemPP merge")
    biorempp_db = self.biorempp_repo.load()
    biorempp_merged = self._merge_by_ko(input_data, biorempp_db)
    logger.debug("BioRemPP merge completed")

    # Merge 2: KEGG
    logger.debug("Starting KEGG merge")
    kegg_db = self.kegg_repo.load()
    kegg_merged = self._merge_by_ko(input_data, kegg_db)
    logger.debug("KEGG merge completed")

    # Merge 3: HADEG
    logger.debug("Starting HADEG merge")
    hadeg_db = self.hadeg_repo.load()
    hadeg_merged = self._merge_by_ko(input_data, hadeg_db)
    logger.debug("HADEG merge completed")

    # Merge 4: ToxCSM (uses compounds from biorempp)
    logger.debug("Starting ToxCSM merge")
    toxcsm_db = self.toxcsm_repo.load()
    toxcsm_merged = self._merge_toxcsm(biorempp_merged, toxcsm_db)
    logger.debug("ToxCSM merge completed")

    # Create MergedData entity
    merged_data = MergedData(
        original_dataset=dataset,
        biorempp_data=biorempp_merged,
        kegg_data=kegg_merged,
        hadeg_data=hadeg_merged,
        toxcsm_data=toxcsm_merged,
    )

    # Validate result
    try:
        merged_data.validate()
        logger.info(
            "Merge process completed successfully",
            extra={"is_fully_merged": merged_data.is_fully_merged},
        )
    except ValueError as e:
        logger.error("Merge validation failed", extra={"error": str(e)})
        raise

    return merged_data
merge_biorempp
merge_biorempp(dataset: Dataset) -> Dict[str, Any]

Execute only the merge with BioRemPP.

Parameters:

Name Type Description Default
dataset Dataset

Input dataset

required

Returns:

Type Description
Dict[str, Any]

Data merged with BioRemPP

Notes

Useful for partial or incremental processing.

Source code in src/domain/services/merge_service.py
@log_execution(level=logging.INFO)
def merge_biorempp(self, dataset: Dataset) -> Dict[str, Any]:
    """
    Execute only the merge with BioRemPP.

    Parameters
    ----------
    dataset : Dataset
        Input dataset

    Returns
    -------
    Dict[str, Any]
        Data merged with BioRemPP

    Notes
    -----
    Useful for partial or incremental processing.
    """
    logger.info(
        "Starting BioRemPP-only merge",
        extra={"sample_count": dataset.total_samples},
    )

    input_data = dataset.to_dict()
    biorempp_db = self.biorempp_repo.load()
    result = self._merge_by_ko(input_data, biorempp_db)

    logger.info("BioRemPP merge completed")
    return result
get_merge_statistics
get_merge_statistics(merged_data: MergedData) -> Dict[str, Any]

Calculate statistics about the merges performed.

Parameters:

Name Type Description Default
merged_data MergedData

Merged data

required

Returns:

Type Description
Dict[str, Any]

Merge statistics

Source code in src/domain/services/merge_service.py
@log_execution(level=logging.DEBUG)
def get_merge_statistics(self, merged_data: MergedData) -> Dict[str, Any]:
    """
    Calculate statistics about the merges performed.

    Parameters
    ----------
    merged_data : MergedData
        Merged data

    Returns
    -------
    Dict[str, Any]
        Merge statistics
    """
    status = merged_data.get_merge_status()

    stats = {
        "total_databases": 4,
        "successful_merges": sum(status.values()),
        "merge_status": status,
        "is_fully_merged": merged_data.is_fully_merged,
        "total_samples": merged_data.original_dataset.total_samples,
        "total_kos": merged_data.original_dataset.total_kos,
    }

    logger.debug(
        "Merge statistics calculated",
        extra={
            "successful_merges": stats["successful_merges"],
            "is_fully_merged": stats["is_fully_merged"],
        },
    )

    return stats

ValidationService

ValidationService

Domain service for complex validations.

Implements validation rules that involve multiple entities or business logic that does not belong to a specific entity.

Notes

This is a Domain Service that encapsulates complex validation logic, keeping the entities simple and focused. All methods are static and stateless.

Functions

validate_raw_input staticmethod
validate_raw_input(content: str) -> Tuple[bool, str]

Validate raw content from sample upload.

Parameters:

Name Type Description Default
content str

Content of the samples file in BioRemPP format

required

Returns:

Type Description
Tuple[bool, str]

Tuple (is_valid, error_message) where is_valid is True if the content is valid, and error_message contains the error description if any

Notes

Expected format: - Lines starting with '>' indicate the start of a new sample - Lines starting with 'K' are KO entries

Source code in src/domain/services/validation_service.py
@staticmethod
@log_execution(level=logging.INFO)
def validate_raw_input(content: str) -> Tuple[bool, str]:
    """
    Validate raw content from sample upload.

    Parameters
    ----------
    content : str
        Content of the samples file in BioRemPP format

    Returns
    -------
    Tuple[bool, str]
        Tuple (is_valid, error_message) where is_valid is True if
        the content is valid, and error_message contains the
        error description if any

    Notes
    -----
    Expected format:
    - Lines starting with '>' indicate the start of a new sample
    - Lines starting with 'K' are KO entries
    """
    if not content or not content.strip():
        logger.warning("Validation failed: Empty file content")
        return False, "Empty file content"

    lines = content.strip().split("\n")

    # Must have at least 1 sample and 1 KO
    if len(lines) < 2:
        logger.warning(
            "Validation failed: Insufficient content",
            extra={"line_count": len(lines)},
        )
        return (
            False,
            "File must contain at least one sample and one KO",
        )

    # First line must be a sample (starts with '>')
    first_line = lines[0].strip()
    if not first_line.startswith(">"):
        logger.warning(
            "Validation failed: Invalid first line format",
            extra={"first_line": first_line[:50]},
        )
        return (
            False,
            "File must start with sample identifier (>SampleName)",
        )

    # Validate structure line by line
    sample_count = 0
    ko_count = 0
    line_number = 0

    for line in lines:
        line_number += 1
        line = line.strip()

        if not line:
            continue

        if line.startswith(">"):
            sample_count += 1
            # Check if sample name is not empty
            sample_name = line[1:].strip()
            if not sample_name:
                error_msg = f"Line {line_number}: Sample name cannot be empty"
                logger.warning(
                    "Validation failed: Empty sample name",
                    extra={"line_number": line_number},
                )
                return (
                    False,
                    error_msg,
                )

        elif line.startswith("K"):
            ko_count += 1
            # Validate KO format
            try:
                KO(line.strip())
            except ValueError as e:
                error_msg = f"Line {line_number}: {str(e)}"
                logger.warning(
                    "Validation failed: Invalid KO format",
                    extra={
                        "line_number": line_number,
                        "ko_value": line.strip(),
                        "error": str(e),
                    },
                )
                return (
                    False,
                    error_msg,
                )

        else:
            error_msg = f"Line {line_number}: Invalid line format: {line}"
            logger.warning(
                "Validation failed: Invalid line format",
                extra={"line_number": line_number, "line_content": line[:50]},
            )
            return (
                False,
                error_msg,
            )

    if sample_count == 0:
        logger.warning("Validation failed: No samples found in file")
        return False, "No samples found in file"

    if ko_count == 0:
        logger.warning("Validation failed: No KO entries found in file")
        return False, "No KO entries found in file"

    logger.info(
        "Raw input validation successful",
        extra={
            "sample_count": sample_count,
            "ko_count": ko_count,
            "total_lines": len(lines),
        },
    )
    return True, ""
validate_dataset staticmethod
validate_dataset(dataset: Dataset) -> Tuple[bool, str]

Validate a complete dataset.

Parameters:

Name Type Description Default
dataset Dataset

Dataset to be validated

required

Returns:

Type Description
Tuple[bool, str]

Tuple (is_valid, error_message)

Notes

Validates both the dataset structure and each sample individually, ensuring complete consistency.

Source code in src/domain/services/validation_service.py
@staticmethod
@log_execution(level=logging.INFO)
def validate_dataset(dataset: Dataset) -> Tuple[bool, str]:
    """
    Validate a complete dataset.

    Parameters
    ----------
    dataset : Dataset
        Dataset to be validated

    Returns
    -------
    Tuple[bool, str]
        Tuple (is_valid, error_message)

    Notes
    -----
    Validates both the dataset structure and each sample
    individually, ensuring complete consistency.
    """
    if dataset.total_samples == 0:
        logger.warning("Dataset validation failed: No samples in dataset")
        return False, "Dataset has no samples"

    logger.debug(
        "Validating dataset", extra={"sample_count": dataset.total_samples}
    )

    # Validate each sample
    for sample in dataset.samples:
        try:
            sample.validate()
        except ValueError as e:
            logger.warning(
                "Dataset validation failed: Invalid sample",
                extra={"sample_id": str(sample.id), "error": str(e)},
            )
            return False, str(e)

    logger.info(
        "Dataset validation successful",
        extra={
            "sample_count": dataset.total_samples,
            "total_ko_count": sum(len(s.ko_list) for s in dataset.samples),
        },
    )
    return True, ""
validate_ko_list staticmethod
validate_ko_list(ko_list: List[str]) -> Tuple[bool, str, List[KO]]

Validate and convert a list of strings to a list of KOs.

Parameters:

Name Type Description Default
ko_list List[str]

List of strings representing KOs

required

Returns:

Type Description
Tuple[bool, str, List[KO]]

Tuple (is_valid, error_message, ko_objects) where ko_objects contains the validated KOs if successful

Notes

This method is useful for validating user inputs before creating domain entities.

Source code in src/domain/services/validation_service.py
@staticmethod
@log_execution(level=logging.DEBUG)
def validate_ko_list(ko_list: List[str]) -> Tuple[bool, str, List[KO]]:
    """
    Validate and convert a list of strings to a list of KOs.

    Parameters
    ----------
    ko_list : List[str]
        List of strings representing KOs

    Returns
    -------
    Tuple[bool, str, List[KO]]
        Tuple (is_valid, error_message, ko_objects) where ko_objects
        contains the validated KOs if successful

    Notes
    -----
    This method is useful for validating user inputs
    before creating domain entities.
    """
    if not ko_list:
        logger.warning("KO list validation failed: Empty list")
        return False, "KO list cannot be empty", []

    logger.debug("Validating KO list", extra={"ko_count": len(ko_list)})

    validated_kos = []
    for idx, ko_str in enumerate(ko_list):
        try:
            ko = KO(ko_str.strip())
            validated_kos.append(ko)
        except ValueError as e:
            logger.warning(
                "KO list validation failed: Invalid KO",
                extra={"index": idx, "ko_value": ko_str, "error": str(e)},
            )
            return False, str(e), []

    logger.info(
        "KO list validation successful",
        extra={"validated_count": len(validated_kos)},
    )
    return True, "", validated_kos
check_duplicate_samples staticmethod
check_duplicate_samples(dataset: Dataset) -> Tuple[bool, List[str]]

Check for duplicate samples in the dataset.

Parameters:

Name Type Description Default
dataset Dataset

Dataset to be checked

required

Returns:

Type Description
Tuple[bool, List[str]]

Tuple (has_duplicates, duplicate_ids) where has_duplicates is True if there are duplicates, and duplicate_ids contains the duplicate IDs

Notes

Duplicate samples may indicate an error in the input file or incorrect processing.

Source code in src/domain/services/validation_service.py
@staticmethod
@log_execution(level=logging.DEBUG)
def check_duplicate_samples(dataset: Dataset) -> Tuple[bool, List[str]]:
    """
    Check for duplicate samples in the dataset.

    Parameters
    ----------
    dataset : Dataset
        Dataset to be checked

    Returns
    -------
    Tuple[bool, List[str]]
        Tuple (has_duplicates, duplicate_ids) where has_duplicates
        is True if there are duplicates, and duplicate_ids contains
        the duplicate IDs

    Notes
    -----
    Duplicate samples may indicate an error in the input file
    or incorrect processing.
    """
    logger.debug(
        "Checking for duplicate samples",
        extra={"total_samples": dataset.total_samples},
    )

    seen_ids = set()
    duplicates = []

    for sample in dataset.samples:
        sample_id_str = str(sample.id)
        if sample_id_str in seen_ids:
            duplicates.append(sample_id_str)
        seen_ids.add(sample_id_str)

    has_duplicates = len(duplicates) > 0

    if has_duplicates:
        logger.warning(
            "Duplicate samples found",
            extra={"duplicate_count": len(duplicates), "duplicate_ids": duplicates},
        )
    else:
        logger.debug("No duplicate samples found")

    return has_duplicates, duplicates
validate_file_size staticmethod
validate_file_size(size_bytes: int, max_bytes: int) -> Tuple[bool, str]

Validate file size against maximum limit.

Parameters:

Name Type Description Default
size_bytes int

File size in bytes

required
max_bytes int

Maximum allowed size in bytes

required

Returns:

Type Description
Tuple[bool, str]

Tuple (is_valid, error_message)

Notes

Provides user-friendly error messages with sizes in MB. Logs validation failures with detailed context.

Source code in src/domain/services/validation_service.py
@staticmethod
@log_execution(level=logging.DEBUG)
def validate_file_size(size_bytes: int, max_bytes: int) -> Tuple[bool, str]:
    """
    Validate file size against maximum limit.

    Parameters
    ----------
    size_bytes : int
        File size in bytes
    max_bytes : int
        Maximum allowed size in bytes

    Returns
    -------
    Tuple[bool, str]
        Tuple (is_valid, error_message)

    Notes
    -----
    Provides user-friendly error messages with sizes in MB.
    Logs validation failures with detailed context.
    """
    if size_bytes > max_bytes:
        size_mb = size_bytes / (1024 * 1024)
        max_mb = max_bytes / (1024 * 1024)
        error_msg = (
            f"File size ({size_mb:.2f} MB) exceeds maximum "
            f"allowed size ({max_mb:.0f} MB)"
        )
        logger.warning(
            "File size validation failed",
            extra={
                "size_bytes": size_bytes,
                "size_mb": size_mb,
                "max_bytes": max_bytes,
                "max_mb": max_mb,
            },
        )
        return False, error_msg

    logger.debug(
        "File size validation passed",
        extra={"size_bytes": size_bytes, "max_bytes": max_bytes},
    )
    return True, ""
validate_sample_count staticmethod
validate_sample_count(sample_count: int, max_samples: int) -> Tuple[bool, str]

Validate number of samples against maximum limit.

Parameters:

Name Type Description Default
sample_count int

Number of samples in dataset

required
max_samples int

Maximum allowed samples

required

Returns:

Type Description
Tuple[bool, str]

Tuple (is_valid, error_message)

Notes

Provides clear error messages when limit is exceeded. Logs validation context for debugging.

Source code in src/domain/services/validation_service.py
@staticmethod
@log_execution(level=logging.DEBUG)
def validate_sample_count(sample_count: int, max_samples: int) -> Tuple[bool, str]:
    """
    Validate number of samples against maximum limit.

    Parameters
    ----------
    sample_count : int
        Number of samples in dataset
    max_samples : int
        Maximum allowed samples

    Returns
    -------
    Tuple[bool, str]
        Tuple (is_valid, error_message)

    Notes
    -----
    Provides clear error messages when limit is exceeded.
    Logs validation context for debugging.
    """
    if sample_count > max_samples:
        error_msg = (
            f"Number of samples ({sample_count}) exceeds maximum "
            f"allowed ({max_samples})"
        )
        logger.warning(
            "Sample count validation failed",
            extra={"sample_count": sample_count, "max_samples": max_samples},
        )
        return False, error_msg

    logger.debug(
        "Sample count validation passed",
        extra={"sample_count": sample_count, "max_samples": max_samples},
    )
    return True, ""
validate_ko_count staticmethod
validate_ko_count(ko_count: int, max_kos: int) -> Tuple[bool, str]

Validate number of KO entries against maximum limit.

Parameters:

Name Type Description Default
ko_count int

Number of KO entries in dataset

required
max_kos int

Maximum allowed KO entries

required

Returns:

Type Description
Tuple[bool, str]

Tuple (is_valid, error_message)

Notes

Formats large numbers with commas for readability. Logs detailed context for validation failures.

Source code in src/domain/services/validation_service.py
@staticmethod
@log_execution(level=logging.DEBUG)
def validate_ko_count(ko_count: int, max_kos: int) -> Tuple[bool, str]:
    """
    Validate number of KO entries against maximum limit.

    Parameters
    ----------
    ko_count : int
        Number of KO entries in dataset
    max_kos : int
        Maximum allowed KO entries

    Returns
    -------
    Tuple[bool, str]
        Tuple (is_valid, error_message)

    Notes
    -----
    Formats large numbers with commas for readability.
    Logs detailed context for validation failures.
    """
    if ko_count > max_kos:
        error_msg = (
            f"Number of KO entries ({ko_count:,}) exceeds maximum "
            f"allowed ({max_kos:,})"
        )
        logger.warning(
            "KO count validation failed",
            extra={"ko_count": ko_count, "max_kos": max_kos},
        )
        return False, error_msg

    logger.debug(
        "KO count validation passed",
        extra={"ko_count": ko_count, "max_kos": max_kos},
    )
    return True, ""
validate_encoding staticmethod
validate_encoding(content_bytes: bytes) -> Tuple[bool, str, str]

Validate and decode file content encoding.

Attempts UTF-8 decoding first, falls back to latin-1 if needed.

Parameters:

Name Type Description Default
content_bytes bytes

Raw file content

required

Returns:

Type Description
Tuple[bool, str, str]

Tuple (is_valid, decoded_content, error_message) - is_valid: True if decoding succeeded - decoded_content: Decoded string (empty if failed) - error_message: Error description (empty if successful)

Notes

Encoding priority: 1. UTF-8 (preferred) 2. Latin-1 (fallback)

Logs warnings when fallback encoding is used. Returns user-friendly error messages.

Source code in src/domain/services/validation_service.py
@staticmethod
@log_execution(level=logging.DEBUG)
def validate_encoding(content_bytes: bytes) -> Tuple[bool, str, str]:
    """
    Validate and decode file content encoding.

    Attempts UTF-8 decoding first, falls back to latin-1 if needed.

    Parameters
    ----------
    content_bytes : bytes
        Raw file content

    Returns
    -------
    Tuple[bool, str, str]
        Tuple (is_valid, decoded_content, error_message)
        - is_valid: True if decoding succeeded
        - decoded_content: Decoded string (empty if failed)
        - error_message: Error description (empty if successful)

    Notes
    -----
    Encoding priority:
    1. UTF-8 (preferred)
    2. Latin-1 (fallback)

    Logs warnings when fallback encoding is used.
    Returns user-friendly error messages.
    """
    # Try UTF-8 first
    try:
        decoded = content_bytes.decode("utf-8")
        logger.debug("Content decoded as UTF-8")
        return True, decoded, ""
    except UnicodeDecodeError as e:
        logger.warning(f"UTF-8 decoding failed: {e}", extra={"error": str(e)})

    # Try latin-1 as fallback
    try:
        decoded = content_bytes.decode("latin-1")
        logger.warning(
            "Content decoded as latin-1 (not UTF-8)", extra={"encoding": "latin-1"}
        )
        return True, decoded, ""
    except UnicodeDecodeError as e:
        error_msg = "Unable to decode file. Please ensure file is UTF-8 encoded."
        logger.error(f"All encoding attempts failed: {e}", extra={"error": str(e)})
        return False, "", error_msg


Domain Service Patterns

When to Use Domain Services

Use domain services when:

  1. Multi-Entity Operations: Logic spans multiple aggregates
  2. Complex Business Rules: Too complex for a single entity
  3. External Dependencies: Requires external data or calculations
  4. Stateless Operations: No state to maintain