diff --git a/Cargo.toml b/Cargo.toml index 22ae50a..21021c2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = ["codegen", "examples", "performance_measurement", "performance_measur [package] name = "worktable" -version = "0.9.0-alpha8" +version = "0.9.0-beta0.1.1" edition = "2024" authors = ["Handy-caT"] license = "MIT" @@ -19,15 +19,15 @@ s3-support = ["dep:rusty-s3", "dep:url", "dep:reqwest", "dep:walkdir", "worktabl [dependencies] async-trait = "0.1.89" convert_case = "0.6.0" -data_bucket = "=0.3.12" +data_bucket = "=0.3.13" # data_bucket = { git = "https://github.com/pathscale/DataBucket", branch = "page_cdc_correction", version = "0.2.7" } # data_bucket = { path = "../DataBucket", version = "0.3.11" } derive_more = { version = "2.0.1", features = ["from", "error", "display", "debug", "into"] } eyre = "0.6.12" fastrand = "2.3.0" futures = "0.3.30" -indexset = { version = "=0.14.0", features = ["concurrent", "cdc", "multimap"] } -# indexset = { package = "wt-indexset", path = "../indexset", version = "0.12.10", features = ["concurrent", "cdc", "multimap"] } +indexset = { version = "=0.16.0", features = ["concurrent", "cdc", "multimap"] } +# indexset = { path = "../indexset", version = "0.15.0", features = ["concurrent", "cdc", "multimap"] } # indexset = { package = "wt-indexset", version = "=0.12.12", features = ["concurrent", "cdc", "multimap"] } lockfree = { version = "0.5.1" } log = "0.4.29" @@ -39,14 +39,14 @@ prettytable-rs = "^0.10" psc-nanoid = { version = "3.1.1", features = ["rkyv", "packed"] } rkyv = { version = "0.8.9", features = ["uuid-1"] } reqwest = { version = "0.12", optional = true, default-features = false, features = ["rustls-tls-webpki-roots", "charset", "http2"] } -rusty-s3 = { version = "0.9.0", optional = true } +rusty-s3 = { package = "rusty-s3-temp", version = "0.9.0", optional = true } smart-default = "0.7.1" tokio = { version = "1", features = ["full"] } tracing = "0.1" url = { version = "2", optional = true } uuid = 
{ version = "1.10.0", features = ["v4", "v7"] } walkdir = { version = "2", optional = true } -worktable_codegen = { path = "codegen", version = "=0.9.0-alpha4" } +worktable_codegen = { path = "codegen", version = "=0.9.0-beta0.1.0" } [dev-dependencies] chrono = "0.4.43" diff --git a/codegen/Cargo.toml b/codegen/Cargo.toml index 262ae69..8025a62 100644 --- a/codegen/Cargo.toml +++ b/codegen/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "worktable_codegen" -version = "0.9.0-alpha4" +version = "0.9.0-beta0.1.0" edition = "2024" license = "MIT" description = "WorkTable codegeneration crate" @@ -18,4 +18,5 @@ rkyv = { version = "0.7.45" } syn = { version = "2.0.74", features = ["full"] } quote = "1.0.36" proc-macro2 = "1.0.86" -convert_case = "0.6.0" \ No newline at end of file +convert_case = "0.6.0" +indexmap = "2" \ No newline at end of file diff --git a/codegen/src/persist_table/generator/space_file/mod.rs b/codegen/src/persist_table/generator/space_file/mod.rs index b0db26b..b0f9e29 100644 --- a/codegen/src/persist_table/generator/space_file/mod.rs +++ b/codegen/src/persist_table/generator/space_file/mod.rs @@ -179,8 +179,6 @@ impl Generator { C: Clone + PersistenceConfig, { let mut page_id = 1; - println!("{:?}", self.data_info.inner); - let data = self.data.into_iter().map(|p| { let mut data = Data::from_data_page(p); data.set_page_id(page_id.into()); diff --git a/codegen/src/worktable/generator/index/cdc.rs b/codegen/src/worktable/generator/index/cdc.rs index 20edfff..2178333 100644 --- a/codegen/src/worktable/generator/index/cdc.rs +++ b/codegen/src/worktable/generator/index/cdc.rs @@ -17,6 +17,7 @@ impl Generator { let save_row_cdc = self.gen_save_row_cdc_index_fn(); let reinsert_row_cdc = self.gen_reinsert_row_cdc_index_fn(); let delete_row_cdc = self.gen_delete_row_cdc_index_fn(); + let delete_from_indexes_cdc = self.gen_delete_from_indexes_cdc_index_fn(); let process_difference_insert_cdc = self.gen_process_difference_insert_cdc_index_fn(); let 
process_difference_remove_cdc = self.gen_process_difference_remove_cdc_index_fn(); @@ -25,6 +26,7 @@ impl Generator { #reinsert_row_cdc #save_row_cdc #delete_row_cdc + #delete_from_indexes_cdc #process_difference_insert_cdc #process_difference_remove_cdc } @@ -50,13 +52,16 @@ impl Generator { let index_variant: TokenStream = camel_case_name.parse().unwrap(); quote! { + partial_events.#index_field_name = vec![]; let #index_field_name = if let Some(events) = self.#index_field_name.insert_checked_cdc(row.#i.clone(), link) { - events.into_iter().map(|ev| ev.into()).collect() + let evs: Vec<_> = events.into_iter().map(|ev| ev.into()).collect(); + partial_events.#index_field_name = evs.clone(); + evs } else { - return Err(IndexError::AlreadyExists { + return (partial_events, Err(IndexError::AlreadyExists { at: #available_index_ident::#index_variant, inserted_already: inserted_indexes.clone(), - }); + })); }; inserted_indexes.push(#available_index_ident::#index_variant); } @@ -70,15 +75,14 @@ impl Generator { .collect::>(); quote! { - fn save_row_cdc(&self, row: #row_type_ident, link: Link) -> Result<#events_ident, IndexError<#available_index_ident>> { + fn save_row_cdc(&self, row: #row_type_ident, link: Link) -> (#events_ident, Result<(), IndexError<#available_index_ident>>) { let mut inserted_indexes: Vec<#available_index_ident> = vec![]; + let mut partial_events = #events_ident::default(); #(#save_rows)* - core::result::Result::Ok( - #events_ident { - #(#idents,)* - } - ) + (#events_ident { + #(#idents,)* + }, Ok(())) } } } @@ -122,13 +126,16 @@ impl Generator { let insert = if idx.is_unique { quote! 
{ let mut #index_field_name = if row_new.#i != row_old.#i { + partial_events.#index_field_name = vec![]; let #index_field_name: Vec<_> = if let Some(events) = self.#index_field_name.insert_checked_cdc(row_new.#i.clone(), link_new) { - events.into_iter().map(|ev| ev.into()).collect() + let evs: Vec<_> = events.into_iter().map(|ev| ev.into()).collect(); + partial_events.#index_field_name = evs.clone(); + evs } else { - return Err(IndexError::AlreadyExists { + return (partial_events, Err(IndexError::AlreadyExists { at: #available_index_ident::#index_variant, inserted_already: inserted_indexes.clone(), - }); + })); }; inserted_indexes.push(#available_index_ident::#index_variant); @@ -159,16 +166,15 @@ impl Generator { link_old: Link, row_new: #row_type_ident, link_new: Link - ) -> Result<#events_ident, IndexError<#available_index_ident>> { + ) -> (#events_ident, Result<(), IndexError<#available_index_ident>>) { let mut inserted_indexes: Vec<#available_index_ident> = vec![]; + let mut partial_events = #events_ident::default(); #(#insert_rows)* #(#remove_rows)* - core::result::Result::Ok( - #events_ident { - #(#idents,)* - } - ) + (#events_ident { + #(#idents,)* + }, Ok(())) } } } @@ -199,13 +205,82 @@ impl Generator { .collect::>(); quote! 
{ - fn delete_row_cdc(&self, row: #row_type_ident, link: Link) -> Result<#events_ident, IndexError<#available_index_ident>> { + fn delete_row_cdc(&self, row: #row_type_ident, link: Link) -> (#events_ident, Result<(), IndexError<#available_index_ident>>) { #(#delete_rows)* - core::result::Result::Ok( - #events_ident { - #(#idents,)* + (#events_ident { + #(#idents,)* + }, Ok(())) + } + } + } + + fn gen_delete_from_indexes_cdc_index_fn(&self) -> TokenStream { + let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); + let row_type_ident = name_generator.get_row_type_ident(); + let events_ident = name_generator.get_space_secondary_index_events_ident(); + let available_index_ident = name_generator.get_available_indexes_ident(); + + let matches = self + .columns + .indexes + .iter() + .map(|(i, idx)| { + let index_field_name = &idx.name; + let camel_case_name = index_field_name + .to_string() + .from_case(Case::Snake) + .to_case(Case::Pascal); + let index_variant: TokenStream = camel_case_name.parse().unwrap(); + let type_str = self.columns + .columns_map + .get(i) + .unwrap() + .to_string(); + let row = if is_float(type_str.as_str()) { + quote! { + OrderedFloat(row.#i) } - ) + } else if type_str == "String" { + quote! { + row.#i.clone() + } + } else { + quote! { + row.#i + } + }; + + quote! { + #available_index_ident::#index_variant => { + let (_, events) = TableIndexCdc::remove_cdc(&self.#index_field_name, #row, link); + partial_events.#index_field_name = events.into_iter().map(|ev| ev.into()).collect(); + }, + } + }) + .collect::>(); + + let inner = if matches.is_empty() { + quote! {} + } else { + quote! { + for index in indexes { + match index { + #(#matches)* + } + } + } + }; + + quote! 
{ + fn delete_from_indexes_cdc( + &self, + row: #row_type_ident, + link: Link, + indexes: Vec<#available_index_ident>, + ) -> (#events_ident, Result<(), IndexError<#available_index_ident>>) { + let mut partial_events = #events_ident::default(); + #inner + (partial_events, Ok(())) } } } @@ -261,13 +336,11 @@ impl Generator { &self, link: Link, difference: std::collections::HashMap<&str, Difference<#avt_type_ident>> - ) -> Result<#events_ident, IndexError<#available_index_ident>> { + ) -> (#events_ident, Result<(), IndexError<#available_index_ident>>) { #(#process_difference_rows)* - core::result::Result::Ok( - #events_ident { - #(#idents,)* - } - ) + (#events_ident { + #(#idents,)* + }, Ok(())) } } } @@ -304,13 +377,16 @@ impl Generator { let mut events = vec![]; if let #avt_type_ident::#variant_ident(new) = &diff.new { let key_new = #new_value_expr; - if let Some(evs) = TableIndexCdc::insert_checked_cdc(&self.#index_field_name, key_new, link) { + partial_events.#index_field_name = vec![]; + if let Some(evs) = TableIndexCdc::insert_checked_cdc(&self.#index_field_name, key_new, link) { + let evs: Vec<_> = evs.into_iter().collect(); + partial_events.#index_field_name = evs.clone(); events.extend_from_slice(evs.as_ref()); } else { - return Err(IndexError::AlreadyExists { + return (partial_events, Err(IndexError::AlreadyExists { at: #available_index_ident::#index_variant, inserted_already: inserted_indexes.clone(), - }); + })); } inserted_indexes.push(#available_index_ident::#index_variant); } @@ -335,15 +411,14 @@ impl Generator { &self, link: Link, difference: std::collections::HashMap<&str, Difference<#avt_type_ident>> - ) -> Result<#events_ident, IndexError<#available_index_ident>> { + ) -> (#events_ident, Result<(), IndexError<#available_index_ident>>) { let mut inserted_indexes: Vec<#available_index_ident> = vec![]; + let mut partial_events = #events_ident::default(); #(#process_difference_insert_rows)* - core::result::Result::Ok( - #events_ident { - #(#idents,)* 
- } - ) + (#events_ident { + #(#idents,)* + }, Ok(())) } } } diff --git a/codegen/src/worktable/generator/queries/delete.rs b/codegen/src/worktable/generator/queries/delete.rs index c77a946..66c72b2 100644 --- a/codegen/src/worktable/generator/queries/delete.rs +++ b/codegen/src/worktable/generator/queries/delete.rs @@ -81,7 +81,8 @@ impl Generator { let process = if self.is_persist { quote! { - let secondary_keys_events = self.0.indexes.delete_row_cdc(row, link)?; + let (secondary_keys_events, res) = self.0.indexes.delete_row_cdc(row, link); + res?; let (_, primary_key_events) = self.0.primary_index.remove_cdc(pk.clone(), link); self.0.data.delete(link).map_err(WorkTableError::PagesError)?; let mut op: Operation< diff --git a/codegen/src/worktable/generator/queries/update.rs b/codegen/src/worktable/generator/queries/update.rs index 5465e8a..50c2953 100644 --- a/codegen/src/worktable/generator/queries/update.rs +++ b/codegen/src/worktable/generator/queries/update.rs @@ -351,28 +351,45 @@ impl Generator { }; let process_difference = if self.is_persist { + let secondary_events_ident = name_generator.get_space_secondary_index_events_ident(); if idx_idents.is_some() { quote! 
{ - let indexes_res = self.0.indexes.process_difference_insert_cdc(link, diffs.clone()); + let (secondary_events, indexes_res): (#secondary_events_ident, _) = self.0.indexes.process_difference_insert_cdc(link, diffs.clone()); if let Err(e) = indexes_res { return match e { IndexError::AlreadyExists { at, inserted_already, } => { - self.0.indexes - .delete_from_indexes(row_new.merge(row_old.clone()), link, inserted_already)?; + // Generate rollback CDC events for secondary indexes + let (rollback_secondary_events, _): (#secondary_events_ident, _) = self.0.indexes.delete_from_indexes_cdc( + row_new.merge(row_old.clone()), + link, + inserted_already + ); + + // Merge original partial insert events with rollback events + let mut merged_events = secondary_events.clone(); + merged_events.extend(rollback_secondary_events); + + // Create AcknowledgeOperation with all events + let ack_op = Operation::Acknowledge(AcknowledgeOperation { + id: OperationId::Single(uuid::Uuid::now_v7()), + primary_key_events: vec![], // Updates don't modify primary key + secondary_keys_events: merged_events, + }); + self.1.apply_operation(ack_op); Err(WorkTableError::AlreadyExists(at.to_string_value())) } IndexError::NotFound => Err(WorkTableError::NotFound), }; } - let mut secondary_keys_events = indexes_res.expect("was just checked for correctness"); + let mut secondary_keys_events = secondary_events; } } else { quote! { - let secondary_keys_events = core::default::Default::default(); + let secondary_keys_events: #secondary_events_ident = core::default::Default::default(); } } } else if idx_idents.is_some() { @@ -408,7 +425,8 @@ impl Generator { let process_difference = if self.is_persist { if idx_idents.is_some() { quote! 
{ - let secondary_keys_events_remove = self.0.indexes.process_difference_remove_cdc(link, diffs)?; + let (secondary_keys_events_remove, res) = self.0.indexes.process_difference_remove_cdc(link, diffs); + res?; op.extend_secondary_key_events(secondary_keys_events_remove); } } else { diff --git a/codegen/src/worktable/generator/table/impls.rs b/codegen/src/worktable/generator/table/impls.rs index 1b6d92f..172a85d 100644 --- a/codegen/src/worktable/generator/table/impls.rs +++ b/codegen/src/worktable/generator/table/impls.rs @@ -153,12 +153,15 @@ impl Generator { let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); let row_type = name_generator.get_row_type_ident(); let primary_key_type = name_generator.get_primary_key_type_ident(); + let secondary_events_ident = name_generator.get_space_secondary_index_events_ident(); let insert = if self.is_persist { quote! { - let (pk, op) = self.0.insert_cdc(row)?; - self.1.apply_operation(op); - core::result::Result::Ok(pk) + let (op, res) = self.0.insert_cdc::<#secondary_events_ident>(row); + if let Some(op) = op { + self.1.apply_operation(op); + } + res } } else { quote! { @@ -177,12 +180,15 @@ impl Generator { let name_generator = WorktableNameGenerator::from_table_name(self.name.to_string()); let row_type = name_generator.get_row_type_ident(); let primary_key_type = name_generator.get_primary_key_type_ident(); + let secondary_events_ident = name_generator.get_space_secondary_index_events_ident(); let reinsert = if self.is_persist { quote! { - let (pk, op) = self.0.reinsert_cdc(row_old, row_new)?; - self.1.apply_operation(op); - core::result::Result::Ok(pk) + let (op, res) = self.0.reinsert_cdc::<#secondary_events_ident>(row_old, row_new); + if let Some(op) = op { + self.1.apply_operation(op); + } + res } } else { quote! 
{ diff --git a/codegen/src/worktable/model/column.rs b/codegen/src/worktable/model/column.rs index 4090c8c..e0de2df 100644 --- a/codegen/src/worktable/model/column.rs +++ b/codegen/src/worktable/model/column.rs @@ -1,3 +1,4 @@ +use indexmap::IndexMap; use std::collections::HashMap; use crate::worktable::model::GeneratorType; @@ -15,7 +16,7 @@ pub struct Columns { pub is_sized: bool, pub columns_map: HashMap, pub field_positions: HashMap, - pub indexes: HashMap, + pub indexes: IndexMap, pub primary_keys: Vec, pub generator_type: GeneratorType, } diff --git a/codegen/src/worktable/parser/index.rs b/codegen/src/worktable/parser/index.rs index 90c9310..e40c920 100644 --- a/codegen/src/worktable/parser/index.rs +++ b/codegen/src/worktable/parser/index.rs @@ -1,11 +1,11 @@ use crate::worktable::Parser; use crate::worktable::model::Index; +use indexmap::IndexMap; use proc_macro2::{Delimiter, Ident, TokenTree}; -use std::collections::HashMap; use syn::spanned::Spanned; impl Parser { - pub fn parse_indexes(&mut self) -> syn::Result> { + pub fn parse_indexes(&mut self) -> syn::Result> { let ident = self.input_iter.next().ok_or(syn::Error::new( self.input.span(), "Expected `indexes` field in declaration", @@ -44,7 +44,7 @@ impl Parser { let mut parser = Parser::new(tt); - let mut rows = HashMap::new(); + let mut rows = IndexMap::new(); let mut ind = true; while ind { diff --git a/src/features/s3_support.rs b/src/features/s3_support.rs index 5e3e54c..c67d910 100644 --- a/src/features/s3_support.rs +++ b/src/features/s3_support.rs @@ -106,9 +106,7 @@ where let region = config.region.clone().unwrap_or_else(|| "auto".to_string()); let bucket = Bucket::new(endpoint, UrlStyle::Path, config.bucket_name.clone(), region)?; - let client = Client::builder() - .timeout(Duration::from_secs(30)) - .build()?; + let client = Client::builder().timeout(Duration::from_secs(30)).build()?; Ok((bucket, credentials, client)) } @@ -188,11 +186,7 @@ where action.with_delimiter("/"); let url = 
action.sign(Duration::from_secs(3600)); - let response = client - .get(url) - .send() - .await? - .error_for_status()?; + let response = client.get(url).send().await?.error_for_status()?; let text = response.text().await?; let parsed = ListObjectsV2::parse_response(&text)?; @@ -221,11 +215,7 @@ where let action = bucket.get_object(Some(credentials), s3_key); let url = action.sign(Duration::from_secs(3600)); - let response = client - .get(url) - .send() - .await? - .error_for_status()?; + let response = client.get(url).send().await?.error_for_status()?; let content = response.bytes().await?; tokio::fs::write(&local_path, content).await?; @@ -315,4 +305,4 @@ where fn config(&self) -> &Self::Config { &self.config } -} \ No newline at end of file +} diff --git a/src/index/primary_index.rs b/src/index/primary_index.rs index 8bd9440..7d53dec 100644 --- a/src/index/primary_index.rs +++ b/src/index/primary_index.rs @@ -12,7 +12,7 @@ use indexset::core::node::NodeLike; use indexset::core::pair::Pair; use crate::util::OffsetEqLink; -use crate::{IndexMap, TableIndex, TableIndexCdc, convert_change_events}; +use crate::{convert_change_events, IndexMap, TableIndex, TableIndexCdc}; /// Combined storage for primary and reverse indexes. 
/// @@ -66,13 +66,7 @@ where fn insert_checked(&self, value: PrimaryKey, link: Link) -> Option<()> { let offset_link = OffsetEqLink(link); self.pk_map.checked_insert(value.clone(), offset_link)?; - if self - .reverse_pk_map - .checked_insert(offset_link, value) - .is_none() - { - return None; - } + self.reverse_pk_map.checked_insert(offset_link, value)?; Some(()) } diff --git a/src/index/table_secondary_index/cdc.rs b/src/index/table_secondary_index/cdc.rs index 88f7e7c..27cd560 100644 --- a/src/index/table_secondary_index/cdc.rs +++ b/src/index/table_secondary_index/cdc.rs @@ -9,29 +9,35 @@ pub trait TableSecondaryIndexCdc Result>; + ) -> (SecondaryEvents, Result<(), IndexError>); fn reinsert_row_cdc( &self, row_old: Row, link_old: Link, row_new: Row, link_new: Link, - ) -> Result>; + ) -> (SecondaryEvents, Result<(), IndexError>); fn delete_row_cdc( &self, row: Row, link: Link, - ) -> Result>; + ) -> (SecondaryEvents, Result<(), IndexError>); + fn delete_from_indexes_cdc( + &self, + row: Row, + link: Link, + indexes: Vec, + ) -> (SecondaryEvents, Result<(), IndexError>); fn process_difference_insert_cdc( &self, link: Link, differences: HashMap<&str, Difference>, - ) -> Result>; + ) -> (SecondaryEvents, Result<(), IndexError>); fn process_difference_remove_cdc( &self, link: Link, differences: HashMap<&str, Difference>, - ) -> Result>; + ) -> (SecondaryEvents, Result<(), IndexError>); } impl @@ -39,8 +45,8 @@ impl where T: TableSecondaryIndex, { - fn save_row_cdc(&self, row: Row, link: Link) -> Result<(), IndexError> { - self.save_row(row, link) + fn save_row_cdc(&self, row: Row, link: Link) -> ((), Result<(), IndexError>) { + ((), self.save_row(row, link)) } fn reinsert_row_cdc( @@ -49,27 +55,36 @@ where link_old: Link, row_new: Row, link_new: Link, - ) -> Result<(), IndexError> { - self.reinsert_row(row_old, link_old, row_new, link_new) + ) -> ((), Result<(), IndexError>) { + ((), self.reinsert_row(row_old, link_old, row_new, link_new)) + } + + fn 
delete_row_cdc(&self, row: Row, link: Link) -> ((), Result<(), IndexError>) { + ((), self.delete_row(row, link)) } - fn delete_row_cdc(&self, row: Row, link: Link) -> Result<(), IndexError> { - self.delete_row(row, link) + fn delete_from_indexes_cdc( + &self, + row: Row, + link: Link, + indexes: Vec, + ) -> ((), Result<(), IndexError>) { + ((), self.delete_from_indexes(row, link, indexes)) } fn process_difference_insert_cdc( &self, link: Link, differences: HashMap<&str, Difference>, - ) -> Result<(), IndexError> { - self.process_difference_insert(link, differences) + ) -> ((), Result<(), IndexError>) { + ((), self.process_difference_insert(link, differences)) } fn process_difference_remove_cdc( &self, link: Link, differences: HashMap<&str, Difference>, - ) -> Result<(), IndexError> { - self.process_difference_remove(link, differences) + ) -> ((), Result<(), IndexError>) { + ((), self.process_difference_remove(link, differences)) } } diff --git a/src/lib.rs b/src/lib.rs index 7015e18..03f3f9f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -31,7 +31,7 @@ pub mod prelude { pub use crate::lock::{LockGuard, LockMap}; pub use crate::mem_stat::MemStat; pub use crate::persistence::{ - DeleteOperation, DiskConfig, DiskPersistenceEngine, IndexTableOfContents, InsertOperation, + AcknowledgeOperation, DeleteOperation, DiskConfig, DiskPersistenceEngine, IndexTableOfContents, InsertOperation, Operation, OperationId, PersistedWorkTable, PersistenceConfig, PersistenceEngine, PersistenceTask, SpaceData, SpaceDataOps, SpaceIndex, SpaceIndexOps, SpaceIndexUnsized, SpaceSecondaryIndexOps, UpdateOperation, map_index_pages_to_toc_and_general, diff --git a/src/persistence/engine.rs b/src/persistence/engine.rs index c498557..eb49148 100644 --- a/src/persistence/engine.rs +++ b/src/persistence/engine.rs @@ -162,6 +162,10 @@ where .process_change_events(delete.secondary_keys_events) .await } + Operation::Acknowledge(_) => { + // Acknowledge operations carry orphaned events for sequence 
continuity. + Ok(()) + } } } @@ -194,7 +198,6 @@ where } if let Some(pk_gen_state_update) = batch_op.get_pk_gen_state()? { - println!("PK gen state update: {:?}", pk_gen_state_update); let info = self.data.get_mut_info(); info.inner.pk_gen_state = pk_gen_state_update; self.data.save_info().await?; diff --git a/src/persistence/mod.rs b/src/persistence/mod.rs index b4738b3..7d889e2 100644 --- a/src/persistence/mod.rs +++ b/src/persistence/mod.rs @@ -5,7 +5,7 @@ use crate::persistence::operation::BatchOperation; pub use engine::DiskConfig; pub use engine::DiskPersistenceEngine; pub use operation::{ - DeleteOperation, InsertOperation, Operation, OperationId, OperationType, UpdateOperation, + AcknowledgeOperation, DeleteOperation, InsertOperation, Operation, OperationId, OperationType, UpdateOperation, validate_events, }; pub use space::{ diff --git a/src/persistence/operation/batch.rs b/src/persistence/operation/batch.rs index f144054..a841197 100644 --- a/src/persistence/operation/batch.rs +++ b/src/persistence/operation/batch.rs @@ -352,15 +352,30 @@ where } pub fn get_indexes_evs(&self) -> eyre::Result<(BatchChangeEvent, SecondaryEvents)> { - if let Some(evs) = &self.prepared_index_evs { - Ok((evs.primary_evs.clone(), evs.secondary_evs.clone())) - } else { - tracing::warn!( - "Index events are not validated and it can cause errors while applying batch" - ); - let evs = self.prepare_indexes_evs()?; - Ok((evs.primary_evs.clone(), evs.secondary_evs.clone())) + let prepared_evs = self + .prepared_index_evs + .as_ref() + .expect("prepared_index_evs should be set by validate() before calling get_indexes_evs"); + + // Clone the prepared events (already sorted in validate()) + let mut primary_evs = prepared_evs.primary_evs.clone(); + let mut secondary_evs = prepared_evs.secondary_evs.clone(); + + // Remove events from Acknowledge operations + for op in &self.ops { + if let Operation::Acknowledge(ack) = op { + // Remove primary events from ack + for ack_ev in 
&ack.primary_key_events { + if let Ok(pos) = primary_evs.binary_search_by(|ev| ev.id().cmp(&ack_ev.id())) { + primary_evs.remove(pos); + } + } + // Remove secondary events from ack using the trait's remove method + secondary_evs.remove(&ack.secondary_keys_events); + } } + + Ok((primary_evs, secondary_evs)) } pub fn get_batch_data_op(&self) -> eyre::Result { diff --git a/src/persistence/operation/mod.rs b/src/persistence/operation/mod.rs index cd4e549..d918f3a 100644 --- a/src/persistence/operation/mod.rs +++ b/src/persistence/operation/mod.rs @@ -15,7 +15,7 @@ use uuid::Uuid; use crate::prelude::From; pub use batch::{BatchInnerRow, BatchInnerWorkTable, BatchOperation, PosByOpIdQuery}; -pub use operation::{DeleteOperation, InsertOperation, Operation, UpdateOperation}; +pub use operation::{AcknowledgeOperation, DeleteOperation, InsertOperation, Operation, UpdateOperation}; pub use util::validate_events; /// Represents page's identifier. Is unique within the table bounds @@ -95,6 +95,7 @@ pub enum OperationType { Insert, Update, Delete, + Acknowledge, } impl SizeMeasurable for OperationType { diff --git a/src/persistence/operation/operation.rs b/src/persistence/operation/operation.rs index e00ad97..44d8589 100644 --- a/src/persistence/operation/operation.rs +++ b/src/persistence/operation/operation.rs @@ -13,6 +13,7 @@ pub enum Operation { Insert(InsertOperation), Update(UpdateOperation), Delete(DeleteOperation), + Acknowledge(AcknowledgeOperation), } impl Hash @@ -44,6 +45,7 @@ impl Operation::Insert(_) => OperationType::Insert, Operation::Update(_) => OperationType::Update, Operation::Delete(_) => OperationType::Delete, + Operation::Acknowledge(_) => OperationType::Acknowledge, } } @@ -52,6 +54,7 @@ impl Operation::Insert(insert) => insert.id, Operation::Update(update) => update.id, Operation::Delete(delete) => delete.id, + Operation::Acknowledge(ack) => ack.id, } } @@ -60,6 +63,7 @@ impl Operation::Insert(insert) => insert.link, Operation::Update(update) => 
update.link, Operation::Delete(delete) => delete.link, + Operation::Acknowledge(_) => Link::default(), } } @@ -68,6 +72,7 @@ impl Operation::Insert(insert) => Some(&insert.bytes), Operation::Update(update) => Some(&update.bytes), Operation::Delete(_) => None, + Operation::Acknowledge(_) => None, } } @@ -76,6 +81,7 @@ impl Operation::Insert(insert) => Some(&insert.primary_key_events), Operation::Update(_) => None, Operation::Delete(delete) => Some(&delete.primary_key_events), + Operation::Acknowledge(ack) => Some(&ack.primary_key_events), } } @@ -84,6 +90,7 @@ impl Operation::Insert(insert) => &insert.secondary_keys_events, Operation::Update(update) => &update.secondary_keys_events, Operation::Delete(delete) => &delete.secondary_keys_events, + Operation::Acknowledge(ack) => &ack.secondary_keys_events, } } @@ -95,6 +102,7 @@ impl Operation::Insert(insert) => insert.secondary_keys_events.extend(evs), Operation::Update(update) => update.secondary_keys_events.extend(evs), Operation::Delete(delete) => delete.secondary_keys_events.extend(evs), + Operation::Acknowledge(ack) => ack.secondary_keys_events.extend(evs), } } @@ -103,6 +111,7 @@ impl Operation::Insert(insert) => Some(&insert.pk_gen_state), Operation::Update(_) => None, Operation::Delete(_) => None, + Operation::Acknowledge(_) => None, } } } @@ -132,3 +141,10 @@ pub struct DeleteOperation { pub secondary_keys_events: SecondaryKeys, pub link: Link, } + +#[derive(Clone, Debug)] +pub struct AcknowledgeOperation { + pub id: OperationId, + pub primary_key_events: Vec>>, + pub secondary_keys_events: SecondaryKeys, +} diff --git a/src/persistence/task.rs b/src/persistence/task.rs index 774416a..8e3807e 100644 --- a/src/persistence/task.rs +++ b/src/persistence/task.rs @@ -2,18 +2,18 @@ use std::collections::{HashMap, HashSet}; use std::fmt::Debug; use std::hash::Hash; use std::marker::PhantomData; -use std::sync::Arc; use std::sync::atomic::{AtomicBool, AtomicU16, Ordering}; +use std::sync::Arc; use std::time::Duration; 
use data_bucket::page::PageId; use tokio::sync::Notify; use worktable_codegen::worktable; -use crate::persistence::PersistenceEngine; use crate::persistence::operation::{ BatchInnerRow, BatchInnerWorkTable, BatchOperation, OperationId, PosByOpIdQuery, }; +use crate::persistence::PersistenceEngine; use crate::prelude::*; use crate::util::OptimizedVec; @@ -71,7 +71,7 @@ where self.primary_id = another.primary_id } for (index, id) in another.secondary_ids { - if id != IndexChangeEventId::default() { + if id != IndexChangeEventId::default() || !self.secondary_ids.contains_key(&index) { self.secondary_ids.insert(index, id); } } diff --git a/src/table/mod.rs b/src/table/mod.rs index d718f01..b4a9c66 100644 --- a/src/table/mod.rs +++ b/src/table/mod.rs @@ -3,13 +3,14 @@ pub mod system_info; pub mod vacuum; use crate::in_memory::{ArchivedRowWrapper, DataPages, RowWrapper, StorableRow}; -use crate::persistence::{InsertOperation, Operation}; +use crate::persistence::{AcknowledgeOperation, InsertOperation, Operation}; use crate::prelude::{Link, LockMap, OperationId, PrimaryKeyGeneratorState}; use crate::primary_key::{PrimaryKeyGenerator, TablePrimaryKey}; use crate::util::OffsetEqLink; use crate::{ AvailableIndex, IndexError, IndexMap, PrimaryIndex, TableIndex, TableIndexCdc, TableRow, - TableSecondaryIndex, TableSecondaryIndexCdc, convert_change_events, in_memory, + TableSecondaryIndex, TableSecondaryIndexCdc, TableSecondaryIndexEventsOps, + convert_change_events, in_memory, }; use data_bucket::INNER_PAGE_SIZE; use derive_more::{Display, Error, From}; @@ -234,13 +235,10 @@ where pub fn insert_cdc( &self, row: Row, - ) -> Result< - ( - PrimaryKey, - Operation<::State, PrimaryKey, SecondaryEvents>, - ), - WorkTableError, - > + ) -> ( + Option::State, PrimaryKey, SecondaryEvents>>, + Result, + ) where Row: Archive + Clone @@ -253,59 +251,109 @@ where >, <::WrappedRow as Archive>::Archived: ArchivedRowWrapper, PrimaryKey: Clone, + SecondaryEvents: Debug + Default + Clone + 
TableSecondaryIndexEventsOps, SecondaryIndexes: TableSecondaryIndex + TableSecondaryIndexCdc, PkGen: PrimaryKeyGeneratorState, + ::State: Debug, AvailableIndexes: Debug + AvailableIndex, { let pk = row.get_primary_key().clone(); - let (link, _) = self - .data - .insert_cdc(row.clone()) - .map_err(WorkTableError::PagesError)?; + + let (link, _) = match self.data.insert_cdc(row.clone()) { + Ok(result) => result, + Err(e) => return (None, Err(WorkTableError::PagesError(e))), + }; + let primary_key_events = self.primary_index.insert_checked_cdc(pk.clone(), link); let Some(primary_key_events) = primary_key_events else { - self.data.delete(link).map_err(WorkTableError::PagesError)?; - return Err(WorkTableError::AlreadyExists("Primary".to_string())); + if let Err(e) = self.data.delete(link) { + return (None, Err(WorkTableError::PagesError(e))); + } + return ( + None, + Err(WorkTableError::AlreadyExists("Primary".to_string())), + ); }; let primary_key_events = convert_change_events(primary_key_events); - let indexes_res = self.indexes.save_row_cdc(row.clone(), link); + + let (secondary_events, indexes_res) = self.indexes.save_row_cdc(row.clone(), link); if let Err(e) = indexes_res { - return match e { + let (ack_op, error) = match e { IndexError::AlreadyExists { at, inserted_already, } => { - self.data.delete(link).map_err(WorkTableError::PagesError)?; - self.primary_index.remove(&pk, link); - self.indexes - .delete_from_indexes(row, link, inserted_already)?; + let (_, rollback_pk_events) = self.primary_index.remove_cdc(pk.clone(), link); + let rollback_pk_events = convert_change_events(rollback_pk_events); - Err(WorkTableError::AlreadyExists(at.to_string_value())) + let (rollback_secondary_events, _) = + self.indexes + .delete_from_indexes_cdc(row.clone(), link, inserted_already); + + let mut merged_primary_events = primary_key_events.clone(); + merged_primary_events.extend(rollback_pk_events); + + let mut merged_secondary_events = secondary_events.clone(); + 
merged_secondary_events.extend(rollback_secondary_events); + + let ack_op = Operation::Acknowledge(AcknowledgeOperation { + id: OperationId::Single(Uuid::now_v7()), + primary_key_events: merged_primary_events, + secondary_keys_events: merged_secondary_events, + }); + + if let Err(e) = self.data.delete(link) { + (ack_op, WorkTableError::PagesError(e)) + } else { + (ack_op, WorkTableError::AlreadyExists(at.to_string_value())) + } + } + IndexError::NotFound => { + let ack_op = Operation::Acknowledge(AcknowledgeOperation { + id: OperationId::Single(Uuid::now_v7()), + primary_key_events: primary_key_events.clone(), + secondary_keys_events: secondary_events.clone(), + }); + (ack_op, WorkTableError::NotFound) } - IndexError::NotFound => Err(WorkTableError::NotFound), }; + return (Some(ack_op), Err(error)); } + unsafe { - self.data - .with_mut_ref(link, |r| r.unghost()) - .map_err(WorkTableError::PagesError)? + if let Err(e) = self.data.with_mut_ref(link, |r| r.unghost()) { + let ack_op = Operation::Acknowledge(AcknowledgeOperation { + id: OperationId::Single(Uuid::now_v7()), + primary_key_events: primary_key_events.clone(), + secondary_keys_events: secondary_events.clone(), + }); + return (Some(ack_op), Err(WorkTableError::PagesError(e))); + } } - let bytes = self - .data - .select_raw(link) - .map_err(WorkTableError::PagesError)?; + + let bytes = match self.data.select_raw(link) { + Ok(bytes) => bytes, + Err(e) => { + let ack_op = Operation::Acknowledge(AcknowledgeOperation { + id: OperationId::Single(Uuid::now_v7()), + primary_key_events: primary_key_events.clone(), + secondary_keys_events: secondary_events.clone(), + }); + return (Some(ack_op), Err(WorkTableError::PagesError(e))); + } + }; let op = Operation::Insert(InsertOperation { id: OperationId::Single(Uuid::now_v7()), pk_gen_state: self.pk_gen.get_state(), primary_key_events, - secondary_keys_events: indexes_res.expect("was checked before"), + secondary_keys_events: secondary_events, bytes, link, }); - Ok((pk, 
op)) + (Some(op), Ok(pk)) } /// Reinserts provided row with updating indexes and saving it's data in new @@ -389,13 +437,10 @@ where &self, row_old: Row, row_new: Row, - ) -> Result< - ( - PrimaryKey, - Operation<::State, PrimaryKey, SecondaryEvents>, - ), - WorkTableError, - > + ) -> ( + Option::State, PrimaryKey, SecondaryEvents>>, + Result, + ) where Row: Archive + Clone @@ -408,6 +453,7 @@ where >, <::WrappedRow as Archive>::Archived: ArchivedRowWrapper, PrimaryKey: Clone, + SecondaryEvents: Debug + Default + Clone + TableSecondaryIndexEventsOps, SecondaryIndexes: TableSecondaryIndex + TableSecondaryIndexCdc, PkGen: PrimaryKeyGeneratorState, @@ -415,65 +461,117 @@ where { let pk = row_new.get_primary_key().clone(); if pk != row_old.get_primary_key() { - return Err(WorkTableError::PrimaryUpdateTry); + return (None, Err(WorkTableError::PrimaryUpdateTry)); } - let old_link = self - .primary_index - .pk_map - .get(&pk) - .map(|v| v.get().value.into()) - .ok_or(WorkTableError::NotFound)?; - let (new_link, _) = self - .data - .insert_cdc(row_new.clone()) - .map_err(WorkTableError::PagesError)?; + + // Get old link - if not found, no events to acknowledge + let old_link = match self.primary_index.pk_map.get(&pk) { + Some(v) => v.get().value.into(), + None => return (None, Err(WorkTableError::NotFound)), + }; + + // Insert new data - if this fails, no events to acknowledge + let (new_link, _) = match self.data.insert_cdc(row_new.clone()) { + Ok(result) => result, + Err(e) => return (None, Err(WorkTableError::PagesError(e))), + }; + + // Unghost the new data - if this fails, we have no events yet to acknowledge unsafe { - self.data - .with_mut_ref(new_link, |r| r.unghost()) - .map_err(WorkTableError::PagesError)? 
+ if let Err(e) = self.data.with_mut_ref(new_link, |r| r.unghost()) { + return (None, Err(WorkTableError::PagesError(e))); + } } + + // Update primary index let (_, primary_key_events) = self.primary_index.insert_cdc(pk.clone(), new_link); let primary_key_events = convert_change_events(primary_key_events); - let indexes_res = + + // Update secondary indexes + let (secondary_events, indexes_res) = self.indexes .reinsert_row_cdc(row_old, old_link, row_new.clone(), new_link); + if let Err(e) = indexes_res { - return match e { + let (ack_op, error) = match e { IndexError::AlreadyExists { at, inserted_already, } => { - self.primary_index.insert(pk.clone(), old_link); - self.indexes - .delete_from_indexes(row_new, new_link, inserted_already)?; - self.data - .delete(new_link) - .map_err(WorkTableError::PagesError)?; + // Rollback: generate CDC events for restoring old link in primary index + let (_, rollback_pk_events) = + self.primary_index.insert_cdc(pk.clone(), old_link); + let rollback_pk_events = convert_change_events(rollback_pk_events); - Err(WorkTableError::AlreadyExists(at.to_string_value())) + // Rollback: generate CDC events for cleaning up new secondary indexes + let (rollback_secondary_events, _) = + self.indexes + .delete_from_indexes_cdc(row_new, new_link, inserted_already); + + // Merge original partial insert events with rollback events + let mut merged_primary_events = primary_key_events.clone(); + merged_primary_events.extend(rollback_pk_events); + + let mut merged_secondary_events = secondary_events.clone(); + merged_secondary_events.extend(rollback_secondary_events); + + let ack_op = Operation::Acknowledge(AcknowledgeOperation { + id: OperationId::Single(Uuid::now_v7()), + primary_key_events: merged_primary_events, + secondary_keys_events: merged_secondary_events, + }); + + if let Err(e) = self.data.delete(new_link) { + (ack_op, WorkTableError::PagesError(e)) + } else { + (ack_op, WorkTableError::AlreadyExists(at.to_string_value())) + } + } + 
IndexError::NotFound => { + let ack_op = Operation::Acknowledge(AcknowledgeOperation { + id: OperationId::Single(Uuid::now_v7()), + primary_key_events: primary_key_events.clone(), + secondary_keys_events: secondary_events.clone(), + }); + (ack_op, WorkTableError::NotFound) } - IndexError::NotFound => Err(WorkTableError::NotFound), }; + return (Some(ack_op), Err(error)); } - self.data - .delete(old_link) - .map_err(WorkTableError::PagesError)?; - let bytes = self - .data - .select_raw(new_link) - .map_err(WorkTableError::PagesError)?; + // Delete old data + if let Err(e) = self.data.delete(old_link) { + let ack_op = Operation::Acknowledge(AcknowledgeOperation { + id: OperationId::Single(Uuid::now_v7()), + primary_key_events: primary_key_events.clone(), + secondary_keys_events: secondary_events.clone(), + }); + return (Some(ack_op), Err(WorkTableError::PagesError(e))); + } + + // Get raw bytes for persistence + let bytes = match self.data.select_raw(new_link) { + Ok(bytes) => bytes, + Err(e) => { + let ack_op = Operation::Acknowledge(AcknowledgeOperation { + id: OperationId::Single(Uuid::now_v7()), + primary_key_events: primary_key_events.clone(), + secondary_keys_events: secondary_events.clone(), + }); + return (Some(ack_op), Err(WorkTableError::PagesError(e))); + } + }; let op = Operation::Insert(InsertOperation { id: OperationId::Single(Uuid::now_v7()), pk_gen_state: self.pk_gen.get_state(), primary_key_events, - secondary_keys_events: indexes_res.expect("was checked just before"), + secondary_keys_events: secondary_events, bytes, link: new_link, }); - Ok((pk, op)) + (Some(op), Ok(pk)) } } diff --git a/tests/persistence/failure/insert.rs b/tests/persistence/failure/insert.rs new file mode 100644 index 0000000..b8bbe38 --- /dev/null +++ b/tests/persistence/failure/insert.rs @@ -0,0 +1,563 @@ +use super::*; +use crate::remove_dir_if_exists; + +#[test] +fn test_insert_two_indexes_first_fail() { + let config = DiskConfig::new_with_table_name( + 
"tests/data/failure/insert_two_first", + TwoUniqueIdxWorkTable::name_snake_case(), + ); + + let runtime = get_runtime(); + + runtime.block_on(async { + remove_dir_if_exists("tests/data/failure/insert_two_first".to_string()).await; + + // Phase 1: Setup - insert a row to populate first unique index + let existing_a = { + let engine = TwoUniqueIdxPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = TwoUniqueIdxWorkTable::load(engine).await.unwrap(); + + let row = TwoUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 100, + unique_b: 200, + }; + table.insert(row.clone()).unwrap(); + table.wait_for_ops().await; + row.unique_a + }; + + // Phase 2: 2 valid inserts -> failure -> valid insert -> wait_for_ops + { + let engine = TwoUniqueIdxPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = TwoUniqueIdxWorkTable::load(engine).await.unwrap(); + + let valid_row1 = TwoUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 1000, + unique_b: 1001, + }; + table.insert(valid_row1).unwrap(); + + let valid_row2 = TwoUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 2000, + unique_b: 2001, + }; + table.insert(valid_row2).unwrap(); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let failing_row = TwoUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: existing_a, + unique_b: 300, + }; + + let result = table.insert(failing_row); + assert!(result.is_err()); + assert!(matches!( + result.unwrap_err(), + WorkTableError::AlreadyExists(_) + )); + + let valid_row3 = TwoUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 3000, + unique_b: 3001, + }; + table.insert(valid_row3).unwrap(); + + let wait_result = timeout(Duration::from_secs(4), table.wait_for_ops()).await; + assert!( + wait_result.is_ok(), + "BUG: persistence blocked after insert failure on first index!" 
+ ); + } + + // Phase 3: Verify state + { + let engine = TwoUniqueIdxPersistenceEngine::new(config).await.unwrap(); + let table = TwoUniqueIdxWorkTable::load(engine).await.unwrap(); + + // Can insert new valid row + let new_row = TwoUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 4000, + unique_b: 4001, + }; + assert!(table.insert(new_row).is_ok()); + table.wait_for_ops().await; + } + }); +} + +#[test] +fn test_insert_two_indexes_second_fail() { + let config = DiskConfig::new_with_table_name( + "tests/data/failure/insert_two_second", + TwoUniqueIdxWorkTable::name_snake_case(), + ); + + let runtime = get_runtime(); + + runtime.block_on(async { + remove_dir_if_exists("tests/data/failure/insert_two_second".to_string()).await; + + // Phase 1: Setup - insert a row to populate second unique index + let existing_b = { + let engine = TwoUniqueIdxPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = TwoUniqueIdxWorkTable::load(engine).await.unwrap(); + + let row = TwoUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 100, + unique_b: 200, + }; + table.insert(row.clone()).unwrap(); + table.wait_for_ops().await; + row.unique_b + }; + + // Phase 2: 2 valid inserts -> failure -> valid insert -> wait_for_ops + { + let engine = TwoUniqueIdxPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = TwoUniqueIdxWorkTable::load(engine).await.unwrap(); + + let valid_row1 = TwoUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 1000, + unique_b: 1001, + }; + table.insert(valid_row1).unwrap(); + + let valid_row2 = TwoUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 2000, + unique_b: 2001, + }; + table.insert(valid_row2).unwrap(); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let failing_row = TwoUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 300, + unique_b: existing_b, + }; + + let result = table.insert(failing_row); + assert!(result.is_err()); + assert!(matches!( + result.unwrap_err(), + 
WorkTableError::AlreadyExists(_) + )); + + let valid_row3 = TwoUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 3000, + unique_b: 3001, + }; + table.insert(valid_row3).unwrap(); + + let wait_result = timeout(Duration::from_secs(4), table.wait_for_ops()).await; + assert!( + wait_result.is_ok(), + "BUG: persistence blocked after insert failure on second index!" + ); + } + + // Phase 3: Verify state - check rollback worked + { + let engine = TwoUniqueIdxPersistenceEngine::new(config).await.unwrap(); + let table = TwoUniqueIdxWorkTable::load(engine).await.unwrap(); + + let new_row = TwoUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 300, + unique_b: 301, + }; + assert!( + table.insert(new_row).is_ok(), + "BUG: orphaned entry in unique_a_idx!" + ); + table.wait_for_ops().await; + } + }); +} + +#[test] +fn test_insert_three_indexes_first_fail() { + let config = DiskConfig::new_with_table_name( + "tests/data/failure/insert_three_first", + ThreeUniqueIdxWorkTable::name_snake_case(), + ); + + let runtime = get_runtime(); + + runtime.block_on(async { + remove_dir_if_exists("tests/data/failure/insert_three_first".to_string()).await; + + // Phase 1: Setup + let existing_a = { + let engine = ThreeUniqueIdxPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = ThreeUniqueIdxWorkTable::load(engine).await.unwrap(); + + let row = ThreeUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 100, + unique_b: 200, + unique_c: 300, + }; + table.insert(row.clone()).unwrap(); + table.wait_for_ops().await; + row.unique_a + }; + + // Phase 2: 2 valid inserts -> failure -> valid insert -> wait_for_ops + { + let engine = ThreeUniqueIdxPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = ThreeUniqueIdxWorkTable::load(engine).await.unwrap(); + + let valid_row1 = ThreeUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 1000, + unique_b: 1001, + unique_c: 1002, + }; + table.insert(valid_row1).unwrap(); + + let valid_row2 = 
ThreeUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 2000, + unique_b: 2001, + unique_c: 2002, + }; + table.insert(valid_row2).unwrap(); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let failing_row = ThreeUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: existing_a, + unique_b: 400, + unique_c: 500, + }; + + let result = table.insert(failing_row); + assert!(result.is_err()); + + let valid_row3 = ThreeUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 3000, + unique_b: 3001, + unique_c: 3002, + }; + table.insert(valid_row3).unwrap(); + + let wait_result = timeout(Duration::from_secs(4), table.wait_for_ops()).await; + assert!(wait_result.is_ok(), "BUG: persistence blocked!"); + } + + // Phase 3: Verify + { + let engine = ThreeUniqueIdxPersistenceEngine::new(config).await.unwrap(); + let table = ThreeUniqueIdxWorkTable::load(engine).await.unwrap(); + + let new_row = ThreeUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 4000, + unique_b: 4001, + unique_c: 4002, + }; + assert!(table.insert(new_row).is_ok()); + table.wait_for_ops().await; + } + }); +} + +#[test] +fn test_insert_three_indexes_middle_fail() { + let config = DiskConfig::new_with_table_name( + "tests/data/failure/insert_three_middle", + ThreeUniqueIdxWorkTable::name_snake_case(), + ); + + let runtime = get_runtime(); + + runtime.block_on(async { + remove_dir_if_exists("tests/data/failure/insert_three_middle".to_string()).await; + + // Phase 1: Setup + let existing_b = { + let engine = ThreeUniqueIdxPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = ThreeUniqueIdxWorkTable::load(engine).await.unwrap(); + + let row = ThreeUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 100, + unique_b: 200, + unique_c: 300, + }; + table.insert(row.clone()).unwrap(); + table.wait_for_ops().await; + row.unique_b + }; + + // Phase 2: 2 valid inserts -> failure -> valid insert -> wait_for_ops + { + let engine = 
ThreeUniqueIdxPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = ThreeUniqueIdxWorkTable::load(engine).await.unwrap(); + + let valid_row1 = ThreeUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 1000, + unique_b: 1001, + unique_c: 1002, + }; + table.insert(valid_row1).unwrap(); + + let valid_row2 = ThreeUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 2000, + unique_b: 2001, + unique_c: 2002, + }; + table.insert(valid_row2).unwrap(); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let failing_row = ThreeUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 400, + unique_b: existing_b, + unique_c: 500, + }; + + let result = table.insert(failing_row); + assert!(result.is_err()); + + let valid_row3 = ThreeUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 3000, + unique_b: 3001, + unique_c: 3002, + }; + table.insert(valid_row3).unwrap(); + + let wait_result = timeout(Duration::from_secs(4), table.wait_for_ops()).await; + assert!(wait_result.is_ok(), "BUG: persistence blocked!"); + } + + // Phase 3: Verify - first index value should be rolled back + { + let engine = ThreeUniqueIdxPersistenceEngine::new(config).await.unwrap(); + let table = ThreeUniqueIdxWorkTable::load(engine).await.unwrap(); + + let new_row = ThreeUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 400, + unique_b: 201, + unique_c: 500, + }; + assert!( + table.insert(new_row).is_ok(), + "BUG: orphaned entry in unique_a_idx!" 
+ ); + table.wait_for_ops().await; + } + }); +} + +#[test] +fn test_insert_three_indexes_last_fail() { + let config = DiskConfig::new_with_table_name( + "tests/data/failure/insert_three_last", + ThreeUniqueIdxWorkTable::name_snake_case(), + ); + + let runtime = get_runtime(); + + runtime.block_on(async { + remove_dir_if_exists("tests/data/failure/insert_three_last".to_string()).await; + + // Phase 1: Setup + let existing_c = { + let engine = ThreeUniqueIdxPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = ThreeUniqueIdxWorkTable::load(engine).await.unwrap(); + + let row = ThreeUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 100, + unique_b: 200, + unique_c: 300, + }; + table.insert(row.clone()).unwrap(); + table.wait_for_ops().await; + row.unique_c + }; + + // Phase 2: 2 valid inserts -> failure -> valid insert -> wait_for_ops + { + let engine = ThreeUniqueIdxPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = ThreeUniqueIdxWorkTable::load(engine).await.unwrap(); + + let valid_row1 = ThreeUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 1000, + unique_b: 1001, + unique_c: 1002, + }; + table.insert(valid_row1).unwrap(); + + let valid_row2 = ThreeUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 2000, + unique_b: 2001, + unique_c: 2002, + }; + table.insert(valid_row2).unwrap(); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let failing_row = ThreeUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 400, + unique_b: 500, + unique_c: existing_c, + }; + + let result = table.insert(failing_row); + assert!(result.is_err()); + + let valid_row3 = ThreeUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 3000, + unique_b: 3001, + unique_c: 3002, + }; + table.insert(valid_row3).unwrap(); + + let wait_result = timeout(Duration::from_secs(4), table.wait_for_ops()).await; + assert!(wait_result.is_ok(), "BUG: persistence blocked!"); + } + + // Phase 3: Verify - first two index values should 
be rolled back + { + let engine = ThreeUniqueIdxPersistenceEngine::new(config).await.unwrap(); + let table = ThreeUniqueIdxWorkTable::load(engine).await.unwrap(); + + let new_row = ThreeUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 400, + unique_b: 500, + unique_c: 301, + }; + assert!( + table.insert(new_row).is_ok(), + "BUG: orphaned entries in indexes!" + ); + table.wait_for_ops().await; + } + }); +} + +#[test] +fn test_insert_primary_duplicate() { + let config = DiskConfig::new_with_table_name( + "tests/data/failure/insert_primary_dup", + PrimaryOnlyWorkTable::name_snake_case(), + ); + + let runtime = get_runtime(); + + runtime.block_on(async { + remove_dir_if_exists("tests/data/failure/insert_primary_dup".to_string()).await; + + // Phase 1: Setup + let existing_pk = { + let engine = PrimaryOnlyPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = PrimaryOnlyWorkTable::load(engine).await.unwrap(); + + let row = PrimaryOnlyRow { + id: table.get_next_pk().0, + data: 100, + }; + table.insert(row.clone()).unwrap(); + table.wait_for_ops().await; + row.id + }; + + // Phase 2: 2 valid inserts -> failure -> valid insert -> wait_for_ops + { + let engine = PrimaryOnlyPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = PrimaryOnlyWorkTable::load(engine).await.unwrap(); + + let valid_row1 = PrimaryOnlyRow { + id: table.get_next_pk().0, + data: 1000, + }; + table.insert(valid_row1).unwrap(); + + let valid_row2 = PrimaryOnlyRow { + id: table.get_next_pk().0, + data: 2000, + }; + table.insert(valid_row2).unwrap(); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let failing_row = PrimaryOnlyRow { + id: existing_pk, // Duplicate PK + data: 200, + }; + + let result = table.insert(failing_row); + assert!(result.is_err()); + + let valid_row3 = PrimaryOnlyRow { + id: table.get_next_pk().0, + data: 3000, + }; + table.insert(valid_row3).unwrap(); + + let wait_result = timeout(Duration::from_secs(4), 
table.wait_for_ops()).await; + assert!(wait_result.is_ok()); + } + + // Phase 3: Verify + { + let engine = PrimaryOnlyPersistenceEngine::new(config).await.unwrap(); + let table = PrimaryOnlyWorkTable::load(engine).await.unwrap(); + + let original = table.select(existing_pk).unwrap(); + assert_eq!(original.data, 100); + table.wait_for_ops().await; + } + }); +} diff --git a/tests/persistence/failure/mod.rs b/tests/persistence/failure/mod.rs new file mode 100644 index 0000000..58e0ad5 --- /dev/null +++ b/tests/persistence/failure/mod.rs @@ -0,0 +1,94 @@ +use std::time::Duration; +use tokio::time::timeout; +use worktable::prelude::*; +use worktable::worktable; + +mod insert; +mod reinsert; +mod update; +mod update_non_unique; +mod update_unsized; + +worktable!( + name: TwoUniqueIdx, + persist: true, + columns: { + id: u64 primary_key autoincrement, + unique_a: u64, + unique_b: u64, + }, + indexes: { + unique_a_idx: unique_a unique, + unique_b_idx: unique_b unique, + }, +); + +worktable!( + name: ThreeUniqueIdx, + persist: true, + columns: { + id: u64 primary_key autoincrement, + unique_a: u64, + unique_b: u64, + unique_c: u64, + }, + indexes: { + unique_a_idx: unique_a unique, + unique_b_idx: unique_b unique, + unique_c_idx: unique_c unique, + }, +); + +worktable!( + name: MixedIdx, + persist: true, + columns: { + id: u64 primary_key autoincrement, + category: u64, + unique_value: u64, + data: u64, + }, + indexes: { + category_idx: category, + unique_value_idx: unique_value unique, + }, + queries: { + update: { UniqueValueByCategory(unique_value) by category }, + }, +); + +worktable!( + name: PrimaryOnly, + persist: true, + columns: { + id: u64 primary_key autoincrement, + data: u64, + }, +); + +worktable!( + name: NonUniqueUnsized, + persist: true, + columns: { + id: u64 primary_key autoincrement, + category: u64, + unique_value: u64, + name: String, + }, + indexes: { + category_idx: category, + unique_value_idx: unique_value unique, + }, + queries: { + update: { 
NameAndValueByCategory(name, unique_value) by category }, + }, +); + +pub fn get_runtime() -> tokio::runtime::Runtime { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_io() + .enable_time() + .build() + .unwrap() +} diff --git a/tests/persistence/failure/reinsert.rs b/tests/persistence/failure/reinsert.rs new file mode 100644 index 0000000..7de7ab1 --- /dev/null +++ b/tests/persistence/failure/reinsert.rs @@ -0,0 +1,602 @@ +use super::*; +use crate::remove_dir_if_exists; + +#[test] +fn test_reinsert_pk_mismatch() { + let config = DiskConfig::new_with_table_name( + "tests/data/failure/reinsert_pk_mismatch", + TwoUniqueIdxWorkTable::name_snake_case(), + ); + + let runtime = get_runtime(); + + runtime.block_on(async { + remove_dir_if_exists("tests/data/failure/reinsert_pk_mismatch".to_string()).await; + + // Phase 1: Setup + let existing_pk = { + let engine = TwoUniqueIdxPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = TwoUniqueIdxWorkTable::load(engine).await.unwrap(); + + let row = TwoUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 100, + unique_b: 200, + }; + table.insert(row.clone()).unwrap(); + table.wait_for_ops().await; + row.id + }; + + // Phase 2: 2 valid inserts -> failure update -> valid insert -> wait_for_ops + { + let engine = TwoUniqueIdxPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = TwoUniqueIdxWorkTable::load(engine).await.unwrap(); + + let valid_row1 = TwoUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 1000, + unique_b: 1001, + }; + table.insert(valid_row1).unwrap(); + + let valid_row2 = TwoUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 2000, + unique_b: 2001, + }; + table.insert(valid_row2).unwrap(); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let new_row = TwoUniqueIdxRow { + id: existing_pk + 1, + unique_a: 100, + unique_b: 200, + }; + + let result = table.update(new_row).await; + assert!(result.is_err()); + + let valid_row3 
= TwoUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 3000, + unique_b: 3001, + }; + table.insert(valid_row3).unwrap(); + + let wait_result = timeout(Duration::from_secs(4), table.wait_for_ops()).await; + assert!(wait_result.is_ok()); + } + + // Phase 3: Verify + { + let engine = TwoUniqueIdxPersistenceEngine::new(config).await.unwrap(); + let table = TwoUniqueIdxWorkTable::load(engine).await.unwrap(); + + let original = table.select(existing_pk).unwrap(); + assert_eq!(original.unique_a, 100); + assert_eq!(original.unique_b, 200); + table.wait_for_ops().await; + } + }); +} + +#[test] +fn test_reinsert_two_indexes_first_fail() { + let config = DiskConfig::new_with_table_name( + "tests/data/failure/reinsert_two_first", + TwoUniqueIdxWorkTable::name_snake_case(), + ); + + let runtime = get_runtime(); + + runtime.block_on(async { + remove_dir_if_exists("tests/data/failure/reinsert_two_first".to_string()).await; + + // Phase 1: Setup - insert two rows + let (row1_pk, row2_pk, conflict_a) = { + let engine = TwoUniqueIdxPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = TwoUniqueIdxWorkTable::load(engine).await.unwrap(); + + let row1 = TwoUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 100, + unique_b: 200, + }; + table.insert(row1.clone()).unwrap(); + + let row2 = TwoUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 300, + unique_b: 400, + }; + table.insert(row2.clone()).unwrap(); + table.wait_for_ops().await; + + (row1.id, row2.id, row1.unique_a) + }; + + // Phase 2: 2 valid inserts -> failure update -> valid insert -> wait_for_ops + { + let engine = TwoUniqueIdxPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = TwoUniqueIdxWorkTable::load(engine).await.unwrap(); + + let valid_row1 = TwoUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 1000, + unique_b: 1001, + }; + table.insert(valid_row1).unwrap(); + + let valid_row2 = TwoUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 2000, + unique_b: 
2001, + }; + table.insert(valid_row2).unwrap(); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let updated_row = TwoUniqueIdxRow { + id: row2_pk, + unique_a: conflict_a, + unique_b: 500, + }; + + let result = table.update(updated_row).await; + assert!(result.is_err()); + + let valid_row3 = TwoUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 3000, + unique_b: 3001, + }; + table.insert(valid_row3).unwrap(); + + let wait_result = timeout(Duration::from_secs(4), table.wait_for_ops()).await; + assert!( + wait_result.is_ok(), + "BUG: persistence blocked after reinsert failure!" + ); + } + + // Phase 3: Verify + { + let engine = TwoUniqueIdxPersistenceEngine::new(config).await.unwrap(); + let table = TwoUniqueIdxWorkTable::load(engine).await.unwrap(); + + // Both rows unchanged + let row1 = table.select(row1_pk).unwrap(); + assert_eq!(row1.unique_a, 100); + assert_eq!(row1.unique_b, 200); + + let row2 = table.select(row2_pk).unwrap(); + assert_eq!(row2.unique_a, 300); + assert_eq!(row2.unique_b, 400); + + table.wait_for_ops().await; + } + }); +} + +#[test] +fn test_reinsert_two_indexes_second_fail() { + let config = DiskConfig::new_with_table_name( + "tests/data/failure/reinsert_two_second", + TwoUniqueIdxWorkTable::name_snake_case(), + ); + + let runtime = get_runtime(); + + runtime.block_on(async { + remove_dir_if_exists("tests/data/failure/reinsert_two_second".to_string()).await; + + // Phase 1: Setup + let conflict_b = { + let engine = TwoUniqueIdxPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = TwoUniqueIdxWorkTable::load(engine).await.unwrap(); + + let row1 = TwoUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 100, + unique_b: 200, + }; + table.insert(row1.clone()).unwrap(); + + let row2 = TwoUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 300, + unique_b: 400, + }; + table.insert(row2.clone()).unwrap(); + table.wait_for_ops().await; + + row1.unique_b + }; + + // Phase 2: 2 valid inserts -> failure 
update -> valid insert -> wait_for_ops + { + let engine = TwoUniqueIdxPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = TwoUniqueIdxWorkTable::load(engine).await.unwrap(); + + let valid_row1 = TwoUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 1000, + unique_b: 1001, + }; + table.insert(valid_row1).unwrap(); + + let valid_row2 = TwoUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 2000, + unique_b: 2001, + }; + table.insert(valid_row2).unwrap(); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let row2 = table.select_by_unique_a(300).unwrap(); + + let updated_row = TwoUniqueIdxRow { + id: row2.id, + unique_a: 500, + unique_b: conflict_b, + }; + + let result = table.update(updated_row).await; + assert!(result.is_err()); + + let valid_row3 = TwoUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 3000, + unique_b: 3001, + }; + table.insert(valid_row3).unwrap(); + + let wait_result = timeout(Duration::from_secs(4), table.wait_for_ops()).await; + assert!(wait_result.is_ok(), "BUG: persistence blocked!"); + } + + // Phase 3: Verify rollback worked + { + let engine = TwoUniqueIdxPersistenceEngine::new(config).await.unwrap(); + let table = TwoUniqueIdxWorkTable::load(engine).await.unwrap(); + + let new_row = TwoUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 500, + unique_b: 600, + }; + assert!( + table.insert(new_row).is_ok(), + "BUG: orphaned entry in unique_a_idx!" 
+ ); + table.wait_for_ops().await; + } + }); +} + +#[test] +fn test_reinsert_three_indexes_first_fail() { + let config = DiskConfig::new_with_table_name( + "tests/data/failure/reinsert_three_first", + ThreeUniqueIdxWorkTable::name_snake_case(), + ); + + let runtime = get_runtime(); + + runtime.block_on(async { + remove_dir_if_exists("tests/data/failure/reinsert_three_first".to_string()).await; + + // Phase 1: Setup + let conflict_a = { + let engine = ThreeUniqueIdxPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = ThreeUniqueIdxWorkTable::load(engine).await.unwrap(); + + let row1 = ThreeUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 100, + unique_b: 200, + unique_c: 300, + }; + table.insert(row1.clone()).unwrap(); + + let row2 = ThreeUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 400, + unique_b: 500, + unique_c: 600, + }; + table.insert(row2.clone()).unwrap(); + table.wait_for_ops().await; + + row1.unique_a + }; + + // Phase 2: 2 valid inserts -> failure update -> valid insert -> wait_for_ops + { + let engine = ThreeUniqueIdxPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = ThreeUniqueIdxWorkTable::load(engine).await.unwrap(); + + let valid_row1 = ThreeUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 1000, + unique_b: 1001, + unique_c: 1002, + }; + table.insert(valid_row1).unwrap(); + + let valid_row2 = ThreeUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 2000, + unique_b: 2001, + unique_c: 2002, + }; + table.insert(valid_row2).unwrap(); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let row2 = table.select_by_unique_a(400).unwrap(); + + let updated_row = ThreeUniqueIdxRow { + id: row2.id, + unique_a: conflict_a, + unique_b: 700, + unique_c: 800, + }; + + let result = table.update(updated_row).await; + assert!(result.is_err()); + + let valid_row3 = ThreeUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 3000, + unique_b: 3001, + unique_c: 3002, + }; + 
table.insert(valid_row3).unwrap(); + + let wait_result = timeout(Duration::from_secs(4), table.wait_for_ops()).await; + assert!(wait_result.is_ok(), "BUG: persistence blocked!"); + } + + // Phase 3: Verify + { + let engine = ThreeUniqueIdxPersistenceEngine::new(config).await.unwrap(); + let table = ThreeUniqueIdxWorkTable::load(engine).await.unwrap(); + + let row2 = table.select_by_unique_a(400).unwrap(); + assert_eq!(row2.unique_a, 400); + assert_eq!(row2.unique_b, 500); + assert_eq!(row2.unique_c, 600); + table.wait_for_ops().await; + } + }); +} + +#[test] +fn test_reinsert_three_indexes_middle_fail() { + let config = DiskConfig::new_with_table_name( + "tests/data/failure/reinsert_three_middle", + ThreeUniqueIdxWorkTable::name_snake_case(), + ); + + let runtime = get_runtime(); + + runtime.block_on(async { + remove_dir_if_exists("tests/data/failure/reinsert_three_middle".to_string()).await; + + // Phase 1: Setup + let conflict_b = { + let engine = ThreeUniqueIdxPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = ThreeUniqueIdxWorkTable::load(engine).await.unwrap(); + + let row1 = ThreeUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 100, + unique_b: 200, + unique_c: 300, + }; + table.insert(row1.clone()).unwrap(); + + let row2 = ThreeUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 400, + unique_b: 500, + unique_c: 600, + }; + table.insert(row2.clone()).unwrap(); + table.wait_for_ops().await; + + row1.unique_b + }; + + // Phase 2: 2 valid inserts -> failure update -> valid insert -> wait_for_ops + { + let engine = ThreeUniqueIdxPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = ThreeUniqueIdxWorkTable::load(engine).await.unwrap(); + + let valid_row1 = ThreeUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 1000, + unique_b: 1001, + unique_c: 1002, + }; + table.insert(valid_row1).unwrap(); + + let valid_row2 = ThreeUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 2000, + unique_b: 2001, + 
unique_c: 2002, + }; + table.insert(valid_row2).unwrap(); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let row2 = table.select_by_unique_a(400).unwrap(); + + let updated_row = ThreeUniqueIdxRow { + id: row2.id, + unique_a: 700, + unique_b: conflict_b, + unique_c: 800, + }; + + let result = table.update(updated_row).await; + assert!(result.is_err()); + + let valid_row3 = ThreeUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 3000, + unique_b: 3001, + unique_c: 3002, + }; + table.insert(valid_row3).unwrap(); + + let wait_result = timeout(Duration::from_secs(4), table.wait_for_ops()).await; + assert!(wait_result.is_ok(), "BUG: persistence blocked!"); + } + + // Phase 3: Verify rollback + { + let engine = ThreeUniqueIdxPersistenceEngine::new(config).await.unwrap(); + let table = ThreeUniqueIdxWorkTable::load(engine).await.unwrap(); + + let new_row = ThreeUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 700, + unique_b: 900, + unique_c: 1000, + }; + assert!(table.insert(new_row).is_ok(), "BUG: orphaned entry!"); + table.wait_for_ops().await; + } + }); +} + +#[test] +fn test_reinsert_three_indexes_last_fail() { + let config = DiskConfig::new_with_table_name( + "tests/data/failure/reinsert_three_last", + ThreeUniqueIdxWorkTable::name_snake_case(), + ); + + let runtime = get_runtime(); + + runtime.block_on(async { + remove_dir_if_exists("tests/data/failure/reinsert_three_last".to_string()).await; + + // Phase 1: Setup + let conflict_c = { + let engine = ThreeUniqueIdxPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = ThreeUniqueIdxWorkTable::load(engine).await.unwrap(); + + let row1 = ThreeUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 100, + unique_b: 200, + unique_c: 300, + }; + table.insert(row1.clone()).unwrap(); + + let row2 = ThreeUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 400, + unique_b: 500, + unique_c: 600, + }; + table.insert(row2.clone()).unwrap(); + table.wait_for_ops().await; + + 
row1.unique_c + }; + + // Phase 2: 2 valid inserts -> failure update -> valid insert -> wait_for_ops + { + let engine = ThreeUniqueIdxPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = ThreeUniqueIdxWorkTable::load(engine).await.unwrap(); + + let valid_row1 = ThreeUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 1000, + unique_b: 1001, + unique_c: 1002, + }; + table.insert(valid_row1).unwrap(); + + let valid_row2 = ThreeUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 2000, + unique_b: 2001, + unique_c: 2002, + }; + table.insert(valid_row2).unwrap(); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let row2 = table.select_by_unique_a(400).unwrap(); + + let updated_row = ThreeUniqueIdxRow { + id: row2.id, + unique_a: 700, + unique_b: 800, + unique_c: conflict_c, + }; + + let result = table.update(updated_row).await; + assert!(result.is_err()); + + let valid_row3 = ThreeUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 3000, + unique_b: 3001, + unique_c: 3002, + }; + table.insert(valid_row3).unwrap(); + + let wait_result = timeout(Duration::from_secs(4), table.wait_for_ops()).await; + assert!(wait_result.is_ok(), "BUG: persistence blocked!"); + } + + // Phase 3: Verify rollback + { + let engine = ThreeUniqueIdxPersistenceEngine::new(config).await.unwrap(); + let table = ThreeUniqueIdxWorkTable::load(engine).await.unwrap(); + + let new_row = ThreeUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 700, + unique_b: 800, + unique_c: 900, + }; + assert!(table.insert(new_row).is_ok(), "BUG: orphaned entries!"); + table.wait_for_ops().await; + } + }); +} diff --git a/tests/persistence/failure/update.rs b/tests/persistence/failure/update.rs new file mode 100644 index 0000000..c5d5049 --- /dev/null +++ b/tests/persistence/failure/update.rs @@ -0,0 +1,180 @@ +/// Update failure tests - testing CDC event gaps during update operations +use super::*; +use crate::remove_dir_if_exists; + +#[test] +fn 
test_update_unique_secondary_conflict() { + let config = DiskConfig::new_with_table_name( + "tests/data/failure/update_unique_conflict", + TwoUniqueIdxWorkTable::name_snake_case(), + ); + + let runtime = get_runtime(); + + runtime.block_on(async { + remove_dir_if_exists("tests/data/failure/update_unique_conflict".to_string()).await; + + // Phase 1: Setup + let row1_pk = { + let engine = TwoUniqueIdxPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = TwoUniqueIdxWorkTable::load(engine).await.unwrap(); + + let row1 = TwoUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 100, + unique_b: 200, + }; + table.insert(row1.clone()).unwrap(); + + let row2 = TwoUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 300, + unique_b: 400, + }; + table.insert(row2.clone()).unwrap(); + table.wait_for_ops().await; + row1.id + }; + + // Phase 2: 2 valid inserts -> failure update -> valid insert -> wait_for_ops + { + let engine = TwoUniqueIdxPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = TwoUniqueIdxWorkTable::load(engine).await.unwrap(); + + let valid_row1 = TwoUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 1000, + unique_b: 1001, + }; + table.insert(valid_row1).unwrap(); + + let valid_row2 = TwoUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 2000, + unique_b: 2001, + }; + table.insert(valid_row2).unwrap(); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let updated_row = TwoUniqueIdxRow { + id: row1_pk, + unique_a: 300, + unique_b: 500, + }; + + let result = table.update(updated_row).await; + assert!(result.is_err()); + + let valid_row3 = TwoUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 3000, + unique_b: 3001, + }; + table.insert(valid_row3).unwrap(); + + let wait_result = timeout(Duration::from_secs(4), table.wait_for_ops()).await; + assert!(wait_result.is_ok(), "BUG: persistence blocked!"); + } + + // Phase 3: Verify + { + let engine = 
TwoUniqueIdxPersistenceEngine::new(config).await.unwrap(); + let table = TwoUniqueIdxWorkTable::load(engine).await.unwrap(); + + let row1 = table.select(row1_pk).unwrap(); + assert_eq!(row1.unique_a, 100); + assert_eq!(row1.unique_b, 200); + table.wait_for_ops().await; + } + }); +} + +#[test] +fn test_update_pk_based_success() { + let config = DiskConfig::new_with_table_name( + "tests/data/failure/update_pk_success", + TwoUniqueIdxWorkTable::name_snake_case(), + ); + + let runtime = get_runtime(); + + runtime.block_on(async { + remove_dir_if_exists("tests/data/failure/update_pk_success".to_string()).await; + + // Phase 1: Setup + let row1_pk = { + let engine = TwoUniqueIdxPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = TwoUniqueIdxWorkTable::load(engine).await.unwrap(); + + let row1 = TwoUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 100, + unique_b: 200, + }; + table.insert(row1.clone()).unwrap(); + table.wait_for_ops().await; + row1.id + }; + + // Phase 2: 2 valid inserts -> success update -> valid insert -> wait_for_ops + { + let engine = TwoUniqueIdxPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = TwoUniqueIdxWorkTable::load(engine).await.unwrap(); + + let valid_row1 = TwoUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 1000, + unique_b: 1001, + }; + table.insert(valid_row1).unwrap(); + + let valid_row2 = TwoUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 2000, + unique_b: 2001, + }; + table.insert(valid_row2).unwrap(); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let updated_row = TwoUniqueIdxRow { + id: row1_pk, + unique_a: 150, + unique_b: 250, + }; + + let result = table.update(updated_row).await; + assert!(result.is_ok()); + + let valid_row3 = TwoUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 3000, + unique_b: 3001, + }; + table.insert(valid_row3).unwrap(); + + let wait_result = timeout(Duration::from_secs(4), table.wait_for_ops()).await; + 
assert!(wait_result.is_ok()); + } + + // Phase 3: Verify + { + let engine = TwoUniqueIdxPersistenceEngine::new(config).await.unwrap(); + let table = TwoUniqueIdxWorkTable::load(engine).await.unwrap(); + + let row1 = table.select(row1_pk).unwrap(); + assert_eq!(row1.unique_a, 150); + assert_eq!(row1.unique_b, 250); + table.wait_for_ops().await; + } + }); +} \ No newline at end of file diff --git a/tests/persistence/failure/update_non_unique.rs b/tests/persistence/failure/update_non_unique.rs new file mode 100644 index 0000000..1c9c05c --- /dev/null +++ b/tests/persistence/failure/update_non_unique.rs @@ -0,0 +1,212 @@ +/// Non-unique index update failure tests +use super::*; +use crate::remove_dir_if_exists; + +#[test] +fn test_update_non_unique_middle_fail() { + let config = DiskConfig::new_with_table_name( + "tests/data/failure/update_non_unique_middle", + MixedIdxWorkTable::name_snake_case(), + ); + + let runtime = get_runtime(); + + runtime.block_on(async { + remove_dir_if_exists("tests/data/failure/update_non_unique_middle".to_string()).await; + + // Phase 1: Setup + let (row1_pk, row2_pk, row3_pk) = { + let engine = MixedIdxPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = MixedIdxWorkTable::load(engine).await.unwrap(); + + let row1 = MixedIdxRow { + id: table.get_next_pk().0, + category: 1, + unique_value: 10, + data: 100, + }; + table.insert(row1.clone()).unwrap(); + + let row2 = MixedIdxRow { + id: table.get_next_pk().0, + category: 1, + unique_value: 20, + data: 200, + }; + table.insert(row2.clone()).unwrap(); + + let row3 = MixedIdxRow { + id: table.get_next_pk().0, + category: 1, + unique_value: 30, + data: 300, + }; + table.insert(row3.clone()).unwrap(); + table.wait_for_ops().await; + + (row1.id, row2.id, row3.id) + }; + + // Phase 2: 2 valid inserts -> failure bulk update -> valid insert -> wait_for_ops + { + let engine = MixedIdxPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = 
MixedIdxWorkTable::load(engine).await.unwrap(); + + let valid_row1 = MixedIdxRow { + id: table.get_next_pk().0, + category: 100, + unique_value: 1000, + data: 1000, + }; + table.insert(valid_row1).unwrap(); + + let valid_row2 = MixedIdxRow { + id: table.get_next_pk().0, + category: 200, + unique_value: 2000, + data: 2000, + }; + table.insert(valid_row2).unwrap(); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let query = UniqueValueByCategoryQuery { unique_value: 99 }; + let result = table.update_unique_value_by_category(query, 1).await; + assert!(result.is_err()); + + let valid_row3 = MixedIdxRow { + id: table.get_next_pk().0, + category: 300, + unique_value: 3000, + data: 3000, + }; + table.insert(valid_row3).unwrap(); + + let wait_result = timeout(Duration::from_secs(4), table.wait_for_ops()).await; + assert!(wait_result.is_ok(), "BUG: persistence blocked!"); + } + + // Phase 3: Verify + { + let engine = MixedIdxPersistenceEngine::new(config).await.unwrap(); + let table = MixedIdxWorkTable::load(engine).await.unwrap(); + + assert!(table.select(row1_pk).is_some()); + assert!(table.select(row2_pk).is_some()); + assert!(table.select(row3_pk).is_some()); + table.wait_for_ops().await; + } + }); +} + +#[test] +fn test_update_non_unique_last_fail() { + let config = DiskConfig::new_with_table_name( + "tests/data/failure/update_non_unique_last", + MixedIdxWorkTable::name_snake_case(), + ); + + let runtime = get_runtime(); + + runtime.block_on(async { + remove_dir_if_exists("tests/data/failure/update_non_unique_last".to_string()).await; + + // Phase 1: Setup + let conflict_pk = { + let engine = MixedIdxPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = MixedIdxWorkTable::load(engine).await.unwrap(); + + let conflict_row = MixedIdxRow { + id: table.get_next_pk().0, + category: 100, + unique_value: 99, + data: 0, + }; + table.insert(conflict_row.clone()).unwrap(); + + let row1 = MixedIdxRow { + id: table.get_next_pk().0, + category: 1, 
+ unique_value: 10, + data: 100, + }; + table.insert(row1.clone()).unwrap(); + + let row2 = MixedIdxRow { + id: table.get_next_pk().0, + category: 1, + unique_value: 20, + data: 200, + }; + table.insert(row2.clone()).unwrap(); + + let row3 = MixedIdxRow { + id: table.get_next_pk().0, + category: 1, + unique_value: 30, + data: 300, + }; + table.insert(row3.clone()).unwrap(); + table.wait_for_ops().await; + + conflict_row.id + }; + + // Phase 2: 2 valid inserts -> failure bulk update -> valid insert -> wait_for_ops + { + let engine = MixedIdxPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = MixedIdxWorkTable::load(engine).await.unwrap(); + + let valid_row1 = MixedIdxRow { + id: table.get_next_pk().0, + category: 200, + unique_value: 1000, + data: 1000, + }; + table.insert(valid_row1).unwrap(); + + let valid_row2 = MixedIdxRow { + id: table.get_next_pk().0, + category: 300, + unique_value: 2000, + data: 2000, + }; + table.insert(valid_row2).unwrap(); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let query = UniqueValueByCategoryQuery { unique_value: 99 }; + let result = table.update_unique_value_by_category(query, 1).await; + assert!(result.is_err()); + + let valid_row3 = MixedIdxRow { + id: table.get_next_pk().0, + category: 400, + unique_value: 3000, + data: 3000, + }; + table.insert(valid_row3).unwrap(); + + let wait_result = timeout(Duration::from_secs(4), table.wait_for_ops()).await; + assert!(wait_result.is_ok(), "BUG: persistence blocked!"); + } + + // Phase 3: Verify + { + let engine = MixedIdxPersistenceEngine::new(config).await.unwrap(); + let table = MixedIdxWorkTable::load(engine).await.unwrap(); + + let conflict = table.select(conflict_pk).unwrap(); + assert_eq!(conflict.unique_value, 99); + table.wait_for_ops().await; + } + }); +} \ No newline at end of file diff --git a/tests/persistence/failure/update_unsized.rs b/tests/persistence/failure/update_unsized.rs new file mode 100644 index 0000000..8e0b6a1 --- 
/dev/null +++ b/tests/persistence/failure/update_unsized.rs @@ -0,0 +1,417 @@ +/// Unsized field update failure tests +use super::*; +use crate::remove_dir_if_exists; + +#[test] +fn test_update_unsized_same_size() { + let config = DiskConfig::new_with_table_name( + "tests/data/failure/update_unsized_same_size", + NonUniqueUnsizedWorkTable::name_snake_case(), + ); + + let runtime = get_runtime(); + + runtime.block_on(async { + remove_dir_if_exists("tests/data/failure/update_unsized_same_size".to_string()).await; + + // Phase 1: Setup + let (row1_pk, row2_pk, row3_pk) = { + let engine = NonUniqueUnsizedPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = NonUniqueUnsizedWorkTable::load(engine).await.unwrap(); + + let row1 = NonUniqueUnsizedRow { + id: table.get_next_pk().0, + category: 1, + unique_value: 10, + name: "aaa".to_string(), + }; + table.insert(row1.clone()).unwrap(); + + let row2 = NonUniqueUnsizedRow { + id: table.get_next_pk().0, + category: 1, + unique_value: 20, + name: "bbb".to_string(), + }; + table.insert(row2.clone()).unwrap(); + + let row3 = NonUniqueUnsizedRow { + id: table.get_next_pk().0, + category: 1, + unique_value: 30, + name: "ccc".to_string(), + }; + table.insert(row3.clone()).unwrap(); + table.wait_for_ops().await; + + (row1.id, row2.id, row3.id) + }; + + // Phase 2: 2 valid inserts -> failure update -> valid insert -> wait_for_ops + { + let engine = NonUniqueUnsizedPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = NonUniqueUnsizedWorkTable::load(engine).await.unwrap(); + + let valid_row1 = NonUniqueUnsizedRow { + id: table.get_next_pk().0, + category: 100, + unique_value: 1000, + name: "xxx".to_string(), + }; + table.insert(valid_row1).unwrap(); + + let valid_row2 = NonUniqueUnsizedRow { + id: table.get_next_pk().0, + category: 200, + unique_value: 2000, + name: "yyy".to_string(), + }; + table.insert(valid_row2).unwrap(); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let 
query = NameAndValueByCategoryQuery { + name: "xxx".to_string(), + unique_value: 99, + }; + let result = table.update_name_and_value_by_category(query, 1).await; + assert!(result.is_err()); + + let valid_row3 = NonUniqueUnsizedRow { + id: table.get_next_pk().0, + category: 300, + unique_value: 3000, + name: "zzz".to_string(), + }; + table.insert(valid_row3).unwrap(); + + let wait_result = timeout(Duration::from_secs(4), table.wait_for_ops()).await; + assert!(wait_result.is_ok(), "BUG: persistence blocked!"); + } + + // Phase 3: Verify + { + let engine = NonUniqueUnsizedPersistenceEngine::new(config).await.unwrap(); + let table = NonUniqueUnsizedWorkTable::load(engine).await.unwrap(); + + assert!(table.select(row1_pk).is_some()); + assert!(table.select(row2_pk).is_some()); + assert!(table.select(row3_pk).is_some()); + table.wait_for_ops().await; + } + }); +} + +#[test] +fn test_update_unsized_larger_all_success() { + let config = DiskConfig::new_with_table_name( + "tests/data/failure/update_unsized_larger_success", + NonUniqueUnsizedWorkTable::name_snake_case(), + ); + + let runtime = get_runtime(); + + runtime.block_on(async { + remove_dir_if_exists("tests/data/failure/update_unsized_larger_success".to_string()).await; + + // Phase 1: Setup + let row_pk = { + let engine = NonUniqueUnsizedPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = NonUniqueUnsizedWorkTable::load(engine).await.unwrap(); + + let row = NonUniqueUnsizedRow { + id: table.get_next_pk().0, + category: 1, + unique_value: 10, + name: "a".to_string(), + }; + table.insert(row.clone()).unwrap(); + table.wait_for_ops().await; + row.id + }; + + // Phase 2: 2 valid inserts -> success update -> valid insert -> wait_for_ops + { + let engine = NonUniqueUnsizedPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = NonUniqueUnsizedWorkTable::load(engine).await.unwrap(); + + let valid_row1 = NonUniqueUnsizedRow { + id: table.get_next_pk().0, + category: 100, + 
unique_value: 1000, + name: "xxx".to_string(), + }; + table.insert(valid_row1).unwrap(); + + let valid_row2 = NonUniqueUnsizedRow { + id: table.get_next_pk().0, + category: 200, + unique_value: 2000, + name: "yyy".to_string(), + }; + table.insert(valid_row2).unwrap(); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let query = NameAndValueByCategoryQuery { + name: "larger_name".to_string(), + unique_value: 20, + }; + + let result = table.update_name_and_value_by_category(query, 1).await; + assert!(result.is_ok()); + + let valid_row3 = NonUniqueUnsizedRow { + id: table.get_next_pk().0, + category: 300, + unique_value: 3000, + name: "zzz".to_string(), + }; + table.insert(valid_row3).unwrap(); + + let wait_result = timeout(Duration::from_secs(4), table.wait_for_ops()).await; + assert!(wait_result.is_ok()); + } + + // Phase 3: Verify + { + let engine = NonUniqueUnsizedPersistenceEngine::new(config).await.unwrap(); + let table = NonUniqueUnsizedWorkTable::load(engine).await.unwrap(); + + let row = table.select(row_pk).unwrap(); + assert_eq!(row.unique_value, 20); + table.wait_for_ops().await; + } + }); +} + +#[test] +fn test_update_unsized_larger_middle_fail() { + let config = DiskConfig::new_with_table_name( + "tests/data/failure/update_unsized_larger_middle", + NonUniqueUnsizedWorkTable::name_snake_case(), + ); + + let runtime = get_runtime(); + + runtime.block_on(async { + remove_dir_if_exists("tests/data/failure/update_unsized_larger_middle".to_string()).await; + + // Phase 1: Setup + let (conflict_pk, row2_pk, row3_pk) = { + let engine = NonUniqueUnsizedPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = NonUniqueUnsizedWorkTable::load(engine).await.unwrap(); + + let conflict = NonUniqueUnsizedRow { + id: table.get_next_pk().0, + category: 100, + unique_value: 99, + name: "x".to_string(), + }; + table.insert(conflict.clone()).unwrap(); + + let row1 = NonUniqueUnsizedRow { + id: table.get_next_pk().0, + category: 1, + 
unique_value: 10, + name: "a".to_string(), + }; + table.insert(row1.clone()).unwrap(); + + let row2 = NonUniqueUnsizedRow { + id: table.get_next_pk().0, + category: 1, + unique_value: 20, + name: "b".to_string(), + }; + table.insert(row2.clone()).unwrap(); + + let row3 = NonUniqueUnsizedRow { + id: table.get_next_pk().0, + category: 1, + unique_value: 30, + name: "c".to_string(), + }; + table.insert(row3.clone()).unwrap(); + table.wait_for_ops().await; + + (conflict.id, row2.id, row3.id) + }; + + // Phase 2: 2 valid inserts -> failure update -> valid insert -> wait_for_ops + { + let engine = NonUniqueUnsizedPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = NonUniqueUnsizedWorkTable::load(engine).await.unwrap(); + + let valid_row1 = NonUniqueUnsizedRow { + id: table.get_next_pk().0, + category: 200, + unique_value: 1000, + name: "xxx".to_string(), + }; + table.insert(valid_row1).unwrap(); + + let valid_row2 = NonUniqueUnsizedRow { + id: table.get_next_pk().0, + category: 300, + unique_value: 2000, + name: "yyy".to_string(), + }; + table.insert(valid_row2).unwrap(); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let query = NameAndValueByCategoryQuery { + name: "larger_name".to_string(), + unique_value: 99, + }; + + let result = table.update_name_and_value_by_category(query, 1).await; + assert!(result.is_err()); + + let valid_row3 = NonUniqueUnsizedRow { + id: table.get_next_pk().0, + category: 400, + unique_value: 3000, + name: "zzz".to_string(), + }; + table.insert(valid_row3).unwrap(); + + let wait_result = timeout(Duration::from_secs(4), table.wait_for_ops()).await; + assert!(wait_result.is_ok(), "BUG: persistence blocked!"); + } + + // Phase 3: Verify + { + let engine = NonUniqueUnsizedPersistenceEngine::new(config).await.unwrap(); + let table = NonUniqueUnsizedWorkTable::load(engine).await.unwrap(); + + let row2 = table.select(row2_pk).unwrap(); + assert_eq!(row2.name, "b".to_string()); + assert_eq!(row2.unique_value, 
20); + + let row3 = table.select(row3_pk).unwrap(); + assert_eq!(row3.name, "c".to_string()); + assert_eq!(row3.unique_value, 30); + + let conflict = table.select(conflict_pk).unwrap(); + assert_eq!(conflict.unique_value, 99); + table.wait_for_ops().await; + } + }); +} + +#[test] +fn test_update_unsized_larger_last_fail() { + let config = DiskConfig::new_with_table_name( + "tests/data/failure/update_unsized_larger_last", + NonUniqueUnsizedWorkTable::name_snake_case(), + ); + + let runtime = get_runtime(); + + runtime.block_on(async { + remove_dir_if_exists("tests/data/failure/update_unsized_larger_last".to_string()).await; + + // Phase 1: Setup + let (row1_pk, row2_pk) = { + let engine = NonUniqueUnsizedPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = NonUniqueUnsizedWorkTable::load(engine).await.unwrap(); + + let row1 = NonUniqueUnsizedRow { + id: table.get_next_pk().0, + category: 1, + unique_value: 10, + name: "a".to_string(), + }; + table.insert(row1.clone()).unwrap(); + + let row2 = NonUniqueUnsizedRow { + id: table.get_next_pk().0, + category: 1, + unique_value: 20, + name: "b".to_string(), + }; + table.insert(row2.clone()).unwrap(); + table.wait_for_ops().await; + + (row1.id, row2.id) + }; + + // Phase 2: 2 valid inserts -> failure update -> valid insert -> wait_for_ops + { + let engine = NonUniqueUnsizedPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = NonUniqueUnsizedWorkTable::load(engine).await.unwrap(); + + let valid_row1 = NonUniqueUnsizedRow { + id: table.get_next_pk().0, + category: 100, + unique_value: 1000, + name: "xxx".to_string(), + }; + table.insert(valid_row1).unwrap(); + + let valid_row2 = NonUniqueUnsizedRow { + id: table.get_next_pk().0, + category: 200, + unique_value: 2000, + name: "yyy".to_string(), + }; + table.insert(valid_row2).unwrap(); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let query = NameAndValueByCategoryQuery { + name: "larger".to_string(), + unique_value: 
99, + }; + + let result = table.update_name_and_value_by_category(query, 1).await; + assert!(result.is_err()); + + let valid_row3 = NonUniqueUnsizedRow { + id: table.get_next_pk().0, + category: 300, + unique_value: 3000, + name: "zzz".to_string(), + }; + table.insert(valid_row3).unwrap(); + + let wait_result = timeout(Duration::from_secs(4), table.wait_for_ops()).await; + assert!(wait_result.is_ok(), "BUG: persistence blocked!"); + } + + // Phase 3: Verify + { + let engine = NonUniqueUnsizedPersistenceEngine::new(config).await.unwrap(); + let table = NonUniqueUnsizedWorkTable::load(engine).await.unwrap(); + + // Row2: unchanged (failed to update) + let row2 = table.select(row2_pk).unwrap(); + assert_eq!(row2.name, "b".to_string()); + assert_eq!(row2.unique_value, 20); + + // Row1: was updated before row2 failed + let row1 = table.select(row1_pk).unwrap(); + assert_eq!(row1.name, "larger".to_string()); + assert_eq!(row1.unique_value, 99); + table.wait_for_ops().await; + } + }); +} \ No newline at end of file diff --git a/tests/persistence/mod.rs b/tests/persistence/mod.rs index ef05162..9b455cd 100644 --- a/tests/persistence/mod.rs +++ b/tests/persistence/mod.rs @@ -3,6 +3,7 @@ use worktable::prelude::*; use worktable::worktable; mod concurrent; +mod failure; mod index_page; mod read; mod space_index; diff --git a/tests/persistence/sync/failure.rs b/tests/persistence/sync/failure.rs new file mode 100644 index 0000000..69e3e0f --- /dev/null +++ b/tests/persistence/sync/failure.rs @@ -0,0 +1,225 @@ +use crate::remove_dir_if_exists; +use worktable::prelude::*; + +use super::{ + AnotherByIdQuery, FieldByAnotherQuery, TestSyncPersistenceEngine, TestSyncRow, + TestSyncWorkTable, +}; + +#[test] +fn test_failed_update_by_pk_doesnt_corrupt_persistence() { + let config = DiskConfig::new_with_table_name( + "tests/data/sync/failure_update_pk", + TestSyncWorkTable::name_snake_case(), + ); + + let runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + 
.enable_io() + .enable_time() + .build() + .unwrap(); + + runtime.block_on(async { + remove_dir_if_exists("tests/data/sync/failure_update_pk".to_string()).await; + + let pks = { + let engine = TestSyncPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = TestSyncWorkTable::load(engine).await.unwrap(); + let mut pks = vec![]; + for i in 0..100 { + let row = TestSyncRow { + id: table.get_next_pk().0, + another: i, + non_unique: 0, + field: i as f64, + }; + table.insert(row.clone()).unwrap(); + pks.push(row.id); + } + table.wait_for_ops().await; + pks + }; + + { + let engine = TestSyncPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = TestSyncWorkTable::load(engine).await.unwrap(); + + let result = table + .update_another_by_id(AnotherByIdQuery { another: 9999 }, 9999) + .await; + assert!(result.is_err()); + assert!(matches!(result.unwrap_err(), WorkTableError::NotFound)); + + for (i, pk) in pks.iter().enumerate() { + table + .update_another_by_id( + AnotherByIdQuery { + another: i as u64 + 1000, + }, + *pk, + ) + .await + .unwrap(); + } + table.wait_for_ops().await; + } + + { + let engine = TestSyncPersistenceEngine::new(config).await.unwrap(); + let table = TestSyncWorkTable::load(engine).await.unwrap(); + for (i, pk) in pks.iter().enumerate() { + let row = table.select(*pk).unwrap(); + assert_eq!(row.another, i as u64 + 1000); + } + let last_pk = *pks.last().unwrap(); + assert_eq!(table.0.pk_gen.get_state(), last_pk + 1); + } + }); +} + +#[test] +fn test_failed_update_by_unique_index_doesnt_corrupt_persistence() { + let config = DiskConfig::new_with_table_name( + "tests/data/sync/failure_update_unique", + TestSyncWorkTable::name_snake_case(), + ); + + let runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_io() + .enable_time() + .build() + .unwrap(); + + runtime.block_on(async { + remove_dir_if_exists("tests/data/sync/failure_update_unique".to_string()).await; + + let pks = { + let 
engine = TestSyncPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = TestSyncWorkTable::load(engine).await.unwrap(); + let mut pks = vec![]; + for i in 0..100 { + let row = TestSyncRow { + id: table.get_next_pk().0, + another: i, + non_unique: 0, + field: i as f64, + }; + table.insert(row.clone()).unwrap(); + pks.push(row.id); + } + table.wait_for_ops().await; + pks + }; + + { + let engine = TestSyncPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = TestSyncWorkTable::load(engine).await.unwrap(); + + let result = table + .update_field_by_another(FieldByAnotherQuery { field: 9999.0 }, 9999) + .await; + assert!(result.is_err()); + assert!(matches!(result.unwrap_err(), WorkTableError::NotFound)); + + for (i, _pk) in pks.iter().enumerate() { + table + .update_field_by_another( + FieldByAnotherQuery { + field: i as f64 + 1000.0, + }, + i as u64, + ) + .await + .unwrap(); + } + table.wait_for_ops().await; + } + + { + let engine = TestSyncPersistenceEngine::new(config).await.unwrap(); + let table = TestSyncWorkTable::load(engine).await.unwrap(); + for (i, pk) in pks.iter().enumerate() { + let row = table.select(*pk).unwrap(); + assert_eq!(row.field, i as f64 + 1000.0); + } + let last_pk = *pks.last().unwrap(); + assert_eq!(table.0.pk_gen.get_state(), last_pk + 1); + } + }); +} + +#[test] +fn test_failed_delete_by_pk_doesnt_corrupt_persistence() { + let config = DiskConfig::new_with_table_name( + "tests/data/sync/failure_delete_pk", + TestSyncWorkTable::name_snake_case(), + ); + + let runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_io() + .enable_time() + .build() + .unwrap(); + + runtime.block_on(async { + remove_dir_if_exists("tests/data/sync/failure_delete_pk".to_string()).await; + + let pks = { + let engine = TestSyncPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = TestSyncWorkTable::load(engine).await.unwrap(); + let mut pks = vec![]; + for i in 0..100 { + let 
row = TestSyncRow { + id: table.get_next_pk().0, + another: i, + non_unique: 0, + field: i as f64, + }; + table.insert(row.clone()).unwrap(); + pks.push(row.id); + } + table.wait_for_ops().await; + pks + }; + + { + let engine = TestSyncPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = TestSyncWorkTable::load(engine).await.unwrap(); + + let result = table.delete(9999).await; + assert!(result.is_err()); + assert!(matches!(result.unwrap_err(), WorkTableError::NotFound)); + + table.wait_for_ops().await; + } + + { + let engine = TestSyncPersistenceEngine::new(config).await.unwrap(); + let table = TestSyncWorkTable::load(engine).await.unwrap(); + for pk in &pks { + let row = table.select(*pk).unwrap(); + assert_eq!( + row.another, + pks.iter().position(|p| p == pk).unwrap() as u64 + ); + } + let last_pk = *pks.last().unwrap(); + assert_eq!(table.0.pk_gen.get_state(), last_pk + 1); + } + }); +} diff --git a/tests/persistence/sync/failure_multi_index.rs b/tests/persistence/sync/failure_multi_index.rs new file mode 100644 index 0000000..dea8f8f --- /dev/null +++ b/tests/persistence/sync/failure_multi_index.rs @@ -0,0 +1,165 @@ +/// Test for multi-index persistence hang bug +/// +/// This test detects a bug where: +/// 1. Insert succeeds on Index A (CDC event generated) +/// 2. Insert fails on Index B (duplicate value - must be unique index) +/// 3. Rollback removes Index A entry in memory +/// 4. CDC system may have queued an event for the partial insert that never gets cleaned up +/// 5. 
This leaves the persistence queue/state corrupted, blocking future operations +use crate::remove_dir_if_exists; +use std::time::Duration; +use tokio::time::timeout; +use worktable::prelude::*; +use worktable::worktable; + + +// Table with TWO unique indexes to trigger the bug scenario +worktable!( + name: MultiUniqueIdx, + persist: true, + columns: { + id: u64 primary_key autoincrement, + unique_a: u64, + unique_b: u64, + }, + indexes: { + unique_a_idx: unique_a unique, + unique_b_idx: unique_b unique, + }, + queries: {} +); + +#[test] +fn test_multi_index_insert_failure_doesnt_corrupt_persistence() { + let config = DiskConfig::new_with_table_name( + "tests/data/sync/failure_multi_index_insert", + MultiUniqueIdxWorkTable::name_snake_case(), + ); + + let runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_io() + .enable_time() + .build() + .unwrap(); + + runtime.block_on(async { + remove_dir_if_exists("tests/data/sync/failure_multi_index_insert".to_string()).await; + + // Phase 1: Insert initial rows to populate indexes + let pk = { + let engine = MultiUniqueIdxPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = MultiUniqueIdxWorkTable::load(engine).await.unwrap(); + + let row = MultiUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 0, + unique_b: 0, + }; + table.insert(row.clone()).unwrap(); + table.wait_for_ops().await; + row.id + }; + + // Phase 2: The critical test - failed insert followed by valid insert + // This tests if the persistence system gets stuck after a failed insert + let valid_insert_pk = { + let engine = MultiUniqueIdxPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = MultiUniqueIdxWorkTable::load(engine).await.unwrap(); + + let valid_row = MultiUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 9999, + unique_b: 999, + }; + table.insert(valid_row).unwrap(); + + let valid_row = MultiUniqueIdxRow { + id: table.get_next_pk().0, + unique_a: 1, + unique_b: 1, + 
};
+        table.insert(valid_row).unwrap();
+
+        tokio::time::sleep(Duration::from_millis(500)).await;
+
+        let failing_row = MultiUniqueIdxRow {
+            id: table.get_next_pk().0,
+            unique_a: 99,
+            unique_b: 0, // This already exists
+        };
+
+        let result = table.insert(failing_row);
+        assert!(result.is_err());
+        assert!(matches!(
+            result.unwrap_err(),
+            WorkTableError::AlreadyExists(_)
+        ));
+
+        let failing_row = MultiUniqueIdxRow {
+            id: table.get_next_pk().0,
+            unique_a: 99,
+            unique_b: 0, // This already exists
+        };
+
+        let result = table.insert(failing_row);
+        assert!(result.is_err());
+        assert!(matches!(
+            result.unwrap_err(),
+            WorkTableError::AlreadyExists(_)
+        ));
+
+        let valid_row = MultiUniqueIdxRow {
+            id: table.get_next_pk().0,
+            unique_a: 999,
+            unique_b: 99,
+        };
+        let valid_pk = valid_row.id;
+        table.insert(valid_row).unwrap();
+
+        // Use timeout to detect if persistence is stuck
+        // If this hangs, the bug exists - CDC queue is blocked
+        let wait_result = timeout(Duration::from_secs(4), table.wait_for_ops()).await;
+
+        if wait_result.is_err() {
+            panic!(
+                "BUG DETECTED: Persistence system is stuck! \
+                 wait_for_ops() timed out after 4 seconds."
+            );
+        }
+
+        valid_pk
+    };
+
+    // Phase 3: Reload from disk and verify
+    {
+        let engine = MultiUniqueIdxPersistenceEngine::new(config).await.unwrap();
+        let table = MultiUniqueIdxWorkTable::load(engine).await.unwrap();
+
+        let original_row = table.select(pk).unwrap();
+        assert_eq!(original_row.unique_a, 0);
+        assert_eq!(original_row.unique_b, 0);
+
+        let persisted_row = table.select(valid_insert_pk).unwrap();
+        assert_eq!(persisted_row.unique_a, 999);
+        assert_eq!(persisted_row.unique_b, 99);
+
+        let row_with_99 = MultiUniqueIdxRow {
+            id: table.get_next_pk().0,
+            unique_a: 99,
+            unique_b: 100,
+        };
+        let result = table.insert(row_with_99.clone());
+        assert!(
+            result.is_ok(),
+            "BUG DETECTED: unique_a_idx has orphaned entry for unique_a=99 \
+             from the failed insert. This indicates CDC event loss during rollback."
+ ); + assert!(table.select(row_with_99.id).is_some()); + } + }); +} diff --git a/tests/persistence/sync/mod.rs b/tests/persistence/sync/mod.rs index c540b25..e12b9a8 100644 --- a/tests/persistence/sync/mod.rs +++ b/tests/persistence/sync/mod.rs @@ -5,11 +5,14 @@ use worktable::prelude::PersistedWorkTable; use worktable::prelude::*; use worktable::worktable; +mod failure; +mod failure_multi_index; mod many_strings; mod option; mod string_primary_index; mod string_re_read; mod string_secondary_index; +mod string_update_timeout; mod uuid_; worktable! ( diff --git a/tests/persistence/sync/string_update_timeout.rs b/tests/persistence/sync/string_update_timeout.rs new file mode 100644 index 0000000..4af4b4c --- /dev/null +++ b/tests/persistence/sync/string_update_timeout.rs @@ -0,0 +1,114 @@ +use crate::remove_dir_if_exists; +use std::time::Duration; +use tokio::time::timeout; +use worktable::prelude::*; +use worktable::worktable; + +worktable!( + name: User, + persist: true, + columns: { + id: u64 primary_key autoincrement, + public_id: u64, + fk_app_id: u64, + fk_app_pub_id: u64, + username: String, + display_name: String optional, + telegram_username: String optional, + telegram_confirmed: bool, + status: u64, + honey_app_role: u64, + }, + indexes: { + public_id_idx: public_id unique, + username_idx: username, + fk_app_id_idx: fk_app_id, + fk_app_public_id_idx: fk_app_pub_id, + }, + queries: { + update: { + DisplayNameByPublicId(display_name) by public_id, + UsernameByPublicId(username) by public_id, + StatusByPublicId(status) by public_id, + }, + delete: { + ByFkAppId() by fk_app_id, + ByFkAppPubId() by fk_app_pub_id, + ByPubId() by public_id, + } + } +); + +#[test] +fn test_string_update_doesnt_block_persistence() { + let config = DiskConfig::new_with_table_name( + "tests/data/sync/string_update_timeout", + UserWorkTable::name_snake_case(), + ); + + let runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_io() + .enable_time() + 
.build() + .unwrap(); + + runtime.block_on(async { + remove_dir_if_exists("tests/data/sync/string_update_timeout".to_string()).await; + + // Phase 1: Insert initial row with String fields + let row = { + let engine = UserPersistenceEngine::new(config.clone()).await.unwrap(); + let table = UserWorkTable::load(engine).await.unwrap(); + + let row = UserRow { + id: table.get_next_pk().0, + public_id: 1001, + fk_app_id: 42, + fk_app_pub_id: 9999, + username: "test_user".to_string(), + display_name: None, + telegram_username: None, + telegram_confirmed: false, + status: 1, + honey_app_role: 2, + }; + table.insert(row.clone()).unwrap(); + table.wait_for_ops().await; + row + }; + + { + let engine = UserPersistenceEngine::new(config.clone()).await.unwrap(); + let table = UserWorkTable::load(engine).await.unwrap(); + + table.update(row.clone()).await.unwrap(); + + let wait_result = timeout(Duration::from_secs(4), table.wait_for_ops()).await; + + if wait_result.is_err() { + panic!( + "BUG DETECTED: Persistence system is stuck! \ + wait_for_ops() timed out after String update." 
+ ); + } + } + + // Phase 3: Reload and verify persisted values + { + let engine = UserPersistenceEngine::new(config).await.unwrap(); + let table = UserWorkTable::load(engine).await.unwrap(); + + let persisted_row = table.select(row.id).unwrap(); + assert_eq!(persisted_row.username, row.username); + assert_eq!(persisted_row.display_name, row.display_name); + assert_eq!(persisted_row.public_id, row.public_id); + assert_eq!(persisted_row.fk_app_id, row.fk_app_id); + assert_eq!(persisted_row.fk_app_pub_id, row.fk_app_pub_id); + assert_eq!(persisted_row.telegram_username, row.telegram_username); + assert_eq!(persisted_row.telegram_confirmed, row.telegram_confirmed); + assert_eq!(persisted_row.status, row.status); + assert_eq!(persisted_row.honey_app_role, row.honey_app_role); + } + }); +} diff --git a/tests/worktable/index/mod.rs b/tests/worktable/index/mod.rs index 5642e8d..e949c1e 100644 --- a/tests/worktable/index/mod.rs +++ b/tests/worktable/index/mod.rs @@ -1,4 +1,5 @@ mod insert; +mod order; mod update_by_pk; mod update_full; mod update_query; diff --git a/tests/worktable/index/order.rs b/tests/worktable/index/order.rs new file mode 100644 index 0000000..77d41ca --- /dev/null +++ b/tests/worktable/index/order.rs @@ -0,0 +1,99 @@ +use worktable::prelude::*; +use worktable::worktable; + +worktable!( + name: OrderTest, + columns: { + id: u64 primary_key autoincrement, + field_a: String, + field_b: i64, + field_c: u32, + }, + indexes: { + idx_c: field_c unique, // should be 1st field in struct + idx_a: field_a unique, // should be 2nd field in struct + idx_b: field_b unique, // should be 3rd field in struct + } +); + +#[test] +fn index_struct_field_order() { + let variant_c = OrderTestAvailableIndexes::IdxC; + let variant_a = OrderTestAvailableIndexes::IdxA; + let variant_b = OrderTestAvailableIndexes::IdxB; + + assert_eq!(variant_c as usize, 0); + assert_eq!(variant_a as usize, 1); + assert_eq!(variant_b as usize, 2); +} + +#[test] +fn available_indexes_enum_order() 
{ + let variants = vec![ + OrderTestAvailableIndexes::IdxC, + OrderTestAvailableIndexes::IdxA, + OrderTestAvailableIndexes::IdxB, + ]; + + assert!(variants[0] < variants[1], "IdxC should be less than IdxA"); + assert!(variants[1] < variants[2], "IdxA should be less than IdxB"); + assert!(variants[0] < variants[2], "IdxC should be less than IdxB"); +} + +#[tokio::test] +async fn insert_failure_rollback_order() { + let table = OrderTestWorkTable::default(); + + let row1 = OrderTestRow { + id: 0, + field_a: "a".to_string(), + field_b: 1, + field_c: 1, + }; + let pk1 = table.insert(row1).unwrap(); + let pk1_val: u64 = pk1.into(); + assert_eq!(pk1_val, 0u64); + + let row2 = OrderTestRow { + id: 1, + field_a: "a".to_string(), + field_b: 2, + field_c: 2, + }; + let err = table.insert(row2).unwrap_err(); + + let err_str = err.to_string(); + assert!( + err_str.contains("unique") || err_str.contains("IdxA") || err_str.contains("idx_a"), + "Error should mention the failed index: {}", + err_str + ); +} + +#[tokio::test] +async fn insert_success_order() { + let table = OrderTestWorkTable::default(); + + let row = OrderTestRow { + id: 0, + field_a: "test".to_string(), + field_b: 42, + field_c: 100, + }; + + let pk = table.insert(row.clone()).unwrap(); + let pk_val: u64 = pk.into(); + assert_eq!(pk_val, 0u64); + + let by_idx_c = table.select_by_field_c(100); + assert!(by_idx_c.is_some()); + assert_eq!(by_idx_c.unwrap().field_a, "test"); + + let by_idx_a = table.select_by_field_a("test".to_string()); + assert!(by_idx_a.is_some()); + assert_eq!(by_idx_a.unwrap().field_c, 100); + + let by_idx_b = table.select_by_field_b(42); + assert!(by_idx_b.is_some()); + assert_eq!(by_idx_b.unwrap().field_a, "test"); +}