2015-04-10 16:05:55 +03:00
// ------------------------------------------------------------------------------------
// Copyright (c) Microsoft Corporation
// All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the ""License""); you may not use this
// file except in compliance with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// THIS CODE IS PROVIDED *AS IS* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION ANY IMPLIED WARRANTIES OR
// CONDITIONS OF TITLE, FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABLITY OR
// NON-INFRINGEMENT.
//
// See the Apache Version 2.0 License for specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------------------
namespace PerfTest
{
using System ;
2015-06-21 20:52:28 +03:00
using System.Security.Cryptography.X509Certificates ;
2015-04-10 16:05:55 +03:00
using System.Threading ;
2015-10-22 02:54:25 +03:00
using System.Threading.Tasks ;
2015-04-10 16:05:55 +03:00
using Amqp ;
using Amqp.Framing ;
using Amqp.Listener ;
2015-10-22 02:54:25 +03:00
using Amqp.Types ;
2016-10-24 04:13:58 +03:00
using Test.Common ;
using TestExtensions = Test . Common . Extensions ;
2015-04-10 16:05:55 +03:00
class Program
{
static void Main ( string [ ] args )
{
try
{
PerfArguments perfArgs = new PerfArguments ( args ) ;
2017-04-26 03:56:57 +03:00
if ( args . Length = = 0 | | perfArgs . HasHelp )
2015-04-10 16:05:55 +03:00
{
Usage ( ) ;
return ;
}
if ( perfArgs . TraceLevel ! = 0 )
{
Trace . TraceLevel = perfArgs . TraceLevel ;
2017-03-09 02:14:58 +03:00
Trace . TraceListener = ( l , f , o ) = > Console . WriteLine ( DateTime . Now . ToString ( "[hh:mm:ss.fff]" ) + " " + string . Format ( f , o ) ) ;
2015-04-10 16:05:55 +03:00
}
Role role ;
if ( string . Equals ( "send" , perfArgs . Operation , StringComparison . OrdinalIgnoreCase ) )
{
role = new Sender ( perfArgs ) ;
}
else if ( string . Equals ( "receive" , perfArgs . Operation , StringComparison . OrdinalIgnoreCase ) )
{
role = new Receiver ( perfArgs ) ;
}
2016-12-14 06:14:33 +03:00
else if ( string . Equals ( "request" , perfArgs . Operation , StringComparison . OrdinalIgnoreCase ) )
{
role = new Requestor ( perfArgs ) ;
}
else if ( string . Equals ( "reply" , perfArgs . Operation , StringComparison . OrdinalIgnoreCase ) )
{
role = new ReplyListener ( perfArgs ) ;
}
2015-04-10 16:05:55 +03:00
else if ( string . Equals ( "listen" , perfArgs . Operation , StringComparison . OrdinalIgnoreCase ) )
{
role = new Listener ( perfArgs ) ;
}
else
{
throw new ArgumentException ( perfArgs . Operation ) ;
}
Console . WriteLine ( "Running perf test..." ) ;
role . Run ( ) ;
}
catch ( Exception e )
{
Console . WriteLine ( e . ToString ( ) ) ;
}
}
static void Usage ( )
{
Console . WriteLine ( System . Diagnostics . Process . GetCurrentProcess ( ) . ProcessName + ".exe send|receive|listen [arguments]" ) ;
Console . WriteLine ( " send \tsend messages to remote peer" ) ;
Console . WriteLine ( " receive\treceive messages from remote peer" ) ;
2016-12-14 06:14:33 +03:00
Console . WriteLine ( " request\tsend requests to a remote peer" ) ;
Console . WriteLine ( " reply \tstart a request processor and send replies" ) ;
2015-04-10 16:05:55 +03:00
Console . WriteLine ( " listen \tstart a listener and accept messages from remote peer" ) ;
Console . WriteLine ( "\r\narguments:" ) ;
2017-04-26 03:56:57 +03:00
Arguments . PrintArguments ( typeof ( PerfArguments ) ) ;
2015-04-10 16:05:55 +03:00
}
abstract class Role
{
2015-10-22 02:54:25 +03:00
protected IBufferManager bufferManager ;
2015-04-10 16:05:55 +03:00
PerfArguments perfArgs ;
2016-12-14 06:14:33 +03:00
long count ;
long started ;
long completed ;
long progress ;
2015-04-10 16:05:55 +03:00
ManualResetEvent completedEvent ;
System . Diagnostics . Stopwatch stopwatch ;
public Role ( PerfArguments perfArgs )
{
2015-10-22 02:54:25 +03:00
this . bufferManager = perfArgs . BufferPooling ? new BufferManager ( 256 , 2 * 1024 * 1024 , 100 * 1024 * 1024 ) : null ;
2015-04-10 16:05:55 +03:00
this . perfArgs = perfArgs ;
this . count = perfArgs . Count ;
this . progress = perfArgs . Progress ;
this . completedEvent = new ManualResetEvent ( false ) ;
}
protected PerfArguments Args
{
get { return this . perfArgs ; }
}
public abstract void Run ( ) ;
2015-06-21 20:52:28 +03:00
protected Connection CreateConnection ( Address address )
{
var factory = new ConnectionFactory ( ) ;
2015-10-22 02:54:25 +03:00
factory . BufferManager = this . bufferManager ;
2015-09-14 08:13:20 +03:00
factory . AMQP . MaxFrameSize = this . perfArgs . MaxFrameSize ;
2015-06-21 20:52:28 +03:00
if ( address . Scheme . Equals ( "amqps" , StringComparison . OrdinalIgnoreCase ) )
{
factory . SSL . RemoteCertificateValidationCallback = ( a , b , c , d ) = > true ;
}
return factory . CreateAsync ( address ) . Result ;
}
2015-04-10 16:05:55 +03:00
protected bool OnStart ( )
{
return Interlocked . Increment ( ref this . started ) < = this . count | | this . count = = 0 ;
}
protected bool OnComplete ( )
{
2016-12-14 06:14:33 +03:00
long done = Interlocked . Increment ( ref this . completed ) ;
2015-04-10 16:05:55 +03:00
if ( this . progress > 0 & & done % this . progress = = 0 )
{
long throughput ;
if ( this . stopwatch = = null )
{
this . stopwatch = System . Diagnostics . Stopwatch . StartNew ( ) ;
throughput = - 1 ;
}
else
{
long ms = this . stopwatch . ElapsedMilliseconds ;
throughput = ms > 0 ? done * 1000L / ms : - 1 ;
}
Trace . WriteLine ( TraceLevel . Information , "completed {0} throughput {1} msg/s" , done , throughput ) ;
}
if ( this . count > 0 & & done > = this . count )
{
this . stopwatch . Stop ( ) ;
this . completedEvent . Set ( ) ;
return false ;
}
else
{
return this . OnStart ( ) ;
}
}
protected void Wait ( )
{
this . completedEvent . WaitOne ( ) ;
}
2016-01-22 22:16:39 +03:00
protected void SetComplete ( )
{
this . completedEvent . Set ( ) ;
}
2015-04-10 16:05:55 +03:00
}
class Sender : Role
{
static OutcomeCallback onOutcome = OnSendComplete ;
int bodySize ;
public Sender ( PerfArguments args )
: base ( args )
{
this . bodySize = args . BodySize ;
}
public override void Run ( )
2015-10-22 02:54:25 +03:00
{
Task [ ] tasks = new Task [ this . Args . Connections ] ;
for ( int i = 0 ; i < this . Args . Connections ; i + + )
{
tasks [ i ] = Task . Run ( ( ) = > this . RunOnce ( i ) ) ;
}
Task . WhenAll ( tasks ) . Wait ( ) ;
}
2017-06-16 02:33:02 +03:00
static void OnSendComplete ( ILink link , Message message , Outcome outcome , object state )
2015-10-22 02:54:25 +03:00
{
var tuple = ( Tuple < Sender , SenderLink > ) state ;
Sender thisPtr = tuple . Item1 ;
SenderLink sender = tuple . Item2 ;
if ( thisPtr . bufferManager ! = null )
{
var buffer = message . GetBody < ByteBuffer > ( ) ;
buffer . Reset ( ) ;
thisPtr . bufferManager . ReturnBuffer ( new ArraySegment < byte > ( buffer . Buffer , buffer . Offset , buffer . Capacity ) ) ;
}
if ( thisPtr . OnComplete ( ) )
{
Message msg = thisPtr . CreateMessage ( ) ;
sender . Send ( msg , onOutcome , state ) ;
}
}
void RunOnce ( int id )
2015-04-10 16:05:55 +03:00
{
2015-06-21 20:52:28 +03:00
Connection connection = this . CreateConnection ( new Address ( this . Args . Address ) ) ;
2016-01-22 22:16:39 +03:00
connection . Closed + = ( o , e ) = > this . SetComplete ( ) ;
2015-04-10 16:05:55 +03:00
Session session = new Session ( connection ) ;
Attach attach = new Attach ( )
{
Source = new Source ( ) ,
Target = new Target ( ) { Address = this . Args . Node } ,
SndSettleMode = this . Args . SenderMode ,
RcvSettleMode = this . Args . ReceiverMode
} ;
2015-10-22 02:54:25 +03:00
SenderLink sender = new SenderLink ( session , "perf-test-sender" + id , attach , null ) ;
2015-04-10 16:05:55 +03:00
for ( int i = 1 ; i < = this . Args . Queue ; i + + )
{
if ( this . OnStart ( ) )
{
2015-10-22 02:54:25 +03:00
var message = this . CreateMessage ( ) ;
sender . Send ( message , onOutcome , Tuple . Create ( this , sender ) ) ;
2015-04-10 16:05:55 +03:00
}
}
this . Wait ( ) ;
sender . Close ( ) ;
session . Close ( ) ;
connection . Close ( ) ;
}
2015-10-22 02:54:25 +03:00
Message CreateMessage ( )
2015-04-10 16:05:55 +03:00
{
2015-10-22 02:54:25 +03:00
ArraySegment < byte > segment = this . bufferManager ! = null ?
this . bufferManager . TakeBuffer ( this . bodySize ) :
new ArraySegment < byte > ( new byte [ this . bodySize ] ) ;
int seed = DateTime . UtcNow . Millisecond ;
for ( int i = 0 ; i < this . bodySize ; i + + )
2015-04-10 16:05:55 +03:00
{
2015-10-22 02:54:25 +03:00
segment . Array [ segment . Offset + i ] = ( byte ) ( ( i + seed ) % 256 ) ;
2015-04-10 16:05:55 +03:00
}
2015-10-22 02:54:25 +03:00
Message message = new Message ( ) ;
message . Properties = new Properties ( ) { MessageId = "msg" } ;
message . BodySection = new Data ( )
{
Buffer = new ByteBuffer ( segment . Array , segment . Offset , this . bodySize , segment . Count )
} ;
return message ;
2015-04-10 16:05:55 +03:00
}
}
class Receiver : Role
{
public Receiver ( PerfArguments args )
: base ( args )
{
}
public override void Run ( )
2015-10-22 02:54:25 +03:00
{
Task [ ] tasks = new Task [ this . Args . Connections ] ;
for ( int i = 0 ; i < this . Args . Connections ; i + + )
{
tasks [ i ] = Task . Run ( ( ) = > this . RunOnce ( i ) ) ;
}
Task . WhenAll ( tasks ) . Wait ( ) ;
}
void RunOnce ( int id )
2015-04-10 16:05:55 +03:00
{
2015-06-21 20:52:28 +03:00
Connection connection = this . CreateConnection ( new Address ( this . Args . Address ) ) ;
2016-01-22 22:16:39 +03:00
connection . Closed + = ( o , e ) = > this . SetComplete ( ) ;
2015-04-10 16:05:55 +03:00
Session session = new Session ( connection ) ;
Attach attach = new Attach ( )
{
Source = new Source ( ) { Address = this . Args . Node } ,
Target = new Target ( ) ,
SndSettleMode = this . Args . SenderMode ,
RcvSettleMode = this . Args . ReceiverMode
} ;
2015-10-22 02:54:25 +03:00
ReceiverLink receiver = new ReceiverLink ( session , "perf-test-receiver" + id , attach , null ) ;
2015-04-10 16:05:55 +03:00
receiver . Start (
this . Args . Queue ,
( r , m ) = >
{
r . Accept ( m ) ;
2015-10-22 02:54:25 +03:00
m . Dispose ( ) ;
2015-04-10 16:05:55 +03:00
this . OnComplete ( ) ;
} ) ;
this . Wait ( ) ;
receiver . Close ( ) ;
session . Close ( ) ;
connection . Close ( ) ;
}
}
2016-12-14 06:14:33 +03:00
class Requestor : Role
{
byte [ ] buffer ;
public Requestor ( PerfArguments args )
: base ( args )
{
if ( args . BufferPooling )
{
this . buffer = new byte [ args . BodySize ] ; // a simulation of buffer pooling
}
}
public override void Run ( )
{
Task [ ] tasks = new Task [ this . Args . Connections ] ;
for ( int i = 0 ; i < this . Args . Connections ; i + + )
{
tasks [ i ] = Task . Run ( ( ) = > this . RunOnce ( i ) ) ;
}
Task . WhenAll ( tasks ) . Wait ( ) ;
}
void SendRequest ( SenderLink sender , string replyTo )
{
Message message = new Message ( ) ;
message . Properties = new Properties ( ) { ReplyTo = replyTo } ;
message . Properties . SetCorrelationId ( Guid . NewGuid ( ) ) ;
message . BodySection = new Data ( ) { Binary = this . GetBuffer ( ) } ;
sender . Send ( message , null , null ) ;
}
byte [ ] GetBuffer ( )
{
return this . Args . BufferPooling ? this . buffer : new byte [ this . Args . BodySize ] ;
}
void RunOnce ( int id )
{
Connection connection = this . CreateConnection ( new Address ( this . Args . Address ) ) ;
connection . Closed + = ( o , e ) = > this . SetComplete ( ) ;
Session session = new Session ( connection ) ;
string clientId = "request-" + Guid . NewGuid ( ) . ToString ( ) . Substring ( 0 , 6 ) ;
Attach sendAttach = new Attach ( )
{
Source = new Source ( ) ,
Target = new Target ( ) { Address = this . Args . Node } ,
SndSettleMode = SenderSettleMode . Settled
} ;
Attach recvAttach = new Attach ( )
{
Source = new Source ( ) { Address = this . Args . Node } ,
Target = new Target ( ) { Address = clientId } ,
SndSettleMode = SenderSettleMode . Settled
} ;
SenderLink sender = new SenderLink ( session , "s-" + clientId , sendAttach , null ) ;
ReceiverLink receiver = new ReceiverLink ( session , "r-" + clientId , recvAttach , null ) ;
receiver . Start (
50000 ,
( r , m ) = >
{
r . Accept ( m ) ;
m . Dispose ( ) ;
if ( this . OnComplete ( ) )
{
this . SendRequest ( sender , clientId ) ;
}
} ) ;
for ( int i = 1 ; i < = this . Args . Queue ; i + + )
{
if ( this . OnStart ( ) )
{
this . SendRequest ( sender , clientId ) ;
}
}
this . Wait ( ) ;
connection . Close ( ) ;
}
}
class ReplyListener : Role , IRequestProcessor
{
public ReplyListener ( PerfArguments args )
: base ( args )
{
}
public override void Run ( )
{
Uri addressUri = new Uri ( this . Args . Address ) ;
X509Certificate2 certificate = TestExtensions . GetCertificate ( addressUri . Scheme , addressUri . Host , this . Args . CertValue ) ;
ContainerHost host = new ContainerHost ( new Uri [ ] { addressUri } , certificate , addressUri . UserInfo ) ;
foreach ( var listener in host . Listeners )
{
listener . BufferManager = this . bufferManager ;
listener . AMQP . MaxFrameSize = this . Args . MaxFrameSize ;
}
host . Open ( ) ;
Console . WriteLine ( "Container host is listening on {0}:{1}" , addressUri . Host , addressUri . Port ) ;
host . RegisterRequestProcessor ( this . Args . Node , this ) ;
Console . WriteLine ( "Message processor is registered on {0}" , this . Args . Node ) ;
this . Wait ( ) ;
host . Close ( ) ;
}
int IRequestProcessor . Credit { get { return this . Args . Queue ; } }
void IRequestProcessor . Process ( RequestContext requestContext )
{
Message response = new Message ( "request processed" ) ;
response . ApplicationProperties = new ApplicationProperties ( ) ;
response . ApplicationProperties [ "status-code" ] = 200 ;
requestContext . Complete ( response ) ;
this . OnComplete ( ) ;
}
}
2015-04-10 16:05:55 +03:00
class Listener : Role , IMessageProcessor
{
int credit ;
public Listener ( PerfArguments args )
: base ( args )
{
this . credit = args . Queue ;
}
public override void Run ( )
{
Uri addressUri = new Uri ( this . Args . Address ) ;
2016-10-24 04:13:58 +03:00
X509Certificate2 certificate = TestExtensions . GetCertificate ( addressUri . Scheme , addressUri . Host , this . Args . CertValue ) ;
2015-06-21 20:52:28 +03:00
ContainerHost host = new ContainerHost ( new Uri [ ] { addressUri } , certificate , addressUri . UserInfo ) ;
2015-09-14 08:13:20 +03:00
foreach ( var listener in host . Listeners )
{
2015-10-22 02:54:25 +03:00
listener . BufferManager = this . bufferManager ;
2015-09-14 08:13:20 +03:00
listener . AMQP . MaxFrameSize = this . Args . MaxFrameSize ;
}
2015-04-10 16:05:55 +03:00
host . Open ( ) ;
Console . WriteLine ( "Container host is listening on {0}:{1}" , addressUri . Host , addressUri . Port ) ;
host . RegisterMessageProcessor ( this . Args . Node , this ) ;
Console . WriteLine ( "Message processor is registered on {0}" , this . Args . Node ) ;
this . Wait ( ) ;
host . Close ( ) ;
}
int IMessageProcessor . Credit
{
get { return this . credit ; }
}
void IMessageProcessor . Process ( MessageContext messageContext )
{
messageContext . Complete ( ) ;
this . OnComplete ( ) ;
}
}
class PerfArguments : Arguments
{
public PerfArguments ( string [ ] args )
: base ( args , 1 )
{
this . Operation = args [ 0 ] ;
}
public string Operation
{
get ;
private set ;
}
[Argument(Name = "address", Shortcut = "a", Description = "address of the remote peer or the local listener", Default = "amqp://guest:guest@127.0.0.1:5672")]
public string Address
{
get ;
protected set ;
}
2015-06-21 20:52:28 +03:00
[Argument(Name = "cert", Shortcut = "f", Description = "certificate for SSL authentication. Default to address.host")]
public string CertValue
{
get ;
protected set ;
}
2015-04-10 16:05:55 +03:00
[Argument(Name = "node", Shortcut = "n", Description = "name of the AMQP node", Default = "q1")]
public string Node
{
get ;
protected set ;
}
2015-06-21 20:52:28 +03:00
[Argument(Name = "count", Shortcut = "c", Description = "total number of messages to send or receive (0: infinite)", Default = 100000)]
2016-12-14 06:14:33 +03:00
public long Count
2015-04-10 16:05:55 +03:00
{
get ;
protected set ;
}
2015-10-22 02:54:25 +03:00
[Argument(Name = "connection", Shortcut = "i", Description = "number of connection to create", Default = 1)]
public int Connections
{
get ;
protected set ;
}
2015-04-10 16:05:55 +03:00
[Argument(Name = "body-size", Shortcut = "b", Description = "message body size (bytes)", Default = 64)]
public int BodySize
{
get ;
protected set ;
}
2015-09-14 08:13:20 +03:00
[Argument(Name = "max-frame-size", Shortcut = "m", Description = "connection max frame size (bytes)", Default = 256*1024)]
public int MaxFrameSize
{
get ;
protected set ;
}
2015-06-21 20:52:28 +03:00
[Argument(Name = "queue", Shortcut = "q", Description = "outgoing queue depth (link credit)", Default = 1000)]
2015-04-10 16:05:55 +03:00
public int Queue
{
get ;
protected set ;
}
2016-12-14 06:14:33 +03:00
[Argument(Name = "progress", Shortcut = "p", Description = "report progress for every this number of messages", Default = 1000)]
2015-04-10 16:05:55 +03:00
public int Progress
{
get ;
protected set ;
}
2015-10-22 02:54:25 +03:00
[Argument(Name = "buffer-pool", Shortcut = "u", Description = "enable buffer pooling", Default = false)]
public bool BufferPooling
{
get ;
protected set ;
}
2015-04-10 16:05:55 +03:00
[Argument(Name = "ack", Shortcut = "k", Description = "ack mode: amo|alo|eo", Default = "amo")]
protected string Ack
{
get ;
set ;
}
[Argument(Name = "trace", Shortcut = "t", Description = "trace level: err|warn|info|verbose|frm", Default = "info")]
protected string Trace
{
get ;
set ;
}
public SenderSettleMode SenderMode
{
get { return this . Ack . ToSenderSettleMode ( ) ; }
}
public ReceiverSettleMode ReceiverMode
{
get { return this . Ack . ToReceiverSettleMode ( ) ; }
}
public TraceLevel TraceLevel
{
2016-10-24 04:13:58 +03:00
get { return this . Trace . ToTraceLevel ( ) ; }
2015-04-10 16:05:55 +03:00
}
}
}
}