refactor code, eliminate the queues.
This commit is contained in:
Родитель
0346783d30
Коммит
d962ab4e30
|
@ -1,6 +1,8 @@
|
|||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Text;
|
||||
|
||||
namespace NwNsgProject
|
||||
namespace nsgFunc
|
||||
{
|
||||
public class BlobDetails
|
||||
{
|
||||
|
@ -76,5 +78,10 @@ namespace NwNsgProject
|
|||
{
|
||||
return string.Format("{0}_{1}_{2}_{3}_{4}", Year, Month, Day, Hour, Minute);
|
||||
}
|
||||
|
||||
public override string ToString()
|
||||
{
|
||||
return string.Format("{0}_{1}_{2}_{3}", ResourceGroupName, NsgName, Day, Hour);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,92 @@
|
|||
using Microsoft.Azure.WebJobs;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.WindowsAzure.Storage.Blob;
|
||||
using Microsoft.WindowsAzure.Storage.Table;
|
||||
using System;
|
||||
using System.Linq;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace nsgFunc
|
||||
{
|
||||
public static class BlobTriggerIngestAndTransmit
|
||||
{
|
||||
[FunctionName("BlobTriggerIngestAndTransmit")]
|
||||
public static async Task Run(
|
||||
[BlobTrigger("%blobContainerName%/resourceId=/SUBSCRIPTIONS/{subId}/RESOURCEGROUPS/{resourceGroup}/PROVIDERS/MICROSOFT.NETWORK/NETWORKSECURITYGROUPS/{nsgName}/y={blobYear}/m={blobMonth}/d={blobDay}/h={blobHour}/m={blobMinute}/macAddress={mac}/PT1H.json", Connection = "%nsgSourceDataAccount%")]CloudBlockBlob myBlob,
|
||||
[Table("checkpoints", Connection = "AzureWebJobsStorage")] CloudTable checkpointTable,
|
||||
Binder nsgDataBlobBinder,
|
||||
string subId, string resourceGroup, string nsgName, string blobYear, string blobMonth, string blobDay, string blobHour, string blobMinute, string mac,
|
||||
ILogger log)
|
||||
{
|
||||
string nsgSourceDataAccount = Util.GetEnvironmentVariable("nsgSourceDataAccount");
|
||||
if (nsgSourceDataAccount.Length == 0)
|
||||
{
|
||||
log.LogError("Value for nsgSourceDataAccount is required.");
|
||||
throw new System.ArgumentNullException("nsgSourceDataAccount", "Please provide setting.");
|
||||
}
|
||||
|
||||
string blobContainerName = Util.GetEnvironmentVariable("blobContainerName");
|
||||
if (blobContainerName.Length == 0)
|
||||
{
|
||||
log.LogError("Value for blobContainerName is required.");
|
||||
throw new System.ArgumentNullException("blobContainerName", "Please provide setting.");
|
||||
}
|
||||
|
||||
var blobDetails = new BlobDetails(subId, resourceGroup, nsgName, blobYear, blobMonth, blobDay, blobHour, blobMinute, mac);
|
||||
|
||||
// get checkpoint
|
||||
Checkpoint checkpoint = Checkpoint.GetCheckpoint(blobDetails, checkpointTable);
|
||||
|
||||
var blockList = myBlob.DownloadBlockListAsync().Result;
|
||||
var startingByte = blockList.Where((item, index) => index<checkpoint.CheckpointIndex).Sum(item => item.Length);
|
||||
var endingByte = blockList.Where((item, index) => index < blockList.Count()-1).Sum(item => item.Length);
|
||||
var dataLength = endingByte - startingByte;
|
||||
|
||||
log.LogInformation("Blob: {0}, starting byte: {1}, ending byte: {2}, number of bytes: {3}", blobDetails.ToString(), startingByte, endingByte, dataLength);
|
||||
|
||||
if (dataLength == 0)
|
||||
{
|
||||
log.LogWarning(string.Format("Blob: {0}, triggered on completed hour.", blobDetails.ToString()));
|
||||
return;
|
||||
}
|
||||
//foreach (var item in blockList)
|
||||
//{
|
||||
// log.LogInformation("Name: {0}, Length: {1}", item.Name, item.Length);
|
||||
//}
|
||||
|
||||
var attributes = new Attribute[]
|
||||
{
|
||||
new BlobAttribute(string.Format("{0}/{1}", blobContainerName, myBlob.Name)),
|
||||
new StorageAccountAttribute(nsgSourceDataAccount)
|
||||
};
|
||||
|
||||
string nsgMessagesString;
|
||||
try
|
||||
{
|
||||
byte[] nsgMessages = new byte[dataLength];
|
||||
CloudBlockBlob blob = nsgDataBlobBinder.BindAsync<CloudBlockBlob>(attributes).Result;
|
||||
blob.DownloadRangeToByteArrayAsync(nsgMessages, 0, startingByte, dataLength).Wait();
|
||||
nsgMessagesString = System.Text.Encoding.UTF8.GetString(nsgMessages);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
log.LogError(string.Format("Error binding blob input: {0}", ex.Message));
|
||||
throw ex;
|
||||
}
|
||||
|
||||
//log.LogDebug(nsgMessagesString);
|
||||
|
||||
try
|
||||
{
|
||||
await Util.SendMessagesDownstreamAsync(nsgMessagesString, log);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
log.LogError(string.Format("SendMessagesDownstreamAsync: Error {0}", ex.Message));
|
||||
throw ex;
|
||||
}
|
||||
|
||||
checkpoint.PutCheckpoint(checkpointTable, "", 0, blockList.Count()-1);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,48 +1,55 @@
|
|||
using System;
|
||||
using Microsoft.WindowsAzure.Storage.Table;
|
||||
|
||||
namespace NwNsgProject
|
||||
namespace nsgFunc
|
||||
{
|
||||
public class Checkpoint : TableEntity
|
||||
{
|
||||
|
||||
public string LastBlockName { get; set; }
|
||||
public long StartingByteOffset { get; set; }
|
||||
public int CheckpointIndex { get; set; } // index of the last processed block list item
|
||||
|
||||
public Checkpoint()
|
||||
{
|
||||
}
|
||||
|
||||
public Checkpoint(string partitionKey, string rowKey, string blockName, long offset)
|
||||
public Checkpoint(string partitionKey, string rowKey, string blockName, long offset, int index)
|
||||
{
|
||||
PartitionKey = partitionKey;
|
||||
RowKey = rowKey;
|
||||
LastBlockName = blockName;
|
||||
StartingByteOffset = offset;
|
||||
CheckpointIndex = index;
|
||||
}
|
||||
|
||||
public static Checkpoint GetCheckpoint(BlobDetails blobDetails, CloudTable checkpointTable)
|
||||
{
|
||||
TableOperation operation = TableOperation.Retrieve<Checkpoint>(
|
||||
blobDetails.GetPartitionKey(), blobDetails.GetRowKey());
|
||||
TableResult result = checkpointTable.Execute(operation);
|
||||
TableResult result = checkpointTable.ExecuteAsync(operation).Result;
|
||||
|
||||
Checkpoint checkpoint = (Checkpoint)result.Result;
|
||||
if (checkpoint == null)
|
||||
{
|
||||
checkpoint = new Checkpoint(blobDetails.GetPartitionKey(), blobDetails.GetRowKey(), "", 0);
|
||||
checkpoint = new Checkpoint(blobDetails.GetPartitionKey(), blobDetails.GetRowKey(), "", 0, 1);
|
||||
}
|
||||
if (checkpoint.CheckpointIndex == 0)
|
||||
{
|
||||
checkpoint.CheckpointIndex = 1;
|
||||
}
|
||||
|
||||
return checkpoint;
|
||||
}
|
||||
|
||||
public void PutCheckpoint(CloudTable checkpointTable, string lastBlockName, long startingByteOffset)
|
||||
public void PutCheckpoint(CloudTable checkpointTable, string lastBlockName, long startingByteOffset, int index)
|
||||
{
|
||||
LastBlockName = lastBlockName;
|
||||
StartingByteOffset = startingByteOffset;
|
||||
CheckpointIndex = index;
|
||||
|
||||
TableOperation operation = TableOperation.InsertOrReplace(this);
|
||||
checkpointTable.Execute(operation);
|
||||
checkpointTable.ExecuteAsync(operation).Wait();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,14 +1,12 @@
|
|||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
<Project Sdk="Microsoft.NET.Sdk">
|
||||
<PropertyGroup>
|
||||
<TargetFramework>net461</TargetFramework>
|
||||
<TargetFramework>netcoreapp2.1</TargetFramework>
|
||||
<AzureFunctionsVersion>v2</AzureFunctionsVersion>
|
||||
</PropertyGroup>
|
||||
<ItemGroup>
|
||||
<PackageReference Include="Microsoft.Azure.EventHubs" Version="2.1.0" />
|
||||
<PackageReference Include="Microsoft.NET.Sdk.Functions" Version="1.0.14" />
|
||||
<PackageReference Include="Newtonsoft.Json" Version="11.0.2" />
|
||||
</ItemGroup>
|
||||
<ItemGroup>
|
||||
<Reference Include="Microsoft.CSharp" />
|
||||
<PackageReference Include="Microsoft.Azure.EventHubs" Version="2.2.1" />
|
||||
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions.Storage" Version="3.0.3" />
|
||||
<PackageReference Include="Microsoft.NET.Sdk.Functions" Version="1.0.24" />
|
||||
</ItemGroup>
|
||||
<ItemGroup>
|
||||
<None Update="host.json">
|
||||
|
@ -19,4 +17,4 @@
|
|||
<CopyToPublishDirectory>Never</CopyToPublishDirectory>
|
||||
</None>
|
||||
</ItemGroup>
|
||||
</Project>
|
||||
</Project>
|
|
@ -1,157 +0,0 @@
|
|||
using System.IO;
|
||||
using System.Collections.Generic;
|
||||
using Microsoft.WindowsAzure.Storage.Blob;
|
||||
using Microsoft.WindowsAzure.Storage.Table;
|
||||
using Microsoft.Azure.WebJobs;
|
||||
using Microsoft.Azure.WebJobs.Host;
|
||||
|
||||
namespace NwNsgProject
|
||||
{
|
||||
public static class Stage1BlobTrigger
|
||||
{
|
||||
const int MAXDOWNLOADBYTES = 102400;
|
||||
|
||||
[FunctionName("stage1BlobTrigger")]
|
||||
public static void Run(
|
||||
[BlobTrigger("%blobContainerName%/resourceId=/SUBSCRIPTIONS/{subId}/RESOURCEGROUPS/{resourceGroup}/PROVIDERS/MICROSOFT.NETWORK/NETWORKSECURITYGROUPS/{nsgName}/y={blobYear}/m={blobMonth}/d={blobDay}/h={blobHour}/m={blobMinute}/macAddress={mac}/PT1H.json", Connection = "%nsgSourceDataAccount%")]CloudBlockBlob myBlob,
|
||||
[Queue("stage1", Connection = "AzureWebJobsStorage")] ICollector<Chunk> outputChunks,
|
||||
[Table("checkpoints", Connection = "AzureWebJobsStorage")] CloudTable checkpointTable,
|
||||
string subId, string resourceGroup, string nsgName, string blobYear, string blobMonth, string blobDay, string blobHour, string blobMinute, string mac,
|
||||
TraceWriter log)
|
||||
{
|
||||
string nsgSourceDataAccount = Util.GetEnvironmentVariable("nsgSourceDataAccount");
|
||||
if (nsgSourceDataAccount.Length == 0)
|
||||
{
|
||||
log.Error("Value for nsgSourceDataAccount is required.");
|
||||
throw new System.ArgumentNullException("nsgSourceDataAccount", "Please provide setting.");
|
||||
}
|
||||
|
||||
string blobContainerName = Util.GetEnvironmentVariable("blobContainerName");
|
||||
if (blobContainerName.Length == 0)
|
||||
{
|
||||
log.Error("Value for blobContainerName is required.");
|
||||
throw new System.ArgumentNullException("blobContainerName", "Please provide setting.");
|
||||
}
|
||||
|
||||
var blobDetails = new BlobDetails(subId, resourceGroup, nsgName, blobYear, blobMonth, blobDay, blobHour, blobMinute, mac);
|
||||
|
||||
// get checkpoint
|
||||
Checkpoint checkpoint = Checkpoint.GetCheckpoint(blobDetails, checkpointTable);
|
||||
|
||||
// break up the block list into 10k chunks
|
||||
List<Chunk> chunks = new List<Chunk>();
|
||||
long currentChunkSize = 0;
|
||||
string currentChunkLastBlockName = "";
|
||||
long currentStartingByteOffset = 0;
|
||||
|
||||
bool firstBlockItem = true;
|
||||
bool foundStartingOffset = false;
|
||||
bool tieOffChunk = false;
|
||||
|
||||
int numberOfBlocks = 0;
|
||||
long sizeOfBlocks = 0;
|
||||
|
||||
foreach (var blockListItem in myBlob.DownloadBlockList(BlockListingFilter.Committed))
|
||||
{
|
||||
if (!foundStartingOffset)
|
||||
{
|
||||
if (firstBlockItem)
|
||||
{
|
||||
currentStartingByteOffset += blockListItem.Length;
|
||||
firstBlockItem = false;
|
||||
if (checkpoint.LastBlockName == "")
|
||||
{
|
||||
foundStartingOffset = true;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
if (blockListItem.Name == checkpoint.LastBlockName)
|
||||
{
|
||||
foundStartingOffset = true;
|
||||
}
|
||||
currentStartingByteOffset += blockListItem.Length;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// tieOffChunk = add current chunk to the list, initialize next chunk counters
|
||||
// conditions to account for:
|
||||
// 1) current chunk is empty & not the last block (size > 10 I think)
|
||||
// a) add blockListItem to current chunk
|
||||
// b) loop
|
||||
// 2) current chunk is empty & last block (size < 10 I think)
|
||||
// a) do not add blockListItem to current chunk
|
||||
// b) loop terminates
|
||||
// c) chunk last added to the list is the last chunk
|
||||
// 3) current chunk is not empty & not the last block
|
||||
// a) if size of block + size of chunk >10k
|
||||
// i) add chunk to list <-- tieOffChunk
|
||||
// ii) reset chunk counters
|
||||
// b) add blockListItem to chunk
|
||||
// c) loop
|
||||
// 4) current chunk is not empty & last block
|
||||
// a) add chunk to list <-- tieOffChunk
|
||||
// b) do not add blockListItem to chunk
|
||||
// c) loop terminates
|
||||
tieOffChunk = (currentChunkSize != 0) && ((blockListItem.Length < 10) || (currentChunkSize + blockListItem.Length > MAXDOWNLOADBYTES));
|
||||
if (tieOffChunk)
|
||||
{
|
||||
// chunk complete, add it to the list & reset counters
|
||||
chunks.Add(new Chunk
|
||||
{
|
||||
BlobName = blobContainerName + "/" + myBlob.Name,
|
||||
Length = currentChunkSize,
|
||||
LastBlockName = currentChunkLastBlockName,
|
||||
Start = currentStartingByteOffset,
|
||||
BlobAccountConnectionName = nsgSourceDataAccount
|
||||
});
|
||||
currentStartingByteOffset += currentChunkSize; // the next chunk starts at this offset
|
||||
currentChunkSize = 0;
|
||||
tieOffChunk = false;
|
||||
}
|
||||
if (blockListItem.Length > 10)
|
||||
{
|
||||
numberOfBlocks++;
|
||||
sizeOfBlocks += blockListItem.Length;
|
||||
|
||||
currentChunkSize += blockListItem.Length;
|
||||
currentChunkLastBlockName = blockListItem.Name;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
if (currentChunkSize != 0)
|
||||
{
|
||||
// residual chunk
|
||||
chunks.Add(new Chunk
|
||||
{
|
||||
BlobName = blobContainerName + "/" + myBlob.Name,
|
||||
Length = currentChunkSize,
|
||||
LastBlockName = currentChunkLastBlockName,
|
||||
Start = currentStartingByteOffset,
|
||||
BlobAccountConnectionName = nsgSourceDataAccount
|
||||
});
|
||||
}
|
||||
|
||||
if (chunks.Count > 0)
|
||||
{
|
||||
var lastChunk = chunks[chunks.Count - 1];
|
||||
checkpoint.PutCheckpoint(checkpointTable, lastChunk.LastBlockName, lastChunk.Start + lastChunk.Length);
|
||||
}
|
||||
|
||||
// add the chunks to output queue
|
||||
// they are sent automatically by Functions configuration
|
||||
foreach (var chunk in chunks)
|
||||
{
|
||||
outputChunks.Add(chunk);
|
||||
if (chunk.Length == 0)
|
||||
{
|
||||
log.Error("chunk length is 0");
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -1,161 +0,0 @@
|
|||
using System;
|
||||
using Microsoft.Azure.WebJobs;
|
||||
using Microsoft.Azure.WebJobs.Host;
|
||||
using Microsoft.Azure.WebJobs.Host.Bindings;
|
||||
using Microsoft.WindowsAzure.Storage.Blob;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace NwNsgProject
|
||||
{
|
||||
public static class Stage2QueueTrigger
|
||||
{
|
||||
const int MAX_CHUNK_SIZE = 102400;
|
||||
|
||||
[FunctionName("Stage2QueueTrigger")]
|
||||
public static async Task Run(
|
||||
[QueueTrigger("stage1", Connection = "AzureWebJobsStorage")]Chunk inputChunk,
|
||||
[Queue("stage2", Connection = "AzureWebJobsStorage")] ICollector<Chunk> outputQueue,
|
||||
Binder binder,
|
||||
TraceWriter log)
|
||||
{
|
||||
// log.Info($"C# Queue trigger function processed: {inputChunk}");
|
||||
|
||||
if (inputChunk.Length < MAX_CHUNK_SIZE)
|
||||
{
|
||||
outputQueue.Add(inputChunk);
|
||||
return;
|
||||
}
|
||||
|
||||
string nsgSourceDataAccount = Util.GetEnvironmentVariable("nsgSourceDataAccount");
|
||||
if (nsgSourceDataAccount.Length == 0)
|
||||
{
|
||||
log.Error("Value for nsgSourceDataAccount is required.");
|
||||
throw new ArgumentNullException("nsgSourceDataAccount", "Please supply in this setting the name of the connection string from which NSG logs should be read.");
|
||||
}
|
||||
|
||||
var attributes = new Attribute[]
|
||||
{
|
||||
new BlobAttribute(inputChunk.BlobName),
|
||||
new StorageAccountAttribute(nsgSourceDataAccount)
|
||||
};
|
||||
|
||||
byte[] nsgMessages = new byte[inputChunk.Length];
|
||||
try
|
||||
{
|
||||
CloudBlockBlob blob = await binder.BindAsync<CloudBlockBlob>(attributes);
|
||||
await blob.DownloadRangeToByteArrayAsync(nsgMessages, 0, inputChunk.Start, inputChunk.Length);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
log.Error(string.Format("Error binding blob input: {0}", ex.Message));
|
||||
throw ex;
|
||||
}
|
||||
|
||||
int startingByte = 0;
|
||||
var chunkCount = 0;
|
||||
|
||||
var newChunk = GetNewChunk(inputChunk, chunkCount++, log, 0);
|
||||
|
||||
//long length = FindNextRecord(nsgMessages, startingByte);
|
||||
var nsgMessagesString = System.Text.Encoding.Default.GetString(nsgMessages);
|
||||
int endingByte = FindNextRecordRecurse(nsgMessagesString, startingByte, 0, log);
|
||||
int length = endingByte - startingByte + 1;
|
||||
|
||||
while (length != 0)
|
||||
{
|
||||
if (newChunk.Length + length > MAX_CHUNK_SIZE)
|
||||
{
|
||||
//log.Info($"Chunk starts at {newChunk.Start}, length is {newChunk.Length}, next start is {newChunk.Start + newChunk.Length}");
|
||||
outputQueue.Add(newChunk);
|
||||
|
||||
newChunk = GetNewChunk(inputChunk, chunkCount++, log, newChunk.Start + newChunk.Length);
|
||||
}
|
||||
|
||||
newChunk.Length += length;
|
||||
startingByte += length;
|
||||
|
||||
endingByte = FindNextRecordRecurse(nsgMessagesString, startingByte, 0, log);
|
||||
length = endingByte - startingByte + 1;
|
||||
}
|
||||
|
||||
if (newChunk.Length > 0)
|
||||
{
|
||||
outputQueue.Add(newChunk);
|
||||
//log.Info($"Chunk starts at {newChunk.Start}, length is {newChunk.Length}");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static Chunk GetNewChunk(Chunk thisChunk, int index, TraceWriter log, long start = 0)
|
||||
{
|
||||
var chunk = new Chunk
|
||||
{
|
||||
BlobName = thisChunk.BlobName,
|
||||
BlobAccountConnectionName = thisChunk.BlobAccountConnectionName,
|
||||
LastBlockName = string.Format("{0}-{1}", index, thisChunk.LastBlockName),
|
||||
Start = (start == 0 ? thisChunk.Start : start),
|
||||
Length = 0
|
||||
};
|
||||
|
||||
//log.Info($"new chunk: {chunk.ToString()}");
|
||||
|
||||
return chunk;
|
||||
}
|
||||
|
||||
public static long FindNextRecord(byte[] array, long startingByte)
|
||||
{
|
||||
var arraySize = array.Length;
|
||||
var endingByte = startingByte;
|
||||
var curlyBraceCount = 0;
|
||||
var insideARecord = false;
|
||||
|
||||
for (long index = startingByte; index < arraySize; index++)
|
||||
{
|
||||
endingByte++;
|
||||
|
||||
if (array[index] == '{')
|
||||
{
|
||||
insideARecord = true;
|
||||
curlyBraceCount++;
|
||||
}
|
||||
|
||||
curlyBraceCount -= (array[index] == '}' ? 1 : 0);
|
||||
|
||||
if (insideARecord && curlyBraceCount == 0)
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return endingByte - startingByte;
|
||||
}
|
||||
|
||||
public static int FindNextRecordRecurse(string nsgMessages, int startingByte, int braceCounter, TraceWriter log)
|
||||
{
|
||||
if (startingByte == nsgMessages.Length)
|
||||
{
|
||||
return startingByte - 1;
|
||||
}
|
||||
|
||||
int nextBrace = nsgMessages.IndexOfAny(new Char[] { '}', '{' }, startingByte);
|
||||
|
||||
if (nsgMessages[nextBrace] == '{')
|
||||
{
|
||||
braceCounter++;
|
||||
nextBrace = FindNextRecordRecurse(nsgMessages, nextBrace + 1, braceCounter, log);
|
||||
}
|
||||
else
|
||||
{
|
||||
braceCounter--;
|
||||
if (braceCounter > 0)
|
||||
{
|
||||
nextBrace = FindNextRecordRecurse(nsgMessages, nextBrace + 1, braceCounter, log);
|
||||
}
|
||||
}
|
||||
|
||||
return nextBrace;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
|
@ -1,154 +0,0 @@
|
|||
using Microsoft.Azure.WebJobs;
|
||||
using Microsoft.Azure.WebJobs.Host;
|
||||
using Microsoft.WindowsAzure.Storage.Blob;
|
||||
using System;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace NwNsgProject
|
||||
{
|
||||
public static class Stage3QueueTrigger
|
||||
{
|
||||
[FunctionName("Stage3QueueTrigger")]
|
||||
public static async Task Run(
|
||||
[QueueTrigger("stage2", Connection = "AzureWebJobsStorage")]Chunk inputChunk,
|
||||
Binder binder,
|
||||
Binder cefLogBinder,
|
||||
Binder errorRecordBinder,
|
||||
TraceWriter log)
|
||||
{
|
||||
// log.Info($"C# Queue trigger function processed: {inputChunk}");
|
||||
|
||||
string nsgSourceDataAccount = Util.GetEnvironmentVariable("nsgSourceDataAccount");
|
||||
if (nsgSourceDataAccount.Length == 0)
|
||||
{
|
||||
log.Error("Value for nsgSourceDataAccount is required.");
|
||||
throw new ArgumentNullException("nsgSourceDataAccount", "Please supply in this setting the name of the connection string from which NSG logs should be read.");
|
||||
}
|
||||
|
||||
var attributes = new Attribute[]
|
||||
{
|
||||
new BlobAttribute(inputChunk.BlobName),
|
||||
new StorageAccountAttribute(nsgSourceDataAccount)
|
||||
};
|
||||
|
||||
string nsgMessagesString;
|
||||
try
|
||||
{
|
||||
byte[] nsgMessages = new byte[inputChunk.Length];
|
||||
CloudBlockBlob blob = await binder.BindAsync<CloudBlockBlob>(attributes);
|
||||
await blob.DownloadRangeToByteArrayAsync(nsgMessages, 0, inputChunk.Start, inputChunk.Length);
|
||||
nsgMessagesString = System.Text.Encoding.UTF8.GetString(nsgMessages);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
log.Error(string.Format("Error binding blob input: {0}", ex.Message));
|
||||
throw ex;
|
||||
}
|
||||
|
||||
// skip past the leading comma
|
||||
string trimmedMessages = nsgMessagesString.Trim();
|
||||
int curlyBrace = trimmedMessages.IndexOf('{');
|
||||
string newClientContent = "{\"records\":[";
|
||||
newClientContent += trimmedMessages.Substring(curlyBrace);
|
||||
newClientContent += "]}";
|
||||
|
||||
await SendMessagesDownstream(newClientContent, log);
|
||||
|
||||
string logOutgoingCEF = Util.GetEnvironmentVariable("logOutgoingCEF");
|
||||
Boolean flag;
|
||||
if (Boolean.TryParse(logOutgoingCEF, out flag))
|
||||
{
|
||||
if (flag)
|
||||
{
|
||||
await CEFLog(newClientContent, cefLogBinder, errorRecordBinder, log);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static async Task SendMessagesDownstream(string myMessages, TraceWriter log)
|
||||
{
|
||||
//
|
||||
// myMessages looks like this:
|
||||
// {
|
||||
// "records":[
|
||||
// {...},
|
||||
// {...}
|
||||
// ...
|
||||
// ]
|
||||
// }
|
||||
//
|
||||
string outputBinding = Util.GetEnvironmentVariable("outputBinding");
|
||||
if (outputBinding.Length == 0)
|
||||
{
|
||||
log.Error("Value for outputBinding is required. Permitted values are: 'logstash', 'arcsight'.");
|
||||
return;
|
||||
}
|
||||
|
||||
switch (outputBinding)
|
||||
{
|
||||
case "logstash":
|
||||
await Util.obLogstash(myMessages, log);
|
||||
break;
|
||||
case "arcsight":
|
||||
await Util.obArcsight(myMessages, log);
|
||||
break;
|
||||
case "splunk":
|
||||
await Util.obSplunk(myMessages, log);
|
||||
break;
|
||||
case "eventhub":
|
||||
await Util.obEventHub(myMessages, log);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
static async Task CEFLog(string newClientContent, Binder cefLogBinder, Binder errorRecordBinder, TraceWriter log)
|
||||
{
|
||||
int count = 0;
|
||||
Byte[] transmission = new Byte[] { };
|
||||
|
||||
foreach (var message in Util.convertToCEF(newClientContent, errorRecordBinder, log))
|
||||
{
|
||||
|
||||
try
|
||||
{
|
||||
transmission = Util.AppendToTransmission(transmission, message);
|
||||
|
||||
// batch up the messages
|
||||
if (count++ == 1000)
|
||||
{
|
||||
Guid guid = Guid.NewGuid();
|
||||
var attributes = new Attribute[]
|
||||
{
|
||||
new BlobAttribute(String.Format("ceflog/{0}", guid)),
|
||||
new StorageAccountAttribute("cefLogAccount")
|
||||
};
|
||||
|
||||
CloudBlockBlob blob = await cefLogBinder.BindAsync<CloudBlockBlob>(attributes);
|
||||
await blob.UploadFromByteArrayAsync(transmission, 0, transmission.Length);
|
||||
|
||||
count = 0;
|
||||
transmission = new Byte[] { };
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
log.Error($"Exception logging CEF output: {ex.Message}");
|
||||
}
|
||||
}
|
||||
|
||||
if (count != 0)
|
||||
{
|
||||
Guid guid = Guid.NewGuid();
|
||||
var attributes = new Attribute[]
|
||||
{
|
||||
new BlobAttribute(String.Format("ceflog/{0}", guid)),
|
||||
new StorageAccountAttribute("cefLogAccount")
|
||||
};
|
||||
|
||||
CloudBlockBlob blob = await cefLogBinder.BindAsync<CloudBlockBlob>(attributes);
|
||||
await blob.UploadFromByteArrayAsync(transmission, 0, transmission.Length);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
|
@ -1,12 +1,12 @@
|
|||
using Microsoft.Azure.WebJobs;
|
||||
using Microsoft.Azure.WebJobs.Host;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.WindowsAzure.Storage.Blob;
|
||||
using System;
|
||||
using System.Net.Http;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace NwNsgProject
|
||||
namespace nsgFunc
|
||||
{
|
||||
public partial class Util
|
||||
{
|
||||
|
@ -19,6 +19,65 @@ namespace NwNsgProject
|
|||
return result;
|
||||
}
|
||||
|
||||
public static async Task SendMessagesDownstreamAsync(string nsgMessagesString, ILogger log)
|
||||
{
|
||||
//
|
||||
// nsgMessagesString looks like this:
|
||||
//
|
||||
// ,{...}, <-- note leading comma
|
||||
// {...}
|
||||
// ...
|
||||
// {...}
|
||||
//
|
||||
// - OR -
|
||||
//
|
||||
// {...}, <-- note lack of leading comma
|
||||
// {...}
|
||||
// ...
|
||||
// {...}
|
||||
//
|
||||
string outputBinding = Util.GetEnvironmentVariable("outputBinding");
|
||||
if (outputBinding.Length == 0)
|
||||
{
|
||||
log.LogError("Value for outputBinding is required. Permitted values are: 'logstash', 'arcsight', 'splunk', 'eventhub'.");
|
||||
return;
|
||||
}
|
||||
|
||||
// skip past the leading comma
|
||||
string trimmedMessages = nsgMessagesString.Trim();
|
||||
int curlyBrace = trimmedMessages.IndexOf('{');
|
||||
string newClientContent = "{\"records\":[";
|
||||
newClientContent += trimmedMessages.Substring(curlyBrace);
|
||||
newClientContent += "]}";
|
||||
|
||||
//
|
||||
// newClientContent looks like this:
|
||||
// {
|
||||
// "records":[
|
||||
// {...},
|
||||
// {...}
|
||||
// ...
|
||||
// ]
|
||||
// }
|
||||
//
|
||||
|
||||
switch (outputBinding)
|
||||
{
|
||||
case "logstash":
|
||||
await Util.obLogstash(newClientContent, log);
|
||||
break;
|
||||
case "arcsight":
|
||||
await Util.obArcsight(newClientContent, log);
|
||||
break;
|
||||
case "splunk":
|
||||
await Util.obSplunk(newClientContent, log);
|
||||
break;
|
||||
case "eventhub":
|
||||
await Util.obEventHub(newClientContent, log);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
public class SingleHttpClientInstance
|
||||
{
|
||||
private static readonly HttpClient HttpClient;
|
||||
|
@ -29,7 +88,7 @@ namespace NwNsgProject
|
|||
HttpClient.Timeout = new TimeSpan(0, 1, 0);
|
||||
}
|
||||
|
||||
public static async Task<HttpResponseMessage> SendToLogstash(HttpRequestMessage req, TraceWriter log)
|
||||
public static async Task<HttpResponseMessage> SendToLogstash(HttpRequestMessage req, ILogger log)
|
||||
{
|
||||
HttpResponseMessage response = null;
|
||||
var httpClient = new HttpClient();
|
||||
|
@ -40,17 +99,17 @@ namespace NwNsgProject
|
|||
}
|
||||
catch (AggregateException ex)
|
||||
{
|
||||
log.Error("Got AggregateException.");
|
||||
log.LogError("Got AggregateException.");
|
||||
throw ex;
|
||||
}
|
||||
catch (TaskCanceledException ex)
|
||||
{
|
||||
log.Error("Got TaskCanceledException.");
|
||||
log.LogError("Got TaskCanceledException.");
|
||||
throw ex;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
log.Error("Got other exception.");
|
||||
log.LogError("Got other exception.");
|
||||
throw ex;
|
||||
}
|
||||
return response;
|
||||
|
@ -64,7 +123,7 @@ namespace NwNsgProject
|
|||
|
||||
}
|
||||
|
||||
public static async Task logErrorRecord(string errorRecord, Binder errorRecordBinder, TraceWriter log)
|
||||
public static async Task logErrorRecord(string errorRecord, Binder errorRecordBinder, ILogger log)
|
||||
{
|
||||
if (errorRecordBinder == null) { return; }
|
||||
|
||||
|
@ -82,17 +141,17 @@ namespace NwNsgProject
|
|||
};
|
||||
|
||||
CloudBlockBlob blob = await errorRecordBinder.BindAsync<CloudBlockBlob>(attributes);
|
||||
blob.UploadFromByteArray(transmission, 0, transmission.Length);
|
||||
await blob.UploadFromByteArrayAsync(transmission, 0, transmission.Length);
|
||||
|
||||
transmission = new Byte[] { };
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
log.Error($"Exception logging record: {ex.Message}");
|
||||
log.LogError($"Exception logging record: {ex.Message}");
|
||||
}
|
||||
}
|
||||
|
||||
static async Task logErrorRecord(NSGFlowLogRecord errorRecord, Binder errorRecordBinder, TraceWriter log)
|
||||
static async Task logErrorRecord(NSGFlowLogRecord errorRecord, Binder errorRecordBinder, ILogger log)
|
||||
{
|
||||
if (errorRecordBinder == null) { return; }
|
||||
|
||||
|
@ -110,13 +169,13 @@ namespace NwNsgProject
|
|||
};
|
||||
|
||||
CloudBlockBlob blob = await errorRecordBinder.BindAsync<CloudBlockBlob>(attributes);
|
||||
blob.UploadFromByteArray(transmission, 0, transmission.Length);
|
||||
await blob.UploadFromByteArrayAsync(transmission, 0, transmission.Length);
|
||||
|
||||
transmission = new Byte[] { };
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
log.Error($"Exception logging record: {ex.Message}");
|
||||
log.LogError($"Exception logging record: {ex.Message}");
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -171,6 +230,5 @@ namespace NwNsgProject
|
|||
return eqs(key, true) + eqs(value);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,23 +0,0 @@
|
|||
using System;
|
||||
|
||||
namespace NwNsgProject
|
||||
{
|
||||
public class Chunk
|
||||
{
|
||||
public string BlobName { get; set; }
|
||||
public string BlobAccountConnectionName { get; set; }
|
||||
public long Length { get; set; }
|
||||
public long Start { get; set; }
|
||||
public string LastBlockName { get; set; }
|
||||
|
||||
public override string ToString()
|
||||
{
|
||||
var msg = string.Format("Connection: {0}, Block: {1}, Start: {2}, Length: {3}, Name: {4}", BlobAccountConnectionName, LastBlockName, Start, Length, BlobName);
|
||||
return msg;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
|
@ -1,2 +1,3 @@
|
|||
{
|
||||
"version": "2.0"
|
||||
}
|
|
@ -1,15 +1,15 @@
|
|||
using Microsoft.Azure.WebJobs;
|
||||
using Microsoft.Azure.WebJobs.Host;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Newtonsoft.Json;
|
||||
using System;
|
||||
using System.Net.Sockets;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace NwNsgProject
|
||||
namespace nsgFunc
|
||||
{
|
||||
public partial class Util
|
||||
{
|
||||
public static async Task obArcsight(string newClientContent, TraceWriter log)
|
||||
public static async Task obArcsight(string newClientContent, ILogger log)
|
||||
{
|
||||
//
|
||||
// newClientContent looks like this:
|
||||
|
@ -27,7 +27,7 @@ namespace NwNsgProject
|
|||
|
||||
if (arcsightAddress.Length == 0 || arcsightPort.Length == 0)
|
||||
{
|
||||
log.Error("Values for arcsightAddress and arcsightPort are required.");
|
||||
log.LogError("Values for arcsightAddress and arcsightPort are required.");
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -53,7 +53,7 @@ namespace NwNsgProject
|
|||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
log.Error($"Exception sending to ArcSight: {ex.Message}");
|
||||
log.LogError($"Exception sending to ArcSight: {ex.Message}");
|
||||
}
|
||||
}
|
||||
if (count > 0)
|
||||
|
@ -64,13 +64,13 @@ namespace NwNsgProject
|
|||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
log.Error($"Exception sending to ArcSight: {ex.Message}");
|
||||
log.LogError($"Exception sending to ArcSight: {ex.Message}");
|
||||
}
|
||||
}
|
||||
await stream.FlushAsync();
|
||||
}
|
||||
|
||||
public static System.Collections.Generic.IEnumerable<string> convertToCEF(string newClientContent, Binder errorRecordBinder, TraceWriter log)
|
||||
public static System.Collections.Generic.IEnumerable<string> convertToCEF(string newClientContent, Binder errorRecordBinder, ILogger log)
|
||||
{
|
||||
// newClientContent is a json string with records
|
||||
|
||||
|
|
|
@ -1,27 +1,25 @@
|
|||
using Microsoft.Azure.WebJobs;
|
||||
using Microsoft.Azure.EventHubs;
|
||||
using Microsoft.Azure.WebJobs.Host;
|
||||
using Microsoft.Azure.EventHubs;
|
||||
using Microsoft.Azure.WebJobs;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Newtonsoft.Json;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
using Newtonsoft.Json;
|
||||
|
||||
namespace NwNsgProject
|
||||
namespace nsgFunc
|
||||
{
|
||||
public partial class Util
|
||||
{
|
||||
const int MAXTRANSMISSIONSIZE = 255 * 1024;
|
||||
// const int MAXTRANSMISSIONSIZE = 2 * 1024;
|
||||
|
||||
public static async Task obEventHub(string newClientContent, TraceWriter log)
|
||||
public static async Task obEventHub(string newClientContent, ILogger log)
|
||||
{
|
||||
string EventHubConnectionString = GetEnvironmentVariable("eventHubConnection");
|
||||
string EventHubName = GetEnvironmentVariable("eventHubName");
|
||||
if (EventHubConnectionString.Length == 0 || EventHubName.Length == 0)
|
||||
{
|
||||
log.Error("Values for eventHubConnection and eventHubName are required.");
|
||||
log.LogError("Values for eventHubConnection and eventHubName are required.");
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -39,7 +37,7 @@ namespace NwNsgProject
|
|||
}
|
||||
}
|
||||
|
||||
static System.Collections.Generic.IEnumerable<string> bundleMessages(string newClientContent, TraceWriter log)
|
||||
static System.Collections.Generic.IEnumerable<string> bundleMessages(string newClientContent, ILogger log)
|
||||
{
|
||||
var transmission = new StringBuilder(MAXTRANSMISSIONSIZE);
|
||||
transmission.Append("{\"records\":[");
|
||||
|
@ -96,7 +94,7 @@ namespace NwNsgProject
|
|||
}
|
||||
}
|
||||
|
||||
static System.Collections.Generic.IEnumerable<string> denormalizeRecords(string newClientContent, Binder errorRecordBinder, TraceWriter log)
|
||||
static System.Collections.Generic.IEnumerable<string> denormalizeRecords(string newClientContent, Binder errorRecordBinder, ILogger log)
|
||||
{
|
||||
//
|
||||
// newClientContent looks like this:
|
||||
|
|
|
@ -1,18 +1,16 @@
|
|||
using Microsoft.Azure.WebJobs.Host;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Net;
|
||||
using System.Net.Http;
|
||||
using System.Net.Http.Headers;
|
||||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace NwNsgProject
|
||||
namespace nsgFunc
|
||||
{
|
||||
public partial class Util
|
||||
{
|
||||
public static async Task obLogstash(string newClientContent, TraceWriter log)
|
||||
public static async Task obLogstash(string newClientContent, ILogger log)
|
||||
{
|
||||
string logstashAddress = Util.GetEnvironmentVariable("logstashAddress");
|
||||
string logstashHttpUser = Util.GetEnvironmentVariable("logstashHttpUser");
|
||||
|
@ -20,7 +18,7 @@ namespace NwNsgProject
|
|||
|
||||
if (logstashAddress.Length == 0 || logstashHttpUser.Length == 0 || logstashHttpPwd.Length == 0)
|
||||
{
|
||||
log.Error("Values for logstashAddress, logstashHttpUser and logstashHttpPwd are required.");
|
||||
log.LogError("Values for logstashAddress, logstashHttpUser and logstashHttpPwd are required.");
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -44,7 +42,7 @@ namespace NwNsgProject
|
|||
HttpResponseMessage response = await Util.SingleHttpClientInstance.SendToLogstash(req, log);
|
||||
if (response.StatusCode != HttpStatusCode.OK)
|
||||
{
|
||||
log.Error($"StatusCode from Logstash: {response.StatusCode}, and reason: {response.ReasonPhrase}");
|
||||
log.LogError($"StatusCode from Logstash: {response.StatusCode}, and reason: {response.ReasonPhrase}");
|
||||
}
|
||||
}
|
||||
catch (System.Net.Http.HttpRequestException e)
|
||||
|
@ -54,7 +52,7 @@ namespace NwNsgProject
|
|||
{
|
||||
msg += " *** " + e.InnerException.Message;
|
||||
}
|
||||
log.Error($"HttpRequestException Error: \"{msg}\" was caught while sending to Logstash.");
|
||||
log.LogError($"HttpRequestException Error: \"{msg}\" was caught while sending to Logstash.");
|
||||
throw e;
|
||||
}
|
||||
catch (Exception f)
|
||||
|
@ -64,7 +62,7 @@ namespace NwNsgProject
|
|||
{
|
||||
msg += " *** " + f.InnerException.Message;
|
||||
}
|
||||
log.Error($"Unknown error caught while sending to Logstash: \"{f.ToString()}\"");
|
||||
log.LogError($"Unknown error caught while sending to Logstash: \"{f.ToString()}\"");
|
||||
throw f;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
using Microsoft.Azure.WebJobs;
|
||||
using Microsoft.Azure.WebJobs.Host;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Newtonsoft.Json;
|
||||
using System;
|
||||
using System.Net;
|
||||
|
@ -10,11 +10,11 @@ using System.Security.Cryptography.X509Certificates;
|
|||
using System.Text;
|
||||
using System.Threading.Tasks;
|
||||
|
||||
namespace NwNsgProject
|
||||
namespace nsgFunc
|
||||
{
|
||||
public partial class Util
|
||||
{
|
||||
public static async Task obSplunk(string newClientContent, TraceWriter log)
|
||||
public static async Task obSplunk(string newClientContent, ILogger log)
|
||||
{
|
||||
//
|
||||
// newClientContent looks like this:
|
||||
|
@ -33,7 +33,7 @@ namespace NwNsgProject
|
|||
|
||||
if (splunkAddress.Length == 0 || splunkToken.Length == 0)
|
||||
{
|
||||
log.Error("Values for splunkAddress and splunkToken are required.");
|
||||
log.LogError("Values for splunkAddress and splunkToken are required.");
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -93,7 +93,7 @@ namespace NwNsgProject
|
|||
|
||||
}
|
||||
|
||||
static System.Collections.Generic.IEnumerable<string> convertToSplunk(string newClientContent, Binder errorRecordBinder, TraceWriter log)
|
||||
static System.Collections.Generic.IEnumerable<string> convertToSplunk(string newClientContent, Binder errorRecordBinder, ILogger log)
|
||||
{
|
||||
//
|
||||
// newClientContent looks like this:
|
||||
|
|
Загрузка…
Ссылка в новой задаче