Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions storage/src/block/iterator.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use bytes::{Buf, Bytes};
use bytes::Buf;

use crate::block::Block;

Expand Down Expand Up @@ -134,7 +134,7 @@ mod test {
let value = block_iterator.value();

keys_res.push(key[0]);
values_res.push(from_utf8(&value).unwrap().to_string());
values_res.push(from_utf8(value).unwrap().to_string());

block_iterator.next();
}
Expand Down Expand Up @@ -176,7 +176,7 @@ mod test {
let value = block_iterator.value();

keys_res.push(key[0]);
values_res.push(from_utf8(&value).unwrap().to_string());
values_res.push(from_utf8(value).unwrap().to_string());

block_iterator.next();
}
Expand Down
40 changes: 32 additions & 8 deletions storage/src/iterators/lsm_iterator.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,31 @@
use anyhow::Ok;
use std::ops::Bound;

use anyhow::{Ok, Result};
use bytes::Bytes;

use crate::{
common::iterator::StorageIterator, iterators::merged_iterator::MergedIterator,
SSTableIterator,
common::iterator::StorageIterator,
iterators::{merged_iterator::MergedIterator, two_merge_iterator::TwoMergeIterator},
memtable::memtable_iterator::MemtableIterator,
};

type LsmIteratorInner = MergedIterator<MemtableIterator>;
type LsmIteratorInner =
TwoMergeIterator<MergedIterator<MemtableIterator>, MergedIterator<SSTableIterator>>;

pub struct LsmIterator {
inner: LsmIteratorInner,
end_bound: Bound<Bytes>,
valid: bool,
}

