Skip to content

Commit 10a0006

Browse files
authored
Performance: Improve validation path and cached connection properties (#290)
1 parent 299eb88 commit 10a0006

File tree

10 files changed

+362
-114
lines changed

10 files changed

+362
-114
lines changed

Source/HiveMQtt/Client/Connection/ConnectionManagerHandlers.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ internal void HandleIncomingConnAckPacket(ConnAckPacket connAckPacket)
4343

4444
this.ConnectionProperties = connAckPacket.Properties;
4545

46+
// Update cached connection properties for fast access during publish operations
47+
this.Client.UpdateConnectionPropertyCache(connAckPacket.Properties);
48+
4649
this.Client.OnConnAckReceivedEventLauncher(connAckPacket);
4750
}
4851

@@ -404,6 +407,9 @@ internal async Task<bool> HandleDisconnectionAsync(bool clean = true)
404407
this.State = ConnectState.Disconnected;
405408
this.ResetNotDisconnectedSignal();
406409

410+
// Clear cached connection properties since we're disconnected
411+
this.Client.UpdateConnectionPropertyCache(null);
412+
407413
if (clean)
408414
{
409415
if (!this.SendQueue.IsEmpty)

Source/HiveMQtt/Client/HiveMQClient.cs

Lines changed: 69 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,36 @@ public HiveMQClient(HiveMQClientOptions? options = null)
6868

6969
private SemaphoreSlim SubscriptionsSemaphore { get; } = new(1, 1);
7070

71+
// Cached connection properties for fast access during publish operations
72+
// These are updated when ConnectionProperties change to avoid repeated property access
73+
private int cachedTopicAliasMaximum;
74+
private bool cachedRetainAvailable = true; // Default to true per MQTT spec
75+
private int? cachedMaximumQoS;
76+
private bool connectionPropertiesCached;
77+
78+
/// <summary>
79+
/// Updates the cached connection properties for fast access during publish operations.
80+
/// This method is called when connection properties are established or changed.
81+
/// </summary>
82+
/// <param name="properties">The connection properties to cache.</param>
83+
internal void UpdateConnectionPropertyCache(MQTT5Properties? properties)
84+
{
85+
if (properties == null)
86+
{
87+
// Reset to defaults when disconnected
88+
this.cachedTopicAliasMaximum = 0;
89+
this.cachedRetainAvailable = true;
90+
this.cachedMaximumQoS = null;
91+
this.connectionPropertiesCached = false;
92+
return;
93+
}
94+
95+
this.cachedTopicAliasMaximum = properties.TopicAliasMaximum ?? 0;
96+
this.cachedRetainAvailable = properties.RetainAvailable ?? true;
97+
this.cachedMaximumQoS = properties.MaximumQoS;
98+
this.connectionPropertiesCached = true;
99+
}
100+
71101
/// <summary>
72102
/// Clear all tracked subscriptions in a thread-safe manner.
73103
/// Intended for internal use when the broker indicates Session Present = false on CONNACK.
@@ -171,6 +201,9 @@ public async Task<ConnectResult> ConnectAsync(ConnectOptions? connectOptions = n
171201
// If the Session Expiry Interval is absent the value in the CONNECT Packet used.
172202
connectResult.Properties.SessionExpiryInterval ??= (uint)this.Options.SessionExpiryInterval;
173203

204+
// Update cached connection properties for fast access during publish operations
205+
this.UpdateConnectionPropertyCache(connAck.Properties);
206+
174207
// Fire the corresponding event
175208
this.AfterConnectEventLauncher(connectResult);
176209

@@ -238,10 +271,29 @@ public async Task<bool> DisconnectAsync(DisconnectOptions? options = null)
238271
/// <inheritdoc />
239272
public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message, CancellationToken cancellationToken = default)
240273
{
274+
// Fast path for simple QoS 0 messages (most common case)
275+
// Skip validation overhead for messages that don't need it
276+
if (message.QoS == QualityOfService.AtMostOnceDelivery &&
277+
!message.TopicAlias.HasValue &&
278+
!message.Retain &&
279+
this.Connection?.State == ConnectState.Connected &&
280+
this.connectionPropertiesCached)
281+
{
282+
var publishPacket = new PublishPacket(message, 0);
283+
this.Connection.OutgoingPublishQueue.Enqueue(publishPacket);
284+
return new PublishResult(publishPacket.Message);
285+
}
286+
287+
// Full validation path for complex cases
241288
message.Validate();
242289

243290
// Check if topic alias is used but not supported by broker
244-
var topicAliasMaximum = this.Connection?.ConnectionProperties?.TopicAliasMaximum ?? 0;
291+
// Use cache as primary source for performance (avoids null-conditional property access)
292+
// Fall back to ConnectionProperties only if cache is not set (before ConnAck received)
293+
var topicAliasMaximum = this.connectionPropertiesCached
294+
? this.cachedTopicAliasMaximum
295+
: (this.Connection?.ConnectionProperties?.TopicAliasMaximum ?? 0);
296+
245297
if (message.TopicAlias.HasValue)
246298
{
247299
if (topicAliasMaximum == 0)
@@ -256,22 +308,33 @@ public async Task<PublishResult> PublishAsync(MQTT5PublishMessage message, Cance
256308
}
257309

258310
// Check if retain is used but not supported by broker
259-
var retainSupported = this.Connection?.ConnectionProperties?.RetainAvailable ?? true;
311+
// Use cache as primary source for performance (avoids null-conditional property access)
312+
// Fall back to ConnectionProperties only if cache is not set (before ConnAck received)
313+
var retainSupported = this.connectionPropertiesCached
314+
? this.cachedRetainAvailable
315+
: (this.Connection?.ConnectionProperties?.RetainAvailable ?? true);
316+
260317
if (!retainSupported && message.Retain)
261318
{
262319
throw new HiveMQttClientException("Retained messages are not supported by the broker");
263320
}
264321

265-
if (message.QoS.HasValue && this.Connection?.ConnectionProperties?.MaximumQoS.HasValue == true &&
266-
(ushort)message.QoS.Value > this.Connection.ConnectionProperties.MaximumQoS.Value)
322+
// Check QoS maximum
323+
// Use cache as primary source for performance (avoids null-conditional property access)
324+
// Fall back to ConnectionProperties only if cache is not set (before ConnAck received)
325+
var maximumQoS = this.connectionPropertiesCached
326+
? this.cachedMaximumQoS
327+
: this.Connection?.ConnectionProperties?.MaximumQoS;
328+
329+
if (maximumQoS.HasValue && (ushort)message.QoS.Value > maximumQoS.Value)
267330
{
268331
if (this.Connection == null)
269332
{
270333
throw new HiveMQttClientException("Connection is not available");
271334
}
272335

273-
Logger.Debug($"Reducing message QoS from {message.QoS} to broker enforced maximum of {this.Connection.ConnectionProperties.MaximumQoS}");
274-
message.QoS = (QualityOfService)this.Connection.ConnectionProperties.MaximumQoS.Value;
336+
Logger.Debug($"Reducing message QoS from {message.QoS} to broker enforced maximum of {maximumQoS}");
337+
message.QoS = (QualityOfService)maximumQoS.Value;
275338
}
276339

277340
// QoS 0: Fast Service

0 commit comments

Comments
 (0)