From d54d50ef931f4dfe0bc9b5cfd3edd0da11cac2b1 Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Tue, 3 Mar 2026 16:39:07 +0300 Subject: [PATCH 01/10] debug --- src/in_memory/pages.rs | 21 +- src/index/primary_index.rs | 11 +- src/table/mod.rs | 8 +- src/table/vacuum/vacuum.rs | 10 +- tests/worktable/vacuum.rs | 1093 +++++++++++++++++++++++++++++++++++- 5 files changed, 1110 insertions(+), 33 deletions(-) diff --git a/src/in_memory/pages.rs b/src/in_memory/pages.rs index 486a109..84d7df2 100644 --- a/src/in_memory/pages.rs +++ b/src/in_memory/pages.rs @@ -4,25 +4,25 @@ use parking_lot::RwLock; #[cfg(feature = "perf_measurements")] use performance_measurement_codegen::performance_measurement; use rkyv::{ - Archive, Deserialize, Portable, Serialize, - api::high::HighDeserializer, - rancor::Strategy, - ser::{Serializer, allocator::ArenaHandle, sharing::Share}, - util::AlignedVec, + api::high::HighDeserializer, rancor::Strategy, ser::{allocator::ArenaHandle, sharing::Share, Serializer}, util::AlignedVec, + Archive, + Deserialize, + Portable, + Serialize, }; use std::collections::VecDeque; use std::{ fmt::Debug, - sync::Arc, sync::atomic::{AtomicU32, AtomicU64, Ordering}, + sync::Arc, }; use crate::in_memory::empty_link_registry::EmptyLinkRegistry; use crate::prelude::ArchivedRowWrapper; use crate::{ in_memory::{ - DATA_INNER_LENGTH, Data, DataExecutionError, - row::{RowWrapper, StorableRow}, + row::{RowWrapper, StorableRow}, Data, DataExecutionError, + DATA_INNER_LENGTH, }, prelude::Link, }; @@ -120,7 +120,6 @@ where } return Ok(link); } - // Ok(l) => return Ok(l), Err(e) => match e { DataExecutionError::InvalidLink => { self.empty_links.push(link); @@ -508,8 +507,8 @@ impl ExecutionError { #[cfg(test)] mod tests { use std::collections::HashSet; - use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::Arc; use std::thread; use std::time::Instant; @@ -519,7 +518,7 @@ mod tests { use crate::in_memory::data::Data; use crate::in_memory::pages::{DataPages, ExecutionError}; - use crate::in_memory::{DATA_INNER_LENGTH, PagesExecutionError, RowWrapper, StorableRow}; + use crate::in_memory::{PagesExecutionError, RowWrapper, StorableRow, DATA_INNER_LENGTH}; use crate::prelude::ArchivedRowWrapper; #[derive( diff --git a/src/index/primary_index.rs b/src/index/primary_index.rs index c49a4c0..337b417 100644 --- a/src/index/primary_index.rs +++ b/src/index/primary_index.rs @@ -12,7 +12,7 @@ use indexset::core::node::NodeLike; use indexset::core::pair::Pair; use crate::util::OffsetEqLink; -use crate::{IndexMap, TableIndex, TableIndexCdc, convert_change_events}; +use crate::{convert_change_events, IndexMap, TableIndex, TableIndexCdc}; /// Combined storage for primary and reverse indexes. /// @@ -66,7 +66,14 @@ where fn insert_checked(&self, value: PrimaryKey, link: Link) -> Option<()> { let offset_link = OffsetEqLink(link); self.pk_map.checked_insert(value.clone(), offset_link)?; - self.reverse_pk_map.checked_insert(offset_link, value)?; + if self + .reverse_pk_map + .checked_insert(offset_link, value) + .is_none() + { + println!("Reverse map checked insert failed"); + return None; + } Some(()) } diff --git a/src/table/mod.rs b/src/table/mod.rs index d718f01..5a14480 100644 --- a/src/table/mod.rs +++ b/src/table/mod.rs @@ -8,8 +8,8 @@ use crate::prelude::{Link, LockMap, OperationId, PrimaryKeyGeneratorState}; use crate::primary_key::{PrimaryKeyGenerator, TablePrimaryKey}; use crate::util::OffsetEqLink; use crate::{ - AvailableIndex, IndexError, IndexMap, PrimaryIndex, TableIndex, TableIndexCdc, TableRow, - TableSecondaryIndex, TableSecondaryIndexCdc, convert_change_events, in_memory, + convert_change_events, in_memory, AvailableIndex, IndexError, IndexMap, PrimaryIndex, TableIndex, + TableIndexCdc, TableRow, TableSecondaryIndex, TableSecondaryIndexCdc, }; use data_bucket::INNER_PAGE_SIZE; use derive_more::{Display, Error, From}; @@ -19,9 +19,9 @@ use indexset::core::pair::Pair; use performance_measurement_codegen::performance_measurement; use rkyv::api::high::HighDeserializer; use rkyv::rancor::Strategy; -use rkyv::ser::Serializer; use rkyv::ser::allocator::ArenaHandle; use rkyv::ser::sharing::Share; +use rkyv::ser::Serializer; use rkyv::util::AlignedVec; use rkyv::{Archive, Deserialize, Serialize}; use std::fmt::Debug; @@ -197,12 +197,14 @@ where .data .insert(row.clone()) .map_err(WorkTableError::PagesError)?; + println!("Inserting {:?} to {:?}", pk, link); if self .primary_index .insert_checked(pk.clone(), link) .is_none() { self.data.delete(link).map_err(WorkTableError::PagesError)?; + println!("Already exists {:?}", pk); return Err(WorkTableError::AlreadyExists("Primary".to_string())); }; if let Err(e) = self.indexes.save_row(row.clone(), link) { diff --git a/src/table/vacuum/vacuum.rs b/src/table/vacuum/vacuum.rs index e93291a..7d76d30 100644 --- a/src/table/vacuum/vacuum.rs +++ b/src/table/vacuum/vacuum.rs @@ -4,23 +4,23 @@ use std::marker::PhantomData; use std::sync::Arc; use std::time::Instant; -use data_bucket::Link; use data_bucket::page::PageId; +use data_bucket::Link; use indexset::core::node::NodeLike; use indexset::core::pair::Pair; use rkyv::rancor::Strategy; -use rkyv::ser::Serializer; use rkyv::ser::allocator::ArenaHandle; use rkyv::ser::sharing::Share; +use rkyv::ser::Serializer; use rkyv::util::AlignedVec; use rkyv::{Archive, Deserialize, Serialize}; use crate::in_memory::{ArchivedRowWrapper, DataPages, RowWrapper, StorableRow}; use crate::lock::{Lock, LockMap, RowLock}; use crate::prelude::{OffsetEqLink, TablePrimaryKey}; +use crate::vacuum::fragmentation_info::FragmentationInfo; use crate::vacuum::VacuumStats; use crate::vacuum::WorkTableVacuum; -use crate::vacuum::fragmentation_info::FragmentationInfo; use crate::{ AvailableIndex, PrimaryIndex, TableIndex, TableRow, TableSecondaryIndex, TableSecondaryIndexCdc, }; @@ -271,6 +271,8 @@ where .expect("page is not full as checked on links collection"); self.update_index_after_move(pk.clone(), from_link.0, new_link); + println!("Moved {:?} from {:?} to {:?}", pk, from_link, new_link); + lock.unlock(); self.lock_manager.remove_with_lock_check(&pk); } @@ -390,7 +392,7 @@ mod tests { use std::sync::Arc; use indexset::core::pair::Pair; - use worktable_codegen::{MemStat, worktable}; + use worktable_codegen::{worktable, MemStat}; use crate::in_memory::{ArchivedRowWrapper, RowWrapper, StorableRow}; use crate::prelude::*; diff --git a/tests/worktable/vacuum.rs b/tests/worktable/vacuum.rs index 49a8864..8110561 100644 --- a/tests/worktable/vacuum.rs +++ b/tests/worktable/vacuum.rs @@ -5,20 +5,1086 @@ use std::sync::Arc; use std::time::Duration; use worktable::prelude::*; use worktable::vacuum::{VacuumManager, VacuumManagerConfig}; -use worktable_codegen::worktable; -worktable!( - name: VacuumTest, - columns: { - id: u64 primary_key autoincrement, - value: i64, - data: String - }, - indexes: { - value_idx: value unique, - data_idx: data, +// worktable!( +// name: VacuumTest, +// columns: { +// id: u64 primary_key autoincrement, +// value: i64, +// data: String +// }, +// indexes: { +// value_idx: value unique, +// data_idx: data, +// } +// ); + +#[derive( + Clone, + rkyv::Archive, + Debug, + Default, + rkyv::Deserialize, + Hash, + rkyv::Serialize, + From, + Eq, + Into, + PartialEq, + PartialOrd, + Ord, + SizeMeasure, + MemStat, +)] +#[rkyv(derive(PartialEq, Eq, PartialOrd, Ord, Debug))] +pub struct VacuumTestPrimaryKey(u64); +impl TablePrimaryKey for VacuumTestPrimaryKey { + type Generator = std::sync::atomic::AtomicU64; +} +#[derive(rkyv::Archive, Debug, rkyv::Deserialize, Clone, rkyv::Serialize, PartialEq, MemStat)] +#[rkyv(derive(Debug))] +#[repr(C)] +pub struct VacuumTestRow { + pub id: u64, + pub value: i64, + pub data: String, +} +impl TableRow for VacuumTestRow { + fn get_primary_key(&self) -> VacuumTestPrimaryKey { + self.id.clone().into() + } +} +#[derive(rkyv::Archive, Debug, rkyv::Deserialize, Clone, rkyv::Serialize, PartialEq)] +#[rkyv(derive(Debug))] +#[repr(C)] +pub enum VacuumTestRowFields { + Id, + Value, + Data, +} +impl Query for VacuumTestRow { + fn merge(self, row: VacuumTestRow) -> VacuumTestRow { + self + } +} +#[derive(Clone, Debug, From, PartialEq)] +#[non_exhaustive] +pub enum VacuumTestAvaiableTypes { + #[from] + STRING(String), + #[from] + I64(i64), +} +#[derive(rkyv::Archive, Debug, rkyv::Deserialize, rkyv::Serialize)] +#[repr(C)] +pub struct VacuumTestWrapper { + inner: VacuumTestRow, + is_ghosted: bool, + is_deleted: bool, + is_in_vacuum_process: bool, +} +impl RowWrapper for VacuumTestWrapper { + fn get_inner(self) -> VacuumTestRow { + self.inner + } + fn is_ghosted(&self) -> bool { + self.is_ghosted + } + fn is_vacuumed(&self) -> bool { + self.is_in_vacuum_process } + fn is_deleted(&self) -> bool { + self.is_deleted + } + fn from_inner(inner: VacuumTestRow) -> Self { + Self { + inner, + is_ghosted: true, + is_deleted: false, + is_in_vacuum_process: false, + } + } +} +impl StorableRow for VacuumTestRow { + type WrappedRow = VacuumTestWrapper; +} +impl ArchivedRowWrapper for ArchivedVacuumTestWrapper { + fn unghost(&mut self) { + self.is_ghosted = false; + } + fn set_in_vacuum_process(&mut self) { + self.is_in_vacuum_process = true; + } + fn delete(&mut self) { + self.is_deleted = true; + } + fn is_deleted(&self) -> bool { + self.is_deleted + } +} +#[derive(Debug, Clone)] +pub struct VacuumTestLock { + id_lock: Option>, + value_lock: Option>, + data_lock: Option>, +} +impl VacuumTestLock { + pub fn new() -> Self { + Self { + id_lock: None, + value_lock: None, + data_lock: None, + } + } +} +impl RowLock for VacuumTestLock { + fn is_locked(&self) -> bool { + self.id_lock + .as_ref() + .map(|l| l.is_locked()) + .unwrap_or(false) + || self + .value_lock + .as_ref() + .map(|l| l.is_locked()) + .unwrap_or(false) + || self + .data_lock + .as_ref() + .map(|l| l.is_locked()) + .unwrap_or(false) + } + #[allow(clippy::mutable_key_type)] + fn lock( + &mut self, + id: u16, + ) -> ( + std::collections::HashSet>, + std::sync::Arc, + ) { + let mut set = std::collections::HashSet::new(); + let lock = std::sync::Arc::new(Lock::new(id)); + if let Some(lock) = &self.id_lock { + set.insert(lock.clone()); + } + self.id_lock = Some(lock.clone()); + if let Some(lock) = &self.value_lock { + set.insert(lock.clone()); + } + self.value_lock = Some(lock.clone()); + if let Some(lock) = &self.data_lock { + set.insert(lock.clone()); + } + self.data_lock = Some(lock.clone()); + (set, lock) + } + fn with_lock(id: u16) -> (Self, std::sync::Arc) { + let lock = std::sync::Arc::new(Lock::new(id)); + ( + Self { + id_lock: Some(lock.clone()), + value_lock: Some(lock.clone()), + data_lock: Some(lock.clone()), + }, + lock, + ) + } + #[allow(clippy::mutable_key_type)] + fn merge(&mut self, other: &mut Self) -> std::collections::HashSet> { + let mut set = std::collections::HashSet::new(); + if let Some(id_lock) = &other.id_lock { + if self.id_lock.is_none() { + self.id_lock = Some(id_lock.clone()); + } else { + set.insert(id_lock.clone()); + } + } + other.id_lock = self.id_lock.clone(); + if let Some(value_lock) = &other.value_lock { + if self.value_lock.is_none() { + self.value_lock = Some(value_lock.clone()); + } else { + set.insert(value_lock.clone()); + } + } + other.value_lock = self.value_lock.clone(); + if let Some(data_lock) = &other.data_lock { + if self.data_lock.is_none() { + self.data_lock = Some(data_lock.clone()); + } else { + set.insert(data_lock.clone()); + } + } + other.data_lock = self.data_lock.clone(); + set + } +} +#[derive(Debug, MemStat)] +pub struct VacuumTestIndex { + value_idx: IndexMap, + data_idx: + IndexMultiMap>>, +} +impl TableSecondaryIndex + for VacuumTestIndex +{ + fn save_row( + &self, + row: VacuumTestRow, + link: Link, + ) -> core::result::Result<(), IndexError> { + let mut inserted_indexes: Vec = vec![]; + if self + .value_idx + .insert_checked(row.value.clone(), link) + .is_none() + { + return Err(IndexError::AlreadyExists { + at: VacuumTestAvailableIndexes::ValueIdx, + inserted_already: inserted_indexes.clone(), + }); + } + inserted_indexes.push(VacuumTestAvailableIndexes::ValueIdx); + if self + .data_idx + .insert_checked(row.data.clone(), link) + .is_none() + { + return Err(IndexError::AlreadyExists { + at: VacuumTestAvailableIndexes::DataIdx, + inserted_already: inserted_indexes.clone(), + }); + } + inserted_indexes.push(VacuumTestAvailableIndexes::DataIdx); + core::result::Result::Ok(()) + } + fn reinsert_row( + &self, + row_old: VacuumTestRow, + link_old: Link, + row_new: VacuumTestRow, + link_new: Link, + ) -> core::result::Result<(), IndexError> { + let mut inserted_indexes: Vec = vec![]; + let row = &row_new; + let val_new = row.value.clone(); + let row = &row_old; + let val_old = row.value.clone(); + if val_new != val_old { + if self + .value_idx + .insert_checked(val_new.clone(), link_new) + .is_none() + { + return Err(IndexError::AlreadyExists { + at: VacuumTestAvailableIndexes::ValueIdx, + inserted_already: inserted_indexes.clone(), + }); + } + inserted_indexes.push(VacuumTestAvailableIndexes::ValueIdx); + } + let row = &row_new; + let val_new = row.value.clone(); + let row = &row_old; + let val_old = row.value.clone(); + if val_new == val_old { + TableIndex::insert(&self.value_idx, val_new.clone(), link_new); + } else { + TableIndex::remove(&self.value_idx, &val_old, link_old); + } + let row = &row_new; + let val_new = row.data.clone(); + let row = &row_old; + let val_old = row.data.clone(); + TableIndex::insert(&self.data_idx, val_new.clone(), link_new); + TableIndex::remove(&self.data_idx, &val_old, link_old); + core::result::Result::Ok(()) + } + fn delete_row( + &self, + row: VacuumTestRow, + link: Link, + ) -> core::result::Result<(), IndexError> { + TableIndex::remove(&self.value_idx, &row.value, link); + TableIndex::remove(&self.data_idx, &row.data, link); + core::result::Result::Ok(()) + } + fn process_difference_insert( + &self, + link: Link, + difference: std::collections::HashMap<&str, Difference>, + ) -> core::result::Result<(), IndexError> { + let mut inserted_indexes: Vec = vec![]; + if let Some(diff) = difference.get("value") { + if let VacuumTestAvaiableTypes::I64(new) = &diff.new { + let key_new = *new; + if TableIndex::insert_checked(&self.value_idx, key_new, link).is_none() { + return Err(IndexError::AlreadyExists { + at: VacuumTestAvailableIndexes::ValueIdx, + inserted_already: inserted_indexes.clone(), + }); + } + inserted_indexes.push(VacuumTestAvailableIndexes::ValueIdx); + } + } + if let Some(diff) = difference.get("data") { + if let VacuumTestAvaiableTypes::STRING(new) = &diff.new { + let key_new = new.to_string(); + if TableIndex::insert_checked(&self.data_idx, key_new, link).is_none() { + return Err(IndexError::AlreadyExists { + at: VacuumTestAvailableIndexes::DataIdx, + inserted_already: inserted_indexes.clone(), + }); + } + inserted_indexes.push(VacuumTestAvailableIndexes::DataIdx); + } + } + core::result::Result::Ok(()) + } + fn process_difference_remove( + &self, + link: Link, + difference: std::collections::HashMap<&str, Difference>, + ) -> core::result::Result<(), IndexError> { + if let Some(diff) = difference.get("value") { + if let VacuumTestAvaiableTypes::I64(old) = &diff.old { + let key_old = *old; + TableIndex::remove(&self.value_idx, &key_old, link); + } + } + if let Some(diff) = difference.get("data") { + if let VacuumTestAvaiableTypes::STRING(old) = &diff.old { + let key_old = old.to_string(); + TableIndex::remove(&self.data_idx, &key_old, link); + } + } + core::result::Result::Ok(()) + } + fn delete_from_indexes( + &self, + row: VacuumTestRow, + link: Link, + indexes: Vec, + ) -> core::result::Result<(), IndexError> { + for index in indexes { + match index { + VacuumTestAvailableIndexes::ValueIdx => { + TableIndex::remove(&self.value_idx, &row.value, link); + } + VacuumTestAvailableIndexes::DataIdx => { + TableIndex::remove(&self.data_idx, &row.data, link); + } + } + } + core::result::Result::Ok(()) + } +} +impl TableSecondaryIndexInfo for VacuumTestIndex { + fn index_info(&self) -> Vec { + let mut info = Vec::new(); + info.push(IndexInfo { + name: "value_idx".to_string(), + index_type: IndexKind::Unique, + key_count: self.value_idx.len(), + capacity: self.value_idx.capacity(), + heap_size: self.value_idx.heap_size(), + used_size: self.value_idx.used_size(), + node_count: self.value_idx.node_count(), + }); + info.push(IndexInfo { + name: "data_idx".to_string(), + index_type: IndexKind::NonUnique, + key_count: self.data_idx.len(), + capacity: self.data_idx.capacity(), + heap_size: self.data_idx.heap_size(), + used_size: self.data_idx.used_size(), + node_count: self.data_idx.node_count(), + }); + info + } + fn is_empty(&self) -> bool { + self.value_idx.len() == 0 && self.data_idx.len() == 0 + } +} +impl Default for VacuumTestIndex { + fn default() -> Self { + Self { + value_idx: IndexMap::with_maximum_node_size( + get_index_page_size_from_data_length::(VACUUM_TEST_INNER_SIZE), + ), + data_idx: IndexMultiMap::with_maximum_node_size(VACUUM_TEST_INNER_SIZE), + } + } +} +#[derive(Debug, Clone, Copy, MoreDisplay, PartialEq, PartialOrd, Ord, Hash, Eq)] +pub enum VacuumTestAvailableIndexes { + ValueIdx, + DataIdx, +} +impl AvailableIndex for VacuumTestAvailableIndexes { + fn to_string_value(&self) -> String { + ToString::to_string(&self) + } +} +const VACUUM_TEST_PAGE_SIZE: usize = PAGE_SIZE; +const VACUUM_TEST_INNER_SIZE: usize = VACUUM_TEST_PAGE_SIZE - GENERAL_HEADER_SIZE; +#[derive(Debug)] +pub struct VacuumTestWorkTable( + WorkTable< + VacuumTestRow, + VacuumTestPrimaryKey, + VacuumTestAvaiableTypes, + VacuumTestAvailableIndexes, + VacuumTestIndex, + VacuumTestLock, + ::Generator, + { INNER_PAGE_SIZE }, + Vec>>, + >, ); +impl Default for VacuumTestWorkTable { + fn default() -> Self { + let mut inner = WorkTable::default(); + inner.table_name = "VacuumTest"; + Self(inner) + } +} +impl VacuumTestWorkTable { + pub fn name(&self) -> &'static str { + &self.0.table_name + } + pub fn select(&self, pk: Pk) -> Option + where + VacuumTestPrimaryKey: From, + { + self.0.select(pk.into()) + } + pub fn insert( + &self, + row: VacuumTestRow, + ) -> core::result::Result { + self.0.insert(row) + } + pub async fn reinsert( + &self, + row_old: VacuumTestRow, + row_new: VacuumTestRow, + ) -> core::result::Result { + self.0.reinsert(row_old, row_new).await + } + pub async fn upsert(&self, row: VacuumTestRow) -> core::result::Result<(), WorkTableError> { + let pk = row.get_primary_key(); + let need_to_update = { + if let Some(link) = self.0.primary_index.pk_map.get(&pk) { + true + } else { + false + } + }; + if need_to_update { + self.update(row).await?; + } else { + self.insert(row)?; + } + core::result::Result::Ok(()) + } + pub fn count(&self) -> usize { + let count = self.0.primary_index.pk_map.len(); + count + } + pub fn get_next_pk(&self) -> VacuumTestPrimaryKey { + self.0.get_next_pk() + } + pub fn iter_with core::result::Result<(), WorkTableError>>( + &self, + f: F, + ) -> core::result::Result<(), WorkTableError> { + let first = self + .0 + .primary_index + .pk_map + .iter() + .next() + .map(|(k, v)| (k.clone(), v.0)); + let Some((mut k, link)) = first else { + return Ok(()); + }; + let data = self + .0 + .data + .select_non_ghosted(link) + .map_err(WorkTableError::PagesError)?; + f(data)?; + let mut ind = false; + while !ind { + let next = { + let mut iter = self.0.primary_index.pk_map.range(k.clone()..); + let next = iter + .next() + .map(|(k, v)| (k.clone(), v.0)) + .filter(|(key, _)| key != &k); + if next.is_some() { + next + } else { + iter.next().map(|(k, v)| (k.clone(), v.0)) + } + }; + if let Some((key, link)) = next { + let data = self + .0 + .data + .select_non_ghosted(link) + .map_err(WorkTableError::PagesError)?; + f(data)?; + k = key + } else { + ind = true; + }; + } + core::result::Result::Ok(()) + } + pub async fn iter_with_async< + F: Fn(VacuumTestRow) -> Fut, + Fut: std::future::Future>, + >( + &self, + f: F, + ) -> core::result::Result<(), WorkTableError> { + let first = self + .0 + .primary_index + .pk_map + .iter() + .next() + .map(|(k, v)| (k.clone(), v.0)); + let Some((mut k, link)) = first else { + return Ok(()); + }; + let data = self + .0 + .data + .select_non_ghosted(link) + .map_err(WorkTableError::PagesError)?; + f(data).await?; + let mut ind = false; + while !ind { + let next = { + let mut iter = self.0.primary_index.pk_map.range(k.clone()..); + let next = iter + .next() + .map(|(k, v)| (k.clone(), v.0)) + .filter(|(key, _)| key != &k); + if next.is_some() { + next + } else { + iter.next().map(|(k, v)| (k.clone(), v.0)) + } + }; + if let Some((key, link)) = next { + let data = self + .0 + .data + .select_non_ghosted(link) + .map_err(WorkTableError::PagesError)?; + f(data).await?; + k = key + } else { + ind = true; + }; + } + core::result::Result::Ok(()) + } + pub fn system_info(&self) -> SystemInfo { + self.0.system_info() + } + pub fn vacuum(&self) -> std::sync::Arc { + std::sync::Arc::new(EmptyDataVacuum::<_, _, _, _, _, _, VacuumTestLock, _>::new( + "VacuumTest", + std::sync::Arc::clone(&self.0.data), + std::sync::Arc::clone(&self.0.lock_manager), + std::sync::Arc::clone(&self.0.primary_index), + std::sync::Arc::clone(&self.0.indexes), + )) + } +} +impl VacuumTestWorkTable { + pub fn select_by_value(&self, by: i64) -> Option { + let link: Link = self + .0 + .indexes + .value_idx + .get(&by) + .map(|kv| kv.get().value.into())?; + self.0.data.select_non_ghosted(link).ok() + } + pub fn select_by_data( + &self, + by: String, + ) -> SelectQueryBuilder< + VacuumTestRow, + impl DoubleEndedIterator + '_, + VacuumTestColumnRange, + VacuumTestRowFields, + > { + let rows = self + .0 + .indexes + .data_idx + .get(&by) + .into_iter() + .filter_map(|(_, link)| self.0.data.select_non_ghosted(link.0).ok()) + .filter(move |r| &r.data == &by); + SelectQueryBuilder::new(rows) + } +} +impl SelectQueryExecutor + for SelectQueryBuilder +where + I: DoubleEndedIterator + Sized, +{ + fn where_by( + self, + predicate: F, + ) -> SelectQueryBuilder< + VacuumTestRow, + impl DoubleEndedIterator + Sized, + VacuumTestColumnRange, + VacuumTestRowFields, + > + where + F: FnMut(&VacuumTestRow) -> bool, + { + SelectQueryBuilder { + params: self.params, + iter: self.iter.filter(predicate), + } + } + fn execute(self) -> Result, WorkTableError> { + let mut iter: Box> = Box::new(self.iter); + if !self.params.range.is_empty() { + for (range, column) in &self.params.range { + iter = match (column, range.clone().into()) { + (VacuumTestRowFields::Id, VacuumTestColumnRange::U64(range)) => { + Box::new(iter.filter(move |row| range.contains(&row.id))) + as Box> + } + (VacuumTestRowFields::Id, VacuumTestColumnRange::U64Inclusive(range)) => { + Box::new(iter.filter(move |row| range.contains(&row.id))) + as Box> + } + (VacuumTestRowFields::Id, VacuumTestColumnRange::U64From(range)) => { + Box::new(iter.filter(move |row| range.contains(&row.id))) + as Box> + } + (VacuumTestRowFields::Id, VacuumTestColumnRange::U64To(range)) => { + Box::new(iter.filter(move |row| range.contains(&row.id))) + as Box> + } + (VacuumTestRowFields::Id, VacuumTestColumnRange::U64ToInclusive(range)) => { + Box::new(iter.filter(move |row| range.contains(&row.id))) + as Box> + } + (VacuumTestRowFields::Value, VacuumTestColumnRange::I64(range)) => { + Box::new(iter.filter(move |row| range.contains(&row.value))) + as Box> + } + (VacuumTestRowFields::Value, VacuumTestColumnRange::I64Inclusive(range)) => { + Box::new(iter.filter(move |row| range.contains(&row.value))) + as Box> + } + (VacuumTestRowFields::Value, VacuumTestColumnRange::I64From(range)) => { + Box::new(iter.filter(move |row| range.contains(&row.value))) + as Box> + } + (VacuumTestRowFields::Value, VacuumTestColumnRange::I64To(range)) => { + Box::new(iter.filter(move |row| range.contains(&row.value))) + as Box> + } + (VacuumTestRowFields::Value, VacuumTestColumnRange::I64ToInclusive(range)) => { + Box::new(iter.filter(move |row| range.contains(&row.value))) + as Box> + } + _ => unreachable!(), + }; + } + } + if !self.params.order.is_empty() { + let mut items: Vec = iter.collect(); + items.sort_by(|a, b| { + for (order, col) in &self.params.order { + match col { + VacuumTestRowFields::Id => { + let cmp = a.id.partial_cmp(&b.id).unwrap_or(std::cmp::Ordering::Equal); + if cmp != std::cmp::Ordering::Equal { + return match order { + Order::Asc => cmp, + Order::Desc => cmp.reverse(), + }; + } + } + VacuumTestRowFields::Value => { + let cmp = a + .value + .partial_cmp(&b.value) + .unwrap_or(std::cmp::Ordering::Equal); + if cmp != std::cmp::Ordering::Equal { + return match order { + Order::Asc => cmp, + Order::Desc => cmp.reverse(), + }; + } + } + VacuumTestRowFields::Data => { + let cmp = a + .data + .partial_cmp(&b.data) + .unwrap_or(std::cmp::Ordering::Equal); + if cmp != std::cmp::Ordering::Equal { + return match order { + Order::Asc => cmp, + Order::Desc => cmp.reverse(), + }; + } + } + _ => continue, + } + } + std::cmp::Ordering::Equal + }); + iter = Box::new(items.into_iter()); + } + let iter_result: Box> = + if let Some(offset) = self.params.offset { + Box::new(iter.skip(offset)) + } else { + Box::new(iter) + }; + let iter_result: Box> = + if let Some(limit) = self.params.limit { + Box::new(iter_result.take(limit)) + } else { + Box::new(iter_result) + }; + Ok(iter_result.collect()) + } +} +#[derive(Debug, Clone)] +pub enum VacuumTestColumnRange { + U64(std::ops::Range), + U64Inclusive(std::ops::RangeInclusive), + U64From(std::ops::RangeFrom), + U64To(std::ops::RangeTo), + U64ToInclusive(std::ops::RangeToInclusive), + I64(std::ops::Range), + I64Inclusive(std::ops::RangeInclusive), + I64From(std::ops::RangeFrom), + I64To(std::ops::RangeTo), + I64ToInclusive(std::ops::RangeToInclusive), +} +impl From> for VacuumTestColumnRange { + fn from(range: std::ops::Range) -> Self { + Self::U64(range) + } +} +impl From> for VacuumTestColumnRange { + fn from(range: std::ops::RangeInclusive) -> Self { + Self::U64Inclusive(range) + } +} +impl From> for VacuumTestColumnRange { + fn from(range: std::ops::RangeFrom) -> Self { + Self::U64From(range) + } +} +impl From> for VacuumTestColumnRange { + fn from(range: std::ops::RangeTo) -> Self { + Self::U64To(range) + } +} +impl From> for VacuumTestColumnRange { + fn from(range: std::ops::RangeToInclusive) -> Self { + Self::U64ToInclusive(range) + } +} +impl From> for VacuumTestColumnRange { + fn from(range: std::ops::Range) -> Self { + Self::I64(range) + } +} +impl From> for VacuumTestColumnRange { + fn from(range: std::ops::RangeInclusive) -> Self { + Self::I64Inclusive(range) + } +} +impl From> for VacuumTestColumnRange { + fn from(range: std::ops::RangeFrom) -> Self { + Self::I64From(range) + } +} +impl From> for VacuumTestColumnRange { + fn from(range: std::ops::RangeTo) -> Self { + Self::I64To(range) + } +} +impl From> for VacuumTestColumnRange { + fn from(range: std::ops::RangeToInclusive) -> Self { + Self::I64ToInclusive(range) + } +} +impl VacuumTestWorkTable { + pub fn select_all( + &self, + ) -> SelectQueryBuilder< + VacuumTestRow, + impl DoubleEndedIterator + '_ + Sized, + VacuumTestColumnRange, + VacuumTestRowFields, + > { + let iter = self + .0 + .primary_index + .pk_map + .iter() + .filter_map(|(_, link)| self.0.data.select_non_ghosted(link.0).ok()); + SelectQueryBuilder::new(iter) + } +} +impl VacuumTestWorkTable { + pub async fn update(&self, row: VacuumTestRow) -> core::result::Result<(), WorkTableError> { + let pk = row.get_primary_key(); + let op_lock = { + let lock_id = self.0.lock_manager.next_id(); + if let Some(lock) = self.0.lock_manager.get(&pk) { + let mut lock_guard = lock.write().await; + #[allow(clippy::mutable_key_type)] + let (locks, op_lock) = lock_guard.lock(lock_id); + drop(lock_guard); + futures::future::join_all(locks.iter().map(|l| l.wait()).collect::>()).await; + op_lock + } else { + #[allow(clippy::mutable_key_type)] + let (lock, op_lock) = VacuumTestLock::with_lock(lock_id); + let lock = std::sync::Arc::new(tokio::sync::RwLock::new(lock)); + let mut guard = lock.write().await; + if let Some(old_lock) = self.0.lock_manager.insert(pk.clone(), lock.clone()) { + let mut old_lock_guard = old_lock.write().await; + #[allow(clippy::mutable_key_type)] + let locks = guard.merge(&mut *old_lock_guard); + drop(old_lock_guard); + drop(guard); + futures::future::join_all(locks.iter().map(|l| l.wait()).collect::>()) + .await; + } + op_lock + } + }; + let _guard = LockGuard::new(op_lock, self.0.lock_manager.clone(), pk.clone()); + let mut link: Link = self + .0 + .primary_index + .pk_map + .get(&pk) + .map(|v| v.get().value.into()) + .ok_or(WorkTableError::NotFound)?; + let row_old = self.0.data.select_non_ghosted(link)?; + self.0.update_state.insert(pk.clone(), row_old); + let mut bytes = rkyv::to_bytes::(&row) + .map_err(|_| WorkTableError::SerializeError)?; + if true { + drop(_guard); + let op_lock = { + let lock_id = self.0.lock_manager.next_id(); + if let Some(lock) = self.0.lock_manager.get(&pk) { + let mut lock_guard = lock.write().await; + #[allow(clippy::mutable_key_type)] + let (locks, op_lock) = lock_guard.lock(lock_id); + drop(lock_guard); + futures::future::join_all(locks.iter().map(|l| l.wait()).collect::>()) + .await; + op_lock + } else { + #[allow(clippy::mutable_key_type)] + let (lock, op_lock) = VacuumTestLock::with_lock(lock_id); + let lock = std::sync::Arc::new(tokio::sync::RwLock::new(lock)); + let mut guard = lock.write().await; + if let Some(old_lock) = self.0.lock_manager.insert(pk.clone(), lock.clone()) { + let mut old_lock_guard = old_lock.write().await; + #[allow(clippy::mutable_key_type)] + let locks = guard.merge(&mut *old_lock_guard); + drop(old_lock_guard); + drop(guard); + futures::future::join_all( + locks.iter().map(|l| l.wait()).collect::>(), + ) + .await; + } + op_lock + } + }; + let _guard = LockGuard::new(op_lock, self.0.lock_manager.clone(), pk.clone()); + let row_old = self.0.data.select_non_ghosted(link)?; + if let Err(e) = self.reinsert(row_old, row).await { + self.0.update_state.remove(&pk); + return Err(e); + } + self.0.update_state.remove(&pk); + return core::result::Result::Ok(()); + } + let mut archived_row = unsafe { + rkyv::access_unchecked_mut::<::Archived>(&mut bytes[..]) + .unseal_unchecked() + }; + let op_id = OperationId::Single(uuid::Uuid::now_v7()); + let row_old = self.0.data.select_non_ghosted(link)?; + let row_new = row.clone(); + let updated_bytes: Vec = vec![]; + let mut diffs: std::collections::HashMap<&str, Difference> = + std::collections::HashMap::new(); + let old = &row_old.value; + let new = &row_new.value; + if old != new { + let diff = Difference:: { + old: old.clone().into(), + new: new.clone().into(), + }; + diffs.insert("value", diff); + } + let old = &row_old.data; + let new = &row_new.data; + if old != new { + let diff = Difference:: { + old: old.clone().into(), + new: new.clone().into(), + }; + diffs.insert("data", diff); + } + let indexes_res = self + .0 + .indexes + .process_difference_insert(link, diffs.clone()); + if let Err(e) = indexes_res { + return match e { + IndexError::AlreadyExists { + at, + inserted_already, + } => { + self.0.indexes.delete_from_indexes( + row_new.merge(row_old.clone()), + link, + inserted_already, + )?; + Err(WorkTableError::AlreadyExists(at.to_string_value())) + } + IndexError::NotFound => Err(WorkTableError::NotFound), + }; + } + unsafe { + self.0 + .data + .with_mut_ref(link, move |archived| { + std::mem::swap(&mut archived.inner.id, &mut archived_row.id); + std::mem::swap(&mut archived.inner.value, &mut archived_row.value); + std::mem::swap(&mut archived.inner.data, &mut archived_row.data); + }) + .map_err(WorkTableError::PagesError)? + }; + self.0.indexes.process_difference_remove(link, diffs)?; + self.0.update_state.remove(&pk); + core::result::Result::Ok(()) + } +} +impl VacuumTestWorkTable {} +impl VacuumTestWorkTable { + pub async fn delete(&self, pk: Pk) -> core::result::Result<(), WorkTableError> + where + VacuumTestPrimaryKey: From, + { + let pk: VacuumTestPrimaryKey = pk.into(); + let op_lock = { + let lock_id = self.0.lock_manager.next_id(); + if let Some(lock) = self.0.lock_manager.get(&pk) { + let mut lock_guard = lock.write().await; + #[allow(clippy::mutable_key_type)] + let (locks, op_lock) = lock_guard.lock(lock_id); + drop(lock_guard); + futures::future::join_all(locks.iter().map(|l| l.wait()).collect::>()).await; + op_lock + } else { + #[allow(clippy::mutable_key_type)] + let (lock, op_lock) = VacuumTestLock::with_lock(lock_id); + let lock = std::sync::Arc::new(tokio::sync::RwLock::new(lock)); + let mut guard = lock.write().await; + if let Some(old_lock) = self.0.lock_manager.insert(pk.clone(), lock.clone()) { + let mut old_lock_guard = old_lock.write().await; + #[allow(clippy::mutable_key_type)] + let locks = guard.merge(&mut *old_lock_guard); + drop(old_lock_guard); + drop(guard); + futures::future::join_all(locks.iter().map(|l| l.wait()).collect::>()) + .await; + } + op_lock + } + }; + let _guard = LockGuard::new(op_lock, self.0.lock_manager.clone(), pk.clone()); + let link = match self + .0 + .primary_index + .pk_map + .get(&pk) + .map(|v| v.get().value.into()) + .ok_or(WorkTableError::NotFound) + { + Ok(l) => l, + Err(e) => { + return Err(e); + } + }; + + let Some(row) = self.0.select(pk.clone()) else { + println!("Found link {:?} for {:?}", link, pk); + panic!("Should exist") + }; + self.0.indexes.delete_row(row, link)?; + self.0.primary_index.remove(&pk, link); + self.0 + .data + .delete(link) + .map_err(WorkTableError::PagesError)?; + core::result::Result::Ok(()) + } + pub async fn delete_without_lock(&self, pk: Pk) -> core::result::Result<(), WorkTableError> + where + VacuumTestPrimaryKey: From, + { + let pk: VacuumTestPrimaryKey = pk.into(); + let link = self + .0 + .primary_index + .pk_map + .get(&pk) + .map(|v| v.get().value.into()) + .ok_or(WorkTableError::NotFound)?; + let row = self.0.select(pk.clone()).unwrap(); + self.0.indexes.delete_row(row, link)?; + self.0.primary_index.remove(&pk, link); + self.0 + .data + .delete(link) + .map_err(WorkTableError::PagesError)?; + core::result::Result::Ok(()) + } +} +impl VacuumTestWorkTable { + fn get_data_size(&self, link: Link) -> core::result::Result { + self.0 + .data + .with_ref(link, |row_ref| { + row_ref.inner.data.as_str().to_string().aligned_size() + }) + .map_err(WorkTableError::PagesError) + } +} #[tokio::test(flavor = "multi_thread", worker_threads = 3)] async fn vacuum_parallel_with_selects() { @@ -211,7 +1277,6 @@ async fn vacuum_parallel_with_upserts() { } #[tokio::test(flavor = "multi_thread", worker_threads = 3)] -#[ignore] async fn vacuum_loop_test() { let config = VacuumManagerConfig { check_interval: Duration::from_millis(1_000), @@ -244,6 +1309,7 @@ async fn vacuum_loop_test() { data: format!("test_data_{}", i), }; insert_table.insert(row.clone()).unwrap(); + println!("Inserted {:?}", row.id); tokio::time::sleep(Duration::from_micros(500)).await; i += 1; } @@ -267,7 +1333,8 @@ async fn vacuum_loop_test() { .map(|(_, l)| table.0.data.select(**l).unwrap()) .collect::>(); for row in ids_to_remove { - table.delete(row.id).await.unwrap() + table.delete(row.id).await.unwrap(); + println!("Removed {:?}", row.id); } } } From d7d0b74248039a7bf55465a6ff1841c71357970b Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Tue, 3 Mar 2026 16:59:55 +0300 Subject: [PATCH 02/10] debug --- src/in_memory/pages.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/in_memory/pages.rs b/src/in_memory/pages.rs index 84d7df2..bc1a7e4 100644 --- a/src/in_memory/pages.rs +++ b/src/in_memory/pages.rs @@ -118,6 +118,7 @@ where if let Some(l) = left_link { self.empty_links.push(l); } + println!("Inserted in empty link"); return Ok(link); } Err(e) => match e { @@ -144,6 +145,7 @@ where match link { Ok(link) => { self.row_count.fetch_add(1, Ordering::Relaxed); + println!("Inserted in new place"); return Ok(link); } Err(e) => match e { @@ -401,6 +403,7 @@ where } pub fn mark_page_empty(&self, page_id: PageId) { + println!("Mark {:?} as empty", page_id); if u32::from(page_id) != self.current_page_id.load(Ordering::Acquire) { let mut g = self.empty_pages.write(); g.push_back(page_id); From 34fffe313b4b208e17a645a24c9fedb5faf75f96 Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Thu, 5 Mar 2026 10:31:53 +0300 Subject: [PATCH 03/10] WIP --- src/in_memory/pages.rs | 24 ++++++++++++++---------- src/table/vacuum/vacuum.rs | 12 ++++++++---- 2 files changed, 22 insertions(+), 14 deletions(-) diff --git a/src/in_memory/pages.rs b/src/in_memory/pages.rs index bc1a7e4..be59def 100644 --- a/src/in_memory/pages.rs +++ b/src/in_memory/pages.rs @@ -4,25 +4,25 @@ use parking_lot::RwLock; #[cfg(feature = "perf_measurements")] use performance_measurement_codegen::performance_measurement; use rkyv::{ - api::high::HighDeserializer, rancor::Strategy, ser::{allocator::ArenaHandle, sharing::Share, Serializer}, util::AlignedVec, - Archive, - Deserialize, - Portable, - Serialize, + Archive, Deserialize, Portable, Serialize, + api::high::HighDeserializer, + rancor::Strategy, + ser::{Serializer, allocator::ArenaHandle, sharing::Share}, + util::AlignedVec, }; use std::collections::VecDeque; use std::{ fmt::Debug, - sync::atomic::{AtomicU32, AtomicU64, Ordering}, sync::Arc, + sync::atomic::{AtomicU32, AtomicU64, Ordering}, }; use crate::in_memory::empty_link_registry::EmptyLinkRegistry; use crate::prelude::ArchivedRowWrapper; use crate::{ in_memory::{ - row::{RowWrapper, StorableRow}, Data, DataExecutionError, - DATA_INNER_LENGTH, + DATA_INNER_LENGTH, Data, DataExecutionError, + row::{RowWrapper, StorableRow}, }, prelude::Link, }; @@ -484,6 +484,10 @@ where self } + + pub fn current_page_id(&self) -> PageId { + self.current_page_id.load(Ordering::Acquire).into() + } } #[derive(Debug, Display, Error, From, PartialEq)] @@ -510,8 +514,8 @@ impl ExecutionError { #[cfg(test)] mod tests { use std::collections::HashSet; - use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; + use std::sync::atomic::{AtomicBool, Ordering}; use std::thread; use std::time::Instant; @@ -521,7 +525,7 @@ mod tests { use crate::in_memory::data::Data; use crate::in_memory::pages::{DataPages, ExecutionError}; - use crate::in_memory::{PagesExecutionError, RowWrapper, StorableRow, DATA_INNER_LENGTH}; + use crate::in_memory::{DATA_INNER_LENGTH, PagesExecutionError, RowWrapper, StorableRow}; use crate::prelude::ArchivedRowWrapper; #[derive( diff --git a/src/table/vacuum/vacuum.rs b/src/table/vacuum/vacuum.rs index 7d76d30..5768dbf 100644 --- a/src/table/vacuum/vacuum.rs +++ b/src/table/vacuum/vacuum.rs @@ -4,23 +4,23 @@ use std::marker::PhantomData; use std::sync::Arc; use std::time::Instant; -use data_bucket::page::PageId; use data_bucket::Link; +use data_bucket::page::PageId; use indexset::core::node::NodeLike; use indexset::core::pair::Pair; use rkyv::rancor::Strategy; +use rkyv::ser::Serializer; use rkyv::ser::allocator::ArenaHandle; use rkyv::ser::sharing::Share; -use rkyv::ser::Serializer; use rkyv::util::AlignedVec; use rkyv::{Archive, Deserialize, Serialize}; use crate::in_memory::{ArchivedRowWrapper, DataPages, RowWrapper, StorableRow}; use crate::lock::{Lock, LockMap, RowLock}; use crate::prelude::{OffsetEqLink, TablePrimaryKey}; -use crate::vacuum::fragmentation_info::FragmentationInfo; use crate::vacuum::VacuumStats; use crate::vacuum::WorkTableVacuum; +use crate::vacuum::fragmentation_info::FragmentationInfo; use crate::{ AvailableIndex, PrimaryIndex, TableIndex, TableRow, TableSecondaryIndex, TableSecondaryIndexCdc, }; @@ -138,6 +138,10 @@ where let info_iter = per_page_info.into_iter(); for info in info_iter { let page_from = info.page_id; + if self.data_pages.current_page_id() == page_from { + // don't touch current page or else inserts will be broken + continue; + } loop { let page_to = if let Some(id) = defragmented_pages.pop_front() { id @@ -392,7 +396,7 @@ mod tests { use std::sync::Arc; use indexset::core::pair::Pair; - use worktable_codegen::{worktable, MemStat}; + use worktable_codegen::{MemStat, worktable}; use crate::in_memory::{ArchivedRowWrapper, RowWrapper, StorableRow}; use crate::prelude::*; From d9ff39adb48ab3e3b32211bf8227359f431e704b Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Thu, 5 Mar 2026 17:38:31 +0300 Subject: [PATCH 04/10] debug --- tests/worktable/vacuum.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/worktable/vacuum.rs b/tests/worktable/vacuum.rs index 8110561..b91df00 100644 --- a/tests/worktable/vacuum.rs +++ b/tests/worktable/vacuum.rs @@ -1037,6 +1037,7 @@ impl VacuumTestWorkTable { { Ok(l) => l, Err(e) => { + println!("Error getting primary index: {} for {:?}", e, pk.clone()); return Err(e); } }; From 6b36dcafa986890661fad6e5f03c44f1e8bca7da Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Fri, 6 Mar 2026 17:36:57 +0300 Subject: [PATCH 05/10] =?UTF-8?q?=D0=A6=D0=A8=D0=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/in_memory/pages.rs | 23 +++++++++++++---------- src/table/vacuum/vacuum.rs | 14 +++++++++----- tests/worktable/vacuum.rs | 4 +++- 3 files changed, 25 insertions(+), 16 deletions(-) diff --git a/src/in_memory/pages.rs b/src/in_memory/pages.rs index be59def..104bda7 100644 --- a/src/in_memory/pages.rs +++ b/src/in_memory/pages.rs @@ -4,25 +4,25 @@ use parking_lot::RwLock; #[cfg(feature = "perf_measurements")] use performance_measurement_codegen::performance_measurement; use rkyv::{ - Archive, Deserialize, Portable, Serialize, - api::high::HighDeserializer, - rancor::Strategy, - ser::{Serializer, allocator::ArenaHandle, sharing::Share}, - util::AlignedVec, + api::high::HighDeserializer, rancor::Strategy, ser::{allocator::ArenaHandle, sharing::Share, Serializer}, util::AlignedVec, + Archive, + Deserialize, + Portable, + Serialize, }; use std::collections::VecDeque; use std::{ fmt::Debug, - sync::Arc, sync::atomic::{AtomicU32, AtomicU64, Ordering}, + sync::Arc, }; use crate::in_memory::empty_link_registry::EmptyLinkRegistry; use crate::prelude::ArchivedRowWrapper; use crate::{ in_memory::{ - DATA_INNER_LENGTH, Data, DataExecutionError, - row::{RowWrapper, StorableRow}, + row::{RowWrapper, StorableRow}, Data, DataExecutionError, + DATA_INNER_LENGTH, }, prelude::Link, }; @@ -266,6 +266,9 @@ where if gen_row.is_ghosted() { return Err(ExecutionError::Ghosted); } + if gen_row.is_deleted() { + return Err(ExecutionError::Ghosted); + } Ok(gen_row.get_inner()) } @@ -514,8 +517,8 @@ impl ExecutionError { #[cfg(test)] mod tests { use std::collections::HashSet; - use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::Arc; use std::thread; use std::time::Instant; @@ -525,7 +528,7 @@ mod tests { use crate::in_memory::data::Data; use crate::in_memory::pages::{DataPages, ExecutionError}; - use crate::in_memory::{DATA_INNER_LENGTH, PagesExecutionError, RowWrapper, StorableRow}; + use crate::in_memory::{PagesExecutionError, RowWrapper, StorableRow, DATA_INNER_LENGTH}; use crate::prelude::ArchivedRowWrapper; #[derive( diff --git a/src/table/vacuum/vacuum.rs b/src/table/vacuum/vacuum.rs index 5768dbf..40664ff 100644 --- a/src/table/vacuum/vacuum.rs +++ b/src/table/vacuum/vacuum.rs @@ -2,25 +2,25 @@ use std::collections::VecDeque; use std::fmt::Debug; use std::marker::PhantomData; use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; -use data_bucket::Link; use data_bucket::page::PageId; +use data_bucket::Link; use indexset::core::node::NodeLike; use indexset::core::pair::Pair; use rkyv::rancor::Strategy; -use rkyv::ser::Serializer; use rkyv::ser::allocator::ArenaHandle; use rkyv::ser::sharing::Share; +use rkyv::ser::Serializer; use rkyv::util::AlignedVec; use rkyv::{Archive, Deserialize, Serialize}; use crate::in_memory::{ArchivedRowWrapper, DataPages, RowWrapper, StorableRow}; use crate::lock::{Lock, LockMap, RowLock}; use crate::prelude::{OffsetEqLink, TablePrimaryKey}; +use crate::vacuum::fragmentation_info::FragmentationInfo; use crate::vacuum::VacuumStats; use crate::vacuum::WorkTableVacuum; -use crate::vacuum::fragmentation_info::FragmentationInfo; use crate::{ AvailableIndex, PrimaryIndex, TableIndex, TableRow, TableSecondaryIndex, TableSecondaryIndexCdc, }; @@ -123,6 +123,10 @@ where let registry = self.data_pages.empty_links_registry(); let mut per_page_info = registry.get_per_page_info(); let _registry_lock = registry.lock_vacuum().await; + + // to avoid some rewrites of ops that used link from empty links registry + tokio::time::sleep(Duration::from_millis(100)).await; + per_page_info.sort_by(|l, r| { OrderedFloat(l.filled_empty_ratio).cmp(&OrderedFloat(r.filled_empty_ratio)) }); @@ -396,7 +400,7 @@ mod tests { use std::sync::Arc; use indexset::core::pair::Pair; - use worktable_codegen::{MemStat, worktable}; + use worktable_codegen::{worktable, MemStat}; use crate::in_memory::{ArchivedRowWrapper, RowWrapper, StorableRow}; use crate::prelude::*; diff --git a/tests/worktable/vacuum.rs b/tests/worktable/vacuum.rs index b91df00..00a642f 100644 --- a/tests/worktable/vacuum.rs +++ b/tests/worktable/vacuum.rs @@ -1052,6 +1052,7 @@ impl VacuumTestWorkTable { .data .delete(link) .map_err(WorkTableError::PagesError)?; + println!("Deleted link {:?} for {:?}", link, pk); core::result::Result::Ok(()) } pub async fn delete_without_lock(&self, pk: Pk) -> core::result::Result<(), WorkTableError> @@ -1331,8 +1332,9 @@ async fn vacuum_loop_test() { .indexes .value_idx .range(..outdated_ts) - .map(|(_, l)| table.0.data.select(**l).unwrap()) + .map(|(_, l)| table.0.data.select_non_ghosted(**l).unwrap()) .collect::>(); + println!("Ids to Remove {:?}", ids_to_remove); for row in ids_to_remove { table.delete(row.id).await.unwrap(); println!("Removed {:?}", row.id); From 179e8ceddd9f9d24c4c058de07528d4004c85e48 Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Mon, 9 Mar 2026 14:52:22 +0300 Subject: [PATCH 06/10] WIP --- src/in_memory/pages.rs | 6 ++++++ tests/worktable/vacuum.rs | 15 +++++++++++++-- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/src/in_memory/pages.rs b/src/in_memory/pages.rs index 104bda7..da19885 100644 --- a/src/in_memory/pages.rs +++ b/src/in_memory/pages.rs @@ -217,6 +217,9 @@ where let index = page_id_mapper(page_id.into()); let page = pages[index].clone(); page.reset(); + + println!("Used empty page {:?}", page_id); + return page; } @@ -224,6 +227,9 @@ where let index = self.last_page_id.fetch_add(1, Ordering::AcqRel) + 1; let page = Arc::new(Data::new(index.into())); pages.push(page.clone()); + + println!("Used new page {:?}", page_id); + page } diff --git a/tests/worktable/vacuum.rs b/tests/worktable/vacuum.rs index 00a642f..2b4f9c5 100644 --- a/tests/worktable/vacuum.rs +++ b/tests/worktable/vacuum.rs @@ -305,8 +305,19 @@ impl TableSecondaryIndex core::result::Result<(), IndexError> { - TableIndex::remove(&self.value_idx, &row.value, link); - TableIndex::remove(&self.data_idx, &row.data, link); + if let Some(v) = TableIndex::remove(&self.value_idx, &row.value, link) { + println!("Delete value from value_idx: {:?}", v); + } else { + println!( + "Not deleted value from value_idx: {:?} {:?}", + row.value, link + ); + } + if let Some(v) = TableIndex::remove(&self.data_idx, &row.data, link) { + println!("Delete value from data_idx: {:?}", v); + } else { + println!("Not deleted from data_idx: {:?} {:?}", row.data, link); + } core::result::Result::Ok(()) } fn process_difference_insert( From 6b73e0ddb7a607955680ff28307cf40b62ef757f Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Thu, 12 Mar 2026 18:19:55 +0300 Subject: [PATCH 07/10] heehee debug --- src/index/table_index/mod.rs | 15 ++++++++++++--- tests/worktable/vacuum.rs | 5 +++-- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/src/index/table_index/mod.rs b/src/index/table_index/mod.rs index 88cf043..65bdad7 100644 --- a/src/index/table_index/mod.rs +++ b/src/index/table_index/mod.rs @@ -27,10 +27,12 @@ where Node: NodeLike> + Send + 'static, { fn insert(&self, value: T, link: Link) -> Option { + //println!("Insert to indexmap: {:?} {:?}", value, link); self.insert(value, OffsetEqLink(link)).map(|l| l.0) } fn insert_checked(&self, value: T, link: Link) -> Option<()> { + //println!("Checked Insert to indexmap: {:?} {:?}", value, link); if self.insert(value, OffsetEqLink(link)).is_some() { None } else { @@ -39,8 +41,11 @@ where } fn remove(&self, value: &T, link: Link) -> Option<(T, Link)> { - self.remove(value, &OffsetEqLink(link)) - .map(|(v, l)| (v, l.0)) + let res = self + .remove(value, &OffsetEqLink(link)) + .map(|(v, l)| (v, l.0)); + //println!("Remove from indexmap: {:?} {:?} {:?}", value, link, res); + res } } @@ -50,14 +55,18 @@ where Node: NodeLike> + Send + 'static, { fn insert(&self, value: T, link: Link) -> Option { + //println!("Insert to indexmap: {:?} {:?}", value, link); self.insert(value, OffsetEqLink(link)).map(|l| l.0) } fn insert_checked(&self, value: T, link: Link) -> Option<()> { + //println!("Checked Insert to indexmap: {:?} {:?}", value, link); self.checked_insert(value, OffsetEqLink(link)) } fn remove(&self, value: &T, _: Link) -> Option<(T, Link)> { - self.remove(value).map(|(v, l)| (v, l.0)) + let res = self.remove(value).map(|(v, l)| (v, l.0)); + //println!("Remove from indexmap: {:?} {:?}", value, res); + res } } diff --git a/tests/worktable/vacuum.rs b/tests/worktable/vacuum.rs index 2b4f9c5..fa742e5 100644 --- a/tests/worktable/vacuum.rs +++ b/tests/worktable/vacuum.rs @@ -1057,6 +1057,7 @@ impl VacuumTestWorkTable { println!("Found link {:?} for {:?}", link, pk); panic!("Should exist") }; + println!("Deleted row {:?}", row); self.0.indexes.delete_row(row, link)?; self.0.primary_index.remove(&pk, link); self.0 @@ -1343,10 +1344,10 @@ async fn vacuum_loop_test() { .indexes .value_idx .range(..outdated_ts) - .map(|(_, l)| table.0.data.select_non_ghosted(**l).unwrap()) + .map(|(v, l)| (table.0.data.select_non_ghosted(**l).unwrap(), l, v)) .collect::>(); println!("Ids to Remove {:?}", ids_to_remove); - for row in ids_to_remove { + for (row, _, _) in ids_to_remove { table.delete(row.id).await.unwrap(); println!("Removed {:?}", row.id); } From 4e84da2c9e3ab35f257d0d5b7255b7106fa83387 Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Thu, 12 Mar 2026 22:44:55 +0300 Subject: [PATCH 08/10] yayay --- src/in_memory/pages.rs | 23 ++++++++++------------- src/index/table_index/mod.rs | 12 ++++++------ src/table/vacuum/vacuum.rs | 20 +++++++++++++------- 3 files changed, 29 insertions(+), 26 deletions(-) diff --git a/src/in_memory/pages.rs b/src/in_memory/pages.rs index da19885..e20d5cb 100644 --- a/src/in_memory/pages.rs +++ b/src/in_memory/pages.rs @@ -4,25 +4,25 @@ use parking_lot::RwLock; #[cfg(feature = "perf_measurements")] use performance_measurement_codegen::performance_measurement; use rkyv::{ - api::high::HighDeserializer, rancor::Strategy, ser::{allocator::ArenaHandle, sharing::Share, Serializer}, util::AlignedVec, - Archive, - Deserialize, - Portable, - Serialize, + Archive, Deserialize, Portable, Serialize, + api::high::HighDeserializer, + rancor::Strategy, + ser::{Serializer, allocator::ArenaHandle, sharing::Share}, + util::AlignedVec, }; use std::collections::VecDeque; use std::{ fmt::Debug, - sync::atomic::{AtomicU32, AtomicU64, Ordering}, sync::Arc, + sync::atomic::{AtomicU32, AtomicU64, Ordering}, }; use crate::in_memory::empty_link_registry::EmptyLinkRegistry; use crate::prelude::ArchivedRowWrapper; use crate::{ in_memory::{ - row::{RowWrapper, StorableRow}, Data, DataExecutionError, - DATA_INNER_LENGTH, + DATA_INNER_LENGTH, Data, DataExecutionError, + row::{RowWrapper, StorableRow}, }, prelude::Link, }; @@ -272,9 +272,6 @@ where if gen_row.is_ghosted() { return Err(ExecutionError::Ghosted); } - if gen_row.is_deleted() { - return Err(ExecutionError::Ghosted); - } Ok(gen_row.get_inner()) } @@ -523,8 +520,8 @@ impl ExecutionError { #[cfg(test)] mod tests { use std::collections::HashSet; - use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; + use std::sync::atomic::{AtomicBool, Ordering}; use std::thread; use std::time::Instant; @@ -534,7 +531,7 @@ mod tests { use crate::in_memory::data::Data; use crate::in_memory::pages::{DataPages, ExecutionError}; - use crate::in_memory::{PagesExecutionError, RowWrapper, StorableRow, DATA_INNER_LENGTH}; + use crate::in_memory::{DATA_INNER_LENGTH, PagesExecutionError, RowWrapper, StorableRow}; use crate::prelude::ArchivedRowWrapper; #[derive( diff --git a/src/index/table_index/mod.rs b/src/index/table_index/mod.rs index 65bdad7..fdaf82f 100644 --- a/src/index/table_index/mod.rs +++ b/src/index/table_index/mod.rs @@ -27,12 +27,12 @@ where Node: NodeLike> + Send + 'static, { fn insert(&self, value: T, link: Link) -> Option { - //println!("Insert to indexmap: {:?} {:?}", value, link); + println!("Insert to indexmap: {:?} {:?}", value, link); self.insert(value, OffsetEqLink(link)).map(|l| l.0) } fn insert_checked(&self, value: T, link: Link) -> Option<()> { - //println!("Checked Insert to indexmap: {:?} {:?}", value, link); + println!("Checked Insert to indexmap: {:?} {:?}", value, link); if self.insert(value, OffsetEqLink(link)).is_some() { None } else { @@ -44,7 +44,7 @@ where let res = self .remove(value, &OffsetEqLink(link)) .map(|(v, l)| (v, l.0)); - //println!("Remove from indexmap: {:?} {:?} {:?}", value, link, res); + println!("Remove from indexmap: {:?} {:?} {:?}", value, link, res); res } } @@ -55,18 +55,18 @@ where Node: NodeLike> + Send + 'static, { fn insert(&self, value: T, link: Link) -> Option { - //println!("Insert to indexmap: {:?} {:?}", value, link); + println!("Insert to indexmap: {:?} {:?}", value, link); self.insert(value, OffsetEqLink(link)).map(|l| l.0) } fn insert_checked(&self, value: T, link: Link) -> Option<()> { - //println!("Checked Insert to indexmap: {:?} {:?}", value, link); + println!("Checked Insert to indexmap: {:?} {:?}", value, link); self.checked_insert(value, OffsetEqLink(link)) } fn remove(&self, value: &T, _: Link) -> Option<(T, Link)> { let res = self.remove(value).map(|(v, l)| (v, l.0)); - //println!("Remove from indexmap: {:?} {:?}", value, res); + println!("Remove from indexmap: {:?} {:?}", value, res); res } } diff --git a/src/table/vacuum/vacuum.rs b/src/table/vacuum/vacuum.rs index 40664ff..72a4bd1 100644 --- a/src/table/vacuum/vacuum.rs +++ b/src/table/vacuum/vacuum.rs @@ -4,23 +4,23 @@ use std::marker::PhantomData; use std::sync::Arc; use std::time::{Duration, Instant}; -use data_bucket::page::PageId; use data_bucket::Link; +use data_bucket::page::PageId; use indexset::core::node::NodeLike; use indexset::core::pair::Pair; use rkyv::rancor::Strategy; +use rkyv::ser::Serializer; use rkyv::ser::allocator::ArenaHandle; use rkyv::ser::sharing::Share; -use rkyv::ser::Serializer; use rkyv::util::AlignedVec; use rkyv::{Archive, Deserialize, Serialize}; use crate::in_memory::{ArchivedRowWrapper, DataPages, RowWrapper, StorableRow}; use crate::lock::{Lock, LockMap, RowLock}; use crate::prelude::{OffsetEqLink, TablePrimaryKey}; -use crate::vacuum::fragmentation_info::FragmentationInfo; use crate::vacuum::VacuumStats; use crate::vacuum::WorkTableVacuum; +use crate::vacuum::fragmentation_info::FragmentationInfo; use crate::{ AvailableIndex, PrimaryIndex, TableIndex, TableRow, TableSecondaryIndex, TableSecondaryIndexCdc, }; @@ -41,7 +41,7 @@ pub struct EmptyDataVacuum< SecondaryEvents = (), > where PrimaryKey: Clone + Ord + Send + 'static + std::hash::Hash, - Row: StorableRow + Send + Clone + 'static, + Row: StorableRow + Send + Clone + 'static + Debug, PkNodeType: NodeLike>> + Send + 'static, { table_name: &'static str, @@ -87,7 +87,7 @@ where + Clone + for<'a> Serialize< Strategy, Share>, rkyv::rancor::Error>, - >, + > + Debug, ::WrappedRow: Archive + for<'a> Serialize< Strategy, Share>, rkyv::rancor::Error>, @@ -242,6 +242,10 @@ where break; }; + if next.page_id != from { + continue; + } + if sum_links_len + next.length > to_free_space as u32 { to_page_will_be_filled = true; if range.next().is_none() { @@ -323,6 +327,8 @@ where .select(new_link) .expect("should exist as link was moved correctly"); + println!("Updating indexes for {:?} with pk {:?}", row, pk); + self.secondary_indexes .reinsert_row(row.clone(), old_link, row, new_link) .expect("should be ok as index were no violated"); @@ -362,7 +368,7 @@ where + Clone + for<'a> Serialize< Strategy, Share>, rkyv::rancor::Error>, - >, + > + Debug, ::WrappedRow: Archive + for<'a> Serialize< Strategy, Share>, rkyv::rancor::Error>, @@ -400,7 +406,7 @@ mod tests { use std::sync::Arc; use indexset::core::pair::Pair; - use worktable_codegen::{worktable, MemStat}; + use worktable_codegen::{MemStat, worktable}; use crate::in_memory::{ArchivedRowWrapper, RowWrapper, StorableRow}; use crate::prelude::*; From a97ebe443941abef0dd2ed30ada73d023c873acb Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Thu, 12 Mar 2026 22:53:30 +0300 Subject: [PATCH 09/10] WIP --- src/in_memory/pages.rs | 7 - src/index/primary_index.rs | 3 +- src/index/table_index/mod.rs | 15 +- src/table/mod.rs | 8 +- src/table/vacuum/vacuum.rs | 4 - tests/worktable/vacuum.rs | 1106 +--------------------------------- 6 files changed, 19 insertions(+), 1124 deletions(-) diff --git a/src/in_memory/pages.rs b/src/in_memory/pages.rs index e20d5cb..d724541 100644 --- a/src/in_memory/pages.rs +++ b/src/in_memory/pages.rs @@ -118,7 +118,6 @@ where if let Some(l) = left_link { self.empty_links.push(l); } - println!("Inserted in empty link"); return Ok(link); } Err(e) => match e { @@ -145,7 +144,6 @@ where match link { Ok(link) => { self.row_count.fetch_add(1, Ordering::Relaxed); - println!("Inserted in new place"); return Ok(link); } Err(e) => match e { @@ -218,8 +216,6 @@ where let page = pages[index].clone(); page.reset(); - println!("Used empty page {:?}", page_id); - return page; } @@ -228,8 +224,6 @@ where let page = Arc::new(Data::new(index.into())); pages.push(page.clone()); - println!("Used new page {:?}", page_id); - page } @@ -409,7 +403,6 @@ where } pub fn mark_page_empty(&self, page_id: PageId) { - println!("Mark {:?} as empty", page_id); if u32::from(page_id) != self.current_page_id.load(Ordering::Acquire) { let mut g = self.empty_pages.write(); g.push_back(page_id); diff --git a/src/index/primary_index.rs b/src/index/primary_index.rs index 337b417..8bd9440 100644 --- a/src/index/primary_index.rs +++ b/src/index/primary_index.rs @@ -12,7 +12,7 @@ use indexset::core::node::NodeLike; use indexset::core::pair::Pair; use crate::util::OffsetEqLink; -use crate::{convert_change_events, IndexMap, TableIndex, TableIndexCdc}; +use crate::{IndexMap, TableIndex, TableIndexCdc, convert_change_events}; /// Combined storage for primary and reverse indexes. /// @@ -71,7 +71,6 @@ where .checked_insert(offset_link, value) .is_none() { - println!("Reverse map checked insert failed"); return None; } Some(()) diff --git a/src/index/table_index/mod.rs b/src/index/table_index/mod.rs index fdaf82f..88cf043 100644 --- a/src/index/table_index/mod.rs +++ b/src/index/table_index/mod.rs @@ -27,12 +27,10 @@ where Node: NodeLike> + Send + 'static, { fn insert(&self, value: T, link: Link) -> Option { - println!("Insert to indexmap: {:?} {:?}", value, link); self.insert(value, OffsetEqLink(link)).map(|l| l.0) } fn insert_checked(&self, value: T, link: Link) -> Option<()> { - println!("Checked Insert to indexmap: {:?} {:?}", value, link); if self.insert(value, OffsetEqLink(link)).is_some() { None } else { @@ -41,11 +39,8 @@ where } fn remove(&self, value: &T, link: Link) -> Option<(T, Link)> { - let res = self - .remove(value, &OffsetEqLink(link)) - .map(|(v, l)| (v, l.0)); - println!("Remove from indexmap: {:?} {:?} {:?}", value, link, res); - res + self.remove(value, &OffsetEqLink(link)) + .map(|(v, l)| (v, l.0)) } } @@ -55,18 +50,14 @@ where Node: NodeLike> + Send + 'static, { fn insert(&self, value: T, link: Link) -> Option { - println!("Insert to indexmap: {:?} {:?}", value, link); self.insert(value, OffsetEqLink(link)).map(|l| l.0) } fn insert_checked(&self, value: T, link: Link) -> Option<()> { - println!("Checked Insert to indexmap: {:?} {:?}", value, link); self.checked_insert(value, OffsetEqLink(link)) } fn remove(&self, value: &T, _: Link) -> Option<(T, Link)> { - let res = self.remove(value).map(|(v, l)| (v, l.0)); - println!("Remove from indexmap: {:?} {:?}", value, res); - res + self.remove(value).map(|(v, l)| (v, l.0)) } } diff --git a/src/table/mod.rs b/src/table/mod.rs index 5a14480..d718f01 100644 --- a/src/table/mod.rs +++ b/src/table/mod.rs @@ -8,8 +8,8 @@ use crate::prelude::{Link, LockMap, OperationId, PrimaryKeyGeneratorState}; use crate::primary_key::{PrimaryKeyGenerator, TablePrimaryKey}; use crate::util::OffsetEqLink; use crate::{ - convert_change_events, in_memory, AvailableIndex, IndexError, IndexMap, PrimaryIndex, TableIndex, - TableIndexCdc, TableRow, TableSecondaryIndex, TableSecondaryIndexCdc, + AvailableIndex, IndexError, IndexMap, PrimaryIndex, TableIndex, TableIndexCdc, TableRow, + TableSecondaryIndex, TableSecondaryIndexCdc, convert_change_events, in_memory, }; use data_bucket::INNER_PAGE_SIZE; use derive_more::{Display, Error, From}; @@ -19,9 +19,9 @@ use indexset::core::pair::Pair; use performance_measurement_codegen::performance_measurement; use rkyv::api::high::HighDeserializer; use rkyv::rancor::Strategy; +use rkyv::ser::Serializer; use rkyv::ser::allocator::ArenaHandle; use rkyv::ser::sharing::Share; -use rkyv::ser::Serializer; use rkyv::util::AlignedVec; use rkyv::{Archive, Deserialize, Serialize}; use std::fmt::Debug; @@ -197,14 +197,12 @@ where .data .insert(row.clone()) .map_err(WorkTableError::PagesError)?; - println!("Inserting {:?} to {:?}", pk, link); if self .primary_index .insert_checked(pk.clone(), link) .is_none() { self.data.delete(link).map_err(WorkTableError::PagesError)?; - println!("Already exists {:?}", pk); return Err(WorkTableError::AlreadyExists("Primary".to_string())); }; if let Err(e) = self.indexes.save_row(row.clone(), link) { diff --git a/src/table/vacuum/vacuum.rs b/src/table/vacuum/vacuum.rs index 72a4bd1..e508127 100644 --- a/src/table/vacuum/vacuum.rs +++ b/src/table/vacuum/vacuum.rs @@ -283,8 +283,6 @@ where .expect("page is not full as checked on links collection"); self.update_index_after_move(pk.clone(), from_link.0, new_link); - println!("Moved {:?} from {:?} to {:?}", pk, from_link, new_link); - lock.unlock(); self.lock_manager.remove_with_lock_check(&pk); } @@ -327,8 +325,6 @@ where .select(new_link) .expect("should exist as link was moved correctly"); - println!("Updating indexes for {:?} with pk {:?}", row, pk); - self.secondary_indexes .reinsert_row(row.clone(), old_link, row, new_link) .expect("should be ok as index were no violated"); diff --git a/tests/worktable/vacuum.rs b/tests/worktable/vacuum.rs index fa742e5..df12d31 100644 --- a/tests/worktable/vacuum.rs +++ b/tests/worktable/vacuum.rs @@ -5,1100 +5,20 @@ use std::sync::Arc; use std::time::Duration; use worktable::prelude::*; use worktable::vacuum::{VacuumManager, VacuumManagerConfig}; +use worktable_codegen::worktable; -// worktable!( -// name: VacuumTest, -// columns: { -// id: u64 primary_key autoincrement, -// value: i64, -// data: String -// }, -// indexes: { -// value_idx: value unique, -// data_idx: data, -// } -// ); - -#[derive( - Clone, - rkyv::Archive, - Debug, - Default, - rkyv::Deserialize, - Hash, - rkyv::Serialize, - From, - Eq, - Into, - PartialEq, - PartialOrd, - Ord, - SizeMeasure, - MemStat, -)] -#[rkyv(derive(PartialEq, Eq, PartialOrd, Ord, Debug))] -pub struct VacuumTestPrimaryKey(u64); -impl TablePrimaryKey for VacuumTestPrimaryKey { - type Generator = std::sync::atomic::AtomicU64; -} -#[derive(rkyv::Archive, Debug, rkyv::Deserialize, Clone, rkyv::Serialize, PartialEq, MemStat)] -#[rkyv(derive(Debug))] -#[repr(C)] -pub struct VacuumTestRow { - pub id: u64, - pub value: i64, - pub data: String, -} -impl TableRow for VacuumTestRow { - fn get_primary_key(&self) -> VacuumTestPrimaryKey { - self.id.clone().into() - } -} -#[derive(rkyv::Archive, Debug, rkyv::Deserialize, Clone, rkyv::Serialize, PartialEq)] -#[rkyv(derive(Debug))] -#[repr(C)] -pub enum VacuumTestRowFields { - Id, - Value, - Data, -} -impl Query for VacuumTestRow { - fn merge(self, row: VacuumTestRow) -> VacuumTestRow { - self - } -} -#[derive(Clone, Debug, From, PartialEq)] -#[non_exhaustive] -pub enum VacuumTestAvaiableTypes { - #[from] - STRING(String), - #[from] - I64(i64), -} -#[derive(rkyv::Archive, Debug, rkyv::Deserialize, rkyv::Serialize)] -#[repr(C)] -pub struct VacuumTestWrapper { - inner: VacuumTestRow, - is_ghosted: bool, - is_deleted: bool, - is_in_vacuum_process: bool, -} -impl RowWrapper for VacuumTestWrapper { - fn get_inner(self) -> VacuumTestRow { - self.inner - } - fn is_ghosted(&self) -> bool { - self.is_ghosted - } - fn is_vacuumed(&self) -> bool { - self.is_in_vacuum_process - } - fn is_deleted(&self) -> bool { - self.is_deleted - } - fn from_inner(inner: VacuumTestRow) -> Self { - Self { - inner, - is_ghosted: true, - is_deleted: false, - is_in_vacuum_process: false, - } - } -} -impl StorableRow for VacuumTestRow { - type WrappedRow = VacuumTestWrapper; -} -impl ArchivedRowWrapper for ArchivedVacuumTestWrapper { - fn unghost(&mut self) { - self.is_ghosted = false; - } - fn set_in_vacuum_process(&mut self) { - self.is_in_vacuum_process = true; - } - fn delete(&mut self) { - self.is_deleted = true; - } - fn is_deleted(&self) -> bool { - self.is_deleted - } -} -#[derive(Debug, Clone)] -pub struct VacuumTestLock { - id_lock: Option>, - value_lock: Option>, - data_lock: Option>, -} -impl VacuumTestLock { - pub fn new() -> Self { - Self { - id_lock: None, - value_lock: None, - data_lock: None, - } - } -} -impl RowLock for VacuumTestLock { - fn is_locked(&self) -> bool { - self.id_lock - .as_ref() - .map(|l| l.is_locked()) - .unwrap_or(false) - || self - .value_lock - .as_ref() - .map(|l| l.is_locked()) - .unwrap_or(false) - || self - .data_lock - .as_ref() - .map(|l| l.is_locked()) - .unwrap_or(false) - } - #[allow(clippy::mutable_key_type)] - fn lock( - &mut self, - id: u16, - ) -> ( - std::collections::HashSet>, - std::sync::Arc, - ) { - let mut set = std::collections::HashSet::new(); - let lock = std::sync::Arc::new(Lock::new(id)); - if let Some(lock) = &self.id_lock { - set.insert(lock.clone()); - } - self.id_lock = Some(lock.clone()); - if let Some(lock) = &self.value_lock { - set.insert(lock.clone()); - } - self.value_lock = Some(lock.clone()); - if let Some(lock) = &self.data_lock { - set.insert(lock.clone()); - } - self.data_lock = Some(lock.clone()); - (set, lock) - } - fn with_lock(id: u16) -> (Self, std::sync::Arc) { - let lock = std::sync::Arc::new(Lock::new(id)); - ( - Self { - id_lock: Some(lock.clone()), - value_lock: Some(lock.clone()), - data_lock: Some(lock.clone()), - }, - lock, - ) - } - #[allow(clippy::mutable_key_type)] - fn merge(&mut self, other: &mut Self) -> std::collections::HashSet> { - let mut set = std::collections::HashSet::new(); - if let Some(id_lock) = &other.id_lock { - if self.id_lock.is_none() { - self.id_lock = Some(id_lock.clone()); - } else { - set.insert(id_lock.clone()); - } - } - other.id_lock = self.id_lock.clone(); - if let Some(value_lock) = &other.value_lock { - if self.value_lock.is_none() { - self.value_lock = Some(value_lock.clone()); - } else { - set.insert(value_lock.clone()); - } - } - other.value_lock = self.value_lock.clone(); - if let Some(data_lock) = &other.data_lock { - if self.data_lock.is_none() { - self.data_lock = Some(data_lock.clone()); - } else { - set.insert(data_lock.clone()); - } - } - other.data_lock = self.data_lock.clone(); - set - } -} -#[derive(Debug, MemStat)] -pub struct VacuumTestIndex { - value_idx: IndexMap, - data_idx: - IndexMultiMap>>, -} -impl TableSecondaryIndex - for VacuumTestIndex -{ - fn save_row( - &self, - row: VacuumTestRow, - link: Link, - ) -> core::result::Result<(), IndexError> { - let mut inserted_indexes: Vec = vec![]; - if self - .value_idx - .insert_checked(row.value.clone(), link) - .is_none() - { - return Err(IndexError::AlreadyExists { - at: VacuumTestAvailableIndexes::ValueIdx, - inserted_already: inserted_indexes.clone(), - }); - } - inserted_indexes.push(VacuumTestAvailableIndexes::ValueIdx); - if self - .data_idx - .insert_checked(row.data.clone(), link) - .is_none() - { - return Err(IndexError::AlreadyExists { - at: VacuumTestAvailableIndexes::DataIdx, - inserted_already: inserted_indexes.clone(), - }); - } - inserted_indexes.push(VacuumTestAvailableIndexes::DataIdx); - core::result::Result::Ok(()) - } - fn reinsert_row( - &self, - row_old: VacuumTestRow, - link_old: Link, - row_new: VacuumTestRow, - link_new: Link, - ) -> core::result::Result<(), IndexError> { - let mut inserted_indexes: Vec = vec![]; - let row = &row_new; - let val_new = row.value.clone(); - let row = &row_old; - let val_old = row.value.clone(); - if val_new != val_old { - if self - .value_idx - .insert_checked(val_new.clone(), link_new) - .is_none() - { - return Err(IndexError::AlreadyExists { - at: VacuumTestAvailableIndexes::ValueIdx, - inserted_already: inserted_indexes.clone(), - }); - } - inserted_indexes.push(VacuumTestAvailableIndexes::ValueIdx); - } - let row = &row_new; - let val_new = row.value.clone(); - let row = &row_old; - let val_old = row.value.clone(); - if val_new == val_old { - TableIndex::insert(&self.value_idx, val_new.clone(), link_new); - } else { - TableIndex::remove(&self.value_idx, &val_old, link_old); - } - let row = &row_new; - let val_new = row.data.clone(); - let row = &row_old; - let val_old = row.data.clone(); - TableIndex::insert(&self.data_idx, val_new.clone(), link_new); - TableIndex::remove(&self.data_idx, &val_old, link_old); - core::result::Result::Ok(()) - } - fn delete_row( - &self, - row: VacuumTestRow, - link: Link, - ) -> core::result::Result<(), IndexError> { - if let Some(v) = TableIndex::remove(&self.value_idx, &row.value, link) { - println!("Delete value from value_idx: {:?}", v); - } else { - println!( - "Not deleted value from value_idx: {:?} {:?}", - row.value, link - ); - } - if let Some(v) = TableIndex::remove(&self.data_idx, &row.data, link) { - println!("Delete value from data_idx: {:?}", v); - } else { - println!("Not deleted from data_idx: {:?} {:?}", row.data, link); - } - core::result::Result::Ok(()) - } - fn process_difference_insert( - &self, - link: Link, - difference: std::collections::HashMap<&str, Difference>, - ) -> core::result::Result<(), IndexError> { - let mut inserted_indexes: Vec = vec![]; - if let Some(diff) = difference.get("value") { - if let VacuumTestAvaiableTypes::I64(new) = &diff.new { - let key_new = *new; - if TableIndex::insert_checked(&self.value_idx, key_new, link).is_none() { - return Err(IndexError::AlreadyExists { - at: VacuumTestAvailableIndexes::ValueIdx, - inserted_already: inserted_indexes.clone(), - }); - } - inserted_indexes.push(VacuumTestAvailableIndexes::ValueIdx); - } - } - if let Some(diff) = difference.get("data") { - if let VacuumTestAvaiableTypes::STRING(new) = &diff.new { - let key_new = new.to_string(); - if TableIndex::insert_checked(&self.data_idx, key_new, link).is_none() { - return Err(IndexError::AlreadyExists { - at: VacuumTestAvailableIndexes::DataIdx, - inserted_already: inserted_indexes.clone(), - }); - } - inserted_indexes.push(VacuumTestAvailableIndexes::DataIdx); - } - } - core::result::Result::Ok(()) - } - fn process_difference_remove( - &self, - link: Link, - difference: std::collections::HashMap<&str, Difference>, - ) -> core::result::Result<(), IndexError> { - if let Some(diff) = difference.get("value") { - if let VacuumTestAvaiableTypes::I64(old) = &diff.old { - let key_old = *old; - TableIndex::remove(&self.value_idx, &key_old, link); - } - } - if let Some(diff) = difference.get("data") { - if let VacuumTestAvaiableTypes::STRING(old) = &diff.old { - let key_old = old.to_string(); - TableIndex::remove(&self.data_idx, &key_old, link); - } - } - core::result::Result::Ok(()) - } - fn delete_from_indexes( - &self, - row: VacuumTestRow, - link: Link, - indexes: Vec, - ) -> core::result::Result<(), IndexError> { - for index in indexes { - match index { - VacuumTestAvailableIndexes::ValueIdx => { - TableIndex::remove(&self.value_idx, &row.value, link); - } - VacuumTestAvailableIndexes::DataIdx => { - TableIndex::remove(&self.data_idx, &row.data, link); - } - } - } - core::result::Result::Ok(()) +worktable!( + name: VacuumTest, + columns: { + id: u64 primary_key autoincrement, + value: i64, + data: String + }, + indexes: { + value_idx: value unique, + data_idx: data, } -} -impl TableSecondaryIndexInfo for VacuumTestIndex { - fn index_info(&self) -> Vec { - let mut info = Vec::new(); - info.push(IndexInfo { - name: "value_idx".to_string(), - index_type: IndexKind::Unique, - key_count: self.value_idx.len(), - capacity: self.value_idx.capacity(), - heap_size: self.value_idx.heap_size(), - used_size: self.value_idx.used_size(), - node_count: self.value_idx.node_count(), - }); - info.push(IndexInfo { - name: "data_idx".to_string(), - index_type: IndexKind::NonUnique, - key_count: self.data_idx.len(), - capacity: self.data_idx.capacity(), - heap_size: self.data_idx.heap_size(), - used_size: self.data_idx.used_size(), - node_count: self.data_idx.node_count(), - }); - info - } - fn is_empty(&self) -> bool { - self.value_idx.len() == 0 && self.data_idx.len() == 0 - } -} -impl Default for VacuumTestIndex { - fn default() -> Self { - Self { - value_idx: IndexMap::with_maximum_node_size( - get_index_page_size_from_data_length::(VACUUM_TEST_INNER_SIZE), - ), - data_idx: IndexMultiMap::with_maximum_node_size(VACUUM_TEST_INNER_SIZE), - } - } -} -#[derive(Debug, Clone, Copy, MoreDisplay, PartialEq, PartialOrd, Ord, Hash, Eq)] -pub enum VacuumTestAvailableIndexes { - ValueIdx, - DataIdx, -} -impl AvailableIndex for VacuumTestAvailableIndexes { - fn to_string_value(&self) -> String { - ToString::to_string(&self) - } -} -const VACUUM_TEST_PAGE_SIZE: usize = PAGE_SIZE; -const VACUUM_TEST_INNER_SIZE: usize = VACUUM_TEST_PAGE_SIZE - GENERAL_HEADER_SIZE; -#[derive(Debug)] -pub struct VacuumTestWorkTable( - WorkTable< - VacuumTestRow, - VacuumTestPrimaryKey, - VacuumTestAvaiableTypes, - VacuumTestAvailableIndexes, - VacuumTestIndex, - VacuumTestLock, - ::Generator, - { INNER_PAGE_SIZE }, - Vec>>, - >, ); -impl Default for VacuumTestWorkTable { - fn default() -> Self { - let mut inner = WorkTable::default(); - inner.table_name = "VacuumTest"; - Self(inner) - } -} -impl VacuumTestWorkTable { - pub fn name(&self) -> &'static str { - &self.0.table_name - } - pub fn select(&self, pk: Pk) -> Option - where - VacuumTestPrimaryKey: From, - { - self.0.select(pk.into()) - } - pub fn insert( - &self, - row: VacuumTestRow, - ) -> core::result::Result { - self.0.insert(row) - } - pub async fn reinsert( - &self, - row_old: VacuumTestRow, - row_new: VacuumTestRow, - ) -> core::result::Result { - self.0.reinsert(row_old, row_new).await - } - pub async fn upsert(&self, row: VacuumTestRow) -> core::result::Result<(), WorkTableError> { - let pk = row.get_primary_key(); - let need_to_update = { - if let Some(link) = self.0.primary_index.pk_map.get(&pk) { - true - } else { - false - } - }; - if need_to_update { - self.update(row).await?; - } else { - self.insert(row)?; - } - core::result::Result::Ok(()) - } - pub fn count(&self) -> usize { - let count = self.0.primary_index.pk_map.len(); - count - } - pub fn get_next_pk(&self) -> VacuumTestPrimaryKey { - self.0.get_next_pk() - } - pub fn iter_with core::result::Result<(), WorkTableError>>( - &self, - f: F, - ) -> core::result::Result<(), WorkTableError> { - let first = self - .0 - .primary_index - .pk_map - .iter() - .next() - .map(|(k, v)| (k.clone(), v.0)); - let Some((mut k, link)) = first else { - return Ok(()); - }; - let data = self - .0 - .data - .select_non_ghosted(link) - .map_err(WorkTableError::PagesError)?; - f(data)?; - let mut ind = false; - while !ind { - let next = { - let mut iter = self.0.primary_index.pk_map.range(k.clone()..); - let next = iter - .next() - .map(|(k, v)| (k.clone(), v.0)) - .filter(|(key, _)| key != &k); - if next.is_some() { - next - } else { - iter.next().map(|(k, v)| (k.clone(), v.0)) - } - }; - if let Some((key, link)) = next { - let data = self - .0 - .data - .select_non_ghosted(link) - .map_err(WorkTableError::PagesError)?; - f(data)?; - k = key - } else { - ind = true; - }; - } - core::result::Result::Ok(()) - } - pub async fn iter_with_async< - F: Fn(VacuumTestRow) -> Fut, - Fut: std::future::Future>, - >( - &self, - f: F, - ) -> core::result::Result<(), WorkTableError> { - let first = self - .0 - .primary_index - .pk_map - .iter() - .next() - .map(|(k, v)| (k.clone(), v.0)); - let Some((mut k, link)) = first else { - return Ok(()); - }; - let data = self - .0 - .data - .select_non_ghosted(link) - .map_err(WorkTableError::PagesError)?; - f(data).await?; - let mut ind = false; - while !ind { - let next = { - let mut iter = self.0.primary_index.pk_map.range(k.clone()..); - let next = iter - .next() - .map(|(k, v)| (k.clone(), v.0)) - .filter(|(key, _)| key != &k); - if next.is_some() { - next - } else { - iter.next().map(|(k, v)| (k.clone(), v.0)) - } - }; - if let Some((key, link)) = next { - let data = self - .0 - .data - .select_non_ghosted(link) - .map_err(WorkTableError::PagesError)?; - f(data).await?; - k = key - } else { - ind = true; - }; - } - core::result::Result::Ok(()) - } - pub fn system_info(&self) -> SystemInfo { - self.0.system_info() - } - pub fn vacuum(&self) -> std::sync::Arc { - std::sync::Arc::new(EmptyDataVacuum::<_, _, _, _, _, _, VacuumTestLock, _>::new( - "VacuumTest", - std::sync::Arc::clone(&self.0.data), - std::sync::Arc::clone(&self.0.lock_manager), - std::sync::Arc::clone(&self.0.primary_index), - std::sync::Arc::clone(&self.0.indexes), - )) - } -} -impl VacuumTestWorkTable { - pub fn select_by_value(&self, by: i64) -> Option { - let link: Link = self - .0 - .indexes - .value_idx - .get(&by) - .map(|kv| kv.get().value.into())?; - self.0.data.select_non_ghosted(link).ok() - } - pub fn select_by_data( - &self, - by: String, - ) -> SelectQueryBuilder< - VacuumTestRow, - impl DoubleEndedIterator + '_, - VacuumTestColumnRange, - VacuumTestRowFields, - > { - let rows = self - .0 - .indexes - .data_idx - .get(&by) - .into_iter() - .filter_map(|(_, link)| self.0.data.select_non_ghosted(link.0).ok()) - .filter(move |r| &r.data == &by); - SelectQueryBuilder::new(rows) - } -} -impl SelectQueryExecutor - for SelectQueryBuilder -where - I: DoubleEndedIterator + Sized, -{ - fn where_by( - self, - predicate: F, - ) -> SelectQueryBuilder< - VacuumTestRow, - impl DoubleEndedIterator + Sized, - VacuumTestColumnRange, - VacuumTestRowFields, - > - where - F: FnMut(&VacuumTestRow) -> bool, - { - SelectQueryBuilder { - params: self.params, - iter: self.iter.filter(predicate), - } - } - fn execute(self) -> Result, WorkTableError> { - let mut iter: Box> = Box::new(self.iter); - if !self.params.range.is_empty() { - for (range, column) in &self.params.range { - iter = match (column, range.clone().into()) { - (VacuumTestRowFields::Id, VacuumTestColumnRange::U64(range)) => { - Box::new(iter.filter(move |row| range.contains(&row.id))) - as Box> - } - (VacuumTestRowFields::Id, VacuumTestColumnRange::U64Inclusive(range)) => { - Box::new(iter.filter(move |row| range.contains(&row.id))) - as Box> - } - (VacuumTestRowFields::Id, VacuumTestColumnRange::U64From(range)) => { - Box::new(iter.filter(move |row| range.contains(&row.id))) - as Box> - } - (VacuumTestRowFields::Id, VacuumTestColumnRange::U64To(range)) => { - Box::new(iter.filter(move |row| range.contains(&row.id))) - as Box> - } - (VacuumTestRowFields::Id, VacuumTestColumnRange::U64ToInclusive(range)) => { - Box::new(iter.filter(move |row| range.contains(&row.id))) - as Box> - } - (VacuumTestRowFields::Value, VacuumTestColumnRange::I64(range)) => { - Box::new(iter.filter(move |row| range.contains(&row.value))) - as Box> - } - (VacuumTestRowFields::Value, VacuumTestColumnRange::I64Inclusive(range)) => { - Box::new(iter.filter(move |row| range.contains(&row.value))) - as Box> - } - (VacuumTestRowFields::Value, VacuumTestColumnRange::I64From(range)) => { - Box::new(iter.filter(move |row| range.contains(&row.value))) - as Box> - } - (VacuumTestRowFields::Value, VacuumTestColumnRange::I64To(range)) => { - Box::new(iter.filter(move |row| range.contains(&row.value))) - as Box> - } - (VacuumTestRowFields::Value, VacuumTestColumnRange::I64ToInclusive(range)) => { - Box::new(iter.filter(move |row| range.contains(&row.value))) - as Box> - } - _ => unreachable!(), - }; - } - } - if !self.params.order.is_empty() { - let mut items: Vec = iter.collect(); - items.sort_by(|a, b| { - for (order, col) in &self.params.order { - match col { - VacuumTestRowFields::Id => { - let cmp = a.id.partial_cmp(&b.id).unwrap_or(std::cmp::Ordering::Equal); - if cmp != std::cmp::Ordering::Equal { - return match order { - Order::Asc => cmp, - Order::Desc => cmp.reverse(), - }; - } - } - VacuumTestRowFields::Value => { - let cmp = a - .value - .partial_cmp(&b.value) - .unwrap_or(std::cmp::Ordering::Equal); - if cmp != std::cmp::Ordering::Equal { - return match order { - Order::Asc => cmp, - Order::Desc => cmp.reverse(), - }; - } - } - VacuumTestRowFields::Data => { - let cmp = a - .data - .partial_cmp(&b.data) - .unwrap_or(std::cmp::Ordering::Equal); - if cmp != std::cmp::Ordering::Equal { - return match order { - Order::Asc => cmp, - Order::Desc => cmp.reverse(), - }; - } - } - _ => continue, - } - } - std::cmp::Ordering::Equal - }); - iter = Box::new(items.into_iter()); - } - let iter_result: Box> = - if let Some(offset) = self.params.offset { - Box::new(iter.skip(offset)) - } else { - Box::new(iter) - }; - let iter_result: Box> = - if let Some(limit) = self.params.limit { - Box::new(iter_result.take(limit)) - } else { - Box::new(iter_result) - }; - Ok(iter_result.collect()) - } -} -#[derive(Debug, Clone)] -pub enum VacuumTestColumnRange { - U64(std::ops::Range), - U64Inclusive(std::ops::RangeInclusive), - U64From(std::ops::RangeFrom), - U64To(std::ops::RangeTo), - U64ToInclusive(std::ops::RangeToInclusive), - I64(std::ops::Range), - I64Inclusive(std::ops::RangeInclusive), - I64From(std::ops::RangeFrom), - I64To(std::ops::RangeTo), - I64ToInclusive(std::ops::RangeToInclusive), -} -impl From> for VacuumTestColumnRange { - fn from(range: std::ops::Range) -> Self { - Self::U64(range) - } -} -impl From> for VacuumTestColumnRange { - fn from(range: std::ops::RangeInclusive) -> Self { - Self::U64Inclusive(range) - } -} -impl From> for VacuumTestColumnRange { - fn from(range: std::ops::RangeFrom) -> Self { - Self::U64From(range) - } -} -impl From> for VacuumTestColumnRange { - fn from(range: std::ops::RangeTo) -> Self { - Self::U64To(range) - } -} -impl From> for VacuumTestColumnRange { - fn from(range: std::ops::RangeToInclusive) -> Self { - Self::U64ToInclusive(range) - } -} -impl From> for VacuumTestColumnRange { - fn from(range: std::ops::Range) -> Self { - Self::I64(range) - } -} -impl From> for VacuumTestColumnRange { - fn from(range: std::ops::RangeInclusive) -> Self { - Self::I64Inclusive(range) - } -} -impl From> for VacuumTestColumnRange { - fn from(range: std::ops::RangeFrom) -> Self { - Self::I64From(range) - } -} -impl From> for VacuumTestColumnRange { - fn from(range: std::ops::RangeTo) -> Self { - Self::I64To(range) - } -} -impl From> for VacuumTestColumnRange { - fn from(range: std::ops::RangeToInclusive) -> Self { - Self::I64ToInclusive(range) - } -} -impl VacuumTestWorkTable { - pub fn select_all( - &self, - ) -> SelectQueryBuilder< - VacuumTestRow, - impl DoubleEndedIterator + '_ + Sized, - VacuumTestColumnRange, - VacuumTestRowFields, - > { - let iter = self - .0 - .primary_index - .pk_map - .iter() - .filter_map(|(_, link)| self.0.data.select_non_ghosted(link.0).ok()); - SelectQueryBuilder::new(iter) - } -} -impl VacuumTestWorkTable { - pub async fn update(&self, row: VacuumTestRow) -> core::result::Result<(), WorkTableError> { - let pk = row.get_primary_key(); - let op_lock = { - let lock_id = self.0.lock_manager.next_id(); - if let Some(lock) = self.0.lock_manager.get(&pk) { - let mut lock_guard = lock.write().await; - #[allow(clippy::mutable_key_type)] - let (locks, op_lock) = lock_guard.lock(lock_id); - drop(lock_guard); - futures::future::join_all(locks.iter().map(|l| l.wait()).collect::>()).await; - op_lock - } else { - #[allow(clippy::mutable_key_type)] - let (lock, op_lock) = VacuumTestLock::with_lock(lock_id); - let lock = std::sync::Arc::new(tokio::sync::RwLock::new(lock)); - let mut guard = lock.write().await; - if let Some(old_lock) = self.0.lock_manager.insert(pk.clone(), lock.clone()) { - let mut old_lock_guard = old_lock.write().await; - #[allow(clippy::mutable_key_type)] - let locks = guard.merge(&mut *old_lock_guard); - drop(old_lock_guard); - drop(guard); - futures::future::join_all(locks.iter().map(|l| l.wait()).collect::>()) - .await; - } - op_lock - } - }; - let _guard = LockGuard::new(op_lock, self.0.lock_manager.clone(), pk.clone()); - let mut link: Link = self - .0 - .primary_index - .pk_map - .get(&pk) - .map(|v| v.get().value.into()) - .ok_or(WorkTableError::NotFound)?; - let row_old = self.0.data.select_non_ghosted(link)?; - self.0.update_state.insert(pk.clone(), row_old); - let mut bytes = rkyv::to_bytes::(&row) - .map_err(|_| WorkTableError::SerializeError)?; - if true { - drop(_guard); - let op_lock = { - let lock_id = self.0.lock_manager.next_id(); - if let Some(lock) = self.0.lock_manager.get(&pk) { - let mut lock_guard = lock.write().await; - #[allow(clippy::mutable_key_type)] - let (locks, op_lock) = lock_guard.lock(lock_id); - drop(lock_guard); - futures::future::join_all(locks.iter().map(|l| l.wait()).collect::>()) - .await; - op_lock - } else { - #[allow(clippy::mutable_key_type)] - let (lock, op_lock) = VacuumTestLock::with_lock(lock_id); - let lock = std::sync::Arc::new(tokio::sync::RwLock::new(lock)); - let mut guard = lock.write().await; - if let Some(old_lock) = self.0.lock_manager.insert(pk.clone(), lock.clone()) { - let mut old_lock_guard = old_lock.write().await; - #[allow(clippy::mutable_key_type)] - let locks = guard.merge(&mut *old_lock_guard); - drop(old_lock_guard); - drop(guard); - futures::future::join_all( - locks.iter().map(|l| l.wait()).collect::>(), - ) - .await; - } - op_lock - } - }; - let _guard = LockGuard::new(op_lock, self.0.lock_manager.clone(), pk.clone()); - let row_old = self.0.data.select_non_ghosted(link)?; - if let Err(e) = self.reinsert(row_old, row).await { - self.0.update_state.remove(&pk); - return Err(e); - } - self.0.update_state.remove(&pk); - return core::result::Result::Ok(()); - } - let mut archived_row = unsafe { - rkyv::access_unchecked_mut::<::Archived>(&mut bytes[..]) - .unseal_unchecked() - }; - let op_id = OperationId::Single(uuid::Uuid::now_v7()); - let row_old = self.0.data.select_non_ghosted(link)?; - let row_new = row.clone(); - let updated_bytes: Vec = vec![]; - let mut diffs: std::collections::HashMap<&str, Difference> = - std::collections::HashMap::new(); - let old = &row_old.value; - let new = &row_new.value; - if old != new { - let diff = Difference:: { - old: old.clone().into(), - new: new.clone().into(), - }; - diffs.insert("value", diff); - } - let old = &row_old.data; - let new = &row_new.data; - if old != new { - let diff = Difference:: { - old: old.clone().into(), - new: new.clone().into(), - }; - diffs.insert("data", diff); - } - let indexes_res = self - .0 - .indexes - .process_difference_insert(link, diffs.clone()); - if let Err(e) = indexes_res { - return match e { - IndexError::AlreadyExists { - at, - inserted_already, - } => { - self.0.indexes.delete_from_indexes( - row_new.merge(row_old.clone()), - link, - inserted_already, - )?; - Err(WorkTableError::AlreadyExists(at.to_string_value())) - } - IndexError::NotFound => Err(WorkTableError::NotFound), - }; - } - unsafe { - self.0 - .data - .with_mut_ref(link, move |archived| { - std::mem::swap(&mut archived.inner.id, &mut archived_row.id); - std::mem::swap(&mut archived.inner.value, &mut archived_row.value); - std::mem::swap(&mut archived.inner.data, &mut archived_row.data); - }) - .map_err(WorkTableError::PagesError)? - }; - self.0.indexes.process_difference_remove(link, diffs)?; - self.0.update_state.remove(&pk); - core::result::Result::Ok(()) - } -} -impl VacuumTestWorkTable {} -impl VacuumTestWorkTable { - pub async fn delete(&self, pk: Pk) -> core::result::Result<(), WorkTableError> - where - VacuumTestPrimaryKey: From, - { - let pk: VacuumTestPrimaryKey = pk.into(); - let op_lock = { - let lock_id = self.0.lock_manager.next_id(); - if let Some(lock) = self.0.lock_manager.get(&pk) { - let mut lock_guard = lock.write().await; - #[allow(clippy::mutable_key_type)] - let (locks, op_lock) = lock_guard.lock(lock_id); - drop(lock_guard); - futures::future::join_all(locks.iter().map(|l| l.wait()).collect::>()).await; - op_lock - } else { - #[allow(clippy::mutable_key_type)] - let (lock, op_lock) = VacuumTestLock::with_lock(lock_id); - let lock = std::sync::Arc::new(tokio::sync::RwLock::new(lock)); - let mut guard = lock.write().await; - if let Some(old_lock) = self.0.lock_manager.insert(pk.clone(), lock.clone()) { - let mut old_lock_guard = old_lock.write().await; - #[allow(clippy::mutable_key_type)] - let locks = guard.merge(&mut *old_lock_guard); - drop(old_lock_guard); - drop(guard); - futures::future::join_all(locks.iter().map(|l| l.wait()).collect::>()) - .await; - } - op_lock - } - }; - let _guard = LockGuard::new(op_lock, self.0.lock_manager.clone(), pk.clone()); - let link = match self - .0 - .primary_index - .pk_map - .get(&pk) - .map(|v| v.get().value.into()) - .ok_or(WorkTableError::NotFound) - { - Ok(l) => l, - Err(e) => { - println!("Error getting primary index: {} for {:?}", e, pk.clone()); - return Err(e); - } - }; - - let Some(row) = self.0.select(pk.clone()) else { - println!("Found link {:?} for {:?}", link, pk); - panic!("Should exist") - }; - println!("Deleted row {:?}", row); - self.0.indexes.delete_row(row, link)?; - self.0.primary_index.remove(&pk, link); - self.0 - .data - .delete(link) - .map_err(WorkTableError::PagesError)?; - println!("Deleted link {:?} for {:?}", link, pk); - core::result::Result::Ok(()) - } - pub async fn delete_without_lock(&self, pk: Pk) -> core::result::Result<(), WorkTableError> - where - VacuumTestPrimaryKey: From, - { - let pk: VacuumTestPrimaryKey = pk.into(); - let link = self - .0 - .primary_index - .pk_map - .get(&pk) - .map(|v| v.get().value.into()) - .ok_or(WorkTableError::NotFound)?; - let row = self.0.select(pk.clone()).unwrap(); - self.0.indexes.delete_row(row, link)?; - self.0.primary_index.remove(&pk, link); - self.0 - .data - .delete(link) - .map_err(WorkTableError::PagesError)?; - core::result::Result::Ok(()) - } -} -impl VacuumTestWorkTable { - fn get_data_size(&self, link: Link) -> core::result::Result { - self.0 - .data - .with_ref(link, |row_ref| { - row_ref.inner.data.as_str().to_string().aligned_size() - }) - .map_err(WorkTableError::PagesError) - } -} #[tokio::test(flavor = "multi_thread", worker_threads = 3)] async fn vacuum_parallel_with_selects() { @@ -1291,6 +211,7 @@ async fn vacuum_parallel_with_upserts() { } #[tokio::test(flavor = "multi_thread", worker_threads = 3)] +#[ignore] async fn vacuum_loop_test() { let config = VacuumManagerConfig { check_interval: Duration::from_millis(1_000), @@ -1323,7 +244,6 @@ async fn vacuum_loop_test() { data: format!("test_data_{}", i), }; insert_table.insert(row.clone()).unwrap(); - println!("Inserted {:?}", row.id); tokio::time::sleep(Duration::from_micros(500)).await; i += 1; } @@ -1346,10 +266,8 @@ async fn vacuum_loop_test() { .range(..outdated_ts) .map(|(v, l)| (table.0.data.select_non_ghosted(**l).unwrap(), l, v)) .collect::>(); - println!("Ids to Remove {:?}", ids_to_remove); for (row, _, _) in ids_to_remove { table.delete(row.id).await.unwrap(); - println!("Removed {:?}", row.id); } } } From 71c9e39679df8a758e3db76e3208a1196ec3878e Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Thu, 12 Mar 2026 23:04:04 +0300 Subject: [PATCH 10/10] version bump --- Cargo.toml | 4 ++-- codegen/Cargo.toml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f961929..85040c7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = ["codegen", "examples", "performance_measurement", "performance_measur [package] name = "worktable" -version = "0.9.0-alpha3" +version = "0.9.0-alpha4" edition = "2024" authors = ["Handy-caT"] license = "MIT" @@ -17,7 +17,7 @@ s3-support = ["dep:rust-s3", "dep:aws-creds", "dep:aws-region", "dep:walkdir", " # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -worktable_codegen = { path = "codegen", version = "=0.9.0-alpha3" } +worktable_codegen = { path = "codegen", version = "=0.9.0-alpha4" } async-trait = "0.1.89" eyre = "0.6.12" diff --git a/codegen/Cargo.toml b/codegen/Cargo.toml index d68dc7f..262ae69 100644 --- a/codegen/Cargo.toml +++ b/codegen/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "worktable_codegen" -version = "0.9.0-alpha3" +version = "0.9.0-alpha4" edition = "2024" license = "MIT" description = "WorkTable codegeneration crate"