diff --git a/case-study-sorting-pkg/build.gradle b/case-study-sorting-pkg/build.gradle index 98433bf1d5d57e224ac4225bb7ef43c777727315..2dd873c868da72b2994003bc408be03380bd0b42 100644 --- a/case-study-sorting-pkg/build.gradle +++ b/case-study-sorting-pkg/build.gradle @@ -24,9 +24,11 @@ dependencies { // https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.13.3' - compile fileTree(include: ['pnml-relast-engine-fatjar-0.1.jar'], dir: './libs') + implementation fileTree(include: ['pnml-relast-engine-fatjar-0.1.jar'], dir: './libs') + implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5' - testCompile group: 'junit', name: 'junit', version: '4.12' + + testImplementation group: 'junit', name: 'junit', version: '4.12' } jar { diff --git a/case-study-sorting-pkg/src/main/java/de/tudresden/inf/st/sorting/handlers/SensorDataProcessingHandler.java b/case-study-sorting-pkg/src/main/java/de/tudresden/inf/st/sorting/handlers/SensorDataProcessingHandler.java index 7ee7c316285e7c38f843c281738de93e69638014..c230fe9a804a94614e97864fddc6f48470026ec2 100644 --- a/case-study-sorting-pkg/src/main/java/de/tudresden/inf/st/sorting/handlers/SensorDataProcessingHandler.java +++ b/case-study-sorting-pkg/src/main/java/de/tudresden/inf/st/sorting/handlers/SensorDataProcessingHandler.java @@ -5,52 +5,66 @@ import com.google.gson.GsonBuilder; import com.google.gson.reflect.TypeToken; import de.tudresden.inf.st.pnml.engine.execution.TransitionHandler; import de.tudresden.inf.st.sorting.constants.TokenConstants; -import de.tudresden.inf.st.sorting.mqtt.MessageListener; import java.lang.reflect.Type; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.function.Function; +import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence; public class SensorDataProcessingHandler extends TransitionHandler { final static Function<List<Map<String, Object>>, List<Map<String, Object>>> PROCESS_DATA_HANDLING_FUNCTION = maps -> { System.out.println("Executing GET_DATA_HANDLING_FUNCTION."); - MessageListener listener = new MessageListener(); - ExecutorService executor = Executors.newSingleThreadExecutor(); - executor.submit(() -> { - listener.listen(message -> { - System.out.println("Received message: " + message); + + String broker = "tcp://localhost:1883"; // Change to your broker's address + String topic = "mocksensor"; + String clientId = "MqttListenerClient"; + + try { + MqttClient client = new MqttClient(broker, clientId, new MqttDefaultFilePersistence()); + MqttConnectOptions connOpts = new MqttConnectOptions(); + connOpts.setCleanSession(true); + + client.connect(connOpts); + System.out.println("Connected to broker: " + broker); + + // Create a listener to handle incoming messages + client.subscribe(topic, (receivedTopic, message) -> { for (Map<String, Object> m : maps) { - if (Long.parseLong(message) < 4) { + + String payload = new String(message.getPayload(), StandardCharsets.UTF_8); + + if (Long.parseLong(payload) < 4) { m.replace(TokenConstants.HUMAN_DETECTED, "true"); } else { m.replace(TokenConstants.HUMAN_DETECTED, "false"); } } - listener.stop(); + + client.disconnect(); + System.out.println("Disconnected after processing the message."); + System.exit(0); }); - }); - // Wait for a maximum of 2 seconds - try { - if (!listener.awaitStop(2, TimeUnit.SECONDS)) { - System.out.println("Timeout reached, no message received."); + Thread.sleep(2000); + + if (client.isConnected()) { + System.out.println("No message received within 2 seconds."); + for (Map<String, Object> m : maps) { m.replace(TokenConstants.HUMAN_DETECTED, "true"); } + + client.disconnect(); + System.out.println("Disconnected."); } - } catch (InterruptedException e) { - System.err.println("Thread interrupted while waiting: " + e.getMessage()); - } finally { - executor.shutdown(); - } + } catch (MqttException | InterruptedException ignored) {} return maps; }; diff --git a/case-study-sorting-pkg/src/main/java/de/tudresden/inf/st/sorting/mqtt/MessageCallback.java b/case-study-sorting-pkg/src/main/java/de/tudresden/inf/st/sorting/mqtt/MessageCallback.java deleted file mode 100644 index 8b297596c46932795ddf74ae6e838933514e0af8..0000000000000000000000000000000000000000 --- a/case-study-sorting-pkg/src/main/java/de/tudresden/inf/st/sorting/mqtt/MessageCallback.java +++ /dev/null @@ -1,7 +0,0 @@ -package de.tudresden.inf.st.sorting.mqtt; - -// Callback Interface -@FunctionalInterface -public interface MessageCallback { - void onMessage(String message); -} diff --git a/case-study-sorting-pkg/src/main/java/de/tudresden/inf/st/sorting/mqtt/MessageListener.java b/case-study-sorting-pkg/src/main/java/de/tudresden/inf/st/sorting/mqtt/MessageListener.java deleted file mode 100644 index db011df270088198b8234446fbb682669f1a993f..0000000000000000000000000000000000000000 --- a/case-study-sorting-pkg/src/main/java/de/tudresden/inf/st/sorting/mqtt/MessageListener.java +++ /dev/null @@ -1,37 +0,0 @@ -package de.tudresden.inf.st.sorting.mqtt; - -import java.util.concurrent.TimeUnit; -// TASK 2 START // -public class MessageListener { - - private volatile boolean running = true; - - // Simulate listening for a message - public void listen(MessageCallback callback) { - try { - // Simulate receiving a message after 1 second - Thread.sleep(1000); - if (running) { - callback.onMessage("Hello, this is a test message!"); - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - System.err.println("Listener interrupted: " + e.getMessage()); - } - } - - // Stop the listener - public void stop() { - running = false; - } - - // Await until the listener stops or timeout is reached - public boolean awaitStop(long timeout, TimeUnit unit) throws InterruptedException { - long startTime = System.nanoTime(); - while (running && (System.nanoTime() - startTime) < unit.toNanos(timeout)) { - Thread.sleep(100); // Check every 100ms - } - return !running; - } -} -