Development of Data-Intensive Services with Everest

© Oleg Sukhoroslov © Alexander Afanasiev

Institute for Information Transmission Problems of the Russian Academy of Sciences, Moscow

sukhoroslov@iitp.ru, afanasiev@iitp.ru

Abstract. The paper considers the development of domain-specific web services for processing large volumes of data on high-performance computing resources. The development of such services involves a number of challenges, such as integration with external data repositories, implementation of efficient data transfer, management of user data stored on the resource, execution of data processing jobs and provision of remote access to the data. An approach for building big data processing services on the base of the Everest platform is presented. The proposed approach takes these characteristic features into account and supports rapid deployment of such services on top of existing computing infrastructure. An example service for short-read sequence alignment that processes next-generation sequencing data on a Hadoop cluster is described.

Keywords: big data, web services, data transfer, data management, distributed data processing

1 Introduction

The explosive growth of data, observed in a variety of areas from research to commerce, requires the use of high-performance resources and efficient means for storing and processing large amounts of data. During the last decade, distributed data processing technologies like Hadoop and Spark have emerged. However, the complexity of the underlying hardware and software infrastructure prevents its direct use by non-specialists and requires the creation of user-friendly tools for solving particular classes of problems.

One way of implementing such tools is the creation of domain-specific services based on the Software as a Service model. This model allows users to quickly reuse ready-made implementations of data processing methods in a particular domain, without installing any software. At the same time, the user does not need to delve into the peculiarities of storing and processing data on the high-performance resources behind these services.

Data-intensive services (DIS), in comparison to conventional computational services operating on small amounts of data, started to develop only recently, so the principles and variants of implementation of these services are poorly understood. There are several academic projects aimed at supporting specific areas of research, for example, the Globus Genomics service [1] for analyzing next-generation sequencing data and the PDACS portal [2] for storing and analyzing data in the cosmology domain. The first system uses Amazon cloud resources as its computing infrastructure, while the second uses the resources of the NERSC and Magellan science clouds. Commercial cloud solutions, such as Amazon ML, Microsoft Azure ML and Databricks Cloud, are general-purpose platforms that include a set of universal services, as well as their own infrastructure for storing and processing data.

There is a lack of best practices for the implementation of DIS on the basis of existing infrastructure for big data processing, such as a cluster running the Hadoop or Spark platforms, which is increasingly used for the analysis of scientific data [3-5]. Also, little attention is paid to the integration of DIS with existing repositories and data warehouses, including cloud-based ones, as well as with other services. A lot of experience in the integration of distributed resources for storing and processing data has been accumulated within grid infrastructures [6]; however, these environments are complex for use by researchers and do not support new models of computation and technologies such as Hadoop. Finally, there is a lack of platforms for the implementation and deployment of DIS that would provide ready-made solutions for the typical problems encountered when creating this kind of services.

This work is designed to fill these gaps. Section 2 describes the characteristics of and requirements for DIS. Section 3 discusses the principles of implementing DIS on the Everest platform, which was initially focused on services working with small amounts of data. A distinctive feature of the proposed approach is support for the rapid implementation of DIS based on available computing resources and data warehouses. Section 4 describes an example service, based on the presented approach, for mapping short reads on a Hadoop cluster.
2 Characteristics and Requirements for DIS

Consider the typical requirements for DIS, that is, remotely available services for solving a certain class of problems with a large amount of input data. Such services should provide remote interfaces, usually in the form of a web user interface and an application programming interface (API). The interface must allow the user to specify the input datasets and the parameters of the problem being solved in terms of the subject area.

DIS must use high-performance and scalable (normally distributed) implementations of data analysis algorithms, which require an appropriate computing infrastructure for data processing and storage. Such infrastructure is generally represented by one or more computing clusters running the Hadoop platform or a similar technology. A DIS must translate the user request into one or more computing jobs that are submitted to a cluster and use scalable implementations (e.g., based on MapReduce) of the relevant algorithms.

The user must be able to pass arbitrary input data to a DIS. If the data is initially located on the user's computer or an external storage resource (e.g., a data repository), the DIS must implement the transfer of this data over the network to the cluster being used. When transferring large amounts of data, it is important to ensure the maximum transfer rate and automatic failover. Since the process of working with big data is often exploratory and requires multiple invocations of a DIS, the service should support reuse of data already loaded to the cluster. In order to optimize network use, a DIS should also cache frequently used datasets on the cluster. Data transfer functions can also be implemented as separate auxiliary services.

Importantly, a DIS may operate separately from the computing resources used for the actual data processing. A DIS can use multiple resources that may be situated at different locations. It is also possible that the service uses resources provided by the user. In such cases, for reasons of efficiency, it is important to avoid passing the input data from the user to the resource through the service and instead to transmit the data directly.

In practice, data analysis is often a multi-step process that requires performing different tasks at different stages of the analysis. In such cases, the results produced by one DIS can be passed as the input to another service. If these services use different resources, there also arises the problem of data transmission between the resources. In general, a DIS should allow the user to download the output to his computer or an external resource, as well as to transfer the data directly to another service. In addition, a DIS may provide additional functionality for remote data preview and visualization. These functions may also be implemented as separate auxiliary services.

DIS must support simultaneous use by multiple users. This requires protection of user data, distribution of resources between users and isolation of computational processes. In the case of a cloud infrastructure, a DIS must also manage dynamic allocation and deallocation of cloud resources according to the current load.
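As an illustration of such a remote interface, the following sketch shows how a client might submit a request to a hypothetical DIS REST API, passing a link to the input data, the problem parameters and a destination for the output. The endpoint, parameter names and token scheme are illustrative assumptions, not the API of an actual service.

    # A minimal sketch of a client request to a hypothetical DIS REST API.
    # Endpoint, parameter names and the token scheme are assumptions.
    import json
    import urllib.request

    request_body = {
        # link to the input data in an external storage
        "input_url": "ftp://ftp.example.org/data/sample.fastq.gz",
        # problem parameters expressed in terms of the subject area
        "parameters": {"algorithm": "mem", "paired": True},
        # where the (large) output should be delivered, bypassing the service
        "output_url": "ftp://ftp.example.org/results/",
    }

    req = urllib.request.Request(
        "https://dis.example.org/api/jobs",
        data=json.dumps(request_body).encode("utf-8"),
        headers={"Content-Type": "application/json",
                 "Authorization": "Bearer <access-token>"},
    )
    with urllib.request.urlopen(req) as resp:
        job = json.loads(resp.read())
    print("submitted job:", job)  # e.g. {"id": "...", "state": "QUEUED"}

Note that only links and parameters travel through the service in this scheme; the bulk data itself is exchanged directly between the storage and the cluster, as discussed below.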
3 Implementation of DIS with Everest

Everest [7, 8] is a web-based distributed computing platform. It provides users with tools to quickly publish and share computing applications as services. The platform also manages the execution of applications on external computing resources attached by users. In contrast to traditional distributed computing platforms, Everest implements the Platform as a Service (PaaS) model by providing its functionality via remote web and programming interfaces. A single instance of the platform can be accessed by many users in order to create, run and share applications with each other. The platform integrates with servers and computing clusters via an agent that runs on the resource side and plays the role of a mediator between the platform and the resource. The platform is publicly available online to interested users [8].

The advantage of using the Everest platform to create DIS is the availability of ready-made tools for rapid deployment of computational services and integration with computing resources, which do not require a separate installation of the platform. At the same time, since the platform was originally created to support services with small amounts of data, an effective implementation of DIS on the base of Everest requires a number of improvements. In particular, it is necessary to implement support for direct data transfers from external storage to the resource and vice versa, bypassing the platform. In addition, it is required to integrate the agent with the components of the Hadoop platform or a similar technology used for data storage and processing on the cluster.

Figure 1 presents the proposed scheme of implementation of DIS on the base of the Everest platform and an existing Hadoop cluster. Consider the scenario of using the service, which includes the following steps marked in the figure.

[Figure 1: Implementation of DIS on the base of Everest platform and Hadoop cluster]

In step 1, the user uploads the data of interest to some storage available on the network or selects data already present in the storage. This storage can be represented by cloud services (Dropbox, Google Drive, etc.), scientific data repositories (Dataverse, FigShare, Zenodo, etc.), specialized databases (for example, the 1000 Genomes Project), grid services or file servers (HTTP, FTP, GridFTP, rsync protocols). The wide range of existing storage facilities makes integrating DIS with them more important than duplicating their functionality in the service itself. Note that the user's computer can also act as a data store. In this case, the user needs to deploy software that provides network access to the user's files. Experience with implementing such software to ensure reliable transfer of scientific data across the network is already available [9].

In step 2, the user prepares and sends a request to the DIS, including a link to the input data and the values of other input parameters required by the service. The passed link should allow downloading the data from the external storage without the user's participation. In some cases, this requires that the user first supply the service with access credentials for the storage, such as an OAuth token or a proxy certificate.

In step 3, based on the user request, the service generates a computational task and sends it to the agent located on the resource used by the service. Together with the task description, the service sends the agent a link to the input data. As shown in the figure, when sending a task from the service to the agent, the code of the software implementation used for data processing can also be transferred.

The Hadoop and Spark platforms, most commonly used for distributed data processing, use the Java, Scala and Python languages for the implementation of data processing algorithms. Unlike C and Fortran, often used in scientific parallel applications, programs in these languages can be relatively easily transferred from one cluster to another, including their dependencies, without the need for compilation. This opens the possibility of implementing services on the basis of already created programs and libraries for Hadoop and Spark, which can be used in conjunction with an arbitrary cluster specified by the user. This model significantly simplifies the publication and reuse of developments in this field, without requiring the owner of the service to provide their own resources. It also avoids multiple implementations of services that use the same program with different resources.

In step 4, the agent downloads the input data from the external storage to the local cluster. To fully implement this step, it is planned to add support for loading data from the major types of repositories and storage. Currently, basic support for downloading files via the HTTP and FTP protocols, as well as an experimental integration with Dropbox and the Dataverse repository, are implemented. The downloaded data is placed in the Hadoop Distributed File System (HDFS) on the cluster, where it can be accessed by the program launched in the next step.
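A minimal sketch of what the agent-side logic of step 4 might look like is shown below, assuming the standard hdfs command-line tool shipped with Hadoop is available on the cluster. The function name, paths and staging directory are illustrative assumptions, not the actual Everest agent code.

    # Sketch of the agent-side logic for step 4: fetch a file via HTTP/FTP
    # and place it into HDFS. Illustrative only; not the actual agent code.
    import subprocess
    import urllib.request
    from pathlib import Path
    from urllib.parse import urlparse

    def stage_in(url: str, hdfs_dir: str = "/user/everest/input") -> str:
        """Download the file at `url` and put it into HDFS; return its HDFS path."""
        local_path = Path("/tmp") / Path(urlparse(url).path).name
        # urllib handles both http(s):// and ftp:// URLs
        urllib.request.urlretrieve(url, str(local_path))
        hdfs_path = f"{hdfs_dir}/{local_path.name}"
        # rely on the standard HDFS CLI shipped with Hadoop
        subprocess.run(["hdfs", "dfs", "-mkdir", "-p", hdfs_dir], check=True)
        subprocess.run(["hdfs", "dfs", "-put", "-f", str(local_path), hdfs_path],
                       check=True)
        local_path.unlink()  # free local disk space after the upload
        return hdfs_path

A production implementation would additionally stream the transfer (avoiding a full local copy), retry on failures and record the downloaded URL for the caching described later in this section.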
In step 5, the agent runs the program specified in the task description on the given input data. The launch is performed through a cluster resource manager such as Yet Another Resource Negotiator (YARN), the component of the Hadoop platform that supports the launch of MapReduce and Spark programs. A special adapter was implemented in order to support the interaction of the Everest agent with YARN, similar in function to the previously created adapters for integration with HPC batch schedulers. After the launch, the agent monitors the status of the corresponding job (an "application" in terms of YARN) and relays the progress information to the service (step 6), which in turn displays this information to the user through the web interface. Upon completion of the program, the agent transmits the output files (of small size) and the final status of the job to the service.
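The adapter is described above only at a high level. As a rough illustration of the monitoring in steps 5-6, the YARN ResourceManager exposes a REST API (GET /ws/v1/cluster/apps/{appid}) that reports an application's state, progress and final status. The sketch below polls that real endpoint; the resource manager address and the polling logic are illustrative assumptions, not the actual adapter code.

    # Sketch of polling a YARN application's status via the ResourceManager
    # REST API. The RM address and the application id are placeholders.
    import json
    import time
    import urllib.request

    RM = "http://resourcemanager.example.org:8088"

    def wait_for_app(app_id: str, poll_seconds: int = 10) -> str:
        """Poll the application until it finishes; return its final status."""
        while True:
            with urllib.request.urlopen(f"{RM}/ws/v1/cluster/apps/{app_id}") as r:
                app = json.loads(r.read())["app"]
            # progress is reported as a percentage and could be relayed
            # to the service for display in the web interface
            print(f"{app_id}: state={app['state']} progress={app['progress']:.0f}%")
            if app["state"] in ("FINISHED", "FAILED", "KILLED"):
                return app["finalStatus"]  # e.g. SUCCEEDED or FAILED
            time.sleep(poll_seconds)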
If a large amount of data is produced as a result of the program execution, the agent must support direct network transfer of this data to the external storage specified by the user (step 7). The information required for this must be transmitted by the user when sending the request to the service in step 2. At the moment, upload of output data to a specified FTP server or Dropbox folder is implemented.
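A minimal sketch of the FTP variant of step 7 follows, using the standard ftplib module; the host, credentials and file names are placeholders that would come from the user's original request, and in practice the data would first be copied out of HDFS (e.g., via hdfs dfs -get).

    # Sketch of step 7: direct upload of a (large) output file from the
    # cluster side to a user-specified FTP server. Host, credentials and
    # names are placeholders supplied by the user with the request.
    from ftplib import FTP

    def stage_out(local_file: str, host: str, user: str, password: str,
                  remote_name: str) -> None:
        with FTP(host) as ftp:
            ftp.login(user, password)
            with open(local_file, "rb") as f:
                # storbinary streams the file in chunks, which suits large data
                ftp.storbinary(f"STOR {remote_name}", f, blocksize=1 << 20)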
In step 8, when the request is processed, the service sends the results to the user as a set of output parameters and links to the output files. Some of these files can be stored by the service itself (for example, the program execution log), and some of them can be stored on the cluster or located in an external storage.

Note that steps 1, 4 and 7, which are marked with an asterisk and associated with the transmission of data over the network, are not always required and may be omitted. For example, step 1 is not required if the data is already in an external storage or on a cluster, which is true for frequently used datasets. Step 4 can be skipped if the data has already been downloaded to the cluster by the agent or manually by the administrator. To support this, the agent must store information about the downloaded data and cache it for reuse. Step 7 is not required if the produced data is an intermediate result that will be submitted as an input to another service using the same cluster. Taking these cases into account can significantly reduce the amount of data transferred across the network and thus speed up the processing of requests.

Let us briefly consider security issues. Since the service users cannot modify the code of the program launched by the service on the cluster, the risk of unauthorized access to the data of other users is minimized. When implementing data caching on a cluster, the agent must also limit the reuse of confidential data to the user who originally provided it. As for the distribution of cluster resources between users and the isolation of computing processes, these functions are already implemented in the YARN manager.

Although the approach described in this section implies the use of the Hadoop platform, it can be easily adapted to any other big data storage and processing platform.

4 Example DIS Implementation

To demonstrate the described approach, a prototype service was implemented on the Everest platform for mapping short reads, one of the basic problems in the analysis of next-generation sequencing (NGS) results in the bioinformatics domain. This task usually represents the initial and the most computationally intensive stage of the NGS data analysis pipeline, characterized by large volumes of input and output data. The basic scheme of the service implementation is presented in Figure 2.

[Figure 2: Implementation of DIS for mapping short reads]

The service requires one or two (paired) files with reads in the FASTQ format to be provided by the user. The size of these files in compressed form is usually several gigabytes. The public repository of the 1000 Genomes Project was chosen as the main input data storage. This repository provides the ability to download data from a dedicated FTP server where both the short reads and the reference genomes necessary for solving the mapping problem are available. Therefore, the files are provided to the service as links to this or any other FTP server.

To solve the mapping problem, the BigBWA tool [10] was used, which implements parallel execution of the well-known BWA package (Burrows-Wheeler Aligner) in the MapReduce paradigm on a Hadoop cluster. When accessing the service, the user can select one of the mapping algorithms implemented in the BWA package. Additional fine-tuning of the algorithm parameters is currently not implemented. Also, all launches use a fixed reference human genome, about 5 GB in size, preloaded on the cluster. The total amount of input data on the test runs was about 10-15 GB.

Upon request submission, in accordance with the scheme described in Section 3, the read files are downloaded directly from the FTP server to the Hadoop cluster. After downloading, the files are uncompressed and converted to the format used by BigBWA. The downloaded files are cached: if the file link in a request matches an already downloaded one, the data loading step is skipped. After the data is loaded, the MapReduce job with the BigBWA tool is launched.
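As a rough sketch of this last step, an agent can start the BigBWA jar as a regular YARN job and capture the resulting application id for the monitoring described in Section 3. The jar path and the command-line arguments below are placeholder assumptions; the actual BigBWA options are documented in [10].

    # Sketch of launching the mapping job through the standard `yarn jar`
    # command. Jar location and arguments are placeholders; consult the
    # BigBWA documentation for the real options.
    import re
    import subprocess

    def launch_bigbwa(input_hdfs: str, output_hdfs: str) -> str:
        """Start the MapReduce job and return the YARN application id."""
        cmd = ["yarn", "jar", "/opt/bigbwa/BigBWA.jar",
               # placeholder arguments: algorithm choice, input, output
               "-mem", input_hdfs, output_hdfs]
        proc = subprocess.run(cmd, capture_output=True, text=True, check=True)
        # YARN prints the submitted application id to its log output
        match = re.search(r"application_\d+_\d+", proc.stderr + proc.stdout)
        if match is None:
            raise RuntimeError("could not determine YARN application id")
        return match.group(0)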
At the end of the job execution, the service returns to the user the path to the file with the mapping results on the cluster. This approach was chosen because, as noted earlier, read mapping is only the initial step in the analysis of NGS data; therefore, in practice, these results will usually be immediately passed as an input to another service in the data processing pipeline. At the user's request, the mapping results can also be uploaded to an external FTP server. The output data on the test runs was about 5-10 GB in the SAM format. In the future, it is planned to convert the results into the more compact BAM format.

Solving the read mapping problem on the Hadoop cluster via the created service significantly reduced the data processing time. For example, a launch of the BWA package for mapping two read files on a single server in 4 threads took more than an hour, while a similar launch through the service (28 map tasks) took about 10 minutes, i.e., a speedup of roughly six times.

5 Conclusion

In this paper, we considered the characteristic features of and requirements for the implementation of data-intensive services for working with large datasets. An approach to the implementation of these services based on the Everest platform, initially focused on the creation of computing services with small amounts of data, is proposed. A distinctive feature of this approach, in comparison with commercial cloud solutions, is support for the rapid implementation of services based on existing computing resources and data repositories. An example of a created service that implements the analysis of next-generation sequencing data on a Hadoop cluster is described.

Besides further development of the individual elements of the described approach, future work will focus on the remaining challenges. For instance, many existing data repositories are not well prepared for immediate use and require considerable information integration efforts. There is also an increasing demand for the processing of data streams. We plan to investigate the use of data integration and stream processing frameworks within the proposed approach to address these issues. We also plan to evaluate the proposed approach on case study applications using larger datasets or combining data from multiple repositories.

Acknowledgements

This work is supported by the Russian Science Foundation (project No. 16-11-10352).

References

[1] Madduri R. et al. Experiences Building Globus Genomics: A Next-generation Sequencing Analysis Service Using Galaxy, Globus, and Amazon Web Services // Concurrency and Computation: Practice and Experience. 2014. Vol. 26, No. 13. P. 2266-2279.
[2] Madduri R. et al. PDACS: A Portal for Data Analysis Services for Cosmological Simulations // Computing in Science & Engineering. 2015. Vol. 17, No. 5. P. 18-26.
[3] Ekanayake J., Pallickara S., Fox G. MapReduce for Data Intensive Scientific Analyses // 2008 IEEE International Conference on eScience (eScience'08). IEEE, 2008. P. 277-284.
[4] Zhang Z. et al. Scientific Computing Meets Big Data Technology: An Astronomy Use Case // 2015 IEEE International Conference on Big Data. IEEE, 2015. P. 918-927.
[5] Nothaft F. A. et al. Rethinking Data-intensive Science Using Scalable Analytics Systems // 2015 ACM SIGMOD International Conference on Management of Data. ACM, 2015. P. 631-646.
[6] Foster I., Kesselman C. (eds.). The Grid 2: Blueprint for a New Computing Infrastructure. Elsevier, 2003.
[7] Sukhoroslov O., Volkov S., Afanasiev A. A Web-based Platform for Publication and Distributed Execution of Computing Applications // 14th International Symposium on Parallel and Distributed Computing (ISPDC). 2015. P. 175-184.
[8] Everest. http://everest.distcomp.org/
[9] Chard K., Tuecke S., Foster I. Efficient and Secure Transfer, Synchronization, and Sharing of Big Data // IEEE Cloud Computing. 2014. Vol. 1, No. 3. P. 46-55.
[10] Abuín J. M. et al. BigBWA: Approaching the Burrows-Wheeler Aligner to Big Data Technologies // Bioinformatics. 2015. doi:10.1093/bioinformatics/btv506