chore: convert timers to use an array for the instance (#208)

Co-authored-by: Jelani Brandon <jebrando@microsoft.com>
This commit is contained in:
Jelani Brandon 2023-01-24 22:10:08 -08:00 коммит произвёл GitHub
Родитель 98d44838dc
Коммит e5d532abb2
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
2 изменённых файлов: 172 добавлений и 24 удалений

Просмотреть файл

@ -9,7 +9,6 @@
#include <time.h> #include <time.h>
#include <string.h> #include <string.h>
#include <signal.h> #include <signal.h>
#include <errno.h>
#include <bits/types/__sigval_t.h> // for __sigval_t #include <bits/types/__sigval_t.h> // for __sigval_t
#include <bits/types/sigevent_t.h> // for sigevent, sigev_notify_fun... #include <bits/types/sigevent_t.h> // for sigevent, sigev_notify_fun...
@ -35,6 +34,17 @@
#define DEFAULT_TASK_ARRAY_SIZE 2048 #define DEFAULT_TASK_ARRAY_SIZE 2048
#define MILLISEC_TO_NANOSEC 1000000 #define MILLISEC_TO_NANOSEC 1000000
#define TP_SEMAPHORE_TIMEOUT_MS (100*MILLISEC_TO_NANOSEC) // The timespec value is in nanoseconds so need to multiply to get 100 MS #define TP_SEMAPHORE_TIMEOUT_MS (100*MILLISEC_TO_NANOSEC) // The timespec value is in nanoseconds so need to multiply to get 100 MS
#define MAX_TIMER_INSTANCE_COUNT 64
#define timespec_diff_macro(a, b, result) \
do { \
result.tv_sec = (a).tv_sec - (b).tv_sec; \
result.tv_nsec = (a).tv_nsec - (b).tv_nsec; \
if ((result).tv_nsec < 0) { \
--(result).tv_sec; \
(result).tv_nsec += 1000000000; \
} \
} while (0)
#define TASK_RESULT_VALUES \ #define TASK_RESULT_VALUES \
TASK_NOT_USED, \ TASK_NOT_USED, \
@ -45,6 +55,14 @@
MU_DEFINE_ENUM(TASK_RESULT, TASK_RESULT_VALUES); MU_DEFINE_ENUM(TASK_RESULT, TASK_RESULT_VALUES);
MU_DEFINE_ENUM_STRINGS(TASK_RESULT, TASK_RESULT_VALUES) MU_DEFINE_ENUM_STRINGS(TASK_RESULT, TASK_RESULT_VALUES)
#define TIMER_INSTANCE_STATUS_VALUES \
TIMER_ENABLED, \
TIMER_ENABLING, \
TIMER_DISABLED
MU_DEFINE_ENUM(TIMER_INSTANCE_STATUS, TIMER_INSTANCE_STATUS_VALUES);
MU_DEFINE_ENUM_STRINGS(TIMER_INSTANCE_STATUS, TIMER_INSTANCE_STATUS_VALUES)
MU_DEFINE_ENUM_STRINGS(THREADPOOL_OPEN_RESULT, THREADPOOL_OPEN_RESULT_VALUES) MU_DEFINE_ENUM_STRINGS(THREADPOOL_OPEN_RESULT, THREADPOOL_OPEN_RESULT_VALUES)
#define THREADPOOL_STATE_VALUES \ #define THREADPOOL_STATE_VALUES \
@ -61,6 +79,10 @@ typedef struct TIMER_INSTANCE_TAG
THREADPOOL_WORK_FUNCTION work_function; THREADPOOL_WORK_FUNCTION work_function;
void* work_function_ctx; void* work_function_ctx;
timer_t time_id; timer_t time_id;
volatile_atomic int32_t timer_status;
volatile_atomic int64_t time_sec;
struct timespec start_time;
volatile_atomic int32_t timer_trigger;
} TIMER_INSTANCE; } TIMER_INSTANCE;
typedef struct THREADPOOL_TASK_TAG typedef struct THREADPOOL_TASK_TAG
@ -92,8 +114,20 @@ typedef struct THREADPOOL_TAG
uint32_t list_index; uint32_t list_index;
THREAD_HANDLE* thread_handle_array; THREAD_HANDLE* thread_handle_array;
// Due to the fact that there is a race condition in the POSIX
// timer where it can send the callback after the timer has been deleted
// so we need to not allocate the TIMER_INSTANCES
TIMER_INSTANCE timer_instance[MAX_TIMER_INSTANCE_COUNT];
} THREADPOOL; } THREADPOOL;
// POSIX Bug:
// The Timer can be triggered after a timer_delete call has been issued
// The following is the reason:
// If the time is set to trigger at 01:00:24:05
// and the timer_delete occurs at 01:00:24:01
// the callback will be triggered. The code here is to guard against
// this from making unaccounted for timer callbacks
static void on_timer_callback(sigval_t timer_data) static void on_timer_callback(sigval_t timer_data)
{ {
TIMER_INSTANCE* timer_instance = timer_data.sival_ptr; TIMER_INSTANCE* timer_instance = timer_data.sival_ptr;
@ -103,7 +137,41 @@ static void on_timer_callback(sigval_t timer_data)
} }
else else
{ {
timer_instance->work_function(timer_instance->work_function_ctx); if (interlocked_add(&timer_instance->timer_status, 0) == TIMER_ENABLED)
{
// Make sure that if this timer has not been triggered previously that we
// don't go through the logic again
if (interlocked_add(&timer_instance->timer_trigger, 0) == 0)
{
// Increment the timer trigger so we know it won't go again
interlocked_increment(&timer_instance->timer_trigger);
struct timespec ts;
if (clock_gettime(CLOCK_REALTIME, &ts) == -1)
{
LogError("Failure getting time from clock");
}
else
{
struct timespec result;
timespec_diff_macro(ts, timer_instance->start_time, result);
if (result.tv_sec - interlocked_add_64(&timer_instance->time_sec, 0) < 0)
{
// The timer triggered too soon, this is to combat a race condition in POSIX
// timer if you delete it too soon.
LogVerbose("Timer triggered, but time value was invalid, waiting for next trigger");
}
else
{
timer_instance->work_function(timer_instance->work_function_ctx);
}
}
}
else
{
timer_instance->work_function(timer_instance->work_function_ctx);
}
}
} }
} }
@ -291,6 +359,28 @@ static int reallocate_threadpool_array(THREADPOOL* threadpool)
return result; return result;
} }
static TIMER_INSTANCE* get_next_timer_instance(THREADPOOL* threadpool)
{
TIMER_INSTANCE* result;
// Loop through the list and find the first disabled timer
int32_t index;
for (index = 0; index < MAX_TIMER_INSTANCE_COUNT; index++)
{
if (interlocked_compare_exchange(&threadpool->timer_instance[index].timer_status, TIMER_ENABLING, TIMER_DISABLED) == TIMER_DISABLED)
{
result = &threadpool->timer_instance[index];
break;
}
}
if (index == MAX_TIMER_INSTANCE_COUNT)
{
result = NULL;
LogError("Failure All timers instances are in use");
}
return result;
}
THREADPOOL_HANDLE threadpool_create(EXECUTION_ENGINE_HANDLE execution_engine) THREADPOOL_HANDLE threadpool_create(EXECUTION_ENGINE_HANDLE execution_engine)
{ {
THREADPOOL* result; THREADPOOL* result;
@ -362,6 +452,11 @@ THREADPOOL_HANDLE threadpool_create(EXECUTION_ENGINE_HANDLE execution_engine)
(void)interlocked_exchange_64(&result->insert_idx, -1); (void)interlocked_exchange_64(&result->insert_idx, -1);
(void)interlocked_exchange_64(&result->consume_idx, -1); (void)interlocked_exchange_64(&result->consume_idx, -1);
for (int32_t index = 0; index < MAX_TIMER_INSTANCE_COUNT; index++)
{
(void)interlocked_exchange(&result->timer_instance[index].timer_status, TIMER_DISABLED);
}
goto all_ok; goto all_ok;
} }
srw_lock_destroy(result->srw_lock); srw_lock_destroy(result->srw_lock);
@ -545,15 +640,20 @@ int threadpool_timer_start(THREADPOOL_HANDLE threadpool, uint32_t start_delay_ms
} }
else else
{ {
TIMER_INSTANCE* timer_instance = malloc(sizeof(TIMER_INSTANCE)); TIMER_INSTANCE* timer_instance = get_next_timer_instance(threadpool);
if (timer_instance == NULL) if (timer_instance == NULL)
{ {
LogError("Failure allocating Timer Instance"); LogError("Failure getting Timer Instance all timers are being used");
}
else if (clock_gettime(CLOCK_REALTIME, &timer_instance->start_time) == -1)
{
LogError("Failure clock_gettime");
} }
else else
{ {
timer_instance->work_function = work_function; timer_instance->work_function = work_function;
timer_instance->work_function_ctx = work_function_ctx; timer_instance->work_function_ctx = work_function_ctx;
(void)interlocked_exchange(&timer_instance->timer_status, TIMER_ENABLED);
// Setup the timer // Setup the timer
struct sigevent sigev = {0}; struct sigevent sigev = {0};
@ -565,9 +665,7 @@ int threadpool_timer_start(THREADPOOL_HANDLE threadpool, uint32_t start_delay_ms
if (timer_create(CLOCK_REALTIME, &sigev, &time_id) != 0) if (timer_create(CLOCK_REALTIME, &sigev, &time_id) != 0)
{ {
char err_msg[128]; LogErrorNo("Failure calling timer_create.");
(void)strerror_r(errno, err_msg, 128);
LogError("Failure calling timer_create. Error: %d: %s", errno, err_msg);
} }
else else
{ {
@ -576,15 +674,15 @@ int threadpool_timer_start(THREADPOOL_HANDLE threadpool, uint32_t start_delay_ms
its.it_value.tv_nsec = start_delay_ms * MILLISEC_TO_NANOSEC % 1000000000; its.it_value.tv_nsec = start_delay_ms * MILLISEC_TO_NANOSEC % 1000000000;
its.it_interval.tv_sec = timer_period_ms / 1000; its.it_interval.tv_sec = timer_period_ms / 1000;
its.it_interval.tv_nsec = timer_period_ms * MILLISEC_TO_NANOSEC % 1000000000; its.it_interval.tv_nsec = timer_period_ms * MILLISEC_TO_NANOSEC % 1000000000;
(void)interlocked_exchange(&timer_instance->timer_trigger, TIMER_ENABLED);
if (timer_settime(time_id, 0, &its, NULL) == -1) if (timer_settime(time_id, 0, &its, NULL) == -1)
{ {
char err_msg[128]; LogErrorNo("Failure calling timer_settime");
(void)strerror_r(errno, err_msg, 128);
LogError("Failure calling timer_settime. Error: %s", MU_P_OR_NULL(err_msg));
} }
else else
{ {
(void)interlocked_exchange_64(&timer_instance->time_sec, its.it_interval.tv_sec);
timer_instance->time_id = time_id; timer_instance->time_id = time_id;
*timer_handle = timer_instance; *timer_handle = timer_instance;
result = 0; result = 0;
@ -593,12 +691,10 @@ int threadpool_timer_start(THREADPOOL_HANDLE threadpool, uint32_t start_delay_ms
if (timer_delete(time_id) != 0) if (timer_delete(time_id) != 0)
{ {
char err_msg[128]; LogErrorNo("Failure calling timer_delete.");
(void)strerror_r(errno, err_msg, 128);
LogError("Failure calling timer_delete. Error: %d: (%s)", errno, err_msg);
} }
} }
free(timer_instance); (void)interlocked_exchange(&timer_instance->timer_status, TIMER_DISABLED);
} }
result = MU_FAILURE; result = MU_FAILURE;
} }
@ -627,9 +723,7 @@ int threadpool_timer_restart(TIMER_INSTANCE_HANDLE timer, uint32_t start_delay_m
if (timer_settime(timer->time_id, 0, &its, NULL) != 0) if (timer_settime(timer->time_id, 0, &its, NULL) != 0)
{ {
char err_msg[128]; LogErrorNo("Failure calling timer_settime.");
(void)strerror_r(errno, err_msg, 128);
LogError("Failure calling timer_settime. Error: %d: (%s)", errno, err_msg);
result = MU_FAILURE; result = MU_FAILURE;
} }
else else
@ -651,9 +745,7 @@ void threadpool_timer_cancel(TIMER_INSTANCE_HANDLE timer)
struct itimerspec its = {0}; struct itimerspec its = {0};
if (timer_settime(timer->time_id, 0, &its, NULL) != 0) if (timer_settime(timer->time_id, 0, &its, NULL) != 0)
{ {
char err_msg[128]; LogErrorNo("Failure calling timer_settime");
(void)strerror_r(errno, err_msg, 128);
LogError("Failure calling timer_settime. Error: %d: (%s)", errno, err_msg);
} }
else else
{ {
@ -672,14 +764,12 @@ void threadpool_timer_destroy(TIMER_INSTANCE_HANDLE timer)
{ {
if (timer_delete(timer->time_id) != 0) if (timer_delete(timer->time_id) != 0)
{ {
char err_msg[128]; LogErrorNo("Failure calling timer_delete.");
(void)strerror_r(errno, err_msg, 128);
LogError("Failure calling timer_delete. Error: %d: (%s)", errno, err_msg);
} }
else else
{ {
// Do Nothing // Do Nothing
} }
free(timer); (void)interlocked_exchange(&timer->timer_status, TIMER_DISABLED);
} }
} }

Просмотреть файл

@ -23,6 +23,8 @@
#include "c_pal/execution_engine.h" #include "c_pal/execution_engine.h"
#include "c_pal/execution_engine_linux.h" #include "c_pal/execution_engine_linux.h"
#define TEST_TIMER_INSTANCE_COUNT 64
static TEST_MUTEX_HANDLE test_serialize_mutex; static TEST_MUTEX_HANDLE test_serialize_mutex;
TEST_DEFINE_ENUM_TYPE(THREADPOOL_OPEN_RESULT, THREADPOOL_OPEN_RESULT_VALUES); TEST_DEFINE_ENUM_TYPE(THREADPOOL_OPEN_RESULT, THREADPOOL_OPEN_RESULT_VALUES);
@ -546,4 +548,60 @@ TEST_FUNCTION(cancel_timer_waits_for_ongoing_execution)
execution_engine_dec_ref(execution_engine); execution_engine_dec_ref(execution_engine);
} }
TEST_FUNCTION(exhaust_timers_making_sure_delete_does_not_call_invalid_callback)
{
// create an execution engine
EXECUTION_ENGINE_HANDLE execution_engine = execution_engine_create(NULL);
ASSERT_IS_NOT_NULL(execution_engine);
// create the threadpool
THREADPOOL_HANDLE threadpool = threadpool_create(execution_engine);
ASSERT_IS_NOT_NULL(threadpool);
TIMER_INSTANCE_HANDLE timer[TEST_TIMER_INSTANCE_COUNT];
volatile_atomic int32_t dummy;
volatile_atomic int32_t call_count;
volatile_atomic int32_t new_callback_ctx;
(void)interlocked_exchange(&call_count, 0);
(void)interlocked_exchange(&new_callback_ctx, 0);
LogInfo("Starting timers");
// Fill up the array of timer instance and leave 1 spot left.
for (int32_t index = 0; index < TEST_TIMER_INSTANCE_COUNT-1; index++)
{
ASSERT_ARE_EQUAL(int, 0, threadpool_timer_start(threadpool, 0, 30000, work_function, (void*)&dummy, &timer[index]));
}
ASSERT_ARE_EQUAL(int, 0, threadpool_timer_start(threadpool, 100, 500, work_function, (void*)&call_count, &timer[TEST_TIMER_INSTANCE_COUNT-1]));
// act
wait_for_equal(&call_count, 3, 10000);
ASSERT_ARE_EQUAL(int32_t, call_count, 3, "Call counter has timed out");
// call cancel
LogInfo("Timer should be running and waiting, now destroy timer");
threadpool_timer_destroy(timer[TEST_TIMER_INSTANCE_COUNT-1]);
LogInfo("Now create a new timer");
ASSERT_ARE_EQUAL(int, 0, threadpool_timer_start(threadpool, 100, 500, work_function, (void*)&new_callback_ctx, &timer[TEST_TIMER_INSTANCE_COUNT-1]));
// Make sure that th new callback was sent correctly
wait_for_equal(&new_callback_ctx, 1, 10000);
// Make sure the old call count remains at 3
// assert
ASSERT_ARE_EQUAL(int32_t, call_count, 3, "Thread called an invalid callback");
ASSERT_ARE_EQUAL(int32_t, new_callback_ctx, 1, "Thread counter has timed out");
// cleanup
for (int32_t index = 0; index < TEST_TIMER_INSTANCE_COUNT; index++)
{
threadpool_timer_destroy(timer[index]);
}
threadpool_destroy(threadpool);
execution_engine_dec_ref(execution_engine);
}
END_TEST_SUITE(TEST_SUITE_NAME_FROM_CMAKE) END_TEST_SUITE(TEST_SUITE_NAME_FROM_CMAKE)