Changed thread handling and minor enhancements.

This commit is contained in:
Erik Mogensen 2020-07-28 21:53:53 +02:00
Родитель f1f6e6b4e9
Коммит 8f883c1af7
1 изменённых файлов: 72 добавлений и 61 удалений

Просмотреть файл

@ -29,7 +29,7 @@ namespace MessagingSamples
using Microsoft.Azure.ServiceBus.Primitives; using Microsoft.Azure.ServiceBus.Primitives;
using Microsoft.Azure.ServiceBus; using Microsoft.Azure.ServiceBus;
using Microsoft.Azure.ServiceBus.InteropExtensions; using Microsoft.Azure.ServiceBus.InteropExtensions;
using System.Threading;
public class Program public class Program
{ {
@ -50,8 +50,8 @@ namespace MessagingSamples
Console.WriteLine("4) Client credential X.509 certificate"); Console.WriteLine("4) Client credential X.509 certificate");
int option; int option;
var sc = Console.ReadLine(); ConsoleKeyInfo cki = Console.ReadKey();
if (int.TryParse(sc, out option)) if (int.TryParse(cki.KeyChar.ToString(), out option))
{ {
switch (option) switch (option)
{ {
@ -84,21 +84,75 @@ namespace MessagingSamples
{ {
this.receiveClient = qc; this.receiveClient = qc;
this.InitializeReceiver();
this.sendClient = qc; this.sendClient = qc;
// send a set of messages
var sendTask = this.SendMessagesAsync(); var sendTask = this.SendMessagesAsync();
// the receive those messages
var cts = new CancellationTokenSource();
var receiveTask = this.ReceiveMessagesAsync(cts.Token);
Console.ReadKey(); // wait until both tasks are complete
await Task.WhenAll(
// run 20 seconds, then cancel the receive loop
Task.Delay(TimeSpan.FromSeconds(20)).ContinueWith((t) => cts.Cancel()),
// wait for the send task
sendTask,
// wait for the receive task
receiveTask);
}
// shut down the receiver, which will stop the OnMessageAsync loop Task ReceiveMessagesAsync(CancellationToken cancellationToken)
await this.receiveClient.CloseAsync(); {
var doneReceiving = new TaskCompletionSource<bool>();
// wait for send work to complete if required // close the receiver and factory when the CancellationToken fires
await sendTask; cancellationToken.Register(
async () =>
{
await this.receiveClient.CloseAsync();
doneReceiving.SetResult(true);
});
await this.sendClient.CloseAsync(); // register the OnMessageAsync callback
this.receiveClient.RegisterMessageHandler(
async (message, token) =>
{
if (message.Label != null &&
message.ContentType != null &&
message.Label.Equals("Scientist", StringComparison.InvariantCultureIgnoreCase) &&
message.ContentType.Equals("application/json", StringComparison.InvariantCultureIgnoreCase))
{
var body = message.Body;
dynamic scientist = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(body));
lock (Console.Out)
{
Console.ForegroundColor = ConsoleColor.Cyan;
Console.WriteLine(
"\t\t\t\tMessage received: \n\t\t\t\t\t\tMessageId = {0}, \n\t\t\t\t\t\tSequenceNumber = {1}, \n\t\t\t\t\t\tEnqueuedTimeUtc = {2}," +
"\n\t\t\t\t\t\tExpiresAtUtc = {5}, \n\t\t\t\t\t\tContentType = \"{3}\", \n\t\t\t\t\t\tSize = {4}, \n\t\t\t\t\t\tContent: [ firstName = {6}, name = {7} ]",
message.MessageId,
message.SystemProperties.SequenceNumber,
message.SystemProperties.EnqueuedTimeUtc,
message.ContentType,
message.Size,
message.ExpiresAtUtc,
scientist.firstName,
scientist.name);
Console.ResetColor();
}
}
await this.receiveClient.CompleteAsync(message.SystemProperties.LockToken);
},
new MessageHandlerOptions((eventargs) => { return Task.CompletedTask; })
{
AutoComplete = false,
MaxConcurrentCalls = 1
});
return doneReceiving.Task;
} }
async Task SendMessagesAsync() async Task SendMessagesAsync()
@ -138,58 +192,17 @@ namespace MessagingSamples
} }
} }
void InitializeReceiver()
{
// register the OnMessageAsync callback
this.receiveClient.RegisterMessageHandler(
async (message, token) =>
{
if (message.Label != null &&
message.ContentType != null &&
message.Label.Equals("Scientist", StringComparison.InvariantCultureIgnoreCase) &&
message.ContentType.Equals("application/json", StringComparison.InvariantCultureIgnoreCase))
{
var body = message.Body;
dynamic scientist = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(body));
lock (Console.Out)
{
Console.ForegroundColor = ConsoleColor.Cyan;
Console.WriteLine(
"\t\t\t\tMessage received: \n\t\t\t\t\t\tMessageId = {0}, \n\t\t\t\t\t\tSequenceNumber = {1}, \n\t\t\t\t\t\tEnqueuedTimeUtc = {2}," +
"\n\t\t\t\t\t\tExpiresAtUtc = {5}, \n\t\t\t\t\t\tContentType = \"{3}\", \n\t\t\t\t\t\tSize = {4}, \n\t\t\t\t\t\tContent: [ firstName = {6}, name = {7} ]",
message.MessageId,
message.SystemProperties.SequenceNumber,
message.SystemProperties.EnqueuedTimeUtc,
message.ContentType,
message.Size,
message.ExpiresAtUtc,
scientist.firstName,
scientist.name);
Console.ResetColor();
}
}
await this.receiveClient.CompleteAsync(message.SystemProperties.LockToken);
},
new MessageHandlerOptions((eventargs) => { return Task.CompletedTask; })
{
AutoComplete = false,
MaxConcurrentCalls = 1
});
}
async Task UserInteractiveLoginScenario() async Task UserInteractiveLoginScenario()
{ {
var aadTokenProvider = TokenProvider.CreateAzureActiveDirectoryTokenProvider(async (audience, authority, state) => var aadTokenProvider = TokenProvider.CreateAzureActiveDirectoryTokenProvider(async (audience, authority, state) =>
{ {
var app = PublicClientApplicationBuilder.Create(ClientId) IPublicClientApplication app = PublicClientApplicationBuilder.Create(ClientId)
.WithRedirectUri(ConfigurationManager.AppSettings["redirectURI"]) .WithRedirectUri(ConfigurationManager.AppSettings["redirectURI"])
.Build(); .Build();
var serviceBusAudience = new Uri("https://servicebus.azure.net"); var serviceBusAudience = new Uri("https://servicebus.azure.net");
var authResult = await app.AcquireTokenInteractive(new string[] { $"{serviceBusAudience}/.default" }).ExecuteAsync(); AuthenticationResult authResult = await app.AcquireTokenInteractive(new string[] { $"{serviceBusAudience}/.default" }).ExecuteAsync();
return authResult.AccessToken; return authResult.AccessToken;
}, $"https://login.windows.net/{TenantId}"); }, $"https://login.windows.net/{TenantId}");
@ -202,7 +215,7 @@ namespace MessagingSamples
async Task ClientCredentialsScenario() async Task ClientCredentialsScenario()
{ {
var aadTokenProvider = TokenProvider.CreateAzureActiveDirectoryTokenProvider TokenProvider aadTokenProvider = TokenProvider.CreateAzureActiveDirectoryTokenProvider
(async (audience, authority, state) => (async (audience, authority, state) =>
{ {
IConfidentialClientApplication app = ConfidentialClientApplicationBuilder.Create(ClientId) IConfidentialClientApplication app = ConfidentialClientApplicationBuilder.Create(ClientId)
@ -212,7 +225,7 @@ namespace MessagingSamples
var serviceBusAudience = new Uri("https://servicebus.azure.net"); var serviceBusAudience = new Uri("https://servicebus.azure.net");
var authResult = await app.AcquireTokenForClient(new string[] { $"{serviceBusAudience}/.default" }).ExecuteAsync(); AuthenticationResult authResult = await app.AcquireTokenForClient(new string[] { $"{serviceBusAudience}/.default" }).ExecuteAsync();
return authResult.AccessToken; return authResult.AccessToken;
}, $"https://login.windows.net/{TenantId}"); }, $"https://login.windows.net/{TenantId}");
@ -265,7 +278,7 @@ namespace MessagingSamples
var serviceBusAudience = new Uri("https://servicebus.azure.net"); var serviceBusAudience = new Uri("https://servicebus.azure.net");
var authResult = await app.AcquireTokenForClient(new string[] { $"{serviceBusAudience}/.default" }).ExecuteAsync(); AuthenticationResult authResult = await app.AcquireTokenForClient(new string[] { $"{serviceBusAudience}/.default" }).ExecuteAsync();
return authResult.AccessToken; return authResult.AccessToken;
}, $"https://login.windows.net/{TenantId}"); }, $"https://login.windows.net/{TenantId}");
@ -276,12 +289,12 @@ namespace MessagingSamples
} }
public static int Main(string[] args) public static async Task<int> Main(string[] args)
{ {
try try
{ {
var app = new Program(); var app = new Program();
app.Run().GetAwaiter().GetResult(); await app.Run();
} }
catch (Exception e) catch (Exception e)
{ {
@ -291,8 +304,6 @@ namespace MessagingSamples
} }
return 0; return 0;
} }
} }
} }