From a0b093c940259fff20a9248e7064f839dcaea3f8 Mon Sep 17 00:00:00 2001 From: Alan Jowett Date: Wed, 11 Sep 2024 14:08:04 -0700 Subject: [PATCH] Enable add/remove CPU's from epoch consensus (#3771) Signed-off-by: Alan Jowett Co-authored-by: Alan Jowett --- libs/runtime/ebpf_epoch.c | 222 +++++++++++++++++++++++++++++---- libs/runtime/ebpf_epoch.h | 2 +- libs/runtime/ebpf_work_queue.c | 37 ++++-- libs/runtime/ebpf_work_queue.h | 11 ++ 4 files changed, 231 insertions(+), 41 deletions(-) diff --git a/libs/runtime/ebpf_epoch.c b/libs/runtime/ebpf_epoch.c index 3e4870c08..1a6b163fc 100644 --- a/libs/runtime/ebpf_epoch.c +++ b/libs/runtime/ebpf_epoch.c @@ -17,7 +17,23 @@ * 1) Each CPU determines the minimum epoch of all threads on the CPU. * 2) The minimum epoch is committed as the release epoch and any memory that is older than the release epoch is * released. - * 3) The epoch_computation_in_progress flag is cleared which allows the epoch computation to be initiated again. + * 3) The epoch_computation_in_progress flag is cleared which allows the epoch computation to be initiated again. + * + * Note: + * CPUs can be in one of three states: + * 1) Inactive: The CPU is not actively participating in epoch computation. + * Active flag is false. + * 2) Activating: The CPU is in the process of activating and is not yet active. + * Active flag is true and current_epoch == EBPF_EPOCH_UNKNOWN_EPOCH. + * 3) Active: The CPU is actively participating in epoch computation. + * Active flag is true and current_epoch != EBPF_EPOCH_UNKNOWN_EPOCH. + * + * All CPUs except CPU 0 are in the inactive state at initialization. CPU 0 is always active. + * + * CPUs transition between states as follows: + * 1) Inactive -> Activating: The CPU is activated when a thread enters an epoch and the CPU is not active. + * 2) Activating -> Active: The CPU is active when it is notified of the current epoch value. + * 3) Active -> Inactive: The CPU is deactivated when there are no threads in the epoch and the free list is empty. */ /** @@ -30,6 +46,16 @@ */ #define EBPF_NANO_SECONDS_PER_FILETIME_TICK 100 +/** + * @brief A sentinel value used to indicate that the epoch is unknown. + */ +#define EBPF_EPOCH_UNKNOWN_EPOCH 0 + +/** + * @brief The first valid epoch value. + */ +#define EBPF_EPOCH_FIRST_EPOCH 1 + #define EBPF_EPOCH_FAIL_FAST(REASON, ASSERTION) \ if (!(ASSERTION)) { \ ebpf_assert(!#ASSERTION); \ @@ -51,9 +77,19 @@ typedef __declspec(align(EBPF_CACHE_LINE_SIZE)) struct _ebpf_epoch_cpu_entry int timer_armed : 1; ///< Set if the flush timer is armed. int rundown_in_progress : 1; ///< Set if rundown is in progress. int epoch_computation_in_progress : 1; ///< Set if epoch computation is in progress. - ebpf_timed_work_queue_t* work_queue; ///< Work queue used to schedule work items. + int active : 1; ///< CPU is active in epoch computation. Only accessed under _ebpf_epoch_cpu_active_lock. + int work_queue_assigned : 1; ///< Work queue is assigned to this CPU. + ebpf_timed_work_queue_t* work_queue; ///< Work queue used to schedule work items. } ebpf_epoch_cpu_entry_t; +static_assert( + sizeof(ebpf_epoch_cpu_entry_t) % EBPF_CACHE_LINE_SIZE == 0, "ebpf_epoch_cpu_entry_t is not cache aligned"); + +/** + * @brief Lock to ensure a consistent view of the active CPUs. + */ +static ebpf_lock_t _ebpf_epoch_cpu_active_lock; ///< Lock to protect the active CPU list. + /** * @brief Table of per-CPU state. */ @@ -116,12 +152,12 @@ typedef struct _ebpf_epoch_cpu_message { struct { - uint64_t current_epoch; ///< The new current epoch. 
- uint64_t proposed_release_epoch; ///< Minimum epoch of all threads on the CPU. + int64_t current_epoch; ///< The new current epoch. + int64_t proposed_release_epoch; ///< Minimum epoch of all threads on the CPU. } propose_epoch; struct { - uint64_t released_epoch; ///< The newest epoch that can be released. + int64_t released_epoch; ///< The newest epoch that can be released. } commit_epoch; struct { @@ -224,6 +260,15 @@ static _IRQL_requires_(DISPATCH_LEVEL) void _ebpf_epoch_arm_timer_if_needed(ebpf static void _ebpf_epoch_work_item_callback(_In_ cxplat_preemptible_work_item_t* preemptible_work_item, void* context); +static void +_ebpf_epoch_activate_cpu(uint32_t cpu_id); + +static void +_ebpf_epoch_deactivate_cpu(uint32_t cpu_id); + +uint32_t +_ebpf_epoch_next_active_cpu(uint32_t cpu_id); + /** * @brief Raise the CPU's IRQL to DISPATCH_LEVEL if it is below DISPATCH_LEVEL. * First check if the IRQL is below DISPATCH_LEVEL to avoid the overhead of @@ -278,12 +323,13 @@ ebpf_epoch_initiate() goto Error; } + ebpf_lock_create(&_ebpf_epoch_cpu_active_lock); + ebpf_assert(EBPF_CACHE_ALIGN_POINTER(_ebpf_epoch_cpu_table) == _ebpf_epoch_cpu_table); // Initialize the per-CPU state. for (uint32_t cpu_id = 0; cpu_id < _ebpf_epoch_cpu_count; cpu_id++) { ebpf_epoch_cpu_entry_t* cpu_entry = &_ebpf_epoch_cpu_table[cpu_id]; - cpu_entry->current_epoch = 1; ebpf_list_initialize(&cpu_entry->epoch_state_list); ebpf_list_initialize(&cpu_entry->free_list); } @@ -302,6 +348,12 @@ ebpf_epoch_initiate() } } + // CPU 0 is always active. + _ebpf_epoch_activate_cpu(0); + + // Set the current epoch for CPU 0. + _ebpf_epoch_cpu_table[0].current_epoch = EBPF_EPOCH_FIRST_EPOCH; + KeInitializeDpc(&_ebpf_epoch_timer_dpc, _ebpf_epoch_timer_worker, NULL); KeSetTargetProcessorDpc(&_ebpf_epoch_timer_dpc, 0); @@ -358,6 +410,7 @@ ebpf_epoch_terminate() cxplat_free( _ebpf_epoch_cpu_table, CXPLAT_POOL_FLAG_NON_PAGED | CXPLAT_POOL_FLAG_CACHE_ALIGNED, EBPF_POOL_TAG_EPOCH); _ebpf_epoch_cpu_table = NULL; + EBPF_RETURN_VOID(); } @@ -376,6 +429,10 @@ ebpf_epoch_enter(_Out_ ebpf_epoch_state_t* epoch_state) ebpf_list_insert_tail(&cpu_entry->epoch_state_list, &epoch_state->epoch_list_entry); _ebpf_epoch_lower_to_previous_irql(epoch_state->irql_at_enter); + + if (!cpu_entry->active) { + _ebpf_epoch_activate_cpu(epoch_state->cpu_id); + } } #pragma warning(pop) @@ -650,6 +707,10 @@ _ebpf_epoch_insert_in_free_list(_In_ ebpf_epoch_allocation_header_t* header) uint32_t cpu_id = ebpf_get_current_cpu(); ebpf_epoch_cpu_entry_t* cpu_entry = &_ebpf_epoch_cpu_table[cpu_id]; + if (!cpu_entry->active) { + _ebpf_epoch_activate_cpu(cpu_id); + } + if (cpu_entry->rundown_in_progress) { KeLowerIrql(old_irql); switch (header->entry_type) { @@ -747,8 +808,6 @@ void _ebpf_epoch_messenger_propose_release_epoch( _Inout_ ebpf_epoch_cpu_entry_t* cpu_entry, _Inout_ ebpf_epoch_cpu_message_t* message, uint32_t current_cpu) { - // Walk over each thread_entry in the epoch_state_list and compute the minimum epoch. - ebpf_list_entry_t* entry = cpu_entry->epoch_state_list.Flink; ebpf_epoch_state_t* epoch_state; uint32_t next_cpu; @@ -760,6 +819,18 @@ _ebpf_epoch_messenger_propose_release_epoch( } // Other CPUs update the current epoch. else { + // If the epoch was unknown, then update the freed_epoch for all items in the free list now that we know the + // current epoch. This occurs when the CPU is activated and continues until the first epoch is proposed. 
+        if (cpu_entry->current_epoch == EBPF_EPOCH_UNKNOWN_EPOCH) {
+            for (ebpf_list_entry_t* entry = cpu_entry->free_list.Flink; entry != &cpu_entry->free_list;
+                 entry = entry->Flink) {
+                ebpf_epoch_allocation_header_t* header =
+                    CONTAINING_RECORD(entry, ebpf_epoch_allocation_header_t, list_entry);
+                ebpf_assert(header->freed_epoch == EBPF_EPOCH_UNKNOWN_EPOCH);
+                header->freed_epoch = message->message.propose_epoch.current_epoch;
+            }
+        }
+
         cpu_entry->current_epoch = message->message.propose_epoch.current_epoch;
     }
 
