     Rewriting and Code Generation for Dataflow Programs

Philipp Götze (TU Ilmenau, Germany, philipp.goetze@tu-ilmenau.de)
Wieland Hoffmann∗ (IBM Deutschland Research & Development GmbH, whoffman@de.ibm.com)
Kai-Uwe Sattler (TU Ilmenau, Germany, kus@tu-ilmenau.de)



ABSTRACT
Nowadays, several data processing engines for analyzing and querying big data exist. In most cases, users who want to run queries on these engines have to implement the complete program themselves in a supported programming language. This requires them to understand both the programming language and the API of the platform, and also to learn how to control or even enable parallelism and concurrency. Especially with this tight integration into programming languages, the internal rewriting of queries to optimize the flow and order of data and operators is another big challenge, since classical query optimization techniques are difficult to apply. In this paper, we address these problems by utilizing the dataflow model and a code generator and compiler for various platforms based on it, namely Piglet. The focus of this paper lies on stream processing platforms; we therefore describe the associated challenges, in particular for two exemplary engines. Moving on from there, we introduce our internal procedure for rewriting dataflow graphs and finish with the presentation of our own DSL approach, which even supports user-defined rewriting rules.

Keywords
Query Optimization, Stream Processing, Dataflow, Dynamic Data, Data Analysis

1. INTRODUCTION
During the last years, the abstraction level of query languages has been subject to some fluctuations. It all started with a very high level of abstraction through the provision of declarative query languages such as SQL or XQuery. These come with a lot of advantages, such as a standardized language, ease of use even for non-technical users, and automatic optimization enabled by the well-known relations between the provided operators. However, this type of data retrieval is only extensible to a limited degree. The introduction of MapReduce for Big Data challenges opened up many new possibilities for querying and processing data. As a drawback, one has to deal with a greatly increased level of complexity, manual optimization, and, in general, a low level of abstraction. Hence, current research tries to combine these worlds by providing platforms with programming-language-integrated APIs to raise this level again. For example, Apache Spark and Apache Flink use Scala (beside Java and Python) as a kind of dataflow language. However, to get along with these domain-specific languages (DSLs), the data analyst has to know and understand the corresponding language and the API of the chosen platform. Furthermore, the user has to write a lot of code beside the actual query, build and deploy the code for the backend, and to a great extent manually optimize the program. Thus, as already mentioned in [14], the idea is to provide a high-level declarative interface for streaming queries and, at the same time, a scalable cluster-based stream processing platform. From this idea, Piglet1, a language extension to Pig Latin and a dataflow compiler for multiple batch and streaming backends, recently originated to meet these requirements.

The provision of such a multi-platform compiler brings some challenges. First of all, there are the different requirements and conditions of batch and stream processing. For instance, whereas in batch processing one deals with a finite amount of stored data tuples, working with streams means coping with an open-ended, continuous data stream. This is accompanied by fault-tolerance and elasticity tasks. In addition, there are not only differences between batch and streaming; platforms with the same processing type also show different behaviors. This does not mean that some of the backends do something wrong, but rather arises from the different processing approaches (i.e., Spark Streaming via micro-batches, Flink Streaming and Storm via tuple-by-tuple processing). Another big challenge is the automatic optimization of dataflows consisting of user-defined operators, since these can normally only be treated as black boxes by the compiler.

With this paper, we try to wipe out the drawbacks of both the high-level declarative approach (e.g., limited extensibility) and scalable cluster-based data processing platforms (e.g., high coding effort) by combining these two worlds, and we describe the challenges associated with it. Here, the focus is mainly on streaming platforms. The paper consists of two main contributions:

∗The work presented here was written as a Master's thesis at the TU Ilmenau [10].
1 https://github.com/ksattler/piglet

28th GI-Workshop on Foundations of Databases (Grundlagen von Datenbanken), 24.05.2016 - 27.05.2016, Nörten-Hardenberg, Germany.
Copyright is held by the author/owner(s).







[Figure 1 (architecture diagram): Pig Latin input from files or the REPL is parsed into a dataflow plan (DAG), processed by the rule-based rewriter, and handed to the code generator, which uses the backend template plugins (Flink, Spark, Storm, PipeFabric) and the platform libraries to produce a program file for the execution environment (YARN/Mesos and the worker nodes).]

Figure 1: Piglet's internal architecture: The code generator uses the backend plugins to create platform-specific programs.

• We provide a high-level abstraction for querying data in the form of a unified model of dataflows and realize the mapping to various platform-specific concepts.

• With the developed framework for rewriting dataflow graphs, we offer an easy way of implementing and integrating user-defined operators, together with their rewriting rules, into the dataflow.

Although the implementation was done within our own dataflow compiler Piglet, we think the ideas are not limited to this system.

2. THE DATAFLOW MODEL
The dataflow model is nothing really new, as it dates back to the 1960s and 1970s [11, 16], motivated by the wish to automatically exploit parallelism. Programs are represented internally as directed acyclic graphs (DAGs) whose nodes are mutually independent operation blocks, connected by input and output pipes that represent the dataflow. This yields a segmentation into operations that have no data dependencies between each other and can thus be executed in parallel. The recognition of data dependencies can be accomplished automatically by the compiler and normally needs no action taken by the user.

These and further advantageous characteristics of the dataflow model are meant to be leveraged in Piglet, which uses such a model as its internal representation. The goal of the project is to provide a language extension to Pig Latin and a compiler to generate code for several modern data processing platforms. In the course of this, the code is also built and deployed to the specified execution environment without any necessary intervention by the user.

Figure 1 presents the internal architecture of Piglet. As a first step, the Pig Latin input is parsed into a dataflow plan, which contains a list of objects whose types implement the PigOperator trait. One type exists for each operator of Pig and for the additional ones provided by Piglet. The input can be provided either as pre-written files or by interactively entering statements via a Read-Eval-Print-Loop (REPL). In the next step, a rule-based rewriter automatically optimizes (see section 5) and possibly adapts the dataflow plan depending, inter alia, on the target platform (see section 4). Subsequently, the rewritten plan is used together with the backend plugins to generate the target program. It is then zipped with the necessary platform library into a jar file and finally deployed to the execution environment, which takes care of the possibly parallel execution.
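For illustration, a minimal sketch of how such a plan representation might be modelled is given below. Only the PigOperator trait and the plan-as-DAG idea are taken from the description above; all concrete members and operator classes are illustrative assumptions, not Piglet's actual code.

    // Sketch of the internal plan representation; members are assumptions.
    trait PigOperator {
      var inputs: List[PigOperator] = Nil    // incoming pipes
      var outputs: List[PigOperator] = Nil   // outgoing pipes
    }

    // One type per language construct, for example:
    case class Load(path: String, loaderFunc: String) extends PigOperator
    case class Distinct() extends PigOperator

    // The dataflow plan holds the operators; the DAG is given by their pipes.
    class DataflowPlan(val operators: List[PigOperator]) {
      def sourceNodes: List[PigOperator] = operators.filter(_.inputs.isEmpty)
      def sinkNodes: List[PigOperator]   = operators.filter(_.outputs.isEmpty)
    }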
The purpose of the platform library is to hide some implementation details from the generated code. It is mainly used for source and sink operators and provides, for example, schema-extracting loading methods, objects for deployment, or aggregate helper functions. Since backends are treated as plugins, one can extend the compiler with other platforms by creating a plugin, which has to implement specific interfaces and provide a template file as input for the code generator to create the corresponding target code.

Due to the nature of data processing on streams, a lot of characteristics and challenges arise that were not tackled in the batch implementation. The main challenges among them, which we pick up in the following two sections, are:

• windows,
• blocking operators, and
• different behavior of various streaming backends.

3. RELATED WORK
A similar goal of providing a unified model for defining and processing dataflows on different platforms is pursued by the Google Dataflow model [1] and the corresponding Apache incubator project Beam2. In contrast to Piglet, Apache Beam comes with a Java SDK unifying and simplifying the creation of dataflow programs. As a drawback, this also means that the data analyst has to be familiar with Java and has to pick up the new API. Piglet, on the other hand, provides a high-level dataflow language, offering the user a much better abstraction and enabling many more optimization opportunities.

The challenges in terms of data stream processing mentioned above are also partly discussed in [12], where Krämer and Seeger define a logical algebra with precise query semantics for data streams based on the relational algebra. In contrast to batch processing, many standard operators such as join, aggregate, or duplicate elimination (i.e., blocking operators) can only refer to a subset of the stream defined by windows. Instead of integrating windows directly into these standard operators, they decided to separate these functionalities, as was also done in Piglet. Thus, the redundant definition of window constructs within the operators is avoided, and the user can apply multiple operations to one window specification. On top of that, the well-known semantics of the relational algebra is preserved as much as possible.

As implied by the third challenge above, there exist many data streaming systems, which propose several processing models, especially for handling windows. Since there are no standards for querying streaming data, they all come up with their own syntax and semantics.

2 http://incubator.apache.org/projects/beam.html







Because of that, the engines process queries in different ways, although the user would expect the same behavior. On the other hand, several systems sometimes also express common capabilities in different ways. First introduced in [4] and later described in greater detail in [5], the SECRET model was proposed as a descriptive model for analyzing and comparing the execution semantics of stream processing engines. The four dimensions of the SECRET model are:

• ScopE - gives information about the window intervals.
• Content - maps the intervals to window contents.
• REport - reveals the conditions for window contents to become visible.
• Tick - states when the engine will trigger an action on an input stream.

With these dimensions, it is possible to compare and explain differences in the behavior of the various systems. In their experiments, the authors reveal the variances in window construction, window reporting, and triggering. It was shown that even the same engine sometimes produces different results. The reason for this is the different mapping of content to scopes, for example due to the assignment of time values to tuples based on the system time. It was also shown that different approaches to reporting may lead to different results for the same query. Thus, it is important to understand the window semantics of a stream processing engine before writing queries for it. More explicitly for Piglet, this means, for instance, that the same query can produce different results on the various backends.

Beside the dataflow model and streaming challenges, there is also the problem of optimizing dataflow programs, which has been deeply discussed and implemented in the past. A famous system, for example, is EXODUS [8], which can be used to integrate generated application-specific optimizers into databases. For the generation of the target C code, EXODUS needs as input the set of operators and methods, rules for transforming the query trees, and cost functions for each operator. In the running system, the incoming queries are transformed into trees and optimized by repeatedly applying the passed rules. Thereby, it maintains information about all resulting alternative trees and the change in cost for each transformation. The successor framework of EXODUS is Cascades [7], which was introduced to improve and enrich its predecessor by, for example, extensibility, dynamic programming, and more flexibility. Especially the improved handling of predicates is highlighted, for example, by detaching a predicate from a logical join operator in order to transform the join into a physical nested-loop operation and pushing the predicate into the selection operator of the inner input tree.

There also exist optimization approaches for MapReduce programs, for instance, Spark SQL's Catalyst [2, 3], an optimizer integrated into Spark and used for Spark SQL and the corresponding data abstraction called DataFrames. Similar to the rewriter in Piglet, it uses many features of the programming language Scala for a seamless integration into the system's code. Advantages of this are, for instance, the ease of adding and connecting new rules, techniques, and features, as well as the possibility for developers to define extensions. Roughly speaking, Catalyst works with trees and applies manipulating rules to them. It is used in four phases, namely analysis, logical optimization, physical planning, and the final code generation. The analysis phase resolves, among other things, the relation and attribute names within the given query. On the resulting logical plan, rule-based optimizations, such as reordering, are applied. The subsequent physical planning then maps physical operators from the Spark engine to the logical plan and selects the optimal plan based on a cost model (e.g., choosing the best-fitting join implementation). As a final step, code is generated to run on each machine based on the selected physical plan.

4. FLINK AND SPARK STREAMING
As described in detail in [6], Flink3 with its streaming API is one of the stream processing platforms integrated into Piglet. Furthermore, Spark Streaming4, Apache Storm5, and PipeFabric [13] were added to the supported target streaming platforms of Piglet. The biggest challenge here is the presence of different semantics and behaviors of the backends, as already described in general in sections 2 and 3. In the following, some of the differences between Flink and Spark Streaming, as well as general challenges with stream processing, are sketched, which became conspicuous during the integration into the code generator.

Since basic Pig Latin is made for batch processing, it was first of all necessary to enhance Piglet's input language with streaming operators such as Socket_Read, Socket_Write, and Window, as well as underlying loading functions like PigStream. Beside the language extensions, the compiler has also been equipped with corresponding mappings from the dataflow operators to the streaming APIs. It was found that some operators supported in batch processing are not meaningful, or sometimes not even possible, in their original form on streams. That is why these operators were either completely omitted (e.g., Limit) or are only supported inside of windows (e.g., Order By and Distinct).

To make windows and joins work for Flink Streaming, the dataflow plan must be rewritten, that is, the window scope has to be defined and Join and Cross operators have to be rewritten. The former means deciding which operators need to be applied to the window and from when on the stream is flattened again. For that, Foreach, Join, Cross, and sink nodes can represent the terminator of a window scope. At this point, a corresponding WindowApply node is inserted into the dataflow, which is taken up in the code generation step. The latter, the window rewriting process, searches for Join and Cross operators to assign a window definition to them, derived from the input nodes. This is necessary because the Flink Streaming API only allows joining two data streams on a common time-based window, but not two windowed streams or a dataset with a windowed stream as in Spark Streaming. Thus, three requirements must be fulfilled beforehand (a sketch of this check follows the list):

• the direct input nodes must be windows,
• the windows must be time-based, and
• all input nodes must have the same window and sliding size.
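A minimal sketch of such a precondition check is shown below. The WindowOp type and its fields are illustrative assumptions about Piglet's operator model, not its actual classes.

    // Sketch of the three preconditions for rewriting a Join/Cross for Flink.
    case class WindowOp(timeBased: Boolean, size: Int, slide: Int)

    def joinIsRewritable(inputs: List[Any]): Boolean = {
      val windows = inputs.collect { case w: WindowOp => w }
      windows.length == inputs.length &&          // 1. all direct inputs are windows
      windows.forall(_.timeBased) &&              // 2. all windows are time-based
      windows.map(w => (w.size, w.slide))
        .distinct.length == 1                     // 3. same window and sliding size
    }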
3 https://flink.apache.org/
4 https://spark.apache.org/
5 https://storm.apache.org/

If these requirements are met, the dataflow plan is rewritten in the way shown in figure 2, with the associated Piglet query:







[Figure 2 (dataflow rewriting): two SOCKET_READ sources ('localhost:9997' and 'localhost:9998', both using PigStream(',')) each feed a tumbling time window of 60 seconds, whose window iterators are joined on y1 == x2 and written via SOCKET_WRITE to 'localhost:9999'. After rewriting, the JOIN consumes the two sources directly, carrying the common 60-second tumbling window definition itself.]

Figure 2: Rewriting of a Join operator in Piglet for Flink Streaming

in1 = SOCKET_READ 'localhost:9997' USING PigStream(',') AS (x1: int, y1: int);
in2 = SOCKET_READ 'localhost:9998' USING PigStream(',') AS (x2: int, y2: chararray);
w1 = WINDOW in1 RANGE 60 SECONDS;
w2 = WINDOW in2 RANGE 60 SECONDS;
joined = JOIN w1 BY y1, w2 BY x2;
SOCKET_WRITE joined TO 'localhost:9999';

Internally, the inputs of the window nodes are collected and used as the new input for the Join operator. That is, the window definition is extracted and inserted into the unifying operator, the pipes are redirected, and finally the now unnecessary operators are removed.

For Spark Streaming, this window rewriting step is not necessary, since windowed streams can be joined directly. The only addition is a transformation of the tuples into key-value pairs before joining the streaming subsets. This task is done directly in the code generation phase and is thus not visible in the dataflow plan.
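For illustration, the generated Spark Streaming pattern for the join example roughly corresponds to the following sketch; the batch interval of one second, the parsing, and all names are simplifying assumptions, not the literal generated code.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Sketch: windowed streams are keyed on the join attributes and joined.
    object JoinSketch {
      def main(args: Array[String]): Unit = {
        val ssc = new StreamingContext(new SparkConf().setAppName("join"), Seconds(1))

        val in1 = ssc.socketTextStream("localhost", 9997)
          .map(_.split(",")).map(f => (f(0).toInt, f(1).toInt))  // (x1, y1)
        val in2 = ssc.socketTextStream("localhost", 9998)
          .map(_.split(",")).map(f => (f(0).toInt, f(1)))        // (x2, y2)

        // key-value pairs keyed on the join attributes y1 and x2
        val w1 = in1.window(Seconds(60)).map { case (x1, y1) => (y1, (x1, y1)) }
        val w2 = in2.window(Seconds(60)).map { case (x2, y2) => (x2, (x2, y2)) }

        // windowed streams can be joined directly in Spark Streaming
        val joined = w1.join(w2)
        joined.print()

        ssc.start()
        ssc.awaitTermination()
      }
    }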
Beside the Join- and Cross-specific rewritings, the general window scope definition is also partly implemented within the rewriter. Currently, the aforementioned WindowApply node is simply inserted into the dataflow as an extra path from the window start to the end of the scope. The code generator then takes care of integrating the operations into the apply method. An example dataflow written in our Pig language extension could be the following:

in = LOAD 'file.csv' USING PigStream(',') AS (x: Int, y: Int);
win = WINDOW in RANGE 20 SECONDS;
dis = DISTINCT win;
grp = GROUP dis BY win.x;
cnt = FOREACH grp GENERATE group, COUNT(grp);
STORE cnt INTO 'amount.csv';
Here, the read tuples are collected in a tumbling window of 20 seconds. The scope of this window is terminated by the Foreach statement, as explained above. Thus, the duplicate elimination, grouping, and aggregation operations are put into the apply method. For Spark Streaming, the dataflow is unchanged, since the window operation just adjusts the micro-batch contents. The structure of the resulting code for Spark and Flink Streaming is visualized in figure 3.

[Figure 3 (structure of the generated code): for Flink Streaming, the tuples loaded from 'file.csv' pass through a tumbling 20-second window whose apply method performs the Distinct, Group By x, and Generate (x, count()) steps before the result is stored to 'amount.csv'; for Spark Streaming, the same query maps to a chain of RDD transformations (WINDOW, DISTINCT, GROUP BY x, FOREACH with COUNT) on the micro-batches.]

Figure 3: Structure of the generated code for Flink and Spark Streaming
Another challenge, as mentioned in section 2, are blocking operators, which in batch processing are applied to the complete input. The previous examples contained such operations, namely Join and Count (in general: aggregates). Since a data stream can be potentially endless, one cannot just collect the data until the last tuple arrives. Even if the stream eventually ends, it is not guaranteed that all the tuples fit in memory. Nevertheless, there are two alternative approaches to support such operators. On the one hand, one could apply them only to a subset of the stream, as was done in the examples with the help of windows. On the other hand, the results could be updated for every incoming tuple by using stateful operators. In the current state, we also support the second variant for aggregates, which are then called rolling aggregates or accumulations. Since Spark and Flink Streaming support stateful operators, we directly use them for this task. To achieve this behavior, one just has to omit the window statement; a sketch for Spark Streaming follows.
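A minimal sketch of such a rolling count with Spark Streaming's updateStateByKey is shown below. The input stream and the simplified parsing are assumptions; stateful operators additionally require a checkpoint directory (ssc.checkpoint(...)).

    import org.apache.spark.streaming.dstream.DStream

    // Rolling (stateful) count per key: omitting the WINDOW statement in the
    // Piglet script yields generated code of roughly this shape.
    def rollingCount(input: DStream[(Int, Int)]): DStream[(Int, Long)] =
      input
        .map { case (x, _) => (x, 1L) }                 // key on x
        .updateStateByKey[Long] { (batch: Seq[Long], state: Option[Long]) =>
          Some(state.getOrElse(0L) + batch.sum)         // add this batch's tuples
        }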




5. QUERY PLAN PROCESSING
As shown in Figure 1 and described in detail in [10], Piglet performs a rewriting step that transforms the output of the parser by repeatedly applying rules to the dataflow plan before passing it on to the code generator. Compared to other frameworks like Cascades [7] and EXODUS [8], the rewriting process in Piglet is fully integrated into Piglet itself and can therefore use features of the host language; on the other hand, it would not be easy to integrate it into another system.

The rewriting step serves a few different purposes. The first is optimization by modifying the dataflow plan to exploit relationships between different operators of the same or different types. This allows, for example, reordering Filter and Order By operators such that the Filter operator is always executed first, potentially shrinking (but never enlarging) the data set flowing to the Order By operator.

The second goal is the support of Pig operators that cannot be mapped 1:1 to Spark or Flink operations, but can be rewritten to existing functions (for example, Pig's Split Into operator is rewritten to multiple Filter operators).



The same principles can be applied to support new operators beyond those offered by Pig, whose syntax better fits a certain problem domain. We chose to implement most of the rewriting rules of [9] as examples for this goal.

Piglet also allows the user to embed user-defined functions for use with Pig's Foreach operator directly into the input script, as in the following example:

<% def myFunc(i: Int): Int = i + 42 %>
out = FOREACH in GENERATE myFunc(f1);

Despite those usually being small functions, they still might interact with other types of operators, so being able to supply rewriting rules in addition to the user-defined functions themselves was another goal.
At a very high level, the rewriting process consists of two parts: activating the available rules at startup, depending on the settings with which Piglet is run, and applying the rules to a dataflow plan after the parser has been executed. Several objects called Rulesets exist, one for each backend and language feature, each providing a method called registerRules, which registers zero or more functions with an object called Rewriter. Rules are represented as functions of the type

Function1[Any, Option[Any]]
mapping each input object to either Some(newobject), indicating that the object has been changed and is to be replaced by newobject, or None, indicating that the function did not change the object. As soon as the options have been read, the registration method of each applicable ruleset is executed. After the input script has been read and transformed into Scala objects by the parser, the method processPlan of the rewriter object is called, passing the dataflow plan as the argument. This method then repeatedly applies all registered rules to all operators in the plan in a top-down fashion, starting with the source nodes, until no rule leads to a change anymore. A sketch of a rule in this style follows.
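The sketch below expresses the Filter/Order By reordering mentioned above as such a function. The operator classes and their input field are illustrative assumptions; the real plan is a DAG of PigOperator objects connected by pipes, simplified here to a nested term.

    // A rule in the Function1[Any, Option[Any]] style described above.
    case class OrderBy(input: Any)
    case class Filter(input: Any, predicate: String)

    val filterBeforeOrderBy: Any => Option[Any] = {
      case Filter(OrderBy(in), pred) =>
        Some(OrderBy(Filter(in, pred)))  // changed: filtering now happens first
      case _ =>
        None                             // rule does not apply: no change
    }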
For traversing the dataflow plan we use the library Kiama [15]. Not only does it provide a data type Strategy that abstracts rewriting operations (this type extends the function type mentioned previously), it also includes methods for combining several separate operations of that type (and, ior, ...), extending them with repetition (repeat, ...) and traversal (bottomup, topdown, ...) behaviour, or any combination of those. This means that, unless otherwise necessary, a rule only needs to handle a single operator as its input; applying it to all possible operators is taken care of by the Piglet system. The traversal operations work with every type that implements the Rewritable trait supplied by Kiama, which the operator objects in Piglet do. A sketch of this combination follows.
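The following sketch combines the rule above with Kiama's combinators. The package and combinator names are as in Kiama 1.x; the exact wiring inside Piglet is an assumption, not its actual code.

    import org.kiama.rewriting.Rewriter._

    // strategyf lifts an Any => Option[Any] function to a Strategy.
    val step: Strategy = strategyf(filterBeforeOrderBy)

    // oncetd applies the step once somewhere in the tree, searching top-down;
    // repeat reapplies this until the step no longer applies anywhere.
    val toFixpoint: Strategy = repeat(oncetd(step))

    // rewrite runs the strategy on the plan (assumed in scope) and returns
    // the possibly transformed plan.
    val optimized = rewrite(toFixpoint)(plan)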
6. USER-DEFINED REWRITING
While implementing a set of rules, it became apparent that our first approach of writing them as ordinary functions is quite cumbersome. It involved first checking the input type of the object (because the type of the argument is Any, but most rules only apply to a specific type of operator), then checking additional conditions on the operator object (for example, the function called by a Foreach operator), usually through a series of calls of the form if (...) {return None}, and only then changing the object.

We therefore make extensive use of features offered by the Scala programming language to ease the implementation and registration of rules. Its local type inference and ClassTags, for example, are used to great effect for making it easier to implement rules that are only applied to specific types of operators: the rule is wrapped at runtime and the wrapper is registered instead, which only calls the wrapped function if the input object is of a type that it accepts, thereby working around the type erasure that happens at compile time for languages targeting the JVM. The following is a sketch of such a wrapper.
Scala also makes it easy to develop embedded DSLs by allowing the dot before a method call and the parentheses around its argument to be replaced with spaces if the method is of arity one (it takes only one argument). This is called infix notation and is used, for example, by the ScalaTest framework to produce code similar to English sentences.

Instead of forcing each rule to implement the steps outlined above anew, we decided to use the infix notation to provide a DSL of our own, which takes care of creating the wrapper function mentioned earlier automatically (given appropriate type signatures) and moves the checks for additional conditions out of the rule itself into zero or more anonymous functions.

[Figure 4 (syntax diagram): a rule registration optionally starts with toMerge or toReplace, is followed by any number of when/whenMatches, unless/unlessMatches, and/andMatches, or/orMatches clauses, and ends with applyRule or applyPattern.]

Figure 4: Syntax diagram for the rewriting DSL

Figure 4 shows the syntax of this DSL. The applyRule and applyPattern methods can be used to register a single function as a rewriting rule. The methods when, unless, and, or and their counterparts ending in Matches can be used to supply additional functions for checking preconditions on the input object. The only difference between the two kinds is that the functions whose names end in Matches accept partial functions as their argument. Lastly, toMerge and toReplace activate special behaviour for the two cases of merging operators (passing not one, but two operators to the other functions) and replacing one operator with exactly one other. Internally, the builder pattern is used to keep track of all the preconditions and to wrap the function given to applyRule in them before registering it. The following is an example of a rule utilizing the DSL:

def groupedSchemaJoinEarlyAbort(op: BGPFilter): Boolean

Rewriter unless groupedSchemaJoinEarlyAbort and
  { op => RDF.isPathJoin(op.patterns) } applyRule J4

In the type signature of groupedSchemaJoinEarlyAbort, the type of the operator is restricted, so the anonymous function passed to and can already use that information to access the patterns attribute that only BGPFilter objects have. The same operations can be used for implementing rewriting rules for user-defined functions embedded in the script. Piglet's parser has been extended to treat everything in a section of embedded code that follows the keyword rules: as Scala code containing the implementation of rewriting rules, as in the following example:

<% def myFunc(i: Int): Int = i + 42
   rules:
   // Rules can be written here
%>

A few imports are automatically added to the code before it is executed inside the same JVM Piglet is running in.







To further ease the implementation of rules for embedded code, which only apply to Foreach operators that usually call just a single function, we provide an additional extractor object that returns the operator object and the name of the called function if the operator matches that pattern. Other extractor objects providing quick access to the predecessors and successors of operators are available as well. All of them return a tuple whose first element is the matched object, so the patterns can be nested. A sketch of such an extractor follows.
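The Foreach and Call types below are illustrative assumptions about Piglet's AST; the sketch only demonstrates the extractor shape described above.

    // Matches a Foreach that calls exactly one function and yields the
    // operator together with the name of the called function.
    case class Call(name: String, args: List[String])
    case class Foreach(input: Any, expr: Call)

    object ForeachCallingFunction {
      def unapply(op: Any): Option[(Foreach, String)] = op match {
        case f @ Foreach(_, Call(name, _)) => Some((f, name))  // matched object first
        case _                             => None
      }
    }

    // Usage in a rule: because the matched object comes first, patterns nest.
    val myFuncRule: Any => Option[Any] = {
      case ForeachCallingFunction(f, "myFunc") =>
        Some(f)  // construct and return the rewritten operator here
      case _ =>
        None
    }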
7. CONCLUSION AND FUTURE WORK
We have presented our approaches to code generation and rewriting for dataflow programs, integrated into Piglet, with the main focus on data streaming. As mentioned above, we think the concepts can be applied to other systems, too.

As a future perspective, we want to further extend the presented ideas. Relating to streaming features, there is still a lot to do to support a comprehensive range of application scenarios. First of all, there exist many widespread connectors for source and sink nodes, such as Apache Kafka, Twitter's Streaming API, or Flume, which should be made available in Piglet. Another point on our agenda are operator alternatives, for example for Join, since in the current state only equijoins are supported. It was also considered to support state for the Sample (e.g., reservoir sampling) or Order By (e.g., keeping all seen tuples and reporting the new order) operators. Obviously, one must always keep in mind the memory required by such an approach. On top of that, because the window operator is one of the main concepts in data streaming, we are thinking about adding more variants, like delta-based windows. As mentioned in section 4, a WindowApply node is currently simply inserted to define the window scope. Another approach would be to put all operations within the window scope directly into this node in the form of a sub-plan, already during the rewriting step. Thereby, the produced code becomes more transparent in the internal representation, which is also a much cleaner solution.

Furthermore, the rewriting system can be extended to make it easier to assign specific properties to operator types, which can then automatically be used by general rewriting rules that are not tailored to one type of operator. Another possible extension is the support for rewriting by utilizing algebraic properties of operators, such as associativity and distributivity. Using these properties, not only one, but a set of new plans can be generated from an initial plan. If a Pig script is run multiple times, information gathered during execution could be used to choose the best of those plans.

Apart from embedded functions, the parser, and the rewriting rules associated with them, adding new types of operators and rules has to happen in Piglet's code itself. Extending this to support loading user-defined operator types at runtime, possibly via the ClassLoader system, is also an option for the future.
                                                                        Data (BD3), 2013.
8.   REFERENCES                                                    [15] T. Sloane. Experiences with Domain-specific Language
 [1] T. Akidau, R. Bradshaw, C. Chambers, S. Chernyak,                  Embedding in Scala. In Domain-Specific Program
     R. J. Fernández-Moctezuma, R. Lax, S. McVeety,                    Development, page 7, 2008.
     D. Mills, F. Perry, E. Schmidt, and S. Whittle. The           [16] T. B. Sousa. Dataflow Programming Concept,
     Dataflow Model: A Practical Approach to Balancing                  Languages and Applications. In Doctoral Symposium
     Correctness, Latency, and Cost in Massive-Scale,                   on Informatics Engineering, 2012.
     Unbounded, Out-of-Order Data Processing.



