* Timed work queue

Signed-off-by: Alan Jowett <alan.jowett@microsoft.com>

* Fix code analysis failure

Signed-off-by: Alan Jowett <alan.jowett@microsoft.com>

* Switch to timed_work_queue to save DPCs

Signed-off-by: Alan Jowett <alan.jowett@microsoft.com>

* PR feedback

Signed-off-by: Alan Jowett <alan.jowett@microsoft.com>

* PR feedback

Signed-off-by: Alan Jowett <alan.jowett@microsoft.com>

* PR feedback

Signed-off-by: Alan Jowett <alan.jowett@microsoft.com>

* PR feedback

Signed-off-by: Alan Jowett <alan.jowett@microsoft.com>

* Fix code-analysis failure

Signed-off-by: Alan Jowett <alan.jowett@microsoft.com>

* Fix fault injection failure

Signed-off-by: Alan Jowett <alan.jowett@microsoft.com>

* PR feedback

Signed-off-by: Alan Jowett <alan.jowett@microsoft.com>

* Fix code analysis failure

Signed-off-by: Alan Jowett <alan.jowett@microsoft.com>

* PR feedback

Signed-off-by: Alan Jowett <alan.jowett@microsoft.com>

* PR feedback

Signed-off-by: Alan Jowett <alan.jowett@microsoft.com>

---------

Signed-off-by: Alan Jowett <alan.jowett@microsoft.com>
Co-authored-by: Alan Jowett <alan.jowett@microsoft.com>
Alan Jowett authored on 2023-10-04 11:04:52 -07:00; committed by GitHub
Parent: 9f66160a9d
Commit: f7010d45f3
No key found matching this signature
GPG key ID: 4AEE18F83AFDEB23
8 changed files: 414 additions and 86 deletions


@@ -3,6 +3,7 @@
#include "ebpf_epoch.h"
#include "ebpf_tracelog.h"
#include "ebpf_work_queue.h"
/**
* @brief Epoch Base Memory Reclamation.
@@ -50,6 +51,7 @@ typedef __declspec(align(EBPF_CACHE_LINE_SIZE)) struct _ebpf_epoch_cpu_entry
int timer_armed : 1; ///< Set if the flush timer is armed.
int rundown_in_progress : 1; ///< Set if rundown is in progress.
int epoch_computation_in_progress : 1; ///< Set if epoch computation is in progress.
ebpf_timed_work_queue_t* work_queue; ///< Work queue used to schedule work items.
} ebpf_epoch_cpu_entry_t;
/**
@@ -107,7 +109,9 @@ typedef enum _ebpf_epoch_cpu_message_type
*/
typedef struct _ebpf_epoch_cpu_message
{
LIST_ENTRY list_entry; ///< List entry used to insert the message into the message queue.
ebpf_epoch_cpu_message_type_t message_type;
ebpf_work_queue_wakeup_behavior_t wake_behavior;
union
{
struct
@@ -150,11 +154,6 @@ static ebpf_epoch_cpu_message_t _ebpf_epoch_compute_release_epoch_message = {0};
*/
static KDPC _ebpf_epoch_timer_dpc;
/**
* @brief DPC used to process epoch computation.
*/
static KDPC _ebpf_epoch_compute_release_epoch_dpc;
/**
* @brief Type of entry in the free list.
* There are two types of entries in the free list:
@@ -198,8 +197,8 @@ cxplat_rundown_reference_t _ebpf_epoch_work_item_rundown_ref;
static void
_ebpf_epoch_release_free_list(_Inout_ ebpf_epoch_cpu_entry_t* cpu_entry, int64_t released_epoch);
_Function_class_(KDEFERRED_ROUTINE) _IRQL_requires_(DISPATCH_LEVEL) static void _ebpf_epoch_messenger_worker(
_In_ KDPC* dpc, _In_opt_ void* cpu_entry, _In_opt_ void* message, _In_opt_ void* arg2);
_IRQL_requires_(DISPATCH_LEVEL) static void _ebpf_epoch_messenger_worker(
_Inout_ void* context, uint32_t cpu_id, _Inout_ ebpf_list_entry_t* message);
_Function_class_(KDEFERRED_ROUTINE) _IRQL_requires_(DISPATCH_LEVEL) static void _ebpf_epoch_timer_worker(
_In_ KDPC* dpc, _In_opt_ void* cpu_entry, _In_opt_ void* message, _In_opt_ void* arg2);
@@ -207,6 +206,9 @@ _Function_class_(KDEFERRED_ROUTINE) _IRQL_requires_(DISPATCH_LEVEL) static void
_IRQL_requires_max_(APC_LEVEL) static void _ebpf_epoch_send_message_and_wait(
_In_ ebpf_epoch_cpu_message_t* message, uint32_t cpu_id);
static void
_ebpf_epoch_send_message_async(_In_ ebpf_epoch_cpu_message_t* message, uint32_t cpu_id);
_IRQL_requires_same_ static void
_ebpf_epoch_insert_in_free_list(_In_ ebpf_epoch_allocation_header_t* header);
@@ -276,11 +278,18 @@ ebpf_epoch_initiate()
cpu_entry->current_epoch = 1;
ebpf_list_initialize(&cpu_entry->epoch_state_list);
ebpf_list_initialize(&cpu_entry->free_list);
LARGE_INTEGER interval;
interval.QuadPart = EBPF_EPOCH_FLUSH_DELAY_IN_NANOSECONDS / EBPF_NANO_SECONDS_PER_FILETIME_TICK;
ebpf_result_t result = ebpf_timed_work_queue_create(
&cpu_entry->work_queue, cpu_id, &interval, _ebpf_epoch_messenger_worker, cpu_entry);
if (result != EBPF_SUCCESS) {
return_value = result;
goto Error;
}
}
KeInitializeDpc(&_ebpf_epoch_compute_release_epoch_dpc, _ebpf_epoch_messenger_worker, NULL);
KeInitializeDpc(&_ebpf_epoch_timer_dpc, _ebpf_epoch_timer_worker, NULL);
KeSetTargetProcessorDpc(&_ebpf_epoch_compute_release_epoch_dpc, 0);
KeSetTargetProcessorDpc(&_ebpf_epoch_timer_dpc, 0);
KeInitializeTimer(&_ebpf_epoch_compute_release_epoch_timer);
@@ -304,12 +313,13 @@ ebpf_epoch_terminate()
}
rundown_message.message_type = EBPF_EPOCH_CPU_MESSAGE_TYPE_RUNDOWN_IN_PROGRESS;
rundown_message.wake_behavior = EBPF_WORK_QUEUE_WAKEUP_ON_INSERT;
_ebpf_epoch_send_message_and_wait(&rundown_message, 0);
// Cancel the timer.
KeCancelTimer(&_ebpf_epoch_compute_release_epoch_timer);
// Wait for the DPC to complete.
// Wait for the active DPC to complete.
KeFlushQueuedDpcs();
for (cpu_id = 0; cpu_id < _ebpf_epoch_cpu_count; cpu_id++) {
@@ -317,6 +327,7 @@ ebpf_epoch_terminate()
// Release all memory that is still in the free list.
_ebpf_epoch_release_free_list(cpu_entry, MAXINT64);
ebpf_assert(ebpf_list_is_empty(&cpu_entry->free_list));
ebpf_timed_work_queue_destroy(cpu_entry->work_queue);
}
// Wait for all work items to complete.
@@ -331,9 +342,9 @@ ebpf_epoch_terminate()
}
#pragma warning(push)
#pragma warning( \
disable : 28166) // warning C28166: The function 'ebpf_epoch_enter' does not restore the IRQL to the value that was
// current at function entry and is required to do so. IRQL was last set to 2 at line 334.
#pragma warning(disable : 28166) // warning C28166: Code analysis incorrectly reports that the function
// 'ebpf_epoch_enter' does not restore the IRQL to the value that was current at
// function entry.
_IRQL_requires_same_ void
ebpf_epoch_enter(_Out_ ebpf_epoch_state_t* epoch_state)
{
@@ -350,9 +361,8 @@ ebpf_epoch_enter(_Out_ ebpf_epoch_state_t* epoch_state)
#pragma warning(push)
#pragma warning( \
disable : 28166) // warning C28166: The function 'ebpf_epoch_exit' does not restore the IRQL to the value that was
// current at function entry and is required to do so. IRQL was last set to 2 at line 353.
// the IRQL.
disable : 28166) // warning C28166: Code analysis incorrectly reports that the function 'ebpf_epoch_exit'
// does not restore the IRQL to the value that was current at function entry.
_IRQL_requires_same_ void
ebpf_epoch_exit(_In_ ebpf_epoch_state_t* epoch_state)
{
@@ -377,6 +387,7 @@ ebpf_epoch_exit(_In_ ebpf_epoch_state_t* epoch_state)
ebpf_epoch_cpu_message_t message = {0};
message.message_type = EBPF_EPOCH_CPU_MESSAGE_TYPE_EXIT_EPOCH;
message.message.exit_epoch.epoch_state = epoch_state;
message.wake_behavior = EBPF_WORK_QUEUE_WAKEUP_ON_INSERT;
// The other CPU will call ebpf_epoch_exit() on our behalf.
// The epoch was entered at < DISPATCH_LEVEL but will now exit at DISPATCH_LEVEL. To prevent the assert above
@@ -394,6 +405,11 @@ ebpf_epoch_exit(_In_ ebpf_epoch_state_t* epoch_state)
ebpf_list_remove_entry(&epoch_state->epoch_list_entry);
_ebpf_epoch_arm_timer_if_needed(&_ebpf_epoch_cpu_table[cpu_id]);
// If there are items in the work queue, flush them.
if (!ebpf_timed_work_queue_is_empty(_ebpf_epoch_cpu_table[cpu_id].work_queue)) {
ebpf_timed_work_queued_flush(_ebpf_epoch_cpu_table[cpu_id].work_queue);
}
_ebpf_epoch_lower_to_previous_irql(epoch_state->irql_at_enter);
}
#pragma warning(pop)
@@ -496,6 +512,7 @@ ebpf_epoch_flush()
ebpf_epoch_cpu_message_t message = {0};
message.message_type = EBPF_EPOCH_CPU_MESSAGE_TYPE_PROPOSE_RELEASE_EPOCH;
message.wake_behavior = EBPF_WORK_QUEUE_WAKEUP_ON_INSERT;
_ebpf_epoch_send_message_and_wait(&message, 0);
}
@@ -506,6 +523,7 @@ ebpf_epoch_is_free_list_empty(uint32_t cpu_id)
ebpf_epoch_cpu_message_t message = {0};
message.message_type = EBPF_EPOCH_CPU_MESSAGE_TYPE_IS_FREE_LIST_EMPTY;
message.wake_behavior = EBPF_WORK_QUEUE_WAKEUP_ON_INSERT;
_ebpf_epoch_send_message_and_wait(&message, cpu_id);
@@ -653,9 +671,9 @@ _Function_class_(KDEFERRED_ROUTINE) _IRQL_requires_(DISPATCH_LEVEL) static void
_ebpf_epoch_skipped_timers = 0;
memset(&_ebpf_epoch_compute_release_epoch_message, 0, sizeof(_ebpf_epoch_compute_release_epoch_message));
_ebpf_epoch_compute_release_epoch_message.message_type = EBPF_EPOCH_CPU_MESSAGE_TYPE_PROPOSE_RELEASE_EPOCH;
_ebpf_epoch_compute_release_epoch_message.wake_behavior = EBPF_WORK_QUEUE_WAKEUP_ON_TIMER;
KeInitializeEvent(&_ebpf_epoch_compute_release_epoch_message.completion_event, NotificationEvent, false);
KeSetTargetProcessorDpc(&_ebpf_epoch_compute_release_epoch_dpc, 0);
KeInsertQueueDpc(&_ebpf_epoch_compute_release_epoch_dpc, &_ebpf_epoch_compute_release_epoch_message, NULL);
_ebpf_epoch_send_message_async(&_ebpf_epoch_compute_release_epoch_message, 0);
} else {
_ebpf_epoch_skipped_timers++;
LARGE_INTEGER due_time;
@@ -668,10 +686,7 @@ _Function_class_(KDEFERRED_ROUTINE) _IRQL_requires_(DISPATCH_LEVEL) static void
* @brief Worker invoked when a message is sent between CPUs.
*/
typedef void (*ebpf_epoch_messenger_worker_t)(
_In_ KDPC* dpc,
_Inout_ ebpf_epoch_cpu_entry_t* cpu_entry,
_Inout_ ebpf_epoch_cpu_message_t* message,
uint32_t current_cpu);
_Inout_ ebpf_epoch_cpu_entry_t* cpu_entry, _Inout_ ebpf_epoch_cpu_message_t* message, uint32_t current_cpu);
/**
* @brief Compute the next proposed release epoch and send it to the next CPU.
@@ -683,20 +698,14 @@ typedef void (*ebpf_epoch_messenger_worker_t)(
* last CPU forwards the message to the next CPU. The last CPU then sends an
* EBPF_EPOCH_CPU_MESSAGE_TYPE_COMMIT_RELEASE_EPOCH message to CPU 0 with the final proposed release epoch.
*
* @param[in] dpc DPC that triggered this function.
* @param[in] cpu_entry CPU entry to compute the epoch for.
* @param[in] message Message to process.
* @param[in] current_cpu Current CPU.
*/
void
_ebpf_epoch_messenger_propose_release_epoch(
_In_ KDPC* dpc,
_Inout_ ebpf_epoch_cpu_entry_t* cpu_entry,
_Inout_ ebpf_epoch_cpu_message_t* message,
uint32_t current_cpu)
_Inout_ ebpf_epoch_cpu_entry_t* cpu_entry, _Inout_ ebpf_epoch_cpu_message_t* message, uint32_t current_cpu)
{
UNREFERENCED_PARAMETER(dpc);
// Walk over each thread_entry in the epoch_state_list and compute the minimum epoch.
ebpf_list_entry_t* entry = cpu_entry->epoch_state_list.Flink;
ebpf_epoch_state_t* epoch_state;
@@ -738,9 +747,7 @@ _ebpf_epoch_messenger_propose_release_epoch(
next_cpu = current_cpu + 1;
}
// Send the message to the next CPU.
KeSetTargetProcessorDpc(dpc, (uint8_t)next_cpu);
KeInsertQueueDpc(dpc, message, NULL);
_ebpf_epoch_send_message_async(message, next_cpu);
}
/**
@@ -755,19 +762,16 @@ _ebpf_epoch_messenger_propose_release_epoch(
* 5. Forwards the message to the next CPU.
* The last CPU then sends an EBPF_EPOCH_CPU_MESSAGE_TYPE_PROPOSE_EPOCH_COMPLETE message to CPU 0.
*
* @param[in] dpc DPC that triggered this function.
* @param[in] cpu_entry CPU entry to rearm the timer for.
* @param[in] message Message to process.
* @param[in] current_cpu Current CPU.
*/
void
_ebpf_epoch_messenger_commit_release_epoch(
_In_ KDPC* dpc,
_Inout_ ebpf_epoch_cpu_entry_t* cpu_entry,
_Inout_ ebpf_epoch_cpu_message_t* message,
uint32_t current_cpu)
_Inout_ ebpf_epoch_cpu_entry_t* cpu_entry, _Inout_ ebpf_epoch_cpu_message_t* message, uint32_t current_cpu)
{
uint32_t next_cpu;
cpu_entry->timer_armed = false;
// Set the released_epoch to the value computed by the EBPF_EPOCH_CPU_MESSAGE_TYPE_PROPOSE_RELEASE_EPOCH message.
cpu_entry->released_epoch = message->message.commit_epoch.released_epoch - 1;
@@ -781,9 +785,7 @@ _ebpf_epoch_messenger_commit_release_epoch(
next_cpu = 0;
}
// Send the message to the next CPU.
KeSetTargetProcessorDpc(dpc, (uint8_t)next_cpu);
KeInsertQueueDpc(dpc, message, NULL);
_ebpf_epoch_send_message_async(message, next_cpu);
_ebpf_epoch_release_free_list(cpu_entry, cpu_entry->released_epoch);
}
@@ -795,21 +797,17 @@ _ebpf_epoch_messenger_commit_release_epoch(
* CPU 0 clears the epoch computation in progress flag and signals the KEVENT associated with the message to signal any
* waiting threads that the operation is completed.
*
* @param[in] dpc DPC that triggered this function.
* @param[in] cpu_entry CPU entry to mark the computation as complete for.
* @param[in] message Message to process.
* @param[in] current_cpu Current CPU.
*/
void
_ebpf_epoch_messenger_compute_epoch_complete(
_In_ KDPC* dpc,
_Inout_ ebpf_epoch_cpu_entry_t* cpu_entry,
_Inout_ ebpf_epoch_cpu_message_t* message,
uint32_t current_cpu)
_Inout_ ebpf_epoch_cpu_entry_t* cpu_entry, _Inout_ ebpf_epoch_cpu_message_t* message, uint32_t current_cpu)
{
UNREFERENCED_PARAMETER(current_cpu);
// If this is the timer's message, then mark the computation as complete.
if (dpc == &_ebpf_epoch_compute_release_epoch_dpc) {
if (message == &_ebpf_epoch_compute_release_epoch_message) {
cpu_entry->epoch_computation_in_progress = false;
} else {
// This is an adhoc flush. Signal the caller that the flush is complete.
@@ -824,19 +822,14 @@ _ebpf_epoch_messenger_compute_epoch_complete(
* The CPU removes the ebpf_epoch_state_t from the per-CPU thread list and signals the KEVENT associated with the
* message to signal any waiting threads that the operation is completed.
*
* @param[in] dpc DPC that triggered this function.
* @param[in] cpu_entry CPU entry to call ebpf_epoch_exit() for.
* @param[in] message Message to process.
* @param[in] current_cpu Current CPU.
*/
void
_ebpf_epoch_messenger_exit_epoch(
_In_ KDPC* dpc,
_Inout_ ebpf_epoch_cpu_entry_t* cpu_entry,
_Inout_ ebpf_epoch_cpu_message_t* message,
uint32_t current_cpu)
_Inout_ ebpf_epoch_cpu_entry_t* cpu_entry, _Inout_ ebpf_epoch_cpu_message_t* message, uint32_t current_cpu)
{
UNREFERENCED_PARAMETER(dpc);
UNREFERENCED_PARAMETER(current_cpu);
UNREFERENCED_PARAMETER(cpu_entry);
@@ -850,20 +843,15 @@ _ebpf_epoch_messenger_exit_epoch(
* Message is sent to each CPU to notify it that epoch code is shutting down and that no future timers should be armed
* and future messages should be ignored.
*
* @param[in] dpc DPC that triggered this function.
* @param[in] cpu_entry CPU entry to set the flag for.
* @param[in] message Message to process.
* @param[in] current_cpu Current CPU.
*/
void
_ebpf_epoch_messenger_rundown_in_progress(
_In_ KDPC* dpc,
_Inout_ ebpf_epoch_cpu_entry_t* cpu_entry,
_Inout_ ebpf_epoch_cpu_message_t* message,
uint32_t current_cpu)
_Inout_ ebpf_epoch_cpu_entry_t* cpu_entry, _Inout_ ebpf_epoch_cpu_message_t* message, uint32_t current_cpu)
{
uint32_t next_cpu;
UNREFERENCED_PARAMETER(dpc);
cpu_entry->rundown_in_progress = true;
// If this is the last CPU, then stop.
if (current_cpu != _ebpf_epoch_cpu_count - 1) {
@@ -877,9 +865,7 @@ _ebpf_epoch_messenger_rundown_in_progress(
return;
}
// Send the message to the next CPU.
KeSetTargetProcessorDpc(dpc, (uint8_t)next_cpu);
KeInsertQueueDpc(dpc, message, NULL);
_ebpf_epoch_send_message_async(message, next_cpu);
}
/**
@@ -887,19 +873,14 @@ _ebpf_epoch_messenger_rundown_in_progress(
* EBPF_EPOCH_CPU_MESSAGE_TYPE_IS_FREE_LIST_EMPTY message:
* Message is sent to each CPU to query if its local free list is empty.
*
* @param[in] dpc DPC that triggered this function.
* @param[in] cpu_entry CPU entry to check.
* @param[in] message Message to process.
* @param[in] current_cpu Current CPU.
*/
void
_ebpf_epoch_messenger_is_free_list_empty(
_In_ KDPC* dpc,
_Inout_ ebpf_epoch_cpu_entry_t* cpu_entry,
_Inout_ ebpf_epoch_cpu_message_t* message,
uint32_t current_cpu)
_Inout_ ebpf_epoch_cpu_entry_t* cpu_entry, _Inout_ ebpf_epoch_cpu_message_t* message, uint32_t current_cpu)
{
UNREFERENCED_PARAMETER(dpc);
UNREFERENCED_PARAMETER(current_cpu);
message->message.is_free_list_empty.is_empty = ebpf_list_is_empty(&cpu_entry->free_list);
KeSetEvent(&message->completion_event, 0, FALSE);
@@ -921,33 +902,28 @@ static ebpf_epoch_messenger_worker_t _ebpf_epoch_messenger_workers[] = {
*
* If rundown is in progress, then the message is ignored.
*
* @param[in] dpc DPC that triggered this function.
* @param[in] context Context passed to the DPC - not used.
* @param[in] arg1 The ebpf_epoch_cpu_message_t to process.
* @param[in] arg2 Not used.
* @param[in] list_entry List entry that contains the message to process.
*/
_Function_class_(KDEFERRED_ROUTINE) _IRQL_requires_(DISPATCH_LEVEL) static void _ebpf_epoch_messenger_worker(
_In_ KDPC* dpc, _In_opt_ void* context, _In_opt_ void* arg1, _In_opt_ void* arg2)
_IRQL_requires_(DISPATCH_LEVEL) static void _ebpf_epoch_messenger_worker(
_Inout_ void* context, uint32_t cpu_id, _Inout_ ebpf_list_entry_t* list_entry)
{
UNREFERENCED_PARAMETER(context);
UNREFERENCED_PARAMETER(arg2);
uint32_t cpu_id = ebpf_get_current_cpu();
ebpf_assert(ebpf_get_current_cpu() == cpu_id);
ebpf_epoch_cpu_entry_t* cpu_entry = &_ebpf_epoch_cpu_table[cpu_id];
ebpf_epoch_cpu_message_t* message = (ebpf_epoch_cpu_message_t*)arg1;
ebpf_epoch_cpu_message_t* message = CONTAINING_RECORD(list_entry, ebpf_epoch_cpu_message_t, list_entry);
// If rundown is in progress, then exit immediately.
if (cpu_entry->rundown_in_progress) {
return;
}
_Analysis_assume_(arg1 != NULL);
if (message->message_type >= EBPF_COUNT_OF(_ebpf_epoch_messenger_workers) || message->message_type < 0) {
ebpf_assert(!"Invalid message type");
return;
}
_ebpf_epoch_messenger_workers[message->message_type](dpc, cpu_entry, message, cpu_id);
_ebpf_epoch_messenger_workers[message->message_type](cpu_entry, message, cpu_id);
}
/**
@@ -955,26 +931,37 @@ _Function_class_(KDEFERRED_ROUTINE) _IRQL_requires_(DISPATCH_LEVEL) static void
*
* @param[in] message Message to send.
* @param[in] cpu_id CPU to send the message to.
*/
_IRQL_requires_max_(APC_LEVEL) static void _ebpf_epoch_send_message_and_wait(
_In_ ebpf_epoch_cpu_message_t* message, uint32_t cpu_id)
{
KDPC dpc;
// Initialize the completion event.
KeInitializeEvent(&message->completion_event, NotificationEvent, FALSE);
// Initialize the dpc.
KeInitializeDpc(&dpc, _ebpf_epoch_messenger_worker, NULL);
KeSetTargetProcessorDpc(&dpc, (uint8_t)cpu_id);
// Send the message.
KeInsertQueueDpc(&dpc, message, NULL);
// Queue the message to the specified CPU.
ebpf_timed_work_queue_insert(
_ebpf_epoch_cpu_table[cpu_id].work_queue, &message->list_entry, message->wake_behavior);
// Wait for the message to complete.
KeWaitForSingleObject(&message->completion_event, Executive, KernelMode, FALSE, NULL);
}
/**
* @brief Send a message to the specified CPU asynchronously.
*
* @param[in] message Message to send.
* @param[in] cpu_id CPU to send the message to.
*/
static void
_ebpf_epoch_send_message_async(_In_ ebpf_epoch_cpu_message_t* message, uint32_t cpu_id)
{
// Queue the message to the specified CPU.
ebpf_timed_work_queue_insert(
_ebpf_epoch_cpu_table[cpu_id].work_queue, &message->list_entry, message->wake_behavior);
}
/**
* @brief Callback for ebpf_preemptible_work_item_t that calls the callback
* function in ebpf_epoch_work_item_t.
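
To make the ring protocol described in the comments above concrete, here is a minimal user-mode sketch of the two passes (propose, then commit). CPU_COUNT and cpu_min_epochs are hypothetical stand-ins; the kernel code forwards a single message from CPU to CPU through the per-CPU timed work queues rather than looping.

// Illustrative sketch only; not part of the commit.
#include <stdint.h>
#include <stdio.h>

#define CPU_COUNT 4

int main(void)
{
    // Minimum epoch still in use on each CPU (INT64_MAX when idle).
    int64_t cpu_min_epochs[CPU_COUNT] = {7, 5, 9, 6};

    // Propose pass: the message visits CPUs in order; each CPU lowers the
    // proposal to its own minimum active epoch.
    int64_t proposed_epoch = INT64_MAX;
    for (uint32_t cpu = 0; cpu < CPU_COUNT; cpu++) {
        if (cpu_min_epochs[cpu] < proposed_epoch) {
            proposed_epoch = cpu_min_epochs[cpu];
        }
    }

    // Commit pass: each CPU records released_epoch = proposal - 1 and may
    // free any entry whose epoch is at or below it.
    int64_t released_epoch = proposed_epoch - 1;
    printf("Entries with epoch <= %lld are safe to free on every CPU.\n", (long long)released_epoch);
    return 0;
}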


@@ -0,0 +1,154 @@
// Copyright (c) Microsoft Corporation
// SPDX-License-Identifier: MIT
#include "ebpf_work_queue.h"
typedef struct _ebpf_timed_work_queue
{
KTIMER timer;
KDPC dpc;
ebpf_list_entry_t work_items;
ebpf_lock_t lock;
bool timer_armed;
LARGE_INTEGER interval;
ebpf_timed_work_queue_callback_t callback;
void* context;
uint32_t cpu_id;
} ebpf_timed_work_queue_t;
KDEFERRED_ROUTINE _ebpf_timed_work_queue_timer_callback;
void
_ebpf_timed_work_queue_timer_callback(
_In_ KDPC* dpc, _In_opt_ void* deferred_context, _In_opt_ void* system_argument1, _In_opt_ void* system_argument2);
_Must_inspect_result_ ebpf_result_t
ebpf_timed_work_queue_create(
_Out_ ebpf_timed_work_queue_t** work_queue,
uint32_t cpu_id,
_In_ LARGE_INTEGER* interval,
_In_ ebpf_timed_work_queue_callback_t callback,
_In_ void* context)
{
ebpf_timed_work_queue_t* local_work_queue = NULL;
ebpf_result_t return_value;
local_work_queue = ebpf_allocate(sizeof(ebpf_timed_work_queue_t));
if (!local_work_queue) {
return_value = EBPF_NO_MEMORY;
goto Done;
}
local_work_queue->callback = callback;
local_work_queue->context = context;
local_work_queue->interval = *interval;
local_work_queue->cpu_id = cpu_id;
ebpf_lock_create(&local_work_queue->lock);
ebpf_list_initialize(&local_work_queue->work_items);
KeInitializeTimer(&local_work_queue->timer);
KeInitializeDpc(&local_work_queue->dpc, _ebpf_timed_work_queue_timer_callback, local_work_queue);
KeSetTargetProcessorDpc(&local_work_queue->dpc, (CCHAR)cpu_id);
*work_queue = local_work_queue;
local_work_queue = NULL;
return_value = EBPF_SUCCESS;
Done:
ebpf_timed_work_queue_destroy(local_work_queue);
return return_value;
}
void
ebpf_timed_work_queue_destroy(_In_opt_ ebpf_timed_work_queue_t* work_queue)
{
if (!work_queue) {
return;
}
// Cancel the timer.
KeCancelTimer(&work_queue->timer);
// Wait for the DPC to complete.
KeFlushQueuedDpcs();
// Destroy the lock.
ebpf_lock_destroy(&work_queue->lock);
// Free the work queue.
ebpf_free(work_queue);
}
void
ebpf_timed_work_queue_insert(
_In_ ebpf_timed_work_queue_t* work_queue,
_In_ ebpf_list_entry_t* work_item,
ebpf_work_queue_wakeup_behavior_t wake_behavior)
{
ebpf_lock_state_t lock_state;
bool timer_armed;
lock_state = ebpf_lock_lock(&work_queue->lock);
timer_armed = work_queue->timer_armed;
ebpf_list_insert_tail(&work_queue->work_items, work_item);
if (wake_behavior == EBPF_WORK_QUEUE_WAKEUP_ON_INSERT) {
KeCancelTimer(&work_queue->timer);
work_queue->timer_armed = false;
KeInsertQueueDpc(&work_queue->dpc, NULL, NULL);
} else if (!timer_armed) {
LARGE_INTEGER due_time;
due_time.QuadPart = -work_queue->interval.QuadPart;
KeSetTimer(&work_queue->timer, due_time, &work_queue->dpc);
work_queue->timer_armed = true;
}
ebpf_lock_unlock(&work_queue->lock, lock_state);
}
bool
ebpf_timed_work_queue_is_empty(_In_ ebpf_timed_work_queue_t* work_queue)
{
return ebpf_list_is_empty(&work_queue->work_items);
}
void
ebpf_timed_work_queued_flush(_In_ ebpf_timed_work_queue_t* work_queue)
{
ebpf_lock_state_t lock_state;
ebpf_list_entry_t* work_item;
lock_state = ebpf_lock_lock(&work_queue->lock);
if (work_queue->timer_armed) {
KeCancelTimer(&work_queue->timer);
work_queue->timer_armed = false;
}
while (!ebpf_list_is_empty(&work_queue->work_items)) {
work_item = work_queue->work_items.Flink;
ebpf_list_remove_entry(work_item);
ebpf_lock_unlock(&work_queue->lock, lock_state);
work_queue->callback(work_queue->context, work_queue->cpu_id, work_item);
lock_state = ebpf_lock_lock(&work_queue->lock);
}
ebpf_lock_unlock(&work_queue->lock, lock_state);
}
void
_ebpf_timed_work_queue_timer_callback(
_In_ KDPC* dpc, _In_opt_ void* context, _In_opt_ void* system_argument1, _In_opt_ void* system_argument2)
{
UNREFERENCED_PARAMETER(dpc);
UNREFERENCED_PARAMETER(system_argument1);
UNREFERENCED_PARAMETER(system_argument2);
ebpf_timed_work_queue_t* work_queue = (ebpf_timed_work_queue_t*)context;
if (work_queue) {
ebpf_timed_work_queued_flush(work_queue);
}
}


@@ -0,0 +1,92 @@
// Copyright (c) Microsoft Corporation
// SPDX-License-Identifier: MIT
#pragma once
#include "ebpf_platform.h"
#ifdef __cplusplus
extern "C"
{
#endif
/**
* @brief A timed work queue. Entries in the work queue are executed
* either after a fixed interval or when the queue is polled. The purpose of this
* work queue is to allow for deferred execution of work items that are not
* time critical and can be batched together with other work via the poll API.
*/
typedef struct _ebpf_timed_work_queue ebpf_timed_work_queue_t;
typedef _IRQL_requires_(DISPATCH_LEVEL) void (*ebpf_timed_work_queue_callback_t)(
_Inout_ void* context, uint32_t cpu_id, _Inout_ ebpf_list_entry_t* work_item);
typedef enum _ebpf_work_queue_wakeup_behavior
{
EBPF_WORK_QUEUE_WAKEUP_ON_INSERT = 0, ///< Process the work item as soon as it is inserted.
EBPF_WORK_QUEUE_WAKEUP_ON_TIMER = 1, ///< Defer processing until the timer fires.
} ebpf_work_queue_wakeup_behavior_t;
/**
* @brief Create a timed work queue.
*
* @param[out] work_queue Pointer to memory that contains the work queue on success.
* @param[in] cpu_id The CPU to run the work queue on.
* @param[in] interval The interval at which to run the work queue.
* @param[in] callback The callback to execute for each work item.
* @param[in] context The context to pass to the callback.
*
* @retval EBPF_SUCCESS The operation was successful.
* @retval EBPF_NO_MEMORY Unable to allocate resources for this
* operation.
*/
_Must_inspect_result_ ebpf_result_t
ebpf_timed_work_queue_create(
_Out_ ebpf_timed_work_queue_t** work_queue,
uint32_t cpu_id,
_In_ LARGE_INTEGER* interval,
_In_ ebpf_timed_work_queue_callback_t callback,
_In_ void* context);
/**
* @brief Destroy a timed work queue.
*
* @param[in] work_queue The timed work queue to destroy.
*/
void
ebpf_timed_work_queue_destroy(_In_opt_ ebpf_timed_work_queue_t* work_queue);
/**
* @brief Insert a work item into the timed work queue. If wake_behavior is EBPF_WORK_QUEUE_WAKEUP_ON_INSERT, the
* work item is processed immediately; otherwise it is processed when the timer next fires.
*
* @param[in] work_queue The work queue to insert the work item into.
* @param[in] work_item The work item to insert.
* @param[in] wake_behavior EBPF_WORK_QUEUE_WAKEUP_ON_INSERT to process the item immediately, or
* EBPF_WORK_QUEUE_WAKEUP_ON_TIMER to defer it to the timer.
*/
void
ebpf_timed_work_queue_insert(
_In_ ebpf_timed_work_queue_t* work_queue,
_In_ ebpf_list_entry_t* work_item,
ebpf_work_queue_wakeup_behavior_t wake_behavior);
/**
* @brief Check if the timed work queue is empty without acquiring the lock.
*
* @param[in] work_queue The work queue to check.
* @return true The work queue is empty.
* @return false The work queue is not empty.
*/
bool
ebpf_timed_work_queue_is_empty(_In_ ebpf_timed_work_queue_t* work_queue);
/**
* @brief Execute the callback for all work items in the timed work queue.
*
* @param[in] work_queue The work queue to execute the callback for.
*/
void
ebpf_timed_work_queued_flush(_In_ ebpf_timed_work_queue_t* work_queue);
#ifdef __cplusplus
}
#endif
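
As a usage illustration of this header, here is a minimal sketch. The names my_work_item_t, my_callback, and example are hypothetical, not part of the API; error handling and IRQL annotations are elided, and the callback is assumed to run at DISPATCH_LEVEL as the typedef requires.

#include "ebpf_platform.h"
#include "ebpf_work_queue.h"

// Sketch only: the caller embeds the list entry in its own record so the
// callback can recover the record with CONTAINING_RECORD.
typedef struct _my_work_item
{
    ebpf_list_entry_t list_entry;
    int payload;
} my_work_item_t;

static void
my_callback(_Inout_ void* context, uint32_t cpu_id, _Inout_ ebpf_list_entry_t* entry)
{
    UNREFERENCED_PARAMETER(context);
    UNREFERENCED_PARAMETER(cpu_id);
    my_work_item_t* item = CONTAINING_RECORD(entry, my_work_item_t, list_entry);
    item->payload++; // Process the work item.
}

static void
example(void)
{
    ebpf_timed_work_queue_t* queue = NULL;
    static int callback_context;
    LARGE_INTEGER interval;
    interval.QuadPart = 10 * 1000 * 100; // 100ms in 100ns FILETIME ticks.
    my_work_item_t item = {0};
    ebpf_list_initialize(&item.list_entry);

    if (ebpf_timed_work_queue_create(&queue, 0, &interval, my_callback, &callback_context) != EBPF_SUCCESS) {
        return;
    }
    // Batch with other items; the callback runs when the timer fires.
    ebpf_timed_work_queue_insert(queue, &item.list_entry, EBPF_WORK_QUEUE_WAKEUP_ON_TIMER);
    // Or drain everything immediately from a polling path.
    ebpf_timed_work_queued_flush(queue);
    ebpf_timed_work_queue_destroy(queue);
}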


@@ -45,6 +45,7 @@
<ClCompile Include="..\ebpf_ring_buffer.c" />
<ClCompile Include="..\ebpf_state.c" />
<ClCompile Include="..\ebpf_trampoline.c" />
<ClCompile Include="..\ebpf_work_queue.c" />
<ClCompile Include="ebpf_fault_injection_kernel.c" />
<ClCompile Include="ebpf_handle_kernel.c" />
<ClCompile Include="ebpf_native_kernel.c" />
@@ -72,6 +73,7 @@
<ClInclude Include="..\ebpf_ring_buffer.h" />
<ClInclude Include="..\ebpf_serialize.h" />
<ClInclude Include="..\ebpf_state.h" />
<ClInclude Include="..\ebpf_work_queue.h" />
<ClInclude Include="framework.h" />
<ClInclude Include="stdbool.h" />
<ClInclude Include="stdint.h" />


@@ -73,6 +73,9 @@
<ClCompile Include="..\ebpf_random.c">
<Filter>Source Files</Filter>
</ClCompile>
<ClCompile Include="..\ebpf_work_queue.c">
<Filter>Source Files</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\ebpf_epoch.h">
@@ -147,5 +150,8 @@
<ClInclude Include="..\..\..\external\usersim\cxplat\inc\winkernel\cxplat_winkernel.h">
<Filter>Header Files</Filter>
</ClInclude>
<ClInclude Include="..\ebpf_work_queue.h">
<Filter>Header Files</Filter>
</ClInclude>
</ItemGroup>
</Project>


@@ -17,6 +17,7 @@
#include "ebpf_ring_buffer.h"
#include "ebpf_serialize.h"
#include "ebpf_state.h"
#include "ebpf_work_queue.h"
#include "helpers.h"
#include <winsock2.h>
@@ -1225,3 +1226,81 @@ TEST_CASE("verify random", "[platform]")
// Verify that the random number generators do not have a dominant frequency.
REQUIRE(!has_dominant_frequency(SEQUENCE_LENGTH, ebpf_random_uint32));
}
TEST_CASE("work_queue", "[platform]")
{
_test_helper test_helper;
test_helper.initialize();
struct _work_item_context
{
LIST_ENTRY list_entry;
KEVENT completion_event;
} work_item_context;
ebpf_list_initialize(&work_item_context.list_entry);
KeInitializeEvent(&work_item_context.completion_event, NotificationEvent, FALSE);
ebpf_timed_work_queue_t* work_queue;
LARGE_INTEGER interval;
interval.QuadPart = 10 * 1000 * 100; // 100ms
int context = 1;
REQUIRE(
ebpf_timed_work_queue_create(
&work_queue,
0,
&interval,
[](_Inout_ void* context, uint32_t cpu_id, _Inout_ ebpf_list_entry_t* entry) {
UNREFERENCED_PARAMETER(context);
UNREFERENCED_PARAMETER(cpu_id);
auto work_item_context = reinterpret_cast<_work_item_context*>(entry);
KeSetEvent(&work_item_context->completion_event, 0, FALSE);
},
&context) == EBPF_SUCCESS);
// Unique ptr that will call ebpf_timed_work_queue_destroy when it goes out of scope.
std::unique_ptr<ebpf_timed_work_queue_t, decltype(&ebpf_timed_work_queue_destroy)> work_queue_ptr(
work_queue, &ebpf_timed_work_queue_destroy);
// Queue a work item without flush.
ebpf_timed_work_queue_insert(work_queue, &work_item_context.list_entry, EBPF_WORK_QUEUE_WAKEUP_ON_TIMER);
LARGE_INTEGER timeout = {0};
// Verify that the work item is not signaled immediately.
REQUIRE(
KeWaitForSingleObject(&work_item_context.completion_event, Executive, KernelMode, FALSE, &timeout) ==
STATUS_TIMEOUT);
// Verify the queue is not empty.
REQUIRE(ebpf_timed_work_queue_is_empty(work_queue) == false);
timeout.QuadPart = -10 * 1000 * 1000; // 1s
// Verify that the work item is signaled after 100ms.
REQUIRE(
KeWaitForSingleObject(&work_item_context.completion_event, Executive, KernelMode, FALSE, &timeout) ==
STATUS_SUCCESS);
// Queue a work item with flush.
ebpf_timed_work_queue_insert(work_queue, &work_item_context.list_entry, EBPF_WORK_QUEUE_WAKEUP_ON_INSERT);
// Wait for active DPCs to complete.
KeFlushQueuedDpcs();
// Verify the queue is now empty.
REQUIRE(ebpf_timed_work_queue_is_empty(work_queue) == true);
// Queue a work item without flush.
ebpf_timed_work_queue_insert(work_queue, &work_item_context.list_entry, EBPF_WORK_QUEUE_WAKEUP_ON_TIMER);
// Verify the queue is not empty.
REQUIRE(ebpf_timed_work_queue_is_empty(work_queue) == false);
// Process the work queue.
ebpf_timed_work_queued_flush(work_queue);
// Verify the queue is now empty.
REQUIRE(ebpf_timed_work_queue_is_empty(work_queue) == true);
}


@@ -34,6 +34,7 @@
<ClCompile Include="..\ebpf_ring_buffer.c" />
<ClCompile Include="..\ebpf_state.c" />
<ClCompile Include="..\ebpf_trampoline.c" />
<ClCompile Include="..\ebpf_work_queue.c" />
<ClCompile Include="ebpf_handle_user.c" />
<ClCompile Include="ebpf_native_user.c" />
<ClCompile Include="ebpf_platform_user.cpp" />
@@ -48,6 +49,7 @@
<ClInclude Include="..\ebpf_random.h" />
<ClInclude Include="..\ebpf_ring_buffer.h" />
<ClInclude Include="..\ebpf_state.h" />
<ClInclude Include="..\ebpf_work_queue.h" />
<ClInclude Include="framework.h" />
</ItemGroup>
<PropertyGroup Label="Globals">


@@ -76,6 +76,9 @@
<ClCompile Include="..\..\shared\tracelog.c">
<Filter>Source Files</Filter>
</ClCompile>
<ClCompile Include="..\ebpf_work_queue.c">
<Filter>Source Files</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<ClInclude Include="..\ebpf_epoch.h">
@@ -114,5 +117,8 @@
<ClInclude Include="..\..\..\external\usersim\cxplat\inc\cxplat.h">
<Filter>Header Files</Filter>
</ClInclude>
<ClInclude Include="..\ebpf_work_queue.h">
<Filter>Header Files</Filter>
</ClInclude>
</ItemGroup>
</Project>