Skip to content
Snippets Groups Projects
Select Git revision
  • master default protected
1 result

main.cpp

Blame
  • main.cpp 19.09 KiB
    #include <ros/ros.h>
    #include <stdio.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    #include <std_msgs/Int32.h>
    #include <boost/make_shared.hpp>
    #include <memory>
    
    const std::string IP = "127.0.0.1";
    const uint16_t PORT = 8080;
    
    typedef uint8_t byte;
    
    // some helper functions -----------------------------------------------
    
    // log bytes sent/received
    inline void byte_info(const byte* buf, int buf_length, bool send=true){
        std::stringstream strstr;
        strstr << (send ? "sending" : "received") << " bytes:"; 
        for (int i = 0; i < buf_length; i++) strstr << " " << (int)buf[i];
        ROS_INFO(strstr.str().c_str());
    }
    
    // gets count bytes from socket file descriptor (with timeout)
    const double SECONDS_BEFORE_TIMEOUT = 30;
    std::unique_ptr<byte[]> get_bytes_socket(int fd, int count){
        std::unique_ptr<byte[]> arr = std::make_unique<byte[]>(count);
        int already_read = 0;
        ros::Time start_time = ros::Time::now();
        int success = 0;
        while (already_read < count) {
            success = recv(fd, &arr[already_read], count-already_read, MSG_DONTWAIT);
            if (success == 0) throw "connection was closed by TRON";
            if (success == -1){
                if (ros::Time::now().toSec() - start_time.toSec() >= SECONDS_BEFORE_TIMEOUT)
                    throw "timeout while reading bytes from socket file descriptor";
                continue;
            }
            already_read += success; // read returns number of bytes read
        }
        byte_info(arr.get(), already_read, false);
        return arr; // no explicit move needed since return value is rvalue
    };
    
    inline int32_t network_bytes_to_int_32(byte *buf){
        uint32_t h = ntohl(*reinterpret_cast<uint32_t*>(buf));
        return *reinterpret_cast<int32_t*>(&h);
    }
    
    inline uint16_t network_bytes_to_uint_16(byte *buf) {
        return ntohs(*reinterpret_cast<uint16_t*>(buf));
    }
    
    // wrapping get_bytes_socket for converting to 32 bit integer
    int32_t get_int_socket(int fd) {
        auto ack = get_bytes_socket(fd, 4);
        return network_bytes_to_int_32(ack.get());
    }
    
    // converts num to network order and adds it to byte array starting from index
    void add_int32_in_network_order(int32_t num, byte *buf, int index){
        uint32_t n = htonl(*reinterpret_cast<uint32_t*>(&num));
        byte* bytes = reinterpret_cast<byte*>(&n);
        buf[index] = bytes[0];
        buf[++index] = bytes[1];
        buf[++index] = bytes[2];
        buf[++index] = bytes[3];
    }
    
    // wraps write() for printing and throwing on errors
    inline void send_bytes(int fd, const byte *buf, int length){
        byte_info(buf, length);
        int ret = write(fd, (void*) buf, length);
        if (ret < 0) throw "writing failed";
    }
    
    // returns false if nothing more to read and true if 4 bytes are read successfully
    // used to reduce overhead in testing phase
    inline bool read_4_bytes_nonblock(int fd, byte *buf) {
        int bytes_recv = recv(fd, buf, 4, MSG_DONTWAIT);
        if (bytes_recv == -1) return false; // nothing more to read
        if (bytes_recv == 0) throw "connection was closed";
        if (bytes_recv != 4) throw "could not read full 4 bytes";
        byte_info(buf, 4, false);
        return true;
    }
    
    // returns file descriptor
    int create_connected_socket(std::string IP, uint16_t port){
        int socketfd;
        if ((socketfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
            throw "failed to create socket";
        }
        ROS_INFO("socket created successfully");
    
        struct sockaddr_in addr;
        addr.sin_family = AF_INET;
        addr.sin_port = htons(port);
        {
            int x = inet_pton(AF_INET, IP.c_str(), &addr.sin_addr);
            if (x != 1) {
                throw "IP could not be converted";
            }
        }
        if (connect(socketfd, (struct sockaddr*)&addr, sizeof(sockaddr_in)) < 0) {
            throw "failed to connect";
        }
        ROS_INFO("successfully connected");
        return socketfd;
    }
    
    // consts for TRON ----------------------------------------------------------
    
    const byte GET_ERROR_MSG = 127;
    const byte DECL_CHAN_INPUT = 1;
    const byte DECL_CHAN_OUTPUT = 2;
    const byte ADD_VAR_TO_INPUT = 3;
    const byte ADD_VAR_TO_OUTPUT = 4;
    const byte SET_TIME_UNIT = 5;
    const byte SET_TIMEOUT = 6;
    const byte REQUEST_START = 64;
    const byte ANSWER_START = 0;
    const int32_t ACK_SINGLE = 1 << 31; // 32 Bit int with most significant bit set to 1
    
    // global vars -------------------------------------------------------------
    
    // ROS uses little endian for its messages
    const bool SYS_IS_BIG_ENDIAN = htonl(47) == 47;
    int socket_fd;
    struct Channel {
        std::string name;
        int32_t identifier;
        bool is_input;
        std::vector<std::string> vars; // associated variables in Uppaal
        Channel(std::string name, int32_t id, bool is_input) : name(name), identifier(id), is_input(is_input){};
    };
    
    // used in struct Mapping for convenience
    std::unique_ptr<Channel> send_channel_decl_msg(bool is_input, std::string name);
    void add_var_to_channel(Channel& chan, bool is_input, std::string var);
    struct Mapping {
        std::string topic; // ROS topic name
    
        // offset in byte for next value (from last value) 
        std::vector<int> byte_offset;
    
        // custom conversion function used if data is not int32 and therefore
        // need to be converted to an int32 first
        // nullptr if not used
        // needs to make sure count of bytes used gets added to *int (parameter)
        // to indicate beginning of next field
        std::vector<int32_t (*)(byte*, int*)> converters_to_TRON;
        // similarly the conversion to a topic field takes the value from TRON,
        // a pointer to the buffer at which to add the bytes and
        // a pointer to an integer to increment by the amount of bytes added
        std::vector<void (*)(int32_t, byte*, int*)> converters_to_topics;
    
        Channel channel = Channel("", 0, true); // TRON channel
        /* note: all vectors need to have the same size 
        example: 
            topic = "example_topic"
            byte_offset = [4, 2]
            converters = *some pointer converting 8 bytes to int32, nullptr
            channel = struct Channel "example channel"
    
        This example maps any message from topic "example_topic" to "example_channel".
        8 Bytes starting from index 4 are converted via the given pointer 
        and mapped to the first variable of the corresponding channel.
        The conversion function also adds 8 to the current position, 
        thus the next field starts 8 bytes + 2 bytes for offset after the last 
        known position, which was 4.
        Because the second converter points to nullptr, it is considered an int32,
        which is mapped to the second variable of the channel. 
        The next position is incremented by 4 if there is no conversion function given.
        Note that the first 4 Bytes of the topics message are ignored in this example.
        */
    
        // Callback used if TRON sends some message
        void(*input_callback)(Mapping& map, int32_t*) = nullptr;
    
        Mapping(std::string topic, std::string channel, bool channelIsInput){
            this->topic = topic;
            this->channel = *send_channel_decl_msg(channelIsInput, channel).get();
        }
    
        void add_var_to_mapping(std::string name_tron, int byte_offset, 
                             int32_t (*conv_to_TRON)(byte*, int*) = nullptr,
                             void (*conv_to_topic)(int32_t, byte*, int*) = nullptr){
            add_var_to_channel(this->channel, this->channel.is_input, name_tron);
            this->byte_offset.push_back(byte_offset);
            this->converters_to_TRON.push_back(conv_to_TRON);
            this->converters_to_topics.push_back(conv_to_topic);
        }
    };
    std::list<Mapping> mappings;
    std::vector<ros::Publisher> input_publishers;
    // callback publishing to topics like defined in mapping, used for input_callback
    template<class T>
    void mapping_callback_to_topic(Mapping& map, int32_t* vars){
        boost::shared_ptr<T> shared_ptr = boost::make_shared<T>();
        byte* ptr = reinterpret_cast<byte*>(shared_ptr.get());
        int next_pos = 0;
        for (int i = 0; i < map.channel.vars.size(); i++){
            int32_t val = vars[i];
            next_pos += map.byte_offset[i];
            if (map.converters_to_topics[i] == nullptr) {
                byte* val_bytes;
                if (SYS_IS_BIG_ENDIAN){
                    uint32_t switched_byte_order = htonl(*reinterpret_cast<uint32_t*>(&val));
                    val = *reinterpret_cast<int32_t*>(&switched_byte_order);
                } 
                val_bytes = reinterpret_cast<byte*>(&val);
                ptr[next_pos++] = val_bytes[0];
                ptr[next_pos++] = val_bytes[1];
                ptr[next_pos++] = val_bytes[2];
                ptr[next_pos++] = val_bytes[3];
            } else {
                map.converters_to_topics[i](val, &ptr[next_pos], &next_pos);
            }
        }
        for (ros::Publisher& pub : input_publishers)
            if (pub.getTopic() == map.topic) {
                pub.publish(shared_ptr);
                return;
            }
        throw "did not find publisher for topic";
    }
    
    // subscribers already have callbacks
    std::vector<ros::Subscriber> output_subscribers;
    
    // keep track of acknowledgements that are missing
    /* note: since communication is asynchronous this value can only be
    compared reliably with 0 after testing is terminated */
    /* note: TRON only acknowledges if virtual clock is used, which it is not (yet) */
    int acks_missing = 0; 
    
    void get_error_msg(int32_t errorcode) {
        ROS_WARN("got error, trying to get corresponding message");
        byte get_err_msg_msg[5];
        get_err_msg_msg[0] = GET_ERROR_MSG; // get error msg code
        add_int32_in_network_order(errorcode, get_err_msg_msg, 1);
        send_bytes(socket_fd, get_err_msg_msg, 5); // connection is closed after this write?
        byte err_msg_length = get_bytes_socket(socket_fd, 1)[0];
        auto err_msg = get_bytes_socket(socket_fd, err_msg_length);
        std::string msg_str = std::string(reinterpret_cast<char*>(err_msg.get()), (size_t) err_msg_length);
        ROS_FATAL("TRON sent error message: %s", msg_str.c_str());
        throw "got error from TRON";
    }
    
    void add_var_to_channel(Channel& chan, bool is_input, std::string var) {
        byte msg[6 + var.length()];
        msg[0] = is_input ? ADD_VAR_TO_INPUT : ADD_VAR_TO_OUTPUT;
        add_int32_in_network_order(chan.identifier, msg, 1);
        msg[5] = (byte) var.length();
        for (int i = 0; i < var.length(); i++) msg[6+i] = var.at(i);
        ROS_INFO("attaching variable %s to channel %s", var.c_str(), chan.name.c_str());
        send_bytes(socket_fd, msg, 6 + var.length());
        int32_t ack = get_int_socket(socket_fd);
        if (ack < 0) get_error_msg(ack);
        ROS_INFO("success: attached variable");
        chan.vars.push_back(var);
    }
    
    std::unique_ptr<Channel> send_channel_decl_msg(bool is_input, std::string name) {
        // prepare packet
        size_t msg_length = 2 + name.length();
        byte msg[msg_length];
        msg[0] = is_input ? DECL_CHAN_INPUT : DECL_CHAN_OUTPUT;
        msg[1] = name.length();
        for (int i = 2, c = 0; i < msg_length; i++, c++) msg[i] = name[c];
    
        // send packet
        ROS_INFO("declaring channel %s as %s", name.c_str(), (is_input ? "input" : "output"));
        byte_info(msg, msg_length);
        write(socket_fd, (void*) msg, msg_length);
    
        // get answer from TRON
        int32_t channel_identifier = get_int_socket(socket_fd);
        if (channel_identifier < 0) { // error handling
            get_error_msg(channel_identifier);
        }
        if (channel_identifier == 0) throw "did not get channel identifier";
        // assigned channel ID successfully
        ROS_INFO("success: identifier for channel %s is %i", name.c_str(), channel_identifier);
        return std::make_unique<Channel>(name, channel_identifier, is_input);
    }
    
    void set_time_unit_and_timeout(uint64_t microseconds, int32_t timeout){
        byte msg[9];
        msg[0] = SET_TIME_UNIT;
        byte *microseconds_bytes = reinterpret_cast<byte*>(&microseconds);
    
        // htonl does not exist for long int
        if (SYS_IS_BIG_ENDIAN) {
            for (int i = 0; i < 8; i++) msg[i+1] = microseconds_bytes[i];
        } else {
            for (int i = 0; i < 8; i++) msg[i+1] = microseconds_bytes[7-i];
        }
        ROS_INFO("setting time unit: %i microseconds", microseconds);
        send_bytes(socket_fd, msg, 9);
        int32_t ack = get_int_socket(socket_fd);
        if (ack != 0) get_error_msg(ack);
        ROS_INFO("success: set time unit");
    
        msg[0] = SET_TIMEOUT;
        add_int32_in_network_order(timeout, msg, 1);
        ROS_INFO("setting timeout to %i units", timeout);
        send_bytes(socket_fd, msg, 5);
        ack = get_int_socket(socket_fd);
        if (ack != 0) get_error_msg(ack);
        ROS_INFO("success: set timeout");
    }
     
    void request_start() {
        /* documentation confuses codes for start and getErrorMessage, actually used:
        64 is start
        127 is gerErrorMessage */
        ROS_INFO("requesting start");
        byte start = REQUEST_START;
        send_bytes(socket_fd, &start, 1);
        byte answer = get_bytes_socket(socket_fd, 1)[0];
        if (answer != ANSWER_START) throw "starting failed";
        ROS_INFO("success: starting test phase");
    }
    
    void report_now(Channel& chan, uint16_t var_count, int32_t *vars){
        std::unique_ptr<byte[]> msg = std::make_unique<byte[]>(6 + 4 * var_count);
    
        add_int32_in_network_order(chan.identifier, msg.get(), 0);
    
        unsigned short var_count_network_order = htons(var_count);
        byte *var_count_bytes = reinterpret_cast<byte*>(&var_count_network_order);
        msg[4] = var_count_bytes[0];
        msg[5] = var_count_bytes[1];
    
        for (unsigned short i = 0; i < var_count; i++) 
            add_int32_in_network_order(vars[i], msg.get(), 6 + i * 4);
    
        ROS_INFO("sending to output channel %s", chan.name.c_str());
        if (var_count == 0) ROS_INFO("no variables attached");
        for (unsigned short i = 0; i < var_count; i++)
            ROS_INFO("attached value %i to variable %s", vars[i], chan.vars[i].c_str());
        send_bytes(socket_fd, msg.get(), 6 + 4 * var_count);
        acks_missing++;
    }
    
    // callback reports to TRON like defined in mappings
    template<class T>
    void mappings_callback_to_TRON(const ros::MessageEvent<T>& event){
        std::string topic = event.getConnectionHeader().at("topic");
        byte *bytes = reinterpret_cast<byte*>(event.getMessage().get());
        for (Mapping& mapping : mappings) {
            if (mapping.topic.c_str() == topic && !mapping.channel.is_input) {
                int var_count = mapping.byte_offset.size();
                int32_t vars[var_count * 4];
                int next_pos = 0;
                for (int i = 0; i < var_count; i++) {
                    next_pos += mapping.byte_offset[i];
                    if (mapping.converters_to_TRON[i] == nullptr) {
                        if (SYS_IS_BIG_ENDIAN)
                            vars[i] = network_bytes_to_int_32(&bytes[next_pos]);
                        else vars[i] = *reinterpret_cast<int32_t*>(&bytes[next_pos]);
                        next_pos += 4;
                        continue;
                    } else {
                        vars[i] = mapping.converters_to_TRON[i](&bytes[next_pos], &next_pos);
                    }
                }
                report_now(mapping.channel, var_count, vars);
            }
        }
    }
    
    void configuration_phase(ros::NodeHandle& nh){
        /* note: for configuration phase maximum message length is 256 Bytes, 
        therefore heap allocation can be avoided most of the time in called functions */
        
        // channels: declare, init mapping, add vars to TRON channel, add vars to mapping
        // for input: add publisher to input_publishers
        //            add callback for mapping (key is pair of topic and channel name)
        // for output: add subscribers to output_subscribers
        
        Mapping map = Mapping("/command", "ausloesen", true);
        map.add_var_to_mapping("zahl", 0);
        map.input_callback = mapping_callback_to_topic<std_msgs::Int32>;
        mappings.push_back(map);
        input_publishers.push_back(nh.advertise<std_msgs::Int32>("/command", 1));
        // output channels: declare, init mapping, add vars to TRON channel and mapping
        // add subscriber to list
        map = Mapping("/position", "position", false);
        map.add_var_to_mapping("zahl", 0);
        mappings.push_back(map);
        output_subscribers.push_back(nh.subscribe("/position", 1, 
                mappings_callback_to_TRON<std_msgs::Int32>));
        // not obvious in documentation: local variables are not supported
        // add_var_to_channel(socketfd, "ausloesen", "lokal");
        uint64_t microseconds = 1000000; // one second
        // documentation states 2 signed integers are used for some reason
        set_time_unit_and_timeout(microseconds, 100);
    
        // wait till subscribers initialized
        for (ros::Publisher& pub : input_publishers) {
            while (pub.getNumSubscribers() == 0) { ros::Duration(1).sleep(); };
        }
    }
    
    void process_TRONs_msgs(){
        /* note: TRONs communication after start is not guaranteed to be synchronous,
        thus incoming messages must be checked for their content */
        while (true){
            // get 4 bytes at a time as an int32
            byte bytes[4];
            if (!read_4_bytes_nonblock(socket_fd, bytes)) 
                break; // no bytes left to process
            int32_t next = network_bytes_to_int_32(bytes);
    
            // bytes are acknowledgement
            if (next == ACK_SINGLE) {
                if (--acks_missing < 0) throw "too many acknowledgements";
                ROS_INFO("got acknowledgement for output");
                continue;
            }
    
            // bytes are channel identifier
            // find corresponding channel
            const Channel *chan = nullptr;
            for (Mapping& map : mappings) if (map.channel.identifier == next) chan = &map.channel;
            /* note: this only happens if message number 12 in TRON User Manual is received,
                which should not be the case according to the documentation */
            if (chan == nullptr) 
                throw "channel could not be identified";
    
            ROS_INFO("got channel identifier (%s) for input", chan->name.c_str());
    
            // channel identified, assuming all following bytes are correct
    
            // get number of variables
            recv(socket_fd, bytes, 2, MSG_DONTWAIT);
            byte_info(bytes, 2, false);
            uint16_t var_count = network_bytes_to_uint_16(bytes);
            ROS_INFO("got variable count %i", var_count);
    
            int32_t vals[var_count];
            // process variables
            for (uint16_t i = 0; i < var_count; i++) {
                recv(socket_fd, bytes, 4, MSG_DONTWAIT);
                next = network_bytes_to_int_32(bytes);
                std::string var = chan->vars[i];
                ROS_INFO("got variable number %i:  value of %s is %i", i+1, var.c_str(), next);
                vals[i] = next;
            }
    
            for (Mapping& map : mappings)
                if (map.channel.name == chan->name && map.channel.is_input)
                    for (ros::Publisher& pub : input_publishers){
                        if (pub.getTopic() == map.topic) {
                            if (map.input_callback != nullptr)
                                map.input_callback(map, vals);  
                            else throw "no callback declared";  
                            break;
                        }
                    }
    
            // send acknowledgement
            add_int32_in_network_order(ACK_SINGLE, bytes, 0);
            ROS_INFO("sending acknowledgement");
            send_bytes(socket_fd, bytes, 4);
        }
    }
    
    int main(int argc, char**argv){
        ros::init(argc, argv, "TRON dapter");
        ros::NodeHandle nh;
    
        try {
            socket_fd = create_connected_socket(IP, PORT);
    
            configuration_phase(nh);
            
            request_start(); 
    
            // testing phase loop
            ros::Rate test_phase_freq(1);
            while (ros::ok()) {
                process_TRONs_msgs();
                ros::spinOnce();
                test_phase_freq.sleep();
            } 
        } catch (const char* err){
            ROS_FATAL("shutting down: %s", err);
            ros::shutdown();
        }
    }