Module molcrawl.genome_sequence.dataset.refseq.download_full_refseq

Functions

def download(url: str, path: str, try_count: int = 0, max_try: int = 3) ‑> str
Expand source code
def download(url: str, path: str, try_count: int = 0, max_try: int = 3) -> str:
    """
    Download a file from the specified url.

    The download is skipped when a non-empty file already exists at ``path``.
    On failure any partially written file is removed, the error is logged and
    the download is retried.

    Parameters:
        url (str): URL to download.
        path (str): path at which to store the downloaded file.
        try_count (int): number of attempts already made; callers normally
            leave this at 0, it is incremented internally on each retry.
        max_try (int): maximum number of retries after the initial attempt.

    Returns:
        str: the path of the downloaded file. The path is returned even when
        every attempt failed, in which case no file exists at that location.
    """
    if os.path.exists(path) and os.path.getsize(path) > 0:
        logger.info("Skipping %s since already downloaded at %s", url, path)
        return path

    logger.info("Downloading %s to %s", url, path)
    try:
        path, _ = urlretrieve(url, path)
    except Exception as e:
        # Drop the partial file so a later run does not mistake it for a
        # completed download.
        if os.path.exists(path):
            os.remove(path)
        msg = str(e) + "\n" + "".join(traceback.format_exception(None, e, e.__traceback__))
        logger.error(f"[Try: {try_count + 1}] Error while downloading {path}: \n{msg}")
        if try_count < max_try:
            # Forward max_try so a caller-supplied limit survives the retries.
            return download(url, path, try_count + 1, max_try)

    return path

Download a file from the specified URL. The download is skipped if a non-empty file already exists at the target path.

Parameters

url (str): URL to download. path (str): path at which to store the downloaded file (required; the code does not fall back to a temporary file when it is omitted).

def download_refseq(output_dir: str | os.PathLike[str], num_worker: int)
Expand source code
def download_refseq(output_dir: Union[str, os.PathLike[str]], num_worker: int):
    """
    Download every genomic FASTA archive of the RefSeq release and extract it.

    Archives are stored in ``<output_dir>/download_dir`` and the decompressed
    files in ``<output_dir>/extracted_dir``; both directories are created if
    they do not exist.

    Parameters:
        output_dir (str | os.PathLike[str]): root directory for the downloaded
            and extracted files.
        num_worker (int): number of worker threads used for extraction. The
            download step uses at most 3 threads regardless of this value.
    """
    base_url = "https://ftp.ncbi.nlm.nih.gov/refseq/release/complete/"
    logger.info(f"Downloading RefSeq from {base_url}")

    output_dir = Path(output_dir)
    download_dir = output_dir / "download_dir"
    extracted_dir = output_dir / "extracted_dir"

    download_dir.mkdir(parents=True, exist_ok=True)
    files = get_list_files(base_url)
    # base_url already ends with "/", so plain concatenation yields a valid
    # URL on every platform; os.path.join would insert the OS path separator.
    urls = [f"{base_url}{file}" for file in files]
    paths = [os.path.join(download_dir, file) for file in files]

    # NOTE(review): the cap of 3 download threads is presumably there to limit
    # the number of simultaneous connections to the NCBI server - confirm.
    with concurrent.futures.ThreadPoolExecutor(max_workers=min(num_worker, 3)) as executor:
        archive_paths = list(
            track(
                executor.map(download, urls, paths),
                total=len(urls),
                description="Downloading...",
            )
        )

    extracted_dir.mkdir(parents=True, exist_ok=True)

    with concurrent.futures.ThreadPoolExecutor(max_workers=num_worker) as executor:
        func = partial(extract_file, output_dir=extracted_dir)
        # list() drains the lazy map iterator so every extraction finishes
        # (and the progress bar advances) before the function returns.
        list(
            track(
                executor.map(func, archive_paths),
                total=len(archive_paths),
                description="Extracting...",
            )
        )
def extract_file(archive_path: str,
output_dir: os.PathLike[str],
try_count: int = 0,
max_try: int = 3)
Expand source code
def extract_file(
    archive_path: str,
    output_dir: os.PathLike[str],
    try_count: int = 0,
    max_try: int = 3,
):
    """
    Decompress a gzip archive into ``output_dir``.

    The output file keeps the archive's name with its final suffix removed
    (``name.fna.gz`` becomes ``name.fna``). Extraction is skipped when the
    output file already exists. A missing archive is logged and skipped. On
    failure the partially written output is removed, the error is logged and
    the extraction is retried.

    Parameters:
        archive_path (str): path of the ``.gz`` archive to decompress.
        output_dir (os.PathLike[str]): directory receiving the extracted file.
        try_count (int): number of attempts already made; callers normally
            leave this at 0, it is incremented internally on each retry.
        max_try (int): maximum number of retries after the initial attempt.

    Returns:
        None
    """
    # with_suffix("") strips only the last suffix, i.e. the ".gz".
    extracted_path = Path(output_dir) / Path(archive_path).with_suffix("").name
    if extracted_path.exists():
        logger.info(f"Skipping extraction of {extracted_path}, already exist")
        return

    logger.info(f"Extracting {archive_path} to {extracted_path}")
    if not os.path.exists(archive_path):
        logger.error(f"File {archive_path} does not exist skipping")
        return

    try:
        # Stream the decompressed bytes to disk without loading the whole
        # archive in memory.
        with gzip.open(archive_path, "rb") as f_in, open(extracted_path, "wb") as f_out:
            shutil.copyfileobj(f_in, f_out)
    except Exception as e:
        # Remove the partial output so a later run does not skip it as
        # "already extracted".
        if os.path.exists(extracted_path):
            os.remove(extracted_path)
        msg = str(e) + "\n" + "".join(traceback.format_exception(None, e, e.__traceback__))
        logger.error(f"[Try: {try_count + 1}]  File {archive_path} created an error : \n{msg}")
        if try_count < max_try:
            # Forward max_try so a caller-supplied limit survives the retries.
            return extract_file(archive_path, output_dir, try_count + 1, max_try)
def get_list_files(url: str) ‑> List[str]
Expand source code
def get_list_files(url: str) -> List[str]:
    """
    List the genomic FASTA archives linked from a directory index page.

    Parameters:
        url (str): URL of the HTML index page to scan.

    Returns:
        List[str]: the href targets ending in ``.genomic.fna.gz``, exactly as
        they appear in the page, in page order.

    Raises:
        requests.HTTPError: if the server answers with an error status.
    """
    # Without a timeout requests can block forever on an unresponsive server.
    response = requests.get(url, timeout=60)
    response.raise_for_status()
    # Capture the complete file name so the returned names match the links on
    # the server; the dots are escaped so they only match literal dots.
    return re.findall(r'href="([^"]+\.genomic\.fna\.gz)"', response.text)