<!DOCTYPE article PUBLIC "-//NLM//DTD JATS (Z39.96) Journal Archiving and Interchange DTD v1.0 20120330//EN" "JATS-archivearticle1.dtd">
<article xmlns:xlink="http://www.w3.org/1999/xlink">
  <front>
    <journal-meta />
    <article-meta>
      <title-group>
        <article-title>SQL QUERY EXECUTION OPTIMIZATION ON SPARK SQL</article-title>
      </title-group>
      <contrib-group>
        <contrib contrib-type="author">
          <string-name>G. Mozhaiskii</string-name>
          <xref ref-type="aff" rid="aff1">1</xref>
        </contrib>
        <contrib contrib-type="author">
          <string-name>V. Korkhov</string-name>
          <xref ref-type="aff" rid="aff1">1</xref>
        </contrib>
        <contrib contrib-type="author">
          <string-name>I. Gankevich</string-name>
          <email>i.gankevich@spbu.ru</email>
          <xref ref-type="aff" rid="aff1">1</xref>
        </contrib>
        <aff id="aff1">
          <label>1</label>
          <institution>Saint Petersburg State University</institution>,
          <addr-line>13B Universitetskaya Emb., Saint Petersburg, 199034</addr-line>,
          <country country="RU">Russia</country>
        </aff>
      </contrib-group>
      <pub-date>
        <year>2021</year>
      </pub-date>
      <fpage>5</fpage>
      <lpage>9</lpage>
      <abstract>
        <p>The Spark and Hadoop ecosystem includes a wide variety of components and can be integrated with almost any tool required for Big Data processing today. From release to release, the developers of these frameworks optimize the internal workings of the components and make their usage more flexible and elaborate. Nevertheless, since the invention of MapReduce as a programming model and the first Hadoop releases, data skew has been the main problem of distributed data processing. Data skew leads to performance degradation, i.e., a slowdown of application execution due to idling while waiting for resources to become available. The newest versions of the Spark framework handle this situation out of the box. However, in corporate environments with multiple large-scale projects whose development started years ago, it is often impossible to upgrade the tool versions and the corresponding application logic. In this article we consider approaches to optimizing the execution of an SQL query under data skew, using a concrete example with HDFS and Spark SQL version 2.3.2.</p>
      </abstract>
      <kwd-group>
        <kwd>Big Data</kwd>
        <kwd>Spark SQL</kwd>
        <kwd>distributed data processing</kwd>
        <kwd>HDFS</kwd>
      </kwd-group>
    </article-meta>
  </front>
  <body>
    <sec id="sec-1">
      <title>1. Introduction</title>
      <p>The main purpose of this work is to reduce, with minor source code changes, the execution time of a particular SQL query (fig. 1) that uses an SQL JOIN operation and filters the result set by specified parameters over a large amount of distributed data. The data contain records about events stored in multiple tables that we join and correlate; the event tables can be combined by keys and by the order of occurrence.</p>
      <p>The required tools are the Apache Spark framework version 2.3.2 and its SQL module. The data are stored in HDFS as Apache Parquet files with Snappy compression.</p>
      <preformat>SELECT
  U.field01,
  U.field02,
  U.field03,
  I.field03 AS I_DATE,
  U.field04
FROM UE U
LEFT JOIN (SELECT field02, field01, field03,
             (LAG(field03, 1, 0) OVER win) AS previous_date,
             (LEAD(field03, 1, 0) OVER win) AS next_date
           FROM IntraFreq
           WINDOW win AS (PARTITION BY field02, field01 ORDER BY field03)) I
ON U.field02 = I.field02 AND
   U.field01 = I.field01 AND
   ABS(U.field03 - I.field03) &lt; ABS(U.field03 - I.previous_date) AND
   ABS(U.field03 - I.field03) &lt; ABS(U.field03 - I.next_date)</preformat>
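      <p>The ON clause accepts an IntraFreq row only if its field03 is strictly closer to U.field03 than both of its neighbours in the window, so, as long as field03 values are far from 0 (LAG and LEAD default to 0 here), it selects the nearest IntraFreq event for each UE row. The following pure-Python sketch reproduces these semantics on small in-memory lists. The function name nearest_event_join and the dict-based row layout are hypothetical, field03 is assumed to be numeric, and the sketch says nothing about how Spark physically executes the query.</p>
      <preformat><![CDATA[
```python
from collections import defaultdict


def nearest_event_join(ue_rows, intra_rows):
    """Mimic the LEFT JOIN of fig. 1 on small in-memory lists of dicts.

    Hypothetical row layout: dicts with keys field01, field02, field03
    (plus field04 for UE); field03 is assumed to be a numeric date.
    """
    # PARTITION BY field02, field01 ORDER BY field03
    groups = defaultdict(list)
    for row in intra_rows:
        groups[(row["field02"], row["field01"])].append(row["field03"])

    # LAG(field03, 1, 0) / LEAD(field03, 1, 0): neighbours in the window, default 0
    windows = {}
    for key, dates in groups.items():
        dates.sort()
        windows[key] = [
            (d,
             dates[i - 1] if i > 0 else 0,
             dates[i + 1] if i + 1 < len(dates) else 0)
            for i, d in enumerate(dates)
        ]

    result = []
    for u in ue_rows:
        t = u["field03"]
        # ON clause: equal keys and I.field03 strictly closer to U.field03
        # than both of its neighbours in the window
        matches = [
            d for d, prev, nxt in windows.get((u["field02"], u["field01"]), [])
            if abs(t - d) < abs(t - prev) and abs(t - d) < abs(t - nxt)
        ]
        # LEFT JOIN: an unmatched UE row is kept with I_DATE = NULL (None)
        for d in matches or [None]:
            result.append({"field01": u["field01"], "field02": u["field02"],
                           "field03": t, "I_DATE": d, "field04": u["field04"]})
    return result


ue = [{"field01": 1, "field02": "a", "field03": 105, "field04": "x"}]
intra = [{"field01": 1, "field02": "a", "field03": t} for t in (100, 103, 110)]
print(nearest_event_join(ue, intra))  # the single output row has I_DATE 103
```
]]></preformat>
      <p>With two equally close events (for example 100 and 110 around 105), neither is strictly closer than its neighbour, so the UE row is returned with a NULL I_DATE.</p>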
    </sec>
    <sec id="sec-2">
      <title>2. Data representation</title>
      <p>The data were read with the Dask library for Python 3, then converted to Pandas data frames and finally loaded into PySpark. The first table has 92 797 197 rows and 10 columns, the second one 11 268 684 rows and 10 columns. This significant difference in table sizes has a direct impact on the performance of the SQL JOIN operation when the data are distributed across a cluster.</p>
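      <p>The size difference can be quantified directly from the row counts above. A minimal sketch, assuming the first table is UE and the second is IntraFreq (consistent with the BROADCAST(I) hint used in section 4); row counts are only a proxy for the byte sizes that Spark's planner actually estimates, and the helper names are hypothetical.</p>
      <preformat><![CDATA[
```python
# Row counts reported in section 2; both tables have 10 columns.
# Assumption: the first table is UE and the second is IntraFreq.
TABLE_ROWS = {"UE": 92_797_197, "IntraFreq": 11_268_684}


def size_ratio(rows: dict) -> float:
    """How many times more rows the largest table has than the smallest."""
    return max(rows.values()) / min(rows.values())


def broadcast_candidate(rows: dict) -> str:
    """The table with fewer rows is the natural side to broadcast."""
    return min(rows, key=rows.get)


print(f"size ratio: {size_ratio(TABLE_ROWS):.2f}")              # size ratio: 8.23
print("broadcast candidate:", broadcast_candidate(TABLE_ROWS))  # IntraFreq
```
]]></preformat>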
    </sec>
    <sec id="sec-3">
      <title>3. Spark analytics</title>
      <p>Examining the query execution with the Spark analytics module gives the following results: data extraction takes 7.6 seconds and leads to an unbalanced shuffle read/write of 877.4 MB (fig. 2). This means that a certain part of the computation time is spent sending data over the network from one cluster node to another, which slows down the whole application. The query execution time is 31.65 seconds, and the total time is around 39.26 seconds.</p>
      <p>The join algorithm that Spark chooses is SortMergeJoin, which performs poorly on tables whose sizes differ significantly.</p>
      <p>SortMergeJoin consists of two steps. The first step sorts the datasets; the second merges the sorted data within each partition by iterating over the elements and joining the rows that have the same join key value. It is the default algorithm that Spark uses for joins, but it is not well suited to our data because it requires the partitions to be co-located: all rows with the same join key value must be stored in the same partition. Otherwise, shuffle operations are needed to redistribute the rows being joined.</p>
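      <p>The two steps, preceded by the shuffle that co-locates equal keys, can be sketched in plain Python as an inner equi-join. This illustrates the technique only and is not Spark's implementation: zlib.crc32 stands in for Spark's Murmur3-based hash partitioner, and all helper names are hypothetical.</p>
      <preformat><![CDATA[
```python
import zlib
from collections import defaultdict


def partition_id(value, num_partitions):
    """Deterministic stand-in for Spark's hash partitioner (Spark uses Murmur3)."""
    return zlib.crc32(repr(value).encode()) % num_partitions


def shuffle(rows, key, num_partitions):
    """Exchange: route every row to the partition that owns its join key."""
    parts = defaultdict(list)
    for row in rows:
        parts[partition_id(row[key], num_partitions)].append(row)
    return parts


def merge(left, right, key):
    """Sort both sides by key, then advance two cursors over them."""
    left = sorted(left, key=lambda r: r[key])
    right = sorted(right, key=lambda r: r[key])
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][key], right[j][key]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the run of right rows sharing this key ...
            j_end = j
            while j_end < len(right) and right[j_end][key] == lk:
                j_end += 1
            # ... and pair it with every left row sharing the key
            while i < len(left) and left[i][key] == lk:
                out.extend((left[i], r) for r in right[j:j_end])
                i += 1
            j = j_end
    return out


def sort_merge_join(left, right, key, num_partitions=4):
    """Inner equi-join: shuffle both inputs, then sort-merge each partition pair."""
    lparts = shuffle(left, key, num_partitions)
    rparts = shuffle(right, key, num_partitions)
    out = []
    for p in range(num_partitions):
        out.extend(merge(lparts.get(p, []), rparts.get(p, []), key))
    return out
```
]]></preformat>
      <p>Both inputs are redistributed in the shuffle step, which is the network cost observed in fig. 2.</p>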
      <p>
        Moreover, the partitions are too large: according to [
        <xref ref-type="bibr" rid="ref1">1</xref>
        ], the recommended size is between 30 MB and 100 MB per executor node.
      </p>
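      <p>Assuming the data were spread evenly, which is exactly what skew prevents, the 30 MB to 100 MB guideline translates into a range of partition counts. The sketch below applies it to the 877.4 MB shuffle volume measured in section 3; partition_count_range is a hypothetical helper.</p>
      <preformat><![CDATA[
```python
import math


def partition_count_range(total_mb, min_mb=30.0, max_mb=100.0):
    """Range of partition counts that keeps every partition between min_mb
    and max_mb, assuming the data are spread evenly (no skew)."""
    fewest = math.ceil(total_mb / max_mb)               # partitions no larger than max_mb
    most = max(fewest, math.floor(total_mb / min_mb))  # partitions no smaller than min_mb
    return fewest, most


# Shuffle volume measured for the original query (section 3)
print(partition_count_range(877.4))  # (9, 29)
```
]]></preformat>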
    </sec>
    <sec id="sec-4">
      <title>4. Optimization</title>
      <p>To reduce the execution time of the SQL query (fig. 1), the following approaches were used: BroadcastHashJoin, tuning of the spark.sql.shuffle.partitions parameter, and bucketing.</p>
      <p>
        According to [
        <xref ref-type="bibr" rid="ref2">2</xref>
        ], the MAPJOIN (or BROADCAST) hint gives a good performance boost for data with similar characteristics. BroadcastHashJoin (MAPJOIN) sends a full copy of one of the tables to every executor node, which reduces the execution time of the exchange stage. This approach (fig. 3) saves the time previously spent routing data over the network and reduces the shuffle read/write volume to 111.5 MB, but the problem of non-uniform distribution of the processed data between executor nodes remains.
      </p>
      <preformat>SELECT /*+ BROADCAST(I) */
  U.field01,
  U.field02,
  U.field03,
  I.field03 AS I_DATE,
  U.field04
FROM UE U
LEFT JOIN (SELECT field02, field01, field03,
             (LAG(field03, 1, 0) OVER win) AS previous_date,
             (LEAD(field03, 1, 0) OVER win) AS next_date
           FROM IntraFreq
           WINDOW win AS (PARTITION BY field02, field01 ORDER BY field03)) I
ON U.field02 = I.field02 AND
   U.field01 = I.field01 AND
   ABS(U.field03 - I.field03) &lt; ABS(U.field03 - I.previous_date) AND
   ABS(U.field03 - I.field03) &lt; ABS(U.field03 - I.next_date)</preformat>
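      <p>The idea behind BroadcastHashJoin can be sketched in plain Python: the small table is turned into a hash table once, and every partition of the large table probes its own copy locally, so the large table never has to be shuffled. Only the equality keys are modelled; in the query above the ABS conditions would be evaluated as an extra filter on each candidate pair. For a LEFT JOIN, Spark can only broadcast the right-hand side, which is what BROADCAST(I) requests. The function names are hypothetical.</p>
      <preformat><![CDATA[
```python
from collections import defaultdict


def build_hash_table(small_rows, key):
    """Build side: index the small table by join key once."""
    table = defaultdict(list)
    for row in small_rows:
        table[row[key]].append(row)
    return dict(table)


def probe_partition(partition, table, key):
    """Probe side: runs on one executor against its local copy of the table."""
    out = []
    for row in partition:
        matches = table.get(row[key])
        if matches:
            out.extend((row, m) for m in matches)
        else:
            out.append((row, None))  # LEFT JOIN keeps unmatched rows
    return out


def broadcast_left_join(large_partitions, small_rows, key):
    table = build_hash_table(small_rows, key)  # "broadcast": every partition sees the same table
    out = []
    for partition in large_partitions:         # the large table is not shuffled
        out.extend(probe_partition(partition, table, key))
    return out
```
]]></preformat>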
      <p>
        According to [
        <xref ref-type="bibr" rid="ref1">1</xref>
        ], the spark.sql.shuffle.partitions parameter should be chosen so that data are distributed uniformly across executor nodes, with a size between 30 MB and 100 MB. Empirically, the parameter was set to 35, which provides a uniform shuffle read/write of around 35 MB per executor node.
      </p>
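      <p>Changing spark.sql.shuffle.partitions only changes how many partitions the join keys are hashed into. The sketch below, on a hypothetical workload with one hot key, illustrates a consequence worth keeping in mind: all rows of one key always land in the same partition, so a larger partition count spreads the other keys more thinly but cannot split a skewed key. zlib.crc32 stands in for Spark's Murmur3 hash, and the helper names are hypothetical.</p>
      <preformat><![CDATA[
```python
import zlib
from collections import Counter

# With an existing SparkSession `spark`, the value would be set with
# spark.conf.set("spark.sql.shuffle.partitions", "35")


def partition_of(key, num_partitions):
    """Deterministic stand-in for Spark's hash partitioning (Spark uses Murmur3)."""
    return zlib.crc32(repr(key).encode()) % num_partitions


def partition_sizes(keys, num_partitions):
    """Number of rows that land in each shuffle partition."""
    counts = Counter(partition_of(k, num_partitions) for k in keys)
    return [counts.get(p, 0) for p in range(num_partitions)]


def skew_factor(sizes):
    """Largest partition divided by the mean partition size (1.0 = perfectly even)."""
    return max(sizes) / (sum(sizes) / len(sizes))


# Hypothetical workload: 1,000 keys with 10 rows each plus one hot key with 5,000 rows
keys = [k for k in range(1000) for _ in range(10)] + ["hot"] * 5000
for n in (35, 200):
    print(n, round(skew_factor(partition_sizes(keys, n)), 1))
```
]]></preformat>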
      <p>
        Nevertheless, an exchange stage is still present while reading the table that is broadcast. To reduce this stage, bucketing can be used [
        <xref ref-type="bibr" rid="ref3">3</xref>
        ]. Bucketing partitions the data when the table is written, so later stages that need the same partitioning can avoid shuffling it. The data are allocated to a predefined number of buckets based on the values of one or more columns: when the table is written, Spark computes a hash of the specified columns for each row and places the row into the corresponding bucket. This gives the following advantages: rows with the same bucketing key are always stored in the same bucket, buckets are of roughly equal size as long as the bucketing key is not skewed, and map-side joins perform better than on a non-bucketed table.
      </p>
      <p>To implement bucketing, the way the Parquet files are read has to be changed (fig. 4):</p>
      <preformat>CREATE TABLE IntraFreq
USING PARQUET
CLUSTERED BY (field01)
SORTED BY (field02 ASC, field01 ASC, field03 ASC)
INTO $partitions_number BUCKETS
AS SELECT field01, field02, field03 FROM parquet.`$path`</preformat>
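      <p>One way to see the benefit in this query: the bucketed IntraFreq table feeds a window function partitioned by (field02, field01). Because the table is bucketed by field01, every (field02, field01) group lives in a single bucket, already sorted in window order, so LAG and LEAD can be evaluated bucket by bucket without redistributing rows. A minimal pure-Python sketch of that idea; crc32 stands in for Spark's Murmur3 hash, and write_bucketed and lag_lead_per_bucket are hypothetical names.</p>
      <preformat><![CDATA[
```python
import zlib
from collections import defaultdict


def bucket_id(value, num_buckets):
    """Bucket of a CLUSTERED BY value (crc32 stands in for Spark's Murmur3 hash)."""
    return zlib.crc32(repr(value).encode()) % num_buckets


def write_bucketed(rows, column, num_buckets, sort_columns):
    """Write time: hash every row into a bucket, then sort inside each bucket."""
    buckets = defaultdict(list)
    for row in rows:
        buckets[bucket_id(row[column], num_buckets)].append(row)
    for rows_in_bucket in buckets.values():
        rows_in_bucket.sort(key=lambda r: tuple(r[c] for c in sort_columns))
    return dict(buckets)


def lag_lead_per_bucket(buckets):
    """LAG/LEAD(field03, 1, 0) OVER (PARTITION BY field02, field01 ORDER BY field03),
    computed bucket by bucket: each (field02, field01) group is contiguous in one bucket."""
    out = []
    for rows in buckets.values():
        for i, row in enumerate(rows):
            group = (row["field02"], row["field01"])
            prev_same = i > 0 and (rows[i - 1]["field02"], rows[i - 1]["field01"]) == group
            next_same = i + 1 < len(rows) and (rows[i + 1]["field02"], rows[i + 1]["field01"]) == group
            out.append({**row,
                        "previous_date": rows[i - 1]["field03"] if prev_same else 0,
                        "next_date": rows[i + 1]["field03"] if next_same else 0})
    return out


events = [{"field01": f1, "field02": f2, "field03": t}
          for f1, f2, t in [(1, "a", 5), (1, "a", 3), (2, "a", 7), (1, "b", 4), (2, "a", 1), (1, "a", 9)]]
buckets = write_bucketed(events, "field01", 4, ("field02", "field01", "field03"))
for r in lag_lead_per_bucket(buckets):
    print(r)
```
]]></preformat>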
    </sec>
    <sec id="sec-5">
      <title>5. Results and future work</title>
      <p>The optimization techniques applied to the SQL query (fig. 1) and the corresponding results are presented in table 1 and figure 5.</p>
      <p>Nevertheless, these optimization approaches are applicable only when the broadcast table is smaller than the memory allocated to the Spark driver and executors. In the future, we plan to implement other optimizations, including a custom partitioner and a custom shuffle manager.</p>
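      <p>The memory constraint can be expressed as a rough check. A minimal sketch, assuming hypothetical memory sizes and a hypothetical headroom fraction; it is not Spark's own rule. For reference, in Spark 2.x the automatic choice of a broadcast join is governed by spark.sql.autoBroadcastJoinThreshold (10 MB by default), whereas an explicit BROADCAST hint requests the broadcast regardless of that threshold, which is why such a check matters when hints are used.</p>
      <preformat><![CDATA[
```python
MiB = 1024 ** 2
GiB = 1024 ** 3


def can_broadcast(table_bytes, driver_memory, executor_memory, headroom=0.5):
    """Rough feasibility check for a broadcast join.

    The broadcast table is collected on the driver and a copy is kept on every
    executor, so it has to fit into the smaller of the two memory budgets.
    `headroom` (hypothetical) is the fraction of memory the table may take.
    """
    return table_bytes <= headroom * min(driver_memory, executor_memory)


print(can_broadcast(300 * MiB, 4 * GiB, 8 * GiB))  # True
print(can_broadcast(3 * GiB, 4 * GiB, 8 * GiB))    # False
```
]]></preformat>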
    </sec>
  </body>
  <back>
    <ref-list>
      <ref id="ref1">
        <mixed-citation>
          [1]
          <string-name>
            <given-names>A.</given-names>
            <surname>Yudovin</surname>
          </string-name>.
          <article-title>Essential Optimization Methods to Make Apache Spark Work Faster</article-title> // <source>Altoros</source>. - <year>2019</year>. - URL: <ext-link ext-link-type="uri" xlink:href="https://www.altoros.com/research-papers/essential-optimization-methods-to-make-apache-spark-work-faster/">https://www.altoros.com/research-papers/essential-optimization-methods-to-make-apache-spark-work-faster/</ext-link>.
        </mixed-citation>
      </ref>
      <ref id="ref2">
        <mixed-citation>
          [2]
          <string-name>
            <given-names>X.</given-names>
            <surname>Ji</surname>
          </string-name>
          et al. <article-title>Query Execution Optimization in Spark SQL</article-title> // <source>Scientific Programming</source>. - <year>2020</year>. - Vol. <volume>2020</volume>.
        </mixed-citation>
      </ref>
      <ref id="ref3">
        <mixed-citation>
          [3]
          <string-name>
            <given-names>Pawan</given-names>
            <surname>Singh Negi</surname>
          </string-name>.
          <article-title>Bucketing in Spark: Spark job optimization using Bucketing</article-title> // <source>Clairvoyant Blog</source>. - <year>2021</year>. - URL: <ext-link ext-link-type="uri" xlink:href="https://blog.clairvoyantsoft.com/bucketing-in-spark-878d2e02140f">https://blog.clairvoyantsoft.com/bucketing-in-spark-878d2e02140f</ext-link>.
        </mixed-citation>
      </ref>
    </ref-list>
  </back>
</article>