Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions Source/HiveMQtt/Client/Connection/ConnectionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,18 @@ public partial class ConnectionManager : IDisposable
// This is how we kill innocent and not so innocent Tasks
private CancellationTokenSource cancellationTokenSource;

// The state of the connection
internal ConnectState State { get; set; }
// The state of the connection (thread-safe using Interlocked)
private int stateValue = (int)ConnectState.Disconnected;

/// <summary>
/// Gets or sets the connection state in a thread-safe manner.
/// Uses Volatile.Read for reads and Interlocked.Exchange for writes to ensure thread safety.
/// </summary>
internal ConnectState State
{
get => (ConnectState)Volatile.Read(ref this.stateValue);
set => Interlocked.Exchange(ref this.stateValue, (int)value);
}

// The protocol specific transport layer (TCP, WebSocket, etc.)
internal BaseTransport Transport { get; set; }
Expand Down
11 changes: 7 additions & 4 deletions Source/HiveMQtt/Client/Connection/ConnectionManagerHandlers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ internal async Task HandleIncomingPubCompPacketAsync(PubCompPacket pubCompPacket
/// <returns>A task that represents the asynchronous operation.</returns>
internal async Task<bool> HandleDisconnectionAsync(bool clean = true)
{
// Thread-safe check: if already disconnected, return early
if (this.State == ConnectState.Disconnected)
{
Logger.Trace("HandleDisconnection: Already disconnected.");
Expand All @@ -388,18 +389,20 @@ internal async Task<bool> HandleDisconnectionAsync(bool clean = true)

Logger.Debug($"HandleDisconnection: Handling disconnection. clean={clean}.");

// Cancel all background tasks and close the socket
this.State = ConnectState.Disconnected;

// Reset the connection-ready signal for the next connect cycle
this.ResetConnectedSignal();

// Cancel all background tasks
// Cancel all background tasks BEFORE setting state to Disconnected
// This prevents race conditions where tasks check state after it's set but before cancellation
await this.CancelBackgroundTasksAsync().ConfigureAwait(false);

// Close the Transport
await this.Transport.CloseAsync().ConfigureAwait(false);

// Set state to Disconnected AFTER tasks are cancelled and transport is closed
// This ensures tasks see the correct state when they check during cancellation
this.State = ConnectState.Disconnected;

if (clean)
{
if (!this.SendQueue.IsEmpty)
Expand Down
Loading