gcp monitor - add function app

This commit is contained in:
Vitalii Uslystyi 2021-06-16 16:34:21 +03:00
Parent 4406eca0b7
Commit 78eb16c39a
7 changed files: 318 additions and 0 deletions

View file

@@ -0,0 +1,12 @@
{
  "scriptFile": "main.py",
  "bindings": [
    {
      "name": "mytimer",
      "type": "timerTrigger",
      "direction": "in",
      "schedule": "0 */5 * * * *",
      "runOnStartup": false
    }
  ]
}

View file

@@ -0,0 +1,170 @@
import os
import asyncio
import logging
import time
import re
import azure.functions as func
from google.cloud.monitoring_v3.services.metric_service import MetricServiceAsyncClient
from google.cloud.monitoring_v3.types import TimeInterval
from google.cloud.monitoring_v3.types import TimeSeries
from google.protobuf.json_format import MessageToDict
from .sentinel_connector_async import AzureSentinelConnectorAsync
from .state_manager import StateManager
logging.getLogger('azure.core.pipeline.policies.http_logging_policy').setLevel(logging.ERROR)
CREDENTIALS_FILE_CONTENT = os.environ['GCP_CREDENTIALS_FILE_CONTENT']
GCP_PROJECT_ID_LIST = os.environ['GCP_PROJECT_ID']
METRICS = os.environ['GCP_METRICS']
WORKSPACE_ID = os.environ['WORKSPACE_ID']
SHARED_KEY = os.environ['SHARED_KEY']
LOG_TYPE = 'GCP_MONITORING'
CREDS_FILE_PATH = os.path.join(os.path.dirname(os.path.realpath(__file__)), '.creds')
MAX_SEARCH_PERIOD_MINUTES = 120
DEFAULT_SEARCH_PERIOD_MINUTES = 5
LOG_ANALYTICS_URI = os.environ.get('logAnalyticsUri')
if not LOG_ANALYTICS_URI or str(LOG_ANALYTICS_URI).isspace():
    LOG_ANALYTICS_URI = 'https://' + WORKSPACE_ID + '.ods.opinsights.azure.com'

pattern = r'https:\/\/([\w\-]+)\.ods\.opinsights\.azure\.([a-zA-Z\.]+)$'
match = re.match(pattern, str(LOG_ANALYTICS_URI))
if not match:
    raise Exception("Invalid Log Analytics Uri.")
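
All of the settings above come from the Function App's application settings; GCP_PROJECT_ID and GCP_METRICS are whitespace-separated lists, since main() splits them. A small illustrative sanity check (the names are the ones this module reads; the check itself is a sketch, not part of the function):

import os

required = ['GCP_CREDENTIALS_FILE_CONTENT', 'GCP_PROJECT_ID', 'GCP_METRICS',
            'WORKSPACE_ID', 'SHARED_KEY', 'AzureWebJobsStorage']
missing = [name for name in required if not os.environ.get(name)]
print('missing settings:', ', '.join(missing) or 'none')
# logAnalyticsUri is optional; the code above falls back to
# https://<WORKSPACE_ID>.ods.opinsights.azure.com when it is unset.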
async def main(mytimer: func.TimerRequest):
    logging.info('Starting script')
    create_credentials_file()
    projects = GCP_PROJECT_ID_LIST.split()
    metrics = METRICS.split()
    state_manager = StateManager(os.environ['AzureWebJobsStorage'])
    logging.info('Getting last timestamp')
    last_ts = state_manager.get()
    logging.info('Last timestamp: {}'.format(last_ts))
    time_interval, last_ts = gen_time_interval(last_ts)
    coros = []
    client = MetricServiceAsyncClient()
    for project in projects:
        for metric in metrics:
            coros.append(process_metric(client, project, metric, time_interval))
    await asyncio.gather(*coros)
    logging.info('Saving last timestamp: {}'.format(last_ts))
    state_manager.post(str(last_ts))
    remove_credentials_file()
    logging.info('Script finished')
def gen_time_interval(last_ts: int):
    now = int(time.time())
    if last_ts:
        try:
            last_ts = int(last_ts)
        except Exception:
            last_ts = None
    if last_ts:
        if now - last_ts < MAX_SEARCH_PERIOD_MINUTES * 60:
            start_time = last_ts
            logging.info('Getting data from {}'.format(start_time))
        else:
            start_time = now - MAX_SEARCH_PERIOD_MINUTES * 60
            logging.warning('Last timestamp is too old. Getting data from {}'.format(start_time))
    else:
        start_time = now - DEFAULT_SEARCH_PERIOD_MINUTES * 60
        logging.info('Last timestamp is not known. Getting data from {}'.format(start_time))
    end_time = now - 30
    interval = {
        "start_time": {"seconds": start_time, "nanos": 0},
        "end_time": {"seconds": end_time, "nanos": 0}
    }
    logging.info('Getting data for interval: {}'.format(interval))
    return TimeInterval(interval), end_time
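
For illustration, the first run (no stored marker) produces a window covering the last DEFAULT_SEARCH_PERIOD_MINUTES, ending 30 seconds in the past so GCP has time to ingest the newest points:

interval, saved_ts = gen_time_interval(None)  # first run: no stored marker
print(interval)  # TimeInterval spanning now-300s .. now-30s
# saved_ts (= end_time) is what main() persists via StateManager, so the
# next run resumes exactly where this window ended.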
async def process_metric(client: MetricServiceAsyncClient, project_name: str, metric_type: str, time_interval: TimeInterval):
    logging.info('Start processing metric: {} in {}'.format(metric_type, project_name))
    sentinel = AzureSentinelConnectorAsync(LOG_ANALYTICS_URI, WORKSPACE_ID, SHARED_KEY, LOG_TYPE, queue_size=10000)
    metric_filter = 'metric.type = "{}"'.format(metric_type)
    time_series_async_iterator = await client.list_time_series(name=project_name, filter=metric_filter, interval=time_interval)
    async for time_series in time_series_async_iterator:
        async for event in parse_time_series(time_series):
            await sentinel.send(event)
    await sentinel.flush()
    logging.info('Finish processing metric: {} in {}. Sent events: {}'.format(metric_type, project_name, sentinel.successfully_sent_events_number))
async def parse_time_series(time_series: TimeSeries):
    d = MessageToDict(time_series._pb)
    points = d.pop('points', [])
    for p in points:
        p = parse_point(p)
        event = {}
        event.update(d)
        event.update(p)
        yield event

def parse_bool(val):
    if val == 'true':
        val = True
    elif val == 'false':
        val = False
    return val
def parse_point(point: dict):
    parse_functions = {
        'boolValue': parse_bool,
        'int64Value': int,
        'doubleValue': float
    }
    if 'value' in point:
        for field in point['value']:
            if field in parse_functions:
                try:
                    val = point['value'][field]
                    parse_func = parse_functions[field]
                    point['value'][field] = parse_func(val)
                except Exception:
                    pass
    return point
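
MessageToDict renders protobuf int64 fields as JSON strings, so parse_point normalizes them before the events are shipped; a minimal illustration with a made-up point:

point = {'value': {'int64Value': '42'}}   # int64 arrives as a string
print(parse_point(point))                 # {'value': {'int64Value': 42}}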
def create_credentials_file():
    # Escape raw newlines (e.g. pasted inside the service account's
    # private key) so the file written to disk stays valid JSON.
    with open(CREDS_FILE_PATH, 'w') as f:
        content = CREDENTIALS_FILE_CONTENT.strip().replace('\n', '\\n')
        f.write(content)
    os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = CREDS_FILE_PATH

def remove_credentials_file():
    if os.path.exists(CREDS_FILE_PATH):
        os.remove(CREDS_FILE_PATH)

View file

@@ -0,0 +1,91 @@
import datetime
import logging
import json
import hashlib
import hmac
import base64
import aiohttp
import asyncio
from collections import deque
class AzureSentinelConnectorAsync:
    def __init__(self, log_analytics_uri, workspace_id, shared_key, log_type, queue_size=1000, queue_size_bytes=25 * (2**20)):
        self.log_analytics_uri = log_analytics_uri
        self.workspace_id = workspace_id
        self.shared_key = shared_key
        self.log_type = log_type
        self.queue_size = queue_size
        self.queue_size_bytes = queue_size_bytes
        self._queue = deque()
        self.successfully_sent_events_number = 0
        self.failed_sent_events_number = 0
        self.lock = asyncio.Lock()
    async def send(self, event):
        events = None
        async with self.lock:
            self._queue.append(event)
            if len(self._queue) >= self.queue_size:
                events = list(self._queue)
                self._queue.clear()
        if events:
            await self._flush(events)

    async def flush(self):
        await self._flush(list(self._queue))

    async def _flush(self, data: list):
        if data:
            data = self._split_big_request(data)
            async with aiohttp.ClientSession() as session:
                await asyncio.gather(*[self._post_data(session, self.workspace_id, self.shared_key, d, self.log_type) for d in data])
    def _build_signature(self, workspace_id, shared_key, date, content_length, method, content_type, resource):
        x_headers = 'x-ms-date:' + date
        string_to_hash = method + "\n" + str(content_length) + "\n" + content_type + "\n" + x_headers + "\n" + resource
        bytes_to_hash = bytes(string_to_hash, encoding="utf-8")
        decoded_key = base64.b64decode(shared_key)
        encoded_hash = base64.b64encode(hmac.new(decoded_key, bytes_to_hash, digestmod=hashlib.sha256).digest()).decode()
        authorization = "SharedKey {}:{}".format(workspace_id, encoded_hash)
        return authorization
    async def _post_data(self, session: aiohttp.ClientSession, workspace_id, shared_key, body, log_type):
        logging.debug('Start sending data to sentinel')
        events_number = len(body)
        body = json.dumps(body)
        method = 'POST'
        content_type = 'application/json'
        resource = '/api/logs'
        rfc1123date = datetime.datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')
        content_length = len(body)
        signature = self._build_signature(workspace_id, shared_key, rfc1123date, content_length, method, content_type, resource)
        uri = self.log_analytics_uri + resource + '?api-version=2016-04-01'
        headers = {
            'content-type': content_type,
            'Authorization': signature,
            'Log-Type': log_type,
            'x-ms-date': rfc1123date
        }
        async with session.post(uri, data=body, headers=headers) as response:
            if 200 <= response.status <= 299:
                logging.debug('{} events have been successfully sent to Azure Sentinel'.format(events_number))
                self.successfully_sent_events_number += events_number
            else:
                logging.error("Error during sending events to Azure Sentinel. Response code: {}".format(response.status))
                self.failed_sent_events_number += events_number
                raise Exception("Error during sending events to Azure Sentinel. Response code: {}".format(response.status))
    def _check_size(self, queue):
        data_bytes_len = len(json.dumps(queue).encode())
        return data_bytes_len < self.queue_size_bytes

    def _split_big_request(self, queue):
        if self._check_size(queue):
            return [queue]
        else:
            middle = int(len(queue) / 2)
            return self._split_big_request(queue[:middle]) + self._split_big_request(queue[middle:])
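
_split_big_request recursively halves a batch until every chunk serializes below queue_size_bytes. A toy sketch with an artificially small limit (the URI, workspace id and key below are placeholders):

import json

# Hypothetical values; 'a2V5' is just base64 for 'key'.
connector = AzureSentinelConnectorAsync(
    'https://example.ods.opinsights.azure.com', 'workspace-id', 'a2V5',
    'GCP_MONITORING', queue_size_bytes=64)
events = [{'n': i, 'pad': 'x' * 20} for i in range(4)]
chunks = connector._split_big_request(events)
print(len(chunks))  # 4: each single-event chunk serializes under 64 bytes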

View file

@@ -0,0 +1,22 @@
from azure.storage.fileshare import ShareClient
from azure.storage.fileshare import ShareFileClient
from azure.core.exceptions import ResourceNotFoundError
class StateManager:
    def __init__(self, connection_string, share_name='funcstatemarkershare', file_path='funcstatemarkerfile'):
        self.share_cli = ShareClient.from_connection_string(conn_str=connection_string, share_name=share_name)
        self.file_cli = ShareFileClient.from_connection_string(conn_str=connection_string, share_name=share_name, file_path=file_path)

    def post(self, marker_text: str):
        try:
            self.file_cli.upload_file(marker_text)
        except ResourceNotFoundError:
            self.share_cli.create_share()
            self.file_cli.upload_file(marker_text)

    def get(self):
        try:
            return self.file_cli.download_file().readall().decode()
        except ResourceNotFoundError:
            return None
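
Typical round trip, assuming a fresh storage account (the connection string is a placeholder): the first get() returns None, post() creates the share and file on demand, and later calls read the marker back as text.

state = StateManager('<storage-connection-string>')  # placeholder
print(state.get())        # None: no marker stored yet
state.post('1623855570')  # creates share and file if missing
print(state.get())        # '1623855570'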

View file

@@ -0,0 +1,15 @@
{
  "version": "2.0",
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[1.*, 2.0.0)"
  }
}

View file

@@ -0,0 +1,4 @@
{
  "$schema": "http://json.schemastore.org/proxies",
  "proxies": {}
}

View file

@@ -0,0 +1,4 @@
google-cloud-monitoring==2.2.1
azure-functions==1.6.0
aiohttp==3.7.4.post0
azure-storage-file-share==12.5.0