Extreme-Scale Interactive Cross-Platform Streaming Analytics – The INFORE Approach

Antonios Deligiannakis (Athena Research Center & Technical University of Crete, adeli@athenarc.gr), Nikos Giatrakos (Athena Research Center & Technical University of Crete, ngiatrakos@athenarc.gr), Yannis Kotidis (Athens University of Economics and Business, kotidis@aueb.gr), Vasilis Samoladas (Athena Research Center & Technical University of Crete, vsam@softnet.tuc.gr), Alkis Simitsis (Athena Research Center, alkis@athenarc.gr)

ABSTRACT

Nowadays, analytics are at the core of many emerging applications and can greatly benefit from the abundance of data and the progress in the processing capabilities of modern hardware. Still, new challenges arise from the extreme complexity of designing applications that exploit such offerings to their full potential. In this work, we describe the main challenges in delivering advanced, cross-platform, streaming analytics at scale with the primary goal of interactivity. We present the architecture of a prototype system designed from scratch to tackle these challenges and describe the internals of its components. Finally, we discuss key added-value synergies upon serving target analytics workflows in real-world applications and present a preliminary performance evaluation.

Reference Format: Antonios Deligiannakis, Nikos Giatrakos, Yannis Kotidis, Vasilis Samoladas, and Alkis Simitsis. Extreme-Scale Interactive Cross-Platform Streaming Analytics – The INFORE Approach. In the 2nd Workshop on Search, Exploration, and Analysis in Heterogeneous Datastores (SEA Data 2021).

Copyright © 2021 for the individual papers by the papers' authors. Copyright © 2021 for the volume as a collection by its editors. This volume and its papers are published under the Creative Commons License Attribution 4.0 International (CC BY 4.0). Published in the Proceedings of the 2nd Workshop on Search, Exploration, and Analysis in Heterogeneous Datastores, co-located with VLDB 2021 (August 16-20, 2021, Copenhagen, Denmark) on CEUR-WS.org.

1 INTRODUCTION

Interactive analytics at scale lie at the core of many diverse applications. In life sciences, studying the effect of drug applications on simulated tumors may produce data streams of 100Gb/min [8]. These streams need to be analyzed online to interactively determine successive drug combinations. In the financial domain, NYSE alone generates several terabytes of stock data per day [6]. Interactive analytics enable timely reaction to investment opportunities or risks. In maritime surveillance, one needs to fuse heterogeneous, voluminous data, composed of position streams of thousands of vessels, satellite images, and thermal camera or acoustic signal streams of on-site devices, to investigate ongoing illegal activities at sea [14].

Relevant data not only originate from a variety of heterogeneous sources but also often arrive at multiple, geo-dispersed clusters. In life sciences, several efforts aim at federating analytics across data centers¹. Similarly, discovering dependencies or correlations among economies or industries involves global analysis of stock streams originating from various local market data centers. In maritime data analysis, streams come from a number of ground- or satellite-based receivers before being ingested in proximate data centers. Additionally, Big Data technologies are significantly fragmented. Each of these scenarios involves delivering analytics over networked clusters, each hosting a variety of Big Data platforms.

Typically, such scenarios are designed as streaming analytics workflows. Our goal is to provide a solution for executing global, cross-platform, streaming analytics workflows over a network of computer clusters hosting diverse Big Data technologies, with high interactivity, i.e., continuously deriving the output of the workflow in real time and adapting, at runtime, to ad-hoc changes required due to observed results or system performance metrics.

To this extent, streaming Big Data platforms such as Apache Spark, Flink and Kafka² approach the problem by ensuring horizontal scalability, i.e., by scaling out (parallelizing) the computation to a number of Virtual Machines (VMs) and providing APIs to program distributed Big Data processing pipelines. These platforms are not designed to support cross-platform execution scenarios. They also provide no or only suboptimal support for online Machine Learning (ML) or Data Mining (DM) operators, since the major ML APIs they provide do not focus on streaming data. Frameworks such as Apache Beam or Apache SAMOA³ allow for creating code that is executable on different, supported Big Data platforms. Nevertheless, this does not amend the lack of resource management in multi-cluster and cross-platform settings.

Related systems such as Rheem [2], IReS [4], BigDAWG [5] and Musketeer [9] are designed towards cross-platform execution of global workflows, but they can only handle batch, instead of stream, processing pipelines. Even in systems like BigDAWG or Rheem, which support stream processing via S-Store [13] and Java Streams respectively, cross-streaming-platform execution remains unsupported.

Our work aims at filling the gap in efficiently handling advanced, cross-platform, interactive analytics at scale. We describe the architecture of an open-source system, namely INFORE⁴, designed from scratch for this purpose. A recent INFORE demo [7] (Best Demo Award at CIKM 2020) showcases how code-free, cross-platform analytics are feasible through user-friendly, graphical design and execution of streaming workflows. [7] presents the frontend, while in the current paper we focus on the backend. We detail its key features, three of its core components and inter-component synergies in real-world applications. We also present a preliminary performance evaluation.

¹ https://elixir-europe.org/about-us, https://commonfund.nih.gov/bd2k/, https://cyverse.org/
² https://spark.apache.org/, https://flink.apache.org/, https://kafka.apache.org/
³ https://beam.apache.org/, https://samoa.incubator.apache.org/
⁴ https://bitbucket.org/infore_research_project/

2 OVERVIEW OF THE APPROACH

Requirements and Key Features. Based on our interaction with real-world practitioners and applications, early in INFORE's design we identified three scalability requirements that should complement the interactivity and cross-streaming-platform execution features.

Enhanced horizontal scalability. Parallelization alone is not adequate to handle extreme-scale scenarios. The number of available VMs in private clouds has a limit when it comes to extreme scale, while monetary charges arise when renting resources from public cloud providers.

Vertical scalability. The computation must scale with the number of processed streams. For instance, computing the correlation matrix of stock data streams results in a quadratic explosion in space and computational complexity, which is infeasible for the thousands or millions of streams in our motivating scenarios.

Federated scalability. The computation must scale in settings where data arrive at multiple, geo-dispersed sites. In such settings, even when we do everything right within each cluster, the potential for interactivity is network bound [10].

Overview of Key Components. INFORE employs three core components for realizing the aforementioned key requirements.

Synopses Data Engine (SDE). There is a wide consensus in stream processing [1] that approximate but rapid answers to analytics tasks, more often than not, suffice. Synopses, such as samples, sketches and histograms [1], provide approximate answers, with accuracy guarantees, to popular (cardinality, frequency moment, correlation, set membership, quantile) analytic operators. The maintenance and use of data synopses is essential to stream processing applications because applications get only one look at the data. Synopses are used as an approximate version of the infinite data which, due to its small size, can be processed and iterated upon without the need of reading the entire, potentially infinite, data stream multiple times.

The SDE builds and maintains such synopses across platforms and clusters. It achieves enhanced horizontal scalability because synopses shed the load assigned to each VM in a cluster and reduce memory usage. For federated scalability, it reduces communication by transmitting compact data summaries. For interactivity, we add a unique SDE-as-a-Service (SDEaaS) design where the SDE is a constantly running service (a job, in one or more clusters) that can accept any query, synopsis creation on new streams, or plugged-in code for new synopses at runtime, with zero downtime for running workflows. For vertical scalability, the SDEaaS design maintains thousands of synopses for thousands of streams, in settings where prior efforts [15, 16, 18, 19] can maintain only a few tens of synopses. We henceforth use the terms SDE and SDEaaS interchangeably.

Online Machine Learning and Data Mining (OMLDM). OMLDM applies distributed, online ML and DM adopting a Parameter Server (PS) paradigm [12] (Figure 1(e)). The PS paradigm enhances horizontal and federated scalability via the option of an asynchronous (besides synchronous) synchronization policy, which reduces the effect of stragglers and bandwidth consumption [12], respectively. Regarding interactivity, the OMLDM component allows to interactively adjust the number of learners and also to deploy the current, up-to-date ML/DM model at the PS side, should concept drifts occur, at the very same time the training process in parallel learners proceeds. Established synergies with the SDE and the Optimizer components also boost vertical and federated scalability.

Optimizer Component. The Optimizer is a prerequisite to effectively use the SDE and OMLDM components towards interactivity. The Optimizer picks (a) the networked cluster, (b) the Big Data platforms available in that cluster, and (c) the provisioned resources for each SDE, OMLDM (or other) operator of the global workflow. A good solution is determined taking into account performance criteria and cost models related to interactivity, engaging throughput, (network and processing) latency, memory usage, etc.

These components cooperate as follows (more in Section 4).

Synopses-based Optimization. Given a workflow, the Optimizer produces alternative physical execution plans where it replaces exact (e.g., count, window, correlation) operators with equivalent, but approximate, ones from the SDE. It then provides suggestions for equivalent, but approximate, workflows accompanied by the expected interactivity (execution speed-up) vs. accuracy trade-off.

Boosting Vertical Scalability. The SDE boosts vertical scalability of the OMLDM component in ways that are not possible otherwise. Indicatively, we use Discrete Fourier Transform (DFT) and Locality Sensitive Hashing (LSH) bitmaps [11] to bucketize (hash) streams during correlation matrix computation, outlier detection, clustering or feature extraction. Vertical scalability is achieved by hashing streams to learners based on their DFT or LSH coefficients. The complexity of all these operators is reduced, since computations are pruned for streams that do not end up nearby in the hashing space.

Pumped Parameter Server. With the help of the Optimizer, we extend the OMLDM component with an additional synchronization policy, besides the synchronous and asynchronous communication entailed by the classic PS design [12]. We incorporate a novel communication protocol, termed Functional Geometric Monitoring (FGM) [17]. This protocol strengthens horizontal (within a cluster) and federated scalability by bridging the gap between synchronous and asynchronous communication. Instead of having learners communicate in predefined rounds/batches (synchronous) or whenever each one is updated (asynchronous), FGM requires communication only when a concept drift is likely to have occurred.

INFORE Architecture. We develop a prototype of the INFORE system incorporating operators from the SDE, OMLDM and stream transformations provided by the streaming APIs of Big Data platforms such as Apache Flink or Spark. An overview of the INFORE architecture is illustrated in Figure 1(a). Currently, INFORE supports streaming analytics workflows over Apache Flink, Spark Structured Streaming and Kafka. Cross-(Big Data) platform and cluster communication is performed via JSON-formatted Kafka messages. In a nutshell, INFORE works as follows:

(Step 1) The user designs in a Graphical Editor (left of Figure 1(a)) the desired workflow, composed of stream transformation, OMLDM and SDE operators. These are logical operators composing the logical view of the workflow, independent of the underlying platforms.
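As an illustration of the JSON-formatted Kafka messages mentioned above, a submitted logical workflow could look roughly like the fragment below. The schema and field names are hypothetical, chosen for exposition only; they are not INFORE's actual wire format.

```json
{
  "workflowId": "stock-correlation-demo",
  "view": "logical",
  "operators": [
    { "id": "src",  "type": "KafkaSource",       "topic": "stocks" },
    { "id": "cnt",  "type": "Count",             "inputs": ["src"] },
    { "id": "corr", "type": "CorrelationMatrix", "inputs": ["cnt"] }
  ]
}
```

In the physical view returned by the Optimizer (Step 4), each such operator entry would additionally carry the chosen cluster, Big Data platform and provisioned resources.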
[Figure 1: Overall INFORE Architecture & Internal Architecture of Key Components. (a) Overall INFORE Architecture; (b) SDEaaS Component (Condensed View); (c) Optimizer Component; (d) OMLDM Component (Condensed View); (e) Parameter Server View.]

(Step 2) When the designed workflow is submitted, a JSON file is conveyed to a Manager module (middle of Figure 1(a)). Initially, the Manager sends the JSON to the Optimizer (right of Figure 1(a)).

(Step 3) The Optimizer determines the physical view of the workflow, performing a mapping of the logical operators to physical ones to be executed on specific networked clusters and Big Data platforms, so that various performance criteria are optimized.

(Step 4) The Optimizer returns a JSON with physical operators to the Manager, which compiles .jar files deploying stream transformation, OMLDM and SDE operators (top of Figure 1(a)) to be submitted to the chosen (cluster, Big Data platform) pairs.

(Step 5) The submitted jobs are monitored and performance statistics are collected.

3 KEY-COMPONENT INTERNALS

This section elaborates on INFORE's key components. For space considerations, we describe the concepts using Flink as an example implementation, but the design is generic enough to support operators in the streaming platforms mentioned above.

3.1 SDE Component

The architecture of the SDE component is illustrated in Figure 1(b). In streaming settings, multiple workflows may run continuously, providing output streams to application interfaces for timely decision making. The SDEaaS design we adopt (see Section 2) can serve multiple workflows, running concurrently and continuously, with approximate, commonly used operators (e.g., counts, sums, frequency moments), reusing, instead of duplicating, streams and synopses common to workflows. Moreover, for every new synopsis maintenance request, SDEaaS reserves new execution tasks for the operators in Figure 1(b), while without SDEaaS we would reserve entire threads (task slots, in Flink terms). In that, SDEaaS manages thousands of synopses, on demand, for thousands of streams, hence ensuring vertical scalability [11].

To briefly showcase the performance benefits of our approach, Figure 3(a) shows the results of an experiment over a cluster with 40 task slots. (The hardware details are not important for this experiment.) We compare SDEaaS to a non-SDEaaS approach, such as Proteus [16], that extends Flink with data synopsis utilities but lacks the SDEaaS paradigm. We design an experiment where we start with maintaining 2 Count-Min sketches for frequency estimations on the (volume, price) pairs of financial data (stocks). Then, we express demands for maintaining more Count-Min sketches, for up to 5000 sketches/stocks. We do that without stopping the already running synopses. Figure 3(a) shows the sum of throughputs of all running jobs for non-SDEaaS and the throughput of SDEaaS. non-SDEaaS starts new jobs for new synopses and assigns at least one entire task slot to each new job. Hence, the 40 task slots in our cluster are depleted, and the ✘ signs in Figure 3(a) denote that non-SDEaaS cannot maintain more than 40 synopses. SDEaaS has no such limit, since it assigns new tasks to a single running service (job) at runtime, with zero downtime. SDEaaS also exhibits higher throughput than non-SDEaaS due to fine-grained resource utilization at the task, instead of the task slot, level.

The extensible SDE Library currently includes a variety of synopsis techniques for cardinality, frequency moment, correlation, set membership or quantile estimation [1]. Please check [11] for further details. The SDE API allows on-the-fly requests for building/stopping synopses, plugging in code for new synopses in the SDE Library, and ad-hoc and continuous queries. To enable the SDEaaS design for interactivity and vertical scalability, the SDE Library is built using subtype polymorphism and inheritance in Java. This is the key to allowing new synopsis code to be loaded at runtime, and it also allows all requests to be issued on the fly by the Manager module, instead of deploying separate jobs. Every new synopsis simply extends a Synopsis class with its detailed algorithms for add (streaming updates), estimate (query answering) and merge, the latter for outputting an estimation for mergeable synopses [1] kept at multiple workers or clusters.

When a synopsis operator of the SDE is included in a workflow, every request arriving via the RequestTopic in Figure 1(b) for building or querying a running synopsis follows the red-colored path. The RegisterSynopsis and RegisterRequest FlatMaps initially produce, and then look up, the same keys pointing to the workers that should maintain the synopsis, but each for a different purpose. RegisterSynopsis uses these keys to direct streaming tuples, which follow the blue-colored path, to the assigned workers to update the synopsis. RegisterRequest uses the same keys to direct queries to those workers and derive estimations at the estimate FlatMap by reading the status of the synopsis kept at the add FlatMap.

When a streaming update arrives at the Kafka DataTopic, the FlatMap termed HashData looks up the keys of the workers produced by RegisterSynopsis and directs the update to the proper workers. There, the add FlatMap includes the update in the maintained synopsis. For instance, in case an FM sketch is maintained [1], the add operation hashes the incoming tuple to a position of the maintained bitmap and turns the corresponding bit to 1 if it is not already set. Upon a query, the estimate FlatMap uses the position of the lowest unset bit of that bitmap to derive a cardinality estimation [1].

The rest of the operators on the right-hand side of Figure 1(b) are used for appropriately controlling the output topics. The splitter (a split operator in Flink) directs the estimation to the OutputTopic if the synopsis is defined on a single stream handled by one worker (green path). If a synopsis engages many streams (e.g., many stocks) maintained at multiple workers (purple path) or multiple clusters (yellow path; clusters communicate via the UnionTopic of Kafka), the merge FlatMap synthesizes the estimations (for instance, executing a logical disjunction on the bitmaps of various FM sketches [1]) before outputting the estimation via the OutputTopic.

3.2 OMLDM Component

The OMLDM component fills the gap left by existing batch-processing-focused APIs such as Spark's MLlib or FlinkML. Its internal architecture is illustrated in Figure 1(d). Two types of pipelines are engaged in the OMLDM architecture: training and prediction pipelines.

Training pipelines. Training data streams follow the blue-colored path of Figure 1(d). They are ingested via the TrainingTopic. The rest of the training pipeline is implemented by a pair of Learner and ParameterServer FlatMaps. We follow a Parameter Server (PS) distributed model (Figure 1(e)), where an interactively defined number of identical learners distribute amongst themselves the computational load of training on an incoming data stream of training samples. In a nutshell, the PS holds and updates the state of the learning process, including the current version of the trained model. Learners compute the updates to the model and coordinate via the PS. This is because the iterative nature of ML/DM computations requires communication between all parallel workers, and this is done by the PS in each pipeline. Training pipelines are data sinks, that is, they do not generate an output stream. However, they do have a Feedback Kafka topic to support the iteration required by the involved tasks. The Feedback topic is also read by the predictors of the red path, discussed below, to update their models upon a concept drift. For instance, should training on labeled streams of a classification task take place on par with prediction (tagging unlabeled streams), in case of a concept drift the PS sends a new model to the predictors via the FeedbackTopic.

For training pipelines, it is the pipeline's responsibility to coordinate the processing between the learners and the PS. As noted in Section 2, the supported coordination policies include synchronous, asynchronous and FGM, a novel and bandwidth-efficient coordination policy that we introduced in [17]. Performance-wise, the synchronous policy does not encourage enhanced horizontal scalability because, when many learners are used, the total utilization is usually low should even a few stragglers exist. The asynchronous one is the policy of choice in large-scale ML; the processing speed is much higher when many learners are used, and therefore training is more scalable. The FGM policy provides both enhanced horizontal and federated scalability because (a) the learners neither need to await each other's sync in rounds nor contact the PS immediately with each update, and (b) communication takes place asynchronously, but only when a concept drift may have occurred. This is achieved in synergy with the Optimizer (see Section 4).

Figure 1(d) shows an instance of the PS and learners running on a Flink cluster. The messaging interface between the Learners and the PS is handled via Kafka and hence is generic and independent of the underlying Big Data platform.

Prediction pipelines. These are shown with the red-colored path of Figure 1(d). Prediction pipelines do not train the models they apply; these models are loaded at the Predictor FlatMap via control messages. A model can be changed interactively by the PS at runtime, if there has been a concept drift, via the FeedbackTopic. Prediction pipelines employ some model for classification, interpolation or inference and transform input to output streams, which can be processed further by downstream operators of a global workflow.

The control topics (green arrows and Kafka topics in Figure 1(d)) deliver control messages to and from an OMLDM job. The control API understands Create/Read/Update/Delete (CRUD) commands for each type of resource. For instance, using control messages, the OMLDM component will respond with the current status of the PS and learners (e.g., how many learners are currently running) or the currently up-to-date model at the PS.

OMLDM currently supports (i) classification: Passive Aggressive Classifier, Online Support Vector Machines and Vertical Hoeffding Trees, (ii) clustering/outlier detection: BIRCH, Online k-Means and StreamKM++, and (iii) regression: Passive Aggressive Regressor, Online Ridge and Polynomial Regression, and Autoregressive Integrated Moving Average.
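To make the training-pipeline mechanics concrete, the following single-process Java sketch mimics the PS paradigm with a Passive Aggressive classifier: learners compute model updates on their share of the stream, and the PS combines them in a synchronous round (model averaging stands in here for the PS update rule). This is a minimal illustration under our own naming, not INFORE's Learner/ParameterServer FlatMaps, which exchange such updates via Kafka.

```java
import java.util.Arrays;

// Minimal sketch of the Parameter Server paradigm with Passive Aggressive learners.
public class PsSketch {
    // One Passive Aggressive update: w += tau * y * x, with tau = hinge loss / ||x||^2.
    static double[] paUpdate(double[] w, double[] x, int y) {
        double dot = 0, norm = 0;
        for (int i = 0; i < x.length; i++) { dot += w[i] * x[i]; norm += x[i] * x[i]; }
        double loss = Math.max(0.0, 1.0 - y * dot);
        double tau = norm > 0 ? loss / norm : 0.0;
        double[] out = w.clone();
        for (int i = 0; i < x.length; i++) out[i] += tau * y * x[i];
        return out;
    }

    // Synchronous round: each learner trains on its local batch starting from the
    // global model; the PS then averages the resulting local models.
    static double[] syncRound(double[] global, double[][][] batches, int[][] labels) {
        double[] avg = new double[global.length];
        for (int l = 0; l < batches.length; l++) {        // each learner
            double[] local = global.clone();
            for (int t = 0; t < batches[l].length; t++)   // its local training samples
                local = paUpdate(local, batches[l][t], labels[l][t]);
            for (int i = 0; i < avg.length; i++) avg[i] += local[i] / batches.length;
        }
        return avg;                                       // new global model at the PS
    }

    public static void main(String[] args) {
        // Two learners, each seeing two samples of one class.
        double[][][] batches = {{{1, 0}, {2, 0}}, {{0, -1}, {0, -2}}};
        int[][] labels = {{1, 1}, {-1, -1}};
        double[] model = syncRound(new double[]{0, 0}, batches, labels);
        System.out.println(Arrays.toString(model));   // prints [0.5, 0.5]
    }
}
```

Under the synchronous policy every round waits for all learners; FGM would instead let each learner run freely and trigger such a synchronization only when its local drift condition fires.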
Facilities for correlation matrix computation, standardization and polynomial feature extraction are also available.

[Figure 2: Initial and transformed workflow using Synopses-based Optimization. (a) Initial Workflow at the Graphical Editor; (b) Suggested Workflow via Synopses-based Optimization.]

3.3 Optimizer Component

Given a workflow composed of various operators, we have to properly tune their execution, prescribing both the Big Data platform and the networked computer cluster, so that good overall performance with respect to interactivity-related metrics is achieved.

The internal architecture of the INFORE Optimizer is shown in Figure 1(c). A number of submodules are involved before providing the logical-to-physical operator mappings to be deployed for execution. First, we use a statistics collector to derive performance measurements from each executed workflow. Statistics are collected via JMX or Slurm⁵ and are ingested in an ELK stack⁶ while monitoring jobs.

We have developed a Benchmarking submodule that automates the acquisition of performance metrics for every supported operator run on different Big Data platforms. The Benchmarking submodule circumvents Steps 1-3 in Figure 1(a) and enables direct job submission to the underlying clusters, engaging supported operators and stream sources. In that, the execution of various operators and (parts of) workflows on different (Big Data platform, cluster) pairs is a priori benchmarked, statistics are collected and performance models are built to avoid a cold start.

Cost models are derived with a Bayesian Optimization approach inspired by CherryPick [3]. From the statistics we have collected from either micro-benchmarks and/or actual workflow executions, we form a Gaussian performance/cost prediction model for the various operators and (parts of) workflows executed in different (Big Data platform, cluster) pairs. The cost model is incrementally improved with new workflow executions.

Algorithmically, the Optimizer solves a multi-criteria optimization problem with objectives involving throughput, communication cost, processing and network latency, CPU/GPU and memory usage, as well as accuracy for synopses-based optimization purposes. Due to conflicting objectives, we cannot optimize them all simultaneously. Therefore, we resort to Pareto optimal solutions, obtained by maximizing a weighted combination of these objectives under the clusters' capacity constraints.

Notably, no prior effort [2, 4, 5, 9] examined such a rich set of optimization objectives related to interactivity. Indicatively, only [2] considers network performance metrics (but for batch processing scenarios). To derive Pareto optimal solutions, besides a baseline Exhaustive Search algorithm that examines every possible mapping of logical to physical operators in a workflow, we have developed a novel variation of an A*-alike algorithm. The classic A* algorithm cannot work in our setup as is, because it can only suggest optimal paths. This would mean that some logical operators would be left without a suggested physical operator mapping. Our novel A* variation fosters heuristics equivalent to those of the typical A* to prune the search space, but outputs a graph (instead of a path), with physical operators instantiating every logical one in the submitted workflow. This new algorithm can speed up the suggestion of the optimal physical workflow by orders of magnitude (see Section 4). We have also developed a Greedy and a Heuristic algorithm that leverage parallelism and domain knowledge to boost the performance further and to enable adaptivity (i.e., provide, in real time and incrementally, a new plan to adapt to ad-hoc performance hiccups). All algorithms employ the cost estimation models to evaluate alternative physical operators while exploring the space of alternative mappings.

4 SYNERGIES IN A REAL CASE STUDY

This section describes how our prototype works in a real use-case scenario from the financial domain, and how advanced synergies are established among the individual components. For exhibition purposes, we employ a single cluster equipped with Flink and Spark.

The logical workflow submitted to the Optimizer is illustrated in Figure 2(a). The workflow utilizes Level 1 and Level 2 stock data⁷ to discover highly positively/negatively correlated pairs of stocks. In Figure 2(a), data arrive at a Kafka source. The Split operator separates Level 1 from Level 2 data. It directs Level 2 data to the bottom branch of the workflow. There, the Level 2 bids are Filtered (i.e., for monitoring only a subset of stocks). The bids per stock are counted using a Count logical operator. When a trade for a stock is realized, a new Level 1 tuple is directed by Split to the upper part of the workflow. A Project operator keeps only the timestamp of the trade for each stock. The Join operator joins the stock trade (Level 1) tuple with the count of bids the stock received until the trade. The result is inserted in a time Window of recent such counts, forming a time series. Finally, the CorrelationMatrix of the stocks' time series is computed by that OMLDM operator.

Synopses-based Optimization. When the workflow is submitted, for every logical operator there are two platform choices, Flink or Spark. The Optimizer holds a dictionary to determine the equivalence of logical operators to physical ones in either platform. In Synopses-based Optimization, the Optimizer attempts to boost interactivity by treating the SDE as an additional platform. That is, its dictionary also holds mappings about which exact operators could be replaced by approximate ones of the SDE.
Figure 3: Performance Evaluation. (a) SDEaaS vs non-SDEaaS: total throughput (tuples/sec) vs number of concurrently maintained synopses. (b) Optimizer Execution Time: execution times (ms, log scale) of Exhaustive vs A*-alike, vs number of Big Data platforms (1 cluster). (c) Horizontal & Vertical Scalability: throughput ratio over Naive vs number of streams (bottom axis) and processed volume in TB at the maximum Kafka ingestion rate (top axis). [Plot data omitted.]

Footnote 5: https://docs.oracle.com/javase/tutorial/jmx/overview/, slurm.schedmd.com/
Footnote 6: https://www.elastic.co/what-is/elk-stack
Footnote 7: www.investopedia.com/terms/l/level1.asp, www.investopedia.com/terms/l/level2.asp

Therefore, the Optimizer will finally suggest an execution plan as shown in Figure 2(b). In this plan, the Count operator has been replaced by an SDE.CountMin sketch operator of the SDE and the Window operator has been replaced by SDE.DFT. All physical operators are prescribed to be executed over Flink, i.e., where the SDE service runs.

Boosting Vertical Scalability. For every window it approximates, the SDE.DFT operator of the SDE outputs a pair of values: a bucketID and a vector of reduced dimensionality compared to the original window. The bucketID is based on the first few coefficients of the DFT [11] and is used as the key to hash stock streams to learners of the CorrelationMatrix OMLDM operator (Figure 2(b)). Correlations of reduced vectors are pruned for stocks hashed far apart in the hashing space, since these stock streams are guaranteed to be dissimilar [11]. This boosts vertical scalability for a task whose complexity is quadratic in the number of input streams.

Pumped Parameter Server. The stream entering the CorrelationMatrix operator of OMLDM is duplicated to the training and prediction pipelines (Figure 1(d)). Learners (the buckets in our previous discussion) compute the pairwise correlations of the streams they receive (the correlation coefficient and the beta regression parameter coincide for standardized vectors), while the PS holds the current, up-to-date correlation matrix, which is essentially the global model at both the predictor and the PS sides. A concept drift for the global model occurs if the Frobenius norm measuring the difference between the matrix kept at the PS and the true correlation matrix exceeds a threshold. The Optimizer decomposes this constraint into local ones installed at each learner. The mathematical details are described in [17], but in a simple version of FGM the Optimizer prescribes that learners sync with the PS when a signed distance value, computed locally at each learner, becomes positive. Notice that the local constraints are determined by the Optimizer, which initially prescribes and then adapts the learners at runtime.

Preliminary Experimental Evaluation. We employ a Kafka cluster of 4 Dell PowerEdge R320 Intel Xeon E5-2430 v2 2.50GHz machines with 32GB RAM each and, for Flink and Spark Structured Streaming, a cluster of 10 Dell PowerEdge R300 Quad Core Xeon X3323 2.5GHz machines with 8GB RAM each. We use real Level 1, Level 2 stock data (~5000 stocks / ~10 TB) provided by a European financial company. Based on input from the data provider, we set the DFT to use 8 coefficients and CountMin to use ε = 0.002, δ = 0.01, and we use a 0.9 correlation threshold and a time window of 5 minutes.

Figure 3(b) shows the time it takes the two optimization algorithms (Exhaustive and A*-alike) to produce a physical workflow from the time they receive the logical one, when 1 (Flink), 2 (Spark, Flink) or 3 (Spark, Flink, Synopses-based Optimization) platforms are available. As the figure demonstrates, for more than 1 platform our novel A*-alike algorithm reduces the time to prescribe a physical workflow by 1 to 4 orders of magnitude. For 1 platform (where the Optimizer still prescribes the provisioned resources), the Exhaustive Search algorithm is faster, since A*-alike requires more initialization time.

In Figure 3(c) we show the scalability of the INFORE approach versus a technique that executes the workflow of Figure 2(a) in Flink (denoted by "Parallelism"), i.e., without synopses-based optimization, boosted vertical scalability, or the OMLDM component; INFORE executes the workflow of Figure 2(b). We plot the throughput ratio of INFORE and of "Parallelism" over a Naive approach that executes the same workflow with parallelism set to 1. The two horizontal axes in the figure show the number of processed stock streams (bottom) and the volume in terabytes (top) ingested at the maximum Kafka rate. Thus, the top axis accounts for horizontal scalability (volume, speed), while the bottom axis accounts for vertical scalability.

When we monitor 500 streams, i.e., 250K/2 cells in the correlation matrix, the fact that "Parallelism" lacks synopses-based optimization and vertical scalability makes its throughput ratio approach 1, i.e., that of the Naive approach. INFORE's performance remains steady when switching from 50 to 500 streams, improving over "Parallelism" by a factor of 3. The most important findings come upon switching to 5000 stocks (25M/2 cells in the matrix). Figure 3(c) shows that, because of the lack of enhanced horizontal and vertical scalability, "Parallelism" becomes equivalent to the Naive approach. INFORE exhibits more than an order of magnitude higher throughput, with the trade-off that highly correlated pairs are identified with 90% accuracy.

For federated scalability, we measure the bytes communicated among the operators. Due to space constraints, we only report here that INFORE can reduce communication by more than an order of magnitude when 5000 stock streams are being monitored.
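The DFT-based bucketing behind the Boosting Vertical Scalability paragraph can be illustrated with a minimal, stdlib-only sketch. The function names (dft_coefficients, bucket_id), the 1/n normalization, and the grid cell width are our own illustrative choices, not the SDE's actual interface:

```python
import cmath

def dft_coefficients(window, k):
    """First k complex DFT coefficients of a numeric window (naive O(n*k) DFT, 1/n-normalized)."""
    n = len(window)
    return [sum(window[t] * cmath.exp(-2j * cmath.pi * f * t / n) for t in range(n)) / n
            for f in range(k)]

def bucket_id(window, k=8, cell=0.5):
    """Quantize the real and imaginary parts of the leading DFT coefficients
    into a grid cell identifier. Windows landing in far-apart cells cannot be
    close in the time domain, because the distance over the first k coefficients
    lower-bounds (up to normalization) the full Euclidean distance."""
    coeffs = dft_coefficients(window, k)
    return (tuple(int(c.real // cell) for c in coeffs)
            + tuple(int(c.imag // cell) for c in coeffs))

# Hypothetical stock windows; similarly shaped windows land in nearby grid cells.
w1 = [1.0, 2.0, 3.0, 2.0, 1.0, 0.0, -1.0, 0.0]
w2 = [1.1, 2.0, 2.9, 2.1, 1.0, 0.1, -0.9, 0.0]
```

Only stream pairs whose bucketIDs are close need their reduced-vector correlations examined; distant pairs are pruned outright, which is what tames the quadratic pairwise cost.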
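The sync rule of the Pumped Parameter Server can be caricatured as follows. This is a deliberately simplified reading of FGM [17]: the global Frobenius-norm constraint is decomposed into uniform local thresholds, and a learner requests synchronization when its local signed distance turns positive. The real protocol uses a more refined functional decomposition, and all names here are hypothetical:

```python
import math

def frobenius(m):
    """Frobenius norm of a matrix given as a list of rows."""
    return math.sqrt(sum(x * x for row in m for x in row))

class Learner:
    """One learner tracking the drift of its local correlation-matrix cells
    since the last synchronization with the parameter server (PS)."""

    def __init__(self, dim, local_threshold):
        self.delta = [[0.0] * dim for _ in range(dim)]   # drift since last sync
        self.local_threshold = local_threshold           # installed by the Optimizer

    def update(self, i, j, change):
        self.delta[i][j] += change

    def signed_distance(self):
        # Positive exactly when the local drift violates the locally
        # installed share of the global Frobenius-norm constraint.
        return frobenius(self.delta) - self.local_threshold

    def must_sync(self):
        return self.signed_distance() > 0
```

If each of k learners owns a disjoint set of matrix cells, installing local_threshold = θ/√k keeps the global drift norm below θ as long as no learner reports a positive signed distance, since the squared global norm is the sum of the squared local ones.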
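The CountMin configuration used in the evaluation (ε = 0.002, δ = 0.01) fixes the sketch dimensions through the standard bounds width = ⌈e/ε⌉ and depth = ⌈ln(1/δ)⌉. The following is a generic Count-Min sketch derived from those textbook formulas, not the SDE.CountMin implementation; the salted-hash scheme is illustrative:

```python
import math
import random

class CountMin:
    """Generic Count-Min sketch: answers frequency queries with additive error
    at most eps * N (N = total count) with probability at least 1 - delta."""

    def __init__(self, eps, delta, seed=42):
        self.width = math.ceil(math.e / eps)          # eps = 0.002 -> 1360 counters/row
        self.depth = math.ceil(math.log(1 / delta))   # delta = 0.01 -> 5 rows
        self.table = [[0] * self.width for _ in range(self.depth)]
        rng = random.Random(seed)
        self.salts = [rng.getrandbits(64) for _ in range(self.depth)]

    def _cells(self, item):
        for row, salt in enumerate(self.salts):
            yield row, hash((salt, item)) % self.width

    def add(self, item, count=1):
        for row, col in self._cells(item):
            self.table[row][col] += count

    def estimate(self, item):
        # Never underestimates; overestimates only via hash collisions.
        return min(self.table[row][col] for row, col in self._cells(item))
```

With these parameters the sketch keeps 5 rows of 1360 counters each, so per-item counts are overestimated by at most 0.002·N with probability at least 0.99, regardless of how many distinct stock symbols stream through.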
5 CONCLUSIONS & ONGOING WORK
We have presented the design, internal structure, and integration aspects of key components of INFORE, a prototype designed for interactive analytics at scale. To our knowledge, INFORE is the first system for cross-platform, streaming analytics. We are currently enriching the SDE and OMLDM libraries to strengthen the support for the three scalability types and to enhance the analytics capabilities.

ACKNOWLEDGMENTS
This work has received funding from the EU Horizon 2020 research and innovation program INFORE under grant agreement No 825070.

REFERENCES
[1] 2016. Data Stream Management - Processing High-Speed Data Streams. Springer. https://doi.org/10.1007/978-3-540-28608-0
[2] Divy Agrawal, Sanjay Chawla, Bertty Contreras-Rojas, Ahmed K. Elmagarmid, Yasser Idris, Zoi Kaoudi, Sebastian Kruse, Ji Lucas, Essam Mansour, Mourad Ouzzani, Paolo Papotti, Jorge-Arnulfo Quiané-Ruiz, Nan Tang, Saravanan Thirumuruganathan, and Anis Troudi. 2018. RHEEM: Enabling Cross-Platform Data Processing - May The Big Data Be With You!. Proc. VLDB Endow. 11, 11 (2018), 1414–1427. https://doi.org/10.14778/3236187.3236195
[3] Omid Alipourfard, Hongqiang Harry Liu, Jianshu Chen, Shivaram Venkataraman, Minlan Yu, and Ming Zhang. 2017. CherryPick: Adaptively Unearthing the Best Cloud Configurations for Big Data Analytics. In 14th USENIX Symposium on Networked Systems Design and Implementation, NSDI 2017, Boston, MA, USA, March 27-29, 2017. USENIX Association, 469–482. https://www.usenix.org/conference/nsdi17/technical-sessions/presentation/alipourfard
[4] Katerina Doka, Nikolaos Papailiou, Dimitrios Tsoumakos, Christos Mantas, and Nectarios Koziris. 2015. IReS: Intelligent, Multi-Engine Resource Scheduler for Big Data Analytics Workflows. In Proceedings of the 2015 ACM SIGMOD International Conference on Management of Data, Melbourne, Victoria, Australia, May 31 - June 4, 2015. ACM, 1451–1456. https://doi.org/10.1145/2723372.2735377
[5] Aaron J. Elmore, Jennie Duggan, Mike Stonebraker, Magdalena Balazinska, Ugur Çetintemel, Vijay Gadepally, Jeffrey Heer, Bill Howe, Jeremy Kepner, Tim Kraska, Samuel Madden, David Maier, Timothy G. Mattson, Stavros Papadopoulos, Jeff Parkhurst, Nesime Tatbul, Manasi Vartak, and Stan Zdonik. 2015. A Demonstration of the BigDAWG Polystore System. Proc. VLDB Endow. 8, 12 (2015), 1908–1911. https://doi.org/10.14778/2824032.2824098
[6] Forbes. 2021 (last accessed). https://www.forbes.com/sites/tomgroenfeldt/2013/02/14/at-nyse-the-data-deluge-overwhelms-traditional-databases/#362df2415aab
[7] Nikos Giatrakos, David Arnu, Theodoros Bitsakis, Antonios Deligiannakis, Minos N. Garofalakis, Ralf Klinkenberg, Aris Konidaris, Antonis Kontaxakis, Yannis Kotidis, Vasilis Samoladas, Alkis Simitsis, George Stamatakis, Fabian Temme, Mate Torok, Edwin Yaqub, Arnau Montagud, Miguel Ponce de Leon, Holger Arndt, and Stefan Burkard. 2020. INforE: Interactive Cross-platform Analytics for Everyone. In CIKM '20: The 29th ACM International Conference on Information and Knowledge Management, Virtual Event, Ireland, October 19-23, 2020. ACM, 3389–3392. https://doi.org/10.1145/3340531.3417435
[8] Nikos Giatrakos, Nikos Katzouris, Antonios Deligiannakis, Alexander Artikis, Minos N. Garofalakis, George Paliouras, Holger Arndt, Raffaele Grasso, Ralf Klinkenberg, Miguel Ponce de Leon, Gian Gaetano Tartaglia, Alfonso Valencia, and Dimitrios Zissis. 2019. Interactive Extreme-Scale Analytics Towards Battling Cancer. IEEE Technol. Soc. Mag. 38, 2 (2019), 54–61. https://doi.org/10.1109/MTS.2019.2913071
[9] Ionel Gog, Malte Schwarzkopf, Natacha Crooks, Matthew P. Grosvenor, Allen Clement, and Steven Hand. 2015. Musketeer: all for one, one for all in data processing systems. In Proceedings of the Tenth European Conference on Computer Systems, EuroSys 2015, Bordeaux, France, April 21-24, 2015. ACM, 2:1–2:16. https://doi.org/10.1145/2741948.2741968
[10] Jeyhun Karimov, Tilmann Rabl, Asterios Katsifodimos, Roman Samarev, Henri Heiskanen, and Volker Markl. 2018. Benchmarking Distributed Stream Data Processing Systems. In 34th IEEE International Conference on Data Engineering, ICDE 2018, Paris, France, April 16-19, 2018. IEEE Computer Society, 1507–1518. https://doi.org/10.1109/ICDE.2018.00169
[11] Antonis Kontaxakis, Nikos Giatrakos, and Antonios Deligiannakis. 2020. A Synopses Data Engine for Interactive Extreme-Scale Analytics. In CIKM '20: The 29th ACM International Conference on Information and Knowledge Management, Virtual Event, Ireland, October 19-23, 2020. ACM, 2085–2088. https://doi.org/10.1145/3340531.3412154
[12] Mu Li, David G. Andersen, Jun Woo Park, Alexander J. Smola, Amr Ahmed, Vanja Josifovski, James Long, Eugene J. Shekita, and Bor-Yiing Su. 2014. Scaling Distributed Machine Learning with the Parameter Server. In 11th USENIX Symposium on Operating Systems Design and Implementation, OSDI '14, Broomfield, CO, USA, October 6-8, 2014. USENIX Association, 583–598. https://www.usenix.org/conference/osdi14/technical-sessions/presentation/li_mu
[13] John Meehan, Stan Zdonik, Shaobo Tian, Yulong Tian, Nesime Tatbul, Adam Dziedzic, and Aaron J. Elmore. 2016. Integrating real-time and batch processing in a polystore. In 2016 IEEE High Performance Extreme Computing Conference, HPEC 2016, Waltham, MA, USA, September 13-15, 2016. IEEE, 1–7. https://doi.org/10.1109/HPEC.2016.7761585
[14] Aristides Milios, Konstantina Bereta, Konstantinos Chatzikokolakis, Dimitris Zissis, and Stan Matwin. 2019. Automatic Fusion of Satellite Imagery and AIS Data for Vessel Detection. In 22nd International Conference on Information Fusion, FUSION 2019, Ottawa, ON, Canada, July 2-5, 2019. IEEE, 1–5. https://ieeexplore.ieee.org/document/9011339
[15] Barzan Mozafari. 2019. SnappyData. In Encyclopedia of Big Data Technologies, Sherif Sakr and Albert Y. Zomaya (Eds.). Springer. https://doi.org/10.1007/978-3-319-63962-8_258-1
[16] Proteus Project. 2021 (last accessed). https://github.com/proteus-h2020
[17] Vasilis Samoladas and Minos N. Garofalakis. 2019. Functional Geometric Monitoring for Distributed Streams. In Advances in Database Technology - 22nd International Conference on Extending Database Technology, EDBT 2019, Lisbon, Portugal, March 26-29, 2019. OpenProceedings.org, 85–96. https://doi.org/10.5441/002/edbt.2019.09
[18] Stream-lib. 2021 (last accessed). https://github.com/addthis/stream-lib
[19] Yahoo DataSketch. 2021 (last accessed). https://datasketches.github.io/