Skip to content

Commit 46197f6

Browse files
committed
update cluster map interface v2
Signed-off-by: Miles Song <[email protected]>
1 parent 8b81ad9 commit 46197f6

12 files changed

+449
-84
lines changed

src/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ target_link_libraries(valkey_search PUBLIC utils)
8080
target_link_libraries(valkey_search PUBLIC status_macros)
8181
target_link_libraries(valkey_search PUBLIC valkey_module)
8282
target_link_libraries(valkey_search PUBLIC acl)
83+
target_link_libraries(valkey_search PUBLIC cluster_map)
8384

8485
set(SRCS_KEYSPACE_EVENT_MANAGER
8586
${CMAKE_CURRENT_LIST_DIR}/keyspace_event_manager.cc

src/commands/ft_dropindex.cc

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
#include "src/schema_manager.h"
1414
#include "src/valkey_search.h"
1515
#include "src/valkey_search_options.h"
16+
#include "vmsdk/src/cluster_map.h"
1617
#include "vmsdk/src/status/status_macros.h"
1718
#include "vmsdk/src/type_conversions.h"
1819
#include "vmsdk/src/utils.h"
@@ -24,32 +25,36 @@ class DropConsistencyCheckFanoutOperation
2425
: public query::fanout::FanoutOperationBase<
2526
coordinator::InfoIndexPartitionRequest,
2627
coordinator::InfoIndexPartitionResponse,
27-
query::fanout::FanoutTargetMode::kAll> {
28+
vmsdk::cluster_map::FanoutTargetMode::kAll> {
2829
public:
2930
DropConsistencyCheckFanoutOperation(uint32_t db_num,
3031
const std::string& index_name,
3132
unsigned timeout_ms)
3233
: query::fanout::FanoutOperationBase<
3334
coordinator::InfoIndexPartitionRequest,
3435
coordinator::InfoIndexPartitionResponse,
35-
query::fanout::FanoutTargetMode::kAll>(),
36+
vmsdk::cluster_map::FanoutTargetMode::kAll>(),
3637
db_num_(db_num),
3738
index_name_(index_name),
3839
timeout_ms_(timeout_ms){};
3940

41+
std::vector<vmsdk::cluster_map::NodeInfo> GetTargets() const {
42+
return ValkeySearch::Instance().GetClusterMap()->GetAllTargets();
43+
}
44+
4045
unsigned GetTimeoutMs() const override { return timeout_ms_; }
4146

4247
coordinator::InfoIndexPartitionRequest GenerateRequest(
43-
const query::fanout::FanoutSearchTarget&) override {
48+
const vmsdk::cluster_map::NodeInfo&) override {
4449
coordinator::InfoIndexPartitionRequest req;
4550
req.set_db_num(db_num_);
4651
req.set_index_name(index_name_);
4752
return req;
4853
}
4954

50-
void OnResponse(const coordinator::InfoIndexPartitionResponse& resp,
51-
[[maybe_unused]] const query::fanout::FanoutSearchTarget&
52-
target) override {
55+
void OnResponse(
56+
const coordinator::InfoIndexPartitionResponse& resp,
57+
[[maybe_unused]] const vmsdk::cluster_map::NodeInfo& target) override {
5358
// if the index exist on some node and returns a valid response, treat it as
5459
// inconsistent error
5560
absl::MutexLock lock(&mutex_);
@@ -59,7 +64,7 @@ class DropConsistencyCheckFanoutOperation
5964
std::pair<grpc::Status, coordinator::InfoIndexPartitionResponse>
6065
GetLocalResponse(
6166
const coordinator::InfoIndexPartitionRequest& request,
62-
[[maybe_unused]] const query::fanout::FanoutSearchTarget&) override {
67+
[[maybe_unused]] const vmsdk::cluster_map::NodeInfo&) override {
6368
return coordinator::Service::GenerateInfoResponse(request);
6469
}
6570

src/query/cluster_info_fanout_operation.cc

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ ClusterInfoFanoutOperation::ClusterInfoFanoutOperation(
1616
uint32_t db_num, const std::string& index_name, unsigned timeout_ms)
1717
: fanout::FanoutOperationBase<coordinator::InfoIndexPartitionRequest,
1818
coordinator::InfoIndexPartitionResponse,
19-
fanout::FanoutTargetMode::kAll>(),
19+
vmsdk::cluster_map::FanoutTargetMode::kAll>(),
2020
db_num_(db_num),
2121
index_name_(index_name),
2222
timeout_ms_(timeout_ms),
@@ -25,12 +25,18 @@ ClusterInfoFanoutOperation::ClusterInfoFanoutOperation(
2525
backfill_complete_percent_min_(0.0f),
2626
backfill_in_progress_(false) {}
2727

28+
std::vector<vmsdk::cluster_map::NodeInfo>
29+
ClusterInfoFanoutOperation::GetTargets() const {
30+
return ValkeySearch::Instance().GetClusterMap()->GetAllTargets();
31+
}
32+
2833
unsigned ClusterInfoFanoutOperation::GetTimeoutMs() const {
2934
return timeout_ms_;
3035
}
3136

3237
coordinator::InfoIndexPartitionRequest
33-
ClusterInfoFanoutOperation::GenerateRequest(const fanout::FanoutSearchTarget&) {
38+
ClusterInfoFanoutOperation::GenerateRequest(
39+
const vmsdk::cluster_map::NodeInfo&) {
3440
coordinator::InfoIndexPartitionRequest req;
3541
req.set_db_num(db_num_);
3642
req.set_index_name(index_name_);
@@ -39,7 +45,7 @@ ClusterInfoFanoutOperation::GenerateRequest(const fanout::FanoutSearchTarget&) {
3945

4046
void ClusterInfoFanoutOperation::OnResponse(
4147
const coordinator::InfoIndexPartitionResponse& resp,
42-
[[maybe_unused]] const fanout::FanoutSearchTarget& target) {
48+
[[maybe_unused]] const vmsdk::cluster_map::NodeInfo& target) {
4349
if (!resp.error().empty()) {
4450
grpc::Status status =
4551
grpc::Status(grpc::StatusCode::INTERNAL, resp.error());
@@ -111,7 +117,7 @@ void ClusterInfoFanoutOperation::OnResponse(
111117
std::pair<grpc::Status, coordinator::InfoIndexPartitionResponse>
112118
ClusterInfoFanoutOperation::GetLocalResponse(
113119
const coordinator::InfoIndexPartitionRequest& request,
114-
[[maybe_unused]] const fanout::FanoutSearchTarget& target) {
120+
[[maybe_unused]] const vmsdk::cluster_map::NodeInfo& target) {
115121
return coordinator::Service::GenerateInfoResponse(request);
116122
}
117123

src/query/cluster_info_fanout_operation.h

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,28 +17,34 @@
1717
#include "src/coordinator/coordinator.pb.h"
1818
#include "src/query/fanout_operation_base.h"
1919
#include "src/query/fanout_template.h"
20+
#include "vmsdk/src/cluster_map.h"
2021

2122
namespace valkey_search::query::cluster_info_fanout {
2223

23-
class ClusterInfoFanoutOperation : public fanout::FanoutOperationBase<
24-
coordinator::InfoIndexPartitionRequest,
25-
coordinator::InfoIndexPartitionResponse,
26-
fanout::FanoutTargetMode::kAll> {
24+
class ClusterInfoFanoutOperation
25+
: public fanout::FanoutOperationBase<
26+
coordinator::InfoIndexPartitionRequest,
27+
coordinator::InfoIndexPartitionResponse,
28+
vmsdk::cluster_map::FanoutTargetMode::kAll> {
2729
public:
2830
ClusterInfoFanoutOperation(uint32_t db_num, const std::string& index_name,
2931
unsigned timeout_ms);
3032

33+
std::vector<vmsdk::cluster_map::NodeInfo> GetTargets() const override;
34+
3135
unsigned GetTimeoutMs() const override;
3236

3337
coordinator::InfoIndexPartitionRequest GenerateRequest(
34-
const fanout::FanoutSearchTarget&) override;
38+
const vmsdk::cluster_map::NodeInfo&) override;
3539

36-
void OnResponse(const coordinator::InfoIndexPartitionResponse& resp,
37-
[[maybe_unused]] const fanout::FanoutSearchTarget&) override;
40+
void OnResponse(
41+
const coordinator::InfoIndexPartitionResponse& resp,
42+
[[maybe_unused]] const vmsdk::cluster_map::NodeInfo&) override;
3843

3944
std::pair<grpc::Status, coordinator::InfoIndexPartitionResponse>
40-
GetLocalResponse(const coordinator::InfoIndexPartitionRequest& request,
41-
[[maybe_unused]] const fanout::FanoutSearchTarget&) override;
45+
GetLocalResponse(
46+
const coordinator::InfoIndexPartitionRequest& request,
47+
[[maybe_unused]] const vmsdk::cluster_map::NodeInfo&) override;
4248

4349
void InvokeRemoteRpc(
4450
coordinator::Client* client,

src/query/fanout_operation_base.h

Lines changed: 40 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include "src/utils/cancel.h"
2222
#include "src/valkey_search.h"
2323
#include "vmsdk/src/blocked_client.h"
24+
#include "vmsdk/src/cluster_map.h"
2425
#include "vmsdk/src/debug.h"
2526
#include "vmsdk/src/log.h"
2627
#include "vmsdk/src/module_config.h"
@@ -30,7 +31,8 @@ namespace valkey_search::query::fanout {
3031

3132
constexpr unsigned kNoValkeyTimeout = 86400000;
3233

33-
template <typename Request, typename Response, FanoutTargetMode kTargetMode>
34+
template <typename Request, typename Response,
35+
vmsdk::cluster_map::FanoutTargetMode kTargetMode>
3436
class FanoutOperationBase {
3537
public:
3638
explicit FanoutOperationBase() = default;
@@ -43,7 +45,19 @@ class FanoutOperationBase {
4345
blocked_client_->MeasureTimeStart();
4446
deadline_tp_ = std::chrono::steady_clock::now() +
4547
std::chrono::milliseconds(GetTimeoutMs());
46-
targets_ = GetTargets(ctx);
48+
49+
targets_ = GetTargets();
50+
// If we only got 1 local target, the cluster map is likely stale
51+
// Refresh it to get the full cluster topology
52+
if (targets_.size() == 1 &&
53+
targets_[0].location == vmsdk::cluster_map::NodeInfo::kLocal) {
54+
VMSDK_LOG(NOTICE, ctx)
55+
<< "Only 1 local target found in cluster mode, "
56+
<< "refreshing cluster map to get full topology...";
57+
ValkeySearch::Instance().RefreshClusterMap(ctx);
58+
VMSDK_LOG(NOTICE, ctx) << "Cluster map refreshed";
59+
}
60+
4761
StartFanoutRound();
4862
}
4963

@@ -77,6 +91,7 @@ class FanoutOperationBase {
7791
}
7892

7993
void StartFanoutRound() {
94+
targets_ = GetTargets();
8095
outstanding_ = targets_.size();
8196
unsigned timeout_ms = GetTimeoutMs();
8297
for (const auto& target : targets_) {
@@ -85,16 +100,14 @@ class FanoutOperationBase {
85100
}
86101
}
87102

88-
std::vector<FanoutSearchTarget> GetTargets(ValkeyModuleCtx* ctx) const {
89-
return query::fanout::FanoutTemplate::GetTargets(ctx, kTargetMode);
90-
}
103+
virtual std::vector<vmsdk::cluster_map::NodeInfo> GetTargets() const = 0;
91104

92-
void IssueRpc(const FanoutSearchTarget& target, const Request& request,
93-
unsigned timeout_ms) {
105+
void IssueRpc(const vmsdk::cluster_map::NodeInfo& target,
106+
const Request& request, unsigned timeout_ms) {
94107
coordinator::ClientPool* client_pool_ =
95108
ValkeySearch::Instance().GetCoordinatorClientPool();
96109

97-
if (target.type == FanoutSearchTarget::Type::kLocal) {
110+
if (target.location == vmsdk::cluster_map::NodeInfo::kLocal) {
98111
vmsdk::RunByMain([this, target, request]() {
99112
auto [status, resp] = this->GetLocalResponse(request, target);
100113
if (status.ok()) {
@@ -151,7 +164,7 @@ class FanoutOperationBase {
151164
}
152165

153166
virtual std::pair<grpc::Status, Response> GetLocalResponse(
154-
const Request&, [[maybe_unused]] const FanoutSearchTarget&) = 0;
167+
const Request&, [[maybe_unused]] const vmsdk::cluster_map::NodeInfo&) = 0;
155168

156169
virtual void InvokeRemoteRpc(coordinator::Client*, const Request&,
157170
std::function<void(grpc::Status, Response&)>,
@@ -160,14 +173,15 @@ class FanoutOperationBase {
160173
virtual unsigned GetTimeoutMs() const = 0;
161174

162175
virtual Request GenerateRequest(
163-
[[maybe_unused]] const FanoutSearchTarget&) = 0;
176+
[[maybe_unused]] const vmsdk::cluster_map::NodeInfo&) = 0;
164177

165-
virtual void OnResponse(const Response&,
166-
[[maybe_unused]] const FanoutSearchTarget&) = 0;
178+
virtual void OnResponse(
179+
const Response&,
180+
[[maybe_unused]] const vmsdk::cluster_map::NodeInfo&) = 0;
167181

168182
virtual void OnError(grpc::Status status,
169183
coordinator::FanoutErrorType error_type,
170-
const FanoutSearchTarget& target) {
184+
const vmsdk::cluster_map::NodeInfo& target) {
171185
absl::MutexLock lock(&mutex_);
172186
if (error_type == coordinator::FanoutErrorType::INDEX_NAME_ERROR) {
173187
index_name_error_nodes.push_back(target);
@@ -205,8 +219,9 @@ class FanoutOperationBase {
205219
// Log index name errors
206220
if (!index_name_error_nodes.empty()) {
207221
error_message = "Index name not found.";
208-
for (const FanoutSearchTarget& target : index_name_error_nodes) {
209-
if (target.type == FanoutSearchTarget::Type::kLocal) {
222+
for (const vmsdk::cluster_map::NodeInfo& target :
223+
index_name_error_nodes) {
224+
if (target.location == vmsdk::cluster_map::NodeInfo::kLocal) {
210225
VMSDK_LOG_EVERY_N_SEC(WARNING, ctx, 1)
211226
<< INDEX_NAME_ERROR_LOG_PREFIX << "LOCAL NODE";
212227
} else {
@@ -218,8 +233,9 @@ class FanoutOperationBase {
218233
// Log communication errors
219234
if (!communication_error_nodes.empty()) {
220235
error_message = "Communication error between nodes found.";
221-
for (const FanoutSearchTarget& target : communication_error_nodes) {
222-
if (target.type == FanoutSearchTarget::Type::kLocal) {
236+
for (const vmsdk::cluster_map::NodeInfo& target :
237+
communication_error_nodes) {
238+
if (target.location == vmsdk::cluster_map::NodeInfo::kLocal) {
223239
VMSDK_LOG_EVERY_N_SEC(WARNING, ctx, 1)
224240
<< COMMUNICATION_ERROR_LOG_PREFIX << "LOCAL NODE";
225241
} else {
@@ -231,8 +247,9 @@ class FanoutOperationBase {
231247
// Log inconsistent state errors
232248
if (!inconsistent_state_error_nodes.empty()) {
233249
error_message = "Inconsistent index state error found.";
234-
for (const FanoutSearchTarget& target : inconsistent_state_error_nodes) {
235-
if (target.type == FanoutSearchTarget::Type::kLocal) {
250+
for (const vmsdk::cluster_map::NodeInfo& target :
251+
inconsistent_state_error_nodes) {
252+
if (target.location == vmsdk::cluster_map::NodeInfo::kLocal) {
236253
VMSDK_LOG_EVERY_N_SEC(WARNING, ctx, 1)
237254
<< INCONSISTENT_STATE_ERROR_LOG_PREFIX << "LOCAL NODE";
238255
} else {
@@ -286,10 +303,10 @@ class FanoutOperationBase {
286303
unsigned outstanding_{0};
287304
absl::Mutex mutex_;
288305
std::unique_ptr<vmsdk::BlockedClient> blocked_client_;
289-
std::vector<FanoutSearchTarget> index_name_error_nodes;
290-
std::vector<FanoutSearchTarget> inconsistent_state_error_nodes;
291-
std::vector<FanoutSearchTarget> communication_error_nodes;
292-
std::vector<FanoutSearchTarget> targets_;
306+
std::vector<vmsdk::cluster_map::NodeInfo> index_name_error_nodes;
307+
std::vector<vmsdk::cluster_map::NodeInfo> inconsistent_state_error_nodes;
308+
std::vector<vmsdk::cluster_map::NodeInfo> communication_error_nodes;
309+
std::vector<vmsdk::cluster_map::NodeInfo> targets_;
293310
std::chrono::steady_clock::time_point deadline_tp_;
294311
bool timeout_occurred_ = false;
295312
};

src/query/primary_info_fanout_operation.cc

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,16 @@
99

1010
#include "src/coordinator/metadata_manager.h"
1111
#include "src/schema_manager.h"
12+
#include "vmsdk/src/cluster_map.h"
1213

1314
namespace valkey_search::query::primary_info_fanout {
1415

1516
PrimaryInfoFanoutOperation::PrimaryInfoFanoutOperation(
1617
uint32_t db_num, const std::string& index_name, unsigned timeout_ms)
17-
: fanout::FanoutOperationBase<coordinator::InfoIndexPartitionRequest,
18-
coordinator::InfoIndexPartitionResponse,
19-
fanout::FanoutTargetMode::kPrimary>(),
18+
: fanout::FanoutOperationBase<
19+
coordinator::InfoIndexPartitionRequest,
20+
coordinator::InfoIndexPartitionResponse,
21+
vmsdk::cluster_map::FanoutTargetMode::kPrimary>(),
2022
db_num_(db_num),
2123
index_name_(index_name),
2224
timeout_ms_(timeout_ms),
@@ -25,12 +27,22 @@ PrimaryInfoFanoutOperation::PrimaryInfoFanoutOperation(
2527
num_records_(0),
2628
hash_indexing_failures_(0) {}
2729

30+
std::vector<vmsdk::cluster_map::NodeInfo>
31+
PrimaryInfoFanoutOperation::GetTargets() const {
32+
auto targets = ValkeySearch::Instance().GetClusterMap()->GetPrimaryTargets();
33+
for (size_t i = 0; i < targets.size(); ++i) {
34+
VMSDK_LOG(NOTICE, nullptr) << "Target[" << i << "]: " << targets[i];
35+
}
36+
return targets;
37+
}
38+
2839
unsigned PrimaryInfoFanoutOperation::GetTimeoutMs() const {
2940
return timeout_ms_;
3041
}
3142

3243
coordinator::InfoIndexPartitionRequest
33-
PrimaryInfoFanoutOperation::GenerateRequest(const fanout::FanoutSearchTarget&) {
44+
PrimaryInfoFanoutOperation::GenerateRequest(
45+
const vmsdk::cluster_map::NodeInfo&) {
3446
coordinator::InfoIndexPartitionRequest req;
3547
req.set_db_num(db_num_);
3648
req.set_index_name(index_name_);
@@ -39,7 +51,7 @@ PrimaryInfoFanoutOperation::GenerateRequest(const fanout::FanoutSearchTarget&) {
3951

4052
void PrimaryInfoFanoutOperation::OnResponse(
4153
const coordinator::InfoIndexPartitionResponse& resp,
42-
[[maybe_unused]] const fanout::FanoutSearchTarget& target) {
54+
[[maybe_unused]] const vmsdk::cluster_map::NodeInfo& target) {
4355
if (!resp.error().empty()) {
4456
grpc::Status status =
4557
grpc::Status(grpc::StatusCode::INTERNAL, resp.error());
@@ -96,7 +108,7 @@ void PrimaryInfoFanoutOperation::OnResponse(
96108
std::pair<grpc::Status, coordinator::InfoIndexPartitionResponse>
97109
PrimaryInfoFanoutOperation::GetLocalResponse(
98110
const coordinator::InfoIndexPartitionRequest& request,
99-
[[maybe_unused]] const fanout::FanoutSearchTarget& target) {
111+
[[maybe_unused]] const vmsdk::cluster_map::NodeInfo& target) {
100112
return coordinator::Service::GenerateInfoResponse(request);
101113
}
102114

0 commit comments

Comments
 (0)