Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions storage/src/compaction/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ mod tests {
block_size: 32,
db_dir: db_dir.clone(),
num_memtable_limit: 5,
enable_wal: true,
};
let storage = new(config);

Expand Down Expand Up @@ -139,11 +140,13 @@ mod tests {
#[test]
fn scans_through_memtables_l0_l1_sstables() {
let db_dir = String::from(tempdir().unwrap().path().to_str().unwrap());

let config = Config {
sst_size: 4,
block_size: 32,
db_dir: db_dir.clone(),
num_memtable_limit: 5,
enable_wal: true,
};
let storage = new(config);

Expand Down Expand Up @@ -206,6 +209,7 @@ mod tests {
block_size: 32,
db_dir: db_dir.clone(),
num_memtable_limit: 5,
enable_wal: true,
};
let storage = new(config);
let mut entries = get_entries();
Expand Down Expand Up @@ -262,6 +266,7 @@ mod tests {
block_size: 32,
db_dir: db_dir.clone(),
num_memtable_limit: 5,
enable_wal: true,
};
let storage = new(config);

Expand Down
23 changes: 18 additions & 5 deletions storage/src/compaction/flush.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use anyhow::Result;
use std::{
fs,
path::Path,
sync::Arc,
thread::{self, JoinHandle},
time::Duration,
Expand All @@ -15,11 +17,11 @@ impl Storage {
let mut sst_builder = SSTableBuilder::new(self.config.block_size);
let mut guard = self.state.write().unwrap();

let mut memtables = guard.frozen_memtables.clone();
let mut frozen_memtables = guard.frozen_memtables.clone();
let mut l0_sstables = guard.l0_sstables.clone();
let mut sstables = guard.sstables.clone();

let Some(memtable) = memtables.pop() else {
let Some(memtable) = frozen_memtables.pop() else {
return Ok(());
};
memtable.flush(&mut sst_builder)?;
Expand All @@ -32,12 +34,16 @@ impl Storage {
l0_sstables.insert(0, memtable.id);
sstables.insert(memtable.id, Arc::new(sst));

if self.config.enable_wal {
self.remove_wal(memtable.id)?;
}

*guard = Arc::new(StorageState {
memtable: guard.memtable.clone(),
frozen_memtables: memtables,
levels: guard.levels.clone(),
frozen_memtables,
l0_sstables,
sstables,
levels: guard.levels.clone(),
memtable: guard.memtable.clone(),
});

memtable.id
Expand All @@ -61,6 +67,13 @@ impl Storage {
Ok(())
}

fn remove_wal(&self, id: usize) -> Result<()> {
let wal_path = Path::new(&self.config.db_dir).join(format!("{id}.wal"));
fs::remove_file(wal_path)?;

Ok(())
}

pub fn sst_path(&self, id: usize) -> String {
format!("{}/{}.sst", self.config.db_dir, id)
}
Expand Down
113 changes: 99 additions & 14 deletions storage/src/lsm_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,14 @@ pub struct Config {
pub block_size: usize,
pub num_memtable_limit: usize,
pub db_dir: String,
pub enable_wal: bool,
}

pub fn new(config: Config) -> Arc<Storage> {
let state_lock = Mutex::new(());
let block_cache = Arc::new(BlockCache::new(4096));
let db_dir = Path::new(&config.db_dir);

create_db_dir(db_dir);

let manifest;
Expand All @@ -66,26 +69,74 @@ pub fn new(config: Config) -> Arc<Storage> {
Err(_) => manifest = Manifest::create(manifest_file).unwrap(),
}

let (l0_sst_ids, l1_sst_ids, sstables) =
let (memtable_ids, l0_sst_ids, l1_sst_ids, sstables) =
load_sstables(db_dir, block_cache, manifest_records).expect("loaded sstables");

let sst_id = match ([&l0_sst_ids[..], &l1_sst_ids[..]].concat()).iter().max() {
let sst_id = match ([&memtable_ids[..], &l0_sst_ids[..], &l1_sst_ids[..]].concat())
.iter()
.max()
{
Some(id) => id + 1,
None => 0,
};

let (memtable, frozen_memtables) = match config.enable_wal {
true => {
let wal_path = db_dir.join(format!("{sst_id}.wal"));
if memtable_ids.is_empty() {
let memtable = Memtable::new_with_wal(sst_id, wal_path.as_path()).expect("");
manifest
.add_record(
&state_lock.lock().unwrap(),
ManifestRecord::NewMemtable(sst_id),
)
.expect("added manifest record");

(Arc::new(memtable), vec![])
} else {
let mut memtables = vec![];
for id in memtable_ids {
let wal_path = db_dir.join(format!("{id}.wal"));
let memtable =
Memtable::new_with_wal(id, wal_path.as_path()).expect("created memtable");
memtables.push(Arc::new(memtable));
}

(memtables.remove(0), memtables)
}
}
_ => {
manifest
.add_record(
&state_lock.lock().unwrap(),
ManifestRecord::NewMemtable(sst_id),
)
.expect("added manifest record");

(Arc::new(Memtable::new(sst_id)), vec![])
}
};

if memtable.get_size() == 0 {
manifest
.add_record(
&state_lock.lock().unwrap(),
ManifestRecord::NewMemtable(sst_id),
)
.expect("added manifest record");
}

Arc::new(Storage {
config,
sst_id: AtomicUsize::new(sst_id),
state_lock: Mutex::new(()),
state_lock,
manifest,
sst_id: AtomicUsize::new(sst_id),
block_cache: Arc::new(BlockCache::new(1 << 20)), // 1mb cache
state: RwLock::new(Arc::new(StorageState {
l0_sstables: l0_sst_ids,
sstables,
frozen_memtables,
memtable,
l0_sstables: l0_sst_ids,
levels: vec![(0, l1_sst_ids)],
frozen_memtables: Vec::new(),
memtable: Arc::new(Memtable::new(sst_id)),
})),
})
}
Expand All @@ -100,7 +151,7 @@ impl Storage {
size = guard.memtable.get_size();
}

self.try_freeze(size);
self.try_freeze(size)?;
Ok(())
}

Expand Down Expand Up @@ -231,26 +282,32 @@ impl Storage {
Ok(LsmIterator::new(mem_l0_l1, map_bound(upper)))
}

fn try_freeze(&self, size: usize) {
fn try_freeze(&self, size: usize) -> Result<()> {
if size >= self.config.sst_size {
let lock = self.state_lock.lock().unwrap();
self.freeze(&lock);
self.freeze(&lock)?;
}

Ok(())
}

fn freeze(&self, _state_lock: &MutexGuard<()>) {
fn freeze(&self, state_lock: &MutexGuard<()>) -> Result<()> {
let mut guard = self.state.write().unwrap();
let memtable = guard.memtable.clone();
memtable.sync_wal()?;

// check again, another thread might have frozen the memtable already.
if memtable.get_size() >= self.config.sst_size {
let mut frozen_memtables = guard.frozen_memtables.clone();
frozen_memtables.insert(0, memtable);

let id = self.get_sst_id();
self.manifest
.add_record(state_lock, ManifestRecord::NewMemtable(id))?;
let memtable = self.create_memtable(id)?;

*guard = Arc::new(StorageState {
memtable: Arc::new(Memtable::new(id)),
memtable: Arc::new(memtable),
frozen_memtables,
l0_sstables: guard.l0_sstables.clone(),
sstables: guard.sstables.clone(),
Expand All @@ -259,12 +316,27 @@ impl Storage {

drop(guard);
}

Ok(())
}

fn create_memtable(&self, id: usize) -> Result<Memtable> {
let wal_path = Path::new(&self.config.db_dir).join(format!("{id}.wal"));
let memtable = match self.config.enable_wal {
true => Memtable::new_with_wal(id, wal_path.as_path()).expect("memtable with wal"),
_ => Memtable::new(id),
};

Ok(memtable)
}

pub(crate) fn get_sst_id(&self) -> usize {
self.sst_id.fetch_add(1, SeqCst);
self.sst_id.load(SeqCst)
}
pub fn sync(&self) -> Result<()> {
self.state.read().unwrap().memtable.sync_wal()
}
}

#[cfg(test)]
Expand All @@ -284,6 +356,7 @@ mod tests {
block_size: 32,
db_dir: String::from(tempdir().unwrap().path().to_str().unwrap()),
num_memtable_limit: 5,
enable_wal: true,
};
let storage = new(config);

Expand All @@ -303,6 +376,7 @@ mod tests {
block_size: 32,
db_dir: String::from(tempdir().unwrap().path().to_str().unwrap()),
num_memtable_limit: 5,
enable_wal: true,
};
let storage = new(config);

Expand All @@ -328,6 +402,7 @@ mod tests {
block_size: 32,
db_dir: db_dir.clone(),
num_memtable_limit: 5,
enable_wal: true,
};
let storage = new(config);

Expand All @@ -345,6 +420,7 @@ mod tests {
block_size: 32,
db_dir: db_dir.clone(),
num_memtable_limit: 5,
enable_wal: true,
};

let storage = new(config);
Expand All @@ -360,6 +436,7 @@ mod tests {
block_size: 32,
db_dir: db_dir.clone(),
num_memtable_limit: 5,
enable_wal: true,
};
let storage = new(config);

Expand All @@ -376,6 +453,7 @@ mod tests {
block_size: 32,
db_dir: db_dir.clone(),
num_memtable_limit: 5,
enable_wal: true,
};

let storage = new(config);
Expand All @@ -398,6 +476,7 @@ mod tests {
block_size: 32,
db_dir: db_dir.clone(),
num_memtable_limit: 5,
enable_wal: true,
};
let storage = new(config);

Expand All @@ -422,6 +501,7 @@ mod tests {
block_size: 32,
db_dir: db_dir.clone(),
num_memtable_limit: 5,
enable_wal: true,
};

let new_entries = vec![(b"a", b"20"), (b"e", b"21"), (b"d", b"22"), (b"b", b"23")];
Expand Down Expand Up @@ -472,6 +552,7 @@ mod tests {
block_size: 32,
db_dir: db_dir.clone(),
num_memtable_limit: 5,
enable_wal: true,
};
let storage = new(config);

Expand All @@ -496,6 +577,7 @@ mod tests {
block_size: 32,
db_dir: db_dir.clone(),
num_memtable_limit: 5,
enable_wal: true,
};

let new_entries = vec![(b"a", b"20"), (b"e", b"21"), (b"d", b"22"), (b"b", b"23")];
Expand All @@ -516,7 +598,7 @@ mod tests {
while iter.is_valid() {
let k = from_utf8(iter.key()).unwrap();
let v = from_utf8(iter.value()).unwrap();

println!("key: {:?}, value: {:?}", k, v);
keys.push(String::from(k));
values.push(String::from(v));

Expand All @@ -535,6 +617,7 @@ mod tests {
block_size: 32,
db_dir: db_dir.clone(),
num_memtable_limit: 5,
enable_wal: true,
};
let storage = new(config);

Expand Down Expand Up @@ -593,6 +676,7 @@ mod tests {
block_size: 32,
db_dir: db_dir.clone(),
num_memtable_limit: 5,
enable_wal: true,
};
let storage = new(config);

Expand Down Expand Up @@ -623,6 +707,7 @@ mod tests {
block_size: 32,
db_dir: db_dir.clone(),
num_memtable_limit: 5,
enable_wal: true,
};
let storage = new(config);

Expand Down
Loading
Loading