Adaptive Prejoin Approach for Performance Optimization in MapReduce-based Warehouses

Weiping Qu, Michael Rappold∗, Stefan Dessloch
Heterogeneous Information Systems Group, Department of Computer Science, University of Kaiserslautern
qu@informatik.uni-kl.de, m_rappol@cs.uni-kl.de, dessloch@informatik.uni-kl.de

∗ Michael Rappold finished this work during his master study at the University of Kaiserslautern.

25th GI-Workshop on Foundations of Databases (Grundlagen von Datenbanken), 28.05.2013 - 31.05.2013, Ilmenau, Germany. Copyright is held by the author/owner(s).

ABSTRACT

MapReduce-based warehousing solutions (e.g. Hive) for big data analytics have emerged recently, offering the capability of storing and analyzing high volumes of both structured and unstructured data in a scalable file system. Their efficient data loading features enable a so-called near real-time warehousing solution, in contrast to those offered by conventional data warehouses with complex, long-running ETL processes.

However, there are still many opportunities for performance improvements in MapReduce systems. The performance of analyzing structured data in them cannot match that of traditional data warehouses. For example, join operations are generally regarded as a bottleneck when performing generic complex analytics over structured data with MapReduce jobs.

In this paper, we present an approach for improving performance in MapReduce-based warehouses by redundantly pre-joining frequently used dimension columns with the fact table during data transfer, and by adapting queries to this join-friendly schema automatically at runtime using a rewrite component. The approach is driven by statistics on join operations derived from previously executed workloads.

The results show that execution performance is improved by eliminating join operations in those future workloads whose joins exactly fit the pre-joined fact table schema, while performance remains the same for other workloads.

1. INTRODUCTION

By packaging complex custom imperative programs (text mining, machine learning, etc.) into simple map and reduce functions and executing them in parallel on files in a large scalable file system, MapReduce/Hadoop (1) systems enable analytics on large amounts of unstructured or structured data in acceptable response time.

(1) An open-source implementation of the MapReduce framework from the Apache community, see http://hadoop.apache.org

With the continuous growth of data, scalable data stores based on Hadoop/HDFS (2) have attracted more and more attention for big data analytics. In addition, by simply pulling data into the file system of a MapReduce-based system, unstructured data without schema information can be analyzed directly with parallelizable custom programs, whereas in traditional data warehouses data can only be queried after it has been loaded by ETL tools (cleansing, normalization, etc.), which normally takes a long period of time.

(2) The Hadoop Distributed File System, which stores the data analyzed in Hadoop.

Consequently, many web or business companies add MapReduce systems to their analytical architecture. For example, Özcan et al. [12] integrate their DB2 warehouse with the Hadoop-based analysis tool IBM InfoSphere BigInsights using connectors between the two platforms. An analytical synthesis is provided, where unstructured data is initially placed in the Hadoop-based system and analyzed by MapReduce programs. Once its schema can be defined, it is further loaded into the DB2 warehouse, which has more efficient analysis execution capabilities.

Another example is the data warehousing infrastructure at Facebook, which involves a web-based tier, a federated MySQL tier and a Hadoop-based analytical cluster running Hive. Such an orchestration of various analytical platforms forms a heterogeneous environment in which each platform has a different interface, data model, computational capability, storage system, etc.

Pursuing a global optimization in such a heterogeneous environment is always challenging, since it is generally hard to estimate the computational capability or operational cost precisely on each autonomous platform. The internal query engine and storage system tend not to be exposed to the outside and are not designed for data integration.

In our case, relational databases and Hadoop are integrated to deliver an analytical cluster. Simply transferring data from relational databases to Hadoop without considering the computational capabilities of Hadoop can lead to lower performance. As an example, performing complex analytical workloads over multiple small/large tables (loaded from relational databases) in Hadoop leads to a number of join operations, which slows down the whole processing. The reason is that join performance is normally weak in MapReduce systems as compared to relational databases [15]. Performance limitations have been shown to stem from several causes, such as the inherently unary nature of the map and reduce functions.
To achieve better global performance in such an analytical synthesis of multiple platforms, several strategies can be applied from a global perspective.

One would be simply improving the join implementation on a single MapReduce platform; several existing works try to improve join performance in MapReduce systems [3, 1]. Another would be using heuristics for global performance optimization. In this paper, we take a look at the second one. In order to validate our general idea of improving global performance across multiple platforms, we deliver an adaptive approach targeting join performance. We take the data flow architecture at Facebook as a starting point, and our contributions are summarized as follows:

1. Adaptively pre-joining tables during data transfer for better performance in Hadoop/Hive.

2. Rewriting incoming queries according to the changing table schema.

The remainder of this paper is structured as follows: Section 2 describes the background of this paper. Section 3 presents a naïve approach of fully pre-joining related tables. Based on the performance observations of this naïve approach, further considerations are taken into account and an adaptive pre-join approach is proposed in Section 4, followed by the implementation and experimental evaluation in Section 5. Section 6 discusses related work. Section 7 concludes with a summary and future work.

2. BACKGROUND

In this section, we introduce our starting point, i.e. the analytical data flow architecture at Facebook and its MapReduce-based analytical platform Hive. In addition, the performance issue concerning joins is stated subsequently.

2.1 Facebook Data Flow Architecture

Instead of using a traditional data warehouse, Facebook uses Hive - a MapReduce-based analytical platform - to perform analytics on information describing advertisements. The MapReduce/Hadoop system offers high scalability, which enables Facebook to perform data analytics over 15PB of data and load 60TB of new data every day [17]. The architecture of the data flow at Facebook is described as follows.

As depicted in Figure 1, data is extracted from two types of data sources: a federated MySQL tier and a web-based tier. The former provides the category, the name and corresponding information of the advertisements as dimension data, while actions such as viewing an advertisement, clicking on it, or fanning a Facebook page are extracted from the latter as fact data.

There are two types of analytical clusters: the production Hive cluster and the ad hoc Hive cluster. Periodic queries are performed on the production Hive cluster, while ad hoc queries are executed on the ad hoc Hive cluster.

[Figure 1: Facebook Data Flow Architecture [17] - web servers feed Scribe-Hadoop clusters, the federated MySQL tier is loaded via daily scrape processes, and both feed the production and ad hoc Hive-Hadoop clusters.]

2.2 Hive

Hive [16] is an open-source data warehousing solution built on top of MapReduce/Hadoop. Analytics is essentially done by MapReduce jobs, and data is still stored and managed in Hadoop/HDFS.

Hive supports a higher-level SQL-like language called HiveQL, which lets users familiar with SQL access files in Hadoop/HDFS and highly increases the productivity of using MapReduce systems. When a HiveQL query comes in, it is automatically translated into corresponding MapReduce jobs with the same analytical semantics. For this purpose, Hive has its own meta-data store which maps the HDFS files to the relational data model. Files are logically interpreted as relational tables during HiveQL query execution.

Furthermore, in contrast to the high data loading cost (using ETL jobs) of traditional data warehouses, Hive benefits from an efficient loading process which pulls raw files directly into Hadoop/HDFS and then publishes them as tables. This feature makes Hive much more suitable for dealing with large volumes of data (i.e. big data).

2.3 Join in Hadoop/Hive

There has been an ongoing debate comparing parallel database systems and MapReduce/Hadoop. In [13], experiments showed that the performance of selection, aggregation and join tasks in Hadoop could not reach that of parallel databases (Vertica & DBMS-X). Several reasons for the performance difference have been explained by Stonebraker et al. in [15], such as repetitive record parsing and high I/O cost due to the absence of compression and indexing.

Moreover, as MapReduce was not originally designed to combine information from two or more data sources, join implementations are always cumbersome [3]. Join performance relies heavily on the implementation of the MapReduce jobs, which is considered to be not straightforward.

As Hive is built on top of MapReduce/Hadoop, the join operation is essentially executed by corresponding MapReduce jobs. Thus, Hive suffers from these issues, even though there have been efforts [5] to improve join performance in MapReduce systems and in Hive.
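The improvement efforts cited above typically avoid the shuffle-heavy repartition join when one input is small, e.g. by broadcasting the dimension table to every mapper (a map-side join). The following Python sketch simulates that idea; table contents and names are invented for illustration and are not from the paper.

```python
# Small dimension table, replicated (broadcast) to every mapper:
dim = {"ad1": "sports", "ad2": "news"}   # ad_id -> category

# Large fact table, scanned in parallel by the mappers:
fact = [("ad1", 100), ("ad2", 200), ("ad1", 300)]   # (ad_id, clicks)

def mapper(partition, broadcast_dim):
    # Each mapper probes its in-memory copy of the dimension table,
    # so the join completes in the map phase with no shuffle of the
    # dimension input and no reduce phase at all.
    for ad_id, clicks in partition:
        category = broadcast_dim.get(ad_id)
        if category is not None:         # inner-join semantics
            yield ad_id, clicks, category

result = sorted(mapper(fact, dim))
print(result)
```

This only pays off when the broadcast table fits in memory on every node, which is why such optimizations do not remove the join bottleneck in the general case.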
The sc processes dump the desired data sets from mysql datab compressing them on the source stems sy and finally movin into the Hive-Hadoop cluster. The scrapes need to b 3. FULL PRE-JOIN APPROACH failures and also need to be designed such that the Due to the fact that the join performance is a perfor- much load on the mysql databases. The latter is acc Scribe-HadoopClusters running the scrapes on a replicated tier of mysql database mance bottleneck in Hive with its inherent MapReduce Web Servers fea- a b c d avoiding extra load on the already loaded masters. ture, one naı̈ve thinking for improving total workload perfor- any notions ofAdaptive strongPre-joined consistencySchema the scraped in data is sa mance would be to simply eliminate the join task from the order to avoid locking overheads. The scrapes are retried fact table: λ database servera basis b c ind ther′ of x′ failures and if the case workload by performing a rewritten workload with the same “(λ,cannot be read even after repeated tries, the previ α.r, β.x)“ analytical semantics over pre-joined tables created in the data from that particular server used. is With thousands of fact table: λ′ data load phase. A performance gain would be expected by servers, there are always some servers that may not r s t performing large table scan with high parallelism of increas-Hive replication the scrapes and by a combination using of retries and scr data a daily dump of the dimension data is created ing working nodes in Hadoop instead of join. In addition, Production Hive-Hadoop p Hadoop clusters. dim These ps dum table: αare then converted to top the scalable storage system allows us to create AdhocHive-Hadoop redundant Cluster tables. Cluster x y z pre-joined tables for some workloads with specific join pat- r s t As shown in Figure 1, theretwoaredifferent Hive-Hadoop terns. where the data becomes available for consumption by dim table: α dim table: β stream processes. One of these clusters ! 
the produ In an experiment, we tried to validate this strategy. An Hadoop cluster - is used to execute obs thatj need to adher analytical workload (TPC-H Query 3) was executed over x y z strict delivery deadlines, where as the other clust two data sets of TPC-H benchmark (with scale factor 5 & Hive-Hadoop cluster is used toteexecu lower priority bat Federated MySQL dim table: βwell as any ad hoc analysis that the users want to 10) of the original table schema (with join at runtime) and a data sets. The ad hoc nature userofqueries makes it dan fully pre-joined table schema (without join) which fully Figure joins 1: Data Flow Architecture run production jobs in the same cluster. A badly wr all the related dimension tables with the fact table during Figure 3: Adaptive Pre-joined Schema in Facebook the load phase, respectively. In this case, we trade storage Example overhead for better total performance. 1014 As shown on the left side of the Figure 2(a), the perfor- mance gain of the total workload (including the join) over the periodic queries on production Hive-Hadoop cluster, a the data set with SF 5 can be seen with 6GB storage over- frequent column set could be extracted. head introduced by fully pre-joining the related tables into One example is illustrated in Figure 3. The frequent set one redundant table (shown in Figure 2(b)). The overall of additional columns has been extracted. The column r in dimension table α is frequently joined with fact table in 350 25 company in the previous workloads as a filter or aggregate data volume for executing workloads (GB) 300 20 column, as the same for the column x in dimension table average runtime (sec) 250 200 15 β. During next load phase, the fact table is expanded by no pre-join no pre-join 150 full pre-join 10 full pre-join redundantly pre-joining these two additional columns r and 100 5 x with it. 
50 Depending on the statistics information of previous queries, 0 0 5GB 10GB 5GB 10GB different frequent sets of additional columns could be found data set size data set size in diverse time intervals. Thus, the fact table is pre-joined (a) Average Runtimes (b) Accessed Data Volume in an adaptive manner. Assume that the additional columns identified in previ- Figure 2: Running TPC-H Query-3 on Original and ous queries will also frequently occur in the future ones (as Full Pre-joined Table Schema in the Facebook example), the benefits of adaptive pre-join approach are two-fold: performance can be significantly increased if workloads with First, when all the columns (including dimension columns) the same join pattern later frequently occur, especially for in a certain incoming query which requires a join opera- periodic queries over production Hive-Hadoop cluster in the tion have been contained in the pre-joined fact table, this Facebook example. query could be directly performed on the pre-joined fact ta- However, the result of performing the same query on the ble without join. data set with SF 10 size is disappointing as there is no per- Second, the adaptive pre-join approach leads to a smaller formance gain while paying 12.5GB storage for redundancy table size in contrast to the full pre-join approach, as only (shown in Figure 2(b)), which is not what we expected. The subsets of the dimension tables are pre-joined. Thus, the reason could be that the overhead of scanning such redun- resulting storage overhead is reduced, which plays a signif- dant fully pre-joined tables and the high I/O cost as well off- icant role especially in big data scenarios (i.e. terabytes, set the performance gain as the accessed data volume grows. petabytes of data). To automatically accomplish the adaptive pre-join ap- 4. 
ADAPTIVE PRE-JOIN APPROACH proach, three sub-steps are developed: frequent column set Taking the lessons learned from the full pre-join approach extraction, pre-join and query rewrite. above, we propose an adaptive pre-join approach in this pa- per. 4.1 Frequent Column Set Extraction Instead of pre-joining full dimension tables with the fact In the first phase, the statistics collected for extracting table, we try to identify the dimension columns which oc- frequent set of additional columns is formated as a list of curred frequently in the select, where, etc. clauses of previ- entries each which has the following form: ous executed queries for filtering, aggregation and so on. We Set : {Fact, Dim X.Col i, Dim X.Col j ... Dim Y.Col k} refer to these columns as additional columns as compared to the join columns in the join predicates. By collecting a list of The join set always starts with the involved fact table additional column sets from previous queries, for example, while the joint dimension columns are identified and cap- tured from the select, where, etc. clauses or from the sub- to answer queries using views in data warehouses. Further- queries. more, several subsequent works [14, 10] have focuses on dy- The frequent set of additional columns could be extracted namic view management based on runtime statistics (e.g. using a set of frequent itemset mining approaches [2, 7, 11] reference frequency, result data size, execution cost) and measured profits for better query performance. In our work, 4.2 Query Rewrite we reviewed these sophisticated techniques in a MapReduce- As the table schema is changed in our case (i.e. newly gen- based environment. erated fact table schema), initial queries need to be rewritten Cheetah [4] is a high performance, custom data warehouse for successful execution. Since the fact table is pre-joined on top of MapReduce. 
It is very similar to the MapReduce- with a set of dedicated redundant dimension columns, the based warehouse Hive introduced in this paper. The perfor- tables which are involved in the from clause of the original mance issue of join implementation has also been addressed query can be replaced with this new fact table once all the in Cheetah. To reduce the network overhead for joining columns have been covered in it. big dimension table with fact table at query runtime, big By storing the mapping from newly generated fact table dimension tables are denormalized and all the dimension at- schema to the old schema in the catalog, the query rewrite tributes are directly stored into the fact table. In contrast, process can be easily applied. Note that the common issue we choose to only denormalize the frequently used dimen- of handling complex sub-queries for Hive can thereby be sion attributes with the fact table since we believe that less facilitated if the columns in the sub-query have been pre- I/O cost can be achieved in this way. joined with the fact table. 7. CONCLUSION AND FUTURE WORK 5. IMPLEMENTATION AND EVALUATION We propose a schema adaption approach for global opti- We use Sqoop3 as the basis to implement our approach. mization in an analytical synthesis of relational databases The TPC-H benchmark data set with SF 10 is adaptively and a MapReduce-based warehouse - Hive. As MapRe- pre-joined according to the workload statistics and trans- duce systems have weak join performance, frequently used ferred from MySQL to Hive. First, the extracted join pat- columns of dimension tables are pre-joined with the fact tern information is sent to Sqoop as additional transforma- table according to useful workload statistics in an adap- tion logic embedded in the data transfer jobs for generating tive manner before being transfered to Hive. 
Besides, a the adaptive pre-joined table schema on the original data rewrite component enables the execution of incoming work- sources. Furthermore, the generated schema is stored in loads with join operations over such pre-joined tables trans- Hive to enable automatic query rewrite at runtime. parently. In this way, better performance can be achieved in We tested the adaptive pre-join approach on a six-node Hive. Note that this approach is not restricted to any spe- cluster (Xeon Quadcore CPU at 2.53GHz, 4GB RAM, 1TB cific platform like Hive. Any MapReduce-based warehouse SATA-II disk, Gigabit Ethernet) running Hadoop and Hive. can benefit from it, as generic complex join operations occur After running the same TPC-H Query 3 over the adaptive in almost every analytical platform. pre-joined table schema, the result in the Figure 4(a) shows However, the experimental results also show that the per- that the average runtime is significantly reduced. The join formance improvement is not stable while the data volume grows continuously. For example, when the query is exe- 350 25 cuted on one larger pre-joined table, the performance gain data volume for executing workloads (GB) 300 20 from eliminating joins is offset by the impact caused by the 250 record parsing overhead and high I/O cost during the scan, average runtime (sec) 15 200 no pre-join no pre-join which results in worse performance. This concludes that full pre-join full pre-join 150 adaptive pre-join 10 adaptive pre-join the total performance of complex data analytics is effected 100 5 by multiple metrics rather than a unique consideration, e.g. 50 0 join. 0 10GB data set size 10GB data set size With the continuous growth of data, diverse frameworks and platforms (e.g. Hive, Pig) are built for large-scale data (a) Average Runtimes (b) Accessed Data Volume analytics and business intelligent applications. 
Data trans- fer between different platforms generally takes place in the Figure 4: Running TPC-H Query-3 on Original, Full absence of key information such as operational cost model, Pre-joined and Adaptive Pre-joined Table Schema resource consumption, computational capability etc. within platforms which are autonomous and inherently not designed task has been eliminated for this query and the additional for data integration. Therefore, we are looking at a generic overheads (record parsing, I/O cost) have been relieved due description of the operational semantics with their compu- to the smaller size of redundancy as shown in Figure 4(b). tational capabilities on different platforms and a cost model for performance optimization from a global perspective of 6. RELATED WORK view. The granularity we are observing is a single operator in the execution engines. Thus, a global operator model with An adaptively pre-joined fact table is essentially a mate- generic cost model is expected for performance improvement rialized view in Hive. Creating materialized views in data in several use cases, e.g. federated systems. warehouses is nothing new but a technique used for query Moreover, as an adaptively pre-joined fact table is re- optimization. Since 1990s, a substantial effort [6, 8] has been garded as a materialized view in a MapReduce-based ware- 3 house, another open problem left is how to handle the view an open source tool for data transfer between Hadoop and relational database, see http://sqoop.apache.org/ maintanence issue. The work from [9] introduced an incre- mental loading approach to achieve near real-time dataware- International Conference on Management of data, housing by using change data capture and change propaga- SIGMOD ’09, pages 165–178, New York, NY, USA, tion techniques. Ideas from this work could be taken further 2009. ACM. to improve the performance of total workload including the [14] P. Scheuermann, J. Shim, and R. Vingralek. pre-join task. 
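As a recap of the pipeline summarized above, the following Python sketch walks through the two automated sub-steps, frequent column set extraction and query rewrite. It is a simplified illustration, not the authors' implementation: all table and column names are hypothetical, and plain support counting stands in for the frequent itemset mining algorithms cited in Section 4.1.

```python
from collections import Counter
from itertools import combinations

# Column sets observed in previous queries, in the form of Section 4.1:
# the involved fact table followed by additional dimension columns.
past_query_columns = [
    frozenset({"fact", "alpha.r", "beta.x"}),
    frozenset({"fact", "alpha.r", "beta.x"}),
    frozenset({"fact", "alpha.s"}),
]

def frequent_column_set(history, min_support=2):
    # Simplified stand-in for frequent itemset mining: count every
    # subset of dimension columns that co-occurs with the fact table
    # and keep the largest subset meeting the support threshold.
    counts = Counter()
    for cols in history:
        dims = sorted(cols - {"fact"})
        for k in range(1, len(dims) + 1):
            for combo in combinations(dims, k):
                counts[frozenset(combo)] += 1
    frequent = [s for s, c in counts.items() if c >= min_support]
    return max(frequent, key=len) if frequent else frozenset()

def rewrite(query_tables, query_columns, prejoined_columns):
    # Section 4.2: if every dimension column referenced by the query is
    # already contained in the pre-joined fact table, collapse the FROM
    # clause to that single table - the join disappears. Otherwise the
    # query runs unchanged on the original tables.
    if query_columns <= prejoined_columns:
        return ["fact_prejoined"]
    return query_tables

prejoined = frequent_column_set(past_query_columns)
print(rewrite(["fact", "alpha", "beta"], {"alpha.r", "beta.x"}, prejoined))
```

Note how the fallback branch mirrors the observation in the abstract: queries whose columns are not covered keep their original plan, so their performance is unaffected.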
8. REFERENCES

[1] F. N. Afrati and J. D. Ullman. Optimizing joins in a map-reduce environment. In Proceedings of the 13th International Conference on Extending Database Technology, EDBT '10, pages 99–110, New York, NY, USA, 2010. ACM.
[2] R. Agrawal and R. Srikant. Fast algorithms for mining association rules in large databases. In Proceedings of the 20th International Conference on Very Large Data Bases, VLDB '94, pages 487–499, San Francisco, CA, USA, 1994. Morgan Kaufmann Publishers Inc.
[3] S. Blanas, J. M. Patel, V. Ercegovac, J. Rao, E. J. Shekita, and Y. Tian. A comparison of join algorithms for log processing in MapReduce. In Proceedings of the 2010 ACM SIGMOD International Conference on Management of Data, SIGMOD '10, pages 975–986, New York, NY, USA, 2010. ACM.
[4] S. Chen. Cheetah: a high performance, custom data warehouse on top of MapReduce. Proc. VLDB Endow., 3(1-2):1459–1468, Sept. 2010.
[5] A. Gruenheid, E. Omiecinski, and L. Mark. Query optimization using column statistics in Hive. In Proceedings of the 15th International Database Engineering & Applications Symposium, IDEAS '11, pages 97–105, New York, NY, USA, 2011. ACM.
[6] A. Y. Halevy. Answering queries using views: A survey. The VLDB Journal, 10(4):270–294, Dec. 2001.
[7] J. Han, J. Pei, and Y. Yin. Mining frequent patterns without candidate generation. SIGMOD Rec., 29(2):1–12, May 2000.
[8] V. Harinarayan, A. Rajaraman, and J. D. Ullman. Implementing data cubes efficiently. In Proceedings of the 1996 ACM SIGMOD International Conference on Management of Data, SIGMOD '96, pages 205–216, New York, NY, USA, 1996. ACM.
[9] T. Jörg and S. Deßloch. Towards generating ETL processes for incremental loading. In Proceedings of the 2008 International Symposium on Database Engineering & Applications, IDEAS '08, pages 101–110, New York, NY, USA, 2008. ACM.
[10] Y. Kotidis and N. Roussopoulos. DynaMat: a dynamic view management system for data warehouses. SIGMOD Rec., 28(2):371–382, June 1999.
[11] H. Mannila, H. Toivonen, and I. Verkamo. Efficient algorithms for discovering association rules. Pages 181–192. AAAI Press, 1994.
[12] F. Özcan, D. Hoa, K. S. Beyer, A. Balmin, C. J. Liu, and Y. Li. Emerging trends in the enterprise data analytics: connecting Hadoop and DB2 warehouse. In Proceedings of the 2011 ACM SIGMOD International Conference on Management of Data, SIGMOD '11, pages 1161–1164, New York, NY, USA, 2011. ACM.
[13] A. Pavlo, E. Paulson, A. Rasin, D. J. Abadi, D. J. DeWitt, S. Madden, and M. Stonebraker. A comparison of approaches to large-scale data analysis. In Proceedings of the 2009 ACM SIGMOD International Conference on Management of Data, SIGMOD '09, pages 165–178, New York, NY, USA, 2009. ACM.
[14] P. Scheuermann, J. Shim, and R. Vingralek. Watchman: A data warehouse intelligent cache manager. In Proceedings of the 22nd International Conference on Very Large Data Bases, VLDB '96, pages 51–62, San Francisco, CA, USA, 1996. Morgan Kaufmann Publishers Inc.
[15] M. Stonebraker, D. Abadi, D. J. DeWitt, S. Madden, E. Paulson, A. Pavlo, and A. Rasin. MapReduce and parallel DBMSs: friends or foes? Commun. ACM, 53(1):64–71, Jan. 2010.
[16] A. Thusoo, J. S. Sarma, N. Jain, Z. Shao, P. Chakka, N. Zhang, S. Antony, H. Liu, and R. Murthy. Hive - a petabyte scale data warehouse using Hadoop. In Proceedings of the 26th International Conference on Data Engineering, ICDE '10, pages 996–1005. IEEE, Mar. 2010.
[17] A. Thusoo, Z. Shao, S. Anthony, D. Borthakur, N. Jain, J. Sen Sarma, R. Murthy, and H. Liu. Data warehousing and analytics infrastructure at Facebook. In Proceedings of the 2010 ACM SIGMOD International Conference on Management of Data, SIGMOD '10, pages 1013–1020, New York, NY, USA, 2010. ACM.