Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/tendisplus/cluster/cluster_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ std::bitset<CLUSTER_SLOTS> ClusterNode::getSlots() const {
std::lock_guard<myMutex> lk(_mutex);
return _mySlots;
}
// Now the function is only called by ClusterState::clusterSaveNodes()

std::vector<uint16_t> ClusterNode::getSlotsVec() {
std::lock_guard<myMutex> lk(_mutex);
if (_slotsInfoIsOutOfDate) {
Expand Down
119 changes: 112 additions & 7 deletions src/tendisplus/cluster/cluster_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -981,7 +981,9 @@ bool compareNodeName(std::shared_ptr<ServerEntry> svr1,
}

// if slot set successfully , return ture
bool checkSlotInfo(std::shared_ptr<ClusterNode> node, std::string slots) {
bool checkSlotInfo(std::shared_ptr<ClusterNode> node,
std::string slots,
bool del) {
auto slotInfo = node->getSlots();
if ((slots.find('{') != std::string::npos) &&
(slots.find('}') != std::string::npos)) {
Expand All @@ -995,9 +997,12 @@ bool checkSlotInfo(std::shared_ptr<ClusterNode> node, std::string slots) {
auto end = endSlot.value();
if (start < end) {
for (size_t i = start; i < end; i++) {
if (!slotInfo.test(i)) {
if (!del && !slotInfo.test(i)) {
LOG(ERROR) << "set slot" << i << "fail";
return false;
} else if (del && slotInfo.test(i)) {
LOG(ERROR) << "del slot" << i << "fail";
return false;
}
}
return true;
Expand Down Expand Up @@ -1237,7 +1242,7 @@ TEST(Cluster, Random_Meet) {
servers.clear();
}

TEST(Cluster, AddSlot) {
void ChangeSlotsTest() {
std::vector<std::string> dirs = {"node1", "node2"};
uint32_t startPort = 16300;

Expand All @@ -1264,34 +1269,129 @@ TEST(Cluster, AddSlot) {
WorkLoad work1(node1, sess1);
work1.init();

auto ctx2 = std::make_shared<asio::io_context>();
auto sess2 = makeSession(node2, ctx2);
WorkLoad work2(node2, sess2);
work2.init();

work1.clusterMeet(node2->getParams()->bindIp, node2->getParams()->port);
waitClusterMeetEnd(servers);

// addslots test
std::vector<std::string> slots = {"{0..8000}", "{8001..16383}"};

work1.addSlots(slots[0]);
std::this_thread::sleep_for(std::chrono::seconds(10));

work2.addSlots(slots[1]);
std::this_thread::sleep_for(std::chrono::seconds(10));

for (size_t i = 0; i < slots.size(); i++) {
auto nodePtr =
servers[i]->getClusterMgr()->getClusterState()->getMyselfNode();
bool s = checkSlotInfo(nodePtr, slots[i], false);
EXPECT_TRUE(s);
}

std::this_thread::sleep_for(std::chrono::seconds(10));
for (auto svr : servers) {
compareClusterInfo(svr, node1);
}
// delslots test
std::vector<std::string> delslots = {"{4000..8000}", "{15000..16383}"};
work1.delSlots(delslots[0]);
std::this_thread::sleep_for(std::chrono::seconds(10));
work2.delSlots(delslots[1]);
std::this_thread::sleep_for(std::chrono::seconds(10));
for (size_t i = 0; i < delslots.size(); i++) {
auto nodePtr =
servers[i]->getClusterMgr()->getClusterState()->getMyselfNode();
bool s = checkSlotInfo(nodePtr, delslots[i], true);
EXPECT_TRUE(s);
}
std::this_thread::sleep_for(std::chrono::seconds(10));

#ifndef _WIN32
for (auto svr : servers) {
svr->stop();
LOG(INFO) << "stop " << svr->getParams()->port << " success";
}
#endif
servers.clear();
}

void RangeChangeSlotsTest() {
std::vector<std::string> dirs = {"node1", "node2"};
uint32_t startPort = 17300;

const auto guard = MakeGuard([dirs] {
for (auto dir : dirs) {
destroyEnv(dir);
}
std::this_thread::sleep_for(std::chrono::seconds(5));
});

std::vector<std::shared_ptr<ServerEntry>> servers;

uint32_t index = 0;
for (auto dir : dirs) {
uint32_t nodePort = startPort + index++;
servers.emplace_back(std::move(makeClusterNode(dir, nodePort, storeCnt)));
}

auto& node1 = servers[0];
auto& node2 = servers[1];

auto ctx1 = std::make_shared<asio::io_context>();
auto sess1 = makeSession(node1, ctx1);
WorkLoad work1(node1, sess1);
work1.init();
auto ctx2 = std::make_shared<asio::io_context>();
auto sess2 = makeSession(node2, ctx2);
WorkLoad work2(node2, sess2);
work2.init();
work2.addSlots(slots[1]);

work1.clusterMeet(node2->getParams()->bindIp, node2->getParams()->port);
waitClusterMeetEnd(servers);

// addslotsrange test
std::vector<std::pair<uint32_t, uint32_t>> slots = {{0, 8000}, {8001, 16383}};
work1.addSlotsRange(slots[0].first, slots[0].second);
std::this_thread::sleep_for(std::chrono::seconds(10));

work2.addSlotsRange(slots[1].first, slots[1].second);
std::this_thread::sleep_for(std::chrono::seconds(10));
for (size_t i = 0; i < slots.size(); i++) {
auto nodePtr =
servers[i]->getClusterMgr()->getClusterState()->getMyselfNode();
bool s = checkSlotInfo(nodePtr, slots[i]);
bool s = checkSlotInfo(nodePtr,
"{" + std::to_string(slots[i].first) + ".." +
std::to_string(slots[i].second) + "}",
false);
EXPECT_TRUE(s);
}

std::this_thread::sleep_for(std::chrono::seconds(10));
for (auto svr : servers) {
compareClusterInfo(svr, node1);
}
// delslotsrange test
std::vector<std::pair<uint32_t, uint32_t>> delslots = {{4000, 8000},
{15000, 16383}};
work1.delSlotsRange(delslots[0].first, delslots[0].second);

std::this_thread::sleep_for(std::chrono::seconds(10));

work2.delSlotsRange(delslots[1].first, delslots[1].second);
std::this_thread::sleep_for(std::chrono::seconds(10));
for (size_t i = 0; i < delslots.size(); i++) {
auto nodePtr =
servers[i]->getClusterMgr()->getClusterState()->getMyselfNode();
bool s = checkSlotInfo(nodePtr,
"{" + std::to_string(delslots[i].first) + ".." +
std::to_string(delslots[i].second) + "}",
true);
EXPECT_TRUE(s);
}
std::this_thread::sleep_for(std::chrono::seconds(10));
#ifndef _WIN32
for (auto svr : servers) {
svr->stop();
Expand All @@ -1301,6 +1401,11 @@ TEST(Cluster, AddSlot) {
servers.clear();
}

TEST(Cluster, ChangeSlot) {
ChangeSlotsTest();
RangeChangeSlotsTest();
}

bool nodeIsMySlave(std::shared_ptr<ServerEntry> svr1,
std::shared_ptr<ServerEntry> svr2) {
if (svr1->getParams()->clusterEnabled && svr2->getParams()->clusterEnabled) {
Expand Down
165 changes: 120 additions & 45 deletions src/tendisplus/commands/cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ class ClusterCommand : public Command {
Expected<int64_t> exptSlot = ::tendisplus::stoll(vs);
RET_IF_ERR_EXPECTED(exptSlot);

int32_t slot = (int32_t)exptSlot.value();
int32_t slot = static_cast<int32_t>(exptSlot.value());

if (slot > CLUSTER_SLOTS - 1 || slot < 0) {
LOG(ERROR) << "slot" << slot
Expand Down Expand Up @@ -335,48 +335,17 @@ class ClusterCommand : public Command {
return {ErrorCodes::ERR_CLUSTER, "Invalid cluster nodes info"};
}
} else if ((arg1 == "addslots" || arg1 == "delslots") && argSize >= 3) {
if (myself->nodeIsSlave()) {
return {ErrorCodes::ERR_CLUSTER,
"slave node can not be addslot or delslot"};
}
if (myself->nodeIsArbiter()) {
return {ErrorCodes::ERR_CLUSTER, "Can not add/del slots on arbiter."};
Status s = processSlotRange(args, 2, arg1, svr, clusterState, myself);
if (!s.ok()) {
return s;
}
for (size_t i = 2; i < argSize; ++i) {
if ((args[i].find('{') != std::string::npos) &&
(args[i].find('}') != std::string::npos)) {
auto eRange = getSlotRange(args[i]);

RET_IF_ERR_EXPECTED(eRange);
uint32_t start = eRange.value().first;
uint32_t end = eRange.value().second;

if (svr->getParams()->clusterSingleNode &&
(end - start) != (CLUSTER_SLOTS - 1)) {
return {
ErrorCodes::ERR_CLUSTER,
"You can only addslot 0..16383 when cluster-single-node is on"};
}

Status s = changeSlots(start, end, arg1, svr, clusterState, myself);
if (!s.ok()) {
LOG(ERROR) << "addslots fail from:" << start << "to:" << end;
return s;
}
} else {
auto slotInfo = ::tendisplus::stoul(args[i]);

RET_IF_ERR_EXPECTED(slotInfo);
uint32_t slot = static_cast<uint32_t>(slotInfo.value());
Status s = changeSlot(slot, arg1, svr, clusterState, myself);
if (!s.ok()) {
LOG(ERROR) << "addslots:" << slot << "fail";
return s;
}
}
return Command::fmtOK();
} else if ((arg1 == "addslotsrange" || arg1 == "delslotsrange") &&
argSize >= 4) {
Status s = processSlotRange(args, 2, arg1, svr, clusterState, myself);
if (!s.ok()) {
return s;
}
clusterState->clusterSaveNodes();
clusterState->clusterUpdateState();
return Command::fmtOK();
} else if (arg1 == "replicate" && argSize == 3) {
if (!svr->getParams()->binlogEnabled) {
Expand Down Expand Up @@ -445,7 +414,7 @@ class ClusterCommand : public Command {
return {ErrorCodes::ERR_CLUSTER, "keyslot invalid!"};
}
uint32_t hash =
uint32_t(redis_port::keyHashSlot(key.c_str(), key.size()));
static_cast<int32_t>(redis_port::keyHashSlot(key.c_str(), key.size()));
return Command::fmtBulk(std::to_string(hash));
} else if (arg1 == "info" && argSize == 2) {
std::string clusterInfo = clusterState->clusterGenStateDescription();
Expand Down Expand Up @@ -687,7 +656,7 @@ class ClusterCommand : public Command {
if (start < end) {
for (size_t i = start; i < end + 1; i++) {
uint32_t index = static_cast<uint32_t>(i);
if (arg == "addslots") {
if (arg == "addslots" || arg == "addslotsrange") {
if (clusterState->_allSlots[index] != nullptr) {
LOG(ERROR) << "slot" << index << "already busy";
continue;
Expand All @@ -713,7 +682,7 @@ class ClusterCommand : public Command {
<< "to:" << end;
return {ErrorCodes::ERR_CLUSTER, "ERR Invalid or out of range slot"};
}
return {ErrorCodes::ERR_OK, "finish addslots"};
return {ErrorCodes::ERR_OK, "finish change slots"};
}

Status changeSlot(uint32_t slot,
Expand Down Expand Up @@ -742,7 +711,113 @@ class ClusterCommand : public Command {
if (result == false) {
return {ErrorCodes::ERR_CLUSTER, "del or add slot fail"};
}
return {ErrorCodes::ERR_OK, "finish add sigle slot"};
return {ErrorCodes::ERR_OK, "finish change sigle slot"};
}

Status processSlotRange(const std::vector<std::string>& args,
size_t startIndex,
const std::string& cmd,
ServerEntry* svr,
const std::shared_ptr<ClusterState> clusterState,
const CNodePtr myself) {
// Check node role
if (myself->nodeIsSlave()) {
return {ErrorCodes::ERR_CLUSTER, "slave node cannot add or delete slots"};
}
if (myself->nodeIsArbiter()) {
return {ErrorCodes::ERR_CLUSTER, "Cannot add or delete slots on arbiter"};
}

// Parse and process slot ranges or individual slots
if (cmd == "addslots" || cmd == "delslots") {
for (size_t i = startIndex; i < args.size(); ++i) {
if ((args[i].find('{') != std::string::npos) &&
(args[i].find('}') != std::string::npos)) {
// Handle slot range
auto eRange = getSlotRange(args[i]);
RET_IF_ERR_EXPECTED(eRange);
uint32_t start = eRange.value().first;
uint32_t end = eRange.value().second;

// Validate and process the slot range
Status s = processSlotRangeInternal(
start, end, cmd, svr, clusterState, myself);
if (!s.ok()) {
return s;
}
} else {
// Handle single slot
auto slotInfo = ::tendisplus::stoul(args[i]);
RET_IF_ERR_EXPECTED(slotInfo);
uint32_t slot = static_cast<uint32_t>(slotInfo.value());

// Validate and process the single slot
Status s = changeSlot(slot, cmd, svr, clusterState, myself);
if (!s.ok()) {
LOG(ERROR) << cmd << ":" << slot << " failed";
return s;
}
}
}
} else if (cmd == "addslotsrange" || cmd == "delslotsrange") {
if ((args.size() - startIndex) % 2 != 0) {
return {ErrorCodes::ERR_CLUSTER, "Invalid number of arguments"};
}

for (size_t i = startIndex; i < args.size(); i += 2) {
auto startInfo = ::tendisplus::stoul(args[i]);
RET_IF_ERR_EXPECTED(startInfo);
auto endInfo = ::tendisplus::stoul(args[i + 1]);
RET_IF_ERR_EXPECTED(endInfo);
uint32_t start = static_cast<uint32_t>(startInfo.value());
uint32_t end = static_cast<uint32_t>(endInfo.value());

// Validate and process the slot range
Status s =
processSlotRangeInternal(start, end, cmd, svr, clusterState, myself);
if (!s.ok()) {
return s;
}
}
} else {
return {ErrorCodes::ERR_CLUSTER, "Invalid command for slot processing"};
}

// Save and update cluster state
clusterState->clusterSaveNodes();
clusterState->clusterUpdateState();

return {ErrorCodes::ERR_OK, "All slots processed successfully"};
}

Status processSlotRangeInternal(
uint32_t start,
uint32_t end,
const std::string& cmd,
ServerEntry* svr,
const std::shared_ptr<ClusterState> clusterState,
const CNodePtr myself) {
// Validate slot range
if (start > CLUSTER_SLOTS - 1 || end > CLUSTER_SLOTS - 1 || start > end) {
return {ErrorCodes::ERR_CLUSTER, "Invalid slot range"};
}

// Check cluster-single-node mode
if (svr->getParams()->clusterSingleNode &&
(end - start) != (CLUSTER_SLOTS - 1)) {
return {ErrorCodes::ERR_CLUSTER,
"You can only addslots/delslots[range] from 0 to 16383 when "
"cluster-single-node is on"};
}

// Use changeSlots to process the range
Status s = changeSlots(start, end, cmd, svr, clusterState, myself);
if (!s.ok()) {
LOG(ERROR) << cmd << " failed for range: " << start << " to " << end;
return s;
}

return {ErrorCodes::ERR_OK, "Slot range processed successfully"};
}

std::string getKeys(ServerEntry* svr, uint32_t slot, uint32_t count) {
Expand Down
Loading