Skip to content

Commit 3c5fb3b

Browse files
committed
Add changes
1 parent 0e1a414 commit 3c5fb3b

File tree

11 files changed

+235
-28
lines changed

11 files changed

+235
-28
lines changed

JobScheduler.Benchmarks/Benchmark.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -170,10 +170,10 @@ private static void BenchB()
170170
var handle = jobScheduler.Schedule(job);
171171
handle.Parent = parentHandle.Index;
172172
handle.HasParent(parentHandle);
173-
jobScheduler.ReadToFlush(handle);
173+
jobScheduler.Flush(handle);
174174
}
175175

176-
jobScheduler.ReadToFlush(parentHandle);
176+
jobScheduler.Flush(parentHandle);
177177
jobScheduler.Wait(parentHandle);
178178
}
179179

@@ -216,6 +216,7 @@ private static long BenchVector(bool dontUseVector)
216216
for (var sindex = 0; sindex < loopCount; sindex++)
217217
{
218218
var job = new ParallelJobProducer<VectorCalculationJob>(jobCount, data, jobScheduler, 16, !dontUseVector);
219+
jobScheduler.Flush(job.GetHandle());
219220
jobScheduler.Wait(job.GetHandle());
220221
}
221222

JobScheduler.Benchmarks/JobHierarchyTest.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ private void PyramidJobs()
4040
var timer = new JobTimer();
4141
var topJob = jobScheduler.Schedule();
4242
AddPyramidJobs(topJob, 0);
43-
jobScheduler.ReadToFlush(topJob);
43+
jobScheduler.Flush(topJob);
4444
jobScheduler.Wait(topJob);
4545
timer.End(totalScheduledJobs, "PyramidJobs test");
4646
var passedExpected = (int)Math.Pow(growSizePerLayer, layers);
@@ -59,8 +59,8 @@ private void InvertedPyramidJobs()
5959
var bottomJob = jobScheduler.Schedule();
6060
var topJob = jobScheduler.Schedule();
6161
AddInvertedPyramidJobs(bottomJob, 1, topJob);
62-
jobScheduler.ReadToFlush(bottomJob);
63-
jobScheduler.ReadToFlush(topJob);
62+
jobScheduler.Flush(bottomJob);
63+
jobScheduler.Flush(topJob);
6464
jobScheduler.Wait(topJob);
6565
timer.End(totalScheduledJobs, "InvertedPyramidJobs test");
6666
var passedExpected = (int)Math.Pow(growSizePerLayer, layers - 1);
@@ -83,7 +83,7 @@ private void AddPyramidJobs(JobHandle parent, int layer)
8383
var isNotLastLayer = layer + 1 < layers;
8484
var handle = jobScheduler.Schedule(isNotLastLayer ? null : jobThing, parent);
8585
AddPyramidJobs(handle, layer + 1);
86-
jobScheduler.ReadToFlush(handle);
86+
jobScheduler.Flush(handle);
8787
totalScheduledJobs++;
8888
}
8989
}
@@ -102,7 +102,7 @@ private void AddInvertedPyramidJobs(JobHandle source, int layer, JobHandle topJo
102102
target.DependsOn(source, jobScheduler);
103103
target.HasParent(topJob);
104104
AddInvertedPyramidJobs(target, layer + 1, topJob);
105-
jobScheduler.ReadToFlush(target);
105+
jobScheduler.Flush(target);
106106
totalScheduledJobs++;
107107
}
108108
}

JobScheduler.Test/JobSchedulerTests.cs

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ public void Flush_ParentWithChild()
5050
var handle1 = jobScheduler.Schedule(job1);
5151
var handle2 = jobScheduler.Schedule(job2, handle1);
5252

53-
jobScheduler.ReadToFlush(handle1);
54-
jobScheduler.ReadToFlush(handle2);
53+
jobScheduler.Flush(handle1);
54+
jobScheduler.Flush(handle2);
5555

5656
var job1CompletedFlag = job1Completed.WaitOne(TimeSpan.FromMilliseconds(50));
5757
var job2CompletedFlag = job2Completed.WaitOne(TimeSpan.FromMilliseconds(50));
@@ -78,7 +78,7 @@ public void Flush_Child()
7878
var handle1 = jobScheduler.Schedule(job1);
7979
var handle2 = jobScheduler.Schedule(job2, handle1);
8080

81-
jobScheduler.ReadToFlush(handle2);
81+
jobScheduler.Flush(handle2);
8282

8383
var job1CompletedFlag = job1Completed.WaitOne(TimeSpan.FromMilliseconds(5));
8484
var job2CompletedFlag = job2Completed.WaitOne(TimeSpan.FromMilliseconds(5));
@@ -105,8 +105,8 @@ public void Wait_ParentWithChild()
105105
var handle1 = jobScheduler.Schedule(job1);
106106
var handle2 = jobScheduler.Schedule(job2, handle1);
107107

108-
jobScheduler.ReadToFlush(handle1);
109-
jobScheduler.ReadToFlush(handle2);
108+
jobScheduler.Flush(handle1);
109+
jobScheduler.Flush(handle2);
110110

111111
// Waits on handle1 to ensure both handles ran
112112
jobScheduler.Wait(handle1);
@@ -136,8 +136,8 @@ public void Wait_All_ParentWithChild()
136136
var handle1 = jobScheduler.Schedule(job1);
137137
var handle2 = jobScheduler.Schedule(job2, handle1);
138138

139-
jobScheduler.ReadToFlush(handle1);
140-
jobScheduler.ReadToFlush(handle2);
139+
jobScheduler.Flush(handle1);
140+
jobScheduler.Flush(handle2);
141141

142142
// Waits on handle1 to ensure both handles ran
143143
jobScheduler.Wait(handle1, handle2);
@@ -166,8 +166,8 @@ public void Flush_Dependencies()
166166
var handle2 = jobScheduler.Schedule(job2);
167167
jobScheduler.AddDependency(handle2, handle1);
168168

169-
jobScheduler.ReadToFlush(handle1);
170-
jobScheduler.ReadToFlush(handle2);
169+
jobScheduler.Flush(handle1);
170+
jobScheduler.Flush(handle2);
171171

172172
var job1CompletedFlag = job1Completed.WaitOne(TimeSpan.FromMilliseconds(5));
173173
var job2CompletedFlag = job2Completed.WaitOne(TimeSpan.FromMilliseconds(10));
@@ -187,11 +187,11 @@ public void OneMoreDependencyThanNodeLimitSize()
187187
var midJob = jobScheduler.Schedule();
188188
targetJob.DependsOn(midJob, jobScheduler);
189189
midJob.DependsOn(sourceJob, jobScheduler);
190-
jobScheduler.ReadToFlush(midJob);
190+
jobScheduler.Flush(midJob);
191191
}
192192

193-
jobScheduler.ReadToFlush(targetJob);
194-
jobScheduler.ReadToFlush(sourceJob);
193+
jobScheduler.Flush(targetJob);
194+
jobScheduler.Flush(sourceJob);
195195
jobScheduler.Wait(targetJob);
196196
}
197197

@@ -206,11 +206,11 @@ public void ManyMoreDependencyThanNodeLimitSize()
206206
var midJob = jobScheduler.Schedule();
207207
targetJob.DependsOn(midJob, jobScheduler);
208208
midJob.DependsOn(sourceJob, jobScheduler);
209-
jobScheduler.ReadToFlush(midJob);
209+
jobScheduler.Flush(midJob);
210210
}
211211

212-
jobScheduler.ReadToFlush(targetJob);
213-
jobScheduler.ReadToFlush(sourceJob);
212+
jobScheduler.Flush(targetJob);
213+
jobScheduler.Flush(sourceJob);
214214
jobScheduler.Wait(targetJob);
215215
}
216216
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
using NUnit.Framework;
2+
using Schedulers.Utils;
3+
4+
namespace Schedulers.Test;
5+
[TestFixture]
6+
public class ParallelForTests : IDisposable
7+
{
8+
public ParallelForTests()
9+
{
10+
ParallelForJobCommon.SetScheduler(new());
11+
}
12+
13+
public void Dispose()
14+
{
15+
ParallelForJobCommon.DisposeScheduler();
16+
}
17+
18+
[Test]
19+
public void TestParallelFor()
20+
{
21+
var results = new int[100];
22+
ParallelForJob.Create(results.Length, i =>
23+
{
24+
results[i] = i * i;
25+
}).Flush().Wait();
26+
27+
28+
for (var i = 0; i < 100; i++)
29+
{
30+
Assert.That(results[i], Is.EqualTo(i * i));
31+
}
32+
}
33+
34+
[Test]
35+
public void TestParallelForEach()
36+
{
37+
var dict = Enumerable.Range(1, 50).ToDictionary(i => i, i => i * 2);
38+
var sum = 0;
39+
40+
ParallelForEachJob.Create(dict, kvp =>
41+
{
42+
Console.WriteLine($"Processing key: {kvp.Key}, value: {kvp.Value}");
43+
Interlocked.Add(ref sum, kvp.Value);
44+
}).Flush().Wait();
45+
46+
47+
Assert.That(sum, Is.EqualTo(dict.Values.Sum()));
48+
}
49+
}

JobScheduler/JobHandle.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ public void DependsOn(JobHandle target, JobScheduler scheduler)
275275
currentLastNode.UnfinishedJobs--;
276276
currentLastNode.DependsOn(newNode, scheduler);
277277
// The new node should also be ready to be executed.
278-
scheduler.ReadToFlush(newNode);
278+
scheduler.Flush(newNode);
279279
return;
280280
}
281281

