Skip to content

Commit 54365e3

Browse files
authored
Change the cancellation detection strategy (#203)
* Change the cancellation detection strategy * Detect if task is cancelled due to background service shutdown * Bump version
1 parent a8609b9 commit 54365e3

File tree

9 files changed

+94
-65
lines changed

9 files changed

+94
-65
lines changed

src/ConductorSharp.Client/ConductorSharp.Client.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<Authors>Codaxy</Authors>
77
<Company>Codaxy</Company>
88
<PackageId>ConductorSharp.Client</PackageId>
9-
<Version>3.4.0</Version>
9+
<Version>3.4.1</Version>
1010
<Description>Client library for Netflix Conductor, with some additional quality of life features.</Description>
1111
<RepositoryUrl>https://github.com/codaxy/conductor-sharp</RepositoryUrl>
1212
<PackageTags>netflix;conductor</PackageTags>

src/ConductorSharp.Engine/Behaviors/RequestResponseLoggingBehavior.cs

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,20 @@
1111

1212
namespace ConductorSharp.Engine.Behaviors
1313
{
14+
// TODO: Consider removing this
1415
public class RequestResponseLoggingBehavior<TRequest, TResponse> : IPipelineBehavior<TRequest, TResponse>
1516
where TRequest : IRequest<TResponse>
1617
{
1718
private readonly ILogger<RequestResponseLoggingBehavior<TRequest, TResponse>> _logger;
19+
private readonly ConductorSharpExecutionContext _context;
1820

19-
public RequestResponseLoggingBehavior(ILogger<RequestResponseLoggingBehavior<TRequest, TResponse>> logger)
21+
public RequestResponseLoggingBehavior(
22+
ILogger<RequestResponseLoggingBehavior<TRequest, TResponse>> logger,
23+
ConductorSharpExecutionContext context
24+
)
2025
{
2126
_logger = logger;
27+
_context = context;
2228
}
2329

2430
public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TResponse> next, CancellationToken cancellationToken)
@@ -52,14 +58,9 @@ public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TRe
5258

5359
return response;
5460
}
55-
catch (TaskCanceledException)
61+
catch (OperationCanceledException) when (_context.TaskId != null)
5662
{
57-
_logger.LogWarning(
58-
$"Request {{Request}} cancelled with payload {{@{requestName}}} and with id {{RequestId}}",
59-
requestName,
60-
request,
61-
requestId
62-
);
63+
// Simply rethrow and do not log in order for cancellation notifier to work
6364
throw;
6465
}
6566
catch (Exception exc)

src/ConductorSharp.Engine/ConductorSharp.Engine.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<Authors>Codaxy</Authors>
77
<Company>Codaxy</Company>
88
<PackageId>ConductorSharp.Engine</PackageId>
9-
<Version>3.4.0</Version>
9+
<Version>3.4.1</Version>
1010
<Description>Client library for Netflix Conductor, with some additional quality of life features.</Description>
1111
<RepositoryUrl>https://github.com/codaxy/conductor-sharp</RepositoryUrl>
1212
<PackageTags>netflix;conductor</PackageTags>

src/ConductorSharp.Engine/ExecutionManager.cs

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -140,21 +140,7 @@ private async Task PollAndHandle(TaskToWorker scheduledWorker, CancellationToken
140140
return;
141141
}
142142

143-
try
144-
{
145-
using var tokenHolder = _cancellationNotifier.GetCancellationToken(pollResponse.TaskId, cancellationToken);
146-
await ProcessPolledTask(pollResponse, workerId, scheduledWorker, tokenHolder.CancellationToken);
147-
}
148-
catch (TaskCanceledException)
149-
{
150-
_logger.LogWarning(
151-
"Polled task {Task}(id={TaskId}) of workflow {Workflow}(id={WorkflowId}) is cancelled",
152-
pollResponse.TaskDefName,
153-
pollResponse.TaskId,
154-
pollResponse.WorkflowType,
155-
pollResponse.WorkflowInstanceId
156-
);
157-
}
143+
await ProcessPolledTask(pollResponse, workerId, scheduledWorker, cancellationToken);
158144
}
159145

