diff --git a/rosjava/src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java b/rosjava/src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java index 1ec943d4bd51b367afaa7f4e72736fe442d99314..dca306973433f6056f11a9f878b91e266c25ef95 100644 --- a/rosjava/src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java +++ b/rosjava/src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java @@ -175,6 +175,7 @@ public class DefaultSubscriber<T> extends DefaultTopicParticipant implements Sub signalOnShutdown(timeout, unit); incomingMessageQueue.shutdown(); tcpClientManager.shutdown(); + subscriberListeners.shutdown(); } @Override diff --git a/rosjava/src/main/java/org/ros/internal/node/topic/SubscriberFactory.java b/rosjava/src/main/java/org/ros/internal/node/topic/SubscriberFactory.java index efb35eef27e7b2af22291d54962acbb44d1ccd86..203d500b2a7361fdefc4aef73d72cdc0b2dd395c 100644 --- a/rosjava/src/main/java/org/ros/internal/node/topic/SubscriberFactory.java +++ b/rosjava/src/main/java/org/ros/internal/node/topic/SubscriberFactory.java @@ -60,9 +60,8 @@ public class SubscriberFactory { @SuppressWarnings("unchecked") public <T> Subscriber<T> newOrExisting(TopicDeclaration topicDeclaration, MessageDeserializer<T> messageDeserializer) { - GraphName topicName = topicDeclaration.getName(); - synchronized (mutex) { + GraphName topicName = topicDeclaration.getName(); if (topicParticipantManager.hasSubscriber(topicName)) { return (DefaultSubscriber<T>) topicParticipantManager.getSubscriber(topicName); } else { diff --git a/rosjava/src/main/java/org/ros/internal/transport/tcp/RetryingConnectionHandler.java b/rosjava/src/main/java/org/ros/internal/transport/tcp/RetryingConnectionHandler.java deleted file mode 100644 index 1cc45dfa704d51c8564249c5a07bc2303eb1026d..0000000000000000000000000000000000000000 --- a/rosjava/src/main/java/org/ros/internal/transport/tcp/RetryingConnectionHandler.java +++ /dev/null @@ -1,123 +0,0 @@ -/* - * Copyright (C) 2011 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.ros.internal.transport.tcp; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.ExceptionEvent; - -import java.net.ConnectException; -import java.net.SocketAddress; -import java.util.Timer; -import java.util.TimerTask; - -/** - * Automatically reconnects when a {@link Channel} is closed. - * - * @author damonkohler@google.com (Damon Kohler) - */ -public class RetryingConnectionHandler extends AbstractNamedChannelHandler { - - private static final boolean DEBUG = false; - private static final Log log = LogFactory.getLog(RetryingConnectionHandler.class); - - private static final String CONNECTION_REFUSED = "Connection refused"; - - // TODO(damonkohler): Allow the TcpClientConnection to alter the - // reconnect strategy (e.g. binary backoff, faster retries for tests, etc.) - private static final long RECONNECT_DELAY = 1000; - - private final TcpClientConnection tcpClientConnection; - private final Timer timer; - - public RetryingConnectionHandler(TcpClientConnection tcpClientConnection) { - this.tcpClientConnection = tcpClientConnection; - this.timer = new Timer(); - } - - @Override - public String getName() { - return "RetryingConnectionHandler"; - } - - @Override - public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception { - tcpClientConnection.setChannel(null); - if (DEBUG) { - if (tcpClientConnection.isDefunct()) { - log.info("Connection defunct: " + tcpClientConnection.getName()); - } - } - if (tcpClientConnection.isPersistent() && !tcpClientConnection.isDefunct()) { - if (DEBUG) { - log.info("Connection closed, will reconnect: " + tcpClientConnection.getName()); - } - timer.schedule(new TimerTask() { - @Override - public void run() { - SocketAddress remoteAddress = tcpClientConnection.getRemoteAddress(); - if (DEBUG) { - log.info("Reconnecting: " + tcpClientConnection.getName()); - } - ChannelFuture future = - tcpClientConnection.getBootstrap().connect(remoteAddress).awaitUninterruptibly(); - if (future.isSuccess()) { - tcpClientConnection.setChannel(future.getChannel()); - if (DEBUG) { - log.info("Reconnect successful: " + tcpClientConnection.getName()); - } - } else { - if (DEBUG) { - log.error("Reconnect failed: " + tcpClientConnection.getName(), future.getCause()); - } - // TODO(damonkohler): Is there a better way to check for connection - // refused? - if (future.getCause() instanceof ConnectException - && future.getCause().getMessage().equals(CONNECTION_REFUSED)) { - if (DEBUG) { - log.error( - "Connection refused, marking as defunct: " + tcpClientConnection.getName(), - future.getCause()); - } - // TODO(damonkohler): Add a listener so that publishers and - // subscribers can be notified when they lose a connection. - tcpClientConnection.setDefunct(true); - } - } - } - }, RECONNECT_DELAY); - } else { - if (DEBUG) { - log.info("Connection closed, will not reconnect: " + tcpClientConnection.getName()); - } - } - super.channelClosed(ctx, e); - } - - @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception { - if (DEBUG) { - log.error("Connection exception: " + tcpClientConnection.getName(), e.getCause()); - } - e.getChannel().close(); - super.exceptionCaught(ctx, e); - } -} diff --git a/rosjava/src/main/java/org/ros/internal/transport/tcp/TcpClient.java b/rosjava/src/main/java/org/ros/internal/transport/tcp/TcpClient.java index 8985dc6f3dd6614a92563bed0a282f4f0789d282..7b6fc62c03c68a83610c8f79e4e31aeac938f307 100644 --- a/rosjava/src/main/java/org/ros/internal/transport/tcp/TcpClient.java +++ b/rosjava/src/main/java/org/ros/internal/transport/tcp/TcpClient.java @@ -56,8 +56,8 @@ public class TcpClient { private final ChannelBufferFactory channelBufferFactory; private final ClientBootstrap bootstrap; private final List<NamedChannelHandler> namedChannelHandlers; - - private TcpClientConnection tcpClientConnection; + + private Channel channel; public TcpClient(ChannelGroup channelGroup, Executor executor) { this.channelGroup = channelGroup; @@ -86,8 +86,7 @@ public class TcpClient { this.namedChannelHandlers.addAll(namedChannelHandlers); } - public TcpClientConnection connect(String connectionName, SocketAddress socketAddress) { - tcpClientConnection = new TcpClientConnection(connectionName, bootstrap, socketAddress); + public Channel connect(String connectionName, SocketAddress socketAddress) { TcpClientPipelineFactory tcpClientPipelineFactory = new TcpClientPipelineFactory(channelGroup) { @Override public ChannelPipeline getPipeline() { @@ -95,17 +94,13 @@ public class TcpClient { for (NamedChannelHandler namedChannelHandler : namedChannelHandlers) { pipeline.addLast(namedChannelHandler.getName(), namedChannelHandler); } - RetryingConnectionHandler retryingConnectionHandler = - tcpClientConnection.getRetryingConnectionHandler(); - pipeline.addLast(retryingConnectionHandler.getName(), retryingConnectionHandler); return pipeline; } }; bootstrap.setPipelineFactory(tcpClientPipelineFactory); ChannelFuture future = bootstrap.connect(socketAddress).awaitUninterruptibly(); if (future.isSuccess()) { - Channel channel = future.getChannel(); - tcpClientConnection.setChannel(channel); + channel = future.getChannel(); if (DEBUG) { log.info("Connected to: " + socketAddress); } @@ -113,18 +108,12 @@ public class TcpClient { // We expect the first connection to succeed. If not, fail fast. throw new RosRuntimeException("Connection exception: " + socketAddress, future.getCause()); } - return tcpClientConnection; + return channel; } public ChannelFuture write(ChannelBuffer buffer) { - Preconditions.checkNotNull(tcpClientConnection); - return tcpClientConnection.write(buffer); - } - - public void shutdown() { - if (tcpClientConnection != null) { - tcpClientConnection.setPersistent(false); - tcpClientConnection.setChannel(null); - } + Preconditions.checkNotNull(channel); + Preconditions.checkNotNull(buffer); + return channel.write(buffer); } } \ No newline at end of file diff --git a/rosjava/src/main/java/org/ros/internal/transport/tcp/TcpClientConnection.java b/rosjava/src/main/java/org/ros/internal/transport/tcp/TcpClientConnection.java deleted file mode 100644 index 97c4afe5cdc5f685bd9507a4aebe3c91ef1a1264..0000000000000000000000000000000000000000 --- a/rosjava/src/main/java/org/ros/internal/transport/tcp/TcpClientConnection.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Copyright (C) 2011 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.ros.internal.transport.tcp; - -import com.google.common.base.Preconditions; - -import org.jboss.netty.bootstrap.ClientBootstrap; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFuture; - -import java.net.SocketAddress; - -/** - * @author damonkohler@google.com (Damon Kohler) - */ -public class TcpClientConnection { - - private final String name; - private final ClientBootstrap bootstrap; - private final SocketAddress remoteAddress; - private final RetryingConnectionHandler retryingConnectionHandler; - - /** - * {@code true} if this client connection should reconnect when disconnected. - */ - private boolean persistent; - - /** - * {@code true} if this connection is defunct (e.g. received a connection - * refused error) - */ - private boolean defunct; - - /** - * This connection's {@link Channel}. May be {@code null} if we're not - * currently connected. - */ - private Channel channel; - - /** - * @param bootstrap - * the {@link ClientBootstrap} instance to use when reconnecting - * @param remoteAddress - * the {@link SocketAddress} to reconnect to - */ - public TcpClientConnection(String name, ClientBootstrap bootstrap, SocketAddress remoteAddress) { - this.name = name; - this.bootstrap = bootstrap; - this.remoteAddress = remoteAddress; - retryingConnectionHandler = new RetryingConnectionHandler(this); - persistent = true; - defunct = false; - } - - /** - * @return the {@link RetryingConnectionHandler} for this - * {@link TcpClientConnection} - */ - public RetryingConnectionHandler getRetryingConnectionHandler() { - return retryingConnectionHandler; - } - - /** - * @return the {@link ClientBootstrap} instance used to reconnect - */ - public ClientBootstrap getBootstrap() { - return bootstrap; - } - - /** - * @return the {@link SocketAddress} to reconnect to - */ - public SocketAddress getRemoteAddress() { - return remoteAddress; - } - - /** - * @param persistent - * {@code true} if this client connection should reconnect when - * disconnected - */ - public void setPersistent(boolean persistent) { - this.persistent = persistent; - } - - /** - * @return {@code true} if this client connection should reconnect when - * disconnected - */ - public boolean isPersistent() { - return persistent; - } - - /** - * @see Channel#write - */ - public ChannelFuture write(ChannelBuffer buffer) { - Preconditions.checkNotNull(channel, "Not connected."); - return channel.write(buffer); - } - - /** - * @param channel - * this connection's {@link Channel} or {@code null} if we're not - * currently connected - */ - void setChannel(Channel channel) { - this.channel = channel; - } - - /** - * @return the name of this connection (e.g. Subscriber</topic/foo>) - */ - public String getName() { - return name; - } - - /** - * @return {@code true} if this connection is defunct (e.g. received a - * connection refused error) - */ - public boolean isDefunct() { - return defunct; - } - - /** - * @param defunct - * {@code true} if this connection is defunct (e.g. received a - * connection refused error) - */ - public void setDefunct(boolean defunct) { - this.defunct = defunct; - } -} diff --git a/rosjava/src/main/java/org/ros/internal/transport/tcp/TcpClientManager.java b/rosjava/src/main/java/org/ros/internal/transport/tcp/TcpClientManager.java index adad9a229ba152bd30ba353ed65f149c41a00036..b8670b1969cf17a4454aa6e4745672ec78f18a08 100644 --- a/rosjava/src/main/java/org/ros/internal/transport/tcp/TcpClientManager.java +++ b/rosjava/src/main/java/org/ros/internal/transport/tcp/TcpClientManager.java @@ -76,14 +76,11 @@ public class TcpClientManager { * {@link Channel}s. */ public void shutdown() { - for (TcpClient tcpClient : tcpClients) { - tcpClient.shutdown(); - } channelGroup.close().awaitUninterruptibly(); tcpClients.clear(); - // Not calling channelFactory.releaseExternalResources() or - // bootstrap.releaseExternalResources() since only external resources are - // the ExecutorService and control of that must remain with the overall + // We don't call channelFactory.releaseExternalResources() or + // bootstrap.releaseExternalResources() since the only external resource is + // the ExecutorService which must remain in the control of the overall // application. } } diff --git a/rosjava/src/test/java/org/ros/internal/transport/MessageQueueIntegrationTest.java b/rosjava/src/test/java/org/ros/internal/transport/MessageQueueIntegrationTest.java index f33bd4afd58bb2ba08e0aef6c964b557a60a8f9a..da472d2311120516dcd44637b424290cc366c68d 100644 --- a/rosjava/src/test/java/org/ros/internal/transport/MessageQueueIntegrationTest.java +++ b/rosjava/src/test/java/org/ros/internal/transport/MessageQueueIntegrationTest.java @@ -17,7 +17,6 @@ package org.ros.internal.transport; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import org.apache.commons.logging.Log; @@ -31,7 +30,6 @@ import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.SimpleChannelHandler; import org.jboss.netty.channel.group.ChannelGroup; -import org.jboss.netty.channel.group.ChannelGroupFuture; import org.jboss.netty.channel.group.DefaultChannelGroup; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; import org.junit.After; @@ -198,13 +196,6 @@ public class MessageQueueIntegrationTest { assertTrue(secondLatch.await(3, TimeUnit.SECONDS)); } - private void expectNoMessages() throws InterruptedException { - CountDownLatch firstLatch = expectMessage(firstIncomingMessageQueue); - CountDownLatch secondLatch = expectMessage(secondIncomingMessageQueue); - assertFalse(firstLatch.await(3, TimeUnit.SECONDS)); - assertFalse(secondLatch.await(3, TimeUnit.SECONDS)); - } - @Test public void testSendAndReceiveMessage() throws InterruptedException { startRepeatingPublisher(); @@ -261,36 +252,4 @@ public class MessageQueueIntegrationTest { outgoingMessageQueue.shutdown(); outgoingMessageQueue.add(expectedMessage); } - - @Test - public void testReconnect() throws InterruptedException { - startRepeatingPublisher(); - Channel serverChannel = buildServerChannel(); - connect(firstTcpClientManager, serverChannel); - connect(secondTcpClientManager, serverChannel); - expectMessages(); - - // Disconnect the outgoing queue's incoming connections. - ChannelGroupFuture future = outgoingMessageQueue.getChannelGroup().close(); - assertTrue(future.await(1, TimeUnit.SECONDS)); - assertTrue(future.isCompleteSuccess()); - expectMessages(); - - // Disconnect the outgoing queue's incoming connections again to see that - // retries work more than once. - future = outgoingMessageQueue.getChannelGroup().close(); - assertTrue(future.await(1, TimeUnit.SECONDS)); - assertTrue(future.isCompleteSuccess()); - expectMessages(); - - // Shutdown to check that we will not reconnect. - firstTcpClientManager.shutdown(); - secondTcpClientManager.shutdown(); - firstIncomingMessageQueue.shutdown(); - secondIncomingMessageQueue.shutdown(); - future = outgoingMessageQueue.getChannelGroup().close(); - assertTrue(future.await(1, TimeUnit.SECONDS)); - assertTrue(future.isCompleteSuccess()); - expectNoMessages(); - } }