Online trajectory analysis with scalable event recognition

Emmanouil Ntoulias, NCSR "Demokritos", manosntoulias@iit.demokritos.gr
Elias Alevizos, National and Kapodistrian University of Athens and NCSR "Demokritos", ilalev@di.uoa.gr
Alexander Artikis, University of Piraeus and NCSR "Demokritos", a.artikis@unipi.gr
Athanasios Koumparos, Vodafone Innovus, athanasios.koumparos@vodafoneinnovus.com
ABSTRACT

Moving object monitoring is becoming essential for companies and organizations that need to manage thousands or even millions of commercial vehicles or vessels, detect dangerous situations (e.g., collisions or malfunctions) and optimize their behavior. It is a task that must be executed in real-time, reporting any such situations or opportunities as soon as they appear. Given the growing sizes of fleets worldwide, a monitoring system must be highly efficient and scalable. It is becoming an increasingly common requirement that such monitoring systems should be able to automatically detect complex situations, possibly involving multiple moving objects and requiring extensive background knowledge. Building a monitoring system that is both expressive and scalable is a significant challenge. Typically, the more expressive a system is, the less flexible it becomes in terms of its parallelization potential. We present a system that strikes a balance between expressiveness and scalability. Our proposed system employs a formalism that allows analysts to define complex patterns in a user-friendly manner while maintaining unambiguous semantics and avoiding ad hoc constructs. At the same time, depending on the problem at hand, it can employ different parallelization strategies in order to address the issue of scalability. Our experimental results show that our system can detect complex patterns over moving entities with minimal latency, even when the load on our system surpasses what is to be realistically expected in real-world scenarios.

© 2021 Copyright for this paper by its author(s). Published in the Workshop Proceedings of the EDBT/ICDT 2021 Joint Conference (March 23-26, 2021, Nicosia, Cyprus) on CEUR-WS.org. Use permitted under Creative Commons License Attribution 4.0 International (CC BY 4.0).

1 INTRODUCTION

Commercial vehicle fleets constitute a major part of Europe's economy. There were approximately 37 million commercial vehicles in the European Union in 2015¹ and this number is growing every year at an increasing rate. Devices emitting spatial and operational information are installed on commercial vehicles. This information helps fleet management applications improve the management and planning of transportation services [31].

Consider another case of monitoring of moving objects, equally important from an economic and environmental point of view: maritime monitoring systems. Such systems have been attracting considerable attention for economic as well as environmental reasons [24, 29, 30]. The Automatic Identification System (AIS)² is used to track vessels at sea in real-time through data exchange with other ships nearby, coastal stations, or even satellites. Cargo ships of at least 300 gross tonnage and all passenger ships, regardless of size, are nowadays required to have AIS equipment installed and to regularly emit AIS messages while sailing at sea. Currently, there are more than 500,000 vessels worldwide that can be tracked using AIS technology³. It is crucial, both for authorities and for maritime companies, to be able to track the behavior of ships at sea in order to avoid accidents and ensure that ships adhere to international regulations.

¹ http://www.acea.be/statistics/article/vehicles-in-use-europe-2017
² http://www.imo.org/OurWork/Safety/Navigation/Pages/AIS.aspx
³ https://www.vesselfinder.com

Streams of transient data emitted from vehicles or ships must be processed with minimal latency, if a monitoring system is to provide significant margins for action in case of critical situations. We therefore need to detect complex patterns of interest upon these streams in an online and highly efficient manner that can gracefully scale as the number of monitored entities increases. Besides kinematic data, it is also important to be able to take into account static (or "almost" static, with respect to the rate of the streaming data) background knowledge, such as weather data, point of interest (POI) information (like gas stations, ports, parking lots, police departments, etc. [21]) and NATURA areas where ships are not allowed to sail. This enhanced data stream produces valuable opportunities for the detection of complex events. One can identify certain routes a vehicle or a ship is taking, malfunctions in the installed device, the GPS tracker or the AIS transponder, cases of illegal shipping in protected areas or possible collisions between ships moving dangerously close to each other, to name but a few of the possible patterns which could be of interest to analysts.

As a solution to the problem of monitoring moving objects, we present a Complex Event Processing system that aims to improve the operating efficiency of a commercial fleet. It operates online with enriched data in a streaming environment. Our contributions are the following:

• We present a Complex Event Processing (CEP) system based on symbolic automata which allows analysts to define complex patterns in a user-friendly language. Our proposed language has formal semantics, while also being able to take into account background knowledge.
• We define a series of realistic complex patterns that identify routes and malfunctions of vehicles and detect critical situations for vessels at sea.
• We present and compare various implementations of parallel processing techniques and discuss their applicability.
• We test our approach using large, real-world, heterogeneous data streams from diverse application domains, showing that we can achieve real-time performance even in cases of significantly increased load, beyond current demand levels.

The remainder of this paper is organized as follows. Section 2 discusses related work, while Section 3 describes our CEP engine. In Section 4 the distributed version of our engine is presented. Section 5 summarizes the datasets of vehicle and vessel traces and defines the recognition patterns. It also presents our empirical evaluation. Finally, we conclude the paper in Section 6 and describe our future work.

2 RELATED WORK

Complex event recognition systems accept as input a stream of time-stamped, "simple, derived events" (SDEs). These SDEs are the result of applying a simple transformation to some other event (e.g., a measurement from a sensor). By processing them, a CEP engine can recognize complex events (CEs), i.e., collections of SDEs satisfying some pattern. Multiple CEP systems have been proposed in the literature during the last 15 years, falling under various classes [16, 19]. Automata-based systems constitute the most common category. They compile patterns (definitions of complex events) into finite state automata, which are then used to consume streams of simple events and report matches whenever an automaton reaches a final state. Examples of such systems may be found in [5, 11, 18, 28, 32]. Another important class of CEP systems are the logic-based ones. In this case, patterns are defined as rules, with a head and a body defining the conditions which, if satisfied, lead to the detection of a CE. A typical example of a logic-based system may be found in [12]. Finally, there are some tree-based systems, such as [22, 23], which are attractive because they are amenable to various optimization techniques.

For efficient processing of big data streams, distributed architectures need to be employed [19]. Big data platforms, such as Apache Spark and Storm, have been used to embed CEP engines into their operators. Both platforms have incorporated Siddhi [7, 9] and Esper [3, 4] as their embedded engines. Flink [1], on the other hand, provides support for CEP with the FlinkCEP built-in library [5]. Besides these big data platforms, numerous other parallelization techniques have been proposed in the literature that can achieve more fine-grained control over how the processing load is distributed among workers. Pattern-based parallelization is the most obvious solution, where the patterns are distributed among the processing units [15]. One disadvantage of this parallelization scheme is that events have to be replicated to multiple processing units, since a new input event may need to be consumed by more than one pattern. Moreover, the parallelization level is necessarily limited by the number of patterns (for a single pattern, this method offers no benefits). Operator-based parallelization constitutes another approach, where the CEP operators are assigned to different processing units [13, 28]. This allows for multi-pattern optimizations and avoids the data replication issue of the previous technique. On the other hand, the parallelization level is again limited, this time by the number of operators present in the pattern (which is closely related to the number of automaton states in automata-based CEP systems). Finally, in data-parallelization schemes, events are split among multiple instances of the same pattern [20]. For example, a pattern trying to detect violations of speed limits must be applied to all the monitored vehicles and thus the input stream may be partitioned according to the id of the vehicles. The advantage of this method is that it can scale well with the input event rate. It is, however, not always obvious how an input stream should be partitioned while avoiding data replication.

3 AUTOMATA-BASED EVENT RECOGNITION

We begin by presenting our framework for CEP. It is based on Wayeb, a Complex Event Processing and Forecasting engine which employs symbolic automata as its computational model [10, 11]. The rationale behind our choice of Wayeb is that, contrary to other automata-based CEP engines, it has clear, compositional semantics, due to the fact that symbolic automata have nice closure properties [17]. At the same time, it is expressive enough to support most of the common CEP operators [19], while remaining amenable to the standard parallelization solutions. In this paper, we extend Wayeb's language in order to support more expressive patterns.

Symbolic automata constitute a variation of classical automata, with the main difference being that their transitions, instead of being labeled with a symbol from an alphabet, are equipped with formulas from a Boolean algebra [17]. A symbolic automaton consumes strings and, after every new element, applies the predicates of its current state's outgoing transitions to that element. If a predicate evaluates to TRUE, then the corresponding transition is triggered and the automaton moves to that transition's target state. A Boolean algebra is defined as follows:

Definition 3.1 (Effective Boolean algebra [17]). A Boolean algebra is a tuple (D, Ψ, ⟦_⟧, ⊥, ⊤, ∨, ∧, ¬), where D is a set of domain elements; Ψ is a set of predicates closed under the Boolean connectives; ⊥, ⊤ ∈ Ψ; the component ⟦_⟧ : Ψ → 2^D is a denotation function such that ⟦⊥⟧ = ∅, ⟦⊤⟧ = D and, for all ϕ, ψ ∈ Ψ: a) ⟦ϕ ∨ ψ⟧ = ⟦ϕ⟧ ∪ ⟦ψ⟧; b) ⟦ϕ ∧ ψ⟧ = ⟦ϕ⟧ ∩ ⟦ψ⟧; and c) ⟦¬ϕ⟧ = D \ ⟦ϕ⟧.

Elements of D are called characters and finite sequences of characters are called strings. A set of strings L constructed from elements of D (L ⊆ D∗, where ∗ denotes Kleene-star) is called a language over D.

Wayeb uses symbolic regular expressions to define patterns and to represent a class of languages over D. Wayeb's standard operators are those of classical regular expressions, i.e., concatenation, disjunction and Kleene-star. We extend Wayeb to include various extra CEP operators: that of negation and those of different selection policies (see [19] for a discussion of selection policies). Symbolic regular expressions are defined as follows:

Definition 3.2 (Symbolic regular expression). A Wayeb symbolic regular expression (SRE) over a Boolean algebra (D, Ψ, ⟦_⟧, ⊥, ⊤, ∨, ∧, ¬) is recursively defined as follows:
• If ψ ∈ Ψ, then R := ψ is a symbolic regular expression, with L(ψ) = ⟦ψ⟧, i.e., the language of ψ is the subset of D for which ψ evaluates to TRUE;
• Disjunction / Union: If R1 and R2 are symbolic regular expressions, then R := R1 + R2 is also a symbolic regular expression, with L(R) = L(R1) ∪ L(R2);
• Concatenation / Sequence: If R1 and R2 are symbolic regular expressions, then R := R1 · R2 is also a symbolic regular expression, with L(R) = L(R1) · L(R2), where · denotes concatenation; L(R) is then the set of all strings constructed by concatenating each element of L(R1) with each element of L(R2);
• Iteration / Kleene-star: If R is a symbolic regular expression, then R′ := R∗ is a symbolic regular expression, with L(R∗) = (L(R))∗, where L∗ = ⋃_{i≥0} L^i and L^i is the concatenation of L with itself i times;
• Bounded iteration: If R is a symbolic regular expression, then R′ := R^{x+} is a symbolic regular expression, with R^{x+} = R · ... · R (x times) · R∗;
• Negation / Complement: If R is a symbolic regular expression, then R′ := !R is a symbolic regular expression, with L(R′) = (L(R))^c;
• Skip-till-any-match selection policy: If R1, R2, ..., Rn are symbolic regular expressions, then R′ := #(R1, R2, ..., Rn) is a symbolic regular expression, with R′ := R1 · ⊤∗ · R2 · ⊤∗ · ... · ⊤∗ · Rn;
• Skip-till-next-match selection policy: If R1, R2, ..., Rn are symbolic regular expressions, then R′ := @(R1, R2, ..., Rn) is a symbolic regular expression, with R′ := R1 · !(⊤∗ · R2 · ⊤∗) · R2 · ... · !(⊤∗ · Rn · ⊤∗) · Rn.

A Wayeb expression without a selection policy implicitly follows the strict-contiguity policy, i.e., the SDEs involved in a match of a pattern should occur contiguously in the input stream. The other two selection policies relax the strict requirement of contiguity (see [19] for details). Note that all these operators, even those of the selection policies, may be arbitrarily used and nested in an expression, without any limitations. This is in contrast to other CEP systems, where nested operations may be prohibited.
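To make the reduction of the selection policies to the core operators concrete, the following sketch uses a toy Scala AST for SREs (Scala being the language Wayeb is implemented in, although this is not Wayeb's internal representation) and rewrites #(R1, ..., Rn) and @(R1, ..., Rn) exactly as in the two equations above.

```scala
// Toy AST for symbolic regular expressions; names are illustrative only.
sealed trait SRE
case class Pred(name: String)     extends SRE   // ψ ∈ Ψ
case class Concat(l: SRE, r: SRE) extends SRE   // R1 · R2
case class Union(l: SRE, r: SRE)  extends SRE   // R1 + R2
case class Star(r: SRE)           extends SRE   // R∗
case class Neg(r: SRE)            extends SRE   // !R
case object Top                   extends SRE   // ⊤

object Policies {
  // #(R1,...,Rn) = R1 · ⊤∗ · R2 · ⊤∗ · ... · ⊤∗ · Rn  (rs must be non-empty)
  def skipTillAnyMatch(rs: List[SRE]): SRE =
    rs.reduceLeft((acc, r) => Concat(Concat(acc, Star(Top)), r))

  // @(R1,...,Rn) = R1 · !(⊤∗ · R2 · ⊤∗) · R2 · ... · !(⊤∗ · Rn · ⊤∗) · Rn
  def skipTillNextMatch(rs: List[SRE]): SRE =
    rs.reduceLeft((acc, r) =>
      Concat(Concat(acc, Neg(Concat(Concat(Star(Top), r), Star(Top)))), r))

  // Example: #(ψ1, ψ2) becomes Concat(Concat(ψ1, ⊤∗), ψ2).
  val example: SRE = skipTillAnyMatch(List(Pred("speed > 100"), Pred("speed > 100")))
}
```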
Wayeb patterns are defined as symbolic regular expressions which are subsequently compiled into symbolic automata. The definition of a symbolic automaton is the following:

Definition 3.3 (Symbolic finite automaton [17]). A symbolic finite automaton (SFA) is a tuple M = (A, Q, q^s, Q^f, ∆), where A is an effective Boolean algebra; Q is a finite set of states; q^s ∈ Q is the initial state; Q^f ⊆ Q is the set of final states; ∆ ⊆ Q × Ψ_A × Q is a finite set of transitions.

A string w = a1 a2 ... ak is accepted by an SFA M iff, for 1 ≤ i ≤ k, there exist transitions qi−1 → qi triggered by ai, such that q0 = q^s and qk ∈ Q^f. The set of strings accepted by M is the language of M, denoted by L(M). It can be proven that every symbolic regular expression can be translated to an equivalent (i.e., with the same language) symbolic automaton [17].

We are now in a position to precisely define the meaning of "complex events". Input events come in the form of tuples with both numerical and categorical values. These tuples constitute the set of domain elements D. A stream S is an infinite sequence S = t1, t2, ..., where each ti is a tuple (ti ∈ D). Our goal is to report the indices i at which a CE is detected. If S1..k = ..., tk−1, tk is the prefix of S up to the index k, we say that an instance of an SRE R is detected at k iff there exists a suffix Sm..k of S1..k such that Sm..k ∈ L(R). If we attempted to detect CEs, as defined above, by directly compiling an expression R to an automaton, we would fail. Consider, for example, the (classical) regular expression R := a · b and the (classical) stream/string S = a, b, c, a, b, c. If we compile R to a (classical) automaton and feed S to it, then the automaton would reach its final state after reading the second element t2 of the string. However, it would then never reach its final state again. We would like our automaton to reach its final state every time it encounters a, b as a suffix, e.g., again after reading t5 of S. We can achieve this with a simple trick. Instead of using R, we first convert it to Rs = ⊤∗ · R. Using Rs we can detect CEs of R while consuming a stream S, since a stream segment Sm..k is recognized by R iff the prefix S1..k is recognized by Rs. The prefix ⊤∗ lets us skip any number of events from the stream and start recognition at any index m, 1 ≤ m ≤ k.

Table 1: An example stream composed of six events. Each event has a vehicle identifier, a value for that vehicle's speed and a timestamp.

vehicle id | 78986 | 78986 | 78986 | 78986 | 78986 | ...
speed      |    85 |    93 |    99 |   104 |   111 | ...
timestamp  |     1 |     2 |     3 |     4 |     5 | ...

[Figure 1: Streaming symbolic automaton created from the expression R := (speed > 100) · (speed > 100); states 0 (start), 1 and 2 (final), with both transitions labeled speed > 100.]

[Figure 2: Parallel schemes used with Wayeb. (a) Pattern-based parallelization. (b) Partition-based parallelization. (c) Special case of partition-based parallelization when a one-to-one relation between sources and CEP engines exists.]

As an example, consider the domain of vehicle monitoring. An analyst could use the Wayeb language to define the pattern R := (speed > 100) · (speed > 100) in order to detect speed violations on roads where the maximum allowed speed is 100 km/h. This pattern detects two consecutive events where the speed exceeds the threshold, in order to avoid cases where a vehicle momentarily exceeds the threshold, possibly due to some measurement error. This pattern would be compiled to the (non-deterministic) automaton of Figure 1. Table 1 shows an example stream processed by this automaton. For the first three input events, the automaton would remain in its start state, state 0. After the fourth event, it would move to state 1 and, after the fifth event, it would reach its final state, state 2. We would thus say that a complex event R was detected at timestamp = 5.
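The following self-contained Scala sketch illustrates this execution model; it is a simplification and not Wayeb's actual API. It hand-codes the automaton of Figure 1 for R := (speed > 100) · (speed > 100), keeps the start state permanently active to emulate the ⊤∗ prefix of Rs, and reports the detection at timestamp 5 for the stream of Table 1.

```scala
// Minimal streaming symbolic automaton: transitions carry predicates over input tuples.
object SfaSketch extends App {
  case class Event(vehicleId: Long, speed: Double, timestamp: Long)
  type Predicate = Event => Boolean

  case class SFA(start: Int, finals: Set[Int], delta: Map[Int, List[(Predicate, Int)]]) {
    // Advance all active runs on a new event; the start state stays active (⊤∗ prefix).
    def step(active: Set[Int], e: Event): Set[Int] =
      (active + start).flatMap { q =>
        delta.getOrElse(q, Nil).collect { case (phi, q2) if phi(e) => q2 }
      } + start

    // Report the timestamps at which some run reaches a final state.
    def detect(stream: Seq[Event]): Seq[Long] =
      stream.scanLeft(Set(start))(step).tail
        .zip(stream.map(_.timestamp))
        .collect { case (active, ts) if (active intersect finals).nonEmpty => ts }
  }

  val over100: Predicate = _.speed > 100
  // States 0 (start) -> 1 -> 2 (final); both transitions are labeled speed > 100.
  val sfa = SFA(0, Set(2), Map(0 -> List((over100, 1)), 1 -> List((over100, 2))))

  // The stream of Table 1.
  val stream = Seq(
    Event(78986, 85, 1), Event(78986, 93, 2), Event(78986, 99, 3),
    Event(78986, 104, 4), Event(78986, 111, 5))

  println(sfa.detect(stream))   // List(5): the complex event is detected at timestamp 5
}
```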
                                                                                   4   SCALABLE EVENT RECOGNITION OVER
using R, we first convert it to Rs = ⊤∗ · R. Using Rs we can detect                    MULTIPLE TRAJECTORIES
CEs of R while consuming a stream S, since a stream segment                        We now discuss how various parallelization schemes may be ap-
Sm..k is recognized by R iff the prefix S 1..k is recognized by Rs .               plied to our CEP engine. For this purpose, we leverage a popular
For this purpose, we leverage a popular streaming platform, Apache Flink [1, 14]. Flink is a distributed processing engine for stateful computations over unbounded and bounded data streams. It is designed to run in cluster environments and perform computations at in-memory speed. In this paper, we focus on two parallelization techniques: pattern-based and partition-based parallelization [19]. We currently exclude state-based parallelization, since, as explained above, its parallelization level is limited by the number of automaton states, which is typically quite low (often a single-digit number). We do intend, however, to examine in the future how it could be combined with pattern- or partition-based parallelization to provide them with an extra performance boost.

In pattern-based parallelization, each available CEP engine receives a unique subset of the patterns and performs recognition for these patterns only, with these subsets being (almost) equal in size (see Figure 2a). On the other hand, the stream is broadcast to all parallel instances of any downstream operators. This is a significant (yet unavoidable) drawback of pattern-based parallelization, since each worker has a subset of the patterns, while each pattern may need to process the whole stream. Note that each blue rectangle in Figure 2a represents a single thread. This means that in this architecture we have one thread for the source, plus as many threads as the parallelism of the CEP operator.

In partition-based parallelization, the opposite happens. Every CEP engine is initialized with all patterns, but the stream is not broadcast (see Figure 2b). A partitioning function is used to decide where each new input event should be forwarded. This function takes as input an attribute of the event (we use the id of a vehicle or vessel) and, by hashing it, determines which parallel instance of the next operator the event will go to. As with pattern-based parallelization, we have one thread for the source, plus as many threads as the parallelism of the CEP operator.
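As an illustration of the two schemes of Figures 2a and 2b, the sketch below uses Flink's Scala DataStream API to broadcast a stream to all downstream instances and, alternatively, to hash-partition it by vehicle id. The event schema is assumed for the example and the map operators merely stand in for recognition engines; this is not the system's actual code.

```scala
import org.apache.flink.streaming.api.scala._

// Illustrative event schema; not the actual schema of the fleet stream.
case class VehicleEvent(vehicleId: Long, speed: Double, timestamp: Long)

object ParallelizationSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(4)   // four parallel instances of each downstream operator

    val events: DataStream[VehicleEvent] = env.fromElements(
      VehicleEvent(78986, 104, 4), VehicleEvent(78986, 111, 5), VehicleEvent(12345, 40, 4))

    // Pattern-based scheme (Figure 2a): every downstream instance sees the whole
    // stream; each instance would run its own subset of the patterns.
    events.broadcast
      .map(e => s"pattern worker saw $e")
      .print()

    // Partition-based scheme (Figure 2b): events are hash-partitioned by vehicle id,
    // so each instance runs all patterns over a disjoint subset of the vehicles.
    events.keyBy(_.vehicleId)
      .map(e => s"partition worker saw $e")
      .print()

    env.execute("parallelization sketch")
  }
}
```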
Besides Flink, we also use the Apache Kafka messaging platform to connect our stream sources to Wayeb instances [2]. Kafka provides various ways to consume streams. So far, we have focused on linear streams, i.e., events are assumed to be totally ordered and to arrive at our system sequentially, one after another. With Kafka, however, there is the option of using parallel input streams. A Kafka input topic can have multiple partitions and each partition can be consumed in parallel by a different consumer. In this case, the input stream is already partitioned on some attribute of the events.

Through this Kafka functionality, a variant of partition-based parallelization becomes possible, where both the input source and the recognition engine work in parallel (see Figure 2c). If the parallelism of the input source (i.e., the number of partitions of the topic) is the same as that of the recognition operator (i.e., the number of CEP engines), then we can simply attach each source instance to a CEP engine instance without further re-partitioning on our end. When, however, the parallelisms are different, further re-partitioning is performed by partitioning each source in the same way we partitioned the single-threaded source. Unlike the previous architectures, each pair of source and CEP operator parallel instances belongs to a single thread. This is attributed to operator chaining [8], a Flink mechanism that chains operators of the same parallelism in a single thread for better performance.
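A sketch of the variant of Figure 2c follows, assuming a Kafka topic named vehicle-events with four partitions; the topic name, configuration and string schema are illustrative, and the map again stands in for a Wayeb instance. Because the source and the recognition operator have equal parallelism and a forward connection, Flink chains each pair into a single thread.

```scala
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

object ChainedSourcesSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(4)   // matches the (assumed) number of topic partitions

    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("group.id", "wayeb-sketch")

    // One Kafka partition feeds each parallel source instance.
    val source = new FlinkKafkaConsumer[String]("vehicle-events", new SimpleStringSchema(), props)

    env.addSource(source)
      .map(line => s"recognition over: $line")   // placeholder for a Wayeb engine instance
      .print()

    env.execute("chained sources sketch")
  }
}
```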
Similarly to partition-based parallelization, we can have multiple sources for pattern-based parallelism as well. Messages in Kafka are still partitioned by a desired attribute, albeit we always have to perform the broadcasting step for each source. Hence, we do not obtain a new variant of pattern-based parallelization; rather, we use parallel sources only as a way to process streams of greater input rate.

5 EXPERIMENTAL EVALUATION

We present an extensive experimental evaluation of our parallel CEP engine on two datasets containing real-world trajectories of moving objects. The first dataset comes from the domain of fleet management, for vehicles moving on roads and emitting information about their status. The second dataset consists of vessel trajectories from ships sailing at sea. In both cases, our goal is to simultaneously monitor thousands of moving objects and detect interesting (or even critical) behavioral patterns in real-time, as defined by domain experts. All experiments were conducted on a server with 24 processors. Each processor is an Intel(R) Xeon(R) CPU E5-2630 v2 @ 2.60GHz. The server has 252 GB of RAM. The source code for Wayeb may be found in the following repository: https://github.com/ElAlev/Wayeb.

5.1 Fleet Management

Efficient fleet management is essential for transportation and logistics companies. We show how our proposed solution can effectively help in this task. With the help of experts, we define a set of patterns to be detected on real-time streams of trajectories and show that our engine can detect these patterns with an efficiency that is orders of magnitude better than real-time.

5.1.1 Dataset Description. The dataset is provided by Vodafone Innovus⁴, our partner in the Track & Know project, which offers fleet management services. It contains approximately 270M records (243GB). It covers a period of 5 months, from June 30, 2018 11:00:00 PM to November 30, 2018 11:59:59 PM. The initial source emitting the data is composed of GPS (Global Positioning System) traces of moving vehicles. The data also includes speed information provided by an installed accelerometer and information regarding the level of fuel in a vehicle's tank, measured by a fuel sensor. It is also enriched with weather and point-of-interest (POI) information (e.g., whether a vehicle is close to a gas station, a university, a school, etc.), as described in [31]. Duration, acceleration and distance are some extra attributes that are calculated on the fly, as events enter our system, by storing information from previous events.

⁴ https://www.vodafoneinnovus.com/

5.1.2 Pattern Definitions. The first pattern we have defined concerns vehicle routes. A route is the basic element of vehicle management and aggregates data between the start and the end point of a vehicle's motion cycle. A motion cycle is based on the engine status. Each vehicle route must start and end with an "engine-off" message, i.e., a message whose engine status attribute is "off". According to Vodafone Innovus, there are 12 patterns that describe the most frequent routes. These 12 route patterns can be expressed with a single Wayeb pattern as follows:

Definition 5.1. A route pattern for a vehicle is defined as the following sequence: emitting "engine-off" messages for at least 30 minutes, emitting at least one "moving" message and again emitting "engine-off" messages for at least 30 minutes:

Route := (Engine = Off ∧ Duration > 30) · (Engine = Moving)+ · (Engine = Off ∧ Duration > 30)

Unfortunately, the expected data flow can be corrupted for a variety of reasons.
These include a bad connection during the device installation or after the vehicle has been serviced, movement of the satellites, hardware malfunctions or, simply, an issue with the GPS. The result of these issues is reflected in the data. For example, coordinates may change even though the vehicle is not moving, or the vehicle may be moving while the coordinates remain the same. It is also often the case that the engine status is incorrect (e.g., "parked" messages are emitted even though the engine is on, or the vehicle is moving yet the engine status is not "moving"). These issues are important and need to be detected. We have summarized them in a number of patterns, defined as follows:

Definition 5.2. ParkedMovingSwing. Engine status swings between "parked" and "moving" during consecutive events.

ParkedMovingSwing := (Engine = Parked) · (Engine = Moving) · (Engine = Parked) · (Engine = Moving)

Definition 5.3. IdleParkedSwing. Engine status swings between "idle" and "parked" during consecutive events.

IdleParkedSwing := (Engine = Idle) · (Engine = Parked) · (Engine = Idle) · (Engine = Parked)

Definition 5.4. SpeedSwing. Speed swings between 0 km/h and greater than 50 km/h during consecutive events.

SpeedSwing := (Speed > 50) · (Speed = 0) · (Speed > 50) · (Speed = 0)

Definition 5.5. MovingWithZeroSpeed. Engine status is "moving" and distance traveled is greater than 30m, yet speed is 0 km/h, for more than 3 consecutive messages.

MWZS := (Engine = Moving ∧ Speed = 0 ∧ Distance > 30)^{3+}

Definition 5.6. MovingWithBadSignal. The vehicle is accelerating and the distance traveled is greater than 30m, yet there are no satellites tracking the vehicle, for more than 3 consecutive messages.

MWBS := (Acceleration ∧ Satellites = 0 ∧ Distance > 30)^{3+}

In addition to the above issues, possibly related to malfunctions, experts are also interested in the following patterns:

Definition 5.7. Possible Theft. Engine status is "parked", speed is 0 km/h and distance traveled is greater than 30m, for more than 3 consecutive messages.

PossibleTheft := (Engine = Parked ∧ Speed = 0 ∧ Distance > 30)^{3+}

Definition 5.8. Dangerous Driving. There is ice on the road and the vehicle is moving above a specific speed limit for at least 2 consecutive messages.

DangerousDriving := (IceExists = TRUE ∧ Speed > vlimit)^{2+}

Definition 5.9. Refuel Opportunity. The vehicle is close to a gas station and the fuel in the tank is less than 50% for at least 2 consecutive messages.

RefuelOpp := (CloseToGasStation = TRUE ∧ FuelLevel < 0.5)^{2+}
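To connect the pattern notation with the automaton view of Section 3, the sketch below hand-compiles Definition 5.7 into a four-state automaton, expanding (·)^{3+} as R · R · R · R∗. The field names are assumptions for the example, not the actual schema of the fleet stream.

```scala
// Hypothetical hand-compilation of PossibleTheft := (Engine = Parked ∧ Speed = 0 ∧ Distance > 30)^{3+}.
object PossibleTheftSketch extends App {
  case class FleetEvent(engine: String, speed: Double, distance: Double)

  val p: FleetEvent => Boolean =
    e => e.engine == "Parked" && e.speed == 0 && e.distance > 30

  // States 0 -> 1 -> 2 -> 3 on p; state 3 is final and loops on p.
  def detected(events: Seq[FleetEvent]): Boolean =
    events.foldLeft(Set(0)) { (active, e) =>
      (active + 0).flatMap {
        case 0 if p(e) => Set(0, 1)        // state 0 stays active (⊤∗ prefix)
        case 0         => Set(0)
        case 1 if p(e) => Set(2)
        case 2 if p(e) => Set(3)
        case 3 if p(e) => Set(3)
        case _         => Set.empty[Int]
      }
    }.contains(3)

  val suspicious = Seq.fill(4)(FleetEvent("Parked", 0.0, 45.0))
  println(detected(suspicious))   // true: at least three consecutive matching messages were seen
}
```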
[Figure 3: Recognition times of different parallelization techniques for different workloads; the horizontal axis represents the number of worker threads. (a) 16 patterns. (b) 48 patterns.]

5.1.3 Recognition Results. Figure 3a showcases recognition times for our various parallelization techniques: pattern-based, partition-based and partition-based with one source per engine. We also show results for the non-parallel version. We have duplicated some of the patterns defined previously to simulate a greater workload of 16 patterns. We have repeated the experiment for 1, 2, 4, 8 and 16 cores. Compared to the original single-core version, all three parallelization techniques exhibit speed-ups. For partition- and pattern-based parallelization, however, there seems to be an upper limit on the number of cores it is most efficient to use. For pattern-based parallelization there is a significant rise in time after 4 cores, while for partition-based parallelization there is no improvement after 2 cores. The reason is that the single source acts as a bottleneck. For partition-based parallelization we have one thread (the partitioner) deciding to which core each event will be forwarded, while for pattern-based parallelization, events are broadcast to all cores, again in a single-threaded manner. This explanation is supported by the smooth decrease in time when a parallel source is used for partition-based parallelization. While it starts off worse than pattern- and partition-based parallelization, it exhibits the best results for 16 cores.

To further support the claim above, we conducted a second experiment with a larger load, as shown in Figure 3b. 48 patterns were used this time (replicated in a similar fashion as before), without any other changes. Indeed, with every recognition node having more work to do, execution time becomes less dependent on partitioning/broadcasting and more dependent on the actual recognition. Hence, speed-ups are now visible even for higher numbers of cores. As expected, for parallel sources the results are not affected. Partition-based parallelization with parallel sources is slower when few threads are used (e.g., for 2 threads).
This is because the other two techniques have an extra thread handling the whole stream source, so the work is in fact split between this one source thread and two others performing recognition. We thus have 3 threads performing similar volumes of work. When parallel sources are used, however, each parallel source is chained to a Wayeb engine in a single thread and we thus have 2 threads doing more work: each thread handles half the source and performs recognition on half the stream.

In order to evaluate recognition speed independently from source speed, we had to turn operator chaining off, as we cannot measure them separately when they belong to the same thread (i.e., in the case of one source per CEP engine). In addition, we leveraged parallel sources to achieve input streams of higher input rate. The goal here is to determine whether our system can process input events faster than the source produces them. Flink offers a metric for this purpose, called backpressure [6]. Backpressure is judged on the availability of output buffers. Assuming that some task A sends events to some task B, if there is no output buffer available for task A, we say that task B is backpressuring task A. In our case, A is the source operator and B is the operator with the CEP engine. 100 samples (each sample checks if there is any output buffer available) are triggered every 50ms in order to measure backpressure. The resulting ratio tells us how many of these samples indicated backpressure, e.g., 0.6 indicates that 60 out of 100 were stuck requesting buffers from the network stack. According to the documentation [6], a ratio between 0 and 0.1 is normal, 0.1 to 0.5 is considered low and anything above 0.5 is high. Note that low and high pressure will slow down the source to match the throughput of the pressuring operator.
[Figure 4: Backpressure experiments for fleet management; backpressure is plotted against CER parallelism. The horizontal axis expresses the number of recognition threads. Workload is 6 patterns. Each dashed line represents a different parallelism of the source stream (par = 1 to par = 6). (a) Partition-based. (b) Pattern-based.]

Results for partition-based distribution are presented in Figure 4a. The backpressure ratio is plotted against the number of threads used for recognition. Generally, the more threads used, the faster Wayeb can process events and hence less pressure is noted. Each dashed line represents a source with parallelism varying from 1 to 6. The event rate of a parallel source is measured by executing an experiment with 0% backpressure (i.e., the source will not slow down due to pressure) and summing the event rates of the parallel instances reported by the Flink dashboard (e.g., for 2 parallel instances of 120K events/second (e/s) each, the overall sum is 120 + 120 = 240K e/s). Although the greater the parallelism of the source, the larger the event rate, it is not a multiplier: for 1, 2, 3, 4, 5 and 6 sources the rate becomes 130K, 240K, 350K, 430K, 440K and 490K e/s, respectively. The workload in this experiment tries to emulate a real scenario and hence the route and the 5 malfunction patterns are used (i.e., they are not replicated). The results show that Wayeb can effectively process streams of at least 490K e/s (black line) for this workload, as zero pressure is exhibited when 16 workers are used. As stated before, the duration of the dataset is 5 months, which translates to roughly 13M seconds. Since the total number of input events is 270M, the average event input rate is 270M/13M ≈ 20 e/s. Comparing the pattern throughput rate (490K e/s) with the event input rate clearly exhibits a performance that is 4 orders of magnitude better than the real-time requirements of this use case.

We perform a similar experiment for pattern-based parallelization. This time the number of threads used for recognition varies between 1, 2, 3 and 6, as there are only 6 patterns (a handicap of this technique discussed earlier). Figure 4b showcases the results of our experiment. Unfortunately, even with 6 recognition threads and a single-threaded source (purple dashed line), there is about 13% backpressure. Due to this, all sources are slowed down to 110K e/s regardless of their parallelism. There is a drop in pressure as more recognition threads are used, due to the better distribution of the patterns. However, it is not significant, as the events are also multiplied as many times as the number of these threads, adding extra pressure. Eventually more space is requested from the output network buffers of the source.

5.2 Maritime Monitoring

We now present experimental results on another real-world dataset. This dataset contains trajectories of vessels sailing at sea. We have defined a set of patterns that are similar to the ones presented in [24, 26], which have been constructed with the help of domain experts. We demonstrate the effectiveness of our system, which is capable of efficiently processing a dataset that contains trajectories from ≈5K vessels and covers a period of 6 months in less than one hour.

5.2.1 Dataset Description. A public dataset of 18M position signals from 5K vessels sailing in the Atlantic Ocean around the port of Brest, France, between October 1st 2015 and March 31st 2016, has been utilized [27]. A derivative dataset has been released in [25], containing a compressed version of the original dataset (4.5M signals), as described in [24]. Each trajectory in this dataset contains only the so-called critical points of the original trajectory, i.e., points that indicate a significant change in a vessel's behavior (e.g., a change in speed or heading) and from which the original trajectory can be faithfully reconstructed. We processed these compressed trajectories in order to determine the proximity of vessels to various areas and locations of interest, such as ports, fishing areas, protected NATURA areas, the coastline, etc.
                                                                                                         1
   5.2.2 Pattern Definitions. We now present a detailed descrip-                                                                                                     par = 1
                                                                                                                                                                     par = 2
tion of the maritime patterns that we implemented, assuming
that the input events contain the information described above.

   Definition 5.10. High Speed Near Coast: Vessel is within 300
meters from the coast and is sailing with speed greater than 5
knots for at least one message.

        HSNC := (IsNear(Coast) = TRUE ∧ Speed > 5)^{+}

   Definition 5.11. Anchored: Vessel is inside an anchorage area
or near a port and is sailing with speed less than 0.5 knots for at
least three messages.

        Anchored := ((IsNear(Port) = TRUE ∨
                      WithinArea(Anchorage) = TRUE) ∧
                      Speed < 0.5)^{3+}

   Definition 5.12. Drifting: There is a difference between the heading
and the actual course over ground greater than 30 degrees, while the
vessel is sailing at a speed of at least 0.5 knots, for at least three
messages.

        Drifting := (|Heading − Cog| > 30 ∧ Speed > 0.5)^{3+}

   Definition 5.13. Trawling: A vessel is inside a fishing area,
sailing with speed between 1 and 9 knots, for at least three
messages. In addition, it must be a fishing vessel.

        Trawling := (VesselType = Fishing ∧
                     WithinArea(Fishing) = TRUE ∧
                     Speed > 1.0 ∧ Speed < 9.0)^{3+}

   Definition 5.14. Search and Rescue: A SAR vessel sails with a
speed greater than 2.7 knots and constantly changes its heading
for at least three messages.

        SAR := (ChangeInHeading = TRUE ∧
                VesselType = SAR ∧ Speed > 2.7)^{3+}

   Definition 5.15. Loitering: Vessel is neither near a port nor near
the coastline, while it sails with speed below 0.5 knots, for at least
three messages.

        Loitering := (IsNear(Port) = FALSE ∧
                      IsNear(Coast) = FALSE ∧
                      Speed < 0.5)^{3+}
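   To give a flavour of how such iteration patterns look in an off-the-shelf
CEP library, the following is a minimal FlinkCEP [5] sketch of the Anchored
pattern (Definition 5.11). It is meant purely as an illustration and is not
our own implementation; the AisEvent class and its fields are hypothetical
placeholders for the enriched input events described above.

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;

public class AnchoredPatternSketch {

    // Hypothetical enriched AIS event, assumed to carry the spatial predicates
    // (IsNear, WithinArea) already evaluated for each message.
    public static class AisEvent {
        public String mmsi;         // vessel identifier
        public boolean nearPort;    // IsNear(Port)
        public boolean inAnchorage; // WithinArea(Anchorage)
        public double speedKnots;   // Speed
    }

    // Anchored := ((IsNear(Port) = TRUE ∨ WithinArea(Anchorage) = TRUE) ∧ Speed < 0.5)^{3+}
    public static PatternStream<AisEvent> anchored(DataStream<AisEvent> input) {
        Pattern<AisEvent, ?> pattern = Pattern.<AisEvent>begin("anchored")
                .where(new SimpleCondition<AisEvent>() {
                    @Override
                    public boolean filter(AisEvent e) {
                        return (e.nearPort || e.inAnchorage) && e.speedKnots < 0.5;
                    }
                })
                .timesOrMore(3)  // at least three matching messages
                .consecutive();  // with no non-matching messages in between
        return CEP.pattern(input, pattern);
    }
}

In our experiments the patterns are evaluated by our own automata-based
engine running on Flink rather than by FlinkCEP; the sketch only illustrates
how the ^{3+} iteration translates into a quantified condition.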
Figure 5: Backpressure experiments for the Maritime dataset. The
horizontal axis expresses the number of recognition threads. (a)
Partition-based backpressure for a workload of 6 patterns; each
dashed line represents a different parallelism of the source stream;
the event rate is capped at 180K e/s due to backpressure. (b)
Pattern-based backpressure for a workload of 6 patterns; each
dashed line represents a different parallelism of the source stream;
the event rate is capped at 90K e/s due to backpressure. (c)
Partition- and pattern-based parallelism for the dataset being
replayed in 1 and 0.5 hours respectively; the workload is 220
patterns of vessels approaching 220 different ports.

   5.2.3 Recognition Results. We used the 6 patterns defined
above as the workload for a number of experiments. Following
a similar approach to our fleet management experiments, we
avoided replicating the patterns, so as to emulate a real scenario,
and evaluated them for streams of different event rates.
   Figure 5a presents results for the partition-based parallelization
scheme. This time the backpressure always remains above 40%
for a 6-threaded input source, even with 16 recognition threads.
Since the CEP operator cannot keep up with the initial rate of
the source, the source has to adjust its rate to match the through-
put of the CER operator. According to the Flink dashboard, this
rate is at most 180K e/s (with 16 worker threads). This lower
throughput of our CEP operator (compared with the fleet man-
agement use case) can be attributed to the fact that the patterns
are now more complex, as on average they contain more unions,
disjunctions and iterations to evaluate for every event. The dura-
tion of the dataset is 6 months, which translates to roughly 15.5M
seconds. Since the total number of input events is about 4.5M,
the average event input rate is 4.5M/15.5M ≈ 0.3 e/s, i.e., roughly
one event every 3.5 seconds. Comparing the pattern throughput
rate with this event input rate again showcases a performance
that is at least four orders of magnitude better than the real-time
requirements of this use case. The results for pattern-based paral-
lelism are presented in Figure 5b and follow a similar trend. The
event rate here is capped at 90K e/s due to backpressure.
   We conducted a second series of experiments with a setting
where the number of patterns is naturally high, in order to deter-
mine whether pattern-based parallelism offers an advantage in
such settings. Consider the following pattern, describing the
movement of a vessel as it approaches a port.
   Definition 5.16. Approaching Port: Vessel is initially more than
7 km (and less than 10 km) away from the port, then, for at least
one message, its distance from the port is between 5 and 7 km,
and finally it enters the port (i.e., its distance from the port falls
below 5 km).

        Port := (DistanceToPort(PortX) > 7.0 ∧
                 DistanceToPort(PortX) < 10.0) ·
                (DistanceToPort(PortX) > 5.0 ∧
                 DistanceToPort(PortX) < 7.0)^{+} ·
                (DistanceToPort(PortX) < 5.0)

   The predicate DistanceToPort calculates the distance of a vessel
from the port PortX passed as argument and is evaluated online.
If we want to monitor vessel activity around every port in a given
area with N distinct ports, then we need to replicate this pattern
N times. We would thus naturally have N patterns, which would
be almost identical except for the argument passed to
DistanceToPort (Port1, Port2, up to PortN), as in the sketch below.
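   The following is a hedged sketch of how such replication could be carried
out programmatically; the Port and PortPattern types and the
buildApproachingPort helper are illustrative placeholders and not part of
our implementation.

import java.util.ArrayList;
import java.util.List;

public class PortPatternReplication {

    // Illustrative placeholder for a port taken from the monitored area.
    public record Port(String name, double lat, double lon) {}

    // Illustrative placeholder for an instantiated pattern definition.
    public record PortPattern(String name, String expression) {}

    // Builds one Approaching Port pattern for a given port, mirroring Definition 5.16:
    // (7 < d < 10) · (5 < d < 7)^{+} · (d < 5), where d = DistanceToPort(port).
    static PortPattern buildApproachingPort(Port port) {
        String d = "DistanceToPort(" + port.name() + ")";
        String expr = "(" + d + " > 7.0 ∧ " + d + " < 10.0) · "
                    + "(" + d + " > 5.0 ∧ " + d + " < 7.0)^{+} · "
                    + "(" + d + " < 5.0)";
        return new PortPattern("ApproachingPort_" + port.name(), expr);
    }

    // Instantiates the pattern once per port; with pattern-based parallelism,
    // each resulting pattern can be assigned to a different recognition thread.
    static List<PortPattern> replicate(List<Port> ports) {
        List<PortPattern> patterns = new ArrayList<>();
        for (Port p : ports) {
            patterns.add(buildApproachingPort(p));
        }
        return patterns;
    }
}

With pattern-based parallelism, the resulting list of patterns can simply be
distributed across the available recognition threads.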
For the area of Brest, the total number of ports is 220. We ran
an experiment with these 220 patterns, first with partition- and
then with pattern-based parallelization. Figure 5c shows the re-
sults. Contrary to previous experiments, in this one we used a
stream simulator to feed the dataset to our CEP system. This
simulator, instead of reading input events from a file and in-
stantly sending them to our engine, has the ability to insert a
delay between consecutive events. For example, we can set the
delay to be exactly the time difference between two events. This
would allow us to re-play the stream as it was actually produced,
which would take 6 months for this dataset. We can also re-play
the stream at higher speeds (a rough sketch of such a simulator
is given at the end of this subsection). For these experiments, we
re-played the stream at various speeds in order to determine the
“breaking point” of our system. Figure 5c shows the results for
two such speeds, where the whole stream was processed in 0.5
and 1 hour, corresponding to speed-ups of ×8640 and ×4320 com-
pared to the original rate, since the 6-month dataset spans roughly
4,320 hours. While the CEP operator lags behind the source when
the stream is re-played in half an hour, it can clearly process the
stream without any problems when it is re-played in one hour,
as both pattern- and partition-based parallelism exhibit 0% back-
pressure. In fact, pattern-based parallelism performs better in
this experiment. This lends credence to our belief that pattern-
based parallelism might actually be more suitable than partition-
based parallelism when there is a high number of patterns to be
processed simultaneously.
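   As a rough illustration of such a replay simulator (not our actual
implementation), the sketch below reads timestamped events in order and
sleeps for the inter-event gap divided by a configurable speed-up factor; a
factor of 4320 replays the 6-month stream in roughly one hour, and a factor
of 8640 in roughly half an hour. The TimedEvent type and the sink callback
are hypothetical.

import java.util.List;
import java.util.function.Consumer;

public class ReplaySimulator {

    // Illustrative timestamped input event; the payload would be an enriched AIS message.
    public record TimedEvent(long timestampMillis, String payload) {}

    private final double speedup; // e.g. 4320 replays 6 months in ~1 hour, 8640 in ~0.5 hours

    public ReplaySimulator(double speedup) {
        this.speedup = speedup;
    }

    // Replays events in order, sleeping for the (scaled) gap between consecutive timestamps.
    public void replay(List<TimedEvent> events, Consumer<TimedEvent> sink)
            throws InterruptedException {
        long previous = -1;
        for (TimedEvent e : events) {
            if (previous >= 0) {
                long delay = (long) ((e.timestampMillis() - previous) / speedup);
                if (delay > 0) {
                    Thread.sleep(delay);
                }
            }
            sink.accept(e); // e.g. forward the event to the topic consumed by the CER operator
            previous = e.timestampMillis();
        }
    }
}

Setting the speed-up factor to 1 reproduces the original inter-arrival times,
while higher values stress-test the recognition engine.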
6   SUMMARY AND FUTURE WORK
In this paper, we presented Wayeb, a tool for Complex Event Pro-
cessing, as a means of processing big mobility data streams. We
defined a number of detection patterns that are useful in fleet man-
agement and maritime monitoring applications. Moreover, we
presented implementations of two distributed recognition tech-
niques and compared their efficiency against the single-core ver-
sion. Our results demonstrate the superiority of partition-based
over pattern-based parallelization when the number of patterns
is relatively low. When this number is significantly high, pattern-
based parallelization becomes a viable option. For the future, we
intend to combine various distribution techniques and to construct
more patterns for the domains presented. Another interesting
research avenue would be to compare our automata-based method
against other approaches, such as logic-based ones, which have
been applied to similar datasets [24, 31].

ACKNOWLEDGMENTS
This work was funded by the European Union's Horizon 2020
research and innovation programme Track & Know "Big Data for
Mobility Tracking Knowledge Extraction in Urban Areas", under
grant agreement No 780754. It is also supported by the European
Commission under the INFORE project (H2020-ICT-825070).

REFERENCES
 [1] [n.d.]. Apache Flink - Stateful Computations over Data Streams. https://flink.apache.org/.
 [2] [n.d.]. Apache Kafka. https://kafka.apache.org/.
 [3] [n.d.]. Esper. http://www.espertech.com/esper.
 [4] [n.d.]. Esper on Storm. https://github.com/tomdz/storm-esper.
 [5] [n.d.]. FlinkCEP - Complex event processing for Flink. https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/cep.html.
 [6] [n.d.]. Monitoring Back Pressure. https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/back_pressure.html.
 [7] [n.d.]. Siddhi CEP. https://github.com/wso2/siddhi.
 [8] [n.d.]. Task chaining and resource groups. https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/#task-chaining-and-resource-groups.
 [9] [n.d.]. WSO2. Creating a Storm Based Distributed Execution Plan. https://docs.wso2.com/display/CEP410/Creating+a+Storm+Based+Distributed+Execution+Plan.
[10] E. Alevizos, A. Artikis, and G. Paliouras. 2017. Event Forecasting with Pattern Markov Chains. In DEBS.
[11] E. Alevizos, A. Artikis, and G. Paliouras. 2018. Wayeb: a Tool for Complex Event Forecasting. In LPAR.
[12] A. Artikis, M. Sergot, and G. Paliouras. 2015. An Event Calculus for Event Recognition. IEEE Trans. Knowl. Data Eng. (2015).
[13] C. Balkesen, N. Dindar, M. Wetter, and N. Tatbul. 2013. RIP: run-based intra-query parallelism for scalable complex event processing. In DEBS.
[14] P. Carbone, A. Katsifodimos, S. Ewen, V. Markl, S. Haridi, and K. Tzoumas. 2015. Apache Flink™: Stream and Batch Processing in a Single Engine. IEEE Data Eng. Bull. (2015).
[15] G. Cugola and A. Margara. 2012. Complex event processing with T-REX. J. Syst. Softw. (2012).
[16] G. Cugola and A. Margara. 2012. Processing flows of information: From data stream to complex event processing. ACM Comput. Surv. (2012).
[17] L. D'Antoni and M. Veanes. 2017. The Power of Symbolic Automata and Transducers. In CAV (1).
[18] A. Demers, J. Gehrke, B. Panda, M. Riedewald, V. Sharma, and W. White. 2007. Cayuga: A General Purpose Event Monitoring System. In CIDR.
[19] N. Giatrakos, E. Alevizos, A. Artikis, A. Deligiannakis, and M. Garofalakis. 2020. Complex event recognition in the Big Data era: a survey. VLDB J. (2020).
[20] M. Hirzel. 2012. Partition and compose: parallel complex event processing. In DEBS.
[21] N. Koutroumanis, G. Santipantakis, A. Glenis, C. Doulkeridis, and G. Vouros. 2019. Integration of Mobility Data with Weather Information. In EDBT/ICDT Workshops.
[22] M. Liu, E. Rundensteiner, K. Greenfield, C. Gupta, S. Wang, I. Ari, and A. Mehta. 2011. E-Cube: multi-dimensional event sequence analysis using hierarchical pattern query sharing. In SIGMOD.
[23] Y. Mei and S. Madden. 2009. ZStream: a cost-based query processor for adaptively detecting composite events. In SIGMOD.
[24] K. Patroumpas, E. Alevizos, A. Artikis, M. Vodas, N. Pelekis, and Y. Theodoridis. 2017. Online event recognition from moving vessel trajectories. GeoInformatica (2017).
[25] K. Patroumpas, D. Spirelis, E. Chondrodima, H. Georgiou, P. Petrou, P. Tampakis, S. Sideridis, N. Pelekis, and Y. Theodoridis. 2018. Final dataset of Trajectory Synopses over AIS kinematic messages in Brest area (ver. 0.8) [Data set]. https://doi.org/10.5281/zenodo.2563256.
[26] M. Pitsikalis, A. Artikis, R. Dreo, C. Ray, E. Camossi, and A.-L. Jousselme. 2019. Composite Event Recognition for Maritime Monitoring. In DEBS.
[27] C. Ray, R. Dreo, E. Camossi, and A.-L. Jousselme. 2018. Heterogeneous Integrated Dataset for Maritime Intelligence, Surveillance, and Reconnaissance. https://doi.org/10.5281/zenodo.1167595.
[28] N. Schultz-Møller, M. Migliavacca, and P. Pietzuch. 2009. Distributed complex event processing with query rewriting. In DEBS.
[29] L. Snidaro, I. Visentini, and K. Bryan. 2015. Fusing uncertain knowledge and evidence for maritime situational awareness via Markov Logic Networks. Inf. Fusion (2015).
[30] F. Terroso-Saenz, M. Valdés-Vela, and A. Skarmeta-Gómez. 2016. A complex event processing approach to detect abnormal behaviours in the marine environment. Inf. Syst. Frontiers (2016).
[31] E. Tsilionis, N. Koutroumanis, P. Nikitopoulos, C. Doulkeridis, and A. Artikis. 2019. Online Event Recognition from Moving Vehicles: Application Paper. TPLP (2019).
[32] H. Zhang, Y. Diao, and N. Immerman. 2014. On complexity and optimization of expensive queries in complex event processing. In SIGMOD.