Module molcrawl.evaluation.gpt2.omim_real_data_processor
OMIM Real Data Processor
A module that processes actual OMIM data files to create datasets for genome sequence evaluation
Functions
def process_omim_real_data(config_path: str,
output_dir: str,
existing_omim_dir: str | None = None,
force_download: bool = False) -> str
Expand source code
def process_omim_real_data(
    config_path: str,
    output_dir: str,
    existing_omim_dir: Optional[str] = None,
    force_download: bool = False,
) -> str:
    """Run the OMIM real-data pipeline: obtain files, build the dataset, save CSV.

    Args:
        config_path: path to the YAML configuration file.
        output_dir: directory that receives the CSV output and a ``logs/`` subdir.
        existing_omim_dir: directory that already holds the OMIM files; when it
            exists, downloading is skipped and files are read from there.
        force_download: re-download files even if cached copies exist.

    Returns:
        Path of the written ``omim_real_evaluation_dataset.csv``.

    Raises:
        Exception: any failure from downloading/parsing is logged and re-raised.
    """
    # Logging goes to a timestamped file under <output_dir>/logs and to stderr.
    log_dir = os.path.join(output_dir, "logs")
    os.makedirs(log_dir, exist_ok=True)
    log_file = os.path.join(
        log_dir, f"omim_real_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
    )
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
        handlers=[logging.FileHandler(log_file), logging.StreamHandler()],
    )
    logger = logging.getLogger(__name__)

    logger.info("Starting OMIM real data processing")
    if existing_omim_dir:
        logger.info(f"Using existing OMIM directory: {existing_omim_dir}")

    try:
        processor = OMIMRealDataProcessor(config_path, logger)

        if existing_omim_dir and os.path.isdir(existing_omim_dir):
            logger.info("Using existing OMIM files, skipping download")
            # Expected file name for each dataset key inside the existing dir.
            expected_names = {
                "mim2gene": "mim2gene.txt",
                "mim_titles": "mimTitles.txt",
                "genemap2": "genemap2.txt",
                "morbidmap": "morbidmap.txt",
            }
            downloaded_files = {
                key: os.path.join(existing_omim_dir, name)
                for key, name in expected_names.items()
            }
            # Missing files only produce a warning here; the parsers downstream
            # decide whether the absence is fatal.
            for path in downloaded_files.values():
                if not os.path.exists(path):
                    logger.warning(f"File not found: {path}")
        else:
            downloaded_files = processor.download_omim_files(force_download)

        dataset = processor.create_evaluation_dataset(downloaded_files)

        # output_dir is assumed to already point at the data destination.
        os.makedirs(output_dir, exist_ok=True)
        output_file = os.path.join(output_dir, "omim_real_evaluation_dataset.csv")
        dataset.to_csv(output_file, index=False)

        logger.info("OMIM real data processing completed")
        logger.info(f"Output file: {output_file}")
        return output_file
    except Exception as e:
        logger.error(f"OMIM real data processing failed: {e}")
        raise
Args
config_path- configuration file path
output_dir- Output directory
existing_omim_dir- Existing OMIM data directory (skip download if specified)
force_download- force download flag
Returns
output file path
Classes
class OMIMRealDataProcessor(config_path: str, logger: logging.Logger | None = None)
Expand source code
class OMIMRealDataProcessor:
    """Download and parse OMIM flat files into a merged evaluation dataset.

    Handles mim2gene.txt, mimTitles.txt, genemap2.txt and morbidmap.txt,
    merges them on ``mim_number`` and attaches a deterministic dummy DNA
    sequence per gene. All directories, URLs and processing options come
    from a YAML configuration file.
    """

    def __init__(self, config_path: str, logger: Optional[logging.Logger] = None):
        """
        Args:
            config_path: path to the YAML configuration file.
            logger: optional logger; falls back to the module logger.
        """
        self.logger = logger or logging.getLogger(__name__)

        # Load the configuration file.
        with open(config_path, "r") as f:
            self.config = yaml.safe_load(f)

        self.data_dir = self.config["data_directories"]["base_dir"]
        self.cache_dir = self.config["data_directories"]["cache_dir"]
        self.processed_dir = self.config["data_directories"]["processed_dir"]

        # Ensure all working directories exist.
        for dir_path in [self.data_dir, self.cache_dir, self.processed_dir]:
            os.makedirs(dir_path, exist_ok=True)

        self.logger.info("OMIM Real Data Processor initialized")
        self.logger.info(f"Data directory: {self.data_dir}")

    def download_omim_files(self, force_download: bool = False) -> Dict[str, str]:
        """Download every configured OMIM source file into the data directory.

        Args:
            force_download: re-fetch files even when a local copy exists.

        Returns:
            Mapping of source key -> local file path.

        Raises:
            Exception: download errors are logged and re-raised.
        """
        downloaded_files = {}
        for file_key, file_info in self.config["omim_data_sources"].items():
            local_path = os.path.join(self.data_dir, os.path.basename(file_info["local_path"]))

            # Reuse a cached copy unless a re-download was requested.
            if os.path.exists(local_path) and not force_download:
                self.logger.info(f"File already exists: {local_path}")
                downloaded_files[file_key] = local_path
                continue

            self.logger.info(f"Downloading {file_info['description']}")
            self.logger.info(f"URL: {file_info['url']}")
            try:
                response = requests.get(file_info["url"], timeout=30)
                response.raise_for_status()
                with open(local_path, "w", encoding="utf-8") as f:
                    f.write(response.text)
                self.logger.info(f"Downloaded: {local_path}")
                downloaded_files[file_key] = local_path
            except Exception as e:
                self.logger.error(f"Failed to download {file_key}: {e}")
                raise
        return downloaded_files

    def parse_mim2gene(self, file_path: str) -> pd.DataFrame:
        """Parse mim2gene.txt; comment/blank lines and short rows are skipped."""
        self.logger.info("Parsing mim2gene.txt")
        data = []
        with open(file_path, "r") as f:
            for line in f:
                line = line.strip()
                if line.startswith("#") or not line:
                    continue
                parts = line.split("\t")
                if len(parts) >= 5:
                    # Empty fields are normalized to None.
                    data.append(
                        {
                            "mim_number": parts[0],
                            "mim_entry_type": parts[1],
                            "entrez_gene_id": parts[2] if parts[2] != "" else None,
                            "approved_gene_symbol": parts[3] if parts[3] != "" else None,
                            "ensembl_gene_id": parts[4] if parts[4] != "" else None,
                        }
                    )
        df = pd.DataFrame(data)
        self.logger.info(f"Parsed {len(df)} mim2gene entries")
        return df

    def parse_mim_titles(self, file_path: str) -> pd.DataFrame:
        """Parse mimTitles.txt (Prefix / MIM number / Title columns)."""
        self.logger.info("Parsing mimTitles.txt")
        data = []
        with open(file_path, "r") as f:
            for line in f:
                line = line.strip()
                if line.startswith("#") or not line:
                    continue
                parts = line.split("\t")
                if len(parts) >= 3:
                    prefix = parts[0]
                    mim_number = parts[1]
                    title = parts[2]
                    # Estimate the inheritance pattern from the title text.
                    inheritance_pattern = self._extract_inheritance_pattern(title)
                    data.append(
                        {
                            "prefix": prefix,
                            "mim_number": mim_number,
                            "title": title,
                            "inheritance_pattern": inheritance_pattern,
                            "is_phenotype": prefix in ["#", "%", "^", "*"],
                        }
                    )
        df = pd.DataFrame(data)
        self.logger.info(f"Parsed {len(df)} mim titles entries")
        return df

    def parse_genemap2(self, file_path: str) -> pd.DataFrame:
        """Parse genemap2.txt into a DataFrame with its 14 known columns."""
        self.logger.info("Parsing genemap2.txt")
        column_names = [
            "chromosome",
            "genomic_position_start",
            "genomic_position_end",
            "cyto_location",
            "computed_cyto_location",
            "mim_number",
            "gene_symbols",
            "gene_name",
            "approved_symbol",
            "entrez_gene_id",
            "ensembl_gene_id",
            "comments",
            "phenotypes",
            "mouse_gene_symbol_id",
        ]
        data = []
        with open(file_path, "r") as f:
            for line in f:
                line = line.strip()
                if line.startswith("#") or not line:
                    continue
                parts = line.split("\t")
                if len(parts) >= len(column_names):
                    row_data = {}
                    for i, col in enumerate(column_names):
                        # Empty fields become None.
                        row_data[col] = parts[i] if i < len(parts) and parts[i] != "" else None
                    data.append(row_data)
        df = pd.DataFrame(data)
        self.logger.info(f"Parsed {len(df)} genemap2 entries")
        return df

    def parse_morbidmap(self, file_path: str) -> pd.DataFrame:
        """Parse morbidmap.txt and annotate each row with a pathogenicity guess."""
        self.logger.info("Parsing morbidmap.txt")
        data = []
        with open(file_path, "r") as f:
            for line in f:
                line = line.strip()
                if line.startswith("#") or not line:
                    continue
                parts = line.split("\t")
                if len(parts) >= 4:
                    disorder = parts[0]
                    gene_symbols = parts[1]
                    mim_number = parts[2]
                    cyto_location = parts[3]
                    # Heuristic pathogenicity estimate from the disorder name.
                    pathogenicity = self._estimate_pathogenicity(disorder)
                    data.append(
                        {
                            "disorder": disorder,
                            "gene_symbols": gene_symbols,
                            "mim_number": mim_number,
                            "cyto_location": cyto_location,
                            "pathogenicity": pathogenicity,
                            "is_disease_causing": 1
                            if pathogenicity in ["pathogenic", "likely_pathogenic"]
                            else 0,
                        }
                    )
        df = pd.DataFrame(data)
        self.logger.info(f"Parsed {len(df)} morbidmap entries")
        return df

    def _extract_inheritance_pattern(self, title: str) -> str:
        """Guess an inheritance pattern from an OMIM title.

        BUG FIX: the abbreviations "ad", "ar", "xlr" and "xld" used to be
        matched as plain substrings, so e.g. "cardiomyopathy" matched "ar".
        They are now matched as whole words only; full phrases are still
        matched as substrings.
        """
        import re

        title_lower = title.lower()

        def has_word(abbrev: str) -> bool:
            # Whole-word match so short abbreviations cannot fire inside words.
            return re.search(r"\b" + abbrev + r"\b", title_lower) is not None

        if "autosomal dominant" in title_lower or has_word("ad"):
            return "autosomal_dominant"
        elif "autosomal recessive" in title_lower or has_word("ar"):
            return "autosomal_recessive"
        elif "x-linked" in title_lower or has_word("xlr") or has_word("xld"):
            return "x_linked"
        elif "mitochondrial" in title_lower or "maternal" in title_lower:
            return "mitochondrial"
        elif "complex" in title_lower or "multifactorial" in title_lower:
            return "complex"
        else:
            return "unknown"

    def _estimate_pathogenicity(self, disorder: str) -> str:
        """Estimate pathogenicity from a disorder name (severe keywords win)."""
        disorder_lower = disorder.lower()
        # Keywords suggesting a serious disease.
        severe_keywords = [
            "cancer",
            "carcinoma",
            "tumor",
            "syndrome",
            "disease",
            "deficiency",
            "dystrophy",
            "atrophy",
            "degeneration",
        ]
        # Keywords suggesting a milder association.
        mild_keywords = ["susceptibility", "predisposition", "variant", "polymorphism"]
        if any(keyword in disorder_lower for keyword in severe_keywords):
            return "pathogenic"
        elif any(keyword in disorder_lower for keyword in mild_keywords):
            return "likely_pathogenic"
        else:
            return "uncertain_significance"

    def generate_sequences_for_genes(self, gene_symbols: List[str], sequence_length: int = 100) -> Dict[str, str]:
        """Generate a deterministic dummy DNA sequence per gene symbol.

        BUG FIX: the seed was previously derived from ``hash(gene)``, which is
        salted per interpreter process (PYTHONHASHSEED), so sequences were NOT
        reproducible across runs as intended. ``zlib.crc32`` is stable across
        processes. A private RandomState is used so the global numpy RNG state
        is left untouched.
        """
        import zlib

        nucleotides = ["A", "T", "G", "C"]
        sequences = {}
        for gene in gene_symbols:
            seed = zlib.crc32(gene.encode("utf-8"))  # stable across processes
            rng = np.random.RandomState(seed)
            sequences[gene] = "".join(rng.choice(nucleotides, sequence_length))
        return sequences

    def create_evaluation_dataset(self, downloaded_files: Dict[str, str]) -> pd.DataFrame:
        """Parse the four OMIM files, merge them and attach dummy sequences.

        Args:
            downloaded_files: mapping with keys "mim2gene", "mim_titles",
                "genemap2" and "morbidmap" pointing at local file paths.

        Returns:
            Merged DataFrame, one row per morbidmap entry that has a gene
            symbol and a generated sequence.
        """
        self.logger.info("Creating evaluation dataset from OMIM real data")

        # Parse each file into its own DataFrame.
        mim2gene_df = self.parse_mim2gene(downloaded_files["mim2gene"])
        mim_titles_df = self.parse_mim_titles(downloaded_files["mim_titles"])
        genemap2_df = self.parse_genemap2(downloaded_files["genemap2"])
        morbidmap_df = self.parse_morbidmap(downloaded_files["morbidmap"])

        self.logger.info("Merging OMIM datasets")

        # Left-join the other tables onto morbidmap so every morbidmap row
        # survives even without a match.
        merged_df = morbidmap_df.copy()
        merged_df = merged_df.merge(
            mim2gene_df[["mim_number", "approved_gene_symbol", "entrez_gene_id"]],
            on="mim_number",
            how="left",
            suffixes=("", "_mim2gene"),
        )
        merged_df = merged_df.merge(
            mim_titles_df[["mim_number", "inheritance_pattern", "is_phenotype"]],
            on="mim_number",
            how="left",
        )
        merged_df = merged_df.merge(
            genemap2_df[["mim_number", "chromosome", "gene_name", "phenotypes"]],
            on="mim_number",
            how="left",
            suffixes=("", "_genemap2"),
        )

        # Drop rows with missing or empty gene symbols.
        merged_df = merged_df.dropna(subset=["gene_symbols"])
        merged_df = merged_df[merged_df["gene_symbols"] != ""]

        # Cap the dataset size per configuration (fixed seed => reproducible).
        max_sequences = self.config["processing_options"]["max_sequences"]
        if len(merged_df) > max_sequences:
            merged_df = merged_df.sample(n=max_sequences, random_state=42)

        # Collect the distinct genes from the comma-separated symbol lists.
        self.logger.info("Generating sequences for genes")
        unique_genes = []
        for gene_symbols in merged_df["gene_symbols"].unique():
            if pd.notna(gene_symbols):
                genes = [g.strip() for g in gene_symbols.split(",")]
                unique_genes.extend(genes)
        unique_genes = list(set(unique_genes))

        sequence_length = self.config["processing_options"]["sequence_length"]
        sequences = self.generate_sequences_for_genes(unique_genes, sequence_length)

        # Attach each row's sequence, keyed by the row's FIRST gene symbol.
        def get_sequence_for_row(row):
            gene_symbols = row["gene_symbols"]
            if pd.notna(gene_symbols):
                first_gene = gene_symbols.split(",")[0].strip()
                return sequences.get(first_gene, "")
            return ""

        merged_df["sequence"] = merged_df.apply(get_sequence_for_row, axis=1)

        # Drop rows for which no sequence could be generated.
        merged_df = merged_df[merged_df["sequence"] != ""]

        self.logger.info(f"Created evaluation dataset with {len(merged_df)} entries")
        self.logger.info(f"Disease-causing variants: {merged_df['is_disease_causing'].sum()}")
        self.logger.info(f"Benign variants: {(merged_df['is_disease_causing'] == 0).sum()}")

        return merged_df
Methods
def create_evaluation_dataset(self, downloaded_files: Dict[str, str]) -> pandas.core.frame.DataFrame
Expand source code
def create_evaluation_dataset(self, downloaded_files: Dict[str, str]) -> pd.DataFrame:
    """Create the evaluation dataset from the four OMIM files.

    Parses mim2gene, mimTitles, genemap2 and morbidmap, left-joins the other
    tables onto morbidmap on ``mim_number``, filters out rows without gene
    symbols, optionally subsamples to ``processing_options.max_sequences``,
    and attaches a generated dummy sequence keyed by each row's first gene.

    Args:
        downloaded_files: mapping with keys "mim2gene", "mim_titles",
            "genemap2" and "morbidmap" pointing at local file paths.

    Returns:
        Merged DataFrame; rows without a generated sequence are dropped.
    """
    self.logger.info("Creating evaluation dataset from OMIM real data")
    # Parse each file into its own DataFrame.
    mim2gene_df = self.parse_mim2gene(downloaded_files["mim2gene"])
    mim_titles_df = self.parse_mim_titles(downloaded_files["mim_titles"])
    genemap2_df = self.parse_genemap2(downloaded_files["genemap2"])
    morbidmap_df = self.parse_morbidmap(downloaded_files["morbidmap"])

    self.logger.info("Merging OMIM datasets")

    # Left joins keep every morbidmap row even when the other tables have
    # no matching mim_number.
    merged_df = morbidmap_df.copy()
    merged_df = merged_df.merge(
        mim2gene_df[["mim_number", "approved_gene_symbol", "entrez_gene_id"]],
        on="mim_number",
        how="left",
        suffixes=("", "_mim2gene"),
    )
    merged_df = merged_df.merge(
        mim_titles_df[["mim_number", "inheritance_pattern", "is_phenotype"]],
        on="mim_number",
        how="left",
    )
    # Combine with genemap2.
    merged_df = merged_df.merge(
        genemap2_df[["mim_number", "chromosome", "gene_name", "phenotypes"]],
        on="mim_number",
        how="left",
        suffixes=("", "_genemap2"),
    )

    # Data cleaning: drop rows with missing or empty gene symbols.
    merged_df = merged_df.dropna(subset=["gene_symbols"])
    merged_df = merged_df[merged_df["gene_symbols"] != ""]

    # Cap the dataset size per configuration (fixed seed => reproducible sample).
    max_sequences = self.config["processing_options"]["max_sequences"]
    if len(merged_df) > max_sequences:
        merged_df = merged_df.sample(n=max_sequences, random_state=42)

    # Collect the distinct genes from the comma-separated symbol lists.
    self.logger.info("Generating sequences for genes")
    unique_genes = []
    for gene_symbols in merged_df["gene_symbols"].unique():
        if pd.notna(gene_symbols):
            genes = [g.strip() for g in gene_symbols.split(",")]
            unique_genes.extend(genes)
    unique_genes = list(set(unique_genes))

    sequence_length = self.config["processing_options"]["sequence_length"]
    sequences = self.generate_sequences_for_genes(unique_genes, sequence_length)

    # Attach each row's sequence, keyed by the row's FIRST gene symbol only.
    def get_sequence_for_row(row):
        gene_symbols = row["gene_symbols"]
        if pd.notna(gene_symbols):
            first_gene = gene_symbols.split(",")[0].strip()
            return sequences.get(first_gene, "")
        return ""

    merged_df["sequence"] = merged_df.apply(get_sequence_for_row, axis=1)

    # Drop rows for which no sequence could be generated.
    merged_df = merged_df[merged_df["sequence"] != ""]

    self.logger.info(f"Created evaluation dataset with {len(merged_df)} entries")
    self.logger.info(f"Disease-causing variants: {merged_df['is_disease_causing'].sum()}")
    self.logger.info(f"Benign variants: {(merged_df['is_disease_causing'] == 0).sum()}")

    return merged_df
