diff --git a/.vscode/settings.json b/.vscode/settings.json index 0779d4799..b5d265a10 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -5,7 +5,7 @@ "clangd.path": "/usr/bin/clangd", "C_Cpp.codeAnalysis.runAutomatically": true, "editor.formatOnSave": true, - "C_Cpp.intelliSenseEngine": "disabled", + "C_Cpp.intelliSenseEngine": "default", "C_Cpp.codeAnalysis.clangTidy.enabled": true, "C_Cpp.codeAnalysis.clangTidy.useBuildPath": true, "clangd.arguments": [ @@ -17,5 +17,8 @@ "--query-driver=**" ], "clangd.inactiveRegions.useBackgroundHighlight": true, - "clangd.enableCodeCompletion": true -} + "clangd.enableCodeCompletion": true, + "files.associations": { + "string_view": "cpp" + } +} \ No newline at end of file diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index f8f90adb3..a591471b5 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -80,6 +80,7 @@ target_link_libraries(valkey_search PUBLIC utils) target_link_libraries(valkey_search PUBLIC status_macros) target_link_libraries(valkey_search PUBLIC valkey_module) target_link_libraries(valkey_search PUBLIC acl) +target_link_libraries(valkey_search PUBLIC cluster_map) set(SRCS_KEYSPACE_EVENT_MANAGER ${CMAKE_CURRENT_LIST_DIR}/keyspace_event_manager.cc diff --git a/src/commands/ft_dropindex.cc b/src/commands/ft_dropindex.cc index f97f0184c..f4013526d 100644 --- a/src/commands/ft_dropindex.cc +++ b/src/commands/ft_dropindex.cc @@ -13,6 +13,7 @@ #include "src/schema_manager.h" #include "src/valkey_search.h" #include "src/valkey_search_options.h" +#include "vmsdk/src/cluster_map.h" #include "vmsdk/src/status/status_macros.h" #include "vmsdk/src/type_conversions.h" #include "vmsdk/src/utils.h" @@ -24,7 +25,7 @@ class DropConsistencyCheckFanoutOperation : public query::fanout::FanoutOperationBase< coordinator::InfoIndexPartitionRequest, coordinator::InfoIndexPartitionResponse, - query::fanout::FanoutTargetMode::kAll> { + fanout::FanoutTargetMode::kAll> { public: DropConsistencyCheckFanoutOperation(uint32_t db_num, const std::string& index_name, @@ -32,7 +33,7 @@ class DropConsistencyCheckFanoutOperation : query::fanout::FanoutOperationBase< coordinator::InfoIndexPartitionRequest, coordinator::InfoIndexPartitionResponse, - query::fanout::FanoutTargetMode::kAll>(), + fanout::FanoutTargetMode::kAll>(), db_num_(db_num), index_name_(index_name), timeout_ms_(timeout_ms){}; @@ -40,16 +41,16 @@ class DropConsistencyCheckFanoutOperation unsigned GetTimeoutMs() const override { return timeout_ms_; } coordinator::InfoIndexPartitionRequest GenerateRequest( - const query::fanout::FanoutSearchTarget&) override { + const vmsdk::cluster_map::NodeInfo&) override { coordinator::InfoIndexPartitionRequest req; req.set_db_num(db_num_); req.set_index_name(index_name_); return req; } - void OnResponse(const coordinator::InfoIndexPartitionResponse& resp, - [[maybe_unused]] const query::fanout::FanoutSearchTarget& - target) override { + void OnResponse( + const coordinator::InfoIndexPartitionResponse& resp, + [[maybe_unused]] const vmsdk::cluster_map::NodeInfo& target) override { // if the index exist on some node and returns a valid response, treat it as // inconsistent error absl::MutexLock lock(&mutex_); @@ -59,7 +60,7 @@ class DropConsistencyCheckFanoutOperation std::pair GetLocalResponse( const coordinator::InfoIndexPartitionRequest& request, - [[maybe_unused]] const query::fanout::FanoutSearchTarget&) override { + [[maybe_unused]] const vmsdk::cluster_map::NodeInfo&) override { return coordinator::Service::GenerateInfoResponse(request); } diff --git a/src/query/cluster_info_fanout_operation.cc b/src/query/cluster_info_fanout_operation.cc index e94bd0a64..f972953a3 100644 --- a/src/query/cluster_info_fanout_operation.cc +++ b/src/query/cluster_info_fanout_operation.cc @@ -16,7 +16,7 @@ ClusterInfoFanoutOperation::ClusterInfoFanoutOperation( uint32_t db_num, const std::string& index_name, unsigned timeout_ms) : fanout::FanoutOperationBase(), + vmsdk::cluster_map::FanoutTargetMode::kAll>(), db_num_(db_num), index_name_(index_name), timeout_ms_(timeout_ms), @@ -30,7 +30,8 @@ unsigned ClusterInfoFanoutOperation::GetTimeoutMs() const { } coordinator::InfoIndexPartitionRequest -ClusterInfoFanoutOperation::GenerateRequest(const fanout::FanoutSearchTarget&) { +ClusterInfoFanoutOperation::GenerateRequest( + const vmsdk::cluster_map::NodeInfo&) { coordinator::InfoIndexPartitionRequest req; req.set_db_num(db_num_); req.set_index_name(index_name_); @@ -39,7 +40,7 @@ ClusterInfoFanoutOperation::GenerateRequest(const fanout::FanoutSearchTarget&) { void ClusterInfoFanoutOperation::OnResponse( const coordinator::InfoIndexPartitionResponse& resp, - [[maybe_unused]] const fanout::FanoutSearchTarget& target) { + [[maybe_unused]] const vmsdk::cluster_map::NodeInfo& target) { if (!resp.error().empty()) { grpc::Status status = grpc::Status(grpc::StatusCode::INTERNAL, resp.error()); @@ -111,7 +112,7 @@ void ClusterInfoFanoutOperation::OnResponse( std::pair ClusterInfoFanoutOperation::GetLocalResponse( const coordinator::InfoIndexPartitionRequest& request, - [[maybe_unused]] const fanout::FanoutSearchTarget& target) { + [[maybe_unused]] const vmsdk::cluster_map::NodeInfo& target) { return coordinator::Service::GenerateInfoResponse(request); } diff --git a/src/query/cluster_info_fanout_operation.h b/src/query/cluster_info_fanout_operation.h index ab81eeae9..77d9b88ec 100644 --- a/src/query/cluster_info_fanout_operation.h +++ b/src/query/cluster_info_fanout_operation.h @@ -17,13 +17,15 @@ #include "src/coordinator/coordinator.pb.h" #include "src/query/fanout_operation_base.h" #include "src/query/fanout_template.h" +#include "vmsdk/src/cluster_map.h" namespace valkey_search::query::cluster_info_fanout { -class ClusterInfoFanoutOperation : public fanout::FanoutOperationBase< - coordinator::InfoIndexPartitionRequest, - coordinator::InfoIndexPartitionResponse, - fanout::FanoutTargetMode::kAll> { +class ClusterInfoFanoutOperation + : public fanout::FanoutOperationBase< + coordinator::InfoIndexPartitionRequest, + coordinator::InfoIndexPartitionResponse, + fanout::FanoutTargetMode::kAll> { public: ClusterInfoFanoutOperation(uint32_t db_num, const std::string& index_name, unsigned timeout_ms); @@ -31,14 +33,16 @@ class ClusterInfoFanoutOperation : public fanout::FanoutOperationBase< unsigned GetTimeoutMs() const override; coordinator::InfoIndexPartitionRequest GenerateRequest( - const fanout::FanoutSearchTarget&) override; + const vmsdk::cluster_map::NodeInfo&) override; - void OnResponse(const coordinator::InfoIndexPartitionResponse& resp, - [[maybe_unused]] const fanout::FanoutSearchTarget&) override; + void OnResponse( + const coordinator::InfoIndexPartitionResponse& resp, + [[maybe_unused]] const vmsdk::cluster_map::NodeInfo&) override; std::pair - GetLocalResponse(const coordinator::InfoIndexPartitionRequest& request, - [[maybe_unused]] const fanout::FanoutSearchTarget&) override; + GetLocalResponse( + const coordinator::InfoIndexPartitionRequest& request, + [[maybe_unused]] const vmsdk::cluster_map::NodeInfo&) override; void InvokeRemoteRpc( coordinator::Client* client, diff --git a/src/query/fanout_operation_base.h b/src/query/fanout_operation_base.h index 4592ed6e3..dfc99ecdb 100644 --- a/src/query/fanout_operation_base.h +++ b/src/query/fanout_operation_base.h @@ -21,6 +21,7 @@ #include "src/utils/cancel.h" #include "src/valkey_search.h" #include "vmsdk/src/blocked_client.h" +#include "vmsdk/src/cluster_map.h" #include "vmsdk/src/debug.h" #include "vmsdk/src/log.h" #include "vmsdk/src/module_config.h" @@ -77,6 +78,7 @@ class FanoutOperationBase { } void StartFanoutRound() { + targets_ = GetTargets(); outstanding_ = targets_.size(); unsigned timeout_ms = GetTimeoutMs(); for (const auto& target : targets_) { diff --git a/src/query/primary_info_fanout_operation.cc b/src/query/primary_info_fanout_operation.cc index 654d3bf5f..e74c73575 100644 --- a/src/query/primary_info_fanout_operation.cc +++ b/src/query/primary_info_fanout_operation.cc @@ -9,14 +9,16 @@ #include "src/coordinator/metadata_manager.h" #include "src/schema_manager.h" +#include "vmsdk/src/cluster_map.h" namespace valkey_search::query::primary_info_fanout { PrimaryInfoFanoutOperation::PrimaryInfoFanoutOperation( uint32_t db_num, const std::string& index_name, unsigned timeout_ms) - : fanout::FanoutOperationBase(), + : fanout::FanoutOperationBase< + coordinator::InfoIndexPartitionRequest, + coordinator::InfoIndexPartitionResponse, + vmsdk::cluster_map::FanoutTargetMode::kPrimary>(), db_num_(db_num), index_name_(index_name), timeout_ms_(timeout_ms), @@ -30,7 +32,8 @@ unsigned PrimaryInfoFanoutOperation::GetTimeoutMs() const { } coordinator::InfoIndexPartitionRequest -PrimaryInfoFanoutOperation::GenerateRequest(const fanout::FanoutSearchTarget&) { +PrimaryInfoFanoutOperation::GenerateRequest( + const vmsdk::cluster_map::NodeInfo&) { coordinator::InfoIndexPartitionRequest req; req.set_db_num(db_num_); req.set_index_name(index_name_); @@ -39,7 +42,7 @@ PrimaryInfoFanoutOperation::GenerateRequest(const fanout::FanoutSearchTarget&) { void PrimaryInfoFanoutOperation::OnResponse( const coordinator::InfoIndexPartitionResponse& resp, - [[maybe_unused]] const fanout::FanoutSearchTarget& target) { + [[maybe_unused]] const vmsdk::cluster_map::NodeInfo& target) { if (!resp.error().empty()) { grpc::Status status = grpc::Status(grpc::StatusCode::INTERNAL, resp.error()); @@ -96,7 +99,7 @@ void PrimaryInfoFanoutOperation::OnResponse( std::pair PrimaryInfoFanoutOperation::GetLocalResponse( const coordinator::InfoIndexPartitionRequest& request, - [[maybe_unused]] const fanout::FanoutSearchTarget& target) { + [[maybe_unused]] const vmsdk::cluster_map::NodeInfo& target) { return coordinator::Service::GenerateInfoResponse(request); } diff --git a/src/query/primary_info_fanout_operation.h b/src/query/primary_info_fanout_operation.h index b4ec80fa6..98c5b2e37 100644 --- a/src/query/primary_info_fanout_operation.h +++ b/src/query/primary_info_fanout_operation.h @@ -17,6 +17,7 @@ #include "src/coordinator/coordinator.pb.h" #include "src/query/fanout_operation_base.h" #include "src/query/fanout_template.h" +#include "vmsdk/src/cluster_map.h" namespace valkey_search::query::primary_info_fanout { @@ -31,14 +32,16 @@ class PrimaryInfoFanoutOperation : public fanout::FanoutOperationBase< unsigned GetTimeoutMs() const override; coordinator::InfoIndexPartitionRequest GenerateRequest( - const fanout::FanoutSearchTarget&) override; + const vmsdk::cluster_map::NodeInfo&) override; - void OnResponse(const coordinator::InfoIndexPartitionResponse& resp, - [[maybe_unused]] const fanout::FanoutSearchTarget&) override; + void OnResponse( + const coordinator::InfoIndexPartitionResponse& resp, + [[maybe_unused]] const vmsdk::cluster_map::NodeInfo&) override; std::pair - GetLocalResponse(const coordinator::InfoIndexPartitionRequest& request, - [[maybe_unused]] const fanout::FanoutSearchTarget&) override; + GetLocalResponse( + const coordinator::InfoIndexPartitionRequest& request, + [[maybe_unused]] const vmsdk::cluster_map::NodeInfo&) override; void InvokeRemoteRpc( coordinator::Client* client, diff --git a/src/valkey_search.cc b/src/valkey_search.cc index 934d3534f..9e4f31691 100644 --- a/src/valkey_search.cc +++ b/src/valkey_search.cc @@ -939,6 +939,24 @@ void ValkeySearch::OnServerCronCallback(ValkeyModuleCtx *ctx, absl::Seconds(options::GetMaxWorkerSuspensionSecs().GetValue())) { ResumeWriterThreadPool(ctx, /*is_expired=*/true); } + // refresh cluster map in cluster mode + if (options::GetUseCoordinator().GetValue() && IsCluster()) { + if (!cluster_map_refresh_watch_.has_value() || + cluster_map_refresh_watch_->Duration() > absl::Seconds(1)) { + VMSDK_LOG(NOTICE, nullptr) << "Refreshing cluster map in cron..."; + // Use thread-safe context for calling CLUSTER SLOTS + // ValkeyModuleCtx *detached_ctx = + // ValkeyModule_GetDetachedThreadSafeContext(ctx); + // RefreshClusterMap(detached_ctx); + // ValkeyModule_FreeThreadSafeContext(detached_ctx); + + RefreshClusterMap(GetBackgroundCtx()); + + VMSDK_LOG(NOTICE, nullptr) << "Cluster map refresh completed"; + + cluster_map_refresh_watch_ = vmsdk::StopWatch(); + } + } } void ValkeySearch::OnForkChildCallback(ValkeyModuleCtx *ctx, @@ -1027,6 +1045,8 @@ absl::Status ValkeySearch::Startup(ValkeyModuleCtx *ctx) { coordinator::MetadataManager::InitInstance( std::make_unique(ctx, *client_pool_)); coordinator::MetadataManager::Instance().RegisterForClusterMessages(ctx); + // create initial cluster map when server startup + cluster_map_ = vmsdk::cluster_map::ClusterMap::CreateNewClusterMap(ctx); } SchemaManager::InitInstance(std::make_unique( ctx, server_events::SubscribeToServerEvents, writer_thread_pool_.get(), diff --git a/src/valkey_search.h b/src/valkey_search.h index b07977edd..2c8fbad92 100644 --- a/src/valkey_search.h +++ b/src/valkey_search.h @@ -21,6 +21,7 @@ #include "src/coordinator/client_pool.h" #include "src/coordinator/server.h" #include "src/index_schema.h" +#include "vmsdk/src/cluster_map.h" #include "vmsdk/src/thread_pool.h" #include "vmsdk/src/utils.h" #include "vmsdk/src/valkey_module_api/valkey_module.h" @@ -97,6 +98,30 @@ class ValkeySearch { // of the program. ValkeyModuleCtx *GetBackgroundCtx() const { return ctx_; } + // Get current cluster map (thread-safe) + std::shared_ptr GetClusterMap() const { + return cluster_map_.load(); + } + + // Update cluster map with a new one (thread-safe atomic swap) + void UpdateClusterMap( + std::shared_ptr new_map) { + cluster_map_.store(new_map); + } + + // Refresh cluster map by creating a new one from current cluster state + void RefreshClusterMap(ValkeyModuleCtx *ctx) { + VMSDK_LOG(NOTICE, ctx) << "RefreshClusterMap called"; + auto new_map = vmsdk::cluster_map::ClusterMap::CreateNewClusterMap(ctx); + if (new_map) { + VMSDK_LOG(NOTICE, ctx) << "Updating cluster map with new map, is_full=" + << new_map->GetIsClusterMapFull(); + UpdateClusterMap(new_map); + } else { + VMSDK_LOG(WARNING, ctx) << "CreateNewClusterMap returned nullptr"; + } + } + protected: std::unique_ptr reader_thread_pool_; std::unique_ptr writer_thread_pool_; @@ -117,9 +142,11 @@ class ValkeySearch { uint64_t inc_id_{0}; ValkeyModuleCtx *ctx_{nullptr}; std::optional writer_thread_pool_suspend_watch_; + std::optional cluster_map_refresh_watch_; std::unique_ptr coordinator_; std::unique_ptr client_pool_; + std::atomic> cluster_map_; }; void ModuleInfo(ValkeyModuleInfoCtx *ctx, int for_crash_report); } // namespace valkey_search diff --git a/vmsdk/src/CMakeLists.txt b/vmsdk/src/CMakeLists.txt index 03cfa600c..bb5e50c3e 100644 --- a/vmsdk/src/CMakeLists.txt +++ b/vmsdk/src/CMakeLists.txt @@ -144,3 +144,15 @@ set(SRCS_MEMORY_TRACKER ${CMAKE_CURRENT_LIST_DIR}/memory_tracker.h) valkey_search_add_static_library(memory_tracker "${SRCS_MEMORY_TRACKER}") target_include_directories(memory_tracker PUBLIC ${CMAKE_CURRENT_LIST_DIR}) + +set(SRCS_CLUSTER_MAP ${CMAKE_CURRENT_LIST_DIR}/cluster_map.cc + ${CMAKE_CURRENT_LIST_DIR}/cluster_map.h) + +valkey_search_add_static_library(cluster_map "${SRCS_CLUSTER_MAP}") +target_include_directories(cluster_map PUBLIC ${CMAKE_CURRENT_LIST_DIR}) +target_link_libraries(cluster_map PUBLIC valkey_module) +if(APPLE) + target_link_libraries(cluster_map PUBLIC absl::core_headers) +else() + target_link_libraries(cluster_map PUBLIC ${GRPC_LIB}) +endif() diff --git a/vmsdk/src/cluster_map.cc b/vmsdk/src/cluster_map.cc new file mode 100644 index 000000000..83b106ddd --- /dev/null +++ b/vmsdk/src/cluster_map.cc @@ -0,0 +1,497 @@ +/* + * Copyright (c) 2025, valkey-search contributors + * All rights reserved. + * SPDX-License-Identifier: BSD 3-Clause + * + */ + +#include "vmsdk/src/cluster_map.h" + +#include + +#include "absl/container/flat_hash_map.h" +#include "absl/random/random.h" +#include "absl/status/status.h" +#include "absl/strings/str_cat.h" +#include "vmsdk/src/log.h" +#include "vmsdk/src/managed_pointers.h" +#include "vmsdk/src/valkey_module_api/valkey_module.h" + +namespace vmsdk { +namespace cluster_map { + +namespace coordinator { +// Coordinator port offset - same as in src/coordinator/util.h +// This offset results in 26673 for Valkey default port 6379 - which is COORD +// on a telephone keypad. +static constexpr int kCoordinatorPortOffset = 20294; + +inline int GetCoordinatorPort(int valkey_port) { + // TODO Make handling of TLS more robust + if (valkey_port == 6378) { + return valkey_port + kCoordinatorPortOffset + 1; + } + return valkey_port + kCoordinatorPortOffset; +} +} // namespace coordinator + +bool ClusterMap::GetIsClusterMapFull() const { return is_cluster_map_full_; } + +// return pre-generated primary targets +const std::vector& ClusterMap::GetPrimaryTargets() const { + return primary_targets_; +} + +// return pre-generated replica targets +const std::vector& ClusterMap::GetReplicaTargets() const { + return replica_targets_; +} + +// return pre-generated all targets +const std::vector& ClusterMap::GetAllTargets() const { + return all_targets_; +} + +// generate a random targets vector from cluster bus (not pre-generated) +std::vector ClusterMap::GetRandomTargets(ValkeyModuleCtx* ctx) { + return GetTargets(ctx, FanoutTargetMode::kRandom); +} + +// slot ownership checks +bool ClusterMap::IsSlotOwned(uint16_t slot) const { return owned_slots_[slot]; } + +// shard lookups, will return nullptr if shard does not exist +const ShardInfo* ClusterMap::GetShardById(std::string_view shard_id) const { + auto it = shards_.find(std::string(shard_id)); + if (it != shards_.end()) { + return &it->second; + } + return nullptr; +} + +// return all shards as a map +const absl::flat_hash_map& ClusterMap::GetAllShards() + const { + return shards_; +} + +// get cluster level slot fingerprint +uint64_t ClusterMap::GetClusterSlotsFingerprint() const { + return cluster_slots_fingerprint_; +} + +// private helper function to refresh targets in CreateNewClusterMap +std::vector ClusterMap::GetTargets(ValkeyModuleCtx* ctx, + FanoutTargetMode target_mode) { + size_t num_nodes; + auto nodes = vmsdk::MakeUniqueValkeyClusterNodesList(ctx, &num_nodes); + + std::vector selected_targets; + + if (target_mode == FanoutTargetMode::kPrimary) { + // Select all primary (master) nodes directly + for (size_t i = 0; i < num_nodes; ++i) { + std::string node_id(nodes.get()[i], VALKEYMODULE_NODE_ID_LEN); + char ip[INET6_ADDRSTRLEN] = ""; + char master_id[VALKEYMODULE_NODE_ID_LEN] = ""; + int port; + int flags; + + if (ValkeyModule_GetClusterNodeInfo(ctx, node_id.c_str(), ip, master_id, + &port, &flags) != VALKEYMODULE_OK) { + VMSDK_LOG_EVERY_N_SEC(DEBUG, ctx, 1) + << "Failed to get node info for node " << node_id + << ", skipping node..."; + continue; + } + + if (flags & (VALKEYMODULE_NODE_PFAIL | VALKEYMODULE_NODE_FAIL)) { + VMSDK_LOG_EVERY_N_SEC(DEBUG, ctx, 1) + << "Node " << node_id << " (" << ip + << ") is failing, skipping for fanout..."; + continue; + } + + // Only select master nodes + if (flags & VALKEYMODULE_NODE_MASTER) { + NodeInfo node_info; + node_info.node_id = node_id; + node_info.role = NodeInfo::NodeRole::kPrimary; + if (flags & VALKEYMODULE_NODE_MYSELF) { + node_info.location = NodeInfo::NodeLocation::kLocal; + node_info.address = ""; + } else { + node_info.location = NodeInfo::NodeLocation::kRemote; + node_info.address = + absl::StrCat(ip, ":", coordinator::GetCoordinatorPort(port)); + } + selected_targets.push_back(std::move(node_info)); + } + } + } else if (target_mode == FanoutTargetMode::kAll) { + // Select all nodes (both primary and replica) + for (size_t i = 0; i < num_nodes; ++i) { + std::string node_id(nodes.get()[i], VALKEYMODULE_NODE_ID_LEN); + char ip[INET6_ADDRSTRLEN] = ""; + char master_id[VALKEYMODULE_NODE_ID_LEN] = ""; + int port; + int flags; + if (ValkeyModule_GetClusterNodeInfo(ctx, node_id.c_str(), ip, master_id, + &port, &flags) != VALKEYMODULE_OK) { + VMSDK_LOG_EVERY_N_SEC(DEBUG, ctx, 1) + << "Failed to get node info for node " << node_id + << ", skipping node..."; + continue; + } + + if (flags & (VALKEYMODULE_NODE_PFAIL | VALKEYMODULE_NODE_FAIL)) { + VMSDK_LOG_EVERY_N_SEC(DEBUG, ctx, 1) + << "Node " << node_id << " (" << ip + << ") is failing, skipping for fanout..."; + continue; + } + + // Select all nodes (both master and replica) + NodeInfo node_info; + node_info.node_id = node_id; + node_info.role = (flags & VALKEYMODULE_NODE_MASTER) + ? NodeInfo::NodeRole::kPrimary + : NodeInfo::NodeRole::kReplica; + if (flags & VALKEYMODULE_NODE_MYSELF) { + node_info.location = NodeInfo::NodeLocation::kLocal; + node_info.address = ""; + } else { + node_info.location = NodeInfo::NodeLocation::kRemote; + node_info.address = + absl::StrCat(ip, ":", coordinator::GetCoordinatorPort(port)); + } + selected_targets.push_back(std::move(node_info)); + } + } else { + CHECK(target_mode == FanoutTargetMode::kRandom || + target_mode == FanoutTargetMode::kReplicasOnly); + // Original logic: group master and replica into shards and randomly + // select one, unless confined to replicas only + absl::flat_hash_map> + shard_id_to_node_indices; + + for (size_t i = 0; i < num_nodes; ++i) { + std::string node_id(nodes.get()[i], VALKEYMODULE_NODE_ID_LEN); + char ip[INET6_ADDRSTRLEN] = ""; + char master_id[VALKEYMODULE_NODE_ID_LEN] = ""; + int port; + int flags; + if (ValkeyModule_GetClusterNodeInfo(ctx, node_id.c_str(), ip, master_id, + &port, &flags) != VALKEYMODULE_OK) { + VMSDK_LOG_EVERY_N_SEC(DEBUG, ctx, 1) + << "Failed to get node info for node " << node_id + << ", skipping node..."; + continue; + } + auto master_id_str = std::string(master_id, VALKEYMODULE_NODE_ID_LEN); + if (flags & (VALKEYMODULE_NODE_PFAIL | VALKEYMODULE_NODE_FAIL)) { + VMSDK_LOG_EVERY_N_SEC(DEBUG, ctx, 1) + << "Node " << node_id << " (" << ip + << ") is failing, skipping for fanout..."; + continue; + } + if (flags & VALKEYMODULE_NODE_MASTER) { + master_id_str = node_id; + if (target_mode == FanoutTargetMode::kReplicasOnly) { + continue; + } + } + + // Store only the node index + shard_id_to_node_indices[master_id_str].push_back(i); + } + + // Random selection first, then create only the selected target objects + absl::BitGen gen; + for (const auto& [shard_id, node_indices] : shard_id_to_node_indices) { + size_t index = absl::Uniform(gen, 0u, node_indices.size()); + size_t selected_node_index = node_indices.at(index); + + // Re-fetch node info only for the selected node + std::string node_id(nodes.get()[selected_node_index], + VALKEYMODULE_NODE_ID_LEN); + char ip[INET6_ADDRSTRLEN] = ""; + char master_id[VALKEYMODULE_NODE_ID_LEN] = ""; + int port; + int flags; + if (ValkeyModule_GetClusterNodeInfo(ctx, node_id.c_str(), ip, master_id, + &port, &flags) != VALKEYMODULE_OK) { + continue; + } + + // Create target object only for the selected node + NodeInfo node_info; + node_info.node_id = node_id; + node_info.role = (flags & VALKEYMODULE_NODE_MASTER) + ? NodeInfo::NodeRole::kPrimary + : NodeInfo::NodeRole::kReplica; + if (flags & VALKEYMODULE_NODE_MYSELF) { + node_info.location = NodeInfo::NodeLocation::kLocal; + node_info.address = ""; + } else { + node_info.location = NodeInfo::NodeLocation::kRemote; + node_info.address = + absl::StrCat(ip, ":", coordinator::GetCoordinatorPort(port)); + } + selected_targets.push_back(std::move(node_info)); + } + } + return selected_targets; +} + +void PrintReplyStructure(ValkeyModuleCallReply* reply, int depth = 0) { + if (!reply) { + VMSDK_LOG(NOTICE, nullptr) << std::string(depth * 2, ' ') << "NULL"; + return; + } + + std::string indent(depth * 2, ' '); + int reply_type = ValkeyModule_CallReplyType(reply); + + switch (reply_type) { + case VALKEYMODULE_REPLY_ARRAY: { + size_t len = ValkeyModule_CallReplyLength(reply); + VMSDK_LOG(NOTICE, nullptr) << indent << "ARRAY[" << len << "] {"; + for (size_t i = 0; i < len; i++) { + VMSDK_LOG(NOTICE, nullptr) << indent << " [" << i << "]:"; + ValkeyModuleCallReply* element = + ValkeyModule_CallReplyArrayElement(reply, i); + PrintReplyStructure(element, depth + 2); + } + VMSDK_LOG(NOTICE, nullptr) << indent << "}"; + break; + } + case VALKEYMODULE_REPLY_STRING: { + size_t len; + const char* str = ValkeyModule_CallReplyStringPtr(reply, &len); + VMSDK_LOG(NOTICE, nullptr) + << indent << "STRING: \"" << std::string(str, len) << "\""; + break; + } + case VALKEYMODULE_REPLY_INTEGER: { + long long val = ValkeyModule_CallReplyInteger(reply); + VMSDK_LOG(NOTICE, nullptr) << indent << "INTEGER: " << val; + break; + } + case VALKEYMODULE_REPLY_ERROR: { + size_t len; + const char* str = ValkeyModule_CallReplyStringPtr(reply, &len); + VMSDK_LOG(NOTICE, nullptr) + << indent << "ERROR: \"" << std::string(str, len) << "\""; + break; + } + default: + VMSDK_LOG(NOTICE, nullptr) << indent << "UNKNOWN_TYPE: " << reply_type; + break; + } +} + +// create a new cluster map in the background +std::shared_ptr ClusterMap::CreateNewClusterMap( + ValkeyModuleCtx* ctx) { + auto new_map = std::shared_ptr(new ClusterMap()); + + // Pre-compute all target lists + new_map->primary_targets_ = GetTargets(ctx, FanoutTargetMode::kPrimary); + new_map->replica_targets_ = GetTargets(ctx, FanoutTargetMode::kReplicasOnly); + new_map->all_targets_ = GetTargets(ctx, FanoutTargetMode::kAll); + + // call CLUSTER_SLOTS from Valkey Module API + auto reply = vmsdk::UniquePtrValkeyCallReply( + ValkeyModule_Call(ctx, "CLUSTER", "c", "SLOTS")); + if (reply == nullptr) { + // if Valkey Module API returns nullptr, return an empty map + VMSDK_IO_LOG_EVERY_N_SEC(WARNING, nullptr, 1) + << "CLUSTER_MAP_ERROR: CLUSTER SLOTS call returns nullptr"; + new_map->is_cluster_map_full_ = false; + return new_map; + } + + auto reply_type = ValkeyModule_CallReplyType(reply.get()); + if (reply_type != VALKEYMODULE_REPLY_ARRAY) { + // if Valkey Module API returns incorrect type, return an empty map + VMSDK_IO_LOG_EVERY_N_SEC(WARNING, nullptr, 1) + << "CLUSTER_MAP_ERROR: CLUSTER SLOTS call returns incorrect type, " + "expect VALKEYMODULE_REPLY_ARRAY but got " + << reply_type; + new_map->is_cluster_map_full_ = false; + return new_map; + } + + // // Print the entire reply structure + // VMSDK_LOG(NOTICE, nullptr) << "=== CLUSTER SLOTS Reply Structure ==="; + // PrintReplyStructure(reply.get()); + // VMSDK_LOG(NOTICE, nullptr) << "=== End of CLUSTER SLOTS Reply ==="; + + // Track which slots have been assigned across the entire cluster + std::bitset assigned_slots; + + // Get local node ID to identify which shard we belong to + char my_node_id[VALKEYMODULE_NODE_ID_LEN]; + size_t len = ValkeyModule_CallReplyLength(reply.get()); + + // reply is an array of arrays + // each array element should contain at least 3 elements + // (1) start slot range (2) end slot range (3) master info (4)... replica info + for (size_t i = 0; i < len; ++i) { + ValkeyModuleCallReply* slot_range = + ValkeyModule_CallReplyArrayElement(reply.get(), i); + if (!slot_range || + ValkeyModule_CallReplyType(slot_range) != VALKEYMODULE_REPLY_ARRAY) { + continue; + } + size_t slot_len = ValkeyModule_CallReplyLength(slot_range); + // each array element should have at least 3 elements + if (slot_len < 3) { + continue; + } + // Get start and end slots + ValkeyModuleCallReply* start_slot = + ValkeyModule_CallReplyArrayElement(slot_range, 0); + ValkeyModuleCallReply* end_slot = + ValkeyModule_CallReplyArrayElement(slot_range, 1); + long long start = ValkeyModule_CallReplyInteger(start_slot); + long long end = ValkeyModule_CallReplyInteger(end_slot); + // Mark slots as assigned in cluster + std::set slot_set; + for (long long slot = start; slot <= end; slot++) { + if (slot >= 0 && slot < k_num_slots) { + assigned_slots[slot] = true; + slot_set.insert(static_cast(slot)); + } + } + // node info is an array + // (1) address (2) port number (3) node_id (4) hostname (optional) + // Get master node ID + ValkeyModuleCallReply* master_node = + ValkeyModule_CallReplyArrayElement(slot_range, 2); + if (!master_node || ValkeyModule_CallReplyLength(master_node) < 3) { + continue; + } + size_t master_id_len; + const char* master_id_str = ValkeyModule_CallReplyStringPtr( + ValkeyModule_CallReplyArrayElement(master_node, 2), &master_id_len); + std::string master_id(master_id_str, master_id_len); + + // Check if any node in this shard is local + bool is_local_shard = false; + for (size_t j = 2; j < slot_len; j++) { + ValkeyModuleCallReply* node = + ValkeyModule_CallReplyArrayElement(slot_range, j); + if (!node || ValkeyModule_CallReplyLength(node) < 3) continue; + + size_t node_id_len; + const char* node_id_str = ValkeyModule_CallReplyStringPtr( + ValkeyModule_CallReplyArrayElement(node, 2), &node_id_len); + std::string node_id(node_id_str, node_id_len); + + char ip[INET6_ADDRSTRLEN]; + char master_buf[VALKEYMODULE_NODE_ID_LEN]; + int port, flags; + if (ValkeyModule_GetClusterNodeInfo(ctx, node_id.c_str(), ip, master_buf, + &port, &flags) == VALKEYMODULE_OK && + (flags & VALKEYMODULE_NODE_MYSELF)) { + is_local_shard = true; + break; + } + } + // Mark owned slots + if (is_local_shard) { + for (long long slot = start; slot <= end; slot++) { + if (slot >= 0 && slot < k_num_slots) { + new_map->owned_slots_[slot] = true; + } + } + } + // Update shards map + auto it = new_map->shards_.find(master_id); + if (it == new_map->shards_.end()) { + ShardInfo shard; + shard.shard_id = master_id; + shard.owned_slots = slot_set; + shard.slots_fingerprint = 0; + new_map->shards_[master_id] = std::move(shard); + } else { + it->second.owned_slots.insert(slot_set.begin(), slot_set.end()); + } + } + new_map->is_cluster_map_full_ = assigned_slots.all(); + + // Log cluster map state + VMSDK_LOG(NOTICE, nullptr) << "=== Cluster Map Created ==="; + VMSDK_LOG(NOTICE, nullptr) + << "is_cluster_map_full_: " << new_map->is_cluster_map_full_; + + // Log owned_slots_ + size_t owned_count = new_map->owned_slots_.count(); + VMSDK_LOG(NOTICE, nullptr) << "owned_slots_ count: " << owned_count; + if (owned_count > 0) { + std::string owned_ranges; + int range_start = -1; + int range_end = -1; + for (size_t i = 0; i < k_num_slots; i++) { + if (new_map->owned_slots_[i]) { + if (range_start == -1) { + range_start = i; + range_end = i; + } else if (i == range_end + 1) { + range_end = i; + } else { + if (!owned_ranges.empty()) owned_ranges += ", "; + owned_ranges += + std::to_string(range_start) + "-" + std::to_string(range_end); + range_start = i; + range_end = i; + } + } + } + if (range_start != -1) { + if (!owned_ranges.empty()) owned_ranges += ", "; + owned_ranges += + std::to_string(range_start) + "-" + std::to_string(range_end); + } + VMSDK_LOG(NOTICE, nullptr) << "owned_slots_ ranges: " << owned_ranges; + } + + // Log shards_ + VMSDK_LOG(NOTICE, nullptr) << "shards_ count: " << new_map->shards_.size(); + for (const auto& [shard_id, shard_info] : new_map->shards_) { + VMSDK_LOG(NOTICE, nullptr) << "Shard ID: " << shard_id; + VMSDK_LOG(NOTICE, nullptr) + << " owned_slots count: " << shard_info.owned_slots.size(); + if (!shard_info.owned_slots.empty()) { + std::string slot_ranges; + auto it = shard_info.owned_slots.begin(); + int range_start = *it; + int range_end = *it; + ++it; + for (; it != shard_info.owned_slots.end(); ++it) { + if (*it == range_end + 1) { + range_end = *it; + } else { + if (!slot_ranges.empty()) slot_ranges += ", "; + slot_ranges += + std::to_string(range_start) + "-" + std::to_string(range_end); + range_start = *it; + range_end = *it; + } + } + if (!slot_ranges.empty()) slot_ranges += ", "; + slot_ranges += + std::to_string(range_start) + "-" + std::to_string(range_end); + VMSDK_LOG(NOTICE, nullptr) << " slot ranges: " << slot_ranges; + } + } + + VMSDK_LOG(NOTICE, nullptr) << "=== End Cluster Map ==="; + + return new_map; +} + +} // namespace cluster_map +} // namespace vmsdk diff --git a/vmsdk/src/cluster_map.h b/vmsdk/src/cluster_map.h new file mode 100644 index 000000000..bd3155d3f --- /dev/null +++ b/vmsdk/src/cluster_map.h @@ -0,0 +1,119 @@ +/* + * Copyright (c) 2025, valkey-search contributors + * All rights reserved. + * SPDX-License-Identifier: BSD 3-Clause + * + */ + +#ifndef VMSDK_SRC_CLUSTER_MAP_H_ +#define VMSDK_SRC_CLUSTER_MAP_H_ + +#include +#include +#include +#include +#include +#include + +#include "absl/container/flat_hash_map.h" +#include "vmsdk/src/valkey_module_api/valkey_module.h" + +namespace vmsdk { +namespace cluster_map { + +const size_t k_num_slots = 16384; + +// Enumeration for fanout target modes +enum class FanoutTargetMode { + kRandom, // Default: randomly select one node per shard + kReplicasOnly, // Select only replicas, one per shard + kPrimary, // Select all primary (master) nodes + kAll // Select all nodes (both primary and replica) +}; + +struct NodeInfo { + enum NodeRole { kPrimary, kReplica }; + enum NodeLocation { + kLocal, + kRemote, + }; + std::string node_id; + NodeRole role; + NodeLocation location; + // Empty string if location is kLocal. + std::string address; + + bool operator==(const NodeInfo& other) const { + return role == other.role && location == other.location && + address == other.address; + } + + friend std::ostream& operator<<(std::ostream& os, const NodeInfo& target) { + os << "NodeInfo{role: " << target.role << ", location: " << target.location + << ", address: " << target.address << "}"; + return os; + } +}; + +struct ShardInfo { + // shard_id is the primary node id + std::string shard_id; + // primary node can be empty + std::optional primary; + std::vector replicas; + std::set owned_slots; + // Hash of owned_slots vector + uint64_t slots_fingerprint; +}; + +class ClusterMap { + public: + // Create a new cluster map by querying current cluster state + // This builds the map in the background and can be called from any thread + static std::shared_ptr CreateNewClusterMap(ValkeyModuleCtx* ctx); + + // return pre-generated target vectors + const std::vector& GetPrimaryTargets() const; + const std::vector& GetReplicaTargets() const; + const std::vector& GetAllTargets() const; + + bool GetIsClusterMapFull() const; + + // generate a random targets vector from cluster bus + std::vector GetRandomTargets(ValkeyModuleCtx* ctx); + + // slot ownership checks + bool IsSlotOwned(uint16_t slot) const; + + // shard lookups, will return nullptr if shard does not exist + const ShardInfo* GetShardById(std::string_view shard_id) const; + const absl::flat_hash_map& GetAllShards() const; + + // get cluster level slot fingerprint + uint64_t GetClusterSlotsFingerprint() const; + + private: + // 1: slot is owned by this cluster, 0: slot is not owned by this cluster + std::bitset owned_slots_; + + absl::flat_hash_map shards_; + + // Cluster-level fingerprint (hash of all shard fingerprints) + uint64_t cluster_slots_fingerprint_; + + bool is_cluster_map_full_; + + // Pre-computed target lists + std::vector primary_targets_; + std::vector replica_targets_; + std::vector all_targets_; + + // private helper function to refresh targets in CreateNewClusterMap + static std::vector GetTargets(ValkeyModuleCtx* ctx, + FanoutTargetMode target_mode); +}; + +} // namespace cluster_map +} // namespace vmsdk + +#endif // VMSDK_SRC_CLUSTER_MAP_H_