Select Git revision
IPOS.py 6.32 KiB
import serial
import math
import numpy as np
import logging
import re
import datetime
import time
import json
import paho.mqtt.client as mqtt
# setups
position_3383 = [0,4.35]
position_885 = [0,0]
position_2934 = [3.65,0.68]
position_1107 = [3.65,5.4]
the_hostname = "UWB_rasp"
#MQTT_SERVER = "192.168.0.111"
MQTT_SERVER = "broker.hivemq.com"
MQTT_PATH = "usertopic/SensorEventWrapper"
ser = serial.Serial('/dev/ttyUSB0', 115200, timeout=1) # open serial port
print(ser.name) # check which port was really used
def extractNumbersFromLine(line):
return re.findall(r'-?\d+\.?\d*',line)
def get_distance(number_found):
return number_found[1], number_found[3]
def average(lst):
return sum(lst) / len(lst)
def read_serial(number):
d1 = []
d2 = []
d3 = []
d4 = []
with serial.Serial('/dev/ttyUSB0', 115200, timeout=1) as ser:
for x in range(number):
line = ser.readline() # read a '\n' terminated line
number_found = extractNumbersFromLine(line.decode("utf-8"));
print(line)
if(len(number_found)>=4):
tag, distance = get_distance(number_found);
print(tag, distance)
if (tag == "3383"):
d1.append(float(distance))
elif (tag == "885"):
d2.append(float(distance))
elif (tag == "2934"):
d3.append(float(distance))
elif (tag == "1107"):
d4.append(float(distance))
if (len(d1) == 0 or len(d2) == 0 or len(d3) == 0 or len(d4) == 0):
print ("sth went wrong")
return [-1, -1, -1, -1]
return average(d1), average(d2), average(d3), average(d4)
class base_station(object):
def __init__(self, lat, lon, dist):
self.lat = lat
self.lon = lon
self.dist = dist
class point(object):
def __init__(self, x, y):
self.x = x
self.y = y
class circle(object):
def __init__(self, point, radius):
self.center = point
self.radius = radius
def get_two_points_distance(p1, p2):
return math.sqrt(pow((p1.x - p2.x), 2) + pow((p1.y - p2.y), 2))
def get_two_circles_intersecting_points(c1, c2):
p1 = c1.center
p2 = c2.center
r1 = c1.radius
r2 = c2.radius
d = get_two_points_distance(p1, p2)
# if to far away, or self contained - can't be done
if d >= (r1 + r2) or d <= math.fabs(r1 -r2):
return None
a = (pow(r1, 2) - pow(r2, 2) + pow(d, 2)) / (2*d)
h = math.sqrt(pow(r1, 2) - pow(a, 2))
x0 = p1.x + a*(p2.x - p1.x)/d
y0 = p1.y + a*(p2.y - p1.y)/d
rx = -(p2.y - p1.y) * (h/d)
ry = -(p2.x - p1.x) * (h / d)
return [point(x0+rx, y0-ry), point(x0-rx, y0+ry)]
def get_all_intersecting_points(circles):
points = []
num = len(circles)
for i in range(num):
j = i + 1
for k in range(j, num):
res = get_two_circles_intersecting_points(circles[i], circles[k])
if res:
points.extend(res)
return points
def is_contained_in_circles(point, circles):
for i in range(len(circles)):
if (get_two_points_distance(point, circles[i].center) > (circles[i].radius)):
return False
return True
def get_polygon_center(points):
#print(points)
center = point(0, 0)
num = len(points)
for i in range(num):
center.x += points[i].x
center.y += points[i].y
center.x /= num
center.y /= num
#print([center.x, center.y])
return center
def cal_position_2d(pt1,d1,pt2,d2,pt3,d3,pt4,d4):
#print(d1, d2, d3)
p1 = point(pt1[0], pt1[1])
p2 = point(pt2[0], pt2[1])
p3 = point(pt3[0], pt3[1])
p4 = point(pt4[0], pt4[1])
offset=0
while True:
c1 = circle(p1, d1+offset)
c2 = circle(p2, d2+offset)
c3 = circle(p3, d3+offset)
c4 = circle(p4, d4+offset)
offset = offset + 0.01
circle_list = [c1, c2, c3, c4]
inner_points = []
for p in get_all_intersecting_points(circle_list):
if is_contained_in_circles(p, circle_list):
inner_points.append(p)
if (len(inner_points)>=3):
break
print(len(inner_points))
return get_polygon_center(inner_points)
def create_msg(d):
#print(d[0])
timestamp = str(datetime.datetime.now().date())
timestamp += "T"
timestamp += str(datetime.datetime.now().time())
timestamp += "+00:00"
#print(timestamp)
point = cal_position_2d(position_3383,d[0],position_885,d[1],position_2934,d[2],position_1107,d[3])
point_msg = {
"x" : point.x,
"y" : point.y,
"z" : "1.0"
}
position_msg = {
"refSystemId" : "ROOT",
"point" : point_msg,
"accuracy" : 5.0
}
orientation_msg = {
"x" : 1.0,
"y" : 0.5,
"z" : 1.0,
"w" :
1.5
}
json_msg = {
"sensorId" : "UWB_1",
"position" : position_msg,
"orientation" : orientation_msg,
"lastPosUpdate" : timestamp
}
wrapper_msg = {"sensorPositionEvent" : [json_msg]
}
msg = json.dumps(wrapper_msg)
print(msg)
return msg
# The callback for when the client receives a CONNACK response from the server.
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected successfully")
else:
print("Connect returned result code: " + str(rc))
# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
print("Received message: " + msg.topic + " -> " + msg.payload.decode("utf-8"))
# create the client
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
# enable TLS
#client.tls_set(tls_version=mqtt.ssl.PROTOCOL_TLS)
#client.username_pw_set("phone", "IPOSframework123")
client.connect(MQTT_SERVER, 1883)
#client.loop_start()
#client.subscribe(MQTT_PATH)
starttime = time.time()
line = ser.readline()
while True:
distances = read_serial(90)
if (distances == [-1, -1, -1, -1]):
continue
msg=create_msg(distances)
client.publish(MQTT_PATH, msg)
time.sleep(1.0-((time.time() - starttime)%1.0))
#client.loop_forever()
#publish.single(MQTT_PATH, msg, hostname=MQTT_SERVER, port=8884, auth={'username':"UWB_rasp", 'password':"IPOSframwork123"}, client_id=the_hostname)