chore : Mean time recover for P0 (#686)
This commit is contained in:
Родитель
bbc62304d7
Коммит
23a3c175de
|
@ -0,0 +1,95 @@
|
|||
# Mean time to recover first phase
|
||||
|
||||
## Current situation
|
||||
|
||||
Currently the only way to run `aedes` without changing code in the SDK.
|
||||
This is done so that we can pass server authentication with a self signed server verification certificate.
|
||||
In order to do so, a certificate needs to be generated whose subject is equivalent to the hostname of the machine where the `aedes` server is being run.
|
||||
|
||||
### Same HOST machine
|
||||
|
||||
Run the `aedes` server (with or without container) in the same host.
|
||||
For connecting to this server the connection string for device client would have
|
||||
`HostName=localhost`.
|
||||
* Ensure that the `aedes` server is loaded with certificates whose common name is `localhost`.
|
||||
* Ensure that this certificate is passed as the `kwarg server_verification_cert` during device client creation.
|
||||
|
||||
### Different HOST Machine
|
||||
|
||||
Run the `aedes` server (with or without container) in a different host.
|
||||
For example lets say the hostname of the machine or the container is `alohomora.net`.
|
||||
For connecting to this server the connection string for device client would have
|
||||
`HostName=alohomora.net`.
|
||||
* Ensure that the `aedes` server is loaded with certificates whose common name is `alohomora.net`.
|
||||
* Ensure that this certificate is passed as the `kwarg server_verification_cert` during device client creation.
|
||||
|
||||
#### Script for Mean Time to Recover
|
||||
|
||||
The main script for tracking mean time to recover is `mean_time_recover_with_docker.py`.
|
||||
For this part the python package called `docker` needs to be installed.
|
||||
`pip install docker` will do this.
|
||||
|
||||
This package is needed so that the python script can access the Docker Engine API.
|
||||
It enables to do anything the docker command does, but from within python apps.
|
||||
|
||||
There are some variables that can be changed :-
|
||||
|
||||
`FACTOR_OF_KEEP_ALIVE` : the multiplication factor via which keep alive needs to be modified to calculate the amount of time the MQTT Broker will be down.
|
||||
|
||||
Other important variables already having values are :-
|
||||
|
||||
`KEEP_ALIVE` : option for changing the default keep alive for MQTT broker. Currently set at 15 secs.
|
||||
`KEEP_RUNNING` : the amount of time the server needs to keep running for.
|
||||
dead_duration : the amount of time the MQTT broker will be taken down for.
|
||||
`KEEP_DEAD` : The amount of time the MQTT broker will be non responsive.
|
||||
`MQTT_BROKER_RESTART_COUNT` : the count of times server will be stopped and started.
|
||||
|
||||
Before running the the `mean_time_recover_with_docker.py` script make sure your docker engine is running.
|
||||
|
||||
#### Certificate creation
|
||||
|
||||
There is another script which will create some self signed certificates for use in the `aedes` server.
|
||||
For this part we would need a package called `cryptography`.
|
||||
Prior to running this script please do `pip install cryptography` in the corresponding python environment .
|
||||
To generate a certificate with a certain common name (possibly `localhost`) the script needs to be run like
|
||||
`python create_self.py "localhost"`. There are no certificates included in the folder for aedes, so
|
||||
prior to running `aedes` server, certificates need to be generated.
|
||||
|
||||
### Debug or Test
|
||||
|
||||
If the `aedes` server or the docker container is run separately then for testing or debugging purposes
|
||||
a simpler script called `simple_send_message.py`.
|
||||
|
||||
#### MQTT Broker
|
||||
|
||||
For this part docker is needed to be installed in your machine.
|
||||
The mqtt broker being used is in the folder `aedes`.
|
||||
This has been configured to run locally in a docker container.
|
||||
There is no need to run the container as the python script will do so
|
||||
but it is needed to build the image so that the `mean_time_recover_with_docker.py` can create a container later.
|
||||
|
||||
The docker commands to build the appropriate image is :
|
||||
`docker build -t mqtt-broker . `
|
||||
|
||||
The docker command to remove the above image is
|
||||
`docker image rm mqtt-broker`
|
||||
|
||||
#### Running aedes in a container
|
||||
|
||||
For `aedes` server these are required
|
||||
|
||||
* Navigate to `aedes` folder and run the above image building commands
|
||||
which will build the image with the help of the `Dockerfile` in that folder.
|
||||
* Ensure that the `package.json` file similar to the one inside `aedes` folder exists.
|
||||
|
||||
#### Running docker container separately
|
||||
|
||||
If `aedes` is run not as the part of the mean time script then the
|
||||
following steps need to be performed prior to that
|
||||
|
||||
* node js must be installed locally
|
||||
* `npm install` must be done with the `package.json` in the current folder.
|
||||
* appropriate certificates must be generated
|
||||
|
||||
For running the containerized aedes separately please use this command.
|
||||
`docker run -d --publish 8883:8883 --name expecto-patronum mqtt-broker`
|
|
@ -0,0 +1 @@
|
|||
node_modules
|
|
@ -0,0 +1,13 @@
|
|||
FROM node:latest
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
COPY package*.json ./
|
||||
|
||||
RUN npm install
|
||||
|
||||
COPY . .
|
||||
|
||||
EXPOSE 8883
|
||||
|
||||
CMD [ "node", "./aedes_server.js" ]
|
|
@ -0,0 +1,48 @@
|
|||
const fs = require('fs')
|
||||
const aedes = require('aedes')()
|
||||
const port = 8883
|
||||
|
||||
const options = {
|
||||
key: fs.readFileSync('self_key_localhost.pem'),
|
||||
cert: fs.readFileSync('self_cert_localhost.pem')
|
||||
}
|
||||
|
||||
const server = require('tls').createServer(options, aedes.handle)
|
||||
|
||||
server.listen(port, function () {
|
||||
console.log('server started and listening on port', port)
|
||||
})
|
||||
|
||||
aedes.on('clientError', function (client, err) {
|
||||
console.log('client error', client.id, err.message, err.stack)
|
||||
})
|
||||
|
||||
aedes.on('connectionError', function (client, err) {
|
||||
console.log('client error', client, err.message, err.stack)
|
||||
})
|
||||
|
||||
aedes.on('publish', function (packet, client) {
|
||||
if (packet && packet.payload) {
|
||||
console.log('publish packet:', packet.payload.toString())
|
||||
}
|
||||
if (client) {
|
||||
console.log('message from client', client.id)
|
||||
}
|
||||
})
|
||||
|
||||
aedes.on('subscribe', function (subscriptions, client) {
|
||||
if (client) {
|
||||
console.log('subscribe from client', subscriptions, client.id)
|
||||
}
|
||||
})
|
||||
|
||||
aedes.on('client', function (client) {
|
||||
console.log('new client', client.id)
|
||||
})
|
||||
|
||||
//aedes.on('error', function (err) {
|
||||
// console.log('error is', err)
|
||||
//})
|
||||
|
||||
// check port in usage
|
||||
//lsof -nP -iTCP:8883 | grep LISTEN
|
|
@ -0,0 +1,14 @@
|
|||
{
|
||||
"name": "helloaedes",
|
||||
"version": "1.0.0",
|
||||
"description": "",
|
||||
"main": "aedes_server.js",
|
||||
"scripts": {
|
||||
"test": "echo \"Error: no test specified\" && exit 1"
|
||||
},
|
||||
"author": "",
|
||||
"license": "ISC",
|
||||
"dependencies": {
|
||||
"aedes": "^0.42.5"
|
||||
}
|
||||
}
|
|
@ -0,0 +1,109 @@
|
|||
# -------------------------------------------------------------------------
|
||||
# Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
# Licensed under the MIT License. See License.txt in the project root for
|
||||
# license information.
|
||||
# --------------------------------------------------------------------------
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import hashes, serialization
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||
from cryptography.x509.oid import NameOID
|
||||
from datetime import datetime, timedelta, time
|
||||
import uuid
|
||||
import argparse
|
||||
|
||||
|
||||
"""
|
||||
For using this script must have cryptography package installed in python environment.
|
||||
"""
|
||||
|
||||
PUBLIC_EXPONENT = 65537
|
||||
|
||||
|
||||
def create_self_signed_cert(common_name, days=30):
|
||||
password_file = "self_key.pem"
|
||||
private_key = create_private_key(key_file=password_file, key_size=4096)
|
||||
file_certificate = "self_cert.pem"
|
||||
|
||||
public_key = private_key.public_key()
|
||||
|
||||
subject = x509.Name(
|
||||
[x509.NameAttribute(NameOID.COMMON_NAME, str.encode(common_name).decode("utf-8"))]
|
||||
)
|
||||
|
||||
builder = create_cert_builder(
|
||||
subject=subject, issuer_name=subject, public_key=public_key, days=days, is_ca=False
|
||||
)
|
||||
|
||||
self_cert = builder.sign(
|
||||
private_key=private_key, algorithm=hashes.SHA256(), backend=default_backend()
|
||||
)
|
||||
with open(file_certificate, "wb") as f:
|
||||
f.write(self_cert.public_bytes(serialization.Encoding.PEM))
|
||||
|
||||
return self_cert
|
||||
|
||||
|
||||
def create_cert_builder(subject, issuer_name, public_key, days=30, is_ca=False):
|
||||
"""
|
||||
The method to create a builder for all types of certificates.
|
||||
:param subject: The subject of the certificate.
|
||||
:param issuer_name: The name of the issuer.
|
||||
:param public_key: The public key of the certificate.
|
||||
:param days: The number of days for which the certificate is valid. The default is 1 year or 30 days.
|
||||
:param is_ca: Boolean to indicate if a cert is ca or non ca.
|
||||
:return: The certificate builder.
|
||||
:rtype: :class `x509.CertificateBuilder`
|
||||
"""
|
||||
builder = x509.CertificateBuilder()
|
||||
|
||||
builder = builder.subject_name(subject)
|
||||
builder = builder.issuer_name(issuer_name)
|
||||
builder = builder.public_key(public_key)
|
||||
builder = builder.not_valid_before(datetime.today())
|
||||
|
||||
builder = builder.not_valid_after(datetime.today() + timedelta(days=days))
|
||||
builder = builder.serial_number(int(uuid.uuid4()))
|
||||
builder = builder.add_extension(
|
||||
x509.BasicConstraints(ca=is_ca, path_length=None), critical=True
|
||||
)
|
||||
return builder
|
||||
|
||||
|
||||
def create_private_key(key_file, password=None, key_size=4096):
|
||||
"""
|
||||
Crate encrypted key for certificates.
|
||||
:param key_file: The file to store the key.
|
||||
:param password: Password for the key.
|
||||
:param key_size: The key size to use for encryption. The default is 4096.
|
||||
:return: The private key.
|
||||
"""
|
||||
if password:
|
||||
encrypt_algo = serialization.BestAvailableEncryption(str.encode(password))
|
||||
else:
|
||||
encrypt_algo = serialization.NoEncryption()
|
||||
|
||||
private_key = rsa.generate_private_key(
|
||||
public_exponent=PUBLIC_EXPONENT, key_size=key_size, backend=default_backend()
|
||||
)
|
||||
# Write our key to file
|
||||
with open(key_file, "wb") as f:
|
||||
f.write(
|
||||
private_key.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.TraditionalOpenSSL,
|
||||
encryption_algorithm=encrypt_algo,
|
||||
)
|
||||
)
|
||||
|
||||
return private_key
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser(description="Generate a certificate chain.")
|
||||
parser.add_argument("domain", help="Domain name or common name.")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
common_name = args.domain
|
||||
create_self_signed_cert(common_name)
|
|
@ -0,0 +1,175 @@
|
|||
# -------------------------------------------------------------------------
|
||||
# Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
# Licensed under the MIT License. See License.txt in the project root for
|
||||
# license information.
|
||||
# --------------------------------------------------------------------------
|
||||
import docker
|
||||
import asyncio
|
||||
import uuid
|
||||
from azure.iot.device.aio import IoTHubDeviceClient
|
||||
from azure.iot.device import Message
|
||||
from time import perf_counter
|
||||
import threading
|
||||
from multiprocessing import Value, Process
|
||||
|
||||
|
||||
# Scenario based values
|
||||
KEEP_ALIVE = 18
|
||||
FACTOR_OF_KEEP_ALIVE = 0.5
|
||||
KEEP_RUNNING = 10
|
||||
KEEP_DEAD = int(KEEP_ALIVE * FACTOR_OF_KEEP_ALIVE)
|
||||
|
||||
MQTT_BROKER_RESTART_COUNT = 5
|
||||
CONTAINER_NAME = "leaky-cauldron"
|
||||
|
||||
elapsed_times = []
|
||||
container = None
|
||||
|
||||
|
||||
def control_container(
|
||||
container_name, keep_running, keep_dead, restart_count, signal_to_quit, should_be_restarted=True
|
||||
):
|
||||
global container
|
||||
|
||||
print("Container started.")
|
||||
client = docker.from_env()
|
||||
container = client.containers.run(
|
||||
"mqtt-broker", detach=True, name=container_name, ports={"8883/tcp": 8883}
|
||||
)
|
||||
if should_be_restarted:
|
||||
kill_and_restart_container(
|
||||
keep_running=keep_running,
|
||||
keep_dead=keep_dead,
|
||||
restart_count=restart_count,
|
||||
signal_to_quit=signal_to_quit,
|
||||
)
|
||||
else:
|
||||
# This may need to be varied so that the last message can be SENT without an async task cancellation error.
|
||||
kill_container(keep_running=5)
|
||||
signal_to_quit.value = 1
|
||||
|
||||
|
||||
def kill_and_restart_container(keep_running, keep_dead, restart_count, signal_to_quit):
|
||||
kill_container(keep_running)
|
||||
print("Container stopped.")
|
||||
start_timer(duration=keep_dead, restart_count=restart_count, signal_to_quit=signal_to_quit)
|
||||
|
||||
|
||||
def kill_container(keep_running):
|
||||
print("Container will run for {} secs.".format(keep_running))
|
||||
container.stop(timeout=keep_running)
|
||||
container.remove()
|
||||
|
||||
|
||||
def quitting_listener(quit_signal):
|
||||
while True:
|
||||
sig_val = quit_signal.value
|
||||
if sig_val == 1:
|
||||
print("Quitting...")
|
||||
break
|
||||
|
||||
|
||||
async def send_test_message(device_client, restart_count):
|
||||
i = 0
|
||||
while True:
|
||||
print("sending message #" + str(i))
|
||||
msg = Message("test wind speed " + str(i))
|
||||
msg.message_id = uuid.uuid4()
|
||||
t_start = perf_counter()
|
||||
await device_client.send_message(msg)
|
||||
t_stop = perf_counter()
|
||||
elapsed_time = t_stop - t_start
|
||||
elapsed_times.append(elapsed_time)
|
||||
print("done sending message #" + str(i))
|
||||
i = i + 1
|
||||
await asyncio.sleep(3)
|
||||
val = restart_count.value
|
||||
if val >= MQTT_BROKER_RESTART_COUNT:
|
||||
print(
|
||||
"Executed container restarts with telemetry {} times. Quitting telemetry task.".format(
|
||||
val
|
||||
)
|
||||
)
|
||||
break
|
||||
|
||||
|
||||
def start_timer(duration, restart_count, signal_to_quit):
|
||||
def timer_done():
|
||||
timer.cancel()
|
||||
print("{} secs is up. Cancelled timer. Container will be restarted again.".format(duration))
|
||||
restart_count.value = restart_count.value + 1
|
||||
# signal_to_quit.value = 0
|
||||
needs_restart = True
|
||||
if restart_count.value >= MQTT_BROKER_RESTART_COUNT:
|
||||
print(
|
||||
"Executed container restarts {} times. Container will not be restarted after the current loop. Quitting any future loop.".format(
|
||||
restart_count.value
|
||||
)
|
||||
)
|
||||
# signal_to_quit.value = 1
|
||||
needs_restart = False
|
||||
control_container(
|
||||
CONTAINER_NAME,
|
||||
keep_running=KEEP_RUNNING,
|
||||
keep_dead=duration,
|
||||
restart_count=restart_count,
|
||||
signal_to_quit=signal_to_quit,
|
||||
should_be_restarted=needs_restart,
|
||||
)
|
||||
|
||||
print("Container will be dead for {} secs.".format(duration))
|
||||
timer = threading.Timer(duration, timer_done)
|
||||
timer.start()
|
||||
|
||||
|
||||
async def main():
|
||||
|
||||
ca_cert = "self_cert_localhost.pem"
|
||||
certfile = open(ca_cert)
|
||||
root_ca_cert = certfile.read()
|
||||
|
||||
# Inter process values
|
||||
times_container_restart = Value("i", 0)
|
||||
signal_to_quit = Value("i", 0)
|
||||
|
||||
process_docker = Process(
|
||||
target=control_container,
|
||||
args=(CONTAINER_NAME, KEEP_RUNNING, KEEP_DEAD, times_container_restart, signal_to_quit),
|
||||
)
|
||||
process_docker.start()
|
||||
|
||||
# Do not delete sleep from here. Server needs some time to start.
|
||||
await asyncio.sleep(5)
|
||||
conn_str = "HostName=localhost;DeviceId=devicemtr;SharedAccessKey=Zm9vYmFy"
|
||||
device_client = IoTHubDeviceClient.create_from_connection_string(
|
||||
conn_str, keep_alive=KEEP_ALIVE, server_verification_cert=root_ca_cert
|
||||
)
|
||||
|
||||
await device_client.connect()
|
||||
|
||||
send_message_task = asyncio.create_task(
|
||||
send_test_message(device_client, times_container_restart)
|
||||
)
|
||||
|
||||
# Run the listener in the event loop
|
||||
# This can be a STDIN listener as well for user to indicate quitting.
|
||||
loop = asyncio.get_running_loop()
|
||||
finished_loops = loop.run_in_executor(None, quitting_listener, signal_to_quit)
|
||||
|
||||
# Wait for count times to reach a certain number indicative of completion
|
||||
await finished_loops
|
||||
|
||||
print("Count is " + str(times_container_restart.value))
|
||||
print(elapsed_times)
|
||||
|
||||
process_docker.terminate()
|
||||
try:
|
||||
send_message_task.cancel()
|
||||
except asyncio.CancelledError:
|
||||
print("send message task is cancelled now")
|
||||
|
||||
await device_client.disconnect()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
|
@ -0,0 +1,72 @@
|
|||
# -------------------------------------------------------------------------
|
||||
# Copyright (c) Microsoft Corporation. All rights reserved.
|
||||
# Licensed under the MIT License. See License.txt in the project root for
|
||||
# license information.
|
||||
# --------------------------------------------------------------------------
|
||||
import asyncio
|
||||
import uuid
|
||||
from azure.iot.device.aio import IoTHubDeviceClient
|
||||
from azure.iot.device import Message
|
||||
from time import perf_counter
|
||||
|
||||
elapsed_times = []
|
||||
|
||||
|
||||
# define behavior for halting the application
|
||||
def quitting_listener():
|
||||
while True:
|
||||
selection = input("Press Q to quit\n")
|
||||
if selection == "Q" or selection == "q":
|
||||
print("Quitting...")
|
||||
break
|
||||
|
||||
|
||||
async def send_test_message(device_client):
|
||||
i = 0
|
||||
while True:
|
||||
print("sending message #" + str(i))
|
||||
msg = Message("test wind speed " + str(i))
|
||||
msg.message_id = uuid.uuid4()
|
||||
t_start = perf_counter()
|
||||
await device_client.send_message(msg)
|
||||
t_stop = perf_counter()
|
||||
elapsed_time = t_stop - t_start
|
||||
elapsed_times.append(elapsed_time)
|
||||
print("done sending message #" + str(i))
|
||||
i = i + 1
|
||||
await asyncio.sleep(3)
|
||||
|
||||
|
||||
async def main():
|
||||
# Scenario based values
|
||||
keep_alive = 15
|
||||
|
||||
conn_str = "HostName=localhost;DeviceId=devicemtr;SharedAccessKey=Zm9vYmFy"
|
||||
device_client = IoTHubDeviceClient.create_from_connection_string(
|
||||
conn_str, keep_alive=keep_alive
|
||||
)
|
||||
|
||||
await device_client.connect()
|
||||
|
||||
send_message_task = asyncio.create_task(send_test_message(device_client))
|
||||
|
||||
# Run the listener in the event loop
|
||||
# This can be a STDIN listener as well for user to indicate quitting.
|
||||
loop = asyncio.get_running_loop()
|
||||
finished_loops = loop.run_in_executor(None, quitting_listener)
|
||||
|
||||
# Wait for count times to reach a certain number indicative of completion
|
||||
await finished_loops
|
||||
|
||||
print(elapsed_times)
|
||||
|
||||
try:
|
||||
send_message_task.cancel()
|
||||
except asyncio.CancelledError:
|
||||
print("send message task is cancelled now")
|
||||
|
||||
await device_client.disconnect()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
Загрузка…
Ссылка в новой задаче