Towards Performance Prediction for Stream Processing Applications

Johannes Rank, Dominik Paluch, Harald Kienegger, Helmut Krcmar
Technical University of Munich
Boltzmannstr. 3
Garching, Germany

ABSTRACT
Stream processing systems have become the major engine whenever data processing with low latency or real-time capability is required. Meeting the performance demands of these systems in terms of latency, throughput and resource utilization is hence crucial to ensure stable operation and correct functioning. Current performance modeling approaches target this issue by predicting performance characteristics at the architecture level, e.g., by measuring existing deployments as a whole and estimating the impact of related changes. However, these approaches neglect the actual application logic of the streaming system and are hence able neither to identify bottlenecks in the processing behavior nor to predict performance during development. In this paper, we propose a more comprehensive performance modeling approach that considers both the deployment architecture and the actual implementation logic of stream processing systems, using the SAP HANA Streaming Analytics Server as an example. To this end, we derive a performance model from the actual source code of the stream processing solution and compare the simulation of our model with actual measurements. Our results show that the approach provides considerably more detailed findings, allowing us to identify bottlenecks in the application logic and to predict the point in time at which an over-queuing situation occurs, which provides valuable insights for both developers and performance analysts.

Keywords
Stream Processing, Performance Prediction, Over Queuing

Categories and Subject Descriptors
B.8.2 [Performance and Reliability]

31st GI-Workshop on Foundations of Databases (Grundlagen von Datenbanken), 11.06.2019 - 14.06.2019, Saarburg, Germany. Copyright is held by the author/owner(s).

1. INTRODUCTION
Current trends in the area of Big Data have increased the necessity to process data as it arrives instead of collecting the information for later analysis. In many scenarios, the data is most valuable at the time of its generation but loses its importance within a few seconds or minutes [4]. A common example of such a domain is predictive maintenance, which aims to anticipate an equipment failure before the problem actually occurs and hence allows a complete breakdown to be prevented by maintaining the system in time [13]. Such setups require monitoring and analyzing sensor data as it arrives. This is why so-called stream processing systems (SPS) are becoming increasingly important in areas such as IoT or Industry 4.0 [12].

SPS are designed to continuously handle incoming events by providing real-time computation that allows an immediate reaction to detected trends or patterns. Due to the sequential processing model of stream applications, an overflow of an internal queue affects all preceding operations and can spread until all prior queues are filled up. These so-called over-queuing situations heavily increase the latency of a system and, if real-time processing is required, can cause the whole business scenario to fail. The development of stream processing solutions thus requires not only ensuring a high throughput but also preventing over-queuing situations under varying workloads [7]. For these reasons, performance is not merely a quality-of-service aspect but vital for the success of the whole streaming application scenario [14]. Ensuring the performance of SPS during both the development of the streaming application and the operation of the system is hence a crucial task.

However, despite the necessity to build SPS that meet these performance demands, there is currently a lack of expertise to develop and operate such applications efficiently [3]. According to Requeno et al. (2017), "[...] there is now an urgent need for novel, performance oriented software engineering methodologies and tools capable of dealing with the complexity of such a new environment" [11]. Performance simulation and prediction approaches offer promising solutions to this issue by making it possible to determine bottlenecks and answer sizing questions during both system operation and development [5].

We target this issue by proposing a performance prediction approach that simulates the performance aspects of the SPS for each processing task of the application logic instead of for the component as a whole. For this purpose, we transform the source code into a performance model that provides simulation-based predictions. In this paper, we focus on the internal queues in order to predict over-queuing situations and to determine the bottleneck task, which provides better insights than traditional prediction approaches. We evaluated our approach by predicting the performance of a HANA Streaming Analytics Server and comparing the results with actual measurements.

This paper is organized as follows. Section 2 describes existing research in the area of performance prediction for SPS. Section 3 introduces the concept of stream processing as well as some related performance considerations. Section 4 presents our modeling approach, including our experiment. Finally, Section 5 concludes this paper and states future research directions.

2. RELATED WORK
Although the first stream processing engines were proposed several years ago [1], the number of performance prediction approaches for these systems remains small. One of the first approaches, focusing on Apache Spark Streaming, was proposed by Kroß/Krcmar (2016). They applied the Palladio Component Model (PCM) [2] to simulate the Spark processing framework and parameterized the PCM instance based on measurements obtained from running the HiBench benchmark suite. The approach focuses on scalability prediction by measuring an existing stream processing system and predicting its throughput under varying workload scenarios. The simulation yielded accurate results with prediction errors between 0.67% and 3.41% [7]. However, the approach treats the application logic as a black box and hence does not provide any insights into the application's behavior.

Requeno et al. (2017) propose a framework based on UML profiles to model Apache Storm applications. This includes modeling the Storm topology (the layout of spouts and bolts) as well as the deployment of the respective nodes. Afterwards, an automatic model-to-model transformation derives a generalized stochastic Petri net for simulation purposes. The prediction covers the thread utilization (CPU) and the execution time of each bolt [11]. However, the approach gives no further insights into the operations contained in each bolt or into the internal queuing behavior. In addition, it requires the whole model to be designed and parameterized in UML, which can be considered time-consuming and laborious.

Lin et al. (2018) propose an ABS-based model to simulate Spark Streaming applications [9], similar to the approach in [7]. ABS is a language for describing the behavior of distributed object-oriented systems. Their approach allows configuring the stream application as well as the Spark processing framework itself and therefore provides the capability to evaluate different deployment settings. The focus of the approach is to predict the throughput. RAM and CPU can be parameterized, but their utilization is not part of the prediction. Again, the approach does not provide detailed insights into the actual processing tasks contained in each stage, and internal queues are not considered either.

3. STREAM PROCESSING
SPS are designed to continuously process large volumes of data. In contrast to traditional database systems, a streaming system does not necessarily persist any information but rather treats the data on the fly, either by maintaining a global state through continuous analysis or by modifying the data before forwarding it to another system. The data sources for SPS are typically sensors or edge computing devices that monitor a certain object and continuously report their observations in the form of events.

Current SPS distinguish between two processing models: micro-batching and event processing. Systems like Apache Spark Streaming that apply a micro-batch processing model do not process events immediately but rather collect them in a job. As soon as a predefined threshold is reached, the micro-batch job is released and processed by the engine, which typically results in better throughput at the cost of higher latency. In contrast, systems like Apache Storm or Flink apply event processing and handle events immediately as they arrive. Other engines, for example the SAP HANA streaming server, support both processing models and therefore leave the decision to the developer.

The stream application, as part of the SPS, determines how an event is treated by the system by defining a sequence of connected operations (e.g., filters or functions) that form a directed acyclic graph (DAG). Depending on the SPS framework used, the terminology for an operation varies. Apache Spark Streaming uses the term stage to describe a set of operations that is executed by one node, whereas Storm uses the term bolt to describe a similar structure. In the context of this paper, we use the HANA streaming terminology and simply refer to a stream operation as a task.

3.1 HANA Stream Processing
In contrast to the open-source representatives Apache Spark Streaming and Apache Storm, the SAP HANA streaming server is a commercial product that is integrated into the HANA database platform and focuses on analytical workloads. Accordingly, it provides several libraries, e.g., to support event-based machine learning on data streams. The HANA streaming server has a monolithic structure and focuses on single deployments rather than distributed landscapes with multiple worker nodes. Streaming applications are implemented in projects using the Continuous Computation Language (CCL) and pushed onto the streaming server. CCL has an SQL-like syntax, but the server does not execute a statement only once; instead, it executes it repeatedly for every incoming event. The supported IDE for CCL development is Eclipse in combination with the streaming development plugin. Besides the source code view of the streaming project, a graphical representation is also provided.

Figure 1 shows a visual example of a HANA streaming project, inspired by the RIOT ETL streaming application benchmark [12]. Events arrive in CSV format and are first parsed into an internal structure. Afterwards, a range filter is applied to omit outliers. The succeeding Bloom filter checks whether the tuple belongs to a predefined set of events and discards it otherwise. Afterwards, an interpolation is performed to enrich the data with additional information, followed by a join operation that merges the event with historical data provided by a HANA in-memory database.
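The Bloom filter stage in this pipeline is a probabilistic set-membership test: it never discards an event whose key is in the predefined set, but it may occasionally let a non-member through (a false positive). The paper does not describe how the HANA streaming server implements or configures this filter, so the following Python sketch is only a generic illustration under stated assumptions. The bit-array size `m_bits`, the number of hash positions `k_hashes`, the SHA-256-based position derivation, the `bloom_filter_stage` helper, and the `meter` key field are all hypothetical choices, not CCL or HANA APIs.

```python
import hashlib
from typing import Callable, Dict, Iterable, Iterator

Event = Dict[str, object]


class BloomFilter:
    """Probabilistic set: no false negatives, occasional false positives."""

    def __init__(self, m_bits: int = 4096, k_hashes: int = 3) -> None:
        self.m = m_bits                              # size of the bit array (assumed)
        self.k = k_hashes                            # hash positions per item (assumed)
        self.bits = bytearray((m_bits + 7) // 8)     # all bits start at 0

    def _positions(self, item: str) -> Iterator[int]:
        # Derive k bit positions from salted SHA-256 digests of the item.
        for i in range(self.k):
            digest = hashlib.sha256(f"{i}:{item}".encode("utf-8")).digest()
            yield int.from_bytes(digest[:8], "big") % self.m

    def add(self, item: str) -> None:
        # Set every bit position associated with the item.
        for pos in self._positions(item):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, item: str) -> bool:
        # False: the item was definitely never added. True: it probably was.
        return all(self.bits[pos // 8] & (1 << (pos % 8))
                   for pos in self._positions(item))


def bloom_filter_stage(events: Iterable[Event], bloom: BloomFilter,
                       key: Callable[[Event], str]) -> Iterator[Event]:
    """Forward events whose key may be in the predefined set; drop all others."""
    for event in events:
        if bloom.might_contain(key(event)):
            yield event


if __name__ == "__main__":
    # Hypothetical predefined set of meter IDs; the paper does not name the key field.
    allowed = BloomFilter()
    for meter_id in ("M-001", "M-002", "M-003"):
        allowed.add(meter_id)

    stream = [{"meter": "M-001", "kwh": 1.2}, {"meter": "M-999", "kwh": 0.7}]
    for event in bloom_filter_stage(stream, allowed, key=lambda e: str(e["meter"])):
        print(event)  # M-001 always passes; M-999 passes only on a false positive
```

A larger `m_bits` lowers the false-positive rate at the cost of memory. Because such a filter never rejects a member of the set, the downstream interpolation and join stages can only receive a few extra events, never miss expected ones.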
In the last step of the pipeline, the streaming server annotates the tuple with a timestamp and forwards it to a database for persistence.

Figure 1: Visualized Stream Application

3.2 Performance Considerations
Based on the example introduced in Figure 1, several performance aspects are worth considering that apply to most streaming solutions. Firstly, due to the time criticality of most stream processing business cases, latency is of special importance for any SPS, for the reasons explained in Section 1. Since the information value of many events expires within short time periods, it is crucial to ensure that the data items are processed in a timely fashion. Secondly, throughput is a valuable characteristic for deciding which data volumes the streaming engine is able to handle. It is determined by the slowest task (the bottleneck) of the streaming application, which hence should be the focus of any performance improvement activities. Memory is also of particular interest for SPS, since most SPS are designed to keep all events and processing logic in memory. If data has to be stored on disk or paging is required due to limited RAM resources, the latency increases heavily [14]. Some streaming frameworks, such as Apache Flink, even crash if memory over-allocation occurs [10].

Another characteristic of SPS is their internal queues. The HANA streaming engine automatically provides one queue in front of each task to temporarily buffer events while the corresponding task is still occupied by another data tuple. Since most SPS are applied in the context of IoT, the corresponding workload is mostly data-driven and often harder to predict in terms of volume, velocity, and variety [8]. Internal queues hence allow the system to absorb temporary bursts of data that exceed its usual throughput without crashing the whole application. The situation in which a queue is filled up is called over-queuing and is one of the most severe vulnerabilities of an SPS. Since every event has to be processed according to the DAG, preceding tasks start to queue up as soon as the bottleneck queue is full, because they cannot continue their operation as long as there is no space left to store their output. Figure 2 depicts an example with three queues. Since task3 is the bottleneck due to its processing rate of 1/s, its preceding queue2 starts to fill up as soon as queue3 is full. Afterwards, task2 is no longer able to sustain its high processing rate because no space is left for the processed events. Therefore, queue2 and eventually queue1 also queue up until no more new events can be accepted, leading to forced load shedding. Even if the system is not completely over-queued, each additional queued event increases the overall latency of the application. In the worst case, this could lead to a situation where all events that are currently being processed by the system are already outdated. For developers and performance analysts, it is hence most important to identify the bottleneck task and its throughput. This makes it possible to determine at which rate the queue starts to fill up and how long it would take until over-queuing arises in case of unexpected data bursts.

Figure 2: Component Diagram

4. PERFORMANCE PREDICTION
Due to the characteristics stated in Section 3.2, we emphasize the importance of building performance models that simulate not only the SPS as a whole but also consider all the individual tasks and queues of the application logic. For this reason, we manually transform a streaming application, based on its CCL source code, into an instance of the Palladio Component Model. PCM is a meta-model that consists of several model parts that separately describe the different performance aspects of the modeled system, such as its components, resource environment, usage profile, and behavior.

4.1 Modeling Approach
Prior to the model transformation, we perform a load test with a simple training workload in order to obtain the maximum processing rate of each task as well as the average drop rates of the different filter operations. For this purpose, we use the built-in streamingmonitor utility to measure the number of events processed by each task every second. We use this information later to parameterize our model. Furthermore, we configure our streaming system to apply event processing instead of micro-batching.

As a first step of our model creation, we define the general architecture of our system using the PCM repository diagram. As depicted in Figure 3, this diagram provides information about our system's components as well as the provided interfaces and services. The performance simulation in this paper focuses on the stream processing server. However, since our stream application eventually persists the processed events to the database, we need an additional component to model these external calls. In future work, this will allow us to extend the model with a prediction of the database utilization depending on the streaming server's output. The stream processing server contains six internal queues, one for each of the different processing tasks. The TableRead and WriteToDB operations are part of the related processing tasks and hence do not own a dedicated queue. On the architecture level, we model a queue as a passive resource with a capacity of 1024 events, which is the default queue size of the HANA streaming server.

Figure 3: Component Diagram

For each service provided by a component via its interface, a so-called Service Effect Specification (SEFF) is created. The SEFF itself is another model that is linked to the repository diagram and describes the actual behavior of the respective service, depicted as a finite state machine. In order to obtain the SEFF for the streaming application, we manually transform the CCL source code into its respective PCM model elements. Each streaming task is mapped to an internal action and parameterized with the average processing rates obtained from the initial measurements. An internal action describes a local resource demand invoked by processing a single event. In this example, we only define a throughput rate in order to limit the number of events that can be processed per second. In future work, an assignment of hardware resources such as CPU or RAM would also be possible. Before entering a task, each event has to acquire a token from the related passive resource (as defined in the repository diagram) and is only allowed to enter the task if at least one token is available, which is similar to the concept of a semaphore. After leaving the task, the event has to acquire a token from the next queue before releasing the token it holds. Figure 4 depicts this behavior using the example of the parse stream and range filter actions.

If the task is a filter operation, it is followed by a branch action. PCM supports two types of branch transitions: the guarded transition requires a Boolean expression to decide which branch to choose, whereas the probabilistic transition only needs a probability assigned to it. For our model, we choose a probabilistic transition in order to represent the average drop rates of the respective filters. As depicted in Figure 5, a filter processes an incoming event and hence invokes a processing resource demand. If the event is dropped, it still has to release its claim on the current queue; otherwise, the event is forwarded to the next internal action.

Figure 4: SEFF Queuing Process
Figure 5: SEFF Filtering Process

The workload is defined in the usage model. For our predictions, we define two different usage scenarios with data rates of 1000 and 1600 events per second. This allows us to simulate both load situations without changing the model. We choose an open workload since requests arrive at a certain rate, which implies that the total number of events in the system can vary, in contrast to a closed workload model. This is required in order to predict the over-queuing situations. The scenario behavior describes how each event interacts with the system. In our case, each event simply enters the SEFF of our application and passes the variable workload, which contains characteristics of the current usage scenario. Figure 6 displays the scenario for 1000 events per second.

Figure 6: SEFF Queuing Process

4.2 Experiment
For our experiment, we used an electricity-usage dataset obtained from the Boston Central Library² as our workload. The dataset consists of about 250,000 power measurements that have been recorded every 5 minutes since 2016. We used the first 200,000 tuples of the dataset and split them into a training set and a test set with 100,000 records each. For our streaming system, we set up a virtual machine with 1.0 processing units and 32 GB RAM on an IBM Power E870 server with 4.19 GHz running SLES12 SP02. We used the training set to perform a simple load test while simultaneously monitoring each task via the streamingmonitor utility to obtain the parameterization for our model. Afterwards, we ran the simulation of our PCM instance multiple times using the SimuCom³ plugin to predict the performance for the two different workload situations. We chose the event rates 1000/s and 1600/s to include a small scalability prediction and compared the results with corresponding measurements obtained by running the test workload.

² …electricity-usage

Our simulation focused on the utilization of the internal queues or, more precisely, on the point in time at which the different queues in the system become full. Figure 7 compares the measured over-queuing timestamp (MOTS) and the simulated over-queuing timestamp (SOTS) for an event rate of 1000 records per second. After 5.66 seconds, the queue of the Interpolation task is filled up. As a consequence, the queue of the preceding Bloom Filter also fills up after 8.31 seconds, even though the Bloom Filter's processing capacity is much higher than 1000/s. Finally, the whole streaming server over-queues after 14.32 seconds. From this point on, new events are dropped due to dynamic load shedding. For a time-critical system, this would imply not only that the real-time requirement could fail due to the increased latency (queuing delay), but also that the results of the streaming server's calculations would be distorted due to the loss of the latest events. Our simulation predicts that the over-queuing already occurs after 13.72 seconds, whereas the measured time is 0.6 seconds later.

Figure 7: Over-queuing Prediction 1000/s (over-queuing timestamps in seconds; SOTS = simulated, MOTS = measured)
Task            SOTS    MOTS
Parse Stream    13.72   14.32
Range Filter    11.61   12.16
Bloom Filter     7.92    8.31
Interpolation    5.29    5.66

A developer or analyst would gain several valuable insights from such a result. Firstly, the Interpolation task can be identified as the bottleneck, since it is the first task that over-queues. With this insight, performance improvement activities can be focused on the application level, which would be much more efficient than simply providing more hardware resources. Hirzel et al. (2014), for example, propose a catalog of 11 stream processing optimizations, most of which are applied at the task level [6]. Secondly, the prediction shows that the system is not capable of handling a workload of 1000/s. However, due to the data-driven characteristic of most workloads related to SPS, unexpected data bursts can always occur. The most important insight is hence that, according to the simulation, the system would still remain stable for 13.72 seconds in case of such a burst. This prediction is therefore a good example of why task-based performance considerations are important for stream processing systems.

Figure 8 depicts the simulation and measurement results with a workload of 1600/s. […] queued after 2.78 seconds. Both simulations provided reasonable results with an average accuracy of 95.01% (1000/s) and 91.97% (1600/s). The model can hence also be used for predicting different workload scenarios.

Figure 8: Over-queuing Prediction 1600/s (over-queuing timestamps in seconds; SOTS = simulated, MOTS = measured)
Task            SOTS    MOTS
Parse Stream     6.61    6.69
Range Filter     5.03    5.21
Bloom Filter     3.99    4.46
Interpolation    2.31    2.78

The differences between simulation and measurement can mostly be attributed to two factors. Firstly, our probabilistic filter operations do not perfectly reflect the actual drop rates of the measurement. Overall, the average drop rates of the training and the test dataset differ by less than 2%. The other factor is dynamic load balancing. If a task over-queues, the run-time environment of the streaming server reallocates the CPU shares among the threads. For this reason, our defined processing rates need to be modified as well in order to reflect these dynamic changes. Since the PCM framework does not allow any kind of dynamic ResourceDemand redistribution, we have to perform multiple simulation runs. Each time a queue runs full, a new SEFF has to be generated containing the adapted average processing rates. In addition, the queue sizes in the component diagram need to be adapted according to the state before the load balancing took effect. The whole simulation of a usage scenario is hence performed in four stages, with each stage depending on the results of the previous one. Inaccuracies of a previous stage are hence carried over to the following simulation stages.

4.3 Limitations
As described in the previous section, we needed four simulation runs for each of our usage scenarios due to the integrated load balancing of the HANA streaming platform. This can be considered one of the most significant limitations of the current approach, since the manual creation of multiple SEFF stages is time-consuming. The best solution for this issue would be to extend the PCM framework to support dynamic ResourceDemands. For this purpose, a representation as depicted in Figure 9 would probably suffice. In this case, the processing rate would be increased or reduced depending on the current state of the queue. Another option would be to automate the transformation of CCL source code into PCM instances. An automatic recreation of models based on previous simulation runs would also allow more flexibility. Another limitation is that our cur-