Module molcrawl.experiment_tracker.database

Experiment management database - SQLite based

Classes

class ExperimentDatabase (db_path: str = 'experiments.db')
Expand source code
class ExperimentDatabase:
    """Experiment management database backed by a single SQLite file.

    Experiments are stored in the ``experiments`` table; their steps and log
    entries are stored in ``experiment_steps`` and ``experiment_logs`` and are
    linked to the parent by ``experiment_id``. Enum attributes are stored by
    their ``.value``, datetimes as ISO-8601 text and dict/list attributes as
    JSON text.
    """

    def __init__(self, db_path: str = "experiments.db"):
        """Open the database at ``db_path``, creating the schema if needed.

        Args:
            db_path: Database file path. Missing parent directories are
                created before the schema is initialized.
        """
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._initialize_database()

    @contextmanager
    def get_connection(self):
        """Context manager for database connections.

        A new connection is opened on every call. It is committed when the
        ``with`` body finishes normally, rolled back when the body raises an
        ``Exception`` (which is then re-raised unchanged), and always closed.

        Yields:
            A ``sqlite3.Connection`` whose rows are ``sqlite3.Row`` objects,
            so columns can be read by name.
        """
        conn = sqlite3.connect(str(self.db_path))
        conn.row_factory = sqlite3.Row
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            # Bare ``raise`` keeps the original traceback intact.
            raise
        finally:
            conn.close()

    def _initialize_database(self):
        """Create the tables and indexes if they do not exist yet."""
        with self.get_connection() as conn:
            cursor = conn.cursor()

            # Experiment table
            cursor.execute(
                """
                CREATE TABLE IF NOT EXISTS experiments (
                    experiment_id TEXT PRIMARY KEY,
                    experiment_name TEXT NOT NULL,
                    experiment_type TEXT NOT NULL,
                    model_type TEXT NOT NULL,
                    dataset_type TEXT NOT NULL,
                    status TEXT NOT NULL,
                    created_at TEXT NOT NULL,
                    started_at TEXT,
                    completed_at TEXT,
                    total_duration_seconds REAL,
                    config_path TEXT,
                    config TEXT,
                    results_dir TEXT,
                    results TEXT,
                    metrics TEXT,
                    tags TEXT,
                    notes TEXT,
                    environment TEXT
                )
            """
            )

            # Step table
            cursor.execute(
                """
                CREATE TABLE IF NOT EXISTS experiment_steps (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    experiment_id TEXT NOT NULL,
                    step_id TEXT NOT NULL,
                    step_name TEXT NOT NULL,
                    status TEXT NOT NULL,
                    start_time TEXT,
                    end_time TEXT,
                    duration_seconds REAL,
                    command TEXT,
                    output_path TEXT,
                    error_message TEXT,
                    metadata TEXT,
                    FOREIGN KEY (experiment_id) REFERENCES experiments(experiment_id)
                )
            """
            )

            # Log table
            cursor.execute(
                """
                CREATE TABLE IF NOT EXISTS experiment_logs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    experiment_id TEXT NOT NULL,
                    timestamp TEXT NOT NULL,
                    level TEXT NOT NULL,
                    message TEXT NOT NULL,
                    source TEXT,
                    FOREIGN KEY (experiment_id) REFERENCES experiments(experiment_id)
                )
            """
            )

            # Indexes for the columns list_experiments filters and sorts on
            cursor.execute(
                """
                CREATE INDEX IF NOT EXISTS idx_experiments_status
                ON experiments(status)
            """
            )
            cursor.execute(
                """
                CREATE INDEX IF NOT EXISTS idx_experiments_type
                ON experiments(experiment_type)
            """
            )
            cursor.execute(
                """
                CREATE INDEX IF NOT EXISTS idx_experiments_created
                ON experiments(created_at)
            """
            )

            # Steps and logs are always looked up (and steps deleted) by their
            # parent experiment, so index that column on both child tables.
            cursor.execute(
                "CREATE INDEX IF NOT EXISTS idx_steps_experiment_id ON experiment_steps(experiment_id)"
            )
            cursor.execute(
                "CREATE INDEX IF NOT EXISTS idx_logs_experiment_id ON experiment_logs(experiment_id)"
            )

    def save_experiment(self, experiment: Experiment) -> None:
        """Insert or overwrite an experiment row and replace its step rows.

        The experiment row is written with ``INSERT OR REPLACE``. All stored
        steps of the experiment are deleted and re-inserted from
        ``experiment.steps`` in the same transaction. Log rows are not
        touched here; they are added through ``add_log``.

        Args:
            experiment: Experiment to persist.
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()

            cursor.execute(
                """
                INSERT OR REPLACE INTO experiments (
                    experiment_id, experiment_name, experiment_type,
                    model_type, dataset_type, status,
                    created_at, started_at, completed_at,
                    total_duration_seconds, config_path, config,
                    results_dir, results, metrics,
                    tags, notes, environment
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
                (
                    experiment.experiment_id,
                    experiment.experiment_name,
                    experiment.experiment_type.value,
                    experiment.model_type.value,
                    experiment.dataset_type.value,
                    experiment.status.value,
                    experiment.created_at.isoformat(),
                    experiment.started_at.isoformat() if experiment.started_at else None,
                    experiment.completed_at.isoformat() if experiment.completed_at else None,
                    experiment.total_duration_seconds,
                    experiment.config_path,
                    json.dumps(experiment.config),
                    experiment.results_dir,
                    json.dumps(experiment.results),
                    json.dumps(experiment.metrics),
                    json.dumps(experiment.tags),
                    experiment.notes,
                    json.dumps(experiment.environment),
                ),
            )

            # Steps are replaced wholesale: drop the stored ones, then insert
            # the current list in order so ``ORDER BY id`` reproduces it.
            cursor.execute(
                "DELETE FROM experiment_steps WHERE experiment_id = ?",
                (experiment.experiment_id,),
            )
            step_rows = [
                (
                    experiment.experiment_id,
                    step.step_id,
                    step.step_name,
                    step.status.value,
                    step.start_time.isoformat() if step.start_time else None,
                    step.end_time.isoformat() if step.end_time else None,
                    step.duration_seconds,
                    step.command,
                    step.output_path,
                    step.error_message,
                    json.dumps(step.metadata),
                )
                for step in experiment.steps
            ]
            cursor.executemany(
                """
                    INSERT INTO experiment_steps (
                        experiment_id, step_id, step_name, status,
                        start_time, end_time, duration_seconds,
                        command, output_path, error_message, metadata
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                step_rows,
            )

    def add_log(self, experiment_id: str, log: ExperimentLog) -> None:
        """Append one log entry for an experiment.

        Args:
            experiment_id: Id stored with the log row.
            log: Entry whose ``timestamp`` (stored via ``isoformat()``),
                ``level``, ``message`` and ``source`` are written.
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                """
                INSERT INTO experiment_logs (
                    experiment_id, timestamp, level, message, source
                ) VALUES (?, ?, ?, ?, ?)
            """,
                (
                    experiment_id,
                    log.timestamp.isoformat(),
                    log.level,
                    log.message,
                    log.source,
                ),
            )

    @staticmethod
    def _step_from_row(row) -> ExperimentStep:
        """Rebuild an ``ExperimentStep`` from an ``experiment_steps`` row."""
        step_data = dict(row)
        # Drop the storage-only columns; the rest become keyword arguments.
        del step_data["id"]
        del step_data["experiment_id"]
        step_data["status"] = ExperimentStatus(step_data["status"])
        for column in ("start_time", "end_time"):
            if step_data.get(column):
                step_data[column] = datetime.fromisoformat(step_data[column])
        step_data["metadata"] = json.loads(step_data["metadata"]) if step_data["metadata"] else {}
        return ExperimentStep(**step_data)

    @staticmethod
    def _log_from_row(row) -> ExperimentLog:
        """Rebuild an ``ExperimentLog`` from an ``experiment_logs`` row."""
        log_data = dict(row)
        # Drop the storage-only columns; the rest become keyword arguments.
        del log_data["id"]
        del log_data["experiment_id"]
        log_data["timestamp"] = datetime.fromisoformat(log_data["timestamp"])
        return ExperimentLog(**log_data)

    def get_experiment(self, experiment_id: str) -> Optional[Experiment]:
        """Load one experiment together with its steps and log entries.

        Steps are returned in insertion order. Logs are ordered by timestamp,
        with the row id as tie-breaker so that entries sharing a timestamp
        come back in the order they were added.

        Args:
            experiment_id: Primary key of the experiment to load.

        Returns:
            The reconstructed ``Experiment``, or ``None`` when no experiment
            has that id.
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()

            # Get experiment information
            cursor.execute(
                """
                SELECT * FROM experiments WHERE experiment_id = ?
            """,
                (experiment_id,),
            )
            row = cursor.fetchone()

            if row is None:
                return None

            # Get steps
            cursor.execute(
                """
                SELECT * FROM experiment_steps WHERE experiment_id = ?
                ORDER BY id
            """,
                (experiment_id,),
            )
            steps_rows = cursor.fetchall()

            # Get logs
            cursor.execute(
                """
                SELECT * FROM experiment_logs WHERE experiment_id = ?
                ORDER BY timestamp, id
            """,
                (experiment_id,),
            )
            logs_rows = cursor.fetchall()

            # Build the Experiment object: undo the text encodings applied
            # by save_experiment.
            experiment_data = dict(row)
            experiment_data["experiment_type"] = ExperimentType(experiment_data["experiment_type"])
            experiment_data["model_type"] = ModelType(experiment_data["model_type"])
            experiment_data["dataset_type"] = DatasetType(experiment_data["dataset_type"])
            experiment_data["status"] = ExperimentStatus(experiment_data["status"])
            experiment_data["created_at"] = datetime.fromisoformat(experiment_data["created_at"])

            for column in ("started_at", "completed_at"):
                if experiment_data.get(column):
                    experiment_data[column] = datetime.fromisoformat(experiment_data[column])

            # JSON columns fall back to an empty container when NULL or empty.
            json_columns = (
                ("config", dict),
                ("results", dict),
                ("metrics", dict),
                ("tags", list),
                ("environment", dict),
            )
            for column, empty in json_columns:
                raw = experiment_data[column]
                experiment_data[column] = json.loads(raw) if raw else empty()

            experiment_data["steps"] = [self._step_from_row(step_row) for step_row in steps_rows]
            experiment_data["logs"] = [self._log_from_row(log_row) for log_row in logs_rows]

            return Experiment(**experiment_data)

    def list_experiments(
        self,
        status: Optional[ExperimentStatus] = None,
        experiment_type: Optional[ExperimentType] = None,
        model_type: Optional[ModelType] = None,
        dataset_type: Optional[DatasetType] = None,
        limit: int = 100,
        offset: int = 0,
    ) -> List[Experiment]:
        """Return experiments, newest first, optionally filtered.

        Args:
            status: Only experiments with this status, if given.
            experiment_type: Only experiments of this type, if given.
            model_type: Only experiments with this model type, if given.
            dataset_type: Only experiments with this dataset type, if given.
            limit: Maximum number of experiments to return.
            offset: Number of matching experiments to skip.

        Returns:
            Fully loaded experiments ordered by ``created_at`` descending.
        """
        filters = (
            ("status", status),
            ("experiment_type", experiment_type),
            ("model_type", model_type),
            ("dataset_type", dataset_type),
        )

        query = "SELECT experiment_id FROM experiments WHERE 1=1"
        params: list[object] = []
        for column, member in filters:
            # Compare against None explicitly: an enum member may be falsy
            # (e.g. a str-mixin member with an empty value) and must still
            # be applied as a filter.
            if member is not None:
                # ``column`` comes from the fixed tuple above, never from input.
                query += f" AND {column} = ?"
                params.append(member.value)

        query += " ORDER BY created_at DESC LIMIT ? OFFSET ?"
        params.extend([limit, offset])

        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(query, params)
            experiment_ids = [row["experiment_id"] for row in cursor.fetchall()]

        # get_experiment opens its own connection, so load after the listing
        # connection has been released.
        loaded = (self.get_experiment(experiment_id) for experiment_id in experiment_ids)
        return [exp for exp in loaded if exp is not None]

    def get_statistics(self) -> Dict[str, Any]:
        """Return experiment counts, in total and grouped by four columns.

        Returns:
            A dict with ``total_experiments`` (row count) and ``by_status``,
            ``by_type``, ``by_model`` and ``by_dataset``, each mapping a stored
            column value to the number of experiments that have it.
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()

            stats = {}

            # Total number of experiments
            cursor.execute("SELECT COUNT(*) as count FROM experiments")
            stats["total_experiments"] = cursor.fetchone()["count"]

            # By status
            cursor.execute(
                """
                SELECT status, COUNT(*) as count
                FROM experiments
                GROUP BY status
            """
            )
            stats["by_status"] = {row["status"]: row["count"] for row in cursor.fetchall()}

            # By type
            cursor.execute(
                """
                SELECT experiment_type, COUNT(*) as count
                FROM experiments
                GROUP BY experiment_type
            """
            )
            stats["by_type"] = {row["experiment_type"]: row["count"] for row in cursor.fetchall()}

            # By model
            cursor.execute(
                """
                SELECT model_type, COUNT(*) as count
                FROM experiments
                GROUP BY model_type
            """
            )
            stats["by_model"] = {row["model_type"]: row["count"] for row in cursor.fetchall()}

            # By dataset
            cursor.execute(
                """
                SELECT dataset_type, COUNT(*) as count
                FROM experiments
                GROUP BY dataset_type
            """
            )
            stats["by_dataset"] = {row["dataset_type"]: row["count"] for row in cursor.fetchall()}

            return stats

