001/**
002 * Copyright (c) 2012, The University of Southampton and the individual contributors.
003 * All rights reserved.
004 *
005 * Redistribution and use in source and binary forms, with or without modification,
006 * are permitted provided that the following conditions are met:
007 *
008 *   *  Redistributions of source code must retain the above copyright notice,
009 *      this list of conditions and the following disclaimer.
010 *
011 *   *  Redistributions in binary form must reproduce the above copyright notice,
012 *      this list of conditions and the following disclaimer in the documentation
013 *      and/or other materials provided with the distribution.
014 *
015 *   *  Neither the name of the University of Southampton nor the names of its
016 *      contributors may be used to endorse or promote products derived from this
017 *      software without specific prior written permission.
018 *
019 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
020 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
021 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
022 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
023 * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
024 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
025 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
026 * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
027 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
028 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
029 */
030package org.openimaj.rdf.storm.utils;
031
032import java.util.ArrayList;
033import java.util.Iterator;
034import java.util.List;
035
036import org.apache.jena.datatypes.RDFDatatype;
037import org.apache.jena.datatypes.TypeMapper;
038import org.apache.jena.graph.BlankNodeId;
039import org.apache.jena.graph.Graph;
040import org.apache.jena.graph.Node;
041import org.apache.jena.graph.NodeFactory;
042import org.apache.jena.graph.Node_Blank;
043import org.apache.jena.graph.Node_Literal;
044import org.apache.jena.graph.Node_URI;
045import org.apache.jena.graph.Node_Variable;
046import org.apache.jena.graph.Triple;
047import org.apache.jena.graph.compose.MultiUnion;
048import org.apache.jena.graph.impl.LiteralLabel;
049import org.apache.jena.mem.GraphMem;
050import org.apache.jena.reasoner.rulesys.Rule;
051import org.apache.jena.shared.AddDeniedException;
052import org.apache.jena.sparql.core.BasicPattern;
053import org.apache.jena.sparql.syntax.ElementFilter;
054import org.apache.jena.sparql.syntax.Template;
055import org.openimaj.kestrel.KestrelServerSpec;
056
057import com.esotericsoftware.kryo.Kryo;
058import com.esotericsoftware.kryo.Serializer;
059import com.esotericsoftware.kryo.io.Input;
060import com.esotericsoftware.kryo.io.Output;
061
062import backtype.storm.Config;
063
064/**
065 * A collection of tools for letting Jena play nicely with Storm
066 *
067 * @author Sina Samangooei (ss@ecs.soton.ac.uk)
068 * @author David Monks (dm11g08@ecs.soton.ac.uk)
069 *
070 */
071public class JenaStormUtils {
072
073        /**
074         * @author Sina Samangooei (ss@ecs.soton.ac.uk)
075         *
076         */
077        public static class NodeSerialiser_URI extends Serializer<Node_URI> {
078
079                @Override
080                public void write(Kryo kryo, Output output, Node_URI object) {
081                        output.writeString(object.getURI());
082                }
083
084                @Override
085                public Node_URI read(Kryo kryo, Input input, Class<Node_URI> type) {
086                        return (Node_URI) NodeFactory.createURI(input.readString());
087                }
088
089        }
090
091        /**
092         * @author Sina Samangooei (ss@ecs.soton.ac.uk)
093         *
094         */
095        public static class TemplateSerialiser extends Serializer<Template> {
096
097                @Override
098                public void write(Kryo kryo, Output output, Template object) {
099                        final BasicPattern bgp = object.getBGP();
100                        output.writeInt(bgp.size());
101                        for (final Triple triple : bgp) {
102                                kryo.writeClassAndObject(output, triple);
103                        }
104                }
105
106                @Override
107                public Template read(Kryo kryo, Input input, Class<Template> type) {
108                        final BasicPattern bgp = new BasicPattern();
109                        final int count = input.readInt();
110                        for (int i = 0; i < count; i++) {
111                                bgp.add((Triple) kryo.readClassAndObject(input));
112                        }
113                        return new Template(bgp);
114                }
115
116        }
117
118        /**
119         * @author Sina Samangooei (ss@ecs.soton.ac.uk)
120         *
121         */
122        public static class NodeSerialiser_Literal extends Serializer<Node_Literal> {
123
124                @Override
125                public void write(Kryo kryo, Output output, Node_Literal object) {
126                        final LiteralLabel label = object.getLiteral();
127                        output.writeString(label.getLexicalForm());
128                        output.writeString(label.language());
129                        output.writeString(label.getDatatypeURI());
130                }
131
132                @Override
133                public Node_Literal read(Kryo kryo, Input input, Class<Node_Literal> type) {
134                        final String lexicalForm = input.readString();
135                        final String langauge = input.readString();
136                        final String datatypeURI = input.readString();
137                        final RDFDatatype dtype = TypeMapper.getInstance().getSafeTypeByName(datatypeURI);
138                        return (Node_Literal) NodeFactory.createLiteral(lexicalForm, langauge, dtype);
139
140                }
141
142        }
143
144        /**
145         * @author Sina Samangooei (ss@ecs.soton.ac.uk)
146         *
147         */
148        public static class NodeSerialiser_Blank extends Serializer<Node_Blank> {
149
150                @Override
151                public void write(Kryo kryo, Output output, Node_Blank object) {
152                        final String blankNodeString = object.toString();
153                        output.writeString(blankNodeString);
154                }
155
156                @Override
157                public Node_Blank read(Kryo kryo, Input input, Class<Node_Blank> type) {
158                        final String label = input.readString();
159                        final Node_Blank retNode = (Node_Blank) NodeFactory.createBlankNode(BlankNodeId.create(label));
160                        return retNode;
161                }
162
163        }
164
165        /**
166         * @author David Monks<dm11g08@ecs.soton.ac.uk>
167         *
168         */
169        public static class NodeSerialiser_Variable extends Serializer<Node_Variable> {
170
171                @Override
172                public void write(Kryo kryo, Output output, Node_Variable object) {
173                        final String blankNodeString = object.toString();
174                        output.writeString(blankNodeString);
175                }
176
177                @Override
178                public Node_Variable read(Kryo kryo, Input input, Class<Node_Variable> type) {
179                        final String label = input.readString();
180                        final Node_Variable retNode = (Node_Variable) NodeFactory.createVariable(label.replaceFirst("\\?", ""));
181                        return retNode;
182                }
183
184        }
185
186        /**
187         * @author Sina Samangooei (ss@ecs.soton.ac.uk)
188         *
189         */
190        public static class TripleSerialiser extends Serializer<Triple> {
191
192                @Override
193                public void write(Kryo kryo, Output output, Triple object) {
194                        final Node s = object.getSubject();
195                        final Node p = object.getPredicate();
196                        final Node o = object.getObject();
197                        kryo.writeClassAndObject(output, s);
198                        kryo.writeClassAndObject(output, p);
199                        kryo.writeClassAndObject(output, o);
200                }
201
202                @Override
203                public Triple read(Kryo kryo, Input input, Class<Triple> type) {
204                        final Node s = (Node) kryo.readClassAndObject(input);
205                        final Node p = (Node) kryo.readClassAndObject(input);
206                        final Node o = (Node) kryo.readClassAndObject(input);
207                        return new Triple(s, p, o);
208                }
209
210        }
211
212        /**
213         *
214         * @author David Monks <dm11g08@ecs.soton.ac.uk>
215         */
216        public static class GraphSerialiser extends Serializer<Graph> {
217
218                @Override
219                public void write(Kryo kryo, Output output, Graph object) {
220                        output.writeInt(object.size());
221                        final Iterator<Triple> it = object.find(null, null, null);
222                        while (it.hasNext()) {
223                                final Triple next = it.next();
224                                kryo.writeClassAndObject(output, next);
225                        }
226                }
227
228                @Override
229                public Graph read(Kryo kryo, Input input, Class<Graph> type) {
230                        final int size = input.readInt();
231                        Graph graph = null;
232                        graph = new GraphMem();
233                        final List<Triple> overflow = new ArrayList<Triple>();
234                        for (int i = 0; i < size; i++) {
235                                final Object obj = kryo.readClassAndObject(input);
236                                try {
237                                        graph.add((Triple) obj);
238                                } catch (final AddDeniedException ex) {
239                                        overflow.add((Triple) obj);
240                                }
241                        }
242                        Iterator<Triple> it = overflow.iterator();
243                        while (!overflow.isEmpty()) {
244                                if (!it.hasNext())
245                                        it = overflow.iterator();
246                                try {
247                                        graph.add(it.next());
248                                        it.remove();
249                                } catch (final AddDeniedException ex) {
250                                }
251                        }
252                        return graph;
253                }
254
255        }
256
257        /**
258         * @author Sina Samangooei (ss@ecs.soton.ac.uk)
259         *
260         */
261        public static class NodeSerialiser_ARRAY extends Serializer<Node[]> {
262
263                @Override
264                public void write(Kryo kryo, Output output, Node[] object) {
265                        output.writeInt(object.length);
266                        for (final Node node : object) {
267                                kryo.writeClassAndObject(output, node);
268                        }
269                }
270
271                @Override
272                public Node[] read(Kryo kryo, Input input, Class<Node[]> type) {
273                        final Node[] out = new Node[input.readInt()];
274                        for (int i = 0; i < out.length; i++) {
275                                out[i] = (Node) kryo.readClassAndObject(input);
276                        }
277                        return out;
278                }
279
280        }
281
282        /**
283         * @author Sina Samangooei (ss@ecs.soton.ac.uk)
284         *
285         */
286        public static class KestrelServerSpec_Serializer extends Serializer<KestrelServerSpec> {
287
288                @Override
289                public void write(Kryo kryo, Output output, KestrelServerSpec object) {
290                        output.writeString(object.host);
291                        output.writeInt(object.port);
292                }
293
294                @Override
295                public KestrelServerSpec read(Kryo kryo, Input input, Class<KestrelServerSpec> type) {
296                        return new KestrelServerSpec(input.readString(), input.readInt());
297                }
298
299        }
300
301        /**
302         * @author Sina Samangooei (ss@ecs.soton.ac.uk)
303         *
304         */
305        public static class RuleSerializer extends Serializer<Rule> {
306
307                @Override
308                public void write(Kryo kryo, Output output, Rule object) {
309                        output.writeString(object.toString());
310                }
311
312                @Override
313                public Rule read(Kryo kryo, Input input, Class<Rule> type) {
314                        return Rule.parseRule(input.readString());
315                }
316
317        }
318
319        /**
320         * @param conf
321         *            register some Jena serialisers to this configuration
322         */
323        public static void registerSerializers(Config conf) {
324                conf.registerSerialization(Node[].class, NodeSerialiser_ARRAY.class);
325                conf.registerSerialization(Node_URI.class, NodeSerialiser_URI.class);
326                conf.registerSerialization(Node_Literal.class, NodeSerialiser_Literal.class);
327                conf.registerSerialization(Node_Blank.class, NodeSerialiser_Blank.class);
328                conf.registerSerialization(Node_Variable.class, NodeSerialiser_Variable.class);
329                conf.registerSerialization(Triple.class, TripleSerialiser.class);
330                conf.registerSerialization(ArrayList.class);
331                conf.registerSerialization(KestrelServerSpec.class, KestrelServerSpec_Serializer.class);
332                conf.registerSerialization(Rule.class, RuleSerializer.class);
333                conf.registerSerialization(Graph.class, GraphSerialiser.class);
334                conf.registerSerialization(GraphMem.class, GraphSerialiser.class);
335                conf.registerSerialization(MultiUnion.class, GraphSerialiser.class);
336                conf.registerSerialization(Template.class, TemplateSerialiser.class);
337                conf.registerSerialization(ElementFilter.class);
338                // conf.registerSerialization(Node_NULL.class);
339                // conf.registerSerialization(Node_Blank.class);
340        }
341}