Update functions binding to 2.0 model (#201)

This commit is contained in:
Anca Antochi 2018-09-05 16:37:58 -07:00 коммит произвёл GitHub
Родитель 1ab956af79
Коммит 1bc69d1e8b
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
43 изменённых файлов: 1574 добавлений и 262 удалений

Просмотреть файл

@ -93,19 +93,11 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{1ABF19EE-768
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "binding", "binding", "{B8B03013-8D6E-4D4E-9914-33D55074DEC8}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Azure.Devices.Edge.Functions.Binding", "edge-modules\functions\binding\src\Microsoft.Azure.Devices.Edge.Functions.Binding\Microsoft.Azure.Devices.Edge.Functions.Binding.csproj", "{63120FDB-92DD-4E02-B77A-E4B51999A0BB}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "samples", "samples", "{33B7755E-D6F9-4F9B-86A8-5DFE9FEE674E}"
ProjectSection(SolutionItems) = preProject
edge-modules\functions\samples\host.json = edge-modules\functions\samples\host.json
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "EdgeHubTrigger-CSharp", "EdgeHubTrigger-CSharp", "{AFA4A92F-62AA-4EA9-9AE2-70E7EC5FCB12}"
ProjectSection(SolutionItems) = preProject
edge-modules\functions\samples\EdgeHubTrigger-CSharp\function.json = edge-modules\functions\samples\EdgeHubTrigger-CSharp\function.json
edge-modules\functions\samples\EdgeHubTrigger-CSharp\run.csx = edge-modules\functions\samples\EdgeHubTrigger-CSharp\run.csx
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Azure.Devices.Edge.Storage", "edge-util\src\Microsoft.Azure.Devices.Edge.Storage\Microsoft.Azure.Devices.Edge.Storage.csproj", "{DB92F6C3-A611-40C5-B464-8F11A3EE1AC3}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Azure.Devices.Edge.Storage.RocksDb", "edge-util\src\Microsoft.Azure.Devices.Edge.Storage.RocksDb\Microsoft.Azure.Devices.Edge.Storage.RocksDb.csproj", "{D6BFCE1F-0F53-43F1-8D2B-FC0FF8EEFA9C}"
@ -120,11 +112,6 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "docker", "docker", "{64E95D7B-8541-4A88-A291-8BE3445FB8FF}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "bin", "bin", "{80688705-AEFD-4297-907E-2B20DAFBB5D5}"
ProjectSection(SolutionItems) = preProject
edge-modules\functions\samples\bin\extensions.json = edge-modules\functions\samples\bin\extensions.json
EndProjectSection
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "linux", "linux", "{BCAA863C-8BC2-4FB1-9D9E-7BCADBCFC3A5}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "amd64", "amd64", "{B5BB5CDE-E6C7-4896-A66E-17D81BDB9A31}"
@ -168,6 +155,17 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "MessagesAnalyzer", "edge-mo
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "LeafDevice", "smoke\LeafDevice\LeafDevice.csproj", "{C5CBC493-96A4-4628-A952-8C7B9EEF1441}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Microsoft.Azure.WebJobs.Extensions.EdgeHub", "edge-modules\functions\binding\src\Microsoft.Azure.WebJobs.Extensions.EdgeHub\Microsoft.Azure.WebJobs.Extensions.EdgeHub.csproj", "{0D5C5996-7414-47D2-AF0F-C68E0E3F3AA4}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "EdgeHubTrigger-CSharp", "EdgeHubTrigger-CSharp", "{C3BDC9FA-B7D8-44F3-970F-D24281335F46}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "EdgeHubTriggerCSharp", "edge-modules\functions\samples\EdgeHubTrigger-Csharp\EdgeHubTriggerCSharp.csproj", "{B8D5312A-B37B-4FA3-8B80-2D1A93077DDF}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "bin", "bin", "{3A879277-5A61-4A6A-BB27-5281FA83B1D3}"
ProjectSection(SolutionItems) = preProject
edge-modules\functions\samples\bin\extensions.json = edge-modules\functions\samples\bin\extensions.json
EndProjectSection
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
CodeCoverage|Any CPU = CodeCoverage|Any CPU
@ -325,12 +323,6 @@ Global
{D67D382A-5F16-4AB2-8082-DBE62C33E5B1}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D67D382A-5F16-4AB2-8082-DBE62C33E5B1}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D67D382A-5F16-4AB2-8082-DBE62C33E5B1}.Release|Any CPU.Build.0 = Release|Any CPU
{63120FDB-92DD-4E02-B77A-E4B51999A0BB}.CodeCoverage|Any CPU.ActiveCfg = CodeCoverage|Any CPU
{63120FDB-92DD-4E02-B77A-E4B51999A0BB}.CodeCoverage|Any CPU.Build.0 = CodeCoverage|Any CPU
{63120FDB-92DD-4E02-B77A-E4B51999A0BB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{63120FDB-92DD-4E02-B77A-E4B51999A0BB}.Debug|Any CPU.Build.0 = Debug|Any CPU
{63120FDB-92DD-4E02-B77A-E4B51999A0BB}.Release|Any CPU.ActiveCfg = Release|Any CPU
{63120FDB-92DD-4E02-B77A-E4B51999A0BB}.Release|Any CPU.Build.0 = Release|Any CPU
{DB92F6C3-A611-40C5-B464-8F11A3EE1AC3}.CodeCoverage|Any CPU.ActiveCfg = CodeCoverage|Any CPU
{DB92F6C3-A611-40C5-B464-8F11A3EE1AC3}.CodeCoverage|Any CPU.Build.0 = CodeCoverage|Any CPU
{DB92F6C3-A611-40C5-B464-8F11A3EE1AC3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
@ -415,6 +407,18 @@ Global
{C5CBC493-96A4-4628-A952-8C7B9EEF1441}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C5CBC493-96A4-4628-A952-8C7B9EEF1441}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C5CBC493-96A4-4628-A952-8C7B9EEF1441}.Release|Any CPU.Build.0 = Release|Any CPU
{0D5C5996-7414-47D2-AF0F-C68E0E3F3AA4}.CodeCoverage|Any CPU.ActiveCfg = CodeCoverage|Any CPU
{0D5C5996-7414-47D2-AF0F-C68E0E3F3AA4}.CodeCoverage|Any CPU.Build.0 = CodeCoverage|Any CPU
{0D5C5996-7414-47D2-AF0F-C68E0E3F3AA4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{0D5C5996-7414-47D2-AF0F-C68E0E3F3AA4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{0D5C5996-7414-47D2-AF0F-C68E0E3F3AA4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{0D5C5996-7414-47D2-AF0F-C68E0E3F3AA4}.Release|Any CPU.Build.0 = Release|Any CPU
{B8D5312A-B37B-4FA3-8B80-2D1A93077DDF}.CodeCoverage|Any CPU.ActiveCfg = Debug|Any CPU
{B8D5312A-B37B-4FA3-8B80-2D1A93077DDF}.CodeCoverage|Any CPU.Build.0 = Debug|Any CPU
{B8D5312A-B37B-4FA3-8B80-2D1A93077DDF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{B8D5312A-B37B-4FA3-8B80-2D1A93077DDF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B8D5312A-B37B-4FA3-8B80-2D1A93077DDF}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B8D5312A-B37B-4FA3-8B80-2D1A93077DDF}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -454,14 +458,11 @@ Global
{C4502187-81D7-4299-85C3-64AE241503E0} = {578D5330-2F72-44C6-9DB5-C93B3F42C473}
{1ABF19EE-7689-4EE3-A099-16111A8F16F4} = {B8B03013-8D6E-4D4E-9914-33D55074DEC8}
{B8B03013-8D6E-4D4E-9914-33D55074DEC8} = {C4502187-81D7-4299-85C3-64AE241503E0}
{63120FDB-92DD-4E02-B77A-E4B51999A0BB} = {1ABF19EE-7689-4EE3-A099-16111A8F16F4}
{33B7755E-D6F9-4F9B-86A8-5DFE9FEE674E} = {C4502187-81D7-4299-85C3-64AE241503E0}
{AFA4A92F-62AA-4EA9-9AE2-70E7EC5FCB12} = {33B7755E-D6F9-4F9B-86A8-5DFE9FEE674E}
{DB92F6C3-A611-40C5-B464-8F11A3EE1AC3} = {66964A75-04AC-4FDE-8505-E6CB2EF90BE8}
{D6BFCE1F-0F53-43F1-8D2B-FC0FF8EEFA9C} = {66964A75-04AC-4FDE-8505-E6CB2EF90BE8}
{3EB5B58A-5820-44E7-9558-917C105B940D} = {373FFF5E-E84C-4789-B768-676FFF51E7A6}
{64E95D7B-8541-4A88-A291-8BE3445FB8FF} = {33B7755E-D6F9-4F9B-86A8-5DFE9FEE674E}
{80688705-AEFD-4297-907E-2B20DAFBB5D5} = {33B7755E-D6F9-4F9B-86A8-5DFE9FEE674E}
{BCAA863C-8BC2-4FB1-9D9E-7BCADBCFC3A5} = {64E95D7B-8541-4A88-A291-8BE3445FB8FF}
{B5BB5CDE-E6C7-4896-A66E-17D81BDB9A31} = {BCAA863C-8BC2-4FB1-9D9E-7BCADBCFC3A5}
{31B0F93F-3E58-4C66-9DCC-24236EA81F16} = {BCAA863C-8BC2-4FB1-9D9E-7BCADBCFC3A5}
@ -478,6 +479,10 @@ Global
{54771470-860C-4853-9318-6DB4EA76B595} = {578D5330-2F72-44C6-9DB5-C93B3F42C473}
{047DC795-A159-4BFF-AC0F-4DCE51A79C2C} = {578D5330-2F72-44C6-9DB5-C93B3F42C473}
{C5CBC493-96A4-4628-A952-8C7B9EEF1441} = {871A0862-7480-49C3-ACEB-9A60E9CE5B61}
{0D5C5996-7414-47D2-AF0F-C68E0E3F3AA4} = {1ABF19EE-7689-4EE3-A099-16111A8F16F4}
{C3BDC9FA-B7D8-44F3-970F-D24281335F46} = {33B7755E-D6F9-4F9B-86A8-5DFE9FEE674E}
{B8D5312A-B37B-4FA3-8B80-2D1A93077DDF} = {C3BDC9FA-B7D8-44F3-970F-D24281335F46}
{3A879277-5A61-4A6A-BB27-5281FA83B1D3} = {33B7755E-D6F9-4F9B-86A8-5DFE9FEE674E}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {D71830F5-3AF5-46B4-8A9E-1DCE4F2253AC}

Просмотреть файл

@ -1,7 +0,0 @@
FROM microsoft/azure-functions-dotnet-core2.0:2.0
WORKDIR /app
COPY $EXE_DIR/ ./bindings
CMD ["dotnet", "/azure-functions-host/Microsoft.Azure.WebJobs.Script.WebHost.dll"]

Просмотреть файл

@ -1,5 +0,0 @@
FROM microsoft/azure-functions-dotnet-core2.0:2.0-arm32v7
WORKDIR /app
COPY $EXE_DIR/ ./bindings

Просмотреть файл

@ -1,9 +0,0 @@
# escape=`
FROM microsoft/azure-functions-dotnet-core2.0:2.0-nanoserver-1803
ARG EXE_DIR=.
WORKDIR /app
COPY $EXE_DIR/ ./bindings

Просмотреть файл

@ -1,49 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Functions.Binding.Config
{
using System;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Edge.Functions.Binding.Bindings;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Host.Config;
using Microsoft.Azure.WebJobs.Host.Triggers;
using Newtonsoft.Json;
/// <summary>
/// Extension configuration provider used to register EdgeHub triggers and binders
/// </summary>
public class EdgeHubExtensionConfigProvider : IExtensionConfigProvider
{
public void Initialize(ExtensionConfigContext context)
{
if (context == null)
{
throw new ArgumentNullException(nameof(context));
}
var extensions = context.Config.GetService<IExtensionRegistry>();
// register trigger binding provider
var triggerBindingProvider = new EdgeHubTriggerBindingProvider();
extensions.RegisterExtension<ITriggerBindingProvider>(triggerBindingProvider);
extensions.RegisterBindingRules<EdgeHubAttribute>();
FluentBindingRule<EdgeHubAttribute> rule = context.AddBindingRule<EdgeHubAttribute>();
rule.BindToCollector<Message>(typeof(EdgeHubCollectorBuilder));
context.AddConverter<Message, string>(this.MessageConverter);
context.AddConverter<string, Message>(this.ConvertToMessage);
}
Message ConvertToMessage(string str)
{
return JsonConvert.DeserializeObject<Message>(str);
}
string MessageConverter(Message msg)
{
return JsonConvert.SerializeObject(msg);
}
}
}

Просмотреть файл

@ -1,32 +0,0 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Functions.Binding.Config
{
using System;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Azure.WebJobs.Host.Config;
/// <summary>
/// Extension methods for EdgeHub integration
/// </summary>
public static class EdgeHubHostConfigExtensions
{
/// <summary>
/// Enables use of EdgeHub binding extensions
/// </summary>
/// <param name="config">The <see cref="JobHostConfiguration"/> to configure.</param>
public static void UseEdgeHub(this JobHostConfiguration config)
{
if (config == null)
{
throw new ArgumentNullException(nameof(config));
}
var extensions = config.GetService<IExtensionRegistry>();
var extensionConfig = new EdgeHubExtensionConfigProvider();
extensions.RegisterExtension<IExtensionConfigProvider>(extensionConfig);
}
}
}

Просмотреть файл

@ -0,0 +1,62 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.WebJobs.Extensions.EdgeHub
{
//
// Code ported from http://blogs.msdn.com/b/pfxteam/archive/2012/02/12/10266988.aspx
//
using System;
using System.Threading;
using System.Threading.Tasks;
sealed class AsyncLock : IDisposable
{
readonly Task<Releaser> releaser;
readonly SemaphoreSlim semaphore;
public AsyncLock()
: this(1)
{
}
public AsyncLock(int maximumConcurrency)
{
this.releaser = Task.FromResult(new Releaser(this));
this.semaphore = new SemaphoreSlim(maximumConcurrency, maximumConcurrency);
}
public Task<Releaser> LockAsync() => this.LockAsync(CancellationToken.None);
public Task<Releaser> LockAsync(CancellationToken token)
{
Task wait = this.semaphore.WaitAsync(token);
return wait.Status == TaskStatus.RanToCompletion ? this.releaser :
wait.ContinueWith((_, state) => new Releaser((AsyncLock)state),
this, CancellationToken.None,
TaskContinuationOptions.ExecuteSynchronously | TaskContinuationOptions.OnlyOnRanToCompletion, TaskScheduler.Default);
}
/// <inheritdoc />
public void Dispose() => this.semaphore.Dispose();
public struct Releaser : IDisposable
{
readonly AsyncLock toRelease;
int disposed;
public Releaser(AsyncLock toRelease)
{
this.toRelease = toRelease ?? throw new ArgumentNullException(nameof(toRelease));
this.disposed = 0;
}
public void Dispose()
{
if (0 == Interlocked.Exchange(ref this.disposed, 1))
{
this.toRelease.semaphore.Release();
}
}
}
}
}

Просмотреть файл

@ -1,6 +1,6 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Functions.Binding
namespace Microsoft.Azure.WebJobs.Extensions.EdgeHub
{
using System;
using System.Collections.Generic;

Просмотреть файл

@ -1,15 +1,16 @@
// Copyright (c) Microsoft. All rights reserved.
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Functions.Binding
namespace Microsoft.Azure.WebJobs.Extensions.EdgeHub
{
using System;
using Microsoft.Azure.WebJobs.Description;
[Binding]
[AttributeUsage(AttributeTargets.Parameter)]
public class EdgeHubAttribute : Attribute
{
public string OutputName { get; set; }
public int BatchSize { get; set; }
}
}
}

Просмотреть файл

@ -1,6 +1,6 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Functions.Binding
namespace Microsoft.Azure.WebJobs.Extensions.EdgeHub
{
using System;
using Microsoft.Azure.WebJobs.Description;

Просмотреть файл

@ -1,6 +1,6 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Functions.Binding
namespace Microsoft.Azure.WebJobs.Extensions.EdgeHub
{
using System;
using System.Collections.Concurrent;
@ -8,7 +8,6 @@ namespace Microsoft.Azure.Devices.Edge.Functions.Binding
using System.Reflection;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Edge.Functions.Binding.Bindings;
using Microsoft.Azure.WebJobs.Host.Triggers;
/// <summary>
@ -35,11 +34,6 @@ namespace Microsoft.Azure.Devices.Edge.Functions.Binding
return null;
}
if (parameter.ParameterType != typeof(Message) && parameter.ParameterType != typeof(string))
{
throw new InvalidOperationException($"Can't bind EdgeHubTriggerAttribute to type '{parameter.ParameterType}'.");
}
await this.TrySetEventDefaultHandlerAsync().ConfigureAwait(false);
var messageProcessor = new EdgeHubMessageProcessor();

Просмотреть файл

@ -0,0 +1,17 @@
// Copyright (c) Microsoft. All rights reserved.
using Microsoft.Azure.WebJobs.Extensions.EdgeHub;
using Microsoft.Azure.WebJobs.Hosting;
[assembly: WebJobsStartup(typeof(EdgeHubWebJobsStartup))]
namespace Microsoft.Azure.WebJobs.Extensions.EdgeHub
{
using Microsoft.Azure.WebJobs.Extensions.EdgeHub.Config;
using Microsoft.Azure.WebJobs.Hosting;
public class EdgeHubWebJobsStartup : IWebJobsStartup
{
public void Configure(IWebJobsBuilder builder) => builder.AddEdge();
}
}

Просмотреть файл

@ -1,11 +1,25 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<AssemblyName>Microsoft.Azure.Devices.Edge.Functions.BindingExtension</AssemblyName>
<AssemblyName>Microsoft.Azure.WebJobs.Extensions.EdgeHub</AssemblyName>
<TreatWarningsAsErrors>True</TreatWarningsAsErrors>
<Configurations>Debug;Release;CodeCoverage</Configurations>
<HighEntropyVA>true</HighEntropyVA>
<Version>1.0.0-beta-004</Version>
</PropertyGroup>
<PropertyGroup>
<Version>1.0.0-beta-004</Version>
<Title>Microsoft Azure Edge Function extension</Title>
<Authors>Microsoft</Authors>
<PackageRequireLicenseAcceptance>true</PackageRequireLicenseAcceptance>
<PackageLicenseUrl>https://github.com/Azure/iotedge/blob/master/LICENSE</PackageLicenseUrl>
<Description>This package contains binding extensions for IoTEdge.</Description>
<PackageIconUrl>http://go.microsoft.com/fwlink/?LinkID=288890</PackageIconUrl>
<PackageProjectUrl>https://github.com/Azure/iotedge</PackageProjectUrl>
<Copyright>© Microsoft Corporation. All rights reserved.</Copyright>
<PackageTags>IoT Microsoft Azure IoTEdge Functions</PackageTags>
</PropertyGroup>
<!--
@ -23,24 +37,6 @@
<ItemGroup>
<PackageReference Include="Microsoft.Azure.Devices.Client" Version="1.18.0" />
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions" Version="3.0.0-beta3" />
</ItemGroup>
<ItemGroup>
<Content Include="../../docker*/**/*.*">
<Link>%(RecursiveDir)%(Filename)%(Extension)</Link>
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content>
</ItemGroup>
<ItemGroup>
<Content Include="../../scripts/**/*.*">
<Link>%(RecursiveDir)%(Filename)%(Extension)</Link>
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</Content>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\..\..\edge-util\src\Microsoft.Azure.Devices.Edge.Util\Microsoft.Azure.Devices.Edge.Util.csproj" />
<PackageReference Include="Microsoft.Azure.WebJobs" Version="3.0.0-beta7-11412" />
</ItemGroup>
</Project>

Просмотреть файл

@ -1,21 +1,19 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Functions.Binding
namespace Microsoft.Azure.WebJobs.Extensions.EdgeHub
{
using System;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.Devices.Edge.Util;
using Microsoft.Azure.Devices.Edge.Util.Concurrency;
using Microsoft.Azure.Devices.Edge.Util.TransientFaultHandling;
using ExponentialBackoff = Util.TransientFaultHandling.ExponentialBackoff;
class ModuleClientCache
{
const int RetryCount = 5;
static readonly ITransientErrorDetectionStrategy TimeoutErrorDetectionStrategy = new DelegateErrorDetectionStrategy(ex => ex.HasTimeoutException());
static readonly RetryStrategy TransientRetryStrategy =
new ExponentialBackoff(RetryCount, TimeSpan.FromSeconds(2), TimeSpan.FromSeconds(60), TimeSpan.FromSeconds(4));
readonly AsyncLock asyncLock = new AsyncLock();
ModuleClient client;

Просмотреть файл

@ -1,8 +1,10 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Functions.Binding
namespace Microsoft.Azure.WebJobs.Extensions.EdgeHub
{
using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Azure.Devices.Client;
static class Utils
@ -18,5 +20,10 @@ namespace Microsoft.Azure.Devices.Edge.Functions.Binding
return copy;
}
public static bool HasTimeoutException(this Exception ex) =>
ex != null &&
(ex is TimeoutException || HasTimeoutException(ex.InnerException) ||
(ex is AggregateException argEx && (argEx.InnerExceptions?.Select(e => HasTimeoutException(e)).Any(e => e) ?? false)));
}
}

Просмотреть файл

@ -1,6 +1,6 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Functions.Binding.Bindings
namespace Microsoft.Azure.WebJobs.Extensions.EdgeHub
{
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.WebJobs;

Просмотреть файл

@ -1,6 +1,6 @@
// Copyright (c) Microsoft. All rights reserved.
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Functions.Binding.Bindings
namespace Microsoft.Azure.WebJobs.Extensions.EdgeHub
{
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client;
@ -29,4 +29,4 @@ namespace Microsoft.Azure.Devices.Edge.Functions.Binding.Bindings
this.handler = functionsMessageHandler;
}
}
}
}

Просмотреть файл

@ -1,6 +1,6 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.Devices.Edge.Functions.Binding.Bindings
namespace Microsoft.Azure.WebJobs.Extensions.EdgeHub
{
using System;
using System.Collections.Generic;
@ -8,13 +8,12 @@ namespace Microsoft.Azure.Devices.Edge.Functions.Binding.Bindings
using System.Reflection;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.WebJobs.Extensions.Bindings;
using Microsoft.Azure.WebJobs.Host.Bindings;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Azure.WebJobs.Host.Protocols;
using Microsoft.Azure.WebJobs.Host.Triggers;
using Devices.Client;
using Host.Bindings;
using Host.Executors;
using Host.Listeners;
using Host.Protocols;
using Host.Triggers;
/// <summary>
/// Implements a trigger binding for EdgeHub which triggers a function
@ -46,8 +45,7 @@ namespace Microsoft.Azure.Devices.Edge.Functions.Binding.Bindings
throw new NotSupportedException("Message is required.");
}
IValueBinder valueBinder = new EdgeHubValueBinder(this.parameter, triggerValue);
return Task.FromResult<ITriggerData>(new TriggerData(valueBinder, this.GetBindingData(triggerValue)));
return Task.FromResult<ITriggerData>(new TriggerData(null, this.GetBindingData(triggerValue)));
}
public Task<IListener> CreateListenerAsync(ListenerFactoryContext context)
@ -72,7 +70,9 @@ namespace Microsoft.Azure.Devices.Edge.Functions.Binding.Bindings
IReadOnlyDictionary<string, object> GetBindingData(Message value)
{
var bindingData = new Dictionary<string, object>(StringComparer.OrdinalIgnoreCase);
bindingData.Add("EdgeHubTrigger", value);
bindingData.Add("EnqueuedTimeUtc", value.CreationTimeUtc);
bindingData.Add("SequenceNumber", value.SequenceNumber);
bindingData.Add("Properties", value.Properties);
return bindingData;
}
@ -80,7 +80,9 @@ namespace Microsoft.Azure.Devices.Edge.Functions.Binding.Bindings
IReadOnlyDictionary<string, Type> CreateBindingDataContract()
{
var contract = new Dictionary<string, Type>(StringComparer.OrdinalIgnoreCase);
contract.Add("EdgeHubTrigger", typeof(Message));
contract.Add("EnqueuedTimeUtc", typeof(DateTime));
contract.Add("SequenceNumber", typeof(ulong));
contract.Add("Properties", typeof(IDictionary<string, string>));
return contract;
}
@ -93,27 +95,6 @@ namespace Microsoft.Azure.Devices.Edge.Functions.Binding.Bindings
}
}
class EdgeHubValueBinder : ValueBinder
{
readonly object value;
public EdgeHubValueBinder(ParameterInfo parameter, Message value)
: base(parameter.ParameterType)
{
this.value = value;
}
public override Task<object> GetValueAsync()
{
return Task.FromResult(this.value);
}
public override string ToInvokeString()
{
return "Message";
}
}
class Listener : IListener
{
readonly ITriggeredFunctionExecutor executor;

Просмотреть файл

@ -0,0 +1,52 @@
// Copyright (c) Microsoft. All rights reserved.
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.WebJobs.Host.Bindings;
using Newtonsoft.Json.Linq;
namespace Microsoft.Azure.WebJobs.Extensions.EdgeHub.Config
{
using System;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.WebJobs.Description;
using Microsoft.Azure.WebJobs.Host.Config;
using Newtonsoft.Json;
/// <summary>
/// Extension configuration provider used to register EdgeHub triggers and binders
/// </summary>
[Extension("EdgeHub")]
class EdgeHubExtensionConfigProvider : IExtensionConfigProvider
{
public void Initialize(ExtensionConfigContext context)
{
if (context == null)
{
throw new ArgumentNullException(nameof(context));
}
var bindingProvider = new EdgeHubTriggerBindingProvider();
var rule = context.AddBindingRule<EdgeHubTriggerAttribute>();
rule.AddConverter<Message, string>(ConvertMessageToString);
rule.AddConverter<Message, byte[]>(ConvertMessageToBytes);
rule.BindToTrigger<Message>(bindingProvider);
var rule2 = context.AddBindingRule<EdgeHubAttribute>();
rule2.BindToCollector<Message>(typeof(EdgeHubCollectorBuilder));
rule2.AddConverter<string, Message>(ConvertStringToMessage);
rule2.AddConverter<byte[], Message>(ConvertBytesToMessage);
rule2.AddOpenConverter<OpenType.Poco, Message>(ConvertPocoToMessage);
}
private Task<object> ConvertPocoToMessage(object src, Attribute attribute, ValueBindingContext context) => Task.FromResult<object>(ConvertStringToMessage(JsonConvert.SerializeObject(src)));
private static Message ConvertBytesToMessage(byte[] msgBytes) => new Message(msgBytes);
private static Message ConvertStringToMessage(string msg) => ConvertBytesToMessage(Encoding.UTF8.GetBytes(msg));
private static byte[] ConvertMessageToBytes(Message msg) => msg.GetBytes();
private static string ConvertMessageToString(Message msg) => Encoding.UTF8.GetString(ConvertMessageToBytes(msg));
}
}

Просмотреть файл

@ -0,0 +1,29 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.WebJobs.Extensions.EdgeHub.Config
{
using System;
using Microsoft.Azure.WebJobs;
/// <summary>
/// Extension methods for EdgeHub integration
/// </summary>
public static class EdgeHubHostConfigExtensions
{
/// <summary>
/// Adds EdgeHub binding extensions <see cref="IWebJobsBuilder"/>.
/// </summary>
/// <param name="builder">The <see cref="IWebJobsBuilder"/> to configure.</param>
public static IWebJobsBuilder AddEdge(this IWebJobsBuilder builder)
{
if (builder == null)
{
throw new ArgumentNullException(nameof(builder));
}
builder.AddExtension<EdgeHubExtensionConfigProvider>();
return builder;
}
}
}

Просмотреть файл

@ -0,0 +1,93 @@
//Copyright(c) Microsoft.All rights reserved.
//Microsoft would like to thank its contributors, a list
//of whom are at http://aka.ms/entlib-contributors
//Licensed under the Apache License, Version 2.0 (the "License"); you
//may not use this file except in compliance with the License. You may
//obtain a copy of the License at
//http://www.apache.org/licenses/LICENSE-2.0
//Unless required by applicable law or agreed to in writing, software
//distributed under the License is distributed on an "AS IS" BASIS,
//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
//implied. See the License for the specific language governing permissions
//and limitations under the License.
namespace Microsoft.Azure.WebJobs.Extensions.EdgeHub
{
using System;
using System.Globalization;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Provides a wrapper for a non-generic <see cref="T:System.Threading.Tasks.Task" /> and calls into the pipeline
/// to retry only the generic version of the <see cref="T:System.Threading.Tasks.Task" />.
/// </summary>
class AsyncExecution : AsyncExecution<bool>
{
static Task<bool> cachedBoolTask;
public AsyncExecution(Func<Task> taskAction, ShouldRetry shouldRetry, Func<Exception, bool> isTransient, Action<int, Exception, TimeSpan> onRetrying, bool fastFirstRetry, CancellationToken cancellationToken)
: base(() => StartAsGenericTask(taskAction), shouldRetry, isTransient, onRetrying, fastFirstRetry, cancellationToken)
{
}
/// <summary>
/// Wraps the non-generic <see cref="T:System.Threading.Tasks.Task" /> into a generic <see cref="T:System.Threading.Tasks.Task" />.
/// </summary>
/// <param name="taskAction">The task to wrap.</param>
/// <returns>A <see cref="T:System.Threading.Tasks.Task" /> that wraps the non-generic <see cref="T:System.Threading.Tasks.Task" />.</returns>
static Task<bool> StartAsGenericTask(Func<Task> taskAction)
{
Task task = taskAction();
if (task == null)
{
throw new ArgumentException(string.Format(CultureInfo.InvariantCulture, "{0} cannot be null", new object[]
{
"taskAction"
}), nameof(taskAction));
}
if (task.Status == TaskStatus.RanToCompletion)
{
return GetCachedTask();
}
if (task.Status == TaskStatus.Created)
{
throw new ArgumentException(string.Format(CultureInfo.InvariantCulture, "{0} must be scheduled", new object[]
{
"taskAction"
}), nameof(taskAction));
}
var tcs = new TaskCompletionSource<bool>();
task.ContinueWith(delegate (Task t)
{
if (t.IsFaulted)
{
if (t.Exception != null)
tcs.TrySetException(t.Exception.InnerExceptions);
return;
}
if (t.IsCanceled)
{
tcs.TrySetCanceled();
return;
}
tcs.TrySetResult(true);
}, TaskContinuationOptions.ExecuteSynchronously);
return tcs.Task;
}
static Task<bool> GetCachedTask()
{
if (cachedBoolTask == null)
{
var taskCompletionSource = new TaskCompletionSource<bool>();
taskCompletionSource.TrySetResult(true);
cachedBoolTask = taskCompletionSource.Task;
}
return cachedBoolTask;
}
}
}

Просмотреть файл

@ -0,0 +1,152 @@
//Copyright(c) Microsoft.All rights reserved.
//Microsoft would like to thank its contributors, a list
//of whom are at http://aka.ms/entlib-contributors
//Licensed under the Apache License, Version 2.0 (the "License"); you
//may not use this file except in compliance with the License. You may
//obtain a copy of the License at
//http://www.apache.org/licenses/LICENSE-2.0
//Unless required by applicable law or agreed to in writing, software
//distributed under the License is distributed on an "AS IS" BASIS,
//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
//implied. See the License for the specific language governing permissions
//and limitations under the License.
using System;
using System.Globalization;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.Azure.WebJobs.Extensions.EdgeHub
{
/// <summary>
/// Handles the execution and retries of the user-initiated task.
/// </summary>
/// <typeparam name="TResult">The result type of the user-initiated task.</typeparam>
class AsyncExecution<TResult>
{
readonly Func<Task<TResult>> taskFunc;
readonly ShouldRetry shouldRetry;
readonly Func<Exception, bool> isTransient;
readonly Action<int, Exception, TimeSpan> onRetrying;
readonly bool fastFirstRetry;
readonly CancellationToken cancellationToken;
Task<TResult> previousTask;
int retryCount;
public AsyncExecution(Func<Task<TResult>> taskFunc, ShouldRetry shouldRetry, Func<Exception, bool> isTransient, Action<int, Exception, TimeSpan> onRetrying, bool fastFirstRetry, CancellationToken cancellationToken)
{
this.taskFunc = taskFunc;
this.shouldRetry = shouldRetry;
this.isTransient = isTransient;
this.onRetrying = onRetrying;
this.fastFirstRetry = fastFirstRetry;
this.cancellationToken = cancellationToken;
}
internal Task<TResult> ExecuteAsync()
{
return this.ExecuteAsyncImpl(null);
}
Task<TResult> ExecuteAsyncImpl(Task ignore)
{
if (this.cancellationToken.IsCancellationRequested)
{
if (this.previousTask != null)
{
return this.previousTask;
}
var taskCompletionSource = new TaskCompletionSource<TResult>();
taskCompletionSource.TrySetCanceled();
return taskCompletionSource.Task;
}
else
{
Task<TResult> task;
try
{
task = this.taskFunc();
}
catch (Exception ex)
{
if (!this.isTransient(ex))
{
throw;
}
var taskCompletionSource2 = new TaskCompletionSource<TResult>();
taskCompletionSource2.TrySetException(ex);
task = taskCompletionSource2.Task;
}
if (task == null)
{
throw new ArgumentException(string.Format(CultureInfo.InvariantCulture, "{0} cannot be null", new object[]
{
"taskFunc"
}), nameof(this.taskFunc));
}
if (task.Status == TaskStatus.RanToCompletion)
{
return task;
}
if (task.Status == TaskStatus.Created)
{
throw new ArgumentException(string.Format(CultureInfo.InvariantCulture, "{0} must be scheduled", new object[]
{
"taskFunc"
}), nameof(this.taskFunc));
}
return task.ContinueWith(new Func<Task<TResult>, Task<TResult>>(this.ExecuteAsyncContinueWith), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default).Unwrap();
}
}
Task<TResult> ExecuteAsyncContinueWith(Task<TResult> runningTask)
{
if (!runningTask.IsFaulted || this.cancellationToken.IsCancellationRequested)
{
return runningTask;
}
TimeSpan zero;
Exception innerException = runningTask.Exception.InnerException;
#pragma warning disable CS0618 // Type or member is obsolete
if (innerException is RetryLimitExceededException)
#pragma warning restore CS0618 // Type or member is obsolete
{
var taskCompletionSource = new TaskCompletionSource<TResult>();
if (innerException.InnerException != null)
{
taskCompletionSource.TrySetException(innerException.InnerException);
}
else
{
taskCompletionSource.TrySetCanceled();
}
return taskCompletionSource.Task;
}
if (!this.isTransient(innerException) || !this.shouldRetry(this.retryCount++, innerException, out zero))
{
return runningTask;
}
if (zero < TimeSpan.Zero)
{
zero = TimeSpan.Zero;
}
this.onRetrying(this.retryCount, innerException, zero);
this.previousTask = runningTask;
if (zero > TimeSpan.Zero && (this.retryCount > 1 || !this.fastFirstRetry))
{
return Task.Delay(zero, this.cancellationToken).ContinueWith(new Func<Task, Task<TResult>>(this.ExecuteAsyncImpl), CancellationToken.None, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default).Unwrap();
}
return this.ExecuteAsyncImpl(null);
}
}
}

Просмотреть файл

@ -0,0 +1,21 @@
// Copyright (c) Microsoft. All rights reserved.
namespace Microsoft.Azure.WebJobs.Extensions.EdgeHub
{
using System;
/// <summary>
/// An error detection strategy that delegates the detection to a lambda.
/// </summary>
class DelegateErrorDetectionStrategy : ITransientErrorDetectionStrategy
{
readonly Func<Exception, bool> underlying;
public DelegateErrorDetectionStrategy(Func<Exception, bool> isTransient)
{
this.underlying = isTransient ?? throw new ArgumentNullException(nameof(isTransient));
}
public bool IsTransient(Exception ex) => this.underlying(ex);
}
}

Просмотреть файл

@ -0,0 +1,79 @@
//Copyright(c) Microsoft.All rights reserved.
//Microsoft would like to thank its contributors, a list
//of whom are at http://aka.ms/entlib-contributors
//Licensed under the Apache License, Version 2.0 (the "License"); you
//may not use this file except in compliance with the License. You may
//obtain a copy of the License at
//http://www.apache.org/licenses/LICENSE-2.0
//Unless required by applicable law or agreed to in writing, software
//distributed under the License is distributed on an "AS IS" BASIS,
//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
//implied. See the License for the specific language governing permissions
//and limitations under the License.
using System;
namespace Microsoft.Azure.WebJobs.Extensions.EdgeHub
{
/// <summary>
/// A retry strategy with back-off parameters for calculating the exponential delay between retries.
/// Note: this fixes an overflow in the stock ExponentialBackoff in the Transient Fault Handling library
/// which causes the calculated delay to go negative.
/// Use of this class for exponential backoff is encouraged instead.
/// </summary>
class ExponentialBackoff : RetryStrategy
{
readonly int retryCount;
readonly TimeSpan minBackoff;
readonly TimeSpan maxBackoff;
readonly TimeSpan deltaBackoff;
public ExponentialBackoff()
: this(DefaultClientRetryCount, DefaultMinBackoff, DefaultMaxBackoff, DefaultClientBackoff)
{
}
public ExponentialBackoff(int retryCount, TimeSpan minBackoff, TimeSpan maxBackoff, TimeSpan deltaBackoff)
: this(retryCount, minBackoff, maxBackoff, deltaBackoff, DefaultFirstFastRetry)
{
}
public ExponentialBackoff(int retryCount, TimeSpan minBackoff, TimeSpan maxBackoff, TimeSpan deltaBackoff, bool firstFastRetry)
: base(firstFastRetry)
{
Guard.ArgumentNotNegativeValue(retryCount, "retryCount");
Guard.ArgumentNotNegativeValue(minBackoff.Ticks, "minBackoff");
Guard.ArgumentNotNegativeValue(maxBackoff.Ticks, "minBackoff");
Guard.ArgumentNotNegativeValue(deltaBackoff.Ticks, "deltaBackoff");
Guard.ArgumentNotGreaterThan(minBackoff.TotalMilliseconds, maxBackoff.TotalMilliseconds, "minBackoff must be less than or equal to maxBackoff");
this.retryCount = retryCount;
this.minBackoff = minBackoff;
this.maxBackoff = maxBackoff;
this.deltaBackoff = deltaBackoff;
}
public override ShouldRetry GetShouldRetry()
{
return (int currentRetryCount, Exception lastException, out TimeSpan retryInterval) =>
{
if (currentRetryCount < this.retryCount)
{
var random = new Random();
double length = Math.Min(
this.minBackoff.TotalMilliseconds + (Math.Pow(2.0, currentRetryCount) - 1.0) * (0.8 + random.NextDouble() * 0.4) * this.deltaBackoff.TotalMilliseconds,
this.maxBackoff.TotalMilliseconds);
retryInterval = TimeSpan.FromMilliseconds(length);
return true;
}
else
{
retryInterval = TimeSpan.Zero;
return false;
}
};
}
}
}

Просмотреть файл

@ -0,0 +1,94 @@
//Copyright(c) Microsoft.All rights reserved.
//Microsoft would like to thank its contributors, a list
//of whom are at http://aka.ms/entlib-contributors
//Licensed under the Apache License, Version 2.0 (the "License"); you
//may not use this file except in compliance with the License. You may
//obtain a copy of the License at
//http://www.apache.org/licenses/LICENSE-2.0
//Unless required by applicable law or agreed to in writing, software
//distributed under the License is distributed on an "AS IS" BASIS,
//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
//implied. See the License for the specific language governing permissions
//and limitations under the License.
using System;
namespace Microsoft.Azure.WebJobs.Extensions.EdgeHub
{
/// <summary>
/// Represents a retry strategy with a specified number of retry attempts and a default, fixed time interval between retries.
/// </summary>
class FixedInterval : RetryStrategy
{
private readonly int retryCount;
private readonly TimeSpan retryInterval;
/// <summary>
/// Initializes a new instance of the <see cref="T:Microsoft.Azure.WebJobs.Extensions.EdgeHub.FixedInterval" /> class.
/// </summary>
public FixedInterval() : this(DefaultClientRetryCount)
{
}
/// <summary>
/// Initializes a new instance of the <see cref="T:Microsoft.Azure.WebJobs.Extensions.EdgeHub.FixedInterval" /> class with the specified number of retry attempts.
/// </summary>
/// <param name="retryCount">The number of retry attempts.</param>
public FixedInterval(int retryCount) : this(retryCount, DefaultRetryInterval)
{
}
/// <summary>
/// Initializes a new instance of the <see cref="T:Microsoft.Azure.WebJobs.Extensions.EdgeHub.FixedInterval" /> class with the specified number of retry attempts, time interval, and retry strategy.
/// </summary>
/// <param name="retryCount">The number of retry attempts.</param>
/// <param name="retryInterval">The time interval between retries.</param>
public FixedInterval(int retryCount, TimeSpan retryInterval) : this(retryCount, retryInterval, DefaultFirstFastRetry)
{
}
/// <summary>
/// Initializes a new instance of the <see cref="T:Microsoft.Azure.WebJobs.Extensions.EdgeHub.FixedInterval" /> class with the specified number of retry attempts, time interval, retry strategy, and fast start option.
/// </summary>
/// <param name="retryCount">The number of retry attempts.</param>
/// <param name="retryInterval">The time interval between retries.</param>
/// <param name="firstFastRetry">true to immediately retry in the first attempt; otherwise, false. The subsequent retries will remain subject to the configured retry interval.</param>
public FixedInterval(int retryCount, TimeSpan retryInterval, bool firstFastRetry) : base(firstFastRetry)
{
Guard.ArgumentNotNegativeValue(retryCount, "retryCount");
Guard.ArgumentNotNegativeValue(retryInterval.Ticks, "retryInterval");
this.retryCount = retryCount;
this.retryInterval = retryInterval;
}
/// <summary>
/// Returns the corresponding ShouldRetry delegate.
/// </summary>
/// <returns>The ShouldRetry delegate.</returns>
public override ShouldRetry GetShouldRetry()
{
if (this.retryCount == 0)
{
return delegate (int currentRetryCount, Exception lastException, out TimeSpan interval)
{
interval = TimeSpan.Zero;
return false;
};
}
return delegate (int currentRetryCount, Exception lastException, out TimeSpan interval)
{
if (currentRetryCount < this.retryCount)
{
interval = this.retryInterval;
return true;
}
interval = TimeSpan.Zero;
return false;
};
}
}
}

Просмотреть файл

@ -0,0 +1,111 @@
//Copyright(c) Microsoft.All rights reserved.
//Microsoft would like to thank its contributors, a list
//of whom are at http://aka.ms/entlib-contributors
//Licensed under the Apache License, Version 2.0 (the "License"); you
//may not use this file except in compliance with the License. You may
//obtain a copy of the License at
//http://www.apache.org/licenses/LICENSE-2.0
//Unless required by applicable law or agreed to in writing, software
//distributed under the License is distributed on an "AS IS" BASIS,
//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
//implied. See the License for the specific language governing permissions
//and limitations under the License.
using System;
using System.Globalization;
namespace Microsoft.Azure.WebJobs.Extensions.EdgeHub
{
/// <summary>
/// Implements the common guard methods.
/// </summary>
static class Guard
{
/// <summary>
/// Checks a string argument to ensure that it isn't null or empty.
/// </summary>
/// <param name="argumentValue">The argument value to check.</param>
/// <param name="argumentName">The name of the argument.</param>
/// <returns>The return value should be ignored. It is intended to be used only when validating arguments during instance creation (for example, when calling the base constructor).</returns>
public static bool ArgumentNotNullOrEmptyString(string argumentValue, string argumentName)
{
ArgumentNotNull(argumentValue, argumentName);
if (argumentValue.Length == 0)
{
throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, "String {0} cannot be empty", new object[]
{
argumentName
}));
}
return true;
}
/// <summary>
/// Checks an argument to ensure that it isn't null.
/// </summary>
/// <param name="argumentValue">The argument value to check.</param>
/// <param name="argumentName">The name of the argument.</param>
/// <returns>The return value should be ignored. It is intended to be used only when validating arguments during instance creation (for example, when calling the base constructor).</returns>
public static bool ArgumentNotNull(object argumentValue, string argumentName)
{
if (argumentValue == null)
{
throw new ArgumentNullException(argumentName);
}
return true;
}
/// <summary>
/// Checks an argument to ensure that its 32-bit signed value isn't negative.
/// </summary>
/// <param name="argumentValue">The <see cref="T:System.Int32" /> value of the argument.</param>
/// <param name="argumentName">The name of the argument for diagnostic purposes.</param>
public static void ArgumentNotNegativeValue(int argumentValue, string argumentName)
{
if (argumentValue < 0)
{
throw new ArgumentOutOfRangeException(argumentName, argumentValue, string.Format(CultureInfo.CurrentCulture, "Argument {0} cannot be negative", new object[]
{
argumentName
}));
}
}
/// <summary>
/// Checks an argument to ensure that its 64-bit signed value isn't negative.
/// </summary>
/// <param name="argumentValue">The <see cref="T:System.Int64" /> value of the argument.</param>
/// <param name="argumentName">The name of the argument for diagnostic purposes.</param>
public static void ArgumentNotNegativeValue(long argumentValue, string argumentName)
{
if (argumentValue < 0L)
{
throw new ArgumentOutOfRangeException(argumentName, argumentValue, string.Format(CultureInfo.CurrentCulture, "Argument {0} cannot be negative", new object[]
{
argumentName
}));
}
}
/// <summary>
/// Checks an argument to ensure that its value doesn't exceed the specified ceiling baseline.
/// </summary>
/// <param name="argumentValue">The <see cref="T:System.Double" /> value of the argument.</param>
/// <param name="ceilingValue">The <see cref="T:System.Double" /> ceiling value of the argument.</param>
/// <param name="argumentName">The name of the argument for diagnostic purposes.</param>
public static void ArgumentNotGreaterThan(double argumentValue, double ceilingValue, string argumentName)
{
if (argumentValue > ceilingValue)
{
throw new ArgumentOutOfRangeException(argumentName, argumentValue, string.Format(CultureInfo.CurrentCulture, "Argument {0} cannot be greater than baseline value {1}", new object[]
{
argumentName,
ceilingValue
}));
}
}
}
}