Experiment management database

Args

db_path
Database file path

Methods

def add_log(self,
experiment_id: str,
log: ExperimentLog) ‑> None
Expand source code
def add_log(self, experiment_id: str, log: ExperimentLog) -> None:
    """Append one log entry for an experiment to ``experiment_logs``.

    Args:
        experiment_id: Id stored with the log row.
        log: Entry whose ``timestamp`` (stored via ``isoformat()``),
            ``level``, ``message`` and ``source`` are written.
    """
    insert_sql = """
            INSERT INTO experiment_logs (
                experiment_id, timestamp, level, message, source
            ) VALUES (?, ?, ?, ?, ?)
        """
    with self.get_connection() as connection:
        # Values are bound in the same order as the column list above.
        values = (
            experiment_id,
            log.timestamp.isoformat(),
            log.level,
            log.message,
            log.source,
        )
        connection.cursor().execute(insert_sql, values)

Add log

def get_connection(self)
Expand source code
@contextmanager
def get_connection(self):
    """Context manager for database connections.

    A new connection to ``self.db_path`` is opened on every call. It is
    committed when the ``with`` body finishes normally, rolled back when the
    body raises an ``Exception`` (which is then re-raised unchanged), and
    always closed.

    Yields:
        A ``sqlite3.Connection`` whose rows are ``sqlite3.Row`` objects, so
        columns can be read by name.
    """
    conn = sqlite3.connect(str(self.db_path))
    conn.row_factory = sqlite3.Row
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        # Bare ``raise`` keeps the original traceback intact.
        raise
    finally:
        conn.close()

Context manager for database connections

def get_experiment(self, experiment_id: str) ‑> Experiment | None
Expand source code
def get_experiment(self, experiment_id: str) -> Optional[Experiment]:
    """Load one experiment together with its steps and log entries.

    Steps are returned in insertion order. Logs are ordered by timestamp,
    with the row id as tie-breaker so that entries sharing a timestamp come
    back in the order they were added.

    Args:
        experiment_id: Primary key of the experiment to load.

    Returns:
        The reconstructed ``Experiment``, or ``None`` when no experiment has
        that id.
    """
    with self.get_connection() as conn:
        cursor = conn.cursor()

        # Get experiment information
        cursor.execute(
            """
            SELECT * FROM experiments WHERE experiment_id = ?
        """,
            (experiment_id,),
        )
        row = cursor.fetchone()

        if row is None:
            return None

        # Get steps
        cursor.execute(
            """
            SELECT * FROM experiment_steps WHERE experiment_id = ?
            ORDER BY id
        """,
            (experiment_id,),
        )
        steps_rows = cursor.fetchall()

        # Get logs; ``id`` breaks ties between identical timestamps, whose
        # relative order SQLite would otherwise leave undefined.
        cursor.execute(
            """
            SELECT * FROM experiment_logs WHERE experiment_id = ?
            ORDER BY timestamp, id
        """,
            (experiment_id,),
        )
        logs_rows = cursor.fetchall()

        # Build the Experiment object: enums are stored by value, datetimes
        # as ISO-8601 text and containers as JSON text.
        experiment_data = dict(row)
        experiment_data["experiment_type"] = ExperimentType(experiment_data["experiment_type"])
        experiment_data["model_type"] = ModelType(experiment_data["model_type"])
        experiment_data["dataset_type"] = DatasetType(experiment_data["dataset_type"])
        experiment_data["status"] = ExperimentStatus(experiment_data["status"])
        experiment_data["created_at"] = datetime.fromisoformat(experiment_data["created_at"])

        for column in ("started_at", "completed_at"):
            if experiment_data.get(column):
                experiment_data[column] = datetime.fromisoformat(experiment_data[column])

        # JSON columns fall back to an empty container when NULL or empty.
        json_columns = (
            ("config", dict),
            ("results", dict),
            ("metrics", dict),
            ("tags", list),
            ("environment", dict),
        )
        for column, empty in json_columns:
            raw = experiment_data[column]
            experiment_data[column] = json.loads(raw) if raw else empty()

        # Build steps
        steps = []
        for step_row in steps_rows:
            step_data = dict(step_row)
            # Drop the storage-only columns; the rest become keyword arguments.
            del step_data["id"]
            del step_data["experiment_id"]
            step_data["status"] = ExperimentStatus(step_data["status"])
            for column in ("start_time", "end_time"):
                if step_data.get(column):
                    step_data[column] = datetime.fromisoformat(step_data[column])
            step_data["metadata"] = json.loads(step_data["metadata"]) if step_data["metadata"] else {}
            steps.append(ExperimentStep(**step_data))

        # Build logs
        logs = []
        for log_row in logs_rows:
            log_data = dict(log_row)
            # Drop the storage-only columns; the rest become keyword arguments.
            del log_data["id"]
            del log_data["experiment_id"]
            log_data["timestamp"] = datetime.fromisoformat(log_data["timestamp"])
            logs.append(ExperimentLog(**log_data))

        experiment_data["steps"] = steps
        experiment_data["logs"] = logs

        return Experiment(**experiment_data)

