MonitoFi/monitofi.py

163 строки
5.6 KiB
Python
Executable File

# // Copyright (c) Microsoft Corporation.
# // Licensed under the MIT license.
import logging
import time
import sys
import os
import urllib3
import requests
import json
import json_log_formatter
from requests_pkcs12 import get, post
from applicationinsights.logging import LoggingHandler
from applicationinsights.exceptions import enable
from influxdb import InfluxDBClient
from datetime import datetime
API_URL = os.getenv('API_URL', "http://localhost:8080/nifi-api/").split(',') # Comma separated list of nifi cluster api urls in case of multiple clusters.
ENDPOINT_LIST = os.getenv('ENDPOINT_LIST', "controller/cluster,flow/cluster/summary,flow/process-groups/root,flow/status,counters,system-diagnostics?nodewise=true").split(',')
MODE = os.getenv('MODE', "unlimited") # In limited mode, only NUMBEROFITERATIONS API calls are made before exiting.
NUMBER_OF_ITERATIONS = int(os.getenv('NUMBER_OF_ITERATIONS', 2))
IKEY = os.getenv('IKEY', "REPLACE_ME")
SLEEP_INTERVAL = int(os.getenv('SLEEP_INTERVAL', 300))
SECURE = os.getenv('SECURE', False) # Is NiFi Cluster Secure
CERT_FILE = os.getenv('CERT_FILE', '/opt/monitofi/cert.pkcs12')
CERT_PASS = os.getenv('CERT_PASS', 'REPLACE_ME')
INFLUXDB_SERVER = os.getenv('INFLUXDB_SERVER', "127.0.0.1") # IP or hostname to InfluxDB server
INFLUXDB_PORT = int(os.getenv('INFLUXDB_PORT', 8086)) # Port on InfluxDB server
INFLUXDB_USERNAME = os.getenv('INFLUXDB_USERNAME', "root")
INFLUXDB_PASSWORD = os.getenv('INFLUXDB_PASSWORD', "root")
INFLUXDB_DATABASE = os.getenv('INFLUXDB_DATABASE', "nifi")
count = 0
urllib3.disable_warnings()
conditions = {
"limited": lambda: count < NUMBER_OF_ITERATIONS,
"unlimited": lambda: True
}
# Sysout Logging Setup
logger = logging.getLogger("monitofi")
logger.setLevel(logging.INFO)
syshandler = logging.StreamHandler(sys.stdout)
syshandler.setLevel(logging.INFO)
formatter = json_log_formatter.JSONFormatter()
syshandler.setFormatter(formatter)
logger.addHandler(syshandler)
if IKEY != "REPLACE_ME":
# Logging unhandled exceptions with Appinsights
enable(IKEY)
# Applications Insights Logging Setup
handler = LoggingHandler(IKEY)
handler.setFormatter(formatter)
logger.addHandler(handler)
iclient = InfluxDBClient(INFLUXDB_SERVER, INFLUXDB_PORT, INFLUXDB_USERNAME, INFLUXDB_PASSWORD, INFLUXDB_DATABASE)
iclient.create_database(INFLUXDB_DATABASE)
def flattening(nested, prefix, ignore_list):
field = {}
flatten(True, nested, field, prefix, ignore_list)
return field
def flatten(top, nested, flatdict, prefix, ignore_list):
def assign(newKey, data, toignore):
if toignore:
if isinstance(data, (dict, list, tuple,)):
json_data = json.dumps(data)
flatdict[newKey] = json_data
else:
flatdict[newKey] = data
else:
if isinstance(data, (dict, list, tuple,)):
flatten(False, data, flatdict, newKey, ignore_list)
else:
flatdict[newKey] = data
if isinstance(nested, dict):
for key, value in nested.items():
ok = match_key(ignore_list, key)
if ok and prefix == "":
assign(key, value, True)
elif ok and prefix != "":
newKey = create_key(top, prefix, key)
assign(newKey, value, True)
else:
newKey = create_key(top, prefix, key)
assign(newKey, value, False)
elif isinstance(nested, (list, tuple,)):
for index, value in enumerate(nested):
if isinstance(value, dict):
for key1, value1 in value.items():
ok = match_key(ignore_list, key1)
if ok:
subkey = str(index) + "." + key1
newkey = create_key(top, prefix, subkey)
assign(newkey, value1, True)
else:
newkey = create_key(top, prefix, str(index))
assign(newkey, value, False)
else:
newkey = create_key(top, prefix, str(index))
assign(newkey, value, False)
else:
return ("Not a Valid input")
def create_key(top, prefix, subkey):
key = prefix
if top:
key += subkey
else:
key += "." + subkey
return key
def match_key(ignorelist, value):
for element in ignorelist:
if element == value:
return True
return False
while conditions[MODE]():
try:
for AURL in API_URL:
for ENDPOINT in ENDPOINT_LIST:
r = requests.get(url=AURL + ENDPOINT) if SECURE == False else get(url=AURL + ENDPOINT, headers={
'Content-Type': 'application/json'}, verify=False, pkcs12_filename=CERT_FILE, pkcs12_password=CERT_PASS)
received_response = r.json()
flat_response = flattening(received_response, "", [])
current_time = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
points = [{
"measurement": ENDPOINT,
"tags": {'APIURL':AURL},
"time": current_time,
"fields": flat_response
}]
logger.info(ENDPOINT, extra=received_response)
iclient.write_points(points)
if IKEY != "REPLACE_ME":
handler.flush()
count += 1
except Exception as e:
# this will send an exception to the Application Insights Logs
logging.exception("Code ran into an unforseen exception!", sys.exc_info()[0])
time.sleep(SLEEP_INTERVAL)
# logging shutdown will cause a flush of all un-sent telemetry items
logging.shutdown()