Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ members = ["codegen", "examples", "performance_measurement", "performance_measur

[package]
name = "worktable"
version = "0.9.0-beta0.1.1"
version = "0.9.0-beta0.1.4"
edition = "2024"
authors = ["Handy-caT"]
license = "MIT"
Expand All @@ -19,7 +19,7 @@ s3-support = ["dep:rusty-s3", "dep:url", "dep:reqwest", "dep:walkdir", "worktabl
[dependencies]
async-trait = "0.1.89"
convert_case = "0.6.0"
data_bucket = "=0.3.13"
data_bucket = "=0.3.14"
# data_bucket = { git = "https://github.com/pathscale/DataBucket", branch = "page_cdc_correction", version = "0.2.7" }
# data_bucket = { path = "../DataBucket", version = "0.3.11" }
derive_more = { version = "2.0.1", features = ["from", "error", "display", "debug", "into"] }
Expand All @@ -46,7 +46,7 @@ tracing = "0.1"
url = { version = "2", optional = true }
uuid = { version = "1.10.0", features = ["v4", "v7"] }
walkdir = { version = "2", optional = true }
worktable_codegen = { path = "codegen", version = "=0.9.0-beta0.1.0" }
worktable_codegen = { path = "codegen", version = "=0.9.0-beta0.1.3" }

[dev-dependencies]
chrono = "0.4.43"
Expand Down
2 changes: 1 addition & 1 deletion codegen/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "worktable_codegen"
version = "0.9.0-beta0.1.0"
version = "0.9.0-beta0.1.3"
edition = "2024"
license = "MIT"
description = "WorkTable codegeneration crate"
Expand Down
80 changes: 46 additions & 34 deletions codegen/src/persist_index/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ impl Generator {
}
} else {
quote! {
let inner: Vec<_> = page
let mut inner: Vec<_> = page
.inner
.get_node()
.into_iter()
Expand All @@ -381,22 +381,28 @@ impl Generator {
value: OffsetEqLink(p.value),
})
.collect();
let mut last_key = inner.first().expect("Node should be not empty").key.clone();
let mut discriminator = 0;
let mut inner = inner.into_iter().map(move |p| {
if p.key == last_key {
let multi = p.with_last_discriminator(discriminator) ;
discriminator = multi.discriminator;
multi
} else {
last_key = p.key.clone();
let multi: IndexMultiPair<_, _> = p.into();
discriminator = multi.discriminator;
multi
}
}).collect::<Vec<_>>();
inner.sort();
let node = UnsizedNode::from_inner(inner, #const_name);

let node_id = inner.pop();

let mut sorted: Vec<_> = inner.into_iter()
.map(|p| IndexMultiPair {
key: p.key,
value: p.value,
discriminator: 0,
})
.collect();
sorted.sort();

let mut current_discriminator = 1u64;
for entry in sorted.iter_mut() {
entry.discriminator = current_discriminator.min(u64::MAX - 1);
current_discriminator += 1;
}
if let Some(node_id) = node_id {
sorted.push(IndexMultiPair { key: node_id.key, value: node_id.value, discriminator: u64::MAX - 1 });
}

let node = UnsizedNode::from_inner(sorted, #const_name);
#i.attach_multi_node(node);
}
};
Expand All @@ -422,7 +428,7 @@ impl Generator {
}
} else {
quote! {
let inner: Vec<_> = page
let mut inner: Vec<_> = page
.inner
.get_node()
.into_iter()
Expand All @@ -431,22 +437,28 @@ impl Generator {
value: OffsetEqLink(p.value),
})
.collect();
let mut last_key = inner.first().expect("Node should be not empty").key.clone();
let mut discriminator = 0;
let mut inner = inner.into_iter().map(move |p| {
if p.key == last_key {
let multi = p.with_last_discriminator(discriminator) ;
discriminator = multi.discriminator;
multi
} else {
last_key = p.key.clone();
let multi: IndexMultiPair<_, _> = p.into();
discriminator = multi.discriminator;
multi
}
}).collect::<Vec<_>>();
inner.sort();
#i.attach_multi_node(inner);

let node_id = inner.pop();

let mut sorted: Vec<_> = inner.into_iter()
.map(|p| IndexMultiPair {
key: p.key,
value: p.value,
discriminator: 0,
})
.collect();
sorted.sort();

let mut current_discriminator = 1u64;
for entry in sorted.iter_mut() {
entry.discriminator = current_discriminator.min(u64::MAX - 1);
current_discriminator += 1;
}
if let Some(node_id) = node_id {
sorted.push(IndexMultiPair { key: node_id.key, value: node_id.value, discriminator: u64::MAX - 1 });
}

#i.attach_multi_node(sorted);
}
};
quote! {
Expand Down
9 changes: 9 additions & 0 deletions src/index/multipair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use indexset::core::pair::Pair;

pub trait MultiPairRecreate<T, L> {
fn with_last_discriminator(self, discriminator: u64) -> MultiPair<T, L>;
fn with_discriminator(self, discriminator: u64) -> MultiPair<T, L>;
}

impl<T, L> MultiPairRecreate<T, L> for Pair<T, L> {
Expand All @@ -13,4 +14,12 @@ impl<T, L> MultiPairRecreate<T, L> for Pair<T, L> {
discriminator: fastrand::u64(discriminator..),
}
}

fn with_discriminator(self, discriminator: u64) -> MultiPair<T, L> {
MultiPair {
key: self.key,
value: self.value,
discriminator,
}
}
}
26 changes: 13 additions & 13 deletions src/persistence/space/index/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,31 +6,31 @@ use std::collections::HashMap;
use std::fmt::Debug;
use std::hash::Hash;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

use convert_case::{Case, Casing};
use data_bucket::page::{IndexValue, PageId};
use data_bucket::{
GENERAL_HEADER_SIZE, GeneralHeader, GeneralPage, IndexPage, IndexPageUtility, Link, PageType,
SizeMeasurable, SpaceId, SpaceInfoPage, get_index_page_size_from_data_length, parse_page,
persist_page, persist_pages_batch,
get_index_page_size_from_data_length, parse_page, persist_page, persist_pages_batch, GeneralHeader, GeneralPage, IndexPage,
IndexPageUtility, Link, PageType, SizeMeasurable, SpaceId,
SpaceInfoPage, GENERAL_HEADER_SIZE,
};
use eyre::eyre;
use indexset::cdc::change::ChangeEvent;
use indexset::concurrent::map::BTreeMap;
use indexset::core::pair::Pair;
use rkyv::de::Pool;
use rkyv::rancor::Strategy;
use rkyv::ser::Serializer;
use rkyv::ser::allocator::ArenaHandle;
use rkyv::ser::sharing::Share;
use rkyv::ser::Serializer;
use rkyv::util::AlignedVec;
use rkyv::{Archive, Deserialize, Serialize, rancor};
use rkyv::{rancor, Archive, Deserialize, Serialize};
use tokio::fs::File;

use crate::persistence::space::{open_or_create_file, BatchChangeEvent};
use crate::persistence::SpaceIndexOps;
use crate::persistence::space::{BatchChangeEvent, open_or_create_file};
use crate::prelude::WT_INDEX_EXTENSION;

pub use table_of_contents::IndexTableOfContents;
Expand Down Expand Up @@ -433,14 +433,15 @@ where
ChangeEvent::InsertAt { max_value, .. }
| ChangeEvent::RemoveAt { max_value, .. } => {
let page_id = &(max_value.key.clone(), max_value.value);
let Some(page_index) = self.table_of_contents.get(page_id) else {
panic!("page should be available in table of contents")
};

let page_index = self
.table_of_contents
.get(page_id)
.expect("page should be available in table of contents");
let page = pages.get_mut(&page_index);
let page_to_update = if let Some(page) = page {
page
} else {
//println!("Trying to parse page {}", page_index);
let page = parse_page::<IndexPage<T>, INNER_PAGE_SIZE>(
&mut self.index_file,
page_index.into(),
Expand Down Expand Up @@ -526,7 +527,6 @@ where
.get_mut(&page_index)
.expect("should be available as was just inserted before")
};
// println!("Event: {:?}", &ev);
let splitted_page = page_to_update.inner.split(*split_index);
let new_page_id = if let Some(id) = self.table_of_contents.pop_empty_page_id() {
id
Expand Down Expand Up @@ -567,7 +567,7 @@ where
#[cfg(test)]
mod test {
use data_bucket::{
INNER_PAGE_SIZE, IndexPage, IndexValue, Persistable, get_index_page_size_from_data_length,
get_index_page_size_from_data_length, IndexPage, IndexValue, Persistable, INNER_PAGE_SIZE,
};

#[test]
Expand Down
17 changes: 9 additions & 8 deletions src/persistence/space/index/unsized_.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,32 @@
use std::collections::HashMap;
use std::fmt::Debug;
use std::hash::Hash;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::Arc;

use data_bucket::page::PageId;
use data_bucket::{
GeneralHeader, GeneralPage, IndexPageUtility, IndexValue, Link, PageType, SizeMeasurable,
SpaceId, SpaceInfoPage, UnsizedIndexPage, VariableSizeMeasurable, parse_page, persist_page,
persist_pages_batch,
parse_page, persist_page, persist_pages_batch, GeneralHeader, GeneralPage, IndexPageUtility, IndexValue,
Link, PageType, SizeMeasurable, SpaceId, SpaceInfoPage, UnsizedIndexPage,
VariableSizeMeasurable,
};
use eyre::eyre;
use indexset::cdc::change::ChangeEvent;
use indexset::concurrent::map::BTreeMap;
use indexset::core::pair::Pair;
use rkyv::de::Pool;
use rkyv::rancor::Strategy;
use rkyv::ser::Serializer;
use rkyv::ser::allocator::ArenaHandle;
use rkyv::ser::sharing::Share;
use rkyv::ser::Serializer;
use rkyv::util::AlignedVec;
use rkyv::{Archive, Deserialize, Serialize, rancor};
use rkyv::{rancor, Archive, Deserialize, Serialize};
use tokio::fs::File;

use crate::UnsizedNode;
use crate::persistence::space::BatchChangeEvent;
use crate::persistence::{IndexTableOfContents, SpaceIndex, SpaceIndexOps};
use crate::prelude::WT_INDEX_EXTENSION;
use crate::UnsizedNode;

#[derive(Debug)]
pub struct SpaceIndexUnsized<T: Ord + Eq, const DATA_LENGTH: u32> {
Expand Down Expand Up @@ -393,6 +393,7 @@ where
ChangeEvent::InsertAt { max_value, .. }
| ChangeEvent::RemoveAt { max_value, .. } => {
let page_id = &(max_value.key.clone(), max_value.value);

let page_index = self
.table_of_contents
.get(page_id)
Expand Down Expand Up @@ -463,6 +464,7 @@ where
split_index,
} => {
let page_id = &(max_value.key.clone(), max_value.value);

let page_index = self
.table_of_contents
.get(page_id)
Expand All @@ -471,7 +473,6 @@ where
let page_to_update = if let Some(page) = page {
page
} else {
// println!("Try to parse page: {:?} {:?}", page_index, page_id);
let page =
parse_page::<UnsizedIndexPage<T, INNER_PAGE_SIZE>, INNER_PAGE_SIZE>(
&mut self.index_file,
Expand Down
Loading
Loading