<!DOCTYPE article PUBLIC "-//NLM//DTD JATS (Z39.96) Journal Archiving and Interchange DTD v1.0 20120330//EN" "JATS-archivearticle1.dtd">
<article xmlns:xlink="http://www.w3.org/1999/xlink">
  <front>
    <journal-meta />
    <article-meta>
      <title-group>
        <article-title>Relation-Based In-Database Stream Processing</article-title>
      </title-group>
      <contrib-group>
        <contrib contrib-type="author">
          <string-name>Christian Winter</string-name>
        </contrib>
        <contrib contrib-type="author">
          <string-name>Thomas Neumann</string-name>
        </contrib>
        <contrib contrib-type="author">
          <string-name>Alfons Kemper</string-name>
        </contrib>
      </contrib-group>
      <abstract>
<p>Data analytics pipelines are growing increasingly diverse, with relevant data being split across multiple systems and processing modes. In particular, the analysis of data streams, i.e., high-velocity ephemeral data, is attracting growing interest and has led to the development of specialized stream processing engines. However, evaluating complex queries combining such ephemeral streams with historic data in a single system remains challenging. In this paper, we devise a novel stream processing technique that allows users to run ad hoc queries that combine streams and history tables in a relational database system. The backbone of our approach is a specialized ring-buffered relation, which allows for high ease of integration for existing database systems. We highlight the applicability of our approach by integrating it into the Umbra database system and demonstrate its performance against dedicated stream processing engines, outperforming them consistently for analytical workloads.</p>
      </abstract>
    </article-meta>
  </front>
  <body>
    <sec id="sec-1">
      <title>1. Introduction</title>
<p>There is an ever-increasing need for just-in-time analyses combining real-time data in the form of streams with information held in databases, such as user, customer, or billing data. Several solutions integrating durable data in stream processing engines (SPEs) have been proposed, either loading read-only data into the stream processing engine from local sources such as CSV files or offering interfaces to external databases [1, 2, 3, 4]. However, the reverse direction of integrating stream processing into relational databases has yet to receive much attention. While modern SPEs are capable tools for many workloads, they lack the functionality to manage historic data internally. We argue that the unmatched capabilities and performance of relational database systems for managing and analyzing relational data make them the ideal solution for processing durable relations and data streams.</p>
      <p>Figure 1: Exemplifying analytical query combining a stream and durable relations. (a) SQL (b) Query Plan</p>
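      <p>As an illustration of such a combined query, the following Python sketch runs a plausible rendering of the Figure 1 query over ordinary SQLite tables. This is our own reconstruction, not the paper’s exact SQL; column names such as rate, region, and contact_info are assumptions based on the figure.</p>
      <p>```python
import sqlite3

# Toy schema for the running example; names are assumptions, not the
# paper's exact schema.
con = sqlite3.connect(":memory:")
con.executescript("""
    create table users (id integer, username text, contact_info text,
                        rate integer, region text);
    create table posts (uid integer, score integer, content text);
    insert into users values (12, 'Chris', 'chris@example.com', 50, 'DE');
    insert into users values (7, 'Sam', 'sam@example.com', 90, 'US');
    insert into posts values (12, 1200, 'first'), (12, 900, 'second'),
                             (7, 400, 'other');
""")

# Influential posters: average of at least 1000 likes per post, region 'DE'.
rows = con.execute("""
    with user_impact as (
        select uid, avg(score) as score from posts group by uid
    )
    select u.username, u.contact_info, u.rate, i.score
    from users u, user_impact i
    where u.id = i.uid and i.score >= 1000 and u.region = 'DE'
    order by u.rate / i.score
""").fetchall()
print(rows)  # [('Chris', 'chris@example.com', 50, 1050.0)]
```</p>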
<p>In this paper, we devise a technique to integrate streams into database systems through a specialized streaming relation. By relying on a ring-buffered specialization of regular database relations, we can utilize the database system’s full type and query support and gain access to a wide range of pre-built functionality and operators, such as efficient joins and string operations. Our approach relies on regular SQL to interact with streams, necessitating no changes to the database grammar. Thus, streams can be used with all tools commonly used for database access, such as object-relation mapping (ORM) libraries available for many programming languages. Furthermore, the SQL-based interface allows users to easily express queries incorporating streams and regular tables. We demonstrate the applicability of our approach by implementing it in the state-of-the-art database system Umbra [5].</p>
      <p>We outline the relation-based integration on an exemplifying workload, which we use as a running example throughout this paper: Consider a micro-blogging service where users can share and like simple text posts. For reporting, such a service might be interested in finding influential posters in a given region, e.g., users who achieved an average of at least 1’000 likes per post in the last month, for promotional campaigns. Figure 1 depicts the corresponding query on an exemplifying schema. In many cases, business data, such as the contact info and payment details for paid bloggers, will be stored separately from the service data, such as posts. In the past, it was necessary to either find influential posters in the service database or materialize posts in the business database. The first option is undesirable as it involves analytical queries in a system likely optimized for simple lookup and update operations. In contrast, the second option would unnecessarily bloat the analytical database.</p>
      <p>Joint Workshops at the 49th International Conference on Very Large Data Bases (VLDBW’23), Second International Workshop on Composable Data Management Systems (CDMS’23), August 28 - September 1, 2023, Vancouver, Canada. winterch@in.tum.de (C. Winter); neumann@in.tum.de (T. Neumann); kemper@in.tum.de (A. Kemper)</p>
<p>On the other hand, our approach allows us to stream the last month’s posts into the analytical database without materializing them, requiring no analytical functionality from the service database.</p>
      <p>© 2023 Copyright for this paper by its authors. Use permitted under Creative Commons License Attribution 4.0 International (CC BY 4.0). CEUR Workshop Proceedings (CEUR-WS.org).</p>
<p>This paper makes the following key contributions:
• We describe the integration of stream processing in database systems as a specialized ring-buffered relation.
• We discuss the streaming model achieved by our integration.
• We evaluate our streaming relation against dedicated stream processing engines, focusing on end-to-end query and insert performance on a TPC-H-based workload.</p>
      <p>The remainder of this paper is structured as follows: In Section 2, we discuss background and related work in stream processing. Following, we discuss our approach to in-database stream processing in Section 3, which we evaluate in Section 4. Finally, we conclude in Section 5.</p>
    </sec>
    <sec id="sec-bg">
      <title>2. Background</title>
      <p>Over the years, stream processing has evolved into a diverse area of research, spanning a wide variety of data stream models optimized for different applications. In this section, we outline the model underlying our work and discuss related work in the intersection between stream processing and relational database systems.</p>
      <sec id="sec-1-1">
        <title>2.1. Stream Model</title>
        <p>While data stream models differ in many aspects, we focus on the two main categories most relevant to our work: windowing semantics and state management. Windowing semantics determines which subset of the stream qualifies for query evaluation at any given moment. Among the most common windowing semantics are sliding, tumbling, session, and unbounded windows, shown in Figure 2. The first two are further subdivided into time- and tuple-based metrics. Tumbling and sliding windows define a fixed window size l, either in terms of a number of tuples or a time duration. Tumbling windows always advance by the full window size, so consecutive windows never overlap, while sliding windows advance by a separately defined interval, allowing for overlapping windows. Session windows are delimited by periods of inactivity in the input stream. Finally, unbounded windows report the entire stream.</p>
        <p>Figure 2: Overview of common stream windowing semantics.</p>
        <p>The window semantics closest to unmodified relational query processing is the unbounded window. However, this would mean that queries over infinite streams never report a result, as database queries only advance to the next step, i.e., operator, once an input is fully depleted. Therefore, we instead follow the session window semantics, which is still close to relation scan semantics, and assume a scan as depleted when no new stream tuples have arrived for the specified inactivity duration. Note that sliding and tumbling windows can still be achieved on top of this session window using the SQL WINDOW operator.</p>
        <p>The second category, state management, determines where and how systems manage the state of streaming queries. This state mainly comprises intermediate query results, such as aggregates, but also includes routing and meta information, e.g., for worker and checkpoint management. In their survey, To et al. [6] identify four different state models for stream processing. Of those, we most closely follow the operator view of Fernandez et al. [7], wherein query progress and state are materialized within operators. However, due to Umbra’s pipeline-based query execution model, query state only occurs at pipeline breakers, not at all individual operators. Further, distributing tuples to workers is handled through morsel-driven parallelism [8] in our approach. Therefore, routing decisions for tuples are made by downstream operators pulling new morsels, not actively and push-based by upstream operators.</p>
      </sec>
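      <p>The windowing semantics above can be made concrete with a small sketch. The following illustrative Python functions are our own (not from the paper) and assign tuple timestamps to tumbling, sliding, and session windows:</p>
      <p>```python
def tumbling(ts, size):
    """Non-overlapping windows: each timestamp falls into exactly one."""
    return [t // size for t in ts]

def sliding(ts, size, advance):
    """Overlapping windows: a timestamp may fall into several windows."""
    out = []
    for t in ts:
        first = max(0, (t - size) // advance + 1)
        out.append(list(range(first, t // advance + 1)))
    return out

def sessions(ts, gap):
    """A new session starts after a period of inactivity longer than gap."""
    win, current = [], 0
    for prev, t in zip([ts[0]] + list(ts), ts):
        if t - prev > gap:
            current += 1
        win.append(current)
    return win

ts = [1, 2, 3, 10, 11, 30]
print(tumbling(ts, 5))      # [0, 0, 0, 2, 2, 6]
print(sliding([5], 4, 2))   # [[1, 2]]
print(sessions(ts, 5))      # [0, 0, 0, 1, 1, 2]
```</p>
      <p>Unbounded windows correspond to the whole list of timestamps; the session variant is the semantics our approach adopts for detecting scan depletion.</p>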
      <sec id="sec-1-2">
        <title>2.2. Related Work</title>
        <sec id="sec-1-2-1">
          <p>Our approach overlaps with two primary research areas in data analytics: relational database systems and stream processing. Both have seen vast amounts of research, and we, therefore, focus our discussion of related work on their intersection.</p>
<p>Durable data in stream processing engines. Recent years have seen increased demand for analytics combining both historic and streamed data. Consequently, stream processing engines such as Apache Flink [2] and Apache Spark [1] enable the use of historic data in analytical queries over data streams. However, they do not support managing historic data internally and instead rely on external sources. These external sources can be file formats like Parquet and CSV or database systems through connectors such as JDBC.</p>
<p>While stream processing engines do not offer capabilities for managing historic data, modern SPEs manage all state for long-running and complex queries internally [6, 9, 10]. To prevent conflicts between multiple queries on a shared state, some SPEs rely on transaction semantics commonly used by database systems [11, 12, 13]. TSpoon [14] extends Apache Flink [2] with a transaction model, thereby enabling a queryable state for data stream analytics at configurable isolation levels. In addition, Meehan et al. [15] build upon the OLTP database system H-Store [16] and utilize H-Store’s transactional processing model for data streams, enabling the ACID-compliant execution of streaming and transactional database queries in a single system.</p>
          <p>Figure 3: Overview of the caching layer consisting of a ring buffer for fixed-size tuple parts and two string buffers used in alternation.</p>
<p>In-database stream processing. Combining streams and relational data in a single system has been proposed in the context of data warehouse architectures [17, 18]. However, these architectures rely on two separate engines for internal relational query and stream processing. Some works propose a unified SQL-based query language to express queries over both streams and durable relations easily [19, 20].</p>
<p>Past research integrating both stream processing and durable data in a single engine often relies on materialized views [21, 22] to realize continuous queries [23, 24, 25] through continuous views [26]. DBToaster [4, 27] implements higher-order incremental view maintenance in a standalone engine to enable high insert and query throughput for views combining both static and dynamic data. In addition, PipelineDB [28] supports stream processing in the full-fledged database system PostgreSQL using dedicated streaming views.</p>
        </sec>
      </sec>
    </sec>
    <sec id="sec-2">
      <title>3. Approach</title>
      <p>Having defined the theoretical streaming model of our ring-buffer-based in-database stream processing approach, we can describe its design and implementation. The core difference between our in-database stream processing and the processing of relational data is that streamed data is not fully and permanently materialized within the database at the start of a query. For regular queries, the data that a query is working on is determined by its transaction. Each transaction has a single state of the database on which it evaluates all queries. To achieve this, all data must be fully materialized at the start of a query, and any parallel changes must be handled transparently. Streams, on the other hand, are not transactional. Stream entries are ephemeral and are only cached in the database for a short time. Queries, therefore, cannot rely on the availability of all stream elements for query evaluation. Furthermore, queries involving streams have to handle stream arrivals during a query.</p>
      <sec id="sec-2-1">
        <title>3.1. Interface</title>
        <p>We want to rely purely on regular SQL grammar to interact with data streams. However, we must also ensure the database can infer which relations to treat as a stream and which to persist. For this, we follow a syntax similar to that of PipelineDB [28] and create streams as foreign tables with a reserved server name stream. For our example stream posts, this results in the SQL statement:
create foreign table posts (
  uid integer,
  username varchar,
  score integer,
  content varchar
) server stream
Following the creation of this foreign table, all operations can be kept oblivious to the streaming nature of the relation. Inserts, therefore, can use regular SQL insert semantics, e.g.,
insert into posts values (
  12, 'Chris', 9153,
  'It''s great to process streams
   in a database system!'
)
to insert the tuple displayed in Figure 3.</p>
      </sec>
      <sec id="sec-2-2">
        <title>3.2. Caching Layer</title>
        <p>Before discussing inserts and queries to data streams, we must establish how streaming data is stored within the database. Conceptually, ephemeral streams do not need to be materialized outside of queries and instead can be processed fully and directly on arrival. However, it is beneficial to cache chunks of the stream before processing.</p>
<p>For one, a cache can compensate for load spikes in the input stream, where inserts occur too frequently for queries to keep up. Using a cache can help alleviate such short spikes by accepting tuples to be scanned by queries later on. Furthermore, a cache allows re-using existing scan logic and interfaces for durable relations, reducing the integration overhead and enabling efficient morsel-wise input processing.</p>
<p>For our approach, we implement a caching layer based on a ring buffer. Our buffer has two main components, displayed in Figure 3. The first component, the tuple buffer, stores fixed-size tuple data. This data includes all fixed-size columns, such as integer, numeric, and floating-point values, as well as metadata for variable-sized types, such as strings. In our example posts stream of Figure 1, these are the values for the integer columns uid and score and the metadata for the string columns username and content. The number of tuples to be held in this buffer can be configured to fit the expected load. By only storing metadata for variable-sized data and not the data itself [5], we ensure that all tuples have the same size, allowing us to easily re-use slots without checking for overlaps with still valid data.</p>
        <p>Algorithm 1 Stream caching layer insert processing
1: function processInsert(Tuple t)
2:   tid ← writeTid.fetchAdd(1)
3:   slot ← tid mod bufferSize
4:   odd ← ⌊tid / bufferSize⌋ mod 2
5:   stringBuffer ← stringBuffers[odd]
6:   if slot = 0 then
7:     stringBuffer.clear()
8:   for val ∈ tuple do
9:     if val.isString() then
10:      if stringLength(val) > 12 then
11:        buffer.storeExtString(slot,
12:          stringBuffer.store(val))
13:      else buffer.storeStringInPlace(slot, val)
14:    else
15:      buffer.store(slot, val)
16:  /* Delay scan visibility until all previous tuples are visible */
17:  while validTuples &lt; tid - 1 do
18:    wait()
19:  validTuples ← tid</p>
<p>The data for variable-sized types is instead stored in resizable buffers pointed at by the metadata. In the exemplifying tuple in Figure 3, one can see the two different storage formats for strings used by Umbra. Both formats first store the 4 byte length of the string. Short strings up to a length of 12 characters, such as this username, are stored inline in the remaining 12 bytes, requiring no additional buffer storage. For longer strings, here for the content, we store a 4 byte prefix and an offset into a separate storage region. For streams, this region is one of the string buffers. The prefix helps quickly answer some comparisons and filter predicates without loading the full string. Note that Umbra picks the string format for each individual string value, meaning a longer username for another tuple might be stored externally.</p>
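        <p>The two string formats can be sketched as follows. This illustrative Python rendering is our own, not Umbra’s actual C++ layout; it assumes a 16-byte header with an 8-byte offset for external strings:</p>
        <p>```python
import struct

def encode(s, buf):
    """Return a fixed-size 16-byte header; long strings go to the buffer."""
    data = s.encode()
    if len(data) > 12:
        offset = len(buf)
        buf.extend(data)                               # external storage
        return struct.pack("=I4sQ", len(data), data[:4], offset)
    return struct.pack("=I12s", len(data), data)       # stored inline

def decode(header, buf):
    (length,) = struct.unpack_from("=I", header)
    if length > 12:
        prefix, offset = struct.unpack_from("=4sQ", header, 4)
        return bytes(buf[offset:offset + length]).decode()
    return struct.unpack_from("12s", header, 4)[0][:length].decode()

buf = bytearray()
short = encode("Chris", buf)                           # fits inline
long_ = encode("It's great to process streams!", buf)  # external, with prefix
print(len(short), len(long_))  # 16 16
print(decode(short, buf))      # Chris
```</p>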
<p>In contrast to the fixed-size tuple data, we cannot use a ring buffer for strings. If we inserted a string at slot i of a ring buffer that is longer than the string of the tuple previously cached at slot i, it would at least partially overwrite the string data of the tuple still held in slot i+1. This tuple is, however, still accessible through the cache. To prevent overwriting still valid and accessible tuples, we alternate between two resizable string buffers for different runs through the ring buffer, using one for even and one for uneven runs. Alternating between buffers guarantees that string offsets are valid at least until the tuple is overwritten in the tuple buffer while avoiding expensive allocations for every individual string.</p>
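        <p>Algorithm 1 and the alternating string buffers can be sketched in a few lines. This single-threaded Python model is our own illustration; the paper’s implementation is concurrent and works on raw memory:</p>
        <p>```python
BUFFER_SIZE = 4                          # ring capacity, freely configurable

tuple_buffer = [None] * BUFFER_SIZE      # fixed-size tuple slots
string_buffers = [[], []]                # even / odd runs through the ring
write_tid = 0

def process_insert(values):
    global write_tid
    tid = write_tid                      # stands in for writeTid.fetchAdd(1)
    write_tid += 1
    slot = tid % BUFFER_SIZE
    odd = (tid // BUFFER_SIZE) % 2
    strings = string_buffers[odd]
    if slot == 0:
        strings.clear()                  # reclaim the buffer of the older run
    row = []
    for val in values:
        if isinstance(val, str) and len(val) > 12:
            strings.append(val)          # external storage for long strings
            row.append(("ext", odd, len(strings) - 1))
        else:
            row.append(val)              # stored in place
    tuple_buffer[slot] = row
    return tid

for i in range(6):                       # six inserts wrap the ring once
    process_insert((i, "x" * (i + 10)))

print(tuple_buffer[0])  # [4, ('ext', 1, 0)] -- tid 4 overwrote tid 0's slot
```</p>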
      </sec>
      <sec id="sec-2-3">
        <title>3.3. Insert Processing</title>
        <p>Having outlined the layout of our caching layer, we can describe the insert process for stream tuples. While tuple-at-a-time inserts are also possible, we optimize for bulk inserts from an external streaming broker like Kafka [29] or SQL insert statements, e.g., reading from CSV files. The process is outlined in Algorithm 1 conceptually for a single tuple. In our implementation, we perform such inserts at morsel granularity instead, collecting inserted tuples thread-locally before merging them into the relation in bulk for performance reasons. For each insert, we acquire a tuple identifier for the new tuple (Line 2). This id determines the ring buffer slot to store the tuple in. Furthermore, it determines the string buffer to be used for external strings. Before writing the first slot of the ring buffer, we additionally mark the corresponding string buffer for cleanup (Line 7). Note that while there are no longer any direct references into the string buffer from the ring buffer, we still do not free the memory region, so as not to interfere with queries still processing buffer entries that were just overwritten. Instead, the last scan to finish on this string buffer will free the associated memory when it is completed. We will discuss string buffer memory management when discussing queries in Section 3.4.</p>
        <p>Following, we write the tuple data into the ring buffer slot. For strings, we decide between the two storage layouts outlined in Section 3.2 based on the string length. All other values are stored in place in the ring buffer. Finally, we mark the tuple as valid to make it visible for scans (Line 19). To prevent partially written cache entries of parallel inserts from being accessible for scans, we only mark new tuples as visible once all previous tuples are visible.</p>
        <sec id="sec-2-3-1">
          <title>3.4. Query Processing</title>
<p>Through our specialized relation, the only operator in a query plan aware of an input’s streaming nature is the table scan. All other operators can be kept oblivious to the nature of their input. While this integration is minimally invasive, it requires a careful design of the scan operator. Scans of regular relations rely on table metadata to determine the range of tuples to scan at query planning time, which further determines the boundaries for the scan at execution time. For streams, however, we cannot rely on the scan boundaries to be known, as tuples will still arrive during the query execution. Even cardinality estimates for query optimization can be unreliable, as past stream behavior does not necessarily reflect future behavior. Therefore, we need to adapt query processing in two areas to handle streams efficiently: query planning and scan operator design.</p>
          <p>Algorithm 2 Stream scan operator morsel picking
1: function selectScanRange
2:   morsel ← {}
3:   while now() − lastPick.load() &lt; timeout do
4:     limit ← validTuples.load()
5:     position ← lastScanned.load()
6:     loop
7:       if limit ≤ position then
8:         updateLimit()
9:         break
10:      lastPick.exchange(now())
11:      if pickRange(morsel, limit, position) then
12:        return morsel
13:  return ScanDone</p>
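          <p>The morsel-picking loop of Algorithm 2 can be modeled deterministically. In this illustrative Python sketch (our own; the real operator uses atomics and wall-clock time, here replaced by explicit state and a logical clock), a scan is considered depleted once no new tuples arrived within the timeout:</p>
          <p>```python
def select_scan_range(valid_tuples, last_scanned, now, last_pick, timeout,
                      morsel_size=2):
    """One-element lists stand in for the shared atomic counters."""
    while timeout > now[0] - last_pick[0]:
        limit = valid_tuples[0]
        position = last_scanned[0]
        if position >= limit:            # no new tuples visible yet
            now[0] += 1                  # stands in for time passing
            continue
        last_pick[0] = now[0]            # new arrivals: reset the timeout
        last_scanned[0] = min(limit, position + morsel_size)
        return (position, last_scanned[0])
    return "ScanDone"                    # session window expired

valid, scanned, now, pick = [5], [0], [0], [0]
print(select_scan_range(valid, scanned, now, pick, 3))  # (0, 2)
print(select_scan_range(valid, scanned, now, pick, 3))  # (2, 4)
print(select_scan_range(valid, scanned, now, pick, 3))  # (4, 5)
print(select_scan_range(valid, scanned, now, pick, 3))  # ScanDone
```</p>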
<p>Figure 4: Access strategy of the stream scan, moving tuples from the relation’s ring buffer through the scan buffer into query memory.</p>
<p>3.4.1. Query Planning</p>
          <p>For query planning, especially for join ordering, database systems rely on cardinality estimates for scans and filter predicates. These estimates are sourced from statistics maintained by inserts and updates to the relation. We cannot assume that previous statistics are available and reliable for streams. However, we still want the optimizer to produce an optimized query plan, especially to reduce materialized intermediate result sizes in the case of high-volume data streams. We assume streams are of the largest cardinality and, therefore, want to materialize them only if necessary. While the optimized query plan of Figure 1b only materializes the aggregated scores per user, an unoptimized plan without statistics might first join the users and posts relations, potentially materializing the posts stream in the join hash table. To avoid this, we hint to the optimizer that streams will always comprise the most data, thus ordering them to the probe side of joins.</p>
          <p>3.4.2. Scan Operator</p>
          <p>In contrast to scan operators for durable relations, our stream scan operator has to mask two things: unknown input bounds and the ephemerality of tuples. As we, apart from the scan, entirely rely on existing database operators for query processing, we also have to adhere to the execution model of the database system. In our system Umbra, this is the producer-consumer model [30]. Generally, database execution models will process a blocking operator entirely before starting work on the next. Streaming models, on the other hand, replace this blocking semantics with running or windowed aggregates. To achieve similar semantics for database systems, we must determine when we can move query processing to the next operator. There are two different possibilities to achieve this: sending a dedicated end-of-stream tuple or message signaling that the input is depleted, or detecting input depletion from metadata, such as the arrival rate. We focus on the latter, which is most consistent with our approach of handling streams transparently using an SQL interface. All tuples arriving after detected depletion can be handled in a new window for the query or are considered not part of the stream.</p>
          <p>For simplicity, we rely on session window semantics to achieve this behavior, advancing to the next step of query processing when no new tuples have arrived for a predefined timeout. For our integration into Umbra, we integrate this session window semantic into the morsel selection [8], where range-based metrics reside for regular scans. Algorithm 2 outlines the resulting strategy. Before obtaining a scan range, we check the timeout condition (Line 3). If no thread detected new arrivals during this timeout, we continue with the next query processing task, as we consider the stream input depleted. Following, we fetch the latest range information and check if tuples are available for processing (Line 7). If not, we return to the timeout check of Line 3. Once tuples are available, we update the timeout condition (Line 10) and try to pick a morsel. Note that the unchecked change to the timeout condition can lead to a slight imprecision for the timeout, as it might lead to the loss of a more recent timestamp. However, we deem this acceptable as it allows us to reduce synchronization overhead.</p>
          <p>We still must mask the second property of streams, their ephemerality. The ephemeral nature of streaming data does not impact the range selection outlined above, which is performed entirely on tuple ids. However, once the scan tries to access the corresponding values in the tuple buffer, we must ensure that the scan can never encounter values overwritten by concurrent inserts. This is especially important for string values with externally stored data, such as the content value of Figure 3, where trying to access an invalid value could lead to a segmentation fault. Figure 4 depicts our access strategy. First (1), we find the selected range in the buffer based on the tuple ids and prevent the deletion of the corresponding string region through reference counting. Following (2), we copy all fixed-size tuple data in the range into a separate buffer residing in the scan operator. After copying all tuples in the range, we check whether concurrent writes have overwritten any tuples that we have scanned (3). In case of an overlap, we cannot guarantee that all tuples in the scan buffer are the desired tuples and, thus, have to abort the scan. If all scanned tuples are still valid, we report them to the downstream operator of the pipeline (4). At this stage, the first downstream operator materializing the tuples relocates the out-of-place content of strings into query memory, thereby protecting them from deletion and preventing segmentation faults in case of concurrent inserts. After all tuples of the scan range have been processed by the downstream operators of the scan’s pipeline, we release our reference to the relation’s string buffer, freeing the corresponding memory region if we held the last reference.</p>
          <p>Figure 5: Insert and query throughput in Umbra, Flink and Spark. (a) Insert (b) Query</p>
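          <p>The optimistic copy-and-validate step of this access strategy can be sketched as follows; an illustrative Python model of our own, with reference counting and string relocation omitted:</p>
          <p>```python
BUFFER_SIZE = 4

def scan_range(tuple_buffer, write_tid, first_tid, last_tid):
    """Copy tuples [first_tid, last_tid) out of the ring, then validate."""
    copied = [tuple_buffer[t % BUFFER_SIZE] for t in range(first_tid, last_tid)]
    # The slot of first_tid is overwritten once inserts advanced far enough
    # to wrap around to it again; in that case the scan must abort.
    if write_tid[0] - first_tid > BUFFER_SIZE:
        return None                      # overlap detected: abort the scan
    return copied                        # safe to hand to the next operator

ring = ["a", "b", "c", "d"]              # slots for tids 0..3
writer = [4]                             # next tid to be written
print(scan_range(ring, writer, 0, 2))    # ['a', 'b']
writer[0] = 9                            # concurrent inserts wrapped the ring
print(scan_range(ring, writer, 0, 2))    # None
```</p>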
        </sec>
      </sec>
    </sec>
    <sec id="sec-3">
      <title>4. Evaluation</title>
<p>Having outlined our relation-based approach to in-database stream processing, we evaluate its performance against two popular dedicated stream processing engines, Apache Flink [2] and Apache Spark [1]. We focus our evaluation on data ingestion rates, scalability, and performance on a mix of simple stream aggregation and complex analytical queries.</p>
      <sec id="sec-3-1">
<title>4.1. Setup</title>
        <p>We perform all experiments in this section on a server equipped with 256 GB DDR3 main memory and an Intel Xeon E5-2660 v2 CPU with 28 physical cores. All data for the experiments is stored on a Samsung 970evo NVMe SSD. Results reported in this section are based on the geometric mean of 5 runs taken after 2 warmup runs.</p>
        <p>Workload. We base our experiments on the TPC-H benchmark [31] at scale factor 100. To transform TPC-H into a stream analytics workload, we consider the largest relation by far, lineitem, to be a stream. All other relations are considered durable and materialized at the start of a query. Consequently, we only include queries with exactly one scan of the lineitem relation in our experiments. Due to issues with Flink, we had to remove queries 5 and 8 from our benchmark. We do not print the query result in any of the systems.</p>
<p>Flink. We implement a standalone Flink executable based on Flink version 1.6.1 and express all relations using the batch table API based on CSV data. Further, we allow Flink’s optimizer to re-order joins by setting the TABLE_OPTIMIZER_JOIN_REORDER_ENABLED flag. We submit all queries to Flink using its SQL interface. All intermediate results that Flink requires for processing are located in an in-memory file system.</p>
        <p>Spark. We implement our TPC-H-based workload in Spark version 3.3.2. All queries are expressed using the Spark SQL API on external CSV data frames. Jobs are submitted to a local standalone Spark cluster using spark-submit.</p>
        <p>Umbra. We implement a streaming relation in Umbra as outlined in the previous section. Further, we create all relations except for lineitem as durable relations in Umbra. Lineitem is created as a stream using the interface described in Section 3.1. We subtract the session window timeout from Umbra’s query runtimes, as the system is idle during this period. Both Flink and Spark are run in non-windowed configuration and are, thus, not introducing similar delays.</p>
      </sec>
      <sec id="sec-3-2">
        <title>4.2. Stream Ingestion</title>
        <p>As a first experiment, we examine the data ingestion rate offered by the three systems. For this, we fully insert the lineitem relation once into each system. As Spark and Flink rely on pull-based semantics for efficient analytical queries and do not offer full support for push-based inserts, we express inserts to them as SELECT COUNT(*) FROM lineitem queries. In contrast, we rely on push-based semantics, as this never delays inserting workers and, therefore, imposes fewer requirements on inserting systems. For Umbra, we use the bulk insert command COPY lineitem FROM CSV.</p>
<p>Figure 5a shows the insert performance in millions of tuples per second along the number of insert threads. All systems show near-linear scalability until simultaneous multithreading (SMT) is reached. Overall, Flink offers the best insert performance, outperforming Umbra by a factor of 1.8 for 32 threads. This advantage can be attributed to the slight difference in the semantics of our insert queries. Our approach has to fully process all columns to materialize them in the ring buffer. In contrast, Flink can simply count the number of rows without parsing them entirely. While we expect this advantage to disappear for more complex queries where multiple columns must be parsed, it is very beneficial for such simple workloads. Furthermore, the lineitem relation comprises far more columns than are used by the average query. In our running example of Figure 1, we would, of course, only stream the necessary columns into the system, which benefits both Spark and Umbra. Table 1 shows the resulting speedup of inserting only the four columns relevant for TPC-H Q6 over a full lineitem insert. While Flink also benefits from less data being processed, Umbra and Spark can more than double their insert throughput. For this reduced lineitem stream, Umbra almost closes its insert performance gap with Flink.</p>
        <p>Figure 6: Query-to-insert overhead of Flink, Spark, and Umbra along the number of threads.</p>
<p>Table 1 shows the speedup in query performance
for TPC-H Q6 for a stream of only the four required
columns. As for inserts, we can see that Umbra and Spark
benefit more from this minimized stream than Flink, with
Umbra achieving the highest speedup. Finally, we want
to investigate the overhead that queries introduce in the
system on top of the work for data ingestion. Figure 6
shows the relative overhead that evaluating our
TPC-Hbased benchmark creates for each system. The overhead
of Spark is nearly constant, independent of the number
of threads used. However, Flink’s queries scale worse
than its inserts. The drastically higher overhead for Flink
confirms our assumption of Section 4.2 and indicates that
Flink heavily optimizes for the count(*) query that we
used to emulate inserts. For Umbra, we see next to no
overhead when executing queries in addition to inserts.
This further highlights the advantage of streaming only
required columns into the database, as the speedup we
have seen for inserts in Table 1 fully translates to query
speedup.</p>
      </sec>
      <sec id="sec-3-3">
        <title>Having analyzed the data ingestion capabilities of all</title>
        <p>approaches, we now focus on their analytical
capabilities. For this, we run a combined workload of the twelve
TPC-H queries selected for our benchmark, running each
query once. For Umbra, the specified number of threads
is the total number available to the system, which must
be shared between query and insert processing. We con- 5. Conclusion
currently schedule two queries in Umbra, one inserting
the lineitem stream from CSV and another evaluating the In this paper, we devised a technique for relation-based
TPC-H query on the inserted stream. Figure 5b shows stream processing in relational database systems.
Rethe throughput in queries per hour for all three systems. lying on a ring-bufered relation for stream processing
Even though Umbra has to split the available workers be- provides high ease of integration for existing database
tween insert and query processing queries, it consistently systems, enabling database systems to handle
streamoutperforms both Flink and Spark, independent of the enrichment queries combining transient with durable
number of available worker threads. Furthermore, we see data. To demonstrate the applicability of this relation, we
that the advantage Flink had when ingesting data into integrated it into the code-generating Umbra database
the system does not transfer to query analytics, where system.
multiple columns have to be parsed. Furthermore, we can Using the implementation within Umbra, we
demonsee scale-up issues with Flink as its relative performance strated the performance of our streaming relation in
degrades with increasing thread count, being overtaken a number of end-to-end benchmarks against dedicated
by Spark when using more than eight threads. Having stream-processing engines. Our approach consistently
considered the analytical performance of all approaches outperforms dedicated stream processing engines on
anfor the full lineitem stream, we want to investigate the alytical streaming workloads while requiring only
miniinfluence of processing only the columns required for a mal changes to the database system’s execution model.
[16] R. Kallman, H. Kimura, J. Natkins, A. Pavlo, A. Rasin,</p>
      </sec>
    </sec>
  </body>
  <back>
    <ref-list />
  </back>
</article>