-
Couldn't load subscription status.
- Fork 354
[OpAMP.Client] WebSocket Transport #3064
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 22 commits
2a8eefc
5dcec99
b8b270c
4601198
68a3ecb
b560d56
aa1dc96
9135941
a8802db
91dbbd1
01543ce
2cf56b6
7f1aaef
75e706a
59d9314
7bac1e7
6bd280a
2424809
de41721
10b4be0
a730f72
457c32c
4cc4f0e
45f0616
9729ffd
1e50ffc
9d359df
4dd9658
d1c54ce
57d7480
75042e7
73491d0
eadbb09
295c1a1
d68874b
747d220
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,134 @@ | ||
| // Copyright The OpenTelemetry Authors | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| #if NET | ||
|
|
||
| using System.Buffers; | ||
| using System.Net.WebSockets; | ||
| using OpenTelemetry.Internal; | ||
| using OpenTelemetry.OpAmp.Client.Internal; | ||
| using OpenTelemetry.OpAmp.Client.Internal.Utils; | ||
|
|
||
| namespace OpenTelemetry.OpAmp.Client.Transport.WebSocket; | ||
|
|
||
| internal sealed class WsReceiver : IDisposable | ||
| { | ||
| private const int RentalBufferSize = 4096; | ||
| private const int ReceiveBufferSize = 8192; | ||
|
|
||
| private readonly ClientWebSocket ws; | ||
| private readonly Thread receiveThread; | ||
| private readonly FrameProcessor processor; | ||
|
|
||
| private readonly byte[] receiveBuffer = new byte[ReceiveBufferSize]; | ||
|
|
||
| private CancellationToken token; | ||
|
|
||
| public WsReceiver(ClientWebSocket ws, FrameProcessor processor) | ||
| { | ||
| Guard.ThrowIfNull(ws, nameof(ws)); | ||
| Guard.ThrowIfNull(processor, nameof(processor)); | ||
|
|
||
| this.ws = ws; | ||
| this.processor = processor; | ||
| this.receiveThread = new Thread(this.ReceiveLoop); | ||
martincostello marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| public void Start(CancellationToken token = default) | ||
| { | ||
| this.token = token; | ||
| this.receiveThread.Start(); | ||
| } | ||
|
|
||
| public void Dispose() | ||
| { | ||
| this.receiveThread?.Join(); | ||
| } | ||
|
|
||
| private static void ReturnRentalBuffers(List<byte[]>? rentalBuffers) | ||
| { | ||
| if (rentalBuffers == null) | ||
| { | ||
| return; | ||
| } | ||
|
|
||
| foreach (var rental in rentalBuffers) | ||
| { | ||
| ArrayPool<byte>.Shared.Return(rental); | ||
| } | ||
| } | ||
|
|
||
| private async void ReceiveLoop() | ||
| { | ||
| while (true) | ||
| { | ||
| this.token.ThrowIfCancellationRequested(); | ||
martincostello marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| if (this.ws.State != WebSocketState.Open) | ||
| { | ||
| // Connection is closed, dont start a new loop | ||
| break; | ||
| } | ||
|
|
||
| await this.ReceiveAsync(this.token).ConfigureAwait(false); | ||
| } | ||
| } | ||
|
|
||
| private async Task ReceiveAsync(CancellationToken token) | ||
| { | ||
| var totalCount = 0; | ||
| var workingCount = 0; | ||
| var isClosed = false; | ||
| WebSocketReceiveResult result; | ||
| byte[] workingBuffer = this.receiveBuffer; | ||
|
|
||
| List<byte[]>? rentalBuffers = null; | ||
|
|
||
| do | ||
| { | ||
| this.token.ThrowIfCancellationRequested(); | ||
|
|
||
| // out of space, need to rent more | ||
| if (workingBuffer.Length - workingCount == 0) | ||
| { | ||
| if (rentalBuffers == null) | ||
| { | ||
| rentalBuffers = [this.receiveBuffer]; | ||
| } | ||
|
|
||
| workingBuffer = ArrayPool<byte>.Shared.Rent(RentalBufferSize); | ||
| workingCount = 0; | ||
| rentalBuffers.Add(workingBuffer); | ||
martincostello marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| if (this.ws.State != WebSocketState.Open) | ||
| { | ||
| // Connection is closed, nothing more to read. | ||
| isClosed = true; | ||
| break; | ||
| } | ||
|
|
||
| var segment1 = new ArraySegment<byte>(workingBuffer, workingCount, workingBuffer.Length - workingCount); | ||
| result = await this.ws.ReceiveAsync(segment1, token).ConfigureAwait(false); | ||
|
|
||
| isClosed = result.CloseStatus != null; | ||
| workingCount += result.Count; | ||
| totalCount += result.Count; | ||
| } | ||
| while (!result.EndOfMessage); | ||
|
|
||
| if (!isClosed) | ||
| { | ||
| var sequence = | ||
| rentalBuffers?.Count > 1 | ||
| ? rentalBuffers.CreateSequenceFromBuffers(workingCount + 1) | ||
| : new ReadOnlySequence<byte>(this.receiveBuffer); | ||
|
|
||
| this.processor.OnServerFrame(sequence, totalCount, verifyHeader: true); | ||
| } | ||
|
|
||
| ReturnRentalBuffers(rentalBuffers); | ||
| } | ||
| } | ||
|
|
||
| #endif | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,54 @@ | ||
| // Copyright The OpenTelemetry Authors | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| #if NET | ||
|
|
||
| using System.Net.WebSockets; | ||
| using Google.Protobuf; | ||
| using OpenTelemetry.Internal; | ||
| using OpenTelemetry.OpAmp.Client.Internal.Utils; | ||
|
|
||
| namespace OpenTelemetry.OpAmp.Client.Transport.WebSocket; | ||
|
|
||
| internal sealed class WsTransmitter | ||
| { | ||
| private const int BufferSize = 4096; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Need to choose realistic buffer size, anyone suggests what are the usuals? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 4096 is common in things like There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. May need to tune later then. Prob need to get full session up and have some stats based on different message sizes. |
||
|
|
||
| private readonly byte[] buffer = new byte[BufferSize]; | ||
|
|
||
| private readonly ClientWebSocket ws; | ||
|
|
||
| public WsTransmitter(ClientWebSocket ws) | ||
| { | ||
| Guard.ThrowIfNull(ws, nameof(ws)); | ||
|
|
||
| this.ws = ws; | ||
| } | ||
|
|
||
| public async Task SendAsync(IMessage message, CancellationToken token = default) | ||
| { | ||
| var headerSize = OpAmpWsHeaderHelper.WriteHeader(this.buffer); | ||
| var size = message.CalculateSize(); | ||
|
|
||
| // fits to the buffer, send completely | ||
| if (size + headerSize <= BufferSize) | ||
| { | ||
| var segment = new ArraySegment<byte>(this.buffer, headerSize, size); | ||
| message.WriteTo(segment); | ||
|
|
||
| // resegment to include the header byte | ||
| segment = new ArraySegment<byte>(this.buffer, 0, size + headerSize); | ||
|
|
||
| await this.ws.SendAsync(segment, WebSocketMessageType.Binary, true, token).ConfigureAwait(false); | ||
| } | ||
|
|
||
| // Does not fit, need to chunk the message | ||
| else | ||
| { | ||
| // TODO: Implement chunking logic for large messages | ||
| throw new NotImplementedException(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| #endif | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,68 @@ | ||
| // Copyright The OpenTelemetry Authors | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| #if NET | ||
|
|
||
| using System.Net.WebSockets; | ||
| using Google.Protobuf; | ||
| using OpenTelemetry.Internal; | ||
| using OpenTelemetry.OpAmp.Client.Internal; | ||
| using OpenTelemetry.OpAmp.Client.Internal.Transport; | ||
|
|
||
| namespace OpenTelemetry.OpAmp.Client.Transport.WebSocket; | ||
|
|
||
| internal sealed class WsTransport : IOpAmpTransport, IDisposable | ||
| { | ||
| private readonly Uri uri; | ||
| private readonly HttpClientHandler handler = new(); | ||
| private readonly ClientWebSocket ws = new(); | ||
| private readonly WsReceiver receiver; | ||
| private readonly WsTransmitter transmitter; | ||
| private readonly FrameProcessor processor; | ||
|
|
||
| public WsTransport(Uri serverUrl, FrameProcessor processor) | ||
| { | ||
| Guard.ThrowIfNull(serverUrl, nameof(serverUrl)); | ||
| Guard.ThrowIfNull(processor, nameof(processor)); | ||
|
|
||
| // TODO: fix trust all certificates | ||
| this.handler.ServerCertificateCustomValidationCallback = HttpClientHandler.DangerousAcceptAnyServerCertificateValidator; | ||
| this.uri = serverUrl; | ||
| this.processor = processor; | ||
| this.receiver = new WsReceiver(this.ws, this.processor); | ||
| this.transmitter = new WsTransmitter(this.ws); | ||
| } | ||
|
|
||
| public async Task StartAsync(CancellationToken token = default) | ||
| { | ||
| using var invoker = new HttpMessageInvoker(this.handler); | ||
|
|
||
| await this.ws | ||
| .ConnectAsync(this.uri, invoker, token) | ||
| .ConfigureAwait(false); | ||
|
|
||
| this.receiver.Start(token); | ||
| } | ||
|
|
||
| public async Task StopAsync(CancellationToken token = default) | ||
| { | ||
| await this.ws | ||
| .CloseAsync(WebSocketCloseStatus.NormalClosure, "Client closed connection", token) | ||
| .ConfigureAwait(false); | ||
| } | ||
|
|
||
| public Task SendAsync<T>(T message, CancellationToken token = default) | ||
| where T : IMessage<T> | ||
| { | ||
| return this.transmitter.SendAsync(message, token); | ||
| } | ||
|
|
||
| public void Dispose() | ||
| { | ||
| this.handler.Dispose(); | ||
| this.ws.Dispose(); | ||
| this.receiver.Dispose(); | ||
| } | ||
| } | ||
|
|
||
| #endif |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,21 @@ | ||
| // Copyright The OpenTelemetry Authors | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| using System.Buffers; | ||
|
|
||
| namespace OpenTelemetry.OpAmp.Client.Internal.Utils; | ||
|
|
||
| internal sealed class BufferSegment : ReadOnlySequenceSegment<byte> | ||
| { | ||
| public BufferSegment(byte[] buffer) | ||
| { | ||
| this.Memory = new ReadOnlyMemory<byte>(buffer); | ||
| } | ||
|
|
||
| public void SetNext(BufferSegment segment) | ||
| { | ||
| this.Next = segment; | ||
|
|
||
| segment.RunningIndex = this.RunningIndex + this.Memory.Length; | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,41 @@ | ||
| // Copyright The OpenTelemetry Authors | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| using System.Buffers; | ||
| using System.Diagnostics; | ||
|
|
||
| namespace OpenTelemetry.OpAmp.Client.Internal.Utils; | ||
|
|
||
| internal static class OpAmpWsHeaderHelper | ||
| { | ||
| public const int OpAmpProtocolHeader = 0x00; | ||
| public const int MaxHeaderLength = 10; // Maximum length for varint64 encoding | ||
|
|
||
| private static readonly byte[] EncodedHeader = Varint64.Encode(OpAmpProtocolHeader); | ||
|
|
||
| public static int WriteHeader(ArraySegment<byte> buffer) | ||
| { | ||
| Debug.Assert(buffer.Count >= EncodedHeader.Length, $"Ensure {EncodedHeader.Length} bytes for the buffer."); | ||
|
|
||
| Buffer.BlockCopy(EncodedHeader, 0, buffer.Array!, buffer.Offset, EncodedHeader.Length); | ||
|
|
||
| return EncodedHeader.Length; | ||
| } | ||
|
|
||
| public static bool TryVerifyHeader(ReadOnlySequence<byte> sequence, out int headerSize, out string errorMessage) | ||
| { | ||
| var result = Varint64.TryDecode(sequence, out headerSize, out ulong header, out errorMessage); | ||
| if (!result) | ||
| { | ||
| return false; | ||
| } | ||
|
|
||
| if (header != OpAmpProtocolHeader) | ||
| { | ||
| errorMessage = $"Invalid OpAmp WebSocket header: {header}."; | ||
| return false; | ||
| } | ||
|
|
||
| return true; | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to choose realistic buffer size, anyone suggests what are the usuals?