Skip to content

Commit da36584

Browse files
committed
Fix workers
1 parent 3fab45c commit da36584

File tree

3 files changed

+40
-39
lines changed

3 files changed

+40
-39
lines changed

JobScheduler.Benchmarks/Benchmark.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -126,12 +126,12 @@ public long End(int jobs, string type)
126126

127127
public class Benchmark
128128
{
129-
private const int jobCount = 2000;
130-
private const int loopCount = 100;
129+
private const int jobCount = 20000;
130+
private const int loopCount = 1000;
131+
private static JobScheduler jobScheduler = new();
131132

132133
private static void CorrectnessTestJob()
133134
{
134-
using var jobScheduler = new JobScheduler();
135135
var timer = new JobTimer();
136136
for (var sindex = 0; sindex < loopCount; sindex++)
137137
{
@@ -206,12 +206,12 @@ private static void BenchD()
206206

207207
private static long BenchVector(bool useVector)
208208
{
209-
using var jobScheduler = new JobScheduler();
210209
var timer = new JobTimer();
211210
var data = new VectorCalculationJob { a = new float[jobCount], b = new float[jobCount], result = new float[jobCount], Repetitions = 500 };
212211
for (var sindex = 0; sindex < loopCount; sindex++)
213212
{
214213
var job = new ParallelJobProducer<VectorCalculationJob>(0, jobCount, data, jobScheduler, 16, !useVector);
214+
job.CheckAndSplit();
215215
jobScheduler.Flush(job.GetHandle());
216216
jobScheduler.Wait(job.GetHandle());
217217
}
@@ -221,8 +221,8 @@ private static long BenchVector(bool useVector)
221221

222222
private static void Main(string[] args)
223223
{
224-
new JobHierarchyTest();
225-
return;
224+
// new JobHierarchyTest();
225+
// return;
226226

227227
// var config = DefaultConfig.Instance.AddJob(Job.Default
228228
// .WithWarmupCount(2)

JobScheduler/JobScheduler.cs

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -83,37 +83,33 @@ public void AddDependency(JobHandle dependency, JobHandle dependOn)
8383
dependency.SetDependsOn(dependOn);
8484
}
8585

86-
public float CalculateThreadUsages()
86+
/// <summary>
87+
/// A function that calculates the current thread usage of the <see cref="Workers"/>.
88+
/// It is very simple and just counts the amount of <see cref="Worker"/>s that are currently working.
89+
/// </summary>
90+
/// <returns>a value between 0 - 1</returns>
91+
public float CalculateThreadUsage()
8792
{
8893
var total = 0f;
8994
foreach (var worker in Workers)
9095
{
91-
total += worker.IsCurentlyWorking ? 1 : 0;
96+
total += worker.IsCurrentlyWorking ? 1 : 0;
9297
}
9398

99+
total /= Workers.Count;
94100
return total;
95101
}
96102

97103
private void FlushDirect(JobHandle job)
98104
{
99-
var workerIndex = NextWorkerIndex;
100105
// Round Robin,
101-
var failsBeforeForce = Workers.Count - 1; // We will try to enqueue the job on all workers before we force it.
106+
var workerIndex = NextWorkerIndex;
102107
while (!Workers[workerIndex].Enqueue(job))
103108
{
104109
NextWorkerIndex = (NextWorkerIndex + 1) % Workers.Count;
105-
failsBeforeForce--;
106-
if (failsBeforeForce == 0)
107-
{
108-
break;
109-
}
110110
}
111111

112-
// Force it on whatever worker we are on if we failed to enqueue it on all workers.
113-
if (failsBeforeForce == 0)
114-
{
115-
Workers[workerIndex].ForceEnqueue(job);
116-
}
112+
NextWorkerIndex = (NextWorkerIndex + 1) % Workers.Count;
117113
}
118114

119115
/// <summary>

JobScheduler/Worker.cs

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,15 @@ internal class Worker
1111
private readonly int _workerId;
1212
private readonly Thread _thread;
1313

14-
private readonly UnorderedThreadSafeQueue<JobHandle> _incomingQueue;
14+
private static readonly UnorderedThreadSafeQueue<JobHandle> _incomingQueue = new();
1515
private readonly WorkStealingDeque<JobHandle> _queue;
1616

1717
private readonly JobScheduler _jobScheduler;
1818
private volatile CancellationTokenSource _cancellationToken;
19-
public bool IsCurentlyWorking { get; private set; } = false;
20-
private readonly SemaphoreSlim _workAvailable = new(0);
19+
public bool IsCurrentlyWorking { get; private set; } = false;
20+
// use a high spin count to avoid sleeping the thread under variable load.
21+
// 2047 is the maximum value for a ManualResetEventSlim spin count.
22+
private readonly ManualResetEventSlim _workAvailable = new(false, 2047);
2123

2224
/// <summary>
2325
/// Creates a new <see cref="Worker"/>.
@@ -28,7 +30,6 @@ public Worker(JobScheduler jobScheduler, int id)
2830
{
2931
_workerId = id;
3032

31-
_incomingQueue = new();
3233
_queue = new(32);
3334

3435
_jobScheduler = jobScheduler;
@@ -41,7 +42,7 @@ public Worker(JobScheduler jobScheduler, int id)
4142
public bool Enqueue(JobHandle handle)
4243
{
4344
var result = _incomingQueue.TryEnqueue(handle);
44-
_workAvailable.Release();
45+
_workAvailable.Set();
4546
return result;
4647
}
4748

@@ -94,11 +95,13 @@ private void Run(CancellationToken token)
9495
{
9596
while (!token.IsCancellationRequested)
9697
{
97-
IsCurentlyWorking = true;
98+
IsCurrentlyWorking = true;
99+
var noWorkFound = true;
98100
// Pass jobs to the local queue
99101
while (_queue.Size() < 32 && _incomingQueue.TryDequeue(out var jobHandle))
100102
{
101103
_queue.PushBottom(jobHandle);
104+
noWorkFound = false;
102105
}
103106

104107
// Process job in own queue
@@ -107,6 +110,7 @@ private void Run(CancellationToken token)
107110
{
108111
job.Job?.Execute();
109112
_jobScheduler.Finish(job);
113+
noWorkFound = false;
110114
}
111115
else
112116
{
@@ -126,22 +130,23 @@ private void Run(CancellationToken token)
126130

127131
job.Job?.Execute();
128132
_jobScheduler.Finish(job);
133+
noWorkFound = false;
129134
break;
130135
}
131-
132-
if (!exists)
136+
}
137+
if (noWorkFound)
138+
{
139+
IsCurrentlyWorking = false;
140+
// Keep a small fleet of threads always ready to work.
141+
// But don't use all the cpu power on yields.
142+
if (_workerId == 0)
133143
{
134-
IsCurentlyWorking = false;
135-
// Keep a small fleet of threads always ready to work.
136-
// But don't use all the cpu power on yields.
137-
if (_workerId < 3)
138-
{
139-
Thread.Yield();
140-
}
141-
else
142-
{
143-
_workAvailable.Wait(token);
144-
}
144+
Thread.Yield();
145+
}
146+
else
147+
{
148+
_workAvailable.Reset();
149+
_workAvailable.Wait(1, token);
145150
}
146151
}
147152
}

0 commit comments

Comments
 (0)