diff --git a/rosjava/src/main/java/org/ros/concurrent/CircularBlockingDeque.java b/rosjava/src/main/java/org/ros/concurrent/CircularBlockingDeque.java new file mode 100644 index 0000000000000000000000000000000000000000..20a8f26ef3275c92e6037cd108cf8cc6a2472604 --- /dev/null +++ b/rosjava/src/main/java/org/ros/concurrent/CircularBlockingDeque.java @@ -0,0 +1,221 @@ +/* + * Copyright (C) 2012 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.ros.concurrent; + +import java.util.Iterator; +import java.util.NoSuchElementException; + +/** + * A deque that removes head or tail elements when the number of elements + * exceeds the limit and blocks on {@link #takeFirst()} and {@link #takeLast()} when + * there are no elements available. + * + * @author damonkohler@google.com (Damon Kohler) + */ +public class CircularBlockingDeque<T> implements Iterable<T> { + + private final T[] deque; + private final Object mutex; + + /** + * The maximum number of entries in the queue. + */ + private final int limit; + + /** + * Points to the next entry that will be returned by {@link #takeFirst()} unless + * {@link #isEmpty()}. + */ + private int start; + + /** + * The number of entries in the queue. + */ + private int length; + + /** + * @param capacity + * the maximum number of elements allowed in the queue + */ + @SuppressWarnings("unchecked") + public CircularBlockingDeque(int capacity) { + deque = (T[]) new Object[capacity]; + mutex = new Object(); + limit = capacity; + start = 0; + length = 0; + } + + /** + * Adds the specified entry to the tail of the queue, overwriting older + * entries if necessary. + * + * @param entry + * the entry to add + * @return {@code true} + */ + public boolean addLast(T entry) { + synchronized (mutex) { + deque[(start + length) % limit] = entry; + if (length == limit) { + start = (start + 1) % limit; + } else { + length++; + } + mutex.notify(); + } + return true; + } + + /** + * Adds the specified entry to the tail of the queue, overwriting older + * entries if necessary. + * + * @param entry + * the entry to add + * @return {@code true} + */ + public boolean addFirst(T entry) { + synchronized (mutex) { + if (start - 1 < 0) { + start = limit - 1; + } else { + start--; + } + deque[start] = entry; + if (length < limit) { + length++; + } + mutex.notify(); + } + return true; + } + + /** + * Retrieves the head of the queue, blocking if necessary until an entry is + * available. + * + * @return the head of the queue + * @throws InterruptedException + */ + public T takeFirst() throws InterruptedException { + T entry; + synchronized (mutex) { + while (true) { + if (length > 0) { + entry = deque[start]; + start = (start + 1) % limit; + length--; + break; + } + mutex.wait(); + } + } + return entry; + } + + /** + * Retrieves, but does not remove, the head of this queue, returning + * {@code null} if this queue is empty. + * + * @return the head of this queue, or {@code null} if this queue is empty + */ + public T peekFirst() { + synchronized (mutex) { + if (length > 0) { + return deque[start]; + } + return null; + } + } + + /** + * Retrieves the tail of the queue, blocking if necessary until an entry is + * available. + * + * @return the tail of the queue + * @throws InterruptedException + */ + public T takeLast() throws InterruptedException { + T entry; + synchronized (mutex) { + while (true) { + if (length > 0) { + entry = deque[(start + length - 1) % limit]; + length--; + break; + } + mutex.wait(); + } + } + return entry; + } + + /** + * Retrieves, but does not remove, the tail of this queue, returning + * {@code null} if this queue is empty. + * + * @return the tail of this queue, or {@code null} if this queue is empty + */ + public T peekLast() { + synchronized (mutex) { + if (length > 0) { + return deque[(start + length - 1) % limit]; + } + return null; + } + } + + public boolean isEmpty() { + return length == 0; + } + + /** + * Returns an iterator over the queue. + * <p> + * Note that this is not thread-safe and that {@link Iterator#remove()} is + * unsupported. + * + * @see java.lang.Iterable#iterator() + */ + @Override + public Iterator<T> iterator() { + return new Iterator<T>() { + int offset = 0; + + @Override + public boolean hasNext() { + return offset < length; + } + + @Override + public T next() { + if (offset == length) { + throw new NoSuchElementException(); + } + T entry = deque[(start + offset) % limit]; + offset++; + return entry; + } + + @Override + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } +} diff --git a/rosjava/src/main/java/org/ros/concurrent/CircularBlockingQueue.java b/rosjava/src/main/java/org/ros/concurrent/CircularBlockingQueue.java deleted file mode 100644 index ae011a260bd6140f0eefe46f514172f4b9fede26..0000000000000000000000000000000000000000 --- a/rosjava/src/main/java/org/ros/concurrent/CircularBlockingQueue.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Copyright (C) 2012 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package org.ros.concurrent; - -import java.util.Iterator; -import java.util.Queue; - -/** - * A {@link Queue}-like data structure that removes the old elements when the - * number of elements exceeds the limit and blocks on {@link #take()} when there - * are no elements available. - * - * @author damonkohler@google.com (Damon Kohler) - */ -public class CircularBlockingQueue<T> implements Iterable<T> { - - private final T[] queue; - private final Object mutex; - - /** - * Points to the next entry that will be returned by {@link #take()} unless - * {@link #isEmpty()}. - */ - private int start; - - /** - * The number of entries in the queue. - */ - private int length; - - /** - * The maximum number of entries in the queue. - */ - private int limit; - - /** - * @param capacity - * the maximum number of elements allowed in the queue - */ - @SuppressWarnings("unchecked") - public CircularBlockingQueue(int capacity) { - queue = (T[]) new Object[capacity]; - mutex = new Object(); - start = 0; - length = 0; - limit = capacity; - } - - /** - * Adds the specified entry to the tail of the queue, overwriting older - * entries if necessary. - * - * @param entry - * the entry to add - * @return {@code true} - */ - public boolean add(T entry) { - synchronized (mutex) { - queue[(start + length) % limit] = entry; - if (length == limit) { - start = (start + 1) % limit; - } else { - length++; - } - mutex.notify(); - } - return true; - } - - /** - * Returns the oldest entry in the queue, blocking if necessary until an entry - * is available. - * - * @return the oldest entry - * @throws InterruptedException - */ - public T take() throws InterruptedException { - T entry; - synchronized (mutex) { - while (true) { - if (length > 0) { - entry = queue[start]; - start = (start + 1) % limit; - length--; - break; - } - mutex.wait(); - } - } - return entry; - } - - public boolean isEmpty() { - return length == 0; - } - - /** - * Returns an iterator over the queue. - * <p> - * Note that this is not thread-safe and that {@link Iterator#remove()} is - * unsupported. - * - * @see java.lang.Iterable#iterator() - */ - @Override - public Iterator<T> iterator() { - return new Iterator<T>() { - int offset = 0; - - @Override - public boolean hasNext() { - return offset < length; - } - - @Override - public T next() { - T entry = queue[start + offset]; - offset++; - return entry; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - }; - } -} diff --git a/rosjava/src/test/java/org/ros/concurrent/CircularBlockingQueueTest.java b/rosjava/src/test/java/org/ros/concurrent/CircularBlockingDequeTest.java similarity index 56% rename from rosjava/src/test/java/org/ros/concurrent/CircularBlockingQueueTest.java rename to rosjava/src/test/java/org/ros/concurrent/CircularBlockingDequeTest.java index d8f973fe43952b233c48cd0ba9eb2f12495a8ea7..27a12618b1a983dae942f1431e9a453cebf64fd5 100644 --- a/rosjava/src/test/java/org/ros/concurrent/CircularBlockingQueueTest.java +++ b/rosjava/src/test/java/org/ros/concurrent/CircularBlockingDequeTest.java @@ -25,6 +25,7 @@ import org.junit.Before; import org.junit.Test; import java.util.Iterator; +import java.util.NoSuchElementException; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -33,7 +34,7 @@ import java.util.concurrent.TimeUnit; /** * @author damonkohler@google.com (Damon Kohler) */ -public class CircularBlockingQueueTest { +public class CircularBlockingDequeTest { private ExecutorService executorService; @@ -44,52 +45,75 @@ public class CircularBlockingQueueTest { @Test public void testAddAndTake() throws InterruptedException { - CircularBlockingQueue<String> queue = new CircularBlockingQueue<String>(10); + CircularBlockingDeque<String> deque = new CircularBlockingDeque<String>(10); String expectedString1 = "Hello, world!"; String expectedString2 = "Goodbye, world!"; - queue.add(expectedString1); - queue.add(expectedString2); - assertEquals(expectedString1, queue.take()); - assertEquals(expectedString2, queue.take()); + deque.addLast(expectedString1); + deque.addLast(expectedString2); + assertEquals(expectedString1, deque.takeFirst()); + assertEquals(expectedString2, deque.takeFirst()); + } + + @Test + public void testAddFirstAndTakeLast() throws InterruptedException { + CircularBlockingDeque<String> deque = new CircularBlockingDeque<String>(10); + String expectedString1 = "Hello, world!"; + String expectedString2 = "Goodbye, world!"; + deque.addLast(expectedString1); + deque.addLast(expectedString2); + assertEquals(expectedString1, deque.peekFirst()); + assertEquals(expectedString2, deque.takeLast()); + deque.addFirst(expectedString2); + assertEquals(expectedString1, deque.peekLast()); + assertEquals(expectedString2, deque.takeFirst()); } @Test public void testOverwrite() throws InterruptedException { - CircularBlockingQueue<String> queue = new CircularBlockingQueue<String>(2); + CircularBlockingDeque<String> deque = new CircularBlockingDeque<String>(2); String expectedString = "Hello, world!"; - queue.add("overwritten"); - queue.add(expectedString); - queue.add("foo"); - assertEquals(expectedString, queue.take()); + deque.addLast("overwritten"); + deque.addLast(expectedString); + deque.addLast("foo"); + assertEquals(expectedString, deque.takeFirst()); } @Test public void testIterator() throws InterruptedException { - CircularBlockingQueue<String> queue = new CircularBlockingQueue<String>(10); + // We keep the queue short and throw in an unused element so that the deque + // wraps around the backing array. + CircularBlockingDeque<String> deque = new CircularBlockingDeque<String>(2); + deque.addLast("unused"); String expectedString1 = "Hello, world!"; String expectedString2 = "Goodbye, world!"; - queue.add(expectedString1); - queue.add(expectedString2); - Iterator<String> iterator = queue.iterator(); + deque.addLast(expectedString1); + deque.addLast(expectedString2); + Iterator<String> iterator = deque.iterator(); assertEquals(expectedString1, iterator.next()); assertEquals(expectedString2, iterator.next()); assertFalse(iterator.hasNext()); - queue.take(); - iterator = queue.iterator(); + try { + iterator.next(); + fail(); + } catch (NoSuchElementException e) { + // next() should throw an exception if there is no next element. + } + deque.takeFirst(); + iterator = deque.iterator(); assertEquals(expectedString2, iterator.next()); assertFalse(iterator.hasNext()); } @Test public void testBlockingTake() throws InterruptedException { - final CircularBlockingQueue<String> queue = new CircularBlockingQueue<String>(1); + final CircularBlockingDeque<String> deque = new CircularBlockingDeque<String>(1); final String expectedString = "Hello, world!"; final CountDownLatch latch = new CountDownLatch(1); executorService.execute(new Runnable() { @Override public void run() { try { - assertEquals(expectedString, queue.take()); + assertEquals(expectedString, deque.takeFirst()); } catch (InterruptedException e) { fail(); } @@ -98,7 +122,7 @@ public class CircularBlockingQueueTest { }); // Sleep to ensure we're waiting on take(). Thread.sleep(5); - queue.add(expectedString); + deque.addLast(expectedString); assertTrue(latch.await(1, TimeUnit.SECONDS)); } }