From 03225d138543b6aad2174ba9391460e365febf77 Mon Sep 17 00:00:00 2001 From: Juan Ignacio Ubeira <jubeira@ekumenlabs.com> Date: Thu, 8 Jun 2017 13:56:23 -0300 Subject: [PATCH] Adding the capability to remove a messageListener from a subscriber. --- .../org/ros/concurrent/ListenerGroup.java | 8 ++--- .../node/topic/DefaultSubscriber.java | 10 ++++++ .../transport/queue/IncomingMessageQueue.java | 14 +++++++++ .../transport/queue/MessageDispatcher.java | 31 +++++++++++++++++++ .../java/org/ros/node/topic/Subscriber.java | 12 +++++++ 5 files changed, 71 insertions(+), 4 deletions(-) diff --git a/rosjava/src/main/java/org/ros/concurrent/ListenerGroup.java b/rosjava/src/main/java/org/ros/concurrent/ListenerGroup.java index 29a8ccb2..5f562c3b 100644 --- a/rosjava/src/main/java/org/ros/concurrent/ListenerGroup.java +++ b/rosjava/src/main/java/org/ros/concurrent/ListenerGroup.java @@ -16,6 +16,7 @@ package org.ros.concurrent; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import java.util.Collection; @@ -107,11 +108,10 @@ public class ListenerGroup<T> { * @param listener the listener to remove * @return flag indicating successful removal */ - public boolean remove(T listener) - { + public boolean remove(T listener) { + Preconditions.checkNotNull(listener); for (EventDispatcher<T> eventDispatcher : eventDispatchers) { - if(listener.equals(eventDispatcher.getListener())) - { + if (listener.equals(eventDispatcher.getListener())) { eventDispatcher.cancel(); eventDispatchers.remove(eventDispatcher); return true; diff --git a/rosjava/src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java b/rosjava/src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java index 6e69293f..7448a293 100644 --- a/rosjava/src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java +++ b/rosjava/src/main/java/org/ros/internal/node/topic/DefaultSubscriber.java @@ -139,6 +139,16 @@ public class DefaultSubscriber<T> extends DefaultTopicParticipant implements Sub addMessageListener(messageListener, 1); } + @Override + public boolean removeMessageListener(MessageListener<T> messageListener) { + return incomingMessageQueue.removeListener(messageListener); + } + + @Override + public void removeAllMessageListeners() { + incomingMessageQueue.removeAllListeners(); + } + @VisibleForTesting public void addPublisher(PublisherIdentifier publisherIdentifier, InetSocketAddress address) { synchronized (mutex) { diff --git a/rosjava/src/main/java/org/ros/internal/transport/queue/IncomingMessageQueue.java b/rosjava/src/main/java/org/ros/internal/transport/queue/IncomingMessageQueue.java index d81d1803..bb809b49 100644 --- a/rosjava/src/main/java/org/ros/internal/transport/queue/IncomingMessageQueue.java +++ b/rosjava/src/main/java/org/ros/internal/transport/queue/IncomingMessageQueue.java @@ -71,6 +71,20 @@ public class IncomingMessageQueue<T> { messageDispatcher.addListener(messageListener, queueCapacity); } + /** + * @see MessageDispatcher#removeListener(MessageListener) + */ + public boolean removeListener(MessageListener<T> messageListener) { + return messageDispatcher.removeListener(messageListener); + } + + /** + * @see MessageDispatcher#removeAllListeners() + */ + public void removeAllListeners() { + messageDispatcher.removeAllListeners(); + } + public void shutdown() { messageDispatcher.cancel(); } diff --git a/rosjava/src/main/java/org/ros/internal/transport/queue/MessageDispatcher.java b/rosjava/src/main/java/org/ros/internal/transport/queue/MessageDispatcher.java index b44bf398..fd132487 100644 --- a/rosjava/src/main/java/org/ros/internal/transport/queue/MessageDispatcher.java +++ b/rosjava/src/main/java/org/ros/internal/transport/queue/MessageDispatcher.java @@ -79,6 +79,37 @@ public class MessageDispatcher<T> extends CancellableLoop { } } + /** + * Removes the specified {@link MessageListener} from the internal + * {@link ListenerGroup}. + * @param messageListener {@link MessageListener} to remove. + * @return True if the listener was removed, false if it wasn't registered before. + * + * @see ListenerGroup#remove(Object) + */ + public boolean removeListener(MessageListener<T> messageListener) { + if (DEBUG) { + log.info("Removing listener."); + } + synchronized (mutex) { + return messageListeners.remove(messageListener); + } + } + + /** + * Removes all the registered {@link MessageListener}s. + * + * @see ListenerGroup#shutdown() + */ + public void removeAllListeners() { + if (DEBUG) { + log.info("Removing all listeners."); + } + synchronized (mutex) { + messageListeners.shutdown(); + } + } + /** * Returns a newly allocated {@link SignalRunnable} for the specified * {@link LazyMessage}. diff --git a/rosjava/src/main/java/org/ros/node/topic/Subscriber.java b/rosjava/src/main/java/org/ros/node/topic/Subscriber.java index 4dc46740..ed339f6a 100644 --- a/rosjava/src/main/java/org/ros/node/topic/Subscriber.java +++ b/rosjava/src/main/java/org/ros/node/topic/Subscriber.java @@ -61,6 +61,18 @@ public interface Subscriber<T> extends TopicParticipant { */ void addMessageListener(MessageListener<T> messageListener); + /** + * Removes a previously added {@link MessageListener}. + * @param messageListener {@link MessageListener} to remove. + * @return True if the listener was removed, false if it wasn't registered before. + */ + boolean removeMessageListener(MessageListener<T> messageListener); + + /** + * Removes all registered {@link MessageListener}s. + */ + void removeAllMessageListeners(); + /** * Shuts down and unregisters the {@link Subscriber}. using the default * timeout Shutdown is delayed by at most the specified timeout to allow -- GitLab