From 0541869696b5d5175754bbc32fae3d9987f9296f Mon Sep 17 00:00:00 2001 From: Brian Pepin Date: Tue, 25 Aug 2020 22:42:01 -0700 Subject: [PATCH] Fix for possible spin during queue termination (#539) If a task queue is terminated while tasks are still being dispatched, the logic that drains the queue before termination can get into an infinite loop. The drain logic needs to pop items off the queue and those that should not be removed are re-added. The first one to be re-added is remembered as a sentinel so the loop knows when to quit. If that item gets dispatched, the loop never terminates. With this change we put all the items to re-add to the queue into a local buffer and then push them back onto the queue later. --- Source/Task/LocklessQueue.h | 34 ++++++++++ Source/Task/TaskQueue.cpp | 125 +++++++++--------------------------- 2 files changed, 64 insertions(+), 95 deletions(-) diff --git a/Source/Task/LocklessQueue.h b/Source/Task/LocklessQueue.h index 2b61eec5..2e63ab79 100644 --- a/Source/Task/LocklessQueue.h +++ b/Source/Task/LocklessQueue.h @@ -271,6 +271,40 @@ public: return false; } + // + // Removes items from the queue that satisfy the given callback. The callback + // is of the form: + // + // bool callback(TData& data, uint64_t address); + // + // If the callback returns true, it is taking ownership of the data and the + // address. If it returns false, the node is placed back on the queue. + // + // This is a lock-free call: if there are interleaving calls to push_back + // while this action is in progress final node order is not guaranteed (nodes + // this API processes may be interleaved with newly pushed nodes). + // + template + void remove_if(TCallback callback) + { + LocklessQueue retain(*this); + TData entry; + uint64_t address; + + while (pop_front(entry, address)) + { + if (!callback(entry, address)) + { + retain.move_back(std::move(entry), address); + } + } + + while (retain.pop_front(entry, address)) + { + move_back(std::move(entry), address); + } + } + private: /* diff --git a/Source/Task/TaskQueue.cpp b/Source/Task/TaskQueue.cpp index c8630654..e4afdcaa 100644 --- a/Source/Task/TaskQueue.cpp +++ b/Source/Task/TaskQueue.cpp @@ -696,38 +696,19 @@ void __stdcall TaskQueuePortImpl::ResumeTermination( // Removed the last external callback. Look for // parked terminations and reschedule them. - TerminationEntry *entry; - TerminationEntry *cycleEntry = nullptr; - uint64_t address; - - while (m_pendingTerminationList->pop_front(entry, address)) + m_pendingTerminationList->remove_if([&](auto& entry, auto address) { - if (entry == cycleEntry) - { - // We've wrapped - m_pendingTerminationList->push_back(entry, address); - break; - } - if (entry->portContext == portContext) { // This entry is for the port that's resuming, // we can schedule it. entry->node = address; ScheduleTermination(entry); + return true; } - else - { - // This entry is for another port context so - // we put it back for later. - if (cycleEntry == nullptr) - { - cycleEntry = entry; - } - m_pendingTerminationList->push_back(entry, address); - } - } + return false; + }); } } @@ -807,37 +788,23 @@ void TaskQueuePortImpl::CancelPendingEntries( m_timer.Cancel(); m_timerDue = UINT64_MAX; - - QueueEntry queueEntry; - uint64_t queueEntryNode; - uint64_t initialPushedId = UINT64_MAX; - while(m_pendingList->pop_front(queueEntry, queueEntryNode)) + m_pendingList->remove_if([&](auto& entry, auto address) { - if (queueEntry.id == initialPushedId) + if (entry.portContext == portContext) { - m_pendingList->push_back(queueEntry, queueEntryNode); - break; + if (!appendToQueue || !AppendEntry(entry, address)) + { + entry.portContext->Release(); + m_pendingList->free_node(address); + } + + return true; } - if (queueEntry.portContext == portContext) - { - if (!appendToQueue || !AppendEntry(queueEntry, queueEntryNode)) - { - queueEntry.portContext->Release(); - m_pendingList->free_node(queueEntryNode); - } - } - else - { - if (initialPushedId == UINT64_MAX) - { - initialPushedId = queueEntry.id; - } - m_pendingList->push_back(queueEntry, queueEntryNode); - } - } - + return false; + }); + SubmitPendingCallback(); #ifdef _WIN32 @@ -895,40 +862,23 @@ bool TaskQueuePortImpl::ScheduleNextPendingCallback( dueEntryNode = 0; - // We pop items off the pending list till it is empty, looking for - // our due time. We can do this without a lock because we only call - // this once the timer has come due, so there will be nothing else competing - // and messing with this list. We must not rely on a lock here because - // we need to allow QueueItem to remain lock free. - - LocklessQueue pendingList(*(m_pendingList.get())); - uint64_t node; - QueueEntry entry; - - while (m_pendingList->pop_front(entry, node)) + m_pendingList->remove_if([&](auto& entry, auto address) { if (!hasDueEntry && entry.enqueueTime == dueTime) { dueEntry = entry; - dueEntryNode = node; + dueEntryNode = address; hasDueEntry = true; + return true; } else if (!hasNextItem || nextItem.enqueueTime > entry.enqueueTime) { nextItem = entry; hasNextItem = true; - pendingList.push_back(entry, node); } - else - { - pendingList.push_back(entry, node); - } - } - while (pendingList.pop_front(entry, node)) - { - m_pendingList->push_back(entry, node); - } + return false; + }); if (hasNextItem) { @@ -1009,33 +959,18 @@ void TaskQueuePortImpl::SignalQueue() void TaskQueuePortImpl::SignalTerminations() { - uint64_t termNode; - TerminationEntry* term; - TerminationEntry* initialPushBackNode = nullptr; - - while(m_terminationList->pop_front(term, termNode)) + m_terminationList->remove_if([this](auto& entry, auto address) { - if (term == initialPushBackNode) + if (entry->portContext->GetStatus() == TaskQueuePortStatus::Terminated) { - m_terminationList->push_back(term, termNode); - break; + entry->callback(entry->callbackContext); + m_terminationList->free_node(address); + delete entry; + return true; } - - if (term->portContext->GetStatus() == TaskQueuePortStatus::Terminated) - { - term->callback(term->callbackContext); - delete term; - m_terminationList->free_node(termNode); - } - else - { - if (initialPushBackNode == nullptr) - { - initialPushBackNode = term; - } - m_terminationList->push_back(term, termNode); - } - } + + return false; + }); } void TaskQueuePortImpl::ScheduleTermination(