Skip to content

Commit f077529

Browse files
bschwehnBenjamin Schwehn
andauthored
SubscribeFilter Include MediumMessage in ConsumerContext (#1464)
By including the MediumMessage, we give consumers more options for handling in the subscribe filter, for example to implement rules based on the number of retries already done, or to set disable retrying a message under some circumstances. Co-authored-by: Benjamin Schwehn <[email protected]>
1 parent fa7d77a commit f077529

File tree

4 files changed

+17
-7
lines changed

4 files changed

+17
-7
lines changed

src/DotNetCore.CAP/Internal/ConsumerContext.cs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
using System;
55
using DotNetCore.CAP.Messages;
6+
using DotNetCore.CAP.Persistence;
67

78
namespace DotNetCore.CAP.Internal;
89

@@ -12,7 +13,7 @@ namespace DotNetCore.CAP.Internal;
1213
public class ConsumerContext
1314
{
1415
public ConsumerContext(ConsumerContext context)
15-
: this(context.ConsumerDescriptor, context.DeliverMessage)
16+
: this(context.ConsumerDescriptor, context.MediumMessage)
1617
{
1718
}
1819

@@ -21,10 +22,10 @@ public ConsumerContext(ConsumerContext context)
2122
/// </summary>
2223
/// <param name="descriptor">consumer method descriptor. </param>
2324
/// <param name="message"> received message.</param>
24-
public ConsumerContext(ConsumerExecutorDescriptor descriptor, Message message)
25+
public ConsumerContext(ConsumerExecutorDescriptor descriptor, MediumMessage message)
2526
{
2627
ConsumerDescriptor = descriptor ?? throw new ArgumentNullException(nameof(descriptor));
27-
DeliverMessage = message ?? throw new ArgumentNullException(nameof(message));
28+
MediumMessage = message ?? throw new ArgumentNullException(nameof(message));
2829
}
2930

3031
/// <summary>
@@ -35,5 +36,10 @@ public ConsumerContext(ConsumerExecutorDescriptor descriptor, Message message)
3536
/// <summary>
3637
/// consumer received message.
3738
/// </summary>
38-
public Message DeliverMessage { get; }
39+
public Message DeliverMessage => MediumMessage.Origin;
40+
41+
/// <summary>
42+
/// consumer received medium message.
43+
/// </summary>
44+
public MediumMessage MediumMessage { get; }
3945
}

src/DotNetCore.CAP/Internal/ISubscribeExector.Default.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ private bool UpdateMessageForRetry(MediumMessage message)
175175
private async Task InvokeConsumerMethodAsync(MediumMessage message, ConsumerExecutorDescriptor descriptor,
176176
CancellationToken cancellationToken)
177177
{
178-
var consumerContext = new ConsumerContext(descriptor, message.Origin);
178+
var consumerContext = new ConsumerContext(descriptor, message);
179179
var tracingTimestamp = TracingBefore(message.Origin, descriptor.MethodInfo);
180180
try
181181
{

test/DotNetCore.CAP.Test/SubscribeInvokerTest.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System.Threading.Tasks;
55
using DotNetCore.CAP.Internal;
66
using DotNetCore.CAP.Messages;
7+
using DotNetCore.CAP.Persistence;
78
using DotNetCore.CAP.Serialization;
89
using Microsoft.Extensions.DependencyInjection;
910
using Xunit;
@@ -45,7 +46,8 @@ public async Task InvokeTest()
4546
[Headers.MessageName] = "fake.output.integer"
4647
};
4748
var message = new Message(header, null);
48-
var context = new ConsumerContext(descriptor, message);
49+
var mediumMessage = new MediumMessage() { Origin = message };
50+
var context = new ConsumerContext(descriptor, mediumMessage);
4951

5052
var ret = await SubscribeInvoker.InvokeAsync(context);
5153
Assert.Equal(int.MaxValue, ret.Result);

test/DotNetCore.CAP.Test/SubscribeInvokerWithCancellation.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.Threading.Tasks;
66
using DotNetCore.CAP.Internal;
77
using DotNetCore.CAP.Messages;
8+
using DotNetCore.CAP.Persistence;
89
using DotNetCore.CAP.Serialization;
910
using Microsoft.Extensions.DependencyInjection;
1011
using Xunit;
@@ -55,7 +56,8 @@ public async Task InvokeTest()
5556
[Headers.MessageName] = "fake.output.withcancellation"
5657
};
5758
var message = new Message(header, null);
58-
var context = new ConsumerContext(descriptor, message);
59+
var mediumMessage = new MediumMessage() { Origin = message };
60+
var context = new ConsumerContext(descriptor, mediumMessage);
5961

6062
var cancellationToken = new CancellationToken();
6163
var ret = await SubscribeInvoker.InvokeAsync(context, cancellationToken);

0 commit comments

Comments
 (0)