Skip to content

Commit 06c82af

Browse files
Fea, Windows平台的TaskRunnerDispatch改用系统线程池API实现,提高多模块线程共享能力
1 parent 9054d83 commit 06c82af

File tree

5 files changed

+123
-145
lines changed

5 files changed

+123
-145
lines changed

UnitTest/TaskRunnerUnitTest.cpp

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ namespace TaskRunnerUnitTest
260260

261261
WaitForSingleObject(_hEvent, 5000);
262262

263-
Assert::IsTrue(nCount == 5);
263+
Assert::AreEqual(nCount, 5);
264264
_pTimer = nullptr;
265265
_pTaskRunner = nullptr;
266266

@@ -362,7 +362,7 @@ namespace TaskRunnerUnitTest
362362
[&_uWaitResult](DWORD _uWaitResultT)
363363
{
364364
_uWaitResult = _uWaitResultT;
365-
return true;
365+
return false;
366366
});
367367

368368
Assert::IsTrue(((TaskEntry*)_pWait.Get())->Wait(600ul));
@@ -374,10 +374,11 @@ namespace TaskRunnerUnitTest
374374
[&_uWaitResult](DWORD _uWaitResultT)
375375
{
376376
_uWaitResult = _uWaitResultT;
377-
return true;
377+
return false;
378378
});
379379
SetEvent(_hEvent);
380380
Assert::IsTrue(((TaskEntry*)_pWait.Get())->Wait(100ul));
381+
Assert::AreEqual(DWORD(_uWaitResult), DWORD(WAIT_OBJECT_0));
381382
// CloseHandle(_hEvent);
382383
}
383384
};
@@ -827,7 +828,7 @@ namespace TaskRunnerUnitTest
827828
[&_uWaitResult](DWORD _uWaitResultT)
828829
{
829830
_uWaitResult = _uWaitResultT;
830-
return true;
831+
return false;
831832
});
832833

833834
Assert::IsTrue(((TaskEntry*)_pWait.Get())->Wait(600ul));
@@ -839,7 +840,7 @@ namespace TaskRunnerUnitTest
839840
[&_uWaitResult](DWORD _uWaitResultT)
840841
{
841842
_uWaitResult = _uWaitResultT;
842-
return true;
843+
return false;
843844
});
844845
SetEvent(_hEvent);
845846
Assert::IsTrue(((TaskEntry*)_pWait.Get())->Wait(100ul));

include/YY/Base/Threading/TaskRunner.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ namespace YY
8787
return HasFlags(fStyle, TaskEntryStyle::Canceled);
8888
}
8989

90-
void __YYAPI Cancel()
90+
virtual void __YYAPI Cancel()
9191
{
9292
YY::Sync::BitSet((int32_t*)&fStyle, 2);
9393
}
@@ -99,6 +99,8 @@ namespace YY
9999
{
100100
std::function<bool(void)> pfnTimerCallback;
101101

102+
HANDLE hThreadPoolTimer = nullptr;
103+
102104
// 任务到期时间
103105
TickCount uExpire;
104106

@@ -108,10 +110,13 @@ namespace YY
108110
Timer* pNext = nullptr;
109111

110112
HRESULT __YYAPI RunTask() override;
113+
114+
void __YYAPI Cancel() override;
111115
};
112116

113117
struct Wait : public TaskEntry
114118
{
119+
HANDLE hThreadPoolWait = nullptr;
115120
HANDLE hHandle = nullptr;
116121
TickCount uTimeOut;
117122
DWORD uWaitResult = WAIT_FAILED;
@@ -121,6 +126,8 @@ namespace YY
121126
Wait* pNext = nullptr;
122127

123128
HRESULT __YYAPI RunTask() override;
129+
130+
void __YYAPI Cancel() override;
124131
};
125132

126133
#if defined(_WIN32)

src/YY/Base/Threading/TaskRunner.cpp

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,18 @@ namespace YY
7979
}
8080
}
8181

82+
void __YYAPI Timer::Cancel()
83+
{
84+
TaskEntry::Cancel();
85+
86+
if (HANDLE _hThreadPoolTimer = YY::ExchangePoint(&hThreadPoolTimer, nullptr))
87+
{
88+
DeleteTimerQueueTimer(NULL, _hThreadPoolTimer, NULL);
89+
}
90+
91+
return;
92+
}
93+
8294
Threading::TaskRunner::TaskRunner()
8395
: uTaskRunnerId(GenerateNewTaskRunnerId())
8496
{
@@ -465,6 +477,18 @@ namespace YY
465477
return HRESULT_From_LSTATUS(ERROR_CANCELLED);
466478
}
467479
}
480+
481+
void __YYAPI Wait::Cancel()
482+
{
483+
TaskEntry::Cancel();
484+
485+
if (HANDLE _hThreadPoolWait = YY::ExchangePoint(&hThreadPoolWait, nullptr))
486+
{
487+
UnregisterWaitEx(_hThreadPoolWait, NULL);
488+
}
489+
490+
return;
491+
}
468492
} // namespace Threading
469493
}
470494
} // namespace YY

src/YY/Base/Threading/TaskRunnerDispatchImpl.cpp

Lines changed: 79 additions & 139 deletions
Original file line numberDiff line numberDiff line change
@@ -168,32 +168,12 @@ namespace YY
168168

169169
#ifdef _WIN32
170170
/// <summary>
171-
/// 用于Windows XP以及更高版本的Task调度器。
172-
/// 早期版本的系统 IoCompletionPort 无法等待,始终处于有信号状态。因此需要这个特殊的调度器。
173-
/// 此调度器普通状态下会启动二个线程:
174-
/// 其中一个线程专门用来处理 IoCompletionPort。
175-
/// 另外一个线程用于处理TimerManger与WaitManger。
171+
/// 用于Windows Task调度器。主要服务于SequencedTaskRunner。
176172
/// </summary>
177-
class TaskRunnerDispatchForWindowsXPOrLater
178-
: public TaskRunnerDispatchBaseImpl<TaskRunnerDispatchForWindowsXPOrLater>
173+
class TaskRunnerDispatchForWindows : public TaskRunnerDispatch
179174
{
180-
private:
181-
HANDLE hIoCompletionPort = NULL;
182-
volatile int32_t nIoCompletionPortTaskRef = 0ul;
183-
184175
public:
185-
TaskRunnerDispatchForWindowsXPOrLater()
186-
: TaskRunnerDispatchBaseImpl(CreateEventW(nullptr, FALSE, FALSE, nullptr))
187-
, hIoCompletionPort(CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 1u))
188-
{
189-
}
190-
191-
~TaskRunnerDispatchForWindowsXPOrLater()
192-
{
193-
CloseHandle(hIoCompletionPort);
194-
YY::Exchange(&nDispatchTaskRef, 0);
195-
SetEvent(oDefaultWaitBlock.hTaskRunnerServerHandle);
196-
}
176+
constexpr TaskRunnerDispatchForWindows() = default;
197177

198178
bool __YYAPI BindIO(_In_ HANDLE _hHandle) const noexcept override
199179
{
@@ -202,150 +182,110 @@ namespace YY
202182
return false;
203183
}
204184

205-
if (CreateIoCompletionPort(_hHandle, hIoCompletionPort, 0, 1) != hIoCompletionPort)
206-
{
207-
return false;
208-
}
185+
// 使用 BindIoCompletionCallback以获得更加友好的线程池调度。
186+
// XP的线程池接口直接再Wait函数中执行回调,刚好方便YY.Base将任务调度到对应的TaskRunner。
187+
const auto _bRet = BindIoCompletionCallback(
188+
_hHandle,
189+
[](DWORD _uErrorCode, DWORD _cbNumberOfBytesTransfered, LPOVERLAPPED _pOverlapped)
190+
{
191+
auto _pDispatchTask = RefPtr<IoTaskEntry>::FromPtr(static_cast<IoTaskEntry*>(_pOverlapped));
192+
if (!_pDispatchTask)
193+
return;
209194

210-
return true;
195+
// 错误代码如果已经设置,那么可能调用者线程已经事先处理了。
196+
if (_pDispatchTask->OnComplete(_uErrorCode))
197+
{
198+
DispatchTask(std::move(_pDispatchTask));
199+
}
200+
},
201+
0);
202+
203+
return _bRet;
211204
}
212205

213206
void __YYAPI StartIo() noexcept override
214207
{
215-
const auto _nNewIoCompletionPortTaskRef = YY::Sync::Increment(&nIoCompletionPortTaskRef);
216-
if (_nNewIoCompletionPortTaskRef == 1)
217-
{
218-
auto _bRet = TrySubmitThreadpoolCallback(
219-
[](_Inout_ PTP_CALLBACK_INSTANCE _pInstance,
220-
_In_ PVOID _pContext)
221-
{
222-
auto _pTask = reinterpret_cast<TaskRunnerDispatchForWindowsXPOrLater*>(_pContext);
223-
SetThreadDescription(GetCurrentThread(), L"IOCP调度线程");
224-
_pTask->ExecuteIoCompletionPort();
225-
SetThreadDescription(GetCurrentThread(), L"");
226-
},
227-
this,
228-
nullptr);
229-
230-
if (!_bRet)
231-
{
232-
throw Exception(HRESULT_From_LSTATUS(GetLastError()));
233-
}
234-
}
235208
}
236209

237-
void __YYAPI Weakup(int32_t _nNewDispatchTaskRef, uint32_t _uNewFlags = UINT32_MAX)
210+
void __YYAPI SetTimerInternal(_In_ RefPtr<Timer> _pTimer) noexcept override
238211
{
239-
if (_nNewDispatchTaskRef < 1)
212+
if (!_pTimer)
213+
return;
214+
215+
if (HANDLE _hThreadPoolTimer = YY::ExchangePoint(&_pTimer->hThreadPoolTimer, nullptr))
240216
{
241-
// 当前一部分Task因为时序发生抢跑时,短时间里引用计数可能会小于 1,甚至小于 0
242-
// 这时计数恢复时,任务其实已经被执行了。这时唤醒时我们不用做任何事情。
217+
DeleteTimerQueueTimer(NULL, _hThreadPoolTimer, INVALID_HANDLE_VALUE);
243218
}
244-
else if (_nNewDispatchTaskRef == 1)
219+
220+
const auto _uCurrent = TickCount::GetNow();
221+
const auto _iDueTime = (_pTimer->uExpire - _uCurrent).GetTotalMilliseconds();
222+
if (_iDueTime <= 0)
223+
{
224+
_pTimer->uExpire = _uCurrent;
225+
DispatchTask(std::move(_pTimer));
226+
}
227+
else
245228
{
246-
// 之前等待任务计数是0,这说明它没有线程,我们需要给它安排一个线程。
247-
auto _bRet = TrySubmitThreadpoolCallback(
248-
[](_Inout_ PTP_CALLBACK_INSTANCE _pInstance,
249-
_In_ PVOID _pContext)
229+
// 我们使用 CreateTimerQueueTimer这是因为它允许我们在线程池线程中执行回调,没有额外的上下文切换开销。
230+
auto _bRet = CreateTimerQueueTimer(
231+
&_pTimer->hThreadPoolTimer,
232+
NULL,
233+
[](PVOID _pParameter, BOOLEAN _bTimerFired)
250234
{
251-
auto _pTask = reinterpret_cast<TaskRunnerDispatchForWindowsXPOrLater*>(_pContext);
252-
SetThreadDescription(GetCurrentThread(), L"Timer/Wait调度线程");
253-
_pTask->ExecuteTaskRunner();
254-
SetThreadDescription(GetCurrentThread(), L"");
235+
auto _pTimerTask = RefPtr<Timer>::FromPtr(static_cast<Timer*>(_pParameter));
236+
if (!_pTimerTask)
237+
return;
238+
239+
DispatchTask(std::move(_pTimerTask));
255240
},
256-
this,
257-
nullptr);
241+
_pTimer.Get(),
242+
_iDueTime,
243+
0,
244+
WT_EXECUTEINTIMERTHREAD | WT_EXECUTEONLYONCE);
258245

259246
if (!_bRet)
260247
{
261-
throw Exception(HRESULT_From_LSTATUS(GetLastError()));
262-
}
263-
}
264-
else if (_uNewFlags < WakeupRefOnceRaw * 2)
265-
{
266-
// 之前的WakeupRef计数为 0,所以我们需要重新唤醒 Dispatch 线程。
267-
if (!SetEvent(oDefaultWaitBlock.hTaskRunnerServerHandle))
268-
{
269-
throw Exception(HRESULT_From_LSTATUS(GetLastError()));
248+
return;
270249
}
250+
251+
_pTimer.Get()->AddRef();
271252
}
272253
}
273254

274-
private:
275-
void __YYAPI ExecuteIoCompletionPort() noexcept
255+
HRESULT __YYAPI SetWaitInternal(_In_ RefPtr<Wait> _pWait) noexcept override
276256
{
277-
uint32_t _cTaskProcessed = 0;
278-
OVERLAPPED_ENTRY _oCompletionPortEntries[16];
279-
ULONG _uNumEntriesRemoved;
280-
for(;;)
257+
if (_pWait == nullptr || _pWait->hHandle == NULL)
258+
return E_INVALIDARG;
259+
260+
if (HANDLE _hThreadPoolWait = YY::ExchangePoint(&_pWait->hThreadPoolWait, nullptr))
281261
{
282-
if (YY::Sync::Subtract(&nIoCompletionPortTaskRef, _cTaskProcessed) <= 0)
283-
return;
262+
UnregisterWaitEx(_hThreadPoolWait, INVALID_HANDLE_VALUE);
263+
}
284264

285-
_cTaskProcessed = 0;
286-
auto _bRet = GetQueuedCompletionStatusEx(hIoCompletionPort, _oCompletionPortEntries, std::size(_oCompletionPortEntries), &_uNumEntriesRemoved, INFINITE, FALSE);
287-
if (!_bRet)
265+
// 我们使用 RegisterWaitForSingleObject 这是因为它允许我们在线程池线程中执行回调,没有额外的上下文切换开销。
266+
auto _bRet = RegisterWaitForSingleObject(
267+
&_pWait->hThreadPoolWait,
268+
_pWait->hHandle,
269+
[](PVOID _pParameter, BOOLEAN _bTimeout)
288270
{
289-
const auto _lStatus = GetLastError();
290-
if (_lStatus == WAIT_TIMEOUT || _lStatus == WAIT_IO_COMPLETION)
291-
{
292-
// 非意外错误
293-
continue;
294-
}
295-
else
296-
{
297-
// ERROR_ABANDONED_WAIT_0 : 这句柄关闭,线程应该退出了?所以我们也可以退出了?
271+
auto _pWaitTask = RefPtr<Wait>::FromPtr(static_cast<Wait*>(_pParameter));
272+
if (!_pWaitTask)
298273
return;
299-
}
300-
}
301274

302-
for (ULONG _uIndex = 0; _uIndex != _uNumEntriesRemoved; ++_uIndex)
303-
{
304-
auto _pDispatchTask = RefPtr<IoTaskEntry>::FromPtr(static_cast<IoTaskEntry*>(_oCompletionPortEntries[_uIndex].lpOverlapped));
305-
if (!_pDispatchTask)
306-
continue;
307-
308-
// 错误代码如果已经设置,那么可能调用者线程已经事先处理了。
309-
if (_pDispatchTask->OnComplete(DosErrorFormNtStatus(long(_pDispatchTask->Internal))))
310-
{
311-
DispatchTask(std::move(_pDispatchTask));
312-
}
313-
++_cTaskProcessed;
314-
}
315-
}
316-
}
275+
_pWaitTask->uWaitResult = _bTimeout ? WAIT_TIMEOUT : WAIT_OBJECT_0;
276+
DispatchTask(std::move(_pWaitTask));
277+
},
278+
_pWait.Get(),
279+
(_pWait->uTimeOut - TickCount::GetNow()).GetTotalMilliseconds(),
280+
WT_EXECUTEINWAITTHREAD | WT_EXECUTEONLYONCE);
317281

318-
void __YYAPI ExecuteTaskRunner()
319-
{
320-
size_t _cTaskProcessed = 0;
321-
for (;;)
282+
if (!_bRet)
322283
{
323-
auto _oCurrent = TickCount::GetNow();
324-
_cTaskProcessed += ProcessingTimerTasks(_oCurrent);
325-
_cTaskProcessed += ProcessingPendingTaskQueue();
326-
if (YY::Sync::Subtract(&nDispatchTaskRef, int32_t(_cTaskProcessed)) <= 0)
327-
return;
328-
329-
_cTaskProcessed = 0;
330-
331-
const auto _uTimerWakeupTickCount = GetMinimumWakeupTickCount();
332-
const auto _uWaitTaskWakeupTickCount = oDefaultWaitBlock.GetWakeupTickCountNolock(_oCurrent);
333-
auto _uWakeupTickCount = (std::min)(_uTimerWakeupTickCount, _uWaitTaskWakeupTickCount);
334-
const auto uWaitResult = WaitForMultipleObjectsEx(oDefaultWaitBlock.cWaitHandle, oDefaultWaitBlock.hWaitHandles, FALSE, GetWaitTimeSpan(_uWakeupTickCount), FALSE);
335-
if (uWaitResult == WAIT_OBJECT_0)
336-
{
337-
continue;
338-
}
339-
else if (uWaitResult == WAIT_TIMEOUT)
340-
{
341-
if (_uTimerWakeupTickCount < _uWaitTaskWakeupTickCount)
342-
{
343-
continue;
344-
}
345-
}
346-
347-
_cTaskProcessed += ProcessingWaitTasks(oDefaultWaitBlock, uWaitResult, oDefaultWaitBlock.cWaitHandle);
284+
return __HRESULT_FROM_WIN32(GetLastError());
348285
}
286+
287+
_pWait.Get()->AddRef();
288+
return S_OK;
349289
}
350290
};
351291
#endif
@@ -356,7 +296,7 @@ namespace YY
356296
if (!s_pCurrentTaskRunnerDispatch)
357297
{
358298
#ifdef _WIN32
359-
static TaskRunnerDispatchForWindowsXPOrLater s_TaskRunnerDispatch;
299+
static TaskRunnerDispatchForWindows s_TaskRunnerDispatch;
360300
s_pCurrentTaskRunnerDispatch = &s_TaskRunnerDispatch;
361301
#else
362302
#error "其他系统尚未适配"

0 commit comments

Comments
 (0)