Get the experiment

def get_statistics(self) ‑> Dict[str, Any]
Expand source code
def get_statistics(self) -> Dict[str, Any]:
    """Return experiment counts, in total and grouped by four columns.

    Returns:
        A dict with ``total_experiments`` (row count) and ``by_status``,
        ``by_type``, ``by_model`` and ``by_dataset``, each mapping a stored
        column value to the number of experiments that have it.
    """
    # (result key, grouped column, query) for every per-column breakdown.
    breakdowns = (
        ("by_status", "status", """
            SELECT status, COUNT(*) as count
            FROM experiments
            GROUP BY status
        """),
        ("by_type", "experiment_type", """
            SELECT experiment_type, COUNT(*) as count
            FROM experiments
            GROUP BY experiment_type
        """),
        ("by_model", "model_type", """
            SELECT model_type, COUNT(*) as count
            FROM experiments
            GROUP BY model_type
        """),
        ("by_dataset", "dataset_type", """
            SELECT dataset_type, COUNT(*) as count
            FROM experiments
            GROUP BY dataset_type
        """),
    )

    with self.get_connection() as connection:
        reader = connection.cursor()

        # Overall row count comes first, then one mapping per breakdown.
        reader.execute("SELECT COUNT(*) as count FROM experiments")
        summary = {"total_experiments": reader.fetchone()["count"]}

        for result_key, column, sql in breakdowns:
            reader.execute(sql)
            counts = {}
            for record in reader.fetchall():
                counts[record[column]] = record["count"]
            summary[result_key] = counts

        return summary

Get statistics

def list_experiments(self,
status: ExperimentStatus | None = None,
experiment_type: ExperimentType | None = None,
model_type: ModelType | None = None,
dataset_type: DatasetType | None = None,
limit: int = 100,
offset: int = 0) ‑> List[Experiment]
Expand source code
def list_experiments(
    self,
    status: Optional[ExperimentStatus] = None,
    experiment_type: Optional[ExperimentType] = None,
    model_type: Optional[ModelType] = None,
    dataset_type: Optional[DatasetType] = None,
    limit: int = 100,
    offset: int = 0,
) -> List[Experiment]:
    """Return experiments, newest first, optionally filtered.

    Args:
        status: Only experiments with this status, if given.
        experiment_type: Only experiments of this type, if given.
        model_type: Only experiments with this model type, if given.
        dataset_type: Only experiments with this dataset type, if given.
        limit: Maximum number of experiments to return.
        offset: Number of matching experiments to skip.

    Returns:
        Experiments loaded through ``get_experiment``, ordered by
        ``created_at`` descending. Ids for which ``get_experiment`` returns
        ``None`` are left out.
    """
    filters = (
        ("status", status),
        ("experiment_type", experiment_type),
        ("model_type", model_type),
        ("dataset_type", dataset_type),
    )

    query = "SELECT experiment_id FROM experiments WHERE 1=1"
    params: list[object] = []
    for column, member in filters:
        # Compare against None explicitly: an enum member may be falsy
        # (e.g. a str-mixin member with an empty value) and must still be
        # applied as a filter.
        if member is not None:
            # ``column`` comes from the fixed tuple above, never from input.
            query += f" AND {column} = ?"
            params.append(member.value)

    query += " ORDER BY created_at DESC LIMIT ? OFFSET ?"
    params.extend([limit, offset])

    with self.get_connection() as conn:
        cursor = conn.cursor()
        cursor.execute(query, params)
        experiment_ids = [row["experiment_id"] for row in cursor.fetchall()]

    # Load after the listing connection has been released, rather than
    # while it is still open.
    loaded = (self.get_experiment(experiment_id) for experiment_id in experiment_ids)
    return [exp for exp in loaded if exp is not None]

