From a1b12a5c4ae6206bb3e45cc2ea6d9a5b686c3a55 Mon Sep 17 00:00:00 2001 From: bertramlin Date: Tue, 17 Jun 2025 11:06:20 +0800 Subject: [PATCH] add command "addslotsrange" and "delslotsrange" --- src/tendisplus/cluster/cluster_manager.cpp | 2 +- src/tendisplus/cluster/cluster_test.cpp | 119 ++++++++++++++- src/tendisplus/commands/cluster.cpp | 165 +++++++++++++++------ src/tendisplus/utils/test_util.cpp | 22 +++ src/tendisplus/utils/test_util.h | 5 +- 5 files changed, 259 insertions(+), 54 deletions(-) diff --git a/src/tendisplus/cluster/cluster_manager.cpp b/src/tendisplus/cluster/cluster_manager.cpp index ac2423d3..99567b6b 100644 --- a/src/tendisplus/cluster/cluster_manager.cpp +++ b/src/tendisplus/cluster/cluster_manager.cpp @@ -555,7 +555,7 @@ std::bitset ClusterNode::getSlots() const { std::lock_guard lk(_mutex); return _mySlots; } -// Now the function is only called by ClusterState::clusterSaveNodes() + std::vector ClusterNode::getSlotsVec() { std::lock_guard lk(_mutex); if (_slotsInfoIsOutOfDate) { diff --git a/src/tendisplus/cluster/cluster_test.cpp b/src/tendisplus/cluster/cluster_test.cpp index 76248b44..ba79d37c 100644 --- a/src/tendisplus/cluster/cluster_test.cpp +++ b/src/tendisplus/cluster/cluster_test.cpp @@ -981,7 +981,9 @@ bool compareNodeName(std::shared_ptr svr1, } // if slot set successfully , return ture -bool checkSlotInfo(std::shared_ptr node, std::string slots) { +bool checkSlotInfo(std::shared_ptr node, + std::string slots, + bool del) { auto slotInfo = node->getSlots(); if ((slots.find('{') != std::string::npos) && (slots.find('}') != std::string::npos)) { @@ -995,9 +997,12 @@ bool checkSlotInfo(std::shared_ptr node, std::string slots) { auto end = endSlot.value(); if (start < end) { for (size_t i = start; i < end; i++) { - if (!slotInfo.test(i)) { + if (!del && !slotInfo.test(i)) { LOG(ERROR) << "set slot" << i << "fail"; return false; + } else if (del && slotInfo.test(i)) { + LOG(ERROR) << "del slot" << i << "fail"; + return false; } } return true; @@ -1237,7 +1242,7 @@ TEST(Cluster, Random_Meet) { servers.clear(); } -TEST(Cluster, AddSlot) { +void ChangeSlotsTest() { std::vector dirs = {"node1", "node2"}; uint32_t startPort = 16300; @@ -1264,34 +1269,129 @@ TEST(Cluster, AddSlot) { WorkLoad work1(node1, sess1); work1.init(); + auto ctx2 = std::make_shared(); + auto sess2 = makeSession(node2, ctx2); + WorkLoad work2(node2, sess2); + work2.init(); + work1.clusterMeet(node2->getParams()->bindIp, node2->getParams()->port); waitClusterMeetEnd(servers); - + // addslots test std::vector slots = {"{0..8000}", "{8001..16383}"}; work1.addSlots(slots[0]); std::this_thread::sleep_for(std::chrono::seconds(10)); + work2.addSlots(slots[1]); + std::this_thread::sleep_for(std::chrono::seconds(10)); + + for (size_t i = 0; i < slots.size(); i++) { + auto nodePtr = + servers[i]->getClusterMgr()->getClusterState()->getMyselfNode(); + bool s = checkSlotInfo(nodePtr, slots[i], false); + EXPECT_TRUE(s); + } + + std::this_thread::sleep_for(std::chrono::seconds(10)); + for (auto svr : servers) { + compareClusterInfo(svr, node1); + } + // delslots test + std::vector delslots = {"{4000..8000}", "{15000..16383}"}; + work1.delSlots(delslots[0]); + std::this_thread::sleep_for(std::chrono::seconds(10)); + work2.delSlots(delslots[1]); + std::this_thread::sleep_for(std::chrono::seconds(10)); + for (size_t i = 0; i < delslots.size(); i++) { + auto nodePtr = + servers[i]->getClusterMgr()->getClusterState()->getMyselfNode(); + bool s = checkSlotInfo(nodePtr, delslots[i], true); + EXPECT_TRUE(s); + } + std::this_thread::sleep_for(std::chrono::seconds(10)); + +#ifndef _WIN32 + for (auto svr : servers) { + svr->stop(); + LOG(INFO) << "stop " << svr->getParams()->port << " success"; + } +#endif + servers.clear(); +} + +void RangeChangeSlotsTest() { + std::vector dirs = {"node1", "node2"}; + uint32_t startPort = 17300; + + const auto guard = MakeGuard([dirs] { + for (auto dir : dirs) { + destroyEnv(dir); + } + std::this_thread::sleep_for(std::chrono::seconds(5)); + }); + + std::vector> servers; + + uint32_t index = 0; + for (auto dir : dirs) { + uint32_t nodePort = startPort + index++; + servers.emplace_back(std::move(makeClusterNode(dir, nodePort, storeCnt))); + } + + auto& node1 = servers[0]; + auto& node2 = servers[1]; + + auto ctx1 = std::make_shared(); + auto sess1 = makeSession(node1, ctx1); + WorkLoad work1(node1, sess1); + work1.init(); auto ctx2 = std::make_shared(); auto sess2 = makeSession(node2, ctx2); WorkLoad work2(node2, sess2); work2.init(); - work2.addSlots(slots[1]); + work1.clusterMeet(node2->getParams()->bindIp, node2->getParams()->port); + waitClusterMeetEnd(servers); + + // addslotsrange test + std::vector> slots = {{0, 8000}, {8001, 16383}}; + work1.addSlotsRange(slots[0].first, slots[0].second); std::this_thread::sleep_for(std::chrono::seconds(10)); + work2.addSlotsRange(slots[1].first, slots[1].second); + std::this_thread::sleep_for(std::chrono::seconds(10)); for (size_t i = 0; i < slots.size(); i++) { auto nodePtr = servers[i]->getClusterMgr()->getClusterState()->getMyselfNode(); - bool s = checkSlotInfo(nodePtr, slots[i]); + bool s = checkSlotInfo(nodePtr, + "{" + std::to_string(slots[i].first) + ".." + + std::to_string(slots[i].second) + "}", + false); EXPECT_TRUE(s); } - std::this_thread::sleep_for(std::chrono::seconds(10)); for (auto svr : servers) { compareClusterInfo(svr, node1); } + // delslotsrange test + std::vector> delslots = {{4000, 8000}, + {15000, 16383}}; + work1.delSlotsRange(delslots[0].first, delslots[0].second); + std::this_thread::sleep_for(std::chrono::seconds(10)); + + work2.delSlotsRange(delslots[1].first, delslots[1].second); + std::this_thread::sleep_for(std::chrono::seconds(10)); + for (size_t i = 0; i < delslots.size(); i++) { + auto nodePtr = + servers[i]->getClusterMgr()->getClusterState()->getMyselfNode(); + bool s = checkSlotInfo(nodePtr, + "{" + std::to_string(delslots[i].first) + ".." + + std::to_string(delslots[i].second) + "}", + true); + EXPECT_TRUE(s); + } + std::this_thread::sleep_for(std::chrono::seconds(10)); #ifndef _WIN32 for (auto svr : servers) { svr->stop(); @@ -1301,6 +1401,11 @@ TEST(Cluster, AddSlot) { servers.clear(); } +TEST(Cluster, ChangeSlot) { + ChangeSlotsTest(); + RangeChangeSlotsTest(); +} + bool nodeIsMySlave(std::shared_ptr svr1, std::shared_ptr svr2) { if (svr1->getParams()->clusterEnabled && svr2->getParams()->clusterEnabled) { diff --git a/src/tendisplus/commands/cluster.cpp b/src/tendisplus/commands/cluster.cpp index b3218403..1f0c3460 100644 --- a/src/tendisplus/commands/cluster.cpp +++ b/src/tendisplus/commands/cluster.cpp @@ -136,7 +136,7 @@ class ClusterCommand : public Command { Expected exptSlot = ::tendisplus::stoll(vs); RET_IF_ERR_EXPECTED(exptSlot); - int32_t slot = (int32_t)exptSlot.value(); + int32_t slot = static_cast(exptSlot.value()); if (slot > CLUSTER_SLOTS - 1 || slot < 0) { LOG(ERROR) << "slot" << slot @@ -335,48 +335,17 @@ class ClusterCommand : public Command { return {ErrorCodes::ERR_CLUSTER, "Invalid cluster nodes info"}; } } else if ((arg1 == "addslots" || arg1 == "delslots") && argSize >= 3) { - if (myself->nodeIsSlave()) { - return {ErrorCodes::ERR_CLUSTER, - "slave node can not be addslot or delslot"}; - } - if (myself->nodeIsArbiter()) { - return {ErrorCodes::ERR_CLUSTER, "Can not add/del slots on arbiter."}; + Status s = processSlotRange(args, 2, arg1, svr, clusterState, myself); + if (!s.ok()) { + return s; } - for (size_t i = 2; i < argSize; ++i) { - if ((args[i].find('{') != std::string::npos) && - (args[i].find('}') != std::string::npos)) { - auto eRange = getSlotRange(args[i]); - - RET_IF_ERR_EXPECTED(eRange); - uint32_t start = eRange.value().first; - uint32_t end = eRange.value().second; - - if (svr->getParams()->clusterSingleNode && - (end - start) != (CLUSTER_SLOTS - 1)) { - return { - ErrorCodes::ERR_CLUSTER, - "You can only addslot 0..16383 when cluster-single-node is on"}; - } - - Status s = changeSlots(start, end, arg1, svr, clusterState, myself); - if (!s.ok()) { - LOG(ERROR) << "addslots fail from:" << start << "to:" << end; - return s; - } - } else { - auto slotInfo = ::tendisplus::stoul(args[i]); - - RET_IF_ERR_EXPECTED(slotInfo); - uint32_t slot = static_cast(slotInfo.value()); - Status s = changeSlot(slot, arg1, svr, clusterState, myself); - if (!s.ok()) { - LOG(ERROR) << "addslots:" << slot << "fail"; - return s; - } - } + return Command::fmtOK(); + } else if ((arg1 == "addslotsrange" || arg1 == "delslotsrange") && + argSize >= 4) { + Status s = processSlotRange(args, 2, arg1, svr, clusterState, myself); + if (!s.ok()) { + return s; } - clusterState->clusterSaveNodes(); - clusterState->clusterUpdateState(); return Command::fmtOK(); } else if (arg1 == "replicate" && argSize == 3) { if (!svr->getParams()->binlogEnabled) { @@ -445,7 +414,7 @@ class ClusterCommand : public Command { return {ErrorCodes::ERR_CLUSTER, "keyslot invalid!"}; } uint32_t hash = - uint32_t(redis_port::keyHashSlot(key.c_str(), key.size())); + static_cast(redis_port::keyHashSlot(key.c_str(), key.size())); return Command::fmtBulk(std::to_string(hash)); } else if (arg1 == "info" && argSize == 2) { std::string clusterInfo = clusterState->clusterGenStateDescription(); @@ -687,7 +656,7 @@ class ClusterCommand : public Command { if (start < end) { for (size_t i = start; i < end + 1; i++) { uint32_t index = static_cast(i); - if (arg == "addslots") { + if (arg == "addslots" || arg == "addslotsrange") { if (clusterState->_allSlots[index] != nullptr) { LOG(ERROR) << "slot" << index << "already busy"; continue; @@ -713,7 +682,7 @@ class ClusterCommand : public Command { << "to:" << end; return {ErrorCodes::ERR_CLUSTER, "ERR Invalid or out of range slot"}; } - return {ErrorCodes::ERR_OK, "finish addslots"}; + return {ErrorCodes::ERR_OK, "finish change slots"}; } Status changeSlot(uint32_t slot, @@ -742,7 +711,113 @@ class ClusterCommand : public Command { if (result == false) { return {ErrorCodes::ERR_CLUSTER, "del or add slot fail"}; } - return {ErrorCodes::ERR_OK, "finish add sigle slot"}; + return {ErrorCodes::ERR_OK, "finish change sigle slot"}; + } + + Status processSlotRange(const std::vector& args, + size_t startIndex, + const std::string& cmd, + ServerEntry* svr, + const std::shared_ptr clusterState, + const CNodePtr myself) { + // Check node role + if (myself->nodeIsSlave()) { + return {ErrorCodes::ERR_CLUSTER, "slave node cannot add or delete slots"}; + } + if (myself->nodeIsArbiter()) { + return {ErrorCodes::ERR_CLUSTER, "Cannot add or delete slots on arbiter"}; + } + + // Parse and process slot ranges or individual slots + if (cmd == "addslots" || cmd == "delslots") { + for (size_t i = startIndex; i < args.size(); ++i) { + if ((args[i].find('{') != std::string::npos) && + (args[i].find('}') != std::string::npos)) { + // Handle slot range + auto eRange = getSlotRange(args[i]); + RET_IF_ERR_EXPECTED(eRange); + uint32_t start = eRange.value().first; + uint32_t end = eRange.value().second; + + // Validate and process the slot range + Status s = processSlotRangeInternal( + start, end, cmd, svr, clusterState, myself); + if (!s.ok()) { + return s; + } + } else { + // Handle single slot + auto slotInfo = ::tendisplus::stoul(args[i]); + RET_IF_ERR_EXPECTED(slotInfo); + uint32_t slot = static_cast(slotInfo.value()); + + // Validate and process the single slot + Status s = changeSlot(slot, cmd, svr, clusterState, myself); + if (!s.ok()) { + LOG(ERROR) << cmd << ":" << slot << " failed"; + return s; + } + } + } + } else if (cmd == "addslotsrange" || cmd == "delslotsrange") { + if ((args.size() - startIndex) % 2 != 0) { + return {ErrorCodes::ERR_CLUSTER, "Invalid number of arguments"}; + } + + for (size_t i = startIndex; i < args.size(); i += 2) { + auto startInfo = ::tendisplus::stoul(args[i]); + RET_IF_ERR_EXPECTED(startInfo); + auto endInfo = ::tendisplus::stoul(args[i + 1]); + RET_IF_ERR_EXPECTED(endInfo); + uint32_t start = static_cast(startInfo.value()); + uint32_t end = static_cast(endInfo.value()); + + // Validate and process the slot range + Status s = + processSlotRangeInternal(start, end, cmd, svr, clusterState, myself); + if (!s.ok()) { + return s; + } + } + } else { + return {ErrorCodes::ERR_CLUSTER, "Invalid command for slot processing"}; + } + + // Save and update cluster state + clusterState->clusterSaveNodes(); + clusterState->clusterUpdateState(); + + return {ErrorCodes::ERR_OK, "All slots processed successfully"}; + } + + Status processSlotRangeInternal( + uint32_t start, + uint32_t end, + const std::string& cmd, + ServerEntry* svr, + const std::shared_ptr clusterState, + const CNodePtr myself) { + // Validate slot range + if (start > CLUSTER_SLOTS - 1 || end > CLUSTER_SLOTS - 1 || start > end) { + return {ErrorCodes::ERR_CLUSTER, "Invalid slot range"}; + } + + // Check cluster-single-node mode + if (svr->getParams()->clusterSingleNode && + (end - start) != (CLUSTER_SLOTS - 1)) { + return {ErrorCodes::ERR_CLUSTER, + "You can only addslots/delslots[range] from 0 to 16383 when " + "cluster-single-node is on"}; + } + + // Use changeSlots to process the range + Status s = changeSlots(start, end, cmd, svr, clusterState, myself); + if (!s.ok()) { + LOG(ERROR) << cmd << " failed for range: " << start << " to " << end; + return s; + } + + return {ErrorCodes::ERR_OK, "Slot range processed successfully"}; } std::string getKeys(ServerEntry* svr, uint32_t slot, uint32_t count) { diff --git a/src/tendisplus/utils/test_util.cpp b/src/tendisplus/utils/test_util.cpp index e49e53e2..d634b417 100644 --- a/src/tendisplus/utils/test_util.cpp +++ b/src/tendisplus/utils/test_util.cpp @@ -615,6 +615,28 @@ void WorkLoad::addSlots(const std::string& slotsBuff) { EXPECT_TRUE(expect.ok()); } +void WorkLoad::delSlots(const std::string& slotsBuff) { + _session->setArgs({"cluster", "delslots", slotsBuff}); + + auto expect = Command::runSessionCmd(_session.get()); + EXPECT_TRUE(expect.ok()); +} + +void WorkLoad::addSlotsRange(uint32_t start, uint32_t end) { + _session->setArgs( + {"cluster", "addslotsrange", std::to_string(start), std::to_string(end)}); + + auto expect = Command::runSessionCmd(_session.get()); + EXPECT_TRUE(expect.ok()); +} +void WorkLoad::delSlotsRange(uint32_t start, uint32_t end) { + _session->setArgs( + {"cluster", "delslotsrange", std::to_string(start), std::to_string(end)}); + + auto expect = Command::runSessionCmd(_session.get()); + EXPECT_TRUE(expect.ok()); +} + void WorkLoad::replicate(const std::string& nodeName) { _session->setArgs({"cluster", "replicate", nodeName}); diff --git a/src/tendisplus/utils/test_util.h b/src/tendisplus/utils/test_util.h index 297cded0..711c4cce 100644 --- a/src/tendisplus/utils/test_util.h +++ b/src/tendisplus/utils/test_util.h @@ -138,7 +138,7 @@ class WorkLoad { : _session(session), _max_key_len(32) {} void init() { - std::srand((uint32_t)msSinceEpoch()); + std::srand(static_cast(msSinceEpoch())); } KeysWritten writeWork(RecordType, uint32_t count, @@ -155,6 +155,9 @@ class WorkLoad { void clusterNodes(); void clusterSlots(); void addSlots(const std::string& slotsBuff); + void delSlots(const std::string& slotsBuff); + void addSlotsRange(uint32_t start, uint32_t end); + void delSlotsRange(uint32_t start, uint32_t end); void replicate(const std::string& nodeName); bool manualFailover(); void lockDb(mstime_t locktime);