Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 7 additions & 62 deletions src/NATS.Client.Core/Commands/CommandWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,13 @@ internal sealed class CommandWriter : IAsyncDisposable
private readonly bool _trace;
private readonly string _name;
private readonly NatsConnection _connection;
private readonly ObjectPool _pool;
private readonly int _arrayPoolInitialSize;
private readonly object _lock = new();
private readonly CancellationTokenSource _cts;
private readonly ConnectionStatsCounter _counter;
private readonly Memory<byte> _consolidateMem = new byte[SendMemSize].AsMemory();
private readonly TimeSpan _defaultCommandTimeout;
private readonly Action<PingCommand> _enqueuePing;
private readonly ProtocolWriter _protocolWriter;
private readonly HeaderWriter _headerWriter;
private readonly Channel<int> _channelLock;
private readonly Channel<int> _channelSize;
private readonly PipeReader _pipeReader;
Expand All @@ -55,25 +52,19 @@ internal sealed class CommandWriter : IAsyncDisposable
private CancellationTokenSource? _ctsReader;
private volatile bool _disposed;

public CommandWriter(string name, NatsConnection connection, ObjectPool pool, NatsOpts opts, ConnectionStatsCounter counter, Action<PingCommand> enqueuePing, TimeSpan? overrideCommandTimeout = default)
public CommandWriter(string name, NatsConnection connection, NatsOpts opts, ConnectionStatsCounter counter, Action<PingCommand> enqueuePing, TimeSpan? overrideCommandTimeout = default)
{
_logger = opts.LoggerFactory.CreateLogger<CommandWriter>();
_trace = _logger.IsEnabled(LogLevel.Trace);
_name = name;
_connection = connection;
_pool = pool;

// Derive ArrayPool rent size from buffer size to
// avoid defining another option.
_arrayPoolInitialSize = opts.WriterBufferSize / 256;

_counter = counter;
_defaultCommandTimeout = overrideCommandTimeout ?? opts.CommandTimeout;
_enqueuePing = enqueuePing;
_protocolWriter = new ProtocolWriter(opts.SubjectEncoding);
_channelLock = Channel.CreateBounded<int>(1);
_channelSize = Channel.CreateUnbounded<int>(new UnboundedChannelOptions { SingleWriter = true, SingleReader = true });
_headerWriter = new HeaderWriter(opts.HeaderEncoding);
_cts = new CancellationTokenSource();

var pipe = new Pipe(new PipeOptions(
Expand Down Expand Up @@ -294,49 +285,23 @@ public ValueTask PongAsync(CancellationToken cancellationToken = default)
return default;
}

public ValueTask PublishAsync<T>(string subject, T? value, NatsHeaders? headers, string? replyTo, INatsSerialize<T> serializer, CancellationToken cancellationToken)
public ValueTask PublishAsync(string subject, ReadOnlyMemory<byte>? headersBuffer, ReadOnlyMemory<byte> payloadBuffer, string? replyTo, CancellationToken cancellationToken)
{
if (_trace)
{
_logger.LogTrace(NatsLogEvents.Protocol, "PUB {Subject} {ReplyTo}", subject, replyTo);
}

NatsPooledBufferWriter<byte>? headersBuffer = null;
if (headers != null)
{
if (!_pool.TryRent(out headersBuffer))
headersBuffer = new NatsPooledBufferWriter<byte>(_arrayPoolInitialSize);
}

NatsPooledBufferWriter<byte> payloadBuffer;
if (!_pool.TryRent(out payloadBuffer!))
payloadBuffer = new NatsPooledBufferWriter<byte>(_arrayPoolInitialSize);

try
{
if (headers != null)
_headerWriter.Write(headersBuffer!, headers);

if (value != null)
serializer.Serialize(payloadBuffer, value);

var size = payloadBuffer.WrittenMemory.Length + (headersBuffer?.WrittenMemory.Length ?? 0);
var size = payloadBuffer.Length + (headersBuffer?.Length ?? 0);
if (_connection.ServerInfo is { } info && size > info.MaxPayload)
{
throw new NatsPayloadTooLargeException($"Payload size {size} exceeds server's maximum payload size {info.MaxPayload}");
}
}
catch
{
payloadBuffer.Reset();
_pool.Return(payloadBuffer);

if (headersBuffer != null)
{
headersBuffer.Reset();
_pool.Return(headersBuffer);
}

throw;
}

Expand All @@ -361,21 +326,12 @@ public ValueTask PublishAsync<T>(string subject, T? value, NatsHeaders? headers,
throw new ObjectDisposedException(nameof(CommandWriter));
}

_protocolWriter.WritePublish(_pipeWriter, subject, replyTo, headersBuffer?.WrittenMemory, payloadBuffer.WrittenMemory);
_protocolWriter.WritePublish(_pipeWriter, subject, replyTo, headersBuffer, payloadBuffer);
EnqueueCommand();
}
finally
{
_semLock.Release();

payloadBuffer.Reset();
_pool.Return(payloadBuffer);

if (headersBuffer != null)
{
headersBuffer.Reset();
_pool.Return(headersBuffer);
}
}

return default;
Expand Down Expand Up @@ -815,7 +771,7 @@ private async ValueTask PongStateMachineAsync(bool lockHeld, CancellationToken c
#if !NETSTANDARD
[AsyncMethodBuilder(typeof(PoolingAsyncValueTaskMethodBuilder))]
#endif
private async ValueTask PublishStateMachineAsync(bool lockHeld, string subject, string? replyTo, NatsPooledBufferWriter<byte>? headersBuffer, NatsPooledBufferWriter<byte> payloadBuffer, CancellationToken cancellationToken)
private async ValueTask PublishStateMachineAsync(bool lockHeld, string subject, string? replyTo, ReadOnlyMemory<byte>? headersBuffer, ReadOnlyMemory<byte> payloadBuffer, CancellationToken cancellationToken)
{
try
{
Expand All @@ -839,7 +795,7 @@ private async ValueTask PublishStateMachineAsync(bool lockHeld, string subject,
await _flushTask!.WaitAsync(_defaultCommandTimeout, cancellationToken).ConfigureAwait(false);
}

_protocolWriter.WritePublish(_pipeWriter, subject, replyTo, headersBuffer?.WrittenMemory, payloadBuffer.WrittenMemory);
_protocolWriter.WritePublish(_pipeWriter, subject, replyTo, headersBuffer, payloadBuffer);
EnqueueCommand();
}
catch (TimeoutException)
Expand All @@ -848,21 +804,10 @@ private async ValueTask PublishStateMachineAsync(bool lockHeld, string subject,
// standardize to an OperationCanceledException as if a cancellationToken was used
throw new OperationCanceledException();
}
finally
{
_semLock.Release();
}
}
finally
{
payloadBuffer.Reset();
_pool.Return(payloadBuffer);

if (headersBuffer != null)
{
headersBuffer.Reset();
_pool.Return(headersBuffer);
}
_semLock.Release();
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/Commands/PriorityCommandWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ internal sealed class PriorityCommandWriter : IAsyncDisposable

public PriorityCommandWriter(NatsConnection connection, ObjectPool pool, SocketConnectionWrapper socketConnection, NatsOpts opts, ConnectionStatsCounter counter, Action<PingCommand> enqueuePing)
{
CommandWriter = new CommandWriter("init", connection, pool, opts, counter, enqueuePing);
CommandWriter = new CommandWriter("init", connection, opts, counter, enqueuePing);
CommandWriter.Reset(socketConnection);
}

Expand Down
22 changes: 22 additions & 0 deletions src/NATS.Client.Core/Internal/NatsRecievedEvent.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
using System.Buffers;

namespace NATS.Client.Core.Internal;

public readonly record struct NatsRecievedEvent
{
public NatsRecievedEvent(string subject, string? replyTo, ReadOnlySequence<byte>? headersBuffer, ReadOnlySequence<byte> payload)
{
Subject = subject;
ReplyTo = replyTo;
HeadersBuffer = headersBuffer;
Payload = payload;
}

public string Subject { get; }

public string? ReplyTo { get; }

public ReadOnlySequence<byte>? HeadersBuffer { get; }

public ReadOnlySequence<byte> Payload { get; }
}
37 changes: 23 additions & 14 deletions src/NATS.Client.Core/Internal/ReplyTask.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,28 @@

namespace NATS.Client.Core.Internal;

internal sealed class ReplyTask<T> : ReplyTaskBase, IDisposable
internal sealed class ReplyTask : ReplyTaskBase, IDisposable
{
private readonly object _gate;
private readonly ReplyTaskFactory _factory;
private readonly long _id;
private readonly NatsConnection _connection;
private readonly INatsDeserialize<T> _deserializer;
private readonly TimeSpan _requestTimeout;
private readonly TaskCompletionSource _tcs;
private NatsMsg<T> _msg;
private NatsRecievedEvent _msg;

public ReplyTask(ReplyTaskFactory factory, long id, string subject, NatsConnection connection, INatsDeserialize<T> deserializer, TimeSpan requestTimeout)
public ReplyTask(ReplyTaskFactory factory, long id, string subject, TimeSpan requestTimeout)
{
_factory = factory;
_id = id;
Subject = subject;
_connection = connection;
_deserializer = deserializer;
_requestTimeout = requestTimeout;
_tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
_gate = new object();
}

public string Subject { get; }

public async ValueTask<NatsMsg<T>> GetResultAsync(CancellationToken cancellationToken)
public async ValueTask<NatsRecievedEvent> GetResultAsync(CancellationToken cancellationToken)
{
try
{
Expand All @@ -53,7 +49,23 @@ public override void SetResult(string? replyTo, ReadOnlySequence<byte> payload,
{
lock (_gate)
{
_msg = NatsMsg<T>.Build(Subject, replyTo, headersBuffer, payload, _connection, _connection.HeaderParser, _deserializer);
var payloadValue = ReadOnlySequence<byte>.Empty;
if (payload.Length > 0)
{
var payloadData = new byte[payload.Length];
payload.CopyTo(payloadData);
payloadValue = new ReadOnlySequence<byte>(payloadData);
}

ReadOnlySequence<byte>? headerValue = null;
if (headersBuffer != null)
{
var headerData = new byte[headersBuffer.Value.Length];
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

allocating here defeats the point of managing pooled buffers and increases GC pressure.

headersBuffer.Value.CopyTo(headerData);
headerValue = new ReadOnlySequence<byte>(headerData);
}

_msg = new NatsRecievedEvent(Subject, replyTo, headerValue, payloadValue);
}

_tcs.TrySetResult();
Expand All @@ -73,7 +85,6 @@ internal sealed class ReplyTaskFactory
private readonly string _inboxPrefixString;
private readonly NatsConnection _connection;
private readonly ConcurrentDictionary<long, ReplyTaskBase> _replies;
private readonly INatsSerializerRegistry _serializerRegistry;
private readonly TimeSpan _requestTimeout;
private readonly int _subjectMaxLength;
private readonly bool _allocSubject;
Expand All @@ -86,14 +97,12 @@ public ReplyTaskFactory(NatsConnection connection)
_inboxPrefix = Encoding.UTF8.GetBytes(_inboxPrefixString);
_subjectMaxLength = _inboxPrefix.Length + 20; // 20 digits for long
_allocSubject = _subjectMaxLength < 128;
_serializerRegistry = _connection.Opts.SerializerRegistry;
_requestTimeout = _connection.Opts.RequestTimeout;
_replies = new ConcurrentDictionary<long, ReplyTaskBase>();
}

public ReplyTask<TReply> CreateReplyTask<TReply>(INatsDeserialize<TReply>? deserializer, TimeSpan? requestTimeout)
public ReplyTask CreateReplyTask(TimeSpan? requestTimeout)
{
deserializer ??= _serializerRegistry.GetDeserializer<TReply>();
var id = Interlocked.Increment(ref _nextId);

string subject;
Expand All @@ -117,7 +126,7 @@ public ReplyTask<TReply> CreateReplyTask<TReply>(INatsDeserialize<TReply>? deser
subject = _inboxPrefixString + id;
}

var rt = new ReplyTask<TReply>(this, id, subject, _connection, deserializer, requestTimeout ?? _requestTimeout);
var rt = new ReplyTask(this, id, subject, requestTimeout ?? _requestTimeout);
_replies.TryAdd(id, rt);
return rt;
}
Expand Down
2 changes: 1 addition & 1 deletion src/NATS.Client.Core/Internal/SubscriptionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ private async ValueTask SubscribeInternalAsync(string subject, string? queueGrou

try
{
await _connection.SubscribeCoreAsync(sid, subject, queueGroup, opts?.MaxMsgs, cancellationToken).ConfigureAwait(false);
await _connection.SubscribeCoreAsync(sid, subject, queueGroup, opts?.MaxMsgs, false, cancellationToken).ConfigureAwait(false);
await sub.ReadyAsync().ConfigureAwait(false);
}
catch
Expand Down
Loading
Loading