Skip to content
GitLab
Explore
Sign in
Register
Primary navigation
Search or go to…
Project
U
UWB
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package registry
Container registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
IPos-public
UWB
Commits
cc2ead24
Commit
cc2ead24
authored
3 years ago
by
Hailong Zhu
Browse files
Options
Downloads
Patches
Plain Diff
added IPOS.py
parent
609abd0f
Branches
Branches containing commit
No related tags found
No related merge requests found
Changes
1
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
IPOS.py
+231
-0
231 additions, 0 deletions
IPOS.py
with
231 additions
and
0 deletions
IPOS.py
0 → 100644
+
231
−
0
View file @
cc2ead24
import
serial
import
math
import
numpy
as
np
import
logging
import
re
import
datetime
import
time
import
json
import
paho.mqtt.client
as
mqtt
# setups
position_3383
=
[
0
,
4.35
]
position_885
=
[
0
,
0
]
position_2934
=
[
3.65
,
0.68
]
position_1107
=
[
3.65
,
5.4
]
the_hostname
=
"
UWB_rasp
"
#MQTT_SERVER = "192.168.0.111"
MQTT_SERVER
=
"
broker.hivemq.com
"
MQTT_PATH
=
"
usertopic/SensorEventWrapper
"
ser
=
serial
.
Serial
(
'
/dev/ttyUSB0
'
,
115200
,
timeout
=
1
)
# open serial port
print
(
ser
.
name
)
# check which port was really used
def
get_distance
(
line
):
number_found
=
re
.
findall
(
r
'
-?\d+\.?\d*
'
,
line
)
return
number_found
[
1
],
number_found
[
3
]
def
average
(
lst
):
return
sum
(
lst
)
/
len
(
lst
)
def
read_serial
(
number
):
d1
=
[]
d2
=
[]
d3
=
[]
d4
=
[]
with
serial
.
Serial
(
'
/dev/ttyUSB0
'
,
115200
,
timeout
=
1
)
as
ser
:
for
x
in
range
(
number
):
line
=
ser
.
readline
()
# read a '\n' terminated line
print
(
line
)
if
(
len
(
line
)
>
30
):
tag
,
distance
=
get_distance
(
line
.
decode
(
"
utf-8
"
))
print
(
tag
,
distance
)
if
(
tag
==
"
3383
"
):
d1
.
append
(
float
(
distance
))
elif
(
tag
==
"
885
"
):
d2
.
append
(
float
(
distance
))
elif
(
tag
==
"
2934
"
):
d3
.
append
(
float
(
distance
))
elif
(
tag
==
"
1107
"
):
d4
.
append
(
float
(
distance
))
if
(
len
(
d1
)
==
0
or
len
(
d2
)
==
0
or
len
(
d3
)
==
0
or
len
(
d4
)
==
0
):
print
(
"
sth went wrong
"
)
return
[
-
1
,
-
1
,
-
1
,
-
1
]
return
average
(
d1
),
average
(
d2
),
average
(
d3
),
average
(
d4
)
class
base_station
(
object
):
def
__init__
(
self
,
lat
,
lon
,
dist
):
self
.
lat
=
lat
self
.
lon
=
lon
self
.
dist
=
dist
class
point
(
object
):
def
__init__
(
self
,
x
,
y
):
self
.
x
=
x
self
.
y
=
y
class
circle
(
object
):
def
__init__
(
self
,
point
,
radius
):
self
.
center
=
point
self
.
radius
=
radius
def
get_two_points_distance
(
p1
,
p2
):
return
math
.
sqrt
(
pow
((
p1
.
x
-
p2
.
x
),
2
)
+
pow
((
p1
.
y
-
p2
.
y
),
2
))
def
get_two_circles_intersecting_points
(
c1
,
c2
):
p1
=
c1
.
center
p2
=
c2
.
center
r1
=
c1
.
radius
r2
=
c2
.
radius
d
=
get_two_points_distance
(
p1
,
p2
)
# if to far away, or self contained - can't be done
if
d
>=
(
r1
+
r2
)
or
d
<=
math
.
fabs
(
r1
-
r2
):
return
None
a
=
(
pow
(
r1
,
2
)
-
pow
(
r2
,
2
)
+
pow
(
d
,
2
))
/
(
2
*
d
)
h
=
math
.
sqrt
(
pow
(
r1
,
2
)
-
pow
(
a
,
2
))
x0
=
p1
.
x
+
a
*
(
p2
.
x
-
p1
.
x
)
/
d
y0
=
p1
.
y
+
a
*
(
p2
.
y
-
p1
.
y
)
/
d
rx
=
-
(
p2
.
y
-
p1
.
y
)
*
(
h
/
d
)
ry
=
-
(
p2
.
x
-
p1
.
x
)
*
(
h
/
d
)
return
[
point
(
x0
+
rx
,
y0
-
ry
),
point
(
x0
-
rx
,
y0
+
ry
)]
def
get_all_intersecting_points
(
circles
):
points
=
[]
num
=
len
(
circles
)
for
i
in
range
(
num
):
j
=
i
+
1
for
k
in
range
(
j
,
num
):
res
=
get_two_circles_intersecting_points
(
circles
[
i
],
circles
[
k
])
if
res
:
points
.
extend
(
res
)
return
points
def
is_contained_in_circles
(
point
,
circles
):
for
i
in
range
(
len
(
circles
)):
if
(
get_two_points_distance
(
point
,
circles
[
i
].
center
)
>
(
circles
[
i
].
radius
)):
return
False
return
True
def
get_polygon_center
(
points
):
#print(points)
center
=
point
(
0
,
0
)
num
=
len
(
points
)
for
i
in
range
(
num
):
center
.
x
+=
points
[
i
].
x
center
.
y
+=
points
[
i
].
y
center
.
x
/=
num
center
.
y
/=
num
#print([center.x, center.y])
return
center
def
cal_position_2d
(
pt1
,
d1
,
pt2
,
d2
,
pt3
,
d3
,
pt4
,
d4
):
#print(d1, d2, d3)
p1
=
point
(
pt1
[
0
],
pt1
[
1
])
p2
=
point
(
pt2
[
0
],
pt2
[
1
])
p3
=
point
(
pt3
[
0
],
pt3
[
1
])
p4
=
point
(
pt4
[
0
],
pt4
[
1
])
offset
=
0
while
True
:
c1
=
circle
(
p1
,
d1
+
offset
)
c2
=
circle
(
p2
,
d2
+
offset
)
c3
=
circle
(
p3
,
d3
+
offset
)
c4
=
circle
(
p4
,
d4
+
offset
)
offset
=
offset
+
0.01
circle_list
=
[
c1
,
c2
,
c3
,
c4
]
inner_points
=
[]
for
p
in
get_all_intersecting_points
(
circle_list
):
if
is_contained_in_circles
(
p
,
circle_list
):
inner_points
.
append
(
p
)
if
(
len
(
inner_points
)
>=
3
):
break
print
(
len
(
inner_points
))
return
get_polygon_center
(
inner_points
)
def
create_msg
(
d
):
#print(d[0])
timestamp
=
str
(
datetime
.
datetime
.
now
().
date
())
timestamp
+=
"
T
"
timestamp
+=
str
(
datetime
.
datetime
.
now
().
time
())
timestamp
+=
"
+00:00
"
#print(timestamp)
point
=
cal_position_2d
(
position_3383
,
d
[
0
],
position_885
,
d
[
1
],
position_2934
,
d
[
2
],
position_1107
,
d
[
3
])
point_msg
=
{
"
x
"
:
point
.
x
,
"
y
"
:
point
.
y
,
"
z
"
:
"
1.0
"
}
position_msg
=
{
"
refSystemId
"
:
"
ROOT
"
,
"
point
"
:
point_msg
,
"
accuracy
"
:
5.0
}
orientation_msg
=
{
"
x
"
:
1.0
,
"
y
"
:
0.5
,
"
z
"
:
1.0
,
"
w
"
:
1.5
}
json_msg
=
{
"
sensorId
"
:
"
UWB_1
"
,
"
position
"
:
position_msg
,
"
orientation
"
:
orientation_msg
,
"
lastPosUpdate
"
:
timestamp
}
wrapper_msg
=
{
"
sensorPositionEvent
"
:
[
json_msg
]
}
msg
=
json
.
dumps
(
wrapper_msg
)
print
(
msg
)
return
msg
# The callback for when the client receives a CONNACK response from the server.
def
on_connect
(
client
,
userdata
,
flags
,
rc
):
if
rc
==
0
:
print
(
"
Connected successfully
"
)
else
:
print
(
"
Connect returned result code:
"
+
str
(
rc
))
# The callback for when a PUBLISH message is received from the server.
def
on_message
(
client
,
userdata
,
msg
):
print
(
"
Received message:
"
+
msg
.
topic
+
"
->
"
+
msg
.
payload
.
decode
(
"
utf-8
"
))
# create the client
client
=
mqtt
.
Client
()
client
.
on_connect
=
on_connect
client
.
on_message
=
on_message
# enable TLS
#client.tls_set(tls_version=mqtt.ssl.PROTOCOL_TLS)
#client.username_pw_set("phone", "IPOSframework123")
client
.
connect
(
MQTT_SERVER
,
1883
)
#client.loop_start()
#client.subscribe(MQTT_PATH)
starttime
=
time
.
time
()
line
=
ser
.
readline
()
while
True
:
distances
=
read_serial
(
90
)
if
(
distances
==
[
-
1
,
-
1
,
-
1
,
-
1
]):
continue
msg
=
create_msg
(
distances
)
client
.
publish
(
MQTT_PATH
,
msg
)
time
.
sleep
(
1.0
-
((
time
.
time
()
-
starttime
)
%
1.0
))
#client.loop_forever()
#publish.single(MQTT_PATH, msg, hostname=MQTT_SERVER, port=8884, auth={'username':"UWB_rasp", 'password':"IPOSframwork123"}, client_id=the_hostname)
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment