Fix AMQP transport to break infinite loop of link re-ATTACH-ments (#2472)
In iothubtransport_amqp_common.c, `number_of_previous_failures` is used to track issues with the AMQP device and either restart or reconnect it as appropriate. Note: the AMQP device (iothubtransport_amqp_device.c) is a subcomponent of the AMQP transport, representing each registered device. The AMQP transport can either just restart a registered AMQP device (causing just re-ATTACH-ment of AMQP links) or completely reconnect it (with END, CLOSE, OPEN, BEGIN, ...). When a failure occurs (e.g., the Twin messenger reports an error), the AMQP device goes into an error state itself and `number_of_previous_failures` (on the AMQP transport layer) is incremented. If `number_of_previous_failures` is less than `MAX_NUMBER_OF_DEVICE_FAILURES`, the AMQP device is just restarted. If `number_of_previous_failures` reaches `MAX_NUMBER_OF_DEVICE_FAILURES`, the AMQP device is completely reconnected. If `number_of_previous_failures` never reaches `MAX_NUMBER_OF_DEVICE_FAILURES` (iothubtransport_amqp_common.c:1058), then the transport will never force a full reconnection, always trying to stop the registered AMQP device. When IoTHubTransport_AMQP_Common_Device_DoWork is called (for each registered device), if `send_pending_events` succeeds, `number_of_previous_failures` is set to 0. But all `send_pending_events` does is add a message to the outgoing queue of the AMQP messenger, so it is always expected to succeed, always resetting `number_of_previous_failures`. That prevents the AMQP transport from fully reconnecting with the IoT Hub in specific error scenarios - such as when Twin throttling occurs, causing outgoing Twin messages (e.g., PATCH, i.e., reported properties update) to fail. In this case, the AMQP transport remains trapped in a loop of just trying to re-establish the AMQP links.
This commit is contained in:
Родитель
7329f70906
Коммит
55c2b46a41
|
@ -157,6 +157,7 @@ typedef struct AMQP_TRANSPORT_DEVICE_TWIN_CONTEXT_TAG
|
|||
uint32_t item_id;
|
||||
TRANSPORT_CALLBACKS_INFO transport_callbacks;
|
||||
void* transport_ctx;
|
||||
AMQP_TRANSPORT_DEVICE_INSTANCE* device;
|
||||
} AMQP_TRANSPORT_DEVICE_TWIN_CONTEXT;
|
||||
|
||||
typedef struct AMQP_TRANSPORT_GET_TWIN_CONTEXT_TAG
|
||||
|
@ -438,6 +439,8 @@ static DEVICE_MESSAGE_DISPOSITION_RESULT on_message_received(IOTHUB_MESSAGE_HAND
|
|||
{
|
||||
device_disposition_result = DEVICE_MESSAGE_DISPOSITION_RESULT_NONE;
|
||||
}
|
||||
|
||||
amqp_device_instance->number_of_previous_failures = 0;
|
||||
}
|
||||
|
||||
return device_disposition_result;
|
||||
|
@ -468,6 +471,7 @@ static int on_method_request_received(void* context, const char* method_name, co
|
|||
}
|
||||
else
|
||||
{
|
||||
device_state->number_of_previous_failures = 0;
|
||||
result = 0;
|
||||
}
|
||||
return result;
|
||||
|
@ -516,9 +520,13 @@ static void on_device_send_twin_update_complete_callback(DEVICE_TWIN_UPDATE_RESU
|
|||
else
|
||||
{
|
||||
AMQP_TRANSPORT_DEVICE_TWIN_CONTEXT* dev_twin_ctx = (AMQP_TRANSPORT_DEVICE_TWIN_CONTEXT*)context;
|
||||
|
||||
dev_twin_ctx->transport_callbacks.twin_rpt_state_complete_cb(dev_twin_ctx->item_id, status_code, dev_twin_ctx->transport_ctx);
|
||||
|
||||
if (status_code >= 200 && status_code < 300)
|
||||
{
|
||||
dev_twin_ctx->device->number_of_previous_failures = 0;
|
||||
}
|
||||
|
||||
free(dev_twin_ctx);
|
||||
}
|
||||
}
|
||||
|
@ -535,6 +543,8 @@ static void on_device_twin_update_received_callback(DEVICE_TWIN_UPDATE_TYPE upda
|
|||
|
||||
registered_device->transport_instance->transport_callbacks.twin_retrieve_prop_complete_cb((update_type == DEVICE_TWIN_UPDATE_TYPE_COMPLETE ? DEVICE_TWIN_UPDATE_COMPLETE : DEVICE_TWIN_UPDATE_PARTIAL),
|
||||
message, length, registered_device->transport_instance->transport_ctx);
|
||||
|
||||
registered_device->number_of_previous_failures = 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -943,6 +953,7 @@ static void on_event_send_complete(IOTHUB_MESSAGE_LIST* message, D2C_EVENT_SEND_
|
|||
else
|
||||
{
|
||||
registered_device->number_of_send_event_complete_failures = 0;
|
||||
registered_device->number_of_previous_failures = 0;
|
||||
}
|
||||
|
||||
if (message->callback != NULL)
|
||||
|
@ -1080,7 +1091,6 @@ static int IoTHubTransport_AMQP_Common_Device_DoWork(AMQP_TRANSPORT_DEVICE_INSTA
|
|||
}
|
||||
else
|
||||
{
|
||||
registered_device->number_of_previous_failures = 0;
|
||||
result = RESULT_OK;
|
||||
}
|
||||
}
|
||||
|
@ -1354,6 +1364,7 @@ IOTHUB_PROCESS_ITEM_RESULT IoTHubTransport_AMQP_Common_ProcessItem(TRANSPORT_LL_
|
|||
dev_twin_ctx->transport_callbacks = transport_instance->transport_callbacks;
|
||||
dev_twin_ctx->transport_ctx = transport_instance->transport_ctx;
|
||||
dev_twin_ctx->item_id = iothub_item->device_twin->item_id;
|
||||
dev_twin_ctx->device = registered_device;
|
||||
|
||||
if (amqp_device_send_twin_update_async(
|
||||
registered_device->device_handle,
|
||||
|
|
|
@ -801,7 +801,7 @@ int amqp_device_start_async(AMQP_DEVICE_HANDLE handle, SESSION_HANDLE session_ha
|
|||
{
|
||||
instance->session_handle = session_handle;
|
||||
instance->cbs_handle = cbs_handle;
|
||||
instance->stop_delay_ms = 0;
|
||||
instance->stop_delay_ms = 0;
|
||||
|
||||
update_state(instance, DEVICE_STATE_STARTING);
|
||||
|
||||
|
|
|
@ -488,6 +488,25 @@ static DLIST_ENTRY TEST_waitingToSend;
|
|||
static unsigned long TEST_MESSAGE_ID;
|
||||
|
||||
|
||||
// ---------- Time-related Test Helpers ---------- //
|
||||
static time_t add_seconds(time_t base_time, unsigned int seconds)
|
||||
{
|
||||
time_t new_time;
|
||||
struct tm* bd_new_time;
|
||||
|
||||
if ((bd_new_time = localtime(&base_time)) == NULL)
|
||||
{
|
||||
new_time = INDEFINITE_TIME;
|
||||
}
|
||||
else
|
||||
{
|
||||
bd_new_time->tm_sec += seconds;
|
||||
new_time = mktime(bd_new_time);
|
||||
}
|
||||
|
||||
return new_time;
|
||||
}
|
||||
|
||||
// ---------- Helpers for Expected Calls ---------- //
|
||||
|
||||
// @remarks
|
||||
|
@ -4329,6 +4348,137 @@ TEST_FUNCTION(ConnectionStatusCallBack_UNAUTH_msg_communication_error)
|
|||
destroy_transport(handle, device_handle, NULL);
|
||||
}
|
||||
|
||||
// Verifies the transport's escalation from device restarts to a full reconnection:
// four consecutive AMQP device errors are each handled by stopping
// (amqp_device_delayed_stop) and restarting (amqp_device_start_async) the device,
// and a fifth error makes the transport prepare for a full connection retry.
// Expected calls are reset once before the loop, so the final assert compares every
// expectation registered during the loop and the fifth error against the actual calls.
TEST_FUNCTION(Transport_Fully_Reconnects_After_5_AMQP_device_errors)
|
||||
{
|
||||
// arrange
|
||||
initialize_test_variables();
|
||||
time_t current_time = TEST_current_time;
|
||||
// Stays false for the whole test: every mocked is_timeout_reached reports "not timed out".
bool is_state_change_timedout = false;
|
||||
TRANSPORT_LL_HANDLE handle = create_transport();
|
||||
IOTHUB_DEVICE_CONFIG* device_config = create_device_config(TEST_DEVICE_ID_CHAR_PTR, true);
|
||||
|
||||
IOTHUB_DEVICE_HANDLE device_handle;
|
||||
device_handle = register_device(handle, device_config, &TEST_waitingToSend, true);
|
||||
|
||||
crank_transport_ready_after_create(handle, &TEST_waitingToSend, 0, false, true, 1, TEST_current_time, false);
|
||||
|
||||
// act
|
||||
// Fail AMQP device enough times to trigger reconnection:
|
||||
umock_c_reset_all_calls();
|
||||
|
||||
// Errors 1 through 4: each one is expected to result only in a device stop/start cycle.
for (int i = 0; i < 4; i++)
|
||||
{
|
||||
// Raise error from amqp_device level.
|
||||
STRICT_EXPECTED_CALL(get_time(NULL)).SetReturn(current_time);
|
||||
STRICT_EXPECTED_CALL(Transport_ConnectionStatusCallBack(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_COMMUNICATION_ERROR, IGNORED_PTR_ARG));
|
||||
TEST_device_create_saved_on_state_changed_callback(TEST_device_create_saved_on_state_changed_context,
|
||||
DEVICE_STATE_STARTED, DEVICE_STATE_ERROR_MSG);
|
||||
|
||||
// On each of these errors, re-establish the links (amqp_device_delayed_stop)
|
||||
STRICT_EXPECTED_CALL(singlylinkedlist_get_head_item(IGNORED_PTR_ARG));
|
||||
STRICT_EXPECTED_CALL(singlylinkedlist_item_get_value(IGNORED_PTR_ARG));
|
||||
STRICT_EXPECTED_CALL(amqp_device_delayed_stop(IGNORED_PTR_ARG, IGNORED_NUM_ARG));
|
||||
STRICT_EXPECTED_CALL(amqp_device_do_work(IGNORED_PTR_ARG));
|
||||
STRICT_EXPECTED_CALL(singlylinkedlist_get_next_item(IGNORED_PTR_ARG));
|
||||
STRICT_EXPECTED_CALL(amqp_connection_do_work(IGNORED_PTR_ARG));
|
||||
(void)IoTHubTransport_AMQP_Common_DoWork(handle);
|
||||
|
||||
STRICT_EXPECTED_CALL(get_time(NULL)).SetReturn(current_time);
|
||||
TEST_device_create_saved_on_state_changed_callback(TEST_device_create_saved_on_state_changed_context,
|
||||
DEVICE_STATE_ERROR_MSG, DEVICE_STATE_STOPPING);
|
||||
|
||||
// amqp_device_delayed_stop takes 10 seconds to fully stop.
|
||||
current_time = add_seconds(current_time, 10);
|
||||
|
||||
STRICT_EXPECTED_CALL(singlylinkedlist_get_head_item(IGNORED_PTR_ARG));
|
||||
STRICT_EXPECTED_CALL(singlylinkedlist_item_get_value(IGNORED_PTR_ARG));
|
||||
STRICT_EXPECTED_CALL(is_timeout_reached(IGNORED_NUM_ARG, IGNORED_NUM_ARG, IGNORED_PTR_ARG))
|
||||
.CopyOutArgumentBuffer_is_timed_out(&is_state_change_timedout, sizeof(is_state_change_timedout))
|
||||
.SetReturn(0);
|
||||
STRICT_EXPECTED_CALL(amqp_device_do_work(IGNORED_PTR_ARG));
|
||||
STRICT_EXPECTED_CALL(singlylinkedlist_get_next_item(IGNORED_PTR_ARG));
|
||||
STRICT_EXPECTED_CALL(amqp_connection_do_work(IGNORED_PTR_ARG));
|
||||
(void)IoTHubTransport_AMQP_Common_DoWork(handle);
|
||||
|
||||
// Complete stopping amqp_device.
|
||||
STRICT_EXPECTED_CALL(get_time(NULL)).SetReturn(current_time);
|
||||
STRICT_EXPECTED_CALL(Transport_ConnectionStatusCallBack(
|
||||
IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_OK, IGNORED_PTR_ARG));
|
||||
TEST_device_create_saved_on_state_changed_callback(TEST_device_create_saved_on_state_changed_context,
|
||||
DEVICE_STATE_STOPPING, DEVICE_STATE_STOPPED);
|
||||
|
||||
current_time = add_seconds(current_time, 1);
|
||||
|
||||
// AMQP transport starts the amqp_device again.
|
||||
STRICT_EXPECTED_CALL(singlylinkedlist_get_head_item(IGNORED_PTR_ARG));
|
||||
STRICT_EXPECTED_CALL(singlylinkedlist_item_get_value(IGNORED_PTR_ARG));
|
||||
STRICT_EXPECTED_CALL(amqp_connection_get_session_handle(IGNORED_PTR_ARG, IGNORED_PTR_ARG));
|
||||
STRICT_EXPECTED_CALL(amqp_connection_get_cbs_handle(IGNORED_PTR_ARG, IGNORED_PTR_ARG));
|
||||
STRICT_EXPECTED_CALL(amqp_device_start_async(IGNORED_PTR_ARG, IGNORED_PTR_ARG, IGNORED_PTR_ARG));
|
||||
STRICT_EXPECTED_CALL(amqp_device_do_work(IGNORED_PTR_ARG));
|
||||
STRICT_EXPECTED_CALL(singlylinkedlist_get_next_item(IGNORED_PTR_ARG));
|
||||
STRICT_EXPECTED_CALL(amqp_connection_do_work(IGNORED_PTR_ARG));
|
||||
(void)IoTHubTransport_AMQP_Common_DoWork(handle);
|
||||
|
||||
STRICT_EXPECTED_CALL(get_time(NULL)).SetReturn(current_time);
|
||||
TEST_device_create_saved_on_state_changed_callback(TEST_device_create_saved_on_state_changed_context,
|
||||
DEVICE_STATE_STOPPED, DEVICE_STATE_STARTING);
|
||||
|
||||
// amqp_device now goes fully started.
|
||||
current_time = add_seconds(current_time, 1);
|
||||
|
||||
STRICT_EXPECTED_CALL(singlylinkedlist_get_head_item(IGNORED_PTR_ARG));
|
||||
STRICT_EXPECTED_CALL(singlylinkedlist_item_get_value(IGNORED_PTR_ARG));
|
||||
STRICT_EXPECTED_CALL(is_timeout_reached(IGNORED_NUM_ARG, IGNORED_NUM_ARG, IGNORED_PTR_ARG))
|
||||
.CopyOutArgumentBuffer_is_timed_out(&is_state_change_timedout, sizeof(is_state_change_timedout))
|
||||
.SetReturn(0);
|
||||
STRICT_EXPECTED_CALL(amqp_device_do_work(IGNORED_PTR_ARG));
|
||||
STRICT_EXPECTED_CALL(singlylinkedlist_get_next_item(IGNORED_PTR_ARG));
|
||||
STRICT_EXPECTED_CALL(amqp_connection_do_work(IGNORED_PTR_ARG));
|
||||
(void)IoTHubTransport_AMQP_Common_DoWork(handle);
|
||||
|
||||
STRICT_EXPECTED_CALL(get_time(NULL)).SetReturn(current_time);
|
||||
STRICT_EXPECTED_CALL(retry_control_reset(IGNORED_PTR_ARG));
|
||||
STRICT_EXPECTED_CALL(Transport_ConnectionStatusCallBack(
|
||||
IOTHUB_CLIENT_CONNECTION_AUTHENTICATED, IOTHUB_CLIENT_CONNECTION_OK, IGNORED_PTR_ARG));
|
||||
TEST_device_create_saved_on_state_changed_callback(TEST_device_create_saved_on_state_changed_context,
|
||||
DEVICE_STATE_STARTING, DEVICE_STATE_STARTED);
|
||||
|
||||
// Regular DoWork call with no issues.
|
||||
STRICT_EXPECTED_CALL(singlylinkedlist_get_head_item(IGNORED_PTR_ARG));
|
||||
STRICT_EXPECTED_CALL(singlylinkedlist_item_get_value(IGNORED_PTR_ARG));
|
||||
STRICT_EXPECTED_CALL(DList_IsListEmpty(IGNORED_PTR_ARG));
|
||||
STRICT_EXPECTED_CALL(amqp_device_do_work(IGNORED_PTR_ARG));
|
||||
STRICT_EXPECTED_CALL(singlylinkedlist_get_next_item(IGNORED_PTR_ARG));
|
||||
STRICT_EXPECTED_CALL(amqp_connection_do_work(IGNORED_PTR_ARG));
|
||||
(void)IoTHubTransport_AMQP_Common_DoWork(handle);
|
||||
}
|
||||
|
||||
// Raise 5th error from amqp_device, triggering AMQP transport to do a full reconnection.
|
||||
STRICT_EXPECTED_CALL(get_time(NULL)).SetReturn(current_time);
|
||||
STRICT_EXPECTED_CALL(Transport_ConnectionStatusCallBack(IOTHUB_CLIENT_CONNECTION_UNAUTHENTICATED, IOTHUB_CLIENT_CONNECTION_COMMUNICATION_ERROR, IGNORED_PTR_ARG));
|
||||
TEST_device_create_saved_on_state_changed_callback(TEST_device_create_saved_on_state_changed_context,
|
||||
DEVICE_STATE_STARTED, DEVICE_STATE_ERROR_MSG);
|
||||
|
||||
// Unlike the earlier errors, no amqp_device_delayed_stop is expected on this pass.
STRICT_EXPECTED_CALL(singlylinkedlist_get_head_item(IGNORED_PTR_ARG));
|
||||
STRICT_EXPECTED_CALL(singlylinkedlist_item_get_value(IGNORED_PTR_ARG));
|
||||
STRICT_EXPECTED_CALL(STRING_c_str(IGNORED_PTR_ARG));
|
||||
STRICT_EXPECTED_CALL(amqp_device_do_work(IGNORED_PTR_ARG));
|
||||
STRICT_EXPECTED_CALL(singlylinkedlist_get_next_item(IGNORED_PTR_ARG));
|
||||
STRICT_EXPECTED_CALL(amqp_connection_do_work(IGNORED_PTR_ARG));
|
||||
(void)IoTHubTransport_AMQP_Common_DoWork(handle);
|
||||
|
||||
// Observe the AMQP transport do a full re-connection.
|
||||
set_expected_calls_for_prepare_for_connection_retry(1, DEVICE_STATE_ERROR_MSG);
|
||||
(void)IoTHubTransport_AMQP_Common_DoWork(handle);
|
||||
|
||||
// assert
|
||||
ASSERT_ARE_EQUAL(char_ptr, umock_c_get_expected_calls(), umock_c_get_actual_calls());
|
||||
|
||||
// cleanup
|
||||
destroy_transport(handle, device_handle, NULL);
|
||||
}
|
||||
|
||||
TEST_FUNCTION(ConnectionStatusCallBack_UNAUTH_no_network)
|
||||
{
|
||||
// arrange
|
||||
|
|
Загрузка…
Ссылка в новой задаче