From ffb7457bf6a7d30772735ae255bcddfedb81b5b6 Mon Sep 17 00:00:00 2001 From: Oliva Kar Date: Wed, 8 Mar 2023 21:30:48 -0800 Subject: [PATCH] working , check in windows --- ...on_symmetric_key_failover_device_delete.py | 374 ++++++++++++++++++ 1 file changed, 374 insertions(+) create mode 100644 samples/solutions/provision_symmetric_key_failover_device_delete.py diff --git a/samples/solutions/provision_symmetric_key_failover_device_delete.py b/samples/solutions/provision_symmetric_key_failover_device_delete.py new file mode 100644 index 000000000..ba12fbc65 --- /dev/null +++ b/samples/solutions/provision_symmetric_key_failover_device_delete.py @@ -0,0 +1,374 @@ +# ------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +# -------------------------------------------------------------------------- + +import asyncio +from azure.iot.device.iothub.aio import IoTHubDeviceClient +from azure.iot.device.aio import ProvisioningDeviceClient + +import logging.handlers +import glob +import os +import traceback + +# Current DPS time out configured in the python SDK is itself 30 secs , +# so call times need to be configured keeping that in mind. +# The interval at which to check for registrations after a registration has been successful +MONITOR_TIME_BETWEEN_SUCCESS_ASSIGNMENTS = 45 +# The interval at which to send telemetry +TELEMETRY_INTERVAL = 10 +# Interval in seconds between to check if device was provisioned to some hub +# This is used when registration was underway and other processes are checking if it is completed. +SLEEP_TIME_BETWEEN_CHECKING_REGISTRATION = 10 +# Initial interval in seconds between consecutive connection attempts in case of error +INITIAL_SLEEP_TIME_BETWEEN_CONNECTION_ATTEMPTS = 3 +# Threshold for retrying connection attempts after which the app will error +THRESHOLD_FOR_RETRY_CONNECTION = 90 +# Interval for rotating logs, in seconds +LOG_ROTATION_INTERVAL = 3600 +# How many logs to keep before recycling +LOG_BACKUP_COUNT = 6 +# Directory for storing log files +LOG_DIRECTORY = "./logs/event_loop_dpsfailover/device-delete-new-client-3" +messages_to_send = 10 + +# logger = logging.getLogger() +# logger.setLevel(level=logging.DEBUG) + +# Prepare the log directory +os.makedirs(LOG_DIRECTORY, exist_ok=True) +for filename in glob.glob("{}/*.log".format(LOG_DIRECTORY)): + os.remove(filename) + +log_formatter = logging.Formatter( + "%(asctime)s %(levelname)-5s (%(threadName)s) %(filename)s:%(funcName)s():%(message)s" +) + +info_log_handler = logging.handlers.TimedRotatingFileHandler( + filename="{}/info.log".format(LOG_DIRECTORY), + when="S", + interval=LOG_ROTATION_INTERVAL, + backupCount=LOG_BACKUP_COUNT, +) +info_log_handler.setLevel(level=logging.INFO) +info_log_handler.setFormatter(log_formatter) + +debug_log_handler = logging.handlers.TimedRotatingFileHandler( + filename="{}/debug.log".format(LOG_DIRECTORY), + when="S", + interval=LOG_ROTATION_INTERVAL, + backupCount=LOG_BACKUP_COUNT, +) +debug_log_handler.setLevel(level=logging.DEBUG) +debug_log_handler.setFormatter(log_formatter) + +root_logger = logging.getLogger() +root_logger.setLevel(level=logging.DEBUG) +root_logger.addHandler(info_log_handler) +root_logger.addHandler(debug_log_handler) + +sample_log_handler = logging.FileHandler(filename="{}/sample.log".format(LOG_DIRECTORY)) +sample_log_handler.setLevel(level=logging.DEBUG) +sample_log_handler.setFormatter(log_formatter) +logger = logging.getLogger(__name__) +logger.addHandler(sample_log_handler) + + +def get_type_name(e): + return type(e).__name__ + + +class Application(object): + async def initiate(self): + self.connected_event = asyncio.Event() + self.disconnected_event = asyncio.Event() + self.exit_app_event = asyncio.Event() + self.iothub_assignment_fail_event = asyncio.Event() + self.iothub_assignment_sucess_event = asyncio.Event() + # Baseline that device has not been assigned + self.iothub_assignment_fail_event.set() + # Baseline that device has not been connected + self.disconnected_event.set() + + self.iothub_client = None + self.first_connect = True + self.registration_attempt_on = True + # Power factor for increasing interval between consecutive connection attempts. + # This will increase with iteration + self.retry_increase_factor = 1 + # The nth number for attempting connection + self.sleep_time_between_conns = INITIAL_SLEEP_TIME_BETWEEN_CONNECTION_ATTEMPTS + self.try_number = 1 + + async def create_dps_client(self, symmetric_key): + self.log_info_and_print("Will create provisioning device client to provision device...") + provisioning_host = os.getenv("PROVISIONING_HOST") + id_scope = os.getenv("PROVISIONING_IDSCOPE") + registration_id = os.getenv("PROVISIONING_REGISTRATION_ID") + provisioning_client = ProvisioningDeviceClient.create_from_symmetric_key( + provisioning_host=provisioning_host, + registration_id=registration_id, + id_scope=id_scope, + symmetric_key=symmetric_key, + ) + return provisioning_client + + async def create_hub_client(self, registration_result, symmetric_key): + try: + # Create a Device Client + self.iothub_client = IoTHubDeviceClient.create_from_symmetric_key( + symmetric_key=symmetric_key, + hostname=registration_result.registration_state.assigned_hub, + device_id=registration_result.registration_state.device_id, + ) + # Attach the connection state handler + self.iothub_client.on_connection_state_change = self.handle_on_connection_state_change + except Exception as e: + self.log_error_and_print( + "Caught exception while trying to attach handler : {}".format(get_type_name(e)) + ) + raise Exception( + "Caught exception while trying to attach handler. Will exit application..." + ) + + async def handle_on_connection_state_change(self): + self.log_info_and_print( + "handle_on_connection_state_change fired. Connected status : {}".format( + self.iothub_client.connected + ) + ) + if self.iothub_client.connected: + self.log_info_and_print("Connected connected_event is set...") + self.disconnected_event.clear() + main_event_loop.call_soon_threadsafe(self.connected_event.set) + # Reset the power factor, sleep time and the try number to what it was originally + # on every successful connection. + self.retry_increase_factor = 1 + self.sleep_time_between_conns = INITIAL_SLEEP_TIME_BETWEEN_CONNECTION_ATTEMPTS + self.try_number = 1 + else: + self.log_info_and_print("Disconnected connected_event is set...") + main_event_loop.call_soon_threadsafe(self.disconnected_event.set) + self.connected_event.clear() + + async def register_loop(self): + while True: + provisioning_client = None + self.log_info_and_print("Entry register_loop") + if self.iothub_assignment_sucess_event.is_set(): + self.log_info_and_print("Device has been successfully provisioned already...") + await asyncio.sleep(MONITOR_TIME_BETWEEN_SUCCESS_ASSIGNMENTS) + elif self.iothub_assignment_fail_event.is_set(): + try: + symmetric_key = os.getenv("PROVISIONING_SYMMETRIC_KEY") + provisioning_client = await self.create_dps_client(symmetric_key) + self.log_info_and_print("Registering the device...") + registration_result = await provisioning_client.register() + print("The complete registration result is") + print(registration_result.registration_state) + if registration_result.status == "assigned": + self.log_info_and_print( + "Will create hub client to send telemetry from the provisioned device" + ) + await self.create_hub_client(registration_result, symmetric_key) + self.iothub_assignment_sucess_event.set() + self.iothub_assignment_fail_event.clear() + self.log_error_and_print( + "Registration was done and device was assigned correctly to an " + "IoTHub via the registration process.Will check if all is right " + "after some time..." + ) + await asyncio.sleep(MONITOR_TIME_BETWEEN_SUCCESS_ASSIGNMENTS) + else: + self.log_error_and_print( + "Registration was done but device was not assigned correctly to an " + "IoTHub via the registration process.Will try registration " + "again after some time..." + ) + await asyncio.sleep(SLEEP_TIME_BETWEEN_CHECKING_REGISTRATION) + self.iothub_assignment_fail_event.set() + self.iothub_assignment_sucess_event.clear() + except Exception as e: + self.log_error_and_print( + "Registration process failed because of error {}".format(get_type_name(e)) + ) + raise Exception( + "Caught an unrecoverable error that needs to be " + "fixed from user end while registering. Will exit application..." + ) + finally: + await provisioning_client.shutdown() + + if self.exit_app_event.is_set(): + return + + async def wait_for_connect_and_send_telemetry(self): + id = 1 + while True: + if not self.iothub_client: + # Time to check if device has been provisioned + self.log_info_and_print( + "IoTHub client is still nonexistent for telemetry. " + "Will check after {} secs...".format(SLEEP_TIME_BETWEEN_CHECKING_REGISTRATION) + ) + await asyncio.sleep(SLEEP_TIME_BETWEEN_CHECKING_REGISTRATION) + elif not self.iothub_client.connected: + self.log_info_and_print("IoTHub client is existent. But waiting for connection ...") + await self.connected_event.wait() + else: + self.log_info_and_print("sending message with id {}....".format(id)) + await self.iothub_client.send_message("message number {}".format(id)) + id += 1 + self.log_info_and_print("sent message.....") + self.log_info_and_print("sleeping for {} secs...".format(TELEMETRY_INTERVAL)) + await asyncio.sleep(TELEMETRY_INTERVAL) + if self.exit_app_event.is_set(): + return + + async def if_disconnected_then_connect_with_retry(self): + while True: + self.log_info_and_print("Entry for retry after disconnection") + done, pending = await asyncio.wait( + [ + self.disconnected_event.wait(), + self.exit_app_event.wait(), + ], + return_when=asyncio.FIRST_COMPLETED, + ) + self.log_info_and_print("Exit for retry after disconnection") + await asyncio.gather(*done) + [x.cancel() for x in pending] + if self.exit_app_event.is_set(): + self.log_info_and_print("Exiting while connected") + return + if not self.iothub_client or self.iothub_assignment_fail_event.is_set(): + self.log_info_and_print( + "IoTHub client is still nonexistent or re-provisioning is needed for establishing connection." + "Will check after {} secs...".format(SLEEP_TIME_BETWEEN_CHECKING_REGISTRATION) + ) + await asyncio.sleep(SLEEP_TIME_BETWEEN_CHECKING_REGISTRATION) + elif not self.iothub_assignment_fail_event.is_set(): + try: + self.log_info_and_print( + "Attempting to connect the device client try number {}....".format( + self.try_number + ) + ) + await self.iothub_client.connect() + if self.first_connect: + self.first_connect = False + self.log_info_and_print("Successfully connected the device client...") + except Exception as e: + self.log_error_and_print( + "Caught exception while trying to connect: {}".format(get_type_name(e)) + ) + self.log_error_and_print("Exception details...") + self.log_error_and_print( + "Detailed exception: {}".format(traceback.format_exception_only(type(e), e)) + ) + if self.first_connect: + self.log_info_and_print( + "Very first connection never occurred so will retry immediately..." + ) + self.first_connect = False + sleep_time = 0 + else: + self.log_info_and_print( + "Retry attempt interval is {} and increase power factor is {}".format( + self.sleep_time_between_conns, self.retry_increase_factor + ) + ) + sleep_time = pow(self.sleep_time_between_conns, self.retry_increase_factor) + + if sleep_time > THRESHOLD_FOR_RETRY_CONNECTION: + self.log_error_and_print( + "Failed to connect the device client couple of times." + "Retry time is greater than upper limit set. Will be reprovisioning the device again." + ) + self.try_number = 1 + self.retry_increase_factor = 1 + self.iothub_assignment_fail_event.set() + self.iothub_assignment_sucess_event.clear() + sleep_time = SLEEP_TIME_BETWEEN_CHECKING_REGISTRATION + self.log_error_and_print( + "Will not retry connection as trying re-provisioing again. " + "Will try connection again after some time {}...".format(sleep_time) + ) + await asyncio.sleep(sleep_time) + else: + self.log_error_and_print( + "Failed to connect the device client due to error :{}.Sleeping and retrying after {} seconds".format( + get_type_name(e), sleep_time + ) + ) + self.retry_increase_factor += 1 + self.try_number += 1 + await asyncio.sleep(sleep_time) + + def log_error_and_print(self, s): + logger.error(s) + + print(s) + + def log_info_and_print(self, s): + logger.info(s) + print(s) + + async def run_sample(self): + await self.initiate() + + self.log_error_and_print( + "asyncio debug is set to {}".format(os.getenv("PYTHONASYNCIODEBUG")) + ) + + tasks = [ + asyncio.create_task(self.register_loop()), + asyncio.create_task(self.wait_for_connect_and_send_telemetry()), + asyncio.create_task(self.if_disconnected_then_connect_with_retry()), + ] + + pending = [] + + try: + done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION) + await asyncio.gather(*done) + except KeyboardInterrupt: + self.log_error_and_print("IoTHubClient sample stopped by user") + except Exception as e: + self.log_error_and_print("Exception in run sample loop: {}".format(get_type_name(e))) + finally: + self.log_info_and_print("Exiting app") + self.exit_app_event.set() + self.log_info_and_print("Waiting for all coroutines to exit") + if pending: + await asyncio.wait_for( + asyncio.wait(pending, return_when=asyncio.ALL_COMPLETED), timeout=5 + ) + self.log_info_and_print( + "Shutting down both ProvisioningClient and IoTHubClient and exiting Application" + ) + await self.iothub_client.shutdown() + + def main(self): + global main_event_loop + print("IoT Hub Sample #1 - Constant Connection With Telemetry") + print("Press Ctrl-C to exit") + + main_event_loop = asyncio.get_event_loop() + + try: + main_event_loop.run_until_complete(Application().run_sample()) + except Exception as e: + self.log_error_and_print( + "Any other exception in the main calling: {}".format(get_type_name(e)) + ) + except KeyboardInterrupt: + print("IoTHubClient sample stopped by user") + finally: + main_event_loop.close() + + +if __name__ == "__main__": + Application().main()