Skip to content

Commit b943827

Browse files
authored
Better Packet Encoding with ArrayPool (#288)
1 parent 2de45b9 commit b943827

File tree

12 files changed

+603
-123
lines changed

12 files changed

+603
-123
lines changed

Source/HiveMQtt/MQTT5/ControlPacket.cs

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,73 @@ protected static short EncodeVariableByteInteger(MemoryStream stream, int number
279279
return written;
280280
}
281281

282+
/// <summary>
283+
/// Gets the size in bytes needed to encode a variable byte integer.
284+
/// </summary>
285+
/// <param name="number">The integer value.</param>
286+
/// <returns>The number of bytes needed to encode the value.</returns>
287+
protected static int GetVariableByteIntegerSize(int number)
288+
{
289+
if (number is < 0 or > 268435455)
290+
{
291+
throw new MQTTProtocolException("Value out of range for a variable byte integer: {value}");
292+
}
293+
294+
if (number <= 0x7F)
295+
{
296+
return 1;
297+
}
298+
299+
if (number <= 0x3FFF)
300+
{
301+
return 2;
302+
}
303+
304+
if (number <= 0x1FFFFF)
305+
{
306+
return 3;
307+
}
308+
309+
return 4;
310+
}
311+
312+
/// <summary>
313+
/// Encode an Integer into a <c>Span&lt;byte&gt;</c> as an MQTT Variable Byte Integer.
314+
/// </summary>
315+
/// <param name="span">The span to write the variable byte integer into.</param>
316+
/// <param name="number">The integer to be encoded and written.</param>
317+
/// <returns>The number of bytes written.</returns>
318+
protected static int EncodeVariableByteIntegerToSpan(Span<byte> span, int number)
319+
{
320+
if (number is < 0 or > 268435455)
321+
{
322+
throw new MQTTProtocolException("Value out of range for a variable byte integer: {value}");
323+
}
324+
325+
if (number <= 0x7F)
326+
{
327+
span[0] = (byte)number;
328+
return 1;
329+
}
330+
331+
var written = 0;
332+
var value = number;
333+
do
334+
{
335+
var encodedByte = value % 0x80;
336+
value /= 0x80;
337+
if (value > 0)
338+
{
339+
encodedByte |= 0x80;
340+
}
341+
342+
span[written++] = (byte)encodedByte;
343+
}
344+
while (value > 0);
345+
346+
return written;
347+
}
348+
282349
/// <summary>
283350
/// Encode an MQTT Binary Data data representation.
284351
///

Source/HiveMQtt/MQTT5/Packets/ConnectPacket.cs

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
namespace HiveMQtt.MQTT5.Packets;
1717

18+
using System.Buffers;
1819
using System.IO;
1920
using System.Text;
2021
using HiveMQtt.Client.Options;
@@ -102,18 +103,37 @@ public byte[] Encode()
102103
Array.Clear(passwordString.ToCharArray(), 0, passwordString.Length);
103104
}
104105

105-
// Construct the final packet
106-
var constructedPacket = new MemoryStream((int)vhAndPayloadStream.Length + 5);
106+
// Calculate the size needed for the final packet
107+
var vhAndPayloadLength = (int)vhAndPayloadStream.Length;
108+
var fixedHeaderSize = 1 + GetVariableByteIntegerSize(vhAndPayloadLength);
109+
var totalSize = fixedHeaderSize + vhAndPayloadLength;
107110

108-
// Write the Fixed Header
109-
constructedPacket.WriteByte(((byte)ControlPacketType.Connect) << 4);
110-
_ = EncodeVariableByteInteger(constructedPacket, (int)vhAndPayloadStream.Length);
111-
112-
// Copy the Variable Header and Payload
113-
vhAndPayloadStream.Position = 0;
114-
vhAndPayloadStream.CopyTo(constructedPacket);
115-
116-
return constructedPacket.ToArray();
111+
// Use ArrayPool for the final buffer
112+
var rentedBuffer = ArrayPool<byte>.Shared.Rent(totalSize);
113+
try
114+
{
115+
var bufferSpan = rentedBuffer.AsSpan(0, totalSize);
116+
var offset = 0;
117+
118+
// Write the Fixed Header
119+
bufferSpan[offset++] = ((byte)ControlPacketType.Connect) << 4;
120+
offset += EncodeVariableByteIntegerToSpan(bufferSpan[offset..], vhAndPayloadLength);
121+
122+
// Copy the Variable Header and Payload directly from the stream
123+
vhAndPayloadStream.Position = 0;
124+
var vhAndPayloadBuffer = vhAndPayloadStream.GetBuffer();
125+
var vhAndPayloadSpan = new Span<byte>(vhAndPayloadBuffer, 0, vhAndPayloadLength);
126+
vhAndPayloadSpan.CopyTo(bufferSpan[offset..]);
127+
128+
// Return a properly sized array
129+
var result = new byte[totalSize];
130+
bufferSpan[..totalSize].CopyTo(result);
131+
return result;
132+
}
133+
finally
134+
{
135+
ArrayPool<byte>.Shared.Return(rentedBuffer);
136+
}
117137
}
118138
}
119139

