HADEGRepository(filepath: Path = Path('data/databases/hadeg_db.csv'), encoding: str = 'utf-8', separator: str = ';')
Bases: CSVDatabaseRepository
Repository for HADEG enzyme database.
Provides access to enzyme data for hydrocarbon degradation pathways. Database file: data/databases/hadeg_db.csv
Attributes:
| Name | Type | Description |
filepath | Path | Path to HADEG database CSV file |
encoding | str | File encoding (default: 'utf-8') |
separator | str | CSV separator (default: ';') |
required_columns | list[str] | Required columns: ['ko', 'Gene', 'Pathway'] |
Initialize HADEG repository.
Parameters:
| Name | Type | Description | Default |
filepath | Path | Path to HADEG database CSV file. | Path('data/databases/hadeg_db.csv') |
encoding | str | File encoding. | 'utf-8' |
separator | str | CSV separator. | ';' |
Source code in src/infrastructure/persistence/hadeg_repository.py
def __init__(
    self,
    filepath: Path = Path("data/databases/hadeg_db.csv"),
    encoding: str = "utf-8",
    separator: str = ";",
):
    """
    Set up a repository backed by the HADEG CSV database.

    All three arguments are forwarded unchanged to the base-class
    initializer together with the HADEG-specific list of required columns.

    Parameters
    ----------
    filepath : Path, default=Path('data/databases/hadeg_db.csv')
        Location of the HADEG database CSV file.
    encoding : str, default='utf-8'
        File encoding.
    separator : str, default=';'
        CSV separator.
    """
    # Actual column name in the file is "Pathway", with a capital "P".
    hadeg_required_columns = ["ko", "Gene", "Pathway"]

    super().__init__(
        filepath=filepath,
        encoding=encoding,
        separator=separator,
        required_columns=hadeg_required_columns,
    )
|
Functions
load_data
load_data() -> pd.DataFrame
Load CSV database into DataFrame with caching.
Returns:
| Type | Description |
DataFrame | Database data with optimized dtypes |
Raises:
| Type | Description |
FileNotFoundError | If CSV file doesn't exist |
ValueError | If CSV format is invalid or required columns missing |
Source code in src/infrastructure/persistence/csv_database_repository.py
def load_data(self) -> pd.DataFrame:
    """
    Return the database as a DataFrame, reading the CSV only when needed.

    The first successful call reads the file, validates it, passes it
    through ``self._optimize_dtypes`` and stores the result on
    ``self._data``; later calls return that stored frame directly.

    Returns
    -------
    pd.DataFrame
        The cached frame if one exists, otherwise the freshly loaded one.

    Raises
    ------
    FileNotFoundError
        If ``self.filepath`` does not exist.
    ValueError
        If ``validate_schema`` rejects the loaded frame.
    Exception
        Anything else raised while reading or processing the file is
        logged and re-raised unchanged.
    """
    cached = self._data
    if cached is not None:
        logger.debug(
            f"Using cached data for {self.filepath.name}",
            extra={"rows": len(cached)},
        )
        return cached

    logger.info(f"Loading database from {self.filepath}")

    file_present = self.filepath.exists()
    if not file_present:
        error_msg = f"Database file not found: {self.filepath}"
        logger.error(error_msg)
        raise FileNotFoundError(error_msg)

    try:
        # Read the raw CSV using the configured encoding and delimiter.
        frame = pd.read_csv(
            self.filepath,
            encoding=self.encoding,
            sep=self.separator,
        )

        # Reject files that lack the required columns.
        schema_ok = self.validate_schema(frame)
        if not schema_ok:
            raise ValueError(
                f"Invalid database schema. "
                f"Required columns: {self.required_columns}"
            )

        frame = self._optimize_dtypes(frame)

        # Remember the processed frame so later calls skip the file read.
        self._data = frame

        memory_mb = round(frame.memory_usage(deep=True).sum() / 1024**2, 2)
        logger.info(
            f"Successfully loaded database: {self.filepath.name}",
            extra={
                "rows": len(frame),
                "columns": len(frame.columns),
                "memory_mb": memory_mb,
            },
        )
        return frame
    except Exception as exc:
        # Log every failure (including the schema ValueError) and propagate.
        logger.error(f"Failed to load database: {exc}")
        raise
|
reload_data
reload_data() -> pd.DataFrame
Force reload database from file.
Clears cache and reloads data from CSV file.
Returns:
| Type | Description |
DataFrame | Freshly loaded database data |
Source code in src/infrastructure/persistence/csv_database_repository.py
def reload_data(self) -> pd.DataFrame:
    """
    Discard the cached frame and load the database again.

    Returns
    -------
    pd.DataFrame
        Whatever ``load_data`` returns after the cache has been cleared.
    """
    source_name = self.filepath.name
    logger.info(f"Forcing reload of {source_name}")

    # With the cache emptied, load_data() cannot take its cached-data branch.
    self._data = None
    fresh = self.load_data()
    return fresh
|
merge_with_dataset
merge_with_dataset(dataset_df: DataFrame, on: str = 'ko', how: str = 'inner') -> pd.DataFrame
Merge dataset with database.
Parameters:
| Name | Type | Description | Default |
dataset_df | DataFrame | Input dataset (must have join column) | required |
on | str | Column name to join on | 'ko' |
how | str | Join type ('inner', 'left', 'right', 'outer') | 'inner' |
Returns:
| Type | Description |
DataFrame | Merged DataFrame |
Raises:
| Type | Description |
ValueError | If join column missing in either DataFrame |
Source code in src/infrastructure/persistence/csv_database_repository.py
def merge_with_dataset(
    self, dataset_df: pd.DataFrame, on: str = "ko", how: str = "inner"
) -> pd.DataFrame:
    """
    Merge dataset with database.

    Parameters
    ----------
    dataset_df : pd.DataFrame
        Input dataset (must have join column)
    on : str, default='ko'
        Column name to join on
    how : str, default='inner'
        Join type ('inner', 'left', 'right', 'outer'); passed straight to
        ``pd.merge``.

    Returns
    -------
    pd.DataFrame
        Result of ``pd.merge(dataset_df, db_df, on=on, how=how)``.

    Raises
    ------
    ValueError
        If join column missing in either DataFrame

    Notes
    -----
    The logged ``match_rate_percent`` is the percentage of *input rows*
    whose join key occurs in the database. It is deliberately not derived
    from the merged row count, which exceeds the input row count for
    left/outer joins or when the database holds several rows per key.
    """
    logger.info(
        f"Merging dataset with {self.__class__.__name__}",
        extra={"on": on, "how": how, "input_rows": len(dataset_df)},
    )

    # Load database if not already loaded
    db_df = self.load_data()

    # Validate the join column on both sides, dataset first.
    for frame, label in ((dataset_df, "dataset"), (db_df, "database")):
        if on not in frame.columns:
            error_msg = f"Column '{on}' not found in {label}"
            logger.error(error_msg)
            raise ValueError(error_msg)

    merged = pd.merge(dataset_df, db_df, on=on, how=how)

    # Share of input rows that have a matching key in the database;
    # always within 0-100 regardless of join type or duplicate keys.
    if len(dataset_df) > 0:
        match_rate = float(dataset_df[on].isin(db_df[on]).mean() * 100)
    else:
        match_rate = 0.0

    logger.info(
        "Merge completed",
        extra={
            "input_rows": len(dataset_df),
            "database_rows": len(db_df),
            "output_rows": len(merged),
            "match_rate_percent": round(match_rate, 2),
        },
    )
    return merged
|
get_column_names
get_column_names() -> list[str]
Get column names from database.
Returns:
| Type | Description |
list[str] | List of column names |
Source code in src/infrastructure/persistence/csv_database_repository.py
def get_column_names(self) -> list[str]:
    """
    List the column labels of the loaded database.

    Returns
    -------
    list[str]
        Column labels of the frame returned by ``load_data``, in frame order.
    """
    return self.load_data().columns.tolist()
|
validate_schema
validate_schema(df: Optional[DataFrame] = None) -> bool
Validate database schema.
Checks if all required columns are present in DataFrame.
Parameters:
| Name | Type | Description | Default |
df | Optional[DataFrame] | DataFrame to validate (if None, loads from file) | None |
Returns:
| Type | Description |
bool | True if all required columns present, False otherwise |
Source code in src/infrastructure/persistence/csv_database_repository.py
def validate_schema(self, df: Optional[pd.DataFrame] = None) -> bool:
    """
    Check that a frame contains every column in ``self.required_columns``.

    Parameters
    ----------
    df : Optional[pd.DataFrame], default=None
        Frame to inspect; when None, the frame from ``load_data`` is used.

    Returns
    -------
    bool
        True when nothing is missing (or no columns are required),
        False otherwise. A warning listing the missing columns is logged
        before returning False.
    """
    # The frame is resolved first, so load_data() runs even when there are
    # no required columns to check.
    frame = self.load_data() if df is None else df

    required = self.required_columns
    if not required:
        # No required columns specified, always valid
        return True

    absent = set(required) - set(frame.columns)
    if not absent:
        return True

    logger.warning(
        f"Missing required columns in {self.filepath.name}",
        extra={
            "missing_columns": list(absent),
            "required": required,
            "found": frame.columns.tolist(),
        },
    )
    return False
|
get_stats
get_stats() -> dict
Get database statistics.
Returns:
| Type | Description |
dict | Dictionary containing: - 'rows': Number of rows - 'columns': Number of columns - 'memory_mb': Memory usage in MB - 'column_names': List of column names - 'dtypes': Dictionary of column datatypes |
Source code in src/infrastructure/persistence/csv_database_repository.py
def get_stats(self) -> dict:
    """
    Summarize the loaded database.

    Returns
    -------
    dict
        Dictionary containing:
        - 'rows': Number of rows
        - 'columns': Number of columns
        - 'memory_mb': Deep memory usage in MB, rounded to two decimals
        - 'column_names': List of column names
        - 'dtypes': Mapping of column name to dtype rendered as a string
    """
    frame = self.load_data()

    # deep=True asks pandas to include the contents of object columns.
    total_bytes = frame.memory_usage(deep=True).sum()
    column_names = frame.columns.tolist()
    dtype_names = frame.dtypes.astype(str).to_dict()

    return {
        "rows": len(frame),
        "columns": len(frame.columns),
        "memory_mb": round(total_bytes / 1024**2, 2),
        "column_names": column_names,
        "dtypes": dtype_names,
    }
|