Skip to content
Merged
Show file tree
Hide file tree
Changes from 34 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
2a8eefc
Add WS transport
RassK Sep 2, 2025
5dcec99
fix sln
RassK Sep 3, 2025
b8b270c
update changelog
RassK Sep 3, 2025
4601198
refactor and add large packages testing
RassK Sep 3, 2025
68a3ecb
Update src/OpenTelemetry.OpAmp.Client/Internal/Transport/WebSocket/Ws…
RassK Sep 3, 2025
b560d56
Update src/OpenTelemetry.OpAmp.Client/Internal/Transport/WebSocket/Ws…
RassK Sep 3, 2025
aa1dc96
seal classes
RassK Sep 3, 2025
9135941
add disposable support
RassK Sep 3, 2025
a8802db
adjusts language
RassK Sep 3, 2025
91dbbd1
Update test/OpenTelemetry.OpAmp.Client.Tests/WsTransportTest.cs
RassK Sep 3, 2025
01543ce
Update src/OpenTelemetry.OpAmp.Client/Internal/Utils/BufferSegment.cs
RassK Sep 3, 2025
2cf56b6
Update src/OpenTelemetry.OpAmp.Client/Internal/Utils/OpAmpWsHeaderHel…
RassK Sep 3, 2025
7f1aaef
replace separator
RassK Sep 3, 2025
75e706a
refactor magic numbers from varint64 helper
RassK Sep 4, 2025
59d9314
refactor sertification validator
RassK Sep 5, 2025
7bac1e7
fix no cancellation token
RassK Sep 5, 2025
6bd280a
fix cts token
RassK Sep 5, 2025
2424809
optimize varint encode
RassK Sep 5, 2025
de41721
refactor header decode
RassK Sep 5, 2025
10b4be0
add single buffer test
RassK Sep 5, 2025
a730f72
fix test namings
RassK Sep 5, 2025
457c32c
Merge branch 'main' into opamp-client-wstransport
RassK Sep 5, 2025
4cc4f0e
add shortcut for a single buffer
RassK Sep 8, 2025
45f0616
add thread name
RassK Sep 8, 2025
9729ffd
fix ws receiver loop
RassK Sep 8, 2025
1e50ffc
improve varint64 encode
RassK Sep 8, 2025
9d359df
Update test/OpenTelemetry.OpAmp.Client.Tests/Tools/OpAmpFakeWebSocket…
RassK Sep 8, 2025
4dd9658
Update test/OpenTelemetry.OpAmp.Client.Tests/WsTransportTest.cs
RassK Sep 8, 2025
d1c54ce
Merge branch 'main' into opamp-client-wstransport
RassK Sep 8, 2025
57d7480
Update test/OpenTelemetry.OpAmp.Client.Tests/Tools/OpAmpFakeWebSocket…
RassK Sep 8, 2025
75042e7
limit max message size
RassK Sep 8, 2025
73491d0
fix tests parameters language
RassK Sep 9, 2025
eadbb09
use binary literals
RassK Sep 9, 2025
295c1a1
Merge branch 'main' into opamp-client-wstransport
RassK Sep 9, 2025
d68874b
Update src/OpenTelemetry.OpAmp.Client/Internal/Utils/Varint64.cs
RassK Sep 9, 2025
747d220
Merge branch 'main' into opamp-client-wstransport
Kielek Sep 9, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions opentelemetry-dotnet-contrib.sln
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Shared", "Shared", "{70CA77
test\Shared\TestHttpServer.cs = test\Shared\TestHttpServer.cs
test\Shared\TestSampler.cs = test\Shared\TestSampler.cs
test\Shared\TestTextMapPropagator.cs = test\Shared\TestTextMapPropagator.cs
test\Shared\TestWebSocketServer.cs = test\Shared\TestWebSocketServer.cs
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "scripts", "scripts", "{45D29DAA-0DB9-4808-B879-1AECC37EF366}"
Expand Down
2 changes: 2 additions & 0 deletions src/OpenTelemetry.OpAmp.Client/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,5 @@
([#2917](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/2917))
* Added support for OpAMP Plain HTTP transport.
([#2926](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/2926))
* Added support for OpAMP WebSocket transport.
([#2926](https://github.com/open-telemetry/opentelemetry-dotnet-contrib/pull/3064))
30 changes: 30 additions & 0 deletions src/OpenTelemetry.OpAmp.Client/Internal/FrameProcessor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
using OpenTelemetry.OpAmp.Client.Internal.Listeners;
using OpenTelemetry.OpAmp.Client.Internal.Listeners.Messages;

#if NET
using OpenTelemetry.OpAmp.Client.Internal.Utils;
#endif

namespace OpenTelemetry.OpAmp.Client.Internal;

internal sealed class FrameProcessor
Expand Down Expand Up @@ -51,6 +55,32 @@ public void OnServerFrame(ReadOnlySequence<byte> sequence)
this.Deserialize(sequence);
}

#if NET
public void OnServerFrame(ReadOnlySequence<byte> sequence, int count, bool verifyHeader)
{
var headerSize = 0;

// verify and decode
if (verifyHeader)
{
if (!OpAmpWsHeaderHelper.TryVerifyHeader(sequence, out headerSize, out string errorMessage))
{
// TODO: log error message

return;
}
}

this.Deserialize(sequence, count, headerSize);
}

private void Deserialize(ReadOnlySequence<byte> sequence, int count, int headerSize)
{
var dataSegment = sequence.Slice(headerSize, count - headerSize);
this.Deserialize(dataSegment);
}
#endif

private void Deserialize(ReadOnlySequence<byte> sequence)
{
var message = ServerToAgent.Parser.ParseFrom(sequence);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#if NET

using System.Buffers;
using System.Net.WebSockets;
using OpenTelemetry.Internal;
using OpenTelemetry.OpAmp.Client.Internal;
using OpenTelemetry.OpAmp.Client.Internal.Utils;

namespace OpenTelemetry.OpAmp.Client.Transport.WebSocket;

internal sealed class WsReceiver : IDisposable
{
private const int RentalBufferSize = 4 * 1024; // 4 KB
private const int ReceiveBufferSize = 8 * 1024; // 8 KB
private const int MaxMessageSize = 128 * 1024; // 128 KB

private readonly ClientWebSocket ws;
private readonly Thread receiveThread;
private readonly FrameProcessor processor;

private readonly byte[] receiveBuffer = new byte[ReceiveBufferSize];

private CancellationToken token;

public WsReceiver(ClientWebSocket ws, FrameProcessor processor)
{
Guard.ThrowIfNull(ws, nameof(ws));
Guard.ThrowIfNull(processor, nameof(processor));

this.ws = ws;
this.processor = processor;
this.receiveThread = new Thread(this.ReceiveLoop)
{
Name = "OpAmp WebSocket Receive Loop",
};
}

public void Start(CancellationToken token = default)
{
this.token = token;
this.receiveThread.Start();
}

public void Dispose()
{
this.receiveThread?.Join();
}

private static void ReturnRentalBuffers(List<byte[]>? rentalBuffers)
{
if (rentalBuffers == null)
{
return;
}

foreach (var rental in rentalBuffers)
{
ArrayPool<byte>.Shared.Return(rental);
}
}

private async void ReceiveLoop()
{
while (!this.token.IsCancellationRequested && this.ws.State == WebSocketState.Open)
{
await this.ReceiveAsync().ConfigureAwait(false);
}
}

private async Task ReceiveAsync()
{
var totalCount = 0;
var workingCount = 0;
WebSocketReceiveResult result;
byte[] workingBuffer = this.receiveBuffer;

List<byte[]>? rentalBuffers = null;
bool continueRead;
bool isClosed;

do
{
// out of space, need to rent more
if (workingBuffer.Length - workingCount == 0)
{
if (rentalBuffers == null)
{
rentalBuffers = [this.receiveBuffer];
}

workingBuffer = ArrayPool<byte>.Shared.Rent(RentalBufferSize);
workingCount = 0;
rentalBuffers.Add(workingBuffer);
}

if (this.ws.State != WebSocketState.Open)
{
// Connection is closed, nothing more to read.
isClosed = true;
break;
}

var segment1 = new ArraySegment<byte>(workingBuffer, workingCount, workingBuffer.Length - workingCount);

try
{
result = await this.ws
.ReceiveAsync(segment1, this.token)
.ConfigureAwait(false);

continueRead = !result.EndOfMessage;
isClosed = result.CloseStatus != null;
workingCount += result.Count;
totalCount += result.Count;
}
catch (OperationCanceledException)
{
continueRead = false;
isClosed = true;
}

if (totalCount > MaxMessageSize)
{
// Message too large, abort the connection.
await this.ws
.CloseOutputAsync(WebSocketCloseStatus.MessageTooBig, "Message too large", CancellationToken.None)
.ConfigureAwait(false);

isClosed = true;
break;
}
}
while (continueRead && !this.token.IsCancellationRequested);

if (!isClosed)
{
var sequence =
rentalBuffers?.Count > 1
? rentalBuffers.CreateSequenceFromBuffers(workingCount + 1)
: new ReadOnlySequence<byte>(this.receiveBuffer);

this.processor.OnServerFrame(sequence, totalCount, verifyHeader: true);
}

ReturnRentalBuffers(rentalBuffers);
}
}

#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#if NET

using System.Net.WebSockets;
using Google.Protobuf;
using OpenTelemetry.Internal;
using OpenTelemetry.OpAmp.Client.Internal.Utils;

namespace OpenTelemetry.OpAmp.Client.Transport.WebSocket;

internal sealed class WsTransmitter
{
private const int BufferSize = 4096;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to choose realistic buffer size, anyone suggests what are the usuals?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

4096 is common in things like Stream - you could benchmark to find if there's a better value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May need to tune later then. Prob need to get full session up and have some stats based on different message sizes.
My theory is that the most common message size should be default. 4kB for small messages, maybe increase a new block size to 8kB if there are already 2-3 4kB blocks requested 🤔


private readonly byte[] buffer = new byte[BufferSize];

private readonly ClientWebSocket ws;

public WsTransmitter(ClientWebSocket ws)
{
Guard.ThrowIfNull(ws, nameof(ws));

this.ws = ws;
}

public async Task SendAsync(IMessage message, CancellationToken token = default)
{
var headerSize = OpAmpWsHeaderHelper.WriteHeader(this.buffer);
var size = message.CalculateSize();

// fits to the buffer, send completely
if (size + headerSize <= BufferSize)
{
var segment = new ArraySegment<byte>(this.buffer, headerSize, size);
message.WriteTo(segment);

// resegment to include the header byte
segment = new ArraySegment<byte>(this.buffer, 0, size + headerSize);

await this.ws.SendAsync(segment, WebSocketMessageType.Binary, true, token).ConfigureAwait(false);
}

// Does not fit, need to chunk the message
else
{
// TODO: Implement chunking logic for large messages
throw new NotImplementedException();
}
}
}

#endif
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

#if NET

using System.Net.WebSockets;
using Google.Protobuf;
using OpenTelemetry.Internal;
using OpenTelemetry.OpAmp.Client.Internal;
using OpenTelemetry.OpAmp.Client.Internal.Transport;

namespace OpenTelemetry.OpAmp.Client.Transport.WebSocket;

internal sealed class WsTransport : IOpAmpTransport, IDisposable
{
private readonly Uri uri;
private readonly HttpClientHandler handler = new();
private readonly ClientWebSocket ws = new();
private readonly WsReceiver receiver;
private readonly WsTransmitter transmitter;
private readonly FrameProcessor processor;

public WsTransport(Uri serverUrl, FrameProcessor processor)
{
Guard.ThrowIfNull(serverUrl, nameof(serverUrl));
Guard.ThrowIfNull(processor, nameof(processor));

// TODO: fix trust all certificates
this.handler.ServerCertificateCustomValidationCallback = HttpClientHandler.DangerousAcceptAnyServerCertificateValidator;
this.uri = serverUrl;
this.processor = processor;
this.receiver = new WsReceiver(this.ws, this.processor);
this.transmitter = new WsTransmitter(this.ws);
}

public async Task StartAsync(CancellationToken token = default)
{
using var invoker = new HttpMessageInvoker(this.handler);

await this.ws
.ConnectAsync(this.uri, invoker, token)
.ConfigureAwait(false);

this.receiver.Start(token);
}

public async Task StopAsync(CancellationToken token = default)
{
await this.ws
.CloseAsync(WebSocketCloseStatus.NormalClosure, "Client closed connection", token)
.ConfigureAwait(false);
}

public Task SendAsync<T>(T message, CancellationToken token = default)
where T : IMessage<T>
{
return this.transmitter.SendAsync(message, token);
}

public void Dispose()
{
this.handler.Dispose();
this.ws.Dispose();
this.receiver.Dispose();
}
}

#endif
21 changes: 21 additions & 0 deletions src/OpenTelemetry.OpAmp.Client/Internal/Utils/BufferSegment.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using System.Buffers;

namespace OpenTelemetry.OpAmp.Client.Internal.Utils;

internal sealed class BufferSegment : ReadOnlySequenceSegment<byte>
{
public BufferSegment(byte[] buffer)
{
this.Memory = new ReadOnlyMemory<byte>(buffer);
}

public void SetNext(BufferSegment segment)
{
this.Next = segment;

segment.RunningIndex = this.RunningIndex + this.Memory.Length;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

using System.Buffers;
using System.Diagnostics;

namespace OpenTelemetry.OpAmp.Client.Internal.Utils;

internal static class OpAmpWsHeaderHelper
{
public const int OpAmpProtocolHeader = 0x00;
public const int MaxHeaderLength = 10; // Maximum length for varint64 encoding

private static readonly byte[] EncodedHeader = Varint64.Encode(OpAmpProtocolHeader);

public static int WriteHeader(ArraySegment<byte> buffer)
{
Debug.Assert(buffer.Count >= EncodedHeader.Length, $"Ensure {EncodedHeader.Length} bytes for the buffer.");

Buffer.BlockCopy(EncodedHeader, 0, buffer.Array!, buffer.Offset, EncodedHeader.Length);

return EncodedHeader.Length;
}

public static bool TryVerifyHeader(ReadOnlySequence<byte> sequence, out int headerSize, out string errorMessage)
{
var result = Varint64.TryDecode(sequence, out headerSize, out ulong header, out errorMessage);
if (!result)
{
return false;
}

if (header != OpAmpProtocolHeader)
{
errorMessage = $"Invalid OpAmp WebSocket header: {header}.";
return false;
}

return true;
}
}
Loading
Loading