Source/HiveMQtt/MQTT5/Packets/DisconnectPacket.cs

Lines changed: 31 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -75,16 +75,37 @@ public byte[] Encode()
7575

7676
// Disconnect has no payload
7777

78-
// Fixed Header - Add to the beginning of the stream
79-
var remainingLength = stream.Length - 2;
80-
81-
// Go back to the beginning of the stream and write
82-
// the first two bytes.
83-
stream.Position = 0;
84-
stream.WriteByte((byte)ControlPacketType.Disconnect << 4);
85-
EncodeVariableByteInteger(stream, (int)remainingLength);
86-
87-
return stream.ToArray();
78+
// Fixed Header - Calculate remaining length
79+
var remainingLength = (int)stream.Length - 2;
80+
var fixedHeaderSize = 1 + GetVariableByteIntegerSize(remainingLength);
81+
var totalSize = fixedHeaderSize + remainingLength;
82+
83+
// Use ArrayPool for the final buffer
84+
var rentedBuffer = ArrayPool<byte>.Shared.Rent(totalSize);
85+
try
86+
{
87+
var bufferSpan = rentedBuffer.AsSpan(0, totalSize);
88+
var offset = 0;
89+
90+
// Write the Fixed Header
91+
bufferSpan[offset++] = (byte)ControlPacketType.Disconnect << 4;
92+
offset += EncodeVariableByteIntegerToSpan(bufferSpan[offset..], remainingLength);
93+
94+
// Copy the Variable Header directly from the stream
95+
stream.Position = 2;
96+
var streamBuffer = stream.GetBuffer();
97+
var variableHeaderSpan = new Span<byte>(streamBuffer, 2, remainingLength);
98+
variableHeaderSpan.CopyTo(bufferSpan[offset..]);
99+
100+
// Return a properly sized array
101+
var result = new byte[totalSize];
102+
bufferSpan[..totalSize].CopyTo(result);
103+
return result;
104+
}
105+
finally
106+
{
107+
ArrayPool<byte>.Shared.Return(rentedBuffer);
108+
}
88109
}
89110
}
90111

Source/HiveMQtt/MQTT5/Packets/PingReqPacket.cs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515
*/
1616
namespace HiveMQtt.MQTT5.Packets;
1717

18-
using System.IO;
19-
2018
/// <summary>
2119
/// An MQTT PingReq Control Packet.
2220
///
@@ -33,13 +31,14 @@ public class PingReqPacket : ControlPacket
3331
/// <returns>An array of bytes ready to be sent.</returns>
3432
public static byte[] Encode()
3533
{
36-
using (var stream = new MemoryStream(2))
37-
{
38-
// Fixed Header
39-
stream.WriteByte(((byte)ControlPacketType.PingReq) << 4);
40-
stream.WriteByte(0x0);
34+
// PingReq is always 2 bytes - use stackalloc for zero allocation
35+
#pragma warning disable IDE0302 // Collection initialization - stackalloc is not a collection
36+
Span<byte> buffer = stackalloc byte[2];
37+
#pragma warning restore IDE0302
38+
buffer[0] = ((byte)ControlPacketType.PingReq) << 4;
39+
buffer[1] = 0x0;
4140

42-
return stream.ToArray();
43-
}
41+
// Return a new array (required for async operations)
42+
return buffer.ToArray();
4443
}
4544
}

Source/HiveMQtt/MQTT5/Packets/PubAckPacket.cs

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -51,18 +51,37 @@ public byte[] Encode()
5151
vhStream.WriteByte((byte)this.ReasonCode);
5252
this.EncodeProperties(vhStream);
5353

54-
// Construct the final packet
55-
var constructedPacket = new MemoryStream((int)vhStream.Length + 5);
56-
57-
// Write the Fixed Header
58-
constructedPacket.WriteByte(((byte)ControlPacketType.PubAck) << 4);
59-
_ = EncodeVariableByteInteger(constructedPacket, (int)vhStream.Length);
60-
61-
// Copy the Variable Header and Payload
62-
vhStream.Position = 0;
63-
vhStream.CopyTo(constructedPacket);
64-
65-
return constructedPacket.ToArray();
54+
// Calculate the size needed for the final packet
55+
var vhLength = (int)vhStream.Length;
56+
var fixedHeaderSize = 1 + GetVariableByteIntegerSize(vhLength);
57+
var totalSize = fixedHeaderSize + vhLength;
58+
59+
// Use ArrayPool for the final buffer
60+
var rentedBuffer = ArrayPool<byte>.Shared.Rent(totalSize);
61+
try
62+
{
63+
var bufferSpan = rentedBuffer.AsSpan(0, totalSize);
64+
var offset = 0;
65+
66+
// Write the Fixed Header
67+
bufferSpan[offset++] = ((byte)ControlPacketType.PubAck) << 4;
68+
offset += EncodeVariableByteIntegerToSpan(bufferSpan[offset..], vhLength);
69+
70+
// Copy the Variable Header directly from the stream
71+
vhStream.Position = 0;
72+
var vhBuffer = vhStream.GetBuffer();
73+
var vhSpan = new Span<byte>(vhBuffer, 0, vhLength);
74+
vhSpan.CopyTo(bufferSpan[offset..]);
75+
76+
// Return a properly sized array
77+
var result = new byte[totalSize];
78+
bufferSpan[..totalSize].CopyTo(result);
79+
return result;
80+
}
81+
finally
82+
{
83+
ArrayPool<byte>.Shared.Return(rentedBuffer);
84+
}
6685
}
6786
}
6887

Source/HiveMQtt/MQTT5/Packets/PubCompPacket.cs

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -49,18 +49,37 @@ public byte[] Encode()
4949
vhStream.WriteByte((byte)this.ReasonCode);
5050
this.EncodeProperties(vhStream);
5151

52-
// Construct the final packet
53-
var constructedPacket = new MemoryStream((int)vhStream.Length + 5);
54-
55-
// Write the Fixed Header
56-
constructedPacket.WriteByte(((byte)ControlPacketType.PubComp) << 4);
57-
_ = EncodeVariableByteInteger(constructedPacket, (int)vhStream.Length);
58-
59-
// Copy the Variable Header and Payload
60-
vhStream.Position = 0;
61-
vhStream.CopyTo(constructedPacket);
62-
63-
return constructedPacket.ToArray();
52+
// Calculate the size needed for the final packet
53+
var vhLength = (int)vhStream.Length;
54+
var fixedHeaderSize = 1 + GetVariableByteIntegerSize(vhLength);
55+
var totalSize = fixedHeaderSize + vhLength;
56+
57+
// Use ArrayPool for the final buffer
58+
var rentedBuffer = ArrayPool<byte>.Shared.Rent(totalSize);
59+
try
60+
{
61+
var bufferSpan = rentedBuffer.AsSpan(0, totalSize);
62+
var offset = 0;
63+
64+
// Write the Fixed Header
65+
bufferSpan[offset++] = ((byte)ControlPacketType.PubComp) << 4;
66+
offset += EncodeVariableByteIntegerToSpan(bufferSpan[offset..], vhLength);
67+
68+
// Copy the Variable Header directly from the stream
69+
vhStream.Position = 0;
70+
var vhBuffer = vhStream.GetBuffer();
71+
var vhSpan = new Span<byte>(vhBuffer, 0, vhLength);
72+
vhSpan.CopyTo(bufferSpan[offset..]);
73+
74+
// Return a properly sized array
75+
var result = new byte[totalSize];
76+
bufferSpan[..totalSize].CopyTo(result);
77+
return result;
78+
}
79+
finally
80+
{
81+
ArrayPool<byte>.Shared.Return(rentedBuffer);
82+
}
6483
}
6584
}
6685

Source/HiveMQtt/MQTT5/Packets/PubRecPacket.cs

Lines changed: 31 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -49,18 +49,37 @@ public byte[] Encode()
4949
vhStream.WriteByte((byte)this.ReasonCode);
5050
this.EncodeProperties(vhStream);
5151

52-
// Construct the final packet
53-
var constructedPacket = new MemoryStream((int)vhStream.Length + 5);
54-
55-
// Write the Fixed Header
56-
constructedPacket.WriteByte(((byte)ControlPacketType.PubRec) << 4);
57-
_ = EncodeVariableByteInteger(constructedPacket, (int)vhStream.Length);
58-
59-
// Copy the Variable Header and Payload
60-
vhStream.Position = 0;
61-
vhStream.CopyTo(constructedPacket);
62-
63-
return constructedPacket.ToArray();
52+
// Calculate the size needed for the final packet
53+
var vhLength = (int)vhStream.Length;
54+
var fixedHeaderSize = 1 + GetVariableByteIntegerSize(vhLength);
55+
var totalSize = fixedHeaderSize + vhLength;
56+
57+
// Use ArrayPool for the final buffer
58+
var rentedBuffer = ArrayPool<byte>.Shared.Rent(totalSize);
59+
try
60+
{
61+
var bufferSpan = rentedBuffer.AsSpan(0, totalSize);
62+
var offset = 0;
63+
64+
// Write the Fixed Header
65+
bufferSpan[offset++] = ((byte)ControlPacketType.PubRec) << 4;
66+
offset += EncodeVariableByteIntegerToSpan(bufferSpan[offset..], vhLength);
67+
68+
// Copy the Variable Header directly from the stream
69+
vhStream.Position = 0;
70+
var vhBuffer = vhStream.GetBuffer();
71+
var vhSpan = new Span<byte>(vhBuffer, 0, vhLength);
72+
vhSpan.CopyTo(bufferSpan[offset..]);
73+
74+
// Return a properly sized array
75+
var result = new byte[totalSize];
76+
bufferSpan[..totalSize].CopyTo(result);
77+
return result;
78+
}
79+
finally
80+
{
81+
ArrayPool<byte>.Shared.Return(rentedBuffer);
82+
}
6483
}
6584
}
6685

0 commit comments

Comments
 (0)