            Distributed Query Processing on the Cloud:
                     the Optique Point of View
                          (Short Paper)*

          Herald Kllapi2, Dimitris Bilidas2, Ian Horrocks1, Yannis Ioannidis2,
        Ernesto Jiménez-Ruiz1, Evgeny Kharlamov1, Manolis Koubarakis2, Dmitriy
                                      Zheleznyakov1
                                 1 Oxford University, UK
                              2 University of Athens, Greece



         Abstract. The Optique European project [6] (http://www.optique-project.eu)
         aims at providing an end-to-end solution for scalable access to integrated
         Big Data, where end users will formulate queries based on a familiar
         conceptualization of the underlying domain. From the users' queries the
         Optique platform will automatically generate appropriate queries over the
         underlying integrated data, and will optimize and execute them on the
         Cloud. In this paper we present the distributed query processing engine of
         the Optique platform. The efficient execution of the complex queries posed
         by end users is an important and challenging task. The engine aims at
         providing a scalable solution for query execution in the Cloud, and should
         cope with the heterogeneity of data sources as well as with temporal and
         streaming data.


1      Introduction

The Optique project aims at providing end users with the ability to access Big Data
through queries expressed using a familiar conceptualization of the underlying domain.
This approach is usually referred to as Ontology Based Data Access (OBDA) [12, 2].
    In Figure 1 we present the architecture of the Optique OBDA approach. The core
elements of the architecture are an ontology, which describes the application domain
in terms of a user-oriented vocabulary of classes (usually referred to as concepts) and
relationships between them (usually referred to as roles), and a set of mappings, which
relate the terms of the ontology to the schema of the underlying data source. End users
formulate queries using the terms defined by the ontology, which should be intuitive and
correspond to their view of the domain; thus, they are not required to understand
the data source schemata. The main components of the Optique architecture are:
  – the Query Formulation component that allows end users to pose queries to the
     system,
  – the Ontology and Mapping Management component that allows for bootstrapping
     of ontologies and mappings during the installation of the system and for their
     subsequent maintenance,
  – the Query Transformation component that rewrites users' queries into queries over
     the underlying data sources,
  – the Distributed Query Optimisation and Processing component that optimises and
     executes the queries produced by the Query Transformation component.
All the components will communicate through agreed APIs.
* This research was financed by the Optique project under grant agreement FP7-318338.
[Figure 1: end users interact with the Application and Query Formulation components,
and an IT-expert uses the Ontology & Mapping Management component; queries pass
through Query Transformation (backed by the ontology and mappings) to Distributed
Query Optimisation and Processing, which accesses streaming data and heterogeneous
data sources and returns results to the users.]

                Fig. 1. The general architecture of the Optique OBDA system

    In order for the Optique OBDA solution to be practical, it is crucial that the output
of the query rewriting process can be evaluated effectively and efficiently against
the integrated data sources, which may be of various types, including temporal data and
data streams. For Big Data scenarios this efficiency is not an option – it is a necessity.
We plan to achieve it through both massive parallelism, i.e., running queries with
the maximum amount of parallelism at each stage of execution, and elasticity, i.e.,
the flexibility to execute the same query with an amount of resources that depends
on the resource availability and the execution time goals for that particular query.
The role of the Distributed Query Optimisation and Processing component is to provide
this functionality, and it is the focus of this paper.
    An important motivation for the Optique project comes from two demanding use cases
that provide the project with the necessary test-bed. The first one is provided by
Siemens (http://www.siemens.com) and encompasses several terabytes of temporal data
coming from sensors, growing at a rate of about 30 gigabytes per day. The users need
to query these data in combination with many gigabytes of other relational data that
describe events. The second use case is provided by Statoil (http://www.statoil.com)
and concerns more than one petabyte of geological data. The data are stored in multiple
databases which have different schemata, and the user has to access many of them in
order to get results for a single query. In general, in the oil
and gas industry IT-experts spend 30–70% of their time gathering and assessing the
quality of data [3]. This is clearly very expensive in terms of both time and money. The
Optique project aims at solutions that dramatically reduce the cost of data access. More
precisely, Optique aims at reducing the running times of the queries for these use cases
from hours to minutes and from days to hours. A bigger goal of the project is to provide
a platform with a generic architecture that can be easily adapted to any domain that
requires scalable data access and efficient query execution for OBDA solutions;
Optique's solutions will be integrated via the Information Workbench platform [8].
    The rest of this paper is organized as follows. In Section 2 we first give an overview
of the system architecture and then present a more detailed description of its basic
components. In Section 3 we describe some use cases. In Section 4 we discuss
related work, and in Section 5 we conclude.


2      System Architecture

The distributed query execution is based on ADP [14], a system for complex dataflow
processing in the cloud. ADP has been developed and used successfully in several Euro-
pean projects. The initial ideas came from Diligent [5]. ADP was then adapted and used
in the Health-e-Child project as a medical query processing engine [10]. Subsequently,
it was refined to support more execution environments, more operators, and more query
processing and optimization algorithms. ADP has been used successfully at the Uni-
versity of Athens for large scale distributed sorting algorithms, large scale database
processing, and also for distributed data mining problems.
     The general architecture of the distributed query answering component within the
Optique platform is shown in Figure 2. The system utilizes state-of-the-art database
techniques: (i) a declarative query language based on data flows, (ii) the use of
sophisticated optimization techniques for executing queries efficiently, (iii) operator
extensibility to bring domain-specific computations into the database processing, and
(iv) execution platform independence to insulate applications from the idiosyncrasies
of the execution environments, such as local clusters, private clouds, or public clouds.
     The query is received through the gateway using the JDBC (Java Database
Connectivity) API. This communication mainly involves interaction with the Query
Transformation component. The Master node is responsible for the initialization and
coordination of the process. The Optimization Engine produces the execution plan for
the query using techniques described in [11]. Next, the execution plan is given to the
Execution Engine, which is responsible for reserving the necessary resources, sending
the operators of the graph to the appropriate workers, and monitoring the execution.
     The system uses two different communication channels between its components.
Data from the relational data sources, streams, and federated sources is exchanged
between the workers using lightweight TCP connections with compression for high
throughput. All other communication (e.g., signals denoting that a node is connected,
that an execution has finished, etc.) is done through a peer-to-peer network (P2P Net).
For the time being, this network is a simple master-slave network using Java RMI
(Remote Method Invocation).
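     As a rough illustration of the data channel, the sketch below shows how a table
partition might be serialized, compressed, and shipped between two workers over a plain
TCP connection. The framing, serialization format, and function names are our own
simplifications, not ADP's actual protocol.

    # Illustrative sketch of worker-to-worker partition exchange: a partition
    # (a list of tuples) is compressed and sent with a 4-byte length prefix.
    import json
    import socket
    import struct
    import zlib

    def send_partition(host: str, port: int, rows: list) -> None:
        """Compress one partition and push it over a TCP connection."""
        payload = zlib.compress(json.dumps(rows).encode("utf-8"))
        with socket.create_connection((host, port)) as sock:
            sock.sendall(struct.pack("!I", len(payload)) + payload)

    def recv_partition(conn: socket.socket) -> list:
        """Read one length-prefixed, compressed partition from a connection."""
        (length,) = struct.unpack("!I", _recv_exact(conn, 4))
        return json.loads(zlib.decompress(_recv_exact(conn, length)))

    def _recv_exact(conn: socket.socket, n: int) -> bytes:
        buf = b""
        while len(buf) < n:
            chunk = conn.recv(n - len(buf))
            if not chunk:
                raise ConnectionError("peer closed the connection early")
            buf += chunk
        return buf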

[Figure 2: in the presentation layer (integrated via the Information Workbench),
Optique's configuration interface sits on top of the query answering component. The
Query Transformation component (query rewriting and answer management for one-time
and stream queries in SPARQL) and a shared database communicate with the ADP Gateway
(JDBC and Stream APIs). Behind the gateway, distributed query execution based on ADP
comprises a Master, Optimisation Engines, Execution Engines, a Data Connector, and a
Stream Connector, connected by a P2P network, with Workers on a fast local network.
The external data layer (RDBs, triple stores, temporal DBs, data streams) is reached
via JDBC/Teiid and stream connectors, and an external cloud (a virtual resource pool)
via a Cloud API.]

         Fig. 2. General architecture of the ADP component within the Optique System

Language and Optimization: Queries are expressed in SQL and issued to the system
through the gateway. The SQL query is transformed into a dataflow language that allows
complex graphs, with operators as nodes and edges representing producer-consumer
relationships. The first level of optimization is planning; the result of this phase is
an SQL query script. We enhanced SQL by adding the table partition as a first-class
citizen of the language. A table partition is defined as a set of tuples having a
particular property (e.g., the value of a hash function applied on one column is the same
for all the tuples in the same partition), and a table is defined as a set of partitions.
The optimizer produces an execution plan in the form of a directed acyclic graph (DAG),
with all the information needed to execute the query. The following query is an example.
    DISTRIBUTED CREATE TABLE lineitem_large TO 10 ON l_orderkey AS
    SELECT * FROM lineitem WHERE l_quantity = 20

The query creates 10 partitions of a table named lineitem_large with rows chosen by a
selection condition. The partitioning is based on the column l_orderkey.
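The following sketch (illustrative Python, not ADP code) spells out the partitioning
property behind this query: every tuple that passes the filter is assigned to one of 10
partitions by hashing l_orderkey, so tuples with equal hash values are co-located. The
choice of crc32 as the hash function is an assumption made for the example.

    # Hash partitioning on l_orderkey into NUM_PARTITIONS buckets.
    import zlib

    NUM_PARTITIONS = 10

    def partition_of(l_orderkey: int) -> int:
        # crc32 is a stand-in for whatever hash function the engine uses
        return zlib.crc32(str(l_orderkey).encode()) % NUM_PARTITIONS

    lineitem = [(1, 20), (2, 20), (3, 15)]   # (l_orderkey, l_quantity) samples
    partitions = {p: [] for p in range(NUM_PARTITIONS)}
    for row in lineitem:
        if row[1] == 20:                     # the WHERE l_quantity = 20 filter
            partitions[partition_of(row[0])].append(row)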

Execution Engine: ADP relies on an asynchronous execution engine. As soon as a
worker node completes one job, it sends a corresponding signal to the execution
engine. The execution engine uses an asynchronous, event-based execution manager,
which records the jobs that have been executed and assigns new jobs when all the
prerequisite jobs have finished.
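The sketch below shows a minimal version of such an event-based manager, assuming the
execution plan is given as a DAG mapping each job to its prerequisite jobs; the job
names and the dispatch hook are illustrative, not ADP's interfaces.

    # A job is dispatched as soon as a "job finished" event clears its last
    # prerequisite; jobs with no prerequisites start immediately.
    from collections import defaultdict

    class ExecutionManager:
        def __init__(self, plan: dict, dispatch):
            # plan maps job -> set of prerequisite jobs
            self.remaining = {job: set(pre) for job, pre in plan.items()}
            self.dependents = defaultdict(set)
            for job, pre in plan.items():
                for p in pre:
                    self.dependents[p].add(job)
            self.dispatch = dispatch    # sends a job to some worker

        def start(self):
            for job, pre in list(self.remaining.items()):
                if not pre:
                    self.dispatch(job)

        def on_job_finished(self, job):
            # signal sent by a worker when it completes a job
            for dep in self.dependents[job]:
                self.remaining[dep].discard(job)
                if not self.remaining[dep]:
                    self.dispatch(dep)

    # usage: scan two partitions, then join them, then aggregate
    plan = {"scan_1": set(), "scan_2": set(),
            "join": {"scan_1", "scan_2"}, "agg": {"join"}}
    mgr = ExecutionManager(plan, dispatch=lambda job: print("dispatch", job))
    mgr.start()
    for done in ("scan_1", "scan_2", "join"):   # simulated completion signals
        mgr.on_job_finished(done)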

Worker Pool: The resources needed to execute the queries (machines, network, etc.)
are reserved or allocated automatically. These resources are wrapped into containers,
which abstract from the details of a physical machine in a cluster or a virtual machine
in a cloud. Workers run queries using a Python wrapper of SQLite
(http://www.sqlite.org). This part of the system, which is available at
https://code.google.com/p/madis/, can also be used as a standalone single-node
database. Queries are expressed in a declarative language which is an extension of SQL.
This language considerably facilitates the use of user-defined functions (UDFs), which
are written in Python. The system supports row, aggregate, and virtual table functions.
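As an illustration of these UDF kinds, the example below registers a row function and an
aggregate function. It uses Python's standard sqlite3 module rather than the madis
wrapper itself, so the registration calls stand in for madis's richer syntax; virtual
table functions have no stdlib counterpart and are omitted.

    import math
    import sqlite3

    def normalize(s):
        """Row UDF: one value in, one value out."""
        return s.strip().lower()

    class GeoMean:
        """Aggregate UDF: geometric mean of a column of positive values."""
        def __init__(self):
            self.log_sum, self.n = 0.0, 0
        def step(self, value):
            self.log_sum += math.log(value)
            self.n += 1
        def finalize(self):
            return math.exp(self.log_sum / self.n) if self.n else None

    con = sqlite3.connect(":memory:")
    con.create_function("normalize", 1, normalize)
    con.create_aggregate("geomean", 1, GeoMean)
    con.execute("CREATE TABLE t(name TEXT, x REAL)")
    con.executemany("INSERT INTO t VALUES (?, ?)", [(" A ", 2.0), ("b", 8.0)])
    print(con.execute("SELECT normalize(name) FROM t").fetchall())  # [('a',), ('b',)]
    print(con.execute("SELECT geomean(x) FROM t").fetchone())       # (4.0,)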

Data / Stream Connector: The Data Connector and the Stream Connector are responsible
for handling and dispatching relational and stream data, respectively, through the
network. These modules are used when the system receives a request for collecting the
results of executed queries. The Stream Connector uses an asynchronous stream event
listener to be notified of incoming stream data, whereas the Data Connector utilizes a
table transfer scheduler to receive partitions of relational tables from the worker nodes.
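A rough sketch of the stream side, assuming newline-delimited tuples arriving over TCP:
an asynchronous listener accepts stream connections and hands each incoming tuple to a
routing hook that, in the real system, would be decided by the execution plan. All names
here are illustrative.

    # Asynchronous stream event listener (asyncio); one connection per stream.
    import asyncio

    def route_to_worker(raw_tuple: bytes) -> None:
        # hypothetical dispatch hook; the plan decides the target worker
        print("routing", raw_tuple.strip())

    async def handle_stream(reader, writer):
        # each line is one incoming stream tuple
        while line := await reader.readline():
            route_to_worker(line)
        writer.close()

    async def main():
        server = await asyncio.start_server(handle_stream, "0.0.0.0", 9999)
        async with server:
            await server.serve_forever()

    # asyncio.run(main())  # commented out: would block, serving forever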


3     Use Cases

We now present some use cases of the distributed query processing component.

 Data Import: The system provides the possibility to import data from several
heterogeneous sources. These data can be of many different types, including relational
data, data in file formats such as comma-separated values (CSV) or XML, and streams.
When the data arrive in the form of streams, the procedure is initiated through the
Stream API in the ADP Gateway; otherwise the JDBC API is used. In the first case, the
Master Node employs one or more Optimization Engines, which produce a plan defining
which worker nodes should receive each data stream. In the second case, the
Optimization Engines also define how the data should be partitioned (number of
partitions, partitioning column, etc.) and where each partition should be stored. The
Master Node is notified when the execution plan is ready and then it employs one or
more Execution Engines.
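To make the relational import path concrete, the sketch below (our simplification)
routes CSV rows into per-partition SQLite files by hashing the partitioning column
chosen by the optimizer. The schema, file names, and partition count are assumptions
made for the example.

    # Import a CSV file into NUM_PARTITIONS per-partition SQLite databases.
    import csv
    import sqlite3
    import zlib

    NUM_PARTITIONS = 4

    def import_csv(path: str, key_col: int) -> None:
        dbs = [sqlite3.connect(f"partition_{p}.db") for p in range(NUM_PARTITIONS)]
        for db in dbs:
            db.execute("CREATE TABLE IF NOT EXISTS t(orderkey INTEGER, quantity REAL)")
        with open(path, newline="") as f:
            for row in csv.reader(f):
                # hash the partitioning column to pick the target partition
                p = zlib.crc32(row[key_col].encode()) % NUM_PARTITIONS
                dbs[p].execute("INSERT INTO t VALUES (?, ?)", (row[0], row[1]))
        for db in dbs:
            db.commit()
            db.close()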

 Query Execution: In a similar manner, when the ADP Gateway receives a query, one
or more Optimization Engines produce an execution plan which contains the resulting
sequence of operators and the data partitions on which they should be applied. The
Optimization Engines report back to the Master Node, which then utilizes the Execution
Engines, which in turn communicate with the Worker Nodes to execute the query. In the
case of federated data, some Worker Nodes need to communicate with external databases:
they issue queries and receive results which, depending on the plan, need to be
combined with the data that they hold locally.
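The following toy example illustrates that combination step. Both sides are SQLite for
brevity, whereas in the platform the external side would be reached through connectors
such as JDBC/Teiid; the schemas and data are invented.

    # A worker joins rows fetched from an "external" source with its local data.
    import sqlite3

    local = sqlite3.connect(":memory:")
    local.execute("CREATE TABLE orders(orderkey INTEGER, customer TEXT)")
    local.executemany("INSERT INTO orders VALUES (?, ?)",
                      [(1, "acme"), (2, "globex")])

    external = sqlite3.connect(":memory:")   # stand-in for a federated source
    external.execute("CREATE TABLE events(orderkey INTEGER, event TEXT)")
    external.executemany("INSERT INTO events VALUES (?, ?)", [(1, "shipped")])

    # ask the external source a query, then combine its results locally
    remote_rows = external.execute("SELECT orderkey, event FROM events").fetchall()
    local.execute("CREATE TEMP TABLE remote(orderkey INTEGER, event TEXT)")
    local.executemany("INSERT INTO remote VALUES (?, ?)", remote_rows)
    print(local.execute(
        "SELECT o.customer, r.event FROM orders o "
        "JOIN remote r ON o.orderkey = r.orderkey").fetchall())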
    When the execution of the query has finished, the Master Node is notified, and
through the Gateway it can send a message to the external components. The results
stay in the Worker Nodes, because the volume of the results may make it prohibitive
to transfer them to a single node. When an external component wants to access
the results, it must do so by sending an extra request. When receiving such a
request, the Master Node uses the Data Connector to collect the results or to apply
some aggregation functions to them (for example, sum, average, etc.).
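For instance, an average can be computed without moving any result table: each worker
reports a local (sum, count) pair for its partitions, and the connector combines them,
as in the sketch below (the function name is ours).

    # Combine per-partition partial aggregates into a global sum and average.
    def combine_partials(partials):
        """partials: iterable of (sum, count) pairs, one per worker partition."""
        total, n = 0.0, 0
        for s, c in partials:
            total += s
            n += c
        return {"sum": total, "avg": total / n if n else None}

    # e.g. three workers report their local sums and row counts
    print(combine_partials([(10.0, 2), (30.0, 3), (5.0, 1)]))  # sum 45.0, avg 7.5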


4   Related Work

The most popular big data platforms today are based on the MapReduce paradigm.
MapReduce was introduced by Google [4] as a simplified big data processing platform
on large clusters. The intuitive appeal of MapReduce and the availability of platforms
such as Hadoop have also fueled the development of data management platforms that
aim at supporting SQL as a query language on top of MapReduce, or are hybrid
systems combining MapReduce implementations with existing relational database sys-
tems. These platforms attempt to compete with the well-known shared-nothing parallel
database systems available from relational DBMS vendors such as Oracle. For example,
Hive [13] is a data warehousing system built on top of Hadoop. The Hive query lan-
guage, HiveQL, is a subset of SQL: for example, it does not support materialized views,
and it allows subqueries only in the FROM clause. Furthermore, only equality predicates
are supported in joins, and only UNION ALL (bag union) is supported, i.e., duplicates
are not eliminated. HadoopDB [1] is a recent proposal which integrates single-node
database functionality with Hadoop in order to provide a highly scalable and fault-
tolerant distributed database with full SQL support. The U.S. startup Hadapt [9] is
currently commercializing HadoopDB. Greenplum by EMC [7] is another commercial
platform for big data analysis; it is based on a massively parallel (shared-nothing)
database system that supports in-database MapReduce capabilities.
     In the Semantic Web world, the emphasis recently has been on building scalable
systems that offer expressive querying and reasoning capabilities over ontologies
expressed in RDFS or OWL 2 and its profiles (EL, QL, and RL) and data in RDF. These
systems include database platforms for RDF offering the query language SPARQL (Sesame,
Jena, Virtuoso, Quest [12], OWLIM, AllegroGraph, etc.) and OWL 2 reasoners (Pellet,
HermiT, etc.). Although recent RDF stores have been shown to scale to billions of
triples, the scalability of Semantic Web systems in general is lacking compared with the
scalability of more traditional systems such as parallel databases, or newer approaches
such as NoSQL databases and parallel database/MapReduce hybrids. Recent Semantic
Web research is also focusing on the use of MapReduce for querying RDF data, but also
for forward and backward reasoning with RDFS/OWL 2 ontologies.
     To summarise, we believe that the benefits of solutions based on MapReduce are
limited and cannot be efficiently extended to more general workloads and more expres-
sive SQL queries such as the ones needed in Optique. We believe that ADP and the
holistic optimization framework of Optique provide us with a solid foundation upon
which to build and to go beyond current state-of-the-art platforms for Big Data processing.
5    Conclusions
The efficient execution of SQL queries on big data is an open research problem, and the
initial results achieved by research prototypes such as HadoopDB are encouraging. In
the Optique project we will push the state of the art further and provide massively
parallel and elastic solutions for query optimisation and execution for Big Data
integration. Our solutions, based on ground-breaking research, will be deployed and
evaluated in our use cases. This will provide valuable insights for the application of
semantic technologies to Big Data integration problems in industry.


References
 1. Abouzeid, A., Bajda-Pawlikowski, K., Abadi, D.J., Rasin, A., Silberschatz, A.: HadoopDB:
    An architectural hybrid of MapReduce and DBMS technologies for analytical workloads.
    PVLDB 2(1), 922–933 (2009)
 2. Calvanese, D., Giacomo, G.D., Lembo, D., Lenzerini, M., Poggi, A., Rodriguez-Muro, M.,
    Rosati, R., Ruzzi, M., Savo, D.F.: The MASTRO system for ontology-based data access.
    Semantic Web 2(1), 43–53 (2011)
 3. Crompton, J.: Keynote talk at the W3C Workshop on Semantic Web in Oil & Gas
    Industry, Houston, TX, USA, 9–10 December (2008), available from
    http://www.w3.org/2008/12/ogws-slides/Crompton.pdf
 4. Dean, J., Ghemawat, S.: MapReduce: simplified data processing on large clusters.
    Communications of the ACM 51(1), 107–113 (2008), http://doi.acm.org/10.
    1145/1327452.1327492
 5. DILIGENT: A digital library infrastructure on grid enabled technology (2004),
    http://diligent.ercim.eu/
 6. Giese, M., Calvanese, D., Haase, P., Horrocks, I., Ioannidis, Y., Kllapi, H., Koubarakis, M.,
    Lenzerini, M., Möller, R., Özçep, Ö., Rodriguez-Muro, M., Rosati, R., Schlatte, R., Schmidt,
    M., Soylu, A., Waaler, A.: Scalable end-user access to Big Data. In: Akerkar, R. (ed.)
    Big Data Computing. Chapman and Hall/CRC, to appear (2013)
 7. Greenplum (2011), http://www.greenplum.com/
 8. Haase, P., Schmidt, M., Schwarte, A.: The information workbench as a self-service platform
    for linked data applications. In: COLD (2011)
 9. Hadapt: Hadapt analytical platform (2011), http://www.hadapt.com/
10. Health-e-Child: Integrated healthcare platform for European paediatrics (2006),
    http://www.health-e-child.org/
11. Kllapi, H., Sitaridi, E., Tsangaris, M.M., Ioannidis, Y.E.: Schedule optimization for data
    processing flows on the cloud. In: Proc. of SIGMOD. pp. 289–300 (2011)
12. Rodriguez-Muro, M., Calvanese, D.: High performance query answering over DL-Lite
    ontologies. In: KR (2012)
13. Thusoo, A., Sarma, J.S., Jain, N., Shao, Z., Chakka, P., Zhang, N., Anthony, S., Liu, H.,
    Murthy, R.: Hive – a petabyte scale data warehouse using Hadoop. In: Proc. of ICDE.
    pp. 996–1005 (2010)
14. Tsangaris, M.M., Kakaletris, G., Kllapi, H., Papanikos, G., Pentaris, F., Polydoras, P.,
    Sitaridi, E., Stoumpos, V., Ioannidis, Y.E.: Dataflow processing and optimization on grid
    and cloud infrastructures. IEEE Data Eng. Bull. 32(1), 67–74 (2009)