Skip to content

Commit 41ef639

Browse files
committed
Fix hierachy and add lots of other things
Add ability to wait a job handle Optimize workers Fix workers Fix parallel for jobs not being parallel Improve threads
1 parent eec44f8 commit 41ef639

18 files changed

+1050
-247
lines changed

JobScheduler.Benchmarks/Benchmark.cs

Lines changed: 31 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,5 @@
11
using System.Diagnostics;
22
using System.Numerics;
3-
using BenchmarkDotNet.Jobs;
4-
using BenchmarkDotNet.Toolchains.CsProj;
5-
using BenchmarkDotNet.Toolchains.InProcess.NoEmit;
6-
using CommunityToolkit.HighPerformance;
73
using Schedulers;
84
using Schedulers.Benchmarks;
95
using Schedulers.Utils;
@@ -14,7 +10,6 @@ public struct EmptyJob : IJob
1410
{
1511
public void Execute()
1612
{
17-
1813
}
1914
}
2015

@@ -69,7 +64,7 @@ public HeavyCalculationJob(int first, int second)
6964

7065
public void Execute()
7166
{
72-
for (var i = 0; i < 100; i++)
67+
for (var i = 0; i < 1; i++)
7368
{
7469
_first = double.Sqrt(_second);
7570
_second = double.Sqrt(_first) + 1;
@@ -105,7 +100,7 @@ public void RunVectorized(int index, int end)
105100

106101
public void RunSingle(int index)
107102
{
108-
if (!acceptsNewEntries) throw new("Should not accept new entries");
103+
if (!acceptsNewEntries) throw new($"Should not accept new entries {index}");
109104
var newValue = Interlocked.Increment(ref total);
110105
// Console.WriteLine($" {index} {newValue}");
111106
}
@@ -131,24 +126,28 @@ public long End(int jobs, string type)
131126

132127
public class Benchmark
133128
{
134-
private const int jobCount = 200000;
135-
private const int loopCount = 100;
129+
private const int jobCount = 2000;
130+
private const int loopCount = 1000;
131+
private static JobScheduler jobScheduler = new();
136132

137133
private static void CorrectnessTestJob()
138134
{
139-
using var jobScheduler = new JobScheduler();
140135
var timer = new JobTimer();
141136
for (var sindex = 0; sindex < loopCount; sindex++)
142137
{
143138
TestCorrectnessJob.total = 0;
144139
TestCorrectnessJob.acceptsNewEntries = true;
145-
var job = new ParallelJobProducer<TestCorrectnessJob>(jobCount, new(), jobScheduler);
140+
var job = new ParallelJobProducer<TestCorrectnessJob>(0, jobCount, new(), jobScheduler);
141+
job.CheckAndSplit();
142+
jobScheduler.Flush(job.GetHandle());
146143
jobScheduler.Wait(job.GetHandle());
147-
TestCorrectnessJob.acceptsNewEntries = false;
148-
var expected = jobCount;
149-
if (TestCorrectnessJob.total != expected)
144+
// Thread.Sleep(1);
145+
// Console.WriteLine($"UnfinishedJobs {job.GetHandle().UnfinishedJobs} total {TestCorrectnessJob.total}");
146+
// TestCorrectnessJob.acceptsNewEntries = false;
147+
var total = TestCorrectnessJob.total;
148+
if (total != jobCount)
150149
{
151-
throw new($"{TestCorrectnessJob.total} != {expected}");
150+
throw new($"{total} != {jobCount}");
152151
}
153152
}
154153

@@ -166,8 +165,7 @@ private static void BenchB()
166165
{
167166
var job = new HeavyCalculationJob(index, index);
168167
var handle = jobScheduler.Schedule(job);
169-
handle.Parent = parentHandle.Index;
170-
handle.SetDependsOn(parentHandle);
168+
handle.SetParent(parentHandle);
171169
jobScheduler.Flush(handle);
172170
}
173171

@@ -184,7 +182,7 @@ private static void BenchC()
184182
var timer = new JobTimer();
185183
for (var sindex = 0; sindex < loopCount; sindex++)
186184
{
187-
var job = new ParallelJobProducer<HeavyCalculationJob>(jobCount, new(), jobScheduler);
185+
var job = new ParallelJobProducer<HeavyCalculationJob>(0, jobCount, new(), jobScheduler);
188186
jobScheduler.Wait(job.GetHandle());
189187
}
190188

@@ -206,22 +204,28 @@ private static void BenchD()
206204
timer.End(jobCount * loopCount, "Just Parallel.For");
207205
}
208206

209-
private static long BenchVector(bool dontUseVector)
207+
private static long BenchVector(bool useVector)
210208
{
211-
using var jobScheduler = new JobScheduler();
212209
var timer = new JobTimer();
213210
var data = new VectorCalculationJob { a = new float[jobCount], b = new float[jobCount], result = new float[jobCount], Repetitions = 500 };
211+
var parentJob = jobScheduler.Schedule();
214212
for (var sindex = 0; sindex < loopCount; sindex++)
215213
{
216-
var job = new ParallelJobProducer<VectorCalculationJob>(jobCount, data, jobScheduler, 16, !dontUseVector);
217-
jobScheduler.Wait(job.GetHandle());
214+
var job = new ParallelJobProducer<VectorCalculationJob>(0, jobCount, data, jobScheduler, 16, !useVector);
215+
job.CheckAndSplit();
216+
job.GetHandle().SetParent(parentJob);
217+
jobScheduler.Flush(job.GetHandle());
218218
}
219-
220-
return timer.End(jobCount * loopCount, $"Use vector: {!dontUseVector}");
219+
jobScheduler.Flush(parentJob);
220+
jobScheduler.Wait(parentJob);
221+
return timer.End(jobCount * loopCount, $"Use vector: {useVector}");
221222
}
222223

223224
private static void Main(string[] args)
224225
{
226+
// new JobHierarchyTest();
227+
// return;
228+
225229
// var config = DefaultConfig.Instance.AddJob(Job.Default
226230
// .WithWarmupCount(2)
227231
// .WithMinIterationCount(10)
@@ -232,7 +236,7 @@ private static void Main(string[] args)
232236
// config = config.WithOptions(ConfigOptions.DisableOptimizationsValidator);
233237
// BenchmarkRunner.Run<JobSchedulerBenchmark>(config);
234238
// return;
235-
for (var i = 0;; i++)
239+
for (var i = 0; i < 20; i++)
236240
{
237241
// CorrectnessTestJob();
238242
// BenchB();
@@ -242,6 +246,8 @@ private static void Main(string[] args)
242246
var nonVectorized = BenchVector(false);
243247
Console.WriteLine($"Ratio {(double)nonVectorized / vectorized}");
244248
}
249+
jobScheduler.Dispose();
250+
245251
//using var jobScheduler = new JobScheduler();
246252

247253
// Spawn massive jobs and wait for finish
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
using Arch.Benchmarks;
2+
3+
namespace Schedulers.Benchmarks;
4+
5+
public class JobHierarchyTest
6+
{
7+
private class JobThing : IJob
8+
{
9+
public void Execute()
10+
{
11+
Interlocked.Increment(ref totalPassedJobs);
12+
}
13+
}
14+
15+
private int layers = 6;
16+
private int growSizePerLayer = 10;
17+
private int totalScheduledJobs = 0;
18+
private static int totalPassedJobs = 0;
19+
private JobScheduler jobScheduler = new();
20+
private JobThing jobThing = new();
21+
22+
public JobHierarchyTest()
23+
{
24+
// InvertedPyramidJobs();
25+
// InvertedPyramidJobs();
26+
// InvertedPyramidJobs();
27+
// InvertedPyramidJobs();
28+
for (int i = 0; i < 10; i++)
29+
{
30+
PyramidJobs();
31+
}
32+
33+
jobScheduler.Dispose();
34+
}
35+
36+
private void PyramidJobs()
37+
{
38+
totalScheduledJobs = 0;
39+
totalPassedJobs = 0;
40+
var timer = new JobTimer();
41+
var topJob = jobScheduler.Schedule();
42+
AddPyramidJobs(topJob, 0);
43+
jobScheduler.Flush(topJob);
44+
jobScheduler.Wait(topJob);
45+
timer.End(totalScheduledJobs, "PyramidJobs test");
46+
var passedExpected = (int)Math.Pow(growSizePerLayer, layers);
47+
Console.WriteLine($"Total jobs scheduled: {totalScheduledJobs}, total passed jobs: {totalPassedJobs}");
48+
if (totalPassedJobs != passedExpected)
49+
{
50+
throw new($"Total passed jobs {totalPassedJobs} does not match expected {passedExpected}.");
51+
}
52+
}
53+
54+
private void InvertedPyramidJobs()
55+
{
56+
totalScheduledJobs = 0;
57+
totalPassedJobs = 0;
58+
var timer = new JobTimer();
59+
var bottomJob = jobScheduler.Schedule();
60+
var topJob = jobScheduler.Schedule();
61+
AddInvertedPyramidJobs(bottomJob, 1, topJob);
62+
jobScheduler.Flush(bottomJob);
63+
jobScheduler.Flush(topJob);
64+
jobScheduler.Wait(topJob);
65+
timer.End(totalScheduledJobs, "InvertedPyramidJobs test");
66+
var passedExpected = (int)Math.Pow(growSizePerLayer, layers - 1);
67+
Console.WriteLine($"Total jobs scheduled: {totalScheduledJobs}, total passed jobs: {totalPassedJobs}");
68+
if (totalPassedJobs != passedExpected)
69+
{
70+
throw new($"Total passed jobs {totalPassedJobs} does not match expected {passedExpected}.");
71+
}
72+
}
73+
74+
private void AddPyramidJobs(JobHandle parent, int layer)
75+
{
76+
if (layer >= layers)
77+
{
78+
return;
79+
}
80+
81+
for (var i = 0; i < growSizePerLayer; i++)
82+
{
83+
var isNotLastLayer = layer + 1 < layers;
84+
var handle = jobScheduler.Schedule(isNotLastLayer ? null : jobThing, parent);
85+
AddPyramidJobs(handle, layer + 1);
86+
jobScheduler.Flush(handle);
87+
totalScheduledJobs++;
88+
}
89+
}
90+
91+
private void AddInvertedPyramidJobs(JobHandle source, int layer, JobHandle topJob)
92+
{
93+
if (layer >= layers)
94+
{
95+
return;
96+
}
97+
98+
for (var i = 0; i < growSizePerLayer; i++)
99+
{
100+
var isLastLayer = layer + 1 == layers;
101+
var target = jobScheduler.Schedule(isLastLayer ? jobThing : null);
102+
target.SetDependsOn(source);
103+
target.SetParent(topJob);
104+
AddInvertedPyramidJobs(target, layer + 1, topJob);
105+
jobScheduler.Flush(target);
106+
totalScheduledJobs++;
107+
}
108+
}
109+
}

JobScheduler.Benchmarks/JobSchedulerBenchmark.cs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
using Arch.Benchmarks;
2-
using CommunityToolkit.HighPerformance;
1+
using CommunityToolkit.HighPerformance;
32
using Schedulers.Utils;
43

54
namespace Schedulers.Benchmarks;
@@ -9,6 +8,7 @@ public struct CalculationJob : IJob
98
private int _first;
109
private int _second;
1110
public static int _result;
11+
1212
public CalculationJob(int first, int second)
1313
{
1414
_first = first;
@@ -35,8 +35,8 @@ public class JobSchedulerBenchmark
3535
[IterationSetup]
3636
public void Setup()
3737
{
38-
_jobScheduler = new JobScheduler();
39-
_jobHandles = new List<JobHandle>(Jobs);
38+
_jobScheduler = new();
39+
_jobHandles = new(Jobs);
4040
}
4141

4242
[IterationCleanup]

JobScheduler.Test/BenchmarkTests/GraphGeneratorTests.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
using System.Diagnostics;
2-
3-
namespace Schedulers.Test;
1+
namespace Schedulers.Test;
42

53
/*
64
[TestFixture]

JobScheduler.Test/JobSchedulerTests.cs

Lines changed: 53 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ public void Flush_ParentWithChild()
5353
jobScheduler.Flush(handle1);
5454
jobScheduler.Flush(handle2);
5555

56-
var job1CompletedFlag = job1Completed.WaitOne(TimeSpan.FromSeconds(5));
57-
var job2CompletedFlag = job2Completed.WaitOne(TimeSpan.FromSeconds(5));
56+
var job1CompletedFlag = job1Completed.WaitOne(TimeSpan.FromMilliseconds(50));
57+
var job2CompletedFlag = job2Completed.WaitOne(TimeSpan.FromMilliseconds(50));
5858

5959
IsTrue(job1CompletedFlag, "Job1 did not complete in time.");
6060
IsTrue(job2CompletedFlag, "Job2 did not complete in time.");
@@ -80,8 +80,8 @@ public void Flush_Child()
8080

8181
jobScheduler.Flush(handle2);
8282

83-
var job1CompletedFlag = job1Completed.WaitOne(TimeSpan.FromSeconds(5));
84-
var job2CompletedFlag = job2Completed.WaitOne(TimeSpan.FromSeconds(5));
83+
var job1CompletedFlag = job1Completed.WaitOne(TimeSpan.FromMilliseconds(5));
84+
var job2CompletedFlag = job2Completed.WaitOne(TimeSpan.FromMilliseconds(5));
8585

8686
IsFalse(job1CompletedFlag, "Job1 did not complete in time.");
8787
IsTrue(job2CompletedFlag, "Job2 did not complete in time.");
@@ -111,8 +111,8 @@ public void Wait_ParentWithChild()
111111
// Waits on handle1 to ensure both handles ran
112112
jobScheduler.Wait(handle1);
113113

114-
var job1CompletedFlag = job1Completed.WaitOne(TimeSpan.FromSeconds(5));
115-
var job2CompletedFlag = job2Completed.WaitOne(TimeSpan.FromSeconds(5));
114+
var job1CompletedFlag = job1Completed.WaitOne(TimeSpan.FromMilliseconds(5));
115+
var job2CompletedFlag = job2Completed.WaitOne(TimeSpan.FromMilliseconds(5));
116116

117117
IsTrue(job1CompletedFlag, "Job1 did not complete in time.");
118118
IsTrue(job2CompletedFlag, "Job2 did not complete in time.");
@@ -142,8 +142,8 @@ public void Wait_All_ParentWithChild()
142142
// Waits on handle1 to ensure both handles ran
143143
jobScheduler.Wait(handle1, handle2);
144144

145-
var job1CompletedFlag = job1Completed.WaitOne(TimeSpan.FromSeconds(5));
146-
var job2CompletedFlag = job2Completed.WaitOne(TimeSpan.FromSeconds(5));
145+
var job1CompletedFlag = job1Completed.WaitOne(TimeSpan.FromMilliseconds(5));
146+
var job2CompletedFlag = job2Completed.WaitOne(TimeSpan.FromMilliseconds(5));
147147

148148
IsTrue(job1CompletedFlag, "Job1 did not complete in time.");
149149
IsTrue(job2CompletedFlag, "Job2 did not complete in time.");
@@ -161,17 +161,59 @@ public void Flush_Dependencies()
161161
var job1 = new TestJob(1, () => { job1Completed.Set(); });
162162
var job2 = new TestJob(2, () => { job2Completed.Set(); });
163163

164-
// Job2 should finish after Job1 since its his child.
164+
// Job2 should finish after Job1 since it's his child.
165165
var handle1 = jobScheduler.Schedule(job1);
166166
var handle2 = jobScheduler.Schedule(job2);
167167
jobScheduler.AddDependency(handle2, handle1);
168168

169169
jobScheduler.Flush(handle1);
170+
jobScheduler.Flush(handle2);
170171

171-
var job1CompletedFlag = job1Completed.WaitOne(TimeSpan.FromSeconds(5));
172-
var job2CompletedFlag = job2Completed.WaitOne(TimeSpan.FromSeconds(10));
172+
var job1CompletedFlag = job1Completed.WaitOne(TimeSpan.FromMilliseconds(5));
173+
var job2CompletedFlag = job2Completed.WaitOne(TimeSpan.FromMilliseconds(10));
173174

174175
IsTrue(job1CompletedFlag, "Job1 did not complete in time.");
175176
IsTrue(job2CompletedFlag, "Job2 did not complete in time.");
176177
}
178+
179+
[Test]
180+
public void OneMoreDependencyThanNodeLimitSize()
181+
{
182+
using var jobScheduler = new JobScheduler();
183+
var sourceJob = jobScheduler.Schedule();
184+
var targetJob = jobScheduler.Schedule();
185+
for (var i = 0; i < 9; i++)
186+
{
187+
var midJob = jobScheduler.Schedule();
188+
targetJob.SetDependsOn(midJob);
189+
midJob.SetDependsOn(sourceJob);
190+
jobScheduler.Flush(midJob);
191+
}
192+
193+
jobScheduler.Flush(targetJob);
194+
jobScheduler.Flush(sourceJob);
195+
jobScheduler.Wait(targetJob);
196+
}
197+
198+
[Test]
199+
public void ManyMoreDependencyThanNodeLimitSize()
200+
{
201+
using var jobScheduler = new JobScheduler();
202+
Parallel.For(0, 100, _ =>
203+
{
204+
var sourceJob = jobScheduler.Schedule();
205+
var targetJob = jobScheduler.Schedule();
206+
for (var i = 0; i < 90; i++)
207+
{
208+
var midJob = jobScheduler.Schedule();
209+
targetJob.SetDependsOn(midJob);
210+
midJob.SetDependsOn(sourceJob);
211+
jobScheduler.Flush(midJob);
212+
}
213+
214+
jobScheduler.Flush(targetJob);
215+
jobScheduler.Flush(sourceJob);
216+
jobScheduler.Wait(targetJob);
217+
});
218+
}
177219
}

0 commit comments

Comments
 (0)