Get experiment list

def save_experiment(self,
experiment: Experiment) ‑> None
Expand source code
def save_experiment(self, experiment: Experiment) -> None:
    """Persist an experiment row and rewrite all of its step rows.

    The experiment row is written with ``INSERT OR REPLACE``. Every stored
    step of the experiment is deleted and re-inserted from
    ``experiment.steps`` on the same connection. Enum attributes are stored
    by ``.value``, datetimes via ``isoformat()`` and dict/list attributes as
    JSON text.

    Args:
        experiment: Experiment to persist.
    """

    def _iso(moment):
        # Optional datetimes are stored as NULL when unset.
        return moment.isoformat() if moment else None

    upsert_sql = """
            INSERT OR REPLACE INTO experiments (
                experiment_id, experiment_name, experiment_type,
                model_type, dataset_type, status,
                created_at, started_at, completed_at,
                total_duration_seconds, config_path, config,
                results_dir, results, metrics,
                tags, notes, environment
            ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """

    with self.get_connection() as connection:
        writer = connection.cursor()

        experiment_values = (
            experiment.experiment_id,
            experiment.experiment_name,
            experiment.experiment_type.value,
            experiment.model_type.value,
            experiment.dataset_type.value,
            experiment.status.value,
            experiment.created_at.isoformat(),
            _iso(experiment.started_at),
            _iso(experiment.completed_at),
            experiment.total_duration_seconds,
            experiment.config_path,
            json.dumps(experiment.config),
            experiment.results_dir,
            json.dumps(experiment.results),
            json.dumps(experiment.metrics),
            json.dumps(experiment.tags),
            experiment.notes,
            json.dumps(experiment.environment),
        )
        writer.execute(upsert_sql, experiment_values)

        # Steps are replaced wholesale: clear the stored ones first, then
        # insert the current list in order.
        writer.execute(
            "DELETE FROM experiment_steps WHERE experiment_id = ?",
            (experiment.experiment_id,),
        )
        writer.executemany(
            """
                INSERT INTO experiment_steps (
                    experiment_id, step_id, step_name, status,
                    start_time, end_time, duration_seconds,
                    command, output_path, error_message, metadata
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            [
                (
                    experiment.experiment_id,
                    item.step_id,
                    item.step_name,
                    item.status.value,
                    _iso(item.start_time),
                    _iso(item.end_time),
                    item.duration_seconds,
                    item.command,
                    item.output_path,
                    item.error_message,
                    json.dumps(item.metadata),
                )
                for item in experiment.steps
            ],
        )

Save experiment