Skip to content
Snippets Groups Projects
Select Git revision
  • ec11e612a0157ba85d50cc80c14a5a3d66a801de
  • master default protected
  • Orderpicker
  • feature/SRSconvertion
  • fix/monitor-request
  • fetures/filter
6 results

MqttPositionHandler.java

Blame
  • MqttPositionHandler.java 1.49 KiB
    package ipos.project.SensorValueIntegration.api;
    
    import com.google.protobuf.InvalidProtocolBufferException;
    import ipos.models.SimpleScene.IposPosition;
    import ipos.project.config.mqtt.Handler;
    import ipos.project.config.mqtt.MqttListener;
    import ipos.project.mapper.ProtoJsonMap;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jms.core.JmsTemplate;
    
    
    // subscribe to the topic
    @MqttListener("test51/publish/positions")
    public class MqttPositionHandler implements Handler {
    
        private final Logger LOG = LoggerFactory.getLogger(getClass());
        JmsTemplate jmsTemplate;
    
        @Autowired
        public MqttPositionHandler(JmsTemplate jmsTemplate) {
            this.jmsTemplate = jmsTemplate;
        }
    
        // method that handle new message from the topic
        public void handle(MqttMessage message) {
            try {
                IposPosition pos = ProtoJsonMap.fromJson(message.toString(), IposPosition.class);
                //IposPosition pos = IposPosition.parseFrom(message.toString().getBytes());
                assert pos != null;
                LOG.info("IposPosition received: \n" + pos);
    
                this.jmsTemplate.convertAndSend("/positions", pos); // submit position to the internal broker
                //TODO: fix message converter for the internal broker
            } catch (InvalidProtocolBufferException e) {
                LOG.error("Invalid mqtt message:", e);
            }
        }
    }