diff --git a/src/main/java/ipos/project/DataModellntegration/SimpleSceneIntegration.java b/src/main/java/ipos/project/DataModellntegration/SimpleSceneIntegration.java index fcdb1a190df32842c8482f1542a89de43435c09b..9d3e0cbd8b310fcf3da302402f0c8ccfaf857750 100644 --- a/src/main/java/ipos/project/DataModellntegration/SimpleSceneIntegration.java +++ b/src/main/java/ipos/project/DataModellntegration/SimpleSceneIntegration.java @@ -1,7 +1,8 @@ package ipos.project.DataModellntegration; import ipos.models.SimpleScene.IposPosition; -import ipos.project.DataModellntegration.service.impl.ExtMqttServiceImpl; +import ipos.project.DataModellntegration.service.impl.ExternalPubServiceImpl; +import ipos.project.mapper.ProtoJsonMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -13,11 +14,15 @@ public class SimpleSceneIntegration { private final Logger LOG = LoggerFactory.getLogger(getClass()); @Autowired - private ExtMqttServiceImpl mqttService; + private ExternalPubServiceImpl mqttService; - @JmsListener(destination = "/positions", containerFactory = "myFactory") + @JmsListener(destination = "/positions", containerFactory = "jmsListenFactory") public void receiveMessage(IposPosition pos) { - LOG.info("Data integration <" + pos + ">"); - this.mqttService.publish("test51/subscribe/positions", pos.toString(), 0, false); + LOG.trace("Data integration get: " + pos); + // we can translate a class into a string using several methods: 1) `.toString()` 2) `JsonFormat` in `ProtoJsonMap` + String jsonPos = ProtoJsonMap.toJson(pos); + if (jsonPos != null) { + this.mqttService.publish("test51/subscribe/positions", jsonPos, 0, false); + } } } diff --git a/src/main/java/ipos/project/DataModellntegration/service/ExternalCommunicationService.java b/src/main/java/ipos/project/DataModellntegration/service/ExternalPubService.java similarity index 58% rename from src/main/java/ipos/project/DataModellntegration/service/ExternalCommunicationService.java rename to src/main/java/ipos/project/DataModellntegration/service/ExternalPubService.java index 9762315298e48db277e88f1dee0ccbf6e14dbfd1..dd88adcb324c74935d6a80a003717ee8ea7355de 100644 --- a/src/main/java/ipos/project/DataModellntegration/service/ExternalCommunicationService.java +++ b/src/main/java/ipos/project/DataModellntegration/service/ExternalPubService.java @@ -1,6 +1,5 @@ package ipos.project.DataModellntegration.service; -public interface ExternalCommunicationService { +public interface ExternalPubService { void publish(final String topic, final String msg , int qos, boolean retained); - void subscribe(final String topic, int qos); } diff --git a/src/main/java/ipos/project/DataModellntegration/service/impl/ExtMqttServiceImpl.java b/src/main/java/ipos/project/DataModellntegration/service/impl/ExtMqttServiceImpl.java deleted file mode 100644 index 1989f3d124e6395006743bde4d82a76f1355f2fd..0000000000000000000000000000000000000000 --- a/src/main/java/ipos/project/DataModellntegration/service/impl/ExtMqttServiceImpl.java +++ /dev/null @@ -1,49 +0,0 @@ -package ipos.project.DataModellntegration.service.impl; - -import com.fasterxml.jackson.databind.ObjectMapper; -import ipos.project.DataModellntegration.service.ExternalCommunicationService; -import org.eclipse.paho.client.mqttv3.MqttAsyncClient; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.eclipse.paho.client.mqttv3.MqttMessage; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Service; - -@Service -public class ExtMqttServiceImpl implements ExternalCommunicationService { - - private final Logger LOG = LoggerFactory.getLogger(getClass()); - - private final MqttAsyncClient mqttClient; - - @Autowired - public ExtMqttServiceImpl(MqttAsyncClient mqttClient) { - this.mqttClient = mqttClient; - } - - // ====== Publish - // --- Client Json objects - public void publish(final String topic, final String msg , int qos, boolean retained) { - MqttMessage mqttMessage = new MqttMessage(); - mqttMessage.setPayload(msg.getBytes()); - mqttMessage.setQos(qos); - mqttMessage.setRetained(retained); - - try { - mqttClient.publish(topic, mqttMessage); - } catch (MqttException e) { - e.printStackTrace(); - } - } - - // ====== Subscribe - public void subscribe(final String topic, int qos) { - LOG.info(">> Subscribe on " + topic); - try { - mqttClient.subscribe(topic, qos); - } catch (MqttException e) { - e.printStackTrace(); - } - } -} diff --git a/src/main/java/ipos/project/DataModellntegration/service/impl/ExternalPubServiceImpl.java b/src/main/java/ipos/project/DataModellntegration/service/impl/ExternalPubServiceImpl.java new file mode 100644 index 0000000000000000000000000000000000000000..52858352cf218b32556f5bdc90a933e15bcc5016 --- /dev/null +++ b/src/main/java/ipos/project/DataModellntegration/service/impl/ExternalPubServiceImpl.java @@ -0,0 +1,34 @@ +package ipos.project.DataModellntegration.service.impl; + +import ipos.project.DataModellntegration.service.ExternalPubService; +import org.eclipse.paho.client.mqttv3.MqttAsyncClient; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +@Service +public class ExternalPubServiceImpl implements ExternalPubService { + + private final MqttAsyncClient mqttClient; + + @Autowired + public ExternalPubServiceImpl(MqttAsyncClient mqttClient) { + this.mqttClient = mqttClient; + } + + public void publish(final String topic, final String msg , int qos, boolean retained) { + MqttMessage mqttMessage = new MqttMessage(); + mqttMessage.setPayload(msg.getBytes()); + mqttMessage.setQos(qos); + mqttMessage.setRetained(retained); + + try { + mqttClient.publish(topic, mqttMessage); + } catch (MqttException e) { + e.printStackTrace(); + } + } +} diff --git a/src/main/java/ipos/project/InitSubscribe.java b/src/main/java/ipos/project/InitSubscribe.java deleted file mode 100644 index a86753fbe9de232ce5dc80da87192dd93b2fe6be..0000000000000000000000000000000000000000 --- a/src/main/java/ipos/project/InitSubscribe.java +++ /dev/null @@ -1,38 +0,0 @@ -package ipos.project; - -import ipos.models.SimpleScene.IposPosition; -import ipos.project.DataModellntegration.service.impl.ExtMqttServiceImpl; -import org.eclipse.paho.client.mqttv3.MqttException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.event.ContextRefreshedEvent; -import org.springframework.context.event.EventListener; -import org.springframework.stereotype.Component; - -@Component -public class InitSubscribe { - private final Logger LOG = LoggerFactory.getLogger(getClass()); - - @Autowired - private ExtMqttServiceImpl messagingService; - - // intent: example of dynamic subscription - @EventListener - void initiateSendingMessage(ContextRefreshedEvent event) throws InterruptedException, MqttException { - Thread.sleep(6000); -// this.messagingService.subscribe("test51/subscribe/positions", 0); -// this.messagingService.subscribe("test51/publish/positions", 0); - -// IposPosition pos = IposPosition.newBuilder() -// .setX(1) -// .setY(2) -// .setZ(3) -// .setAccuracy((float) 0.9) -// .build(); -// -// LOG.info(pos.toString()); - } - -} diff --git a/src/main/java/ipos/project/SensorValueIntegration/api/MqttPositionHandler.java b/src/main/java/ipos/project/SensorValueIntegration/api/MqttPositionHandler.java index 72f6325283e613d2dec5eaead306466ea975c2c5..900e8bdee1946ee97cbe6fa23c806580840400ae 100644 --- a/src/main/java/ipos/project/SensorValueIntegration/api/MqttPositionHandler.java +++ b/src/main/java/ipos/project/SensorValueIntegration/api/MqttPositionHandler.java @@ -32,7 +32,7 @@ public class MqttPositionHandler implements Handler { assert pos != null; LOG.info("IposPosition received: \n" + pos); - this.jmsTemplate.convertAndSend("/positions", pos.toByteArray()); // submit position to the internal broker + this.jmsTemplate.convertAndSend("/positions", pos); // submit position to the internal broker //TODO: fix message converter for the internal broker } catch (InvalidProtocolBufferException e) { LOG.error("Invalid mqtt message:", e); diff --git a/src/main/java/ipos/project/UseCaseController/PositionMonitoring.java b/src/main/java/ipos/project/UseCaseController/PositionMonitoring.java index 1d79c4249eed070b7c0660bbb2b11e29047bcab3..88a84405b0f26bf65943970eb08515bef9654339 100644 --- a/src/main/java/ipos/project/UseCaseController/PositionMonitoring.java +++ b/src/main/java/ipos/project/UseCaseController/PositionMonitoring.java @@ -11,7 +11,7 @@ import ipos.models.SimpleScene.IposPosition; public class PositionMonitoring { private final Logger LOG = LoggerFactory.getLogger(getClass()); - @JmsListener(destination = "/positions123", containerFactory = "myFactory") + @JmsListener(destination = "/positions123", containerFactory = "jmsListenFactory") public void receiveMessage(IposPosition pos) { LOG.info("Received <" + pos + ">"); } diff --git a/src/main/java/ipos/project/config/JmsConfig.java b/src/main/java/ipos/project/config/JmsConfig.java index e5bda5948891099556bb85f5b25a8fcc7f850f38..c2f5fe00c4d772fe2c5a23936eaf818b56ac76e6 100644 --- a/src/main/java/ipos/project/config/JmsConfig.java +++ b/src/main/java/ipos/project/config/JmsConfig.java @@ -6,9 +6,6 @@ import org.springframework.context.annotation.Configuration; import org.springframework.jms.annotation.EnableJms; import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.config.JmsListenerContainerFactory; -import org.springframework.jms.support.converter.MappingJackson2MessageConverter; -import org.springframework.jms.support.converter.MessageConverter; -import org.springframework.jms.support.converter.MessageType; import javax.jms.ConnectionFactory; @@ -17,23 +14,13 @@ import javax.jms.ConnectionFactory; public class JmsConfig { @Bean - public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory, - DefaultJmsListenerContainerFactoryConfigurer configurer) { + public JmsListenerContainerFactory<?> jmsListenFactory(ConnectionFactory connectionFactory, + DefaultJmsListenerContainerFactoryConfigurer configurer) { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); // This provides all boot's default to this factory, including the message converter configurer.configure(factory, connectionFactory); - // You could still override some of Boot's default if necessary. return factory; } - @Bean // Serialize message content to bytes - public MessageConverter jacksonJmsMessageConverter() { -// MarshallingMessageConverter converter = new MarshallingMessageConverter(); -// converter.setTargetType(MessageType.BYTES); -// return converter; - MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter(); - converter.setTargetType(MessageType.TEXT); - converter.setTypeIdPropertyName("_type"); - return converter; - } + // + write custom `MessageConverter` } diff --git a/src/main/java/ipos/project/config/MqttConfig.java b/src/main/java/ipos/project/config/MqttConfig.java index ee61cdf2170c3835866cc6d0c4973268c0f5e73c..d83caac52bce31f696d7bad563e9563347307881 100644 --- a/src/main/java/ipos/project/config/MqttConfig.java +++ b/src/main/java/ipos/project/config/MqttConfig.java @@ -46,7 +46,7 @@ public class MqttConfig { "\n\tTopic: " + topic + "\n\tMessage: " + new String(message.getPayload()) + "\n\tQoS: " + message.getQos() + "\n"); - messageHandler.getMessageHandler(topic).handle(message); + messageHandler.getMessageHandler(topic).handle(message); // classes with @MqttListener annotations } public void connectionLost(Throwable cause) { diff --git a/src/main/java/ipos/project/config/mqtt/MqttHandlerMap.java b/src/main/java/ipos/project/config/mqtt/MqttHandlerMap.java index 497e36427065f90a34f3f7a59112f3821ec0a98e..2300d94ef9380ba61b454b928ae363216d62c762 100644 --- a/src/main/java/ipos/project/config/mqtt/MqttHandlerMap.java +++ b/src/main/java/ipos/project/config/mqtt/MqttHandlerMap.java @@ -29,7 +29,7 @@ public class MqttHandlerMap { private Map<String, Handler> findMessageHandlers() { /** - * Find beans with @MqttHandler annotation + * Find beans with @MqttHandler annotation. One topic = One handler (TODO: one-to-many) * @return Map(key=Topic name, value=Handler container) */ diff --git a/src/main/java/ipos/project/mapper/ProtoJsonMap.java b/src/main/java/ipos/project/mapper/ProtoJsonMap.java index 286e8318468887aa3ba2a46d604abc29fb939758..00c924f85c6b0b7626e16a32f8dabd0e9e69cc7b 100644 --- a/src/main/java/ipos/project/mapper/ProtoJsonMap.java +++ b/src/main/java/ipos/project/mapper/ProtoJsonMap.java @@ -17,8 +17,13 @@ public class ProtoJsonMap { private static final Logger LOG = LoggerFactory.getLogger(ProtoJsonMap.class); - public static String toJson(MessageOrBuilder messageOrBuilder) throws IOException { - return JsonFormat.printer().print(messageOrBuilder); + public static String toJson(MessageOrBuilder messageOrBuilder) { + try { + return JsonFormat.printer().print(messageOrBuilder); + } catch (InvalidProtocolBufferException e) { + LOG.error("JSON converting error: ", e); + return null; + } } @SuppressWarnings({"unchecked", "rawtypes"}) diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 7c64ef7bfff87c16ac0974ba7ae2ce7490f1379e..c41bd9578be586635dc79a6fba72d436dda7d683 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -6,10 +6,15 @@ server: mqtt: -# automaticReconnect: true -# cleanSession: true -# setKeepAliveInterval: 10 -# connectionTimeout: 20 - clientId: ipos.project.12312312324324 + automaticReconnect: true + cleanSession: true + setKeepAliveInterval: 10 + connectionTimeout: 20 + clientId: ipos.project hostname: broker.hivemq.com - port: 1883 \ No newline at end of file + port: 1883 + +spring: + activemq: + packages: + trusted: ipos.models, com.google.protobuf \ No newline at end of file