Module molcrawl.protein_sequence.dataset.uniprot.uniprot_download

Functions

def compute_md5(file_name: str, chunk_size: int = 65536) ‑> str
def compute_md5(file_name: str, chunk_size: int = 65536) -> str:
    """
    Compute MD5 of the file.

    Parameters:
        file_name (str): file name
        chunk_size (int, optional): chunk size for reading large files
    """

    md5 = hashlib.md5()
    with open(file_name, "rb") as fin:
        while chunk := fin.read(chunk_size):
            md5.update(chunk)
    return md5.hexdigest()

Compute MD5 of the file.

Parameters

file_name (str): file name
chunk_size (int, optional): chunk size for reading large files
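
A minimal usage sketch (the file name below is a placeholder; any readable path works):

    from molcrawl.protein_sequence.dataset.uniprot.uniprot_download import compute_md5

    # Hash a previously downloaded archive and inspect the digest.
    checksum = compute_md5("uniprot_sprot.fasta.gz")  # placeholder path
    print(checksum)  # 32-character hex digest, e.g. 'd41d8cd9...' for an empty file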

def download(url: str, path: str, use_md5: bool, md5: str | None = None) ‑> str
def download(url: str, path: str, use_md5: bool, md5: Optional[str] = None) -> str:
    """
    Download the md5 of the files
    Download a file from the specified url.
    Skip the downloading step if there exists a file satisfying the given MD5.

    Parameters:
        url (str): URL to download
        path (str, optional): path to store the downloaded file. If not specify tmp file.
        md5 (str, optional): MD5 of the file
    """

    if need_download(path, use_md5, md5):
        logger.info("Downloading %s to %s" % (url, path))
        path, _ = urlretrieve(url, path)
    else:
        logger.info("Skipping %s since already downloaded at %s" % (url, path))

    return path

Download a file from the specified URL. Skip the download if a file satisfying the given MD5 already exists at path.

Parameters

url (str): URL to download
path (str): path to store the downloaded file
use_md5 (bool): whether to verify an existing file against md5
md5 (str, optional): expected MD5 of the file
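
An illustrative call; the URL below is the standard UniProtKB/Swiss-Prot FASTA location and serves only as an example, any downloadable URL works:

    url = "https://ftp.uniprot.org/pub/databases/uniprot/current_release/knowledgebase/complete/uniprot_sprot.fasta.gz"
    path = download(url, "uniprot_sprot.fasta.gz", use_md5=False)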

def download_full_http_dir(url: str, download_dir: str | os.PathLike[str], num_worker: int) ‑> List[str]
def download_full_http_dir(url: str, download_dir: Union[str, os.PathLike[str]], num_worker: int) -> List[str]:
    # Read the RELEASE.metalink manifest to learn which files the remote
    # directory contains and what MD5 each of them should have.
    file_md5_dict = get_url_md5_mapping(url)
    urls = [os.path.join(url, file) for file in file_md5_dict.keys()]
    paths = [os.path.join(download_dir, file) for file in file_md5_dict.keys()]
    md5s = list(file_md5_dict.values())
    # Download the files in parallel, checking each against its expected MD5.
    with concurrent.futures.ProcessPoolExecutor(max_workers=num_worker) as executor:
        archive_paths = list(
            track(
                executor.map(_download_with_md5_check, urls, paths, md5s),
                total=len(urls),
                description="Downloading...",
            )
        )
    return archive_paths
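
Illustrative use, assuming the remote directory publishes a RELEASE.metalink manifest (the URL below is a placeholder):

    from pathlib import Path

    Path("downloads/uniparc").mkdir(parents=True, exist_ok=True)
    archives = download_full_http_dir(
        "https://example.org/uniparc/fasta/",  # placeholder directory URL
        "downloads/uniparc",
        num_worker=4,
    )
    print(len(archives), "archives downloaded")
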
def get_url_md5_mapping(url: str) ‑> Dict[str, str]
def get_url_md5_mapping(url: str) -> Dict[str, str]:
    file_path, _ = urlretrieve(os.path.join(url, "RELEASE.metalink"))

    # Parse the XML file
    tree = ET.parse(file_path)
    root = tree.getroot()

    # Define the namespace (from the XML structure)
    namespace = {"ns": "http://www.metalinker.org/"}

    file_md5_dict: Dict[str, str] = {}

    # Iterate through the XML structure to find file names and md5 hashes
    for file in root.findall(".//ns:file", namespace):
        file_name = file.attrib.get("name")
        md5_hash_elem = file.find('.//ns:hash[@type="md5"]', namespace)
        md5_hash: Optional[str] = md5_hash_elem.text if md5_hash_elem is not None else None
        if file_name and md5_hash:
            file_md5_dict[file_name] = md5_hash

    return file_md5_dict
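
Illustrative use (placeholder URL); the returned dict maps file names to their advertised MD5 checksums:

    mapping = get_url_md5_mapping("https://example.org/uniparc/fasta/")
    for name, md5 in mapping.items():
        print(name, md5)
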
def need_download(path: str, use_md5: bool, md5: str | None = None)
def need_download(path: str, use_md5: bool, md5: Optional[str] = None):
    # A missing or empty file always needs to be (re-)downloaded; checking this
    # first also avoids trying to hash a file that does not exist yet.
    if not os.path.exists(path) or os.path.getsize(path) == 0:
        return True
    if md5 is not None and use_md5:
        logger.info(f"Compute md5 of file {path}")
        need_download = md5 != compute_md5(path)
        if need_download:
            logger.warning(f"MD5 is different, redownloading {path}")
        return need_download
    return False
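
Illustrative check before re-downloading (the path and expected MD5 are placeholders):

    expected_md5 = "0123456789abcdef0123456789abcdef"  # placeholder checksum
    if need_download("uniprot_sprot.fasta.gz", use_md5=True, md5=expected_md5):
        print("file is missing, empty, or fails the MD5 check")
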
def process_dataset(dataset: str, output_dir: str | os.PathLike[str], num_worker: int, use_md5: bool)
def process_dataset(dataset: str, output_dir: Union[str, os.PathLike[str]], num_worker: int, use_md5: bool):
    logging.info(f"Processing dataset {dataset}...")
    logging.info("Downloading archive from the server...")
    output_dir = Path(output_dir) / dataset
    if dataset == UniProtDatasetEnum.UniParc:
        # This dataset is served as a whole HTTP directory of gzipped FASTA
        # archives: mirror the directory in parallel, then extract each archive.
        download_dir = output_dir / "archive"
        download_dir.mkdir(parents=True, exist_ok=True)
        archive_path = download_full_http_dir(pubmed_fasta_url[dataset], download_dir, num_worker)

        fasta_dir = output_dir / "fasta_files"
        fasta_dir.mkdir(parents=True, exist_ok=True)
        paths = [fasta_dir / Path(path).with_suffix("").name for path in archive_path]
        with concurrent.futures.ThreadPoolExecutor(max_workers=num_worker) as executor:
            list(
                track(
                    executor.map(unzip_file, archive_path, paths),
                    total=len(paths),
                    description="Extracting...",
                )
            )
    else:
        # The remaining datasets are served as a single gzipped FASTA archive.
        output_dir.mkdir(parents=True, exist_ok=True)
        download_path = Path(output_dir) / Path(pubmed_fasta_url[dataset]).name
        downloaded_path = download(
            pubmed_fasta_url[dataset], str(download_path), use_md5=use_md5, md5=uniproto_fasta_md5[dataset]
        )
        logging.info("Decompressing the archive...")
        unzip_file(downloaded_path, Path(output_dir) / Path(downloaded_path).with_suffix("").name)
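
Illustrative end-to-end call, assuming the module's URL and MD5 tables (pubmed_fasta_url and uniproto_fasta_md5) contain an entry for the chosen dataset:

    process_dataset(
        UniProtDatasetEnum.UniRef50,
        "data/uniprot",
        num_worker=4,
        use_md5=True,
    )
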
def unzip_file(archive_path: str, output_path: pathlib.Path)
def unzip_file(archive_path: str, output_path: Path):
    if output_path.exists():
        logger.info(f"Skipping extraction, {output_path} already exist")
        return output_path
    logger.info(f"Extracting {output_path}")
    with gzip.open(archive_path, "rt") as archive, open(output_path, "w") as file:
        file.write(archive.read())

    return output_path
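
Illustrative use (placeholder paths); the archive is decompressed next to itself:

    from pathlib import Path

    unzip_file("uniprot_sprot.fasta.gz", Path("uniprot_sprot.fasta"))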

Classes

class UniProtDatasetEnum
class UniProtDatasetEnum:
    UniprotKB_reviewed = "UniprotKB_reviewed"
    UniprotKB_unreviewed = "UniprotKB_unreviewed"
    UniRef100 = "UniRef100"
    UniRef90 = "UniRef90"
    UniRef50 = "UniRef50"
    UniParc = "UniParc"

Class variables

var UniParc
var UniRef100
var UniRef50
var UniRef90
var UniprotKB_reviewed
var UniprotKB_unreviewed
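
The class variables are plain string constants rather than enum.Enum members, so they can be passed directly wherever a dataset name string is expected, for example as the dataset argument of process_dataset:

    assert UniProtDatasetEnum.UniRef90 == "UniRef90"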