amqpnetlite/test/PerfTest/Program.cs

616 строки
21 KiB
C#
Исходник Обычный вид История

2015-04-10 16:05:55 +03:00
// ------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation
// All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the ""License""); you may not use this
// file except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR
// CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR
// NON-INFRINGEMENT.
//
// See the Apache Version 2.0 License for specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------------------
namespace PerfTest
{
using System;
2015-06-21 20:52:28 +03:00
using System.Security.Cryptography.X509Certificates;
2015-04-10 16:05:55 +03:00
using System.Threading;
2015-10-22 02:54:25 +03:00
using System.Threading.Tasks;
2015-04-10 16:05:55 +03:00
using Amqp;
using Amqp.Framing;
using Amqp.Listener;
2015-10-22 02:54:25 +03:00
using Amqp.Types;
2016-10-24 04:13:58 +03:00
using Test.Common;
using TestExtensions = Test.Common.Extensions;
2015-04-10 16:05:55 +03:00
class Program
{
/// <summary>
/// Entry point. Parses the command line, creates the role named by the first
/// argument (send, receive, request, reply or listen) and runs it.
/// Any exception is written to the console and reported through a non-zero
/// process exit code.
/// </summary>
/// <param name="args">Command-line arguments; the first one is the operation name.</param>
static void Main(string[] args)
{
    try
    {
        // Check for an empty command line before constructing PerfArguments,
        // which expects the operation name in args[0].
        if (args.Length == 0)
        {
            Usage();
            return;
        }

        PerfArguments perfArgs = new PerfArguments(args);
        if (perfArgs.HasHelp)
        {
            Usage();
            return;
        }

        if (perfArgs.TraceLevel != 0)
        {
            // Route library traces to the console with a timestamp prefix.
            Trace.TraceLevel = perfArgs.TraceLevel;
            Trace.TraceListener = (l, f, o) => Console.WriteLine(DateTime.Now.ToString("[hh:mm:ss.fff]") + " " + string.Format(f, o));
        }

        // Map the operation name (case-insensitive) to the role implementation.
        Role role;
        if (string.Equals("send", perfArgs.Operation, StringComparison.OrdinalIgnoreCase))
        {
            role = new Sender(perfArgs);
        }
        else if (string.Equals("receive", perfArgs.Operation, StringComparison.OrdinalIgnoreCase))
        {
            role = new Receiver(perfArgs);
        }
        else if (string.Equals("request", perfArgs.Operation, StringComparison.OrdinalIgnoreCase))
        {
            role = new Requestor(perfArgs);
        }
        else if (string.Equals("reply", perfArgs.Operation, StringComparison.OrdinalIgnoreCase))
        {
            role = new ReplyListener(perfArgs);
        }
        else if (string.Equals("listen", perfArgs.Operation, StringComparison.OrdinalIgnoreCase))
        {
            role = new Listener(perfArgs);
        }
        else
        {
            throw new ArgumentException("Unknown operation: " + perfArgs.Operation);
        }

        Console.WriteLine("Running perf test...");
        role.Run();
    }
    catch (Exception e)
    {
        Console.WriteLine(e.ToString());

        // Main returns void, so signal the failure to calling scripts this way.
        Environment.ExitCode = 1;
    }
}
/// <summary>
/// Prints the command-line synopsis, the list of supported operations and the
/// arguments declared on <see cref="PerfArguments"/>.
/// </summary>
static void Usage()
{
    string processName;
    using (System.Diagnostics.Process process = System.Diagnostics.Process.GetCurrentProcess())
    {
        processName = process.ProcessName;
    }

    // The synopsis lists every operation that Main accepts.
    Console.WriteLine(processName + ".exe send|receive|request|reply|listen [arguments]");
    Console.WriteLine(" send \tsend messages to remote peer");
    Console.WriteLine(" receive\treceive messages from remote peer");
    Console.WriteLine(" request\tsend requests to a remote peer");
    Console.WriteLine(" reply \tstart a request processor and send replies");
    Console.WriteLine(" listen \tstart a listener and accept messages from remote peer");
    Console.WriteLine("\r\narguments:");
    Arguments.PrintArguments(typeof(PerfArguments));
}
/// <summary>
/// Base class for the perf-test roles. Holds the parsed arguments, counts
/// started and completed operations, reports throughput at progress
/// boundaries and signals an event once the configured count is reached.
/// </summary>
abstract class Role
{
    // Null unless buffer pooling was requested on the command line.
    protected IBufferManager bufferManager;
    PerfArguments perfArgs;
    long count;
    long started;
    long completed;
    long progress;
    ManualResetEvent completedEvent;

    // Created lazily at the first progress boundary; may still be null at completion.
    System.Diagnostics.Stopwatch stopwatch;

    /// <summary>
    /// Initializes the role from the parsed command-line arguments.
    /// </summary>
    /// <param name="perfArgs">The parsed perf-test arguments.</param>
    public Role(PerfArguments perfArgs)
    {
        this.bufferManager = perfArgs.BufferPooling ? new BufferManager(256, 2 * 1024 * 1024, 100 * 1024 * 1024) : null;
        this.perfArgs = perfArgs;
        this.count = perfArgs.Count;
        this.progress = perfArgs.Progress;
        this.completedEvent = new ManualResetEvent(false);
    }

    /// <summary>
    /// Gets the arguments this role was created with.
    /// </summary>
    protected PerfArguments Args
    {
        get { return this.perfArgs; }
    }

    /// <summary>
    /// Runs the role.
    /// </summary>
    public abstract void Run();

    /// <summary>
    /// Creates a connection to the given address using this role's buffer
    /// manager and the configured max frame size. Blocks until the
    /// asynchronous create completes.
    /// </summary>
    /// <param name="address">The address to connect to.</param>
    /// <returns>The created connection.</returns>
    protected Connection CreateConnection(Address address)
    {
        var factory = new ConnectionFactory();
        factory.BufferManager = this.bufferManager;
        factory.AMQP.MaxFrameSize = this.perfArgs.MaxFrameSize;
        if (address.Scheme.Equals("amqps", StringComparison.OrdinalIgnoreCase))
        {
            // NOTE: accepts any remote certificate. Do not reuse outside test tooling.
            factory.SSL.RemoteCertificateValidationCallback = (a, b, c, d) => true;
        }

        return factory.CreateAsync(address).Result;
    }

    /// <summary>
    /// Registers the start of one operation.
    /// </summary>
    /// <returns>
    /// True if the operation is within the configured count, or the count is 0
    /// (unlimited); otherwise false.
    /// </returns>
    protected bool OnStart()
    {
        return Interlocked.Increment(ref this.started) <= this.count || this.count == 0;
    }

    /// <summary>
    /// Registers the completion of one operation, traces throughput at each
    /// progress boundary and signals completion once the configured count is
    /// reached.
    /// </summary>
    /// <returns>
    /// False when the configured count has been reached; otherwise the result
    /// of <see cref="OnStart"/>, i.e. whether another operation may be started.
    /// </returns>
    protected bool OnComplete()
    {
        long done = Interlocked.Increment(ref this.completed);
        if (this.progress > 0 && done % this.progress == 0)
        {
            long throughput;
            if (this.stopwatch == null)
            {
                // First progress boundary: start timing, nothing to report yet.
                this.stopwatch = System.Diagnostics.Stopwatch.StartNew();
                throughput = -1;
            }
            else
            {
                long ms = this.stopwatch.ElapsedMilliseconds;
                throughput = ms > 0 ? done * 1000L / ms : -1;
            }

            Trace.WriteLine(TraceLevel.Information, "completed {0} throughput {1} msg/s", done, throughput);
        }

        if (this.count > 0 && done >= this.count)
        {
            // The stopwatch only exists once a progress boundary has been hit.
            // With progress disabled or count below the progress interval it is
            // still null here, so guard the Stop call.
            if (this.stopwatch != null)
            {
                this.stopwatch.Stop();
            }

            this.completedEvent.Set();
            return false;
        }
        else
        {
            return this.OnStart();
        }
    }

    /// <summary>
    /// Blocks until the completion event is signaled.
    /// </summary>
    protected void Wait()
    {
        this.completedEvent.WaitOne();
    }

    /// <summary>
    /// Signals the completion event, releasing any caller blocked in <see cref="Wait"/>.
    /// </summary>
    protected void SetComplete()
    {
        this.completedEvent.Set();
    }
}
/// <summary>
/// Role that sends messages to the configured node over one or more
/// connections, keeping up to Args.Queue sends outstanding per connection.
/// </summary>
class Sender : Role
{
    // Cached delegate so that a new one is not allocated for every send.
    static OutcomeCallback onOutcome = OnSendComplete;
    int bodySize;

    /// <summary>
    /// Initializes the sender from the parsed arguments.
    /// </summary>
    /// <param name="args">The parsed perf-test arguments.</param>
    public Sender(PerfArguments args)
        : base(args)
    {
        this.bodySize = args.BodySize;
    }

    /// <summary>
    /// Starts one task per configured connection and blocks until all of them finish.
    /// </summary>
    public override void Run()
    {
        Task[] tasks = new Task[this.Args.Connections];
        for (int i = 0; i < this.Args.Connections; i++)
        {
            // Copy the loop variable: a 'for' variable is shared by all
            // iterations, so a lambda capturing 'i' directly could observe a
            // later value by the time the task runs.
            int id = i;
            tasks[i] = Task.Run(() => this.RunOnce(id));
        }

        Task.WhenAll(tasks).Wait();
    }

    /// <summary>
    /// Outcome callback for a sent message. Returns the pooled buffer (when
    /// pooling is enabled), records the completion and sends the next message
    /// if more are allowed.
    /// </summary>
    /// <param name="link">The link the message was sent on (unused).</param>
    /// <param name="message">The message whose outcome was received.</param>
    /// <param name="outcome">The delivery outcome (unused).</param>
    /// <param name="state">A tuple of the owning Sender and its SenderLink.</param>
    static void OnSendComplete(ILink link, Message message, Outcome outcome, object state)
    {
        var tuple = (Tuple<Sender, SenderLink>)state;
        Sender thisPtr = tuple.Item1;
        SenderLink sender = tuple.Item2;
        if (thisPtr.bufferManager != null)
        {
            // Hand the body buffer back to the pool it was taken from in CreateMessage.
            var buffer = message.GetBody<ByteBuffer>();
            buffer.Reset();
            thisPtr.bufferManager.ReturnBuffer(new ArraySegment<byte>(buffer.Buffer, buffer.Offset, buffer.Capacity));
        }

        if (thisPtr.OnComplete())
        {
            Message msg = thisPtr.CreateMessage();
            sender.Send(msg, onOutcome, state);
        }
    }

    /// <summary>
    /// Opens a connection, session and sender link, primes the link with up to
    /// Args.Queue messages, waits for completion and then closes everything.
    /// </summary>
    /// <param name="id">Index of this connection; appended to the link name.</param>
    void RunOnce(int id)
    {
        Connection connection = this.CreateConnection(new Address(this.Args.Address));

        // Unblock Wait() if the connection goes away before the count is reached.
        connection.Closed += (o, e) => this.SetComplete();
        Session session = new Session(connection);
        Attach attach = new Attach()
        {
            Source = new Source(),
            Target = new Target() { Address = this.Args.Node },
            SndSettleMode = this.Args.SenderMode,
            RcvSettleMode = this.Args.ReceiverMode
        };
        SenderLink sender = new SenderLink(session, "perf-test-sender" + id, attach, null);

        // Initial batch; each completion then triggers the next send in OnSendComplete.
        for (int i = 1; i <= this.Args.Queue; i++)
        {
            if (this.OnStart())
            {
                var message = this.CreateMessage();
                sender.Send(message, onOutcome, Tuple.Create(this, sender));
            }
        }

        this.Wait();
        sender.Close();
        session.Close();
        connection.Close();
    }

    /// <summary>
    /// Builds a message with a Data body of bodySize bytes, taken from the
    /// buffer manager when pooling is enabled and freshly allocated otherwise.
    /// </summary>
    /// <returns>The message to send.</returns>
    Message CreateMessage()
    {
        ArraySegment<byte> segment = this.bufferManager != null ?
            this.bufferManager.TakeBuffer(this.bodySize) :
            new ArraySegment<byte>(new byte[this.bodySize]);

        // Fill the body with a byte pattern offset by the current millisecond.
        int seed = DateTime.UtcNow.Millisecond;
        for (int i = 0; i < this.bodySize; i++)
        {
            segment.Array[segment.Offset + i] = (byte)((i + seed) % 256);
        }

        Message message = new Message();
        message.Properties = new Properties() { MessageId = "msg" };
        message.BodySection = new Data()
        {
            Buffer = new ByteBuffer(segment.Array, segment.Offset, this.bodySize, segment.Count)
        };
        return message;
    }
}
/// <summary>
/// Role that receives messages from the configured node over one or more
/// connections, accepting each message as it arrives.
/// </summary>
class Receiver : Role
{
    /// <summary>
    /// Initializes the receiver from the parsed arguments.
    /// </summary>
    /// <param name="args">The parsed perf-test arguments.</param>
    public Receiver(PerfArguments args)
        : base(args)
    {
    }

    /// <summary>
    /// Starts one task per configured connection and blocks until all of them finish.
    /// </summary>
    public override void Run()
    {
        Task[] tasks = new Task[this.Args.Connections];
        for (int i = 0; i < this.Args.Connections; i++)
        {
            // Copy the loop variable: a 'for' variable is shared by all
            // iterations, so a lambda capturing 'i' directly could observe a
            // later value by the time the task runs.
            int id = i;
            tasks[i] = Task.Run(() => this.RunOnce(id));
        }

        Task.WhenAll(tasks).Wait();
    }

    /// <summary>
    /// Opens a connection, session and receiver link, accepts incoming
    /// messages until completion is signaled and then closes everything.
    /// </summary>
    /// <param name="id">Index of this connection; appended to the link name.</param>
    void RunOnce(int id)
    {
        Connection connection = this.CreateConnection(new Address(this.Args.Address));

        // Unblock Wait() if the connection goes away before the count is reached.
        connection.Closed += (o, e) => this.SetComplete();
        Session session = new Session(connection);
        Attach attach = new Attach()
        {
            Source = new Source() { Address = this.Args.Node },
            Target = new Target(),
            SndSettleMode = this.Args.SenderMode,
            RcvSettleMode = this.Args.ReceiverMode
        };
        ReceiverLink receiver = new ReceiverLink(session, "perf-test-receiver" + id, attach, null);
        receiver.Start(
            this.Args.Queue,
            (r, m) =>
            {
                r.Accept(m);
                m.Dispose();
                this.OnComplete();
            });
        this.Wait();
        receiver.Close();
        session.Close();
        connection.Close();
    }
}
/// <summary>
/// Role that sends request messages to the configured node and counts one
/// completion for every message received on its reply link.
/// </summary>
class Requestor : Role
{
    // Shared request body, allocated only when buffer pooling is enabled.
    byte[] buffer;

    /// <summary>
    /// Initializes the requestor from the parsed arguments.
    /// </summary>
    /// <param name="args">The parsed perf-test arguments.</param>
    public Requestor(PerfArguments args)
        : base(args)
    {
        if (args.BufferPooling)
        {
            this.buffer = new byte[args.BodySize]; // a simulation of buffer pooling
        }
    }

    /// <summary>
    /// Starts one task per configured connection and blocks until all of them finish.
    /// </summary>
    public override void Run()
    {
        Task[] tasks = new Task[this.Args.Connections];
        for (int i = 0; i < this.Args.Connections; i++)
        {
            // Copy the loop variable: a 'for' variable is shared by all
            // iterations, so a lambda capturing 'i' directly could observe a
            // later value by the time the task runs.
            int id = i;
            tasks[i] = Task.Run(() => this.RunOnce(id));
        }

        Task.WhenAll(tasks).Wait();
    }

    /// <summary>
    /// Sends one request with a new correlation id and the given reply-to address.
    /// </summary>
    /// <param name="sender">The link to send the request on.</param>
    /// <param name="replyTo">Value for the message's ReplyTo property.</param>
    void SendRequest(SenderLink sender, string replyTo)
    {
        Message message = new Message();
        message.Properties = new Properties() { ReplyTo = replyTo };
        message.Properties.SetCorrelationId(Guid.NewGuid());
        message.BodySection = new Data() { Binary = this.GetBuffer() };

        // No outcome callback: completion is counted when the reply arrives.
        sender.Send(message, null, null);
    }

    /// <summary>
    /// Returns the request body: the shared buffer when pooling is enabled,
    /// otherwise a new array of Args.BodySize bytes.
    /// </summary>
    byte[] GetBuffer()
    {
        return this.Args.BufferPooling ? this.buffer : new byte[this.Args.BodySize];
    }

    /// <summary>
    /// Opens a connection with a request link and a reply link, primes it with
    /// up to Args.Queue requests, sends a new request for each reply while
    /// more are allowed, and closes the connection once completion is signaled.
    /// </summary>
    /// <param name="id">Index of this connection (not used in the link names).</param>
    void RunOnce(int id)
    {
        Connection connection = this.CreateConnection(new Address(this.Args.Address));

        // Unblock Wait() if the connection goes away before the count is reached.
        connection.Closed += (o, e) => this.SetComplete();
        Session session = new Session(connection);

        // Random per-connection id; used for the link names and as the reply-to address.
        string clientId = "request-" + Guid.NewGuid().ToString().Substring(0, 6);
        Attach sendAttach = new Attach()
        {
            Source = new Source(),
            Target = new Target() { Address = this.Args.Node },
            SndSettleMode = SenderSettleMode.Settled
        };
        Attach recvAttach = new Attach()
        {
            Source = new Source() { Address = this.Args.Node },
            Target = new Target() { Address = clientId },
            SndSettleMode = SenderSettleMode.Settled
        };
        SenderLink sender = new SenderLink(session, "s-" + clientId, sendAttach, null);
        ReceiverLink receiver = new ReceiverLink(session, "r-" + clientId, recvAttach, null);
        receiver.Start(
            50000,
            (r, m) =>
            {
                r.Accept(m);
                m.Dispose();
                if (this.OnComplete())
                {
                    this.SendRequest(sender, clientId);
                }
            });

        for (int i = 1; i <= this.Args.Queue; i++)
        {
            if (this.OnStart())
            {
                this.SendRequest(sender, clientId);
            }
        }

        this.Wait();
        connection.Close();
    }
}
/// <summary>
/// Role that hosts a container on the configured address and answers every
/// request received on the configured node with a fixed response.
/// </summary>
class ReplyListener : Role, IRequestProcessor
{
    /// <summary>
    /// Initializes the reply listener from the parsed arguments.
    /// </summary>
    /// <param name="args">The parsed perf-test arguments.</param>
    public ReplyListener(PerfArguments args)
        : base(args)
    {
    }

    /// <summary>
    /// Link credit reported to the host; taken from the queue argument.
    /// </summary>
    int IRequestProcessor.Credit
    {
        get
        {
            return this.Args.Queue;
        }
    }

    /// <summary>
    /// Opens the container host, registers this instance as the request
    /// processor for the node, waits for completion and closes the host.
    /// </summary>
    public override void Run()
    {
        var endpoint = new Address(this.Args.Address);
        var serverCertificate = TestExtensions.GetCertificate(endpoint.Scheme, endpoint.Host, this.Args.CertValue);
        var container = new ContainerHost(new Address[] { endpoint }, serverCertificate);

        // Apply the buffer manager and frame size to every listener of the host.
        foreach (var connectionListener in container.Listeners)
        {
            connectionListener.BufferManager = this.bufferManager;
            connectionListener.AMQP.MaxFrameSize = this.Args.MaxFrameSize;
        }

        container.Open();
        Console.WriteLine("Container host is listening on {0}:{1}", endpoint.Host, endpoint.Port);

        container.RegisterRequestProcessor(this.Args.Node, this);
        Console.WriteLine("Message processor is registered on {0}", this.Args.Node);

        this.Wait();
        container.Close();
    }

    /// <summary>
    /// Completes the request with a "request processed" response carrying a
    /// status-code of 200, then records the completion.
    /// </summary>
    /// <param name="requestContext">The context of the incoming request.</param>
    void IRequestProcessor.Process(RequestContext requestContext)
    {
        var reply = new Message("request processed");
        var replyProperties = new ApplicationProperties();
        reply.ApplicationProperties = replyProperties;
        replyProperties["status-code"] = 200;

        requestContext.Complete(reply);
        this.OnComplete();
    }
}
2015-04-10 16:05:55 +03:00
/// <summary>
/// Role that hosts a container on the configured address and completes every
/// message received on the configured node.
/// </summary>
class Listener : Role, IMessageProcessor
{
    // Link credit reported to the host; captured from the queue argument.
    int credit;

    /// <summary>
    /// Initializes the listener from the parsed arguments.
    /// </summary>
    /// <param name="args">The parsed perf-test arguments.</param>
    public Listener(PerfArguments args)
        : base(args)
    {
        this.credit = args.Queue;
    }

    /// <summary>
    /// Gets the link credit captured at construction.
    /// </summary>
    int IMessageProcessor.Credit
    {
        get
        {
            return this.credit;
        }
    }

    /// <summary>
    /// Opens the container host, registers this instance as the message
    /// processor for the node, waits for completion and closes the host.
    /// </summary>
    public override void Run()
    {
        var endpoint = new Address(this.Args.Address);
        var serverCertificate = TestExtensions.GetCertificate(endpoint.Scheme, endpoint.Host, this.Args.CertValue);
        var container = new ContainerHost(new Address[] { endpoint }, serverCertificate);

        // Apply the buffer manager and frame size to every listener of the host.
        foreach (var connectionListener in container.Listeners)
        {
            connectionListener.BufferManager = this.bufferManager;
            connectionListener.AMQP.MaxFrameSize = this.Args.MaxFrameSize;
        }

        container.Open();
        Console.WriteLine("Container host is listening on {0}:{1}", endpoint.Host, endpoint.Port);

        container.RegisterMessageProcessor(this.Args.Node, this);
        Console.WriteLine("Message processor is registered on {0}", this.Args.Node);

        this.Wait();
        container.Close();
    }

    /// <summary>
    /// Completes the incoming message and records the completion.
    /// </summary>
    /// <param name="messageContext">The context of the incoming message.</param>
    void IMessageProcessor.Process(MessageContext messageContext)
    {
        messageContext.Complete();
        this.OnComplete();
    }
}
/// <summary>
/// Command-line arguments of the perf test. The first argument is the
/// operation name; the rest are the named options declared below.
/// </summary>
class PerfArguments : Arguments
{
    /// <summary>
    /// Parses the command line.
    /// </summary>
    /// <param name="args">The raw command-line arguments.</param>
    public PerfArguments(string[] args)
        : base(args, 1) // presumably 1 is the index where named options start - confirm against Arguments
    {
        // Guard the index so an empty command line yields a null operation
        // instead of an IndexOutOfRangeException.
        this.Operation = (args != null && args.Length > 0) ? args[0] : null;
    }

    /// <summary>
    /// Gets the operation name (first command-line argument), or null if none was given.
    /// </summary>
    public string Operation
    {
        get;
        private set;
    }

    [Argument(Name = "address", Shortcut = "a", Description = "address of the remote peer or the local listener", Default = "amqp://guest:guest@127.0.0.1:5672")]
    public string Address
    {
        get;
        protected set;
    }

    [Argument(Name = "cert", Shortcut = "f", Description = "certificate for SSL authentication. Default to address.host")]
    public string CertValue
    {
        get;
        protected set;
    }

    [Argument(Name = "node", Shortcut = "n", Description = "name of the AMQP node", Default = "q1")]
    public string Node
    {
        get;
        protected set;
    }

    [Argument(Name = "count", Shortcut = "c", Description = "total number of messages to send or receive (0: infinite)", Default = 100000)]
    public long Count
    {
        get;
        protected set;
    }

    [Argument(Name = "connection", Shortcut = "i", Description = "number of connection to create", Default = 1)]
    public int Connections
    {
        get;
        protected set;
    }

    [Argument(Name = "body-size", Shortcut = "b", Description = "message body size (bytes)", Default = 64)]
    public int BodySize
    {
        get;
        protected set;
    }

    [Argument(Name = "max-frame-size", Shortcut = "m", Description = "connection max frame size (bytes)", Default = 256*1024)]
    public int MaxFrameSize
    {
        get;
        protected set;
    }

    [Argument(Name = "queue", Shortcut = "q", Description = "outgoing queue depth (link credit)", Default = 1000)]
    public int Queue
    {
        get;
        protected set;
    }

    [Argument(Name = "progress", Shortcut = "p", Description = "report progress for every this number of messages", Default = 1000)]
    public int Progress
    {
        get;
        protected set;
    }

    [Argument(Name = "buffer-pool", Shortcut = "u", Description = "enable buffer pooling", Default = false)]
    public bool BufferPooling
    {
        get;
        protected set;
    }

    // Raw ack-mode string; exposed to callers through SenderMode and ReceiverMode.
    [Argument(Name = "ack", Shortcut = "k", Description = "ack mode: amo|alo|eo", Default = "amo")]
    protected string Ack
    {
        get;
        set;
    }

    // Raw trace-level string; exposed to callers through TraceLevel.
    [Argument(Name = "trace", Shortcut = "t", Description = "trace level: err|warn|info|verbose|frm", Default = "info")]
    protected string Trace
    {
        get;
        set;
    }

    /// <summary>
    /// Gets the sender settle mode converted from the ack argument.
    /// </summary>
    public SenderSettleMode SenderMode
    {
        get { return this.Ack.ToSenderSettleMode(); }
    }

    /// <summary>
    /// Gets the receiver settle mode converted from the ack argument.
    /// </summary>
    public ReceiverSettleMode ReceiverMode
    {
        get { return this.Ack.ToReceiverSettleMode(); }
    }

    /// <summary>
    /// Gets the trace level converted from the trace argument.
    /// </summary>
    public TraceLevel TraceLevel
    {
        get { return this.Trace.ToTraceLevel(); }
    }
}
}
}