Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ target_link_libraries(index_schema PUBLIC type_conversions)
target_link_libraries(index_schema PUBLIC utils)
target_link_libraries(index_schema PUBLIC status_macros)
target_link_libraries(index_schema PUBLIC valkey_module)
target_link_libraries(index_schema PUBLIC memory_tracker)

set(SRCS_ATTRIBUTE_DATA_TYPE ${CMAKE_CURRENT_LIST_DIR}/attribute_data_type.cc
${CMAKE_CURRENT_LIST_DIR}/attribute_data_type.h)
Expand Down
19 changes: 12 additions & 7 deletions src/index_schema.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
#include "vmsdk/src/blocked_client.h"
#include "vmsdk/src/log.h"
#include "vmsdk/src/managed_pointers.h"
#include "vmsdk/src/memory_tracker.h"
#include "vmsdk/src/status/status_macros.h"
#include "vmsdk/src/thread_pool.h"
#include "vmsdk/src/time_sliced_mrmw_mutex.h"
Expand Down Expand Up @@ -77,10 +78,12 @@ absl::StatusOr<std::shared_ptr<indexes::IndexBase>> IndexFactory(
const auto &index = attribute.index();
switch (index.index_type_case()) {
case data_model::Index::IndexTypeCase::kTagIndex: {
return std::make_shared<indexes::Tag>(index.tag_index());
return std::make_shared<indexes::Tag>(index.tag_index(),
index_schema->GetMemoryPool());
}
case data_model::Index::IndexTypeCase::kNumericIndex: {
return std::make_shared<indexes::Numeric>(index.numeric_index());
return std::make_shared<indexes::Numeric>(index.numeric_index(),
index_schema->GetMemoryPool());
}
case data_model::Index::IndexTypeCase::kVectorIndex: {
switch (index.vector_index().algorithm_case()) {
Expand All @@ -93,10 +96,11 @@ absl::StatusOr<std::shared_ptr<indexes::IndexBase>> IndexFactory(
? indexes::VectorHNSW<float>::LoadFromRDB(
ctx, &index_schema->GetAttributeDataType(),
index.vector_index(), attribute.identifier(),
std::move(*iter))
std::move(*iter), index_schema->GetMemoryPool())
: indexes::VectorHNSW<float>::Create(
index.vector_index(), attribute.identifier(),
index_schema->GetAttributeDataType().ToProto()));
index_schema->GetAttributeDataType().ToProto(),
index_schema->GetMemoryPool()));
index_schema->SubscribeToVectorExternalizer(
attribute.identifier(), index.get());
return index;
Expand All @@ -118,10 +122,11 @@ absl::StatusOr<std::shared_ptr<indexes::IndexBase>> IndexFactory(
? indexes::VectorFlat<float>::LoadFromRDB(
ctx, &index_schema->GetAttributeDataType(),
index.vector_index(), attribute.identifier(),
std::move(*iter))
std::move(*iter), index_schema->GetMemoryPool())
: indexes::VectorFlat<float>::Create(
index.vector_index(), attribute.identifier(),
index_schema->GetAttributeDataType().ToProto()));
index_schema->GetAttributeDataType().ToProto(),
index_schema->GetMemoryPool()));
index_schema->SubscribeToVectorExternalizer(
attribute.identifier(), index.get());
return index;
Expand Down Expand Up @@ -1047,7 +1052,7 @@ void IndexSchema::OnLoadingEnded(ValkeyModuleCtx *ctx) {
<< " stale entries for {Index: " << name_ << "}";

for (auto &[key, attributes] : deletion_attributes) {
auto interned_key = std::make_shared<InternedString>(key);
auto interned_key = StringInternStore::Intern(key);
ProcessMutation(ctx, attributes, interned_key, true);
}
VMSDK_LOG(NOTICE, ctx) << "Scanned index schema " << name_
Expand Down
5 changes: 5 additions & 0 deletions src/index_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "src/utils/string_interning.h"
#include "vmsdk/src/blocked_client.h"
#include "vmsdk/src/managed_pointers.h"
#include "vmsdk/src/memory_tracker.h"
#include "vmsdk/src/thread_pool.h"
#include "vmsdk/src/time_sliced_mrmw_mutex.h"
#include "vmsdk/src/utils.h"
Expand Down Expand Up @@ -172,6 +173,8 @@ class IndexSchema : public KeyspaceEventSubscription,
uint64_t GetBackfillDbSize() const;
InfoIndexPartitionData GetInfoIndexPartitionData() const;

// Returns a mutable reference to this schema's memory pool (memory_pool_).
// IndexFactory passes this reference to each index it constructs.
MemoryPool &GetMemoryPool() { return memory_pool_; }

protected:
IndexSchema(ValkeyModuleCtx *ctx,
const data_model::IndexSchema &index_schema_proto,
Expand Down Expand Up @@ -264,6 +267,8 @@ class IndexSchema : public KeyspaceEventSubscription,
vmsdk::MainThreadAccessGuard<MultiMutations> multi_mutations_;
vmsdk::MainThreadAccessGuard<bool> schedule_multi_exec_processing_{false};

MemoryPool memory_pool_{0};

FRIEND_TEST(IndexSchemaRDBTest, SaveAndLoad);
FRIEND_TEST(IndexSchemaRDBTest, ComprehensiveSkipLoadTest);
FRIEND_TEST(IndexSchemaFriendTest, ConsistencyTest);
Expand Down
6 changes: 6 additions & 0 deletions src/indexes/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ target_link_libraries(index_base INTERFACE rdb_serialization)
target_link_libraries(index_base INTERFACE string_interning)
target_link_libraries(index_base INTERFACE managed_pointers)
target_link_libraries(index_base INTERFACE valkey_module)
target_link_libraries(index_base INTERFACE memory_tracker)

set(SRCS_VECTOR_BASE ${CMAKE_CURRENT_LIST_DIR}/vector_base.cc
${CMAKE_CURRENT_LIST_DIR}/vector_base.h)
Expand All @@ -32,6 +33,7 @@ target_link_libraries(vector_base PUBLIC managed_pointers)
target_link_libraries(vector_base PUBLIC type_conversions)
target_link_libraries(vector_base PUBLIC status_macros)
target_link_libraries(vector_base PUBLIC valkey_module)
target_link_libraries(vector_base PUBLIC memory_tracker)

set(SRCS_VECTOR_HNSW ${CMAKE_CURRENT_LIST_DIR}/vector_hnsw.cc
${CMAKE_CURRENT_LIST_DIR}/vector_hnsw.h)
Expand All @@ -50,6 +52,7 @@ target_link_libraries(vector_hnsw PUBLIC memory_allocation_overrides)
target_link_libraries(vector_hnsw PUBLIC utils)
target_link_libraries(vector_hnsw PUBLIC status_macros)
target_link_libraries(vector_hnsw PUBLIC valkey_module)
target_link_libraries(vector_hnsw PUBLIC memory_tracker)

set(SRCS_NUMERIC ${CMAKE_CURRENT_LIST_DIR}/numeric.cc
${CMAKE_CURRENT_LIST_DIR}/numeric.h)
Expand All @@ -62,6 +65,7 @@ target_link_libraries(numeric PUBLIC predicate_header)
target_link_libraries(numeric PUBLIC segment_tree)
target_link_libraries(numeric PUBLIC string_interning)
target_link_libraries(numeric PUBLIC valkey_module)
target_link_libraries(numeric PUBLIC memory_tracker)

set(SRCS_TAG ${CMAKE_CURRENT_LIST_DIR}/tag.cc ${CMAKE_CURRENT_LIST_DIR}/tag.h)

Expand All @@ -76,6 +80,7 @@ target_link_libraries(tag PUBLIC managed_pointers)
target_link_libraries(tag PUBLIC type_conversions)
target_link_libraries(tag PUBLIC valkey_module)
target_link_libraries(tag PUBLIC ${INDEX_SCHEMA_PROTO_LIB})
target_link_libraries(tag PUBLIC memory_tracker)

set(SRCS_VECTOR_FLAT ${CMAKE_CURRENT_LIST_DIR}/vector_flat.cc
${CMAKE_CURRENT_LIST_DIR}/vector_flat.h)
Expand All @@ -93,3 +98,4 @@ target_link_libraries(vector_flat PUBLIC log)
target_link_libraries(vector_flat PUBLIC memory_allocation_overrides)
target_link_libraries(vector_flat PUBLIC status_macros)
target_link_libraries(vector_flat PUBLIC valkey_module)
target_link_libraries(vector_flat PUBLIC memory_tracker)
9 changes: 8 additions & 1 deletion src/indexes/index_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "src/rdb_serialization.h"
#include "src/utils/string_interning.h"
#include "vmsdk/src/managed_pointers.h"
#include "vmsdk/src/memory_tracker.h"
#include "vmsdk/src/valkey_module_api/valkey_module.h"

namespace valkey_search::indexes {
Expand All @@ -40,7 +41,8 @@ const absl::NoDestructor<absl::flat_hash_map<absl::string_view, IndexerType>>

class IndexBase {
public:
explicit IndexBase(IndexerType indexer_type) : indexer_type_(indexer_type) {}
explicit IndexBase(IndexerType indexer_type, MemoryPool& memory_pool)
: indexer_type_(indexer_type), memory_pool_(memory_pool) {}
virtual ~IndexBase() = default;

// Add/Remove/Modify will return true if the operation was successful, false
Expand All @@ -67,6 +69,11 @@ class IndexBase {
}
virtual uint64_t GetRecordCount() const = 0;

// Returns the memory pool reference this index was constructed with.
// NOTE(review): the pool is held by reference, so it presumably must outlive
// this index — confirm against the owner's lifetime.
MemoryPool& GetMemoryPool() { return memory_pool_; }

protected:
MemoryPool& memory_pool_;

private:
IndexerType indexer_type_{IndexerType::kNone};
};
Expand Down
62 changes: 44 additions & 18 deletions src/indexes/numeric.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "src/indexes/index_base.h"
#include "src/query/predicate.h"
#include "src/utils/string_interning.h"
#include "vmsdk/src/memory_tracker.h"
#include "vmsdk/src/valkey_module_api/valkey_module.h"

namespace valkey_search::indexes {
Expand All @@ -38,29 +39,48 @@ std::optional<double> ParseNumber(absl::string_view data) {
}
} // namespace

Numeric::Numeric(const data_model::NumericIndex& numeric_index_proto)
: IndexBase(IndexerType::kNumeric) {
Numeric::Numeric(const data_model::NumericIndex& numeric_index_proto,
MemoryPool& memory_pool)
: IndexBase(IndexerType::kNumeric, memory_pool) {
IsolatedMemoryScope scope{memory_pool};

tracked_keys_ = std::make_unique<InternedStringMap<double>>();
untracked_keys_ = std::make_unique<InternedStringSet>();
index_ = std::make_unique<BTreeNumericIndex>();
}

// Releases the three owned containers while an IsolatedMemoryScope bound to
// this index's memory pool is active, mirroring the constructor, which
// allocates them under a scope on the same pool.
// NOTE(review): presumably this makes the frees get attributed to the pool —
// confirm against IsolatedMemoryScope's semantics.
Numeric::~Numeric() {
IsolatedMemoryScope scope{memory_pool_};

// Reset explicitly: implicit member destruction would only run after this
// body returns, i.e. after `scope` has already been destroyed.
tracked_keys_.reset();
untracked_keys_.reset();
index_.reset();
}

// NOTE: key should be a stored interned string.
absl::StatusOr<bool> Numeric::AddRecord(const InternedStringPtr& key,
absl::string_view data) {
IsolatedMemoryScope scope{memory_pool_};

auto value = ParseNumber(data);
absl::MutexLock lock(&index_mutex_);
if (!value.has_value()) {
untracked_keys_.insert(key);
untracked_keys_->insert(key);
return false;
}
auto [_, succ] = tracked_keys_.insert({key, *value});
auto [_, succ] = tracked_keys_->insert({key, *value});
if (!succ) {
// NOTE: don't track allocation error.
DisableMemoryTracking disable_tracking;
return absl::AlreadyExistsError(
absl::StrCat("Key `", key->Str(), "` already exists"));
}
untracked_keys_.erase(key);
untracked_keys_->erase(key);
index_->Add(key, *value);
return true;
}

// NOTE: key should be a stored interned string.
absl::StatusOr<bool> Numeric::ModifyRecord(const InternedStringPtr& key,
absl::string_view data) {
auto value = ParseNumber(data);
Expand All @@ -69,35 +89,41 @@ absl::StatusOr<bool> Numeric::ModifyRecord(const InternedStringPtr& key,
RemoveRecord(key, indexes::DeletionType::kIdentifier);
return false;
}

absl::MutexLock lock(&index_mutex_);
auto it = tracked_keys_.find(key);
if (it == tracked_keys_.end()) {
auto it = tracked_keys_->find(key);
if (it == tracked_keys_->end()) {
return absl::NotFoundError(
absl::StrCat("Key `", key->Str(), "` not found"));
}

IsolatedMemoryScope scope{memory_pool_};

index_->Modify(it->first, it->second, *value);
it->second = *value;
return true;
}

// NOTE: key should be a stored interned string.
absl::StatusOr<bool> Numeric::RemoveRecord(const InternedStringPtr& key,
DeletionType deletion_type) {
IsolatedMemoryScope scope{memory_pool_};

absl::MutexLock lock(&index_mutex_);
if (deletion_type == DeletionType::kRecord) {
// If key is DELETED, remove it from untracked_keys_.
untracked_keys_.erase(key);
untracked_keys_->erase(key);
} else {
// If key doesn't have TAG but exists, insert it to untracked_keys_.
untracked_keys_.insert(key);
untracked_keys_->insert(key);
}
auto it = tracked_keys_.find(key);
if (it == tracked_keys_.end()) {
auto it = tracked_keys_->find(key);
if (it == tracked_keys_->end()) {
return false;
}

index_->Remove(it->first, it->second);
tracked_keys_.erase(it);
tracked_keys_->erase(it);
return true;
}

Expand All @@ -107,13 +133,13 @@ int Numeric::RespondWithInfo(ValkeyModuleCtx* ctx) const {
ValkeyModule_ReplyWithSimpleString(ctx, "size");
absl::MutexLock lock(&index_mutex_);
ValkeyModule_ReplyWithCString(ctx,
std::to_string(tracked_keys_.size()).c_str());
std::to_string(tracked_keys_->size()).c_str());
return 4;
}

bool Numeric::IsTracked(const InternedStringPtr& key) const {
absl::MutexLock lock(&index_mutex_);
return tracked_keys_.contains(key);
return tracked_keys_->contains(key);
}

std::unique_ptr<data_model::Index> Numeric::ToProto() const {
Expand All @@ -126,7 +152,7 @@ std::unique_ptr<data_model::Index> Numeric::ToProto() const {
const double* Numeric::GetValue(const InternedStringPtr& key) const {
// Note that the Numeric index is not mutated while the time sliced mutex is
// in a read mode and therefore it is safe to skip lock acquiring.
if (auto it = tracked_keys_.find(key); it != tracked_keys_.end()) {
if (auto it = tracked_keys_->find(key); it != tracked_keys_->end()) {
return &it->second;
}
return nullptr;
Expand Down Expand Up @@ -154,8 +180,8 @@ std::unique_ptr<Numeric::EntriesFetcher> Numeric::Search(
;
additional_entries_range.second = btree.end();
return std::make_unique<Numeric::EntriesFetcher>(
entries_range, size + untracked_keys_.size(), additional_entries_range,
&untracked_keys_);
entries_range, size + untracked_keys_->size(), additional_entries_range,
untracked_keys_.get());
}

entries_range.first = predicate.IsStartInclusive()
Expand Down Expand Up @@ -256,7 +282,7 @@ std::unique_ptr<EntriesFetcherIteratorBase> Numeric::EntriesFetcher::Begin() {

uint64_t Numeric::GetRecordCount() const {
absl::MutexLock lock(&index_mutex_);
return tracked_keys_.size();
return tracked_keys_->size();
}

} // namespace valkey_search::indexes
13 changes: 9 additions & 4 deletions src/indexes/numeric.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "src/rdb_serialization.h"
#include "src/utils/segment_tree.h"
#include "src/utils/string_interning.h"
#include "vmsdk/src/memory_tracker.h"
#include "vmsdk/src/valkey_module_api/valkey_module.h"

namespace valkey_search::indexes {
Expand Down Expand Up @@ -81,7 +82,9 @@ class BTreeNumeric {

class Numeric : public IndexBase {
public:
explicit Numeric(const data_model::NumericIndex& numeric_index_proto);
explicit Numeric(const data_model::NumericIndex& numeric_index_proto,
MemoryPool& memory_pool);
~Numeric() override;
absl::StatusOr<bool> AddRecord(const InternedStringPtr& key,
absl::string_view data) override
ABSL_LOCKS_EXCLUDED(index_mutex_);
Expand All @@ -100,7 +103,7 @@ class Numeric : public IndexBase {
inline void ForEachTrackedKey(
absl::AnyInvocable<void(const InternedStringPtr&)> fn) const override {
absl::MutexLock lock(&index_mutex_);
for (const auto& [key, _] : tracked_keys_) {
for (const auto& [key, _] : *tracked_keys_) {
fn(key);
}
}
Expand Down Expand Up @@ -166,9 +169,11 @@ class Numeric : public IndexBase {

private:
mutable absl::Mutex index_mutex_;
InternedStringMap<double> tracked_keys_ ABSL_GUARDED_BY(index_mutex_);
std::unique_ptr<InternedStringMap<double>> tracked_keys_
ABSL_GUARDED_BY(index_mutex_);
// untracked keys is needed to support negate filtering
InternedStringSet untracked_keys_ ABSL_GUARDED_BY(index_mutex_);
std::unique_ptr<InternedStringSet> untracked_keys_
ABSL_GUARDED_BY(index_mutex_);
std::unique_ptr<BTreeNumericIndex> index_ ABSL_GUARDED_BY(index_mutex_);
};
} // namespace valkey_search::indexes
Expand Down
5 changes: 3 additions & 2 deletions src/indexes/tag.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "src/query/predicate.h"
#include "src/utils/patricia_tree.h"
#include "src/utils/string_interning.h"
#include "vmsdk/src/memory_tracker.h"
#include "vmsdk/src/valkey_module_api/valkey_module.h"

namespace valkey_search::indexes {
Expand All @@ -39,8 +40,8 @@ static bool IsValidPrefix(absl::string_view str) {
str[str.length() - 2] != '*';
}

Tag::Tag(const data_model::TagIndex& tag_index_proto)
: IndexBase(IndexerType::kTag),
Tag::Tag(const data_model::TagIndex& tag_index_proto, MemoryPool& memory_pool)
: IndexBase(IndexerType::kTag, memory_pool),
separator_(tag_index_proto.separator()[0]),
case_sensitive_(tag_index_proto.case_sensitive()),
tree_(case_sensitive_) {}
Expand Down
Loading
Loading