Skip to content
Snippets Groups Projects
Commit 834dbd20 authored by Damon Kohler's avatar Damon Kohler
Browse files

Adds missing CircularBlockingDeque and tests.

parent 6b1d4b0f
No related branches found
No related tags found
No related merge requests found
......@@ -17,22 +17,27 @@
package org.ros.concurrent;
import java.util.Iterator;
import java.util.Queue;
import java.util.NoSuchElementException;
/**
* A {@link Queue}-like data structure that removes the old elements when the
* number of elements exceeds the limit and blocks on {@link #take()} when there
* are no elements available.
* A deque that removes head or tail elements when the number of elements
* exceeds the limit and blocks on {@link #takeFirst()} and {@link #takeLast()} when
* there are no elements available.
*
* @author damonkohler@google.com (Damon Kohler)
*/
public class CircularBlockingQueue<T> implements Iterable<T> {
public class CircularBlockingDeque<T> implements Iterable<T> {
private final T[] queue;
private final T[] deque;
private final Object mutex;
/**
* Points to the next entry that will be returned by {@link #take()} unless
* The maximum number of entries in the queue.
*/
private final int limit;
/**
* Points to the next entry that will be returned by {@link #takeFirst()} unless
* {@link #isEmpty()}.
*/
private int start;
......@@ -42,22 +47,17 @@ public class CircularBlockingQueue<T> implements Iterable<T> {
*/
private int length;
/**
* The maximum number of entries in the queue.
*/
private int limit;
/**
* @param capacity
* the maximum number of elements allowed in the queue
*/
@SuppressWarnings("unchecked")
public CircularBlockingQueue(int capacity) {
queue = (T[]) new Object[capacity];
public CircularBlockingDeque(int capacity) {
deque = (T[]) new Object[capacity];
mutex = new Object();
limit = capacity;
start = 0;
length = 0;
limit = capacity;
}
/**
......@@ -68,9 +68,9 @@ public class CircularBlockingQueue<T> implements Iterable<T> {
* the entry to add
* @return {@code true}
*/
public boolean add(T entry) {
public boolean addLast(T entry) {
synchronized (mutex) {
queue[(start + length) % limit] = entry;
deque[(start + length) % limit] = entry;
if (length == limit) {
start = (start + 1) % limit;
} else {
......@@ -82,18 +82,42 @@ public class CircularBlockingQueue<T> implements Iterable<T> {
}
/**
* Returns the oldest entry in the queue, blocking if necessary until an entry
* is available.
* Adds the specified entry to the tail of the queue, overwriting older
* entries if necessary.
*
* @param entry
* the entry to add
* @return {@code true}
*/
public boolean addFirst(T entry) {
synchronized (mutex) {
if (start - 1 < 0) {
start = limit - 1;
} else {
start--;
}
deque[start] = entry;
if (length < limit) {
length++;
}
mutex.notify();
}
return true;
}
/**
* Retrieves the head of the queue, blocking if necessary until an entry is
* available.
*
* @return the oldest entry
* @return the head of the queue
* @throws InterruptedException
*/
public T take() throws InterruptedException {
public T takeFirst() throws InterruptedException {
T entry;
synchronized (mutex) {
while (true) {
if (length > 0) {
entry = queue[start];
entry = deque[start];
start = (start + 1) % limit;
length--;
break;
......@@ -104,6 +128,58 @@ public class CircularBlockingQueue<T> implements Iterable<T> {
return entry;
}
/**
* Retrieves, but does not remove, the head of this queue, returning
* {@code null} if this queue is empty.
*
* @return the head of this queue, or {@code null} if this queue is empty
*/
public T peekFirst() {
synchronized (mutex) {
if (length > 0) {
return deque[start];
}
return null;
}
}
/**
* Retrieves the tail of the queue, blocking if necessary until an entry is
* available.
*
* @return the tail of the queue
* @throws InterruptedException
*/
public T takeLast() throws InterruptedException {
T entry;
synchronized (mutex) {
while (true) {
if (length > 0) {
entry = deque[(start + length - 1) % limit];
length--;
break;
}
mutex.wait();
}
}
return entry;
}
/**
* Retrieves, but does not remove, the tail of this queue, returning
* {@code null} if this queue is empty.
*
* @return the tail of this queue, or {@code null} if this queue is empty
*/
public T peekLast() {
synchronized (mutex) {
if (length > 0) {
return deque[(start + length - 1) % limit];
}
return null;
}
}
public boolean isEmpty() {
return length == 0;
}
......@@ -128,7 +204,10 @@ public class CircularBlockingQueue<T> implements Iterable<T> {
@Override
public T next() {
T entry = queue[start + offset];
if (offset == length) {
throw new NoSuchElementException();
}
T entry = deque[(start + offset) % limit];
offset++;
return entry;
}
......
......@@ -25,6 +25,7 @@ import org.junit.Before;
import org.junit.Test;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
......@@ -33,7 +34,7 @@ import java.util.concurrent.TimeUnit;
/**
* @author damonkohler@google.com (Damon Kohler)
*/
public class CircularBlockingQueueTest {
public class CircularBlockingDequeTest {
private ExecutorService executorService;
......@@ -44,52 +45,75 @@ public class CircularBlockingQueueTest {
@Test
public void testAddAndTake() throws InterruptedException {
CircularBlockingQueue<String> queue = new CircularBlockingQueue<String>(10);
CircularBlockingDeque<String> deque = new CircularBlockingDeque<String>(10);
String expectedString1 = "Hello, world!";
String expectedString2 = "Goodbye, world!";
queue.add(expectedString1);
queue.add(expectedString2);
assertEquals(expectedString1, queue.take());
assertEquals(expectedString2, queue.take());
deque.addLast(expectedString1);
deque.addLast(expectedString2);
assertEquals(expectedString1, deque.takeFirst());
assertEquals(expectedString2, deque.takeFirst());
}
@Test
public void testAddFirstAndTakeLast() throws InterruptedException {
CircularBlockingDeque<String> deque = new CircularBlockingDeque<String>(10);
String expectedString1 = "Hello, world!";
String expectedString2 = "Goodbye, world!";
deque.addLast(expectedString1);
deque.addLast(expectedString2);
assertEquals(expectedString1, deque.peekFirst());
assertEquals(expectedString2, deque.takeLast());
deque.addFirst(expectedString2);
assertEquals(expectedString1, deque.peekLast());
assertEquals(expectedString2, deque.takeFirst());
}
@Test
public void testOverwrite() throws InterruptedException {
CircularBlockingQueue<String> queue = new CircularBlockingQueue<String>(2);
CircularBlockingDeque<String> deque = new CircularBlockingDeque<String>(2);
String expectedString = "Hello, world!";
queue.add("overwritten");
queue.add(expectedString);
queue.add("foo");
assertEquals(expectedString, queue.take());
deque.addLast("overwritten");
deque.addLast(expectedString);
deque.addLast("foo");
assertEquals(expectedString, deque.takeFirst());
}
@Test
public void testIterator() throws InterruptedException {
CircularBlockingQueue<String> queue = new CircularBlockingQueue<String>(10);
// We keep the queue short and throw in an unused element so that the deque
// wraps around the backing array.
CircularBlockingDeque<String> deque = new CircularBlockingDeque<String>(2);
deque.addLast("unused");
String expectedString1 = "Hello, world!";
String expectedString2 = "Goodbye, world!";
queue.add(expectedString1);
queue.add(expectedString2);
Iterator<String> iterator = queue.iterator();
deque.addLast(expectedString1);
deque.addLast(expectedString2);
Iterator<String> iterator = deque.iterator();
assertEquals(expectedString1, iterator.next());
assertEquals(expectedString2, iterator.next());
assertFalse(iterator.hasNext());
queue.take();
iterator = queue.iterator();
try {
iterator.next();
fail();
} catch (NoSuchElementException e) {
// next() should throw an exception if there is no next element.
}
deque.takeFirst();
iterator = deque.iterator();
assertEquals(expectedString2, iterator.next());
assertFalse(iterator.hasNext());
}
@Test
public void testBlockingTake() throws InterruptedException {
final CircularBlockingQueue<String> queue = new CircularBlockingQueue<String>(1);
final CircularBlockingDeque<String> deque = new CircularBlockingDeque<String>(1);
final String expectedString = "Hello, world!";
final CountDownLatch latch = new CountDownLatch(1);
executorService.execute(new Runnable() {
@Override
public void run() {
try {
assertEquals(expectedString, queue.take());
assertEquals(expectedString, deque.takeFirst());
} catch (InterruptedException e) {
fail();
}
......@@ -98,7 +122,7 @@ public class CircularBlockingQueueTest {
});
// Sleep to ensure we're waiting on take().
Thread.sleep(5);
queue.add(expectedString);
deque.addLast(expectedString);
assertTrue(latch.await(1, TimeUnit.SECONDS));
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment