Skip to content
Snippets Groups Projects
Commit 6b1d4b0f authored by Damon Kohler's avatar Damon Kohler
Browse files

Fixes broken test.

parent 2da352b9
No related branches found
No related tags found
No related merge requests found
...@@ -37,14 +37,14 @@ public class IncomingMessageQueue<T> { ...@@ -37,14 +37,14 @@ public class IncomingMessageQueue<T> {
* {@link IncomingMessageQueue#addListener(MessageListener, int)} which are * {@link IncomingMessageQueue#addListener(MessageListener, int)} which are
* consumed by user provided {@link MessageListener}s. * consumed by user provided {@link MessageListener}s.
*/ */
private static final int QUEUE_CAPACITY = 16; private static final int DEQUE_CAPACITY = 16;
private final MessageReceiver<T> messageReceiver; private final MessageReceiver<T> messageReceiver;
private final MessageDispatcher<T> messageDispatcher; private final MessageDispatcher<T> messageDispatcher;
public IncomingMessageQueue(MessageDeserializer<T> deserializer, ExecutorService executorService) { public IncomingMessageQueue(MessageDeserializer<T> deserializer, ExecutorService executorService) {
CircularBlockingDeque<LazyMessage<T>> lazyMessages = CircularBlockingDeque<LazyMessage<T>> lazyMessages =
new CircularBlockingDeque<LazyMessage<T>>(QUEUE_CAPACITY); new CircularBlockingDeque<LazyMessage<T>>(DEQUE_CAPACITY);
messageReceiver = new MessageReceiver<T>(lazyMessages, deserializer); messageReceiver = new MessageReceiver<T>(lazyMessages, deserializer);
messageDispatcher = new MessageDispatcher<T>(lazyMessages, executorService); messageDispatcher = new MessageDispatcher<T>(lazyMessages, executorService);
executorService.execute(messageDispatcher); executorService.execute(messageDispatcher);
......
...@@ -42,10 +42,10 @@ public class OutgoingMessageQueue<T> { ...@@ -42,10 +42,10 @@ public class OutgoingMessageQueue<T> {
private static final boolean DEBUG = false; private static final boolean DEBUG = false;
private static final Log log = LogFactory.getLog(OutgoingMessageQueue.class); private static final Log log = LogFactory.getLog(OutgoingMessageQueue.class);
private static final int QUEUE_CAPACITY = 16; private static final int DEQUE_CAPACITY = 16;
private final MessageSerializer<T> serializer; private final MessageSerializer<T> serializer;
private final CircularBlockingDeque<T> queue; private final CircularBlockingDeque<T> deque;
private final ChannelGroup channelGroup; private final ChannelGroup channelGroup;
private final Writer writer; private final Writer writer;
private final MessageBufferPool messageBufferPool; private final MessageBufferPool messageBufferPool;
...@@ -57,7 +57,7 @@ public class OutgoingMessageQueue<T> { ...@@ -57,7 +57,7 @@ public class OutgoingMessageQueue<T> {
private final class Writer extends CancellableLoop { private final class Writer extends CancellableLoop {
@Override @Override
public void loop() throws InterruptedException { public void loop() throws InterruptedException {
T message = queue.takeFirst(); T message = deque.takeFirst();
final ChannelBuffer buffer = messageBufferPool.acquire(); final ChannelBuffer buffer = messageBufferPool.acquire();
serializer.serialize(message, buffer); serializer.serialize(message, buffer);
if (DEBUG) { if (DEBUG) {
...@@ -79,7 +79,7 @@ public class OutgoingMessageQueue<T> { ...@@ -79,7 +79,7 @@ public class OutgoingMessageQueue<T> {
public OutgoingMessageQueue(MessageSerializer<T> serializer, ExecutorService executorService) { public OutgoingMessageQueue(MessageSerializer<T> serializer, ExecutorService executorService) {
this.serializer = serializer; this.serializer = serializer;
queue = new CircularBlockingDeque<T>(QUEUE_CAPACITY); deque = new CircularBlockingDeque<T>(DEQUE_CAPACITY);
channelGroup = new DefaultChannelGroup(); channelGroup = new DefaultChannelGroup();
writer = new Writer(); writer = new Writer();
messageBufferPool = new MessageBufferPool(); messageBufferPool = new MessageBufferPool();
...@@ -101,7 +101,7 @@ public class OutgoingMessageQueue<T> { ...@@ -101,7 +101,7 @@ public class OutgoingMessageQueue<T> {
* the message to add to the queue * the message to add to the queue
*/ */
public void add(T message) { public void add(T message) {
queue.addLast(message); deque.addLast(message);
setLatchedMessage(message); setLatchedMessage(message);
} }
......
...@@ -75,7 +75,7 @@ public class NameResolver { ...@@ -75,7 +75,7 @@ public class NameResolver {
public GraphName resolve(GraphName namespace, GraphName name) { public GraphName resolve(GraphName namespace, GraphName name) {
GraphName remappedNamespace = lookUpRemapping(namespace); GraphName remappedNamespace = lookUpRemapping(namespace);
if (!remappedNamespace.isGlobal()) { if (!remappedNamespace.isGlobal()) {
throw new IllegalStateException(String.format( throw new IllegalArgumentException(String.format(
"Namespace %s (remapped from %s) must be global.", remappedNamespace, namespace)); "Namespace %s (remapped from %s) must be global.", remappedNamespace, namespace));
} }
GraphName remappedName = lookUpRemapping(name); GraphName remappedName = lookUpRemapping(name);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment