Skip to content
Snippets Groups Projects
Commit ec11e612 authored by Oleksandr Husak's avatar Oleksandr Husak
Browse files

message has been sent to the system and a reply has been received

parent 9475e1fa
Branches
No related tags found
No related merge requests found
Showing
with 70 additions and 73 deletions
package ipos.project.DataModellntegration; package ipos.project.DataModellntegration;
import ipos.models.SimpleScene.IposPosition; import ipos.models.SimpleScene.IposPosition;
import ipos.project.DataModellntegration.service.impl.ExtMqttServiceImpl; import ipos.project.DataModellntegration.service.impl.ExternalPubServiceImpl;
import ipos.project.mapper.ProtoJsonMap;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Autowired;
...@@ -13,11 +14,15 @@ public class SimpleSceneIntegration { ...@@ -13,11 +14,15 @@ public class SimpleSceneIntegration {
private final Logger LOG = LoggerFactory.getLogger(getClass()); private final Logger LOG = LoggerFactory.getLogger(getClass());
@Autowired @Autowired
private ExtMqttServiceImpl mqttService; private ExternalPubServiceImpl mqttService;
@JmsListener(destination = "/positions", containerFactory = "myFactory") @JmsListener(destination = "/positions", containerFactory = "jmsListenFactory")
public void receiveMessage(IposPosition pos) { public void receiveMessage(IposPosition pos) {
LOG.info("Data integration <" + pos + ">"); LOG.trace("Data integration get: " + pos);
this.mqttService.publish("test51/subscribe/positions", pos.toString(), 0, false); // we can translate a class into a string using several methods: 1) `.toString()` 2) `JsonFormat` in `ProtoJsonMap`
String jsonPos = ProtoJsonMap.toJson(pos);
if (jsonPos != null) {
this.mqttService.publish("test51/subscribe/positions", jsonPos, 0, false);
}
} }
} }
package ipos.project.DataModellntegration.service; package ipos.project.DataModellntegration.service;
public interface ExternalCommunicationService { public interface ExternalPubService {
void publish(final String topic, final String msg , int qos, boolean retained); void publish(final String topic, final String msg , int qos, boolean retained);
void subscribe(final String topic, int qos);
} }
package ipos.project.DataModellntegration.service.impl; package ipos.project.DataModellntegration.service.impl;
import com.fasterxml.jackson.databind.ObjectMapper; import ipos.project.DataModellntegration.service.ExternalPubService;
import ipos.project.DataModellntegration.service.ExternalCommunicationService;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient; import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.MqttMessage;
...@@ -11,19 +10,15 @@ import org.springframework.beans.factory.annotation.Autowired; ...@@ -11,19 +10,15 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@Service @Service
public class ExtMqttServiceImpl implements ExternalCommunicationService { public class ExternalPubServiceImpl implements ExternalPubService {
private final Logger LOG = LoggerFactory.getLogger(getClass());
private final MqttAsyncClient mqttClient; private final MqttAsyncClient mqttClient;
@Autowired @Autowired
public ExtMqttServiceImpl(MqttAsyncClient mqttClient) { public ExternalPubServiceImpl(MqttAsyncClient mqttClient) {
this.mqttClient = mqttClient; this.mqttClient = mqttClient;
} }
// ====== Publish
// --- Client Json objects
public void publish(final String topic, final String msg , int qos, boolean retained) { public void publish(final String topic, final String msg , int qos, boolean retained) {
MqttMessage mqttMessage = new MqttMessage(); MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(msg.getBytes()); mqttMessage.setPayload(msg.getBytes());
...@@ -36,14 +31,4 @@ public class ExtMqttServiceImpl implements ExternalCommunicationService { ...@@ -36,14 +31,4 @@ public class ExtMqttServiceImpl implements ExternalCommunicationService {
e.printStackTrace(); e.printStackTrace();
} }
} }
// ====== Subscribe
public void subscribe(final String topic, int qos) {
LOG.info(">> Subscribe on " + topic);
try {
mqttClient.subscribe(topic, qos);
} catch (MqttException e) {
e.printStackTrace();
}
}
} }
package ipos.project;
import ipos.models.SimpleScene.IposPosition;
import ipos.project.DataModellntegration.service.impl.ExtMqttServiceImpl;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
@Component
public class InitSubscribe {
private final Logger LOG = LoggerFactory.getLogger(getClass());
@Autowired
private ExtMqttServiceImpl messagingService;
// intent: example of dynamic subscription
@EventListener
void initiateSendingMessage(ContextRefreshedEvent event) throws InterruptedException, MqttException {
Thread.sleep(6000);
// this.messagingService.subscribe("test51/subscribe/positions", 0);
// this.messagingService.subscribe("test51/publish/positions", 0);
// IposPosition pos = IposPosition.newBuilder()
// .setX(1)
// .setY(2)
// .setZ(3)
// .setAccuracy((float) 0.9)
// .build();
//
// LOG.info(pos.toString());
}
}
...@@ -32,7 +32,7 @@ public class MqttPositionHandler implements Handler { ...@@ -32,7 +32,7 @@ public class MqttPositionHandler implements Handler {
assert pos != null; assert pos != null;
LOG.info("IposPosition received: \n" + pos); LOG.info("IposPosition received: \n" + pos);
this.jmsTemplate.convertAndSend("/positions", pos.toByteArray()); // submit position to the internal broker this.jmsTemplate.convertAndSend("/positions", pos); // submit position to the internal broker
//TODO: fix message converter for the internal broker //TODO: fix message converter for the internal broker
} catch (InvalidProtocolBufferException e) { } catch (InvalidProtocolBufferException e) {
LOG.error("Invalid mqtt message:", e); LOG.error("Invalid mqtt message:", e);
......
...@@ -11,7 +11,7 @@ import ipos.models.SimpleScene.IposPosition; ...@@ -11,7 +11,7 @@ import ipos.models.SimpleScene.IposPosition;
public class PositionMonitoring { public class PositionMonitoring {
private final Logger LOG = LoggerFactory.getLogger(getClass()); private final Logger LOG = LoggerFactory.getLogger(getClass());
@JmsListener(destination = "/positions123", containerFactory = "myFactory") @JmsListener(destination = "/positions123", containerFactory = "jmsListenFactory")
public void receiveMessage(IposPosition pos) { public void receiveMessage(IposPosition pos) {
LOG.info("Received <" + pos + ">"); LOG.info("Received <" + pos + ">");
} }
......
...@@ -6,9 +6,6 @@ import org.springframework.context.annotation.Configuration; ...@@ -6,9 +6,6 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms; import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory; import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory; import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;
import javax.jms.ConnectionFactory; import javax.jms.ConnectionFactory;
...@@ -17,23 +14,13 @@ import javax.jms.ConnectionFactory; ...@@ -17,23 +14,13 @@ import javax.jms.ConnectionFactory;
public class JmsConfig { public class JmsConfig {
@Bean @Bean
public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory, public JmsListenerContainerFactory<?> jmsListenFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) { DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
// This provides all boot's default to this factory, including the message converter // This provides all boot's default to this factory, including the message converter
configurer.configure(factory, connectionFactory); configurer.configure(factory, connectionFactory);
// You could still override some of Boot's default if necessary.
return factory; return factory;
} }
@Bean // Serialize message content to bytes // + write custom `MessageConverter`
public MessageConverter jacksonJmsMessageConverter() {
// MarshallingMessageConverter converter = new MarshallingMessageConverter();
// converter.setTargetType(MessageType.BYTES);
// return converter;
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}
} }
...@@ -46,7 +46,7 @@ public class MqttConfig { ...@@ -46,7 +46,7 @@ public class MqttConfig {
"\n\tTopic: " + topic + "\n\tTopic: " + topic +
"\n\tMessage: " + new String(message.getPayload()) + "\n\tMessage: " + new String(message.getPayload()) +
"\n\tQoS: " + message.getQos() + "\n"); "\n\tQoS: " + message.getQos() + "\n");
messageHandler.getMessageHandler(topic).handle(message); messageHandler.getMessageHandler(topic).handle(message); // classes with @MqttListener annotations
} }
public void connectionLost(Throwable cause) { public void connectionLost(Throwable cause) {
......
...@@ -29,7 +29,7 @@ public class MqttHandlerMap { ...@@ -29,7 +29,7 @@ public class MqttHandlerMap {
private Map<String, Handler> findMessageHandlers() { private Map<String, Handler> findMessageHandlers() {
/** /**
* Find beans with @MqttHandler annotation * Find beans with @MqttHandler annotation. One topic = One handler (TODO: one-to-many)
* @return Map(key=Topic name, value=Handler container) * @return Map(key=Topic name, value=Handler container)
*/ */
......
...@@ -17,8 +17,13 @@ public class ProtoJsonMap { ...@@ -17,8 +17,13 @@ public class ProtoJsonMap {
private static final Logger LOG = LoggerFactory.getLogger(ProtoJsonMap.class); private static final Logger LOG = LoggerFactory.getLogger(ProtoJsonMap.class);
public static String toJson(MessageOrBuilder messageOrBuilder) throws IOException { public static String toJson(MessageOrBuilder messageOrBuilder) {
try {
return JsonFormat.printer().print(messageOrBuilder); return JsonFormat.printer().print(messageOrBuilder);
} catch (InvalidProtocolBufferException e) {
LOG.error("JSON converting error: ", e);
return null;
}
} }
@SuppressWarnings({"unchecked", "rawtypes"}) @SuppressWarnings({"unchecked", "rawtypes"})
......
...@@ -6,10 +6,15 @@ server: ...@@ -6,10 +6,15 @@ server:
mqtt: mqtt:
# automaticReconnect: true automaticReconnect: true
# cleanSession: true cleanSession: true
# setKeepAliveInterval: 10 setKeepAliveInterval: 10
# connectionTimeout: 20 connectionTimeout: 20
clientId: ipos.project.12312312324324 clientId: ipos.project
hostname: broker.hivemq.com hostname: broker.hivemq.com
port: 1883 port: 1883
spring:
activemq:
packages:
trusted: ipos.models, com.google.protobuf
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment