initial abnormal security draft

Tze Yang Ng 2021-09-30 16:43:31 +08:00
Parent af76a8b43e
Commit 1bb8911572
12 changed files with 481 additions and 0 deletions

View file

@@ -0,0 +1,121 @@
# This function is not intended to be invoked directly. Instead it will be
# triggered by the timer-based starter function.
# Before running this sample, please:
# - create a Durable starter function (this project uses a timer trigger)
# - create the SoarDatetimeEntity Durable entity function
# - add azure-functions-durable to requirements.txt
# - run pip install -r requirements.txt
import datetime
import logging
import asyncio
import azure.durable_functions as df
from .soar_connector_async import AbnormalSoarConnectorAsync
from .sentinel_connector_async import AzureSentinelConnectorAsync
API_TOKEN = "****"
SENTINEL_WORKSPACE_ID = "****"
SENTINEL_SHARED_KEY = "****"
LOG_ANALYTICS_URI = 'https://' + SENTINEL_WORKSPACE_ID + '.ods.opinsights.azure.com'

def orchestrator_function(context: df.DurableOrchestrationContext):
    logging.info("Executing orchestrator function")
    datetimeEntityId = df.EntityId("SoarDatetimeEntity", "latestDatetime")
    stored_datetime = yield context.call_entity(datetimeEntityId, "get")
    logging.info(f"retrieved stored datetime: {stored_datetime}")
    # stored_datetime = "2020-08-01T01:01:01Z"
    # threatCacheEntity = df.EntityId("SoarCacheEntity", Cacher.THREAT_MESSAGE_CACHE_KEY)
    # message_id_cache = yield context.call_entity(threatCacheEntity, "get")
    # logging.info(f"current message_id_cache: {message_id_cache}")
    # caseCacheEntity = df.EntityId("SoarCacheEntity", Cacher.CASE_CACHE_KEY)
    # case_id_cache = yield context.call_entity(caseCacheEntity, "get")
    # logging.info(f"current case_id_cache: {case_id_cache}")
    # dfCacher = Cacher(message_id_cache, case_id_cache)
    current_datetime = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
    asyncio.run(transfer_abnormal_data_to_sentinel(stored_datetime, current_datetime))
    # context.signal_entity(threatCacheEntity, "set", dfCacher.new_message_ids)
    # logging.info(f"set new_message_ids to {dfCacher.new_message_ids}")
    # context.signal_entity(caseCacheEntity, "set", dfCacher.new_case_ids)
    # logging.info(f"set new_case_ids to {dfCacher.new_case_ids}")
    context.signal_entity(datetimeEntityId, "set", current_datetime)
    logging.info(f"set last_datetime to {current_datetime}")

async def transfer_abnormal_data_to_sentinel(stored_datetime, current_datetime):
    queue = asyncio.Queue()
    api_connector = AbnormalSoarConnectorAsync(API_TOKEN)
    sentinel_connector = AzureSentinelConnectorAsync(LOG_ANALYTICS_URI, SENTINEL_WORKSPACE_ID, SENTINEL_SHARED_KEY)
    threat_message_producer = asyncio.create_task(api_connector.get_all_threat_messages(stored_datetime, current_datetime, queue))
    cases_producer = asyncio.create_task(api_connector.get_all_cases(stored_datetime, current_datetime, queue))
    consumers = [asyncio.create_task(consume(sentinel_connector, queue)) for _ in range(3)]
    await asyncio.gather(threat_message_producer, cases_producer)
    await queue.join()  # Implicitly awaits consumers, too
    for c in consumers:
        c.cancel()
    await sentinel_connector.flushall()

# async def transfer_abnormal_data_to_sentinel(stored_datetime, current_datetime, dfCacher):
#     queue = asyncio.Queue()
#     api_connector = AbnormalSoarConnectorAsync(API_TOKEN)
#     sentinel_connector = AzureSentinelConnectorAsync(LOG_ANALYTICS_URI, SENTINEL_WORKSPACE_ID, SENTINEL_SHARED_KEY)
#     threat_message_producer = asyncio.create_task(api_connector.get_all_threat_messages(stored_datetime, current_datetime, queue, caching_func=dfCacher.cache_threat_message_ids))
#     cases_producer = asyncio.create_task(api_connector.get_all_cases(stored_datetime, current_datetime, queue, caching_func=dfCacher.cache_cases_ids))
#     consumers = [asyncio.create_task(consume(sentinel_connector, queue)) for _ in range(3)]
#     await asyncio.gather(threat_message_producer, cases_producer)
#     await queue.join()  # Implicitly awaits consumers, too
#     for c in consumers:
#         c.cancel()
#     await sentinel_connector.flushall()

async def consume(sentinel_connector, queue):
    while True:
        message = await queue.get()
        try:
            await sentinel_connector.send(message)
        except Exception as e:
            logging.error(f"Sentinel send request failed. Err: {e}")
        queue.task_done()

class Cacher:
    THREAT_MESSAGE_CACHE_KEY = "threat_message_id_cache"
    CASE_CACHE_KEY = "case_id_cache"

    def __init__(self, message_id_cache, case_id_cache) -> None:
        self.message_id_cache = set(message_id_cache) if message_id_cache else set()
        self.new_message_ids = []
        self.case_id_cache = set(case_id_cache) if case_id_cache else set()
        self.new_case_ids = []

    def cache_threat_message_ids(self, message_ids):
        # return message_ids
        new_message_ids = []
        for id in message_ids:
            if id not in self.message_id_cache:
                self.message_id_cache.add(id)
                self.new_message_ids.append(id)
                new_message_ids.append(id)
        return new_message_ids

    def cache_cases_ids(self, case_ids):
        # return case_ids
        new_case_ids = []
        for id in case_ids:
            if id not in self.case_id_cache:
                self.case_id_cache.add(id)
                self.new_case_ids.append(id)
                new_case_ids.append(id)
        return new_case_ids


main = df.Orchestrator.create(orchestrator_function)
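
The API token and workspace credentials above are hard-coded placeholders. In a deployed Function App they would normally be read from application settings instead; a minimal sketch, assuming hypothetical setting names (ABNORMAL_API_TOKEN, SENTINEL_WORKSPACE_ID, SENTINEL_SHARED_KEY) that are not part of this draft:

import os

# Hypothetical app-setting names; configure them on the Function App
# (or in local.settings.json for local runs).
API_TOKEN = os.environ.get("ABNORMAL_API_TOKEN", "****")
SENTINEL_WORKSPACE_ID = os.environ.get("SENTINEL_WORKSPACE_ID", "****")
SENTINEL_SHARED_KEY = os.environ.get("SENTINEL_SHARED_KEY", "****")
LOG_ANALYTICS_URI = f"https://{SENTINEL_WORKSPACE_ID}.ods.opinsights.azure.com"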

View file

@@ -0,0 +1,10 @@
{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "context",
      "type": "orchestrationTrigger",
      "direction": "in"
    }
  ]
}

View file

@@ -0,0 +1,109 @@
import datetime
import logging
import json
import hashlib
import hmac
import base64
import aiohttp
import asyncio
from collections import deque, defaultdict

class AzureSentinelConnectorAsync:
    def __init__(self, log_analytics_uri, workspace_id, shared_key, queue_size=1000, queue_size_bytes=25 * (2**20)):
        self.log_analytics_uri = log_analytics_uri
        self.workspace_id = workspace_id
        self.shared_key = shared_key
        self.queue_size = queue_size
        self.queue_size_bytes = queue_size_bytes
        self._queue = defaultdict(deque)
        self.successfully_sent_events_number = 0
        self.failed_sent_events_number = 0
        self.lock = asyncio.Lock()

    async def send(self, event):
        events = None
        async with self.lock:
            log_type, event_data = event
            log_type_queue = self._queue[log_type]
            log_type_queue.append(event_data)
            if len(log_type_queue) >= self.queue_size:
                events = list(log_type_queue)
                log_type_queue.clear()
        if events:
            await self._flush(events, log_type)

    async def flushall(self):
        logging.info("FLUSHING ALL queues")
        for log_type, queue in self._queue.items():
            await self._flush(list(queue), log_type)

    async def _flush(self, data: list, log_type: str):
        if data:
            data = self._split_big_request(data)
            logging.info(f"FLUSHING {log_type}")
            async with aiohttp.ClientSession() as session:
                await asyncio.gather(*[self._post_data(session, self.workspace_id, self.shared_key, d, log_type) for d in data])

    def _build_signature(self, workspace_id, shared_key, date, content_length, method, content_type, resource):
        x_headers = 'x-ms-date:' + date
        string_to_hash = method + "\n" + str(content_length) + "\n" + content_type + "\n" + x_headers + "\n" + resource
        bytes_to_hash = bytes(string_to_hash, encoding="utf-8")
        decoded_key = base64.b64decode(shared_key)
        encoded_hash = base64.b64encode(hmac.new(decoded_key, bytes_to_hash, digestmod=hashlib.sha256).digest()).decode()
        authorization = "SharedKey {}:{}".format(workspace_id, encoded_hash)
        return authorization

    async def _post_data(self, session: aiohttp.ClientSession, workspace_id, shared_key, body, log_type):
        events_number = len(body)
        body = json.dumps(body)
        method = 'POST'
        content_type = 'application/json'
        resource = '/api/logs'
        rfc1123date = datetime.datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')
        content_length = len(body)
        signature = self._build_signature(workspace_id, shared_key, rfc1123date, content_length, method, content_type, resource)
        uri = self.log_analytics_uri + resource + '?api-version=2016-04-01'
        headers = {
            'content-type': content_type,
            'Authorization': signature,
            'Log-Type': log_type,
            'x-ms-date': rfc1123date
        }
        try_number = 1
        while True:
            try:
                await self._make_request(session, uri, body, headers)
            except Exception as err:
                if try_number < 3:
                    logging.warning('Error while sending data to Azure Sentinel. Try number: {}. Trying one more time. {}'.format(try_number, err))
                    await asyncio.sleep(try_number)
                    try_number += 1
                else:
                    logging.error(str(err))
                    self.failed_sent_events_number += events_number
                    raise err
            else:
                logging.info('{} events have been successfully sent to Azure Sentinel'.format(events_number))
                self.successfully_sent_events_number += events_number
                break

    async def _make_request(self, session, uri, body, headers):
        async with session.post(uri, data=body, headers=headers) as response:
            if not (200 <= response.status <= 299):
                raise Exception("Error during sending events to Azure Sentinel. Response code: {}".format(response.status))

    def _check_size(self, queue):
        data_bytes_len = len(json.dumps(queue).encode())
        return data_bytes_len < self.queue_size_bytes

    def _split_big_request(self, queue):
        if self._check_size(queue):
            return [queue]
        else:
            middle = int(len(queue) / 2)
            queues_list = [queue[:middle], queue[middle:]]
            return self._split_big_request(queues_list[0]) + self._split_big_request(queues_list[1])
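
For reference, a minimal standalone usage sketch of the connector above (workspace URI, ID and key are placeholders, not real values): send() takes a (log_type, event) tuple and buffers it per log type, and flushall() posts whatever is still queued.

import asyncio

async def demo():
    connector = AzureSentinelConnectorAsync(
        "https://<workspace-id>.ods.opinsights.azure.com",  # placeholder URI
        "<workspace-id>",
        "c2hhcmVkLWtleQ==")  # dummy base64 key; replace with the real shared key
    await connector.send(("ABNORMAL_THREAT_MESSAGES", {"threatId": "example-id"}))
    await connector.send(("ABNORMAL_CASES", {"caseId": "example-id"}))
    await connector.flushall()

asyncio.run(demo())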

View file

@@ -0,0 +1,137 @@
import json
from urllib.parse import urlencode
from enum import Enum
import aiohttp
import logging
import asyncio

class Resources(Enum):
    threats = 0
    cases = 1


class FilterParam(Enum):
    receivedTime = 0
    createdTime = 1


class AbnormalSoarConnectorAsync:
    BASEURL = "https://api.abnormalplatform.com/v1"
    MAP_RESOURCE_TO_LOGTYPE = {
        Resources.threats: "ABNORMAL_THREAT_MESSAGES",
        Resources.cases: "ABNORMAL_CASES"
    }

    def __init__(self, api_key, num_consumers=10) -> None:
        self.api_key = api_key
        self.num_consumers = num_consumers

    def _get_header(self):
        """
        Returns the headers for all HTTP requests to Abnormal Security's API.
        """
        return {"Authorization": f"Bearer {self.api_key}"}

    def _get_filter_query(self, filter_param, gte_datetime=None, lte_datetime=None):
        """
        Builds the 'filter' query parameter for the given filter field
        (e.g. receivedTime or createdTime), optionally bounded below and/or
        above by the supplied ISO 8601 datetimes.
        """
        filter_string = f'{filter_param.name}'
        if gte_datetime:
            filter_string += ' ' + f'gte {gte_datetime}'
        if lte_datetime:
            filter_string += ' ' + f'lte {lte_datetime}'
        return {
            'filter': filter_string,
        }

    def _get_all_ids_url(self, resource, query_dict):
        return f"{self.BASEURL}/{resource.name}?{urlencode(query_dict)}"

    def _get_object_url(self, resource_name, resource_id):
        return f"{self.BASEURL}/{resource_name}/{resource_id}"

    def _extract_messages(self, threat):
        return threat.get("messages")

    def _extract_message_ids(self, threats_resp):
        return [threat.get("threatId") for threat in threats_resp.get('threats', [])]

    def _extract_case_ids(self, cases_resp):
        return [case.get("caseId") for case in cases_resp.get('cases', [])]

    async def _make_request(self, session, url, headers):
        async with session.get(url, headers=headers) as response:
            if not (200 <= response.status <= 299):
                raise Exception(
                    "Error while querying the Abnormal SOAR API. Response code: {}. Text: {}".format(response.status, await response.text()))
            await asyncio.sleep(1)
            return json.loads(await response.text())

    async def _send_request(self, session, url):
        attempts = 1
        while True:
            try:
                response_data = await self._make_request(session, url, self._get_header())
            except Exception as e:
                if attempts < 3:
                    logging.warning(f'Error while fetching data from the Abnormal SOAR API. Attempt: {attempts}. Err: {e}')
                    await asyncio.sleep(3)
                    attempts += 1
                else:
                    logging.error(f"Abnormal SOAR API request failed. Err: {e}")
                    raise e
            else:
                return response_data

    async def generate_resource_ids(self, session, resource, query_dict, output_queue, post_processing_func=lambda x: [x]):
        nextPageNumber = 1
        while nextPageNumber:
            query_dict["pageNumber"] = nextPageNumber
            response_data = await self._send_request(session, self._get_all_ids_url(resource, query_dict))
            for id in post_processing_func(response_data):
                await output_queue.put(id)
            nextPageNumber = response_data.get("nextPageNumber")

    async def process_resource_ids(self, session, resource, input_queue, output_queue, post_processing_func=lambda x: [x]):
        resource_log_type = self.MAP_RESOURCE_TO_LOGTYPE[resource]
        while True:
            current_id = await input_queue.get()
            try:
                response_data = await self._send_request(session, self._get_object_url(resource.name, current_id))
            except Exception:
                logging.error(f"Discarding enqueued resource id: {current_id}")
            else:
                for output in post_processing_func(response_data):
                    await output_queue.put((resource_log_type, output))
            input_queue.task_done()

    async def get_all_threat_messages(self, gte_datetime, lte_datetime, output_queue, caching_func=None):
        intermediate_queue = asyncio.Queue()
        async with aiohttp.ClientSession() as session:
            filter_query = self._get_filter_query(FilterParam.receivedTime, gte_datetime, lte_datetime)
            producer_post_process_func = lambda x: caching_func(self._extract_message_ids(x)) if caching_func else self._extract_message_ids(x)
            producer = asyncio.create_task(self.generate_resource_ids(session, Resources.threats, filter_query, intermediate_queue, producer_post_process_func))
            consumers = [asyncio.create_task(self.process_resource_ids(session, Resources.threats, intermediate_queue, output_queue, self._extract_messages)) for _ in range(self.num_consumers)]
            await asyncio.gather(producer)
            await intermediate_queue.join()  # Implicitly awaits consumers, too
            for c in consumers:
                c.cancel()

    async def get_all_cases(self, gte_datetime, lte_datetime, output_queue, caching_func=None):
        intermediate_queue = asyncio.Queue()
        async with aiohttp.ClientSession() as session:
            filter_query = self._get_filter_query(FilterParam.createdTime, gte_datetime, lte_datetime)
            producer_post_process_func = lambda x: caching_func(self._extract_case_ids(x)) if caching_func else self._extract_case_ids(x)
            producer = asyncio.create_task(self.generate_resource_ids(session, Resources.cases, filter_query, intermediate_queue, producer_post_process_func))
            consumers = [asyncio.create_task(self.process_resource_ids(session, Resources.cases, intermediate_queue, output_queue)) for _ in range(self.num_consumers)]
            await asyncio.gather(producer)
            await intermediate_queue.join()  # Implicitly awaits consumers, too
            for c in consumers:
                c.cancel()
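
A minimal driving sketch for the SOAR connector above (the API token is a placeholder): the producer/consumer pipeline fills the output queue with (log_type, payload) tuples, which can then be drained or handed to the Sentinel connector.

import asyncio

async def demo():
    queue = asyncio.Queue()
    connector = AbnormalSoarConnectorAsync("<api-token>")  # placeholder token
    await connector.get_all_threat_messages(
        "2021-09-01T00:00:00Z", "2021-09-02T00:00:00Z", queue)
    while not queue.empty():
        log_type, payload = queue.get_nowait()
        print(log_type, payload)

asyncio.run(demo())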

View file

@@ -0,0 +1,18 @@
# This function is a timer-triggered starter function for Durable Functions.
# Before running this sample, please:
# - create a Durable orchestration function (here: SentinelFunctionsOrchestrator)
# - add azure-functions-durable to requirements.txt
# - run pip install -r requirements.txt
import logging
import azure.functions as func
import azure.durable_functions as df


async def main(mytimer: func.TimerRequest, starter: str):
    logging.info("Timer trigger fired; starting orchestration")
    client = df.DurableOrchestrationClient(starter)
    instance_id = await client.start_new("SentinelFunctionsOrchestrator")
    logging.info(f"Started orchestration with ID = '{instance_id}'.")

View file

@@ -0,0 +1,16 @@
{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "mytimer",
      "type": "timerTrigger",
      "direction": "in",
      "schedule": "0 */5 * * * *"
    },
    {
      "name": "starter",
      "type": "durableClient",
      "direction": "in"
    }
  ]
}

View file

@@ -0,0 +1,20 @@
import azure.durable_functions as df
import datetime

min_time = "2021-01-01T00:00:00Z"


def entity_function(context: df.DurableEntityContext):
    current_datetime_str = context.get_state(lambda: datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ"))
    operation = context.operation_name
    if operation == "set":
        new_timestamp_str = context.get_input()
        current_datetime_str = new_timestamp_str
    elif operation == "reset":
        current_datetime_str = min_time
    elif operation == "get":
        context.set_result(current_datetime_str)
    context.set_state(current_datetime_str)


main = df.Entity.create(entity_function)
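
This entity persists the latest successfully polled timestamp between orchestrator runs: "get" returns it, "set" advances it, and "reset" rewinds it to min_time. The disabled DurableFunctionsHttpReset entry in local.settings.json suggests a reset endpoint; a rough sketch of one (assuming the durable client's signal_entity method and hypothetical binding names):

import azure.functions as func
import azure.durable_functions as df

async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    client = df.DurableOrchestrationClient(starter)
    entity_id = df.EntityId("SoarDatetimeEntity", "latestDatetime")
    # Rewind the stored datetime so the next run re-ingests from min_time
    await client.signal_entity(entity_id, "reset")
    return func.HttpResponse("SoarDatetimeEntity reset", status_code=200)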

View file

@@ -0,0 +1,10 @@
{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "context",
      "type": "entityTrigger",
      "direction": "in"
    }
  ]
}

View file

@@ -0,0 +1,15 @@
{
  "version": "2.0",
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[2.*, 3.0.0)"
  }
}

View file

@@ -0,0 +1,14 @@
{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "DefaultEndpointsProtocol=https;AccountName=sentinelfunctionapp;AccountKey=****;EndpointSuffix=core.windows.net",
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "AzureWebJobs.TimerTriggerSOAR.Disabled": "true",
    "AzureWebJobs.HttpTrigger2.Disabled": "true",
    "AzureWebJobs.DurableFunctionsHttpStart2.Disabled": "true",
    "AzureWebJobs.SoarTimerTrigger.Disabled": "true",
    "AzureWebJobs.HttpTrigger1.Disabled": "true",
    "AzureWebJobs.DurableFunctionsHttpReset.Disabled": "true",
    "AzureWebJobs.SentinelFunctionsOrchestrator.Disabled": "false"
  }
}

View file

@@ -0,0 +1,4 @@
{
  "$schema": "http://json.schemastore.org/proxies",
  "proxies": {}
}

View file

@@ -0,0 +1,7 @@
# DO NOT include azure-functions-worker in this file
# The Python Worker is managed by Azure Functions platform
# Manually managing azure-functions-worker may cause unexpected issues
azure-functions
azure-functions-durable
aiohttp
requests