Просмотреть файл

@ -0,0 +1,33 @@
//Copyright(c) Microsoft.All rights reserved.
//Microsoft would like to thank its contributors, a list
//of whom are at http://aka.ms/entlib-contributors
//Licensed under the Apache License, Version 2.0 (the "License"); you
//may not use this file except in compliance with the License. You may
//obtain a copy of the License at
//http://www.apache.org/licenses/LICENSE-2.0
//Unless required by applicable law or agreed to in writing, software
//distributed under the License is distributed on an "AS IS" BASIS,
//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
//implied. See the License for the specific language governing permissions
//and limitations under the License.
using System;
namespace Microsoft.Azure.WebJobs.Extensions.EdgeHub
{
/// <summary>
/// Defines an interface that must be implemented by custom components responsible for detecting specific transient conditions.
/// </summary>
interface ITransientErrorDetectionStrategy
{
/// <summary>
/// Determines whether the specified exception represents a transient failure that can be compensated by a retry.
/// </summary>
/// <param name="ex">The exception object to be verified.</param>
/// <returns>true if the specified exception is considered as transient; otherwise, false.</returns>
bool IsTransient(Exception ex);
}
}

Просмотреть файл

@ -0,0 +1,82 @@
//Copyright(c) Microsoft.All rights reserved.
//Microsoft would like to thank its contributors, a list
//of whom are at http://aka.ms/entlib-contributors
//Licensed under the Apache License, Version 2.0 (the "License"); you
//may not use this file except in compliance with the License. You may
//obtain a copy of the License at
//http://www.apache.org/licenses/LICENSE-2.0
//Unless required by applicable law or agreed to in writing, software
//distributed under the License is distributed on an "AS IS" BASIS,
//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
//implied. See the License for the specific language governing permissions
//and limitations under the License.
using System;
namespace Microsoft.Azure.WebJobs.Extensions.EdgeHub
{
/// <summary>
/// A retry strategy with a specified number of retry attempts and an incremental time interval between retries.
/// </summary>
class Incremental : RetryStrategy
{
readonly int retryCount;
readonly TimeSpan initialInterval;
readonly TimeSpan increment;
/// <summary>
/// Initializes a new instance of the <see cref="T:Microsoft.Azure.WebJobs.Extensions.EdgeHub.Incremental" /> class.
/// </summary>
public Incremental() : this(DefaultClientRetryCount, DefaultRetryInterval, DefaultRetryIncrement)
{
}
/// <summary>
/// Initializes a new instance of the <see cref="T:Microsoft.Azure.WebJobs.Extensions.EdgeHub.Incremental" /> class with the specified name and retry settings.
/// </summary>
/// <param name="retryCount">The number of retry attempts.</param>
/// <param name="initialInterval">The initial interval that will apply for the first retry.</param>
/// <param name="increment">The incremental time value that will be used to calculate the progressive delay between retries.</param>
public Incremental(int retryCount, TimeSpan initialInterval, TimeSpan increment) : this(retryCount, initialInterval, increment, DefaultFirstFastRetry)
{
}
/// <summary>
/// Initializes a new instance of the <see cref="T:Microsoft.Azure.WebJobs.Extensions.EdgeHub.Incremental" /> class with the specified number of retry attempts, time interval, retry strategy, and fast start option.
/// </summary>
/// <param name="retryCount">The number of retry attempts.</param>
/// <param name="initialInterval">The initial interval that will apply for the first retry.</param>
/// <param name="increment">The incremental time value that will be used to calculate the progressive delay between retries.</param>
/// <param name="firstFastRetry">true to immediately retry in the first attempt; otherwise, false. The subsequent retries will remain subject to the configured retry interval.</param>
public Incremental(int retryCount, TimeSpan initialInterval, TimeSpan increment, bool firstFastRetry) : base(firstFastRetry)
{
Guard.ArgumentNotNegativeValue(retryCount, "retryCount");
Guard.ArgumentNotNegativeValue(initialInterval.Ticks, "initialInterval");
Guard.ArgumentNotNegativeValue(increment.Ticks, "increment");
this.retryCount = retryCount;
this.initialInterval = initialInterval;
this.increment = increment;
}
/// <summary>
/// Returns the corresponding ShouldRetry delegate.
/// </summary>
/// <returns>The ShouldRetry delegate.</returns>
public override ShouldRetry GetShouldRetry()
{
return delegate (int currentRetryCount, Exception lastException, out TimeSpan retryInterval)
{
if (currentRetryCount < this.retryCount)
{
retryInterval = TimeSpan.FromMilliseconds(this.initialInterval.TotalMilliseconds + this.increment.TotalMilliseconds * currentRetryCount);
return true;
}
retryInterval = TimeSpan.Zero;
return false;
};
}
}
}

Просмотреть файл

@ -0,0 +1,61 @@
//Copyright(c) Microsoft.All rights reserved.
//Microsoft would like to thank its contributors, a list
//of whom are at http://aka.ms/entlib-contributors
//Licensed under the Apache License, Version 2.0 (the "License"); you
//may not use this file except in compliance with the License. You may
//obtain a copy of the License at
//http://www.apache.org/licenses/LICENSE-2.0
//Unless required by applicable law or agreed to in writing, software
//distributed under the License is distributed on an "AS IS" BASIS,
//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
//implied. See the License for the specific language governing permissions
//and limitations under the License.
using System;
namespace Microsoft.Azure.WebJobs.Extensions.EdgeHub
{
/// <summary>
/// The special type of exception that provides managed exit from a retry loop. The user code can use this
/// exception to notify the retry policy that no further retry attempts are required.
/// </summary>
[Obsolete("You should use cancellation tokens or other means of stoping the retry loop.")]
sealed class RetryLimitExceededException : Exception
{
/// <summary>
/// Initializes a new instance of the <see cref="T:Microsoft.Azure.WebJobs.Extensions.EdgeHub.RetryLimitExceededException" /> class with a default error message.
/// </summary>
public RetryLimitExceededException() : this("Retry limit exceeded")
{
}
/// <summary>
/// Initializes a new instance of the <see cref="T:Microsoft.Azure.WebJobs.Extensions.EdgeHub.RetryLimitExceededException" /> class with a specified error message.
/// </summary>
/// <param name="message">The message that describes the error.</param>
public RetryLimitExceededException(string message) : base(message)
{
}
/// <summary>
/// Initializes a new instance of the <see cref="T:Microsoft.Azure.WebJobs.Extensions.EdgeHub.RetryLimitExceededException" /> class with a reference to the inner exception
/// that is the cause of this exception.
/// </summary>
/// <param name="innerException">The exception that is the cause of the current exception.</param>
public RetryLimitExceededException(Exception innerException) : base((innerException != null) ? innerException.Message : "Retry limit exceeded", innerException)
{
}
/// <summary>
/// Initializes a new instance of the <see cref="T:Microsoft.Azure.WebJobs.Extensions.EdgeHub.RetryLimitExceededException" /> class with a specified error message and inner exception.
/// </summary>
/// <param name="message">The message that describes the error.</param>
/// <param name="innerException">The exception that is the cause of the current exception.</param>
public RetryLimitExceededException(string message, Exception innerException) : base(message, innerException)
{
}
}
}

