The task queue has a narrow race: if a future callback is evaluated just when the task queue is terminated it may get skipped from termination processing. The result is that a call for the future won't be canceled immediately when the task queue terminates. Instead it will be canceled when its due time occurs.

There is a second possible race as well. If a new future callback is scheduled and the schedule code is interleaved with a terminate call on another thread, the same thing can occur.

This change fixes both cases. It also fixes up some incorrect macros in the test projects so they can build again.
This commit is contained in:
Brian Pepin 2024-04-22 13:26:43 -07:00 коммит произвёл GitHub
Родитель 470d97cffe
Коммит fda5d4ecdb
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: B5690EEEBB952194
12 изменённых файлов: 239 добавлений и 44 удалений

Просмотреть файл

@ -19,9 +19,9 @@
<WarningLevel>Level3</WarningLevel>
<PreprocessorDefinitions>USING_TAEF;DASHBOARD_PRINCIPLE_GROUP;_NO_ASYNCRTIMP;INLINE_TEST_METHOD_MARKUP;WINAPI_FAMILY=WINAPI_FAMILY_DESKTOP_APP;UNIT_TEST_SERVICES;HC_NOZLIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<CompileAsWinRT>true</CompileAsWinRT>
<AdditionalUsingDirectories>$(WindowsSdkDir_10)UnionMetadata;$(VCToolsInstallDir)lib\x86\store\references;%(AdditionalUsingDirectories)</AdditionalUsingDirectories>
<AdditionalUsingDirectories>$(WindowsSDK_UnionMetadataPath);$(VCToolsInstallDir)lib\x86\store\references;%(AdditionalUsingDirectories)</AdditionalUsingDirectories>
<AdditionalOptions>/GS %(AdditionalOptions)</AdditionalOptions>
<AdditionalIncludeDirectories>c:\Program Files (x86)\Windows Kits\10\Testing\Development\inc;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<AdditionalIncludeDirectories>c:\Program Files (x86)\Windows Kits\10\Testing\Development\inc;$(HCSourceDir)\Task;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<RemoveUnreferencedCodeData Condition="'$(Configuration)|$(Platform)'=='Release|x64'">false</RemoveUnreferencedCodeData>
</ClCompile>
<Link>

Просмотреть файл

@ -19,8 +19,8 @@
<ClCompile>
<WarningLevel>Level3</WarningLevel>
<PreprocessorDefinitions>DASHBOARD_PRINCIPLE_GROUP;_NO_ASYNCRTIMP;UNITTEST_TE;INLINE_TEST_METHOD_MARKUP;WINAPI_FAMILY=WINAPI_FAMILY_DESKTOP_APP;UNIT_TEST_SERVICES;HC_NOZLIB;%(PreprocessorDefinitions)</PreprocessorDefinitions>
<AdditionalUsingDirectories>C:\Program Files (x86)\Windows Kits\10\UnionMetadata;C:\Program Files (x86)\Microsoft Visual Studio 14.0\VC\lib\store\references;C:\Program Files (x86)\Microsoft Visual Studio\2019\Enterprise\Common7\IDE\VC\vcpackages;%(AdditionalUsingDirectories)</AdditionalUsingDirectories>
<AdditionalIncludeDirectories>$(VCInstallDir)UnitTest\include;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<AdditionalUsingDirectories>$(WindowsSDK_UnionMetadataPath);$(VCIDEInstallDir)\vcpackages;%(AdditionalUsingDirectories)</AdditionalUsingDirectories>
<AdditionalIncludeDirectories>$(VCInstallDir)UnitTest\include;$(HCSourceDir)\Task;%(AdditionalIncludeDirectories)</AdditionalIncludeDirectories>
<CompileAsWinRT>true</CompileAsWinRT>
</ClCompile>
<Link>

Просмотреть файл

@ -7,7 +7,6 @@
#pragma once
#include <stdint.h>
#include <httpClient/pal.h>
extern "C"
{

Просмотреть файл

@ -130,6 +130,15 @@ HC_DECLARE_TRACE_AREA(WEBSOCKET);
#define ASYNC_LIB_TRACE(result, message) \
HC_TRACE_ERROR_HR(HTTPCLIENT, result, message); \
// Handle tracking for TaskQueue
#define SYSTEM_HANDLE_DEFINE_HELPERS(a, b)
#define SystemHandleAssert(h)
#define SystemHandleMarkCreated(h)
#define SystemHandleMarkDestroyed(h)
// We always use unique handles
#define USE_UNIQUE_HANDLES() (true)
#define CATCH_RETURN() CATCH_RETURN_IMPL(__FILE__, __LINE__)
#define CATCH_RETURN_IMPL(file, line) \

Просмотреть файл

@ -55,9 +55,9 @@ struct AsyncState
const void* identity = nullptr;
const char* identityName = nullptr;
void* operator new(size_t size, size_t additional)
void* operator new(size_t size, size_t additional, const std::nothrow_t& tag)
{
return ::operator new(size + additional);
return ::operator new(size + additional, tag);
}
void operator delete(void* ptr)
@ -388,7 +388,7 @@ static void RevertProviderCleanup(_Inout_ AsyncStateRef& state, _In_ ProviderCle
static HRESULT AllocStateNoCompletion(_Inout_ XAsyncBlock* asyncBlock, _Inout_ AsyncBlockInternal* internal, _In_ size_t contextSize)
{
AsyncStateRef state;
state.Attach(new (contextSize) AsyncState);
state.Attach(new (contextSize, std::nothrow) AsyncState);
RETURN_IF_NULL_ALLOC(state);
if (contextSize != 0)
@ -398,14 +398,23 @@ static HRESULT AllocStateNoCompletion(_Inout_ XAsyncBlock* asyncBlock, _Inout_ A
state->providerData.context = (state.Get() + 1);
}
// Addref the task queue. We duplicate with "Reference" to prevent spamming
// the handle tracker with each async call (and to prevent a needless allocation of
// the task queue handle wrapper).
XTaskQueueHandle queue = asyncBlock->queue;
if (queue == nullptr)
{
RETURN_HR_IF(E_NO_TASK_QUEUE, XTaskQueueGetCurrentProcessTaskQueue(&state->queue) == false);
RETURN_HR_IF(E_NO_TASK_QUEUE, XTaskQueueGetCurrentProcessTaskQueueWithOptions(
XTaskQueueDuplicateOptions::Reference,
&state->queue) == false);
}
else
{
RETURN_IF_FAILED(XTaskQueueDuplicateHandle(queue, &state->queue));
RETURN_IF_FAILED(XTaskQueueDuplicateHandleWithOptions(
queue,
XTaskQueueDuplicateOptions::Reference,
&state->queue));
}
state->userAsyncBlock = asyncBlock;

Просмотреть файл

@ -4,6 +4,7 @@
#include "referenced_ptr.h"
#include "TaskQueueP.h"
#include "TaskQueueImpl.h"
#include "XTaskQueuePriv.h"
//
// Note: ApiDiag is only used for reference count validation during
@ -362,6 +363,13 @@ HRESULT __stdcall TaskQueuePortImpl::QueueItem(
// QueueEntry now owns the ref.
portContextHolder.release();
// If we raced with termination, cancel the item we just added
if (portContext->GetStatus() != TaskQueuePortStatus::Active)
{
CancelPendingEntries(portContext, true);
}
return S_OK;
}
@ -612,6 +620,7 @@ bool TaskQueuePortImpl::DrainOneItem(
entry.callback(entry.callbackContext, IsCallCanceled(entry));
m_processingCallback--;
m_processingCallbackCv.notify_all();
#ifdef _WIN32
// If this entry has a wait registration, it needs
@ -634,6 +643,7 @@ bool TaskQueuePortImpl::DrainOneItem(
else
{
m_processingCallback--;
m_processingCallbackCv.notify_all();
}
if (m_queueList->empty())
@ -747,6 +757,26 @@ bool __stdcall TaskQueuePortImpl::IsEmpty()
return empty;
}
void __stdcall TaskQueuePortImpl::WaitForUnwind()
{
std::mutex mutex;
std::unique_lock<std::mutex> lock(mutex);
while(m_processingCallback.load() != 0)
{
// wait for 10 ms. We do not modify m_processingCallback under
// the protection of a mutex because we don't want the hit of
// taking a lock. Therefore, we can't wait forever for the
// cv here. We could miss it due to a race. This API is only
// called during task queue termination and therefore some polling
// is OK.
std::chrono::milliseconds ms{10};
auto when = std::chrono::steady_clock::time_point(ms);
m_processingCallbackCv.wait_until(lock, when);
}
}
HRESULT __stdcall TaskQueuePortImpl::SuspendTermination(
_In_ ITaskQueuePortContext* portContext
)
@ -979,7 +1009,19 @@ bool TaskQueuePortImpl::ScheduleNextPendingCallback(
}
else if (!hasNextItem || nextItem.enqueueTime > entry.enqueueTime)
{
// remove_if works by removing items from the list and
// re-adding them if this callback returns false. If we
// are going to keep an item beyond this callback we need
// to make sure fields we're using stay valid. Only the
// port context is a risk.
if (hasNextItem)
{
nextItem.portContext->Release();
}
nextItem = entry;
nextItem.portContext->AddRef();
hasNextItem = true;
}
@ -988,21 +1030,34 @@ bool TaskQueuePortImpl::ScheduleNextPendingCallback(
if (hasNextItem)
{
while (true)
if (nextItem.portContext->GetStatus() == TaskQueuePortStatus::Active)
{
if (m_timerDue.compare_exchange_weak(dueTime, nextItem.enqueueTime))
while (true)
{
m_timer.Start(nextItem.enqueueTime);
break;
}
if (m_timerDue.compare_exchange_weak(dueTime, nextItem.enqueueTime))
{
m_timer.Start(nextItem.enqueueTime);
break;
}
dueTime = m_timerDue.load();
dueTime = m_timerDue.load();
if (dueTime <= nextItem.enqueueTime)
{
break;
if (dueTime <= nextItem.enqueueTime)
{
break;
}
}
}
else
{
// The port is no longer active. Pending entries are canceled
// when the port is terminated, but if we were iterating above
// it's possible that we removed an item while the termination was
// being processed and it got missed.
CancelPendingEntries(nextItem.portContext, true);
}
nextItem.portContext->Release();
}
else
{
@ -1046,7 +1101,9 @@ void TaskQueuePortImpl::ProcessThreadPoolCallback(_In_ OS::ThreadPoolActionStatu
{
DrainOneItem(status);
}
m_processingCallback--;
m_processingCallbackCv.notify_all();
// Important that this comes before Release; otherwise
// cleanup may deadlock.
@ -1568,10 +1625,17 @@ HRESULT __stdcall TaskQueueImpl::Terminate(
entry.release();
// Addref ourself to ensure we don't lose the queue to a close
// while termination is in flight. This is released on OnTerminationCallback
// while termination is in flight. This is released on OnTerminationCallback.
// If we are waiting we will take another reference and release it
// after the wait completes.
AddRef();
if (wait)
{
AddRef();
}
m_work.Port->Terminate(workToken);
if (wait)
@ -1581,6 +1645,17 @@ HRESULT __stdcall TaskQueueImpl::Terminate(
{
m_termination.cv.wait(lock);
}
// Termination notify happens through a callback and when the
// terminated flag gets set that callback and other frames
// are still on the stack. Wait for them to unwind here so
// if the caller wants to exit the process there isn't code
// on the stack in another thread.
m_work.Port->WaitForUnwind();
m_completion.Port->WaitForUnwind();
Release();
}
return S_OK;
@ -1673,6 +1748,7 @@ static HRESULT CreateTaskQueueHandle(
*queue = q.release();
SystemHandleMarkCreated(*queue);
return S_OK;
}
@ -1821,16 +1897,16 @@ STDAPI_(void) XTaskQueueCloseHandle(
{
ITaskQueue* aq = GetQueue(queue);
// The default handle is only returned for queues
// that cannot be closed.
ASSERT(aq != nullptr && aq->CanClose() != (queue == aq->GetHandle()));
if (aq != nullptr && aq->CanClose())
{
queue->m_signature = 0;
queue->m_queue = nullptr;
delete queue;
if (USE_UNIQUE_HANDLES() && queue != aq->GetHandle())
{
SystemHandleMarkDestroyed(queue);
queue->m_signature = 0;
queue->m_queue = nullptr;
delete queue;
}
aq->Release();
}
@ -1923,10 +1999,12 @@ STDAPI_(void) XTaskQueueUnregisterWaiter(
}
//
// Increments the refcount on the queue
// Increments the refcount on the queue and allows supplying
// options as to how the duplicate is performed.
//
STDAPI XTaskQueueDuplicateHandle(
STDAPI XTaskQueueDuplicateHandleWithOptions(
_In_ XTaskQueueHandle queueHandle,
_In_ XTaskQueueDuplicateOptions options,
_Out_ XTaskQueueHandle* duplicatedHandle
) noexcept
{
@ -1942,7 +2020,15 @@ STDAPI XTaskQueueDuplicateHandle(
if (queue->CanClose())
{
RETURN_IF_FAILED(CreateTaskQueueHandle(queue, duplicatedHandle));
if (USE_UNIQUE_HANDLES() && (options != XTaskQueueDuplicateOptions::Reference))
{
RETURN_IF_FAILED(CreateTaskQueueHandle(queue, duplicatedHandle));
}
else
{
queue->AddRef();
*duplicatedHandle = queue->GetHandle();
}
}
else
{
@ -1952,6 +2038,20 @@ STDAPI XTaskQueueDuplicateHandle(
return S_OK;
}
//
// Increments the refcount on the queue
//
STDAPI XTaskQueueDuplicateHandle(
_In_ XTaskQueueHandle queueHandle,
_Out_ XTaskQueueHandle* duplicatedHandle
) noexcept
{
return XTaskQueueDuplicateHandleWithOptions(
queueHandle,
XTaskQueueDuplicateOptions::None,
duplicatedHandle);
}
//
// Registers a callback that will be called when a new callback
// is submitted. The callback will be directly invoked when
@ -1988,9 +2088,11 @@ STDAPI_(void) XTaskQueueUnregisterMonitor(
//
// Returns a handle to the process task queue, or nullptr if there is no
// process task queue. By default, there is a default process task queue
// that uses the thread pool for both work and completion ports.
// that uses the thread pool for both work and completion ports. This is an
// internal variant that takes duplicate options.
//
STDAPI_(bool) XTaskQueueGetCurrentProcessTaskQueue(
STDAPI_(bool) XTaskQueueGetCurrentProcessTaskQueueWithOptions(
_In_ XTaskQueueDuplicateOptions options,
_Out_ XTaskQueueHandle* queue
) noexcept
{
@ -2033,19 +2135,26 @@ STDAPI_(bool) XTaskQueueGetCurrentProcessTaskQueue(
processQueue = nullptr;
}
if (processQueue != nullptr &&
processQueue->m_queue->CanClose())
if (processQueue != nullptr)
{
(void)CreateTaskQueueHandle(processQueue->m_queue, queue);
}
else
{
*queue = processQueue;
(void)XTaskQueueDuplicateHandleWithOptions(processQueue, options, queue);
}
return (*queue) != nullptr;
}
//
// Returns a handle to the process task queue, or nullptr if there is no
// process task queue. By default, there is a default process task queue
// that uses the thread pool for both work and completion ports.
//
STDAPI_(bool) XTaskQueueGetCurrentProcessTaskQueue(
_Out_ XTaskQueueHandle* queue
) noexcept
{
return XTaskQueueGetCurrentProcessTaskQueueWithOptions(XTaskQueueDuplicateOptions::None, queue);
}
//
// Sets the given task queue as the process wide task queue. The
// queue can be set to nullptr, in which case XTaskQueueGetCurrentProcessTaskQueue will

Просмотреть файл

@ -13,6 +13,8 @@
#include "SuspendState.h"
#endif
SYSTEM_HANDLE_DEFINE_HELPERS(XTaskQueueHandle, XSystemHandleType::TaskQueue);
namespace ApiDiag
{
void GlobalAddRef();
@ -202,6 +204,8 @@ public:
bool __stdcall IsEmpty();
void __stdcall WaitForUnwind();
HRESULT __stdcall SuspendTermination(
_In_ ITaskQueuePortContext* portContext);
@ -251,6 +255,7 @@ private:
XTaskQueueDispatchMode m_dispatchMode = XTaskQueueDispatchMode::Manual;
AtomicVector<ITaskQueuePortContext*> m_attachedContexts;
std::atomic<uint32_t> m_processingCallback{ 0 };
std::condition_variable m_processingCallbackCv;
std::mutex m_lock;
std::unique_ptr<LocklessQueue<QueueEntry>> m_queueList;
std::unique_ptr<LocklessQueue<QueueEntry>> m_pendingList;
@ -482,11 +487,24 @@ inline ITaskQueue* GetQueue(XTaskQueueHandle handle)
{
if (handle->m_signature != TASK_QUEUE_SIGNATURE)
{
SystemHandleAssert(handle);
ASSERT("Invalid XTaskQueueHandle");
return nullptr;
}
ITaskQueue* queue = handle->m_queue;
// Only SystemHandleAssert if the handle provided here is
// not the internal queue handle, which is never tracked
// by handle tracking. The internal queue handle is given out
// for the default process queue, when calling back into
// task queue monitors, and when async operations are created.
if (handle != queue->GetHandle())
{
SystemHandleAssert(handle);
}
ASSERT(queue->GetHandle()->m_signature == TASK_QUEUE_SIGNATURE);
return queue;
}

Просмотреть файл

@ -72,6 +72,8 @@ struct ITaskQueuePort: IApi
virtual bool __stdcall IsEmpty() = 0;
virtual void __stdcall WaitForUnwind() = 0;
virtual HRESULT __stdcall SuspendTermination(
_In_ ITaskQueuePortContext* portContext) = 0;

Просмотреть файл

@ -64,3 +64,44 @@ STDAPI_(void) XTaskQueueGlobalSuspend();
/// 2. The dispatcher will start returing items again.
/// </summary>
STDAPI_(void) XTaskQueueGlobalResume();
/// <summary>
/// Options when duplicating a task queue handle.
/// </summary>
enum class XTaskQueueDuplicateOptions
{
/// <summary>
/// Default behavior.
/// </summary>
None,
/// <summary>
/// The duplicated queue is a reference to the actual
/// queue object, not a duplicated queue handle. References
/// work just like fully duplicated handles but they are not
/// tracked by the handle tracking infrastructure and do not
/// cause an allocation for the handle.
/// </summary>
Reference
};
/// <summary>
/// Increments the refcount on the queue and allows supplying
/// options as to how the duplicate is performed.
/// </summary>
STDAPI XTaskQueueDuplicateHandleWithOptions(
_In_ XTaskQueueHandle queueHandle,
_In_ XTaskQueueDuplicateOptions options,
_Out_ XTaskQueueHandle *duplicatedHandle
) noexcept;
/// <summary>
/// Returns a handle to the process task queue, or nullptr if there is no
/// process task queue. By default, there is a default process task queue
/// that uses the thread pool for both work and completion ports. This is an
/// internal variant that takes duplicate options.
/// </summary>
STDAPI_(bool) XTaskQueueGetCurrentProcessTaskQueueWithOptions(
_In_ XTaskQueueDuplicateOptions options,
_Out_ XTaskQueueHandle *queue
) noexcept;

Просмотреть файл

@ -4,9 +4,9 @@
#include "UnitTestIncludes.h"
#include "XAsync.h"
#include "XAsyncProvider.h"
#include "Task\XAsyncProviderPriv.h"
#include "XAsyncProviderPriv.h"
#include "XTaskQueue.h"
#include "Task\XTaskQueuePriv.h"
#include "XTaskQueuePriv.h"
#define TEST_CLASS_OWNER L"brianpe"
@ -224,7 +224,12 @@ private:
{
if (opCode == XAsyncOp::Begin)
{
// Must run the ctor for the newly allocated memory, and the initial
// value has already been copied in here so we must rescue it.
FactorialCallData* d = (FactorialCallData*)data->context;
DWORD value = d->value;
d = new (data->context) FactorialCallData;
d->value = value;
// leak a ref on this guy so we don't try to free it. We need
// to do two addrefs because a new object starts with refcount

Просмотреть файл

@ -2,7 +2,7 @@
#include "pch.h"
#include "UnitTestIncludes.h"
#include "Task/LocklessQueue.h"
#include "LocklessQueue.h"
#define TEST_CLASS_OWNER L"brianpe"

Просмотреть файл

@ -5,7 +5,7 @@
#include "XTaskQueue.h"
#include "CallbackThunk.h"
#include "PumpedTaskQueue.h"
#include "Task/XTaskQueuePriv.h"
#include "XTaskQueuePriv.h"
#define TEST_CLASS_OWNER L"brianpe"
@ -215,12 +215,15 @@ public:
for(int idx = 0; idx < count; idx++)
{
VERIFY_SUCCEEDED(XTaskQueueDuplicateHandle(queue, &dups[idx]));
VERIFY_IS_TRUE(queue != dups[idx]);
}
for(int idx = 0; idx < count; idx++)
{
XTaskQueueCloseHandle(dups[idx]);
}
VERIFY_ARE_EQUAL(E_INVALIDARG, XTaskQueueDuplicateHandle(dups[0], &dups[1]));
XTaskQueueCloseHandle(queue);
}