зеркало из https://github.com/microsoft/CCF.git
Fixing Task Leaks, take 2 (#1452)
This commit is contained in:
Родитель
21e388f6df
Коммит
f6a83a2983
|
@ -7,7 +7,6 @@ pr:
|
|||
- .daily.yml
|
||||
- .azure-pipelines-templates/*
|
||||
|
||||
|
||||
trigger: none
|
||||
|
||||
schedules:
|
||||
|
|
|
@ -52,6 +52,8 @@ TEST_CASE("Unpopped messages are freed")
|
|||
tm.add_task<Foo>(0, std::move(m2));
|
||||
// Task is owned by the queue, hasn't run
|
||||
CHECK(Foo::count == 1);
|
||||
|
||||
tm.drop_tasks();
|
||||
}
|
||||
// Task payload (and TMsg) is also freed if it hasn't run
|
||||
// but the queue was destructed
|
||||
|
|
|
@ -46,6 +46,8 @@ namespace threading
|
|||
}
|
||||
#endif
|
||||
|
||||
class ThreadMessaging;
|
||||
|
||||
class Task
|
||||
{
|
||||
#ifdef USE_MPSCQ
|
||||
|
@ -65,40 +67,6 @@ namespace threading
|
|||
#endif
|
||||
}
|
||||
|
||||
~Task()
|
||||
{
|
||||
#ifdef USE_MPSCQ
|
||||
while (!queue.is_empty())
|
||||
{
|
||||
ThreadMsg* current;
|
||||
bool result;
|
||||
std::tie(current, result) = queue.dequeue();
|
||||
if (result)
|
||||
{
|
||||
delete current;
|
||||
}
|
||||
}
|
||||
#else
|
||||
while (true)
|
||||
{
|
||||
if (local_msg == nullptr && item_head != nullptr)
|
||||
{
|
||||
local_msg = item_head.exchange(nullptr);
|
||||
reverse_local_messages();
|
||||
}
|
||||
|
||||
if (local_msg == nullptr)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
ThreadMsg* current = local_msg;
|
||||
local_msg = local_msg->next;
|
||||
delete current;
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
bool run_next_task()
|
||||
{
|
||||
#ifdef USE_MPSCQ
|
||||
|
@ -169,6 +137,42 @@ namespace threading
|
|||
local_msg = prev;
|
||||
}
|
||||
#endif
|
||||
|
||||
void drop()
|
||||
{
|
||||
#ifdef USE_MPSCQ
|
||||
while (!queue.is_empty())
|
||||
{
|
||||
ThreadMsg* current;
|
||||
bool result;
|
||||
std::tie(current, result) = queue.dequeue();
|
||||
if (result)
|
||||
{
|
||||
delete current;
|
||||
}
|
||||
}
|
||||
#else
|
||||
while (true)
|
||||
{
|
||||
if (local_msg == nullptr && item_head != nullptr)
|
||||
{
|
||||
local_msg = item_head.exchange(nullptr);
|
||||
reverse_local_messages();
|
||||
}
|
||||
|
||||
if (local_msg == nullptr)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
ThreadMsg* current = local_msg;
|
||||
local_msg = local_msg->next;
|
||||
delete current;
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
friend ThreadMessaging;
|
||||
};
|
||||
|
||||
class ThreadMessaging
|
||||
|
@ -188,6 +192,17 @@ namespace threading
|
|||
tasks(num_threads)
|
||||
{}
|
||||
|
||||
// Drop all pending tasks, this is only ever to be used
|
||||
// on shutdown, to avoid leaks, and after all thread but
|
||||
// the main one have been shut down.
|
||||
void drop_tasks()
|
||||
{
|
||||
for (auto& t : tasks)
|
||||
{
|
||||
t.drop();
|
||||
}
|
||||
}
|
||||
|
||||
void set_finished(bool v = true)
|
||||
{
|
||||
finished.store(v);
|
||||
|
|
|
@ -20,6 +20,7 @@ static uint8_t* reserved_memory;
|
|||
std::atomic<std::chrono::milliseconds> logger::config::ms =
|
||||
std::chrono::milliseconds::zero();
|
||||
std::atomic<uint16_t> num_pending_threads = 0;
|
||||
std::atomic<uint16_t> num_complete_threads = 0;
|
||||
|
||||
threading::ThreadMessaging threading::ThreadMessaging::thread_messaging;
|
||||
std::atomic<uint16_t> threading::ThreadMessaging::thread_count = 0;
|
||||
|
@ -126,11 +127,21 @@ extern "C"
|
|||
|
||||
if (tid == 0)
|
||||
{
|
||||
return e.load()->run_main();
|
||||
auto s = e.load()->run_main();
|
||||
while (num_complete_threads !=
|
||||
threading::ThreadMessaging::thread_count - 1)
|
||||
{
|
||||
}
|
||||
// All threads are done, we can drop any remaining tasks safely and
|
||||
// completely
|
||||
threading::ThreadMessaging::thread_messaging.drop_tasks();
|
||||
return s;
|
||||
}
|
||||
else
|
||||
{
|
||||
return e.load()->run_worker();
|
||||
auto s = e.load()->run_worker();
|
||||
num_complete_threads.fetch_add(1);
|
||||
return s;
|
||||
}
|
||||
}
|
||||
else
|
||||
|
|
Загрузка…
Ссылка в новой задаче