Skip to content
Snippets Groups Projects
Commit a205dd29 authored by Sebastian Ebert's avatar Sebastian Ebert
Browse files

shortening

parent e990ce6c
No related branches found
No related tags found
No related merge requests found
......@@ -24,9 +24,11 @@ dependencies {
// https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind
implementation group: 'com.fasterxml.jackson.core', name: 'jackson-databind', version: '2.13.3'
compile fileTree(include: ['pnml-relast-engine-fatjar-0.1.jar'], dir: './libs')
implementation fileTree(include: ['pnml-relast-engine-fatjar-0.1.jar'], dir: './libs')
implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5'
testCompile group: 'junit', name: 'junit', version: '4.12'
testImplementation group: 'junit', name: 'junit', version: '4.12'
}
jar {
......
......@@ -5,52 +5,66 @@ import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
import de.tudresden.inf.st.pnml.engine.execution.TransitionHandler;
import de.tudresden.inf.st.sorting.constants.TokenConstants;
import de.tudresden.inf.st.sorting.mqtt.MessageListener;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
public class SensorDataProcessingHandler extends TransitionHandler {
final static Function<List<Map<String, Object>>, List<Map<String, Object>>> PROCESS_DATA_HANDLING_FUNCTION = maps -> {
System.out.println("Executing GET_DATA_HANDLING_FUNCTION.");
MessageListener listener = new MessageListener();
ExecutorService executor = Executors.newSingleThreadExecutor();
executor.submit(() -> {
listener.listen(message -> {
System.out.println("Received message: " + message);
String broker = "tcp://localhost:1883"; // Change to your broker's address
String topic = "mocksensor";
String clientId = "MqttListenerClient";
try {
MqttClient client = new MqttClient(broker, clientId, new MqttDefaultFilePersistence());
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setCleanSession(true);
client.connect(connOpts);
System.out.println("Connected to broker: " + broker);
// Create a listener to handle incoming messages
client.subscribe(topic, (receivedTopic, message) -> {
for (Map<String, Object> m : maps) {
if (Long.parseLong(message) < 4) {
String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
if (Long.parseLong(payload) < 4) {
m.replace(TokenConstants.HUMAN_DETECTED, "true");
} else {
m.replace(TokenConstants.HUMAN_DETECTED, "false");
}
}
listener.stop();
});
client.disconnect();
System.out.println("Disconnected after processing the message.");
System.exit(0);
});
// Wait for a maximum of 2 seconds
try {
if (!listener.awaitStop(2, TimeUnit.SECONDS)) {
System.out.println("Timeout reached, no message received.");
Thread.sleep(2000);
if (client.isConnected()) {
System.out.println("No message received within 2 seconds.");
for (Map<String, Object> m : maps) {
m.replace(TokenConstants.HUMAN_DETECTED, "true");
}
}
} catch (InterruptedException e) {
System.err.println("Thread interrupted while waiting: " + e.getMessage());
} finally {
executor.shutdown();
client.disconnect();
System.out.println("Disconnected.");
}
} catch (MqttException | InterruptedException ignored) {}
return maps;
};
......
package de.tudresden.inf.st.sorting.mqtt;
// Callback Interface
@FunctionalInterface
public interface MessageCallback {
void onMessage(String message);
}
package de.tudresden.inf.st.sorting.mqtt;
import java.util.concurrent.TimeUnit;
// TASK 2 START //
public class MessageListener {
private volatile boolean running = true;
// Simulate listening for a message
public void listen(MessageCallback callback) {
try {
// Simulate receiving a message after 1 second
Thread.sleep(1000);
if (running) {
callback.onMessage("Hello, this is a test message!");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.err.println("Listener interrupted: " + e.getMessage());
}
}
// Stop the listener
public void stop() {
running = false;
}
// Await until the listener stops or timeout is reached
public boolean awaitStop(long timeout, TimeUnit unit) throws InterruptedException {
long startTime = System.nanoTime();
while (running && (System.nanoTime() - startTime) < unit.toNanos(timeout)) {
Thread.sleep(100); // Check every 100ms
}
return !running;
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment