=Paper=
{{Paper
|id=Vol-2929/paper2
|storemode=property
|title=Extreme-Scale Interactive Cross-Platform Streaming Analytics -- The INFORE Approach
|pdfUrl=https://ceur-ws.org/Vol-2929/paper2.pdf
|volume=Vol-2929
|authors=Antonios Deligiannakis,Nikos Giatrakos,Yannis Kotidis,Vasilis Samoladas,Alkis Simitsis
|dblpUrl=https://dblp.org/rec/conf/vldb/DeligiannakisGK21
}}
==Extreme-Scale Interactive Cross-Platform Streaming Analytics -- The INFORE Approach==
Antonios Deligiannakis, Athena Research Center and Technical University of Crete, adeli@athenarc.gr

Nikos Giatrakos, Athena Research Center and Technical University of Crete, ngiatrakos@athenarc.gr

Yannis Kotidis, Athens University of Economics and Business, kotidis@aueb.gr

Vasilis Samoladas, Athena Research Center and Technical University of Crete, vsam@softnet.tuc.gr

Alkis Simitsis, Athena Research Center, alkis@athenarc.gr
ABSTRACT

Nowadays, analytics are at the core of many emerging applications and can greatly benefit from the abundance of data and the progress in the processing capabilities of modern hardware. Still, new challenges arise with the extreme complexity of designing applications that exploit such offerings to their full potential. In this work, we describe the main challenges in delivering advanced, cross-platform, streaming analytics at scale with the primary goal of interactivity. We present the architecture of a prototype system designed from scratch to tackle these challenges and describe the internals of its components. Finally, we discuss key added-value synergies upon serving target analytics workflows in real-world applications and present a preliminary performance evaluation.

Reference Format:
Antonios Deligiannakis, Nikos Giatrakos, Yannis Kotidis, Vasilis Samoladas, and Alkis Simitsis. Extreme-Scale Interactive Cross-Platform Streaming Analytics – The INFORE Approach. In the 2nd Workshop on Search, Exploration, and Analysis in Heterogeneous Datastores (SEA Data 2021).

Copyright © 2021 for the individual papers by the papers' authors. Copyright © 2021 for the volume as a collection by its editors. This volume and its papers are published under the Creative Commons License Attribution 4.0 International (CC BY 4.0). Published in the Proceedings of the 2nd Workshop on Search, Exploration, and Analysis in Heterogeneous Datastores, co-located with VLDB 2021 (August 16-20, 2021, Copenhagen, Denmark) on CEUR-WS.org.

1 INTRODUCTION

Interactive analytics at scale lie at the core of many diverse applications. In life sciences, studying the effect of drug applications on simulated tumors may produce data streams of 100Gb/min [8]. These streams need to be analyzed online to interactively determine successive drug combinations. In the financial domain, NYSE alone generates several terabytes of stock data per day [6]. Interactive analytics enable timely reaction to investment opportunities or risks. In maritime surveillance, one needs to fuse heterogeneous, voluminous data composed of position streams of thousands of vessels, satellite images, and thermal camera or acoustic signal streams of on-site devices to investigate ongoing illegal activities at sea [14].

Relevant data not only originate from a variety of heterogeneous sources but also often arrive at multiple, geo-dispersed clusters. In life sciences, several efforts aim at federating analytics across data centers (https://elixir-europe.org/about-us, https://commonfund.nih.gov/bd2k/, https://cyverse.org/). Similarly, discovering dependencies or correlations among economies or industries involves global analysis of stock streams originating from various local market data centers. In maritime data analysis, streams come from a number of ground- or satellite-based receivers before being ingested in proximate data centers. Additionally, Big Data technologies are significantly fragmented. Each of these scenarios involves delivering analytics over networked clusters, each hosting a variety of Big Data platforms.

Typically, such scenarios are designed as streaming analytics workflows. Our goal is to provide a solution for executing global, cross-platform, streaming analytics workflows over a network of computer clusters hosting diverse Big Data technologies, with high interactivity, i.e., continuously deriving the output of the workflow in real time and adapting, at runtime, to ad-hoc changes required due to observed results or system performance metrics.

To this end, streaming Big Data platforms such as Apache Spark, Flink, and Kafka (https://spark.apache.org/, https://flink.apache.org/, https://kafka.apache.org/) approach the problem by ensuring horizontal scalability, i.e., by scaling out (parallelizing) the computation to a number of Virtual Machines (VMs) and providing APIs to program distributed Big Data processing pipelines. These platforms are not designed to support cross-platform execution scenarios. They also provide no, or suboptimal, support for online Machine Learning (ML) or Data Mining (DM) operators, since the major ML APIs they provide do not focus on streaming data. Frameworks such as Apache Beam or Apache SAMOA (https://beam.apache.org/, https://samoa.incubator.apache.org/) allow for creating code that is executable in different, supported Big Data platforms. Nevertheless, this does not amend the lack of resource management in multi-cluster and cross-platform settings.

Related systems such as Rheem [2], Ires [4], BigDawg [5], and Musketeer [9] are designed towards cross-platform execution of global workflows, but they can only handle batch, instead of stream, processing pipelines. Even in systems like BigDawg or Rheem, which support stream processing in S-Store [13] and JavaStreams respectively, cross-streaming-platform execution remains unsupported.

Our work aims at filling the gap in handling efficiently advanced, cross-platform, interactive analytics at scale. We describe the architecture of an open-source system, namely INFORE (https://bitbucket.org/infore_research_project/), designed from scratch for this purpose. A recent INFORE demo [7] (Best Demo
Award at CIKM 2020) showcases how code-free, cross-platform analytics are feasible through user-friendly, graphical design and execution of streaming workflows. [7] presents the frontend, while in the current paper we focus on the backend. We detail its key features, three of its core components, and inter-component synergies in real-world applications. We also present a preliminary performance evaluation.

2 OVERVIEW OF THE APPROACH

Requirements and Key Features. Based on our interaction with real-world practitioners and applications, early in INFORE's design we identified three scalability requirements that should complement the interactivity and cross-streaming-platform execution features.

Enhanced horizontal scalability. Parallelization alone is not adequate to handle extreme-scale scenarios. The number of available VMs in private clouds has a limit when it comes to extreme scale, while resorting to cloud providers incurs monetary charges.

Vertical scalability. The computation must scale with the number of processed streams. For instance, computing the correlation matrix of stock data streams results in a quadratic explosion in space and computational complexity, which is infeasible for the thousands or millions of streams in our motivating scenarios.

Federated scalability. The computation must scale in settings where data arrive at multiple, geo-dispersed sites. In such settings, even when we do everything right within each cluster, the potential for interactivity is network bound [10].

Overview of Key Components. INFORE employs three core components to realize the aforementioned key requirements.

Synopses Data Engine (SDE). There is a wide consensus in stream processing [1] that approximate but rapid answers to analytics tasks, more often than not, suffice. Synopses, such as samples, sketches, and histograms [1], provide approximate answers, with accuracy guarantees, to popular (cardinality, frequency moment, correlation, set membership, quantile) analytic operators. The maintenance and use of data synopses is essential to stream processing applications because applications get only one look at the data. Synopses are used as an approximate version of the infinite data which, due to its small size, can be processed and iterated upon without the need to read the entire, potentially infinite, data stream multiple times.

The SDE builds and maintains such synopses across platforms and clusters. It achieves enhanced horizontal scalability because synopses shed the load assigned to each VM in a cluster and reduce memory usage. For federated scalability, it reduces communication by transmitting compact data summaries. For interactivity, we add a unique SDE-as-a-Service (SDEaaS) design where the SDE is a constantly running service (a job, in one or more clusters) that can accept any query, synopsis creation on new streams, or plugged-in code for new synopses at runtime, with zero downtime for running workflows. For vertical scalability, the SDEaaS design maintains thousands of synopses for thousands of streams, in settings where prior efforts [15, 16, 18, 19] can maintain only a few tens of synopses. We henceforth use the terms SDE and SDEaaS interchangeably.

Online Machine Learning and Data Mining (OMLDM). OMLDM applies distributed, online ML and DM adopting a Parameter Server (PS) paradigm [12] (Figure 1(e)). The PS paradigm enhances horizontal and federated scalability via the option of an asynchronous (besides synchronous) synchronization policy that reduces the effect of stragglers and bandwidth consumption [12]. Regarding interactivity, the OMLDM component allows one to interactively adjust the number of learners and to deploy the current up-to-date ML/DM model at the PS side, should concept drift occur, at the very same time the training process in parallel learners proceeds. Established synergies with the SDE and the Optimizer components also boost vertical and federated scalability.

Optimizer Component. The Optimizer is a prerequisite to effectively using the SDE and OMLDM components towards interactivity. The Optimizer picks (a) the networked cluster, (b) the Big Data platforms available in that cluster, and (c) the provisioned resources for each SDE, OMLDM (or other) operator of the global workflow. A good solution is determined taking into account performance criteria and cost models related to interactivity, engaging throughput, (network and processing) latency, memory usage, etc. These components cooperate as follows (more in Section 4).

Synopses-based Optimization. Given a workflow, the Optimizer produces alternative physical execution plans in which it replaces exact (e.g., count, window, correlation) operators with equivalent but approximate ones from the SDE. It then provides suggestions for equivalent, approximate workflows accompanied by the expected interactivity (execution speed-up) vs. accuracy trade-off.

Boosting Vertical Scalability. The SDE component boosts the vertical scalability of the OMLDM component in ways that are not possible otherwise. Indicatively, we use Discrete Fourier Transform (DFT) and Locality Sensitive Hashing (LSH) bitmaps [11] to bucketize (hash) streams during correlation matrix computation, outlier detection, clustering, or feature extraction. Vertical scalability is achieved by hashing streams to learners based on their DFT or LSH coefficients. The complexity of all these operators is reduced since computations are pruned for streams that do not end up nearby in the hashing space.

Pumped Parameter Server. With the help of the Optimizer, we extend the OMLDM component with an additional synchronization policy, besides the synchronous and asynchronous communication entailed by the classic PS design [12]. We incorporate a novel communication protocol, termed Functional Geometric Monitoring (FGM) [17]. This protocol strengthens horizontal (within a cluster) and federated scalability by bridging the gap between synchronous and asynchronous communication. Instead of having learners communicate in predefined rounds/batches (synchronous) or whenever each one is updated (asynchronous), FGM requires communication only when a concept drift is likely to have occurred.

INFORE Architecture. We developed a prototype of the INFORE system incorporating operators from the SDE, OMLDM, and stream transformations provided by the streaming APIs of Big Data platforms such as Apache Flink or Spark. An overview of the INFORE architecture is illustrated in Figure 1(a). Currently, INFORE supports streaming analytics workflows over Apache Flink, Spark Structured Streaming, and Kafka. Cross-(Big Data) platform and cross-cluster communication is performed via JSON-formatted Kafka messages. In a nutshell, INFORE works as follows:

(Step 1) The user designs in a Graphical Editor (left of Figure 1(a)) the desired workflow, composed of stream transformation, OMLDM, and SDE operators. These are logical operators composing the logical view of the workflow, independent of the underlying platforms.
[Figure 1 omitted: (a) Overall INFORE Architecture; (b) SDEaaS Component (Condensed View); (c) Optimizer Component; (d) OMLDM Component (Condensed View); (e) Parameter Server View.]
Figure 1: Overall INFORE Architecture & Internal Architecture of Key Components
(Step 2) When the designed workflow is submitted, a JSON file is conveyed to a Manager module (middle of Figure 1(a)). Initially, the Manager sends the JSON to the Optimizer (right of Figure 1(a)).

(Step 3) The Optimizer determines the physical view of the workflow, performing a mapping of the logical operators to physical ones to be executed on specific network clusters and Big Data platforms, so that various performance criteria are optimized.

(Step 4) The Optimizer returns a JSON with physical operators to the Manager, which compiles .jar files deploying stream transformation, OMLDM, and SDE operators (top of Figure 1(a)) to be submitted at the chosen (cluster, Big Data platform) pairs.

(Step 5) The submitted jobs are monitored and performance statistics are collected.

3 KEY-COMPONENT INTERNALS

This section elaborates on INFORE's key components. For space considerations, we describe the concepts using Flink as an example implementation, but the design is generic and supports operators in all streaming platforms mentioned above.

3.1 SDE Component

The architecture of the SDE component is illustrated in Figure 1(b). In streaming settings, multiple workflows may run continuously, providing output streams to application interfaces for timely decision making. The SDEaaS design we adopt (see Section 2) can serve multiple workflows, running concurrently and continuously, with approximate, commonly used operators (e.g., counts, sums, frequency moments), reusing, instead of duplicating, streams and synopses common to workflows. Moreover, for every new synopsis maintenance request, SDEaaS reserves new execution tasks for the operators in Figure 1(b), while without SDEaaS we would reserve entire threads (task slots, in Flink terms). In that way, SDEaaS manages thousands of synopses, on demand, for thousands of streams, ensuring vertical scalability [11].

To briefly showcase the performance benefits of our approach, Figure 3(a) shows the results of an experiment over a cluster with 40 task slots. (The hardware details are not important for this experiment.) We compare SDEaaS to a non-SDEaaS approach, such as Proteus [16], that extends Flink with data synopsis utilities but lacks the SDEaaS paradigm. We design an experiment where we start by maintaining 2 Count-Min sketches for frequency estimations on the volume and price pairs of financial data (stocks). Then, we express demands for maintaining more Count-Min sketches, up to 5000 sketches/stocks. We do that without stopping the already running synopses. Figure 3(a) shows the sum of throughputs of all running jobs for non-SDEaaS and the throughput of SDEaaS. Non-SDEaaS starts new jobs for new synopses and assigns at least one entire task slot to each new job. Hence, the 40 task slots in our cluster are depleted, and the ✘ signs in Figure 3(a) denote that non-SDEaaS cannot maintain more than 40 synopses. SDEaaS has no such limit since it assigns new tasks to a single running service (job) at runtime, with zero downtime. SDEaaS also exhibits higher throughput than non-SDEaaS due to fine-grained resource utilization at the task, instead of task slot, level.

The extensible SDE Library currently includes a variety of synopsis techniques for cardinality, frequency moment, correlation, set membership, or quantile estimation [1]. Please check [11] for further details. The SDE API allows on-the-fly requests for building/stopping synopses, plugging in code for new synopses in the SDE Library, and ad-hoc and continuous queries. To enable the SDEaaS design for interactivity and vertical scalability, the SDE Library is built using subtype polymorphism and inheritance in Java. This is the key to allowing new synopsis code to be loaded at runtime, and it also allows all requests to be issued on-the-fly by the Manager module, instead of deploying separate jobs. Every new synopsis simply extends a Synopsis class with its detailed algorithms for add (streaming updates), estimate (query answering), and merge, which outputs an estimation for mergeable synopses [1] kept at multiple workers or clusters.

When a synopsis operator of the SDE is included in a workflow, every request arriving via the RequestTopic in Figure 1(b) for building or querying a running synopsis follows the red-colored path. The RegisterSynopsis and RegisterRequest FlatMaps initially produce and then look up the same keys pointing to the workers that should maintain the synopsis, but for a different purpose each. RegisterSynopsis uses these keys to direct streaming tuples, which follow the blue-colored path, to the assigned workers to update the synopsis. RegisterRequest uses the same keys to direct queries to those workers and derive estimations at the estimate FlatMap by reading the status of the synopsis kept at the add FlatMap.

When a streaming update arrives at the Kafka DataTopic, the FlatMap termed HashData looks up the keys of workers produced by RegisterSynopsis and directs the update to the proper workers. There, the add FlatMap incorporates the update into the maintained synopsis. For instance, in case an FM sketch is maintained [1], the add operation hashes the incoming tuple to a position of the maintained bitmap and turns the corresponding bit to 1 if it is not already set. Upon a query, the estimate FlatMap uses the lowest unset bit of that bitmap to derive a cardinality estimation [1].

The rest of the operators on the right-hand side of Figure 1(b) are used for appropriately controlling the output topics. The splitter (split operator in Flink) directs the estimation to the OutputTopic if a synopsis is defined on a single stream handled by one worker (green path). If a synopsis engages many streams (e.g., many stocks) maintained at multiple workers (purple path) or multiple clusters (yellow path; clusters communicate via the UnionTopic of Kafka), the merge FlatMap synthesizes the estimations (for instance, executing a logical disjunction on various FM sketches [1]) before outputting the estimation via the OutputTopic.

3.2 OMLDM Component

The OMLDM component fills the gap left by existing batch-processing-focused APIs such as Spark's MLlib or FlinkML. Its internal architecture is illustrated in Figure 1(d). Two types of pipelines are engaged in the OMLDM architecture: training and prediction pipelines.

Training pipelines. Training data streams follow the blue-colored path of Figure 1(d). They are ingested via the TrainingTopic. The rest of the training pipeline is implemented by a pair of Learner and ParameterServer FlatMaps. We follow a Parameter Server (PS) distributed model (Figure 1(e)), where an interactively defined number of identical learners distribute amongst themselves the computational load of training on an incoming data stream of training samples. In a nutshell, the PS holds and updates the state of the learning process, including the current version of the trained model. Learners compute the updates to the model and coordinate via the PS. This is because the iterative nature of ML/DM computations requires communication between all parallel workers, and this is done by the PS in each pipeline. Training pipelines are data sinks, that is, they do not generate an output stream. However, they do have a Feedback Kafka topic to support the iteration required by the involved tasks. The Feedback topic is also read by the predictors of the red path, discussed below, to update their models upon a concept drift. For instance, should training on labeled streams of a classification task take place on par with prediction (tagging unlabeled streams), in case of a concept drift the PS sends a new model to the predictors via the FeedbackTopic.

For training pipelines, it is the pipeline's responsibility to coordinate the processing between the learners and the PS. As noted in Section 2, the supported coordination policies include synchronous, asynchronous, and FGM, a novel and bandwidth-efficient coordination policy that we introduced in [17]. Performance-wise, the synchronous policy does not encourage enhanced horizontal scalability because, when many learners are used, total utilization is usually low should even a few stragglers exist. The asynchronous one is the policy of choice in large-scale ML; the processing speed is much higher when many learners are used, and therefore training is more scalable. The FGM policy provides both enhanced horizontal and federated scalability because (a) the learners do not need to await each other's sync in rounds or contact the PS immediately with each update, and (b) communication takes place asynchronously, but only when a concept drift may have occurred. This is achieved in synergy with the Optimizer (see Section 4).

Figure 1(d) shows an instance of the PS and learners running on a Flink cluster. The messaging interface between the Learner and the PS is handled via Kafka, and hence it is generic and independent of the underlying Big Data platform.

Prediction pipelines. These are shown with the red-colored path of Figure 1(d). Prediction pipelines do not train the models they apply; these models are loaded at the Predictor FlatMap via control messages. A model can be changed interactively by the PS at runtime, if there has been a concept drift, via the FeedbackTopic. Prediction pipelines employ some model for classification, interpolation, or inference and transform input to output streams, which can be processed further by downstream operators of a global workflow.

The control topics (green arrows and Kafka topics in Figure 1(d)) deliver control messages to and from an OMLDM job. The control API understands Create/Read/Update/Delete (CRUD) commands for each type of resource. For instance, using control messages, the OMLDM component will respond with the current status of the PS and learners (e.g., how many learners are currently running) or the currently up-to-date model at the PS.

OMLDM currently supports (i) classification: Passive Aggressive Classifier, Online Support Vector Machines, and Vertical Hoeffding Trees; (ii) clustering/outlier detection: BIRCH, Online k-Means, and StreamKM++; and (iii) regression: Passive Aggressive Regressor, Online Ridge and Polynomial Regression, and Autoregressive Integrated Moving Average. Facilities for correlation matrix computation, standardization, and polynomial feature extraction are also available.
[Figure 2 omitted: (a) Initial Workflow at the Graphical Editor; (b) Suggested Workflow via Synopses-based Optimization.]
Figure 2: Initial and transformed workflow using Synopses-based Optimization.
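In the suggested plan of Figure 2(b), the exact Count operator is replaced by an SDE.CountMin operator. To illustrate what such a synopsis computes, here is a minimal, self-contained Count-Min sketch. This is a sketch of the standard data structure, not the SDE Library's actual code; the class name and hashing scheme are our own choices.

```java
import java.util.Random;

/**
 * Minimal Count-Min sketch: d hash rows of width w. add() increments one
 * counter per row; estimate() returns the row-wise minimum, which never
 * underestimates the true count and, for suitable hashes, overestimates
 * it by more than 2N/w (N = stream length) with probability at most (1/2)^d.
 */
public class CountMin {
    private final long[][] counts;
    private final int[] seeds;
    private final int width;

    public CountMin(int depth, int width, long seed) {
        this.counts = new long[depth][width];
        this.seeds = new Random(seed).ints(depth).toArray();
        this.width = width;
    }

    private int bucket(int row, String key) {
        int h = key.hashCode() ^ seeds[row];
        h ^= (h >>> 16); // mix high bits in before reduction
        return Math.floorMod(h, width);
    }

    /** Streaming update: count one occurrence of key. */
    public void add(String key) {
        for (int r = 0; r < counts.length; r++) counts[r][bucket(r, key)]++;
    }

    /** Point query: minimum counter across rows. */
    public long estimate(String key) {
        long min = Long.MAX_VALUE;
        for (int r = 0; r < counts.length; r++)
            min = Math.min(min, counts[r][bucket(r, key)]);
        return min;
    }
}
```

In the workflow of Figure 2, such a structure would approximate the per-stock bid counts with a small, fixed memory footprint, which is what makes the Count-to-SDE.CountMin substitution attractive for interactivity.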
3.3 Optimizer Component

Given a workflow composed of various operators, we have to properly tune its execution, prescribing both the Big Data platform and the networked computer cluster, so that good overall performance with respect to interactivity-related metrics is achieved.

The internal architecture of the INFORE Optimizer is shown in Figure 1(c). A number of submodules are involved before providing the logical-to-physical operator mappings to be deployed for execution. First, we use a statistics collector to derive performance measurements from each executed workflow. Statistics are collected via JMX (https://docs.oracle.com/javase/tutorial/jmx/overview/) or Slurm (https://slurm.schedmd.com/) and are ingested in an ELK stack (https://www.elastic.co/what-is/elk-stack) while monitoring jobs.

We have developed a Benchmarking submodule that automates the acquisition of performance metrics for every supported operator run on different Big Data platforms. The Benchmarking submodule circumvents steps 1-3 in Figure 1(a) and enables direct job submission to the underlying clusters, engaging supported operators and stream sources. In that way, the execution of various operators and (parts of) workflows on different (Big Data platform, cluster) pairs is benchmarked a priori, statistics are collected, and performance models are built to avoid a cold start.

Cost models are derived with a Bayesian Optimization approach inspired by CherryPick [3]. From the statistics we have collected from micro-benchmarks and/or actual workflow executions, we form a Gaussian performance/cost prediction model for the various operators and (parts of) workflows executed on different (Big Data platform, cluster) pairs. The cost model is incrementally improved with new workflow executions.

Algorithmically, the Optimizer solves a multi-criteria optimization problem with objectives involving throughput, communication cost, processing and network latency, CPU/GPU and memory usage, as well as accuracy for synopses-based optimization purposes. Due to conflicting objectives, we cannot optimize them all simultaneously. Therefore, we resort to Pareto-optimal solutions obtained by maximizing a weighted combination of these objectives under the clusters' capacity constraints.

Notably, no prior effort [2, 4, 5, 9] examined such a rich set of optimization objectives related to interactivity. Indicatively, only [2] considers network performance metrics (but for batch processing scenarios). To derive Pareto-optimal solutions, besides a baseline Exhaustive Search algorithm that examines every possible mapping of logical to physical operators in a workflow, we have developed a novel variation of an A*-like algorithm. The classic A* algorithm cannot work in our setup as is, because it can only suggest optimal paths. This would mean that some logical operators would be left without a suggested physical operator mapping. Our novel A* variation uses heuristics equivalent to those of typical A* to prune the search space, but outputs a graph (instead of a path) with physical operators instantiating every logical operator in the submitted workflow. This new algorithm can speed up the suggestion of the optimal physical workflow by orders of magnitude (see Section 4). We have also developed a Greedy and a Heuristic algorithm that leverage parallelism and domain knowledge to boost the performance further and to enable adaptivity (i.e., provide, in real time and incrementally, a new plan to adapt to ad-hoc performance hiccups). All algorithms employ the cost estimation models to evaluate alternative physical operators while exploring the space of alternative mappings.

4 SYNERGIES IN A REAL CASE STUDY

This section describes how our prototype works in a real use-case scenario from the financial domain and how advanced synergies are established among individual components. For exhibition purposes, we employ a single cluster equipped with Flink and Spark.

The logical workflow submitted to the Optimizer is illustrated in Figure 2(a). The workflow utilizes Level 1 and Level 2 stock data (www.investopedia.com/terms/l/level1.asp, www.investopedia.com/terms/l/level2.asp) to discover highly positively/negatively correlated pairs of stocks. In Figure 2(a), data arrive at a Kafka source. The Split operator separates Level 1 from Level 2 data. It directs Level 2 data to the bottom branch of the workflow. There, the Level 2 bids are Filtered (i.e., for monitoring only a subset of stocks). The bids per stock are counted using a Count logical operator. When a trade for a stock is realized, a new Level 1 tuple is directed by Split to the upper part of the workflow. A Project operator keeps only the timestamp of the trade for each stock. The Join operator joins the stock trade (Level 1 tuple) with the count of bids the stock received until the trade. The result is inserted in a time Window of recent such counts, forming a time series. Finally, the CorrelationMatrix of the stocks' time series is computed by the respective OMLDM operator.

Synopses-based Optimization. When the workflow is submitted, for every logical operator there are two platform choices, Flink or Spark. The Optimizer holds a dictionary to determine the equivalence of logical operators to physical ones in either platform. In Synopses-based Optimization, the Optimizer attempts to boost interactivity by treating the SDE as an additional platform. That is, its dictionary also holds mappings about which exact operators could be replaced by approximate ones of the SDE.
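The dictionary of logical-to-physical operator equivalences, including the approximate SDE substitutes, can be pictured as a simple lookup table. The sketch below is purely illustrative (the class and method names are our own, and the real Optimizer also attaches cost information to each alternative rather than a bare name):

```java
import java.util.List;
import java.util.Map;

/**
 * Illustrative sketch of the Optimizer's dictionary idea: each logical
 * operator maps to its physical alternatives on the available platforms,
 * and exact operators additionally map to approximate SDE equivalents.
 */
public class OperatorDictionary {
    static final Map<String, List<String>> PHYSICAL = Map.of(
        "Count",  List.of("Flink.Count", "Spark.Count", "SDE.CountMin"),
        "Window", List.of("Flink.Window", "Spark.Window", "SDE.DFT"),
        "Join",   List.of("Flink.Join", "Spark.Join"));

    /** Physical candidates the plan search may pick for a logical operator. */
    static List<String> alternatives(String logicalOp) {
        return PHYSICAL.getOrDefault(logicalOp, List.of());
    }
}
```

A plan-search algorithm (Exhaustive, A*-like, Greedy, or Heuristic) would iterate over such candidate lists per logical operator and score each combination with the cost models of Section 3.3.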
[Figure 3 omitted: (a) SDEaaS vs non-SDEaaS (total throughput versus number of concurrently maintained synopses); (b) Optimizer execution time (Exhaustive and A*-like versus number of Big Data platforms, log scale); (c) Horizontal and vertical scalability (throughput ratio versus number of streams and processed volume).]
Figure 3: Performance Evaluation.
Optimizer will finally suggest an execution plan as shown in Figure 2(b). In this plan, the Count operator has been replaced by a SDE.CountMin sketch operator of the SDE and the Window operator has been replaced by SDE.DFT. All physical operators are prescribed to be executed over Flink, i.e., where the SDE service runs.

Boosting Vertical Scalability. The SDE.DFT operator of the SDE, for every window it approximates, outputs a pair of values: a bucketID and a vector of reduced dimensionality compared to the original window. The bucketID is based on the first few coefficients of the DFT [11] and is used as the key to hash stock streams to learners of the CorrelationMatrix OMLDM operator (Figure 2(b)). Reduced vector correlations are pruned for stocks hashed far away in the hashing space, since these stock streams are guaranteed to be dissimilar [11]. This boosts vertical scalability for a task of quadratic complexity in the number of input streams.

Pumped Parameter Server. The stream entering the CorrelationMatrix operator of OMLDM is duplicated to the training and prediction pipelines (Figure 1(d)). Learners (buckets in our previous discussion) compute the pairwise correlations of the streams they receive (the correlation coefficient and the beta regression parameter coincide for standardized vectors), while the PS holds the current, up-to-date correlation matrix, which is essentially the global model at both the predictor and the PS sides. A concept drift for the global model occurs if the Frobenius norm measuring the difference between the matrix kept at the PS and the true correlation matrix exceeds a threshold. The Optimizer decomposes this constraint into local ones installed at each learner. Mathematical details are described in [17], but in a simple version of FGM, the Optimizer prescribes that learners sync with the PS when a signed distance value, computed locally at each learner, becomes positive. Notice that the local constraints are determined by the Optimizer, which initially prescribes and then adapts the learners at runtime.

Preliminary Experimental Evaluation. We employ a Kafka cluster with 4 Dell PowerEdge R320 Intel Xeon E5-2430 v2 2.50GHz machines with 32GB RAM and a cluster with 10 Dell PowerEdge R300 Quad Core Xeon X3323 2.5GHz machines with 8GB RAM each, for Flink and Spark Structured Streaming. We use real Level 1, Level 2 stock data (∼5000 stocks/∼10 TB) provided by a European financial company. We set the DFT to use 8 coefficients and CountMin to (𝜖 = 0.002, 𝛿 = 0.01), and we use a 0.9 correlation threshold and a time window of 5 minutes, based on input from the data provider. Figure 3(b) shows the time it takes the two optimization algorithms (Exhaustive and A*-alike) to produce a physical workflow from the time they receive the logical one, when 1 (Flink), 2 (Spark, Flink) or 3 (Spark, Flink, Synopses-based Optimization) platforms are available. As the figure demonstrates, for more than 1 platform our novel A*-alike algorithm reduces the time to prescribe a physical workflow by 1 to 4 orders of magnitude. For 1 platform (where the Optimizer still prescribes provisioned resources), the Exhaustive Search algorithm is faster, since the A*-alike one requires more initialization time.

In Figure 3(c) we show the scalability of the INFORE approach versus a technique that executes the workflow of Figure 2(a) in Flink (denoted by "Parallelism"), without synopses-based optimization, boosted vertical scalability, or the OMLDM Component. INFORE executes the workflow of Figure 2(b). We plot the throughput ratio of INFORE and "Parallelism" over a Naive approach that executes the same workflow with parallelism set to 1. The two horizontal axes in the figure show the number of processed stock streams (bottom) and the volume in terabytes (top) ingested using the maximum Kafka rate. Thus, the top axis accounts for horizontal scalability (volume, speed), while the bottom axis accounts for vertical scalability.

When we monitor 500 streams, i.e., 250K/2 cells in the correlation matrix, the fact that "Parallelism" lacks synopses-based optimization and vertical scalability makes its throughput ratio approach 1, i.e., that of the Naive approach. INFORE's performance remains steady when switching from 50 to 500 streams, improving on "Parallelism" by 3 times. The most important findings come upon switching to 5000 stocks (25M/2 cells in the matrix). Figure 3(c) shows that, because of the lack of enhanced horizontal and vertical scalability, "Parallelism" becomes equivalent to the Naive approach. INFORE exhibits more than an order of magnitude improved throughput, with the trade-off of highly correlated pairs being identified with 90% accuracy.

For federated scalability, we measure the bytes communicated among the operators. Due to space constraints, we only report here that INFORE can reduce communication by more than an order of magnitude when 5000 stock streams are being monitored.

5 CONCLUSIONS & ONGOING WORK
We have presented the design, internal structure, and integration aspects of key components of INFORE, a prototype designed for interactive analytics at scale. To our knowledge, INFORE is the first system for cross-platform, streaming analytics. We currently enrich the SDE and OMLDM libraries to strengthen the support for the three scalability types and to enhance analytics capabilities.
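The FGM-style synchronization rule discussed in this section (learners sync with the PS once a locally computed signed distance becomes positive) admits a much-simplified illustration. The sketch below is not the protocol of [17]: it replaces FGM's geometric machinery with a conservative triangle-inequality decomposition of the global Frobenius-norm drift threshold, and all class and function names are our own:

```python
import numpy as np

class Learner:
    """Simplified local monitor: tracks the drift of its slice of the
    correlation matrix since the last sync with the parameter server."""
    def __init__(self, shape, budget):
        self.drift = np.zeros(shape)   # accumulated local change since last sync
        self.budget = budget           # local share of the global threshold

    def update(self, delta):
        self.drift += delta
        # Signed distance: becomes positive once the local drift alone
        # could endanger the global Frobenius-norm constraint.
        return np.linalg.norm(self.drift, "fro") - self.budget

def decompose_threshold(global_threshold, k):
    """Conservative decomposition via the triangle inequality:
    ||sum_i D_i||_F <= sum_i ||D_i||_F, so if each of the k learners
    keeps ||D_i||_F <= T/k, the global drift stays below T."""
    return global_threshold / k
```

In this toy version, a positive return value from update triggers a sync with the PS, which merges the drifts, refreshes the global model, and rebroadcasts new local budgets; the actual FGM protocol achieves far fewer syncs by exploiting the geometry of the monitored function.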
ACKNOWLEDGMENTS
This work has received funding from the EU Horizon 2020 research and innovation program INFORE under grant agreement No 825070.

REFERENCES
[1] 2016. Data Stream Management - Processing High-Speed Data Streams. Springer. https://doi.org/10.1007/978-3-540-28608-0
[2] Divy Agrawal, Sanjay Chawla, Bertty Contreras-Rojas, Ahmed K. Elmagarmid, Yasser Idris, Zoi Kaoudi, Sebastian Kruse, Ji Lucas, Essam Mansour, Mourad Ouzzani, Paolo Papotti, Jorge-Arnulfo Quiané-Ruiz, Nan Tang, Saravanan Thirumuruganathan, and Anis Troudi. 2018. RHEEM: Enabling Cross-Platform Data Processing - May The Big Data Be With You! -. Proc. VLDB Endow. 11, 11 (2018), 1414–1427. https://doi.org/10.14778/3236187.3236195
[3] Omid Alipourfard, Hongqiang Harry Liu, Jianshu Chen, Shivaram Venkataraman, Minlan Yu, and Ming Zhang. 2017. CherryPick: Adaptively Unearthing the Best Cloud Configurations for Big Data Analytics. In 14th USENIX Symposium on Networked Systems Design and Implementation, NSDI 2017, Boston, MA, USA, March 27-29, 2017, Aditya Akella and Jon Howell (Eds.). USENIX Association, 469–482. https://www.usenix.org/conference/nsdi17/technical-sessions/presentation/alipourfard
[4] Katerina Doka, Nikolaos Papailiou, Dimitrios Tsoumakos, Christos Mantas, and Nectarios Koziris. 2015. IReS: Intelligent, Multi-Engine Resource Scheduler for Big Data Analytics Workflows. In Proceedings of the 2015 ACM SIGMOD International Conference on Management of Data, Melbourne, Victoria, Australia, May 31 - June 4, 2015, Timos K. Sellis, Susan B. Davidson, and Zachary G. Ives (Eds.). ACM, 1451–1456. https://doi.org/10.1145/2723372.2735377
[5] Aaron J. Elmore, Jennie Duggan, Mike Stonebraker, Magdalena Balazinska, Ugur Çetintemel, Vijay Gadepally, Jeffrey Heer, Bill Howe, Jeremy Kepner, Tim Kraska, Samuel Madden, David Maier, Timothy G. Mattson, Stavros Papadopoulos, Jeff Parkhurst, Nesime Tatbul, Manasi Vartak, and Stan Zdonik. 2015. A Demonstration of the BigDAWG Polystore System. Proc. VLDB Endow. 8, 12 (2015), 1908–1911. https://doi.org/10.14778/2824032.2824098
[6] Forbes. 2021 (last accessed). https://www.forbes.com/sites/tomgroenfeldt/2013/02/14/at-nyse-the-data-deluge-overwhelms-traditional-databases/#362df2415aab.
[7] Nikos Giatrakos, David Arnu, Theodoros Bitsakis, Antonios Deligiannakis, Minos N. Garofalakis, Ralf Klinkenberg, Aris Konidaris, Antonis Kontaxakis, Yannis Kotidis, Vasilis Samoladas, Alkis Simitsis, George Stamatakis, Fabian Temme, Mate Torok, Edwin Yaqub, Arnau Montagud, Miguel Ponce de Leon, Holger Arndt, and Stefan Burkard. 2020. INforE: Interactive Cross-platform Analytics for Everyone. In CIKM '20: The 29th ACM International Conference on Information and Knowledge Management, Virtual Event, Ireland, October 19-23, 2020. ACM, 3389–3392. https://doi.org/10.1145/3340531.3417435
[8] Nikos Giatrakos, Nikos Katzouris, Antonios Deligiannakis, Alexander Artikis, Minos N. Garofalakis, George Paliouras, Holger Arndt, Raffaele Grasso, Ralf Klinkenberg, Miguel Ponce de Leon, Gian Gaetano Tartaglia, Alfonso Valencia, and Dimitrios Zissis. 2019. Interactive Extreme-Scale Analytics Towards Battling Cancer. IEEE Technol. Soc. Mag. 38, 2 (2019), 54–61. https://doi.org/10.1109/MTS.2019.2913071
[9] Ionel Gog, Malte Schwarzkopf, Natacha Crooks, Matthew P. Grosvenor, Allen Clement, and Steven Hand. 2015. Musketeer: all for one, one for all in data processing systems. In Proceedings of the Tenth European Conference on Computer Systems, EuroSys 2015, Bordeaux, France, April 21-24, 2015, Laurent Réveillère, Tim Harris, and Maurice Herlihy (Eds.). ACM, 2:1–2:16. https://doi.org/10.1145/2741948.2741968
[10] Jeyhun Karimov, Tilmann Rabl, Asterios Katsifodimos, Roman Samarev, Henri Heiskanen, and Volker Markl. 2018. Benchmarking Distributed Stream Data Processing Systems. In 34th IEEE International Conference on Data Engineering, ICDE 2018, Paris, France, April 16-19, 2018. IEEE Computer Society, 1507–1518. https://doi.org/10.1109/ICDE.2018.00169
[11] Antonis Kontaxakis, Nikos Giatrakos, and Antonios Deligiannakis. 2020. A Synopses Data Engine for Interactive Extreme-Scale Analytics. In CIKM '20: The 29th ACM International Conference on Information and Knowledge Management, Virtual Event, Ireland, October 19-23, 2020. ACM, 2085–2088. https://doi.org/10.1145/3340531.3412154
[12] Mu Li, David G. Andersen, Jun Woo Park, Alexander J. Smola, Amr Ahmed, Vanja Josifovski, James Long, Eugene J. Shekita, and Bor-Yiing Su. 2014. Scaling Distributed Machine Learning with the Parameter Server. In 11th USENIX Symposium on Operating Systems Design and Implementation, OSDI '14, Broomfield, CO, USA, October 6-8, 2014, Jason Flinn and Hank Levy (Eds.). USENIX Association, 583–598. https://www.usenix.org/conference/osdi14/technical-sessions/presentation/li_mu
[13] John Meehan, Stan Zdonik, Shaobo Tian, Yulong Tian, Nesime Tatbul, Adam Dziedzic, and Aaron J. Elmore. 2016. Integrating real-time and batch processing in a polystore. In 2016 IEEE High Performance Extreme Computing Conference, HPEC 2016, Waltham, MA, USA, September 13-15, 2016. IEEE, 1–7. https://doi.org/10.1109/HPEC.2016.7761585
[14] Aristides Milios, Konstantina Bereta, Konstantinos Chatzikokolakis, Dimitris Zissis, and Stan Matwin. 2019. Automatic Fusion of Satellite Imagery and AIS data for Vessel Detection. In 22nd International Conference on Information Fusion, FUSION 2019, Ottawa, ON, Canada, July 2-5, 2019. IEEE, 1–5. https://ieeexplore.ieee.org/document/9011339
[15] Barzan Mozafari. 2019. SnappyData. In Encyclopedia of Big Data Technologies, Sherif Sakr and Albert Y. Zomaya (Eds.). Springer. https://doi.org/10.1007/978-3-319-63962-8_258-1
[16] Proteus Project. 2021 (last accessed). https://github.com/proteus-h2020.
[17] Vasilis Samoladas and Minos N. Garofalakis. 2019. Functional Geometric Monitoring for Distributed Streams. In Advances in Database Technology - 22nd International Conference on Extending Database Technology, EDBT 2019, Lisbon, Portugal, March 26-29, 2019. OpenProceedings.org, 85–96. https://doi.org/10.5441/002/edbt.2019.09
[18] Stream-lib. 2021 (last accessed). https://github.com/addthis/stream-lib.
[19] Yahoo DataSketch. 2021 (last accessed). https://datasketches.github.io/.