impl LsmIterator {
pub fn new(iter: LsmIteratorInner) -> Self {
let mut lsm_iterator = Self { inner: iter };
pub fn new(iter: LsmIteratorInner, upper: Bound<Bytes>) -> Self {
let mut lsm_iterator = Self {
inner: iter,
end_bound: upper,
valid: true,
};
lsm_iterator.skip_deleted();

lsm_iterator
Expand All @@ -36,12 +48,24 @@ impl StorageIterator for LsmIterator {
}

fn is_valid(&self) -> bool {
self.inner.is_valid()
self.valid
}

fn next(&mut self) -> anyhow::Result<()> {
let _ = self.inner.next();
fn next(&mut self) -> Result<()> {
self.inner.next()?;

if !self.inner.is_valid() {
self.valid = false;
return Ok(());
}

self.skip_deleted();

match self.end_bound.as_ref() {
Bound::Unbounded => {}
Bound::Included(key) => self.valid = self.inner.key() <= &key[..],
Bound::Excluded(key) => self.valid = self.inner.key() < &key[..],
};
Ok(())
}
}
11 changes: 7 additions & 4 deletions storage/src/iterators/two_merge_iterator.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{cmp::Ordering, str::from_utf8};
use std::cmp::Ordering;

use anyhow::{Ok, Result};

Expand All @@ -18,7 +18,7 @@ impl<T: 'static + StorageIterator, W: 'static + StorageIterator> TwoMergeIterato
match mem_iter.key().cmp(sst_iter.key()) {
Ordering::Equal => mem_iter.is_valid(),
Ordering::Less => mem_iter.is_valid(),
Ordering::Greater => false,
Ordering::Greater => !sst_iter.is_valid(), // use mem_iter if sst_iter is invalid
}
};

Expand Down Expand Up @@ -48,7 +48,10 @@ impl<T: 'static + StorageIterator, W: 'static + StorageIterator> StorageIterator
}

fn is_valid(&self) -> bool {
self.mem_iter.is_valid() || self.sst_iter.is_valid()
match self.is_mem {
true => self.mem_iter.is_valid(),
_ => self.sst_iter.is_valid(),
}
}

fn next(&mut self) -> Result<()> {
Expand All @@ -67,7 +70,7 @@ impl<T: 'static + StorageIterator, W: 'static + StorageIterator> StorageIterator
match self.mem_iter.key().cmp(self.sst_iter.key()) {
Ordering::Equal => self.mem_iter.is_valid(),
Ordering::Less => self.mem_iter.is_valid(),
Ordering::Greater => false,
Ordering::Greater => !self.sst_iter.is_valid(),
}
};

Expand Down
74 changes: 61 additions & 13 deletions storage/src/lsm_storage.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
use std::{
collections::HashMap,
ops::Bound,
sync::{Arc, Mutex, MutexGuard, RwLock},
};

use crate::{
common::errors::KeyNotFound,
iterators::{lsm_iterator::LsmIterator, merged_iterator::MergedIterator},
memtable::table::Memtable,
SSTable, SSTableIterator,
common::{errors::KeyNotFound, iterator::StorageIterator},
iterators::{
lsm_iterator::LsmIterator, merged_iterator::MergedIterator,
two_merge_iterator::TwoMergeIterator,
},
memtable::{memtable_iterator::map_bound, table::Memtable},
sst::BlockCache,
};
use anyhow::Result;
use bytes::Bytes;

#[derive(Debug)]
pub struct Storage {
state: RwLock<StorageState>,
state: RwLock<Arc<StorageState>>,
block_cache: Arc<BlockCache>,
state_lock: Mutex<()>,
config: Config,
}
Expand All @@ -22,6 +29,8 @@ pub struct Storage {
struct StorageState {
memtable: Arc<Memtable>,
frozen_memtables: Vec<Arc<Memtable>>,
l0_sstables: Vec<usize>,
sstables: HashMap<usize, Arc<SSTable>>,
}

#[derive(Debug)]
Expand All @@ -33,10 +42,13 @@ pub fn new(config: Config) -> Storage {
Storage {
config,
state_lock: Mutex::new(()),
state: RwLock::new(StorageState {
block_cache: Arc::new(BlockCache::new(1 << 20)), // 4gb cache
state: RwLock::new(Arc::new(StorageState {
memtable: Arc::new(Memtable::default()),
frozen_memtables: Vec::new(),
}),
l0_sstables: vec![],
sstables: HashMap::new(),
})),
}
}

Expand Down Expand Up @@ -70,19 +82,46 @@ impl Storage {
Ok(value)
}

pub fn scan(&self, lower: Bound<&[u8]>, upper: Bound<&[u8]>) -> LsmIterator {
let guard = self.state.read().unwrap();
pub fn scan(&self, lower: Bound<&[u8]>, upper: Bound<&[u8]>) -> Result<LsmIterator> {
let state = {
let guard = self.state.read().unwrap();
guard.clone()
};

let mut iters = vec![];

// insert memtables from newest to oldest
iters.push(guard.memtable.scan(lower, upper));
iters.push(state.memtable.scan(lower, upper));

let frozen_tables = &guard.frozen_memtables;
let frozen_tables = &state.frozen_memtables;
for frozen_table in frozen_tables {
iters.push(frozen_table.scan(lower, upper));
}
let mem_iters = MergedIterator::new(iters);

let mut table_iters = Vec::with_capacity(state.l0_sstables.len());
// TODO: only consider sstables that might contain the key
for table_id in state.l0_sstables.iter() {
let table = state.sstables[table_id].clone();
let iter = match lower {
Bound::Included(key) => SSTableIterator::create_and_seek_to_key(table, key)?,
Bound::Unbounded => SSTableIterator::create_and_seek_to_first(table)?,
Bound::Excluded(key) => {
let mut iter = SSTableIterator::create_and_seek_to_key(table, key)?;

if iter.is_valid() && iter.key() == key {
iter.next()?;
}
iter
}
};

LsmIterator::new(MergedIterator::new(iters))
table_iters.push(iter);
}

let sst_iters = MergedIterator::new(table_iters);
let inner = TwoMergeIterator::create(mem_iters, sst_iters).unwrap();
Ok(LsmIterator::new(inner, map_bound(upper)))
}

fn try_freeze(&self, size: usize) {
Expand All @@ -98,8 +137,17 @@ impl Storage {

// check again, another thread might have frozen the memtable already.
if memtable.get_size() >= self.config.sst_size {
guard.frozen_memtables.insert(0, memtable);
guard.memtable = Arc::new(Memtable::default());
let mut frozen_memtables = guard.frozen_memtables.clone();
frozen_memtables.insert(0, memtable);

*guard = Arc::new(StorageState {
memtable: Arc::new(Memtable::default()),
frozen_memtables,
l0_sstables: guard.l0_sstables.clone(),
sstables: guard.sstables.clone(),
});

drop(guard);
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion storage/src/sst/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ mod table;
pub use builder::SSTableBuilder;
pub use file::FileObject;
pub use iterator::SSTableIterator;
pub use table::SSTable;
pub use table::{BlockCache, SSTable};
2 changes: 1 addition & 1 deletion storage/tests/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ fn scan_items() {
let mut expected_keys = vec![];
let mut expected_values = vec![];

let mut iter = storage.scan(Unbounded, Unbounded);
let mut iter = storage.scan(Unbounded, Unbounded).unwrap();
while iter.is_valid() {
expected_keys.push(iter.key().to_vec());
expected_values.push(iter.value().to_vec());
Expand Down
11 changes: 0 additions & 11 deletions storage/tests/two_merge_iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,17 +117,6 @@ fn create_empty_merged_iter() -> MergedIterator<MemtableIterator> {
MergedIterator::new(iterators)
}

fn create_empty_sst_iter() -> SSTableIterator {
let mut sst_builder = SSTableBuilder::new(32); // block size of 32 bytes
let block_cache = Arc::new(Cache::new(1));
let tmp_file = NamedTempFile::new().unwrap();
let sst = sst_builder
.build(1, block_cache.clone(), tmp_file.path())
.unwrap();

SSTableIterator::create_and_seek_to_first(Arc::new(sst)).unwrap()
}

fn get_sst_entries() -> Vec<(&'static [u8], &'static [u8])> {
vec![
(b"b", b"-2"),
Expand Down
Loading