Module molcrawl.genome_sequence.dataset.refseq.download_refseq

Functions

def download_refseq(output_dir, path_species, num_worker, species_timeout=1800, max_retries=2)
Expand source code
def download_refseq(
    output_dir, path_species, num_worker, species_timeout=DEFAULT_SPECIES_TIMEOUT, max_retries=DEFAULT_MAX_RETRIES
):
    download_species_refseq(output_dir, path_species, num_worker, species_timeout=species_timeout, max_retries=max_retries)
    extract_refseq(output_dir, num_worker)
def download_species_refseq(output_dir, path_species, num_worker, species_timeout=1800, max_retries=2)
Expand source code
def download_species_refseq(
    output_dir,
    path_species,
    num_worker,
    species_timeout=DEFAULT_SPECIES_TIMEOUT,
    max_retries=DEFAULT_MAX_RETRIES,
):
    """Download RefSeq genome data for all species in the species list.

    Key safety features:
    - **Timeout**: Each species download runs in a separate process with a timeout.
      If ncbi_genome_download hangs, the child process is forcefully killed.
    - **Resume**: Species that already have downloaded files are automatically skipped.
    - **Retry**: Failed/timed-out species are retried up to ``max_retries`` times.
    - **Reporting**: A ``failed_species.json`` is written listing all species that
      could not be downloaded even after retries.
    """
    from rich.progress import Progress

    download_dir = Path(output_dir) / "download_dir"

    group_species_map = get_species(path_species)

    total_species = sum(len(v) for v in group_species_map.values())
    failed_species: list[dict] = []
    skipped_count = 0

    with Progress() as progress_bar:
        task = progress_bar.add_task("Processing ...", total=total_species)

        for group, species in group_species_map.items():
            download_group_dir = download_dir / group
            for sp in species:
                sp_name = sp.strip()
                sp_dir = download_group_dir / to_snake_case(sp_name)

                # ── Resume: skip already-downloaded species ──
                if _is_species_downloaded(sp_dir):
                    logger.info(f"Skipping '{sp_name}' in {group} (already downloaded)")
                    skipped_count += 1
                    progress_bar.update(task, advance=1)
                    continue

                progress_bar.update(task, description=f"Downloading refseq for species {sp_name} in {group}...")
                logger.info(f"Downloading refseq for species {sp_name} in {group}")

                # ── Retry loop ──
                success = False
                last_result = {}
                for attempt in range(1, max_retries + 1):
                    t0 = time.time()
                    result = _download_single_species(
                        genera=sp_name,
                        group=group,
                        sp_dir=str(sp_dir),
                        num_worker=num_worker,
                        timeout=species_timeout,
                    )
                    elapsed = time.time() - t0
                    last_result = result

                    if result["status"] == "ok":
                        logger.info(f"Successfully downloaded '{sp_name}' in {group} (attempt {attempt}, {elapsed:.1f}s)")
                        success = True
                        break
                    else:
                        logger.warning(
                            f"Download of '{sp_name}' in {group} failed "
                            f"(attempt {attempt}/{max_retries}, {elapsed:.1f}s): "
                            f"{result.get('error', 'unknown error')}"
                        )

                if not success:
                    error_info = {
                        "species": sp_name,
                        "group": group,
                        "error": last_result.get("error", "unknown"),
                        "status": last_result.get("status", "unknown"),
                        "timestamp": datetime.now().isoformat(),
                    }
                    failed_species.append(error_info)
                    logger.error(
                        f"Giving up on '{sp_name}' in {group} after {max_retries} attempts. Continuing with next species..."
                    )

                progress_bar.update(task, advance=1)

    # ── Summary report ──
    total_attempted = total_species - skipped_count
    total_failed = len(failed_species)
    total_succeeded = total_attempted - total_failed

    logger.info("=" * 60)
    logger.info("Download Summary")
    logger.info(f"  Total species     : {total_species}")
    logger.info(f"  Skipped (cached)  : {skipped_count}")
    logger.info(f"  Attempted         : {total_attempted}")
    logger.info(f"  Succeeded         : {total_succeeded}")
    logger.info(f"  Failed            : {total_failed}")
    logger.info("=" * 60)

    if failed_species:
        failed_path = Path(output_dir) / "failed_species.json"
        with open(failed_path, "w") as f:
            json.dump(failed_species, f, indent=2, ensure_ascii=False)
        logger.warning(f"Failed species list saved to {failed_path}")
        for fs in failed_species:
            logger.warning(f"  - {fs['species']} ({fs['group']}): {fs['error']}")

Download RefSeq genome data for all species in the species list.

Key safety features: - Timeout: Each species download runs in a separate process with a timeout. If ncbi_genome_download hangs, the child process is forcefully killed. - Resume: Species that already have downloaded files are automatically skipped. - Retry: Failed/timed-out species are retried up to max_retries times. - Reporting: A failed_species.json is written listing all species that could not be downloaded even after retries.

def extract_file(archive_path: str, try_count: int = 0, max_try: int = 3)
Expand source code
def extract_file(
    archive_path: str,
    try_count: int = 0,
    max_try: int = 3,
):
    # pass .sdf.gz to .sdf
    sdf_file_path = Path(archive_path.replace("download_dir", "extracted_files")).with_suffix("")
    if sdf_file_path.exists():
        logger.info(f"Skipping extraction of {sdf_file_path}, already exist")
        return
    sdf_file_path.parent.mkdir(parents=True, exist_ok=True)
    logger.info(f"Extracting {archive_path} to {sdf_file_path}")
    if os.path.exists(archive_path):
        try:
            # Decompress the .gz file and save the result as .sdf
            with gzip.open(archive_path, "rb") as f_in:
                with open(sdf_file_path, "wb") as f_out:
                    shutil.copyfileobj(f_in, f_out)
        except Exception as e:
            os.remove(sdf_file_path)
            msg = str(e) + "\n" + "".join(traceback.format_exception(None, e, e.__traceback__))
            logging.error(f"[Try: {try_count + 1}]  File {archive_path} created an error : \n{msg}")
            if try_count < max_try:
                return extract_file(archive_path, try_count + 1)
    else:
        logger.error(f"File {archive_path} does not exist skipping")
def extract_refseq(output_dir, num_worker)
Expand source code
def extract_refseq(output_dir, num_worker):
    from rich.progress import track

    download_dir = Path(output_dir) / "download_dir"

    archive_paths = [str(p) for p in download_dir.rglob("*genomic.fna.gz")]

    with concurrent.futures.ThreadPoolExecutor(max_workers=num_worker) as executor:
        list(
            track(
                executor.map(extract_file, archive_paths),
                total=len(archive_paths),
                description="Extracting...",
            )
        )
def get_species(path_species)
Expand source code
def get_species(path_species):
    import ncbi_genome_download as ngd

    group_species_map = {}
    for group in ngd.SUPPORTED_TAXONOMIC_GROUPS:
        group_path = Path(path_species) / f"{group}.txt"

        if group_path.exists():
            with open(Path(path_species) / f"{group}.txt", "r") as file:
                species = file.readlines()
                group_species_map[group] = [sp.strip() for sp in species if sp.strip() != ""]
    return group_species_map
def to_snake_case(string)
Expand source code
def to_snake_case(string):
    return re.sub(r"(?<=[a-z])(?=[A-Z])|[^a-zA-Z]", "_", string).strip("_").lower()