diff --git a/main.cpp b/main.cpp index a6243965a90e75616bfe24f9b5b98a47a0e01caf..a1c74095b8be7d2601f224fd4ee1b8196e338962 100644 --- a/main.cpp +++ b/main.cpp @@ -1,67 +1,24 @@ #include <ros/ros.h> -#include <ros/console.h> #include <stdio.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> +#include <std_msgs/Int32.h> typedef uint8_t byte; -// consts for TRON ---------------------------------------------------------- -const byte GET_ERROR_MSG = 127; -const byte DECL_CHAN_INPUT = 1; -const byte DECL_CHAN_OUTPUT = 2; -const byte ADD_VAR_TO_INPUT = 3; -const byte ADD_VAR_TO_OUTPUT = 4; -const byte SET_TIME_UNIT = 5; -const byte SET_TIMEOUT = 6; -const byte REQUEST_START = 64; -const byte ANSWER_START = 0; -const int32_t ACK_SINGLE = -2147483648; // 32 Bit int with most significant bit set to 1 - - -// function declarations -------------------------------------------------------- - -// gets count bytes from socket file descriptor -std::unique_ptr<byte[]> get_bytes_socket(int fd, int count); +// some helper functions ----------------------------------------------- // log bytes sent/received -void byte_info(const byte *msg, int msg_length, bool send=true); - -// wrapping get_bytes_socket for converting to 32 bit integer -int32_t get_int_socket(int fd); - -// retrieves error message and prints it, then throws -void get_error_msg(int fd, int32_t errorcode); // needs fix see implementation - -// converts num to network order and adds it to msg starting from index -void add_int32_in_network_order(int32_t num, byte *msg, int index); - -// wraps write() for printing and throwing on errors -void send_bytes(int fd, const byte* bytes, int length); - -void set_time_unit_and_timeout(int fd, uint64_t microseconds, int32_t timeout); - -// attaches UPPAAL variable called var to channel -void add_var_to_channel(int fd, std::string channel, bool is_input, std::string var); - -// reports to var_count variables to channel named chan_name -void report_now(int fd, std::string chan_name, unsigned short var_count, int32_t *vars); - -// throws if not answered with byte 0 -void request_start(int fd); -// ------------------------------------------------------------------------------- +inline void byte_info(const byte* buf, int buf_length, bool send=true){ + std::stringstream strstr; + strstr << (send ? "sending" : "received") << " bytes:"; + for (int i = 0; i < buf_length; i++) strstr << " " << (int)buf[i]; + ROS_INFO(strstr.str().c_str()); +} +// gets count bytes from socket file descriptor (with timeout) const double SECONDS_BEFORE_TIMEOUT = 30; -struct Channel { - int32_t identifier; - bool is_input; - std::vector<std::string> vars; // associated variables in Uppaal - Channel(int32_t id, bool is_input) : identifier(id), is_input(is_input){}; - Channel() = default; // default constructor needed for std::map -}; -std::map<std::string, Channel> channels; - std::unique_ptr<byte[]> get_bytes_socket(int fd, int count){ std::unique_ptr<byte[]> arr = std::make_unique<byte[]>(count); int already_read = 0; @@ -81,88 +38,128 @@ std::unique_ptr<byte[]> get_bytes_socket(int fd, int count){ return arr; // no explicit move needed since return value is rvalue }; -int32_t bytes_to_int_32(byte *ptr){ - uint32_t h = ntohl(*reinterpret_cast<uint32_t*>(ptr)); +inline int32_t bytes_to_int_32(byte *buf){ + uint32_t h = ntohl(*reinterpret_cast<uint32_t*>(buf)); return *reinterpret_cast<int32_t*>(&h); } -uint16_t bytes_to_uint_16(byte *ptr) { - return ntohs(*reinterpret_cast<uint16_t*>(ptr)); +inline uint16_t bytes_to_uint_16(byte *buf) { + return ntohs(*reinterpret_cast<uint16_t*>(buf)); } +// wrapping get_bytes_socket for converting to 32 bit integer int32_t get_int_socket(int fd) { auto ack = get_bytes_socket(fd, 4); return bytes_to_int_32(ack.get()); } -void add_int32_in_network_order(int32_t num, byte *msg, int index){ +// converts num to network order and adds it to byte array starting from index +void add_int32_in_network_order(int32_t num, byte *buf, int index){ uint32_t n = htonl(*reinterpret_cast<uint32_t*>(&num)); byte* bytes = reinterpret_cast<byte*>(&n); - msg[index] = bytes[0]; - msg[++index] = bytes[1]; - msg[++index] = bytes[2]; - msg[++index] = bytes[3]; + buf[index] = bytes[0]; + buf[++index] = bytes[1]; + buf[++index] = bytes[2]; + buf[++index] = bytes[3]; } -void byte_info(const byte* msg, int msg_length, bool send){ - std::stringstream strstr; - strstr << (send ? "sending" : "received") << " bytes:"; - for (int i = 0; i < msg_length; i++) strstr << " " << (int)msg[i]; - ROS_INFO(strstr.str().c_str()); +// wraps write() for printing and throwing on errors +inline void send_bytes(int fd, const byte *buf, int length){ + byte_info(buf, length); + int ret = write(fd, (void*) buf, length); + if (ret < 0) throw "writing failed"; } -void send_bytes(int fd, const byte* bytes, int length){ - byte_info(bytes, length); - int ret = write(fd, (void*) bytes, length); - if (ret < 0) throw "writing failed"; +// returns false if nothing more to read and true if 4 bytes are read successfully +// used to reduce overhead in testing phase +inline bool read_4_bytes_nonblock(int fd, byte *buf) { + int bytes_recv = recv(fd, buf, 4, MSG_DONTWAIT); + if (bytes_recv == -1) return false; // nothing more to read + if (bytes_recv == 0) throw "connection was closed"; + if (bytes_recv != 4) throw "could not read full 4 bytes"; + byte_info(buf, 4, false); + return true; } -void get_error_msg(int fd, int32_t errorcode) { +// consts for TRON ---------------------------------------------------------- + +const byte GET_ERROR_MSG = 127; +const byte DECL_CHAN_INPUT = 1; +const byte DECL_CHAN_OUTPUT = 2; +const byte ADD_VAR_TO_INPUT = 3; +const byte ADD_VAR_TO_OUTPUT = 4; +const byte SET_TIME_UNIT = 5; +const byte SET_TIMEOUT = 6; +const byte REQUEST_START = 64; +const byte ANSWER_START = 0; +const int32_t ACK_SINGLE = 1 << 31; // 32 Bit int with most significant bit set to 1 + +// global vars ------------------------------------------------------------- + +struct Channel { + int32_t identifier; + bool is_input; + std::vector<std::string> vars; // associated variables in Uppaal + Channel(int32_t id, bool is_input) : identifier(id), is_input(is_input){}; + Channel() = default; // default constructor needed for std::map +}; +std::map<std::string, Channel> channels; + +int socket_fd; + +// keep track of acknowledgements that are missing +/* note: since communication is asynchronous this value can only be +compared reliably with 0 after testing is terminated */ +int acks_missing = 0; + +// tron specific functions ----------------------------------------------------- + +void get_error_msg(int32_t errorcode) { ROS_WARN("got error, trying to get corresponding message"); - std::unique_ptr<byte[]> get_err_msg_msg = std::make_unique<byte[]>(5); + byte get_err_msg_msg[5]; get_err_msg_msg[0] = GET_ERROR_MSG; // get error msg code - add_int32_in_network_order(errorcode, get_err_msg_msg.get(), 1); - send_bytes(fd, get_err_msg_msg.get(), 5); // connection is closed after this write? - byte err_msg_length = get_bytes_socket(fd, 1)[0]; - auto err_msg = get_bytes_socket(fd, err_msg_length); + add_int32_in_network_order(errorcode, get_err_msg_msg, 1); + send_bytes(socket_fd, get_err_msg_msg, 5); // connection is closed after this write? + byte err_msg_length = get_bytes_socket(socket_fd, 1)[0]; + auto err_msg = get_bytes_socket(socket_fd, err_msg_length); std::string msg_str = std::string(reinterpret_cast<char*>(err_msg.get()), (size_t) err_msg_length); ROS_FATAL("TRON sent error message: %s", msg_str.c_str()); throw "got error from TRON"; } -void add_var_to_channel(int fd, std::string channel, bool is_input, std::string var) { - std::unique_ptr<byte[]> msg = std::make_unique<byte[]>(6 + var.length()); +void add_var_to_channel(std::string channel, bool is_input, std::string var) { + byte msg[6 + var.length()]; if (channels.find(channel) == channels.end()) throw "channel not declared"; Channel& chan = channels.at(channel); msg[0] = is_input ? ADD_VAR_TO_INPUT : ADD_VAR_TO_OUTPUT; - add_int32_in_network_order(chan.identifier, msg.get(), 1); + add_int32_in_network_order(chan.identifier, msg, 1); msg[5] = (byte) var.length(); for (int i = 0; i < var.length(); i++) msg[6+i] = var.at(i); ROS_INFO("attaching variable %s to channel %s", var.c_str(), channel.c_str()); - send_bytes(fd, msg.get(), 6 + var.length()); - int32_t ack = get_int_socket(fd); - if (ack < 0) get_error_msg(fd, ack); + send_bytes(socket_fd, msg, 6 + var.length()); + int32_t ack = get_int_socket(socket_fd); + if (ack < 0) get_error_msg(ack); ROS_INFO("success: attached variable"); chan.vars.push_back(var); } -void send_channel_decl_msg(int fd, bool is_input, std::string name) { +void send_channel_decl_msg(bool is_input, std::string name) { // prepare packet size_t msg_length = 2 + name.length(); - std::unique_ptr<byte[]> msg = std::make_unique<byte[]>(msg_length); + byte msg[msg_length]; msg[0] = is_input ? DECL_CHAN_INPUT : DECL_CHAN_OUTPUT; msg[1] = name.length(); for (int i = 2, c = 0; i < msg_length; i++, c++) msg[i] = name[c]; // send packet ROS_INFO("declaring channel %s as %s", name.c_str(), (is_input ? "input" : "output")); - byte_info(msg.get(), msg_length); - write(fd, (void*) msg.get(), msg_length); + byte_info(msg, msg_length); + write(socket_fd, (void*) msg, msg_length); // get answer from TRON - int32_t channel_identifier = get_int_socket(fd); + int32_t channel_identifier = get_int_socket(socket_fd); if (channel_identifier < 0) { // error handling - get_error_msg(fd, channel_identifier); + get_error_msg(channel_identifier); } if (channel_identifier == 0) throw "did not get channel identifier"; // assigned channel ID successfully @@ -170,8 +167,8 @@ void send_channel_decl_msg(int fd, bool is_input, std::string name) { channels[name] = Channel(channel_identifier, is_input); } -void set_time_unit_and_timeout(int fd, unsigned long long microseconds, int32_t timeout){ - std::unique_ptr<byte[]> msg = std::make_unique<byte[]>(9); +void set_time_unit_and_timeout(uint64_t microseconds, int32_t timeout){ + byte msg[9]; msg[0] = SET_TIME_UNIT; byte *microseconds_bytes = reinterpret_cast<byte*>(µseconds); @@ -183,32 +180,33 @@ void set_time_unit_and_timeout(int fd, unsigned long long microseconds, int32_t for (int i = 0; i < 8; i++) msg[i+1] = microseconds_bytes[7-i]; } ROS_INFO("setting time unit: %i microseconds", microseconds); - send_bytes(fd, msg.get(), 9); - int32_t ack = get_int_socket(fd); - if (ack != 0) get_error_msg(fd, ack); + send_bytes(socket_fd, msg, 9); + int32_t ack = get_int_socket(socket_fd); + if (ack != 0) get_error_msg(ack); ROS_INFO("success: set time unit"); - msg.reset(new byte[5]); msg[0] = SET_TIMEOUT; - add_int32_in_network_order(timeout, msg.get(), 1); + add_int32_in_network_order(timeout, msg, 1); ROS_INFO("setting timeout to %i units", timeout); - send_bytes(fd, msg.get(), 5); - ack = get_int_socket(fd); - if (ack != 0) get_error_msg(fd, ack); + send_bytes(socket_fd, msg, 5); + ack = get_int_socket(socket_fd); + if (ack != 0) get_error_msg(ack); ROS_INFO("success: set timeout"); } - -void request_start(int fd) { + +void request_start() { + /* documentation confuses codes for start and getErrorMessage, actually used: + 64 is start + 127 is gerErrorMessage */ ROS_INFO("requesting start"); byte start = REQUEST_START; - send_bytes(fd, &start, 1); - byte answer = get_bytes_socket(fd, 1)[0]; + send_bytes(socket_fd, &start, 1); + byte answer = get_bytes_socket(socket_fd, 1)[0]; if (answer != ANSWER_START) throw "starting failed"; ROS_INFO("success: starting test phase"); } -// nach input verarbeitung weiter machen -void report_now(int fd, std::string chan_name, uint16_t var_count, int32_t *vars){ +void report_now(std::string chan_name, uint16_t var_count, int32_t *vars){ std::unique_ptr<byte[]> msg = std::make_unique<byte[]>(6 + 4 * var_count); if (channels.find(chan_name) == channels.end()) throw "channel not declared"; Channel chan = channels.at(chan_name); @@ -223,109 +221,144 @@ void report_now(int fd, std::string chan_name, uint16_t var_count, int32_t *vars for (unsigned short i = 0; i < var_count; i++) add_int32_in_network_order(vars[i], msg.get(), 6 + i * 4); - send_bytes(fd, msg.get(), 6 + 4 * var_count); + ROS_INFO("sending to output channel %s", chan_name.c_str()); + if (var_count == 0) ROS_INFO("no variables attached"); + for (unsigned short i = 0; i < var_count; i++) + ROS_INFO("attached value %i to variable %s", vars[i], chan.vars[i].c_str()); + send_bytes(socket_fd, msg.get(), 6 + 4 * var_count); + acks_missing++; } -int main(int argc, char**argv){ - ros::init(argc, argv, "TRON dapter"); - ros::NodeHandle nh; - - const std::string IP = "127.0.0.1"; - const short PORT = 8080; - +// returns file descriptor +int create_connected_socket(std::string IP, uint16_t port){ int socketfd; if ((socketfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { - ROS_FATAL("socket could not be created"); - ros::shutdown(); + throw "failed to create socket"; } ROS_INFO("socket created successfully"); struct sockaddr_in addr; addr.sin_family = AF_INET; - addr.sin_port = htons(PORT); + addr.sin_port = htons(port); { int x = inet_pton(AF_INET, IP.c_str(), &addr.sin_addr); - if (x == 1) { - ROS_INFO("valid internet address"); - } else { - ROS_FATAL("internet address could not be converted"); - ros::shutdown(); + if (x != 1) { + throw "IP could not be converted"; } } if (connect(socketfd, (struct sockaddr*)&addr, sizeof(sockaddr_in)) < 0) { - ROS_FATAL("connection failed"); - ros::shutdown(); + throw "failed to connect"; } - ROS_INFO("successfully connected to TRON"); - try { - send_channel_decl_msg(socketfd, true, "ausloesen"); - add_var_to_channel(socketfd, "ausloesen", true, "zahl"); - send_channel_decl_msg(socketfd, false, "position"); - add_var_to_channel(socketfd, "position", false, "zahl"); - // nicht in dokumentation: lokale variablen nicht möglich - //add_var_to_channel(socketfd, "ausloesen", "lokal"); - - unsigned long long microseconds = 1000000; // one second - // Dokumentation arbeitet mit 2 unsigned ints, äußerst mystisch - set_time_unit_and_timeout(socketfd, microseconds, 10); - - // FALSCH IN DOKUMENTATION: Start und error msg byte codes vertauscht - // 64 ist start - // 127 ist gerErrorMessage - request_start(socketfd); // communication not synchronous anymore after start - - // listen to topics - int acks_needed = 0; - ros::Rate rate(1); - while (ros::ok()) { - while (true){ // checking for messages arrived - // get 4 bytes at a time - std::unique_ptr<byte[]> bytes = std::make_unique<byte[]>(4); - int bytes_recv = recv(socketfd, bytes.get(), 4, MSG_DONTWAIT); - if (bytes_recv == -1) break; // nothing more to read - if (bytes_recv == 0) throw "connection was closed"; - if (bytes_recv != 4) throw "could not read full ack or chanID"; - byte_info(bytes.get(), 4, false); - int32_t next = bytes_to_int_32(bytes.get()); - // if those 4 bytes are acknowledgement decrement amount of acks needed; - if (next == ACK_SINGLE) { - if (--acks_needed < 0) throw "too many acknowledgements"; - ROS_INFO("got acknowledgement for output"); - continue; - } - std::string chan_name; - for (std::pair<const std::string, Channel>& pair : channels) { - if (pair.second.identifier == next) chan_name = pair.first; - } - if (chan_name.empty()) throw "channel could not be identified"; - ROS_INFO("got channel identifier (%s) for input", chan_name.c_str()); - // channel identified, assuming following bytes are correct - bytes = std::make_unique<byte[]>(2); - recv(socketfd, bytes.get(), 2, MSG_DONTWAIT); - byte_info(bytes.get(), 2, false); - uint16_t var_count = bytes_to_uint_16(bytes.get()); - ROS_INFO("got variable count %i", var_count); - for (uint16_t i = 0; i < var_count; i++) { - bytes = std::make_unique<byte[]>(4); - recv(socketfd, bytes.get(), 4, MSG_DONTWAIT); - next = bytes_to_int_32(bytes.get()); - Channel& c = channels.at(chan_name); - std::string var = channels.at(chan_name).vars[i]; - ROS_INFO("got variable number %i: value of %s is %i", i+1, channels.at(chan_name).vars[i].c_str(), next); - // TODO transfer message to topic - } - bytes = std::make_unique<byte[]>(4); - add_int32_in_network_order(ACK_SINGLE, bytes.get(), 0); - ROS_INFO("sending acknowledgement"); - send_bytes(socketfd, bytes.get(), 4); - } - ros::spinOnce(); - rate.sleep(); + ROS_INFO("successfully connected"); + return socketfd; +} + +void configuration_phase(){ + /* note: for configuration phase maximum message length is 256 Bytes, + therefore heap allocation can be avoided most of the time in called functions */ + send_channel_decl_msg(true, "ausloesen"); + add_var_to_channel("ausloesen", true, "zahl"); + send_channel_decl_msg(false, "position"); + add_var_to_channel("position", false, "zahl"); + // not obvious in documentation: local variables are not supported + //add_var_to_channel(socketfd, "ausloesen", "lokal"); + uint64_t microseconds = 1000000; // one second + // documentation states 2 signed integers are used for some reason + set_time_unit_and_timeout(microseconds, 100); +} + +// TODO implement callbacks/topic messages +void process_TRONs_msgs(){ + /* note: TRONs communication after start is not guaranteed to be synchronous, + thus incoming messages must be checked for their content */ + while (true){ + // get 4 bytes at a time as an int32 + byte bytes[4]; + if (!read_4_bytes_nonblock(socket_fd, bytes)) + break; // no bytes left to process + int32_t next = bytes_to_int_32(bytes); + + // bytes are acknowledgement + if (next == ACK_SINGLE) { + if (--acks_missing < 0) throw "too many acknowledgements"; + ROS_INFO("got acknowledgement for output"); + continue; } - // erst nach ausloesen input wäre akzeptierbar - int32_t zahl = 4; - //report_now(socketfd, "position", 1, &zahl); + // bytes are channel identifier + // find corresponding channel + std::string chan_name; + for (std::pair<const std::string, Channel>& pair : channels) { + if (pair.second.identifier == next) chan_name = pair.first; + } + + if (chan_name.empty()) + /* note: this only happens if message number 12 in TRON User Manual is received, + which should not be the case according to the documentation */ + throw "channel could not be identified"; + ROS_INFO("got channel identifier (%s) for input", chan_name.c_str()); + + // channel identified, assuming all following bytes are correct + + // get number of variables + recv(socket_fd, bytes, 2, MSG_DONTWAIT); + byte_info(bytes, 2, false); + uint16_t var_count = bytes_to_uint_16(bytes); + ROS_INFO("got variable count %i", var_count); + + // process variables + for (uint16_t i = 0; i < var_count; i++) { + recv(socket_fd, bytes, 4, MSG_DONTWAIT); + next = bytes_to_int_32(bytes); + Channel& c = channels.at(chan_name); + std::string var = channels.at(chan_name).vars[i]; + ROS_INFO("got variable number %i: value of %s is %i", i+1, channels.at(chan_name).vars[i].c_str(), next); + // TODO transfer message to topic possibly via custom callback + } + + // send acknowledgement + add_int32_in_network_order(ACK_SINGLE, bytes, 0); + ROS_INFO("sending acknowledgement"); + send_bytes(socket_fd, bytes, 4); + } +} + +void testCallback(const std_msgs::Int32::ConstPtr& msg){ + int32_t x = msg->data; + report_now("position", 1, &x); +} + +int main(int argc, char**argv){ + ros::init(argc, argv, "TRON dapter"); + ros::NodeHandle nh; + ros::Publisher pub = nh.advertise<std_msgs::Int32>("test_topic", 1); + if (!pub) throw "publisher nicht erstellt"; + + try { + const std::string IP = "127.0.0.1"; + const uint16_t PORT = 8080; + socket_fd = create_connected_socket(IP, PORT); + + configuration_phase(); + + // subscribe to topics and add callbacks which use report_now------------------------------------------- + ros::Subscriber sub = nh.subscribe("test_topic", 1, testCallback); + // ----------------------------------------------------------- + + request_start(); + + // testing phase loop + ros::Rate test_phase_freq(1); + int y = 2; + while (ros::ok()) { + std_msgs::Int32 x; + x.data = 6; + if (y==2) pub.publish(x); + y++; // TRON does not seem to send acknowledgements back + process_TRONs_msgs(); + ros::spinOnce(); + test_phase_freq.sleep(); + } } catch (const char* err){ ROS_FATAL("shutting down: %s", err); ros::shutdown();