Skip to content
Snippets Groups Projects
Commit c9bb02d8 authored by Johannes Mey's avatar Johannes Mey
Browse files

Merge branch 'feature/distributed-sync' into feature/package_split

parents 1e8592cd 79405545
No related branches found
No related tags found
No related merge requests found
...@@ -28,7 +28,7 @@ void NngConnection::receiveMessages() { ...@@ -28,7 +28,7 @@ void NngConnection::receiveMessages() {
} else if (recv_rv == NNG_EAGAIN) { } else if (recv_rv == NNG_EAGAIN) {
// no message received in current spin // no message received in current spin
} else { } else {
ROS_ERROR_STREAM("[ros2cgv] nng_recv returned: " << nng_strerror(recv_rv)); ROS_ERROR_STREAM("[" << ros::this_node::getName() << "] nng_recv returned: " << nng_strerror(recv_rv));
} }
loop_rate.sleep(); loop_rate.sleep();
} }
...@@ -37,19 +37,20 @@ void NngConnection::receiveMessages() { ...@@ -37,19 +37,20 @@ void NngConnection::receiveMessages() {
bool NngConnection::initializeConnection(std::function<void(std::string, std::string)> callback) { bool NngConnection::initializeConnection(std::function<void(std::string, std::string)> callback) {
int rv;
if (server) {
// first, establish the connection // first, establish the connection
ROS_INFO_STREAM("[ros2cgv] listening for connections at " << cgv_address); ROS_INFO_STREAM("[NngConnection] listening for connections at " << connection_address);
int rv;
if ((rv = nng_pair1_open(&sock)) != 0) { if ((rv = nng_pair1_open(&sock)) != 0) {
ROS_ERROR_STREAM("[ros2cgv] nng_pair1_open returned: " << nng_strerror(rv)); ROS_ERROR_STREAM("[NngConnection] nng_pair1_open returned: " << nng_strerror(rv));
return false; return false;
} }
nng_listener listener; nng_listener listener;
if ((rv = nng_listen(sock, cgv_address.c_str(), &listener, 0)) != 0) { if ((rv = nng_listen(sock, connection_address.c_str(), &listener, 0)) != 0) {
ROS_ERROR_STREAM("[ros2cgv] nng_listen returned: " << nng_strerror(rv)); ROS_ERROR_STREAM("[NngConnection] nng_listen returned: " << nng_strerror(rv));
return false; return false;
} }
...@@ -58,11 +59,28 @@ bool NngConnection::initializeConnection(std::function<void(std::string, std::st ...@@ -58,11 +59,28 @@ bool NngConnection::initializeConnection(std::function<void(std::string, std::st
// then, start the thread that uses the callback // then, start the thread that uses the callback
nng_receiver_thread = std::make_unique<std::thread>(&NngConnection::receiveMessages, this); nng_receiver_thread = std::make_unique<std::thread>(&NngConnection::receiveMessages, this);
} else { // client mode
ROS_INFO_STREAM("[NngConnection] establishing connection with " << connection_address);
if ((rv = nng_pair1_open(&sock)) != 0) {
ROS_ERROR_STREAM("[NngConnection] nng_pair1_open returned: " << nng_strerror(rv));
}
ros::Rate connection_retry_rate(1);
while ((rv = nng_dial(sock, connection_address.c_str(), nullptr, 0)) != 0) {
ROS_WARN_STREAM("[NngConnection] nng_dial returned: " << nng_strerror(rv)
<< ". Trying to connect again in one second...");
connection_retry_rate.sleep();
}
ROS_INFO_STREAM("[NngConnection] Connection established!");
}
return true; return true;
} }
NngConnection::NngConnection(const std::string &cgvAddress) : cgv_address{cgvAddress}, sock{0} {} NngConnection::NngConnection(const std::string &connection_address, bool server) :
connection_address{connection_address}, sock{0}, server{server} {}
bool NngConnection::send(const std::string &channel, const std::string &message) { bool NngConnection::send(const std::string &channel, const std::string &message) {
......
...@@ -44,8 +44,6 @@ Object *RobotArmController::resolveObject(const std::string &id) { ...@@ -44,8 +44,6 @@ Object *RobotArmController::resolveObject(const std::string &id) {
void RobotArmController::sendScene() { void RobotArmController::sendScene() {
if (scene) { // meaning if the (smart) pointer is not a nullptr if (scene) { // meaning if the (smart) pointer is not a nullptr
scene->SerializeAsString();
ROS_INFO_STREAM("[ros2cgv] Sending scene with " << scene->objects().size() << " objects."); ROS_INFO_STREAM("[ros2cgv] Sending scene with " << scene->objects().size() << " objects.");
sendToAll(sendSceneTopic, scene->SerializeAsString()); sendToAll(sendSceneTopic, scene->SerializeAsString());
sceneUpdateAction(); sceneUpdateAction();
......
...@@ -19,23 +19,21 @@ ...@@ -19,23 +19,21 @@
#include "ccf/connection/NngConnection.h" #include "ccf/connection/NngConnection.h"
#include "ccf/util/NodeUtil.h" #include "ccf/util/NodeUtil.h"
const std::string NODE_NAME = "ros2cgv_dummy"; std::string NODE_NAME = "dummy_cgv_controller";
using CetiRosToolbox::getParameter; using CetiRosToolbox::getParameter;
using CetiRosToolbox::getPrivateParameter;
int main(int argc, char **argv) { int main(int argc, char **argv) {
GOOGLE_PROTOBUF_VERIFY_VERSION; GOOGLE_PROTOBUF_VERIFY_VERSION;
ros::init(argc, argv, NODE_NAME); ros::init(argc, argv, NODE_NAME);
NODE_NAME = ros::this_node::getName();
ros::NodeHandle n("connector_node_ros_cgv"); // namespace where the cgv_address is ros::NodeHandle n("connector_node_ros_cgv"); // namespace where the connection_address is
std::string cgv_address = "tcp://*:6576"; auto cgv_address = getPrivateParameter<std::string>("connection_address", "tcp://*:6576");
if (!n.getParam("cgv_address", cgv_address)) {
ROS_WARN_STREAM("[" << NODE_NAME << "] Could not get string value for " << n.getNamespace()
<< "/cgv_address from param server, using default " << cgv_address);
}
DummyRobotArmController connector{n, NODE_NAME}; DummyRobotArmController connector{n, NODE_NAME};
...@@ -45,6 +43,17 @@ int main(int argc, char **argv) { ...@@ -45,6 +43,17 @@ int main(int argc, char **argv) {
connection->setSendTopic(getParameter(n, "topics/scene", NODE_NAME + "/scene/update")); connection->setSendTopic(getParameter(n, "topics/scene", NODE_NAME + "/scene/update"));
connector.addConnection(std::move(connection)); connector.addConnection(std::move(connection));
auto clientControllers = getPrivateParameter<std::vector<std::string>>("client_controllers");
ROS_INFO_STREAM("[" << NODE_NAME << "] Connecting to " << clientControllers.size() << " client controllers.");
for (const auto &client : clientControllers) {
ROS_INFO_STREAM("[" << NODE_NAME << "] Connecting to client at " << client << ".");
std::unique_ptr<NngConnection> client_connection = std::make_unique<NngConnection>(client, false);
client_connection->setSendTopic(getParameter<std::string>(n, "topics/selection", "selection"));
client_connection->setReceiveTopic("client_scene");
connector.addConnection(std::move(client_connection));
}
connector.loadScene(ros::package::getPath("ccf") + "/config/config_scene.yaml"); connector.loadScene(ros::package::getPath("ccf") + "/config/config_scene.yaml");
Object *robot = nullptr; Object *robot = nullptr;
...@@ -57,9 +66,13 @@ int main(int argc, char **argv) { ...@@ -57,9 +66,13 @@ int main(int argc, char **argv) {
ros::Timer timer = n.createTimer(ros::Duration(10), [&connector]( ros::Timer timer = n.createTimer(ros::Duration(10), [&connector](
const ros::TimerEvent &event) { connector.sendScene(); }); // send a scene every ten seconds const ros::TimerEvent &event) { connector.sendScene(); }); // send a scene every ten seconds
auto selectionMessageCallback = [&currentlyPickedBox, &connector, &robot, &selectedBox, &selectedBin]( auto selectionMessageCallback = [&currentlyPickedBox, &connector, &robot, &selectedBox, &selectedBin, &n](
const Selection &selection) { const Selection &selection) {
// forward the selection to the clients
connector.sendToAll(getParameter<std::string>(n, "topics/selection", "selection"),
selection.SerializeAsString());
if (currentlyPickedBox.has_value()) { if (currentlyPickedBox.has_value()) {
ROS_WARN_STREAM("[" << NODE_NAME << "] Unable to accept selections while picking is in progress."); ROS_WARN_STREAM("[" << NODE_NAME << "] Unable to accept selections while picking is in progress.");
return; return;
......
...@@ -28,6 +28,7 @@ const std::string CELL_NAME = "place-a"; ...@@ -28,6 +28,7 @@ const std::string CELL_NAME = "place-a";
const std::string NODE_NAME = "rag_connector_dummy_a"; const std::string NODE_NAME = "rag_connector_dummy_a";
using CetiRosToolbox::getParameter; using CetiRosToolbox::getParameter;
using CetiRosToolbox::getPrivateParameter;
int main(int argc, char **argv) { int main(int argc, char **argv) {
...@@ -37,7 +38,7 @@ int main(int argc, char **argv) { ...@@ -37,7 +38,7 @@ int main(int argc, char **argv) {
ros::NodeHandle n("ceti_connector"); // namespace ros::NodeHandle n("ceti_connector"); // namespace
std::string cgv_address = getParameter<std::string>(n, "cgv_address", "tcp://*:6576"); auto cgv_address = getPrivateParameter<std::string>("connection_address", "tcp://*:6576");
DummyRobotArmController connector{n, CELL_NAME}; DummyRobotArmController connector{n, CELL_NAME};
......
...@@ -27,6 +27,7 @@ const std::string CELL_NAME = "place-b"; ...@@ -27,6 +27,7 @@ const std::string CELL_NAME = "place-b";
const std::string NODE_NAME = "rag_connector_dummy_b"; const std::string NODE_NAME = "rag_connector_dummy_b";
using CetiRosToolbox::getParameter; using CetiRosToolbox::getParameter;
using CetiRosToolbox::getPrivateParameter;
int main(int argc, char **argv) { int main(int argc, char **argv) {
...@@ -36,7 +37,7 @@ int main(int argc, char **argv) { ...@@ -36,7 +37,7 @@ int main(int argc, char **argv) {
ros::NodeHandle n("ceti_connector"); // namespace ros::NodeHandle n("ceti_connector"); // namespace
std::string cgv_address = getParameter<std::string>(n, "cgv_address", "tcp://*:6576"); auto cgv_address = getPrivateParameter<std::string>("connection_address", "tcp://*:6576");
std::string cell_prefix = getParameter(n, "place_b_prefix", CELL_NAME); std::string cell_prefix = getParameter(n, "place_b_prefix", CELL_NAME);
......
...@@ -18,23 +18,21 @@ ...@@ -18,23 +18,21 @@
#include "ccf/connection/NngConnection.h" #include "ccf/connection/NngConnection.h"
#include "ccf/util/NodeUtil.h" #include "ccf/util/NodeUtil.h"
const std::string NODE_NAME = "ros2cgv_moveit"; std::string NODE_NAME = "moveit_cgv_controller";
using CetiRosToolbox::getParameter; using CetiRosToolbox::getParameter;
using CetiRosToolbox::getPrivateParameter;
int main(int argc, char **argv) { int main(int argc, char **argv) {
GOOGLE_PROTOBUF_VERIFY_VERSION; GOOGLE_PROTOBUF_VERIFY_VERSION;
ros::init(argc, argv, NODE_NAME); ros::init(argc, argv, NODE_NAME);
NODE_NAME = ros::this_node::getName();
ros::NodeHandle n("connector_node_ros_cgv"); // namespace where the cgv_address is ros::NodeHandle n("connector_node_ros_cgv"); // namespace where the connection_address is
std::string cgv_address = "tcp://*:6576"; auto cgv_address = getPrivateParameter<std::string>("connection_address", "tcp://*:6576");
if (!n.getParam("cgv_address", cgv_address)) {
ROS_WARN_STREAM("[" << NODE_NAME << "] Could not get string value for " << n.getNamespace()
<< "/cgv_address from param server, using default " << cgv_address);
}
MoveItRobotArmController connector{n, NODE_NAME}; MoveItRobotArmController connector{n, NODE_NAME};
...@@ -44,6 +42,17 @@ int main(int argc, char **argv) { ...@@ -44,6 +42,17 @@ int main(int argc, char **argv) {
connection->setSendTopic(getParameter(n, "topics/scene", NODE_NAME + "/scene/update")); connection->setSendTopic(getParameter(n, "topics/scene", NODE_NAME + "/scene/update"));
connector.addConnection(std::move(connection)); connector.addConnection(std::move(connection));
auto clientControllers = getPrivateParameter<std::vector<std::string>>("client_controllers");
ROS_INFO_STREAM("[" << NODE_NAME << "] Connecting to " << clientControllers.size() << " client controllers.");
for (const auto &client : clientControllers) {
ROS_INFO_STREAM("[" << NODE_NAME << "] Connecting to client at " << client << ".");
std::unique_ptr<NngConnection> client_connection = std::make_unique<NngConnection>(client, false);
client_connection->setSendTopic(getParameter<std::string>(n, "topics/selection", "selection"));
client_connection->setReceiveTopic("client_scene");
connector.addConnection(std::move(client_connection));
}
// scene loading is not required, the scene is updated by moveit // scene loading is not required, the scene is updated by moveit
Object *robot = nullptr; Object *robot = nullptr;
...@@ -56,9 +65,13 @@ int main(int argc, char **argv) { ...@@ -56,9 +65,13 @@ int main(int argc, char **argv) {
ros::Timer timer = n.createTimer(ros::Duration(10), [&connector]( ros::Timer timer = n.createTimer(ros::Duration(10), [&connector](
const ros::TimerEvent &event) { connector.sendScene(); }); // send a scene every ten seconds const ros::TimerEvent &event) { connector.sendScene(); }); // send a scene every ten seconds
auto selectionMessageCallback = [&currentlyPickedBox, &connector, &robot, &selectedBox, &selectedBin]( auto selectionMessageCallback = [&currentlyPickedBox, &connector, &robot, &selectedBox, &selectedBin, &n](
const Selection &selection) { const Selection &selection) {
// forward the selection to the clients
connector.sendToAll(getParameter<std::string>(n, "topics/selection", "selection"),
selection.SerializeAsString());
if (currentlyPickedBox.has_value()) { if (currentlyPickedBox.has_value()) {
ROS_WARN_STREAM("[" << NODE_NAME << "] Unable to accept selections while picking is in progress."); ROS_WARN_STREAM("[" << NODE_NAME << "] Unable to accept selections while picking is in progress.");
return; return;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment