=Paper= {{Paper |id=Vol-1558/paper45 |storemode=property |title=A Relational Approach to Complex Dataflows |pdfUrl=https://ceur-ws.org/Vol-1558/paper45.pdf |volume=Vol-1558 |authors=Yannis Chronis,Yannis Foufoulas,Vaggelis Nikolopoulos,Alexandros Papadopoulos,Lefteris Stamatogiannakis,Christoforos Svingos,Yannis Ioannidis |dblpUrl=https://dblp.org/rec/conf/edbt/ChronisFNPSSI16 }} ==A Relational Approach to Complex Dataflows== https://ceur-ws.org/Vol-1558/paper45.pdf
                 A Relational Approach to Complex Dataflows

                   Yannis Chronis                          Yannis Foufoulas                    Vaggelis Nikolopoulos
                     Alexandros                                 Lefteris                       Christoforos Svingos
                   Papadopoulos                            Stamatogiannakis
                                                            Yannis Ioannidis
                       {i.chronis, johnfouf, vgnikolop, alpap, estama, c.sviggos, yannis}@di.uoa.gr
                      MaDgIK Lab, Dept. of Informatics and Telecom., University of Athens, Greece.

ABSTRACT                                                                 2.   SYSTEM OVERVIEW
Clouds have become an attractive platform for highly scal-                  The system architecture is shown in Figure 1. From a
able processing of Big Data, especially due to the concept of            user’s point of view, the system is used as a traditional
elasticity, which characterizes them. Several languages and              database system: create / drop tables or indexes, import
systems for cloud-based data processing have been proposed               external data, issue queries. The queries are expressed in
in the past, with the most popular among them being based                ExaDFL. ExaDFL is transformed into data processing flows
on MapReduce [7]. In this paper, we present Exareme, a                   (dataflows) represented as directed acyclic graphs (DAGs)
system for elastic large-scale data processing on the cloud              that have arbitrary computations (operators) as nodes and
that follows a more general paradigm. Exareme is an open                 producer-consumer interactions as edges between the nodes.
source project [1]1 . The system offers a declarative language           The typical queries we target are complex data-intensive
which is based on SQL with user-defined functions (UDFs)                 transformations that are expensive to execute, queries may
extended with parallelism primitives and an inverted syn-                run for several minutes or hours.
tax to easily express data pipelines. Exareme is designed                   Exareme is separated into the following components: The
to take advantage of clouds by dynamically allocating and                Master, is the main entry point, through the gateway, to
deallocating compute resources, offering trade-offs between              the system and is responsible for the coordination of the rest
execution time and monetary cost.                                        of the components. The Execution Engine communicates
                                                                         with the resource manager and schedules the operators of the
                                                                         query respecting their dependencies in the dataflow graph
1.    INTRODUCTION                                                       and the available resources. It also monitors the dataflow
   Modern applications face the need to process large amount             execution and handles failures.
of data using complex functions. Examples include complex
analytics [14], similarity joins [12], and extract-transform-                     Comput
                                                                                       eCl
                                                                                         oud                          Ma
                                                                                                                       ster
                                                                                                     Ga
                                                                                                      tewa
                                                                                                         y                                 S
                                                                                                                                           tor
                                                                                                                                             ageCl
                                                                                                                                                 oud
load (ETL) processes [15]. Such rich tasks are typically                                                              Pa
                                                                                                                       rser
expressed using high-level APIs or languages [16] and are
                                                                                                   Ex
                                                                                                    ecu onEng
                                                                                                            ine      Reg
                                                                                                                       ist
                                                                                                                         ry
transformed into data intensive workflows, or simply dataflows.
                                                                                                   Res
                                                                                                     our
                                                                                                       ceMa
                                                                                                          nag
                                                                                                            er    Op mi
                                                                                                                      za onEng
                                                                                                                             ine
Exareme uses a master-worker architecture. Our language is
based on SQL to express both intra-worker and inter-worker
dataflows. We use UDFs and a inverted syntax to easily                                     Wor
                                                                                             ker             Wor
                                                                                                               ker                 Wor
                                                                                                                                     ker

express local pipelines and complex computations. Inter-
worker dataflows are described with simple parallelism prim-
itives. These abstractions allow users to fine tune dataflows
for different applications. All of the basic components of                          Figure 1: Exareme’s architecture
Exareme are designed to support the elastic properties of
cloud infrastructures. We provide comparisons to other state
                                                                            All the information related to the data and the allocated
of the art systems.
                                                                         VMs is stored in the Registry. The Resource Manager
1                                                                        is responsible for the allocation and deallocation of VMs
  This research is supported in part by the European Com-
mission under Optique, Human Brain and MD-Paedigree                      based on the demand. The Optimization Engine trans-
projects.                                                                lates ExaDFL query into the distributed machine code of the
                                                                         system (similar to [13]) and creates the final execution plan
                                                                         by assigning operators to workers (Section 4.1). Finally,
                                                                         the Worker executes operators (relational operators and
                                                                         UDFs) and transfers intermediate results to other workers.
                                                                         Each worker fetches the partitions needed for the execution
                                                                         and caches them to its local disk for subsequent usage.
 c 2016, Copyright is with the authors. Published in the Workshop Pro-   Madis is the core engine of the Worker[2], it is an extension of
ceedings of the EDBT/ICDT 2016 Joint Conference (March 15, 2016, Bor-    SQLite, based on the APSW wrapper. It executes the com-
deaux, France) on CEUR-WS.org (ISSN 1613-0073). Distribution of this
paper is permitted under the terms of the Creative Commons license CC-   putations described by ExaQL (3.1). Madis processes the
by-nc-nd 4.0                                                             data in a streaming fashion and performs pipelining when
                  Elas9c	
  Infrastructure	
                  Time/Money	
  Trade-­‐offs	
                   data combinations. We use SQL to combine data and pro-
           Resources	
                                     Time	
     Skyline	
  of	
  Solu9ons	
           cess them with UDFs, whenever the SQL abstractions are
                            Allocated	
  Resources	
  

                                                                                                            not sufficient or efficient to use. We enhanced the syntax of
                                                                                                            ExaQL to easily combine virtual table functions (UDTFs)
                                           Demand	
  
                                                                                                            into data pipelines.
                                                Time	
                                          Money	
     Suppose we want to find the most frequent words that some
                                                                                                            clerks use in their comments when they buy or sell products.
      Figure 2: Dynamic infrastucture elasticities                                                          We have the names of the clerks in a compressed XML file
                                                                                                            that is accessible via HTTP. In ExaQL, we can express it as
                                                                                                            follows:
possible, even for UDFs. The UDFs are executed inside the                                                   select word, count(∗) as count
database along with the relational operators to push them                                                   from(select STRSPLITV(l comment) as word
as close to the data as possible.                                                                                   from lineitem, orders, (XMLPARSE ’[”/name”]’
                                                                                                                       FILE ’http://../clerk.xml.gz’) as clerk
2.1    Data Model                                                                                                   where l orderkey = o orderkey and o clerk = name) as words
                                                                                                            group by word
   Exareme adopts the relational data model and extends it                                                  order by count desc;
with:
Complex Field Types: JSON, CSV, and TSV.                                                                       The query uses the FILE UDTF to fetch, uncompress, and
Table Partitions: A table is defined as a set of partitions                                                 load the data on-the-fly from the HTTP server specified. It
and a partition is defined as a set of records having a par-                                                is not needed to import or create temporary tables, all the
ticular property, i.e. the hash value of a column.                                                          details are handled automatically by the system. The out-
Partitioning: If the database has multiple tables as it hap-                                                put of FILE is given to the XMLPARSE UDTF that parses
pens in data warehouses, the largest tables are partitioned                                                 the XML content and produces a table with the names of the
and all others are replicated along with the partitions. Data                                               clerks. Row function STRSPLITV takes a string as input
placement is crucial for performance and elasticity. We use a                                               and produces one nested table for each comment by splitting
modification of consistent hashing [11], because it offers good                                             the words into rows. Notice that this behaviour is different
theoretical bounds and can be accurately modeled. To in-                                                    from the row functions typically supported by database sys-
crease flexibility and efficiency we use over-partitioning and                                              tems which produce a single value. This is an extension of
replication. This way, changing the size of the virtual in-                                                 Exareme for row and aggregate functions.
frastructure will cause only data transfer and not the com-
putation of a new partitioning.                                                                             3.2     Data Parallelism Primitives
                                                                                                               The support of simple primitives declaratively express po-
2.2    Money/Time Trade-Offs                                                                                tential data parallelism in the dataflow language itself and
   Exareme can express money/time trade-offs by examining                                                   let the system decide the actual degree of parallelism at run-
variations of an execution plan, we refer to this notion as                                                 time. This is very helpful since the queries are expressed
eco-elasticity [9] [8]. Exareme’s scheduler creates different                                               independently of the parallelism used.
execution plans based on the algorithm described in 4.1.
Along with every query the user can specify an SLA. Using                                                   3.2.1    Input Primitives
the SLAs the scheduler chooses the execution plan based on                                                     Figure 3 (top) shows the types of combinations supported
its time and money requirements.                                                                            on two partitioned tables R and S, where a query Q is ex-
                                                                                                            ecuted on each partition pair indicated, as well as the type
                                                                                                            of reduction supported on a single partitioned table.
3.    LANGUAGE                                                                                              Direct : This combines two (or more) tables that either
   Queries are issued to Exareme using ExaDFL. ExaDFL                                                       (a) both have been partitioned in the way required by the
is a dataflow language that describes DAGs and it’s based                                                   combination specified, e.g., a distributed join on tables hash-
on SQL extended with UDFs and data parallelism primitives                                                   partitioned on the join attribute, or (b) one has been fully
[6]. ExaDFL allows fine control, but requires an understand-                                                replicated and the other has been partitioned in some fash-
ing of partitioning and data placement.                                                                     ion, e.g., a join between a small table replicated to the loca-
We are currently working on an optimizer that will pro-                                                     tions of the partitions of much larger table.
duce ExaDFL from UDF extended SQL by applying clas-                                                         Cartesian product: This combines two (or more) tables
sic database optimizations and transforming functions with                                                  that have been partitioned in ways unrelated to the combi-
their distributed version when it is necessary.                                                             nation specified.
In this section we firstly present the language that describes                                              Tree: This performs a multi-level tree reduction on a sin-
intra-worker dataflows. Then we present the data paral-                                                     gle table, generalizing the two-level (combine and reduce)
lelism primitives and at the end we present the language as                                                 reduction of MapReduce. This is used when Q has aggre-
one.                                                                                                        gate functions that are algebraic or distributive and has been
We use the following subset of TPC-H [3] : lineitem(l orderkey,                                             shown to exhibit very good performance in practice.
l comment), orders(o orderkey,o clerk). Both are hash par-
titioned to 4 parts on their keys.                                                                          3.2.2    Output Primitives
                                                                                                            Figure 3 (bottom):
3.1    ExaQL                                                                                                Same: The default mode does, the output number of par-
  ExaQL is based on the SQL-92 standard. The relational                                                     titions is determined by the input.
primitives of SQL are a good way to express relations and                                                   Partition: Hash partitioning is used. This requires two
                                                                   table clerk is replicated. Finally, the third query is used to
                                                                   create table result using a tree aggregation. This is possible
                                                                   because the aggregate function sum is distributive. All the
                                                                   temporary tables are deleted automatically at the end of the
                                                                   script.

                                                                   4.    QUERY OPTIMIZATION
                                                                     In principle, the optimization process could proceed in
                                                                   one giant step, examining all execution plans that could an-
                                                                   swer the query and choosing the optimal that satisfies the
Figure 3: Input Partitioning (top), Output Parti-                  required constraints. Given the size of the alternatives space
tioning (bottom)                                                   in our setting, this approach is infeasible. Instead, our op-
                                                                   timization process proceeds in multiple smaller steps, each
                                                                   one operating at some level and making assumptions about
steps: i) partition each of the input parts and ii) union each     the levels below. This is in analogy to query optimization in
of the sub-partitions into the final output.                       traditional databases but with the following differences. The
Broadcast: This creates full replicas of the output file, first    operators may represent arbitrary operations and may have
broadcasting each partition to all relevant workers and then       performance characteristics that are not known. Further-
performing their union at each worker.                             more, optimality may be subject to QoS or other constraints
                                                                   and may be based on multiple criteria, e.g., monetary cost
3.3    ExaDFL                                                      of resources, quality of data, etc., and not just solely on
  All of the above compose ExaDFL according to the fol-            performance.The resources available for the execution of a
lowing grammar:                                                    dataflow are not fixed a-priori but flexible and reservable on
                                                                   demand.
ExaDFL         := ()+
query          :=   ;                          4.1   Sky
parallelism    := create distributed [temp]
                                                                      The dataflow scheduler we use, takes as input the dataflow
                  table  []
                  as []                                DAG and assigns its nodes (operators) to workers. It does
output         := [to ] [(hash | range)]                   so by taking into account two types of constraints i) the
                  partition on (,)∗                    dataflow (DAG) implied constraints based on the inter-operator
input          := direct | cprod | tree | extern                   dependencies captured by its edges, ii) the execution en-
       (the rest is omitted due to space)                          vironment implied constraints due to resource limitations.
   Two or more queries form a script. Each query has two           In that respect, we categorize resources as time-shared and
semantically different parts: parallelism and ExaQL. The           space-shared [10]. Time-shared resources can be used by
first part describes the input and output data parallelism         multiple operators concurrently at very low overhead. Con-
used and the second part the computations that get executed        current use of space-shared resources implies high overheads
on each input combination. The following ExaDFL dataflow           beyond workers limits of resources. We consider memory as
is equivalent to the ExaQL query of our example:                   the only space-shared resource, whereas CPU and network
                                                                   as time-shared resources. Constraints are imposed only by
// Query 1                                                         space-shared resources in every worker, at any given mo-
create distributed temp table clerk to 4 as extern                 ment, memory must be sufficient for the execution of the
  select name
  from (XMLPARSE ’[”/name”]’ FILE ’http://../clerk.xml.gz’);
                                                                   running operators. The scheduling algorithm we propose is
// Query 2                                                         Dynamic Skyline (Sky) and is shown in Algorithm 1.
create distributed temp table words as direct                         Sky is an iterative algorithm that incrementally computes
  select word, count(∗) as count partial                           the skylines of schedules, Figure 2. The algorithm begins
  from (select STRSPLITV(l comment) as word                        by scheduling the operators from producers to consumers
                 from lineitem, orders, clerk                      as defined by the DAG. Each operator with no inputs is a
                 where l orderkey = o orderkey
                 and o clerk = name)
                                                                   candidate for assignment. An operator is a candidate as
  group by word;                                                   soon as all of its inputs are available. The scheduler con-
// Query 3                                                         siders assigning every operator at an existing worker or at
create distributed table result as tree                            a new worker by adding a new VM. The result is a skyline
  select word, sum(count partial) as count                         of schedules (Figure 2). The final execution plan can be se-
  from wordcount                                                   lected either manually or automatically based on SLAs. [17]
  group by word
  order by count desc;
                                                                   The scheduler uses the following heuristics regarding data
                                                                   transferring. It transfers only intermediate results and, if
   The first query is executed to download and parse the           possible, does not move original tables. Intermediate results
XML file. The extern directive declares that the query uses        are usually smaller than the original tables because queries
an external source and only one instance of the query should       with a single input usually contain filters and queries with
be created. The result is a table called clerk that is repli-      multiple inputs usually join the tables using equi-joins. This
cated to 4 partitions. The second query combines tables            type of join reduces the size of the output table. Some type
lineitem, orders, and clerk using the direct input combina-        of queries are executed very efficiently this way, especially
tion. Notice that the result of the join is correct since tables   when the small tables fit in memory. This is the usual case
lineitem and orders are partitioned on the join column and         for OLAP workloads with star or snowflake schema. An-
Algorithm 1 Dynamic Skyline
Input: G: A dataflow graph.
Output: skyline: The skyline schedules.
                                                                                                                         S
                                                                                                                         yst
                                                                                                                           em X

 1: ready ←{operators in G that have no dependencies}
 2: op1 ← maxRunningT ime(ready)
 3: vm1 ← allocateN ewV M ()
 4: schedule1 ← {assign(op1 , vm1 , −, −)}
 5: skyline ← {schedule1 }
 6: while ready 6=      do
 7:   next ← maxRunningT ime(ready); S ←                       Figure 4: TPC-H with 64GB data and 32 VMs on
 8:   for s ∈ skyline do                                       System X, Hive and Exareme
 9:     if next is pinned then
10:        S ← S ∪ {s + assign(next, next.pin loc, −, −)}
11:      else
12:        for all containers c of s do
13:           S ← S ∪ {s + assign(next, c, −, −)}
14:        end for
15:        // Consider allocating a new VM
16:        new vm ← allocateN ewV M ()
17:        S ← S ∪ {s + assign(next, new vm, −, −)}
18:        releaseNotNeeded(s)
19:      end if
20:   end for                                                  Figure 5: configuration with eco-elasticity vs. static
21:   // Only skyline schedules (i.e., prune search space)     layouts.
22:   skyline ← skyline of S
23:   ready ← ready − {next} ∪ {operators in G that de-
      pendency constraints no longer exist}                    elasticity. As a baseline we use three static infrastructures
24: end while
                                                               that do not change over time small with 15 VMs, medium
25: return skyline
                                                               with 30 VMs, large with 60 VMs. We run the system for
                                                               one hour using a client that issue Q1 in three phases, each
                                                               of 1 hour duration. In the first and third phase, the Pois-
other benefit with this approach is the exploitation of in-    son parameter λ is set to 60 and in the second phase to 30
dexes if they exists on the original tables. In addition, we   (the rate is doubled). The elastic layout allocator produces
add gravity operators pinned to the location of the tables,    a better-fitted layout that adapts to the workload changes
so the movement of the original tables out of their initial    and yields the highest profit compared to all static choices.
location becomes an optimization choice.
                                                               6.    ACKNOWLEDGEMENTS
5.     EXPERIMENTAL EVALUATION                                    The authors would like to thank Herald Kllapi and Mano-
   Environment: We used up to 64 VMs, each with 1 CPU,         lis Tsangaris.
4 GB of memory, and 20 GB of disk, provided by Okeanos2 .
The average network speed measured was 150 Mbps.               7.    REFERENCES
Datasets: We generated a total of 256 GB of the following       [1] Exareme. http://www.exareme.org/.
                                                                [2] Madis. https://github.com/madgik/madis.
tables, using the TPC-H benchmark [3]. (in the parenthesis
                                                                [3] TPC-H Benchmark, http://www.tpc.org/tpch/.
we note the number of partition and the partitioning key)       [4] Apache. Hadoop, http://hadoop.apache.org/.
region(1), partsupp(1, ps partkey), orders(128, o orderkey),    [5] Apache. Tez, http://tez.apache.org/.
lineitem(128, l orderkey), customer(1, c custkey), part(1,      [6] C. J. Date and H. Darwen. A Guide to the SQL Standard (4th
p partkey), nation(1), and supplier(1).                             Ed.). Addison-Wesley Longman, 1997.
                                                                [7] J. Dean and S. Ghemawat. ”MapReduce: Simplified Data
Measurements: We run each query 4 times and report the              Processing on Large Clusters”. In OSDI, 2004.
average of the last 3 measurements. We compared Exareme         [8] E. Deelman et al. The cost of doing science on the cloud: the
with Hive [16] (with both MR [4] and Tez [5] as backend,            montage example. In IEEE/ACM SC, 2008.
formerly known as Stinger) and System X (an industry lead-      [9] K. et al. Schedule optimization for data processing flows on the
                                                                    cloud. SIGMOD ’11, page 289, 2011.
ing commercial system). Figure 4 shows the results, to save
                                                               [10] M. Garofalakis and Y. Ioannidis. Parallel query scheduling and
space we have omitted some results, but only if Exareme is          optimization with time- and space-shared resources. VLDB ’97.
faster. Hive-stinger was always faster than Hive. The ver-     [11] D. R. Karger et al. Consistent hashing and random trees:
sions of the systems we used are Hive 0.13.1, Hadoop 2.5.1,         Distributed caching protocols for relieving hot spots on the
                                                                    world wide web. In STOC, pages 654–663, 1997.
Tez 0.5.0 (intermediate results are compressed (Snappy)).
                                                               [12] H. Kllapi, B. Harb, and C. Yu. Near neighbor join. In ICDE.
System X is faster for queries 1 and 6 that involve aggrega-   [13] M. J. Litzkow et al. ”Condor - A Hunter of Idle Workstations”.
tions on the largest table (lineitem). We were not able to          In ICDCS, pages 104–111, 1988.
execute queries 8 and 9 on System X because of memory lim-     [14] S. Melnik et al. Dremel: Interactive analysis of web-scale
its (System X is an in-memory system). Overall, we observe          datasets. PVLDB, 3(1):330–339, 2010.
                                                               [15] A. Simitsis. Modeling and managing etl processes. In VLDB
that Exareme is faster in most cases than the state-of-the          PhD Workshop, 2003.
art systems.                                                   [16] A. Thusoo et al. ”Hive - a petabyte scale data warehouse using
Figure 5 show the profit that is gained when exploiting eco-        Hadoop”. In ICDE, 2010.
                                                               [17] H. R. Varian. ”Intermediate Microeconomics : A Modern
2
    okeanos.grnet.gr                                                Approach”.