This commit is contained in:
Greg Oliver 2019-02-20 12:39:48 +00:00
Родитель b9cfd888a8
Коммит 26ff8f7ab3
4 изменённых файлов: 175 добавлений и 186 удалений

Просмотреть файл

@ -1,7 +1,9 @@
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;
using Microsoft.WindowsAzure.Storage.Blob;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Text;
using System.Threading.Tasks;
@ -10,6 +12,8 @@ namespace nsgFunc
{
public partial class Util
{
const int MAXTRANSMISSIONSIZE = 512 * 1024;
public static string GetEnvironmentVariable(string name)
{
var result = System.Environment.GetEnvironmentVariable(name, System.EnvironmentVariableTarget.Process);
@ -125,6 +129,114 @@ namespace nsgFunc
}
static IEnumerable<List<DenormalizedRecord>> denormalizedRecords(string newClientContent, Binder errorRecordBinder, ILogger log)
{
var outgoingList = new List<DenormalizedRecord>(450);
var sizeOfListItems = 0;
NSGFlowLogRecords logs = JsonConvert.DeserializeObject<NSGFlowLogRecords>(newClientContent);
foreach (var record in logs.records)
{
float version = record.properties.Version;
foreach (var outerFlow in record.properties.flows)
{
foreach (var innerFlow in outerFlow.flows)
{
foreach (var flowTuple in innerFlow.flowTuples)
{
var tuple = new NSGFlowLogTuple(flowTuple, version);
var denormalizedRecord = new DenormalizedRecord(
record.properties.Version,
record.time,
record.category,
record.operationName,
record.resourceId,
outerFlow.rule,
innerFlow.mac,
tuple);
var sizeOfDenormalizedRecord = denormalizedRecord.GetSizeOfObject();
if (sizeOfListItems + sizeOfDenormalizedRecord > MAXTRANSMISSIONSIZE + 20)
{
yield return outgoingList;
outgoingList.Clear();
sizeOfListItems = 0;
}
outgoingList.Add(denormalizedRecord);
sizeOfListItems += sizeOfDenormalizedRecord;
}
}
}
}
if (sizeOfListItems > 0)
{
yield return outgoingList;
}
}
/// <summary>
/// input newClientContent is a string representation of a json array of records, each of which is a nsg flow log hierarchy
/// output is a List of SplunkEventMessage, up to a max # of bytes or 450 elements
/// </summary>
/// <param name="newClientContent"></param>
/// <param name="errorRecordBinder"></param>
/// <param name="log"></param>
/// <returns></returns>
static IEnumerable<List<SplunkEventMessage>> denormalizedSplunkEvents(string newClientContent, Binder errorRecordBinder, ILogger log)
{
var outgoingSplunkList = new List<SplunkEventMessage>(450);
var sizeOfListItems = 0;
NSGFlowLogRecords logs = JsonConvert.DeserializeObject<NSGFlowLogRecords>(newClientContent);
foreach (var record in logs.records)
{
float version = record.properties.Version;
foreach (var outerFlow in record.properties.flows)
{
foreach (var innerFlow in outerFlow.flows)
{
foreach (var flowTuple in innerFlow.flowTuples)
{
var tuple = new NSGFlowLogTuple(flowTuple, version);
var denormalizedRecord = new DenormalizedRecord(
record.properties.Version,
record.time,
record.category,
record.operationName,
record.resourceId,
outerFlow.rule,
innerFlow.mac,
tuple);
var splunkEventMessage = new SplunkEventMessage(denormalizedRecord);
var sizeOfObject = splunkEventMessage.GetSizeOfObject();
if (sizeOfListItems + sizeOfObject > MAXTRANSMISSIONSIZE + 20 || outgoingSplunkList.Count == 450)
{
yield return outgoingSplunkList;
outgoingSplunkList.Clear();
sizeOfListItems = 0;
}
outgoingSplunkList.Add(splunkEventMessage);
sizeOfListItems += sizeOfObject;
}
}
}
}
if (sizeOfListItems > 0)
{
yield return outgoingSplunkList;
}
}
public static async Task logErrorRecord(string errorRecord, Binder errorRecordBinder, ILogger log)
{
if (errorRecordBinder == null) { return; }

Просмотреть файл

@ -5,6 +5,28 @@ using System.Text.RegularExpressions;
using Newtonsoft.Json;
using System.Collections.Generic;
class SplunkEventMessage
{
public string sourcetype { get; set; }
public DenormalizedRecord @event { get; set; }
public SplunkEventMessage (DenormalizedRecord splunkEvent)
{
sourcetype = "amdl:nsg:flowlogs";
@event = splunkEvent;
}
public int GetSizeOfObject()
{
return sourcetype.Length + 10 + 6 + (@event == null ? 0 : @event.GetSizeOfObject());
}
}
class SplunkEventMessages
{
public List<SplunkEventMessage> splunkEventMessages { get; set; }
}
class DenormalizedRecord
{
public string time { get; set; }
@ -72,7 +94,7 @@ class DenormalizedRecord
});
}
public int SizeOfObject()
public int GetSizeOfObject()
{
int objectSize = 0;

Просмотреть файл

@ -12,9 +12,6 @@ namespace nsgFunc
{
public partial class Util
{
const int MAXTRANSMISSIONSIZE = 512 * 1024;
//const int MAXTRANSMISSIONSIZE = 2 * 1024;
private static Lazy<EventHubClient> LazyEventHubConnection = new Lazy<EventHubClient>(() =>
{
string EventHubConnectionString = GetEnvironmentVariable("eventHubConnection");
@ -79,54 +76,5 @@ namespace nsgFunc
yield return outgoingJson;
}
}
static IEnumerable<List<DenormalizedRecord>> denormalizedRecords(string newClientContent, Binder errorRecordBinder, ILogger log)
{
var outgoingList = new List<DenormalizedRecord>(450);
var sizeOfListItems = 0;
NSGFlowLogRecords logs = JsonConvert.DeserializeObject<NSGFlowLogRecords>(newClientContent);
foreach (var record in logs.records)
{
float version = record.properties.Version;
foreach (var outerFlow in record.properties.flows)
{
foreach (var innerFlow in outerFlow.flows)
{
foreach (var flowTuple in innerFlow.flowTuples)
{
var tuple = new NSGFlowLogTuple(flowTuple, version);
var denormalizedRecord = new DenormalizedRecord(
record.properties.Version,
record.time,
record.category,
record.operationName,
record.resourceId,
outerFlow.rule,
innerFlow.mac,
tuple);
var sizeOfDenormalizedRecord = denormalizedRecord.SizeOfObject();
if (sizeOfListItems + sizeOfDenormalizedRecord > MAXTRANSMISSIONSIZE + 20)
{
yield return outgoingList;
outgoingList.Clear();
sizeOfListItems = 0;
}
outgoingList.Add(denormalizedRecord);
sizeOfListItems += sizeOfDenormalizedRecord;
}
}
}
}
if (sizeOfListItems > 0)
{
yield return outgoingList;
}
}
}
}

Просмотреть файл

@ -41,130 +41,52 @@ namespace nsgFunc
ServicePointManager.SecurityProtocol = SecurityProtocolType.Tls12;
ServicePointManager.ServerCertificateValidationCallback += new RemoteCertificateValidationCallback(ValidateMyCert);
var transmission = new StringBuilder();
foreach (var message in convertToSplunk(newClientContent, null, log))
{
//
// message looks like this:
//
// {
// "time": "xxx",
// "category": "xxx",
// "operationName": "xxx",
// "version": "xxx",
// "deviceExtId": "xxx",
// "flowOrder": "xxx",
// "nsgRuleName": "xxx",
// "dmac|smac": "xxx",
// "rt": "xxx",
// "src": "xxx",
// "dst": "xxx",
// "spt": "xxx",
// "dpt": "xxx",
// "proto": "xxx",
// "deviceDirection": "xxx",
// "act": "xxx"
// }
transmission.Append(GetSplunkEventFromMessage(message));
}
int bytesSent = 0;
var client = new SingleHttpClientInstance();
try
foreach (var transmission in convertToSplunkList(newClientContent, log))
{
HttpRequestMessage req = new HttpRequestMessage(HttpMethod.Post, splunkAddress);
req.Headers.Accept.Clear();
req.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
req.Headers.Add("Authorization", "Splunk " + splunkToken);
req.Content = new StringContent(transmission.ToString(), Encoding.UTF8, "application/json");
HttpResponseMessage response = await SingleHttpClientInstance.SendToSplunk(req);
if (response.StatusCode != HttpStatusCode.OK)
var client = new SingleHttpClientInstance();
try
{
throw new System.Net.Http.HttpRequestException($"StatusCode from Splunk: {response.StatusCode}, and reason: {response.ReasonPhrase}");
}
}
catch (System.Net.Http.HttpRequestException e)
{
throw new System.Net.Http.HttpRequestException("Sending to Splunk. Is Splunk service running?", e);
}
catch (Exception f)
{
throw new System.Exception("Sending to Splunk. Unplanned exception.", f);
}
return transmission.Length;
}
static System.Collections.Generic.IEnumerable<string> convertToSplunk(string newClientContent, Binder errorRecordBinder, ILogger log)
{
//
// newClientContent looks like this:
//
// {
// "records":[
// {...},
// {...}
// ...
// ]
// }
//
NSGFlowLogRecords logs = JsonConvert.DeserializeObject<NSGFlowLogRecords>(newClientContent);
string logIncomingJSON = Util.GetEnvironmentVariable("logIncomingJSON");
Boolean flag;
if (Boolean.TryParse(logIncomingJSON, out flag))
{
if (flag)
{
logErrorRecord(newClientContent, errorRecordBinder, log).Wait();
}
}
var sbBase = new StringBuilder();
foreach (var record in logs.records)
{
float version = record.properties.Version;
sbBase.Clear();
sbBase.Append("{");
sbBase.Append(eqs("time", true)).Append(eqs(record.time));
sbBase.Append(eqs(true, "category")).Append(eqs(record.category));
sbBase.Append(eqs(true, "operationName")).Append(eqs(record.operationName));
sbBase.Append(eqs(true, "version")).Append(eqs(version.ToString("0.0")));
sbBase.Append(eqs(true, "deviceExtId")).Append(eqs(record.MakeDeviceExternalID()));
int count = 1;
var sbOuterFlowRecord = new StringBuilder();
foreach (var outerFlows in record.properties.flows)
{
sbOuterFlowRecord.Clear();
sbOuterFlowRecord.Append(sbBase.ToString());
sbOuterFlowRecord.Append(eqs(true, "flowOrder")).Append(eqs(count.ToString()));
sbOuterFlowRecord.Append(eqs(true, "nsgRuleName")).Append(eqs(outerFlows.rule));
var sbInnerFlowRecord = new StringBuilder();
foreach (var innerFlows in outerFlows.flows)
HttpRequestMessage req = new HttpRequestMessage(HttpMethod.Post, splunkAddress);
req.Headers.Accept.Clear();
req.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json"));
req.Headers.Add("Authorization", "Splunk " + splunkToken);
req.Content = new StringContent(transmission, Encoding.UTF8, "application/json");
HttpResponseMessage response = await SingleHttpClientInstance.SendToSplunk(req);
if (response.StatusCode != HttpStatusCode.OK)
{
sbInnerFlowRecord.Clear();
sbInnerFlowRecord.Append(sbOuterFlowRecord.ToString());
var firstFlowTupleEncountered = true;
foreach (var flowTuple in innerFlows.flowTuples)
{
var tuple = new NSGFlowLogTuple(flowTuple, version);
if (firstFlowTupleEncountered)
{
sbInnerFlowRecord.Append((tuple.GetDirection == "I" ? eqs(true, "dmac") : eqs(true, "smac"))).Append(eqs(innerFlows.MakeMAC()));
firstFlowTupleEncountered = false;
}
yield return sbInnerFlowRecord.ToString() + tuple.JsonSubString() + "}";
}
throw new System.Net.Http.HttpRequestException($"StatusCode from Splunk: {response.StatusCode}, and reason: {response.ReasonPhrase}");
}
}
catch (System.Net.Http.HttpRequestException e)
{
throw new System.Net.Http.HttpRequestException("Sending to Splunk. Is Splunk service running?", e);
}
catch (Exception f)
{
throw new System.Exception("Sending to Splunk. Unplanned exception.", f);
}
bytesSent += transmission.Length;
}
return bytesSent;
}
static System.Collections.Generic.IEnumerable<string> convertToSplunkList(string newClientContent, ILogger log)
{
foreach (var messageList in denormalizedSplunkEvents(newClientContent, null, log))
{
StringBuilder outgoingJson = new StringBuilder(MAXTRANSMISSIONSIZE);
foreach (var message in messageList)
{
var messageAsString = JsonConvert.SerializeObject(message, new JsonSerializerSettings
{
NullValueHandling = NullValueHandling.Ignore
});
outgoingJson.Append(messageAsString);
}
yield return outgoingJson.ToString();
}
}
@ -183,20 +105,5 @@ namespace nsgFunc
return false;
}
static string GetSplunkEventFromMessage(string message)
{
StringBuilder sb = new StringBuilder();
string json = Newtonsoft.Json.JsonConvert.SerializeObject(message);
sb.Clear();
sb.Append("{\"sourcetype\": \"nsgFlowLog\",\"event\": ").Append(json).Append("}");
return sb.ToString();
}
}
}