Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 26 additions & 12 deletions be/src/service/staros_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ absl::Status StarOSWorker::update_worker_info(const staros::starlet::WorkerInfo&
absl::StatusOr<std::shared_ptr<fslib::FileSystem>> StarOSWorker::get_shard_filesystem(ShardId id,
const Configuration& conf) {
ShardInfo shard_info;
std::shared_ptr<std::string> existing_fs_cache_key;
{ // shared_lock, check if the filesystem already created
std::shared_lock l(_mtx);
auto it = _shards.find(id);
Expand All @@ -226,11 +227,12 @@ absl::StatusOr<std::shared_ptr<fslib::FileSystem>> StarOSWorker::get_shard_files
return fs;
}
shard_info = it->second.shard_info;
existing_fs_cache_key = it->second.fs_cache_key;
}

// Build the filesystem under no lock, so the op won't hold the lock for a long time.
// It is possible that multiple filesystems are built for the same shard from multiple threads under no lock here.
auto fs_or = build_filesystem_from_shard_info(shard_info, conf);
auto fs_or = build_filesystem_from_shard_info(shard_info, conf, existing_fs_cache_key);
if (!fs_or.ok()) {
return fs_or.status();
}
Expand Down Expand Up @@ -282,7 +284,8 @@ absl::StatusOr<std::shared_ptr<fslib::FileSystem>> StarOSWorker::build_filesyste
}

absl::StatusOr<std::pair<std::shared_ptr<std::string>, std::shared_ptr<fslib::FileSystem>>>
StarOSWorker::build_filesystem_from_shard_info(const ShardInfo& info, const Configuration& conf) {
StarOSWorker::build_filesystem_from_shard_info(const ShardInfo& info, const Configuration& conf,
const std::shared_ptr<std::string>& existing_fs_cache_key) {
auto localconf = build_conf_from_shard_info(info);
if (!localconf.ok()) {
return localconf.status();
Expand All @@ -292,7 +295,7 @@ StarOSWorker::build_filesystem_from_shard_info(const ShardInfo& info, const Conf
return scheme.status();
}

return new_shared_filesystem(*scheme, *localconf);
return new_shared_filesystem(info.id, *scheme, *localconf, existing_fs_cache_key);
}

bool StarOSWorker::need_enable_cache(const ShardInfo& info) {
Expand Down Expand Up @@ -334,7 +337,8 @@ absl::StatusOr<fslib::Configuration> StarOSWorker::build_conf_from_shard_info(co
}

absl::StatusOr<std::pair<std::shared_ptr<std::string>, std::shared_ptr<fslib::FileSystem>>>
StarOSWorker::new_shared_filesystem(std::string_view scheme, const Configuration& conf) {
StarOSWorker::new_shared_filesystem(ShardId shard_id, std::string_view scheme, const Configuration& conf,
const std::shared_ptr<std::string>& existing_fs_cache_key) {
std::string cache_key = get_cache_key(scheme, conf);

// Lookup LRU cache
Expand All @@ -361,7 +365,7 @@ StarOSWorker::new_shared_filesystem(std::string_view scheme, const Configuration
VLOG(9) << "Share filesystem";
return value_or;
}
auto fs_cache_key = insert_fs_cache(cache_key, fs);
auto fs_cache_key = insert_fs_cache(cache_key, fs, existing_fs_cache_key);

return std::make_pair(std::move(fs_cache_key), std::move(fs));
}
Expand All @@ -379,13 +383,23 @@ std::string StarOSWorker::get_cache_key(std::string_view scheme, const Configura
}

std::shared_ptr<std::string> StarOSWorker::insert_fs_cache(const std::string& key,
const std::shared_ptr<FileSystem>& fs) {
std::shared_ptr<std::string> fs_cache_key(new std::string(key), [](std::string* key) {
if (g_worker) {
g_worker->erase_fs_cache(*key);
}
delete key;
});
const std::shared_ptr<FileSystem>& fs,
const std::shared_ptr<std::string>& existing_fs_cache_key) {
std::shared_ptr<std::string> fs_cache_key;

// Reuse existing fs_cache_key if it matches the current key
if (existing_fs_cache_key != nullptr && *existing_fs_cache_key == key) {
fs_cache_key = existing_fs_cache_key;
}

if (fs_cache_key == nullptr) {
fs_cache_key = std::shared_ptr<std::string>(new std::string(key), [](std::string* key) {
if (g_worker) {
g_worker->erase_fs_cache(*key);
}
delete key;
});
}

CacheKey cache_key(key);
auto value = new CacheValue(fs_cache_key, fs);
Expand Down
9 changes: 6 additions & 3 deletions be/src/service/staros_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,15 @@ class StarOSWorker : public staros::starlet::Worker {

absl::StatusOr<std::shared_ptr<FileSystem>> build_filesystem_on_demand(ShardId id, const Configuration& conf);
absl::StatusOr<std::pair<std::shared_ptr<std::string>, std::shared_ptr<FileSystem>>>
build_filesystem_from_shard_info(const ShardInfo& info, const Configuration& conf);
build_filesystem_from_shard_info(const ShardInfo& info, const Configuration& conf,
const std::shared_ptr<std::string>& existing_fs_cache_key = nullptr);
absl::StatusOr<std::pair<std::shared_ptr<std::string>, std::shared_ptr<FileSystem>>> new_shared_filesystem(
std::string_view scheme, const Configuration& conf);
ShardId shard_id, std::string_view scheme, const Configuration& conf,
const std::shared_ptr<std::string>& existing_fs_cache_key = nullptr);
absl::Status invalidate_fs(const ShardInfo& shard);

std::shared_ptr<std::string> insert_fs_cache(const std::string& key, const std::shared_ptr<FileSystem>& fs);
std::shared_ptr<std::string> insert_fs_cache(const std::string& key, const std::shared_ptr<FileSystem>& fs,
const std::shared_ptr<std::string>& existing_fs_cache_key = nullptr);
void erase_fs_cache(const std::string& key);
std::shared_ptr<FileSystem> lookup_fs_cache(const std::string& key);
std::shared_ptr<FileSystem> lookup_fs_cache(const std::shared_ptr<std::string>& key);
Expand Down
Loading