refactor message processor, add api to rescan an hour of logs

This commit is contained in:
Greg Oliver 2019-02-19 08:02:04 +00:00
Родитель 3f643565a5
Коммит d061817ada
5 изменённых файлов: 398 добавлений и 23 удалений

Просмотреть файл

@ -86,7 +86,7 @@ namespace nsgFunc
throw ex;
}
checkpoint.PutCheckpoint(checkpointTable, "", 0, blockList.Count()-1);
checkpoint.PutCheckpoint(checkpointTable, blockList.Count()-1);
}
}
}

Просмотреть файл

@ -5,9 +5,6 @@ namespace nsgFunc
{
public class Checkpoint : TableEntity
{
public string LastBlockName { get; set; }
public long StartingByteOffset { get; set; }
public int CheckpointIndex { get; set; } // index of the last processed block list item
public Checkpoint()
@ -18,8 +15,6 @@ namespace nsgFunc
{
PartitionKey = partitionKey;
RowKey = rowKey;
LastBlockName = blockName;
StartingByteOffset = offset;
CheckpointIndex = index;
}
@ -42,10 +37,8 @@ namespace nsgFunc
return checkpoint;
}
public void PutCheckpoint(CloudTable checkpointTable, string lastBlockName, long startingByteOffset, int index)
public void PutCheckpoint(CloudTable checkpointTable, int index)
{
LastBlockName = lastBlockName;
StartingByteOffset = startingByteOffset;
CheckpointIndex = index;
TableOperation operation = TableOperation.InsertOrReplace(this);

107
NwNsgProject/RescanAPI.cs Normal file
Просмотреть файл

@ -0,0 +1,107 @@
using System;
using System.IO;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using Microsoft.WindowsAzure.Storage.Table;
using Microsoft.WindowsAzure.Storage.Blob;
namespace nsgFunc
{
public static class RescanAPI
{
// https://<APP_NAME>.azurewebsites.net/api/rescan/2/17/8
//
[FunctionName("RescanAPI")]
public static async Task<IActionResult> Run(
[HttpTrigger(AuthorizationLevel.Function, "get", Route = "rescan/resourceId=/SUBSCRIPTIONS/{subId}/RESOURCEGROUPS/{resourceGroup}/PROVIDERS/MICROSOFT.NETWORK/NETWORKSECURITYGROUPS/{nsgName}/y={blobYear}/m={blobMonth}/d={blobDay}/h={blobHour}/m={blobMinute}/macAddress={mac}/PT1H.json")]
// [Table("checkpoints", Connection = "AzureWebJobsStorage")] CloudTable checkpointToReset,
HttpRequest req,
Binder checkpointsBinder,
Binder nsgDataBlobBinder,
string subId, string resourceGroup, string nsgName, string blobYear, string blobMonth, string blobDay, string blobHour, string blobMinute, string mac,
ILogger log)
{
string nsgSourceDataAccount = Util.GetEnvironmentVariable("nsgSourceDataAccount");
if (nsgSourceDataAccount.Length == 0)
{
log.LogError("Value for nsgSourceDataAccount is required.");
throw new System.ArgumentNullException("nsgSourceDataAccount", "Please provide setting.");
}
string AzureWebJobsStorage = Util.GetEnvironmentVariable("AzureWebJobsStorage");
if (AzureWebJobsStorage.Length == 0)
{
log.LogError("Value for AzureWebJobsStorage is required.");
throw new System.ArgumentNullException("AzureWebJobsStorage", "Please provide setting.");
}
string blobContainerName = Util.GetEnvironmentVariable("blobContainerName");
if (blobContainerName.Length == 0)
{
log.LogError("Value for blobContainerName is required.");
throw new System.ArgumentNullException("blobContainerName", "Please provide setting.");
}
var blobName = $"resourceId=/SUBSCRIPTIONS/{subId}/RESOURCEGROUPS/{resourceGroup}/PROVIDERS/MICROSOFT.NETWORK/NETWORKSECURITYGROUPS/{nsgName}/y={blobYear}/m={blobMonth}/d={blobDay}/h={blobHour}/m={blobMinute}/macAddress={mac}/PT1H.json";
var blobDetails = new BlobDetails(subId, resourceGroup, nsgName, blobYear, blobMonth, blobDay, blobHour, blobMinute, mac);
var tableAttributes = new Attribute[]
{
new TableAttribute("checkpoints"),
new StorageAccountAttribute("AzureWebJobsStorage")
};
try
{
CloudTable CheckpointTable = await checkpointsBinder.BindAsync<CloudTable>(tableAttributes);
TableOperation getOperation = TableOperation.Retrieve<Checkpoint>(blobDetails.GetPartitionKey(), blobDetails.GetRowKey());
TableResult result = await CheckpointTable.ExecuteAsync(getOperation);
Checkpoint c = (Checkpoint)result.Result;
c.CheckpointIndex = 1;
TableOperation putOperation = TableOperation.InsertOrReplace(c);
await CheckpointTable.ExecuteAsync(putOperation);
}
catch (Exception ex)
{
log.LogError(string.Format("Error binding checkpoints table: {0}", ex.Message));
throw ex;
}
var attributes = new Attribute[]
{
new BlobAttribute(string.Format("{0}/{1}", blobContainerName, blobName)),
new StorageAccountAttribute(nsgSourceDataAccount)
};
try
{
CloudBlockBlob blob = await nsgDataBlobBinder.BindAsync<CloudBlockBlob>(attributes);
await blob.FetchAttributesAsync();
var metadata = blob.Metadata;
if (metadata.ContainsKey("rescan"))
{
int numberRescans = Convert.ToInt32(metadata["rescan"]);
metadata["rescan"] = (numberRescans + 1).ToString();
}
else
{
metadata.Add("rescan", "1");
}
await blob.SetMetadataAsync();
}
catch (Exception ex)
{
log.LogError(string.Format("Error binding blob input: {0}", ex.Message));
throw ex;
}
return (ActionResult)new OkObjectResult($"NSG flow logs for {blobName} were requested.");
}
}
}

Просмотреть файл

@ -3,8 +3,9 @@ using System.Globalization;
using System.Text;
using System.Text.RegularExpressions;
using Newtonsoft.Json;
using System.Collections.Generic;
class ObjectDenormalizer
class DenormalizedRecord
{
public string time { get; set; }
public string category { get; set; }
@ -28,7 +29,7 @@ class ObjectDenormalizer
public string packetsDtoS { get; set; }
public string bytesDtoS { get; set; }
public ObjectDenormalizer(
public DenormalizedRecord(
float version,
string time,
string category,
@ -71,6 +72,39 @@ class ObjectDenormalizer
});
}
public int SizeOfObject()
{
int objectSize = 0;
objectSize += this.version.ToString().Length + 7 + 6;
objectSize += this.time.Length + 4 + 6;
objectSize += this.category.Length + 8 + 6;
objectSize += this.operationName.Length + 13 + 6;
objectSize += this.resourceId.Length + 10 + 6;
objectSize += this.nsgRuleName.Length + 11 + 6;
objectSize += this.mac.Length + 3 + 6;
objectSize += this.startTime.Length + 9 + 6;
objectSize += this.sourceAddress.Length + 13 + 6;
objectSize += this.destinationAddress.Length + 18 + 6;
objectSize += this.sourcePort.Length + 10 + 6;
objectSize += this.destinationPort.Length + 15 + 6;
objectSize += this.deviceDirection.Length + 15 + 6;
objectSize += this.deviceAction.Length + 12 + 6;
if (this.version >= 2.0)
{
objectSize += this.flowState.Length + 9 + 6;
objectSize += this.packetsDtoS == null ? 0 : this.packetsDtoS.Length + 11 + 6;
objectSize += this.packetsStoD == null ? 0 : this.packetsStoD.Length + 11 + 6;
objectSize += this.bytesDtoS == null ? 0 : this.bytesDtoS.Length + 9 + 6;
objectSize += this.bytesStoD == null ? 0 : this.bytesStoD.Length + 9 + 6;
}
return objectSize;
}
}
class OutgoingRecords
{
public List<DenormalizedRecord> records { get; set; }
}
class NSGFlowLogTuple

Просмотреть файл

@ -5,23 +5,20 @@ using Newtonsoft.Json;
using System;
using System.Text;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Collections.Generic;
namespace nsgFunc
{
public partial class Util
{
const int MAXTRANSMISSIONSIZE = 255 * 1024;
// const int MAXTRANSMISSIONSIZE = 2 * 1024;
const int MAXTRANSMISSIONSIZE = 512 * 1024;
//const int MAXTRANSMISSIONSIZE = 2 * 1024;
public static async Task obEventHub(string newClientContent, ILogger log)
private static Lazy<EventHubClient> LazyEventHubConnection = new Lazy<EventHubClient>(() =>
{
string EventHubConnectionString = GetEnvironmentVariable("eventHubConnection");
string EventHubName = GetEnvironmentVariable("eventHubName");
if (EventHubConnectionString.Length == 0 || EventHubName.Length == 0)
{
log.LogError("Values for eventHubConnection and eventHubName are required.");
return;
}
var connectionStringBuilder = new EventHubsConnectionStringBuilder(EventHubConnectionString)
{
@ -29,16 +26,22 @@ namespace nsgFunc
};
var eventHubClient = EventHubClient.CreateFromConnectionString(connectionStringBuilder.ToString());
foreach (var bundleOfMessages in bundleMessages(newClientContent, log))
{
//log.Info(String.Format("-----Outgoing message is: {0}", bundleOfMessages));
return eventHubClient;
});
public static async Task obEventHub(string newClientContent, ILogger log)
{
var eventHubClient = LazyEventHubConnection.Value;
foreach (var bundleOfMessages in bundleMessageListsJson(newClientContent, log))
{
await eventHubClient.SendAsync(new EventData(Encoding.UTF8.GetBytes(bundleOfMessages)));
}
}
static System.Collections.Generic.IEnumerable<string> bundleMessages(string newClientContent, ILogger log)
{
var numberOfItemsInBundle = 0;
var transmission = new StringBuilder(MAXTRANSMISSIONSIZE);
transmission.Append("{\"records\":[");
bool firstRecord = true;
@ -69,6 +72,8 @@ namespace nsgFunc
if (transmission.Length + message.Length > MAXTRANSMISSIONSIZE)
{
transmission.Append("]}");
log.LogInformation("Number of items in bundle: {0}", numberOfItemsInBundle);
numberOfItemsInBundle = 0;
yield return transmission.ToString();
transmission.Clear();
transmission.Append("{\"records\":[");
@ -86,6 +91,7 @@ namespace nsgFunc
}
transmission.Append(message);
numberOfItemsInBundle++;
}
if (transmission.Length > 0)
{
@ -94,6 +100,96 @@ namespace nsgFunc
}
}
static System.Collections.Generic.IEnumerable<string> bundleMessageLists(string newClientContent, ILogger log)
{
var transmission = new StringBuilder(MAXTRANSMISSIONSIZE);
foreach (var messageList in denormalizeLists(newClientContent, null, log))
{
//
// messageList looks like this: (List<xxx>)
//
// {
// "time": "xxx",
// "category": "xxx",
// "operationName": "xxx",
// "version": "xxx",
// "deviceExtId": "xxx",
// "flowOrder": "xxx",
// "nsgRuleName": "xxx",
// "dmac|smac": "xxx",
// "rt": "xxx",
// "src": "xxx",
// "dst": "xxx",
// "spt": "xxx",
// "dpt": "xxx",
// "proto": "xxx",
// "deviceDirection": "xxx",
// "act": "xxx"
// }
transmission.Append("{\"records\":[");
var numberOfItemsInBundle = 0;
bool firstRecord = true;
foreach (var message in messageList)
{
if (firstRecord)
{
firstRecord = false;
}
else
{
transmission.Append(",");
}
transmission.Append(message);
numberOfItemsInBundle++;
}
transmission.Append("]}");
log.LogInformation("Number of items in bundle: {0}", numberOfItemsInBundle);
numberOfItemsInBundle = 0;
yield return transmission.ToString();
transmission.Clear();
}
}
static System.Collections.Generic.IEnumerable<string> bundleMessageListsJson(string newClientContent, ILogger log)
{
foreach (var messageList in denormalizedRecords(newClientContent, null, log))
{
//
// messageList looks like this: (List<xxx>)
//
// {
// "time": "xxx",
// "category": "xxx",
// "operationName": "xxx",
// "version": "xxx",
// "deviceExtId": "xxx",
// "flowOrder": "xxx",
// "nsgRuleName": "xxx",
// "dmac|smac": "xxx",
// "rt": "xxx",
// "src": "xxx",
// "dst": "xxx",
// "spt": "xxx",
// "dpt": "xxx",
// "proto": "xxx",
// "deviceDirection": "xxx",
// "act": "xxx"
// }
var outgoingRecords = new OutgoingRecords();
outgoingRecords.records = messageList;
var outgoingJson = JsonConvert.SerializeObject(outgoingRecords, new JsonSerializerSettings
{
NullValueHandling = NullValueHandling.Ignore
});
yield return outgoingJson;
}
}
static System.Collections.Generic.IEnumerable<string> denormalizeRecords(string newClientContent, Binder errorRecordBinder, ILogger log)
{
//
@ -132,7 +228,7 @@ namespace nsgFunc
{
var tuple = new NSGFlowLogTuple(flowTuple, version);
var denormalizedObject = new ObjectDenormalizer(
var denormalizedObject = new DenormalizedRecord(
record.properties.Version,
record.time,
record.category,
@ -148,6 +244,151 @@ namespace nsgFunc
}
}
}
}
static System.Collections.Generic.IEnumerable<List<string>> denormalizeLists(string newClientContent, Binder errorRecordBinder, ILogger log)
{
var outgoingList = new List<string>(450);
var sizeOfListItems = 0;
NSGFlowLogRecords logs = JsonConvert.DeserializeObject<NSGFlowLogRecords>(newClientContent);
foreach (var record in logs.records)
{
float version = record.properties.Version;
foreach (var outerFlow in record.properties.flows)
{
foreach (var innerFlow in outerFlow.flows)
{
foreach (var flowTuple in innerFlow.flowTuples)
{
var tuple = new NSGFlowLogTuple(flowTuple, version);
var denormalizedObject = new DenormalizedRecord(
record.properties.Version,
record.time,
record.category,
record.operationName,
record.resourceId,
outerFlow.rule,
innerFlow.mac,
tuple);
string outgoingJson = denormalizedObject.ToString();
if (sizeOfListItems + outgoingJson.Length > MAXTRANSMISSIONSIZE+20)
{
yield return outgoingList;
outgoingList.Clear();
sizeOfListItems = 0;
}
outgoingList.Add(outgoingJson);
sizeOfListItems += outgoingJson.Length;
}
}
}
}
if (outgoingList.Count > 0)
{
yield return outgoingList;
}
}
static IEnumerable<DenormalizedRecord[]> denormalizeArrays(string newClientContent, Binder errorRecordBinder, ILogger log)
{
var outgoingArray = Array.CreateInstance(typeof(DenormalizedRecord), 450);
var sizeOfArrayItems = 0;
var arrayItemIndex = 0;
NSGFlowLogRecords logs = JsonConvert.DeserializeObject<NSGFlowLogRecords>(newClientContent);
foreach (var record in logs.records)
{
float version = record.properties.Version;
foreach (var outerFlow in record.properties.flows)
{
foreach (var innerFlow in outerFlow.flows)
{
foreach (var flowTuple in innerFlow.flowTuples)
{
var tuple = new NSGFlowLogTuple(flowTuple, version);
var denormalizedObject = new DenormalizedRecord(
record.properties.Version,
record.time,
record.category,
record.operationName,
record.resourceId,
outerFlow.rule,
innerFlow.mac,
tuple);
string outgoingJson = denormalizedObject.ToString();
if (sizeOfArrayItems + outgoingJson.Length > MAXTRANSMISSIONSIZE + 20)
{
yield return (DenormalizedRecord[])outgoingArray;
Array.Clear(outgoingArray, 0, 450);
sizeOfArrayItems = 0;
arrayItemIndex = 0;
}
outgoingArray.SetValue(denormalizedObject, arrayItemIndex++);
sizeOfArrayItems += outgoingJson.Length;
}
}
}
}
if (arrayItemIndex > 0)
{
yield return (DenormalizedRecord[])outgoingArray;
}
}
static IEnumerable<List<DenormalizedRecord>> denormalizedRecords(string newClientContent, Binder errorRecordBinder, ILogger log)
{
var outgoingList = new List<DenormalizedRecord>(450);
var sizeOfListItems = 0;
NSGFlowLogRecords logs = JsonConvert.DeserializeObject<NSGFlowLogRecords>(newClientContent);
foreach (var record in logs.records)
{
float version = record.properties.Version;
foreach (var outerFlow in record.properties.flows)
{
foreach (var innerFlow in outerFlow.flows)
{
foreach (var flowTuple in innerFlow.flowTuples)
{
var tuple = new NSGFlowLogTuple(flowTuple, version);
var denormalizedRecord = new DenormalizedRecord(
record.properties.Version,
record.time,
record.category,
record.operationName,
record.resourceId,
outerFlow.rule,
innerFlow.mac,
tuple);
var sizeOfDenormalizedRecord = denormalizedRecord.SizeOfObject();
if (sizeOfListItems + sizeOfDenormalizedRecord > MAXTRANSMISSIONSIZE + 20)
{
yield return outgoingList;
outgoingList.Clear();
sizeOfListItems = 0;
}
outgoingList.Add(denormalizedRecord);
sizeOfListItems += sizeOfDenormalizedRecord;
}
}
}
}
if (sizeOfListItems > 0)
{
yield return outgoingList;
}
}
}
}