Select Git revision
MqttPositionHandler.java

Oleksandr Husak authored
MqttPositionHandler.java 1.49 KiB
package ipos.project.SensorValueIntegration.api;
import com.google.protobuf.InvalidProtocolBufferException;
import ipos.models.SimpleScene.IposPosition;
import ipos.project.config.mqtt.Handler;
import ipos.project.config.mqtt.MqttListener;
import ipos.project.mapper.ProtoJsonMap;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
// subscribe to the topic
@MqttListener("test51/publish/positions")
public class MqttPositionHandler implements Handler {
private final Logger LOG = LoggerFactory.getLogger(getClass());
JmsTemplate jmsTemplate;
@Autowired
public MqttPositionHandler(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
// method that handle new message from the topic
public void handle(MqttMessage message) {
try {
IposPosition pos = ProtoJsonMap.fromJson(message.toString(), IposPosition.class);
//IposPosition pos = IposPosition.parseFrom(message.toString().getBytes());
assert pos != null;
LOG.info("IposPosition received: \n" + pos);
this.jmsTemplate.convertAndSend("/positions", pos); // submit position to the internal broker
//TODO: fix message converter for the internal broker
} catch (InvalidProtocolBufferException e) {
LOG.error("Invalid mqtt message:", e);
}
}
}