Просмотреть файл

@ -0,0 +1,307 @@
//Copyright(c) Microsoft.All rights reserved.
//Microsoft would like to thank its contributors, a list
//of whom are at http://aka.ms/entlib-contributors
//Licensed under the Apache License, Version 2.0 (the "License"); you
//may not use this file except in compliance with the License. You may
//obtain a copy of the License at
//http://www.apache.org/licenses/LICENSE-2.0
//Unless required by applicable law or agreed to in writing, software
//distributed under the License is distributed on an "AS IS" BASIS,
//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
//implied. See the License for the specific language governing permissions
//and limitations under the License.
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Microsoft.Azure.WebJobs.Extensions.EdgeHub
{
/// <summary>
/// Provides the base implementation of the retry mechanism for unreliable actions and transient conditions.
/// </summary>
class RetryPolicy
{
/// <summary>
/// Implements a strategy that ignores any transient errors.
/// </summary>
sealed class TransientErrorIgnoreStrategy : ITransientErrorDetectionStrategy
{
/// <summary>
/// Always returns false.
/// </summary>
/// <param name="ex">The exception.</param>
/// <returns>Always false.</returns>
public bool IsTransient(Exception ex)
{
return false;
}
}
/// <summary>
/// Implements a strategy that treats all exceptions as transient errors.
/// </summary>
sealed class TransientErrorCatchAllStrategy : ITransientErrorDetectionStrategy
{
/// <summary>
/// Always returns true.
/// </summary>
/// <param name="ex">The exception.</param>
/// <returns>Always true.</returns>
public bool IsTransient(Exception ex)
{
return true;
}
}
/// <summary>
/// An instance of a callback delegate that will be invoked whenever a retry condition is encountered.
/// </summary>
public event EventHandler<RetryingEventArgs> Retrying;
/// <summary>
/// Returns a default policy that performs no retries, but invokes the action only once.
/// </summary>
public static RetryPolicy NoRetry { get; } = new RetryPolicy(new RetryPolicy.TransientErrorIgnoreStrategy(), RetryStrategy.NoRetry);
/// <summary>
/// Returns a default policy that implements a fixed retry interval configured with the default <see cref="T:Microsoft.Azure.WebJobs.Extensions.EdgeHub.FixedInterval" /> retry strategy.
/// The default retry policy treats all caught exceptions as transient errors.
/// </summary>
public static RetryPolicy DefaultFixed { get; } = new RetryPolicy(new RetryPolicy.TransientErrorCatchAllStrategy(), RetryStrategy.DefaultFixed);
/// <summary>
/// Returns a default policy that implements a progressive retry interval configured with the default <see cref="T:Microsoft.Azure.WebJobs.Extensions.EdgeHub.Incremental" /> retry strategy.
/// The default retry policy treats all caught exceptions as transient errors.
/// </summary>
public static RetryPolicy DefaultProgressive { get; } = new RetryPolicy(new RetryPolicy.TransientErrorCatchAllStrategy(), RetryStrategy.DefaultProgressive);
/// <summary>
/// Returns a default policy that implements a random exponential retry interval configured with the default <see cref="T:Microsoft.Azure.WebJobs.Extensions.EdgeHub.FixedInterval" /> retry strategy.
/// The default retry policy treats all caught exceptions as transient errors.
/// </summary>
public static RetryPolicy DefaultExponential { get; } = new RetryPolicy(new RetryPolicy.TransientErrorCatchAllStrategy(), RetryStrategy.DefaultExponential);
/// <summary>
/// Gets the retry strategy.
/// </summary>
public RetryStrategy RetryStrategy
{
get;
}
/// <summary>
/// Gets the instance of the error detection strategy.
/// </summary>
public ITransientErrorDetectionStrategy ErrorDetectionStrategy
{
get;
}
/// <summary>
/// Initializes a new instance of the <see cref="T:Microsoft.Azure.WebJobs.Extensions.EdgeHub.RetryPolicy" /> class with the specified number of retry attempts and parameters defining the progressive delay between retries.
/// </summary>
/// <param name="errorDetectionStrategy">The <see cref="T:Microsoft.Azure.WebJobs.Extensions.EdgeHub.ITransientErrorDetectionStrategy" /> that is responsible for detecting transient conditions.</param>
/// <param name="retryStrategy">The strategy to use for this retry policy.</param>
public RetryPolicy(ITransientErrorDetectionStrategy errorDetectionStrategy, RetryStrategy retryStrategy)
{
Guard.ArgumentNotNull(errorDetectionStrategy, "errorDetectionStrategy");
Guard.ArgumentNotNull(retryStrategy, "retryPolicy");
this.ErrorDetectionStrategy = errorDetectionStrategy;
if (errorDetectionStrategy == null)
{
throw new InvalidOperationException("The error detection strategy type must implement the ITransientErrorDetectionStrategy interface.");
}
this.RetryStrategy = retryStrategy;
}
/// <summary>
/// Initializes a new instance of the <see cref="T:Microsoft.Azure.WebJobs.Extensions.EdgeHub.RetryPolicy" /> class with the specified number of retry attempts and default fixed time interval between retries.
/// </summary>
/// <param name="errorDetectionStrategy">The <see cref="T:Microsoft.Azure.WebJobs.Extensions.EdgeHub.ITransientErrorDetectionStrategy" /> that is responsible for detecting transient conditions.</param>
/// <param name="retryCount">The number of retry attempts.</param>
public RetryPolicy(ITransientErrorDetectionStrategy errorDetectionStrategy, int retryCount) : this(errorDetectionStrategy, new FixedInterval(retryCount))
{
}
/// <summary>
/// Initializes a new instance of the <see cref="T:Microsoft.Azure.WebJobs.Extensions.EdgeHub.RetryPolicy" /> class with the specified number of retry attempts and fixed time interval between retries.
/// </summary>
/// <param name="errorDetectionStrategy">The <see cref="T:Microsoft.Azure.WebJobs.Extensions.EdgeHub.ITransientErrorDetectionStrategy" /> that is responsible for detecting transient conditions.</param>
/// <param name="retryCount">The number of retry attempts.</param>
/// <param name="retryInterval">The interval between retries.</param>
public RetryPolicy(ITransientErrorDetectionStrategy errorDetectionStrategy, int retryCount, TimeSpan retryInterval) : this(errorDetectionStrategy, new FixedInterval(retryCount, retryInterval))
{
}
/// <summary>
/// Initializes a new instance of the <see cref="T:Microsoft.Azure.WebJobs.Extensions.EdgeHub.RetryPolicy" /> class with the specified number of retry attempts and backoff parameters for calculating the exponential delay between retries.
/// </summary>
/// <param name="errorDetectionStrategy">The <see cref="T:Microsoft.Azure.WebJobs.Extensions.EdgeHub.ITransientErrorDetectionStrategy" /> that is responsible for detecting transient conditions.</param>
/// <param name="retryCount">The number of retry attempts.</param>
/// <param name="minBackoff">The minimum backoff time.</param>
/// <param name="maxBackoff">The maximum backoff time.</param>
/// <param name="deltaBackoff">The time value that will be used to calculate a random delta in the exponential delay between retries.</param>
public RetryPolicy(ITransientErrorDetectionStrategy errorDetectionStrategy, int retryCount, TimeSpan minBackoff, TimeSpan maxBackoff, TimeSpan deltaBackoff) : this(errorDetectionStrategy, new ExponentialBackoff(retryCount, minBackoff, maxBackoff, deltaBackoff))
{
}
/// <summary>
/// Initializes a new instance of the <see cref="T:Microsoft.Azure.WebJobs.Extensions.EdgeHub.RetryPolicy" /> class with the specified number of retry attempts and parameters defining the progressive delay between retries.
/// </summary>
/// <param name="errorDetectionStrategy">The <see cref="T:Microsoft.Azure.WebJobs.Extensions.EdgeHub.ITransientErrorDetectionStrategy" /> that is responsible for detecting transient conditions.</param>
/// <param name="retryCount">The number of retry attempts.</param>
/// <param name="initialInterval">The initial interval that will apply for the first retry.</param>
/// <param name="increment">The incremental time value that will be used to calculate the progressive delay between retries.</param>
public RetryPolicy(ITransientErrorDetectionStrategy errorDetectionStrategy, int retryCount, TimeSpan initialInterval, TimeSpan increment) : this(errorDetectionStrategy, new Incremental(retryCount, initialInterval, increment))
{
}
/// <summary>
/// Repetitively executes the specified action while it satisfies the current retry policy.
/// </summary>
/// <param name="action">A delegate that represents the executable action that doesn't return any results.</param>
public virtual void ExecuteAction(Action action)
{
Guard.ArgumentNotNull(action, "action");
this.ExecuteAction<object>(delegate
{
action();
return null;
});
}
/// <summary>
/// Repetitively executes the specified action while it satisfies the current retry policy.
/// </summary>
/// <typeparam name="TResult">The type of result expected from the executable action.</typeparam>
/// <param name="func">A delegate that represents the executable action that returns the result of type <typeparamref name="TResult" />.</param>
/// <returns>The result from the action.</returns>
public virtual TResult ExecuteAction<TResult>(Func<TResult> func)
{
Guard.ArgumentNotNull(func, "func");
int num = 0;
ShouldRetry shouldRetry = this.RetryStrategy.GetShouldRetry();
TResult result;
while (true)
{
Exception ex;
TimeSpan zero;
try
{
result = func();
break;
}
#pragma warning disable CS0618 // Type or member is obsolete
catch (RetryLimitExceededException ex2)
#pragma warning restore CS0618 // Type or member is obsolete
{
if (ex2.InnerException != null)
{
throw ex2.InnerException;
}
result = default(TResult);
break;
}
catch (Exception ex3)
{
ex = ex3;
if (!this.ErrorDetectionStrategy.IsTransient(ex) || !shouldRetry(num++, ex, out zero))
{
throw;
}
}
if (zero.TotalMilliseconds < 0.0)
{
zero = TimeSpan.Zero;
}
this.OnRetrying(num, ex, zero);
if (num > 1 || !this.RetryStrategy.FastFirstRetry)
{
Task.Delay(zero).Wait();
}
}
return result;
}
/// <summary>
/// Repetitively executes the specified asynchronous task while it satisfies the current retry policy.
/// </summary>
/// <param name="taskAction">A function that returns a started task (also known as "hot" task).</param>
/// <returns>
/// A task that will run to completion if the original task completes successfully (either the
/// first time or after retrying transient failures). If the task fails with a non-transient error or
/// the retry limit is reached, the returned task will transition to a faulted state and the exception must be observed.
/// </returns>
public Task ExecuteAsync(Func<Task> taskAction)
{
return this.ExecuteAsync(taskAction, default(CancellationToken));
}
/// <summary>
/// Repetitively executes the specified asynchronous task while it satisfies the current retry policy.
/// </summary>
/// <param name="taskAction">A function that returns a started task (also known as "hot" task).</param>
/// <param name="cancellationToken">The token used to cancel the retry operation. This token does not cancel the execution of the asynchronous task.</param>
/// <returns>
/// Returns a task that will run to completion if the original task completes successfully (either the
/// first time or after retrying transient failures). If the task fails with a non-transient error or
/// the retry limit is reached, the returned task will transition to a faulted state and the exception must be observed.
/// </returns>
public Task ExecuteAsync(Func<Task> taskAction, CancellationToken cancellationToken)
{
if (taskAction == null)
{
throw new ArgumentNullException(nameof(taskAction));
}
return new AsyncExecution(taskAction, this.RetryStrategy.GetShouldRetry(), new Func<Exception, bool>(this.ErrorDetectionStrategy.IsTransient), new Action<int, Exception, TimeSpan>(this.OnRetrying), this.RetryStrategy.FastFirstRetry, cancellationToken).ExecuteAsync();
}
/// <summary>
/// Repeatedly executes the specified asynchronous task while it satisfies the current retry policy.
/// </summary>
/// <param name="taskFunc">A function that returns a started task (also known as "hot" task).</param>
/// <returns>
/// Returns a task that will run to completion if the original task completes successfully (either the
/// first time or after retrying transient failures). If the task fails with a non-transient error or
/// the retry limit is reached, the returned task will transition to a faulted state and the exception must be observed.
/// </returns>
public Task<TResult> ExecuteAsync<TResult>(Func<Task<TResult>> taskFunc)
{
return this.ExecuteAsync(taskFunc, default(CancellationToken));
}
/// <summary>
/// Repeatedly executes the specified asynchronous task while it satisfies the current retry policy.
/// </summary>
/// <param name="taskFunc">A function that returns a started task (also known as "hot" task).</param>
/// <param name="cancellationToken">The token used to cancel the retry operation. This token does not cancel the execution of the asynchronous task.</param>
/// <returns>
/// Returns a task that will run to completion if the original task completes successfully (either the
/// first time or after retrying transient failures). If the task fails with a non-transient error or
/// the retry limit is reached, the returned task will transition to a faulted state and the exception must be observed.
/// </returns>
public Task<TResult> ExecuteAsync<TResult>(Func<Task<TResult>> taskFunc, CancellationToken cancellationToken)
{
if (taskFunc == null)
{
throw new ArgumentNullException(nameof(taskFunc));
}
return new AsyncExecution<TResult>(taskFunc, this.RetryStrategy.GetShouldRetry(), new Func<Exception, bool>(this.ErrorDetectionStrategy.IsTransient), new Action<int, Exception, TimeSpan>(this.OnRetrying), this.RetryStrategy.FastFirstRetry, cancellationToken).ExecuteAsync();
}
/// <summary>
/// Notifies the subscribers whenever a retry condition is encountered.
/// </summary>
/// <param name="retryCount">The current retry attempt count.</param>
/// <param name="lastError">The exception that caused the retry conditions to occur.</param>
/// <param name="delay">The delay that indicates how long the current thread will be suspended before the next iteration is invoked.</param>
protected virtual void OnRetrying(int retryCount, Exception lastError, TimeSpan delay)
{
this.Retrying?.Invoke(this, new RetryingEventArgs(retryCount, lastError));
}
}
}

