       Efficient Migration of Very Large Distributed State for
                    Scalable Stream Processing

                                                    Bonaventura Del Monte
                                                supervised by Prof. Volker Markl
                                                         DFKI GmbH
                                                 bonaventura.delmonte@dfki.de

ABSTRACT
Any scalable stream data processing engine must handle the dynamic nature of data streams and quickly react to every fluctuation in the data rate. Many systems successfully address data rate spikes through resource elasticity and dynamic load balancing. The main challenge is the presence of stateful operators, because their internal, mutable state must be scaled out while assuring fault-tolerance and continuous stream processing. Rescaling, load balancing, and recovery all demand state movement among work units. Therefore, how to guarantee those features in the presence of large distributed state with minimal impact on performance is still an open issue. We propose an incremental migration mechanism for fine-grained state shards through periodic incremental checkpoints and replica groups. This enables moving large state with minimal impact on stream processing. Finally, we present a low-latency hand-over protocol that smoothly migrates tuple processing among work units.

1. INTRODUCTION
Existing scalable Stream Data Processing Engines (SPEs) offer fast stateful processing of data streams with low latency and high throughput despite fluctuations in the data rate. To this end, stateful processing benefits from on-demand resource elasticity, load balancing, and fault tolerance. Currently, both research [5, 6, 17, 13] and industry [1, 4, 18] address scaling up stateful operators while assuring fault tolerance in the case of partitioned or partially distributed large state. Here, large state means hundreds of gigabytes.
A motivating example. Many streaming applications require stateful processing. Examples of such applications are the data analytics stacks behind popular multimedia services, online marketplaces, and mobile games. These stacks perform complex event processing on live streams. Multimedia services and online marketplaces recommend new content or items to their users through collaborative filtering [16]. Producers of mobile games track the in-game behaviour of players to promote the best in-app purchases and to detect fraud. The size of the state in these applications scales with the number of users and their interactions with the application (e.g., rated items, purchases, actions of a player) and can grow to terabytes. State of this size introduces a multifaceted challenge: the SPE must optimally manage cluster resources with respect to the size of the state. This applies not only to intra-cluster instances but also to inter-cluster ones, e.g., migrating the SPE among operational environments or to cheaper "pay-as-you-go" instances. Besides, parallel analytic algorithms need global state: parallel instances of an operator work on their own state partition and then update the global state, e.g., a machine learning model. Therefore, these analytics result in very large distributed state.
Research goal. Motivated by industrial needs, our goal is to achieve stream processing with low latency and high throughput when operators handle very large state. To this end, we focus on state management techniques that enable fault-tolerance, on-demand resource scaling, and load balancing in the presence of very large distributed state.
Problem statement. Handling operators with very large distributed state is cumbersome. Guaranteeing fault-tolerance, resource elasticity, and dynamic load balancing for these operators (i) requires state transfer, (ii) must not undermine the consistency of distributed state shards, and (iii) demands robust query processing performance. State transfer introduces latency proportional to the size of the state. Exactly-once stream processing requires consistent state, i.e., results must be as accurate as if no failure happened and the SPE did not perform any rescaling or rebalancing operation on the state. Besides, an SPE must continuously process stream tuples despite any of those operations.
Current approaches. To the best of our knowledge, no system fully features efficient state management when very large distributed state is involved. Many authors investigated this problem by constraining their scope to partitioned or partially distributed state [5, 4, 18] and to smaller state sizes [7, 6, 17].
Proposed solution. Our solution is a low-latency incremental migration mechanism that moves fine-grained state shards by using periodic incremental checkpoints and replica groups. An incremental checkpoint is a periodic snapshot of a state shard that contains only the modified values. A replica group is a set of computing instances holding a copy of a portion of the state. Our migration mechanism moves large operator states with low impact on system performance and without stopping the streaming topology. Although incremental migration reduces the transfer overhead, we also provide a placement scheme for primary state shards and replica groups that minimizes the transfer cost. Our contributions are as follows:
1. a communication-efficient replication protocol that keeps a replica group consistent with the changes in the state of the primary operator
2. an optimal placement of primary state shards and replica groups that decreases the migration cost
3. a hand-over protocol that migrates the processing between two work units with minimal latency.
We point out that this thesis is at an early stage; hence, we do not have any experimental validation yet.

Proceedings of the VLDB 2017 PhD Workshop, August 28, 2017, Munich, Germany. Copyright (c) 2017 for this paper by its authors. Copying permitted for private and academic purposes.
2. RELATED WORK
Castro Fernandez et al. address the problem of scaling out and recovering stateful operators in a cloud environment through a set of state management primitives [5]. Their experiments include operators with small state, and they confirm that larger state leads to a higher recovery time. In a second work, the same authors propose a new abstraction over large mutable state, called stateful dataflow graph, which manages partitioned or partially distributed state [6]. Our aim is to fill the gap in this area by providing a mechanism that both scales out and recovers a long-running system with very large distributed state.
ChronoStream is a system that seamlessly migrates and executes tasks [17]; its authors claim to achieve costless migration thanks to a locality-sensitive data placement scheme, delta checkpointing, and a lightweight transactional migration protocol. Although their experiments look promising, we argue that transactional migration may be avoided by using two different protocols (one for state migration and one for the hand-over) and that delta checkpoints add synchronization issues.
Ding et al. deal with finding the optimal task assignment that minimizes the cost of state migration and satisfies load balancing constraints [7]. To this end, they introduce a live and progressive migration mechanism with negligible and controllable delay. They come to a different conclusion than ChronoStream, because they also argue that synchronization issues may affect result correctness while performing a migration. Their solution performs multiple mini-migrations progressively: each mini-migration moves a number of tasks smaller than a given threshold [7]. On the other hand, their experiments do not cover large state migration and it is unclear how their system would perform in such a task. Furthermore, both ChronoStream and Ding et al. consider only partitioned state.
Nasir et al. present partial key grouping as a solution to handle load imbalance caused by skewness in the key distribution of input streams [14, 15]. The main idea is to keep track of the number of items in each parallel instance of an operator and to route a new item to the instance with the smaller load. Items with the same key are therefore routed to different parallel instances of the same operator. An improvement to this solution is to determine the "hottest" keys in the stream and assign more workers to those keys. However, they assume that the operator state is associative, so that merging intermediate partitioned sub-states is possible with an extra aggregation step. Splitting the state of a given key indeed mitigates its growth on one work unit, yet aggregating large state requires potentially expensive network transfers. Our aim is to propose a load balancing approach that avoids such partial aggregations.
Gedik et al. propose transparent auto-parallelization for stream processing through a migration mechanism [8]. However, we argue that their approach does not consider large distributed state and is completely decoupled from fault-tolerance.
Many SPEs have effectively implemented state management techniques (e.g., Apache Flink [1, 4], Apache Spark [18], SEEP [6], Naiad [13]). In particular, Apache Flink features a technique that asynchronously checkpoints the global state to minimize the latency of a snapshot [3].
3. RESEARCH ISSUES
Our goal is to move large operator states with minimal impact on the performance of query processing. Migrating large states between operator instances in one shot is expensive due to network transfer, especially if the system is already overloaded during its regular operation. Our key idea is to incrementally maintain a replica group for each fine-grained state unit over different work units. Each replica is updated through incremental checkpoints generated on the primary operator. In addition, intrinsic issues of migration pose new challenges, e.g., data consistency, tuple rerouting, physical shard handling, and network transfer cost. To better explain our key idea, we first define our data and system models. Then we provide an analysis of our research goals.

3.1 System Model
Data Model. Let S be a stream of tuples; for each tuple q ∈ S, we define k_q as the value of its partitioning key and t_q as its monotonically generated timestamp.
Stream processing. Our system is made of p work units running on z physical nodes (each of which can run a variable number of work units). Our system executes jobs/queries expressed as a dataflow graph. Each operator of the graph runs on at most p parallel instances. An operator consumes n streams and outputs m streams. Every parallel instance receives tuples (sent from upstream operators) according to a distribution function that computes the assignments through k_q.
State model. The global state of all the operators in the streaming topology is a distributed, logically partitionable data store (e.g., a distributed key-value store). Partitions of this data store contain a single state entry, e.g., the window content of an operator or user-defined counters. Each logical partition is made of physical shards, and every parallel instance of an operator holds its own shard. Besides, each shard is made of fine-grained data items. Each key of the input stream owns a few data items in every logical partition of the state, and each shard holds a range of keys. Each range of keys can be further partitioned and optionally split. Distributed state demands some consistency guarantee in case (i) a key needs to be stored in multiple shards, and (ii) tuple processing might trigger changes in more than one shard. The distribution function determines the content of the shards kept by stateful instances. Thus, each parallel instance of an operator not only processes tuples with specific keys but also holds the state data items for those keys.
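To make the system model concrete, the following is a minimal sketch of a shard and of a key-based distribution function. It assumes hash partitioning over the key space; all class and method names (Shard, Distributor, instanceFor) are illustrative and not part of the paper.

```java
import java.util.Map;
import java.util.TreeMap;

/** Illustrative sketch (names are ours, not the paper's): a physical shard
 *  owns the fine-grained state data items for one contiguous range of keys. */
final class Shard {
    final long rangeStart, rangeEnd;                      // key range [rangeStart, rangeEnd)
    final Map<Long, byte[]> dataItems = new TreeMap<>();  // fine-grained data items, keyed by k_q

    Shard(long rangeStart, long rangeEnd) {
        this.rangeStart = rangeStart;
        this.rangeEnd = rangeEnd;
    }

    boolean owns(long key) {
        return key >= rangeStart && key < rangeEnd;
    }
}

/** Distribution function: routes a tuple to one of p parallel instances via its key k_q. */
final class Distributor {
    private final int p;  // operator parallelism

    Distributor(int p) {
        this.p = p;
    }

    int instanceFor(long kq) {
        // Hash partitioning is one plausible choice; the model only requires
        // that assignments be computed through k_q.
        return Math.floorMod(Long.hashCode(kq), p);
    }
}
```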
3.2 Incremental Checkpoints
A prerequisite for our set of protocols is an incremental checkpoint protocol based on the approach of Carbone et al. [3]. Instead of taking a snapshot of the whole state, we asynchronously checkpoint the state values modified between the previous checkpoint and the current one. An asynchronous checkpoint executed at time t will not contain updates that happened later than t.
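The sketch below illustrates the delta idea under a simple assumption: the shard tracks the keys written since the last checkpoint and snapshots only those entries. The class IncrementalCheckpointer and its methods are hypothetical names, not a specified component.

```java
import java.util.HashMap;
import java.util.Map;

/** Minimal sketch of delta tracking for incremental checkpoints: only values
 *  modified since the previous checkpoint are snapshotted, following the
 *  asynchronous approach of [3]. */
final class IncrementalCheckpointer {
    private final Map<Long, byte[]> state = new HashMap<>();  // primary state of one shard
    private final Map<Long, byte[]> dirty = new HashMap<>();  // keys written since last checkpoint

    void update(long key, byte[] value) {
        state.put(key, value);
        dirty.put(key, value);  // remember the delta
    }

    /** Snapshots the accumulated delta; by construction, updates that happen
     *  after this call belong to the next checkpoint. */
    Map<Long, byte[]> takeIncrementalCheckpoint() {
        Map<Long, byte[]> delta = new HashMap<>(dirty);
        dirty.clear();
        return delta;
    }
}
```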
3.3 Replication Protocol
We design a replication protocol to keep the global state of a streaming topology replicated and consistent. This protocol replicates the primary state of each operator instance on a given number of work units, i.e., each sub-range of keys has its own replica group. The purpose of a replica group is to keep a copy of different sub-ranges of keys for each operator. A primary operator sends incremental checkpoints for a given range of keys to its replicas through the network.
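A replica group member could apply these deltas as sketched below, assuming that checkpoints carry a sequence number so a replica lags the primary by at most one in-flight checkpoint; the Replica class is illustrative, not a specified component.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch of a replica group member: it stays consistent with the primary by
 *  applying incremental checkpoints in order. */
final class Replica {
    private final Map<Long, byte[]> copy = new HashMap<>();  // replicated key range
    private long lastApplied = -1;                           // sequence number of last applied delta

    boolean apply(long seq, Map<Long, byte[]> delta) {
        if (seq != lastApplied + 1) {
            return false;  // gap detected: the primary must resend the missing delta
        }
        copy.putAll(delta);
        lastApplied = seq;
        return true;
    }
}
```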
3.4 Hand-Over Protocol
The hand-over protocol moves the processing of a given key range (k_s, k_e) between two parallel instances of a target stateful operator. The system triggers this protocol when it detects the need to either rescale an operator, balance the load over the parallel instances of an operator, or recover an operator. The main ideas behind this protocol are the usage of replica groups and incremental checkpoints, and the embedding of the protocol itself in the dataflow paradigm. Moving the processing of any key involves tuple rerouting and migration of the state for that key. This operation is lightweight if the destination instance is in the replica group of the moved key. Indeed, the replica group misses at most the last incremental checkpoint. Let upstream be the set of all operators that send input tuples to a target downstream operator; the steps of the protocol are:
1. The system decides to migrate the tuples marked with keys in the range (k_s, k_e) from downstream instance o_s to o_t, which is in the replica group of (k_s, k_e)
2. Upstream injects a key-move event in the dataflow for keys k_s,...,k_e involving operators o_s and o_t
3. Upstream sends its outgoing tuples marked with keys k_s,...,k_e to o_t, which processes them creating new states s'_e,...,s'_t
4. o_s generates an incremental checkpoint that contains its current states s_e,...,s_t for keys k_s,...,k_e and sends it to o_t
5. As soon as o_t receives the incremental checkpoint, it updates its current states s_e,...,s_t with the received checkpoint
6. Then o_t asynchronously merges them with s'_e,...,s'_t. If new tuples arrive at o_t, it generates new states and subsequently merges them.
As a result, the hand-over protocol guarantees eventual consistency on every migrated primary state after merging. Moreover, we assume that user-defined state has update and merge policies; the former updates the state by processing a stream tuple, whereas the latter merges two partial states for the same key (a minimal sketch of this contract follows below). If merging partial states is not semantically possible, then the target instance buffers incoming tuples and updates the state once it has been fully received.
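The following is our own rendering of the assumed state contract: an update policy applied per stream tuple and a merge policy that reconciles a migrated state with the fresh partial state created in step 3. The interface and the counter example are illustrative, not part of the paper.

```java
/** Sketch of the assumed user-defined state contract. */
interface MergeableState<T, S> {
    S update(S state, T tuple);           // update policy: fold one tuple into the state
    S merge(S migrated, S freshPartial);  // merge policy: combine two partial states of a key
}

/** Example instance: a per-key event counter, which merges trivially. */
final class CountingState implements MergeableState<Object, Long> {
    public Long update(Long state, Object tuple) {
        return (state == null ? 0L : state) + 1;
    }

    public Long merge(Long migrated, Long freshPartial) {
        return (migrated == null ? 0L : migrated) + (freshPartial == null ? 0L : freshPartial);
    }
}
```

For a state type without a semantically valid merge, the contract degenerates to the buffering fallback described above.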
3.5 Optimal Placement of Replica Groups
Each key's replica group is composed of q physical nodes. As we aim to minimize the continuous migration cost, the replica group has to be optimally placed over the streaming topology. Indeed, transferring an incremental checkpoint from a node a to a node b could have a different cost than shipping the same checkpoint to a node c. This problem can be mapped to a bipartite graph matching problem whose classic solution is well known as the Hungarian or Kuhn-Munkres algorithm [11, 12]. Nevertheless, our scenario is not static, as we need to deal with resource scaling and failing nodes. Therefore, a dynamic approach to the assignment problem [10] is the best fit for our needs, since we look for an optimal assignment of the state items to an elastic set of physical nodes. Our optimization problem is formulated as follows: given l sub-ranges of keys and z physical nodes, find a placement for each sub-range of keys over q out of z nodes that minimizes the migration cost. We evaluate the cost of shipping an incremental checkpoint between two nodes by considering their workloads and the number of network switches involved.
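One possible formalization of this placement problem, assuming binary assignment variables and an additive cost model (our own rendering, not taken from the paper):

```latex
\min_{x}\;\sum_{i=1}^{l}\sum_{j=1}^{z} c_{ij}\,x_{ij}
\quad\text{s.t.}\quad
\sum_{j=1}^{z} x_{ij} = q \;\;\forall i\in\{1,\dots,l\},
\qquad x_{ij}\in\{0,1\}
```

where x_ij = 1 if sub-range i is placed on node j, and c_ij estimates the cost of shipping the incremental checkpoints of sub-range i to node j, as a function of the workload of j and of the number of network switches on the path.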
4. RESEARCH PLAN
In this thesis, we intend to investigate the above research issues w.r.t. our goal: transparently providing fault-tolerance, resource elasticity, and load balancing in the presence of very large distributed state. Our focus is to investigate the trade-offs behind our proposed solution. First, the hand-over protocol presents several challenges, e.g., the granularity of the key ranges, the concurrent execution of the protocol, and the triggering policy of the protocol (through either consensus, common knowledge, or a centralized entity). Secondly, we plan to investigate the usage of log-less replication (similar to Bizur [9]) by using shared registers [2]; besides, log-less replication implies no log compaction overhead. As the network is the main bottleneck, we plan to research orthogonal optimizations that reduce network overhead, e.g., remote direct memory access, data compression, and approximation. Lastly, the placement scheme of replica groups may require further investigation, as our initial definition of migration might neglect significant hidden costs.

4.1 Achievement Plan
We have a clear idea about how to achieve our goal, which we lay out in the following paragraphs.
Fault-tolerance. Our replication protocol guarantees that each replica group holds a copy of some ranges of keys for different operators. Since each key is replicated on q+1 physical units, the system can sustain up to q failing instances of an operator by resuming the computation on one unit in its replica group (e.g., with q = 2, a key lives on a primary and two replicas, so two concurrent instance failures are survivable). The system may need to replay some tuples unless the group has the latest state checkpoint and the failing unit did not process any newer tuple.
Load balancing. Relying on a load balancing policy (e.g., shard size or ingested tuple count above a given threshold; a sketch of such a policy follows at the end of this subsection), the system triggers the hand-over protocol. The hand-over protocol then seamlessly moves the processing of some key ranges from a primary work unit to another one in their replica groups. Determining the placement of the key ranges of the primary state is another, orthogonal challenge that we plan to overcome.
Resource elasticity. Regardless of the chosen elasticity policy, we need to efficiently rescale the state of every range of keys along with its replicas while minimizing the transfer cost. Rescaling possibly involves deleting some replicas, whereas state transfer can still be done incrementally by using the above protocols. As we consider primary state and replicas as one entity, we reassign them to parallel instances as described in Section 3.5. This procedure could benefit from current IaaS platforms where multiple VMs or containers share physical hardware. Indeed, we may provision new resources on either an already used physical node or a new node. The latter scenario is more challenging, as the system must migrate entire shards of the state to the new node.
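As an illustration of the threshold-based policy hypothesized above, the following sketch triggers the hand-over when either bound is exceeded; the class name and both thresholds are hypothetical.

```java
/** Sketch of a threshold-based load balancing policy: the hand-over protocol
 *  fires when a shard exceeds either a size bound or an ingestion-rate bound. */
final class LoadBalancingPolicy {
    private final long maxShardBytes;
    private final long maxTuplesPerSecond;

    LoadBalancingPolicy(long maxShardBytes, long maxTuplesPerSecond) {
        this.maxShardBytes = maxShardBytes;
        this.maxTuplesPerSecond = maxTuplesPerSecond;
    }

    /** True if the shard's key ranges should be (partially) handed over. */
    boolean shouldTriggerHandOver(long shardBytes, long tuplesPerSecond) {
        return shardBytes > maxShardBytes || tuplesPerSecond > maxTuplesPerSecond;
    }
}
```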
                                                                       4.3    Evaluation Plan
or centralized entity). Secondly, we plan to investigate the             We assess the capabilities of our system through the following
usage of log-less replication (similarly to Bizur [9]) by using        set of Key Performance Indicators (KPIs):
shared registers [2]. Besides, log-less replication implies no log     1. the execution of our protocols must have negligible effect on
compaction overhead. As network is the main bottleneck, we                query processing performance
plan to research orthogonal optimizations to reduce network            2. the system must guarantee exactly-once stream processing
overhead, e.g, remote direct memory access, data compression,             and state consistency
and approximation. Lastly, the placement scheme of replica             3. performing a load balancing or a resources scaling operation
groups may require further investigation as our initial definition        must improve resource utilization of the physical infrastruc-
of migration might neglect significant hidden cost.                       ture and prevent bottlenecks (e.g., operator back-pressure)
[Figure 1 appears here: a dataflow with two sources (SRC1, SRC2) and four operator instances (P1-P4), shown in three panels A, B, and C. Legend: normal instance, new instance, failing instance, overloaded instance, stream tuple, primary state, replica, incremental checkpoint, incremental migration, lost tuple to replay.]
Figure 1: Our protocols in action: tuples and state for the same key range are marked with the same colour. The primary state of every key range is incrementally replicated only once. The relevant steps of the hand-over protocol are circled.

To meet the above KPIs, we intend to design a suite of benchmarks that thoroughly stresses our proposed system. We plan to define a set of metrics (e.g., tuple processing throughput and latency, migrated state items, checkpoint size) and to measure them in our system on different real-world workloads, with distinct scaling and balancing policies and different replication factors. Finally, we expect to compare our results with baseline systems.

4.4 Future directions
We envision a system able to continuously process stream tuples despite data rate spikes and failures. This system can also seamlessly migrate itself among clusters, e.g., from one IaaS provider to a cheaper vendor, or between two operational environments. Incremental state migration will be a building block of such operations. Other orthogonal research areas may be: (i) investigating the usage of new storage hardware, e.g., NVRAM and SSDs, (ii) considering non-keyed state and queryable state, (iii) providing elastic job maintenance, (iv) exploring data compression techniques to reduce the state size, and (v) investigating incremental state migration (and resource elasticity) in the case of Hybrid Transactional/Analytical Processing (HTAP) workloads.

Acknowledgments: We would like to thank our advisor Prof. Volker Markl and Prof. Tilmann Rabl for their valuable guidance, as well as Dr. Asterios Katsifodimos, Dr. Sebastian Breß, and Dr. Alireza Rezaei Mahdiraji for their support. This work has been partially supported by the European Commission through PROTEUS (ref. 687691).

5. REFERENCES
[1] A. Alexandrov, R. Bergmann, et al. The Stratosphere platform for big data analytics. The VLDB Journal, 2014.
[2] H. Attiya, A. Bar-Noy, et al. Sharing memory robustly in message-passing systems. In ACM PODC, 1990.
[3] P. Carbone, G. Fóra, et al. Lightweight asynchronous snapshots for distributed dataflows. CoRR, abs/1506.08603, 2015.
[4] P. Carbone, A. Katsifodimos, et al. Apache Flink: Stream and batch processing in a single engine. IEEE Data Eng. Bull., 38(4), 2015.
[5] R. Castro Fernandez, M. Migliavacca, et al. Integrating scale out and fault tolerance in stream processing using operator state management. In ACM SIGMOD, 2013.
[6] R. Castro Fernandez, M. Migliavacca, et al. Making state explicit for imperative big data processing. In USENIX ATC, 2014.
[7] J. Ding, T. Fu, et al. Optimal operator state migration for elastic data stream processing. CoRR, abs/1501.03619, 2015.
[8] B. Gedik, S. Schneider, et al. Elastic scaling for data stream processing. IEEE Trans. Parallel Distrib. Syst., 2014.
[9] E. Hoch, Y. Ben-Yehuda, et al. Bizur: A key-value consensus algorithm for scalable file-systems. CoRR, abs/1702.04242, 2017.
[10] G. A. Korsah, A. T. Stentz, et al. The dynamic Hungarian algorithm for the assignment problem with changing costs. Tech. Rep. CMU-RI-TR-07-27, 2007.
[11] H. W. Kuhn. The Hungarian method for the assignment problem. Naval Research Logistics Quarterly, 1955.
[12] J. Munkres. Algorithms for the assignment and transportation problems. Journal of the Society for Industrial and Applied Mathematics, 1957.
[13] D. Murray, F. McSherry, et al. Naiad: A timely dataflow system. In ACM SOSP, 2013.
[14] M. Nasir, G. Morales, et al. The power of both choices: Practical load balancing for distributed stream processing engines. CoRR, abs/1504.00788, 2015.
[15] M. Nasir, G. Morales, et al. When two choices are not enough: Balancing at scale in distributed stream processing. CoRR, abs/1510.05714, 2015.
[16] R. Sumbaly, J. Kreps, et al. The big data ecosystem at LinkedIn. In ACM SIGMOD, 2013.
[17] Y. Wu and K. Tan. ChronoStream: Elastic stateful stream computation in the cloud. In IEEE ICDE, 2015.
[18] M. Zaharia, T. Das, et al. Discretized streams: Fault-tolerant streaming computation at scale. In ACM SOSP, 2013.