зеркало из https://github.com/Azure/c-pal.git
add a threadpool chaos test (#406)
* add a chaos test * change back thread count * address comments * address comments * address comment * add a comment * fixed a typo * address comments
This commit is contained in:
Родитель
87af89ee32
Коммит
59cc1df5b1
|
@ -79,6 +79,19 @@ static void open_work_function(void* context)
|
|||
TIMER_STATE_CANCELING, \
|
||||
TIMER_STATE_STOPPING
|
||||
|
||||
#define TEST_ACTION_VALUES \
|
||||
TEST_ACTION_THREADPOOL_OPEN, \
|
||||
TEST_ACTION_THREADPOOL_CLOSE, \
|
||||
TEST_ACTION_SCHEDULE_WORK, \
|
||||
TEST_ACTION_START_TIMER, \
|
||||
TEST_ACTION_DESTROY_TIMER, \
|
||||
TEST_ACTION_CANCEL_TIMER, \
|
||||
TEST_ACTION_RESTART_TIMER, \
|
||||
TEST_ACTION_SCHEDULE_WORK_ITEM
|
||||
|
||||
MU_DEFINE_ENUM(TEST_ACTION, TEST_ACTION_VALUES)
|
||||
MU_DEFINE_ENUM_STRINGS(TEST_ACTION, TEST_ACTION_VALUES)
|
||||
|
||||
MU_DEFINE_ENUM(TIMER_STATE, TIMER_STATE_VALUES)
|
||||
MU_DEFINE_ENUM_STRINGS(TIMER_STATE, TIMER_STATE_VALUES)
|
||||
|
||||
|
@ -787,6 +800,7 @@ TEST_FUNCTION(open_while_closing_fails)
|
|||
}
|
||||
|
||||
#define CHAOS_THREAD_COUNT 4
|
||||
#define TEST_RUN_TIME 10000 //ms
|
||||
|
||||
typedef struct CHAOS_TEST_TIMER_DATA_TAG
|
||||
{
|
||||
|
@ -805,6 +819,7 @@ typedef struct CHAOS_TEST_DATA_TAG
|
|||
THANDLE(THREADPOOL) threadpool;
|
||||
|
||||
volatile LONG can_start_timers;
|
||||
volatile LONG can_schedule_works;
|
||||
volatile LONG timers_starting;
|
||||
CHAOS_TEST_TIMER_DATA timers[MAX_TIMER_COUNT];
|
||||
THREADPOOL_WORK_ITEM_HANDLE work_item_context;
|
||||
|
@ -865,27 +880,31 @@ static void chaos_cleanup_all_timers(CHAOS_TEST_DATA* chaos_test_data)
|
|||
}
|
||||
}
|
||||
|
||||
static DWORD WINAPI chaos_thread_with_timers_func(LPVOID lpThreadParameter)
|
||||
static DWORD WINAPI chaos_thread_with_timers_no_lock_func(LPVOID lpThreadParameter)
|
||||
{
|
||||
CHAOS_TEST_DATA* chaos_test_data = (CHAOS_TEST_DATA*)lpThreadParameter;
|
||||
CHAOS_TEST_DATA* chaos_test_data = lpThreadParameter;
|
||||
|
||||
while (InterlockedAdd(&chaos_test_data->chaos_test_done, 0) == 0)
|
||||
{
|
||||
int which_action = rand() * 8 / (RAND_MAX + 1);
|
||||
int which_action = rand() * (MU_ENUM_VALUE_COUNT(TEST_ACTION_VALUES) - 2) / RAND_MAX + 1;
|
||||
switch (which_action)
|
||||
{
|
||||
case 0:
|
||||
default:
|
||||
ASSERT_FAIL("unexpected action type=%" PRI_MU_ENUM "", MU_ENUM_VALUE(TEST_ACTION, which_action));
|
||||
break;
|
||||
case TEST_ACTION_THREADPOOL_OPEN:
|
||||
// perform an open
|
||||
(void)threadpool_open(chaos_test_data->threadpool);
|
||||
break;
|
||||
case 1:
|
||||
case TEST_ACTION_THREADPOOL_CLOSE:
|
||||
// perform a close
|
||||
// First prevent new timers, because we need to clean them all up (lock)
|
||||
if (InterlockedCompareExchange(&chaos_test_data->can_start_timers, 0, 1) == 1)
|
||||
if (InterlockedCompareExchange(&chaos_test_data->can_start_timers, 0, 1) == 1 && InterlockedCompareExchange(&chaos_test_data->can_schedule_works, 0, 1) == 1)
|
||||
{
|
||||
// Wait for any threads that had been starting timers to complete
|
||||
wait_for_equal(&chaos_test_data->timers_starting, 0, INFINITE);
|
||||
|
||||
wait_for_equal((void*)(&chaos_test_data->executed_work_functions), (LONG)(chaos_test_data->expected_call_count), INFINITE);
|
||||
// Cleanup all timers
|
||||
chaos_cleanup_all_timers(chaos_test_data);
|
||||
|
||||
|
@ -894,102 +913,107 @@ static DWORD WINAPI chaos_thread_with_timers_func(LPVOID lpThreadParameter)
|
|||
|
||||
// Now back to normal
|
||||
(void)InterlockedExchange(&chaos_test_data->can_start_timers, 1);
|
||||
(void)InterlockedExchange(&chaos_test_data->can_schedule_works, 1);
|
||||
}
|
||||
break;
|
||||
case 2:
|
||||
case TEST_ACTION_SCHEDULE_WORK:
|
||||
// perform a schedule item
|
||||
if (threadpool_schedule_work(chaos_test_data->threadpool, work_function, (void*)&chaos_test_data->executed_work_functions) == 0)
|
||||
if (InterlockedAdd(&chaos_test_data->can_schedule_works, 0) != 0)
|
||||
{
|
||||
(void)InterlockedIncrement64(&chaos_test_data->expected_call_count);
|
||||
}
|
||||
break;
|
||||
case 3:
|
||||
// Start a timer
|
||||
{
|
||||
// Synchronize with close
|
||||
(void)InterlockedIncrement(&chaos_test_data->timers_starting);
|
||||
if (InterlockedAdd(&chaos_test_data->can_start_timers, 0) != 0)
|
||||
if (threadpool_schedule_work(chaos_test_data->threadpool, work_function, (void*)&chaos_test_data->executed_work_functions) == 0)
|
||||
{
|
||||
int which_timer_slot = rand() * MAX_TIMER_COUNT / (RAND_MAX + 1);
|
||||
if (InterlockedCompareExchange(&chaos_test_data->timers[which_timer_slot].state, TIMER_STATE_STARTING, TIMER_STATE_NONE) == TIMER_STATE_NONE)
|
||||
{
|
||||
uint32_t timer_start_delay = TIMER_START_DELAY_MIN + rand() * (TIMER_START_DELAY_MAX - TIMER_START_DELAY_MIN) / (RAND_MAX + 1);
|
||||
uint32_t timer_period = TIMER_PERIOD_MIN + rand() * (TIMER_PERIOD_MAX - TIMER_PERIOD_MIN) / (RAND_MAX + 1);
|
||||
if (threadpool_timer_start(chaos_test_data->threadpool, timer_start_delay, timer_period, work_function, (void*)&chaos_test_data->executed_timer_functions, &chaos_test_data->timers[which_timer_slot].timer) == 0)
|
||||
{
|
||||
InterlockedExchange(&chaos_test_data->timers[which_timer_slot].state, TIMER_STATE_STARTED);
|
||||
}
|
||||
else
|
||||
{
|
||||
InterlockedExchange(&chaos_test_data->timers[which_timer_slot].state, TIMER_STATE_NONE);
|
||||
}
|
||||
}
|
||||
(void)InterlockedIncrement64(&chaos_test_data->expected_call_count);
|
||||
}
|
||||
(void)InterlockedDecrement(&chaos_test_data->timers_starting);
|
||||
WakeByAddressSingle((PVOID)&chaos_test_data->timers_starting);
|
||||
}
|
||||
break;
|
||||
case 4:
|
||||
// Stop a timer
|
||||
case TEST_ACTION_START_TIMER:
|
||||
// Start a timer
|
||||
{
|
||||
// Synchronize with close
|
||||
(void)InterlockedIncrement(&chaos_test_data->timers_starting);
|
||||
if (InterlockedAdd(&chaos_test_data->can_start_timers, 0) != 0)
|
||||
{
|
||||
// Synchronize with close
|
||||
(void)InterlockedIncrement(&chaos_test_data->timers_starting);
|
||||
if (InterlockedAdd(&chaos_test_data->can_start_timers, 0) != 0)
|
||||
int which_timer_slot = rand() * MAX_TIMER_COUNT / (RAND_MAX + 1);
|
||||
if (InterlockedCompareExchange(&chaos_test_data->timers[which_timer_slot].state, TIMER_STATE_STARTING, TIMER_STATE_NONE) == TIMER_STATE_NONE)
|
||||
{
|
||||
int which_timer_slot = rand() * MAX_TIMER_COUNT / (RAND_MAX + 1);
|
||||
if (InterlockedCompareExchange(&chaos_test_data->timers[which_timer_slot].state, TIMER_STATE_STOPPING, TIMER_STATE_STARTED) == TIMER_STATE_STARTED)
|
||||
uint32_t timer_start_delay = TIMER_START_DELAY_MIN + rand() * (TIMER_START_DELAY_MAX - TIMER_START_DELAY_MIN) / (RAND_MAX + 1);
|
||||
uint32_t timer_period = TIMER_PERIOD_MIN + rand() * (TIMER_PERIOD_MAX - TIMER_PERIOD_MIN) / (RAND_MAX + 1);
|
||||
if (threadpool_timer_start(chaos_test_data->threadpool, timer_start_delay, timer_period, work_function, (void*)&chaos_test_data->executed_timer_functions, &chaos_test_data->timers[which_timer_slot].timer) == 0)
|
||||
{
|
||||
InterlockedExchange(&chaos_test_data->timers[which_timer_slot].state, TIMER_STATE_STARTED);
|
||||
}
|
||||
else
|
||||
{
|
||||
threadpool_timer_destroy(chaos_test_data->timers[which_timer_slot].timer);
|
||||
chaos_test_data->timers[which_timer_slot].timer = NULL;
|
||||
InterlockedExchange(&chaos_test_data->timers[which_timer_slot].state, TIMER_STATE_NONE);
|
||||
}
|
||||
}
|
||||
(void)InterlockedDecrement(&chaos_test_data->timers_starting);
|
||||
WakeByAddressSingle((PVOID)&chaos_test_data->timers_starting);
|
||||
}
|
||||
break;
|
||||
case 5:
|
||||
(void)InterlockedDecrement(&chaos_test_data->timers_starting);
|
||||
WakeByAddressSingle((PVOID)&chaos_test_data->timers_starting);
|
||||
}
|
||||
break;
|
||||
case TEST_ACTION_DESTROY_TIMER:
|
||||
// Stop a timer
|
||||
{
|
||||
// Synchronize with close
|
||||
(void)InterlockedIncrement(&chaos_test_data->timers_starting);
|
||||
if (InterlockedAdd(&chaos_test_data->can_start_timers, 0) != 0)
|
||||
{
|
||||
int which_timer_slot = rand() * MAX_TIMER_COUNT / (RAND_MAX + 1);
|
||||
if (InterlockedCompareExchange(&chaos_test_data->timers[which_timer_slot].state, TIMER_STATE_STOPPING, TIMER_STATE_STARTED) == TIMER_STATE_STARTED)
|
||||
{
|
||||
threadpool_timer_destroy(chaos_test_data->timers[which_timer_slot].timer);
|
||||
chaos_test_data->timers[which_timer_slot].timer = NULL;
|
||||
InterlockedExchange(&chaos_test_data->timers[which_timer_slot].state, TIMER_STATE_NONE);
|
||||
}
|
||||
}
|
||||
(void)InterlockedDecrement(&chaos_test_data->timers_starting);
|
||||
WakeByAddressSingle((PVOID)&chaos_test_data->timers_starting);
|
||||
}
|
||||
break;
|
||||
case TEST_ACTION_CANCEL_TIMER:
|
||||
// Cancel a timer
|
||||
{
|
||||
// Synchronize with close
|
||||
(void)InterlockedIncrement(&chaos_test_data->timers_starting);
|
||||
if (InterlockedAdd(&chaos_test_data->can_start_timers, 0) != 0)
|
||||
{
|
||||
// Synchronize with close
|
||||
(void)InterlockedIncrement(&chaos_test_data->timers_starting);
|
||||
if (InterlockedAdd(&chaos_test_data->can_start_timers, 0) != 0)
|
||||
int which_timer_slot = rand() * MAX_TIMER_COUNT / (RAND_MAX + 1);
|
||||
if (InterlockedAdd(&chaos_test_data->timers[which_timer_slot].state, 0) == TIMER_STATE_STARTED)
|
||||
{
|
||||
int which_timer_slot = rand() * MAX_TIMER_COUNT / (RAND_MAX + 1);
|
||||
if (InterlockedCompareExchange(&chaos_test_data->timers[which_timer_slot].state, TIMER_STATE_CANCELING, TIMER_STATE_STARTED) == TIMER_STATE_STARTED)
|
||||
{
|
||||
threadpool_timer_cancel(chaos_test_data->timers[which_timer_slot].timer);
|
||||
(void)InterlockedExchange(&chaos_test_data->timers[which_timer_slot].state, TIMER_STATE_STARTED);
|
||||
}
|
||||
threadpool_timer_cancel(chaos_test_data->timers[which_timer_slot].timer);
|
||||
}
|
||||
(void)InterlockedDecrement(&chaos_test_data->timers_starting);
|
||||
WakeByAddressSingle((PVOID)&chaos_test_data->timers_starting);
|
||||
}
|
||||
break;
|
||||
case 6:
|
||||
(void)InterlockedDecrement(&chaos_test_data->timers_starting);
|
||||
WakeByAddressSingle((PVOID)&chaos_test_data->timers_starting);
|
||||
}
|
||||
break;
|
||||
case TEST_ACTION_RESTART_TIMER:
|
||||
// Restart a timer
|
||||
{
|
||||
// Synchronize with close
|
||||
(void)InterlockedIncrement(&chaos_test_data->timers_starting);
|
||||
if (InterlockedAdd(&chaos_test_data->can_start_timers, 0) != 0)
|
||||
{
|
||||
// Synchronize with close
|
||||
(void)InterlockedIncrement(&chaos_test_data->timers_starting);
|
||||
if (InterlockedAdd(&chaos_test_data->can_start_timers, 0) != 0)
|
||||
int which_timer_slot = rand() * MAX_TIMER_COUNT / (RAND_MAX + 1);
|
||||
if (InterlockedAdd(&chaos_test_data->timers[which_timer_slot].state, 0) == TIMER_STATE_STARTED)
|
||||
{
|
||||
int which_timer_slot = rand() * MAX_TIMER_COUNT / (RAND_MAX + 1);
|
||||
if (InterlockedCompareExchange(&chaos_test_data->timers[which_timer_slot].state, TIMER_STATE_STARTING, TIMER_STATE_STARTED) == TIMER_STATE_STARTED)
|
||||
{
|
||||
uint32_t timer_start_delay = TIMER_START_DELAY_MIN + rand() * (TIMER_START_DELAY_MAX - TIMER_START_DELAY_MIN) / (RAND_MAX + 1);
|
||||
uint32_t timer_period = TIMER_PERIOD_MIN + rand() * (TIMER_PERIOD_MAX - TIMER_PERIOD_MIN) / (RAND_MAX + 1);
|
||||
ASSERT_ARE_EQUAL(int, 0, threadpool_timer_restart(chaos_test_data->timers[which_timer_slot].timer, timer_start_delay, timer_period));
|
||||
(void)InterlockedExchange(&chaos_test_data->timers[which_timer_slot].state, TIMER_STATE_STARTED);
|
||||
}
|
||||
uint32_t timer_start_delay = TIMER_START_DELAY_MIN + rand() * (TIMER_START_DELAY_MAX - TIMER_START_DELAY_MIN) / (RAND_MAX + 1);
|
||||
uint32_t timer_period = TIMER_PERIOD_MIN + rand() * (TIMER_PERIOD_MAX - TIMER_PERIOD_MIN) / (RAND_MAX + 1);
|
||||
ASSERT_ARE_EQUAL(int, 0, threadpool_timer_restart(chaos_test_data->timers[which_timer_slot].timer, timer_start_delay, timer_period));
|
||||
}
|
||||
(void)InterlockedDecrement(&chaos_test_data->timers_starting);
|
||||
WakeByAddressSingle((PVOID)&chaos_test_data->timers_starting);
|
||||
}
|
||||
case 7:
|
||||
(void)InterlockedDecrement(&chaos_test_data->timers_starting);
|
||||
WakeByAddressSingle((PVOID)&chaos_test_data->timers_starting);
|
||||
}
|
||||
case TEST_ACTION_SCHEDULE_WORK_ITEM:
|
||||
// perform a schedule work item
|
||||
if (threadpool_schedule_work_item(chaos_test_data->threadpool, chaos_test_data->work_item_context) == 0)
|
||||
if (InterlockedAdd(&chaos_test_data->can_schedule_works, 0) != 0)
|
||||
{
|
||||
(void)InterlockedIncrement64(&chaos_test_data->expected_call_count);
|
||||
if (threadpool_schedule_work_item(chaos_test_data->threadpool, chaos_test_data->work_item_context) == 0)
|
||||
{
|
||||
(void)InterlockedIncrement64(&chaos_test_data->expected_call_count);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -1058,18 +1082,20 @@ TEST_FUNCTION(chaos_knight_test)
|
|||
execution_engine_dec_ref(execution_engine);
|
||||
}
|
||||
|
||||
TEST_FUNCTION(chaos_knight_test_with_timers)
|
||||
//test used for detect race condition between timer_restart/timer_cancel and timer destory, failed due to the race condition for the current code, will uncomment after the fix
|
||||
XTEST_FUNCTION(chaos_knight_test_with_timers_no_lock)
|
||||
{
|
||||
// start a number of threads and each of them will do a random action on the threadpool
|
||||
EXECUTION_ENGINE_PARAMETERS execution_engine_parameters = { 4, 0 };
|
||||
EXECUTION_ENGINE_HANDLE execution_engine = execution_engine_create(&execution_engine_parameters);
|
||||
ASSERT_IS_NOT_NULL(execution_engine);
|
||||
HANDLE thread_handles[CHAOS_THREAD_COUNT];
|
||||
size_t i;
|
||||
CHAOS_TEST_DATA chaos_test_data;
|
||||
|
||||
THANDLE(THREADPOOL) threadpool = threadpool_create(execution_engine);
|
||||
ASSERT_IS_NOT_NULL(threadpool);
|
||||
|
||||
ASSERT_ARE_EQUAL(int, 0, threadpool_open(threadpool));
|
||||
THANDLE_INITIALIZE_MOVE(THREADPOOL)(&chaos_test_data.threadpool, &threadpool);
|
||||
|
||||
(void)InterlockedExchange64(&chaos_test_data.expected_call_count, 0);
|
||||
|
@ -1082,22 +1108,28 @@ TEST_FUNCTION(chaos_knight_test_with_timers)
|
|||
for (i = 0; i < MAX_TIMER_COUNT; i++)
|
||||
{
|
||||
chaos_test_data.timers[i].timer = NULL;
|
||||
InterlockedExchange(&chaos_test_data.timers[i].state, TIMER_STATE_NONE);
|
||||
(void)InterlockedExchange(&chaos_test_data.timers[i].state, TIMER_STATE_NONE);
|
||||
}
|
||||
|
||||
chaos_test_data.work_item_context = threadpool_create_work_item(chaos_test_data.threadpool, work_function, (void*)&chaos_test_data.executed_work_functions);
|
||||
|
||||
ASSERT_IS_NOT_NULL(chaos_test_data.work_item_context);
|
||||
for (i = 0; i < CHAOS_THREAD_COUNT; i++)
|
||||
{
|
||||
thread_handles[i] = CreateThread(NULL, 0, chaos_thread_with_timers_func, &chaos_test_data, 0, NULL);
|
||||
thread_handles[i] = CreateThread(NULL, 0, chaos_thread_with_timers_no_lock_func, &chaos_test_data, 0, NULL);
|
||||
ASSERT_IS_NOT_NULL(thread_handles[i], "thread %zu failed to start", i);
|
||||
}
|
||||
|
||||
// wait for some time
|
||||
Sleep(10000);
|
||||
Sleep(TEST_RUN_TIME);
|
||||
|
||||
(void)InterlockedExchange(&chaos_test_data.chaos_test_done, 1);
|
||||
|
||||
//will change to use a future API: InterlockedHL_waitforvalue_64 later, task:https://msazure.visualstudio.com/One/_workitems/edit/30197585
|
||||
while (InterlockedAdd64(&chaos_test_data.expected_call_count, 0) != InterlockedAdd64(&chaos_test_data.executed_work_functions, 0))
|
||||
{
|
||||
Sleep(1);
|
||||
}
|
||||
|
||||
// wait for all threads to complete
|
||||
for (i = 0; i < CHAOS_THREAD_COUNT; i++)
|
||||
{
|
||||
|
|
Загрузка…
Ссылка в новой задаче