Skip to content
Snippets Groups Projects
Commit 46a9300b authored by Julian Cerruti's avatar Julian Cerruti Committed by GitHub
Browse files

Merge pull request #245 from jubeira/dev/add_subscriber_listener_removal

Adding the capability to remove a MessageListener from a subscriber.
parents deaeafa7 03225d13
Branches
Tags
No related merge requests found
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
package org.ros.concurrent; package org.ros.concurrent;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import java.util.Collection; import java.util.Collection;
...@@ -107,11 +108,10 @@ public class ListenerGroup<T> { ...@@ -107,11 +108,10 @@ public class ListenerGroup<T> {
* @param listener the listener to remove * @param listener the listener to remove
* @return flag indicating successful removal * @return flag indicating successful removal
*/ */
public boolean remove(T listener) public boolean remove(T listener) {
{ Preconditions.checkNotNull(listener);
for (EventDispatcher<T> eventDispatcher : eventDispatchers) { for (EventDispatcher<T> eventDispatcher : eventDispatchers) {
if(listener.equals(eventDispatcher.getListener())) if (listener.equals(eventDispatcher.getListener())) {
{
eventDispatcher.cancel(); eventDispatcher.cancel();
eventDispatchers.remove(eventDispatcher); eventDispatchers.remove(eventDispatcher);
return true; return true;
......
...@@ -139,6 +139,16 @@ public class DefaultSubscriber<T> extends DefaultTopicParticipant implements Sub ...@@ -139,6 +139,16 @@ public class DefaultSubscriber<T> extends DefaultTopicParticipant implements Sub
addMessageListener(messageListener, 1); addMessageListener(messageListener, 1);
} }
@Override
public boolean removeMessageListener(MessageListener<T> messageListener) {
return incomingMessageQueue.removeListener(messageListener);
}
@Override
public void removeAllMessageListeners() {
incomingMessageQueue.removeAllListeners();
}
@VisibleForTesting @VisibleForTesting
public void addPublisher(PublisherIdentifier publisherIdentifier, InetSocketAddress address) { public void addPublisher(PublisherIdentifier publisherIdentifier, InetSocketAddress address) {
synchronized (mutex) { synchronized (mutex) {
......
...@@ -71,6 +71,20 @@ public class IncomingMessageQueue<T> { ...@@ -71,6 +71,20 @@ public class IncomingMessageQueue<T> {
messageDispatcher.addListener(messageListener, queueCapacity); messageDispatcher.addListener(messageListener, queueCapacity);
} }
/**
* @see MessageDispatcher#removeListener(MessageListener)
*/
public boolean removeListener(MessageListener<T> messageListener) {
return messageDispatcher.removeListener(messageListener);
}
/**
* @see MessageDispatcher#removeAllListeners()
*/
public void removeAllListeners() {
messageDispatcher.removeAllListeners();
}
public void shutdown() { public void shutdown() {
messageDispatcher.cancel(); messageDispatcher.cancel();
} }
......
...@@ -79,6 +79,37 @@ public class MessageDispatcher<T> extends CancellableLoop { ...@@ -79,6 +79,37 @@ public class MessageDispatcher<T> extends CancellableLoop {
} }
} }
/**
* Removes the specified {@link MessageListener} from the internal
* {@link ListenerGroup}.
* @param messageListener {@link MessageListener} to remove.
* @return True if the listener was removed, false if it wasn't registered before.
*
* @see ListenerGroup#remove(Object)
*/
public boolean removeListener(MessageListener<T> messageListener) {
if (DEBUG) {
log.info("Removing listener.");
}
synchronized (mutex) {
return messageListeners.remove(messageListener);
}
}
/**
* Removes all the registered {@link MessageListener}s.
*
* @see ListenerGroup#shutdown()
*/
public void removeAllListeners() {
if (DEBUG) {
log.info("Removing all listeners.");
}
synchronized (mutex) {
messageListeners.shutdown();
}
}
/** /**
* Returns a newly allocated {@link SignalRunnable} for the specified * Returns a newly allocated {@link SignalRunnable} for the specified
* {@link LazyMessage}. * {@link LazyMessage}.
......
...@@ -61,6 +61,18 @@ public interface Subscriber<T> extends TopicParticipant { ...@@ -61,6 +61,18 @@ public interface Subscriber<T> extends TopicParticipant {
*/ */
void addMessageListener(MessageListener<T> messageListener); void addMessageListener(MessageListener<T> messageListener);
/**
* Removes a previously added {@link MessageListener}.
* @param messageListener {@link MessageListener} to remove.
* @return True if the listener was removed, false if it wasn't registered before.
*/
boolean removeMessageListener(MessageListener<T> messageListener);
/**
* Removes all registered {@link MessageListener}s.
*/
void removeAllMessageListeners();
/** /**
* Shuts down and unregisters the {@link Subscriber}. using the default * Shuts down and unregisters the {@link Subscriber}. using the default
* timeout Shutdown is delayed by at most the specified timeout to allow * timeout Shutdown is delayed by at most the specified timeout to allow
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment