=Paper=
{{Paper
|id=Vol-1733/paper-11
|storemode=property
|title=Towards a Distributed, Scalable and Real-Time RDF Stream Processing Engine
|pdfUrl=https://ceur-ws.org/Vol-1733/paper-11.pdf
|volume=Vol-1733
|authors=Xiangnan Ren
|dblpUrl=https://dblp.org/rec/conf/semweb/Ren16
}}
==Towards a Distributed, Scalable and Real-Time RDF Stream Processing Engine==
Towards a distributed, scalable and real-time RDF Stream Processing engine Xiangnan Ren1,2,3 1 ATOS - 80 Quai Voltaire, 95870 Bezons, France {xiang-nan.ren}@atos.net 2 ISEP - LISITE, Paris 75006, France 3 LIGM (UMR 8049), CNRS, UPEM, F-77454, Marne-la-Vallée, France Abstract. Due to the growing need to timely process and derive valuable information and knowledge from data produced in the Semantic Web, RDF stream processing (RSP) has emerged as an important research domain. Of course, modern RSP have to address the volume and velocity characteristics encountered in the Big Data era. This comes at the price of designing high throughput, low latency, fault tolerant, highly available and scalable engines. The cost of implementing such systems from scratch is very high and usually one prefers to program components on top of a framework that possesses these properties, e.g., Apache Hadoop or Apache Spark. The research conducting in this PhD adopts this approach and aims to create a production-ready RSP engine which will be based on domain standards, e.g., Apache Kafka and Spark Streaming. In a nutshell, the engine aims to i) address basic event modeling - to guarantee the completeness of input data in window operators, ii) process real-time RDF stream in a distributed manner - efficient RDF stream handling is required; iii) support and extend common continuous SPARQL syntax - easy-to-use, adapt to the industrial needs and iv) support reasoning services at both the data preparation and query processing levels. Keywords: Stream Processing, Distributed Computing, Semantic Web, RDF, RSP 1 Problem statement Nowadays, the RDF data format is getting more and more popular in the Internet of Things (IoT) ecosystem, i.e., data produced by sensors or other devices are either directly represented as RDF triples or trasnformed into this standard. The heterogeneous nature of IoT data sources potentially presents multiple challenges for the implementation of real-time RDF Stream Processing (RSP) services. These challenges refer to four distinct problematics [16,25,9,2]: 1. functionality support: from a user friendliness perspective, the RSP engine is supposed to support common SPARQL syntax. 2. correctness of output: the query answers produced by the system are ex- pected to be correct. 3. performance: like any other stream processing system, high throughput, low response latency, scalability are considered as the performance criteria for RSP engine. 4. reasoning: correctness and completeness of query answers may depend on computed inferences. To address the above-mentioned problems, we can identify two categories of RSP engines which have designed in the last few years. The category of central- ized systems, including C-SPARQL [5], CQELS [15], SPARQLstream [7], ETALIS [3]/EP-SPARQL1 have lead the way in the direction of RDF stream processing but are limited in terms of the amount of events that can handle. This has mo- tivated a second generation of RSP which are distributed, e.g., CQELS-Cloud [17], Katts [11] and Distributed-Etalis. Although reaching better throughput and latency performances, these systems do not integrate the state of the art approaches that are currently guaranteeing fault tolerance, highly availability and scalability and which can enforce the system’s robustness and correctness. 2 Problem Relevancy and Motivation Stream processing in general is one of the hottest topics in Big data. It is cur- rently supporting analytics functionalities that have not been considered before. This is mainly due to the popularization of Web technologies (i.e., the pipeline that provides streams) as well as advancements in computing (i.e., emergence of distributed frameworks that facilitate the design of parallel computations). RDF data is relevant in this streaming context because i) it is a popular graph format equipped with an expressive SPARQL query language, ii) it is anchored in the Web with its wide use of URIs and iii) it has the ability to support reasoning services. The project within which my PhD research is taking place requires an impor- tant subset of the Semantic Web standards, namely RDF, RDFS and OWL con- sidering facilities to represent and reason graph data and knowledge, SPARQL as a well-established query language and Linked Open Data to access external knowledge. The project concerns industrial water resources management. The events emitted from various sensors correspond to pressure, flow, chlorine, tem- perature, turbidity, etc. values and need to be processed almost in real-time to trigger system alerts. The analysis of these events is especially valuable if joined with external, contextual data such as the geographical properties where the sensor is situated. Of course, our goal is to design a generic RSP engine that can adapt easily to use cases concerned with other domains. Intuitively, the goal is to seamlessly integrate novel ontologies, data/knowledge repositories and sets of queries within a highly distributed, reasoning-enabled, continuous query and complex event processing system. This aspect is particularly important since the PhD thesis is funded by a large IT company which envisions to have commercial activities using this system. 1 EP-SPARQL, a wrapper of ETALIS to support SPARQL-like syntax 2 3 Related work In the last few years, a variety of RDF stream processing engines have been proposed. As mentioned in section 1, the design of RSP needs to cover three as- pects [16,2]: functionality support, output correctness and performance. Engine performance is the most concerned issue, since the fine-grained and schema-free nature of RDF data could lead to intensive join operations on query task. For the convenience of illustration, we divide the RSP engines into two categories: centralized and distributed engines. Centralized RSP engines: currently, C-SPARQL, CQELS, SPARQLstream , and ETALIS/EP-SPARQL are popular centralized RSP engines. All of them are developed to run on a single machine. Distributed RSP engines: CQELS-Cloud is the first distributed RSP system which mainly focuses on the engine elasticity and scalability. The whole system is based on Storm2 . Firstly, CQELS-Cloud compresses the incoming RDF streams by dictionary encoding in order to reduce the data size and the communication in the computing cluster. Then, to overcome the performance bottlenecks on join tasks, the authors propose a parallel multiway join based on probing sequence, which is inspired by the MJoin algorithm [21]. However, CQELS-Cloud uses the conventional RDF triple (subject, predicate, object) as its data model and has not provided any event model yet. Furthermore, to the best of our knowledge, CQELS-Cloud is not completely open source, and current CQELS-Cloud does not allow external users to define customized queries. Katts is another RSP engine based on Storm, which applies graph partition- ing 3 to optimize the message exchanging for cluster computing. Based on our evaluation, we can say that Katts allows a limited amount of query operators and queries with only-streaming data sources. Besides, neither query algebra optimization nor SPARQL syntax support have been considered. Finally, the implementation of Katts remains at the stage of scientific prototype. Distributed-ETALIS is a distributed Complex Event Processing (CEP) en- gine based on Storm and Kafka 4 . Both Distributed-ETALIS and its centralized version ETALIS implement a rule-based DSL for event detection (pattern match- ing). It seems that ETALIS team has stopped the software maintenance. We also meet a serious scalability problem in our preliminary evaluation of ETALIS. A distributed RSP system must rely on a generic distributed stream pro- cessing framework such as Storm, Spark Streaming [23,4,24] and Flink 5 . These frameworks have proven their successes in countless scientific and industrial ap- plications. Comparing to Spark (Streaming) and Flink, Storm provides a rela- tively low-level programming API to allow generic real-time service design. Spark Streaming, one of the principal components of Spark ecosystem, is a near real- time distributed computing framework. Current Spark-Streaming is based on 2 http://storm.apache.org 3 http://glaros.dtc.umn.edu/gkhome/views/metis 4 http://kafka.apache.org 5 https://flink.apache.org 3 micro-batch execution mechanism, and provides the sub-second delay. Flink is another popular massively parallel data processing engine which supports real- time data processing and CEP. Due to the enrichment and the maturity of the platform ecosystems, we choose Spark Streaming as the framework of our RSP engine. 4 Research questions Given the context of this PhD thesis, the general research question is: How to efficiently query and reason over massive real-time RDF events data in a distributed computing environment? To answer this question, the following four aspects have to be considered: – Q1 - Distributed, robust RDF stream query processing: we consider the design of an architecture for the processing of RDF streams that is highly available, tolerates failures, scales and guarantees high throughput and low latencies. No RSP engine possesses all these properties and present perfor- mance figures comparable to non-RDF state of the art engines. In particular, these last engines are generally designed using a distributed pub/sub messag- ing system (e.g., Apache Kafka) and a dedicated stream engine (e.g., Apache Flink, Storm, Spark streaming, Beam). Replying to this question implies to adapt RDF peculiarities to this rapidly evolving ecosystem. – Q2 - Compression and reasoning: processing (i.e., querying and reasoning) in a real-time manner over complex graph-based events implies to optimize all computation aspects. We consider that a decompression-free querying ap- proach tightly connected to a semantic-based ontology encoding can guar- antee these properties. – Q3 - Extending continuous SPARQL query toward complex event process- ing capacities: we have identified some practical use cases that are requiring continuous query features that we have not encountered in available contin- uous SPARQL query languages, e.g., session-based windows. These laguage extensions have to be integrated in our declarative query language. – Q4 - How to evaluate RSP engines: we need a way to evaluate the system that is currently being designed. This implies to identify, experiment and evaluate streaming benchmarks and engines. A set of performance metrics also need to be defined to evaluate distributed RSP engines in a cluster environment. 5 Preliminary results Some preliminary results have obtained for Q4 of section 4. The main idea was to evaluate three centralized RSP systems, i.e. C-SPARQL, CQELS, ETALIS. This evaluation defines a complete set of performance metrics to measure query latency and memory consumption by varying stream rate, window size, number of streams , etc. The evaluation of distributed RSP engines are in progress, some 4 Fig. 1: Engine architecture performance metrics need to be added, such as measure the query latency and engine throughput by varying the number of machines. We have currently tested Katts on Amazon EC2 6 by using the real-world data. The evaluation results of Katts are kept for future comparisons with our system. This part of work gives us a deep understanding on existing RSP engines and continuous SPARQL query language features. This has helped me on the design and the implementation of our own RSP engine. Recently, I started to develop a prototype 7 that will run continuous SPARQL queries on Spark Streaming and Kafka. In Figure 1, I provide a high-level view of the system’s architecture. The data obtained from sensors or other sources are first encoded as RDF events. Next, the obtained RDF event streams continue to be serialized into binary format and transmitted to Kafka. Then, Spark- Streaming concurrently receives, caches and deserializes the incoming binary stream. Finally, the system applies the precompiled optimized logical plan to proceed the query execution. 6 Approach In this section, I present how I intend to address the Q1, Q2 and Q3 research questions presented in section 4 as well as how I will validate the system effi- ciency. Q1 addresses to three subquestions: i) Data/Event Modeling. Firstly, the sys- tem aims to support two RDF formats for data updating: triple (i.e. (s, p, o)) by triple and event by event. Basically, an RDF triple can be regarded as the sim- plest RDF graph pattern. I use Sesame [6] to convert the sensors data into RDF events. An RDF event is essentially a set of triple patterns. To identify each event and its belonging stream source, the representation of triple pattern (s, p, o) is extended. I.e., an event is formed as e = (strId, evtId, t, {(sn , pn , on )}n=1,...,N ) 8 . ii) SPARQL syntax supporting. To support distributed querying on Spark 6 https://github.com/renxiangnan/RSP_Evaluation_Results 7 https://github.com/renxiangnan/rsp 8 A Java/Scala collection object 5 Streaming, the incoming RDF event streams need to be cached in window oper- ator and converted into Spark pre-defined data structure. [14] gives a road map to choose an appropriate Spark distributed data collection. I create a native parser to parse SPARQL query into the Spark relational operators. The parser takes Sesame to transform the SPARQL query operators into an intermediate in- fix expression, namely, logical plan. The logical plan will be continuously parsed into a physical plan (i.e. algebra tree) to proceed the query execution. Due to the real-time aspect in streaming context, some optimization tech- niques based on data preprocessing become hardly applicable, e.g. data indexing, vertical partitioning [1], property tables [22] etc. My current research mainly fo- cuses on query algebra rewriting. Based on existing work [18,13,19,8,20], I plan to redesign a three-layers optimization strategy to simplify the query algebra, reduce the overhead of the aggregate operators and adjust the join order of triple patterns. Q2 concerns compression and reasoning aspects. I approach goes along the work of ERI[12] and RDSZ[10] but aims to go one step further in terms of com- pression. That is, we aim to adopt a two structures approach, one containing graph event patterns and the other one storing data bindings associated to these graph patterns. Graph patterns will be represented as compact graph signature that will facilitate the efficient discovery of similar signatures. Moreover, the ele- ments of these signatures will correspond to semantic-aware numerical encoding of underlying ontologies. This approach will support reasoning at both materi- alization (to enrich graph events that can potentially satisfy a set of continuous queries) and query reformulation (for optimization and satisfiability purposes) processing. The continuous SPARQL query language we have implemented so far is in- spired by C-SPARQL. It supports the definition of different window forms, e.g., fixed or sliding windows, via keywords which are permitted in queries. We have identified an additional session-based window form that is required in some prac- tical use cases. Hence, the approach to address Q3 will consist in defining the proper semantics for additional continuous SPARQL query clauses, implement- ing this semantics within our distributed streaming engine and integrating op- timization for this extended query languages, e.g., to support efficient query reformulation. Finally, this novel approach will be heavily tested and evaluated against practical use cases and data sets. 7 Evaluation plan Our system will be evaluated on both synthetic and real-world data sets. Con- cerning the synthetic data sets, we have already identified, experimented and even extended well-established benchmarks during our work of Q4. The real- world experimentation will be directly related to the water resources manage- ment use case. It consists of values obtained from real sensors. We aim to test the system live as well as replay old values and thus test whether our system is able to discovery peculiar situations that we known have occurred in the past. 6 The evaluation will be conducted both in local mode and distributed mode, with respect to the following outline. Functionality support. Since our sytem aims to fully automatize the query execution, we need to implement all the common query operators ((BGP join, filter, aggregations, etc.). Correctness. On both synthetic and real-world data sets, we will be able to qualify the correctness of the provided answers. On our water resources management, we already know that correctness implies some reasoning. Performance. Two performance aspects need to be studied: Latency and throughput. Latency refers to the wall-clock time consumed by the engine on each query execution. Throughput depicts that how many RDF triples can be processed in a unit time. These two performance metrics gives us insights into whether the system could handle the target scenario or not. The engine throughput and query latency are recorded by varying the input stream rate (events/second) and the number of cluster nodes. 8 Reflections This PhD thesis is now at an early stage and preliminary results are promising. I still have a lot to design and implement but I’m getting closer to the point where room for thorough experimentation, evaluation, exploration and innovation will be possible. In the near future, I’m planning to propose some trade-off between materialization and query rewriting as well as novel optimization for RDF stream processing, e.g. querying over compressed data. References 1. D. J. Abadi, A. Marcus, S. Madden, and K. J. Hollenbach. Scalable semantic web data management using vertical partitioning. In Proceedings of the 33rd Interna- tional Conference on Very Large Data Bases, pages 411–422, 2007. 2. M. I. Ali, F. Gao, and A. Mileo. Citybench: A configurable benchmark to evaluate RSP engines using smart city datasets. In The Semantic Web - ISWC 2015. 3. D. Anicic, P. Fodor, S. Rudolph, R. Stühmer, N. Stojanovic, and R. Studer. A rule-based language for complex event processing and reasoning. In Web Reasoning and Rule Systems, pages 42–57, 2010. 4. M. Armbrust, R. S. Xin, C. Lian, Y. Huai, D. Liu, J. K. Bradley, X. Meng, T. Kaf- tan, M. J. Franklin, A. Ghodsi, and M. Zaharia. Spark SQL: relational data processing in spark. In Proceedings of the 2015 ACM SIGMOD, pages 1383–1394, 2015. 5. D. F. Barbieri, D. Braga, S. Ceri, E. D. Valle, and M. Grossniklaus. C-SPARQL: SPARQL for continuous querying. In Proceedings of the 18th International Con- ference on World Wide Web, pages 1061–1062, 2009. 6. J. Broekstra, A. Kampman, and F. van Harmelen. Sesame: An architecture for storin gand querying RDF data and schema information. In Spinning the Semantic Web: Bringing the World Wide Web to Its Full Potential [outcome of a Dagstuhl seminar], pages 197–222, 2003. 7. J. Calbimonte, Ó. Corcho, and A. J. G. Gray. Enabling ontology-based access to streaming data sources. In The Semantic Web - ISWC 2010, pages 96–111, 2010. 7 8. D. Chatziantoniou, M. O. Akinde, T. Johnson, and S. Kim. The md-join: An operator for complex OLAP. In Proceedings of the 17th International Conference on Data Engineering, pages 524–533, 2001. 9. D. Dell’Aglio, J. Calbimonte, M. Balduini, Ó. Corcho, and E. D. Valle. On cor- rectness in RDF stream processor benchmarking. In The Semantic Web - ISWC 2013, pages 326–342, 2013. 10. J. D. Fernández, A. Llaves, and Ó. Corcho. Efficient RDF interchange (ERI) format for RDF data streams. In The Semantic Web - ISWC 2014 - 13th Inter- national Semantic Web Conference, Riva del Garda, Italy, October 19-23, 2014. Proceedings, Part II, pages 244–259, 2014. 11. L. Fischer, T. Scharrenbach, and A. Bernstein. Scalable linked data stream process- ing via network-aware workload scheduling. In Proceedings of the 9th International Workshop on Scalable Semantic Web Knowledge Base Systems, pages 81–96, 2013. 12. N. F. Garcı́a, J. Arias-Fisteus, L. Sánchez, D. Fuentes-Lorenzo, and Ó. Corcho. RDSZ: an approach for lossless RDF stream compression. In The Semantic Web: Trends and Challenges - 11th International Conference, ESWC 2014, pages 52–67, 2014. 13. H. Kim, P. Ravindra, and K. Anyanwu. From SPARQL to mapreduce: The journey using a nested triplegroup algebra. PVLDB, pages 1426–1429, 2011. 14. H. Naacke, O. Curé, and B. Amann. SPARQL query processing with Apache Spark. ArXiv e-prints, 2016. 15. D. L. Phuoc, M. Dao-Tran, J. X. Parreira, and M. Hauswirth. A native and adaptive approach for unified processing of linked streams and linked data. In The Semantic Web - ISWC 2011, pages 370–388, 2011. 16. D. L. Phuoc, M. Dao-Tran, M. Pham, P. A. Boncz, T. Eiter, and M. Fink. Linked stream data processing engines: Facts and figures. In The Semantic Web - ISWC 2012, 2012. 17. D. L. Phuoc, H. N. M. Quoc, C. L. Van, and M. Hauswirth. Elastic and scalable processing of linked stream data in the cloud. In The Semantic Web - ISWC 2013, pages 280–297, 2013. 18. P. Ravindra. Towards optimization of RDF analytical queries on mapreduce. In Workshops Proceedings of the 30th International Conference on Data Engineering Workshops, ICDE 2014, pages 335–339, 2014. 19. P. Ravindra, H. Kim, and K. Anyanwu. Optimization of complex SPARQL ana- lytical queries. In Proceedings of the 19th International Conference on Extending Database Technology, EDBT 2016, pages 257–268, 2016. 20. P. Tsialiamanis, L. Sidirourgos, I. Fundulaki, V. Christophides, and P. Boncz. Heuristics-based query optimisation for sparql. In Proceedings of the 15th Interna- tional Conference on Extending Database Technology, EDBT ’12, pages 324–335, 2012. 21. S. Viglas, J. F. Naughton, and J. Burger. Maximizing the output rate of multi-way join queries over streaming information sources. In VLDB, pages 285–296, 2003. 22. K. Wilkinson. Jena property table implementation. In SSWS, pages 35–46, 2006. 23. M. Zaharia, M. Chowdhury, M. J. Franklin, S. Shenker, and I. Stoica. Spark: Cluster computing with working sets. In 2nd USENIX Workshop on Hot Topics in Cloud Computing, HotCloud’10, 2010. 24. M. Zaharia, T. Das, H. Li, T. Hunter, S. Shenker, and I. Stoica. Discretized streams: fault-tolerant streaming computation at scale. In ACM SIGOPS 24th Symposium on Operating Systems Principles, SOSP, pages 423–438, 2013. 25. Y. Zhang, M. Pham, Ó. Corcho, and J. Calbimonte. Srbench: A streaming RD- F/SPARQL benchmark. In The Semantic Web - ISWC 2012, pages 641–657, 2012. 8