Skip to content

BioRemPP Repository

biorempp_repository

BioRemPP Repository - Bioremediation Database Access.

Provides a repository implementation for accessing the BioRemPP database, which contains bioremediation information mapped to KEGG Orthology identifiers.

Classes:

Name Description
BioRemPPRepository

Repository for BioRemPP bioremediation database

Classes

BioRemPPRepository

BioRemPPRepository(filepath: Path = Path('data/databases/biorempp_db.csv'), encoding: str = 'utf-8', separator: str = ';')

Bases: CSVDatabaseRepository

Repository for BioRemPP bioremediation database.

Provides access to bioremediation data mapped to KEGG Orthology IDs. Database file: data/databases/biorempp_db.csv

Attributes:

Name Type Description
filepath Path

Path to BioRemPP database CSV file

encoding str

File encoding (default: 'utf-8')

separator str

CSV separator (default: ';')

required_columns list[str]

Required columns: ['ko']

Initialize BioRemPP repository.

Parameters:

Name Type Description Default
filepath Path

Path to BioRemPP database CSV file.

Path('data/databases/biorempp_db.csv')
encoding str

File encoding.

'utf-8'
separator str

CSV separator.

';'
Source code in src/infrastructure/persistence/biorempp_repository.py
def __init__(
    self,
    filepath: Path = Path("data/databases/biorempp_db.csv"),
    encoding: str = "utf-8",
    separator: str = ";",
):
    """
    Set up a repository backed by the BioRemPP CSV database.

    All arguments are forwarded unchanged to the parent initializer,
    together with the list of columns this repository requires.

    Parameters
    ----------
    filepath : Path, default=Path('data/databases/biorempp_db.csv')
        Location of the BioRemPP database CSV file.
    encoding : str, default='utf-8'
        Text encoding of the CSV file.
    separator : str, default=';'
        Field delimiter used in the CSV file.
    """
    # "ko" is the minimum column the database must provide.
    mandatory_columns = ["ko"]
    super().__init__(
        filepath=filepath,
        encoding=encoding,
        separator=separator,
        required_columns=mandatory_columns,
    )
Functions
load_data
load_data() -> pd.DataFrame

Load CSV database into DataFrame with caching.

Returns:

Type Description
DataFrame

Database data with optimized dtypes

Raises:

Type Description
FileNotFoundError

If CSV file doesn't exist

ValueError

If CSV format is invalid or required columns missing

Source code in src/infrastructure/persistence/csv_database_repository.py
def load_data(self) -> pd.DataFrame:
    """
    Return the database as a DataFrame, reading the CSV only once.

    The first successful call reads the file, checks the schema with
    ``validate_schema``, passes the frame through ``_optimize_dtypes``
    and stores the result on ``self._data``. Later calls return that
    stored frame directly.

    Returns
    -------
    pd.DataFrame
        Database contents after dtype optimization.

    Raises
    ------
    FileNotFoundError
        If ``self.filepath`` does not exist.
    ValueError
        If schema validation fails. Errors raised while reading or
        post-processing the file are logged and re-raised unchanged.
    """
    cached = self._data
    if cached is not None:
        logger.debug(
            f"Using cached data for {self.filepath.name}",
            extra={"rows": len(cached)},
        )
        return cached

    logger.info(f"Loading database from {self.filepath}")

    if not self.filepath.exists():
        message = f"Database file not found: {self.filepath}"
        logger.error(message)
        raise FileNotFoundError(message)

    try:
        frame = pd.read_csv(
            self.filepath,
            sep=self.separator,
            encoding=self.encoding,
        )

        # Reject the file before caching if required columns are absent.
        schema_ok = self.validate_schema(frame)
        if not schema_ok:
            raise ValueError(
                f"Invalid database schema. "
                f"Required columns: {self.required_columns}"
            )

        frame = self._optimize_dtypes(frame)

        # Only a validated, optimized frame ever reaches the cache.
        self._data = frame

        size_mb = round(frame.memory_usage(deep=True).sum() / 1024**2, 2)
        logger.info(
            f"Successfully loaded database: {self.filepath.name}",
            extra={
                "rows": len(frame),
                "columns": len(frame.columns),
                "memory_mb": size_mb,
            },
        )

        return frame

    except Exception as exc:
        # Log for diagnostics, then let the caller see the original error.
        logger.error(f"Failed to load database: {exc}")
        raise
reload_data
reload_data() -> pd.DataFrame

Force reload database from file.

Clears cache and reloads data from CSV file.

Returns:

Type Description
DataFrame

Freshly loaded database data

Source code in src/infrastructure/persistence/csv_database_repository.py
def reload_data(self) -> pd.DataFrame:
    """
    Discard the cached DataFrame and load the database again.

    Returns
    -------
    pd.DataFrame
        Whatever ``load_data`` returns once the cache has been cleared.
    """
    logger.info(f"Forcing reload of {self.filepath.name}")

    # Dropping the cached frame makes the next load_data() hit the file.
    self._data = None
    refreshed = self.load_data()
    return refreshed
merge_with_dataset
merge_with_dataset(dataset_df: DataFrame, on: str = 'ko', how: str = 'inner') -> pd.DataFrame

Merge dataset with database.

Parameters:

Name Type Description Default
dataset_df DataFrame

Input dataset (must have join column)

required
on str

Column name to join on

'ko'
how str

Join type ('inner', 'left', 'right', 'outer')

'inner'

Returns:

Type Description
DataFrame

Merged DataFrame

Raises:

Type Description
ValueError

If join column missing in either DataFrame

Source code in src/infrastructure/persistence/csv_database_repository.py
def merge_with_dataset(
    self, dataset_df: pd.DataFrame, on: str = "ko", how: str = "inner"
) -> pd.DataFrame:
    """
    Merge dataset with database.

    The database is obtained through ``load_data`` and joined to
    ``dataset_df`` with ``pd.merge``. A summary of the merge, including
    the share of input rows whose key exists in the database, is
    logged at INFO level.

    Parameters
    ----------
    dataset_df : pd.DataFrame
        Input dataset (must have join column)
    on : str, default='ko'
        Column name to join on
    how : str, default='inner'
        Join type ('inner', 'left', 'right', 'outer')

    Returns
    -------
    pd.DataFrame
        Merged DataFrame

    Raises
    ------
    ValueError
        If join column missing in either DataFrame
    """
    logger.info(
        f"Merging dataset with {self.__class__.__name__}",
        extra={"on": on, "how": how, "input_rows": len(dataset_df)},
    )

    # Load database if not already loaded
    db_df = self.load_data()

    # Validate join column exists, dataset first and then database.
    for frame, label in ((dataset_df, "dataset"), (db_df, "database")):
        if on not in frame.columns:
            error_msg = f"Column '{on}' not found in {label}"
            logger.error(error_msg)
            raise ValueError(error_msg)

    # Perform merge
    merged = pd.merge(dataset_df, db_df, on=on, how=how)

    # Match rate = percentage of input rows whose key is present in the
    # database. Dividing output rows by input rows would exceed 100% for
    # one-to-many joins and report >= 100% for left joins even when
    # nothing matched, so the key membership is measured directly.
    if len(dataset_df) > 0:
        match_rate = float(dataset_df[on].isin(db_df[on]).mean() * 100)
    else:
        match_rate = 0.0

    logger.info(
        "Merge completed",
        extra={
            "input_rows": len(dataset_df),
            "database_rows": len(db_df),
            "output_rows": len(merged),
            "match_rate_percent": round(match_rate, 2),
        },
    )

    return merged
get_column_names
get_column_names() -> list[str]

Get column names from database.

Returns:

Type Description
list[str]

List of column names

Source code in src/infrastructure/persistence/csv_database_repository.py
def get_column_names(self) -> list[str]:
    """
    List the column labels of the database.

    The database is obtained through ``load_data``.

    Returns
    -------
    list[str]
        Column names in the order they appear in the DataFrame.
    """
    return self.load_data().columns.tolist()
validate_schema
validate_schema(df: Optional[DataFrame] = None) -> bool

Validate database schema.

Checks if all required columns are present in DataFrame.

Parameters:

Name Type Description Default
df Optional[DataFrame]

DataFrame to validate (if None, loads from file)

None

Returns:

Type Description
bool

True if all required columns present, False otherwise

Source code in src/infrastructure/persistence/csv_database_repository.py
def validate_schema(self, df: Optional[pd.DataFrame] = None) -> bool:
    """
    Check that a DataFrame contains every required column.

    Parameters
    ----------
    df : Optional[pd.DataFrame], default=None
        Frame to inspect. When omitted, the database returned by
        ``load_data`` is inspected instead.

    Returns
    -------
    bool
        True when no required column is absent (or none are
        configured), False otherwise. Absent columns are reported
        through a warning log entry.
    """
    frame = self.load_data() if df is None else df

    required = self.required_columns
    if not required:
        # Nothing to enforce, so any frame passes.
        return True

    absent = set(required).difference(frame.columns)
    if not absent:
        return True

    logger.warning(
        f"Missing required columns in {self.filepath.name}",
        extra={
            "missing_columns": list(absent),
            "required": self.required_columns,
            "found": frame.columns.tolist(),
        },
    )
    return False
get_stats
get_stats() -> dict

Get database statistics.

Returns:

Type Description
dict

Dictionary containing: 'rows' (number of rows), 'columns' (number of columns), 'memory_mb' (memory usage in MB), 'column_names' (list of column names), and 'dtypes' (dictionary of column datatypes).

Source code in src/infrastructure/persistence/csv_database_repository.py
def get_stats(self) -> dict:
    """
    Summarize the size and layout of the database.

    The database is obtained through ``load_data``.

    Returns
    -------
    dict
        Mapping with these keys:
        - 'rows': Number of rows
        - 'columns': Number of columns
        - 'memory_mb': Deep memory usage in MB, rounded to 2 decimals
        - 'column_names': List of column names
        - 'dtypes': Column name -> dtype rendered as a string
    """
    frame = self.load_data()

    column_names = frame.columns.tolist()
    # deep=True asks pandas to include the contents of object columns.
    total_bytes = frame.memory_usage(deep=True).sum()
    dtype_names = frame.dtypes.astype(str).to_dict()

    summary = {
        "rows": len(frame),
        "columns": len(frame.columns),
        "memory_mb": round(total_bytes / 1024**2, 2),
        "column_names": column_names,
        "dtypes": dtype_names,
    }
    return summary