Needle in a haystack queries in cloud data lakes

Grisha Weintraub (grishaw@post.bgu.ac.il), Ehud Gudes (ehud@cs.bgu.ac.il), Shlomi Dolev (dolev@cs.bgu.ac.il)
Ben-Gurion University of the Negev, Beer-Sheva, Israel

© 2021 Copyright for this paper by its author(s). Published in the Workshop Proceedings of the EDBT/ICDT 2021 Joint Conference (March 23-26, 2021, Nicosia, Cyprus) on CEUR-WS.org. Use permitted under Creative Commons License Attribution 4.0 International (CC BY 4.0).

ABSTRACT

Cloud data lakes are a modern approach for storing large amounts of data in a convenient and inexpensive way. Query engines (e.g., Hive, Presto, SparkSQL) are used to run SQL queries on data lakes. Their main focus is on analytical queries, while random reads are overlooked. In this paper, we present our approach for optimizing needle-in-a-haystack queries in cloud data lakes. The main idea is to maintain an index structure that maps indexed column values to the files that contain them. According to our analysis and experimental evaluation, our solution imposes a reasonable storage overhead while providing an order of magnitude performance improvement.

1 INTRODUCTION

Data lakes are a relatively new concept, which has recently received much attention from both the research community and industry [2, 29, 41]. There is no formal definition of the term data lake, but it is commonly described as a centralized repository containing very large amounts of data in their original (or minimally processed) format. The main idea behind this approach is that instead of loading the data into traditional data warehouses, which require scrupulous schema design and a high development and maintenance cost, the data is simply streamed into a distributed storage system (e.g., HDFS [43]) and from that moment becomes available for analytics. Cloud data lakes are managed by cloud providers and store their data in cloud object stores, such as AWS S3 [12], Google Cloud Storage [31], and Azure Blob Storage [37]. For analytics, compute engines (e.g., Spark [47], MapReduce [25]) are used "on-demand". They access cloud data lakes via a simple get/put API over the network, and this separation of the storage and compute layers results in a lower cost and allows independent scaling of each layer (Fig. 1).

Figure 1: Data Lakes Architecture
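To make the access pattern of Fig. 1 concrete, the following is a minimal sketch (ours, not from the paper) of a compute-side read over the object store's get/put API, assuming a hypothetical bucket and key and the boto3 and pyarrow libraries; query engines implement this same pattern at cluster scale.

```python
# A minimal sketch of in-situ access over the get/put API: the compute
# layer fetches a Parquet object over the network and reads it directly,
# with no load step into a DBMS. Bucket and key names are hypothetical.
import io

import boto3
import pyarrow.parquet as pq

s3 = boto3.client("s3")
obj = s3.get_object(Bucket="my-data-lake",
                    Key="data-lake/year:2020/month:03/file051.parquet")
table = pq.read_table(io.BytesIO(obj["Body"].read()))
print(table.num_rows)
```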
A plethora of new systems, often referred to as query engines [1], have been developed in recent years to support SQL queries on top of data lakes. Hive [18], originally built at Facebook, was the first such system. In Hive, SQL-like queries are compiled into a series of map-reduce jobs and executed on the MapReduce framework [25]. A somewhat similar approach is implemented in Spark SQL [13], where SQL queries run on top of the Spark engine [47].

Another group of query engines does not use general-purpose frameworks such as MapReduce or Spark but relies on its own engines, built on ideas from parallel databases. The first such system was Google's Dremel [36], which inspired many modern query engines; the most well-known are Drill [32], Presto [42], and Impala [33].

The following techniques are commonly associated with query engines running on data lakes:

• in-situ processing: The data is accessed in place (i.e., in HDFS or a cloud object store), and there is no need to load the data into a database management system (DBMS). Thanks to this feature, the same data lake can be queried simultaneously by different query engines without any difficulties.
• parallel processing: The data is accessed in parallel (usually by a cluster of dedicated machines).
• columnar storage: Instead of storing the data row by row, as is customary in relational databases, the data is stored column by column. The most important benefits of such column-wise storage are fast projection queries and efficient column-specific compression and encoding techniques. Some of the popular open-source formats are ORC and Parquet [28].
• data partitioning: The idea is simple and yet powerful. The data is partitioned horizontally based on some of the input columns. Then, files related to each partition are stored in a different directory in the storage system (see the example below).

Let us demonstrate how these techniques look in a typical real-world scenario. Consider a large enterprise company that receives a lot of events (e.g., from IoT sensors) and stores them in a data lake. The data lake would look like a collection of files (Fig. 2) stored in some distributed storage system. In our example, the data lake is partitioned by the year and month columns, so, for example, events from March 2020 are located under the path data-lake/year:2020/month:03/. Events are stored in the columnar format Parquet, so queries on specific columns scan only the relevant chunks of data.

data-lake/
  year:2020/
    month:02/
      file201.parquet
      file170.parquet
    month:03/
      file051.parquet
  year:2019/
    month:.../
      ...

Figure 2: Data lake layout in storage system

When a query engine executes a query, it scans all the relevant files in parallel and returns the result to the client. Data lakes and query engines are optimized for analytic queries, and a typical query calculates aggregate values (e.g., sum, max, min) over a specific partition. For such queries, all the files under the specified partition should be scanned (a sketch of this partition-pruning step follows below).
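The following is a minimal sketch (ours) of how a predicate on the partitioning columns maps to a directory prefix, assuming the layout of Fig. 2, a hypothetical bucket name, and the boto3 library.

```python
# A minimal sketch of partition pruning over the layout of Fig. 2:
# a predicate on the partitioning columns (year, month) is translated
# into a key prefix, so only one directory is listed and scanned.
# The bucket name is hypothetical.
import boto3

s3 = boto3.client("s3")

def files_for_partition(bucket, year, month):
    """Return the data files of a single partition directory."""
    prefix = f"data-lake/year:{year}/month:{month:02d}/"
    pages = s3.get_paginator("list_objects_v2").paginate(Bucket=bucket,
                                                         Prefix=prefix)
    return [obj["Key"] for page in pages
            for obj in page.get("Contents", [])]

# e.g., all files holding events from March 2020:
print(files_for_partition("my-data-lake", 2020, 3))
```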
However, sometimes users may be merely interested in finding a specific record in the data lake (a.k.a. a needle in a haystack). Consider, for example, a scenario where a specific event should be fetched from the lake based on its unique identifier for troubleshooting or business flow analysis. Another important use case is managing General Data Protection Regulation (GDPR) compliance [45], where records related to a specific user should be found in the data lake upon the user's request. Unfortunately, even when the requested information resides in a single file, query engines will scan all the files in the data lake (which can number in the millions). Obviously, this behavior results in a long query latency and an expensive computation cost.

In this paper, we present our approach for optimizing "needle in a haystack" queries in cloud data lakes. The main idea is to build an index structure that maps indexed column values to the files that contain those values. We assume enormous data volumes in cloud data lakes, and hence, we need our index to be highly scalable in both the compute and the storage sense. For scalable compute, we utilize the parallel processing paradigm [25, 47]; for scalable storage, we design a new storage format that allows unlimited scaling while minimizing the number of network operations during reads.

Our main contributions are as follows:
• Development of a novel index structure that allows speeding up random queries in cloud data lakes.
• A prototype implementation of our solution and its experimental evaluation in a real-world cloud data lake.

The rest of the paper is structured as follows. In Section 2, we formally define the problem. In Section 3, we describe the proposed solution. In Section 4, we provide a performance analysis of our solution. In Section 5, we introduce our proof-of-concept implementation and provide an experimental evaluation thereof. In Section 6, we review related work. We conclude in Section 7.

2 PROBLEM STATEMENT

We model a data lake as a set of tuples $D = \{\langle c_1{:}v_1, c_2{:}v_2, \ldots, c_n{:}v_n \rangle \mid \forall 1 \le i \le n,\; c_i \in C,\; v_i \in V\}$, where $C$ is the set of column names and $V$ is the set of column values (see the notation in Table 5). Data lake $D$ is stored in a distributed storage system as a collection of $N$ files, denoted by $F = \{f_1, f_2, \ldots, f_N\}$.

Query engines support SQL queries on data lakes. In this paper, our focus is on simple selection queries of the type:

SELECT C1, C2, ...
FROM DATA-LAKE
WHERE SOME-COLUMN = SOME-VALUE

The files that contain tuples that satisfy query $Q$ are denoted by $F(Q)$. The current approach to performing query $Q$ is to scan all $F$ files in parallel, which is wasteful, as only the $F(Q)$ files need to be scanned, and in many cases $|F| \gg |F(Q)|$. If we knew $F(Q)$ for a given $Q$, we would read only the relevant files and would thus significantly improve query performance. Thus, the problem we are trying to solve in this paper can be defined as: "construct a function $getFiles(Q)$ that receives a query $Q$ and returns $F(Q)$, the list of files that contain tuples that satisfy $Q$".

Figure 3: System model

Fig. 3 presents a high-level diagram of the considered system model:
(1) A client submits a query to the query engine.
(2) The query engine queries the index to get the relevant file names.
(3) The query engine reads the relevant files from the data lake.
(4) The query engine computes the query result and returns it to the client.

An obvious approach to implementing $getFiles(Q)$ is to create a simple inverted index [40] that stores a mapping between the values of the relevant columns and the files that contain these values (Table 1).

Table 1: Index content example

Column Name    Column Value     File Names
event-id       207              file051, file033
event-id       350              file002
event-id       418              file170, file034
...            ...              ...
client-ip      192.168.1.15     file170
client-ip      172.16.254.1     file883, file051

The open question is what the best way to build such an index in a big data environment is. More formally, we are looking for a solution that satisfies the following constraints (a toy illustration of the index contract follows below):
(1) scalability: we want our index to be able to handle very large and growing amounts of data;
(2) performance: we need our index to be able to respond to lookup queries in a reasonable time (optimally, sub-seconds);
(3) cost: we want to minimize the monetary cost of our solution as much as possible.
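The following toy, in-memory illustration (ours, not from the paper) shows the inverted index of Table 1 and the getFiles(Q) contract; the values and file names follow Table 1, and a real implementation must additionally meet the three constraints above.

```python
# A toy illustration of the inverted index of Table 1: a mapping from
# (column name, column value) to the list of data files containing it.
index = {
    ("event-id", "207"): ["file051", "file033"],
    ("event-id", "350"): ["file002"],
    ("event-id", "418"): ["file170", "file034"],
    ("client-ip", "192.168.1.15"): ["file170"],
}

def get_files(col_name, col_value):
    """Return F(Q) for the query 'WHERE col_name = col_value'."""
    return index.get((col_name, col_value), [])

assert get_files("event-id", "207") == ["file051", "file033"]
assert get_files("event-id", "999") == []   # value not in the lake
```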
The trivial solution would be to store our index in a distributed NoSQL database (e.g., Cassandra [34], DynamoDB [11]) and update it with parallel compute engines (e.g., Spark, MapReduce). However, while this approach may satisfy the first two of our requirements (scalability and performance), its monetary cost is far from optimal. Let us show why by using some real numbers.

We have two main alternatives for using a database service in the cloud environment: IaaS and DaaS.
(1) IaaS: the "Infrastructure-as-a-Service" model provides users with the requested number of virtual machines on which they can install a DBMS and use it as they wish (e.g., Cassandra on top of EC2).
(2) DaaS: the "Database-as-a-Service" model provides users with a database API without the need to manage hardware or software (e.g., DynamoDB).

Table 2: Storage cost comparison

index               cost of 1 TB/year    pricing info references
Cassandra on EC2    16,164 $             [5, 9]
DynamoDB            2,925 $              [4]
S3                  276 $                [6]

In Table 2¹, we compare the monetary cost of storing data in each of these alternatives against storing it in a cloud object store (similarly to how the data is stored in cloud data lakes). It can clearly be seen that an object store is at least one order of magnitude lower-priced than any other option. Since our focus is on real-world cloud data lakes (i.e., petabyte scale and beyond), we need to support indexes of very large volumes as well (an index on a unique column of data lake $D$ will need $O(|D|)$ storage). Thus, the potential monetary savings of implementing indexing on object stores, instead of the naive solution based on key-value stores, can be tremendous.

¹ For simplicity, we ignore some factors that cannot change the general trend, such as the maintenance cost of Cassandra, the cost of requests in S3 and DynamoDB, and the fact that the data in object stores can be compressed. Our goal is only to show that object stores are significantly cheaper than the other options. We also provide numbers for specific systems (Cassandra, DynamoDB, AWS), but the same cost trend is preserved in other similar systems as well.

This observation is not new, and the cost superiority of object stores is a well-known fact in both industry [24] and the research community [44]. However, object stores are not suitable for every use case. More specifically, when compared to key-value stores, they have the following caveats:
(1) higher latency: the actual numbers may vary, but usually object stores have higher latency than a DBMS [44];
(2) limited request rate: while the request rate is nearly unlimited in scalable databases, in object stores there is an upper limit (typically around thousands per second [38]);
(3) optimized for large objects: object stores are optimized for storing large objects (typically gigabytes [8]) and hence are not suitable for systems that need random reads;
(4) no random writes: the only way to mutate objects in object stores is to replace them, which may be problematic for systems that need random writes.

While the first two points ("higher latency" and "limited request rate") are usually acceptable in OLAP systems, the last two ("large objects" and "no random writes") introduce significant implementation challenges. In our solution, we address both of these challenges.

To summarize, in this work, our research goal is twofold:
• to implement $getFiles(Q)$ and thereby improve random-read performance in cloud data lakes;
• to build our solution on top of object stores, so that the additional monetary cost imposed by our scheme is as low as possible.

3 OUR APPROACH

In Sections 3.1, 3.2, and 3.3 below, we explain how exactly we store, compute, and use our index. In Section 3.4, we discuss potential optimizations to our basic scheme.

3.1 Index Storage

We store our index as a collection of files partitioned by the indexed column name (Fig. 4).

index/
  col:client-ip/
    index03
    index07
  col:event-id/
    index13
    index23
  col:.../
    ...

Figure 4: Index storage layout example

Each index file is responsible for some range of column values and stores the relevant information about this range in the format depicted in Table 3.

Table 3: Index file example

Data       207    file051, file033
           350    file002
           418    file170, file034
           513    file0002
           680    file443, file001, file033
           799    file883
           ...    ...
Metadata   1      min:207|max:418|offset:0
           2      min:513|max:799|offset:2048
           ...    ...

There are two sections in the index file: data and metadata. The data part consists of a map between a column value and a list of corresponding file names, ordered by the column value. The map is divided into $K$ chunks based on the predefined number of values per chunk $M$, such that each of the $K$ chunks contains a mapping for $M$ consecutive values of the particular indexed column. For example, in the index presented in Table 3, we have $M = 3$, and the first chunk contains a mapping between 3 column values (207, 350, 418) and the file names that contain tuples with these values. Metadata stores information about the location of the data chunks inside the index file, along with their "border" values. Considering the example in Table 3 again, from the metadata section we learn that the first chunk's minimal value is 207, its maximal value is 418, and its location in the file is at offsets 0:2048 (a sketch of this layout follows below).
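The following is a minimal sketch (ours, not the paper's exact on-disk format) of serializing one index file with the two sections of Table 3; it assumes JSON-encoded chunks and adds a hypothetical "length" field next to each offset, while the real encoding is an implementation choice.

```python
# A minimal sketch of one index file: K data chunks of M values each,
# followed by a metadata section holding each chunk's border values
# and its location inside the file.
import json

def build_index_file(sorted_pairs, m):
    """sorted_pairs: list of (column value, [file names]) sorted by value;
    m: number of values per data chunk (M in the paper)."""
    body, metadata, offset = b"", [], 0
    for i in range(0, len(sorted_pairs), m):
        chunk = sorted_pairs[i:i + m]              # M consecutive values
        encoded = json.dumps(dict(chunk)).encode()
        metadata.append({"min": chunk[0][0], "max": chunk[-1][0],
                         "offset": offset, "length": len(encoded)})
        body += encoded
        offset += len(encoded)
    # the metadata section is stored alongside the K data chunks
    return body, json.dumps(metadata).encode()

data, meta = build_index_file(
    [("207", ["file051", "file033"]), ("350", ["file002"]),
     ("418", ["file170", "file034"]), ("513", ["file0002"])], m=3)
```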
For very big data lakes, we will have many index files, and then we will start experiencing the same problem we are trying to solve in data lakes (many irrelevant files are accessed). We solve this issue by managing an index of indexes (i.e., a root index): a single file that stores a mapping between each index file name and its border values (Table 4). In Section 3.3 below, we explain how the root index and the metadata sections are used by the index users.

Table 4: Root index example

Column Name   Min   Max   Index File Name
event-id      207   560   index13
event-id      590   897   index23
...           ...   ...   ...

3.2 Index Computation

Since data lakes store enormous volumes of data, our index will be very large as well. To be able to compute such an index in a reasonable time, we would like to compute it in parallel by using common parallel compute engines like MapReduce [25] and Spark [47]. Fortunately, we can easily do that with a simple map-reduce flow. Pseudo-code (in Spark-like syntax) of the algorithm for creating an index on a specific column in a given data lake is presented in Algorithm 1.

Algorithm 1 Create Index
1: procedure CreateIndex(datalake-path, index-path, col-name)
2:   datalake = read(datalake-path)
3:   index = datalake.groupBy(col-name).agg(collect("f-name"))
4:   index-sorted = index.orderBy(col-name)
5:   index-sorted.write.format("my.index").save(index-path)
6:   create-root-index(index-path)
7: end procedure

In the map phase, each worker emits (col-value, file-name) pairs, and in the reduce phase, the map results are grouped into a (col-value, List⟨file-names⟩) map. Then, the result is written into the distributed storage in our custom format (presented in Table 3 and denoted in the pseudo-code as my.index). Finally, we create the root index file by scanning the metadata sections of all the created index files and taking the minimum and maximum values from each file.
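The following is a runnable PySpark approximation (ours, not the paper's exact code) of Algorithm 1: input_file_name() recovers the data file each tuple came from, plain Parquet stands in for the custom "my.index" format, the paths are hypothetical, and create_root_index is left abstract.

```python
# A PySpark sketch of Algorithm 1: group the indexed column's values,
# collect the data files containing each value, sort by value, and write
# the result to the index location.
from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_set, input_file_name

spark = SparkSession.builder.appName("create-index").getOrCreate()

def create_index(datalake_path, index_path, col_name):
    datalake = (spark.read.parquet(datalake_path)
                     .withColumn("f_name", input_file_name()))
    index = (datalake.groupBy(col_name)
                     .agg(collect_set("f_name").alias("f_names"))
                     .orderBy(col_name))
    index.write.mode("overwrite").parquet(index_path)
    # create_root_index(index_path): scan each index file's metadata and
    # record its (min, max) border values in a single root file (Table 4)

create_index("s3://my-data-lake/", "s3://my-index/col:event-id/", "event-id")
```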
Algorithm 1 creates an index for an existing data lake (i.e., the static scenario). In the dynamic scenario, batches of new files are added to the lake periodically during scheduled Extract-Transform-Load (ETL) processes. To support the dynamic scenario, we should be able to update our index in accordance with the new data. Our algorithm for index update, presented in Spark-like syntax in Algorithm 2, runs in parallel and tries to modify as few existing index files as possible.

Algorithm 2 Update Index
1: procedure UpdateIndex(path-to-new-files, index-path, col-name)
2:   ranges = getIndexRanges(index-path, col-name)
3:   delta = read(path-to-new-files).select(col-name, "f-name")
4:   labeled-delta = delta.join(broadcast(ranges))
5:   old-index = read(labeled-delta.select(index-file-name))
6:   new-index = labeled-delta.union(old-index)
7:   new-index.repartition(index-file-name)
8:   run CreateIndex on new-index and override the old index files and the root-index
9: end procedure

It works as follows:
• First (line 2), we read the root index to get "ranges": a set of tuples (min, max, index-file-name) for the given column (the result may look similar to Table 4).
• Then (lines 3-4), we use the broadcast join technique [16] to compute "labeled-delta": the values from the delta files, labeled with the corresponding "index-file-name" from the "ranges" set. Considering the example in Tables 1-4, the result would look like a set of triples (event-id, data-file-name, index-file-name); for example, (450, file1111, index13), (645, file1112, index23).
• Finally (lines 5-8), we use the index file names computed in labeled-delta to identify the index files that should be changed. We read these files and combine their data with the delta files. Then, we apply Algorithm 1 to this data and override the relevant existing index files and the root index.
• Note that we need to re-partition our data by "index-file-name" (line 7) to ensure that there is no mix between different index files and that each index file is computed by a different process.

3.3 Index Usage

In this section, we explain how to use our index format to implement $getFiles(Q)$ (pseudo-code is presented in Algorithm 3).

Algorithm 3 Get Files
1: procedure GetFiles(Q, index-path)
2:   col-name, col-value = extract-col-values(Q)
3:   root-index = readRootIndex(index-path)
4:   index-file = root-index.getFile(col-name, col-value)
5:   meta = readMetadata(index-file)
6:   chunk-id = meta.getChunkIdByValue(col-value)
7:   map = readDataChunk(index-file, chunk-id)
8:   fileNames = map.get(col-value)
9:   if fileNames is not empty then
10:    return fileNames
11:  else
12:    return VALUE-NOT-EXISTS
13:  end if
14: end procedure

Upon receiving a "needle in a haystack" query $Q$ and a path to the index location, we first extract the column name and the column value from the "where" clause of the query (line 2). We then read the root index and find to which index file the given value belongs (lines 3-4). Once we know which index file has information about our value, we perform the following:
• we read the metadata part of the file;
• we find to which data chunk our value belongs, based on the min/max values;
• once we have found the chunk our value belongs to, we read the (column value, List⟨file names⟩) map from the specified offset;
• if our value appears in the map, we are done, and the method returns the file names list;
• otherwise, we know that the requested value is not in the index, so we exit with a relevant flag.

Note that Algorithm 3 does not require parallel processing and can run as a standalone function (a sketch over S3 follows below).
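The following standalone sketch (ours, not the paper's exact code) follows Algorithm 3 against S3, assuming the JSON layout from the Section 3.1 sketch, a hypothetical bucket, a line-delimited JSON root index whose entries also carry each index file's metadata location, and string-comparable values. Only the root index, one metadata section, and one data chunk are fetched, matching the at-most-three index reads of Section 4.

```python
# A sketch of Algorithm 3: root-index lookup, then two byte-range reads
# (metadata section, then one data chunk) from a single index file.
import json

import boto3

s3 = boto3.client("s3")

def read_range(bucket, key, offset, length):
    """Fetch one byte range of an object (a single remote read R)."""
    rng = f"bytes={offset}-{offset + length - 1}"
    return s3.get_object(Bucket=bucket, Key=key, Range=rng)["Body"].read()

def get_files(col_name, col_value, bucket="my-index"):
    body = s3.get_object(Bucket=bucket, Key="root-index")["Body"].read()
    root = [json.loads(line) for line in body.splitlines()]
    entry = next((e for e in root if e["col"] == col_name
                  and e["min"] <= col_value <= e["max"]), None)
    if entry is None:
        return "VALUE-NOT-EXISTS"
    meta = json.loads(read_range(bucket, entry["file"],
                                 entry["meta_offset"], entry["meta_length"]))
    chunk = next((c for c in meta
                  if c["min"] <= col_value <= c["max"]), None)
    if chunk is None:
        return "VALUE-NOT-EXISTS"
    mapping = json.loads(read_range(bucket, entry["file"],
                                    chunk["offset"], chunk["length"]))
    return mapping.get(col_value, "VALUE-NOT-EXISTS")
```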
3.4 Optimizations

In this section, we briefly discuss several potential optimizations of our basic scheme. We plan to explore some of them in depth in future work.

• Bloom filter: In Algorithm 3, we perform two reads from an index file that may contain the queried value even if the value does not exist in the data lake at all. A possible solution to this issue is to add to the metadata section a Bloom filter [17] containing all the column values in the index file, and to check it before querying the index data chunk. This way, in most cases, we will perform a single read operation for a non-existing value, at the price of a larger metadata section in each index file.
• Caching: By caching the root index and the metadata sections on the query engine side, we can significantly reduce the number of network operations per query.
• Compression/Encoding: We can apply different encoding and compression techniques to reduce the storage overhead of our index. For example, instead of storing file name lists as strings, we can use compressed bit vectors [21]; metadata offsets can be encoded with delta encoding [35].

Table 5: Notation

Symbol   Description
D        A data lake
C        Data lake column names
V        Data lake column values
F        Data lake files
N        Number of files in the data lake
K        Number of data chunks in an index file
M        Number of values in a data chunk
Q        A query
R        Read operation from remote storage
F(Q)     Files that contain tuples that satisfy Q
|x|      Size of object x
C_x      Cost of operation x

4 PERFORMANCE ANALYSIS

In this section, we analyze our solution from a performance perspective. We first show how our index improves query performance, and then we show the performance overhead imposed by it.

The cost of a query $Q$ is dominated by the cost of the read operations from remote storage. When a columnar format is used, irrelevant columns can be skipped, but at least some data must be read from each file. Thus, using the notation of Table 5, we can define the cost of query $Q$ as:

$$C_Q = C_R \cdot |F| \quad (1)$$

When using our index, we read only the relevant data files $F(Q)$ and perform at most three read operations on the index files (for the root index, the metadata, and the data chunk). Hence, the cost is reduced to:

$$C_Q = C_R \cdot (|F(Q)| + 3) \quad (2)$$

Thus, the performance is improved by a factor of:

$$\frac{|F|}{|F(Q)| + 3} \quad (3)$$

Since our focus is on "needle in a haystack" queries, we assume that $|F(Q)|$ is a small number (typically below 100), while $|F|$ in real-world data lakes can reach millions [42]. Thus, we can expect a reduction of the query cost by an order of magnitude.
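To make Equation 3 concrete, consider a worked instance with illustrative numbers of our own (they are not measurements from this paper): for a lake of $|F| = 10{,}000$ files and a query matched by $|F(Q)| = 10$ files,

$$\frac{|F|}{|F(Q)| + 3} = \frac{10{,}000}{10 + 3} \approx 770,$$

i.e., the engine reads 13 objects instead of 10,000.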
Our solution imposes overhead in the following two dimensions:
• Storage: The additional storage required for our index has an upper limit of $|D|$ (in the case that all columns are indexed). In a more realistic scenario, where only a few of the data lake columns are indexed, the storage overhead is expected to be much lower than $|D|$. For example, in our experimental evaluation, an index on a unique column requires around 5% of the data lake size (exact numbers are presented in Section 5.1).
• CreateIndex/UpdateIndex: Algorithms 1 and 2 are supposed to run offline, without affecting query performance. Their cost is dominated by the cost of reading the relevant data lake files, creating the index via the map-reduce flow, and writing the index into distributed storage.

In summary, our solution imposes a reasonable storage and (offline) computation overhead but improves query performance by an order of magnitude (Equations 1-3).

5 IMPLEMENTATION AND EXPERIMENTAL RESULTS

We have implemented a prototype of our solution (Algorithms 1, 3); the implementation and extended results are available online [46]. For the experimental evaluation, we cooperated with a large enterprise company that manages cloud data lakes at petabyte scale. We used an anonymized subset of the real data lake for our experiments.

The data lake we used is stored in AWS S3 cloud storage. Its properties are as follows:
• 1,242 files in Parquet format (compressed with Snappy)
• 605,539,843 total records
• each record has a unique identifier (8 bytes)
• 233 columns (mainly strings, but also numbers and dates)
• 54.2 GB (total compressed size)

Our evaluation focused on the following stages:
(1) Index Creation: We created an index according to our approach (Algorithm 1) and according to a naive approach based on a key-value store. Then, we compared the index creation time and the index storage size of the two approaches.
(2) Index Usage: We queried random values from the data lake in different ways (with and without an index) and compared the query execution times of the different modes.

5.1 Index Creation Evaluation

We built an index on two data lake columns:
• record-id (Long): a unique identifier of each record in the data lake (distinct count: 605,539,843);
• event-id (String): a unique identifier of each event in the data lake (distinct count: 99,765,289); each event-id is mapped to 6 record-ids on average.

For index creation, we used Apache Spark (2.4.4) running on AWS EMR (5.29.0). The EMR cluster had 15 nodes of type r5d.2xlarge (8 cores, 64 GB memory, 300 GB SSD storage). Our index was stored in AWS S3 as a collection of Parquet files compressed with Snappy and with a chunk size of 10 MB; the root index was stored in a single line-delimited JSON file. The key-value index was stored in Apache Cassandra (3.4.4) running on AWS EC2 (3 nodes of type i3.xlarge, a replication factor of 1, LZ4 compression).

The experimental results of index creation are presented in Table 6 and Figures 5 and 6.

Table 6: Index creation results

column name      our index (S3)                Cassandra index
record-id (pk)   compute time: 3 min           compute time: 4 hours, 23 minutes
                 storage size:                 storage size: 13.2 GB
                 1 root file - 5.5 KB,
                 30 index files - 3.1 GB
event-id         compute time: 3 min           compute time: 47 min
                 storage size:                 storage size: 4.4 GB
                 1 root file - 5.3 KB,
                 30 index files - 5.2 GB

Figure 5: Index Creation Time

Figure 6: Index Creation Storage

It is worth noting that even though our primary reason for implementing the data lake index on top of object stores was reducing the monetary cost, our experiments reveal an additional benefit of this decision. Index creation in our approach is an order of magnitude faster than in the approach based on a key-value store. In addition, in our approach, the storage size of the index is much smaller for a unique column and roughly the same for a non-unique column.

5.2 Index Usage Evaluation

For the index usage evaluation, we performed the following test for the indexed columns (record-id, event-id):
(1) get 10 random column values from the data lake;
(2) for each column value, do:
(a) run "select * from data-lake where column = column-value";
(b) print the query execution time.

We executed this test in the following modes and compared their results (a sketch of the measurement loop follows below):
(1) SparkSQL with our index (Algorithm 3)
(2) SparkSQL with the Cassandra index
(3) SparkSQL without an index
(4) AWS Athena [3] without an index

SparkSQL was executed on the same cluster as in the previous section (AWS EMR with 15 nodes). AWS Athena (Amazon's serverless query engine based on Presto [42]) ran on an AWS Glue [7] table created on top of the data lake Parquet files. The average results of the 10 runs for each of the modes are presented in Fig. 7.

Figure 7: Data Lake Queries Latency

The experimental results confirm our performance analysis in Section 4, as it can clearly be seen that our solution (as well as the Cassandra-based one) outperforms the existing query engines (Spark and Athena) by an order of magnitude.
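The following is a sketch (ours) of the measurement loop described in the test above, assuming a hypothetical Spark table "data_lake" registered over the lake and an already-created SparkSession; .collect() forces full query execution before timing stops.

```python
# A sketch of the Section 5.2 test: sample n random values of the indexed
# column, then time a full-row selection for each sampled value.
import time

def measure(spark, column, n=10):
    sample = spark.sql(
        f"SELECT {column} FROM data_lake ORDER BY rand() LIMIT {n}").collect()
    for (value,) in sample:
        start = time.time()
        spark.sql(
            f"SELECT * FROM data_lake WHERE {column} = '{value}'").collect()
        print(f"{column} = {value}: {time.time() - start:.2f}s")
```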
To better understand the trade-offs between our solution and the Cassandra-based one, we also compared their execution times for computing $getFiles(Q)$. The results are presented in Fig. 8. We evaluated two versions of our approach, with and without caching of the root index (see Section 3.4). Cassandra's latency is lower than that of our approach, but when a cache is used, the difference is not significant (less than a second).

Figure 8: Get Files Latency

To summarize, as expected, adding indexing to data lakes drastically improves query performance. In this evaluation, we have demonstrated that the index can be implemented in two different ways: a trivial approach based on key-value stores and our approach based on cloud object stores. Both approaches improve query execution time in an almost identical way (the differences are at millisecond resolution). However, our approach has the following advantages over the naive solution:
(1) an order of magnitude monetary cost improvement (Table 2);
(2) an order of magnitude index creation time improvement (Fig. 5);
(3) significantly less storage overhead in some cases (Fig. 6).

6 RELATED WORK

Data lakes are a relatively new approach for storing and analyzing large amounts of data. As a new technology, data lakes still lack a formal definition, and in the existing literature, different authors assign different (and sometimes even contradictory) meanings to the concept [29, 41]. In this paper, we stick to the definition from the recent Seattle Report on Database Research [2], where the main characteristic of the data lake architecture is that it "disaggregates compute and storage, so they can scale independently."

There are several research challenges in the data lakes domain. The most fundamental ones are knowledge discovery [30] and various optimizations [2]. We focus on the latter and study the optimization of a specific scenario in data lake systems: "needle in a haystack" queries. Our solution can be seen as a combination of different techniques from the database and big data domains. Thus, we use partitions, as in [18], to organize our index according to columns. For scalability, we use a root index, as in B+ trees [40] and some distributed databases (e.g., [22]). The format of our index files is similar to columnar formats [28], with multiple chunks of data and a single metadata section. We use Bloom filters [17] to improve the performance of queries on non-existing values. The parallel processing paradigm [25, 47] is used for index creation and updating.

At a high level, our solution resembles the well-known inverted index structure, which is extensively used in full-text search engines [14, 20, 40, 48]. An inverted index maps words to the lists of documents that contain them, and for data sets of moderate size, it can be implemented with standard index structures such as a B+ tree or a hash index [40]. For "big data" volumes, distributed inverted indexes were proposed [48]. The two main techniques used in distributed inverted indexing are document-based [14] and term-based [20] partitioning. Both techniques assume combined compute and storage layers, while in our system model, the main assumption is that compute and storage are separated.

In [27], the authors study inverted indexes in dataspaces ("collections of heterogeneous and partially unstructured data"). Their main focus is on the heterogeneity of the data, and very large indexes are not considered (the index implementation is based on a single-node Lucene engine [15]); in our solution, the focus is on structured data, yet we expect very large volumes of both data and indexes. In [26], indexing in big data is achieved by enhancing the MapReduce load process via user-defined functions (UDFs). However, this solution is limited to a particular system (Hadoop), where the compute (MapReduce) and storage (HDFS) components run on the same machines; our focus is on a disaggregated architecture. A system model similar to ours is considered in [19], as well as in multiple industry approaches (e.g., [10]); however, in these approaches, the index is stored in a key-value database, which, as we show in this work, has two severe drawbacks compared to our solution: a significantly higher monetary cost and a much longer load time.

To the best of our knowledge, our work is the first research attempt to improve query performance in cloud data lakes by building indexing on top of object stores.
However, two industry projects focus on the same domain: Microsoft Hyperspace [39] and Databricks Delta Lake [23]. Both assume the data lake architecture and try to improve query performance by using indexes stored in object stores. Below, we briefly discuss how these projects differ from our approach, based on the available online information about these systems [23, 39].

• Hyperspace currently uses only "covering indexes" and plans to support other index types in the future. Covering indexes are built upon user-provided column lists, "indexed columns" and "data columns", and aim to improve the performance of queries that search by the "indexed columns" and retrieve the "data columns". The implementation is based on copying both the "indexed" and the "data" columns from the data lake and storing them in a columnar format sorted by the "indexed columns". While covering indexes can improve some types of queries, they are not suitable for "needle in a haystack" queries, where retrieval of all the columns should be supported (e.g., for the GDPR scenario).
• Delta Lake uses Bloom filter indexes in the following way. Each file has an associated file that contains a Bloom filter over the values of the indexed columns from that file. Then, upon a user query, the Bloom filters are checked before reading the files, and a file is read only if the value was found in the corresponding Bloom filter. We use a similar technique (Section 3.4), but in our case, only a single Bloom filter is read for a particular value, while in Delta Lake, all the existing Bloom filters must be read.
7 CONCLUSION

In recent years, cloud data lakes have become a prevalent way of storing large amounts of data. The main implication is a separation between the storage and compute layers and, as a result, a rise of new technologies in both the compute and the storage domains. Query engines, the main players in the compute domain, are designed to run on in-place data that is usually stored in a columnar format. Their main focus is on analytical queries, while random reads are overlooked.

In this paper, we present our approach for optimizing needle-in-a-haystack queries in cloud data lakes. The main idea is to maintain an index structure that maps indexed column values to their files. We built our solution in accordance with the data lake architecture, where the storage is completely separated from the compute. The decision to build our index on top of object stores was motivated by the observation that the monetary cost of this solution is significantly lower than that of one based on a DBMS. Our experimental evaluation provides an additional reason to favor object stores over the DBMS approach: a much lower index computation time.

For future work, we plan to extend our index scheme to support more complex query types (e.g., joins, group by). An additional research direction may be the implementation of other systems according to the data lake architecture, thereby improving their monetary cost and load time. For example, implementing a key-value store on top of object stores seems to be a promising research direction, which can be based on the ideas presented in this work.
REFERENCES

[1] Daniel Abadi et al. 2015. SQL-on-Hadoop systems: tutorial. Proceedings of the VLDB Endowment 8, 12 (2015), 2050–2051.
[2] Daniel Abadi et al. 2020. The Seattle Report on Database Research. ACM SIGMOD Record 48, 4 (2020), 44–53.
[3] Amazon. 2020. Amazon Athena. Retrieved November 14, 2020 from https://aws.amazon.com/athena
[4] Amazon. 2020. Amazon DynamoDB On-Demand Pricing. Retrieved November 14, 2020 from https://aws.amazon.com/dynamodb/pricing/on-demand/
[5] Amazon. 2020. Amazon EC2 On-Demand Pricing. Retrieved November 14, 2020 from https://aws.amazon.com/ec2/pricing/on-demand/
[6] Amazon. 2020. Amazon S3 pricing. Retrieved November 14, 2020 from https://aws.amazon.com/s3/pricing/
[7] Amazon. 2020. AWS Glue. Retrieved November 14, 2020 from https://aws.amazon.com/glue
[8] Amazon. 2020. Best Practices for Amazon EMR. Retrieved November 14, 2020 from https://d0.awsstatic.com/whitepapers/aws-amazon-emr-best-practices.pdf
[9] Amazon. 2020. Best Practices for Running Apache Cassandra on Amazon EC2. Retrieved November 14, 2020 from https://aws.amazon.com/blogs/big-data/best-practices-for-running-apache-cassandra-on-amazon-ec2/
[10] Amazon. 2020. Building and Maintaining an Amazon S3 Metadata Index without Servers. Retrieved November 14, 2020 from https://aws.amazon.com/blogs/big-data/building-and-maintaining-an-amazon-s3-metadata-index-without-servers/
[11] Amazon. 2020. DynamoDB. Retrieved November 14, 2020 from https://aws.amazon.com/dynamodb/
[12] Amazon. 2020. S3. Retrieved November 14, 2020 from https://aws.amazon.com/s3/
[13] Michael Armbrust, Reynold S. Xin, Cheng Lian, Yin Huai, Davies Liu, Joseph K. Bradley, Xiangrui Meng, Tomer Kaftan, Michael J. Franklin, Ali Ghodsi, et al. 2015. Spark SQL: Relational data processing in Spark. In Proceedings of the 2015 International Conference on Management of Data (SIGMOD). 1383–1394.
[14] Luiz André Barroso, Jeffrey Dean, and Urs Hölzle. 2003. Web search for a planet: The Google cluster architecture. IEEE Micro 23, 2 (2003), 22–28.
[15] Andrzej Białecki, Robert Muir, Grant Ingersoll, and Lucid Imagination. 2012. Apache Lucene 4. In SIGIR 2012 Workshop on Open Source Information Retrieval. 17.
[16] Spyros Blanas, Jignesh M. Patel, Vuk Ercegovac, Jun Rao, Eugene J. Shekita, and Yuanyuan Tian. 2010. A comparison of join algorithms for log processing in MapReduce. In Proceedings of the 2010 ACM SIGMOD International Conference on Management of Data (SIGMOD). 975–986.
[17] B. H. Bloom. 1970. Space/time trade-offs in hash coding with allowable errors. Commun. ACM 13, 7 (1970), 422–426.
[18] Jesús Camacho-Rodríguez, Ashutosh Chauhan, Alan Gates, Eugene Koifman, Owen O'Malley, Vineet Garg, Zoltan Haindrich, Sergey Shelukhin, Prasanth Jayachandran, Siddharth Seth, et al. 2019. Apache Hive: From MapReduce to enterprise-grade big data warehousing. In Proceedings of the 2019 International Conference on Management of Data (SIGMOD). 1773–1786.
[19] Jesús Camacho-Rodríguez, Dario Colazzo, and Ioana Manolescu. 2013. Web data indexing in the cloud: efficiency and cost reductions. In Proceedings of the 16th International Conference on Extending Database Technology (EDBT). 41–52.
[20] B. Barla Cambazoglu et al. 2013. A term-based inverted index partitioning model for efficient distributed query processing. ACM Transactions on the Web (TWEB) 7, 3 (2013), 1–23.
[21] Samy Chambi et al. 2016. Better bitmap performance with Roaring bitmaps. Software: Practice and Experience 46, 5 (2016), 709–719.
[22] Fay Chang et al. 2008. Bigtable: A distributed storage system for structured data. ACM Transactions on Computer Systems (TOCS) 26, 2 (2008), 1–26.
[23] Databricks. 2020. Delta Lake. Retrieved November 14, 2020 from https://docs.databricks.com/delta
[24] Databricks. 2020. Top 5 Reasons for Choosing S3 over HDFS. Retrieved November 14, 2020 from https://databricks.com/blog/2017/05/31/top-5-reasons-for-choosing-s3-over-hdfs.html
[25] Jeffrey Dean and Sanjay Ghemawat. 2008. MapReduce: simplified data processing on large clusters. Commun. ACM 51, 1 (2008), 107–113.
[26] Jens Dittrich et al. 2010. Hadoop++: making a yellow elephant run like a cheetah (without it even noticing). Proceedings of the VLDB Endowment 3, 1-2 (2010), 515–529.
[27] Xin Dong and Alon Halevy. 2007. Indexing dataspaces. In Proceedings of the 2007 ACM SIGMOD International Conference on Management of Data (SIGMOD). 43–54.
[28] Avrilia Floratou et al. 2014. SQL-on-Hadoop: Full circle back to shared-nothing database architectures. Proceedings of the VLDB Endowment 7, 12 (2014), 1295–1306.
[29] Corinna Giebler et al. 2019. Leveraging the Data Lake: Current State and Challenges. In International Conference on Big Data Analytics and Knowledge Discovery (DaWaK). Springer, 179–188.
[30] Paolo Giudice et al. 2019. An approach to extracting complex knowledge patterns among concepts belonging to structured, semi-structured and unstructured sources in a data lake. Information Sciences 478 (2019), 606–626.
[31] Google. 2020. Google Cloud Storage. Retrieved November 14, 2020 from https://cloud.google.com/storage
[32] Michael Hausenblas and Jacques Nadeau. 2013. Apache Drill: interactive ad-hoc analysis at scale. Big Data 1, 2 (2013), 100–104.
[33] Marcel Kornacker et al. 2015. Impala: A Modern, Open-Source SQL Engine for Hadoop. In Proceedings of the 7th Conference on Innovative Data Systems Research (CIDR), Vol. 1. 9.
[34] Avinash Lakshman and Prashant Malik. 2010. Cassandra: a decentralized structured storage system. ACM SIGOPS Operating Systems Review 44, 2 (2010), 35–40.
[35] Daniel Lemire and Leonid Boytsov. 2015. Decoding billions of integers per second through vectorization. Software: Practice and Experience 45, 1 (2015), 1–29.
[36] Sergey Melnik et al. 2011. Dremel: interactive analysis of web-scale datasets. Commun. ACM 54, 6 (2011), 114–123.
[37] Microsoft. 2020. Azure Blob Storage. Retrieved November 14, 2020 from https://azure.microsoft.com/services/storage/blobs/
[38] Microsoft. 2020. Azure subscription and service limits, quotas, and constraints. Retrieved November 14, 2020 from https://docs.microsoft.com/en-us/azure/azure-resource-manager/management/azure-subscription-service-limits
[39] Microsoft. 2020. Hyperspace. Retrieved November 14, 2020 from https://microsoft.github.io/hyperspace
[40] Raghu Ramakrishnan and Johannes Gehrke. 2000. Database Management Systems. McGraw-Hill.
[41] Franck Ravat and Yan Zhao. 2019. Data lakes: Trends and perspectives. In International Conference on Database and Expert Systems Applications (DEXA). Springer, 304–313.
[42] Raghav Sethi, Martin Traverso, Dain Sundstrom, David Phillips, Wenlei Xie, Yutian Sun, Nezih Yegitbasi, Haozhun Jin, Eric Hwang, Nileema Shingte, et al. 2019. Presto: SQL on everything. In Proceedings of the 35th International Conference on Data Engineering (ICDE). IEEE, 1802–1813.
[43] Konstantin Shvachko, Hairong Kuang, Sanjay Radia, and Robert Chansler. 2010. The Hadoop Distributed File System. In Proceedings of the 26th Symposium on Mass Storage Systems and Technologies (MSST). IEEE, 1–10.
[44] Junjay Tan et al. 2019. Choosing a cloud DBMS: architectures and tradeoffs. Proceedings of the VLDB Endowment 12, 12 (2019), 2170–2182.
[45] IT Governance Privacy Team. 2020. EU General Data Protection Regulation (GDPR): An Implementation and Compliance Guide. IT Governance Ltd.
[46] Grisha Weintraub. 2020. data-lake-index. Retrieved November 14, 2020 from https://github.com/grishaw/data-lake-index
[47] Matei Zaharia et al. 2016. Apache Spark: a unified engine for big data processing. Commun. ACM 59, 11 (2016), 56–65.
[48] Justin Zobel and Alistair Moffat. 2006. Inverted files for text search engines. ACM Computing Surveys (CSUR) 38, 2 (2006), 6–es.