Skip to content
Snippets Groups Projects
Commit aa945669 authored by Damon Kohler's avatar Damon Kohler
Browse files

Added onShutdownComplete() to NodeListener.

Added execute() to Node and removed isRunning() since it should no longer be needed.
Changed the behavior of onShutdown(). It's no called before shutting down the node and gives listeners a chance to cleanup using node resources.
parent 38ef3460
No related branches found
No related tags found
No related merge requests found
Showing
with 173 additions and 74 deletions
......@@ -49,6 +49,10 @@ public class RunFibonacciSimpleActionClient {
public void onShutdown(Node node) {
}
@Override
public void onShutdownComplete(Node node) {
}
}, configuration);
System.out.println("[Test] Waiting for action server to start");
......@@ -124,6 +128,10 @@ public class RunFibonacciSimpleActionClient {
public void onShutdown(Node node) {
}
@Override
public void onShutdownComplete(Node node) {
}
}, configuration);
System.out.println("[Test] Waiting for action server to start");
......
......@@ -39,6 +39,10 @@ public class RunFibonacciSimpleActionServer {
public void onShutdown(Node node) {
}
@Override
public void onShutdownComplete(Node node) {
}
}, configuration);
} catch (Exception e) {
// TODO Auto-generated catch block
......
......@@ -17,9 +17,9 @@
package org.ros.internal.node;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log;
import org.ros.concurrent.CancellableLoop;
import org.ros.exception.RemoteException;
import org.ros.exception.ServiceNotFoundException;
import org.ros.internal.message.new_style.ServiceMessageDefinition;
......@@ -66,7 +66,9 @@ import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Collection;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
/**
* The default implementation of a {@link Node}.
......@@ -79,6 +81,14 @@ public class DefaultNode implements Node {
private static final boolean DEBUG = false;
/**
* The maximum delay before shutdown will begin even if all
* {@link NodeListener}s have not yet returned from their
* {@link NodeListener#onShutdown(Node)} callback.
*/
private static final int MAX_SHUTDOWN_DELAY_DURATION = 5;
private static final TimeUnit MAX_SHUTDOWN_DELAY_UNITS = TimeUnit.SECONDS;
private final GraphName nodeName;
private final NodeConfiguration nodeConfiguration;
private final NodeNameResolver resolver;
......@@ -95,7 +105,7 @@ public class DefaultNode implements Node {
private final URI masterUri;
/**
* Use for all thread creation.
* Used for all thread creation.
*/
private final ExecutorService executorService;
......@@ -104,11 +114,6 @@ public class DefaultNode implements Node {
*/
private final Collection<NodeListener> nodeListeners;
/**
* {@code true} if the node is in a running state, {@code false} otherwise.
*/
private boolean running;
/**
* {@link DefaultNode}s should only be constructed using the
* {@link DefaultNodeFactory}.
......@@ -154,7 +159,6 @@ public class DefaultNode implements Node {
// initialized with the SlaveServer's SlaveIdentifier so that it can
// register the /rosout Publisher.
log = new RosoutLogger(this);
running = true;
signalOnStart();
}
......@@ -372,18 +376,12 @@ public class DefaultNode implements Node {
return resolver.resolve(new GraphName(name));
}
@Override
public boolean isRunning() {
return running;
}
@Override
public void shutdown() {
Preconditions.checkState(running == true, "Not running.");
signalOnShutdown();
// NOTE(damonkohler): We don't want to raise potentially spurious
// exceptions during shutdown that would interrupt the process. This is
// simply best effort cleanup.
running = false;
slaveServer.shutdown();
registrar.shutdown();
for (Publisher<?> publisher : topicManager.getPublishers()) {
......@@ -436,7 +434,7 @@ public class DefaultNode implements Node {
for (ServiceClient<?, ?> serviceClient : serviceManager.getClients()) {
serviceClient.shutdown();
}
signalOnShutdown();
signalOnShutdownComplete();
}
@Override
......@@ -499,18 +497,45 @@ public class DefaultNode implements Node {
}
/**
* Signal all {@link NodeListener}s that the {@link Node} has shut down.
* Signal all {@link NodeListener}s that the {@link Node} has started shutting
* down.
*
* <p>
* Each listener is called in a separate thread.
*/
private void signalOnShutdown() {
final Node node = this;
final CountDownLatch latch = new CountDownLatch(nodeListeners.size());
for (final NodeListener listener : nodeListeners) {
executorService.execute(new Runnable() {
@Override
public void run() {
listener.onShutdown(node);
latch.countDown();
}
});
}
try {
latch.await(MAX_SHUTDOWN_DELAY_DURATION, MAX_SHUTDOWN_DELAY_UNITS);
} catch (InterruptedException e) {
// Ignored since we do not guarantee that all listeners will finish before
// shutdown begins.
}
}
/**
* Signal all {@link NodeListener}s that the {@link Node} has shut down.
*
* <p>
* Each listener is called in a separate thread.
*/
private void signalOnShutdownComplete() {
final Node node = this;
for (final NodeListener listener : nodeListeners) {
executorService.execute(new Runnable() {
@Override
public void run() {
listener.onShutdownComplete(node);
}
});
}
......@@ -520,4 +545,23 @@ public class DefaultNode implements Node {
InetSocketAddress getAddress() {
return slaveServer.getAddress();
}
@Override
public void execute(final CancellableLoop cancellableLoop) {
executorService.execute(cancellableLoop);
addListener(new NodeListener() {
@Override
public void onStart(Node node) {
}
@Override
public void onShutdown(Node node) {
cancellableLoop.cancel();
}
@Override
public void onShutdownComplete(Node node) {
}
});
}
}
......@@ -188,5 +188,9 @@ public class DefaultNodeRunner implements NodeRunner {
public void onShutdown(Node node) {
unregisterNode(node);
}
@Override
public void onShutdownComplete(Node node) {
}
}
}
......@@ -17,6 +17,7 @@
package org.ros.node;
import org.apache.commons.logging.Log;
import org.ros.concurrent.CancellableLoop;
import org.ros.exception.ServiceNotFoundException;
import org.ros.internal.node.service.ServiceResponseBuilder;
import org.ros.internal.node.xmlrpc.Master;
......@@ -39,6 +40,7 @@ import org.ros.node.topic.SubscriberListener;
import java.net.URI;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
/**
* A node in the ROS graph.
......@@ -308,17 +310,6 @@ public interface Node {
*/
ParameterTree newParameterTree();
/**
* Is the node running?
*
* <p>
* A running node may not be fully initialized yet, it is either in the
* process of starting up or is running.
*
* @return True if the node is running, false otherwise.
*/
boolean isRunning();
/**
* @return the {@link MessageSerializationFactory} used by this node
*/
......@@ -347,4 +338,18 @@ public interface Node {
* the {@link NodeListener} to remove
*/
void removeListener(NodeListener listener);
/**
* Executes a {@link CancellableLoop} using the {@link Node}'s
* {@link ExecutorService}. The {@link CancellableLoop} will be canceled when
* the {@link Node} starts shutting down.
*
* <p>
* Any blocking calls executed in the provided {@link CancellableLoop} can
* potentially delay {@link Node} shutdown and should be avoided.
*
* @param cancellableLoop
* the {@link CancellableLoop} to execute
*/
void execute(CancellableLoop cancellableLoop);
}
......@@ -32,10 +32,25 @@ public interface NodeListener {
void onStart(Node node);
/**
* Called when the {@link Node} has been shut down.
* Called when the {@link Node} has started shutting down. Shutdown will be
* delayed, although not indefinitely, until all {@link NodeListener}s have
* returned from this method.
*
* <p>
* Since this method can potentially delay {@link Node} shutdown, it is
* preferred to use {@link #onShutdownComplete(Node)} when {@link Node}
* resources are not required during the method call.
*
* @param node
* the {@link Node} that has been shut down
* the {@link Node} that has started shutting down
*/
void onShutdown(Node node);
/**
* Called when the {@link Node} has shut down.
*
* @param node
* the {@link Node} that has shut down
*/
void onShutdownComplete(Node node);
}
......@@ -17,6 +17,7 @@
package org.ros;
import org.apache.commons.logging.Log;
import org.ros.concurrent.CancellableLoop;
import org.ros.message.std_msgs.Bool;
import org.ros.message.std_msgs.Float64;
import org.ros.message.std_msgs.Int64;
......@@ -43,21 +44,21 @@ public class ParameterServerTestNode implements NodeMain {
@SuppressWarnings("rawtypes")
@Override
public void onStart(Node node) {
Publisher<org.ros.message.std_msgs.String> pub_tilde =
final Publisher<org.ros.message.std_msgs.String> pub_tilde =
node.newPublisher("tilde", "std_msgs/String");
Publisher<org.ros.message.std_msgs.String> pub_string =
final Publisher<org.ros.message.std_msgs.String> pub_string =
node.newPublisher("string", "std_msgs/String");
Publisher<Int64> pub_int = node.newPublisher("int", "std_msgs/Int64");
Publisher<Bool> pub_bool = node.newPublisher("bool", "std_msgs/Bool");
Publisher<Float64> pub_float = node.newPublisher("float", "std_msgs/Float64");
Publisher<Composite> pub_composite = node.newPublisher("composite", "test_ros/Composite");
Publisher<TestArrays> pub_list = node.newPublisher("list", "test_ros/TestArrays");
final Publisher<Int64> pub_int = node.newPublisher("int", "std_msgs/Int64");
final Publisher<Bool> pub_bool = node.newPublisher("bool", "std_msgs/Bool");
final Publisher<Float64> pub_float = node.newPublisher("float", "std_msgs/Float64");
final Publisher<Composite> pub_composite = node.newPublisher("composite", "test_ros/Composite");
final Publisher<TestArrays> pub_list = node.newPublisher("list", "test_ros/TestArrays");
ParameterTree param = node.newParameterTree();
Log log = node.getLog();
org.ros.message.std_msgs.String tilde_m = new org.ros.message.std_msgs.String();
final org.ros.message.std_msgs.String tilde_m = new org.ros.message.std_msgs.String();
tilde_m.data = param.getString(node.resolveName("~tilde").toString());
log.info("tilde: " + tilde_m.data);
......@@ -68,20 +69,20 @@ public class ParameterServerTestNode implements NodeMain {
NameResolver resolver = node.getResolver().createResolver(paramNamespace);
NameResolver setResolver = node.getResolver().createResolver(targetNamespace);
org.ros.message.std_msgs.String string_m = new org.ros.message.std_msgs.String();
final org.ros.message.std_msgs.String string_m = new org.ros.message.std_msgs.String();
string_m.data = param.getString(resolver.resolve("string"));
log.info("string: " + string_m.data);
Int64 int_m = new org.ros.message.std_msgs.Int64();
final Int64 int_m = new org.ros.message.std_msgs.Int64();
int_m.data = param.getInteger(resolver.resolve("int"));
log.info("int: " + int_m.data);
Bool bool_m = new org.ros.message.std_msgs.Bool();
final Bool bool_m = new org.ros.message.std_msgs.Bool();
bool_m.data = param.getBoolean(resolver.resolve("bool"));
log.info("bool: " + bool_m.data);
Float64 float_m = new org.ros.message.std_msgs.Float64();
final Float64 float_m = new org.ros.message.std_msgs.Float64();
float_m.data = param.getDouble(resolver.resolve("float"));
log.info("float: " + float_m.data);
Composite composite_m = new org.ros.message.test_ros.Composite();
final Composite composite_m = new org.ros.message.test_ros.Composite();
Map composite_map = param.getMap(resolver.resolve("composite"));
composite_m.a.w = (Double) ((Map) composite_map.get("a")).get("w");
composite_m.a.x = (Double) ((Map) composite_map.get("a")).get("x");
......@@ -91,7 +92,7 @@ public class ParameterServerTestNode implements NodeMain {
composite_m.b.y = (Double) ((Map) composite_map.get("b")).get("y");
composite_m.b.z = (Double) ((Map) composite_map.get("b")).get("z");
TestArrays list_m = new org.ros.message.test_ros.TestArrays();
final TestArrays list_m = new org.ros.message.test_ros.TestArrays();
// only using the integer part for easier (non-float) comparison
Object[] list = param.getList(resolver.resolve("list")).toArray();
list_m.int32_array = new int[list.length];
......@@ -107,7 +108,9 @@ public class ParameterServerTestNode implements NodeMain {
param.set(setResolver.resolve("composite"), composite_map);
param.set(setResolver.resolve("list"), Arrays.asList(list));
while (node.isRunning()) {
node.execute(new CancellableLoop() {
@Override
protected void loop() throws InterruptedException {
pub_tilde.publish(tilde_m);
pub_string.publish(string_m);
pub_int.publish(int_m);
......@@ -115,14 +118,16 @@ public class ParameterServerTestNode implements NodeMain {
pub_float.publish(float_m);
pub_composite.publish(composite_m);
pub_list.publish(list_m);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
});
}
@Override
public void onShutdown(Node node) {
}
@Override
public void onShutdownComplete(Node node) {
}
}
......@@ -89,4 +89,8 @@ public class PassthroughTestNode implements NodeMain {
@Override
public void onShutdown(Node node) {
}
@Override
public void onShutdownComplete(Node node) {
}
}
......@@ -16,6 +16,7 @@
package org.ros;
import org.ros.concurrent.CancellableLoop;
import org.ros.message.MessageListener;
import org.ros.message.std_msgs.Int64;
import org.ros.node.Node;
......@@ -32,7 +33,7 @@ public class SlaveApiTestNode implements NodeMain {
@Override
public void onStart(Node node) {
// Basic chatter in/out test.
Publisher<org.ros.message.std_msgs.String> pub_string =
final Publisher<org.ros.message.std_msgs.String> pub_string =
node.newPublisher("chatter_out", "std_msgs/String");
MessageListener<org.ros.message.std_msgs.String> chatter_cb =
new MessageListener<org.ros.message.std_msgs.String>() {
......@@ -45,7 +46,7 @@ public class SlaveApiTestNode implements NodeMain {
node.newSubscriber("chatter_in", "std_msgs/String", chatter_cb);
// Have at least one case of dual pub/sub on the same topic.
Publisher<Int64> pub_int64_pubsub = node.newPublisher("int64", "std_msgs/Int64");
final Publisher<Int64> pub_int64_pubsub = node.newPublisher("int64", "std_msgs/Int64");
MessageListener<Int64> int64_cb = new MessageListener<Int64>() {
@Override
public void onNewMessage(Int64 m) {
......@@ -56,7 +57,9 @@ public class SlaveApiTestNode implements NodeMain {
// Don't do any performance optimizations here. We want to make sure that
// GC, etc. is working.
while (true) {
node.execute(new CancellableLoop() {
@Override
protected void loop() throws InterruptedException {
org.ros.message.std_msgs.String chatter = new org.ros.message.std_msgs.String();
chatter.data = "hello " + System.currentTimeMillis();
pub_string.publish(chatter);
......@@ -64,15 +67,16 @@ public class SlaveApiTestNode implements NodeMain {
Int64 num = new Int64();
num.data = 1;
pub_int64_pubsub.publish(num);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
@Override
public void onShutdown(Node node) {
}
@Override
public void onShutdownComplete(Node node) {
}
}
......@@ -19,14 +19,11 @@ package org.ros.time;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import org.ros.time.RemoteUptimeClock;
import org.ros.time.RemoteUptimeClock.NanoTimeProvider;
import org.junit.Before;
import org.junit.Test;
import org.ros.message.Duration;
import org.ros.message.Time;
import org.ros.time.TimeProvider;
import org.ros.time.RemoteUptimeClock.NanoTimeProvider;
import java.util.concurrent.Callable;
......@@ -66,6 +63,7 @@ public class RemoteUptimeClockTest {
drift = 1;
uptime = new long[] { 0 };
uptimeCallable = new Callable<Long>() {
@Override
public Long call() throws Exception {
long previousUptime = uptime[0];
moveTimeForward(UPTIME_LATENCY_NS);
......
......@@ -53,4 +53,8 @@ public class Listener implements NodeMain {
@Override
public void onShutdown(Node node) {
}
@Override
public void onShutdownComplete(Node node) {
}
}
......@@ -61,4 +61,8 @@ public class Talker implements NodeMain {
@Override
public void onShutdown(Node node) {
}
@Override
public void onShutdownComplete(Node node) {
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment