Merge pull request #382 from microsoft/user/jasbray/update_task_cancel
Fix most remaining task cancellation race conditions
This commit is contained in:
Коммит
1a31f0cf31
|
@ -61,7 +61,7 @@ namespace ARIASDK_NS_BEGIN {
|
|||
if (!m_flushPending)
|
||||
return;
|
||||
}
|
||||
LOG_INFO("Waiting for pending Flush (%p) to complete...", m_flushHandle.m_task.load());
|
||||
LOG_INFO("Waiting for pending Flush (%p) to complete...", m_flushHandle.m_task);
|
||||
m_flushComplete.wait();
|
||||
}
|
||||
|
||||
|
@ -259,7 +259,7 @@ namespace ARIASDK_NS_BEGIN {
|
|||
m_flushPending = true;
|
||||
m_flushComplete.Reset();
|
||||
m_flushHandle = PAL::scheduleTask(&m_taskDispatcher, 0, this, &OfflineStorageHandler::Flush);
|
||||
LOG_INFO("Requested Flush (%p)", m_flushHandle.m_task.load());
|
||||
LOG_INFO("Requested Flush (%p)", m_flushHandle.m_task);
|
||||
}
|
||||
m_flushLock.unlock();
|
||||
}
|
||||
|
|
|
@ -12,6 +12,7 @@
|
|||
#include <climits>
|
||||
#include <algorithm>
|
||||
#include <atomic>
|
||||
#include <utility>
|
||||
|
||||
#include "ITaskDispatcher.hpp"
|
||||
#include "Version.hpp"
|
||||
|
@ -59,33 +60,42 @@ namespace PAL_NS_BEGIN {
|
|||
class DeferredCallbackHandle
|
||||
{
|
||||
public:
|
||||
std::atomic<MAT::Task*> m_task;
|
||||
MAT::ITaskDispatcher* m_taskDispatcher;
|
||||
std::mutex m_mutex;
|
||||
MAT::Task* m_task = nullptr;
|
||||
MAT::ITaskDispatcher* m_taskDispatcher = nullptr;
|
||||
|
||||
DeferredCallbackHandle(MAT::Task* task, MAT::ITaskDispatcher* taskDispatcher) :
|
||||
m_task(task),
|
||||
m_taskDispatcher(taskDispatcher) { };
|
||||
DeferredCallbackHandle() : m_task(nullptr), m_taskDispatcher(nullptr) {};
|
||||
DeferredCallbackHandle(const DeferredCallbackHandle& h) :
|
||||
m_task(h.m_task.load()),
|
||||
m_taskDispatcher(h.m_taskDispatcher) { };
|
||||
|
||||
DeferredCallbackHandle& operator=(DeferredCallbackHandle other)
|
||||
DeferredCallbackHandle() {};
|
||||
DeferredCallbackHandle(DeferredCallbackHandle&& h)
|
||||
{
|
||||
m_task = other.m_task.load();
|
||||
*this = std::move(h);
|
||||
};
|
||||
|
||||
DeferredCallbackHandle& operator=(DeferredCallbackHandle&& other)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(m_mutex);
|
||||
std::lock_guard<std::mutex> otherLock(other.m_mutex);
|
||||
m_task = other.m_task;
|
||||
other.m_task = nullptr;
|
||||
m_taskDispatcher = other.m_taskDispatcher;
|
||||
|
||||
return *this;
|
||||
}
|
||||
|
||||
bool Cancel(uint64_t waitTime = 0)
|
||||
{
|
||||
MAT::Task* m_current_task = m_task.exchange(nullptr);
|
||||
if (m_current_task)
|
||||
std::lock_guard<std::mutex> lock(m_mutex);
|
||||
if (m_task)
|
||||
{
|
||||
bool result = (m_taskDispatcher != nullptr) && (m_taskDispatcher->Cancel(m_current_task, waitTime));
|
||||
bool result = (m_taskDispatcher != nullptr) && (m_taskDispatcher->Cancel(m_task, waitTime));
|
||||
return result;
|
||||
}
|
||||
return false;
|
||||
else {
|
||||
// Canceled nothing successfully
|
||||
return true;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -7,11 +7,6 @@
|
|||
/* Maximum scheduler interval for SDK is 1 hour required for clamping in case of monotonic clock drift */
|
||||
#define MAX_FUTURE_DELTA_MS (60 * 60 * 1000)
|
||||
|
||||
// Polling interval for task cancellation can be customized at compile-time
|
||||
#ifndef TASK_CANCEL_WAIT_MS
|
||||
#define TASK_CANCEL_WAIT_MS 50
|
||||
#endif
|
||||
|
||||
namespace PAL_NS_BEGIN {
|
||||
|
||||
class WorkerThreadShutdownItem : public Task
|
||||
|
@ -31,6 +26,7 @@ namespace PAL_NS_BEGIN {
|
|||
|
||||
// TODO: [MG] - investigate all the cases why we need recursive here
|
||||
std::recursive_mutex m_lock;
|
||||
std::timed_mutex m_execution_mutex;
|
||||
|
||||
std::list<MAT::Task*> m_queue;
|
||||
std::list<MAT::Task*> m_timerQueue;
|
||||
|
@ -100,7 +96,7 @@ namespace PAL_NS_BEGIN {
|
|||
//
|
||||
// - acquire the m_lock to prevent a new task from getting scheduled.
|
||||
// This may block the scheduling of a new task in queue for up to
|
||||
// TASK_CANCEL_WAIT_MS=50 ms in case if the task being canceled
|
||||
// waitTime if the task being canceled
|
||||
// is the one being executed right now.
|
||||
//
|
||||
// - if currently executing task is the one we are trying to cancel,
|
||||
|
@ -134,12 +130,19 @@ namespace PAL_NS_BEGIN {
|
|||
/* Can't recursively wait on completion of our own thread */
|
||||
if (m_hThread.get_id() != std::this_thread::get_id())
|
||||
{
|
||||
while ((waitTime > TASK_CANCEL_WAIT_MS) && (m_itemInProgress == item))
|
||||
if (waitTime > 0 && m_execution_mutex.try_lock_for(std::chrono::milliseconds(waitTime)))
|
||||
{
|
||||
PAL::sleep(TASK_CANCEL_WAIT_MS);
|
||||
waitTime -= TASK_CANCEL_WAIT_MS;
|
||||
m_itemInProgress = nullptr;
|
||||
m_execution_mutex.unlock();
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// The SDK may attempt to cancel itself from within its own task.
|
||||
// Return true and assume that the current task will finish, and therefore be cancelled.
|
||||
return true;
|
||||
}
|
||||
|
||||
/* Either waited long enough or the task is still executing. Return:
|
||||
* true - if item in progress is different than item (other task)
|
||||
* false - if item in progress is still the same (didn't wait long enough)
|
||||
|
@ -153,7 +156,6 @@ namespace PAL_NS_BEGIN {
|
|||
// Still in the queue
|
||||
m_timerQueue.erase(it);
|
||||
delete item;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
#if 0
|
||||
|
@ -167,7 +169,7 @@ namespace PAL_NS_BEGIN {
|
|||
Sleep(10);
|
||||
}
|
||||
#endif
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
protected:
|
||||
|
@ -229,13 +231,20 @@ namespace PAL_NS_BEGIN {
|
|||
break;
|
||||
}
|
||||
|
||||
LOG_TRACE("%10llu Execute item=%p type=%s\n", wakeupCount, item.get(), item.get()->TypeName.c_str() );
|
||||
(*item)();
|
||||
self->m_itemInProgress = nullptr;
|
||||
{
|
||||
std::lock_guard<std::timed_mutex> lock(self->m_execution_mutex);
|
||||
|
||||
if (item.get()) {
|
||||
item->Type = MAT::Task::Done;
|
||||
item.reset();
|
||||
// Item wasn't cancelled before it could be executed
|
||||
if (self->m_itemInProgress != nullptr) {
|
||||
LOG_TRACE("%10llu Execute item=%p type=%s\n", wakeupCount, item.get(), item.get()->TypeName.c_str() );
|
||||
(*item)();
|
||||
self->m_itemInProgress = nullptr;
|
||||
}
|
||||
|
||||
if (item) {
|
||||
item->Type = MAT::Task::Done;
|
||||
item = nullptr;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -81,6 +81,8 @@ namespace ARIASDK_NS_BEGIN {
|
|||
|
||||
// cancel all pending and force-finish all uploads
|
||||
stopTimes[1] = GetUptimeMs();
|
||||
// TODO: Should this still pause, since the TPM now has abort logic in addition to pause logic?
|
||||
// hcm.cancelAllRequests is also part of pause, so the logic is definitely redundant. Issue 387
|
||||
onPause();
|
||||
hcm.cancelAllRequests();
|
||||
tpm.finishAllUploads();
|
||||
|
|
|
@ -255,6 +255,7 @@ namespace ARIASDK_NS_BEGIN {
|
|||
// Called from finishAllUploads
|
||||
void TransmissionPolicyManager::handleFinishAllUploads()
|
||||
{
|
||||
// TODO: This pause appears to serve no practical purpose? Issue 387
|
||||
pauseAllUploads();
|
||||
allUploadsFinished(); // calls stats.onStop >> this->flushTaskDispatcher;
|
||||
}
|
||||
|
|
|
@ -130,7 +130,14 @@ namespace ARIASDK_NS_BEGIN {
|
|||
{
|
||||
uint64_t cancelWaitTimeMs = (m_scheduledUploadAborted) ? UPLOAD_TASK_CANCEL_TIME_MS : 0;
|
||||
bool result = m_scheduledUpload.Cancel(cancelWaitTimeMs);
|
||||
m_isUploadScheduled.exchange(false);
|
||||
|
||||
// TODO: There is a potential for upload tasks to not be canceled, especially if they aren't waited for.
|
||||
// We either need a stronger guarantee here (could impact SDK performance), or a mechanism to
|
||||
// ensure those tasks are canceled when the log manager is destroyed. Issue 388
|
||||
if (result)
|
||||
{
|
||||
m_isUploadScheduled.exchange(false);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче