=Paper=
{{Paper
|id=Vol-1558/paper45
|storemode=property
|title=A Relational Approach to Complex Dataflows
|pdfUrl=https://ceur-ws.org/Vol-1558/paper45.pdf
|volume=Vol-1558
|authors=Yannis Chronis,Yannis Foufoulas,Vaggelis Nikolopoulos,Alexandros Papadopoulos,Lefteris Stamatogiannakis,Christoforos Svingos,Yannis Ioannidis
|dblpUrl=https://dblp.org/rec/conf/edbt/ChronisFNPSSI16
}}
==A Relational Approach to Complex Dataflows==
Yannis Chronis, Yannis Foufoulas, Vaggelis Nikolopoulos, Alexandros Papadopoulos, Lefteris Stamatogiannakis, Christoforos Svingos, Yannis Ioannidis
{i.chronis, johnfouf, vgnikolop, alpap, estama, c.sviggos, yannis}@di.uoa.gr
MaDgIK Lab, Dept. of Informatics and Telecom., University of Athens, Greece.
ABSTRACT

Clouds have become an attractive platform for highly scalable processing of Big Data, especially due to the concept of elasticity, which characterizes them. Several languages and systems for cloud-based data processing have been proposed in the past, with the most popular among them being based on MapReduce [7]. In this paper, we present Exareme, a system for elastic large-scale data processing on the cloud that follows a more general paradigm. Exareme is an open source project [1]. The system offers a declarative language which is based on SQL with user-defined functions (UDFs), extended with parallelism primitives and an inverted syntax to easily express data pipelines. Exareme is designed to take advantage of clouds by dynamically allocating and deallocating compute resources, offering trade-offs between execution time and monetary cost.

1. INTRODUCTION

Modern applications face the need to process large amounts of data using complex functions. Examples include complex analytics [14], similarity joins [12], and extract-transform-load (ETL) processes [15]. Such rich tasks are typically expressed using high-level APIs or languages [16] and are transformed into data-intensive workflows, or simply dataflows. Exareme uses a master-worker architecture. Our language is based on SQL to express both intra-worker and inter-worker dataflows. We use UDFs and an inverted syntax to easily express local pipelines and complex computations. Inter-worker dataflows are described with simple parallelism primitives. These abstractions allow users to fine-tune dataflows for different applications. All of the basic components of Exareme are designed to support the elastic properties of cloud infrastructures. We provide comparisons to other state-of-the-art systems.

This research is supported in part by the European Commission under the Optique, Human Brain and MD-Paedigree projects.

© 2016, Copyright is with the authors. Published in the Workshop Proceedings of the EDBT/ICDT 2016 Joint Conference (March 15, 2016, Bordeaux, France) on CEUR-WS.org (ISSN 1613-0073). Distribution of this paper is permitted under the terms of the Creative Commons license CC-by-nc-nd 4.0.

2. SYSTEM OVERVIEW

The system architecture is shown in Figure 1. From a user's point of view, the system is used as a traditional database system: create / drop tables or indexes, import external data, issue queries. The queries are expressed in ExaDFL. ExaDFL is transformed into data processing flows (dataflows) represented as directed acyclic graphs (DAGs) that have arbitrary computations (operators) as nodes and producer-consumer interactions as edges between the nodes. The typical queries we target are complex data-intensive transformations that are expensive to execute; queries may run for several minutes or hours.

Exareme is separated into the following components. The Master is the main entry point, through the gateway, to the system and is responsible for the coordination of the rest of the components. The Execution Engine communicates with the resource manager and schedules the operators of the query, respecting their dependencies in the dataflow graph and the available resources. It also monitors the dataflow execution and handles failures.

[Figure 1: Exareme's architecture — Gateway, Parser, Master (Execution Engine, Registry, Resource Manager, Optimization Engine) and Workers, spanning a Compute Cloud and a Storage Cloud]

All the information related to the data and the allocated VMs is stored in the Registry. The Resource Manager is responsible for the allocation and deallocation of VMs based on demand. The Optimization Engine translates an ExaDFL query into the distributed machine code of the system (similar to [13]) and creates the final execution plan by assigning operators to workers (Section 4.1). Finally, the Worker executes operators (relational operators and UDFs) and transfers intermediate results to other workers. Each worker fetches the partitions needed for the execution and caches them to its local disk for subsequent usage.

Madis is the core engine of the Worker [2]; it is an extension of SQLite, based on the APSW wrapper. It executes the computations described by ExaQL (3.1). Madis processes the data in a streaming fashion and performs pipelining when possible, even for UDFs. The UDFs are executed inside the database along with the relational operators to push them as close to the data as possible.
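The streaming style of execution can be illustrated with plain generators (a minimal sketch only — the function names and pipeline below are illustrative, not Madis's actual APSW-based interface): a table-valued UDF yields rows one at a time, so data flows through the pipeline without materializing intermediate tables, and only a final aggregate needs to hold state.

```python
# Sketch of generator-based, pipelined UDF execution in the spirit
# of Madis. All names here are illustrative, not the real API.

def strsplitv(rows):
    """Table-valued UDF: emit one word per output row for each input string."""
    for comment in rows:
        for word in comment.split():
            yield word

def count_words(words):
    """Aggregate consuming the stream, like GROUP BY word / count(*)."""
    counts = {}
    for w in words:
        counts[w] = counts.get(w, 0) + 1
    return counts

comments = ["fast delivery", "fast refund slow reply"]
freq = count_words(strsplitv(comments))      # rows stream through, one at a time
top = sorted(freq.items(), key=lambda kv: (-kv[1], kv[0]))
```

Note that only the aggregation step retains state; the splitting UDF itself never buffers more than the current row, which is what makes pipelining cheap even across several chained UDFs.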
2.1 Data Model

Exareme adopts the relational data model and extends it with:
Complex Field Types: JSON, CSV, and TSV.
Table Partitions: A table is defined as a set of partitions, and a partition is defined as a set of records having a particular property, i.e., the hash value of a column.
Partitioning: If the database has multiple tables, as happens in data warehouses, the largest tables are partitioned and all others are replicated along with the partitions. Data placement is crucial for performance and elasticity. We use a modification of consistent hashing [11], because it offers good theoretical bounds and can be accurately modeled. To increase flexibility and efficiency we use over-partitioning and replication. This way, changing the size of the virtual infrastructure will cause only data transfer and not the computation of a new partitioning.

2.2 Money/Time Trade-Offs

Exareme can express money/time trade-offs by examining variations of an execution plan; we refer to this notion as eco-elasticity [9, 8]. Exareme's scheduler creates different execution plans based on the algorithm described in 4.1. Along with every query, the user can specify an SLA. Using the SLAs, the scheduler chooses the execution plan based on its time and money requirements.

[Figure 2: Dynamic infrastructure elasticities — allocated resources versus demand over time on an elastic infrastructure, and a skyline of solutions in the time/money trade-off space]

3. LANGUAGE

Queries are issued to Exareme using ExaDFL. ExaDFL is a dataflow language that describes DAGs; it is based on SQL extended with UDFs and data parallelism primitives [6]. ExaDFL allows fine control, but requires an understanding of partitioning and data placement.
We are currently working on an optimizer that will produce ExaDFL from UDF-extended SQL by applying classic database optimizations and transforming functions into their distributed versions when necessary.
In this section we first present the language that describes intra-worker dataflows. Then we present the data parallelism primitives, and at the end we present the language as one. We use the following subset of TPC-H [3]: lineitem(l_orderkey, l_comment), orders(o_orderkey, o_clerk). Both are hash partitioned to 4 parts on their keys.

3.1 ExaQL

ExaQL is based on the SQL-92 standard. The relational primitives of SQL are a good way to express relations and data combinations. We use SQL to combine data and process them with UDFs whenever the SQL abstractions are not sufficient or efficient to use. We enhanced the syntax of ExaQL to easily combine virtual table functions (UDTFs) into data pipelines.
Suppose we want to find the most frequent words that some clerks use in their comments when they buy or sell products. We have the names of the clerks in a compressed XML file that is accessible via HTTP. In ExaQL, we can express it as follows:

select word, count(*) as count
from (select STRSPLITV(l_comment) as word
      from lineitem, orders,
           (XMLPARSE '["/name"]' FILE 'http://../clerk.xml.gz') as clerk
      where l_orderkey = o_orderkey and o_clerk = name) as words
group by word
order by count desc;

The query uses the FILE UDTF to fetch, uncompress, and load the data on-the-fly from the HTTP server specified. There is no need to import or create temporary tables; all the details are handled automatically by the system. The output of FILE is given to the XMLPARSE UDTF, which parses the XML content and produces a table with the names of the clerks. The row function STRSPLITV takes a string as input and produces one nested table for each comment by splitting the words into rows. Notice that this behaviour is different from the row functions typically supported by database systems, which produce a single value. This is an extension of Exareme for row and aggregate functions.

3.2 Data Parallelism Primitives

Simple primitives declaratively express potential data parallelism in the dataflow language itself and let the system decide the actual degree of parallelism at runtime. This is very helpful since the queries are expressed independently of the parallelism used.

3.2.1 Input Primitives

Figure 3 (top) shows the types of combinations supported on two partitioned tables R and S, where a query Q is executed on each partition pair indicated, as well as the type of reduction supported on a single partitioned table.
Direct: This combines two (or more) tables that either (a) both have been partitioned in the way required by the combination specified, e.g., a distributed join on tables hash-partitioned on the join attribute, or (b) one has been fully replicated and the other has been partitioned in some fashion, e.g., a join between a small table replicated to the locations of the partitions of a much larger table.
Cartesian product: This combines two (or more) tables that have been partitioned in ways unrelated to the combination specified.
Tree: This performs a multi-level tree reduction on a single table, generalizing the two-level (combine and reduce) reduction of MapReduce. This is used when Q has aggregate functions that are algebraic or distributive, and it has been shown to exhibit very good performance in practice.

3.2.2 Output Primitives

Figure 3 (bottom):
Same: The default mode; the number of output partitions is determined by the input.
Partition: Hash partitioning is used. This requires two steps: i) partition each of the input parts and ii) union each of the sub-partitions into the final output.
Broadcast: This creates full replicas of the output file, first broadcasting each partition to all relevant workers and then performing their union at each worker.
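The multi-level Tree reduction above can be sketched as follows (a minimal sketch under stated assumptions: a distributive aggregate such as sum, per-partition partial results already computed, and an illustrative fan-in of 2 — none of this is Exareme's actual scheduler code):

```python
# Sketch of a multi-level (tree) reduction for a distributive
# aggregate, generalizing MapReduce's two-level combine/reduce.

def tree_reduce(partials, combine, fan_in=2):
    """Repeatedly combine groups of `fan_in` partial results
    until a single result remains."""
    level = list(partials)
    while len(level) > 1:
        level = [combine(level[i:i + fan_in])
                 for i in range(0, len(level), fan_in)]
    return level[0]

# One partial sum per table partition (partition contents illustrative):
partials = [sum(p) for p in [[1, 2], [3, 4], [5], [6, 7, 8]]]
total = tree_reduce(partials, combine=sum)   # 36
```

With a fan-in larger than 2 the tree becomes shallower, trading fewer reduction levels for more work per combining operator; this only works because a distributive aggregate lets partial results be merged in any grouping.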
[Figure 3: Input Partitioning (top), Output Partitioning (bottom)]

3.3 ExaDFL

All of the above compose ExaDFL according to the following grammar:

ExaDFL := ()+
query := ;
parallelism := create distributed [temp] table [

table clerk is replicated. Finally, the third query is used to create table result using a tree aggregation. This is possible because the aggregate function sum is distributive. All the temporary tables are deleted automatically at the end of the script.

4. QUERY OPTIMIZATION

In principle, the optimization process could proceed in one giant step, examining all execution plans that could answer the query and choosing the optimal one that satisfies the required constraints. Given the size of the alternatives space in our setting, this approach is infeasible. Instead, our optimization process proceeds in multiple smaller steps, each one operating at some level and making assumptions about the levels below. This is in analogy to query optimization in traditional databases, but with the following differences. The operators may represent arbitrary operations and may have performance characteristics that are not known. Furthermore, optimality may be subject to QoS or other constraints and may be based on multiple criteria, e.g., monetary cost of resources, quality of data, etc., and not solely on performance. The resources available for the execution of a dataflow are not fixed a priori but flexible and reservable on demand.

4.1 Sky

The dataflow scheduler we use takes as input the dataflow