In-memory Caching for Multi-query Optimization of Data-intensive Scalable Computing Workloads

Pietro Michiardi (EURECOM, Biot, France) pietro.michiardi@eurecom.fr
Damiano Carra (University of Verona, Verona, Italy) damiano.carra@univr.it
Sara Migliorini (University of Verona, Verona, Italy) sara.migliorini@univr.it

© 2019 Copyright held by the author(s). Published in the Workshop Proceedings of the EDBT/ICDT 2019 Joint Conference (March 26, 2019, Lisbon, Portugal) on CEUR-WS.org.

ABSTRACT

In modern large-scale distributed systems, analytics jobs submitted by various users often share similar work. Instead of optimizing jobs independently, multi-query optimization techniques can be employed to save a considerable amount of cluster resources. In this work, we introduce a novel method combining in-memory cache primitives and multi-query optimization to improve the efficiency of data-intensive, scalable computing frameworks. By carefully selecting and exploiting common (sub)expressions, while satisfying memory constraints, our method transforms a batch of queries into a new, more efficient one that avoids unnecessary recomputations. To find feasible and efficient execution plans, our method uses a cost-based optimization formulation akin to the multiple-choice knapsack problem. Experiments on a prototype implementation of our system show significant benefits of worksharing for TPC-DS workloads.

1 INTRODUCTION

Modern technologies to analyze large amounts of data have flourished in the past decade, with general-purpose cluster processing frameworks such as MapReduce [1], Dryad [2] and Spark [3]. More recently, much effort has been put into raising the level of abstraction, allowing users to interact with such systems through a relational API. SQL-like querying capabilities are not only appealing to users for their simplicity: they also bring additional benefits through a wide range of automatic query optimizations aimed at efficiency and performance.

Large-scale analytics systems are deployed in shared environments, whereby multiple users submit queries concurrently. In this context, concurrent queries often perform similar work, such as scanning and processing the same set of input data. The study in [4], conducted on 25 production clusters, estimated that over 35,000 hours of redundant computation could be eliminated per day by simply reusing intermediate query results (approximately equivalent to shutting off 1,500 machines daily). It is thus highly desirable to study query optimization techniques that go beyond optimizing the performance of a single query and instead consider multiple queries, for efficient resource utilization.

Multi-query optimization (MQO) amounts to finding similarities among a set of queries and using a variety of techniques to avoid redundant work during query execution. For traditional database systems, MQO trades small optimization overheads for increased query performance, using techniques such as sharing sub-expressions [5–7], materialized view selection [8, 9], and pipelining [10]. Work sharing optimizations operating at query runtime, for staged databases, have also been extensively studied [11–14]. The idea of reusing intermediate data across queries running in distributed systems has received significant attention: for MapReduce [15, 16], for SCOPE operating on top of Cosmos [17], and for Massively Parallel Processing frameworks [18].

In this paper, we study MQO in the context of distributed computing engines such as Apache Spark [3], with analytics jobs written in SparkSQL [19], in which relational operators are mapped to stages of computation and I/O. Following the tradition of RDBMSes, queries are first represented as (optimized) logical plans, which are transformed into (optimized) physical plans, and finally run as execution plans. Additionally, modern parallel processing systems, such as Spark, include an operator to materialize in RAM the content of a (distributed) relation, which we use extensively. Our approach to MQO is that of traditional database systems, as it operates on a batch of queries. However, unlike traditional approaches, it blends pipelining and global query planning with shared operators, using in-memory caching to support worksharing. Our problem formulation amounts to a cache admission problem, which we cast as a cost-based, constrained combinatorial optimization task, setting it apart from previous works in the literature.

We present the design of an MQO component that, given a set of concurrent queries, proceeds as follows. First, it analyzes query plans to find sharing opportunities, using logical plan fingerprinting and an efficient lookup procedure. Then it builds multiple sharing plans, using shared relational operators and scans, which subsume common work across the given query set. Sharing plans materialize their output relation in RAM. A cost-based optimization selects the best sharing plans with dynamic programming, using cardinality estimation and a knapsack formulation of the problem that considers a memory budget given to the MQO problem. The final step is a global query plan rewrite, including sharing plans which pipeline their output to modified consumer queries.

We validate our system with a prototype built for SparkSQL, using the standard TPC-DS benchmark for the experiments. Overall, our method achieves up to 80% reduction in query runtime, when compared to a setup with no worksharing. Our main contributions are as follows:
• We propose a general approach to MQO for distributed computing frameworks. Our approach produces sharing plans, materialized in RAM, that aim at eliminating redundant work.
• We cast the optimization problem of selecting the best sharing plans as a Multiple-choice Knapsack problem, and solve it efficiently through dynamic programming.
• Our ideas materialize into a system prototype based on SparkSQL, which we evaluated using standard benchmarks, obtaining tangible improvements in aggregate query execution times.
2 RELATED WORK

MQO in RDBMSes. Multi-query optimization has been studied extensively [6, 7, 10, 20, 21]. More recently, the sharing of similar subexpressions has been revisited by Zhou et al. [5], who show that reusable common subexpressions can improve query performance; their approach avoids some limitations of earlier work [7, 20]. Work sharing at the execution engine has also been studied [11–14]: there, the MQO problem is considered at query runtime and requires a staged database system. Techniques such as pipelining [12] and multiple query plans [22, 23] have proven extremely beneficial for OLAP workloads. Our work is rooted in this literature; however, the peculiarities of the distributed execution engine (which we also take into account in our cost model) and the availability of an efficient mechanism for distributed caching set our problem statement apart from the typical optimization objectives and constraints of prior work.

Materialized views can be used in conjunction with MQO to reduce query response times [8, 9, 24]. A broad range of works addressed the problem of materialized view selection and maintenance, including both deterministic [7, 25, 26] and randomized [27–29] strategies. In this paper, we focus on analytics queries in which data can be assumed to be static. Workloads consist mostly of ad-hoc, long-running, scan-heavy queries over data periodically loaded into a distributed file system. Problems related to view maintenance do not manifest in our setup. Moreover, our approach considers storing intermediate relations in RAM.
MQO in Cloud and Massively Parallel Processing (MPP). Building upon MQO techniques in RDBMSes, Silva et al. [17] proposed an extension to the SCOPE query optimizer which optimizes cloud scripts containing common expressions while reconciling physical requirements. In MPP databases, the work in [18] presents a comprehensive framework for the optimization of Common Table Expressions (CTEs), implemented for Orca. Compared to such methods, we consider not only CTEs but also similar subexpressions, to augment sharing opportunities.

MQO in MapReduce. The idea of making MapReduce jobs share intermediate results was studied in [15–17, 30–34]. The common denominator of these works is that they operate at a lower level of abstraction than we do. More recently, [35] formulates the MQO problem as a cost-based, binary optimization task, which is addressed with an external optimizer; this work, however, does not exploit caching as a mechanism to reuse work. The work in [36] presents a method to estimate the benefit of materializing intermediate results of past queries, and is orthogonal to ours; additionally, its views are materialized on disk instead of in memory. The work in [37] addresses work sharing by casting it as exact query subgraph matching. As such, it tackles the MQO problem from a different angle, and it does not exploit caching. The work in [38] casts the MQO task as an integer linear programming problem; however, the induced cost model is simplistic and does not exploit in-memory caching.

Caching to recycle work. Previous works [39–43] that address the problem of reusing intermediate query results cast it as a general caching problem. Our work substantially differs from those approaches in that they mainly focus on cache eviction, where past queries are used to decide what to keep in memory, in an on-line fashion. Instead, in this work we focus on the off-line constrained optimization problem of cache admission: the goal is to decide the best content to store in the cache, rather than selecting what to evict if space is needed. The only work that considers the reuse of intermediate results when analyzing the overall execution plan of multiple queries is [40]; nevertheless, it focuses on small problem instances which do not require the general, cost-based approach we present in this work.

3 PROBLEM STATEMENT

We introduce a simple running example that is rich enough to illustrate the gist of the MQO problem. Consider the following three concurrent queries:

QUERY 1:
SELECT name, dept_name, salary
FROM employees, departments, salaries
WHERE dep = dept_id
  AND id = emp_id
  AND gender = 'F'
  AND location = 'us'
  AND salary > 20000
ORDER BY salary DESC

QUERY 2:
SELECT name, dept_name, title, to AS title_expired_on
FROM departments, employees, titles
WHERE dep = dept_id
  AND id = emp_id
  AND gender = 'F'
  AND location = 'us'
  AND from >= 2010

QUERY 3:
SELECT id, name, salary, from_date
FROM employees, salaries
WHERE id = emp_id
  AND age > 30
  AND salary > 30000

Figure 1 illustrates the optimized operator trees (logical plans) of the queries in the above example. The leaf nodes represent the base relations. Each intermediate node is a relational operator (Selection, Projection, Join, etc.). The arrows between nodes indicate data flow. Our MQO strategy uses such optimized logical plans to produce new plans – whose aim is to exploit sharing opportunities by caching distributed relations – which are then translated into physical plans for execution.

[Figure 1: Logical plans for the queries in our running example. Similar subexpressions (SE) are emphasized by dashed boxes surrounding the corresponding sub-tree of each query. Boxes with the same border color denote the same SE.]

First, we see that the three queries can share the scans of the employees, departments and salaries relations. Hence, a simple approach to work sharing would be to inject a cache operator in Query 1, which would steer the system to serve input relations from RAM instead of reading them from disk when executing Queries 2 and 3. A more refined approach is to find common work (not only common I/O), in the form of similar subexpressions (SE) among the queries of the example, such as filtering and projecting records, and to materialize intermediate results in RAM, so as to reuse such intermediate relations.
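To make the cache operator concrete, the following is a minimal PySpark sketch of the naive I/O-sharing approach (the file name, schema and explicit persist() call are illustrative; our method operates on optimized logical plans rather than on user code):

from pyspark.sql import SparkSession
from pyspark import StorageLevel

spark = SparkSession.builder.appName("naive-worksharing").getOrCreate()

# Shared input relation, pinned in RAM once: the first query triggers the
# scan and materializes it; subsequent queries are served from the cache.
employees = spark.read.csv("employees.csv", header=True, inferSchema=True)
employees.persist(StorageLevel.MEMORY_ONLY)  # the cache operator

q1_part = employees.filter("gender = 'F'").select("id", "name", "dep")
q3_part = employees.filter("age > 30").select("id", "name", "age")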
Figure 1 illustrates four examples of similar SEs, labelled ψ_i, i = 1, 2, 3, 4 (we explain the meaning of this label in the next section). For example, consider the subexpression labelled ψ_2: all three queries share the same sub-tree structure, of the form Project_p(Filter_f(employees)), but use different filtering predicates and projections. In principle, it is thus possible to save reading, parsing, filtering and projecting costs on the employees relation, by caching the intermediate output of a general form of the subexpression which subsumes the three similar sub-trees of each query. Such costs would be paid once, and the cached intermediate relation could serve three consumer queries. To this aim, we need to build a covering expression (CE) that combines the variants of the predicates appearing in the operators; e.g., considering ψ_2, the corresponding CE could be:

Project_{id, name, dep, age}(Filter_{gender='F' ∨ age>30}(employees))

Similarly, the SEs labelled ψ_3 and ψ_4 share the projection and filtering on the departments and salaries relations.
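Continuing the PySpark sketch above (an illustrative rendering, not the prototype's API), building this CE amounts to OR-ing the filters, unioning the projection lists, and caching the result:

ce_psi2 = (employees
           .filter("gender = 'F' OR age > 30")   # OR-ed predicates
           .select("id", "name", "dep", "age"))  # unioned projections
ce_psi2.persist(StorageLevel.MEMORY_ONLY)        # pay scan/filter once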
We anticipate that, in the context of our work, it is possible to rank SEs according to the benefits they bring in terms of reducing redundant work. For instance, the SE Project_p(Filter_f(employees)) leads to additional savings when compared to the SE Filter_f(employees), and caching the intermediate relation of the corresponding CE results in a smaller memory footprint, because of its selectivity. More to the point, consider now the SE labelled ψ_1 in Figure 1: Queries 1 and 2 share a common sub-tree in their respective logical plans that involves selections, projections and joins. In this case, selecting this SE as a candidate to build a CE between Queries 1 and 2 contributes to decreased scanning, computation and communication costs. However, since caching a relation in RAM bears its own costs and must satisfy capacity constraints, materializing the output of the CE in RAM might turn out not to be beneficial after all: a join operator could produce an intermediate relation too big to fit in RAM.

Overall, given an input query set, our problem amounts to exploring a potentially very large search space: identifying SEs, building the corresponding CEs – which we also call sharing plans – and deciding which CEs to include in the optimized output plan. Our MQO strategy reduces the search space to build CEs by appropriately pruning SEs according to their rank. Furthermore, a cost-based selection of candidate CEs must ensure that memory budget constraints are met.

4 CACHE-BASED WORK SHARING

We consider a set of concurrent queries submitted by multiple users, to be parsed, analyzed and individually optimized by a query optimizer. Our MQO method operates on the set of optimized logical plans corresponding to the set of input queries, which we call the input set. We approach the MQO problem with the following steps:

§4.1: Similar subexpression identification. The goal is to identify all common subexpressions in the input set. Two (or more) operators sharing similar subexpressions constitute an SE; SEs are candidates for building covering expressions (CEs).
§4.2: Building sharing plans. The goal is to construct one or more groups of covering expressions (CEs) for a set of SEs.
§4.3: Sharing plan selection. The goal is to select the best combination of CEs, using estimated costs and memory constraints. We model this step as a Multiple-Choice Knapsack problem, and use dynamic programming to solve it.
§4.4: Query rewriting. The last step to achieve MQO is to rewrite the input query set so as to use the selected sharing plans.

4.1 Similar Subexpression Identification

Finding similar subexpressions, given an input set of logical plans, has received considerable attention in the literature. What sets our approach apart from previous works lies in the very nature of the resource we use to achieve work sharing: memory is limited, and the overall MQO process we present is cast as a constrained optimization problem, which strives to use caching with parsimony. Thus, we use a general rule of thumb that prefers a large number of CEs (built from the corresponding SEs) with small memory footprints over a small number of CEs with large memory requirements. This is in line with low-level systems considerations: data materialization in RAM is not cost-free, and current parallel processing frameworks are sometimes fragile when it comes to memory management under pressure.
Armed with the above considerations, we first consider the input to our task: we search for SEs given a set of "locally optimized" query plans, which are represented in tree form. Such input plans have been optimized by applying common rules such as early filtering, predicate push-down, plan simplification and collapsing [19]. The natural hierarchy of optimized logical plans implies that the higher an operator is in the tree, the less data flows from its edges. Hence, similar subexpressions found higher in the plan are preferred, because they potentially exhibit smaller memory footprints.

Additional considerations are in order. Some operators produce output relations that are not easy to materialize in RAM: for example, binary operators such as joins generally produce large outputs that would deplete memory resources if cached. When searching for SEs, we recognize such "cache unfriendly" operators and preempt them from being considered as valid candidates, either by selecting SEs that appear lower in the logical plan hierarchy (which could imply, e.g., caching the input relations of a join), or by selecting SEs that subsume them (which could imply, e.g., caching a relation resulting from filtering a join output). Currently, we treat the join, Cartesian product and union as cache unfriendly operators. This means that our method does not produce SEs rooted at cache unfriendly operators; moreover, cache unfriendly operators can be shared inside a common SE only when they are syntactically equal.¹ Next, we provide the necessary definitions that are then used to describe the identification of SEs.

Definition 4.1 (Sub-tree). Given a logical plan of a query Q, represented as a tree τ_Q where leaf nodes are base relations and each intermediate node is a relational algebra operator, a sub-tree τ_s^Q of τ_Q is a contiguous portion of the logical plan of Q containing an intermediate node of τ_Q and all its descendants in τ_Q. In other words, a sub-tree includes all the base relations and operators that are necessary to build its root.

If the context is clear, we denote a sub-tree simply as τ, without indicating the query from which it has been derived.

¹ Our method can easily be extended to share similar join operators, for example by applying the "equivalence classes" approach used in [5]. Despite technical simplicity, our current optimization problem formulation would end up discarding such potential SEs, due to their large memory footprints. Hence, we currently preempt such SEs from being considered.
Given any 17: S←∅ two sub-trees, we need to determine if they have the same struc- 18: foreach ψ ∈ FT.Keys do ture in terms of base relations and operators. To this aim, we 19: if |FT.GetValue(ψ )| ≥ k then define a similarity function based on a modified Merkle Tree 20: S ← S ∪ FT.GetValue(ψ ) (also known as hash tree)[44], whereby each internal node identi- 21: end if fier is the combination of identifiers of its children. In particular, 22: end for given an operator u, its identifier, denoted by ID(u), is given by: 23: return S ( 24: end procedure (u.label) u ∈ {filter, project,input rel.} ID(u) = (u.label, u.attributes) otherwise. Notice that this definition makes a distinction between loose and Definition 4.3 (Similar subexpression). A similar subexpression strict identifier. A loose identifier, such that used for projections (SE) ω is a set of sub-trees that have the same fingerprint ψ , i.e. and selections, allows the construction of a shared operator that ω = {τi | F (τi ) = ψ }. subsumes the individual attributes with more general ones, which Algorithm 1 provides a pseudo-code of our procedure to find, allows sharing computation among SEs. Instead, a strict identifier, given a set of input queries, the SEs according to Definition 4.3 such that used for all other operators (including joins and unions), that will be the input of the next phase, the search for covering imposes strict equality for two sub-graphs to be considered SEs. expressions. The underlying idea is to avoid a brute-force search In principle, this restricts the applicability of a shared operator. of fingerprints, which would produce a large number of SEs. However, given the above considerations about cache unfriendly Instead, by proceeding in a top-down manner when exploring operators, our approach still shares both I/O and computation. logical plans, we produce fewer SEs candidates, by interrupting Definition 4.2 (Fingerprint). Given a sub-tree τ , its fingerprint the lookup procedure as early and as “high” as possible. is computed as The procedure uses a fingerprint table FT (line 2) to track SEs: this is a HashMap, where the key is a fingerprint ψ , and the   h(ID(τroot )) τroot = leaf value is a set of subtrees. Each logical plan from the input set of F (τ ) = h(ID(τroot )|F (τchild ))   τroot = unary queries is examined in a depth-first manner. We first consider the h(ID(τroot )|F (τl.child )|F (τr.child )) τroot = binary   whole query tree (line 4) and check if its root is a cache-friendly  operator: in this case, we add the tree to the SEs identified by its where h() is a robust cryptographic hash function, and the oper- fingerprint. The method AddValueSet(ψ , τ ) retrieves the value ation | indicates concatenation. (which is a set) from the HashMap FT given the key ψ (line 9), The fingerprint F (τ ) is computed recursively starting from and adds the subtree τ to such a set – if the key does not exists, the root of the sub-tree (τroot ), down to the leaves (that is, in- it adds it and create a value with a set containing the subtree τ . put relations). If the root is a unary operator, we compute the If the root is not a cache-friendly operator, or the logical plan fingerprint of its child sub-tree (τchild ), conversely in case of a contains a cache-unfriendly operator, then we need to explore binary operator, we consider the left and right sub-trees (τl.child the subtrees (line 13), i.e. we consider the root’s child (if the the and τr.child ). 
Algorithm 1 provides the pseudo-code of our procedure to find, given a set of input queries, the SEs according to Definition 4.3; these will be the input of the next phase, the search for covering expressions. The underlying idea is to avoid a brute-force search of fingerprints, which would produce a large number of SEs. Instead, by proceeding in a top-down manner when exploring logical plans, we produce fewer SE candidates, interrupting the lookup procedure as early and as "high" as possible.

Algorithm 1 Similar subexpression identification
Input: Array of logical plans (trees), threshold k
Output: Set S of SEs ω_i
 1: procedure IdentifySEs([τ_Q1, τ_Q2, ... τ_QN])
 2:   FT ← ∅
 3:   foreach τ ∈ [τ_Q1, τ_Q2, ... τ_QN] do
 4:     nodeToVisit ← Add(τ)
 5:     while nodeToVisit not empty do
 6:       τ_curr ← Pop(nodeToVisit)
 7:       ψ ← F(τ_curr)
 8:       if CacheFriendly(τ_curr.root) then
 9:         FT.AddValueSet(ψ, τ_curr)
10:       end if
11:       if (!CacheFriendly(τ_curr.root) ∨
12:           ContainsUnfriendly(τ_curr)) then
13:         nodeToVisit ← Add(τ_curr.children)
14:       end if
15:     end while
16:   end for
17:   S ← ∅
18:   foreach ψ ∈ FT.Keys do
19:     if |FT.GetValue(ψ)| ≥ k then
20:       S ← S ∪ FT.GetValue(ψ)
21:     end if
22:   end for
23:   return S
24: end procedure

The procedure uses a fingerprint table FT (line 2) to track SEs: this is a HashMap where the key is a fingerprint ψ and the value is a set of sub-trees. Each logical plan from the input set of queries is examined in a depth-first manner. We first consider the whole query tree (line 4) and check if its root is a cache-friendly operator: in this case, we add the tree to the SE identified by its fingerprint. The method AddValueSet(ψ, τ) retrieves the value (which is a set) from the HashMap FT given the key ψ (line 9), and adds the sub-tree τ to that set; if the key does not exist, it adds the key and creates a value with a set containing the sub-tree τ. If the root is not a cache-friendly operator, or the logical plan contains a cache-unfriendly operator, then we need to explore the sub-trees (line 13), i.e., we consider the root's child (if the operator at the root is unary) or children (otherwise).

At the end, we extract the set of SEs from the HashMap FT: we keep the SEs with at least k sub-trees, in order to focus on SEs that offer potential work sharing opportunities.

Going back to our running example, Algorithm 1 outputs the set of SEs {ω_1, ω_2, ω_3, ω_4}; in Figure 1, the corresponding sub-trees are labelled ψ_1, ψ_2, ψ_3 and ψ_4, where ψ_i is the fingerprint of SE ω_i. For instance, ω_1 contains two sub-trees (one from Query 1 and one from Query 2), while ω_2 contains three sub-trees, one from each query.

4.2 Building Sharing Plans

Given a list of candidate SEs, this phase aims at building the covering subexpressions (CEs) corresponding to the identified SEs, and at generating a set of candidate groups of CEs for their final selection.

Covering subexpressions. For each similar sub-query in the same SE ω_i, the goal is to produce a new plan that "covers" all operations of each individual sub-query. Recall that all sub-trees τ_j within an SE ω_i share the same sub-query plan fingerprint: that is, they operate on the same input relation(s) and apply the same relational operators to generate intermediate output relations. If the operator attributes are exactly the same across all τ_j, then the CE is identical to any of the τ_j. In general, however, operators can have different attributes or predicates. In this case, the CE construction is slightly more involved.

First, we note that, by construction, the only shared operators we consider are projections and selections. Indeed, for cache unfriendly operators, the SE identification phase omits their fingerprint from the lookup procedure (see Algorithm 1, lines 8–9). Nevertheless, such operators can be included within a sub-tree, but they are in any case "surrounded" by cache-friendly operators (see, for instance, the SE labelled ψ_1 in Figure 1). As a consequence, a CE can be constructed in a top-down manner, by OR-ing the filtering predicates and by unioning the projection columns of the corresponding operators in the SE. The CE thus produces and materializes all output records that are needed by its consumer queries. Figure 2 illustrates an example of a CE for a simple SE of two sub-queries taken from the running example of Figure 1; in particular, we consider the SE labelled ψ_2. The resulting CE contains the same operators as the sub-trees τ_j ∈ ω_i, but with modified predicates or attribute lists.

[Figure 2: Building covering expression example. The first and second trees are two similar subexpressions: Project_{id,name,dep}(Filter_{gender='F'}(employees)) from Queries 1 and 2, and Project_{id,name,age}(Filter_{age>30}(employees)) from Query 3. The third tree is the covering subexpression Project_{id,name,dep,age}(Filter_{gender='F' OR age>30}(employees)).]

In general, we can build a CE, which we denote by Ω_i, from an SE ω_i by applying a transformation function f(): [τ_1, ... τ_m] --f()--> τ_i*, which transforms a collection of similar sub-trees into a single covering sub-tree τ_i*. Note that the resulting covering sub-tree has the same fingerprint as the sub-trees in ω_i.

Definition 4.4 (Covering subexpression). A covering subexpression (CE) Ω_i = f(ω_i) is a sub-tree τ_i* derived from the SE ω_i by applying the transformation f(), with F(τ_i*) = F(τ_j) ∀τ_j ∈ ω_i, such that all τ_j ∈ ω_i can be derived from τ_i*.

In summary, the query plan τ_i* that composes Ω_i contains the same nodes as any sub-tree τ_j ∈ ω_i, changing the predicates of the selections (the OR of all the predicates in the τ_j) and of the projections (the union of all the projected attributes in the τ_j).
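A sketch of the transformation f() for the shared operators we allow (selections and projections), reusing PlanNode from the previous sketch; real plans carry typed expressions, whereas predicates here are plain strings:

def cover(subtrees: List[PlanNode]) -> PlanNode:
    first = subtrees[0]
    if not first.children:  # base relation: identical across the SE
        return first
    children = [cover([t.children[i] for t in subtrees])
                for i in range(len(first.children))]
    if first.label == "filter":   # OR of all filtering predicates
        pred = " OR ".join(sorted({p for t in subtrees for p in t.attributes}))
        return PlanNode("filter", (pred,), children)
    if first.label == "project":  # union of all projection columns
        cols = tuple(sorted({c for t in subtrees for c in t.attributes}))
        return PlanNode("project", cols, children)
    # strict operators (e.g. joins) are syntactically equal by construction
    return PlanNode(first.label, first.attributes, children)

ce = cover([q1_sub, q3_sub])
print(ce.attributes)              # ('age', 'dep', 'id', 'name')
print(ce.children[0].attributes)  # ("age > 30 OR gender = 'F'",)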
Once the set of CEs, Ω = {Ω_1, Ω_2, ...}, has been derived from the corresponding set of SEs, ω = {ω_1, ω_2, ...}, we face the problem of CE selection. The main question we need to answer is: among the CEs contained in the set Ω, which ones should be cached? Each CE covers different portions of the query logical plans, and therefore a CE may include another CE. Looking at the running example of Figure 1, Ω_1 (derived from ω_1, labelled ψ_1 in the figure) contains Ω_3 (derived from ω_3 and labelled ψ_3). If we decide to store Ω_1 in the cache, it becomes questionable to store Ω_3 as well.

The next step of our process is then to identify the potential combinations of mutually exclusive CEs that will be the input of the optimization problem: each combination has a value and a weight, where the value provides a measure of the work sharing opportunities, and the weight indicates the amount of space required to cache the CE in RAM. We start by considering how to compute such values and weights, and we then proceed with the algorithm to identify the potential combinations of CEs.

CE value and weight: a cost-based model. We use cardinality estimation and cost modeling to reason about the benefit of using CEs. The objective is to estimate whether a given CE, which could serve multiple consumer queries, yields lower costs than executing the original queries it subsumes individually.

The cardinality estimator component analyzes relational operators to estimate their output size. To do so, it first produces statistics about input relations. At the relation level, it obtains the number of records and the average record size. At the column level, it collects the min and max values, approximates column cardinality, and produces an equi-width histogram for each column.

The cost estimator component uses the results of cardinality estimation to approximate a (sub)query execution cost. We model the total execution cost of a (sub)query as a combination of CPU, disk and network I/O costs. Hence, given a sub-tree τ_j, we denote by C_E(τ_j) the execution cost of sub-tree τ_j. This component recursively analyzes relational operators, starting from the root of sub-tree τ_j, to determine their cost (and their selectivity), computed as the multiplication between predefined constants (representative of the compute cluster running the parallel processing framework) and the estimated number of input and output records.

Given an SE ω_i = {τ_1, τ_2, ..., τ_m}, the total execution cost C(ω_i) related to the execution of all similar sub-trees τ_j ∈ ω_i without the work-sharing optimization is given by

C(ω_i) = Σ_{j=1..m} C_E(τ_j).    (1)

Instead, the cost of using the corresponding CE Ω_i must account for both the execution cost of the common sub-tree τ_i*, and the materialization (C_W) and retrieval (C_R) costs associated with the cache operator we use in our approach, which accounts for write and read operations:

C(Ω_i) = C_E(τ_i*) + C_W(|τ_i*|) + m · C_R(|τ_i*|),    (2)

where both C_W(|τ_i*|) and C_R(|τ_i*|) are functions of the cardinality |τ_i*| of the intermediate output relation obtained by executing τ_i*. Eq. (2) indicates that retrieval costs are "paid" by each of the m consumer queries of the SE ω_i that can use the CE Ω_i.

Then, we can derive the value of a CE Ω_i, denoted by v(Ω_i), as the difference between the cost of the unoptimized set of sub-trees (execution of ω_i) and the cost of the CE Ω_i:

v(Ω_i) = C(ω_i) − C(Ω_i).    (3)

From Equations (1) and (2), we note that v(Ω_i) is an increasing function of m. Indeed, the more similar sub-queries a CE can serve, the higher its value.

Along with the value, we also need to associate a weight to a CE, since memory is limited and we need to take into account whether a CE can fit in the cache. The weight, denoted by w(Ω_i), is the size required to cache the output of Ω_i in RAM, i.e., w(Ω_i) ≜ |τ_i*| = |Ω_i|.
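A worked sketch of Eqs. (1)–(3), with hypothetical cost constants and cardinalities (the prototype derives these from the statistics described above, and the linear per-record costs below are our simplifying assumption):

C_W = 0.5   # materialization (write) cost per record, hypothetical
C_R = 0.1   # retrieval (read) cost per record, hypothetical

def value_and_weight(subtree_costs, ce_cost, ce_card, record_size):
    m = len(subtree_costs)                                # consumer queries
    c_omega = sum(subtree_costs)                          # Eq. (1)
    c_ce = ce_cost + C_W * ce_card + m * C_R * ce_card    # Eq. (2)
    value = c_omega - c_ce                                # Eq. (3)
    weight = ce_card * record_size                        # bytes to cache
    return value, weight

# Three consumer sub-trees, as for omega_2 in the running example.
v, w = value_and_weight([120.0, 110.0, 130.0], ce_cost=140.0,
                        ce_card=200.0, record_size=64)
print(v, w)   # 60.0 12800.0 -- the value grows with m, per Eq. (2)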
Having defined the CE value and weight, we next describe the algorithm that identifies the potential combinations of CEs.

Generating the candidate set of CEs. Next, we focus on the problem of generating a combinatorial set of CEs, with their associated values and weights, to be given as input to the multi-query optimization solver we have designed. Given the complexity of the optimization task, our goal is to produce a small set of valuable alternative options, which we call the candidate set of CEs. We present an algorithm to produce such a set, but first illustrate the challenges it addresses using the example of Figure 1.

Let us focus on CE Ω_1 (corresponding to the sub-trees labelled ψ_1). A naive enumeration of all possible choices of candidate CEs to be cached leads to the following, mutually exclusive options: (i) Ω_1, (ii) Ω_2, (iii) Ω_3, (iv) both (Ω_2, Ω_3), (v) both (Ω_1, Ω_2), and (vi) both (Ω_1, Ω_3). Intuitively, however, it is easy to discern valuable from wasteful options. For example, the compound CE (Ω_1, Ω_2) could be a good choice, since Ω_2 can be cached to serve Queries 1 and 2 – and, of course, used to build Ω_1 – as well as Query 3. Conversely, caching the compound (Ω_1, Ω_3) brings less value, since it only benefits Queries 1 and 2, but costs more than simply caching Ω_1, which also serves both Queries 1 and 2.

It is thus important to define how to compute the value and weight of a compound CE. In this work we only consider compound CEs whose value and weight are additive in the values and weights of their components. This is achieved with compounds of disjoint CEs, i.e., CEs that have no common sub-trees. For example, consider the two CEs Ω_1 and Ω_2, and the sub-trees used to build them. The CE Ω_2 is included in Ω_1, but only some of the originating sub-trees of Ω_2 are included in the originating sub-trees of Ω_1 (in particular, the ones from Queries 1 and 2, but not the one from Query 3). Given our definition of the value and weight of CEs, the value and weight of the compound (Ω_1, Ω_2) may not equal the sums of the values and weights of each individual CE, since part of the CE needs to be reused to compute different sub-trees. Thus, we discard this option from the candidate set.

Algorithm 2 generates the candidate input for the optimization solver as a set of non-overlapping groups of CEs; the optimization algorithm then selects a single candidate from each group in order to determine the best set of CEs to store in memory. Given the full set Ω of CEs as input, we consider the CE Ω_i starting from the root of the logical plan and remove it from the set (line 4). We then look for its descendants in the input set Ω, i.e., all the CEs contained in Ω_i (line 5). With a CE and its descendants, we build a list of options that contains (i) the CE itself and its individual descendants, and (ii) all the compounds of disjoint descendant CEs (lines 6 and 7). We then remove the descendants from Ω and continue the search for other groups.

Algorithm 2 Generation of CE candidates
Input: Set Ω of CEs
Output: Set of knapsack items (potential CEs)
 1: procedure GenerateKPitems(Ω = {Ω_1, Ω_2, ...})
 2:   Ω_exp ← ∅
 3:   while Ω not empty do
 4:     Ω_i ← PopLargest(Ω)
 5:     DescSet ← FindDescendant(Ω_i, Ω)
 6:     Group_i ← [Ω_i] ∪ Expand(DescSet)
 7:     Ω_exp ← Ω_exp ∪ {Group_i}
 8:     Remove(DescSet, Ω)
 9:   end while
10:   return Ω_exp
11: end procedure

Considering our running example, we start from Ω = {Ω_1, Ω_2, Ω_3, Ω_4}. The "largest" CE is Ω_1, and its descendants are Ω_2 and Ω_3; therefore, the list of mutually exclusive options for this group is [Ω_1, Ω_2, Ω_3, (Ω_2, Ω_3)]. The output of Algorithm 2 then is:

{[Ω_1, Ω_2, Ω_3, (Ω_2, Ω_3)], [Ω_4]},    (4)

where the notation (·,·) indicates a compound CE, and [·,·] indicates a group of related CEs.

Note that a CE may be part of more than one larger CE: to keep the algorithm simple, we consider only the largest ancestor for each CE. To each option, we associate a value and a weight (in the case of a compound, the sum over its components), which will be used by the optimization solver.
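The option list that Expand builds for one group can be sketched as follows (a simplification: disjoint stands in for the shared-sub-tree test, which is not spelled out here):

from itertools import combinations

def group_options(ancestor, descendants, disjoint):
    # The ancestor CE, each descendant alone, and every compound of
    # pairwise disjoint descendants (Algorithm 2, lines 6-7).
    options = [(ancestor,)] + [(d,) for d in descendants]
    for r in range(2, len(descendants) + 1):
        for combo in combinations(descendants, r):
            if all(disjoint(a, b) for a, b in combinations(combo, 2)):
                options.append(combo)
    return options

# Group of Omega_1 in the running example: Omega_2 and Omega_3 are
# disjoint, so the options match Eq. (4).
print(group_options("O1", ["O2", "O3"], disjoint=lambda a, b: True))
# [('O1',), ('O2',), ('O3',), ('O2', 'O3')]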
4.3 Sharing Plan Selection

Next, we delve into our MQO problem formulation. In this work, we model the process that selects which sharing plans to use as a Multiple-choice Knapsack problem (MCKP) [45]. Essentially, the knapsack contains items (that is, sharing plans or CEs) that have a weight and a value. The knapsack capacity is constrained by a constant c, representative of the memory budget given to the work sharing optimizer. Hence, the sum of the weights of all items placed in the knapsack cannot exceed its capacity c.

Our problem is thus to select which set of CEs (single or compound) to include in the knapsack. The output of the previous phase (in particular, the output of Algorithm 2) is a set containing g groups of mutually exclusive options, or items. Each group G_i, i = 1, 2, ..., g, contains |G_i| items, which can be single CEs or compounds of CEs. For instance, in our running example, the output shown in Eq. (4) contains g = 2 groups: the first group has four items, the second group just one item. Given a group i, each item j has a value v_{i,j} and a weight w_{i,j}, computed as described in Sect. 4.2.

The MCKP solver needs to choose at most one item from each group such that the total value is maximized, while the corresponding total weight must not exceed the capacity c. More formally, the problem can be cast as follows:

maximize     Σ_{i=1..g} Σ_{j=1..|G_i|} v_{i,j} x_{i,j}
subject to   Σ_{i=1..g} Σ_{j=1..|G_i|} w_{i,j} x_{i,j} ≤ c        (5)
             Σ_{j=1..|G_i|} x_{i,j} ≤ 1,    ∀i = 1...g
             x_{i,j} ∈ {0, 1},    ∀i = 1...g, j = 1...|G_i|

where the variable x_{i,j} indicates whether item j from group i has been selected. The MCKP is a well-known NP-hard problem: in this work, we implement a dynamic programming technique to solve it [46].
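A compact dynamic-programming sketch of the MCKP of Eq. (5), assuming integer weights (e.g., memory expressed in fixed-size blocks); the values and weights below are hypothetical:

def mckp(groups, capacity):
    NEG = float("-inf")
    best = [0.0] + [NEG] * capacity   # best[w]: max value at total weight w
    for group in groups:
        new = list(best)              # option: pick no item from this group
        for value, weight in group:
            # scan against `best` so at most one item per group is taken
            for w in range(capacity, weight - 1, -1):
                if best[w - weight] != NEG:
                    new[w] = max(new[w], best[w - weight] + value)
        best = new
    return max(v for v in best if v != NEG)

# Two groups as in Eq. (4): [O1, O2, O3, (O2,O3)] and [O4]; the compound's
# value/weight are the sums of its components (disjoint CEs).
g1 = [(9.0, 8), (4.0, 3), (3.5, 2), (7.5, 5)]
g2 = [(2.0, 4)]
print(mckp([g1, g2], capacity=10))   # 9.5: cache (O2,O3) plus O4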
Note that alternative formulations exist for which a provably optimal greedy algorithm can be constructed: for example, we could consider a fractional formulation of the knapsack problem. This approach, however, would be feasible only if the underlying query execution engine supported partial caching of a relation. As it turns out, the system we target does support hierarchical storage levels for cached relations: what does not fit in RAM is automatically stored on disk. Although this represents an interesting direction for future work (as it implies that a linear-time greedy heuristic can be used), in this paper we limit our attention to the 0/1 problem formulation.

4.4 Query Rewriting

The last step is to transform the original input queries to benefit from the selected combination of cache plans. Recall that the output of a cache plan is materialized in RAM after its execution. Then, for each input query that is a consumer of a given cache plan, we build an extraction plan, which manipulates the cached data to produce the output relation as it would be obtained by the original input query. In other words, in the general case, we apply the original input query to the cached relation instead of the original input relation. In the case of a CE subsuming identical SEs, the extraction plan is an identity: the original query simply replaces the sub-tree containing the CE with its cached intermediate relation. Instead, if shared operators are used – because the SEs have the same fingerprint but different attributes – we build an extraction plan that applies the original filter predicates and projection attributes to "extract" the relevant tuples from the cached relation produced by the CE.

Considering our running example, assume that the output of the MCKP solver is to store Ω_2 and Ω_3 in the cache. Ω_3 derives from ω_3, whose composing sub-trees (one from Query 1 and one from Query 2) are identical; therefore, the extraction plan is Ω_3 itself. Instead, ω_2 (from which Ω_2 derives) contains sub-trees with different filtering and projection predicates: once Ω_2 is materialized in the cache, we need to apply the appropriate filtering (e.g., "gender = 'F'") and projection predicates to extract the actual result for each of the consumer queries.
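Continuing the PySpark sketch of Sect. 3, the extraction plans over the cached CE ce_psi2 would look as follows (illustrative; the prototype rewrites logical plans rather than DataFrame code):

# Each consumer re-applies its own predicate and projection to the cached
# relation, instead of re-scanning the base employees table.
q1_extract = ce_psi2.filter("gender = 'F'").select("id", "name", "dep")
q3_extract = ce_psi2.filter("age > 30").select("id", "name", "age")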
5 EXPERIMENTAL EVALUATION

We now present experimental results to evaluate the effectiveness of our methodology, which we implement for the Apache Spark and SparkSQL systems; the details of the implementation can be found in our companion technical report [48].

Experimental setup. We run our experiments on a cluster consisting of 8 server-grade worker nodes, with 8 cores each and a 1 Gbps commodity interconnect. Each worker is granted 30 GB of RAM, of which half is dedicated to caching. We use the queries in the TPC-DS benchmark library for SparkSQL developed by Databricks [47], and generate a CSV dataset with a scaling factor of 50. We use Apache Spark 2.0: for all tests, we clear the operating system's buffer cache on all workers and on the master, and disable Spark's data compression feature.

Results. We select a subset of all queries available in the TPC-DS benchmark, and focus on the 50 queries that can be successfully executed without failures or parsing errors. We first present results for a setup in which we consider all 50 queries and execute them in the order of their identifiers. Figure 3 shows the empirical Cumulative Distribution Function (CDF) of the runtime ratios between a system absorbing the workload with MQO enabled and one with MQO disabled. Overall, we note that, for 60% of the queries, we obtain an 80% decrease of the runtime. In total, our approach reduces the runtime for 82% of the queries. On the other hand, 18% of the queries experience a larger runtime, which is explained by the overheads associated with caching. Overall, our optimizer identified 60 SEs and built 45 CEs. The cache used to store their output is approximately 26 GB (out of 120 GB available). The optimization process took less than 2 seconds, while the query runtimes are in the order of tens of minutes (individually) and hours (all together).

[Figure 3: CDF of the performance gains of worksharing for a TPC-DS workload consisting of 50 selected queries.]

Next, we consider an experimental setup in which we emulate the presence of a queuing component that triggers the execution of our worksharing optimization. In particular, since TPC-DS queries have no associated submission timestamp, we take a randomized approach (without replacement) to select which queries are submitted to the queuing component, and we parametrize the latter with the number of queries to accumulate before triggering our MQO mechanism; we call this parameter the window size. For a given window size, we repeat the experiment (i.e., we randomly select queries from the full TPC-DS workload) 20 times, and we build the corresponding empirical CDF of the runtime ratio, as defined above. We also measure the number of SEs identified within the window, and show the corresponding empirical CDF. Given this setup, we consider all possible combinations of queries to assess the benefits of worksharing.

Figure 4 shows the boxplots of the runtime ratio (top) and of the number of similar subexpressions identified (bottom) for different window sizes. The boxplots indicate the main percentiles (5%, 25%, 50%, 75%, 95%) of the empirical CDF, along with the average (red lines). The figure shows a clear pattern: as the size of the window increases, there are more chances of finding a high number of SEs, and thus better sharing opportunities, which translates into reduced aggregate runtime. We observe a 20% decrease of the aggregate runtime (median) with a window size of only five queries, which ramps up to 45% when the window size is set to 20 queries.

[Figure 4: Execution time ratio and number of similar subexpressions within a group of queries (given by the size of the window) as the window size increases.]

Note that a queuing mechanism can introduce an additional delay for the execution of a query, because the system needs to accumulate a sufficient number of queries in the window before triggering their optimization and execution. Investigating the trade-off between efficiency and delay, as well as studying scheduling policies to steer system behavior, is part of our future research agenda. Due to space constraints, we refer to [48] for additional evaluations and discussion.
6 CONCLUSION

We presented a new approach to MQO that uses in-memory caching to improve the efficiency of computing frameworks such as Apache Spark. Our method takes a batch of input queries and finds common (sub)expressions, leading to the construction of covering expressions that subsume the individual work required by each query. To make the search problem tractable, we used several techniques: modified hash trees to quickly identify common sub-graphs, and an algorithm to enumerate (and prune) feasible common expressions. MQO was cast as a multiple-choice knapsack problem: each feasible common expression was associated with a value (representative of how much work could be shared among queries) and a weight (representative of the memory pressure imposed by caching the common data), and the goal was to fill a knapsack representative of memory constraints.

To quantify the benefit of our approach, we implemented a prototype for Apache SparkSQL, and we used well-known workloads.

REFERENCES
[1] J. Dean and S. Ghemawat, "Mapreduce: simplified data processing on large clusters," Comm. of the ACM, vol. 51, no. 1, pp. 107–113, 2008.
[2] M. Isard, M. Budiu, Y. Yu, A. Birrell, and D. Fetterly, "Dryad: distributed data-parallel programs from sequential building blocks," in ACM SIGOPS Op. Systems Review, vol. 41, no. 3, 2007, pp. 59–72.
[3] M. Zaharia, M. Chowdhury, T. Das, A. Dave, J. Ma, M. McCauley, M. J. Franklin, S. Shenker, and I. Stoica, "Resilient distributed datasets: A fault-tolerant abstraction for in-memory cluster computing," in Proc. of USENIX NSDI, 2012, pp. 2–2.
[4] P. K. Gunda, L. Ravindranath, C. A. Thekkath, Y. Yu, and L. Zhuang, "Nectar: Automatic management of data and computation in datacenters," in Proc. of OSDI, vol. 10, 2010, pp. 1–8.
[5] J. Zhou, P.-A. Larson, J.-C. Freytag, and W. Lehner, "Efficient exploitation of similar subexpressions for query processing," in Proc. of ACM SIGMOD, 2007, pp. 533–544.
[6] T. K. Sellis, "Multiple-query optimization," ACM Trans. Database Syst., vol. 13, no. 1, pp. 23–52, Mar. 1988.
[7] P. Roy, S. Seshadri, S. Sudarshan, and S. Bhobe, "Efficient and extensible algorithms for multi query optimization," in ACM SIGMOD Record, vol. 29, no. 2, 2000, pp. 249–260.
[8] J. Goldstein and P.-Å. Larson, "Optimizing queries using materialized views: a practical, scalable solution," in ACM SIGMOD Record, vol. 30, no. 2, 2001, pp. 331–342.
[9] H. Mistry, P. Roy, S. Sudarshan, and K. Ramamritham, "Materialized view selection and maintenance using multi-query optimization," in ACM SIGMOD Record, vol. 30, no. 2, 2001, pp. 307–318.
[10] N. N. Dalvi, S. K. Sanghai, R. Parsan, and S. Sudarshan, "Pipelining in multi-query optimization," in Proc. of ACM PODS, 2001, pp. 59–70.
[11] I. Psaroudakis, M. Athanassoulis, and A. Ailamaki, "Sharing data and work across concurrent analytical queries," VLDB Endowment, vol. 6, no. 9, pp. 637–648, Jul. 2013.
[12] S. Harizopoulos, V. Shkapenyuk, and A. Ailamaki, "Qpipe: A simultaneously pipelined relational query engine," in Proc. of ACM SIGMOD, 2005, pp. 383–394.
[13] S. Arumugam, A. Dobra, C. M. Jermaine, N. Pansare, and L. Perez, "The datapath system: A data-centric analytic processing engine for large data warehouses," in Proc. of ACM SIGMOD, 2010, pp. 519–530.
[14] G. Giannikis, G. Alonso, and D. Kossmann, "Shareddb: Killing one thousand queries with one stone," VLDB Endowment, vol. 5, no. 6, pp. 526–537, Feb. 2012.
[15] T. Nykiel, M. Potamias, C. Mishra, G. Kollios, and N. Koudas, "Mrshare: Sharing across multiple queries in mapreduce," VLDB Endowment, vol. 3, no. 1-2, pp. 494–505, Sep. 2010.
[16] G. Wang and C.-Y. Chan, "Multi-query optimization in mapreduce framework," VLDB Endowment, vol. 7, no. 3, pp. 145–156, Nov. 2013.
[17] Y. N. Silva, P.-A. Larson, and J. Zhou, "Exploiting common subexpressions for cloud query processing," in Proc. of IEEE ICDE, 2012, pp. 1337–1348.
[18] A. El-Helw, V. Raghavan, M. A. Soliman, G. Caragea, Z. Gu, and M. Petropoulos, "Optimization of common table expressions in mpp database systems," VLDB Endowment, vol. 8, no. 12, pp. 1704–1715, 2015.
[19] M. Armbrust et al., "Spark sql: Relational data processing in spark," in Proc. of ACM SIGMOD, 2015, pp. 1383–1394.
[20] S. Finkelstein, "Common expression analysis in database applications," in Proc. of ACM SIGMOD, 1982, pp. 235–245.
[21] J. Shim, P. Scheuermann, and R. Vingralek, "Dynamic caching of query results for decision support systems," in Proc. of IEEE SSDBM, 1999, p. 254.
[22] G. Candea, N. Polyzotis, and R. Vingralek, "Predictable performance and high query concurrency for data analytics," The VLDB Journal, vol. 20, no. 2, pp. 227–248, Apr. 2011.
[23] ——, "A scalable, predictable join operator for highly concurrent data warehouses," VLDB Endowment, vol. 2, no. 1, pp. 277–288, Aug. 2009.
[24] J. Yang, K. Karlapalem, and Q. Li, "Algorithms for materialized view design in data warehousing environment," in VLDB, vol. 97, 1997, pp. 25–29.
[25] S. Agrawal, S. Chaudhuri, and V. R. Narasayya, "Automated selection of materialized views and indexes in sql databases," in VLDB, vol. 2000, 2000, pp. 496–505.
[26] X. Baril and Z. Bellahsene, "Selection of materialized views: A cost-based approach," in Advanced Information Systems Engineering. Springer, 2003, pp. 665–680.
[27] C. Zhang and J. Yang, "Genetic algorithm for materialized view selection in data warehouse environments," in Data Warehousing and Knowledge Discovery. Springer, 1999, pp. 116–125.
[28] R. Derakhshan, F. K. Dehne, O. Korn, and B. Stantic, "Simulated annealing for materialized view selection in data warehousing environment," in Databases and Applications, 2006, pp. 89–94.
[29] P. Kalnis, N. Mamoulis, and D. Papadias, "View selection using randomized search," Data & Knowledge Engineering, vol. 42, no. 1, pp. 89–111, 2002.
[30] P. Agrawal, D. Kifer, and C. Olston, "Scheduling shared scans of large data files," VLDB Endowment, vol. 1, no. 1, pp. 958–969, 2008.
[31] P. Bhatotia, A. Wieder, R. Rodrigues, U. A. Acar, and R. Pasquin, "Incoop: Mapreduce for incremental computations," in Proc. of ACM SoCC, 2011, p. 7.
[32] I. Elghandour and A. Aboulnaga, "Restore: reusing results of mapreduce jobs," VLDB Endowment, vol. 5, no. 6, pp. 586–597, 2012.
[33] B. Li, E. Mazur, Y. Diao, A. McGregor, and P. Shenoy, "A platform for scalable one-pass analytics using mapreduce," in Proc. of ACM SIGMOD, 2011, pp. 985–996.
[34] ——, "Scalla: a platform for scalable one-pass analytics using mapreduce," ACM Trans. on Database Systems, vol. 37, no. 4, p. 27, 2012.
[35] J. Camacho-Rodríguez, D. Colazzo, M. Herschel, I. Manolescu, and S. Roy Chowdhury, "Reuse-based optimization for pig latin," in Proc. of ACM CIKM, 2016, pp. 2215–2220.
[36] L. L. Perez and C. M. Jermaine, "History-aware query optimization with materialized intermediate views," in Proc. of IEEE ICDE, 2014, pp. 520–531.
[37] A. Jindal, S. Qiao, H. Patel, Z. Yin, J. Di, M. Bag, M. Friedman, Y. Lin, K. Karanasos, and S. Rao, "Computation reuse in analytics job service at microsoft," in Proc. of ACM SIGMOD, 2018, pp. 191–203.
[38] A. Jindal, K. Karanasos, S. Rao, and H. Patel, "Selecting subexpressions to materialize at datacenter scale," Proc. VLDB Endow., vol. 11, no. 7, pp. 800–812, Mar. 2018.
[39] T. Azim, M. Karpathiotakis, and A. Ailamaki, "Recache: Reactive caching for fast analytics over heterogeneous data," VLDB Endowment, vol. 11, no. 3, 2017.
[40] K. Dursun, C. Binnig, U. Cetintemel, and T. Kraska, "Revisiting reuse in main memory database systems," in Proc. of ACM SIGMOD, 2017, pp. 1275–1289.
[41] A. Floratou, N. Megiddo, N. Potti, F. Özcan, U. Kale, and J. Schmitz-Hermes, "Adaptive caching in big sql using the hdfs cache," in Proc. of ACM SoCC, 2016, pp. 321–333.
[42] M. G. Ivanova, M. L. Kersten, N. J. Nes, and R. A. Gonçalves, "An architecture for recycling intermediates in a column-store," ACM Trans. on Database Systems, vol. 35, no. 4, p. 24, 2010.
[43] F. Nagel, P. Boncz, and S. D. Viglas, "Recycling in pipelined query evaluation," in Proc. of IEEE ICDE, 2013, pp. 338–349.
[44] R. C. Merkle, "Protocols for public key cryptosystems," in IEEE Symposium on Security and Privacy, 1980, p. 122.
[45] P. Sinha and A. A. Zoltners, "The multiple-choice knapsack problem," Operations Research, vol. 27, no. 3, pp. 503–515, 1979.
[46] H. Kellerer, U. Pferschy, and D. Pisinger, Introduction to NP-Completeness of Knapsack Problems. Springer, 2004.
[47] "Spark sql performance test," https://github.com/databricks/spark-sql-perf.
[48] P. Michiardi, D. Carra, and S. Migliorini, "Cache-based multi-query optimization for data-intensive scalable computing frameworks," arXiv preprint arXiv:1805.08650, 2018.