Skip to content

Persistence Layer

The Persistence Layer provides repository implementations for accessing external databases containing biological, pathway, and toxicity data.


Repository Implementations

KEGGRepository

KEGGRepository

KEGGRepository(filepath: Path = Path('data/databases/kegg_degradation_db.csv'), encoding: str = 'utf-8', separator: str = ';')

Bases: CSVDatabaseRepository

Repository for KEGG degradation pathways database.

Provides access to KEGG pathway data for degradation processes. Database file: data/databases/kegg_degradation_db.csv

Attributes:

Name Type Description
filepath Path

Path to KEGG database CSV file

encoding str

File encoding (default: 'utf-8')

separator str

CSV separator (default: ';')

required_columns list[str]

Required columns: ['ko', 'pathname']

Initialize KEGG repository.

Parameters:

Name Type Description Default
filepath Path

Path to KEGG database CSV file.

Path('data/databases/kegg_degradation_db.csv')
encoding str

File encoding.

'utf-8'
separator str

CSV separator.

';'
Source code in src/infrastructure/persistence/kegg_repository.py
def __init__(
    self,
    filepath: Path = Path("data/databases/kegg_degradation_db.csv"),
    encoding: str = "utf-8",
    separator: str = ";",
):
    """
    Create a repository backed by the KEGG degradation CSV file.

    All arguments are forwarded unchanged to the base-class initializer,
    together with the fixed list of columns this repository requires.

    Parameters
    ----------
    filepath : Path, default=Path('data/databases/kegg_degradation_db.csv')
        Location of the KEGG database CSV file.
    encoding : str, default='utf-8'
        Text encoding of the file.
    separator : str, default=';'
        Field delimiter used in the file.
    """
    # Actual column names as they appear in the CSV.
    kegg_columns = ["ko", "pathname"]
    super().__init__(
        required_columns=kegg_columns,
        separator=separator,
        encoding=encoding,
        filepath=filepath,
    )

Functions

load_data
load_data() -> pd.DataFrame

Load CSV database into DataFrame with caching.

Returns:

Type Description
DataFrame

Database data with optimized dtypes

Raises:

Type Description
FileNotFoundError

If CSV file doesn't exist

ValueError

If the CSV format is invalid or required columns are missing

Source code in src/infrastructure/persistence/csv_database_repository.py
def load_data(self) -> pd.DataFrame:
    """
    Return the database as a DataFrame, reading the CSV only once.

    The first successful call reads the file, checks it with
    ``validate_schema``, passes it through ``_optimize_dtypes`` and stores
    the result on ``self._data``. Later calls return that stored frame.

    Returns
    -------
    pd.DataFrame
        The frame produced by ``_optimize_dtypes`` (or the cached one)

    Raises
    ------
    FileNotFoundError
        If ``self.filepath`` does not exist
    ValueError
        If ``validate_schema`` rejects the loaded frame

    Notes
    -----
    Any exception raised while reading or processing the file is logged
    and then re-raised unchanged.
    """
    cached = self._data
    if cached is not None:
        logger.debug(
            f"Using cached data for {self.filepath.name}",
            extra={"rows": len(cached)},
        )
        return cached

    source = self.filepath
    logger.info(f"Loading database from {source}")

    if not source.exists():
        not_found = f"Database file not found: {source}"
        logger.error(not_found)
        raise FileNotFoundError(not_found)

    try:
        raw = pd.read_csv(source, encoding=self.encoding, sep=self.separator)

        # Reject files that lack the columns this repository depends on.
        schema_ok = self.validate_schema(raw)
        if not schema_ok:
            schema_error = (
                f"Invalid database schema. Required columns: {self.required_columns}"
            )
            raise ValueError(schema_error)

        table = self._optimize_dtypes(raw)

        # Store for subsequent calls.
        self._data = table

        size_mb = round(table.memory_usage(deep=True).sum() / 1024**2, 2)
        logger.info(
            f"Successfully loaded database: {source.name}",
            extra={
                "rows": len(table),
                "columns": len(table.columns),
                "memory_mb": size_mb,
            },
        )
    except Exception as exc:
        logger.error(f"Failed to load database: {exc}")
        raise

    return table
reload_data
reload_data() -> pd.DataFrame

Force reload database from file.

Clears cache and reloads data from CSV file.

Returns:

Type Description
DataFrame

Freshly loaded database data

Source code in src/infrastructure/persistence/csv_database_repository.py
def reload_data(self) -> pd.DataFrame:
    """
    Discard the cached DataFrame and load the database again.

    Returns
    -------
    pd.DataFrame
        The value returned by ``load_data`` after the cache was cleared
    """
    logger.info(f"Forcing reload of {self.filepath.name}")
    # With the cache emptied, load_data() cannot short-circuit on it.
    self._data = None
    refreshed = self.load_data()
    return refreshed
merge_with_dataset
merge_with_dataset(dataset_df: DataFrame, on: str = 'ko', how: str = 'inner') -> pd.DataFrame

Merge dataset with database.

Parameters:

Name Type Description Default
dataset_df DataFrame

Input dataset (must have join column)

required
on str

Column name to join on

'ko'
how str

Join type ('inner', 'left', 'right', 'outer')

'inner'

Returns:

Type Description
DataFrame

Merged DataFrame

Raises:

Type Description
ValueError

If the join column is missing from either DataFrame

Source code in src/infrastructure/persistence/csv_database_repository.py
def merge_with_dataset(
    self, dataset_df: pd.DataFrame, on: str = "ko", how: str = "inner"
) -> pd.DataFrame:
    """
    Merge dataset with database.

    Parameters
    ----------
    dataset_df : pd.DataFrame
        Input dataset (must have join column)
    on : str, default='ko'
        Column name to join on
    how : str, default='inner'
        Join type ('inner', 'left', 'right', 'outer')

    Returns
    -------
    pd.DataFrame
        Merged DataFrame

    Raises
    ------
    ValueError
        If join column missing in either DataFrame

    Notes
    -----
    The logged ``match_rate_percent`` is the percentage of input rows whose
    join key occurs in the database, so it always lies between 0 and 100
    regardless of the join type or of duplicated keys in the database.
    """
    logger.info(
        f"Merging dataset with {self.__class__.__name__}",
        extra={"on": on, "how": how, "input_rows": len(dataset_df)},
    )

    # Load database if not already loaded
    db_df = self.load_data()

    # Both sides must expose the join column before the merge is attempted.
    for side, frame in (("dataset", dataset_df), ("database", db_df)):
        if on not in frame.columns:
            error_msg = f"Column '{on}' not found in {side}"
            logger.error(error_msg)
            raise ValueError(error_msg)

    # Perform merge
    merged = pd.merge(dataset_df, db_df, on=on, how=how)

    # Share of input rows that found a partner in the database. The ratio
    # len(merged) / len(dataset_df) is not used because it exceeds 100% when
    # a key maps to several database rows and is never below 100% for
    # left/outer joins.
    if len(dataset_df) > 0:
        match_rate = float(dataset_df[on].isin(db_df[on]).mean()) * 100
    else:
        match_rate = 0.0

    logger.info(
        "Merge completed",
        extra={
            "input_rows": len(dataset_df),
            "database_rows": len(db_df),
            "output_rows": len(merged),
            "match_rate_percent": round(match_rate, 2),
        },
    )

    return merged
get_column_names
get_column_names() -> list[str]

Get column names from database.

Returns:

Type Description
list[str]

List of column names

Source code in src/infrastructure/persistence/csv_database_repository.py
def get_column_names(self) -> list[str]:
    """
    Return the column labels of the loaded database.

    Returns
    -------
    list[str]
        Column labels of the frame returned by ``load_data``, in order
    """
    return self.load_data().columns.tolist()
validate_schema
validate_schema(df: Optional[DataFrame] = None) -> bool

Validate database schema.

Checks if all required columns are present in DataFrame.

Parameters:

Name Type Description Default
df Optional[DataFrame]

DataFrame to validate (if None, loads from file)

None

Returns:

Type Description
bool

True if all required columns present, False otherwise

Source code in src/infrastructure/persistence/csv_database_repository.py
def validate_schema(self, df: Optional[pd.DataFrame] = None) -> bool:
    """
    Validate database schema.

    Checks if all required columns are present in DataFrame.

    Parameters
    ----------
    df : Optional[pd.DataFrame], default=None
        DataFrame to validate (if None, loads from file)

    Returns
    -------
    bool
        True if all required columns present, False otherwise

    Notes
    -----
    When ``df`` is None the frame comes from ``load_data``, which runs this
    same check and raises ``ValueError`` on failure. In that mode a schema
    problem therefore surfaces as an exception rather than as ``False``.
    """
    if df is None:
        df = self.load_data()

    if not self.required_columns:
        # No required columns specified, always valid
        return True

    available = set(df.columns)
    # Walk required_columns in order (de-duplicated) so the logged list is
    # deterministic; iterating a set difference yields an arbitrary order.
    missing = [
        column
        for column in dict.fromkeys(self.required_columns)
        if column not in available
    ]

    if missing:
        logger.warning(
            f"Missing required columns in {self.filepath.name}",
            extra={
                "missing_columns": missing,
                "required": self.required_columns,
                "found": df.columns.tolist(),
            },
        )
        return False

    return True
get_stats
get_stats() -> dict

Get database statistics.

Returns:

Type Description
dict

Dictionary containing: - 'rows': Number of rows - 'columns': Number of columns - 'memory_mb': Memory usage in MB - 'column_names': List of column names - 'dtypes': Dictionary of column datatypes

Source code in src/infrastructure/persistence/csv_database_repository.py
def get_stats(self) -> dict:
    """
    Summarise the loaded database.

    Returns
    -------
    dict
        Mapping with the keys:
        - 'rows': Number of rows
        - 'columns': Number of columns
        - 'memory_mb': Deep memory usage in MB, rounded to two decimals
        - 'column_names': List of column names
        - 'dtypes': Column name -> dtype rendered as a string
    """
    frame = self.load_data()

    total_bytes = frame.memory_usage(deep=True).sum()
    labels = frame.columns.tolist()
    dtype_names = frame.dtypes.astype(str).to_dict()

    return {
        "rows": len(frame),
        "columns": len(frame.columns),
        "memory_mb": round(total_bytes / 1024**2, 2),
        "column_names": labels,
        "dtypes": dtype_names,
    }

BioRemPPRepository

BioRemPPRepository

BioRemPPRepository(filepath: Path = Path('data/databases/biorempp_db.csv'), encoding: str = 'utf-8', separator: str = ';')

Bases: CSVDatabaseRepository

Repository for BioRemPP bioremediation database.

Provides access to bioremediation data mapped to KEGG Orthology IDs. Database file: data/databases/biorempp_db.csv

Attributes:

Name Type Description
filepath Path

Path to BioRemPP database CSV file

encoding str

File encoding (default: 'utf-8')

separator str

CSV separator (default: ';')

required_columns list[str]

Required columns: ['ko']

Initialize BioRemPP repository.

Parameters:

Name Type Description Default
filepath Path

Path to BioRemPP database CSV file.

Path('data/databases/biorempp_db.csv')
encoding str

File encoding.

'utf-8'
separator str

CSV separator.

';'
Source code in src/infrastructure/persistence/biorempp_repository.py
def __init__(
    self,
    filepath: Path = Path("data/databases/biorempp_db.csv"),
    encoding: str = "utf-8",
    separator: str = ";",
):
    """
    Create a repository backed by the BioRemPP CSV file.

    All arguments are forwarded unchanged to the base-class initializer,
    together with the fixed list of columns this repository requires.

    Parameters
    ----------
    filepath : Path, default=Path('data/databases/biorempp_db.csv')
        Location of the BioRemPP database CSV file.
    encoding : str, default='utf-8'
        Text encoding of the file.
    separator : str, default=';'
        Field delimiter used in the file.
    """
    # Only the join key is mandatory for this database.
    biorempp_columns = ["ko"]
    super().__init__(
        required_columns=biorempp_columns,
        separator=separator,
        encoding=encoding,
        filepath=filepath,
    )

Functions

load_data
load_data() -> pd.DataFrame

Load CSV database into DataFrame with caching.

Returns:

Type Description
DataFrame

Database data with optimized dtypes

Raises:

Type Description
FileNotFoundError

If CSV file doesn't exist

ValueError

If the CSV format is invalid or required columns are missing

Source code in src/infrastructure/persistence/csv_database_repository.py
def load_data(self) -> pd.DataFrame:
    """
    Return the database as a DataFrame, reading the CSV only once.

    The first successful call reads the file, checks it with
    ``validate_schema``, passes it through ``_optimize_dtypes`` and stores
    the result on ``self._data``. Later calls return that stored frame.

    Returns
    -------
    pd.DataFrame
        The frame produced by ``_optimize_dtypes`` (or the cached one)

    Raises
    ------
    FileNotFoundError
        If ``self.filepath`` does not exist
    ValueError
        If ``validate_schema`` rejects the loaded frame

    Notes
    -----
    Any exception raised while reading or processing the file is logged
    and then re-raised unchanged.
    """
    cached = self._data
    if cached is not None:
        logger.debug(
            f"Using cached data for {self.filepath.name}",
            extra={"rows": len(cached)},
        )
        return cached

    source = self.filepath
    logger.info(f"Loading database from {source}")

    if not source.exists():
        not_found = f"Database file not found: {source}"
        logger.error(not_found)
        raise FileNotFoundError(not_found)

    try:
        raw = pd.read_csv(source, encoding=self.encoding, sep=self.separator)

        # Reject files that lack the columns this repository depends on.
        schema_ok = self.validate_schema(raw)
        if not schema_ok:
            schema_error = (
                f"Invalid database schema. Required columns: {self.required_columns}"
            )
            raise ValueError(schema_error)

        table = self._optimize_dtypes(raw)

        # Store for subsequent calls.
        self._data = table

        size_mb = round(table.memory_usage(deep=True).sum() / 1024**2, 2)
        logger.info(
            f"Successfully loaded database: {source.name}",
            extra={
                "rows": len(table),
                "columns": len(table.columns),
                "memory_mb": size_mb,
            },
        )
    except Exception as exc:
        logger.error(f"Failed to load database: {exc}")
        raise

    return table
reload_data
reload_data() -> pd.DataFrame

Force reload database from file.

Clears cache and reloads data from CSV file.

Returns:

Type Description
DataFrame

Freshly loaded database data

Source code in src/infrastructure/persistence/csv_database_repository.py
def reload_data(self) -> pd.DataFrame:
    """
    Discard the cached DataFrame and load the database again.

    Returns
    -------
    pd.DataFrame
        The value returned by ``load_data`` after the cache was cleared
    """
    logger.info(f"Forcing reload of {self.filepath.name}")
    # With the cache emptied, load_data() cannot short-circuit on it.
    self._data = None
    refreshed = self.load_data()
    return refreshed
merge_with_dataset
merge_with_dataset(dataset_df: DataFrame, on: str = 'ko', how: str = 'inner') -> pd.DataFrame

Merge dataset with database.

Parameters:

Name Type Description Default
dataset_df DataFrame

Input dataset (must have join column)

required
on str

Column name to join on

'ko'
how str

Join type ('inner', 'left', 'right', 'outer')

'inner'

Returns:

Type Description
DataFrame

Merged DataFrame

Raises:

Type Description
ValueError

If the join column is missing from either DataFrame

Source code in src/infrastructure/persistence/csv_database_repository.py
def merge_with_dataset(
    self, dataset_df: pd.DataFrame, on: str = "ko", how: str = "inner"
) -> pd.DataFrame:
    """
    Merge dataset with database.

    Parameters
    ----------
    dataset_df : pd.DataFrame
        Input dataset (must have join column)
    on : str, default='ko'
        Column name to join on
    how : str, default='inner'
        Join type ('inner', 'left', 'right', 'outer')

    Returns
    -------
    pd.DataFrame
        Merged DataFrame

    Raises
    ------
    ValueError
        If join column missing in either DataFrame

    Notes
    -----
    The logged ``match_rate_percent`` is the percentage of input rows whose
    join key occurs in the database, so it always lies between 0 and 100
    regardless of the join type or of duplicated keys in the database.
    """
    logger.info(
        f"Merging dataset with {self.__class__.__name__}",
        extra={"on": on, "how": how, "input_rows": len(dataset_df)},
    )

    # Load database if not already loaded
    db_df = self.load_data()

    # Both sides must expose the join column before the merge is attempted.
    for side, frame in (("dataset", dataset_df), ("database", db_df)):
        if on not in frame.columns:
            error_msg = f"Column '{on}' not found in {side}"
            logger.error(error_msg)
            raise ValueError(error_msg)

    # Perform merge
    merged = pd.merge(dataset_df, db_df, on=on, how=how)

    # Share of input rows that found a partner in the database. The ratio
    # len(merged) / len(dataset_df) is not used because it exceeds 100% when
    # a key maps to several database rows and is never below 100% for
    # left/outer joins.
    if len(dataset_df) > 0:
        match_rate = float(dataset_df[on].isin(db_df[on]).mean()) * 100
    else:
        match_rate = 0.0

    logger.info(
        "Merge completed",
        extra={
            "input_rows": len(dataset_df),
            "database_rows": len(db_df),
            "output_rows": len(merged),
            "match_rate_percent": round(match_rate, 2),
        },
    )

    return merged
get_column_names
get_column_names() -> list[str]

Get column names from database.

Returns:

Type Description
list[str]

List of column names

Source code in src/infrastructure/persistence/csv_database_repository.py
def get_column_names(self) -> list[str]:
    """
    Return the column labels of the loaded database.

    Returns
    -------
    list[str]
        Column labels of the frame returned by ``load_data``, in order
    """
    return self.load_data().columns.tolist()
validate_schema
validate_schema(df: Optional[DataFrame] = None) -> bool

Validate database schema.

Checks if all required columns are present in DataFrame.

Parameters:

Name Type Description Default
df Optional[DataFrame]

DataFrame to validate (if None, loads from file)

None

Returns:

Type Description
bool

True if all required columns present, False otherwise

Source code in src/infrastructure/persistence/csv_database_repository.py
def validate_schema(self, df: Optional[pd.DataFrame] = None) -> bool:
    """
    Validate database schema.

    Checks if all required columns are present in DataFrame.

    Parameters
    ----------
    df : Optional[pd.DataFrame], default=None
        DataFrame to validate (if None, loads from file)

    Returns
    -------
    bool
        True if all required columns present, False otherwise

    Notes
    -----
    When ``df`` is None the frame comes from ``load_data``, which runs this
    same check and raises ``ValueError`` on failure. In that mode a schema
    problem therefore surfaces as an exception rather than as ``False``.
    """
    if df is None:
        df = self.load_data()

    if not self.required_columns:
        # No required columns specified, always valid
        return True

    available = set(df.columns)
    # Walk required_columns in order (de-duplicated) so the logged list is
    # deterministic; iterating a set difference yields an arbitrary order.
    missing = [
        column
        for column in dict.fromkeys(self.required_columns)
        if column not in available
    ]

    if missing:
        logger.warning(
            f"Missing required columns in {self.filepath.name}",
            extra={
                "missing_columns": missing,
                "required": self.required_columns,
                "found": df.columns.tolist(),
            },
        )
        return False

    return True
get_stats
get_stats() -> dict

Get database statistics.

Returns:

Type Description
dict

Dictionary containing: - 'rows': Number of rows - 'columns': Number of columns - 'memory_mb': Memory usage in MB - 'column_names': List of column names - 'dtypes': Dictionary of column datatypes

Source code in src/infrastructure/persistence/csv_database_repository.py
def get_stats(self) -> dict:
    """
    Summarise the loaded database.

    Returns
    -------
    dict
        Mapping with the keys:
        - 'rows': Number of rows
        - 'columns': Number of columns
        - 'memory_mb': Deep memory usage in MB, rounded to two decimals
        - 'column_names': List of column names
        - 'dtypes': Column name -> dtype rendered as a string
    """
    frame = self.load_data()

    total_bytes = frame.memory_usage(deep=True).sum()
    labels = frame.columns.tolist()
    dtype_names = frame.dtypes.astype(str).to_dict()

    return {
        "rows": len(frame),
        "columns": len(frame.columns),
        "memory_mb": round(total_bytes / 1024**2, 2),
        "column_names": labels,
        "dtypes": dtype_names,
    }

HADEGRepository

HADEGRepository

HADEGRepository(filepath: Path = Path('data/databases/hadeg_db.csv'), encoding: str = 'utf-8', separator: str = ';')

Bases: CSVDatabaseRepository

Repository for HADEG enzyme database.

Provides access to enzyme data for hydrocarbon degradation pathways. Database file: data/databases/hadeg_db.csv

Attributes:

Name Type Description
filepath Path

Path to HADEG database CSV file

encoding str

File encoding (default: 'utf-8')

separator str

CSV separator (default: ';')

required_columns list[str]

Required columns: ['ko', 'Gene', 'Pathway']

Initialize HADEG repository.

Parameters:

Name Type Description Default
filepath Path

Path to HADEG database CSV file.

Path('data/databases/hadeg_db.csv')
encoding str

File encoding.

'utf-8'
separator str

CSV separator.

';'
Source code in src/infrastructure/persistence/hadeg_repository.py
def __init__(
    self,
    filepath: Path = Path("data/databases/hadeg_db.csv"),
    encoding: str = "utf-8",
    separator: str = ";",
):
    """
    Create a repository backed by the HADEG CSV file.

    All arguments are forwarded unchanged to the base-class initializer,
    together with the fixed list of columns this repository requires.

    Parameters
    ----------
    filepath : Path, default=Path('data/databases/hadeg_db.csv')
        Location of the HADEG database CSV file.
    encoding : str, default='utf-8'
        Text encoding of the file.
    separator : str, default=';'
        Field delimiter used in the file.
    """
    # Actual column names; note that "Pathway" is capitalized.
    hadeg_columns = ["ko", "Gene", "Pathway"]
    super().__init__(
        required_columns=hadeg_columns,
        separator=separator,
        encoding=encoding,
        filepath=filepath,
    )

Functions

load_data
load_data() -> pd.DataFrame

Load CSV database into DataFrame with caching.

Returns:

Type Description
DataFrame

Database data with optimized dtypes

Raises:

Type Description
FileNotFoundError

If CSV file doesn't exist

ValueError

If the CSV format is invalid or required columns are missing

Source code in src/infrastructure/persistence/csv_database_repository.py
def load_data(self) -> pd.DataFrame:
    """
    Return the database as a DataFrame, reading the CSV only once.

    The first successful call reads the file, checks it with
    ``validate_schema``, passes it through ``_optimize_dtypes`` and stores
    the result on ``self._data``. Later calls return that stored frame.

    Returns
    -------
    pd.DataFrame
        The frame produced by ``_optimize_dtypes`` (or the cached one)

    Raises
    ------
    FileNotFoundError
        If ``self.filepath`` does not exist
    ValueError
        If ``validate_schema`` rejects the loaded frame

    Notes
    -----
    Any exception raised while reading or processing the file is logged
    and then re-raised unchanged.
    """
    cached = self._data
    if cached is not None:
        logger.debug(
            f"Using cached data for {self.filepath.name}",
            extra={"rows": len(cached)},
        )
        return cached

    source = self.filepath
    logger.info(f"Loading database from {source}")

    if not source.exists():
        not_found = f"Database file not found: {source}"
        logger.error(not_found)
        raise FileNotFoundError(not_found)

    try:
        raw = pd.read_csv(source, encoding=self.encoding, sep=self.separator)

        # Reject files that lack the columns this repository depends on.
        schema_ok = self.validate_schema(raw)
        if not schema_ok:
            schema_error = (
                f"Invalid database schema. Required columns: {self.required_columns}"
            )
            raise ValueError(schema_error)

        table = self._optimize_dtypes(raw)

        # Store for subsequent calls.
        self._data = table

        size_mb = round(table.memory_usage(deep=True).sum() / 1024**2, 2)
        logger.info(
            f"Successfully loaded database: {source.name}",
            extra={
                "rows": len(table),
                "columns": len(table.columns),
                "memory_mb": size_mb,
            },
        )
    except Exception as exc:
        logger.error(f"Failed to load database: {exc}")
        raise

    return table
reload_data
reload_data() -> pd.DataFrame

Force reload database from file.

Clears cache and reloads data from CSV file.

Returns:

Type Description
DataFrame

Freshly loaded database data

Source code in src/infrastructure/persistence/csv_database_repository.py
def reload_data(self) -> pd.DataFrame:
    """
    Discard the cached DataFrame and load the database again.

    Returns
    -------
    pd.DataFrame
        The value returned by ``load_data`` after the cache was cleared
    """
    logger.info(f"Forcing reload of {self.filepath.name}")
    # With the cache emptied, load_data() cannot short-circuit on it.
    self._data = None
    refreshed = self.load_data()
    return refreshed
merge_with_dataset
merge_with_dataset(dataset_df: DataFrame, on: str = 'ko', how: str = 'inner') -> pd.DataFrame

Merge dataset with database.

Parameters:

Name Type Description Default
dataset_df DataFrame

Input dataset (must have join column)

required
on str

Column name to join on

'ko'
how str

Join type ('inner', 'left', 'right', 'outer')

'inner'

Returns:

Type Description
DataFrame

Merged DataFrame

Raises:

Type Description
ValueError

If the join column is missing from either DataFrame

Source code in src/infrastructure/persistence/csv_database_repository.py
def merge_with_dataset(
    self, dataset_df: pd.DataFrame, on: str = "ko", how: str = "inner"
) -> pd.DataFrame:
    """
    Merge dataset with database.

    Parameters
    ----------
    dataset_df : pd.DataFrame
        Input dataset (must have join column)
    on : str, default='ko'
        Column name to join on
    how : str, default='inner'
        Join type ('inner', 'left', 'right', 'outer')

    Returns
    -------
    pd.DataFrame
        Merged DataFrame

    Raises
    ------
    ValueError
        If join column missing in either DataFrame

    Notes
    -----
    The logged ``match_rate_percent`` is the percentage of input rows whose
    join key occurs in the database, so it always lies between 0 and 100
    regardless of the join type or of duplicated keys in the database.
    """
    logger.info(
        f"Merging dataset with {self.__class__.__name__}",
        extra={"on": on, "how": how, "input_rows": len(dataset_df)},
    )

    # Load database if not already loaded
    db_df = self.load_data()

    # Both sides must expose the join column before the merge is attempted.
    for side, frame in (("dataset", dataset_df), ("database", db_df)):
        if on not in frame.columns:
            error_msg = f"Column '{on}' not found in {side}"
            logger.error(error_msg)
            raise ValueError(error_msg)

    # Perform merge
    merged = pd.merge(dataset_df, db_df, on=on, how=how)

    # Share of input rows that found a partner in the database. The ratio
    # len(merged) / len(dataset_df) is not used because it exceeds 100% when
    # a key maps to several database rows and is never below 100% for
    # left/outer joins.
    if len(dataset_df) > 0:
        match_rate = float(dataset_df[on].isin(db_df[on]).mean()) * 100
    else:
        match_rate = 0.0

    logger.info(
        "Merge completed",
        extra={
            "input_rows": len(dataset_df),
            "database_rows": len(db_df),
            "output_rows": len(merged),
            "match_rate_percent": round(match_rate, 2),
        },
    )

    return merged
get_column_names
get_column_names() -> list[str]

Get column names from database.

Returns:

Type Description
list[str]

List of column names

Source code in src/infrastructure/persistence/csv_database_repository.py
def get_column_names(self) -> list[str]:
    """
    Return the column labels of the loaded database.

    Returns
    -------
    list[str]
        Column labels of the frame returned by ``load_data``, in order
    """
    return self.load_data().columns.tolist()
validate_schema
validate_schema(df: Optional[DataFrame] = None) -> bool

Validate database schema.

Checks if all required columns are present in DataFrame.

Parameters:

Name Type Description Default
df Optional[DataFrame]

DataFrame to validate (if None, loads from file)

None

Returns:

Type Description
bool

True if all required columns present, False otherwise

Source code in src/infrastructure/persistence/csv_database_repository.py
def validate_schema(self, df: Optional[pd.DataFrame] = None) -> bool:
    """
    Validate database schema.

    Checks if all required columns are present in DataFrame.

    Parameters
    ----------
    df : Optional[pd.DataFrame], default=None
        DataFrame to validate (if None, loads from file)

    Returns
    -------
    bool
        True if all required columns present, False otherwise

    Notes
    -----
    When ``df`` is None the frame comes from ``load_data``, which runs this
    same check and raises ``ValueError`` on failure. In that mode a schema
    problem therefore surfaces as an exception rather than as ``False``.
    """
    if df is None:
        df = self.load_data()

    if not self.required_columns:
        # No required columns specified, always valid
        return True

    available = set(df.columns)
    # Walk required_columns in order (de-duplicated) so the logged list is
    # deterministic; iterating a set difference yields an arbitrary order.
    missing = [
        column
        for column in dict.fromkeys(self.required_columns)
        if column not in available
    ]

    if missing:
        logger.warning(
            f"Missing required columns in {self.filepath.name}",
            extra={
                "missing_columns": missing,
                "required": self.required_columns,
                "found": df.columns.tolist(),
            },
        )
        return False

    return True
get_stats
get_stats() -> dict

Get database statistics.

Returns:

Type Description
dict

Dictionary containing: - 'rows': Number of rows - 'columns': Number of columns - 'memory_mb': Memory usage in MB - 'column_names': List of column names - 'dtypes': Dictionary of column datatypes

Source code in src/infrastructure/persistence/csv_database_repository.py
def get_stats(self) -> dict:
    """
    Summarise the loaded database.

    Returns
    -------
    dict
        Mapping with the keys:
        - 'rows': Number of rows
        - 'columns': Number of columns
        - 'memory_mb': Deep memory usage in MB, rounded to two decimals
        - 'column_names': List of column names
        - 'dtypes': Column name -> dtype rendered as a string
    """
    frame = self.load_data()

    total_bytes = frame.memory_usage(deep=True).sum()
    labels = frame.columns.tolist()
    dtype_names = frame.dtypes.astype(str).to_dict()

    return {
        "rows": len(frame),
        "columns": len(frame.columns),
        "memory_mb": round(total_bytes / 1024**2, 2),
        "column_names": labels,
        "dtypes": dtype_names,
    }

ToxCSMRepository

ToxCSMRepository

ToxCSMRepository(filepath: Path = Path('data/databases/toxcsm_db.csv'), encoding: str = 'utf-8', separator: str = ';')

Bases: CSVDatabaseRepository

Repository for ToxCSM toxicity prediction database.

Provides access to compound-level toxicity predictions. Database file: data/databases/toxcsm_db.csv

Attributes:

Name Type Description
filepath Path

Path to ToxCSM database CSV file

encoding str

File encoding (default: 'utf-8')

separator str

CSV separator (default: ';')

required_columns list[str]

Required columns: ['cpd']

Methods:

Name Description
merge_with_compound_data

Merge compound data with toxicity predictions

Notes
  • Merges on 'cpd' column instead of 'ko' (compound-level data)

Initialize ToxCSM repository.

Parameters:

Name Type Description Default
filepath Path

Path to ToxCSM database CSV file.

Path('data/databases/toxcsm_db.csv')
encoding str

File encoding.

'utf-8'
separator str

CSV separator.

';'
Source code in src/infrastructure/persistence/toxcsm_repository.py
def __init__(
    self,
    filepath: Path = Path("data/databases/toxcsm_db.csv"),
    encoding: str = "utf-8",
    separator: str = ";",
):
    """
    Create a repository backed by the ToxCSM CSV file.

    All arguments are forwarded unchanged to the base-class initializer,
    together with the fixed list of columns this repository requires.

    Parameters
    ----------
    filepath : Path, default=Path('data/databases/toxcsm_db.csv')
        Location of the ToxCSM database CSV file.
    encoding : str, default='utf-8'
        Text encoding of the file.
    separator : str, default=';'
        Field delimiter used in the file.
    """
    # Actual column name as it appears in the CSV.
    toxcsm_columns = ["cpd"]
    super().__init__(
        required_columns=toxcsm_columns,
        separator=separator,
        encoding=encoding,
        filepath=filepath,
    )

Functions

merge_with_compound_data
merge_with_compound_data(compound_df: DataFrame, on: str = 'cpd', how: str = 'left') -> pd.DataFrame

Merge compound data with toxicity predictions.

Parameters:

Name Type Description Default
compound_df DataFrame

DataFrame containing compound information (must have join column)

required
on str

Column to join on

'cpd'
how str

Join type (default 'left' keeps all compounds)

'left'

Returns:

Type Description
DataFrame

Merged DataFrame with toxicity predictions

Source code in src/infrastructure/persistence/toxcsm_repository.py
def merge_with_compound_data(
    self,
    compound_df: pd.DataFrame,
    on: str = "cpd",  # default join column name
    how: str = "left",
) -> pd.DataFrame:
    """
    Join compound rows with the ToxCSM toxicity predictions.

    Thin wrapper that logs the input size and delegates to
    ``merge_with_dataset`` with compound-oriented defaults.

    Parameters
    ----------
    compound_df : pd.DataFrame
        DataFrame containing compound information (must have join column)
    on : str, default='cpd'
        Column to join on
    how : str, default='left'
        Join type passed through to ``merge_with_dataset``

    Returns
    -------
    pd.DataFrame
        The result of ``merge_with_dataset`` for the given arguments
    """
    logger.info(
        "Merging compound data with ToxCSM predictions",
        extra={"input_rows": len(compound_df)},
    )

    merge_args = {"dataset_df": compound_df, "on": on, "how": how}
    return self.merge_with_dataset(**merge_args)
load_data
load_data() -> pd.DataFrame

Load CSV database into DataFrame with caching.

Returns:

Type Description
DataFrame

Database data with optimized dtypes

Raises:

Type Description
FileNotFoundError

If CSV file doesn't exist

ValueError

If the CSV format is invalid or required columns are missing

Source code in src/infrastructure/persistence/csv_database_repository.py
def load_data(self) -> pd.DataFrame:
    """
    Return the database as a DataFrame, reading the CSV only when needed.

    A previously loaded frame held in ``self._data`` is returned as-is.
    Otherwise the file is read with the configured encoding and separator,
    checked with ``validate_schema``, passed through ``_optimize_dtypes``
    and stored in ``self._data`` before being returned.

    Returns
    -------
    pd.DataFrame
        Database data with optimized dtypes

    Raises
    ------
    FileNotFoundError
        If CSV file doesn't exist
    ValueError
        If CSV format is invalid or required columns missing
    """
    cached = self._data
    if cached is not None:
        # Cache hit: no file access at all.
        logger.debug(
            f"Using cached data for {self.filepath.name}",
            extra={"rows": len(cached)},
        )
        return cached

    logger.info(f"Loading database from {self.filepath}")

    if not self.filepath.exists():
        error_msg = f"Database file not found: {self.filepath}"
        logger.error(error_msg)
        raise FileNotFoundError(error_msg)

    try:
        frame = pd.read_csv(
            self.filepath,
            encoding=self.encoding,
            sep=self.separator,
        )

        # Reject the file before it can be cached if the schema check fails.
        schema_ok = self.validate_schema(frame)
        if not schema_ok:
            raise ValueError(
                f"Invalid database schema. Required columns: {self.required_columns}"
            )

        frame = self._optimize_dtypes(frame)

        # Only a validated, optimized frame is ever stored in the cache.
        self._data = frame

        logger.info(
            f"Successfully loaded database: {self.filepath.name}",
            extra={
                "rows": len(frame),
                "columns": len(frame.columns),
                "memory_mb": round(
                    frame.memory_usage(deep=True).sum() / 1024**2, 2
                ),
            },
        )

        return frame

    except Exception as e:
        # Log every failure from the read/validate/optimize steps, then re-raise unchanged.
        logger.error(f"Failed to load database: {e}")
        raise
reload_data
reload_data() -> pd.DataFrame

Force reload database from file.

Clears cache and reloads data from CSV file.

Returns:

Type Description
DataFrame

Freshly loaded database data

Source code in src/infrastructure/persistence/csv_database_repository.py
def reload_data(self) -> pd.DataFrame:
    """
    Discard the cached frame and load the database again.

    Resets ``self._data`` to ``None`` so that the following ``load_data``
    call cannot be served from the cache.

    Returns
    -------
    pd.DataFrame
        Whatever ``load_data`` returns after the cache was cleared
    """
    db_name = self.filepath.name
    logger.info(f"Forcing reload of {db_name}")

    # Clearing the cache is what forces load_data down its read path.
    self._data = None
    fresh = self.load_data()
    return fresh
merge_with_dataset
merge_with_dataset(dataset_df: DataFrame, on: str = 'ko', how: str = 'inner') -> pd.DataFrame

Merge dataset with database.

Parameters:

Name Type Description Default
dataset_df DataFrame

Input dataset (must have join column)

required
on str

Column name to join on

'ko'
how str

Join type ('inner', 'left', 'right', 'outer')

'inner'

Returns:

Type Description
DataFrame

Merged DataFrame

Raises:

Type Description
ValueError

If join column missing in either DataFrame

Source code in src/infrastructure/persistence/csv_database_repository.py
def merge_with_dataset(
    self, dataset_df: pd.DataFrame, on: str = "ko", how: str = "inner"
) -> pd.DataFrame:
    """
    Merge dataset with database.

    The database frame is obtained through ``load_data()`` and joined to
    ``dataset_df`` with ``pd.merge`` on a single shared column.

    Parameters
    ----------
    dataset_df : pd.DataFrame
        Input dataset (must have join column)
    on : str, default='ko'
        Column name to join on; must exist in both frames
    how : str, default='inner'
        Join type ('inner', 'left', 'right', 'outer')

    Returns
    -------
    pd.DataFrame
        Merged DataFrame

    Raises
    ------
    ValueError
        If join column missing in either DataFrame

    Notes
    -----
    The logged ``match_rate_percent`` is the share of input rows whose key
    occurs in the database. It is deliberately not derived from the output
    row count: that count grows with duplicated database keys and is never
    below the input size for 'left' joins, so it cannot express a rate.
    """
    input_rows = len(dataset_df)
    logger.info(
        f"Merging dataset with {self.__class__.__name__}",
        extra={"on": on, "how": how, "input_rows": input_rows},
    )

    # Load database if not already loaded
    db_df = self.load_data()

    # Validate join column exists (dataset first, then database)
    for frame, label in ((dataset_df, "dataset"), (db_df, "database")):
        if on not in frame.columns:
            error_msg = f"Column '{on}' not found in {label}"
            logger.error(error_msg)
            raise ValueError(error_msg)

    # Perform merge
    merged = pd.merge(dataset_df, db_df, on=on, how=how)

    # Match rate = percentage of input rows that found a key in the database.
    if input_rows > 0:
        match_rate = float(dataset_df[on].isin(db_df[on]).mean() * 100)
    else:
        match_rate = 0.0

    logger.info(
        "Merge completed",
        extra={
            "input_rows": input_rows,
            "database_rows": len(db_df),
            "output_rows": len(merged),
            "match_rate_percent": round(match_rate, 2),
        },
    )

    return merged
get_column_names
get_column_names() -> list[str]

Get column names from database.

Returns:

Type Description
list[str]

List of column names

Source code in src/infrastructure/persistence/csv_database_repository.py
def get_column_names(self) -> list[str]:
    """
    List the column labels of the database.

    The frame is obtained through ``load_data()``.

    Returns
    -------
    list[str]
        Column names, in the order they appear in the loaded frame
    """
    return self.load_data().columns.tolist()
validate_schema
validate_schema(df: Optional[DataFrame] = None) -> bool

Validate database schema.

Checks if all required columns are present in DataFrame.

Parameters:

Name Type Description Default
df Optional[DataFrame]

DataFrame to validate (if None, loads from file)

None

Returns:

Type Description
bool

True if all required columns present, False otherwise

Source code in src/infrastructure/persistence/csv_database_repository.py
def validate_schema(self, df: Optional[pd.DataFrame] = None) -> bool:
    """
    Validate database schema.

    Checks if all required columns are present in DataFrame. A failed check
    is reported through a warning log entry that lists the missing columns
    in the order they were declared in ``required_columns``.

    Parameters
    ----------
    df : Optional[pd.DataFrame], default=None
        DataFrame to validate (if None, loads from file via ``load_data()``)

    Returns
    -------
    bool
        True if all required columns present, False otherwise

    Notes
    -----
    With ``df=None`` the frame comes from ``load_data()``, which runs this
    same check while loading and raises ``ValueError`` on failure, so a
    schema problem on that path surfaces as an exception rather than as a
    ``False`` return.
    """
    if df is None:
        df = self.load_data()

    if not self.required_columns:
        # No required columns specified, always valid
        return True

    present = set(df.columns)
    # dict.fromkeys keeps declaration order and drops repeats, so the
    # reported list is stable from run to run (set ordering is not).
    missing = [
        column
        for column in dict.fromkeys(self.required_columns)
        if column not in present
    ]

    if not missing:
        return True

    logger.warning(
        f"Missing required columns in {self.filepath.name}",
        extra={
            "missing_columns": missing,
            "required": self.required_columns,
            "found": df.columns.tolist(),
        },
    )
    return False
get_stats
get_stats() -> dict

Get database statistics.

Returns:

Type Description
dict

Dictionary containing: 'rows' (number of rows), 'columns' (number of columns), 'memory_mb' (memory usage in MB), 'column_names' (list of column names), and 'dtypes' (dictionary of column datatypes).

Source code in src/infrastructure/persistence/csv_database_repository.py
def get_stats(self) -> dict:
    """
    Summarize the size and layout of the database.

    The frame is obtained through ``load_data()``.

    Returns
    -------
    dict
        Dictionary containing:
        - 'rows': Number of rows
        - 'columns': Number of columns
        - 'memory_mb': Memory usage in MB, rounded to two decimals
        - 'column_names': List of column names
        - 'dtypes': Dictionary mapping column name to dtype name
    """
    frame = self.load_data()

    # deep=True makes pandas measure the contents of object columns as well.
    total_bytes = frame.memory_usage(deep=True).sum()

    summary = {}
    summary["rows"] = len(frame)
    summary["columns"] = len(frame.columns)
    summary["memory_mb"] = round(total_bytes / 1024**2, 2)
    summary["column_names"] = frame.columns.tolist()
    summary["dtypes"] = {
        name: str(dtype) for name, dtype in frame.dtypes.items()
    }
    return summary

CSVDatabaseRepository

CSVDatabaseRepository

CSVDatabaseRepository(filepath: Path, encoding: str = 'utf-8', separator: str = ';', required_columns: Optional[list[str]] = None)

Base implementation for CSV-based database repositories.

Provides common functionality for loading, caching, validating, and merging CSV databases. Specific database repositories inherit from this class.

Attributes:

Name Type Description
filepath Path

Path to CSV database file

encoding str

File encoding (default: 'utf-8')

separator str

CSV separator (default: ';')

required_columns list[str]

List of required column names for validation

_data Optional[DataFrame]

Cached database data (lazy loaded)

Methods:

Name Description
load_data

Load CSV database with caching

reload_data

Force reload database from file

merge_with_dataset

Merge dataset with database

get_column_names

Get column names from database

validate_schema

Validate database schema

get_stats

Get database statistics

Notes
  • Implements lazy loading with caching for performance
  • Optimizes dtypes to reduce memory usage

Initialize CSV database repository.

Parameters:

Name Type Description Default
filepath Path

Path to CSV file.

required
encoding str

File encoding.

'utf-8'
separator str

CSV separator.

';'
required_columns Optional[list[str]]

List of required column names for validation.

None
Source code in src/infrastructure/persistence/csv_database_repository.py
def __init__(
    self,
    filepath: Path,
    encoding: str = "utf-8",
    separator: str = ";",
    required_columns: Optional[list[str]] = None,
):
    """
    Store the repository configuration and start with an empty cache.

    No file access happens here; the constructor only records its
    arguments and emits one log entry describing them.

    Parameters
    ----------
    filepath : Path
        Path to CSV file.
    encoding : str, default='utf-8'
        File encoding.
    separator : str, default=';'
        CSV separator.
    required_columns : Optional[list[str]], default=None
        List of required column names for validation. ``None`` or an empty
        list is stored as ``[]``.
    """
    self.filepath = filepath
    self.encoding = encoding
    self.separator = separator
    # Normalize "nothing required" (None or empty) to an empty list.
    self.required_columns = required_columns if required_columns else []
    # Cache slot for the loaded frame; None until data has been loaded.
    self._data: Optional[pd.DataFrame] = None

    init_context = {
        "filepath": str(filepath),
        "encoding": encoding,
        "separator": separator,
        "required_columns": self.required_columns,
    }
    logger.info(f"Initialized {self.__class__.__name__}", extra=init_context)

Functions

load_data
load_data() -> pd.DataFrame

Load CSV database into DataFrame with caching.

Returns:

Type Description
DataFrame

Database data with optimized dtypes

Raises:

Type Description
FileNotFoundError

If CSV file doesn't exist

ValueError

If CSV format is invalid or required columns missing

Source code in src/infrastructure/persistence/csv_database_repository.py
def load_data(self) -> pd.DataFrame:
    """
    Return the database as a DataFrame, reading the CSV only when needed.

    A previously loaded frame held in ``self._data`` is returned as-is.
    Otherwise the file is read with the configured encoding and separator,
    checked with ``validate_schema``, passed through ``_optimize_dtypes``
    and stored in ``self._data`` before being returned.

    Returns
    -------
    pd.DataFrame
        Database data with optimized dtypes

    Raises
    ------
    FileNotFoundError
        If CSV file doesn't exist
    ValueError
        If CSV format is invalid or required columns missing
    """
    cached = self._data
    if cached is not None:
        # Cache hit: no file access at all.
        logger.debug(
            f"Using cached data for {self.filepath.name}",
            extra={"rows": len(cached)},
        )
        return cached

    logger.info(f"Loading database from {self.filepath}")

    if not self.filepath.exists():
        error_msg = f"Database file not found: {self.filepath}"
        logger.error(error_msg)
        raise FileNotFoundError(error_msg)

    try:
        frame = pd.read_csv(
            self.filepath,
            encoding=self.encoding,
            sep=self.separator,
        )

        # Reject the file before it can be cached if the schema check fails.
        schema_ok = self.validate_schema(frame)
        if not schema_ok:
            raise ValueError(
                f"Invalid database schema. Required columns: {self.required_columns}"
            )

        frame = self._optimize_dtypes(frame)

        # Only a validated, optimized frame is ever stored in the cache.
        self._data = frame

        logger.info(
            f"Successfully loaded database: {self.filepath.name}",
            extra={
                "rows": len(frame),
                "columns": len(frame.columns),
                "memory_mb": round(
                    frame.memory_usage(deep=True).sum() / 1024**2, 2
                ),
            },
        )

        return frame

    except Exception as e:
        # Log every failure from the read/validate/optimize steps, then re-raise unchanged.
        logger.error(f"Failed to load database: {e}")
        raise
