diff --git a/storage/src/block/iterator.rs b/storage/src/block/iterator.rs index fb9103d..a12693c 100644 --- a/storage/src/block/iterator.rs +++ b/storage/src/block/iterator.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use bytes::{Buf, Bytes}; +use bytes::Buf; use crate::block::Block; @@ -134,7 +134,7 @@ mod test { let value = block_iterator.value(); keys_res.push(key[0]); - values_res.push(from_utf8(&value).unwrap().to_string()); + values_res.push(from_utf8(value).unwrap().to_string()); block_iterator.next(); } @@ -176,7 +176,7 @@ mod test { let value = block_iterator.value(); keys_res.push(key[0]); - values_res.push(from_utf8(&value).unwrap().to_string()); + values_res.push(from_utf8(value).unwrap().to_string()); block_iterator.next(); } diff --git a/storage/src/iterators/lsm_iterator.rs b/storage/src/iterators/lsm_iterator.rs index a1e6de4..72c2cfa 100644 --- a/storage/src/iterators/lsm_iterator.rs +++ b/storage/src/iterators/lsm_iterator.rs @@ -1,19 +1,31 @@ -use anyhow::Ok; +use std::ops::Bound; + +use anyhow::{Ok, Result}; +use bytes::Bytes; use crate::{ - common::iterator::StorageIterator, iterators::merged_iterator::MergedIterator, + SSTableIterator, + common::iterator::StorageIterator, + iterators::{merged_iterator::MergedIterator, two_merge_iterator::TwoMergeIterator}, memtable::memtable_iterator::MemtableIterator, }; -type LsmIteratorInner = MergedIterator; +type LsmIteratorInner = + TwoMergeIterator, MergedIterator>; pub struct LsmIterator { inner: LsmIteratorInner, + end_bound: Bound, + valid: bool, } impl LsmIterator { - pub fn new(iter: LsmIteratorInner) -> Self { - let mut lsm_iterator = Self { inner: iter }; + pub fn new(iter: LsmIteratorInner, upper: Bound) -> Self { + let mut lsm_iterator = Self { + inner: iter, + end_bound: upper, + valid: true, + }; lsm_iterator.skip_deleted(); lsm_iterator @@ -36,12 +48,24 @@ impl StorageIterator for LsmIterator { } fn is_valid(&self) -> bool { - self.inner.is_valid() + self.valid } - fn next(&mut self) -> anyhow::Result<()> { - let _ = self.inner.next(); + fn next(&mut self) -> Result<()> { + self.inner.next()?; + + if !self.inner.is_valid() { + self.valid = false; + return Ok(()); + } + self.skip_deleted(); + + match self.end_bound.as_ref() { + Bound::Unbounded => {} + Bound::Included(key) => self.valid = self.inner.key() <= &key[..], + Bound::Excluded(key) => self.valid = self.inner.key() < &key[..], + }; Ok(()) } } diff --git a/storage/src/iterators/two_merge_iterator.rs b/storage/src/iterators/two_merge_iterator.rs index e60bdc6..6835b50 100644 --- a/storage/src/iterators/two_merge_iterator.rs +++ b/storage/src/iterators/two_merge_iterator.rs @@ -1,4 +1,4 @@ -use std::{cmp::Ordering, str::from_utf8}; +use std::cmp::Ordering; use anyhow::{Ok, Result}; @@ -18,7 +18,7 @@ impl TwoMergeIterato match mem_iter.key().cmp(sst_iter.key()) { Ordering::Equal => mem_iter.is_valid(), Ordering::Less => mem_iter.is_valid(), - Ordering::Greater => false, + Ordering::Greater => !sst_iter.is_valid(), // use mem_iter if sst_iter is invalid } }; @@ -48,7 +48,10 @@ impl StorageIterator } fn is_valid(&self) -> bool { - self.mem_iter.is_valid() || self.sst_iter.is_valid() + match self.is_mem { + true => self.mem_iter.is_valid(), + _ => self.sst_iter.is_valid(), + } } fn next(&mut self) -> Result<()> { @@ -67,7 +70,7 @@ impl StorageIterator match self.mem_iter.key().cmp(self.sst_iter.key()) { Ordering::Equal => self.mem_iter.is_valid(), Ordering::Less => self.mem_iter.is_valid(), - Ordering::Greater => false, + Ordering::Greater => !self.sst_iter.is_valid(), } }; diff --git a/storage/src/lsm_storage.rs b/storage/src/lsm_storage.rs index d99b992..64c1711 100644 --- a/storage/src/lsm_storage.rs +++ b/storage/src/lsm_storage.rs @@ -1,19 +1,26 @@ use std::{ + collections::HashMap, ops::Bound, sync::{Arc, Mutex, MutexGuard, RwLock}, }; use crate::{ - common::errors::KeyNotFound, - iterators::{lsm_iterator::LsmIterator, merged_iterator::MergedIterator}, - memtable::table::Memtable, + SSTable, SSTableIterator, + common::{errors::KeyNotFound, iterator::StorageIterator}, + iterators::{ + lsm_iterator::LsmIterator, merged_iterator::MergedIterator, + two_merge_iterator::TwoMergeIterator, + }, + memtable::{memtable_iterator::map_bound, table::Memtable}, + sst::BlockCache, }; use anyhow::Result; use bytes::Bytes; #[derive(Debug)] pub struct Storage { - state: RwLock, + state: RwLock>, + block_cache: Arc, state_lock: Mutex<()>, config: Config, } @@ -22,6 +29,8 @@ pub struct Storage { struct StorageState { memtable: Arc, frozen_memtables: Vec>, + l0_sstables: Vec, + sstables: HashMap>, } #[derive(Debug)] @@ -33,10 +42,13 @@ pub fn new(config: Config) -> Storage { Storage { config, state_lock: Mutex::new(()), - state: RwLock::new(StorageState { + block_cache: Arc::new(BlockCache::new(1 << 20)), // 4gb cache + state: RwLock::new(Arc::new(StorageState { memtable: Arc::new(Memtable::default()), frozen_memtables: Vec::new(), - }), + l0_sstables: vec![], + sstables: HashMap::new(), + })), } } @@ -70,19 +82,46 @@ impl Storage { Ok(value) } - pub fn scan(&self, lower: Bound<&[u8]>, upper: Bound<&[u8]>) -> LsmIterator { - let guard = self.state.read().unwrap(); + pub fn scan(&self, lower: Bound<&[u8]>, upper: Bound<&[u8]>) -> Result { + let state = { + let guard = self.state.read().unwrap(); + guard.clone() + }; + let mut iters = vec![]; // insert memtables from newest to oldest - iters.push(guard.memtable.scan(lower, upper)); + iters.push(state.memtable.scan(lower, upper)); - let frozen_tables = &guard.frozen_memtables; + let frozen_tables = &state.frozen_memtables; for frozen_table in frozen_tables { iters.push(frozen_table.scan(lower, upper)); } + let mem_iters = MergedIterator::new(iters); + + let mut table_iters = Vec::with_capacity(state.l0_sstables.len()); + // TODO: only consider sstables that might contain the key + for table_id in state.l0_sstables.iter() { + let table = state.sstables[table_id].clone(); + let iter = match lower { + Bound::Included(key) => SSTableIterator::create_and_seek_to_key(table, key)?, + Bound::Unbounded => SSTableIterator::create_and_seek_to_first(table)?, + Bound::Excluded(key) => { + let mut iter = SSTableIterator::create_and_seek_to_key(table, key)?; + + if iter.is_valid() && iter.key() == key { + iter.next()?; + } + iter + } + }; - LsmIterator::new(MergedIterator::new(iters)) + table_iters.push(iter); + } + + let sst_iters = MergedIterator::new(table_iters); + let inner = TwoMergeIterator::create(mem_iters, sst_iters).unwrap(); + Ok(LsmIterator::new(inner, map_bound(upper))) } fn try_freeze(&self, size: usize) { @@ -98,8 +137,17 @@ impl Storage { // check again, another thread might have frozen the memtable already. if memtable.get_size() >= self.config.sst_size { - guard.frozen_memtables.insert(0, memtable); - guard.memtable = Arc::new(Memtable::default()); + let mut frozen_memtables = guard.frozen_memtables.clone(); + frozen_memtables.insert(0, memtable); + + *guard = Arc::new(StorageState { + memtable: Arc::new(Memtable::default()), + frozen_memtables, + l0_sstables: guard.l0_sstables.clone(), + sstables: guard.sstables.clone(), + }); + + drop(guard); } } } diff --git a/storage/src/sst/mod.rs b/storage/src/sst/mod.rs index ebdb1e3..13b83ee 100644 --- a/storage/src/sst/mod.rs +++ b/storage/src/sst/mod.rs @@ -7,4 +7,4 @@ mod table; pub use builder::SSTableBuilder; pub use file::FileObject; pub use iterator::SSTableIterator; -pub use table::SSTable; +pub use table::{BlockCache, SSTable}; diff --git a/storage/tests/storage.rs b/storage/tests/storage.rs index 4cca8ca..dd3cf96 100644 --- a/storage/tests/storage.rs +++ b/storage/tests/storage.rs @@ -67,7 +67,7 @@ fn scan_items() { let mut expected_keys = vec![]; let mut expected_values = vec![]; - let mut iter = storage.scan(Unbounded, Unbounded); + let mut iter = storage.scan(Unbounded, Unbounded).unwrap(); while iter.is_valid() { expected_keys.push(iter.key().to_vec()); expected_values.push(iter.value().to_vec()); diff --git a/storage/tests/two_merge_iter.rs b/storage/tests/two_merge_iter.rs index da590d8..07b9e1f 100644 --- a/storage/tests/two_merge_iter.rs +++ b/storage/tests/two_merge_iter.rs @@ -117,17 +117,6 @@ fn create_empty_merged_iter() -> MergedIterator { MergedIterator::new(iterators) } -fn create_empty_sst_iter() -> SSTableIterator { - let mut sst_builder = SSTableBuilder::new(32); // block size of 32 bytes - let block_cache = Arc::new(Cache::new(1)); - let tmp_file = NamedTempFile::new().unwrap(); - let sst = sst_builder - .build(1, block_cache.clone(), tmp_file.path()) - .unwrap(); - - SSTableIterator::create_and_seek_to_first(Arc::new(sst)).unwrap() -} - fn get_sst_entries() -> Vec<(&'static [u8], &'static [u8])> { vec![ (b"b", b"-2"),