Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 84 additions & 21 deletions sdk/src/signaling_device/src/signaling_channel_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@

#include <cstdint>
#include <exception>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <utility>

Expand All @@ -32,7 +34,12 @@ void SignalingChannelImpl::handleMessage(const nlohmann::json& msg) {
NABTO_SIGNALING_LOGD << "Handling DATA";
sendAck(msg);
const auto& str = msg.at("data");
for (const auto& [id, handler] : messageHandlers_) {
std::map<MessageListenerId, SignalingMessageHandler> messageHandlers;
{
const std::lock_guard<std::mutex> lock(mutex_);
messageHandlers = messageHandlers_;
}
for (const auto& [id, handler] : messageHandlers) {
handler(str);
}
} else if (type == "ACK") {
Expand All @@ -48,15 +55,19 @@ void SignalingChannelImpl::handleMessage(const nlohmann::json& msg) {
}

void SignalingChannelImpl::wsClosed() {
for (const auto& [id, handler] : stateHandlers_) {
handler(SignalingChannelState::CLOSED);
}
changeState(SignalingChannelState::CLOSED);
const std::lock_guard<std::mutex> lock(mutex_);
messageHandlers_.clear();
stateHandlers_.clear();
errorHandlers_.clear();
}

void SignalingChannelImpl::sendMessage(const nlohmann::json& message) {
const std::lock_guard<std::mutex> lock(mutex_);
if (stateIsEnded()) {
NABTO_SIGNALING_LOGE << "sendMessage called from invalid state";
return;
}
const nlohmann::json root = {
{"type", "DATA"}, {"seq", sendSeq_}, {"data", message}};
sendSeq_++;
Expand All @@ -65,16 +76,26 @@ void SignalingChannelImpl::sendMessage(const nlohmann::json& message) {
}

void SignalingChannelImpl::sendError(const SignalingError& error) {
signaler_->websocketSendError(channelId_, error);
{
const std::lock_guard<std::mutex> lock(mutex_);
if (stateIsEnded()) {
NABTO_SIGNALING_LOGE << "sendError called from invalid state";
return;
}
signaler_->websocketSendError(channelId_, error);
}
changeState(SignalingChannelState::FAILED);
}

void SignalingChannelImpl::sendAck(const nlohmann::json& msg) {
const nlohmann::json ack = {{"type", "ACK"},
{"seq", msg.at("seq").get<uint32_t>()}};
const std::lock_guard<std::mutex> lock(mutex_);
signaler_->websocketSendMessage(channelId_, ack);
}

void SignalingChannelImpl::handleAck(const nlohmann::json& msg) {
const std::lock_guard<std::mutex> lock(mutex_);
if (unackedMessages_.empty()) {
NABTO_SIGNALING_LOGE << "Got an ack but we have no unacked messages";
return;
Expand All @@ -96,34 +117,59 @@ void SignalingChannelImpl::handleAck(const nlohmann::json& msg) {
}

void SignalingChannelImpl::peerConnected() {
for (auto const& message : unackedMessages_) {
signaler_->websocketSendMessage(channelId_, message);
}
for (const auto& [id, handler] : stateHandlers_) {
handler(SignalingChannelState::ONLINE);
{
const std::lock_guard<std::mutex> lock(mutex_);

for (auto const& message : unackedMessages_) {
signaler_->websocketSendMessage(channelId_, message);
}
}
changeState(SignalingChannelState::ONLINE);
}

void SignalingChannelImpl::peerOffline() {
NABTO_SIGNALING_LOGI << "Peer: " << channelId_ << " went offline";
for (const auto& [id, handler] : stateHandlers_) {
handler(SignalingChannelState::OFFLINE);
{
const std::lock_guard<std::mutex> lock(mutex_);

NABTO_SIGNALING_LOGI << "Peer: " << channelId_ << " went offline";
}
changeState(SignalingChannelState::OFFLINE);
}

void SignalingChannelImpl::handleError(const SignalingError& error) {
NABTO_SIGNALING_LOGI << "Got error: (" << error.errorCode() << ") "
<< error.errorMessage() << " from Peer: " << channelId_;
for (const auto& [id, handler] : errorHandlers_) {
std::map<ChannelErrorListenerId, SignalingErrorHandler> errorHandlers;
{
const std::lock_guard<std::mutex> lock(mutex_);
NABTO_SIGNALING_LOGI << "Got error: (" << error.errorCode() << ") "
<< error.errorMessage()
<< " from Peer: " << channelId_;
if (stateIsEnded()) {
NABTO_SIGNALING_LOGI
<< "Got error while in error state. Not reinvoking handlers";
} else {
errorHandlers = errorHandlers_;
}
}
for (const auto& [id, handler] : errorHandlers) {
handler(error);
}
}

void SignalingChannelImpl::close() {
signaler_->channelClosed(channelId_);
messageHandlers_.clear();
stateHandlers_.clear();
errorHandlers_.clear();
{
const std::lock_guard<std::mutex> lock(mutex_);
if (stateIsEnded()) {
return;
}
}
changeState(SignalingChannelState::CLOSED);
{
const std::lock_guard<std::mutex> lock(mutex_);
signaler_->channelClosed(channelId_);
messageHandlers_.clear();
stateHandlers_.clear();
errorHandlers_.clear();
}
}

bool SignalingChannelImpl::isInitialMessage(const nlohmann::json& msg) {
Expand All @@ -136,7 +182,24 @@ bool SignalingChannelImpl::isInitialMessage(const nlohmann::json& msg) {
}
}

std::string SignalingChannelImpl::getChannelId() { return channelId_; }
std::string SignalingChannelImpl::getChannelId() {
const std::lock_guard<std::mutex> lock(mutex_);
return channelId_;
}

void SignalingChannelImpl::changeState(SignalingChannelState state) {
std::map<ChannelStateListenerId, SignalingChannelStateHandler> stateHandlers;
{
const std::lock_guard<std::mutex> lock(mutex_);
if (state != state_) {
stateHandlers = stateHandlers_;
state_ = state;
}
}
for (const auto& [id, handler] : stateHandlers) {
handler(state);
}
}

} // namespace signaling
} // namespace nabto
15 changes: 15 additions & 0 deletions sdk/src/signaling_device/src/signaling_channel_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#include <cstdint>
#include <memory>
#include <mutex>
#include <string>

namespace nabto {
Expand Down Expand Up @@ -35,13 +36,15 @@ class SignalingChannelImpl
*/
MessageListenerId addMessageListener(
SignalingMessageHandler handler) override {
const std::lock_guard<std::mutex> lock(mutex_);
const MessageListenerId id = currMsgListId_;
currMsgListId_++;
messageHandlers_.insert({id, handler});
return id;
}

void removeMessageListener(MessageListenerId id) override {
const std::lock_guard<std::mutex> lock(mutex_);
messageHandlers_.erase(id);
}

Expand All @@ -52,13 +55,15 @@ class SignalingChannelImpl
*/
ChannelStateListenerId addStateChangeListener(
SignalingChannelStateHandler handler) override {
const std::lock_guard<std::mutex> lock(mutex_);
const ChannelStateListenerId id = currStateListId_;
currStateListId_++;
stateHandlers_.insert({id, handler});
return id;
};

void removeStateChangeListener(ChannelStateListenerId id) override {
const std::lock_guard<std::mutex> lock(mutex_);
stateHandlers_.erase(id);
}

Expand All @@ -69,13 +74,15 @@ class SignalingChannelImpl
*/
ChannelErrorListenerId addErrorListener(
SignalingErrorHandler handler) override {
const std::lock_guard<std::mutex> lock(mutex_);
const ChannelErrorListenerId id = currErrListId_;
currErrListId_++;
errorHandlers_.insert({id, handler});
return id;
};

void removeErrorListener(ChannelErrorListenerId id) override {
const std::lock_guard<std::mutex> lock(mutex_);
errorHandlers_.erase(id);
}

Expand Down Expand Up @@ -126,9 +133,17 @@ class SignalingChannelImpl
uint32_t recvSeq_ = 0;
uint32_t sendSeq_ = 0;
std::vector<nlohmann::json> unackedMessages_;
std::mutex mutex_;
SignalingChannelState state_ = SignalingChannelState::NEW;

void sendAck(const nlohmann::json& msg);
void handleAck(const nlohmann::json& msg);
void changeState(SignalingChannelState state);

bool stateIsEnded() {
return (state_ == SignalingChannelState::CLOSED ||
state_ == SignalingChannelState::FAILED);
}
};

} // namespace signaling
Expand Down
4 changes: 2 additions & 2 deletions sdk/src/signaling_device/src/signaling_device_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,24 +298,24 @@ void SignalingDeviceImpl::handleWsMessage(SignalingMessageType type,
NABTO_SIGNALING_LOGE
<< "Got an message for an unknown channel, but the message is "
"not an initial message. Discarding the message";
mutex_.unlock();
websocketSendError(
connId, SignalingError(SignalingErrorCode::CHANNEL_NOT_FOUND,
"Got a message for a signaling channel "
"which does not exist."));
mutex_.unlock();
return;
}
auto self = shared_from_this();
chan = SignalingChannelImpl::create(self, connId);
channels_.insert(std::make_pair(connId, chan));

if (chanHandlers_.empty()) {
mutex_.unlock();
websocketSendError(
connId,
SignalingError(
SignalingErrorCode::INTERNAL_ERROR,
"No NewChannelHandler was set, dropping the channel."));
mutex_.unlock();
return;
}
auto chanHandlers = chanHandlers_;
Expand Down