Merge Service

merge_service

Merge Service.

Domain service to orchestrate merges with databases.

Classes:

| Name | Description |
| --- | --- |
| DatabaseRepository | Protocol defining the repository interface |
| MergeService | Service coordinating merges with the 4 databases |

Classes

DatabaseRepository

Bases: Protocol

Protocol (interface) for database repositories.

Defines the contract that all database repositories must implement.

Notes

This is a Python Protocol (PEP 544): any class with a matching load() method satisfies it without inheriting from it, so duck typing remains checkable by static type checkers. Concrete implementations will be in the Infrastructure layer.
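
The structural typing described above can be illustrated with a minimal sketch. The `InMemoryRepository` class, the `count_entries` function, and the sample record are hypothetical and exist only for this example; they are not part of the documented module.

```python
from typing import Any, Dict, Protocol


class DatabaseRepository(Protocol):
    """Contract from this page: a single load() method."""

    def load(self) -> Dict[str, Any]:
        ...


class InMemoryRepository:
    """Hypothetical implementation; it does not inherit from the protocol."""

    def __init__(self, data: Dict[str, Any]) -> None:
        self._data = data

    def load(self) -> Dict[str, Any]:
        # Return a shallow copy so callers cannot replace stored entries.
        return dict(self._data)


def count_entries(repo: DatabaseRepository) -> int:
    """Accepts any object whose shape matches DatabaseRepository."""
    return len(repo.load())


# Placeholder record, made up for illustration.
repo = InMemoryRepository({"K00001": {"note": "example record"}})
entry_count = count_entries(repo)
```

A static type checker accepts `InMemoryRepository` wherever `DatabaseRepository` is expected because the method signatures match, which is what lets the domain layer stay unaware of infrastructure classes.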

Functions
load
load() -> Dict[str, Any]

Load data from the database.

Returns:

| Type | Description |
| --- | --- |
| Dict[str, Any] | Database data in dictionary format |

Source code in src/domain/services/merge_service.py
def load(self) -> Dict[str, Any]:
    """
    Load data from the database.

    Returns
    -------
    Dict[str, Any]
        Database data in dictionary format
    """
    ...

MergeService

MergeService(biorempp_repo: DatabaseRepository, kegg_repo: DatabaseRepository, hadeg_repo: DatabaseRepository, toxcsm_repo: DatabaseRepository)

Domain service to orchestrate merges with databases.

Coordinates the process of merging the input dataset with the 4 system databases: BioRemPP, KEGG, HADEG, and ToxCSM.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| biorempp_repo | DatabaseRepository | Repository for the BioRemPP database | required |
| kegg_repo | DatabaseRepository | Repository for the KEGG database | required |
| hadeg_repo | DatabaseRepository | Repository for the HADEG database | required |
| toxcsm_repo | DatabaseRepository | Repository for the ToxCSM database | required |

Notes

This service depends on repositories that will be injected, following the Dependency Inversion Principle (SOLID).
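
One practical consequence of injecting the repositories is that the service can be exercised with test doubles instead of real databases. The sketch below assumes nothing about `MergeService` beyond its constructor signature; `StubRepository`, `MergeServiceSketch`, and `loaded_sizes` are hypothetical names introduced for illustration.

```python
from typing import Any, Dict, Protocol


class DatabaseRepository(Protocol):
    def load(self) -> Dict[str, Any]:
        ...


class StubRepository:
    """Hypothetical test double that counts load() calls."""

    def __init__(self, data: Dict[str, Any]) -> None:
        self.data = data
        self.load_calls = 0

    def load(self) -> Dict[str, Any]:
        self.load_calls += 1
        return self.data


class MergeServiceSketch:
    """Stand-in that mirrors only the constructor wiring of MergeService."""

    def __init__(
        self,
        biorempp_repo: DatabaseRepository,
        kegg_repo: DatabaseRepository,
        hadeg_repo: DatabaseRepository,
        toxcsm_repo: DatabaseRepository,
    ) -> None:
        self.biorempp_repo = biorempp_repo
        self.kegg_repo = kegg_repo
        self.hadeg_repo = hadeg_repo
        self.toxcsm_repo = toxcsm_repo

    def loaded_sizes(self) -> Dict[str, int]:
        # Hypothetical helper (not part of MergeService): it only shows that
        # the service reaches its data through the injected objects.
        return {
            "biorempp": len(self.biorempp_repo.load()),
            "kegg": len(self.kegg_repo.load()),
            "hadeg": len(self.hadeg_repo.load()),
            "toxcsm": len(self.toxcsm_repo.load()),
        }


names = ("biorempp", "kegg", "hadeg", "toxcsm")
stubs = {name: StubRepository({f"{name}_key": {}}) for name in names}
service = MergeServiceSketch(
    biorempp_repo=stubs["biorempp"],
    kegg_repo=stubs["kegg"],
    hadeg_repo=stubs["hadeg"],
    toxcsm_repo=stubs["toxcsm"],
)
sizes = service.loaded_sizes()
```

Because the service depends only on the protocol, swapping a stub for a file-backed or remote repository requires no change to the service itself.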

Source code in src/domain/services/merge_service.py
def __init__(
    self,
    biorempp_repo: DatabaseRepository,
    kegg_repo: DatabaseRepository,
    hadeg_repo: DatabaseRepository,
    toxcsm_repo: DatabaseRepository,
):
    """
    Initialize the service with the necessary repositories.

    Parameters
    ----------
    biorempp_repo : DatabaseRepository
        BioRemPP repository
    kegg_repo : DatabaseRepository
        KEGG repository
    hadeg_repo : DatabaseRepository
        HADEG repository
    toxcsm_repo : DatabaseRepository
        ToxCSM repository
    """
    self.biorempp_repo = biorempp_repo
    self.kegg_repo = kegg_repo
    self.hadeg_repo = hadeg_repo
    self.toxcsm_repo = toxcsm_repo

    logger.info(
        "MergeService initialized",
        extra={"repositories": ["biorempp", "kegg", "hadeg", "toxcsm"]},
    )
Functions
merge_all
merge_all(dataset: Dataset) -> MergedData

Execute all merges sequentially.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| dataset | Dataset | Input dataset with samples and KOs | required |

Returns:

| Type | Description |
| --- | --- |
| MergedData | Entity with all merge results |

Raises:

| Type | Description |
| --- | --- |
| ValueError | If any mandatory merge fails |

Notes

The process follows this order:

1. Merge with BioRemPP (mandatory)
2. Merge with KEGG (mandatory)
3. Merge with HADEG (mandatory)
4. Merge with ToxCSM (optional, depends on compounds)
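
The ordering matters because the ToxCSM step consumes the output of the BioRemPP step, while the KEGG and HADEG steps work from the original input. The sketch below illustrates that data flow only. The bodies of `_merge_by_ko` and `_merge_toxcsm` are not shown on this page, so the two functions here, the dictionary layouts, and all sample values are assumptions made for illustration.

```python
from typing import Any, Dict


def merge_by_ko(input_data: Dict[str, Any], database: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical join: keep only KOs present in both input and database."""
    return {
        ko: {"input": input_data[ko], "annotation": database[ko]}
        for ko in input_data
        if ko in database
    }


def merge_toxcsm(
    biorempp_merged: Dict[str, Any], toxcsm_db: Dict[str, Any]
) -> Dict[str, Any]:
    """Hypothetical lookup of every compound surfaced by the BioRemPP merge."""
    compounds = {
        compound
        for record in biorempp_merged.values()
        for compound in record["annotation"].get("compounds", [])
    }
    return {c: toxcsm_db[c] for c in sorted(compounds) if c in toxcsm_db}


# Made-up placeholder data; the real schemas are not documented here.
input_data = {"K00001": ["sample_1"], "K99999": ["sample_2"]}
biorempp_db = {"K00001": {"compounds": ["C00001"]}}
kegg_db = {"K00001": {"pathway": "example"}}
hadeg_db: Dict[str, Any] = {}
toxcsm_db = {"C00001": {"label": "example"}}

# Steps 1 to 3 each join the original input against one database.
biorempp_merged = merge_by_ko(input_data, biorempp_db)
kegg_merged = merge_by_ko(input_data, kegg_db)
hadeg_merged = merge_by_ko(input_data, hadeg_db)

# Step 4 depends on step 1: it needs the compounds found by BioRemPP.
toxcsm_merged = merge_toxcsm(biorempp_merged, toxcsm_db)
```

In this sketch an empty database simply yields an empty merge; how the real service treats that case is decided by `MergedData.validate()`, which this page does not describe.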

Source code in src/domain/services/merge_service.py
@log_execution(level=logging.INFO)
@log_performance(threshold_ms=1000.0)
def merge_all(self, dataset: Dataset) -> MergedData:
    """
    Execute all merges sequentially.

    Parameters
    ----------
    dataset : Dataset
        Input dataset with samples and KOs

    Returns
    -------
    MergedData
        Entity with all merge results

    Raises
    ------
    ValueError
        If any mandatory merge fails

    Notes
    -----
    The process follows this order:
    1. Merge with BioRemPP (mandatory)
    2. Merge with KEGG (mandatory)
    3. Merge with HADEG (mandatory)
    4. Merge with ToxCSM (optional, depends on compounds)
    """
    logger.info(
        "Starting merge process",
        extra={
            "sample_count": dataset.total_samples,
            "ko_count": dataset.total_kos,
        },
    )

    # Convert dataset to dictionary format
    input_data = dataset.to_dict()

    # Merge 1: BioRemPP (main base)
    logger.debug("Starting BioRemPP merge")
    biorempp_db = self.biorempp_repo.load()
    biorempp_merged = self._merge_by_ko(input_data, biorempp_db)
    logger.debug("BioRemPP merge completed")

    # Merge 2: KEGG
    logger.debug("Starting KEGG merge")
    kegg_db = self.kegg_repo.load()
    kegg_merged = self._merge_by_ko(input_data, kegg_db)
    logger.debug("KEGG merge completed")

    # Merge 3: HADEG
    logger.debug("Starting HADEG merge")
    hadeg_db = self.hadeg_repo.load()
    hadeg_merged = self._merge_by_ko(input_data, hadeg_db)
    logger.debug("HADEG merge completed")

    # Merge 4: ToxCSM (uses compounds from biorempp)
    logger.debug("Starting ToxCSM merge")
    toxcsm_db = self.toxcsm_repo.load()
    toxcsm_merged = self._merge_toxcsm(biorempp_merged, toxcsm_db)
    logger.debug("ToxCSM merge completed")

    # Create MergedData entity
    merged_data = MergedData(
        original_dataset=dataset,
        biorempp_data=biorempp_merged,
        kegg_data=kegg_merged,
        hadeg_data=hadeg_merged,
        toxcsm_data=toxcsm_merged,
    )

    # Validate result
    try:
        merged_data.validate()
        logger.info(
            "Merge process completed successfully",
            extra={"is_fully_merged": merged_data.is_fully_merged},
        )
    except ValueError as e:
        logger.error("Merge validation failed", extra={"error": str(e)})
        raise

    return merged_data
merge_biorempp
merge_biorempp(dataset: Dataset) -> Dict[str, Any]

Execute only the merge with BioRemPP.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| dataset | Dataset | Input dataset | required |

Returns:

| Type | Description |
| --- | --- |
| Dict[str, Any] | Data merged with BioRemPP |

Notes

Useful for partial or incremental processing.

Source code in src/domain/services/merge_service.py
@log_execution(level=logging.INFO)
def merge_biorempp(self, dataset: Dataset) -> Dict[str, Any]:
    """
    Execute only the merge with BioRemPP.

    Parameters
    ----------
    dataset : Dataset
        Input dataset

    Returns
    -------
    Dict[str, Any]
        Data merged with BioRemPP

    Notes
    -----
    Useful for partial or incremental processing.
    """
    logger.info(
        "Starting BioRemPP-only merge",
        extra={"sample_count": dataset.total_samples},
    )

    input_data = dataset.to_dict()
    biorempp_db = self.biorempp_repo.load()
    result = self._merge_by_ko(input_data, biorempp_db)

    logger.info("BioRemPP merge completed")
    return result
get_merge_statistics
get_merge_statistics(merged_data: MergedData) -> Dict[str, Any]

Calculate statistics about the merges performed.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| merged_data | MergedData | Merged data | required |

Returns:

| Type | Description |
| --- | --- |
| Dict[str, Any] | Merge statistics |

Source code in src/domain/services/merge_service.py
@log_execution(level=logging.DEBUG)
def get_merge_statistics(self, merged_data: MergedData) -> Dict[str, Any]:
    """
    Calculate statistics about the merges performed.

    Parameters
    ----------
    merged_data : MergedData
        Merged data

    Returns
    -------
    Dict[str, Any]
        Merge statistics
    """
    status = merged_data.get_merge_status()

    stats = {
        "total_databases": 4,
        "successful_merges": sum(status.values()),
        "merge_status": status,
        "is_fully_merged": merged_data.is_fully_merged,
        "total_samples": merged_data.original_dataset.total_samples,
        "total_kos": merged_data.original_dataset.total_kos,
    }

    logger.debug(
        "Merge statistics calculated",
        extra={
            "successful_merges": stats["successful_merges"],
            "is_fully_merged": stats["is_fully_merged"],
        },
    )

    return stats
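
The `successful_merges` count above relies on `sum(status.values())`, which works if the status values are booleans. The following sketch shows that idiom in isolation. It assumes `get_merge_status()` returns a mapping from database name to `bool`, which this page does not confirm; `summarize_status` and its `failed_merges` key are hypothetical additions, and unlike the method above it derives the database count from the mapping rather than hardcoding 4.

```python
from typing import Any, Dict


def summarize_status(status: Dict[str, bool]) -> Dict[str, Any]:
    """Hypothetical helper mirroring the counting step of get_merge_statistics."""
    return {
        "total_databases": len(status),
        # bool is a subclass of int, so True counts as 1 and False as 0.
        "successful_merges": sum(status.values()),
        "failed_merges": sorted(name for name, ok in status.items() if not ok),
    }


# Made-up status mapping for illustration.
example_status = {"biorempp": True, "kegg": True, "hadeg": True, "toxcsm": False}
summary = summarize_status(example_status)
```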
