Skip to content
Snippets Groups Projects
Select Git revision
  • f0568e3971d2728f7daf07914eb0328b831c0ea3
  • master default protected
2 results

IPOS.py

Blame
  • IPOS.py 6.32 KiB
    import serial
    import math
    import numpy as np
    import logging
    import re
    import datetime
    import time
    import json
    import paho.mqtt.client as mqtt
    # Configuration.
    # Fixed 2-D coordinates, one per id reported on the serial link.  Each pair
    # is used as a circle centre in cal_position_2d(), with the averaged
    # distance measured for the same id as the radius.
    # NOTE(review): units are presumably metres - confirm against the setup.
    position_3383 = [0,4.35]
    position_885  = [0,0]
    position_2934 = [3.65,0.68]
    position_1107 = [3.65,5.4]
    # Only referenced by the commented-out publish.single() call at the end of
    # the file.
    the_hostname = "UWB_rasp"
    # Broker host handed to client.connect(); the commented-out assignment is
    # an alternative address.
    #MQTT_SERVER = "192.168.0.111"
    MQTT_SERVER = "broker.hivemq.com"
    # Topic every position message is published to.
    MQTT_PATH = "usertopic/SensorEventWrapper"


    # Module-level port handle.  The main loop reads a single line from it;
    # read_serial() opens the same device again for every batch and this
    # handle is never closed.
    ser = serial.Serial('/dev/ttyUSB0', 115200, timeout=1)  # open serial port
    print(ser.name)         # check which port was really used
    
    def extractNumbersFromLine(line):
        """Return every numeric token found in ``line`` as a list of strings.

        A token is an optional minus sign, one or more digits, an optional
        decimal point and any digits that follow it.  Tokens are returned in
        the order they appear and are not converted to numbers.
        """
        number_pattern = re.compile(r'-?\d+\.?\d*')
        return number_pattern.findall(line)
    
    def get_distance(number_found):
        """Pick the id and the distance out of the numeric tokens of one line.

        ``number_found`` must hold at least four items.  The second item is
        returned as the id and the fourth as the distance, both unchanged.
        """
        tag = number_found[1]
        distance = number_found[3]
        return tag, distance
        
    def average(lst):
        """Return the arithmetic mean of ``lst``.

        Raises ZeroDivisionError when ``lst`` is empty.
        """
        total = sum(lst)
        count = len(lst)
        return total / count
    
    def read_serial(number, port='/dev/ttyUSB0', baudrate=115200):
        """Read ``number`` lines from the serial port and average the distance per id.

        Each line is reduced to its numeric tokens.  Lines with at least four
        tokens contribute one distance sample to the id they name; lines
        naming any other id are ignored.  The four known ids are "3383",
        "885", "2934" and "1107" (presumably the UWB anchors - verify against
        the firmware output format).

        Args:
            number: How many lines to read (a read that times out still counts).
            port: Serial device to open.
            baudrate: Baud rate for the device.

        Returns:
            A tuple ``(avg_3383, avg_885, avg_2934, avg_1107)`` of mean
            distances, or the list ``[-1, -1, -1, -1]`` when at least one id
            produced no sample.  The failure value stays a list because the
            main loop compares against exactly that list.
        """
        # One sample list per known id.
        samples = {"3383": [], "885": [], "2934": [], "1107": []}
        with serial.Serial(port, baudrate, timeout=1) as ser:
            for _ in range(number):
                line = ser.readline()   # read a '\n' terminated line
                # A corrupted byte on the wire must not abort the whole batch:
                # undecodable bytes become U+FFFD, which the number pattern
                # simply skips.
                text = line.decode("utf-8", errors="replace")
                number_found = extractNumbersFromLine(text)
                print(line)
                if len(number_found) >= 4:
                    tag, distance = get_distance(number_found)
                    print(tag, distance)
                    if tag in samples:
                        samples[tag].append(float(distance))
        if not all(samples.values()):
            print ("sth went wrong")
            return [-1, -1, -1, -1]
        return (
            average(samples["3383"]),
            average(samples["885"]),
            average(samples["2934"]),
            average(samples["1107"]),
        )
    
    class base_station(object):
        """Plain record with three attributes: ``lat``, ``lon`` and ``dist``."""

        def __init__(self, lat, lon, dist):
            self.lat, self.lon, self.dist = lat, lon, dist
    
    class point(object):
        """Mutable 2-D point exposing ``x`` and ``y`` attributes."""

        def __init__(self, x, y):
            self.x, self.y = x, y
    
    class circle(object):
        """Circle described by a ``center`` object and a ``radius``."""

        def __init__(self, point, radius):
            self.radius = radius
            self.center = point
    
    def get_two_points_distance(p1, p2):
        """Return the straight-line distance between two objects exposing ``x`` and ``y``."""
        delta_x = p1.x - p2.x
        delta_y = p1.y - p2.y
        return math.sqrt(delta_x ** 2 + delta_y ** 2)
    
    def get_two_circles_intersecting_points(c1, c2):
        """Return the two points where circles ``c1`` and ``c2`` cross.

        Returns None when the circles do not cross in two points: their
        centres are at least ``r1 + r2`` apart, or no further apart than
        ``|r1 - r2|`` (which also covers coincident centres).  Otherwise
        returns a list of two ``point`` objects.
        """
        center_a, center_b = c1.center, c2.center
        radius_a, radius_b = c1.radius, c2.radius

        gap = get_two_points_distance(center_a, center_b)
        # Too far apart (or just touching from outside): nothing to return.
        if gap >= (radius_a + radius_b):
            return None
        # One circle inside the other (or touching from inside).
        if gap <= math.fabs(radius_a - radius_b):
            return None

        dx = center_b.x - center_a.x
        dy = center_b.y - center_a.y

        # Distance from center_a, along the line joining the centres, to the
        # chord through both intersection points.
        along = (radius_a ** 2 - radius_b ** 2 + gap ** 2) / (2 * gap)
        # Half the length of that chord.
        half_chord = math.sqrt(radius_a ** 2 - along ** 2)

        # Point where the chord meets the line joining the centres.
        foot_x = center_a.x + along * dx / gap
        foot_y = center_a.y + along * dy / gap

        # Offsets from that point to the two intersections.
        shift_x = -dy * (half_chord / gap)
        shift_y = -dx * (half_chord / gap)

        first = point(foot_x + shift_x, foot_y - shift_y)
        second = point(foot_x - shift_x, foot_y + shift_y)
        return [first, second]
    
    def get_all_intersecting_points(circles):
        """Collect the intersection points of every unordered pair in ``circles``.

        Pairs are visited in index order; pairs for which
        get_two_circles_intersecting_points() yields nothing contribute no
        points.  Returns a flat list.
        """
        found = []
        for index, first in enumerate(circles):
            for second in circles[index + 1:]:
                pair_points = get_two_circles_intersecting_points(first, second)
                if pair_points:
                    found.extend(pair_points)
        return found
    
    def is_contained_in_circles(point, circles, tolerance=1e-9):
        """Tell whether ``point`` lies inside or on every circle in ``circles``.

        Candidate points are typically intersections of two of the circles,
        so they sit exactly on those boundaries; rounding in the square roots
        can leave such a point a hair outside the radius.  ``tolerance``
        absorbs that error so genuine boundary points are not rejected.

        Args:
            point: Object exposing ``x`` and ``y``.
            circles: Iterable of objects exposing ``center`` (with ``x`` and
                ``y``) and ``radius``.
            tolerance: Absolute slack added to every radius.  Pass 0.0 for the
                strict comparison.

        Returns:
            True when no circle has its centre further than
            ``radius + tolerance`` from ``point`` (vacuously True for an empty
            ``circles``), otherwise False.
        """
        return not any(
            math.hypot(point.x - c.center.x, point.y - c.center.y)
            > c.radius + tolerance
            for c in circles
        )
    
    def get_polygon_center(points):
        """Return a new ``point`` at the mean x and mean y of ``points``.

        Raises ZeroDivisionError when ``points`` is empty.
        """
        count = len(points)
        total_x = 0
        total_y = 0
        for vertex in points:
            total_x += vertex.x
            total_y += vertex.y
        return point(total_x / count, total_y / count)
    
    
    
    def cal_position_2d(pt1,d1,pt2,d2,pt3,d3,pt4,d4):
        """Estimate a 2-D position from four centres and four distances.

        Four circles are drawn around ``pt1``..``pt4`` with radii ``d1``..``d4``
        plus a common offset.  The offset starts at 0 and grows by 0.01 per
        pass until at least three pairwise intersection points are accepted by
        is_contained_in_circles() for all four circles.  The centre of those
        points is returned as a ``point``.

        The number of accepted points is printed after every unsuccessful pass.
        """
        centers = [point(pt[0], pt[1]) for pt in (pt1, pt2, pt3, pt4)]
        radii = (d1, d2, d3, d4)
        offset = 0

        # NOTE(review): there is no upper bound on the number of passes.
        while True:
            circle_list = [circle(c, r + offset) for c, r in zip(centers, radii)]
            offset = offset + 0.01

            candidates = get_all_intersecting_points(circle_list)
            inner_points = [
                p for p in candidates if is_contained_in_circles(p, circle_list)
            ]
            if len(inner_points) >= 3:
                return get_polygon_center(inner_points)
            print(len(inner_points))
    
    
    
    def create_msg(d):
        """Build the JSON position message for one set of averaged distances.

        Args:
            d: Sequence of four distances, in the order 3383, 885, 2934, 1107.

        Returns:
            The JSON text of ``{"sensorPositionEvent": [event]}``, where the
            event carries the sensor id, the computed position, a fixed
            orientation and the time of the update.  The text is also printed.
        """
        # Take a single timezone-aware UTC reading.  The "+00:00" suffix is then
        # truthful, and date and time cannot come from two different instants.
        timestamp = datetime.datetime.now(datetime.timezone.utc).isoformat()

        position = cal_position_2d(
            position_3383, d[0],
            position_885, d[1],
            position_2934, d[2],
            position_1107, d[3],
        )

        point_msg = {
            "x": position.x,
            "y": position.y,
            # NOTE(review): z is sent as a string while x and y are numbers;
            # left as is because the consumer's schema is not visible here.
            "z": "1.0",
        }
        position_msg = {
            "refSystemId": "ROOT",
            "point": point_msg,
            "accuracy": 5.0,
        }
        # Orientation is not measured; these are fixed placeholder values.
        orientation_msg = {
            "x": 1.0,
            "y": 0.5,
            "z": 1.0,
            "w": 1.5,
        }
        json_msg = {
            "sensorId": "UWB_1",
            "position": position_msg,
            "orientation": orientation_msg,
            "lastPosUpdate": timestamp,
        }
        wrapper_msg = {"sensorPositionEvent": [json_msg]}

        msg = json.dumps(wrapper_msg)
        print(msg)
        return msg
    
    # The callback for when the client receives a CONNACK response from the server.
    def on_connect(client, userdata, flags, rc):
        """Print the outcome of the connection attempt.

        An ``rc`` of 0 is reported as success; any other value is printed as
        the result code.
        """
        if rc != 0:
            print("Connect returned result code: " + str(rc))
            return
        print("Connected successfully")
    
    # The callback for when a PUBLISH message is received from the server.
    def on_message(client, userdata, msg):
        """Print the topic and the UTF-8 decoded payload of a received message."""
        topic = msg.topic
        payload_text = msg.payload.decode("utf-8")
        print("Received message: " + topic + " -> " + payload_text)
    
    # create the client
    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_message = on_message

    # TLS and username/password authentication are currently disabled.  If they
    # are switched back on, load the credentials from the environment or a
    # config file rather than writing them into the source.
    #client.tls_set(tls_version=mqtt.ssl.PROTOCOL_TLS)
    #client.username_pw_set(os.environ["MQTT_USER"], os.environ["MQTT_PASSWORD"])
    client.connect(MQTT_SERVER, 1883)
    # Run paho's network loop in a background thread.  Without a running loop
    # incoming packets are never processed (so on_connect / on_message never
    # fire) and keep-alive and reconnect handling do not happen while the main
    # loop is blocked reading the serial port.
    client.loop_start()
    #client.subscribe(MQTT_PATH)
    
    
    # Reference instant for the once-per-second publishing grid.
    starttime = time.time()
    # Read one line from the module-level port; nothing below uses ``line``.
    line = ser.readline()
    while True:
        distances = read_serial(90)
        # read_serial() hands back this sentinel list when an id produced no
        # samples; in that case go straight to the next batch without sleeping.
        if distances != [-1, -1, -1, -1]:
            msg = create_msg(distances)
            client.publish(MQTT_PATH, msg)
            # Sleep until the next whole second counted from starttime.
            time.sleep(1.0 - ((time.time() - starttime) % 1.0))
    #client.loop_forever()
    #publish.single(MQTT_PATH, msg, hostname=MQTT_SERVER, port=8884, auth={'username':"UWB_rasp", 'password':"IPOSframwork123"}, client_id=the_hostname)