=Paper=
{{Paper
|id=Vol-2841/BMDA_5
|storemode=property
|title=Online trajectory analysis with scalable event recognition
|pdfUrl=https://ceur-ws.org/Vol-2841/BMDA_5.pdf
|volume=Vol-2841
|authors=Emmanouil Ntoulias,Elias Alevizos,Alexander Artikis,Athanasios Koumparos
|dblpUrl=https://dblp.org/rec/conf/edbt/NtouliasAAK21
}}
==Online trajectory analysis with scalable event recognition==
Emmanouil Ntoulias (NCSR “Demokritos”, manosntoulias@iit.demokritos.gr), Elias Alevizos (National and Kapodistrian University of Athens and NCSR “Demokritos”, ilalev@di.uoa.gr), Alexander Artikis (University of Piraeus and NCSR “Demokritos”, a.artikis@unipi.gr), Athanasios Koumparos (Vodafone Innovus, athanasios.koumparos@vodafoneinnovus.com)

© 2021 Copyright for this paper by its author(s). Published in the Workshop Proceedings of the EDBT/ICDT 2021 Joint Conference (March 23-26, 2021, Nicosia, Cyprus) on CEUR-WS.org. Use permitted under Creative Commons License Attribution 4.0 International (CC BY 4.0).

ABSTRACT

Moving object monitoring is becoming essential for companies and organizations that need to manage thousands or even millions of commercial vehicles or vessels, detect dangerous situations (e.g., collisions or malfunctions) and optimize their behavior. It is a task that must be executed in real-time, reporting any such situations or opportunities as soon as they appear. Given the growing sizes of fleets worldwide, a monitoring system must be highly efficient and scalable. It is becoming an increasingly common requirement that such monitoring systems should be able to automatically detect complex situations, possibly involving multiple moving objects and requiring extensive background knowledge. Building a monitoring system that is both expressive and scalable is a significant challenge. Typically, the more expressive a system is, the less flexible it becomes in terms of its parallelization potential. We present a system that strikes a balance between expressiveness and scalability. Our proposed system employs a formalism that allows analysts to define complex patterns in a user-friendly manner while maintaining unambiguous semantics and avoiding ad hoc constructs. At the same time, depending on the problem at hand, it can employ different parallelization strategies in order to address the issue of scalability. Our experimental results show that our system can detect complex patterns over moving entities with minimal latency, even when the load on our system surpasses what is to be realistically expected in real-world scenarios.

1 INTRODUCTION

Commercial vehicle fleets constitute a major part of Europe’s economy. There were approximately 37 million commercial vehicles in the European Union in 2015¹ and this number is growing every year at an increasing rate. Devices emitting spatial and operational information are installed on commercial vehicles. This information helps fleet management applications improve the management and planning of transportation services [31].

Consider another case of monitoring of moving objects, equally important from an economic and environmental point of view: maritime monitoring systems. Such systems have been attracting considerable attention for economic as well as environmental reasons [24, 29, 30]. The Automatic Identification System (AIS)² is used to track vessels at sea in real-time through data exchange with other ships nearby, coastal stations, or even satellites. Cargo ships of at least 300 gross tonnage and all passenger ships, regardless of size, are nowadays required to have AIS equipment installed and regularly emit AIS messages while sailing at sea. Currently, there are more than 500,000 vessels worldwide that can be tracked using AIS technology³. It is crucial, both for authorities and for maritime companies, to be able to track the behavior of ships at sea in order to avoid accidents and ensure that ships adhere to international regulations.

Streams of transient data emitted from vehicles or ships must be processed with minimal latency, if a monitoring system is to provide significant margins for action in case of critical situations. We therefore need to detect complex patterns of interest upon these streams in an online and highly efficient manner that can gracefully scale as the number of monitored entities increases. Besides kinematic data, it is also important to be able to take into account static (or “almost” static, with respect to the rate of the streaming data) background knowledge, such as weather data, point of interest (POI) information (like gas stations, ports, parking lots, police departments, etc. [21]), NATURA areas where ships are not allowed to sail, etc. This enhanced data stream produces valuable opportunities for the detection of complex events. One can identify certain routes a vehicle or a ship is taking, malfunctions in the installed device, in the GPS tracker or the AIS transponder, cases of illegal shipping in protected areas or possible collisions between ships moving dangerously close to each other, to name but a few of the possible patterns which could be of interest to analysts.

As a solution to the problem of monitoring of moving objects, we present a Complex Event Processing system that aims to improve the operating efficiency of a commercial fleet. It operates online with enriched data in a streaming environment. Our contributions are the following:

• We present a Complex Event Processing (CEP) system based on symbolic automata which allows analysts to define complex patterns in a user-friendly language. Our proposed language has formal semantics, while being able to also take into account background knowledge.
• We define a series of realistic complex patterns that identify routes and malfunctions of vehicles and detect critical situations for vessels at sea.
• We present and compare various implementations of parallel processing techniques and discuss their applicability.
• We test our approach using large, real-world, heterogeneous data streams from diverse application domains, showing that we can achieve real-time performance even in cases of significantly increased load, beyond the current demand levels.

¹ http://www.acea.be/statistics/article/vehicles-in-use-europe-2017
² http://www.imo.org/OurWork/Safety/Navigation/Pages/AIS.aspx
³ https://www.vesselfinder.com

The remainder of this paper is organized as follows. Section 2 discusses related work, while Section 3 describes our CEP engine. In Section 4 the distributed version of our engine is presented. Section 5 summarizes the datasets of vehicle and vessel traces and defines the recognition patterns. It also presents our empirical evaluation. Finally, we conclude the paper in Section 6 and describe our future work.

2 RELATED WORK

Complex event recognition systems accept as input a stream of time-stamped “simple, derived events” (SDEs). These SDEs are the result of applying a simple transformation to some other event (e.g., a measurement from a sensor). By processing them, a CEP engine can recognize complex events (CEs), i.e., collections of SDEs satisfying some pattern. There are multiple CEP systems proposed in the literature during the last 15 years, falling under various classes [16, 19]. Automata-based systems constitute the most common category. They compile patterns (definitions of complex events) into finite state automata, which are then used to consume streams of simple events and report matches whenever an automaton reaches a final state. Examples of such systems may be found in [5, 11, 18, 28, 32]. Another important class of CEP systems are the logic-based ones. In this case, patterns are defined as rules, with a head and a body defining the conditions which, if satisfied, lead to the detection of a CE. A typical example of a logic-based system may be found in [12]. Finally, there are some tree-based systems, such as [22, 23], which are attractive because they are amenable to various optimization techniques.

For efficient processing of big data streams, distributed architectures need to be employed [19]. Big data platforms, such as Apache Spark and Storm, have been used to embed CEP engines into their operators. Both platforms have incorporated Siddhi [7, 9] and Esper [3, 4] as their embedded engines. Flink [1], on the other hand, provides support for CEP with the FlinkCEP built-in library [5]. Besides using these Big Data platforms, numerous other parallelization techniques have been proposed in the literature that can achieve more fine-grained control over how the processing load is distributed among workers. Pattern-based parallelization is the most obvious solution, where the patterns are distributed among the processing units [15]. One disadvantage of this parallelization scheme is that events have to be replicated to multiple processing units, since a new input event may need to be consumed by more than one pattern. Moreover, the parallelization level is necessarily limited by the number of patterns (for a single pattern, this method offers no benefits). Operator-based parallelization constitutes another approach, where the CEP operators are assigned to different processing units [13, 28]. This allows for multi-pattern optimizations and avoids the data replication issue of the previous technique. On the other hand, the parallelization level is again limited, this time by the number of operators present in the pattern (which is closely related to the number of automaton states in automata-based CEP systems). Finally, in data-parallelization schemes, events are split among multiple instances of the same pattern [20]. For example, a pattern trying to detect violations of speed limits must be applied to all the monitored vehicles and thus the input stream may be partitioned according to the id of the vehicles. The advantage of this method is that it can scale well with the input event rate. It is, however, not always obvious how an input stream should be partitioned, while avoiding data replication.

3 AUTOMATA-BASED EVENT RECOGNITION

We begin by first presenting our framework for CEP. It is based on Wayeb, a Complex Event Processing and Forecasting engine which employs symbolic automata as its computational model [10, 11]. The rationale behind our choice of Wayeb is that, contrary to other automata-based CEP engines, it has clear, compositional semantics due to the fact that symbolic automata have nice closure properties [17]. At the same time, it is expressive enough to support most of the common CEP operators [19], while remaining amenable to the standard parallelization solutions. In this paper, we extend Wayeb’s language in order to support more expressive patterns.

Symbolic automata constitute a variation of classical automata, with the main difference being that their transitions, instead of being labeled with a symbol from an alphabet, are equipped with formulas from a Boolean algebra [17]. A symbolic automaton consumes strings and, after every new element, applies the predicates of its current state’s outgoing transitions to that element. If a predicate evaluates to TRUE, then the corresponding transition is triggered and the automaton moves to that transition’s target state. A Boolean algebra is defined as follows:

Definition 3.1 (Effective Boolean algebra [17]). A Boolean algebra is a tuple (D, Ψ, ⟦_⟧, ⊥, ⊤, ∨, ∧, ¬), where D is a set of domain elements; Ψ is a set of predicates closed under the Boolean connectives; ⊥, ⊤ ∈ Ψ; the component ⟦_⟧ : Ψ → 2^D is a denotation function such that ⟦⊥⟧ = ∅, ⟦⊤⟧ = D and, ∀ϕ, ψ ∈ Ψ: a) ⟦ϕ ∨ ψ⟧ = ⟦ϕ⟧ ∪ ⟦ψ⟧; b) ⟦ϕ ∧ ψ⟧ = ⟦ϕ⟧ ∩ ⟦ψ⟧; and c) ⟦¬ϕ⟧ = D \ ⟦ϕ⟧.

Elements of D are called characters and finite sequences of characters are called strings. A set of strings L constructed from elements of D (L ⊆ D*, where * denotes Kleene-star) is called a language over D.

Wayeb uses symbolic regular expressions to define patterns and to represent a class of languages over D. Wayeb’s standard operators are those of the classical regular expressions, i.e., concatenation, disjunction and Kleene-star. We extend Wayeb to include various extra CEP operators: that of negation and those of different selection policies (see [19] for a discussion of selection policies). Symbolic regular expressions are defined as follows:

Definition 3.2 (Symbolic regular expression). A Wayeb symbolic regular expression (SRE) over a Boolean algebra (D, Ψ, ⟦_⟧, ⊥, ⊤, ∨, ∧, ¬) is recursively defined as follows:

• If ψ ∈ Ψ, then R := ψ is a symbolic regular expression, with L(ψ) = ⟦ψ⟧, i.e., the language of ψ is the subset of D for which ψ evaluates to TRUE;
• Disjunction / Union: If R1 and R2 are symbolic regular expressions, then R := R1 + R2 is also a symbolic regular expression, with L(R) = L(R1) ∪ L(R2);
• Concatenation / Sequence: If R1 and R2 are symbolic regular expressions, then R := R1 · R2 is also a symbolic regular expression, with L(R) = L(R1) · L(R2), where · denotes concatenation. L(R) is then the set of all strings constructed from concatenating each element of L(R1) with each element of L(R2);
• Iteration / Kleene-star: If R is a symbolic regular expression, then R′ := R* is a symbolic regular expression, with L(R*) = (L(R))*, where L* = ⋃_{i≥0} Lⁱ and Lⁱ is the concatenation of L with itself i times;
• Bounded iteration: If R is a symbolic regular expression, then R′ := R^{x+} is a symbolic regular expression, with R^{x+} = R · ⋯ · R (x times) · R*;
• Negation / Complement: If R is a symbolic regular expression, then R′ := !R is a symbolic regular expression, with L(R′) = (L(R))ᶜ;
• skip-till-any-match selection policy: If R1, R2, ⋯, Rn are symbolic regular expressions, then R′ := #(R1, R2, ⋯, Rn) is a symbolic regular expression, with R′ := R1 · ⊤* · R2 · ⊤* · ⋯ · ⊤* · Rn;
• skip-till-next-match selection policy: If R1, R2, ⋯, Rn are symbolic regular expressions, then R′ := @(R1, R2, ⋯, Rn) is a symbolic regular expression, with R′ := R1 · !(⊤* · R2 · ⊤*) · R2 · ⋯ · !(⊤* · Rn · ⊤*) · Rn.

Table 1: An example stream composed of six events. Each event has a vehicle identifier, a value for that vehicle’s speed and a timestamp.

vehicle id | 78986 | 78986 | 78986 | 78986 | 78986 | ...
speed      |    85 |    93 |    99 |   104 |   111 | ...
timestamp  |     1 |     2 |     3 |     4 |     5 | ...

[Figure 1 (diagram omitted): Streaming symbolic automaton created from the expression R := (speed > 100) · (speed > 100); states 0, 1, 2, with a transition labeled speed > 100 from state 0 to 1 and from state 1 to the final state 2.]

A Wayeb expression without a selection policy implicitly follows the strict-contiguity policy, i.e., the SDEs involved in a match of a pattern should occur contiguously in the input stream. The other two selection policies relax the strict requirement of contiguity (see [19] for details). Note that all these operators, even those of selection policies, may be arbitrarily used and nested in an expression, without any limitations. This is in contrast to other CEP systems where nested operations may be prohibited.

Wayeb patterns are defined as symbolic regular expressions which are subsequently compiled into symbolic automata. The definition of a symbolic automaton is the following:
Definition 3.3 (Symbolic finite automaton [17]). A symbolic finite automaton (SFA) is a tuple M = (A, Q, qˢ, Qᶠ, ∆), where A is an effective Boolean algebra; Q is a finite set of states; qˢ ∈ Q is the initial state; Qᶠ ⊆ Q is the set of final states; ∆ ⊆ Q × Ψ_A × Q is a finite set of transitions.

A string w = a1 a2 ⋯ ak is accepted by an SFA M iff, for 1 ≤ i ≤ k, there exist transitions q_{i−1} → q_i triggered by a_i, such that q_0 = qˢ and q_k ∈ Qᶠ. The set of strings accepted by M is the language of M, denoted by L(M). It can be proven that every symbolic regular expression can be translated to an equivalent (i.e., with the same language) symbolic automaton [17].

[Figure 2 (diagrams omitted): Parallel schemes used with Wayeb. (a) Pattern-based parallelization. (b) Partition-based parallelization. (c) Special case of partition-based parallelization when a one-to-one relation between sources and CEP engines exists.]

We are now in a position to precisely define the meaning of “complex events”. Input events come in the form of tuples with both numerical and categorical values. These tuples constitute the set of domain elements D. A stream S is an infinite sequence S = t1, t2, ⋯, where each ti is a tuple (ti ∈ D). Our goal is to report the indices i at which a CE is detected. If S_{1..k} = ⋯, t_{k−1}, t_k is the prefix of S up to the index k, we say that an instance of an SRE R is detected at k iff there exists a suffix S_{m..k} of S_{1..k} such that S_{m..k} ∈ L(R). If we attempted to detect CEs, as defined above, by directly compiling an expression R to an automaton, we would fail. Consider, for example, the (classical) regular expression R := a · b and the (classical) stream/string S = a, b, c, a, b, c. If we compile R to a (classical) automaton and feed S to it, then the automaton would reach its final state after reading the second element t2 of the string. However, it would then never reach its final state again. We would like our automaton to reach its final state every time it encounters a, b as a suffix, e.g., again after reading t5 of S. We can achieve this with a simple trick. Instead of using R, we first convert it to Rs = ⊤* · R. Using Rs we can detect CEs of R while consuming a stream S, since a stream segment S_{m..k} is recognized by R iff the prefix S_{1..k} is recognized by Rs. The prefix ⊤* lets us skip any number of events from the stream and start recognition at any index m, 1 ≤ m ≤ k.

As an example, consider the domain of vehicle monitoring. An analyst could use the Wayeb language to define the pattern R := (speed > 100) · (speed > 100) in order to detect speed violations on roads where the maximum allowed speed is 100 km/h. This pattern detects two consecutive events where the speed exceeds the threshold, in order to avoid cases where a vehicle momentarily exceeds the threshold, possibly due to some measurement error. This pattern would be compiled to the (non-deterministic) automaton of Figure 1. Table 1 shows an example stream processed by this automaton. For the first three input events, the automaton would remain in its start state, state 0. After the fourth event, it would move to state 1 and after the fifth event it would reach its final state, state 2. We would thus say that a complex event R was detected at timestamp = 5.

4 SCALABLE EVENT RECOGNITION OVER MULTIPLE TRAJECTORIES

We now discuss how various parallelization schemes may be applied to our CEP engine. For this purpose, we leverage a popular streaming platform, Apache Flink [1, 14].
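The detection semantics just described can be made concrete with a small simulation. The sketch below is our own illustration in Python, not Wayeb code: it runs the non-deterministic automaton of Figure 1, prefixed with the ⊤* self-loop, over the stream of Table 1. The event field names are assumptions made for the example.

```python
TRUE = lambda event: True                       # the "top" predicate: matches any event
OVERSPEED = lambda event: event["speed"] > 100  # transition predicate speed > 100

# Transitions of the automaton of Figure 1, with the "top"* self-loop on state 0:
# state -> list of (predicate, target state); state 2 is the only final state.
TRANSITIONS = {
    0: [(TRUE, 0), (OVERSPEED, 1)],
    1: [(OVERSPEED, 2)],
    2: [],
}
FINAL = {2}

def detect(stream):
    """Report every index (here: timestamp) at which the complex event is detected."""
    detections = []
    states = {0}
    for event in stream:
        # Non-deterministic simulation: follow every enabled transition.
        states = {target for state in states
                  for predicate, target in TRANSITIONS[state]
                  if predicate(event)}
        if states & FINAL:
            detections.append(event["timestamp"])
    return detections

# The stream of Table 1 (vehicle 78986, speeds 85, 93, 99, 104, 111).
stream = [{"id": 78986, "speed": s, "timestamp": t}
          for t, s in enumerate([85, 93, 99, 104, 111], start=1)]
print(detect(stream))  # [5]: the complex event is detected at timestamp = 5
```

Because of the ⊤* self-loop, state 0 stays alive throughout, so the automaton keeps reporting a detection whenever a matching suffix completes, rather than stopping after the first match.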
Flink is a distributed processing engine for stateful computations over unbounded and bounded data streams. It is designed to run in cluster environments and perform computations at in-memory speed. In this paper, we focus on two parallelization techniques: pattern-based and partition-based parallelization [19]. We currently exclude state-based parallelization, since, as explained above, its parallelization level is limited by the number of automaton states, which is typically quite low (it is often a single-digit number). We do intend, however, to examine in the future how it could be combined with pattern- or partition-based parallelization to provide them with an extra performance boost.

In pattern-based parallelization, each available CEP engine receives a unique subset of the patterns and performs recognition for these patterns only, with these subsets being (almost) equal in size (see Figure 2a). On the other hand, the stream is broadcast to all parallel instances of any downstream operators. This is a significant (yet unavoidable) drawback of pattern-based parallelization, since each worker has a subset of the patterns while each pattern may need to process the whole stream. Note that each blue rectangle in Figure 2a represents a single thread. This means that in this architecture we have one thread for the source plus as many threads as the parallelism of the CEP operator.

In partition-based parallelization the opposite happens. Every CEP engine is initialized with all patterns, but the stream is not broadcast (see Figure 2b). A partitioning function is used to decide where each new input event should be forwarded. This function takes as input some attribute of the event (we use the id of a vehicle or vessel) and, by performing hashing, it outputs which parallel instance of the next operator the event will go to. As with pattern-based parallelization, we have one thread for the source plus as many threads as the parallelism of the CEP operator.

Besides Flink, we also use the Apache Kafka messaging platform to connect our stream sources to Wayeb instances [2]. Kafka provides various ways to consume streams. So far, we have focused on linear streams, i.e., events are assumed to be totally ordered and arrive at our system sequentially one after another. With Kafka, however, there is the option of using parallel input streams. A Kafka input topic can have multiple partitions and each partition can be consumed in parallel by a different consumer. In this case the input stream is already partitioned on some attribute of the events.

Through this Kafka functionality, a variant of partition-based parallelization becomes possible, where both the input source and the recognition engine work in parallel (see Figure 2c). If the parallelism of the input source (i.e., the number of partitions of the topic) is the same as that of the recognition operator (i.e., the number of CEP engines), then we can simply attach each source instance to a CEP engine instance without further re-partitioning on our end. When, however, the parallelisms are different, further re-partitioning is performed by partitioning each source the same way we partitioned the single-threaded source. Unlike the previous architectures, each pair of source and CEP operator parallel instances belongs in a single thread. This is attributed to operator chaining [8], a Flink mechanism that chains operators of the same parallelism in a single thread for better performance.

Similarly to partition-based parallelization, we can have multiple sources for pattern-based parallelism as well. Messages in Kafka are still partitioned by a desired attribute, albeit we always have to perform the broadcasting step for each source. Hence, we don’t have a variant of pattern-based parallelization; rather, we use multiple sources only as a way to process streams of greater input rate.

5 EXPERIMENTAL EVALUATION

We present an extensive experimental evaluation of our parallel CEP engine on two datasets containing real-world trajectories of moving objects. The first dataset comes from the domain of fleet management, with vehicles moving on roads and emitting information about their status. The second dataset consists of vessel trajectories from ships sailing at sea. In both cases, our goal is to simultaneously monitor thousands of moving objects and detect interesting (or even critical) behavioral patterns in real-time, as defined by domain experts. All experiments were conducted on a server with 24 processors. Each processor is an Intel(R) Xeon(R) CPU E5-2630 v2 @ 2.60GHz. The server has 252 GB of RAM. The source code for Wayeb may be found in the following repository: https://github.com/ElAlev/Wayeb.

5.1 Fleet Management

Efficient fleet management is essential for transportation and logistics companies. We show how our proposed solution can effectively help in this task. With the help of experts, we define a set of patterns to be detected on real-time streams of trajectories and show that our engine can detect these patterns with an efficiency that is orders of magnitude better than real-time.

5.1.1 Dataset Description. The dataset is provided by Vodafone Innovus⁴, our partner in the Track & Know project, which offers fleet management services. It contains approximately 270M records (243GB). It covers a period of 5 months, from June 30, 2018 11:00:00 PM to November 30, 2018 11:59:59 PM. The initial source emitting the data is composed of GPS (Global Positioning System) traces of moving vehicles. The data also includes speed information provided by an installed accelerometer and information regarding the level of fuel in a vehicle’s tank measured by a fuel sensor. It is also enriched with weather and point-of-interest (POI) information (e.g., whether a vehicle is close to a gas station, a university, a school, etc.), as described in [31]. Duration, acceleration and distance are some extra attributes that are calculated on the fly, as events enter our system, by storing information from previous events.

⁴ https://www.vodafoneinnovus.com/

5.1.2 Pattern Definitions. The first pattern we have defined concerns vehicle routes. A route is the basic element of vehicle management and aggregates data between the start and the end point of a vehicle’s motion cycle. A motion cycle is based on the engine status. Each vehicle route must start and end with an “engine-off” message, i.e., a message whose engine status attribute is “off”. According to Vodafone Innovus, there are 12 patterns that describe the most frequent routes. These 12 route patterns can be expressed with a single Wayeb pattern as follows:

Definition 5.1. A route pattern for a vehicle is defined as the following sequence: emitting “engine-off” messages for at least 30 minutes, emitting at least one “moving” message and again emitting “engine-off” messages for at least 30 minutes:

Route := (Engine = Off ∧ Duration > 30) ·
         (Engine = Moving)+ ·
         (Engine = Off ∧ Duration > 30)

Unfortunately, the expected data flow can be corrupted due to a variety of reasons. These reasons include a bad connection during the device installation or after the vehicle has been serviced, movement of the satellites, hardware malfunctions or, simply, an issue with the GPS. The result of these issues is reflected in the data. For example, coordinates may change even though the vehicle is not moving, or the vehicle may be moving while the coordinates remain the same.
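Since Wayeb patterns such as Route (Definition 5.1) are symbolic regular expressions, one can illustrate their detection by encoding each event as a single character and reusing a classical regex engine. The following Python sketch is our own illustration, not Wayeb code; the event field names are assumptions, and "duration" stands for the minutes the engine has been in its current status, one of the on-the-fly attributes mentioned in Section 5.1.1.

```python
import re

def encode(event):
    """Map one SDE to a single character, turning the symbolic pattern
    into a classical regex: F = engine off for more than 30 min,
    M = moving, x = anything else."""
    if event["engine"] == "off" and event["duration"] > 30:
        return "F"
    if event["engine"] == "moving":
        return "M"
    return "x"

# Rs = T* . (Engine=Off ∧ Duration>30) . (Engine=Moving)+ . (Engine=Off ∧ Duration>30)
ROUTE = re.compile(r".*FM+F")

def detect_routes(stream):
    """Return the indices at which the Route pattern of Definition 5.1 completes."""
    prefix, detections = "", []
    for i, event in enumerate(stream, start=1):
        prefix += encode(event)
        if ROUTE.fullmatch(prefix):  # a route completes exactly at event i
            detections.append(i)
    return detections

trip = [{"engine": "off", "duration": 40},
        {"engine": "moving", "duration": 0},
        {"engine": "moving", "duration": 0},
        {"engine": "off", "duration": 45}]
print(detect_routes(trip))  # [4]: the route completes at the fourth message
```

The `.*` prefix plays the role of the ⊤* conversion of Section 3: it lets a match start anywhere in the stream, so detection is reported at every index where a route-shaped suffix ends.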
It is also often the case that the engine status is incorrect (e.g., parked messages are emitted even though engine is on, vehicle is moving yet engine status is not moving etc). These issues are important and need to be detected. We have summarized those issues in a number of patterns, defined as follows: Definition 5.2. ParkedMovingSwing. Engine status swings be- tween “parked” and “moving” during consecutive events. (a) 16 patterns ParkedMovingSwing := (Engine = Parked) · (Engine = Moving)· (Engine = Parked) · (Engine = Moving) Definition 5.3. IdleParkedSwing. Engine status swings between “idle” and “parked” during consecutive events. IdleParkedSwing := (Engine = Idle) · (Engine = Parked)· (Engine = Idle) · (Engine = Parked) Definition 5.4. SpeedSwing. Speed swings between 0km/h and greater than 50km/h during consecutive events. (b) 48 patterns SpeedSwing := (Speed > 50)·(Speed = 0)·(Speed > 50)·(Speed = 0) Figure 3: Recognition times of different parallelization techniques for different workloads. The horizontal axis Definition 5.5. MovingWithZeroSpeed. Engine status is “mov- represents the number of worker threads. ing”, distance traveled is greater than 30m, yet speed is 0km/h for more than 3 consecutive messages. 5.1.3 Recognition Results. Figure 3a showcases recognition MWZS := (Engine = Moving ∧ Speed = 0 ∧ Distance > 30)3+ times for our various parallelization techniques: pattern-based, partition-based and partition-based with one source per engine. Definition 5.6. MovingWithBadSignal. Vehicle is accelerating We also show results for the non-parallel version. We have du- and distance traveled is greater than 30m yet there are no satel- plicated some of the patterns defined previously to simulate a lites tracking the vehicle for more than 3 consecutive messages. greater workload of 16 patterns. We have repeated the experiment for 1, 2, 4 ,8 and 16 cores. 
MWBS := (Acceleration ∧ Satellites = 0 ∧ Distance > 30)3+

In addition to the above issues, possibly related to malfunctions, experts are also interested in the following patterns:

Definition 5.7. Possible Theft. Engine status is parked, speed is 0 km/h and distance traveled is greater than 30 m for more than 3 consecutive messages.

PossibleTheft := (Engine = Parked ∧ Speed = 0 ∧ Distance > 30)3+

Definition 5.8. Dangerous Driving. There is ice on the road and the vehicle is moving above a specific speed limit for at least 2 consecutive messages.

DangerousDriving := (IceExists = TRUE ∧ Speed > vlimit)2+

Definition 5.9. Refuel Opportunity. Vehicle is close to a gas station and the fuel in the tank is less than 50% for at least 2 consecutive messages.

RefuelOpp := (CloseToGasStation = TRUE ∧ FuelLevel < 0.5)2+

Compared to the original single-core version, all three parallelization techniques exhibit speed-ups. For partition- and pattern-based parallelization, however, there seems to be an upper limit on the number of cores it is most efficient to use. For pattern-based parallelization there is a significant rise in time after 4 cores, while for partition-based there is no improvement after 2 cores. The reason is that the single source acts as a bottleneck. For partition-based parallelization we have one thread (the partitioner) deciding to which core each event will be forwarded, while for pattern-based parallelization events are broadcast to all cores, again in a single-threaded manner. This explanation is supported by the smooth decrease in time when a parallel source is used for partition-based parallelization. While it starts off worse than pattern- and partition-based parallelization, it exhibits the best results for 16 cores.

To further support our claim above, we conducted a second experiment with a larger load, as shown in Figure 3b. 48 patterns were used this time (replicated in a similar fashion as before) without any other changes. Indeed, with every recognition node having more work to do, execution time becomes less dependent on partitioning/broadcasting and more dependent on the actual recognition. Hence, speed-ups are now visible even for higher numbers of cores. As suspected, the results for parallel sources are not affected.

Results for partition-based distribution are presented in Figure 4a. The backpressure ratio is plotted against the number of threads used for recognition. Generally, the more threads are used, the faster Wayeb can process events, and hence the less pressure is observed. Each dashed line represents a source with parallelism varying from 1 to 6. The event rate of a parallel source is measured by executing an experiment with 0% backpressure (i.e., the source will not slow down due to pressure) and summing the event rates of the parallel instances, as reported by the Flink dashboard (e.g., for 2 parallel instances of 120K events/second (e/s) each, the overall sum is 120 + 120 = 240K e/s). Although the event rate grows with the parallelism of the source, it is not a multiplier: for 1, 2, 3, 4, 5 and 6 sources the rate becomes 130K, 240K, 350K, 430K, 440K and 490K e/s, respectively. The workload in this experiment tries to emulate a real scenario and hence the route and the 5 malfunction patterns are used (i.e., they are not replicated). The results show that Wayeb can effectively process streams of at least 490K e/s (black line) for this workload, as 0% backpressure is exhibited when 16 workers are used. As stated before, the duration of the dataset is 5 months, which translates to roughly 13M seconds. Since the total number of input events is 270M, the average event input rate is 270M/13M ≈ 20 e/s. Comparing the pattern throughput rate (490K e/s) with the event input rate clearly exhibits a performance that is 4 orders of magnitude better than the real-time requirements of this use case.

We performed a similar experiment for pattern-based parallelization. This time the number of threads used for recognition varies between 1, 2, 3 and 6, as there are only 6 patterns, a handicap of this technique discussed earlier. Figure 4b showcases the results of our experiment. Unfortunately, even with 6 recognition threads and a single-threaded source (purple dashed line) there is about 13% backpressure. Due to this, all sources are slowed down to 110K e/s regardless of their parallelism. There is a drop in pressure as more recognition threads are used, due to the better distribution of the patterns. However, it is not significant, as the events are also replicated as many times as the number of these threads and add extra pressure. Eventually more space is requested from the output network buffers of the source.

[Figure 4: Backpressure experiments for fleet management. The horizontal axis expresses the number of recognition threads. Workload is 6 patterns. Each dashed line represents a different parallelism of the source stream. (a) Partition-based. (b) Pattern-based.]

Partition-based parallelization with parallel sources is slower when few threads are used (e.g., for 2 threads). This is because the other two techniques have an extra thread handling the whole stream source, and the work is in fact split between this one source thread and two others performing recognition. We thus have 3 threads performing similar volumes of work. When parallel sources are used, however, each parallel source is chained to a Wayeb engine in a single thread and we thus have 2 threads doing more work.
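The fleet-management patterns above all rely on the iteration operator (2+, 3+), which requires a predicate to hold on a number of consecutive input messages. The following is a minimal sketch of that semantics only; it is not Wayeb's implementation, and the dictionary event fields (engine, speed, distance) are assumed for illustration.

```python
# Illustrative sketch of the "n+" iteration semantics used by the fleet
# patterns: report every event at which a predicate has held on at least
# n consecutive input messages. NOT Wayeb's actual implementation; the
# event fields (engine, speed, distance) are assumed.

def consecutive_matches(events, predicate, n):
    """Yield the index of every event that completes a run of >= n
    consecutive events satisfying `predicate`."""
    run = 0
    for i, event in enumerate(events):
        run = run + 1 if predicate(event) else 0
        if run >= n:
            yield i

# PossibleTheft-style predicate (cf. Definition 5.7): engine parked,
# speed 0 km/h, distance traveled > 30 m.
def possible_theft(event):
    return (event["engine"] == "parked"
            and event["speed"] == 0
            and event["distance"] > 30)

stream = [
    {"engine": "parked", "speed": 0,  "distance": 45},
    {"engine": "parked", "speed": 0,  "distance": 50},
    {"engine": "parked", "speed": 0,  "distance": 60},  # 3rd consecutive match
    {"engine": "parked", "speed": 0,  "distance": 70},  # 4th consecutive match
    {"engine": "on",     "speed": 30, "distance": 10},  # run resets here
]

print(list(consecutive_matches(stream, possible_theft, 3)))  # prints [2, 3]
```

A symbolic automaton compiled from a pattern such as (Engine = Parked ∧ Speed = 0 ∧ Distance > 30)3+ behaves analogously: the predicates are evaluated on each incoming event and an accepting state is reached once the conjunction has held on three consecutive events.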
Each thread handles half the source and performs recognition on half the stream.

In order to evaluate recognition speed independently of source speed, we had to turn operator chaining off, since the two cannot be measured separately when they belong to the same thread (i.e., in the case of one source per CEP engine). In addition, we leveraged parallel sources to achieve input streams of higher input rate. The goal here is to determine whether our system can process input events faster than the source produces them. Flink offers a metric for this purpose, called backpressure [6]. Backpressure is judged by the availability of output buffers. Assuming that some task A sends events to some task B, if there is no output buffer available for task A, we say that task B is backpressuring task A. In our case, A is the source operator and B is the operator with the CEP engine. To measure backpressure, 100 samples (each sample checks whether any output buffer is available) are triggered every 50 ms. The resulting ratio indicates how many of these samples were experiencing backpressure; e.g., 0.6 indicates that 60 out of 100 were stuck requesting buffers from the network stack. According to the documentation [6], a ratio between 0 and 0.1 is normal, 0.1 to 0.5 is considered low, and anything above 0.5 is high. Note that both low and high pressure will slow down the source to match the throughput of the pressuring operator.

5.2 Maritime Monitoring

We now present experimental results on another real-world dataset. This dataset contains trajectories of vessels sailing at sea. We have defined a set of patterns that are similar to the ones presented in [24, 26], which have been constructed with the help of domain experts. We demonstrate the effectiveness of our system, which is capable of efficiently processing a dataset that contains trajectories from ≈ 5K vessels and covers a period of 6 months in less than one hour.

5.2.1 Dataset Description. A public dataset of 18M position signals from 5K vessels sailing in the Atlantic Ocean around the port of Brest, France, between October 1st 2015 and March 31st 2016 has been utilized [27]. A derivative dataset has been released in [25], containing a compressed version of the original dataset (4.5M signals), as described in [24]. Each trajectory in this dataset contains only the so-called critical points of the original trajectory, i.e., points that indicate a significant change in a vessel's behavior (e.g., a change in speed or heading) and from which the original trajectory can be faithfully reconstructed. We processed these compressed trajectories in order to determine the proximity of vessels to various areas and locations of interest, such as ports, fishing areas, protected NATURA areas, the coastline, etc.

5.2.2 Pattern Definitions. We now present a detailed description of the maritime patterns that we implemented, assuming that the input events contain the information described above.

Definition 5.10. High Speed Near Coast: Vessel is within 300 meters from the coast and is sailing with speed greater than 5 knots for at least one message.

HSNC := (IsNear(Coast) = TRUE ∧ Speed > 5)+

Definition 5.11. Anchored: Vessel is inside an anchorage area or near a port and is sailing with speed less than 0.5 knots for at least three messages.

Anchored := ((IsNear(Port) = TRUE ∨ WithinArea(Anchorage) = TRUE) ∧ Speed < 0.5)3+

[Figure 5a: Partition-based backpressure for a workload of 6 patterns. Each dashed line represents a different parallelism of the source stream. Event rate is capped at 180K e/s due to backpressure.]

Definition 5.12.
Drifting: There is a difference between heading and actual course over ground greater than 30 degrees while the vessel is sailing with at least 0.5 knots for at least three messages.

Drifting := (|Heading − Cog| > 30 ∧ Speed > 0.5)3+

Definition 5.13. Trawling: A vessel is inside a fishing area sailing with speed between 1 and 9 knots for at least three messages. In addition, it must be a fishing vessel.

Trawling := (VesselType = Fishing ∧ WithinArea(Fishing) = TRUE ∧ Speed > 1.0 ∧ Speed < 9.0)3+

Definition 5.14. Search and Rescue: A SAR vessel sails with a speed greater than 2.7 knots and constantly changes its heading for at least three messages.

SAR := (ChangeInHeading = TRUE ∧ VesselType = SAR ∧ Speed > 2.7)3+

Definition 5.15. Loitering: Vessel is neither near a port nor the coastline while it sails with speed below 0.5 knots.

Loitering := (IsNear(Port) = FALSE ∧ IsNear(Coast) = FALSE ∧ Speed < 0.5)3+

5.2.3 Recognition Results. We used the 6 patterns defined above as the workload for a number of experiments. Following a similar approach to our fleet management experiments, we avoided replicating the patterns, in order to emulate a real scenario, and evaluated them on streams of different event rates.

Figure 5a presents results for the partition-based parallelization scheme. This time the backpressure always remains above 40% for a 6-threaded input source, even with 16 recognition threads. Since the CEP operator cannot keep up with the initial rate of the source, the source has to adjust its rate to match the throughput of the CER operator. According to the Flink dashboard, this rate is at most 180K e/s (with 16 worker threads). This lower throughput of our CEP operator (compared with the fleet management use case) can be attributed to the fact that the patterns are now more complex, as on average they contain more unions, disjunctions and iterations to evaluate for every event. The duration of the dataset is 6 months, which translates to roughly 15.5M seconds. Since the total number of input events is about 4.5M, the average event input rate is 4.5M/15.5M ≈ 0.3 e/s. Comparing the pattern throughput rate with the event input rate showcases again a performance that is more than 4 orders of magnitude better than the real-time requirements of this use case. The results for pattern-based parallelism are presented in Figure 5b and follow a similar trend. The event rate here is capped at 90K e/s due to backpressure.

[Figure 5b: Pattern-based backpressure for a workload of 6 patterns. Each dashed line represents a different parallelism of the source stream. Event rate is capped at 90K e/s due to backpressure.]

We conducted a second series of experiments with a setting where the number of patterns is naturally high, in order to determine whether pattern-based parallelism offers an advantage in such settings. Consider the following pattern, describing the movement of a vessel as it approaches a port.

Definition 5.16. Approaching Port: Vessel is initially more than 7 km away from the port, then, for at least one message, its distance from the port is between 5 and 7 km and finally it enters the port (i.e., its distance from the port falls below 5 km).

Port := (DistanceToPort(PortX) > 7.0 ∧ DistanceToPort(PortX) < 10.0) · (DistanceToPort(PortX) > 5.0 ∧ DistanceToPort(PortX) < 7.0)+ · (DistanceToPort(PortX) < 5.0)

The predicate DistanceToPort calculates the distance of a vessel from the port PortX passed as argument and is evaluated online. If we want to monitor vessel activity around every port in a given area, then we need to replicate this pattern N times, if there are N distinct ports. We would thus naturally have N patterns, which would be almost identical except for the argument passed to DistanceToPort (Port1, Port2, up to PortN). For the area of Brest, the total number of ports is 220. We ran an experiment with these 220 patterns, first with partition- and then with pattern-based parallelization. Figure 5c shows the results. Contrary to previous experiments, in this one we used a stream simulator to feed the dataset to our CEP system. This simulator, instead of reading input events from a file and instantly sending them to our engine, has the ability to insert a delay between consecutive events. For example, we can set the delay to be exactly the time difference between two events. This would allow us to replay the stream as it was actually produced, which would take 6 months for this dataset. We also have the ability to replay the stream at higher speeds. For these experiments, we replayed the stream at various speeds in order to determine the "breaking point" of our system. Figure 5c shows the results for two such speeds, where the whole stream was processed in 0.5 and 1 hour, corresponding to speed-ups of x8640 and x4320, respectively, compared to the original dataset. While the CEP operator lags behind the source when the stream is replayed in half an hour, it is evident that it can process the stream without any problems when it is replayed in one hour, as both pattern- and partition-based parallelism exhibit 0% backpressure. In fact, pattern-based parallelism performs better in this experiment. This lends credence to our belief that pattern-based parallelism might actually be more suitable than partition-based parallelism when there is a high number of patterns to be processed simultaneously.

[Figure 5c: Partition- and pattern-based parallelism for the dataset being replayed in 1 and 0.5 hours, respectively. Workload is 220 patterns of vessels approaching 220 different ports.]

[Figure 5: Backpressure experiments for the maritime dataset. The horizontal axis expresses the number of recognition threads.]

6 SUMMARY AND FUTURE WORK

In this paper, we presented Wayeb, a tool for Complex Event Processing, as a means of processing big mobility data streams. We defined a number of detection patterns that are useful in fleet management and maritime monitoring applications. Moreover, we presented implementations of two distributed recognition techniques and compared their efficiency against the single-core version. Our results demonstrate the superiority of partition-based over pattern-based parallelization when the number of patterns is relatively low. When this number is significantly high, pattern-based parallelization becomes a viable option. For the future, we intend to combine various distribution techniques and to construct more patterns for the domains presented. Another interesting research avenue would be to compare our automata-based method against other approaches, such as logic-based ones, which have been applied to similar datasets [24, 31].

ACKNOWLEDGMENTS

This work was funded by the European Union's Horizon 2020 research and innovation programme Track & Know "Big Data for Mobility Tracking Knowledge Extraction in Urban Areas", under grant agreement No 780754. It is also supported by the European Commission under the INFORE project (H2020-ICT-825070).

REFERENCES

[1] [n.d.]. Apache Flink - Stateful Computations over Data Streams. https://flink.apache.org/.
[2] [n.d.]. Apache Kafka. https://kafka.apache.org/.
[3] [n.d.]. Esper. http://www.espertech.com/esper.
[4] [n.d.]. Esper on Storm. https://github.com/tomdz/storm-esper.
[5] [n.d.]. FlinkCEP - Complex event processing for Flink. https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/cep.html.
[6] [n.d.]. Monitoring Back Pressure. https://ci.apache.org/projects/flink/flink-docs-release-1.9/monitoring/back_pressure.html.
[7] [n.d.]. Siddhi CEP. https://github.com/wso2/siddhi.
[8] [n.d.]. Task chaining and resource groups. https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/#task-chaining-and-resource-groups.
[9] [n.d.]. WSO2. Creating a Storm Based Distributed Execution Plan. https://docs.wso2.com/display/CEP410/Creating+a+Storm+Based+Distributed+Execution+Plan.
[10] E. Alevizos, A. Artikis, and G. Paliouras. 2017. Event Forecasting with Pattern Markov Chains. In DEBS.
[11] E. Alevizos, A. Artikis, and G. Paliouras. 2018. Wayeb: a Tool for Complex Event Forecasting. In LPAR.
[12] A. Artikis, M. Sergot, and G. Paliouras. 2015. An Event Calculus for Event Recognition. IEEE Trans. Knowl. Data Eng. (2015).
[13] C. Balkesen, N. Dindar, M. Wetter, and N. Tatbul. 2013. RIP: run-based intra-query parallelism for scalable complex event processing. In DEBS.
[14] P. Carbone, A. Katsifodimos, S. Ewen, V. Markl, S. Haridi, and K. Tzoumas. 2015. Apache Flink™: Stream and Batch Processing in a Single Engine. IEEE Data Eng. Bull. (2015).
[15] G. Cugola and A. Margara. 2012. Complex event processing with T-REX. J. Syst. Softw. (2012).
[16] G. Cugola and A. Margara. 2012. Processing flows of information: From data stream to complex event processing. ACM Comput. Surv. (2012).
[17] L. D'Antoni and M. Veanes. 2017. The Power of Symbolic Automata and Transducers. In CAV (1).
[18] A. Demers, J. Gehrke, B. Panda, M. Riedewald, V. Sharma, and W. White. 2007. Cayuga: A General Purpose Event Monitoring System. In CIDR.
[19] N. Giatrakos, E. Alevizos, A. Artikis, A. Deligiannakis, and M. Garofalakis. 2020. Complex event recognition in the Big Data era: a survey. VLDB J. (2020).
[20] M. Hirzel. 2012. Partition and compose: parallel complex event processing. In DEBS.
[21] N. Koutroumanis, G. Santipantakis, A. Glenis, C. Doulkeridis, and G. Vouros. 2019. Integration of Mobility Data with Weather Information. In EDBT/ICDT Workshops.
[22] M. Liu, E. Rundensteiner, K. Greenfield, C. Gupta, S. Wang, I. Ari, and A. Mehta. 2011. E-Cube: multi-dimensional event sequence analysis using hierarchical pattern query sharing. In SIGMOD.
[23] Y. Mei and S. Madden. 2009. ZStream: a cost-based query processor for adaptively detecting composite events. In SIGMOD.
[24] K. Patroumpas, E. Alevizos, A. Artikis, M. Vodas, N. Pelekis, and Y. Theodoridis. 2017. Online event recognition from moving vessel trajectories. GeoInformatica (2017).
[25] K. Patroumpas, D. Spirelis, E. Chondrodima, H. Georgiou, P. Petrou, P. Tampakis, S. Sideridis, N. Pelekis, and Y. Theodoridis. 2018. Final dataset of Trajectory Synopses over AIS kinematic messages in Brest area (ver. 0.8) [Data set]. https://doi.org/10.5281/zenodo.2563256.
[26] M. Pitsikalis, A. Artikis, R. Dreo, C. Ray, E. Camossi, and A.-L. Jousselme. 2019. Composite Event Recognition for Maritime Monitoring. In DEBS.
[27] C. Ray, R. Dreo, E. Camossi, and A.-L. Jousselme. 2018. Heterogeneous Integrated Dataset for Maritime Intelligence, Surveillance, and Reconnaissance. https://doi.org/10.5281/zenodo.1167595.
[28] N. Schultz-Møller, M. Migliavacca, and P. Pietzuch. 2009. Distributed complex event processing with query rewriting. In DEBS.
[29] L. Snidaro, I. Visentini, and K. Bryan. 2015. Fusing uncertain knowledge and evidence for maritime situational awareness via Markov Logic Networks. Inf. Fusion (2015).
[30] F. Terroso-Saenz, M. Valdés-Vela, and A. Skarmeta-Gómez. 2016. A complex event processing approach to detect abnormal behaviours in the marine environment. Inf. Syst. Frontiers (2016).
[31] E. Tsilionis, N. Koutroumanis, P. Nikitopoulos, C. Doulkeridis, and A. Artikis. 2019. Online Event Recognition from Moving Vehicles: Application Paper. TPLP (2019).
[32] H. Zhang, Y. Diao, and N. Immerman. 2014. On complexity and optimization of expensive queries in complex event processing. In SIGMOD.