=Paper= {{Paper |id=None |storemode=property |title=Scalable Linked Data Stream Processing via Network-Aware Workload Scheduling |pdfUrl=https://ceur-ws.org/Vol-1046/SSWS2013_paper6.pdf |volume=Vol-1046 |dblpUrl=https://dblp.org/rec/conf/semweb/FischerSB13a }} ==Scalable Linked Data Stream Processing via Network-Aware Workload Scheduling== https://ceur-ws.org/Vol-1046/SSWS2013_paper6.pdf
     Scalable Linked Data Stream Processing via
        Network-Aware Workload Scheduling

            Lorenz Fischer, Thomas Scharrenbach, Abraham Bernstein

                         University of Zurich, Switzerland?
                  {lfischer,scharrenbach,bernstein}@ifi.uzh.ch



        Abstract. In order to cope with the ever-increasing data volume, dis-
        tributed stream processing systems have been proposed. To ensure scal-
        ability most distributed systems partition the data and distribute the
        workload among multiple machines. This approach does, however, raise
        the question how the data and the workload should be partitioned and
        distributed. A uniform scheduling strategy—a uniform distribution of
        computation load among available machines—typically used by stream
        processing systems, disregards network-load as one of the major bot-
        tlenecks for throughput resulting in an immense load in terms of inter-
        machine communication.
        In this paper we propose a graph-partitioning based approach for work-
        load scheduling within stream processing systems. We implemented a
        distributed triple-stream processing engine on top of the Storm realtime
        computation framework and evaluate its communication behavior using
        two real-world datasets. We show that the application of graph partition-
        ing algorithms can decrease inter-machine communication substantially
        (by 40% to 99%) whilst maintaining an even workload distribution, even
        using very limited data statistics. We also find that processing RDF
        data as single triples at a time rather than graph fragments (containing
        multiple triples), may decrease throughput indicating the usefulness of
        semantics.


Keywords: semantic flow processing, stream processing, linked data, complex
event processing, graph partitioning, workload scheduling


1     Introduction
In today’s connected world, data is produced in ever-increasing volume, velocity,
variety, and veracity [20]: sensor data is gathered, transactions are made in the
financial domain, people post/tweet messages, humans and machine infer new
information, etc. This phenomenon can also be found on the Web of Data (WoD),
where new sources are made available as linked data. In order to process these
?
    The research leading to these results has received funding from the Europ. Union
    7th Framework Programme FP7/2007-2011 under grant agreement no 296126 and
    from the Dept. of the Navy under Grant NICOP N62909-11-1-7065 issued by Office
    of Naval Research Global.
growing data sources many have proposed the use of distributed infrastructures
such as Hadoop [20]. The batch-oriented synchronous nature of these solutions,
however, may not be suited to ensure the timeliness of data processing. To ad-
dress this shortcoming stream processing approaches based on information-flow
processing have been proposed [8]. These systems continuously ingest new data
as it arrives and process it online rather than storing it for batch-like processing.
This continuous processing puts a significant load on employed systems and is,
obviously, limited by the capacity of the employed hardware infrastructure.
    To cope with increasing demands distributed stream processing systems have
been proposed, which usually ensure scalability by partitioning the data and
distributing the processing thereof to multiple machines. Note that deciding how
to partition the data and distribute the associated processing is a non-trivial task
and can have dire consequences on performance.
    Distributed processes communicate with each other by sending messages
containing partial results of the overall processing task. Processes on different
machines communicate over the network and the resulting network load lim-
its scalability in two ways: First, network traffic is several orders of magnitude
slower than in-machine communication.1 Second, the bandwidth of each ma-
chine limits the amount it can possibly communicate to processes residing on
other machines. As a consequence, finding a good distribution strategy for dis-
tributed stream processing is crucial to ensure scalability. Note that the variety
and potential schemalessness of linked data further aggravates the problem as a
Semantic Flow Processing (SFP) systems (1) cannot rely on the schema for data
partitioning and distribution and (2) the triple-nature (rather than the reliance
on n-tuples or relations) of the underlying data model potentially further sub-
divides the smallest unit of data increasing the number of possible partitioning
(and hence, distributions).
    As distributed stream processing becomes more important in many areas of
business and science, researchers have proposed various ways to schedule work-
load in such systems. Interestingly, we found no previous work that employs
existing graph partitioning algorithms to the problem of workload scheduling.
    In this paper, we propose the use of graph partitioning algorithms to optimize
the assignment of tasks to machines: we regard the data-flow within an SFP
system as a graph, where the edges’ weight represents the required bandwidth
for information passing and the vertices’ weight represents the computational
load required to process incoming messages. Specifically, we operationalize the
edge weights as the number of messages sent from one process to another, based
on the two assumptions: first, that messages are approximately the same size,
and second the computational load to be proportional to the number of messages
received by a processing vertex, assuming further that all messages need the same
time to be processed on all tasks. We then use a graph partitioning algorithm to
optimize the distribution of processes to machines whilst addressing two possibly
1
    Numbers from 2009: 500 times for latency, 40 times for bandwidth. See
    http://www.cs.cornell.edu/projects/ladis2009/talks/dean-keynote-ladis2009.pdf for
    more details.


                                          82
opposing goals: we try to minimize costly network messages whilst distributing
computation load as evenly as possible.
    To evaluate our distribution approach we implemented an SFP system on
top of the Storm realtime computation framework.2 Our implementation allows
compiling certain SPARQL queries, modified for stream processing, to a Storm
topology. To enable parallelization of the processing functions the data flows in
the topology are partitioned using appropriate hash functions. Next we empir-
ically determine the weight of the nodes and edges, partition the graph, and
“schedule” the processes accordingly on machines. In our evaluation using two
real-world datasets we show that our graph-partitioning strategy can decrease
inter-machine communication substantially (by 40% to 99%) whilst maintaining
an even workload distribution, even using very limited data statistics.
    The remainder of this paper is structured as follows: next we succinctly sum-
marize the most relevant related literature (Section 2) before introducing the
details of our approach (Section 3). This is followed by a description of our ex-
periments (Section 4), a discussion of the results in the light of its limitations
(Section 5), and an exploration of the implications and future work (Section 6).

2     Related Work
This study relates to (distributed) Information Flow Processing (IFP)3 , Seman-
tic Flow Processing (SFP) [18], and workload scheduling. We provide an overview
of the most relevant work in these fields.
Information Flow Processing: The field of IFP is vast and a survey is beyond
the scope of this paper [15, 12, 8]. Here we only discuss the Aurora/Borealis [1,
2] systems, as they are most closely related to our research.
Aurora [1] lets the user specify a query using a set of operators (boxes), which are
connected by links (arrows). The Borealis system [2] extends Aurora to include—
among other things—the distribution of the workload across multiple machines.
Load distribution is achieved by query partitioning – the assignment of operators
(boxes) or a collection thereof (superboxes) to worker machines. This approach
has two drawbacks: First, it limits the degree of parallelism to the number of
boxes. Second, in its naı̈ve setup, all information exchange between operators
goes over the network consuming enormous amounts of network bandwidth. The
only improvement to this strategy is to group operators onto the same machine.
Our approach, in contrast, proposes to improve load distribution through data
partitioning, where operators themselves are replicated across many machines.

Semantic Flow Processing: The C-SPARQL system [6] performs query matching
on subsets of the information flow defined by windows. For query matching on the
subsets it uses a regular SPARQL engine that is extended by some stream related
2
    http://storm-project.net
3
    The term Information Flow Processing has been suggested by [8] as the term Stream
    Processing is ambiguous due to its usage by both the Complex Event Processing
    (CEP) and the Data Stream Management Systems (DSMS) community.


                                          83
features. A distributed version was implemented using Apache S4 platform;4 yet
with no particular scheduling strategy [9].
    EP-SPARQL [4] is a complex event processing system, which extends the
ETALIS system with a flow-extension of SPARQL for query processing [4].
ETALIS is a Prolog engine for which no distributed version exists yet.
    CQELS [13] “implements the required query operators natively to avoid the
overhead and limitations of closed system regimes”. It optimizes the execution
by dynamically re-ordering operators to “prune the triples that will not make
it to the final output” thus limiting processing. As the implementation makes
no assumptions about scheduling with regards to messages sent between algebra
components CQELS could benefit from a scheduler based on graph partitioning.
    SPARQLStream [7] is a streaming extension to SPARQL that allow users to
query relational data streams over a set of stream-to-ontology mappings. The
language supports powerful windowing constructs and SRBench [23] uses it as
the default engine for evaluation.
    INSTANS [17] and Sparkwave[11] are based on a RETE network. Both their
implementations are non-distributed implementation, yet very efficient. Both
stream querying systems support the RDF, and RDFS, and—in the case of
Sparkwave—OWL entailment. It would be interesting to investigate, to what
extent our approach could be built on top of a RETE-network.

Workload Scheduling: Earlier work on scheduling in stream processing concen-
trated on operator scheduling in wide area networks [16] and admission control
[21, 22], recent work also targets the usecase in which workload of a stream pro-
cessor in a compute cluster has to be scheduled [5].
    SODA [21] is an admission control system and task scheduler for System S
[3]. The task scheduler within SODA is based on a mixed-integer optimization
program and also uses techniques from the network flow literature.
    Pietzbuch et al. [16] present an decentralized algorithm that is geared towards
minimizing the overall latency of a stream processor whose operators are spread
out across a wide area network, while taking CPU load into account.
    Xia et al. [22] map the problem of task scheduling to a multicommodity flow
network and present a distributed scheduling algorithm in which the amount of
communication between nodes is incrementally analyzed and reduced.
    Aniello et al. [5] present two algorithms which are both geared towards re-
ducing the number tuples tranferred over the network of a storm cluster. Their
static ”offline” scheduler takes characteristics of the topology into account while
their ”online” scheduler collects network statistics, before optimizing the sched-
ule by moving nodes connected by hot edges, i.e. edges that exhibit high data
volumes, on the same server. Their evaluations conducted using a synthetic and
a real-world dataset show, that online-scheduling results in much lower latency
than static or uniform scheduling.

4
    http://incubator.apache.org/s4


                                        84
3     Problem Statement, Formal Definitions, and System
      Description

In this section we provide the technical foundations for our study. These include
a brief introduction to the data- and processing models employed. Next, we in-
troduce the three concepts of data partitioning, scheduling, and load balancing
and how they affect the performance of a distributed system, before presenting
a formal problem description. We then show, how the multi-constraint optimiza-
tion problem of scheduling can be solved using a graph partitioning algorithm,
before we, finally, introduce the system we built to test our hypothesis.


3.1   Data- and Processing Model

A linked data stream processing system essentially continuously ingests large
volumes of temporally annotated RDF triples and emits the results again as data
stream. Such systems usually implement a version of the SPARQL algebra that
has been modified for processing dynamic data. In our case, we focus on a subset
of those defined in the queries of the SRBench benchmark [23]. The processing
model considered is a directed graph, where the nodes are algebra operators and
data is sent along the edges. Hence, each query can be transformed to a query
tree of algebra expressions – the topology of the processing graph.
While the system consumes temporally annotated RDF triples (< s, p, o > [ts],
where ts denotes the time-stamp), internal operators in the topolgy consume
and emit sets of variable bindings when performing the operations associated
with the respective operator. These variable bindings comprise a finite number
of variable/value pairs ?var/val, where ?var is a variable name and val is an
RDF term. Note that source operators (i.e., the input to the topology) consume
timestamped RDF triples instead of bindings and output operators may also
output RDF triples if so specified by the query.


3.2   Scheduling, Data Partitioning, and Load Balancing

In order to scale the system horizontally (i.e., executing its parts on multiple pro-
cessing units concurrently) we may replicate parts (or the whole) of the query’s
topology and execute clones of the operators in parallel. We refer to these clones
as tasks or task instances. Hence, each operator will be instantiated as a finite
number of n tasks (where n ≥ 1). These task instances, and thus the workload of
the system, can then be distributed across several machines in a compute cluster.
We refer to the assignment of tasks to machines as scheduling. Figure 1 shows an
example operator topology and one possible schedule distributing tasks to two
servers of a compute cluster. As there are now multiple instances of each operator
of the topology the data needs to be partitioned in accordance to the operator’s
needs. For stateless operators, such as filters or binders, it does not matter which
variable bindings they receive for processing. For stateful operators, in contrast,
such as aggregators or joins, the system needs to provide some guarantees about

                                         85
                    Topology                 Server 1          Server 2

                                        Bg         tg1         tg2         tg3




                    grouping grouping
                                        Bo         to1               to2



                                        Bb   tb2         tb1         tb3



Fig. 1. A topology for an example query with three operators or algebra expressions
(left side) and a possible schedule with eight tasks distributed over two servers (right
side). Data flows from top to bottom. Green (solid) arrows indicate fast intra-machine
communication; red (dashed) arrows indicate costly inter-machine communication.


what data gets delivered to which task instance. To this end, a topology config-
uration contains grouping strategies (or information about the data partitioning
function) on the edges between operator nodes (see also Figure 1 on left).
    In this study we assume the number of messages sent between the machines
as the key performance indicator (KPI). This is seems to be a prudent choice,
as the network can become a bottleneck of a distributed system that needs to
scale horizontally. We acknowledge that our choice has limitations and therefore
provide a discussion in Section 5.
    Given these definitions the goal of our approach is to find a schedule (i.e.,
assignment of tasks to machines) for a given topology that minimizes the total
number of data messages transferred over the network, whilst maintaining an
even workload distribution across machines in terms of CPU cycles.
    To achieve this goal we partition the data into logical units. We then re-
group these using graph-partitioning which provides us with an optimization
procedure to minimize the number of messages sent between machines. As a
result, we propose the following hypothesis: Combining data partitioning between
tasks with a scheduler that employs graph partitioning to assign the resulting task
instances outperforms a uniform distribution of data and tasks to machines.
    Our hypothesis assumes that different distribution strategies significantly in-
fluence the number of messages sent between the machines. Most stream process-
ing platforms attempt to uniformly distribute compute loads possibly incurring
high network traffic. Approaches like Borealis schedule the processors according
to the structure of the query, where every operator is is assigned to one ma-
chine. This approach has an upper limit in parallelization equal to the number
of operators and may incur high network traffic between to machines containing
active operators. Instead we propose to parallelize the operators and minimize
network traffic allowing for more flexibility for distributing the workload. Most
importantly, we propose that the scheduling strategy should optimize the amount
of data sent between machines.


                                             86
3.3    Formal Problem Description

In principle, a linked data stream processing system can be conceived as a query
graph QSF P =< Op, M C >, where Op is a finite set of operators opi (i.e.,
Op = ∪Ii=1 opi ) and each opi executes one or more algebra operations. The flow
of information between the operators is established by a set of edges mc ∈
M C (message channels) that denote a flow of messages (time-stamped variable
bindings µ[ts]) between the operators (opi ).
    Since we want to enable parallelism and distribution each operator is instan-
tiated in parallel as a finite number of tasks Topi = ∪Jj=1i
                                                              ti,j , where Ji denotes
the degree of parallelism of opi . We refer to the set of all tasks as

                             T = ∪Ii=1 Topi = ∪Ii=1 ∪Jj=1
                                                       i
                                                          ti,j
    Furthermore, each message channel mc ∈ M C is instantiated via a finite set
of channels cij ∈ C that connect the tasks ti , tj ∈ T . Specifically, connected
tasks send messages, i.e., time-stamped variable bindings µ[ts] to each other.
    Thus for each query graph QSF P there exists a parallelized Task Graph
T G =< T, CT >, where in addition to the mapping of each task to exactly
one operator (as specified above) each channel c ∈ C maps to exactly one mc
and each mc has at least one c to ensure connectivity. Hence, to ensure a cor-
rect mapping we require that ∀opa , opb , mcopa ,opb : ∃ta,j , tb,j , cta,j ,tb,j , where (i)
mcopa ,opb is a message channel that connects opa and opb , and (ii) cta,j ,tb,j con-
nects the corresponding tasks. In addition, we require that ∀ta,j , tb,j , cta,j ,tb,j :
∃exactly one opa , opb , mcopa ,opb to ensure the one-to-n mapping of operators and
message channels to tasks and channels.

Graph Partitioning A partitioning divides a set into pairwise disjoint sets.
In our case we want to partition the vertices of a graph G = (V, E) with a
finite set of vertices V and a finite set of edges E ⊂ V × V . A partitioning
P = {P1 , . . . , PK } for V separates the set of vertices such that
                                           SK
  – it covers the whole set of vertices: k=1 Pk = V and
                                                TK
  – the partitions Pk are pairwise disjoint: k=1 Pk = ∅

In addition, we denote (i) a partitioning function by part : V → P that assigns
every vertex v1 , . . . vl the partition Pk ∈ P it belongs to, (ii) a cost function by
cost(P ) ∈ R, which denotes some kind of cost associated with the partitioning
that is subject to optimization, and (iii) a load imbalance factor stdv(P ) that
ensures that the workload of the tasks is evenly distributed over the machines.
    We can easily map our problem of minimizing the number of messages that
are sent between machines to a graph partitioning problem with a specific cost
function. First, we define the graph to be partitioned as the Task Graph T G. A
partitioning of T G maps each task to exactly one machine.
    Second, in our case the cost function to minimize is the number of messages
sent between machines.
                   PK We operationalize the cost function for the network traf-
fic as cost(P ) = k=1 cost(Pk ). Each cost(Pk ) denotes the cost of transmitting

                                             87
messages, i.e. the bindings, across the network to a task situated in partition
Pk . Hence, we increment the cost for cost(Pk ) by one, iff a message is being
sent from task t1 to t2 and the two tasks are not in the same partition, i.e.
part(t1 ) 6= part(t2 ) and t2 ∈ Pk .
    Third, when optimizing the costs for the partitions we add the constraint
that the partitions shall be balanced with respect to the computational load.
We approximate the computation load for a partition by counting the number
of messages P that are sent over a channel for which task t is the receiving task:
load(Pk ) = cta ,t ∈C count(m) iff part(tb ) = Pk , where m ∈ M is a message
                     b
sent over channel cta ,tb . Note that we count all incoming messages for each task,
regardless of wether they had to be transferred over the network or not, as they
have to be processed and hence consume computation power in both cases.
    In order to make optimal use of the available resources, a balanced load
distribution is desirable. The standard deviation stdv(P ) of the load for all
partitions shall hence not exceed a certain threshold C:
                                     s
                                       P        load(P )
                                          p∈P (    K     − load(p))2
                   C < stdv(P ) =
                                                     K
All graph partitioning in this paper were computed using the METIS algorithms
for graph partitioning [10] – a well established graph partitioning package.

3.4   KATTS
In order to test our hypothesis we built a research prototype of a distributed
linked data stream processing engine called KATTS .5 In order to keep the pro-
gramming overhead minimal, we chose to build our system on top of the Storm
realtime computation framework.6 While our current prototype is built using
Storm it is important to note, that our findings are not only valid in the context
of Storm, but for any system that uses partitioned data streams.
    A Storm application is a graph, consisting of compute nodes that are con-
nected by edges. Edges are configured using a partitioning function or grouping
strategy. Using the abstractions of Storm we implemented a set of stream oper-
ators, a configuration environment, and a monitoring suite. Topologies can be
specified using XML and will output the sending behavior of the topolgy: the
communication graph. In addition to input operators that read time annotated
n-triple files and an output operator, the current set of supported operators con-
tains an aggregation operator (min, max, avg), a filter operator, a bind operator,
and a temporal join operator. Every incoming edge of each consuming operator
can be configured with a grouping strategy. If no grouping strategy is configured
local or shuffle grouping will be used.7 We always used the field grouping strat-
egy, which partitions the data based on the value of a tuple field (i.e., the value
5
  KATTS is a recursive acronym for Katts is A Triple Torrent Sieve. The code will
  be made accessible at https://github.com/uzh/katts upon publication of the paper.
6
  http://storm-project.net
7
  https://github.com/nathanmarz/storm/wiki/Concepts#stream-groupings


                                        88
of a variable). For partitioning Storm uses the hashCode() method of the field
value, which in our case is an object of type java.lang.String.8 In addition to the
configuration parameters of the particular node implementation, each node of
the topolgy can be configured with a value that defines its degree of parallelism,
which is the number of task instances that should be created for this operator.
    The monitoring suite has two main components: (1) a data collection facility,
which records communication behvaior, and (2) a runtime monitoring component
that keeps track of the number of input sources the system is receiving data from.
When all sources have been fully processed, the data aggregation process will be
executed and the topology as well as the Storm cluster will be halted.


4       Evaluation
In this section we evaluate if our proposed strategy is indeed better in terms of
network load whilst maintaining a comparable host load when compared to a
baseline strategy of trying to attain a uniform distribution of load. To that end
we first present the experimental setup and then present the results.


4.1       Experimental Setup

Datasets We evaluated our system using two example queries that are built
around a real world streaming use case: SRBench [23], which works with streams
of weather measurements, and an open government dataset, which we collected
through public sources.
SRBench is an RDF/SPARQL streaming benchmark consisting of weather ob-
servations about hurricanes and blizzards in the USA during the years 2001 and
2009 and contains 17 queries on LinkedSensorData,9 which originated from the
MesoWest project of the University of Utah.10 Given that our approach only
implements joins, simple aggregates, and triple-pattern matching, we restricted
ourselves to Q3, which searches for weather stations observing hurricane-like
condition. These are defined as “a sustained wind (for more than 3 hours) of at
least 33 m/s or 74 mph.” Without the prefix declaration a C-SPARQL [6] like
version of this query would look as follows:11
ASK
FROM STREAM  [RANGE 3 h STEP 1 h ]
WHERE {
  ? o b s e r v a t i o n om−o w l : p r o c e d u r e               ? sensor ;
                          om−o w l : o b s e r v e d P r o p e r t y w e a t h e r : WindSpeed ;
                          om−o w l : r e s u l t                     [ om−o w l : f l o a t V a l u e ? v a l u e ] .
}
GROUP BY ? s e n s o r
HAVING ( MIN( ? v a l u e ) >= ” 7 4 ” ˆ ˆ x s d : f l o a t )
 8
   http://docs.oracle.com/javase/6/docs/api/java/lang/String.html#hashCode()
 9
   http://wiki.knoesis.org/index.php/LinkedSensorData
10
   http://mesowest.utah.edu/index.html
11
   We employ a minimum aggregate rather than the average value used in
   http://www.w3.org/wiki/SRBench, as we think the word ”sustained” means that
   the wind speed has to be ”at least” 74 miles per hour and not ”on average”. We also
   use a step size of 1 hour instead of 10 minutes.


                                                                    89
                     SRB Q3:            R           A           F                                O



                     OpenGov:           T           A           B          F
                                                                                      J          O
                                        C


       Fig. 2. The topologies for Query 3 of the SRBench and the OpenGov query.

The resulting topology has four processes depicted in Fig.2: First, the reader
node (R) reads the incoming stream and scans it for the triple patterns contained
in the where-clause. Second, the matched bindings are then sent to the aggregator
node (A), which creates the minimum aggregate over the temporal window of
three hours and a step size of one hour. Third, the output of the aggregator is
then sent to the filter node (F), which filters all occurrences that are smaller
than 74 mph and sends all remaining instances to the output node (O). Finally,
the output node writes the occurrences into a file on disk.
OpenGov Dataset: To complement the regular setup of SRBench, which consists
mostly of weather station measurements, we gathered a second data set, which
combines data on public spending in the US with stock ticker data.12 We devised
a query that would highlight (publicly traded) companies, that double their stock
price within 20 days and are/were awarded a government contract in the same
time-frame. This query requires the system to scan two sources, aggregate/filter
values, and finally join certain events that may have a causal relation to each
using a temporal condition. The C-SPARQL representation of the query, for
example, looks as follows:
REGISTER QUERY P u b l i c S p e n d i n g S t o c k AS
SELECT { ? company name ? a g e n c y n a m e ? c o n t r a c t i d ? m i n p r i c e ? m a x p r i c e   ? factor   }
FROM STREAM  [RANGE 20 DAY STEP 1 DAY]
FROM STREAM  [RANGE 20 DAY STEP 1 DAY]
WHERE { GRAPH  {
               ? t i c k e r i d wc : PRC           ? ticker price ;
                                   wc :COMNAM ? company name ;
                                   wc : TICKER ? t i c k e r s y m b o l .
           } UNION GRAPH  {
               ? c o n t r a c t i d us : a g e n c y i d              ? agency name ;
                                      us : obligatedamount ? contract amount ;
                                      u s : vendorname                 ? company name .
           }}
AGGREGATE { ( ? m i n p r i c e , MIN , {? t i c k e r p r i c e } ) }
AGGREGATE { ( ? m a x p r i c e , MAX, {? t i c k e r p r i c e } ) }
BIND ( ? m a x p r i c e / ? m i n p r i c e AS ? f a c t o r )
FILTER ( ? f a c t o r > 2 )


The resulting topology (Fig.2) first aggregates (A) the ticker-sourced (T) data to
compute the minimum and maximum over a time window of 20 days. It computes
the ratio between these numbers (B), and then filters those solutions where that
ratio is smaller than or equal to two (F). The remaining company tickers are
then joined (J13 ) with the ones that where awarded government contracts (C).
The joined tuples are then sent to the output node (O).
12
     http://www.usaspending.gov, https://wrds-web.wharton.upenn.edu/wrds
13
     We use a hash join with eviction rules for the temporal constraints.


                                                             90
Evaluation Criteria In accordance with the Properties-Challenges-KPIs-
Stress-tests (PCKS) paradigm for for benchmarking SFP systems [18] we tested
the performance of our Distributed Flow Processing System by choosing the
number of inter-machines network messages as a key performance indicator
(KPI). As a secondary performance indicator (SPI) we chose the uniformity
of load distribution as measured by number of messages received per machine.
Procedure We used the following procedure to measure the performance of our
approach. First, we took each dataset and partitioned it to 12 files (as we had
12 machines at our disposal). The two queries were compiled into the topologies
described above and instantiated to allow 48 tasks for each node that is neither
a reader nor an output node.14 We then recorded the number of messages that
were sent between tasks at runtime.
Second, to test our hypothesis we needed to partition the resulting communica-
tion graph based on the network load of each channel. Since the channel loads
are not known before running the query we chose two experimental scenarios. In
the first scenario we assume an oracle optimizer that would know the number of
messages that would flow along every channel. This scenario allows to establish
a hypothetical upper bound of quality that our method could attain, if it were
to have an oracle. In a second scenario we assumed a learning optimizer that
first observes channel statistics for a brief period of time and then partitions the
graph accordingly. To that end we sliced the SRBench data into daily and the
OpenGov data into monthly slices. We then measured the the performance of
our approach based on learning during the preceding one to three time-slices,
essentially providing a adaptively learning system.
Third, to partition the graph we employed METIS [10]. We used the gpmetis
in its standard configuration, which creates partitions of equal size, and only
changed the -objtype parameter to instruct METIS to optimize for total commu-
nication volume when partitioning, rather than minimizing on total edgecut.15

4.2 Results
The Suitability of Graph Partitioning for Scheduling The results of our
evaluation are shown in Figure 3, which plots the number of network messages
divided by the number of total messages as a measure for the optimality of the
distribution. As the figure shows on the left, the SRBench data can be optimally
partitioned by the id of the reporting weather station even when using only the
data of the immediately preceeding time slice (Prev.1). All further computation
can be managed on a local machine, as no further joins are necessary. This
clearly indicates that some queries can be trivially distributed when a good data
partition is either known or can be learned.
    On the right we find the results for the OpenGov dataset. This evaluation is
not quite as clear-cut, as the join operation requires a significant redistribution of
messages. The results here are quite interesting. First, we find that our approach
14
   As we we ran our experiments on machines with more than 12 cores, we were able
   to achieve better capacity utilization by using more than 12 tasks per node.
15
   We used v5.0.2 with default partitioning (kway) and default load imbalance of 1.03.


                                         91
Fig. 3. Percentage of messages sent over the network for the uniform distribution and
the graph partitioned setup, using either the test data itself (oracle) or data from the
previous one to three time-slices as input for the graph partitioning algorithm.




Fig. 4. Average computation load distribution for all time-slices of each dataset.
RSD = Relative Standard Deviation

clearly outperforms the uniform distribution strategy by a factor of two to three.
Second, it is interesting to observe that even longer learning periods, using two
(Prev.2) and even three previous time slices (Prev.3), do not necessarily improve
the overall performance - maybe due to over-fitting or concept drift [19].
    For the sake of brevity we only show data for three time-slices of each evalu-
ation in Table 1: on the left side again the results for the embarrassingly parallel
SRBench query, which shows a reduction in network usage by over 99%. The
right side of the table is more interesting as it exhibits the gain of our approach
in a non-trivial case. Even for the OpenGov query, workload distribution using
a graph partitioning approach yields savings of network bandwidth of over 40%.

Balancing Computation Load Next to keeping the bandwidth usage to a
minimum, a distributed system must also make good use of the available compu-
tational power. For this reason we analyzed how many messages were processed
by all tasks on each host for the two queries. Figure 4 shows the results of this


               SRBench Q3 August, 2004              OpenGov , 2001
             Slice    Uniform Oracle Prev.1 Slice Uniform Oracle Prev.1
             Aug 9       1      0.7%   0.0% Feb     69,5% 29.0% 36.0%
             Aug 10      1      0.0%   0.7% Mar     69,8% 32.1% 30.1%
             Aug 11      1      0.0%   0.0% Apr     69,8% 37.7% 32.1%

Table 1. Percentage of messages sent over the network for the uniform distribution,
the partitioning based on the test data itself (oracle), and the preceeding time slice
(“Prev.1” in Table) as input for the graph partitioning algorith; three time slices each.



                                           92
evaluation: The load distribution resulting from the graph partitioned task as-
signment only differs slightly from the one found by uniform task distribution
(average relative standard deviation [RSD] OpenGov : 7.04% for partitioning vs.
5.27% for uniform baseline; SRBench: 3.74% for partitioning vs. 2.68% for uni-
form baseline).
The Influence of Data Partitioning The results above are very encouraging.
One of the major limitations of our measurements, however, is that we assumed
that the data came partitioned into meaningful groups. Whilst this assumption
is often true in practice (the input from weather stations comes as grouped mes-
sages from one station, data about one stock usually arrives from one source,
etc.). But in some worst-case scenarios the data might be mixed (even if a total
random intermixing is unlikely). To investigate the robustness of our procedure
against this assumption we ran our approach under two different partitioning
regimes: first, we made sure to partition the data along a different hash function
that chosen by our system (which relies on the Storm hash partitioning) and sec-
ond, we ensured employing the same partitioning. The results of this sensitivity
analysis are shown in Figure 5 for the SRBench query, which graphs a Sankey
chart of the inter-task communication under both conditions, where the width
of the lines corresponds to the number of messages. As the figure clearly shows
the mixed hashing setting requires to reshuffle all data from the readers to the
processing nodes, while the equally partitioning setting provides a clean stream
setting. As a consequence, we can expect that badly pre-partitioned data would
not exhibit as good results as the ones we exhibited above.


5   Discussion and Limitations of the Results
The results shown in the section above show that using a graph partitioning algo-
rithm to schedule tasks instances on machines does indeed reduce the messages
sent over the network whilst only having a slightly less even load distribution.




Fig. 5. Two communication graphs (data flows from left to right)
Left: Input partitioned using different hash function than the one used by Storm.
Right: Input partitioned using identical hash function as the one used by Storm.

                                         93
The first part of the finding could be seen as almost tautological: it could be
understood as showing that graph-partitioning using a well-establish algorithm
is better than a partitioning that ignores network traffic but “only” focuses on
load distribution. We believe, however, that there are subtle considerations that
are less than obvious.
First, the critical element is to realize that the operators can be parallelized with
an adequate data partitioning approach not to “just” use graph partitioning. It
is the interplay of the two partitionings that enables the graph partitioning to
find a good schedule: inadequate data partition can lead to highly suboptimal
schedules as the results about the influence of data partitioning show.
Second, the principle of finding the smallest possible partition given the desired
degree of parallelism (see also Section 3.2) seems important. How important
needs to be investigated. Whilst the idea seems simple its details are intricate
and required careful analysis—a task that we will have to continue in the future
by further exploring the interactions between data and graph partitioning and
devise an automated model for optimizing it.

    One somewhat surprising outcome of our analysis is that the overall efficiency
of the system heavily depends not only on the consistent use of the same parti-
tioning function, but also on the compatibility of the values over which the data
is being partitioned. Using incompatible data partioning functions can result in
very poor performance as seen in section 4.2. If a topology contains operators
that partition over incompatible fields such as in the OpenGov query, graph par-
titioning is still useful, but much less effective as when working with compatible
fields. It is this observation which contains an interesting insight: linked data
stream processors should work with graph fragments rather than triples. “Natu-
rally” occuring graph fragments often contain interdependent graph elements.
If one pulls these fragments apart due to some partitioning function one might
have to gather them in a later join. Hence, it seems prudent to favor an approach
that leaves these fragment together if a later join is foreseeable.

    Our current analysis is based on some underlying assumptions. First, we as-
sumed that some network statistics are available at the onset. Whilst this may
not always be given, our findings shows that even a small amount of statistics
seem to produce adequate schedules. Hence, it seems straightforward to start
with an uniform distribution and then apply an incremental graph partitioning
approach [14] improving the schedule during run-time.
Second, we assumed that the processing load (both CPU and RAM) is propor-
tional to the number of messages received (i.e., constant operator complexity).
Whilst this assumption is definitely true for some operators (e.g., computing the
average) others may require more computational effort. We intend to address
this issue in future work.
Third, we assumed that the query topology was given. Obviously, queries could
be translated to various topologies; each of which would require its own schedule.
Hence, it would make sense to combine our approach with a query optimizer –
a task beyond the scope of this paper.

                                         94
    Also, our current evaluation has some limitations. First, it is limited to two
datasets and queries. Whilst the queries seem representative of many settings
we have seen we intend to significantly extend our evaluation in the future in
terms of number of datasets and queries. Second, all our evaluations were run on
a cluster with 12 machines, 1GB ethernet, and 24 cores each. Obviously, we will
have to extend our evaluation to investigate the interactions between number
of machines and cores available and the degree of parallelism “granted.” Third,
we will have to run throughput-analyses in real-world setups in addition to our
current network analysis adding number of messages ingested per second as KPI.


6   Conclusion and Outlook
In this study we investigated whether and how scheduling the tasks of Dis-
tributed Semantic Flow Processing (DSFP) systems benefits from applying graph
partitioning. We implemented our approach on the Katts DSFP engine and eval-
uated it using a query of the SRBench benchmark and a usecase inspired by the
open government movement with regards to network load. The results show that
using a graph partitioning algorithm to schedule task instances on machines does
indeed reduce the number of messages sent over the network. We also found that
this only leads to a slightly less even load distribution.
    The critical element for optimizing the scheduling using graph partitioning
is an adequate data partitioning for parallelizing the operators. Future work
will investigate whether the principle of finding the smallest possible data par-
tition given the desired degree of parallelism is as important as our experiments
indicate.
    Our study’s most important shortcomings are its limitation to two datasets
and queries and the fixed setup of the distributed system. For the first we intend
to systematically extend our evaluation in the future in terms of number of
datasets and queries. For the latter, is it the interactions between number of
machines and cores available and the degree of parallelism that require further
research. Especially the impact of such interactions on throughput in terms of
messages ingested per second is of interest here.
    We are confident that our findings help making DSFP systems more scalable
and ultimately enable reactive systems that are capable of processing billions of
triples or graph fragments per second with a negligible delay. It is our firm belief
that the key to addressing these challenges needs to and will have to be revealed
from the data itself.
Acknowledgements We would like to thank Thomas Hunziker, who wrote the
first prototype of the KATTS system during his master’s thesis in our group.


References
 1. Abadi, D., Carney, D., Cetintemel, U., Cherniack, M., Convey, C., Erwin, C.,
    Galvez, E., Hatoun, M., Maskey, A., Rasin, A., Et Al.: Aurora: a data stream
    management system. In: Proc. of the 2003 ACM SIGMOD. pp. 666–666 (2003)


                                        95
 2. Abadi, D.J., Ahmad, Y., Balazinska, M., Hwang, J.h., Lindner, W., Maskey, A.S.,
    Rasin, A., Ryvkina, E., Tatbul, N., Xing, Y., Zdonik, S.: The Design of the Borealis
    Stream Processing Engine. In: Proc. CIDR2005. pp. 277–289 (2005),
 3. Amini, L., Andrade, H., Bhagwan, R., Eskesen, F., King, R., Park, Y., Venkatra-
    mani, C.: Spc: A distributed, scalable platform for data mining. In: Proc. Workshop
    on Data Mining Standards, Services and Platforms, DM-SSP (2006)
 4. Anicic, D., Fodor, P., Rudolph, S., Stojanovic, N.: EP-SPARQL: a unified language
    for event processing and stream reasoning. In: WWW2011. pp. 635–644 (2011)
 5. Aniello, L., Baldoni, R., Querzoni, L.: Adaptive online scheduling in storm. In:
    DEBS2013 (2013),
 6. Barbieri, D.F., Braga, D., Ceri, S., Della Valle, E., Grossniklaus, M.: C-SPARQL:
    A Continuous Query Language for RDF Data Streams. Int. J. of Sem. Comp. 4(1),
    3–25 (2010)
 7. Calbimonte, J.p., Corcho, O., Gray, A.J.G.: Enabling Ontology-based Access to
    Streaming Data Sources. In: Proc. ISWC 2010 (2010)
 8. Cugola, G., Margara, A.: Processing flows of information. ACM Computing Sur-
    veys 44(3), 1–62 (Jun 2012),
 9. Hoeksema, J., Kotoulas, S.: High-performance Distributed Stream Reasoning using
    S4. In: First International Workshop on Ordering and Reasoning (2011)
10. Karypis, G., Kumar, V.: A Fast and High Quality Multilevel Scheme for Partition-
    ing Irregular Graphs. SIAM J. on Scientific Comp. 20(1), 359–392 (Jan 1998),
11. Komazec, S., Cerri, D.: Sparkwave: Continuous Schema-Enhanced Pattern Match-
    ing over RDF Data Streams. In: DEBS 2012 (2012)
12. Lajos, J.F., Toth, G., Racz, R., Panczel, J., Gergely, T., Beszedes, A.: Survey on
    Complex Event Processing and Predictive Analytics. Tech. rep., Citeseer (2010),
13. Le-phuoc, D., Dao-tran, M., Parreira, J.X., Hauswirth, M.: A Native and Adaptive
    Approach for Unified Processing of Linked Streams and Linked Data. In: Proc.
    ISWC 2011. vol. 7031, pp. 370–388 (2011)
14. Ou, C.W., Ranka, S.: Parallel incremental graph partitioning. Parallel and Dis-
    tributed Systems, IEEE Transactions on 8(8), 884–896 (1997)
15. Owens, T.: Survey of event processing. Tech. Rep. December, Air Force Research
    Laboratory Public Affairs Office (2007),
16. Pietzuch, P., Ledlie, J., Shneidman, J., Roussopoulos, M., Welsh, M., Seltzer,
    M.: Network-Aware Operator Placement for Stream-Processing Systems. In: Proc.
    ICDE2006 (2006)
17. Rinne, M., Nuutila, E., Seppo, T.: INSTANS : High-Performance Event Processing
    with Standard RDF and SPARQL. In: ISWC 2012 Post. & Demos. pp. 6–9 (2012)
18. Scharrenbach, T., Urbani, J., Margara, A., della Valle, E., Bernstein, A.: Seven
    Commandments for Benchmarking Semantic Flow Processing Systems. In: ESWC
    2013 (2013)
19. Vorburger, P., Bernstein, A.: Entropy-based Concept Shift Detection. In: Proc.
    ICDM2006. pp. 1113–1118 (2006)
20. White, T.: Hadoop: The definitive guide. O’Reilly Media, Inc., 3 edn. (2012),
21. Wolf, J., Bansal, N., Hildrum, K., Parekh, S., Rajan, D., Wagle, R., Wu, K.L., Fleis-
    cher, L.: SODA: An optimizing scheduler for large-scale stream-based distributed
    computer systems. In: Proc. Middleware2008 (2008)
22. Xia, C., Towsley, D., Zhang, C.: Distributed resource management and admission
    control of stream processing systems with max utility. In: Proc. ICDCS2007 (2007)
23. Zhang, Y., Duc, P.M., Corcho, O., Calbimonte, J.p.: SRBench : A Streaming RDF
    / SPARQL Benchmark. In: Proc. ISWC 2012 (2012)



                                           96