<!DOCTYPE article PUBLIC "-//NLM//DTD JATS (Z39.96) Journal Archiving and Interchange DTD v1.0 20120330//EN" "JATS-archivearticle1.dtd">
<article xmlns:xlink="http://www.w3.org/1999/xlink">
  <front>
    <journal-meta />
    <article-meta>
      <title-group>
        <article-title>Generalized Parallel Join Algorithms and Designing Cost Models</article-title>
      </title-group>
      <contrib-group>
        <contrib contrib-type="author">
          <string-name>Alice Pigul</string-name>
          <email>pay@math.spbu.ru</email>
          <aff>SPbSU</aff>
          <xref ref-type="aff" rid="aff0">0</xref>
        </contrib>
        <aff id="aff0">
          <label>0</label>
          <institution>Proceedings of the Spring Young Researcher's Colloquium On Database and Information Systems SYRCoDIS</institution>
          ,
          <addr-line>Moscow, Russia, 2012</addr-line>
        </aff>
      </contrib-group>
      <abstract>
        <p>Applications for large-scale data analysis use such techniques as parallel DBMSs, the MapReduce (MR) paradigm, and columnar storage. In this paper we focus on join processing in a MapReduce environment. The aim of this work is to compare different join algorithms and to design cost models for further use in a query optimizer.</p>
      </abstract>
    </article-meta>
  </front>
  <body>
    <sec id="sec-1">
      <title>1 Introduction</title>
      <p>Data-intensive applications include large-scale data
warehouse systems, cloud computing, and data-intensive
analysis. These applications have their own specific
computational workloads. For example, analytic systems
produce relatively rare updates but heavy select
operations, with millions of records to be processed, often
with aggregations.</p>
      <p>The following architectures are used to analyze
massive amounts of data: the MapReduce paradigm,
parallel DBMSs, column-wise stores, and various
combinations of these approaches.</p>
      <p>Applications of this type process multiple data sets,
which implies the need to perform several join operations.
The join is known to be one of the most expensive
operations in terms of both I/O and CPU costs.</p>
      <p>Unfortunately, the join operation is not directly
supported in MapReduce. There are several approaches to
this problem: using a high-level language such as
PigLatin or HiveQL for SQL-like queries, or implementing
algorithms from research papers. The aim of this work
is to generalize and compare the existing equi-join
algorithms together with some optimization techniques, and
to build a cost model that could be used in a query
optimizer for a distributed DBMS based on MapReduce.</p>
      <p>This paper is organized as follows. Section 2
describes the state of the art. Join algorithms and some
optimization techniques are introduced in Section 3.
The design of cost models for the join algorithms is
presented in Section 4. The performance evaluation is
described in Section 5. Finally, future directions and
some discussion of the experiments are given.</p>
      <p>
        DBMS inside MapReduce. The basic idea is
to connect multiple single-node database
systems, using MapReduce as the task
coordinator and network communication layer.
An example is the hybrid database HadoopDB
[
        <xref ref-type="bibr" rid="ref2">2</xref>
        ].
      </p>
      <p>
        MapReduce alongside a parallel DBMS.
MapReduce is used to implement an ETL
process whose output is stored in a parallel DBMS.
This approach is discussed in [
        <xref ref-type="bibr" rid="ref28">28</xref>
        ] for Vertica,
which also supports column-wise storage.
      </p>
      <p>
        Another group of hybrid systems combines
MapReduce with a column-wise store; both
techniques are effective in data-intensive
applications. Hybrid systems based on these two
techniques may be found in [
        <xref ref-type="bibr" rid="ref13 ref20">20,13</xref>
        ].
      </p>
      <sec id="sec-1-1">
        <title>2.2 Algorithms for Join Operation</title>
        <p>
          Detailed comparison of relational join algorithms was
presented in [
          <xref ref-type="bibr" rid="ref26">26</xref>
          ]. In our paper, the consideration is
restricted to a comparison of joins in the context of
MapReduce paradigm.
        </p>
        <p>Papers discussing equi-join algorithms can be
divided into two categories: those describing two-way join
algorithms and those describing multi-way join execution plans.</p>
        <p>
          The former category deals with the design and analysis of
join algorithms over two data sets. A comparative analysis of
two-way join techniques is presented in [
          <xref ref-type="bibr" rid="ref21 ref4">6, 4, 21</xref>
          ]. Cost
models for two-way join algorithms in terms of
I/O cost are presented in [
          <xref ref-type="bibr" rid="ref17">7, 17</xref>
          ].
        </p>
        <p>
          The basic idea of multi-way join is to find strategies
to combine the natural join of several relations.
Different join algorithms from relational algebra are
presented in [
          <xref ref-type="bibr" rid="ref30">30</xref>
          ]. The authors introduce an extension
of MapReduce that facilitates the implementation of relational
operations. Several optimizations for multi-way join are
described in [
          <xref ref-type="bibr" rid="ref18 ref3">3, 18</xref>
          ]. The authors introduce a one-to-many
shuffling strategy. Multi-way join optimization for
column-wise stores is considered in [
          <xref ref-type="bibr" rid="ref20 ref32">20, 32</xref>
          ].
        </p>
        <p>
          Theta-Joins and set-similarity joins using
MapReduce are addressed in [
          <xref ref-type="bibr" rid="ref23">23</xref>
          ] and [
          <xref ref-type="bibr" rid="ref27">27</xref>
          ] respectively.
        </p>
      </sec>
      <sec id="sec-1-2">
        <title>2.3 Optimization techniques and cost models</title>
        <p>
          In contrast to SQL queries in a parallel database, a
MapReduce program contains user-defined map and
reduce functions. These functions can be considered
black boxes, when nothing is known about them; they
can be written in SQL-like languages such as HiveQL,
PigLatin, or MRQL; or SQL operations can be extracted
from the functions on a semantic basis. Automatic
discovery of good configuration settings for an
arbitrary program is offered in [
          <xref ref-type="bibr" rid="ref16">16</xref>
          ]. A theoretical
cost model for an arbitrary MR program, designed for each
phase separately, is presented in [
          <xref ref-type="bibr" rid="ref15">15</xref>
          ]. If the MR program is
close to SQL semantics, it is possible to
construct a more accurate cost model or to adapt some of
the optimization techniques from relational databases.
HadoopToSQL [
          <xref ref-type="bibr" rid="ref22">22</xref>
          ] makes it possible to take advantage of two
different data stores, an SQL database and the
text-format MapReduce storage, and to use an index at the
right time by transforming the MR program into SQL.
The Manimal system [
          <xref ref-type="bibr" rid="ref17">17</xref>
          ] uses static analysis to detect
and exploit selection, projection, and data
compression in MR programs and, if needed, to employ a
B+-tree index.
        </p>
        <p>
          A new SQL-like query language and algebra are presented
in [
          <xref ref-type="bibr" rid="ref12">12</xref>
          ], but they need a cost model based on
statistics. A detailed construction of a model estimating
the I/O cost of each phase separately is given in [
          <xref ref-type="bibr" rid="ref24">24</xref>
          ].
Simple theoretical considerations for selecting a
particular join algorithm are presented in [
          <xref ref-type="bibr" rid="ref21">21</xref>
          ]. Another
approach [7] for selecting a join algorithm is to measure
the correlation between the input size and the join
algorithm's execution time under fixed cluster
configuration settings.
        </p>
      </sec>
    </sec>
    <sec id="sec-2">
      <title>3 Join algorithms and optimization techniques</title>
      <p>In this section we consider various techniques for
two-way joins in the MapReduce framework. Join algorithms
can be divided into two groups: Reduce-side joins and
Map-side joins. Pseudo code is presented in the Listings,
where R is the right dataset, L is the left dataset, V is a line
from a file, and Key is the join key parsed from a tuple (in
this context the tuple is V).</p>
      <sec id="sec-2-1">
        <title>3.1 Reduce-Side join</title>
        <p>Reduce-side join is an algorithm which performs data
pre-processing in the Map phase, while the join itself is done
during the Reduce phase. Joins of this type are the most
general, without any restrictions on the data. The Reduce-side
join is also the most time-consuming, because it contains an
additional phase and transmits data over the network
from one phase to another. In addition, the algorithm
has to pass information about the source of the data through
the network. The main objective of the improvements is to
reduce the data transmission over the network from the
Map task to the Reduce task by filtering the original
data with semi-joins. Another disadvantage of this
class of algorithms is its sensitivity to data skew,
which can be addressed by replacing the default hash
partitioner with a range partitioner.</p>
        <p>There are three algorithms in this group: the General
reducer-side join, the Optimized reducer-side join, and the
Hybrid Hadoop join.</p>
        <sec id="sec-2-1-1">
          <title>General reducer-side join</title>
          <p>General reducer-side join is the simplest one. The
same algorithm is called Standard Repartition Join in
[6]. The abbreviation is GRSJ.</p>
        </sec>
        <sec id="sec-2-1-2">
          <title>Map (K: null, V from R or L)</title>
          <p>Tag = bit from name of R or L;
emit (Key, pair(V, Tag));
Reduce (K’: join key, LV: list of V with key K’)
  create buffers Br and Bl for R and L;
  for t in LV do
    add t.V to Br or Bl by t.Tag;
  for r in Br do
    for l in Bl do
      emit (null, tuple(r.V, l.V));</p>
        </sec>
        <sec id="sec-2-1-3">
          <title>Listing 1: GRSJ.</title>
          <p>This algorithm has both Map and Reduce phases. In the
Map phase, data are read from the two sources and a tag is
attached to each value to identify the source of the
key/value pair. As the key is not affected by this
tagging, the standard hash partitioner can be used. In the
Reduce phase, data with the same key and different tags
are joined with a nested-loop algorithm. The problems of
this approach are that the reducer must have sufficient
memory for all records with the same key, and the
algorithm's sensitivity to data skew.</p>
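          <p>Outside Hadoop, the GRSJ data flow can be simulated in a few lines. The following plain-Python sketch (the function name and sample data are ours, for illustration only) tags records in the map step, groups them by key as the shuffle would, and nested-loops over the two per-key buffers in the reduce step.</p>

```python
from collections import defaultdict

def grsj_join(R, L):
    """Simulate the General reducer-side join (GRSJ).

    R and L are lists of (key, value) tuples. Map tags each record
    with its source; the shuffle groups records by key; Reduce
    buffers both sides and emits their per-key cross product."""
    # Map phase: attach a source tag to every value.
    shuffled = defaultdict(list)          # key -> list of (value, tag)
    for key, v in R:
        shuffled[key].append((v, 'R'))
    for key, v in L:
        shuffled[key].append((v, 'L'))

    # Reduce phase: per key, split values by tag and nested-loop join.
    out = []
    for key, tagged in shuffled.items():
        br = [v for v, t in tagged if t == 'R']
        bl = [v for v, t in tagged if t == 'L']
        for r in br:
            for l in bl:
                out.append((key, r, l))
    return out
```

          <p>Note that, exactly as in the text, both per-key buffers live in memory at once, which is where the skew sensitivity comes from.</p>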
          <p>
            Optimized reducer-side join enhances the previous
algorithm by overriding the sorting and grouping by
key, as well as the tagging of the data source. It is also known as
Improved Repartition Join in [6] and Default join in [
            <xref ref-type="bibr" rid="ref14">14</xref>
            ].
The abbreviation is ORSJ. In this algorithm all the
values with the first tag are followed by the values with the
second one. In contrast with the General reducer-side
join, the tag is attached to both the key and the value. Because
the tag is attached to the key, the
partitioner must be overridden so that the data are split across
the nodes by the key only. This scheme requires buffering for only
one of the input sets. The Optimized reducer-side join inherits the
major disadvantages of the General reducer-side join,
namely transferring additional information about the source
through the network and sensitivity to data skew.
          </p>
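          <p>The effect of the overridden sort and grouping can be imitated with a composite (key, tag) sort key. A minimal sketch (plain Python; names and data ours): because all R values of a key arrive before its L values, only the R side is buffered.</p>

```python
from itertools import groupby

def orsj_join(R, L):
    """Simulate the Optimized reducer-side join (ORSJ).

    The tag is attached to the key as well, and the sort order puts
    all R values before the L values of the same join key, so the
    reducer buffers only the R side."""
    # Map phase: composite key (join key, tag); tag 0 = R, 1 = L.
    tagged = [((k, 0), v) for k, v in R] + [((k, 1), v) for k, v in L]
    # Shuffle: sort by (key, tag); the partitioner would use the join
    # key only, so both tags of one key reach the same reducer.
    tagged.sort(key=lambda kv: kv[0])

    out = []
    # Grouping comparator: group by the join key only.
    for key, group in groupby(tagged, key=lambda kv: kv[0][0]):
        br = []                      # buffer for the R side only
        for (k, tag), v in group:
            if tag == 0:
                br.append(v)
            else:                    # L values stream, no buffering
                for r in br:
                    out.append((key, r, v))
    return out
```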
        </sec>
        <sec id="sec-2-1-4">
          <title>Map (K:null, V from R or L)</title>
          <p>Tag = bit from name of R or L;
emit (pair(Key, Tag), pair(V, Tag));
Partitioner (K: key, V: value, P: the number of reducers)
  return hash_f(K.Key) mod P;
Reduce (K’: join key, LV: list of V’ with key K’)
  create buffer Br for R;
  for t in LV with t.Tag corresponding to R do
    add t.V to Br;
  for l in LV with l.Tag corresponding to L do
    for r in Br do
      emit (null, tuple(r.V, l.V));</p>
        </sec>
        <sec id="sec-2-1-5">
          <title>Listing 2: ORSJ.</title>
          <p>
            The Hybrid join [
            <xref ref-type="bibr" rid="ref4">4</xref>
            ] combines the Map-side and
Reduce-side joins. The abbreviation is
HYB.
          </p>
        </sec>
        <sec id="sec-2-1-6">
          <title>Job 1: partition the smaller file S</title>
          <p>Map (K: null, V from S)
  emit (Key, V);
Reduce (K’: join key, LV: list of V’ with key K’)
  for t in LV do
    emit (null, t);</p>
        </sec>
        <sec id="sec-2-1-7">
          <title>Job 2: join two datasets</title>
          <p>Map (K: null, V from B)
  emit (Key, V);
init() // for Reduce phase
  read the needed partition of the output file from Job 1;
  add it to hashMap(Key, list(V)) H;
Reduce (K’: join key, LV: list of V’ with key K’)
  if (K’ in H) then
    for r in LV do
      for l in H.get(K’) do
        emit (null, tuple(r, l));</p>
        </sec>
        <sec id="sec-2-1-8">
          <title>Listing 3: HYB.</title>
          <p>In the Map phase we process only one set; the
second set is partitioned in advance. The pre-partitioned
set is pulled block by block from the distributed file system in
the Reduce phase, where it is joined with the data
set that came from the Map phase. The similarity with
the Map-side join is the restriction that one of the sets
has to be split in advance with the same partitioner
that will split the second set. Unlike the Map-side join, only
one set needs to be split in advance. The
similarity with the Reduce-side join is that the algorithm
requires two phases, one of them for pre-processing the
data and one for the join itself. In contrast with the
Reduce-side join, we do not need additional information
about the source of the data, as the sources arrive at the
Reducer one at a time.</p>
        </sec>
      </sec>
      <sec id="sec-2-2">
        <title>3.2 Map-Side join</title>
        <p>Map-side join is an algorithm without a Reduce phase.
This kind of join can be divided into two groups. The first
is the partition join, where the data are previously
partitioned into the same number of parts with the same
partitioner; the corresponding parts are then joined during the
Map phase. This map-side join is sensitive to data
skew. The second is the in-memory join, where the smaller
dataset is sent whole to all mappers and the bigger dataset is
partitioned over the mappers. The problem with this
type of join occurs when the smaller of the sets cannot
fit in memory.</p>
        <p>There are three methods to avoid this problem: the
JDBM-based map join, the Multi-phase map join, and the
Reversed map join.</p>
        <sec id="sec-2-2-2">
          <title>Map-side partition join (MSPJ)</title>
          <p>The Map-side partition join algorithm assumes that the two
data sets are pre-partitioned into the same number of
splits by the same partitioner. It is also known as the default
map join. The abbreviation is MSPJ. In the Map phase
one of the sets is read and loaded into a hash table,
and the two sets are then joined via the hash table. This
algorithm buffers all records with the same key in
memory, so in the case of skewed data it may fail due to
lack of memory.</p>
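          <p>The partition-then-probe flow can be sketched as follows (plain Python; the helper names and the tiny in-process "partitions" are ours, standing in for the pre-partitioned HDFS files).</p>

```python
def partition(dataset, n):
    """Split (key, value) records into n parts with one shared hash partitioner."""
    parts = [[] for _ in range(n)]
    for k, v in dataset:
        parts[hash(k) % n].append((k, v))
    return parts

def mspj_join(R, L, n=4):
    """Simulate the Map-side partition join (MSPJ): both inputs are
    pre-partitioned identically; each map task loads one R part into
    a hash table and probes it with the matching L part."""
    out = []
    for pr, pl in zip(partition(R, n), partition(L, n)):
        h = {}                       # hash table over one R part
        for k, v in pr:
            h.setdefault(k, []).append(v)
        for k, v in pl:
            for r in h.get(k, []):
                out.append((k, r, v))
    return out
```

          <p>Because all records of a part share a hash table, a part holding a heavily skewed key still has to fit in memory, which mirrors the failure mode described above.</p>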
        </sec>
        <sec id="sec-2-2-3">
          <title>Job 1: partition dataset S as in HYB</title>
          <p>Job 2: partition dataset B as in HYB
Job 3: join two datasets
init() // for Map phase
  read the needed partition of the output file from Job 1;
  add it to hashMap(Key, list(V)) H;
Map (K: null, V from B)
  if (K in H) then
    for l in H.get(K) do
      emit (null, tuple(V, l));</p>
        </sec>
        <sec id="sec-2-2-4">
          <title>Listing 4: MSPJ.</title>
          <p>Map-side partition merge join is an improvement of the
previous version of the join. The abbreviation is
MSPMJ. If the data sets, in addition to being partitioned, are
sorted in the same order, we can apply a merge join. The
advantage of this approach is that the second set is read
on demand rather than completely, so memory overflow
can be avoided. As in the previous cases, semi-join
filtering and a range partitioner can be used for optimization.</p>
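          <p>The on-demand reading is the essence of the merge step. A minimal sketch (plain Python, names ours) over two partitions already sorted by key buffers only the current run of equal R keys, never a whole partition:</p>

```python
def merge_join(sorted_r, sorted_l):
    """Merge join of two partitions sorted by key: the R side is
    consumed on demand, so only the run of R tuples sharing the
    current key is ever buffered."""
    out = []
    i = 0
    for k, v in sorted_l:
        # Advance the R cursor past keys smaller than k.
        while i < len(sorted_r) and sorted_r[i][0] < k:
            i += 1
        # Emit the run of R tuples whose key equals k; the cursor i
        # stays at the run start in case L repeats the key.
        j = i
        while j < len(sorted_r) and sorted_r[j][0] == k:
            out.append((k, sorted_r[j][1], v))
            j += 1
    return out
```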
        </sec>
        <sec id="sec-2-2-5">
          <title>Job 1: partition S dataset as in HYB</title>
          <p>Job 2: partition B dataset as in HYB
Job 3: join two datasets
init() // for Map phase
  find the needed partition SP of the output file from Job 1;
  read the first lines with the same key K2 from SP and add them to buffer Buf;
Map (K: null, V from B)
  while (K &gt; K2) do
    read T from SP with key K2;
    while (K == K2) do
      add T to Buf;
      read T from SP with key K2;
  if (K == K2) then
    for r in Buf do
      emit (null, tuple(r, V));</p>
        </sec>
        <sec id="sec-2-2-6">
          <title>Listing 5: MSPMJ.</title>
          <p>
            In-Memory join does not require the original
data to be distributed in advance, unlike the versions of map join
discussed above. The same algorithm is called
Map-side replication join in [7], Broadcast Join in [6],
Memory-backed join in [
            <xref ref-type="bibr" rid="ref4">4</xref>
            ], and Fragment-Replicate join in
[
            <xref ref-type="bibr" rid="ref14">14</xref>
            ]. The abbreviation is IMMJ. Nevertheless, this
algorithm has a strong restriction on the size of one of
the sets: it must fit completely in memory. The
advantage of this approach is its resistance to data
skew, because each node sequentially reads the same number of
tuples. There are two options for transferring the smaller of the
sets: using the distributed cache, or reading it from the
distributed file system.
          </p>
          <p>init() // for Map phase
  read S from HDFS;
  add it to hashMap(Key, list(V)) H;
Map (K: null, V from B)
  if (K in H) then
    for l in H.get(K) do
      emit (null, tuple(V, l));</p>
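          <p>The listing reduces to a classic broadcast hash join; a plain-Python equivalent (function name and data ours) builds the hash table once in init() and probes it per map call:</p>

```python
def immj_join(S, B):
    """Simulate the In-Memory (broadcast) join, IMMJ: the smaller
    set S is loaded entirely into a hash table in init(), and each
    map call probes it with one tuple of the bigger set B."""
    h = {}                            # init(): hash the small set
    for k, v in S:
        h.setdefault(k, []).append(v)
    out = []
    for k, v in B:                    # map(): stream the big set
        for l in h.get(k, []):
            out.append((k, v, l))
    return out
```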
        </sec>
        <sec id="sec-2-2-7">
          <title>Listing 6: IMMJ.</title>
          <p>The next three algorithms optimize the In-Memory join
for the case when both sets are large and neither of them fits
into memory.</p>
          <p>
            JDBM-based map join is presented in [
            <xref ref-type="bibr" rid="ref21">21</xref>
            ]. In this case, the
JDBM library automatically swaps the hash table from
memory to disk.
          </p>
          <p>The same as IMMJ, but H is implemented by an HTree
instead of a hashMap.</p>
        </sec>
        <sec id="sec-2-2-8">
          <title>Listing 7: JDBM.</title>
          <p>
            Multi-phase map join [
            <xref ref-type="bibr" rid="ref21">21</xref>
            ] is an algorithm where the
smaller of the sets is partitioned into parts that fit into
memory, and an In-Memory join is run for each part. The
problem with this approach is its poor performance: if the
size of the set that is loaded into memory doubles,
the execution time of this join also doubles. Note that the
set which is not loaded into memory will be read
from disk many times.
          </p>
          <p>For part P from S that fit into memory do IMMJ(P,B).</p>
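          <p>The repeated scans of B are visible in a direct simulation (plain Python, names ours; the chunk parameter stands in for the memory budget): B is re-read once per part of S, which is exactly why doubling the in-memory budget halves the number of passes.</p>

```python
def multiphase_join(S, B, chunk=2):
    """Simulate the Multi-phase map join: the smaller set S is split
    into memory-sized parts and an in-memory hash join is run per
    part; the bigger set B is scanned again for every part."""
    out = []
    for start in range(0, len(S), chunk):
        h = {}                                # hash one part of S
        for k, v in S[start:start + chunk]:
            h.setdefault(k, []).append(v)
        for k, v in B:                        # full pass over B per part
            for l in h.get(k, []):
                out.append((k, l, v))
    return out
```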
        </sec>
        <sec id="sec-2-2-9">
          <title>Listing 8: Multi-phase map join.</title>
          <p>
            Idea of Reversed map join [
            <xref ref-type="bibr" rid="ref21">21</xref>
            ] is that the
bigger of the sets, which is partitioned during the Map
phase, is loaded into the hash table. It is also known as
Broadcast Join in [6]. The abbreviation is REV. The
second dataset is read from a file line by line and joined
using the hash table.
          </p>
          <p>Map (K: null, V from B)
  add (Key, V) to hashMap(Key, list(V)) H;
close() // for Map phase
  find S in HDFS;
  while (not end of S) do
    read line T;
    K = join key from tuple T;
    if (K in H) then
      for l in H.get(K) do
        emit (null, tuple(T, l));</p>
        </sec>
        <sec id="sec-2-2-10">
          <title>Listing 9: REV.</title>
        </sec>
      </sec>
      <sec id="sec-2-3">
        <title>3.3 Semi-Join</title>
        <p>Sometimes a large portion of a data set does not take
part in the join. Deleting the tuples that will not be used
in the join significantly reduces the amount of data
transferred over the network and the size of the dataset
to be joined. This preprocessing can be carried out using
semi-joins, either by selection or by a bitwise filter. However,
these filtering techniques introduce some cost (an
additional MR job), so a semi-join can improve the
performance of the system only if the join key has low
selectivity. There are three ways to implement the
semi-join operation: a semi-join using a Bloom filter, a
semi-join using selection, and an adaptive semi-join.</p>
        <p>A Bloom filter is a bit array that answers membership
queries for a set. False positive answers are possible,
but there are no false negatives. The accuracy of the
membership test depends on the size of the bitmap and on
the number of elements in the set; these parameters are set
by the user. It is known that for a bitmap of fixed size m
and a data set of n tuples, the optimal number of hash
functions is k = 0.6931 * m / n. In the context of MapReduce,
the semi-join is performed in two jobs. The first job consists
of a Map phase, in which keys from one set are selected
and added to the Bloom filter, and a Reduce phase, which
combines the Bloom filters from the Map phase into
one. The second job consists only of a Map phase,
which filters the second data set with the Bloom filter
constructed in the previous job. The accuracy of this
approach can be improved by increasing the size of the
bitmap; however, a larger bitmap consumes more memory.
The advantage of this method is its compactness. The
performance of the semi-join using a Bloom filter depends
on the balance between the Bloom filter size, which increases
the time needed for the reconstruction of the filter in the
second job, and the number of false positive responses. A
large data set can seriously degrade the performance of the join.</p>
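        <p>The filter itself is a few lines of code. A minimal sketch (plain Python; the class, its double hashing via md5, and the parameter values are ours) applies the k = 0.6931 * m / n rule from the text:</p>

```python
import hashlib

class BloomFilter:
    """Minimal Bloom filter for the semi-join sketch. k is chosen
    by the rule k = 0.6931 * m / n given in the text."""
    def __init__(self, m, n):
        self.m = m                                   # bitmap size in bits
        self.k = max(1, round(0.6931 * m / n))       # number of hash functions
        self.bits = bytearray((m + 7) // 8)

    def _positions(self, key):
        # Derive k bit positions by salting one cryptographic hash.
        for i in range(self.k):
            d = hashlib.md5(f"{i}:{key}".encode()).hexdigest()
            yield int(d, 16) % self.m

    def add(self, key):
        for p in self._positions(key):
            self.bits[p // 8] |= 2 ** (p % 8)

    def __contains__(self, key):
        # No false negatives: every added key tests positive.
        return all(self.bits[p // 8] & 2 ** (p % 8)
                   for p in self._positions(key))
```

        <p>In the two-job scheme above, Job 1 would call add() for every key of L and OR the per-mapper bitmaps together; Job 2 would keep a tuple of R only if its key tests positive.</p>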
        <sec id="sec-2-3-1">
          <title>Job 1: construct Bloom filter</title>
          <p>Map (K: null, V from L)
  add Key to BloomFilter Bl;
close() // for Map phase
  emit (null, Bl);</p>
        </sec>
        <sec id="sec-2-3-2">
          <title>Reduce (K’: key, LV) //only 1 Reducer</title>
          <p>for l in LV do
  union the filters by operation OR;
close() // for Reduce phase
  write the resulting filter into a file;</p>
        </sec>
        <sec id="sec-2-3-3">
          <title>Job 2: filter dataset</title>
          <p>init() // for Map phase
  read the filter from the file into Bl;
Map (K: null, V from R)
  if (Key in Bl) then
    emit (null, V);
Job 3: do the join with the L dataset and the filtered dataset from
Job 2.</p>
        </sec>
        <sec id="sec-2-3-4">
          <title>Listing 10: Semi-join using Bloom-filter.</title>
          <p>Semi-join with selection extracts the unique keys and
constructs a hash table; the second set is then filtered by the
hash table constructed in the previous step. In the
context of MapReduce, the semi-join is performed in
two jobs. Unique keys are selected during the Map
phase of the first job and then combined into
one file during the Reduce phase. The second job consists
of only the Map phase, which filters the second set.
The semi-join using selection has a limitation: the
in-memory hash table built over the unique keys
can be very large, depending on the key size and the
number of distinct keys.</p>
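          <p>Collapsed into a single process, the two jobs amount to a key-set projection followed by a filter; a sketch (plain Python, names ours):</p>

```python
def semijoin_selection(R, L):
    """Simulate the semi-join with selection: Job 1 extracts the
    unique join keys of L; Job 2 keeps only the tuples of R whose
    key appears in that set. The actual join then runs on the
    reduced R."""
    unique_keys = {k for k, _ in L}        # Job 1: unique keys of L
    return [(k, v) for k, v in R if k in unique_keys]   # Job 2: filter R
```

          <p>The set of unique keys here plays the role of the in-memory hash table whose size limits the technique.</p>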
        </sec>
        <sec id="sec-2-3-5">
          <title>Job 1: find unique keys</title>
          <p>init() // for Map phase
  create HashMap H;
Map (K: null, V from L)
  if (not Key in H) then
    add Key to H;
    emit (Key, null);</p>
        </sec>
        <sec id="sec-2-3-6">
          <title>Reduce (K’: key, LV) //only one Reducer</title>
          <p>emit (null, K’);</p>
        </sec>
        <sec id="sec-2-3-7">
          <title>Job 2: filter dataset</title>
          <p>init() // for Map phase
  add the unique keys from Job 1 to HashMap H;
Map (K: null, V from R)
  if (Key in H) then
    emit (null, V);
Job 3: do the join with the L dataset and the filtered dataset from
Job 2.</p>
        </sec>
        <sec id="sec-2-3-8">
          <title>Listing 11: Semi-join with selection.</title>
          <p>The Adaptive semi-join is performed in one job; it
filters the original data on the fly during the join.
Similarly to the Reduce-side join, at the Map phase the
keys from the two data sets are read, and the values are set
to tags which identify the source of the keys. At
the Reduce phase, the keys that occur with different tags are
selected. The disadvantage of this approach is that additional
information about the source of the data is transmitted over
the network.</p>
          <p>Job 1: find keys which are present in both datasets
Map (K: null, V from R or L)
  Tag = bit from name of R or L;
  emit (Key, Tag);
Reduce (K’: join key, LV: list of V with key K’)
  Val = first value from LV;
  for t in LV do
    if (not t == Val) then
      emit (null, K’);
Job 2: before joining, it is necessary to filter the smaller
dataset by the keys from Job 1, which are loaded into a
hash map. Then the bigger dataset is joined with the filtered
one.</p>
        </sec>
        <sec id="sec-2-3-9">
          <title>Listing 12: Adaptive semi-join.</title>
        </sec>
      </sec>
      <sec id="sec-2-4">
        <title>3.4 Range Partitioners</title>
        <p>All the algorithms except the In-Memory join and its
optimizations are sensitive to data skew. This
section describes two techniques for replacing the default hash
partitioner.</p>
        <p>
          A Simple Range-based Partitioner [
          <xref ref-type="bibr" rid="ref4">4</xref>
          ] (similar to the Skew join in [
          <xref ref-type="bibr" rid="ref14">14</xref>
          ]) applies a range vector of
dimension n, constructed from the join keys before
the MR job starts. With this vector, the join keys are
split into n parts, where n is the number of Reduce
tasks. Ideally, the partitioning vector is constructed from the
whole original set of keys; in practice, a certain number
of keys is chosen randomly from the data set. It is
known that the optimal number of sampled keys for the vector
construction is equal to the square root of the total
number of tuples. With a heavy data skew towards a single
key value, some elements of the vector may be
identical. If a key belongs to multiple nodes, then for the
data set from which the hash table is built a node is
selected randomly; otherwise the key is sent to all of those
nodes (to save memory, as the hash table is kept in
memory).
        </p>
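        <p>The sampling and routing steps can be sketched directly (plain Python; the function names and the square-root sample-size rule applied to the key count are ours, following the text):</p>

```python
import bisect
import random
from math import isqrt

def build_range_vector(keys, num_reducers):
    """Build a range vector as in the Simple Range-based Partitioner:
    sample about sqrt(|keys|) keys, sort the sample, and keep one
    boundary per reducer (the last boundary is the sample maximum)."""
    sample_size = max(num_reducers, isqrt(len(keys)))
    sample = sorted(random.sample(list(keys), min(sample_size, len(keys))))
    step = len(sample) / num_reducers
    return [sample[max(0, int(step * (j + 1)) - 1)]
            for j in range(num_reducers)]

def range_partition(key, vector):
    """Route a key to the first reducer whose boundary covers it;
    keys above the last boundary go to the last reducer."""
    return min(bisect.bisect_left(vector, key), len(vector) - 1)
```

        <p>The Virtual Processor variant would simply build Red * P boundaries (P parts per reducer) and take the resulting index modulo the number of reducers.</p>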
        <p>
          Virtual Processor Partitioner [
          <xref ref-type="bibr" rid="ref4">4</xref>
          ] is an improvement of
the previous algorithm based on increasing the number
of partitions. The number of parts is specified as a multiple of
the number of tasks. This approach tends to load the nodes
processing identical keys more uniformly than the
previous version, because the same keys are scattered over more
nodes.
        </p>
        <p>// before the MR job starts
// optimal max = sqrt(|R| + |L|)
getSamples (Red: the number of reducers, max: the max number of samples)
  C = max / Splits.length;
  create buffer B;
  for s in Splits of R and L do
    get C keys from s;
    add them to B;
  sort B;
  // in case of the simple range partitioner P == 1
  // in case of the virtual processor partitioner P &gt; 1
  for j &lt; (Red * P) do
    T = B.length / (Red * P) * (j + 1);
    write B[T] into file;</p>
        <sec id="sec-2-4-1">
          <title>Map(K:null, V from L or R)</title>
          <p>Tag = bit from name of R or L;
read the file with samples and add the samples to buffer B;
// in case of the virtual processor partitioner,
// take each index mod |Reducers|
Ind = {i : B[i-1] &lt; Key &lt;= B[i]}
// Ind may be an array of indexes in the skew case
if (Ind.length &gt; 1) then
  if (V in L) then
    node = random(Ind);
    emit (pair(Key, node), pair(V, Tag));
  else
    for i in Ind do
      emit (pair(Key, i), pair(V, Tag));
else
  emit (pair(Key, Ind), pair(V, Tag));
Partitioner (K: key, V: value, P: the number of reducers)
  return K.Ind;
Reducer (K’: join key, LV: list of V’ with key K’)
  the same as in GRSJ</p>
        </sec>
        <sec id="sec-2-4-2">
          <title>Listing 13: The range partitioners.</title>
        </sec>
      </sec>
      <sec id="sec-2-5">
        <title>3.5 Distributed cache</title>
        <p>The advantage of using the distributed cache is that a data set
is copied only once to each node. It is especially
effective if several tasks at one node need the same file.
In contrast, access to the global file system needs
more communication between the nodes. Better
performance of the joins without the cache can be
achieved by increasing the replication factor of the files,
so there is a good chance of accessing a file replica
locally.</p>
      </sec>
    </sec>
    <sec id="sec-3">
      <title>4 Cost model</title>
      <p>
        Due to significant differences between parallel DBMSs
and MapReduce, the MapReduce paradigm requires
different optimization techniques based on indexing and
compression, programming models, data distribution,
and query execution strategy. Therefore, we need a
different strategy for designing the cost model. There are
two types of cost model design: task execution
simulation [
        <xref ref-type="bibr" rid="ref29">29</xref>
        ] and analytical cost calculation [
        <xref ref-type="bibr" rid="ref15 ref24">15, 24</xref>
        ].
To measure the effectiveness of query parallelism, one
needs a cost model that describes the
behavior of each algorithm in a parallel query.
An analytical model is a set of cost formulas used to
calculate the query execution time, taking into account
the specifics of the parallel algorithm. Below, analytical
cost models for the join algorithms and their optimizations
are constructed.
      </p>
      <sec id="sec-3-1">
        <title>4.1 Configuration settings</title>
        <p>The execution of an MR program depends on input data
statistics (such as selectivity, skew, and compression), on
cluster resources (such as the number of nodes), on
configuration parameters (such as I/O cost), and on the
properties of the specific algorithm. The parameters
used in the analysis are listed below.</p>
        <p>The variables used in the analysis: s(x), p(x), wid, ct, pC,
sC, sortC, cC, mC, selP, selC, |red|, |map|, rh, wh, rwl, tC,
sortMB, sortRP, sortSP, F, shuBP, shuMP, memMT, memT, redBP.</p>
      </sec>
      <sec id="sec-3-2">
        <title>4.2 Cost of arbitrary MR program</title>
        <p>As mentioned above, an MR job consists of
execution stages, so it is possible to estimate each
phase separately. A job may contain the following stages:
Setup, Read (read map input), Map (map function),
Buffer (serializing to buffer, partitioning, sorting,
combining, compressing, writing output data to local
disk), Merge (merging spill files), Shuffle (transferring
map output to reducers), MergeR (merging received
files), Reduce (reduce function), Write (writing the result to
HDFS), Cleanup. Because the tasks of an MR
job are carried out in parallel, in waves, it is possible
to approximate the total cost of the job
through the cost of one task (one mapper and one
reducer). Cost_job takes into account the parallel
threads of execution and computes the total cost of the MR
job, where cm and cr are the costs of one mapper or
reducer task respectively, and MaxMN and MaxRN are the
maximum numbers of map or reduce tasks per node.</p>
        <p>Cost job 
| map | *cm
</p>
        <p>| red | *cr
| nodes | *MaxMN | nodes | *MaxRN
This formula is bad for the skew data, when one task is
time consuming.
 ctr
сm  сread  cMap  cBuffer  сmerge,| red | 0 ,</p>
        <p>cread  cMap  cWrite, otherwise
cr  cshuffle  cmergeR  creduce  cWrite .</p>
        <p>
          c_Map and c_reduce are the costs of the user-defined functions, so
they are calculated by a separate formula for each join algorithm.
The other cost values (c_read, c_Buffer, c_Write,
c_merge, c_mergeR, c_shuffle, c_tr) are common to all join algorithms.
Consider these costs in more detail, as in [
          <xref ref-type="bibr" rid="ref15 ref24">15, 24</xref>
          ]. The stages
of reading input data from HDFS and writing into
HDFS are calculated by:
c_read = s(split) * rh, c_Write = s(out) * wh,
where split is the input split of a mapper task and out is the
output data of the job. The buffering phase is more
complicated; during this stage three processes take
place: partitioning, sorting, and spilling to disk:
c_Buffer = s(split) * rwl + p(outm) * (pC + sC + cC + sortC * log2(p(buf) / |red|)),
where outm is the output of the map function and buf is the buffer
of this stage. The buffer is divided into two parts: a
serialization buffer (SB), which contains the key-value
pairs, and an accounting buffer (AB), which contains their
metadata. So the number of pairs in the buffer is:
p(buf) = min{p(SB), p(AB)}
        </p>
        <p> sortMB * 220 * (1  sortRP) * sortSP 
p(SB)   
 wid 
 sortMB * 220 * sortRP * sortSP 
p( AB)   
 16 
The number of spilled files (N) from this stage is:
 p(out) 
N   </p>
        <p> p(buf ) 
Then all spilled files must be merged with such
features:
 the number of spill files are merged at once is F,



assume that the following N  F 2 ,
at first pass it is merged so spill files that remain
files is multiplies F
at final merge if needed the combiner will be
used.</p>
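        <p>A minimal sketch of the buffer split and spill count above (function names are mine; sortMB, sortRP, sortSP, and wid are as in the parameter table):</p>

```python
import math

def buffer_pairs(sort_mb, sort_rp, sort_sp, wid):
    """p(buf): pairs held before a spill, the minimum of the
    serialization-buffer and accounting-buffer capacities."""
    p_sb = math.floor(sort_mb * 2**20 * (1 - sort_rp) * sort_sp / wid)
    p_ab = math.floor(sort_mb * 2**20 * sort_rp * sort_sp / 16)  # 16 B of metadata per pair
    return min(p_sb, p_ab)

def n_spill_files(p_outm, p_buf):
    """N: spill files produced by one map task."""
    return math.ceil(p_outm / p_buf)

# With io.sort.mb = 100, record.percent = 0.05, spill.percent = 0.8 and
# 100-byte pairs, the accounting buffer is the bottleneck:
p_buf = buffer_pairs(100, 0.05, 0.8, 100)   # 262144 pairs
spills = n_spill_files(10_000_000, p_buf)   # 39 spill files
```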
        <p>The number of spill files processed equals the sum of the spill files at the
first pass (S1P), at the intermediate passes (SIP), and at the final
pass (SFP):
S1P = { N, if N ≤ F; F, if (N - 1) mod (F - 1) = 0; (N - 1) mod (F - 1) + 1, otherwise }
SIP = { 0, if N ≤ F; floor((N - S1P) / F) * F, if N > F and N ≤ F^2 }
SFP = { N, if N ≤ F; N - S1P - SIP + floor((N - S1P) / F) + 1, if N > F and N ≤ F^2 }</p>
        <p>c_merge = p(buf) * wid * rwl * (2 * SIP + N + N * cC) + p(buf) * mC * (SIP + N)</p>
        <p>After that stage the map output is transferred to the reducers
(this cost includes the cost for all reducers):
c_tr = s(outm) * |map| * ((|nodes| - 1) / |nodes|) * tC</p>
        <p>When a segment arrives at a reducer, it is placed in the
shuffle buffer, or, if the size of the segment is greater than 25%
of the buffer size, it is spilled to disk bypassing the
in-memory buffer. The buffer size is determined by the
configuration parameters as s(buf) = shuBP * memT. If the buffer
reaches the size threshold s(thr) = s(buf) * shuMP, or the number
of segments is greater than memMT, the segments are merged, sorted,
and spilled to disk. The number of segments in a shuffle file (|segF|)
and the number of such files (|shF|) are:
|segF| = { 1, if s(seg) ≥ 0.25 * s(buf); floor(s(thr) / s(seg)), otherwise; at most memMT }
|shF| = floor(|map| / |segF|)</p>
        <p>If the number of shuffle files is greater than 2 * F - 1,
then groups of F files are merged into one. So all segments may
be divided into three states: in the in-memory buffer (segMB),
in unmerged shuffle files (segUF), and in merged shuffle files
(segMF):
|segMB| = |map| mod |segF|
|segMF| = { 0, if |shF| ≤ 2 * F - 1; floor((|shF| - (2 * F - 1)) / F) + 1, otherwise }
|segUF| = |shF| - F * |segMF|</p>
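        <p>The pass-size formulas S1P, SIP, and SFP above can be transcribed directly; the sketch below assumes N ≤ F^2, as the text does, and the names are mine.</p>

```python
def merge_pass_files(n, f):
    """Spill files read at the first (S1P), intermediate (SIP) and
    final (SFP) merge passes, assuming n <= f**2."""
    if n <= f:
        return n, 0, n              # a single final merge handles everything
    rem = (n - 1) % (f - 1)
    s1p = f if rem == 0 else rem + 1
    sip = ((n - s1p) // f) * f
    sfp = n - s1p - sip + (n - s1p) // f + 1
    return s1p, sip, sfp

# 30 spills, merge factor 10: 3 files at the first pass, 20 over the
# intermediate passes, and exactly 10 files read at the final pass.
s1p, sip, sfp = merge_pass_files(30, 10)
```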
        <sec id="sec-3-2-1">
          <title>The cost of the shuffle stage is:</title>
          <p>c_shuffle = |segF| * s(seg) * selC * rwl * (|shF| + 2 * |segMF|) + |segMF| * |segF| * p(seg) * mC + |map| * p(seg) * (mC + cC) * I
I = { 1, if s(seg) ≥ 0.25 * s(buf); 0, otherwise }</p>
          <p>Thereafter, the segMB, segUF, and segMF files must be merged.
Some segments from memory (segE) are spilled to disk
because of the redBP constraint.</p>
          <p>| segMB | *s(seg)  redBP * memT 
| segE |  s(seg) 

0, | segMB | *s(seg)  redBP * memT
If the number of files from disk is less than F then segE
files are merged separately.</p>
          <p>| segE | *s(seg)
s(m1)  </p>
          <p>0, | segUF |  | segMF | F
After the merging, the number of files from disk is:
| segUF |  | segMF | 1, s(m1)  0
| segD | </p>
          <p>| segUF |  | segMF |  | segE |
Then the process of merging is similar to cmerge, where
N=|segD|.</p>
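          <p>A sketch of this reducer-side bookkeeping (the eviction of segE, the optional pre-merge s(m1), and the resulting |segD|); the function name and the example numbers are mine:</p>

```python
import math

def reducer_disk_state(seg_mb, seg_uf, seg_mf, s_seg, red_bp, mem_t, f):
    """Return (|segE|, s(m1), |segD|): segments evicted from memory under
    the redBP constraint, the bytes pre-merged when fewer than F files
    are on disk, and the number of on-disk files to merge."""
    overflow = seg_mb * s_seg - red_bp * mem_t
    seg_e = math.floor(overflow / s_seg) if overflow > 0 else 0
    on_disk = seg_uf + seg_mf
    if on_disk < f and seg_e > 0:
        s_m1 = seg_e * s_seg        # evicted segments merged separately
        seg_d = on_disk + 1         # ...and land on disk as one extra file
    else:
        s_m1 = 0
        seg_d = on_disk + seg_e
    return seg_e, s_m1, seg_d

# 5 in-memory segments of 1000 bytes with only 3000 bytes allowed to stay:
# 2 segments are evicted and pre-merged (2000 bytes) into a 4th disk file.
state = reducer_disk_state(seg_mb=5, seg_uf=2, seg_mf=1, s_seg=1000,
                           red_bp=0.3, mem_t=10000, f=10)  # (2, 2000, 4)
```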
          <p>The size merged during the intermediate passes is:
s(m2) = (SIP / N) * (s(segUF) + s(segMF) + s(segE))</p>
          <p>At the end, the remaining files are merged:
|segR| = SFP + (|segMB| - |segE|)
s(m3) = (|segR| / N) * |map| * s(seg)</p>
          <p>The final cost of this phase is:
c_mergeR = (s(m1) + s(m2) + s(m3)) * (rwl + mC / wid)</p>
          <p>
            Since the join algorithms are known in advance, the costs of
the user-defined Map and Reduce functions can be estimated more
accurately than in the approach of [
            <xref ref-type="bibr" rid="ref28">28</xref>
            ].
          </p>
        </sec>
      </sec>
      <sec id="sec-3-3">
        <title>4.3 Cost model for Reduce-Side join</title>
        <p>In the case of the General reducer-side join (GRSJ), the MR program
consists of a single job, and the cost of combining equals 0.
In the map function a source tag is assigned to each pair
(we assume that the number of input map pairs equals the number of
output map pairs):
c_Map^GRSJ = p(outm) * ct, wid = wid + 0.000000953</p>
        <p>In the reduce function, pairs with different tags are joined
(nested loop):
c_reduce^GRSJ = ((p(inpr) / 2)^2 * selP + p(inpr)) * ct</p>
        <p>As opposed to the General reducer-side join, the cost of the
Optimized reducer-side join (ORSJ) includes the cost of the
combine function, and the cost of its reduce function is lower
than c_reduce^GRSJ:
c_reduce^ORSJ = (p(inpr) / 2 + (p(inpr) / 2)^2 * selP) * ct
c_Map^ORSJ = c_Map^GRSJ, wid = wid + 0.00000190734</p>
        <p>In contrast to the previous joins, the MR program of the
Hybrid Hadoop join consists of a pre-processing job and a
join job. The pre-processing job partitions one dataset
into |red| parts; alternatively, these partitions may be obtained
from another MR job or from a default MR job. The costs
of the default map and reduce functions are:
c_Map^prep = p(outm) * ct, c_reduce^prep = p(in1) * ct</p>
        <p>There are two ways to deliver one full dataset to the
mapper: reading the file from HDFS or using the distributed
cache. If the distributed cache is used, the
necessary files are copied to the slave nodes before the
job starts, so the c_tr cost is added. The costs of delivery with
and without the distributed cache are:
c_cache = s(in1) * rwl + p(in1) * ct
c_hdfs = s(in1) * rh + p(in1) * ct</p>
        <p>The map and reduce function costs of the join job are:
c_Map^hyb = c_Map^prep(in2)
c_reduce^hyb = { c_cache + p(in1) * p(in2) * selP * ct; c_hdfs + p(in1) * p(in2) * selP * ct }</p>
      </sec>
      <sec id="sec-3-4">
        <title>4.4 Cost model for Map-Side join</title>
        <p>A map-side join job does not have a reduce phase.</p>
        <p>The Map-side partition join (MSPJ) consists of pre-processing jobs
for the two input datasets (or the partitions are obtained from another
job) and of a join job. The cost of the map function of the join job is:
c_Map^MSPJ = c_reduce^hyb</p>
        <p>In the In-Memory map join (IMMJ) the small dataset (in1) is broadcast to
all mappers and loaded into a hash table, which the split of in2 probes:
c_Map^IMMJ = { c_cache + p(in2) * p(in1) * selP * ct; c_hdfs + p(in2) * p(in1) * selP * ct }</p>
        <p>In the Reversed join (REV) the datasets are swapped: in2 (the
bigger one) is broadcast, and in1, the split of the smaller dataset,
is loaded into the hash table:
c_Map^rev = { p(in1) * ct + p(in2) * rwl + p(in1) * (p(in2) / |red|) * selP * ct, with the distributed cache;
p(in1) * ct + p(in2) * rh + p(in1) * (p(in2) / |red|) * selP * ct, otherwise }</p>
        <p>The cost of the Multi-phase map join equals the sum of the costs of its
IMMJ jobs; the number of summands is floor(s(in1) / memT) + 1.</p>
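        <p>Both in-memory variants reduce to the same mapper pattern: build a hash table from the dataset held in memory, then probe it with every pair of the streamed split. A minimal sketch with made-up data (in the Reversed join the roles of the two inputs are simply swapped):</p>

```python
from collections import defaultdict

def in_memory_map_join(in_memory, streamed_split):
    """Map-side hash join over (key, value) pair sequences: 'in_memory'
    is loaded into a hash table, 'streamed_split' is probed against it."""
    table = defaultdict(list)
    for k, v in in_memory:               # build phase
        table[k].append(v)
    out = []
    for k, v in streamed_split:          # probe phase, one pass
        for w in table.get(k, ()):       # missing key joins nothing
            out.append((k, (w, v)))
    return out

pairs = in_memory_map_join([(1, "a"), (2, "b")], [(1, "x"), (3, "y")])
# pairs == [(1, ("a", "x"))]
```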
      </sec>
      <sec id="sec-3-5">
        <title>4.5 The semi-join cost</title>
        <p>The semi-join with selection consists of two jobs:
finding the unique keys and filtering the dataset by the unique
keys. The cost of the map function of the key-finding job is the
sum of the costs of filling the hash table and producing the output;
the input of this job is one dataset:
c_Map^find = p(in1) * 2 * ct. The reduce function of this
job runs on a single reducer and is the same as the default
reduce function.</p>
        <p>The filtering job consists of a map phase only: the
file with the unique keys from the previous job is loaded into a
hash table, and the split of the other dataset is probed against it:</p>
        <p>c_Map^fil = { c_cache + p(in) * ct; c_hdfs + p(in) * ct }</p>
        <p>The Adaptive semi-join is similar to the reduce-side join:
the two datasets are read and tagged in the map
function, and at the reducer the pairs with different tags are
output. Its cost equals that of the default job. At the actual
join, however, one must add the cost of loading the file with
unique keys, filling the hash table, and filtering out useless pairs,
as in c_Map^fil.</p>
        <p>In the case of the semi-join with a Bloom filter, the program
consists of two jobs: creating the Bloom filter and filtering
the dataset. In the map function a Bloom filter for the split is
constructed, and the whole filter is output as one pair:
c_Map^bloom = p(in1) * ct + s(bloom) * lo,
where lo is the cost of processing the Bloom filter. There is a
single reducer, which combines all Bloom filters into one:
c_reduce^bloom = |map| * s(bloom) * lo
In the other job, the constructed Bloom filter is loaded
and the second dataset is probed:
c_Map^filb = s(bloom) * rh + p(in2) * ct</p>
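        <p>The two Bloom-filter jobs can be sketched end to end. The toy filter below uses double hashing over md5/sha1 digests rather than the Jenkins hashing used in the experiments, and the sizes are illustrative only:</p>

```python
import hashlib

class BloomFilter:
    """Toy Bloom filter: k bit positions per key via double hashing."""
    def __init__(self, m_bits, k_hashes):
        self.m, self.k = m_bits, k_hashes
        self.bits = bytearray(m_bits // 8 + 1)

    def _positions(self, key):
        h1 = int.from_bytes(hashlib.md5(key.encode()).digest(), "big")
        h2 = int.from_bytes(hashlib.sha1(key.encode()).digest(), "big")
        return [(h1 + i * h2) % self.m for i in range(self.k)]

    def add(self, key):
        for p in self._positions(key):
            self.bits[p // 8] |= 1 << (p % 8)

    def __contains__(self, key):
        return all(self.bits[p // 8] & (1 << (p % 8))
                   for p in self._positions(key))

# Job 1: each mapper builds a filter over the join keys of its split of
# in1; the single reducer ORs the filters together (one pass shown here).
bf = BloomFilter(m_bits=8192, k_hashes=4)
for key in ["k1", "k2", "k3"]:
    bf.add(key)

# Job 2: probe the second dataset; false negatives are impossible, so
# every real match survives (false positives are removed by the join itself).
survivors = [k for k in ["k1", "k4", "k3"] if k in bf]
```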
      </sec>
    </sec>
    <sec id="sec-4">
      <title>5 Experiments</title>
      <sec id="sec-4-1">
        <title>5.1 Dataset</title>
        <p>
          The data are sets of tuples whose attributes are separated
by commas. Each tuple is split into a key-value pair,
where the value consists of the remaining attributes.
Synthetic data were generated as in [
          <xref ref-type="bibr" rid="ref4">4</xref>
          ]. Join
keys are distributed randomly.
        </p>
      </sec>
      <sec id="sec-4-2">
        <title>5.2 Cluster configuration</title>
        <p>The cluster consists of three virtual machines, one of
which is both master and slave at the same time; the
remaining two are slaves. Each host has 1 processor,
512 MB of memory (both the master and the other nodes),
and a 5 GB disk. Hadoop 0.20.203.0 runs on Ubuntu 10.10.</p>
      </sec>
      <sec id="sec-4-3">
        <title>5.3 The General Case</title>
        <p>The basic idea of this experiment is to compare the
execution times of the different phases of the various
algorithms. Some parameters are fixed: the number of
Map and Reduce tasks is 3, and the input sizes are
10000*100000 and 1000000*1000000 tuples.</p>
        <p>For a small amount of data, the Map phase, in which all
tuples are tagged, and the Shuffle phase, in which data are
transferred from one phase to another, are the most costly
in Reduce-Side joins. It should be noted that GRSJ is
better than ORSJ on small data but performs the same on big
data, because in the first case no time is spent on
combining tuples. Possibly, on larger data ORSJ will
outperform GRSJ once the usefulness of grouping by
key becomes more significant. Also, for algorithms with
pre-processing, more time is spent on partitioning the data.
The in-memory algorithms (IMMJ and REV) behave similarly
on small data. Two algorithms are not shown in the
graph because of their bad times: the JDBM-based map join
and the Multi-phase map join. On large data the IMMJ
algorithm could not be executed because of memory
overflow.</p>
      </sec>
      <sec id="sec-4-4">
        <title>5.4 Semi-Join</title>
        <p>
          The main idea of this experiment is to compare the different
semi-join algorithms. These parameters are fixed: the
number of Map and Reduce tasks is 3, the bitmap size
of the Bloom filter is 2500000, the number of
hash functions in the Bloom filter is 173, and the built-in Jenkins hash
algorithm is used in the Bloom filter. The Adaptive semi-join
(ASGRSJ) does not finish because of memory
overflow. The Bloom-filter semi-join for GRSJ is abbreviated
BGRSJ; the semi-join with selection for GRSJ is abbreviated
SGRSJ.
In [
          <xref ref-type="bibr" rid="ref21">21</xref>
          ] it was shown that using the distributed cache is
not always a good strategy; the authors suggested that the
cause may be a high-speed network. This experiment
was carried out for the Reversed Map-Side join, for
which the distributed cache can be important. Replication
was varied as 1, 2, 3, and the data size was fixed at
1000000*1000000 tuples. When the data are small, the
difference is not always visible. On large data, the algorithms
with the distributed cache outperform the approach of reading
from the globally distributed file system.
It is known that many of the presented algorithms are
sensitive to data skew. This experiment involves the
Reduce-side join with the Simple Range-based Partitioner
for GRSJ (GRSJRange) and with the Virtual Processor
Partitioner for GRSJ (GRSJVirtual), and, for comparison,
the in-memory joins IMMJ and REV, because they are
resistant to skew. Fixed parameters are used: the size of
the two datasets is 2000000; one of the datasets has a
skew of 500000 of 5, and the other has 10 or 1 of 5. In the
case of IMMJ there was a memory overflow.
        </p>
        <p>Although these experiments do not completely cover
the tuneable set of Hadoop parameters, they show
the advantages and disadvantages of the considered
algorithms. The main problems of these algorithms are
the time spent on pre-processing, data transfer, data
skew, and memory overflow.</p>
        <p>Each of the optimization techniques introduces
additional cost to the implementation of the join, so the
algorithm should be chosen carefully, based on the tuneable
settings and the specific data. Also important are the
network bandwidth, whether or not the distributed
cache is used, and the hardware specification
of the nodes, which matters when speculative
execution is on. Speculative execution reduces the
negative effects of non-uniform performance of the
physical nodes.</p>
        <p>
          Based on collected statistics, such as the data size and
the number of keys taking part in the join (these
statistics can be gathered together with the construction of
a range partitioner), the query planner can choose an
efficient variant of the join. For example, what-if
analysis and cost-based optimization were proposed in [
          <xref ref-type="bibr" rid="ref5">5</xref>
          ].
        </p>
      </sec>
    </sec>
    <sec id="sec-5">
      <title>6 Future work</title>
      <p>In the algorithms discussed in this paper, only two datasets
are joined. It would be interesting to extend them from binary
operations to multi-argument joins. Among the proposed
algorithms, there is no effective universal solution.
Therefore, it is necessary to evaluate the proposed cost
models for the join algorithms. For this, a real cluster is
needed, with more than three nodes and enough power to
process bigger data, because the execution times on
virtual machines may differ from those on a real cluster
in reading/writing, transferring data over the network,
and so on.</p>
      <p>
        Also, the idea of handling data skew in
MapReduce applications from [
        <xref ref-type="bibr" rid="ref19">19</xref>
        ] can be applied to the
join algorithms. Another direction for future work is to
extend the algorithms to support theta-joins and outer joins.
An interesting area for future work is to develop,
implement, and evaluate algorithms or extended
algebraic operations suitable for complex similarity
queries in an open distributed heterogeneous
environment. The reasons to evaluate complex
structured queries are: the need to combine search criteria
for different types of information; query refinement,
e.g. based on a user profile or feedback; and the fact that
advanced users may need query structuring. The execution
model and the algebraic operations to be implemented are
outlined in
[
        <xref ref-type="bibr" rid="ref31">31</xref>
        ]. The main goal is to solve the problems presented
in [
        <xref ref-type="bibr" rid="ref8">8</xref>
        ].
      </p>
      <p>In addition, one of the open issues is an efficient physical
representation of the data. Binary formats are known to
outperform text both in the speed of reading and
partitioning key/value pairs and in the transmission of
compressed data over the network. Along with binary
data formats, column storage has already been
proposed for the MapReduce paradigm. It would be interesting
to find the best representation for specific data.</p>
    </sec>
    <sec id="sec-6">
      <title>7 Conclusion</title>
      <p>In this work we have described the state of the art in the
area of massively parallel processing, presented our
comparative study of the join algorithms and cost models,
and outlined directions for future work.</p>
    </sec>
  </body>
  <back>
    <ref-list>
      <ref id="ref1">
        <mixed-citation>
          [1]
          <string-name>
            <surname>Daniel</surname>
            <given-names>J.</given-names>
          </string-name>
          <string-name>
            <surname>Abadi</surname>
          </string-name>
          ,
          <string-name>
            <surname>Samuel R. Madden</surname>
            , and
            <given-names>Nabil</given-names>
          </string-name>
          <string-name>
            <surname>Hachem</surname>
          </string-name>
          .
          <article-title>Column-stores vs. row-stores: how different are they really</article-title>
          ?
          <source>In Proceedings of the 2008 ACM SIGMOD international conference on Management of data, SIGMOD '08</source>
          , pages
          <fpage>967</fpage>
          -
          <lpage>980</lpage>
          , New York, NY, USA,
          <year>2008</year>
          . ACM.
        </mixed-citation>
      </ref>
      <ref id="ref2">
        <mixed-citation>
          [2]
          <string-name>
            <given-names>Azza</given-names>
            <surname>Abouzeid</surname>
          </string-name>
          , Kamil Bajda-Pawlikowski, Daniel Abadi, Avi Silberschatz, and
          <string-name>
            <given-names>Alexander</given-names>
            <surname>Rasin</surname>
          </string-name>
          .
          <article-title>Hadoopdb: an architectural hybrid of mapreduce and dbms technologies for analytical workloads</article-title>
          .
          <source>Proc. VLDB Endow</source>
          .,
          <volume>2</volume>
          :
          <fpage>922</fpage>
          -
          <lpage>933</lpage>
          ,
          <year>August 2009</year>
          .
        </mixed-citation>
      </ref>
      <ref id="ref3">
        <mixed-citation>
          [3]
          <string-name>
            <surname>Foto</surname>
            <given-names>N.</given-names>
          </string-name>
          <string-name>
            <surname>Afrati</surname>
            and
            <given-names>Jeffrey D.</given-names>
          </string-name>
          <string-name>
            <surname>Ullman</surname>
          </string-name>
          .
          <article-title>Optimizing joins in a map-reduce environment</article-title>
          .
          <source>In Proceedings of the 13th International Conference on Extending Database Technology, EDBT '10</source>
          , pages
          <fpage>99</fpage>
          -
          <lpage>110</lpage>
          , New York, NY, USA,
          <year>2010</year>
          . ACM.
        </mixed-citation>
      </ref>
      <ref id="ref4">
        <mixed-citation>
          [4]
          <string-name>
            <given-names>Fariha</given-names>
            <surname>Atta</surname>
          </string-name>
          .
          <article-title>Implementation and analysis of join algorithms to handle skew for the hadoop mapreduce framework</article-title>
          .
          <source>Master's thesis</source>
          ,
          <source>MSc Informatics</source>
          , School of Informatics, University of Edinburgh,
          <year>2010</year>
          .
        </mixed-citation>
      </ref>
      <ref id="ref5">
        <mixed-citation>
          [5]
          <string-name>
            <given-names>Shivnath</given-names>
            <surname>Babu</surname>
          </string-name>
          .
          <article-title>Towards automatic optimization of mapreduce programs</article-title>
          .
          <source>In Proceedings of the 1st ACM symposium on Cloud computing, SoCC '10</source>
          , pages
          <fpage>137</fpage>
          -
          <lpage>142</lpage>
          , New York, NY, USA,
          <year>2010</year>
          . ACM.
        </mixed-citation>
      </ref>
      <ref id="ref6">
        <mixed-citation>
          [6]
          <string-name>
            <given-names>Spyros</given-names>
            <surname>Blanas</surname>
          </string-name>
          ,
          <string-name>
            <surname>Jignesh M. Patel</surname>
            , Vuk Ercegovac, Jun Rao,
            <given-names>Eugene J.</given-names>
          </string-name>
          <string-name>
            <surname>Shekita</surname>
            , and
            <given-names>Yuanyuan</given-names>
          </string-name>
          <string-name>
            <surname>Tian</surname>
          </string-name>
          .
          <article-title>A comparison of join algorithms for log processing in mapreduce</article-title>
          .
          <source>In Proceedings of the 2010 international conference on Management of data, SIGMOD '10</source>
          , pages
          <fpage>975</fpage>
          -
          <lpage>986</lpage>
          , New York, NY, USA,
          <year>2010</year>
          . ACM.
        </mixed-citation>
      </ref>
      <ref id="ref7">
        <mixed-citation>
          [7]
          <string-name>
            <given-names>A</given-names>
            <surname>Chatzistergiou</surname>
          </string-name>
          .
          <article-title>Designing a parallel query engine over map/reduce. Master's thesis</article-title>
          ,
          <source>MSc Informatics</source>
          , School of Informatics, University of Edinburgh,
          <year>2010</year>
          .
        </mixed-citation>
      </ref>
      <ref id="ref8">
        <mixed-citation>
          [8]
          <string-name>
            <given-names>Surajit</given-names>
            <surname>Chaudhuri</surname>
          </string-name>
          , Raghu Ramakrishnan, and
          <string-name>
            <given-names>Gerhard</given-names>
            <surname>Weikum</surname>
          </string-name>
          .
          <article-title>Integrating db and ir technologies: What is the sound of one hand clapping? In CIDR</article-title>
          , pages
          <fpage>1</fpage>
          -
          <lpage>12</lpage>
          ,
          <year>2005</year>
          .
        </mixed-citation>
      </ref>
      <ref id="ref9">
        <mixed-citation>
          [9]
          <string-name>
            <surname>Jeffrey</surname>
            <given-names>Cohen</given-names>
          </string-name>
          , Brian Dolan, Mark Dunlap,
          <string-name>
            <surname>Joseph M. Hellerstein</surname>
            , and
            <given-names>Caleb</given-names>
          </string-name>
          <string-name>
            <surname>Welton</surname>
          </string-name>
          .
          <article-title>Mad skills: new analysis practices for big data</article-title>
          .
          <source>Proc. VLDB Endow</source>
          .,
          <volume>2</volume>
          :
          <fpage>1481</fpage>
          -
          <lpage>1492</lpage>
          ,
          <year>August 2009</year>
          .
        </mixed-citation>
      </ref>
      <ref id="ref10">
        <mixed-citation>
          [10]
          <string-name>
            <given-names>Jeffrey</given-names>
            <surname>Dean</surname>
          </string-name>
          and
          <string-name>
            <given-names>Sanjay</given-names>
            <surname>Ghemawat</surname>
          </string-name>
          .
          <article-title>Mapreduce: a flexible data processing tool</article-title>
          . Commun. ACM,
          <volume>53</volume>
          :
          <fpage>72</fpage>
          -
          <lpage>77</lpage>
          ,
          <year>January 2010</year>
          .
        </mixed-citation>
      </ref>
      <ref id="ref11">
        <mixed-citation>
          [11]
          <string-name>
            <surname>Jeffrey</surname>
            <given-names>Dean</given-names>
          </string-name>
          ,
          <string-name>
            <given-names>Sanjay</given-names>
            <surname>Ghemawat</surname>
          </string-name>
          , and
          <string-name>
            <given-names>Google</given-names>
            <surname>Inc</surname>
          </string-name>
          .
          <article-title>Mapreduce: simplified data processing on large clusters</article-title>
          .
          <source>In In OSDI04: Proceedings of the 6th conference on Symposium on Opearting Systems Design &amp; Implementation. USENIX Association</source>
          ,
          <year>2004</year>
          .
        </mixed-citation>
      </ref>
      <ref id="ref12">
        <mixed-citation>
          [12]
          <string-name>
            <surname>Leonidas</surname>
            <given-names>Fegaras</given-names>
          </string-name>
          ,
          <string-name>
            <given-names>Chengkai</given-names>
            <surname>Li</surname>
          </string-name>
          ,
          <string-name>
            <given-names>and Upa</given-names>
            <surname>Gupta</surname>
          </string-name>
          .
          <article-title>An optimization framework for map-reduce queries</article-title>
          .
          <source>In EDBT</source>
          <year>2012</year>
          , march
          <year>2012</year>
          .
        </mixed-citation>
      </ref>
      <ref id="ref13">
        <mixed-citation>
          [13]
          <string-name>
            <surname>Avrilia</surname>
            <given-names>Floratou</given-names>
          </string-name>
          ,
          <string-name>
            <surname>Jignesh M. Patel</surname>
            ,
            <given-names>Eugene J.</given-names>
          </string-name>
          <string-name>
            <surname>Shekita</surname>
            , and
            <given-names>Sandeep</given-names>
          </string-name>
          <string-name>
            <surname>Tata</surname>
          </string-name>
          .
          <article-title>Column-oriented storage techniques for mapreduce</article-title>
          .
          <source>Proc. VLDB Endow</source>
          .,
          <volume>4</volume>
          :
          <fpage>419</fpage>
          -
          <lpage>429</lpage>
          ,
          <year>April 2011</year>
          .
        </mixed-citation>
      </ref>
      <ref id="ref14">
        <mixed-citation>
          [14]
          <string-name>
            <surname>Alan</surname>
            <given-names>F Gates. Programming</given-names>
          </string-name>
          <string-name>
            <surname>Pig. O'Reilly Media</surname>
          </string-name>
          ,
          <year>2011</year>
          .
        </mixed-citation>
      </ref>
      <ref id="ref15">
        <mixed-citation>
          [15]
          <string-name>
            <given-names>Herodotos</given-names>
            <surname>Herodotou</surname>
          </string-name>
          .
          <article-title>Hadoop performance models</article-title>
          .
          <source>CoRR, abs/1106.0940</source>
          ,
          <year>2011</year>
          .
        </mixed-citation>
      </ref>
      <ref id="ref16">
        <mixed-citation>
          [16]
          <string-name>
            <given-names>Herodotos</given-names>
            <surname>Herodotou</surname>
          </string-name>
          and
          <string-name>
            <given-names>Shivnath</given-names>
            <surname>Babu</surname>
          </string-name>
          .
          <article-title>Profiling, what-if analysis, and cost-based optimization of mapreduce programs</article-title>
          .
          <source>PVLDB</source>
          ,
          <volume>4</volume>
          (
          <issue>11</issue>
          ):
          <fpage>1111</fpage>
          -
          <lpage>1122</lpage>
          ,
          <year>2011</year>
          .
        </mixed-citation>
      </ref>
      <ref id="ref17">
        <mixed-citation>
          [17]
          <string-name>
            <surname>Eaman</surname>
            <given-names>Jahani</given-names>
          </string-name>
          ,
          <string-name>
            <given-names>Michael J.</given-names>
            <surname>Cafarella</surname>
          </string-name>
          , and
          <string-name>
            <given-names>Christopher</given-names>
            <surname>Ré</surname>
          </string-name>
          .
          <article-title>Automatic optimization for mapreduce programs</article-title>
          .
          <source>Proc. VLDB Endow</source>
          .,
          <volume>4</volume>
          :
          <fpage>385</fpage>
          -
          <lpage>396</lpage>
          , mar
          <year>2011</year>
          .
        </mixed-citation>
      </ref>
      <ref id="ref18">
        <mixed-citation>
          [18]
          <string-name>
            <surname>Dawei</surname>
            <given-names>Jiang</given-names>
          </string-name>
          ,
          <string-name>
            <surname>Anthony K. H. Tung</surname>
            , and
            <given-names>Gang</given-names>
          </string-name>
          <string-name>
            <surname>Chen</surname>
          </string-name>
          .
          <article-title>Map-join-reduce: Toward scalable and efficient data analysis on large clusters</article-title>
          .
          <source>IEEE Transactions on Knowledge and Data Engineering</source>
          ,
          <volume>23</volume>
          :
          <fpage>1299</fpage>
          -
          <lpage>1311</lpage>
          ,
          <year>2011</year>
          .
        </mixed-citation>
      </ref>
      <ref id="ref19">
        <mixed-citation>
          [19]
          <string-name>
            <given-names>YongChul</given-names>
            <surname>Kwon</surname>
          </string-name>
          , Magdalena Balazinska, Bill Howe, and
          <string-name>
            <given-names>Jerome</given-names>
            <surname>Rolia</surname>
          </string-name>
          .
          <article-title>A study of skew in mapreduce applications</article-title>
          . Moscow, Russia, June
          <year>2011</year>
          .
          <article-title>In the 5th Open Cirrus Summit</article-title>
          .
        </mixed-citation>
      </ref>
      <ref id="ref20">
        <mixed-citation>
          [20]
          <string-name>
            <surname>Yuting</surname>
            <given-names>Lin</given-names>
          </string-name>
          , Divyakant Agrawal, Chun Chen, Beng Chin Ooi, and
          <string-name>
            <given-names>Sai</given-names>
            <surname>Wu</surname>
          </string-name>
          .
          <article-title>Llama: leveraging columnar storage for scalable join processing in the mapreduce framework</article-title>
          .
          <source>In Proceedings of the 2011 international conference on Management of data, SIGMOD '11</source>
          , pages
          <fpage>961</fpage>
          -
          <lpage>972</lpage>
          , New York, NY, USA,
          <year>2011</year>
          . ACM.
        </mixed-citation>
      </ref>
      <ref id="ref21">
        <mixed-citation>
          [21]
          <string-name>
            <given-names>Gang</given-names>
            <surname>Luo</surname>
          </string-name>
          and
          <string-name>
            <given-names>Liang</given-names>
            <surname>Dong</surname>
          </string-name>
          .
          <article-title>Adaptive join plan generation in hadoop</article-title>
          .
          <source>Technical report</source>
          , Duke University,
          <year>2010</year>
          .
        </mixed-citation>
      </ref>
      <ref id="ref22">
        <mixed-citation>
          [22]
          <string-name>
            <given-names>Christine</given-names>
            <surname>Morin</surname>
          </string-name>
          and Gilles Muller, editors.
          <source>European Conference on Computer Systems, Proceedings of the 5th European conference on Computer systems, EuroSys</source>
          <year>2010</year>
          , Paris, France, April 13-16, 2010. ACM.
        </mixed-citation>
      </ref>
      <ref id="ref23">
        <mixed-citation>
          [23]
          <string-name>
            <given-names>Alper</given-names>
            <surname>Okcan</surname>
          </string-name>
          and
          <string-name>
            <given-names>Mirek</given-names>
            <surname>Riedewald</surname>
          </string-name>
          .
          <article-title>Processing theta-joins using mapreduce</article-title>
          .
          <source>In Proceedings of the 2011 international conference on Management of data, SIGMOD '11</source>
          , pages
          <fpage>949</fpage>
          -
          <lpage>960</lpage>
          , New York, NY, USA,
          <year>2011</year>
          . ACM.
        </mixed-citation>
      </ref>
      <ref id="ref24">
        <mixed-citation>
          [24]
          <string-name>
            <given-names>Konstantina</given-names>
            <surname>Palla</surname>
          </string-name>
          .
          <article-title>A comparative analysis of join algorithms using the hadoop map/reduce framework</article-title>
          .
          <source>Master's thesis</source>
          ,
          <source>MSc Informatics</source>
          , School of Informatics, University of Edinburgh,
          <year>2009</year>
          .
        </mixed-citation>
      </ref>
      <ref id="ref25">
        <mixed-citation>
          [25]
          <string-name>
            <given-names>Andrew</given-names>
            <surname>Pavlo</surname>
          </string-name>
          , Erik Paulson, Alexander Rasin, Daniel J. Abadi,
          <string-name>
            <given-names>David J.</given-names>
            <surname>DeWitt</surname>
          </string-name>
          , Samuel Madden, and
          <string-name>
            <given-names>Michael</given-names>
            <surname>Stonebraker</surname>
          </string-name>
          .
          <article-title>A comparison of approaches to large-scale data analysis</article-title>
          .
          <source>In Proceedings of the 35th SIGMOD international conference on Management of data, SIGMOD '09</source>
          , pages
          <fpage>165</fpage>
          -
          <lpage>178</lpage>
          , New York, NY, USA,
          <year>2009</year>
          . ACM.
        </mixed-citation>
      </ref>
      <ref id="ref26">
        <mixed-citation>
          [26]
          <string-name>
            <given-names>Donovan A.</given-names>
            <surname>Schneider</surname>
          </string-name>
          and
          <string-name>
            <given-names>David J.</given-names>
            <surname>DeWitt</surname>
          </string-name>
          .
          <article-title>A performance evaluation of four parallel join algorithms in a shared-nothing multiprocessor environment</article-title>
          .
          <source>SIGMOD Rec</source>
          .,
          <volume>18</volume>
          :
          <fpage>110</fpage>
          -
          <lpage>121</lpage>
          ,
          <year>June 1989</year>
          .
        </mixed-citation>
      </ref>
      <ref id="ref27">
        <mixed-citation>
          [27]
          <string-name>
            <given-names>Rares</given-names>
            <surname>Vernica</surname>
          </string-name>
          ,
          <string-name>
            <given-names>Michael J.</given-names>
            <surname>Carey</surname>
          </string-name>
          , and
          <string-name>
            <given-names>Chen</given-names>
            <surname>Li</surname>
          </string-name>
          .
          <article-title>Efficient parallel set-similarity joins using mapreduce</article-title>
          .
          <source>In Proceedings of the 2010 international conference on Management of data, SIGMOD '10</source>
          , pages
          <fpage>495</fpage>
          -
          <lpage>506</lpage>
          , New York, NY, USA,
          <year>2010</year>
          . ACM.
        </mixed-citation>
      </ref>
      <ref id="ref28">
        <mixed-citation>
          [28]
          Vertica Systems, Inc.
          <source>Managing Big Data with Hadoop &amp; Vertica</source>
          ,
          <year>2009</year>
          .
        </mixed-citation>
      </ref>
      <ref id="ref29">
        <mixed-citation>
          [29]
          <string-name>
            <given-names>Guanying</given-names>
            <surname>Wang</surname>
          </string-name>
          , Ali Raza Butt, Prashant Pandey, and
          <string-name>
            <given-names>Karan</given-names>
            <surname>Gupta</surname>
          </string-name>
          .
          <article-title>A simulation approach to evaluating design decisions in mapreduce setups</article-title>
          .
          <source>In MASCOTS</source>
          , pages
          <fpage>1</fpage>
          -
          <lpage>11</lpage>
          . IEEE,
          <year>2009</year>
          .
        </mixed-citation>
      </ref>
      <ref id="ref30">
        <mixed-citation>
          [30]
          <string-name>
            <given-names>Hung-chih</given-names>
            <surname>Yang</surname>
          </string-name>
          , Ali Dasdan,
          <string-name>
            <given-names>Ruey-Lung</given-names>
            <surname>Hsiao</surname>
          </string-name>
          , and
          <string-name>
            <given-names>D. Stott</given-names>
            <surname>Parker</surname>
          </string-name>
          .
          <article-title>Map-reduce-merge: simplified relational data processing on large clusters</article-title>
          .
          <source>In Proceedings of the 2007 ACM SIGMOD international conference on Management of data, SIGMOD '07</source>
          , pages
          <fpage>1029</fpage>
          -
          <lpage>1040</lpage>
          , New York, NY, USA,
          <year>2007</year>
          . ACM.
        </mixed-citation>
      </ref>
      <ref id="ref31">
        <mixed-citation>
          [31]
          <string-name>
            <surname>Anna</surname>
            <given-names>Yarygina</given-names>
          </string-name>
          , Boris Novikov, and
          <string-name>
            <given-names>Natalia</given-names>
            <surname>Vassilieva</surname>
          </string-name>
          .
          <article-title>Processing complex similarity queries: A systematic approach</article-title>
          . In Maria Bielikova, Johann Eder, and A Min Tjoa, editors,
          <source>ADBIS 2011 Research Communications: Proceedings II of the 5th East-European Conference on Advances in Databases and Information Systems, 20 - 23 September</source>
          <year>2011</year>
          , Vienna, pages
          <fpage>212</fpage>
          -
          <lpage>221</lpage>
          . Austrian Computer Society,
          <year>September 2011</year>
          .
        </mixed-citation>
      </ref>
      <ref id="ref32">
        <mixed-citation>
          [32]
          <string-name>
            <given-names>Minqi</given-names>
            <surname>Zhou</surname>
          </string-name>
          , Rong Zhang, Dadan Zeng, Weining Qian, and
          <string-name>
            <given-names>Aoying</given-names>
            <surname>Zhou</surname>
          </string-name>
          .
          <article-title>Join optimization in the mapreduce environment for column-wise data store</article-title>
          .
          <source>In Proceedings of the 2010 Sixth International Conference on Semantics, Knowledge and Grids</source>
          ,
          <source>SKG '10</source>
          , pages
          <fpage>97</fpage>
          -
          <lpage>104</lpage>
          , Washington, DC, USA,
          <year>2010</year>
          .
        </mixed-citation>
      </ref>
    </ref-list>
  </back>
</article>