From f7010d45f3afdacc1ad52bf3076e6dc4e2452ce3 Mon Sep 17 00:00:00 2001 From: Alan Jowett Date: Wed, 4 Oct 2023 11:04:52 -0700 Subject: [PATCH] Timed work queue (#2900) * Timed work queue Signed-off-by: Alan Jowett * Fix code analysis failure Signed-off-by: Alan Jowett * Switch to timed_work_queue to save DPCs Signed-off-by: Alan Jowett * PR feedback Signed-off-by: Alan Jowett * PR feedback Signed-off-by: Alan Jowett * PR feedback Signed-off-by: Alan Jowett * PR feedback Signed-off-by: Alan Jowett * Fix code-analysis failure Signed-off-by: Alan Jowett * Fix fault injection failure Signed-off-by: Alan Jowett * PR feedback Signed-off-by: Alan Jowett * Fix code analysis failure Signed-off-by: Alan Jowett * PR feedback Signed-off-by: Alan Jowett * PR feedback Signed-off-by: Alan Jowett --------- Signed-off-by: Alan Jowett Co-authored-by: Alan Jowett --- libs/runtime/ebpf_epoch.c | 159 ++++++++---------- libs/runtime/ebpf_work_queue.c | 154 +++++++++++++++++ libs/runtime/ebpf_work_queue.h | 92 ++++++++++ libs/runtime/kernel/platform_kernel.vcxproj | 2 + .../kernel/platform_kernel.vcxproj.filters | 6 + libs/runtime/unit/platform_unit_test.cpp | 79 +++++++++ libs/runtime/user/platform_user.vcxproj | 2 + .../user/platform_user.vcxproj.filters | 6 + 8 files changed, 414 insertions(+), 86 deletions(-) create mode 100644 libs/runtime/ebpf_work_queue.c create mode 100644 libs/runtime/ebpf_work_queue.h diff --git a/libs/runtime/ebpf_epoch.c b/libs/runtime/ebpf_epoch.c index 245567b49..05bdf1ca6 100644 --- a/libs/runtime/ebpf_epoch.c +++ b/libs/runtime/ebpf_epoch.c @@ -3,6 +3,7 @@ #include "ebpf_epoch.h" #include "ebpf_tracelog.h" +#include "ebpf_work_queue.h" /** * @brief Epoch Base Memory Reclamation. @@ -50,6 +51,7 @@ typedef __declspec(align(EBPF_CACHE_LINE_SIZE)) struct _ebpf_epoch_cpu_entry int timer_armed : 1; ///< Set if the flush timer is armed. int rundown_in_progress : 1; ///< Set if rundown is in progress. int epoch_computation_in_progress : 1; ///< Set if epoch computation is in progress. + ebpf_timed_work_queue_t* work_queue; ///< Work queue used to schedule work items. } ebpf_epoch_cpu_entry_t; /** @@ -107,7 +109,9 @@ typedef enum _ebpf_epoch_cpu_message_type */ typedef struct _ebpf_epoch_cpu_message { + LIST_ENTRY list_entry; ///< List entry used to insert the message into the message queue. ebpf_epoch_cpu_message_type_t message_type; + ebpf_work_queue_wakeup_behavior_t wake_behavior; union { struct @@ -150,11 +154,6 @@ static ebpf_epoch_cpu_message_t _ebpf_epoch_compute_release_epoch_message = {0}; */ static KDPC _ebpf_epoch_timer_dpc; -/** - * @brief DPC used to process epoch computation. - */ -static KDPC _ebpf_epoch_compute_release_epoch_dpc; - /** * @brief Type of entry in the free list. 
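The key structural change above is that an epoch message now carries its own LIST_ENTRY and is later recovered in the worker with CONTAINING_RECORD, so one per-CPU timed work queue can replace the dedicated epoch DPC. The following standalone sketch (illustrative names such as my_message_t and CONTAINING_RECORD_SKETCH, not part of the patch) shows that intrusive-list pattern in isolation:

#include <stddef.h>
#include <stdio.h>

/* Minimal doubly linked list entry, standing in for LIST_ENTRY / ebpf_list_entry_t. */
typedef struct _list_entry
{
    struct _list_entry* Flink;
    struct _list_entry* Blink;
} list_entry_t;

/* A message embeds its own list entry, so queuing it needs no allocation and no dedicated DPC. */
typedef struct _my_message
{
    list_entry_t list_entry; /* Linked into the per-CPU work queue. */
    int message_type;
} my_message_t;

/* Stand-in for CONTAINING_RECORD: recover the enclosing struct from the embedded entry. */
#define CONTAINING_RECORD_SKETCH(address, type, field) ((type*)((char*)(address) - offsetof(type, field)))

int
main(void)
{
    my_message_t message = {{NULL, NULL}, 3};
    list_entry_t* queued = &message.list_entry;

    /* The work-queue callback only sees the list entry; it recovers the message like this. */
    my_message_t* recovered = CONTAINING_RECORD_SKETCH(queued, my_message_t, list_entry);
    printf("message_type=%d\n", recovered->message_type);
    return 0;
}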
* There are two types of entries in the free list: @@ -198,8 +197,8 @@ cxplat_rundown_reference_t _ebpf_epoch_work_item_rundown_ref; static void _ebpf_epoch_release_free_list(_Inout_ ebpf_epoch_cpu_entry_t* cpu_entry, int64_t released_epoch); -_Function_class_(KDEFERRED_ROUTINE) _IRQL_requires_(DISPATCH_LEVEL) static void _ebpf_epoch_messenger_worker( - _In_ KDPC* dpc, _In_opt_ void* cpu_entry, _In_opt_ void* message, _In_opt_ void* arg2); +_IRQL_requires_(DISPATCH_LEVEL) static void _ebpf_epoch_messenger_worker( + _Inout_ void* context, uint32_t cpu_id, _Inout_ ebpf_list_entry_t* message); _Function_class_(KDEFERRED_ROUTINE) _IRQL_requires_(DISPATCH_LEVEL) static void _ebpf_epoch_timer_worker( _In_ KDPC* dpc, _In_opt_ void* cpu_entry, _In_opt_ void* message, _In_opt_ void* arg2); @@ -207,6 +206,9 @@ _Function_class_(KDEFERRED_ROUTINE) _IRQL_requires_(DISPATCH_LEVEL) static void _IRQL_requires_max_(APC_LEVEL) static void _ebpf_epoch_send_message_and_wait( _In_ ebpf_epoch_cpu_message_t* message, uint32_t cpu_id); +static void +_ebpf_epoch_send_message_async(_In_ ebpf_epoch_cpu_message_t* message, uint32_t cpu_id); + _IRQL_requires_same_ static void _ebpf_epoch_insert_in_free_list(_In_ ebpf_epoch_allocation_header_t* header); @@ -276,11 +278,18 @@ ebpf_epoch_initiate() cpu_entry->current_epoch = 1; ebpf_list_initialize(&cpu_entry->epoch_state_list); ebpf_list_initialize(&cpu_entry->free_list); + LARGE_INTEGER interval; + interval.QuadPart = EBPF_EPOCH_FLUSH_DELAY_IN_NANOSECONDS / EBPF_NANO_SECONDS_PER_FILETIME_TICK; + + ebpf_result_t result = ebpf_timed_work_queue_create( + &cpu_entry->work_queue, cpu_id, &interval, _ebpf_epoch_messenger_worker, cpu_entry); + if (result != EBPF_SUCCESS) { + return_value = result; + goto Error; + } } - KeInitializeDpc(&_ebpf_epoch_compute_release_epoch_dpc, _ebpf_epoch_messenger_worker, NULL); KeInitializeDpc(&_ebpf_epoch_timer_dpc, _ebpf_epoch_timer_worker, NULL); - KeSetTargetProcessorDpc(&_ebpf_epoch_compute_release_epoch_dpc, 0); KeSetTargetProcessorDpc(&_ebpf_epoch_timer_dpc, 0); KeInitializeTimer(&_ebpf_epoch_compute_release_epoch_timer); @@ -304,12 +313,13 @@ ebpf_epoch_terminate() } rundown_message.message_type = EBPF_EPOCH_CPU_MESSAGE_TYPE_RUNDOWN_IN_PROGRESS; + rundown_message.wake_behavior = EBPF_WORK_QUEUE_WAKEUP_ON_INSERT; _ebpf_epoch_send_message_and_wait(&rundown_message, 0); // Cancel the timer. KeCancelTimer(&_ebpf_epoch_compute_release_epoch_timer); - // Wait for the DPC to complete. + // Wait for the active DPC to complete. KeFlushQueuedDpcs(); for (cpu_id = 0; cpu_id < _ebpf_epoch_cpu_count; cpu_id++) { @@ -317,6 +327,7 @@ ebpf_epoch_terminate() // Release all memory that is still in the free list. _ebpf_epoch_release_free_list(cpu_entry, MAXINT64); ebpf_assert(ebpf_list_is_empty(&cpu_entry->free_list)); + ebpf_timed_work_queue_destroy(cpu_entry->work_queue); } // Wait for all work items to complete. @@ -331,9 +342,9 @@ ebpf_epoch_terminate() } #pragma warning(push) -#pragma warning( \ - disable : 28166) // warning C28166: The function 'ebpf_epoch_enter' does not restore the IRQL to the value that was - // current at function entry and is required to do so. IRQL was last set to 2 at line 334. +#pragma warning(disable : 28166) // warning C28166: Code analysis incorrectly reports that the function + // 'ebpf_epoch_enter' does not restore the IRQL to the value that was current at + // function entry. 
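ebpf_epoch_initiate() above sizes each per-CPU queue's interval by converting EBPF_EPOCH_FLUSH_DELAY_IN_NANOSECONDS into 100 ns FILETIME ticks, and the queue later arms its timer with the negated interval (KeSetTimer treats a negative due time as relative to the current time). A minimal sketch of that arithmetic, with an assumed illustrative delay value rather than the driver's constant:

#include <stdint.h>
#include <stdio.h>

/* Illustrative values; the driver uses EBPF_EPOCH_FLUSH_DELAY_IN_NANOSECONDS and
   EBPF_NANO_SECONDS_PER_FILETIME_TICK from its own headers. */
#define FLUSH_DELAY_IN_NANOSECONDS 1000000LL /* 1 ms, assumed for the example. */
#define NANO_SECONDS_PER_FILETIME_TICK 100LL /* One FILETIME tick is 100 ns. */

int
main(void)
{
    /* The per-CPU queue interval is expressed in 100 ns FILETIME ticks. */
    int64_t interval_ticks = FLUSH_DELAY_IN_NANOSECONDS / NANO_SECONDS_PER_FILETIME_TICK;

    /* A negative due time is relative to now, so the work queue arms its timer with -interval. */
    int64_t relative_due_time = -interval_ticks;

    printf("interval=%lld ticks, relative due time=%lld\n", (long long)interval_ticks, (long long)relative_due_time);
    return 0;
}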
_IRQL_requires_same_ void ebpf_epoch_enter(_Out_ ebpf_epoch_state_t* epoch_state) { @@ -350,9 +361,8 @@ ebpf_epoch_enter(_Out_ ebpf_epoch_state_t* epoch_state) #pragma warning(push) #pragma warning( \ - disable : 28166) // warning C28166: The function 'ebpf_epoch_exit' does not restore the IRQL to the value that was - // current at function entry and is required to do so. IRQL was last set to 2 at line 353. -// the IRQL. + disable : 28166) // warning C28166: Code analysis incorrectly reports that the function 'ebpf_epoch_exit' + // does not restore the IRQL to the value that was current at function entry. _IRQL_requires_same_ void ebpf_epoch_exit(_In_ ebpf_epoch_state_t* epoch_state) { @@ -377,6 +387,7 @@ ebpf_epoch_exit(_In_ ebpf_epoch_state_t* epoch_state) ebpf_epoch_cpu_message_t message = {0}; message.message_type = EBPF_EPOCH_CPU_MESSAGE_TYPE_EXIT_EPOCH; message.message.exit_epoch.epoch_state = epoch_state; + message.wake_behavior = EBPF_WORK_QUEUE_WAKEUP_ON_INSERT; // The other CPU will call ebpf_epoch_exit() on our behalf. // The epoch was entered at < DISPATCH_LEVEL but will now exit at DISPATCH_LEVEL. To prevent the assert above @@ -394,6 +405,11 @@ ebpf_epoch_exit(_In_ ebpf_epoch_state_t* epoch_state) ebpf_list_remove_entry(&epoch_state->epoch_list_entry); _ebpf_epoch_arm_timer_if_needed(&_ebpf_epoch_cpu_table[cpu_id]); + // If there are items in the work queue, flush them. + if (!ebpf_timed_work_queue_is_empty(_ebpf_epoch_cpu_table[cpu_id].work_queue)) { + ebpf_timed_work_queued_flush(_ebpf_epoch_cpu_table[cpu_id].work_queue); + } + _ebpf_epoch_lower_to_previous_irql(epoch_state->irql_at_enter); } #pragma warning(pop) @@ -496,6 +512,7 @@ ebpf_epoch_flush() ebpf_epoch_cpu_message_t message = {0}; message.message_type = EBPF_EPOCH_CPU_MESSAGE_TYPE_PROPOSE_RELEASE_EPOCH; + message.wake_behavior = EBPF_WORK_QUEUE_WAKEUP_ON_INSERT; _ebpf_epoch_send_message_and_wait(&message, 0); } @@ -506,6 +523,7 @@ ebpf_epoch_is_free_list_empty(uint32_t cpu_id) ebpf_epoch_cpu_message_t message = {0}; message.message_type = EBPF_EPOCH_CPU_MESSAGE_TYPE_IS_FREE_LIST_EMPTY; + message.wake_behavior = EBPF_WORK_QUEUE_WAKEUP_ON_INSERT; _ebpf_epoch_send_message_and_wait(&message, cpu_id); @@ -653,9 +671,9 @@ _Function_class_(KDEFERRED_ROUTINE) _IRQL_requires_(DISPATCH_LEVEL) static void _ebpf_epoch_skipped_timers = 0; memset(&_ebpf_epoch_compute_release_epoch_message, 0, sizeof(_ebpf_epoch_compute_release_epoch_message)); _ebpf_epoch_compute_release_epoch_message.message_type = EBPF_EPOCH_CPU_MESSAGE_TYPE_PROPOSE_RELEASE_EPOCH; + _ebpf_epoch_compute_release_epoch_message.wake_behavior = EBPF_WORK_QUEUE_WAKEUP_ON_TIMER; KeInitializeEvent(&_ebpf_epoch_compute_release_epoch_message.completion_event, NotificationEvent, false); - KeSetTargetProcessorDpc(&_ebpf_epoch_compute_release_epoch_dpc, 0); - KeInsertQueueDpc(&_ebpf_epoch_compute_release_epoch_dpc, &_ebpf_epoch_compute_release_epoch_message, NULL); + _ebpf_epoch_send_message_async(&_ebpf_epoch_compute_release_epoch_message, 0); } else { _ebpf_epoch_skipped_timers++; LARGE_INTEGER due_time; @@ -668,10 +686,7 @@ _Function_class_(KDEFERRED_ROUTINE) _IRQL_requires_(DISPATCH_LEVEL) static void * @brief DPC that runs when a message is sent between CPUs. 
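The callers above choose between the two wake behaviors: paths that wait on the message (ebpf_epoch_flush, ebpf_epoch_is_free_list_empty, the cross-CPU exit) use EBPF_WORK_QUEUE_WAKEUP_ON_INSERT so the worker runs immediately, while the periodic epoch timer uses EBPF_WORK_QUEUE_WAKEUP_ON_TIMER so its message batches with other pending work. A toy single-threaded model of that insert decision (stand-in names, not the driver code):

#include <stdbool.h>
#include <stdio.h>

/* Mirrors the intent of ebpf_work_queue_wakeup_behavior_t. */
typedef enum
{
    WAKEUP_ON_INSERT = 0, /* Queue the DPC immediately; used by callers that wait for completion. */
    WAKEUP_ON_TIMER = 1,  /* Leave the item queued; the periodic timer drains it later. */
} wakeup_behavior_t;

/* Toy model of the insert decision: either signal the worker now or arm a one-shot timer. */
static void
insert(wakeup_behavior_t behavior, bool* timer_armed)
{
    if (behavior == WAKEUP_ON_INSERT) {
        *timer_armed = false;
        printf("queue DPC now (caller is waiting on a completion event)\n");
    } else if (!*timer_armed) {
        *timer_armed = true;
        printf("arm timer; batch with later items\n");
    } else {
        printf("timer already armed; just enqueue\n");
    }
}

int
main(void)
{
    bool timer_armed = false;
    insert(WAKEUP_ON_TIMER, &timer_armed);  /* e.g. the periodic epoch timer path */
    insert(WAKEUP_ON_TIMER, &timer_armed);  /* batches behind the already-armed timer */
    insert(WAKEUP_ON_INSERT, &timer_armed); /* e.g. ebpf_epoch_flush(), which waits */
    return 0;
}

The point of the split is DPC economy: latency-sensitive synchronous requests still get an immediate DPC, while background epoch computation coalesces into one DPC per timer period.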
*/ typedef void (*ebpf_epoch_messenger_worker_t)( - _In_ KDPC* dpc, - _Inout_ ebpf_epoch_cpu_entry_t* cpu_entry, - _Inout_ ebpf_epoch_cpu_message_t* message, - uint32_t current_cpu); + _Inout_ ebpf_epoch_cpu_entry_t* cpu_entry, _Inout_ ebpf_epoch_cpu_message_t* message, uint32_t current_cpu); /** * @brief Compute the next proposed release epoch and send it to the next CPU. @@ -683,20 +698,14 @@ typedef void (*ebpf_epoch_messenger_worker_t)( * last CPU forwards the message to the next CPU. The last CPU then sends an * EBPF_EPOCH_CPU_MESSAGE_TYPE_COMMIT_RELEASE_EPOCH message to CPU 0 with the final proposed release epoch. * - * @param[in] dpc DPC that triggered this function. * @param[in] cpu_entry CPU entry to compute the epoch for. * @param[in] message Message to process. * @param[in] current_cpu Current CPU. */ void _ebpf_epoch_messenger_propose_release_epoch( - _In_ KDPC* dpc, - _Inout_ ebpf_epoch_cpu_entry_t* cpu_entry, - _Inout_ ebpf_epoch_cpu_message_t* message, - uint32_t current_cpu) + _Inout_ ebpf_epoch_cpu_entry_t* cpu_entry, _Inout_ ebpf_epoch_cpu_message_t* message, uint32_t current_cpu) { - UNREFERENCED_PARAMETER(dpc); - // Walk over each thread_entry in the epoch_state_list and compute the minimum epoch. ebpf_list_entry_t* entry = cpu_entry->epoch_state_list.Flink; ebpf_epoch_state_t* epoch_state; @@ -738,9 +747,7 @@ _ebpf_epoch_messenger_propose_release_epoch( next_cpu = current_cpu + 1; } - // Queue the DPC to the next CPU. - KeSetTargetProcessorDpc(dpc, (uint8_t)next_cpu); - KeInsertQueueDpc(dpc, message, NULL); + _ebpf_epoch_send_message_async(message, next_cpu); } /** @@ -755,19 +762,16 @@ _ebpf_epoch_messenger_propose_release_epoch( * 5. Forwards the message to the next CPU. * The last CPU then sends a EBPF_EPOCH_CPU_MESSAGE_TYPE_PROPOSE_EPOCH_COMPLETE message to CPU 0. * - * @param[in] dpc DPC that triggered this function. * @param[in] cpu_entry CPU entry to rearm the timer for. * @param[in] message Message to process. * @param[in] current_cpu Current CPU. */ void _ebpf_epoch_messenger_commit_release_epoch( - _In_ KDPC* dpc, - _Inout_ ebpf_epoch_cpu_entry_t* cpu_entry, - _Inout_ ebpf_epoch_cpu_message_t* message, - uint32_t current_cpu) + _Inout_ ebpf_epoch_cpu_entry_t* cpu_entry, _Inout_ ebpf_epoch_cpu_message_t* message, uint32_t current_cpu) { uint32_t next_cpu; + cpu_entry->timer_armed = false; // Set the released_epoch to the value computed by the EBPF_EPOCH_CPU_MESSAGE_TYPE_PROPOSE_RELEASE_EPOCH message. cpu_entry->released_epoch = message->message.commit_epoch.released_epoch - 1; @@ -781,9 +785,7 @@ _ebpf_epoch_messenger_commit_release_epoch( next_cpu = 0; } - // Queue the DPC to the next CPU. - KeSetTargetProcessorDpc(dpc, (uint8_t)next_cpu); - KeInsertQueueDpc(dpc, message, NULL); + _ebpf_epoch_send_message_async(message, next_cpu); _ebpf_epoch_release_free_list(cpu_entry, cpu_entry->released_epoch); } @@ -795,21 +797,17 @@ _ebpf_epoch_messenger_commit_release_epoch( * CPU 0 clears the epoch computation in progress flag and signals the KEVENT associated with the message to signal any * waiting threads that the operation is completed. * - * @param[in] dpc DPC that triggered this function. * @param[in] cpu_entry CPU entry to mark the computation as complete for. * @param[in] message Message to process. * @param[in] current_cpu Current CPU. 
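The propose/commit handlers above pass one message around the CPUs: each CPU folds its local minimum epoch into the proposal and re-queues the message on the next CPU's work queue, and the last CPU wraps back to CPU 0 for the commit pass. A toy, single-threaded model of that fold-and-forward loop (illustrative values only, not the driver code):

#include <stdint.h>
#include <stdio.h>

#define CPU_COUNT 4

int
main(void)
{
    int64_t local_minimum[CPU_COUNT] = {7, 5, 9, 6}; /* Illustrative per-CPU minimum epochs. */
    int64_t proposed_release_epoch = INT64_MAX;

    for (uint32_t current_cpu = 0; current_cpu < CPU_COUNT; current_cpu++) {
        /* Fold this CPU's minimum into the proposal carried by the message. */
        if (local_minimum[current_cpu] < proposed_release_epoch) {
            proposed_release_epoch = local_minimum[current_cpu];
        }
        /* On the last CPU the message type flips to "commit" and the target wraps to CPU 0;
           otherwise the same message is re-queued on current_cpu + 1. */
        uint32_t next_cpu = (current_cpu == CPU_COUNT - 1) ? 0 : current_cpu + 1;
        printf("cpu %u -> cpu %u, proposed epoch %lld\n", (unsigned)current_cpu, (unsigned)next_cpu, (long long)proposed_release_epoch);
    }
    return 0;
}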
*/ void _ebpf_epoch_messenger_compute_epoch_complete( - _In_ KDPC* dpc, - _Inout_ ebpf_epoch_cpu_entry_t* cpu_entry, - _Inout_ ebpf_epoch_cpu_message_t* message, - uint32_t current_cpu) + _Inout_ ebpf_epoch_cpu_entry_t* cpu_entry, _Inout_ ebpf_epoch_cpu_message_t* message, uint32_t current_cpu) { UNREFERENCED_PARAMETER(current_cpu); // If this is the timer's DPC, then mark the computation as complete. - if (dpc == &_ebpf_epoch_compute_release_epoch_dpc) { + if (message == &_ebpf_epoch_compute_release_epoch_message) { cpu_entry->epoch_computation_in_progress = false; } else { // This is an adhoc flush. Signal the caller that the flush is complete. @@ -824,19 +822,14 @@ _ebpf_epoch_messenger_compute_epoch_complete( * The CPU removes the ebpf_epoch_state_t from the per-CPU thread list and signals the KEVENT associated with the * message to signal any waiting threads that the operation is completed. * - * @param[in] dpc DPC that triggered this function. * @param[in] cpu_entry CPU entry to call ebpf_epoch_exit() for. * @param[in] message Message to process. * @param[in] current_cpu Current CPU. */ void _ebpf_epoch_messenger_exit_epoch( - _In_ KDPC* dpc, - _Inout_ ebpf_epoch_cpu_entry_t* cpu_entry, - _Inout_ ebpf_epoch_cpu_message_t* message, - uint32_t current_cpu) + _Inout_ ebpf_epoch_cpu_entry_t* cpu_entry, _Inout_ ebpf_epoch_cpu_message_t* message, uint32_t current_cpu) { - UNREFERENCED_PARAMETER(dpc); UNREFERENCED_PARAMETER(current_cpu); UNREFERENCED_PARAMETER(cpu_entry); @@ -850,20 +843,15 @@ _ebpf_epoch_messenger_exit_epoch( * Message is sent to each CPU to notify it that epoch code is shutting down and that no future timers should be armed * and future messages should be ignored. * - * @param[in] dpc DPC that triggered this function. * @param[in] cpu_entry CPU entry to set the flag for. * @param[in] message Message to process. * @param[in] current_cpu Current CPU. */ void _ebpf_epoch_messenger_rundown_in_progress( - _In_ KDPC* dpc, - _Inout_ ebpf_epoch_cpu_entry_t* cpu_entry, - _Inout_ ebpf_epoch_cpu_message_t* message, - uint32_t current_cpu) + _Inout_ ebpf_epoch_cpu_entry_t* cpu_entry, _Inout_ ebpf_epoch_cpu_message_t* message, uint32_t current_cpu) { uint32_t next_cpu; - UNREFERENCED_PARAMETER(dpc); cpu_entry->rundown_in_progress = true; // If this is the last CPU, then stop. if (current_cpu != _ebpf_epoch_cpu_count - 1) { @@ -877,9 +865,7 @@ _ebpf_epoch_messenger_rundown_in_progress( return; } - // Queue the DPC to the next CPU. - KeSetTargetProcessorDpc(dpc, (uint8_t)next_cpu); - KeInsertQueueDpc(dpc, message, NULL); + _ebpf_epoch_send_message_async(message, next_cpu); } /** @@ -887,19 +873,14 @@ _ebpf_epoch_messenger_rundown_in_progress( * EBPF_EPOCH_CPU_MESSAGE_TYPE_IS_FREE_LIST_EMPTY message: * Message is sent to each CPU to query if its local free list is empty. * - * @param[in] dpc DPC that triggered this function. * @param[in] cpu_entry CPU entry to check. * @param[in] message Message to process. * @param[in] current_cpu Current CPU. 
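With the DPC gone, _ebpf_epoch_messenger_compute_epoch_complete() now tells the timer-driven flush apart from an ad hoc flush by comparing the message pointer against the static timer message, rather than comparing DPC pointers. A small standalone model of that pointer-identity check (toy types, not the driver code):

#include <stdbool.h>
#include <stdio.h>

typedef struct
{
    bool signaled;
} message_t;

/* The one message the periodic timer reuses; ad hoc flushes build their own message. */
static message_t timer_message;

/* Toy analogue of the handler: the timer's message only clears the in-progress flag,
   while an ad hoc flush must wake the waiting caller (KeSetEvent in the driver). */
static void
complete(message_t* message, bool* computation_in_progress)
{
    if (message == &timer_message) {
        *computation_in_progress = false;
    } else {
        message->signaled = true;
    }
}

int
main(void)
{
    bool in_progress = true;
    message_t adhoc = {false};

    complete(&timer_message, &in_progress);
    complete(&adhoc, &in_progress);
    printf("in_progress=%d adhoc_signaled=%d\n", in_progress, adhoc.signaled);
    return 0;
}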
*/ void _ebpf_epoch_messenger_is_free_list_empty( - _In_ KDPC* dpc, - _Inout_ ebpf_epoch_cpu_entry_t* cpu_entry, - _Inout_ ebpf_epoch_cpu_message_t* message, - uint32_t current_cpu) + _Inout_ ebpf_epoch_cpu_entry_t* cpu_entry, _Inout_ ebpf_epoch_cpu_message_t* message, uint32_t current_cpu) { - UNREFERENCED_PARAMETER(dpc); UNREFERENCED_PARAMETER(current_cpu); message->message.is_free_list_empty.is_empty = ebpf_list_is_empty(&cpu_entry->free_list); KeSetEvent(&message->completion_event, 0, FALSE); @@ -921,33 +902,28 @@ static ebpf_epoch_messenger_worker_t _ebpf_epoch_messenger_workers[] = { * * If rundown is in progress, then the message is ignored. * - * @param[in] dpc DPC that triggered this function. * @param[in] context Context passed to the DPC - not used. - * @param[in] arg1 The ebpf_epoch_cpu_message_t to process. - * @param[in] arg2 Not used. + * @param[in] list_entry List entry that contains the message to process. */ -_Function_class_(KDEFERRED_ROUTINE) _IRQL_requires_(DISPATCH_LEVEL) static void _ebpf_epoch_messenger_worker( - _In_ KDPC* dpc, _In_opt_ void* context, _In_opt_ void* arg1, _In_opt_ void* arg2) +_IRQL_requires_(DISPATCH_LEVEL) static void _ebpf_epoch_messenger_worker( + _Inout_ void* context, uint32_t cpu_id, _Inout_ ebpf_list_entry_t* list_entry) { UNREFERENCED_PARAMETER(context); - UNREFERENCED_PARAMETER(arg2); - uint32_t cpu_id = ebpf_get_current_cpu(); + ebpf_assert(ebpf_get_current_cpu() == cpu_id); ebpf_epoch_cpu_entry_t* cpu_entry = &_ebpf_epoch_cpu_table[cpu_id]; - ebpf_epoch_cpu_message_t* message = (ebpf_epoch_cpu_message_t*)arg1; + ebpf_epoch_cpu_message_t* message = CONTAINING_RECORD(list_entry, ebpf_epoch_cpu_message_t, list_entry); // If rundown is in progress, then exit immediately. if (cpu_entry->rundown_in_progress) { return; } - _Analysis_assume_(arg1 != NULL); - if (message->message_type > EBPF_COUNT_OF(_ebpf_epoch_messenger_workers) || message->message_type < 0) { ebpf_assert(!"Invalid message type"); return; } - _ebpf_epoch_messenger_workers[message->message_type](dpc, cpu_entry, message, cpu_id); + _ebpf_epoch_messenger_workers[message->message_type](cpu_entry, message, cpu_id); } /** @@ -955,26 +931,37 @@ _Function_class_(KDEFERRED_ROUTINE) _IRQL_requires_(DISPATCH_LEVEL) static void * * @param[in] message Message to send. * @param[in] cpu_id CPU to send the message to. */ _IRQL_requires_max_(APC_LEVEL) static void _ebpf_epoch_send_message_and_wait( _In_ ebpf_epoch_cpu_message_t* message, uint32_t cpu_id) { - KDPC dpc; - // Initialize the completion event. KeInitializeEvent(&message->completion_event, NotificationEvent, FALSE); - // Initialize the dpc. - KeInitializeDpc(&dpc, _ebpf_epoch_messenger_worker, NULL); - KeSetTargetProcessorDpc(&dpc, (uint8_t)cpu_id); - - // Send the message. - KeInsertQueueDpc(&dpc, message, NULL); + // Queue the message to the specified CPU. + ebpf_timed_work_queue_insert( + _ebpf_epoch_cpu_table[cpu_id].work_queue, &message->list_entry, message->wake_behavior); // Wait for the message to complete. KeWaitForSingleObject(&message->completion_event, Executive, KernelMode, FALSE, NULL); } +/** + * @brief Send a message to the specified CPU asynchronously. + * + * @param[in] message Message to send. + * @param[in] cpu_id CPU to send the message to.
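_ebpf_epoch_messenger_worker() above dispatches by indexing _ebpf_epoch_messenger_workers with the message type after a range check. A minimal standalone sketch of that table-dispatch pattern (toy message types and handlers, not the patch's):

#include <stdio.h>

typedef enum
{
    MESSAGE_TYPE_PROPOSE = 0,
    MESSAGE_TYPE_COMMIT = 1,
    MESSAGE_TYPE_COUNT,
} message_type_t;

typedef void (*message_handler_t)(int cpu_id);

static void
handle_propose(int cpu_id)
{
    printf("cpu %d: propose release epoch\n", cpu_id);
}

static void
handle_commit(int cpu_id)
{
    printf("cpu %d: commit release epoch\n", cpu_id);
}

/* One handler per message type; the message type is the array index. */
static const message_handler_t handlers[MESSAGE_TYPE_COUNT] = {handle_propose, handle_commit};

static void
dispatch(message_type_t type, int cpu_id)
{
    /* Reject out-of-range types before indexing the table. */
    if ((unsigned)type >= MESSAGE_TYPE_COUNT) {
        return;
    }
    handlers[type](cpu_id);
}

int
main(void)
{
    dispatch(MESSAGE_TYPE_PROPOSE, 0);
    dispatch(MESSAGE_TYPE_COMMIT, 1);
    dispatch(MESSAGE_TYPE_COUNT, 2); /* Out of range; ignored. */
    return 0;
}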
+ */ +static void +_ebpf_epoch_send_message_async(_In_ ebpf_epoch_cpu_message_t* message, uint32_t cpu_id) +{ + // Queue the message to the specified CPU. + ebpf_timed_work_queue_insert( + _ebpf_epoch_cpu_table[cpu_id].work_queue, &message->list_entry, message->wake_behavior); +} + /** * @brief Callback for ebpf_preemptible_work_item_t that calls the callback * function in ebpf_epoch_work_item_t. diff --git a/libs/runtime/ebpf_work_queue.c b/libs/runtime/ebpf_work_queue.c new file mode 100644 index 000000000..e52734f6f --- /dev/null +++ b/libs/runtime/ebpf_work_queue.c @@ -0,0 +1,154 @@ +// Copyright (c) Microsoft Corporation +// SPDX-License-Identifier: MIT + +#include "ebpf_work_queue.h" + +typedef struct _ebpf_timed_work_queue +{ + KTIMER timer; + KDPC dpc; + ebpf_list_entry_t work_items; + ebpf_lock_t lock; + bool timer_armed; + LARGE_INTEGER interval; + ebpf_timed_work_queue_callback_t callback; + void* context; + uint32_t cpu_id; +} ebpf_timed_work_queue_t; + +KDEFERRED_ROUTINE _ebpf_timed_work_queue_timer_callback; + +void +_ebpf_timed_work_queue_timer_callback( + _In_ KDPC* dpc, _In_opt_ void* deferred_context, _In_opt_ void* system_argument1, _In_opt_ void* system_argument2); + +_Must_inspect_result_ ebpf_result_t +ebpf_timed_work_queue_create( + _Out_ ebpf_timed_work_queue_t** work_queue, + uint32_t cpu_id, + _In_ LARGE_INTEGER* interval, + _In_ ebpf_timed_work_queue_callback_t callback, + _In_ void* context) +{ + ebpf_timed_work_queue_t* local_work_queue = NULL; + ebpf_result_t return_value; + + local_work_queue = ebpf_allocate(sizeof(ebpf_timed_work_queue_t)); + if (!local_work_queue) { + return_value = EBPF_NO_MEMORY; + goto Done; + } + + local_work_queue->callback = callback; + local_work_queue->context = context; + local_work_queue->interval = *interval; + local_work_queue->cpu_id = cpu_id; + + ebpf_lock_create(&local_work_queue->lock); + + ebpf_list_initialize(&local_work_queue->work_items); + + KeInitializeTimer(&local_work_queue->timer); + KeInitializeDpc(&local_work_queue->dpc, _ebpf_timed_work_queue_timer_callback, local_work_queue); + KeSetTargetProcessorDpc(&local_work_queue->dpc, (CCHAR)cpu_id); + + *work_queue = local_work_queue; + local_work_queue = NULL; + return_value = EBPF_SUCCESS; + +Done: + ebpf_timed_work_queue_destroy(local_work_queue); + + return return_value; +} + +void +ebpf_timed_work_queue_destroy(_In_opt_ ebpf_timed_work_queue_t* work_queue) +{ + if (!work_queue) { + return; + } + + // Cancel the timer. + KeCancelTimer(&work_queue->timer); + + // Wait for the DPC to complete. + KeFlushQueuedDpcs(); + + // Destroy the lock. + ebpf_lock_destroy(&work_queue->lock); + + // Free the work queue. 
+ ebpf_free(work_queue); +} + +void +ebpf_timed_work_queue_insert( + _In_ ebpf_timed_work_queue_t* work_queue, + _In_ ebpf_list_entry_t* work_item, + ebpf_work_queue_wakeup_behavior_t wake_behavior) +{ + ebpf_lock_state_t lock_state; + bool timer_armed; + + lock_state = ebpf_lock_lock(&work_queue->lock); + + timer_armed = work_queue->timer_armed; + ebpf_list_insert_tail(&work_queue->work_items, work_item); + + if (wake_behavior == EBPF_WORK_QUEUE_WAKEUP_ON_INSERT) { + KeCancelTimer(&work_queue->timer); + work_queue->timer_armed = false; + KeInsertQueueDpc(&work_queue->dpc, NULL, NULL); + } else if (!timer_armed) { + LARGE_INTEGER due_time; + due_time.QuadPart = -work_queue->interval.QuadPart; + KeSetTimer(&work_queue->timer, due_time, &work_queue->dpc); + work_queue->timer_armed = true; + } + + ebpf_lock_unlock(&work_queue->lock, lock_state); +} + +bool +ebpf_timed_work_queue_is_empty(_In_ ebpf_timed_work_queue_t* work_queue) +{ + return ebpf_list_is_empty(&work_queue->work_items); +} + +void +ebpf_timed_work_queued_flush(_In_ ebpf_timed_work_queue_t* work_queue) +{ + ebpf_lock_state_t lock_state; + ebpf_list_entry_t* work_item; + + lock_state = ebpf_lock_lock(&work_queue->lock); + + if (work_queue->timer_armed) { + KeCancelTimer(&work_queue->timer); + work_queue->timer_armed = false; + } + + while (!ebpf_list_is_empty(&work_queue->work_items)) { + work_item = work_queue->work_items.Flink; + ebpf_list_remove_entry(work_item); + ebpf_lock_unlock(&work_queue->lock, lock_state); + work_queue->callback(work_queue->context, work_queue->cpu_id, work_item); + lock_state = ebpf_lock_lock(&work_queue->lock); + } + + ebpf_lock_unlock(&work_queue->lock, lock_state); +} + +void +_ebpf_timed_work_queue_timer_callback( + _In_ KDPC* dpc, _In_opt_ void* context, _In_opt_ void* system_argument1, _In_opt_ void* system_argument2) +{ + UNREFERENCED_PARAMETER(dpc); + UNREFERENCED_PARAMETER(system_argument1); + UNREFERENCED_PARAMETER(system_argument2); + ebpf_timed_work_queue_t* work_queue = (ebpf_timed_work_queue_t*)context; + if (work_queue) { + ebpf_timed_work_queued_flush(work_queue); + } +} \ No newline at end of file diff --git a/libs/runtime/ebpf_work_queue.h b/libs/runtime/ebpf_work_queue.h new file mode 100644 index 000000000..e5e99c4bc --- /dev/null +++ b/libs/runtime/ebpf_work_queue.h @@ -0,0 +1,92 @@ +// Copyright (c) Microsoft Corporation +// SPDX-License-Identifier: MIT + +#pragma once + +#include "ebpf_platform.h" + +#ifdef __cplusplus +extern "C" +{ +#endif + + /** + * @brief A timed work queue. Entries in the work queue are executed + * either after a fixed interval or when the queue is polled. The purpose of this + * work queue is to allow for deferred execution of work items that are not + * time critical and can be batched together with other work via the poll API. + */ + typedef struct _ebpf_timed_work_queue ebpf_timed_work_queue_t; + + typedef _IRQL_requires_(DISPATCH_LEVEL) void (*ebpf_timed_work_queue_callback_t)( + _Inout_ void* context, uint32_t cpu_id, _Inout_ ebpf_list_entry_t*); + + typedef enum _ebpf_work_queue_wakeup_behavior + { + EBPF_WORK_QUEUE_WAKEUP_ON_INSERT = 0, ///< Wake up the work queue. + EBPF_WORK_QUEUE_WAKEUP_ON_TIMER = 1, ///< Don't wake up the work queue. + } ebpf_work_queue_wakeup_behavior_t; + + /** + * @brief Create a timed work queue. + * + * @param[out] work_queue Pointer to memory that contains the work queue on success. + * @param[in] cpu_id The CPU to run the work queue on. + * @param[in] interval The interval at which to run the work queue. 
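ebpf_timed_work_queued_flush() above drains the list one entry at a time, dropping the queue lock around each callback so the callback itself may insert new items or take other locks without deadlocking. A toy single-threaded model of that drain-with-lock-released pattern (a plain singly linked list and fake lock stand in for ebpf_list_entry_t and ebpf_lock_t):

#include <stdbool.h>
#include <stdio.h>

/* Toy lock stand-ins; the driver uses ebpf_lock_lock/ebpf_lock_unlock. */
static bool g_lock_held;
static void lock(void) { g_lock_held = true; }
static void unlock(void) { g_lock_held = false; }

typedef struct _node
{
    struct _node* next;
    int value;
} node_t;

static node_t* g_head;

/* Pop one item under the lock, then drop the lock while running the callback. */
static void
flush(void (*callback)(node_t*))
{
    lock();
    while (g_head != NULL) {
        node_t* item = g_head;
        g_head = item->next;
        unlock();
        callback(item);
        lock();
    }
    unlock();
}

static void
print_item(node_t* item)
{
    printf("processed %d (lock held during callback: %d)\n", item->value, g_lock_held);
}

int
main(void)
{
    node_t third = {NULL, 3}, second = {&third, 2}, first = {&second, 1};
    g_head = &first;
    flush(print_item);
    return 0;
}

Re-acquiring the lock after each callback also means any items queued by the callback are picked up in the same flush pass.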
+ * @param[in] callback The callback to execute for each work item. + * @param[in] context The context to pass to the callback. + * + * @retval EBPF_SUCCESS The operation was successful. + * @retval EBPF_NO_MEMORY Unable to allocate resources for this + * operation. + */ + _Must_inspect_result_ ebpf_result_t + ebpf_timed_work_queue_create( + _Out_ ebpf_timed_work_queue_t** work_queue, + uint32_t cpu_id, + _In_ LARGE_INTEGER* interval, + _In_ ebpf_timed_work_queue_callback_t callback, + _In_ void* context); + + /** + * @brief Destroy a timed work queue. + * + * @param[in] work_queue The timed work queue to destroy. + */ + void + ebpf_timed_work_queue_destroy(_In_opt_ ebpf_timed_work_queue_t* work_queue); + + /** + * @brief Insert a work item into the timed work queue. If wake_behavior is EBPF_WORK_QUEUE_WAKEUP_ON_INSERT, the work item is processed immediately instead of waiting for the timer. + * + * @param[in] work_queue The work queue to insert the work item into. + * @param[in] work_item The work item to insert. + * @param[in] wake_behavior Whether to wake the work queue immediately on insert or wait for the timer. + */ + void + ebpf_timed_work_queue_insert( + _In_ ebpf_timed_work_queue_t* work_queue, + _In_ ebpf_list_entry_t* work_item, + ebpf_work_queue_wakeup_behavior_t wake_behavior); + + /** + * @brief Check if the timed work queue is empty without acquiring the lock. + * + * @param[in] work_queue The work queue to check. + * @return true The work queue is empty. + * @return false The work queue is not empty. + */ + bool + ebpf_timed_work_queue_is_empty(_In_ ebpf_timed_work_queue_t* work_queue); + + /** + * @brief Execute the callback for all work items in the timed work queue. + * + * @param[in] work_queue The work queue to execute the callback for. + */ + void + ebpf_timed_work_queued_flush(_In_ ebpf_timed_work_queue_t* work_queue); + +#ifdef __cplusplus +} +#endif \ No newline at end of file diff --git a/libs/runtime/kernel/platform_kernel.vcxproj b/libs/runtime/kernel/platform_kernel.vcxproj index a86dcca2f..cb6edae49 100644 --- a/libs/runtime/kernel/platform_kernel.vcxproj +++ b/libs/runtime/kernel/platform_kernel.vcxproj @@ -45,6 +45,7 @@ + @@ -72,6 +73,7 @@ + diff --git a/libs/runtime/kernel/platform_kernel.vcxproj.filters b/libs/runtime/kernel/platform_kernel.vcxproj.filters index 996ad1330..2780f56f5 100644 --- a/libs/runtime/kernel/platform_kernel.vcxproj.filters +++ b/libs/runtime/kernel/platform_kernel.vcxproj.filters @@ -73,6 +73,9 @@ Source Files + + Source Files + @@ -147,5 +150,8 @@ Header Files + + Header Files + \ No newline at end of file diff --git a/libs/runtime/unit/platform_unit_test.cpp b/libs/runtime/unit/platform_unit_test.cpp index eb1fbf696..fa1b0111c 100644 --- a/libs/runtime/unit/platform_unit_test.cpp +++ b/libs/runtime/unit/platform_unit_test.cpp @@ -17,6 +17,7 @@ #include "ebpf_ring_buffer.h" #include "ebpf_serialize.h" #include "ebpf_state.h" +#include "ebpf_work_queue.h" #include "helpers.h" #include @@ -1225,3 +1226,81 @@ TEST_CASE("verify random", "[platform]") // Verify that the random number generators do not have a dominant frequency.
REQUIRE(!has_dominant_frequency(SEQUENCE_LENGTH, ebpf_random_uint32)); } + +TEST_CASE("work_queue", "[platform]") +{ + _test_helper test_helper; + test_helper.initialize(); + struct _work_item_context + { + LIST_ENTRY list_entry; + KEVENT completion_event; + } work_item_context; + + ebpf_list_initialize(&work_item_context.list_entry); + + KeInitializeEvent(&work_item_context.completion_event, NotificationEvent, FALSE); + + ebpf_timed_work_queue_t* work_queue; + LARGE_INTEGER interval; + + interval.QuadPart = 10 * 1000 * 100; // 100ms + int context = 1; + REQUIRE( + ebpf_timed_work_queue_create( + &work_queue, + 0, + &interval, + [](_Inout_ void* context, uint32_t cpu_id, _Inout_ ebpf_list_entry_t* entry) { + UNREFERENCED_PARAMETER(context); + UNREFERENCED_PARAMETER(cpu_id); + auto work_item_context = reinterpret_cast<_work_item_context*>(entry); + KeSetEvent(&work_item_context->completion_event, 0, FALSE); + }, + &context) == EBPF_SUCCESS); + + // Unique ptr that will call ebpf_timed_work_queue_destroy when it goes out of scope. + std::unique_ptr<ebpf_timed_work_queue_t, decltype(&ebpf_timed_work_queue_destroy)> work_queue_ptr( + work_queue, &ebpf_timed_work_queue_destroy); + + // Queue a work item without flush. + ebpf_timed_work_queue_insert(work_queue, &work_item_context.list_entry, EBPF_WORK_QUEUE_WAKEUP_ON_TIMER); + + LARGE_INTEGER timeout = {0}; + + // Verify that the work item is not signaled immediately. + REQUIRE( + KeWaitForSingleObject(&work_item_context.completion_event, Executive, KernelMode, FALSE, &timeout) == + STATUS_TIMEOUT); + + // Verify the queue is not empty. + REQUIRE(ebpf_timed_work_queue_is_empty(work_queue) == false); + + timeout.QuadPart = -10 * 1000 * 1000; // 1s + + // Verify that the work item is signaled after 100ms. + REQUIRE( + KeWaitForSingleObject(&work_item_context.completion_event, Executive, KernelMode, FALSE, &timeout) == + STATUS_SUCCESS); + + // Queue a work item with flush. + ebpf_timed_work_queue_insert(work_queue, &work_item_context.list_entry, EBPF_WORK_QUEUE_WAKEUP_ON_INSERT); + + // Wait for active DPCs to complete. + KeFlushQueuedDpcs(); + + // Verify the queue is now empty. + REQUIRE(ebpf_timed_work_queue_is_empty(work_queue) == true); + + // Queue a work item without flush. + ebpf_timed_work_queue_insert(work_queue, &work_item_context.list_entry, EBPF_WORK_QUEUE_WAKEUP_ON_TIMER); + + // Verify the queue is not empty. + REQUIRE(ebpf_timed_work_queue_is_empty(work_queue) == false); + + // Process the work queue. + ebpf_timed_work_queued_flush(work_queue); + + // Verify the queue is now empty. + REQUIRE(ebpf_timed_work_queue_is_empty(work_queue) == true); +} diff --git a/libs/runtime/user/platform_user.vcxproj b/libs/runtime/user/platform_user.vcxproj index c6548c24b..65a865982 100644 --- a/libs/runtime/user/platform_user.vcxproj +++ b/libs/runtime/user/platform_user.vcxproj @@ -34,6 +34,7 @@ + @@ -48,6 +49,7 @@ + diff --git a/libs/runtime/user/platform_user.vcxproj.filters b/libs/runtime/user/platform_user.vcxproj.filters index 03f8518c2..7d23a2f5f 100644 --- a/libs/runtime/user/platform_user.vcxproj.filters +++ b/libs/runtime/user/platform_user.vcxproj.filters @@ -76,6 +76,9 @@ Source Files + + Source Files + @@ -114,5 +117,8 @@ Header Files + + Header Files + \ No newline at end of file
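ebpf_timed_work_queue_create() and ebpf_timed_work_queue_destroy() in the new ebpf_work_queue.c follow the usual single-cleanup-path idiom: build into a local pointer, jump to one Done label on failure, and rely on a destroy routine that tolerates NULL and partially initialized objects. A generic, self-contained sketch of that idiom (hypothetical widget_t type, not the patch's code):

#include <stdio.h>
#include <stdlib.h>

typedef struct
{
    int* buffer;
} widget_t;

static void
widget_destroy(widget_t* widget)
{
    /* Destroy tolerates NULL so the failure path in create can call it unconditionally. */
    if (!widget) {
        return;
    }
    free(widget->buffer);
    free(widget);
}

static int
widget_create(widget_t** widget)
{
    int result = -1;
    widget_t* local = calloc(1, sizeof(*local));
    if (!local) {
        goto Done;
    }
    local->buffer = calloc(16, sizeof(int));
    if (!local->buffer) {
        goto Done;
    }

    *widget = local;
    local = NULL; /* Ownership transferred to the caller. */
    result = 0;

Done:
    widget_destroy(local); /* No-op on success, cleans up on failure. */
    return result;
}

int
main(void)
{
    widget_t* widget = NULL;
    if (widget_create(&widget) == 0) {
        printf("created\n");
        widget_destroy(widget);
    }
    return 0;
}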