Skip to content
6 changes: 6 additions & 0 deletions BedrockCommand.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ BedrockCommand::BedrockCommand(SQLiteCommand&& baseCommand, BedrockPlugin* plugi
}
}
_commandCount++;
if (_plugin) {
_plugin->activeCommandCount++;
}
}

const string& BedrockCommand::getName() const
Expand Down Expand Up @@ -88,6 +91,9 @@ BedrockCommand::~BedrockCommand()
if (destructionCallback) {
(*destructionCallback)();
}
if (_plugin) {
_plugin->activeCommandCount--;
}
_commandCount--;
}

Expand Down
2 changes: 2 additions & 0 deletions BedrockPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#include "BedrockServer.h"

map<string, function<BedrockPlugin* (BedrockServer&)>> BedrockPlugin::g_registeredPluginList;
map<string, void*> BedrockPlugin::g_pluginDLHandles;
map<string, string> BedrockPlugin::g_pluginPaths;

BedrockPlugin::BedrockPlugin(BedrockServer& s) : server(s)
{
Expand Down
7 changes: 7 additions & 0 deletions BedrockPlugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ class BedrockPlugin {
// Map of plugin names to functions that will return a new plugin of the given type.
static map<string, function<BedrockPlugin* (BedrockServer&)>> g_registeredPluginList;

// dlopen handles and filesystem paths for dynamically loaded plugins (keyed by UPPER(name)).
static map<string, void*> g_pluginDLHandles;
static map<string, string> g_pluginPaths;

// Tracks the number of in-flight commands owned by this plugin instance.
atomic<size_t> activeCommandCount{0};

// Reference to the BedrockServer object that owns this plugin.
BedrockServer& server;
};
345 changes: 341 additions & 4 deletions BedrockServer.cpp

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions BedrockServer.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#pragma once
#include <chrono>
#include <shared_mutex>
#include <libstuff/libstuff.h>
#include <sqlitecluster/SQLiteNode.h>
#include <sqlitecluster/SQLiteServer.h>
Expand Down Expand Up @@ -437,6 +438,13 @@ class BedrockServer : public SQLiteServer {
// Whether or not all plugins are detached
bool _pluginsDetached;

// Plugin hot-reload support. _pluginReloadInProgress is checked by getCommandFromPlugins() to reject
// commands for the plugin being reloaded. _reloadingPluginName is the UPPER-cased name of that plugin.
// _pluginsMutex protects the plugins map during the brief swap window.
atomic<bool> _pluginReloadInProgress{false};
string _reloadingPluginName;
shared_mutex _pluginsMutex;

// This is a snapshot of the state of the node taken at the beginning of any call to peekCommand or processCommand
// so that the state can't change for the lifetime of that call, from the view of that function.
static thread_local atomic<SQLiteNodeState> _nodeStateSnapshot;
Expand Down
1 change: 1 addition & 0 deletions libstuff/SLog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ static set<string> PARAMS_WHITELIST = {
"logParam",
"message",
"peer",
"PluginName",
"prepareElapsed",
"query",
"readElapsed",
Expand Down
18 changes: 18 additions & 0 deletions main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
///
#include <dlfcn.h>
#include <iostream>
#include <link.h>
#include <signal.h>
#include <sys/resource.h>
#include <sys/stat.h>
Expand Down Expand Up @@ -145,6 +146,23 @@ set<string> loadPlugins(SData& args)
} else {
// Call the plugin registration function with the same name.
BedrockPlugin::g_registeredPluginList.emplace(make_pair(SToUpper(name), (BedrockPlugin * (*)(BedrockServer&)) sym));
BedrockPlugin::g_pluginDLHandles[SToUpper(name)] = lib;

// Resolve the full filesystem path of the loaded .so via dlinfo, since
// pluginName may be a bare filename (e.g., "auth.so") that dlopen resolved
// through the standard library search path.
string resolvedPath = pluginName;
struct link_map* lm = nullptr;
if (dlinfo(lib, RTLD_DI_LINKMAP, &lm) == 0 && lm && lm->l_name[0]) {
char* rp = realpath(lm->l_name, nullptr);
if (rp) {
resolvedPath = rp;
free(rp);
} else {
resolvedPath = lm->l_name;
}
}
BedrockPlugin::g_pluginPaths[SToUpper(name)] = resolvedPath;
}
}
}
Expand Down
151 changes: 151 additions & 0 deletions test/clustertest/tests/ReloadPluginTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
#include <libstuff/SData.h>
#include <test/lib/BedrockTester.h>
#include <test/clustertest/BedrockClusterTester.h>

struct ReloadPluginTest : tpunit::TestFixture {
ReloadPluginTest()
: tpunit::TestFixture("ReloadPlugin",
TEST(ReloadPluginTest::happyPathReload),
TEST(ReloadPluginTest::reloadDrainsInflightCommands),
TEST(ReloadPluginTest::reloadBuiltinPluginRejected),
TEST(ReloadPluginTest::reloadNonexistentPluginRejected),
TEST(ReloadPluginTest::reloadPreservesDBState),
TEST(ReloadPluginTest::reloadOnFollower),
TEST(ReloadPluginTest::doubleReloadSerializes))
{
}

// Helper to send ReloadPlugin control command
string sendReloadPlugin(BedrockTester& tester, const string& pluginName) {
SData command("ReloadPlugin");
command["PluginName"] = pluginName;
return tester.executeWaitVerifyContent(command, "", true);
}

void happyPathReload() {
// Start a 3-node cluster with the test plugin
BedrockClusterTester tester(ClusterSize::THREE_NODE_CLUSTER);
BedrockTester& leader = tester.getTester(0);

// Verify plugin works before reload
SData cmd("testcommand");
leader.executeWaitVerifyContent(cmd, "200");

// Reload the plugin
SData reloadCmd("ReloadPlugin");
reloadCmd["PluginName"] = "TESTPLUGIN";
leader.executeWaitVerifyContent(reloadCmd, "200", true);

// Verify plugin still works after reload
SData cmd2("testcommand");
leader.executeWaitVerifyContent(cmd2, "200");
}

void reloadDrainsInflightCommands() {
BedrockClusterTester tester(ClusterSize::THREE_NODE_CLUSTER);
BedrockTester& leader = tester.getTester(0);

// Start a slow query that will take a few seconds
SData slowCmd("slowquery");
slowCmd["size"] = "5000000";

// Send the slow command in a background thread
thread slowThread([&]() {
leader.executeWaitVerifyContent(slowCmd, "200");
});

// Give it a moment to start processing
usleep(100'000);

// Send reload - it should wait for the slow query to finish
SData reloadCmd("ReloadPlugin");
reloadCmd["PluginName"] = "TESTPLUGIN";
leader.executeWaitVerifyContent(reloadCmd, "200", true);

slowThread.join();

// Verify commands work after reload
SData cmd("testcommand");
leader.executeWaitVerifyContent(cmd, "200");
}

void reloadBuiltinPluginRejected() {
BedrockClusterTester tester(ClusterSize::THREE_NODE_CLUSTER);
BedrockTester& leader = tester.getTester(0);

SData cmd("ReloadPlugin");
cmd["PluginName"] = "DB";
leader.executeWaitVerifyContent(cmd, "400", true);
}

void reloadNonexistentPluginRejected() {
BedrockClusterTester tester(ClusterSize::THREE_NODE_CLUSTER);
BedrockTester& leader = tester.getTester(0);

SData cmd("ReloadPlugin");
cmd["PluginName"] = "FAKEPLUGIN";
leader.executeWaitVerifyContent(cmd, "400", true);
}

void reloadPreservesDBState() {
BedrockClusterTester tester(ClusterSize::THREE_NODE_CLUSTER);
BedrockTester& leader = tester.getTester(0);

// Insert some data
SData insertCmd("testquery");
insertCmd["Query"] = "INSERT INTO test (id, value) VALUES (12345, 'reload_test_data');";
leader.executeWaitVerifyContent(insertCmd, "200");

// Reload the plugin
SData reloadCmd("ReloadPlugin");
reloadCmd["PluginName"] = "TESTPLUGIN";
leader.executeWaitVerifyContent(reloadCmd, "200", true);

// Verify data persists after reload
SData selectCmd("testquery");
selectCmd["Query"] = "SELECT value FROM test WHERE id = 12345;";
leader.executeWaitVerifyContent(selectCmd, "200");
}

void reloadOnFollower() {
BedrockClusterTester tester(ClusterSize::THREE_NODE_CLUSTER);
BedrockTester& follower = tester.getTester(1);

// Wait for the follower to be in FOLLOWING state
follower.waitForState("FOLLOWING");

// Reload on the follower
SData reloadCmd("ReloadPlugin");
reloadCmd["PluginName"] = "TESTPLUGIN";
follower.executeWaitVerifyContent(reloadCmd, "200", true);

// Verify commands still work on the follower
SData cmd("testcommand");
follower.executeWaitVerifyContent(cmd, "200");
}

void doubleReloadSerializes() {
BedrockClusterTester tester(ClusterSize::THREE_NODE_CLUSTER);
BedrockTester& leader = tester.getTester(0);

// Send two reload commands concurrently
thread t1([&]() {
SData reloadCmd("ReloadPlugin");
reloadCmd["PluginName"] = "TESTPLUGIN";
leader.executeWaitVerifyContent(reloadCmd, "", true);
});
thread t2([&]() {
SData reloadCmd("ReloadPlugin");
reloadCmd["PluginName"] = "TESTPLUGIN";
leader.executeWaitVerifyContent(reloadCmd, "", true);
});

t1.join();
t2.join();

// Server should still be stable
SData cmd("testcommand");
leader.executeWaitVerifyContent(cmd, "200");
}

} __ReloadPluginTest;
Loading