diff --git a/engine/src/main/java/it/unibo/arces/wot/sepa/engine/core/Engine.java b/engine/src/main/java/it/unibo/arces/wot/sepa/engine/core/Engine.java index a4e27bd6..1d95594e 100644 --- a/engine/src/main/java/it/unibo/arces/wot/sepa/engine/core/Engine.java +++ b/engine/src/main/java/it/unibo/arces/wot/sepa/engine/core/Engine.java @@ -42,11 +42,15 @@ import it.unibo.arces.wot.sepa.engine.protocol.websocket.WebsocketServer; import it.unibo.arces.wot.sepa.engine.protocol.http.HttpGate; import it.unibo.arces.wot.sepa.engine.protocol.http.HttpsGate; +import it.unibo.arces.wot.sepa.engine.protocol.tcp.ServerPrincipaleQuery; +import it.unibo.arces.wot.sepa.engine.protocol.tcp.ServerPrincipaleSubscribe; import it.unibo.arces.wot.sepa.engine.protocol.websocket.SecureWebsocketServer; import it.unibo.arces.wot.sepa.engine.scheduling.Scheduler; import it.unibo.arces.wot.sepa.engine.security.AuthorizationManager; +import net.minidev.json.parser.JSONParser; + /** * This class represents the SPARQL Subscription Engine (Core) of the Semantic @@ -70,7 +74,7 @@ public class Engine implements EngineMBean { // SPARQL 1.1 Protocol handler private HttpGate httpGate = null; - + // SPARQL 1.1 SE Protocol handler private WebsocketServer wsServer; private SecureWebsocketServer wssServer; @@ -236,6 +240,7 @@ public Engine(String[] args) throws SEPASecurityException, SEPAProtocolException } } + wssServer.start(); synchronized(wssServer) { try { @@ -244,6 +249,15 @@ public Engine(String[] args) throws SEPASecurityException, SEPAProtocolException throw new SEPAProtocolException(e); } } + + ServerPrincipaleQuery queryServer = new ServerPrincipaleQuery(scheduler); //da spostare sopra + ServerPrincipaleSubscribe subscribeServer = new ServerPrincipaleSubscribe(scheduler); + + + queryServer.start(); + subscribeServer.start(); + + System.out.println("----------------------"); // Welcome message diff --git a/engine/src/main/java/it/unibo/arces/wot/sepa/engine/protocol/http/HttpGate.java b/engine/src/main/java/it/unibo/arces/wot/sepa/engine/protocol/http/HttpGate.java index fdf1e295..0dd21e05 100644 --- a/engine/src/main/java/it/unibo/arces/wot/sepa/engine/protocol/http/HttpGate.java +++ b/engine/src/main/java/it/unibo/arces/wot/sepa/engine/protocol/http/HttpGate.java @@ -6,7 +6,6 @@ import java.util.concurrent.TimeUnit; import org.apache.http.ExceptionLogger; - import org.apache.http.impl.nio.bootstrap.HttpServer; import org.apache.http.impl.nio.bootstrap.ServerBootstrap; import org.apache.http.impl.nio.reactor.IOReactorConfig; @@ -20,6 +19,7 @@ import it.unibo.arces.wot.sepa.engine.protocol.http.handler.QueryHandler; import it.unibo.arces.wot.sepa.engine.protocol.http.handler.UpdateHandler; import it.unibo.arces.wot.sepa.engine.protocol.http.handler.EchoHandler; +import it.unibo.arces.wot.sepa.engine.protocol.http.handler.LinkedDataNotificationServlet; import it.unibo.arces.wot.sepa.engine.scheduling.Scheduler; public class HttpGate { @@ -41,7 +41,24 @@ public HttpGate(EngineProperties properties, Scheduler scheduler) throws SEPAPro .setServerInfo(serverInfo).setIOReactorConfig(config).setExceptionLogger(ExceptionLogger.STD_ERR) .registerHandler(properties.getQueryPath(), new QueryHandler(scheduler)) .registerHandler(properties.getUpdatePath(), new UpdateHandler(scheduler)) - .registerHandler("/echo", new EchoHandler()).create(); + .registerHandler("/echo", new EchoHandler()) + .registerHandler("/ldnServlet/*", new LinkedDataNotificationServlet(scheduler)) + .registerHandler("/ldnServlet", new LinkedDataNotificationServlet(scheduler)).create(); //aggiunta + + + + + + //qui mappo la servlet che dovrò scrivere..gli passo uno scheduler + //la mia servlet dovrà implementare implements HttpAsyncRequestHandler + //devo invocare i metodi dello scheduler (vedi serverSecondario per sapere quali metodi) + //devo creare un mio handler con un parametro: httpExchange + //invio lo spuid + //devo controllare che sia una get(vedi QueryHandler per vedere come fare) + //la stringa che dovrò creare sarà così composta: + //http://localhost/nomeCheVoglio/richiestaPresaDallUtente + //uso postman per fare query: metto come http: localhost:8000/stringaConCuiHoMappatoLaServlet + //il file engine.jar contiene l'elenco delle porte try { server.start(); diff --git a/engine/src/main/java/it/unibo/arces/wot/sepa/engine/protocol/http/handler/LinkedDataNotificationServlet.java b/engine/src/main/java/it/unibo/arces/wot/sepa/engine/protocol/http/handler/LinkedDataNotificationServlet.java new file mode 100644 index 00000000..588ac9e8 --- /dev/null +++ b/engine/src/main/java/it/unibo/arces/wot/sepa/engine/protocol/http/handler/LinkedDataNotificationServlet.java @@ -0,0 +1,97 @@ +package it.unibo.arces.wot.sepa.engine.protocol.http.handler; + +import java.io.IOException; +import java.net.URLDecoder; +import java.util.HashMap; +import java.util.Map; +import java.util.StringTokenizer; + +import org.apache.http.HttpException; +import org.apache.http.HttpRequest; +import org.apache.http.HttpStatus; +import org.apache.http.nio.protocol.BasicAsyncRequestConsumer; +import org.apache.http.nio.protocol.HttpAsyncExchange; +import org.apache.http.nio.protocol.HttpAsyncRequestConsumer; +import org.apache.http.nio.protocol.HttpAsyncRequestHandler; +import org.apache.http.protocol.HttpContext; + +import it.unibo.arces.wot.sepa.commons.request.QueryRequest; +import it.unibo.arces.wot.sepa.commons.request.SubscribeRequest; +import it.unibo.arces.wot.sepa.engine.protocol.http.HttpUtilities; +import it.unibo.arces.wot.sepa.engine.scheduling.Scheduler; + +public class LinkedDataNotificationServlet implements HttpAsyncRequestHandler{ + + private Scheduler scheduler; + + public LinkedDataNotificationServlet(Scheduler scheduler) { + this.scheduler = scheduler; + } + + @Override + public HttpAsyncRequestConsumer processRequest(HttpRequest request, HttpContext context) + throws HttpException, IOException { + return new BasicAsyncRequestConsumer(); + } + + @Override + public void handle(HttpRequest data, HttpAsyncExchange httpExchange, HttpContext context) + throws HttpException, IOException { + + String query = ""; + + if(httpExchange.getRequest().getRequestLine().getMethod().toUpperCase().equals("GET")) { + System.out.println("LINE: " + httpExchange.getRequest().getRequestLine().toString()); + String url = httpExchange.getRequest().getRequestLine().getUri(); + StringTokenizer token = new StringTokenizer(url, " "); + String path = token.nextToken(); ///ldnServlet?subscribe=select * where { ?a ?b ?c } + path = URLDecoder.decode(path, "UTF-8"); + int i = path.indexOf('?'); + + query = i > 0 ? path.substring(i+1) : "" ; + + Map parseQuery = LinkedDataNotificationServlet.getQueryMap(query); + + //se è una subscribe + if(parseQuery.containsKey("subscribe")){ + query = parseQuery.get("subscribe"); //subscribe=select * where { ?a ?b ?c } + + SubscribeRequest subscribeRequest = new SubscribeRequest(query); + scheduler.schedule(subscribeRequest, new ldnHandler(httpExchange)); + + } else { //case is not subscribe + + String[] split = path.split("\\/"); + String spuid = split[2]; //regex + int numeroNotifica = split.length > 3 ? Integer.parseInt(split[3]) : -1; + + QueryRequest queryRequest = new QueryRequest(query); + scheduler.schedule(queryRequest, new ReadLdnHandler(httpExchange, spuid, numeroNotifica)); + + } + + } else { + HttpUtilities.sendFailureResponse(httpExchange, HttpStatus.SC_BAD_REQUEST, "Wrong format: " + httpExchange.getRequest().getRequestLine()); + return; + } + + } + + public static Map getQueryMap(String query) + { + Map map = new HashMap(); + if(!query.isEmpty()){ + String[] params = query.split("&"); + for (String param : params) + { + String name = param.split("=")[0]; + System.out.println("Key: " + name); + String value = param.split("=")[1]; + System.out.println("Value: " + value); + map.put(name, value); + } + } + return map; + } + +} diff --git a/engine/src/main/java/it/unibo/arces/wot/sepa/engine/protocol/http/handler/ReadLdnHandler.java b/engine/src/main/java/it/unibo/arces/wot/sepa/engine/protocol/http/handler/ReadLdnHandler.java new file mode 100644 index 00000000..0ce69bb3 --- /dev/null +++ b/engine/src/main/java/it/unibo/arces/wot/sepa/engine/protocol/http/handler/ReadLdnHandler.java @@ -0,0 +1,119 @@ +package it.unibo.arces.wot.sepa.engine.protocol.http.handler; + +import java.io.FileReader; +import java.io.IOException; +import java.lang.reflect.Type; +import java.util.ArrayList; +import java.util.List; + +import org.apache.http.HttpResponse; +import org.apache.http.HttpVersion; +import org.apache.http.entity.ContentType; +import org.apache.http.message.BasicHttpResponse; +import org.apache.http.message.BasicStatusLine; +import org.apache.http.nio.entity.NStringEntity; +import org.apache.http.nio.protocol.BasicAsyncResponseProducer; +import org.apache.http.nio.protocol.HttpAsyncExchange; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.reflect.TypeToken; +import com.google.gson.stream.JsonReader; + +import it.unibo.arces.wot.sepa.commons.response.Notification; +import it.unibo.arces.wot.sepa.commons.response.Ping; +import it.unibo.arces.wot.sepa.commons.response.Response; +import it.unibo.arces.wot.sepa.engine.core.EventHandler; + +public class ReadLdnHandler implements EventHandler { + + private HttpAsyncExchange exchange; + private String spuid; + private int numeroNotifica; + + private static String FILENAME = "/Users/AlessioPazzani/Documents/Tesi/FileSpuid/"; + + public ReadLdnHandler(HttpAsyncExchange httpExchange, String spuid, int numeroNotifica) throws IllegalArgumentException { + this.exchange = httpExchange; + this.spuid = spuid; + this.numeroNotifica = numeroNotifica; + } + + @Override + public void sendResponse(Response response) throws IOException { + + if(numeroNotifica <0){ //localhost:8000/ldnServlet/spuid + + System.out.println("E' stato richiesto di visionare un particolare file"); + + String fileName = FILENAME + spuid; + Type listType = new TypeToken>(){}.getType(); + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + JsonReader jsonReader = new JsonReader(new FileReader(fileName)); + List jsonList = gson.fromJson(jsonReader, listType); + + System.out.println(gson.toJson(jsonList)); //stampo l'intero file letto + + ResponseListingOfNotification listOfNotification = new ResponseListingOfNotification(); + List contains = new ArrayList(); + System.out.println("Dimensione JSONList: " + jsonList.size()); + for (int i=0; i>(){}.getType(); + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + JsonReader jsonReader = new JsonReader(new FileReader(fileName)); + List jsonList = gson.fromJson(jsonReader, listType); + + if(!jsonList.isEmpty()){ //se il file con nome spuid esiste + + Object notifica = jsonList.get(numeroNotifica); + System.out.println("Notifica: \n\n" + notifica.toString()); + String result = gson.toJson(notifica); + NStringEntity entity = new NStringEntity( + result, + ContentType.create("application/json-ld", "UTF-8")); + HttpResponse res = new BasicHttpResponse(new BasicStatusLine(HttpVersion.HTTP_1_0, 200, "OK")); + res.setEntity(entity); + + exchange.submitResponse(new BasicAsyncResponseProducer(res)); + } + + } //else + + } //sendResponse + + @Override + public void notifyEvent(Notification notify) throws IOException { + // TODO Auto-generated method stub + + } + + @Override + public void sendPing(Ping ping) throws IOException { + // TODO Auto-generated method stub + + } + +} diff --git a/engine/src/main/java/it/unibo/arces/wot/sepa/engine/protocol/http/handler/ResponseListingOfNotification.java b/engine/src/main/java/it/unibo/arces/wot/sepa/engine/protocol/http/handler/ResponseListingOfNotification.java new file mode 100644 index 00000000..b0ffb655 --- /dev/null +++ b/engine/src/main/java/it/unibo/arces/wot/sepa/engine/protocol/http/handler/ResponseListingOfNotification.java @@ -0,0 +1,45 @@ +package it.unibo.arces.wot.sepa.engine.protocol.http.handler; + +import java.util.ArrayList; +import java.util.List; + +import com.google.gson.annotations.SerializedName; + +public class ResponseListingOfNotification { + + + @SerializedName(value="@context") + private String context; + @SerializedName(value="@id") + private String id; + private List contains; + + public ResponseListingOfNotification(){ + contains = new ArrayList(); + } + + public String getContext() { + return context; + } + + public void setContext(String context) { + this.context = context; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public List getContains() { + return contains; + } + + public void setContains(List contains) { + this.contains = contains; + } + +} diff --git a/engine/src/main/java/it/unibo/arces/wot/sepa/engine/protocol/http/handler/SPARQL11Handler.java b/engine/src/main/java/it/unibo/arces/wot/sepa/engine/protocol/http/handler/SPARQL11Handler.java index a01542a4..a781b989 100644 --- a/engine/src/main/java/it/unibo/arces/wot/sepa/engine/protocol/http/handler/SPARQL11Handler.java +++ b/engine/src/main/java/it/unibo/arces/wot/sepa/engine/protocol/http/handler/SPARQL11Handler.java @@ -26,7 +26,7 @@ public abstract class SPARQL11Handler implements HttpAsyncRequestHandler, SPARQL11HandlerMBean { private static final Logger logger = LogManager.getLogger("SPARQL11Handler"); - private Scheduler scheduler; + Scheduler scheduler; protected HTTPHandlerBeans jmx = new HTTPHandlerBeans(); diff --git a/engine/src/main/java/it/unibo/arces/wot/sepa/engine/protocol/http/handler/ldnHandler.java b/engine/src/main/java/it/unibo/arces/wot/sepa/engine/protocol/http/handler/ldnHandler.java new file mode 100644 index 00000000..ce0029db --- /dev/null +++ b/engine/src/main/java/it/unibo/arces/wot/sepa/engine/protocol/http/handler/ldnHandler.java @@ -0,0 +1,151 @@ +package it.unibo.arces.wot.sepa.engine.protocol.http.handler; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.lang.reflect.Type; + +import java.util.List; +import java.util.StringTokenizer; + +import org.apache.http.HttpResponse; +import org.apache.http.HttpVersion; +import org.apache.http.entity.ContentType; +import org.apache.http.message.BasicHttpResponse; +import org.apache.http.message.BasicStatusLine; +import org.apache.http.nio.entity.NStringEntity; +import org.apache.http.nio.protocol.BasicAsyncResponseProducer; +import org.apache.http.nio.protocol.HttpAsyncExchange; + +import it.unibo.arces.wot.sepa.commons.response.Notification; +import it.unibo.arces.wot.sepa.commons.response.Ping; +import it.unibo.arces.wot.sepa.commons.response.Response; +import it.unibo.arces.wot.sepa.commons.response.SubscribeResponse; +import it.unibo.arces.wot.sepa.engine.core.EventHandler; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.reflect.TypeToken; +import com.google.gson.stream.JsonReader; + + + +public class ldnHandler implements EventHandler{ + + private HttpAsyncExchange exchange; + private static String FILENAME = "/Users/AlessioPazzani/Documents/Tesi/FileSpuid/"; + private BufferedWriter b; + + public ldnHandler(HttpAsyncExchange httpExchange) throws IllegalArgumentException { + this.exchange = httpExchange; + } + + @SuppressWarnings("unchecked") + @Override + public void sendResponse(Response response) throws IOException { + org.json.simple.JSONArray ja = new org.json.simple.JSONArray(); + SubscribeResponse responseSubscribe = (SubscribeResponse)response; + String spuidString = responseSubscribe.getSpuid(); + + StringTokenizer token = new StringTokenizer(spuidString); + String spuid = "null"; + token.nextToken("/"); + while(token.hasMoreTokens()){ + spuid = token.nextToken(); + } + + File file = new File(FILENAME + spuid); + file.createNewFile(); + + String stringa = "{\"@context\":\"http://www.w3.org/ns/ldp\"," + "\n" + + "\"@id\":\"http://+ sepaldnexample.it/profile/" + spuid + "\","+ "\n" + //soggetto + "\"inbox\":\"http://ldnResult/inbox/\"}"; //oggetto + + + NStringEntity entity = new NStringEntity( + stringa, //qui dovrei sostituire stringa a spuid + ContentType.create("application/json-ld", "UTF-8")); + HttpResponse res = new BasicHttpResponse(new BasicStatusLine(HttpVersion.HTTP_1_0, 200, "OK")); + res.setEntity(entity); + res.addHeader("Link", "; rel=\"http://www.w3.org/ns/ldp#inbox\""); + /* + * metodo 1 + * JSONObject jo = new JSONObject(); + * jo.put("\"@context\":", "\"http://www.w3.org/ns/ldp\""); + * jo.put("\"@id\"", "\"http://" + response.getAsJsonObject().toString() + "/profile\""); + * jo.put("\"inbox\"", "\"http://" + spuid + "/inbox/\""); + * ja.add(jo); + * JSONObject mainObj = new JSONObject(); + * mainObj.put("Elenco", ja); + */ + + Type listType = new TypeToken>(){}.getType(); + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + + + FileWriter w = new FileWriter(FILENAME + spuid); + b = new BufferedWriter (w); + + ja.add((responseSubscribe.getAsJsonObject())); + + String daScrivere = gson.toJson(ja, listType); + b.write(daScrivere); + + b.close(); + + exchange.submitResponse(new BasicAsyncResponseProducer(res)); + + } + + @Override + public void notifyEvent(Notification notify) throws IOException { + + String spuidString = notify.getSpuid(); + + StringTokenizer token = new StringTokenizer(spuidString); + String spuid = "null"; + token.nextToken("/"); + while(token.hasMoreTokens()){ + spuid = token.nextToken(); + } + String fileName = FILENAME + spuid; + + Type listType = new TypeToken>(){}.getType(); + Gson gson = new GsonBuilder().setPrettyPrinting().create(); + JsonReader jsonReader = new JsonReader(new FileReader(fileName)); + //JSONArray data = gson.fromJson(reader, Review.class); + List jsonList = gson.fromJson(jsonReader, listType); + + System.out.println(gson.toJson(jsonList)); + + jsonList.add(notify.getAsJsonObject()); + + String result = gson.toJson(jsonList); //trasformo l'elenco degli oggetti in un'unica stringa + + /*Passaggi utili al fine di avere anche la notifica, aggiunta, indentata bene */ + jsonList = gson.fromJson(result, listType); + result = gson.toJson(jsonList); + + FileWriter w; + w=new FileWriter(fileName); + + BufferedWriter b; + b=new BufferedWriter (w); + + b.write(result); //sovrascrittura del file + + b.close(); + + System.out.println("Spuid notifica: " + spuid); + + } + + @Override + public void sendPing(Ping ping) throws IOException { + // TODO Auto-generated method stub + + } + +} diff --git a/engine/src/main/java/it/unibo/arces/wot/sepa/engine/protocol/tcp/ResponseTCPHandler.java b/engine/src/main/java/it/unibo/arces/wot/sepa/engine/protocol/tcp/ResponseTCPHandler.java new file mode 100644 index 00000000..c23dae59 --- /dev/null +++ b/engine/src/main/java/it/unibo/arces/wot/sepa/engine/protocol/tcp/ResponseTCPHandler.java @@ -0,0 +1,44 @@ +package it.unibo.arces.wot.sepa.engine.protocol.tcp; + +import java.io.DataOutputStream; +import java.io.IOException; + +import it.unibo.arces.wot.sepa.commons.response.Notification; +import it.unibo.arces.wot.sepa.commons.response.Ping; +import it.unibo.arces.wot.sepa.commons.response.Response; +import it.unibo.arces.wot.sepa.engine.core.EventHandler; + +public class ResponseTCPHandler implements EventHandler{ + + DataOutputStream outSock = null; + + public ResponseTCPHandler(DataOutputStream outSock){ + // + this.outSock = outSock; + } + + @Override + public void sendResponse(Response response) throws IOException { + // TODO Auto-generated method stub + System.out.println("RISPOSTA:\n\n" + response.toString()); + outSock.writeUTF(response.toString()); + } + + @Override + public void notifyEvent(Notification notify) throws IOException { + // TODO Auto-generated method stub + outSock.writeUTF(notify.toString()); + } + + @Override + public void sendPing(Ping ping) throws IOException { + // TODO Auto-generated method stub + + } + + + + + + +} diff --git a/engine/src/main/java/it/unibo/arces/wot/sepa/engine/protocol/tcp/ResponseTCPHandlerUnsubscribe.java b/engine/src/main/java/it/unibo/arces/wot/sepa/engine/protocol/tcp/ResponseTCPHandlerUnsubscribe.java new file mode 100644 index 00000000..5499d90c --- /dev/null +++ b/engine/src/main/java/it/unibo/arces/wot/sepa/engine/protocol/tcp/ResponseTCPHandlerUnsubscribe.java @@ -0,0 +1,51 @@ +package it.unibo.arces.wot.sepa.engine.protocol.tcp; + +import java.io.DataOutputStream; +import java.io.IOException; + +import it.unibo.arces.wot.sepa.commons.response.Notification; +import it.unibo.arces.wot.sepa.commons.response.Ping; +import it.unibo.arces.wot.sepa.commons.response.Response; +import it.unibo.arces.wot.sepa.engine.core.EventHandler; + +public class ResponseTCPHandlerUnsubscribe implements EventHandler{ + + DataOutputStream outSock = null; + int numSubscribe = 0; + + public ResponseTCPHandlerUnsubscribe(DataOutputStream outSock, int numSubscribe){ + // + this.outSock = outSock; + this.numSubscribe = numSubscribe; + } + + @Override + public void sendResponse(Response response) throws IOException { + // TODO Auto-generated method stub + System.out.println("RISPOSTA:\n\n" + response.toString()); + outSock.writeUTF(response.toString()); + outSock.close(); + + + } + + @Override + public void notifyEvent(Notification notify) throws IOException { + // TODO Auto-generated method stub + outSock.writeUTF(notify.toString()); + outSock.close(); + + } + + @Override + public void sendPing(Ping ping) throws IOException { + // TODO Auto-generated method stub + + } + + + + + + +} diff --git a/engine/src/main/java/it/unibo/arces/wot/sepa/engine/protocol/tcp/ServerPrincipaleQuery.java b/engine/src/main/java/it/unibo/arces/wot/sepa/engine/protocol/tcp/ServerPrincipaleQuery.java new file mode 100644 index 00000000..08c625d3 --- /dev/null +++ b/engine/src/main/java/it/unibo/arces/wot/sepa/engine/protocol/tcp/ServerPrincipaleQuery.java @@ -0,0 +1,75 @@ +package it.unibo.arces.wot.sepa.engine.protocol.tcp; + +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; +import it.unibo.arces.wot.sepa.engine.protocol.http.handler.SPARQL11Handler; +import it.unibo.arces.wot.sepa.engine.scheduling.Scheduler; + +public class ServerPrincipaleQuery extends Thread{ + + SPARQL11Handler handler; + Scheduler scheduler; + + public ServerPrincipaleQuery(Scheduler scheduler) throws IllegalArgumentException { + this.scheduler = scheduler; + } + + + + public void run() { + int port = 1080; + + ServerSocket serverSocket = null; + Socket clientSocket = null; + + try { + serverSocket = new ServerSocket(port); + serverSocket.setReuseAddress(true); + System.out.println("ServerPrincipale: started "); + System.out.println("Server: socket created: " + serverSocket); + } catch (Exception e) { + System.err + .println("Server: problems about server socket's creation: "+ e.getMessage()); + e.printStackTrace(); + try { + serverSocket.close(); + } catch (IOException e1) { + System.out.println("Errore nella chiusura della socketServer"); + e1.printStackTrace(); + } + System.exit(1); //idem + } + + try { + while (true) { + System.out.println("Server: waiting for query...\n"); + + try { + clientSocket = serverSocket.accept(); //bloccante!!! + System.out.println("Server: connection accepted: " + clientSocket); + } catch (Exception e) { + System.err + .println("Server: impossible to establish a connection: " + + e.getMessage()); + e.printStackTrace(); + continue; + } + + try { + new ServerSecondarioQuery(clientSocket, scheduler).start(); + } catch (Exception e) { + System.err.println("Server: problemi nel server thread: " + + e.getMessage()); + e.printStackTrace(); + continue; + } + }// while true + } catch (Exception e) { + e.printStackTrace(); + System.out.println("Server: termino..."); + System.exit(2); + } + } +} + diff --git a/engine/src/main/java/it/unibo/arces/wot/sepa/engine/protocol/tcp/ServerPrincipaleSubscribe.java b/engine/src/main/java/it/unibo/arces/wot/sepa/engine/protocol/tcp/ServerPrincipaleSubscribe.java new file mode 100644 index 00000000..886b9849 --- /dev/null +++ b/engine/src/main/java/it/unibo/arces/wot/sepa/engine/protocol/tcp/ServerPrincipaleSubscribe.java @@ -0,0 +1,78 @@ +package it.unibo.arces.wot.sepa.engine.protocol.tcp; + +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; +import it.unibo.arces.wot.sepa.engine.protocol.http.handler.SPARQL11Handler; +import it.unibo.arces.wot.sepa.engine.scheduling.Scheduler; + +public class ServerPrincipaleSubscribe extends Thread{ + + SPARQL11Handler handler; + Scheduler scheduler; + + public ServerPrincipaleSubscribe(Scheduler scheduler) throws IllegalArgumentException { + this.scheduler = scheduler; + } + + + + public void run() { + int port = 1081; + + ServerSocket serverSocket = null; + Socket clientSocket = null; + + try { + serverSocket = new ServerSocket(port); + serverSocket.setReuseAddress(true); + System.out.println("ServerPrincipale: started "); + System.out.println("Server: socket created: " + serverSocket); + } catch (Exception e) { + System.err + .println("Server: problems about server socket's creation: "+ e.getMessage()); + e.printStackTrace(); + try { + serverSocket.close(); + } catch (IOException e1) { + System.out.println("Errore nella chiusura della socketServer"); + e1.printStackTrace(); + } + System.exit(1); //idem + } + + try { + /* + * E' necessario che il collegamento TCP venga mantenuto sempre aperto, in quanto + * ogni volta che avviene una modifica, SEPA deve rispondere al client che ha fatto + * la subscribe + */ + System.out.println("Server: waiting for subscribe...\n"); + + try { + clientSocket = serverSocket.accept(); //bloccante!!! + System.out.println("Server: connection accepted: " + clientSocket); + } catch (Exception e) { + System.err + .println("Server: impossible to establish a connection: " + + e.getMessage()); + e.printStackTrace(); + //continue; + } + + try { + new ServerSecondarioSubscribe(clientSocket, scheduler).start(); + } catch (Exception e) { + System.err.println("Server: problemi nel server thread: " + + e.getMessage()); + e.printStackTrace(); + //continue; + } + } catch (Exception e) { + e.printStackTrace(); + System.out.println("Server: termino..."); + System.exit(2); + } + } +} + diff --git a/engine/src/main/java/it/unibo/arces/wot/sepa/engine/protocol/tcp/ServerSecondarioQuery.java b/engine/src/main/java/it/unibo/arces/wot/sepa/engine/protocol/tcp/ServerSecondarioQuery.java new file mode 100644 index 00000000..6e2d10fd --- /dev/null +++ b/engine/src/main/java/it/unibo/arces/wot/sepa/engine/protocol/tcp/ServerSecondarioQuery.java @@ -0,0 +1,120 @@ +package it.unibo.arces.wot.sepa.engine.protocol.tcp; + +//Alessio Pazzani 0000766862 +import java.io.*; +import java.net.*; +import java.util.StringTokenizer; + +import it.unibo.arces.wot.sepa.commons.request.QueryRequest; +import it.unibo.arces.wot.sepa.commons.request.UpdateRequest; +import it.unibo.arces.wot.sepa.engine.scheduling.Scheduler; + + + +/* +* +* Varie: controllo campi inviati dal client +* +* +* sistemo il server quando ne ho tempo +* +*/ + + +public class ServerSecondarioQuery extends Thread { + private Socket clientSocket = null; + private Scheduler scheduler = null; + + public ServerSecondarioQuery(Socket clientSocket, Scheduler scheduler) { + this.clientSocket = clientSocket; + this.scheduler = scheduler; + } + + public void run() { + System.out.println("Attivazione figlio: "+ Thread.currentThread().getName()); + + DataInputStream inSock; + DataOutputStream outSock; + + try { + inSock = new DataInputStream(clientSocket.getInputStream()); + outSock = new DataOutputStream(clientSocket.getOutputStream()); + } catch (IOException ioe) { + System.out.println("Problemi nella creazione degli stream di input/output su socket: "); + ioe.printStackTrace(); + return; + } + + try { + try { + //INIZIALIZZAZIONE CAMPI MANDATI DAL CLIENT + String richiestaClient; + while ((richiestaClient = inSock.readUTF()) != null) { + + System.out.println("Codice ricevuto dal cliente: " + richiestaClient + "\n"); + + StringTokenizer token = new StringTokenizer(richiestaClient); //parse della stringa per capirne il tipo + + String requestType = token.nextToken(" "); + + System.out.println("Tipo di richiesta: " + requestType + "\n"); + + QueryRequest req = new QueryRequest(richiestaClient); + UpdateRequest up = new UpdateRequest(richiestaClient); + //System.out.println(req.getSPARQL());//stampa insert { } where .... + + if(requestType.equals("select")){ + scheduler.schedule(req, new ResponseTCPHandler(outSock)); + System.out.println("Query analizzata correttamente\n\n"); + System.out.println("In attesa di una query\n\n"); + + } else if(requestType.equals("delete") || requestType.equals("insert")){ + + scheduler.schedule(up, new ResponseTCPHandler(outSock)); + System.out.println("Update analizzata correttamente\n\n"); + System.out.println("In attesa di una query\n\n"); + + } else{ + System.out.println("Tipo di query non riconosciuta\n\n"); + } + + + //per la subscribe devo fare un nuovo server che accetta le richieste sulla porta + // 1081 (ad esempio, diversa da 1080, molto simile al precedente) + /* quando accetto la connessione guardo se è subscribe -> vado su serverSecondario + * faccio lo schedule di una SubscribeRequest(...) + * responseTCPHandler rimarrà la stessa + * + * delete { ?a} insert {  } where { ?a } + * select ?a where { ?a } per la sottoscrizione + */ + + + } // while + }catch (EOFException eof) { + System.out.println("Raggiunta la fine delle ricezioni, chiudo..."); + clientSocket.close(); + System.out.println("PutFileServer: termino..."); + //System.exit(0); + } catch (SocketTimeoutException ste) { + System.out.println("Timeout scattato: "); + ste.printStackTrace(); + clientSocket.close(); + //System.exit(1); + } catch (Exception e) { + System.out.println("Problemi, i seguenti : "); + e.printStackTrace(); + System.out.println("Chiudo ed esco..."); + clientSocket.close(); + //System.exit(2); + } + + }//try esterno + catch (IOException ioe) { + System.out.println("Problemi nella chiusura della socket: "); + ioe.printStackTrace(); + System.out.println("Chiudo ed esco..."); + //System.exit(3); + } + }//run +}//class \ No newline at end of file diff --git a/engine/src/main/java/it/unibo/arces/wot/sepa/engine/protocol/tcp/ServerSecondarioSubscribe.java b/engine/src/main/java/it/unibo/arces/wot/sepa/engine/protocol/tcp/ServerSecondarioSubscribe.java new file mode 100644 index 00000000..d7ddd9e4 --- /dev/null +++ b/engine/src/main/java/it/unibo/arces/wot/sepa/engine/protocol/tcp/ServerSecondarioSubscribe.java @@ -0,0 +1,132 @@ +package it.unibo.arces.wot.sepa.engine.protocol.tcp; + +//Alessio Pazzani 0000766862 +import java.io.*; +import java.net.*; +import java.util.StringTokenizer; + +import it.unibo.arces.wot.sepa.commons.request.SubscribeRequest; +import it.unibo.arces.wot.sepa.commons.request.UnsubscribeRequest; +import it.unibo.arces.wot.sepa.engine.scheduling.Scheduler; + + + +/* +* +* Varie: controllo campi inviati dal client +* +* +* sistemo il server quando ne ho tempo +* +*/ + + +public class ServerSecondarioSubscribe extends Thread { + private Socket clientSocket = null; + private Scheduler scheduler = null; + private int numSubscribe = 0; + + public ServerSecondarioSubscribe(Socket clientSocket, Scheduler scheduler) { + this.clientSocket = clientSocket; + this.scheduler = scheduler; + } + + public void run() { + System.out.println("Attivazione figlio: "+ Thread.currentThread().getName()); + + DataInputStream inSock; + DataOutputStream outSock; + + try { + inSock = new DataInputStream(clientSocket.getInputStream()); + outSock = new DataOutputStream(clientSocket.getOutputStream()); + } catch (IOException ioe) { + System.out.println("Problemi nella creazione degli stream di input/output su socket: "); + ioe.printStackTrace(); + return; + } + + try { + try { + //INIZIALIZZAZIONE CAMPI MANDATI DAL CLIENT + String richiestaClient; + while ((richiestaClient = inSock.readUTF()) != null) { + + System.out.println("Codice ricevuto dal cliente: " + richiestaClient + "\n"); + + StringTokenizer token = new StringTokenizer(richiestaClient); //parse della stringa per capirne il tipo + + String requestType = token.nextToken(" "); + String spuid = token.nextToken(" "); + + System.out.println("Tipo di richiesta: " + requestType + "\n"); + + //requestType.equalsIgnoreCase("select") + SubscribeRequest req = new SubscribeRequest(richiestaClient); + UnsubscribeRequest unreq = new UnsubscribeRequest(spuid); + + if(requestType.equals("select")){ + scheduler.schedule(req, new ResponseTCPHandler(outSock)); + System.out.println("Inviato aggiornamento al client con successo\n\n"); + numSubscribe++; + + } if(requestType.equals("unsubscribe")){ + scheduler.schedule(unreq, new ResponseTCPHandlerUnsubscribe(outSock, numSubscribe)); + System.out.println("Unsubscribe relativo a\n" + spuid + "\neseguita con successo"); + numSubscribe--; + if(numSubscribe==0){ + System.out.println("Terminate tutte le subscribe"); + System.out.println("Connessione chiusa in quanto terminate le sottoscrizioni"); + + } else if(numSubscribe<0){ + System.out.println("spuid non esistente\n\n"); + } + }else{ + System.out.println("Tipo di subscribe/unsubuscribe non riconosciuta\n"); + } + + + //per la subscribe devo fare un nuovo server che accetta le richieste sulla porta + // 1081 (ad esempio, diversa da 1080, molto simile al precedente) + /* quando accetto la connessione guardo se è subscribe -> vado su serverSecondario + * faccio lo schedule di una SubscribeRequest(...) + * responseTCPHandler rimarrà la stessa + * + * delete { ?a} insert {  } where { ?a } + * select ?a where { ?a } per la sottoscrizione + * select * where { ?a ?b ?c } per la sottoscrizione a tutto + * insert data { } per l'update (N.B cambio ogni volta almeno un ) + */ + + + } // while + }catch (EOFException eof) { + System.out.println("Raggiunta la fine delle ricezioni, chiudo..."); + clientSocket.close(); + System.out.println("PutFileServer: termino..."); + //System.exit(0); + } catch (SocketTimeoutException ste) { + System.out.println("Timeout scattato: "); + ste.printStackTrace(); + clientSocket.close(); + //System.exit(1); + }catch(SocketException ex){ + System.out.println("Sono state chiuse tutte le socket, addio\n"); + clientSocket.close(); + }catch (Exception e) { + System.out.println("Problemi, i seguenti : "); + e.printStackTrace(); + System.out.println("Chiudo ed esco..."); + clientSocket.close(); + //System.exit(2); + } + + }//try esterno + catch (IOException ioe) { + System.out.println("Problemi nella chiusura della socket: "); + ioe.printStackTrace(); + System.out.println("Chiudo ed esco..."); + //System.exit(3); + } + }//run +}//class \ No newline at end of file