Fix for possible spin during queue termination (#539)

If a task queue is terminated while tasks are still being dispatched, the logic that drains the queue before termination can get into an infinite loop. The drain logic needs to pop items off the queue and those that should not be removed are re-added. The first one to be re-added is remembered as a sentinel so the loop knows when to quit. If that item gets dispatched, the loop never terminates.

With this change we put all the items to re-add to the queue into a local buffer and then push them back onto the queue later.
This commit is contained in:
Brian Pepin 2020-08-25 22:42:01 -07:00 коммит произвёл GitHub
Родитель c5d7b2b40a
Коммит 0541869696
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
2 изменённых файлов: 64 добавлений и 95 удалений

Просмотреть файл

@ -271,6 +271,40 @@ public:
return false;
}
//
// Removes items from the queue that satisfy the given callback. The callback
// is of the form:
//
// bool callback(TData& data, uint64_t address);
//
// If the callback returns true, it is taking ownership of the data and the
// address. If it returns false, the node is placed back on the queue.
//
// This is a lock-free call: if there are interleaving calls to push_back
// while this action is in progress final node order is not guaranteed (nodes
// this API processes may be interleaved with newly pushed nodes).
//
template <typename TCallback>
void remove_if(TCallback callback)
{
LocklessQueue<TData> retain(*this);
TData entry;
uint64_t address;
while (pop_front(entry, address))
{
if (!callback(entry, address))
{
retain.move_back(std::move(entry), address);
}
}
while (retain.pop_front(entry, address))
{
move_back(std::move(entry), address);
}
}
private:
/*

Просмотреть файл

@ -696,38 +696,19 @@ void __stdcall TaskQueuePortImpl::ResumeTermination(
// Removed the last external callback. Look for
// parked terminations and reschedule them.
TerminationEntry *entry;
TerminationEntry *cycleEntry = nullptr;
uint64_t address;
while (m_pendingTerminationList->pop_front(entry, address))
m_pendingTerminationList->remove_if([&](auto& entry, auto address)
{
if (entry == cycleEntry)
{
// We've wrapped
m_pendingTerminationList->push_back(entry, address);
break;
}
if (entry->portContext == portContext)
{
// This entry is for the port that's resuming,
// we can schedule it.
entry->node = address;
ScheduleTermination(entry);
return true;
}
else
{
// This entry is for another port context so
// we put it back for later.
if (cycleEntry == nullptr)
{
cycleEntry = entry;
}
m_pendingTerminationList->push_back(entry, address);
}
}
return false;
});
}
}
@ -807,37 +788,23 @@ void TaskQueuePortImpl::CancelPendingEntries(
m_timer.Cancel();
m_timerDue = UINT64_MAX;
QueueEntry queueEntry;
uint64_t queueEntryNode;
uint64_t initialPushedId = UINT64_MAX;
while(m_pendingList->pop_front(queueEntry, queueEntryNode))
m_pendingList->remove_if([&](auto& entry, auto address)
{
if (queueEntry.id == initialPushedId)
if (entry.portContext == portContext)
{
m_pendingList->push_back(queueEntry, queueEntryNode);
break;
if (!appendToQueue || !AppendEntry(entry, address))
{
entry.portContext->Release();
m_pendingList->free_node(address);
}
return true;
}
if (queueEntry.portContext == portContext)
{
if (!appendToQueue || !AppendEntry(queueEntry, queueEntryNode))
{
queueEntry.portContext->Release();
m_pendingList->free_node(queueEntryNode);
}
}
else
{
if (initialPushedId == UINT64_MAX)
{
initialPushedId = queueEntry.id;
}
m_pendingList->push_back(queueEntry, queueEntryNode);
}
}
return false;
});
SubmitPendingCallback();
#ifdef _WIN32
@ -895,40 +862,23 @@ bool TaskQueuePortImpl::ScheduleNextPendingCallback(
dueEntryNode = 0;
// We pop items off the pending list till it is empty, looking for
// our due time. We can do this without a lock because we only call
// this once the timer has come due, so there will be nothing else competing
// and messing with this list. We must not rely on a lock here because
// we need to allow QueueItem to remain lock free.
LocklessQueue<QueueEntry> pendingList(*(m_pendingList.get()));
uint64_t node;
QueueEntry entry;
while (m_pendingList->pop_front(entry, node))
m_pendingList->remove_if([&](auto& entry, auto address)
{
if (!hasDueEntry && entry.enqueueTime == dueTime)
{
dueEntry = entry;
dueEntryNode = node;
dueEntryNode = address;
hasDueEntry = true;
return true;
}
else if (!hasNextItem || nextItem.enqueueTime > entry.enqueueTime)
{
nextItem = entry;
hasNextItem = true;
pendingList.push_back(entry, node);
}
else
{
pendingList.push_back(entry, node);
}
}
while (pendingList.pop_front(entry, node))
{
m_pendingList->push_back(entry, node);
}
return false;
});
if (hasNextItem)
{
@ -1009,33 +959,18 @@ void TaskQueuePortImpl::SignalQueue()
void TaskQueuePortImpl::SignalTerminations()
{
uint64_t termNode;
TerminationEntry* term;
TerminationEntry* initialPushBackNode = nullptr;
while(m_terminationList->pop_front(term, termNode))
m_terminationList->remove_if([this](auto& entry, auto address)
{
if (term == initialPushBackNode)
if (entry->portContext->GetStatus() == TaskQueuePortStatus::Terminated)
{
m_terminationList->push_back(term, termNode);
break;
entry->callback(entry->callbackContext);
m_terminationList->free_node(address);
delete entry;
return true;
}
if (term->portContext->GetStatus() == TaskQueuePortStatus::Terminated)
{
term->callback(term->callbackContext);
delete term;
m_terminationList->free_node(termNode);
}
else
{
if (initialPushBackNode == nullptr)
{
initialPushBackNode = term;
}
m_terminationList->push_back(term, termNode);
}
}
return false;
});
}
void TaskQueuePortImpl::ScheduleTermination(