Add a queue to iothub_messaging (only for longhaul, deprecated for publish use) (#2579)
* Partial changes to add a queue to iothub_messaging (only for longhaul, deprecated for publish use) * Final changes to add a queue to iothub_messaging (only for longhaul, deprecated for publish use) * Remove references to umock_c_negative_tests_get_call_description * Address code review comments * Fix build break * Fix another build break * Fix valgrind issue in iothub_messaging_ll.c * Fix build break
This commit is contained in:
Родитель
4c23991f90
Коммит
9cf2994f42
|
@ -25,7 +25,8 @@ extern "C"
|
|||
IOTHUB_MESSAGE_OK, \
|
||||
IOTHUB_MESSAGE_INVALID_ARG, \
|
||||
IOTHUB_MESSAGE_INVALID_TYPE, \
|
||||
IOTHUB_MESSAGE_ERROR \
|
||||
IOTHUB_MESSAGE_ERROR, \
|
||||
IOTHUB_MESSAGE_BECAUSE_DESTROY
|
||||
|
||||
/** @brief Enumeration specifying the status of calls to various
|
||||
* APIs in this module.
|
||||
|
|
|
@ -101,6 +101,16 @@ MOCKABLE_FUNCTION(, IOTHUB_MESSAGING_RESULT, IoTHubMessaging_SetFeedbackMessageC
|
|||
*/
|
||||
MOCKABLE_FUNCTION(, IOTHUB_MESSAGING_RESULT, IoTHubMessaging_SetTrustedCert, IOTHUB_MESSAGING_CLIENT_HANDLE, messagingClientHandle, const char*, trusted_cert);
|
||||
|
||||
/**
|
||||
* @brief Sets the maximum number of in-flight cloud-to-device messages being sent to Azure IoT Hub.
|
||||
*
|
||||
* @param messagingHandle The handle created by a call to the create function.
|
||||
* @param maxQueueSize The maximum number of messages in the send queue.
|
||||
*
|
||||
* @return IOTHUB_CLIENT_OK upon success or an error code upon failure.
|
||||
*/
|
||||
MOCKABLE_FUNCTION(, IOTHUB_MESSAGING_RESULT, IoTHubMessaging_SetMaxSendQueueSize, IOTHUB_MESSAGING_CLIENT_HANDLE, messagingClientHandle, size_t, maxQueueSize);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -43,7 +43,9 @@ MU_DEFINE_ENUM_WITHOUT_INVALID(IOTHUB_MESSAGE_SEND_STATE, IOTHUB_MESSAGE_SEND_ST
|
|||
IOTHUB_MESSAGING_ERROR, \
|
||||
IOTHUB_MESSAGING_INVALID_JSON, \
|
||||
IOTHUB_MESSAGING_DEVICE_EXIST, \
|
||||
IOTHUB_MESSAGING_CALLBACK_NOT_SET \
|
||||
IOTHUB_MESSAGING_CALLBACK_NOT_SET, \
|
||||
IOTHUB_MESSAGING_QUEUE_FULL, \
|
||||
IOTHUB_MESSAGING_BECAUSE_DESTROY
|
||||
|
||||
MU_DEFINE_ENUM_WITHOUT_INVALID(IOTHUB_MESSAGING_RESULT, IOTHUB_MESSAGING_RESULT_VALUES);
|
||||
|
||||
|
@ -166,6 +168,16 @@ MOCKABLE_FUNCTION(, void, IoTHubMessaging_LL_DoWork, IOTHUB_MESSAGING_HANDLE, me
|
|||
*/
|
||||
MOCKABLE_FUNCTION(, IOTHUB_MESSAGING_RESULT, IoTHubMessaging_LL_SetTrustedCert, IOTHUB_MESSAGING_HANDLE, messagingHandle, const char*, trusted_cert);
|
||||
|
||||
/**
|
||||
* @brief Sets the maximum number of in-flight cloud-to-device messages being sent to Azure IoT Hub.
|
||||
*
|
||||
* @param messagingHandle The handle created by a call to the create function.
|
||||
* @param maxQueueSize The maximum number of messages in the send queue.
|
||||
*
|
||||
* @return IOTHUB_CLIENT_OK upon success or an error code upon failure.
|
||||
*/
|
||||
MOCKABLE_FUNCTION(, IOTHUB_MESSAGING_RESULT, IoTHubMessaging_LL_SetMaxSendQueueSize, IOTHUB_MESSAGING_HANDLE, messagingHandle, size_t, maxQueueSize);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -308,3 +308,30 @@ IOTHUB_MESSAGING_RESULT IoTHubMessaging_SetTrustedCert(IOTHUB_MESSAGING_CLIENT_H
|
|||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
IOTHUB_MESSAGING_RESULT IoTHubMessaging_SetMaxSendQueueSize(IOTHUB_MESSAGING_CLIENT_HANDLE messagingClientHandle, size_t maxQueueSize)
|
||||
{
|
||||
IOTHUB_MESSAGING_RESULT result;
|
||||
|
||||
if (messagingClientHandle == NULL)
|
||||
{
|
||||
LogError("NULL messagingClientHandle");
|
||||
result = IOTHUB_MESSAGING_INVALID_ARG;
|
||||
}
|
||||
else
|
||||
{
|
||||
IOTHUB_MESSAGING_CLIENT_INSTANCE* iotHubMessagingClientInstance = (IOTHUB_MESSAGING_CLIENT_INSTANCE*)messagingClientHandle;
|
||||
|
||||
if (Lock(iotHubMessagingClientInstance->LockHandle) != LOCK_OK)
|
||||
{
|
||||
LogError("Could not acquire lock");
|
||||
result = IOTHUB_MESSAGING_ERROR;
|
||||
}
|
||||
else
|
||||
{
|
||||
result = IoTHubMessaging_LL_SetMaxSendQueueSize(iotHubMessagingClientInstance->IoTHubMessagingHandle, maxQueueSize);
|
||||
(void)Unlock(iotHubMessagingClientInstance->LockHandle);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
|
|
@ -11,6 +11,7 @@
|
|||
#include "azure_c_shared_utility/platform.h"
|
||||
#include "azure_c_shared_utility/sastoken.h"
|
||||
#include "azure_c_shared_utility/shared_util_options.h"
|
||||
#include "azure_c_shared_utility/singlylinkedlist.h"
|
||||
|
||||
#include "azure_uamqp_c/connection.h"
|
||||
#include "azure_uamqp_c/message_receiver.h"
|
||||
|
@ -27,6 +28,7 @@
|
|||
#include "iothub_sc_version.h"
|
||||
|
||||
#define SIZE_OF_PERCENT_S_IN_FMT_STRING 2
|
||||
#define DEFAULT_MAX_SEND_QUEUE_SIZE SIZE_MAX
|
||||
|
||||
MU_DEFINE_ENUM_STRINGS_WITHOUT_INVALID(IOTHUB_FEEDBACK_STATUS_CODE, IOTHUB_FEEDBACK_STATUS_CODE_VALUES);
|
||||
MU_DEFINE_ENUM_STRINGS_WITHOUT_INVALID(IOTHUB_MESSAGE_SEND_STATE, IOTHUB_MESSAGE_SEND_STATE_VALUES);
|
||||
|
@ -35,13 +37,19 @@ MU_DEFINE_ENUM_STRINGS_WITHOUT_INVALID(IOTHUB_MESSAGING_RESULT, IOTHUB_MESSAGING
|
|||
typedef struct CALLBACK_DATA_TAG
|
||||
{
|
||||
IOTHUB_OPEN_COMPLETE_CALLBACK openCompleteCompleteCallback;
|
||||
IOTHUB_SEND_COMPLETE_CALLBACK sendCompleteCallback;
|
||||
IOTHUB_FEEDBACK_MESSAGE_RECEIVED_CALLBACK feedbackMessageCallback;
|
||||
void* openUserContext;
|
||||
void* sendUserContext;
|
||||
IOTHUB_FEEDBACK_MESSAGE_RECEIVED_CALLBACK feedbackMessageCallback;
|
||||
void* feedbackUserContext;
|
||||
} CALLBACK_DATA;
|
||||
|
||||
typedef struct SEND_CALLBACK_DATA_TAG
|
||||
{
|
||||
size_t id;
|
||||
IOTHUB_MESSAGING_HANDLE messagingHandle;
|
||||
IOTHUB_SEND_COMPLETE_CALLBACK callback;
|
||||
void* userContext;
|
||||
} SEND_CALLBACK_DATA;
|
||||
|
||||
typedef struct IOTHUB_MESSAGING_TAG
|
||||
{
|
||||
int isOpened;
|
||||
|
@ -66,8 +74,12 @@ typedef struct IOTHUB_MESSAGING_TAG
|
|||
MESSAGE_SENDER_STATE message_sender_state;
|
||||
MESSAGE_RECEIVER_STATE message_receiver_state;
|
||||
|
||||
CALLBACK_DATA* callback_data;
|
||||
CALLBACK_DATA callback_data;
|
||||
|
||||
SINGLYLINKEDLIST_HANDLE send_callback_data;
|
||||
size_t max_send_queue_size;
|
||||
size_t send_queue_size;
|
||||
size_t next_send_data_id;
|
||||
} IOTHUB_MESSAGING;
|
||||
|
||||
|
||||
|
@ -79,6 +91,112 @@ static const char* const FEEDBACK_RECORD_KEY_ORIGINAL_MESSAGE_ID = "originalMess
|
|||
static const char* const AMQP_ADDRESS_PATH_FMT = "/devices/%s/messages/deviceBound";
|
||||
static const char* const AMQP_ADDRESS_PATH_MODULE_FMT = "/devices/%s/modules/%s/messages/deviceBound";
|
||||
|
||||
static inline bool is_send_queue_full(IOTHUB_MESSAGING_HANDLE messagingHandle)
|
||||
{
|
||||
return (messagingHandle->send_queue_size >= messagingHandle->max_send_queue_size);
|
||||
}
|
||||
|
||||
static SEND_CALLBACK_DATA* enqueue_send_callback_data(IOTHUB_MESSAGING_HANDLE messagingHandle, IOTHUB_SEND_COMPLETE_CALLBACK sendCompleteCallback, void* userContext)
|
||||
{
|
||||
SEND_CALLBACK_DATA* result = NULL;
|
||||
|
||||
if (is_send_queue_full(messagingHandle))
|
||||
{
|
||||
LogError("Send queue is full");
|
||||
}
|
||||
else
|
||||
{
|
||||
result = malloc(sizeof(SEND_CALLBACK_DATA));
|
||||
|
||||
if (result == NULL)
|
||||
{
|
||||
LogError("Failed allocating SEND_CALLBACK_DATA");
|
||||
}
|
||||
else
|
||||
{
|
||||
// There is a possible type overflow here in next_send_data_id, which could be
|
||||
// bad if another SEND_CALLBACK_DATA is already in the queue with the same id.
|
||||
// However that is extremely unlikely, since there would need to be SIZE_MAX + 1
|
||||
// items in the send_callback_data list.
|
||||
result->id = messagingHandle->next_send_data_id++;
|
||||
result->messagingHandle = messagingHandle;
|
||||
result->callback = sendCompleteCallback;
|
||||
result->userContext = userContext;
|
||||
|
||||
if (singlylinkedlist_add(messagingHandle->send_callback_data, result) == NULL)
|
||||
{
|
||||
LogError("Failed adding context to send_callback_data");
|
||||
free(result);
|
||||
result = NULL;
|
||||
}
|
||||
else
|
||||
{
|
||||
messagingHandle->send_queue_size++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
static bool remove_single_send_data_condition_function(const void* item, const void* match_context, bool* continue_processing)
|
||||
{
|
||||
SEND_CALLBACK_DATA* current_send_data = (SEND_CALLBACK_DATA*)item;
|
||||
SEND_CALLBACK_DATA* reference_send_data = (SEND_CALLBACK_DATA*)match_context;
|
||||
|
||||
if (current_send_data->id == reference_send_data->id)
|
||||
{
|
||||
*continue_processing = false;
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
*continue_processing = true;
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
static void dequeue_send_callback_data(SEND_CALLBACK_DATA* send_data)
|
||||
{
|
||||
IOTHUB_MESSAGING_HANDLE messagingHandle = send_data->messagingHandle;
|
||||
|
||||
if (singlylinkedlist_remove_if(messagingHandle->send_callback_data, remove_single_send_data_condition_function, send_data) != 0)
|
||||
{
|
||||
LogError("Failed dequeueing send_data, not found in list.");
|
||||
}
|
||||
else
|
||||
{
|
||||
free(send_data);
|
||||
messagingHandle->send_queue_size--;
|
||||
}
|
||||
}
|
||||
|
||||
static bool remove_all_send_data_condition_function(const void* item, const void* match_context, bool* continue_processing)
|
||||
{
|
||||
SEND_CALLBACK_DATA* send_data = (SEND_CALLBACK_DATA*)item;
|
||||
IOTHUB_MESSAGING_RESULT messagingResult = *(IOTHUB_MESSAGING_RESULT*)match_context;
|
||||
|
||||
if (send_data->callback != NULL)
|
||||
{
|
||||
send_data->callback(send_data->userContext, messagingResult);
|
||||
}
|
||||
|
||||
free(send_data);
|
||||
|
||||
*continue_processing = true;
|
||||
return true;
|
||||
}
|
||||
|
||||
static void dequeue_all_send_callback_data(IOTHUB_MESSAGING_HANDLE messagingHandle, IOTHUB_MESSAGING_RESULT messagingResult)
|
||||
{
|
||||
if (singlylinkedlist_remove_if(messagingHandle->send_callback_data, remove_all_send_data_condition_function, &messagingResult) != 0)
|
||||
{
|
||||
LogError("Failed dequeueing all send_data.");
|
||||
}
|
||||
|
||||
messagingHandle->send_queue_size = 0;
|
||||
}
|
||||
|
||||
static int setMessageId(IOTHUB_MESSAGE_HANDLE iothub_message_handle, PROPERTIES_HANDLE uamqp_message_properties)
|
||||
{
|
||||
int result;
|
||||
|
@ -456,7 +574,7 @@ static char* createReceiveTargetAddress(IOTHUB_MESSAGING_HANDLE messagingHandle)
|
|||
char* buffer = NULL;
|
||||
if (messagingHandle->hostname == NULL)
|
||||
{
|
||||
LogError("createSendTargetAddress failed - hostname cannot be NULL");
|
||||
LogError("createReceiveTargetAddress failed - hostname cannot be NULL");
|
||||
result = NULL;
|
||||
}
|
||||
else
|
||||
|
@ -570,9 +688,9 @@ static void IoTHubMessaging_LL_SenderStateChanged(void* context, MESSAGE_SENDER_
|
|||
if ((messagingData->message_sender_state == MESSAGE_SENDER_STATE_OPEN) && (messagingData->message_receiver_state == MESSAGE_RECEIVER_STATE_OPEN))
|
||||
{
|
||||
messagingData->isOpened = true;
|
||||
if (messagingData->callback_data->openCompleteCompleteCallback != NULL)
|
||||
if (messagingData->callback_data.openCompleteCompleteCallback != NULL)
|
||||
{
|
||||
(messagingData->callback_data->openCompleteCompleteCallback)(messagingData->callback_data->openUserContext);
|
||||
(messagingData->callback_data.openCompleteCompleteCallback)(messagingData->callback_data.openUserContext);
|
||||
}
|
||||
}
|
||||
else
|
||||
|
@ -593,9 +711,9 @@ static void IoTHubMessaging_LL_ReceiverStateChanged(const void* context, MESSAGE
|
|||
if ((messagingData->message_sender_state == MESSAGE_SENDER_STATE_OPEN) && (messagingData->message_receiver_state == MESSAGE_RECEIVER_STATE_OPEN))
|
||||
{
|
||||
messagingData->isOpened = true;
|
||||
if (messagingData->callback_data->openCompleteCompleteCallback != NULL)
|
||||
if (messagingData->callback_data.openCompleteCompleteCallback != NULL)
|
||||
{
|
||||
(messagingData->callback_data->openCompleteCompleteCallback)(messagingData->callback_data->openUserContext);
|
||||
(messagingData->callback_data.openCompleteCompleteCallback)(messagingData->callback_data.openUserContext);
|
||||
}
|
||||
}
|
||||
else
|
||||
|
@ -610,10 +728,11 @@ static void IoTHubMessaging_LL_SendMessageComplete(void* context, MESSAGE_SEND_R
|
|||
(void)delivery_state;
|
||||
if (context != NULL)
|
||||
{
|
||||
IOTHUB_MESSAGING* messagingData = (IOTHUB_MESSAGING*)context;
|
||||
if (messagingData->callback_data->sendCompleteCallback != NULL)
|
||||
SEND_CALLBACK_DATA* send_data = (SEND_CALLBACK_DATA*)context;
|
||||
|
||||
if (send_data->callback != NULL)
|
||||
{
|
||||
// Convert a send result to an
|
||||
// Convert a MESSAGE_SEND_RESULT to an IOTHUB_MESSAGING_RESULT.
|
||||
IOTHUB_MESSAGING_RESULT msg_result;
|
||||
switch (send_result)
|
||||
{
|
||||
|
@ -627,8 +746,11 @@ static void IoTHubMessaging_LL_SendMessageComplete(void* context, MESSAGE_SEND_R
|
|||
msg_result = IOTHUB_MESSAGING_ERROR;
|
||||
break;
|
||||
}
|
||||
(messagingData->callback_data->sendCompleteCallback)(messagingData->callback_data->sendUserContext, msg_result);
|
||||
|
||||
send_data->callback(send_data->userContext, msg_result);
|
||||
}
|
||||
|
||||
dequeue_send_callback_data(send_data);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -645,6 +767,9 @@ static AMQP_VALUE IoTHubMessaging_LL_FeedbackMessageReceived(const void* context
|
|||
IOTHUB_MESSAGING* messagingData = (IOTHUB_MESSAGING*)context;
|
||||
|
||||
BINARY_DATA binary_data;
|
||||
binary_data.bytes = NULL;
|
||||
binary_data.length = 0;
|
||||
|
||||
JSON_Value* root_value = NULL;
|
||||
JSON_Object* feedback_object = NULL;
|
||||
JSON_Array* feedback_array = NULL;
|
||||
|
@ -675,7 +800,7 @@ static AMQP_VALUE IoTHubMessaging_LL_FeedbackMessageReceived(const void* context
|
|||
|
||||
if ((feedbackBatch = (IOTHUB_SERVICE_FEEDBACK_BATCH*)malloc(sizeof(IOTHUB_SERVICE_FEEDBACK_BATCH))) == NULL)
|
||||
{
|
||||
LogError("json_parse_string failed");
|
||||
LogError("Failed allocating IOTHUB_SERVICE_FEEDBACK_BATCH");
|
||||
result = messaging_delivery_rejected("Rejected due to failure reading AMQP message", "Failed to allocate memory for feedback batch");
|
||||
}
|
||||
else
|
||||
|
@ -771,9 +896,9 @@ static AMQP_VALUE IoTHubMessaging_LL_FeedbackMessageReceived(const void* context
|
|||
}
|
||||
else
|
||||
{
|
||||
if (messagingData->callback_data->feedbackMessageCallback != NULL)
|
||||
if (messagingData->callback_data.feedbackMessageCallback != NULL)
|
||||
{
|
||||
(messagingData->callback_data->feedbackMessageCallback)(messagingData->callback_data->feedbackUserContext, feedbackBatch);
|
||||
(messagingData->callback_data.feedbackMessageCallback)(messagingData->callback_data.feedbackUserContext, feedbackBatch);
|
||||
}
|
||||
result = messaging_delivery_accepted();
|
||||
}
|
||||
|
@ -799,7 +924,6 @@ static AMQP_VALUE IoTHubMessaging_LL_FeedbackMessageReceived(const void* context
|
|||
IOTHUB_MESSAGING_HANDLE IoTHubMessaging_LL_Create(IOTHUB_SERVICE_CLIENT_AUTH_HANDLE serviceClientHandle)
|
||||
{
|
||||
IOTHUB_MESSAGING_HANDLE result;
|
||||
CALLBACK_DATA* callback_data;
|
||||
|
||||
if (serviceClientHandle == NULL)
|
||||
{
|
||||
|
@ -837,7 +961,7 @@ IOTHUB_MESSAGING_HANDLE IoTHubMessaging_LL_Create(IOTHUB_SERVICE_CLIENT_AUTH_HAN
|
|||
}
|
||||
else if ((result = (IOTHUB_MESSAGING*)malloc(sizeof(IOTHUB_MESSAGING))) == NULL)
|
||||
{
|
||||
LogError("Malloc failed for IOTHUB_REGISTRYMANAGER");
|
||||
LogError("Malloc failed for IOTHUB_MESSAGING");
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -883,9 +1007,9 @@ IOTHUB_MESSAGING_HANDLE IoTHubMessaging_LL_Create(IOTHUB_SERVICE_CLIENT_AUTH_HAN
|
|||
free(result);
|
||||
result = NULL;
|
||||
}
|
||||
else if ((callback_data = (CALLBACK_DATA*)malloc(sizeof(CALLBACK_DATA))) == NULL)
|
||||
else if ((result->send_callback_data = singlylinkedlist_create()) == NULL)
|
||||
{
|
||||
LogError("Malloc failed for callback_data");
|
||||
LogError("Failed creating send_callback_data");
|
||||
free(result->hostname);
|
||||
free(result->iothubName);
|
||||
free(result->iothubSuffix);
|
||||
|
@ -896,14 +1020,7 @@ IOTHUB_MESSAGING_HANDLE IoTHubMessaging_LL_Create(IOTHUB_SERVICE_CLIENT_AUTH_HAN
|
|||
}
|
||||
else
|
||||
{
|
||||
callback_data->openCompleteCompleteCallback = NULL;
|
||||
callback_data->sendCompleteCallback = NULL;
|
||||
callback_data->feedbackMessageCallback = NULL;
|
||||
callback_data->openUserContext = NULL;
|
||||
callback_data->sendUserContext = NULL;
|
||||
callback_data->feedbackUserContext = NULL;
|
||||
|
||||
result->callback_data = callback_data;
|
||||
result->max_send_queue_size = DEFAULT_MAX_SEND_QUEUE_SIZE;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -916,7 +1033,9 @@ void IoTHubMessaging_LL_Destroy(IOTHUB_MESSAGING_HANDLE messagingHandle)
|
|||
{
|
||||
IOTHUB_MESSAGING* messHandle = (IOTHUB_MESSAGING*)messagingHandle;
|
||||
|
||||
free(messHandle->callback_data);
|
||||
dequeue_all_send_callback_data(messagingHandle, IOTHUB_MESSAGING_BECAUSE_DESTROY);
|
||||
|
||||
singlylinkedlist_destroy(messagingHandle->send_callback_data);
|
||||
free(messHandle->hostname);
|
||||
free(messHandle->iothubName);
|
||||
free(messHandle->iothubSuffix);
|
||||
|
@ -1078,8 +1197,8 @@ IOTHUB_MESSAGING_RESULT IoTHubMessaging_LL_Open(IOTHUB_MESSAGING_HANDLE messagin
|
|||
}
|
||||
else
|
||||
{
|
||||
messagingHandle->callback_data->openCompleteCompleteCallback = openCompleteCallback;
|
||||
messagingHandle->callback_data->openUserContext = userContextCallback;
|
||||
messagingHandle->callback_data.openCompleteCompleteCallback = openCompleteCallback;
|
||||
messagingHandle->callback_data.openUserContext = userContextCallback;
|
||||
|
||||
sasl_io_config.sasl_mechanism = messagingHandle->sasl_mechanism_handle;
|
||||
sasl_io_config.underlying_io = messagingHandle->tls_io;
|
||||
|
@ -1306,14 +1425,13 @@ IOTHUB_MESSAGING_RESULT IoTHubMessaging_LL_SetFeedbackMessageCallback(IOTHUB_MES
|
|||
}
|
||||
else
|
||||
{
|
||||
messagingHandle->callback_data->feedbackMessageCallback = feedbackMessageReceivedCallback;
|
||||
messagingHandle->callback_data->feedbackUserContext = userContextCallback;
|
||||
messagingHandle->callback_data.feedbackMessageCallback = feedbackMessageReceivedCallback;
|
||||
messagingHandle->callback_data.feedbackUserContext = userContextCallback;
|
||||
result = IOTHUB_MESSAGING_OK;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
IOTHUB_MESSAGING_RESULT IoTHubMessaging_LL_Send(IOTHUB_MESSAGING_HANDLE messagingHandle, const char* deviceId, IOTHUB_MESSAGE_HANDLE message, IOTHUB_SEND_COMPLETE_CALLBACK sendCompleteCallback, void* userContextCallback)
|
||||
{
|
||||
IOTHUB_MESSAGING_RESULT result;
|
||||
|
@ -1343,6 +1461,11 @@ IOTHUB_MESSAGING_RESULT IoTHubMessaging_LL_Send(IOTHUB_MESSAGING_HANDLE messagin
|
|||
LogError("Messaging is not opened - call IoTHubMessaging_LL_Open to open");
|
||||
result = IOTHUB_MESSAGING_ERROR;
|
||||
}
|
||||
else if (is_send_queue_full(messagingHandle))
|
||||
{
|
||||
LogError("Send queue is full");
|
||||
result = IOTHUB_MESSAGING_QUEUE_FULL;
|
||||
}
|
||||
else if ((deviceDestinationString = createDeviceDestinationString(deviceId, moduleId)) == NULL)
|
||||
{
|
||||
LogError("Could not create a message.");
|
||||
|
@ -1400,12 +1523,17 @@ IOTHUB_MESSAGING_RESULT IoTHubMessaging_LL_Send(IOTHUB_MESSAGING_HANDLE messagin
|
|||
}
|
||||
else
|
||||
{
|
||||
messagingHandle->callback_data->sendCompleteCallback = sendCompleteCallback;
|
||||
messagingHandle->callback_data->sendUserContext = userContextCallback;
|
||||
SEND_CALLBACK_DATA* send_data = enqueue_send_callback_data(messagingHandle, sendCompleteCallback, userContextCallback);
|
||||
|
||||
if (messagesender_send_async(messagingHandle->message_sender, amqpMessage, IoTHubMessaging_LL_SendMessageComplete, messagingHandle, 0) == NULL)
|
||||
if (send_data == NULL)
|
||||
{
|
||||
LogError("Could not set outgoing window.");
|
||||
LogError("Failed enqueueing message.");
|
||||
result = IOTHUB_MESSAGING_ERROR;
|
||||
}
|
||||
else if (messagesender_send_async(messagingHandle->message_sender, amqpMessage, IoTHubMessaging_LL_SendMessageComplete, send_data, 0) == NULL)
|
||||
{
|
||||
LogError("messagesender_send_async failed.");
|
||||
dequeue_send_callback_data(send_data);
|
||||
result = IOTHUB_MESSAGING_ERROR;
|
||||
}
|
||||
else
|
||||
|
@ -1435,6 +1563,7 @@ void IoTHubMessaging_LL_DoWork(IOTHUB_MESSAGING_HANDLE messagingHandle)
|
|||
IOTHUB_MESSAGING_RESULT IoTHubMessaging_LL_SetTrustedCert(IOTHUB_MESSAGING_HANDLE messagingHandle, const char* trusted_cert)
|
||||
{
|
||||
IOTHUB_MESSAGING_RESULT result;
|
||||
|
||||
if (messagingHandle == NULL || trusted_cert == NULL)
|
||||
{
|
||||
LogError("Invalid argument messagingHandle: %p trusted_cert: %p", messagingHandle, trusted_cert);
|
||||
|
@ -1442,7 +1571,7 @@ IOTHUB_MESSAGING_RESULT IoTHubMessaging_LL_SetTrustedCert(IOTHUB_MESSAGING_HANDL
|
|||
}
|
||||
else
|
||||
{
|
||||
char* temp_cert;
|
||||
char* temp_cert = NULL;
|
||||
if (mallocAndStrcpy_s(&temp_cert, trusted_cert) != 0)
|
||||
{
|
||||
result = IOTHUB_MESSAGING_ERROR;
|
||||
|
@ -1460,3 +1589,19 @@ IOTHUB_MESSAGING_RESULT IoTHubMessaging_LL_SetTrustedCert(IOTHUB_MESSAGING_HANDL
|
|||
return result;
|
||||
}
|
||||
|
||||
IOTHUB_MESSAGING_RESULT IoTHubMessaging_LL_SetMaxSendQueueSize(IOTHUB_MESSAGING_HANDLE messagingHandle, size_t maxQueueSize)
|
||||
{
|
||||
IOTHUB_MESSAGING_RESULT result;
|
||||
if (messagingHandle == NULL || maxQueueSize == 0)
|
||||
{
|
||||
LogError("Invalid argument messagingHandle: %p maxQueueSize: %zu", messagingHandle, maxQueueSize);
|
||||
result = IOTHUB_MESSAGING_INVALID_ARG;
|
||||
}
|
||||
else
|
||||
{
|
||||
messagingHandle->max_send_queue_size = maxQueueSize;
|
||||
result = IOTHUB_MESSAGING_OK;
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
|
Разница между файлами не показана из-за своего большого размера
Загрузить разницу
|
@ -849,4 +849,26 @@ TEST_FUNCTION(IoTHubMessaging_SetTrustedCert_Lock_fails)
|
|||
IoTHubMessaging_Destroy(messagingClientHandle);
|
||||
}
|
||||
|
||||
TEST_FUNCTION(IoTHubMessaging_SetMaxSendQueueSize_success)
|
||||
{
|
||||
// arrange
|
||||
IOTHUB_MESSAGING_CLIENT_HANDLE messagingClientHandle = IoTHubMessaging_Create(TEST_IOTHUB_SERVICE_CLIENT_AUTH_HANDLE);
|
||||
umock_c_reset_all_calls();
|
||||
|
||||
STRICT_EXPECTED_CALL(Lock(IGNORED_PTR_ARG));
|
||||
STRICT_EXPECTED_CALL(IoTHubMessaging_LL_SetMaxSendQueueSize(IGNORED_PTR_ARG, IGNORED_NUM_ARG));
|
||||
STRICT_EXPECTED_CALL(Unlock(IGNORED_PTR_ARG));
|
||||
|
||||
// act
|
||||
IOTHUB_MESSAGING_RESULT result;
|
||||
result = IoTHubMessaging_SetMaxSendQueueSize(messagingClientHandle, 100);
|
||||
|
||||
// assert
|
||||
ASSERT_ARE_EQUAL(char_ptr, umock_c_get_expected_calls(), umock_c_get_actual_calls());
|
||||
ASSERT_ARE_EQUAL(int, IOTHUB_MESSAGING_OK, result);
|
||||
|
||||
// cleanup
|
||||
IoTHubMessaging_Destroy(messagingClientHandle);
|
||||
}
|
||||
|
||||
END_TEST_SUITE(iothub_messaging_ut)
|
||||
|
|
Загрузка…
Ссылка в новой задаче