def download_omim_files(self, force_download: bool = False) -> Dict[str, str]
Expand source code
def download_omim_files(self, force_download: bool = False) -> Dict[str, str]:
    """Fetch every configured OMIM source file into the data directory.

    Existing local copies are reused unless ``force_download`` is set.

    Args:
        force_download: re-fetch files even when a local copy exists.

    Returns:
        Mapping of source key -> local file path.

    Raises:
        Exception: any download failure is logged and re-raised.
    """
    fetched: Dict[str, str] = {}
    for key, source in self.config["omim_data_sources"].items():
        target = os.path.join(self.data_dir, os.path.basename(source["local_path"]))

        # Reuse a previously downloaded copy when allowed.
        if not force_download and os.path.exists(target):
            self.logger.info(f"File already exists: {target}")
            fetched[key] = target
            continue

        self.logger.info(f"Downloading {source['description']}")
        self.logger.info(f"URL: {source['url']}")
        try:
            resp = requests.get(source["url"], timeout=30)
            resp.raise_for_status()
            with open(target, "w", encoding="utf-8") as out:
                out.write(resp.text)
            self.logger.info(f"Downloaded: {target}")
            fetched[key] = target
        except Exception as err:
            self.logger.error(f"Failed to download {key}: {err}")
            raise
    return fetched
