From 5b81927132898a41368b2dbd7373ab9cd001ecdd Mon Sep 17 00:00:00 2001 From: tzeitim Date: Thu, 25 Dec 2025 04:44:18 +0200 Subject: [PATCH 1/5] implemented assembly function that uses expressions instead of strings The assemble_sequences_with_anchors function is implemented and working. Summary of Changes Rust (src/expressions.rs:325-410): - Added assemble_sequences_with_anchors_expr function - Takes 3 input series: sequences, start_anchor, end_anchor - Extracts anchor strings from first row of each anchor column - Only supports shortest_path method (compression/auto don't need anchors) Python (rogtk/__init__.py:155-231): - Added assemble_sequences_with_anchors() wrapper function - Takes expr, start_anchor_col, end_anchor_col as expression arguments - Same kwargs as original: k, min_coverage, method, etc. Usage segments.group_by(['umi', 'start_meta', 'end_meta']).agg( rogtk.assemble_sequences_with_anchors( expr=pl.col('segment_seq'), start_anchor_col=pl.first('start_meta_seq'), end_anchor_col=pl.first('end_meta_seq'), k=15, min_coverage=20, ).alias('consensus_seq') ) --- rogtk/__init__.py | 78 +++++++++++++++++++++++++++++++++++++++++ src/expressions.rs | 87 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 165 insertions(+) diff --git a/rogtk/__init__.py b/rogtk/__init__.py index fe61cd6..d3329ef 100755 --- a/rogtk/__init__.py +++ b/rogtk/__init__.py @@ -152,6 +152,84 @@ def assemble_sequences( is_elementwise=False, ) +def assemble_sequences_with_anchors( + expr: IntoExpr, + start_anchor_col: IntoExpr, + end_anchor_col: IntoExpr, + k: int = 17, + min_coverage: int = 25, + method: str = 'shortest_path', + min_length: int | None = None, + export_graphs: bool = False, + auto_k: bool = False, + prefix: str | None = None +) -> pl.Expr: + """ + Assemble DNA sequences with dynamic per-group anchor sequences from columns. + + This function enables single group_by assembly operations where each group + can have different start/end anchors, avoiding slow Python loops over segment types. + + Parameters + ---------- + expr : IntoExpr + Input expression containing DNA sequences to assemble + start_anchor_col : IntoExpr + Column expression containing start anchor sequences (first value per group used) + end_anchor_col : IntoExpr + Column expression containing end anchor sequences (first value per group used) + k : int, default 17 + K-mer size for graph construction + min_coverage : int, default 25 + Minimum k-mer coverage threshold + method : str, default 'shortest_path' + Assembly method (only 'shortest_path' supported with dynamic anchors) + min_length : int, optional + Minimum contig length to return + export_graphs : bool, default False + Whether to export graph visualization files + auto_k : bool, default False + Whether to automatically adjust k-mer size + prefix : str, optional + Prefix for output files when export_graphs is True + + Returns + ------- + pl.Expr + Expression returning assembled consensus sequence(s) + + Examples + -------- + >>> segments.group_by(['umi', 'start_meta', 'end_meta']).agg( + ... rogtk.assemble_sequences_with_anchors( + ... expr=pl.col('segment_seq'), + ... start_anchor_col=pl.first('start_meta_seq'), + ... end_anchor_col=pl.first('end_meta_seq'), + ... k=15, + ... min_coverage=20, + ... ).alias('consensus_seq') + ... ) + """ + return register_plugin_function( + plugin_path=Path(__file__).parent, + function_name="assemble_sequences_with_anchors_expr", + args=[expr, start_anchor_col, end_anchor_col], + kwargs={ + "k": k, + "min_coverage": min_coverage, + "method": method, + "start_anchor": None, # Not used - anchors come from columns + "end_anchor": None, # Not used - anchors come from columns + "min_length": min_length, + "export_graphs": export_graphs, + "only_largest": False, + "auto_k": auto_k, + "prefix": prefix + }, + returns_scalar=True, + is_elementwise=False, + ) + def sweep_assembly_params( expr: IntoExpr, k_start: int = 5, diff --git a/src/expressions.rs b/src/expressions.rs index 52efc7b..650ffee 100755 --- a/src/expressions.rs +++ b/src/expressions.rs @@ -322,6 +322,93 @@ fn assemble_sequences_expr(inputs: &[Series], kwargs: AssemblyKwargs) -> PolarsR ).into_series()) } +/// Assembly expression that takes anchor sequences from input columns instead of kwargs. +/// This enables per-group dynamic anchors in group_by operations. +/// +/// inputs[0] = sequences (String column) +/// inputs[1] = start_anchor (String column - first value used) +/// inputs[2] = end_anchor (String column - first value used) +#[polars_expr(output_type_func=output_string_type)] +fn assemble_sequences_with_anchors_expr(inputs: &[Series], kwargs: AssemblyKwargs) -> PolarsResult { + debug!("Received kwargs for dynamic anchors: {:?}", kwargs); + + // Validate we have at least 3 inputs + if inputs.len() < 3 { + return Err(PolarsError::ComputeError( + "assemble_sequences_with_anchors requires 3 inputs: sequences, start_anchor, end_anchor".into() + )); + } + + // Extract anchor strings from the first row of each anchor column + let start_anchor_series = inputs[1].str()?; + let end_anchor_series = inputs[2].str()?; + + let start_anchor = start_anchor_series.get(0) + .ok_or_else(|| PolarsError::ComputeError( + "start_anchor column is empty".into() + ))? + .to_string(); + + let end_anchor = end_anchor_series.get(0) + .ok_or_else(|| PolarsError::ComputeError( + "end_anchor column is empty".into() + ))? + .to_string(); + + debug!("Dynamic anchors - start: {}, end: {}", start_anchor, end_anchor); + + // Build assembly method - for dynamic anchors, we only support shortest_path + let method = match kwargs.method.as_str() { + "compression" => { + return Err(PolarsError::ComputeError( + "compression method is not supported with dynamic anchors; use shortest_path".into() + )); + }, + "shortest_path" => AssemblyMethod::ShortestPath { + start_anchor: start_anchor.clone(), + end_anchor: end_anchor.clone(), + }, + "shortest_path_auto" => { + return Err(PolarsError::ComputeError( + "shortest_path_auto method is not supported with dynamic anchors; use shortest_path".into() + )); + }, + _ => return Err(PolarsError::ComputeError( + "Invalid assembly method for dynamic anchors. Must be 'shortest_path'".into() + )), + }; + + // Extract sequences from input series + let ca = inputs[0].str()?; + + // Convert string chunk to Vec + let sequences: Vec = ca.into_iter() + .flatten() + .map(|s| s.to_string()) + .collect(); + + // Call assembly function + let contigs = assemble_sequences( + sequences, + kwargs.k, + kwargs.min_coverage, + method, + kwargs.export_graphs, + Some(true), // Hardcoded to true + kwargs.min_length, + kwargs.auto_k, + kwargs.prefix, + ).map_err(|e| PolarsError::ComputeError( + format!("Assembly failed: {}", e).into() + ))?; + + let result = contigs.join("\n"); + Ok(StringChunked::from_slice( + PlSmallStr::from_str("assembled_sequences"), + &[result.as_str()] + ).into_series()) +} + #[derive(Deserialize)] #[allow(dead_code)] struct SweepParams { From 363ba16b517d822e4259c7543d7a3467a2a76a9b Mon Sep 17 00:00:00 2001 From: tzeitim Date: Thu, 25 Dec 2025 04:51:27 +0200 Subject: [PATCH 2/5] added expression based anchor sequence passing --- Cargo.toml | 2 +- pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index dc00d6d..8477306 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rogtk" -version = "0.1.19" +version = "0.1.20" authors = [ "Pedro Olivares Chauvet ", "Pedro Olivares Chauvet ", diff --git a/pyproject.toml b/pyproject.toml index 288c623..c721221 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,7 +8,7 @@ build-backend = "maturin" [project] name = "rogtk" -version = "0.1.19" +version = "0.1.20" description = "Rust-based genomics toolkit - high-performance backend for ogtk" authors = [ {name = "Pedro Olivares", email = "pedroy.final@gmail.com"} From 1b033d7a32b98f674968861f72998773f37a61a6 Mon Sep 17 00:00:00 2001 From: tzeitim Date: Tue, 27 Jan 2026 21:26:15 +0200 Subject: [PATCH 3/5] Add CIGAR insertion extraction Polars plugin MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add Rust expressions and Python wrappers to extract actual insertion sequences from CIGAR strings and enrich cassiopeia allele notation. Rust (src/expressions.rs): - extract_insertions_from_cigar(): Parse CIGAR to extract insertion sequences - enrich_allele_with_insertions(): Transform [pos:NI] to [pos:NI:SEQ] - enrich_allele_insertions_expr: Polars expression (allele, seq, cigar) - extract_cigar_insertions_expr: Polars expression (seq, cigar) - Handle 1-based to 0-based position conversion for allele notation Python (rogtk/__init__.py): - CigarNamespace: Registered as pl.Expr.cigar namespace - enrich_insertions(): Method to enrich allele strings - extract_cigar_insertions(): Standalone function for extraction Example: [78:5I] → [78:5I:ACTTA] --- rogtk/__init__.py | 103 +++++++++++++++++++- src/expressions.rs | 231 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 329 insertions(+), 5 deletions(-) diff --git a/rogtk/__init__.py b/rogtk/__init__.py index d3329ef..53bb9bb 100755 --- a/rogtk/__init__.py +++ b/rogtk/__init__.py @@ -490,26 +490,26 @@ def dust_score(self) -> pl.Expr: def umi_complexity_scores(expr: IntoExpr) -> pl.Expr: """ Calculate all UMI complexity scores and return them as separate columns. - + This function returns a struct containing: - shannon_entropy: Shannon entropy of nucleotide frequencies - - linguistic_complexity: Ratio of unique k-mers to total possible k-mers + - linguistic_complexity: Ratio of unique k-mers to total possible k-mers - homopolymer_fraction: Fraction of sequence in homopolymer runs (3+ bases) - dinucleotide_entropy: Entropy of dinucleotide frequencies - longest_homopolymer_run: Length of longest homopolymer run - dust_score: DUST score for low-complexity detection - combined_score: Weighted combination of all metrics - + Parameters ---------- expr : IntoExpr Input expression containing UMI sequences - + Returns ------- pl.Expr Struct expression with all complexity scores - + Examples -------- >>> df.with_columns(umi_complexity_scores(pl.col("umi"))) @@ -521,3 +521,96 @@ def umi_complexity_scores(expr: IntoExpr) -> pl.Expr: args=expr, is_elementwise=True, ) + + +@pl.api.register_expr_namespace("cigar") +class CigarNamespace: + """Namespace for CIGAR string operations on Polars expressions.""" + + def __init__(self, expr: pl.Expr): + self._expr = expr + + def enrich_insertions(self, seq_col: IntoExpr, cigar_col: IntoExpr) -> pl.Expr: + """Enrich allele strings with actual insertion sequences from CIGAR. + + Transforms [pos:NI] notation to [pos:NI:SEQUENCE] by extracting + the actual inserted bases from the aligned sequence using the CIGAR string. + + Parameters + ---------- + seq_col : IntoExpr + Column containing the aligned sequence (e.g., 'Seq') + cigar_col : IntoExpr + Column containing the CIGAR string + + Returns + ------- + pl.Expr + Enriched allele string with insertion sequences + + Examples + -------- + >>> # Enrich a single allele column + >>> df.with_columns( + ... pl.col('r1').cigar.enrich_insertions(pl.col('Seq'), pl.col('CIGAR')) + ... ) + + >>> # Enrich multiple allele columns (r1-r15) + >>> df.with_columns([ + ... pl.col(f'r{i}').cigar.enrich_insertions(pl.col('Seq'), pl.col('CIGAR')).alias(f'r{i}') + ... for i in range(1, 16) + ... ]) + + Notes + ----- + Input allele format: "TAGTCATTAC[78:5I]ACTTAGACAGGTG" + Output format: "TAGTCATTAC[78:5I:GCTAG]ACTTAGACAGGTG" + + The CIGAR string is parsed to locate insertion positions and extract + the actual inserted bases from the sequence column. + """ + return register_plugin_function( + plugin_path=Path(__file__).parent, + function_name="enrich_allele_insertions_expr", + args=[self._expr, seq_col, cigar_col], + is_elementwise=True, + ) + + +def extract_cigar_insertions(seq_col: IntoExpr, cigar_col: IntoExpr) -> pl.Expr: + """Extract all insertions from CIGAR alignment as position:sequence pairs. + + Parses CIGAR strings to find all insertion operations and extracts the + actual inserted bases from the corresponding sequence. + + Parameters + ---------- + seq_col : IntoExpr + Column containing the aligned sequence + cigar_col : IntoExpr + Column containing the CIGAR string + + Returns + ------- + pl.Expr + String in format "pos1:seq1|pos2:seq2|..." where each pair represents + an insertion at a reference position with its actual sequence. + Empty string if no insertions found. + + Examples + -------- + >>> df.with_columns( + ... rogtk.extract_cigar_insertions(pl.col('Seq'), pl.col('CIGAR')).alias('insertions') + ... ) + + >>> # Filter to rows with insertions + >>> df.filter( + ... rogtk.extract_cigar_insertions(pl.col('Seq'), pl.col('CIGAR')).str.len_chars() > 0 + ... ) + """ + return register_plugin_function( + plugin_path=Path(__file__).parent, + function_name="extract_cigar_insertions_expr", + args=[seq_col, cigar_col], + is_elementwise=True, + ) diff --git a/src/expressions.rs b/src/expressions.rs index 650ffee..9726dc6 100755 --- a/src/expressions.rs +++ b/src/expressions.rs @@ -7,6 +7,237 @@ use crate::djfind::AssemblyMethod; use crate::umi_score::calculate_umi_complexity; use log::debug; +use std::collections::HashMap; + +// ============================================================================ +// CIGAR Insertion Extraction Functions +// ============================================================================ + +/// Represents an insertion extracted from CIGAR alignment +#[derive(Debug, Clone)] +struct Insertion { + ref_pos: usize, + length: usize, + sequence: String, +} + +/// Parse CIGAR string and extract insertions with their actual sequences from the query. +/// +/// Returns a HashMap mapping reference position -> inserted sequence +fn extract_insertions_from_cigar(seq: &str, cigar: &str) -> HashMap { + let mut insertions = HashMap::new(); + let mut num_buf = String::new(); + let mut seq_pos: usize = 0; // position in query sequence + let mut ref_pos: usize = 0; // position in reference + + let seq_bytes = seq.as_bytes(); + + for c in cigar.chars() { + if c.is_ascii_digit() { + num_buf.push(c); + } else { + if let Ok(len) = num_buf.parse::() { + match c { + 'M' | '=' | 'X' => { + // Match/mismatch: advances both + seq_pos += len; + ref_pos += len; + } + 'I' => { + // Insertion: extract the actual inserted bases + if seq_pos + len <= seq_bytes.len() { + let inserted = String::from_utf8_lossy( + &seq_bytes[seq_pos..seq_pos + len] + ).to_string(); + insertions.insert(ref_pos, inserted); + } + seq_pos += len; + // ref_pos doesn't advance for insertions + } + 'D' | 'N' => { + // Deletion/skip: only advances reference + ref_pos += len; + } + 'S' => { + // Soft clip: only advances query + seq_pos += len; + } + 'H' | 'P' => { + // Hard clip/padding: doesn't advance either + } + _ => {} + } + } + num_buf.clear(); + } + } + + insertions +} + +/// Enrich an allele string by replacing [pos:NI] with [pos:NI:ACTG] +/// +/// Input: "TAGTCATTAC[78:5I]ACTTAGACAGGTG" +/// Output: "TAGTCATTAC[78:5I:GCTAG]ACTTAGACAGGTG" +fn enrich_allele_with_insertions( + allele: &str, + insertions: &HashMap, + output: &mut String +) { + let mut chars = allele.chars().peekable(); + + while let Some(c) = chars.next() { + if c == '[' { + // Start of CIGAR notation - parse it + let mut bracket_content = String::new(); + let mut found_close = false; + + for inner in chars.by_ref() { + if inner == ']' { + found_close = true; + break; + } + bracket_content.push(inner); + } + + if found_close { + // Parse content like "78:5I" or "20:432D" or "None" + if bracket_content == "None" { + output.push('['); + output.push_str(&bracket_content); + output.push(']'); + } else if let Some((pos_str, rest)) = bracket_content.split_once(':') { + if let Ok(pos) = pos_str.parse::() { + // Check if it's an insertion (ends with I) + if rest.ends_with('I') { + // It's an insertion - try to enrich with actual sequence + // Allele notation uses 1-based positions, CIGAR uses 0-based + // Try both pos-1 (convert to 0-based) and pos (in case already 0-based) + let ins_seq = if pos > 0 { + insertions.get(&(pos - 1)).or_else(|| insertions.get(&pos)) + } else { + insertions.get(&pos) + }; + if let Some(seq) = ins_seq { + output.push('['); + output.push_str(&bracket_content); + output.push(':'); + output.push_str(seq); + output.push(']'); + } else { + // No insertion found at this position, keep original + output.push('['); + output.push_str(&bracket_content); + output.push(']'); + } + } else { + // Not an insertion (deletion, etc.) - keep as-is + output.push('['); + output.push_str(&bracket_content); + output.push(']'); + } + } else { + // Couldn't parse position - keep original + output.push('['); + output.push_str(&bracket_content); + output.push(']'); + } + } else { + // Invalid format - keep original + output.push('['); + output.push_str(&bracket_content); + output.push(']'); + } + } else { + // No closing bracket found - just output what we have + output.push('['); + output.push_str(&bracket_content); + } + } else { + output.push(c); + } + } +} + +/// Polars expression: Enrich allele strings with actual insertion sequences +/// +/// Takes 3 inputs: +/// - inputs[0]: allele string column (e.g., "TAGTCATTAC[78:5I]ACTTAGACAGGTG") +/// - inputs[1]: sequence column (the full aligned sequence) +/// - inputs[2]: CIGAR string column +/// +/// Returns: enriched allele string with insertion sequences +#[polars_expr(output_type=String)] +fn enrich_allele_insertions_expr(inputs: &[Series]) -> PolarsResult { + let allele_ca: &StringChunked = inputs[0].str()?; + let seq_ca: &StringChunked = inputs[1].str()?; + let cigar_ca: &StringChunked = inputs[2].str()?; + + // Process each row + let results: StringChunked = allele_ca + .into_iter() + .zip(seq_ca.into_iter()) + .zip(cigar_ca.into_iter()) + .map(|((allele_opt, seq_opt), cigar_opt)| { + match (allele_opt, seq_opt, cigar_opt) { + (Some(allele), Some(seq), Some(cigar)) => { + let insertions = extract_insertions_from_cigar(seq, cigar); + let mut output = String::with_capacity(allele.len() + 50); + enrich_allele_with_insertions(allele, &insertions, &mut output); + Some(output) + } + (Some(allele), _, _) => Some(allele.to_string()), // Return original if missing seq/cigar + _ => None, + } + }) + .collect(); + + Ok(results.into_series()) +} + +/// Polars expression: Extract all insertions from CIGAR as a string map +/// +/// Takes 2 inputs: +/// - inputs[0]: sequence column +/// - inputs[1]: CIGAR string column +/// +/// Returns: String in format "pos1:seq1|pos2:seq2|..." +#[polars_expr(output_type=String)] +fn extract_cigar_insertions_expr(inputs: &[Series]) -> PolarsResult { + let seq_ca: &StringChunked = inputs[0].str()?; + let cigar_ca: &StringChunked = inputs[1].str()?; + + let results: StringChunked = seq_ca + .into_iter() + .zip(cigar_ca.into_iter()) + .map(|(seq_opt, cigar_opt)| { + match (seq_opt, cigar_opt) { + (Some(seq), Some(cigar)) => { + let insertions = extract_insertions_from_cigar(seq, cigar); + if insertions.is_empty() { + Some(String::new()) + } else { + let mut pairs: Vec<_> = insertions.into_iter().collect(); + pairs.sort_by_key(|(pos, _)| *pos); + let result = pairs + .into_iter() + .map(|(pos, seq)| format!("{}:{}", pos, seq)) + .collect::>() + .join("|"); + Some(result) + } + } + _ => None, + } + }) + .collect(); + + Ok(results.into_series()) +} + +// ============================================================================ +// Original CIGAR parsing (indel positions only, no sequences) +// ============================================================================ fn parse_cigar_str(cigar: &str, output: &mut String, block_dels: bool) { let mut num_buf = String::new(); From 50383ebc672f168a2302a85abfce75ec2d520c55 Mon Sep 17 00:00:00 2001 From: tzeitim Date: Tue, 10 Feb 2026 11:40:14 +0200 Subject: [PATCH 4/5] Added bams_to_parquet() for multi-BAM to single parquet conversion Process multiple BAM files into a single parquet file with bounded memory. Streams batches sequentially from each BAM, writing directly to parquet without intermediate files or memory spikes. Features: - Bounded memory usage via batch streaming (configurable batch_size) - Optional source_file column to track origin BAM per record - Input validation, progress reporting, and Python interrupt handling - Supports all existing options (compression, sequence/quality inclusion, limit) Performance: ~43k records/sec (1.86M records from 46 BAMs in 43s) Usage: rogtk.bams_to_parquet(bam_list, "output.parquet", include_source_file=True) lf = pl.scan_parquet("output.parquet") # lazy interface for downstream ops --- rogtk/__init__.py | 80 +++++++++++++++ src/bam.rs | 240 ++++++++++++++++++++++++++++++++++++++++++++- src/expressions.rs | 206 ++++++++++++++++++++++++++++++++++++++ src/lib.rs | 3 +- 4 files changed, 525 insertions(+), 4 deletions(-) diff --git a/rogtk/__init__.py b/rogtk/__init__.py index 53bb9bb..1873b2f 100755 --- a/rogtk/__init__.py +++ b/rogtk/__init__.py @@ -17,6 +17,7 @@ fracture_fasta, fracture_sequences, bam_to_parquet, + bams_to_parquet, bam_to_arrow_ipc, bam_to_arrow_ipc_parallel, bam_to_arrow_ipc_gzp_parallel, @@ -41,6 +42,7 @@ fracture_fasta, fracture_sequences, bam_to_parquet, + bams_to_parquet, bam_to_arrow_ipc, bam_to_arrow_ipc_parallel, bam_to_arrow_ipc_gzp_parallel, @@ -576,6 +578,84 @@ def enrich_insertions(self, seq_col: IntoExpr, cigar_col: IntoExpr) -> pl.Expr: is_elementwise=True, ) + def align_to_ref(self, query_col: IntoExpr, cigar_col: IntoExpr) -> pl.Expr: + """Expand CIGAR to show aligned reference with dashes for insertions. + + Takes a reference sequence (from self._expr), query sequence, and CIGAR string, + and produces an aligned reference string where: + - Matched bases appear uppercase + - Insertions (bases in query not in ref) are shown as dashes '-' + - Deletions consume reference bases normally + + Parameters + ---------- + query_col : IntoExpr + Column containing the query/read sequence + cigar_col : IntoExpr + Column containing the CIGAR string + + Returns + ------- + pl.Expr + Aligned reference string with dashes where query has insertions + + Examples + -------- + >>> # With a literal reference + >>> df.with_columns( + ... pl.lit(ref_seq).cigar.align_to_ref(pl.col('Seq'), pl.col('CIGAR')).alias('aligned_ref') + ... ) + + >>> # Compare aligned strings visually: + >>> # aligned_ref: ATCG----TACG + >>> # aligned_query: ATCGACTGTACG + """ + return register_plugin_function( + plugin_path=Path(__file__).parent, + function_name="cigar_aligned_ref_expr", + args=[self._expr, query_col, cigar_col], + is_elementwise=True, + ) + + def align_to_query(self, query_col: IntoExpr, cigar_col: IntoExpr) -> pl.Expr: + """Expand CIGAR to show aligned query with dashes for deletions. + + Takes a reference sequence (from self._expr), query sequence, and CIGAR string, + and produces an aligned query string where: + - Matched bases appear uppercase + - Deletions (bases in ref not in query) are shown as dashes '-' + - Soft-clipped bases appear as lowercase (to distinguish from aligned bases) + + Parameters + ---------- + query_col : IntoExpr + Column containing the query/read sequence + cigar_col : IntoExpr + Column containing the CIGAR string + + Returns + ------- + pl.Expr + Aligned query string with dashes where reference has deletions + + Examples + -------- + >>> # With a literal reference + >>> df.with_columns( + ... pl.lit(ref_seq).cigar.align_to_query(pl.col('Seq'), pl.col('CIGAR')).alias('aligned_query') + ... ) + + >>> # Soft-clipped bases appear lowercase: + >>> # aligned_ref: ----ATCGTACG + >>> # aligned_query: atcgATCGTACG (first 4 bases were soft-clipped) + """ + return register_plugin_function( + plugin_path=Path(__file__).parent, + function_name="cigar_aligned_query_expr", + args=[self._expr, query_col, cigar_col], + is_elementwise=True, + ) + def extract_cigar_insertions(seq_col: IntoExpr, cigar_col: IntoExpr) -> pl.Expr: """Extract all insertions from CIGAR alignment as position:sequence pairs. diff --git a/src/bam.rs b/src/bam.rs index 4e566f0..5f8785e 100644 --- a/src/bam.rs +++ b/src/bam.rs @@ -398,7 +398,7 @@ pub fn bam_to_parquet( .map_err(|e| PyErr::new::(format!("Failed to close Parquet writer: {}", e)))?; let completion_msg = if limit.is_some() { - format!("Conversion complete: {} records (limited from potentially more) written to {}", + format!("Conversion complete: {} records (limited from potentially more) written to {}", total_records, parquet_path) } else { format!("Conversion complete: {} records written to {}", total_records, parquet_path) @@ -407,10 +407,244 @@ pub fn bam_to_parquet( Ok(()) } +/// Process multiple BAM files into a single parquet file with bounded memory. +/// +/// This function streams batches from each BAM file sequentially, writing them +/// to a single parquet file. Memory usage is bounded by batch_size, regardless +/// of the total number of input files or records. +/// +/// After writing, use `pl.scan_parquet()` for lazy operations on the result. #[pyfunction] #[pyo3(signature = ( - bam_path, - arrow_ipc_path, + bam_paths, + parquet_path, + batch_size = 50000, + include_sequence = true, + include_quality = true, + compression = "snappy", + limit = None, + include_source_file = false +))] +pub fn bams_to_parquet( + bam_paths: Vec, + parquet_path: &str, + batch_size: usize, + include_sequence: bool, + include_quality: bool, + compression: &str, + limit: Option, + include_source_file: bool, +) -> PyResult<()> { + if bam_paths.is_empty() { + return Err(PyErr::new::( + "No BAM files provided" + )); + } + + if batch_size == 0 { + return Err(PyErr::new::( + "batch_size must be greater than 0" + )); + } + + // Validate all input files exist + for bam_path in &bam_paths { + let input_path = Path::new(bam_path); + if !input_path.exists() { + return Err(PyErr::new::( + format!("BAM file does not exist: {}", bam_path) + )); + } + } + + let output_path = Path::new(parquet_path); + let effective_batch_size = batch_size.min(1000000); + + // Create output directory if needed + if let Some(parent) = output_path.parent() { + if !parent.as_os_str().is_empty() { + std::fs::create_dir_all(parent) + .map_err(|e| PyErr::new::( + format!("Failed to create output directory: {}", e) + ))?; + } + } + + // Create schema (with optional source_file column) + let schema = create_bam_schema_with_source(include_sequence, include_quality, include_source_file); + + let writer_props = WriterProperties::builder() + .set_compression(parse_compression(compression)) + .set_encoding(Encoding::PLAIN) + .build(); + + let output_file = File::create(output_path) + .map_err(|e| PyErr::new::(format!("Failed to create output file '{}': {}", parquet_path, e)))?; + + let mut parquet_writer = ArrowWriter::try_new(output_file, schema.clone(), Some(writer_props)) + .map_err(|e| PyErr::new::(format!("Failed to create Parquet writer: {}", e)))?; + + let mut reusable_buffers = ReusableBuffers::new(effective_batch_size); + let mut total_records: usize = 0; + let mut batch_count = 0; + let target_records = limit.unwrap_or(usize::MAX); + let num_files = bam_paths.len(); + + eprintln!("Processing {} BAM files to {}", num_files, parquet_path); + + for (file_idx, bam_path) in bam_paths.iter().enumerate() { + // Check if we've reached the limit + if total_records >= target_records { + eprintln!("Reached limit of {} records", target_records); + break; + } + + // Extract filename for source_file column + let source_filename = Path::new(bam_path) + .file_name() + .and_then(|n| n.to_str()) + .unwrap_or(bam_path) + .to_string(); + + eprintln!("[{}/{}] Processing: {}", file_idx + 1, num_files, source_filename); + + let mut file = File::open(bam_path) + .map_err(|e| PyErr::new::(format!("Failed to open BAM file '{}': {}", bam_path, e)))?; + + let mut bam_reader = bam::io::Reader::new(&mut file); + + let header = bam_reader.read_header() + .map_err(|e| PyErr::new::(format!("Failed to read BAM header for '{}': {}", bam_path, e)))?; + + let mut file_records = 0; + + loop { + // Check for Python interrupts periodically + if batch_count % 10 == 0 { + Python::with_gil(|py| { + py.check_signals().map_err(|e| { + eprintln!("Conversion interrupted by user"); + e + }) + })?; + } + + let remaining_records = target_records.saturating_sub(total_records); + if remaining_records == 0 { + break; + } + + let current_batch_size = effective_batch_size.min(remaining_records); + + let batch = read_bam_batch_enhanced( + &mut bam_reader, + &header, + current_batch_size, + include_sequence, + include_quality, + &mut reusable_buffers, + )?; + + if batch.num_rows() == 0 { + break; + } + + // If source_file column is requested, add it to the batch + let final_batch = if include_source_file { + add_source_file_column(&batch, &source_filename, &schema)? + } else { + batch + }; + + parquet_writer.write(&final_batch) + .map_err(|e| PyErr::new::(format!("Failed to write batch {}: {}", batch_count, e)))?; + + let rows_written = final_batch.num_rows(); + total_records += rows_written; + file_records += rows_written; + batch_count += 1; + + // Progress reporting every 200 batches + if batch_count % 200 == 0 { + let progress_msg = if let Some(limit_val) = limit { + format!("Processed {} batches, {} / {} records ({:.1}%)", + batch_count, total_records, limit_val, + 100.0 * total_records as f64 / limit_val as f64) + } else { + format!("Processed {} batches, {} total records", + batch_count, total_records) + }; + eprintln!("Progress: {}", progress_msg); + + // Force garbage collection every 400 batches + if batch_count % 400 == 0 { + Python::with_gil(|py| { + let gc = py.import_bound("gc").unwrap(); + let _ = gc.call_method0("collect"); + }); + } + } + } + + eprintln!(" -> {} records from {}", file_records, source_filename); + // BAM reader and file are dropped here, releasing memory + } + + parquet_writer.close() + .map_err(|e| PyErr::new::(format!("Failed to close Parquet writer: {}", e)))?; + + let completion_msg = if limit.is_some() { + format!("Conversion complete: {} records from {} files (limited) written to {}", + total_records, num_files, parquet_path) + } else { + format!("Conversion complete: {} records from {} files written to {}", + total_records, num_files, parquet_path) + }; + eprintln!("{}", completion_msg); + Ok(()) +} + +/// Create BAM schema with optional source_file column +fn create_bam_schema_with_source(include_sequence: bool, include_quality: bool, include_source_file: bool) -> Arc { + let mut fields = vec![ + Field::new("name", DataType::Utf8, false), + Field::new("chrom", DataType::Utf8, true), + Field::new("start", DataType::UInt32, true), + Field::new("end", DataType::UInt32, true), + Field::new("flags", DataType::UInt32, false), + ]; + + if include_sequence { + fields.push(Field::new("sequence", DataType::Utf8, true)); + } + + if include_quality { + fields.push(Field::new("quality_scores", DataType::Utf8, true)); + } + + if include_source_file { + fields.push(Field::new("source_file", DataType::Utf8, false)); + } + + Arc::new(Schema::new(fields)) +} + +/// Add source_file column to an existing RecordBatch +fn add_source_file_column(batch: &RecordBatch, source_filename: &str, schema: &Arc) -> PyResult { + let num_rows = batch.num_rows(); + let source_array: ArrayRef = Arc::new(StringArray::from(vec![source_filename; num_rows])); + + let mut columns: Vec = batch.columns().to_vec(); + columns.push(source_array); + + RecordBatch::try_new(schema.clone(), columns) + .map_err(|e| PyErr::new::(format!("Failed to add source_file column: {}", e))) +} + +#[pyfunction] +#[pyo3(signature = ( + bam_path, + arrow_ipc_path, batch_size = 50000, include_sequence = true, include_quality = true, diff --git a/src/expressions.rs b/src/expressions.rs index 9726dc6..ebc2777 100755 --- a/src/expressions.rs +++ b/src/expressions.rs @@ -235,6 +235,212 @@ fn extract_cigar_insertions_expr(inputs: &[Series]) -> PolarsResult { Ok(results.into_series()) } +// ============================================================================ +// CIGAR Alignment Expansion Functions +// ============================================================================ + +/// Expand CIGAR into aligned reference and query strings with dashes for gaps. +/// +/// Returns (aligned_ref, aligned_query) where: +/// - aligned_ref has dashes where the query has insertions +/// - aligned_query has dashes where the reference has deletions +/// - Soft-clipped bases appear as lowercase in the query +/// +/// CIGAR operations: +/// - M/=/X: match/mismatch - consume both ref and query (uppercase) +/// - I: insertion - add dashes to ref, consume query (uppercase) +/// - D/N: deletion/skip - consume ref, add dashes to query +/// - S: soft clip - add dashes to ref, consume query as LOWERCASE +/// - H: hard clip - skip (bases not in query sequence) +fn expand_cigar_alignment(ref_seq: &str, query_seq: &str, cigar: &str) -> Option<(String, String)> { + let ref_bytes = ref_seq.as_bytes(); + let query_bytes = query_seq.as_bytes(); + + let mut aligned_ref = String::with_capacity(ref_seq.len() + query_seq.len()); + let mut aligned_query = String::with_capacity(ref_seq.len() + query_seq.len()); + + let mut ref_pos: usize = 0; + let mut query_pos: usize = 0; + let mut num_buf = String::new(); + + for c in cigar.chars() { + if c.is_ascii_digit() { + num_buf.push(c); + } else { + if let Ok(len) = num_buf.parse::() { + match c { + 'M' | '=' | 'X' => { + // Match/mismatch: consume both ref and query (uppercase) + for _ in 0..len { + if ref_pos < ref_bytes.len() { + aligned_ref.push((ref_bytes[ref_pos] as char).to_ascii_uppercase()); + ref_pos += 1; + } + if query_pos < query_bytes.len() { + aligned_query.push((query_bytes[query_pos] as char).to_ascii_uppercase()); + query_pos += 1; + } + } + } + 'I' => { + // Insertion: add dashes to ref, consume query (uppercase) + for _ in 0..len { + aligned_ref.push('-'); + if query_pos < query_bytes.len() { + aligned_query.push((query_bytes[query_pos] as char).to_ascii_uppercase()); + query_pos += 1; + } + } + } + 'D' | 'N' => { + // Deletion/skip: consume ref, add dashes to query + for _ in 0..len { + if ref_pos < ref_bytes.len() { + aligned_ref.push((ref_bytes[ref_pos] as char).to_ascii_uppercase()); + ref_pos += 1; + } + aligned_query.push('-'); + } + } + 'S' => { + // Soft clip: add dashes to ref, consume query as LOWERCASE + for _ in 0..len { + aligned_ref.push('-'); + if query_pos < query_bytes.len() { + aligned_query.push((query_bytes[query_pos] as char).to_ascii_lowercase()); + query_pos += 1; + } + } + } + 'H' | 'P' => { + // Hard clip/padding: bases not in query, skip + } + _ => {} + } + } + num_buf.clear(); + } + } + + Some((aligned_ref, aligned_query)) +} + +/// Polars expression: Expand CIGAR to produce aligned reference string +/// +/// Takes 3 inputs: +/// - inputs[0]: reference sequence (scalar or column) +/// - inputs[1]: query sequence column +/// - inputs[2]: CIGAR string column +/// +/// Returns: aligned reference string with dashes where query has insertions +#[polars_expr(output_type=String)] +fn cigar_aligned_ref_expr(inputs: &[Series]) -> PolarsResult { + let ref_ca: &StringChunked = inputs[0].str()?; + let query_ca: &StringChunked = inputs[1].str()?; + let cigar_ca: &StringChunked = inputs[2].str()?; + + // Handle scalar reference (broadcast single value to all rows) + let is_scalar_ref = ref_ca.len() == 1 && query_ca.len() > 1; + let scalar_ref = if is_scalar_ref { + ref_ca.get(0) + } else { + None + }; + + let results: StringChunked = if is_scalar_ref { + // Broadcast scalar reference to all rows + query_ca + .into_iter() + .zip(cigar_ca.into_iter()) + .map(|(query_opt, cigar_opt)| { + match (scalar_ref, query_opt, cigar_opt) { + (Some(ref_seq), Some(query_seq), Some(cigar)) => { + expand_cigar_alignment(ref_seq, query_seq, cigar) + .map(|(aligned_ref, _)| aligned_ref) + } + _ => None, + } + }) + .collect() + } else { + // All inputs are columns of same length + ref_ca + .into_iter() + .zip(query_ca.into_iter()) + .zip(cigar_ca.into_iter()) + .map(|((ref_opt, query_opt), cigar_opt)| { + match (ref_opt, query_opt, cigar_opt) { + (Some(ref_seq), Some(query_seq), Some(cigar)) => { + expand_cigar_alignment(ref_seq, query_seq, cigar) + .map(|(aligned_ref, _)| aligned_ref) + } + _ => None, + } + }) + .collect() + }; + + Ok(results.into_series()) +} + +/// Polars expression: Expand CIGAR to produce aligned query string +/// +/// Takes 3 inputs: +/// - inputs[0]: reference sequence (scalar or column) +/// - inputs[1]: query sequence column +/// - inputs[2]: CIGAR string column +/// +/// Returns: aligned query string with dashes where reference has deletions +#[polars_expr(output_type=String)] +fn cigar_aligned_query_expr(inputs: &[Series]) -> PolarsResult { + let ref_ca: &StringChunked = inputs[0].str()?; + let query_ca: &StringChunked = inputs[1].str()?; + let cigar_ca: &StringChunked = inputs[2].str()?; + + // Handle scalar reference (broadcast single value to all rows) + let is_scalar_ref = ref_ca.len() == 1 && query_ca.len() > 1; + let scalar_ref = if is_scalar_ref { + ref_ca.get(0) + } else { + None + }; + + let results: StringChunked = if is_scalar_ref { + // Broadcast scalar reference to all rows + query_ca + .into_iter() + .zip(cigar_ca.into_iter()) + .map(|(query_opt, cigar_opt)| { + match (scalar_ref, query_opt, cigar_opt) { + (Some(ref_seq), Some(query_seq), Some(cigar)) => { + expand_cigar_alignment(ref_seq, query_seq, cigar) + .map(|(_, aligned_query)| aligned_query) + } + _ => None, + } + }) + .collect() + } else { + // All inputs are columns of same length + ref_ca + .into_iter() + .zip(query_ca.into_iter()) + .zip(cigar_ca.into_iter()) + .map(|((ref_opt, query_opt), cigar_opt)| { + match (ref_opt, query_opt, cigar_opt) { + (Some(ref_seq), Some(query_seq), Some(cigar)) => { + expand_cigar_alignment(ref_seq, query_seq, cigar) + .map(|(_, aligned_query)| aligned_query) + } + _ => None, + } + }) + .collect() + }; + + Ok(results.into_series()) +} + // ============================================================================ // Original CIGAR parsing (indel positions only, no sequences) // ============================================================================ diff --git a/src/lib.rs b/src/lib.rs index f0e1ff5..e0ba33d 100755 --- a/src/lib.rs +++ b/src/lib.rs @@ -33,7 +33,7 @@ mod umi_score; use crate::single_fastq::{fastq_to_parquet}; use crate::fracture::{fracture_fasta, fracture_sequences}; -use crate::bam::{bam_to_parquet, bam_to_arrow_ipc, bam_to_arrow_ipc_parallel, bam_to_arrow_ipc_gzp_parallel}; +use crate::bam::{bam_to_parquet, bams_to_parquet, bam_to_arrow_ipc, bam_to_arrow_ipc_parallel, bam_to_arrow_ipc_gzp_parallel}; #[cfg(feature = "htslib")] use crate::bam::{bam_to_arrow_ipc_htslib_parallel, bam_to_arrow_ipc_htslib_multi_reader_parallel, bam_to_arrow_ipc_htslib_optimized, bam_to_arrow_ipc_htslib_mmap_parallel}; #[cfg(feature = "htslib")] @@ -487,6 +487,7 @@ fn rogtk(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_function(wrap_pyfunction!(fracture_fasta, m)?)?; m.add_function(wrap_pyfunction!(fracture_sequences, m)?)?; m.add_function(wrap_pyfunction!(bam_to_parquet, m)?)?; + m.add_function(wrap_pyfunction!(bams_to_parquet, m)?)?; m.add_function(wrap_pyfunction!(bam_to_arrow_ipc, m)?)?; m.add_function(wrap_pyfunction!(bam_to_arrow_ipc_parallel, m)?)?; m.add_function(wrap_pyfunction!(bam_to_arrow_ipc_gzp_parallel, m)?)?; From dda88a05cfd909572331f6947fd2859fe2856bcf Mon Sep 17 00:00:00 2001 From: tzeitim Date: Tue, 10 Feb 2026 11:41:51 +0200 Subject: [PATCH 5/5] bumped version 0.1.21 --- Cargo.toml | 2 +- pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8477306..c2f1f41 100755 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rogtk" -version = "0.1.20" +version = "0.1.21" authors = [ "Pedro Olivares Chauvet ", "Pedro Olivares Chauvet ", diff --git a/pyproject.toml b/pyproject.toml index c721221..14cd2db 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,7 +8,7 @@ build-backend = "maturin" [project] name = "rogtk" -version = "0.1.20" +version = "0.1.21" description = "Rust-based genomics toolkit - high-performance backend for ogtk" authors = [ {name = "Pedro Olivares", email = "pedroy.final@gmail.com"}