=Paper= {{Paper |id=Vol-1558/paper1 |storemode=property |title=Leveraging Reconfigurable Computing in Distributed Real-time Computation Systems |pdfUrl=https://ceur-ws.org/Vol-1558/paper1.pdf |volume=Vol-1558 |authors=Apostolos Nydriotis,Pavlos Malakonakis,Nikos Pavlakis,Grigorios Chrysos,Ekaterini Ioannou,Euripides Sotiriades,Minos Garofalakis,Apostolos Dollas |dblpUrl=https://dblp.org/rec/conf/edbt/NydriotisMPCISG16 }} ==Leveraging Reconfigurable Computing in Distributed Real-time Computation Systems== https://ceur-ws.org/Vol-1558/paper1.pdf
       Leveraging Reconfigurable Computing in Distributed
                Real-time Computation Systems
 Apostolos Nydriotis, Pavlos Malakonakis, Nikos Pavlakis, Grigorios Chrysos, Ekaterini Ioannou,
                    Euripides Sotiriades, Minos Garofalakis, Apostolos Dollas
                                                             ECE Department
                                                        Technical University of Crete
                                                             Chania, Greece
                                                             dollas@mhl.tuc.gr

ABSTRACT
The Big Data processing community typically performs real-time computations on data streams with distributed systems such as Apache Storm. Such systems offer substantial parallelism; however, the communication overhead among nodes for the distribution of the workload places an upper limit on the exploitable parallelism. The contribution of the present work is the integration of a reconfigurable platform with Apache Storm, which is the main platform of the Big Data stream processing community. By exploiting the internal bandwidth of FPGAs we show that the computational limits for stream processing are significantly increased vs. conventional distributed processing, without compromising on the platform of choice or its seamless operation in a dynamic pipeline. The integration of a Maxeler MPC-C Series platform with Apache Storm, as presented in detail, yields on the Hayashi-Yoshida correlation algorithm an impressive tenfold increase in real-time streaming input capacity, which corresponds to a hundred-fold increase in computational load. Our methodology is sufficiently general to apply to any class of distributed systems or reconfigurable computers, and this work presents quantitative results of the expected I/O performance, depending on the means of network connection.

CCS Concepts
•Computer systems organization ➝ Distributed architectures ➝ Cloud computing •Computer systems organization ➝ Heterogeneous (hybrid) systems, Reconfigurable computing, Data flow architectures

Keywords
Multi FPGA platform; streaming Big Data; Storm; distributed computational system; hybrid computational platform; high performance computing.

(c) 2016, Copyright is with the authors. Published in the Workshop Proceedings of the EDBT/ICDT 2016 Joint Conference (March 15, 2016, Bordeaux, France) on CEUR-WS.org (ISSN 1613-0073). Distribution of this paper is permitted under the terms of the Creative Commons license CC-by-nc-nd 4.0

1. INTRODUCTION
Multi-FPGA platforms provide opportunities for system level design by tightly coupling powerful General Purpose Processors with FPGAs through fast interconnection networks or busses. FPGAs also have very fast access to external memory and direct fast connection to the internet. These systems usually have the "look and feel" of a conventional General Purpose Server with a Linux-based operating system, using special compilers. Thus, they improve system level performance by solving the Input/Output-related issues, i.e. the usual bottleneck in several FPGA-based computational systems.

In recent years, there has been an increasing need for real-time processing of huge amounts of data. Processing massive amounts of data in real-time can only be achieved by distributing the workload across many computers and using distributed real-time computation systems such as Apache Storm [7]. These systems have two inherent attributes: (i) when the parallelism is unlimited and the interconnection networks are sufficient for data exchange, any increase in the volume of data can be handled by adding processing nodes, which of course imposes a financial cost (for acquisition and for operation); and (ii) the required network resources are not linear with respect to the number of nodes incorporated in the distributed real-time computation system, meaning that quite often the exploitable parallelism is limited by interconnection limitations, and even when a speedup can be achieved it is not linear with the resources. Hence, there is a need to increase the computational capacity of a single node of a system such as Apache Storm. At the same time the solution cannot be a proprietary system, as this would limit its usefulness.

The purpose of this work, and thus its contribution, is to integrate a powerful FPGA-based system with the Apache Storm distributed platform, and, by exploiting the internal processing and communication bandwidth of FPGAs, to demonstrate that significant speedups can be achieved vs. the CPU-only approach. More specifically, in this paper we show how we achieved the interconnection of the popular distributed real-time computation system Apache Storm with a Maxeler multi-FPGA platform. The interconnection forms the first (to our knowledge) dynamic pipeline for streaming applications, in which either Apache Storm can be used by itself or the Maxeler reconfigurable resources can be used for the real-time processing of the input stream. This work presents the integration challenges as well as very promising initial performance results from actual experiments. More specifically, we explain how to perform the incorporation of the FPGAs within Storm without imposing modifications to the data processing inside Storm. We use a demanding streaming financial data processing scenario as a driving problem, which involves the computation of the pair-wise correlations between stocks. Based on this scenario, we experimentally demonstrate the benefits of computing the stock correlations with the hybrid distributed computational system.

The remainder of the paper is organized as follows. Section 2 describes the state-of-the-art technologies that have been used in our
implementation. Section 3 presents the incorporation of the FPGAs within the distributed real-time computation system. Then, Section 4 presents our proposed architecture, while Section 5 presents experimental results from a streaming finance application. Lastly, Section 6 provides conclusions and future work.

2. Technology Description
This section briefly introduces the technologies that were used and the characteristics which make these state-of-the-art technologies useful. The platforms in Subsections 2.2 and 2.3 have been designed to process streaming data.

2.1 FPGA-Based Processing
Using FPGAs to speed up computationally intensive processing has been done successfully for over two decades, since the early 1990s. However, the major challenges towards the use of FPGAs as high-end processors are: (i) I/O problems, which prove to be a computational bottleneck, and (ii) the difficulty of programming and using such systems, as their development tools and interfaces are considered exotic by the community of software application developers.

The latest generation of FPGA devices offers significant resources in addition to the reconfigurable fabric. Special I/O transceivers, dedicated logic blocks for memory, powerful general purpose processors on chip, special modules for digital signal processing, and fast floating point operations have been added on-chip. Even the reconfigurable fabric has changed, offering more logic, better routing resources and run-time reconfiguration characteristics. In addition, a large collection of functional Intellectual Property cores (IPs) is freely available to the designer through IP generator tools such as the Xilinx Core Generator, or distributed by designers through web sites such as OpenCores. All of these available resources help designers to take up new applications, with considerable results on network systems such as network switches, network intrusion detection systems, and financial data analysis. Data streaming applications have become much easier to implement due to these technological advances of FPGAs, mostly in the form of on-chip I/O transceivers and large amounts of available memory. Owing to these features, the computational power of FPGA devices has increased exponentially, but the problems of the I/O bottleneck and ease of use were not addressed for most mainstream (i.e. low cost) systems. It is very rare that a $100 FPGA-based board (regardless of vendor) will be used in large-scale production mode as a coprocessor, despite its inherent computational capabilities.

Unlike low-end systems, high-end multi-FPGA platforms have been developed to offer opportunities for system level design by using powerful General Purpose Processors together with fast and large FPGAs. These systems have the "look and feel" of a conventional General Purpose Server with a Linux-based operating system, using special compilers. Application developers for such systems usually keep the software in its original form and swap the computationally intensive procedures with functionally equivalent hardware procedure calls, which are determined by profiling the applications. Such servers can offer significant computational power, equal to that of hundreds of conventional processors, while keeping the same look-and-feel for the end user (but not for the developer).

In terms of user needs, the Big Data community uses platforms that can manage dozens or even hundreds of nodes for high performance computations [1][2]. In such systems, using a reconfigurable platform as a node for specific applications can offer a significant advantage. The reconfigurable node can perform the compute-intensive parts of the algorithm, and the conventional nodes can perform all the other procedures which are difficult to translate into hardware and do not carry significant computational load. Such a system can be considered to mark a new era for reconfigurable computing, which can incorporate heterogeneous computing systems, providing them with powerful coprocessors.

2.2 The Maxeler System
Maxeler Technologies is one of the vendors that offer state-of-the-art FPGA-based platforms, such as the ones described in the previous section [3][4][6].

Maxeler offers Maximum Performance Computing (MPC) systems based on FPGAs. They drive MPC by using the 'Multiscale Dataflow Computing' paradigm. Dataflow computers focus on optimizing the movement of data in an application and utilize massive parallelism between thousands of tiny 'dataflow cores' in order to provide orders-of-magnitude benefits in performance, space and power consumption. Due to this model, the Maxeler systems refer to their reconfigurable resources (FPGAs) as Dataflow Engines (DFEs). The system presented in this work is the MPC-C series, shown in Figure 1, below. It consists of 12 Intel Xeon CPU cores with 64GB of RAM and 4 DFEs (Xilinx XC6VSX475T FPGAs) with 24GB of RAM each. Each DFE is connected to the CPUs via PCI Express with up to 2GB/s, and the DFEs are directly connected with the MaxRing interconnect. Maxeler Technologies also offers nodes like the MPC-X and MPC-N series, which support direct network connection of the DFEs.

Figure 1. MPC-C series node architecture

The Maxeler DFE configuration is called through C/C++ host code running on the CPUs. The MaxCompiler translates Maxj into FPGA configuration files. Maxj is an extended form of Java with operator overloading, and it is used as the Hardware Description Language for Maxeler systems. The Maxj language and environment offer a relatively user-friendly programming environment (vs. traditional design in, say, VHDL), but they also pose limitations as to what can be implemented, and it is not uncommon for applications which require complex data structures and synchronization to be designed in VHDL in order for the designer to have complete control of the design.

The computing kernels, which handle the data-intensive part of the application, and the associated manager, which orchestrates data movement between the DFE and the CPU or RAM, are written in the Maxj language. At its simplest level, DFE computation can be added to an application with a single function call, while for more fine-grained control the SLiC (Simple Live CPU) tool provides an "action"-based interface. The SLiC interface is Maxeler's application programming interface for CPU-DFE integration.

In such an MPC system, the CPUs are in control and drive the computations on the DFEs. The data-intensive parts of the computations are typically confined to the DFE configuration and are made available through the SLiC interface.
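To give a flavor of the dataflow model described above, the following is a minimal conceptual sketch in plain Java: a "kernel" is a fixed pipeline of per-element stages applied to a stream, rather than a control-flow program. Note that this is an illustration only, not actual Maxj or SLiC code; the class and method names (`DataflowSketch`, `runKernel`) are our own invention.

```java
import java.util.function.DoubleUnaryOperator;

/**
 * Conceptual sketch of dataflow-style computation: a kernel is a chain of
 * element-wise stages, loosely analogous to the hardware pipeline a Maxj
 * kernel compiles to. Plain Java for exposition only -- NOT Maxj/SLiC code.
 */
public class DataflowSketch {

    // Apply a fixed pipeline of stages to every element of the input stream.
    public static double[] runKernel(double[] stream, DoubleUnaryOperator... stages) {
        double[] out = new double[stream.length];
        for (int i = 0; i < stream.length; i++) {
            double v = stream[i];
            for (DoubleUnaryOperator stage : stages) {
                v = stage.applyAsDouble(v);   // each stage transforms one element
            }
            out[i] = v;
        }
        return out;
    }

    public static void main(String[] args) {
        // Two pipelined stages: scale, then offset.
        double[] result = runKernel(new double[]{1, 2, 3},
                x -> 2 * x, x -> x + 1);
        System.out.println(java.util.Arrays.toString(result)); // [3.0, 5.0, 7.0]
    }
}
```

In an actual Maxeler design the stages would be laid out spatially on the DFE, so one element per cycle streams through the whole pipeline; here the pipeline is merely simulated sequentially on the CPU.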

2.3 The Apache Storm platform
Apache Storm is a free and open source distributed real-time computation system used for processing unbounded streams of data. In order to process data with Storm [7], graphs of computation – called topologies – have to be created. The nodes of a topology encapsulate the processing logic, while the connections between the nodes indicate how the data flows in the graph. Storm provides the primitives for transforming such data streams into new streams in a distributed and reliable way.

The basic primitives which Storm provides for doing stream transformations are the "spouts" and the "bolts". These are the components of a topology, and the programmer uses them to implement application-specific logic. Spouts can be thought of as the entry points of data into the system. For example, a spout may connect to the Twitter API, receive a stream of tweets and emit it to the rest of the topology. A bolt, on the other hand, consumes any number of input streams, does some processing and possibly emits new streams. Networks of spouts and bolts are packaged into "topologies", which are submitted to Storm clusters for execution. A topology, then, is a graph of stream transformations where each node is a spout or a bolt. Edges in the graph indicate which bolts are subscribing to which streams. When a spout or bolt emits a tuple to a stream, it actually sends the tuple to every bolt that subscribed to that stream.

Figure 2. Storm Topology

Each node in a Storm topology, such as the one depicted in Figure 2, executes in parallel on one or more cluster machines. The amount of parallelism of each component is specified in the topology configuration. The configured number of threads and processes will be spawned across the cluster to do the execution. Storm offers "at least once" processing guarantees as well as "no data loss" guarantees even if machines go down and messages are dropped. Any failed tasks will automatically be reassigned.

3. Adding a Maxeler node to Storm
Several technical solutions have been tested in order to integrate the Storm and Maxeler platforms. These solutions are presented and discussed in this section, with the simplest and most generic one (which was chosen as the best approach) last.

A communication framework had to be created so that the reconfigurable hardware could receive data, make any calculations needed, and then transmit the results back to the software application. This task involves the implementation of a flexible interface between the Storm and Maxeler systems, which could also be used with different tools or reconfigurable hardware platforms.

The Storm framework was installed on the Maxeler workstation and was tested as a simple Storm node. This was not a difficult task, as the Maxeler systems support Linux, for which Storm does have a distribution. Following that step, the connection with the Maxeler hardware had to be established, which proved to be impossible, as will be described below. Hence, we need to differentiate between the Maxeler system being a Storm node vs. the Maxeler system being connected to a Storm node. Obviously the former would be preferable, but as will be explained below it was not possible.

The Maxeler hardware is called by C/C++ host code. The main problem that had to be addressed is the interface between Java (Storm) and C. Three methods that allow C/C++ and Java to connect were considered: (i) SWIG (Simplified Wrapper and Interface Generator), which allows calling C functions from Java, (ii) the exec function, which calls the C executable, and (iii) network sockets. In solution (iii), which was developed successfully, we have the Maxeler system connected to Storm. Although this may appear to be an issue of semantics, in reality it affects how tight the entire integration becomes, as well as system performance, as will be explained below.

3.1 SWIG
In order to use SWIG to connect the Storm Java code with C, the Maxeler project was compiled as a shared library. SWIG [8] is a software development tool that connects programs written in C and C++ with a variety of high-level programming languages. SWIG is used with different types of target languages, including Java and common scripting languages such as Perl, PHP, Python, Tcl and Ruby. SWIG is most commonly used to create high-level interpreted or compiled programming environments and user interfaces, and as a tool for testing and prototyping C/C++ software. SWIG is typically used to parse C/C++ interfaces and generate the 'glue code' required for the above target languages to call into the C/C++ code. After wrapping the C code with SWIG, the C function can be called as a native call from the Java side. The problem was that in order to use SWIG, its data types had to be used in order to exchange arguments between C and Java. This would reduce flexibility, as the Java code would have to be changed to use SWIG data types. Furthermore, the default scheduler of Storm selects the cluster machines on which the various tasks will run based on load-balancing features. This means that in order to pin the task that would connect with C through SWIG to a specific cluster machine – the Maxeler workstation in this case – we would have to implement a custom Storm scheduler. Such low-level tweaking of Storm would definitely raise the complexity of our solution to very high and undesirable levels, and deviate from the standard distributions, which was deemed undesirable.

3.2 Exec function call
Java's exec command is able to directly call the Maxeler executable or any executable file through a system call. This, of
course, implies paying a performance penalty, since system calls are very expensive operations. Furthermore, as with the SWIG alternative, a custom Storm scheduler would have to be implemented in order to successfully call the Maxeler executable. Although the exec function call was successfully tested locally on the workstation, when the call came through Storm the executable would not run. The problem turned out to be related to a mismatch between the Linux version and tuning required by Storm and those required by the Maxeler run-time environment; as a result, several libraries needed by the Maxeler system to access the run-time environment could not become available. The combination of the implementation difficulties described above with the anticipated performance issues due to system calls rendered this approach, too, undesirable for further pursuit.

Figure 3. Storm Maxeler node Integration
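The exec-style integration described above can be sketched as follows: the Java (Storm) side launches an external executable through a system call and captures its output. This is a minimal sketch, not the paper's actual code; the Maxeler host binary path mentioned in the comments is hypothetical, and the demo simply calls "echo" so the sketch is runnable on any POSIX system.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.stream.Collectors;

/**
 * Sketch of the exec approach (Section 3.2): Java spawns an external
 * process and reads its stdout. Every invocation pays the full cost of
 * process creation, which is one reason this approach was abandoned.
 */
public class ExecSketch {

    // Launch an executable with arguments and return its stdout as a string.
    public static String runExecutable(String... command)
            throws IOException, InterruptedException {
        ProcessBuilder pb = new ProcessBuilder(command);
        pb.redirectErrorStream(true);          // merge stderr into stdout
        Process p = pb.start();
        String output;
        try (BufferedReader r = new BufferedReader(
                new InputStreamReader(p.getInputStream()))) {
            output = r.lines().collect(Collectors.joining("\n"));
        }
        p.waitFor();                           // block until the process exits
        return output;
    }

    public static void main(String[] args) throws Exception {
        // In the paper's setting this would be the Maxeler host binary,
        // e.g. runExecutable("/path/to/maxeler_host", "config") -- hypothetical.
        System.out.println(runExecutable("echo", "hello"));
    }
}
```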
3.3 Network sockets
The last method which was proposed in order to establish communication between the Java code and the C host code running on the Maxeler workstation was the use of network sockets [9]. The Maxeler node, acting as a TCP server, creates sockets on startup. The sockets, being in the listening state, wait for connections from client applications (e.g. Storm nodes). A TCP server may serve several clients concurrently, by creating a child process for each client and establishing a TCP connection – via a unique dedicated socket – between the child process and the client. The Maxeler workstation can support up to 4 child processes with hardware calls, as 4 different hardware designs can be executed simultaneously on the 4 available FPGAs. The socket server is written in C and serves as the Maxeler host code.

In the basic form of the implementation, one socket client makes a call on the server. The clients are implemented in Java as Storm components (spouts/bolts) that implement the appropriate algorithmic family interface. When the connection is established, the clients stream the data to the server. The Maxeler server makes the hardware call and, when the processing is completed, streams the results back to the clients. For example, the application presented in the following section uses two socket clients in order to make a call on the hardware server: a transmitter – implemented as a bolt – and a receiver – implemented as a spout. The transmitter creates a new socket in order to connect to the server, and sends the configuration as well as the input data from the previous Storm components. It can also request the results to be sent to the receiver. The receiver, which is connected via a different socket, receives the results and forwards them to the rest of the topology. The server stays in the listening state until a connection is established by both a transmitter client and a receiver client. First, the transmitter streams the configuration. Next, it transmits input data to the server. The server stores the data coming from the transmitter and performs the Maxeler hardware call to process them. After the hardware call has returned, it sends the results to the receiver client whenever a result request is received. The receiver and transmitter libraries can also be used outside Storm, increasing the flexibility even more.

The three methods were compared in terms of flexibility and functionality. The use of SWIG basically reduces the flexibility of the interface, as the Java code would have to be rewritten using SWIG's data types. The exec system call allows more flexibility, as only a function call would have to be included in the Java code, but even though it worked perfectly when called by Java on the workstation, it did not work when the function ran through Storm. Furthermore, following this method would significantly hurt the performance of the system because of the cost of the system call.

The network sockets approach was chosen, as shown in Figure 3 (in which there is no bolt inside the Maxeler system), due to its flexibility and performance. We note here that, as a side-effect of the way the sockets-based system was developed, we achieve flow control and synchronization of data between the software components and the hardware ones. Any system that can implement sockets can use this approach without deteriorating performance, since the expensive interface functions of SWIG as well as the system calls are avoided.

4. Platform Architecture
This section describes the architecture of the proposed platform. The proposed architecture is generic, as it can combine any FPGA-based platform with a stream processing framework using simple and well-known communication methods, i.e. network sockets.

The FPGA-based platform consists of a combination of a reconfigurable and a general purpose processor. The general purpose processor is used for communication with the streaming framework, for parsing the incoming data and for passing them to the reconfigurable platform. Last, it is used for packetizing the results, which are received back from the reconfigurable platform, and sending them to the stream-based framework. On the other hand, the reconfigurable part is used as the main processing unit. It is used exclusively for mapping the compute-intensive parts of the algorithm and processing the incoming data. These parts of the hardware platform are connected with each other using PCIe links.

The second component of the proposed architecture is the stream processing platform. This platform is used for passing data to the reconfigurable platform and receiving the results back. This framework needs to follow a streaming formulation.

As described above, the communication scheme was the most challenging part of integrating the hardware-based platform with the software-based framework into a hybrid platform. For the presented solution, we propose the use of network sockets, which is a generic and high-throughput solution for connecting distributed systems.

5. Experiments
This section presents some experimental results on a test platform that uses FPGA devices as part of a distributed real-time computation system, i.e. Apache Storm. First, we describe the platform that was used for our experiments. The platform is generic and can be easily extended. Second, the algorithm which was used to demonstrate the proposed infrastructure is described. Finally, some initial performance results from the integration of
FPGA devices into a distributed real-time computation system are presented.

5.1 Platform
This section describes the infrastructure which was used for our experiments. The proposed solution combines reconfigurable technology with the Apache Storm framework. The platform consists of a cluster with 7 nodes, each one with dual-core AMD CPUs @ 2.1 GHz, 8 GB RAM and a Gigabit Ethernet connection, and a Maxeler MPC-C server. Storm topologies consisted of spout and bolt primitives that were mapped on the cluster, as described in the previous sections. The Maxeler server was connected via TCP sockets with a bolt and a spout of the Storm topology for I/O data movement [5].

The implemented infrastructure defines a data flow from data sources through data processing components to data sinks. The same workload has been assigned to both the software and the hardware parts of the platform in order to compare the performance of the reconfigurable part of the cluster vs. the performance achieved by the software-only cluster solution.

5.2 Test case: Correlation on financial streaming data
The analysis and elaboration of high data loads in real-time is crucial for the financial stock markets. The financial data arrive in a streaming fashion from a varying number of streams at high rates. The correlation metric is an industry-standard technique. One of the most well-known correlation metrics for high frequency streaming data is the Hayashi-Yoshida (HY) correlation estimator [10][11].

sent via TCP socket to another Storm spout unit – basically this reimports the results to the Storm system. Finally, the results are presented to the final user. The hardware architecture that is mapped on reconfigurable technology is presented in Figure 5.

5.4 Performance
We evaluated our system with high-volume, real-life and synthetic data streams. The datasets used consist of stock market transactions. The rate of the transactions was one transaction per stock market per second. We used variously sized input datasets, which were streamed to the platform. The evaluated input datasets ranged from small loads, i.e. 250 transactions per second, up to heavy loads, i.e. 5000 transactions per second. As the input datasets had one transaction for each stock market, the number of transactions is equal to the number of stock markets processed per second. The data and the results were transferred over the network using the TCP sockets, as described above.

First, we tested our system with a real-life dataset with 1000 stock markets (i.e. 1000 streams, as each stock market is one stream). The performance results indicated that the time needed for the calculation of the HY correlation coefficients for the real-life 1000 stock markets at every timestamp was about 0.2 sec. Thus, taking into account that new transactions arrive every second, our proposed solution can process the real-life input data in real-time. We also tested the proposed solution with synthetic datasets. The demonstrations showed that the correlation of about 5000 stock markets can be computed in real time, i.e. every second. On the other hand, a distributed implementation of the Hayashi-Yoshida algorithm over a 7-node cluster mapping the Storm framework achieved the calculation of maximum 500 stock
                                                                          markets in real time. Last, we ran the above datasets over a single
The HY Correlation Estimator measures the pairwise correlation            thread fully optimized implementation of Hayashi-Yoshida
of the input market stocks. It uses the transaction prices of two         correlation. The performance achieved, was not competitive, as
stocks in order to calculate their correlation. The correlation is        the single thread solution could process up 250 stock market per
calculated over time windows, inside of which the stock                   sec. It is clear that a hybrid infrastructure of a distributed
transactions take place. Figure 4 presents the equation to calculate      framework coupled with reconfigurable part can offer quite
the HY estimator for two different market stocks.                         impressive performance even vs. distributed solutions for high
The algorithm outputs a correlation matrix that describes the             workload streaming problems.
correlation between all the different pairs of the incoming               In order to measure the bandwidth of the integrated system several
financial stock markets.                                                  experiments were conducted. Experiments were done, using the
                                                                          Local and the Remote Network (Internet). Local network is the
                                                                          network inside the university campus which has speed of 1 Gbps.
                                                                          The remote tests were run using Virtual Private Network (VPN)
                                                                          over an ADSL (24 Mbps download and 1Mbps upload) with the
                                                                          TCP/IP protocol. Table 1 shows the average results received for
                                                                          the server of these experiments.
       Figure 4. Hayashi-Yoshida Correlation estimator
                                                                          These results shows that over a fast network connection the
5.3 System Architecture                                                   system bottleneck is on processing, but it offers a sufficient
This section describes the system architecture of the test case           bandwidth for several applications, such as the demanding
algorithm on our proposed infrastructure. The implemented                 financial one. On the other hand, when we have connection to the
system takes as input a stream of the transaction prices of N             Maxeler server via VPN, the bottleneck is in the data transfer and
stocks. The transaction data are recorded at random times, i.e.           can be a functional option for remote systems in which the data
they have different timestamps, which means that we have to               rate is not crucial or there is a higher ratio of computation vs. I/O.
calculate the correlation estimator of all the pairs of the stocks                       Table 1. Bandwidth measurements
during different time intervals. First, bolt units preprocess the
received data in order to bring them in a proper format to be                Network configur.        Download(Mb/s)         Upload(Mb/s)
processed. The formatted data are streamed into the                            Local Network                 93.6                 51.12
reconfigurable platform of the infrastructure using TCP sockets,
as referred above. The data are passed to the reconfigurable part at                VPN                      0.81                  7.52
a fixed time interval, i.e. in our tests we used time interval = 1 sec.    MPC – X Series (est.)             1000                 1000
The reconfigurable part computes the correlation metric among all
the pairs of the incoming stock markets and then the results are
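The Hayashi-Yoshida estimator of Figure 4 can be illustrated with a short Python sketch. This is not the paper's FPGA implementation; the function name and the list-based inputs are illustrative. It follows the standard HY definition [10]: products of price increments contribute to the covariance only when the two transaction intervals overlap, and the result is normalized by the realized variance of each stream.

```python
import math

def hy_correlation(times_a, prices_a, times_b, prices_b):
    # Price increments and the transaction intervals they span.
    da = [prices_a[i + 1] - prices_a[i] for i in range(len(prices_a) - 1)]
    db = [prices_b[j + 1] - prices_b[j] for j in range(len(prices_b) - 1)]
    ia = list(zip(times_a, times_a[1:]))
    ib = list(zip(times_b, times_b[1:]))
    # HY covariance: sum increment products over overlapping intervals only.
    cov = 0.0
    for (s0, s1), x in zip(ia, da):
        for (t0, t1), y in zip(ib, db):
            if max(s0, t0) < min(s1, t1):  # the two intervals overlap
                cov += x * y
    # Normalize by the realized variances of the two streams.
    return cov / math.sqrt(sum(x * x for x in da) * sum(y * y for y in db))
```

Because only overlapping intervals contribute, the estimator handles the asynchronously recorded timestamps described above without interpolating or resampling the two streams to a common grid.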
[Figure 5: block diagram. The new and previous stock prices of A and B
for the overlapping transaction interval, together with the previous HY
covariance and the previous standard deviations of stocks A and B read
from shared memory, feed the HY Reconfigurable Module; it computes the
new HY covariance and the new standard deviations of stocks A and B,
and writes the new HY coefficients back to shared memory.]
                      Figure 5. Reconfigurable architecture that implements the HY algorithm on streaming data
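The per-pair update performed by the datapath of Figure 5 can be sketched in software as follows. This is a reconstruction from the figure, not the authors' hardware description: the state tuple, the `overlap` flag, and the use of running sums of squared increments in place of the figure's "standard deviations" are assumptions made for illustration.

```python
def hy_update(state, new_a, prev_a, new_b, prev_b, overlap):
    # state = (running HY covariance,
    #          running sum of squared increments of stock A,
    #          running sum of squared increments of stock B)
    cov, ss_a, ss_b = state
    da = new_a - prev_a        # new stock price A minus previous
    db = new_b - prev_b        # new stock price B minus previous
    if overlap:                # only overlapping transaction intervals
        cov += da * db         # contribute to the HY covariance
    ss_a += da * da
    ss_b += db * db
    return cov, ss_a, ss_b     # written back to shared memory

def hy_coefficient(state):
    # Normalized HY correlation coefficient from the stored state.
    cov, ss_a, ss_b = state
    return cov / (ss_a * ss_b) ** 0.5
```

Each incoming transaction pair thus touches only a small fixed amount of state per stock pair, which is what makes the update amenable to a deeply pipelined reconfigurable datapath.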
The connection of another type of Maxeler server, the MPC-X Series,
which has an on-FPGA-chip network connection, indicates the "speed of
light" for this implementation. Such a connection uses fast Ethernet
to send and receive data and the InfiniBand protocol to transfer them
to the reconfigurable hardware. It has not been tested, and the
presented performance is projected. Such performance is more than
sufficient for demanding financial applications such as the complete
NYSE Real-Time Reference Prices feed: that feed provides real-time
last-sale prices of NYSE-traded securities, and it needs a bandwidth
of 13 Mbps for a 1 ms refresh rate [12].

6. Conclusions
This paper shows the potential of reconfigurable computing used as
part of standardized distributed computer systems. The proposed
architecture is generic, as it can be used for any streaming
processing algorithmic scheme without changes, requiring only the
mapping of the compute-intensive parts of the algorithms on
reconfigurable logic. Also, our solution is platform-independent, as
it combines two computing platforms that are completely different in
terms of technology to create a powerful hybrid computer system. Both
platforms were initially designed without any foresight of being
connected together. The resulting system is robust, easy to use, and
able to achieve high performance.

Our approach has several benefits. The multi-FPGA platform is
connected to a typical Storm node and thus requires no special
treatment or flow modifications within Storm. It is able to offer
computing power equivalent to several conventional Storm nodes with
only one Maxeler multi-FPGA platform. In addition, the suggested
interconnection can work with different FPGA platforms and
distributed computation systems. Finally, our approach enriches Storm
with the ability to scale not only with the traditional methodology
of incorporating additional processing nodes but also by pushing part
of the processing onto the Maxeler multi-FPGA platform.

This method can easily be applied to different computational systems,
such as Hadoop, or with different reconfigurable platforms, such as
Convey. Such an approach can lead to hybrid systems with different
configurations depending on the nature of the computations, e.g.
whether they operate on streaming data or not. The selected
combination targets streaming data, as both the Storm and Maxeler
systems have been designed for such calculations. In terms of ease of
use, this system can be fully automated by creating a function call
for the hardware platform.

7. ACKNOWLEDGMENTS
This work has been partially funded from the European Union's Seventh
Framework Programme under Grant Agreement 619525 (QualiMaster).

8. REFERENCES
[1] Pell, O., Averbukh, V. (2012). "Maximum Performance Computing
    with Dataflow Engines". Computing in Science & Engineering,
    vol. 14, no. 4, (pp. 98-103).
[2] Pell, O., Mencer, O., Tsoi, K. H., & Luk, W. (2013). "Maximum
    performance computing with dataflow engines". In High-Performance
    Computing Using FPGAs (pp. 747-774). Springer New York.
[3] Multiscale Dataflow Programming, Maxeler Technologies Ltd,
    London, UK, 2014.
[4] Programming MPC Systems, Maxeler Technologies Ltd, London, UK,
    2014.
[5] Deliverable 3.1, QualiMaster.
[6] https://www.maxeler.com/
[7] https://storm.apache.org/
[8] http://www.swig.org/index.php
[9] Yadav, R. (2007). Client/Server programming with TCP/IP sockets.
    Technical Article, DevMentor.
[10] Hayashi, T., & Yoshida, N. (2005). "On covariance estimation of
     non-synchronously observed diffusion processes". Bernoulli,
     11(2), (pp. 359-379).
[11] M. Young, The Technical Writer's Handbook. Mill Valley, CA:
     University Science, 1989.
[12] http://www.nyxdata.com/capacity