Towards Elastic Stream Processing: Patterns and Infrastructure*

Kai-Uwe Sattler, Ilmenau University of Technology, Ilmenau, Germany, kus@tu-ilmenau.de
Felix Beier, Ilmenau University of Technology, Ilmenau, Germany, felix.beier@tu-ilmenau.de

ABSTRACT
Distributed, highly parallel processing frameworks such as Hadoop are deemed state of the art for handling big data today. But they burden application developers with the task of manually implementing program logic using low-level batch processing APIs. Thus, a movement can be observed towards high-level languages that allow dataflows to be modeled declaratively and to be automatically optimized and mapped to the batch-processing backends. However, most of these systems are based on programming models such as MapReduce that provide elasticity and fault tolerance in a natural manner: since intermediate results are materialized, processes can simply be restarted and scaled by partitioning the input datasets. For continuous query processing on data streams, these concepts cannot be applied directly, since it must be guaranteed that no data is lost when nodes fail. Usually, these long-running queries contain operators that maintain state information which depends on the data that has already been processed; hence, they cannot be restarted without information loss. This is also an issue when streaming tasks have to be scaled. Therefore, integrating elasticity and fault tolerance in this context is a challenging task and the subject of this paper. We show how common patterns from parallel and distributed algorithms can be applied to tackle these problems and how they are mapped to the Mesos cluster management system.

*This work is partially funded by an IBM PhD Fellowship.

1. INTRODUCTION
Processing and analyzing big data is one of today's big challenges. A popular definition from a Gartner report names the three 'V's – volume, velocity, and variety – as the main characteristics of big data. Among them, velocity refers to the analytics of dynamic data, even in (near) realtime.

Several approaches and techniques have been developed in the past to process dynamic data. Data stream management systems (DSMS) like STREAM, Aurora, IBM InfoSphere Streams, or our own AnduIN engine provide abstractions to process continuous and possibly infinite streams of data instead of disk-resident datasets. Typically, this includes standard (relational) query operators, window-based operators for computing joins and aggregations, as well as more advanced data analytics and data mining operators working on portions of the stream, e.g., windows or synopses of data. Complex event processing (CEP) systems particularly support the identification of event patterns in (temporal) streams of data, such as a sequence of specific event types within a given time interval. Typically, systems of both classes provide a declarative interface, either in the form of SQL-like query languages like CQL for DSMS, event languages like SASE, or in the form of dataflow specifications like SPL in IBM InfoSphere Streams.

Recently, several new distributed stream computing platforms have been developed, aiming at providing scalable and fault-tolerant operation in cluster environments. Examples are Apache S4 and Storm. In contrast to DSMS or CEP engines, these platforms do not (yet) provide declarative interfaces and, therefore, require programming applications instead of writing queries. Developers of these systems argue that they provide for stream processing what Hadoop did for batch processing – which raises the hope of a similar movement towards higher-level languages, as we can see with Pig, Jaql, etc. for MapReduce.

However, there are some challenges in scalable and elastic stream processing which are different from batch processing with Hadoop. In Hadoop, input data as well as intermediate results are materialized on disk and, therefore,
• both map and reduce tasks can be restarted arbitrarily in case of failures until the entire job is finished, and
• since computation state is saved, the number of nodes assigned to map and reduce tasks can simply be adjusted by partitioning input and intermediate results.
This is more difficult when processing dynamic data – even with platforms such as S4 or Storm which, to some extent, support reliable and scalable operation. The main differences are:
(1) Partitioning streams for data-parallel processing is not always easily possible, for example in the case of window- or sequence-based operators, including CEP operators. Also, elastic operation by adding new nodes at query runtime requires at least rerouting of data.
(2) Stream queries are typically long-running queries which cannot simply be restarted without losing data. Furthermore, because of this, deployment and resource allocation (placement of queries on nodes, allocation of memory and CPUs) are much more critical.
In this paper, we try to answer the question of how to bridge the gap between an easy-to-use, high-level declarative interface for data stream analytics and scalable cluster-based stream computing platforms in order to address these challenges. The contribution of this paper is twofold:
• Based on a basic dataflow model for stream queries, we describe patterns for fault-tolerant and scalable query processing and discuss constraints of their application.
• We show the implementation and deployment of these patterns using our distributed stream and CEP engine AnduIN and the Mesos cluster infrastructure by describing techniques supporting flexible and elastic deployment.
Though we use the AnduIN system for describing and implementing the concepts, we think the ideas and patterns are applicable to other platforms, too.

2. RELATED WORK
The work related to this paper can be classified into two main categories: continuous query processing and scalable dataflow platforms.

Continuous query processing is usually implemented in data stream management systems (DSMS). Pioneered by systems like STREAM, Borealis, and Telegraph, several approaches and systems have been developed in the last decade, including commercial products such as IBM InfoSphere Streams and StreamBase. Typically, these systems provide a SQL-like query language enhanced by features for dealing with continuous queries, such as sliding windows.

Partitioning, distributed processing, and fault tolerance have been studied to some extent, e.g., in Borealis [1] by introducing replicated processing nodes as well as several new tuple types, such as punctuation tuples and control tuples like undo, tentative tuples (tuples resulting from processing a subset of the input which can be corrected later), and done tuples indicating that state reconciliation has finished. State reconciliation is the process of stabilizing the output result, e.g., by replacing previously tentative results. In this way, the approach aims at fault tolerance but not at partitioning.

Another approach, in the form of a programming model, has been proposed in [14] as so-called discretized streams (D-Streams). This idea is based on resilient distributed datasets, which are storage abstractions used for rebuilding lost data.

An approach addressing load balancing by partitioning while providing fault tolerance for pipelined dataflows is Telegraph's FluX [10]. FluX is a dataflow operator extending the idea of the exchange operator from parallel query processing. The operator encapsulates state partitioning and tuple routing and allows repartitioning even stateful operators while the dataflow pipeline is executing.

Scalable dataflow platforms try to extend the applicability of the MapReduce paradigm for large-scale parallel batch processing to pipeline processing and continuous query support. One example is HOP (Hadoop Online Prototype) [4]. In HOP, map tasks maintain TCP sockets to reducers for pipelining their output. In addition, pipelining is also supported between jobs by sending the output of reducers directly to mappers of a subsequent job. Further distributed dataflow systems are Twitter's Storm and Apache S4. Storm implements fault detection at the task level and guaranteed message passing, whereas in S4 messages can be lost. Storm runs so-called topologies – subsets of these topologies are assigned to worker processes of a cluster. However, these systems only offer a simple programming model and, therefore, operators and topologies have to be implemented in a programming language like Java or Python. Furthermore, state recovery and partitioning have to be implemented manually, too.

Optimus [11] is a framework for dynamic rewriting of execution plans for data-parallel computing, e.g., formulated in DryadLINQ. The framework supports rewriting of MapReduce programs at runtime, addressing issues like repartitioning, fault tolerance, and handling data skew. But the required algorithms have to be implemented by the user.
3. PATTERNS FOR SCALABLE PROCESSING OF DATA STREAM QUERIES
In the following, we assume a simple processing model for data stream queries: a query is represented by a dataflow graph, which is a common model in the literature [9, 11]. In such a directed acyclic graph, nodes represent query operators and edges describe the tuple flow between them. Query nodes can be arbitrary pipeline operators of a stream query algebra [2] like filter, projection, etc., window- or synopsis-based operators such as sliding window joins and aggregations, but also more complex data analytics operators including CEP and data mining. Communication between query operators is performed either directly by invoking operator functions or via buffers/queues. Obviously, this represents a very generic execution model onto which a wide range of declarative query languages like CQL, dataflow specifications such as IBM's SPL, or implementation-oriented approaches as used in S4 or Storm can be mapped. This model can easily be extended to the distributed case by inserting network reader/writer nodes which use appropriate communication protocols and APIs, e.g., TCP/UDP sockets or more advanced solutions like ZeroMQ.
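To make this execution model more concrete, the following C++ sketch shows one possible in-memory representation of such a dataflow graph. It merely illustrates the model – it is not AnduIN's actual API, and all class and member names (Tuple, Channel, Operator, FilterOp) are hypothetical:

  #include <functional>
  #include <memory>
  #include <queue>
  #include <vector>

  // Tuples are kept untyped here; AnduINv2 uses typed channels instead.
  struct Tuple { std::vector<double> fields; };

  // An edge of the dataflow graph: a buffer/queue between two operators.
  // In the distributed case it would be backed by a network connection.
  struct Channel { std::queue<Tuple> buffer; };

  // A node of the dataflow graph: consumes input tuples, emits output tuples.
  class Operator {
  public:
    virtual ~Operator() = default;
    // Process one incoming tuple; may emit zero or more output tuples.
    virtual void process(const Tuple& t) = 0;
    void addOutput(std::shared_ptr<Channel> c) { outputs_.push_back(std::move(c)); }
  protected:
    void emit(const Tuple& t) {
      for (auto& c : outputs_) c->buffer.push(t);  // fan out to all consumers
    }
    std::vector<std::shared_ptr<Channel>> outputs_;
  };

  // A stateless pipeline operator, e.g., a filter with an arbitrary predicate.
  class FilterOp : public Operator {
  public:
    explicit FilterOp(std::function<bool(const Tuple&)> pred) : pred_(std::move(pred)) {}
    void process(const Tuple& t) override { if (pred_(t)) emit(t); }
  private:
    std::function<bool(const Tuple&)> pred_;
  };

Extending such a graph to the distributed case then amounts to replacing a Channel by a pair of network writer/reader operators.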
There are two main reasons for distributing query nodes: increasing processing reliability by introducing redundancy, and increasing performance and/or scalability by load distribution. The following patterns support these goals to different degrees. In this discussion, we use the term "query task" as the unit of distribution/scheduling, both for elementary algebra operators and for dataflow (sub)graphs with well-defined properties (input/output, stateless vs. stateful).

Pattern 1: Simple standby: For a critical query node N, a standby node S is maintained on a separate compute unit and is activated if N fails. This requires monitoring of N, e.g., by a combination of heartbeats and a cluster coordination service such as ZooKeeper, as well as rerouting the input tuple stream to S. Since the state of N is lost in case of a failure, this pattern is applicable only to stateless nodes.

[Figure 1: Simple Standby – a network writer feeding N, with failover to S]

Pattern 2: Checkpointing: This pattern is similar to pattern 1 but supports stateful operators. Failover is achieved by periodically checkpointing the state of the critical node N to a shared disk and restarting the standby node S from the checkpoint. Examples of such checkpoints are the contents of sliding windows or hash tables for joins and aggregations.

[Figure 2: Checkpointing – N writes its state to a failover log from which S is restarted]

Pattern 3: Hot standby: If the failover time of pattern 2 is not acceptable, a hot standby approach can be chosen where redundant query nodes S are kept active. To achieve this, the input stream has to be sent to all redundant nodes, either by using multicast strategies at the network or at the application level. This pattern works with both stateless and stateful query operators but requires a special node to eliminate duplicate results, e.g., a stream selector node which forwards the input from only one of multiple streams.

[Figure 3: Hot Standby – a multicast writer feeds N and S; a stream selector eliminates duplicates]
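The following sketch illustrates the core of pattern 2. It is a minimal illustration under the assumptions stated in the comments, not AnduINv2's actual implementation; StatefulTask, checkpointOnce, and failover are hypothetical names:

  #include <fstream>
  #include <iterator>
  #include <string>

  // A stateful query task that can serialize its state, e.g., the content
  // of a sliding window or a join hash table (pattern 2).
  class StatefulTask {
  public:
    virtual ~StatefulTask() = default;
    virtual std::string serializeState() const = 0;
    virtual void restoreState(const std::string& snapshot) = 0;
  };

  // Called periodically on the critical node N: write a checkpoint to a
  // path on a shared disk (atomic rename etc. omitted for brevity).
  void checkpointOnce(const StatefulTask& task, const std::string& sharedPath) {
    std::ofstream out(sharedPath, std::ios::binary | std::ios::trunc);
    out << task.serializeState();
  }

  // Called on the standby node S after the coordination service (e.g.,
  // ZooKeeper) reports that N missed its heartbeats: restore the last
  // checkpoint; afterwards the input stream is rerouted to S (cf. Sect. 4.5).
  void failover(StatefulTask& standby, const std::string& sharedPath) {
    std::ifstream in(sharedPath, std::ios::binary);
    std::string snapshot((std::istreambuf_iterator<char>(in)),
                         std::istreambuf_iterator<char>());
    standby.restoreState(snapshot);
  }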
Pattern 4: Stream partitioning: This pattern exploits data parallelism by partitioning the input stream. It can be implemented by a splitter node redirecting each input tuple to one of the query nodes N1...Nk, or by a multicast writer with additional partitioning nodes P1...Pk filtering the input stream according to the partitioning scheme. Finally, the results are merged into a single stream.

[Figure 4: Partitioning – a multicast writer with partitioning nodes P1...Pk feeding N1...Nk, followed by a stream merger]

Pattern 5: Stream pipelining: In contrast to pattern 4, this pattern exploits task parallelism by splitting a complex query node N into a sequence of query nodes N1...Nk and placing them on separate compute units.

[Figure 5: Pipelining – N is split into the sequence N1, N2, ..., Nk]

Usually, multiple patterns will be applied in order to achieve certain quality-of-service (QoS) guarantees such as fault tolerance (patterns 1–3) or elasticity for adapting resource consumption (patterns 4–5) to the needs of the applications. Of course, this pattern list does not claim to be complete. There are several other patterns that are applicable under certain circumstances, e.g., parallelization through aggregation trees for commutative and associative aggregation operators [9]. Nevertheless, the basic patterns described here are well known in distributed and parallel algorithms and – with slight modifications – cover various use cases.¹

In the following, we will describe how these patterns can be utilized in a dataflow framework to dynamically restructure the physical representation of the graph in a continuous query context that is executed in a cluster infrastructure. The restructuring is achieved with a simple set of rewriting rules that are automatically applied to the graph, without the need to code them manually as in existing approaches [11], while guaranteeing that no state information is lost during the restructuring phase. We present the algorithms based on our AnduINv2 stream processing engine but highlight that they are also applicable to other frameworks such as Dryad, with slight modifications to achieve streaming semantics.

¹Actually, aggregation trees are just combinations of the partitioning and pipelining patterns.

4. QUERY DEPLOYMENT INFRASTRUCTURE
Fig. 6 illustrates the dataflow model used in AnduINv2 and how it is mapped to the physical layer of executable code. While the first prototype of the system aimed at processing sensor data, in-network processing [12], and complex event processing (CEP) [8], our current research focuses on processing techniques for cluster environments. The AnduINv2 system comprises three components: (1) the runtime environment containing the implementation of query operators, including a CEP engine as well as operators for controlling the query execution, (2) a query compiler translating a dataflow-based query specification given in an XML file into query tasks, i.e., executable code linked to the runtime environment, and (3) a query scheduler and executor integrated with the Mesos cluster framework that deploys query tasks for processing them on physical nodes.

AnduINv2 queries are deployed as separate processes in Mesos and are just-in-time compiled using the system's C++ compiler. This provides an easy mechanism to plug in user-defined operators and to exchange operator implementations. During deployment, processes are interlinked by query channels which simply represent an abstraction over network connections (TCP/UDP sockets, ZeroMQ connections).

Query tasks can be shared among multiple queries when they share some common (sub)streams or operators, as described in [3]. Further, a query can be implemented as a set of tasks which are distributed across multiple nodes in the cluster. For this purpose, the logical query tree is partitioned into smaller subtrees that are translated separately. We will use these mechanisms for implementing the elasticity patterns.

4.1 Dataflow Graph Rewriting
Dataflows are specified in an XML format which can be seen as an intermediate representation, allowing different frontends such as CQL or graphical tools to be used. A dataflow specification consists of stream type definitions and operator definitions with name, type, type-specific parameters, as well as input and output channels. These channels are typed and are used to interconnect operators to form a graph. The following example shows a simple dataflow specification. (We omitted the XML notation for better readability.)

  type name = "aStreamType" {
    column name = "x", type = "int" ...
  }
  operator name = "source", type = "reader" {
    output name = "aStream", type = "aStreamType"
  }
  operator name = "myFilter", type = "filter" {
    input name = "aStream"
    param condition = "aStream.x < 42"
    output name = "filteredStream", type = "aStreamType"
  }
  operator name = "sink", type = "writer" {
    input name = "filteredStream"
  }
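Conceptually, the query compiler turns this specification into a chain of operator invocations, reader → filter → writer, connected by query channels. The following self-contained C++ sketch mimics the compiled result for the example above (the real compiler emits code linked against the AnduINv2 runtime; this is only an illustration):

  #include <iostream>
  #include <vector>

  struct ATuple { int x; };  // corresponds to "aStreamType"

  int main() {
    std::vector<ATuple> aStream = {{7}, {42}, {101}, {3}};  // "source" (reader)
    for (const ATuple& t : aStream)
      if (t.x < 42)                  // "myFilter": condition aStream.x < 42
        std::cout << t.x << '\n';    // "sink" (writer) on "filteredStream"
  }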
[Figure 6: Query model – dataflow queries are compiled into query tasks with an input queue I(t), operator states S(t), an output queue O(t), and a meta-query channel P(t); the query scheduler deploys the tasks via the Mesos master onto query executors running on Mesos slave nodes]

Note that, apart from the stream input and possible output, no communication operators have to be specified as part of the query. Such operators are added during rewriting if necessary. For formulating rewriting rules, we use a simple notation. The dataflow given above is written as

  sink := writer(f := filter(src := reader))

where writer, filter, and reader are operator types and the optional sink, f, and src names denote operator instances. During rewriting, graph patterns have to be matched and constraints have to be checked. For this purpose, the pseudo-type any is used as a placeholder for any possible type, and any* represents a dataflow of arbitrary operator types. The following pattern matches a dataflow subgraph containing a stateless filter operator (which is the case for any filter):

  a2 := any(f := filter(a1 := any*)) [stateless(f)]

To apply the patterns, a rewriting rule can be specified:

  ⇒ a2(stream-selector(failover(writer(f(reader(multicast(a1)))))))

with the placement annotation @p1 for the stream-selector and failover operators, @p* for the replicated subgraph writer(f(reader(...))), and @p2 for the multicast writer. Besides inserting or replacing operators (such as the stream-selector, failover, and multicast operators in the previous example), operator nodes are thus also annotated with placement information, where pi, pj with i ≠ j denote distinct compute nodes and p* denotes an arbitrary number of nodes.

4.2 Failover Handling
With these rewriting rules, internal data management nodes and different operator implementations can be transparently injected into the query plan without impacting the application. This allows re-scheduling a query task to another node in case of a failure (pattern 1), using an operator implementation that automatically integrates snapshotting (pattern 2), or replicating a task to implement hot standby (pattern 3).

The actual flow of data through query channels at runtime is controlled by special operator parameters – e.g., target IP addresses and ports – that can be adjusted through a concept we call meta queries (cf. Sect. 4.5). To detect and react to failures, query tasks are instrumented with monitoring interfaces that inform the query scheduler about the nodes' health and about performance measures such as tuple processing rates. The scheduler then triggers a graph rewriting.

When a rewritten graph needs to be deployed, it has to be guaranteed that no information of the tuple stream is lost. To analyze the necessary steps, we reduce a query task to a finite state machine model (cf. Fig. 6), which is common for implementing CEP operators [6] but can also be applied to general dataflow transformations. The query task receives a stream of input tuples I(t) and applies its logical operation(s) λ – e.g., a filter or a join – to generate an output stream O(t). (We use vectorial representations here to combine all channels into a single quantity.) The output might depend on the task's state S(t), i.e., the state of all internal operators, which can basically include anything that is required for implementing the operators (e.g., hash tables or sliding windows) and which is updated with each incoming tuple through a state transition function τ_i, i.e., O(t) = λ(I(t), S(t)) and S(t+1) = τ_i(S(t), I(t)).² The meta query extension is represented by special input channels P(t) that modify the operator state through τ_p.

²Usually, the separation of λ and τ is only conceptual and both functions are combined.

To guarantee that a node failure does not lead to information loss, it is necessary that all results which have not yet been consumed by the following target can be reproduced from the possibly infinite input stream. Therefore, the operator state needs to be snapshotted after each input tuple, or – if this is too expensive – the input tuples need to be persisted in order to reproduce this state by simply 'replaying' the input. Which tuples are still required for a possible replay can be controlled by special tuple messages that are exchanged between tuple producers and consumers, as in the Borealis system [1]. Note that when frameworks such as ZeroMQ are used to implement query channels, reliable message delivery can be guaranteed without the need to modify operators.

In order to implement fault tolerance by transferring stateful query tasks to other compute nodes, a simple protocol as presented in [10] is sufficient (sketched in code below):
(1) quiesce all input streams,
(2) replicate the task state to the target node,
(3) redirect the input streams to the target node,
(4) unquiesce all input streams.
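In code, the protocol boils down to four calls around the state snapshot. The sketch below is illustrative only – the stream, task, and transfer primitives are placeholders for the corresponding AnduINv2 mechanisms (query channels and meta queries):

  #include <string>
  #include <vector>

  struct InputStream {
    void quiesce() {}                                // stop delivering tuples
    void unquiesce() {}                              // resume delivery
    void redirectTo(const std::string& /*node*/) {}  // reroute via meta query
  };
  struct Task { std::string snapshotState() { return {}; } };

  // Ship a state snapshot to the target node (placeholder).
  void installState(const std::string& /*node*/, const std::string& /*snap*/) {}

  void transferTask(Task& task, std::vector<InputStream>& inputs,
                    const std::string& targetNode) {
    for (auto& in : inputs) in.quiesce();               // (1)
    installState(targetNode, task.snapshotState());     // (2)
    for (auto& in : inputs) in.redirectTo(targetNode);  // (3)
    for (auto& in : inputs) in.unquiesce();             // (4)
  }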
4.3 Elasticity Handling
The same algorithm can be used to replace query tasks with rewritten versions that compute the same logical transformation but use different operator implementations and/or a different partitioning of the query graph into tasks, thereby implementing the elasticity patterns 4 and 5.

Rewriting Cost Model: Usually, there are several possibilities for rewriting a dataflow graph. In order to make the right decisions about which tasks shall be replaced and how many nodes should be allocated, a cost model is required that takes possible rewriting benefits into account as well as costs for the restructuring, e.g., for transferring states, or costs for additional resources from the cluster infrastructure. Discussing elaborate decision models is not in the focus of this paper and is left for future work. We outline a rate-based model that is suitable for finding hot spots in dataflows and is used in related literature [13].

Rewritings should be performed when a query task is detected that cannot process its incoming tuples at least as fast as they arrive, e.g., because the computational complexity or the memory consumption is too high, so that the task accumulates an increasing backlog. Such a task represents the critical path in the dataflow, limiting the overall throughput. For finding these paths, a rate-based optimization approach is suitable that scans the dataflows starting at the source nodes and detects such bottlenecks based on monitoring information gathered during the execution [13].
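A minimal version of this bottleneck test could look as follows; the per-task counters are assumed to come from the monitoring interfaces mentioned in Sect. 4.2, and all names are illustrative:

  #include <string>
  #include <vector>

  // Monitoring data gathered per query task during execution.
  struct TaskStats {
    std::string taskId;
    double arrivalRate;     // tuples/s entering the task's input queue
    double processingRate;  // tuples/s the task actually completes
  };

  // A task that cannot keep up with its input accumulates a backlog and
  // marks a hot spot on the critical path: a candidate for rewriting.
  std::vector<std::string> findHotSpots(const std::vector<TaskStats>& stats) {
    std::vector<std::string> hotSpots;
    for (const auto& s : stats)
      if (s.processingRate < s.arrivalRate)
        hotSpots.push_back(s.taskId);
    return hotSpots;
  }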
This is the easiest way to remove A matches any incoming tuple burden from the critical path since inputs of sharing groups B + matches all following tuples with decreasing value are independent from each other and no special dataflow C matches the first tuple with increasing value after B transformations need to be performed. but with a value less than the previous one of A Inter-operator parallelism (Pattern 4): Usually, it is D+ matches following tuples with increasing values > A better to keep dataflow operations on few nodes in order For parallelization, we focus on the +-operator since it to avoid costly transfer operations. Hence, initially com- is challenging for three reasons: First, like a join, it can piled queries will comprise few tasks consisting of large produce multiple output tuples per input tuple. Second, it flow (sub)graphs. However, when the computational com- needs to store state information for extending existing pat- plexity exceeds a certain threshold or memory limits for terns to longer ones. Since each tuple is possibly multiplying keeping state information are exceeded, a distributed pro- the number of results it is likely that – due to memory con- cessing pipeline will yield better performance. Large graphs straints – such operators will become critical in the dataflow are partitioned, recompiled, and distributed on additional graph. Third, in most cases the behavior of the stream is not nodes in the cluster. Moreover, splitting large costly tasks predictable, rendering static allocations infeasible. into smaller distributed ones increases fault-tolerance since Fig. 8 illustrates how the operator can be distributed dy- it will become cheaper to recover from node failures [11]. namically to multiple nodes with simply applying graph pat- Intra-operator parallelism (Pattern 4+5): When the terns 4 and 5. The P +-operator comprises two parts: a pat- previous patterns not applicable, e.g., when operators are tern matcher P , and a +-operator which maintains all previ- not shared or the graph has already been split into base ously matched patterns as state and concatenates them with operators, the last possibility to increase parallelism is par- subsequent matches. The output of + is the output for the titioning input streams and processing each partition inde- entire operator and serves as input for + again to construct pendent from each other with multiple operator instances. longer matches. On memory overflow, the operator state can Unfortunately, this pattern is the most difficult to implement be partitioned and distributed to multiple instances where since its applicability depends on the actual operator type. all instances receive the original input stream. Two different Partitions in input streams need to be found, distributed to behaviors of the operator are required to avoid duplicates. the operator instances, and their (partial) results have to be The first instance processes the input directly, i.e., all match- merged afterwards. Further, this pattern is prone to data ing tuples serve as new patterns of length 1. Those matches skews and, therefore, some sort of load balancing has to be must not be reproduced by other instances that simply serve implemented, e.g., by monitoring the load of each partition as targets for overflowing patterns that do not fit into the and dynamically re-schedule partitions as proposed in [10]. 
For parallelization, we focus on the +-operator since it is challenging for three reasons: First, like a join, it can produce multiple output tuples per input tuple. Second, it needs to store state information for extending existing patterns to longer ones. Since each tuple can possibly multiply the number of results, it is likely that – due to memory constraints – such operators will become critical in the dataflow graph. Third, in most cases the behavior of the stream is not predictable, rendering static allocations infeasible.

Fig. 8 illustrates how the operator can be distributed dynamically to multiple nodes by simply applying graph patterns 4 and 5. The P+-operator comprises two parts: a pattern matcher P, and a +-operator which maintains all previously matched patterns as state and concatenates them with subsequent matches. The output of + is the output of the entire operator and serves as input for + again to construct longer matches. On memory overflow, the operator state can be partitioned and distributed to multiple instances, where all instances receive the original input stream. Two different behaviors of the operator are required to avoid duplicates. The first instance processes the input directly, i.e., all matching tuples serve as new patterns of length 1. Those matches must not be reproduced by the other instances, which simply serve as targets for overflowing patterns; these patterns do not fit into the local state but are independent from each other and hence can be processed on separate nodes. When the complexity of the matching algorithm P is critical, elasticity can also be achieved by implementing a pipeline. It exploits the fact that P+ can be expressed through P ∨ (P P+), i.e., a pipeline of arbitrary length is constructed for matching incoming tuples in parallel, emitting them as output, and forwarding them to the next stage for extension.

[Figure 8: Rewriting the repetition operator P+ – the operator state is partitioned across instances (p1, p2, p3, ...) over time, and the matcher is pipelined via P ∨ (P P+)]

Since such parallelization schemes depend on operator semantics, the framework provides them for automatically scaling the required resources up and down for built-in operators. For all user-defined functions, which are treated as black boxes by AnduINv2, the parallelization needs to be implemented by the user as in [11], or it is provided through libraries that are linked as plugins to the execution environment.

4.4 Mesos Integration
Mesos [7] is a cluster management software for resource isolation and sharing. In Mesos, a master daemon (possibly supported by additional standby masters) manages a set of slave nodes. An application (called a framework) runs tasks on these slaves; the tasks are started by so-called executors. Scheduling and resource assignments are managed by an application-specific scheduler. In order to support stream queries, we implemented our own framework (cf. Fig. 6), providing an executor for running query executables (query tasks) on slave nodes and a query scheduler which receives resource offers from the Mesos master (available cores, memory, and network ports) and requests for executing AnduIN queries. Each query deployment request is described by a unique ID, the executable, and a specification of resource requirements, i.e., CPU cores, memory, and a list of query channels which have to be mapped to network ports. This specification is used by the scheduler to choose a slave node providing the requested resources for execution. Currently, only a simple strategy is implemented which selects the first offer providing the requested resources – more advanced strategies are subject of future work.
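The structures below sketch this first-fit choice; they are simplified stand-ins for the Mesos resource offers and our deployment request format, not the actual Mesos API:

  #include <optional>
  #include <string>
  #include <vector>

  struct ResourceOffer {        // simplified Mesos offer
    std::string slaveId;
    double cpus;
    double memMB;
    int freePorts;
  };
  struct DeploymentRequest {    // simplified AnduINv2 query request
    std::string queryId;        // unique ID (the executable is omitted here)
    double cpus;
    double memMB;
    int channels;               // query channels to be mapped to ports
  };

  // First-fit strategy: take the first offer that satisfies the request.
  std::optional<std::string> chooseSlave(const std::vector<ResourceOffer>& offers,
                                         const DeploymentRequest& req) {
    for (const auto& o : offers)
      if (o.cpus >= req.cpus && o.memMB >= req.memMB && o.freePorts >= req.channels)
        return o.slaveId;
    return std::nullopt;        // no suitable node: defer the deployment
  }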
If the scheduler has chosen an appropriate node, the request is forwarded to the corresponding executor. The scheduler assigns physical network ports to query channels and tracks these assignments to be able to connect subsequent queries referring to the same logical channel. In this way, a query implemented by one or more tasks can be deployed to one or more cluster nodes.

4.5 Meta Queries
Though Mesos provides mechanisms to deploy processes, it does not support elastic operation for stream queries. In Hadoop, it is the task of the job tracker to partition the work across a set of map and reduce tasks. In the case of data streams, the situation is a bit different because we cannot simply stop and continue/restart queries without losing data. The only way to achieve elasticity is to change the query behavior at runtime. Therefore, we introduce the idea of meta queries: in each (adjustable) query task, an additional query is running on a control stream consisting of tuples of the form:

  ⟨query id, operator id, parameter, value⟩

The control stream is produced by the query scheduler, which monitors resource utilization and implements strategies for dynamic reallocation. Meta queries are particularly useful for implementing the patterns described in Sect. 3. For instance, for failover without publish-subscribe (patterns 1 and 2), the network writer has to be informed about the network address of the newly activated standby node S. For this purpose, the network writer provides a parameter target-addr for the target address. A control stream tuple like

  ⟨query#42, writer#2, target-addr, "tcp://node2:6666"⟩

received by the query task triggers sending the tuple stream to the standby node node2. Similarly, for implementing pattern 3, the stream selector node can be informed about switching to the stream produced by query node S.

For partitioning patterns like pattern 4, it is either required to modify the tuple distribution strategy of multicast writers or to adjust the partitioning predicates Pi (cf. Fig. 4). Both can easily be implemented by sending appropriate control tuples.
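A meta query handler inside an adjustable task can be as simple as the following sketch (illustrative names only; the real AnduINv2 tasks apply such updates to live operator parameters):

  #include <iostream>
  #include <map>
  #include <string>

  // One tuple of the control stream: <query id, operator id, parameter, value>.
  struct ControlTuple {
    std::string queryId, operatorId, parameter, value;
  };

  class AdjustableTask {
  public:
    explicit AdjustableTask(std::string queryId) : queryId_(std::move(queryId)) {}
    void onControlTuple(const ControlTuple& c) {
      if (c.queryId != queryId_) return;  // tuple addresses a different query
      params_[c.operatorId + "." + c.parameter] = c.value;
      std::cout << "reconfigured " << c.operatorId << "." << c.parameter
                << " = " << c.value << '\n';
    }
  private:
    std::string queryId_;
    std::map<std::string, std::string> params_;  // e.g., "writer#2.target-addr"
  };

  int main() {
    AdjustableTask task("query#42");
    // Redirect the network writer to the standby node (failover, patterns 1/2):
    task.onControlTuple({"query#42", "writer#2", "target-addr", "tcp://node2:6666"});
  }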
5. CONCLUSION AND FUTURE WORK
We presented basic concepts of how fault tolerance and elasticity can be achieved in the context of continuous query processing by combining techniques that have proven their applicability in other scenarios. These approaches are currently being integrated into AnduINv2, but they can be applied to other platforms, too. The main questions we would like to answer with future experiments are:
1) Which cost models are valid for online graph rewriting?
2) How can resource requirements for a query be estimated before actually executing it?
3) How can certain QoS guarantees be given to applications?
4) Can elastic stream processing benefit from heterogeneous cluster nodes?

While the first questions intend to pave the way for a streaming-as-a-service infrastructure, answering the last one is needed to keep up with current hardware development trends. We believe that parallel and specialized processors such as many-core CPUs, GPUs, or FPGAs will find their way into future computing centers to provide the most efficient computing platforms for dedicated tasks – an important aspect in tackling the big data challenge.

6. REFERENCES
[1] D. J. Abadi, Y. Ahmad, M. Balazinska, et al. The Design of the Borealis Stream Processing Engine. In CIDR '05, 2005.
[2] A. Arasu, S. Babu, and J. Widom. CQL: A language for continuous queries over streams and relations. In Database Programming Languages. Springer, 2004.
[3] J. Chen, D. J. DeWitt, F. Tian, and Y. Wang. NiagaraCQ: A scalable continuous query system for Internet databases. SIGMOD Rec., 29:379–390, 2000.
[4] T. Condie, N. Conway, P. Alvaro, J. M. Hellerstein, K. Elmeleegy, and R. Sears. MapReduce online. In NSDI, pages 21–21, 2010.
[5] N. Dindar, P. M. Fischer, M. Soner, and N. Tatbul. Efficiently correlating complex events over live and archived data streams. In DEBS '11. ACM, 2011.
[6] M. Eckert, F. Bry, S. Brodt, O. Poppe, and S. Hausmann. A CEP Babelfish: Languages for Complex Event Processing and Querying Surveyed. In Reasoning in Event-Based Distributed Systems. Springer, 2011.
[7] B. Hindman, A. Konwinski, M. Zaharia, A. Ghodsi, A. D. Joseph, R. Katz, S. Shenker, and I. Stoica. Mesos: A platform for fine-grained resource sharing in the data center. In NSDI, pages 22–22, 2011.
[8] S. Hirte, E. Schubert, A. Seifert, S. Baumann, D. Klan, and K. Sattler. Data3 – A Kinect Interface for OLAP using Complex Event Processing. In ICDE, 2012.
[9] M. Isard, M. Budiu, Y. Yu, A. Birrell, and D. Fetterly. Dryad: Distributed data-parallel programs from sequential building blocks. SIGOPS, 41:59–72, 2007.
[10] M. A. Shah, J. M. Hellerstein, S. Chandrasekaran, and M. J. Franklin. Flux: An Adaptive Partitioning Operator for Continuous Query Systems. In ICDE, 2003.
[11] Q. Ke, M. Isard, and Y. Yu. Optimus: A dynamic rewriting framework for data-parallel execution plans. In EuroSys, pages 15–28, 2013.
[12] D. Klan, M. Karnstedt, K. Hose, L. Ribe-Baumann, and K. Sattler. Stream engines meet wireless sensor networks: Cost-based planning and processing of complex queries in AnduIN. Distrib. and Parallel Databases, 29:151–183, 2011.
[13] S. D. Viglas and J. F. Naughton. Rate-based query optimization for streaming information sources. In SIGMOD '02. ACM, 2002.
[14] M. Zaharia, T. Das, H. Li, S. Shenker, and I. Stoica. Discretized streams: An efficient and fault-tolerant model for stream processing on large clusters. In HotCloud '12. USENIX Association, 2012.
[15] F. Zemke, A. Witkowski, M. Cherniak, and L. Colby. Pattern matching in sequences of rows. Technical report, ANSI Standard Proposal, 2007.