Skip to content

Commit c3143d2

Browse files
ThreadPool: get task status from the handler's return value in EnqueueAsyncWork
1 parent c0a31d2 commit c3143d2

File tree

3 files changed

+46
-4
lines changed

3 files changed

+46
-4
lines changed

Common/interface/AsyncInitializer.hpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,13 @@ class AsyncInitializer
106106
{
107107
VERIFY_EXPR(pThreadPool != nullptr);
108108
return std::unique_ptr<AsyncInitializer>{
109-
new AsyncInitializer{EnqueueAsyncWork(pThreadPool, ppPrerequisites, NumPrerequisites, std::forward<HanlderType>(Handler))},
109+
new AsyncInitializer{
110+
EnqueueAsyncWork(pThreadPool, ppPrerequisites, NumPrerequisites,
111+
[Handler = std::forward<HanlderType>(Handler)](Uint32 ThreadId) mutable {
112+
Handler(ThreadId);
113+
return ASYNC_TASK_STATUS_COMPLETE;
114+
}),
115+
},
110116
};
111117
}
112118

Common/interface/ThreadPool.hpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,8 @@ class AsyncTaskBase : public ObjectBase<IAsyncTask>
9595
break;
9696

9797
case ASYNC_TASK_STATUS_NOT_STARTED:
98-
DEV_ERROR("NOT_STARTED is only allowed as initial task status.");
98+
DEV_CHECK_ERR(m_TaskStatus == ASYNC_TASK_STATUS_RUNNING,
99+
"A task should only be moved to NOT_STARTED state from RUNNING state.");
99100
break;
100101

101102
case ASYNC_TASK_STATUS_RUNNING:
@@ -184,8 +185,8 @@ RefCntAutoPtr<IAsyncTask> EnqueueAsyncWork(IThreadPool* pThreadPool,
184185

185186
virtual void DILIGENT_CALL_TYPE Run(Uint32 ThreadId) override final
186187
{
187-
m_Handler(ThreadId);
188-
SetStatus(ASYNC_TASK_STATUS_COMPLETE);
188+
ASYNC_TASK_STATUS TaskStatus = m_Handler(ThreadId);
189+
SetStatus(TaskStatus);
189190
}
190191

191192
private:

Tests/DiligentCoreTest/src/Common/ThreadPoolTest.cpp

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ TEST(Common_ThreadPool, EnqueueTask)
7575
f = std::sin(f + 1.f);
7676
Results[i].store(f);
7777
WorkComplete[i].store(true);
78+
79+
return ASYNC_TASK_STATUS_COMPLETE;
7880
});
7981
}
8082

@@ -133,6 +135,8 @@ TEST(Common_ThreadPool, ProcessTask)
133135
f = std::sin(f + 1.f);
134136
Results[i].store(f);
135137
WorkComplete[i].store(true);
138+
139+
return ASYNC_TASK_STATUS_COMPLETE;
136140
});
137141
}
138142

@@ -326,6 +330,7 @@ TEST(Common_ThreadPool, Priorities)
326330
[&CompletionOrder, i](Uint32 ThreadId) //
327331
{
328332
CompletionOrder.push_back(i);
333+
return ASYNC_TASK_STATUS_COMPLETE;
329334
});
330335
}
331336

@@ -396,6 +401,8 @@ TEST(Common_ThreadPool, Prerequisites)
396401
}
397402
if (CorrectOrder)
398403
NumTasksCorrectlyOrdered.fetch_add(1);
404+
405+
return ASYNC_TASK_STATUS_COMPLETE;
399406
},
400407
static_cast<float>(task) // Inverse priority so that the thread pool fixes it
401408
);
@@ -407,4 +414,32 @@ TEST(Common_ThreadPool, Prerequisites)
407414
}
408415
}
409416

417+
418+
TEST(Common_ThreadPool, ReRunTasks)
419+
{
420+
auto pThreadPool = CreateThreadPool(ThreadPoolCreateInfo{4});
421+
ASSERT_NE(pThreadPool, nullptr);
422+
423+
constexpr Uint32 NumTasks = 32;
424+
std::vector<std::atomic<int>> ReRunCounters(NumTasks);
425+
426+
for (int i = 0; i < ReRunCounters.size(); ++i)
427+
ReRunCounters[i] = 32 + i;
428+
429+
for (Uint32 task = 0; task < NumTasks; ++task)
430+
{
431+
EnqueueAsyncWork(
432+
pThreadPool,
433+
[task, &ReRunCounters](Uint32 ThreadId) //
434+
{
435+
int ReRunCounter = ReRunCounters[task].fetch_add(-1) - 1;
436+
return ReRunCounter > 0 ? ASYNC_TASK_STATUS_NOT_STARTED : ASYNC_TASK_STATUS_COMPLETE;
437+
});
438+
}
439+
440+
pThreadPool->WaitForAllTasks();
441+
for (size_t i = 0; i < ReRunCounters.size(); ++i)
442+
EXPECT_EQ(ReRunCounters[i], 0) << i;
443+
}
444+
410445
} // namespace

0 commit comments

Comments
 (0)