diff --git a/storage/src/compaction/compact.rs b/storage/src/compaction/compact.rs index da4cd5b..443cab8 100644 --- a/storage/src/compaction/compact.rs +++ b/storage/src/compaction/compact.rs @@ -111,6 +111,7 @@ mod tests { block_size: 32, db_dir: db_dir.clone(), num_memtable_limit: 5, + enable_wal: true, }; let storage = new(config); @@ -139,11 +140,13 @@ mod tests { #[test] fn scans_through_memtables_l0_l1_sstables() { let db_dir = String::from(tempdir().unwrap().path().to_str().unwrap()); + let config = Config { sst_size: 4, block_size: 32, db_dir: db_dir.clone(), num_memtable_limit: 5, + enable_wal: true, }; let storage = new(config); @@ -206,6 +209,7 @@ mod tests { block_size: 32, db_dir: db_dir.clone(), num_memtable_limit: 5, + enable_wal: true, }; let storage = new(config); let mut entries = get_entries(); @@ -262,6 +266,7 @@ mod tests { block_size: 32, db_dir: db_dir.clone(), num_memtable_limit: 5, + enable_wal: true, }; let storage = new(config); diff --git a/storage/src/compaction/flush.rs b/storage/src/compaction/flush.rs index e63f417..161a8a6 100644 --- a/storage/src/compaction/flush.rs +++ b/storage/src/compaction/flush.rs @@ -1,5 +1,7 @@ use anyhow::Result; use std::{ + fs, + path::Path, sync::Arc, thread::{self, JoinHandle}, time::Duration, @@ -15,11 +17,11 @@ impl Storage { let mut sst_builder = SSTableBuilder::new(self.config.block_size); let mut guard = self.state.write().unwrap(); - let mut memtables = guard.frozen_memtables.clone(); + let mut frozen_memtables = guard.frozen_memtables.clone(); let mut l0_sstables = guard.l0_sstables.clone(); let mut sstables = guard.sstables.clone(); - let Some(memtable) = memtables.pop() else { + let Some(memtable) = frozen_memtables.pop() else { return Ok(()); }; memtable.flush(&mut sst_builder)?; @@ -32,12 +34,16 @@ impl Storage { l0_sstables.insert(0, memtable.id); sstables.insert(memtable.id, Arc::new(sst)); + if self.config.enable_wal { + self.remove_wal(memtable.id)?; + } + *guard = Arc::new(StorageState { - memtable: guard.memtable.clone(), - frozen_memtables: memtables, - levels: guard.levels.clone(), + frozen_memtables, l0_sstables, sstables, + levels: guard.levels.clone(), + memtable: guard.memtable.clone(), }); memtable.id @@ -61,6 +67,13 @@ impl Storage { Ok(()) } + fn remove_wal(&self, id: usize) -> Result<()> { + let wal_path = Path::new(&self.config.db_dir).join(format!("{id}.wal")); + fs::remove_file(wal_path)?; + + Ok(()) + } + pub fn sst_path(&self, id: usize) -> String { format!("{}/{}.sst", self.config.db_dir, id) } diff --git a/storage/src/lsm_storage.rs b/storage/src/lsm_storage.rs index 473edb5..7bf2413 100644 --- a/storage/src/lsm_storage.rs +++ b/storage/src/lsm_storage.rs @@ -48,11 +48,14 @@ pub struct Config { pub block_size: usize, pub num_memtable_limit: usize, pub db_dir: String, + pub enable_wal: bool, } pub fn new(config: Config) -> Arc { + let state_lock = Mutex::new(()); let block_cache = Arc::new(BlockCache::new(4096)); let db_dir = Path::new(&config.db_dir); + create_db_dir(db_dir); let manifest; @@ -66,26 +69,74 @@ pub fn new(config: Config) -> Arc { Err(_) => manifest = Manifest::create(manifest_file).unwrap(), } - let (l0_sst_ids, l1_sst_ids, sstables) = + let (memtable_ids, l0_sst_ids, l1_sst_ids, sstables) = load_sstables(db_dir, block_cache, manifest_records).expect("loaded sstables"); - - let sst_id = match ([&l0_sst_ids[..], &l1_sst_ids[..]].concat()).iter().max() { + let sst_id = match ([&memtable_ids[..], &l0_sst_ids[..], &l1_sst_ids[..]].concat()) + .iter() + .max() + { Some(id) => id + 1, None => 0, }; + let (memtable, frozen_memtables) = match config.enable_wal { + true => { + let wal_path = db_dir.join(format!("{sst_id}.wal")); + if memtable_ids.is_empty() { + let memtable = Memtable::new_with_wal(sst_id, wal_path.as_path()).expect(""); + manifest + .add_record( + &state_lock.lock().unwrap(), + ManifestRecord::NewMemtable(sst_id), + ) + .expect("added manifest record"); + + (Arc::new(memtable), vec![]) + } else { + let mut memtables = vec![]; + for id in memtable_ids { + let wal_path = db_dir.join(format!("{id}.wal")); + let memtable = + Memtable::new_with_wal(id, wal_path.as_path()).expect("created memtable"); + memtables.push(Arc::new(memtable)); + } + + (memtables.remove(0), memtables) + } + } + _ => { + manifest + .add_record( + &state_lock.lock().unwrap(), + ManifestRecord::NewMemtable(sst_id), + ) + .expect("added manifest record"); + + (Arc::new(Memtable::new(sst_id)), vec![]) + } + }; + + if memtable.get_size() == 0 { + manifest + .add_record( + &state_lock.lock().unwrap(), + ManifestRecord::NewMemtable(sst_id), + ) + .expect("added manifest record"); + } + Arc::new(Storage { config, - sst_id: AtomicUsize::new(sst_id), - state_lock: Mutex::new(()), + state_lock, manifest, + sst_id: AtomicUsize::new(sst_id), block_cache: Arc::new(BlockCache::new(1 << 20)), // 1mb cache state: RwLock::new(Arc::new(StorageState { - l0_sstables: l0_sst_ids, sstables, + frozen_memtables, + memtable, + l0_sstables: l0_sst_ids, levels: vec![(0, l1_sst_ids)], - frozen_memtables: Vec::new(), - memtable: Arc::new(Memtable::new(sst_id)), })), }) } @@ -100,7 +151,7 @@ impl Storage { size = guard.memtable.get_size(); } - self.try_freeze(size); + self.try_freeze(size)?; Ok(()) } @@ -231,16 +282,19 @@ impl Storage { Ok(LsmIterator::new(mem_l0_l1, map_bound(upper))) } - fn try_freeze(&self, size: usize) { + fn try_freeze(&self, size: usize) -> Result<()> { if size >= self.config.sst_size { let lock = self.state_lock.lock().unwrap(); - self.freeze(&lock); + self.freeze(&lock)?; } + + Ok(()) } - fn freeze(&self, _state_lock: &MutexGuard<()>) { + fn freeze(&self, state_lock: &MutexGuard<()>) -> Result<()> { let mut guard = self.state.write().unwrap(); let memtable = guard.memtable.clone(); + memtable.sync_wal()?; // check again, another thread might have frozen the memtable already. if memtable.get_size() >= self.config.sst_size { @@ -248,9 +302,12 @@ impl Storage { frozen_memtables.insert(0, memtable); let id = self.get_sst_id(); + self.manifest + .add_record(state_lock, ManifestRecord::NewMemtable(id))?; + let memtable = self.create_memtable(id)?; *guard = Arc::new(StorageState { - memtable: Arc::new(Memtable::new(id)), + memtable: Arc::new(memtable), frozen_memtables, l0_sstables: guard.l0_sstables.clone(), sstables: guard.sstables.clone(), @@ -259,12 +316,27 @@ impl Storage { drop(guard); } + + Ok(()) + } + + fn create_memtable(&self, id: usize) -> Result { + let wal_path = Path::new(&self.config.db_dir).join(format!("{id}.wal")); + let memtable = match self.config.enable_wal { + true => Memtable::new_with_wal(id, wal_path.as_path()).expect("memtable with wal"), + _ => Memtable::new(id), + }; + + Ok(memtable) } pub(crate) fn get_sst_id(&self) -> usize { self.sst_id.fetch_add(1, SeqCst); self.sst_id.load(SeqCst) } + pub fn sync(&self) -> Result<()> { + self.state.read().unwrap().memtable.sync_wal() + } } #[cfg(test)] @@ -284,6 +356,7 @@ mod tests { block_size: 32, db_dir: String::from(tempdir().unwrap().path().to_str().unwrap()), num_memtable_limit: 5, + enable_wal: true, }; let storage = new(config); @@ -303,6 +376,7 @@ mod tests { block_size: 32, db_dir: String::from(tempdir().unwrap().path().to_str().unwrap()), num_memtable_limit: 5, + enable_wal: true, }; let storage = new(config); @@ -328,6 +402,7 @@ mod tests { block_size: 32, db_dir: db_dir.clone(), num_memtable_limit: 5, + enable_wal: true, }; let storage = new(config); @@ -345,6 +420,7 @@ mod tests { block_size: 32, db_dir: db_dir.clone(), num_memtable_limit: 5, + enable_wal: true, }; let storage = new(config); @@ -360,6 +436,7 @@ mod tests { block_size: 32, db_dir: db_dir.clone(), num_memtable_limit: 5, + enable_wal: true, }; let storage = new(config); @@ -376,6 +453,7 @@ mod tests { block_size: 32, db_dir: db_dir.clone(), num_memtable_limit: 5, + enable_wal: true, }; let storage = new(config); @@ -398,6 +476,7 @@ mod tests { block_size: 32, db_dir: db_dir.clone(), num_memtable_limit: 5, + enable_wal: true, }; let storage = new(config); @@ -422,6 +501,7 @@ mod tests { block_size: 32, db_dir: db_dir.clone(), num_memtable_limit: 5, + enable_wal: true, }; let new_entries = vec![(b"a", b"20"), (b"e", b"21"), (b"d", b"22"), (b"b", b"23")]; @@ -472,6 +552,7 @@ mod tests { block_size: 32, db_dir: db_dir.clone(), num_memtable_limit: 5, + enable_wal: true, }; let storage = new(config); @@ -496,6 +577,7 @@ mod tests { block_size: 32, db_dir: db_dir.clone(), num_memtable_limit: 5, + enable_wal: true, }; let new_entries = vec![(b"a", b"20"), (b"e", b"21"), (b"d", b"22"), (b"b", b"23")]; @@ -516,7 +598,7 @@ mod tests { while iter.is_valid() { let k = from_utf8(iter.key()).unwrap(); let v = from_utf8(iter.value()).unwrap(); - + println!("key: {:?}, value: {:?}", k, v); keys.push(String::from(k)); values.push(String::from(v)); @@ -535,6 +617,7 @@ mod tests { block_size: 32, db_dir: db_dir.clone(), num_memtable_limit: 5, + enable_wal: true, }; let storage = new(config); @@ -593,6 +676,7 @@ mod tests { block_size: 32, db_dir: db_dir.clone(), num_memtable_limit: 5, + enable_wal: true, }; let storage = new(config); @@ -623,6 +707,7 @@ mod tests { block_size: 32, db_dir: db_dir.clone(), num_memtable_limit: 5, + enable_wal: true, }; let storage = new(config); diff --git a/storage/src/lsm_util.rs b/storage/src/lsm_util.rs index 3d2a023..d83d0df 100644 --- a/storage/src/lsm_util.rs +++ b/storage/src/lsm_util.rs @@ -1,21 +1,35 @@ use anyhow::Result; -use std::{collections::HashMap, fs, path::Path, sync::Arc}; +use std::{ + collections::{HashMap, HashSet}, + fs, + path::Path, + sync::Arc, +}; use crate::{FileObject, SSTable, manifest::ManifestRecord, sst::BlockCache}; -type LoadedSstables = (Vec, Vec, HashMap>); +type RecoveredState = ( + Vec, + Vec, + Vec, + HashMap>, +); pub(crate) fn load_sstables( path: &Path, block_cache: Arc, manifest_recs: Vec, -) -> Result { +) -> Result { let mut l0 = vec![]; let mut l1 = vec![]; + let mut memtables = vec![]; let mut sstables = HashMap::new(); for record in manifest_recs { match record { + ManifestRecord::NewMemtable(id) => { + memtables.insert(0, id); + } ManifestRecord::Flush(sst_id) => { l0.insert(0, sst_id); } @@ -26,8 +40,11 @@ pub(crate) fn load_sstables( // we only support full compaction which means there's only one l1 sstable l1 = vec![sst_id] } - _ => {} } + + // filter out memtables flushed to l0 + let l0_set: HashSet<_> = l0.iter().collect(); + memtables.retain(|x| !l0_set.contains(x)); } for l0_sst_id in &l0 { @@ -46,7 +63,7 @@ pub(crate) fn load_sstables( sstables.insert(sst.id, Arc::new(sst)); } - anyhow::Ok((l0, l1, sstables)) + anyhow::Ok((memtables, l0, l1, sstables)) } pub(crate) fn create_db_dir(path: &Path) { diff --git a/storage/src/memtable/table.rs b/storage/src/memtable/table.rs index 3f710a1..183e036 100644 --- a/storage/src/memtable/table.rs +++ b/storage/src/memtable/table.rs @@ -1,5 +1,6 @@ use std::{ ops::Bound, + path::Path, sync::{ Arc, atomic::{AtomicUsize, Ordering}, @@ -10,24 +11,42 @@ use anyhow::Result; use bytes::Bytes; use crossbeam_skiplist::SkipMap; -use crate::{SSTableBuilder, memtable::memtable_iterator::MemtableIterator}; +use crate::{SSTableBuilder, memtable::memtable_iterator::MemtableIterator, wal::Wal}; #[derive(Debug, Clone)] pub struct Memtable { pub(crate) size: Arc, pub(crate) id: usize, skip_map: Arc>, + wal: Option, } impl Memtable { + pub fn new_with_wal(id: usize, wal_path: &Path) -> Result { + let skip_map = SkipMap::new(); + let (size, wal) = Wal::recover(wal_path, &skip_map)?; + + Ok(Self { + id, + wal: Some(wal), + skip_map: Arc::new(skip_map), + size: Arc::new(AtomicUsize::new(size as usize)), + }) + } + pub fn new(id: usize) -> Self { Self { + wal: None, skip_map: Arc::new(SkipMap::new()), size: Arc::new(AtomicUsize::new(0)), id, } } pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> { + if let Some(wal) = &self.wal { + wal.put(key, value)?; + }; + self.skip_map .insert(Bytes::copy_from_slice(key), Bytes::copy_from_slice(value)); @@ -49,6 +68,13 @@ impl Memtable { MemtableIterator::create(self.skip_map.clone(), lower, upper) } + pub fn sync_wal(&self) -> Result<()> { + if let Some(ref wal) = self.wal { + wal.sync()?; + } + Ok(()) + } + pub fn flush(&self, builder: &mut SSTableBuilder) -> Result<()> { for entry in self.skip_map.iter() { if entry.value().is_empty() { diff --git a/storage/src/wal.rs b/storage/src/wal.rs index ebf8a19..6c5640b 100644 --- a/storage/src/wal.rs +++ b/storage/src/wal.rs @@ -6,6 +6,7 @@ use std::io::{BufWriter, Read, Write}; use std::path::Path; use std::sync::{Arc, Mutex}; +#[derive(Clone, Debug)] pub struct Wal { file: Arc>>, } @@ -14,7 +15,7 @@ impl Wal { pub fn create(path: impl AsRef) -> Result { let mut open_opts = OpenOptions::new(); open_opts.read(true).write(true).create(true); - let mut file = open_opts.open(path)?; + let file = open_opts.open(path)?; let buf_writer = BufWriter::new(file); Ok(Wal { @@ -22,8 +23,12 @@ impl Wal { }) } - pub fn recover(path: impl AsRef, skiplist: &SkipMap) -> Result { + pub fn recover( + path: impl AsRef, + skiplist: &SkipMap, + ) -> Result<(u16, Self)> { let mut open_opts = OpenOptions::new(); + let mut size = 0; open_opts.read(true).write(true).create(true); let mut file = open_opts.open(path)?; @@ -35,18 +40,23 @@ impl Wal { let len = buf_ptr.get_u16(); let key = &buf_ptr[..len as usize]; buf_ptr.advance(len as usize); + size += len; let len = buf_ptr.get_u16(); let value = &buf_ptr[..len as usize]; buf_ptr.advance(len as usize); + size += len; skiplist.insert(Bytes::copy_from_slice(key), Bytes::copy_from_slice(value)); } let buf_writer = BufWriter::new(file); - Ok(Wal { - file: Arc::new(Mutex::new(buf_writer)), - }) + Ok(( + size, + Wal { + file: Arc::new(Mutex::new(buf_writer)), + }, + )) } pub fn put(&self, key: &[u8], value: &[u8]) -> Result<()> { @@ -95,6 +105,7 @@ mod test { for entry in memtable.iter() { res.push((entry.key().to_vec(), entry.value().to_vec())); } + assert_eq!(memtable.len(), res.len()); let res_slices: Vec<(&[u8], &[u8])> = res .iter() diff --git a/storage/tests/storage.rs b/storage/tests/storage.rs index 48cd531..433a489 100644 --- a/storage/tests/storage.rs +++ b/storage/tests/storage.rs @@ -14,6 +14,7 @@ fn get_returns_latest_entry() { block_size: 32, db_dir: get_temp_dir(), num_memtable_limit: 5, + enable_wal: true, }; let storage = cabin_storage::new(config); let entries = vec![ @@ -38,6 +39,7 @@ fn can_read_frozen_memtable() { block_size: 32, db_dir: get_temp_dir(), num_memtable_limit: 5, + enable_wal: true, }; let storage = cabin_storage::new(config); let entries = vec![(b"1", b"20"), (b"2", b"21"), (b"3", b"22"), (b"4", b"23")]; @@ -58,6 +60,7 @@ fn get_invalid_key() { block_size: 32, db_dir: String::from(tempdir().unwrap().path().to_str().unwrap()), num_memtable_limit: 5, + enable_wal: true, }; let storage = cabin_storage::new(config); @@ -71,6 +74,7 @@ fn scan_items() { block_size: 32, db_dir: get_temp_dir(), num_memtable_limit: 5, + enable_wal: true, }; let storage = cabin_storage::new(config); let entries = vec![