=Paper= {{Paper |id=Vol-2399/paper05 |storemode=property |title=From Distributed Sources to Distributed Sinks: Towards Truly Decentralized Event Stream Processing |pdfUrl=https://ceur-ws.org/Vol-2399/paper05.pdf |volume=Vol-2399 |authors=Samira Akili |dblpUrl=https://dblp.org/rec/conf/vldb/Akili19 }} ==From Distributed Sources to Distributed Sinks: Towards Truly Decentralized Event Stream Processing== https://ceur-ws.org/Vol-2399/paper05.pdf
                                                                                                                  ceur-ws.org/Vol-2399/paper05.pdf




         From Distributed Sources to Distributed Sinks:
      Towards Truly Decentralized Event Stream Processing

                                                                  Samira Akili
                                                      Supervised by Matthias Weidlich
                                                       Humboldt-University of Berlin
                                                          akilsami@hu-berlin.de

ABSTRACT                                                                                Q1          Q2            Q1   Q1     Q2


Distributed stream processing evaluates queries over data
produced by geographically distributed sources. To efficiently
handle large amounts of decentralized data, whilst coping
with bandwidth restrictions, applications employ in-network
processing. To this end, a query is modularized and its
operators are assigned to network nodes, especially those
that act as data sources. The latter is known as the operator                    sink        operator    source

placement problem. Existing solutions to it, however, handle
data generated by distributed sources, whereas query results              Figure 1: Evaluation of two queries, Q1 and Q2 , with
are gathered at one designated node – the sink.                           traditional single-sink operator placement (left) and
   We argue that such single-sink solutions are not applicable            multi-sink operator placement (right).
for non-hierarchical system, in which multiple nodes need to
                                                                          of the network. The latter is known in literature as the
be informed about query results. Also, having a single sink
                                                                          operator placement problem [14].
enforces centralisation in compositional applications, where
                                                                             While existing approaches for DSP place operators inside
the result of one query is the input to another query. This
                                                                          the network, the results of an application are gathered at one
PhD project therefore aims to develop algorithms for multi-
                                                                          designated node [17], i.e. the sink, which is usually located
sink operator placement. We show that the computation
                                                                          in a data centre. In this work, we argue that a single-sink
of network costs for efficient operator placement, however,
                                                                          approach is not suitable for non-hierarchical systems and
requires us to incorporate various aspects of event generation,
                                                                          compositional applications. Non-hierarchical systems, such
query processing semantics, and network properties.
                                                                          as autonomous agent networks or car platooning, require a
                                                                          large number of nodes, potentially the whole network, to be
                                                                          informed about query results. At the same time, it is often
1.    INTRODUCTION                                                        desirable to build streaming applications by composition:
   In domains such as logistics, smart grids, or supply chain             The sink of one query represents a source for another one.
management, applications rely on data produced by geo-                    By enforcing a single sink, however, existing approaches do
graphically distributed sources [17]. They exploit this data              not support decentralisation beyond a single query.
by evaluating streaming queries in large networks, whose                     This PhD project aims at developing foundations of dis-
nodes are defined by sensors, middleware components, and                  tributed stream processing with multiple sinks. Unlike tradi-
information systems. While centralized approaches for dis-                tional operator placement that enforces a single sink node, see
tributed stream processing (DSP) typically do not scale due               Figure 1 (left), we strive for placements with multiple sinks,
to bandwidth and delay restrictions, decentralized in-network             see Figure 1 (right). This way, we achieve truly decentralised
processing takes advantage of data locality to reduce net-                stream processing that caters for distributed information de-
working costs. To this end, the queries of an application                 mands in non-hierarchical systems and avoids centralisation
are split into operators, which are evaluated inside the net-             in query composition. Considering multiple sinks yields a
work, possibly directly at the nodes that denote data sources.            higher degree of freedom for operator placement. Hence, tra-
Hence, at the core of (DSP) lies the modularization of a                  ditional algorithms to determine placements that minimise
query workload and the assignment of operators to the nodes               network costs can no longer be used.
                                                                             We illustrate our research setting with a case study pro-
                                                                          vided by [1], which deals with autonomous transport robots
                                                                          serving machines in a factory. The robots are equipped with
                                                                          various sensors and communicate over wifi. The robots di↵er
                                                                          in the types of their sensors, as they are assigned di↵erent
                                                                          kinds of tasks. Sensor signals and network messages consti-
                                                                          tute continuous streams of events. Based thereon, queries are
Proceedings of the VLDB 2019 PhD Workshop, August 26th, 2019. Los         evaluated to monitor the conduct of transport jobs. These
Angeles, California.
Copyright (C) 2019 for this paper by its authors. Copying permitted for   jobs are announced by machines and distributed among the
private and academic purposes.                                            robots that engage in a bidding process.
  In the remainder, Section 2 gives a formulation of our             Given an event-sourced network (G, f ) and a query (O, ),
research context. We review related work in Section 3, and        a placement p : O ! 2V is a function that assigns each
present our preliminary results in Section 4. In Section 5,       operator to a set of nodes. We refer to a node to which an
we conclude and outline our research agenda.                      operator o has been assigned as a host of o. Placements
                                                                                                                   S         are
                                                                  directly lifted to a query workload Q, i.e., pw : (O, )2Q O !
                                                                  2V assigns all operators of queries of Q to sets of nodes.
2.    RESEARCH CONTEXT                                               A placement dictates which nodes have to exchange events
  We first define a query model for stream processing. We         to evaluate queries: In general, a node n receives events
then formalize the problem of operator placement with multi-      from all nodes that host operators that are inputs for at
ple sinks and elaborate on various dimensions of this problem.    least one operator hosted at n. The quality of a query
                                                                  placement is assessed by the network costs it inflicts. We
2.1    Query Model                                                define the network costs as the total rate with which events
   To exemplify our approach, we employ a query language          are exchanged. Given a placement pw of a query workload
model as used in Complex Event Processing (CEP) [5].              Q, we denote its total cost by c(pw ) 2 R. Based thereon,
Let U = E1 , . . . , En denote the universe of primitive event    we capture the general problem addressed by our work as
types. Each event type E consists of a set of attributes          follows:
E = (A1 , . . . , An ), with, w.l.o.g., A1 being a timestamp. A
query defines a composite event pattern through a set of op-         Problem 1. Let Q be a query workload and (G, f ) a
erators, predicates describing correlation between events, and    event-sourced network. The problem of multi-sink opera-
a time window within which the pattern has to be detected.        tor placement is to determine a placement pw for Q such
An operator of a query is either defined by a primitive event     that c(pw ) is minimal.
type, or composed of other operators. Common composite
operators are SEQ, AND, OR and NOT [11]. The AND operator
matches, if all input event types occur; the SEQ operator         It is important to note that unlike traditional approaches we
requires all inputs in the specified order; the OR operator       consider to assign an operator to a set of nodes – leading to
matches, if at least one of the inputs occurs; and the NOT        placements with multiple sinks.
operator requires the absence of its input and is typically
defined in the context of another operator. An example            2.3 Problem Dimensions
for the latter is the query SEQ(A a, NOT(B b), C c), which          The problem of multi-sink operator placement needs to be
matches if no event of type B occurs between events of types      addressed in the light of various parameters that arise from
A and C. We write An as a shorthand for SEQ(A a1 , ..., A an ).   an application domain. Below, we provide an overview of this
   Consider the above case of autonomous transport robots. A      parameter space, along three dimensions: event generation,
monitoring query detects the following situation: An obstacle     query processing semantics, and network properties.
is reported (event type Ob) three times (a single observation
is not trustworthy due to workers passing the area). Then,         2.3.1   Event Generation
a request is issued to tow away the obstacle (Tow Req). A            Event Rates. To determine an optimal operator place-
robot capable of towing obstacles acknowledges the request        ment, knowledge about the rates of event generation is crucial.
(Tow Ack). However, within five minutes the obstacle is still     Intuitively, it is beneficial to host an operator where its in-
observed at the respective position. This situation hints at      put events are generated with high rates. Such knowledge
overloading of the robots capable of towing away obstacles        may be available at di↵erent granularities, in terms of a
and may be detected with the following query:                     local, global, or distributed rate per event type. Under a local
                                                                  rate, events of a type are generated with a di↵erent rate per
SEQ ( ( Ob o ) 3 , Tow Req tq , Tow Ack ta , Obst o ’ )
                                                                  node. An example from our case study are events of type
WHERE o . p o s i t i o n = o ’ . p o s i t i o n
AND o ’ . t i m e s t a m p ta . timestamp        5 min           BATTERY LOW generated by a robot, if it needs to charge
WITHIN 10 min                                                     its battery. The rates at which BATTERY LOW events are
                                                                  generated depends on the deployed battery and di↵ers for
                                                                  each robot. In contrast, events may be generated with a
2.2    Problem Statement                                          global rate, i.e., the same rate per node. An example are
   A query can be represented as operator tree (neglecting the    BEACON MESSAGE events emitted periodically by each robot
correlation predicates and time window). This is formalized       about its location. Under a distributed rate, only the total
as a tuple (O, ), where O is a set of operators with Op ⇢ O       rate of event generation in the network is known, but not the
being the operators defined by primitive event types, and         rates at individual nodes. Consider ACCEPT JOB MESSAGE
   : O ! 2O is a function that assigns input operators to an      events sent by the robots that win the bidding for transport
operator (i.e. its children in the tree). A workload Q is a set   jobs. While the total rate of these events is known, their
of queries.                                                       distribution among the wining robots is not.
   Queries are evaluated in an event-sourced network, which          Event Uniqueness. The uniqueness of events influences
is defined as a tuple (G, f ) where G is an undirected graph      the cost of operator placements: the local event traces of
G = (V, E) with V as a set of nodes and E as a set of             nodes may contain only distinct events or show some overlap
communication channels, and f : V ! 2U as a function that         (i.e., the same event may be generated by more than one
defines the types of events generated at a node. Here we only     node). In our case study, the latter is observed for TEMPERA-
consider the case that G is complete. Each node generates a       TURE DROP events: If temperature drops below a threshold,
local trace, an infinite sequence of events of the respective     all robots equipped with a thermometer report the situation
types that are totally ordered by their timestamps.               simultaneously. From an application point of view, there is
 N:
      AC      1                          3       CD                                                  tion, where low frequency events are pushed, while high
                                                              r = rA = rC = rD
                                                                                                     frequency events are pulled on demand.
                           2                                                                            Load Balancing. A simple strategy for load balancing
                      AC             Q1: AND(A3, C, D, A3)
                                                                                                     in DSP is to restrict the maximum number of operators that
                                                                                                     can be hosted per node. Yet, this number may be largely
             p                 unrest.   cont.                  p                unrest.    cont.
              1
                                 -
                                                                 2                                   detached from the actual computational load (CPU cycles)
      1                                      -        1   3   AND(A3, C)          2xr      2 x r/6
                  3
          AND(A , C, D, A )3
                                                              AND(A3)                                and networking load (number of received events). To achieve
 2    1   AND(A3, C)           2xr       2 x r/6      2   3   AND(A3, C)          2xr      2 x r/6   e↵ective load balancing, the types of operators have to be
          AND(A3)                                             AND(A3)                                taken into account. Stateful operators, e.g. SEQ, are particu-
                                 r           r            3         3        3
                                                                                    -        -
3     1   AND(C,D)                                            AND(A , C, D, A )                      larly demanding in terms of computational resources [19].
                                3r           8r/6                                  4r       4r/6        Constraints on Event Di↵usion. The communication
                                                                                                     within the network might be restricted, e.g. due to privacy or
Figure 2: Depending on the consumption policy dif-                                                   organizational reasons. Such restrictions can be defined on
ferent placements yield optimality.                                                                  the node level, prohibiting specific nodes to exchange events,
                                                                                                     or on the event level, prohibiting certain event types to be
no need to distinguish these reports, so that they are con-                                          shared across the network. As a consequence, some nodes
sidered as a single event that may be generated at multiple                                          might be not eligible to become host for an operator, which
nodes. Clearly, this influences the costs of a placement.                                            leads to a reduced space of possible placements.

2.3.2       Query Processing Semantics                                                               3.   RELATED WORK
   Consumption Policy. Semantics of streaming queries                                                   Operator placement has been investigated in the con-
are fine-tuned by a consumption policy that dictates how to                                          text of distributed stream processing [13–15], distributed
select events for query matching [3]. For instance, under an                                         CEP [9, 10, 18], and sensor networks [8, 16]. For non tree-
unrestricted policy, an event may be processed by multiple                                           structured queries, the problem is known to be NP-hard [7],
operators of the same query [2]. In contrast, a continuous                                           so that most existing approaches yield heuristic placement
policy allows each event to be processed by exactly one                                              algorithms. In [12] and [17], distributed stream processing
operator (i.e., the event is consumed by the operator).                                              and CEP applications were surveyed and it was concluded
   Consider Figure 2, which illustrates a network of three                                           that they di↵er in their employed system and query model,
nodes generating events of four types (a box next to a node                                          optimization goal and in whether the placement algorithm
shows the generated event types) with the same (local) rate                                          is centralized or decentralized. In all of the works mentioned
r and the query Q1 . Under an unrestricted policy, the query                                         above, the operator placement problem is formulated such
would match once a set of three A events, a C event, and a                                           that an operator is placed at exactly one node leading to
D event have been observed in the network. With a contin-                                            single-sink solutions, which are not suitable for our intent.
uous policy, a total of six A events is required for a match,                                           Distributed Stream Processing. In DSP, operators
though. The former policy leads to an event being sent from                                          are usually considered as black-boxes, so that none of the
its generating node to all nodes hosting operators for the                                           existing works considers operator semantics. In [14] and [15],
respective type. The latter policy, in turn, means that this is                                      placement algorithms are introduced to optimize bandwidth
not required. Figure 2 also illustrates how the the consump-                                         and delay. While di↵erent types of event generation can be
tion policy impacts the cost of an operator placement: It                                            employed in the proposed cost model, none of the factors
shows two placements, p1 and p2, with the query’s sink node                                          we discussed as network properties are addressed. Recently,
being placed at either node 1 or node 3, respectively. Each                                          Nardelli et al. [13] introduced several heuristics to compute
node optimizes query evaluation and sends partial matches                                            placements, yet considering solely single-sink solutions.
instead of individual events, e.g. in placement p1 , node 2                                             Distributed Complex Event Processing. In [9, 10,
sends matches for AND(A3 ,C) to node 1 instead of all events                                         18], (single-sink) placement algorithms that optimize network-
of type A and C. For an unrestricted policy, p1 yields minimal                                       ing costs are introduced. Chen et al. [9] employ a language
network costs, whereas p2 is optimal under a continuous                                              model similar to ours and support push-pull communica-
policy. Thus, the chosen policy impacts the rates with which                                         tion. However, neither load balancing strategies nor query
A events are sent and leads to di↵erent optimal placement.                                           semantics are taken into account. Cugola et al. [10] dis-
   Selectivity. The cost of operator placement is also influ-                                        cuss placement strategies for CEP queries written in TESLA.
enced by selectivity of an operator, i.e., the portion of events                                     While query semantics are incorporated, the approach ignores
processed by it that fulfill the correlations predicates (see                                        requirements in terms of load balancing. Starks et al. [18]
Section 2.1). The selectivity may be estimated based on the                                          investigate distributed CEP for mobile ad-hoc networks and
distribution of the events’ attribute values. As the selectivity                                     propose a decentralized algorithm. They argue that due to
of an operator governs the output rate of a node hosting the                                         the inherent dynamicity of such systems, placements need to
operator, it influences the costs of a placement.                                                    be recomputed frequently, which calls for optimizations of
                                                                                                     the placement algorithm itself in terms of networking costs.
2.3.3       Network Properties                                                                       Neither load balancing nor push-pull communication are
   Push-Pull Communication. Instead of requiring nodes                                               supported by the proposed placement mechanism, though.
to send events to other nodes for processing (known as push-                                            Sensor Networks. Srivastava et al. [16] introduce a
based communication), DSP may also exploit pull-based                                                placement algorithm that is optimal in bandwidth consump-
communication [4]. Then, operators pull their input events                                           tion and load balancing. However, their approach is only
from other nodes, which potentially reduces network costs.                                           applicable for operators resembling filters, which strongly
Both communication paradigms induce space for optimisa-                                              limits the query language and simplifies load balancing. In [8]
                         S
                                                                     processing. Moreover, we sketched a first algorithm to in-
                                                       O             corporate various dimensions of the problem. It yields an
                                                                     optimal placement with a sink for each query of a workload.
                                                                        As a next step, we plan to extend our approach to sup-
                                                                     port load balancing strategies, the push-pull rationale, and
             S@1       S@2       S@3
                                       O@1            O@2     O@3
                                                                     operator reuse between di↵erent queries of a workload. Fur-
                                                                     thermore, we intend to investigate how to decentralize the
                                                                     computation of a placement itself by relying on local infor-
  A1@1    A1@2     A1@3 A2@1    A2@2   A2@3    Q2 :    SEQ
                                                                     mation in the neighbourhood of a node, thereby avoiding the
                                                                     necessity of having global knowledge on the network. We
                                                AND1 AND2
                                                                     expect that decentralizing the placement algorithm favours
                                               Q3 :    OR            efficient recomputation of operator placements to react to
           A1                    A2                                  changes in the network. To cope with dynamicity and failures,
                                                       sink
                                                                     we will also explore notions of robustness of a placement.
Figure 3: Graphs for the queries Q2 (left) and
Q3 (right) for a network consisting of three nodes.                  6.    REFERENCES
                                                                      [1] proANT Transport Robots . http:
decentralized, adaptive placement are proposed for continu-               //www.insystems.de/en/produkte/proant-transport-roboter/.
ous queries, minimizing networking costs. Yet, the influence          [2] R. Adaikkalavan and S. Chakravarthy. Seamless event and data
of network properties on the placement is neglected.                      stream processing: Reconciling windows and consumption
                                                                          modes. In DASFAA, pages 341–356. Springer, 2011.
                                                                      [3] A. Adi and O. Etzion. Amit - the situation manager. VLDB J.,
                                                                          13(2):177–203, 2004.
4.   PRELIMINARY RESULTS                                              [4] M. Akdere, U. Çetintemel, and N. Tatbul. Plan-based complex
                                                                          event detection across distributed sources. VLDB, 1(1):66–77,
   As a first step towards an efficient multi-sink operator               2008.
placement strategy, we adapted the shortest tree algorithm            [5] A. Artikis, A. Margara, M. Ugarte, S. Vansummeren, and
introduced in [6]. The algorithm can be applied for a query               M. Weidlich. Complex event recognition languages: Tutorial. In
having a tree structure and produces an optimal placement                 DEBS, pages 7–10. ACM, 2017.
                                                                      [6] S. H. Bokhari. A shortest tree algorithm for optimal
for a query workload having a di↵erent sink for each query.               assignments across space and time in a distributed processor
The idea is to construct a graph such that its vertices reflect           system. IEEE TSE, (6):583–589, 1981.
all possible placements and the edge weights the respective           [7] V. Cardellini, V. Grassi, F. Lo Presti, and M. Nardelli. Optimal
costs. Using a dynamic programming approach we can ef-                    operator placement for distributed stream processing
                                                                          applications. In DEBS, pages 69–80. ACM, 2016.
ficiently compute a shortest path tree for the graph. The             [8] G. Chatzimilioudis, A. Cuzzocrea, D. Gunopulos, and
vertices of the shortest path tree yield an optimal place-                N. Mamoulis. A novel distributed framework for optimizing
ment for which costs are given by the sum of the tree’s edge              query routing trees in wireless sensor networks via optimal
                                                                          operator placement. JCSS, 79(3):349–368, 2013.
weights. Figure 3 illustrates such a graph for the query Q2
                                                                      [9] J. Chen, L. Ramaswamy, D. K. Lowenthal, and
and a network consisting of three nodes: The vertices are                 S. Kalyanaraman. Comet: Decentralized complex event
labelled with tuples of operators from O \ Op (i.e. we do                 detection in mobile delay tolerant networks. In MDM, pages
not consider primitive events as operators here) and node                 131–136. IEEE, 2012.
                                                                     [10] G. Cugola and A. Margara. Deployment strategies for
ids, e.g. the vertex S1 @1 reflects the placement of operator             distributed complex event processing. Computing,
S1 at node 1. The set of vertices describing all placement                95(2):129–156, 2013.
possibilities of one operator o is called the layer of o, e.g. the   [11] I. Flouris, N. Giatrakos, A. Deligiannakis, M. Garofalakis,
vertices {S1 @1, S1 @2, S1 @3} form the layer of the SEQ oper-            M. Kamp, and M. Mock. Issues in complex event processing:
                                                                          Status and prospects in the big data era. JSS, 127:217–236,
ator. The edges of the graph connect the nodes according to               2017.
the operator tree: If an operator o is a parent of an operator       [12] G. T. Lakshmanan, Y. Li, and R. Strom. Placement strategies
q in the query graph, then each node of the layer of o is                 for internet-scale data stream systems. IEEE Internet
connected to each node of the layer of q. The weight of an                Computing, 12(6):50–60, 2008.
                                                                     [13] M. Nardelli, V. Cardellini, V. Grassi, and F. L. PRESTI.
edge between two vertices A1 @1, S@2 reflects the event rates             Efficient operator placement for distributed data stream
that have to be exchanged in order to place SEQ at node 2                 processing applications. IEEE TPDS, 2019.
when AND1 is placed at node 1. The consumption policy,               [14] P. Pietzuch, J. Ledlie, J. Shneidman, M. Roussopoulos,
selectivity and event generation a↵ect the rates events are               M. Welsh, and M. Seltzer. Network-aware operator placement
                                                                          for stream-processing systems. In ICDE, pages 49–49. IEEE,
exchanged with and thus can be encoded in the edges weights               2006.
of the graph. Constrained event di↵usion is also supported           [15] S. Rizou, F. Durr, and K. Rothermel. Solving the
by the algorithm by adding a pruning step: if two nodes are               multi-operator placement problem in large-scale operator
                                                                          networks. In ICCCN, pages 1–6. IEEE, 2010.
not allowed to communicate the link weights between two
                                                                     [16] U. Srivastava, K. Munagala, and J. Widom. Operator
vertices referring to those nodes are set to infinite. For an             placement for in-network stream query processing. In PODS,
event type that is not to be shared across the network, a                 pages 250–258. ACM, 2005.
layer of its consuming operator contains only placements at          [17] F. Starks, V. Goebel, S. Kristiansen, and T. Plagemann.
                                                                          Mobile distributed complex event processing—ubi sumus? quo
nodes that generate the respective event type.                            vadimus? In Mobile Big Data, pages 147–180. Springer, 2018.
                                                                     [18] F. Starks and T. P. Plagemann. Operator placement for
                                                                          efficient distributed complex event processing in manets. In
5.   CONCLUSION AND NEXT STEPS                                            WiMob, pages 83–90. IEEE, 2015.
                                                                     [19] H. Zhang, Y. Diao, and N. Immerman. On complexity and
  We introduced the multi-sink operator placement problem,                optimization of expensive queries in complex event processing.
thereby paving the way for truly decentralized event stream               In SIGMOD, pages 217–228. ACM, 2014.