Skip to content
Snippets Groups Projects
Select Git revision
  • 1fb4ac0f4978f9c381fcf1673db9217f05e7d374
  • master default protected
2 results

main.cpp

Blame
  • user avatar
    cs-99 authored
    No commit message
    1fb4ac0f
    History
    main.cpp 13.51 KiB
    #include <ros/ros.h>
    #include <stdio.h>
    #include <sys/socket.h>
    #include <netinet/in.h>
    #include <arpa/inet.h>
    #include <std_msgs/Int32.h>
    
    typedef uint8_t byte;
    
    // some helper functions -----------------------------------------------
    
    // log bytes sent/received
    inline void byte_info(const byte* buf, int buf_length, bool send=true){
        std::stringstream strstr;
        strstr << (send ? "sending" : "received") << " bytes:"; 
        for (int i = 0; i < buf_length; i++) strstr << " " << (int)buf[i];
        ROS_INFO(strstr.str().c_str());
    }
    
    // gets count bytes from socket file descriptor (with timeout)
    const double SECONDS_BEFORE_TIMEOUT = 30;
    std::unique_ptr<byte[]> get_bytes_socket(int fd, int count){
        std::unique_ptr<byte[]> arr = std::make_unique<byte[]>(count);
        int already_read = 0;
        ros::Time start_time = ros::Time::now();
        int success = 0;
        while (already_read < count) {
            success = recv(fd, &arr[already_read], count-already_read, MSG_DONTWAIT);
            if (success == 0) throw "connection was closed by TRON";
            if (success == -1){
                if (ros::Time::now().toSec() - start_time.toSec() >= SECONDS_BEFORE_TIMEOUT)
                    throw "timeout while reading bytes from socket file descriptor";
                continue;
            }
            already_read += success; // read returns number of bytes read
        }
        byte_info(arr.get(), already_read, false);
        return arr; // no explicit move needed since return value is rvalue
    };
    
    inline int32_t bytes_to_int_32(byte *buf){
        uint32_t h = ntohl(*reinterpret_cast<uint32_t*>(buf));
        return *reinterpret_cast<int32_t*>(&h);
    }
    
    inline uint16_t bytes_to_uint_16(byte *buf) {
        return ntohs(*reinterpret_cast<uint16_t*>(buf));
    }
    
    // wrapping get_bytes_socket for converting to 32 bit integer
    int32_t get_int_socket(int fd) {
        auto ack = get_bytes_socket(fd, 4);
        return bytes_to_int_32(ack.get());
    }
    
    // converts num to network order and adds it to byte array starting from index
    void add_int32_in_network_order(int32_t num, byte *buf, int index){
        uint32_t n = htonl(*reinterpret_cast<uint32_t*>(&num));
        byte* bytes = reinterpret_cast<byte*>(&n);
        buf[index] = bytes[0];
        buf[++index] = bytes[1];
        buf[++index] = bytes[2];
        buf[++index] = bytes[3];
    }
    
    // wraps write() for printing and throwing on errors
    inline void send_bytes(int fd, const byte *buf, int length){
        byte_info(buf, length);
        int ret = write(fd, (void*) buf, length);
        if (ret < 0) throw "writing failed";
    }
    
    // returns false if nothing more to read and true if 4 bytes are read successfully
    // used to reduce overhead in testing phase
    inline bool read_4_bytes_nonblock(int fd, byte *buf) {
        int bytes_recv = recv(fd, buf, 4, MSG_DONTWAIT);
        if (bytes_recv == -1) return false; // nothing more to read
        if (bytes_recv == 0) throw "connection was closed";
        if (bytes_recv != 4) throw "could not read full 4 bytes";
        byte_info(buf, 4, false);
        return true;
    }
    
    // consts for TRON ----------------------------------------------------------
    
    const byte GET_ERROR_MSG = 127;
    const byte DECL_CHAN_INPUT = 1;
    const byte DECL_CHAN_OUTPUT = 2;
    const byte ADD_VAR_TO_INPUT = 3;
    const byte ADD_VAR_TO_OUTPUT = 4;
    const byte SET_TIME_UNIT = 5;
    const byte SET_TIMEOUT = 6;
    const byte REQUEST_START = 64;
    const byte ANSWER_START = 0;
    const int32_t ACK_SINGLE = 1 << 31; // 32 Bit int with most significant bit set to 1
    
    // global vars -------------------------------------------------------------
    
    struct Channel {
        int32_t identifier;
        bool is_input;
        std::vector<std::string> vars; // associated variables in Uppaal
        Channel(int32_t id, bool is_input) : identifier(id), is_input(is_input){};
        Channel() = default; // default constructor needed for std::map
    };
    std::map<std::string, Channel> channels;
    
    int socket_fd;
    
    // keep track of acknowledgements that are missing
    /* note: since communication is asynchronous this value can only be
    compared reliably with 0 after testing is terminated */
    int acks_missing = 0; 
    
    // tron specific functions -----------------------------------------------------
    
    void get_error_msg(int32_t errorcode) {
        ROS_WARN("got error, trying to get corresponding message");
        byte get_err_msg_msg[5];
        get_err_msg_msg[0] = GET_ERROR_MSG; // get error msg code
        add_int32_in_network_order(errorcode, get_err_msg_msg, 1);
        send_bytes(socket_fd, get_err_msg_msg, 5); // connection is closed after this write?
        byte err_msg_length = get_bytes_socket(socket_fd, 1)[0];
        auto err_msg = get_bytes_socket(socket_fd, err_msg_length);
        std::string msg_str = std::string(reinterpret_cast<char*>(err_msg.get()), (size_t) err_msg_length);
        ROS_FATAL("TRON sent error message: %s", msg_str.c_str());
        throw "got error from TRON";
    }
    
    void add_var_to_channel(std::string channel, bool is_input, std::string var) {
        byte msg[6 + var.length()];
        if (channels.find(channel) == channels.end()) throw "channel not declared";
        Channel& chan = channels.at(channel);
        msg[0] = is_input ? ADD_VAR_TO_INPUT : ADD_VAR_TO_OUTPUT;
        add_int32_in_network_order(chan.identifier, msg, 1);
        msg[5] = (byte) var.length();
        for (int i = 0; i < var.length(); i++) msg[6+i] = var.at(i);
        ROS_INFO("attaching variable %s to channel %s", var.c_str(), channel.c_str());
        send_bytes(socket_fd, msg, 6 + var.length());
        int32_t ack = get_int_socket(socket_fd);
        if (ack < 0) get_error_msg(ack);
        ROS_INFO("success: attached variable");
        chan.vars.push_back(var);
    }
    
    void send_channel_decl_msg(bool is_input, std::string name) {
        // prepare packet
        size_t msg_length = 2 + name.length();
        byte msg[msg_length];
        msg[0] = is_input ? DECL_CHAN_INPUT : DECL_CHAN_OUTPUT;
        msg[1] = name.length();
        for (int i = 2, c = 0; i < msg_length; i++, c++) msg[i] = name[c];
    
        // send packet
        ROS_INFO("declaring channel %s as %s", name.c_str(), (is_input ? "input" : "output"));
        byte_info(msg, msg_length);
        write(socket_fd, (void*) msg, msg_length);
    
        // get answer from TRON
        int32_t channel_identifier = get_int_socket(socket_fd);
        if (channel_identifier < 0) { // error handling
            get_error_msg(channel_identifier);
        }
        if (channel_identifier == 0) throw "did not get channel identifier";
        // assigned channel ID successfully
        ROS_INFO("success: identifier for channel %s is %i", name.c_str(), channel_identifier);
        channels[name] = Channel(channel_identifier, is_input);
    }
    
    void set_time_unit_and_timeout(uint64_t microseconds, int32_t timeout){
        byte msg[9];
        msg[0] = SET_TIME_UNIT;
        byte *microseconds_bytes = reinterpret_cast<byte*>(&microseconds);
    
        // htonl does not exist for long int
        const bool IS_BIG_ENDIAN = htonl(47) == 47;
        if (IS_BIG_ENDIAN) {
            for (int i = 0; i < 8; i++) msg[i+1] = microseconds_bytes[i];
        } else {
            for (int i = 0; i < 8; i++) msg[i+1] = microseconds_bytes[7-i];
        }
        ROS_INFO("setting time unit: %i microseconds", microseconds);
        send_bytes(socket_fd, msg, 9);
        int32_t ack = get_int_socket(socket_fd);
        if (ack != 0) get_error_msg(ack);
        ROS_INFO("success: set time unit");
    
        msg[0] = SET_TIMEOUT;
        add_int32_in_network_order(timeout, msg, 1);
        ROS_INFO("setting timeout to %i units", timeout);
        send_bytes(socket_fd, msg, 5);
        ack = get_int_socket(socket_fd);
        if (ack != 0) get_error_msg(ack);
        ROS_INFO("success: set timeout");
    }
     
    void request_start() {
        /* documentation confuses codes for start and getErrorMessage, actually used:
        64 is start
        127 is gerErrorMessage */
        ROS_INFO("requesting start");
        byte start = REQUEST_START;
        send_bytes(socket_fd, &start, 1);
        byte answer = get_bytes_socket(socket_fd, 1)[0];
        if (answer != ANSWER_START) throw "starting failed";
        ROS_INFO("success: starting test phase");
    }
    
    void report_now(std::string chan_name, uint16_t var_count, int32_t *vars){
        std::unique_ptr<byte[]> msg = std::make_unique<byte[]>(6 + 4 * var_count);
        if (channels.find(chan_name) == channels.end()) throw "channel not declared";
        Channel chan = channels.at(chan_name);
    
        add_int32_in_network_order(chan.identifier, msg.get(), 0);
    
        unsigned short var_count_network_order = htons(var_count);
        byte *var_count_bytes = reinterpret_cast<byte*>(&var_count_network_order);
        msg[4] = var_count_bytes[0];
        msg[5] = var_count_bytes[1];
    
        for (unsigned short i = 0; i < var_count; i++) 
            add_int32_in_network_order(vars[i], msg.get(), 6 + i * 4);
    
        ROS_INFO("sending to output channel %s", chan_name.c_str());
        if (var_count == 0) ROS_INFO("no variables attached");
        for (unsigned short i = 0; i < var_count; i++)
            ROS_INFO("attached value %i to variable %s", vars[i], chan.vars[i].c_str());
        send_bytes(socket_fd, msg.get(), 6 + 4 * var_count);
        acks_missing++;
    }
    
    // returns file descriptor
    int create_connected_socket(std::string IP, uint16_t port){
        int socketfd;
        if ((socketfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
            throw "failed to create socket";
        }
        ROS_INFO("socket created successfully");
    
        struct sockaddr_in addr;
        addr.sin_family = AF_INET;
        addr.sin_port = htons(port);
        {
            int x = inet_pton(AF_INET, IP.c_str(), &addr.sin_addr);
            if (x != 1) {
                throw "IP could not be converted";
            }
        }
        if (connect(socketfd, (struct sockaddr*)&addr, sizeof(sockaddr_in)) < 0) {
            throw "failed to connect";
        }
        ROS_INFO("successfully connected");
        return socketfd;
    }
    
    void configuration_phase(){
        /* note: for configuration phase maximum message length is 256 Bytes, 
        therefore heap allocation can be avoided most of the time in called functions */
        send_channel_decl_msg(true, "ausloesen");
        add_var_to_channel("ausloesen", true, "zahl");
        send_channel_decl_msg(false, "position");
        add_var_to_channel("position", false, "zahl");
        // not obvious in documentation: local variables are not supported
        //add_var_to_channel(socketfd, "ausloesen", "lokal");
        uint64_t microseconds = 1000000; // one second
        // documentation states 2 signed integers are used for some reason
        set_time_unit_and_timeout(microseconds, 100);
    }
    
    // TODO implement callbacks/topic messages
    void process_TRONs_msgs(){
        /* note: TRONs communication after start is not guaranteed to be synchronous,
        thus incoming messages must be checked for their content */
        while (true){
            // get 4 bytes at a time as an int32
            byte bytes[4];
            if (!read_4_bytes_nonblock(socket_fd, bytes)) 
                break; // no bytes left to process
            int32_t next = bytes_to_int_32(bytes);
    
            // bytes are acknowledgement
            if (next == ACK_SINGLE) {
                if (--acks_missing < 0) throw "too many acknowledgements";
                ROS_INFO("got acknowledgement for output");
                continue;
            }
    
            // bytes are channel identifier
            // find corresponding channel
            std::string chan_name;
            for (std::pair<const std::string, Channel>& pair : channels) {
                if (pair.second.identifier == next) chan_name = pair.first;
            }
    
            if (chan_name.empty()) 
                /* note: this only happens if message number 12 in TRON User Manual is received,
                which should not be the case according to the documentation */
                throw "channel could not be identified";
            ROS_INFO("got channel identifier (%s) for input", chan_name.c_str());
    
            // channel identified, assuming all following bytes are correct
    
            // get number of variables
            recv(socket_fd, bytes, 2, MSG_DONTWAIT);
            byte_info(bytes, 2, false);
            uint16_t var_count = bytes_to_uint_16(bytes);
            ROS_INFO("got variable count %i", var_count);
    
            // process variables
            for (uint16_t i = 0; i < var_count; i++) {
                recv(socket_fd, bytes, 4, MSG_DONTWAIT);
                next = bytes_to_int_32(bytes);
                Channel& c = channels.at(chan_name);
                std::string var = channels.at(chan_name).vars[i];
                ROS_INFO("got variable number %i:  value of %s is %i", i+1, channels.at(chan_name).vars[i].c_str(), next);
                // TODO transfer message to topic possibly via custom callback
            }
    
            // send acknowledgement
            add_int32_in_network_order(ACK_SINGLE, bytes, 0);
            ROS_INFO("sending acknowledgement");
            send_bytes(socket_fd, bytes, 4);
        }
    }
    
    void testCallback(const std_msgs::Int32::ConstPtr& msg){
        int32_t x = msg->data;
        report_now("position", 1, &x);
    }
    
    int main(int argc, char**argv){
        ros::init(argc, argv, "TRON dapter");
        ros::NodeHandle nh;
        ros::Publisher pub = nh.advertise<std_msgs::Int32>("test_topic", 1);
        if (!pub) throw "publisher nicht erstellt";
    
        try {
            const std::string IP = "127.0.0.1";
            const uint16_t PORT = 8080;
            socket_fd = create_connected_socket(IP, PORT);
    
            configuration_phase();
    
            // subscribe to topics and add callbacks which use report_now-------------------------------------------
            ros::Subscriber sub = nh.subscribe("test_topic", 1, testCallback);
            // -----------------------------------------------------------
            
            request_start(); 
    
            // testing phase loop
            ros::Rate test_phase_freq(1);
            int y = 2;
            while (ros::ok()) {
                std_msgs::Int32 x;
                x.data = 6;
                if (y==2) pub.publish(x);
                y++; // TRON does not seem to send acknowledgements back
                process_TRONs_msgs();
                ros::spinOnce();
                test_phase_freq.sleep();
            } 
        } catch (const char* err){
            ROS_FATAL("shutting down: %s", err);
            ros::shutdown();
        }
    }