=Paper=
{{Paper
|id=Vol-3041/445-449-paper-82
|storemode=property
|title=SQL Query Execution Optimization on Spark SQL
|pdfUrl=https://ceur-ws.org/Vol-3041/445-449-paper-82.pdf
|volume=Vol-3041
|authors=Gleb Mozhaiskii,Vladimir Korkhov,Ivan Gankevich
}}
==SQL Query Execution Optimization on Spark SQL==
Proceedings of the 9th International Conference "Distributed Computing and Grid Technologies in Science and Education" (GRID'2021), Dubna, Russia, July 5-9, 2021

G. Mozhaiskii, V. Korkhov, I. Gankevich

Saint Petersburg State University, 13B Universitetskaya Emb., Saint Petersburg, 199034, Russia

E-mail: i.gankevich@spbu.ru

The Spark and Hadoop ecosystem includes a wide variety of components and can be integrated with any tool currently required for Big Data. From release to release, the developers of these frameworks optimize the inner workings of the components and make their usage more flexible and elaborate. Nevertheless, since the invention of MapReduce as a programming model and the first Hadoop releases, data skew has been the main problem of distributed data processing. Data skew leads to performance degradation, i.e., a slowdown of application execution due to idling while waiting for resources to become available. The newest versions of the Spark framework handle this situation out of the box. However, in corporate environments with multiple large-scale projects whose development started years ago, there is often no opportunity to upgrade the tool versions and the corresponding logic. In this article we consider approaches to optimizing the execution of an SQL query in the presence of data skew, using a concrete example with HDFS and Spark SQL version 2.3.2.

Keywords: Big Data, Spark SQL, distributed data processing, HDFS

Gleb Mozhaiskii, Vladimir Korkhov, Ivan Gankevich. Copyright © 2021 for this paper by its authors. Use permitted under Creative Commons License Attribution 4.0 International (CC BY 4.0).

===1. Introduction===
The main purpose of this work is to reduce, with minor source code changes, the execution time of a specific SQL query (fig. 1). The query uses an SQL JOIN operation, filters the result set by specified parameters, and is performed on a large amount of distributed data. The data contain records about events in multiple tables that we join and correlate; the event tables can be combined by keys and occurrence order. The required tools are the Apache Spark framework version 2.3.2 and its SQL module. The data are stored in HDFS as Apache Parquet files with SNAPPY<sup>1</sup> compression.

 SELECT U.field01, U.field02, U.field03, I.field03 AS I_DATE, U.field04
 FROM UE U
 LEFT JOIN (SELECT field02, field01, field03,
                   (LAG(field03,1,0) OVER win) AS previous_date,
                   (LEAD(field03,1,0) OVER win) AS next_date
            FROM IntraFreq
            WINDOW win AS (PARTITION BY field02, field01 ORDER BY field03)) I
 ON U.field02 = I.field02
    AND U.field01 = I.field01
    AND ABS(U.field03-I.field03) < ABS(U.field03-I.previous_date)
    AND ABS(U.field03-I.field03) < ABS(U.field03-I.next_date)

Figure 1. The SQL query whose execution we optimized.

The query is executed on an Apache YARN cluster using the following configuration:
* 4 Spark executor instances;
* Spark submit mode is cluster;
* the Spark driver has 4 GB of RAM and 1 CPU core;
* each Spark executor has 16 GB of RAM and 4 CPU cores.

===2. Data representation===
The data were read using the Dask Python 3 library, then converted to Pandas data frames and finally read by PySpark. The dimensions of the first table are 92 797 197×10 and those of the second are 11 268 684×10. This significant difference in table dimensions directly affects the performance of the SQL JOIN operation when the data are distributed.

===3. Spark analytics===
Examining the query execution process with the Spark analytics module gives the following results: data extraction takes 7.6 seconds and leads to an unbalanced shuffle read-write of 877.4 MB (fig. 2).
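How skewed join keys turn into an unbalanced shuffle can be illustrated with a minimal sketch. It is plain Python rather than Spark code, and it assumes a simplified hash partitioner (Python's built-in `hash` modulo the partition count, not Spark's actual hash function). The key values, row counts and the helper names `partition_sizes` and `imbalance` are hypothetical.

```python
from collections import Counter


def partition_sizes(keys, num_partitions):
    """Count how many rows land in each partition under hash partitioning."""
    sizes = Counter()
    for key in keys:
        # Simplified partitioner: Python's hash() stands in for Spark's hash.
        sizes[hash(key) % num_partitions] += 1
    return [sizes[p] for p in range(num_partitions)]


def imbalance(sizes):
    """Ratio of the largest partition to the average partition size."""
    return max(sizes) / (sum(sizes) / len(sizes))


# Hypothetical join keys: key 0 is "hot" and carries most of the rows.
skewed_keys = [0] * 700 + [1] * 100 + [2] * 100 + [3] * 100
# The same number of rows spread evenly over four keys.
uniform_keys = [k % 4 for k in range(1000)]

skewed = partition_sizes(skewed_keys, 4)
uniform = partition_sizes(uniform_keys, 4)

print("skewed partitions: ", skewed, "imbalance:", imbalance(skewed))
print("uniform partitions:", uniform, "imbalance:", imbalance(uniform))
```

With one task per partition, the task that receives the hot key does several times more work than the others while they sit idle, which is the kind of imbalance reported for the shuffle above.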
The unbalanced shuffle means that part of the computation time is spent sending data over the network from one cluster node to another, which slows down the whole application. The query execution time is 31.65 seconds, and the total time is around 39.26 seconds. The join algorithm that Spark uses here is SortMergeJoin, which performs poorly on tables with significantly different dimensions.

<sup>1</sup> https://github.com/google/snappy

SortMergeJoin consists of two steps. The first step sorts the datasets; the second merges the sorted data within each partition by iterating over the elements and joining the rows that have the same join key value. It is the default algorithm that Spark uses for joins, but it is not suitable for our data because it requires all rows with the same join key value to be stored in the same partition. If the partitions are not co-located in this way, shuffle operations are needed to redistribute the rows being joined. Moreover, the partitions on each node are too large: according to [1], the recommended size is between 30 MB and 100 MB per execution node.

Figure 2. The execution statistics per node before optimization.

===4. Optimization===
To reduce the execution time of the SQL query (fig. 1), the following approaches were used: BroadcastHashJoin, tuning of the spark.sql.shuffle.partitions parameter, and bucketing.

According to [2], the MAPJOIN (or BROADCAST) hint gives a good performance boost for data with similar features. BroadcastHashJoin (MAPJOIN) sends one of the tables to all executor nodes, which reduces the execution time of the exchange stage. This approach (fig. 3) saves the time previously spent on routing data over the network and reduces the shuffle read-write volume to 111.5 MB, but the non-uniform distribution of data between execution nodes remains.

 SELECT /*+ BROADCAST(I) */ U.field01, U.field02, U.field03, I.field03 AS I_DATE, U.field04
 FROM UE U
 LEFT JOIN (SELECT field02, field01, field03,
                   (LAG(field03,1,0) OVER win) AS previous_date,
                   (LEAD(field03,1,0) OVER win) AS next_date
            FROM IntraFreq
            WINDOW win AS (PARTITION BY field02, field01 ORDER BY field03)) I
 ON U.field02 = I.field02
    AND U.field01 = I.field01
    AND ABS(U.field03-I.field03) < ABS(U.field03-I.previous_date)
    AND ABS(U.field03-I.field03) < ABS(U.field03-I.next_date)

Figure 3. SQL query optimized using BroadcastHashJoin.

According to [1], the spark.sql.shuffle.partitions parameter should be chosen so that the data are distributed uniformly across executor nodes and the partition size is between 30 MB and 100 MB. Empirically, the parameter value was set to 35, which gives a uniform shuffle read-write of around 35 MB per execution node.

Nevertheless, the exchange stage is still present when reading the table that will be broadcast. To reduce this stage, bucketing can be used [3]. Bucketing partitions the data and prevents data shuffling. The data are allocated to a predefined number of buckets based on the value of one or more data columns. At application startup, the data processing module calculates the hash values of the specified columns and, based on them, places each row in one of the buckets. The advantage of bucketing is that each bucket holds a similar amount of data, so map-side joins perform better than on a non-bucketed table. To implement bucketing, the way the Parquet file is read has to be changed (fig. 4).
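The bucketing idea described above can be sketched in plain Python. This is an illustration under stated assumptions, not Spark's implementation: Python's built-in `hash` stands in for the hash function that Spark applies to the bucketing column, and the function names `bucketize` and `bucketed_left_join` as well as the sample rows are hypothetical. The point is that once both tables are bucketed on the join key with the same number of buckets, each bucket can be joined independently with a small in-memory hash table, and no row has to move to another bucket.

```python
from collections import defaultdict


def bucketize(rows, key, num_buckets):
    """Place each row in a bucket chosen by the hash of its key column."""
    buckets = defaultdict(list)
    for row in rows:
        buckets[hash(row[key]) % num_buckets].append(row)
    return buckets


def bucketed_left_join(left, right, key, num_buckets):
    """Left join two tables bucket by bucket; rows never cross buckets."""
    left_buckets = bucketize(left, key, num_buckets)
    right_buckets = bucketize(right, key, num_buckets)
    result = []
    for b in range(num_buckets):
        # Build a hash table for the right side of this bucket only.
        lookup = defaultdict(list)
        for right_row in right_buckets.get(b, []):
            lookup[right_row[key]].append(right_row)
        for left_row in left_buckets.get(b, []):
            matches = lookup.get(left_row[key])
            if matches:
                result.extend((left_row, m) for m in matches)
            else:
                # Left join: keep unmatched left rows with an empty right side.
                result.append((left_row, None))
    return result


# Hypothetical sample rows that reuse the column names from the query.
ue = [
    {"field01": 1, "field03": 10},
    {"field01": 2, "field03": 20},
    {"field01": 5, "field03": 30},
]
intra_freq = [
    {"field01": 1, "field03": 11},
    {"field01": 2, "field03": 19},
    {"field01": 2, "field03": 25},
]

joined = bucketed_left_join(ue, intra_freq, "field01", num_buckets=4)
for left_row, right_row in joined:
    print(left_row, "->", right_row)
```

The sketch joins on a single column for brevity, whereas the query in the paper also joins on field02 and applies further conditions on field03.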
 CREATE TABLE IntraFreq
 USING PARQUET
 CLUSTERED BY (field01)
 SORTED BY (field02 ASC, field01 ASC, field03 ASC)
 INTO $partitions_number buckets
 AS select field01, field02, field03 from parquet.`$path`

Figure 4. The way to read the Parquet file in order to use bucketing.

===5. Results and future work===
The optimization techniques applied to the SQL query (fig. 1) and the corresponding results are presented in table 1 and figure 5. Nevertheless, these optimization approaches are applicable only when the size of the broadcast table is smaller than the amount of memory allocated to the Spark driver and executors. In the future, we plan to implement other optimizations, including a custom partitioner and a custom shuffle manager.

Table 1. The optimization techniques and their effect on query execution time

{| class="wikitable"
! Setup !! “SELECT” query time !! Data reading time !! Total time
|-
| SortMergeJoin || ~31.65 sec || ~7.6 sec || ~40.38 sec
|-
| BroadcastHashJoin + spark.sql.shuffle.partitions = 35 + bucketing || ~11.9 sec || ~19.2 sec || ~31.28 sec
|}

Figure 5. DAG after optimizations.

===References===
[1] A. Yudovin. Essential Optimization Methods to Make Apache Spark Work Faster // ALTOROS. – 2019. – URL: https://www.altoros.com/research-papers/essential-optimization-methods-to-make-apache-spark-work-faster/.

[2] Ji X. et al. Query Execution Optimization in Spark SQL // Scientific Programming. – 2020. – Vol. 2020.

[3] Pawan Singh Negi. Bucketing in Spark: Spark job optimization using Bucketing // Clairvoyant Blog. – 2021. – URL: https://blog.clairvoyantsoft.com/bucketing-in-spark-878d2e02140f.