=Paper=
{{Paper
|id=Vol-2399/paper05
|storemode=property
|title=From Distributed Sources to Distributed Sinks: Towards Truly Decentralized Event Stream Processing
|pdfUrl=https://ceur-ws.org/Vol-2399/paper05.pdf
|volume=Vol-2399
|authors=Samira Akili
|dblpUrl=https://dblp.org/rec/conf/vldb/Akili19
}}
==From Distributed Sources to Distributed Sinks: Towards Truly Decentralized Event Stream Processing==
ceur-ws.org/Vol-2399/paper05.pdf
From Distributed Sources to Distributed Sinks:
Towards Truly Decentralized Event Stream Processing
Samira Akili
Supervised by Matthias Weidlich
Humboldt-University of Berlin
akilsami@hu-berlin.de
ABSTRACT
Distributed stream processing evaluates queries over data produced by geographically distributed sources. To efficiently handle large amounts of decentralized data, whilst coping with bandwidth restrictions, applications employ in-network processing. To this end, a query is modularized and its operators are assigned to network nodes, especially those that act as data sources. The latter is known as the operator placement problem. Existing solutions to it, however, handle data generated by distributed sources, whereas query results are gathered at one designated node, the sink. We argue that such single-sink solutions are not applicable for non-hierarchical systems, in which multiple nodes need to be informed about query results. Also, having a single sink enforces centralisation in compositional applications, where the result of one query is the input to another query. This PhD project therefore aims to develop algorithms for multi-sink operator placement. We show that the computation of network costs for efficient operator placement, however, requires us to incorporate various aspects of event generation, query processing semantics, and network properties.

[Figure 1: Evaluation of two queries, Q1 and Q2, with traditional single-sink operator placement (left) and multi-sink operator placement (right). Legend: source, operator, sink.]

Proceedings of the VLDB 2019 PhD Workshop, August 26th, 2019. Los Angeles, California.
Copyright (C) 2019 for this paper by its authors. Copying permitted for private and academic purposes.

1. INTRODUCTION
In domains such as logistics, smart grids, or supply chain management, applications rely on data produced by geographically distributed sources [17]. They exploit this data by evaluating streaming queries in large networks, whose nodes are defined by sensors, middleware components, and information systems. While centralized approaches for distributed stream processing (DSP) typically do not scale due to bandwidth and delay restrictions, decentralized in-network processing takes advantage of data locality to reduce networking costs. To this end, the queries of an application are split into operators, which are evaluated inside the network, possibly directly at the nodes that denote data sources. Hence, at the core of DSP lies the modularization of a query workload and the assignment of operators to the nodes of the network. The latter is known in literature as the operator placement problem [14].

While existing approaches for DSP place operators inside the network, the results of an application are gathered at one designated node [17], i.e., the sink, which is usually located in a data centre. In this work, we argue that a single-sink approach is not suitable for non-hierarchical systems and compositional applications. Non-hierarchical systems, such as autonomous agent networks or car platooning, require a large number of nodes, potentially the whole network, to be informed about query results. At the same time, it is often desirable to build streaming applications by composition: the sink of one query represents a source for another one. By enforcing a single sink, however, existing approaches do not support decentralisation beyond a single query.

This PhD project aims at developing foundations of distributed stream processing with multiple sinks. Unlike traditional operator placement, which enforces a single sink node (see Figure 1, left), we strive for placements with multiple sinks (see Figure 1, right). This way, we achieve truly decentralised stream processing that caters for distributed information demands in non-hierarchical systems and avoids centralisation in query composition. Considering multiple sinks yields a higher degree of freedom for operator placement. Hence, traditional algorithms to determine placements that minimise network costs can no longer be used.

We illustrate our research setting with a case study provided by [1], which deals with autonomous transport robots serving machines in a factory. The robots are equipped with various sensors and communicate over Wi-Fi. The robots differ in the types of their sensors, as they are assigned different kinds of tasks. Sensor signals and network messages constitute continuous streams of events. Based thereon, queries are evaluated to monitor the conduct of transport jobs. These jobs are announced by machines and distributed among the robots, which engage in a bidding process.
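The "higher degree of freedom" that multiple sinks introduce can be made concrete by counting candidate placements. The following is a minimal Python sketch and is not taken from the paper. It assumes that any node may host any operator, with no constraints. It also assumes that a multi-sink placement assigns every operator a non-empty set of hosts. The paper's formalization in Section 2.2 maps operators to 2^V and does not say whether the empty set is excluded. The function names are hypothetical.

```python
"""Hypothetical sketch: size of the operator placement search space.

Assumptions (not from the paper): every node may host every operator,
and a multi-sink placement gives each operator a NON-EMPTY set of hosts.
"""


def single_sink_placements(num_nodes: int, num_operators: int) -> int:
    """Traditional placement: each operator is hosted by exactly one node."""
    return num_nodes ** num_operators


def multi_sink_placements(num_nodes: int, num_operators: int) -> int:
    """Multi-sink placement: each operator is hosted by a non-empty node set."""
    non_empty_subsets = 2 ** num_nodes - 1  # all subsets of V except {}
    return non_empty_subsets ** num_operators


if __name__ == "__main__":
    # A hypothetical query with three operators, evaluated for growing networks.
    for nodes in (3, 5, 10):
        print(
            f"|V|={nodes:2d}: single-sink={single_sink_placements(nodes, 3)}, "
            f"multi-sink={multi_sink_placements(nodes, 3)}"
        )
```

Even for three nodes and three operators, this count gives 27 single-sink placements versus 343 multi-sink placements. The gap widens exponentially with the number of nodes, which illustrates how much larger the search space becomes once operators may be assigned to sets of nodes.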
In the remainder, Section 2 gives a formulation of our research context. We review related work in Section 3 and present our preliminary results in Section 4. In Section 5, we conclude and outline our research agenda.

2. RESEARCH CONTEXT
We first define a query model for stream processing. We then formalize the problem of operator placement with multiple sinks and elaborate on various dimensions of this problem.

2.1 Query Model
To exemplify our approach, we employ a query language model as used in Complex Event Processing (CEP) [5]. Let U = {E_1, ..., E_n} denote the universe of primitive event types. Each event type E consists of a set of attributes E = (A_1, ..., A_n), with, w.l.o.g., A_1 being a timestamp. A query defines a composite event pattern through a set of operators, predicates describing correlations between events, and a time window within which the pattern has to be detected. An operator of a query is either defined by a primitive event type, or composed of other operators. Common composite operators are SEQ, AND, OR and NOT [11]. The AND operator matches if all input event types occur; the SEQ operator requires all inputs in the specified order; the OR operator matches if at least one of the inputs occurs; and the NOT operator requires the absence of its input and is typically defined in the context of another operator. An example for the latter is the query SEQ(A a, NOT(B b), C c), which matches if no event of type B occurs between events of types A and C. We write A^n as a shorthand for SEQ(A a_1, ..., A a_n).

Consider the above case of autonomous transport robots. A monitoring query detects the following situation: An obstacle is reported (event type Ob) three times (a single observation is not trustworthy due to workers passing the area). Then, a request is issued to tow away the obstacle (Tow_Req). A robot capable of towing obstacles acknowledges the request (Tow_Ack). However, within five minutes the obstacle is still observed at the respective position. This situation hints at an overloading of the robots capable of towing away obstacles and may be detected with the following query:

    SEQ((Ob o)^3, Tow_Req tq, Tow_Ack ta, Ob o')
    WHERE o.position = o'.position
    AND o'.timestamp - ta.timestamp <= 5 min
    WITHIN 10 min

2.2 Problem Statement
A query can be represented as an operator tree (neglecting the correlation predicates and the time window). This is formalized as a tuple (O, ), where O is a set of operators, with O_p ⊂ O being the operators defined by primitive event types, and the second component, a function O → 2^O, assigns input operators to an operator (i.e., its children in the tree). A workload Q is a set of queries.

Queries are evaluated in an event-sourced network, which is defined as a tuple (G, f), where G = (V, E) is an undirected graph with V as a set of nodes and E as a set of communication channels, and f : V → 2^U is a function that defines the types of events generated at a node. Here, we only consider the case that G is complete. Each node generates a local trace, an infinite sequence of events of the respective types that are totally ordered by their timestamps.

Given an event-sourced network (G, f) and a query (O, ), a placement p : O → 2^V is a function that assigns each operator to a set of nodes. We refer to a node to which an operator o has been assigned as a host of o. Placements are directly lifted to a query workload Q, i.e., p_w : ⋃_{(O, )∈Q} O → 2^V assigns all operators of queries of Q to sets of nodes.

A placement dictates which nodes have to exchange events to evaluate queries: in general, a node n receives events from all nodes that host operators that are inputs for at least one operator hosted at n. The quality of a query placement is assessed by the network costs it inflicts. We define the network costs as the total rate with which events are exchanged. Given a placement p_w of a query workload Q, we denote its total cost by c(p_w) ∈ ℝ. Based thereon, we capture the general problem addressed by our work as follows:

Problem 1. Let Q be a query workload and (G, f) an event-sourced network. The problem of multi-sink operator placement is to determine a placement p_w for Q such that c(p_w) is minimal.

It is important to note that, unlike traditional approaches, we consider the assignment of an operator to a set of nodes, which leads to placements with multiple sinks.

2.3 Problem Dimensions
The problem of multi-sink operator placement needs to be addressed in the light of various parameters that arise from an application domain. Below, we provide an overview of this parameter space along three dimensions: event generation, query processing semantics, and network properties.

2.3.1 Event Generation
Event Rates. To determine an optimal operator placement, knowledge about the rates of event generation is crucial. Intuitively, it is beneficial to host an operator where its input events are generated with high rates. Such knowledge may be available at different granularities, in terms of a local, global, or distributed rate per event type. Under a local rate, events of a type are generated with a different rate per node. Examples from our case study are events of type BATTERY_LOW, generated by a robot if it needs to charge its battery. The rate at which BATTERY_LOW events are generated depends on the deployed battery and differs for each robot. In contrast, events may be generated with a global rate, i.e., the same rate per node. An example is given by BEACON_MESSAGE events, emitted periodically by each robot about its location. Under a distributed rate, only the total rate of event generation in the network is known, but not the rates at individual nodes. Consider ACCEPT_JOB_MESSAGE events, sent by the robots that win the bidding for transport jobs. While the total rate of these events is known, their distribution among the winning robots is not.

Event Uniqueness. The uniqueness of events influences the cost of operator placements: the local event traces of nodes may contain only distinct events or show some overlap (i.e., the same event may be generated by more than one node). In our case study, the latter is observed for TEMPERATURE_DROP events: if the temperature drops below a threshold, all robots equipped with a thermometer report the situation simultaneously. From an application point of view, there is
no need to distinguish these reports, so that they are considered as a single event that may be generated at multiple nodes. Clearly, this influences the costs of a placement.

2.3.2 Query Processing Semantics
Consumption Policy. The semantics of streaming queries are fine-tuned by a consumption policy that dictates how to select events for query matching [3]. For instance, under an unrestricted policy, an event may be processed by multiple operators of the same query [2]. In contrast, a continuous policy allows each event to be processed by exactly one operator (i.e., the event is consumed by the operator).

Consider Figure 2, which illustrates a network of three nodes generating events of four types (a box next to a node shows the generated event types) with the same (local) rate r, and the query Q1. Under an unrestricted policy, the query would match once a set of three A events, a C event, and a D event have been observed in the network. With a continuous policy, however, a total of six A events is required for a match. The former policy leads to an event being sent from its generating node to all nodes hosting operators for the respective type. The latter policy, in turn, means that this is not required. Figure 2 also illustrates how the consumption policy impacts the cost of an operator placement: it shows two placements, p1 and p2, with the query's sink node being placed at node 1 or node 3, respectively. Each node optimizes query evaluation and sends partial matches instead of individual events, e.g., in placement p1, node 2 sends matches for AND(A^3, C) to node 1 instead of all events of types A and C. For an unrestricted policy, p1 yields minimal network costs, whereas p2 is optimal under a continuous policy. Thus, the chosen policy impacts the rates with which A events are sent and leads to different optimal placements.

[Figure 2: Depending on the consumption policy, different placements yield optimality. The figure shows a network N of three nodes with rates r = r_A = r_C = r_D, the query Q1: AND(A^3, C, D, A^3), and the placements p1 and p2 with their network costs under the unrestricted and the continuous policy.]

Selectivity. The cost of operator placement is also influenced by the selectivity of an operator, i.e., the portion of events processed by it that fulfill the correlation predicates (see Section 2.1). The selectivity may be estimated based on the distribution of the events' attribute values. As the selectivity of an operator governs the output rate of a node hosting the operator, it influences the costs of a placement.

2.3.3 Network Properties
Push-Pull Communication. Instead of requiring nodes to send events to other nodes for processing (known as push-based communication), DSP may also exploit pull-based communication [4]. Then, operators pull their input events from other nodes, which potentially reduces network costs. Combining both communication paradigms opens up space for optimisation: low-frequency events are pushed, while high-frequency events are pulled on demand.

Load Balancing. A simple strategy for load balancing in DSP is to restrict the maximum number of operators that can be hosted per node. Yet, this number may be largely detached from the actual computational load (CPU cycles) and networking load (number of received events). To achieve effective load balancing, the types of operators have to be taken into account. Stateful operators, e.g., SEQ, are particularly demanding in terms of computational resources [19].

Constraints on Event Diffusion. The communication within the network might be restricted, e.g., for privacy or organizational reasons. Such restrictions can be defined on the node level, prohibiting specific nodes from exchanging events, or on the event level, prohibiting certain event types from being shared across the network. As a consequence, some nodes might not be eligible to host an operator, which leads to a reduced space of possible placements.

3. RELATED WORK
Operator placement has been investigated in the context of distributed stream processing [13–15], distributed CEP [9, 10, 18], and sensor networks [8, 16]. For non-tree-structured queries, the problem is known to be NP-hard [7], so that most existing approaches yield heuristic placement algorithms. In [12] and [17], distributed stream processing and CEP applications were surveyed, concluding that they differ in their employed system and query model, their optimization goal, and in whether the placement algorithm is centralized or decentralized. In all of the works mentioned above, the operator placement problem is formulated such that an operator is placed at exactly one node, leading to single-sink solutions, which are not suitable for our intent.

Distributed Stream Processing. In DSP, operators are usually considered as black boxes, so that none of the existing works considers operator semantics. In [14] and [15], placement algorithms are introduced to optimize bandwidth and delay. While different types of event generation can be employed in the proposed cost models, none of the factors we discussed as network properties are addressed. Recently, Nardelli et al. [13] introduced several heuristics to compute placements, yet they consider solely single-sink solutions.

Distributed Complex Event Processing. In [9, 10, 18], (single-sink) placement algorithms that optimize networking costs are introduced. Chen et al. [9] employ a language model similar to ours and support push-pull communication. However, neither load balancing strategies nor query semantics are taken into account. Cugola and Margara [10] discuss placement strategies for CEP queries written in TESLA. While query semantics are incorporated, the approach ignores requirements in terms of load balancing. Starks and Plagemann [18] investigate distributed CEP for mobile ad-hoc networks and propose a decentralized algorithm. They argue that, due to the inherent dynamicity of such systems, placements need to be recomputed frequently, which calls for optimizing the placement algorithm itself in terms of networking costs. Neither load balancing nor push-pull communication is supported by the proposed placement mechanism, though.

Sensor Networks. Srivastava et al. [16] introduce a placement algorithm that is optimal in bandwidth consumption and load balancing. However, their approach is only applicable to operators resembling filters, which strongly limits the query language and simplifies load balancing. In [8]
decentralized, adaptive placement is proposed for continuous queries, minimizing networking costs. Yet, the influence of network properties on the placement is neglected.

4. PRELIMINARY RESULTS
As a first step towards an efficient multi-sink operator placement strategy, we adapted the shortest tree algorithm introduced in [6]. The algorithm can be applied to queries having a tree structure and produces an optimal placement for a query workload having a different sink for each query. The idea is to construct a graph such that its vertices reflect all possible placements and its edge weights the respective costs. Using a dynamic programming approach, we can efficiently compute a shortest path tree for the graph. The vertices of the shortest path tree yield an optimal placement, whose costs are given by the sum of the tree's edge weights. Figure 3 illustrates such a graph for the query Q2 and a network consisting of three nodes. The vertices are labelled with tuples of operators from O \ O_p (i.e., we do not consider primitive events as operators here) and node ids, e.g., the vertex S1@1 reflects the placement of operator S1 at node 1. The set of vertices describing all placement possibilities of one operator o is called the layer of o, e.g., the vertices {S1@1, S1@2, S1@3} form the layer of the SEQ operator. The edges of the graph connect the vertices according to the operator tree: if an operator o is a parent of an operator q in the query graph, then each vertex of the layer of o is connected to each vertex of the layer of q. The weight of an edge between the two vertices A1@1 and S@2 reflects the event rates that have to be exchanged in order to place SEQ at node 2 when AND1 is placed at node 1. The consumption policy, selectivity and event generation affect the rates with which events are exchanged and thus can be encoded in the edge weights of the graph. Constrained event diffusion is also supported by the algorithm by adding a pruning step: if two nodes are not allowed to communicate, the weights of the edges between vertices referring to those nodes are set to infinity. For an event type that is not to be shared across the network, the layer of its consuming operator contains only placements at nodes that generate the respective event type.

[Figure 3: Graphs for the queries Q2 (left) and Q3 (right) for a network consisting of three nodes.]

5. CONCLUSION AND NEXT STEPS
We introduced the multi-sink operator placement problem, thereby paving the way for truly decentralized event stream processing. Moreover, we sketched a first algorithm to incorporate various dimensions of the problem. It yields an optimal placement with a sink for each query of a workload.

As a next step, we plan to extend our approach to support load balancing strategies, the push-pull rationale, and operator reuse between different queries of a workload. Furthermore, we intend to investigate how to decentralize the computation of a placement itself by relying on local information in the neighbourhood of a node, thereby avoiding the necessity of having global knowledge of the network. We expect that decentralizing the placement algorithm favours efficient recomputation of operator placements to react to changes in the network. To cope with dynamicity and failures, we will also explore notions of robustness of a placement.

6. REFERENCES
[1] proANT Transport Robots. http://www.insystems.de/en/produkte/proant-transport-roboter/.
[2] R. Adaikkalavan and S. Chakravarthy. Seamless event and data stream processing: Reconciling windows and consumption modes. In DASFAA, pages 341–356. Springer, 2011.
[3] A. Adi and O. Etzion. Amit - the situation manager. VLDB J., 13(2):177–203, 2004.
[4] M. Akdere, U. Çetintemel, and N. Tatbul. Plan-based complex event detection across distributed sources. PVLDB, 1(1):66–77, 2008.
[5] A. Artikis, A. Margara, M. Ugarte, S. Vansummeren, and M. Weidlich. Complex event recognition languages: Tutorial. In DEBS, pages 7–10. ACM, 2017.
[6] S. H. Bokhari. A shortest tree algorithm for optimal assignments across space and time in a distributed processor system. IEEE TSE, (6):583–589, 1981.
[7] V. Cardellini, V. Grassi, F. Lo Presti, and M. Nardelli. Optimal operator placement for distributed stream processing applications. In DEBS, pages 69–80. ACM, 2016.
[8] G. Chatzimilioudis, A. Cuzzocrea, D. Gunopulos, and N. Mamoulis. A novel distributed framework for optimizing query routing trees in wireless sensor networks via optimal operator placement. JCSS, 79(3):349–368, 2013.
[9] J. Chen, L. Ramaswamy, D. K. Lowenthal, and S. Kalyanaraman. Comet: Decentralized complex event detection in mobile delay tolerant networks. In MDM, pages 131–136. IEEE, 2012.
[10] G. Cugola and A. Margara. Deployment strategies for distributed complex event processing. Computing, 95(2):129–156, 2013.
[11] I. Flouris, N. Giatrakos, A. Deligiannakis, M. Garofalakis, M. Kamp, and M. Mock. Issues in complex event processing: Status and prospects in the big data era. JSS, 127:217–236, 2017.
[12] G. T. Lakshmanan, Y. Li, and R. Strom. Placement strategies for internet-scale data stream systems. IEEE Internet Computing, 12(6):50–60, 2008.
[13] M. Nardelli, V. Cardellini, V. Grassi, and F. Lo Presti. Efficient operator placement for distributed data stream processing applications. IEEE TPDS, 2019.
[14] P. Pietzuch, J. Ledlie, J. Shneidman, M. Roussopoulos, M. Welsh, and M. Seltzer. Network-aware operator placement for stream-processing systems. In ICDE, pages 49–49. IEEE, 2006.
[15] S. Rizou, F. Dürr, and K. Rothermel. Solving the multi-operator placement problem in large-scale operator networks. In ICCCN, pages 1–6. IEEE, 2010.
[16] U. Srivastava, K. Munagala, and J. Widom. Operator placement for in-network stream query processing. In PODS, pages 250–258. ACM, 2005.
[17] F. Starks, V. Goebel, S. Kristiansen, and T. Plagemann. Mobile distributed complex event processing: ubi sumus? quo vadimus? In Mobile Big Data, pages 147–180. Springer, 2018.
[18] F. Starks and T. P. Plagemann. Operator placement for efficient distributed complex event processing in MANETs. In WiMob, pages 83–90. IEEE, 2015.
[19] H. Zhang, Y. Diao, and N. Immerman. On complexity and optimization of expensive queries in complex event processing. In SIGMOD, pages 217–228. ACM, 2014.