-
Notifications
You must be signed in to change notification settings - Fork 84
Open
Description
using NATS.Client.Core;
using NATS.Client.JetStream;
using log4net;
using log4net.Appender;
using log4net.Config;
using log4net.Layout;
using NATS.Client.JetStream.Models;
using NATS.Net;
using System.Diagnostics;
using Microsoft.Extensions.Logging.Abstractions;
public class EphemeralConsumerTest
{
// log4net logger used by the activity listener callback (see LogActivityStarted).
private static readonly ILog Log = LogManager.GetLogger(typeof(EphemeralConsumerTest));
// JetStream consumer and context; both are assigned once in the static constructor.
private static INatsJSConsumer _consumer;
private static INatsJSContext _context;
// Address of the NATS server this repro connects to.
private const string Url = "127.0.0.1:4222";
// Cancellation source and task for the consume loop; assigned in RunMessageLoop and Main respectively.
private static CancellationTokenSource _cts = null;
private static Task _task = null;
// Activity tag keys/values that LogActivityStarted compares against.
private const string DestName = "messaging.destination.name";
private const string OpKey = "messaging.operation";
private const string OpPub = "publish";
// Subject prefix; Main appends ".<stream>.<consumer>" to build the destination it watches for.
private const string ConsumerNextVerb = "$JS.API.CONSUMER.MSG.NEXT";
// ActivitySource name the listener in Main subscribes to (compared case-insensitively).
private const string NatsActivitySource = "NATS.Net";
// Options used to build the NATS connection in the static constructor.
// NOTE(review): the connection is disposable; whoever owns the process lifetime should dispose it
// so that buffers are flushed and the connection is closed cleanly.
// NOTE(review): the credentials below are hard-coded — acceptable for a local repro only.
private static NatsOpts opts = NatsOpts.Default with
{
Url = Url,
Name = "Spiral-503-Example",
AuthOpts = new NatsAuthOpts() { Username = "admin", Password = "admin" },
LoggerFactory = new NullLoggerFactory(),//new NatsNetLoggerFactory(LogLevel.Trace),
RequestTimeout = TimeSpan.FromSeconds(15),
PingInterval = TimeSpan.FromSeconds(1800),
MaxPingOut = 100
};
// Assigned in the static constructor from the options above.
private static NatsConnection _connection = null;
/// <summary>
/// Entry point: configures logging, registers an activity listener for the NATS client's
/// activity source, then runs the consume loop until it finishes.
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
static async Task Main(string[] args)
{
    Activity.DefaultIdFormat = ActivityIdFormat.W3C;
    Activity.ForceDefaultIdFormat = true;

    // Configure logging before the listener is registered, because the listener callback logs.
    ConfigureLog4Net();

    // _consumer is assigned by the static constructor, which has already run by the time Main executes.
    var consumerMsgNextStatement = $"{ConsumerNextVerb}.{_consumer.Info.StreamName}.{_consumer.Info.Name}";

    // Keep a reference so the listener is unregistered (disposed) when Main exits.
    using var listener = new ActivityListener
    {
        ShouldListenTo = (source) => string.Equals(source.Name, NatsActivitySource, StringComparison.OrdinalIgnoreCase),
        Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllDataAndRecorded,
        ActivityStarted = activity => LogActivityStarted(activity, consumerMsgNextStatement, _consumer.Info.Name),
        ActivityStopped = _ => { },
    };
    ActivitySource.AddActivityListener(listener);

    try
    {
        _task = Task.Run(RunMessageLoop);
        await _task;
    }
    finally
    {
        // Dispose the connection even when the loop faults, so the connection is closed cleanly.
        await _connection.DisposeAsync();
    }
}
/// <summary>
/// Logs an info line when a started activity is a "$JS.API publish" operation whose
/// destination tag equals the consumer's next-message subject.
/// </summary>
/// <param name="activity">The activity that has just started.</param>
/// <param name="consumerMsgNextStatement">Destination value that identifies a pull request for this consumer.</param>
/// <param name="consumerName">Consumer name written to the log line.</param>
private static void LogActivityStarted(Activity activity, string consumerMsgNextStatement, string consumerName)
{
    if (activity.OperationName != $"$JS.API {OpPub}")
    {
        return;
    }

    var tags = activity.Tags;

    var isPublishOperation = tags.Any(entry => entry.Key == OpKey && entry.Value == OpPub);
    if (!isPublishOperation)
    {
        return;
    }

    var targetsNextRequest = tags.Any(entry => entry.Key == DestName && entry.Value == consumerMsgNextStatement);
    if (!targetsNextRequest)
    {
        return;
    }

    // The logged value is the first destination tag, as in the matching checks above.
    var destination = tags.First(entry => entry.Key == DestName).Value;
    Log.Info($"Issuing another pull request for the consumer, Consumer={consumerName}, Destination={destination}");
}
/// <summary>
/// Consumes messages from the consumer for up to 20 minutes, simulating 10 seconds of work
/// per message before acknowledging it.
/// </summary>
/// <returns>A task that completes when the consume loop ends.</returns>
private static async Task RunMessageLoop()
{
    // Named consumeOpts so it does not shadow the static connection options field "opts".
    var consumeOpts = new NatsJSConsumeOpts
    {
        Expires = TimeSpan.FromSeconds(30),
        IdleHeartbeat = TimeSpan.FromSeconds(15),
        MaxMsgs = 1000
    };

    _cts = new CancellationTokenSource(TimeSpan.FromMinutes(20));
    try
    {
        await foreach (var msg in _consumer.ConsumeAsync<string>(opts: consumeOpts, cancellationToken: _cts.Token))
        {
            // Emulate processing. The token is deliberately not passed here, so a message that
            // has already been received is still acknowledged after the timeout fires.
            await Task.Delay(TimeSpan.FromSeconds(10));
            await msg.AckAsync();
            Console.WriteLine($"received msg on {msg.Subject} with data {msg.Data}");
        }
    }
    finally
    {
        // A timeout-based CancellationTokenSource owns a timer; release it once the loop ends.
        _cts.Dispose();
        _cts = null;
    }
}
// Type initializer: builds the connection, the JetStream context and the consumer, in that order
// (CreateConsumer reads _context, so the context must be assigned first).
static EphemeralConsumerTest()
{
    var connection = new NatsConnection(opts);
    _connection = connection;
    _context = connection.CreateJetStreamContext();

    // Blocks synchronously until the consumer has been created.
    var pendingConsumer = CreateConsumer();
    _consumer = pendingConsumer.GetAwaiter().GetResult();
}
/// <summary>
/// Creates (or updates) the consumer named "Ephemeral1" on stream "TS-TEST" using the
/// configuration below.
/// </summary>
/// <returns>The consumer returned by the JetStream context.</returns>
private static async Task<INatsJSConsumer> CreateConsumer()
{
    ConsumerConfig ephemeralConfig = new()
    {
        FilterSubjects = ["TS.TEST.Incoming", "TS.TEST.Incoming.>"],
        MemStorage = true,
        InactiveThreshold = TimeSpan.FromSeconds(600),
        Name = "Ephemeral1",
        DurableName = null,
        DeliverPolicy = ConsumerConfigDeliverPolicy.New,
        MaxWaiting = 1000,
        AckWait = TimeSpan.FromSeconds(30),
        MaxAckPending = 1000,
        MaxDeliver = 20
    };

    return await _context.CreateOrUpdateConsumerAsync("TS-TEST", ephemeralConfig);
}
// Sets up log4net programmatically: a single console appender at Info threshold,
// attached to the entry assembly's logger repository.
static void ConfigureLog4Net()
{
    var pattern = new PatternLayout("[%date] %-5level - %message%newline");
    pattern.ActivateOptions();

    var console = new ConsoleAppender();
    console.Layout = pattern;
    console.Threshold = log4net.Core.Level.Info;
    console.ActivateOptions();

    var entryAssembly = System.Reflection.Assembly.GetEntryAssembly();
    BasicConfigurator.Configure(LogManager.GetRepository(entryAssembly), console);
}
}
nats stream add 'TS-TEST' `
--storage=file `
--compression=none `
--replicas 1 `
--retention=limits `
--discard=old `
--max-age=2d `
--max-bytes=3GB `
--max-msg-size=32MB `
--max-msgs=20000 `
--subjects="TS.TEST.Incoming.>,TS.TEST.Incoming" `
--defaults
Metadata
Metadata
Assignees
Labels
No labels