Module molcrawl.genome_sequence.dataset.refseq.fasta_to_raw

Functions

def fasta_to_raw_genome(output_dir: str | pathlib.Path, num_worker: int, max_lines_per_file: int)
Expand source code
def fasta_to_raw_genome(output_dir: Union[str, Path], num_worker: int, max_lines_per_file: int):
    fasta_dir = Path(output_dir) / "extracted_files"
    raw_dir = Path(output_dir) / "raw_files"
    print(f"⌛ Parsing fasta files in {fasta_dir} to raw files in {raw_dir} with {num_worker} workers.")
    raw_dir.mkdir(parents=True, exist_ok=True)
    parse_fasta_to_raw_sequence(fasta_dir, raw_dir, num_worker, max_lines_per_file)
def get_sequence_from_fasta(fasta_filepath: pathlib.Path,
max_lines_per_file: int,
num_worker: int,
sequence_list: multiprocessing.managers.ListProxy) ‑> None
Expand source code
def get_sequence_from_fasta(fasta_filepath: Path, max_lines_per_file: int, num_worker: int, sequence_list: ListProxy) -> None:
    sequence_chunk: List[str] = []
    for sequence in read_fasta_sequences(fasta_filepath):
        sequence_chunk.append(sequence)
        if len(sequence_chunk) == max_lines_per_file:
            while len(sequence_list) > num_worker:
                time.sleep(0.5)
            sequence_list.append(sequence_chunk)
            sequence_chunk = []

    if len(sequence_chunk):
        sequence_list.append(sequence_chunk)
def iterate_over_chunk_raw_files(fasta_filepaths: List[pathlib.Path], num_worker: int, max_lines_per_file: int) ‑> Iterator[list[str]]
Expand source code
def iterate_over_chunk_raw_files(fasta_filepaths: List[Path], num_worker: int, max_lines_per_file: int) -> Iterator[list[str]]:
    """
    Reads sequences from a FASTA file and yields them one by one.

    Parameters:
    - fasta_filepath: Path to the input FASTA file.

    Yields:
    - A sequence string (without the header).
    """
    unfinished_jobs = len(fasta_filepaths)
    _lock = threading.Lock()
    columns = list(pb.Progress.get_default_columns()) + [pb.MofNCompleteColumn()]
    with pb.Progress(*columns) as progress:
        task = progress.add_task("Processing fasta files to raw...", total=unfinished_jobs)

        def job_done_callback(future):
            nonlocal unfinished_jobs
            with _lock:
                unfinished_jobs -= 1
            progress.advance(task)

        sequence_chunk: List[str] = []
        with concurrent.futures.ThreadPoolExecutor(num_worker) as executor, Manager() as manager:
            sequence_list = manager.list()
            jobs = [
                executor.submit(get_sequence_from_fasta, path, max_lines_per_file, num_worker, sequence_list)
                for path in fasta_filepaths
            ]
            for job in jobs:
                job.add_done_callback(job_done_callback)
            while unfinished_jobs > 0 or len(sequence_list) > 0:
                if len(sequence_list):
                    sequence_chunk = sequence_chunk + sequence_list.pop(0)
                if len(sequence_chunk) > max_lines_per_file:
                    yield sequence_chunk[:max_lines_per_file]
                    sequence_chunk = sequence_chunk[max_lines_per_file:]
        if len(sequence_chunk):
            yield sequence_chunk

Reads sequences from a FASTA file and yields them one by one.

Parameters: - fasta_filepath: Path to the input FASTA file.

Yields: - A sequence string (without the header).

def parse_fasta_to_raw_sequence(fasta_dir, raw_dir, num_worker: int, max_lines_per_file: int) ‑> None
Expand source code
def parse_fasta_to_raw_sequence(fasta_dir, raw_dir, num_worker: int, max_lines_per_file: int) -> None:
    """
    Parses FASTA file and writes the sequences to raw files, splitting them into chunks if necessary.

    Parameters:
    - fasta_dir: Path to the input FASTA files.
    - raw_dir: Dir to save the raw files.
    - max_lines_per_file: Maximum number of lines per output file.
    """

    fasta_filepaths = [path for path in Path(fasta_dir).rglob("*.fna")]
    chunk_content_iterator = iterate_over_chunk_raw_files(fasta_filepaths, num_worker, max_lines_per_file=max_lines_per_file)

    with concurrent.futures.ThreadPoolExecutor(num_worker) as executor:
        func = partial(process_chunk, raw_dir=raw_dir, max_lines_per_file=max_lines_per_file)
        # list(rich.progress.track(pool.imap(func, enumerate(chunk_content_iterator)), "Reading and splitting fasta file in chunks..."))
        list(executor.map(func, enumerate(chunk_content_iterator)))

Parses FASTA file and writes the sequences to raw files, splitting them into chunks if necessary.

Parameters: - fasta_dir: Path to the input FASTA files. - raw_dir: Dir to save the raw files. - max_lines_per_file: Maximum number of lines per output file.

def process_chunk(id_and_chunk: Tuple[int, List[str]], raw_dir, max_lines_per_file)
Expand source code
def process_chunk(id_and_chunk: Tuple[int, List[str]], raw_dir, max_lines_per_file):
    i, chunk = id_and_chunk
    path_chunk = Path(raw_dir) / f"chunk_{max_lines_per_file * i}_{max_lines_per_file * (i + 1)}.raw"
    write_chunk_file(path_chunk, chunk)
def read_fasta_sequences(fasta_filepath: pathlib.Path) ‑> Iterator[str]
Expand source code
def read_fasta_sequences(fasta_filepath: Path) -> Iterator[str]:
    """
    Reads sequences from a FASTA file and yields them one by one.

    Parameters:
    - fasta_filepath: Path to the input FASTA file.

    Yields:
    - A sequence string (without the header).
    """
    current_sequence: List[str] = []

    with open(fasta_filepath, "r") as fasta_file:
        for line in fasta_file:
            line = line.strip()
            if line.startswith(">"):
                if current_sequence:
                    sequence_str = "".join(current_sequence)
                    if "N" not in sequence_str:
                        yield sequence_str.upper() + "\n"

                    current_sequence = []
            else:
                current_sequence.append(line)
        if current_sequence:
            sequence_str = "".join(current_sequence)
            if "N" not in sequence_str:
                yield sequence_str + "\n"

Reads sequences from a FASTA file and yields them one by one.

Parameters: - fasta_filepath: Path to the input FASTA file.

Yields: - A sequence string (without the header).

def write_chunk_file(path_file, chunk_sequence: List[str])
Expand source code
def write_chunk_file(path_file, chunk_sequence: List[str]):
    with open(path_file, "w") as raw_file:
        raw_file.writelines(chunk_sequence)