Skip to content
Snippets Groups Projects
Commit 3c405fe4 authored by Sebastian Ebert's avatar Sebastian Ebert
Browse files

client side service engine

parent 4d535fe2
Branches
No related tags found
No related merge requests found
import de.tudresden.inf.st.pnml.engine.event.*;
aspect BalloonExecution{
syn FiringSelectionSuccess TransitionSelectionResult.asFiringSelectionSuccess()=null;
......@@ -75,9 +77,34 @@ aspect BalloonExecution{
// build set of direct / indirect outgoing places and remove from them
Set<Place> outgoingPlaces = transition.resolveOutputPlaces();
HashMap<String, ArrayList<String>> reqMap = getPetriNet().getChannelElemensByKey(PnmlConstants.CHANNEL_PLACE_TYPE_CLIENT_REQ_KEY);
HashMap<String, ArrayList<String>> pubMap = getPetriNet().getChannelElemensByKey(PnmlConstants.CHANNEL_PLACE_TYPE_PUB_KEY);
boolean connectedToReqPlace = false;
String connectedReqPlaceId = null;
boolean connectedToPubPlace = false;
String connectedPubPlaceId = null;
// place a token in each outgoing place
for(Place place:outgoingPlaces){
for(Map.Entry<String, ArrayList<String>> entry : reqMap.entrySet()){
for(String s : entry.getValue()){
if(s.equals(place.getId()) || (place.getId().startsWith(s) && place.getId().contains("-INSTANCE-"))){
connectedToReqPlace = true;
connectedReqPlaceId = place.getId();
}
}
}
for(Map.Entry<String, ArrayList<String>> entry : reqMap.entrySet()){
for(String s : entry.getValue()){
if(s.equals(place.getId()) || (place.getId().startsWith(s) && place.getId().contains("-INSTANCE-"))){
connectedToPubPlace = true;
connectedPubPlaceId = place.getId();
}
}
}
BalloonMarkedPlace bmp=this.resolveBalloonPlace(place);
org.ros.node.topic.Publisher<std_msgs.String> pub = node.getPublisherByPlaceId(place.getId());
......@@ -96,7 +123,13 @@ aspect BalloonExecution{
this.flushTreeCache();
}
node.notify(node.NOTIFICATION_MARKING_CHANGE);
if(connectedToReqPlace){
node.notify(new DiNeRosEvent(DiNeRosEventTypes.NOTIFICATION_SERVICE_REQ, connectedReqPlaceId));
} else if(connectedToPubPlace){
node.notify(new DiNeRosEvent(DiNeRosEventTypes.NOTIFICATION_TOPIC_PUB, connectedPubPlaceId));
} else {
node.notify(new DiNeRosEvent(DiNeRosEventTypes.NOTIFICATION_MARKING_CHANGE));
}
}
return Optional.of(this);
......
......@@ -21,43 +21,30 @@ public class Main {
public static void main(java.lang.String[] args) {
// topicTest();
topicTest();
// testBaseNet();
// testServiceNets();
isTest();
// isTest();
}
@SuppressWarnings("unused")
private static void testBaseNet() {
java.lang.String pnmlPath = "../pnml-relast-engine/src/main/resources/nets/NodeNetTest.pnml";
PetriNet petriNet = PnmlParser.parsePnml(pnmlPath).get(0);
nodeConfiguration.setMasterUri(URI.create("http://localhost:11311"));
DinerosTestNode diNeRosNode = new DinerosTestNode("TestNode", petriNet);
for(BalloonTransition bt : diNeRosNode.callbackStorage.getTransitionList()){
bt.getBalloonCallbacks().add(new DefaultFinalTransitionHandler("default_final_cb", 1));
private static void topicTest() {
doTest("../pnml-relast-engine/src/main/resources/nets/NodeNetTopicSubTest.pnml");
}
new Thread(() -> nodeMainExecutor.execute(diNeRosNode, nodeConfiguration)) {{
start();
}};
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
@SuppressWarnings("unused")
private static void testBaseNet() {
doTest("../pnml-relast-engine/src/main/resources/nets/NodeNetTest.pnml");
}
@SuppressWarnings("unused")
private static void isTest() {
doTest("../pnml-relast-engine/src/main/resources/nets/NodeNetSignalTest.pnml");
}
java.lang.String pnmlPath = "../pnml-relast-engine/src/main/resources/nets/NodeNetSignalTest.pnml";
private static void doTest(String pnmlPath) {
PetriNet petriNet = PnmlParser.parsePnml(pnmlPath).get(0);
nodeConfiguration.setMasterUri(URI.create("http://localhost:11311"));
......@@ -107,63 +94,6 @@ public class Main {
.dumpAsPNG(Paths.get(filename + ".png"));
}
@SuppressWarnings("unused")
private static void isTest() throws IOException {
java.lang.String pnmlPath = "../pnml-relast-engine/src/main/resources/nets/is-test-2.pnml";
PetriNet petriNet = PnmlParser.parsePnml(pnmlPath).get(0);
System.out.println("Initial Signal Values: ");
for(Transition t : petriNet.allTransitions()){
for(InputSignalBinding staticIsb : t.asInputSignalTransition().getStaticInputSignalBindingList()){
System.out.println(staticIsb.getInputSignalID() + " : " + staticIsb.getInputSignalValue());
}
}
PetriNetInitializer.initInputSignalConnections(petriNet, "localhost", "mqtt");
try {
System.out.println("Sleeping for 10s");
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
BalloonMarking bm = null;
try {
bm = petriNet.initializeBalloonMarking();
} catch (IOException | SAXException | ParserConfigurationException e) {
e.printStackTrace();
}
BalloonCallbackStorage bcs = petriNet.initializeCallbackStorage();
TransitionCallback tc = new DefaultFinalTransitionCallback("default_final_cb", 1);
List<TransitionCallback> tcl = new ArrayList<>();
tcl.add(tc);
for(Transition t : petriNet.allTransitions()){
bcs.resolveBalloonTransition(t).setBalloonCallbacks(tcl);
}
System.out.println("-- INITIAL MARKING --");
assert bm != null;
System.out.println(bm.print());
for(Transition t : petriNet.allTransitions()){
bm.fireTransition(t, bcs, true);
}
System.out.println("Updated Signal Values: ");
for(Transition t : petriNet.allTransitions()){
for(InputSignalBinding mutualLsb : t.asInputSignalTransition().getMutualInputSignalBindingList()){
System.out.println(mutualLsb.getInputSignalID() + " : " + mutualLsb.getInputSignalValue());
}
}
System.out.println("-- FINAL MARKING --");
System.out.println(bm.print());
}
@SuppressWarnings("unused")
private static void testServiceNets() {
......
package de.tudresden.inf.st.pnml.engine.event;
public class DiNeRosEvent {
public String eventType;
public String payload;
public DiNeRosEvent(String eventType) {
this(eventType, null);
}
public DiNeRosEvent(String eventType, String payload) {
if(eventType.equals(DiNeRosEventTypes.NOTIFICATION_MARKING_CHANGE) || eventType.equals(DiNeRosEventTypes.NOTIFICATION_SIGNAL_CHANGE) ||
eventType.equals(DiNeRosEventTypes.NOTIFICATION_WAIT_ENDED) || eventType.equals(DiNeRosEventTypes.NOTIFICATION_STARTUP_ENDED) ||
eventType.equals(DiNeRosEventTypes.NOTIFICATION_SERVICE_REQ) || eventType.equals(DiNeRosEventTypes.NOTIFICATION_TOPIC_PUB)) {
this.eventType = eventType;
} else {
throw new RuntimeException("Invalid event type used!");
}
this.payload = payload;
}
}
package de.tudresden.inf.st.pnml.engine.event;
public class DiNeRosEventTypes {
public static final String NOTIFICATION_MARKING_CHANGE = "markingChange";
public static final String NOTIFICATION_SIGNAL_CHANGE = "signalChange";
public static final String NOTIFICATION_WAIT_ENDED = "waitEnded";
public static final String NOTIFICATION_STARTUP_ENDED = "startEnded";
public static final String NOTIFICATION_SERVICE_REQ = "serviceReq";
public static final String NOTIFICATION_TOPIC_PUB = "topicPub";
}
......@@ -2,16 +2,25 @@ package de.tudresden.inf.st.pnml.engine.ros;
import de.tudresden.inf.st.pnml.base.constants.PnmlConstants;
import de.tudresden.inf.st.pnml.base.data.ClauseValuesDefinition;
import de.tudresden.inf.st.pnml.engine.event.DiNeRosEvent;
import de.tudresden.inf.st.pnml.engine.event.DiNeRosEventTypes;
import de.tudresden.inf.st.pnml.engine.transform.PetriNetInitializer;
import de.tudresden.inf.st.pnml.jastadd.model.*;
import org.jetbrains.annotations.NotNull;
import org.ros.concurrent.CancellableLoop;
import org.ros.exception.RemoteException;
import org.ros.exception.ServiceNotFoundException;
import org.ros.namespace.GraphName;
import org.ros.node.AbstractNodeMain;
import org.ros.node.ConnectedNode;
import org.ros.node.service.ServiceClient;
import org.ros.node.service.ServiceResponseListener;
import org.ros.node.topic.Publisher;
import org.ros.node.topic.Subscriber;
import org.xml.sax.SAXException;
import rosjava_srv.StringService;
import rosjava_srv.StringServiceRequest;
import rosjava_srv.StringServiceResponse;
import std_msgs.String;
import javax.xml.parsers.ParserConfigurationException;
......@@ -21,11 +30,6 @@ import java.util.concurrent.TimeUnit;
public abstract class DiNeRosNode extends AbstractNodeMain {
public static final java.lang.String NOTIFICATION_MARKING_CHANGE = "markingChange";
public static final java.lang.String NOTIFICATION_SIGNAL_CHANGE = "signalChange";
public static final java.lang.String NOTIFICATION_WAIT_ENDED = "waitEnded";
public static final java.lang.String NOTIFICATION_STARTUP_ENDED = "startEnded";
public final java.lang.String nodeName;
public final PetriNet petriNet;
protected BalloonMarking marking;
......@@ -79,27 +83,92 @@ public abstract class DiNeRosNode extends AbstractNodeMain {
this.connectedNode.shutdown();
}
public synchronized void notify(java.lang.String notificationType) {
public synchronized void notify(DiNeRosEvent event) {
Set<Transition> signalFilteredTransitions = getSignalFilteredTransitions();
switch (notificationType) {
case NOTIFICATION_MARKING_CHANGE:
switch (event.eventType) {
case DiNeRosEventTypes.NOTIFICATION_MARKING_CHANGE:
onMarkingChangeInternal(signalFilteredTransitions);
break;
case NOTIFICATION_SIGNAL_CHANGE:
case DiNeRosEventTypes.NOTIFICATION_SIGNAL_CHANGE:
onSignalChangeInternal(signalFilteredTransitions);
break;
case NOTIFICATION_WAIT_ENDED:
case DiNeRosEventTypes.NOTIFICATION_WAIT_ENDED:
onWaitEndedInternal(signalFilteredTransitions);
break;
case NOTIFICATION_STARTUP_ENDED:
case DiNeRosEventTypes.NOTIFICATION_STARTUP_ENDED:
onStartupEndedInternal(signalFilteredTransitions);
break;
case DiNeRosEventTypes.NOTIFICATION_SERVICE_REQ:
onServiceRequestAvailableOnClientSide(event.payload);
break;
case DiNeRosEventTypes.NOTIFICATION_TOPIC_PUB:
onTopicPublisherAvailable(event.payload);
break;
}
}
private void onTopicPublisherAvailable(java.lang.String placeId){
java.lang.String[] split = placeId.split("-");
java.lang.String originalId = split[0];
java.lang.String topicName = petriNet.getPortNameByPlaceId(originalId);
Place place = petriNet.getPlaceById(placeId);
BalloonMarkedPlace bmp = this.marking.resolveBalloonPlace(place);
RosCommunicationUtil.publish(topicName, bmp.getBalloonMarking(0).getValue().getBytes(),
this.connectedNode, getPublisherByPlaceId(placeId));
bmp.getBalloonMarking(0).removeSelf();
petriNet.flushTreeCache();
this.notify(new DiNeRosEvent(DiNeRosEventTypes.NOTIFICATION_MARKING_CHANGE));
}
private void onServiceRequestAvailableOnClientSide(java.lang.String placeId){
ServiceClient<StringServiceRequest, StringServiceResponse> serviceClient;
// crop name if place is within a server instance
// Naming pattern is here: original PlaceID + "-INSTANCE-" + instanceId
java.lang.String[] split = placeId.split("-");
java.lang.String originalId = split[0];
java.lang.String serviceName = petriNet.getPortNameByPlaceId(originalId);
java.lang.String cResPlaceId = petriNet.getServiceClientResponsePlaceId(originalId) + "-INSTANCE-" + placeId.split("-")[split.length-1];
Place targetPlace = petriNet.getPlaceById(cResPlaceId);
Place sourcePlace = petriNet.getPlaceById(placeId);
BalloonMarkedPlace bmpSource = this.marking.resolveBalloonPlace(sourcePlace);
BalloonMarkedPlace bmpTarget = this.marking.resolveBalloonPlace(targetPlace);
try {
serviceClient = connectedNode.newServiceClient(serviceName, StringService._TYPE);
final StringServiceRequest request = serviceClient.newMessage();
request.setInput(bmpSource.getBalloonMarking(0).getValue());
serviceClient.call(request, new ServiceResponseListener<>() {
@Override
public void onSuccess(StringServiceResponse stringServiceResponse) {
bmpSource.getBalloonMarking(0).removeSelf();
bmpTarget.addBalloonMarking(new BalloonToken(stringServiceResponse.getOutput()));
}
@Override
public void onFailure(RemoteException e) {
System.err.println("Error while calling: token will not be removed!");
}
});
serviceClient.shutdown();
this.notify(new DiNeRosEvent(DiNeRosEventTypes.NOTIFICATION_MARKING_CHANGE));
} catch (ServiceNotFoundException e) {
System.err.println("Error while calling service. ");
}
}
@NotNull
protected Set<Transition> getSignalFilteredTransitions() {
Set<Transition> signalFilteredTransitions = new HashSet<>();
......@@ -130,13 +199,12 @@ public abstract class DiNeRosNode extends AbstractNodeMain {
return signalFilteredTransitions;
}
@NotNull
private TransitionSelectionResult getTransitionSelectionResult(TransitionSelectionResult res) {
private void getTransitionSelectionResult(TransitionSelectionResult res) {
if (res.isFiringSelectionWait()) {
try {
TimeUnit.MILLISECONDS.sleep(res.asFiringSelectionWait().getWaitingTime());
onWaitEndedInternal(getSignalFilteredTransitions());
return res;
return;
} catch (InterruptedException e) {
e.printStackTrace();
}
......@@ -144,14 +212,13 @@ public abstract class DiNeRosNode extends AbstractNodeMain {
if (res.isFiringSelectionFail()) {
System.err.println("[DiNeROS-Node] [" + nodeName + "] Firing selection action failed!");
return res;
return;
}
System.out.println("FIRING TRANSITION");
marking.fireTransition(res.asFiringSelectionSuccess()
.getTransition(), callbackStorage, inputSignalConnector, this, true);
return res;
}
private void onStartupEndedInternal(Set<Transition> enabledTransitions) {
......@@ -208,9 +275,6 @@ public abstract class DiNeRosNode extends AbstractNodeMain {
System.out.println("[INIT NODE] Initializing node: " + nodeName);
// init service clients
// TODO
// init service servers (deep copies)
// TODO
......@@ -243,6 +307,7 @@ public abstract class DiNeRosNode extends AbstractNodeMain {
BalloonMarkedPlace bmp= marking.resolveBalloonPlace(targetPlace);
bmp.getBalloonMarkingList().add(bt);
petriNet.flushTreeCache();
this.notify(new DiNeRosEvent(DiNeRosEventTypes.NOTIFICATION_MARKING_CHANGE));
}, petriNet.getChannelElementLimitById(placeId));
......@@ -251,14 +316,7 @@ public abstract class DiNeRosNode extends AbstractNodeMain {
}
System.out.println("[INIT NODE] Executing first marking query.");
// TransitionSelectionResult firstFiringResult =
this.notify(NOTIFICATION_STARTUP_ENDED);
/* if(firstFiringResult.isFiringSelectionSuccess()){
marking.fireTransition(firstFiringResult.asFiringSelectionSuccess()
.getTransition(), callbackStorage, inputSignalConnector, this,true);
}*/
this.notify(new DiNeRosEvent(DiNeRosEventTypes.NOTIFICATION_STARTUP_ENDED));
this.internalNodeLoop();
}
......
......@@ -15,10 +15,13 @@ import org.ros.message.MessageListener;
public class RosCommunicationUtil {
public static boolean publish(java.lang.String topic, byte[] msgContent, ConnectedNode node){
public static boolean publish(java.lang.String topic, byte[] msgContent, ConnectedNode node, Publisher<std_msgs.String> pub){
System.out.println("Publishing new message to " + topic);
Publisher<std_msgs.String> pub = node.newPublisher(topic, std_msgs.String._TYPE);
if(pub == null){
pub = node.newPublisher(topic, std_msgs.String._TYPE);
}
if(pub == null){
while(true){
......@@ -36,19 +39,4 @@ public class RosCommunicationUtil {
}
return false;
}
public static Subscriber listen(java.lang.String topic, ConnectedNode node, Transition t, PetriNet pn, BalloonMarking bm){
Subscriber<String> subscriber = node.newSubscriber(topic, std_msgs.String._TYPE);
// subscriber.addMessageListener(message -> callback.accept(topic, message.getData().getBytes()));
subscriber.addMessageListener(message -> {
});
return subscriber;
}
}
package de.tudresden.inf.st.pnml.engine.transform;
import de.tudresden.inf.st.pnml.engine.event.DiNeRosEvent;
import de.tudresden.inf.st.pnml.engine.event.DiNeRosEventTypes;
import de.tudresden.inf.st.pnml.engine.ros.DiNeRosNode;
import de.tudresden.inf.st.pnml.jastadd.model.*;
......@@ -100,7 +102,7 @@ public class PetriNetInitializer {
public static void notifyOnSignalChange(DiNeRosNode node){
System.out.println("NOTIFY ON SIGNAL CHANGE");
node.notify(DiNeRosNode.NOTIFICATION_SIGNAL_CHANGE);
node.notify(new DiNeRosEvent(DiNeRosEventTypes.NOTIFICATION_SIGNAL_CHANGE));
}
public void init(PetriNet petriNet, String host, String protocol, InputSignalConnector isc, DiNeRosNode node){
......
<?xml version="1.0" encoding="UTF-8"?>
<pnml
xmlns="http://www.pnml.org/version-2009/grammar/pnml">
<net id="NodeNetTest-0" type="http://www.pnml.org/version-2009/grammar/ptnet">
<name>
<text>NodeNetTest-2</text>
</name>
<toolspecific tool="de.tudresden.inf.st.pnml.distributedPN" version="0.1">
<ports>
<port name="topicP1" placeType="sub" limit="5">p1</port>
<port name="serviceP1" placeType="creq" cResponsePlace="p2">p1</port>
<port name="serviceP1" placeType="cres" cRequestPlace="p1">p1</port>
</ports>
</toolspecific>
<page id="top">
<page id="sourcePage">
<place id="p1">
<toolspecific tool="de.tudresden.inf.st.pnml.distributedPN" version="0.1">
<node>n1</node>
<subnet>s1</subnet>
<balloonMarking>
<tokens>
</tokens>
</balloonMarking>
</toolspecific>
<initialMarking>
<text>0</text>
</initialMarking>
<name>
<text>p1</text>
<graphics>
<offset x="0" y="0" />
</graphics>
</name>
<graphics>
<position x="0" y="0" />
</graphics>
</place>
</page>
</page>
</net>
</pnml>
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment