Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ description = "LSMTree based storage engine"
[dependencies]
anyhow = "1.0.99"
bytes = "1.10.1"
crossbeam = "0.8.4"
crossbeam-skiplist = "0.1.3"
moka = { version = "0.12.11", features = ["sync"] }
ouroboros = "0.18.5"
serde = "1.0.228"

[dev-dependencies]
tempfile = "3.23.0"
2 changes: 1 addition & 1 deletion storage/src/block/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use crate::block::{Block, SIZEOF_U16};
#[derive(Debug)]
pub struct BlockBuilder {
pub(crate) data: Vec<u8>,
pub(crate) first_key: Vec<u8>,
offsets: Vec<u16>,
block_size: usize,
pub(crate) first_key: Vec<u8>,
}

impl BlockBuilder {
Expand Down
70 changes: 70 additions & 0 deletions storage/src/compaction/flush.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use anyhow::Result;
use std::{
sync::Arc,
thread::{self, JoinHandle},
time::Duration,
};

use crate::{SSTableBuilder, Storage, lsm_storage::StorageState};

impl Storage {
pub(crate) fn flush_frozen_memtable(&self) -> Result<()> {
let mut sst_builder = SSTableBuilder::new(self.config.block_size);
let mut guard = self.state.write().unwrap();

let mut memtables = guard.frozen_memtables.clone();
let mut l0_sstables = guard.l0_sstables.clone();
let mut sstables = guard.sstables.clone();

let Some(memtable) = memtables.pop() else {
return Ok(());
};
memtable.flush(&mut sst_builder)?;

let sst = sst_builder.build(
memtable.id,
self.block_cache.clone(),
self.sst_path(memtable.id),
)?;
l0_sstables.push(memtable.id);
sstables.insert(memtable.id, Arc::new(sst));

*guard = Arc::new(StorageState {
memtable: guard.memtable.clone(),
frozen_memtables: memtables,
l0_sstables,
sstables,
});

Ok(())
}

fn trigger_flush(&self) -> Result<()> {
let memtable_count = {
let guard = self.state.read().unwrap();
guard.frozen_memtables.len()
};

if self.config.num_memtable_limit > memtable_count {
self.flush_frozen_memtable()?;
}

Ok(())
}

fn sst_path(&self, id: usize) -> String {
format!("{}/sst/{}.sst", self.config.db_dir, id)
}
}

// TODO: suppot msg passing
pub fn spawn_flusher(storage: Arc<Storage>) -> JoinHandle<()> {
let this = storage.clone();

thread::spawn(move || {
loop {
this.trigger_flush().expect("memtable to have been flushed");
thread::sleep(Duration::from_millis(50));
}
})
}
1 change: 1 addition & 0 deletions storage/src/compaction/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod flush;
1 change: 1 addition & 0 deletions storage/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
pub mod common;
pub mod compaction;
pub mod iterators;
pub mod lsm_storage;
mod lsm_util;
Expand Down
Loading
Loading