From 3c405fe4740323921825ed829bbbf9d1d850f1a7 Mon Sep 17 00:00:00 2001 From: SebastianEbert <sebastian.ebert@tu-dresden.de> Date: Wed, 5 Jul 2023 17:59:08 +0200 Subject: [PATCH] client side service engine --- .../balloonMarking/BalloonExecution.jadd | 39 +++++- .../de/tudresden/inf/st/pnml/engine/Main.java | 92 ++------------ .../st/pnml/engine/event/DiNeRosEvent.java | 22 ++++ .../pnml/engine/event/DiNeRosEventTypes.java | 11 ++ .../inf/st/pnml/engine/ros/DiNeRosNode.java | 112 +++++++++++++----- .../pnml/engine/ros/RosCommunicationUtil.java | 22 +--- .../engine/transform/PetriNetInitializer.java | 4 +- .../resources/nets/NodeNetTopicSubTest.pnml | 43 +++++++ 8 files changed, 216 insertions(+), 129 deletions(-) create mode 100644 src/main/java/de/tudresden/inf/st/pnml/engine/event/DiNeRosEvent.java create mode 100644 src/main/java/de/tudresden/inf/st/pnml/engine/event/DiNeRosEventTypes.java create mode 100644 src/main/resources/nets/NodeNetTopicSubTest.pnml diff --git a/src/main/jastadd/engine/balloonMarking/BalloonExecution.jadd b/src/main/jastadd/engine/balloonMarking/BalloonExecution.jadd index fb1f36f..a84c5f2 100644 --- a/src/main/jastadd/engine/balloonMarking/BalloonExecution.jadd +++ b/src/main/jastadd/engine/balloonMarking/BalloonExecution.jadd @@ -1,3 +1,5 @@ +import de.tudresden.inf.st.pnml.engine.event.*; + aspect BalloonExecution{ syn FiringSelectionSuccess TransitionSelectionResult.asFiringSelectionSuccess()=null; @@ -75,9 +77,34 @@ aspect BalloonExecution{ // build set of direct / indirect outgoing places and remove from them Set<Place> outgoingPlaces = transition.resolveOutputPlaces(); - - // place a token in each outgoing place + HashMap<String, ArrayList<String>> reqMap = getPetriNet().getChannelElemensByKey(PnmlConstants.CHANNEL_PLACE_TYPE_CLIENT_REQ_KEY); + HashMap<String, ArrayList<String>> pubMap = getPetriNet().getChannelElemensByKey(PnmlConstants.CHANNEL_PLACE_TYPE_PUB_KEY); + boolean connectedToReqPlace = false; + String connectedReqPlaceId = null; + boolean connectedToPubPlace = false; + String connectedPubPlaceId = null; + + // place a token in each outgoing place for(Place place:outgoingPlaces){ + + for(Map.Entry<String, ArrayList<String>> entry : reqMap.entrySet()){ + for(String s : entry.getValue()){ + if(s.equals(place.getId()) || (place.getId().startsWith(s) && place.getId().contains("-INSTANCE-"))){ + connectedToReqPlace = true; + connectedReqPlaceId = place.getId(); + } + } + } + + for(Map.Entry<String, ArrayList<String>> entry : reqMap.entrySet()){ + for(String s : entry.getValue()){ + if(s.equals(place.getId()) || (place.getId().startsWith(s) && place.getId().contains("-INSTANCE-"))){ + connectedToPubPlace = true; + connectedPubPlaceId = place.getId(); + } + } + } + BalloonMarkedPlace bmp=this.resolveBalloonPlace(place); org.ros.node.topic.Publisher<std_msgs.String> pub = node.getPublisherByPlaceId(place.getId()); @@ -96,7 +123,13 @@ aspect BalloonExecution{ this.flushTreeCache(); } - node.notify(node.NOTIFICATION_MARKING_CHANGE); + if(connectedToReqPlace){ + node.notify(new DiNeRosEvent(DiNeRosEventTypes.NOTIFICATION_SERVICE_REQ, connectedReqPlaceId)); + } else if(connectedToPubPlace){ + node.notify(new DiNeRosEvent(DiNeRosEventTypes.NOTIFICATION_TOPIC_PUB, connectedPubPlaceId)); + } else { + node.notify(new DiNeRosEvent(DiNeRosEventTypes.NOTIFICATION_MARKING_CHANGE)); + } } return Optional.of(this); diff --git a/src/main/java/de/tudresden/inf/st/pnml/engine/Main.java b/src/main/java/de/tudresden/inf/st/pnml/engine/Main.java index 648bc62..d6e9ec4 100644 --- a/src/main/java/de/tudresden/inf/st/pnml/engine/Main.java +++ b/src/main/java/de/tudresden/inf/st/pnml/engine/Main.java @@ -21,43 +21,30 @@ public class Main { public static void main(java.lang.String[] args) { - // topicTest(); + topicTest(); // testBaseNet(); // testServiceNets(); - isTest(); + // isTest(); + } + + private static void topicTest() { + doTest("../pnml-relast-engine/src/main/resources/nets/NodeNetTopicSubTest.pnml"); } @SuppressWarnings("unused") private static void testBaseNet() { - - java.lang.String pnmlPath = "../pnml-relast-engine/src/main/resources/nets/NodeNetTest.pnml"; - PetriNet petriNet = PnmlParser.parsePnml(pnmlPath).get(0); - - nodeConfiguration.setMasterUri(URI.create("http://localhost:11311")); - - DinerosTestNode diNeRosNode = new DinerosTestNode("TestNode", petriNet); - - for(BalloonTransition bt : diNeRosNode.callbackStorage.getTransitionList()){ - bt.getBalloonCallbacks().add(new DefaultFinalTransitionHandler("default_final_cb", 1)); - } - - new Thread(() -> nodeMainExecutor.execute(diNeRosNode, nodeConfiguration)) {{ - start(); - }}; - - try { - Thread.sleep(2000); - } catch (InterruptedException e) { - e.printStackTrace(); - } + doTest("../pnml-relast-engine/src/main/resources/nets/NodeNetTest.pnml"); } + @SuppressWarnings("unused") private static void isTest() { + doTest("../pnml-relast-engine/src/main/resources/nets/NodeNetSignalTest.pnml"); + } - java.lang.String pnmlPath = "../pnml-relast-engine/src/main/resources/nets/NodeNetSignalTest.pnml"; + private static void doTest(String pnmlPath) { PetriNet petriNet = PnmlParser.parsePnml(pnmlPath).get(0); nodeConfiguration.setMasterUri(URI.create("http://localhost:11311")); @@ -107,63 +94,6 @@ public class Main { .dumpAsPNG(Paths.get(filename + ".png")); } - @SuppressWarnings("unused") - private static void isTest() throws IOException { - java.lang.String pnmlPath = "../pnml-relast-engine/src/main/resources/nets/is-test-2.pnml"; - PetriNet petriNet = PnmlParser.parsePnml(pnmlPath).get(0); - - System.out.println("Initial Signal Values: "); - for(Transition t : petriNet.allTransitions()){ - for(InputSignalBinding staticIsb : t.asInputSignalTransition().getStaticInputSignalBindingList()){ - System.out.println(staticIsb.getInputSignalID() + " : " + staticIsb.getInputSignalValue()); - } - } - - PetriNetInitializer.initInputSignalConnections(petriNet, "localhost", "mqtt"); - - try { - System.out.println("Sleeping for 10s"); - Thread.sleep(10000); - } catch (InterruptedException e) { - e.printStackTrace(); - } - - BalloonMarking bm = null; - try { - bm = petriNet.initializeBalloonMarking(); - } catch (IOException | SAXException | ParserConfigurationException e) { - e.printStackTrace(); - } - BalloonCallbackStorage bcs = petriNet.initializeCallbackStorage(); - - TransitionCallback tc = new DefaultFinalTransitionCallback("default_final_cb", 1); - List<TransitionCallback> tcl = new ArrayList<>(); - tcl.add(tc); - - for(Transition t : petriNet.allTransitions()){ - bcs.resolveBalloonTransition(t).setBalloonCallbacks(tcl); - } - - System.out.println("-- INITIAL MARKING --"); - assert bm != null; - System.out.println(bm.print()); - - - for(Transition t : petriNet.allTransitions()){ - bm.fireTransition(t, bcs, true); - } - - System.out.println("Updated Signal Values: "); - for(Transition t : petriNet.allTransitions()){ - for(InputSignalBinding mutualLsb : t.asInputSignalTransition().getMutualInputSignalBindingList()){ - System.out.println(mutualLsb.getInputSignalID() + " : " + mutualLsb.getInputSignalValue()); - } - } - - System.out.println("-- FINAL MARKING --"); - System.out.println(bm.print()); - - } @SuppressWarnings("unused") private static void testServiceNets() { diff --git a/src/main/java/de/tudresden/inf/st/pnml/engine/event/DiNeRosEvent.java b/src/main/java/de/tudresden/inf/st/pnml/engine/event/DiNeRosEvent.java new file mode 100644 index 0000000..ec82363 --- /dev/null +++ b/src/main/java/de/tudresden/inf/st/pnml/engine/event/DiNeRosEvent.java @@ -0,0 +1,22 @@ +package de.tudresden.inf.st.pnml.engine.event; + +public class DiNeRosEvent { + + public String eventType; + public String payload; + + public DiNeRosEvent(String eventType) { + this(eventType, null); + } + + public DiNeRosEvent(String eventType, String payload) { + if(eventType.equals(DiNeRosEventTypes.NOTIFICATION_MARKING_CHANGE) || eventType.equals(DiNeRosEventTypes.NOTIFICATION_SIGNAL_CHANGE) || + eventType.equals(DiNeRosEventTypes.NOTIFICATION_WAIT_ENDED) || eventType.equals(DiNeRosEventTypes.NOTIFICATION_STARTUP_ENDED) || + eventType.equals(DiNeRosEventTypes.NOTIFICATION_SERVICE_REQ) || eventType.equals(DiNeRosEventTypes.NOTIFICATION_TOPIC_PUB)) { + this.eventType = eventType; + } else { + throw new RuntimeException("Invalid event type used!"); + } + this.payload = payload; + } +} diff --git a/src/main/java/de/tudresden/inf/st/pnml/engine/event/DiNeRosEventTypes.java b/src/main/java/de/tudresden/inf/st/pnml/engine/event/DiNeRosEventTypes.java new file mode 100644 index 0000000..19e441e --- /dev/null +++ b/src/main/java/de/tudresden/inf/st/pnml/engine/event/DiNeRosEventTypes.java @@ -0,0 +1,11 @@ +package de.tudresden.inf.st.pnml.engine.event; + +public class DiNeRosEventTypes { + + public static final String NOTIFICATION_MARKING_CHANGE = "markingChange"; + public static final String NOTIFICATION_SIGNAL_CHANGE = "signalChange"; + public static final String NOTIFICATION_WAIT_ENDED = "waitEnded"; + public static final String NOTIFICATION_STARTUP_ENDED = "startEnded"; + public static final String NOTIFICATION_SERVICE_REQ = "serviceReq"; + public static final String NOTIFICATION_TOPIC_PUB = "topicPub"; +} diff --git a/src/main/java/de/tudresden/inf/st/pnml/engine/ros/DiNeRosNode.java b/src/main/java/de/tudresden/inf/st/pnml/engine/ros/DiNeRosNode.java index aff9859..2c7aff3 100644 --- a/src/main/java/de/tudresden/inf/st/pnml/engine/ros/DiNeRosNode.java +++ b/src/main/java/de/tudresden/inf/st/pnml/engine/ros/DiNeRosNode.java @@ -2,16 +2,25 @@ package de.tudresden.inf.st.pnml.engine.ros; import de.tudresden.inf.st.pnml.base.constants.PnmlConstants; import de.tudresden.inf.st.pnml.base.data.ClauseValuesDefinition; +import de.tudresden.inf.st.pnml.engine.event.DiNeRosEvent; +import de.tudresden.inf.st.pnml.engine.event.DiNeRosEventTypes; import de.tudresden.inf.st.pnml.engine.transform.PetriNetInitializer; import de.tudresden.inf.st.pnml.jastadd.model.*; import org.jetbrains.annotations.NotNull; import org.ros.concurrent.CancellableLoop; +import org.ros.exception.RemoteException; +import org.ros.exception.ServiceNotFoundException; import org.ros.namespace.GraphName; import org.ros.node.AbstractNodeMain; import org.ros.node.ConnectedNode; +import org.ros.node.service.ServiceClient; +import org.ros.node.service.ServiceResponseListener; import org.ros.node.topic.Publisher; import org.ros.node.topic.Subscriber; import org.xml.sax.SAXException; +import rosjava_srv.StringService; +import rosjava_srv.StringServiceRequest; +import rosjava_srv.StringServiceResponse; import std_msgs.String; import javax.xml.parsers.ParserConfigurationException; @@ -21,11 +30,6 @@ import java.util.concurrent.TimeUnit; public abstract class DiNeRosNode extends AbstractNodeMain { - public static final java.lang.String NOTIFICATION_MARKING_CHANGE = "markingChange"; - public static final java.lang.String NOTIFICATION_SIGNAL_CHANGE = "signalChange"; - public static final java.lang.String NOTIFICATION_WAIT_ENDED = "waitEnded"; - public static final java.lang.String NOTIFICATION_STARTUP_ENDED = "startEnded"; - public final java.lang.String nodeName; public final PetriNet petriNet; protected BalloonMarking marking; @@ -79,27 +83,92 @@ public abstract class DiNeRosNode extends AbstractNodeMain { this.connectedNode.shutdown(); } - public synchronized void notify(java.lang.String notificationType) { + public synchronized void notify(DiNeRosEvent event) { Set<Transition> signalFilteredTransitions = getSignalFilteredTransitions(); - switch (notificationType) { - case NOTIFICATION_MARKING_CHANGE: + switch (event.eventType) { + case DiNeRosEventTypes.NOTIFICATION_MARKING_CHANGE: onMarkingChangeInternal(signalFilteredTransitions); break; - case NOTIFICATION_SIGNAL_CHANGE: + case DiNeRosEventTypes.NOTIFICATION_SIGNAL_CHANGE: onSignalChangeInternal(signalFilteredTransitions); break; - case NOTIFICATION_WAIT_ENDED: + case DiNeRosEventTypes.NOTIFICATION_WAIT_ENDED: onWaitEndedInternal(signalFilteredTransitions); break; - case NOTIFICATION_STARTUP_ENDED: + case DiNeRosEventTypes.NOTIFICATION_STARTUP_ENDED: onStartupEndedInternal(signalFilteredTransitions); break; + case DiNeRosEventTypes.NOTIFICATION_SERVICE_REQ: + onServiceRequestAvailableOnClientSide(event.payload); + break; + case DiNeRosEventTypes.NOTIFICATION_TOPIC_PUB: + onTopicPublisherAvailable(event.payload); + break; } } + private void onTopicPublisherAvailable(java.lang.String placeId){ + + java.lang.String[] split = placeId.split("-"); + java.lang.String originalId = split[0]; + java.lang.String topicName = petriNet.getPortNameByPlaceId(originalId); + Place place = petriNet.getPlaceById(placeId); + BalloonMarkedPlace bmp = this.marking.resolveBalloonPlace(place); + RosCommunicationUtil.publish(topicName, bmp.getBalloonMarking(0).getValue().getBytes(), + this.connectedNode, getPublisherByPlaceId(placeId)); + bmp.getBalloonMarking(0).removeSelf(); + petriNet.flushTreeCache(); + this.notify(new DiNeRosEvent(DiNeRosEventTypes.NOTIFICATION_MARKING_CHANGE)); + + } + + private void onServiceRequestAvailableOnClientSide(java.lang.String placeId){ + + ServiceClient<StringServiceRequest, StringServiceResponse> serviceClient; + + // crop name if place is within a server instance + // Naming pattern is here: original PlaceID + "-INSTANCE-" + instanceId + java.lang.String[] split = placeId.split("-"); + java.lang.String originalId = split[0]; + + + java.lang.String serviceName = petriNet.getPortNameByPlaceId(originalId); + java.lang.String cResPlaceId = petriNet.getServiceClientResponsePlaceId(originalId) + "-INSTANCE-" + placeId.split("-")[split.length-1]; + Place targetPlace = petriNet.getPlaceById(cResPlaceId); + Place sourcePlace = petriNet.getPlaceById(placeId); + BalloonMarkedPlace bmpSource = this.marking.resolveBalloonPlace(sourcePlace); + BalloonMarkedPlace bmpTarget = this.marking.resolveBalloonPlace(targetPlace); + + try { + serviceClient = connectedNode.newServiceClient(serviceName, StringService._TYPE); + final StringServiceRequest request = serviceClient.newMessage(); + request.setInput(bmpSource.getBalloonMarking(0).getValue()); + + serviceClient.call(request, new ServiceResponseListener<>() { + + @Override + public void onSuccess(StringServiceResponse stringServiceResponse) { + bmpSource.getBalloonMarking(0).removeSelf(); + bmpTarget.addBalloonMarking(new BalloonToken(stringServiceResponse.getOutput())); + } + + @Override + public void onFailure(RemoteException e) { + System.err.println("Error while calling: token will not be removed!"); + } + }); + + serviceClient.shutdown(); + this.notify(new DiNeRosEvent(DiNeRosEventTypes.NOTIFICATION_MARKING_CHANGE)); + + } catch (ServiceNotFoundException e) { + System.err.println("Error while calling service. "); + } + } + @NotNull protected Set<Transition> getSignalFilteredTransitions() { Set<Transition> signalFilteredTransitions = new HashSet<>(); @@ -130,13 +199,12 @@ public abstract class DiNeRosNode extends AbstractNodeMain { return signalFilteredTransitions; } - @NotNull - private TransitionSelectionResult getTransitionSelectionResult(TransitionSelectionResult res) { + private void getTransitionSelectionResult(TransitionSelectionResult res) { if (res.isFiringSelectionWait()) { try { TimeUnit.MILLISECONDS.sleep(res.asFiringSelectionWait().getWaitingTime()); onWaitEndedInternal(getSignalFilteredTransitions()); - return res; + return; } catch (InterruptedException e) { e.printStackTrace(); } @@ -144,14 +212,13 @@ public abstract class DiNeRosNode extends AbstractNodeMain { if (res.isFiringSelectionFail()) { System.err.println("[DiNeROS-Node] [" + nodeName + "] Firing selection action failed!"); - return res; + return; } System.out.println("FIRING TRANSITION"); marking.fireTransition(res.asFiringSelectionSuccess() .getTransition(), callbackStorage, inputSignalConnector, this, true); - return res; } private void onStartupEndedInternal(Set<Transition> enabledTransitions) { @@ -208,9 +275,6 @@ public abstract class DiNeRosNode extends AbstractNodeMain { System.out.println("[INIT NODE] Initializing node: " + nodeName); - // init service clients - // TODO - // init service servers (deep copies) // TODO @@ -243,6 +307,7 @@ public abstract class DiNeRosNode extends AbstractNodeMain { BalloonMarkedPlace bmp= marking.resolveBalloonPlace(targetPlace); bmp.getBalloonMarkingList().add(bt); petriNet.flushTreeCache(); + this.notify(new DiNeRosEvent(DiNeRosEventTypes.NOTIFICATION_MARKING_CHANGE)); }, petriNet.getChannelElementLimitById(placeId)); @@ -251,14 +316,7 @@ public abstract class DiNeRosNode extends AbstractNodeMain { } System.out.println("[INIT NODE] Executing first marking query."); - // TransitionSelectionResult firstFiringResult = - this.notify(NOTIFICATION_STARTUP_ENDED); - - /* if(firstFiringResult.isFiringSelectionSuccess()){ - marking.fireTransition(firstFiringResult.asFiringSelectionSuccess() - .getTransition(), callbackStorage, inputSignalConnector, this,true); - }*/ - + this.notify(new DiNeRosEvent(DiNeRosEventTypes.NOTIFICATION_STARTUP_ENDED)); this.internalNodeLoop(); } diff --git a/src/main/java/de/tudresden/inf/st/pnml/engine/ros/RosCommunicationUtil.java b/src/main/java/de/tudresden/inf/st/pnml/engine/ros/RosCommunicationUtil.java index a75a2cd..48ee959 100644 --- a/src/main/java/de/tudresden/inf/st/pnml/engine/ros/RosCommunicationUtil.java +++ b/src/main/java/de/tudresden/inf/st/pnml/engine/ros/RosCommunicationUtil.java @@ -15,10 +15,13 @@ import org.ros.message.MessageListener; public class RosCommunicationUtil { - public static boolean publish(java.lang.String topic, byte[] msgContent, ConnectedNode node){ + public static boolean publish(java.lang.String topic, byte[] msgContent, ConnectedNode node, Publisher<std_msgs.String> pub){ System.out.println("Publishing new message to " + topic); - Publisher<std_msgs.String> pub = node.newPublisher(topic, std_msgs.String._TYPE); + + if(pub == null){ + pub = node.newPublisher(topic, std_msgs.String._TYPE); + } if(pub == null){ while(true){ @@ -36,19 +39,4 @@ public class RosCommunicationUtil { } return false; } - - public static Subscriber listen(java.lang.String topic, ConnectedNode node, Transition t, PetriNet pn, BalloonMarking bm){ - - Subscriber<String> subscriber = node.newSubscriber(topic, std_msgs.String._TYPE); - // subscriber.addMessageListener(message -> callback.accept(topic, message.getData().getBytes())); - - subscriber.addMessageListener(message -> { - - - - }); - - return subscriber; - - } } diff --git a/src/main/java/de/tudresden/inf/st/pnml/engine/transform/PetriNetInitializer.java b/src/main/java/de/tudresden/inf/st/pnml/engine/transform/PetriNetInitializer.java index af9ff83..f36c3af 100644 --- a/src/main/java/de/tudresden/inf/st/pnml/engine/transform/PetriNetInitializer.java +++ b/src/main/java/de/tudresden/inf/st/pnml/engine/transform/PetriNetInitializer.java @@ -1,5 +1,7 @@ package de.tudresden.inf.st.pnml.engine.transform; +import de.tudresden.inf.st.pnml.engine.event.DiNeRosEvent; +import de.tudresden.inf.st.pnml.engine.event.DiNeRosEventTypes; import de.tudresden.inf.st.pnml.engine.ros.DiNeRosNode; import de.tudresden.inf.st.pnml.jastadd.model.*; @@ -100,7 +102,7 @@ public class PetriNetInitializer { public static void notifyOnSignalChange(DiNeRosNode node){ System.out.println("NOTIFY ON SIGNAL CHANGE"); - node.notify(DiNeRosNode.NOTIFICATION_SIGNAL_CHANGE); + node.notify(new DiNeRosEvent(DiNeRosEventTypes.NOTIFICATION_SIGNAL_CHANGE)); } public void init(PetriNet petriNet, String host, String protocol, InputSignalConnector isc, DiNeRosNode node){ diff --git a/src/main/resources/nets/NodeNetTopicSubTest.pnml b/src/main/resources/nets/NodeNetTopicSubTest.pnml new file mode 100644 index 0000000..9d98c9a --- /dev/null +++ b/src/main/resources/nets/NodeNetTopicSubTest.pnml @@ -0,0 +1,43 @@ +<?xml version="1.0" encoding="UTF-8"?> +<pnml + xmlns="http://www.pnml.org/version-2009/grammar/pnml"> + <net id="NodeNetTest-0" type="http://www.pnml.org/version-2009/grammar/ptnet"> + <name> + <text>NodeNetTest-2</text> + </name> + <toolspecific tool="de.tudresden.inf.st.pnml.distributedPN" version="0.1"> + <ports> + <port name="topicP1" placeType="sub" limit="5">p1</port> + <port name="serviceP1" placeType="creq" cResponsePlace="p2">p1</port> + <port name="serviceP1" placeType="cres" cRequestPlace="p1">p1</port> + </ports> + </toolspecific> + + <page id="top"> + <page id="sourcePage"> + <place id="p1"> + <toolspecific tool="de.tudresden.inf.st.pnml.distributedPN" version="0.1"> + <node>n1</node> + <subnet>s1</subnet> + <balloonMarking> + <tokens> + </tokens> + </balloonMarking> + </toolspecific> + <initialMarking> + <text>0</text> + </initialMarking> + <name> + <text>p1</text> + <graphics> + <offset x="0" y="0" /> + </graphics> + </name> + <graphics> + <position x="0" y="0" /> + </graphics> + </place> + </page> + </page> + </net> +</pnml> \ No newline at end of file -- GitLab