Skip to content
Snippets Groups Projects
Commit d3f6b734 authored by Damon Kohler's avatar Damon Kohler
Browse files

Removes low-level connection retry logic. This will be replaced with higher...

Removes low-level connection retry logic. This will be replaced with higher level retries (e.g. subscribers will attempt to resubscribe).
parent 378ee8c3
Branches ivy-repo
No related tags found
No related merge requests found
...@@ -175,6 +175,7 @@ public class DefaultSubscriber<T> extends DefaultTopicParticipant implements Sub ...@@ -175,6 +175,7 @@ public class DefaultSubscriber<T> extends DefaultTopicParticipant implements Sub
signalOnShutdown(timeout, unit); signalOnShutdown(timeout, unit);
incomingMessageQueue.shutdown(); incomingMessageQueue.shutdown();
tcpClientManager.shutdown(); tcpClientManager.shutdown();
subscriberListeners.shutdown();
} }
@Override @Override
......
...@@ -60,9 +60,8 @@ public class SubscriberFactory { ...@@ -60,9 +60,8 @@ public class SubscriberFactory {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public <T> Subscriber<T> newOrExisting(TopicDeclaration topicDeclaration, public <T> Subscriber<T> newOrExisting(TopicDeclaration topicDeclaration,
MessageDeserializer<T> messageDeserializer) { MessageDeserializer<T> messageDeserializer) {
GraphName topicName = topicDeclaration.getName();
synchronized (mutex) { synchronized (mutex) {
GraphName topicName = topicDeclaration.getName();
if (topicParticipantManager.hasSubscriber(topicName)) { if (topicParticipantManager.hasSubscriber(topicName)) {
return (DefaultSubscriber<T>) topicParticipantManager.getSubscriber(topicName); return (DefaultSubscriber<T>) topicParticipantManager.getSubscriber(topicName);
} else { } else {
......
/*
* Copyright (C) 2011 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.ros.internal.transport.tcp;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import java.net.ConnectException;
import java.net.SocketAddress;
import java.util.Timer;
import java.util.TimerTask;
/**
* Automatically reconnects when a {@link Channel} is closed.
*
* @author damonkohler@google.com (Damon Kohler)
*/
public class RetryingConnectionHandler extends AbstractNamedChannelHandler {
private static final boolean DEBUG = false;
private static final Log log = LogFactory.getLog(RetryingConnectionHandler.class);
private static final String CONNECTION_REFUSED = "Connection refused";
// TODO(damonkohler): Allow the TcpClientConnection to alter the
// reconnect strategy (e.g. binary backoff, faster retries for tests, etc.)
private static final long RECONNECT_DELAY = 1000;
private final TcpClientConnection tcpClientConnection;
private final Timer timer;
public RetryingConnectionHandler(TcpClientConnection tcpClientConnection) {
this.tcpClientConnection = tcpClientConnection;
this.timer = new Timer();
}
@Override
public String getName() {
return "RetryingConnectionHandler";
}
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
tcpClientConnection.setChannel(null);
if (DEBUG) {
if (tcpClientConnection.isDefunct()) {
log.info("Connection defunct: " + tcpClientConnection.getName());
}
}
if (tcpClientConnection.isPersistent() && !tcpClientConnection.isDefunct()) {
if (DEBUG) {
log.info("Connection closed, will reconnect: " + tcpClientConnection.getName());
}
timer.schedule(new TimerTask() {
@Override
public void run() {
SocketAddress remoteAddress = tcpClientConnection.getRemoteAddress();
if (DEBUG) {
log.info("Reconnecting: " + tcpClientConnection.getName());
}
ChannelFuture future =
tcpClientConnection.getBootstrap().connect(remoteAddress).awaitUninterruptibly();
if (future.isSuccess()) {
tcpClientConnection.setChannel(future.getChannel());
if (DEBUG) {
log.info("Reconnect successful: " + tcpClientConnection.getName());
}
} else {
if (DEBUG) {
log.error("Reconnect failed: " + tcpClientConnection.getName(), future.getCause());
}
// TODO(damonkohler): Is there a better way to check for connection
// refused?
if (future.getCause() instanceof ConnectException
&& future.getCause().getMessage().equals(CONNECTION_REFUSED)) {
if (DEBUG) {
log.error(
"Connection refused, marking as defunct: " + tcpClientConnection.getName(),
future.getCause());
}
// TODO(damonkohler): Add a listener so that publishers and
// subscribers can be notified when they lose a connection.
tcpClientConnection.setDefunct(true);
}
}
}
}, RECONNECT_DELAY);
} else {
if (DEBUG) {
log.info("Connection closed, will not reconnect: " + tcpClientConnection.getName());
}
}
super.channelClosed(ctx, e);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
if (DEBUG) {
log.error("Connection exception: " + tcpClientConnection.getName(), e.getCause());
}
e.getChannel().close();
super.exceptionCaught(ctx, e);
}
}
...@@ -57,7 +57,7 @@ public class TcpClient { ...@@ -57,7 +57,7 @@ public class TcpClient {
private final ClientBootstrap bootstrap; private final ClientBootstrap bootstrap;
private final List<NamedChannelHandler> namedChannelHandlers; private final List<NamedChannelHandler> namedChannelHandlers;
private TcpClientConnection tcpClientConnection; private Channel channel;
public TcpClient(ChannelGroup channelGroup, Executor executor) { public TcpClient(ChannelGroup channelGroup, Executor executor) {
this.channelGroup = channelGroup; this.channelGroup = channelGroup;
...@@ -86,8 +86,7 @@ public class TcpClient { ...@@ -86,8 +86,7 @@ public class TcpClient {
this.namedChannelHandlers.addAll(namedChannelHandlers); this.namedChannelHandlers.addAll(namedChannelHandlers);
} }
public TcpClientConnection connect(String connectionName, SocketAddress socketAddress) { public Channel connect(String connectionName, SocketAddress socketAddress) {
tcpClientConnection = new TcpClientConnection(connectionName, bootstrap, socketAddress);
TcpClientPipelineFactory tcpClientPipelineFactory = new TcpClientPipelineFactory(channelGroup) { TcpClientPipelineFactory tcpClientPipelineFactory = new TcpClientPipelineFactory(channelGroup) {
@Override @Override
public ChannelPipeline getPipeline() { public ChannelPipeline getPipeline() {
...@@ -95,17 +94,13 @@ public class TcpClient { ...@@ -95,17 +94,13 @@ public class TcpClient {
for (NamedChannelHandler namedChannelHandler : namedChannelHandlers) { for (NamedChannelHandler namedChannelHandler : namedChannelHandlers) {
pipeline.addLast(namedChannelHandler.getName(), namedChannelHandler); pipeline.addLast(namedChannelHandler.getName(), namedChannelHandler);
} }
RetryingConnectionHandler retryingConnectionHandler =
tcpClientConnection.getRetryingConnectionHandler();
pipeline.addLast(retryingConnectionHandler.getName(), retryingConnectionHandler);
return pipeline; return pipeline;
} }
}; };
bootstrap.setPipelineFactory(tcpClientPipelineFactory); bootstrap.setPipelineFactory(tcpClientPipelineFactory);
ChannelFuture future = bootstrap.connect(socketAddress).awaitUninterruptibly(); ChannelFuture future = bootstrap.connect(socketAddress).awaitUninterruptibly();
if (future.isSuccess()) { if (future.isSuccess()) {
Channel channel = future.getChannel(); channel = future.getChannel();
tcpClientConnection.setChannel(channel);
if (DEBUG) { if (DEBUG) {
log.info("Connected to: " + socketAddress); log.info("Connected to: " + socketAddress);
} }
...@@ -113,18 +108,12 @@ public class TcpClient { ...@@ -113,18 +108,12 @@ public class TcpClient {
// We expect the first connection to succeed. If not, fail fast. // We expect the first connection to succeed. If not, fail fast.
throw new RosRuntimeException("Connection exception: " + socketAddress, future.getCause()); throw new RosRuntimeException("Connection exception: " + socketAddress, future.getCause());
} }
return tcpClientConnection; return channel;
} }
public ChannelFuture write(ChannelBuffer buffer) { public ChannelFuture write(ChannelBuffer buffer) {
Preconditions.checkNotNull(tcpClientConnection); Preconditions.checkNotNull(channel);
return tcpClientConnection.write(buffer); Preconditions.checkNotNull(buffer);
} return channel.write(buffer);
public void shutdown() {
if (tcpClientConnection != null) {
tcpClientConnection.setPersistent(false);
tcpClientConnection.setChannel(null);
}
} }
} }
\ No newline at end of file
/*
* Copyright (C) 2011 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.ros.internal.transport.tcp;
import com.google.common.base.Preconditions;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import java.net.SocketAddress;
/**
* @author damonkohler@google.com (Damon Kohler)
*/
public class TcpClientConnection {
private final String name;
private final ClientBootstrap bootstrap;
private final SocketAddress remoteAddress;
private final RetryingConnectionHandler retryingConnectionHandler;
/**
* {@code true} if this client connection should reconnect when disconnected.
*/
private boolean persistent;
/**
* {@code true} if this connection is defunct (e.g. received a connection
* refused error)
*/
private boolean defunct;
/**
* This connection's {@link Channel}. May be {@code null} if we're not
* currently connected.
*/
private Channel channel;
/**
* @param bootstrap
* the {@link ClientBootstrap} instance to use when reconnecting
* @param remoteAddress
* the {@link SocketAddress} to reconnect to
*/
public TcpClientConnection(String name, ClientBootstrap bootstrap, SocketAddress remoteAddress) {
this.name = name;
this.bootstrap = bootstrap;
this.remoteAddress = remoteAddress;
retryingConnectionHandler = new RetryingConnectionHandler(this);
persistent = true;
defunct = false;
}
/**
* @return the {@link RetryingConnectionHandler} for this
* {@link TcpClientConnection}
*/
public RetryingConnectionHandler getRetryingConnectionHandler() {
return retryingConnectionHandler;
}
/**
* @return the {@link ClientBootstrap} instance used to reconnect
*/
public ClientBootstrap getBootstrap() {
return bootstrap;
}
/**
* @return the {@link SocketAddress} to reconnect to
*/
public SocketAddress getRemoteAddress() {
return remoteAddress;
}
/**
* @param persistent
* {@code true} if this client connection should reconnect when
* disconnected
*/
public void setPersistent(boolean persistent) {
this.persistent = persistent;
}
/**
* @return {@code true} if this client connection should reconnect when
* disconnected
*/
public boolean isPersistent() {
return persistent;
}
/**
* @see Channel#write
*/
public ChannelFuture write(ChannelBuffer buffer) {
Preconditions.checkNotNull(channel, "Not connected.");
return channel.write(buffer);
}
/**
* @param channel
* this connection's {@link Channel} or {@code null} if we're not
* currently connected
*/
void setChannel(Channel channel) {
this.channel = channel;
}
/**
* @return the name of this connection (e.g. Subscriber</topic/foo>)
*/
public String getName() {
return name;
}
/**
* @return {@code true} if this connection is defunct (e.g. received a
* connection refused error)
*/
public boolean isDefunct() {
return defunct;
}
/**
* @param defunct
* {@code true} if this connection is defunct (e.g. received a
* connection refused error)
*/
public void setDefunct(boolean defunct) {
this.defunct = defunct;
}
}
...@@ -76,14 +76,11 @@ public class TcpClientManager { ...@@ -76,14 +76,11 @@ public class TcpClientManager {
* {@link Channel}s. * {@link Channel}s.
*/ */
public void shutdown() { public void shutdown() {
for (TcpClient tcpClient : tcpClients) {
tcpClient.shutdown();
}
channelGroup.close().awaitUninterruptibly(); channelGroup.close().awaitUninterruptibly();
tcpClients.clear(); tcpClients.clear();
// Not calling channelFactory.releaseExternalResources() or // We don't call channelFactory.releaseExternalResources() or
// bootstrap.releaseExternalResources() since only external resources are // bootstrap.releaseExternalResources() since the only external resource is
// the ExecutorService and control of that must remain with the overall // the ExecutorService which must remain in the control of the overall
// application. // application.
} }
} }
...@@ -17,7 +17,6 @@ ...@@ -17,7 +17,6 @@
package org.ros.internal.transport; package org.ros.internal.transport;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
...@@ -31,7 +30,6 @@ import org.jboss.netty.channel.ChannelStateEvent; ...@@ -31,7 +30,6 @@ import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.SimpleChannelHandler; import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.group.ChannelGroup; import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.ChannelGroupFuture;
import org.jboss.netty.channel.group.DefaultChannelGroup; import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.junit.After; import org.junit.After;
...@@ -198,13 +196,6 @@ public class MessageQueueIntegrationTest { ...@@ -198,13 +196,6 @@ public class MessageQueueIntegrationTest {
assertTrue(secondLatch.await(3, TimeUnit.SECONDS)); assertTrue(secondLatch.await(3, TimeUnit.SECONDS));
} }
private void expectNoMessages() throws InterruptedException {
CountDownLatch firstLatch = expectMessage(firstIncomingMessageQueue);
CountDownLatch secondLatch = expectMessage(secondIncomingMessageQueue);
assertFalse(firstLatch.await(3, TimeUnit.SECONDS));
assertFalse(secondLatch.await(3, TimeUnit.SECONDS));
}
@Test @Test
public void testSendAndReceiveMessage() throws InterruptedException { public void testSendAndReceiveMessage() throws InterruptedException {
startRepeatingPublisher(); startRepeatingPublisher();
...@@ -261,36 +252,4 @@ public class MessageQueueIntegrationTest { ...@@ -261,36 +252,4 @@ public class MessageQueueIntegrationTest {
outgoingMessageQueue.shutdown(); outgoingMessageQueue.shutdown();
outgoingMessageQueue.add(expectedMessage); outgoingMessageQueue.add(expectedMessage);
} }
@Test
public void testReconnect() throws InterruptedException {
startRepeatingPublisher();
Channel serverChannel = buildServerChannel();
connect(firstTcpClientManager, serverChannel);
connect(secondTcpClientManager, serverChannel);
expectMessages();
// Disconnect the outgoing queue's incoming connections.
ChannelGroupFuture future = outgoingMessageQueue.getChannelGroup().close();
assertTrue(future.await(1, TimeUnit.SECONDS));
assertTrue(future.isCompleteSuccess());
expectMessages();
// Disconnect the outgoing queue's incoming connections again to see that
// retries work more than once.
future = outgoingMessageQueue.getChannelGroup().close();
assertTrue(future.await(1, TimeUnit.SECONDS));
assertTrue(future.isCompleteSuccess());
expectMessages();
// Shutdown to check that we will not reconnect.
firstTcpClientManager.shutdown();
secondTcpClientManager.shutdown();
firstIncomingMessageQueue.shutdown();
secondIncomingMessageQueue.shutdown();
future = outgoingMessageQueue.getChannelGroup().close();
assertTrue(future.await(1, TimeUnit.SECONDS));
assertTrue(future.isCompleteSuccess());
expectNoMessages();
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment