=Paper=
{{Paper
|id=Vol-2841/DARLI-AP_1
|storemode=property
|title=Needle in a haystack queries in cloud data lakes
|pdfUrl=https://ceur-ws.org/Vol-2841/DARLI-AP_1.pdf
|volume=Vol-2841
|authors=Grisha Weintraub,Ehud Gudes,Shlomi Dolev
|dblpUrl=https://dblp.org/rec/conf/edbt/WeintraubGD21
}}
==Needle in a haystack queries in cloud data lakes==
Grisha Weintraub (grishaw@post.bgu.ac.il), Ehud Gudes (ehud@cs.bgu.ac.il), Shlomi Dolev (dolev@cs.bgu.ac.il)
Ben-Gurion University of the Negev, Beer-Sheva, Israel
ABSTRACT

Cloud data lakes are a modern approach for storing large amounts of data in a convenient and inexpensive way. Query engines (e.g. Hive, Presto, SparkSQL) are used to run SQL queries on data lakes. Their main focus is on analytical queries, while random reads are overlooked. In this paper, we present our approach for optimizing needle in a haystack queries in cloud data lakes. The main idea is to maintain an index structure that maps indexed column values to their files. According to our analysis and experimental evaluation, our solution imposes a reasonable storage overhead while providing an order of magnitude performance improvement.

© 2021 Copyright for this paper by its author(s). Published in the Workshop Proceedings of the EDBT/ICDT 2021 Joint Conference (March 23–26, 2021, Nicosia, Cyprus) on CEUR-WS.org. Use permitted under Creative Commons License Attribution 4.0 International (CC BY 4.0).

1 INTRODUCTION

Data lakes are a relatively new concept, which has recently received much attention from both the research community and industry [2, 29, 41]. There is no formal definition of the term data lake, but it is commonly described as a centralized repository containing very large amounts of data in their original (or minimally processed) format. The main idea behind this approach is that instead of loading the data into traditional data warehouses, which require scrupulous schema design and incur high development and maintenance costs, the data is simply streamed into a distributed storage system (e.g. HDFS [43]) and from that moment becomes available for analytics. Cloud data lakes are managed by cloud providers and store their data in cloud object stores, such as AWS S3 [12], Google Cloud Storage [31], and Azure Blob Storage [37]. For analytics, compute engines (e.g. Spark [47], MapReduce [25]) are used "on-demand". They access cloud data lakes via a simple get/put API over the network; this separation of the storage and compute layers results in a lower cost and allows independent scaling of each layer (Fig. 1).

Figure 1: Data Lakes Architecture

A plethora of new systems, often referred to as query engines [1], have been developed in recent years to support SQL queries on top of data lakes. Hive [18], originally built at Facebook, was the first such system. In Hive, SQL-like queries are compiled into a series of map-reduce jobs and executed on the MapReduce [25] framework. A somewhat similar approach is implemented in Spark SQL [13], where SQL queries run on top of the Spark engine [47].

Another group of query engines does not use general-purpose frameworks such as MapReduce or Spark, but relies on their own engines, built on ideas from parallel databases. The first such system was Google's Dremel [36], which inspired many modern query engines; the most well-known are Drill [32], Presto [42], and Impala [33].

The following techniques are commonly associated with query engines running on data lakes:

• in-situ processing: In-situ means that the data is accessed in place (i.e. in HDFS or a cloud object store), and there is no need to load the data into a database management system (DBMS). Thanks to this feature, the same data lake can be queried simultaneously by different query engines without any difficulties.
• parallel processing: The data is accessed in parallel (usually by a cluster of dedicated machines).
• columnar storage: The main idea is that instead of storing the data row by row, as is customary in relational databases, the data is stored column by column. The most important benefits of such column-wise storage are fast projection queries and efficient column-specific compression and encoding techniques. Some of the popular open source formats are ORC and Parquet [28].
• data partitioning: The idea is simple and yet powerful. The data is partitioned horizontally based on some of the input columns. Then, files related to each partition are stored in a different directory in the storage system (see the example below).

Let us demonstrate how these techniques look in a typical real-world scenario. Consider a large enterprise that receives many events (e.g. from IoT sensors) and stores them in a data lake. The data lake would look like a collection of files (Fig. 2) stored in some distributed storage system. In our example, the data lake is partitioned by the year and month columns, so, for example, events from March 2020 are located in the path data-lake/year:2020/month:03/. Events are stored in the columnar format Parquet, so queries on specific columns scan only the relevant chunks of data. When a query engine executes a query, it scans all the relevant files in parallel and returns the result to the client.

Figure 2: Data lake layout in storage system

data-lake
  year:2020
    month:02
      file201.parquet
      file170.parquet
    month:03
      file051.parquet
  year:2019
    month:...
      ...

Data lakes and query engines are optimized for analytic queries; a typical query calculates aggregate values (e.g. sum, max, min) over a specific partition. For such queries, all the files under the specified partition should be scanned.

However, sometimes users may be merely interested in finding a specific record in the data lake (a.k.a. a needle in a haystack). Consider, for example, a scenario where a specific event should be fetched from the lake based on its unique identifier for troubleshooting or business flow analysis. Another important use case is managing General Data Protection Regulation (GDPR) compliance [45], where records related to a specific user should be found in the data lake upon user request. Unfortunately, even when the requested information resides in a single file, query engines will scan all the files in the data lake (which can be millions). Obviously, this behavior results in long query latency and an expensive computation cost.

In this paper, we present our approach for optimizing "needle in a haystack" queries in cloud data lakes. The main idea is to build an index structure that maps indexed column values to the files that contain those values. We assume enormous data volumes in cloud data lakes, and hence, we need our index to be highly scalable in both the compute and storage sense. For scalable compute, we utilize the parallel processing paradigm [25, 47]; for scalable storage, we design a new storage format that allows unlimited scaling while minimizing the number of network operations during reads.

Our main contributions are as follows:

• Development of a novel index structure that allows speeding up random queries in cloud data lakes.
• A prototype implementation of our solution and its experimental evaluation in a real-world cloud data lake.

The rest of the paper is structured as follows. In Section 2, we formally define the problem. In Section 3, we describe the proposed solution. In Section 4, we provide a performance analysis of our solution. In Section 5, we introduce our proof-of-concept implementation and provide an experimental evaluation thereof. In Section 6, we review related work. We conclude in Section 7.

2 PROBLEM STATEMENT

We model a data lake as a set of tuples D = {<c1:v1, c2:v2, ..., cn:vn> | ∀ 1 ≤ i ≤ n, ci ∈ C, vi ∈ V}, where C is the set of column names and V is the set of column values (see the notation in Table 5). Data lake D is stored in a distributed storage system as a collection of N files, denoted by F = {f1, f2, ..., fN}.

Query engines support SQL queries on data lakes. In this paper, our focus is on simple selection queries of the type:

SELECT C1, C2, ...
FROM DATA-LAKE
WHERE SOME-COLUMN = SOME-VALUE

Files that contain tuples that satisfy query Q are denoted by F(Q). As already mentioned in the previous section, the current approach to performing query Q is to scan all F files in parallel, which is wasteful, as only the F(Q) files should be scanned, and in many cases |F| >> |F(Q)|. If we knew F(Q) for a given Q, we would read only the relevant files and so would be able to significantly improve query performance. Thus, the problem we are trying to solve in this paper can be defined as: "construct a function getFiles(Q) that receives a query Q and returns F(Q) - the list of files that contain tuples that satisfy Q".

Fig. 3 presents a high-level diagram of the considered system model.

Figure 3: System model

(1) A client submits a query to the query engine
(2) The query engine queries the index to get the relevant file names
(3) The query engine reads the relevant files from the data lake
(4) The query engine computes the query result and returns it to the client

An obvious approach to implementing getFiles(Q) is to create a simple inverted index [40] that stores a mapping between the values of the relevant columns and the files that contain these values (Table 1). An open question is: what is the best way to do that in a big data environment? More formally, we are looking for a solution that satisfies the following constraints:

(1) scalability: we want our index to be able to handle very large and growing amounts of data
(2) performance: we need our index to be able to respond to lookup queries in a reasonable time (optimally sub-seconds)
(3) cost: we want to minimize the monetary cost of our solution as much as possible

Table 1: Index content example

Column Name | Column Value | File Names
event-id    | 207          | file051, file033
event-id    | 350          | file002
event-id    | 418          | file170, file034
...         | ...          | ...
client-ip   | 192.168.1.15 | file170
client-ip   | 172.16.254.1 | file883, file051

The trivial solution would be to store our index in a distributed NoSQL database (e.g. Cassandra [34], DynamoDB [11]) and update it with parallel compute engines (e.g. Spark, MapReduce). However, while this approach may satisfy the first two of our requirements (scalability and performance), its monetary cost is far from optimal. Let us show why by using some real numbers.

We have two main alternatives for using a database service in the cloud environment: IaaS and DaaS.
(1) IaaS: The "Infrastructure-as-a-Service" model provides users with the requested number of virtual machines, on which they can install a DBMS and use it as they wish (e.g. Cassandra on top of EC2).
(2) DaaS: The "Database-as-a-Service" model provides users with a database API without the need to manage hardware or software (e.g. DynamoDB).

In Table 2¹, we compare the monetary cost of storing data in each of these alternatives against storing it in a cloud object store (similarly to how the data is stored in cloud data lakes). It can clearly be seen that an object store is at least one order of magnitude lower-priced than any other option. Since our focus is on real-world cloud data lakes (i.e. petabyte scale and beyond), we need to support indexes of very large volumes as well (an index on a unique column of data lake D will need O(|D|) storage). Thus, the potential monetary savings of implementing indexing using object stores, instead of the naive solution based on key-value stores, can be tremendous.

Table 2: Storage cost comparison

                 | cost of 1TB/year | pricing info references
Cassandra on EC2 | 16,164 $         | [5, 9]
DynamoDB         | 2,925 $          | [4]
S3               | 276 $            | [6]

¹ For simplicity, we ignore some factors that cannot change the general trend, like the maintenance cost of Cassandra, the cost of requests in S3 and DynamoDB, and the fact that the data in object stores can be compressed. Our goal is only to show that object stores are significantly cheaper than the other options. We also provide numbers for specific systems (Cassandra, DynamoDB, AWS), but the same cost trend is preserved in other similar systems as well.

This observation is not new, and the cost superiority of object stores is a well-known fact in both industry [24] and the research community [44]. However, object stores are not suitable for every case. More specifically, when comparing them to key-value stores, they have the following caveats:

(1) higher latency: the actual numbers may vary, but usually object stores have higher latency than a DBMS [44]
(2) limited request rate: while the request rate is nearly unlimited in scalable databases, in object stores there is an upper limit (typically around thousands per second [38])
(3) optimized for large objects: object stores are optimized for storing large objects (typically GBs [8]) and hence are not suitable for systems that need random reads
(4) no random writes: the only way to mutate objects in object stores is to replace them, which may be problematic for systems that need random writes

While the first two points ("higher latency" and "limited request rate") are usually acceptable in OLAP systems, the last two ("large objects" and "no random writes") introduce significant implementation challenges. In our solution, we address both these challenges.

To summarize, in this work, our research goal is twofold:

• to implement getFiles(Q) and thereby improve random read performance in cloud data lakes
• to build our solution on top of object stores, so that the additional monetary cost imposed by our scheme is as low as possible

3 OUR APPROACH

In Sections 3.1, 3.2, and 3.3 below, we explain how exactly we store, compute, and use our index. In Section 3.4, we discuss potential optimizations to our basic scheme.

3.1 Index Storage

We store our index as a collection of files partitioned by the indexed column name (Fig. 4). Each index file is responsible for some range of column values and stores the relevant information about this range in the format depicted in Table 3.

Figure 4: Index storage layout example

index
  col:client-ip
    index03
    index07
  col:event-id
    index13
    index23
  col:...
    ...

There are two sections in the index file - data and metadata. The data part consists of a map between a column value and a list of corresponding file names, ordered by column value. The map is divided into K chunks based on a predefined number of values per chunk M, such that each of the K chunks contains a mapping for M consecutive values of the particular indexed column. For example, in the index presented in Table 3, we have M = 3, and the first chunk contains a mapping between 3 column values (207, 350, 418) and the file names that contain tuples with these values.

The metadata stores information about the location of the data chunks inside the index file, along with their "border" values. Considering the example in Table 3 again, from the metadata section we learn that the first chunk's minimal value is 207, its maximal value is 418, and its location in the file is at offsets 0:2048.

For very big data lakes, we will have many index files, and then we will start experiencing the same problem we are trying to solve in data lakes (many irrelevant files are accessed). We solve this issue by managing an index of indexes (i.e. a root index) - a single file that stores a mapping between each index file name and its border values (Table 4). In Section 3.3 below, we explain how the root index and the metadata sections are used by the index users.

3.2 Index Computation

Since data lakes store enormous volumes of data, our index will be very large as well. To be able to compute such an index in a reasonable time, we would like to be able to compute it in parallel, using common parallel compute engines like MapReduce [25] and Spark [47].

Fortunately, we can easily do that with a simple map-reduce flow. Pseudo-code (in Spark-like syntax) of the algorithm for creating an index on a specific column in a given data lake is presented in Algorithm 1.
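As a sanity check, the same flow can be prototyped in a single Python process. The toy data lake below is invented for illustration; the real system runs this as a distributed Spark job and writes the custom on-disk format of Table 3, whereas here chunks and metadata are kept as in-memory lists.

```python
from itertools import groupby

def create_index(datalake, m=3):
    """Single-process analogue of Algorithm 1: emit (col-value, file-name)
    pairs ("map" phase), group them into a sorted value -> [file names]
    mapping ("reduce" phase), then split the mapping into chunks of m
    values and record each chunk's border values (Table 3's layout)."""
    pairs = sorted((value, fname)
                   for fname, values in datalake.items() for value in values)
    mapping = [(v, [f for _, f in grp])
               for v, grp in groupby(pairs, key=lambda p: p[0])]
    chunks = [mapping[i:i + m] for i in range(0, len(mapping), m)]
    metadata = [{"chunk": i, "min": c[0][0], "max": c[-1][0]}
                for i, c in enumerate(chunks)]
    return chunks, metadata

# Toy data lake: file name -> values of the indexed column in that file.
datalake = {"file051": [207], "file033": [207, 680], "file002": [350],
            "file170": [418], "file034": [418], "file0002": [513],
            "file443": [680], "file883": [799]}

chunks, metadata = create_index(datalake)
assert metadata[0] == {"chunk": 0, "min": 207, "max": 418}
assert dict(chunks[0])[207] == ["file033", "file051"]
```

The root index of the real scheme corresponds to collecting the per-file min/max borders computed in the last step.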
Algorithm 1 Create Index
1: procedure CreateIndex(datalake-path, index-path, col-name)
2:   datalake = read(datalake-path)
3:   index = datalake.groupBy(col-name).agg(collect("f-name"))
4:   index-sorted = index.orderBy(col-name)
5:   index-sorted.write.format("my.index").save(index-path)
6:   create-root-index(index-path)
7: end procedure

In the map phase, each worker emits (col-value, file-name) pairs, and in the reduce phase the map results are grouped into a (col-value, list of file names) map. Then, the result is written to distributed storage in our custom format (presented in Table 3 and denoted in the pseudo-code as my.index). Finally, we create the root index file by scanning the metadata sections of all the created index files and taking the minimum and maximum values from each file.

Table 3: Index file example

Data     | 207 | file051, file033
         | 350 | file002
         | 418 | file170, file034
         | 513 | file0002
         | 680 | file443, file001, file033
         | 799 | file883
         | ... | ...
Metadata | 1   | min:207|max:418|offset:0
         | 2   | min:513|max:799|offset:2048
         | ... | ...

Table 4: Root index example

Column Name | Min | Max | Index File Name
event-id    | 207 | 560 | index13
event-id    | 590 | 897 | index23
...         | ... | ... | ...

Algorithm 1 creates an index for an existing data lake (i.e. the static scenario). In a dynamic scenario, bulks of new files are added to the lake periodically during scheduled Extract-Transform-Load (ETL) processes. To support the dynamic scenario, we should be able to update our index in accordance with the new data.

Our algorithm for index update, presented in Spark-like syntax in Algorithm 2, runs in parallel and tries to modify as few existing index files as possible. It works as follows:

• First (line 2), we read the root index to get "ranges" - a set of tuples (min, max, index-file-name) for the given column (the result may look similar to Table 4).
• Then (lines 3-4), we use the broadcast join technique [16] to compute "labeled-delta" - values from the delta files labeled with the corresponding "index-file-name" from the "ranges" set. Considering the example in Tables 1-4, the result would look like a set of triples (event-id, data-file-name, index-file-name); for example, (450, file1111, index13), (645, file1112, index23).
• Finally (lines 5-8), we use the index file names computed in labeled-delta to identify the index files that should be changed. We read these files and combine their data with the delta files. Then, we apply Algorithm 1 to this data and override the relevant existing index files and the root index.
• Note that we need to re-partition our data by "index-file-name" (line 7) to ensure that there is no mix between different index files and that each index file is computed by a different process.

Algorithm 2 Update Index
1: procedure UpdateIndex(path-to-new-files, index-path, col-name)
2:   ranges = getIndexRanges(index-path, col-name)
3:   delta = read(path-to-new-files).select(col-name, "f-name")
4:   labeled-delta = delta.join(broadcast(ranges))
5:   old-index = read(labeled-delta.select(index-file-name))
6:   new-index = labeled-delta.union(old-index)
7:   new-index.repartition(index-file-name)
8:   run CreateIndex on new-index and override the old index files and the root-index
9: end procedure

3.3 Index Usage

In this section, we explain how to use our index format to implement getFiles(Q) (pseudo-code is presented in Algorithm 3).

Upon receiving a "needle in a haystack" query Q and a path to the index location, we first extract the column name and the column value from the "where" clause of the query (line 2). We then read the root index and find to which index file the given value belongs (lines 3-4). Once we know which index file has information about our value, we perform the following:

• we read the metadata part of the file
• we find to which data chunk our value belongs, based on the min/max values
• once we know to which chunk our value belongs, we read the (column value, list of file names) map from the specified offset
• if our value appears in the map, we are done, and the method returns the file names list
• otherwise, we know that the requested value is not in the index, so we exit with a relevant flag

Algorithm 3 Get Files
1: procedure GetFiles(Q, index-path)
2:   col-name, col-value = extract-col-values(Q)
3:   root-index = readRootIndex(index-path)
4:   index-file = root-index.getFile(col-name, col-value)
5:   meta = readMetadata(index-file)
6:   chunk-id = meta.getChunkIdByValue(col-value)
7:   map = readDataChunk(index-file, chunk-id)
8:   fileNames = map.get(col-value)
9:   if fileNames is not empty then
10:    return fileNames
11:  else
12:    return VALUE-NOT-EXISTS
13:  end if
14: end procedure

Note that Algorithm 3 does not require parallel processing and can run as a standalone function.
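Algorithm 3's control flow can be sketched in plain Python. The "storage" dict below stands in for the object store, and the index file names, chunk contents, and chunk structure are invented toy data following Tables 3-4; in the real system each dictionary access would be a remote read.

```python
# Minimal single-process sketch of Algorithm 3 (GetFiles).
VALUE_NOT_EXISTS = "VALUE-NOT-EXISTS"

storage = {
    "root-index": {"event-id": [(207, 560, "index13"), (590, 897, "index23")]},
    "index13": {
        "metadata": [(207, 418, 0), (513, 560, 1)],   # (min, max, chunk-id)
        "chunks": [{207: ["file051", "file033"], 350: ["file002"],
                    418: ["file170", "file034"]},
                   {513: ["file0002"], 560: ["file443"]}],
    },
}

def get_files(col_name, col_value, index_path="root-index"):
    # lines 3-4: read the root index, pick the index file covering the value
    root = storage[index_path][col_name]
    index_file = next((f for lo, hi, f in root if lo <= col_value <= hi), None)
    if index_file is None:
        return VALUE_NOT_EXISTS
    # lines 5-7: read the metadata, locate the chunk, read only that chunk
    meta = storage[index_file]["metadata"]
    chunk_id = next((c for lo, hi, c in meta if lo <= col_value <= hi), None)
    if chunk_id is None:
        return VALUE_NOT_EXISTS
    chunk = storage[index_file]["chunks"][chunk_id]
    # lines 8-12: return the file list, or a flag if the value is absent
    return chunk.get(col_value, VALUE_NOT_EXISTS)

assert get_files("event-id", 418) == ["file170", "file034"]
assert get_files("event-id", 999) == VALUE_NOT_EXISTS
```

Note that the lookup touches at most three stored objects (root index, metadata, one data chunk), matching the read-cost analysis of Section 4.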
3.4 Optimizations

In this section, we briefly discuss several potential optimizations to our basic scheme. We plan to explore some of them in depth in future work.

• Bloom filter: In Algorithm 3, we perform two reads from an index file that may contain our queried value, even if the value does not exist in the data lake at all. A possible solution to this issue is adding a Bloom filter [17] containing all the column values in the index file to the metadata section and checking it before querying the index data chunk. This way, in most cases, we will perform a single read operation for a non-existing value, but will have a larger metadata section in each index file.
• Caching: By caching the root index and the metadata sections on the query engine side, we can significantly reduce the number of network operations per query.
• Compression/Encoding: We can apply different encoding and compression techniques to reduce the storage overhead of our index. For example, instead of storing file name lists as strings, we can use compressed bit vectors [21]; metadata offsets can be encoded with delta encoding [35].

Table 5: Notation

Symbol | Description
D      | A data lake
C      | Data lake column names
V      | Data lake column values
F      | Data lake files
N      | Number of files in the data lake
K      | Number of data chunks in an index file
M      | Number of values in a data chunk
Q      | A query
R      | Read operation from remote storage
F(Q)   | Files that contain tuples that satisfy Q
|x|    | Size of object x
Cx     | Cost of operation x

4 PERFORMANCE ANALYSIS

In this section, we analyze our solution from a performance perspective. We first show how our index improves query performance, and then we show the performance overhead imposed by it.

The cost of a query Q is dominated by the cost of read operations from remote storage. When a columnar format is used, irrelevant columns can be skipped, but at least some data must be read from each file. Thus, using the notation from Table 5, we can define the cost of query Q as:

C_Q = C_R · |F|    (1)

When using our index, we read only the relevant data files F(Q) and perform at most three read operations from the index files (for the root index, the metadata, and the data chunk). Hence, the cost is reduced to:

C_Q = C_R · (|F(Q)| + 3)    (2)

Thus, the performance is improved by a factor of:

|F| / (|F(Q)| + 3)    (3)

Since our focus is on "needle in a haystack" queries, we assume that |F(Q)| is a small number (typically below 100), while |F| in real-world data lakes can reach millions [42]. Thus, we can expect a reduction of query cost by an order of magnitude.

Our solution imposes overhead in the following two dimensions:

• Storage: The additional storage required for our index has an upper limit of |D| (in case all columns are indexed). In a more realistic scenario, when only a few of the data lake columns are indexed, the storage overhead is expected to be much lower than |D|. For example, in our experimental evaluation, an index on a unique column requires around 5% of the data lake size (exact numbers are presented in Section 5.1).
• CreateIndex/UpdateIndex: Algorithms 1 and 2 are supposed to run offline, without affecting query performance. Their cost is dominated by the cost of reading the relevant data lake files, creating the index by a map-reduce flow, and writing the index into distributed storage.

In summary, our solution imposes reasonable storage and (offline) computation overhead but improves query performance by an order of magnitude (equations 1-3).

5 IMPLEMENTATION AND EXPERIMENTAL RESULTS

We have implemented a prototype of our solution (Algorithms 1, 3); the implementation and extended results are available online [46]. For the experimental evaluation, we cooperated with a large enterprise company that manages cloud data lakes at petabyte scale. We used an anonymized subset of the real data lake for our experiments.

The data lake we used is stored in AWS S3 cloud storage. The data lake properties are as follows:

• 1,242 files in Parquet format (compressed with Snappy)
• 605,539,843 total records
• each record has a unique identifier (8 bytes)
• 233 columns (mainly strings, but also numbers and dates)
• 54.2 GB (total compressed size)

Our evaluation focused on the following stages:

(1) Index Creation: We created an index according to our approach (Algorithm 1) and according to a naive approach based on a key-value store. Then, we compared the index creation time and the index storage size for each of the approaches.
(2) Index Usage: We queried random values from the data lake in different ways (with and without an index) and compared the query execution times in the different modes.

5.1 Index Creation Evaluation

We built an index on two data lake columns:

• record-id (Long): unique identifier of each record in the data lake (distinct count - 605,539,843)
• event-id (String): unique identifier of each event in the data lake (distinct count - 99,765,289); each event-id is mapped to 6 record-ids on average.

For index creation, we used Apache Spark (2.4.4) running on AWS EMR (5.29.0). The EMR cluster had 15 nodes of type r5d.2xlarge (8 cores, 64GB memory, 300GB SSD storage). Our index was stored in AWS S3 as a collection of Parquet files compressed with Snappy and with a chunk size of 10 MB; the root index was stored in a single line-delimited JSON file.
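A line-delimited JSON root index can be illustrated as follows: one JSON object per line, each describing an index file's column and border values. The field names below are our own invention for illustration, not the prototype's actual schema.

```python
import json

# Build a tiny line-delimited JSON root index (one JSON object per line);
# entries follow the border-value layout of Table 4, field names invented.
root_index_lines = "\n".join(json.dumps(entry) for entry in [
    {"col": "event-id", "min": 207, "max": 560, "file": "index13"},
    {"col": "event-id", "min": 590, "max": 897, "file": "index23"},
])

def find_index_file(root_text, col, value):
    """One pass over the root index lines to pick the covering index file."""
    for line in root_text.splitlines():
        e = json.loads(line)
        if e["col"] == col and e["min"] <= value <= e["max"]:
            return e["file"]
    return None  # value is outside every index file's range

assert find_index_file(root_index_lines, "event-id", 645) == "index23"
```

The line-delimited layout lets the root index be appended to or streamed without parsing the whole file as a single JSON document.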
The key-value index was stored in Apache Cassandra (3.4.4) running on AWS EC2 (3 nodes of type i3.xlarge, a replication factor of 1, LZ4 compression).

The experimental results of index creation are presented in Table 6 and Figures 5 and 6.

Table 6: Index creation results

column name    | our index (S3)                                                        | Cassandra index
record-id (pk) | compute time: 3 min; storage size: 1 root file - 5.5 KB, 30 index files - 3.1 GB | compute time: 4 hours, 23 minutes; storage size: 13.2 GB
event-id       | compute time: 3 min; storage size: 1 root file - 5.3 KB, 30 index files - 5.2 GB | compute time: 47 min; storage size: 4.4 GB

Figure 5: Index Creation Time (minutes, log scale, for the record-id and event-id columns; Our Index (S3) vs Cassandra Index)

Figure 6: Index Creation Storage (index storage size in GB for the record-id and event-id columns; Our Index (S3) vs Cassandra Index)

It is worth noting that even though our primary reason for implementing the data lake index on top of object stores was to reduce the monetary cost, our experiments reveal an additional benefit of this decision. The index creation time of our approach is an order of magnitude faster than that of the key-value store approach. In addition, in our approach, the storage size of the index is much smaller for a unique column and more or less the same for a non-unique column.

5.2 Index Usage Evaluation

For the index usage evaluation, we performed the following test for the indexed columns (record-id, event-id):

(1) get 10 random column values from the data lake
(2) for each column-value, do
  (a) run "select * from data-lake where column = column-value"
  (b) print the query execution time

We executed this test in the following modes and compared their results:

(1) SparkSQL with our index (Algorithm 3)
(2) SparkSQL with the Cassandra index
(3) SparkSQL without an index
(4) AWS Athena [3] without an index

SparkSQL was executed on the same cluster as in the previous section (AWS EMR with 15 nodes). AWS Athena (Amazon's serverless query engine based on Presto [42]) ran on an AWS Glue [7] table created on top of the data lake Parquet files. The average results of the 10 runs for each of the modes are presented in Fig. 7.

The experimental results confirm our performance analysis in Section 4, as it can clearly be seen that our solution (as well as the Cassandra-based one) outperforms the existing query engines (Spark and Athena) by an order of magnitude.

To better understand the trade-offs between our solution and the Cassandra-based one, we also compared their execution times for calculating getFiles(Q). The results are presented in Fig. 8. We evaluated two versions of our approach, with and without caching of the root index (see Section 3.4). Cassandra's latency is lower than that of our approach, but when using a cache, the difference is not significant (less than a second).

To summarize, as expected, adding indexing to data lakes drastically improves query performance. In this evaluation, we have demonstrated that the index can be implemented in two different ways - a trivial approach based on key-value stores and our approach based on cloud object stores. Both approaches improve query execution times in an almost identical way (the difference is at millisecond resolution). However, our approach has the following advantages over the naive solution:

(1) an order of magnitude monetary cost improvement (Table 2)
(2) an order of magnitude index creation time improvement (Fig. 5)
(3) significantly less storage overhead in some cases (Fig. 6)
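As a cross-check of Section 4, equation (3) can be evaluated for this data lake. |F| = 1,242 comes from the data lake properties above; the |F(Q)| values below are illustrative assumptions, not measured numbers (a unique record-id lives in exactly one file; an event-id maps to 6 records on average, hence at most 6 files).

```python
# Expected read-operation speedup per equation (3): |F| / (|F(Q)| + 3).
F = 1242  # this data lake's file count

def speedup(f_q):
    # Without an index: |F| file reads. With the index: |F(Q)| data-file
    # reads plus at most 3 index reads (root index, metadata, data chunk).
    return F / (f_q + 3)

print(round(speedup(1), 1))   # record-id: unique, one data file -> 310.5
print(round(speedup(6), 1))   # event-id: at most 6 data files -> 138.0
```

Both assumed cases land well above one order of magnitude, consistent with the measured gap between the indexed and full-scan modes in Fig. 7.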
Figure 7: Data Lake Queries Latency (average query execution time in seconds for the record-id and event-id columns; modes: Our Index (S3), Cassandra Index, No Index Spark, No Index Athena)

Figure 8: Get Files Latency (getFiles execution time in milliseconds for the record-id and event-id columns; modes: Our Index (S3), Our Index + Cache (S3), Cassandra Index)

6 RELATED WORK

Data lakes are a relatively new approach for storing and analyzing large amounts of data. As a new technology, it still lacks a formal definition, and in the existing literature different authors assign different (and sometimes even contradictory) meanings to this concept [29, 41]. In this paper, we stick to the definition from the recent Seattle Report on Database Research [2], where the main characteristic of the data lakes architecture is that it "disaggregates compute and storage, so they can scale independently."

There are several research challenges in the data lakes domain. The most fundamental ones are knowledge discovery [30] and various optimizations [2]. We focus on the latter and study the optimization of a specific scenario in data lake systems - "needle in a haystack" queries. Our solution can be seen as a combination of different techniques from the databases and big data domains. Thus, we use partitions as in [18] to organize our index according to columns. For scalability, we use a root index as in B+ trees [40] and

At a high level, our solution resembles the well-known inverted index structure, which is extensively used in full-text search engines [14, 20, 40, 48]. An inverted index maps words to the lists of documents that contain them, and for data sets of moderate size it can be implemented with standard index structures such as a B+ tree or a hash index [40]. For "big data" volumes, distributed inverted indexes have been proposed [48]. The two main techniques used in distributed inverted indexing are document-based [14] and term-based [20]. Both these techniques assume combined compute and storage layers, while in our system model the main assumption is that compute and storage are separated.

In [27], the authors study inverted indexes in dataspaces ("collections of heterogeneous and partially unstructured data"). Their main focus is on the heterogeneity of the data, and very large indexes are not considered (the index implementation is based on a single-node Lucene engine [15]); in our solution, the focus is on structured data, yet we expect very large volumes of both data and indexes. In [26], indexing in big data is achieved by enhancing the MapReduce load process via user-defined functions (UDFs). However, this solution is limited to a particular system (Hadoop), where the compute (MapReduce) and storage (HDFS) components run on the same machines; our focus is on a disaggregated architecture. A system model similar to ours is considered in [19], as well as in multiple industry approaches (e.g. [10]); however, in these approaches the index is stored in a key-value database, which, as we show in this work, has two severe drawbacks compared to our solution - a significantly higher monetary cost and a much longer load time.

To the best of our knowledge, our work is the first research attempt to improve query performance in cloud data lakes by building indexing on top of object stores. However, two industry projects focus on the same domain - Microsoft Hyperspace [39] and Databricks Delta Lake [23]. Both assume the data lakes architecture and try to improve query performance by using indexes stored in object stores. Below we briefly discuss how these projects differ from our approach, based on the available online information about these systems [23, 39].

• Hyperspace currently uses only "covering indexes" and plans to support other index types in the future. Covering indexes are built upon user-provided column lists ("indexed columns" and "data columns") and aim to improve the performance of queries that search by "indexed columns" and retrieve "data columns". The implementation is based on copying both the "indexed" and "data" columns from the data lake and storing them in a columnar format sorted by the "indexed" columns. While covering indexes can improve some types of queries, they are not suitable for "needle in a haystack" queries, where retrieval of all the columns should be supported (e.g. for the GDPR scenario).
• Delta Lake uses Bloom filter indexes in the following way. Each file has an associated file that contains a Bloom filter containing the values of the indexed columns from this file. Then, upon a user query, Bloom filters are checked before
some distributed databases (e.g. [22]). The format of our index reading the files, and the file is read only if the value was
files is similar to columnar formats [28] with multiple chunks found in the corresponding Bloom filter. We use a similar
of data and a single metadata section. We use Bloom filters [17] technique (Section 3.4), but in our case, only a single Bloom
to improve the performance of queries on non-existing values. filter will be read for a particular value, while in Delta Lake
Parallel processing paradigm [25, 47] is used for index creation all existing Bloom filters should be read.
and updating.
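The index design discussed in this paper (a mapping from indexed column values to the files that contain them, organized into per-column partitions, with a root index for routing and Bloom filters for non-existing values) can be illustrated with a small in-memory sketch. All names and data structures below are assumptions made for this example only; the actual index is persisted as files in an object store, not kept in memory.

```python
import hashlib

# Illustrative sketch of a value -> files index for "needle in a haystack"
# queries. Names and layout are assumptions for this example; the real
# index is stored as files in an object store (e.g. S3).

class BloomFilter:
    """Tiny Bloom filter used to short-circuit lookups of non-existing values."""

    def __init__(self, size_bits=1024, num_hashes=3):
        self.size = size_bits
        self.num_hashes = num_hashes
        self.bits = 0  # bit array packed into a Python int

    def _positions(self, value):
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{value}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.size

    def add(self, value):
        for pos in self._positions(value):
            self.bits |= 1 << pos

    def might_contain(self, value):
        return all(self.bits & (1 << pos) for pos in self._positions(value))


class IndexPartition:
    """One index file: chunks of value -> data-file mappings for a column,
    plus a Bloom filter kept in the metadata section."""

    def __init__(self):
        self.value_to_files = {}
        self.bloom = BloomFilter()

    def add(self, value, data_file):
        self.value_to_files.setdefault(value, set()).add(data_file)
        self.bloom.add(value)

    def get_files(self, value):
        if not self.bloom.might_contain(value):
            return set()  # value is definitely absent: skip reading the chunks
        return self.value_to_files.get(value, set())


class RootIndex:
    """Routes a (column, value) lookup to a single partition, so only one
    Bloom filter is consulted per lookup (unlike one filter per data file)."""

    def __init__(self, partitions_per_column=4):
        self.n = partitions_per_column
        self.partitions = {}  # column -> list of IndexPartition

    def _partition(self, column, value):
        parts = self.partitions.setdefault(
            column, [IndexPartition() for _ in range(self.n)])
        digest = hashlib.sha256(str(value).encode()).digest()
        return parts[int.from_bytes(digest[:8], "big") % self.n]

    def add(self, column, value, data_file):
        self._partition(column, value).add(value, data_file)

    def get_files(self, column, value):
        return self._partition(column, value).get_files(value)


# Usage: which data-lake files contain record-id "r42"?
index = RootIndex()
index.add("record-id", "r42", "part-0001.parquet")
index.add("record-id", "r7", "part-0002.parquet")
print(index.get_files("record-id", "r42"))   # {'part-0001.parquet'}
print(index.get_files("record-id", "none"))  # set()
```

The sketch mirrors the design choice noted in the Delta Lake comparison above: since the root index routes each lookup to exactly one partition, only one Bloom filter is consulted per value, whereas a per-file Bloom filter scheme must consult a filter for every data file.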
7 CONCLUSION
In recent years, cloud data lakes have become a prevalent way of storing large amounts of data. Their main implication is a separation between the storage and compute layers and, as a result, a rise of new technologies in both the compute and storage domains. Query engines, the main players in the compute domain, are designed to run on in-place data that is usually stored in a columnar format. Their main focus is on analytical queries, while random reads are overlooked.

In this paper, we present our approach for optimizing needle in a haystack queries in cloud data lakes. The main idea is to maintain an index structure that maps indexed column values to their files. We built our solution in accordance with the data lake architecture, where the storage is completely separated from the compute. Building our index on top of object stores was motivated by the observation that the monetary cost of this solution is significantly lower than that of a DBMS-based one. Our experimental evaluation provides an additional reason to favor object stores over the DBMS approach - much lower index computation time.

For future work, we are planning to extend our index scheme to support more complex query types (e.g. joins, group by). An additional research direction may be the implementation of different systems according to the data lake architecture, thereby improving their monetary cost and load time. For example, implementing a key-value store on top of object stores seems to be a promising research direction, which can be based on the ideas presented in this work.

REFERENCES
[1] Daniel Abadi et al. 2015. SQL-on-hadoop systems: tutorial. Proceedings of the VLDB Endowment 8, 12 (2015), 2050–2051.
[2] Daniel Abadi et al. 2020. The Seattle Report on Database Research. ACM SIGMOD Record 48, 4 (2020), 44–53.
[3] Amazon. 2020. Amazon Athena. Retrieved November 14, 2020 from https://aws.amazon.com/athena
[4] Amazon. 2020. Amazon DynamoDB On-Demand Pricing. Retrieved November 14, 2020 from https://aws.amazon.com/dynamodb/pricing/on-demand/
[5] Amazon. 2020. Amazon EC2 On-Demand Pricing. Retrieved November 14, 2020 from https://aws.amazon.com/ec2/pricing/on-demand/
[6] Amazon. 2020. Amazon S3 pricing. Retrieved November 14, 2020 from https://aws.amazon.com/s3/pricing/
[7] Amazon. 2020. AWS Glue. Retrieved November 14, 2020 from https://aws.amazon.com/glue
[8] Amazon. 2020. Best Practices for Amazon EMR. Retrieved November 14, 2020 from https://d0.awsstatic.com/whitepapers/aws-amazon-emr-best-practices.pdf
[9] Amazon. 2020. Best Practices for Running Apache Cassandra on Amazon EC2. Retrieved November 14, 2020 from https://aws.amazon.com/blogs/big-data/best-practices-for-running-apache-cassandra-on-amazon-ec2/
[10] Amazon. 2020. Building and Maintaining an Amazon S3 Metadata Index without Servers. Retrieved November 14, 2020 from https://aws.amazon.com/blogs/big-data/building-and-maintaining-an-amazon-s3-metadata-index-without-servers/
[11] Amazon. 2020. DynamoDB. Retrieved November 14, 2020 from https://aws.amazon.com/dynamodb/
[12] Amazon. 2020. S3. Retrieved November 14, 2020 from https://aws.amazon.com/s3/
[13] Michael Armbrust, Reynold S Xin, Cheng Lian, Yin Huai, Davies Liu, Joseph K Bradley, Xiangrui Meng, Tomer Kaftan, Michael J Franklin, Ali Ghodsi, et al. 2015. Spark SQL: Relational data processing in Spark. In Proceedings of the 2015 International Conference on Management of Data (SIGMOD). 1383–1394.
[14] Luiz André Barroso, Jeffrey Dean, and Urs Holzle. 2003. Web search for a planet: The Google cluster architecture. IEEE Micro 23, 2 (2003), 22–28.
[15] Andrzej Białecki, Robert Muir, Grant Ingersoll, and Lucid Imagination. 2012. Apache Lucene 4. In SIGIR 2012 workshop on open source information retrieval. 17.
[16] Spyros Blanas, Jignesh M Patel, Vuk Ercegovac, Jun Rao, Eugene J Shekita, and Yuanyuan Tian. 2010. A comparison of join algorithms for log processing in MapReduce. In Proceedings of the 2010 ACM SIGMOD International Conference on Management of Data (SIGMOD). 975–986.
[17] B. H. Bloom. 1970. Space/time trade-offs in hash coding with allowable errors. Commun. ACM 13, 7 (1970), 422–426.
[18] Jesús Camacho-Rodríguez, Ashutosh Chauhan, Alan Gates, Eugene Koifman, Owen O'Malley, Vineet Garg, Zoltan Haindrich, Sergey Shelukhin, Prasanth Jayachandran, Siddharth Seth, et al. 2019. Apache Hive: From MapReduce to enterprise-grade big data warehousing. In Proceedings of the 2019 International Conference on Management of Data (SIGMOD). 1773–1786.
[19] Jesús Camacho-Rodríguez, Dario Colazzo, and Ioana Manolescu. 2013. Web data indexing in the cloud: efficiency and cost reductions. In Proceedings of the 16th International Conference on Extending Database Technology (EDBT). 41–52.
[20] B Barla Cambazoglu et al. 2013. A term-based inverted index partitioning model for efficient distributed query processing. ACM Transactions on the Web (TWEB) 7, 3 (2013), 1–23.
[21] Samy Chambi et al. 2016. Better bitmap performance with Roaring bitmaps. Software: Practice and Experience 46, 5 (2016), 709–719.
[22] Fay Chang et al. 2008. Bigtable: A distributed storage system for structured data. ACM Transactions on Computer Systems (TOCS) 26, 2 (2008), 1–26.
[23] Databricks. 2020. Delta Lake. Retrieved November 14, 2020 from https://docs.databricks.com/delta
[24] Databricks. 2020. Top 5 Reasons for Choosing S3 over HDFS. Retrieved November 14, 2020 from https://databricks.com/blog/2017/05/31/top-5-reasons-for-choosing-s3-over-hdfs.html
[25] Jeffrey Dean and Sanjay Ghemawat. 2008. MapReduce: simplified data processing on large clusters. Commun. ACM 51, 1 (2008), 107–113.
[26] Jens Dittrich et al. 2010. Hadoop++: making a yellow elephant run like a cheetah (without it even noticing). Proceedings of the VLDB Endowment 3, 1-2 (2010), 515–529.
[27] Xin Dong and Alon Halevy. 2007. Indexing dataspaces. In Proceedings of the 2007 ACM SIGMOD International Conference on Management of Data (SIGMOD). 43–54.
[28] Avrilia Floratou et al. 2014. SQL-on-Hadoop: Full circle back to shared-nothing database architectures. Proceedings of the VLDB Endowment 7, 12 (2014), 1295–1306.
[29] Corinna Giebler et al. 2019. Leveraging the Data Lake: Current State and Challenges. In International Conference on Big Data Analytics and Knowledge Discovery (DaWaK). Springer, 179–188.
[30] Paolo Giudice et al. 2019. An approach to extracting complex knowledge patterns among concepts belonging to structured, semi-structured and unstructured sources in a data lake. Information Sciences 478 (2019), 606–626.
[31] Google. 2020. Google Cloud Storage. Retrieved November 14, 2020 from https://cloud.google.com/storage
[32] Michael Hausenblas and Jacques Nadeau. 2013. Apache Drill: interactive ad-hoc analysis at scale. Big Data 1, 2 (2013), 100–104.
[33] Marcel Kornacker et al. 2015. Impala: A Modern, Open-Source SQL Engine for Hadoop. In Proceedings of the 7th Conference on Innovative Data Systems Research (CIDR), Vol. 1. 9.
[34] Avinash Lakshman and Prashant Malik. 2010. Cassandra: a decentralized structured storage system. ACM SIGOPS Operating Systems Review 44, 2 (2010), 35–40.
[35] Daniel Lemire and Leonid Boytsov. 2015. Decoding billions of integers per second through vectorization. Software: Practice and Experience 45, 1 (2015), 1–29.
[36] Sergey Melnik et al. 2011. Dremel: interactive analysis of web-scale datasets. Commun. ACM 54, 6 (2011), 114–123.
[37] Microsoft. 2020. Azure Blob Storage. Retrieved November 14, 2020 from https://azure.microsoft.com/services/storage/blobs/
[38] Microsoft. 2020. Azure subscription and service limits, quotas, and constraints. Retrieved November 14, 2020 from https://docs.microsoft.com/en-us/azure/azure-resource-manager/management/azure-subscription-service-limits
[39] Microsoft. 2020. Hyperspace. Retrieved November 14, 2020 from https://microsoft.github.io/hyperspace
[40] Raghu Ramakrishnan and Johannes Gehrke. 2000. Database Management Systems. McGraw-Hill.
[41] Franck Ravat and Yan Zhao. 2019. Data lakes: Trends and perspectives. In International Conference on Database and Expert Systems Applications (DEXA). Springer, 304–313.
[42] Raghav Sethi, Martin Traverso, Dain Sundstrom, David Phillips, Wenlei Xie, Yutian Sun, Nezih Yegitbasi, Haozhun Jin, Eric Hwang, Nileema Shingte, et al. 2019. Presto: SQL on everything. In Proceedings of the 35th International Conference on Data Engineering (ICDE). IEEE, 1802–1813.
[43] Konstantin Shvachko, Hairong Kuang, Sanjay Radia, and Robert Chansler. 2010. The Hadoop distributed file system. In Proceedings of the 26th Symposium on Mass Storage Systems and Technologies (MSST). IEEE, 1–10.
[44] Junjay Tan et al. 2019. Choosing a cloud DBMS: architectures and tradeoffs. Proceedings of the VLDB Endowment 12, 12 (2019), 2170–2182.
[45] IT Governance Privacy Team. 2020. EU General Data Protection Regulation (GDPR)–An implementation and compliance guide. IT Governance Ltd.
[46] Grisha Weintraub. 2020. data-lake-index. Retrieved November 14, 2020 from https://github.com/grishaw/data-lake-index
[47] Matei Zaharia et al. 2016. Apache Spark: a unified engine for big data processing. Commun. ACM 59, 11 (2016), 56–65.
[48] Justin Zobel and Alistair Moffat. 2006. Inverted files for text search engines. ACM Computing Surveys (CSUR) 38, 2 (2006), 6–es.