issue#1: Access to the ContainerHost link creation

This commit is contained in:
xinchen 2015-08-14 16:51:09 -07:00
Родитель 86cd93eb93
Коммит d3324b946e
22 изменённых файлов: 920 добавлений и 121 удалений

Просмотреть файл

@ -0,0 +1,62 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<ProjectGuid>{E8D670D3-B1B4-45CA-9440-9630529599B2}</ProjectGuid>
<OutputType>Exe</OutputType>
<AppDesignerFolder>Properties</AppDesignerFolder>
<RootNamespace>Listener.ContainerHost</RootNamespace>
<AssemblyName>Listener.ContainerHost</AssemblyName>
<TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment>
<IntermediateOutputPath>..\..\..\obj\$(Configuration)\$(MSBuildProjectName)\</IntermediateOutputPath>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<PlatformTarget>AnyCPU</PlatformTarget>
<DebugSymbols>true</DebugSymbols>
<DebugType>full</DebugType>
<Optimize>false</Optimize>
<OutputPath>..\..\..\bin\$(Configuration)\$(MSBuildProjectName)\</OutputPath>
<DefineConstants>DEBUG;TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
<PlatformTarget>AnyCPU</PlatformTarget>
<DebugType>pdbonly</DebugType>
<Optimize>true</Optimize>
<OutputPath>..\..\..\bin\$(Configuration)\$(MSBuildProjectName)\</OutputPath>
<DefineConstants>TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<ItemGroup>
<Reference Include="System" />
<Reference Include="System.Core" />
<Reference Include="System.Xml.Linq" />
<Reference Include="System.Data.DataSetExtensions" />
<Reference Include="Microsoft.CSharp" />
<Reference Include="System.Data" />
<Reference Include="System.Xml" />
</ItemGroup>
<ItemGroup>
<Compile Include="Program.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\src\Amqp.Net.csproj">
<Project>{92153715-1d99-43b1-b291-470cf91a156d}</Project>
<Name>Amqp.Net</Name>
</ProjectReference>
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.
<Target Name="BeforeBuild">
</Target>
<Target Name="AfterBuild">
</Target>
-->
</Project>

Просмотреть файл

@ -0,0 +1,122 @@
// ------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation
// All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the ""License""); you may not use this
// file except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR
// CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR
// NON-INFRINGEMENT.
//
// See the Apache Version 2.0 License for specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------------------
namespace Listener.ContainerHost
{
using Amqp;
using Amqp.Framing;
using Amqp.Listener;
using Amqp.Types;
using System;
using System.Threading.Tasks;
class Program
{
static void Main(string[] args)
{
string address = "amqp://guest:guest@127.0.0.1:5672";
if (args.Length > 0)
{
address = args[0];
}
// uncomment the following to write frame traces
//Trace.TraceLevel = TraceLevel.Frame;
//Trace.TraceListener = (f, a) => Console.WriteLine(DateTime.Now.ToString("[hh:ss.fff]") + " " + string.Format(f, a));
Uri addressUri = new Uri(address);
ContainerHost host = new ContainerHost(new Uri[] { addressUri }, null, addressUri.UserInfo);
host.Open();
Console.WriteLine("Container host is listening on {0}:{1}", addressUri.Host, addressUri.Port);
host.RegisterLinkProcessor(new LinkProcessor());
Console.WriteLine("Link processor is registered");
Console.WriteLine("Press enter key to exist...");
Console.ReadLine();
host.Close();
}
class LinkProcessor : ILinkProcessor
{
public void Process(AttachContext attachContext)
{
// start a task to process this request
var task = this.ProcessAsync(attachContext);
}
async Task ProcessAsync(AttachContext attachContext)
{
// simulating an async operation required to complete the task
await Task.Delay(100);
if (attachContext.Attach.LinkName == "")
{
// how to fail the attach request
attachContext.Complete(new Error() { Condition = ErrorCode.InvalidField, Description = "Empty link name not allowed." });
}
else if (attachContext.Link.Role)
{
attachContext.Complete(new IncomingLinkEndpoint(), 300);
}
else
{
attachContext.Complete(new OutgoingLinkEndpoint(), 0);
}
}
}
class IncomingLinkEndpoint : LinkEndpoint
{
public override void OnMessage(MessageContext messageContext)
{
// this can also be done when an async operation, if required, is done
messageContext.Complete();
}
public override void OnFlow(FlowContext flowContext)
{
}
public override void OnDisposition(DispositionContext dispositionContext)
{
}
}
class OutgoingLinkEndpoint : LinkEndpoint
{
public override void OnFlow(FlowContext flowContext)
{
for (int i = 0; i < flowContext.Messages; i++)
{
var message = new Message("Hello!");
message.Properties = new Properties() { Subject = "Welcome Message" };
flowContext.Link.SendMessage(message, null);
}
}
public override void OnDisposition(DispositionContext dispositionContext)
{
if (!dispositionContext.Settled)
{
dispositionContext.Link.DisposeMessage(dispositionContext.Message, new Accepted(), true);
}
}
}
}
}

Просмотреть файл

@ -0,0 +1,50 @@
// ------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation
// All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the ""License""); you may not use this
// file except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR
// CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR
// NON-INFRINGEMENT.
//
// See the Apache Version 2.0 License for specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------------------
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyTitle("Listener.ContainerHost")]
[assembly: AssemblyDescription("")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("Listener.ContainerHost")]
[assembly: AssemblyCopyright("Copyright © 2015")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]
// Setting ComVisible to false makes the types in this assembly not visible
// to COM components. If you need to access a type in this assembly from
// COM, set the ComVisible attribute to true on that type.
[assembly: ComVisible(false)]
// Version information for an assembly consists of the following four values:
//
// Major Version
// Minor Version
// Build Number
// Revision
//
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("1.0.0.0")]
[assembly: AssemblyFileVersion("1.0.0.0")]

Просмотреть файл

@ -0,0 +1,59 @@
<?xml version="1.0" encoding="utf-8"?>
<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<Import Project="$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props" Condition="Exists('$(MSBuildExtensionsPath)\$(MSBuildToolsVersion)\Microsoft.Common.props')" />
<PropertyGroup>
<Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
<Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
<ProjectGuid>{4B7BFDB9-406A-4EBE-93C3-5D40D6A986F4}</ProjectGuid>
<OutputType>Library</OutputType>
<AppDesignerFolder>Properties</AppDesignerFolder>
<RootNamespace>Listener.IContainer</RootNamespace>
<AssemblyName>Listener.IContainer</AssemblyName>
<TargetFrameworkVersion>v4.5</TargetFrameworkVersion>
<FileAlignment>512</FileAlignment>
<IntermediateOutputPath>..\..\..\obj\$(Configuration)\$(MSBuildProjectName)\</IntermediateOutputPath>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
<PlatformTarget>AnyCPU</PlatformTarget>
<DebugSymbols>true</DebugSymbols>
<DebugType>full</DebugType>
<Optimize>false</Optimize>
<OutputPath>..\..\..\bin\$(Configuration)\$(MSBuildProjectName)\</OutputPath>
<DefineConstants>DEBUG;TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
<PlatformTarget>AnyCPU</PlatformTarget>
<DebugType>pdbonly</DebugType>
<Optimize>true</Optimize>
<OutputPath>..\..\..\bin\$(Configuration)\$(MSBuildProjectName)\</OutputPath>
<DefineConstants>TRACE</DefineConstants>
<ErrorReport>prompt</ErrorReport>
<WarningLevel>4</WarningLevel>
</PropertyGroup>
<PropertyGroup>
<StartupObject />
</PropertyGroup>
<ItemGroup>
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="TestAmqpBroker.cs" />
</ItemGroup>
<ItemGroup>
<Reference Include="System" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\src\Amqp.Net.csproj">
<Project>{92153715-1d99-43b1-b291-470cf91a156d}</Project>
<Name>Amqp.Net</Name>
</ProjectReference>
</ItemGroup>
<Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
<!-- To modify your build process, add your task inside one of the targets below and uncomment it.
Other similar extension points exist, see Microsoft.Common.targets.
<Target Name="BeforeBuild">
</Target>
<Target Name="AfterBuild">
</Target>
-->
</Project>

Просмотреть файл

@ -0,0 +1,50 @@
// ------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation
// All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the ""License""); you may not use this
// file except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR
// CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR
// NON-INFRINGEMENT.
//
// See the Apache Version 2.0 License for specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------------------
using System.Reflection;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
// General Information about an assembly is controlled through the following
// set of attributes. Change these attribute values to modify the information
// associated with an assembly.
[assembly: AssemblyTitle("Listener.IContainer")]
[assembly: AssemblyDescription("")]
[assembly: AssemblyConfiguration("")]
[assembly: AssemblyCompany("")]
[assembly: AssemblyProduct("Listener.IContainer")]
[assembly: AssemblyCopyright("Copyright © 2014")]
[assembly: AssemblyTrademark("")]
[assembly: AssemblyCulture("")]
// Setting ComVisible to false makes the types in this assembly not visible
// to COM components. If you need to access a type in this assembly from
// COM, set the ComVisible attribute to true on that type.
[assembly: ComVisible(false)]
// Version information for an assembly consists of the following four values:
//
// Major Version
// Minor Version
// Build Number
// Revision
//
// You can specify all the values or you can default the Build and Revision Numbers
// by using the '*' as shown below:
// [assembly: AssemblyVersion("1.0.*")]
[assembly: AssemblyVersion("1.0.0.0")]
[assembly: AssemblyFileVersion("1.0.0.0")]

Просмотреть файл

@ -15,7 +15,7 @@
// limitations under the License. // limitations under the License.
// ------------------------------------------------------------------------------------ // ------------------------------------------------------------------------------------
namespace TestAmqpBroker namespace Listener.IContainer
{ {
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;

Просмотреть файл

@ -36,8 +36,6 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Interop.Drain", "Examples\I
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Interop.Spout", "Examples\Interop\Interop.Spout\Interop.Spout.csproj", "{68470436-2F0D-4A76-B1E6-9C8517844C3E}" Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Interop.Spout", "Examples\Interop\Interop.Spout\Interop.Spout.csproj", "{68470436-2F0D-4A76-B1E6-9C8517844C3E}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TestAmqpBroker", "test\TestAmqpBroker\TestAmqpBroker.csproj", "{78A9C0DF-1999-4D97-AE22-50ADC44F9A7B}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Interop.Server", "Examples\Interop\Interop.Server\Interop.Server.csproj", "{09975981-F2D3-43AE-B70D-F4DEE7B0B0D2}" Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Interop.Server", "Examples\Interop\Interop.Server\Interop.Server.csproj", "{09975981-F2D3-43AE-B70D-F4DEE7B0B0D2}"
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Interop.Client", "Examples\Interop\Interop.Client\Interop.Client.csproj", "{C272B10E-36BE-4F36-A128-331F715514E6}" Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Interop.Client", "Examples\Interop\Interop.Client\Interop.Client.csproj", "{C272B10E-36BE-4F36-A128-331F715514E6}"
@ -72,6 +70,14 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PeerToPeer.Certificate", "E
EndProject EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ServiceBus.MessageSession", "Examples\ServiceBus\ServiceBus.MessageSession\ServiceBus.MessageSession.csproj", "{D1E69C74-4A33-4079-9665-8818A8479FFB}" Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ServiceBus.MessageSession", "Examples\ServiceBus\ServiceBus.MessageSession\ServiceBus.MessageSession.csproj", "{D1E69C74-4A33-4079-9665-8818A8479FFB}"
EndProject EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Listener", "Listener", "{89F13102-4826-4406-844B-7B21C47885E7}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Listener.ContainerHost", "Examples\Listener\Listener.ContainerHost\Listener.ContainerHost.csproj", "{E8D670D3-B1B4-45CA-9440-9630529599B2}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Listener.IContainer", "Examples\Listener\Listener.IContainer\Listener.IContainer.csproj", "{4B7BFDB9-406A-4EBE-93C3-5D40D6A986F4}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TestAmqpBroker", "test\TestAmqpBroker\TestAmqpBroker.csproj", "{78A9C0DF-1999-4D97-AE22-50ADC44F9A7B}"
EndProject
Global Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU Debug|Any CPU = Debug|Any CPU
@ -220,16 +226,6 @@ Global
{68470436-2F0D-4A76-B1E6-9C8517844C3E}.Release|ARM.ActiveCfg = Release|Any CPU {68470436-2F0D-4A76-B1E6-9C8517844C3E}.Release|ARM.ActiveCfg = Release|Any CPU
{68470436-2F0D-4A76-B1E6-9C8517844C3E}.Release|x64.ActiveCfg = Release|Any CPU {68470436-2F0D-4A76-B1E6-9C8517844C3E}.Release|x64.ActiveCfg = Release|Any CPU
{68470436-2F0D-4A76-B1E6-9C8517844C3E}.Release|x86.ActiveCfg = Release|Any CPU {68470436-2F0D-4A76-B1E6-9C8517844C3E}.Release|x86.ActiveCfg = Release|Any CPU
{78A9C0DF-1999-4D97-AE22-50ADC44F9A7B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{78A9C0DF-1999-4D97-AE22-50ADC44F9A7B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{78A9C0DF-1999-4D97-AE22-50ADC44F9A7B}.Debug|ARM.ActiveCfg = Debug|Any CPU
{78A9C0DF-1999-4D97-AE22-50ADC44F9A7B}.Debug|x64.ActiveCfg = Debug|Any CPU
{78A9C0DF-1999-4D97-AE22-50ADC44F9A7B}.Debug|x86.ActiveCfg = Debug|Any CPU
{78A9C0DF-1999-4D97-AE22-50ADC44F9A7B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{78A9C0DF-1999-4D97-AE22-50ADC44F9A7B}.Release|Any CPU.Build.0 = Release|Any CPU
{78A9C0DF-1999-4D97-AE22-50ADC44F9A7B}.Release|ARM.ActiveCfg = Release|Any CPU
{78A9C0DF-1999-4D97-AE22-50ADC44F9A7B}.Release|x64.ActiveCfg = Release|Any CPU
{78A9C0DF-1999-4D97-AE22-50ADC44F9A7B}.Release|x86.ActiveCfg = Release|Any CPU
{09975981-F2D3-43AE-B70D-F4DEE7B0B0D2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {09975981-F2D3-43AE-B70D-F4DEE7B0B0D2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{09975981-F2D3-43AE-B70D-F4DEE7B0B0D2}.Debug|Any CPU.Build.0 = Debug|Any CPU {09975981-F2D3-43AE-B70D-F4DEE7B0B0D2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{09975981-F2D3-43AE-B70D-F4DEE7B0B0D2}.Debug|ARM.ActiveCfg = Debug|Any CPU {09975981-F2D3-43AE-B70D-F4DEE7B0B0D2}.Debug|ARM.ActiveCfg = Debug|Any CPU
@ -392,6 +388,36 @@ Global
{D1E69C74-4A33-4079-9665-8818A8479FFB}.Release|ARM.ActiveCfg = Release|Any CPU {D1E69C74-4A33-4079-9665-8818A8479FFB}.Release|ARM.ActiveCfg = Release|Any CPU
{D1E69C74-4A33-4079-9665-8818A8479FFB}.Release|x64.ActiveCfg = Release|Any CPU {D1E69C74-4A33-4079-9665-8818A8479FFB}.Release|x64.ActiveCfg = Release|Any CPU
{D1E69C74-4A33-4079-9665-8818A8479FFB}.Release|x86.ActiveCfg = Release|Any CPU {D1E69C74-4A33-4079-9665-8818A8479FFB}.Release|x86.ActiveCfg = Release|Any CPU
{E8D670D3-B1B4-45CA-9440-9630529599B2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E8D670D3-B1B4-45CA-9440-9630529599B2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E8D670D3-B1B4-45CA-9440-9630529599B2}.Debug|ARM.ActiveCfg = Debug|Any CPU
{E8D670D3-B1B4-45CA-9440-9630529599B2}.Debug|x64.ActiveCfg = Debug|Any CPU
{E8D670D3-B1B4-45CA-9440-9630529599B2}.Debug|x86.ActiveCfg = Debug|Any CPU
{E8D670D3-B1B4-45CA-9440-9630529599B2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E8D670D3-B1B4-45CA-9440-9630529599B2}.Release|Any CPU.Build.0 = Release|Any CPU
{E8D670D3-B1B4-45CA-9440-9630529599B2}.Release|ARM.ActiveCfg = Release|Any CPU
{E8D670D3-B1B4-45CA-9440-9630529599B2}.Release|x64.ActiveCfg = Release|Any CPU
{E8D670D3-B1B4-45CA-9440-9630529599B2}.Release|x86.ActiveCfg = Release|Any CPU
{4B7BFDB9-406A-4EBE-93C3-5D40D6A986F4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{4B7BFDB9-406A-4EBE-93C3-5D40D6A986F4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4B7BFDB9-406A-4EBE-93C3-5D40D6A986F4}.Debug|ARM.ActiveCfg = Debug|Any CPU
{4B7BFDB9-406A-4EBE-93C3-5D40D6A986F4}.Debug|x64.ActiveCfg = Debug|Any CPU
{4B7BFDB9-406A-4EBE-93C3-5D40D6A986F4}.Debug|x86.ActiveCfg = Debug|Any CPU
{4B7BFDB9-406A-4EBE-93C3-5D40D6A986F4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4B7BFDB9-406A-4EBE-93C3-5D40D6A986F4}.Release|Any CPU.Build.0 = Release|Any CPU
{4B7BFDB9-406A-4EBE-93C3-5D40D6A986F4}.Release|ARM.ActiveCfg = Release|Any CPU
{4B7BFDB9-406A-4EBE-93C3-5D40D6A986F4}.Release|x64.ActiveCfg = Release|Any CPU
{4B7BFDB9-406A-4EBE-93C3-5D40D6A986F4}.Release|x86.ActiveCfg = Release|Any CPU
{78A9C0DF-1999-4D97-AE22-50ADC44F9A7B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{78A9C0DF-1999-4D97-AE22-50ADC44F9A7B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{78A9C0DF-1999-4D97-AE22-50ADC44F9A7B}.Debug|ARM.ActiveCfg = Debug|Any CPU
{78A9C0DF-1999-4D97-AE22-50ADC44F9A7B}.Debug|x64.ActiveCfg = Debug|Any CPU
{78A9C0DF-1999-4D97-AE22-50ADC44F9A7B}.Debug|x86.ActiveCfg = Debug|Any CPU
{78A9C0DF-1999-4D97-AE22-50ADC44F9A7B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{78A9C0DF-1999-4D97-AE22-50ADC44F9A7B}.Release|Any CPU.Build.0 = Release|Any CPU
{78A9C0DF-1999-4D97-AE22-50ADC44F9A7B}.Release|ARM.ActiveCfg = Release|Any CPU
{78A9C0DF-1999-4D97-AE22-50ADC44F9A7B}.Release|x64.ActiveCfg = Release|Any CPU
{78A9C0DF-1999-4D97-AE22-50ADC44F9A7B}.Release|x86.ActiveCfg = Release|Any CPU
EndGlobalSection EndGlobalSection
GlobalSection(SolutionProperties) = preSolution GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE HideSolutionNode = FALSE
@ -414,5 +440,8 @@ Global
{D6E83FC7-0D39-48AB-8DFA-FE89011CDD3C} = {BF554E12-7096-465F-B1D0-9ACFF00877F2} {D6E83FC7-0D39-48AB-8DFA-FE89011CDD3C} = {BF554E12-7096-465F-B1D0-9ACFF00877F2}
{E55CB1DD-2F8E-4052-A119-6F3FD9ED10BE} = {7FF80B2D-4158-41D3-BC87-60C018028F53} {E55CB1DD-2F8E-4052-A119-6F3FD9ED10BE} = {7FF80B2D-4158-41D3-BC87-60C018028F53}
{D1E69C74-4A33-4079-9665-8818A8479FFB} = {4A93A5B6-B935-4F86-B3DE-95B0F6EBEA56} {D1E69C74-4A33-4079-9665-8818A8479FFB} = {4A93A5B6-B935-4F86-B3DE-95B0F6EBEA56}
{89F13102-4826-4406-844B-7B21C47885E7} = {8F701234-EC60-485C-BCDB-8EE233246832}
{E8D670D3-B1B4-45CA-9440-9630529599B2} = {89F13102-4826-4406-844B-7B21C47885E7}
{4B7BFDB9-406A-4EBE-93C3-5D40D6A986F4} = {89F13102-4826-4406-844B-7B21C47885E7}
EndGlobalSection EndGlobalSection
EndGlobal EndGlobal

Просмотреть файл

@ -46,6 +46,15 @@
<Compile Include="ByteBuffer.cs" /> <Compile Include="ByteBuffer.cs" />
<Compile Include="Connection.cs" /> <Compile Include="Connection.cs" />
<Compile Include="Listener\ContainerHost.cs" /> <Compile Include="Listener\ContainerHost.cs" />
<Compile Include="Listener\AttachContext.cs" />
<Compile Include="Listener\FlowContext.cs" />
<Compile Include="Listener\ILinkProcessor.cs" />
<Compile Include="Listener\LinkEndpoint.cs" />
<Compile Include="Listener\DispositionContext.cs" />
<Compile Include="Listener\RequestContext.cs" />
<Compile Include="Listener\MessageContext.cs" />
<Compile Include="Listener\IRequestProcessor.cs" />
<Compile Include="Listener\IMessageProcessor.cs" />
<Compile Include="Listener\Context.cs" /> <Compile Include="Listener\Context.cs" />
<Compile Include="Listener\ListenerLink.cs" /> <Compile Include="Listener\ListenerLink.cs" />
<Compile Include="Net\TcpSettings.cs" /> <Compile Include="Net\TcpSettings.cs" />

Просмотреть файл

@ -0,0 +1,71 @@
// ------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation
// All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the ""License""); you may not use this
// file except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR
// CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR
// NON-INFRINGEMENT.
//
// See the Apache Version 2.0 License for specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------------------
namespace Amqp.Listener
{
using Amqp.Framing;
/// <summary>
/// Provides the context to an attach processor to process the received performative.
/// </summary>
public class AttachContext
{
internal AttachContext(ListenerLink link, Attach attach)
{
this.Link = link;
this.Attach = attach;
}
/// <summary>
/// Gets the link associated with the context.
/// </summary>
public ListenerLink Link
{
get;
private set;
}
/// <summary>
/// Gets the attach performative associated with the context.
/// </summary>
public Attach Attach
{
get;
private set;
}
/// <summary>
/// Completes the processing of the attach performative with success.
/// </summary>
/// <param name="linkEndpoint">The attached link endpoint.</param>
/// <param name="initialCredit">The initial credit to send to peer for a receiving link endpoint. It is ignored for a sending endpoint.</param>
public void Complete(LinkEndpoint linkEndpoint, int initialCredit)
{
this.Link.InitializeLinkEndpoint(linkEndpoint, (uint)initialCredit);
this.Link.CompleteAttach(this.Attach, null);
}
/// <summary>
/// Completes the processing of the attach performative with an error.
/// </summary>
/// <param name="error">The error to be sent to the remote peer.</param>
public void Complete(Error error)
{
this.Link.CompleteAttach(this.Attach, error);
}
}
}

Просмотреть файл

@ -35,6 +35,7 @@ namespace Amqp.Listener
readonly ConnectionListener[] listeners; readonly ConnectionListener[] listeners;
readonly Dictionary<string, MessageProcessor> messageProcessors; readonly Dictionary<string, MessageProcessor> messageProcessors;
readonly Dictionary<string, RequestProcessor> requestProcessors; readonly Dictionary<string, RequestProcessor> requestProcessors;
ILinkProcessor linkProcessor;
/// <summary> /// <summary>
/// Initializes a container host object with one address. /// Initializes a container host object with one address.
@ -100,6 +101,23 @@ namespace Amqp.Listener
} }
} }
/// <summary>
/// Registers a link process to handle received attach performatives. The registered message
/// processor(s) and request processor(s) are checked first. If a processor matches the address
/// on the received attach performative, it is invoked to handle the command. Otherwise, the
/// registered link processor is invoked to handle the command.
/// </summary>
/// <param name="linkProcessor"></param>
public void RegisterLinkProcessor(ILinkProcessor linkProcessor)
{
if (this.linkProcessor != null)
{
throw new AmqpException(ErrorCode.NotAllowed, this.linkProcessor.GetType().Name + " already registered");
}
this.linkProcessor = linkProcessor;
}
/// <summary> /// <summary>
/// Registers a message processor to accept incoming messages from the specified address. /// Registers a message processor to accept incoming messages from the specified address.
/// When it is called, the container creates a node where the client can attach. /// When it is called, the container creates a node where the client can attach.
@ -212,6 +230,12 @@ namespace Amqp.Listener
return true; return true;
} }
if (this.linkProcessor != null)
{
this.linkProcessor.Process(new AttachContext(listenerLink, attach));
return false;
}
throw new AmqpException(ErrorCode.NotFound, "No processor was found at " + address); throw new AmqpException(ErrorCode.NotFound, "No processor was found at " + address);
} }

Просмотреть файл

@ -19,35 +19,6 @@ namespace Amqp.Listener
{ {
using Amqp.Framing; using Amqp.Framing;
/// <summary>
/// Defines the property and methods of a message processor.
/// </summary>
public interface IMessageProcessor
{
/// <summary>
/// Gets the link credit to issue to the client.
/// </summary>
int Credit { get; }
/// <summary>
/// Processes a received message.
/// </summary>
/// <param name="messageContext">Context of the received message.</param>
void Process(MessageContext messageContext);
}
/// <summary>
/// Defines the methods of a request processor.
/// </summary>
public interface IRequestProcessor
{
/// <summary>
/// Processes a received request.
/// </summary>
/// <param name="requestContext">Context of the received request.</param>
void Process(RequestContext requestContext);
}
/// <summary> /// <summary>
/// The base class of request context. /// The base class of request context.
/// </summary> /// </summary>
@ -94,61 +65,4 @@ namespace Amqp.Listener
this.Link.DisposeMessage(this.Message, deliveryState, true); this.Link.DisposeMessage(this.Message, deliveryState, true);
} }
} }
/// <summary>
/// Provides the context to a message processor to process the received message.
/// </summary>
public class MessageContext : Context
{
internal MessageContext(ListenerLink link, Message message)
: base(link, message)
{
}
/// <summary>
/// Accepts the message.
/// </summary>
public void Complete()
{
this.Dispose(Context.Accepted);
}
/// <summary>
/// Rejects the message.
/// </summary>
/// <param name="error"></param>
public void Complete(Error error)
{
this.Dispose(new Rejected() { Error = error });
}
}
/// <summary>
/// Provides the context to a request processor to process the received request.
/// </summary>
public class RequestContext : Context
{
readonly ListenerLink responseLink;
internal RequestContext(ListenerLink requestLink, ListenerLink responseLink, Message request)
: base(requestLink, request)
{
this.responseLink = responseLink;
}
/// <summary>
/// Completes the request and sends the response message to the peer.
/// </summary>
/// <param name="response">The response message to send.</param>
public void Complete(Message response)
{
if (response.Properties == null)
{
response.Properties = new Properties();
}
response.Properties.CorrelationId = this.Message.Properties.MessageId;
this.responseLink.SendMessage(response, response.Encode());
}
}
} }

Просмотреть файл

@ -0,0 +1,42 @@
// ------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation
// All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the ""License""); you may not use this
// file except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR
// CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR
// NON-INFRINGEMENT.
//
// See the Apache Version 2.0 License for specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------------------
namespace Amqp.Listener
{
using Amqp.Framing;
/// <summary>
/// Provides the context to a link endpoint to process the received disposition.
/// </summary>
public class DispositionContext : MessageContext
{
internal DispositionContext(ListenerLink link, Message message, DeliveryState deliveryState, bool settled)
: base(link, message)
{
this.Settled = settled;
}
/// <summary>
/// Gets the settled state sent in the disposition.
/// </summary>
public bool Settled
{
get;
private set;
}
}
}

Просмотреть файл

@ -0,0 +1,61 @@
// ------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation
// All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the ""License""); you may not use this
// file except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR
// CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR
// NON-INFRINGEMENT.
//
// See the Apache Version 2.0 License for specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------------------
namespace Amqp.Listener
{
using Amqp.Types;
/// <summary>
/// Provides the context to a link endpoint to process a received flow.
/// </summary>
public class FlowContext
{
internal FlowContext(ListenerLink link, int messages, Fields properties)
{
this.Link = link;
this.Messages = messages;
this.Properties = properties;
}
/// <summary>
/// Gets the link associated with the context.
/// </summary>
public ListenerLink Link
{
get;
private set;
}
/// <summary>
/// Gets the number of messages allowed to send by the peer.
/// </summary>
public int Messages
{
get;
private set;
}
/// <summary>
/// Gets the properties associated with the flow performative.
/// </summary>
public Fields Properties
{
get;
private set;
}
}
}

Просмотреть файл

@ -0,0 +1,33 @@
// ------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation
// All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the ""License""); you may not use this
// file except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR
// CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR
// NON-INFRINGEMENT.
//
// See the Apache Version 2.0 License for specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------------------
namespace Amqp.Listener
{
/// <summary>
/// Defines the property and methods of a link processor.
/// </summary>
public interface ILinkProcessor
{
/// <summary>
/// Processes a received attach performative to open a link endpoint. The
/// implementation must call attachContext.Complete to either accept or
/// reject the attach performative.
/// </summary>
/// <param name="attachContext">Context of the received attach performative.</param>
void Process(AttachContext attachContext);
}
}

Просмотреть файл

@ -0,0 +1,36 @@
// ------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation
// All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the ""License""); you may not use this
// file except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR
// CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR
// NON-INFRINGEMENT.
//
// See the Apache Version 2.0 License for specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------------------
namespace Amqp.Listener
{
/// <summary>
/// Defines the property and methods of a message processor.
/// </summary>
public interface IMessageProcessor
{
/// <summary>
/// Gets the link credit to issue to the client.
/// </summary>
int Credit { get; }
/// <summary>
/// Processes a received message.
/// </summary>
/// <param name="messageContext">Context of the received message.</param>
void Process(MessageContext messageContext);
}
}

Просмотреть файл

@ -0,0 +1,31 @@
// ------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation
// All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the ""License""); you may not use this
// file except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR
// CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR
// NON-INFRINGEMENT.
//
// See the Apache Version 2.0 License for specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------------------
namespace Amqp.Listener
{
/// <summary>
/// Defines the methods of a request processor.
/// </summary>
public interface IRequestProcessor
{
/// <summary>
/// Processes a received request.
/// </summary>
/// <param name="requestContext">Context of the received request.</param>
void Process(RequestContext requestContext);
}
}

Просмотреть файл

@ -0,0 +1,51 @@
// ------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation
// All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the ""License""); you may not use this
// file except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR
// CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR
// NON-INFRINGEMENT.
//
// See the Apache Version 2.0 License for specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------------------
namespace Amqp.Listener
{
using System;
/// <summary>
/// The base class of an AMQP link endpoint.
/// </summary>
public abstract class LinkEndpoint
{
/// <summary>
/// Processes a received message. A receiving endpoint must implement this method to process messages.
/// </summary>
/// <param name="messageContext">Context of the received message.</param>
public virtual void OnMessage(MessageContext messageContext)
{
throw new NotImplementedException();
}
/// <summary>
/// Processes a received flow performative. A sending endpoint should send messages per the requested
/// message count. A receiving endpoint may receive a flow if the sender may want to request for credit
/// or send custom properties.
/// </summary>
/// <param name="flowContext">Context of the received flow performative.</param>
public abstract void OnFlow(FlowContext flowContext);
/// <summary>
/// Processes a received disposition performative. The endpoint should check the delivery state and
/// perform appropriate actions to the message.
/// </summary>
/// <param name="dispositionContext">Context of the received disposition performative.</param>
public abstract void OnDisposition(DispositionContext dispositionContext);
}
}

Просмотреть файл

@ -31,6 +31,12 @@ namespace Amqp.Listener
uint credit; uint credit;
object state; object state;
// caller can initialize the link for an endpoint, a sender or a receiver
// based on its needs.
// link endpoint
LinkEndpoint linkEndpoint;
// send // send
Action<int, object> onCredit; Action<int, object> onCredit;
Action<Message, DeliveryState, bool, object> onDispose; Action<Message, DeliveryState, bool, object> onDispose;
@ -80,11 +86,13 @@ namespace Amqp.Listener
/// <summary> /// <summary>
/// Initializes the receiver state for the link. /// Initializes the receiver state for the link.
/// </summary> /// </summary>
/// <param name="credit">The link credit.</param> /// <param name="credit">The link credit to send to the peer.</param>
/// <param name="onMessage">The callback to invoke for received messages.</param> /// <param name="onMessage">The callback to be invoked for received messages.</param>
/// <param name="state">The user state attached to the link.</param> /// <param name="state">The user state attached to the link.</param>
public void InitializeReceiver(uint credit, Action<ListenerLink, Message, DeliveryState, object> onMessage, object state) public void InitializeReceiver(uint credit, Action<ListenerLink, Message, DeliveryState, object> onMessage, object state)
{ {
ThrowIfNotNull(this.linkEndpoint, "endpoint");
ThrowIfNotNull(this.onMessage, "receiver");
this.credit = credit; this.credit = credit;
this.onMessage = onMessage; this.onMessage = onMessage;
this.state = state; this.state = state;
@ -93,11 +101,14 @@ namespace Amqp.Listener
/// <summary> /// <summary>
/// Initializes the sender state for the link. /// Initializes the sender state for the link.
/// </summary> /// </summary>
/// <param name="onCredit">The callback to invoke when flow is received with more link credit.</param> /// <param name="onCredit">The callback to be invoked when delivery limit changes (by received flow performatives).</param>
/// <param name="onDispose">The callback to invoke when disposition is received.</param> /// <param name="onDispose">The callback to be invoked when disposition is received.</param>
/// <param name="state">The user state attached to the link.</param> /// <param name="state">The user state attached to the link.</param>
public void InitializeSender(Action<int, object> onCredit, Action<Message, DeliveryState, bool, object> onDispose, object state) public void InitializeSender(Action<int, object> onCredit, Action<Message, DeliveryState, bool, object> onDispose, object state)
{ {
ThrowIfNotNull(this.linkEndpoint, "endpoint");
ThrowIfNotNull(this.onCredit, "sender");
ThrowIfNotNull(this.onDispose, "sender");
this.onCredit = onCredit; this.onCredit = onCredit;
this.onDispose = onDispose; this.onDispose = onDispose;
this.state = state; this.state = state;
@ -106,15 +117,15 @@ namespace Amqp.Listener
/// <summary> /// <summary>
/// Sends a message. This call is non-blocking and it does not wait for acknowledgements. /// Sends a message. This call is non-blocking and it does not wait for acknowledgements.
/// </summary> /// </summary>
/// <param name="message"></param> /// <param name="message">The message to be sent.</param>
/// <param name="buffer"></param> /// <param name="buffer">The serialized buffer of the message. It is null, the message is serialized.</param>
public void SendMessage(Message message, ByteBuffer buffer) public void SendMessage(Message message, ByteBuffer buffer)
{ {
Delivery delivery = new Delivery() Delivery delivery = new Delivery()
{ {
Handle = this.Handle, Handle = this.Handle,
Message = message, Message = message,
Buffer = buffer, Buffer = buffer ?? message.Encode(),
Link = this, Link = this,
Settled = this.SettleOnSend Settled = this.SettleOnSend
}; };
@ -126,7 +137,7 @@ namespace Amqp.Listener
/// <summary> /// <summary>
/// Sends a disposition for the message. /// Sends a disposition for the message.
/// </summary> /// </summary>
/// <param name="message">The message.</param> /// <param name="message">The message to be disposed (a disposition performative will be sent for this message).</param>
/// <param name="deliveryState">The delivery state to set on disposition.</param> /// <param name="deliveryState">The delivery state to set on disposition.</param>
/// <param name="settled">The settled flag on disposition.</param> /// <param name="settled">The settled flag on disposition.</param>
public void DisposeMessage(Message message, DeliveryState deliveryState, bool settled) public void DisposeMessage(Message message, DeliveryState deliveryState, bool settled)
@ -141,7 +152,8 @@ namespace Amqp.Listener
} }
/// <summary> /// <summary>
/// Completes the link attach request. /// Completes the link attach request. This should be called when the IContainer.AttachLink implementation returns false
/// and the asynchrounous processing completes.
/// </summary> /// </summary>
/// <param name="attach">The attach to send back.</param> /// <param name="attach">The attach to send back.</param>
/// <param name="error">The error, if any, for the link.</param> /// <param name="error">The error, if any, for the link.</param>
@ -171,6 +183,16 @@ namespace Amqp.Listener
} }
} }
internal void InitializeLinkEndpoint(LinkEndpoint linkEndpoint, uint credit)
{
ThrowIfNotNull(this.linkEndpoint, "endpoint");
ThrowIfNotNull(this.onMessage, "receiver");
ThrowIfNotNull(this.onCredit, "sender");
ThrowIfNotNull(this.onDispose, "sender");
this.credit = credit;
this.linkEndpoint = linkEndpoint;
}
internal override void OnAttach(uint remoteHandle, Attach attach) internal override void OnAttach(uint remoteHandle, Attach attach)
{ {
var container = ((ListenerConnection)this.Session.Connection).Listener.Container; var container = ((ListenerConnection)this.Session.Connection).Listener.Container;
@ -199,15 +221,16 @@ namespace Amqp.Listener
internal override void OnFlow(Flow flow) internal override void OnFlow(Flow flow)
{ {
if (this.onCredit != null) var theirLimit = (SequenceNumber)(flow.DeliveryCount + flow.LinkCredit);
var myLimit = (SequenceNumber)((uint)this.deliveryCount + this.credit);
int delta = theirLimit - myLimit;
if (this.linkEndpoint != null)
{ {
var theirLimit = (SequenceNumber)(flow.DeliveryCount + flow.LinkCredit); this.linkEndpoint.OnFlow(new FlowContext(this, delta, flow.Properties));
var myLimit = (SequenceNumber)((uint)this.deliveryCount + this.credit); }
int delta = theirLimit - myLimit; else if (delta > 0 && this.onCredit != null)
if (delta > 0) {
{ this.onCredit(delta, this.state);
this.onCredit(delta, this.state);
}
} }
} }
@ -217,6 +240,10 @@ namespace Amqp.Listener
{ {
this.onDispose(delivery.Message, delivery.State, delivery.Settled, this.state); this.onDispose(delivery.Message, delivery.State, delivery.Settled, this.state);
} }
else if (this.linkEndpoint != null)
{
this.linkEndpoint.OnDisposition(new DispositionContext(this, delivery.Message, delivery.State, delivery.Settled));
}
} }
internal override void OnTransfer(Delivery delivery, Transfer transfer, ByteBuffer buffer) internal override void OnTransfer(Delivery delivery, Transfer transfer, ByteBuffer buffer)
@ -261,11 +288,27 @@ namespace Amqp.Listener
{ {
} }
static void ThrowIfNotNull(object obj, string name)
{
if (obj != null)
{
throw new InvalidOperationException("The " + name + " has been already initialized for this link.");
}
}
void DeliverMessage(Delivery delivery) void DeliverMessage(Delivery delivery)
{ {
var container = ((ListenerConnection)this.Session.Connection).Listener.Container; var container = ((ListenerConnection)this.Session.Connection).Listener.Container;
delivery.Message = container.CreateMessage(delivery.Buffer); delivery.Message = container.CreateMessage(delivery.Buffer);
this.onMessage(this, delivery.Message, delivery.State, this.state); if (this.onMessage != null)
{
this.onMessage(this, delivery.Message, delivery.State, this.state);
}
else if (this.linkEndpoint != null)
{
this.linkEndpoint.OnMessage(new MessageContext(this, delivery.Message));
}
if (this.delivered++ >= this.credit / 2) if (this.delivered++ >= this.credit / 2)
{ {
this.delivered = 0; this.delivered = 0;

Просмотреть файл

@ -0,0 +1,59 @@
// ------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation
// All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the ""License""); you may not use this
// file except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR
// CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR
// NON-INFRINGEMENT.
//
// See the Apache Version 2.0 License for specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------------------
namespace Amqp.Listener
{
using Amqp.Framing;
/// <summary>
/// Provides the context to a message processor to process the received message.
/// </summary>
public class MessageContext : Context
{
internal MessageContext(ListenerLink link, Message message)
: base(link, message)
{
this.DeliveryState = message.Delivery.State;
}
/// <summary>
/// Gets the delivery state associated with the message.
/// </summary>
public DeliveryState DeliveryState
{
get;
private set;
}
/// <summary>
/// Accepts the message.
/// </summary>
public void Complete()
{
this.Dispose(Context.Accepted);
}
/// <summary>
/// Rejects the message.
/// </summary>
/// <param name="error"></param>
public void Complete(Error error)
{
this.Dispose(new Rejected() { Error = error });
}
}
}

Просмотреть файл

@ -0,0 +1,50 @@
// ------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation
// All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the ""License""); you may not use this
// file except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR
// CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR
// NON-INFRINGEMENT.
//
// See the Apache Version 2.0 License for specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------------------
namespace Amqp.Listener
{
using Amqp.Framing;
/// <summary>
/// Provides the context to a request processor to process the received request.
/// </summary>
public class RequestContext : Context
{
readonly ListenerLink responseLink;
internal RequestContext(ListenerLink requestLink, ListenerLink responseLink, Message request)
: base(requestLink, request)
{
this.responseLink = responseLink;
}
/// <summary>
/// Completes the request and sends the response message to the peer.
/// </summary>
/// <param name="response">The response message to send.</param>
public void Complete(Message response)
{
if (response.Properties == null)
{
response.Properties = new Properties();
}
response.Properties.CorrelationId = this.Message.Properties.MessageId;
this.responseLink.SendMessage(response, response.Encode());
}
}
}

Просмотреть файл

@ -17,10 +17,10 @@
namespace TestAmqpBroker namespace TestAmqpBroker
{ {
using System;
using System.Linq;
using System.Collections.Generic;
using Amqp; using Amqp;
using Listener.IContainer;
using System;
using System.Collections.Generic;
class Program class Program
{ {

Просмотреть файл

@ -35,7 +35,6 @@
<ItemGroup> <ItemGroup>
<Compile Include="Program.cs" /> <Compile Include="Program.cs" />
<Compile Include="Properties\AssemblyInfo.cs" /> <Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="TestAmqpBroker.cs" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<None Include="App.config" /> <None Include="App.config" />
@ -44,6 +43,10 @@
<Reference Include="System" /> <Reference Include="System" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ProjectReference Include="..\..\Examples\Listener\Listener.IContainer\Listener.IContainer.csproj">
<Project>{4b7bfdb9-406a-4ebe-93c3-5d40d6a986f4}</Project>
<Name>Listener.IContainer</Name>
</ProjectReference>
<ProjectReference Include="..\..\src\Amqp.Net.csproj"> <ProjectReference Include="..\..\src\Amqp.Net.csproj">
<Project>{92153715-1d99-43b1-b291-470cf91a156d}</Project> <Project>{92153715-1d99-43b1-b291-470cf91a156d}</Project>
<Name>Amqp.Net</Name> <Name>Amqp.Net</Name>