Time Series Similarity Search for Streaming Data in Distributed Systems

Ariane Ziehn, DFKI GmbH, Germany, ariane.ziehn@dfki.de
Marcela Charfuelan, DFKI GmbH, Germany, marcela.charfuelan@dfki.de
Holmer Hemsen, DFKI GmbH, Germany, holmer.hemsen@dfki.de
Volker Markl, DFKI GmbH and TU Berlin, Germany, volker.markl@dfki.de
ABSTRACT
In this paper we propose a practical study and demonstration of time series similarity search in modern distributed data processing platforms for stream data. After an intensive literature review, we implement a flexible similarity search application in Apache Flink, which includes the most commonly used distance measurements: Euclidean distance and Dynamic Time Warping. For efficient and accurate similarity search we evaluate normalization and pruning techniques developed for single-machine processing and demonstrate that they can be adapted and leveraged for those distributed platforms. Our final implementation is capable of monitoring many time series in real time and in parallel. Further, we demonstrate that the number of required parameters can be reduced and optimally derived from data properties. We evaluate our application by comparing its performance with electrocardiogram data on a cluster with several nodes. We reach average response times of less than 0.1 ms for windows of 2 s of data, which allow fast reactions to matching sequences.

© 2019 Copyright held by the author(s). Published in the Workshop Proceedings of the EDBT/ICDT 2019 Joint Conference (March 26, 2019, Lisbon, Portugal) on CEUR-WS.org.

1 INTRODUCTION
Rakthanmanon et al. [18] demonstrate that they can find a given pattern in a day-long electrocardiogram (ECG) tracing in 35 s under Dynamic Time Warping (DTW). Their implementation runs on a single machine and the data is a finite batch set that is completely stored. Batch data sets and single machines enable the programmer to apply traditional optimization methods to the algorithms, but nowadays batch data is often replaced by data streams. In order to handle the increasing amount and complex properties of data streams, open source frameworks for distributed stream data processing (DSPS) have appeared. Yet, many traditional optimization techniques cannot be directly applied to process data streams in distributed systems. Can we still efficiently find frequent patterns in new real-life situations, where the data is not stored but streamed? And how can we leverage traditional optimization methods in modern DSPS?

In this paper we answer these questions and demonstrate that we can match ECG patterns with high accuracy in data streams with an average response time of less than 0.1 ms per window by using modern DSPS. In particular, we employ the DSPS Apache Flink [4], which can handle massive amounts of continuous data and process them in real time with low latency. Besides the additional monitoring effort of distributed systems, which is covered by the frameworks, two other aspects need special attention when developing streaming applications on distributed systems: ordering and partitioning. These aspects are of particular importance for time series processing of stream data in parallel jobs. Current approaches to distance calculation between two time series assume that the data is always ordered according to the time it is produced [1, 12]. This cannot be assumed in distributed systems, where data can reach the system with delays or, in the worst case, samples might be lost. Additionally, the usage of a distributed stream processing approach implies partitioning of the task into independent subtasks for distributed and parallel computing.

In the remainder of this paper, we first present the state of the art by exploring related work in Section 2. Afterwards, the basic concepts for similarity search are summarized in Section 3. The necessary edits to the traditional DTW algorithm for its usage in distributed systems are explained in Section 4, followed by the experiments conducted in this work in Section 5. The conclusion in Section 6 sums up our achievements.
2 RELATED WORK
As for many modern big data analytics applications, the underlying techniques used in this paper concern several fields. The fields of main interest for our application are time series analysis, similarity search, data streams and distributed systems. We rarely find related work that covers all four fields. One close example is [7], where time series correlation analysis is performed in the DSPS Apache Storm [5]. [7] faces similar problems as we do, due to the usage of a DSPS for time series analysis, but considers a different analysis task. A second example is a naive distributed implementation [19] that duplicates the data to all cluster nodes. On each node, the data is matched to one of the multiple patterns. The implementation uses no DSPS, and multiple shuffles of the same data to several workers cause heavy network traffic if multiple streams are analyzed. Thus, the techniques of [19] cannot be considered for our approach.

In fact, several works already attempt time series processing and/or similarity search in modern DSPS, but most of them only deal with batch instead of stream processing [1, 8]. Still, these works need to handle distribution and present adapted techniques for time series processing that we can use for our application.

Regarding similarity search in streaming on single machines, the exact matching of patterns is first tried on-line in a streaming environment in [14]. The main focus of this approach is not only the pattern matching but also its maintenance (as the pattern might change over time). The conclusion of this work is that both tasks, matching and maintenance, are feasible and can be implemented efficiently in streaming environments, and thus they are considered for our approach.

Another common practice is the development and optimization of batch processing algorithms for single machines [12, 15, 16]. But single machines have limited scalability in terms of memory and execution power, which makes them cumbersome for real-life, large-scale streaming applications. Thus, these approaches either do not require low-latency responses [18] or make use of specialized hardware and/or hardware optimization to guarantee high performance [10, 11, 13, 20]. Example optimizations that can be applied to DTW are independent matrix cell computation [20] or combined normalization and distance calculations [18]. Many optimizations proposed for single machines cannot be efficiently implemented or lead to processing delays in distributed systems, e.g. due to missing shared memory. Therefore, only a few selected techniques can be considered for our application.

A detailed overview of single-machine approaches and implementations for similarity search in time series analysis is presented in [16]. This work offers a deeper insight into common search and matching methods combined with different optimizations proposed until 2011.

Similarities are matched and weighted by the usage of distance measurements, which rank how similar the compared sequences are. Besides the classic Euclidean distance, DTW is an often recommended measurement for similarity search [3, 15, 16, 18]. Already in 1994, [2] illustrated the usage of DTW for time series, which was previously used for speech recognition. Berndt and Clifford [2] explain the algorithm and suggest further optimization techniques. The most powerful pruning approach for DTW is implemented in [18], where Rakthanmanon et al. make intensive usage of a hierarchical lower bound pruning approach with increasing computational expense to prevent unnecessarily expensive full DTW calculations. This pruning approach is mainly considered for our implementation.

3 SIMILARITY SEARCH
Similarity search is a basic subtask for time series analytics such as classification and clustering. Let us consider the ECG time series example [17] in Figure 1. In this figure, the red pattern (Q) is matched twice with sequences of the blue time series. To search for patterns in a long time series, it is usual to compare the pattern against subsequences of the series with similar length. From now on we will refer to these subsequences as S. Similarity search can refer either to the comparison of sequences in their shape or in their values; in time series processing, shape comparison is commonly the focus [1]. Due to the fact that the pattern is not necessarily taken from the series, the scales of both might be different (see the value ranges in Figure 1). The scale difference needs to be handled by normalization of each individual sequence before attempting any meaningful shape comparison.

Figure 1: Search for Q (red) in an ECG time series (blue)

Distances play an important role in similarity search. A classic one-to-one point distance is the Euclidean distance. In comparison to the one-to-many point distance DTW, the Euclidean distance is inflexible in handling shifts in time or amplitude. The different behavior is visualized in Figure 2, where dark squares indicate the point-to-point comparison of the Euclidean distance and circles symbolize the DTW warping path. Further, a distance value is denoted as D(Q, S) for the cumulative distance between the pattern Q and the sequence S. For the cumulative distance over all ancestors up to the two points qi and sj, where i and j indicate the index of each point in Q and S, the notation d̂(qi, sj) is used. The ancestor dependencies and the distance path through the matrix can also be seen in Figure 2. The final output is the remaining minimal DTW(Q, S). Keeping in mind that the Euclidean distance (ED) is nothing other than a special case of DTW with |Q| = |S|, both distances can be related with the triangle inequality, which states that their final results have the following property, proven in [15]:

    DTW(S, Q) ≤ ED(S, Q)

Figure 2: Distance matrix of DTW

The DTW distance matrix (Figure 2) is derived incrementally row by row because of the dependencies within the cells. Its calculation is the most expensive task of DTW [20], which is why heavy pruning and path restriction strategies are applied to reduce computation. In order to reduce the size of our DTW distance matrix, we use the Sakoe-Chiba band [2, 18]. This warping band restricts the warping path so that only cells that are at most a certain number of cells away from the diagonal are considered. Similar to the experiments made in [3], the size of our band is set, by default, to 10% of the query length |Q| and is further denoted as wwarp.
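To make the banded computation above concrete, the following is a minimal sketch (not the authors' code) of a DTW distance restricted to a Sakoe-Chiba band of width wwarp, using squared point distances and a best-so-far threshold for early abandoning; all method and variable names are our own.

```java
/**
 * Minimal sketch of DTW with a Sakoe-Chiba band, assuming both inputs are already
 * z-normalized. Squared point distances are used (no square root), and the computation
 * is abandoned once every cell of a row exceeds the best-so-far threshold
 * (cf. the pruning techniques described later in Section 4.3). Illustrative names only.
 */
static double bandedDtw(double[] q, double[] s, int wWarp, double bestSoFar) {
    int n = q.length, m = s.length;
    int band = Math.max(wWarp, Math.abs(n - m)); // band must at least cover the length difference
    double[] prev = new double[m + 1];
    double[] curr = new double[m + 1];
    java.util.Arrays.fill(prev, Double.POSITIVE_INFINITY);
    prev[0] = 0.0;
    for (int i = 1; i <= n; i++) {
        java.util.Arrays.fill(curr, Double.POSITIVE_INFINITY);
        int jStart = Math.max(1, i - band);
        int jEnd   = Math.min(m, i + band);
        double rowMin = Double.POSITIVE_INFINITY;
        for (int j = jStart; j <= jEnd; j++) {
            double d = q[i - 1] - s[j - 1];
            double cost = d * d
                        + Math.min(prev[j],          // path from the cell above
                          Math.min(prev[j - 1],      // path from the diagonal
                                   curr[j - 1]));    // path from the cell to the left
            curr[j] = cost;
            rowMin = Math.min(rowMin, cost);
        }
        if (rowMin >= bestSoFar) {
            return Double.POSITIVE_INFINITY;         // whole matrix pruned: no path can still win
        }
        double[] tmp = prev; prev = curr; curr = tmp; // reuse the two rows
    }
    return prev[m];                                  // cumulative (squared) DTW distance
}
```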
Discovery Methods: Similarity search considers two discovery methods. The first one is Best Match Discovery. This method searches for the sequence most similar to the given pattern, the so-called best match [20]. For monitoring tasks in stream processing, Best Match Discovery is insufficient, as once a global minimum is derived, all other sequences are pruned. The second discovery method is Motif Discovery. Motif Discovery is the mining of a repeated pattern in a time series. According to [6, 15, 18], searching for repetitions of patterns in time series streams is more meaningful than Best Match Discovery, as streams are infinite and many similar patterns are expected to occur. In contrast to Best Match Discovery, Motif Discovery does not only deliver the best match, but also several similar sequences. Thus, it gives a deeper insight into the data [12], meaning that the stream is continuously searched and monitored.

Windows and Streams: When performing time series similarity search on infinite streams, it is common to consider sliding windows to analyze the data. Each window contains a sequence of the data. By sliding over the series, overlaps of the data are considered to prevent a pattern from being cut. Modern DSPS handle streams in exactly the same way. Thus, the parameters for partitioning the data into sequences can be related. First of all, from similarity search we know that the pattern and the sequences should be of similar length. As DTW allows stretching and compression of the sequence [2], a minimal (|Smin|) and maximal sequence length (|Smax|) can be defined, considering that:

    |Smin| ≤ |Q| ≤ |Smax|

Thus, in order to calculate the entire DTW distance matrix in one window, the sliding window size should be equal to |Smax|. In the window, we can create several subsequences (Si) which fulfill the following equation:

    |Smax − Si| ≥ |Smin|

Therefore, |Smin| is the perfect overlap size, which still ensures a highly accurate result. To slide over the time series, it needs to be divided into consecutive sequences. DSPS allow the user to collect data of a certain time range in one window. Nevertheless, the window content is not guaranteed to be in order. Thus, in cases where order is required, sorting by time needs to be applied to ensure a meaningful comparison.
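As an illustration of how such overlapping sequences can be formed in a DSPS, the following is a minimal Apache Flink sketch (ours, not the paper's code) using a keyed count-based sliding window of |Smax| points with a slide of |Smax| − |Smin| points, so that consecutive windows overlap by |Smin|. The record type EcgEvent, the source EcgSourceFunction and the window function SimilaritySearchFunction are hypothetical placeholders; the concrete sizes correspond to the defaults derived later in Section 4.2 for the data of Section 5.1.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PipelineSketch {

    /** Hypothetical record type; the paper does not fix a schema for the input stream. */
    public static class EcgEvent {
        public String patientId;
        public long   timestamp;
        public double value;
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        int sMax  = 463;          // |Smax|: window size in data points (cf. Sections 4.2 and 5.1)
        int sMin  = 379;          // |Smin|: required overlap between consecutive windows
        int slide = sMax - sMin;  // points between window starts (84 points = 336 ms at 250 points/s)

        // EcgSourceFunction is a hypothetical SourceFunction<EcgEvent>, e.g. a Kafka or socket source.
        DataStream<EcgEvent> ecg = env.addSource(new EcgSourceFunction());

        ecg.keyBy(e -> e.patientId)                  // one logical stream per monitored patient
           .countWindow(sMax, slide)                 // sliding window of |Smax| points, overlap |Smin|
           .process(new SimilaritySearchFunction())  // hypothetical: sort by time, normalize, prune, banded DTW
           .print();

        env.execute("Distributed DTW similarity search (sketch)");
    }
}
```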
4 DISTRIBUTED DYNAMIC TIME WARPING
Usually, pattern mining is a data discovery task performed at a stage where users have little knowledge about the data. Therefore, our implementation allows the user to start the search without setting DSPS or similarity search parameters. The initial pruning parameters are derived when the processing starts and are continuously updated whenever a new sequence, more similar to the pattern, is detected. If further knowledge about the data exists, parameters can also be set to narrow the search space. One feature of DSPS is keying, which automatically groups the data by assigned labels, e.g. each monitored patient gets his own key. Another powerful feature for similarity search is the global state, which can be accessed to get and update non-static parameters such as the maximal distance. Indeed, this global state allows faster pruning on all workers by communicating the best-so-far distance result and thus avoiding unnecessary calculations of unpromising subsequences. A further advantage of this state is that it is key-based and thus able to maintain different thresholds for multiple time series [4], e.g. a specific threshold for each monitored patient.

4.1 Normalization
As mentioned before, patterns and sequences might appear in different scales (see Figure 1). Therefore, normalization is required. In streaming we can apply normalization per sliding window. The most accurate solution would be a slide size equal to one and a window size equal to the pattern size. Such an accurate normalization is optimized on single machines by leveraging previous calculations. As workers in distributed systems do not have easy access to previous results, our strategy is to extend the window to Smax and trigger a new one after an optimal slide size. This strategy not only reduces the number of windows but also allows the implementation of several pruning techniques (see Figure 3). Besides the overall computation reduction, this might affect accuracy. The reason for the accuracy loss is that, due to the window extension, additional data points are considered. If those data points are outliers, they influence the transformation of all window observations.

Figure 3: Overview of DTW Parameters

4.2 Window Parameters
So far, we have derived a relationship between window and similarity search parameters, the query length |Q| and the warping band restriction wwarp, but no optimal or recommended sizes for Smax and Smin have been defined.

We extend the window size from |Q| to |Smax| to allow the stretching of our sequence. Still, there are two main reasons why the window size should be kept small: its negative impact on the normalization accuracy (Section 4.1) and the fact that DTW is a cumulative distance. This leads to the assumption that overly long sequences can hardly give better results than shorter ones. DTW is able to overcome large differences by leaving the diagonal path by at most wwarp cells. Still, longer sequences with lower accumulated distances will not be a common case [16]. Restricting |Smax| is a performance optimization, as each additional point adds a further row to the DTW matrix. For all those reasons, Smax is set by default to the following length, and due to the warping band relation we can define Smin in a similar way:

    |Smax| = |Q| + wwarp    and    |Smin| = |Q| − wwarp

On the basis of |Q| and wwarp, all parameters of our implementation can be derived automatically for DTW and its processing in a distributed system. They can also be set manually to adjust the search according to special requirements. The derived window parameters are in accordance with the suggestions made in [16]:

    |Smax| ≤ 2 ∗ |Q|    and    |Smin| ≥ |Q| / 2

One of the advantages of distributed systems is that the sliding windows can be distributed among several workers and therefore take advantage of parallelization. Once a worker has received its window to be processed, we can apply traditional optimization techniques as is done in single-machine processing.
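As a concrete illustration of these defaults, the following sketch (ours, with illustrative names) derives all parameters from the query length alone and z-normalizes one window independently. The query length of 421 points used in the comment is an assumption: it is not stated explicitly in the paper, but together with the 10% band it reproduces the window size of 463 points and the slide of 336 ms reported in Section 5.1.

```java
/** Derive the DTW and window parameters from the query length |Q| alone (defaults of Section 4.2). */
static int[] deriveParameters(int qLen) {
    int wWarp = (int) Math.round(0.10 * qLen);   // Sakoe-Chiba band: 10% of |Q| (Section 3)
    int sMax  = qLen + wWarp;                    // maximal sequence length = window size
    int sMin  = qLen - wWarp;                    // minimal sequence length = window overlap
    int slide = sMax - sMin;                     // = 2 * wWarp points between window starts
    return new int[] {wWarp, sMax, sMin, slide};
}

/** Z-normalize one window independently, since previous windows are not accessible on other workers. */
static double[] zNormalize(double[] window) {
    double sum = 0.0, sumSq = 0.0;
    for (double v : window) { sum += v; sumSq += v * v; }
    double mean = sum / window.length;
    double std  = Math.sqrt(Math.max(sumSq / window.length - mean * mean, 1e-12));
    double[] normalized = new double[window.length];
    for (int i = 0; i < window.length; i++) {
        normalized[i] = (window[i] - mean) / std;
    }
    return normalized;
}

// Assumed example: |Q| = 421 gives wWarp = 42, |Smax| = 463, |Smin| = 379 and a slide of 84 points,
// i.e. a window of roughly 1.9 s and a slide of 336 ms at 250 data points per second (Section 5.1).
```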
4.3 Pruning
Fast pruning techniques for DTW have been considered to reduce execution time. As no comparable distributed stream processing implementation exists, we studied and modified single-machine techniques [18] for usage in distributed systems. Not all of them can be meaningfully applied to our implementation; for example, running sums or previous-result heuristics with dependencies between two consecutive subsequences [18] cannot be guaranteed in distributed systems. These techniques are simply too expensive to be applied on independent subtasks and workers that see only a few observations instead of the whole time series. Since pruning often comes at the cost of accuracy, only the following techniques, with good approximate results [6, 18] and applicable to single sequences, have been implemented:

(1) Warping band size and global maximal distance: We use the Sakoe-Chiba band [18] for reduction of the DTW distance matrix search space and further prune all paths as soon as their latest cell reaches the maximal distance. The entire distance matrix is pruned whenever for a certain qi no sj can be found, which means that for all possible paths d̂(qi, sj) is above the maximal distance. Due to the usage of a global key-based state for the maximal distance, each time series has its own distance threshold.

(2) Lower Bounds (LB): Early pruning of DTW matrix calculations is possible with low-cost calculations of LB. The LB_Kim, LB_KimFL and LB_Keogh variations [18], which consider specific indexes and conditions for (qi, sj) combinations, are implemented in our approach. Further, we can prune an entire window using the triangle inequality of DTW and Euclidean distance by calculating the Euclidean distance for the first and the last subsequence of the window. If both are above the best-so-far Euclidean distance (kept in our global state), the window is pruned. It is worth noting that this is not an exact search.

(3) Square root operations on distance calculations are omitted, as the extra computational effort makes no difference to the distance ranking [18].
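To illustrate how the cheap bounds in (2) are used before any DTW matrix is built, here is a minimal sketch (our own; the LB_Keogh envelope check is omitted for brevity) of an LB_KimFL-style test on z-normalized sequences against the best-so-far distance kept in the keyed state. It reuses the bandedDtw sketch from Section 3; all names are illustrative.

```java
/**
 * LB_KimFL-style lower bound: only the first and last points of the pattern and the
 * candidate sequence are compared, using squared distances (consistent with item (3)).
 * Since any warping path must align these point pairs, this is a valid lower bound on DTW.
 */
static double lbKimFL(double[] q, double[] s) {
    double dFirst = q[0] - s[0];
    double dLast  = q[q.length - 1] - s[s.length - 1];
    return dFirst * dFirst + dLast * dLast;
}

/** Run the full banded DTW only for candidates that survive the cheap lower bound. */
static boolean candidateSurvives(double[] q, double[] s, int wWarp, double bestSoFar) {
    if (lbKimFL(q, s) >= bestSoFar) {
        return false;                                      // pruned without building the DTW matrix
    }
    return bandedDtw(q, s, wWarp, bestSoFar) < bestSoFar;  // full check only for promising candidates
}
```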
5 EXPERIMENTS
We have designed three experiments using the ECGoneday [17] dataset and the query pattern (Q) of the experiments from Rakthanmanon et al. [18]. The first experiment is a baseline test that allows us to compare our results against the ones reported in [18]. In the second experiment we simulate a real-life scenario, e.g. a hospital where patients stay for a couple of days and their ECG signals are continuously monitored. Finally, in the third experiment the average response time is measured under various loading conditions in order to determine how long, on average, it takes the system to match the pattern in a continuous time series stream.

5.1 Data and Processing Machines
The ECGoneday dataset contains 20,140,000 data points and represents one day, in particular 22 hours and 23 minutes, of ECG data for a single patient. Thus, we receive 250 data points per second. Using the default settings of our implementation and the length of the given pattern, one Flink window contains 463 data points or 1.9 s of data. As sliding windows are used in our application, another window is triggered by default every 336 ms.

In order to facilitate result comparisons and to analyze the processing overhead of parallelization in a distributed system, we run our experiments on the following machines:

• Laptop, an 8-core Intel with 2.90 GHz, 31.1 GB RAM, running Ubuntu 18.04
• Another single machine, further called Server, with 48 CPUs, 2.90 GHz, 126 GB of RAM, running Ubuntu 16.04
• Finally, we use a Cluster with 6 nodes, each with 48 CPUs, 2.90 GHz and 48 GB of RAM, running Linux 4.18

In our implementation in Apache Flink (version 1.4.2), we handle several patients' data in parallel by utilizing multiple input sources. Additionally, each operator in Flink, like our window function, can split its operations into several subtasks, which can be executed distributed and in parallel. The number of subtasks is defined by the degree of parallelism. These subtasks are chained together into so-called tasks. All operators working on the same degree of parallelism can be chained into one task.

In order to execute our experiments, we have to define another important parameter on the machines: task slots. This parameter defines the number of tasks a single worker can accept, and it is recommended to set it equal to the number of available CPUs per machine [4]. Thus, Laptop is set to 8, while the other two machines are set to 48. It is important to note that the number of task slots influences the parallelization degree. If more tasks than available task slots are created, these tasks need to wait, which causes delays or timeout errors. For all tests, five repetitions of each experiment are performed and the average processing times are reported.
                                                                        time comparison of various settings is visualized in Figure 5.
                                                                            A first observation in Figure 5 is that the Laptop can only
                                                                        handle parallelization 8 and although, it is faster with 8 sources
                                                                        as soon as we increase the parallelization level, Laptop is not
                                                                        even able to start the process. Remember that every machine has
                                                                        a limitation on the number of Taskslots, for Laptop it is 8.
                                                                            We can also observe that the best results are obtained with the
                                                                        Cluster for parallelization 8, irrespective of the number of sources.
                                                                        This result can be explained again by the available number of
                                                                        task slots in the Cluster. Moreover, the first experiment with 8
                                                                        sources on parallelism 8 creates only 64 subtasks excluding the 8
Figure 5: Stream Simulation Test: Total Execution Time                  source tasks in the beginning. The Cluster can execute all of them
for ten Days, several Patients (Sources) and various levels             in parallel due to its workers capacity (48 Taskslots ∗ 6 nodes),
of parallelism executed on Laptop, Server and the Cluster               while Server as well as Laptop need to queue tasks, what increases
                                                                        their response time.
                                                                            But, how far can we increase the parallelism on the Cluster?
due to the usage of Apache Flink. Both, Cluster and Server need         We observed, that whenever the following inequality is true, we
to deal with CPU distribution and the Cluster further with data         do not profit any more from the distributed environment.
shuffles and communication effort to overcome the shared mem-
ory disadvantage. These costs only become negligible in case of
large data sets or streams, which in fact ECGoneday is not.               Sources ∗ Degree of Parallelism ≥ 2 ∗ Available task slots
   We set all machines to a maximal parallelization degree of
8 for a better comparison with the Laptop achievements. The
results presented in Table 1 prove that our implementation is very      5.4    Average Response Time Test
close to Rakthanmanon et al. [18] achievements, with only 2 s           The final experiment has been conducted to examine, how long
difference on Laptop. Here, the application profits from caching        the system takes to decide, if a window is similar to the given
due to shared memory and little monitoring costs of the cores.          pattern, or not. For real-life applications the response time is a
   On the other hand, shared memory and/or monitoring tasks             critical issue, since the faster the matching is done the faster a
cause a massive overhead in both, the Cluster and the Server,           human can react, e.g. attend a patient in a hospital. In a real-time
as the processing time for these machines are much higher. As           scenario a worker processing a window needs to wait until its
mentioned before, these results are not surprising as stream pro-       end time is reached, before it is triggered. In other words, as
cessing as well as distributed systems are intended to process          long as it has not seen all the required data, no decision can be
large amounts of data. Thus, we re-run this experiment with             made. Thus, this time is not considered as technical caused delay.
an increased number of days to 10. Laptop still can handle this         Inspired by the benchmark experiments performed in [9], we
amount of data and beats the other two machines in both modes,          define the average response time by calculating the difference of
Best Match and Motif Discovery. Here, we can already notice that        the system time between the arrival of the final data point of the
the difference between both modes is an execution time increase         window and the output of the window result. Figure 6(a) presents
of up to three times in all machines.                                   the results of our application, aggregated over 10.000 consecutive
   This experiment was also used to double check accuracy, in the       windows for 10 days data of a single patient. The plot shows, that
sense of verifying whether we obtain the same best match results        initializing the application is an expensive process. Here, several
as Rakthanmanon et al. [18]. In our implementation, due to the          pruning parameters are established and permanently updated
distributed normalization used, we obtained two best matches,           with improved threshold values between the workers. After the
one of them corresponding to the one found by Rakthanmanon              first 10.000 windows, the average response time drops from up
et al. We show the best matches obtained with our distributed           to 0,34 ms to less than 0,05 ms in average per window as the
DTW in comparison to the one of DTW [18] in Figure 4.                   application is now able to quickly prune unpromising sequences.
                                                                           This response time is stable for the next 100.000 windows,
5.3    Stream Simulation Test                                           which corresponds in real-time to the first 16 hours of the first day.
In this experiment we stress the processing machines even more          Afterwards, it drops again to responses truly close to 0 and thus
by not only adding more days of data but increasing the number          to real-time response. Further, we can observe some non-regular
of patients up to 64. The data is processed in parallel and the         peaks in Figure 6(a). They indicate delays, which are caused by
search mode is set to Motif Discovery, which is a more realistic        the network, e.g. due to the data throughput, communication,
scenario than Best Match Discovery. This search mode has much           result writing, etc. Still, the time variants of the peaks are tiny
higher computational demand, as seen in the first experiment,           and appear rarely, thus, they can be ignored for the overall result.
since more calculations need to be executed and the output rate            The average response time increases if multi-sources are pro-
increases due to a positive tolerance interval of the maximal           cessed. Figure 6(b) is the resulting plot of the average response
distance bound. Stream processing is intended to run over a             time test runs with 8 patients and 10 days of data on Server and
long period of time, thus we fix the number of days to 10 per           Cluster. In the first 100.000 windows the response time is up to
patient, based on the idea of simulating a real-life scenario e.g. of   120 ms with an average of 65 ms for the Server and up to 70 ms
a hospital, where patients stay for a couple of days. We run this       for the Cluster with an average of 36 ms. Only after learning
test with an increasing number of patients (sources in the Flink        the pruning parameters both machines prove fast responses of
environment) starting with 8 and up to 64 patients. Also, several       0,022 ms for the Cluster and 0,039 ms for the Server.
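The following is a minimal sketch (ours, with hypothetical MatchResult type and matchAgainstQuery helper) of how such a per-window response time can be taken inside the window function: the timestamp at the start of process() serves as an approximation of the arrival of the final data point that completed the window, and the windowed values are assumed to have been projected to plain doubles beforehand.

```java
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

/** Measures the time between window evaluation start and result emission (illustrative names). */
public class TimedSimilarityFunction
        extends ProcessWindowFunction<Double, MatchResult, String, GlobalWindow> {

    @Override
    public void process(String patientId, Context context,
                        Iterable<Double> window, Collector<MatchResult> out) {
        long start = System.nanoTime();  // proxy for the arrival of the last point of the window

        // Hypothetical helper: sorting by time, normalization, lower bounds and banded DTW happen here.
        MatchResult result = matchAgainstQuery(patientId, window);

        result.responseTimeMs = (System.nanoTime() - start) / 1_000_000.0; // reported per window
        out.collect(result);
    }
}
```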
Figure 6: Average Response Time for DTW on Cluster: Aggregated results over 10,000 processed windows for 10 days. (a) Average Response Time for a single source. (b) Average Response Time, 8 sources, after the first 100,000 windows.
6 CONCLUSIONS
In this paper we present time series similarity search with DTW, which has been reported to be computationally expensive and difficult to implement for real-time processing. We benefit from Apache Flink features such as smart partitioning and keying and are thus able to process several sources and sliding windows in parallel for independent distance computation. The overall result of this paper is that efficient similarity search by the usage of modern DSPS is feasible and gives sufficiently fast responses for immediate reactions to matching sequences. Besides, and to the best of our knowledge, none of the reviewed literature can handle several maximal distance thresholds for searching patterns in multiple time series in parallel. Future work is required, though, especially to extend the efficiency and generality of the application. Possible future work areas are: handling multiple patterns at the same time, allowing the processing of multi-dimensional time series, and the maintenance of patterns over time, for example by using a ValueState to allow on-line learning. For DTW, the usage of a tree structure to maintain the pattern would be necessary to react to matches of different lengths.

ACKNOWLEDGMENTS
This work was partly supported by the German Federal Ministry of Transport and Digital Infrastructure (BMVI) through the Daystream project (grant no. 19F2031A), the German Ministry for Education and Research (BMBF) as BBDC (grant no. 01IS14013A) and the EU H2020-ICT-2017-2018 program through the BigMedilytics project (grant no. 780495).

REFERENCES
[1] Alice Berard and Georges Hebrail. 2013. Searching Time Series with Hadoop in an Electric Power Company. In Proceedings of the 2nd International Workshop on Big Data, Streams and Heterogeneous Source Mining: Algorithms, Systems, Programming Models and Applications (BigMine '13). ACM, New York, NY, USA, 15–22. https://doi.org/10.1145/2501221.2501224
[2] Donald J. Berndt and James Clifford. 1994. Using dynamic time warping to find patterns in time series. In KDD Workshop, Vol. 10. Seattle, WA, 359–370.
[3] Hui Ding, Goce Trajcevski, Peter Scheuermann, Xiaoyue Wang, and Eamonn Keogh. 2008. Querying and mining of time series data: experimental comparison of representations and distance measures. Proceedings of the VLDB Endowment 1, 2 (2008), 1542–1552.
[4] Apache Flink. 2018. Apache Flink: Fast and reliable large-scale data processing engine. http://flink.apache.org
[5] Apache Foundation. 2018. Apache Storm. http://storm.apache.org/
[6] Alborz Geramifard, Finale Doshi, Joshua Redding, Nicholas Roy, and Jonathan How. 2011. Online discovery of feature dependencies. In Proceedings of the 28th International Conference on Machine Learning (ICML-11). 881–888.
[7] Tian Guo, Saket Sathe, and Karl Aberer. 2015. Fast distributed correlation discovery over streaming time-series data. In Proceedings of the 24th ACM International Conference on Information and Knowledge Management. ACM, 1161–1170.
[8] Mirko Kämpf and Jan W. Kantelhardt. 2013. Hadoop.TS: Large-Scale Time-Series Processing. International Journal of Computer Applications 74, 17 (July 2013), 1–8. https://doi.org/10.5120/12974-0233
[9] Jeyhun Karimov, Tilmann Rabl, Asterios Katsifodimos, Roman Samarev, Henri Heiskanen, and Volker Markl. 2018. Benchmarking Distributed Stream Processing Engines. arXiv preprint arXiv:1802.08496 (2018).
[10] Maria Kontaki, Apostolos N. Papadopoulos, and Yannis Manolopoulos. 2007. Adaptive similarity search in streaming time series with sliding windows. Data & Knowledge Engineering 63, 2 (2007), 478–502.
[11] Xiang Lian, Lei Chen, Jeffrey Xu Yu, Guoren Wang, and Ge Yu. 2007. Similarity match over high speed time-series streams. In Data Engineering, 2007. ICDE 2007. IEEE 23rd International Conference on. IEEE, 1086–1095.
[12] Bo Liu, Jianqiang Li, Cheng Chen, Wei Tan, Qiang Chen, and MengChu Zhou. 2015. Efficient motif discovery for large-scale time series in healthcare. IEEE Transactions on Industrial Informatics 11, 3 (2015), 583–590.
[13] Alice Marascu, Suleiman A. Khan, and Themis Palpanas. 2012. Scalable similarity matching in streaming time series. In Pacific-Asia Conference on Knowledge Discovery and Data Mining. Springer, 218–230.
[14] Abdullah Mueen and Eamonn Keogh. 2010. Online Discovery and Maintenance of Time Series Motifs. In Proceedings of the 16th ACM SIGKDD International Conference on Knowledge Discovery and Data Mining (KDD '10). ACM, New York, NY, USA, 1089–1098. https://doi.org/10.1145/1835804.1835941
[15] Rodica Neamtu, Ramoza Ahsan, Elke Rundensteiner, and Gabor Sarkozy. 2016. Interactive Time Series Exploration Powered by the Marriage of Similarity Distances. Proceedings of the VLDB Endowment 10, 3 (Nov. 2016), 169–180. https://doi.org/10.14778/3021924.3021933
[16] Panagiotis Papapetrou, Vassilis Athitsos, Michalis Potamias, George Kollios, and Dimitrios Gunopulos. 2011. Embedding-based subsequence matching in time-series databases. ACM Transactions on Database Systems (TODS) 36, 3 (2011), 17.
[17] Mueen, Rakthanmanon, Campana and Batista. 2012. The UCR Suite: Funded by NSF IIS-1161997 II. http://www.cs.ucr.edu/~eamonn/UCRsuite.html
[18] Thanawin Rakthanmanon, Bilson Campana, Abdullah Mueen, Gustavo Batista, Brandon Westover, Qiang Zhu, Jesin Zakaria, and Eamonn Keogh. 2013. Addressing Big Data Time Series: Mining Trillions of Time Series Subsequences Under Dynamic Time Warping. ACM Transactions on Knowledge Discovery from Data 7, 3 (Sept. 2013), 10:1–10:31. https://doi.org/10.1145/2500489
[19] Norihiro Takahashi, Tomoki Yoshihisa, Yasushi Sakurai, and Masanori Kanazawa. 2009. A parallelized data stream processing system using dynamic time warping distance. In Complex, Intelligent and Software Intensive Systems, 2009. CISIS '09. International Conference on. IEEE, 1100–1105.
[20] Limin Xiao, Yao Zheng, Wenqi Tang, Guangchao Yao, and Li Ruan. 2013. Parallelizing dynamic time warping algorithm using prefix computations on GPU. In High Performance Computing and Communications & 2013 IEEE International Conference on Embedded and Ubiquitous Computing (HPCC/EUC). IEEE, 294–299.