Skip to content
Snippets Groups Projects
Commit 9f71d5cf authored by Damon Kohler's avatar Damon Kohler
Browse files

merge

parents cac4ed2f d3f6b734
No related branches found
No related tags found
No related merge requests found
......@@ -175,6 +175,7 @@ public class DefaultSubscriber<T> extends DefaultTopicParticipant implements Sub
signalOnShutdown(timeout, unit);
incomingMessageQueue.shutdown();
tcpClientManager.shutdown();
subscriberListeners.shutdown();
}
@Override
......
......@@ -60,9 +60,8 @@ public class SubscriberFactory {
@SuppressWarnings("unchecked")
public <T> Subscriber<T> newOrExisting(TopicDeclaration topicDeclaration,
MessageDeserializer<T> messageDeserializer) {
GraphName topicName = topicDeclaration.getName();
synchronized (mutex) {
GraphName topicName = topicDeclaration.getName();
if (topicParticipantManager.hasSubscriber(topicName)) {
return (DefaultSubscriber<T>) topicParticipantManager.getSubscriber(topicName);
} else {
......
/*
* Copyright (C) 2011 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.ros.internal.transport.tcp;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import java.net.ConnectException;
import java.net.SocketAddress;
import java.util.Timer;
import java.util.TimerTask;
/**
* Automatically reconnects when a {@link Channel} is closed.
*
* @author damonkohler@google.com (Damon Kohler)
*/
public class RetryingConnectionHandler extends AbstractNamedChannelHandler {
private static final boolean DEBUG = false;
private static final Log log = LogFactory.getLog(RetryingConnectionHandler.class);
private static final String CONNECTION_REFUSED = "Connection refused";
// TODO(damonkohler): Allow the TcpClientConnection to alter the
// reconnect strategy (e.g. binary backoff, faster retries for tests, etc.)
private static final long RECONNECT_DELAY = 1000;
private final TcpClientConnection tcpClientConnection;
private final Timer timer;
public RetryingConnectionHandler(TcpClientConnection tcpClientConnection) {
this.tcpClientConnection = tcpClientConnection;
this.timer = new Timer();
}
@Override
public String getName() {
return "RetryingConnectionHandler";
}
@Override
public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
tcpClientConnection.setChannel(null);
if (DEBUG) {
if (tcpClientConnection.isDefunct()) {
log.info("Connection defunct: " + tcpClientConnection.getName());
}
}
if (tcpClientConnection.isPersistent() && !tcpClientConnection.isDefunct()) {
if (DEBUG) {
log.info("Connection closed, will reconnect: " + tcpClientConnection.getName());
}
timer.schedule(new TimerTask() {
@Override
public void run() {
SocketAddress remoteAddress = tcpClientConnection.getRemoteAddress();
if (DEBUG) {
log.info("Reconnecting: " + tcpClientConnection.getName());
}
ChannelFuture future =
tcpClientConnection.getBootstrap().connect(remoteAddress).awaitUninterruptibly();
if (future.isSuccess()) {
tcpClientConnection.setChannel(future.getChannel());
if (DEBUG) {
log.info("Reconnect successful: " + tcpClientConnection.getName());
}
} else {
if (DEBUG) {
log.error("Reconnect failed: " + tcpClientConnection.getName(), future.getCause());
}
// TODO(damonkohler): Is there a better way to check for connection
// refused?
if (future.getCause() instanceof ConnectException
&& future.getCause().getMessage().equals(CONNECTION_REFUSED)) {
if (DEBUG) {
log.error(
"Connection refused, marking as defunct: " + tcpClientConnection.getName(),
future.getCause());
}
// TODO(damonkohler): Add a listener so that publishers and
// subscribers can be notified when they lose a connection.
tcpClientConnection.setDefunct(true);
}
}
}
}, RECONNECT_DELAY);
} else {
if (DEBUG) {
log.info("Connection closed, will not reconnect: " + tcpClientConnection.getName());
}
}
super.channelClosed(ctx, e);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
if (DEBUG) {
log.error("Connection exception: " + tcpClientConnection.getName(), e.getCause());
}
e.getChannel().close();
super.exceptionCaught(ctx, e);
}
}
......@@ -57,7 +57,7 @@ public class TcpClient {
private final ClientBootstrap bootstrap;
private final List<NamedChannelHandler> namedChannelHandlers;
private TcpClientConnection tcpClientConnection;
private Channel channel;
public TcpClient(ChannelGroup channelGroup, Executor executor) {
this.channelGroup = channelGroup;
......@@ -86,8 +86,7 @@ public class TcpClient {
this.namedChannelHandlers.addAll(namedChannelHandlers);
}
public TcpClientConnection connect(String connectionName, SocketAddress socketAddress) {
tcpClientConnection = new TcpClientConnection(connectionName, bootstrap, socketAddress);
public Channel connect(String connectionName, SocketAddress socketAddress) {
TcpClientPipelineFactory tcpClientPipelineFactory = new TcpClientPipelineFactory(channelGroup) {
@Override
public ChannelPipeline getPipeline() {
......@@ -95,17 +94,13 @@ public class TcpClient {
for (NamedChannelHandler namedChannelHandler : namedChannelHandlers) {
pipeline.addLast(namedChannelHandler.getName(), namedChannelHandler);
}
RetryingConnectionHandler retryingConnectionHandler =
tcpClientConnection.getRetryingConnectionHandler();
pipeline.addLast(retryingConnectionHandler.getName(), retryingConnectionHandler);
return pipeline;
}
};
bootstrap.setPipelineFactory(tcpClientPipelineFactory);
ChannelFuture future = bootstrap.connect(socketAddress).awaitUninterruptibly();
if (future.isSuccess()) {
Channel channel = future.getChannel();
tcpClientConnection.setChannel(channel);
channel = future.getChannel();
if (DEBUG) {
log.info("Connected to: " + socketAddress);
}
......@@ -113,18 +108,12 @@ public class TcpClient {
// We expect the first connection to succeed. If not, fail fast.
throw new RosRuntimeException("Connection exception: " + socketAddress, future.getCause());
}
return tcpClientConnection;
return channel;
}
public ChannelFuture write(ChannelBuffer buffer) {
Preconditions.checkNotNull(tcpClientConnection);
return tcpClientConnection.write(buffer);
}
public void shutdown() {
if (tcpClientConnection != null) {
tcpClientConnection.setPersistent(false);
tcpClientConnection.setChannel(null);
}
Preconditions.checkNotNull(channel);
Preconditions.checkNotNull(buffer);
return channel.write(buffer);
}
}
\ No newline at end of file
/*
* Copyright (C) 2011 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.ros.internal.transport.tcp;
import com.google.common.base.Preconditions;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import java.net.SocketAddress;
/**
* @author damonkohler@google.com (Damon Kohler)
*/
public class TcpClientConnection {
private final String name;
private final ClientBootstrap bootstrap;
private final SocketAddress remoteAddress;
private final RetryingConnectionHandler retryingConnectionHandler;
/**
* {@code true} if this client connection should reconnect when disconnected.
*/
private boolean persistent;
/**
* {@code true} if this connection is defunct (e.g. received a connection
* refused error)
*/
private boolean defunct;
/**
* This connection's {@link Channel}. May be {@code null} if we're not
* currently connected.
*/
private Channel channel;
/**
* @param bootstrap
* the {@link ClientBootstrap} instance to use when reconnecting
* @param remoteAddress
* the {@link SocketAddress} to reconnect to
*/
public TcpClientConnection(String name, ClientBootstrap bootstrap, SocketAddress remoteAddress) {
this.name = name;
this.bootstrap = bootstrap;
this.remoteAddress = remoteAddress;
retryingConnectionHandler = new RetryingConnectionHandler(this);
persistent = true;
defunct = false;
}
/**
* @return the {@link RetryingConnectionHandler} for this
* {@link TcpClientConnection}
*/
public RetryingConnectionHandler getRetryingConnectionHandler() {
return retryingConnectionHandler;
}
/**
* @return the {@link ClientBootstrap} instance used to reconnect
*/
public ClientBootstrap getBootstrap() {
return bootstrap;
}
/**
* @return the {@link SocketAddress} to reconnect to
*/
public SocketAddress getRemoteAddress() {
return remoteAddress;
}
/**
* @param persistent
* {@code true} if this client connection should reconnect when
* disconnected
*/
public void setPersistent(boolean persistent) {
this.persistent = persistent;
}
/**
* @return {@code true} if this client connection should reconnect when
* disconnected
*/
public boolean isPersistent() {
return persistent;
}
/**
* @see Channel#write
*/
public ChannelFuture write(ChannelBuffer buffer) {
Preconditions.checkNotNull(channel, "Not connected.");
return channel.write(buffer);
}
/**
* @param channel
* this connection's {@link Channel} or {@code null} if we're not
* currently connected
*/
void setChannel(Channel channel) {
this.channel = channel;
}
/**
* @return the name of this connection (e.g. Subscriber</topic/foo>)
*/
public String getName() {
return name;
}
/**
* @return {@code true} if this connection is defunct (e.g. received a
* connection refused error)
*/
public boolean isDefunct() {
return defunct;
}
/**
* @param defunct
* {@code true} if this connection is defunct (e.g. received a
* connection refused error)
*/
public void setDefunct(boolean defunct) {
this.defunct = defunct;
}
}
......@@ -76,14 +76,11 @@ public class TcpClientManager {
* {@link Channel}s.
*/
public void shutdown() {
for (TcpClient tcpClient : tcpClients) {
tcpClient.shutdown();
}
channelGroup.close().awaitUninterruptibly();
tcpClients.clear();
// Not calling channelFactory.releaseExternalResources() or
// bootstrap.releaseExternalResources() since only external resources are
// the ExecutorService and control of that must remain with the overall
// We don't call channelFactory.releaseExternalResources() or
// bootstrap.releaseExternalResources() since the only external resource is
// the ExecutorService which must remain in the control of the overall
// application.
}
}
......@@ -17,7 +17,6 @@
package org.ros.internal.transport;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import org.apache.commons.logging.Log;
......@@ -31,7 +30,6 @@ import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.ChannelGroupFuture;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.junit.After;
......@@ -198,13 +196,6 @@ public class MessageQueueIntegrationTest {
assertTrue(secondLatch.await(3, TimeUnit.SECONDS));
}
private void expectNoMessages() throws InterruptedException {
CountDownLatch firstLatch = expectMessage(firstIncomingMessageQueue);
CountDownLatch secondLatch = expectMessage(secondIncomingMessageQueue);
assertFalse(firstLatch.await(3, TimeUnit.SECONDS));
assertFalse(secondLatch.await(3, TimeUnit.SECONDS));
}
@Test
public void testSendAndReceiveMessage() throws InterruptedException {
startRepeatingPublisher();
......@@ -261,36 +252,4 @@ public class MessageQueueIntegrationTest {
outgoingMessageQueue.shutdown();
outgoingMessageQueue.add(expectedMessage);
}
@Test
public void testReconnect() throws InterruptedException {
startRepeatingPublisher();
Channel serverChannel = buildServerChannel();
connect(firstTcpClientManager, serverChannel);
connect(secondTcpClientManager, serverChannel);
expectMessages();
// Disconnect the outgoing queue's incoming connections.
ChannelGroupFuture future = outgoingMessageQueue.getChannelGroup().close();
assertTrue(future.await(1, TimeUnit.SECONDS));
assertTrue(future.isCompleteSuccess());
expectMessages();
// Disconnect the outgoing queue's incoming connections again to see that
// retries work more than once.
future = outgoingMessageQueue.getChannelGroup().close();
assertTrue(future.await(1, TimeUnit.SECONDS));
assertTrue(future.isCompleteSuccess());
expectMessages();
// Shutdown to check that we will not reconnect.
firstTcpClientManager.shutdown();
secondTcpClientManager.shutdown();
firstIncomingMessageQueue.shutdown();
secondIncomingMessageQueue.shutdown();
future = outgoingMessageQueue.getChannelGroup().close();
assertTrue(future.await(1, TimeUnit.SECONDS));
assertTrue(future.isCompleteSuccess());
expectNoMessages();
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment