Republish dlq Samples for Binder Reference Doc (#203)

* ReRouteDlq

* Dlq Samples for Binder reference

* partitioned consumer

* Apply suggestions from code review

Co-authored-by: Tim Hess <thess@vmware.com>

Co-authored-by: Tim Hess <thess@vmware.com>
This commit is contained in:
Hananiel Sarella 2021-05-26 09:24:43 -04:00 коммит произвёл GitHub
Родитель 3feb7d068e
Коммит 212dd5a022
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
30 изменённых файлов: 896 добавлений и 8 удалений

Просмотреть файл

@ -5,6 +5,20 @@ VisualStudioVersion = 16.0.29806.167
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "RabbitMQWeb", "RabbitMQWeb\RabbitMQWeb.csproj", "{36602E04-9061-4828-B95F-C5D0472C768A}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DependencyInjection", "Console\DependencyInjection\DependencyInjection.csproj", "{2FF4B674-B1D3-4D56-A759-6042CD12536C}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GenericHostEndpointManualContainer", "Console\GenericHostEndpointManualContainer\GenericHostEndpointManualContainer.csproj", "{AA408ECA-BAD2-4F63-B5AF-22A6163DA76A}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GenericHostEndpointRegistration", "Console\GenericHostEndpointRegistration\GenericHostEndpointRegistration.csproj", "{A7036E3E-0C8F-4067-A8B0-6F07E02DA80E}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GenericHostManualContainer", "Console\GenericHostManualContainer\GenericHostManualContainer.csproj", "{F7300A01-1A65-4CEA-8C51-54CD339D4DF4}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GenericHostRabbitListener", "Console\GenericHostRabbitListener\GenericHostRabbitListener.csproj", "{5259A092-1D5A-44AE-AC70-F4091CFD7895}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "GenericHostRabbitListenerHeaders", "Console\GenericHostRabbitListenerHeaders\GenericHostRabbitListenerHeaders.csproj", "{21664699-AEF5-4FCA-B370-2FEE392FC3F9}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TemplateSendReceive", "Console\TemplateSendReceive\TemplateSendReceive.csproj", "{96586662-7721-4CED-BF5E-9035C3A068F5}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -15,6 +29,34 @@ Global
{36602E04-9061-4828-B95F-C5D0472C768A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{36602E04-9061-4828-B95F-C5D0472C768A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{36602E04-9061-4828-B95F-C5D0472C768A}.Release|Any CPU.Build.0 = Release|Any CPU
{2FF4B674-B1D3-4D56-A759-6042CD12536C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{2FF4B674-B1D3-4D56-A759-6042CD12536C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2FF4B674-B1D3-4D56-A759-6042CD12536C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2FF4B674-B1D3-4D56-A759-6042CD12536C}.Release|Any CPU.Build.0 = Release|Any CPU
{AA408ECA-BAD2-4F63-B5AF-22A6163DA76A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{AA408ECA-BAD2-4F63-B5AF-22A6163DA76A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AA408ECA-BAD2-4F63-B5AF-22A6163DA76A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{AA408ECA-BAD2-4F63-B5AF-22A6163DA76A}.Release|Any CPU.Build.0 = Release|Any CPU
{A7036E3E-0C8F-4067-A8B0-6F07E02DA80E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A7036E3E-0C8F-4067-A8B0-6F07E02DA80E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A7036E3E-0C8F-4067-A8B0-6F07E02DA80E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A7036E3E-0C8F-4067-A8B0-6F07E02DA80E}.Release|Any CPU.Build.0 = Release|Any CPU
{F7300A01-1A65-4CEA-8C51-54CD339D4DF4}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F7300A01-1A65-4CEA-8C51-54CD339D4DF4}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F7300A01-1A65-4CEA-8C51-54CD339D4DF4}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F7300A01-1A65-4CEA-8C51-54CD339D4DF4}.Release|Any CPU.Build.0 = Release|Any CPU
{5259A092-1D5A-44AE-AC70-F4091CFD7895}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{5259A092-1D5A-44AE-AC70-F4091CFD7895}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5259A092-1D5A-44AE-AC70-F4091CFD7895}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5259A092-1D5A-44AE-AC70-F4091CFD7895}.Release|Any CPU.Build.0 = Release|Any CPU
{21664699-AEF5-4FCA-B370-2FEE392FC3F9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{21664699-AEF5-4FCA-B370-2FEE392FC3F9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{21664699-AEF5-4FCA-B370-2FEE392FC3F9}.Release|Any CPU.ActiveCfg = Release|Any CPU
{21664699-AEF5-4FCA-B370-2FEE392FC3F9}.Release|Any CPU.Build.0 = Release|Any CPU
{96586662-7721-4CED-BF5E-9035C3A068F5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{96586662-7721-4CED-BF5E-9035C3A068F5}.Debug|Any CPU.Build.0 = Debug|Any CPU
{96586662-7721-4CED-BF5E-9035C3A068F5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{96586662-7721-4CED-BF5E-9035C3A068F5}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE

Просмотреть файл

@ -0,0 +1,31 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Steeltoe.Messaging.Handler.Attributes;
using Steeltoe.Messaging.RabbitMQ;
using Steeltoe.Stream.Attributes;
using Steeltoe.Stream.Messaging;
using Steeltoe.Stream.StreamHost;
using System;
using System.Threading.Tasks;
namespace PartitionedProducer
{
[EnableBinding(typeof(ISink))]
public class PartitionedConsumer
{
static async Task Main(string[] args)
{
var host = StreamHost
.CreateDefaultBuilder<PartitionedConsumer>(args)
.Build();
await host.StartAsync();
}
[StreamListener(ISink.INPUT)]
public void Listen([Payload] string input, [Header(RabbitMessageHeaders.CONSUMER_QUEUE)] string queue)
{
Console.WriteLine(input +" received from queue " + queue);
}
}
}

Просмотреть файл

@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
<LangVersion>8.0</LangVersion>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.10.9" />
<PackageReference Include="Steeltoe.Stream.Binder.RabbitMQ" Version="3.1.0-rc1" />
<PackageReference Include="Steeltoe.Stream.StreamBase" Version="3.1.0-rc1" />
</ItemGroup>
<ItemGroup>
<None Update="appsettings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>
</Project>

Просмотреть файл

@ -0,0 +1,10 @@
{
"profiles": {
"PartitionedConsumer": {
"commandName": "Project"
},
"Docker": {
"commandName": "Docker"
}
}
}

Просмотреть файл

@ -0,0 +1,19 @@
{
"spring": {
"cloud": {
"stream": {
"binder": "rabbit",
"bindings": {
"input": {
"destination": "partitioned.destination",
"group": "myGroup",
"consumer": {
"partitioned": true,
"instanceIndex": 0
}
}
}
}
}
}
}

Просмотреть файл

@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
<LangVersion>8.0</LangVersion>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.10.9" />
<PackageReference Include="Steeltoe.Stream.Binder.RabbitMQ" Version="3.1.0-rc1" />
<PackageReference Include="Steeltoe.Stream.StreamBase" Version="3.1.0-rc1" />
</ItemGroup>
<ItemGroup>
<None Update="appsettings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>
</Project>

Просмотреть файл

@ -0,0 +1,22 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Steeltoe.Stream.Attributes;
using Steeltoe.Stream.Messaging;
using Steeltoe.Stream.StreamHost;
using System.Threading.Tasks;
namespace PartitionedProducer
{
[EnableBinding(typeof(ISource))]
public class Program
{
static async Task Main(string[] args)
{
var host = StreamHost
.CreateDefaultBuilder<Program>(args)
.ConfigureServices(svc => svc.AddHostedService<Worker>())
.Build();
await host.StartAsync();
}
}
}

Просмотреть файл

@ -0,0 +1,10 @@
{
"profiles": {
"PartitionedProducer": {
"commandName": "Project"
},
"Docker": {
"commandName": "Docker"
}
}
}

Просмотреть файл

@ -0,0 +1,57 @@
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Steeltoe.Messaging;
using Steeltoe.Messaging.Support;
using Steeltoe.Stream.Messaging;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace PartitionedProducer
{
public class Worker : BackgroundService
{
private readonly ISource _source;
private readonly ILogger<Worker> _logger;
private static readonly Random RANDOM = new Random();
private static readonly string[] data = new string[] {
"abc1", "def1", "qux1",
"abc2", "def2", "qux2",
"abc3", "def3", "qux3",
"abc4", "def4", "qux4",
};
public string ServiceName { get; set; } = "BackgroundWorker";
public Worker(ISource source, ILogger<Worker> logger)
{
_source = source;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await Task.Delay(5000, stoppingToken); // Wait for the Infrastructure to be setup correctly;
while (!stoppingToken.IsCancellationRequested)
{
_logger.LogInformation("Worker running at: {time}", DateTimeOffset.Now);
try
{
var message = Generate();
_source.Output.Send(message);
}
catch(Exception ex)
{
_logger.LogError(ex, ex.Message);
}
await Task.Delay(5000, stoppingToken);
}
}
protected virtual IMessage Generate()
{
var value = data[RANDOM.Next(data.Length)];
Console.WriteLine("Sending: " + value);
return MessageBuilder.WithPayload(value).SetHeader("partitionKey", value).Build();
}
}
}

Просмотреть файл

@ -0,0 +1,20 @@
{
"spring": {
"cloud": {
"stream": {
"binder": "rabbit",
"bindings": {
"output": {
"destination": "partitioned.destination",
"producer": {
"partitioned": true,
"partitionKeyExpression": "Headers['partitionKey']",
"partitionCount": 2,
"requiredGroups": [ "myGroup" ]
}
}
}
}
}
}
}

Просмотреть файл

@ -0,0 +1,82 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Steeltoe.Messaging;
using Steeltoe.Messaging.Handler.Attributes;
using Steeltoe.Messaging.RabbitMQ.Attributes;
using Steeltoe.Messaging.RabbitMQ.Config;
using Steeltoe.Messaging.RabbitMQ.Core;
using Steeltoe.Messaging.RabbitMQ.Exceptions;
using Steeltoe.Messaging.RabbitMQ.Extensions;
using Steeltoe.Messaging.Support;
using Steeltoe.Stream.Attributes;
using Steeltoe.Stream.Messaging;
using Steeltoe.Stream.StreamHost;
using System.Text;
using System.Threading.Tasks;
namespace ReRouteDlq
{
public class Program
{
private const string ORIGINAL_QUEUE = "so8400in.so8400";
private const string DLQ = ORIGINAL_QUEUE + ".dlq";
private const string PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private const string X_RETRIES_HEADER = "x-retries";
static async Task Main(string[] args)
{
var host = StreamHost
.CreateDefaultBuilder<ReRouteDlq>(args)
.ConfigureServices((ctx, services) =>
{
services.AddRabbitServices();
services.AddRabbitTemplate();
services.AddRabbitListeners<ReRouteDlq>();
})
.Build();
await host.StartAsync();
}
[EnableBinding(typeof(ISink))]
public class ReRouteDlq
{
private readonly RabbitTemplate rabbitTemplate;
public ReRouteDlq(RabbitTemplate template)
{
rabbitTemplate = template;
}
[DeclareQueue(Name = PARKING_LOT)]
[RabbitListener(DLQ)]
public void RePublish(
string text,
[Header(Name = X_RETRIES_HEADER, Required = false)]
int? retriesHeader)
{
var failedMessage = MessageBuilder
.WithPayload(Encoding.UTF8.GetBytes(text))
.SetHeader(X_RETRIES_HEADER, (retriesHeader ?? 0) + 1)
.Build();
if (!retriesHeader.HasValue || retriesHeader < 3)
{
rabbitTemplate.Send(ORIGINAL_QUEUE, failedMessage);
}
else
{
rabbitTemplate.Send(PARKING_LOT, failedMessage);
}
}
[StreamListener(ISink.INPUT)]
public void InitialMessage(IMessage failedMessage)
{
throw new RabbitRejectAndDontRequeueException("failed");
}
}
}
}

Просмотреть файл

@ -0,0 +1,10 @@
{
"profiles": {
"ReRouteDlq": {
"commandName": "Project"
},
"Docker": {
"commandName": "Docker"
}
}
}

Просмотреть файл

@ -0,0 +1,20 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
<LangVersion>8.0</LangVersion>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.10.9" />
<PackageReference Include="Steeltoe.Stream.Binder.RabbitMQ" Version="3.1.0-rc1" />
<PackageReference Include="Steeltoe.Stream.StreamBase" Version="3.1.0-rc1" />
</ItemGroup>
<ItemGroup>
<None Update="appsettings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>
</Project>

Просмотреть файл

@ -0,0 +1,35 @@
{
"spring": {
"cloud": {
"stream": {
"binder": "rabbit",
"bindings": {
"input": {
"destination": "so8400in",
"group": "so8400",
"consumer": {
"maxattempts": 1
}
}
},
"rabbit": {
"default": {
"consumer": {
"autoBindDlq": true
}
},
"bindings": {
"input": {
"consumer": {
"autoBindDlq": true,
"dlqTtl": 5000,
"dlqdeadletterexchange": ""
}
}
}
}
}
}
}
}

Просмотреть файл

@ -0,0 +1,82 @@
using Steeltoe.Messaging;
using Steeltoe.Messaging.Handler.Attributes;
using Steeltoe.Messaging.RabbitMQ.Attributes;
using Steeltoe.Messaging.RabbitMQ.Core;
using Steeltoe.Messaging.RabbitMQ.Exceptions;
using Steeltoe.Messaging.RabbitMQ.Extensions;
using Steeltoe.Messaging.Support;
using Steeltoe.Stream.Attributes;
using Steeltoe.Stream.Messaging;
using Steeltoe.Stream.StreamHost;
using System.Text;
using System.Threading.Tasks;
namespace ReRouteDlqDelayed
{
public class Program
{
private const string ORIGINAL_QUEUE = "so8400in.so8400";
private const string DLQ = ORIGINAL_QUEUE + ".dlq";
private const string PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private const string X_RETRIES_HEADER = "x-retries";
private const string DELAY_EXCHANGE = "dlqReRouter";
static async Task Main(string[] args)
{
var host = StreamHost
.CreateDefaultBuilder<ReRouteDlq>(args)
.ConfigureServices((ctx, services) =>
{
services.AddRabbitServices();
services.AddRabbitTemplate();
services.AddRabbitListeners<ReRouteDlq>();
})
.Build();
await host.StartAsync();
}
[EnableBinding(typeof(ISink))]
public class ReRouteDlq
{
private readonly RabbitTemplate rabbitTemplate;
public ReRouteDlq(RabbitTemplate template)
{
rabbitTemplate = template;
}
[DeclareQueue(Name = PARKING_LOT)]
[DeclareExchange(Name = "delayExchange", Delayed = "True")]
[DeclareQueueBinding(Name = "bindOriginalToDelay", QueueName = ORIGINAL_QUEUE, ExchangeName = "delayExchange")]
[RabbitListener(DLQ)]
public void RePublish(
string text,
[Header(Name = X_RETRIES_HEADER, Required = false)]
int? retriesHeader)
{
var failedMessage = MessageBuilder
.WithPayload(Encoding.UTF8.GetBytes(text))
.SetHeader(X_RETRIES_HEADER, (retriesHeader ?? 0) + 1)
.SetHeader("x-delay", 5000*retriesHeader)
.Build();
if (!retriesHeader.HasValue || retriesHeader < 3)
{
rabbitTemplate.Send(ORIGINAL_QUEUE, failedMessage);
}
else
{
rabbitTemplate.Send(PARKING_LOT, failedMessage);
}
}
[StreamListener(ISink.INPUT)]
public void InitialMessage(IMessage failedMessage)
{
throw new RabbitRejectAndDontRequeueException("failed");
}
}
}
}

Просмотреть файл

@ -0,0 +1,10 @@
{
"profiles": {
"ReRouteDlqDelayed": {
"commandName": "Project"
},
"Docker": {
"commandName": "Docker"
}
}
}

Просмотреть файл

@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
<LangVersion>8.0</LangVersion>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.10.9" />
<PackageReference Include="Steeltoe.Stream.Binder.RabbitMQ" Version="3.1.0-rc1" />
<PackageReference Include="Steeltoe.Stream.StreamBase" Version="3.1.0-rc1" />
</ItemGroup>
<ItemGroup>
<None Update="appsettings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>
</Project>

Просмотреть файл

@ -0,0 +1,35 @@
{
"spring": {
"cloud": {
"stream": {
"binder": "rabbit",
"bindings": {
"input": {
"destination": "so8400in",
"group": "so8400",
"consumer": {
"maxattempts": 1
}
}
},
"rabbit": {
"default": {
"consumer": {
"autoBindDlq": true
}
},
"bindings": {
"input": {
"consumer": {
"autoBindDlq": true,
"dlqTtl": 5000,
"dlqdeadletterexchange": ""
}
}
}
}
}
}
}
}

Просмотреть файл

@ -0,0 +1,97 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Steeltoe.Messaging;
using Steeltoe.Messaging.Handler.Attributes;
using Steeltoe.Messaging.RabbitMQ.Attributes;
using Steeltoe.Messaging.RabbitMQ.Config;
using Steeltoe.Messaging.RabbitMQ.Core;
using Steeltoe.Messaging.RabbitMQ.Exceptions;
using Steeltoe.Messaging.RabbitMQ.Extensions;
using Steeltoe.Messaging.Support;
using Steeltoe.Stream.Attributes;
using Steeltoe.Stream.Messaging;
using Steeltoe.Stream.StreamHost;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace ReRouteDlqRepublishToDlqFalse
{
public class Program
{
private const string ORIGINAL_QUEUE = "so8400in.so8400";
private const string DLQ = ORIGINAL_QUEUE + ".dlq";
private const string PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private const string X_DEATH_HEADER = "x-death";
private const string X_RETRIES_HEADER = "x-retries";
static async Task Main(string[] args)
{
var host = StreamHost
.CreateDefaultBuilder<ReRouteDlq>(args)
.ConfigureServices((ctx, services) =>
{
services.AddRabbitServices();
services.AddRabbitTemplate();
services.AddRabbitListeners<ReRouteDlq>();
})
.Build();
await host.StartAsync();
}
[EnableBinding(typeof(ISink))]
public class ReRouteDlq
{
private readonly RabbitTemplate rabbitTemplate;
public ReRouteDlq(RabbitTemplate template)
{
rabbitTemplate = template;
}
[DeclareQueue(Name = PARKING_LOT)]
[RabbitListener(DLQ)]
public void RePublish(
string text,
[Header(Name = X_RETRIES_HEADER, Required = false)]
int? retriesHeader,
[Header(Name = X_DEATH_HEADER, Required = false)]
IDictionary<string, object> xDeathHeader
)
{
var failedMessage = MessageBuilder
.WithPayload(Encoding.UTF8.GetBytes(text))
.SetHeader(X_RETRIES_HEADER, (retriesHeader ?? 0) + 1)
.Build();
if (!retriesHeader.HasValue || retriesHeader < 3)
{
if (xDeathHeader != null
&& xDeathHeader.TryGetValue("exchange", out var exchange)
&& exchange is string strExchange
&& xDeathHeader.TryGetValue("routing-keys", out var rk)
&& rk is List<object> routingKeys)
{
rabbitTemplate.Send(strExchange, routingKeys[0].ToString(), failedMessage);
}
else
{
throw new RabbitRejectAndDontRequeueException("failed");
}
}
else
{
rabbitTemplate.Send(PARKING_LOT, failedMessage);
}
}
[StreamListener(ISink.INPUT)]
public void InitialMessage(IMessage failedMessage)
{
throw new RabbitRejectAndDontRequeueException("failed");
}
}
}
}

Просмотреть файл

@ -0,0 +1,10 @@
{
"profiles": {
"ReRouteDlqRepublishToDlqFalse": {
"commandName": "Project"
},
"Docker": {
"commandName": "Docker"
}
}
}

Просмотреть файл

@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
<LangVersion>8.0</LangVersion>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.10.9" />
<PackageReference Include="Steeltoe.Stream.Binder.RabbitMQ" Version="3.1.0-rc1" />
<PackageReference Include="Steeltoe.Stream.StreamBase" Version="3.1.0-rc1" />
</ItemGroup>
<ItemGroup>
<None Update="appsettings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>
</Project>

Просмотреть файл

@ -0,0 +1,32 @@
{
"spring": {
"cloud": {
"stream": {
"binder": "rabbit",
"bindings": {
"input": {
"destination": "so8400in",
"group": "so8400",
"consumer": {
"maxattempts": 1
}
}
},
"rabbit": {
"bindings": {
"input": {
"consumer": {
"autoBindDlq": true,
"republishToDlq": false,
"dlqTtl": 5000,
"dlqdeadletterexchange": ""
}
}
}
}
}
}
}
}

Просмотреть файл

@ -0,0 +1,94 @@
using Steeltoe.Messaging;
using Steeltoe.Messaging.Handler.Attributes;
using Steeltoe.Messaging.RabbitMQ.Attributes;
using Steeltoe.Messaging.RabbitMQ.Core;
using Steeltoe.Messaging.RabbitMQ.Exceptions;
using Steeltoe.Messaging.RabbitMQ.Extensions;
using Steeltoe.Messaging.RabbitMQ.Retry;
using Steeltoe.Messaging.Support;
using Steeltoe.Stream.Attributes;
using Steeltoe.Stream.Messaging;
using Steeltoe.Stream.StreamHost;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
namespace ReRouteDlqRepublishToDlqTrue
{
public class Program
{
private const string ORIGINAL_QUEUE = "so8400in.so8400";
private const string DLQ = ORIGINAL_QUEUE + ".dlq";
private const string PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
private const string X_RETRIES_HEADER = "x-retries";
private const string X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE;
private const string X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY;
static async Task Main(string[] args)
{
var host = StreamHost
.CreateDefaultBuilder<ReRouteDlq>(args)
.ConfigureServices((ctx, services) =>
{
services.AddRabbitServices();
services.AddRabbitTemplate();
services.AddRabbitListeners<ReRouteDlq>();
})
.Build();
await host.StartAsync();
}
[EnableBinding(typeof(ISink))]
public class ReRouteDlq
{
private readonly RabbitTemplate rabbitTemplate;
public ReRouteDlq(RabbitTemplate template)
{
rabbitTemplate = template;
}
[DeclareQueue(Name = PARKING_LOT)]
[RabbitListener(DLQ)]
public void RePublish(
string text,
[Header(Name = X_RETRIES_HEADER, Required = false)]
int? retriesHeader,
[Header(Name = X_ORIGINAL_EXCHANGE_HEADER, Required = false)]
string exchange,
[Header(Name = X_ORIGINAL_ROUTING_KEY_HEADER, Required = false)]
string originalRoutingKey
)
{
var failedMessage = MessageBuilder
.WithPayload(Encoding.UTF8.GetBytes(text))
.SetHeader(X_RETRIES_HEADER, (retriesHeader ?? 0) + 1)
.Build();
if (!retriesHeader.HasValue || retriesHeader < 3)
{
if (exchange != null && !string.IsNullOrEmpty(originalRoutingKey) )
{
rabbitTemplate.Send(exchange, originalRoutingKey, failedMessage);
}
else
{
rabbitTemplate.Send(ORIGINAL_QUEUE, failedMessage);
}
}
else
{
rabbitTemplate.Send(PARKING_LOT, failedMessage);
}
}
[StreamListener(ISink.INPUT)]
public void InitialMessage(IMessage failedMessage)
{
throw new RabbitRejectAndDontRequeueException("failed");
}
}
}
}

Просмотреть файл

@ -0,0 +1,10 @@
{
"profiles": {
"ReRouteDlqRepublishToDlqTrue": {
"commandName": "Project"
},
"Docker": {
"commandName": "Docker"
}
}
}

Просмотреть файл

@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
<LangVersion>8.0</LangVersion>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.10.9" />
<PackageReference Include="Steeltoe.Stream.Binder.RabbitMQ" Version="3.1.0-rc1" />
<PackageReference Include="Steeltoe.Stream.StreamBase" Version="3.1.0-rc1" />
</ItemGroup>
<ItemGroup>
<None Update="appsettings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>
</Project>

Просмотреть файл

@ -0,0 +1,31 @@
{
"spring": {
"cloud": {
"stream": {
"binder": "rabbit",
"bindings": {
"input": {
"destination": "so8400in",
"group": "so8400",
"consumer": {
"maxattempts": 1
}
}
},
"rabbit": {
"bindings": {
"input": {
"consumer": {
"autoBindDlq": true,
"republishToDlq": true,
"dlqTtl": 5000,
"dlqdeadletterexchange": ""
}
}
}
}
}
}
}
}

Просмотреть файл

@ -25,6 +25,18 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "DynamicDestinationMessaging
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "XDeathApplication", "XDeathApplication\XDeathApplication.csproj", "{78EDF333-A665-43FB-99E2-41980505FF74}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ReRouteDlq", "ReRouteDlq\ReRouteDlq.csproj", "{9293B142-5F48-4E7B-B539-F76C7EB73B03}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ReRouteDlqDelayed", "ReRouteDlqDelayed\ReRouteDlqDelayed.csproj", "{49ED7381-1410-4DD5-A78D-7ADF2C58045E}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ReRouteDlqRepublishToDlqFalse", "ReRouteDlqRepublishToDlqFalse\ReRouteDlqRepublishToDlqFalse.csproj", "{E4D2765E-3A8F-4AD6-A532-1DC44737FC11}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "ReRouteDlqRepublishToDlqTrue", "ReRouteDlqRepublishToDlqTrue\ReRouteDlqRepublishToDlqTrue.csproj", "{E53C489E-7B1A-46C0-8728-9C923644B5EF}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PartitionedProducer", "PartitionedProducer\PartitionedProducer.csproj", "{705B2081-3065-4B6F-9C4F-CFD89304F0AE}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PartitionedConsumer", "PartitionedConsumer\PartitionedConsumer.csproj", "{E78ED292-CD60-4245-BD86-5EC956659895}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -75,6 +87,30 @@ Global
{78EDF333-A665-43FB-99E2-41980505FF74}.Debug|Any CPU.Build.0 = Debug|Any CPU
{78EDF333-A665-43FB-99E2-41980505FF74}.Release|Any CPU.ActiveCfg = Release|Any CPU
{78EDF333-A665-43FB-99E2-41980505FF74}.Release|Any CPU.Build.0 = Release|Any CPU
{9293B142-5F48-4E7B-B539-F76C7EB73B03}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{9293B142-5F48-4E7B-B539-F76C7EB73B03}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9293B142-5F48-4E7B-B539-F76C7EB73B03}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9293B142-5F48-4E7B-B539-F76C7EB73B03}.Release|Any CPU.Build.0 = Release|Any CPU
{49ED7381-1410-4DD5-A78D-7ADF2C58045E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{49ED7381-1410-4DD5-A78D-7ADF2C58045E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{49ED7381-1410-4DD5-A78D-7ADF2C58045E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{49ED7381-1410-4DD5-A78D-7ADF2C58045E}.Release|Any CPU.Build.0 = Release|Any CPU
{E4D2765E-3A8F-4AD6-A532-1DC44737FC11}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E4D2765E-3A8F-4AD6-A532-1DC44737FC11}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E4D2765E-3A8F-4AD6-A532-1DC44737FC11}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E4D2765E-3A8F-4AD6-A532-1DC44737FC11}.Release|Any CPU.Build.0 = Release|Any CPU
{E53C489E-7B1A-46C0-8728-9C923644B5EF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E53C489E-7B1A-46C0-8728-9C923644B5EF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E53C489E-7B1A-46C0-8728-9C923644B5EF}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E53C489E-7B1A-46C0-8728-9C923644B5EF}.Release|Any CPU.Build.0 = Release|Any CPU
{705B2081-3065-4B6F-9C4F-CFD89304F0AE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{705B2081-3065-4B6F-9C4F-CFD89304F0AE}.Debug|Any CPU.Build.0 = Debug|Any CPU
{705B2081-3065-4B6F-9C4F-CFD89304F0AE}.Release|Any CPU.ActiveCfg = Release|Any CPU
{705B2081-3065-4B6F-9C4F-CFD89304F0AE}.Release|Any CPU.Build.0 = Release|Any CPU
{E78ED292-CD60-4245-BD86-5EC956659895}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E78ED292-CD60-4245-BD86-5EC956659895}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E78ED292-CD60-4245-BD86-5EC956659895}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E78ED292-CD60-4245-BD86-5EC956659895}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE

Просмотреть файл

@ -33,7 +33,5 @@ namespace XDeathApplication
}
throw new RabbitRejectAndDontRequeueException("failed");
}
}
}

Просмотреть файл

@ -6,9 +6,15 @@
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
<LangVersion>8.0</LangVersion>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.10.9" />
<PackageReference Include="Steeltoe.Stream.Binder.RabbitMQ" Version="3.1.0-rc1" />
<PackageReference Include="Steeltoe.Stream.StreamBase" Version="3.1.0-rc1" />
</ItemGroup>
<ItemGroup>
<None Update="appsettings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
</ItemGroup>
</Project>

Просмотреть файл

@ -12,13 +12,7 @@
}
}
},
"rabbit": {
"default": {
"consumer": {
"autoBindDlq": true
}
},
"bindings": {
"input": {
"consumer": {