Skip to content

Commit 25793be

Browse files
committed
support BRPOP/BLPOP command
1 parent 17a822d commit 25793be

File tree

11 files changed

+737
-4
lines changed

11 files changed

+737
-4
lines changed

src/tendisplus/commands/command_test.cpp

Lines changed: 210 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1392,6 +1392,216 @@ TEST(Command, testObject) {
13921392
#endif
13931393
}
13941394

1395+
void testBrpop(std::shared_ptr<ServerEntry> svr) {
1396+
asio::io_context ioContext;
1397+
asio::ip::tcp::socket socket(ioContext), socket1(ioContext);
1398+
NetSession sess(svr, std::move(socket), 1, false, nullptr, nullptr);
1399+
sess.setArgs({"lpush", "list", "a"});
1400+
auto expect = Command::runSessionCmd(&sess);
1401+
EXPECT_TRUE(expect.ok());
1402+
sess.setArgs({"brpop", "list", "10"});
1403+
expect = Command::runSessionCmd(&sess);
1404+
EXPECT_TRUE(expect.ok());
1405+
}
1406+
1407+
TEST(Command, brpop) {
1408+
const auto guard = MakeGuard([] { destroyEnv(); });
1409+
1410+
EXPECT_TRUE(setupEnv());
1411+
1412+
auto cfg = makeServerParam();
1413+
auto server = makeServerEntry(cfg);
1414+
1415+
testBrpop(server);
1416+
1417+
#ifndef _WIN32
1418+
server->stop();
1419+
EXPECT_EQ(server.use_count(), 1);
1420+
#endif
1421+
}
1422+
1423+
void testBlockCommand(std::shared_ptr<ServerEntry> svr) {
1424+
asio::io_context ioContext;
1425+
asio::ip::tcp::socket socket(ioContext), socket1(ioContext),
1426+
socket2(ioContext);
1427+
auto sess = std::make_shared<NetSession>(
1428+
svr, std::move(socket), 1, false, nullptr, nullptr);
1429+
auto sess1 = std::make_shared<NetSession>(
1430+
svr, std::move(socket1), 2, false, nullptr, nullptr);
1431+
auto sess2 = std::make_shared<NetSession>(
1432+
svr, std::move(socket2), 3, false, nullptr, nullptr);
1433+
svr->addSession(sess);
1434+
svr->addSession(sess1);
1435+
svr->addSession(sess2);
1436+
{
1437+
sess->setArgs({"blpop", "list1", "1"});
1438+
auto expect = Command::runSessionCmd(sess.get());
1439+
EXPECT_EQ(sess->isBlocked(), true);
1440+
EXPECT_EQ(expect.status().code(), ErrorCodes::ERR_BLOCKCMD);
1441+
std::this_thread::sleep_for(std::chrono::milliseconds(400));
1442+
EXPECT_EQ(sess->isBlocked(), true);
1443+
std::this_thread::sleep_for(std::chrono::milliseconds(400));
1444+
EXPECT_EQ(sess->isBlocked(), true);
1445+
std::this_thread::sleep_for(std::chrono::milliseconds(400));
1446+
EXPECT_EQ(sess->isBlocked(), false);
1447+
}
1448+
{
1449+
sess->setArgs({"blpop", "list1", "0"});
1450+
auto expect = Command::runSessionCmd(sess.get());
1451+
EXPECT_EQ(sess->isBlocked(), true);
1452+
EXPECT_EQ(expect.status().code(), ErrorCodes::ERR_BLOCKCMD);
1453+
sess1->setArgs({"rpush", "list1", "a"});
1454+
expect = Command::runSessionCmd(sess1.get());
1455+
std::this_thread::sleep_for(std::chrono::seconds(1));
1456+
EXPECT_EQ(sess->isBlocked(), false);
1457+
}
1458+
{
1459+
sess2->setArgs({"brpop", "list1", "0"});
1460+
auto expect = Command::runSessionCmd(sess2.get());
1461+
EXPECT_EQ(sess2->isBlocked(), true);
1462+
EXPECT_EQ(expect.status().code(), ErrorCodes::ERR_BLOCKCMD);
1463+
sess->setArgs({"brpop", "list1", "0"});
1464+
expect = Command::runSessionCmd(sess.get());
1465+
EXPECT_EQ(sess->isBlocked(), true);
1466+
EXPECT_EQ(expect.status().code(), ErrorCodes::ERR_BLOCKCMD);
1467+
1468+
svr->endSession(sess2->id());
1469+
sess1->setArgs({"lpush", "list1", "a"});
1470+
expect = Command::runSessionCmd(sess1.get());
1471+
std::this_thread::sleep_for(std::chrono::seconds(1));
1472+
EXPECT_EQ(sess->isBlocked(), false);
1473+
}
1474+
}
1475+
1476+
TEST(Command, testBlockCommand) {
1477+
const auto guard = MakeGuard([] { destroyEnv(); });
1478+
1479+
EXPECT_TRUE(setupEnv());
1480+
1481+
auto cfg = makeServerParam();
1482+
auto server = makeServerEntry(cfg);
1483+
testBlockCommand(server);
1484+
1485+
#ifndef _WIN32
1486+
server->stop();
1487+
EXPECT_EQ(server.use_count(), 1);
1488+
#endif
1489+
}
1490+
1491+
TEST(Command, BlockCommand) {
1492+
const auto guard = MakeGuard([] { destroyEnv(); });
1493+
1494+
EXPECT_TRUE(setupEnv());
1495+
1496+
auto cfg = makeServerParam();
1497+
auto server = makeServerEntry(cfg);
1498+
asio::io_context ioContext;
1499+
asio::ip::tcp::socket socket(ioContext), socket1(ioContext),
1500+
socket2(ioContext);
1501+
auto sess = std::make_shared<NetSession>(
1502+
server, std::move(socket), 1, false, nullptr, nullptr);
1503+
auto sess1 = std::make_shared<NetSession>(
1504+
server, std::move(socket1), 2, false, nullptr, nullptr);
1505+
auto sess2 = std::make_shared<NetSession>(
1506+
server, std::move(socket2), 3, false, nullptr, nullptr);
1507+
server->addSession(sess);
1508+
server->addSession(sess1);
1509+
server->addSession(sess2);
1510+
// test blpop case 1: block command nerver block permanently due to lost
1511+
// wakeup
1512+
for (int i = 0; i < 20; i++) {
1513+
sess1->setArgs({"brpop", "list1", "list2", "0"});
1514+
auto expect = Command::runSessionCmd(sess1.get());
1515+
EXPECT_EQ(sess1->isBlocked(), true);
1516+
EXPECT_EQ(expect.status().code(), ErrorCodes::ERR_BLOCKCMD);
1517+
sess2->setArgs({"brpop", "list1", "0"});
1518+
expect = Command::runSessionCmd(sess2.get());
1519+
EXPECT_EQ(sess2->isBlocked(), true);
1520+
EXPECT_EQ(expect.status().code(), ErrorCodes::ERR_BLOCKCMD);
1521+
sess->setArgs({"lpush", "list2", "b"});
1522+
expect = Command::runSessionCmd(sess.get());
1523+
sess->setArgs({"lpush", "list1", "a"});
1524+
expect = Command::runSessionCmd(sess.get());
1525+
std::this_thread::sleep_for(std::chrono::milliseconds(5));
1526+
EXPECT_EQ(sess1->isBlocked(), false);
1527+
EXPECT_EQ(sess2->isBlocked(), false);
1528+
}
1529+
// test blpop case 2: block command nerver block session on the same key twice
1530+
for (int i = 0; i < 20; i++) {
1531+
sess1->setArgs({"brpop", "list1", "list1", "0"});
1532+
auto expect = Command::runSessionCmd(sess1.get());
1533+
EXPECT_EQ(sess1->isBlocked(), true);
1534+
EXPECT_EQ(expect.status().code(), ErrorCodes::ERR_BLOCKCMD);
1535+
sess2->setArgs({"brpop", "list1", "0"});
1536+
expect = Command::runSessionCmd(sess2.get());
1537+
EXPECT_EQ(sess2->isBlocked(), true);
1538+
EXPECT_EQ(expect.status().code(), ErrorCodes::ERR_BLOCKCMD);
1539+
sess->setArgs({"lpush", "list1", "a", "b"});
1540+
expect = Command::runSessionCmd(sess.get());
1541+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
1542+
EXPECT_EQ(sess1->isBlocked(), false);
1543+
EXPECT_EQ(sess2->isBlocked(), false);
1544+
}
1545+
// test blpop case 3: block command will be waked up by any key which is in
1546+
// the key list
1547+
for (int i = 0; i < 20; i++) {
1548+
sess1->setArgs({"brpop", "list1", "list2", "0"});
1549+
auto expect = Command::runSessionCmd(sess1.get());
1550+
EXPECT_EQ(sess1->isBlocked(), true);
1551+
EXPECT_EQ(expect.status().code(), ErrorCodes::ERR_BLOCKCMD);
1552+
auto key = genRand() % 2 ? "list1" : "list2";
1553+
sess->setArgs({"lpush", key, "a"});
1554+
expect = Command::runSessionCmd(sess.get());
1555+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
1556+
EXPECT_EQ(sess1->isBlocked(), false);
1557+
}
1558+
// test blpop case 4: block command will not be waked up by any key when key
1559+
// is was poped
1560+
for (int i = 0; i < 20; i++) {
1561+
sess1->setArgs({"brpop", "list1", "list2", "0"});
1562+
auto expect = Command::runSessionCmd(sess1.get());
1563+
EXPECT_EQ(sess1->isBlocked(), true);
1564+
EXPECT_EQ(expect.status().code(), ErrorCodes::ERR_BLOCKCMD);
1565+
auto key = genRand() % 2 ? "list1" : "list2";
1566+
{
1567+
std::lock_guard<std::mutex> lock(sess1->_mtx);
1568+
sess->setArgs({"lpush", key, "a"});
1569+
expect = Command::runSessionCmd(sess.get());
1570+
sess->setArgs({"lpop", key});
1571+
expect = Command::runSessionCmd(sess.get());
1572+
}
1573+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
1574+
EXPECT_EQ(sess1->isBlocked(), true);
1575+
sess->setArgs({"lpush", key, "a"});
1576+
expect = Command::runSessionCmd(sess.get());
1577+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
1578+
EXPECT_EQ(sess1->isBlocked(), false);
1579+
}
1580+
// test blpop case 5: block command will be timeout or wakeup
1581+
for (int i = 0; i < 20; i++) {
1582+
sess1->setArgs({"brpop", "list1", "list2", "1"});
1583+
auto expect = Command::runSessionCmd(sess1.get());
1584+
EXPECT_EQ(sess1->isBlocked(), true);
1585+
EXPECT_EQ(expect.status().code(), ErrorCodes::ERR_BLOCKCMD);
1586+
auto key = genRand() % 2 ? "list1" : "list2";
1587+
{
1588+
std::lock_guard<std::mutex> lock(sess1->_mtx);
1589+
sess->setArgs({"lpush", key, "a"});
1590+
expect = Command::runSessionCmd(sess.get());
1591+
std::this_thread::sleep_for(std::chrono::milliseconds(995));
1592+
}
1593+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
1594+
EXPECT_EQ(sess1->isBlocked(), false);
1595+
sess->setArgs({"lpop", key});
1596+
expect = Command::runSessionCmd(sess.get());
1597+
}
1598+
#ifndef _WIN32
1599+
server->stop();
1600+
EXPECT_EQ(server.use_count(), 1);
1601+
#endif
1602+
}
1603+
1604+
13951605
void testRenameCommand(std::shared_ptr<ServerEntry> svr) {
13961606
asio::io_context ioContext;
13971607
asio::ip::tcp::socket socket(ioContext), socket1(ioContext);

src/tendisplus/commands/list.cpp

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -295,6 +295,125 @@ class RPopCommand : public ListPopWrapper {
295295
RPopCommand() : ListPopWrapper(ListPos::LP_TAIL, "wF") {}
296296
} rpopCommand;
297297

298+
class BlockPopWrapper : public Command {
299+
public:
300+
explicit BlockPopWrapper(ListPos pos, const char* sflags)
301+
: Command(pos == ListPos::LP_HEAD ? "blpop" : "brpop", sflags), _pos(pos) {}
302+
303+
ssize_t arity() const {
304+
return -2;
305+
}
306+
307+
int32_t firstkey() const {
308+
return 1;
309+
}
310+
311+
int32_t lastkey() const {
312+
return 1;
313+
}
314+
315+
int32_t keystep() const {
316+
return 1;
317+
}
318+
319+
Expected<std::string> tryPop(Session* sess, const std::string& key) {
320+
SessionCtx* pCtx = sess->getCtx();
321+
INVARIANT(pCtx != nullptr);
322+
323+
auto server = sess->getServerEntry();
324+
auto expdb = server->getSegmentMgr()->getDbWithKeyLock(
325+
sess, key, mgl::LockMode::LOCK_X);
326+
if (!expdb.ok()) {
327+
return expdb.status();
328+
}
329+
Expected<RecordValue> rv =
330+
Command::expireKeyIfNeeded(sess, key, RecordType::RT_LIST_META);
331+
if (!rv.ok()) {
332+
return rv.status();
333+
}
334+
335+
// record exists
336+
RecordKey metaRk(expdb.value().chunkId,
337+
pCtx->getDbId(),
338+
RecordType::RT_LIST_META,
339+
key,
340+
"");
341+
PStore kvstore = expdb.value().store;
342+
343+
auto ptxn = sess->getCtx()->createTransaction(kvstore);
344+
if (!ptxn.ok()) {
345+
return ptxn.status();
346+
}
347+
Expected<std::string> s1 =
348+
genericPop(sess, kvstore, ptxn.value(), metaRk, rv, _pos);
349+
if (!s1.ok()) {
350+
return s1.status();
351+
}
352+
auto s = sess->getCtx()->commitTransaction(ptxn.value());
353+
if (s.ok()) {
354+
return Command::fmtBulk(s1.value());
355+
}
356+
return s.status();
357+
}
358+
359+
Expected<std::string> tryPop(Session* sess,
360+
const std::vector<std::string>& keys) {
361+
for (const auto& key : keys) {
362+
auto status = tryPop(sess, key);
363+
if (status.ok()) {
364+
return status;
365+
}
366+
}
367+
return {ErrorCodes::ERR_NOTFOUND, "not found"};
368+
}
369+
370+
Expected<std::string> run(Session* sess) final {
371+
const std::vector<std::string>& args = sess->getArgs();
372+
std::vector<std::string> keys;
373+
for (size_t i = 1; i + 1 < args.size(); i++) {
374+
keys.push_back(args[i]);
375+
}
376+
Expected<double> timeout = tendisplus::stod(args.back());
377+
if (!timeout.ok()) {
378+
return {timeout.status().code(), "timeout is not a valid float"};
379+
}
380+
auto status = tryPop(sess, keys);
381+
if (status.ok()) {
382+
return status;
383+
}
384+
startBlocking(sess, keys, timeout.value());
385+
return {ErrorCodes::ERR_BLOCKCMD, "block command"};
386+
}
387+
388+
void startBlocking(Session* sess,
389+
const std::vector<std::string>& keys,
390+
double timeout) {
391+
auto nSess = dynamic_cast<NetSession*>(sess);
392+
if (!nSess) {
393+
return;
394+
}
395+
auto executor = [this, nSess](const std::string& key) { return tryPop(nSess, key); };
396+
397+
auto duration_sec = std::chrono::duration<double>(timeout);
398+
auto microsec =
399+
std::chrono::duration_cast<std::chrono::microseconds>(duration_sec);
400+
nSess->pauseSession(std::move(executor), keys, microsec);
401+
}
402+
403+
private:
404+
ListPos _pos;
405+
};
406+
407+
class BLPopCommand : public BlockPopWrapper {
408+
public:
409+
BLPopCommand() : BlockPopWrapper(ListPos::LP_HEAD, "wF") {}
410+
} blpopCommand;
411+
412+
class BRPopCommand : public BlockPopWrapper {
413+
public:
414+
BRPopCommand() : BlockPopWrapper(ListPos::LP_TAIL, "wF") {}
415+
} brpopCommand;
416+
298417
class ListPushWrapper : public Command {
299418
public:
300419
explicit ListPushWrapper(const std::string& name,
@@ -368,6 +487,7 @@ class ListPushWrapper : public Command {
368487
}
369488
auto s = sess->getCtx()->commitTransaction(ptxn.value());
370489
if (s.ok()) {
490+
server->notifyKeyAvailable(key, valargs.size());
371491
return s1.value();
372492
} else if (s.status().code() != ErrorCodes::ERR_COMMIT_RETRY) {
373493
return s.status();

0 commit comments

Comments
 (0)