Make core Host have no dependency on storage

1. split out storage bindings into a separate extension, which uses its own version of storage sdk (that can version separately from runtime's version)
2. Move runtime's internal storage to a new satellite dll (Host.Storage), which provides implementations for core runtime services like distributed queuing, Singleton, HostIdProvider, v1 logging, etc.

Remove legacy storage wrappers and use the storage SDK directly. This touches most files.

Tests are still broken - but Sample  app runs.

Move Protocol to a shared source project
This commit is contained in:
Mike 2018-05-02 15:30:21 -07:00 коммит произвёл Fabio Cavalcante
Родитель 7f130355c4
Коммит a67ea9d530
405 изменённых файлов: 3518 добавлений и 8321 удалений

Просмотреть файл

@ -11,10 +11,6 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WebJobs", "src\Microsoft.Az
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WebJobs.Host", "src\Microsoft.Azure.WebJobs.Host\WebJobs.Host.csproj", "{39554ABE-D7B7-45FF-8D7C-56432ABCDE3D}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WebJobs.Storage", "src\Microsoft.Azure.WebJobs.Storage\WebJobs.Storage.csproj", "{04CCE622-742C-4D8C-BCED-5B662D9C6454}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WebJobs.Protocols", "src\Microsoft.Azure.WebJobs.Protocols\WebJobs.Protocols.csproj", "{0C6F1900-EC03-4BFA-AFB6-9CA37401051A}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WebJobs.Logging", "src\Microsoft.Azure.WebJobs.Logging\WebJobs.Logging.csproj", "{4F668623-B7D3-4834-99E2-2C0C8E0FFB9D}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WebJobs.Logging.ApplicationInsights", "src\Microsoft.Azure.WebJobs.Logging.ApplicationInsights\WebJobs.Logging.ApplicationInsights.csproj", "{32F7D43D-1DD6-46F3-90FE-99B914DE8DAA}"
@ -45,10 +41,22 @@ Project("{D954291E-2A0B-460D-934E-DC6B0785DB48}") = "WebJobs.Shared", "src\Micro
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WebJobs.ServiceBus.UnitTests", "test\Microsoft.Azure.WebJobs.ServiceBus.UnitTests\WebJobs.ServiceBus.UnitTests.csproj", "{D37637D5-7EF9-43CB-86BE-537473CD613B}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WebJobs.Extension.Storage", "src\Microsoft.Azure.WebJobs.Extension.Storage\WebJobs.Extension.Storage.csproj", "{A9733406-267C-4A53-AB07-D3A834E22153}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WebJobs.Host.Storage", "src\Microsoft.Azure.WebJobs.Host.Storage\WebJobs.Host.Storage.csproj", "{DED33098-FE99-436C-96CC-B59A30BEF027}"
EndProject
Project("{D954291E-2A0B-460D-934E-DC6B0785DB48}") = "WebJobs.Shared.Storage", "src\Microsoft.Azure.WebJobs.Shared.Storage\WebJobs.Shared.Storage.shproj", "{6BED7F8A-A199-4D9D-85D1-6856EE3292C6}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WebJobs.EventHubs", "src\Microsoft.Azure.WebJobs.Extensions.EventHubs\WebJobs.EventHubs.csproj", "{8498FA6B-3843-44A4-A351-E35711B7FFDF}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WebJobs.Extension.Storage.UnitTests", "test\Microsoft.Azure.Webjobs.Extension.Storage.UnitTests\WebJobs.Extension.Storage.UnitTests.csproj", "{0CC5741F-ACDA-4DB8-9C17-074E8896F244}"
EndProject
Project("{D954291E-2A0B-460D-934E-DC6B0785DB48}") = "WebJobs.Protocols", "src\Microsoft.Azure.WebJobs.Protocols\WebJobs.Protocols.shproj", "{6FCD0852-6019-4CD5-9B7E-0DE021A72BD7}"
EndProject
Global
GlobalSection(SharedMSBuildProjectFiles) = preSolution
src\Microsoft.Azure.WebJobs.Shared.Storage\Microsoft.Azure.WebJobs.Shared.Storage.projitems*{6bed7f8a-a199-4d9d-85d1-6856ee3292c6}*SharedItemsImports = 13
src\Microsoft.Azure.WebJobs.Protocols\Microsoft.Azure.WebJobs.Protocols.projitems*{6fcd0852-6019-4cd5-9b7e-0de021a72bd7}*SharedItemsImports = 13
src\Microsoft.Azure.WebJobs.Shared\WebJobs.Shared.projitems*{add036f5-2170-4b05-9e0a-c2ed0a08a929}*SharedItemsImports = 13
EndGlobalSection
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@ -64,14 +72,6 @@ Global
{39554ABE-D7B7-45FF-8D7C-56432ABCDE3D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{39554ABE-D7B7-45FF-8D7C-56432ABCDE3D}.Release|Any CPU.ActiveCfg = Release|Any CPU
{39554ABE-D7B7-45FF-8D7C-56432ABCDE3D}.Release|Any CPU.Build.0 = Release|Any CPU
{04CCE622-742C-4D8C-BCED-5B662D9C6454}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{04CCE622-742C-4D8C-BCED-5B662D9C6454}.Debug|Any CPU.Build.0 = Debug|Any CPU
{04CCE622-742C-4D8C-BCED-5B662D9C6454}.Release|Any CPU.ActiveCfg = Release|Any CPU
{04CCE622-742C-4D8C-BCED-5B662D9C6454}.Release|Any CPU.Build.0 = Release|Any CPU
{0C6F1900-EC03-4BFA-AFB6-9CA37401051A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{0C6F1900-EC03-4BFA-AFB6-9CA37401051A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{0C6F1900-EC03-4BFA-AFB6-9CA37401051A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{0C6F1900-EC03-4BFA-AFB6-9CA37401051A}.Release|Any CPU.Build.0 = Release|Any CPU
{4F668623-B7D3-4834-99E2-2C0C8E0FFB9D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{4F668623-B7D3-4834-99E2-2C0C8E0FFB9D}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4F668623-B7D3-4834-99E2-2C0C8E0FFB9D}.Release|Any CPU.ActiveCfg = Release|Any CPU
@ -116,10 +116,22 @@ Global
{D37637D5-7EF9-43CB-86BE-537473CD613B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D37637D5-7EF9-43CB-86BE-537473CD613B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D37637D5-7EF9-43CB-86BE-537473CD613B}.Release|Any CPU.Build.0 = Release|Any CPU
{A9733406-267C-4A53-AB07-D3A834E22153}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A9733406-267C-4A53-AB07-D3A834E22153}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A9733406-267C-4A53-AB07-D3A834E22153}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A9733406-267C-4A53-AB07-D3A834E22153}.Release|Any CPU.Build.0 = Release|Any CPU
{DED33098-FE99-436C-96CC-B59A30BEF027}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{DED33098-FE99-436C-96CC-B59A30BEF027}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DED33098-FE99-436C-96CC-B59A30BEF027}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DED33098-FE99-436C-96CC-B59A30BEF027}.Release|Any CPU.Build.0 = Release|Any CPU
{8498FA6B-3843-44A4-A351-E35711B7FFDF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{8498FA6B-3843-44A4-A351-E35711B7FFDF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{8498FA6B-3843-44A4-A351-E35711B7FFDF}.Release|Any CPU.ActiveCfg = Release|Any CPU
{8498FA6B-3843-44A4-A351-E35711B7FFDF}.Release|Any CPU.Build.0 = Release|Any CPU
{0CC5741F-ACDA-4DB8-9C17-074E8896F244}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{0CC5741F-ACDA-4DB8-9C17-074E8896F244}.Debug|Any CPU.Build.0 = Debug|Any CPU
{0CC5741F-ACDA-4DB8-9C17-074E8896F244}.Release|Any CPU.ActiveCfg = Release|Any CPU
{0CC5741F-ACDA-4DB8-9C17-074E8896F244}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -133,6 +145,7 @@ Global
{340AB554-5482-4B3D-B65F-46DFF5AF1684} = {639967B0-0544-4C52-94AC-9A3D25E33256}
{27F0E6AC-505E-4BEC-81CA-8DF777DEA9C7} = {639967B0-0544-4C52-94AC-9A3D25E33256}
{D37637D5-7EF9-43CB-86BE-537473CD613B} = {639967B0-0544-4C52-94AC-9A3D25E33256}
{0CC5741F-ACDA-4DB8-9C17-074E8896F244} = {639967B0-0544-4C52-94AC-9A3D25E33256}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {371BFA14-0980-4A43-A18A-CA1C1A9CB784}

Просмотреть файл

@ -16,6 +16,7 @@ namespace SampleHost
[ErrorHandler]
public static class Functions
{
[Singleton]
public static void BlobTrigger(
[BlobTrigger("test")] string blob, ILogger logger)
{

Просмотреть файл

@ -10,6 +10,7 @@ using Microsoft.Extensions.DependencyInjection;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Configuration;
using System.Collections.Generic;
using Microsoft.Extensions.Options;
namespace SampleHost
{
@ -17,6 +18,8 @@ namespace SampleHost
{
public static async Task Main(string[] args)
{
var x = new ServiceCollection();
var builder = new HostBuilder()
.UseEnvironment("Development")
.ConfigureWebJobsHost(o =>
@ -24,6 +27,11 @@ namespace SampleHost
// Example setting options properties:
// o.HostId = "testhostid";
})
// These can be toggled independently!
.AddWebJobsLogging() // Enables WebJobs v1 classic logging
.AddStorageForRuntimeInternals() // enables WebJobs to run distributed, via a storage account to coordinate
.AddStorageBindings() // adds [Blob], etc bindings for Azure Storage.
.ConfigureAppConfiguration(config =>
{
// Adding command line as a configuration source
@ -44,6 +52,8 @@ namespace SampleHost
var jobHost = builder.Build();
var opts = jobHost.Services.GetRequiredService<IOptions<LegacyConfig>>();
using (jobHost)
{
await jobHost.RunAsync();

Просмотреть файл

@ -23,6 +23,8 @@
<ItemGroup>
<ProjectReference Include="..\..\src\Microsoft.Azure.WebJobs.Extensions.EventHubs\WebJobs.EventHubs.csproj" />
<ProjectReference Include="..\..\src\Microsoft.Azure.WebJobs.Extension.Storage\WebJobs.Extension.Storage.csproj" />
<ProjectReference Include="..\..\src\Microsoft.Azure.WebJobs.Host.Storage\WebJobs.Host.Storage.csproj" />
<ProjectReference Include="..\..\src\Microsoft.Azure.WebJobs.Host\WebJobs.Host.csproj" />
<ProjectReference Include="..\..\src\Microsoft.Azure.WebJobs.Logging.ApplicationInsights\WebJobs.Logging.ApplicationInsights.csproj" />
<ProjectReference Include="..\..\src\Microsoft.Azure.WebJobs.ServiceBus\WebJobs.ServiceBus.csproj" />

Просмотреть файл

@ -6,6 +6,7 @@ using Microsoft.WindowsAzure.Storage.Table;
namespace Microsoft.Azure.WebJobs.Host
{
#if false // $$$ Anybody uses? This used to be called on Startup, call got removed in DI branch
/// <summary>
/// This helper addresses an issue in Azure Storage SDK where if certain assemblies are missing
/// the Storage SDK actually continues to work overall, but some features will silently fail.
@ -46,4 +47,5 @@ namespace Microsoft.Azure.WebJobs.Host
#endif
}
}
#endif
}

Просмотреть файл

@ -4,17 +4,16 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
using Microsoft.WindowsAzure.Storage.Blob;
namespace Microsoft.Azure.WebJobs.Host.Blobs.Bindings
{
internal class BlobCommittedAction : IBlobCommitedAction
{
private readonly IStorageBlob _blob;
private readonly ICloudBlob _blob;
private readonly IBlobWrittenWatcher _blobWrittenWatcher;
public BlobCommittedAction(IStorageBlob blob, IBlobWrittenWatcher blobWrittenWatcher)
public BlobCommittedAction(ICloudBlob blob, IBlobWrittenWatcher blobWrittenWatcher)
{
_blob = blob;
_blobWrittenWatcher = blobWrittenWatcher;

Просмотреть файл

@ -11,11 +11,7 @@ using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Bindings;
using Microsoft.Azure.WebJobs.Host.Blobs.Listeners;
using Microsoft.Azure.WebJobs.Host.Config;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Indexers;
using Microsoft.Azure.WebJobs.Host.Protocols;
using Microsoft.Azure.WebJobs.Host.Storage;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
using Microsoft.WindowsAzure.Storage.Blob;
namespace Microsoft.Azure.WebJobs.Host.Blobs.Bindings
@ -25,12 +21,12 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Bindings
IAsyncConverter<BlobAttribute, CloudBlobDirectory>,
IAsyncConverter<BlobAttribute, BlobExtensionConfig.MultiBlobContext>
{
private IStorageAccountProvider _accountProvider;
private XStorageAccountProvider _accountProvider;
private IContextGetter<IBlobWrittenWatcher> _blobWrittenWatcherGetter;
private INameResolver _nameResolver;
private IConverterManager _converterManager;
public BlobExtensionConfig(IStorageAccountProvider accountProvider,
public BlobExtensionConfig(XStorageAccountProvider accountProvider,
IContextGetter<IBlobWrittenWatcher> contextAccessor,
INameResolver nameResolver,
IConverterManager converterManager)
@ -46,20 +42,20 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Bindings
BlobAttribute blobAttribute, CancellationToken cancellationToken)
{
var container = await GetContainerAsync(blobAttribute, cancellationToken);
return container.SdkObject;
return container;
}
// Write-only rule.
async Task<CloudBlobDirectory> IAsyncConverter<BlobAttribute, CloudBlobDirectory>.ConvertAsync(
BlobAttribute blobAttribute, CancellationToken cancellationToken)
{
IStorageBlobClient client = await GetClientAsync(blobAttribute, cancellationToken);
var client = await GetClientAsync(blobAttribute, cancellationToken);
BlobPath boundPath = BlobPath.ParseAndValidate(blobAttribute.BlobPath, isContainerBinding: false);
IStorageBlobContainer container = client.GetContainerReference(boundPath.ContainerName);
var container = client.GetContainerReference(boundPath.ContainerName);
CloudBlobDirectory directory = container.SdkObject.GetDirectoryReference(
CloudBlobDirectory directory = container.GetDirectoryReference(
boundPath.BlobName);
return directory;
@ -80,7 +76,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Bindings
private async Task<T> CreateBlobReference<T>(BlobAttribute blobAttribute, CancellationToken cancellationToken)
{
var blob = await GetBlobAsync(blobAttribute, cancellationToken, typeof(T));
return (T)(blob.SdkObject);
return (T)(blob);
}
#endregion
@ -111,12 +107,12 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Bindings
// T must have been matched by MultiBlobType
class BlobCollectionConverter<T> : IAsyncConverter<MultiBlobContext, IEnumerable<T>>
{
private readonly FuncAsyncConverter<IStorageBlob, T> _converter;
private readonly FuncAsyncConverter<ICloudBlob, T> _converter;
public BlobCollectionConverter(BlobExtensionConfig parent)
{
IConverterManager cm = parent._converterManager;
_converter = cm.GetConverter<IStorageBlob, T, BlobAttribute>();
_converter = cm.GetConverter<ICloudBlob, T, BlobAttribute>();
if (_converter == null)
{
throw new InvalidOperationException($"Can't convert blob to {typeof(T).FullName}.");
@ -130,20 +126,20 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Bindings
// bind to CloudBlobDirectory.
string prefix = context.Prefix;
var container = context.Container;
IEnumerable<IStorageListBlobItem> blobItems = await container.ListBlobsAsync(prefix, true, cancellationToken);
IEnumerable<IListBlobItem> blobItems = await container.ListBlobsAsync(prefix, true, cancellationToken);
// create an IEnumerable<T> of the correct type, performing any required conversions on the blobs
var list = await ConvertBlobs(blobItems);
return list;
}
private async Task<IEnumerable<T>> ConvertBlobs(IEnumerable<IStorageListBlobItem> blobItems)
private async Task<IEnumerable<T>> ConvertBlobs(IEnumerable<IListBlobItem> blobItems)
{
var list = new List<T>();
foreach (var blobItem in blobItems)
{
var src = (IStorageBlob)blobItem;
var src = (ICloudBlob)blobItem;
var funcCtx = new FunctionBindingContext(Guid.Empty, CancellationToken.None, null);
var valueCtx = new ValueBindingContext(funcCtx, CancellationToken.None);
@ -161,7 +157,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Bindings
private class MultiBlobContext
{
public string Prefix;
public IStorageBlobContainer Container;
public CloudBlobContainer Container;
}
// Initial rule that captures the muti-blob context.
@ -202,43 +198,43 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Bindings
}
}
private async Task<IStorageBlobClient> GetClientAsync(
private async Task<CloudBlobClient> GetClientAsync(
BlobAttribute blobAttribute,
CancellationToken cancellationToken)
{
IStorageAccount account = await _accountProvider.GetStorageAccountAsync(blobAttribute, cancellationToken, _nameResolver);
IStorageBlobClient client = account.CreateBlobClient();
var account = _accountProvider.Get(blobAttribute.Connection, _nameResolver);
var client = account.CreateCloudBlobClient();
return client;
}
private async Task<IStorageBlobContainer> GetContainerAsync(
private async Task<CloudBlobContainer> GetContainerAsync(
BlobAttribute blobAttribute,
CancellationToken cancellationToken)
{
IStorageBlobClient client = await GetClientAsync(blobAttribute, cancellationToken);
var client = await GetClientAsync(blobAttribute, cancellationToken);
BlobPath boundPath = BlobPath.ParseAndValidate(blobAttribute.BlobPath, isContainerBinding: true);
IStorageBlobContainer container = client.GetContainerReference(boundPath.ContainerName);
var container = client.GetContainerReference(boundPath.ContainerName);
return container;
}
private async Task<IStorageBlob> GetBlobAsync(
private async Task<ICloudBlob> GetBlobAsync(
BlobAttribute blobAttribute,
CancellationToken cancellationToken,
Type requestedType = null)
{
IStorageBlobClient client = await GetClientAsync(blobAttribute, cancellationToken);
var client = await GetClientAsync(blobAttribute, cancellationToken);
BlobPath boundPath = BlobPath.ParseAndValidate(blobAttribute.BlobPath);
IStorageBlobContainer container = client.GetContainerReference(boundPath.ContainerName);
var container = client.GetContainerReference(boundPath.ContainerName);
if (blobAttribute.Access != FileAccess.Read)
{
await container.CreateIfNotExistsAsync(cancellationToken);
}
IStorageBlob blob = await container.GetBlobReferenceForArgumentTypeAsync(
var blob = await container.GetBlobReferenceForArgumentTypeAsync(
boundPath.BlobName, requestedType, cancellationToken);
return blob;

Просмотреть файл

@ -4,7 +4,6 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
@ -12,22 +11,22 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Bindings
{
internal static class StorageBlobContainerExtensions
{
public static Task<IStorageBlob> GetBlobReferenceForArgumentTypeAsync(this IStorageBlobContainer container,
public static Task<ICloudBlob> GetBlobReferenceForArgumentTypeAsync(this CloudBlobContainer container,
string blobName, Type argumentType, CancellationToken cancellationToken)
{
if (argumentType == typeof(CloudBlockBlob))
{
IStorageBlob blob = container.GetBlockBlobReference(blobName);
ICloudBlob blob = container.GetBlockBlobReference(blobName);
return Task.FromResult(blob);
}
else if (argumentType == typeof(CloudPageBlob))
{
IStorageBlob blob = container.GetPageBlobReference(blobName);
ICloudBlob blob = container.GetPageBlobReference(blobName);
return Task.FromResult(blob);
}
else if (argumentType == typeof(CloudAppendBlob))
{
IStorageBlob blob = container.GetAppendBlobReference(blobName);
ICloudBlob blob = container.GetAppendBlobReference(blobName);
return Task.FromResult(blob);
}
else
@ -36,7 +35,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Bindings
}
}
private static async Task<IStorageBlob> GetExistingOrNewBlockBlobReferenceAsync(IStorageBlobContainer container,
private static async Task<ICloudBlob> GetExistingOrNewBlockBlobReferenceAsync(CloudBlobContainer container,
string blobName, CancellationToken cancellationToken)
{
try

Просмотреть файл

@ -4,17 +4,16 @@
using System;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Bindings;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
using Microsoft.WindowsAzure.Storage.Blob;
namespace Microsoft.Azure.WebJobs.Host.Blobs.Bindings
{
internal static class WriteBlobArgumentBinding
{
public static async Task<WatchableCloudBlobStream> BindStreamAsync(IStorageBlob blob,
public static async Task<WatchableCloudBlobStream> BindStreamAsync(ICloudBlob blob,
ValueBindingContext context, IBlobWrittenWatcher blobWrittenWatcher)
{
IStorageBlockBlob blockBlob = blob as IStorageBlockBlob;
var blockBlob = blob as CloudBlockBlob;
if (blockBlob == null)
{

Просмотреть файл

@ -7,7 +7,6 @@ using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Protocols;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
using Microsoft.WindowsAzure.Storage.Blob;
namespace Microsoft.Azure.WebJobs.Host.Blobs
@ -36,28 +35,6 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs
metadata[BlobMetadataKeys.ParentId] = function.ToString();
}
public static async Task<Guid?> GetWriterAsync(IStorageBlob blob, CancellationToken cancellationToken)
{
if (!await blob.TryFetchAttributesAsync(cancellationToken))
{
return null;
}
if (!blob.Metadata.ContainsKey(BlobMetadataKeys.ParentId))
{
return null;
}
string val = blob.Metadata[BlobMetadataKeys.ParentId];
Guid result;
if (Guid.TryParse(val, out result))
{
return result;
}
return null;
}
public static async Task<Guid?> GetWriterAsync(ICloudBlob blob, CancellationToken cancellationToken)
{
if (!await blob.TryFetchAttributesAsync(cancellationToken))

Просмотреть файл

@ -1,10 +1,10 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using Microsoft.WindowsAzure.Storage.Blob;
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
namespace Microsoft.Azure.WebJobs.Host.Blobs
{
@ -21,7 +21,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs
get { return Singleton; }
}
public Task<Guid?> GetWriterAsync(IStorageBlob blob, CancellationToken cancellationToken)
public Task<Guid?> GetWriterAsync(ICloudBlob blob, CancellationToken cancellationToken)
{
return BlobCausalityManager.GetWriterAsync(blob, cancellationToken);
}

Просмотреть файл

@ -4,24 +4,25 @@
using System;
using System.Globalization;
using System.Text.RegularExpressions;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
using Microsoft.Azure.WebJobs.Description;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
namespace Microsoft.Azure.WebJobs.Host.Blobs
namespace Microsoft.Azure.WebJobs
{
internal static class BlobClient
{
// Tested against storage service on July 2016. All other unsafe and reserved characters work fine.
private static readonly char[] UnsafeBlobNameCharacters = { '\\' };
public static string GetAccountName(IStorageBlobClient client)
public static string GetAccountName(this CloudStorageAccount client)
{
if (client == null)
{
return null;
}
return client?.Credentials?.AccountName;
}
return StorageClient.GetAccountName(client.Credentials);
public static string GetAccountName(this CloudBlobClient client)
{
return client?.Credentials?.AccountName;
}
// Naming rules are here: http://msdn.microsoft.com/en-us/library/dd135715.aspx

Просмотреть файл

@ -4,37 +4,37 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Converters;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
using Microsoft.WindowsAzure.Storage.Blob;
namespace Microsoft.Azure.WebJobs.Host.Blobs
{
internal class BlobOutputConverter<TInput> : IAsyncObjectToTypeConverter<IStorageBlob>
internal class BlobOutputConverter<TInput> : IAsyncObjectToTypeConverter<ICloudBlob>
where TInput : class
{
private readonly IAsyncConverter<TInput, IStorageBlob> _innerConverter;
private readonly IAsyncConverter<TInput, ICloudBlob> _innerConverter;
public BlobOutputConverter(IAsyncConverter<TInput, IStorageBlob> innerConverter)
public BlobOutputConverter(IAsyncConverter<TInput, ICloudBlob> innerConverter)
{
_innerConverter = innerConverter;
}
public async Task<ConversionResult<IStorageBlob>> TryConvertAsync(object input,
public async Task<ConversionResult<ICloudBlob>> TryConvertAsync(object input,
CancellationToken cancellationToken)
{
TInput typedInput = input as TInput;
if (typedInput == null)
{
return new ConversionResult<IStorageBlob>
return new ConversionResult<ICloudBlob>
{
Succeeded = false,
Result = null
};
}
IStorageBlob blob = await _innerConverter.ConvertAsync(typedInput, cancellationToken);
var blob = await _innerConverter.ConvertAsync(typedInput, cancellationToken);
return new ConversionResult<IStorageBlob>
return new ConversionResult<ICloudBlob>
{
Succeeded = true,
Result = blob

Просмотреть файл

@ -4,13 +4,13 @@
using System;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Bindings;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
using Microsoft.WindowsAzure.Storage.Blob;
namespace Microsoft.Azure.WebJobs.Host.Blobs
{
internal sealed class BlobWatchableDisposableValueProvider : IValueProvider, IWatchable, IDisposable
{
private readonly IStorageBlob _blob;
private readonly ICloudBlob _blob;
private readonly object _value;
private readonly Type _valueType;
private readonly IWatcher _watcher;
@ -18,7 +18,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs
private bool _disposed;
public BlobWatchableDisposableValueProvider(IStorageBlob blob, object value, Type valueType, IWatcher watcher,
public BlobWatchableDisposableValueProvider(ICloudBlob blob, object value, Type valueType, IWatcher watcher,
IDisposable disposable)
{
if (value != null && !valueType.IsAssignableFrom(value.GetType()))

Просмотреть файл

@ -4,18 +4,18 @@
using System;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Bindings;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
using Microsoft.WindowsAzure.Storage.Blob;
namespace Microsoft.Azure.WebJobs.Host.Blobs
{
internal sealed class BlobWatchableValueProvider : IValueProvider, IWatchable
{
private readonly IStorageBlob _blob;
private readonly ICloudBlob _blob;
private readonly object _value;
private readonly Type _valueType;
private readonly IWatcher _watcher;
public BlobWatchableValueProvider(IStorageBlob blob, object value, Type valueType, IWatcher watcher)
public BlobWatchableValueProvider(ICloudBlob blob, object value, Type valueType, IWatcher watcher)
{
if (value != null && !valueType.IsAssignableFrom(value.GetType()))
{
@ -38,7 +38,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs
get { return _watcher; }
}
public static BlobWatchableValueProvider Create<T>(IStorageBlob blob, T value, IWatcher watcher)
public static BlobWatchableValueProvider Create<T>(ICloudBlob blob, T value, IWatcher watcher)
{
return new BlobWatchableValueProvider(blob, value: value, valueType: typeof(T), watcher: watcher);
}

Просмотреть файл

@ -4,7 +4,6 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Storage;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;

Просмотреть файл

@ -4,12 +4,12 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
using Microsoft.WindowsAzure.Storage.Blob;
namespace Microsoft.Azure.WebJobs.Host.Blobs
{
internal interface IBlobCausalityReader
{
Task<Guid?> GetWriterAsync(IStorageBlob blob, CancellationToken cancellationToken);
Task<Guid?> GetWriterAsync(ICloudBlob blob, CancellationToken cancellationToken);
}
}

Просмотреть файл

@ -1,12 +1,12 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
using Microsoft.WindowsAzure.Storage.Blob;
namespace Microsoft.Azure.WebJobs.Host.Blobs
{
internal interface IBlobWrittenWatcher
{
void Notify(IStorageBlob blobWritten);
void Notify(ICloudBlob blobWritten);
}
}

Просмотреть файл

@ -3,9 +3,8 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Storage;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
{
@ -22,7 +21,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
get { return Singleton; }
}
public async Task<string> GetETagAsync(IStorageBlob blob, CancellationToken cancellationToken)
public async Task<string> GetETagAsync(ICloudBlob blob, CancellationToken cancellationToken)
{
try
{

Просмотреть файл

@ -8,11 +8,10 @@ using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Azure.WebJobs.Host.Queues;
using Microsoft.Azure.WebJobs.Host.Queues.Listeners;
using Microsoft.Azure.WebJobs.Host.Storage;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
using Microsoft.Azure.WebJobs.Host.Storage.Queue;
using Microsoft.Azure.WebJobs.Host.Timers;
using Microsoft.Extensions.Logging;
using Microsoft.WindowsAzure.Storage.Blob;
using Microsoft.WindowsAzure.Storage.Queue;
namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
{
@ -24,32 +23,32 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
private readonly JobHostBlobsOptions _blobsOptions;
private readonly IWebJobsExceptionHandler _exceptionHandler;
private readonly IContextSetter<IBlobWrittenWatcher> _blobWrittenWatcherSetter;
private readonly IContextSetter<IMessageEnqueuedWatcher> _messageEnqueuedWatcherSetter;
private readonly SharedQueueWatcher _messageEnqueuedWatcherSetter;
private readonly ISharedContextProvider _sharedContextProvider;
private readonly ILoggerFactory _loggerFactory;
private readonly string _functionId;
private readonly IStorageAccount _hostAccount;
private readonly IStorageAccount _dataAccount;
private readonly IStorageBlobContainer _container;
private readonly XStorageAccount _hostAccount;
private readonly XStorageAccount _dataAccount;
private readonly CloudBlobContainer _container;
private readonly IBlobPathSource _input;
private readonly ITriggeredFunctionExecutor _executor;
private readonly SingletonManager _singletonManager;
private readonly IHostSingletonManager _singletonManager;
public BlobListenerFactory(IHostIdProvider hostIdProvider,
JobHostQueuesOptions queueOptions,
JobHostBlobsOptions blobsOptions,
IWebJobsExceptionHandler exceptionHandler,
IContextSetter<IBlobWrittenWatcher> blobWrittenWatcherSetter,
IContextSetter<IMessageEnqueuedWatcher> messageEnqueuedWatcherSetter,
SharedQueueWatcher messageEnqueuedWatcherSetter,
ISharedContextProvider sharedContextProvider,
ILoggerFactory loggerFactory,
string functionId,
IStorageAccount hostAccount,
IStorageAccount dataAccount,
IStorageBlobContainer container,
XStorageAccount hostAccount,
XStorageAccount dataAccount,
CloudBlobContainer container,
IBlobPathSource input,
ITriggeredFunctionExecutor executor,
SingletonManager singletonManager)
IHostSingletonManager singletonManager)
{
_hostIdProvider = hostIdProvider ?? throw new ArgumentNullException(nameof(hostIdProvider));
_queueOptions = queueOptions ?? throw new ArgumentNullException(nameof(queueOptions));
@ -72,21 +71,20 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
{
// Note that these clients are intentionally for the storage account rather than for the dashboard account.
// We use the storage, not dashboard, account for the blob receipt container and blob trigger queues.
IStorageQueueClient primaryQueueClient = _hostAccount.CreateQueueClient();
IStorageBlobClient primaryBlobClient = _hostAccount.CreateBlobClient();
var primaryQueueClient = _hostAccount.CreateCloudQueueClient();
var primaryBlobClient = _hostAccount.CreateCloudBlobClient();
// Important: We're using the storage account of the function target here, which is the account that the
// function the listener is for is targeting. This is the account that will be used
// to read the trigger blob.
IStorageBlobClient targetBlobClient = _dataAccount.CreateBlobClient();
IStorageQueueClient targetQueueClient = _dataAccount.CreateQueueClient();
var targetBlobClient = _dataAccount.CreateCloudBlobClient();
var targetQueueClient = _dataAccount.CreateCloudQueueClient();
string hostId = await _hostIdProvider.GetHostIdAsync(cancellationToken);
string hostBlobTriggerQueueName = HostQueueNames.GetHostBlobTriggerQueueName(hostId);
IStorageQueue hostBlobTriggerQueue = primaryQueueClient.GetQueueReference(hostBlobTriggerQueueName);
var hostBlobTriggerQueue = primaryQueueClient.GetQueueReference(hostBlobTriggerQueueName);
SharedQueueWatcher sharedQueueWatcher = _sharedContextProvider.GetOrCreateInstance<SharedQueueWatcher>(
new SharedQueueWatcherFactory(_messageEnqueuedWatcherSetter));
SharedQueueWatcher sharedQueueWatcher = _messageEnqueuedWatcherSetter;
SharedBlobListener sharedBlobListener = _sharedContextProvider.GetOrCreateInstance<SharedBlobListener>(
new SharedBlobListenerFactory(hostId, _hostAccount, _exceptionHandler, _blobWrittenWatcherSetter));
@ -106,7 +104,8 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
// by default this should target the same storage account
// as the blob container we're monitoring
var poisonQueueClient = targetQueueClient;
if (_dataAccount.Type != StorageAccountType.GeneralPurpose ||
if (
// _dataAccount.Type != StorageAccountType.GeneralPurpose || $$$
_blobsOptions.CentralizedPoisonQueue)
{
// use the primary storage account if the centralize flag is true,
@ -144,8 +143,8 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
private async Task RegisterWithSharedBlobListenerAsync(
string hostId,
SharedBlobListener sharedBlobListener,
IStorageBlobClient blobClient,
IStorageQueue hostBlobTriggerQueue,
CloudBlobClient blobClient,
CloudQueue hostBlobTriggerQueue,
IMessageEnqueuedWatcher messageEnqueuedWatcher,
CancellationToken cancellationToken)
{
@ -158,8 +157,8 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
private void RegisterWithSharedBlobQueueListenerAsync(
SharedBlobQueueListener sharedBlobQueueListener,
IStorageBlobClient blobClient,
IStorageQueueClient queueClient)
CloudBlobClient blobClient,
CloudQueueClient queueClient)
{
BlobQueueRegistration registration = new BlobQueueRegistration
{

Просмотреть файл

@ -7,7 +7,6 @@ using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
using Microsoft.WindowsAzure.Storage.Blob;
using Microsoft.WindowsAzure.Storage.Shared.Protocol;
@ -22,32 +21,32 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
private const int DefaultScanHoursWindow = 2;
private readonly IStorageBlobClient _blobClient;
private readonly CloudBlobClient _blobClient;
private readonly HashSet<string> _scannedBlobNames = new HashSet<string>();
private readonly StorageAnalyticsLogParser _parser = new StorageAnalyticsLogParser();
private BlobLogListener(IStorageBlobClient blobClient)
private BlobLogListener(CloudBlobClient blobClient)
{
_blobClient = blobClient;
}
public IStorageBlobClient Client
public CloudBlobClient Client
{
get { return _blobClient; }
}
// This will throw if the client credentials are not valid.
public static async Task<BlobLogListener> CreateAsync(IStorageBlobClient blobClient,
public static async Task<BlobLogListener> CreateAsync(CloudBlobClient blobClient,
CancellationToken cancellationToken)
{
await EnableLoggingAsync(blobClient, cancellationToken);
return new BlobLogListener(blobClient);
}
public async Task<IEnumerable<IStorageBlob>> GetRecentBlobWritesAsync(CancellationToken cancellationToken,
public async Task<IEnumerable<ICloudBlob>> GetRecentBlobWritesAsync(CancellationToken cancellationToken,
int hoursWindow = DefaultScanHoursWindow)
{
List<IStorageBlob> blobs = new List<IStorageBlob>();
List<ICloudBlob> blobs = new List<ICloudBlob>();
var time = DateTime.UtcNow; // will scan back 2 hours, which is enough to deal with clock sqew
foreach (var blob in await ListRecentLogFilesAsync(_blobClient, time, cancellationToken, hoursWindow))
@ -69,7 +68,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
foreach (BlobPath path in filteredBlobs)
{
IStorageBlobContainer container = _blobClient.GetContainerReference(path.ContainerName);
var container = _blobClient.GetContainerReference(path.ContainerName);
blobs.Add(container.GetBlockBlobReference(path.BlobName));
}
}
@ -137,12 +136,12 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
// This lets us use prefix scans. $logs/Blob/YYYY/MM/DD/HH00/nnnnnn.log
// Logs are about 6 an hour, so we're only scanning about 12 logs total.
// $$$ If logs are large, we can even have a cache of "already scanned" logs that we skip.
private static async Task<List<IStorageBlob>> ListRecentLogFilesAsync(IStorageBlobClient blobClient,
private static async Task<List<ICloudBlob>> ListRecentLogFilesAsync(CloudBlobClient blobClient,
DateTime startTimeForSearch, CancellationToken cancellationToken, int hoursWindow)
{
string serviceName = "blob";
List<IStorageBlob> selectedLogs = new List<IStorageBlob>();
List<ICloudBlob> selectedLogs = new List<ICloudBlob>();
var lastHour = startTimeForSearch;
for (int i = 0; i < hoursWindow; i++)
@ -157,19 +156,19 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
// Populate the List<> with blob logs for the given prefix.
// http://blogs.msdn.com/b/windowsazurestorage/archive/2011/08/03/windows-azure-storage-logging-using-logs-to-track-storage-requests.aspx
private static async Task GetLogsWithPrefixAsync(List<IStorageBlob> selectedLogs, IStorageBlobClient blobClient,
private static async Task GetLogsWithPrefixAsync(List<ICloudBlob> selectedLogs, CloudBlobClient blobClient,
string prefix, CancellationToken cancellationToken)
{
// List the blobs using the prefix
IEnumerable<IStorageListBlobItem> blobs = await blobClient.ListBlobsAsync(prefix,
IEnumerable<IListBlobItem> blobs = await blobClient.ListBlobsAsync(prefix,
useFlatBlobListing: true, blobListingDetails: BlobListingDetails.Metadata,
cancellationToken: cancellationToken);
// iterate through each blob and figure the start and end times in the metadata
// Type cast to IStorageBlob is safe due to useFlatBlobListing: true above.
foreach (IStorageBlob item in blobs)
foreach (var item in blobs)
{
IStorageBlob log = item as IStorageBlob;
ICloudBlob log = item as ICloudBlob;
if (log != null)
{
// we will exclude the file if the file does not have log entries in the interested time range.
@ -184,7 +183,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
}
}
public static async Task EnableLoggingAsync(IStorageBlobClient blobClient, CancellationToken cancellationToken)
public static async Task EnableLoggingAsync(CloudBlobClient blobClient, CancellationToken cancellationToken)
{
ServiceProperties serviceProperties = await blobClient.GetServicePropertiesAsync(cancellationToken);

Просмотреть файл

@ -2,8 +2,8 @@
// Licensed under the MIT License. See License.txt in the project root for license information.
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
using Microsoft.Azure.WebJobs.Host.Storage.Queue;
using Microsoft.WindowsAzure.Storage.Blob;
using Microsoft.WindowsAzure.Storage.Queue;
namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
{
@ -23,11 +23,11 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
/// storage account that the blob triggered function is listening
/// to).
/// </summary>
public IStorageBlobClient BlobClient { get; set; }
public CloudBlobClient BlobClient { get; set; }
/// <summary>
/// The storage client to use for the poison queue.
/// </summary>
public IStorageQueueClient QueueClient { get; set; }
public CloudQueueClient QueueClient { get; set; }
}
}

Просмотреть файл

@ -8,13 +8,14 @@ using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Azure.WebJobs.Host.Protocols;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
using Microsoft.Azure.WebJobs.Host.Storage.Queue;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using Microsoft.WindowsAzure.Storage.Queue;
using Newtonsoft.Json;
namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
{
internal class BlobQueueTriggerExecutor : ITriggerExecutor<IStorageQueueMessage>
internal class BlobQueueTriggerExecutor : ITriggerExecutor<CloudQueueMessage>
{
private readonly IBlobETagReader _eTagReader;
private readonly IBlobCausalityReader _causalityReader;
@ -45,7 +46,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
_registrations.AddOrUpdate(functionId, registration, (i1, i2) => registration);
}
public async Task<FunctionResult> ExecuteAsync(IStorageQueueMessage value, CancellationToken cancellationToken)
public async Task<FunctionResult> ExecuteAsync(CloudQueueMessage value, CancellationToken cancellationToken)
{
BlobTriggerMessage message = JsonConvert.DeserializeObject<BlobTriggerMessage>(value.AsString, JsonSerialization.Settings);
@ -69,33 +70,23 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
return successResult;
}
IStorageBlobContainer container = registration.BlobClient.GetContainerReference(message.ContainerName);
var container = registration.BlobClient.GetContainerReference(message.ContainerName);
string blobName = message.BlobName;
IStorageBlob blob;
ICloudBlob blob;
switch (message.BlobType)
try
{
case StorageBlobType.PageBlob:
blob = container.GetPageBlobReference(blobName);
break;
case StorageBlobType.AppendBlob:
blob = container.GetAppendBlobReference(blobName);
break;
case StorageBlobType.BlockBlob:
default:
blob = container.GetBlockBlobReference(blobName);
break;
blob = await container.GetBlobReferenceFromServerAsync(blobName);
}
catch (StorageException exception) when (exception.IsNotFound() || exception.IsOk())
{
// If the blob no longer exists, just ignore this message.
return successResult;
}
// Ensure the blob still exists with the same ETag.
string possibleETag = await _eTagReader.GetETagAsync(blob, cancellationToken);
if (possibleETag == null)
{
// If the blob no longer exists, just ignore this message.
return successResult;
}
string possibleETag = blob.Properties.ETag; // set since we fetched from server
// If the blob still exists but the ETag is different, delete the message but do a fast path notification.
if (!String.Equals(message.ETag, possibleETag, StringComparison.Ordinal))

Просмотреть файл

@ -6,8 +6,6 @@ using System.Diagnostics;
using System.Globalization;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Storage;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
@ -17,20 +15,20 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
{
private static readonly TimeSpan LeasePeriod = TimeSpan.FromSeconds(30);
private readonly IStorageBlobDirectory _directory;
private readonly CloudBlobDirectory _directory;
public BlobReceiptManager(IStorageBlobClient client)
public BlobReceiptManager(CloudBlobClient client)
: this(client.GetContainerReference(HostContainerNames.Hosts)
.GetDirectoryReference(HostDirectoryNames.BlobReceipts))
{
}
private BlobReceiptManager(IStorageBlobDirectory directory)
private BlobReceiptManager(CloudBlobDirectory directory)
{
_directory = directory;
}
public IStorageBlockBlob CreateReference(string hostId, string functionId, string containerName,
public CloudBlockBlob CreateReference(string hostId, string functionId, string containerName,
string blobName, string eTag)
{
// Put the ETag before the blob name to prevent ambiguity since the blob name can contain embedded slashes.
@ -39,7 +37,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
return _directory.GetBlockBlobReference(receiptName);
}
public async Task<BlobReceipt> TryReadAsync(IStorageBlockBlob blob, CancellationToken cancellationToken)
public async Task<BlobReceipt> TryReadAsync(CloudBlockBlob blob, CancellationToken cancellationToken)
{
if (!await blob.TryFetchAttributesAsync(cancellationToken))
{
@ -49,7 +47,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
return BlobReceipt.FromMetadata(blob.Metadata);
}
public async Task<bool> TryCreateAsync(IStorageBlockBlob blob, CancellationToken cancellationToken)
public async Task<bool> TryCreateAsync(CloudBlockBlob blob, CancellationToken cancellationToken)
{
BlobReceipt.Incomplete.ToMetadata(blob.Metadata);
AccessCondition accessCondition = new AccessCondition { IfNoneMatchETag = "*" };
@ -115,7 +113,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
}
}
public async Task<string> TryAcquireLeaseAsync(IStorageBlockBlob blob, CancellationToken cancellationToken)
public async Task<string> TryAcquireLeaseAsync(CloudBlockBlob blob, CancellationToken cancellationToken)
{
try
{
@ -139,7 +137,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
}
}
public async Task MarkCompletedAsync(IStorageBlockBlob blob, string leaseId,
public async Task MarkCompletedAsync(CloudBlockBlob blob, string leaseId,
CancellationToken cancellationToken)
{
BlobReceipt.Complete.ToMetadata(blob.Metadata);
@ -169,7 +167,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
}
}
public async Task ReleaseLeaseAsync(IStorageBlockBlob blob, string leaseId, CancellationToken cancellationToken)
public async Task ReleaseLeaseAsync(CloudBlockBlob blob, string leaseId, CancellationToken cancellationToken)
{
try
{

Просмотреть файл

@ -9,11 +9,11 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
using Microsoft.WindowsAzure.Storage.Blob;
namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
{
internal class BlobTriggerExecutor : ITriggerExecutor<IStorageBlob>
internal class BlobTriggerExecutor : ITriggerExecutor<ICloudBlob>
{
private readonly string _hostId;
private readonly string _functionId;
@ -33,7 +33,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
_receiptManager = receiptManager;
}
public async Task<FunctionResult> ExecuteAsync(IStorageBlob value, CancellationToken cancellationToken)
public async Task<FunctionResult> ExecuteAsync(ICloudBlob value, CancellationToken cancellationToken)
{
// Avoid unnecessary network calls for non-matches. First, check to see if the blob matches this trigger.
IReadOnlyDictionary<string, object> bindingData = _input.CreateBindingData(value.ToBlobPath());
@ -53,7 +53,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
return new FunctionResult(true);
}
IStorageBlockBlob receiptBlob = _receiptManager.CreateReference(_hostId, _functionId, value.Container.Name,
var receiptBlob = _receiptManager.CreateReference(_hostId, _functionId, value.Container.Name,
value.Name, possibleETag);
// Check for the completed receipt. If it's already there, noop.

Просмотреть файл

@ -2,7 +2,7 @@
// Licensed under the MIT License. See License.txt in the project root for license information.
using System.Diagnostics.CodeAnalysis;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
using Microsoft.WindowsAzure.Storage.Blob;
using Newtonsoft.Json;
using Newtonsoft.Json.Converters;
@ -22,7 +22,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
public string FunctionId { get; set; }
[JsonConverter(typeof(StringEnumConverter))]
public StorageBlobType BlobType { get; set; }
public BlobType BlobType { get; set; }
public string ContainerName { get; set; }

Просмотреть файл

@ -6,17 +6,18 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Protocols;
using Microsoft.Azure.WebJobs.Host.Queues;
using Microsoft.Azure.WebJobs.Host.Storage.Queue;
using Microsoft.WindowsAzure.Storage.Queue;
using Newtonsoft.Json;
using WebJobs.Extension.Storage;
namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
{
internal class BlobTriggerQueueWriter : IBlobTriggerQueueWriter
{
private readonly IStorageQueue _queue;
private readonly CloudQueue _queue;
private readonly IMessageEnqueuedWatcher _watcher;
public BlobTriggerQueueWriter(IStorageQueue queue, IMessageEnqueuedWatcher watcher)
public BlobTriggerQueueWriter(CloudQueue queue, IMessageEnqueuedWatcher watcher)
{
_queue = queue;
Debug.Assert(watcher != null);
@ -26,7 +27,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
public async Task EnqueueAsync(BlobTriggerMessage message, CancellationToken cancellationToken)
{
string contents = JsonConvert.SerializeObject(message, JsonSerialization.Settings);
await _queue.AddMessageAndCreateIfNotExistsAsync(_queue.CreateMessage(contents), cancellationToken);
await _queue.AddMessageAndCreateIfNotExistsAsync(new CloudQueueMessage(contents), cancellationToken);
_watcher.Notify(_queue.Name);
}
}

Просмотреть файл

@ -4,14 +4,13 @@
using System;
using System.Collections.Generic;
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
using Microsoft.WindowsAzure.Storage.Blob;
namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
{
internal class ContainerScanInfo
{
public ICollection<ITriggerExecutor<IStorageBlob>> Registrations { get; set; }
public ICollection<ITriggerExecutor<ICloudBlob>> Registrations { get; set; }
public DateTime LastSweepCycleLatestModified { get; set; }

Просмотреть файл

@ -3,12 +3,12 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
using Microsoft.WindowsAzure.Storage.Blob;
namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
{
internal interface IBlobETagReader
{
Task<string> GetETagAsync(IStorageBlob blob, CancellationToken cancellationToken);
Task<string> GetETagAsync(ICloudBlob blob, CancellationToken cancellationToken);
}
}

Просмотреть файл

@ -4,14 +4,14 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
using Microsoft.Azure.WebJobs.Host.Timers;
using Microsoft.WindowsAzure.Storage.Blob;
namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
{
internal interface IBlobNotificationStrategy : ITaskSeriesCommand, IBlobWrittenWatcher
{
Task RegisterAsync(IStorageBlobContainer container, ITriggerExecutor<IStorageBlob> triggerExecutor,
Task RegisterAsync(CloudBlobContainer container, ITriggerExecutor<ICloudBlob> triggerExecutor,
CancellationToken cancellationToken);
}
}

Просмотреть файл

@ -0,0 +1,25 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using Microsoft.WindowsAzure.Storage.Blob;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
{
internal interface IBlobReceiptManager
{
CloudBlockBlob CreateReference(string hostId, string functionId, string containerName, string blobName,
string eTag);
Task<BlobReceipt> TryReadAsync(CloudBlockBlob blob, CancellationToken cancellationToken);
Task<bool> TryCreateAsync(CloudBlockBlob blob, CancellationToken cancellationToken);
Task<string> TryAcquireLeaseAsync(CloudBlockBlob blob, CancellationToken cancellationToken);
Task MarkCompletedAsync(CloudBlockBlob blob, string leaseId, CancellationToken cancellationToken);
Task ReleaseLeaseAsync(CloudBlockBlob blob, string leaseId, CancellationToken cancellationToken);
}
}

Просмотреть файл

@ -9,10 +9,9 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Azure.WebJobs.Host.Storage;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
using Microsoft.Azure.WebJobs.Host.Timers;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
{
@ -21,26 +20,26 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
{
private static readonly TimeSpan TwoSeconds = TimeSpan.FromSeconds(2);
private readonly IDictionary<IStorageBlobContainer, ICollection<ITriggerExecutor<IStorageBlob>>> _registrations;
private readonly IDictionary<IStorageBlobClient, BlobLogListener> _logListeners;
private readonly IDictionary<CloudBlobContainer, ICollection<ITriggerExecutor<ICloudBlob>>> _registrations;
private readonly IDictionary<CloudBlobClient, BlobLogListener> _logListeners;
private readonly Thread _initialScanThread;
private readonly ConcurrentQueue<IStorageBlob> _blobsFoundFromScanOrNotification;
private readonly ConcurrentQueue<ICloudBlob> _blobsFoundFromScanOrNotification;
private readonly CancellationTokenSource _cancellationTokenSource;
private bool _performInitialScan;
private bool _disposed;
public PollLogsStrategy(bool performInitialScan = true)
{
_registrations = new Dictionary<IStorageBlobContainer, ICollection<ITriggerExecutor<IStorageBlob>>>(
_registrations = new Dictionary<CloudBlobContainer, ICollection<ITriggerExecutor<ICloudBlob>>>(
new StorageBlobContainerComparer());
_logListeners = new Dictionary<IStorageBlobClient, BlobLogListener>(new StorageBlobClientComparer());
_logListeners = new Dictionary<CloudBlobClient, BlobLogListener>(new StorageBlobClientComparer());
_initialScanThread = new Thread(ScanContainers);
_blobsFoundFromScanOrNotification = new ConcurrentQueue<IStorageBlob>();
_blobsFoundFromScanOrNotification = new ConcurrentQueue<ICloudBlob>();
_cancellationTokenSource = new CancellationTokenSource();
_performInitialScan = performInitialScan;
}
public async Task RegisterAsync(IStorageBlobContainer container, ITriggerExecutor<IStorageBlob> triggerExecutor,
public async Task RegisterAsync(CloudBlobContainer container, ITriggerExecutor<ICloudBlob> triggerExecutor,
CancellationToken cancellationToken)
{
ThrowIfDisposed();
@ -53,7 +52,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
throw new InvalidOperationException("All registrations must be created before execution begins.");
}
ICollection<ITriggerExecutor<IStorageBlob>> containerRegistrations;
ICollection<ITriggerExecutor<ICloudBlob>> containerRegistrations;
if (_registrations.ContainsKey(container))
{
@ -61,13 +60,13 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
}
else
{
containerRegistrations = new List<ITriggerExecutor<IStorageBlob>>();
containerRegistrations = new List<ITriggerExecutor<ICloudBlob>>();
_registrations.Add(container, containerRegistrations);
}
containerRegistrations.Add(triggerExecutor);
IStorageBlobClient client = container.ServiceClient;
CloudBlobClient client = container.ServiceClient;
if (!_logListeners.ContainsKey(client))
{
@ -76,7 +75,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
}
}
public void Notify(IStorageBlob blobWritten)
public void Notify(ICloudBlob blobWritten)
{
ThrowIfDisposed();
_blobsFoundFromScanOrNotification.Enqueue(blobWritten);
@ -90,7 +89,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
while (true)
{
cancellationToken.ThrowIfCancellationRequested();
IStorageBlob blob;
ICloudBlob blob;
if (!_blobsFoundFromScanOrNotification.TryDequeue(out blob))
{
@ -105,7 +104,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
{
cancellationToken.ThrowIfCancellationRequested();
foreach (IStorageBlob blob in await logListener.GetRecentBlobWritesAsync(cancellationToken))
foreach (ICloudBlob blob in await logListener.GetRecentBlobWritesAsync(cancellationToken))
{
cancellationToken.ThrowIfCancellationRequested();
await NotifyRegistrationsAsync(blob, cancellationToken);
@ -143,9 +142,9 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
}
}
private async Task NotifyRegistrationsAsync(IStorageBlob blob, CancellationToken cancellationToken)
private async Task NotifyRegistrationsAsync(ICloudBlob blob, CancellationToken cancellationToken)
{
IStorageBlobContainer container = blob.Container;
CloudBlobContainer container = blob.Container;
// Log listening is client-wide and blob written notifications are host-wide, so filter out things that
// aren't in the container list.
@ -154,7 +153,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
return;
}
foreach (ITriggerExecutor<IStorageBlob> registration in _registrations[container])
foreach (ITriggerExecutor<ICloudBlob> registration in _registrations[container])
{
cancellationToken.ThrowIfCancellationRequested();
@ -171,14 +170,14 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
{
CancellationToken cancellationToken = (CancellationToken)state;
foreach (IStorageBlobContainer container in _registrations.Keys)
foreach (CloudBlobContainer container in _registrations.Keys)
{
if (cancellationToken.IsCancellationRequested)
{
return;
}
List<IStorageListBlobItem> items;
List<IListBlobItem> items;
try
{
@ -200,7 +199,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
}
// Type cast to IStorageBlob is safe due to useFlatBlobListing: true above.
foreach (IStorageBlob item in items)
foreach (ICloudBlob item in items)
{
if (cancellationToken.IsCancellationRequested)
{

Просмотреть файл

@ -9,8 +9,6 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Azure.WebJobs.Host.Storage;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
using Microsoft.Azure.WebJobs.Host.Timers;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
@ -20,8 +18,8 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
internal sealed class ScanBlobScanLogHybridPollingStrategy : IBlobListenerStrategy
{
private static readonly TimeSpan PollingInterval = TimeSpan.FromSeconds(10);
private readonly IDictionary<IStorageBlobContainer, ContainerScanInfo> _scanInfo;
private readonly ConcurrentQueue<IStorageBlob> _blobsFoundFromScanOrNotification;
private readonly IDictionary<CloudBlobContainer, ContainerScanInfo> _scanInfo;
private readonly ConcurrentQueue<ICloudBlob> _blobsFoundFromScanOrNotification;
private readonly CancellationTokenSource _cancellationTokenSource;
private IBlobScanInfoManager _blobScanInfoManager;
// A budget is allocated representing the number of blobs to be listed in a polling
@ -34,10 +32,10 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
public ScanBlobScanLogHybridPollingStrategy(IBlobScanInfoManager blobScanInfoManager) : base()
{
_blobScanInfoManager = blobScanInfoManager;
_scanInfo = new Dictionary<IStorageBlobContainer, ContainerScanInfo>(new StorageBlobContainerComparer());
_scanInfo = new Dictionary<CloudBlobContainer, ContainerScanInfo>(new StorageBlobContainerComparer());
_pollLogStrategy = new PollLogsStrategy(performInitialScan: false);
_cancellationTokenSource = new CancellationTokenSource();
_blobsFoundFromScanOrNotification = new ConcurrentQueue<IStorageBlob>();
_blobsFoundFromScanOrNotification = new ConcurrentQueue<ICloudBlob>();
}
public void Start()
@ -53,7 +51,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
_cancellationTokenSource.Cancel();
}
public async Task RegisterAsync(IStorageBlobContainer container, ITriggerExecutor<IStorageBlob> triggerExecutor, CancellationToken cancellationToken)
public async Task RegisterAsync(CloudBlobContainer container, ITriggerExecutor<ICloudBlob> triggerExecutor, CancellationToken cancellationToken)
{
// Register and Execute are not concurrency-safe.
// Avoiding calling Register while Execute is running is the caller's responsibility.
@ -70,7 +68,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
containerScanInfo = new ContainerScanInfo()
{
Registrations = new List<ITriggerExecutor<IStorageBlob>>(),
Registrations = new List<ITriggerExecutor<ICloudBlob>>(),
LastSweepCycleLatestModified = latestStoredScan ?? DateTime.MinValue,
CurrentSweepCycleLatestModified = DateTime.MinValue,
ContinuationToken = null
@ -87,14 +85,14 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
ThrowIfDisposed();
Task logPollingTask = _pollLogStrategy.ExecuteAsync(cancellationToken);
List<IStorageBlob> failedNotifications = new List<IStorageBlob>();
List<ICloudBlob> failedNotifications = new List<ICloudBlob>();
List<Task> notifications = new List<Task>();
// Drain the background queue of blob written notifications.
while (true)
{
cancellationToken.ThrowIfCancellationRequested();
IStorageBlob blob;
ICloudBlob blob;
if (!_blobsFoundFromScanOrNotification.TryDequeue(out blob))
{
@ -108,13 +106,13 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
List<Task> pollingTasks = new List<Task>();
pollingTasks.Add(logPollingTask);
foreach (KeyValuePair<IStorageBlobContainer, ContainerScanInfo> containerScanInfoPair in _scanInfo)
foreach (KeyValuePair<CloudBlobContainer, ContainerScanInfo> containerScanInfoPair in _scanInfo)
{
pollingTasks.Add(PollAndNotify(containerScanInfoPair.Key, containerScanInfoPair.Value, cancellationToken, failedNotifications));
}
// Re-add any failed notifications for the next iteration.
foreach (IStorageBlob failedNotification in failedNotifications)
foreach (var failedNotification in failedNotifications)
{
_blobsFoundFromScanOrNotification.Enqueue(failedNotification);
}
@ -125,13 +123,13 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
return new TaskSeriesCommandResult(wait: Task.Delay(PollingInterval));
}
private async Task PollAndNotify(IStorageBlobContainer container, ContainerScanInfo containerScanInfo, CancellationToken cancellationToken, List<IStorageBlob> failedNotifications)
private async Task PollAndNotify(CloudBlobContainer container, ContainerScanInfo containerScanInfo, CancellationToken cancellationToken, List<ICloudBlob> failedNotifications)
{
cancellationToken.ThrowIfCancellationRequested();
DateTime lastScan = containerScanInfo.LastSweepCycleLatestModified;
IEnumerable<IStorageBlob> newBlobs = await PollNewBlobsAsync(container, containerScanInfo, cancellationToken);
IEnumerable<ICloudBlob> newBlobs = await PollNewBlobsAsync(container, containerScanInfo, cancellationToken);
foreach (IStorageBlob newBlob in newBlobs)
foreach (var newBlob in newBlobs)
{
cancellationToken.ThrowIfCancellationRequested();
await NotifyRegistrationsAsync(newBlob, failedNotifications, cancellationToken);
@ -157,7 +155,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
}
}
public void Notify(IStorageBlob blobWritten)
public void Notify(ICloudBlob blobWritten)
{
ThrowIfDisposed();
_blobsFoundFromScanOrNotification.Enqueue(blobWritten);
@ -193,11 +191,11 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
/// the continuation token and the current cycle start for a container</param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task<IEnumerable<IStorageBlob>> PollNewBlobsAsync(
IStorageBlobContainer container, ContainerScanInfo containerScanInfo, CancellationToken cancellationToken)
public async Task<IEnumerable<ICloudBlob>> PollNewBlobsAsync(
CloudBlobContainer container, ContainerScanInfo containerScanInfo, CancellationToken cancellationToken)
{
IEnumerable<IStorageListBlobItem> currentBlobs;
IStorageBlobResultSegment blobSegment;
IEnumerable<IListBlobItem> currentBlobs;
BlobResultSegment blobSegment;
int blobPollLimitPerContainer = _scanBlobLimitPerPoll / _scanInfo.Count;
BlobContinuationToken continuationToken = containerScanInfo.ContinuationToken;
@ -217,7 +215,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
{
if (exception.IsNotFound())
{
return Enumerable.Empty<IStorageBlob>();
return Enumerable.Empty<ICloudBlob>();
}
else
{
@ -225,14 +223,14 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
}
}
List<IStorageBlob> newBlobs = new List<IStorageBlob>();
List<ICloudBlob> newBlobs = new List<ICloudBlob>();
// Type cast to IStorageBlob is safe due to useFlatBlobListing: true above.
foreach (IStorageBlob currentBlob in currentBlobs)
foreach (ICloudBlob currentBlob in currentBlobs)
{
cancellationToken.ThrowIfCancellationRequested();
IStorageBlobProperties properties = currentBlob.Properties;
var properties = currentBlob.Properties;
DateTime lastModifiedTimestamp = properties.LastModified.Value.UtcDateTime;
if (lastModifiedTimestamp > containerScanInfo.CurrentSweepCycleLatestModified)
@ -259,9 +257,9 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
return newBlobs;
}
private async Task NotifyRegistrationsAsync(IStorageBlob blob, ICollection<IStorageBlob> failedNotifications, CancellationToken cancellationToken)
private async Task NotifyRegistrationsAsync(ICloudBlob blob, ICollection<ICloudBlob> failedNotifications, CancellationToken cancellationToken)
{
IStorageBlobContainer container = blob.Container;
CloudBlobContainer container = blob.Container;
ContainerScanInfo containerScanInfo;
// Blob written notifications are host-wide, so filter out things that aren't in the container list.
@ -270,7 +268,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
return;
}
foreach (ITriggerExecutor<IStorageBlob> registration in containerScanInfo.Registrations)
foreach (ITriggerExecutor<ICloudBlob> registration in containerScanInfo.Registrations)
{
cancellationToken.ThrowIfCancellationRequested();

Просмотреть файл

@ -9,8 +9,6 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Azure.WebJobs.Host.Storage;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
using Microsoft.Azure.WebJobs.Host.Timers;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
@ -21,30 +19,30 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
{
private static readonly TimeSpan TwoSeconds = TimeSpan.FromSeconds(2);
private readonly IDictionary<IStorageBlobContainer, ICollection<ITriggerExecutor<IStorageBlob>>> _registrations;
private readonly IDictionary<IStorageBlobContainer, DateTime> _lastModifiedTimestamps;
private readonly ConcurrentQueue<IStorageBlob> _blobWrittenNotifications;
private readonly IDictionary<CloudBlobContainer, ICollection<ITriggerExecutor<ICloudBlob>>> _registrations;
private readonly IDictionary<CloudBlobContainer, DateTime> _lastModifiedTimestamps;
private readonly ConcurrentQueue<ICloudBlob> _blobWrittenNotifications;
public ScanContainersStrategy()
{
_registrations = new Dictionary<IStorageBlobContainer, ICollection<ITriggerExecutor<IStorageBlob>>>(
_registrations = new Dictionary<CloudBlobContainer, ICollection<ITriggerExecutor<ICloudBlob>>>(
new StorageBlobContainerComparer());
_lastModifiedTimestamps = new Dictionary<IStorageBlobContainer, DateTime>(
_lastModifiedTimestamps = new Dictionary<CloudBlobContainer, DateTime>(
new StorageBlobContainerComparer());
_blobWrittenNotifications = new ConcurrentQueue<IStorageBlob>();
_blobWrittenNotifications = new ConcurrentQueue<ICloudBlob>();
}
public void Notify(IStorageBlob blobWritten)
public void Notify(ICloudBlob blobWritten)
{
_blobWrittenNotifications.Enqueue(blobWritten);
}
public Task RegisterAsync(IStorageBlobContainer container, ITriggerExecutor<IStorageBlob> triggerExecutor,
public Task RegisterAsync(CloudBlobContainer container, ITriggerExecutor<ICloudBlob> triggerExecutor,
CancellationToken cancellationToken)
{
// Register and Execute are not concurrency-safe.
// Avoiding calling Register while Execute is running is the caller's responsibility.
ICollection<ITriggerExecutor<IStorageBlob>> containerRegistrations;
ICollection<ITriggerExecutor<ICloudBlob>> containerRegistrations;
if (_registrations.ContainsKey(container))
{
@ -52,7 +50,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
}
else
{
containerRegistrations = new List<ITriggerExecutor<IStorageBlob>>();
containerRegistrations = new List<ITriggerExecutor<ICloudBlob>>();
_registrations.Add(container, containerRegistrations);
}
@ -68,13 +66,13 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
public async Task<TaskSeriesCommandResult> ExecuteAsync(CancellationToken cancellationToken)
{
List<IStorageBlob> failedNotifications = new List<IStorageBlob>();
List<ICloudBlob> failedNotifications = new List<ICloudBlob>();
// Drain the background queue of blob written notifications.
while (true)
{
cancellationToken.ThrowIfCancellationRequested();
IStorageBlob blob;
ICloudBlob blob;
if (!_blobWrittenNotifications.TryDequeue(out blob))
{
@ -84,16 +82,16 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
await NotifyRegistrationsAsync(blob, failedNotifications, cancellationToken);
}
foreach (IStorageBlobContainer container in _registrations.Keys)
foreach (CloudBlobContainer container in _registrations.Keys)
{
cancellationToken.ThrowIfCancellationRequested();
DateTime lastScanTimestamp = _lastModifiedTimestamps[container];
Tuple<IEnumerable<IStorageBlob>, DateTime> newBlobsResult = await PollNewBlobsAsync(container,
Tuple<IEnumerable<ICloudBlob>, DateTime> newBlobsResult = await PollNewBlobsAsync(container,
lastScanTimestamp, cancellationToken);
IEnumerable<IStorageBlob> newBlobs = newBlobsResult.Item1;
IEnumerable<ICloudBlob> newBlobs = newBlobsResult.Item1;
_lastModifiedTimestamps[container] = newBlobsResult.Item2;
foreach (IStorageBlob newBlob in newBlobs)
foreach (ICloudBlob newBlob in newBlobs)
{
cancellationToken.ThrowIfCancellationRequested();
await NotifyRegistrationsAsync(newBlob, failedNotifications, cancellationToken);
@ -101,7 +99,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
}
// Re-add any failed notifications for the next iteration.
foreach (IStorageBlob failedNotification in failedNotifications)
foreach (ICloudBlob failedNotification in failedNotifications)
{
_blobWrittenNotifications.Enqueue(failedNotification);
}
@ -122,10 +120,10 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
{
}
private async Task NotifyRegistrationsAsync(IStorageBlob blob, ICollection<IStorageBlob> failedNotifications,
private async Task NotifyRegistrationsAsync(ICloudBlob blob, ICollection<ICloudBlob> failedNotifications,
CancellationToken cancellationToken)
{
IStorageBlobContainer container = blob.Container;
CloudBlobContainer container = blob.Container;
// Blob written notifications are host-wide, so filter out things that aren't in the container list.
if (!_registrations.ContainsKey(container))
@ -133,7 +131,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
return;
}
foreach (ITriggerExecutor<IStorageBlob> registration in _registrations[container])
foreach (ITriggerExecutor<ICloudBlob> registration in _registrations[container])
{
cancellationToken.ThrowIfCancellationRequested();
@ -146,12 +144,12 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
}
}
public static async Task<Tuple<IEnumerable<IStorageBlob>, DateTime>> PollNewBlobsAsync(
IStorageBlobContainer container, DateTime previousTimestamp, CancellationToken cancellationToken)
public static async Task<Tuple<IEnumerable<ICloudBlob>, DateTime>> PollNewBlobsAsync(
CloudBlobContainer container, DateTime previousTimestamp, CancellationToken cancellationToken)
{
DateTime updatedTimestamp = previousTimestamp;
IList<IStorageListBlobItem> currentBlobs;
IList<IListBlobItem> currentBlobs;
try
{
@ -162,8 +160,8 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
{
if (exception.IsNotFound())
{
return new Tuple<IEnumerable<IStorageBlob>, DateTime>(
Enumerable.Empty<IStorageBlob>(), updatedTimestamp);
return new Tuple<IEnumerable<ICloudBlob>, DateTime>(
Enumerable.Empty<ICloudBlob>(), updatedTimestamp);
}
else
{
@ -171,14 +169,14 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
}
}
List<IStorageBlob> newBlobs = new List<IStorageBlob>();
List<ICloudBlob> newBlobs = new List<ICloudBlob>();
// Type cast to IStorageBlob is safe due to useFlatBlobListing: true above.
foreach (IStorageBlob currentBlob in currentBlobs)
foreach (ICloudBlob currentBlob in currentBlobs)
{
cancellationToken.ThrowIfCancellationRequested();
IStorageBlobProperties properties = currentBlob.Properties;
var properties = currentBlob.Properties;
DateTime lastModifiedTimestamp = properties.LastModified.Value.UtcDateTime;
if (lastModifiedTimestamp > updatedTimestamp)
@ -192,7 +190,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
}
}
return new Tuple<IEnumerable<IStorageBlob>, DateTime>(newBlobs, updatedTimestamp);
return new Tuple<IEnumerable<ICloudBlob>, DateTime>(newBlobs, updatedTimestamp);
}
}
}

Просмотреть файл

@ -5,9 +5,8 @@ using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Azure.WebJobs.Host.Storage;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
using Microsoft.Azure.WebJobs.Host.Timers;
using Microsoft.WindowsAzure.Storage.Blob;
namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
{
@ -19,7 +18,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
private bool _started;
private bool _disposed;
public SharedBlobListener(string hostId, IStorageAccount storageAccount,
public SharedBlobListener(string hostId, XStorageAccount storageAccount,
IWebJobsExceptionHandler exceptionHandler)
{
_strategy = CreateStrategy(hostId, storageAccount);
@ -32,7 +31,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
get { return _strategy; }
}
public Task RegisterAsync(IStorageBlobContainer container, ITriggerExecutor<IStorageBlob> triggerExecutor,
public Task RegisterAsync(CloudBlobContainer container, ITriggerExecutor<ICloudBlob> triggerExecutor,
CancellationToken cancellationToken)
{
if (_started)
@ -87,11 +86,11 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
}
}
private static IBlobListenerStrategy CreateStrategy(string hostId, IStorageAccount account)
private static IBlobListenerStrategy CreateStrategy(string hostId, XStorageAccount account)
{
if (!StorageClient.IsDevelopmentStorageAccount(account))
if (!account.IsDevelopmentStorageAccount())
{
IBlobScanInfoManager scanInfoManager = new StorageBlobScanInfoManager(hostId, account.CreateBlobClient());
IBlobScanInfoManager scanInfoManager = new StorageBlobScanInfoManager(hostId, account.CreateCloudBlobClient());
return new ScanBlobScanLogHybridPollingStrategy(scanInfoManager);
}
else

Просмотреть файл

@ -3,19 +3,18 @@
using System;
using System.Diagnostics.CodeAnalysis;
using Microsoft.Azure.WebJobs.Host.Storage;
using Microsoft.Azure.WebJobs.Host.Timers;
namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
{
internal class SharedBlobListenerFactory : IFactory<SharedBlobListener>
{
private readonly IStorageAccount _account;
private readonly XStorageAccount _account;
private readonly IWebJobsExceptionHandler _exceptionHandler;
private readonly IContextSetter<IBlobWrittenWatcher> _blobWrittenWatcherSetter;
private readonly string _hostId;
public SharedBlobListenerFactory(string hostId, IStorageAccount account,
public SharedBlobListenerFactory(string hostId, XStorageAccount account,
IWebJobsExceptionHandler exceptionHandler,
IContextSetter<IBlobWrittenWatcher> blobWrittenWatcherSetter)
{

Просмотреть файл

@ -7,8 +7,6 @@ using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Azure.WebJobs.Host.Queues;
using Microsoft.Azure.WebJobs.Host.Queues.Listeners;
using Microsoft.Azure.WebJobs.Host.Storage;
using Microsoft.Azure.WebJobs.Host.Storage.Queue;
using Microsoft.Azure.WebJobs.Host.Timers;
using Microsoft.Extensions.Logging;
using Microsoft.WindowsAzure.Storage.Queue;
@ -19,17 +17,17 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
internal class SharedBlobQueueListenerFactory : IFactory<SharedBlobQueueListener>
{
private readonly SharedQueueWatcher _sharedQueueWatcher;
private readonly IStorageQueue _hostBlobTriggerQueue;
private readonly CloudQueue _hostBlobTriggerQueue;
private readonly JobHostQueuesOptions _queueOptions;
private readonly IWebJobsExceptionHandler _exceptionHandler;
private readonly IBlobWrittenWatcher _blobWrittenWatcher;
private readonly IStorageAccount _hostAccount;
private readonly XStorageAccount _hostAccount;
private readonly ILoggerFactory _loggerFactory;
public SharedBlobQueueListenerFactory(
IStorageAccount hostAccount,
XStorageAccount hostAccount,
SharedQueueWatcher sharedQueueWatcher,
IStorageQueue hostBlobTriggerQueue,
CloudQueue hostBlobTriggerQueue,
JobHostQueuesOptions queueOptions,
IWebJobsExceptionHandler exceptionHandler,
ILoggerFactory loggerFactory,
@ -55,12 +53,12 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
// the triggering blob.
// However we use a poison queue in the host storage account as a fallback default
// in case a particular blob lives in a restricted "blob only" storage account (i.e. no queues).
IStorageQueue defaultPoisonQueue = _hostAccount.CreateQueueClient().GetQueueReference(HostQueueNames.BlobTriggerPoisonQueue);
var defaultPoisonQueue = _hostAccount.CreateCloudQueueClient().GetQueueReference(HostQueueNames.BlobTriggerPoisonQueue);
// this special queue bypasses the QueueProcessorFactory - we don't want people to
// override this
QueueProcessorFactoryContext context = new QueueProcessorFactoryContext(_hostBlobTriggerQueue.SdkObject, _loggerFactory,
_queueOptions, defaultPoisonQueue.SdkObject);
QueueProcessorFactoryContext context = new QueueProcessorFactoryContext(_hostBlobTriggerQueue, _loggerFactory,
_queueOptions, defaultPoisonQueue);
SharedBlobQueueProcessor queueProcessor = new SharedBlobQueueProcessor(context, triggerExecutor);
IListener listener = new QueueListener(_hostBlobTriggerQueue, defaultPoisonQueue, triggerExecutor, _exceptionHandler, _loggerFactory,
@ -102,8 +100,8 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
BlobQueueRegistration registration = null;
if (_executor.TryGetRegistration(blobTriggerMessage.FunctionId, out registration))
{
IStorageQueue poisonQueue = registration.QueueClient.GetQueueReference(HostQueueNames.BlobTriggerPoisonQueue);
return poisonQueue.SdkObject;
var poisonQueue = registration.QueueClient.GetQueueReference(HostQueueNames.BlobTriggerPoisonQueue);
return poisonQueue;
}
return null;

Просмотреть файл

@ -11,7 +11,7 @@ using System.Net;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
using Microsoft.WindowsAzure.Storage.Blob;
namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
{
@ -72,7 +72,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
/// Then it calls TryParseLogEntry to create a log entry out of every line of supported version and throws
/// an exception if the parse method returns null.
/// </remarks>
public async Task<IEnumerable<StorageAnalyticsLogEntry>> ParseLogAsync(IStorageBlob blob,
public async Task<IEnumerable<StorageAnalyticsLogEntry>> ParseLogAsync(ICloudBlob blob,
CancellationToken cancellationToken)
{
List<StorageAnalyticsLogEntry> entries = new List<StorageAnalyticsLogEntry>();

Просмотреть файл

@ -1,16 +1,16 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using Microsoft.WindowsAzure.Storage.Blob;
using System;
using System.Collections.Generic;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
{
// IStorageBlobClients are flyweights; distinct references do not equate to distinct storage accounts.
internal class StorageBlobClientComparer : IEqualityComparer<IStorageBlobClient>
internal class StorageBlobClientComparer : IEqualityComparer<CloudBlobClient>
{
public bool Equals(IStorageBlobClient x, IStorageBlobClient y)
public bool Equals(CloudBlobClient x, CloudBlobClient y)
{
if (x == null)
{
@ -24,7 +24,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
return x.Credentials.AccountName == y.Credentials.AccountName;
}
public int GetHashCode(IStorageBlobClient obj)
public int GetHashCode(CloudBlobClient obj)
{
if (obj == null)
{

Просмотреть файл

@ -5,14 +5,13 @@ using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
using Microsoft.WindowsAzure.Storage.Blob;
namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
{
internal static class StorageBlobClientExtensions
{
public static async Task<IEnumerable<IStorageListBlobItem>> ListBlobsAsync(this IStorageBlobClient client,
public static async Task<IEnumerable<IListBlobItem>> ListBlobsAsync(this CloudBlobClient client,
string prefix, bool useFlatBlobListing, BlobListingDetails blobListingDetails,
CancellationToken cancellationToken)
{
@ -21,19 +20,19 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
throw new ArgumentNullException("client");
}
List<IStorageListBlobItem> allResults = new List<IStorageListBlobItem>();
List<IListBlobItem> allResults = new List<IListBlobItem>();
BlobContinuationToken continuationToken = null;
IStorageBlobResultSegment result;
BlobResultSegment result;
do
{
result = await client.ListBlobsSegmentedAsync(prefix, useFlatBlobListing, blobListingDetails,
maxResults: null, currentToken: continuationToken, options: null, operationContext: null,
cancellationToken: cancellationToken);
maxResults: null, currentToken: continuationToken, options: null, operationContext: null
);
if (result != null)
{
IEnumerable<IStorageListBlobItem> currentResults = result.Results;
IEnumerable<IListBlobItem> currentResults = result.Results;
if (currentResults != null)
{
allResults.AddRange(currentResults);
@ -45,6 +44,6 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
while (result != null && continuationToken != null);
return allResults;
}
}
}
}

Просмотреть файл

@ -3,14 +3,14 @@
using System;
using System.Collections.Generic;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
using Microsoft.WindowsAzure.Storage.Blob;
namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
{
// IStorageBlobContainers are flyweights; distinct references do not equate to distinct containers.
internal class StorageBlobContainerComparer : IEqualityComparer<IStorageBlobContainer>
internal class StorageBlobContainerComparer : IEqualityComparer<CloudBlobContainer>
{
public bool Equals(IStorageBlobContainer x, IStorageBlobContainer y)
public bool Equals(CloudBlobContainer x, CloudBlobContainer y)
{
if (x == null)
{
@ -24,7 +24,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
return x.Uri == y.Uri;
}
public int GetHashCode(IStorageBlobContainer obj)
public int GetHashCode(CloudBlobContainer obj)
{
if (obj == null)
{

Просмотреть файл

@ -5,14 +5,13 @@ using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
using Microsoft.WindowsAzure.Storage.Blob;
namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
{
internal static class StorageBlobContainerExtensions
{
public static async Task<IEnumerable<IStorageListBlobItem>> ListBlobsAsync(this IStorageBlobContainer container, string prefix,
public static async Task<IEnumerable<IListBlobItem>> ListBlobsAsync(this CloudBlobContainer container, string prefix,
bool useFlatBlobListing, CancellationToken cancellationToken)
{
if (container == null)
@ -20,9 +19,9 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
throw new ArgumentNullException("container");
}
List<IStorageListBlobItem> allResults = new List<IStorageListBlobItem>();
List<IListBlobItem> allResults = new List<IListBlobItem>();
BlobContinuationToken continuationToken = null;
IStorageBlobResultSegment result;
BlobResultSegment result;
do
{
@ -32,7 +31,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
if (result != null)
{
IEnumerable<IStorageListBlobItem> currentResults = result.Results;
IEnumerable<IListBlobItem> currentResults = result.Results;
if (currentResults != null)
{
allResults.AddRange(currentResults);

Просмотреть файл

@ -6,9 +6,8 @@ using System.Globalization;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Storage;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Blob;
using Newtonsoft.Json;
namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
@ -17,9 +16,9 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
{
private readonly JsonSerializer _serializer;
private readonly string _hostId;
private IStorageBlobDirectory _blobScanInfoDirectory;
private CloudBlobDirectory _blobScanInfoDirectory;
public StorageBlobScanInfoManager(string hostId, IStorageBlobClient blobClient)
public StorageBlobScanInfoManager(string hostId, CloudBlobClient blobClient)
{
if (string.IsNullOrEmpty(hostId))
{
@ -43,7 +42,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
public async Task<DateTime?> LoadLatestScanAsync(string storageAccountName, string containerName)
{
IStorageBlockBlob scanInfoBlob = GetScanInfoBlobReference(storageAccountName, containerName);
var scanInfoBlob = GetScanInfoBlobReference(storageAccountName, containerName);
DateTime? latestScan = null;
try
{
@ -88,7 +87,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
try
{
IStorageBlockBlob scanInfoBlob = GetScanInfoBlobReference(storageAccountName, containerName);
CloudBlockBlob scanInfoBlob = GetScanInfoBlobReference(storageAccountName, containerName);
await scanInfoBlob.UploadTextAsync(scanInfoLine);
}
catch
@ -97,7 +96,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Listeners
}
}
private IStorageBlockBlob GetScanInfoBlobReference(string storageAccountName, string containerName)
private CloudBlockBlob GetScanInfoBlobReference(string storageAccountName, string containerName)
{
// Path to the status blob is:
// blobScanInfo/{hostId}/{accountName}/{containerName}/scanInfo

Просмотреть файл

@ -4,21 +4,20 @@
using System.IO;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Bindings;
using Microsoft.Azure.WebJobs.Host.Storage;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
using Microsoft.WindowsAzure.Storage;
using System.Threading;
using Microsoft.WindowsAzure.Storage.Blob;
namespace Microsoft.Azure.WebJobs.Host.Blobs
{
internal static class ReadBlobArgumentBinding
{
public static Task<WatchableReadStream> TryBindStreamAsync(IStorageBlob blob, ValueBindingContext context)
public static Task<WatchableReadStream> TryBindStreamAsync(ICloudBlob blob, ValueBindingContext context)
{
return TryBindStreamAsync(blob, context.CancellationToken);
}
public static async Task<WatchableReadStream> TryBindStreamAsync(IStorageBlob blob, CancellationToken cancellationToken)
public static async Task<WatchableReadStream> TryBindStreamAsync(ICloudBlob blob, CancellationToken cancellationToken)
{
Stream rawStream;
try

Просмотреть файл

@ -2,13 +2,14 @@
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.WindowsAzure.Storage;
namespace Microsoft.Azure.WebJobs.Host
namespace Microsoft.Azure.WebJobs.Host.Blobs
{
public interface IDistributedLockManagerFactory
internal static class StorageBlobExtensions
{
IDistributedLockManager Create();
}
}

Просмотреть файл

@ -3,28 +3,27 @@
using System;
using Microsoft.Azure.WebJobs.Host.Converters;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
using Microsoft.WindowsAzure.Storage.Blob;
namespace Microsoft.Azure.WebJobs.Host.Blobs
{
internal class StorageBlobToCloudAppendBlobConverter : IConverter<IStorageBlob, CloudAppendBlob>
internal class StorageBlobConverter<T> : IConverter<ICloudBlob, T> where T : class, ICloudBlob
{
public CloudAppendBlob Convert(IStorageBlob input)
public T Convert(ICloudBlob input)
{
if (input == null)
{
return null;
return default(T);
}
IStorageAppendBlob appendBlob = input as IStorageAppendBlob;
T appendBlob = input as T;
if (appendBlob == null)
{
throw new InvalidOperationException("The blob is not an append blob.");
throw new InvalidOperationException($"The blob is not an {typeof(T).Name}.");
}
return appendBlob.SdkObject;
return appendBlob;
}
}
}

Просмотреть файл

@ -12,61 +12,63 @@ using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Indexers;
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Azure.WebJobs.Host.Queues;
using Microsoft.Azure.WebJobs.Host.Storage;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
using Microsoft.Azure.WebJobs.Host.Queues.Listeners;
using Microsoft.Azure.WebJobs.Host.Timers;
using Microsoft.Azure.WebJobs.Host.Triggers;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Microsoft.WindowsAzure.Storage.Blob;
namespace Microsoft.Azure.WebJobs.Host.Blobs.Triggers
{
internal class BlobTriggerExtensionConfig : IExtensionConfigProvider
{
private IStorageAccountProvider _accountProvider;
private XStorageAccountProvider _accountProvider;
private BlobTriggerAttributeBindingProvider _triggerBinder;
public BlobTriggerExtensionConfig(IStorageAccountProvider accountProvider)
public BlobTriggerExtensionConfig(XStorageAccountProvider accountProvider, BlobTriggerAttributeBindingProvider triggerBinder)
{
_accountProvider = accountProvider;
_triggerBinder = triggerBinder;
}
public void Initialize(ExtensionConfigContext context)
{
var rule = context.AddBindingRule<BlobTriggerAttribute>();
rule.BindToTrigger<IStorageBlob>();
rule.BindToTrigger<ICloudBlob>(_triggerBinder);
rule.AddConverter<IStorageBlob, DirectInvokeString>(blob => new DirectInvokeString(blob.GetBlobPath()));
rule.AddConverter<DirectInvokeString, IStorageBlob>(ConvertFromInvokeString);
rule.AddConverter<ICloudBlob, DirectInvokeString>(blob => new DirectInvokeString(blob.GetBlobPath()));
rule.AddConverter<DirectInvokeString, ICloudBlob>(ConvertFromInvokeString);
// Common converters shared between [Blob] and [BlobTrigger]
// Trigger already has the IStorageBlob. Whereas BindToInput defines: Attr-->Stream.
// Converter manager already has Stream-->Byte[],String,TextReader
context.AddConverter<IStorageBlob, Stream>(ConvertToStreamAsync);
context.AddConverter<ICloudBlob, Stream>(ConvertToStreamAsync);
// Blob type is a property of an existing blob.
context.AddConverter(new StorageBlobToCloudBlobConverter());
context.AddConverter(new StorageBlobToCloudBlockBlobConverter());
context.AddConverter(new StorageBlobToCloudPageBlobConverter());
context.AddConverter(new StorageBlobToCloudAppendBlobConverter());
// Blob type is a property of an existing blob.
// $$$ did we lose CloudBlob. That's a base class for Cloud*Blob, but does not implement ICloudBlob?
context.AddConverter(new StorageBlobConverter<CloudAppendBlob>());
context.AddConverter(new StorageBlobConverter<CloudBlockBlob>());
context.AddConverter(new StorageBlobConverter<CloudPageBlob>());
}
private async Task<Stream> ConvertToStreamAsync(IStorageBlob input, CancellationToken cancellationToken)
private async Task<Stream> ConvertToStreamAsync(ICloudBlob input, CancellationToken cancellationToken)
{
WatchableReadStream watchableStream = await ReadBlobArgumentBinding.TryBindStreamAsync(input, cancellationToken);
return watchableStream;
}
// For describing InvokeStrings.
private async Task<IStorageBlob> ConvertFromInvokeString(DirectInvokeString input, Attribute attr, ValueBindingContext context)
private async Task<ICloudBlob> ConvertFromInvokeString(DirectInvokeString input, Attribute attr, ValueBindingContext context)
{
var attrResolved = (BlobTriggerAttribute)attr;
var account = await _accountProvider.GetStorageAccountAsync(attrResolved, CancellationToken.None);
var client = account.CreateBlobClient();
var cancellationToken = context.CancellationToken;
var account = _accountProvider.Get(attrResolved.Connection);
var client = account.CreateCloudBlobClient();
BlobPath path = BlobPath.ParseAndValidate(input.Value);
IStorageBlobContainer container = client.GetContainerReference(path.ContainerName);
var blob = await container.GetBlobReferenceFromServerAsync(path.BlobName, cancellationToken);
var container = client.GetContainerReference(path.ContainerName);
var blob = await container.GetBlobReferenceFromServerAsync(path.BlobName);
return blob;
}
@ -75,39 +77,34 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Triggers
internal class BlobTriggerAttributeBindingProvider : ITriggerBindingProvider
{
private readonly INameResolver _nameResolver;
private readonly IStorageAccountProvider _accountProvider;
private readonly XStorageAccountProvider _accountProvider;
private readonly IHostIdProvider _hostIdProvider;
private readonly JobHostQueuesOptions _queueOptions;
private readonly JobHostBlobsOptions _blobsOptions;
private readonly IWebJobsExceptionHandler _exceptionHandler;
private readonly IContextSetter<IBlobWrittenWatcher> _blobWrittenWatcherSetter;
private readonly IContextSetter<IMessageEnqueuedWatcher> _messageEnqueuedWatcherSetter;
private readonly SharedQueueWatcher _messageEnqueuedWatcherSetter;
private readonly ISharedContextProvider _sharedContextProvider;
private readonly SingletonManager _singletonManager;
private readonly IHostSingletonManager _singletonManager;
private readonly ILoggerFactory _loggerFactory;
public BlobTriggerAttributeBindingProvider(INameResolver nameResolver,
IStorageAccountProvider accountProvider,
IExtensionTypeLocator extensionTypeLocator,
XStorageAccountProvider accountProvider,
IHostIdProvider hostIdProvider,
JobHostQueuesOptions queueOptions,
JobHostBlobsOptions blobsConfiguration,
IOptions<JobHostQueuesOptions> queueOptions,
IOptions<JobHostBlobsOptions> blobsConfiguration,
IWebJobsExceptionHandler exceptionHandler,
IContextSetter<IBlobWrittenWatcher> blobWrittenWatcherSetter,
IContextSetter<IMessageEnqueuedWatcher> messageEnqueuedWatcherSetter,
SharedQueueWatcher messageEnqueuedWatcherSetter,
ISharedContextProvider sharedContextProvider,
SingletonManager singletonManager,
IHostSingletonManager singletonManager,
ILoggerFactory loggerFactory)
{
if (extensionTypeLocator == null)
{
throw new ArgumentNullException(nameof(extensionTypeLocator));
}
_accountProvider = accountProvider ?? throw new ArgumentNullException(nameof(accountProvider));
_hostIdProvider = hostIdProvider ?? throw new ArgumentNullException(nameof(hostIdProvider));
_queueOptions = queueOptions ?? throw new ArgumentNullException(nameof(queueOptions));
_blobsOptions = blobsConfiguration ?? throw new ArgumentNullException(nameof(blobsConfiguration));
_queueOptions = (queueOptions ?? throw new ArgumentNullException(nameof(queueOptions))).Value;
_blobsOptions = (blobsConfiguration?? throw new ArgumentNullException(nameof(blobsConfiguration))).Value;
_exceptionHandler = exceptionHandler ?? throw new ArgumentNullException(nameof(exceptionHandler));
_blobWrittenWatcherSetter = blobWrittenWatcherSetter ?? throw new ArgumentNullException(nameof(blobWrittenWatcherSetter));
_messageEnqueuedWatcherSetter = messageEnqueuedWatcherSetter ?? throw new ArgumentNullException(nameof(messageEnqueuedWatcherSetter));
@ -131,10 +128,11 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Triggers
string resolvedCombinedPath = Resolve(blobTriggerAttribute.BlobPath);
IBlobPathSource path = BlobPathSource.Create(resolvedCombinedPath);
IStorageAccount hostAccount = await _accountProvider.GetStorageAccountAsync(context.CancellationToken);
IStorageAccount dataAccount = await _accountProvider.GetStorageAccountAsync(blobTriggerAttribute, context.CancellationToken, _nameResolver);
var hostAccount = _accountProvider.GetHost();
var dataAccount = _accountProvider.Get(blobTriggerAttribute.Connection, _nameResolver);
// premium does not support blob logs, so disallow for blob triggers
dataAccount.AssertTypeOneOf(StorageAccountType.GeneralPurpose, StorageAccountType.BlobOnly);
// dataAccount.AssertTypeOneOf(StorageAccountType.GeneralPurpose, StorageAccountType.BlobOnly); $$$
ITriggerBinding binding = new BlobTriggerBinding(parameter, hostAccount, dataAccount, path,
_hostIdProvider, _queueOptions, _blobsOptions, _exceptionHandler, _blobWrittenWatcherSetter,

Просмотреть файл

@ -13,8 +13,7 @@ using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Azure.WebJobs.Host.Protocols;
using Microsoft.Azure.WebJobs.Host.Queues;
using Microsoft.Azure.WebJobs.Host.Storage;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
using Microsoft.Azure.WebJobs.Host.Queues.Listeners;
using Microsoft.Azure.WebJobs.Host.Timers;
using Microsoft.Azure.WebJobs.Host.Triggers;
using Microsoft.Extensions.Logging;
@ -25,9 +24,9 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Triggers
internal class BlobTriggerBinding : ITriggerBinding
{
private readonly ParameterInfo _parameter;
private readonly IStorageAccount _hostAccount;
private readonly IStorageAccount _dataAccount;
private readonly IStorageBlobClient _blobClient;
private readonly XStorageAccount _hostAccount;
private readonly XStorageAccount _dataAccount;
private readonly CloudBlobClient _blobClient;
private readonly string _accountName;
private readonly IBlobPathSource _path;
private readonly IHostIdProvider _hostIdProvider;
@ -35,37 +34,33 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Triggers
private readonly JobHostBlobsOptions _blobsConfiguration;
private readonly IWebJobsExceptionHandler _exceptionHandler;
private readonly IContextSetter<IBlobWrittenWatcher> _blobWrittenWatcherSetter;
private readonly IContextSetter<IMessageEnqueuedWatcher> _messageEnqueuedWatcherSetter;
private readonly SharedQueueWatcher _messageEnqueuedWatcherSetter;
private readonly ISharedContextProvider _sharedContextProvider;
private readonly ILoggerFactory _loggerFactory;
private readonly IAsyncObjectToTypeConverter<IStorageBlob> _converter;
private readonly IAsyncObjectToTypeConverter<ICloudBlob> _converter;
private readonly IReadOnlyDictionary<string, Type> _bindingDataContract;
private readonly SingletonManager _singletonManager;
private readonly IHostSingletonManager _singletonManager;
public BlobTriggerBinding(ParameterInfo parameter,
IStorageAccount hostAccount,
IStorageAccount dataAccount,
XStorageAccount hostAccount,
XStorageAccount dataAccount,
IBlobPathSource path,
IHostIdProvider hostIdProvider,
JobHostQueuesOptions queueOptions,
JobHostBlobsOptions blobsConfiguration,
IWebJobsExceptionHandler exceptionHandler,
IContextSetter<IBlobWrittenWatcher> blobWrittenWatcherSetter,
IContextSetter<IMessageEnqueuedWatcher> messageEnqueuedWatcherSetter,
SharedQueueWatcher messageEnqueuedWatcherSetter,
ISharedContextProvider sharedContextProvider,
SingletonManager singletonManager,
IHostSingletonManager singletonManager,
ILoggerFactory loggerFactory)
{
_parameter = parameter ?? throw new ArgumentNullException(nameof(parameter));
_hostAccount = hostAccount ?? throw new ArgumentNullException(nameof(hostAccount));
_dataAccount = dataAccount ?? throw new ArgumentNullException(nameof(dataAccount));
StorageClientFactoryContext context = new StorageClientFactoryContext
{
Parameter = parameter
};
_blobClient = dataAccount.CreateBlobClient(context);
_blobClient = dataAccount.CreateCloudBlobClient();
_accountName = BlobClient.GetAccountName(_blobClient);
_path = path ?? throw new ArgumentNullException(nameof(path));
_hostIdProvider = hostIdProvider ?? throw new ArgumentNullException(nameof(hostIdProvider));
@ -85,7 +80,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Triggers
{
get
{
return typeof(IStorageBlob);
return typeof(ICloudBlob);
}
}
@ -131,12 +126,12 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Triggers
return contract;
}
private IReadOnlyDictionary<string, object> CreateBindingData(IStorageBlob value)
private IReadOnlyDictionary<string, object> CreateBindingData(ICloudBlob value)
{
var bindingData = new Dictionary<string, object>(StringComparer.OrdinalIgnoreCase);
bindingData.Add("BlobTrigger", value.GetBlobPath());
bindingData.Add("Uri", value.Uri);
bindingData.Add("Properties", value.Properties?.SdkObject);
bindingData.Add("Properties", value.Properties);
bindingData.Add("Metadata", value.Metadata);
IReadOnlyDictionary<string, object> bindingDataFromPath = _path.CreateBindingData(value.ToBlobPath());
@ -152,17 +147,16 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Triggers
return bindingData;
}
private static IAsyncObjectToTypeConverter<IStorageBlob> CreateConverter(IStorageBlobClient client)
private static IAsyncObjectToTypeConverter<ICloudBlob> CreateConverter(CloudBlobClient client)
{
return new CompositeAsyncObjectToTypeConverter<IStorageBlob>(
new BlobOutputConverter<IStorageBlob>(new AsyncConverter<IStorageBlob, IStorageBlob>(new IdentityConverter<IStorageBlob>())),
new BlobOutputConverter<ICloudBlob>(new AsyncConverter<ICloudBlob, IStorageBlob>(new CloudBlobToStorageBlobConverter())),
return new CompositeAsyncObjectToTypeConverter<ICloudBlob>(
new BlobOutputConverter<ICloudBlob>(new AsyncConverter<ICloudBlob, ICloudBlob>(new IdentityConverter<ICloudBlob>())),
new BlobOutputConverter<string>(new StringToStorageBlobConverter(client)));
}
public async Task<ITriggerData> BindAsync(object value, ValueBindingContext context)
{
ConversionResult<IStorageBlob> conversionResult = await _converter.TryConvertAsync(value, context.CancellationToken);
ConversionResult<ICloudBlob> conversionResult = await _converter.TryConvertAsync(value, context.CancellationToken);
if (!conversionResult.Succeeded)
{
@ -181,7 +175,7 @@ namespace Microsoft.Azure.WebJobs.Host.Blobs.Triggers
throw new ArgumentNullException("context");
}
IStorageBlobContainer container = _blobClient.GetContainerReference(_path.ContainerNamePattern);
var container = _blobClient.GetContainerReference(_path.ContainerNamePattern);
var factory = new BlobListenerFactory(_hostIdProvider, _queueOptions, _blobsConfiguration, _exceptionHandler,
_blobWrittenWatcherSetter, _messageEnqueuedWatcherSetter, _sharedContextProvider, _loggerFactory,

Просмотреть файл

@ -3,24 +3,23 @@
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Converters;
using Microsoft.Azure.WebJobs.Host.Storage.Blob;
using Microsoft.WindowsAzure.Storage.Blob;
namespace Microsoft.Azure.WebJobs.Host.Blobs.Triggers
{
internal class StringToStorageBlobConverter : IAsyncConverter<string, IStorageBlob>
internal class StringToStorageBlobConverter : IAsyncConverter<string, ICloudBlob>
{
private readonly IStorageBlobClient _client;
private readonly CloudBlobClient _client;
public StringToStorageBlobConverter(IStorageBlobClient client)
public StringToStorageBlobConverter(CloudBlobClient client)
{
_client = client;
}
public Task<IStorageBlob> ConvertAsync(string input, CancellationToken cancellationToken)
public Task<ICloudBlob> ConvertAsync(string input, CancellationToken cancellationToken)
{
BlobPath path = BlobPath.ParseAndValidate(input);
IStorageBlobContainer container = _client.GetContainerReference(path.ContainerName);
var container = _client.GetContainerReference(path.ContainerName);
return container.GetBlobReferenceFromServerAsync(path.BlobName, cancellationToken);
}
}

Просмотреть файл

@ -0,0 +1,136 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Azure.WebJobs.Host.Protocols;
using Microsoft.Azure.WebJobs.Host.Queues;
using Microsoft.Azure.WebJobs.Host.Queues.Listeners;
using Microsoft.Azure.WebJobs.Host.Timers;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Microsoft.WindowsAzure.Storage.Queue;
using Newtonsoft.Json;
namespace WebJobs.Extension.Storage
{
// $$$ This exposes Azure Storage implementations for runtime state objects.
class Class1 : ISuperhack
{
private readonly JobHostQueuesOptions _queueOptions;
private readonly ILoggerFactory _loggerFactory;
private readonly IWebJobsExceptionHandler _exceptionHandler;
private readonly SharedQueueWatcher _sharedWatcher;
private readonly XStorageAccountProvider _storageAccountProvider;
public Class1(
XStorageAccountProvider storageAccountProvider,
IOptions<JobHostQueuesOptions> queueOptions,
IWebJobsExceptionHandler exceptionHandler,
SharedQueueWatcher sharedWatcher,
ILoggerFactory loggerFactory)
{
_storageAccountProvider = storageAccountProvider;
_queueOptions = queueOptions.Value;
_exceptionHandler = exceptionHandler;
_sharedWatcher = sharedWatcher;
_loggerFactory = loggerFactory;
}
public QueueMoniker GetQueueReference(string queueName) // Storage accounts?
{
return new QueueMoniker
{
// ConnectionString = _storageAccountProvider.DashboardConnectionString, $$$
QueueName = queueName
};
}
public IAsyncCollector<T> GetQueueWriter<T>(QueueMoniker queue)
{
return new QueueWriter<T>(this, Convert(queue));
}
class QueueWriter<T> : IAsyncCollector<T>
{
Class1 _parent;
CloudQueue _queue;
public QueueWriter(Class1 parent, CloudQueue queue)
{
this._parent = parent;
this._queue = queue;
}
public async Task AddAsync(T item, CancellationToken cancellationToken = default(CancellationToken))
{
string contents = JsonConvert.SerializeObject(
item,
JsonSerialization.Settings);
var msg = new CloudQueueMessage(contents);
await _queue.AddMessageAndCreateIfNotExistsAsync(msg, cancellationToken);
_parent._sharedWatcher.Notify(_queue.Name);
}
public Task FlushAsync(CancellationToken cancellationToken = default(CancellationToken))
{
return Task.CompletedTask;
}
}
CloudQueue Convert(QueueMoniker queueMoniker)
{
// var account = Task.Run(() => _storageAccountProvider.GetDashboardAccountAsync(CancellationToken.None)).GetAwaiter().GetResult();
var account = _storageAccountProvider.Get("Dashboard"); // $$$
var queue = account.CreateCloudQueueClient().GetQueueReference(queueMoniker.QueueName);
return queue;
}
public IListener CreateQueueListenr(
QueueMoniker queue,
QueueMoniker poisonQueue,
Func<string, CancellationToken, Task<FunctionResult>> callback
)
{
// Provide an upper bound on the maximum polling interval for run/abort from dashboard.
// This ensures that if users have customized this value the Dashboard will remain responsive.
TimeSpan maxPollingInterval = QueuePollingIntervals.DefaultMaximum;
var wrapper = new Wrapper
{
_callback = callback
};
IListener listener = new QueueListener(Convert(queue),
poisonQueue: Convert(poisonQueue),
triggerExecutor: wrapper,
exceptionHandler: _exceptionHandler,
loggerFactory: _loggerFactory,
sharedWatcher: _sharedWatcher,
queueOptions: _queueOptions,
maxPollingInterval: maxPollingInterval);
return listener;
}
// $$$ cleanup
class Wrapper : ITriggerExecutor<CloudQueueMessage>
{
public Func<string, CancellationToken, Task<FunctionResult>> _callback;
public Task<FunctionResult> ExecuteAsync(CloudQueueMessage value, CancellationToken cancellationToken)
{
return _callback(value.AsString, cancellationToken);
}
}
}
}

Просмотреть файл

@ -0,0 +1,74 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Host.Bindings.StorageAccount;
using Microsoft.Azure.WebJobs.Host.Blobs;
using Microsoft.Azure.WebJobs.Host.Blobs.Bindings;
using Microsoft.Azure.WebJobs.Host.Blobs.Triggers;
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Azure.WebJobs.Host.Queues;
using Microsoft.Azure.WebJobs.Host.Queues.Bindings;
using Microsoft.Azure.WebJobs.Host.Queues.Listeners;
using Microsoft.Azure.WebJobs.Host.Queues.Triggers;
using Microsoft.Azure.WebJobs.Host.Tables;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.WindowsAzure.Storage;
using WebJobs.Extension.Storage;
namespace Microsoft.Extensions.Hosting
{
public static class IStorageHostBuilderExtensions
{
// $$$ Extensions need some way to register services! Callable from Script (so not via an explicit Extension method)
public static IHostBuilder AddStorageBindings(this IHostBuilder builder)
{
// add webjobs to user agent for all storage calls
OperationContext.GlobalSendingRequest += (sender, e) =>
{
// TODO: FACAVAL - This is not supported on by the latest version of the
// storage SDK. Need to re-add this when the capability is reintroduced.
// e.UserAgent += " AzureWebJobs";
};
return builder
.ConfigureServices((context, services) =>
{
// $$$ Move to Host.Storage?
services.TryAddSingleton<ISuperhack, Class1>();
services.TryAddSingleton<SharedQueueWatcher>();
// $$$ Remove this, should be done via DI
services.TryAddSingleton<ISharedContextProvider, SharedContextProvider>();
services.TryAddSingleton<XStorageAccountProvider>();
services.TryAddSingleton<IContextSetter<IBlobWrittenWatcher>>((p) => new ContextAccessor<IBlobWrittenWatcher>());
services.TryAddSingleton((p) => p.GetService<IContextSetter<IBlobWrittenWatcher>>() as IContextGetter<IBlobWrittenWatcher>);
services.TryAddSingleton<IContextSetter<IMessageEnqueuedWatcher>>((p) => new ContextAccessor<IMessageEnqueuedWatcher>());
services.TryAddSingleton((p) => p.GetService<IContextSetter<IMessageEnqueuedWatcher>>() as IContextGetter<IMessageEnqueuedWatcher>);
services.TryAddSingleton<BlobTriggerAttributeBindingProvider>();
services.TryAddSingleton<QueueTriggerAttributeBindingProvider>();
services.TryAddSingleton<CloudStorageAccountBindingProvider>();
})
.AddExtension<TableExtension>()
.AddExtension<QueueExtension>()
.AddExtension<BlobExtensionConfig>()
.AddExtension<BlobTriggerExtensionConfig>();
}
}
}

Просмотреть файл

@ -0,0 +1,12 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.
using System.Reflection;
using System.Runtime.CompilerServices;
//[assembly: InternalsVisibleTo("Microsoft.Azure.WebJobs.ServiceBus.UnitTests")]
//[assembly: InternalsVisibleTo("Microsoft.Azure.WebJobs.Host.TestCommon")]
//[assembly: InternalsVisibleTo("Microsoft.Azure.WebJobs.Host.FunctionalTests")]
[assembly: InternalsVisibleTo("Microsoft.Azure.WebJobs.Extension.Storage.UnitTests")]
[assembly: InternalsVisibleTo("Microsoft.Azure.WebJobs.Host.UnitTests")]
[assembly: InternalsVisibleTo("DynamicProxyGenAssembly2")]

Просмотреть файл

@ -8,10 +8,8 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Bindings;
using Microsoft.Azure.WebJobs.Host.Config;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Protocols;
using Microsoft.Azure.WebJobs.Host.Storage;
using Microsoft.Azure.WebJobs.Host.Storage.Queue;
using Microsoft.Azure.WebJobs.Host.Queues.Triggers;
using Microsoft.WindowsAzure.Storage.Queue;
using Newtonsoft.Json.Linq;
@ -22,12 +20,15 @@ namespace Microsoft.Azure.WebJobs.Host.Queues.Bindings
internal class QueueExtension : IExtensionConfigProvider
{
private readonly IContextGetter<IMessageEnqueuedWatcher> _contextGetter;
private readonly IStorageAccountProvider _storageAccountProvider;
private readonly XStorageAccountProvider _storageAccountProvider;
private readonly QueueTriggerAttributeBindingProvider _triggerProvider;
public QueueExtension(IStorageAccountProvider storageAccountProvider, IContextGetter<IMessageEnqueuedWatcher> contextGetter)
public QueueExtension(XStorageAccountProvider storageAccountProvider, IContextGetter<IMessageEnqueuedWatcher> contextGetter,
QueueTriggerAttributeBindingProvider triggerProvider)
{
_contextGetter = contextGetter;
_storageAccountProvider = storageAccountProvider;
_triggerProvider = triggerProvider;
}
public void Initialize(ExtensionConfigContext context)
@ -37,23 +38,26 @@ namespace Microsoft.Azure.WebJobs.Host.Queues.Bindings
throw new ArgumentNullException(nameof(context));
}
context.AddBindingRule<QueueTriggerAttribute>().BindToTrigger(_triggerProvider);
var config = new PerHostConfig();
config.Initialize(context, _storageAccountProvider, _contextGetter);
}
// $$$ Get rid of PerHostConfig part?
// Multiple JobHost objects may share the same JobHostConfiguration.
// But queues have per-host instance state (IMessageEnqueuedWatcher).
// so capture that and create new binding rules per host instance.
private class PerHostConfig : IAsyncConverter<QueueAttribute, IAsyncCollector<IStorageQueueMessage>>
private class PerHostConfig : IAsyncConverter<QueueAttribute, IAsyncCollector<CloudQueueMessage>>
{
// Fields that the various binding funcs need to close over.
private IStorageAccountProvider _accountProvider;
private XStorageAccountProvider _accountProvider;
// Optimization where a queue output can directly trigger a queue input.
// This is per-host (not per-config)
private IContextGetter<IMessageEnqueuedWatcher> _messageEnqueuedWatcherGetter;
public void Initialize(ExtensionConfigContext context, IStorageAccountProvider storageAccountProvider, IContextGetter<IMessageEnqueuedWatcher> contextGetter)
public void Initialize(ExtensionConfigContext context, XStorageAccountProvider storageAccountProvider, IContextGetter<IMessageEnqueuedWatcher> contextGetter)
{
_accountProvider = storageAccountProvider;
_messageEnqueuedWatcherGetter = contextGetter;
@ -64,24 +68,23 @@ namespace Microsoft.Azure.WebJobs.Host.Queues.Bindings
// IStorageQueueMessage is the core testing interface
var binding = context.AddBindingRule<QueueAttribute>();
binding
.AddConverter<byte[], IStorageQueueMessage>(ConvertByteArrayToCloudQueueMessage)
.AddConverter<string, IStorageQueueMessage>(ConvertStringToCloudQueueMessage)
.AddOpenConverter<OpenType.Poco, IStorageQueueMessage>(ConvertPocoToCloudQueueMessage);
.AddConverter<byte[], CloudQueueMessage>(ConvertByteArrayToCloudQueueMessage)
.AddConverter<string, CloudQueueMessage>(ConvertStringToCloudQueueMessage)
.AddOpenConverter<OpenType.Poco, CloudQueueMessage>(ConvertPocoToCloudQueueMessage);
context // global converters, apply to multiple attributes.
.AddConverter<IStorageQueueMessage, byte[]>(ConvertCloudQueueMessageToByteArray)
.AddConverter<IStorageQueueMessage, string>(ConvertCloudQueueMessageToString)
.AddConverter<CloudQueueMessage, IStorageQueueMessage>(ConvertToStorageQueueMessage);
.AddConverter<CloudQueueMessage, byte[]>(ConvertCloudQueueMessageToByteArray)
.AddConverter<CloudQueueMessage, string>(ConvertCloudQueueMessageToString);
var builder = new QueueBuilder(this);
binding.AddValidator(ValidateQueueAttribute);
binding.SetPostResolveHook(ToWriteParameterDescriptorForCollector)
.BindToCollector<IStorageQueueMessage>(this);
.BindToCollector<CloudQueueMessage>(this);
binding.SetPostResolveHook(ToReadWriteParameterDescriptorForCollector)
.BindToInput<IStorageQueue>(builder);
.BindToInput<CloudQueue>(builder);
binding.SetPostResolveHook(ToReadWriteParameterDescriptorForCollector)
.BindToInput<CloudQueue>(builder);
@ -95,7 +98,7 @@ namespace Microsoft.Azure.WebJobs.Host.Queues.Bindings
return msg;
}
private IStorageQueueMessage ConvertJObjectToCloudQueueMessage(JObject obj, QueueAttribute attrResolved)
private CloudQueueMessage ConvertJObjectToCloudQueueMessage(JObject obj, QueueAttribute attrResolved)
{
var json = obj.ToString(); // convert to JSon
return ConvertStringToCloudQueueMessage(json, attrResolved);
@ -127,11 +130,9 @@ namespace Microsoft.Azure.WebJobs.Host.Queues.Bindings
private ParameterDescriptor ToParameterDescriptorForCollector(QueueAttribute attr, ParameterInfo parameter, INameResolver nameResolver, FileAccess access)
{
// Avoid using the sync over async pattern (Async().GetAwaiter().GetResult()) whenever possible
IStorageAccount account = _accountProvider.GetStorageAccountAsync(attr, CancellationToken.None, nameResolver).GetAwaiter().GetResult();
string accountName = account.Credentials.AccountName;
var account = _accountProvider.Get(attr.Connection, nameResolver);
var accountName = account.Name;
return new QueueParameterDescriptor
{
Name = parameter.Name,
@ -165,67 +166,52 @@ namespace Microsoft.Azure.WebJobs.Host.Queues.Bindings
}
}
private IStorageQueueMessage ConvertToStorageQueueMessage(CloudQueueMessage arg)
{
return new StorageQueueMessage(arg);
}
private byte[] ConvertCloudQueueMessageToByteArray(IStorageQueueMessage arg)
private byte[] ConvertCloudQueueMessageToByteArray(CloudQueueMessage arg)
{
return arg.AsBytes;
}
private string ConvertCloudQueueMessageToString(IStorageQueueMessage arg)
private string ConvertCloudQueueMessageToString(CloudQueueMessage arg)
{
return arg.AsString;
}
private IStorageQueueMessage ConvertByteArrayToCloudQueueMessage(byte[] arg, QueueAttribute attrResolved)
private CloudQueueMessage ConvertByteArrayToCloudQueueMessage(byte[] arg, QueueAttribute attrResolved)
{
IStorageQueue queue = GetQueue(attrResolved);
var msg = queue.CreateMessage(arg);
return msg;
return CloudQueueMessage.CreateCloudQueueMessageFromByteArray(arg);
}
private IStorageQueueMessage ConvertStringToCloudQueueMessage(string arg, QueueAttribute attrResolved)
private CloudQueueMessage ConvertStringToCloudQueueMessage(string arg, QueueAttribute attrResolved)
{
IStorageQueue queue = GetQueue(attrResolved);
var msg = queue.CreateMessage(arg);
return msg;
return new CloudQueueMessage(arg);
}
public async Task<IAsyncCollector<IStorageQueueMessage>> ConvertAsync(QueueAttribute attrResolved, CancellationToken cancellationToken)
public async Task<IAsyncCollector<CloudQueueMessage>> ConvertAsync(QueueAttribute attrResolved, CancellationToken cancellationToken)
{
IStorageQueue queue = await GetQueueAsync(attrResolved);
var queue = await GetQueueAsync(attrResolved);
return new QueueAsyncCollector(queue, _messageEnqueuedWatcherGetter.Value);
}
internal async Task<IStorageQueue> GetQueueAsync(QueueAttribute attrResolved)
internal async Task<CloudQueue> GetQueueAsync(QueueAttribute attrResolved)
{
var account = await _accountProvider.GetStorageAccountAsync(attrResolved, CancellationToken.None);
return GetQueue(attrResolved, account);
}
internal IStorageQueue GetQueue(QueueAttribute attrResolved)
{
// Avoid using the sync over async pattern (Async().GetAwaiter().GetResult()) whenever possible
var account = _accountProvider.GetStorageAccountAsync(attrResolved, CancellationToken.None).GetAwaiter().GetResult();
return GetQueue(attrResolved, account);
}
internal IStorageQueue GetQueue(QueueAttribute attrResolved, IStorageAccount account)
{
var client = account.CreateQueueClient();
// var account = await _accountProvider.GetStorageAccountAsync(attrResolved, CancellationToken.None);
var account = _accountProvider.Get(attrResolved.Connection);
var client = account.CreateCloudQueueClient();
string queueName = attrResolved.QueueName.ToLowerInvariant();
QueueClient.ValidateQueueName(queueName);
return client.GetQueueReference(queueName);
}
internal CloudQueue GetQueue(QueueAttribute attrResolved)
{
var queue = Task.Run(() => GetQueueAsync(attrResolved)).GetAwaiter().GetResult();
return queue;
}
}
private class QueueBuilder :
IAsyncConverter<QueueAttribute, IStorageQueue>,
IAsyncConverter<QueueAttribute, CloudQueue>
{
private readonly PerHostConfig _bindingProvider;
@ -235,38 +221,29 @@ namespace Microsoft.Azure.WebJobs.Host.Queues.Bindings
_bindingProvider = bindingProvider;
}
async Task<IStorageQueue> IAsyncConverter<QueueAttribute, IStorageQueue>.ConvertAsync(
QueueAttribute attrResolved,
CancellationToken cancellation)
{
IStorageQueue queue = await _bindingProvider.GetQueueAsync(attrResolved);
await queue.CreateIfNotExistsAsync(CancellationToken.None);
return queue;
}
async Task<CloudQueue> IAsyncConverter<QueueAttribute, CloudQueue>.ConvertAsync(
QueueAttribute attrResolved,
CancellationToken cancellation)
{
IAsyncConverter<QueueAttribute, IStorageQueue> convert = this;
IAsyncConverter<QueueAttribute, CloudQueue> convert = this;
var queue = await convert.ConvertAsync(attrResolved, cancellation);
return queue.SdkObject;
return queue;
}
}
// The core Async Collector for queueing messages.
internal class QueueAsyncCollector : IAsyncCollector<IStorageQueueMessage>
internal class QueueAsyncCollector : IAsyncCollector<CloudQueueMessage>
{
private readonly IStorageQueue _queue;
private readonly CloudQueue _queue;
private readonly IMessageEnqueuedWatcher _messageEnqueuedWatcher;
public QueueAsyncCollector(IStorageQueue queue, IMessageEnqueuedWatcher messageEnqueuedWatcher)
public QueueAsyncCollector(CloudQueue queue, IMessageEnqueuedWatcher messageEnqueuedWatcher)
{
this._queue = queue;
this._messageEnqueuedWatcher = messageEnqueuedWatcher;
}
public async Task AddAsync(IStorageQueueMessage message, CancellationToken cancellationToken = default(CancellationToken))
public async Task AddAsync(CloudQueueMessage message, CancellationToken cancellationToken = default(CancellationToken))
{
if (message == null)
{

Просмотреть файл

@ -8,8 +8,6 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Azure.WebJobs.Host.Storage;
using Microsoft.Azure.WebJobs.Host.Storage.Queue;
using Microsoft.Azure.WebJobs.Host.Timers;
using Microsoft.Extensions.Logging;
using Microsoft.WindowsAzure.Storage;
@ -21,9 +19,9 @@ namespace Microsoft.Azure.WebJobs.Host.Queues.Listeners
{
private readonly ITaskSeriesTimer _timer;
private readonly IDelayStrategy _delayStrategy;
private readonly IStorageQueue _queue;
private readonly IStorageQueue _poisonQueue;
private readonly ITriggerExecutor<IStorageQueueMessage> _triggerExecutor;
private readonly CloudQueue _queue;
private readonly CloudQueue _poisonQueue;
private readonly ITriggerExecutor<CloudQueueMessage> _triggerExecutor;
private readonly IWebJobsExceptionHandler _exceptionHandler;
private readonly IMessageEnqueuedWatcher _sharedWatcher;
private readonly List<Task> _processing = new List<Task>();
@ -36,9 +34,9 @@ namespace Microsoft.Azure.WebJobs.Host.Queues.Listeners
private bool _disposed;
private TaskCompletionSource<object> _stopWaitingTaskSource;
public QueueListener(IStorageQueue queue,
IStorageQueue poisonQueue,
ITriggerExecutor<IStorageQueueMessage> triggerExecutor,
public QueueListener(CloudQueue queue,
CloudQueue poisonQueue,
ITriggerExecutor<CloudQueueMessage> triggerExecutor,
IWebJobsExceptionHandler exceptionHandler,
ILoggerFactory loggerFactory,
SharedQueueWatcher sharedWatcher,
@ -81,7 +79,7 @@ namespace Microsoft.Azure.WebJobs.Host.Queues.Listeners
EventHandler<PoisonMessageEventArgs> poisonMessageEventHandler = _sharedWatcher != null ? OnMessageAddedToPoisonQueue : (EventHandler<PoisonMessageEventArgs>)null;
_queueProcessor = queueProcessor ?? CreateQueueProcessor(
_queue.SdkObject, _poisonQueue != null ? _poisonQueue.SdkObject : null,
_queue, _poisonQueue,
loggerFactory, _queueuOptions, poisonMessageEventHandler);
TimeSpan maximumInterval = _queueProcessor.MaxPollingInterval;
@ -139,7 +137,7 @@ namespace Microsoft.Azure.WebJobs.Host.Queues.Listeners
_stopWaitingTaskSource = new TaskCompletionSource<object>();
}
IEnumerable<IStorageQueueMessage> batch;
IEnumerable<CloudQueueMessage> batch;
try
{
batch = await _queue.GetMessagesAsync(_queueProcessor.BatchSize,
@ -239,11 +237,11 @@ namespace Microsoft.Azure.WebJobs.Host.Queues.Listeners
}
}
internal async Task ProcessMessageAsync(IStorageQueueMessage message, TimeSpan visibilityTimeout, CancellationToken cancellationToken)
internal async Task ProcessMessageAsync(CloudQueueMessage message, TimeSpan visibilityTimeout, CancellationToken cancellationToken)
{
try
{
if (!await _queueProcessor.BeginProcessingMessageAsync(message.SdkObject, cancellationToken))
if (!await _queueProcessor.BeginProcessingMessageAsync(message, cancellationToken))
{
return;
}
@ -258,7 +256,7 @@ namespace Microsoft.Azure.WebJobs.Host.Queues.Listeners
await timer.StopAsync(cancellationToken);
}
await _queueProcessor.CompleteProcessingMessageAsync(message.SdkObject, result, cancellationToken);
await _queueProcessor.CompleteProcessingMessageAsync(message, result, cancellationToken);
}
catch (StorageException ex) when (ex.IsTaskCanceled())
{
@ -284,8 +282,8 @@ namespace Microsoft.Azure.WebJobs.Host.Queues.Listeners
_sharedWatcher.Notify(e.PoisonQueue.Name);
}
private ITaskSeriesTimer CreateUpdateMessageVisibilityTimer(IStorageQueue queue,
IStorageQueueMessage message, TimeSpan visibilityTimeout,
private ITaskSeriesTimer CreateUpdateMessageVisibilityTimer(CloudQueue queue,
CloudQueueMessage message, TimeSpan visibilityTimeout,
IWebJobsExceptionHandler exceptionHandler)
{
// Update a message's visibility when it is halfway to expiring.

Просмотреть файл

@ -7,9 +7,9 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Azure.WebJobs.Host.Storage.Queue;
using Microsoft.Azure.WebJobs.Host.Timers;
using Microsoft.Extensions.Logging;
using Microsoft.WindowsAzure.Storage.Queue;
namespace Microsoft.Azure.WebJobs.Host.Queues.Listeners
{
@ -17,20 +17,18 @@ namespace Microsoft.Azure.WebJobs.Host.Queues.Listeners
{
private static string poisonQueueSuffix = "-poison";
private readonly IStorageQueue _queue;
private readonly IStorageQueue _poisonQueue;
private readonly CloudQueue _queue;
private readonly CloudQueue _poisonQueue;
private readonly JobHostQueuesOptions _queueOptions;
private readonly IWebJobsExceptionHandler _exceptionHandler;
private readonly IContextSetter<IMessageEnqueuedWatcher> _messageEnqueuedWatcherSetter;
private readonly ISharedContextProvider _sharedContextProvider;
private readonly SharedQueueWatcher _messageEnqueuedWatcherSetter;
private readonly ILoggerFactory _loggerFactory;
private readonly ITriggeredFunctionExecutor _executor;
public QueueListenerFactory(IStorageQueue queue,
public QueueListenerFactory(CloudQueue queue,
JobHostQueuesOptions queueOptions,
IWebJobsExceptionHandler exceptionHandler,
IContextSetter<IMessageEnqueuedWatcher> messageEnqueuedWatcherSetter,
ISharedContextProvider sharedContextProvider,
SharedQueueWatcher messageEnqueuedWatcherSetter,
ILoggerFactory loggerFactory,
ITriggeredFunctionExecutor executor)
{
@ -38,7 +36,6 @@ namespace Microsoft.Azure.WebJobs.Host.Queues.Listeners
_queueOptions = queueOptions ?? throw new ArgumentNullException(nameof(queueOptions));
_exceptionHandler = exceptionHandler ?? throw new ArgumentNullException(nameof(exceptionHandler));
_messageEnqueuedWatcherSetter = messageEnqueuedWatcherSetter ?? throw new ArgumentNullException(nameof(messageEnqueuedWatcherSetter));
_sharedContextProvider = sharedContextProvider ?? throw new ArgumentNullException(nameof(sharedContextProvider));
_executor = executor ?? throw new ArgumentNullException(nameof(executor));
_poisonQueue = CreatePoisonQueueReference(queue.ServiceClient, queue.Name);
@ -50,16 +47,13 @@ namespace Microsoft.Azure.WebJobs.Host.Queues.Listeners
{
QueueTriggerExecutor triggerExecutor = new QueueTriggerExecutor(_executor);
SharedQueueWatcher sharedWatcher = _sharedContextProvider.GetOrCreateInstance<SharedQueueWatcher>(
new SharedQueueWatcherFactory(_messageEnqueuedWatcherSetter));
IListener listener = new QueueListener(_queue, _poisonQueue, triggerExecutor, _exceptionHandler, _loggerFactory,
sharedWatcher, _queueOptions);
_messageEnqueuedWatcherSetter, _queueOptions);
return Task.FromResult(listener);
}
private static IStorageQueue CreatePoisonQueueReference(IStorageQueueClient client, string name)
private static CloudQueue CreatePoisonQueueReference(CloudQueueClient client, string name)
{
Debug.Assert(client != null);

Просмотреть файл

@ -6,11 +6,11 @@ using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Azure.WebJobs.Host.Storage.Queue;
using Microsoft.WindowsAzure.Storage.Queue;
namespace Microsoft.Azure.WebJobs.Host.Queues.Listeners
{
internal class QueueTriggerExecutor : ITriggerExecutor<IStorageQueueMessage>
internal class QueueTriggerExecutor : ITriggerExecutor<CloudQueueMessage>
{
private readonly ITriggeredFunctionExecutor _innerExecutor;
@ -19,7 +19,7 @@ namespace Microsoft.Azure.WebJobs.Host.Queues.Listeners
_innerExecutor = innerExecutor;
}
public async Task<FunctionResult> ExecuteAsync(IStorageQueueMessage value, CancellationToken cancellationToken)
public async Task<FunctionResult> ExecuteAsync(CloudQueueMessage value, CancellationToken cancellationToken)
{
Guid? parentId = QueueCausalityManager.GetOwner(value);
TriggeredFunctionData input = new TriggeredFunctionData

Просмотреть файл

@ -4,8 +4,6 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Storage;
using Microsoft.Azure.WebJobs.Host.Storage.Queue;
using Microsoft.Azure.WebJobs.Host.Timers;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Queue;
@ -14,12 +12,12 @@ namespace Microsoft.Azure.WebJobs.Host.Queues.Listeners
{
internal class UpdateQueueMessageVisibilityCommand : ITaskSeriesCommand
{
private readonly IStorageQueue _queue;
private readonly IStorageQueueMessage _message;
private readonly CloudQueue _queue;
private readonly CloudQueueMessage _message;
private readonly TimeSpan _visibilityTimeout;
private readonly IDelayStrategy _speedupStrategy;
public UpdateQueueMessageVisibilityCommand(IStorageQueue queue, IStorageQueueMessage message,
public UpdateQueueMessageVisibilityCommand(CloudQueue queue, CloudQueueMessage message,
TimeSpan visibilityTimeout, IDelayStrategy speedupStrategy)
{
if (queue == null)

Просмотреть файл

@ -6,7 +6,6 @@ using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Contracts;
using Microsoft.Azure.WebJobs.Host.Protocols;
using Microsoft.Azure.WebJobs.Host.Storage.Queue;
using Microsoft.WindowsAzure.Storage.Queue;
using Newtonsoft.Json.Linq;
@ -41,7 +40,7 @@ namespace Microsoft.Azure.WebJobs.Host.Queues
}
[DebuggerNonUserCode]
public static Guid? GetOwner(IStorageQueueMessage msg)
public static Guid? GetOwner(CloudQueueMessage msg)
{
string text = msg.TryGetAsString();

Просмотреть файл

@ -3,21 +3,15 @@
using System;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Host.Storage.Queue;
using Microsoft.WindowsAzure.Storage.Queue;
namespace Microsoft.Azure.WebJobs.Host.Queues
{
internal static class QueueClient
{
public static string GetAccountName(IStorageQueueClient client)
public static string GetAccountName(this CloudQueueClient client)
{
if (client == null)
{
return null;
}
return StorageClient.GetAccountName(client.Credentials);
return client?.Credentials?.AccountName;
}
/// <summary>

Некоторые файлы не были показаны из-за слишком большого количества измененных файлов Показать больше