Просмотреть файл

@ -0,0 +1,111 @@
//Copyright(c) Microsoft.All rights reserved.
//Microsoft would like to thank its contributors, a list
//of whom are at http://aka.ms/entlib-contributors
//Licensed under the Apache License, Version 2.0 (the "License"); you
//may not use this file except in compliance with the License. You may
//obtain a copy of the License at
//http://www.apache.org/licenses/LICENSE-2.0
//Unless required by applicable law or agreed to in writing, software
//distributed under the License is distributed on an "AS IS" BASIS,
//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
//implied. See the License for the specific language governing permissions
//and limitations under the License.
namespace Microsoft.Azure.WebJobs.Extensions.EdgeHub
{
using System;
/// <summary>
/// Represents a retry strategy that determines the number of retry attempts and the interval between retries.
/// </summary>
abstract class RetryStrategy
{
/// <summary>
/// Represents the default number of retry attempts.
/// </summary>
public static readonly int DefaultClientRetryCount = 10;
/// <summary>
/// Represents the default amount of time used when calculating a random delta in the exponential delay between retries.
/// </summary>
public static readonly TimeSpan DefaultClientBackoff = TimeSpan.FromSeconds(10.0);
/// <summary>
/// Represents the default maximum amount of time used when calculating the exponential delay between retries.
/// </summary>
public static readonly TimeSpan DefaultMaxBackoff = TimeSpan.FromSeconds(30.0);
/// <summary>
/// Represents the default minimum amount of time used when calculating the exponential delay between retries.
/// </summary>
public static readonly TimeSpan DefaultMinBackoff = TimeSpan.FromSeconds(1.0);
/// <summary>
/// Represents the default interval between retries.
/// </summary>
public static readonly TimeSpan DefaultRetryInterval = TimeSpan.FromSeconds(1.0);
/// <summary>
/// Represents the default time increment between retry attempts in the progressive delay policy.
/// </summary>
public static readonly TimeSpan DefaultRetryIncrement = TimeSpan.FromSeconds(1.0);
/// <summary>
/// Represents the default flag indicating whether the first retry attempt will be made immediately,
/// whereas subsequent retries will remain subject to the retry interval.
/// </summary>
public static readonly bool DefaultFirstFastRetry = true;
/// <summary>
/// Returns a default policy that performs no retries, but invokes the action only once.
/// </summary>
public static RetryStrategy NoRetry { get; } = new FixedInterval(0, DefaultRetryInterval);
/// <summary>
/// Returns a default policy that implements a fixed retry interval configured with the <see cref="F:Microsoft.Azure.WebJobs.Extensions.EdgeHub.RetryStrategy.DefaultClientRetryCount" /> and <see cref="F:Microsoft.Azure.WebJobs.Extensions.EdgeHub.RetryStrategy.DefaultRetryInterval" /> parameters.
/// The default retry policy treats all caught exceptions as transient errors.
/// </summary>
public static RetryStrategy DefaultFixed { get; } = new FixedInterval(DefaultClientRetryCount, DefaultRetryInterval);
/// <summary>
/// Returns a default policy that implements a progressive retry interval configured with the <see cref="F:Microsoft.Azure.WebJobs.Extensions.EdgeHub.RetryStrategy.DefaultClientRetryCount" />, <see cref="F:Microsoft.Azure.WebJobs.Extensions.EdgeHub.RetryStrategy.DefaultRetryInterval" />, and <see cref="F:Microsoft.Azure.WebJobs.Extensions.EdgeHub.RetryStrategy.DefaultRetryIncrement" /> parameters.
/// The default retry policy treats all caught exceptions as transient errors.
/// </summary>
public static RetryStrategy DefaultProgressive { get; } = new Incremental(DefaultClientRetryCount, DefaultRetryInterval, DefaultRetryIncrement);
/// <summary>
/// Returns a default policy that implements a random exponential retry interval configured with the <see cref="F:Microsoft.Azure.WebJobs.Extensions.EdgeHub.RetryStrategy.DefaultClientRetryCount" />, <see cref="F:Microsoft.Azure.WebJobs.Extensions.EdgeHub.RetryStrategy.DefaultMinBackoff" />, <see cref="F:Microsoft.Azure.WebJobs.Extensions.EdgeHub.RetryStrategy.DefaultMaxBackoff" />, and <see cref="F:Microsoft.Azure.WebJobs.Extensions.EdgeHub.RetryStrategy.DefaultClientBackoff" /> parameters.
/// The default retry policy treats all caught exceptions as transient errors.
/// </summary>
public static RetryStrategy DefaultExponential { get; } = new ExponentialBackoff(DefaultClientRetryCount, DefaultMinBackoff, DefaultMaxBackoff, DefaultClientBackoff);
/// <summary>
/// Gets or sets a value indicating whether the first retry attempt will be made immediately,
/// whereas subsequent retries will remain subject to the retry interval.
/// </summary>
public bool FastFirstRetry
{
get;
set;
}
/// <summary>
/// Initializes a new instance of the <see cref="T:Microsoft.Azure.WebJobs.Extensions.EdgeHub.RetryStrategy" /> class.
/// </summary>
/// <param name="firstFastRetry">true to immediately retry in the first attempt; otherwise, false. The subsequent retries will remain subject to the configured retry interval.</param>
protected RetryStrategy(bool firstFastRetry)
{
this.FastFirstRetry = firstFastRetry;
}
/// <summary>
/// Returns the corresponding ShouldRetry delegate.
/// </summary>
/// <returns>The ShouldRetry delegate.</returns>
public abstract ShouldRetry GetShouldRetry();
}
}

Просмотреть файл

@ -0,0 +1,56 @@
//Copyright(c) Microsoft.All rights reserved.
//Microsoft would like to thank its contributors, a list
//of whom are at http://aka.ms/entlib-contributors
//Licensed under the Apache License, Version 2.0 (the "License"); you
//may not use this file except in compliance with the License. You may
//obtain a copy of the License at
//http://www.apache.org/licenses/LICENSE-2.0
//Unless required by applicable law or agreed to in writing, software
//distributed under the License is distributed on an "AS IS" BASIS,
//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
//implied. See the License for the specific language governing permissions
//and limitations under the License.
using System;
namespace Microsoft.Azure.WebJobs.Extensions.EdgeHub
{
/// <summary>
/// Contains information that is required for the <see cref="E:Microsoft.Azure.WebJobs.Extensions.EdgeHub.RetryPolicy.Retrying" /> event.
/// </summary>
class RetryingEventArgs : EventArgs
{
/// <summary>
/// Gets the current retry count.
/// </summary>
public int CurrentRetryCount
{
get;
set;
}
/// <summary>
/// Gets the exception that caused the retry conditions to occur.
/// </summary>
public Exception LastException
{
get;
set;
}
/// <summary>
/// Initializes a new instance of the <see cref="T:Microsoft.Azure.WebJobs.Extensions.EdgeHub.RetryingEventArgs" /> class.
/// </summary>
/// <param name="currentRetryCount">The current retry attempt count.</param>
/// <param name="lastException">The exception that caused the retry conditions to occur.</param>
public RetryingEventArgs(int currentRetryCount, Exception lastException)
{
Guard.ArgumentNotNull(lastException, "lastException");
this.CurrentRetryCount = currentRetryCount;
this.LastException = lastException;
}
}
}

Просмотреть файл

@ -0,0 +1,29 @@
//Copyright(c) Microsoft.All rights reserved.
//Microsoft would like to thank its contributors, a list
//of whom are at http://aka.ms/entlib-contributors
//Licensed under the Apache License, Version 2.0 (the "License"); you
//may not use this file except in compliance with the License. You may
//obtain a copy of the License at
//http://www.apache.org/licenses/LICENSE-2.0
//Unless required by applicable law or agreed to in writing, software
//distributed under the License is distributed on an "AS IS" BASIS,
//WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
//implied. See the License for the specific language governing permissions
//and limitations under the License.
using System;
namespace Microsoft.Azure.WebJobs.Extensions.EdgeHub
{
/// <summary>
/// Defines a callback delegate that will be invoked whenever a retry condition is encountered.
/// </summary>
/// <param name="retryCount">The current retry attempt count.</param>
/// <param name="lastException">The exception that caused the retry conditions to occur.</param>
/// <param name="delay">The delay that indicates how long the current thread will be suspended before the next iteration is invoked.</param>
/// <returns><see langword="true" /> if a retry is allowed; otherwise, <see langword="false" />.</returns>
delegate bool ShouldRetry(int retryCount, Exception lastException, out TimeSpan delay);
}

Просмотреть файл

@ -1,53 +0,0 @@
#r "Microsoft.Azure.Devices.Client"
#r "Newtonsoft.Json"
using System.IO;
using Microsoft.Azure.Devices.Client;
using Newtonsoft.Json;
// Filter messages based on the temperature value in the body of the message and the temperature threshold value.
public static async Task Run(Message messageReceived, IAsyncCollector<Message> output, TraceWriter log)
{
const int temperatureThreshold = 19;
byte[] messageBytes = messageReceived.GetBytes();
var messageString = System.Text.Encoding.UTF8.GetString(messageBytes);
if (!string.IsNullOrEmpty(messageString))
{
// Get the body of the message and deserialize it.
var messageBody = JsonConvert.DeserializeObject<MessageBody>(messageString);
if (messageBody != null && messageBody.machine.temperature > temperatureThreshold)
{
// Send the message to the output as the temperature value is greater than the threashold.
var filteredMessage = new Message(messageBytes);
// Copy the properties of the original message into the new Message object.
foreach (KeyValuePair<string, string> prop in messageReceived.Properties)
{
filteredMessage.Properties.Add(prop.Key, prop.Value); }
// Add a new property to the message to indicate it is an alert.
filteredMessage.Properties.Add("MessageType", "Alert");
// Send the message.
await output.AddAsync(filteredMessage);
log.Info("Received and transferred a message with temperature above the threshold");
}
}
}
//Define the expected schema for the body of incoming messages.
class MessageBody
{
public Machine machine {get; set;}
public Ambient ambient {get; set;}
public string timeCreated {get; set;}
}
class Machine
{
public double temperature {get; set;}
public double pressure {get; set;}
}
class Ambient
{
public double temperature {get; set;}
public int humidity {get; set;}
}

Просмотреть файл

@ -0,0 +1,64 @@
// Copyright (c) Microsoft. All rights reserved.
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.Devices.Client;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.EdgeHub;
using Newtonsoft.Json;
namespace Functions.Samples
{
public static class EdgeHubSamples
{
[FunctionName("EdgeHubTrigger-CSharp")]
public static async Task FilterMessageAndSendMessage(
[EdgeHubTrigger("input1")] Message messageReceived,
[EdgeHub(OutputName = "output1")] IAsyncCollector<Message> output)
{
const int defaultTemperatureThreshold = 19;
byte[] messageBytes = messageReceived.GetBytes();
var messageString = System.Text.Encoding.UTF8.GetString(messageBytes);
// Get message body, containing the Temperature data
var messageBody = JsonConvert.DeserializeObject<MessageBody>(messageString);
if (messageBody != null && messageBody.Machine.Temperature > defaultTemperatureThreshold)
{
var filteredMessage = new Message(messageBytes);
foreach (KeyValuePair<string, string> prop in messageReceived.Properties)
{
filteredMessage.Properties.Add(prop.Key, prop.Value);
}
filteredMessage.Properties.Add("MessageType", "Alert");
await output.AddAsync(filteredMessage).ConfigureAwait(false);
}
}
public class MessageBody
{
public Machine Machine { get; set; }
public Ambient Ambient { get; set; }
public DateTime TimeCreated { get; set; }
}
public class Machine
{
public double Temperature { get; set; }
public double Pressure { get; set; }
}
public class Ambient
{
public double Temperature { get; set; }
public int Humidity { get; set; }
}
}
}

Просмотреть файл

@ -0,0 +1,16 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<AzureFunctionsVersion></AzureFunctionsVersion>
<OutputType>Library</OutputType>
<ApplicationIcon />
<StartupObject />
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Azure.WebJobs.Extensions" Version="3.0.0-beta7-10655" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="2.1.1" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\binding\src\Microsoft.Azure.WebJobs.Extensions.EdgeHub\Microsoft.Azure.WebJobs.Extensions.EdgeHub.csproj" />
</ItemGroup>
</Project>

Просмотреть файл

@ -1,4 +1,5 @@
{
"configurationSource": "attributes",
"bindings": [
{
"type": "edgeHubTrigger",
@ -13,5 +14,8 @@
"batchSize": 10,
"direction": "inout"
}
]
],
"disabled": true,
"scriptFile": "bin/EdgeHubTriggerCSharp.dll",
"entryPoint": "Functions.Samples.EdgeHubSamples.FilterMessageAndSendMessage"
}

Просмотреть файл

@ -1,9 +1,9 @@
{
"extensions" : [
"extensions": [
{
"Name": "EdgeHubTrigger",
"TypeName": "Microsoft.Azure.Devices.Edge.Functions.Binding.Config.EdgeHubExtensionConfigProvider, Microsoft.Azure.Devices.Edge.Functions.BindingExtension, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null",
"HintPath": "/app/bindings/Microsoft.Azure.Devices.Edge.Functions.BindingExtension.dll"
"TypeName": "Microsoft.Azure.WebJobs.Extensions.EdgeHub.EdgeHubWebJobsStartup, Microsoft.Azure.WebJobs.Extensions.EdgeHub, Version=1.0.0.0, Culture=neutral, PublicKeyToken=null",
"HintPath": "/app/EdgeHubTrigger-Csharp/bin/Microsoft.Azure.WebJobs.Extensions.EdgeHub.dll"
}
]
}

Просмотреть файл

@ -1,6 +1,7 @@
ARG buildno
FROM edgebuilds.azurecr.io/microsoft/azureiotedge-functions-binding:${buildno}-linux-amd64
FROM mcr.microsoft.com/azure-functions/dotnet:2.0
ENV AzureWebJobsScriptRoot=/app
COPY . /app

Просмотреть файл

@ -1,4 +1,8 @@
{
"version": "2.0",
"logger": {
"fileLoggingMode": "always"
},
"watchDirectories": [ "Shared" ],
"functionTimeout": "00:05:00",
"functions": [ "EdgeHubTrigger-Csharp" ]

Просмотреть файл

@ -4,5 +4,6 @@
<add key="nuget.org" value="https://api.nuget.org/v3/index.json" />
<add key="AzureAppService" value="https://www.myget.org/F/azure-appservice/api/v2" />
<add key="MyGetDeviceSdk" value="https://www.myget.org/F/aziot-device-sdk/api/v3/index.json" />
<add key="MyGetEdgeExtension" value="https://www.myget.org/F/aziot-edge-extension/api/v3/index.json" />
</packageSources>
</configuration>

Просмотреть файл

@ -182,6 +182,13 @@ publish_quickstart()
.
}
publish_functions_sample()
{
echo "Publishing Functions sample"
publish_files $FUNCTIONS_SAMPLE_DIR $PUBLISH_FOLDER
publish_project library "EdgeHubTriggerCSharp" netstandard2.0 $CONFIGURATION "$PUBLISH_FOLDER/samples/EdgeHubTrigger-Csharp/bin" $MSBUILD_OPTIONS
}
process_args "$@"
@ -196,14 +203,13 @@ publish_app "TemperatureFilter"
publish_app "load-gen"
publish_app "MessagesAnalyzer"
publish_lib "Microsoft.Azure.Devices.Edge.Functions.Binding"
publish_lib "Microsoft.Azure.WebJobs.Extensions.EdgeHub"
publish_files $SRC_DOCKER_DIR $PUBLISH_FOLDER
publish_files $SRC_SCRIPTS_DIR $PUBLISH_FOLDER
publish_files $SRC_BIN_DIR $PUBLISH_FOLDER
publish_files $SRC_STRESS_DIR $PUBLISH_FOLDER
publish_files $SRC_E2E_TEMPLATES_DIR $PUBLISH_FOLDER
publish_files $FUNCTIONS_SAMPLE_DIR $PUBLISH_FOLDER
if [ $PUBLISH_TESTS -eq 1 ]; then
while read proj; do
@ -216,5 +222,6 @@ fi
publish_quickstart linux-arm
publish_quickstart linux-x64
publish_functions_sample
exit $RES

Просмотреть файл

@ -39,7 +39,6 @@ $Images = @{
"agent" = @("Microsoft.Azure.Devices.Edge.Agent.Service", $Agent)
"hub" = @("Microsoft.Azure.Devices.Edge.Hub.Service", $Hub)
"simulated-temperature-sensor" = @("SimulatedTemperatureSensor", $SimulatedTemperatureSensor)
"functions-binding" = @("Microsoft.Azure.Devices.Edge.Functions.Binding", $FunctionsBinding)
}
foreach ($Image in $Images.GetEnumerator()) {