<!DOCTYPE article PUBLIC "-//NLM//DTD JATS (Z39.96) Journal Archiving and Interchange DTD v1.0 20120330//EN" "JATS-archivearticle1.dtd">
<article xmlns:xlink="http://www.w3.org/1999/xlink">
  <front>
    <journal-meta />
    <article-meta>
      <title-group>
        <article-title>An optimistic approach to handle out-of-order events within analytical stream processing</article-title>
      </title-group>
      <contrib-group>
        <contrib contrib-type="author">
          <string-name>Igor E. Kuralenok</string-name>
        </contrib>
        <contrib contrib-type="author">
          <string-name>Nikita Marshalkin</string-name>
        </contrib>
        <contrib contrib-type="author">
          <string-name>Artem Trofimov</string-name>
        </contrib>
        <contrib contrib-type="author">
          <string-name>Boris Novikov</string-name>
        </contrib>
        <contrib contrib-type="author">
          <string-name>Boris Novikov</string-name>
        </contrib>
        <aff>JetBrains Research, Saint Petersburg, Russia. Emails: ikuralenok@gmail.com, marnikitta@gmail.com, trofimov.artem@gmail.com, borisnov@acm.org</aff>
      </contrib-group>
      <fpage>22</fpage>
      <lpage>29</lpage>
      <abstract>
        <p>In recent years, there has been a growth in research and industrial solutions in the field of distributed stream processing. However, even state-of-the-art stream processing systems experience difficulties with out-of-order data arrival. The most common solution to this issue is buffering. Its main problem is the extra cost of blocking before each order-sensitive operation. The goal of this paper is to propose and evaluate an optimistic approach to handling out-of-order events. We introduce a method that is suitable for any stateful operation and needs a single buffer for the complete computational pipeline. The technique requires extra network transfers and re-computations, but our experiments demonstrate that a prototype of our approach incurs low overhead while ensuring correct ordering.</p>
      </abstract>
    </article-meta>
  </front>
  <body>
    <sec id="sec-1">
      <title>I. INTRODUCTION</title>
      <p>
        Nowadays, a lot of real-life applications use stream
processing for network monitoring, financial analytics, training
machine learning models, etc. State-of-the-art industrial stream
processing systems, such as Flink [
        <xref ref-type="bibr" rid="ref1">1</xref>
        ], Samza [
        <xref ref-type="bibr" rid="ref2">2</xref>
        ], Storm [
        <xref ref-type="bibr" rid="ref3">3</xref>
        ],
Heron [
        <xref ref-type="bibr" rid="ref4">4</xref>
        ], are able to provide low latency and high throughput
in a distributed environment for these kinds of problems.
However, support for order-sensitive computations remains
suboptimal. Most of these systems assume that events are
fed to the system with monotonically increasing timestamps
or with minor inversions. Often, such timestamps can be
assigned at the system's entry. Nevertheless, even if input items
arrive monotonically, they can be reordered by the
subsequent parallel and asynchronous processing. In this case,
order-sensitive operations located deeper in the data flow pipeline
can break. Figure 1 shows an example of a common
distributed stream processing pipeline that breaks the input
order of operation 2, even though inputs are monotonic and
links between operations guarantee FIFO order.
      </p>
      <p>The typical way to achieve in-order processing is to set
up a special buffer in front of an operation. This buffer collects
all input items until some user-provided condition is satisfied.
Then the contents of the buffer are sorted and fed to the
operation. The main disadvantage of this technique is latency
growth. The issue becomes even more significant if the
processing pipeline contains several operations that require
ordered input, because each buffer adds waiting time to the
end-to-end latency.</p>
      <p>The alternative is to make the business logic tolerant to
out-of-order items. However, this approach suits only a
limited set of tasks. Moreover, it may dramatically complicate
the business logic, which increases maintenance costs and is
error-prone.</p>
      <p>In this paper we introduce an optimistic approach to handling
out-of-order items. Our evaluation demonstrates the low
overhead of the method. The contributions of this paper are the
following:
• Definition of a new optimistic technique for handling
out-of-order items in stateful operations that requires a single
buffer per computational pipeline
• Analysis of the properties of this approach
• Demonstration of a working example that applies the proposed
method</p>
      <p>The rest of the paper is structured as follows: in section II
we formalize the preliminaries of stream processing; the
examples of tasks that require ordered input are described in
section III; typical approaches to handling out-of-order
events are discussed in section IV; our optimistic technique is detailed
in section V and its performance is demonstrated in section VI; the main
differences between the proposed method and existing ones are
shown in section VII; finally, we discuss the results and our plans
in section VIII.</p>
    </sec>
    <sec id="sec-2">
      <title>II. STREAM PROCESSING CONCEPTS</title>
      <sec id="sec-2-1">
        <p>In this section we define some preliminaries of distributed
stream processing. This allows us to unify the required terms and
to introduce definitions that are needed for the subsequent
statements.</p>
        <p>In this paper a stream processing system is considered as
a shared-nothing distributed runtime. It handles input items
and processes them one by one according to user-provided
logic, and it is able to handle a potentially unlimited number of
items. The main requirement for this kind of data processing
system is to provide low latency between an event's occurrence
and its processing under a fixed load. The term distributed
implies that the user's procedures can be partitioned into distinct
computational units, or shards. The following subsections detail
the properties of such systems more precisely.</p>
      </sec>
      <sec id="sec-2-2">
        <title>A. Data flow</title>
        <p>The basic data flow abstraction is a stream. A stream
is an infinite sequence of heterogeneous data items. Each
data item contains a payload defined by a user. Besides, it
can be accompanied by meta-information. Typically,
meta-information is used to define an order on data items. For
instance, it can be represented as a UNIX timestamp in
milliseconds.</p>
      </sec>
      <sec id="sec-2-3">
        <title>B. Computation flow</title>
        <p>Commonly, the computational pipeline is defined in the
form of a logical graph. The vertices of the logical graph are
operations, and the edges are links between them. The logical
graph defines only the relations between operations; it does
not describe the physical deployment. The logical graph for
the pipeline shown in Figure 1 is presented in Figure 2.</p>
      </sec>
      <sec id="sec-2-4">
        <title>C. Operations</title>
        <p>There are two main types of streaming operations: stateless
and stateful. Stateless operations do not need any knowledge
about past inputs to process the current one correctly. A simple
illustration is a map operation that multiplies any
numeric input item's payload by two. On the other hand, stateful
operations keep some aggregations or summaries
of received events. In this case, the output of the operation
depends not only on the input but also on its current state. As
an example, one can define an operation that sums all previous
items with numerical payloads.</p>
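As an illustrative sketch (our own Python generators, not a system API), the two example operations above can be written as follows:

```python
# Illustrative sketch (not a system API): the map and the running-sum
# operations described above, written as Python generators.
def stateless_map(stream):
    for item in stream:
        yield item * 2          # output depends only on the current input

def stateful_sum(stream):
    total = 0                   # state accumulated across past inputs
    for item in stream:
        total += item
        yield total             # output depends on input and state

print(list(stateless_map([1, 2, 3])))  # [2, 4, 6]
print(list(stateful_sum([1, 2, 3])))   # [1, 3, 6]
```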
        <p>As mentioned above, each operation can be partitioned
across multiple computational units. For stateful operations, data items are
balanced between partitions by a key extracted from the item's
payload; for stateless operations, items
can be balanced randomly. The schema of physical partitioning
of operations is sometimes called the physical graph. Regarding
physical links between operations, in most cases it is
assumed that they guarantee FIFO order.</p>
      </sec>
      <sec id="sec-2-5">
        <title>E. Guarantees</title>
        <p>A recognized important property of a stream processing
system is the type of guarantees it provides in case of failures.
There are three main types of such guarantees. At most once
semantics states that each input event is processed once or
not processed at all. At least once guarantees that each input
item is processed, but possibly multiple times, which can lead to
result duplication. Exactly once semantics guarantees that each
input event is processed exactly one time.</p>
      </sec>
    </sec>
    <sec id="sec-3">
      <title>III. TASKS THAT REQUIRE IN-ORDER INPUT</title>
      <p>In this section we outline common problems that require
preserving the order of input items and describe a couple of
computation scenarios that can be found in many real-life
projects.</p>
      <sec id="sec-3-1">
        <title>A. Tasks requiring complete event retrieval</title>
        <p>The processing of a single event can be split into
multiple independent parts that are executed in parallel. After
execution finishes, the results must be combined into a single
cumulative item. This task can be naturally implemented
using order guarantees: the final part of the task can be
flagged, and receiving the flagged result guarantees that the
rest of the operation is completed. Unfortunately, as
shown in Figure 1, independent processing via different paths
can lead to reordering.</p>
        <p>As an example, we can mention the computation of an inverted
index. The pipeline shown in Figure 1 can be applied to this task.
In this case, operation 1 accepts documents from the input and
produces the corresponding positions for each word. Operation 2
receives pairs of word and positions and computes a changelog
of the inverted index for each word. In order to produce the
changes for each document in the form of a single update,
all changelogs regarding the document must be retrieved.</p>
      </sec>
      <sec id="sec-3-2">
        <title>B. Tasks that depend on the order of input events</title>
        <p>This class includes all non-commutative operations. Such
tasks strictly require ordered input, because there
is no other way to compute a valid result.</p>
        <p>Generally, this class includes all windowed
aggregations. Moving average calculation over a numerical
stream is a typical case. Even if values inside a window can
be arbitrarily reordered, the order between windows is required
to ensure that incomplete windows are not produced.</p>
      </sec>
    </sec>
    <sec id="sec-4">
      <title>IV. EXISTING SOLUTIONS</title>
      <p>There are two common methods that are used to implement order-sensitive operators: in-order processing (IOP) [5], [6], [7] and out-of-order processing (OOP) [8].</p>
      <sec id="sec-4-1">
        <title>A. In-order processing</title>
        <p>
          According to the IOP approach, each operation must enforce
a total order on output elements, which can be violated due
to the asynchronous nature of execution. Buffering is usually
used to fulfill this requirement. Figure 3 shows a union
operation that combines multiple streams into a single one.
Both input streams are ordered, as the predecessors must meet the
ordering constraint. Nevertheless, if there is arrival-time skew
between the input streams, the union must buffer the earlier stream
to produce ordered output. It is known that IOP is memory
demanding and has unpredictable latencies and limited
scalability [
          <xref ref-type="bibr" rid="ref8">8</xref>
          ].
        </p>
      </sec>
      <sec id="sec-4-2">
        <title>B. Out-of-order processing</title>
        <p>
          OOP is an approach that avoids order maintenance
when it is not needed. Where ordering is required, OOP
buffers input items until a special condition is satisfied. This
condition is supported by progress indicators such as
punctuations [
          <xref ref-type="bibr" rid="ref9">9</xref>
          ], low watermarks [
          <xref ref-type="bibr" rid="ref10">10</xref>
          ], or heartbeats [
          <xref ref-type="bibr" rid="ref11">11</xref>
          ]. They travel
along the stream like ordinary items but do not trigger the
business logic of the operations. Each progress indicator carries
meta-information and promises that there are no elements
with lesser meta-information behind it. Therefore, indicators must be
monotonic, but data items between two consecutive indicators
can be arbitrarily reordered. Data sources periodically yield
them.
        </p>
        <p>A timed window operation can be mentioned as an example
of the OOP approach. A window operation buffers partial results
until a progress indicator arrives. After that, the window
flushes the corresponding buffers and propagates the indicator to
the next operation down the stream.</p>
        <p>OOP addresses the downsides of IOP, but a direct
implementation has flaws too. Even if the input stream is totally
ordered, the operation must wait for the progress indicator.
Figure 4 illustrates such a case: the bottom window is complete
but must be blocked until the indicator for item 11 arrives.
Another issue of OOP is that periodic flushes can result in
load bursts and an increase in latency.</p>
      </sec>
    </sec>
    <sec id="sec-5">
      <title>V. OPTIMISTIC APPROACH</title>
      <p>The main idea of our method is to represent stateful
transformations as a combination of map and windowed grouping
operations and to handle out-of-order items within them.</p>
      <p>Our approach makes some assumptions about the
stream processing system it is applied to. Such a
system must support meta-information on data items, allow
cycles in the logical graph, and its set of predefined operations
must be sufficient to express map and windowed grouping
operations. Additionally, OOP indicators should be supported.</p>
      <p>The ordering model of data items is defined at the beginning
of this section. Then, we show that any stateful transformation
can be implemented using the combination of windowed
grouping and map operations. After that, we demonstrate an
optimistic approach to handle out-of-order items within these
operations. At the end of the section, the limitations of such
technique are discussed.</p>
      <sec id="sec-5-1">
        <title>A. Ordering model</title>
        <p>We assume that there is a total order on data items.
If there are multiple data sources and there is no natural
ordering between them, the source id can be used in addition
to the locally assigned timestamp. Items are compared
lexicographically: timestamps first, then source ids. Ordering
is preserved when an item goes through an operation.
More precisely, the order of output items is the same as the
order of the corresponding input items. If more than one item
is generated, they are inserted into the output stream sequentially.
Moreover, the output follows the corresponding input but precedes
the next item. The ordering model is shown in Figure 5, where F(x)
is an arbitrary transformation. Replication and union are used
to inject the original unmodified items into the resulting stream to
show the order between items.</p>
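A minimal sketch of this comparison (the field names are our own): Python tuples already compare lexicographically, so a (timestamp, source id) pair can serve directly as the global ordering key.

```python
from typing import NamedTuple

class Meta(NamedTuple):
    timestamp: int   # locally assigned timestamp, e.g. UNIX millis
    source_id: int   # breaks ties between independent sources

# NamedTuple comparison is lexicographic: timestamp first, then source id.
items = [Meta(100, 2), Meta(100, 1), Meta(99, 5)]
print(sorted(items))  # [Meta(99, 5), Meta(100, 1), Meta(100, 2)]
```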
      </sec>
      <sec id="sec-5-1b">
        <title>B. Semantics of map and windowed grouping operations</title>
        <p>1) Map: Map transforms an input item into a sequence of
derivatives, according to a user-provided function f. This
sequence can consist of any number of items or even be empty.</p>
        <p>2) Windowed grouping: Windowed grouping is a stateful
operation with a numeric parameter window. It is supposed
that the payloads of the input items of the grouping operation have
key-value form. The state of this operation is represented by a set
of buckets, one for each key.</p>
        <p>Windowed grouping has the following semantics: on the arrival of each input item, the item is appended to the bucket determined by the hash of its key, and a tuple of the last window items of that bucket is emitted.</p>
        <p>The following example illustrates the semantics of the
windowed grouping operation. In this example, payloads of
input items are represented as natural numbers: 1, 2, 3, etc. The
hash function returns 1 if the number is even and 0 otherwise.
If the window is set to 3, the output is:</p>
        <p>(1), (2), (1|3), (2|4), (1|3|5), (2|4|6), (3|5|7), (4|6|8)...</p>
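The semantics above can be sketched in a few lines (our own Python, not a system API); run on the example input it reproduces the listed output:

```python
from collections import defaultdict

# Sketch of windowed grouping: append each item to its key's bucket
# and emit a tuple of the last `window` items of that bucket.
def windowed_grouping(stream, hash_fn, window):
    buckets = defaultdict(list)
    for item in stream:
        bucket = buckets[hash_fn(item)]
        bucket.append(item)
        yield tuple(bucket[-window:])

# hash returns 1 if the number is even and 0 otherwise; window = 3
out = list(windowed_grouping(range(1, 9), lambda x: 1 - x % 2, 3))
print(out)
# [(1,), (2,), (1, 3), (2, 4), (1, 3, 5), (2, 4, 6), (3, 5, 7), (4, 6, 8)]
```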
        <p>The special case of grouping with window = 2 in
conjunction with a stateless map is used to implement arbitrary
stateful transformation.</p>
      </sec>
      <sec id="sec-5-2">
        <title>C. Stateful transformations using defined operations</title>
        <p>Figure 6 shows the part of the logical pipeline that can
be used for a stateful transformation. The input of the windowed
grouping operation is supposed to be ordered. Several
functional steps produce output and update the state. There
are the following cases of these steps:</p>
        <p>• When the first item arrives at the grouping, it is inserted
into an empty bucket. The grouping outputs a
single-element tuple, which is then sent to the combine map.
The combine map generates a state object and sends it back
to the grouping in the form of an ordinary key-value data
item. The key of the state item is the same as in the
item in the tuple, and the value is the state. The combine map can
generate some additional output and send it further down
the stream.</p>
        <p>• When the state item arrives at the grouping, it is inserted at
the tail of the corresponding bucket, after the item that
triggered the state update. The ordering model guarantees
that the state item is processed before the next
item. The grouping outputs a tuple with this item and the state.
However, the combine map filters out such a tuple, because its
last element is the state, which implies that the state
has already been combined with the first item in the tuple.</p>
        <p>• When a new regular input item arrives at the windowed
grouping, it is inserted at the corresponding bucket's tail,
because of the order assumptions. Additionally, the right
ordering guarantees that the input item is grouped into a
tuple with the previously generated state item. The next map
operation combines the new item and the previous state into a
new state item. After that, the new state item is returned
to the grouping through the cycle. As in the first case, the
combine map can generate some additional output.</p>
        <p>An example of square matrix multiplication within the
proposed approach is shown in Figure 7. In this example, input
items are represented as key-value pairs, where the key is the
dimension of a matrix, and the value is the matrix itself. The
reactions to three input matrices are the following:</p>
        <p>• When the first matrix A arrives at the grouping, it is put
into the empty bucket for 3x3 matrices. After that, the
single-element tuple with matrix A is sent to the combine map
operation. The combine map creates a state object for matrix
A, which is just A itself. In the last step, the state item is sent
back to the grouping, where it is inserted right after the item for
matrix A.</p>
        <p>• Matrix B arrives and is inserted into the bucket right
after the state item. The tuple containing the state item and the item
for matrix B is sent to the combine map, which
multiplies the matrix in the state by matrix B. The result of
this operation is matrix AB. A new state item for matrix AB
is created and sent back to the grouping, where it is inserted in the
bucket right after the item with matrix B.</p>
        <p>• Matrix C arrives and goes through the pipeline in a
similar way as matrix B.</p>
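The walk-through above can be simulated in a few lines (our own simplification: a single key, one bucket, and the feedback cycle inlined into a loop):

```python
# Simulation of the grouping / combine-map cycle for matrix products:
# window = 2 pairs each new matrix with the previous state item.
def matmul(x, y):
    n = len(x)
    return tuple(tuple(sum(x[i][k] * y[k][j] for k in range(n))
                       for j in range(n)) for i in range(n))

def stateful_product(matrices, matmul):
    bucket = []                        # the grouping bucket for this key
    state = None
    for m in matrices:
        bucket.append(m)               # regular item arrives at grouping
        tup = tuple(bucket[-2:])       # windowed grouping, window = 2
        # combine map: first item becomes the state, otherwise multiply
        state = m if len(tup) == 1 else matmul(tup[0], tup[1])
        bucket.append(state)           # state item fed back via the cycle
    return state

I = ((1, 0), (0, 1))
S = ((2, 0), (0, 2))
print(stateful_product([I, S, S], matmul))  # ((4, 0), (0, 4))
```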
      </sec>
      <sec id="sec-5-3">
        <title>D. Handling out-of-order events</title>
        <p>When we introduced the model for stateful operations, we
assumed that all items arrive at the grouping in the right order.
However, as shown above, this is not possible in
practice without additional buffering. We propose the following
approach to handling events in the grouping:</p>
        <p>• If an item arrives in order, it is processed as usual</p>
        <p>• If two items are out of order and the grouping observes
the second one, it is inserted into the corresponding
bucket at the position defined by the meta-information.
After that, the tuples that contain the new item are generated
and sent further down the stream. At the same time,
tombstones are sent for the tuples that have been produced
but became invalid</p>
        <p>Tombstones are ordinary data items but with a special flag in their
meta-information. This flag means that tuples with such
meta-information are invalid, and they should not leave the system.
Tombstones have the same payloads as the invalid items in order
to go through the same path in the computational pipeline.</p>
        <p>The pseudocode of the grouping is shown in Algorithm 2.
The functions accept a new element, depending on its type. They
also receive the bucket for the element's hash. The Emit function is called
to send new items downstream.</p>
        <p>This technique guarantees that all correct tuples are
eventually produced. However, invalid ones are also generated.

function InsertTombstone(item, bucket)
    cleanPosition ← lowerBound(item, bucket)
    for all group : group ∩ cleanPosition ≠ ∅ do
        ▷ Send tombstones for invalid groups
        Emit(new DataItem_tomb(group))
    end for
    Remove(bucket, cleanPosition)
    for all group : group ∩ cleanPosition ≠ ∅ do
        ▷ Emit new groups that appeared after the collapse
        Emit(new DataItem(group))
    end for
end function

function InsertRegular(item, bucket)
    insertPosition ← lowerBound(item, bucket)
    for all group : group ∩ insertPosition ≠ ∅ do
        ▷ Send tombstones for groups that would disappear after the insert
        Emit(new DataItem_tomb(group))
    end for
    Insert(bucket, insertPosition)
    for all group : group ∩ insertPosition ≠ ∅ do
        ▷ Emit new groups
        Emit(new DataItem(group))
    end for
end function
Therefore, a barrier is needed at the pipeline's sink that
filters invalid items when the corresponding tombstones arrive.
The barrier is partially flushed for some meta-information
interval when there is a guarantee that there are no
out-of-order items or tombstones further up the stream for this
range. This guarantee can be provided by punctuations or low
watermarks, as implemented in most stream processing
systems. The fundamental idea behind this approach is to shift
blocking as far down the stream as possible. Notably, this is
the only buffer in the whole system, unlike in existing solutions.</p>
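The out-of-order insert of the grouping can be simulated as follows (our own code, not the authors' implementation; for brevity it recomputes the tuples that overlap the insert position rather than tracking them incrementally):

```python
import bisect

def tuples_at(bucket, window):
    """All tuples the windowed grouping would have emitted for `bucket`."""
    return [tuple(bucket[max(0, i - window + 1):i + 1])
            for i in range(len(bucket))]

def insert_regular(item, bucket, window, emit):
    """Optimistic out-of-order insert: put `item` at its ordered position,
    retract (tombstone) the tuples it invalidates, emit the replacements."""
    before = set(tuples_at(bucket, window))
    bisect.insort(bucket, item)          # lowerBound + insert
    after = set(tuples_at(bucket, window))
    for group in before - after:
        emit(('tombstone', group))       # retract invalidated tuples
    for group in after - before:
        emit(('data', group))            # newly valid tuples
```

For example, with window = 3 and bucket [1, 3, 5], a late item 2 retracts (1|3) and (1|3|5) and emits (1|2), (1|2|3), and (2|3|5).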
        <p>
          The pseudocode for the barrier is shown in Algorithm 3.
The Insert function is called by the system on a new item's
arrival. Punctuation is called when there is a guarantee that
there are no tombstones up the stream with the specified
time. The OOP architecture [
          <xref ref-type="bibr" rid="ref8">8</xref>
          ] can be employed to provide
such guarantee.
        </p>
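A hedged sketch of such a barrier (our own code and names, not the paper's Algorithm 3): items are buffered in meta-information order, a tombstone cancels its matching buffered item, and a punctuation releases the guaranteed prefix in order.

```python
import bisect

class Barrier:
    """Sink-side barrier: the single buffer of the whole pipeline."""
    def __init__(self, emit):
        self.buffer = []          # sorted list of (time, payload)
        self.emit = emit

    def insert(self, time, payload, tombstone=False):
        key = (time, payload)
        if tombstone:
            # A tombstone carries the same payload as the item it cancels.
            i = bisect.bisect_left(self.buffer, key)
            if i < len(self.buffer) and self.buffer[i] == key:
                del self.buffer[i]
        else:
            bisect.insort(self.buffer, key)

    def punctuation(self, time):
        # No out-of-order items or tombstones with time <= `time` remain
        # upstream, so this prefix can be released in order.
        while self.buffer and self.buffer[0][0] <= time:
            self.emit(self.buffer.pop(0))
```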
      </sec>
      <sec id="sec-5-4">
        <title>E. Advantages and limitations</title>
        <p>The proposed architecture's performance depends on how
often reorderings are observed at runtime. In the
case when the order is naturally preserved, there is almost no
overhead: when the watermark arrives, all computations are
already done. The probability of reordering can be managed
at the business-logic level and optimized by the developer. The
experiments section shows that the computational node
count is one such parameter.</p>
        <p>[Algorithm 3: Barrier]</p>
        <p>Regarding the weaknesses, this method can generate
additional items, which leads to extra network traffic and
computation. The experiments shown in section VI
demonstrate that the number of extra items is low.</p>
      </sec>
    </sec>
    <sec id="sec-6">
      <title>VI. EXPERIMENTS</title>
      <sec id="sec-6-1">
        <title>A. Setup</title>
        <p>We performed a series of experiments to estimate the
performance of our system prototype. As the stream processing
task, we chose building an inverted index, because
it has the following properties:
1) The task requires stateful operations, which allows us to check
the performance of the proposed stateful pipeline
2) The computational flow of the task contains a network shuffle
that can violate the ordering constraints of some
operations; therefore, the inverted index task can verify the
performance of our optimistic approach
3) The load distribution is skewed, because of Zipf's law
These properties make the task sufficient to
comprehensively analyze the performance of the proposed solution.
Building an inverted index can be considered as a halfway
task between document generation and searching. In the
real world, such a scenario can be found in freshness-aware systems,
e.g., news processing engines.</p>
        <p>The logical pipeline of this computation is shown in Figure
8. The first map operation accepts Wikipedia documents and
outputs pairs of words and corresponding positions. The next
part of the pipeline accepts pairs of word and positions and
computes the updated posting list and the actual changelog.</p>
        <p>This stateful transformation is implemented in the form of
grouping and map operations with a cycle, as shown
in the previous section. Regarding the physical deployment,
the full logical graph is deployed on each computational unit,
or worker. Documents are randomly shuffled before the first
map operation. Word positions are partitioned by word before
the grouping. The other links are implemented as simple chained
calls.</p>
        <p>Our experiments were performed on a cluster of 10 nodes.
Each node is an AWS EC2 micro instance with 1 GB of RAM
and a 1-core CPU.</p>
      </sec>
      <sec id="sec-6-2">
        <title>B. Overhead and scalability</title>
        <p>As the key metric in our experiments, we take the ratio of
the number of items that arrive at the barrier to the number of valid
items among them. This value directly represents the overhead
of our approach, as mentioned at the end of the previous
section.</p>
        <p>The relation between the number of workers, the delay
between input documents, and the proposed ratio is shown
in Figure 9. As expected, the peak of the ratio is reached
when the documents-per-second rate is high and the number of
nodes is low. This behavior can be explained by the fact
that a few workers cannot effectively deal with such an intensive
load. Nevertheless, the proportion of invalid items decreases as
the number of workers grows. Under non-extreme load, the
total overhead of the optimistic approach is under 10% for
all considered numbers of workers. These results confirm that
the ratio does not increase with the growth of the number of
nodes.</p>
        <p>Therefore, the most important conclusions of the
experiments are: the proposed method is scalable, and the overhead
can be reduced by tuning the system setup.</p>
      </sec>
    </sec>
    <sec id="sec-7">
      <title>VII. RELATED WORK</title>
      <p>Research works on this topic analyze different methods of handling out-of-order items. Most of them are based on buffering.</p>
      <p>
        The K-slack technique can be applied if the network delay is
predictable [
        <xref ref-type="bibr" rid="ref12">12</xref>
        ], [
        <xref ref-type="bibr" rid="ref13">13</xref>
        ]. The key idea of the method is the
assumption that an event can be delayed by at most K time
units. Such an assumption can bound the size of the buffer.
However, in real-life applications, it is very uncommon
to have any reliable prediction of the network delay.
      </p>
      <p>
        The IOP and OOP architectures mentioned in
section IV are popular in research works and industrial
applications. The IOP architecture is applied in [
        <xref ref-type="bibr" rid="ref6">6</xref>
        ], [
        <xref ref-type="bibr" rid="ref14">14</xref>
        ], [
        <xref ref-type="bibr" rid="ref5">5</xref>
        ], [
        <xref ref-type="bibr" rid="ref15">15</xref>
        ],
[16], [17]. The OOP approach was introduced in [
        <xref ref-type="bibr" rid="ref8">8</xref>
          ] and is widely
used in industrial stream processing systems, for instance,
Flink [
        <xref ref-type="bibr" rid="ref1">1</xref>
        ] and Millwheel [
        <xref ref-type="bibr" rid="ref10">10</xref>
        ].
      </p>
      <p>Optimistic techniques are less covered in the literature. In [18]
a so-called aggressive approach is proposed. This approach
uses the idea that an operation can immediately output an
insertion message on the first input. After that, if that message
becomes invalid because of the arrival of out-of-order items,
the operation can send a deletion message to remove the previous
result and then send a new insertion item. The idea of deletion
messages is very similar to our tombstone items. However, the
authors describe their idea in an abstract way and do not
provide any technique for applying their method to arbitrary
operations.</p>
      <p>Yet another optimistic strategy is detailed in [19]. This
method is probabilistic: it guarantees the right order only with some
probability. Besides, it supports only a limited number of
query operators.</p>
    </sec>
    <sec id="sec-8">
      <title>VIII. CONCLUSION</title>
      <p>In this paper we introduced an optimistic approach to
handling out-of-order events. Our technique has the following
key properties:
• It does not require buffering before each order-sensitive
operation
• The method properly handles any stateful operation
• The overhead of the proposed approach is low (under
10% in most of our experiments)
• The total overhead can be managed by optimizing
the computational layout</p>
      <p>The optimistic nature of this method helps to
reduce the cost of waiting for punctuations or watermarks,
because at the moment when the watermark
arrives, all computations are already done.</p>
      <p>The experiments show that the number of extra items
does not increase with the growth of the number of
computational units. Therefore, this approach can potentially
provide lower latency in stream processing systems.</p>
      <p>[16] M. A. Hammad, M. J. Franklin, W. G. Aref, and A. K. Elmagarmid, “Scheduling for shared window joins over data streams,” in Proceedings of the 29th International Conference on Very Large Data Bases - Volume 29, ser. VLDB ’03. VLDB Endowment, 2003, pp. 297–308. [Online]. Available: http://dl.acm.org/citation.cfm?id=1315451.1315478</p>
      <p>[17] M. A. Hammad, W. G. Aref, and A. K. Elmagarmid, “Optimizing in-order execution of continuous queries over streamed sensor data,” in Proceedings of the 17th International Conference on Scientific and Statistical Database Management, ser. SSDBM’2005. Berkeley, CA, US: Lawrence Berkeley Laboratory, 2005, pp. 143–146. [Online]. Available: http://dl.acm.org/citation.cfm?id=1116877.1116897</p>
      <p>[18] M. Wei, M. Liu, M. Li, D. Golovnya, E. A. Rundensteiner, and K. Claypool, “Supporting a spectrum of out-of-order event processing technologies: From aggressive to conservative methodologies,” in Proc. of the 2009 ACM SIGMOD Intl. Conf. on Management of Data, ser. SIGMOD ’09. New York, NY, USA: ACM, 2009, pp. 1031–1034.</p>
      <p>[19] C.-W. Li, Y. Gu, G. Yu, and B. Hong, “Aggressive complex event processing with confidence over out-of-order streams,” Journal of Computer Science and Technology, vol. 26, no. 4, pp. 685–696, Jul 2011. [Online]. Available: https://doi.org/10.1007/s11390-011-1168-x</p>
    </sec>
  </body>
  <back>
  </back>
</article>