160146
private async Task ProcessPolledTask(
@@ -164,6 +150,8 @@ private async Task ProcessPolledTask(
164150
CancellationToken cancellationToken
165151
)
166152
{
153+
using var tokenHolder = _cancellationNotifier.GetCancellationToken(pollResponse.TaskId, cancellationToken);
154+
167155
try
168156
{
169157
if (!string.IsNullOrEmpty(pollResponse.ExternalInputPayloadStoragePath))
@@ -178,7 +166,7 @@ CancellationToken cancellationToken
178166
);
179167

180168
// TODO: iffy
181-
var file = await _externalPayloadService.GetExternalStorageDataAsync(externalStorageLocation.Path, cancellationToken);
169+
var file = await _externalPayloadService.GetExternalStorageDataAsync(externalStorageLocation.Path, tokenHolder.CancellationToken);
182170

183171
using TextReader textReader = new StreamReader(file.Stream);
184172
var json = await textReader.ReadToEndAsync();
@@ -211,7 +199,7 @@ CancellationToken cancellationToken
211199
context.WorkerId = workerId;
212200
}
213201

214-
var response = await mediator.Send(inputData, cancellationToken);
202+
var response = await mediator.Send(inputData, tokenHolder.CancellationToken);
215203

216204
await _taskManager.UpdateAsync(
217205
new TaskResult
@@ -221,13 +209,28 @@ await _taskManager.UpdateAsync(
221209
OutputData = SerializationHelper.ObjectToDictionary(response, ConductorConstants.IoJsonSerializerSettings),
222210
WorkflowInstanceId = pollResponse.WorkflowInstanceId
223211
},
224-
cancellationToken
212+
tokenHolder.CancellationToken
225213
);
226214
}
227-
catch (TaskCanceledException)
215+
catch (OperationCanceledException) when (tokenHolder.IsCancellationRequestedByNotifier)
228216
{
229-
// Propagate this exception to outer handler
230-
throw;
217+
_logger.LogWarning(
218+
"Polled task {Task}(id={TaskId}) of workflow {Workflow}(id={WorkflowId}) is cancelled",
219+
pollResponse.TaskDefName,
220+
pollResponse.TaskId,
221+
pollResponse.WorkflowType,
222+
pollResponse.WorkflowInstanceId
223+
);
224+
}
225+
catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested) // This is fine since we know cancellationToken comes from background service
226+
{
227+
_logger.LogWarning(
228+
"Cancelling task {Task}(id={TaskId}) of workflow {Workflow}(id={WorkflowId}) due to background service shutdown",
229+
pollResponse.TaskDefName,
230+
pollResponse.TaskId,
231+
pollResponse.WorkflowType,
232+
pollResponse.WorkflowInstanceId
233+
);
231234
}
232235
catch (Exception exception)
233236
{
@@ -256,10 +259,10 @@ await Task.WhenAll(
256259
OutputData = SerializationHelper.ObjectToDictionary(errorMessage, ConductorConstants.IoJsonSerializerSettings),
257260
WorkflowInstanceId = pollResponse?.WorkflowInstanceId
258261
},
259-
cancellationToken
262+
tokenHolder.CancellationToken
260263
),
261-
_taskManager.LogAsync(pollResponse.TaskId, exception.Message, cancellationToken),
262-
_taskManager.LogAsync(pollResponse.TaskId, exception.StackTrace, cancellationToken)
264+
_taskManager.LogAsync(pollResponse.TaskId, exception.Message, tokenHolder.CancellationToken),
265+
_taskManager.LogAsync(pollResponse.TaskId, exception.StackTrace, tokenHolder.CancellationToken)
263266
]
264267
);
265268
}

src/ConductorSharp.Engine/Interface/ICancellationNotifier.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ public interface ICancellationNotifier
88
public interface ICancellationTokenHolder : IDisposable
99
{
1010
CancellationToken CancellationToken { get; }
11+
bool IsCancellationRequestedByNotifier { get; }
1112
}
1213

1314
ICancellationTokenHolder GetCancellationToken(string taskId, CancellationToken engineCancellationToken);

src/ConductorSharp.Engine/Service/NoOpCancellationNotifier.cs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ internal class PassthroughCancellationTokenHolder(CancellationToken cancellation
1010
{
1111
public CancellationToken CancellationToken { get; } = cancellationToken;
1212

13+
public bool IsCancellationRequestedByNotifier => false;
14+
1315
public void Dispose() { }
1416
}
1517

src/ConductorSharp.KafkaCancellationNotifier/ConductorSharp.KafkaCancellationNotifier.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<TargetFramework>net6.0</TargetFramework>
55
<ImplicitUsings>enable</ImplicitUsings>
66
<Nullable>enable</Nullable>
7-
<Version>3.4.0</Version>
7+
<Version>3.4.1</Version>
88
<Authors>Codaxy</Authors>
99
<Company>Codaxy</Company>
1010
</PropertyGroup>

src/ConductorSharp.KafkaCancellationNotifier/Service/KafkaCancellationNotifier.cs

Lines changed: 51 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ internal class CancellationTokenSourceHolder : ICancellationNotifier.ICancellati
1616

1717
public CancellationToken CancellationToken { get; }
1818

19+
public bool IsCancellationRequestedByNotifier => _notifier.IsCancellationRequested(_taskId);
20+
1921
public CancellationTokenSourceHolder(CancellationToken cancellationToken, string taskId, KafkaCancellationNotifier notifier)
2022
{
2123
CancellationToken = cancellationToken;
@@ -26,10 +28,21 @@ public CancellationTokenSourceHolder(CancellationToken cancellationToken, string
2628
public void Dispose() => _notifier.ClearTaskCts(_taskId);
2729
}
2830

31+
private class TaskCancellationInfo
32+
{
33+
public TaskCancellationInfo(CancellationTokenSource cancellationTokenSource)
34+
{
35+
CancellationTokenSource = cancellationTokenSource;
36+
}
37+
38+
public CancellationTokenSource CancellationTokenSource { get; }
39+
public bool IsCancellationRequested { get; set; }
40+
}
41+
2942
private readonly HashSet<string> _tasks;
3043
private readonly object _lock = new();
3144
private readonly ILogger<KafkaCancellationNotifier> _logger;
32-
private readonly Dictionary<string, CancellationTokenSource> _taskIdToCtsMap = new();
45+
private readonly Dictionary<string, TaskCancellationInfo> _taskIdToInfoMap = new();
3346

3447
public KafkaCancellationNotifier(IEnumerable<TaskToWorker> tasks, ILogger<KafkaCancellationNotifier> logger)
3548
{
@@ -39,8 +52,8 @@ public KafkaCancellationNotifier(IEnumerable<TaskToWorker> tasks, ILogger<KafkaC
3952

4053
public ICancellationNotifier.ICancellationTokenHolder GetCancellationToken(string taskId, CancellationToken engineCancellationToken)
4154
{
42-
var cts = CreateCts(taskId, engineCancellationToken);
43-
return new CancellationTokenSourceHolder(cts.Token, taskId, this);
55+
var token = CreateTaskCancellationInfoAndGetToken(taskId, engineCancellationToken);
56+
return new CancellationTokenSourceHolder(token, taskId, this);
4457
}
4558

4659
public void HandleKafkaEvent(TaskStatusModel taskStatusModel)
@@ -52,56 +65,65 @@ public void HandleKafkaEvent(TaskStatusModel taskStatusModel)
5265
)
5366
return;
5467

55-
var cts = GetCts(taskStatusModel.TaskId);
56-
if (cts is null)
68+
TryToCancelTask(taskStatusModel);
69+
}
70+
71+
private CancellationToken CreateTaskCancellationInfoAndGetToken(string taskId, CancellationToken engineCancellationToken = default)
72+
{
73+
CancellationToken token;
74+
75+
lock (_lock)
5776
{
58-
_logger.LogWarning(
59-
"Unable to cancel task {TaskId} of workflow {WorkflowId}",
60-
taskStatusModel.TaskId,
61-
taskStatusModel.WorkflowInstanceId
77+
var info = _taskIdToInfoMap[taskId] = new TaskCancellationInfo(
78+
CancellationTokenSource.CreateLinkedTokenSource(engineCancellationToken)
6279
);
63-
return;
80+
token = info.CancellationTokenSource.Token;
6481
}
6582

66-
cts.Cancel();
83+
return token;
6784
}
6885

69-
private CancellationTokenSource CreateCts(string taskId, CancellationToken engineCancellationToken = default)
86+
private void TryToCancelTask(TaskStatusModel taskStatusModel)
7087
{
71-
CancellationTokenSource cts;
72-
var stopwatch = Stopwatch.StartNew();
88+
TaskCancellationInfo? info;
7389

7490
lock (_lock)
7591
{
76-
cts = _taskIdToCtsMap[taskId] = CancellationTokenSource.CreateLinkedTokenSource(engineCancellationToken);
92+
info = _taskIdToInfoMap.GetValueOrDefault(taskStatusModel.TaskId);
7793
}
78-
_logger.LogDebug("CancellationTokenSource creation time {ElapsedMs}ms", stopwatch.ElapsedMilliseconds);
7994

80-
return cts;
95+
if (info is null)
96+
{
97+
_logger.LogWarning(
98+
"Unable to cancel task {TaskId} of workflow {WorkflowId}",
99+
taskStatusModel.TaskId,
100+
taskStatusModel.WorkflowInstanceId
101+
);
102+
return;
103+
}
104+
105+
lock (_lock)
106+
{
107+
info.IsCancellationRequested = true;
108+
info.CancellationTokenSource.Cancel();
109+
}
81110
}
82111

83-
private CancellationTokenSource? GetCts(string taskId)
112+
private void ClearTaskCts(string taskId)
84113
{
85-
CancellationTokenSource? cts;
86-
var stopwatch = Stopwatch.StartNew();
87-
88114
lock (_lock)
89115
{
90-
cts = _taskIdToCtsMap.GetValueOrDefault(taskId);
116+
_taskIdToInfoMap[taskId].CancellationTokenSource.Dispose();
117+
_taskIdToInfoMap.Remove(taskId);
91118
}
92-
_logger.LogDebug("CancellationTokenSource get time {ElapsedMs}ms", stopwatch.ElapsedMilliseconds);
93-
94-
return cts;
95119
}
96120

97-
private void ClearTaskCts(string taskId)
121+
private bool IsCancellationRequested(string taskId)
98122
{
99-
var stopwatch = Stopwatch.StartNew();
100123
lock (_lock)
101124
{
102-
_taskIdToCtsMap.Remove(taskId);
125+
return _taskIdToInfoMap[taskId].IsCancellationRequested;
103126
}
104-
_logger.LogDebug("CancellationTokenSource removal time {ElapsedMs}ms", stopwatch.ElapsedMilliseconds);
105127
}
106128
}
107129
}

src/ConductorSharp.Patterns/ConductorSharp.Patterns.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<GeneratePackageOnBuild>False</GeneratePackageOnBuild>
88
<Authors>Codaxy</Authors>
99
<Company>Codaxy</Company>
10-
<Version>3.4.0</Version>
10+
<Version>3.4.1</Version>
1111
</PropertyGroup>
1212

1313
<ItemGroup>

0 commit comments

Comments
 (0)