=Paper=
{{Paper
|id=None
|storemode=property
|title=Towards Elastic Stream Processing: Patterns and Infrastructure
|pdfUrl=https://ceur-ws.org/Vol-1018/paper9.pdf
|volume=Vol-1018
|dblpUrl=https://dblp.org/rec/conf/vldb/SattlerB13
}}
==Towards Elastic Stream Processing: Patterns and Infrastructure==
Kai-Uwe Sattler (kus@tu-ilmenau.de) and Felix Beier (felix.beier@tu-ilmenau.de)
Ilmenau University of Technology, Ilmenau, Germany

∗This work is partially funded by an IBM PhD Fellowship.
ABSTRACT

Distributed, highly parallel processing frameworks such as Hadoop are considered state-of-the-art for handling big data today. However, they burden application developers with the task of manually implementing program logic against low-level batch processing APIs. As a consequence, a movement can be observed towards high-level languages that allow dataflows to be modeled declaratively and then automatically optimized and mapped to batch-processing backends. Most of these systems are based on programming models such as MapReduce, which provide elasticity and fault tolerance in a natural manner: since intermediate results are materialized, processes can simply be restarted and scaled by partitioning the input datasets. For continuous query processing on data streams, these concepts cannot be applied directly, since it must be guaranteed that no data is lost when nodes fail. These long-running queries usually contain operators that maintain state information which depends on the data that has already been processed; hence, they cannot be restarted without information loss. The same issue arises when streaming tasks are to be scaled. Integrating elasticity and fault tolerance in this context is therefore a challenging task, and it is the subject of this paper. We show how common patterns from parallel and distributed algorithms can be applied to tackle these problems and how they are mapped to the Mesos cluster management system.

1. INTRODUCTION

Processing and analyzing big data is one of today's big challenges. A popular definition from a Gartner report names the three 'V's – volume, velocity, and variety – as the main characteristics of big data. Among them, velocity refers to the analysis of dynamic data, possibly in (near) realtime.

Several approaches and techniques have been developed in the past to process dynamic data. Data stream management systems (DSMS) like STREAM, Aurora, IBM InfoSphere Streams, or our own AnduIN engine provide abstractions to process continuous and possibly infinite streams of data instead of disk-resident datasets. Typically, this includes standard (relational) query operators, window-based operators for computing joins and aggregations, as well as more advanced data analytics and data mining operators working on portions of the stream, e.g., windows or synopses of the data. Complex event processing (CEP) systems particularly support the identification of event patterns in (temporal) streams of data, such as a sequence of specific event types within a given time interval. Typically, systems of both classes provide a declarative interface, either in the form of SQL-like query languages like CQL for DSMS, event languages like SASE, or in the form of dataflow specifications like SPL in IBM InfoSphere Streams.

Recently, several new distributed stream computing platforms have been developed, aiming at scalable and fault-tolerant operation in cluster environments. Examples are Apache S4 and Storm. In contrast to DSMS or CEP engines, these platforms do not (yet) provide declarative interfaces and therefore require programming applications instead of writing queries. Developers of these systems argue that they provide for stream processing what Hadoop did for batch processing – which raises the hope of a similar movement towards higher-level languages, as we can see with Pig, Jaql, etc. for MapReduce.

However, there are some challenges in scalable and elastic stream processing which differ from batch processing with Hadoop. In Hadoop, input data as well as intermediate results are materialized on disk and, therefore,
• both map and reduce tasks can be restarted arbitrarily in case of failures until the entire job is finished, and
• since the computation state is saved, the number of nodes assigned to map and reduce tasks can simply be adjusted by partitioning input and intermediate results.

This is more difficult when processing dynamic data – even with platforms such as S4 or Storm, which to some extent support reliable and scalable operation. The main differences are:
(1) Partitioning of streams for data-parallel processing is not always easily possible, for example in the case of window- or sequence-based operators, including CEP operators. Also, elastic operation by adding new nodes at runtime of a query requires at least rerouting of data.
(2) Stream queries are typically long-running queries which cannot simply be restarted without losing data. Furthermore, because of this, deployment and resource allocation (placement of queries on nodes, allocation of memory and CPUs) are much more critical.
In this paper, we try to answer the question of how to bridge the gap between an easy-to-use, high-level declarative interface for data stream analytics and scalable cluster-based stream computing platforms in order to address these challenges. The contribution of this paper is twofold:
• Based on a basic dataflow model for stream queries, we describe patterns for fault-tolerant and scalable query processing and discuss the constraints of their application.
• We show the implementation and deployment of these patterns using our distributed stream and CEP engine AnduIN and the Mesos cluster infrastructure, describing techniques that support flexible and elastic deployment.
Though we use the AnduIN system for describing and implementing the concepts, we think the ideas and patterns are applicable to other platforms, too.

2. RELATED WORK

The work related to this paper can be classified into two main categories: continuous query processing and scalable dataflow platforms.

Continuous query processing is usually implemented in data stream management systems (DSMS). Pioneered by systems like STREAM, Borealis, and Telegraph, several approaches and systems have been developed in the last decade, including commercial products such as IBM InfoSphere Streams and StreamBase. Typically, these systems provide a SQL-like query language enhanced by features for dealing with continuous queries, such as sliding windows.

Partitioning, distributed processing, and fault tolerance have been studied to some extent, e.g., in Borealis [1] by introducing replicated processing nodes as well as several new tuple types, such as punctuation tuples and control tuples like undo tentative tuples (tuples resulting from processing a subset of the input, which can be corrected later) and done tuples indicating that state reconciliation has finished. State reconciliation is the process of stabilizing the output result, e.g., by replacing previously tentative results. Thus, this approach aims at fault tolerance but not at partitioning.

Another approach, in the form of a programming model, has been proposed in [14] as so-called discretized streams (D-Streams). This idea is based on resilient distributed datasets, which are storage abstractions used for rebuilding lost data.

An approach addressing load balancing issues by partitioning while providing fault tolerance for pipelined dataflows is Telegraph's FluX [10]. FluX is a dataflow operator extending the idea of the exchange operator from parallel query processing. The operator encapsulates state partitioning and tuple routing and allows repartitioning even stateful operators while executing the dataflow pipeline.

Scalable dataflow platforms try to extend the applicability of the MapReduce paradigm for large-scale parallel batch processing to pipeline processing and continuous query support. One example is HOP (Hadoop Online Prototype) [4]. In HOP, map tasks maintain TCP sockets to reducers for pipelining their output. In addition, pipelining is also supported between jobs by sending the output of reducers directly to mappers of a subsequent job. Further distributed dataflow systems are Twitter's Storm and Apache S4. Storm implements fault detection at the task level and guaranteed message passing, whereas in S4 messages can be lost. Storm runs so-called topologies – subsets of these topologies are assigned to worker processes of a cluster. However, these systems only offer a simple programming model and, therefore, operators and topologies have to be implemented in a programming language like Java or Python. Furthermore, state recovery and partitioning have to be implemented manually, too.

Optimus [11] is a framework for dynamic rewriting of execution plans for data-parallel computations, e.g., formulated in DryadLINQ. The framework supports rewriting of MapReduce programs at runtime, addressing issues like repartitioning, fault tolerance, and handling of data skew. But the required algorithms have to be implemented by the user.

3. PATTERNS FOR SCALABLE PROCESSING OF DATA STREAM QUERIES

In the following, we assume a simple processing model for data stream queries: a query is represented by a dataflow graph, which is a common model in the literature [9, 11]. In such a directed acyclic graph, nodes represent query operators and edges describe the tuple flow between them. Query nodes can be arbitrary pipeline operators of a stream query algebra [2] like filter, projection, etc., window- or synopsis-based operators such as sliding window joins and aggregations, but also more complex data analytics operators including CEP and data mining. Communication between query operators is performed either directly by invoking operator functions or via buffers/queues. Obviously, this represents a very generic execution model onto which a wide range of declarative query languages like CQL, dataflow specifications such as IBM's SPL, or implementation-oriented approaches as used in S4 or Storm can be mapped. This model can easily be extended to the distributed case by inserting network reader/writer nodes which use appropriate communication protocols and APIs, e.g., TCP/UDP sockets or more advanced solutions like ZeroMQ.

There are two main reasons for distributing query nodes: increasing processing reliability by introducing redundancy, and increasing performance and/or scalability by load distribution. The following patterns support these goals to different degrees. In this discussion, we use the term "query task" as the unit of distribution/scheduling, both for elementary algebra operators and for dataflow (sub-)graphs with well-defined properties (input/output, stateless vs. stateful).

Pattern 1: Simple standby: For a critical query node N, a standby node S is maintained on a separate compute unit and is activated if N fails. This requires monitoring of N, e.g., by a combination of heartbeats and a cluster coordination service such as ZooKeeper, as well as rerouting the input tuple stream to S. Since the state of N is lost in case of a failure, this pattern is applicable only to stateless nodes.

Pattern 2: Checkpointing: This pattern is similar to pattern 1 but supports stateful operators. Failover is achieved by periodically checkpointing the state of the critical node N to a shared disk and restarting the standby node S from the checkpoint. Examples of such checkpointed state are the contents of sliding windows or the hash tables of joins and aggregations.

Pattern 3: Hot standby: If the failover time of pattern 2 is not acceptable, a hot standby approach can be chosen, where redundant query nodes S are kept active. To achieve this, the input stream has to be sent to all redundant nodes, either by using multicast strategies at the network level or at the application level. This pattern works with both stateless and stateful query operators but requires a special node to eliminate duplicate results, e.g., a stream selector node which forwards the input from only one of multiple streams.
Figure 1: Simple standby (a network writer feeds the critical node N, with a failover path to the standby node S). Figure 2: Checkpointing (as Figure 1, with N logging checkpoints from which S is restarted). Figure 3: Hot standby (a multicast writer feeds both N and S; a stream selector forwards only one of the redundant output streams).
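As an illustration of pattern 2, the following minimal C++ sketch checkpoints a sliding-window state to a file on a shared disk and restores it on the standby node. All names (SlidingWindowState, the file layout) are hypothetical and not AnduIN API; a real implementation would additionally coordinate checkpoint timing with the input stream.

 #include <fstream>
 #include <string>
 #include <vector>

 // Hypothetical operator state: the contents of a sliding window.
 struct SlidingWindowState {
     std::vector<double> window;  // tuples currently inside the window
 };

 // Periodically invoked on the critical node N: serialize the state
 // to a file on a shared disk so a standby node S can pick it up.
 void checkpoint(const SlidingWindowState& s, const std::string& path) {
     std::ofstream out(path, std::ios::binary | std::ios::trunc);
     size_t n = s.window.size();
     out.write(reinterpret_cast<const char*>(&n), sizeof n);
     out.write(reinterpret_cast<const char*>(s.window.data()),
               n * sizeof(double));
 }

 // Invoked on the standby node S after a failure of N: restore the
 // last checkpoint and resume processing from there.
 SlidingWindowState restore(const std::string& path) {
     SlidingWindowState s;
     std::ifstream in(path, std::ios::binary);
     size_t n = 0;
     in.read(reinterpret_cast<char*>(&n), sizeof n);
     s.window.resize(n);
     in.read(reinterpret_cast<char*>(s.window.data()), n * sizeof(double));
     return s;
 }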
Pattern 4: Stream partitioning: This pattern exploits data parallelism by partitioning the input stream. It can be implemented by a splitter node redirecting each input tuple to one of the query nodes N1 . . . Nk, or by a multicast writer with additional partitioning nodes P1 . . . Pk that filter the input stream according to the partitioning scheme. Finally, the results are merged into a single stream.

Pattern 5: Stream pipelining: In contrast to pattern 4, this pattern exploits task parallelism by splitting a complex query node N into a sequence of query nodes N1 . . . Nk and placing them on separate compute units.

Usually, multiple patterns will be applied in order to achieve certain quality of service (QoS) guarantees such as fault tolerance (patterns 1-3) or elasticity for adapting resource consumption (patterns 4-5) according to the needs of the applications. Of course, this pattern list does not claim completeness. There are several others that are applicable under certain circumstances, e.g., parallelization through aggregation trees for commutative and associative aggregation operators [9]. Nevertheless, the basic patterns described here are well known in distributed and parallel algorithms and – with slight modifications – cover various use cases. (Actually, aggregation trees are just combinations of the partitioning and pipelining patterns.)

In the following, we describe how these patterns can be utilized in a dataflow framework to dynamically restructure the physical representation of the graph of a continuous query that is executed in a cluster infrastructure. The restructuring is achieved with a simple set of rewriting rules that are applied automatically on the graph, without the need to code them manually as in existing approaches [11], while guaranteeing that no state information is lost during the restructuring phase. We present the algorithms based on our AnduINv2 stream processing engine but point out that they are also applicable to other frameworks such as Dryad, with slight modifications to achieve streaming semantics.

4. QUERY DEPLOYMENT INFRASTRUCTURE

Fig. 6 illustrates the dataflow model used in AnduINv2 and how it is mapped to the physical layer of executable code. While the first prototype of the system aimed at processing sensor data as well as at in-network processing [12] and complex event processing (CEP) [8], our current research focuses on processing techniques for cluster environments. The AnduINv2 system comprises three components: (1) the runtime environment containing the implementation of query operators, including a CEP engine as well as operators for controlling the query execution, (2) a query compiler translating a dataflow-based query specification given in an XML file into query tasks, i.e., executable code linked to the runtime environment, and (3) a query scheduler and executor integrated with the Mesos cluster framework that deploys query tasks for processing on physical nodes.

Figure 6: Query model (dataflow queries with operators, parameters, and input/output channels are compiled into query tasks and deployed by the query scheduler via the Mesos master to query executors on the slave nodes; each running task maintains an input queue, its operator states, and an output queue).

AnduINv2 queries are deployed as separate processes in Mesos which are just-in-time compiled using the system's C++ compiler. This provides an easy mechanism to plug in user-defined operators and to exchange operator implementations. During deployment, processes are interlinked by query channels, which simply represent an abstraction over network connections (TCP/UDP sockets, ZeroMQ connections).

Query tasks can be shared among multiple queries when they have some common (sub)streams or operators, as described in [3]. Further, a query can be implemented as a set of tasks which are distributed across multiple nodes in the cluster. For this purpose, the logical query tree is partitioned into smaller subtrees that are translated separately. We will use these mechanisms for implementing the elasticity patterns.

4.1 Dataflow Graph Rewriting

Dataflows are specified in an XML format which can be seen as an intermediate representation, allowing the use of different frontends such as CQL or graphical tools. A dataflow specification consists of stream type definitions and operator definitions with name, type, type-specific parameters, as well as input and output channels. These channels are typed and are used to interconnect operators to form a graph. The following example shows a simple dataflow specification. (We omitted the XML notation for better readability.)

 type name = "aStreamType" {
   column name = "x", type = "int" ...
 }
 operator name = "source", type = "reader" {
   output name = "aStream", type = "aStreamType"
 }
 operator name = "myFilter", type = "filter" {
   input name = "aStream"
   param condition = "aStream.x < 42"
   output name = "filteredStream", type = "aStreamType"
 }
 operator name = "sink", type = "writer" {
   input name = "filteredStream"
 }
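To make the mapping from such a specification to executable code concrete, the following is a minimal sketch of what generated C++ code for the myFilter operator above could look like. The Channel abstraction and all interfaces are hypothetical; the paper does not show AnduINv2's actual generated code.

 #include <functional>
 #include <initializer_list>
 #include <iostream>

 // Hypothetical tuple type corresponding to "aStreamType".
 struct ATuple { int x; };

 // A channel is modeled here as a simple callback to the consumer;
 // in AnduINv2 it would abstract a TCP/UDP socket or ZeroMQ connection.
 using Channel = std::function<void(const ATuple&)>;

 // Generated code for: operator "myFilter", condition "aStream.x < 42".
 struct FilterOp {
     Channel out;                      // output channel "filteredStream"
     void push(const ATuple& t) {      // input channel "aStream"
         if (t.x < 42) out(t);         // forward only matching tuples
     }
 };

 int main() {
     FilterOp f{[](const ATuple& t) { std::cout << t.x << '\n'; }};
     for (int x : {7, 42, 41, 99}) f.push(ATuple{x});  // prints 7, 41
 }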
Figure 4: Partitioning (a multicast writer feeds partitioning nodes P1 . . . Pk, each filtering the input for one query node N1 . . . Nk, whose outputs a stream merger combines). Figure 5: Pipelining (a complex node split into a sequence N1, N2, . . . , Nk).
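A minimal sketch of the splitter side of pattern 4 (Fig. 4), assuming a hash-partitionable key; the Tuple, Channel, and merger types are hypothetical simplifications:

 #include <functional>
 #include <vector>

 struct Tuple { int key; double value; };
 using Channel = std::function<void(const Tuple&)>;

 // Splitter: route each input tuple to one of the query node
 // instances N1..Nk by hashing its partitioning key.
 struct Splitter {
     std::vector<Channel> partitions;  // one channel per instance
     void push(const Tuple& t) {
         partitions[std::hash<int>{}(t.key) % partitions.size()](t);
     }
 };

 // Merger: for an order-insensitive operator it suffices to forward
 // the partial results of all instances into a single output stream.
 struct Merger {
     Channel out;
     void push(const Tuple& t) { out(t); }
 };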
Note that, apart from the stream input and a possible output, no communication operators have to be specified as part of the query. Such operators are added during rewriting if necessary. For formulating rewriting rules we use a simple notation. A dataflow as given above is written as

 sink := writer(f := filter(src := reader))

where writer, filter, and reader are operator types and the optional names sink, f, and src denote operator instances. During rewriting, graph patterns are matched and constraints are checked. For this purpose, the pseudo-type any is used as a placeholder for any possible type, and any* represents a dataflow of arbitrary operator types. The following pattern matches a dataflow subgraph containing a stateless filter operator (which is the case for any filter):

 a2 := any(f := filter(a1 := any*))[stateless(f)]

To apply the patterns, a rewriting rule can be specified, e.g., for the hot standby pattern:

 ⇒ a2(stream-selector(failover(writer(f(reader(multicast(a1)))))))

Besides inserting or replacing operators (such as the stream-selector, failover, and multicast operators in the previous example), operator nodes are also annotated with placement information: in the rule above, the stream-selector end is annotated with @p1, the replicated chain writer(f(reader(·))) with @p∗, and the multicast end with @p2, where pi, pj with i ≠ j denote distinct compute nodes and p∗ denotes an arbitrary number of nodes.

4.2 Failover Handling

With these rewriting rules, internal data management nodes and different operator implementations can be transparently injected into the query plan without impacting the application. This makes it possible to re-schedule a query task to another node in case of a failure (pattern 1), to use an operator implementation that automatically integrates snapshotting (pattern 2), or to replicate a task to implement hot standby (pattern 3).

The actual flow of data through query channels during runtime is controlled by special operator parameters – e.g., target IP addresses and ports – that can be adjusted through a concept we call meta queries (cf. Sect. 4.5). To detect and react to failures, query tasks are instrumented with monitoring interfaces that inform the query scheduler about the nodes' health and about performance measures such as tuple processing rates. The scheduler then triggers a graph rewriting.

When a rewritten graph needs to be deployed, it has to be guaranteed that no information of the tuple stream is lost. To analyze the necessary steps, we reduce a query task to a finite state machine model (cf. Fig. 6), which is common for implementing CEP operators [6] but can also be applied to general dataflow transformations. The query task receives a stream of input tuples I(t) and applies its logical operation(s) λ – e.g., a filter or a join – to generate an output stream O(t). (We use vectorial representations here to combine all channels into a single quantity.) The output might depend on the task's state S(t), i.e., the state of all internal operators, which can basically include anything that is required for implementing the operators (e.g., hash tables or sliding windows) and is updated with each incoming tuple through a state transition function τi. (Usually, the separation of λ and τ is only conceptual, and both functions are combined.) The meta query extension is represented by special input channels P(t) that modify the operator state through τp.

To guarantee that a node failure does not lead to information loss, it is necessary that all results which have not yet been consumed by the following target can be reproduced from the possibly infinite input stream. Therefore, the operator state needs to be snapshotted after each input tuple or – if this is too expensive – the input tuples need to be persisted in order to reproduce the state by simply 'replaying' the input. Which tuples are still required for a possible replay can be controlled by special tuple messages that are exchanged between tuple producers and consumers, as in the Borealis system [1]. Note that when frameworks such as ZeroMQ are used to implement query channels, reliable message delivery can be guaranteed without the need to modify operators.
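A minimal sketch of this state-machine view of a query task, with λ and τ combined into a single step function (as is usually done); the window size and predicate are arbitrary examples, not AnduIN code:

 #include <vector>

 struct Tuple { double value; };

 // A query task as a finite state machine: the state S(t) is updated
 // by tau for every input tuple, lambda produces the output tuples.
 struct QueryTask {
     std::vector<Tuple> state;  // S(t), e.g., a sliding window

     // One transition: consume input tuple i, emit output tuples O.
     std::vector<Tuple> step(const Tuple& i) {
         state.push_back(i);                 // tau: update the state
         if (state.size() > 100)             // bounded window of 100 tuples
             state.erase(state.begin());
         std::vector<Tuple> out;             // lambda: compute the output
         if (i.value > 42.0) out.push_back(i);
         return out;
     }
 };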
In order to implement fault tolerance by transferring stateful query tasks to other computing nodes, a simple protocol as presented in [10] is sufficient:
(1) quiesce all input streams,
(2) replicate the task state to the target node,
(3) redirect the input streams to the target node,
(4) unquiesce all input streams.
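As a coordinator-side sketch, the four steps could be driven as follows; the interfaces are hypothetical, and the actual protocol in [10] additionally coordinates acknowledgements between producers and consumers:

 #include <string>
 #include <vector>

 // Hypothetical control interface to the upstream writers feeding a
 // task's input channels and to the task itself.
 struct InputStream {
     void quiesce()   { /* tell the upstream writer to pause sending */ }
     void unquiesce() { /* resume the tuple flow */ }
     void redirect(const std::string& addr) { target = addr; }
     std::string target;  // current downstream address
 };
 struct Task {
     std::string state;   // serialized operator state (opaque here)
     std::string serialize_state() { return state; }
     void load_state(const std::string& s) { state = s; }
 };

 // Move a stateful task from 'src' to 'dst' without losing tuples.
 void migrate(Task& src, Task& dst, const std::string& dst_addr,
              std::vector<InputStream>& inputs) {
     for (auto& in : inputs) in.quiesce();          // (1) stop the flow
     dst.load_state(src.serialize_state());         // (2) copy the state
     for (auto& in : inputs) in.redirect(dst_addr); // (3) new target
     for (auto& in : inputs) in.unquiesce();        // (4) resume the flow
 }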
4.3 Elasticity Handling

The same algorithm can be used to replace query tasks with rewritten versions that compute the same logical transformation but use different operator implementations and/or a different partitioning scheme of the query graph into tasks, thereby implementing the elasticity patterns 4 and 5.

Rewriting Cost Model: Usually, there are several possibilities for rewriting dataflow graphs. In order to make the right decisions about which tasks shall be replaced and how many nodes should be allocated, a cost model is required that takes the possible rewriting benefits into account as well as the costs for the restructuring, e.g., for transferring states, and the costs for additional resources from the cluster infrastructure. Discussing elaborate decision models is not in the focus of this paper and is left for future work. We outline a rate-based model that is suitable for finding hot spots in dataflows and is used in the related literature [13].

Rewritings should be done when a query task is detected that cannot process its incoming tuples at a rate higher than the arrival rate, e.g., when the computational complexity or the memory consumption is too high and the task therefore accumulates an increasing backlog. Such a task represents the critical path in the dataflow, limiting the overall throughput. For finding these paths, a rate-based optimization approach is suitable that scans the dataflows starting at the source nodes and detects such bottlenecks based on monitoring information gathered during the execution [13]. After hot spots have been identified, one or multiple of the following methods can be applied for alleviating these bottlenecks.
'tick'-pattern 'ticks'
P+ P +
value A D+ t1 t2 t3 (t4 t5 t6 ) t7
|{z} |{z} |{z} |{z}
B+ A B C D
C p1 p1p2
p2 p1p3
t 1 t2 t3 t4 t5 t 6
|{z} | {z } |{z} |{z}
t7 P + 1 p3 p2p3
... ...
A B C D
p1 p1p2 p1p2p3
p2 p1p3 ...
t4 t5 t6 t7 partition
|{z} |{z} |{z} |{z} p3 p2p3 ... P +
time A B C D ... ... ...
p3p4 p1p2p3
t1 t2 t3 t4 t5 t6 t7 pipeline
2 p4p5 p2p3p4
... ...
Figure 7: ’tick’-shaped pattern
P P ° P +
Task sharing (Pattern 5): When multiple queries share the same (sub)graph to increase data locality [3] and the shared graph is on the critical path, this path can be replicated, sharing groups can be repartitioned, and the tuples distributed to all replicas. This is the easiest way to remove burden from the critical path, since the inputs of sharing groups are independent from each other and no special dataflow transformations need to be performed.

Inter-operator parallelism (Pattern 4): Usually, it is better to keep dataflow operations on few nodes in order to avoid costly transfer operations. Hence, initially compiled queries will comprise few tasks consisting of large flow (sub)graphs. However, when the computational complexity exceeds a certain threshold or the memory limits for keeping state information are exceeded, a distributed processing pipeline will yield better performance. Large graphs are then partitioned, recompiled, and distributed on additional nodes in the cluster. Moreover, splitting large costly tasks into smaller distributed ones increases fault tolerance, since it becomes cheaper to recover from node failures [11].

Intra-operator parallelism (Pattern 4+5): When the previous patterns are not applicable, e.g., when operators are not shared or the graph has already been split into base operators, the last possibility to increase parallelism is partitioning the input streams and processing each partition independently with multiple operator instances. Unfortunately, this pattern is the most difficult to implement since its applicability depends on the actual operator type. Partitions in the input streams need to be found, distributed to the operator instances, and their (partial) results have to be merged afterwards. Further, this pattern is prone to data skew and, therefore, some sort of load balancing has to be implemented, e.g., by monitoring the load of each partition and dynamically re-scheduling partitions as proposed in [10]. This concept seamlessly integrates with the graph rewriting patterns described in this paper but again involves additional costs for transferring partitions among cluster nodes.

The most challenging problem in this context is finding suitable partitioning schemes for the operators that shall be deployed in the framework, especially when they are not stateless. In the following, we present a parallelization scheme for the complex event processing (CEP) context, which is prominent in stream processing.

The task of CEP is finding complex patterns in a stream of base patterns. These patterns are defined through certain properties of incoming tuples, usually described through predicates and additional correlations of their arrival times. Mostly, sequences and repetitions are used, which can be expressed with regular expressions [15]. We demonstrate the parallelization on the ’tick’-shaped pattern example from [5], which is illustrated in Fig. 7. In the original publication, the task of detecting such patterns originates in stock exchange trading, but it could also be applied to burst detection. A ’tick’ shape can be expressed as AB+CD+ where:
A matches any incoming tuple,
B+ matches all following tuples with decreasing values,
C matches the first tuple with an increasing value after B, but with a value less than the previous one of A,
D+ matches the following tuples with increasing values > A.

For parallelization, we focus on the +-operator since it is challenging for three reasons: First, like a join, it can produce multiple output tuples per input tuple. Second, it needs to store state information for extending existing patterns to longer ones. Since each tuple possibly multiplies the number of results, it is likely that – due to memory constraints – such operators will become critical in the dataflow graph. Third, in most cases the behavior of the stream is not predictable, rendering static allocations infeasible.

Fig. 8 illustrates how the operator can be distributed dynamically to multiple nodes by simply applying graph patterns 4 and 5. The P+-operator comprises two parts: a pattern matcher P, and a +-operator which maintains all previously matched patterns as state and concatenates them with subsequent matches. The output of + is the output of the entire operator and serves as input for + again, to construct longer matches. On memory overflow, the operator state can be partitioned and distributed to multiple instances, where all instances receive the original input stream. Two different behaviors of the operator are required to avoid duplicates. The first instance processes the input directly, i.e., all matching tuples serve as new patterns of length 1. Those matches must not be reproduced by the other instances, which simply serve as targets for overflowing patterns; these do not fit into the local state but are independent from each other and hence can be processed on separate nodes. When the complexity of the matching algorithm P is critical, elasticity can also be achieved by implementing a pipeline. This exploits the fact that P+ can be expressed as P ∨ (P P+), i.e., a pipeline of arbitrary length is constructed for matching incoming tuples in parallel, emitting them as output, and forwarding them to the next stage for extension.

Since such parallelization schemes depend on operator semantics, the framework provides them to automatically scale the required resources up and down for built-in operators. All user-defined functions are treated as black boxes by AnduINv2; their parallelization needs to be implemented by the user as in [11], or it is provided through libraries that are linked as plugins into the execution environment.
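The pipeline variant could be sketched roughly as follows: each stage matches incoming tuples against P, emits every extended match as a result, and forwards it to the next stage, exploiting P+ = P ∨ (P P+). The representation of matches and the matcher interface are hypothetical simplifications, not AnduIN's CEP engine:

 #include <functional>
 #include <vector>

 struct Tuple { double value; };
 using Match = std::vector<Tuple>;  // a (partial) pattern occurrence

 // One stage of the P+ pipeline: extends the matches received from
 // the previous stage by one further occurrence of P, emits every
 // extension as a result, and forwards it to the next stage.
 struct PlusStage {
     std::function<bool(const Match&, const Tuple&)> p;  // matcher P
     std::function<void(const Match&)> emit;   // operator output
     std::function<void(const Match&)> next;   // next stage (may be empty)
     // The first stage is seeded with one empty match so that a single
     // occurrence of P starts a new pattern of length 1.
     std::vector<Match> pending;               // matches awaiting extension

     void on_match(const Match& m) { pending.push_back(m); }
     void on_tuple(const Tuple& t) {
         for (const auto& m : pending) {
             if (p(m, t)) {
                 Match ext = m; ext.push_back(t);
                 emit(ext);                // result of length |m|+1
                 if (next) next(ext);      // P P+ : extend further downstream
             }
         }
     }
 };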
4.4 Mesos Integration

Mesos [7] is a cluster management software for resource isolation and sharing. In Mesos, a master daemon (possibly supported by additional standby masters) manages a set of slave nodes. An application (called a framework) runs tasks on these slave nodes, initiated by so-called executors.
Scheduling and resource assignments are managed by an application-specific scheduler. In order to support stream queries, we implemented our own framework (cf. Fig. 6), providing an executor for running query executables (query tasks) on the slave nodes and a query scheduler which receives resource offers from the Mesos master (available cores, memory, and network ports) as well as requests for executing AnduIN queries. Each query deployment request is described by a unique ID, the executable, and a specification of resource requirements, i.e., CPU cores, memory, and a list of query channels which have to be mapped to network ports. This specification is used by the scheduler to choose a slave node providing the requested resources for execution. Currently, only a simple strategy is implemented, selecting the first offer providing the requested resources – more advanced strategies are subject of future work. If the scheduler has chosen an appropriate node, the request is forwarded to the corresponding executor. The scheduler assigns physical network ports to query channels and tracks these assignments in order to connect subsequent queries referring to the same logical channel. In this way, a query implemented by one or more tasks can be deployed to one or more cluster nodes.
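The first-fit strategy can be summarized by the following sketch, with plain structs standing in for the Mesos offer and request types (this is not the actual Mesos API):

 #include <vector>

 // Simplified view of a Mesos resource offer and a query request.
 struct Offer   { int slave_id; double cpus; double mem_mb; int free_ports; };
 struct Request { double cpus; double mem_mb; int num_channels; };

 // First-fit strategy: pick the first offer that covers the CPU,
 // memory, and port requirements of the query deployment request.
 int choose_slave(const std::vector<Offer>& offers, const Request& r) {
     for (const Offer& o : offers)
         if (o.cpus >= r.cpus && o.mem_mb >= r.mem_mb &&
             o.free_ports >= r.num_channels)
             return o.slave_id;
     return -1;  // no suitable offer; wait for further offers
 }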
4.5 Meta Queries

Though Mesos provides mechanisms to deploy processes, it does not support elastic operation for stream queries. In Hadoop, it is the task of the job tracker to partition the work across a set of map and reduce tasks. In the case of data streams, the situation is different, because we cannot simply stop and continue/restart queries without losing data. The only way to achieve elasticity is to change the query behavior at runtime. Therefore, we introduce the idea of meta queries: in each (adjustable) query task, an additional query is running on a control stream consisting of tuples of the form:

 ⟨query id, operator id, parameter, value⟩

The control stream is produced by the query scheduler, which monitors resource utilization and implements strategies for dynamic reallocation. Meta queries are particularly useful for implementing the patterns described in Sect. 3. For instance, for failover without publish-subscribe (patterns 1 and 2), the network writer has to be informed about the network address of the newly activated standby node S. For this purpose, the network writer provides a parameter target-addr for the target address. A control stream tuple like

 ⟨query#42, writer#2, target-addr, "tcp://node2:6666"⟩

received by the query task triggers sending the tuple stream to the standby node node2. Similarly, for implementing pattern 3, the stream selector node can be informed about switching to the stream produced by query node S.

For partitioning patterns like pattern 4, it is either required to modify the tuple distribution strategy of multicast writers or to adjust the partitioning predicates Pi in Fig. 4. Both can easily be implemented by sending appropriate control tuples.
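Inside a query task, the meta query could dispatch such control tuples as in the following sketch; the parameter registry and the reconnect call in the usage example are hypothetical, not AnduIN API:

 #include <functional>
 #include <map>
 #include <string>
 #include <utility>

 // A control tuple of the meta query stream:
 // <query id, operator id, parameter, value>.
 struct ControlTuple {
     std::string query_id, operator_id, parameter, value;
 };

 // Each adjustable operator registers setters for its parameters,
 // e.g., the network writer registers "target-addr".
 struct MetaQuery {
     std::map<std::pair<std::string, std::string>,
              std::function<void(const std::string&)>> setters;

     void on_control(const ControlTuple& c) {
         auto it = setters.find({c.operator_id, c.parameter});
         if (it != setters.end()) it->second(c.value);  // apply at runtime
     }
 };

 // Example usage (writer.reconnect is a hypothetical operator hook):
 //   MetaQuery mq;
 //   mq.setters[{"writer#2", "target-addr"}] =
 //       [&](const std::string& addr) { writer.reconnect(addr); };
 //   mq.on_control({"query#42", "writer#2", "target-addr",
 //                  "tcp://node2:6666"});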
5. CONCLUSION AND FUTURE WORK

We presented basic concepts for how fault tolerance and elasticity can be achieved in the context of continuous query processing by combining techniques that have proven their applicability in other scenarios. These approaches are currently being integrated into AnduINv2 but can be applied in other platforms, too. The main questions we would like to answer with future experiments are:
1) Which cost models are valid for online graph rewriting?
2) How can resource requirements for a query be estimated before actually executing it?
3) How can certain QoS guarantees be given to applications?
4) Can elastic stream processing benefit from heterogeneous cluster nodes?

While the first questions are intended to pave the way for a streaming-as-a-service infrastructure, answering the last one is needed to keep up with current hardware development trends. We believe that parallel and specialized processors such as many-core CPUs, GPUs, or FPGAs will find their way into future computing centers to provide the most efficient computing platforms for dedicated tasks – an important aspect in tackling the big data challenge.

6. REFERENCES

[1] D. J. Abadi, Y. Ahmad, M. Balazinska, et al. The Design of the Borealis Stream Processing Engine. In CIDR '05, 2005.
[2] A. Arasu, S. Babu, and J. Widom. CQL: A language for continuous queries over streams and relations. In Database Programming Languages. Springer, 2004.
[3] J. Chen, D. J. DeWitt, F. Tian, and Y. Wang. NiagaraCQ: a scalable continuous query system for Internet databases. SIGMOD Rec., 29:379–390, 2000.
[4] T. Condie, N. Conway, P. Alvaro, J. M. Hellerstein, K. Elmeleegy, and R. Sears. MapReduce online. In NSDI, pages 21–21, 2010.
[5] N. Dindar, P. M. Fischer, M. Soner, and N. Tatbul. Efficiently correlating complex events over live and archived data streams. In DEBS '11. ACM, 2011.
[6] M. Eckert, F. Bry, S. Brodt, O. Poppe, and S. Hausmann. A CEP Babelfish: Languages for Complex Event Processing and Querying Surveyed. In Reasoning in Event-Based Distributed Systems. Springer, 2011.
[7] B. Hindman, A. Konwinski, M. Zaharia, A. Ghodsi, A. D. Joseph, R. Katz, S. Shenker, and I. Stoica. Mesos: A platform for fine-grained resource sharing in the data center. In NSDI, pages 22–22, 2011.
[8] S. Hirte, E. Schubert, A. Seifert, S. Baumann, D. Klan, and K. Sattler. Data3 – A Kinect Interface for OLAP using Complex Event Processing. In ICDE, 2012.
[9] M. Isard, M. Budiu, Y. Yu, A. Birrell, and D. Fetterly. Dryad: distributed data-parallel programs from sequential building blocks. SIGOPS, 41:59–72, 2007.
[10] M. A. Shah, J. M. Hellerstein, S. Chandrasekaran, and M. J. Franklin. Flux: An Adaptive Partitioning Operator for Continuous Query Systems. In ICDE, 2002.
[11] Q. Ke, M. Isard, and Y. Yu. Optimus: a dynamic rewriting framework for data-parallel execution plans. In EuroSys, pages 15–28, 2013.
[12] D. Klan, M. Karnstedt, K. Hose, L. Ribe-Baumann, and K. Sattler. Stream engines meet wireless sensor networks: cost-based planning and processing of complex queries in AnduIN. Distrib. and Parallel Databases, 29:151–183, 2011.
[13] S. D. Viglas and J. F. Naughton. Rate-based query optimization for streaming information sources. In SIGMOD '02. ACM, 2002.
[14] M. Zaharia, T. Das, H. Li, S. Shenker, and I. Stoica. Discretized streams: an efficient and fault-tolerant model for stream processing on large clusters. In HotCloud '12. USENIX Association, 2012.
[15] F. Zemke, A. Witkowski, M. Cherniak, and L. Colby. Pattern matching in sequences of rows. Technical report, ANSI Standard Proposal, 2007.