azure-iot-edge-c-module/main.c

186 строки
6.2 KiB
C

// Copyright (c) Microsoft. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
#include <stdio.h>
#include <stdlib.h>
#include "iothub_module_client_ll.h"
#include "iothub_client_options.h"
#include "iothub_message.h"
#include "azure_c_shared_utility/threadapi.h"
#include "azure_c_shared_utility/crt_abstractions.h"
#include "azure_c_shared_utility/platform.h"
#include "azure_c_shared_utility/shared_util_options.h"
#include "iothubtransportmqtt.h"
#include "iothub.h"
#include "time.h"
typedef struct MESSAGE_INSTANCE_TAG
{
IOTHUB_MESSAGE_HANDLE messageHandle;
size_t messageTrackingId; // For tracking the messages within the user callback.
}
MESSAGE_INSTANCE;
size_t messagesReceivedByInput1Queue = 0;
// SendConfirmationCallback is invoked when the message that was forwarded on from 'InputQueue1Callback'
// pipeline function is confirmed.
static void SendConfirmationCallback(IOTHUB_CLIENT_CONFIRMATION_RESULT result, void* userContextCallback)
{
// The context corresponds to which message# we were at when we sent.
MESSAGE_INSTANCE* messageInstance = (MESSAGE_INSTANCE*)userContextCallback;
printf("Confirmation[%zu] received for message with result = %d\r\n", messageInstance->messageTrackingId, result);
IoTHubMessage_Destroy(messageInstance->messageHandle);
free(messageInstance);
}
// Allocates a context for callback and clones the message
// NOTE: The message MUST be cloned at this stage. InputQueue1Callback's caller always frees the message
// so we need to pass down a new copy.
static MESSAGE_INSTANCE* CreateMessageInstance(IOTHUB_MESSAGE_HANDLE message)
{
MESSAGE_INSTANCE* messageInstance = (MESSAGE_INSTANCE*)malloc(sizeof(MESSAGE_INSTANCE));
if (NULL == messageInstance)
{
printf("Failed allocating 'MESSAGE_INSTANCE' for pipelined message\r\n");
}
else
{
memset(messageInstance, 0, sizeof(*messageInstance));
if ((messageInstance->messageHandle = IoTHubMessage_Clone(message)) == NULL)
{
free(messageInstance);
messageInstance = NULL;
}
else
{
messageInstance->messageTrackingId = messagesReceivedByInput1Queue;
}
}
return messageInstance;
}
static IOTHUBMESSAGE_DISPOSITION_RESULT InputQueue1Callback(IOTHUB_MESSAGE_HANDLE message, void* userContextCallback)
{
IOTHUBMESSAGE_DISPOSITION_RESULT result;
IOTHUB_CLIENT_RESULT clientResult;
IOTHUB_MODULE_CLIENT_LL_HANDLE iotHubModuleClientHandle = (IOTHUB_MODULE_CLIENT_LL_HANDLE)userContextCallback;
unsigned const char* messageBody;
size_t contentSize;
if (IoTHubMessage_GetByteArray(message, &messageBody, &contentSize) != IOTHUB_MESSAGE_OK)
{
messageBody = "<null>";
}
printf("Received Message [%zu]\r\n Data: [%s]\r\n",
messagesReceivedByInput1Queue, messageBody);
// This message should be sent to next stop in the pipeline, namely "output1". What happens at "outpu1" is determined
// by the configuration of the Edge routing table setup.
MESSAGE_INSTANCE *messageInstance = CreateMessageInstance(message);
if (NULL == messageInstance)
{
result = IOTHUBMESSAGE_ABANDONED;
}
else
{
printf("Sending message (%zu) to the next stage in pipeline\n", messagesReceivedByInput1Queue);
clientResult = IoTHubModuleClient_LL_SendEventToOutputAsync(iotHubModuleClientHandle, messageInstance->messageHandle, "output1", SendConfirmationCallback, (void *)messageInstance);
if (clientResult != IOTHUB_CLIENT_OK)
{
IoTHubMessage_Destroy(messageInstance->messageHandle);
free(messageInstance);
printf("IoTHubModuleClient_LL_SendEventToOutputAsync failed on sending msg#=%zu, err=%d\n", messagesReceivedByInput1Queue, clientResult);
result = IOTHUBMESSAGE_ABANDONED;
}
else
{
result = IOTHUBMESSAGE_ACCEPTED;
}
}
messagesReceivedByInput1Queue++;
return result;
}
static IOTHUB_MODULE_CLIENT_LL_HANDLE InitializeConnection()
{
IOTHUB_MODULE_CLIENT_LL_HANDLE iotHubModuleClientHandle;
if (IoTHub_Init() != 0)
{
printf("Failed to initialize the platform.\r\n");
iotHubModuleClientHandle = NULL;
}
else if ((iotHubModuleClientHandle = IoTHubModuleClient_LL_CreateFromEnvironment(MQTT_Protocol)) == NULL)
{
printf("ERROR: IoTHubModuleClient_LL_CreateFromEnvironment failed\r\n");
}
else
{
// Uncomment the following lines to enable verbose logging.
// bool traceOn = true;
// IoTHubModuleClient_LL_SetOption(iotHubModuleClientHandle, OPTION_LOG_TRACE, &trace);
}
return iotHubModuleClientHandle;
}
static void DeInitializeConnection(IOTHUB_MODULE_CLIENT_LL_HANDLE iotHubModuleClientHandle)
{
if (iotHubModuleClientHandle != NULL)
{
IoTHubModuleClient_LL_Destroy(iotHubModuleClientHandle);
}
IoTHub_Deinit();
}
static int SetupCallbacksForModule(IOTHUB_MODULE_CLIENT_LL_HANDLE iotHubModuleClientHandle)
{
int ret;
if (IoTHubModuleClient_LL_SetInputMessageCallback(iotHubModuleClientHandle, "input1", InputQueue1Callback, (void*)iotHubModuleClientHandle) != IOTHUB_CLIENT_OK)
{
printf("ERROR: IoTHubModuleClient_LL_SetInputMessageCallback(\"input1\")..........FAILED!\r\n");
ret = 1;
}
else
{
ret = 0;
}
return ret;
}
void iothub_module()
{
IOTHUB_MODULE_CLIENT_LL_HANDLE iotHubModuleClientHandle;
srand((unsigned int)time(NULL));
if ((iotHubModuleClientHandle = InitializeConnection()) != NULL && SetupCallbacksForModule(iotHubModuleClientHandle) == 0)
{
// The receiver just loops constantly waiting for messages.
printf("Waiting for incoming messages.\r\n");
while (true)
{
IoTHubModuleClient_LL_DoWork(iotHubModuleClientHandle);
ThreadAPI_Sleep(100);
}
}
DeInitializeConnection(iotHubModuleClientHandle);
}
int main(void)
{
iothub_module();
return 0;
}