This commit is contained in:
xinchen 2019-06-20 15:40:02 -07:00
Родитель 58f705f5ff
Коммит abf0504cc9
5 изменённых файлов: 90 добавлений и 18 удалений

Просмотреть файл

@ -488,21 +488,22 @@ namespace Amqp
internal virtual void OnBegin(ushort remoteChannel, Begin begin)
{
this.ValidateChannel(remoteChannel);
lock (this.ThisLock)
{
if (remoteChannel > this.channelMax)
{
throw new AmqpException(ErrorCode.NotAllowed,
Fx.Format(SRAmqp.AmqpHandleExceeded, this.channelMax + 1));
}
Session session = this.GetSession(this.localSessions, begin.RemoteChannel);
session.OnBegin(remoteChannel, begin);
int count = this.remoteSessions.Length;
if (count - 1 < remoteChannel)
{
int size = Math.Min(count * 2, this.channelMax + 1);
int size = count * 2;
while (size - 1 < remoteChannel)
{
size *= 2;
}
Session[] expanded = new Session[size];
Array.Copy(this.remoteSessions, expanded, count);
this.remoteSessions = expanded;
@ -519,6 +520,15 @@ namespace Amqp
}
}
internal void ValidateChannel(ushort channel)
{
if (channel > this.channelMax)
{
throw new AmqpException(ErrorCode.NotAllowed,
Fx.Format(SRAmqp.AmqpHandleExceeded, this.channelMax + 1));
}
}
void OnEnd(ushort remoteChannel, End end)
{
Session session = this.GetSession(this.remoteSessions, remoteChannel);

Просмотреть файл

@ -59,6 +59,8 @@ namespace Amqp.Listener
internal override void OnBegin(ushort remoteChannel, Begin begin)
{
this.ValidateChannel(remoteChannel);
// this sends a begin to the remote peer
Begin local = new Begin()
{
@ -69,11 +71,6 @@ namespace Amqp.Listener
HandleMax = (uint)(this.listener.AMQP.MaxLinksPerSession - 1)
};
if (begin.HandleMax < local.HandleMax)
{
local.HandleMax = begin.HandleMax;
}
var session = new ListenerSession(this, local);
// this updates the local session state

Просмотреть файл

@ -33,6 +33,8 @@ namespace Amqp.Listener
internal override void OnAttach(Attach attach)
{
this.ValidateHandle(attach.Handle);
var connection = (ListenerConnection)this.Connection;
Link link = connection.Listener.Container.CreateLink(connection, this, attach);
this.AddRemoteLink(attach.Handle, link);

Просмотреть файл

@ -382,11 +382,7 @@ namespace Amqp
internal virtual void OnAttach(Attach attach)
{
if (attach.Handle > this.handleMax)
{
throw new AmqpException(ErrorCode.NotAllowed,
Fx.Format(SRAmqp.AmqpHandleExceeded, this.handleMax + 1));
}
this.ValidateHandle(attach.Handle);
Link link = null;
lock (this.ThisLock)
@ -419,7 +415,12 @@ namespace Amqp
int count = this.remoteLinks.Length;
if (count - 1 < remoteHandle)
{
int size = (int)Math.Min(count * 2 - 1, this.handleMax) + 1;
int size = count * 2;
while (size - 1 < remoteHandle)
{
size *= 2;
}
Link[] expanded = new Link[size];
Array.Copy(this.remoteLinks, expanded, count);
this.remoteLinks = expanded;
@ -447,6 +448,15 @@ namespace Amqp
};
}
internal void ValidateHandle(uint handle)
{
if (handle > this.handleMax)
{
throw new AmqpException(ErrorCode.NotAllowed,
Fx.Format(SRAmqp.AmqpHandleExceeded, this.handleMax + 1));
}
}
internal Delivery RemoveDeliveries(Link link)
{
LinkedList list = null;

Просмотреть файл

@ -99,6 +99,59 @@ namespace Test.Amqp
}).Unwrap().GetAwaiter().GetResult();
}
[TestMethod]
public void RemoteSessionChannelTest()
{
this.testListener.RegisterTarget(TestPoint.Begin, (stream, channel, fields) =>
{
// send a large channel number to test if client can grow the table correctly
TestListener.FRM(stream, 0x11UL, 0, (ushort)(channel + 100), channel, 0u, 100u, 100u, 8u);
return TestOutcome.Stop;
});
string testName = "ConnectionChannelTest";
Open open = new Open() { ContainerId = testName, HostName = "localhost", MaxFrameSize = 2048 };
Connection connection = new Connection(this.address, null, open, null);
for (int i = 0; i < 10; i++)
{
Session session = new Session(connection);
}
connection.Close();
Assert.IsTrue(connection.Error == null, "connection has error!" + connection.Error);
}
[TestMethod]
public void RemoteLinkHandleTest()
{
this.testListener.RegisterTarget(TestPoint.Begin, (stream, channel, fields) =>
{
TestListener.FRM(stream, 0x11UL, 0, channel, channel, 0u, 100u, 100u, 8000u);
return TestOutcome.Stop;
});
this.testListener.RegisterTarget(TestPoint.Attach, (stream, channel, fields) =>
{
uint handle = (uint)fields[1];
fields[1] = handle + 100u;
return TestOutcome.Continue;
});
string testName = "RemoteLinkHandleTest";
Open open = new Open() { ContainerId = testName, HostName = "localhost", MaxFrameSize = 2048 };
Connection connection = new Connection(this.address, null, open, null);
Session session = new Session(connection);
for (int i = 0; i < 10; i++)
{
SenderLink sender = new SenderLink(session, "sender-" + i, "any");
}
connection.Close();
Assert.IsTrue(connection.Error == null, "connection has error!" + connection.Error);
}
[TestMethod]
public void ConnectionRemoteIdleTimeoutTest()
{