@@ -767,25 +838,24 @@ _ebpf_epoch_messenger_propose_release_epoch(
     MemoryBarrier();
 
     // Previous CPU's minimum epoch.
-    uint64_t minimum_epoch = message->message.propose_epoch.proposed_release_epoch;
+    int64_t minimum_epoch = message->message.propose_epoch.proposed_release_epoch;
 
-    while (entry != &cpu_entry->epoch_state_list) {
+    // Walk over each thread_entry in the epoch_state_list and compute the minimum epoch.
+    for (ebpf_list_entry_t* entry = cpu_entry->epoch_state_list.Flink; entry != &cpu_entry->epoch_state_list;
+         entry = entry->Flink) {
         epoch_state = CONTAINING_RECORD(entry, ebpf_epoch_state_t, epoch_list_entry);
         minimum_epoch = min(minimum_epoch, epoch_state->epoch);
-        entry = entry->Flink;
     }
 
     // Set the proposed release epoch to the minimum epoch seen so far.
     message->message.propose_epoch.proposed_release_epoch = minimum_epoch;
 
+    next_cpu = _ebpf_epoch_next_active_cpu(current_cpu);
+
     // If this is the last CPU, then send a message to the first CPU to commit the release epoch.
-    if (current_cpu == _ebpf_epoch_cpu_count - 1) {
+    if (next_cpu == 0) {
         message->message.commit_epoch.released_epoch = minimum_epoch;
         message->message_type = EBPF_EPOCH_CPU_MESSAGE_TYPE_COMMIT_RELEASE_EPOCH;
-        next_cpu = 0;
-    } else {
-        // Send the message to the next CPU.
-        next_cpu = current_cpu + 1;
     }
 
     _ebpf_epoch_send_message_async(message, next_cpu);
@@ -813,22 +883,41 @@ _ebpf_epoch_messenger_commit_release_epoch(
 {
     uint32_t next_cpu;
 
+    // If any epoch_states are in EBPF_EPOCH_UNKNOWN_EPOCH, then activation of a CPU is in progress.
+    bool other_cpus_are_activating = (message->message.commit_epoch.released_epoch == EBPF_EPOCH_UNKNOWN_EPOCH);
+
+    // If this CPU is in EBPF_EPOCH_UNKNOWN_EPOCH, then activation of this CPU is in progress.
+    bool this_cpu_is_activating = (cpu_entry->current_epoch == EBPF_EPOCH_UNKNOWN_EPOCH);
+
     cpu_entry->timer_armed = false;
 
     // Set the released_epoch to the value computed by the EBPF_EPOCH_CPU_MESSAGE_TYPE_PROPOSE_RELEASE_EPOCH message.
     cpu_entry->released_epoch = message->message.commit_epoch.released_epoch - 1;
 
+    next_cpu = _ebpf_epoch_next_active_cpu(current_cpu);
+
     // If this is the last CPU, send the message to the first CPU to complete the cycle.
-    if (current_cpu != _ebpf_epoch_cpu_count - 1) {
-        // Send the message to the next CPU.
-        next_cpu = current_cpu + 1;
-    } else {
+    if (next_cpu == 0) {
         message->message_type = EBPF_EPOCH_CPU_MESSAGE_TYPE_PROPOSE_EPOCH_COMPLETE;
-        next_cpu = 0;
     }
 
     _ebpf_epoch_send_message_async(message, next_cpu);
 
+    // Wait for all the CPUs to transition to an active state.
+    if (other_cpus_are_activating || this_cpu_is_activating) {
+        // One or more CPUs are still activating. Rearm the timer and wait for the next message.
+        _ebpf_epoch_arm_timer_if_needed(cpu_entry);
+        return;
+    }
+
+    // All CPUs have transitioned to an active state and the epoch computation was successfully completed.
+    // Release any memory that is associated with expired epochs.
_ebpf_epoch_release_free_list(cpu_entry, cpu_entry->released_epoch); + + // Check if this CPU is idle and deactivate it if it is (CPU 0 is always active). + if ((current_cpu != 0) && ebpf_list_is_empty(&cpu_entry->free_list) && + ebpf_list_is_empty(&cpu_entry->epoch_state_list)) { + _ebpf_epoch_deactivate_cpu(current_cpu); + } } /** @@ -894,15 +983,13 @@ _ebpf_epoch_messenger_rundown_in_progress( { uint32_t next_cpu; cpu_entry->rundown_in_progress = true; + + next_cpu = _ebpf_epoch_next_active_cpu(current_cpu); + // If this is the last CPU, then stop. - if (current_cpu != _ebpf_epoch_cpu_count - 1) { - // Send the message to the next CPU. - next_cpu = current_cpu + 1; - } else { + if (next_cpu == 0) { // Signal the caller that rundown is complete. KeSetEvent(&message->completion_event, 0, FALSE); - // Set next_cpu to UINT32_MAX to make code analysis happy. - next_cpu = UINT32_MAX; return; } @@ -1028,3 +1115,84 @@ _ebpf_epoch_work_item_callback(_In_ cxplat_preemptible_work_item_t* preemptible_ cxplat_release_rundown_protection(&_ebpf_epoch_work_item_rundown_ref); } + +/** + * @brief Add the CPU to the next active CPU table. + * + * @param[in] cpu_id CPU to add. + */ +static void +_ebpf_epoch_activate_cpu(uint32_t cpu_id) +{ + EBPF_LOG_ENTRY(); + + EBPF_LOG_MESSAGE_UINT64(EBPF_TRACELOG_LEVEL_INFO, EBPF_TRACELOG_KEYWORD_EPOCH, "Activating CPU", cpu_id); + + ebpf_epoch_cpu_entry_t* cpu_entry = &_ebpf_epoch_cpu_table[cpu_id]; + ebpf_lock_state_t state = ebpf_lock_lock(&_ebpf_epoch_cpu_active_lock); + + cpu_entry->active = true; + // When the CPU is activated, the current epoch is not known. + // Memory freed before the current epoch is set will have its epoch set to EBPF_EPOCH_UNKNOWN_EPOCH and have its + // epoch set when the current epoch is known (i.e., when the next epoch is proposed). + cpu_entry->current_epoch = EBPF_EPOCH_UNKNOWN_EPOCH; + + if (!cpu_entry->work_queue_assigned) { + ebpf_result_t result = ebpf_timed_work_queue_set_cpu_id(cpu_entry->work_queue, cpu_id); + if (result != EBPF_SUCCESS) { + // This is a fatal error. The epoch system is in an inconsistent state. + __fastfail(FAST_FAIL_INVALID_ARG); + } + cpu_entry->work_queue_assigned = 1; + } + + ebpf_lock_unlock(&_ebpf_epoch_cpu_active_lock, state); + EBPF_LOG_EXIT(); +} + +/** + * @brief Remove the CPU from the next active CPU table. + * + * @param[in] cpu_id CPU to remove. + */ +static void +_ebpf_epoch_deactivate_cpu(uint32_t cpu_id) +{ + EBPF_LOG_ENTRY(); + + EBPF_LOG_MESSAGE_UINT64(EBPF_TRACELOG_LEVEL_INFO, EBPF_TRACELOG_KEYWORD_EPOCH, "Deactivating CPU", cpu_id); + + ebpf_epoch_cpu_entry_t* cpu_entry = &_ebpf_epoch_cpu_table[cpu_id]; + ebpf_lock_state_t state = ebpf_lock_lock(&_ebpf_epoch_cpu_active_lock); + cpu_entry->active = false; + ebpf_lock_unlock(&_ebpf_epoch_cpu_active_lock, state); + + EBPF_LOG_EXIT(); +} + +/** + * @brief Given the current CPU, return the next active CPU. + * + * @param[in] cpu_id The current CPU. + * @return The next active CPU. 
+ */
+uint32_t
+_ebpf_epoch_next_active_cpu(uint32_t cpu_id)
+{
+    uint32_t next_active_cpu;
+    ebpf_lock_state_t state = ebpf_lock_lock(&_ebpf_epoch_cpu_active_lock);
+
+    for (next_active_cpu = cpu_id + 1; next_active_cpu < _ebpf_epoch_cpu_count; next_active_cpu++) {
+        if (_ebpf_epoch_cpu_table[next_active_cpu].active) {
+            break;
+        }
+    }
+
+    if (next_active_cpu == _ebpf_epoch_cpu_count) {
+        next_active_cpu = 0;
+    }
+
+    ebpf_lock_unlock(&_ebpf_epoch_cpu_active_lock, state);
+
+    return next_active_cpu;
+}
diff --git a/libs/runtime/ebpf_epoch.h b/libs/runtime/ebpf_epoch.h
index 9bfa30510..b6f1eb023 100644
--- a/libs/runtime/ebpf_epoch.h
+++ b/libs/runtime/ebpf_epoch.h
@@ -14,7 +14,7 @@ extern "C"
     typedef struct _ebpf_epoch_state
     {
         LIST_ENTRY epoch_list_entry; /// List entry for the epoch list.
-        uint64_t epoch;              /// The epoch when this entry was added to the list.
+        int64_t epoch;               /// The epoch when this entry was added to the list.
         uint32_t cpu_id;             /// The CPU on which this entry was added to the list.
         KIRQL irql_at_enter;         /// The IRQL when this entry was added to the list.
     } ebpf_epoch_state_t;
diff --git a/libs/runtime/ebpf_work_queue.c b/libs/runtime/ebpf_work_queue.c
index 564e6039f..7194a100c 100644
--- a/libs/runtime/ebpf_work_queue.c
+++ b/libs/runtime/ebpf_work_queue.c
@@ -51,19 +51,6 @@ ebpf_timed_work_queue_create(
     KeInitializeTimer(&local_work_queue->timer);
     KeInitializeDpc(&local_work_queue->dpc, _ebpf_timed_work_queue_timer_callback, local_work_queue);
 
-    PROCESSOR_NUMBER processor_number;
-    NTSTATUS status = KeGetProcessorNumberFromIndex(cpu_id, &processor_number);
-    if (!NT_SUCCESS(status)) {
-        return_value = EBPF_INVALID_ARGUMENT;
-        goto Done;
-    }
-
-    status = KeSetTargetProcessorDpcEx(&local_work_queue->dpc, &processor_number);
-    if (!NT_SUCCESS(status)) {
-        return_value = EBPF_INVALID_ARGUMENT;
-        goto Done;
-    }
-
     *work_queue = local_work_queue;
     local_work_queue = NULL;
     return_value = EBPF_SUCCESS;
@@ -74,6 +61,30 @@ Done:
     return return_value;
 }
 
+_Must_inspect_result_ ebpf_result_t
+ebpf_timed_work_queue_set_cpu_id(_Inout_ ebpf_timed_work_queue_t* work_queue, uint32_t cpu_id)
+{
+    ebpf_result_t return_value;
+    PROCESSOR_NUMBER processor_number;
+    NTSTATUS status = KeGetProcessorNumberFromIndex(cpu_id, &processor_number);
+    if (!NT_SUCCESS(status)) {
+        return_value = EBPF_INVALID_ARGUMENT;
+        goto Done;
+    }
+
+    status = KeSetTargetProcessorDpcEx(&work_queue->dpc, &processor_number);
+    if (!NT_SUCCESS(status)) {
+        return_value = EBPF_INVALID_ARGUMENT;
+        goto Done;
+    }
+
+    work_queue->cpu_id = cpu_id;
+
+    return_value = EBPF_SUCCESS;
+Done:
+    return return_value;
+}
+
 void
 ebpf_timed_work_queue_destroy(_In_opt_ ebpf_timed_work_queue_t* work_queue)
 {
diff --git a/libs/runtime/ebpf_work_queue.h b/libs/runtime/ebpf_work_queue.h
index d87ef6aef..16d1c3429 100644
--- a/libs/runtime/ebpf_work_queue.h
+++ b/libs/runtime/ebpf_work_queue.h
@@ -48,6 +48,17 @@ extern "C"
         _In_ ebpf_timed_work_queue_callback_t callback,
         _In_ void* context);
 
+    /**
+     * @brief Set the CPU ID for the timed work queue.
+     *
+     * @param[in] work_queue Work queue to set the CPU ID for.
+     * @param[in] cpu_id Which CPU to run the work queue on.
+     * @retval EBPF_SUCCESS The operation was successful.
+     * @retval EBPF_INVALID_ARGUMENT The CPU ID is invalid.
+     */
+    _Must_inspect_result_ ebpf_result_t
+    ebpf_timed_work_queue_set_cpu_id(_Inout_ ebpf_timed_work_queue_t* work_queue, uint32_t cpu_id);
+
     /**
      * @brief Destroy a timed work queue.
      *