import threading
from datetime import datetime
from typing import Optional

from google.protobuf import json_format

import cgv_connector_pb2

# Layout used for the timestamp part of every formatted log entry.
_TIMESTAMP_FORMAT = '%Y-%m-%d %H:%M:%S'

# U+FFFC (OBJECT REPLACEMENT CHARACTER); it is removed from parsed topics
# before they are compared against the filter list.
_OBJECT_REPLACEMENT_CHAR = chr(65532)


class MaxTopicLength:
    """Track the longest MQTT topic length seen so far.

    Whenever a longer topic is observed, the stored maximum grows and
    ``mqtt_log_reformat_event`` is set; ``get_and_clear`` lets a consumer
    poll and reset that flag.
    """

    def __init__(self):
        # Starting column width for the topic field.
        self.max_mqtt_topic_length = 22
        # Set when the maximum grew since the last ``get_and_clear`` call.
        self.mqtt_log_reformat_event = threading.Event()

    def process_topic(self, topic):
        """Record ``topic`` and return the current maximum topic length."""
        if len(topic) > self.max_mqtt_topic_length:
            self.max_mqtt_topic_length = len(topic)
            print(f"new long topic length: {self.max_mqtt_topic_length}")
            self.mqtt_log_reformat_event.set()
        return self.max_mqtt_topic_length

    def get_and_clear(self):
        """Return True if the maximum grew since the last call, resetting the flag."""
        was_set = self.mqtt_log_reformat_event.is_set()
        if was_set:
            self.mqtt_log_reformat_event.clear()
        return was_set


def format_log_msg(topic: str, max_mqtt_topic_length: int, message: str,
                   timestamp: Optional[datetime] = None):
    """Build a log entry of the form ``[<timestamp> @ <padded topic>] <message>``.

    Args:
        topic: MQTT topic; padded on the right to ``max_mqtt_topic_length``.
        max_mqtt_topic_length: Minimum width of the topic field.
        message: Text placed after the closing bracket.
        timestamp: Moment to log; defaults to the current local time. A
            ``datetime`` is rendered as ``YYYY-MM-DD HH:MM:SS``. Any other
            truthy value (e.g. an already formatted string) is inserted
            unchanged.

    Returns:
        The formatted log entry.
    """
    stamp = timestamp or datetime.now()
    # Format the supplied datetime as well as the fallback, so both share
    # the same layout (previously only the fallback was formatted).
    now = stamp.strftime(_TIMESTAMP_FORMAT) if isinstance(stamp, datetime) else stamp
    return f'[{now} @ {topic:{max_mqtt_topic_length}}] {message}'


def parse_log_msg(entry: str):
    """Split an entry produced by ``format_log_msg`` into its parts.

    Args:
        entry: Text of the form ``[<timestamp> @ <topic>] <message>``.

    Returns:
        A ``(timestamp, topic, message)`` tuple of strings; timestamp and
        topic are stripped of surrounding whitespace.

    Raises:
        ValueError: If ``entry`` contains no ``@`` or no ``]`` after it.
    """
    at_index = entry.index('@')
    # Search for the closing bracket only after the '@' so that a stray ']'
    # earlier in the entry cannot produce an inverted slice.
    bracket_index = entry.index(']', at_index)
    timestamp = entry[1:at_index].strip()
    topic = entry[at_index + 1:bracket_index].strip()
    # Skip the "] " separator that precedes the message.
    message = entry[bracket_index + 2:]
    return timestamp, topic, message


def topic_match(topics_to_filter, msg):
    """Return True if the topic of log entry ``msg`` is in ``topics_to_filter``.

    Raises:
        ValueError: If ``msg`` cannot be parsed by ``parse_log_msg``.
    """
    _, topic, _ = parse_log_msg(msg)
    # Drop U+FFFC characters, then strip again: whitespace that sat in front
    # of such a character is not removed by the strip done during parsing.
    topic = topic.replace(_OBJECT_REPLACEMENT_CHAR, '').strip()
    return topic in topics_to_filter


def format_scene(scene: cgv_connector_pb2.Scene):
    """Render the BOX objects of ``scene``, one ``<obj ...>`` line per object.

    Objects of any other type are skipped. Each line is preceded by a
    newline; the result is an empty string if there are no BOX objects.
    Coordinates are printed with two decimals in a six-character column.
    """
    lines = []
    for obj in scene.objects:
        if obj.type != cgv_connector_pb2.Object.Type.BOX:
            continue
        pos = obj.pos
        # '.2f' gives fixed two decimals; a bare '.2' would mean two
        # significant digits and switch to exponent notation for |v| >= 100.
        lines.append(
            f"\n<obj {obj.id:15} at ({pos.x:6.2f} {pos.y:6.2f} {pos.z:6.2f})>"
        )
    return "".join(lines)


def format_command(command: cgv_connector_pb2.MergedSelection):
    """Render ``command`` as ``<cmd by ROBOT of PICK to PLACE>``."""
    return f"<cmd by {command.idRobot} of {command.idPick} to {command.idPlace}>"


def _get_reach_objects(r):
    """Return one ``"<idObject> <mark>"`` string per object in ``r.objects``.

    The mark is a check mark for reachable objects and a cross otherwise.
    """
    return [
        f"{obj_reach.idObject} {'✔' if obj_reach.reachable else '✖'}"
        for obj_reach in r.objects
    ]


def format_reachability(r: cgv_connector_pb2.Reachability):
    """Render ``r`` as ``<reach of ROBOT: [obj mark, ...]>``."""
    return f"<reach of {r.idRobot}: [{', '.join(_get_reach_objects(r))}]>"