     VariantSpark: Applying Spark-based
     machine learning methods to genomic
                 information
                       Aidan R. O’BRIEN a and Denis C. BAUER a,1
                        a CSIRO, Health and Biosecurity Flagship


          Abstract. Genomic information is increasingly being used for medical research,
          giving rise to the need for efficient analysis methodology able to cope with
          thousands of individuals and millions of variants. Catering for this need, we
          developed VariantSpark, a framework for applying machine learning algorithms in
          MLlib to genomic variant data using the efficient in-memory Spark compute
          engine. We demonstrate a speedup compared to our earlier Hadoop-based
          implementation as well as a published Spark-based approach using the ADAM
           framework. Adapting emerging methodologies for the fast, efficient analysis of
           large, diverse data volumes to genomic information will be a cornerstone of
           precision genome medicine, which promises tailored healthcare for diagnosis and
           treatment.

           Keywords. Spark, genomics, clustering



Introduction

We apply Apache Spark to publicly available genomic data. We demonstrate the
potential of Spark to process very large amounts of data, as well as its scalability and
flexibility. We also compare it to our previous implementation of similar techniques in
Apache Hadoop with regard to speed and performance.


1. Background

With the decreasing cost of DNA sequencing [1], coupled with large cohorts, the
amount of data available for processing can quickly become overwhelming. The 1000
Genomes Project [2], for example, has made publicly available hundreds of gigabytes of
VCF (Variant Call Format) files containing the genomic variants of over 1000
individuals. Coping with data at this scale raises challenges, such as memory
requirements, that are not addressed by other parallelisation strategies like OpenMPI
or hardware accelerators (GPGPUs). Projects
such as ‘Apache Hadoop MapReduce’ [3] can transform data into ‘key-value pairs’ that
can then be distributed between multiple nodes across a compute-cluster, effectively
allowing the abstraction of one large job into many smaller, independent jobs.
Unfortunately, the MapReduce paradigm that Hadoop is based on is not always the
optimal solution and can constrain the design of jobs. In addition, Hadoop is disk-IO
intensive, which can become a bottleneck for processing speed.
     ‘Apache Spark’ [4] is a more recent compute engine, which overcomes many of
Hadoop’s limitations. One of the main benefits is that it allows programs to cache data
in memory; potentially eliminating, or at least reducing, the bottleneck of disk IO.
When utilising caching, Apache claims Spark can be up to 100x faster than Hadoop.
Although Spark allows MapReduce-like programs, it does not require programs to follow
the MapReduce paradigm exactly, which in turn allows more flexibility when designing
programs. Coupled with MLlib (Spark’s machine-learning library), Spark lends itself to
a wide range of analyses of genomic data.
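     As a minimal illustration of the caching behaviour described above, the following
Scala sketch could be run in the Spark shell (where sc is the predefined SparkContext);
the file name is illustrative:

    // Cache the file's partitions in memory so that repeated passes
    // (e.g. the iterations of k-means) do not re-read it from disk.
    val lines = sc.textFile("chr1.vcf").cache()

    val total   = lines.count()                              // first action reads from disk and caches
    val headers = lines.filter(_.startsWith("#")).count()    // reuses the cached partitions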
     Recognising this, the Big Data Genomics (BDG) group has recently demonstrated
the strength of Spark in a genomic clustering application using ADAM, a set of formats
and APIs as well as processing stage implementations for genomic data [5]. While the
speedup over traditional methods was impressive, constraints within this general-purpose
genomics framework limited performance. We hence developed a more focused, purpose-built
application in Spark to perform k-means clustering of individuals. Using this, we are
able to compare different processing, software and clustering techniques, such as a
similar implementation in Hadoop and Mahout.


2. Methods

We obtain the genomic data from the 1000 Genomes Project as VCF files from
http://www.1000genomes.org/data. The data represent a 1092 (individuals) x 11,815,007
(variants) dimensional dataset of individual genotypes. We also download the metadata
containing the population association for each individual.
     We implemented VariantSpark in Scala, leveraging Apache Spark and MLlib. The aim
of the first step is to read in VCF files and effectively transpose them to vectors of each
individual’s variants. Using tuples to store the data in key-value pairs allows us to
achieve this task in a distributed fashion that should scale to allow for an input of
hundreds of gigabytes. Effectively, we split each line from the VCF file(s) into an array.
We then zip each item in each array with its heading from the VCF file, resulting in an
array of tuples (See Figure 1 Spark “Zip with header”). Each of these arrays now stores
a single variant for every individual in the dataset and can be evenly distributed across
the cluster in Spark’s Resilient Distributed Dataset (RDD) format [6]. However, further
processing is required, as we need arrays of individuals, not arrays of variants.
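     A hedged sketch of this “Zip with header” step is shown below; it assumes the Spark
shell’s sc from the earlier sketch and an illustrative file name, and it keeps the nine
fixed VCF columns in the zipped arrays for simplicity:

    val lines  = sc.textFile("chr1.vcf")

    // The #CHROM line carries the column names; columns 10 onwards are sample IDs.
    val header   = lines.filter(_.startsWith("#CHROM")).first().split("\t")
    val headerBc = sc.broadcast(header)

    // Each remaining line is one variant; zipping its fields with the header yields
    // an array of (columnName, field) tuples for that variant, distributed as an RDD.
    val variantRows = lines.filter(!_.startsWith("#"))
                           .map(line => headerBc.value.zip(line.split("\t")))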
     We therefore zip each array with a unique index representing a variant (See Figure
1 Spark “Zip with index”). We now have a tuple containing an integer paired with an
array of yet more tuples. We can now use the FLATMAP function to effectively flatten
the data structure and get closer to the structure we require. The FLATMAP, with a
custom inner function, results in a tuple for every variant across every individual. This
tuple contains the individual’s ID as the ‘key’, and as the ‘value’, a secondary tuple of
the variant index and the variant (See Figure 1 Spark “Flatmap”).
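     Continuing the sketch, the indexing and FLATMAP steps could look as follows
(variantRows is the RDD of zipped arrays from the previous sketch; dropping the nine
fixed VCF columns is an assumption about the column layout):

    // Pair each variant row with a unique variant index.
    val indexed = variantRows.zipWithIndex()

    // One output tuple per individual per variant:
    // key = individual ID, value = (variant index, genotype string).
    val perIndividual = indexed.flatMap { case (fields, variantIndex) =>
      fields.drop(9).map { case (sampleId, gt) => (sampleId, (variantIndex, gt)) }
    }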
     The value of each variant is the distance from the wild-type allele. For example, a
homozygous variant is 2, a heterozygous variant is 1 and no variant is 0. We calculate
this value from the variant string during the aforementioned FLATMAP stage. In
addition, to keep data-sizes to a minimum, we remove any variants with a value of zero.
This allows us to store the data as sparse vectors rather than dense vectors: a sparse
vector stores only the non-zero variants together with their indices, rather than the
entire array of variants as a dense vector would.
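     A sketch of this encoding, continuing from the previous one, is given below; the
parsing of the GT sub-field (e.g. “0|1” counts as 1, “1|1” as 2) is an assumption about
the exact genotype handling:

    // Distance from the wild-type allele: count the non-reference alleles in GT.
    def alleleCount(genotypeField: String): Double = {
      val gt = genotypeField.split(":")(0)              // GT is the first sub-field, e.g. "0|1"
      gt.split("[/|]").count(a => a != "0" && a != ".").toDouble
    }

    // Encode each genotype and drop zero values to keep the data sparse.
    val nonZero = perIndividual
      .mapValues { case (idx, gt) => (idx, alleleCount(gt)) }
      .filter { case (_, (_, value)) => value != 0.0 }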
     We need to group the collection of tuples by the individual using Spark’s
GROUPBYKEY. Following this function, we have an array for each individual,
containing the variant-tuples for said individual (See Figure 1 Spark “Group by
individual”). This is essentially the structure of a sparse vector, and we can map each
array to the sparse vector structure required by MLlib’s machine-learning algorithms.
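     The grouping and conversion to MLlib sparse vectors could then be sketched as
follows; numVariants uses the full-dataset figure from the Methods for illustration:

    import org.apache.spark.mllib.linalg.Vectors

    val numVariants = 11815007                         // size of each sparse vector

    // Group the (variantIndex, value) pairs per individual and build one sparse
    // vector per individual (indices must be unique and increasing).
    val vectors = nonZero.groupByKey().map { case (_, entries) =>
      val sorted = entries.toArray.sortBy(_._1)
      Vectors.sparse(numVariants, sorted.map(_._1.toInt), sorted.map(_._2))
    }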
     We proceed to create an MLlib k-means model and train the model on this data.
We report the accuracy by comparing the resulting cluster assignments to the expected
grouping, according to each individual’s population and super-population. We run a
maximum of ten k-means iterations and use the Manhattan distance measure.
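     A minimal sketch of the clustering step is given below; k = 4 (one cluster per
super-population) is an assumption based on the results section, and the sketch leaves
MLlib’s default distance measure in place rather than configuring the Manhattan measure
described above:

    import org.apache.spark.mllib.clustering.KMeans

    val k = 4                                          // assumed: one cluster per super-population
    val model = KMeans.train(vectors.cache(), k, 10)   // data, k, maxIterations

    // Cluster index predicted for each individual.
    val assignments = model.predict(vectors)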


3. Results and discussion

3.1. Comparison to Hadoop

We previously implemented a similar program in Hadoop using the Apache Mahout
machine-learning library. Although falling shy of Apache’s claim, we observe a
speedup of more than 50% with our Spark implementation. In Hadoop, clustering
under 1 million variants took close to 16 minutes. Processing and clustering over 1
million variants using Spark, on the other hand, took less than 7 minutes. One
contributing factor to the speedup is in-memory caching. Unlike Hadoop, which writes
the output of its data transformations to disk, Spark can cache intermediate data-
structures in memory. This minimises disk IO and removes a huge bottleneck. Another
reason for the better performance is the different programming model. Hadoop relies
on the MapReduce paradigm, which, although easy to use with distributed systems [7],
can be restrictive in job design. A ‘Map’ task must generally precede a ‘Reduce’ task.
Spark implements Map and Reduce functions; however, a Spark job can call these
functions in any order. This allowed us to implement the VCF transformations in a
more optimised way (See Figure 1 for a comparison between the Spark and Hadoop
implementation).

3.2. Comparison to Big Data Genomics

Unlike the Big Data Genomics (BDG) implementation, we perform transformations on
the variant data directly from VCF files. Although the ADAM columnar data-structure
may have benefits for certain operations, for k-means clustering, we observed better
performance forgoing the ADAM format (Table 2).
     We processed and clustered the human chromosome 22, which includes close to
500,000 variants, using the same options as BDG. The process took approximately 1.3
minutes to complete. This includes training and predicting, and is faster than the
2-minute runtime reported by BDG. Our method also eliminates the time required for
converting VCF files to the ADAM format. However, a trade-off of our faster
approach is disk space: because the ADAM format stores compressed files, whereas we
read uncompressed VCF files, the ADAM files are smaller on disk.
3.3. Scalability

We demonstrate the potential of Spark to scale from a subset of chromosome 1
containing less than one hundred thousand variants, to multiple chromosomes
containing millions of variants. The time required scales approximately linearly with
the number of variants present in the data (Figure 2). However, the overhead of Spark
becomes apparent with relatively small datasets, as can be seen in Table 1, where the
times required for completing the two smallest jobs differ by only one second.
For the smaller jobs, Spark requires only 1GB memory per executor (or core).
Increasing the number of variants beyond 2 million requires a greater memory
allocation for each executor. Although this is not a problem on high-memory clusters,
on smaller clusters, such as our four-node Azure cluster, increasing the memory
allocation beyond 1GB limits the number of executors that can run simultaneously.
Although of no advantage to clustering quality, as we
explain in the next section, we further demonstrate the scalability by increasing the
dataset to include multiple chromosomes. These jobs complete successfully, albeit after
more time and with higher memory requirements (Table 1).
     Spark not only scales well with data size, but also with cluster size. To
demonstrate this, we run identical jobs on our in-house cluster. Clustering every
chromosome 1 variant from VCF files takes only six minutes when using 80 executors.
As expected, this is much faster than the small cluster, which took over 17 minutes
for the same job. Although these two clusters differ in OS, size and hardware
configuration, a Spark job, being implemented in Scala (or Java), can run on any
cluster. The only change we make is to the launch script, where we specify how many
executors to use.
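     The cluster-specific settings can also be expressed programmatically; the following
hedged sketch shows the two values that change between clusters (the numbers are
illustrative, cf. Table 1), which are equivalently passed to spark-submit in the launch
script as --num-executors and --executor-memory on YARN:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("VariantSpark-kmeans")
      .set("spark.executor.instances", "80")   // number of executors
      .set("spark.executor.memory", "2g")      // memory per executor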

3.4. Clustering quality

When we cluster the entire group of 1092 individuals, the majority are clustered with
individuals from the same super-population. This is reflected in the Adjusted Rand
Index (ARI) [8], which is 0.8 (a value of 1.0 would represent a result in which all
like individuals are grouped together). Note that we chose the ARI because it is
frequently used to compare two clusterings (here, our prediction against the true
population associations).
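     For reference, the ARI is computed from the contingency table n_ij between predicted
clusters and true populations, with row sums a_i, column sums b_j and n individuals in
total; in its standard form [8] it reads:

    \mathrm{ARI} =
      \frac{\sum_{ij}\binom{n_{ij}}{2}
            - \left[\sum_i\binom{a_i}{2}\sum_j\binom{b_j}{2}\right]\big/\binom{n}{2}}
           {\tfrac{1}{2}\left[\sum_i\binom{a_i}{2}+\sum_j\binom{b_j}{2}\right]
            - \left[\sum_i\binom{a_i}{2}\sum_j\binom{b_j}{2}\right]\big/\binom{n}{2}}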
     We can substantially improve the accuracy to a perfect classification (ARI=1.0) by
removing the fourth super-population, AMR (American). When AMR individuals are
included, we observe that the majority of them are placed in the same group as
Europeans, likely reflecting their migration history. Only a minority of AMR
individuals form an independent group, likely comprising genetic information not
otherwise captured by the 26 sub-populations of the 1000 Genomes Project.
     Interestingly, we achieved these results with as few as one million variants, i.e. less
than half the variants available on chromosome 1, and increasing the size of the dataset
further saw no improvement in clustering the AMR individuals.
4. Conclusion

We demonstrate the potential applications of processing VCF files using Apache Spark.
We successfully apply k-means clustering to over one thousand individuals with millions
of variants. We explore performance on different architectures, starting with a relatively
small 24-core cluster and subsequently scaling up to a cluster with hundreds of cores.
We demonstrate our method of transforming and clustering individuals from VCF files
to be faster than clustering individuals stored in the ADAM format, albeit at the cost of
disk space.
In future work we aim to improve the disk-space footprint by using compressed VCF
files. The 1000 Genomes Consortium distributes its VCF files with gzip compression,
which is unsuitable for use in Spark because gzip files cannot be split. We therefore
plan to investigate other compression formats, such as bzip2, which is splittable and
can therefore be read in parallel by Spark. We expect this not only to minimise the
storage requirements of the actual data, but also to further reduce runtime, making the
approach attractive for applying more complicated machine learning approaches to
genomic data.
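     Because Spark reads Hadoop-supported compression codecs transparently, such a
bzip2-compressed VCF file could be loaded directly; the file name below is illustrative:

    // bzip2 is a splittable codec, so the file can be divided across executors.
    val compressed   = sc.textFile("ALL.chr1.phase1.vcf.bz2")
    val variantLines = compressed.filter(!_.startsWith("#")).count()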




         Figure 1. Schematic visualizing VCF processing using Hadoop and Spark respectively.




                Figure 2. Time scaling with increasing number of clustered variants.
 Table 1. Benchmarks for VCF clustering jobs with different numbers of variants and cluster configurations. The ‘small’
 cluster has three nodes, with a total of 24 CPU cores and 36GB memory, running on Microsoft Azure. The
 ‘large’ cluster has fourteen nodes, with a total of 448 CPU cores and 1.31TB memory, running on an in-
 house Hadoop setup.
Cluster            Chr(s)           Size (Kbase)          Variants        Executors (memory)            Time
     Small                   1              5,000              65,572            24 (1GB)                    3m 57s
                             1             10,000             138,840            24 (1GB)                    3m 58s
                             1             20,000             272,364            24 (1GB)                    4m 40s
                             1             40,000             531,230            24 (1GB)                    5m 43s
                             1             80,000           1,058,736            24 (1GB)                    6m 43s
                             1            120,000           1,598,000            24 (1GB)                    8m 14s
                             1            240,000           2,869,355            12 (2GB)                    17m 8s
     Large                   1             40,000             531,230            80 (1GB)                    1m 55s
                             1            240,000           2,869,355            80 (2GB)                    5m 25s
                             1            248,956           3,007,196            80 (2GB)                    6m 02s
                             1            248,956           3,007,196           400 (2GB)                    4m 30s
                            1-2           491,149           6,314,788            80 (3GB)                   14m 35s
                            1-4           879,658          11,815,007            80 (4GB)                   48m 24s


 Table 2. Benchmarks for clustering individuals from VCF files and from ADAM files. The time for ADAM
 clustering is reported by BDG. For both methods, the data was filtered to only include the populations listed.
 The time does not include the one-off task of extracting the compressed VCF file (required by both clustering
 methods) or the conversion of the VCF file to the ADAM format (only required for clustering the ADAM
 file).
 Format            Populations               Chr              Variants          Executors               Time
   ADAM          GBR, ASW, CHB                       22          494,328                    80              2m 01s
      VCF        GBR, ASW, CHB                       22          494,328                    80              1m 20s



 References

 [1] L.D. Stein, The case for cloud computing in genome informatics, Genome Biology 11 (2010), 207.
 [2] The 1000 Genomes Project Consortium, An integrated map of genetic variation from 1,092 human
      genomes, Nature 491 (2012), 56-65.
 [3] D. Borthakur, The Hadoop distributed file system: Architecture and design, Hadoop Project Website
      11 (2007), 21.
 [4] M. Zaharia et al., Resilient distributed datasets: A fault-tolerant abstraction for in-memory cluster
      computing, Proceedings of the 9th USENIX Conference on Networked Systems Design and
      Implementation, USENIX Association, 2012.
 [5] M. Massie, F. Nothaft, C. Hartl, C. Kozanitis, A. Schumacher, A.D. Joseph, and D.A. Patterson,
      ADAM: Genomics formats and processing patterns for cloud scale computing, Technical Report
      UCB/EECS-2013-207, EECS Department, University of California, Berkeley, 2013.
      http://www.eecs.berkeley.edu/Pubs/TechRpts/2013/EECS-2013-207.html
 [6] M. Zaharia, M. Chowdhury, T. Das, A. Dave, J. Ma, M. McCauley, M.J. Franklin, S. Shenker, and
      I. Stoica, Resilient distributed datasets: A fault-tolerant abstraction for in-memory cluster computing,
      Technical Report UCB/EECS-2011-82, EECS Department, University of California, Berkeley, 2011.
 [7] J. Dean and S. Ghemawat, MapReduce: Simplified data processing on large clusters, OSDI (2004),
      Sixth Symposium on Operating System Design and Implementation, San Francisco.
 [8] L. Hubert and P. Arabie, Comparing partitions, Journal of Classification 2 (1985), 193–218.