Skip to content

Commit 5d5043e

Browse files
committed
Fix hierachy
1 parent eec44f8 commit 5d5043e

15 files changed

+717
-176
lines changed

JobScheduler.Benchmarks/Benchmark.cs

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public void RunVectorized(int index, int end)
105105

106106
public void RunSingle(int index)
107107
{
108-
if (!acceptsNewEntries) throw new("Should not accept new entries");
108+
if (!acceptsNewEntries) throw new($"Should not accept new entries {index}");
109109
var newValue = Interlocked.Increment(ref total);
110110
// Console.WriteLine($" {index} {newValue}");
111111
}
@@ -144,11 +144,13 @@ private static void CorrectnessTestJob()
144144
TestCorrectnessJob.acceptsNewEntries = true;
145145
var job = new ParallelJobProducer<TestCorrectnessJob>(jobCount, new(), jobScheduler);
146146
jobScheduler.Wait(job.GetHandle());
147-
TestCorrectnessJob.acceptsNewEntries = false;
148-
var expected = jobCount;
149-
if (TestCorrectnessJob.total != expected)
147+
// Thread.Sleep(1);
148+
// Console.WriteLine($"UnfinishedJobs {job.GetHandle().UnfinishedJobs} total {TestCorrectnessJob.total}");
149+
// TestCorrectnessJob.acceptsNewEntries = false;
150+
var total = TestCorrectnessJob.total;
151+
if (total != jobCount)
150152
{
151-
throw new($"{TestCorrectnessJob.total} != {expected}");
153+
throw new($"{total} != {jobCount}");
152154
}
153155
}
154156

@@ -167,7 +169,7 @@ private static void BenchB()
167169
var job = new HeavyCalculationJob(index, index);
168170
var handle = jobScheduler.Schedule(job);
169171
handle.Parent = parentHandle.Index;
170-
handle.SetDependsOn(parentHandle);
172+
handle.HasParent(parentHandle);
171173
jobScheduler.Flush(handle);
172174
}
173175

@@ -214,6 +216,7 @@ private static long BenchVector(bool dontUseVector)
214216
for (var sindex = 0; sindex < loopCount; sindex++)
215217
{
216218
var job = new ParallelJobProducer<VectorCalculationJob>(jobCount, data, jobScheduler, 16, !dontUseVector);
219+
jobScheduler.Flush(job.GetHandle());
217220
jobScheduler.Wait(job.GetHandle());
218221
}
219222

@@ -222,6 +225,9 @@ private static long BenchVector(bool dontUseVector)
222225

223226
private static void Main(string[] args)
224227
{
228+
new JobHierarchyTest();
229+
return;
230+
225231
// var config = DefaultConfig.Instance.AddJob(Job.Default
226232
// .WithWarmupCount(2)
227233
// .WithMinIterationCount(10)
Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
using System.Diagnostics;
2+
using Arch.Benchmarks;
3+
4+
namespace Schedulers.Benchmarks;
5+
6+
public class JobHierarchyTest
7+
{
8+
private class JobThing : IJob
9+
{
10+
public void Execute()
11+
{
12+
Interlocked.Increment(ref totalPassedJobs);
13+
}
14+
}
15+
16+
private int layers = 5;
17+
private int growSizePerLayer = 10;
18+
private int totalScheduledJobs = 0;
19+
private static int totalPassedJobs = 0;
20+
private JobScheduler jobScheduler = new();
21+
private JobThing jobThing = new();
22+
23+
public JobHierarchyTest()
24+
{
25+
InvertedPyramidJobs();
26+
InvertedPyramidJobs();
27+
InvertedPyramidJobs();
28+
InvertedPyramidJobs();
29+
PyramidJobs();
30+
PyramidJobs();
31+
PyramidJobs();
32+
PyramidJobs();
33+
jobScheduler.Dispose();
34+
}
35+
36+
private void PyramidJobs()
37+
{
38+
totalScheduledJobs = 0;
39+
totalPassedJobs = 0;
40+
var timer = new JobTimer();
41+
var topJob = jobScheduler.Schedule();
42+
AddPyramidJobs(topJob, 0);
43+
jobScheduler.Flush(topJob);
44+
jobScheduler.Wait(topJob);
45+
timer.End(totalScheduledJobs, "PyramidJobs test");
46+
var passedExpected = (int)Math.Pow(growSizePerLayer, layers);
47+
Console.WriteLine($"Total jobs scheduled: {totalScheduledJobs}, total passed jobs: {totalPassedJobs}");
48+
if (totalPassedJobs != passedExpected)
49+
{
50+
throw new($"Total passed jobs {totalPassedJobs} does not match expected {passedExpected}.");
51+
}
52+
}
53+
54+
private void InvertedPyramidJobs()
55+
{
56+
totalScheduledJobs = 0;
57+
totalPassedJobs = 0;
58+
var timer = new JobTimer();
59+
var bottomJob = jobScheduler.Schedule();
60+
var topJob = jobScheduler.Schedule();
61+
AddInvertedPyramidJobs(bottomJob, 1, topJob);
62+
jobScheduler.Flush(bottomJob);
63+
jobScheduler.Flush(topJob);
64+
jobScheduler.Wait(topJob);
65+
timer.End(totalScheduledJobs, "InvertedPyramidJobs test");
66+
var passedExpected = (int)Math.Pow(growSizePerLayer, layers - 1);
67+
Console.WriteLine($"Total jobs scheduled: {totalScheduledJobs}, total passed jobs: {totalPassedJobs}");
68+
if (totalPassedJobs != passedExpected)
69+
{
70+
throw new($"Total passed jobs {totalPassedJobs} does not match expected {passedExpected}.");
71+
}
72+
}
73+
74+
private void AddPyramidJobs(JobHandle parent, int layer)
75+
{
76+
if (layer >= layers)
77+
{
78+
return;
79+
}
80+
81+
for (var i = 0; i < growSizePerLayer; i++)
82+
{
83+
var isNotLastLayer = layer + 1 < layers;
84+
var handle = jobScheduler.Schedule(isNotLastLayer ? null : jobThing, parent);
85+
AddPyramidJobs(handle, layer + 1);
86+
jobScheduler.Flush(handle);
87+
totalScheduledJobs++;
88+
}
89+
}
90+
91+
private void AddInvertedPyramidJobs(JobHandle source, int layer, JobHandle topJob)
92+
{
93+
if (layer >= layers)
94+
{
95+
return;
96+
}
97+
98+
for (var i = 0; i < growSizePerLayer; i++)
99+
{
100+
var isLastLayer = layer + 1 == layers;
101+
var target = jobScheduler.Schedule(isLastLayer ? jobThing : null);
102+
target.DependsOn(source, jobScheduler);
103+
target.HasParent(topJob);
104+
AddInvertedPyramidJobs(target, layer + 1, topJob);
105+
jobScheduler.Flush(target);
106+
totalScheduledJobs++;
107+
}
108+
}
109+
}

JobScheduler.Benchmarks/JobSchedulerBenchmark.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ public struct CalculationJob : IJob
99
private int _first;
1010
private int _second;
1111
public static int _result;
12+
1213
public CalculationJob(int first, int second)
1314
{
1415
_first = first;
@@ -35,8 +36,8 @@ public class JobSchedulerBenchmark
3536
[IterationSetup]
3637
public void Setup()
3738
{
38-
_jobScheduler = new JobScheduler();
39-
_jobHandles = new List<JobHandle>(Jobs);
39+
_jobScheduler = new();
40+
_jobHandles = new(Jobs);
4041
}
4142

4243
[IterationCleanup]

JobScheduler.Test/JobSchedulerTests.cs

Lines changed: 50 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -53,8 +53,8 @@ public void Flush_ParentWithChild()
5353
jobScheduler.Flush(handle1);
5454
jobScheduler.Flush(handle2);
5555

56-
var job1CompletedFlag = job1Completed.WaitOne(TimeSpan.FromSeconds(5));
57-
var job2CompletedFlag = job2Completed.WaitOne(TimeSpan.FromSeconds(5));
56+
var job1CompletedFlag = job1Completed.WaitOne(TimeSpan.FromMilliseconds(50));
57+
var job2CompletedFlag = job2Completed.WaitOne(TimeSpan.FromMilliseconds(50));
5858

5959
IsTrue(job1CompletedFlag, "Job1 did not complete in time.");
6060
IsTrue(job2CompletedFlag, "Job2 did not complete in time.");
@@ -80,8 +80,8 @@ public void Flush_Child()
8080

8181
jobScheduler.Flush(handle2);
8282

83-
var job1CompletedFlag = job1Completed.WaitOne(TimeSpan.FromSeconds(5));
84-
var job2CompletedFlag = job2Completed.WaitOne(TimeSpan.FromSeconds(5));
83+
var job1CompletedFlag = job1Completed.WaitOne(TimeSpan.FromMilliseconds(5));
84+
var job2CompletedFlag = job2Completed.WaitOne(TimeSpan.FromMilliseconds(5));
8585

8686
IsFalse(job1CompletedFlag, "Job1 did not complete in time.");
8787
IsTrue(job2CompletedFlag, "Job2 did not complete in time.");
@@ -111,8 +111,8 @@ public void Wait_ParentWithChild()
111111
// Waits on handle1 to ensure both handles ran
112112
jobScheduler.Wait(handle1);
113113

114-
var job1CompletedFlag = job1Completed.WaitOne(TimeSpan.FromSeconds(5));
115-
var job2CompletedFlag = job2Completed.WaitOne(TimeSpan.FromSeconds(5));
114+
var job1CompletedFlag = job1Completed.WaitOne(TimeSpan.FromMilliseconds(5));
115+
var job2CompletedFlag = job2Completed.WaitOne(TimeSpan.FromMilliseconds(5));
116116

117117
IsTrue(job1CompletedFlag, "Job1 did not complete in time.");
118118
IsTrue(job2CompletedFlag, "Job2 did not complete in time.");
@@ -142,8 +142,8 @@ public void Wait_All_ParentWithChild()
142142
// Waits on handle1 to ensure both handles ran
143143
jobScheduler.Wait(handle1, handle2);
144144

145-
var job1CompletedFlag = job1Completed.WaitOne(TimeSpan.FromSeconds(5));
146-
var job2CompletedFlag = job2Completed.WaitOne(TimeSpan.FromSeconds(5));
145+
var job1CompletedFlag = job1Completed.WaitOne(TimeSpan.FromMilliseconds(5));
146+
var job2CompletedFlag = job2Completed.WaitOne(TimeSpan.FromMilliseconds(5));
147147

148148
IsTrue(job1CompletedFlag, "Job1 did not complete in time.");
149149
IsTrue(job2CompletedFlag, "Job2 did not complete in time.");
@@ -161,17 +161,56 @@ public void Flush_Dependencies()
161161
var job1 = new TestJob(1, () => { job1Completed.Set(); });
162162
var job2 = new TestJob(2, () => { job2Completed.Set(); });
163163

164-
// Job2 should finish after Job1 since its his child.
164+
// Job2 should finish after Job1 since it's his child.
165165
var handle1 = jobScheduler.Schedule(job1);
166166
var handle2 = jobScheduler.Schedule(job2);
167167
jobScheduler.AddDependency(handle2, handle1);
168168

169169
jobScheduler.Flush(handle1);
170+
jobScheduler.Flush(handle2);
170171

171-
var job1CompletedFlag = job1Completed.WaitOne(TimeSpan.FromSeconds(5));
172-
var job2CompletedFlag = job2Completed.WaitOne(TimeSpan.FromSeconds(10));
172+
var job1CompletedFlag = job1Completed.WaitOne(TimeSpan.FromMilliseconds(5));
173+
var job2CompletedFlag = job2Completed.WaitOne(TimeSpan.FromMilliseconds(10));
173174

174175
IsTrue(job1CompletedFlag, "Job1 did not complete in time.");
175176
IsTrue(job2CompletedFlag, "Job2 did not complete in time.");
176177
}
178+
179+
[Test]
180+
public void OneMoreDependencyThanNodeLimitSize()
181+
{
182+
using var jobScheduler = new JobScheduler();
183+
var sourceJob = jobScheduler.Schedule();
184+
var targetJob = jobScheduler.Schedule();
185+
for (var i = 0; i < 9; i++)
186+
{
187+
var midJob = jobScheduler.Schedule();
188+
targetJob.DependsOn(midJob, jobScheduler);
189+
midJob.DependsOn(sourceJob, jobScheduler);
190+
jobScheduler.Flush(midJob);
191+
}
192+
193+
jobScheduler.Flush(targetJob);
194+
jobScheduler.Flush(sourceJob);
195+
jobScheduler.Wait(targetJob);
196+
}
197+
198+
[Test]
199+
public void ManyMoreDependencyThanNodeLimitSize()
200+
{
201+
using var jobScheduler = new JobScheduler();
202+
var sourceJob = jobScheduler.Schedule();
203+
var targetJob = jobScheduler.Schedule();
204+
for (var i = 0; i < 90; i++)
205+
{
206+
var midJob = jobScheduler.Schedule();
207+
targetJob.DependsOn(midJob, jobScheduler);
208+
midJob.DependsOn(sourceJob, jobScheduler);
209+
jobScheduler.Flush(midJob);
210+
}
211+
212+
jobScheduler.Flush(targetJob);
213+
jobScheduler.Flush(sourceJob);
214+
jobScheduler.Wait(targetJob);
215+
}
177216
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
using NUnit.Framework;
2+
using Schedulers.Utils;
3+
4+
namespace Schedulers.Test;
5+
[TestFixture]
6+
public class ParallelForTests : IDisposable
7+
{
8+
public ParallelForTests()
9+
{
10+
ParallelForJobCommon.SetScheduler(new());
11+
}
12+
13+
public void Dispose()
14+
{
15+
ParallelForJobCommon.DisposeScheduler();
16+
}
17+
18+
[Test]
19+
public void TestParallelFor()
20+
{
21+
var results = new int[100];
22+
ParallelForJob.Create(results.Length, i =>
23+
{
24+
results[i] = i * i;
25+
}).Flush().Wait();
26+
27+
28+
for (var i = 0; i < 100; i++)
29+
{
30+
Assert.That(results[i], Is.EqualTo(i * i));
31+
}
32+
}
33+
34+
[Test]
35+
public void TestParallelForEach()
36+
{
37+
var dict = Enumerable.Range(1, 50).ToDictionary(i => i, i => i * 2);
38+
var sum = 0;
39+
40+
ParallelForEachJob.Create(dict, kvp =>
41+
{
42+
Console.WriteLine($"Processing key: {kvp.Key}, value: {kvp.Value}");
43+
Interlocked.Add(ref sum, kvp.Value);
44+
}).Flush().Wait();
45+
46+
47+
Assert.That(sum, Is.EqualTo(dict.Values.Sum()));
48+
}
49+
}

0 commit comments

Comments
 (0)