Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 30 additions & 9 deletions Source/HiveMQtt/Client/Connection/ConnectionManagerTasks.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,11 @@ private async Task ConnectionMonitorAsync(CancellationToken cancellationToken)
{
try
{
// Capture state once to avoid race conditions
var currentState = this.State;

// If connected and no recent packets have been sent, send a ping
if (this.State == ConnectState.Connected)
if (currentState == ConnectState.Connected)
{
if (this.Client.Options.KeepAlive > 0 && this.lastCommunicationTimer.Elapsed > TimeSpan.FromSeconds(keepAlivePeriod))
{
Expand Down Expand Up @@ -144,7 +147,9 @@ private async Task ConnectionMonitorAsync(CancellationToken cancellationToken)
Logger.Error($"{this.Client.Options.ClientId}-(CM)- Exception: {ex}");

// Handle exception gracefully - trigger disconnection and exit
if (this.State == ConnectState.Connected)
// Capture state once to avoid race conditions
var currentState = this.State;
if (currentState == ConnectState.Connected)
{
try
{
Expand Down Expand Up @@ -213,7 +218,9 @@ private Task ConnectionPublishWriterAsync(CancellationToken cancellationToken) =
{
Logger.Trace($"{this.Client.Options.ClientId}-(PW)- ConnectionPublishWriter: Failed to write to transport.");

if (this.State == ConnectState.Connected)
// Capture state once to avoid race conditions
var currentState = this.State;
if (currentState == ConnectState.Connected)
{
// This is an unexpected exit and may be due to a network failure.
Logger.Debug($"{this.Client.Options.ClientId}-(PW)- ConnectionPublishWriter: unexpected exit. Disconnecting...");
Expand All @@ -240,7 +247,9 @@ private Task ConnectionPublishWriterAsync(CancellationToken cancellationToken) =
Logger.Error($"{this.Client.Options.ClientId}-(PW)- Exception: {ex}");

// Handle exception gracefully - trigger disconnection and exit
if (this.State == ConnectState.Connected)
// Capture state once to avoid race conditions
var currentState = this.State;
if (currentState == ConnectState.Connected)
{
try
{
Expand Down Expand Up @@ -342,7 +351,10 @@ private Task ConnectionWriterAsync(CancellationToken cancellationToken) => Task.
if (!writeSuccess)
{
Logger.Error($"{this.Client.Options.ClientId}-(W)- Write failed. Disconnecting...");
if (this.State == ConnectState.Connected)

// Capture state once to avoid race conditions
var currentState = this.State;
if (currentState == ConnectState.Connected)
{
await this.HandleDisconnectionAsync(false).ConfigureAwait(false);
}
Expand All @@ -369,7 +381,9 @@ private Task ConnectionWriterAsync(CancellationToken cancellationToken) => Task.
Logger.Error($"{this.Client.Options.ClientId}-(W)- Exception: {ex}");

// Handle exception gracefully - trigger disconnection and exit
if (this.State == ConnectState.Connected)
// Capture state once to avoid race conditions
var currentState = this.State;
if (currentState == ConnectState.Connected)
{
try
{
Expand Down Expand Up @@ -408,7 +422,10 @@ private Task<bool> ConnectionReaderAsync(CancellationToken cancellationToken) =>
if (readResult.Failed)
{
Logger.Debug($"{this.Client.Options.ClientId}-(R)- ConnectionReader exiting: Read from transport failed.");
if (this.State == ConnectState.Connected)

// Capture state once to avoid race conditions
var currentState = this.State;
if (currentState == ConnectState.Connected)
{
await this.HandleDisconnectionAsync(false).ConfigureAwait(false);
}
Expand Down Expand Up @@ -524,7 +541,9 @@ private Task<bool> ConnectionReaderAsync(CancellationToken cancellationToken) =>
Logger.Error($"{this.Client.Options.ClientId}-(R)- Exception: {ex}");

// Handle exception gracefully - trigger disconnection and exit
if (this.State == ConnectState.Connected)
// Capture state once to avoid race conditions
var currentState = this.State;
if (currentState == ConnectState.Connected)
{
try
{
Expand Down Expand Up @@ -623,7 +642,9 @@ private Task ReceivedPacketsHandlerAsync(CancellationToken cancellationToken) =>
Logger.Error($"{this.Client.Options.ClientId}-(RPH)- Exception: {ex}");

// Handle exception gracefully - trigger disconnection and exit
if (this.State == ConnectState.Connected)
// Capture state once to avoid race conditions
var currentState = this.State;
if (currentState == ConnectState.Connected)
{
try
{
Expand Down
Loading