=Paper= {{Paper |id=Vol-2841/DARLI-AP_1 |storemode=property |title=Needle in a haystack queries in cloud data lakes |pdfUrl=https://ceur-ws.org/Vol-2841/DARLI-AP_1.pdf |volume=Vol-2841 |authors=Grisha Weintraub,Ehud Gudes,Shlomi Dolev |dblpUrl=https://dblp.org/rec/conf/edbt/WeintraubGD21 }} ==Needle in a haystack queries in cloud data lakes== https://ceur-ws.org/Vol-2841/DARLI-AP_1.pdf
                    Needle in a haystack queries in cloud data lakes
               Grisha Weintraub                                              Ehud Gudes                                   Shlomi Dolev
         grishaw@post.bgu.ac.il                                      ehud@cs.bgu.ac.il                              dolev@cs.bgu.ac.il
    Ben-Gurion University of the Negev                       Ben-Gurion University of the Negev              Ben-Gurion University of the Negev
            Beer-Sheva, Israel                                       Beer-Sheva, Israel                              Beer-Sheva, Israel
ABSTRACT                                                                               on top of data lakes. Hive [18], originally built at Facebook, was
Cloud data lakes are a modern approach for storing large amounts                       the first such system. In Hive, SQL-like queries are compiled
of data in a convenient and inexpensive way. Query engines (e.g.                       into a series of map-reduce jobs and are executed on MapReduce
Hive, Presto, SparkSQL) are used to run SQL queries on data                            [25] framework. A somewhat similar approach is implemented
lakes. Their main focus is on analytical queries while random                          in Spark SQL [13], where SQL queries are running on top of the
reads are overlooked. In this paper, we present our approach for                       Spark engine [47].
optimizing needle in a haystack queries in cloud data lakes. The                           Another group of query engines do not use general-purpose
main idea is to maintain an index structure that maps indexed                          frameworks such as MapReduce or Spark but rely on their own
column values to their files. According to our analysis and exper-                     engines, built on the ideas from parallel databases. The first such
imental evaluation, our solution imposes a reasonable storage                          system was Google’s Dremel [36], which inspired many modern
overhead while providing an order of magnitude performance                             query engines; most well-known are Drill [32], Presto [42] and
improvement.                                                                           Impala [33].
                                                                                           The following techniques are commonly associated with the
                                                                                       query engines running on data lakes:
1    INTRODUCTION
                                                                                             • in-situ processing: In-situ means that the data is ac-
Data lakes are a relatively new concept, which has recently re-
                                                                                               cessed in-place (i.e. in HDFS or cloud object store), and
ceived much attention from both the research community and
                                                                                               there is no need to load the data into the database man-
industry [2, 29, 41]. There is no formal definition of the term data
                                                                                               agement system (DBMS). Thanks to this feature, the same
lake, but it is commonly described as a centralized repository con-
                                                                                               data lake can be queried simultaneously by different query
taining very large amounts of data in their original (or minimally
                                                                                               engines without any difficulties.
processed) format. The main idea behind this approach is that in-
                                                                                             • parallel processing: The data is accessed in parallel
stead of loading the data into traditional data warehouses, which
                                                                                               (usually by a cluster of dedicated machines).
require scrupulous schema design, and a high development and
                                                                                             • columnar storage: The main idea is that instead of stor-
maintenance cost, the data is simply streamed into a distributed
                                                                                               ing the data row by row, as is customary in relational
storage system (e.g. HDFS [43]), and from that moment becomes
                                                                                               databases, the data is stored column by column. The most
available for analytics. Cloud data lakes are managed by cloud
                                                                                               important benefits of such column-wise storage are fast
providers and store their data in cloud object stores, as AWS S3
                                                                                               projection queries and efficient column-specific compres-
[12], Google Cloud Storage [31], and Azure Blob Storage [37].
                                                                                               sion and encoding techniques. Some of the popular open
For analytics, compute engines (e.g. Spark [47], MapReduce [25])
                                                                                               source formats are ORC and Parquet [28].
are used "on-demand". They access cloud data lakes via simple
                                                                                             • data partitioning: The idea is simple and yet powerful.
get/put API over the network, and this separation of storage and
                                                                                               The data is partitioned horizontally based on some of the
compute layers results in a lower cost and allows independent
                                                                                               input columns. Then, files related to each partition are
scaling of each layer (Fig. 1).
                                                                                               stored in a different directory in the storage system (see
                                                                                               an example below).
                                                                                           Let us demonstrate how these techniques look in a typical
                                                                                       real-world scenario. Consider a large enterprise company receiv-
                                                                                       ing a lot of events (e.g. from IoT sensors) and stores them in a
                                                                                       data lake. The data lake would look like a collection of files (Fig.
                                                                                       2) stored in some distributed storage system. In our example,
                                                                                       the data lake is partitioned by year and month columns, so for
                                                                                       example events from March 2020 are located in the path data-
                                                                                       lake/year:2020/month:03/. Events are stored in a columnar format
                                                                                       Parquet, so queries on specific columns would scan only relevant
                                                                                       chunks of data. When a query engine executes a query, it scans
                                                                                       all the relevant files in parallel and returns the result to the client.
                                                                                           Data lakes and query engines are optimized for analytic queries
                 Figure 1: Data Lakes Architecture
                                                                                       and a typical query calculates aggregate values (e.g. sum, max,
                                                                                       min) over a specific partition. For such queries, all the files under
   A plethora of new systems, often referred to as query engines                       specified partition should be scanned.
[1], have been developed in recent years to support SQL queries                            However, sometimes users may be merely interested in find-
© 2021 Copyright for this paper by its author(s). Published in the Workshop Proceed-   ing a specific record in the data lake (a.k.a needle in a haystack).
ings of the EDBT/ICDT 2021 Joint Conference (March 23–26, 2021, Nicosia, Cyprus)       Consider for example a scenario where a specific event should
on CEUR-WS.org. Use permitted under Creative Commons License Attribution 4.0
International (CC BY 4.0)                                                              be fetched from the lake based on its unique identifier for trou-
                                                                                       bleshooting or business flow analysis. Another important use
                  data-lake                                                              Table 1: Index content example
                      year:2020
                                                                                  Column Name       Column Value       File Names
                           month:02
                                                                                     event-id             207          file051, file033
                                file201.parquet                                      event-id             350          file002
                                                                                     event-id             418          file170, file034
                                file170.parquet
                                                                                         ...               ...         ...
                           month:03                                                  client-ip        192.168.1.15     file170
                                                                                     client-ip        172.16.254.1     file883, file051
                                file051.parquet

                      year:2019                                           approach to perform query 𝑄 is to scan all 𝐹 files in parallel,
                           month:...                                      which is wasteful as only 𝐹 (𝑄) files should be scanned and in
                              ...                                         many cases |𝐹 | >> |𝐹 (𝑄)|. If we knew 𝐹 (𝑄) for a given 𝑄 we
                                                                          would read only relevant files and so will be able to significantly
        Figure 2: Data lake layout in storage system                      improve query performance. Thus the problem we are trying
                                                                          to solve in this paper can be defined as – “construct a function
                                                                          𝑔𝑒𝑡𝐹𝑖𝑙𝑒𝑠 (𝑄) that receives a query 𝑄 and returns 𝐹 (𝑄) - the list
case is managing the General Data Protection Regulation (GDPR)            of files that contain tuples that satisfy 𝑄”.
compliance [45], where records related to a specific user should
be found in the data lake upon user request. Unfortunately, even
when the requested information resides in a single file, query
engines will perform a scan of all the files in the data lake (can be
millions). Obviously, this behavior results in a long query latency
and an expensive computation cost.
   In this paper, we present our approach for optimizing "needle
in a haystack" queries in cloud data lakes. The main idea is to
build an index structure that maps indexed column values to
the files that contain those values. We assume enormous data
volumes in cloud data lakes, and hence, we need our index to be
highly scalable in both compute and storage sense. For scalable                               Figure 3: System model
compute, we utilize parallel processing paradigm [25, 47]; for
scalable storage, we design a new storage format that allows                  Fig. 3 presents a high-level diagram of the considered system
unlimited scaling while minimizing the number of network op-              model.
erations during reads.                                                        (1) A client submits a query to the query engine
   Our main contributions are as follows:                                     (2) A query engine queries the index to get relevant file names
     • Development of a novel index structure that allows speed-              (3) A query engine reads relevant files from the data lake
        ing up random queries in cloud data lakes.                            (4) A query engine computes query result and returns it to
     • A prototype implementation of our solution and its exper-                   the client
        imental evaluation in a real-world cloud data lake.                   An obvious approach to implement 𝑔𝑒𝑡𝐹𝑖𝑙𝑒𝑠 (𝑄) is to create
   The rest of the paper is structured as follows. In Section 2, we       a simple inverted index [40] that stores a mapping between the
formally define the problem. In section 3, we describe the pro-           values of relevant columns and the files that contain these values
posed solution. In section 4, we provide a performance analysis           (Table 1). An open question is: what is the best way to do that
of our solution. In Section 5, we introduce our proof-of-concept          in a big data environment. More formally, we are looking for a
implementation and provide an experimental evaluation thereof.            solution that will satisfy the following constraints:
In Section 6, we review related work. We conclude in Section 7.               (1) scalability: we want our index to be able to handle very
                                                                                   large and growing amounts of data
2    PROBLEM STATEMENT                                                        (2) performance: we need our index to be able to respond to
We model a data lake as a set of tuples 𝐷 = {< 𝑐 1 : 𝑣 1, 𝑐 2 :                    the lookup queries in a reasonable time (optimally sub-
𝑣 2, ..., 𝑐𝑛 : 𝑣𝑛 >| ∀1 ≤ 𝑖 ≤ 𝑛, 𝑐𝑖 ∈ 𝐶, 𝑣𝑖 ∈ 𝑉 } where 𝐶 is the set of            seconds)
column names and 𝑉 is a set of column values (see notation in                 (3) cost: we want to minimize the monetary cost of our solu-
Table 5). Data lake 𝐷 is stored in a distributed storage system as                 tion as much as possible
a collection of 𝑁 files, denoted by 𝐹 = {𝑓1, 𝑓2, ..., 𝑓𝑁 }.                   The trivial solution would be to store our index in a distributed
    Query engines support SQL queries on data lakes. In this paper        NoSQL database (e.g Cassandra [34], DynamoDB [11]) and up-
our focus is on simple selection queries of type:                         date it with parallel compute engines (e.g Spark, MapReduce).
                                                                          However, while this approach may satisfy the first two of our
               SELECT C1, C2, ...                                         requirements (scalability and performance), its monetary cost
               FROM DATA-LAKE                                             is far from being optimal. Let us show why by using some real
               WHERE SOME-COLUMN = SOME-VALUE                             numbers.
   Files that contain tuples that satisfy query 𝑄 are denoted by              We have two main alternatives for using a database service in
𝐹 (𝑄). As already mentioned in the previous section, the current          the cloud environment: IaaS and DaaS.
                 Table 2: Storage cost comparison                                                            index
                                                                                                                  col:client-ip
                             cost of 1TB/year           pricing info references
 Cassandra on EC2                 16,164 $                       [5, 9]                                               index03
    DynamoDB                       2,925 $                        [4]                                                 index07
        S3                          276 $                         [6]
                                                                                                                  col:event-id
                                                                                                                      index13
   (1) IaaS: "Infrastructure-as-a-Service" model provides users
                                                                                                                      index23
       with the requested number of virtual machines where they
       can install DBMS and use it as they wish (e.g. Cassandra                                                   col:...
       on top of EC2).                                                                                               ...
   (2) DaaS: "Database-as-a-Service" model provides users with
       database API without the need for managing hardware or                                    Figure 4: Index storage layout example
       software (e.g. DynamoDB).
   In Table 2 1 , we compare the monetary cost of storing data in
each one of these alternatives against storing it in a cloud object
store (similarly to how the data is stored in cloud data lakes). It
                                                                                       3     OUR APPROACH
clearly can be seen that an object store is at least one order of                      In sections 3.1, 3.2, and 3.3 below, we explain how exactly we
magnitude lower-priced than any other option. Since our focus is                       store, compute, and use our index. In section 3.4, we discuss
on real-world cloud data lakes (i.e. petabytes and beyond scale),                      potential optimizations to our basic scheme.
we need to support indexes of very large volumes as well (index
on a unique column of data lake 𝐷 will need 𝑂 (|𝐷 |) storage).                         3.1    Index Storage
Thus, potential monetary savings of implementing indexing by                           We store our index as a collection of files partitioned by an in-
using object stores, instead of the naive solution based on key-                       dexed column name (Fig. 4). Each index file is responsible for
value stores, can be tremendous.                                                       some range of column values and stores relevant information
   This observation is not new, and object stores superiority                          about this range in a format depicted in Table 3.
in cost is a well-known fact in both industry [24] and research                            There are two sections in the index file - data and metadata.
community [44]. However, object stores are not suitable for every                      The data part consists of a map between a column value and a
case. More specifically, when comparing them to key-value stores,                      list of corresponding file names ordered by a column value. The
they have the following caveats:                                                       map is divided into 𝐾 chunks based on the predefined number of
   (1) higher latency : the actual numbers may vary, but usu-                          values per chunk 𝑀, such that each of the 𝐾 chunks contains a
       ally object stores have higher latency than DBMS [44]                           mapping for the 𝑀 consecutive values of the particular indexed
   (2) limited requests rate : while requests rate is nearly                           column. For example, in the index presented in Table 3, we have
       unlimited in scalable databases, in object stores there is an                   𝑀 = 3, and the first chunk contains a mapping between 3 column
       upper limit (typically around thousands per second [38])                        values (207, 350, 418) and the file names that contain tuples with
   (3) optimized for large objects : object stores are op-                             these values.
       timized for storing large objects (typically GB’s [8]) and                          Metadata stores information about the location of data chunks
       hence are not suitable for systems that need random reads                       inside the index file along with their "border" values. Considering
   (4) no random writes : the only way to mutate objects in                            the example in Table 3 again, from the metadata section we learn
       object stores is to replace them, which may be problematic                      that the firsts chunk’s minimal value is 207, the maximal value is
       for systems that need random writes                                             418, and its location in the file is in offsets 0:2048.
                                                                                           For very big data lakes, we will have many index files and then
   While the first two points ("higher latency" and "limited re-
                                                                                       will start experiencing the same problem we are trying to solve
quests rate") are usually acceptable in OLAP systems, the last
                                                                                       in data lakes (many irrelevant files are accessed). We solve this
two ("large objects" and "no random writes") introduce signifi-
                                                                                       issue by managing an index of indexes (i.e. root index) - a single
cant implementation challenges. In our solution, we address both
                                                                                       file that stores a mapping between the index file name and its
these challenges.
                                                                                       border values (Table 4). In Section 3.3 below, we will explain how
   To summarize, in this work, our research goal is twofold:
                                                                                       the root index and metadata sections are used by the index users.
     • to implement 𝑔𝑒𝑡𝐹𝑖𝑙𝑒𝑠 (𝑄) and thereby improving random
       reads performance in cloud data lakes                                           3.2    Index Computation
     • to build our solution on top of object stores so the addi-
                                                                                       Since data lakes store enormous volumes of data, our index will
       tional monetary cost imposed by our scheme will be as
                                                                                       be very large as well. To be able to compute such an index in a
       low as possible
                                                                                       reasonable time we would like to be able to compute it in parallel
                                                                                       by using common parallel compute engines like MapReduce [25]
1 For simplicity, we ignore some factors that cannot change the general trend, like    and Spark [47].
the maintenance cost of Cassandra, the cost of requests in S3 and DynamoDB,               Fortunately, we can easily do that by a simple map-reduce flow.
and the fact the data in object stores can be compressed. Our goal is only to show     Pseudo-code (in Spark-like syntax) of the algorithm for creating
that object stores are significantly cheaper than the other options. We also provide
numbers for specific systems (Cassandra, DynamoDB, AWS), but the same cost             an index on a specific column in a given data lake is presented in
trend is preserved in other similar systems as well.                                   Algorithm 1.
                  Table 3: Index file example                                Upon receiving a "needle in a haystack" query 𝑄 and a path to
                                                                         index location, we first extract the column name and the column
         Data          207    file051, file033                           value from the "where" clause of the query (line 2). We then read
                       350    file002                                    the root index and find to which index file the given value belongs
                       418    file170, file034                           (lines 3-4). Once we know what index file has information about
                       513    file0002                                   our value, we perform the following:
                       680    file443, file001, file033
                                                                             • we read the metadata part of the file
                       799    file883
                                                                             • we find to which data chunk our value belongs based on
                       ...    ...
                                                                               the min/max values.
         Metadata      1      min:207|max:418|offset:0                       • once found to which chunk our value belongs, we read
                       2      min:513|max:799|offset:2048                      the (column value, List) map from the specified
                       ...    ...                                              offset
                                                                             • if our value appears in the map we are done and the
                 Table 4: Root index example                                   method returns the file names list
                                                                             • otherwise we know that requested value is not in the index
        Column Name          Min    Max    Index File Name                     so we exit with a relevant flag
            event-id         207    560        index13                     Note that Algorithm 3 does not require parallel processing
            event-id         590    897        index23                   and can run as a standalone function.
               ...            ...    ...          ...
                                                                         Algorithm 1 Create Index
    In the map phase, each worker emits (col-value, file-name)            1: procedure CreateIndex(datalake-path, index-path, col-name)
pairs and in reduce phase map results are grouped into (col-              2:    datalake = read(datalake-path)
                                                                          3:    index = datalake.groupBy(col-name).agg(collect("f-name"))
value, List) map. Then, the result is written into the
                                                                          4:    index-sorted = index.orderBy(col-name)
distributed storage in our custom format (presented in Table 3
                                                                          5:    index-sorted.write.format("my.index").save(index-path)
and denoted in the pseudo-code as my.index). Finally, we create           6:    create-root-index(index-path)
the root index file by scanning metadata sections of all the created      7: end procedure
index files and taking minimum and maximum values from each
file.
    Algorithm 1 creates an index for an existing data lake (i.e.
static scenario). In a dynamic scenario, bulks of new files are
being added to the lake periodically during scheduled Extract-           Algorithm 2 Update Index
Transform-Load (ETL) processes. To support a dynamic scenario,            1: procedure UpdateIndex(path-to-new-files, index-path, col-name)
we should be able to update our index in accordance with the              2:    ranges = getIndexRanges(index-path, col-name)
new data.                                                                 3:    delta = read(path-to-new-files).select(col-name, "f-name")
    Our algorithm for index update, presented in Spark-like syntax        4:    labeled-delta = delta.join(broadcast(ranges))
in Algorithm 2, runs in parallel and tries to modify as few existing      5:    old-index = read(labeled-delta.select(index-file-name))
index files as possible. It works like that:                              6:    new-index = labeled-delta.union(old-index)
                                                                          7:    new-index.repartition(index-file-name)
      • First (line 2), we read the root index to get "ranges" - a set    8:    run CreateIndex on new-index and override old-index files and
        of tuples (min, max, index-file-name) for a given column            the root-index
        (the result may look similar to Table 4).                         9: end procedure
      • Then (lines 3-4), we use broadcast join technique [16] to
        compute "labeled-delta" - values from delta files labeled
        with the corresponding "index-file-name" from "ranges"
        set. Considering the example in Tables 1-4, the result
        would look as a set of triples (event-id, data-file-name,        Algorithm 3 Get Files
        index-file-name); for example, (450, file1111, index13), (645,    1: procedure GetFiles(Q, index-path)
        file1112, index23).                                               2:    col-name, col-value = extract-col-values(Q)
      • Finally (lines 5-8), we use index file names computed in          3:    root-index = readRootIndex(index-path)
        labeled-delta, to identify index files that should be changed.    4:    index-file = root-index.getFile(col-name, col-value)
        We read these files and combine their data with the delta         5:    meta = readMetadata(index-file)
        files. Then, we apply Algorithm 1 on this data and override       6:    chunk-id = meta.getChunkIdByValue(col-value)
                                                                          7:    map = readDataChunk(index-file, chunk-id)
        relevant existing index files and the root index.
                                                                          8:    fileNames = map.get(col-value)
      • Note that we need to re-partition our data by "index-file-        9:    if fileNames is not empty then
        name" (line 7) to ensure that there will not be a mix be-        10:         return fileNames
        tween different index files and each index file will be com-     11:    else
        puted by a different process.                                    12:         return VALUE-NOT-EXISTS
                                                                         13:    end if
3.3    Index Usage                                                       14: end procedure
In this section we explain how to use our index format to imple-
ment 𝑔𝑒𝑡𝐹𝑖𝑙𝑒𝑠 (𝑄) (pseudo-code is presented in Algorithm 3).
3.4     Optimizations                                                                           Table 5: Notation
In this section, we briefly discuss several potential optimizations
that can be done to our basic scheme. We plan to deepen some                    Symbol     Description
of them in our future work.                                                        𝐷       A data lake
      • Bloom filter: In Algorithm 3, we perform two reads                         𝐶       Data lake column names
        from the index file that may contain our queried value                     𝑉       Data lake column values
        even if the value does not exist in the data lake at all. A                𝐹       Data lake files
        possible solution to this issue is adding a Bloom filter [17]              𝑁       Number of files in the data lake
        containing all the column values in the index file to the                  𝐾       Number of data chunks in index file
        metadata section and check it before querying the index                    𝑀       Number of values in a data chunk
        data chunk. This way, in most cases, we will perform a                     𝑄       A query
        single read operation for non-existing value but will have                 𝑅       Read operation from remote storage
        a larger metadata section in each index file.                            𝐹 (𝑄)     Files that contain tuples that satisfy 𝑄
      • Caching: By caching the root index and metadata sections                  |𝑥 |     Size of object 𝑥
        on the query engine side, we can significantly reduce the                 𝐶𝑥       Cost of operation 𝑥
        number of network operations per query.
      • Compression/Encoding: We can apply different encod-
        ing and compression techniques to reduce the storage                    data lake files, creating an index by map-reduce flow, and
        overhead of our index. For example, instead of storing file             writing the index into a distributed storage.
        names lists as strings, we can use compressed bit vectors          In summary, our solution imposes reasonable storage and
        [21]; metadata offsets can be encoded with delta encoding       (offline) computation overhead but improves query performance
        [35].                                                           by order of magnitude (equations 1-3).

4     PERFORMANCE ANALYSIS                                              5     IMPLEMENTATION AND EXPERIMENTAL
In this section, we analyze our solution from a performance                   RESULTS
perspective. We first show how our index improves queries per-          We have implemented a prototype of our solution (Algorithms 1,
formance and then we show what is the performance overhead              3); the implementation and extended results are available on-line
imposed by it.                                                          [46]. For experimental evaluation, we cooperated with a large
   A cost of query 𝑄 is dominated by a cost of read operations          enterprise company that manages cloud data lakes at petabyte
from remote storage. When columnar format is used, irrelevant           scale. We used an anonymized subset of the real data lake for our
columns can be skipped, but at least some data should be read           experiments.
from each file. Thus, by using a notation from Table 5, we can              The data lake we used is stored in AWS S3 cloud storage. The
define a cost of query Q as:                                            data lake properties are as follows:
                           𝐶𝑄 = 𝐶𝑅 · |𝐹 |                        (1)          • 1,242 files in Parquet format (compressed with snappy)
                                                                              • 605,539,843 total records
   When using our index, we read only relevant data files 𝐹 (𝑄)
                                                                              • each record has a unique identifier (8 bytes)
and perform at most three read operations from the index files
                                                                              • 233 columns (mainly strings but also numbers and dates)
(for the root index, metadata, and data chunk). Hence, the cost is
                                                                              • 54.2 GB (total compressed size)
reduced to:
                                                                            Our evaluation focused on the following stages:
                      𝐶𝑄 = 𝐶𝑅 · (|𝐹 (𝑄)| + 3)                 (2)
                                                                            (1) Index Creation: We created an index according to our
    Thus, the performance is improved by:
                                                                                approach (Algorithm 1) and according to a naive approach
                                 |𝐹 |                                           based on a key-value store. Then, we compared index
                                                                 (3)
                             |𝐹 (𝑄)| + 3                                        creation time and its storage size in each of the approaches.
   Since our focus is on "needle in a haystack" queries, we assume          (2) Index Usage: We queried random values from the data
that |𝐹 (𝑄)| is a small number (typically below 100), while |𝐹 |                lake in different ways (with and without index) and com-
in real-world data lakes can reach millions [42]. Thus, we can                  pared query execution time in different modes.
expect a reduction of query cost by order of magnitude.
   Our solution imposes overhead in the following two dimen-            5.1    Index Creation Evaluation
sions:                                                                  We built an index on two data lake columns:
      • Storage: Additional storage required for our index has an            • record-id (Long): unique identifier of each record in
        upper limit of |𝐷 | (in case that all columns are indexed).            the data lake (distinct count - 605,539,843)
        In a more realistic scenario, when only a few of the data            • event-id (String) : unique identifier of each event in
        lake columns are indexed, storage overhead is expected to              the data lake (distinct count - 99,765,289), each event-id is
        be much lower than |𝐷 |. For example, in our experimental              mapped to 6 record-ids on average.
        evaluation an index on unique column requires around               For index creation, we used Apache Spark (2.4.4) running on
        5% of the data lake size (exact numbers are presented in        AWS EMR (5.29.0). EMR cluster had 15 nodes of type r5d.2xlarge
        Section 5.1)                                                    (8 cores, 64GB memory, 300GB SSD storage). Our index was
      • CreateIndex/UpdateIndex: Algorithms 1 and 2 are sup-            stored in AWS S3 as a collection of Parquet files compressed with
        posed to run offline without affecting query performance.       Snappy and with a chunk size of 10 MB; root-index was stored in
        Their cost is dominated by the cost of reading relevant         a single line-delimited JSON file. The key-value index was stored
                                            Table 6: Index creation results                     approach is an order of magnitude faster than those based on a
                                                                                                key-value store. In addition, in our approach, the storage size of
 column name                                       our index (S3)         Cassandra index       the index is much smaller for a unique column and more or less
                                                compute time:                                   the same for a non-unique column.
                                                                          compute time:
                                                3 min
                                                                          4 hours, 23 minutes
  record-id (pk)
                                                storage size:
                                                1 root file - 5.5 KB
                                                                          storage size:         5.2    Index Usage Evaluation
                                                                          13.2 GB               For index usage evaluation, we performed the following test for
                                                30 index files - 3.1 GB
                                                compute time:                                   the indexed columns (record-id, event-id):
                                                                            compute time:
                                                3 min
                                                                            47 min
     event-id                                                                                      (1) get 10 random column values from the data lake
                                                storage size:
                                                                            storage size:          (2) for each column-value, do
                                                1 root file - 5.3 KB
                                                                            4.4 GB                    (a) run "select * from data-lake where column = column-
                                                30 index files - 5.2 GB
                                                                                                          value"
                                                                                                     (b) print query execution time
in Apache Cassandra (3.4.4) running on AWS EC2 (3 nodes of
type i3.xlarge, a replication factor of 1, LZ4 compression).
   Experimental results of index creation are presented in Table                                   We executed this test in the following modes and compared
6 and Figures 5, 6.                                                                             their results.

                                                                           Our Index (S3)
                                                                          Cassandra Index          (1) SparkSQL with our index (Algorithm 3)
     Index creation time (minutes)




                                                                                                   (2) SparkSQL with Cassandra index
                                     102
                                                                                                   (3) SparkSQL without index
                                                                                                   (4) AWS Athena [3] without index



                                     101
                                                                                                   SparkSQL was executed on the same cluster as in the previ-
                                                                                                ous section (AWS EMR with 15 nodes). AWS Athena (Amazon
                                                                                                serverless query engine based on Presto [42]) ran on AWS Glue
                                                                                                [7] table created on top of the data lake Parquet files. The average
                                                                                                results of the 10 runs for each one of the modes are presented in
                                               record-id                     event-id           Fig. 7.
                                                              Columns
                                                                                                   Experimental results confirm our performance analysis in
                                                                                                Section 4, as it clearly can be seen that our solution (as well as
                                            Figure 5: Index Creation Time
                                                                                                Cassandra-based) outperforms existing query engines (Spark and
                                                                                                Athena) by order of magnitude.
                                                                                                   To better understand the trade-offs between our solution and
                                      14                                                        Cassandra-based, we also compared their execution time of cal-
                                                                           Our Index (S3)
                                                                          Cassandra Index       culating 𝑔𝑒𝑡𝐹𝑖𝑙𝑒𝑠 (𝑄). The results are presented in Fig. 8. We eval-
                                      12
                                                                                                uated two versions of our approach, with and without caching of
     Index storage size (GB)




                                                                                                the root index (see Section 3.4). Cassandra latency is lower than
                                      10
                                                                                                that of our approach, but when using a cache, the difference is
                                                                                                not significant (less than a second).
                                       8
                                                                                                   To summarize, as expected, adding indexing to data lakes dras-
                                                                                                tically improves queries performance. In this evaluation, we have
                                       6
                                                                                                demonstrated that index can be implemented in two different
                                                                                                ways - a trivial approach based on key-value stores and our ap-
                                       4
                                                                                                proach based on cloud object stores. Both approaches improve
                                                                                                queries’ execution time in an almost identical way (the difference
                                               record-id                     event-id           is in milliseconds resolution). However, our approach has the
                                                              Columns                           following advantages over the naive solution:

                                           Figure 6: Index Creation Storage
                                                                                                   (1) order of magnitude monetary cost improvement (Table 2)
   It is worth noting, that even though our primary reason to                                      (2) order of magnitude index creation time improvement (Fig.
implement the data lake index on top of object stores was moti-                                        5)
vated by reducing the monetary cost, our experiments provide                                       (3) significantly less storage overhead in some cases (Fig. 6)
an additional benefit of this decision. Index creation time of our
                                        80                       Our Index (S3)                 At a high level, our solution resembles a well-known inverted
                                                                Cassandra Index              index structure which is extensively used in full-text search en-
     Query execution time (seconds)




                                                                No Index Spark               gines [14, 20, 40, 48]. Inverted index maps words to the lists of
                                        60                      No Index Athena              documents that contain them, and for data sets of the moderate
                                                                                             size, it can be implemented with the standard index structures as
                                                                                             a B+ tree or a hash index [40]. For "big data" volumes, distributed
                                        40
                                                                                             inverted indexes were proposed [48]. Two main techniques used
                                                                                             in distributed inverted indexing are document-based [14] and
                                        20                                                   term-based [20]. Both these techniques assume combined com-
                                                                                             pute and storage layers, while in our system model, the main
                                                                                             assumption is that compute and storage are separated.
                                         0                                                      In [27], the authors study inverted indexes in dataspaces ("col-
                                                record-id                         event-id   lections of heterogeneous and partially unstructured data"). Their
                                                                Columns                      main focus is on the heterogeneity of data and very large indexes
                                                                                             are not considered (index implementation is based on a single-
                                          Figure 7: Data Lake Queries Latency                node Lucene engine [15]); in our solution, the focus is on struc-
                                                                                             tured data, yet we expect very large volumes of both data and
                                                                                             indexes. In [26], indexing in big data is achieved by enhancing
                                                                                             the MapReduce load process via user-defined functions (UDF).
                                                                                             However, this solution is limited to a particular system (Hadoop),
                                                            Our Index (S3)                   where compute (MapReduce) and storage (HDFS) components
                                      2,000             Our Index + Cache (S3)
                                                                                             are running on the same machines; our focus is on disaggregated
     Get files execution time (ms)




                                                          Cassandra Index
                                                                                             architecture. System model similar to ours is considered in [19],
                                      1,500
                                                                                             as well as in multiple industry approaches (e.g. [10]), however
                                                                                             in these approaches index is stored in a key-value database, and
                                                                                             as we show in this work it has two severe drawbacks comparing
                                      1,000                                                  to our solution - significantly higher monetary cost and much
                                                                                             longer load time.
                                       500                                                      To the best of our knowledge, our work is the first research
                                                                                             attempt to improve queries performance in cloud data lakes by
                                                                                             building indexing on top of object stores. However, two indus-
                                         0                                                   try projects focus on the same domain - Microsoft Hyperspace
                                                record-id                         event-id   [39] and Databricks Delta Lake [23]. Both assume data lakes
                                                                Columns
                                                                                             architecture and try to improve queries performance by using
                                                                                             indexes stored in object stores. Below we briefly discuss how
                                              Figure 8: Get Files Latency
                                                                                             these projects differ from our approach based on the available
                                                                                             online information about these systems [23, 39].

6   RELATED WORK
Data lakes are a relatively new approach for storing and ana-
lyzing large amounts of data. As a new technology, it still lacks                                • Hyperspace currently uses only "covering indexes" and
a formal definition, and in existing literature, different authors                                 plan to support other index types in the future. Covering
assign different (and sometimes even contradictory) meanings                                       indexes are built upon user-provided columns lists "in-
to this concept [29, 41]. In this paper, we stick to the definition                                dexed columns" and "data columns", and aim to improve
from the recent Seattle Report on Database Research [2], where                                     the performance of the queries that search by "indexed
the main characteristic of the data lakes architecture is that it                                  columns" and retrieve "data columns". The implementation
"disaggregates compute and storage, so they can scale indepen-                                     is based on copying both "indexed" and "data" columns
dently."                                                                                           from the data lake and storing them in a columnar format
    There are several research challenges in the data lakes domain.                                sorted by "indexed" columns. While covering indexes can
The most fundamental ones are knowledge discovery [30] and                                         improve some types of queries, they are not suitable for
various optimizations [2]. We focus on the latter and study opti-                                  "needle in a haystack" queries where retrieval of all the
mization of a specific scenario in data lake systems - "needle in a                                columns should be supported (e.g. for GDPR scenario).
haystack" queries. Our solution can be seen as a combination of                                  • Delta Lake uses Bloom filter indexes in the following
different techniques from databases and big data domains. Thus,                                    way. Each file has an associated file that contains a Bloom
we use partitions as in [18] to organize our index according to                                    filter containing values of indexed columns from this file.
columns. For scalability, we use root-index as in B+ trees [40] and                                Then, upon a user query, Bloom filters are checked before
some distributed databases (e.g. [22]). The format of our index                                    reading the files, and the file is read only if the value was
files is similar to columnar formats [28] with multiple chunks                                     found in the corresponding Bloom filter. We use a similar
of data and a single metadata section. We use Bloom filters [17]                                   technique (Section 3.4), but in our case, only a single Bloom
to improve the performance of queries on non-existing values.                                      filter will be read for a particular value, while in Delta Lake
Parallel processing paradigm [25, 47] is used for index creation                                   all existing Bloom filters should be read.
and updating.
7     CONCLUSION                                                                        [18] Jesús Camacho-Rodríguez, Ashutosh Chauhan, Alan Gates, Eugene Koifman,
                                                                                             Owen O’Malley, Vineet Garg, Zoltan Haindrich, Sergey Shelukhin, Prasanth
In recent years, cloud data lakes have become a prevalent way of                             Jayachandran, Siddharth Seth, et al. 2019. Apache hive: From mapreduce to
storing large amounts of data. The main implication is a separa-                             enterprise-grade big data warehousing. In Proceedings of the 2019 International
                                                                                             Conference on Management of Data (SIGMOD). 1773–1786.
tion between the storage and compute layers, and as a result, a                         [19] Jesús Camacho-Rodríguez, Dario Colazzo, and Ioana Manolescu. 2013. Web
rise of new technologies in both compute and storage domains.                                data indexing in the cloud: efficiency and cost reductions. In Proceedings of
Query engines, the main player in the compute domain, are de-                                the 16th International Conference on Extending Database Technology (EDBT).
                                                                                             41–52.
signed to run on in-place data that is usually stored in a columnar                     [20] B Barla Cambazoglu et al. 2013. A term-based inverted index partitioning
format. Their main focus is on analytical queries while random                               model for efficient distributed query processing. ACM Transactions on the
reads are overlooked.                                                                        Web (TWEB) 7, 3 (2013), 1–23.
                                                                                        [21] Samy Chambi et al. 2016. Better bitmap performance with roaring bitmaps.
   In this paper, we present our approach for optimizing needle                              Software: practice and experience 46, 5 (2016), 709–719.
in a haystack queries in cloud data lakes. The main idea is to                          [22] Fay Chang et al. 2008. Bigtable: A distributed storage system for structured
                                                                                             data. ACM Transactions on Computer Systems (TOCS) 26, 2 (2008), 1–26.
maintain an index structure that maps indexed column values                             [23] Databricks. 2020. Delta Lake. Retrieved November 14, 2020 from https:
to their files. We built our solution in accordance with data lake                           //docs.databricks.com/delta
architecture, where the storage is completely separated from the                        [24] Databricks. 2020. Top 5 Reasons for Choosing S3 over HDFS.                  Re-
                                                                                             trieved November 14, 2020 from https://databricks.com/blog/2017/05/31/
compute. The reason for building our index on top of object                                  top-5-reasons-for-choosing-s3-over-hdfs.html
stores was motivated by the observation that the monetary cost                          [25] Jeffrey Dean and Sanjay Ghemawat. 2008. MapReduce: simplified data pro-
of this solution is significantly lower than the one based on DBMS.                          cessing on large clusters. Commun. ACM 51, 1 (2008), 107–113.
                                                                                        [26] Jens Dittrich et al. 2010. Hadoop++ making a yellow elephant run like a
Our experimental evaluation provides an additional reason to                                 cheetah (without it even noticing). Proceedings of the VLDB Endowment 3, 1-2
favor object stores over the DMBS approach - much lower index                                (2010), 515–529.
                                                                                        [27] Xin Dong and Alon Halevy. 2007. Indexing dataspaces. In Proceedings of the
computation time.                                                                            2007 ACM SIGMOD international conference on Management of data (SIGMOD).
   For future work, we are planning to extend our index scheme                               43–54.
and support more complex query types (e.g. joins, group by).                            [28] Avrilia Floratou et al. 2014. Sql-on-hadoop: Full circle back to shared-nothing
                                                                                             database architectures. Proceedings of the VLDB Endowment 7, 12 (2014),
An additional research direction may be an implementation of                                 1295–1306.
different systems according to data lake architecture and thereby                       [29] Corinna Giebler et al. 2019. Leveraging the Data Lake: Current State and
improving their monetary cost and load time. For example, im-                                Challenges. In International Conference on Big Data Analytics and Knowledge
                                                                                             Discovery (DaWaK). Springer, 179–188.
plementing a key-value store on top of object stores seems to be                        [30] Paolo Giudice et al. 2019. An approach to extracting complex knowledge
a promising research direction which can be based on the ideas                               patterns among concepts belonging to structured, semi-structured and un-
                                                                                             structured sources in a data lake. Information Sciences 478 (2019), 606–626.
presented in this work.                                                                 [31] Google. 2020. Google Cloud Storage. Retrieved November 14, 2020 from
                                                                                             https://cloud.google.com/storage
REFERENCES                                                                              [32] Michael Hausenblas and Jacques Nadeau. 2013. Apache drill: interactive
                                                                                             ad-hoc analysis at scale. Big Data 1, 2 (2013), 100–104.
 [1] Daniel Abadi et al. 2015. SQL-on-hadoop systems: tutorial. Proceedings of the      [33] Marcel Kornacker et al. 2015. Impala: A Modern, Open-Source SQL Engine
     VLDB Endowment 8, 12 (2015), 2050–2051.                                                 for Hadoop.. In Proceedings of the 7th Conference on Innovative Data Systems
 [2] Daniel Abadi et al. 2020. The Seattle Report on Database Research. ACM                  Research (CIDR), Vol. 1. 9.
     SIGMOD Record 48, 4 (2020), 44–53.                                                 [34] Avinash Lakshman and Prashant Malik. 2010. Cassandra: a decentralized
 [3] Amazon. 2020. Amazon Athena. Retrieved November 14, 2020 from https:                    structured storage system. ACM SIGOPS Operating Systems Review 44, 2
     //aws.amazon.com/athena                                                                 (2010), 35–40.
 [4] Amazon. 2020. Amazon DynamoDB On-Demand Pricing. Retrieved November                [35] Daniel Lemire and Leonid Boytsov. 2015. Decoding billions of integers per
     14, 2020 from https://aws.amazon.com/dynamodb/pricing/on-demand/                        second through vectorization. Software: Practice and Experience 45, 1 (2015),
 [5] Amazon. 2020. Amazon EC2 On-Demand Pricing. Retrieved November 14,                      1–29.
     2020 from https://aws.amazon.com/ec2/pricing/on-demand/                            [36] Sergey Melnik et al. 2011. Dremel: interactive analysis of web-scale datasets.
 [6] Amazon. 2020. Amazon S3 pricing. Retrieved November 14, 2020 from                       Commun. ACM 54, 6 (2011), 114–123.
     https://aws.amazon.com/s3/pricing/                                                 [37] Microsoft. 2020. Azure Blob Storage. Retrieved November 14, 2020 from
 [7] Amazon. 2020. AWS Glue. Retrieved November 14, 2020 from https://aws.                   https://azure.microsoft.com/services/storage/blobs/
     amazon.com/glue                                                                    [38] Microsoft. 2020. Azure subscription and service limits, quotas, and constraints.
 [8] Amazon. 2020. Best Practices for Amazon EMR. Retrieved November 14, 2020                Retrieved November 14, 2020 from https://docs.microsoft.com/en-us/azure/
     from https://d0.awsstatic.com/whitepapers/aws-amazon-emr-best-practices.                azure-resource-manager/management/azure-subscription-service-limits
     pdf                                                                                [39] Microsoft. 2020. Hyperspace. Retrieved November 14, 2020 from https:
 [9] Amazon. 2020. Best Practices for Running Apache Cassandra on Amazon EC2.                //microsoft.github.io/hyperspace
     Retrieved November 14, 2020 from https://aws.amazon.com/blogs/big-data/            [40] Raghu Ramakrishnan and Johannes Gehrke. 2000. Database management
     best-practices-for-running-apache-cassandra-on-amazon-ec2/                              systems. McGraw-Hill.
[10] Amazon. 2020.                Building and Maintaining an Amazon                    [41] Franck Ravat and Yan Zhao. 2019. Data lakes: Trends and perspectives. In
     S3 Metadata Index without Servers.                         Retrieved Novem-             International Conference on Database and Expert Systems Applications (DEXA).
     ber      14,     2020     from       https://aws.amazon.com/blogs/big-data/             Springer, 304–313.
     building-and-maintaining-an-amazon-s3-metadata-index-without-servers/              [42] Raghav Sethi, Martin Traverso, Dain Sundstrom, David Phillips, Wenlei Xie,
[11] Amazon. 2020. DynamoDB. Retrieved November 14, 2020 from https://aws.                   Yutian Sun, Nezih Yegitbasi, Haozhun Jin, Eric Hwang, Nileema Shingte, et al.
     amazon.com/dynamodb/                                                                    2019. Presto: SQL on everything. In Proceedings of the 35th International
[12] Amazon. 2020. S3. Retrieved November 14, 2020 from https://aws.amazon.                  Conference on Data Engineering (ICDE). IEEE, 1802–1813.
     com/s3/                                                                            [43] Konstantin Shvachko, Hairong Kuang, Sanjay Radia, and Robert Chansler.
[13] Michael Armbrust, Reynold S Xin, Cheng Lian, Yin Huai, Davies Liu, Joseph K             2010. The hadoop distributed file system. In Proceedings of the 26th Symposium
     Bradley, Xiangrui Meng, Tomer Kaftan, Michael J Franklin, Ali Ghodsi, et al.            on Mass Storage Systems and Technologies (MSST). IEEE, 1–10.
     2015. Spark sql: Relational data processing in spark. In Proceedings of the 2015   [44] Junjay Tan et al. 2019. Choosing a cloud DBMS: architectures and tradeoffs.
     International Conference on Management of Data (SIGMOD). 1383–1394.                     Proceedings of the VLDB Endowment 12, 12 (2019), 2170–2182.
[14] Luiz André Barroso, Jeffrey Dean, and Urs Holzle. 2003. Web search for a           [45] IT Governance Privacy Team. 2020. EU General Data Protection Regulation
     planet: The Google cluster architecture. IEEE micro 23, 2 (2003), 22–28.                (GDPR)–An implementation and compliance guide. IT Governance Ltd.
[15] Andrzej Białecki, Robert Muir, Grant Ingersoll, and Lucid Imagination. 2012.       [46] Grisha Weintraub. 2020. data-lake-index. Retrieved November 14, 2020 from
     Apache lucene 4. In SIGIR 2012 workshop on open source information retrieval.           https://github.com/grishaw/data-lake-index
     17.                                                                                [47] Matei Zaharia et al. 2016. Apache spark: a unified engine for big data process-
[16] Spyros Blanas, Jignesh M Patel, Vuk Ercegovac, Jun Rao, Eugene J Shekita, and           ing. Commun. ACM 59, 11 (2016), 56–65.
     Yuanyuan Tian. 2010. A comparison of join algorithms for log processing in         [48] Justin Zobel and Alistair Moffat. 2006. Inverted files for text search engines.
     mapreduce. In Proceedings of the 2010 ACM SIGMOD International Conference               ACM computing surveys (CSUR) 38, 2 (2006), 6–es.
     on Management of data (SIGMOD). 975–986.
[17] B. H. Bloom. 1970. Space/time trade-offs in hash coding with allowable errors.
     Commun. ACM 13, 7 (1970), 422–426.