<!DOCTYPE article PUBLIC "-//NLM//DTD JATS (Z39.96) Journal Archiving and Interchange DTD v1.0 20120330//EN" "JATS-archivearticle1.dtd">
<article xmlns:xlink="http://www.w3.org/1999/xlink">
  <front>
    <journal-meta />
    <article-meta>
      <title-group>
        <article-title>StreamConnect: Ingesting Historic and Real-Time Data into Unified Streaming Architectures</article-title>
      </title-group>
      <contrib-group>
        <contrib contrib-type="author">
          <string-name>Philipp Zehnder</string-name>
          <email>zehnder@fzi.de</email>
          <xref ref-type="aff" rid="aff0">0</xref>
        </contrib>
        <contrib contrib-type="author">
          <string-name>Dominik Riemer</string-name>
          <email>riemer@fzi.de</email>
          <xref ref-type="aff" rid="aff0">0</xref>
        </contrib>
        <aff id="aff0">
          <label>0</label>
          <institution>FZI Research Center for Information Technology</institution>
          ,
          <addr-line>Haid-und-Neu-Str. 10-14, 76131 Karlsruhe</addr-line>
          ,
          <country country="DE">Germany</country>
        </aff>
      </contrib-group>
      <abstract>
        <p>The web of things provides a steadily increasing amount of both real-time and historic data sources. Yet widespread standards are missing, and the heterogeneity of data formats and communication protocols makes the integration of such sources a challenging task, often requiring manual programming effort. This paper presents a novel, lightweight semantics-based approach to quickly connect heterogeneous data sources to stream processing systems. Our main contributions are i) a new model to represent characteristics of data streams and data sets, such as schema and quality, independent of the actual run-time format, ii) generic data adapters and methods to automatically discover these characteristics at runtime and iii) a distributed architecture to pre-process (e.g. clean and filter) raw data coming from these adapters directly on the edge before data is processed by a stream processing engine. Our contribution eases the ingestion of batch and real-time data into unified streaming architectures.</p>
      </abstract>
      <kwd-group>
        <kwd>Stream Processing</kwd>
        <kwd>Data Ingestion</kwd>
        <kwd>Semantic Web</kwd>
      </kwd-group>
    </article-meta>
  </front>
  <body>
    <sec id="sec-1">
      <title>Introduction</title>
      <p>In recent years, emerging trends such as the web of things have led to
enormous data growth. For instance, manufacturing companies increasingly
gather, besides existing data sources such as master and customer data,
massive amounts of real-time data coming directly from their shop floors. At
the same time, the web benefits from many data sources that are made publicly
available by means of open APIs.</p>
      <p>One major benefit of this trend is the ability to integrate and process such
sources in real-time as a basis for advanced analytic operations, enabling
companies to find correlations such as incident patterns early or even ahead of time.</p>
      <p>
        From an information management perspective, architectural patterns such as
publish/subscribe systems have gained popularity, enabling enterprises to
establish so-called data backbones that collect data in a single, yet distributed
messaging system such as Apache Kafka [
        <xref ref-type="bibr" rid="ref1">1</xref>
        ]. At the same time, modern distributed
streaming engines such as Apache Flink [
        <xref ref-type="bibr" rid="ref2">2</xref>
        ] are able to process both real-time
and historical data in a unified streaming architecture. Such architectures, also
known as Kappa architecture [
        <xref ref-type="bibr" rid="ref3">3</xref>
        ], reduce the effort to deploy and maintain two
different code bases for batch processing of historical data and stream processing
for quickly computing real-time views required by other Big Data architectures
such as the Lambda architecture [
        <xref ref-type="bibr" rid="ref3">3</xref>
        ].
      </p>
      <p>
        However, a still remaining open problem is the accessibility and ingestion of
data sources into such architectures for further processing. The development of
adapters for individual data sources is still a highly manual task [
        <xref ref-type="bibr" rid="ref4">4</xref>
        ] due to the
heterogeneity of protocols and the diversity of data formats. This usually
requires both technological expertise and domain knowledge to understand the
meaning of gathered data. The main objective of this paper is to reduce the
technical effort of integrating new data sources into a big data streaming-only
infrastructure by introducing a semantics-based adapter concept. This objective
poses a number of technical challenges and requirements that need to be
considered:
– Temporal Aspect: Adapters need to be able to handle both real-time and
historical data.
– Adapter Configuration: A solution must provide a higher level of
abstraction that enables domain experts to configure adapters, a task that today
still requires a lot of technical understanding.
– Data Cleaning: Since adapters are often long-running processes, they
should ensure that the quality of the data does not change over time.
– (Edge) Pre-Processing: Simple pre-processing steps such as filtering,
transforming or aggregating data should be executed locally, close to the sensor,
to avoid sending noisy data to the messaging system.
      </p>
      <p>This paper is structured as follows: In section 2, we present a motivating
scenario that illustrates the need and general approach of our contribution. Section
3 introduces an event model we developed for both raw-data and semantically
described virtual sensors. After defining the model, section 4 describes the adapter
architecture and illustrates how a new adapter can be modeled. Finally, section
5 presents related work, followed by the conclusion and outlook in section 6.</p>
    </sec>
    <sec id="sec-2">
      <title>Motivating Scenario</title>
      <p>
        This section provides an illustrative scenario that shows the challenges that
must be solved when integrating multiple heterogeneous data sources with a
single adapter concept. As an example we will present how an adapter for a
new temperature sensor on an oil rig is created. The events are stored in a
message broker, like Apache Kafka, and can then be used by other systems like
StreamPipes (http://streampipes.fzi.de) [
        <xref ref-type="bibr" rid="ref5">5</xref>
        ] to build and execute processing pipelines, as shown in figure 1.
      </p>
      <p>
        To create a new adapter, the user makes use of a model editor, which realizes
a guided process where the user has to provide mandatory information about the
data in a graphical interface. As a first step in the modeling process, it must be
defined what kind of data should be processed: Real-time data or historic data
(e.g. a CSV file). In our example, we assume that the temperature sensor
originates from a real-time data source. Afterwards, the communication protocol for
accessing the data source needs to be selected based on several available options.
In our example, the temperature sensor provides a REST interface that must
be polled every second. The modeling process is semi-automatic: the system
tries to guess as much as possible, such as the data format and the semantic
content. This is done based on the available metadata or by extracting sample
data from the source. Once the adapter has been initialized, raw data is
processed in the pipeline according to rules that are inferred from the model
and is transformed into the virtual sensor representation. In our example, the
pipeline filters out all events with a temperature higher than 350. This rule is
automatically created by the framework from the RDF description of the virtual
sensor, which specifies the measurement range to be lower than 350.
Furthermore, the raw data event is transformed into JSON. Finally, events are
emitted by the adapter and put onto a message broker. They can then be
consumed by processing engines such as StreamPipes or Apache Spark [
        <xref ref-type="bibr" rid="ref6">6</xref>
        ].
      </p>
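      <p>As an illustration of how such a filter rule could be derived, the sketch below builds a keep-predicate from a modeled value range and applies it to incoming events. The function and field names are hypothetical, not the actual framework API:</p>

```python
def infer_filter(prop, value_spec):
    """Build a keep-predicate for one property from its modeled value range."""
    lo, hi = value_spec.get("min"), value_spec.get("max")

    def keep(event):
        v = event[prop]
        return (lo is None or v >= lo) and (hi is None or v <= hi)

    return keep

# Rule inferred from the virtual sensor description: temperature must stay below 350.
keep = infer_filter("temperature", {"max": 350})

events = [{"temperature": 120}, {"temperature": 420}]
valid = [e for e in events if keep(e)]  # the 420 reading is dropped
```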
    </sec>
    <sec id="sec-3">
      <title>Event Model</title>
      <p>
        This section introduces an RDFS-based event model, which is based on the
Semantic Event Producer model of [7, section 7.3] and further re-uses parts
of the Semantic Sensor Network Ontology [
        <xref ref-type="bibr" rid="ref8">8</xref>
        ] and the QUDT Ontology [
        <xref ref-type="bibr" rid="ref9">9</xref>
        ] for
representing measurement units. We present two variations of the model, the
raw data model and the virtual sensor. The raw data model is a subset of the
virtual sensor model, which contains only basic information about the data.
Both models can be seen in figure 2, with the raw data model highlighted in
bold. The event model is instantiated in the design phase, when a new adapter
is created by a domain expert.
      </p>
      <p>The raw data model has two types of data sequences, a Data Set and a Data
Stream; each of them has a grounding, consisting of the protocol and a format.
Further, it has a simple event schema that consists of a runtimeName, for
example the column name of a CSV table, and the corresponding runtimeType,
the type of data that is stored in the table column (e.g. String or Integer).
Besides primitive types, an event schema can also describe nested structures and
lists. Additionally, a measurement unit can be provided for properties (e.g.
temperature measured in degrees Celsius or Fahrenheit).</p>
      <p>The virtual sensor has a more detailed semantic description, as shown in
figure 2. The Data Sequence is produced by a Data Producer. Further qualities,
like the frequency of a Data Sequence, can be described, and it has one event
schema consisting of multiple EventProperties. These can have different
PropertyQualities (e.g. accuracy or precision of a sensor measurement), a
runtime name (often the same as the runtime name of the raw data model), and
a domain property, for example 'http://schema.org/location'. Properties might
also be modeled as lists or nested structures. EventPropertyPrimitives have a
runtime type and a unit. The Functionality Enumeration can be used to mark a
property, for example, as a timestamp. Furthermore, the ValueSpecification can
be used to restrict the possible values of a property.</p>
    </sec>
    <sec id="sec-4">
      <title>Adapter Architecture and Modeling</title>
      <p>This section describes the runtime architecture of the adapters. Adapters are
modeled via a graphical user interface by a domain expert and are automatically
instantiated.</p>
      <p>Figure 3 shows the different components of the adapter architecture. First,
a data converter establishes a connection to the data source, collects data and
transforms it into the internal raw data format. After
data is available in the internal format, it is transformed into the virtual sensor
data representation via the raw data pipeline. This pipeline is explained in more
detail later in this section. The last component of the adapter is a broker, where
virtual sensor events are sent to in order to be consumed by other applications
and tools. All adapters have two interfaces, one for accessing real-time data and
one for providing the schema description. Data set adapters have an additional
interface to start a data replay and to serve data from different time-slots to
multiple consumers. This is not needed for real-time data since all events are
immediately emitted as they are produced.</p>
      <p>Raw Data Pipeline
Each adapter has its own individual raw data pipeline. These pipelines are
created automatically according to the model defined by the user and consist of
multiple processing agents, which are based on previous work we have
implemented in StreamPipes. There, a user can use and configure the agents via
a graphical user interface; in this work, agents are configured automatically
according to the model description.</p>
      <p>Within a pipeline, multiple data transformations can take place. In general,
our idea is to use the defined semantics to automatically transform data at
runtime. Such transformations could be enrichments with context information,
the filtering out of unreasonable values, and transformations that ensure that
the resulting events always have the same schema and quality. The first
component in a pipeline is always the source providing raw data, while the last one
is always a sink that writes the virtual sensor values to the resulting broker.
Currently, five kinds of agents are supported in the pipeline: a structural
transformer, a unit transformer, a filter, a frequency reducer and a schema agent.
Agents are developed in a way that makes it possible to add more at a later
point in time. First, we describe the structural transformer agent:
Structural Transformer Agent The task of the Structural Transformer Agent
is to transform the internal raw data representation into the structure required
by the virtual sensor. This is done via mappings between the two model schemas.
One example would be to transform a flat data structure into a nested structure
or vice versa. Another example could be to flatten a property list into primitive
properties. All operations are performed on the internal format and are inferred
from the models based on the provided runtime names. At runtime, each data
point is transformed individually in a stateless manner.</p>
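      <p>The stateless per-event application of such a mapping can be sketched as follows. The mapping table and names are hypothetical; in the framework, the mapping would be inferred from the runtime names of the two schemas:</p>

```python
# Hypothetical mapping from flat runtime names to nested target paths,
# as inferred from the raw data schema and the virtual sensor schema.
MAPPING = {
    "lat": ("location", "latitude"),
    "lon": ("location", "longitude"),
    "temp": ("temperature",),
}

def transform(event):
    """Apply the mapping to one flat raw event, producing a nested structure."""
    out = {}
    for runtime_name, path in MAPPING.items():
        target = out
        for key in path[:-1]:          # walk/create intermediate objects
            target = target.setdefault(key, {})
        target[path[-1]] = event[runtime_name]
    return out

nested = transform({"lat": 49.0, "lon": 8.4, "temp": 21.5})
```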
      <p>
        Unit Transformer Agent The Unit Transformer Agent uses the unit
information of the EventPropertyPrimitive to automatically provide the correct
measurement unit. Users only need to model the required format of the event
property, and the system automatically transforms the data. This
is accomplished using the QUDT Ontology [
        <xref ref-type="bibr" rid="ref9">9</xref>
        ], which provides information about
different units and contains conversion formulas for individual conversions.
First, the measurement values are transformed into a standard unit; afterwards,
this standard unit is further transformed into the target unit.
      </p>
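      <p>The two-step conversion can be sketched as follows, with hand-written conversion data in QUDT style (a multiplier and offset per unit relative to a base unit, here kelvin for temperature). This is an illustrative example; the framework would read these factors from the ontology:</p>

```python
# Conversion factors to the base unit (kelvin): base = value * multiplier + offset
UNITS = {
    "DegreeCelsius":    {"multiplier": 1.0,       "offset": 273.15},
    "DegreeFahrenheit": {"multiplier": 5.0 / 9.0, "offset": 255.372222},
    "Kelvin":           {"multiplier": 1.0,       "offset": 0.0},
}

def convert(value, source, target):
    s, t = UNITS[source], UNITS[target]
    base = value * s["multiplier"] + s["offset"]   # step 1: into the base unit
    return (base - t["offset"]) / t["multiplier"]  # step 2: into the target unit

f = convert(100.0, "DegreeCelsius", "DegreeFahrenheit")  # boiling point, ~212 F
```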
      <p>Filter Agent The Filter Agent filters out data values that are not compatible
with the virtual sensor description. Filter rules are extracted from the semantic
model, for example from the PropertyQualities and the ValueSpecification
described in section 3. For instance, if a quantitative value has a modeled range
from 0 to 10 and the measured value is 11, the system infers that this is a false
value and can automatically remove it from the output stream. This agent ensures
that data consumers can expect only semantically correct data, which reduces the
probability of run-time errors.
Frequency Reducer Agent This agent changes the frequency of the data
according to the actual values of the properties. When the user activates this
agent during the design phase, all values of the events are monitored at
runtime. If the agent detects that the values of the events do not change over a
period of time, the frequency for emitting new events is reduced. With this
agent, it is possible to reduce the amount of data sent over the network without
losing information.</p>
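      <p>The core idea of the Frequency Reducer Agent can be sketched as follows: suppress an event when its monitored values are unchanged since the last emitted event. This is a simplified illustration with hypothetical names; a real agent would likely also emit periodic heartbeat events:</p>

```python
class FrequencyReducer:
    """Suppress events whose monitored property values did not change."""

    def __init__(self, monitored):
        self.monitored = monitored  # property names to watch
        self.last = None            # snapshot of the last emitted values

    def emit(self, event):
        """Return the event if a monitored value changed, otherwise None."""
        snapshot = tuple(event[k] for k in self.monitored)
        if snapshot != self.last:
            self.last = snapshot
            return event
        return None

reducer = FrequencyReducer(["temperature"])
out = [reducer.emit(e) for e in
       [{"temperature": 20}, {"temperature": 20}, {"temperature": 21}]]
# the unchanged second reading is suppressed
```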
      <p>Schema Agent Some information about the stream need not be modeled by
the user, but is instead inferred automatically at run-time. In this case, the
schema description is adapted accordingly. This agent does not transform data; it
monitors runtime data and changes the schema description. Two such examples
are the frequency and latency of the produced events. These values are measured
at runtime and are constantly updated in the schema description.</p>
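      <p>Measuring the event frequency for the schema description could look roughly like this (an assumption about the measurement approach, which is not specified in detail above):</p>

```python
def measure_frequency(timestamps):
    """Average events per second over a window of event timestamps (seconds)."""
    if len(timestamps) < 2:
        return None  # not enough observations yet
    span = timestamps[-1] - timestamps[0]
    return (len(timestamps) - 1) / span if span > 0 else None

freq = measure_frequency([0.0, 1.0, 2.0, 3.0])  # one event per second
```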
    </sec>
    <sec id="sec-5">
      <title>Related Work</title>
      <p>
        Integrating semantic web technologies and big data streaming architectures is
becoming more and more relevant. One example is Strider [
        <xref ref-type="bibr" rid="ref10">10</xref>
        ], which consumes
data from Apache Kafka and uses Apache Spark to optimize query planning.
Our approach is complementary and could be used to easily integrate new data
sources into Kafka, which can then be processed with this framework.
      </p>
      <p>
        There are also other solutions leveraging semantic data models for
streaming sensor data, like for example [
        <xref ref-type="bibr" rid="ref11">11</xref>
        ], the sensor middleware for
OpenIoT [
        <xref ref-type="bibr" rid="ref12">12</xref>
        ]. The authors use the SSN ontology and also have the concept of
virtual sensors. One difference is that we mainly use semantics during the design
process to automatically transform data later during runtime, but we do not
focus on processing RDF data at runtime.
      </p>
      <p>
        A programming model related to our overall architecture is foglets
[
        <xref ref-type="bibr" rid="ref13">13</xref>
        ]. This architecture consists of a central cloud computing instance and several
edge nodes located closer to the sensors in the networking stack. With foglets, it is
possible to distribute programs across all the computing instances and perform
some processing steps on edge nodes and some in the cloud. Our approach is
similar, but our programming model is more lightweight, as we clean data directly
on the edge nodes without requiring further programming.
      </p>
    </sec>
    <sec id="sec-6">
      <title>Conclusion and Outlook</title>
      <p>In this paper, we presented a framework for data adapters that are capable of
ingesting real-time and historic batch data into unified stream processing engines.
We introduced a lightweight, RDFS-based model for raw data sources and an
extended model to represent virtual sensors. These models can be instantiated by
domain experts with little technical knowledge using a graphical user interface.
Based on these models, our contribution consists of a generic adapter
architecture to automatically consume, pre-process and harmonize data. Our approach
bridges the gap between a large variety of data sources and the processing engine
that performs the actual algorithms.</p>
      <p>In our future work, we plan to further extend our framework by supporting
more protocols and formats and also extending the (semi-) automatic
transformation capabilities of our raw-data pipeline.</p>
    </sec>
  </body>
  <back>
    <ref-list>
      <ref id="ref1">
        <mixed-citation>
          1.
          <string-name>
            <given-names>J.</given-names>
            <surname>Kreps</surname>
          </string-name>
          ,
          <string-name>
            <given-names>N.</given-names>
            <surname>Narkhede</surname>
          </string-name>
          ,
          <string-name>
            <given-names>J.</given-names>
            <surname>Rao</surname>
          </string-name>
          et al.,
          <article-title>“Kafka: A distributed messaging system for log processing</article-title>
          ,”
          <source>in Proceedings of the NetDB</source>
          ,
          <year>2011</year>
          , pp.
          <fpage>1</fpage>
          -
          <lpage>7</lpage>
          .
        </mixed-citation>
      </ref>
      <ref id="ref2">
        <mixed-citation>
          2.
          <string-name>
            <given-names>P.</given-names>
            <surname>Carbone</surname>
          </string-name>
          ,
          <string-name>
            <given-names>A.</given-names>
            <surname>Katsifodimos</surname>
          </string-name>
          et al., “
          <article-title>Apache flink: Stream and batch processing in a single engine,”</article-title>
          <source>Bulletin of the IEEE Computer Society Technical Committee on Data Engineering</source>
          , vol.
          <volume>36</volume>
          , no.
          <issue>4</issue>
          ,
          <year>2015</year>
          .
        </mixed-citation>
      </ref>
      <ref id="ref3">
        <mixed-citation>
          3.
          <string-name>
            <given-names>R. C.</given-names>
            <surname>Fernandez</surname>
          </string-name>
          ,
          <string-name>
            <given-names>P. R.</given-names>
            <surname>Pietzuch</surname>
          </string-name>
          et al., “Liquid:
          <article-title>Unifying nearline and offline big data integration,”</article-title>
          <source>in CIDR 2015, Seventh Biennial Conference on Innovative Data Systems Research</source>
          , Asilomar, CA, USA,
          <year>2015</year>
          .
        </mixed-citation>
      </ref>
      <ref id="ref4">
        <mixed-citation>
          4.
          <string-name>
            <given-names>S.</given-names>
            <surname>Bischof</surname>
          </string-name>
          ,
          <string-name>
            <given-names>C.</given-names>
            <surname>Martin</surname>
          </string-name>
          ,
          <string-name>
            <given-names>A.</given-names>
            <surname>Polleres</surname>
          </string-name>
          , and
          <string-name>
            <given-names>P.</given-names>
            <surname>Schneider</surname>
          </string-name>
          ,
          <article-title>Collecting, Integrating, Enriching and Republishing Open City Data as Linked Data</article-title>
          . Cham: Springer International Publishing,
          <year>2015</year>
          , pp.
          <fpage>57</fpage>
          -
          <lpage>75</lpage>
          . [Online]. Available: http://dx.doi.org/10.1007/978-3-319-25010-6_4
        </mixed-citation>
      </ref>
      <ref id="ref5">
        <mixed-citation>
          5.
          <string-name>
            <given-names>D.</given-names>
            <surname>Riemer</surname>
          </string-name>
          ,
          <string-name>
            <given-names>F.</given-names>
            <surname>Kaulfersch</surname>
          </string-name>
          ,
          <string-name>
            <given-names>R.</given-names>
            <surname>Hutmacher</surname>
          </string-name>
          , and L. Stojanovic, “
          <article-title>Streampipes: Solving the challenge with semantic stream processing pipelines</article-title>
          ,”
          <source>in Proceedings of the 9th ACM International Conference on Distributed Event-Based Systems, ser. DEBS '15</source>
          . New York, NY, USA: ACM,
          <year>2015</year>
          , pp.
          <fpage>330</fpage>
          -
          <lpage>331</lpage>
          . [Online]. Available: http://doi.acm.org/10.1145/2675743.2776765
        </mixed-citation>
      </ref>
      <ref id="ref6">
        <mixed-citation>
          6.
          <string-name>
            <given-names>M.</given-names>
            <surname>Zaharia</surname>
          </string-name>
          ,
          <string-name>
            <given-names>M.</given-names>
            <surname>Chowdhury</surname>
          </string-name>
          ,
          <string-name>
            <given-names>M. J.</given-names>
            <surname>Franklin</surname>
          </string-name>
          ,
          <string-name>
            <given-names>S.</given-names>
            <surname>Shenker</surname>
          </string-name>
          ,
          and
          <string-name>
            <given-names>I.</given-names>
            <surname>Stoica</surname>
          </string-name>
          , “Spark:
          <article-title>Cluster computing with working sets</article-title>
          .
          <source>” HotCloud</source>
          , vol.
          <volume>10</volume>
          , no.
          <fpage>10</fpage>
          -
          <lpage>10</lpage>
          , p.
          <fpage>95</fpage>
          ,
          <year>2010</year>
          .
        </mixed-citation>
      </ref>
      <ref id="ref7">
        <mixed-citation>
          7.
          <string-name>
            <given-names>D.</given-names>
            <surname>Riemer</surname>
          </string-name>
          , “
          <article-title>Methods and tools for management of distributed event processing applications</article-title>
          ,”
          <source>Ph.D. dissertation</source>
          , Karlsruher Institut für Technologie (KIT), Karlsruhe,
          <year>2016</year>
          .
        </mixed-citation>
      </ref>
      <ref id="ref8">
        <mixed-citation>
          8.
          <string-name>
            <given-names>K.</given-names>
            <surname>Taylor</surname>
          </string-name>
          , S. Cox,
          <string-name>
            <given-names>K.</given-names>
            <surname>Janowicz</surname>
          </string-name>
          ,
          <string-name>
            <given-names>D. L.</given-names>
            <surname>Phuoc</surname>
          </string-name>
          ,
          <string-name>
            <given-names>A.</given-names>
            <surname>Haller</surname>
          </string-name>
          , and
          <string-name>
            <given-names>M.</given-names>
            <surname>Lefrançois</surname>
          </string-name>
          , “
          <article-title>Semantic sensor network ontology</article-title>
          ,” W3C Candidate Recommendation, Jul.
          <year>2017</year>
          , https://www.w3.org/TR/2017/CR-vocab-ssn-20170711/.
        </mixed-citation>
      </ref>
      <ref id="ref9">
        <mixed-citation>
          9.
          <string-name>
            <given-names>R.</given-names>
            <surname>Hodgson</surname>
          </string-name>
          , P. J. Keller, J. Hodges, and
          <string-name>
            <given-names>J.</given-names>
            <surname>Spivak</surname>
          </string-name>
          , “
          <article-title>Qudt - quantities, units, dimensions and data types ontologies,”</article-title>
          Tech. Rep.
          ,
          <year>2017</year>
          , http://www.qudt.org/.
        </mixed-citation>
      </ref>
      <ref id="ref10">
        <mixed-citation>
          10.
          <string-name>
            <given-names>X.</given-names>
            <surname>Ren</surname>
          </string-name>
          and
          <string-name>
            <given-names>O.</given-names>
            <surname>Curé</surname>
          </string-name>
          , “
          <article-title>Strider: A Hybrid Adaptive Distributed RDF Stream Processing Engine</article-title>
          ,” pp.
          <fpage>1</fpage>
          -
          <lpage>17</lpage>
          ,
          <year>2017</year>
          . [Online]. Available: http://arxiv.org/abs/1705.05688
        </mixed-citation>
      </ref>
      <ref id="ref11">
        <mixed-citation>
          11.
          <string-name>
            <given-names>J.-P.</given-names>
            <surname>Calbimonte</surname>
          </string-name>
          ,
          <string-name>
            <given-names>S.</given-names>
            <surname>Sarni</surname>
          </string-name>
          ,
          <string-name>
            <given-names>J.</given-names>
            <surname>Eberle</surname>
          </string-name>
          , and
          <string-name>
            <given-names>K.</given-names>
            <surname>Aberer</surname>
          </string-name>
          , “
          <article-title>Xgsn: An open-source semantic sensing middleware for the web of things</article-title>
          .” in TC/SSN@ ISWC,
          <year>2014</year>
          , pp.
          <fpage>51</fpage>
          -
          <lpage>66</lpage>
          .
        </mixed-citation>
      </ref>
      <ref id="ref12">
        <mixed-citation>
          12.
          <string-name>
            <given-names>J.</given-names>
            <surname>Soldatos</surname>
          </string-name>
          ,
          <string-name>
            <given-names>N.</given-names>
            <surname>Kefalakis</surname>
          </string-name>
          et al., “Openiot:
          <article-title>Open source internet-of-things in the cloud,” in Interoperability and open-source solutions for the internet of things</article-title>
          . Springer,
          <year>2015</year>
          , pp.
          <fpage>13</fpage>
          -
          <lpage>25</lpage>
          .
        </mixed-citation>
      </ref>
      <ref id="ref13">
        <mixed-citation>
          13.
          <string-name>
            <given-names>E.</given-names>
            <surname>Saurez</surname>
          </string-name>
          ,
          <string-name>
            <given-names>K.</given-names>
            <surname>Hong</surname>
          </string-name>
          ,
          <string-name>
            <given-names>D.</given-names>
            <surname>Lillethun</surname>
          </string-name>
          ,
          <string-name>
            <given-names>U.</given-names>
            <surname>Ramachandran</surname>
          </string-name>
          , and
          <string-name>
            <given-names>B.</given-names>
            <surname>Ottenwälder</surname>
          </string-name>
          , “
          <article-title>Incremental deployment and migration of geo-distributed situation awareness applications in the fog</article-title>
          ,”
          <source>in Proceedings of the 10th ACM International Conference on Distributed and Event-based Systems. ACM</source>
          ,
          <year>2016</year>
          , pp.
          <fpage>258</fpage>
          -
          <lpage>269</lpage>
          .
        </mixed-citation>
      </ref>
    </ref-list>
  </back>
</article>