Module molcrawl.rna.dataset.cellxgene.script.download_by_dataset

Per-dataset download strategy for CellxGene census data.

Problem with the original download.py approach: it calls get_anndata(obs_coords=[5000 arbitrary soma_joinids]). Those IDs span many datasets and TileDB fragments, which results in expensive random I/O.

This module instead:

1. Queries census obs metadata once per tissue (no X, fast) to build a soma_joinid → dataset_id mapping.
2. Groups our needed obs_ids by dataset_id.
3. For each dataset: queries get_anndata in sub-batches of _MAX_CELLS_PER_BATCH cells, routes them into chunk buffers, and flushes complete chunks to disk immediately to keep peak memory low.
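The steps above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not the module's actual implementation: the helper names (`group_by_dataset`, `sub_batches`, `route_and_flush`), the `flush` callback, and the lookup dictionaries are all hypothetical, and the census query itself is replaced by a comment so the example runs without network access.

```python
from collections import defaultdict
from typing import Callable, Dict, Iterable, Iterator, List

# Hypothetical stand-in for the module's _MAX_CELLS_PER_BATCH constant.
MAX_CELLS_PER_BATCH = 3


def group_by_dataset(
    obs_ids: Iterable[int], id_to_dataset: Dict[int, str]
) -> Dict[str, List[int]]:
    """Step 2: group the needed obs IDs by the dataset they belong to."""
    groups: Dict[str, List[int]] = defaultdict(list)
    for obs_id in obs_ids:
        groups[id_to_dataset[obs_id]].append(obs_id)
    return dict(groups)


def sub_batches(ids: List[int], size: int) -> Iterator[List[int]]:
    """Yield consecutive slices of at most `size` IDs."""
    for start in range(0, len(ids), size):
        yield ids[start:start + size]


def route_and_flush(
    groups: Dict[str, List[int]],
    id_to_chunk: Dict[int, int],
    chunk_sizes: Dict[int, int],
    flush: Callable[[int, List[int]], None],
) -> Dict[int, List[int]]:
    """Step 3: route cells into chunk buffers and flush each chunk once full.

    Returns whatever is still buffered (incomplete chunks) at the end.
    """
    buffers: Dict[int, List[int]] = defaultdict(list)
    for dataset_id, ids in groups.items():
        for batch in sub_batches(ids, MAX_CELLS_PER_BATCH):
            # The real module would query get_anndata for `batch` here;
            # this sketch only tracks which IDs land in which chunk.
            for obs_id in batch:
                chunk = id_to_chunk[obs_id]
                buffers[chunk].append(obs_id)
                if len(buffers[chunk]) == chunk_sizes[chunk]:
                    # Flush immediately so the buffer does not stay in memory.
                    flush(chunk, buffers.pop(chunk))
    return dict(buffers)
```

Flushing as soon as a chunk is complete means that only partially filled chunks are held in memory at any time, which is the memory-bounding behavior the list describes.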

Intended as a compatible drop-in replacement for download.download().

Functions

def download(output_dir: str, version: str, num_worker: int, size_workload: int) ‑> None
def download(
    output_dir: str,
    version: str,
    num_worker: int,
    size_workload: int,
) -> None:
    """Drop-in replacement for download.download() using per-dataset queries.

    Processes tissues sequentially; within each tissue, dataset queries run
    sequentially to avoid SOMA rate-limiting.  num_worker is accepted for
    interface compatibility but currently unused at the tissue level.
    """
    output_path = Path(output_dir)
    output_path.joinpath("download_dir").mkdir(exist_ok=True, parents=True)

    remaining = _find_remaining_chunks(output_path, size_workload)
    if not remaining:
        logger.info("All chunks already downloaded.")
        return

    total_chunks = sum(len(v) for v in remaining.values())
    logger.info(
        f"download_by_dataset: {len(remaining)} tissues, "
        f"{total_chunks} remaining chunks"
    )
    for tissue, n in sorted(
        ((t, len(c)) for t, c in remaining.items()), key=lambda x: -x[1]
    ):
        logger.info(f"  {tissue}: {n} chunks remaining")

    total_written = 0
    for tissue, chunks in sorted(
        remaining.items(), key=lambda x: -len(x[1])
    ):
        logger.info(f"=== Processing tissue: {tissue} ({len(chunks)} chunks) ===")
        written = _process_tissue(output_path, version, tissue, chunks)
        total_written += written
        logger.info(f"[{tissue}] Done: {written}/{len(chunks)} chunks written")

    logger.info(f"download_by_dataset complete: {total_written}/{total_chunks} chunks written")

Drop-in replacement for download.download() using per-dataset queries.

Processes tissues sequentially; within each tissue, dataset queries run sequentially to avoid SOMA rate-limiting. num_worker is accepted for interface compatibility but currently unused at the tissue level.
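The sequential, largest-first control flow of download() can be isolated in a small sketch. The function name `process_largest_first` and the `process_tissue` callback are hypothetical stand-ins (the callback plays the role of `_process_tissue` and returns the number of chunks written); no census access is involved.

```python
from typing import Callable, Dict, List, Tuple


def process_largest_first(
    remaining: Dict[str, List[int]],
    process_tissue: Callable[[str, List[int]], int],
) -> Tuple[List[str], int]:
    """Visit tissues one at a time, most remaining chunks first.

    Returns the visiting order and the total number of chunks written.
    """
    # Sort by descending number of remaining chunks, as download() does.
    order = sorted(remaining.items(), key=lambda item: -len(item[1]))
    visited: List[str] = []
    total_written = 0
    for tissue, chunks in order:
        # Strictly sequential: one tissue finishes before the next starts.
        total_written += process_tissue(tissue, chunks)
        visited.append(tissue)
    return visited, total_written
```

Because only chunks that are still missing appear in `remaining`, rerunning after an interruption resumes where the previous run stopped instead of starting over.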