def generate_sequences_for_genes(self, gene_symbols: List[str], sequence_length: int = 100) -> Dict[str, str]
Expand source code
def generate_sequences_for_genes(self, gene_symbols: List[str], sequence_length: int = 100) -> Dict[str, str]:
    """Generate a deterministic dummy DNA sequence per gene symbol.

    BUG FIX: the seed was previously derived from ``hash(gene)``, which is
    salted per interpreter process (PYTHONHASHSEED), so sequences were NOT
    reproducible across runs as the original comment claimed. ``zlib.crc32``
    is stable across processes. A private RandomState is used so the global
    numpy RNG state is left untouched.

    Args:
        gene_symbols: gene names to generate sequences for.
        sequence_length: length of each generated sequence.

    Returns:
        Mapping of gene symbol -> sequence over the alphabet A/T/G/C.
    """
    import zlib

    nucleotides = ["A", "T", "G", "C"]
    sequences = {}
    for gene in gene_symbols:
        seed = zlib.crc32(gene.encode("utf-8"))  # stable across processes
        rng = np.random.RandomState(seed)
        sequences[gene] = "".join(rng.choice(nucleotides, sequence_length))
    return sequences
def parse_genemap2(self, file_path: str) -> pandas.core.frame.DataFrame
Expand source code
def parse_genemap2(self, file_path: str) -> pd.DataFrame:
    """Parse genemap2.txt into a DataFrame with its 14 known columns.

    Comment lines (leading ``#``), blank lines and rows with fewer fields
    than the expected column count are skipped; empty fields become None.
    """
    self.logger.info("Parsing genemap2.txt")
    column_names = [
        "chromosome",
        "genomic_position_start",
        "genomic_position_end",
        "cyto_location",
        "computed_cyto_location",
        "mim_number",
        "gene_symbols",
        "gene_name",
        "approved_symbol",
        "entrez_gene_id",
        "ensembl_gene_id",
        "comments",
        "phenotypes",
        "mouse_gene_symbol_id",
    ]
    records = []
    with open(file_path, "r") as handle:
        for raw in handle:
            raw = raw.strip()
            if not raw or raw.startswith("#"):
                continue
            fields = raw.split("\t")
            if len(fields) < len(column_names):
                continue
            records.append(
                {
                    name: (fields[idx] if idx < len(fields) and fields[idx] != "" else None)
                    for idx, name in enumerate(column_names)
                }
            )
    df = pd.DataFrame(records)
    self.logger.info(f"Parsed {len(df)} genemap2 entries")
    return df
def parse_mim2gene(self, file_path: str) -> pandas.core.frame.DataFrame
Expand source code
def parse_mim2gene(self, file_path: str) -> pd.DataFrame:
    """Parse mim2gene.txt into a DataFrame.

    Comment lines (leading ``#``), blank lines and rows with fewer than
    five fields are skipped; empty fields become None.
    """
    self.logger.info("Parsing mim2gene.txt")

    def _or_none(value):
        # Normalize empty strings to None.
        return value if value != "" else None

    records = []
    with open(file_path, "r") as handle:
        for raw in handle:
            raw = raw.strip()
            if not raw or raw.startswith("#"):
                continue
            fields = raw.split("\t")
            if len(fields) < 5:
                continue
            records.append(
                {
                    "mim_number": fields[0],
                    "mim_entry_type": fields[1],
                    "entrez_gene_id": _or_none(fields[2]),
                    "approved_gene_symbol": _or_none(fields[3]),
                    "ensembl_gene_id": _or_none(fields[4]),
                }
            )
    df = pd.DataFrame(records)
    self.logger.info(f"Parsed {len(df)} mim2gene entries")
    return df
def parse_mim_titles(self, file_path: str) -> pandas.core.frame.DataFrame
Expand source code
def parse_mim_titles(self, file_path: str) -> pd.DataFrame:
    """Parse mimTitles.txt (Prefix / MIM number / Title columns).

    Comment lines (leading ``#``) and blank lines are skipped; each kept
    row is annotated with an estimated inheritance pattern and a phenotype
    flag derived from its prefix symbol.

    NOTE(review): rows whose prefix column itself is "#" would be removed
    by the comment filter — confirm that is intended for this data format.
    """
    self.logger.info("Parsing mimTitles.txt")
    phenotype_prefixes = ["#", "%", "^", "*"]
    records = []
    with open(file_path, "r") as handle:
        for raw in handle:
            raw = raw.strip()
            if not raw or raw.startswith("#"):
                continue
            fields = raw.split("\t")
            if len(fields) < 3:
                continue
            prefix, mim_number, title = fields[0], fields[1], fields[2]
            records.append(
                {
                    "prefix": prefix,
                    "mim_number": mim_number,
                    "title": title,
                    # Inheritance pattern is estimated from the title text.
                    "inheritance_pattern": self._extract_inheritance_pattern(title),
                    "is_phenotype": prefix in phenotype_prefixes,
                }
            )
    df = pd.DataFrame(records)
    self.logger.info(f"Parsed {len(df)} mim titles entries")
    return df
def parse_morbidmap(self, file_path: str) -> pandas.core.frame.DataFrame
Expand source code
def parse_morbidmap(self, file_path: str) -> pd.DataFrame:
    """Parse morbidmap.txt and annotate each row with a pathogenicity guess.

    Comment lines (leading ``#``), blank lines and rows with fewer than
    four fields are skipped. ``is_disease_causing`` is 1 when the estimated
    pathogenicity is "pathogenic" or "likely_pathogenic", else 0.
    """
    self.logger.info("Parsing morbidmap.txt")
    records = []
    with open(file_path, "r") as handle:
        for raw in handle:
            raw = raw.strip()
            if not raw or raw.startswith("#"):
                continue
            fields = raw.split("\t")
            if len(fields) < 4:
                continue
            disorder, gene_symbols, mim_number, cyto_location = fields[:4]
            # Heuristic pathogenicity estimate from the disorder name.
            pathogenicity = self._estimate_pathogenicity(disorder)
            records.append(
                {
                    "disorder": disorder,
                    "gene_symbols": gene_symbols,
                    "mim_number": mim_number,
                    "cyto_location": cyto_location,
                    "pathogenicity": pathogenicity,
                    "is_disease_causing": int(pathogenicity in ("pathogenic", "likely_pathogenic")),
                }
            )
    df = pd.DataFrame(records)
    self.logger.info(f"Parsed {len(df)} morbidmap entries")
    return df