diff --git a/interfaces/devdoc/async_socket_requirements.md b/interfaces/devdoc/async_socket_requirements.md new file mode 100644 index 0000000..4f8d805 --- /dev/null +++ b/interfaces/devdoc/async_socket_requirements.md @@ -0,0 +1,163 @@ +# `async_socket` requirements + +## Overview + +`async_socket` is an interface for an asynchronous socket API on the underlying platform. + +## Design + +The `async_socket` takes ownership of the provided `SOCKET_HANDLE` and is responsible for closing it. For that reason, it is not possible to re-open the `async_socket` after closing it. This is due to the fact that the socket is created by some external event, such as accepting an incoming connection and closing the underlying socket is not reversible. + +## Exposed API + +```c +typedef struct ASYNC_SOCKET_TAG* ASYNC_SOCKET_HANDLE; + +#define ASYNC_SOCKET_OPEN_RESULT_VALUES \ + ASYNC_SOCKET_OPEN_OK, \ + ASYNC_SOCKET_OPEN_ERROR + +MU_DEFINE_ENUM(ASYNC_SOCKET_OPEN_RESULT, ASYNC_SOCKET_OPEN_RESULT_VALUES) + +#define ASYNC_SOCKET_SEND_SYNC_RESULT_VALUES \ + ASYNC_SOCKET_SEND_SYNC_OK, \ + ASYNC_SOCKET_SEND_SYNC_ERROR, \ + ASYNC_SOCKET_SEND_SYNC_NOT_OPEN + +MU_DEFINE_ENUM(ASYNC_SOCKET_SEND_SYNC_RESULT, ASYNC_SOCKET_SEND_SYNC_RESULT_VALUES) + +#define ASYNC_SOCKET_SEND_RESULT_VALUES \ + ASYNC_SOCKET_SEND_OK, \ + ASYNC_SOCKET_SEND_ERROR, \ + ASYNC_SOCKET_SEND_ABANDONED + +MU_DEFINE_ENUM(ASYNC_SOCKET_SEND_RESULT, ASYNC_SOCKET_SEND_RESULT_VALUES) + +#define ASYNC_SOCKET_RECEIVE_RESULT_VALUES \ + ASYNC_SOCKET_RECEIVE_OK, \ + ASYNC_SOCKET_RECEIVE_ERROR, \ + ASYNC_SOCKET_RECEIVE_ABANDONED + +MU_DEFINE_ENUM(ASYNC_SOCKET_RECEIVE_RESULT, ASYNC_SOCKET_RECEIVE_RESULT_VALUES) + +#define ASYNC_SOCKET_NOTIFY_IO_TYPE_VALUES \ + ASYNC_SOCKET_NOTIFY_IO_TYPE_IN, \ + ASYNC_SOCKET_NOTIFY_IO_TYPE_OUT + +MU_DEFINE_ENUM(ASYNC_SOCKET_NOTIFY_IO_TYPE, ASYNC_SOCKET_NOTIFY_IO_TYPE_VALUES) + +#define ASYNC_SOCKET_NOTIFY_IO_RESULT_VALUES \ + ASYNC_SOCKET_NOTIFY_IO_RESULT_IN, \ + ASYNC_SOCKET_NOTIFY_IO_RESULT_OUT, \ + ASYNC_SOCKET_NOTIFY_IO_RESULT_ABANDONED, \ + ASYNC_SOCKET_NOTIFY_IO_RESULT_ERROR + +MU_DEFINE_ENUM(ASYNC_SOCKET_NOTIFY_IO_RESULT, ASYNC_SOCKET_NOTIFY_IO_RESULT_VALUES) + +typedef void (*ON_ASYNC_SOCKET_OPEN_COMPLETE)(void* context, ASYNC_SOCKET_OPEN_RESULT open_result); +typedef void (*ON_ASYNC_SOCKET_SEND_COMPLETE)(void* context, ASYNC_SOCKET_SEND_RESULT send_result); +typedef void (*ON_ASYNC_SOCKET_RECEIVE_COMPLETE)(void* context, ASYNC_SOCKET_RECEIVE_RESULT receive_result, uint32_t bytes_received); +typedef void (*ON_ASYNC_SOCKET_NOTIFY_IO_COMPLETE)(void* context, ASYNC_SOCKET_NOTIFY_IO_RESULT notify_io_result); +typedef int (*ON_ASYNC_SOCKET_SEND)(void* context, ASYNC_SOCKET_HANDLE async_socket, const void* buf, size_t len); +typedef int (*ON_ASYNC_SOCKET_RECV)(void* context, ASYNC_SOCKET_HANDLE async_socket, void* buf, size_t len); + +typedef struct ASYNC_SOCKET_BUFFER_TAG +{ + void* buffer; + uint32_t length; +} ASYNC_SOCKET_BUFFER; + +MOCKABLE_FUNCTION(, ASYNC_SOCKET_HANDLE, async_socket_create, EXECUTION_ENGINE_HANDLE, execution_engine, SOCKET_HANDLE, socket_handle); +MOCKABLE_FUNCTION(, ASYNC_SOCKET_HANDLE, async_socket_create_with_transport, EXECUTION_ENGINE_HANDLE, execution_engine, SOCKET_HANDLE, socket_handle, ON_ASYNC_SOCKET_SEND, on_send, void*, on_send_context, ON_ASYNC_SOCKET_RECV, on_recv, void*, on_recv_context); +MOCKABLE_FUNCTION(, void, async_socket_destroy, ASYNC_SOCKET_HANDLE, async_socket); + +MOCKABLE_FUNCTION(, int, async_socket_open_async, ASYNC_SOCKET_HANDLE, async_socket, ON_ASYNC_SOCKET_OPEN_COMPLETE, on_open_complete, void*, on_open_complete_context); +MOCKABLE_FUNCTION(, void, async_socket_close, ASYNC_SOCKET_HANDLE, async_socket); +MOCKABLE_FUNCTION(, ASYNC_SOCKET_SEND_SYNC_RESULT, async_socket_send_async, ASYNC_SOCKET_HANDLE, async_socket, const ASYNC_SOCKET_BUFFER*, payload, uint32_t, buffer_count, ON_ASYNC_SOCKET_SEND_COMPLETE, on_send_complete, void*, on_send_complete_context); +MOCKABLE_FUNCTION(, int, async_socket_receive_async, ASYNC_SOCKET_HANDLE, async_socket, ASYNC_SOCKET_BUFFER*, payload, uint32_t, buffer_count, ON_ASYNC_SOCKET_RECEIVE_COMPLETE, on_receive_complete, void*, on_receive_complete_context); +MOCKABLE_FUNCTION(, int, async_socket_notify_io_async, ASYNC_SOCKET_HANDLE, async_socket, ASYNC_SOCKET_NOTIFY_IO_TYPE, io_type, ON_ASYNC_SOCKET_NOTIFY_IO_COMPLETE, on_notify_io_complete, void*, on_notify_io_complete_context); +``` + +### ON_ASYNC_SOCKET_SEND + +```c +typedef int (*ON_ASYNC_SOCKET_SEND)(void* context, ASYNC_SOCKET_HANDLE async_socket, const void* buf, size_t len); +``` + +The `ON_ASYNC_SOCKET_SEND` callback is called to handle sending data on the socket. It must return the size of data that was successfully written to the socket, or -1 in case of error. + +When this function fails with -1, an error code may be set using the platform-specific error code mechanism (such as `errno` on Linux). + +### ON_ASYNC_SOCKET_RECV + +```c +typedef int (*ON_ASYNC_SOCKET_RECV)(void* context, ASYNC_SOCKET_HANDLE async_socket, void* buf, size_t len); +``` + +The `ON_ASYNC_SOCKET_RECV` callback is called to handle receiving data from the socket. It must return the size of data that was successfully read from the socket, or -1 in case of error. + +When this function fails with -1, an error code may be set using the platform-specific error code mechanism (such as `errno` on Linux). + +### async_socket_create_with_transport + +```c +MOCKABLE_FUNCTION(, ASYNC_SOCKET_HANDLE, async_socket_create_with_transport, EXECUTION_ENGINE_HANDLE, execution_engine, SOCKET_HANDLE, socket_handle, ON_ASYNC_SOCKET_SEND, on_send, void*, on_send_context, ON_ASYNC_SOCKET_RECV, on_recv, void*, on_recv_context); +``` + +`async_socket_create_with_transport` allows overriding the send and receive functions for the socket. This may not be available on all platforms. + +### async_socket_create + +```c +MOCKABLE_FUNCTION(, ASYNC_SOCKET_HANDLE, async_socket_create, EXECUTION_ENGINE_HANDLE, execution_engine, SOCKET_HANDLE, socket_handle); +``` + +`async_socket_create` creates an async socket. + +### async_socket_destroy + +```c +MOCKABLE_FUNCTION(, void, async_socket_destroy, ASYNC_SOCKET_HANDLE, async_socket); +``` + +`async_socket_destroy` frees all resources associated with `async_socket`. + +### async_socket_open_async + +```c +MOCKABLE_FUNCTION(, int, async_socket_open_async, ASYNC_SOCKET_HANDLE, async_socket, ON_ASYNC_SOCKET_OPEN_COMPLETE, on_open_complete, void*, on_open_complete_context); +``` + +`async_socket_open_async` opens the async socket. + +### async_socket_close + +```c +MOCKABLE_FUNCTION(, void, async_socket_close, ASYNC_SOCKET_HANDLE, async_socket); +``` + +`async_socket_close` closes an open `async_socket`. Note that it is not possible to re-open a socket after closing it as it relies on underlying platform state which has been setup prior to the creating of this module. + +### async_socket_send_async + +```c +MOCKABLE_FUNCTION(, ASYNC_SOCKET_SEND_SYNC_RESULT, async_socket_send_async, ASYNC_SOCKET_HANDLE, async_socket, const ASYNC_SOCKET_BUFFER*, payload, uint32_t, buffer_count, ON_ASYNC_SOCKET_SEND_COMPLETE, on_send_complete, void*, on_send_complete_context); +``` + +`async_socket_send_async` sends a number of buffers asynchronously. + +### async_socket_receive_async + +```c +MOCKABLE_FUNCTION(, int, async_socket_receive_async, ASYNC_SOCKET_HANDLE, async_socket, ASYNC_SOCKET_BUFFER*, payload, uint32_t, buffer_count, ON_SOCKET_ASYNC_RECEIVE_COMPLETE, on_receive_complete, void*, on_receive_complete_context); +``` + +`async_socket_receive_async` receives in a number of buffers asynchronously. + +### async_socket_notify_io_async + +```c +MOCKABLE_FUNCTION(, int, async_socket_notify_io_async, ASYNC_SOCKET_HANDLE, async_socket, ASYNC_SOCKET_NOTIFY_IO_TYPE, io_type, ON_ASYNC_SOCKET_NOTIFY_IO_COMPLETE, on_notify_io_complete, void*, on_notify_io_complete_context); +``` + +`async_socket_notify_io_async` is used only for the Linux platform. diff --git a/linux/devdoc/async_socket_linux_requirements.md b/linux/devdoc/async_socket_linux_requirements.md index ced23ad..113e2d1 100644 --- a/linux/devdoc/async_socket_linux_requirements.md +++ b/linux/devdoc/async_socket_linux_requirements.md @@ -16,6 +16,16 @@ An `async_socket` owns the underlying platform specific socket passed on create. `async_socket_linux` shall not create any threads, but use the completion port linux object for its threading. The API shall be thread-safe with the exception of `async_socket_destroy` which should be only called from one thread. +## Custom Transport + +The `async_socket` may be created with a custom transport handler for send and receive functions. + +This requires callbacks of type `ON_ASYNC_SOCKET_SEND` and `ON_ASYNC_SOCKET_RECV`. + +`ON_ASYNC_SOCKET_SEND` must behave like the `send` function from the libc socket API. It must return the number of bytes sent or `-1` to indicate an error. In case of an error, it must set `errno` with the error code. + +`ON_ASYNC_SOCKET_RECV` must behave like the `recv` function from the libc socket API. It must return the number of bytes received or `-1` to indicate an error. In case of an error, it must set `errno` with the error code. + ## Exposed API `async_socket_linux` implements the `async_socket` API: diff --git a/linux/src/async_socket_linux.c b/linux/src/async_socket_linux.c index b535992..31e72d0 100644 --- a/linux/src/async_socket_linux.c +++ b/linux/src/async_socket_linux.c @@ -14,6 +14,8 @@ #include "c_logging/log_context.h" #include "c_logging/log_context_property_type_ascii_char_ptr.h" #include "c_logging/logger.h" +#include "c_logging/log_errno.h" +#include "c_logging/log_level.h" #include "c_pal/completion_port_linux.h" #include "c_pal/execution_engine.h" @@ -24,6 +26,10 @@ #include "c_pal/sync.h" #include "c_pal/socket_handle.h" +#ifdef ENABLE_SOCKET_LOGGING +#include "c_pal/timer.h" +#endif + #include "c_pal/async_socket.h" #define ASYNC_SOCKET_LINUX_STATE_VALUES \ @@ -190,14 +196,17 @@ static void event_complete_callback(void* context, COMPLETION_PORT_EPOLL_ACTION // Codes_SRS_ASYNC_SOCKET_LINUX_11_081: [ event_complete_callback shall call either the send or recv complete callback with an ABANDONED flag when the IO type is either ASYNC_SOCKET_IO_TYPE_SEND or ASYNC_SOCKET_IO_TYPE_RECEIVE respectively. ] if (io_context->io_type == ASYNC_SOCKET_IO_TYPE_RECEIVE) { + LogError("Receive socket has encountered %" PRI_MU_ENUM "", MU_ENUM_VALUE(COMPLETION_PORT_EPOLL_ACTION, action)); io_context->on_receive_complete(io_context->callback_context, ASYNC_SOCKET_RECEIVE_ABANDONED, 0); } else if (io_context->io_type == ASYNC_SOCKET_IO_TYPE_SEND) { + LogError("Send socket has encountered %" PRI_MU_ENUM "", MU_ENUM_VALUE(COMPLETION_PORT_EPOLL_ACTION, action)); io_context->on_send_complete(io_context->callback_context, ASYNC_SOCKET_SEND_ABANDONED); } else { + LogError("Notify socket has encountered %" PRI_MU_ENUM "", MU_ENUM_VALUE(COMPLETION_PORT_EPOLL_ACTION, action)); // Codes_SRS_ASYNC_SOCKET_LINUX_04_008: [ event_complete_callback shall call the notify complete callback with an ABANDONED flag when the IO type is ASYNC_SOCKET_IO_TYPE_NOTIFY. ] io_context->on_notify_io_complete(io_context->callback_context, ASYNC_SOCKET_NOTIFY_IO_RESULT_ABANDONED); } @@ -218,7 +227,7 @@ static void event_complete_callback(void* context, COMPLETION_PORT_EPOLL_ACTION { ASYNC_SOCKET_RECEIVE_RESULT receive_result; uint32_t index = 0; - ssize_t total_recv_size = 0; + uint32_t total_recv_size = 0; do { @@ -235,12 +244,13 @@ static void event_complete_callback(void* context, COMPLETION_PORT_EPOLL_ACTION } else { - total_recv_size = recv_size = 0; + total_recv_size = 0; + recv_size = 0; if (errno == ECONNRESET) { // Codes_SRS_ASYNC_SOCKET_LINUX_11_090: [ If errno is ECONNRESET, then thread_worker_func shall call the on_receive_complete callback with the on_receive_complete_context and ASYNC_SOCKET_RECEIVE_ABANDONED. ] receive_result = ASYNC_SOCKET_RECEIVE_ABANDONED; - LogInfo("A reset on the recv socket has been encountered"); + LogError("A reset on the recv socket has been encountered"); } else { @@ -254,30 +264,42 @@ static void event_complete_callback(void* context, COMPLETION_PORT_EPOLL_ACTION else if (recv_size == 0) { // Codes_SRS_ASYNC_SOCKET_LINUX_11_091: [ If the recv size equals 0, then event_complete_callback shall call on_receive_complete callback with the on_receive_complete_context and ASYNC_SOCKET_RECEIVE_ABANDONED. ] + LogError("Socket received 0 bytes, assuming socket is closed"); receive_result = ASYNC_SOCKET_RECEIVE_ABANDONED; break; } else { - // Codes_SRS_ASYNC_SOCKET_LINUX_11_092: [ If the recv size > 0, if we have another buffer to fill then we will attempt another read, otherwise we shall call on_receive_complete callback with the on_receive_complete_context and ASYNC_SOCKET_RECEIVE_OK ] - total_recv_size += recv_size; - if (index + 1 >= io_context->data.recv_ctx.total_buffer_count || recv_size <= io_context->data.recv_ctx.recv_buffers[index].length) + if (recv_size > UINT32_MAX || + UINT32_MAX - total_recv_size < recv_size) { - receive_result = ASYNC_SOCKET_RECEIVE_OK; - break; + // Handle unlikely overflow + LogError("Overflow in computing receive size (total_recv_size=%" PRIu32 " + recv_size=%zi > UINT32_MAX=%" PRIu32 ")", + total_recv_size, recv_size, UINT32_MAX); + receive_result = ASYNC_SOCKET_RECEIVE_ERROR; } else { - index++; + // Codes_SRS_ASYNC_SOCKET_LINUX_11_092: [ If the recv size > 0, if we have another buffer to fill then we will attempt another read, otherwise we shall call on_receive_complete callback with the on_receive_complete_context and ASYNC_SOCKET_RECEIVE_OK ] + total_recv_size += recv_size; + if (index + 1 >= io_context->data.recv_ctx.total_buffer_count || recv_size <= io_context->data.recv_ctx.recv_buffers[index].length) + { +#ifdef ENABLE_SOCKET_LOGGING + LogVerbose("Asynchronous receive of %" PRIu32 " bytes completed at %lf", bytes_received, timer_global_get_elapsed_us()); +#endif + receive_result = ASYNC_SOCKET_RECEIVE_OK; + break; + } + else + { + index++; + } } } } while (true); - if (total_recv_size >= 0) - { - // Call the callback - io_context->on_receive_complete(io_context->callback_context, receive_result, total_recv_size); - } + // Call the callback + io_context->on_receive_complete(io_context->callback_context, receive_result, total_recv_size); } // Codes_SRS_ASYNC_SOCKET_LINUX_11_093: [ event_complete_callback shall then free the io_context memory. ] @@ -306,7 +328,7 @@ static void event_complete_callback(void* context, COMPLETION_PORT_EPOLL_ACTION { // Codes_SRS_ASYNC_SOCKET_LINUX_11_098: [ if errno is ECONNRESET, then on_send_complete shall be called with ASYNC_SOCKET_SEND_ABANDONED. ] send_result = ASYNC_SOCKET_SEND_ABANDONED; - LogInfo("A reset on the send socket has been encountered"); + LogError("A reset on the send socket has been encountered"); } else { @@ -317,6 +339,9 @@ static void event_complete_callback(void* context, COMPLETION_PORT_EPOLL_ACTION } else { +#ifdef ENABLE_SOCKET_LOGGING + LogVerbose("Asynchronous send of %" PRIu32 " bytes completed at %lf", bytes_sent, timer_global_get_elapsed_us()); +#endif send_result = ASYNC_SOCKET_SEND_OK; } @@ -334,14 +359,17 @@ static void event_complete_callback(void* context, COMPLETION_PORT_EPOLL_ACTION // Codes_SRS_ASYNC_SOCKET_LINUX_11_086: [ Otherwise event_complete_callback shall call either the send or recv complete callback with an ERROR flag. ] if (io_context->io_type == ASYNC_SOCKET_IO_TYPE_RECEIVE) { + LogError("Receive socket has encountered COMPLETION_PORT_EPOLL_ERROR"); io_context->on_receive_complete(io_context->callback_context, ASYNC_SOCKET_RECEIVE_ERROR, 0); } else if (io_context->io_type == ASYNC_SOCKET_IO_TYPE_SEND) { + LogError("Send socket has encountered COMPLETION_PORT_EPOLL_ERROR"); io_context->on_send_complete(io_context->callback_context, ASYNC_SOCKET_SEND_ERROR); } else { + LogError("Notify socket has encountered COMPLETION_PORT_EPOLL_ERROR"); // Codes_SRS_ASYNC_SOCKET_LINUX_04_011: [ If the IO type is ASYNC_SOCKET_IO_TYPE_NOTIFY then event_complete_callback shall call the notify complete callback with an ERROR flag. ] io_context->on_notify_io_complete(io_context->callback_context, ASYNC_SOCKET_NOTIFY_IO_RESULT_ERROR); } @@ -597,6 +625,8 @@ ASYNC_SOCKET_SEND_SYNC_RESULT async_socket_send_async(ASYNC_SOCKET_HANDLE async_ } else { + (void)interlocked_increment(&async_socket->pending_api_calls); + ASYNC_SOCKET_LINUX_STATE current_state; // Codes_SRS_ASYNC_SOCKET_LINUX_11_051: [ If async_socket is not OPEN, async_socket_send_async shall fail and return ASYNC_SOCKET_SEND_SYNC_NOT_OPEN. ] if ((current_state = interlocked_add(&async_socket->state, 0)) != ASYNC_SOCKET_LINUX_STATE_OPEN) @@ -606,7 +636,9 @@ ASYNC_SOCKET_SEND_SYNC_RESULT async_socket_send_async(ASYNC_SOCKET_HANDLE async_ } else { - (void)interlocked_increment(&async_socket->pending_api_calls); +#ifdef ENABLE_SOCKET_LOGGING + LogVerbose("Starting send of %" PRIu32 " bytes at %lf", total_buffer_bytes, timer_global_get_elapsed_us()); +#endif ASYNC_SOCKET_SEND_RESULT send_result; for (index = 0; index < buffer_count; index++) @@ -655,9 +687,9 @@ ASYNC_SOCKET_SEND_SYNC_RESULT async_socket_send_async(ASYNC_SOCKET_HANDLE async_ } } // Codes_SRS_ASYNC_SOCKET_LINUX_11_059: [ If the errno value is ECONNRESET, ENOTCONN, or EPIPE shall fail and return ASYNC_SOCKET_SEND_SYNC_NOT_OPEN. ] - else if (errno == ECONNRESET || errno == ENOTCONN || errno == EPIPE) + else if (error_no == ECONNRESET || error_no == ENOTCONN || error_no == EPIPE) { - LogWarning("The connection was forcibly closed by the peer"); + LOGGER_LOG_EX(LOG_LEVEL_WARNING, LOG_ERRNO(), LOG_MESSAGE("The connection was forcibly closed by the peer")); send_result = ASYNC_SOCKET_SEND_ABANDONED; // Socket was closed result = ASYNC_SOCKET_SEND_SYNC_NOT_OPEN; @@ -681,6 +713,9 @@ ASYNC_SOCKET_SEND_SYNC_RESULT async_socket_send_async(ASYNC_SOCKET_HANDLE async_ // Otherwise we're going to be returning an error if (send_result == ASYNC_SOCKET_SEND_OK) { +#ifdef ENABLE_SOCKET_LOGGING + LogVerbose("Send completed synchronously at %lf", timer_global_get_elapsed_us()); +#endif // Codes_SRS_ASYNC_SOCKET_LINUX_11_061: [ If the send is successful, async_socket_send_async shall call the on_send_complete with on_send_complete_context and ASYNC_SOCKET_SEND_SYNC_OK. ] on_send_complete(on_send_complete_context, send_result); } @@ -688,11 +723,11 @@ ASYNC_SOCKET_SEND_SYNC_RESULT async_socket_send_async(ASYNC_SOCKET_HANDLE async_ { // Do nothing. If failure happend do not call the callback } + } all_ok: - if (interlocked_decrement(&async_socket->pending_api_calls) == 0) - { - wake_by_address_single(&async_socket->pending_api_calls); - } + if (interlocked_decrement(&async_socket->pending_api_calls) == 0) + { + wake_by_address_single(&async_socket->pending_api_calls); } } } @@ -754,6 +789,8 @@ int async_socket_receive_async(ASYNC_SOCKET_HANDLE async_socket, ASYNC_SOCKET_BU } else { + (void)interlocked_increment(&async_socket->pending_api_calls); + ASYNC_SOCKET_LINUX_STATE current_state; // Codes_SRS_ASYNC_SOCKET_LINUX_11_072: [ If async_socket is not OPEN, async_socket_receive_async shall fail and return a non-zero value. ] if ((current_state = interlocked_add(&async_socket->state, 0)) != ASYNC_SOCKET_LINUX_STATE_OPEN) @@ -764,8 +801,6 @@ int async_socket_receive_async(ASYNC_SOCKET_HANDLE async_socket, ASYNC_SOCKET_BU } else { - (void)interlocked_increment(&async_socket->pending_api_calls); - // Codes_SRS_ASYNC_SOCKET_LINUX_11_074: [ The context shall also allocate enough memory to keep an array of buffer_count items. ] ASYNC_SOCKET_IO_CONTEXT* io_context = malloc_flex(sizeof(ASYNC_SOCKET_IO_CONTEXT), buffer_count, sizeof(ASYNC_SOCKET_BUFFER)); if (io_context == NULL) @@ -789,6 +824,10 @@ int async_socket_receive_async(ASYNC_SOCKET_HANDLE async_socket, ASYNC_SOCKET_BU io_context->data.recv_ctx.recv_buffers[index].length = payload[index].length; } +#ifdef ENABLE_SOCKET_LOGGING + LogVerbose("Starting receive at %lf", timer_global_get_elapsed_us()); +#endif + // Codes_SRS_ASYNC_SOCKET_LINUX_11_102: [ Then the context shall then be added to the completion port system by calling completion_port_add with EPOLLIN and event_complete_callback as the callback. ] if (completion_port_add(async_socket->completion_port, EPOLLIN | EPOLLRDHUP | EPOLLONESHOT, async_socket->socket_handle, event_complete_callback, io_context) != 0) { @@ -805,11 +844,11 @@ int async_socket_receive_async(ASYNC_SOCKET_HANDLE async_socket, ASYNC_SOCKET_BU } free(io_context); } + } all_ok: - if (interlocked_decrement(&async_socket->pending_api_calls) == 0) - { - wake_by_address_single(&async_socket->pending_api_calls); - } + if (interlocked_decrement(&async_socket->pending_api_calls) == 0) + { + wake_by_address_single(&async_socket->pending_api_calls); } } } @@ -834,6 +873,8 @@ int async_socket_notify_io_async(ASYNC_SOCKET_HANDLE async_socket, ASYNC_SOCKET_ } else { + (void)interlocked_increment(&async_socket->pending_api_calls); + // Codes_SRS_ASYNC_SOCKET_LINUX_04_015: [ If the async socket's current state is not ASYNC_SOCKET_LINUX_STATE_OPEN then async_socket_notify_io_async shall fail and return a non-zero value. ] ASYNC_SOCKET_LINUX_STATE current_state; if ((current_state = interlocked_add(&async_socket->state, 0)) != ASYNC_SOCKET_LINUX_STATE_OPEN) @@ -843,8 +884,6 @@ int async_socket_notify_io_async(ASYNC_SOCKET_HANDLE async_socket, ASYNC_SOCKET_ } else { - (void)interlocked_increment(&async_socket->pending_api_calls); - // Codes_SRS_ASYNC_SOCKET_LINUX_04_017: [ Otherwise async_socket_notify_io_async shall create a context for the notify where the on_notify_io_complete and on_notify_io_complete_context shall be stored. ] ASYNC_SOCKET_IO_CONTEXT* io_context = malloc(sizeof(ASYNC_SOCKET_IO_CONTEXT)); if (io_context == NULL) @@ -879,12 +918,11 @@ int async_socket_notify_io_async(ASYNC_SOCKET_HANDLE async_socket, ASYNC_SOCKET_ free(io_context); } - + } all_ok: - if (interlocked_decrement(&async_socket->pending_api_calls) == 0) - { - wake_by_address_single(&async_socket->pending_api_calls); - } + if (interlocked_decrement(&async_socket->pending_api_calls) == 0) + { + wake_by_address_single(&async_socket->pending_api_calls); } } diff --git a/linux/src/error_handling.c b/linux/src/error_handling.c deleted file mode 100644 index 4d93687..0000000 --- a/linux/src/error_handling.c +++ /dev/null @@ -1,24 +0,0 @@ -// Copyright (C) Microsoft Corporation. All rights reserved. - -#include -#include - -#include "c_pal/gballoc_hl.h" // IWYU pragma: keep - -#include "c_logging/logger.h" - -#include "c_pal/windows_defines_errors.h" - -#include "c_pal/error_handling.h" - -static volatile_atomic int64_t last_error_code; - -void error_handling_set_last_error(void) -{ - -} - -uint64_t error_handling_get_last_error(void) -{ - -} diff --git a/linux/tests/async_socket_linux_ut/async_socket_linux_ut.c b/linux/tests/async_socket_linux_ut/async_socket_linux_ut.c index e005236..c402e05 100644 --- a/linux/tests/async_socket_linux_ut/async_socket_linux_ut.c +++ b/linux/tests/async_socket_linux_ut/async_socket_linux_ut.c @@ -65,6 +65,8 @@ IMPLEMENT_UMOCK_C_ENUM_TYPE(ASYNC_SOCKET_RECEIVE_RESULT, ASYNC_SOCKET_RECEIVE_RE TEST_DEFINE_ENUM_TYPE(ASYNC_SOCKET_NOTIFY_IO_RESULT, ASYNC_SOCKET_NOTIFY_IO_RESULT_VALUES) IMPLEMENT_UMOCK_C_ENUM_TYPE(ASYNC_SOCKET_NOTIFY_IO_RESULT, ASYNC_SOCKET_NOTIFY_IO_RESULT_VALUES) +MU_DEFINE_ENUM_STRINGS(COMPLETION_PORT_EPOLL_ACTION, COMPLETION_PORT_EPOLL_ACTION_VALUES) + static void on_umock_c_error(UMOCK_C_ERROR_CODE error_code) { ASSERT_FAIL("umock_c reported error :%" PRI_MU_ENUM "", MU_ENUM_VALUE(UMOCK_C_ERROR_CODE, error_code)); @@ -138,10 +140,10 @@ static void mock_async_socket_create_with_transport_setup(void) static void setup_async_socket_receive_async_mocks(void) { - STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)) - .CallCannotFail(); STRICT_EXPECTED_CALL(interlocked_increment(IGNORED_ARG)) .CallCannotFail(); + STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)) + .CallCannotFail(); STRICT_EXPECTED_CALL(malloc_flex(IGNORED_ARG, 1, sizeof(ASYNC_SOCKET_BUFFER))); STRICT_EXPECTED_CALL(completion_port_add(test_completion_port, EPOLLIN | EPOLLRDHUP | EPOLLONESHOT, test_socket, IGNORED_ARG, IGNORED_ARG)); STRICT_EXPECTED_CALL(interlocked_increment(IGNORED_ARG)) @@ -153,10 +155,10 @@ static void setup_async_socket_receive_async_mocks(void) static void setup_async_socket_notify_io_async_mocks(ASYNC_SOCKET_NOTIFY_IO_TYPE io_type) { - STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)) - .CallCannotFail(); STRICT_EXPECTED_CALL(interlocked_increment(IGNORED_ARG)) .CallCannotFail(); + STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)) + .CallCannotFail(); STRICT_EXPECTED_CALL(malloc(IGNORED_ARG)); STRICT_EXPECTED_CALL(completion_port_add(test_completion_port, (io_type == ASYNC_SOCKET_NOTIFY_IO_TYPE_IN) ? EPOLLIN : EPOLLOUT, test_socket, IGNORED_ARG, IGNORED_ARG)); STRICT_EXPECTED_CALL(interlocked_increment(IGNORED_ARG)) @@ -461,10 +463,8 @@ TEST_FUNCTION(async_socket_open_async_succeeds) ASSERT_IS_NOT_NULL(async_socket); umock_c_reset_all_calls(); - STRICT_EXPECTED_CALL(interlocked_compare_exchange(IGNORED_ARG, IGNORED_ARG, IGNORED_ARG)) - .CallCannotFail(); - STRICT_EXPECTED_CALL(interlocked_exchange(IGNORED_ARG, IGNORED_ARG)) - .CallCannotFail(); + STRICT_EXPECTED_CALL(interlocked_compare_exchange(IGNORED_ARG, IGNORED_ARG, IGNORED_ARG)); + STRICT_EXPECTED_CALL(interlocked_exchange(IGNORED_ARG, IGNORED_ARG)); STRICT_EXPECTED_CALL(test_on_open_complete(test_callback_ctx, ASYNC_SOCKET_OPEN_OK)); // act @@ -488,10 +488,8 @@ TEST_FUNCTION(async_socket_open_async_succeeds_with_NULL_context) int result; umock_c_reset_all_calls(); - STRICT_EXPECTED_CALL(interlocked_compare_exchange(IGNORED_ARG, IGNORED_ARG, IGNORED_ARG)) - .CallCannotFail(); - STRICT_EXPECTED_CALL(interlocked_exchange(IGNORED_ARG, IGNORED_ARG)) - .CallCannotFail(); + STRICT_EXPECTED_CALL(interlocked_compare_exchange(IGNORED_ARG, IGNORED_ARG, IGNORED_ARG)); + STRICT_EXPECTED_CALL(interlocked_exchange(IGNORED_ARG, IGNORED_ARG)); STRICT_EXPECTED_CALL(test_on_open_complete(NULL, ASYNC_SOCKET_OPEN_OK)); // act @@ -691,8 +689,7 @@ TEST_FUNCTION(async_socket_close_after_open_and_recv) STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)); STRICT_EXPECTED_CALL(completion_port_remove(IGNORED_ARG, IGNORED_ARG)); STRICT_EXPECTED_CALL(mocked_close(test_socket)); - STRICT_EXPECTED_CALL(interlocked_exchange(IGNORED_ARG, IGNORED_ARG)) - .CallCannotFail(); + STRICT_EXPECTED_CALL(interlocked_exchange(IGNORED_ARG, IGNORED_ARG)); STRICT_EXPECTED_CALL(wake_by_address_single(IGNORED_ARG)); // act @@ -954,7 +951,10 @@ TEST_FUNCTION(async_socket_send_async_when_not_open_fails) payload_buffers[0].length = sizeof(payload_bytes); umock_c_reset_all_calls(); + STRICT_EXPECTED_CALL(interlocked_increment(IGNORED_ARG)); STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)); + STRICT_EXPECTED_CALL(interlocked_decrement(IGNORED_ARG)); + STRICT_EXPECTED_CALL(wake_by_address_single(IGNORED_ARG)); // act result = async_socket_send_async(async_socket, payload_buffers, sizeof(payload_buffers) / sizeof(payload_buffers[0]), test_on_send_complete, test_callback_ctx); @@ -982,7 +982,10 @@ TEST_FUNCTION(async_socket_send_async_after_close_fails) async_socket_close(async_socket); umock_c_reset_all_calls(); + STRICT_EXPECTED_CALL(interlocked_increment(IGNORED_ARG)); STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)); + STRICT_EXPECTED_CALL(interlocked_decrement(IGNORED_ARG)); + STRICT_EXPECTED_CALL(wake_by_address_single(IGNORED_ARG)); // act result = async_socket_send_async(async_socket, payload_buffers, sizeof(payload_buffers) / sizeof(payload_buffers[0]), test_on_send_complete, test_callback_ctx); @@ -1015,8 +1018,8 @@ TEST_FUNCTION(async_socket_send_async_succeeds) umock_c_reset_all_calls(); - STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)); STRICT_EXPECTED_CALL(interlocked_increment(IGNORED_ARG)); + STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)); STRICT_EXPECTED_CALL(mocked_send(IGNORED_ARG, IGNORED_ARG, IGNORED_ARG, MSG_NOSIGNAL)); STRICT_EXPECTED_CALL(test_on_send_complete(test_callback_ctx, ASYNC_SOCKET_SEND_OK)); STRICT_EXPECTED_CALL(interlocked_decrement(IGNORED_ARG)); @@ -1051,8 +1054,8 @@ TEST_FUNCTION(async_socket_send_async_multiple_sends_succeeds) ssize_t send_amt = payload_buffers[0].length/2; umock_c_reset_all_calls(); - STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)); STRICT_EXPECTED_CALL(interlocked_increment(IGNORED_ARG)); + STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)); STRICT_EXPECTED_CALL(mocked_send(IGNORED_ARG, IGNORED_ARG, IGNORED_ARG, MSG_NOSIGNAL)) .SetReturn(send_amt); STRICT_EXPECTED_CALL(mocked_send(IGNORED_ARG, IGNORED_ARG, IGNORED_ARG, MSG_NOSIGNAL)); @@ -1090,8 +1093,8 @@ TEST_FUNCTION(async_socket_send_async_multiple_sends_WOULDBLOCK_succeeds) ssize_t send_amt = payload_buffers[0].length/2; umock_c_reset_all_calls(); - STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)); STRICT_EXPECTED_CALL(interlocked_increment(IGNORED_ARG)); + STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)); STRICT_EXPECTED_CALL(mocked_send(IGNORED_ARG, payload_buffers[0].buffer, payload_buffers[0].length, MSG_NOSIGNAL)) .SetReturn(send_amt); STRICT_EXPECTED_CALL(mocked_send(IGNORED_ARG, payload_buffers[0].buffer + send_amt, payload_buffers[0].length - send_amt, MSG_NOSIGNAL)) @@ -1131,8 +1134,8 @@ TEST_FUNCTION(async_socket_send_async_with_NULL_on_send_complete_context_succeed umock_c_reset_all_calls(); - STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)); STRICT_EXPECTED_CALL(interlocked_increment(IGNORED_ARG)); + STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)); STRICT_EXPECTED_CALL(mocked_send(IGNORED_ARG, IGNORED_ARG, IGNORED_ARG, MSG_NOSIGNAL)); STRICT_EXPECTED_CALL(test_on_send_complete(NULL, ASYNC_SOCKET_SEND_OK)); STRICT_EXPECTED_CALL(interlocked_decrement(IGNORED_ARG)); @@ -1164,10 +1167,8 @@ TEST_FUNCTION(when_underlying_calls_fail_async_socket_send_async_fails) payload_buffers[0].length = sizeof(payload_bytes); umock_c_reset_all_calls(); - STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)) - .CallCannotFail(); - STRICT_EXPECTED_CALL(interlocked_increment(IGNORED_ARG)) - .CallCannotFail(); + STRICT_EXPECTED_CALL(interlocked_increment(IGNORED_ARG)); + STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)); STRICT_EXPECTED_CALL(mocked_send(IGNORED_ARG, IGNORED_ARG, IGNORED_ARG, MSG_NOSIGNAL)) .SetReturn(-1); STRICT_EXPECTED_CALL(interlocked_decrement(IGNORED_ARG)); @@ -1203,16 +1204,13 @@ TEST_FUNCTION(when_errno_for_send_returns_EWOULDBLOCK_it_uses_completion_port_tr payload_buffers[0].length = sizeof(payload_bytes); umock_c_reset_all_calls(); - STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)) - .CallCannotFail(); - STRICT_EXPECTED_CALL(interlocked_increment(IGNORED_ARG)) - .CallCannotFail(); + STRICT_EXPECTED_CALL(interlocked_increment(IGNORED_ARG)); + STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)); STRICT_EXPECTED_CALL(mocked_send(IGNORED_ARG, IGNORED_ARG, IGNORED_ARG, MSG_NOSIGNAL)) .SetReturn(-1); STRICT_EXPECTED_CALL(malloc(IGNORED_ARG)); STRICT_EXPECTED_CALL(completion_port_add(test_completion_port, EPOLLOUT, test_socket, IGNORED_ARG, IGNORED_ARG)); - STRICT_EXPECTED_CALL(interlocked_increment(IGNORED_ARG)) - .CallCannotFail(); + STRICT_EXPECTED_CALL(interlocked_increment(IGNORED_ARG)); STRICT_EXPECTED_CALL(interlocked_decrement(IGNORED_ARG)); STRICT_EXPECTED_CALL(wake_by_address_single(IGNORED_ARG)); @@ -1243,10 +1241,8 @@ TEST_FUNCTION(when_errno_for_send_returns_ECONNRESET_async_socket_send_async_ret payload_buffers[0].length = sizeof(payload_bytes); umock_c_reset_all_calls(); - STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)) - .CallCannotFail(); - STRICT_EXPECTED_CALL(interlocked_increment(IGNORED_ARG)) - .CallCannotFail(); + STRICT_EXPECTED_CALL(interlocked_increment(IGNORED_ARG)); + STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)); STRICT_EXPECTED_CALL(mocked_send(IGNORED_ARG, IGNORED_ARG, IGNORED_ARG, MSG_NOSIGNAL)) .SetReturn(-1); STRICT_EXPECTED_CALL(interlocked_decrement(IGNORED_ARG)); @@ -1279,10 +1275,8 @@ TEST_FUNCTION(when_errno_for_send_returns_error_async_socket_send_async_returns_ payload_buffers[0].length = sizeof(payload_bytes); umock_c_reset_all_calls(); - STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)) - .CallCannotFail(); - STRICT_EXPECTED_CALL(interlocked_increment(IGNORED_ARG)) - .CallCannotFail(); + STRICT_EXPECTED_CALL(interlocked_increment(IGNORED_ARG)); + STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)); STRICT_EXPECTED_CALL(mocked_send(IGNORED_ARG, IGNORED_ARG, IGNORED_ARG, MSG_NOSIGNAL)) .SetReturn(-1); STRICT_EXPECTED_CALL(interlocked_decrement(IGNORED_ARG)); @@ -1571,8 +1565,10 @@ TEST_FUNCTION(async_socket_receive_async_when_not_open_fails) payload_buffers[0].length = sizeof(payload_bytes); umock_c_reset_all_calls(); - STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)) - .CallCannotFail(); + STRICT_EXPECTED_CALL(interlocked_increment(IGNORED_ARG)); + STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)); + STRICT_EXPECTED_CALL(interlocked_decrement(IGNORED_ARG)); + STRICT_EXPECTED_CALL(wake_by_address_single(IGNORED_ARG)); // act int result = async_socket_receive_async(async_socket, payload_buffers, sizeof(payload_buffers) / sizeof(payload_buffers[0]), test_on_receive_complete, test_callback_ctx); @@ -1600,8 +1596,10 @@ TEST_FUNCTION(async_socket_receive_async_after_close_fails) payload_buffers[0].length = sizeof(payload_bytes); umock_c_reset_all_calls(); - STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)) - .CallCannotFail(); + STRICT_EXPECTED_CALL(interlocked_increment(IGNORED_ARG)); + STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)); + STRICT_EXPECTED_CALL(interlocked_decrement(IGNORED_ARG)); + STRICT_EXPECTED_CALL(wake_by_address_single(IGNORED_ARG)); // act int result = async_socket_receive_async(async_socket, payload_buffers, sizeof(payload_buffers) / sizeof(payload_buffers[0]), test_on_receive_complete, test_callback_ctx); @@ -2165,8 +2163,8 @@ TEST_FUNCTION(event_complete_func_send_EPOLLRDHUP_and_abandons_the_connection) payload_buffers[0].length = sizeof(payload_bytes); umock_c_reset_all_calls(); - STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)); STRICT_EXPECTED_CALL(interlocked_increment(IGNORED_ARG)); + STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)); STRICT_EXPECTED_CALL(mocked_send(IGNORED_ARG, IGNORED_ARG, IGNORED_ARG, MSG_NOSIGNAL)) .SetReturn(-1); STRICT_EXPECTED_CALL(malloc(IGNORED_ARG)); @@ -2237,8 +2235,8 @@ TEST_FUNCTION(event_complete_func_send_ABANDONED_and_abandons_the_connection) payload_buffers[0].length = sizeof(payload_bytes); umock_c_reset_all_calls(); - STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)); STRICT_EXPECTED_CALL(interlocked_increment(IGNORED_ARG)); + STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)); STRICT_EXPECTED_CALL(mocked_send(IGNORED_ARG, IGNORED_ARG, IGNORED_ARG, MSG_NOSIGNAL)) .SetReturn(-1); STRICT_EXPECTED_CALL(malloc(IGNORED_ARG)); @@ -2277,8 +2275,8 @@ TEST_FUNCTION(event_complete_func_send_EPOLLOUT_success) payload_buffers[0].length = sizeof(payload_bytes); umock_c_reset_all_calls(); - STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)); STRICT_EXPECTED_CALL(interlocked_increment(IGNORED_ARG)); + STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)); STRICT_EXPECTED_CALL(mocked_send(IGNORED_ARG, IGNORED_ARG, IGNORED_ARG, MSG_NOSIGNAL)) .SetReturn(-1); STRICT_EXPECTED_CALL(malloc(IGNORED_ARG)); @@ -2317,8 +2315,8 @@ TEST_FUNCTION(event_complete_func_send_EPOLLOUT_abandoned) payload_buffers[0].length = sizeof(payload_bytes); umock_c_reset_all_calls(); - STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)); STRICT_EXPECTED_CALL(interlocked_increment(IGNORED_ARG)); + STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)); STRICT_EXPECTED_CALL(mocked_send(IGNORED_ARG, IGNORED_ARG, IGNORED_ARG, MSG_NOSIGNAL)) .SetReturn(-1); STRICT_EXPECTED_CALL(malloc(IGNORED_ARG)); @@ -2358,8 +2356,8 @@ TEST_FUNCTION(event_complete_func_send_EPOLLOUT_error) payload_buffers[0].length = sizeof(payload_bytes); umock_c_reset_all_calls(); - STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)); STRICT_EXPECTED_CALL(interlocked_increment(IGNORED_ARG)); + STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)); STRICT_EXPECTED_CALL(mocked_send(IGNORED_ARG, IGNORED_ARG, IGNORED_ARG, MSG_NOSIGNAL)) .SetReturn(-1); STRICT_EXPECTED_CALL(malloc(IGNORED_ARG)); @@ -2400,8 +2398,8 @@ TEST_FUNCTION(event_complete_func_EPOLLOUT_multiple_sends_success) uint32_t payload_size = sizeof(payload_bytes) / sizeof(payload_bytes[0]); umock_c_reset_all_calls(); - STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)); STRICT_EXPECTED_CALL(interlocked_increment(IGNORED_ARG)); + STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)); STRICT_EXPECTED_CALL(mocked_send(IGNORED_ARG, IGNORED_ARG, payload_size, MSG_NOSIGNAL)) .SetReturn(-1); STRICT_EXPECTED_CALL(malloc(IGNORED_ARG)); @@ -2473,8 +2471,8 @@ TEST_FUNCTION(event_complete_func_send_ERROR_and_error_the_connection) payload_buffers[0].length = sizeof(payload_bytes); umock_c_reset_all_calls(); - STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)); STRICT_EXPECTED_CALL(interlocked_increment(IGNORED_ARG)); + STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)); STRICT_EXPECTED_CALL(mocked_send(IGNORED_ARG, IGNORED_ARG, IGNORED_ARG, MSG_NOSIGNAL)) .SetReturn(-1); STRICT_EXPECTED_CALL(malloc(IGNORED_ARG)); @@ -2584,8 +2582,10 @@ TEST_FUNCTION(async_socket_notify_io_async_fails_when_state_is_not_OPEN) ASSERT_IS_NOT_NULL(async_socket); umock_c_reset_all_calls(); - STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)) - .CallCannotFail(); + STRICT_EXPECTED_CALL(interlocked_increment(IGNORED_ARG)); + STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)); + STRICT_EXPECTED_CALL(interlocked_decrement(IGNORED_ARG)); + STRICT_EXPECTED_CALL(wake_by_address_single(IGNORED_ARG)); // act int result = async_socket_notify_io_async(async_socket, ASYNC_SOCKET_NOTIFY_IO_TYPE_IN, test_on_notify_complete, test_callback_ctx); @@ -2608,14 +2608,11 @@ TEST_FUNCTION(async_socket_notify_io_async_fails_when_alloc_context_fails) ASSERT_ARE_EQUAL(int, 0, async_socket_open_async(async_socket, test_on_open_complete, test_callback_ctx)); umock_c_reset_all_calls(); - STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)) - .CallCannotFail(); - STRICT_EXPECTED_CALL(interlocked_increment(IGNORED_ARG)) - .CallCannotFail(); + STRICT_EXPECTED_CALL(interlocked_increment(IGNORED_ARG)); + STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)); STRICT_EXPECTED_CALL(malloc(IGNORED_ARG)) .SetReturn(NULL); - STRICT_EXPECTED_CALL(interlocked_decrement(IGNORED_ARG)) - .CallCannotFail(); + STRICT_EXPECTED_CALL(interlocked_decrement(IGNORED_ARG)); STRICT_EXPECTED_CALL(wake_by_address_single(IGNORED_ARG)); // act @@ -2688,16 +2685,13 @@ TEST_FUNCTION(async_socket_notify_io_async_fails_when_completion_port_add_fails) ASSERT_ARE_EQUAL(int, 0, async_socket_open_async(async_socket, test_on_open_complete, test_callback_ctx)); umock_c_reset_all_calls(); - STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)) - .CallCannotFail(); - STRICT_EXPECTED_CALL(interlocked_increment(IGNORED_ARG)) - .CallCannotFail(); + STRICT_EXPECTED_CALL(interlocked_increment(IGNORED_ARG)); + STRICT_EXPECTED_CALL(interlocked_add(IGNORED_ARG, 0)); STRICT_EXPECTED_CALL(malloc(IGNORED_ARG)); STRICT_EXPECTED_CALL(completion_port_add(test_completion_port, EPOLLOUT, test_socket, IGNORED_ARG, IGNORED_ARG)) .SetReturn(-1); STRICT_EXPECTED_CALL(free(IGNORED_ARG)); - STRICT_EXPECTED_CALL(interlocked_decrement(IGNORED_ARG)) - .CallCannotFail(); + STRICT_EXPECTED_CALL(interlocked_decrement(IGNORED_ARG)); STRICT_EXPECTED_CALL(wake_by_address_single(IGNORED_ARG)); // act diff --git a/win32/devdoc/async_socket_win32.md b/win32/devdoc/async_socket_win32.md index bd779b0..eacfb86 100644 --- a/win32/devdoc/async_socket_win32.md +++ b/win32/devdoc/async_socket_win32.md @@ -6,7 +6,7 @@ ## Design -`async_socket_win32` is using the WSA Windows functions with a PTP_POOL in order to perform asynchronous socket send and receives. +`async_socket_win32` is using the WSA Windows functions with a `PTP_POOL` in order to perform asynchronous socket send and receives. `async_socket_win32` creates its own threadpool environment. The `async_socket_win32` takes ownership of the provided `SOCKET_HANDLE` and is responsible for closing it. For that reason, it is not possible to re-open the `async_socket` after closing it. This is due to the fact that the socket is created by some external event, such as accepting an incoming connection and closing the underlying socket is not reversible.