Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 131 additions & 4 deletions storage/src/compaction/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ impl Storage {
let guard = self.state.read().unwrap();
guard.clone()
};
let ssts_to_compact = state.l0_sstables.clone();
if ssts_to_compact.is_empty() {
let l0 = state.l0_sstables.clone();
if l0.is_empty() {
return Ok(());
}
let l1 = state.levels[0].1.clone();
let ssts_to_compact = [&l0[..], &l1[..]].concat();

let mut iters = vec![];
for sst_id in ssts_to_compact.iter() {
Expand All @@ -46,10 +48,15 @@ impl Storage {
let mut sstables = write_guard.sstables.clone();
let mut levels = write_guard.levels.clone();

for sst_id in ssts_to_compact.iter() {
for sst_id in l0.iter() {
sstables.remove(sst_id);
l0_sstables.retain(|&x| x != *sst_id);
remove_file(self.sst_path(*sst_id))?;
}

for sst_id in l1.iter() {
sstables.remove(sst_id);
levels[0].1.retain(|&x| x != *sst_id);
remove_file(self.sst_path(*sst_id))?;
}

Expand Down Expand Up @@ -81,9 +88,11 @@ impl Storage {

#[cfg(test)]
mod tests {
use std::{ops::Bound, str::from_utf8};

use tempfile::tempdir;

use crate::{Config, lsm_util::get_entries, new};
use crate::{Config, common::iterator::StorageIterator, lsm_util::get_entries, new};

#[test]
fn test_compaction() {
Expand Down Expand Up @@ -117,4 +126,122 @@ mod tests {
assert_eq!(curr_sst_count, initial_sst_count - 2);
assert_eq!(l1_entries.len(), 1);
}

/// Checks that an unbounded `scan` returns one merged, key-ordered view after
/// data has been written, flushed, compacted, flushed again and then partially
/// overwritten.
///
/// The expected output is the keys "a" through "l" exactly once each, with the
/// values of "a", "b", "d" and "e" coming from the overwrites issued at the end
/// of the test ("20", "23", "22", "21").
#[test]
fn scans_through_memtables_l0_l1_sstables() {
// NOTE(review): the `TempDir` returned by `tempdir()` is a temporary that is
// dropped at the end of this statement, so only the path string survives;
// presumably `new` (re)creates the directory — confirm.
let db_dir = String::from(tempdir().unwrap().path().to_str().unwrap());
let config = Config {
sst_size: 4,
block_size: 32,
db_dir: db_dir.clone(),
num_memtable_limit: 5,
};
let storage = new(config);

// Seed the store with the shared fixture entries.
for (key, value) in get_entries() {
storage.put(key, value).unwrap();
}

// will create two sstables at l0
storage
.flush_frozen_memtable()
.expect("memtable to have been frozen");
storage
.flush_frozen_memtable()
.expect("memtable to have been frozen");

// compact l0 sstables to an l1 sstable
storage.trigger_compaction().expect("compacted");

// create another l0 sstable
storage
.flush_frozen_memtable()
.expect("memtable to have been frozen");

// Overwrite four existing keys after the flushes; the scan below must report
// these values instead of the older ones.
let new_entries = vec![(b"a", b"20"), (b"e", b"21"), (b"d", b"22"), (b"b", b"23")];
for (key, value) in new_entries {
// The result of `put` is deliberately discarded here.
let _ = storage.put(key, value);
}

// Drain a full-range scan into parallel key/value vectors.
let mut iter = storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap();
let mut keys = vec![];
let mut values = vec![];

while iter.is_valid() {
let k = from_utf8(iter.key()).unwrap();
let v = from_utf8(iter.value()).unwrap();

keys.push(String::from(k));
values.push(String::from(v));

// Errors from advancing are ignored; the loop ends once `is_valid` is false.
let _ = iter.next();
}

// Every key appears exactly once, in ascending order.
assert_eq!(
keys,
vec!["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l"]
);
// "a", "b", "d" and "e" carry the overwritten values; the rest are unchanged.
assert_eq!(
values,
vec![
"20", "23", "3", "22", "21", "6", "7", "8", "9", "10", "11", "12"
]
);
}

/// Checks that, after two rounds of flush followed by compaction, an unbounded
/// `scan` reports the value "3" for key "a" and yields each key only once.
///
/// The fixture is modified so that its third entry becomes a second write of
/// key "a"; the expected key list therefore contains no "c".
#[test]
fn test_can_latest_key_is_read_from_l1_sstables() {
// NOTE(review): the `TempDir` returned by `tempdir()` is a temporary that is
// dropped at the end of this statement, so only the path string survives;
// presumably `new` (re)creates the directory — confirm.
let db_dir = String::from(tempdir().unwrap().path().to_str().unwrap());
let config = Config {
sst_size: 4,
block_size: 32,
db_dir: db_dir.clone(),
num_memtable_limit: 5,
};
let storage = new(config);
let mut entries = get_entries();

// adds a new version of key a=3
// (replaces the fixture's third entry — presumably key "c", which is absent
// from the expected keys below; verify against `get_entries`)
entries[2] = (b"a", b"3");

for (key, value) in entries {
storage.put(key, value).unwrap();
}

// will create an sstable at l0 with a, b
storage
.flush_frozen_memtable()
.expect("memtable to have been frozen");
// compact l0 sstables to an l1 sstable
storage.trigger_compaction().expect("compacted");

// will create an sstable at l0 with a, c. a newer version of the initially stored a
storage
.flush_frozen_memtable()
.expect("memtable to have been frozen");
// Second compaction: runs while both the new l0 sstable and the earlier
// compaction output exist.
storage.trigger_compaction().expect("compacted");

// Drain a full-range scan into parallel key/value vectors.
let mut iter = storage.scan(Bound::Unbounded, Bound::Unbounded).unwrap();
let mut keys = vec![];
let mut values = vec![];

while iter.is_valid() {
let k = from_utf8(iter.key()).unwrap();
let v = from_utf8(iter.value()).unwrap();

keys.push(String::from(k));
values.push(String::from(v));

// Errors from advancing are ignored; the loop ends once `is_valid` is false.
let _ = iter.next();
}

// "a" appears once even though it was written twice.
assert_eq!(
keys,
vec!["a", "b", "d", "e", "f", "g", "h", "i", "j", "k", "l"]
);
// The value for "a" is the later write ("3").
assert_eq!(
values,
vec!["3", "2", "4", "5", "6", "7", "8", "9", "10", "11", "12"]
);
}
}
56 changes: 33 additions & 23 deletions storage/src/iterators/concat_iterator.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use anyhow::Ok;
use anyhow::{Ok, Result};

use crate::{SSTable, SSTableIterator, common::iterator::StorageIterator};

Expand All @@ -11,27 +11,27 @@ pub struct ConcatIterator {
}

impl ConcatIterator {
fn create_and_seek_to_first(sstables: Vec<Arc<SSTable>>) -> Self {
pub fn create_and_seek_to_first(sstables: Vec<Arc<SSTable>>) -> Result<Self> {
if sstables.is_empty() {
return Self {
return Ok(Self {
current: None,
next_sst_idx: 0,
sstables,
};
});
}

let current = sstables
.first()
.map(|table| SSTableIterator::create_and_seek_to_first(table.clone()).unwrap());

Self {
Ok(Self {
current,
next_sst_idx: 1,
sstables,
}
})
}

fn create_and_seek_to_key(sstables: Vec<Arc<SSTable>>, key: &[u8]) -> Self {
pub fn create_and_seek_to_key(sstables: Vec<Arc<SSTable>>, key: &[u8]) -> Result<Self> {
let mut idx = 0;
for table in sstables.iter() {
if key <= table.last_key() {
Expand All @@ -45,38 +45,48 @@ impl ConcatIterator {
.get(idx)
.map(|table| SSTableIterator::create_and_seek_to_key(table.clone(), key).unwrap());

Self {
Ok(Self {
current,
next_sst_idx: idx + 1,
sstables,
}
})
}
}

impl StorageIterator for ConcatIterator {
fn value(&self) -> &[u8] {
self.current.as_ref().unwrap().value()
fn key(&self) -> &[u8] {
match self.current.as_ref() {
Some(iter) => iter.key(),
None => &[],
}
}

fn key(&self) -> &[u8] {
self.current.as_ref().unwrap().key()
fn value(&self) -> &[u8] {
match self.current.as_ref() {
Some(iter) => iter.value(),
None => &[],
}
}

fn is_valid(&self) -> bool {
self.current.is_some()
}

fn next(&mut self) -> anyhow::Result<()> {
if self.current.as_ref().unwrap().is_valid() {
self.current = self
.sstables
.get(self.next_sst_idx)
.map(|sstable| SSTableIterator::create_and_seek_to_first(sstable.clone()).unwrap());

self.next_sst_idx += 1;
return Ok(());
match self.current.as_mut() {
Some(iter) => {
if !iter.is_valid() {
self.current = self.sstables.get(self.next_sst_idx).map(|sstable| {
SSTableIterator::create_and_seek_to_first(sstable.clone()).unwrap()
});

self.next_sst_idx += 1;
return Ok(());
}

iter.next()
}
_ => Ok(()),
}

self.current.as_mut().unwrap().next()
}
}
13 changes: 9 additions & 4 deletions storage/src/iterators/lsm_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,26 @@ use bytes::Bytes;
use crate::{
SSTableIterator,
common::iterator::StorageIterator,
iterators::{merged_iterator::MergedIterator, two_merge_iterator::TwoMergeIterator},
iterators::{
concat_iterator::ConcatIterator, merged_iterator::MergedIterator,
two_merge_iterator::TwoMergeIterator,
},
memtable::memtable_iterator::MemtableIterator,
};

type LsmIteratorInner =
type LsmIteratorInnerL0 =
TwoMergeIterator<MergedIterator<MemtableIterator>, MergedIterator<SSTableIterator>>;

type LsmIteratorInnerL1 = TwoMergeIterator<LsmIteratorInnerL0, ConcatIterator>;

pub struct LsmIterator {
inner: LsmIteratorInner,
inner: LsmIteratorInnerL1,
end_bound: Bound<Bytes>,
valid: bool,
}

impl LsmIterator {
pub fn new(iter: LsmIteratorInner, upper: Bound<Bytes>) -> Self {
pub fn new(iter: LsmIteratorInnerL1, upper: Bound<Bytes>) -> Self {
let mut lsm_iterator = Self {
inner: iter,
end_bound: upper,
Expand Down
50 changes: 45 additions & 5 deletions storage/src/lsm_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use crate::{
SSTable, SSTableIterator,
common::{errors::KeyNotFound, iterator::StorageIterator},
iterators::{
lsm_iterator::LsmIterator, merged_iterator::MergedIterator,
two_merge_iterator::TwoMergeIterator,
concat_iterator::ConcatIterator, lsm_iterator::LsmIterator,
merged_iterator::MergedIterator, two_merge_iterator::TwoMergeIterator,
},
lsm_util::{create_db_dir, load_sstables},
memtable::{memtable_iterator::map_bound, table::Memtable},
Expand Down Expand Up @@ -106,7 +106,7 @@ impl Storage {
}
}

// search in ssts
// search in l0 ssts
if res.is_none() {
let mut table_iters = Vec::with_capacity(state.l0_sstables.len());
for table_id in state.l0_sstables.iter() {
Expand All @@ -127,6 +127,25 @@ impl Storage {
}
}

// search in l1 sstables
if res.is_none() {
let mut tables = Vec::with_capacity(state.levels[0].1.len());
for table_id in state.levels[0].1.iter() {
let table = state.sstables[table_id].clone();
if key < table.first_key() || key > table.last_key() {
continue;
}
tables.push(table);
}

let concat_iter = ConcatIterator::create_and_seek_to_key(tables, key).unwrap();
if !concat_iter.key().is_empty() && concat_iter.key() == key {
res = Some(Bytes::copy_from_slice(concat_iter.value()))
} else {
res = None;
}
}

match res {
Some(value) => Ok(value),
None => Err(KeyNotFound),
Expand Down Expand Up @@ -170,9 +189,30 @@ impl Storage {
table_iters.push(iter);
}

let concat_iter = {
let mut tables = Vec::with_capacity(state.levels[0].1.len());
for table_id in state.levels[0].1.iter() {
let table = state.sstables[table_id].clone();
tables.push(table);
}
match lower {
Bound::Included(key) => ConcatIterator::create_and_seek_to_key(tables, key)?,
Bound::Unbounded => ConcatIterator::create_and_seek_to_first(tables)?,
Bound::Excluded(key) => {
let mut iter = ConcatIterator::create_and_seek_to_key(tables, key)?;

if iter.is_valid() && iter.key() == key {
iter.next()?;
}
iter
}
}
};

let sst_iters = MergedIterator::new(table_iters);
let inner = TwoMergeIterator::create(mem_iters, sst_iters).unwrap();
Ok(LsmIterator::new(inner, map_bound(upper)))
let mem_l0 = TwoMergeIterator::create(mem_iters, sst_iters).unwrap();
let mem_l0_l1 = TwoMergeIterator::create(mem_l0, concat_iter).unwrap();
Ok(LsmIterator::new(mem_l0_l1, map_bound(upper)))
}

fn try_freeze(&self, size: usize) {
Expand Down
Loading