Receiver connection errors (ex: container move) - client fixes

follow-ups:
1) fix sync blocking call - msgFactory.getConnectionAsync()
2) find if reactor.quiesced state is reliable or not
3) implement the same stuff for sendAPI
This commit is contained in:
Sreeram Garlapati 2016-02-16 00:02:01 -08:00
Родитель f7cf851cb7
Коммит 8d76f04b82
4 изменённых файлов: 150 добавлений и 71 удалений

Просмотреть файл

@ -1,5 +1,6 @@
package com.microsoft.azure.servicebus;
import java.io.IOException;
import java.time.*;
import java.time.temporal.*;
import java.util.*;
@ -113,7 +114,7 @@ public class MessageReceiver extends ClientEntity
this.dateTime = dateTime;
}
this.receiveLink = this.createReceiveLink();
this.receiveLink = this.createReceiveLink(false);
this.linkOpen = new WorkItem<MessageReceiver>(new CompletableFuture<MessageReceiver>(), this.operationTimeout);
this.scheduleLinkOpenTimeout(this.linkOpen.getTimeoutTracker());
@ -157,7 +158,7 @@ public class MessageReceiver extends ClientEntity
*/
public CompletableFuture<Collection<Message>> receive()
{
if (this.receiveLink.getRemoteState() == EndpointState.CLOSED)
if (this.receiveLink.getLocalState() == EndpointState.CLOSED)
{
this.scheduleRecreate(Duration.ofSeconds(0));
}
@ -217,6 +218,7 @@ public class MessageReceiver extends ClientEntity
{
this.lastCommunicatedAt = Instant.now();
this.linkOpen.getWork().complete(this);
this.underlyingFactory.links.add(this.receiveLink);
}
else
{
@ -366,7 +368,7 @@ public class MessageReceiver extends ClientEntity
}
}
private Receiver createReceiveLink()
private Receiver createReceiveLink(boolean isConnectionAsync)
{
Source source = new Source();
source.setAddress(receivePath);
@ -397,7 +399,18 @@ public class MessageReceiver extends ClientEntity
Map<Symbol, UnknownDescribedType> filterMap = Collections.singletonMap(AmqpConstants.StringFilter, filter);
source.setFilter(filterMap);
Connection connection = this.underlyingFactory.getConnection();
Connection connection = null;
try {
connection = !isConnectionAsync ? this.underlyingFactory.getConnection()
: this.underlyingFactory.getConnectionAsync().get();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
Session ssn = connection.session();
String receiveLinkName = this.getClientId();
@ -417,7 +430,7 @@ public class MessageReceiver extends ClientEntity
ssn.open();
receiver.open();
return receiver;
}
@ -426,7 +439,7 @@ public class MessageReceiver extends ClientEntity
*/
private void sendFlow(int credits)
{
if (this.receiveLink.getRemoteState() != EndpointState.CLOSED)
if (this.receiveLink.getLocalState() != EndpointState.CLOSED)
{
int currentPingFlow = this.pingFlowCount.get();
if (currentPingFlow > 0)
@ -460,7 +473,7 @@ public class MessageReceiver extends ClientEntity
private void sendPingFlow()
{
if (this.receiveLink.getRemoteState() != EndpointState.CLOSED)
if (this.receiveLink.getLocalState() != EndpointState.CLOSED)
{
if (Instant.now().isAfter(this.lastCommunicatedAt.plus(ClientConstants.AmqpLinkDetachTimeoutInMin, ChronoUnit.DAYS))
&& this.pingFlowCount.get() < MessageReceiver.PingFlowThreshold)
@ -500,14 +513,20 @@ public class MessageReceiver extends ClientEntity
@Override
public void run()
{
if (MessageReceiver.this.receiveLink.getRemoteState() != EndpointState.CLOSED)
if (MessageReceiver.this.receiveLink.getLocalState() != EndpointState.CLOSED)
{
return;
}
MessageReceiver.this.receiveLink = MessageReceiver.this.createReceiveLink();
MessageReceiver.this.receiveLink = MessageReceiver.this.createReceiveLink(true);
ReceiveLinkHandler handler = new ReceiveLinkHandler(MessageReceiver.this);
BaseHandler.setHandler(MessageReceiver.this.receiveLink, handler);
synchronized (MessageReceiver.this.linkCreateLock)
{
MessageReceiver.this.linkCreateScheduled = false;
}
MessageReceiver.this.underlyingFactory.getRetryPolicy().incrementRetryCount(MessageReceiver.this.getClientId());
}
},

Просмотреть файл

@ -3,16 +3,19 @@ package com.microsoft.azure.servicebus;
import java.io.IOException;
import java.nio.channels.*;
import java.time.Duration;
import java.util.LinkedList;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.logging.*;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.*;
import org.apache.qpid.proton.engine.Handler;
import org.apache.qpid.proton.reactor.*;
import com.microsoft.azure.servicebus.amqp.ConnectionHandler;
import com.microsoft.azure.servicebus.amqp.ReactorHandler;
import com.microsoft.azure.servicebus.amqp.*;
/**
* Abstracts all amqp related details and exposes AmqpConnection object
@ -28,7 +31,6 @@ public class MessagingFactory extends ClientEntity
private Reactor reactor;
private Thread reactorThread;
private final Object reactorLock = new Object();
private final Object connectionLock = new Object();
private ConnectionHandler connectionHandler;
@ -38,6 +40,9 @@ public class MessagingFactory extends ClientEntity
private Duration operationTimeout;
private RetryPolicy retryPolicy;
private CompletableFuture<MessagingFactory> open;
private CompletableFuture<Connection> openConnection;
public LinkedList<Link> links;
/**
* @param reactor parameter reactor is purely for testing purposes and the SDK code should always set it to null
@ -46,37 +51,33 @@ public class MessagingFactory extends ClientEntity
{
super("MessagingFactory" + UUID.randomUUID().toString());
this.startReactor();
this.startReactor(new ReactorHandler());
this.operationTimeout = builder.getOperationTimeout();
this.retryPolicy = builder.getRetryPolicy();
this.links = new LinkedList<Link>();
}
private void createConnection(ConnectionStringBuilder builder)
{
synchronized (this.reactorLock)
{
assert this.reactor != null;
ConnectionHandler connectionHandler = new ConnectionHandler(this, builder.getEndpoint().getHost(), builder.getSasKeyName(), builder.getSasKey());
this.waitingConnectionOpen = true;
this.connection = reactor.connection(connectionHandler);
this.open = new CompletableFuture<MessagingFactory>();
}
assert this.reactor != null;
this.connectionHandler = new ConnectionHandler(this,
builder.getEndpoint().getHost(), builder.getSasKeyName(), builder.getSasKey());
this.waitingConnectionOpen = true;
this.connection = reactor.connection(this.connectionHandler);
this.open = new CompletableFuture<MessagingFactory>();
}
private void startReactor() throws IOException
private void startReactor(ReactorHandler reactorHandler) throws IOException
{
synchronized (this.reactorLock)
{
if (this.reactor == null)
{
this.reactor = Proton.reactor(new ReactorHandler());
this.reactorThread = new Thread(new RunReactor(this, this.reactor));
this.reactorThread.start();
}
}
// if (this.reactor == null || this.reactor.quiesced())
// {
this.reactor = Proton.reactor(reactorHandler);
this.reactorThread = new Thread(new RunReactor(this, this.reactor));
this.reactorThread.start();
// }
}
// Todo: async
@ -86,10 +87,27 @@ public class MessagingFactory extends ClientEntity
{
synchronized (this.connectionLock)
{
if (this.connection.getLocalState() == EndpointState.CLOSED && !this.waitingConnectionOpen)
if (this.connection.getLocalState() == EndpointState.CLOSED
&& !this.waitingConnectionOpen)
{
this.connection.free();
this.connection = reactor.connection(connectionHandler);
try {
this.startReactor(new ReactorHandler() {
@Override
public void onReactorInit(Event e)
{
super.onReactorInit(e);
Reactor reactor = e.getReactor();
MessagingFactory.this.connection = reactor.connection(MessagingFactory.this.connectionHandler);
}
});
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
this.openConnection = new CompletableFuture<Connection>();
this.waitingConnectionOpen = true;
}
}
@ -98,6 +116,12 @@ public class MessagingFactory extends ClientEntity
return this.connection;
}
CompletableFuture<Connection> getConnectionAsync()
{
this.getConnection();
return this.openConnection;
}
public Duration getOperationTimeout()
{
return this.operationTimeout;
@ -123,13 +147,41 @@ public class MessagingFactory extends ClientEntity
synchronized (this.connectionLock)
{
this.waitingConnectionOpen = false;
if (exception == null)
}
if (exception == null)
{
this.open.complete(this);
if(this.openConnection != null)
{
this.open.complete(this);
this.openConnection.complete(this.connection);
}
else
}
else
{
this.open.completeExceptionally(exception);
if (this.openConnection != null)
{
this.open.completeExceptionally(exception);
this.openConnection.completeExceptionally(exception);
}
}
}
public void onConnectionError(ErrorCondition error)
{
this.connection.close();
for (Link link : this.links)
{
Receiver receiver = (Receiver) link;
if (receiver!=null)
{
Handler handler = BaseHandler.getHandler(receiver);
if (handler != null && handler instanceof ReceiveLinkHandler)
{
ReceiveLinkHandler recvLinkHandler = (ReceiveLinkHandler) handler;
recvLinkHandler.processOnClose(receiver, error);
}
}
}
}

Просмотреть файл

@ -29,11 +29,21 @@ public final class ConnectionHandler extends BaseHandler
public ConnectionHandler(final MessagingFactory messagingFactory, final String hostname, final String username, final String password)
{
add(new Handshaker());
this.hostname = hostname;
this.username = username;
this.password = password;
this.messagingFactory = messagingFactory;
}
@Override
public void onConnectionInit(Event event)
{
Connection connection = event.getConnection();
connection.setHostname(this.hostname + ":" + ClientConstants.AmqpsPort);
connection.setContainer(UUID.randomUUID().toString());
connection.open();
}
@Override
public void onConnectionBound(Event event)
@ -74,15 +84,8 @@ public final class ConnectionHandler extends BaseHandler
TRACE_LOGGER.log(Level.WARNING, "Connection.onTransportError: hostname[" + event.getConnection().getHostname() + "Error (no description returned).");
}
}
}
@Override
public void onConnectionInit(Event event)
{
Connection connection = event.getConnection();
connection.setHostname(this.hostname + ":" + ClientConstants.AmqpsPort);
connection.setContainer(UUID.randomUUID().toString());
connection.open();
this.messagingFactory.onConnectionError(condition);
}
@Override

Просмотреть файл

@ -96,32 +96,37 @@ public final class ReceiveLinkHandler extends BaseLinkHandler
Link link = event.getLink();
if (link instanceof Receiver)
{
link.close();
ErrorCondition condition = link.getRemoteCondition();
if (condition != null)
{
if (condition.getCondition() == null)
{
if(TRACE_LOGGER.isLoggable(Level.FINE))
{
TRACE_LOGGER.log(Level.FINE, "recvLink.onLinkRemoteClose: name["+link.getName()+"] : ErrorCondition[" + condition.getCondition() + ", " + condition.getDescription() + "]");
}
this.msgReceiver.onClose();
return;
}
if(TRACE_LOGGER.isLoggable(Level.WARNING))
{
TRACE_LOGGER.log(Level.WARNING, "recvLink.onLinkRemoteClose: name["+link.getName()+"] : ErrorCondition[" + condition.getCondition() + ", " + condition.getDescription() + "]");
}
}
this.msgReceiver.onError(condition);
this.processOnClose((Receiver) link, condition);
}
}
public void processOnClose(Receiver link, ErrorCondition condition)
{
link.close();
if (condition != null)
{
if (condition.getCondition() == null)
{
if(TRACE_LOGGER.isLoggable(Level.FINE))
{
TRACE_LOGGER.log(Level.FINE, "recvLink.onLinkRemoteClose: name["+link.getName()+"] : ErrorCondition[" + condition.getCondition() + ", " + condition.getDescription() + "]");
}
this.msgReceiver.onClose();
return;
}
if(TRACE_LOGGER.isLoggable(Level.WARNING))
{
TRACE_LOGGER.log(Level.WARNING, "recvLink.onLinkRemoteClose: name["+link.getName()+"] : ErrorCondition[" + condition.getCondition() + ", " + condition.getDescription() + "]");
}
}
this.msgReceiver.onError(condition);
}
@Override
public void onLinkRemoteDetach(Event event)
@ -133,7 +138,7 @@ public final class ReceiveLinkHandler extends BaseLinkHandler
if (condition != null)
{
if (TRACE_LOGGER.isLoggable(Level.WARNING))
TRACE_LOGGER.log(Level.WARNING, "recvLink.onLinkRemoteDetach: name["+link.getName()+"] : ErrorCondition[" + condition.getCondition() + ", " + condition.getDescription() + "]");
TRACE_LOGGER.log(Level.WARNING, "recvLink.onLinkRemoteDetach: name[" + link.getName() + "] : ErrorCondition[" + condition.getCondition() + ", " + condition.getDescription() + "]");
}
link.close();