=Paper=
{{Paper
|id=Vol-2022/paper19
|storemode=property
|title=
Development of Data-Intensive Services with Everest
|pdfUrl=https://ceur-ws.org/Vol-2022/paper19.pdf
|volume=Vol-2022
|authors=Oleg Sukhoroslov,Alexander Afanasiev
|dblpUrl=https://dblp.org/rec/conf/rcdl/SukhoroslovA17
}}
==
Development of Data-Intensive Services with Everest
==
Development of Data-Intensive Services with Everest
© Oleg Sukhoroslov © Alexander Afanasiev
Institute for Information Transmission Problems of the Russian Academy of Sciences,
Moscow
sukhoroslov@iitp.ru afanasiev@iitp.ru
Abstract. The paper considers the development of domain-specific web services for processing large volumes of data on high-performance computing resources. The development of such services is associated with a number of challenges, such as integration with external data repositories, implementation of efficient data transfer, management of user data stored on the resource, execution of data processing jobs, and provision of remote access to the data. An approach for building big data processing services on the basis of the Everest platform is presented. The proposed approach takes into account the characteristic features of such services and supports their rapid deployment on the basis of existing computing infrastructure. An example of a service for short-read sequence alignment that processes next-generation sequencing data on a Hadoop cluster is described.

Keywords: big data, web services, data transfer, data management, distributed data processing
Proceedings of the XIX International Conference “Data Analytics and Management in Data Intensive Domains” (DAMDID/RCDL’2017), Moscow, Russia, October 10-13, 2017

1 Introduction

The explosive growth of data observed in a variety of areas, from research to commerce, requires the use of high-performance resources and efficient means for storing and processing large amounts of data. During the last decade, distributed data processing technologies such as Hadoop and Spark have emerged. However, the complexity of the underlying hardware and software infrastructure prevents its direct use by non-specialists and requires the creation of user-friendly tools for solving particular classes of problems.

One way of implementing such tools is the creation of domain-specific services based on the Software as a Service model. This model allows users to quickly reuse ready-made implementations of data processing methods in a particular domain without installing any software. At the same time, the user does not need to delve into the peculiarities of storing and processing data on the high-performance resources behind these services.

Data-intensive services (DIS), in contrast to conventional computational services operating on small amounts of data, started to develop only recently, so the principles and variants of implementation of such services are poorly understood. There are several academic projects aimed at supporting specific areas of research, for example, the Globus Genomics service [1] for analyzing next-generation sequencing data and the PDACS portal [2] for storing and analyzing data in the cosmology domain. The first system uses Amazon cloud resources as its computing infrastructure, while the second uses the resources of the NERSC and Magellan science clouds. Commercial cloud solutions, such as Amazon ML, Microsoft Azure ML and Databricks Cloud, are general-purpose platforms that include a set of universal services as well as their own infrastructure for storing and processing data.

There is a lack of best practices for the implementation of DIS on the basis of existing infrastructure for big data processing, such as a cluster running the Hadoop or Spark platforms, which is increasingly used for the analysis of scientific data [3-5]. Also, little attention is paid to the integration of DIS with existing repositories and data warehouses, including cloud-based ones, as well as with other services. A lot of experience in the integration of distributed resources for storing and processing data has been accumulated within grid infrastructures [6]; however, these environments are complex for researchers to use and do not support new models of computation and technologies such as Hadoop. Finally, there is a lack of platforms for the implementation and deployment of DIS that would provide ready-made solutions to the typical problems encountered when creating this kind of services.

This work is designed to fill these gaps. Section 2 describes the characteristics of and requirements for DIS. Section 3 discusses the principles of implementation of DIS based on the Everest platform, which was initially focused on creating services working with small amounts of data. A distinctive feature of the proposed approach is support for the rapid implementation of DIS on top of available computing resources and data storage. Section 4 describes an example of a service, based on the presented approach, for mapping short reads on a Hadoop cluster.

2 Characteristics and Requirements for DIS

Consider typical requirements for DIS that represent remotely available services for solving a certain class of problems with a large amount of input data. Such services should provide remote interfaces, usually in the form of a web user interface and an application programming interface (API). The interface must allow the user to specify the input datasets and the parameters of the problem being solved in terms of the subject area.
DIS must use high-performance and scalable (normally distributed) implementations of data analysis algorithms, which require an appropriate computing infrastructure for data processing and storage. Such infrastructure is generally represented by one or more computing clusters running the Hadoop platform or a similar technology. A DIS must translate the user request into one or more computing jobs that are submitted to a cluster and use scalable implementations (e.g., based on MapReduce) of the relevant algorithms.

The user must be able to pass arbitrary input data to a DIS. If the data is initially located on the user's computer or on an external storage resource (e.g., a data repository), the DIS must implement the transfer of this data over the network to the cluster being used. When transferring large amounts of data, it is important to ensure the maximum transfer rate and automatic failover. Since the process of working with big data is often exploratory and requires multiple invocations of a DIS, the service should support reuse of data already loaded to the cluster. In order to optimize network usage, a DIS must also cache frequently used datasets on the cluster. Data transfer functions can also be implemented as separate auxiliary services.

Importantly, a DIS may operate separately from the computing resources used for the actual data processing. A DIS can use multiple resources, which can be situated at different locations. It is also possible that the service uses resources provided by the user. In such cases it is important, for reasons of efficiency, to avoid passing the input data from the user to the resource through the service and to transmit the data directly instead.

In practice, data analysis is often a multi-step process that requires performing different tasks at different stages of the analysis. In such cases, the results produced by one DIS can be passed as the input to another service. If these services use different resources, the problem of data transmission between resources also arises. In general, a DIS should allow the user to download the output to his computer or to an external resource, as well as to transfer the data directly to another service. In addition, a DIS may provide additional functionality for remote data preview and visualization. These functions may also be implemented as separate auxiliary services.

A DIS must support simultaneous use by multiple users. This requires protection of user data, distribution of resources between users, and isolation of computational processes. In the case of a cloud infrastructure, a DIS must also manage dynamic allocation and deallocation of resources in the cloud according to the current load.

3 Implementation of DIS with Everest

Everest [7, 8] is a web-based distributed computing platform. It provides users with tools to quickly publish and share computing applications as services. The platform also manages the execution of applications on external computing resources attached by users. In contrast to traditional distributed computing platforms, Everest implements the Platform as a Service (PaaS) model by providing its functionality via remote web and programming interfaces. A single instance of the platform can be accessed by many users in order to create, run and share applications with each other. The platform implements integration with servers and computing clusters using an agent that runs on the resource side and plays the role of a mediator between the platform and the resources. The platform is publicly available online to interested users [8].

The advantage of using the Everest platform to create DIS is the availability of ready-made tools for the rapid deployment of computational services and for integration with computing resources, which does not require a separate installation of the platform. At the same time, since the platform was originally created to support services with small amounts of data, an efficient implementation of DIS on the basis of Everest requires a number of improvements. In particular, it is necessary to implement support for direct data transfers from external storage to the resource and vice versa, bypassing the platform. In addition, it is required to implement the integration of the agent with the components of the Hadoop platform or a similar technology used for data storage and processing on the cluster.

Figure 1 presents the proposed scheme of implementation of DIS on the basis of the Everest platform and an existing Hadoop cluster. Consider the scenario of using the service, which includes the following steps marked in the figure.

Figure 1 Implementation of DIS on the basis of the Everest platform and a Hadoop cluster

In step 1, the user uploads the data of interest to some storage available on the network or selects data already present in the storage. This storage can be represented by cloud services (Dropbox, Google Drive, etc.), scientific data repositories (Dataverse, FigShare, Zenodo, etc.), specialized databases (for example, the 1000 Genomes Project), grid services, or file servers (HTTP, FTP, GridFTP, rsync protocols). The wide range of existing storage facilities makes the task of integrating DIS with them more important than duplicating their functionality in the service itself. Note that the user's computer can also act as a data store. In this case, the user needs to deploy software that provides network access to the user's files. Experience in implementing such software to ensure the reliable transfer of scientific data across the network is already available [9].

In step 2, the user prepares and sends a request to the DIS, including a link to the input data and the values of other input parameters required by the service. The passed link should allow downloading the data from the external storage without the user's participation. In some cases, this requires that the user first supply the service with access credentials to the storage, such as an OAuth token or a proxy certificate.
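As an illustration, the following Python sketch shows what such a request could look like from the client side. The endpoint URL, parameter names, and credential field are hypothetical placeholders and do not reproduce the actual Everest API.

<pre>
# A minimal sketch of a client request to a DIS (step 2).
# The endpoint and field names below are illustrative placeholders.
import json
import urllib.request

request_body = {
    # link that lets the service download the input data without user participation
    "input_url": "ftp://ftp.example.org/data/sample_1.fastq.gz",
    # problem parameters expressed in terms of the subject area
    "algorithm": "mem",
    # optional credentials for the external storage (e.g., an OAuth token)
    "storage_token": "<oauth-token>",
}

req = urllib.request.Request(
    "https://dis.example.org/api/jobs",   # hypothetical service endpoint
    data=json.dumps(request_body).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)
with urllib.request.urlopen(req) as resp:
    job = json.loads(resp.read())
print("submitted job:", job.get("id"))
</pre>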
In step 3, based on the user request, the service generates a computational task and sends it to the agent located on the resource used by the service. Together with the task description, the service sends to the agent a link to the input data. As shown in the figure, when sending a task from the service to the agent, the code of the software implementation used for data processing can also be transferred.

The Hadoop and Spark platforms, which are most commonly used for distributed data processing, use the Java, Scala, and Python languages for the implementation of data processing algorithms. Unlike C and Fortran, which are often used in scientific parallel applications, programs in these languages can be relatively easily transferred from one cluster to another, together with their dependencies and without the need for compilation. This opens the possibility of implementing services on the basis of already created programs and libraries for Hadoop and Spark, which can be used in conjunction with an arbitrary cluster specified by the user. This model significantly simplifies the publication and reuse of developments in this field without requiring the owner of the service to provide their own resources. It also avoids multiple implementations of services that use the same program with different resources.

In step 4, the agent downloads the input data from the external storage to the local cluster. To implement this step, it is planned to add support for loading data from the major types of repositories and storage. Currently, basic support for downloading files via the HTTP and FTP protocols, as well as experimental integration with Dropbox and the Dataverse repository, is implemented. The downloaded data is placed in the Hadoop Distributed File System (HDFS) on the cluster, where it can be accessed by the program launched in the next step.
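A minimal agent-side sketch of this step is given below, assuming the standard hdfs command-line client is available on the cluster node where the agent runs. The local and HDFS paths are illustrative.

<pre>
# Sketch of step 4: fetch a file from external storage over HTTP/FTP
# and place it into HDFS via the standard "hdfs dfs -put" command.
import subprocess
import urllib.request
from urllib.parse import urlparse

def stage_input(url, hdfs_dir="/user/everest/input"):
    # urllib handles both http(s):// and ftp:// links
    file_name = urlparse(url).path.split("/")[-1]
    local_file = "/tmp/" + file_name
    urllib.request.urlretrieve(url, local_file)
    hdfs_path = hdfs_dir + "/" + file_name
    # -f overwrites an existing file with the same name
    subprocess.run(["hdfs", "dfs", "-put", "-f", local_file, hdfs_path], check=True)
    return hdfs_path
</pre>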
In step 5, the agent runs the program specified in the task description on the given input data. The launch is performed through the cluster resource manager, such as Yet Another Resource Negotiator (YARN), a component of the Hadoop platform that supports the launch of MapReduce and Spark programs. A special adapter was implemented in order to support the interaction of the Everest agent with YARN, similar in function to the previously created adapters for integration with HPC batch schedulers. After the launch, the agent monitors the status of the corresponding job (the application, in YARN terms) and relays the progress information to the service (step 6), which in turn displays this information to the user through the web interface. Upon completion of the program, the agent transmits to the service the output files (of small size) and the final status of the job.
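The following sketch illustrates how such a launch and monitoring loop could be organized around the standard YARN command-line tools. The jar name, main class, arguments and output parsing are illustrative and do not reproduce the actual adapter code.

<pre>
# Sketch of steps 5-6: launch a program through YARN and poll the application status.
import re
import subprocess
import time

def run_yarn_job(jar, main_class, args):
    # launch the program; the client prints the application id to its log output
    proc = subprocess.Popen(["yarn", "jar", jar, main_class] + args,
                            stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    app_id = None
    for line in proc.stdout:
        m = re.search(r"application_\d+_\d+", line)
        if m:
            app_id = m.group(0)
            break
    if app_id is None:
        raise RuntimeError("could not determine YARN application id")
    # poll the application status and relay progress to the service (step 6)
    while True:
        report = subprocess.run(["yarn", "application", "-status", app_id],
                                capture_output=True, text=True, check=True).stdout
        state = re.search(r"Final-State\s*:\s*(\S+)", report).group(1)
        if state in ("SUCCEEDED", "FAILED", "KILLED"):
            return state            # report the final status to the service
        time.sleep(30)              # keep polling while the job is running
</pre>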
If a large amount of data is produced as a result of the program execution, the agent must support direct network transfer of this data to the user-specified external storage (step 7). The information required for this must be transmitted by the user when sending the request to the service in step 2. At the moment, upload of output data to a specified FTP server or Dropbox folder is implemented.
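A possible sketch of such a direct upload to an FTP server is shown below; it streams a file from HDFS to the server using the standard hdfs client and the Python ftplib module. Host, credentials and paths are placeholders supplied with the request in step 2.

<pre>
# Sketch of step 7: stream a result file from HDFS directly to an FTP server,
# bypassing the platform.
import subprocess
from ftplib import FTP

def upload_result(hdfs_path, host, user, password, remote_name):
    # "hdfs dfs -cat" writes the file content to stdout, which is streamed to the server
    cat = subprocess.Popen(["hdfs", "dfs", "-cat", hdfs_path], stdout=subprocess.PIPE)
    with FTP(host) as ftp:
        ftp.login(user, password)
        ftp.storbinary("STOR " + remote_name, cat.stdout)
    cat.wait()
</pre>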
In step 8, when the request is processed, the service sends the results to the user as a set of output parameters and links to the output files. Some of these files can be stored by the service itself (for example, the program execution log), and some of them can be stored on the cluster or located in an external storage.

Note that steps 1, 4 and 7, which are marked with an asterisk in the figure and are associated with the transmission of data over the network, are not always required and may be omitted. For example, step 1 is not required if the data is already in an external storage or on a cluster, which is true for frequently used data sets. Step 4 can be skipped if the data has already been downloaded to the cluster by the agent or manually by the administrator. To support this, the agent must store information about the downloaded data and cache it for reuse. Step 7 is not required if the received data is an intermediate result and will be submitted as an input to another service using the same cluster. Taking these cases into account can significantly reduce the amount of data transferred across the network and thus speed up the processing of requests.
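A minimal sketch of such caching logic is shown below, assuming a simple manifest file on the agent side that maps source links to HDFS paths; the manifest location and format are illustrative.

<pre>
# Sketch of the caching behaviour that allows step 4 to be skipped:
# the agent remembers which links have already been staged into HDFS.
import json
import os

MANIFEST = "/var/lib/everest-agent/downloads.json"   # illustrative location

def cached_or_stage(url, stage_func):
    cache = json.load(open(MANIFEST)) if os.path.exists(MANIFEST) else {}
    if url in cache:
        return cache[url]            # data already on the cluster, skip step 4
    hdfs_path = stage_func(url)      # e.g. the stage_input() sketch from step 4
    cache[url] = hdfs_path
    with open(MANIFEST, "w") as f:
        json.dump(cache, f)
    return hdfs_path
</pre>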
Let us briefly consider security issues. Since service users cannot modify the code of the program launched by the service on the cluster, the risk of unauthorized access to the data of other users is minimized. When implementing data caching on a cluster, the agent must also limit the reuse of confidential data to the user who originally provided this data. As for the distribution of cluster resources between users and the isolation of computing processes, these functions are already implemented in the YARN manager.

Although the approach described in this section implies the use of the Hadoop platform, it can easily be adapted to any other platform for big data storage and processing.

4 Example DIS Implementation

To demonstrate the described approach, a prototype service was implemented on the Everest platform for mapping short reads, one of the basic problems in the analysis of next-generation sequencing (NGS) results in the bioinformatics domain. This task usually represents the initial and the most computationally intensive stage of the NGS data analysis pipeline and is characterized by large volumes of input and output data. The basic scheme of the service implementation is presented in Figure 2.

Figure 2 Implementation of DIS for mapping short reads

The service requires one or two (paired) files with reads in the FASTQ format to be provided by the user. The size of these files in compressed form is usually several gigabytes. The public repository of the 1000 Genomes Project was chosen as the main input data storage. This repository provides the ability to download data from a dedicated FTP server where both the short reads and the reference genomes necessary for solving the mapping problem are available. Therefore, the files are provided to the service as links to this or any other FTP server.

To solve the mapping problem, the BigBWA tool [10] was used, which implements the parallel execution of the well-known BWA package (Burrows-Wheeler Aligner) in the MapReduce paradigm on a Hadoop cluster. When accessing the service, the user can select one of the mapping algorithms implemented in the BWA package. Additional fine-tuning of the algorithm parameters is currently not implemented. Also, all launches use a fixed reference human genome of about 5 GB in size preloaded on the cluster. The total amount of input data of the problem in test runs was about 10-15 GB.

Upon request submission, in accordance with the scheme described in Section 3, the read files are downloaded directly from the FTP server to the Hadoop cluster. After downloading, the files are uncompressed and converted to the format used by BigBWA. The downloaded files are cached and, if the file link in the request matches an already downloaded one, the data loading step is skipped. After the data is loaded, the MapReduce job with the BigBWA tool is launched.
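For illustration, the following sketch combines the previous sketches into a possible service-side handler for one mapping request. The BigBWA jar name, driver class and argument order are placeholders (the real invocation must follow the BigBWA documentation), and the decompression and format conversion steps are omitted.

<pre>
# Hypothetical end-to-end sketch of processing one read-mapping request.
def map_reads(request):
    # step 4 (with caching): stage the FASTQ file(s) referenced in the request
    fastq_hdfs = [cached_or_stage(url, stage_input) for url in request["input_urls"]]
    output_dir = "/user/everest/output/" + request["job_id"]
    # step 5: run the BigBWA MapReduce job on the staged data
    final_state = run_yarn_job(
        "BigBWA.jar",                 # placeholder jar name
        "com.example.BigBWADriver",   # placeholder driver class
        [request.get("algorithm", "mem")] + fastq_hdfs + [output_dir],
    )
    if final_state != "SUCCEEDED":
        raise RuntimeError("mapping job finished with state " + final_state)
    # step 8: return the HDFS path of the results to the user
    return output_dir
</pre>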
At the end of the job execution, the service returns to the user the path to the file with the mapping results on the cluster. This approach was chosen because, as noted earlier, read mapping is only the initial step in the analysis of NGS data. Therefore, in practice, these results will usually be immediately passed as an input to another service in the data processing pipeline. At the user's request, the mapping results can also be uploaded to an external FTP server. The output data in the test runs was about 5-10 GB in the SAM format. In the future, it is planned to convert the results into the more compact BAM format.

Solving the read mapping problem on the Hadoop cluster via the created service allowed the data processing time to be significantly reduced. For example, the launch of the BWA package for mapping of two read files on a single server in 4 threads took more than an hour, while a similar launch through the service (28 map tasks) took about 10 minutes.

5 Conclusion

In this paper, we considered the characteristic features of and requirements for the implementation of data-intensive services for working with large data sets. An approach to the implementation of these services based on the Everest platform, which was initially focused on the creation of computing services with small amounts of data, is proposed. A distinctive feature of this approach, in comparison with commercial cloud solutions, is support for the rapid implementation of services on top of existing computing resources and data repositories. An example of a created service that implements the analysis of next-generation sequencing data on a Hadoop cluster is described.

Besides further development of the individual elements of the described approach, future work will focus on the remaining challenges. For instance, many existing data repositories are not well prepared for immediate use and require considerable information integration efforts. There is also an increasing demand for the processing of data streams. We plan to investigate the use of data integration and stream processing frameworks within the proposed approach to address these issues. We also plan to evaluate the proposed approach on case study applications using larger data sets or combining data from multiple repositories.
Acknowledgements

This work is supported by the Russian Science Foundation (project No. 16-11-10352).

References

[1] Madduri R. et al. Experiences Building Globus Genomics: A Next-generation Sequencing Analysis Service Using Galaxy, Globus, and Amazon Web Services // Concurrency and Computation: Practice and Experience. 2014. Vol. 26, No. 13. P. 2266–2279.
[2] Madduri R. et al. PDACS: A Portal for Data Analysis Services for Cosmological Simulations // Computing in Science & Engineering. 2015. Vol. 17, No. 5. P. 18–26.
[3] Ekanayake J., Pallickara S., Fox G. MapReduce for Data Intensive Scientific Analyses // 2008 IEEE International Conference on eScience (eScience’08). IEEE, 2008. P. 277–284.
[4] Zhang Z. et al. Scientific Computing Meets Big Data Technology: An Astronomy Use Case // 2015 IEEE International Conference on Big Data. IEEE, 2015. P. 918–927.
[5] Nothaft F. A. et al. Rethinking Data-intensive Science Using Scalable Analytics Systems // 2015 ACM SIGMOD International Conference on Management of Data. ACM, 2015. P. 631–646.
[6] Foster I., Kesselman C. (eds.). The Grid 2: Blueprint for a New Computing Infrastructure. Elsevier, 2003.
[7] Sukhoroslov O., Volkov S., Afanasiev A. A Web-based Platform for Publication and Distributed Execution of Computing Applications // 14th International Symposium on Parallel and Distributed Computing (ISPDC). 2015. P. 175–184.
[8] Everest. http://everest.distcomp.org/
[9] Chard K., Tuecke S., Foster I. Efficient and Secure Transfer, Synchronization, and Sharing of Big Data // Cloud Computing. IEEE, 2014. Vol. 1, No. 3. P. 46–55.
[10] Abuín J. M. et al. BigBWA: Approaching the Burrows–Wheeler Aligner to Big Data Technologies // Bioinformatics. 2015. doi:10.1093/bioinformatics/btv506