reload_data
reload_data() -> pd.DataFrame

Force reload database from file.

Clears cache and reloads data from CSV file.

Returns:

Type Description
DataFrame

Freshly loaded database data

Source code in src/infrastructure/persistence/csv_database_repository.py
def reload_data(self) -> pd.DataFrame:
    """
    Discard the cached frame and load the database again.

    Resets ``self._data`` to ``None`` so that the following ``load_data``
    call cannot be served from the cache.

    Returns
    -------
    pd.DataFrame
        Whatever ``load_data`` returns after the cache was cleared
    """
    db_name = self.filepath.name
    logger.info(f"Forcing reload of {db_name}")

    # Clearing the cache is what forces load_data down its read path.
    self._data = None
    fresh = self.load_data()
    return fresh
merge_with_dataset
merge_with_dataset(dataset_df: DataFrame, on: str = 'ko', how: str = 'inner') -> pd.DataFrame

Merge dataset with database.

Parameters:

Name Type Description Default
dataset_df DataFrame

Input dataset (must have join column)

required
on str

Column name to join on

'ko'
how str

Join type ('inner', 'left', 'right', 'outer')

'inner'

Returns:

Type Description
DataFrame

Merged DataFrame

Raises:

Type Description
ValueError

If join column missing in either DataFrame

Source code in src/infrastructure/persistence/csv_database_repository.py
def merge_with_dataset(
    self, dataset_df: pd.DataFrame, on: str = "ko", how: str = "inner"
) -> pd.DataFrame:
    """
    Merge dataset with database.

    The database frame is obtained through ``load_data()`` and joined to
    ``dataset_df`` with ``pd.merge`` on a single shared column.

    Parameters
    ----------
    dataset_df : pd.DataFrame
        Input dataset (must have join column)
    on : str, default='ko'
        Column name to join on; must exist in both frames
    how : str, default='inner'
        Join type ('inner', 'left', 'right', 'outer')

    Returns
    -------
    pd.DataFrame
        Merged DataFrame

    Raises
    ------
    ValueError
        If join column missing in either DataFrame

    Notes
    -----
    The logged ``match_rate_percent`` is the share of input rows whose key
    occurs in the database. It is deliberately not derived from the output
    row count: that count grows with duplicated database keys and is never
    below the input size for 'left' joins, so it cannot express a rate.
    """
    input_rows = len(dataset_df)
    logger.info(
        f"Merging dataset with {self.__class__.__name__}",
        extra={"on": on, "how": how, "input_rows": input_rows},
    )

    # Load database if not already loaded
    db_df = self.load_data()

    # Validate join column exists (dataset first, then database)
    for frame, label in ((dataset_df, "dataset"), (db_df, "database")):
        if on not in frame.columns:
            error_msg = f"Column '{on}' not found in {label}"
            logger.error(error_msg)
            raise ValueError(error_msg)

    # Perform merge
    merged = pd.merge(dataset_df, db_df, on=on, how=how)

    # Match rate = percentage of input rows that found a key in the database.
    if input_rows > 0:
        match_rate = float(dataset_df[on].isin(db_df[on]).mean() * 100)
    else:
        match_rate = 0.0

    logger.info(
        "Merge completed",
        extra={
            "input_rows": input_rows,
            "database_rows": len(db_df),
            "output_rows": len(merged),
            "match_rate_percent": round(match_rate, 2),
        },
    )

    return merged
get_column_names
get_column_names() -> list[str]

Get column names from database.

Returns:

Type Description
list[str]

List of column names

Source code in src/infrastructure/persistence/csv_database_repository.py
def get_column_names(self) -> list[str]:
    """
    List the column labels of the database.

    The frame is obtained through ``load_data()``.

    Returns
    -------
    list[str]
        Column names, in the order they appear in the loaded frame
    """
    return self.load_data().columns.tolist()
validate_schema
validate_schema(df: Optional[DataFrame] = None) -> bool

Validate database schema.

Checks if all required columns are present in DataFrame.

Parameters:

Name Type Description Default
df Optional[DataFrame]

DataFrame to validate (if None, loads from file)

None

Returns:

Type Description
bool

True if all required columns present, False otherwise

Source code in src/infrastructure/persistence/csv_database_repository.py
def validate_schema(self, df: Optional[pd.DataFrame] = None) -> bool:
    """
    Validate database schema.

    Checks if all required columns are present in DataFrame. A failed check
    is reported through a warning log entry that lists the missing columns
    in the order they were declared in ``required_columns``.

    Parameters
    ----------
    df : Optional[pd.DataFrame], default=None
        DataFrame to validate (if None, loads from file via ``load_data()``)

    Returns
    -------
    bool
        True if all required columns present, False otherwise

    Notes
    -----
    With ``df=None`` the frame comes from ``load_data()``, which runs this
    same check while loading and raises ``ValueError`` on failure, so a
    schema problem on that path surfaces as an exception rather than as a
    ``False`` return.
    """
    if df is None:
        df = self.load_data()

    if not self.required_columns:
        # No required columns specified, always valid
        return True

    present = set(df.columns)
    # dict.fromkeys keeps declaration order and drops repeats, so the
    # reported list is stable from run to run (set ordering is not).
    missing = [
        column
        for column in dict.fromkeys(self.required_columns)
        if column not in present
    ]

    if not missing:
        return True

    logger.warning(
        f"Missing required columns in {self.filepath.name}",
        extra={
            "missing_columns": missing,
            "required": self.required_columns,
            "found": df.columns.tolist(),
        },
    )
    return False
get_stats
get_stats() -> dict

Get database statistics.

Returns:

Type Description
dict

Dictionary containing: 'rows' (number of rows), 'columns' (number of columns), 'memory_mb' (memory usage in MB), 'column_names' (list of column names), and 'dtypes' (dictionary of column datatypes).

Source code in src/infrastructure/persistence/csv_database_repository.py
def get_stats(self) -> dict:
    """
    Summarize the size and layout of the database.

    The frame is obtained through ``load_data()``.

    Returns
    -------
    dict
        Dictionary containing:
        - 'rows': Number of rows
        - 'columns': Number of columns
        - 'memory_mb': Memory usage in MB, rounded to two decimals
        - 'column_names': List of column names
        - 'dtypes': Dictionary mapping column name to dtype name
    """
    frame = self.load_data()

    # deep=True makes pandas measure the contents of object columns as well.
    total_bytes = frame.memory_usage(deep=True).sum()

    summary = {}
    summary["rows"] = len(frame)
    summary["columns"] = len(frame.columns)
    summary["memory_mb"] = round(total_bytes / 1024**2, 2)
    summary["column_names"] = frame.columns.tolist()
    summary["dtypes"] = {
        name: str(dtype) for name, dtype in frame.dtypes.items()
    }
    return summary