зеркало из https://github.com/microsoft/MonitoFi.git
Adding Code,Docs,Dashboards - First Version
This commit is contained in:
Родитель
6c6c26a9ed
Коммит
52c5e1b96e
|
@ -0,0 +1,15 @@
|
|||
FROM python:3.7-slim
|
||||
|
||||
RUN mkdir /opt/nifimonitor
|
||||
|
||||
ENV VIRTUAL_ENV=/opt/nifimonitor/nifimonitor-env
|
||||
RUN python -m venv $VIRTUAL_ENV
|
||||
ENV PATH="$VIRTUAL_ENV/bin:$PATH"
|
||||
|
||||
COPY ./requirements.txt /opt/nifimonitor/requirements.txt
|
||||
RUN pip install -r /opt/nifimonitor/requirements.txt
|
||||
|
||||
WORKDIR /opt/nifimonitor
|
||||
COPY ./nifi_monitor.py /opt/nifimonitor/nifi_monitor.py
|
||||
|
||||
CMD ["python", "/opt/nifimonitor/nifi_monitor.py"]
|
Двоичный файл не отображается.
После Ширина: | Высота: | Размер: 2.0 MiB |
Разница между файлами не показана из-за своего большого размера
Загрузить разницу
Разница между файлами не показана из-за своего большого размера
Загрузить разницу
|
@ -0,0 +1,17 @@
|
|||
API_URL='https://monitor-on-aks-nifi.monitor-on-aks.svc/nifi-api/'
|
||||
ENDPOINT_LIST="controller/cluster,flow/cluster/summary,flow/process-groups/root,flow/status,counters,system-diagnostics"
|
||||
SLEEP_INTERVAL=300
|
||||
MODE ="unlimited" # In limited mode, only NUMBEROFITERATIONS API calls are made before exiting.
|
||||
NUMBER_OF_ITERATIONS = 2
|
||||
|
||||
SECURE=True #If Set to True, Please add following volume mount with certificate -v $(pwd)/keystore.pkcs12:/opt/nifimonitor/cert.pkcs12
|
||||
CERT_PASS="PasswordForCertificate"
|
||||
|
||||
IKEY="REPLACE_ME" #AppInsights Instrumentation Key"
|
||||
|
||||
INFLUXDB_SERVER="influxdb-grafana"
|
||||
INFLUXDB_SERVER ="127.0.0.1" # IP or hostname to InfluxDB server
|
||||
INFLUXDB_PORT =8086 # Port on InfluxDB server
|
||||
INFLUXDB_USERNAME ="root"
|
||||
INFLUXDB_PASSWORD ="root"
|
||||
INFLUXDB_DATABASE ="nifi"
|
|
@ -0,0 +1,159 @@
|
|||
import logging
|
||||
import time
|
||||
import sys
|
||||
import os
|
||||
import urllib3
|
||||
import requests
|
||||
import json
|
||||
import json_log_formatter
|
||||
from requests_pkcs12 import get, post
|
||||
from applicationinsights.logging import LoggingHandler
|
||||
from applicationinsights.exceptions import enable
|
||||
from influxdb import InfluxDBClient
|
||||
from datetime import datetime
|
||||
|
||||
API_URL = os.getenv('API_URL', "http://localhost:8080/nifi-api/") # complete api-endpoint
|
||||
ENDPOINT_LIST = os.getenv('ENDPOINT_LIST', "controller/cluster,flow/cluster/summary,flow/process-groups/root,flow/status,counters,system-diagnostics?nodewise=true").split(',')
|
||||
MODE = os.getenv('MODE', "unlimited") # In limited mode, only NUMBEROFITERATIONS API calls are made before exiting.
|
||||
NUMBER_OF_ITERATIONS = int(os.getenv('NUMBER_OF_ITERATIONS', 2))
|
||||
IKEY = os.getenv('IKEY', "REPLACE_ME")
|
||||
SLEEP_INTERVAL = int(os.getenv('SLEEP_INTERVAL', 300))
|
||||
|
||||
SECURE = os.getenv('SECURE', False) # Is NiFi Cluster Secure
|
||||
CERT_FILE = os.getenv('CERT_FILE', '/opt/nifimonitor/cert.pkcs12')
|
||||
CERT_PASS = os.getenv('CERT_PASS', 'REPLACE_ME')
|
||||
|
||||
INFLUXDB_SERVER = os.getenv('INFLUXDB_SERVER', "127.0.0.1") # IP or hostname to InfluxDB server
|
||||
INFLUXDB_PORT = int(os.getenv('INFLUXDB_PORT', 8086)) # Port on InfluxDB server
|
||||
INFLUXDB_USERNAME = os.getenv('INFLUXDB_USERNAME', "root")
|
||||
INFLUXDB_PASSWORD = os.getenv('INFLUXDB_PASSWORD', "root")
|
||||
INFLUXDB_DATABASE = os.getenv('INFLUXDB_DATABASE', "nifi")
|
||||
|
||||
count = 0
|
||||
urllib3.disable_warnings()
|
||||
conditions = {
|
||||
"limited": lambda: count < NUMBER_OF_ITERATIONS,
|
||||
"unlimited": lambda: True
|
||||
}
|
||||
|
||||
# Sysout Logging Setup
|
||||
logger = logging.getLogger("nifi-monitor")
|
||||
logger.setLevel(logging.INFO)
|
||||
syshandler = logging.StreamHandler(sys.stdout)
|
||||
syshandler.setLevel(logging.INFO)
|
||||
formatter = json_log_formatter.JSONFormatter()
|
||||
syshandler.setFormatter(formatter)
|
||||
logger.addHandler(syshandler)
|
||||
|
||||
if IKEY != "REPLACE_ME":
|
||||
# Logging unhandled exceptions with Appinsights
|
||||
enable(IKEY)
|
||||
# Applications Insights Logging Setup
|
||||
handler = LoggingHandler(IKEY)
|
||||
handler.setFormatter(formatter)
|
||||
logger.addHandler(handler)
|
||||
|
||||
iclient = InfluxDBClient(INFLUXDB_SERVER, INFLUXDB_PORT, INFLUXDB_USERNAME, INFLUXDB_PASSWORD, INFLUXDB_DATABASE)
|
||||
iclient.create_database(INFLUXDB_DATABASE)
|
||||
|
||||
def flattening(nested, prefix, ignore_list):
|
||||
field = {}
|
||||
|
||||
flatten(True, nested, field, prefix, ignore_list)
|
||||
|
||||
return field
|
||||
|
||||
|
||||
def flatten(top, nested, flatdict, prefix, ignore_list):
|
||||
def assign(newKey, data, toignore):
|
||||
if toignore:
|
||||
if isinstance(data, (dict, list, tuple,)):
|
||||
json_data = json.dumps(data)
|
||||
flatdict[newKey] = json_data
|
||||
else:
|
||||
flatdict[newKey] = data
|
||||
else:
|
||||
if isinstance(data, (dict, list, tuple,)):
|
||||
flatten(False, data, flatdict, newKey, ignore_list)
|
||||
else:
|
||||
flatdict[newKey] = data
|
||||
|
||||
if isinstance(nested, dict):
|
||||
for key, value in nested.items():
|
||||
ok = match_key(ignore_list, key)
|
||||
if ok and prefix == "":
|
||||
assign(key, value, True)
|
||||
elif ok and prefix != "":
|
||||
newKey = create_key(top, prefix, key)
|
||||
assign(newKey, value, True)
|
||||
else:
|
||||
newKey = create_key(top, prefix, key)
|
||||
assign(newKey, value, False)
|
||||
|
||||
elif isinstance(nested, (list, tuple,)):
|
||||
for index, value in enumerate(nested):
|
||||
if isinstance(value, dict):
|
||||
for key1, value1 in value.items():
|
||||
ok = match_key(ignore_list, key1)
|
||||
if ok:
|
||||
subkey = str(index) + "." + key1
|
||||
newkey = create_key(top, prefix, subkey)
|
||||
assign(newkey, value1, True)
|
||||
else:
|
||||
newkey = create_key(top, prefix, str(index))
|
||||
assign(newkey, value, False)
|
||||
|
||||
else:
|
||||
newkey = create_key(top, prefix, str(index))
|
||||
assign(newkey, value, False)
|
||||
|
||||
else:
|
||||
return ("Not a Valid input")
|
||||
|
||||
|
||||
def create_key(top, prefix, subkey):
|
||||
key = prefix
|
||||
if top:
|
||||
key += subkey
|
||||
else:
|
||||
key += "." + subkey
|
||||
|
||||
return key
|
||||
|
||||
|
||||
def match_key(ignorelist, value):
|
||||
for element in ignorelist:
|
||||
if element == value:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
while conditions[MODE]():
|
||||
try:
|
||||
for ENDPOINT in ENDPOINT_LIST:
|
||||
r = requests.get(url=API_URL + ENDPOINT) if SECURE == False else get(url=API_URL + ENDPOINT, headers={
|
||||
'Content-Type': 'application/json'}, verify=False, pkcs12_filename=CERT_FILE, pkcs12_password=CERT_PASS)
|
||||
received_response = r.json()
|
||||
flat_response = flattening(received_response, "", [])
|
||||
current_time = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
|
||||
points = [{
|
||||
"measurement": ENDPOINT,
|
||||
"tags": {},
|
||||
"time": current_time,
|
||||
"fields": flat_response
|
||||
}]
|
||||
logger.info(ENDPOINT, extra=received_response)
|
||||
iclient.write_points(points)
|
||||
if IKEY != "REPLACE_ME":
|
||||
handler.flush()
|
||||
count += 1
|
||||
except Exception as e:
|
||||
# this will send an exception to the Application Insights Logs
|
||||
logging.exception("Code ran into an unforseen exception!", sys.exc_info()[0])
|
||||
|
||||
time.sleep(SLEEP_INTERVAL)
|
||||
|
||||
# logging shutdown will cause a flush of all un-sent telemetry items
|
||||
logging.shutdown()
|
||||
|
|
@ -0,0 +1,17 @@
|
|||
applicationinsights==0.11.9
|
||||
certifi==2020.6.20
|
||||
cffi==1.14.0
|
||||
chardet==3.0.4
|
||||
cryptography==2.9.2
|
||||
idna==2.10
|
||||
influxdb==5.3.0
|
||||
JSON-log-formatter==0.3.0
|
||||
msgpack==0.6.1
|
||||
pycparser==2.20
|
||||
pyOpenSSL==19.1.0
|
||||
python-dateutil==2.8.1
|
||||
pytz==2020.1
|
||||
requests==2.24.0
|
||||
requests-pkcs12==1.7
|
||||
six==1.15.0
|
||||
urllib3==1.25.9
|
Загрузка…
Ссылка в новой задаче