Skip to content

Commit cd9e1bd

Browse files
committed
Optimize workers
1 parent da36584 commit cd9e1bd

File tree

7 files changed

+161
-31
lines changed

7 files changed

+161
-31
lines changed

JobScheduler.Benchmarks/Benchmark.cs

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ public long End(int jobs, string type)
126126

127127
public class Benchmark
128128
{
129-
private const int jobCount = 20000;
129+
private const int jobCount = 2000;
130130
private const int loopCount = 1000;
131131
private static JobScheduler jobScheduler = new();
132132

@@ -208,14 +208,16 @@ private static long BenchVector(bool useVector)
208208
{
209209
var timer = new JobTimer();
210210
var data = new VectorCalculationJob { a = new float[jobCount], b = new float[jobCount], result = new float[jobCount], Repetitions = 500 };
211+
var parentJob = jobScheduler.Schedule();
211212
for (var sindex = 0; sindex < loopCount; sindex++)
212213
{
213214
var job = new ParallelJobProducer<VectorCalculationJob>(0, jobCount, data, jobScheduler, 16, !useVector);
214215
job.CheckAndSplit();
216+
job.GetHandle().SetParent(parentJob);
215217
jobScheduler.Flush(job.GetHandle());
216-
jobScheduler.Wait(job.GetHandle());
217218
}
218-
219+
jobScheduler.Flush(parentJob);
220+
jobScheduler.Wait(parentJob);
219221
return timer.End(jobCount * loopCount, $"Use vector: {useVector}");
220222
}
221223

@@ -234,7 +236,7 @@ private static void Main(string[] args)
234236
// config = config.WithOptions(ConfigOptions.DisableOptimizationsValidator);
235237
// BenchmarkRunner.Run<JobSchedulerBenchmark>(config);
236238
// return;
237-
for (var i = 0;; i++)
239+
for (var i = 0; i < 20; i++)
238240
{
239241
// CorrectnessTestJob();
240242
// BenchB();
@@ -244,6 +246,8 @@ private static void Main(string[] args)
244246
var nonVectorized = BenchVector(false);
245247
Console.WriteLine($"Ratio {(double)nonVectorized / vectorized}");
246248
}
249+
jobScheduler.Dispose();
250+
247251
//using var jobScheduler = new JobScheduler();
248252

249253
// Spawn massive jobs and wait for finish

JobScheduler/JobScheduler.cs

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -102,14 +102,9 @@ public float CalculateThreadUsage()
102102

103103
private void FlushDirect(JobHandle job)
104104
{
105-
// Round Robin,
106-
var workerIndex = NextWorkerIndex;
107-
while (!Workers[workerIndex].Enqueue(job))
105+
while (!Worker.Enqueue(job))
108106
{
109-
NextWorkerIndex = (NextWorkerIndex + 1) % Workers.Count;
110107
}
111-
112-
NextWorkerIndex = (NextWorkerIndex + 1) % Workers.Count;
113108
}
114109

115110
/// <summary>
@@ -143,24 +138,28 @@ public void Wait(JobHandle job)
143138
{
144139
while (!job.IsFinished())
145140
{
146-
var jobWasFound = false;
147-
for (var i = 0; i < Workers.Count; i++)
141+
var nextJob = Worker.TryStealJobExternal(out var stolenJob);
142+
if (!nextJob)
148143
{
149-
var nextJob = Workers[i].Queue.TrySteal(out var stolenJob);
150-
if (!nextJob)
144+
for (var i = 0; i < Workers.Count; i++)
151145
{
152-
continue;
146+
if (nextJob)
147+
{
148+
break;
149+
}
150+
nextJob = Workers[i].Queue.TrySteal(out stolenJob);
153151
}
152+
}
154153

155-
jobWasFound = true;
154+
if (nextJob)
155+
{
156156
stolenJob.Job?.Execute();
157157
Finish(stolenJob);
158158
}
159159

160160
// Dont yield if you can find something to execute.
161-
if (!jobWasFound)
161+
if (!nextJob)
162162
{
163-
// Console.WriteLine($"Cpu usage: {CalculateThreadUsages()}");
164163
Thread.Yield();
165164
}
166165
}

JobScheduler/Utils/BulkQueue.cs

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
using System.Collections.Concurrent;
2+
3+
namespace Schedulers.Utils;
4+
5+
/// <summary>
6+
/// Thread-safe bulk queue implementation.
7+
/// Provides around 10-20% performance in jobs compared to ordinary flush.
8+
/// </summary>
9+
public class BulkQueue<T>
10+
{
11+
public BulkQueue(int segmentCount, int segmentSize)
12+
{
13+
fullSegments = [];
14+
emptySegments = [];
15+
for (var i = 0; i < segmentCount; i++)
16+
{
17+
emptySegments.Add(new(segmentSize));
18+
}
19+
}
20+
21+
public struct Segment
22+
{
23+
private T[] items;
24+
private int index;
25+
26+
public Segment(int capacity)
27+
{
28+
items = new T[capacity];
29+
}
30+
31+
public bool Dequeue(out T item)
32+
{
33+
if (index == 0)
34+
{
35+
item = default!;
36+
return false;
37+
}
38+
39+
index--;
40+
item = items[index];
41+
items[index] = default!; // Clear the reference
42+
return true;
43+
}
44+
45+
public bool Enqueue(T item)
46+
{
47+
if (index >= items.Length)
48+
{
49+
return false; // Segment is full
50+
}
51+
52+
items[index] = item;
53+
index++;
54+
return true;
55+
}
56+
}
57+
58+
private ConcurrentBag<Segment> emptySegments = new();
59+
private ConcurrentBag<Segment> fullSegments = new();
60+
61+
public bool GetEmtySegment(out Segment segment)
62+
{
63+
return emptySegments.TryTake(out segment);
64+
}
65+
66+
public bool GetFullSegment(out Segment segment)
67+
{
68+
return fullSegments.TryTake(out segment);
69+
}
70+
71+
public void Return(Segment segment)
72+
{
73+
emptySegments.Add(segment);
74+
}
75+
76+
public void Enqueue(Segment segment)
77+
{
78+
fullSegments.Add(segment);
79+
}
80+
}

JobScheduler/Utils/IParallelJobProducer.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@ private ParallelJobProducer(T producer, JobScheduler scheduler, JobHandle parent
7575
_loopSize = loopSize;
7676
_onlySingle = onlySingle;
7777
_selfHandle = _scheduler.Schedule(this, parent);
78-
_scheduler.Flush(_selfHandle);
7978
}
8079

8180
/// <summary>
@@ -141,6 +140,10 @@ public bool CheckAndSplit()
141140
private void Split()
142141
{
143142
var childrenToSplitInto = CalculateChildrenToSplitInto();
143+
BulkQueue<JobHandle>.Segment segment;
144+
while (!Worker.GetEmptySegment(out segment))
145+
{
146+
}
144147
_selfHandle.Job = null;
145148
for (var i = 0; i < childrenToSplitInto; i++)
146149
{
@@ -151,8 +154,11 @@ private void Split()
151154
throw new($"Invalid range from {start} to {end}");
152155
}
153156

154-
_ = new ParallelJobProducer<T>(_producer, _scheduler, _selfHandle, start, end, _loopSize, _onlySingle);
157+
var pjob = new ParallelJobProducer<T>(_producer, _scheduler, _selfHandle, start, end, _loopSize, _onlySingle);
158+
segment.Enqueue(pjob._selfHandle);
159+
pjob._selfHandle.UnfinishedJobs--;
155160
}
161+
Worker.EnqueueFullSegment(segment);
156162
}
157163

158164
/// <summary>

JobScheduler/Utils/JobHandlePool.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ namespace Schedulers.Utils;
66
/// This <see cref="JobHandlePool"/> class
77
/// acts as a pool for <see cref="JobHandle"/> ids.
88
/// </summary>
9-
// This class somehow provides a boost of around 4.4m to 6.1m in the benchmark.
109
internal class JobHandlePool
1110
{
1211
// Handles are returned to the _freeHandles queue.
@@ -36,7 +35,7 @@ private void SwapQueues()
3635
/// <param name="size">Its maximum size.</param>
3736
public JobHandlePool(int size)
3837
{
39-
MainThreadHandleCutoff = (ushort)(size / 2);
38+
MainThreadHandleCutoff = (ushort)(size * 0.5f); // 80% of the handles are for the main thread.
4039
_mainThreadFreeHandles = new();
4140
_freeHandlesOtherThreads = new();
4241
_handleCache = new(MainThreadHandleCutoff);

JobScheduler/Utils/UnorderedQueue.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ namespace Schedulers.Utils;
55
public class UnorderedThreadSafeQueue<T>
66
{
77
private ConcurrentQueue<T> _queue = new();
8-
private const int maxItems = 512;
8+
private const int maxItems = 512 * 100;
99

1010
public UnorderedThreadSafeQueue()
1111
{

JobScheduler/Worker.cs

Lines changed: 50 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
using Schedulers.Utils;
1+
using System.Runtime.InteropServices;
2+
using Schedulers.Utils;
23

34
namespace Schedulers;
45

@@ -11,15 +12,21 @@ internal class Worker
1112
private readonly int _workerId;
1213
private readonly Thread _thread;
1314

15+
/// <summary>
16+
/// Use a single combined <see cref="UnorderedThreadSafeQueue{T}"/> for all <see cref="Worker"/>s to enqueue <see cref="JobHandle"/>s.
17+
/// Because there really isn't performance advantage to having multiple queues for each <see cref="Worker"/>,
18+
/// </summary>
1419
private static readonly UnorderedThreadSafeQueue<JobHandle> _incomingQueue = new();
15-
private readonly WorkStealingDeque<JobHandle> _queue;
1620

21+
private readonly WorkStealingDeque<JobHandle> _queue;
1722
private readonly JobScheduler _jobScheduler;
1823
private volatile CancellationTokenSource _cancellationToken;
24+
private static readonly BulkQueue<JobHandle> _bulkQueue = new(1000, 128);
1925
public bool IsCurrentlyWorking { get; private set; } = false;
26+
2027
// use a high spin count to avoid sleeping the thread under variable load.
2128
// 2047 is the maximum value for a ManualResetEventSlim spin count.
22-
private readonly ManualResetEventSlim _workAvailable = new(false, 2047);
29+
private readonly static ManualResetEventSlim _workAvailable = new(false, 2047);
2330

2431
/// <summary>
2532
/// Creates a new <see cref="Worker"/>.
@@ -39,16 +46,25 @@ public Worker(JobScheduler jobScheduler, int id)
3946
_thread.Name = $"Arch Worker #{_workerId}";
4047
}
4148

42-
public bool Enqueue(JobHandle handle)
49+
public static bool Enqueue(JobHandle handle)
4350
{
4451
var result = _incomingQueue.TryEnqueue(handle);
45-
_workAvailable.Set();
4652
return result;
4753
}
4854

4955
public void ForceEnqueue(JobHandle handle)
5056
{
51-
_incomingQueue.ForceEnqueue(handle);
57+
_incomingQueue.ForceEnqueue(handle);
58+
}
59+
60+
public static bool GetEmptySegment(out BulkQueue<JobHandle>.Segment segment)
61+
{
62+
return _bulkQueue.GetEmtySegment(out segment);
63+
}
64+
65+
public static void EnqueueFullSegment(BulkQueue<JobHandle>.Segment segment)
66+
{
67+
_bulkQueue.Enqueue(segment);
5268
}
5369

5470
/// <summary>
@@ -81,7 +97,14 @@ public void Start()
8197
public void Stop()
8298
{
8399
_cancellationToken.Cancel();
84-
// _workAvailable.Set();
100+
_workAvailable.Set();
101+
}
102+
103+
public static bool TryStealJobExternal(out JobHandle job)
104+
{
105+
// job = new JobHandle();
106+
// return false;
107+
return _incomingQueue.TryDequeue(out job);
85108
}
86109

87110
/// <summary>
@@ -95,11 +118,29 @@ private void Run(CancellationToken token)
95118
{
96119
while (!token.IsCancellationRequested)
97120
{
121+
// it is faster to do it in a single threaded manner to lower the queue contention.
122+
if (_workerId == 0)
123+
{
124+
while (_bulkQueue.GetFullSegment(out var segment))
125+
{
126+
while (segment.Dequeue(out var item))
127+
{
128+
ForceEnqueue(item);
129+
}
130+
_bulkQueue.Return(segment);
131+
}
132+
}
133+
98134
IsCurrentlyWorking = true;
99135
var noWorkFound = true;
100136
// Pass jobs to the local queue
101137
while (_queue.Size() < 32 && _incomingQueue.TryDequeue(out var jobHandle))
102138
{
139+
if (_workerId == 0)
140+
{
141+
_workAvailable.Set();
142+
}
143+
103144
_queue.PushBottom(jobHandle);
104145
noWorkFound = false;
105146
}
@@ -134,6 +175,7 @@ private void Run(CancellationToken token)
134175
break;
135176
}
136177
}
178+
137179
if (noWorkFound)
138180
{
139181
IsCurrentlyWorking = false;
@@ -146,7 +188,7 @@ private void Run(CancellationToken token)
146188
else
147189
{
148190
_workAvailable.Reset();
149-
_workAvailable.Wait(1, token);
191+
_workAvailable.Wait(100, token);
150192
}
151193
}
152194
}

0 commit comments

Comments
 (0)