From e92c69656a0e0abb64cfb778fb0c2a3d6599c98a Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Wed, 15 Apr 2026 12:54:42 +0300 Subject: [PATCH 1/7] WIP --- src/persistence/space/index/unsized_.rs | 45 ++++++++-- tests/persistence/sync/string_re_read.rs | 107 +++++++++++++++++++++++ 2 files changed, 145 insertions(+), 7 deletions(-) diff --git a/src/persistence/space/index/unsized_.rs b/src/persistence/space/index/unsized_.rs index e1d5cd7..09c9d93 100644 --- a/src/persistence/space/index/unsized_.rs +++ b/src/persistence/space/index/unsized_.rs @@ -1,14 +1,14 @@ use std::collections::HashMap; use std::fmt::Debug; use std::hash::Hash; -use std::sync::Arc; use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::Arc; use data_bucket::page::PageId; use data_bucket::{ - GeneralHeader, GeneralPage, IndexPageUtility, IndexValue, Link, PageType, SizeMeasurable, - SpaceId, SpaceInfoPage, UnsizedIndexPage, VariableSizeMeasurable, parse_page, persist_page, - persist_pages_batch, + parse_page, persist_page, persist_pages_batch, GeneralHeader, GeneralPage, IndexPageUtility, IndexValue, + Link, PageType, SizeMeasurable, SpaceId, SpaceInfoPage, UnsizedIndexPage, + VariableSizeMeasurable, }; use eyre::eyre; use indexset::cdc::change::ChangeEvent; @@ -16,17 +16,17 @@ use indexset::concurrent::map::BTreeMap; use indexset::core::pair::Pair; use rkyv::de::Pool; use rkyv::rancor::Strategy; -use rkyv::ser::Serializer; use rkyv::ser::allocator::ArenaHandle; use rkyv::ser::sharing::Share; +use rkyv::ser::Serializer; use rkyv::util::AlignedVec; -use rkyv::{Archive, Deserialize, Serialize, rancor}; +use rkyv::{rancor, Archive, Deserialize, Serialize}; use tokio::fs::File; -use crate::UnsizedNode; use crate::persistence::space::BatchChangeEvent; use crate::persistence::{IndexTableOfContents, SpaceIndex, SpaceIndexOps}; use crate::prelude::WT_INDEX_EXTENSION; +use crate::UnsizedNode; #[derive(Debug)] pub struct SpaceIndexUnsized { @@ -387,12 +387,30 @@ where &mut self, events: BatchChangeEvent, ) -> eyre::Result<()> { + println!("[INFO events]: {:?}", events); let mut pages: HashMap = HashMap::new(); for ev in events { + println!("[INFO event]: {:?}", ev); match &ev { ChangeEvent::InsertAt { max_value, .. } | ChangeEvent::RemoveAt { max_value, .. } => { let page_id = &(max_value.key.clone(), max_value.value); + + // DEBUG START + println!( + "[INFO InsertAt/RemoveAt] Looking for page_id: {:?}", + page_id + ); + println!("[INFO] Available entries in table_of_contents:"); + for (k, pid) in self.table_of_contents.iter() { + println!("[INFO] key={:?}, page_id={:?}", k, pid); + } + println!( + "[INFO] Total entries count: {}", + self.table_of_contents.iter().count() + ); + // DEBUG END + let page_index = self .table_of_contents .get(page_id) @@ -463,6 +481,19 @@ where split_index, } => { let page_id = &(max_value.key.clone(), max_value.value); + + // DEBUG START + println!("[INFO SplitNode] Looking for page_id: {:?}", page_id); + println!("[INFO] Available entries in table_of_contents:"); + for (k, pid) in self.table_of_contents.iter() { + println!("[INFO] key={:?}, page_id={:?}", k, pid); + } + println!( + "[INFO] Total entries count: {}", + self.table_of_contents.iter().count() + ); + // DEBUG END + let page_index = self .table_of_contents .get(page_id) diff --git a/tests/persistence/sync/string_re_read.rs b/tests/persistence/sync/string_re_read.rs index 26493c2..c4a79bc 100644 --- a/tests/persistence/sync/string_re_read.rs +++ b/tests/persistence/sync/string_re_read.rs @@ -1,4 +1,6 @@ use crate::remove_dir_if_exists; +use std::time::Duration; +use tokio::time::timeout; use worktable::prelude::PersistedWorkTable; use worktable::prelude::*; @@ -614,6 +616,111 @@ fn test_key_delete_by_non_unique() { }) } +#[test] +fn test_toc_not_updated_when_index_value_same_but_link_changes() { + let config = DiskConfig::new_with_table_name( + "tests/data/key/toc_link_bug", + StringReReadWorkTable::name_snake_case(), + ); + + let runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_io() + .enable_time() + .build() + .unwrap(); + + runtime.block_on(async { + remove_dir_if_exists("tests/data/key/toc_link_bug".to_string()).await; + + let pk1 = { + let engine = StringReReadPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = StringReReadWorkTable::load(engine).await.unwrap(); + + let pk1 = table + .insert(StringReReadRow { + first: "same_first".to_string(), + id: table.get_next_pk().into(), + third: "third_1".to_string(), + second: "second_1".to_string(), + last: "last_1".to_string(), + }) + .unwrap(); + + table + .insert(StringReReadRow { + first: "same_first".to_string(), + id: table.get_next_pk().into(), + third: "third_2".to_string(), + second: "second_2".to_string(), + last: "last_2".to_string(), + }) + .unwrap(); + + table.wait_for_ops().await; + pk1 + }; + + { + let engine = StringReReadPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = StringReReadWorkTable::load(engine).await.unwrap(); + + table + .update(StringReReadRow { + first: "same_first".to_string(), + id: pk1.into(), + third: "third_updated".to_string(), + second: "second_1".to_string(), + last: "last_updated".to_string(), + }) + .await + .unwrap(); + + table.wait_for_ops().await; + } + + { + let engine = StringReReadPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = StringReReadWorkTable::load(engine).await.unwrap(); + + let result = table.insert(StringReReadRow { + first: "same_first".to_string(), + id: table.get_next_pk().into(), + third: "third_3".to_string(), + second: "second_3".to_string(), + last: "last_3".to_string(), + }); + + assert!( + result.is_ok(), + "TOC entry is stale after update with same index value" + ); + + let wait_result = timeout(Duration::from_secs(4), table.wait_for_ops()).await; + if wait_result.is_err() { + panic!("BUG DETECTED: Persistence system is stuck - wait_for_ops() timed out"); + } + } + + { + let engine = StringReReadPersistenceEngine::new(config).await.unwrap(); + let table = StringReReadWorkTable::load(engine).await.unwrap(); + + assert_eq!(table.select_all().execute().unwrap().len(), 3); + assert_eq!( + table.select_by_first("same_first".to_string()).execute().unwrap().len(), + 3 + ); + } + }); +} + #[test] fn test_big_amount_reread() { let config = DiskConfig::new_with_table_name( From b619800bbd148c50454894e9e97b40e24c0609a5 Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Wed, 15 Apr 2026 15:56:06 +0300 Subject: [PATCH 2/7] WIP --- src/persistence/space/index/mod.rs | 2 +- src/persistence/space/index/unsized_.rs | 31 +------------------------ 2 files changed, 2 insertions(+), 31 deletions(-) diff --git a/src/persistence/space/index/mod.rs b/src/persistence/space/index/mod.rs index e11522d..35cdfe0 100644 --- a/src/persistence/space/index/mod.rs +++ b/src/persistence/space/index/mod.rs @@ -158,7 +158,7 @@ where ) .await?; - if node_id.key < value.key { + if node_id.key < value.key || (node_id.key == value.key && node_id.value != value.value) { utility.node_id = value.clone().into(); new_node_id = Some(value); } diff --git a/src/persistence/space/index/unsized_.rs b/src/persistence/space/index/unsized_.rs index 09c9d93..349cf5d 100644 --- a/src/persistence/space/index/unsized_.rs +++ b/src/persistence/space/index/unsized_.rs @@ -165,7 +165,7 @@ where (value_offset, (value_offset - previous_offset) as u16), ); - if node_id.key < value.key { + if node_id.key < value.key || (node_id.key == value.key && node_id.value != value.value) { utility.update_node_id(value.clone().into())?; new_node_id = Some(value); } @@ -395,22 +395,6 @@ where ChangeEvent::InsertAt { max_value, .. } | ChangeEvent::RemoveAt { max_value, .. } => { let page_id = &(max_value.key.clone(), max_value.value); - - // DEBUG START - println!( - "[INFO InsertAt/RemoveAt] Looking for page_id: {:?}", - page_id - ); - println!("[INFO] Available entries in table_of_contents:"); - for (k, pid) in self.table_of_contents.iter() { - println!("[INFO] key={:?}, page_id={:?}", k, pid); - } - println!( - "[INFO] Total entries count: {}", - self.table_of_contents.iter().count() - ); - // DEBUG END - let page_index = self .table_of_contents .get(page_id) @@ -481,19 +465,6 @@ where split_index, } => { let page_id = &(max_value.key.clone(), max_value.value); - - // DEBUG START - println!("[INFO SplitNode] Looking for page_id: {:?}", page_id); - println!("[INFO] Available entries in table_of_contents:"); - for (k, pid) in self.table_of_contents.iter() { - println!("[INFO] key={:?}, page_id={:?}", k, pid); - } - println!( - "[INFO] Total entries count: {}", - self.table_of_contents.iter().count() - ); - // DEBUG END - let page_index = self .table_of_contents .get(page_id) From d559bd105824ca0800fc7505523a8b761549c952 Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Wed, 15 Apr 2026 22:11:23 +0300 Subject: [PATCH 3/7] WIP --- codegen/src/persist_index/generator.rs | 92 +++++++++++++++++-------- src/index/multipair.rs | 9 +++ src/persistence/space/index/unsized_.rs | 30 +++++--- 3 files changed, 94 insertions(+), 37 deletions(-) diff --git a/codegen/src/persist_index/generator.rs b/codegen/src/persist_index/generator.rs index f07c70f..ef9ced8 100644 --- a/codegen/src/persist_index/generator.rs +++ b/codegen/src/persist_index/generator.rs @@ -381,22 +381,40 @@ impl Generator { value: OffsetEqLink(p.value), }) .collect(); - let mut last_key = inner.first().expect("Node should be not empty").key.clone(); - let mut discriminator = 0; - let mut inner = inner.into_iter().map(move |p| { - if p.key == last_key { - let multi = p.with_last_discriminator(discriminator) ; - discriminator = multi.discriminator; - multi + + // Sort first to establish deterministic ordering + let mut sorted: Vec<_> = inner.into_iter() + .map(|p| IndexMultiPair { + key: p.key, + value: p.value, + discriminator: 0, // temporary, will be reassigned + }) + .collect(); + sorted.sort(); + + // Reassign discriminators deterministically so last entry (max_value) has highest discriminator + let max_discriminator = u64::MAX - 1; // Avoid SUPREMUM + let mut current_discriminator = 1u64; // Avoid INFIMUM + let last_key = sorted.last().expect("Node should not be empty").key.clone(); + + for entry in sorted.iter_mut() { + if entry.key == last_key { + // Last key entries get ascending discriminators ending with max_discriminator + entry.discriminator = current_discriminator.min(max_discriminator); + current_discriminator += 1; } else { - last_key = p.key.clone(); - let multi: IndexMultiPair<_, _> = p.into(); - discriminator = multi.discriminator; - multi + // Other keys: assign sequential discriminators (they don't affect max_value lookup) + entry.discriminator = current_discriminator.min(max_discriminator); + current_discriminator += 1; } - }).collect::>(); - inner.sort(); - let node = UnsizedNode::from_inner(inner, #const_name); + } + + // Ensure the very last entry has max_discriminator + if let Some(last) = sorted.last_mut() { + last.discriminator = max_discriminator; + } + + let node = UnsizedNode::from_inner(sorted, #const_name); #i.attach_multi_node(node); } }; @@ -431,22 +449,40 @@ impl Generator { value: OffsetEqLink(p.value), }) .collect(); - let mut last_key = inner.first().expect("Node should be not empty").key.clone(); - let mut discriminator = 0; - let mut inner = inner.into_iter().map(move |p| { - if p.key == last_key { - let multi = p.with_last_discriminator(discriminator) ; - discriminator = multi.discriminator; - multi + + // Sort first to establish deterministic ordering + let mut sorted: Vec<_> = inner.into_iter() + .map(|p| IndexMultiPair { + key: p.key, + value: p.value, + discriminator: 0, // temporary, will be reassigned + }) + .collect(); + sorted.sort(); + + // Reassign discriminators deterministically so last entry (max_value) has highest discriminator + let max_discriminator = u64::MAX - 1; // Avoid SUPREMUM + let mut current_discriminator = 1u64; // Avoid INFIMUM + let last_key = sorted.last().expect("Node should not be empty").key.clone(); + + for entry in sorted.iter_mut() { + if entry.key == last_key { + // Last key entries get ascending discriminators ending with max_discriminator + entry.discriminator = current_discriminator.min(max_discriminator); + current_discriminator += 1; } else { - last_key = p.key.clone(); - let multi: IndexMultiPair<_, _> = p.into(); - discriminator = multi.discriminator; - multi + // Other keys: assign sequential discriminators (they don't affect max_value lookup) + entry.discriminator = current_discriminator.min(max_discriminator); + current_discriminator += 1; } - }).collect::>(); - inner.sort(); - #i.attach_multi_node(inner); + } + + // Ensure the very last entry has max_discriminator + if let Some(last) = sorted.last_mut() { + last.discriminator = max_discriminator; + } + + #i.attach_multi_node(sorted); } }; quote! { diff --git a/src/index/multipair.rs b/src/index/multipair.rs index ae2aca2..38e4503 100644 --- a/src/index/multipair.rs +++ b/src/index/multipair.rs @@ -3,6 +3,7 @@ use indexset::core::pair::Pair; pub trait MultiPairRecreate { fn with_last_discriminator(self, discriminator: u64) -> MultiPair; + fn with_discriminator(self, discriminator: u64) -> MultiPair; } impl MultiPairRecreate for Pair { @@ -13,4 +14,12 @@ impl MultiPairRecreate for Pair { discriminator: fastrand::u64(discriminator..), } } + + fn with_discriminator(self, discriminator: u64) -> MultiPair { + MultiPair { + key: self.key, + value: self.value, + discriminator, + } + } } diff --git a/src/persistence/space/index/unsized_.rs b/src/persistence/space/index/unsized_.rs index 349cf5d..bb1167b 100644 --- a/src/persistence/space/index/unsized_.rs +++ b/src/persistence/space/index/unsized_.rs @@ -1,14 +1,14 @@ use std::collections::HashMap; use std::fmt::Debug; use std::hash::Hash; -use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; +use std::sync::atomic::{AtomicU32, Ordering}; use data_bucket::page::PageId; use data_bucket::{ - parse_page, persist_page, persist_pages_batch, GeneralHeader, GeneralPage, IndexPageUtility, IndexValue, - Link, PageType, SizeMeasurable, SpaceId, SpaceInfoPage, UnsizedIndexPage, - VariableSizeMeasurable, + GeneralHeader, GeneralPage, IndexPageUtility, IndexValue, Link, PageType, SizeMeasurable, + SpaceId, SpaceInfoPage, UnsizedIndexPage, VariableSizeMeasurable, parse_page, persist_page, + persist_pages_batch, }; use eyre::eyre; use indexset::cdc::change::ChangeEvent; @@ -16,17 +16,17 @@ use indexset::concurrent::map::BTreeMap; use indexset::core::pair::Pair; use rkyv::de::Pool; use rkyv::rancor::Strategy; +use rkyv::ser::Serializer; use rkyv::ser::allocator::ArenaHandle; use rkyv::ser::sharing::Share; -use rkyv::ser::Serializer; use rkyv::util::AlignedVec; -use rkyv::{rancor, Archive, Deserialize, Serialize}; +use rkyv::{Archive, Deserialize, Serialize, rancor}; use tokio::fs::File; +use crate::UnsizedNode; use crate::persistence::space::BatchChangeEvent; use crate::persistence::{IndexTableOfContents, SpaceIndex, SpaceIndexOps}; use crate::prelude::WT_INDEX_EXTENSION; -use crate::UnsizedNode; #[derive(Debug)] pub struct SpaceIndexUnsized { @@ -387,14 +387,13 @@ where &mut self, events: BatchChangeEvent, ) -> eyre::Result<()> { - println!("[INFO events]: {:?}", events); let mut pages: HashMap = HashMap::new(); for ev in events { - println!("[INFO event]: {:?}", ev); match &ev { ChangeEvent::InsertAt { max_value, .. } | ChangeEvent::RemoveAt { max_value, .. } => { let page_id = &(max_value.key.clone(), max_value.value); + let page_index = self .table_of_contents .get(page_id) @@ -465,6 +464,19 @@ where split_index, } => { let page_id = &(max_value.key.clone(), max_value.value); + + // DEBUG START + println!("[INFO SplitNode] Looking for page_id: {:?}", page_id); + println!("[INFO] Available entries in table_of_contents:"); + for (k, pid) in self.table_of_contents.iter() { + println!("[INFO] key={:?}, page_id={:?}", k, pid); + } + println!( + "[INFO] Total entries count: {}", + self.table_of_contents.iter().count() + ); + // DEBUG END + let page_index = self .table_of_contents .get(page_id) From 09607896124922bfad1db81a2598dbc552297dfe Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Wed, 15 Apr 2026 22:38:31 +0300 Subject: [PATCH 4/7] WIP --- tests/persistence/sync/string_re_read.rs | 94 ++++++++++++++++++++++++ 1 file changed, 94 insertions(+) diff --git a/tests/persistence/sync/string_re_read.rs b/tests/persistence/sync/string_re_read.rs index c4a79bc..9e55fc4 100644 --- a/tests/persistence/sync/string_re_read.rs +++ b/tests/persistence/sync/string_re_read.rs @@ -784,3 +784,97 @@ fn test_big_amount_reread() { } }) } + +#[test] +fn test_unique_index_same_value_link_changes() { + let config = DiskConfig::new_with_table_name( + "tests/data/key/unique_link_change", + StringReReadWorkTable::name_snake_case(), + ); + + let runtime = tokio::runtime::Builder::new_multi_thread() + .worker_threads(2) + .enable_io() + .enable_time() + .build() + .unwrap(); + + runtime.block_on(async { + remove_dir_if_exists("tests/data/key/unique_link_change".to_string()).await; + + // Phase 1: Insert initial value + let pk1 = { + let engine = StringReReadPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = StringReReadWorkTable::load(engine).await.unwrap(); + + let pk1 = table + .insert(StringReReadRow { + first: "first_1".to_string(), + id: table.get_next_pk().into(), + third: "third_1".to_string(), + second: "unique_second".to_string(), + last: "last_1".to_string(), + }) + .unwrap(); + + table.wait_for_ops().await; + pk1 + }; + + // Phase 2: Update (same unique value) + Insert new (same block) + { + let engine = StringReReadPersistenceEngine::new(config.clone()) + .await + .unwrap(); + let table = StringReReadWorkTable::load(engine).await.unwrap(); + + // Update: same second value, other fields change + table + .update(StringReReadRow { + first: "first_updated".to_string(), + id: pk1.into(), + third: "third_updated".to_string(), + second: "unique_second".to_string(), // SAME unique index value + last: "last_updated".to_string(), + }) + .await + .unwrap(); + + // Insert new row with different unique value + let result = table.insert(StringReReadRow { + first: "first_2".to_string(), + id: table.get_next_pk().into(), + third: "third_2".to_string(), + second: "unique_second_2".to_string(), + last: "last_2".to_string(), + }); + + assert!( + result.is_ok(), + "Insert should succeed after update with same unique index value" + ); + + // Timeout check for stuck persistence + let wait_result = timeout(Duration::from_secs(4), table.wait_for_ops()).await; + assert!( + wait_result.is_ok(), + "BUG: persistence blocked after unique index update" + ); + } + + // Phase 3: Load and verify all data + { + let engine = StringReReadPersistenceEngine::new(config).await.unwrap(); + let table = StringReReadWorkTable::load(engine).await.unwrap(); + + assert_eq!(table.select_all().execute().unwrap().len(), 2); + assert!(table.select_by_second("unique_second".to_string()).is_some()); + assert!(table.select_by_second("unique_second_2".to_string()).is_some()); + + let row1 = table.select_by_second("unique_second".to_string()).unwrap(); + assert_eq!(row1.first, "first_updated"); + } + }); +} From 809b2fb974587726aae66e7c304c16ebadd1a1ec Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Wed, 15 Apr 2026 23:09:11 +0300 Subject: [PATCH 5/7] WIP --- codegen/src/persist_index/generator.rs | 60 ++++++++------------------ 1 file changed, 18 insertions(+), 42 deletions(-) diff --git a/codegen/src/persist_index/generator.rs b/codegen/src/persist_index/generator.rs index ef9ced8..4492c19 100644 --- a/codegen/src/persist_index/generator.rs +++ b/codegen/src/persist_index/generator.rs @@ -372,7 +372,7 @@ impl Generator { } } else { quote! { - let inner: Vec<_> = page + let mut inner: Vec<_> = page .inner .get_node() .into_iter() @@ -382,36 +382,24 @@ impl Generator { }) .collect(); - // Sort first to establish deterministic ordering + let node_id = inner.pop(); + let mut sorted: Vec<_> = inner.into_iter() .map(|p| IndexMultiPair { key: p.key, value: p.value, - discriminator: 0, // temporary, will be reassigned + discriminator: 0, }) .collect(); sorted.sort(); - // Reassign discriminators deterministically so last entry (max_value) has highest discriminator - let max_discriminator = u64::MAX - 1; // Avoid SUPREMUM - let mut current_discriminator = 1u64; // Avoid INFIMUM - let last_key = sorted.last().expect("Node should not be empty").key.clone(); - + let mut current_discriminator = 1u64; for entry in sorted.iter_mut() { - if entry.key == last_key { - // Last key entries get ascending discriminators ending with max_discriminator - entry.discriminator = current_discriminator.min(max_discriminator); - current_discriminator += 1; - } else { - // Other keys: assign sequential discriminators (they don't affect max_value lookup) - entry.discriminator = current_discriminator.min(max_discriminator); - current_discriminator += 1; - } + entry.discriminator = current_discriminator.min(u64::MAX - 1); + current_discriminator += 1; } - - // Ensure the very last entry has max_discriminator - if let Some(last) = sorted.last_mut() { - last.discriminator = max_discriminator; + if let Some(node_id) = node_id { + sorted.push(IndexMultiPair { key: node_id.key, value: node_id.value, discriminator: u64::MAX - 1 }); } let node = UnsizedNode::from_inner(sorted, #const_name); @@ -440,7 +428,7 @@ impl Generator { } } else { quote! { - let inner: Vec<_> = page + let mut inner: Vec<_> = page .inner .get_node() .into_iter() @@ -450,36 +438,24 @@ impl Generator { }) .collect(); - // Sort first to establish deterministic ordering + let node_id = inner.pop(); + let mut sorted: Vec<_> = inner.into_iter() .map(|p| IndexMultiPair { key: p.key, value: p.value, - discriminator: 0, // temporary, will be reassigned + discriminator: 0, }) .collect(); sorted.sort(); - // Reassign discriminators deterministically so last entry (max_value) has highest discriminator - let max_discriminator = u64::MAX - 1; // Avoid SUPREMUM - let mut current_discriminator = 1u64; // Avoid INFIMUM - let last_key = sorted.last().expect("Node should not be empty").key.clone(); - + let mut current_discriminator = 1u64; for entry in sorted.iter_mut() { - if entry.key == last_key { - // Last key entries get ascending discriminators ending with max_discriminator - entry.discriminator = current_discriminator.min(max_discriminator); - current_discriminator += 1; - } else { - // Other keys: assign sequential discriminators (they don't affect max_value lookup) - entry.discriminator = current_discriminator.min(max_discriminator); - current_discriminator += 1; - } + entry.discriminator = current_discriminator.min(u64::MAX - 1); + current_discriminator += 1; } - - // Ensure the very last entry has max_discriminator - if let Some(last) = sorted.last_mut() { - last.discriminator = max_discriminator; + if let Some(node_id) = node_id { + sorted.push(IndexMultiPair { key: node_id.key, value: node_id.value, discriminator: u64::MAX - 1 }); } #i.attach_multi_node(sorted); From 5c205d841f0450ae959d77eaca428e7966e1fb07 Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Thu, 16 Apr 2026 09:35:22 +0300 Subject: [PATCH 6/7] WIP --- Cargo.toml | 4 ++-- codegen/Cargo.toml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 21021c2..bf5d300 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = ["codegen", "examples", "performance_measurement", "performance_measur [package] name = "worktable" -version = "0.9.0-beta0.1.1" +version = "0.9.0-beta0.1.2" edition = "2024" authors = ["Handy-caT"] license = "MIT" @@ -46,7 +46,7 @@ tracing = "0.1" url = { version = "2", optional = true } uuid = { version = "1.10.0", features = ["v4", "v7"] } walkdir = { version = "2", optional = true } -worktable_codegen = { path = "codegen", version = "=0.9.0-beta0.1.0" } +worktable_codegen = { path = "codegen", version = "=0.9.0-beta0.1.2" } [dev-dependencies] chrono = "0.4.43" diff --git a/codegen/Cargo.toml b/codegen/Cargo.toml index 8025a62..5945f1e 100644 --- a/codegen/Cargo.toml +++ b/codegen/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "worktable_codegen" -version = "0.9.0-beta0.1.0" +version = "0.9.0-beta0.1.2" edition = "2024" license = "MIT" description = "WorkTable codegeneration crate" From 899956940c410849aaed7d97452997b620e77f2f Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Thu, 16 Apr 2026 14:41:05 +0300 Subject: [PATCH 7/7] WIP --- Cargo.toml | 6 ++--- codegen/Cargo.toml | 2 +- src/persistence/space/index/mod.rs | 28 ++++++++++++------------ src/persistence/space/index/unsized_.rs | 29 +++++++------------------ 4 files changed, 26 insertions(+), 39 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index bf5d300..086b78a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = ["codegen", "examples", "performance_measurement", "performance_measur [package] name = "worktable" -version = "0.9.0-beta0.1.2" +version = "0.9.0-beta0.1.4" edition = "2024" authors = ["Handy-caT"] license = "MIT" @@ -19,7 +19,7 @@ s3-support = ["dep:rusty-s3", "dep:url", "dep:reqwest", "dep:walkdir", "worktabl [dependencies] async-trait = "0.1.89" convert_case = "0.6.0" -data_bucket = "=0.3.13" +data_bucket = "=0.3.14" # data_bucket = { git = "https://github.com/pathscale/DataBucket", branch = "page_cdc_correction", version = "0.2.7" } # data_bucket = { path = "../DataBucket", version = "0.3.11" } derive_more = { version = "2.0.1", features = ["from", "error", "display", "debug", "into"] } @@ -46,7 +46,7 @@ tracing = "0.1" url = { version = "2", optional = true } uuid = { version = "1.10.0", features = ["v4", "v7"] } walkdir = { version = "2", optional = true } -worktable_codegen = { path = "codegen", version = "=0.9.0-beta0.1.2" } +worktable_codegen = { path = "codegen", version = "=0.9.0-beta0.1.3" } [dev-dependencies] chrono = "0.4.43" diff --git a/codegen/Cargo.toml b/codegen/Cargo.toml index 5945f1e..1da3170 100644 --- a/codegen/Cargo.toml +++ b/codegen/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "worktable_codegen" -version = "0.9.0-beta0.1.2" +version = "0.9.0-beta0.1.3" edition = "2024" license = "MIT" description = "WorkTable codegeneration crate" diff --git a/src/persistence/space/index/mod.rs b/src/persistence/space/index/mod.rs index 35cdfe0..b317027 100644 --- a/src/persistence/space/index/mod.rs +++ b/src/persistence/space/index/mod.rs @@ -6,15 +6,15 @@ use std::collections::HashMap; use std::fmt::Debug; use std::hash::Hash; use std::path::Path; -use std::sync::Arc; use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::Arc; use convert_case::{Case, Casing}; use data_bucket::page::{IndexValue, PageId}; use data_bucket::{ - GENERAL_HEADER_SIZE, GeneralHeader, GeneralPage, IndexPage, IndexPageUtility, Link, PageType, - SizeMeasurable, SpaceId, SpaceInfoPage, get_index_page_size_from_data_length, parse_page, - persist_page, persist_pages_batch, + get_index_page_size_from_data_length, parse_page, persist_page, persist_pages_batch, GeneralHeader, GeneralPage, IndexPage, + IndexPageUtility, Link, PageType, SizeMeasurable, SpaceId, + SpaceInfoPage, GENERAL_HEADER_SIZE, }; use eyre::eyre; use indexset::cdc::change::ChangeEvent; @@ -22,15 +22,15 @@ use indexset::concurrent::map::BTreeMap; use indexset::core::pair::Pair; use rkyv::de::Pool; use rkyv::rancor::Strategy; -use rkyv::ser::Serializer; use rkyv::ser::allocator::ArenaHandle; use rkyv::ser::sharing::Share; +use rkyv::ser::Serializer; use rkyv::util::AlignedVec; -use rkyv::{Archive, Deserialize, Serialize, rancor}; +use rkyv::{rancor, Archive, Deserialize, Serialize}; use tokio::fs::File; +use crate::persistence::space::{open_or_create_file, BatchChangeEvent}; use crate::persistence::SpaceIndexOps; -use crate::persistence::space::{BatchChangeEvent, open_or_create_file}; use crate::prelude::WT_INDEX_EXTENSION; pub use table_of_contents::IndexTableOfContents; @@ -158,7 +158,7 @@ where ) .await?; - if node_id.key < value.key || (node_id.key == value.key && node_id.value != value.value) { + if node_id.key < value.key { utility.node_id = value.clone().into(); new_node_id = Some(value); } @@ -433,14 +433,15 @@ where ChangeEvent::InsertAt { max_value, .. } | ChangeEvent::RemoveAt { max_value, .. } => { let page_id = &(max_value.key.clone(), max_value.value); - let Some(page_index) = self.table_of_contents.get(page_id) else { - panic!("page should be available in table of contents") - }; + + let page_index = self + .table_of_contents + .get(page_id) + .expect("page should be available in table of contents"); let page = pages.get_mut(&page_index); let page_to_update = if let Some(page) = page { page } else { - //println!("Trying to parse page {}", page_index); let page = parse_page::, INNER_PAGE_SIZE>( &mut self.index_file, page_index.into(), @@ -526,7 +527,6 @@ where .get_mut(&page_index) .expect("should be available as was just inserted before") }; - // println!("Event: {:?}", &ev); let splitted_page = page_to_update.inner.split(*split_index); let new_page_id = if let Some(id) = self.table_of_contents.pop_empty_page_id() { id @@ -567,7 +567,7 @@ where #[cfg(test)] mod test { use data_bucket::{ - INNER_PAGE_SIZE, IndexPage, IndexValue, Persistable, get_index_page_size_from_data_length, + get_index_page_size_from_data_length, IndexPage, IndexValue, Persistable, INNER_PAGE_SIZE, }; #[test] diff --git a/src/persistence/space/index/unsized_.rs b/src/persistence/space/index/unsized_.rs index bb1167b..64228a5 100644 --- a/src/persistence/space/index/unsized_.rs +++ b/src/persistence/space/index/unsized_.rs @@ -1,14 +1,14 @@ use std::collections::HashMap; use std::fmt::Debug; use std::hash::Hash; -use std::sync::Arc; use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::Arc; use data_bucket::page::PageId; use data_bucket::{ - GeneralHeader, GeneralPage, IndexPageUtility, IndexValue, Link, PageType, SizeMeasurable, - SpaceId, SpaceInfoPage, UnsizedIndexPage, VariableSizeMeasurable, parse_page, persist_page, - persist_pages_batch, + parse_page, persist_page, persist_pages_batch, GeneralHeader, GeneralPage, IndexPageUtility, IndexValue, + Link, PageType, SizeMeasurable, SpaceId, SpaceInfoPage, UnsizedIndexPage, + VariableSizeMeasurable, }; use eyre::eyre; use indexset::cdc::change::ChangeEvent; @@ -16,17 +16,17 @@ use indexset::concurrent::map::BTreeMap; use indexset::core::pair::Pair; use rkyv::de::Pool; use rkyv::rancor::Strategy; -use rkyv::ser::Serializer; use rkyv::ser::allocator::ArenaHandle; use rkyv::ser::sharing::Share; +use rkyv::ser::Serializer; use rkyv::util::AlignedVec; -use rkyv::{Archive, Deserialize, Serialize, rancor}; +use rkyv::{rancor, Archive, Deserialize, Serialize}; use tokio::fs::File; -use crate::UnsizedNode; use crate::persistence::space::BatchChangeEvent; use crate::persistence::{IndexTableOfContents, SpaceIndex, SpaceIndexOps}; use crate::prelude::WT_INDEX_EXTENSION; +use crate::UnsizedNode; #[derive(Debug)] pub struct SpaceIndexUnsized { @@ -165,7 +165,7 @@ where (value_offset, (value_offset - previous_offset) as u16), ); - if node_id.key < value.key || (node_id.key == value.key && node_id.value != value.value) { + if node_id.key < value.key { utility.update_node_id(value.clone().into())?; new_node_id = Some(value); } @@ -465,18 +465,6 @@ where } => { let page_id = &(max_value.key.clone(), max_value.value); - // DEBUG START - println!("[INFO SplitNode] Looking for page_id: {:?}", page_id); - println!("[INFO] Available entries in table_of_contents:"); - for (k, pid) in self.table_of_contents.iter() { - println!("[INFO] key={:?}, page_id={:?}", k, pid); - } - println!( - "[INFO] Total entries count: {}", - self.table_of_contents.iter().count() - ); - // DEBUG END - let page_index = self .table_of_contents .get(page_id) @@ -485,7 +473,6 @@ where let page_to_update = if let Some(page) = page { page } else { - // println!("Try to parse page: {:?} {:?}", page_index, page_id); let page = parse_page::, INNER_PAGE_SIZE>( &mut self.index_file,