On Evaluating Performance of Balanced Optimization of ETL Processes for Streaming Data Sources

Michał Bodziony, IBM Poland, Software Lab Kraków, Kraków, Poland, michal.bodziony@pl.ibm.com
Szymon Roszyk, Poznan University of Technology, Poznań, Poland, szymon.roszyk@student.put.poznan.pl
Robert Wrembel, Poznan University of Technology, Poznań, Poland, robert.wrembel@cs.put.poznan.pl

© Copyright 2020 for this paper held by its author(s). Published in the proceedings of DOLAP 2020 (March 30, 2020, Copenhagen, Denmark, co-located with EDBT/ICDT 2020) on CEUR-WS.org. Use permitted under Creative Commons License Attribution 4.0 International (CC BY 4.0).

ABSTRACT

A core component of each data warehouse architecture is the Extract-Transform-Load (ETL) layer. The processes in this layer integrate, transform, and move large volumes of data from data sources into a data warehouse (or data lake). For this reason, efficient execution of ETL processes is of high practical importance. Research approaches propose task re-ordering and parallel processing. In practice, companies apply scale-up or scale-out techniques to increase the performance of the ETL layer. ETL tools on the market mainly support parallel processing of ETL tasks. Additionally, the IBM DataStage ETL engine applies the so-called balanced optimization, which allows ETL tasks to be executed either in a data source or in a data warehouse, thus optimizing overall resource consumption. With the widespread adoption of non-relational technologies for storing big data, it is essential to apply ETL processes to this type of data as well. In this paper, we present an initial assessment of the balanced optimization applied in IBM DataStage to an ETL process ingesting data from a Kafka streaming data source.

1 INTRODUCTION

A core component of a data warehouse (DW) [25] or a data lake (DL) [22] is the Extract-Transform-Load (ETL) software. It is responsible for ingesting data from data sources (DSs), integrating and cleaning these data, and uploading them into a DW/DL. ETL is implemented as a workflow (process) composed of tasks, managed by an ETL engine, frequently run on a dedicated server or on a cluster. An ETL process moves large volumes of data between DSs and a DW; therefore, its execution may take hours to complete. For example, simply moving 1 TB of data between a DS and a DW (which use magnetic disks with a typical 200 MB/s throughput) takes an ETL process about 2.7 hours. Resource consumption and the overall processing time are further increased by complex tasks executed within the process, including integrating, cleaning, and de-duplicating data. For this reason, providing means for optimizing ETL processes is of high practical importance. Big data adds further complexity to the problem, resulting from: (1) larger data volumes and (2) much more complex and diverse data models and formats that need to be processed by ETL. These characteristics call for yet more efficient execution of ETL processes.
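One way to reproduce this estimate is to assume that the 1 TB must first be read from the source disk and then written to the target disk, each pass limited by the 200 MB/s throughput: 2 × 10^12 B / (2 × 10^8 B/s) = 10^4 s ≈ 2.8 h, consistent with the 2.7 hours quoted above.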
Unfortunately, this problem has been only partially addressed by technology and research, cf. Section 2. In practice, companies apply scale-up or scale-out to their ETL servers to reduce processing time. ETL tools on the market mainly support parallel processing of ETL tasks. Some of them, on top of that, apply task movement within an ETL process; one such technique is called balanced optimization [17]. Research approaches propose task re-ordering and parallel processing.

This paper presents the first findings from an R&D project, carried out jointly by IBM Software Lab and Poznan University of Technology, on assessing the performance of the balanced optimization on a streaming data source. In particular, we were interested in assessing: (1) whether the balanced optimization may increase the performance of ETL processes run by IBM DataStage and ingesting data from a stream DS run by Kafka, (2) which ETL tasks may benefit from the balanced optimization, and (3) which parameters of the DS may affect performance. The findings are summarized in Section 4.

2 RELATED WORK

In this section we overview the solutions to optimizing ETL processes: (1) applied in practice by companies, (2) offered by ETL engines on the market, and (3) developed by researchers.

2.1 Industrial approaches

Industrial projects utilizing ETL reduce the processing time of ETL processes by increasing the computational power of an ETL server and by applying parallel processing of data. This can be achieved by: (1) scaling up an ETL server, i.e., increasing the number of CPUs and the size of memory, or (2) scaling out an ETL architecture, i.e., increasing the number of workstations in a cluster running the ETL engine. All the commercial ETL engines, cf. [8], support this kind of optimization. Parallel processing is also applicable to uploading data into a data warehouse, e.g., via command nzload (IBM Pure Data for Analytics, a.k.a. Netezza) or import (Oracle).

On top of this, ETL task re-ordering may be used to produce a more efficient process. In the simplest case, called push down, the most selective tasks are moved towards the beginning of an ETL process (towards data sources) to reduce the data volume (I/O) as early as possible [26]. IBM extended this technique into balanced optimization, where some tasks are moved into DSs and some are moved into a DW, to be executed there, cf. Section 3.

2.2 Research approaches

Research approaches to optimizing ETL execution can be categorized as: (1) optimization by task re-ordering, augmented with estimation of execution costs and parallelization, and (2) optimization of ETL processes with user-defined functions (UDFs).

2.2.1 Cost-based, task re-ordering, and parallelization.

In [19, 20] the authors apply the MapReduce framework to process ETL workflows that feed dimensional schemas (star, snowflake, and SCDs) in parallel. The authors provide some processing skeletons for MapReduce for dimensions of different sizes and structures.
A system for optimizing MapReduce workloads was presented in [11]. The optimization uses: sharing of data flows, materialization, and physical data layouts. However, neither an optimization technique nor a cost function was explained.

[18] proposes to parallelize an ETL process P in three steps. First, P is partitioned into linear sub-processes LPi. Second, in each LPi, input data are partitioned horizontally into n disjoint partitions (n is parameterized) and each partition is processed in parallel by a separate thread. Third, for tasks with a heavy computational load, an internal multi-threading parallelization may be applied, if needed.

[10] assumes that an ETL process is assigned an estimated execution cost and that a more efficient variant of this process is produced by task re-ordering, but the authors do not provide any method for the re-ordering. The main goal of [10] is to identify a set of statistics to collect for this kind of optimization. The constraint is that this set must be minimal and applicable to estimating the costs of all possible task re-orderings. Moreover, the cost of collecting the statistics must be minimal. The statistics include: the cardinality of a table Ti, attribute histograms of Ti, and the number of distinct values of attributes of Ti. The proposed cost function includes: data statistics, CPU speed, disk-access speed, and memory usage. The authors proposed cost functions for the following ETL tasks (all expressed via SQL): select, project, join, group-by, and transform. To find the set of statistics, which is an NP-hard problem, the authors use linear programming.

The approach presented in [24] goes one step further, as it proposes a specific performance optimization technique based on task re-ordering. To this end, each workload is assigned an execution cost. The main components of the cost formula are time and data volume. The following five workflow transformation operations were proposed: swap - changing the order of two tasks; factorize - combining tasks that execute the same operation on different flows; distribute - splitting the execution of a task into n parallel tasks; merge - merging two consecutive tasks; and its reverse operation - split [23]. For each of these operations, criteria for correct workflow transformations were proposed. Finally, a heuristic for pruning the search space of all possible workflow transformations (by task re-ordering) was proposed, with the main goal of filtering data as early as possible. In [15], the re-ordering of operators is based on their semantics, e.g., a highly selective operator would be placed (re-ordered) at the beginning of a workflow.

In the same spirit, workflow transformations were proposed in [14] for the purpose of reusing existing data processing workflows and integrating them into other workflows.

The approach proposed in [16] draws upon the contributions of [24]. In [16], possible task re-orderings are constrained by means of a dedicated structure called a dependency graph. This approach optimizes only linear workflows. To this end, a non-linear workflow is split into linear ones (called groups), by means of pre-defined split rules. Next, parallel groups are optimized independently by task re-ordering - tasks can be moved between adjacent groups, and adjacent groups can be merged. The drawback of this approach, however, is that the most selective tasks can be moved towards the end of a workflow as the result of a re-ordering.

2.2.2 ETL with UDFs.

None of the approaches described in Sections 2.2.1 and 2.1 supports the optimization of ETL processes with user-defined functions. In the techniques based on the re-ordering of operators, the semantics of the operators must be known. Such semantics are known and understood for traditional operators based on the relational algebra. However, the semantics of user-defined functions are typically unknown. For this reason, UDFs have been handled by solutions that either (1) require manual annotation of UDFs, (2) perform an analysis of the UDF code to explore some options for optimization, or (3) use explicitly defined parallel skeletons.

In [12, 13], UDFs are treated as black boxes. The framework consists of the Nephele execution engine and the PACT compiler to execute UDFs, based on the PACT programming model [5] (PACT is a generalization of MapReduce). PACT can use hints to execute a given task in parallel. Such hints are later exploited by a cost-based optimizer to generate parallel execution plans. The optimization is based on: (1) a re-ordering of UDFs in a workflow and (2) an execution of UDFs in a parallel environment. The re-ordering takes into account properties of UDFs, which are discovered by means of a static code analysis. A cost-based optimizer (model) is used to construct some possible re-orderings. Also in [9], UDF annotations are used to generate an optimized query plan, but only for relational operators.

A framework proposed in [7], called SQL/MR, enables parallelization of UDFs in a massively-parallel shared-nothing database. To achieve parallelism, SQL/MR requires a definition of the Row and Partition functions and corresponding execution models for the SQL/MR function instances. The Row function is described as an equivalent of a Map function in MapReduce. Row functions perform row-level transformation and processing. The execution model of the Row function allows independent processing of each input row by exactly one instance of the SQL/MR function. The Partition function is similar to the Reduce function in MapReduce. Exactly one instance of a SQL/MR function is used to independently process each group of rows defined by the PARTITION BY clause in a query. Independent processing of each partition allows the execution engine to achieve parallelism at the level of a partition. Dynamic cost-based optimization is enabled for re-orderings of UDFs.

An optimizer proposed in [21] rewrites an execution plan based on: (1) automatically inferring the semantics of a MapReduce-style UDF and (2) a small set of rewrite rules. The semantics can be provided by means of manual UDF annotations or an automatic discovery. The manual annotations include: a cost function, resource consumption, and the numbers of input and output rows. The automatically discovered annotations include: the parallelization function of a given operator, schema information, and read/write operations on attributes.

The aforementioned approaches require understanding the semantics of a UDF (a black box) by means of either parsing the UDF code, applying a certain coding style, using certain keywords, or using parallelization hints. Moreover, these approaches do not provide means of analyzing an optimal architecture configuration (e.g., the number of nodes in a cluster) or a degree of parallelism.

In [2] the authors developed a framework for using pre-defined generic and specific code skeletons for writing programs to be executed in a parallel environment. The code is then generated automatically and the process is guided by configuration parameters that define the number of nodes a program is executed on.
In [1, 3] the authors proposed an overall architecture for executing UDFs in a parallel environment. The architecture was further extended in [4] with a model for optimizing a cluster configuration, to provide a sub-optimal performance of a given ETL process with UDFs. The applied model is based on the multiple-choice knapsack problem, and the lp_solve library is used to solve the problem (its implementation is available on GitHub: https://github.com/fawadali/MCKPCostModel).

The work described in this paper applies: (1) the balanced optimization, which uses task re-orderings to move some ETL tasks into a data source, (2) parallel processing of ETL tasks, by the standard parallelization mechanisms of IBM DataStage run in a micro-cluster, and (3) parallel processing of some ETL tasks moved into and executed in a data source, by means of the standard parallelization mechanisms available in Kafka.
3 BALANCED OPTIMIZATION

The balanced optimization [17, 27] is implemented in IBM InfoSphere DataStage. Its overall goals are to: (1) minimize data movement, i.e., I/O, (2) use the optimization techniques available in the source or target data servers, and (3) maximize parallel processing. The principle of this optimization is to balance processing between a data source, an ETL server, and a destination (typically a data warehouse). To this end, a given ETL task can be moved into a data source (operation push down), to be executed there. It is typically applied to tasks at the beginning of an ETL process, i.e., ingesting, filtering, or pre-processing data. ETL tasks that cannot be moved into a data source are executed in the DataStage engine. Finally, tasks that profit from processing in a data warehouse are moved and executed there (operation push up). This way, specialized features of a DW management system can be applied to processing these tasks, e.g., dedicated indexes, multidimensional clusters, partitioning schemes, and materialized views.

Moving tasks is controlled by DataStage preferences [17] that guide the DataStage engine to move certain pre-defined tasks into either a DS or a DW, or to execute them in the engine. The following tasks are controlled by the preferences: transformation, filtering, aggregation, join, look-up, merge, project, bulk I/O, and temporary staging tables. Once a given ETL process has been designed, a user specifies the preferences. Then the ETL process is optimized by DataStage, taking into account the preferences. An executable version of the process is generated and deployed.

So far, the balanced optimization has proven profitable when applied to structured data sources. Since numerous NoSQL and stream DSs have become first-class citizens, assessing the applicability of this type of optimization to such data sources may have a real business value. The work presented in this paper is the first to assess the balanced optimization on a stream data source.
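The push-down side of this scheme is exercised against Kafka in Section 4. The push-up side can be made concrete with a few lines of plain JDBC; this is only an illustration of the idea, not DataStage's internal mechanism, and the connection URL, credentials, table, and column names are hypothetical. Instead of pulling raw rows into the ETL server and aggregating there, the aggregation is shipped to the warehouse as SQL, so the DW engine can exploit its indexes, partitioning schemes, and materialized views.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class PushUpAggregation {
    public static void main(String[] args) throws Exception {
        // Operation push up: the GROUP BY is evaluated by the data
        // warehouse itself; only the aggregated result travels back
        // to the ETL server.
        try (Connection dw = DriverManager.getConnection(
                     "jdbc:db2://dw-host:50000/DW", "etl", "secret"); // hypothetical DW
             Statement st = dw.createStatement();
             ResultSet rs = st.executeQuery(
                     "SELECT ss_store_sk, COUNT(*) AS cnt "        // hypothetical table
                     + "FROM store_sales GROUP BY ss_store_sk")) { // and column names
            while (rs.next()) {
                System.out.println(rs.getLong(1) + " -> " + rs.getLong(2));
            }
        }
    }
}
```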

4 EXPERIMENTAL EVALUATION

The goal of the experiments was to assess whether the balanced optimization in IBM DataStage may increase the performance of an ETL process ingesting data from a stream data source, run by Kafka. Such a software architecture is frequently used by IBM customers. In particular, we were interested in figuring out: (1) which parameters of Kafka may affect performance, cf. Sections 4.1 and 4.2, and (2) which ETL tasks may benefit from the balanced optimization. In this phase of the project we evaluated filtering (Section 4.3), dataflow split (Section 4.4), and aggregation (Section 4.5).

To this end, a pico-cluster composed of four physical workstations was built. Each workstation included a 4-core 3 GHz CPU, 16 GB RAM, and a 256 GB HDD, and ran under Linux RedHat. Two workstations ran Kafka and the other two ran the IBM InfoSphere DataStage ETL server. The ETL processes were designed using DataStage Designer. The system was fed with rows from table Store_Sales (1.5 GB) of the TPC-D benchmark. The performance statistics were measured on each node by iostat. In each scenario 12 experiments were run. The lowest and the highest measurements were discarded, and the average of the remaining 10 was computed; this average is shown in all the figures. Due to space constraints, we present only selected experimental scenarios.

Notice that the goal of these experiments was to assess the behaviour of the system only at the border between Kafka and DataStage. For this reason, the ETL processes used in the experiments included elementary tasks/components, like a Kafka connector, column import, filtering, aggregation, and storing output in a file. An example process splitting a data flow is shown in Figure 1.

Figure 1: An example ETL process splitting a data flow
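For precision, the aggregation rule applied to the 12 runs of each scenario is a simple trimmed mean; a minimal sketch:

```java
import java.util.Arrays;

public class TrimmedMean {
    // Average of the 12 measurements of a scenario after discarding the
    // single lowest and the single highest value, as used for all figures.
    static double trimmedMean(double[] runs) {
        double[] sorted = runs.clone();
        Arrays.sort(sorted);
        double sum = 0.0;
        for (int i = 1; i < sorted.length - 1; i++) {
            sum += sorted[i];
        }
        return sum / (sorted.length - 2);
    }
}
```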
                                                                         time, in our experiments the number of partitions ranged from 1
4.1 and 4.2 as well as (2) which ETL tasks may benefit from the
                                                                         to 8. The number of partitions equaled to the number of Kafka
balanced optimization. In this phase of the project we evaluated
                                                                         consumers. The performance results are shown in Figure 3. Here
filtering (Section 4.3), dataflow split (Section 4.4), and aggregation
                                                                         we show only the results for RecordCount = ALL. As we can
(Section 4.5).
                                                                         observe, the elapsed processing time of the ETL process varies and
4.2 The number of Kafka partitions

Kafka allows splitting streaming data into partitions, each of which is read by a consumer. As the number of partitions may strongly influence the performance of the whole ETL process (cf. https://www.confluent.io/blog/how-choose-number-topics-partitions-kafka-cluster), figuring out how performance is affected by the number of partitions is of great interest.

To assess the impact of the number of partitions on processing time, in our experiments the number of partitions ranged from 1 to 8. The number of partitions equaled the number of Kafka consumers. The performance results are shown in Figure 3; here we show only the results for RecordCount=ALL. As we can observe, the elapsed processing time of the ETL process varies, and the best performance was achieved for 2 and 3 partitions. The standard deviation ranges from 0.5 to 1.3.

Figure 3: The dependency of the elapsed processing time on the number of partitions (consumers); parameter RecordCount=ALL

A detailed analysis of I/O (not shown here due to space constraints) reveals that the number of I/O operations increases with the increasing number of partitions. A similar increase was also observed in CPU time. These may be some of the factors that influence the characteristics shown in Figure 3.
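For reference, the partition counts varied in this experiment can be configured with Kafka's Java admin client; a minimal sketch, where the topic name is hypothetical and the replication factor is chosen for the two Kafka workstations of our setup. One consumer per partition in the same consumer group then yields the read parallelism:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreatePartitionedTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-host:9092"); // hypothetical host
        try (Admin admin = Admin.create(props)) {
            // 3 partitions (the range that performed best in Figure 3);
            // replication factor 2 to span the two Kafka workstations.
            NewTopic topic = new NewTopic("store_sales", 3, (short) 2);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
```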
4.3 Selectivity of data ingest

In this experiment we assessed the elapsed ETL processing time w.r.t. the selectivity of the data ingest from Kafka (the selectivity is defined as the number of rows ingested divided by the total number of rows). Two scenarios were implemented. In the first one, operation push down was applied to move data filtering into Kafka (available in library Kafka Streams). In the second one, data were filtered by a standard pre-defined task in DataStage. The results for RecordCount=10 and RecordCount=ALL are shown in Figure 4. The results labeled with suffix PD denote the scenario with operation push down applied. The standard deviation ranges from 0.1 to 2.8.

Figure 4: The dependency of the elapsed processing time on the selectivity; parameter RecordCount in {10, ALL}

The results clearly show that a wrong combination of RecordCount with applied push down can decrease the overall performance of an ETL process, cf. RecordCount=ALL PD (with push down) vs. RecordCount=ALL (without push down), for selectivity=90%. From other experiments (not presented in this paper) we observed that for values of RecordCount ≤ 20, push down offered consistently better performance over the whole range of selectivities.
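Since the pushed-down filter is implemented with the Kafka Streams library, it may help to see what such a job can look like; the sketch below is illustrative only, and the CSV row layout, the predicate, and the topic names are assumptions:

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class PushedDownFilter {
    public static void main(String[] args) {
        Properties cfg = new Properties();
        cfg.put(StreamsConfig.APPLICATION_ID_CONFIG, "ingest-filter");
        cfg.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-host:9092"); // hypothetical host
        cfg.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        cfg.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> rows = builder.stream("store_sales"); // hypothetical topic
        rows.filter((key, row) -> passes(row))  // executed inside Kafka, not in the ETL engine
            .to("store_sales_filtered");        // topic read by the DataStage connector
        new KafkaStreams(builder.build(), cfg).start();
    }

    // Hypothetical predicate over a CSV row: the fraction of rows that
    // pass corresponds to the ingest selectivity varied in Figure 4.
    static boolean passes(String row) {
        return Integer.parseInt(row.split(",")[0]) < 10;
    }
}
```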



4.4 Flow split

The purpose of a flow split is to divide a data flow into multiple flows. In our case, we split the input flow into two, using a parameterized split ratio: from 10% of data in one flow to an even split. The flow split was executed in: (1) Kafka, as the result of push down, and (2) DataStage. The obtained elapsed execution times for RecordCount=10 and RecordCount=ALL are shown in Figure 5. The standard deviation ranged from 1.3 to 3.3.

Figure 5: The dependency of the elapsed processing time on the data flow split ratio; parameter RecordCount in {10, ALL}

The characteristics show a performance improvement when push down is applied for RecordCount=10, cf. bars labeled RecordCount=10 PD vs. RecordCount=10. From other experiments (not discussed in this paper) we observed that the maximum value of RecordCount that improved the performance is 10. From the analysis of detailed data on CPU and I/O, we tentatively conclude that the observed behaviour is caused by Kafka not being able to deliver data on time to DataStage.
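A two-way flow split pushed down into Kafka can be expressed with the branch operator of the Kafka Streams DSL (newer Streams versions replace it with split()); a sketch in which the split predicate and topic names are assumptions:

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class PushedDownSplit {
    public static void main(String[] args) {
        Properties cfg = new Properties();
        cfg.put(StreamsConfig.APPLICATION_ID_CONFIG, "flow-split");
        cfg.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-host:9092"); // hypothetical host
        cfg.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        cfg.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> rows = builder.stream("store_sales"); // hypothetical topic
        // Each row goes to the flow of the first predicate it satisfies;
        // changing the first predicate changes the split ratio of Figure 5.
        KStream<String, String>[] flows = rows.branch(
                (key, row) -> Math.floorMod(row.hashCode(), 10) == 0, // ~10% of the rows
                (key, row) -> true);                                  // the remaining ~90%
        flows[0].to("store_sales_flow_a");
        flows[1].to("store_sales_flow_b");
        new KafkaStreams(builder.build(), cfg).start();
    }
}
```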
4.5 Aggregation

In this experiment, count was used to count records in groups. The number of groups ranges from 10 to 90. The aggregation was executed in two variants: (1) in Kafka (available in library Kafka Streams), as the result of push down, and (2) in DataStage. The results are shown in Figure 6 for RecordCount=10 and RecordCount=ALL. The standard deviation ranged from 0.5 to 2.2. As we can see, the execution time does not profit from push down in either of these cases, cf. bars labeled RecordCount=10 PD (push down applied) vs. RecordCount=10 (without push down) and RecordCount=ALL PD vs. RecordCount=ALL.

The analysis of CPU and I/O reveals that push down caused higher CPU usage and higher I/O, which resulted in much worse performance of the aggregation in Kafka than in DataStage.

Figure 6: The dependency of the elapsed processing time on the number of groups in select count(*) ... group by; parameter RecordCount in {10, ALL}
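The pushed-down variant of this aggregation, which the experiment implements with Kafka Streams, can be sketched as follows; the grouping attribute and topic names are assumptions:

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;

public class PushedDownCount {
    public static void main(String[] args) {
        Properties cfg = new Properties();
        cfg.put(StreamsConfig.APPLICATION_ID_CONFIG, "group-count");
        cfg.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-host:9092"); // hypothetical host
        cfg.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        cfg.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> rows = builder.stream("store_sales"); // hypothetical topic
        // select count(*) ... group by, in the Streams DSL: re-key every
        // row by its grouping attribute, then count per key.
        KTable<String, Long> counts = rows
                .groupBy((key, row) -> groupKey(row),
                         Grouped.with(Serdes.String(), Serdes.String()))
                .count();
        counts.toStream()
              .to("store_sales_counts", Produced.with(Serdes.String(), Serdes.Long()));
        new KafkaStreams(builder.build(), cfg).start();
    }

    // Hypothetical extraction of the grouping attribute from a CSV row.
    static String groupKey(String row) {
        return row.split(",")[1];
    }
}
```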
5 SUMMARY

In practice, data integration architectures are frequently built using IBM DataStage, NoSQL (HBase, Cassandra), and stream DSs (Kafka). The balanced optimization available in DataStage offers performance improvements for relational data sources; however, its characteristics on NoSQL and stream DSs are unknown. For this reason, a project was launched at IBM to discover these characteristics, with the goal of building an execution optimizer for standard and multi-cloud architectures [6], based on a learning recommender.

In this paper we presented an experimental evaluation of the balanced optimization applied to a stream DS run by Kafka, within a joint project of IBM and Poznan University of Technology. To the best of our knowledge, it is the first project to analyze the possibility of using this type of ETL optimization on non-relational DSs.

From this evaluation, the most interesting observations of real business value are as follows. First, Kafka turned out to be a bottleneck for push down applied to: (1) filtering, for RecordCount=ALL and selectivity 50% (for the other tested cases push down increased performance); (2) split, for RecordCount=ALL, for all split ratios; (3) aggregation, for RecordCount in {10, ALL}. Second, the overall performance of an ETL process strongly depends on specific parameters of Kafka, e.g., RecordCount and the number of partitions. Third, the characteristics of CPU and I/O usage may suggest that, in order to increase the performance of Kafka, more hardware needs to be allocated to Kafka than to DataStage.

Even though the aforementioned observations cannot be generalized (they apply to this particular experimental setting), they turned out to be of practical value. First, they were very well received by the Executive Management @IBM. Second, the observations were integrated into a knowledge base of IBM Software Lab and have already been used for multiple proofs of concept.

The next phase of this project will consist of: (1) extending the evaluation of push down to Kafka (other sizes of a cluster, other parameters of Kafka), (2) evaluating push down for HBase and Cassandra, and (3) designing and building a learning recommender for DataStage. The recommender will analyze experimental metadata to discover patterns and build recommendation models, with the goal of proposing configuration parameters for a given ETL process and orchestration scenarios of tasks within the balanced optimization.

Acknowledgements. The work of Robert Wrembel is supported by: (1) the grant of the Polish National Agency for Academic Exchange, within the Bekker Programme, and (2) IBM Shared University Reward 2019.

REFERENCES
[1] Syed Muhammad Fawad Ali. 2018. Next-generation ETL Framework to Address the Challenges Posed by Big Data. In DOLAP.
[2] Syed Muhammad Fawad Ali, Johannes Mey, and Maik Thiele. 2019. Parallelizing user-defined functions in the ETL workflow using orchestration style sheets. Int. J. of Applied Mathematics and Comp. Science (AMCS) (2019), 69–79.
[3] Syed Muhammad Fawad Ali and Robert Wrembel. 2017. From conceptual design to performance optimization of ETL workflows: current state of research and open problems. The VLDB Journal (2017), 1–25.
[4] Syed Muhammad Fawad Ali and Robert Wrembel. 2019. Towards a Cost Model to Optimize User-Defined Functions in an ETL Workflow Based on User-Defined Performance Metrics. In ADBIS. LNCS 11695, 441–456.
[5] Dominic Battré, Stephan Ewen, Fabian Hueske, Odej Kao, Volker Markl, and Daniel Warneke. 2010. Nephele/PACTs: a programming model and execution framework for web-scale analytical processing. In ACM Symposium on Cloud Computing. 119–130.
[6] Michal Bodziony. 2019. ETL in Big Data Architectures: IBM Approach to Design and Optimization of ETL Workflows (Invited talk). In DOLAP.
[7] Eric Friedman, Peter Pawlowski, and John Cieslewicz. 2009. SQL/MapReduce: A practical approach to self-describing, polymorphic, and parallelizable user-defined functions. PVLDB 2, 2 (2009), 1402–1413.
[8] Gartner. 2019. Magic Quadrant for Data Integration Tools.
[9] Philipp Große, Norman May, and Wolfgang Lehner. 2014. A study of partitioning and parallel UDF execution with the SAP HANA database. In SSDBM. 36.
[10] Ramanujam Halasipuram, Prasad M. Deshpande, and Sriram Padmanabhan. 2014. Determining Essential Statistics for Cost Based Optimization of an ETL Workflow. In EDBT. 307–318.
[11] Herodotos Herodotou, Harold Lim, Gang Luo, Nedyalko Borisov, Liang Dong, Fatma Bilgen Cetin, and Shivnath Babu. 2011. Starfish: A Self-tuning System for Big Data Analytics. In CIDR, Vol. 11. 261–272.
[12] Fabian Hueske, Mathias Peters, Aljoscha Krettek, Matthias Ringwald, Kostas Tzoumas, Volker Markl, and Johann-Christoph Freytag. 2013. Peeking into the optimization of data flow programs with mapreduce-style udfs. In ICDE. 1292–1295.
[13] Fabian Hueske, Mathias Peters, Matthias J. Sax, Astrid Rheinländer, Rico Bergmann, Aljoscha Krettek, and Kostas Tzoumas. 2012. Opening the black boxes in data flow optimization. PVLDB 5, 11 (2012), 1256–1267.
[14] Petar Jovanovic, Oscar Romero, Alkis Simitsis, and Alberto Abelló. 2016. Incremental Consolidation of Data-Intensive Multi-Flows. IEEE TKDE 28, 5 (2016), 1203–1216.
[15] Anastasios Karagiannis, Panos Vassiliadis, and Alkis Simitsis. 2013. Scheduling strategies for efficient ETL execution. Information Syst. 38, 6 (2013), 927–945.
[16] Nitin Kumar and P. Sreenivasa Kumar. 2010. An Efficient Heuristic for Logical Optimization of ETL Workflows. In VLDB Workshop on Enabling Real-Time Business Intelligence. 68–83.
[17] Rao Lella. 2014. Optimizing BDFS jobs using InfoSphere DataStage Balanced Optimization. IBM white paper: Developer Works.
[18] Xiufeng Liu and Nadeem Iftikhar. 2015. An ETL optimization framework using partitioning and parallelization. In ACM SAC. 1015–1022.
[19] Xiufeng Liu, Christian Thomsen, and Torben Bach Pedersen. 2012. MapReduce-based Dimensional ETL Made Easy. PVLDB 5, 12 (2012), 1882–1885.
[20] Xiufeng Liu, Christian Thomsen, and Torben Bach Pedersen. 2013. ETLMR: A Highly Scalable Dimensional ETL Framework Based on MapReduce. Trans. Large-Scale Data- and Knowledge-Centered Systems 8 (2013), 1–31.
[21] Astrid Rheinländer, Arvid Heise, Fabian Hueske, Ulf Leser, and Felix Naumann. 2015. SOFA: An extensible logical optimizer for UDF-heavy data flows. Information Syst. 52 (2015), 96–125.
[22] Philip Russom. 2017. Data Lakes: Purposes, Practices, Patterns, and Platforms. TDWI white paper.
[23] Alkis Simitsis, Panos Vassiliadis, and Timos Sellis. 2005. Optimizing ETL Processes in Data Warehouses. In ICDE. 564–575.
[24] Alkis Simitsis, Panos Vassiliadis, and Timos K. Sellis. 2005. State-Space Optimization of ETL Workflows. IEEE TKDE 17, 10 (2005), 1404–1419.
[25] Alejandro A. Vaisman and Esteban Zimányi. 2014. Data Warehouse Systems - Design and Implementation. Springer.
[26] Informatica white paper. 2007. How to Achieve Flexible, Cost-effective Scalability and Performance through Pushdown Processing.
[27] IBM white paper. 2008. IBM InfoSphere DataStage Balanced Optimization. Information Management Software.