From 916e36220d8b299f259bac4f2efcadf38b9f111a Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Sun, 8 Feb 2026 15:30:01 +0300 Subject: [PATCH 1/5] some guard ideas --- .../src/worktable/generator/queries/delete.rs | 8 +- .../worktable/generator/queries/in_place.rs | 6 +- .../src/worktable/generator/queries/update.rs | 110 +++++++++-------- src/lock/mod.rs | 116 ++++++++++++++++++ src/lock/row_lock.rs | 7 +- src/table/vacuum/vacuum.rs | 6 +- 6 files changed, 191 insertions(+), 62 deletions(-) diff --git a/codegen/src/worktable/generator/queries/delete.rs b/codegen/src/worktable/generator/queries/delete.rs index b0a1c93..2d03278 100644 --- a/codegen/src/worktable/generator/queries/delete.rs +++ b/codegen/src/worktable/generator/queries/delete.rs @@ -44,13 +44,13 @@ impl Generator { where #pk_ident: From { let pk: #pk_ident = pk.into(); - let lock = { + let _guard = { #full_row_lock - }; + }.guard(); #delete_logic - lock.unlock(); // Releases locks + drop(_guard); self.0.lock_manager.remove_with_lock_check(&pk); // Removes locks core::result::Result::Ok(()) @@ -113,7 +113,7 @@ impl Generator { .ok_or(WorkTableError::NotFound) { Ok(l) => l, Err(e) => { - lock.unlock(); // Releases locks + drop(_guard); self.0.lock_manager.remove_with_lock_check(&pk); // Removes locks return Err(e); } diff --git a/codegen/src/worktable/generator/queries/in_place.rs b/codegen/src/worktable/generator/queries/in_place.rs index efe4ca3..4a2ce6f 100644 --- a/codegen/src/worktable/generator/queries/in_place.rs +++ b/codegen/src/worktable/generator/queries/in_place.rs @@ -109,9 +109,9 @@ impl Generator { where #pk_type: From { let pk: #pk_type = by.into(); - let lock = { + let _guard = { #custom_lock - }; + }.guard(); let link = self .0 .primary_index.pk_map @@ -125,7 +125,7 @@ impl Generator { .map_err(WorkTableError::PagesError)? }; - lock.unlock(); + drop(_guard); self.0.lock_manager.remove_with_lock_check(&pk); Ok(()) diff --git a/codegen/src/worktable/generator/queries/update.rs b/codegen/src/worktable/generator/queries/update.rs index 952fd8f..5402878 100644 --- a/codegen/src/worktable/generator/queries/update.rs +++ b/codegen/src/worktable/generator/queries/update.rs @@ -63,34 +63,33 @@ impl Generator { quote! {} } else { quote! { - if true { - lock.unlock(); // Releases locks - let lock = { - #full_row_lock - }; - let row_old = self.0.data.select_non_ghosted(link)?; - if let Err(e) = self.reinsert(row_old, row).await { - self.0.update_state.remove(&pk); - lock.unlock(); - - return Err(e); - } - + drop(_guard); + let _guard = { + #full_row_lock + }.guard(); + let row_old = self.0.data.select_non_ghosted(link)?; + if let Err(e) = self.reinsert(row_old, row).await { self.0.update_state.remove(&pk); - lock.unlock(); - self.0.lock_manager.remove_with_lock_check(&pk); // Removes locks + drop(_guard); + self.0.lock_manager.remove_with_lock_check(&pk); - return core::result::Result::Ok(()); + return Err(e); } + + self.0.update_state.remove(&pk); + drop(_guard); + self.0.lock_manager.remove_with_lock_check(&pk); // Removes locks + + return core::result::Result::Ok(()); } }; quote! { pub async fn update(&self, row: #row_ident) -> core::result::Result<(), WorkTableError> { let pk = row.get_primary_key(); - let lock = { + let _guard = { #full_row_lock - }; + }.guard(); let mut link: Link = match self.0 .primary_index @@ -101,7 +100,7 @@ impl Generator { { Ok(l) => l, Err(e) => { - lock.unlock(); + drop(_guard); self.0.lock_manager.remove_with_lock_check(&pk); return Err(e); @@ -128,7 +127,7 @@ impl Generator { self.0.update_state.remove(&pk); - lock.unlock(); // Releases locks + drop(_guard); self.0.lock_manager.remove_with_lock_check(&pk); // Removes locks #persist_call @@ -266,10 +265,11 @@ impl Generator { let mut need_to_reinsert = true; #(#fields_check)* if need_to_reinsert { - lock.unlock(); - let lock = { + // CRITICAL: Drop the outer guard first before creating a new one for reinsert + drop(_guard); + let _guard = { #full_row_lock - }; + }.guard(); let row_old = self.0.select(pk.clone()).expect("should not be deleted by other thread"); let mut row_new = row_old.clone(); @@ -277,12 +277,13 @@ impl Generator { #(#row_updates)* if let Err(e) = self.reinsert(row_old, row_new).await { self.0.update_state.remove(&pk); - lock.unlock(); + drop(_guard); + self.0.lock_manager.remove_with_lock_check(&pk); return Err(e); } - lock.unlock(); // Releases locks + drop(_guard); self.0.lock_manager.remove_with_lock_check(&pk); // Removes locks return core::result::Result::Ok(()); @@ -475,9 +476,9 @@ impl Generator { where #pk_ident: From { let pk = pk.into(); - let lock = { + let _guard = { #custom_lock - }; + }.guard(); let mut link: Link = match self.0 .primary_index @@ -487,7 +488,7 @@ impl Generator { .ok_or(WorkTableError::NotFound) { Ok(l) => l, Err(e) => { - lock.unlock(); + drop(_guard); self.0.lock_manager.remove_with_lock_check(&pk); return Err(e); @@ -508,7 +509,7 @@ impl Generator { #diff_process_remove - lock.unlock(); + drop(_guard); self.0.lock_manager.remove_with_lock_check(&pk); #persist_call @@ -545,6 +546,7 @@ impl Generator { }) .collect::>(); + let custom_lock_for_size_check = self.gen_custom_lock_for_update(lock_ident.clone()); let size_check = if let Some(f) = unsized_fields { let fields_check: Vec<_> = f .iter() @@ -569,27 +571,31 @@ impl Generator { let mut need_to_reinsert = true; #(#fields_check)* if need_to_reinsert { - let op_lock = locks.remove(&pk).expect("should not be deleted as links are unique"); - op_lock.unlock(); - let lock = { + // CRITICAL: First remove and drop the existing guard to unlock before acquiring full row lock + let _old_guard = guards.remove(&pk).expect("guard should exist for this pk"); + drop(_old_guard); + + let _guard = { #full_row_lock - }; + }.guard(); let row_old = self.0.select(pk.clone()).expect("should not be deleted by other thread"); let mut row_new = row_old.clone(); #(#row_updates)* if let Err(e) = self.reinsert(row_old, row_new).await { self.0.update_state.remove(&pk); - lock.unlock(); - + drop(_guard); + self.0.lock_manager.remove_with_lock_check(&pk); return Err(e); } - lock.unlock(); // Releases locks - self.0.lock_manager.remove_with_lock_check(&pk); // Removes locks - + drop(_guard); + self.0.lock_manager.remove_with_lock_check(&pk); + // Re-acquire a new guard and insert back for the final unlock loop + let new_guard = { + #custom_lock_for_size_check + }.guard(); + guards.insert(pk, new_guard); continue; - } else { - pk_to_unlock.insert(pk.clone(), locks.remove(&pk).expect("should not be deleted as links are unique")); } } } else { @@ -614,17 +620,17 @@ impl Generator { pub async fn #method_ident(&self, row: #query_ident, by: #by_ident) -> core::result::Result<(), WorkTableError> { let links: Vec<_> = self.0.indexes.#index.get(#by).map(|(_, l)| l.0).collect(); - let mut locks = std::collections::HashMap::new(); + // Acquire ALL locks as guards upfront to prevent deadlock + let mut guards: std::collections::HashMap<_, worktable::lock::LockGuard> = std::collections::HashMap::new(); for link in links.iter() { let pk = self.0.data.select_non_ghosted(*link)?.get_primary_key().clone(); - let op_lock = { + let lock = { #custom_lock }; - locks.insert(pk, op_lock); + guards.insert(pk, lock.guard()); } let links: Vec<_> = self.0.indexes.#index.get(#by).map(|(_, l)| l.0).collect(); - let mut pk_to_unlock: std::collections::HashMap<_, std::sync::Arc> = std::collections::HashMap::new(); let op_id = OperationId::Multi(uuid::Uuid::now_v7()); for link in links.into_iter() { let pk = self.0.data.select_non_ghosted(link)?.get_primary_key().clone(); @@ -649,11 +655,13 @@ impl Generator { #diff_process_remove #persist_call - } - for (pk, lock) in pk_to_unlock { - lock.unlock(); + + // Remove guard, drop it to unlock, then remove from LockMap + let _guard = guards.remove(&pk).expect("guard should exist for this pk"); + drop(_guard); self.0.lock_manager.remove_with_lock_check(&pk); } + // All remaining guards (error paths) dropped automatically here core::result::Result::Ok(()) } } @@ -719,9 +727,9 @@ impl Generator { let pk = self.0.data.select_non_ghosted(link)?.get_primary_key().clone(); - let lock = { + let _guard = { #custom_lock - }; + }.guard(); let link = loop { let link = match self.0.indexes.#index @@ -730,7 +738,7 @@ impl Generator { .ok_or(WorkTableError::NotFound) { Ok(l) => l, Err(e) => { - lock.unlock(); + drop(_guard); self.0.lock_manager.remove_with_lock_check(&pk); return Err(e); @@ -760,7 +768,7 @@ impl Generator { #diff_process_remove - lock.unlock(); + drop(_guard); self.0.lock_manager.remove_with_lock_check(&pk); #persist_call diff --git a/src/lock/mod.rs b/src/lock/mod.rs index 7dbe0d0..1f9ff6a 100644 --- a/src/lock/mod.rs +++ b/src/lock/mod.rs @@ -1,8 +1,10 @@ mod map; mod row_lock; +use std::cell::Cell; use std::future::Future; use std::hash::{Hash, Hasher}; +use std::marker::PhantomData; use std::pin::Pin; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; @@ -14,6 +16,33 @@ use parking_lot::Mutex; pub use map::LockMap; pub use row_lock::{FullRowLock, RowLock}; +/// RAII guard that automatically unlocks a [`Lock`] when dropped. +/// +/// The [`Lock`] is automatically released when the guard is dropped, or can be +/// explicitly released early using the `unlock()` method. +pub struct LockGuard { + lock: Option>, + /// Marker to make this type `!Sync` (but still `Send`) + _not_sync: PhantomData>, +} + +impl LockGuard { + /// Explicitly unlocks the lock before the guard is dropped. + pub fn unlock(mut self) { + if let Some(lock) = self.lock.take() { + lock.unlock(); + } + } +} + +impl Drop for LockGuard { + fn drop(&mut self) { + if let Some(lock) = &self.lock { + lock.unlock(); + } + } +} + #[derive(Debug)] pub struct Lock { id: u16, @@ -79,6 +108,15 @@ impl Lock { waker, } } + + /// Creates a [`LockGuard`] that will automatically unlock this lock when + /// dropped. + pub fn guard(self: Arc) -> LockGuard { + LockGuard { + lock: Some(self), + _not_sync: PhantomData, + } + } } #[derive(Debug)] @@ -104,3 +142,81 @@ impl Future for LockWait { } } } + +#[cfg(test)] +mod tests { + use super::*; + use std::panic::AssertUnwindSafe; + + #[test] + fn test_unlock_on_drop() { + let lock = Arc::new(Lock::new(1)); + assert!(lock.is_locked()); + + { + let _guard = lock.clone().guard(); + assert!(lock.is_locked()); + } + + assert!(!lock.is_locked()); + } + + #[test] + fn test_explicit_unlock() { + let lock = Arc::new(Lock::new(1)); + assert!(lock.is_locked()); + + let guard = lock.clone().guard(); + assert!(lock.is_locked()); + + guard.unlock(); + + assert!(!lock.is_locked()); + } + + #[test] + fn test_panic_releases_lock() { + let lock = Arc::new(Lock::new(1)); + assert!(lock.is_locked()); + + let result = std::panic::catch_unwind(AssertUnwindSafe(|| { + let _guard = lock.clone().guard(); + panic!("test panic"); + })); + + assert!(result.is_err()); + + assert!(!lock.is_locked()); + } + + #[test] + fn test_multiple_guards_can_be_held() { + let lock1 = Arc::new(Lock::new(1)); + let lock2 = Arc::new(Lock::new(2)); + let lock3 = Arc::new(Lock::new(3)); + + assert!(lock1.is_locked()); + assert!(lock2.is_locked()); + assert!(lock3.is_locked()); + + { + let _guard1 = lock1.clone().guard(); + let _guard2 = lock2.clone().guard(); + let _guard3 = lock3.clone().guard(); + + assert!(lock1.is_locked()); + assert!(lock2.is_locked()); + assert!(lock3.is_locked()); + } + + assert!(!lock1.is_locked()); + assert!(!lock2.is_locked()); + assert!(!lock3.is_locked()); + } + + #[test] + fn test_guard_is_send() { + fn assert_send() {} + assert_send::(); + } +} diff --git a/src/lock/row_lock.rs b/src/lock/row_lock.rs index 7b75ba4..df63318 100644 --- a/src/lock/row_lock.rs +++ b/src/lock/row_lock.rs @@ -1,4 +1,4 @@ -use crate::lock::{Lock, LockWait}; +use crate::lock::{Lock, LockGuard, LockWait}; use std::collections::HashSet; use std::sync::Arc; @@ -32,6 +32,11 @@ impl FullRowLock { self.l.unlock(); } + /// Creates a [`LockGuard`] that will automatically unlock this lock when dropped. + pub fn guard(self) -> LockGuard { + self.l.guard() + } + pub fn wait(&self) -> LockWait { self.l.wait() } diff --git a/src/table/vacuum/vacuum.rs b/src/table/vacuum/vacuum.rs index 1663885..082b6b2 100644 --- a/src/table/vacuum/vacuum.rs +++ b/src/table/vacuum/vacuum.rs @@ -247,13 +247,13 @@ where drop(range); for (from_link, pk) in links { - let lock = self.full_row_lock(&pk).await; + let _guard = self.full_row_lock(&pk).await.guard(); if self .data_pages .with_ref(from_link.0, |r| r.is_deleted()) .expect("link should be valid") { - lock.unlock(); + drop(_guard); self.lock_manager.remove_with_lock_check(&pk); continue; } @@ -270,7 +270,7 @@ where .expect("page is not full as checked on links collection"); self.update_index_after_move(pk.clone(), from_link.0, new_link); self.lock_manager.remove_with_lock_check(&pk); - lock.unlock(); + drop(_guard); } (from_page_will_be_moved, to_page_will_be_filled) From 974b53ffa811de266f2ff39edeb86d44e3bf475b Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Sun, 8 Feb 2026 17:07:37 +0300 Subject: [PATCH 2/5] WIP --- .../src/worktable/generator/queries/delete.rs | 14 +- .../worktable/generator/queries/in_place.rs | 12 +- .../src/worktable/generator/queries/update.rs | 146 ++++++------------ src/lib.rs | 3 +- src/lock/mod.rs | 105 +++++++++---- src/lock/row_lock.rs | 16 +- src/table/vacuum/vacuum.rs | 10 +- 7 files changed, 161 insertions(+), 145 deletions(-) diff --git a/codegen/src/worktable/generator/queries/delete.rs b/codegen/src/worktable/generator/queries/delete.rs index 2d03278..e27fdc4 100644 --- a/codegen/src/worktable/generator/queries/delete.rs +++ b/codegen/src/worktable/generator/queries/delete.rs @@ -44,15 +44,15 @@ impl Generator { where #pk_ident: From { let pk: #pk_ident = pk.into(); - let _guard = { - #full_row_lock - }.guard(); + let __op_lock = { #full_row_lock }; + let _guard = LockGuard::new( + __op_lock, + self.0.lock_manager.clone(), + pk.clone(), + ); #delete_logic - drop(_guard); - self.0.lock_manager.remove_with_lock_check(&pk); // Removes locks - core::result::Result::Ok(()) } } @@ -113,8 +113,6 @@ impl Generator { .ok_or(WorkTableError::NotFound) { Ok(l) => l, Err(e) => { - drop(_guard); - self.0.lock_manager.remove_with_lock_check(&pk); // Removes locks return Err(e); } }; diff --git a/codegen/src/worktable/generator/queries/in_place.rs b/codegen/src/worktable/generator/queries/in_place.rs index 4a2ce6f..5b734ee 100644 --- a/codegen/src/worktable/generator/queries/in_place.rs +++ b/codegen/src/worktable/generator/queries/in_place.rs @@ -109,9 +109,12 @@ impl Generator { where #pk_type: From { let pk: #pk_type = by.into(); - let _guard = { - #custom_lock - }.guard(); + let __op_lock = { #custom_lock }; + let _guard = LockGuard::new( + __op_lock, + self.0.lock_manager.clone(), + pk.clone(), + ); let link = self .0 .primary_index.pk_map @@ -125,9 +128,6 @@ impl Generator { .map_err(WorkTableError::PagesError)? }; - drop(_guard); - self.0.lock_manager.remove_with_lock_check(&pk); - Ok(()) } } diff --git a/codegen/src/worktable/generator/queries/update.rs b/codegen/src/worktable/generator/queries/update.rs index 5402878..360b6f7 100644 --- a/codegen/src/worktable/generator/queries/update.rs +++ b/codegen/src/worktable/generator/queries/update.rs @@ -64,21 +64,20 @@ impl Generator { } else { quote! { drop(_guard); - let _guard = { - #full_row_lock - }.guard(); + let op_lock = { #full_row_lock }; + let _guard = LockGuard::new( + op_lock, + self.0.lock_manager.clone(), + pk.clone(), + ); let row_old = self.0.data.select_non_ghosted(link)?; if let Err(e) = self.reinsert(row_old, row).await { self.0.update_state.remove(&pk); - drop(_guard); - self.0.lock_manager.remove_with_lock_check(&pk); return Err(e); } self.0.update_state.remove(&pk); - drop(_guard); - self.0.lock_manager.remove_with_lock_check(&pk); // Removes locks return core::result::Result::Ok(()); } @@ -87,25 +86,19 @@ impl Generator { quote! { pub async fn update(&self, row: #row_ident) -> core::result::Result<(), WorkTableError> { let pk = row.get_primary_key(); - let _guard = { - #full_row_lock - }.guard(); - - let mut link: Link = match self.0 + let op_lock = { #full_row_lock }; + let _guard = LockGuard::new( + op_lock, + self.0.lock_manager.clone(), + pk.clone(), + ); + + let mut link: Link = self.0 .primary_index .pk_map .get(&pk) .map(|v| v.get().value.into()) - .ok_or(WorkTableError::NotFound) - { - Ok(l) => l, - Err(e) => { - drop(_guard); - self.0.lock_manager.remove_with_lock_check(&pk); - - return Err(e); - } - }; + .ok_or(WorkTableError::NotFound)?; let row_old = self.0.data.select_non_ghosted(link)?; self.0.update_state.insert(pk.clone(), row_old); @@ -127,9 +120,6 @@ impl Generator { self.0.update_state.remove(&pk); - drop(_guard); - self.0.lock_manager.remove_with_lock_check(&pk); // Removes locks - #persist_call core::result::Result::Ok(()) @@ -265,27 +255,23 @@ impl Generator { let mut need_to_reinsert = true; #(#fields_check)* if need_to_reinsert { - // CRITICAL: Drop the outer guard first before creating a new one for reinsert drop(_guard); - let _guard = { - #full_row_lock - }.guard(); + let op_lock = { #full_row_lock }; + let _guard = LockGuard::new( + op_lock, + self.0.lock_manager.clone(), + pk.clone(), + ); let row_old = self.0.select(pk.clone()).expect("should not be deleted by other thread"); let mut row_new = row_old.clone(); - let pk = row_old.get_primary_key().clone(); #(#row_updates)* if let Err(e) = self.reinsert(row_old, row_new).await { self.0.update_state.remove(&pk); - drop(_guard); - self.0.lock_manager.remove_with_lock_check(&pk); return Err(e); } - drop(_guard); - self.0.lock_manager.remove_with_lock_check(&pk); // Removes locks - return core::result::Result::Ok(()); } } @@ -476,24 +462,19 @@ impl Generator { where #pk_ident: From { let pk = pk.into(); - let _guard = { - #custom_lock - }.guard(); - - let mut link: Link = match self.0 + let op_lock = { #custom_lock }; + let _guard = LockGuard::new( + op_lock, + self.0.lock_manager.clone(), + pk.clone(), + ); + + let mut link: Link = self.0 .primary_index .pk_map .get(&pk) .map(|v| v.get().value.into()) - .ok_or(WorkTableError::NotFound) { - Ok(l) => l, - Err(e) => { - drop(_guard); - self.0.lock_manager.remove_with_lock_check(&pk); - - return Err(e); - } - }; + .ok_or(WorkTableError::NotFound)?; let mut bytes = rkyv::to_bytes::(&row).map_err(|_| WorkTableError::SerializeError)?; let mut archived_row = unsafe { rkyv::access_unchecked_mut::<<#query_ident as rkyv::Archive>::Archived>(&mut bytes[..]).unseal_unchecked() }; @@ -509,9 +490,6 @@ impl Generator { #diff_process_remove - drop(_guard); - self.0.lock_manager.remove_with_lock_check(&pk); - #persist_call core::result::Result::Ok(()) @@ -571,30 +549,23 @@ impl Generator { let mut need_to_reinsert = true; #(#fields_check)* if need_to_reinsert { - // CRITICAL: First remove and drop the existing guard to unlock before acquiring full row lock - let _old_guard = guards.remove(&pk).expect("guard should exist for this pk"); - drop(_old_guard); - - let _guard = { - #full_row_lock - }.guard(); + let old_guard = guards.remove(&pk).expect("guard should exist for this pk"); + drop(old_guard); + + let op_lock = { #full_row_lock }; + let _guard = LockGuard::new( + op_lock, + self.0.lock_manager.clone(), + pk.clone(), + ); let row_old = self.0.select(pk.clone()).expect("should not be deleted by other thread"); let mut row_new = row_old.clone(); #(#row_updates)* if let Err(e) = self.reinsert(row_old, row_new).await { self.0.update_state.remove(&pk); - drop(_guard); - self.0.lock_manager.remove_with_lock_check(&pk); return Err(e); } - drop(_guard); - self.0.lock_manager.remove_with_lock_check(&pk); - // Re-acquire a new guard and insert back for the final unlock loop - let new_guard = { - #custom_lock_for_size_check - }.guard(); - guards.insert(pk, new_guard); continue; } } @@ -620,14 +591,11 @@ impl Generator { pub async fn #method_ident(&self, row: #query_ident, by: #by_ident) -> core::result::Result<(), WorkTableError> { let links: Vec<_> = self.0.indexes.#index.get(#by).map(|(_, l)| l.0).collect(); - // Acquire ALL locks as guards upfront to prevent deadlock - let mut guards: std::collections::HashMap<_, worktable::lock::LockGuard> = std::collections::HashMap::new(); + let mut guards: std::collections::HashMap<_, _> = std::collections::HashMap::new(); for link in links.iter() { let pk = self.0.data.select_non_ghosted(*link)?.get_primary_key().clone(); - let lock = { - #custom_lock - }; - guards.insert(pk, lock.guard()); + let __op_lock = { #custom_lock }; + guards.insert(pk, LockGuard::new(__op_lock, self.0.lock_manager.clone(), pk.clone())); } let links: Vec<_> = self.0.indexes.#index.get(#by).map(|(_, l)| l.0).collect(); @@ -656,12 +624,8 @@ impl Generator { #persist_call - // Remove guard, drop it to unlock, then remove from LockMap - let _guard = guards.remove(&pk).expect("guard should exist for this pk"); - drop(_guard); - self.0.lock_manager.remove_with_lock_check(&pk); + guards.remove(&pk); } - // All remaining guards (error paths) dropped automatically here core::result::Result::Ok(()) } } @@ -727,23 +691,18 @@ impl Generator { let pk = self.0.data.select_non_ghosted(link)?.get_primary_key().clone(); - let _guard = { - #custom_lock - }.guard(); + let op_lock = { #custom_lock }; + let _guard = LockGuard::new( + op_lock, + self.0.lock_manager.clone(), + pk.clone(), + ); let link = loop { - let link = match self.0.indexes.#index + let link = self.0.indexes.#index .get(#by) .map(|v| v.get().value.into()) - .ok_or(WorkTableError::NotFound) { - Ok(l) => l, - Err(e) => { - drop(_guard); - self.0.lock_manager.remove_with_lock_check(&pk); - - return Err(e); - } - }; + .ok_or(WorkTableError::NotFound)?; if let Err(e) = self.0.data.select_non_vacuumed(link) { if e.is_vacuumed() { @@ -768,9 +727,6 @@ impl Generator { #diff_process_remove - drop(_guard); - self.0.lock_manager.remove_with_lock_check(&pk); - #persist_call core::result::Result::Ok(()) diff --git a/src/lib.rs b/src/lib.rs index 9e2648b..16a19cf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,8 +19,9 @@ pub mod prelude { pub use crate::in_memory::{ ArchivedRowWrapper, Data, DataPages, Query, RowWrapper, StorableRow, }; - pub use crate::lock::LockMap; + pub use crate::lock::{LockGuard, LockMap}; pub use crate::lock::{Lock, RowLock}; + pub use crate::lock::FullRowLock; pub use crate::mem_stat::MemStat; pub use crate::persistence::{ DeleteOperation, IndexTableOfContents, InsertOperation, Operation, OperationId, diff --git a/src/lock/mod.rs b/src/lock/mod.rs index 1f9ff6a..6951744 100644 --- a/src/lock/mod.rs +++ b/src/lock/mod.rs @@ -2,6 +2,7 @@ mod map; mod row_lock; use std::cell::Cell; +use std::fmt::Debug; use std::future::Future; use std::hash::{Hash, Hasher}; use std::marker::PhantomData; @@ -20,26 +21,52 @@ pub use row_lock::{FullRowLock, RowLock}; /// /// The [`Lock`] is automatically released when the guard is dropped, or can be /// explicitly released early using the `unlock()` method. -pub struct LockGuard { - lock: Option>, +/// +/// The guard will also attempt to remove the lock entry from the map on drop +/// (preventing memory leaks). +pub struct LockGuard { + lock: Arc, + lock_map: Arc>, + primary_key: PrimaryKey, /// Marker to make this type `!Sync` (but still `Send`) _not_sync: PhantomData>, } -impl LockGuard { - /// Explicitly unlocks the lock before the guard is dropped. - pub fn unlock(mut self) { - if let Some(lock) = self.lock.take() { - lock.unlock(); +impl LockGuard +where + LockType: RowLock, + PrimaryKey: Hash + Eq + Debug + Clone, +{ + /// Creates a new guard that will clean up the lock entry from the map on drop. + pub fn new( + lock: Arc, + lock_map: Arc>, + primary_key: PrimaryKey, + ) -> Self { + Self { + lock, + lock_map, + primary_key, + _not_sync: PhantomData, } } + + /// Explicitly unlocks the lock before the guard is dropped. + pub fn unlock(self) { + self.lock.unlock(); + self.lock_map.remove_with_lock_check(&self.primary_key); + } } -impl Drop for LockGuard { +impl Drop for LockGuard +where + LockType: RowLock, + PrimaryKey: Hash + Eq + Debug + Clone, +{ fn drop(&mut self) { - if let Some(lock) = &self.lock { - lock.unlock(); - } + self.lock.unlock(); + // Remove from lock map after unlocking + self.lock_map.remove_with_lock_check(&self.primary_key); } } @@ -108,15 +135,6 @@ impl Lock { waker, } } - - /// Creates a [`LockGuard`] that will automatically unlock this lock when - /// dropped. - pub fn guard(self: Arc) -> LockGuard { - LockGuard { - lock: Some(self), - _not_sync: PhantomData, - } - } } #[derive(Debug)] @@ -151,10 +169,12 @@ mod tests { #[test] fn test_unlock_on_drop() { let lock = Arc::new(Lock::new(1)); + let lock_map: Arc> = Arc::new(LockMap::default()); + let pk = 1u64; assert!(lock.is_locked()); { - let _guard = lock.clone().guard(); + let _guard = LockGuard::::new(lock.clone(), lock_map.clone(), pk); assert!(lock.is_locked()); } @@ -164,9 +184,11 @@ mod tests { #[test] fn test_explicit_unlock() { let lock = Arc::new(Lock::new(1)); + let lock_map: Arc> = Arc::new(LockMap::default()); + let pk = 1u64; assert!(lock.is_locked()); - let guard = lock.clone().guard(); + let guard = LockGuard::::new(lock.clone(), lock_map.clone(), pk); assert!(lock.is_locked()); guard.unlock(); @@ -177,10 +199,12 @@ mod tests { #[test] fn test_panic_releases_lock() { let lock = Arc::new(Lock::new(1)); + let lock_map: Arc> = Arc::new(LockMap::default()); + let pk = 1u64; assert!(lock.is_locked()); let result = std::panic::catch_unwind(AssertUnwindSafe(|| { - let _guard = lock.clone().guard(); + let _guard = LockGuard::::new(lock.clone(), lock_map.clone(), pk); panic!("test panic"); })); @@ -194,15 +218,16 @@ mod tests { let lock1 = Arc::new(Lock::new(1)); let lock2 = Arc::new(Lock::new(2)); let lock3 = Arc::new(Lock::new(3)); + let lock_map: Arc> = Arc::new(LockMap::default()); assert!(lock1.is_locked()); assert!(lock2.is_locked()); assert!(lock3.is_locked()); { - let _guard1 = lock1.clone().guard(); - let _guard2 = lock2.clone().guard(); - let _guard3 = lock3.clone().guard(); + let _guard1 = LockGuard::::new(lock1.clone(), lock_map.clone(), 1u64); + let _guard2 = LockGuard::::new(lock2.clone(), lock_map.clone(), 2u64); + let _guard3 = LockGuard::::new(lock3.clone(), lock_map.clone(), 3u64); assert!(lock1.is_locked()); assert!(lock2.is_locked()); @@ -217,6 +242,32 @@ mod tests { #[test] fn test_guard_is_send() { fn assert_send() {} - assert_send::(); + // LockGuard is Send if LockType and PrimaryKey are Send + assert_send::>(); + } + + #[tokio::test] + async fn test_lock_cleanup_on_guard_drop() { + use crate::lock::FullRowLock; + use crate::lock::RowLock; + + let lock_map: Arc> = Arc::new(LockMap::default()); + let pk = 42u64; + + // Create and insert a lock + let (lock_type, lock) = FullRowLock::with_lock(lock_map.next_id()); + let rw_lock = Arc::new(tokio::sync::RwLock::new(lock_type)); + lock_map.insert(pk, rw_lock); + + // Verify the lock is in the map + assert!(lock_map.get(&pk).is_some()); + + // Create a guard and drop it + { + let _guard = LockGuard::new(lock, lock_map.clone(), pk); + } + + // Verify the lock entry was removed from the map + assert!(lock_map.get(&pk).is_none()); } } diff --git a/src/lock/row_lock.rs b/src/lock/row_lock.rs index df63318..e874373 100644 --- a/src/lock/row_lock.rs +++ b/src/lock/row_lock.rs @@ -1,7 +1,10 @@ -use crate::lock::{Lock, LockGuard, LockWait}; use std::collections::HashSet; +use std::fmt::Debug; +use std::hash::Hash; use std::sync::Arc; +use crate::lock::{Lock, LockGuard, LockMap, LockWait}; + pub trait RowLock { /// Checks if any column of this row is locked. fn is_locked(&self) -> bool; @@ -32,9 +35,14 @@ impl FullRowLock { self.l.unlock(); } - /// Creates a [`LockGuard`] that will automatically unlock this lock when dropped. - pub fn guard(self) -> LockGuard { - self.l.guard() + /// Creates a [`LockGuard`] that will automatically unlock this lock when + /// dropped. + pub fn guard( + self, + lock_map: Arc>, + primary_key: PrimaryKey, + ) -> LockGuard { + LockGuard::new(self.l, lock_map, primary_key) } pub fn wait(&self) -> LockWait { diff --git a/src/table/vacuum/vacuum.rs b/src/table/vacuum/vacuum.rs index 082b6b2..e93291a 100644 --- a/src/table/vacuum/vacuum.rs +++ b/src/table/vacuum/vacuum.rs @@ -16,7 +16,8 @@ use rkyv::util::AlignedVec; use rkyv::{Archive, Deserialize, Serialize}; use crate::in_memory::{ArchivedRowWrapper, DataPages, RowWrapper, StorableRow}; -use crate::prelude::{Lock, LockMap, OffsetEqLink, RowLock, TablePrimaryKey}; +use crate::lock::{Lock, LockMap, RowLock}; +use crate::prelude::{OffsetEqLink, TablePrimaryKey}; use crate::vacuum::VacuumStats; use crate::vacuum::WorkTableVacuum; use crate::vacuum::fragmentation_info::FragmentationInfo; @@ -247,13 +248,13 @@ where drop(range); for (from_link, pk) in links { - let _guard = self.full_row_lock(&pk).await.guard(); + let lock = self.full_row_lock(&pk).await; if self .data_pages .with_ref(from_link.0, |r| r.is_deleted()) .expect("link should be valid") { - drop(_guard); + lock.unlock(); self.lock_manager.remove_with_lock_check(&pk); continue; } @@ -269,8 +270,9 @@ where .save_raw_row(&raw_data) .expect("page is not full as checked on links collection"); self.update_index_after_move(pk.clone(), from_link.0, new_link); + + lock.unlock(); self.lock_manager.remove_with_lock_check(&pk); - drop(_guard); } (from_page_will_be_moved, to_page_will_be_filled) From 0526f269d31d210056d7b574deabc6eb9c1bfdb1 Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Mon, 9 Feb 2026 10:49:45 +0300 Subject: [PATCH 3/5] correction --- .../src/worktable/generator/queries/update.rs | 33 ++++++++++--------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/codegen/src/worktable/generator/queries/update.rs b/codegen/src/worktable/generator/queries/update.rs index 360b6f7..f5621ad 100644 --- a/codegen/src/worktable/generator/queries/update.rs +++ b/codegen/src/worktable/generator/queries/update.rs @@ -63,23 +63,25 @@ impl Generator { quote! {} } else { quote! { - drop(_guard); - let op_lock = { #full_row_lock }; - let _guard = LockGuard::new( - op_lock, - self.0.lock_manager.clone(), - pk.clone(), - ); - let row_old = self.0.data.select_non_ghosted(link)?; - if let Err(e) = self.reinsert(row_old, row).await { - self.0.update_state.remove(&pk); + if true { + drop(_guard); + let op_lock = { #full_row_lock }; + let _guard = LockGuard::new( + op_lock, + self.0.lock_manager.clone(), + pk.clone(), + ); + let row_old = self.0.data.select_non_ghosted(link)?; + if let Err(e) = self.reinsert(row_old, row).await { + self.0.update_state.remove(&pk); - return Err(e); - } + return Err(e); + } - self.0.update_state.remove(&pk); + self.0.update_state.remove(&pk); - return core::result::Result::Ok(()); + return core::result::Result::Ok(()); + } } }; @@ -524,7 +526,6 @@ impl Generator { }) .collect::>(); - let custom_lock_for_size_check = self.gen_custom_lock_for_update(lock_ident.clone()); let size_check = if let Some(f) = unsized_fields { let fields_check: Vec<_> = f .iter() @@ -595,7 +596,7 @@ impl Generator { for link in links.iter() { let pk = self.0.data.select_non_ghosted(*link)?.get_primary_key().clone(); let __op_lock = { #custom_lock }; - guards.insert(pk, LockGuard::new(__op_lock, self.0.lock_manager.clone(), pk.clone())); + guards.insert(pk.clone(), LockGuard::new(__op_lock, self.0.lock_manager.clone(), pk.clone())); } let links: Vec<_> = self.0.indexes.#index.get(#by).map(|(_, l)| l.0).collect(); From a02fe80083fea402a244a3870f0336812c436883 Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Mon, 9 Feb 2026 10:59:37 +0300 Subject: [PATCH 4/5] corrections --- codegen/src/worktable/generator/queries/delete.rs | 4 ++-- codegen/src/worktable/generator/queries/in_place.rs | 4 ++-- codegen/src/worktable/generator/queries/update.rs | 4 ++-- src/lib.rs | 4 ++-- src/lock/mod.rs | 13 +++++++------ 5 files changed, 15 insertions(+), 14 deletions(-) diff --git a/codegen/src/worktable/generator/queries/delete.rs b/codegen/src/worktable/generator/queries/delete.rs index e27fdc4..4b2f794 100644 --- a/codegen/src/worktable/generator/queries/delete.rs +++ b/codegen/src/worktable/generator/queries/delete.rs @@ -44,9 +44,9 @@ impl Generator { where #pk_ident: From { let pk: #pk_ident = pk.into(); - let __op_lock = { #full_row_lock }; + let op_lock = { #full_row_lock }; let _guard = LockGuard::new( - __op_lock, + op_lock, self.0.lock_manager.clone(), pk.clone(), ); diff --git a/codegen/src/worktable/generator/queries/in_place.rs b/codegen/src/worktable/generator/queries/in_place.rs index 5b734ee..a3aa6ea 100644 --- a/codegen/src/worktable/generator/queries/in_place.rs +++ b/codegen/src/worktable/generator/queries/in_place.rs @@ -109,9 +109,9 @@ impl Generator { where #pk_type: From { let pk: #pk_type = by.into(); - let __op_lock = { #custom_lock }; + let op_lock = { #custom_lock }; let _guard = LockGuard::new( - __op_lock, + op_lock, self.0.lock_manager.clone(), pk.clone(), ); diff --git a/codegen/src/worktable/generator/queries/update.rs b/codegen/src/worktable/generator/queries/update.rs index f5621ad..fa4161f 100644 --- a/codegen/src/worktable/generator/queries/update.rs +++ b/codegen/src/worktable/generator/queries/update.rs @@ -595,8 +595,8 @@ impl Generator { let mut guards: std::collections::HashMap<_, _> = std::collections::HashMap::new(); for link in links.iter() { let pk = self.0.data.select_non_ghosted(*link)?.get_primary_key().clone(); - let __op_lock = { #custom_lock }; - guards.insert(pk.clone(), LockGuard::new(__op_lock, self.0.lock_manager.clone(), pk.clone())); + let op_lock = { #custom_lock }; + guards.insert(pk.clone(), LockGuard::new(op_lock, self.0.lock_manager.clone(), pk.clone())); } let links: Vec<_> = self.0.indexes.#index.get(#by).map(|(_, l)| l.0).collect(); diff --git a/src/lib.rs b/src/lib.rs index 16a19cf..230b259 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,9 +19,9 @@ pub mod prelude { pub use crate::in_memory::{ ArchivedRowWrapper, Data, DataPages, Query, RowWrapper, StorableRow, }; - pub use crate::lock::{LockGuard, LockMap}; - pub use crate::lock::{Lock, RowLock}; pub use crate::lock::FullRowLock; + pub use crate::lock::{Lock, RowLock}; + pub use crate::lock::{LockGuard, LockMap}; pub use crate::mem_stat::MemStat; pub use crate::persistence::{ DeleteOperation, IndexTableOfContents, InsertOperation, Operation, OperationId, diff --git a/src/lock/mod.rs b/src/lock/mod.rs index 6951744..7cee60b 100644 --- a/src/lock/mod.rs +++ b/src/lock/mod.rs @@ -19,8 +19,9 @@ pub use row_lock::{FullRowLock, RowLock}; /// RAII guard that automatically unlocks a [`Lock`] when dropped. /// -/// The [`Lock`] is automatically released when the guard is dropped, or can be -/// explicitly released early using the `unlock()` method. +/// The [`Lock`] is automatically released when the [`LockGuard`] is +/// [`Drop`]ped, or can be explicitly released early using the `unlock()` +/// method. /// /// The guard will also attempt to remove the lock entry from the map on drop /// (preventing memory leaks). @@ -28,7 +29,7 @@ pub struct LockGuard { lock: Arc, lock_map: Arc>, primary_key: PrimaryKey, - /// Marker to make this type `!Sync` (but still `Send`) + /// Marker to make this type ![`Sync`] (but still [`Send`]) _not_sync: PhantomData>, } @@ -37,7 +38,8 @@ where LockType: RowLock, PrimaryKey: Hash + Eq + Debug + Clone, { - /// Creates a new guard that will clean up the lock entry from the map on drop. + /// Creates a new [`LockGuard`] that will clean up the [`Lock`] entry from + /// the [`LockMap`] on [`Drop`]. pub fn new( lock: Arc, lock_map: Arc>, @@ -51,7 +53,7 @@ where } } - /// Explicitly unlocks the lock before the guard is dropped. + /// Explicitly unlocks the [`Lock`] before the [`LockGuard`] is [`Drop`]ped. pub fn unlock(self) { self.lock.unlock(); self.lock_map.remove_with_lock_check(&self.primary_key); @@ -65,7 +67,6 @@ where { fn drop(&mut self) { self.lock.unlock(); - // Remove from lock map after unlocking self.lock_map.remove_with_lock_check(&self.primary_key); } } From 9515b2f282859e8225b4b899598a95aa475ddc91 Mon Sep 17 00:00:00 2001 From: Handy-caT <37216852+Handy-caT@users.noreply.github.com> Date: Mon, 9 Feb 2026 11:01:40 +0300 Subject: [PATCH 5/5] bump --- Cargo.toml | 4 ++-- codegen/Cargo.toml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 48fd4ca..08c22d3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,7 +3,7 @@ members = ["codegen", "examples", "performance_measurement", "performance_measur [package] name = "worktable" -version = "0.8.22" +version = "0.8.23" edition = "2024" authors = ["Handy-caT"] license = "MIT" @@ -16,7 +16,7 @@ perf_measurements = ["dep:performance_measurement", "dep:performance_measurement # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -worktable_codegen = { path = "codegen", version = "=0.8.22" } +worktable_codegen = { path = "codegen", version = "=0.8.23" } async-trait = "0.1.89" eyre = "0.6.12" diff --git a/codegen/Cargo.toml b/codegen/Cargo.toml index 5c05d55..ec2d99f 100644 --- a/codegen/Cargo.toml +++ b/codegen/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "worktable_codegen" -version = "0.8.22" +version = "0.8.23" edition = "2024" license = "MIT" description = "WorkTable codegeneration crate"