arcsight perf improvements. better logging.

This commit is contained in:
Greg Oliver 2019-02-25 11:57:44 +00:00
Родитель 46c9bf1cf9
Коммит 602191f88c
5 изменённых файлов: 277 добавлений и 53 удалений

Просмотреть файл

@ -17,8 +17,11 @@ namespace nsgFunc
[Table("checkpoints", Connection = "AzureWebJobsStorage")] CloudTable checkpointTable,
Binder nsgDataBlobBinder,
string subId, string resourceGroup, string nsgName, string blobYear, string blobMonth, string blobDay, string blobHour, string blobMinute, string mac,
ExecutionContext executionContext,
ILogger log)
{
log.LogInformation($"BlobTriggerIngestAndTransmit triggered: {executionContext.InvocationId} ");
string nsgSourceDataAccount = Util.GetEnvironmentVariable("nsgSourceDataAccount");
if (nsgSourceDataAccount.Length == 0)
{
@ -81,7 +84,7 @@ namespace nsgFunc
nsgMessagesString = System.Text.Encoding.UTF8.GetString(nsgMessages, 1, (int)(dataLength - 1));
} else
{
nsgMessagesString = System.Text.Encoding.UTF8.GetString(nsgMessages);
nsgMessagesString = System.Text.Encoding.UTF8.GetString(nsgMessages, 0, (int)dataLength);
}
}
catch (Exception ex)
@ -99,7 +102,7 @@ namespace nsgFunc
try
{
int bytesSent = await Util.SendMessagesDownstreamAsync(nsgMessagesString, log);
int bytesSent = await Util.SendMessagesDownstreamAsync(nsgMessagesString, executionContext, log);
log.LogInformation($"Sending {nsgMessagesString.Length} bytes (denormalized to {bytesSent} bytes) downstream via output binding {outputBinding}.");
}
catch (Exception ex)

Просмотреть файл

@ -24,7 +24,7 @@ namespace nsgFunc
return result;
}
public static async Task<int> SendMessagesDownstreamAsync(string nsgMessagesString, ILogger log)
public static async Task<int> SendMessagesDownstreamAsync(string nsgMessagesString, ExecutionContext executionContext, ILogger log)
{
//
// nsgMessagesString looks like this:
@ -85,7 +85,7 @@ namespace nsgFunc
await Util.obLogstash(newClientContent, log);
break;
case "arcsight":
await Util.obArcsight(newClientContent, log);
bytesSent = await Util.obArcsightNew(newClientContent, executionContext, log);
break;
case "splunk":
bytesSent = await Util.obSplunk(newClientContent, log);
@ -174,7 +174,7 @@ namespace nsgFunc
innerFlow.mac,
tuple);
var sizeOfDenormalizedRecord = denormalizedRecord.GetSizeOfObject();
var sizeOfDenormalizedRecord = denormalizedRecord.GetSizeOfJSONObject();
if (sizeOfListItems + sizeOfDenormalizedRecord > MAXTRANSMISSIONSIZE + 20)
{

Просмотреть файл

@ -4,15 +4,8 @@ using System.Text;
using System.Text.RegularExpressions;
using Newtonsoft.Json;
using System.Collections.Generic;
class ByteArray
{
public byte[] b;
public ByteArray ()
{
b = new byte[1024 * 1024];
}
}
using System.Buffers;
using Microsoft.CodeAnalysis.Formatting;
class SplunkEventMessage
{
@ -27,7 +20,7 @@ class SplunkEventMessage
public int GetSizeOfObject()
{
return sourcetype.Length + 10 + 6 + (@event == null ? 0 : @event.GetSizeOfObject());
return sourcetype.Length + 10 + 6 + (@event == null ? 0 : @event.GetSizeOfJSONObject());
}
}
@ -95,15 +88,152 @@ class DenormalizedRecord
}
}
public override string ToString()
private string MakeMAC()
{
return JsonConvert.SerializeObject(this, new JsonSerializerSettings
StringBuilder sb = StringBuilderPool.Allocate();
string delimitedMac = "";
try
{
NullValueHandling = NullValueHandling.Ignore
});
sb.Append(mac.Substring(0, 2)).Append(":");
sb.Append(mac.Substring(2, 2)).Append(":");
sb.Append(mac.Substring(4, 2)).Append(":");
sb.Append(mac.Substring(6, 2)).Append(":");
sb.Append(mac.Substring(8, 2)).Append(":");
sb.Append(mac.Substring(10, 2));
delimitedMac = sb.ToString();
}
finally
{
StringBuilderPool.Free(sb);
}
return delimitedMac;
}
public int GetSizeOfObject()
private string MakeDeviceExternalID()
{
var patternSubscriptionId = "SUBSCRIPTIONS\\/(.*?)\\/";
var patternResourceGroup = "SUBSCRIPTIONS\\/(?:.*?)\\/RESOURCEGROUPS\\/(.*?)\\/";
var patternResourceName = "PROVIDERS\\/(?:.*?\\/.*?\\/)(.*?)(?:\\/|$)";
Match m = Regex.Match(resourceId, patternSubscriptionId);
var subscriptionID = m.Groups[1].Value;
m = Regex.Match(resourceId, patternResourceGroup);
var resourceGroup = m.Groups[1].Value;
m = Regex.Match(resourceId, patternResourceName);
var resourceName = m.Groups[1].Value;
return subscriptionID + "/" + resourceGroup + "/" + resourceName;
}
private string MakeCEFTime()
{
// sample input: "2017-08-09T00:13:25.4850000Z"
// sample output: Aug 09 00:13:25 host CEF:0
CultureInfo culture = new CultureInfo("en-US");
DateTime tempDate = Convert.ToDateTime(time, culture);
string newTime = tempDate.ToString("MMM dd HH:mm:ss");
return newTime + " host CEF:0";
}
private void BuildCEF(ref StringBuilder sb)
{
sb.Append(MakeCEFTime());
sb.Append("|Microsoft.Network");
sb.Append("|NETWORKSECURITYGROUPS");
sb.Append("|").Append(version.ToString("0.0"));
sb.Append("|").Append(category);
sb.Append("|").Append(operationName);
sb.Append("|1"); // severity is always 1
sb.Append("|deviceExternalId=").Append(MakeDeviceExternalID());
sb.Append(String.Format(" cs1={0}", nsgRuleName));
sb.Append(String.Format(" cs1Label=NSGRuleName"));
sb.Append((deviceDirection == "I" ? " dmac=" : " smac=") + MakeMAC());
sb.Append(" rt=").Append((Convert.ToUInt64(startTime) * 1000).ToString());
sb.Append(" src=").Append(sourceAddress);
sb.Append(" dst=").Append(destinationAddress);
sb.Append(" spt=").Append(sourcePort);
sb.Append(" dpt=").Append(destinationPort);
sb.Append(" proto=").Append((transportProtocol == "U" ? "UDP" : "TCP"));
sb.Append(" deviceDirection=").Append((deviceDirection == "I" ? "0" : "1"));
sb.Append(" act=").Append(deviceAction);
if (version >= 2.0)
{
// add fields from version 2 schema
sb.Append(" cs2=").Append(flowState);
sb.Append(" cs2Label=FlowState");
if (flowState != "B")
{
sb.Append(" cn1=").Append(packetsStoD);
sb.Append(" cn1Label=PacketsStoD");
sb.Append(" cn2=").Append(packetsDtoS);
sb.Append(" cn2Label=PacketsDtoS");
if (deviceDirection == "I")
{
sb.Append(" bytesIn=").Append(bytesStoD);
sb.Append(" bytesOut=").Append(bytesDtoS);
}
else
{
sb.Append(" bytesIn=").Append(bytesDtoS);
sb.Append(" bytesOut=").Append(bytesStoD);
}
}
}
}
public int AppendToTransmission(ref byte[] transmission, int maxSize, int offset)
{
StringBuilder sb = StringBuilderPool.Allocate();
var bytePool = ArrayPool<byte>.Shared;
byte[] buffer = bytePool.Rent((int)1000);
byte[] crlf = new Byte[] { 0x0D, 0x0A };
int bytesToAppend = 0;
try
{
BuildCEF(ref sb);
string s = sb.ToString();
bytesToAppend += s.Length + 2;
if (maxSize > offset + bytesToAppend)
{
Buffer.BlockCopy(Encoding.ASCII.GetBytes(s), 0, buffer, 0, s.Length);
Buffer.BlockCopy(crlf, 0, buffer, s.Length, 2);
Buffer.BlockCopy(buffer, 0, transmission, offset, bytesToAppend);
} else
{
throw new System.IO.InternalBufferOverflowException("ArcSight transmission buffer overflow");
}
}
finally
{
StringBuilderPool.Free(sb);
bytePool.Return(buffer);
}
return bytesToAppend;
}
public int GetSizeOfJSONObject()
{
int objectSize = 0;
@ -178,10 +308,10 @@ class NSGFlowLogTuple
flowState = parts[8];
if (flowState != "B")
{
packetsStoD = parts[9];
bytesStoD = parts[10];
packetsDtoS = parts[11];
bytesDtoS = parts[12];
packetsStoD = (parts[9] == "" ? "0" : parts[9]);
bytesStoD = (parts[10] == "" ? "0" : parts[10]);
packetsDtoS = (parts[11] == "" ? "0" : parts[11]);
bytesDtoS = (parts[12] == "" ? "0" : parts[12]);
}
}
}
@ -218,13 +348,13 @@ class NSGFlowLogTuple
if (deviceDirection == "I")
{
temp.Append(" bytesIn={0}").Append(bytesStoD);
temp.Append(" bytesOut={0}").Append(bytesDtoS);
temp.Append(" bytesIn=").Append(bytesStoD);
temp.Append(" bytesOut=").Append(bytesDtoS);
}
else
{
temp.Append(" bytesIn={0}").Append(bytesDtoS);
temp.Append(" bytesOut={0}").Append(bytesStoD);
temp.Append(" bytesIn=").Append(bytesDtoS);
temp.Append(" bytesOut=").Append(bytesStoD);
}
}
}

Просмотреть файл

@ -2,6 +2,7 @@
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System;
using System.Buffers;
using System.Net.Sockets;
using System.Threading.Tasks;
@ -9,7 +10,7 @@ namespace nsgFunc
{
public partial class Util
{
public static async Task obArcsight(string newClientContent, ILogger log)
public static async Task<int> obArcsight(string newClientContent, ExecutionContext executionContext, ILogger log)
{
//
// newClientContent looks like this:
@ -28,17 +29,19 @@ namespace nsgFunc
if (arcsightAddress.Length == 0 || arcsightPort.Length == 0)
{
log.LogError("Values for arcsightAddress and arcsightPort are required.");
return;
return 0;
}
TcpClient client = new TcpClient(arcsightAddress, Convert.ToInt32(arcsightPort));
NetworkStream stream = client.GetStream();
int count = 0;
int bytesSent = 0;
Byte[] transmission = new Byte[] { };
System.Diagnostics.Stopwatch sw = System.Diagnostics.Stopwatch.StartNew();
foreach (var message in convertToCEF(newClientContent, null, log))
{
try
{
transmission = Util.AppendToTransmission(transmission, message);
@ -46,7 +49,21 @@ namespace nsgFunc
// batch up the messages
if (count++ == 1000)
{
sw.Stop();
log.LogInformation($"Time to build new transmission byte[] from convertToCEF: {sw.ElapsedMilliseconds}");
sw.Reset();
sw.Start();
await stream.WriteAsync(transmission, 0, transmission.Length);
bytesSent += transmission.Length;
sw.Stop();
log.LogInformation($"Time to transmit to ArcSight server: {sw.ElapsedMilliseconds}");
sw.Reset();
sw.Start();
count = 0;
transmission = new Byte[] { };
}
@ -61,6 +78,11 @@ namespace nsgFunc
try
{
await stream.WriteAsync(transmission, 0, transmission.Length);
bytesSent += transmission.Length;
sw.Stop();
log.LogInformation($"Time to transmit to ArcSight server: {sw.ElapsedMilliseconds}");
}
catch (Exception ex)
{
@ -68,6 +90,97 @@ namespace nsgFunc
}
}
await stream.FlushAsync();
return bytesSent;
}
public static async Task<int> obArcsightNew(string newClientContent, ExecutionContext executionContext, ILogger log)
{
//
// newClientContent looks like this:
//
// {
// "records":[
// {...},
// {...}
// ...
// ]
// }
//
string arcsightAddress = Util.GetEnvironmentVariable("arcsightAddress");
string arcsightPort = Util.GetEnvironmentVariable("arcsightPort");
if (arcsightAddress.Length == 0 || arcsightPort.Length == 0)
{
log.LogError("Values for arcsightAddress and arcsightPort are required.");
return 0;
}
TcpClient client = new TcpClient(arcsightAddress, Convert.ToInt32(arcsightPort));
NetworkStream stream = client.GetStream();
int transmittedByteCount = 0;
System.Diagnostics.Stopwatch sw = new System.Diagnostics.Stopwatch();
foreach (var tuple in bundleMessageListsCEF(newClientContent, log))
{
try
{
sw.Start();
await stream.WriteAsync(tuple.Item1, 0, tuple.Item2);
sw.Stop();
transmittedByteCount += tuple.Item2;
log.LogInformation($"Transmitted {tuple.Item2} bytes, in time {sw.ElapsedMilliseconds}, operation id {executionContext.InvocationId}.");
sw.Reset();
}
catch (Exception ex)
{
log.LogError($"Exception sending to ArcSight: {ex.Message}");
}
}
await stream.FlushAsync();
return transmittedByteCount;
}
static System.Collections.Generic.IEnumerable<Tuple<byte[],int>> bundleMessageListsCEF(string newClientContent, ILogger log)
{
const int MAXBUFFERSIZE = 1024 * 1024;
var bytePool = ArrayPool<byte>.Shared;
byte[] transmission = bytePool.Rent((int)MAXBUFFERSIZE);
int transmissionLength = 0;
try
{
System.Diagnostics.Stopwatch sw = System.Diagnostics.Stopwatch.StartNew();
foreach (var messageList in denormalizedRecords(newClientContent, null, log))
{
sw.Stop();
log.LogInformation($"Time to get new messageList from denormalizedRecords: {sw.ElapsedMilliseconds}");
sw.Reset();
sw.Start();
foreach (var message in messageList)
{
int bytesAppended = message.AppendToTransmission(ref transmission, MAXBUFFERSIZE, transmissionLength);
transmissionLength += bytesAppended;
}
sw.Stop();
log.LogInformation($"Time to build transmission from messageList: {sw.ElapsedMilliseconds}");
yield return Tuple.Create(transmission, transmissionLength);
sw.Reset();
sw.Start();
transmissionLength = 0;
}
}
finally
{
bytePool.Return(transmission);
}
}
public static System.Collections.Generic.IEnumerable<string> convertToCEF(string newClientContent, Binder errorRecordBinder, ILogger log)

Просмотреть файл

@ -43,28 +43,6 @@ namespace nsgFunc
{
foreach (var messageList in denormalizedRecords(newClientContent, null, log))
{
//
// messageList looks like this: (List<xxx>)
//
// {
// "time": "xxx",
// "category": "xxx",
// "operationName": "xxx",
// "version": "xxx",
// "deviceExtId": "xxx",
// "flowOrder": "xxx",
// "nsgRuleName": "xxx",
// "dmac|smac": "xxx",
// "rt": "xxx",
// "src": "xxx",
// "dst": "xxx",
// "spt": "xxx",
// "dpt": "xxx",
// "proto": "xxx",
// "deviceDirection": "xxx",
// "act": "xxx"
// }
var outgoingRecords = new OutgoingRecords();
outgoingRecords.records = messageList;