diff --git a/rosjava/src/main/java/org/ros/concurrent/ListenerGroup.java b/rosjava/src/main/java/org/ros/concurrent/ListenerGroup.java index 29a8ccb2ca79807dd03913549ae2adc34dae7d81..5f562c3bb82614fcbdf5a1d9ccf2eb30c931c41d 100644 --- a/rosjava/src/main/java/org/ros/concurrent/ListenerGroup.java +++ b/rosjava/src/main/java/org/ros/concurrent/ListenerGroup.java @@ -16,6 +16,7 @@ package org.ros.concurrent; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import java.util.Collection; @@ -107,11 +108,10 @@ public class ListenerGroup<T> { * @param listener the listener to remove * @return flag indicating successful removal */ - public boolean remove(T listener) - { + public boolean remove(T listener) { + Preconditions.checkNotNull(listener); for (EventDispatcher<T> eventDispatcher : eventDispatchers) { - if(listener.equals(eventDispatcher.getListener())) - { + if (listener.equals(eventDispatcher.getListener())) { eventDispatcher.cancel(); eventDispatchers.remove(eventDispatcher); return true; diff --git a/rosjava/src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java b/rosjava/src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java index 6e69293ff58e374e03738016d2f2568267a2c4a8..7448a2938defab4d70d10d48714577d7c5f96282 100644 --- a/rosjava/src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java +++ b/rosjava/src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java @@ -139,6 +139,16 @@ public class DefaultSubscriber<T> extends DefaultTopicParticipant implements Sub addMessageListener(messageListener, 1); } + @Override + public boolean removeMessageListener(MessageListener<T> messageListener) { + return incomingMessageQueue.removeListener(messageListener); + } + + @Override + public void removeAllMessageListeners() { + incomingMessageQueue.removeAllListeners(); + } + @VisibleForTesting public void addPublisher(PublisherIdentifier publisherIdentifier, InetSocketAddress address) { synchronized (mutex) { diff --git a/rosjava/src/main/java/org/ros/internal/transport/queue/IncomingMessageQueue.java b/rosjava/src/main/java/org/ros/internal/transport/queue/IncomingMessageQueue.java index d81d1803465901e05a53f966b52a99fab8444692..bb809b492f15f663294271b3897f06d9e7ec5764 100644 --- a/rosjava/src/main/java/org/ros/internal/transport/queue/IncomingMessageQueue.java +++ b/rosjava/src/main/java/org/ros/internal/transport/queue/IncomingMessageQueue.java @@ -71,6 +71,20 @@ public class IncomingMessageQueue<T> { messageDispatcher.addListener(messageListener, queueCapacity); } + /** + * @see MessageDispatcher#removeListener(MessageListener) + */ + public boolean removeListener(MessageListener<T> messageListener) { + return messageDispatcher.removeListener(messageListener); + } + + /** + * @see MessageDispatcher#removeAllListeners() + */ + public void removeAllListeners() { + messageDispatcher.removeAllListeners(); + } + public void shutdown() { messageDispatcher.cancel(); } diff --git a/rosjava/src/main/java/org/ros/internal/transport/queue/MessageDispatcher.java b/rosjava/src/main/java/org/ros/internal/transport/queue/MessageDispatcher.java index b44bf398f254833d2f4a65beec2b2e1ffb2880e1..fd1324877febf53c830cae1ab46eee4f93947e9c 100644 --- a/rosjava/src/main/java/org/ros/internal/transport/queue/MessageDispatcher.java +++ b/rosjava/src/main/java/org/ros/internal/transport/queue/MessageDispatcher.java @@ -79,6 +79,37 @@ public class MessageDispatcher<T> extends CancellableLoop { } } + /** + * Removes the specified {@link MessageListener} from the internal + * {@link ListenerGroup}. + * @param messageListener {@link MessageListener} to remove. + * @return True if the listener was removed, false if it wasn't registered before. + * + * @see ListenerGroup#remove(Object) + */ + public boolean removeListener(MessageListener<T> messageListener) { + if (DEBUG) { + log.info("Removing listener."); + } + synchronized (mutex) { + return messageListeners.remove(messageListener); + } + } + + /** + * Removes all the registered {@link MessageListener}s. + * + * @see ListenerGroup#shutdown() + */ + public void removeAllListeners() { + if (DEBUG) { + log.info("Removing all listeners."); + } + synchronized (mutex) { + messageListeners.shutdown(); + } + } + /** * Returns a newly allocated {@link SignalRunnable} for the specified * {@link LazyMessage}. diff --git a/rosjava/src/main/java/org/ros/node/topic/Subscriber.java b/rosjava/src/main/java/org/ros/node/topic/Subscriber.java index 4dc46740e4274c90092a6d5dfe482472f9e20f6b..ed339f6a3372dd494fb441c0f59932f53b0f922a 100644 --- a/rosjava/src/main/java/org/ros/node/topic/Subscriber.java +++ b/rosjava/src/main/java/org/ros/node/topic/Subscriber.java @@ -61,6 +61,18 @@ public interface Subscriber<T> extends TopicParticipant { */ void addMessageListener(MessageListener<T> messageListener); + /** + * Removes a previously added {@link MessageListener}. + * @param messageListener {@link MessageListener} to remove. + * @return True if the listener was removed, false if it wasn't registered before. + */ + boolean removeMessageListener(MessageListener<T> messageListener); + + /** + * Removes all registered {@link MessageListener}s. + */ + void removeAllMessageListeners(); + /** * Shuts down and unregisters the {@link Subscriber}. using the default * timeout Shutdown is delayed by at most the specified timeout to allow