diff --git a/rabbitmq/src/urabbitmq/client_settings.cpp b/rabbitmq/src/urabbitmq/client_settings.cpp index 466737ff8d15..6277695f260d 100644 --- a/rabbitmq/src/urabbitmq/client_settings.cpp +++ b/rabbitmq/src/urabbitmq/client_settings.cpp @@ -28,10 +28,10 @@ std::vector ParseHosts(const formats::json::Value& secdist_doc) { const auto hosts_json = secdist_doc["hosts"]; auto hosts = hosts_json.As>(); - UINVARIANT(!hosts.empty(), "Empty list of hosts in clickhouse secdist"); + UINVARIANT(!hosts.empty(), "Empty list of hosts in rabbitmq secdist"); const auto unique_count = std::unordered_set{hosts.begin(), hosts.end()}.size(); - UINVARIANT(unique_count == hosts.size(), "Hosts are not unique in clickhouse secdist"); + UINVARIANT(unique_count == hosts.size(), "Hosts are not unique in rabbitmq secdist"); return hosts; } diff --git a/rabbitmq/src/urabbitmq/connection_pool.cpp b/rabbitmq/src/urabbitmq/connection_pool.cpp index 689387592d32..3c0ee7040e3d 100644 --- a/rabbitmq/src/urabbitmq/connection_pool.cpp +++ b/rabbitmq/src/urabbitmq/connection_pool.cpp @@ -63,7 +63,9 @@ ConnectionPool::ConnectionPool( // Already logged in base class } - monitor_.Start("connection_pool_monitor", {{kPoolMonitorInterval}}, [this] { RunMonitor(); }); + monitor_.Start("connection_pool_monitor", {{kPoolMonitorInterval}, {}, logging::Level::kDebug}, [this] { + RunMonitor(); + }); } ConnectionPool::~ConnectionPool() { diff --git a/rabbitmq/src/urabbitmq/impl/amqp_connection_handler.cpp b/rabbitmq/src/urabbitmq/impl/amqp_connection_handler.cpp index cbaf1874c01b..672471c471e5 100644 --- a/rabbitmq/src/urabbitmq/impl/amqp_connection_handler.cpp +++ b/rabbitmq/src/urabbitmq/impl/amqp_connection_handler.cpp @@ -153,6 +153,23 @@ void AmqpConnectionHandler::onReady(AMQP::Connection*) { connection_ready_event_.Send(); } +uint16_t AmqpConnectionHandler::onNegotiate(AMQP::Connection* connection, uint16_t interval) { + // we accept the suggestion from the server, but if the interval is smaller + // that one minute, we will use a one minute interval instead + if (interval < 60) interval = 60; + + heartbeats_.Start( + fmt::format("rabbitmq_heartbeats_for_{}:{}", address_.hostname(), address_.port()), + std::chrono::seconds(interval), + [this, connection] { + if (!IsBroken()) connection->heartbeat(); + } + ); + + // return the interval that we want to use + return interval; +} + void AmqpConnectionHandler::OnConnectionCreated(AmqpConnection* connection, engine::Deadline deadline) { reader_.Start(connection); @@ -169,7 +186,10 @@ void AmqpConnectionHandler::OnConnectionCreated(AmqpConnection* connection, engi void AmqpConnectionHandler::OnConnectionDestruction() { reader_.Stop(); } -void AmqpConnectionHandler::Invalidate() { broken_ = true; } +void AmqpConnectionHandler::Invalidate() { + heartbeats_.Stop(); + broken_ = true; +} bool AmqpConnectionHandler::IsBroken() const { return broken_.load(); } diff --git a/rabbitmq/src/urabbitmq/impl/amqp_connection_handler.hpp b/rabbitmq/src/urabbitmq/impl/amqp_connection_handler.hpp index 31b7a8a88806..d10e6cd99c37 100644 --- a/rabbitmq/src/urabbitmq/impl/amqp_connection_handler.hpp +++ b/rabbitmq/src/urabbitmq/impl/amqp_connection_handler.hpp @@ -12,6 +12,8 @@ #include +#include "userver/utils/periodic_task.hpp" + USERVER_NAMESPACE_BEGIN namespace engine::io { @@ -61,6 +63,8 @@ class AmqpConnectionHandler final : public AMQP::ConnectionHandler { void onReady(AMQP::Connection* connection) override; + uint16_t onNegotiate(AMQP::Connection* connection, uint16_t interval) override; + void OnConnectionCreated(AmqpConnection* connection, engine::Deadline deadline); void OnConnectionDestruction(); @@ -90,6 +94,8 @@ class AmqpConnectionHandler final : public AMQP::ConnectionHandler { std::atomic is_ready_{false}; std::optional error_; + + utils::PeriodicTask heartbeats_; }; } // namespace impl