diff --git a/sdk/core/azure-core-amqp/test/ut/session_tests.cpp b/sdk/core/azure-core-amqp/test/ut/session_tests.cpp index 806a126c6..9533b64f6 100644 --- a/sdk/core/azure-core-amqp/test/ut/session_tests.cpp +++ b/sdk/core/azure-core-amqp/test/ut/session_tests.cpp @@ -255,7 +255,7 @@ namespace Azure { namespace Core { namespace Amqp { namespace Tests { connection.Open(); { - constexpr const size_t sessionCount = 10; + constexpr const size_t sessionCount = 30; GTEST_LOG_(INFO) << "Opening " << sessionCount << " sessions."; std::vector sessions; for (size_t i = 0; i < sessionCount; i += 1) diff --git a/sdk/core/azure-core-amqp/vendor/azure-uamqp-c/src/connection.c b/sdk/core/azure-core-amqp/vendor/azure-uamqp-c/src/connection.c index 7f096ff70..7feefd765 100644 --- a/sdk/core/azure-core-amqp/vendor/azure-uamqp-c/src/connection.c +++ b/sdk/core/azure-core-amqp/vendor/azure-uamqp-c/src/connection.c @@ -65,8 +65,8 @@ typedef struct CHANNEL_TABLE_TABLE_TAG // When a channel is freed, the entry is set back to ENDPOINT_UNUSED_CHANNEL. // CHANNEL_TABLE_ENTRY* channels; - uint16_t capacity; - uint16_t size; + uint32_t capacity; + uint32_t size; } CHANNEL_TABLE; typedef struct CONNECTION_INSTANCE_TAG @@ -173,49 +173,64 @@ static int channel_table_allocate(CHANNEL_TABLE* channel_table, uint16_t* h) else { *h = 0; - if (channel_table->size == channel_table->capacity) { - CHANNEL_TABLE_ENTRY* new_handles = (CHANNEL_TABLE_ENTRY*)realloc( - channel_table->channels, sizeof(CHANNEL_TABLE_ENTRY) * channel_table->capacity * 2); - if (new_handles == NULL) - { - LogError("Cannot reallocate memory for handle table"); - result = MU_FAILURE; - } - else - { - channel_table->channels = new_handles; - // Ensure the newly resized handle table is empty. - for (uint16_t i = channel_table->size; i < channel_table->capacity * 2; i++) + if (channel_table->capacity == UINT16_MAX) { - channel_table->channels[i].outgoing_channel = ENDPOINT_UNUSED_CHANNEL; - channel_table->channels[i].incoming_channel = ENDPOINT_UNUSED_CHANNEL; - channel_table->channels[i].is_endpoint_live = false; + LogError("Cannot allocate more channels"); + result = MU_FAILURE; + } + else + { + uint32_t new_capacity = channel_table->capacity + INITIAL_CHANNEL_TABLE_CAPACITY; + if (channel_table->capacity >= UINT16_MAX) + { + new_capacity = UINT16_MAX; + } + CHANNEL_TABLE_ENTRY* new_channels = (CHANNEL_TABLE_ENTRY*)realloc( + channel_table->channels, sizeof(CHANNEL_TABLE_ENTRY) * new_capacity); + if (new_channels == NULL) + { + LogError("Cannot reallocate memory for handle table"); + result = MU_FAILURE; + } + else + { + channel_table->channels = new_channels; + channel_table->capacity = new_capacity; + // Ensure the newly resized part of the handle table is empty. + uint32_t i = channel_table->size; + do + { + channel_table->channels[i].outgoing_channel = ENDPOINT_UNUSED_CHANNEL; + channel_table->channels[i].incoming_channel = ENDPOINT_UNUSED_CHANNEL; + channel_table->channels[i].is_endpoint_live = false; + i++; + } while (i < channel_table->capacity); + } } - channel_table->capacity *= 2; - } } if (!result) { - uint16_t i = 0; - for (; i < channel_table->size; i++) - { - if (channel_table->channels[i].outgoing_channel == ENDPOINT_UNUSED_CHANNEL) + // Look for an empty slot in the table. If none is found, add to the end of the table. + uint32_t i = 0; + for (; i < channel_table->size; i++) { - *h = i; - channel_table->channels[i].outgoing_channel = i; - break; + if (channel_table->channels[i].outgoing_channel == ENDPOINT_UNUSED_CHANNEL) + { + break; + } } - } - // If we didn't find a hole, add this to the end of the table. - if (i == channel_table->size) - { - *h = channel_table->size; - channel_table->channels[channel_table->size].outgoing_channel = channel_table->size; - channel_table->channels[channel_table->size].is_endpoint_live = true; - channel_table->size++; - } + // If we didn't find a hole, we need to increase the size of the table by 1. + if (i == channel_table->size) + { + channel_table->size++; + } + + *h = (uint16_t)i; + channel_table->channels[i].outgoing_channel = (uint16_t)i; + channel_table->channels[i].is_endpoint_live = true; + } } @@ -305,7 +320,7 @@ static int channel_table_find_outgoing_channel_from_incoming_channel(CHANNEL_TAB } if (i == channel_table->size) { - LogError("Could not find incoming channel %d", incoming_channel); + LogError("Could not find incoming channel %hu", incoming_channel); result = MU_FAILURE; } @@ -468,7 +483,7 @@ static void log_incoming_frame(uint16_t channel, AMQP_VALUE performative) else { char* performative_as_string; - LOG(AZ_LOG_TRACE, 0, "%d:", channel) + LOG(AZ_LOG_TRACE, 0, "%hu:", channel) LOG(AZ_LOG_TRACE, 0, "<- "); LOG(AZ_LOG_TRACE, 0, "%s", (char*)get_frame_type_as_string(descriptor)); performative_as_string = NULL; @@ -494,7 +509,7 @@ static void log_outgoing_frame(uint16_t channel, AMQP_VALUE performative) else { char* performative_as_string; - LOG(AZ_LOG_TRACE, 0, "%d:", channel) + LOG(AZ_LOG_TRACE, 0, "%hu:", channel) LOG(AZ_LOG_TRACE, 0, "-> "); LOG(AZ_LOG_TRACE, 0, "%s", (char*)get_frame_type_as_string(descriptor)); performative_as_string = NULL; @@ -1290,7 +1305,7 @@ static void on_amqp_frame_received(void* context, uint16_t channel, AMQP_VALUE p ENDPOINT_INSTANCE* session_endpoint = find_session_endpoint_by_outgoing_channel(connection, remote_channel); if (session_endpoint == NULL) { - LogError("Cannot find session endpoint corresponding to remote channel %d", remote_channel); + LogError("Cannot find session endpoint corresponding to remote channel %hu", remote_channel); } else {