diff --git a/garage.cpp b/garage.cpp index fb2a33765d7e81b4bf87f635b71ac559595c1f75..ee8d9f32dc0ac7258861cec6912d818f3403e2dd 100644 --- a/garage.cpp +++ b/garage.cpp @@ -2,19 +2,39 @@ #include <std_msgs/Int32.h> #include <std_msgs/String.h> -void callback(std_msgs::Int32ConstPtr& ptr) { +int zahl = 5; +int status = 0; +ros::Time zeit; +void callback(const std_msgs::Int32::ConstPtr& ptr) { + zahl = ptr->data; + ROS_INFO("TOPIC received zahl %i", zahl); + status++; + zeit = ros::Time::now(); } int main(int argc, char**argv){ ros::init(argc, argv, "garage"); ros::NodeHandle nh; - ros::Publisher pub = nh.advertise<std_msgs::Int32>("position", 1); + ros::Publisher pub = nh.advertise<std_msgs::Int32>("/position", 1); if (!pub) throw "publisher failed"; - std_msgs::Int32 x; - x.data = 6; - while(pub.getNumSubscribers() == 0) {}; - ros::Duration(3).sleep(); - pub.publish(x); - ros::Duration(5).sleep(); + + ros::Subscriber sub = nh.subscribe("/command", 1, callback); + // wait till subscribers initialized + while (pub.getNumSubscribers() == 0) { ros::spinOnce(); }; + zeit = ros::Time::now(); + ros::Rate rate(20); + while (ros::ok()){ + ros::spinOnce(); + rate.sleep(); + if (ros::Time::now().toSec() - zeit.toSec() > 10) { + status = ++status % 4; + if (status == 1) { + std_msgs::Int32 x; + x.data = ++zahl; + pub.publish(x); + } + zeit = ros::Time::now(); + } + } } \ No newline at end of file diff --git a/main.cpp b/main.cpp index df0d42d3b34f0fd21ed4610236e8206f90fc3372..15f96998be161279e3d73dead2f437b937f23d91 100644 --- a/main.cpp +++ b/main.cpp @@ -4,6 +4,11 @@ #include <netinet/in.h> #include <arpa/inet.h> #include <std_msgs/Int32.h> +#include <boost/make_shared.hpp> +#include <memory> + +const std::string IP = "127.0.0.1"; +const uint16_t PORT = 8080; typedef uint8_t byte; @@ -122,7 +127,7 @@ const int32_t ACK_SINGLE = 1 << 31; // 32 Bit int with most significant bit set // ROS uses little endian for its messages const bool SYS_IS_BIG_ENDIAN = htonl(47) == 47; - +int socket_fd; struct Channel { std::string name; int32_t identifier; @@ -143,9 +148,13 @@ struct Mapping { // nullptr if not used // needs to make sure count of bytes used gets added to *int (parameter) // to indicate beginning of next field - std::vector<int32_t(*)(byte*, int*)> converters; + std::vector<int32_t (*)(byte*, int*)> converters_to_TRON; + // similarly the conversion to a topic field takes the value from TRON, + // a pointer to the buffer at which to add the bytes and + // a pointer to an integer to increment by the amount of bytes added + std::vector<void (*)(int32_t, byte*, int*)> converters_to_topics; - Channel *chan; // TRON channel + int channel_index; // TRON channel /* note: all vectors need to have the same size example: topic = "example_topic" @@ -167,21 +176,29 @@ struct Mapping { }; std::vector<Mapping> mappings; -int socket_fd; +std::vector<ros::Publisher> input_publishers; +// custom callbacks for TRONs messages taking channel name, pointer to values and publisher to publish to +// key is pair of topic name and channel name in this order +// not using custom fallbacks leads to mapping_callback_to_topics being used +std::map<std::pair<std::string, std::string>, void(*)(std::string, int32_t*, ros::Publisher&)> input_callbacks; +// subscribers already have callbacks +std::vector<ros::Subscriber> output_subscribers; // keep track of acknowledgements that are missing /* note: since communication is asynchronous this value can only be compared reliably with 0 after testing is terminated */ -/* note: TRON only acknowledges if virtual clock is used */ +/* note: TRON only acknowledges if virtual clock is used, which it is not (yet) */ int acks_missing = 0; -// tron specific functions ----------------------------------------------------- +// specific functions ----------------------------------------------------- -Channel* get_chan(std::string chan_name) { - Channel *chan = nullptr; - for (Channel& cur_chan : channels) if (cur_chan.name == chan_name) chan = &cur_chan; - if (chan == nullptr) throw "channel not declared"; - return chan; +int get_channel_index(std::string chan_name) { + int i = 0; + for (Channel& cur_chan : channels) { + if (cur_chan.name == chan_name) return i; + i++; + } + throw "channel not declared"; } void get_error_msg(int32_t errorcode) { @@ -199,9 +216,9 @@ void get_error_msg(int32_t errorcode) { void add_var_to_channel(std::string channel, bool is_input, std::string var) { byte msg[6 + var.length()]; - Channel *chan = get_chan(channel); + Channel& chan = channels[get_channel_index(channel)]; msg[0] = is_input ? ADD_VAR_TO_INPUT : ADD_VAR_TO_OUTPUT; - add_int32_in_network_order(chan->identifier, msg, 1); + add_int32_in_network_order(chan.identifier, msg, 1); msg[5] = (byte) var.length(); for (int i = 0; i < var.length(); i++) msg[6+i] = var.at(i); ROS_INFO("attaching variable %s to channel %s", var.c_str(), channel.c_str()); @@ -209,7 +226,7 @@ void add_var_to_channel(std::string channel, bool is_input, std::string var) { int32_t ack = get_int_socket(socket_fd); if (ack < 0) get_error_msg(ack); ROS_INFO("success: attached variable"); - chan->vars.push_back(var); + chan.vars.push_back(var); } void send_channel_decl_msg(bool is_input, std::string name) { @@ -301,44 +318,119 @@ void report_now(std::string chan_name, uint16_t var_count, int32_t *vars){ void init_topic_channel_mapping(const std::string topic, const std::string channel){ Mapping map; map.topic = topic; - map.chan = get_chan(channel); + map.channel_index = get_channel_index(channel); mappings.push_back(map); } -void add_var_to_mapping(int mapping_index, int byte_offset, - int32_t (*conv)(byte*, int*) = nullptr) { - Mapping& map = mappings[mapping_index]; +void add_var_to_mapping(std::string topic, std::string channel, int byte_offset, + int32_t (*conv_to_TRON)(byte*, int*) = nullptr, + void (*conv_to_topic)(int32_t, byte*, int*) = nullptr) { + int index = -1; + for (int i = 0; i < mappings.size(); i++) { + Mapping& map = mappings[i]; + if (channels[map.channel_index].name == channel && map.topic == topic) index = i; + } + if (index < 0) throw "did not find mapping"; + + Mapping& map = mappings[index]; map.byte_offset.push_back(byte_offset); - map.converters.push_back(conv); + map.converters_to_TRON.push_back(conv_to_TRON); + map.converters_to_topics.push_back(conv_to_topic); } -// this function is a little cumbersome to use -// recommended to use init_channel_topic_mapping and add_var_to_mapping instead -void create_channel_topic_mapping_with_vars(const std::string topic, - const std::string channel, - std::vector<int> byte_offset, - std::vector<int32_t (*)(byte*, int*)> conv){ - init_topic_channel_mapping(topic, channel); - for (int i = 0; i < byte_offset.size(); i++) { - add_var_to_mapping(mappings.size(), byte_offset[i], conv[i]); + +// callback reports to TRON like defined in mappings +template<class T> +void mapping_callback_to_TRON(const ros::MessageEvent<T>& event){ + std::string topic = event.getConnectionHeader().at("topic"); + byte *bytes = reinterpret_cast<byte*>(event.getMessage().get()); + for (Mapping& mapping : mappings) { + if (mapping.topic.c_str() == topic) { + int var_count = mapping.byte_offset.size(); + int32_t vars[var_count * 4]; + int next_pos = 0; + for (int i = 0; i < var_count; i++) { + next_pos += mapping.byte_offset[i]; + if (mapping.converters_to_TRON[i] == nullptr) { + if (SYS_IS_BIG_ENDIAN) + vars[i] = network_bytes_to_int_32(&bytes[next_pos]); + else vars[i] = *reinterpret_cast<int32_t*>(&bytes[next_pos]); + next_pos += 4; + continue; + } else { + vars[i] = mapping.converters_to_TRON[i](&bytes[next_pos], &next_pos); + } + } + report_now(channels[mapping.channel_index].name, var_count, vars); + } } } -void configuration_phase(){ +// callback publishing to topics like defined in mappings +template<class T> +void mapping_callback_to_topics(std::string channel, int32_t* vars, ros::Publisher& pub){ + for (Mapping& mapping : mappings) { + if (channels[mapping.channel_index].name == channel) { + boost::shared_ptr<T> shared_ptr = boost::make_shared<T>(); + byte* ptr = reinterpret_cast<byte*>(shared_ptr.get()); + int next_pos = 0; + for (int i = 0; i < channels[mapping.channel_index].vars.size(); i++){ + int32_t val = vars[i]; + next_pos += mapping.byte_offset[i]; + if (mapping.converters_to_topics[i] == nullptr) { + byte* val_bytes; + if (SYS_IS_BIG_ENDIAN){ + uint32_t switched_byte_order = htonl(*reinterpret_cast<uint32_t*>(&val)); + val = *reinterpret_cast<int32_t*>(&switched_byte_order); + } + val_bytes = reinterpret_cast<byte*>(&val); + ptr[next_pos++] = val_bytes[0]; + ptr[next_pos++] = val_bytes[1]; + ptr[next_pos++] = val_bytes[2]; + ptr[next_pos++] = val_bytes[3]; + } else { + mapping.converters_to_topics[i](val, &ptr[next_pos], &next_pos); + } + } + pub.publish(shared_ptr); + } + } +} + +void configuration_phase(ros::NodeHandle& nh){ /* note: for configuration phase maximum message length is 256 Bytes, therefore heap allocation can be avoided most of the time in called functions */ + + // channels: declare, init mapping, add vars to TRON channel, add vars to mapping + // for input: add publisher to input_publishers + // add callback for mapping (key is pair of topic and channel name) + // for output: add subscribers to output_subscribers + send_channel_decl_msg(true, "ausloesen"); + init_topic_channel_mapping("/command", "ausloesen"); add_var_to_channel("ausloesen", true, "zahl"); - + add_var_to_mapping("/command", "ausloesen", 0); + input_publishers.push_back(nh.advertise<std_msgs::Int32>("/command", 1)); + input_callbacks[std::pair<std::string, std::string>("/command", "ausloesen")] + = mapping_callback_to_topics<std_msgs::Int32>; + // output channels: declare, init mapping, add vars to TRON channel and mapping + // add subscriber to list send_channel_decl_msg(false, "position"); init_topic_channel_mapping("/position", "position"); add_var_to_channel("position", false, "zahl"); - add_var_to_mapping(0, 0); + add_var_to_mapping("/position", "position", 0); + output_subscribers.push_back(nh.subscribe("/position", 1, + mapping_callback_to_TRON<std_msgs::Int32>)); // not obvious in documentation: local variables are not supported - //add_var_to_channel(socketfd, "ausloesen", "lokal"); + // add_var_to_channel(socketfd, "ausloesen", "lokal"); uint64_t microseconds = 1000000; // one second // documentation states 2 signed integers are used for some reason set_time_unit_and_timeout(microseconds, 100); + + // wait till subscribers initialized + for (ros::Publisher& pub : input_publishers) { + while (pub.getNumSubscribers() == 0) { ros::Duration(1).sleep(); }; + } } void process_TRONs_msgs(){ @@ -360,7 +452,7 @@ void process_TRONs_msgs(){ // bytes are channel identifier // find corresponding channel - Channel *chan = nullptr; + const Channel *chan = nullptr; for (Channel& cur_chan : channels) if (cur_chan.identifier == next) chan = &cur_chan; /* note: this only happens if message number 12 in TRON User Manual is received, which should not be the case according to the documentation */ @@ -377,14 +469,30 @@ void process_TRONs_msgs(){ uint16_t var_count = network_bytes_to_uint_16(bytes); ROS_INFO("got variable count %i", var_count); + int32_t vals[var_count]; // process variables for (uint16_t i = 0; i < var_count; i++) { recv(socket_fd, bytes, 4, MSG_DONTWAIT); next = network_bytes_to_int_32(bytes); std::string var = chan->vars[i]; ROS_INFO("got variable number %i: value of %s is %i", i+1, var.c_str(), next); + vals[i] = next; } + for (Mapping& map : mappings) + if (channels[map.channel_index].name == chan->name) + for (ros::Publisher& pub : input_publishers){ + if (pub.getTopic() == map.topic) { + if (input_callbacks.find( + std::pair<std::string, std::string>(map.topic, channels[map.channel_index].name)) + != input_callbacks.end()) + input_callbacks.at(std::pair<std::string, std::string>(map.topic, channels[map.channel_index].name)) + (chan->name, vals, pub); + else throw "did not find callback"; + break; + } + } + // send acknowledgement add_int32_in_network_order(ACK_SINGLE, bytes, 0); ROS_INFO("sending acknowledgement"); @@ -392,51 +500,14 @@ void process_TRONs_msgs(){ } } -// callback reports to TRON like defined in mappings ----------------------------- -template<class T> -void mapCallback(const ros::MessageEvent<T>& event){ - std::string topic = event.getConnectionHeader().at("topic"); - byte *bytes = reinterpret_cast<byte*>(event.getMessage().get()); - for (Mapping& mapping : mappings) { - if (mapping.topic.c_str() == topic) { - int var_count = mapping.byte_offset.size(); - int32_t vars[var_count * 4]; - int last_pos = 0; - for (int i = 0; i < var_count; i++) { - last_pos += mapping.byte_offset[i]; - if (mapping.converters[i] == nullptr) { - if (SYS_IS_BIG_ENDIAN) - vars[i] = network_bytes_to_int_32(&bytes[last_pos]); - else vars[i] = *reinterpret_cast<int32_t*>(&bytes[last_pos]); - last_pos += 4; - continue; - } else { - vars[i] = mapping.converters[i](&bytes[last_pos], &last_pos); - } - } - report_now(mapping.chan->name, var_count, vars); - } - } -} - -// place custom callbacks here (using report_now) - -// ---------------------------------------------------------------- - int main(int argc, char**argv){ ros::init(argc, argv, "TRON dapter"); ros::NodeHandle nh; try { - const std::string IP = "127.0.0.1"; - const uint16_t PORT = 8080; socket_fd = create_connected_socket(IP, PORT); - configuration_phase(); - - // subscribe to topics and add callbacks which use report_now - ros::Subscriber sub = nh.subscribe("position", 1, mapCallback<std_msgs::Int32>); - // ----------------------------------------------------------- + configuration_phase(nh); request_start();