Skip to content

Add Test #933

@mtmk

Description

@mtmk

#927 (comment)

using NATS.Client.Core;
using NATS.Client.JetStream;
using log4net;
using log4net.Appender;
using log4net.Config;
using log4net.Layout;
using NATS.Client.JetStream.Models;
using NATS.Net;
using System.Diagnostics;
using Microsoft.Extensions.Logging.Abstractions;

public class EphemeralConsumerTest
{
    private static readonly ILog Log = LogManager.GetLogger(typeof(EphemeralConsumerTest));
    private static INatsJSConsumer _consumer;
    private static INatsJSContext _context;
    private const string Url = "127.0.0.1:4222";
    private static CancellationTokenSource _cts = null;
    private static Task _task = null;

    private const string DestName = "messaging.destination.name";
    private const string OpKey = "messaging.operation";
    private const string OpPub = "publish";
    private const string ConsumerNextVerb = "$JS.API.CONSUMER.MSG.NEXT";
    private const string NatsActivitySource = "NATS.Net";
    // Connect to NATS server. Since connection is disposable at the end of our scope we should flush

    // our buffers and close connection cleanly.
    private static NatsOpts opts = NatsOpts.Default with
    {
        Url = Url,
        Name = "Spiral-503-Example",
        AuthOpts = new NatsAuthOpts() { Username = "admin", Password = "admin" },
        LoggerFactory = new NullLoggerFactory(),//new NatsNetLoggerFactory(LogLevel.Trace),
        RequestTimeout = TimeSpan.FromSeconds(15),
        PingInterval = TimeSpan.FromSeconds(1800),
        MaxPingOut = 100
    };

    private static NatsConnection _connection = null;

    static async Task Main(string[] args)
    {
        Activity.DefaultIdFormat = ActivityIdFormat.W3C;
        Activity.ForceDefaultIdFormat = true;
        
        var consumerMsgNextStatement = $"{ConsumerNextVerb}.{_consumer.Info.StreamName}.{_consumer.Info.Name}";

        ActivitySource.AddActivityListener(new ActivityListener()
        {
            ShouldListenTo = (source) => string.Equals(source.Name, NatsActivitySource, StringComparison.OrdinalIgnoreCase),
            Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllDataAndRecorded,
            ActivityStarted = activity => LogActivityStarted(activity, consumerMsgNextStatement, _consumer.Info.Name),
            ActivityStopped = _ => { },
        });

        ConfigureLog4Net();
       
        _task = Task.Run(RunMessageLoop);
        await _task;
    }

    private static void LogActivityStarted(Activity activity, string consumerMsgNextStatement, string consumerName)
    {
        if (activity.OperationName == $"$JS.API {OpPub}" && activity.Tags.Any(t => t is { Key: OpKey, Value: OpPub })
            && activity.Tags.Any(t => t.Key == DestName && t.Value == consumerMsgNextStatement))
        {
            var tag = activity.Tags.First(t => t.Key == DestName);
            Log.Info($"Issuing another pull request for the consumer, Consumer={consumerName}, Destination={tag.Value}");
        }
    }

    private async static Task RunMessageLoop()
    {
        _cts = new CancellationTokenSource(TimeSpan.FromMinutes(20));
        var opts = new NatsJSConsumeOpts
        {
            Expires = TimeSpan.FromSeconds(30),
            IdleHeartbeat = TimeSpan.FromSeconds(15),
            MaxMsgs = 1000
        };

        await foreach (var msg in _consumer.ConsumeAsync<string>(opts: opts, cancellationToken: _cts.Token))
        {
            await Task.Delay(TimeSpan.FromSeconds(10));//emulate processing
            await msg.AckAsync();
            Console.WriteLine($"received msg on {msg.Subject} with data {msg.Data}");
        }
    }

    static EphemeralConsumerTest()
    {
        _connection = new NatsConnection(opts);
        _context = _connection.CreateJetStreamContext();
        _consumer = CreateConsumer().GetAwaiter().GetResult();
    }

    private static async Task<INatsJSConsumer> CreateConsumer()
    {
        var consumerCfg = new ConsumerConfig()
        {
            FilterSubjects = ["TS.TEST.Incoming", "TS.TEST.Incoming.>"],
            MemStorage = true,
            InactiveThreshold = TimeSpan.FromSeconds(600),
            Name = "Ephemeral1",
            DurableName = null,
            DeliverPolicy = ConsumerConfigDeliverPolicy.New,
            MaxWaiting = 1000,
            AckWait = TimeSpan.FromSeconds(30),
            MaxAckPending = 1000,
            MaxDeliver = 20
        };

        var _consumer = await _context.CreateOrUpdateConsumerAsync("TS-TEST", consumerCfg);

        return _consumer;
    }

    static void ConfigureLog4Net()
    {
        var layout = new PatternLayout("[%date] %-5level - %message%newline");

        var consoleAppender = new ConsoleAppender
        {
            Layout = layout,
            Threshold = log4net.Core.Level.Info
        };
        layout.ActivateOptions();
        consoleAppender.ActivateOptions();

        var repository = LogManager.GetRepository(System.Reflection.Assembly.GetEntryAssembly());
        BasicConfigurator.Configure(repository, consoleAppender);
    }
}
nats stream add 'TS-TEST' `
 --storage=file `
 --compression=none `
 --replicas 1 `
 --retention=limits `
 --discard=old `
 --max-age=2d `
 --max-bytes=3GB `
 --max-msg-size=32MB `
 --max-msgs=20000 `
 --subjects="TS.TEST.Incoming.>,TS.TEST.Incoming" `
 --defaults

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions