Commit b36d4395 authored by liorpeak's avatar liorpeak
Browse files

query update

parent ea0898d9
......@@ -3,11 +3,9 @@ package eu.larkc.csparql.sr4ld2014;
import java.awt.geom.GeneralPath;
import java.util.Formattable;
import java.util.Formatter;
import org.apache.log4j.PropertyConfigurator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import eu.larkc.csparql.common.utils.CsparqlUtils;
import eu.larkc.csparql.common.utils.ReasonerChainingType;
import eu.larkc.csparql.core.engine.ConsoleFormatter;
......@@ -32,55 +30,66 @@ public class Example {
CsparqlEngineImpl engine = new CsparqlEngineImpl();
//Initialize the engine instance
//The initialization creates the static engine (SPARQL) and the stream engine (CEP)
//engine.initialize();
//engine.initialize(); true for enable timestamp function
engine.initialize(true);
/*String queryBody = "REGISTER STREAM IsInFs AS "
+ "PREFIX : <http://www.streamreasoning.org/ontologies/sr4ld2014-onto#> "
+ "CONSTRUCT { ?person :isIn ?room } "
+ "FROM STREAM <http://streamreasoning.org/streams/fs> [RANGE 10s STEP 1s] "
String queryBody = "REGISTER QUERY reasoning AS "
+ "PREFIX :<http://onto#> "
+ "PREFIX f: <http://larkc.eu/csparql/sparql/jena/ext#> "
//+ "CONSTRUCT { ?o1 :isInSituation [a :situationCODE1234] } "
+ "SELECT ?o1 ?s2 ?average "
+ "FROM STREAM <sensorTM2> [RANGE 5s TUMBLING] " //STEP 5s
+ "FROM STREAM <sensorTM1> [RANGE 5s TUMBLING] "
+ "WHERE { "
+ "?person :posts [ :who ?person ; :where ?room ] "
+ "}";
*/
+ " :sensorTM1 :madeObservation ?o1 ." // ?s instead of :sensorTM1 for both cases
+ " ?o1 :hasSimpleResult ?p1 ."
+ " ?o1 :hasTime ?r1 . "
//+ " ?s2 :madeObservation ?o2 ."
+ "{ SELECT ?s2 ( avg(?p2) AS ?average ) "
+ " WHERE { "
+ " ?s2 :madeObservation ?o2 ." //
+ " ?o2 :hasSimpleResult ?p2 ."
+ " ?o2 :hasTime ?r2 ."
+ " } "
+ " GROUP BY ?s2"
+ " HAVING ( avg(?p2) > 2 ) "
+ "} "
+ "FILTER ( ?s2 != :sensorTM1 ) . "
//+ "FILTER (f:timestamp(:sensorTM1,:madeObservation,?o1) < f:timestamp(?s1,:madeObservation,?o2)"
//+ " && ?p > 1 && ?p1 > 2 && ?s1 != :sensorTM1 ). "
+ " } ";
//+ "GROUP BY ?s1 "
//+ "HAVING ( avg(?p1) > 1 ) ";
/*
String queryBody = "REGISTER QUERY reasoning AS "
+ "PREFIX :<http://onto#> "
+ "PREFIX f: <http://larkc.eu/csparql/sparql/jena/ext#> "
//+ "CONSTRUCT { ?s :isIn ?p } "
+ "SELECT ?o1 ?o2 "
//+ "FROM STREAM <http://streamreasoning.org/streams/fb> [RANGE 1s STEP 1s] "
+ "FROM STREAM <http://streamreasoning.org/streams/sensors> [RANGE 5s STEP 5s] "
+ "FROM STREAM <http://streamreasoning.org/streams/sensors1> [RANGE 5s STEP 5s] "
//+ "WHERE { "
//+ "GRAPH stream1 {"
//+ "?s2 :madeObservation ?o2"
//+ "} UNION "
//+ "GRAPH stream2 {"
//+ "?s2 :madeObservation ?o2"
//+ "}"
+ "FROM STREAM <sensorTM2> [RANGE 5s TUMBLING] "
+ "FROM STREAM <sensorTM1> [RANGE 5s TUMBLING] "
+ "WHERE { "
+ "{ ?s :madeObservation ?o1 ."
+ "{ :sensorTM1 :madeObservation ?o1 ." // ?s instead of :sensorTM1 for both cases
+ " ?o1 :hasSimpleResult ?p ."
+ " ?o1 :hasTime ?r . "
+ " ?s1 :madeObservation ?o2 ."
+ " ?o2 :hasSimpleResult ?p1 ."
+ " ?o2 :hasTime ?r1 ."
+ "FILTER (f:timestamp(?s,:madeObservation,?o1) < f:timestamp(?s1,:madeObservation,?o2) && ?p > 1 && ?p1 > 2 && ?s != ?s1 ). }"
//+ "UNION " f:timestamp(?s,:madeObservation,?o1) < f:timestamp(?s1,:madeObservation,?o2)
//+ "{ ?s1 :madeObservation [ :hasSimpleResult ?p1 ; :hasTime ?r1 ] "
//+ "FILTER (?p1 > 3 && ?s != ?s1). }"
+ "FILTER (f:timestamp(:sensorTM1,:madeObservation,?o1) < f:timestamp(?s1,:madeObservation,?o2)"
+ " && ?p > 1 && ?p1 > 2 && ?s1 != :sensorTM1 ). }"
+ "} ";
//FacebookStreamer fb = new FacebookStreamer("http://streamreasoning.org/streams/fb", "http://www.streamreasoning.org/ontologies/sr4ld2014-onto#", 1000L);
SensorsStreamer1 s1 = new SensorsStreamer1("http://streamreasoning.org/streams/sensors1", "http://onto#", 1000L);
SensorsStreamer s = new SensorsStreamer("http://streamreasoning.org/streams/sensors", "http://onto#", 10000L);
*/
SensorsStreamer1 streamSTM1 = new SensorsStreamer1("sensorTM1", "http://onto#", 1000L);
SensorsStreamer streamSTM2 = new SensorsStreamer("sensorTM2", "http://onto#", 100L);
//Register new streams in the engine
engine.registerStream(s);
engine.registerStream(s1);
//Thread fbThread = new Thread(fb);
Thread sThread = new Thread(s);
Thread s1Thread = new Thread(s1);
engine.registerStream(streamSTM2);
engine.registerStream(streamSTM1);
Thread streamSTM2Thread = new Thread(streamSTM2);
Thread streamSTM1Thread = new Thread(streamSTM1);
//Register new query in the engine
CsparqlQueryResultProxy c = engine.registerQuery(queryBody, false);
......@@ -89,8 +98,8 @@ public class Example {
c.addObserver(new ConsoleFormatter());
//Start streaming data
sThread.start();
s1Thread.start();
streamSTM2Thread.start();
streamSTM1Thread.start();
engine.updateReasoner(c.getSparqlQueryId(), CsparqlUtils.fileToString("examples_files/rdfs.rules"), ReasonerChainingType.FORWARD, CsparqlUtils.serializeRDFFile("examples_files/tbox.rdf"));
......
/*******************************************************************************
* Copyright 2014 Davide Barbieri, Emanuele Della Valle, Marco Balduini
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Acknowledgements:
*
* This work was partially supported by the European project LarKC (FP7-215535)
* and by the European project MODAClouds (FP7-318484)
******************************************************************************/
package eu.larkc.csparql.sr4ld2014.streamer;
import java.sql.Timestamp;
import java.time.ZonedDateTime;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import eu.larkc.csparql.cep.api.RdfQuadruple;
import eu.larkc.csparql.cep.api.RdfStream;
......@@ -53,13 +34,33 @@ public class SensorsStreamer extends RdfStream implements Runnable {
Timestamp date = new Timestamp(System.currentTimeMillis());
//observationIndex = random.nextInt(Integer.MAX_VALUE);
RdfQuadruple q = new RdfQuadruple(baseUri + "Nacelle", baseUri + "hosts", baseUri + "sensorTempNacelle", System.currentTimeMillis());
RdfQuadruple q = new RdfQuadruple(baseUri + "M2", baseUri + "hosts", baseUri + "sensorTM2", System.currentTimeMillis());
System.out.println(q);
this.put(q);
q = new RdfQuadruple(baseUri + "sensorTempNacelle", baseUri + "madeObservation", baseUri + "obs" + observationIndex, System.currentTimeMillis());
q = new RdfQuadruple(baseUri + "sensorTM2", baseUri + "madeObservation", baseUri + "obsTM2-" + observationIndex, System.currentTimeMillis());
System.out.println(q);
this.put(q);
q = new RdfQuadruple(baseUri + "obs" + observationIndex, baseUri + "observedProperty", baseUri + "Nacelle_Temp", System.currentTimeMillis());
q = new RdfQuadruple(baseUri + "obsTM2-" + observationIndex, baseUri + "observedProperty", baseUri + "Px", System.currentTimeMillis());
System.out.println(q);
this.put(q);
q = new RdfQuadruple(baseUri + "obsTM2-" + observationIndex, baseUri + "hasSimpleResult", result + "^^http://www.w3.org/2001/XMLSchema#integer", System.currentTimeMillis());
System.out.println(q);
this.put(q);
q = new RdfQuadruple(baseUri + "obsTM2-" + observationIndex, baseUri + "hasTime", baseUri + "t-obsTM2-" + timeIndex, System.currentTimeMillis());
System.out.println(q);
this.put(q);
q = new RdfQuadruple(baseUri + "t-obsTM2-" + timeIndex, baseUri + "inXSDDateTime", date + "^^http://www.w3.org/2001/XMLSchema#dateTimeStamp", System.currentTimeMillis());
System.out.println(q);
this.put(q);
/*
RdfQuadruple q = new RdfQuadruple(baseUri + "theLinnansuoTower", baseUri + "hosts", baseUri + "theLinnansuoLI-7500", System.currentTimeMillis());
System.out.println(q);
this.put(q);
q = new RdfQuadruple(baseUri + "theLinnansuoLI-7500", baseUri + "madeObservation", baseUri + "obs" + observationIndex, System.currentTimeMillis());
System.out.println(q);
this.put(q);
q = new RdfQuadruple(baseUri + "obs" + observationIndex, baseUri + "observedProperty", baseUri + "moleFractionCO2", System.currentTimeMillis());
System.out.println(q);
this.put(q);
q = new RdfQuadruple(baseUri + "obs" + observationIndex, baseUri + "hasSimpleResult", result + "^^http://www.w3.org/2001/XMLSchema#integer", System.currentTimeMillis());
......@@ -71,11 +72,13 @@ public class SensorsStreamer extends RdfStream implements Runnable {
q = new RdfQuadruple(baseUri + "T" + timeIndex, baseUri + "inXSDDateTime", date + "^^http://www.w3.org/2001/XMLSchema#dateTimeStamp", System.currentTimeMillis());
System.out.println(q);
this.put(q);
*/
observationIndex++;
timeIndex++;
Thread.sleep(sleepTime);
TimeUnit.SECONDS.sleep(1);
//Thread.sleep(sleepTime);
} catch(Exception e){
e.printStackTrace();
......
package eu.larkc.csparql.sr4ld2014.streamer;
import java.sql.Timestamp;
import java.time.ZonedDateTime;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import eu.larkc.csparql.cep.api.RdfQuadruple;
import eu.larkc.csparql.cep.api.RdfStream;
public class SensorsStreamer1 extends RdfStream implements Runnable {
private long sleepTime;
private String baseUri;
public SensorsStreamer1(String iri, String baseUri,long sleepTime) {
super(iri);
this.sleepTime = sleepTime;
this.baseUri = baseUri;
}
public void run() {
Random random = new Random();
int result;
int observationIndex = 0;
int timeIndex = 0;
while(true){
try{
result = random.nextInt(5);
//ZonedDateTime zdt = ZonedDateTime.now();
//java.util.Date date = java.util.Date.from( zdt.toInstant() );
Timestamp date = new Timestamp(System.currentTimeMillis());
//observationIndex = random.nextInt(Integer.MAX_VALUE);
RdfQuadruple q = new RdfQuadruple(baseUri + "M1", baseUri + "hosts", baseUri + "sensorTM1", System.currentTimeMillis());
System.out.println(q);
this.put(q);
q = new RdfQuadruple(baseUri + "sensorTM1", baseUri + "madeObservation", baseUri + "obsTM1-" + observationIndex, System.currentTimeMillis());
System.out.println(q);
this.put(q);
q = new RdfQuadruple(baseUri + "obsTM1-" + observationIndex, baseUri + "observedProperty", baseUri + "Py", System.currentTimeMillis());
System.out.println(q);
this.put(q);
q = new RdfQuadruple(baseUri + "obsTM1-" + observationIndex, baseUri + "hasSimpleResult", result + "^^http://www.w3.org/2001/XMLSchema#integer", System.currentTimeMillis());
System.out.println(q);
this.put(q);
q = new RdfQuadruple(baseUri + "obsTM1-" + observationIndex, baseUri + "hasTime", baseUri + "t-obsTM1-" + timeIndex, System.currentTimeMillis());
System.out.println(q);
this.put(q);
q = new RdfQuadruple(baseUri + "t-obsTM1-" + timeIndex, baseUri + "inXSDDateTime", date + "^^http://www.w3.org/2001/XMLSchema#dateTimeStamp", System.currentTimeMillis());
System.out.println(q);
this.put(q);
observationIndex++;
timeIndex++;
TimeUnit.SECONDS.sleep(5);
//Thread.sleep(sleepTime);
} catch(Exception e){
e.printStackTrace();
}
}
}
}
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment