Module molcrawl.rna.dataset.cellxgene.script.download

Functions

def divide_workload(path: str | pathlib.Path, size_workload: int) ‑> List[Tuple[str, int, int, List[int]]]
def divide_workload(path: Union[str, Path], size_workload: int) -> List[Tuple[str, int, int, List[int]]]:
    """
    Split the download workload into chunks of a specified size.

    Reads each *.obs_id.tsv file under the specified directory and splits
    each file's ID list into chunks of at most size_workload IDs.

    Args:
        path (Union[str, Path]): Path to the metadata preparation directory.
        size_workload (int): Size of each workload (number of samples).

    Returns:
        List[Tuple[str, int, int, List[int]]]: List of partitioned workloads:
            - str: file name (without extension)
            - int: starting line number
            - int: ending line number
            - List[int]: IDs in the corresponding range
    """
    import numpy as np

    divided_workload = []
    for filename in Path(path).rglob("*.obs_id.tsv"):
        with open(filename, "r") as file:
            # One soma_joinid per line; numpy strips the trailing newline
            # during the string-to-int conversion.
            id_list = np.array(file.readlines()).astype(int)
        name = Path(filename).stem.split(".")[0]
        # Pair each chunk start with its end; the final end is clamped to
        # len(id_list) so the last (possibly shorter) chunk is kept.
        start_lines = range(0, len(id_list), size_workload)
        end_lines = list(range(size_workload, len(id_list), size_workload)) + [len(id_list)]

        divided_workload += [(name, start, end, id_list[start:end].tolist()) for start, end in zip(start_lines, end_lines)]

    return divided_workload

Split the download workload into chunks of a specified size.

Reads each *.obs_id.tsv file under the specified directory and splits each file's ID list into chunks of at most size_workload IDs.

Args:

path (Union[str, Path]): Path to the metadata preparation directory.
size_workload (int): Size of each workload (number of samples).

Returns:

List[Tuple[str, int, int, List[int]]]: List of partitioned workloads:
- str: file name (without extension)
- int: starting line number
- int: ending line number
- List[int]: IDs in the corresponding range
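A minimal usage sketch, assuming a metadata directory containing a file sample.obs_id.tsv with one soma_joinid per line (both the directory layout and the file name are illustrative):

from molcrawl.rna.dataset.cellxgene.script.download import divide_workload

# Hypothetical layout: output/metadata_preparation_dir/sample.obs_id.tsv
# containing "0\n1\n2\n3\n".
workloads = divide_workload("output/metadata_preparation_dir", size_workload=2)
for name, start, end, ids in workloads:
    # e.g. ("sample", 0, 2, [0, 1]) then ("sample", 2, 4, [2, 3])
    print(name, start, end, ids)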

def download(output_dir, version, num_worker, size_workload)
def download(output_dir, version, num_worker, size_workload):
    """Download all workload chunks concurrently into <output_dir>/download_dir."""
    path_data_directory = Path(output_dir) / "metadata_preparation_dir"
    path_download_directory = Path(output_dir) / "download_dir"

    os.makedirs(path_download_directory, exist_ok=True)
    arg_list = divide_workload(path_data_directory, size_workload)

    with concurrent.futures.ThreadPoolExecutor(max_workers=num_worker) as executor:
        # Bind the fixed arguments so each worker receives only its workload tuple.
        func = partial(run, Path(output_dir), version)
        # list() drains the lazy executor.map iterator; track() renders a progress bar.
        list(
            track(
                executor.map(func, arg_list),
                description="Downloading...",
                total=len(arg_list),
            )
        )
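A hypothetical invocation; the output directory, worker count, and census release tag are illustrative, not fixed by the module:

download(
    output_dir="output",
    version="2023-12-15",    # an assumed cellxgene-census release tag
    num_worker=4,
    size_workload=10_000,
)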
def retrieve_adata(version: str, id_list: List[int], target_gene_ids: Sequence[int], try_count: int = 0, max_try: int = 5) ‑> Any
def retrieve_adata(
    version: str,
    id_list: List[int],
    target_gene_ids: Sequence[int],
    try_count: int = 0,
    max_try: int = 5,
) -> Any:
    import cellxgene_census

    census = retrieve_census(version)
    try:
        adata = cellxgene_census.get_anndata(
            census,
            organism="Homo sapiens",
            obs_coords=id_list,
            var_coords=target_gene_ids,
        )
    except KeyboardInterrupt as e:
        census.close()
        raise e
    except Exception as e:
        census.close()
        if try_count > max_try:
            raise e
        logging.warning(f"[Error] while retrieving adata, retrying (try: {try_count + 1})")
        time.sleep(10)
        # Propagate max_try so a caller-supplied retry limit survives recursion.
        return retrieve_adata(version, id_list, target_gene_ids, try_count + 1, max_try)
    else:
        # Close the census handle on success too; only the AnnData is returned.
        census.close()
    return adata
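A minimal usage sketch with placeholder soma_joinid values and an assumed release tag:

# Fetch three cells restricted to two genes; all IDs are placeholders.
adata = retrieve_adata(
    version="2023-12-15",
    id_list=[0, 1, 2],          # obs soma_joinids
    target_gene_ids=[10, 42],   # var soma_joinids
)
print(adata.shape)              # (3, 2): n_obs x n_var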
def retrieve_census(version: str, try_count: int = 0, max_try: int = 5) ‑> Any
def retrieve_census(version: str, try_count: int = 0, max_try: int = 5) -> Any:
    import cellxgene_census
    import tiledbsoma

    # Access S3 directly instead of via the SOMA HTTPS API endpoint.
    # This bypasses the API server bottleneck and reads TileDB chunks from S3
    # client-side, which is significantly faster for bulk downloads.
    ctx = tiledbsoma.SOMATileDBContext(
        tiledb_config={
            "vfs.s3.region": "us-west-2",
            "vfs.s3.no_sign_request": "true",
        }
    )
    try:
        return cellxgene_census.open_soma(census_version=version, context=ctx)
    except KeyboardInterrupt as e:
        raise e
    except Exception as e:
        if try_count > max_try:
            raise e
        logging.warning(f"[Error] while retrieving census, retrying (try: {try_count + 1})")
        time.sleep(10)
        # Propagate max_try so a caller-supplied retry limit survives recursion.
        return retrieve_census(version, try_count + 1, max_try)
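For a one-off query, cellxgene_census.open_soma can also be used directly as a context manager against the default endpoint; a sketch, with the release tag assumed:

import cellxgene_census

# The handle is closed automatically when the with-block exits.
with cellxgene_census.open_soma(census_version="2023-12-15") as census:
    summary = census["census_info"]["summary"].read().concat().to_pandas()
    print(summary)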
def run(output_dir: pathlib.Path, version, argv: Tuple[str, int, int, List[int]]) ‑> None
def run(output_dir: Path, version, argv: Tuple[str, int, int, List[int]]) -> None:
    """Download one workload chunk to an .h5ad file, with retries and a hard timeout."""
    import pandas as pd

    name, start_l, end_l, id_list = argv
    save_filename = output_dir / f"download_dir/{name}.{start_l:08d}-{end_l:08d}.h5ad"
    if save_filename.exists():
        try:
            import h5py

            with h5py.File(save_filename, "r") as _f:
                pass  # header check only — avoids loading full data into memory
            logging.info(f"{save_filename} exists, skipping download")
            return
        except Exception as e:
            logging.warning(f"{save_filename} is corrupt ({e}), re-downloading")
            save_filename.unlink()

    tsv_file = output_dir / "metadata_preparation_dir" / f"{name}.var.tsv"
    target_var = pd.read_csv(tsv_file, sep="\t", index_col=0)
    target_gene_ids = target_var["soma_joinid"].to_numpy()

    for attempt in range(1, _DOWNLOAD_MAX_RETRY + 1):
        # Each attempt runs in a dedicated thread so we can apply a hard wall-clock
        # timeout via future.result(timeout=…).  The abandoned thread will terminate
        # on its own once _SOCKET_TIMEOUT_SEC fires on the next socket recv().
        _exec = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        fut = _exec.submit(_do_download_and_write, save_filename, version, id_list, target_gene_ids)
        timed_out = failed = False
        try:
            fut.result(timeout=_DOWNLOAD_TOTAL_TIMEOUT_SEC)
        except concurrent.futures.TimeoutError:
            timed_out = True
            logging.warning(
                f"[Timeout] Download exceeded {_DOWNLOAD_TOTAL_TIMEOUT_SEC}s wall-clock "
                f"(attempt {attempt}/{_DOWNLOAD_MAX_RETRY}): {save_filename}"
            )
        except Exception as exc:
            failed = True
            logging.warning(f"[Error] Download failed: {exc} (attempt {attempt}/{_DOWNLOAD_MAX_RETRY}): {save_filename}")
        finally:
            _exec.shutdown(wait=False)  # don't block; stuck thread ends via socket timeout

        if not timed_out and not failed:
            logging.info(f"Downloaded {save_filename} ({len(id_list)} cells)")
            return  # success

        # Remove any partial file before retrying
        if save_filename.exists():
            try:
                save_filename.unlink()
            except OSError:
                pass

        if attempt < _DOWNLOAD_MAX_RETRY:
            time.sleep(10)

    # All retries exhausted — log and skip so the worker continues with remaining chunks
    # instead of crashing the entire ThreadPoolExecutor.
    logging.error(f"Skipping {save_filename}: all {_DOWNLOAD_MAX_RETRY} download attempts failed")