Efficient Migration of Very Large Distributed State for Scalable Stream Processing

Bonaventura Del Monte
supervised by Prof. Volker Markl
DFKI GmbH
bonaventura.delmonte@dfki.de

Proceedings of the VLDB 2017 PhD Workshop, August 28, 2017, Munich, Germany. Copyright (c) 2017 for this paper by its authors. Copying permitted for private and academic purposes.

ABSTRACT
Any scalable stream data processing engine must handle the dynamic nature of data streams and quickly react to every fluctuation in the data rate. Many systems successfully address data rate spikes through resource elasticity and dynamic load balancing. The main challenge is the presence of stateful operators, because their internal, mutable state must be scaled out while assuring fault tolerance and continuous stream processing. Rescaling, load balancing, and recovery all demand state movement among work units. Therefore, how to guarantee these features in the presence of large distributed state with minimal impact on performance is still an open issue. We propose an incremental migration mechanism for fine-grained state shards based on periodic incremental checkpoints and replica groups. This enables moving large state with minimal impact on stream processing. Finally, we present a low-latency hand-over protocol that smoothly migrates tuple processing among work units.

1. INTRODUCTION
Existing scalable Stream Data Processing Engines (SPEs) offer fast stateful processing of data streams with low latency and high throughput despite fluctuations in the data rate. To this end, stateful processing benefits from on-demand resource elasticity, load balancing, and fault tolerance. Currently, both research [5, 6, 17, 13] and industry [1, 4, 18] address scaling up stateful operators while assuring fault tolerance in the case of partitioned or partially distributed large state. Here, large state means hundreds of gigabytes.

A motivating example. Many streaming applications require stateful processing. Examples of such applications are the data analytics stacks behind popular multimedia services, online marketplaces, and mobile games. These stacks perform complex event processing on live streams. Multimedia services and online marketplaces recommend new content or items to their users through collaborative filtering [16]. Producers of mobile games track the in-game behaviour of players to promote the best in-app purchase and to detect fraud. The size of the state in these applications scales with the number of users and their interactions with the application (e.g., rated items, purchases, actions of a player) and can grow to terabyte sizes. State in the order of terabytes introduces a multifaceted challenge: the SPE must manage cluster resources optimally, respecting the size of the state. This applies not only to intra-cluster instances but also to inter-cluster ones, e.g., migrating the SPE among operational environments or to cheaper "pay-as-you-go" instances. Besides, parallel analytic algorithms need global state: parallel instances of an operator work on their state partition and then update global state, e.g., machine learning models. Therefore, these analytics result in very large distributed state.

Research goal. Motivated by industrial needs, our goal is to achieve stream processing with low latency and high throughput when operators handle very large state. To this end, we focus on management techniques that enable fault tolerance, on-demand resource scaling, and load balancing in the presence of very large distributed state.

Problem statement. Handling operators with very large distributed state is cumbersome. Guaranteeing fault tolerance, resource elasticity, and dynamic load balancing for these operators (i) requires state transfer, (ii) must not undermine the consistency of distributed state shards, and (iii) demands robust query processing performance. State transfer introduces latency proportional to the size of the state. Exactly-once stream processing requires consistent state, i.e., results must be as accurate as if no failure had happened and the SPE had not performed any rescaling or rebalancing operation on the state. Besides, an SPE must continuously process stream tuples despite any of those operations.

Current approaches. To the best of our knowledge, no system fully features efficient state management when very large distributed state is involved. Many authors have investigated this problem by constraining their scope to partitioned or partially distributed state [5, 4, 18] and to smaller state sizes [7, 6, 17].

Proposed solution. Our solution is a low-latency incremental migration mechanism that moves fine-grained state shards by using periodic incremental checkpoints and replica groups. An incremental checkpoint is a periodic snapshot of a state shard that involves only modified values. A replica group is a set of computing instances holding a copy of a portion of the state. Our migration mechanism moves large operator states with low impact on system performance and without stopping the streaming topology. Although incremental migration reduces the transfer overhead, we also provide a placement scheme for primary state shards and replica groups that minimizes transfer cost. Our contributions are as follows:
1. a communication-efficient replication protocol that keeps a replica group consistent with the changes in the state of the primary operator,
2. an optimal placement of primary state shards and replica groups that decreases migration cost,
3. a hand-over protocol that migrates the processing between two work units with minimal latency.
We point out that this thesis is at an early stage; hence, we do not have any experimental validation yet.
2. RELATED WORK
Castro Fernandez et al. address the problem of scaling up and recovering stateful operators in a cloud environment through a set of primitives for state management that enables scaling up and recovery of stateful operators [5]. Their experiments include operators with small states, and they confirm that larger state leads to a higher recovery time. In a second work, the same authors propose a new abstraction over large mutable state, called stateful dataflow graph, which manages partitioned or partially distributed state [6]. Our aim is to fill the gap in this area by providing a mechanism that both scales out and recovers a long-running system with very large distributed state.

ChronoStream is a system that seamlessly migrates and executes tasks [17]; its authors believe to have achieved costless migration thanks to a locality-sensitive data placement scheme, delta checkpointing, and a lightweight transactional migration protocol. Although their experiments look promising, we argue that transactional migration may be avoided by using two different protocols (one for state migration and one for the hand-over) and that delta checkpointing adds synchronization issues.

Ding et al. deal with finding the optimal task assignment that minimizes the cost of state migration and satisfies load balancing constraints [7]. To this end, they introduce a live and progressive migration mechanism with negligible and controllable delay. They come to a different conclusion than ChronoStream, because they also argue that synchronization issues may affect result correctness while performing a migration. The solution of Ding et al. performs multiple mini-migrations progressively: each mini-migration migrates a number of tasks smaller than a given threshold [7]. On the other hand, their experiments do not cover large state migration, and it is unclear how their system would perform in such a task. Furthermore, both ChronoStream and Ding et al. consider partitioned state.

Nasir et al. present partial key grouping as a solution to handle load imbalance caused by skewness in the key distribution of input streams [14, 15]. The main idea is to keep track of the number of items in each parallel instance of an operator and route a new item to the instance with the smaller load. Items with the same key are thus routed to different parallel instances of the same operator. An improvement to this solution is to determine the "hottest" keys in the stream and assign more workers to those keys. However, they assume that the operator state has the associative property, so that merging intermediate partitioned sub-states is possible with an extra aggregation step. Splitting the state of a given key indeed mitigates its growth on one work unit, yet aggregating large state requires potentially expensive network transfers. Our aim is to propose a load balancing approach that avoids such partial aggregations.

Gedik et al. propose transparent auto-parallelization for stream processing through a migration mechanism [8]. However, we argue that their approach does not consider distributed large state and is totally decoupled from fault tolerance.

Many SPEs have effectively implemented state management techniques (e.g., Apache Flink [1, 4], Apache Spark [18], SEEP [6], Naiad [13]). In particular, Apache Flink features a technique that asynchronously checkpoints the global state to minimize the latency of a snapshot [3].
3. RESEARCH ISSUES
Our goal is to move large operator states with minimal impact on the performance of query processing. Migrating large states between operator instances in one shot is expensive due to the network transfer, especially if the system is already overloaded during its regular operation. Our key idea is to incrementally maintain a replica group for each fine-grained state unit over different work units. Each replica is updated through incremental checkpoints generated on the primary operator. In addition, intrinsic issues of migration pose new challenges, e.g., data consistency, tuple rerouting, physical shard handling, and network transfer cost. To better explain our key idea, we first define our data and system models. Then we provide an analysis of our research goals.

3.1 System Model
Data Model. Let S be a stream of tuples; for each tuple q ∈ S, we define kq as the value of the partitioning key and tq as its monotonically generated timestamp.

Stream processing. Our system is made of p work units running on z physical nodes (each of which can run a variable number of work units). Our system executes jobs/queries expressed as a dataflow graph. Each operator of the graph runs on at most p parallel instances. An operator takes n input streams and outputs m streams. Every parallel instance receives tuples (sent from upstream operators) according to a distribution function that computes the assignment based on kq.

State model. The global state of all the operators in the streaming topology is a distributed, logically partitionable data store (e.g., a distributed key-value store). Partitions of this data store contain a single state entry, e.g., the window content of an operator or user-defined counters. Each logical partition is made of physical shards. Every parallel instance of an operator holds its own shard. Besides, each shard is made of fine-grained data items. Each key of the input stream owns a few data items in every logical partition of the state, and each shard holds a range of keys. Each range of keys can be further partitioned and optionally split. Distributed state demands some consistency guarantee in case (i) a key needs to be stored in multiple shards, and (ii) tuple processing might trigger changes in more than one shard. The distribution function determines the content of the shards kept by stateful instances. Thus, each parallel instance of an operator not only processes tuples with specific keys but also holds the state data items for those keys.
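To make the state model more concrete, the following sketch shows one possible in-memory layout of a physical shard that holds the fine-grained data items of a key range and supports splitting a range. The class and field names are our own illustration and do not refer to any existing SPE.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Tuple

@dataclass
class KeyRange:
    """Contiguous sub-range of the key space, e.g. [start, end)."""
    start: int
    end: int

    def contains(self, key: int) -> bool:
        return self.start <= key < self.end

@dataclass
class StateShard:
    """Physical shard of one operator instance: fine-grained data
    items (one small dict per key) for a single key range."""
    key_range: KeyRange
    items: Dict[int, Dict[str, Any]] = field(default_factory=dict)

    def update(self, key: int, entry: str, value: Any) -> None:
        # Each key owns a few data items (e.g., window content, counters).
        assert self.key_range.contains(key)
        self.items.setdefault(key, {})[entry] = value

    def split(self, pivot: int) -> Tuple["StateShard", "StateShard"]:
        # A key range can be further partitioned and optionally split.
        left = StateShard(KeyRange(self.key_range.start, pivot))
        right = StateShard(KeyRange(pivot, self.key_range.end))
        for k, v in self.items.items():
            (left if k < pivot else right).items[k] = v
        return left, right
```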
3.2 Incremental Checkpoints
A prerequisite for our set of protocols is an incremental checkpoint protocol based on the approach of Carbone et al. [3]. Instead of taking a snapshot of the whole state, we asynchronously checkpoint only the state values modified between the previous checkpoint and the current one. An asynchronous checkpoint executed at time t does not contain updates that happened later than t.

3.3 Replication Protocol
We design a replication protocol to keep the global state of a streaming topology replicated and consistent. This protocol replicates the primary state of each operator instance on a given number of work units, i.e., each sub-range of keys has its own replica group. The purpose of a replica group is to keep a copy of different sub-ranges of keys for each operator. A primary operator sends incremental checkpoints for a given range of keys to its replicas through the network.
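To illustrate how incremental checkpoints and the replication protocol fit together, the sketch below tracks the keys modified since the last checkpoint, emits a delta containing only those values, and ships it to every member of the replica group. All names and interfaces are hypothetical and only meant to clarify the flow; in a real SPE the checkpoint would be taken asynchronously and the delta sent over the network.

```python
import copy
from typing import Any, Dict, List, Set

class ReplicatedShard:
    """Primary shard that tracks modified keys and replicates
    incremental checkpoints to its replica group."""

    def __init__(self, replica_group: List["ReplicatedShard"]):
        self.state: Dict[int, Any] = {}   # fine-grained data items per key
        self.dirty: Set[int] = set()      # keys modified since the last checkpoint
        self.replica_group = replica_group
        self.epoch = 0                    # checkpoint counter

    def update(self, key: int, value: Any) -> None:
        # Called by the operator while processing a tuple.
        self.state[key] = value
        self.dirty.add(key)

    def incremental_checkpoint(self) -> Dict[str, Any]:
        # Snapshot only the values modified since the previous checkpoint.
        delta = {k: copy.deepcopy(self.state[k]) for k in self.dirty}
        self.dirty.clear()
        self.epoch += 1
        return {"epoch": self.epoch, "delta": delta}

    def replicate(self) -> None:
        # Ship the delta to every member of the replica group.
        checkpoint = self.incremental_checkpoint()
        for replica in self.replica_group:
            replica.apply_checkpoint(checkpoint)

    def apply_checkpoint(self, checkpoint: Dict[str, Any]) -> None:
        self.epoch = checkpoint["epoch"]
        self.state.update(checkpoint["delta"])

# Example: one primary with a replica group of size two.
r1, r2 = ReplicatedShard([]), ReplicatedShard([])
primary = ReplicatedShard([r1, r2])
primary.update(42, {"count": 7})
primary.replicate()
assert r1.state[42] == {"count": 7} and r1.epoch == 1
```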
3.4 Hand-Over Protocol
The hand-over protocol moves the processing of a given key range (ks, ke) between two parallel instances of a target stateful operator. The system triggers this protocol when it detects the need to rescale an operator, to balance the load over the parallel instances of an operator, or to recover an operator. The main ideas behind this protocol are the usage of replica groups and incremental checkpoints, and the embedding of the protocol itself in the dataflow paradigm. Moving the processing of any key involves tuple rerouting and migration of the state for that key. This operation is lightweight if the destination instance is in the replica group of the moved key: in that case, the replica group misses at most the last incremental checkpoint. Let upstream denote all the operators that send input tuples to the target downstream operator. The steps of the protocol are:
1. The system decides to migrate the tuples whose keys fall in the range (ks, ke) from downstream instance os to ot, where ot is in the replica group of (ks, ke).
2. Upstream injects a key-move event into the dataflow for keys ks, ..., ke involving operators os and ot.
3. Upstream sends its outgoing tuples marked with keys ks, ..., ke to ot, which processes them and creates new partial states for those keys.
4. os generates an incremental checkpoint that contains its current states for keys ks, ..., ke and sends it to ot.
5. As soon as ot receives the incremental checkpoint, it updates its current states with the received checkpoint.
6. Then ot asynchronously merges these states with the partial states created in step 3. If new tuples arrive at ot, it generates new partial states and subsequently merges them.
As a result, the hand-over protocol guarantees eventual consistency on every migrated primary state after merging. Moreover, we assume that user-defined state has update and merge policies: the former updates the state by processing a stream tuple, whereas the latter merges two partial states for the same key (the sketch at the end of this subsection illustrates these policies). If merging partial states is not semantically possible, then the target instance buffers incoming tuples and updates the state only once it has fully received the migrated state.
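As an illustration of the destination side of the hand-over, the sketch below shows how the partial states built from rerouted tuples (step 3) are merged with the migrated incremental checkpoint (steps 5 and 6). The update and merge functions stand in for the user-defined policies mentioned above; all names are illustrative assumptions rather than an actual API.

```python
from typing import Any, Callable, Dict

# User-defined policies assumed by the protocol:
#   update(state, value) -> new state      (processes one stream tuple)
#   merge(state_a, state_b) -> new state   (combines two partial states of a key)
UpdateFn = Callable[[Any, Any], Any]
MergeFn = Callable[[Any, Any], Any]

class DestinationInstance:
    """Target instance o_t during a hand-over of the key range (ks, ke)."""

    def __init__(self, replica_state: Dict[int, Any],
                 update: UpdateFn, merge: MergeFn):
        self.state = replica_state  # replica copy, at most one checkpoint behind
        self.partial: Dict[int, Any] = {}  # partial states from rerouted tuples (step 3)
        self.update, self.merge = update, merge

    def on_rerouted_tuple(self, key: int, value: Any) -> None:
        # Step 3: process rerouted tuples into partial states.
        self.partial[key] = self.update(self.partial.get(key), value)

    def on_incremental_checkpoint(self, delta: Dict[int, Any]) -> None:
        # Steps 5-6: apply the last checkpoint received from o_s, then
        # merge it with the partial states built in the meantime.
        self.state.update(delta)
        for key, part in self.partial.items():
            base = self.state.get(key)
            self.state[key] = part if base is None else self.merge(base, part)
        self.partial.clear()

# Example with a counting operator: update adds one value, merge sums two counts.
dest = DestinationInstance({7: 10}, update=lambda s, v: (s or 0) + v,
                           merge=lambda a, b: a + b)
dest.on_rerouted_tuple(7, 1)            # rerouted tuple produces a partial count
dest.on_incremental_checkpoint({7: 12}) # delta carries the source's latest value
assert dest.state[7] == 13              # stale replica value 10 is overwritten by 12, then merged with 1
```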
3.5 Optimal Placement of Replica Groups
Each key-range replica group is composed of q physical nodes. As we aim to minimize the continuous migration cost, the replica group has to be optimally placed over the streaming topology. Indeed, transferring an incremental checkpoint from a node a to a node b could have a different cost than shipping the same checkpoint to a node c. This problem can be mapped to a bipartite graph matching problem whose classic solution is known as the Hungarian or Kuhn-Munkres algorithm [11, 12]. Nevertheless, our scenario is not static, as we need to deal with resource scaling and failing nodes. Therefore, a dynamic approach to the assignment problem [10] is the best fit for our needs, since we look for an optimal assignment of the state items to an elastic set of physical nodes. Our optimization problem is formulated as follows: given l sub-ranges of keys and z physical nodes, find a placement for each sub-range of keys over q out of the z nodes that minimizes the migration cost. We evaluate the cost of shipping an incremental checkpoint between two nodes by considering their workloads and the number of network switches involved.
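One way to state this placement problem more formally is as the following integer program; this is our own reformulation of the verbal description above, and the symbols c_{r,n} and x_{r,n} are introduced here purely for illustration:

\[
\min_{x} \; \sum_{r=1}^{l} \sum_{n=1}^{z} c_{r,n}\, x_{r,n}
\quad \text{subject to} \quad
\sum_{n=1}^{z} x_{r,n} = q \;\; \forall r \in \{1,\dots,l\}, \qquad
x_{r,n} \in \{0,1\},
\]

where x_{r,n} = 1 iff a copy of sub-range r is placed on node n, and c_{r,n} estimates the cost of shipping the incremental checkpoints of sub-range r to node n as a function of node workload and network distance. A dynamic variant of the Hungarian algorithm [10] can then maintain such an assignment as nodes join, fail, or become overloaded.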
4. RESEARCH PLAN
In this thesis, we intend to investigate the above research issues with respect to our goal: transparently providing fault tolerance, resource elasticity, and load balancing in the presence of very large distributed state. Our focus is to investigate the trade-offs behind our proposed solution. First, the hand-over protocol presents several challenges, e.g., the granularity of the key ranges, the concurrent execution of the protocol, and the triggering policy of the protocol (through either consensus, common knowledge, or a centralized entity). Secondly, we plan to investigate the usage of log-less replication (similarly to Bizur [9]) by using shared registers [2]; besides, log-less replication implies no log compaction overhead. As the network is the main bottleneck, we plan to research orthogonal optimizations to reduce network overhead, e.g., remote direct memory access, data compression, and approximation. Lastly, the placement scheme for replica groups may require further investigation, as our initial definition of migration might neglect significant hidden costs.

4.1 Achievement Plan
We have a clear idea about how to achieve our goal, which we define in the following paragraphs.

Fault-tolerance. Our replication protocol guarantees that each replica group holds a copy of some ranges of keys for different operators. Since each key is replicated on q+1 physical units, the system can sustain up to q failing instances of an operator by resuming the computation on one unit in its replica group. The system may need to replay some tuples unless the group has the latest state checkpoint and the failing unit did not process any newer tuple.

Load balancing. Relying on a load balancing policy (e.g., shard size or ingested tuple count above a given threshold), the system triggers the hand-over protocol (a possible trigger policy is sketched at the end of this subsection). The hand-over protocol then seamlessly moves the processing of some key ranges from a primary work unit to another one in their replica groups. Determining the placement of the key ranges of the primary state is another, orthogonal challenge that we plan to address.

Resource elasticity. Regardless of the chosen elasticity policy, we need to efficiently rescale the state of every range of keys along with its replicas while minimizing the transfer cost. Rescaling possibly involves deleting some replicas, whereas state transfer can still be done incrementally by using the above protocols. As we consider primary state and replicas as one entity, we reassign them to parallel instances as described in Section 3.5. This procedure could benefit from current IaaS platforms where multiple VMs or containers share physical hardware. Indeed, we may provision new resources either on an already used physical node or on a new node. The latter scenario is more challenging, as the system must migrate entire shards of the state to the new node.
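As one concrete, purely illustrative example of such policies, the sketch below checks per-instance load metrics against thresholds and decides whether to fire the hand-over protocol for load balancing or to request a new instance for rescaling. The metric names and threshold values are assumptions for the sake of the example, not part of our current design.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple

@dataclass
class InstanceLoad:
    instance_id: int
    shard_bytes: int       # size of the primary shard held by this instance
    tuples_per_sec: float  # recent ingestion rate

MAX_SHARD_BYTES = 100 * 2**30   # illustrative thresholds
MAX_TUPLES_PER_SEC = 500_000

def pick_action(loads: List[InstanceLoad]) -> Optional[Tuple[str, int]]:
    """Return ('handover', id) when another instance can absorb the load,
    ('rescale', id) when all instances are near their limits, or None."""
    overloaded = [l for l in loads
                  if l.shard_bytes > MAX_SHARD_BYTES
                  or l.tuples_per_sec > MAX_TUPLES_PER_SEC]
    if not overloaded:
        return None
    worst = max(overloaded, key=lambda l: l.shard_bytes)
    # Spare capacity exists: move some key ranges of the most loaded
    # instance to a lightly loaded member of their replica groups.
    spare = [l for l in loads if l.shard_bytes < MAX_SHARD_BYTES // 2]
    if spare:
        return ("handover", worst.instance_id)
    # Otherwise provision a new work unit and migrate shards to it.
    return ("rescale", worst.instance_id)

# Example: instance 3 is overloaded, instance 1 has spare capacity.
loads = [InstanceLoad(1, 10 * 2**30, 50_000.0),
         InstanceLoad(3, 120 * 2**30, 300_000.0)]
assert pick_action(loads) == ("handover", 3)
```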
4.2 The system in action
In Figure 1, we show a toy example of our system while it seamlessly performs resource scaling, state recovery, and load balancing. The figure shows a simple dataflow graph made of one source (parallelism = 2) and one operator (parallelism = 4). For the sake of simplicity, we mark tuples and state for the same key range with the same colour. Each primary state for every key range has only one replica group. In Figure 1.A, the first instance is failing while the third one is overloaded. In Figure 1.B, the hand-over protocol seamlessly moves the processing and the state of both the yellow and the violet key ranges. As the state of the yellow key range was on a failing node, our system must replay lost tuples. Meanwhile, the fourth instance processes violet tuples and creates a new partial state. The hand-over protocol merges this partial state with the current replica and the last incremental checkpoint. Simultaneously, our system provisions a new instance and migrates the red state, as it detects an overloaded second instance. In Figure 1.C, the system is finally stable. The violet state is migrated and replicated on the fourth and second instance, respectively. The yellow state is restored on the second instance and replicated on the new instance. The red state is replicated on the new instance.

[Figure 1: Our protocols in action: tuples and state for the same key range are marked with the same colour. Primary state for every key range is incrementally replicated only once. Relevant steps of the hand-over protocol are circled.]

4.3 Evaluation Plan
We assess the capabilities of our system through the following set of Key Performance Indicators (KPIs):
1. the execution of our protocols must have a negligible effect on query processing performance,
2. the system must guarantee exactly-once stream processing and state consistency,
3. performing a load balancing or a resource scaling operation must improve the resource utilization of the physical infrastructure and prevent bottlenecks (e.g., operator back-pressure).
To meet the above KPIs, we intend to design a suite of benchmarks that thoroughly stresses our proposed system. We plan to define a set of metrics (e.g., tuple processing throughput and latency, migrated state items, checkpoint size) and measure them in our system on different real-world workloads, with distinct scaling and balancing policies, and different replication factors. Finally, we expect to compare our results with baseline systems.

4.4 Future directions
We envision a system able to continuously process stream tuples despite data rate spikes and failures. This system can also seamlessly migrate itself among clusters, e.g., from one IaaS provider to a cheaper vendor, or between two operational environments. Incremental state migration will be a building block of such operations. Other orthogonal research areas may be: (i) investigating the usage of new storage hardware, e.g., NVRAM and SSDs, (ii) considering non-keyed state and queryable state, (iii) providing elastic job maintenance, (iv) exploring data compression techniques to reduce state size, and (v) investigating incremental state migration (and resource elasticity) in the case of Hybrid Transactional-Analytical Processing (HTAP) workloads.

Acknowledgments. We would like to thank our advisor Prof. Volker Markl and Prof. Tilmann Rabl for their valuable guidance, as well as Dr. Asterios Katsifodimos, Dr. Sebastian Breß, and Dr. Alireza Rezaei Mahdiraji for their support. This work has been partially supported by the European Commission through PROTEUS (ref. 687691).

5. REFERENCES
[1] A. Alexandrov, R. Bergmann, et al. The Stratosphere platform for big data analytics. The VLDB Journal, 2014.
[2] H. Attiya, A. Bar-Noy, et al. Sharing memory robustly in message-passing systems. In ACM PODC, 1990.
[3] P. Carbone, G. Fóra, et al. Lightweight asynchronous snapshots for distributed dataflows. CoRR, abs/1506.08603, 2015.
[4] P. Carbone, A. Katsifodimos, et al. Apache Flink™: Stream and batch processing in a single engine. IEEE Data Eng. Bull., 38(4), 2015.
[5] R. Castro Fernandez, M. Migliavacca, et al. Integrating scale out and fault tolerance in stream processing using operator state management. In ACM SIGMOD, 2013.
[6] R. Castro Fernandez, M. Migliavacca, et al. Making state explicit for imperative big data processing. In USENIX ATC, 2014.
[7] J. Ding, T. Fu, et al. Optimal operator state migration for elastic data stream processing. CoRR, abs/1501.03619, 2015.
[8] B. Gedik, S. Schneider, et al. Elastic scaling for data stream processing. IEEE Trans. Parallel Distrib. Syst., 2014.
[9] E. Hoch, Y. Ben-Yehuda, et al. Bizur: A key-value consensus algorithm for scalable file-systems. CoRR, abs/1702.04242, 2017.
[10] G. A. Korsah, A. T. Stentz, et al. The dynamic Hungarian algorithm for the assignment problem with changing costs. Tech. Rep. CMU-RI-TR-07-27, 2007.
[11] H. W. Kuhn. The Hungarian method for the assignment problem. Naval Research Logistics Quarterly, 1955.
[12] J. Munkres. Algorithms for the assignment and transportation problems. Journal of the Society for Industrial and Applied Mathematics, 1957.
[13] D. Murray, F. McSherry, et al. Naiad: A timely dataflow system. In ACM SOSP, 2013.
[14] M. Nasir, G. Morales, et al. The power of both choices: Practical load balancing for distributed stream processing engines. CoRR, abs/1504.00788, 2015.
[15] M. Nasir, G. Morales, et al. When two choices are not enough: Balancing at scale in distributed stream processing. CoRR, abs/1510.05714, 2015.
[16] R. Sumbaly, J. Kreps, et al. The big data ecosystem at LinkedIn. In ACM SIGMOD, 2013.
[17] Y. Wu and K. Tan. ChronoStream: Elastic stateful stream computation in the cloud. In IEEE ICDE, 2015.
[18] M. Zaharia, T. Das, et al. Discretized streams: Fault-tolerant streaming computation at scale. In ACM SOSP, 2013.