diff --git a/rosjava/src/main/java/org/ros/internal/node/topic/DefaultPublisher.java b/rosjava/src/main/java/org/ros/internal/node/topic/DefaultPublisher.java index c69ef3dd3e367e55f52571e019aa10cd9488157b..19769de84718a926543de5fc767f7e2af58211ec 100644 --- a/rosjava/src/main/java/org/ros/internal/node/topic/DefaultPublisher.java +++ b/rosjava/src/main/java/org/ros/internal/node/topic/DefaultPublisher.java @@ -35,6 +35,7 @@ import org.ros.node.topic.Publisher; import org.ros.node.topic.PublisherListener; import org.ros.node.topic.Subscriber; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -63,6 +64,7 @@ public class DefaultPublisher<T> extends DefaultTopicParticipant implements Publ private final ListenerGroup<PublisherListener<T>> listeners; private final NodeIdentifier nodeIdentifier; private final MessageFactory messageFactory; + private CountDownLatch shutdownLatch; public DefaultPublisher(NodeIdentifier nodeIdentifier, TopicDeclaration topicDeclaration, MessageSerializer<T> serializer, MessageFactory messageFactory, @@ -105,9 +107,20 @@ public class DefaultPublisher<T> extends DefaultTopicParticipant implements Publ return outgoingMessageQueue.getLatchMode(); } + /** + * Sends shutdown signals and awaits for them to be received by + * {@link DefaultPublisher#signalOnMasterUnregistrationSuccess()} or + * {@link DefaultPublisher#signalOnMasterUnregistrationFailure()} before continuing shutdown + */ @Override public void shutdown(long timeout, TimeUnit unit) { + shutdownLatch = new CountDownLatch(listeners.size()); signalOnShutdown(timeout, unit); + try { + shutdownLatch.await(timeout, unit); + } catch (InterruptedException e) { + log.error(e.getMessage(), e); + } outgoingMessageQueue.shutdown(); listeners.shutdown(); } @@ -252,6 +265,7 @@ public class DefaultPublisher<T> extends DefaultTopicParticipant implements Publ @Override public void run(PublisherListener<T> listener) { listener.onMasterUnregistrationSuccess(publisher); + shutdownLatch.countDown(); } }); } @@ -269,6 +283,7 @@ public class DefaultPublisher<T> extends DefaultTopicParticipant implements Publ @Override public void run(PublisherListener<T> listener) { listener.onMasterUnregistrationFailure(publisher); + shutdownLatch.countDown(); } }); } diff --git a/rosjava/src/test/java/org/ros/internal/node/MasterRegistrationTest.java b/rosjava/src/test/java/org/ros/internal/node/MasterRegistrationTest.java index 4705d6ef16585e6d0363599ca9d176cd95d2adc0..5b39ebcb8b9d5427a2e1a93f7d86ba7f442902b0 100644 --- a/rosjava/src/test/java/org/ros/internal/node/MasterRegistrationTest.java +++ b/rosjava/src/test/java/org/ros/internal/node/MasterRegistrationTest.java @@ -73,4 +73,25 @@ public class MasterRegistrationTest extends RosTest { publisher.shutdown(); assertTrue(publisherListener.awaitMasterUnregistrationSuccess(1, TimeUnit.SECONDS)); } + + @Test + public void testUnregisterPublisherFailure() throws InterruptedException { + publisherListener = CountDownPublisherListener.newDefault(); + nodeMainExecutor.execute(new AbstractNodeMain() { + @Override + public GraphName getDefaultNodeName() { + return GraphName.of("node"); + } + + @Override + public void onStart(ConnectedNode connectedNode) { + publisher = connectedNode.newPublisher("topic", std_msgs.String._TYPE); + publisher.addListener(publisherListener); + } + }, nodeConfiguration); + assertTrue(publisherListener.awaitMasterRegistrationSuccess(1, TimeUnit.SECONDS)); + rosCore.shutdown(); + publisher.shutdown(); + assertTrue(publisherListener.awaitMasterUnregistrationFailure(6, TimeUnit.SECONDS)); + } }