JobScheduler/JobScheduler.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ private void FlushDirect(JobHandle job)
103103
/// If it has unfinished children it will not be flushed, and instead will be left for the children to flush.
104104
/// </summary>
105105
/// <param name="job">The <see cref="JobHandle"/>.</param>
106-
public void ReadToFlush(JobHandle job)
106+
public void Flush(JobHandle job)
107107
{
108108
// This is to prevent the job from being flushed multiple times.
109109
var unfinishedJobs = Interlocked.Decrement(ref job.UnfinishedJobs);

JobScheduler/Utils/IParallelJobProducer.cs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@ public interface IParallelJobProducer
1717
[Pure]
1818
public void RunVectorized(int start, int end)
1919
{
20-
throw new NotImplementedException();
20+
for (var i = start; i < end; i++)
21+
{
22+
RunSingle(i);
23+
}
2124
}
2225

2326
/// <summary>
@@ -65,7 +68,6 @@ public ParallelJobProducer(int to, T producer, JobScheduler scheduler, int loopS
6568
_onlySingle = onlySingle;
6669
_loopSize = loopSize;
6770
_selfHandle = _scheduler.Schedule(this);
68-
_scheduler.ReadToFlush(_selfHandle);
6971
}
7072

7173
//Only used to spawn sub-jobs
@@ -79,7 +81,7 @@ private ParallelJobProducer(T producer, JobScheduler scheduler, JobHandle parent
7981
_loopSize = loopSize;
8082
_onlySingle = onlySingle;
8183
_selfHandle = _scheduler.Schedule(this, parent);
82-
_scheduler.ReadToFlush(_selfHandle);
84+
_scheduler.Flush(_selfHandle);
8385
}
8486

8587
/// <summary>

JobScheduler/Utils/JobSchedulerExtensions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ public static void Flush(this JobScheduler jobScheduler, Span<JobHandle> jobs)
1111
{
1212
foreach (ref var job in jobs)
1313
{
14-
jobScheduler.ReadToFlush(job);
14+
jobScheduler.Flush(job);
1515
}
1616
}
1717

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
using System.Collections.Concurrent;
2+
3+
namespace Schedulers.Utils;
4+
5+
/// <summary>
6+
/// A utility class for running actions in parallel over a collection of items.
7+
/// </summary>
8+
public static class ParallelForEachJob
9+
{
10+
private readonly struct PartitionedJobProducer<T>(IList<IEnumerator<T>> partitions, Action<T> action) : IParallelJobProducer
11+
{
12+
public void RunSingle(int index)
13+
{
14+
var enumerator = partitions[index];
15+
while (enumerator.MoveNext())
16+
{
17+
action(enumerator.Current);
18+
}
19+
}
20+
}
21+
22+
/// <summary>
23+
/// Creates a ParallelJobProducer that executes the provided action on every item of the given collection.
24+
/// </summary>
25+
/// <param name="source">The data to run the action on</param>
26+
/// <param name="action">What to run</param>
27+
/// <param name="scheduler">The scheduler to use</param>
28+
/// <returns>A struct that wraps the handle</returns>
29+
public static ParallelForHandle Create<T>(IEnumerable<T> source, Action<T> action, JobScheduler? scheduler = null)
30+
{
31+
scheduler ??= ParallelForJobCommon.GlobalScheduler ?? throw new ArgumentNullException(nameof(scheduler), "JobScheduler cannot be null. Please initialize it before using ParallelForJob.");
32+
var partitioner = Partitioner.Create(source);
33+
var partitions = partitioner.GetPartitions(Environment.ProcessorCount);
34+
var producer = new ParallelJobProducer<PartitionedJobProducer<T>>(partitions.Count, new(partitions, action), scheduler);
35+
return new(producer.GetHandle(), scheduler);
36+
}
37+
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
namespace Schedulers.Utils;
2+
3+
/// <summary>
4+
/// A utility class for creating parallel jobs that execute an action for each index in a range.
5+
/// </summary>
6+
public static class ParallelForJob
7+
{
8+
private readonly struct JobProducer(Action<int> action) : IParallelJobProducer
9+
{
10+
public void RunSingle(int index)
11+
{
12+
action(index);
13+
}
14+
}
15+
16+
/// <summary>
17+
/// Creates a ParallelJobProducer that executes the provided action in parallel for the specified count.
18+
/// </summary>
19+
/// <param name="count">Executes from 0 to this</param>
20+
/// <param name="action">What to run</param>
21+
/// <param name="scheduler">The scheduler to use</param>
22+
/// <returns>A struct that wraps the handle</returns>
23+
public static ParallelForHandle Create(int count, Action<int> action, JobScheduler? scheduler = null)
24+
{
25+
scheduler ??= ParallelForJobCommon.GlobalScheduler ?? throw new ArgumentNullException(nameof(scheduler), "JobScheduler cannot be null. Please initialize it before using ParallelForJob.");
26+
var producer = new ParallelJobProducer<JobProducer>(count, new(action), scheduler);
27+
return new(producer.GetHandle(), scheduler);
28+
}
29+
}

0 commit comments

Comments
 (0)