=Paper= {{Paper |id=None |storemode=property |title=Generalized Parallel Join Algorithms and Designing Cost Models |pdfUrl=https://ceur-ws.org/Vol-899/paper5.pdf |volume=Vol-899 |dblpUrl=https://dblp.org/rec/conf/syrcodis/Pigul12 }} ==Generalized Parallel Join Algorithms and Designing Cost Models== https://ceur-ws.org/Vol-899/paper5.pdf
     Generalized Parallel Join Algorithms and Designing Cost
                             Models

                                                    Alice Pigul
                                                   SPbSU
                                             m05pay@math.spbu.ru

                      Abstract                              some discussion of experiments will be given.

Applications for large-scale data analysis use such         2 Related work
techniques as parallel DBMS, MapReduce (MR)
paradigm, and columnar storage. In this paper we focus
                                                            2.1 Architectural Approaches
in a MapReduce environment. The aim of this work is
to compare the different join algorithms and designing      Column storage is one of the architectural approaches to
cost models for further use in the query optimizer.         store data in columns, that the values of one field are
                                                            stored physically together in a compact storage area.
                                                            Column storage strategy improves performance by
1 Introduction                                              reducing the amount of unnecessary data from disk by
                                                            excluding the columns that are not needed. Additional
Data-intensive applications include large-scale data        gains may be obtained using data compression. Storage
warehouse systems, cloud computing, data-intensive          method in columns outperforms row-based storage for
analysis. These applications have their own specific        workloads typical for analytical applications, which are
computational workload. For example, analytic systems       characterized by heavy selection operation from
produce relatively rare updates but heavy select            millions of records, often with aggregation and by
operation with millions of records to be processed, often   infrequent update operation. For this class of workloads
with aggregations.                                          I/O is major factor limited the performance.
    There are the following architectures that are used     Comparison of column-wise and row-wise stores
to analyze massive amounts of data: MapReduce               approaches is presented in [1].
paradigm, parallel DBMSs, column-wise store, and                Another architectural approach is a software
various combinations of these approaches.                   framework MapReduce. Paradigm MapReduce was
    Applications of this type process multiple data sets.   introduced in [11] to process massive amounts of
This implies need to perform several join operation. It’s   unstructured data.
known join operation is one of the most expensive               Originally, this approach was contrasted with a
operations in terms both I / O and CPU costs.               parallel DBMS. Deep analysis of the advantages and
   Unfortunately, join algorithms is not directly           disadvantages of these two architectures was presented
supported in MapReduce. There are some approaches to        in [25,10].
solve this problem by using a high-level language               Later, hybrid systems appeared in [9, 2]. There are
PigLatin, HiveQL for SQL queries or implementing            three ways to combine approaches MapReduce and
algorithms from research papers. The aim of this work       parallel DBMS.
is to generalize and compare existing equi-join                   MapReduce inside a parallel DBMS. The main
algorithms with some optimization techniques and build                intention is to move computation closer to
cost model which could be used in a query optimizer for               data. This architecture can be exemplified with
a distributed DBMS with MapReduce.                                    hybrid database Greenplum with MAD
   This paper is organized as follows the section 2                   approach [9].
describe state of the art. Join algorithms and some
                                                                     DBMS inside MapReduce. The basic idea is
optimization techniques were introduced in 3 section.
                                                                     to connect multiple single node database
The designing of cost models for join algorithms are
                                                                     systems using MapReduce as the task
presented in 4 section. Performance evaluation will be
                                                                     coordinator and network communication layer.
described in 5 section. Finally, future direction and
                                                                     An example is a hybrid database HadoopDB
                                                                     [2].
Proceedings of the Spring Young Researcher's
Colloquium On Database and Information Systems                       MapReduce aside of the parallel DBMS.
SYRCoDIS, Moscow, Russia, 2012                                       MapReduce is used to implement an ETL
                                                                     produced data to be stored in parallel DBMS.
         This approach is discussed in [28] Vertica,        the I/O cost for each phase separately is given in [24].
         which also supports the column-wise store.         Simple theoretical considerations for selecting a
    Another group of hybrid systems combines                particular join algorithm are presented in [21]. Another
MapReduce with column-wise store. MapReduce and             approach [7] for selecting join algorithm is to measure
column-wise store are effective in data-intensive           the correlation between the input size and the join
applications. Hybrid systems based on this two              algorithm execution time with fixed cluster
techniques may be found in [20,13].                         configuration settings.

2.2 Algorithms for Join Operation                           3      Join algorithms and optimization
Detailed comparison of relational join algorithms was              techniques
presented in [26]. In our paper, the consideration is
restricted to a comparison of joins in the context of        In this section we consider various techniques of two-
MapReduce paradigm.                                         way joins in MapReduce framework. Join algorithms
   Papers which discuss equi-join algorithms can be         can be divided into two groups: Reduce-side join and
divided into two categories which describe join             Map-side join. The pseudo code presented in Listings,
algorithms and multi join execution plans.                  where R – right dataset, L – left dataset, V – line from
The former category deals with design and analyses join     file, Key – join key, that was parsed from a tuple, in this
algorithm of two data sets. A comparative analysis of       context tuple is V.
two-way join techniques is presented in [6, 4, 21]. The
cost model for two-way join algorithms in terms of cost     3.1 Reduce-Side join
I/O is presented in [7, 17].
   The basic idea of multi-way join is to find strategies   Reduce-side join is an algorithm which performs data
to combine the natural join of several relations.           pre-processing in Map phase, and direct join is done
Different join algorithms from relation algebra are         during the Reduce phase. Join of this type is the most
presented in [30]. The authors introduce the extension      general without any restriction on the data. Reduce-side
of MapReduce to facilitate implement relation               join is the most time-consuming, because it contains an
operations. Several optimizations for multi-way join are    additional phase and transmits data over the network
described in [3, 18]. Authors introduced a one-to-many      from one phase to another. In addition, the algorithm
shuffling strategy. Multi-way join optimization for         has to pass information about source of data through the
column-wise store is considered in [20, 32].                network. The main objective of the improvement is to
   Theta-Joins and set-similarity joins using               reduce the data transmission over the network from the
MapReduce are addressed in [23] and [27] respectively.      Map task to the Reduce task by filtering the original
                                                            data through semi-joins. Another disadvantage of this
2.3 Optimization techniques and cost models                 class of algorithms is the sensitivity to the data skew,
                                                            which can be addressed by replacing the default hash
In contrast to the sql queries in parallel database, the    partitioner with a range partitioner.
MapReduce program contains user-defined map and             There are three algorithms in this group:
reduce functions. Map and reduce functions can be                 General reducer-side join,
considered as a black-box, when nothing is known
                                                                      Optimized reducer-side join,
about these functions, or they can be written on sql-like
languages, such as HiveQL, PigLatin, MRQL, or sql                     the Hybrid Hadoop join.
operations can be extracted from functions on semantic          General reducer-side join is the simplest one. The
basis. Automatic finding good configuration settings for    same algorithms are called Standard Repartition Join in
arbitrary program offered in [16]. Theoretical designing    [6]. The abbreviation is GRSJ.
cost models for arbitrary MR program for each phase
                                                                Map (K: null, V from R or L)
separately presented in [15]. If the MR program is
                                                                  Tag = bit from name of R or L;
similar to the semantics of SQL, it allows us to                  emit (Key, pair(V,Tag));
construct a more accurate cost model or adapt some of
the optimization techniques from relational databases.          Reduce (K’: join key, LV: list of V with key K’)
HadoopToSQL [22] allows to take advantage of two                   create buffers Br and Bl for R and L;
different data storages such as SQL database and the               for t in LV do
text format in MapReduce storage and to use index at                     add t.v to Br or Bl by t.Tag;
right time by transforming the MR program to SQL.                  for r in Br do
Manimal system [17] uses static analysis for detection                   for l in Bl do
                                                                          emit (null, tuple(r.V,l.V));
and exploiting selection, projection and data
compression in MR programs and if needed to employ          Listing 1: GRSJ.
B+ tree index.                                              This algorithm has both Map and Reduce phases. In the
New SQL-like query language and algebra is presented        Map phase, data are read from two sources and tags are
in [12]. But they are needed cost model based on            attached to the value to identify the source of a
statistic. Detailed construction of the model to estimate   key/value pair. As the key is not effecting by this
tagging, so we can use the standard hash partitioner. In     set is pulled out of blocks from a distributed system in
Reduce phase, data with the same key and different tags      the Reduce phase, where it is joined with another data
are joined with nested-loop algorithm. The problems of       set that came from the Map phase. The similarity with
this approach are that the reducer should have sufficient    the Map-side join is the restriction that one of the sets
memory for all records with a same key; and the              has to be split in advance with the same partitioner,
algorithm sensitivity to the data skew.                      which will split the second set. Unlike Map-side join, it
    Optimized reducer-side join enhances previous            is necessary to split in advance only one set. The
algorithm by overriding sorting and grouping by the          similarity with the Reduce-side join is that algorithm
key, as well as tagging data source. Also known as           requires two phases, one of them for pre-processing of
Improved Repartition Join in [6], Default join in [14].      data and one for direct join. In contrast with the
The abbreviation is ORSJ. In the algorithm all the           Reduce-side join we do not need additional information
values of the first tag are followed by the values of the    about the source of data, as they come to the Reducer at
second one. In contrast with the General reducer-side        a time.
join, the tag is attached to both a key and a value. Due
to the fact that the tag is attached to a key, the                3.2 Map-Side join
partitioner must be overridden in order to split the nodes   Map-side join is an algorithm without Reduce phase.
by the key only. This case requires buffering for only       This kind of join can be divided into two groups. First
one of input sets. Optimized reducer-side join inherits      of them is partition join, when data previously
major disadvantages of General reducer-side join             partitioned into the same number of parts with the same
namely the transferring through the network additional       partitioner. The relevant parts will be joined during the
information about the source and the algorithm               Map phase. This map-side join is sensitive to the data
sensitivity to the data skew.                                skew. The second is in memory join, when the smaller
  Map (K:null, V from R or L)                                dataset send whole to all mappers and bigger dataset is
    Tag = bit from name of R or L;                           partitioned over the mappers. The problem with this
    emit (pair(Key,Tag), pair(V,Tag));                       type of join occurs when the smaller of the sets can not
                                                             fit in memory.
  Partitioner(K:key, V:value, P:the number of reducers)      There are three methods to avoid this problem:
      return hash_f(K.Key) mod P;                                   JDBM-based map join,
  Reduce (K’: join key, LV: list of V’ with key K’)                   Multi-phase map join,
     create buffers Br for R;                                         Reversed map join.
     for t in LV with t.Tag corresponds to R do
          add t.v to Br;                                     Map-side partition join algorithm assumes that the two
     for l in LV with l.Tag corresponds to L do              sets of data pre-partitioned into the same number of
          for r in Br do                                     splits by the same partitioner. Also known as default
               emit (null, tuple(r.V,l.V));                  map join. The abbreviation is MSPJ. At the Map phase
Listing 2: ORSJ.                                             one of the sets is read and loaded into the hash table,
   The Hybrid join [4] combines the Map-side and             then two sets are joined by the hash table. This
Reduce-side   joins.    The     abbreviation   is            algorithm buffers all records with the same keys in
HYB.                                                         memory, as is the case with skew data may fail due to
                                                             lack of enough memory.
  Job 1: partition the smaller file S
    Map (K:null, V from S)                                     Job 1: partition dataset S as in HYB
       emit (Key,V);                                           Job 2: partition dataset B as in HYB
                                                               Job 3: join two datasets
     Reduce (K’:join key, LV: list of V’ with key K’)            init() //for Map phase
       for t in LV do                                               read needed partition of output file from Job 1;
           emit (null, t);                                          add it to hashMap(Key, list(V)) H;
                                                                  Map(K:null, V from B)
  Job 2: join two datasets                                           if (K in H) then
    Map (K:null, V from B)                                               for r in LV do
        emit (Key,V);                                                        for l in H.get(K) do
                                                                           emit(null, tuple(r,l));
     init() //for Reduce phase                               Listing 4: MSPJ.
        read needed partition of output file from Job 1;     Map-side partition merge join is an improvement of the
        add it to hashMap(Key, list(V)) H;
                                                             previous version of the join. The abbreviation is
     Reduce (K’:join key, LV: list of V’ with key K’)
        if(K’ in H) then
                                                             MSPMJ. If data sets in addition to their partition are
            for r in LV do                                   sorted by the same ordering, we apply merge join. The
                for l in H.get(K’) do                        advantage of this approach is that the reading of the
                    emit (null, tuple(r,l));                 second set is on-demand, but not completely, thus
                                                             memory overflow can be avoided. As in the previous
Listing 3: HYB.
                                                             cases, for optimization can be used the semi-join
    In Map phase, we process only one set and the
                                                             filtering and range partitioner.
second set is partitioned in advance. The pre-partitioned
  Job 1: partition S dataset as in HYB                         Idea of Reversed map join [21] approach is that the
  Job 2: partition B dataset as in HYB                         bigger of the sets, which is partitions during the Map
  Job 3: join two datasets                                     phase, loading in the hash table. Also known as
    init() //for Map phase                                     Broadcast Join in [6]. The abbreviation is REV. The
       find needed partition SP of output file from Job 1;     second dataset is read from a file line by line and joined
       read first lines with the same key K2 from SP and add   using a hash table.
             to buffer B;
    Map(K:null, V from B)                                        init() //for Map phase
         while (K > K2) do                                          read S from HDFS;
               read T from SP with key K2;                          add it to hashMap(Key, list(V)) H;
               while (K == K2) do                                map (K:null, V from S)
                    add T to B;                                      add to hashMap(Key, V) H;
                    read T from SP with key K2;                  close() //for Map phase
         if (K == K2) then                                            find B in HDFS
               for r in B do                                          while (not end B) do
                   emit(null, tuple(r,V));                                 read line T;
                                                                           K = join key from tuple T;
Listing 5: MSPMJ.                                                          if (K in H) then
In-Memory Join does not require to distribute original                           for l in H.get(K) do
data in advance unlike the versions of map joins                                    emit(null, tuple(T,l));
discussed above. The same algorithms are called Map-           Listing 7: REV.
side replication join in [7], Broadcast Join in [6],
Memory-backed joins [4], Fragment-Replicate join in               3.3 Semi-Join
[14]. The abbreviation is IMMJ. Nevertheless, this
algorithm has a strong restriction on the size of one of       Sometimes a large portion of the data set does not take
the sets: it must fit completely in memory. The                part in the join. Deleting of tuples that will not be used
advantage of this approach is its resistance to the data       in join significantly reduces the amount of data
skew because it sequentially reads the same number of          transferred over the network and the size of the dataset
tuples at each node. There are two options for                 for the join. This preprocessing can be carried out using
transferring the smaller of the sets:                          semi-joins by selection or by a bitwise filter. However,
      using a distributed cache,                              these filtering techniques introduce some cost (an
      reading from a distributed file system.                 additional MR job), so the semi-join can improve the
  init() // for Map phase                                      performance of the system only if the join key has low
     read S from HDFS;                                         selectivity. There are three ways to implement the semi-
     add it to hashMap(Key, list(V)) H;                        join operation:
  map (K:null, V from B)                                             a semi-join using bloom-filter,
       if (K in H) then
          for l in H.get(K) do                                          semi-join using selection,
               emit (null, tuple(v,l));
                                                                        an adaptive semi-join.
Listing 5: IMMJ.
The next three algorithms optimize the In-Memory Join          Bloom-filter is a bit array that defines a membership of
for a case, when two sets are large and no of them fits        element in the set. False positive answers are possible,
into the memory.                                               but there are no false-negative responses in the solution
JDBM-based map join is presented in [21]. In this case,        of the containment problem. The accuracy of the
JDBM library automatically swaps hash table from               containment problem solution depends on the size of
memory to disk.                                                the bitmap and on the number of elements in the set.
                                                               These parameters are set by the user. It is known that
  The same as IMMJ, but H is implemented by HTree              for a bitmap of fixed size m and for the data set of n
  instead of hashMap .                                         tuples, the optimal number of hash functions is
Listing 6: JDBM.                                               k=0.6931*m/n. In the context of MapReduce, the semi-
Multi-phase map join [21] is algorithm where the               join is performed in two jobs. The first job consists of
smaller of the sets is partitioned into parts that fit into    the Map phase, in which keys from one set are selected
memory, and for each part runs In-Memory join. The             and added to the Bloom-filter. The Reduce phase
problem with this approach is that it has a poor               combines several Bloom-filters from first phase into
performance. If the size of the set, which to be put in        one. The second job consists only of the Map phase,
the memory is increased twice, the execution time of           which filters the second data set with a Bloom-filter
this join is also doubled. It is important to note that the    constructed in previous job. The accuracy of this
set, which will not be loaded into memory, will be read        approach can be improved by increasing the size of the
many times from the disk.                                      bitmap. However in this case, a larger bitmap consumes
  For part P from S that fit into memory do IMMJ(P,B).         more amounts of memory. The advantage of this
                                                               method is its the compactness. The performance of the
Listing 7: Multi-phase map join.                               semi-join using Bloom-filter highly depends on the
                                                               balance between the Bloom-filter size, which increases
the time needed for its reconstruction of the filter in the   The disadvantage of this approach is that additional
second job, and the number of false positive responses        information about the source of data is transmitted over
in the containment solution. The large size of the data       the network.
set can seriously degrade the performance of the join.          Job 1: find keys which are present in two datasets
  Job 1: construct Bloom filter                                   Map (K:null, V from R or L)
    Map (K:null, V from L)                                           Tag = bit from name of R or L;
         Add Key to BloomFilter Bl                                   emit (Key,Tag);
    close() //for Map phase
         emit(null, Bl);                                           Reduce (K’: join key, LV: list of V with key K’)
                                                                     Val = first value from LV;
    Reduce (K’: key, LV) //only 1 Reducer                             for t in LV do
     for l in LV do                                                       if (not Val==Val2) then
        union filters by operation Or                                        emit (null, K’);
     close() // for Reduce phase
        write resulting filter into file;                       Job 2: before joining it is necessary to filter the smaller
                                                                dataset by keys from the Job 1 that will be loaded into
  Job 2: filter dataset                                         hash map. Then the bigger dataset is joined with filtered
    init() //for Map phase                                      one.
          read filter from file in Bl
                                                              Listing 8: Adaptive semi-join.
    Map (K:null, V from R)
          if (Key in Bl) then
             emit (null, V);                                     3.4 Range Partitioners

  Job 3: do join with L dataset and filtered dataset from     All algorithms, except the In-Memory join and their
  Job 2.                                                      optimizations are sensitive to the data skew. This
Listing 7: Semi-join using Bloom-filter.                      section describes two techniques of the default hash
Semi-join with selection extracts unique keys and             partitioner replacement.
constructs a hash table. The second set is filtered by the    A Simple Range-based Partitioner [4] (this kind similar
hash table constructed in the previous step. In the           to the Skew join in [14]) applies a range vector of
context of MapReduce, the semi-join is performed in           dimension n constructed from the join keys before
two jobs. Unique keys are selected during the Map             starting a MR job. By this vector join keys will be
phase of the first job and then they are combined into        splitted into n parts, where n is the number of Reduce
one file during the Map phase. The second job consists        jobs. Ideally partitioner vector is constructed from the
of only the Map phase, which filters out the second set.      whole original set of keys, in practice a certain number
The semi-join using selection has some limitations.           of keys is chosen randomly from the data set. It is
Hash table in memory, based on records of unique keys,        known that the optimal number of keys for the vector
can be very large, and depends on the key size and the        construction is equal to the square root of the total
number of different keys.                                     number of tuples. With a heavy data skew into a single
                                                              key value, some elements of the vector may be
  Job 1: find unique keys
    Map (K:null, V from L)
                                                              identical. If the key belongs to multiple nodes, a node is
        Create HashMap H;                                     selected randomly in the case of data on which to build
        if (not Key in H) then                                a hash table, otherwise the key is sent to all nodes (to
             add Key to H;                                    save memory as a hash table is contained in the
             emit (Key, null);                                memory).
                                                              Virtual Processor Partitioner [4] is an improvement of
    Reduce (K’: key, LV) //only one Reducer                   the previous algorithm based on increasing the number
      emit (null,key);                                        of partition. The number of parts is specified multiple of
                                                              the tasks number. The approach tends to load the nodes
  Job 2: filter dataset
                                                              with the same keys uniformly (compared with the
    init() //for Map phase
         add to HashMap H unique keys from job 1;             previous version). The same keys are scattered on more
    Map (K:null, V from R)                                    nodes than in the previous case.
         if (Key in H) then
            emit (null,V);

  Job 3: do join with L dataset and filtered dataset from
  Job 2.
Listing 8: Semi-join with selection.
     The Adaptive semijoin is performed in one job, but
filters the original data on the flight during the join.
Similar to the Reduce-side join at the Map phase the
keys from two data sets are read and values are set
equal to tags which identify the source of the keys. At
the Reduce phase keys with different tags are selected.
  //before the MR job starts                                 behavior of each algorithm for parallel query.
  // optimal max = sqrt(|R|+|L|)                             Analytical model is cost formulas that are used to
  getSamples (Red:the number of reducers, max: the max       calculate the query execution time, taking into account
                 number of samples)                          the specific of parallel algorithm. Below, analytical
       C = max/Splits.length;                                cost model for join algorithms and their optimizations
       Create buffer B;                                      will be constructed.
      for s in Splits of R and L do
            get C keys from s;
                                                             4.1Configuration settings
            add it to B;
       sort B;
      //in case simple range partitioner P == 1              Execution of MR program depends on input data
      //in case virtual range partitioner P > 1              statistic such as selectivity, skew, compression, on
      for j<(Red*P) do                                       cluster resource such as number of nodes, on
             T = B.length/(Red*P)*(j+1);                     configuration parameters, such as I/O cost, and on
            write into file B[T];                            properties of specific algorithm. Below, the parameters
                                                             used in the analysis are presented in table.
  Map(K:null, V from L or R)
     Tag = bit from name of R or L;
                                                             Variable   Description
     read file with samples and add samples to Buffer B;
     //in case virtual partition it is needed to             s(x)       Size of x in mb
     // each index mod |Reducers|                            p(x)       Number of pairs for split x
     Ind = {i: B[i-1] < Key <= B[i]}                         wid        Pair width
     // Ind may be array of indexes in skew case             ct         The average computation time needed per
     if (Ind.length >1) then                                            pair
           if (V in L) then                                  pC         The cost for partition
                node = random(Ind);
               emit (pair(Key, node), pair(V, Tag));
                                                             sC         The cost for serialization
           else                                              sortC      The cost for sorting on keys
               for i in Ind do                               cC         The cost for executing combine function
                   emit (pair(Key, i), pair(V, Tag));        mC         The cost for merge
      else                                                   selP       Selectivity of pairs
          emit (pair(Key, Ind), pair(V, Tag));               selC       Selectivity of combining
                                                             |red|      Number of reducers
  Partitioner (K:key, V:value, P:the number of reducers)
      return K.Ind;                                          |map|      Number of mappers
                                                             rh         The cost for reading from HDFS
  Reducer (K’: join key, LV: list of V’ with key K’)         wh         The cost for writing to HDFS
      The same as GRSJ                                       rwl        The cost for local I/O operations
Listing 8: The range partitioners.                           tC         The cost of network transfer
                                                             sortMB     io.sort.mb parameter in Hadoop
3.5 Distributed cache                                                   configuration
                                                             sortRP     io.sort.record.percent
The advantage of using distributed cache is that data set    sortSP     io.sort.spill.percent
are copied only once at the node. It is especially           F          io.sort.factor
effective if several tasks at one node need the same file.   shuBP      mapred.job.shuffle.input.buffer.percent
In contrast the access to the global file system needs       shuMP      mapred.job.shuffle.merge.percent
more communication between the nodes. Better                 memMT      mapred.inmem.merge.threshold
performance of the joins without the cache can be            memT       mapred.child.java.opts
achieved by increasing number of the files replication,      redBP      mapred.job.reduce.input.buffer.percent
so there's a good chance to access the file version
locally.                                                     4.2 Cost of arbitrary MR program

4 Cost model                                                 As mentioned above, the MR job consists of the
                                                             execution stages, thus it is possible to estimate each
Due to significant differences between parallel DBMS         phase separately. Job may contain the following stages:
and MapReduce, the MapReduce paradigm requires               Setup, Read (read map input), Map (map function),
another optimization techniques based on indexing and        Buffer (serializing to buffer, partitioning, sorting,
compression, programming models, data distribution           combining, compressing, write output data to local
and query execution strategy. Therefore, we need a           disk), Merge (merging spill files), Shuffle (transferring
different strategy of designing model cost. There are        map output to reducers), MergeR(merging received
two types of designing cost models: the task execution       files), Reduce (reduce function), Write (writing result to
simulation [29] and analytical cost calculation [15, 24].    the HDFS), Cleanup. Due to the fact that the job of MR
To measure the query parallelism effectiveness, it is        program carried out in parallel or in waves, it is possible
need to build a cost model that can describe the             to calculate the approximate total cost of the job
through the cost of one task (one mapper and one                 The number of spill files equal to sum of spill files at
reducer). The Cost job take into account the parallel            first pass (S1P), at intermediate pass (SIP) and at final
                                                                 pass (SFP):
threads of execution and compute the total cost of MR
job, where cm and cr are costs of one task mapper or                  N , N  F
                                                                      
reducer respectively, MaxMN and MaxRN are                        S1   F , ( N  1) mod( F  1)  0,
maximum map tasks or reduce task per node.                            ( N  1) mod( F  1)  1
                                                                      
                   | map | *c m     | red | *c r
  Cost job                                      ctr                 0, N  F
               | nodes | *MaxMN | nodes | *MaxRN                       
This formula is bad for the skew data, when one task is          SIP         N  S1
                                                                       S1   F  * F , N  F
                                                                                               2
time consuming.
                                                                                    
      сread  cMap  c Buffer  сmerge , | red | 0 ,
      
 сm                                                                   N , N  F
      cread  cMap  cWrite , otherwise
                                                                       
                                                                 SFP    N  S1
cr  cshuffle  cmergeR  creduce  cWrite .                            1   F   N  SIP, N  F
                                                                                                    2


CMap and creduce are the cost of user-define functions, so
                                                                                   
for each join algorithm it is calculated by the own
formula. Another cost values from (cread, cBuffer, cWrite,       cmerge  p(buf ) * wid * rwl * (2 * SIP  N  N * cC )
cmerge, cmergeR, cshuffle, ct) are common for join algorithms.
Consider these costs in more detail as [15, 24]. Stages
                                                                  p(buf ) * mC * ( SIP  N )
of reading input data from HDFS and writing into                 After that stage map output transferred to the reducers
HDFS are calculated by:                                          (this cost includes the cost for all reducers).
 сread  s(split ) * rh , cWrite  s(out ) * wh ,                                              | nodes  1 |
                                                                 ctr  s(outm)* | map | *                    * tC
where split is input split for mapper task, out is the                                           | nodes |
output data of job. The buffering phase is more
complicated; during this stage three processes take              The data from mappers are transferred by segments to
place: partitioning, sorting and spilling to disk.               reducers. Without considering the data skew, it is
cBuffer  s( split ) * rwl  p(outm ) * ( pC  sC               assumed that the sizes of segments are the same.
                                                                              s(outm)
                       p(buf )                                 s( seg ) 
 cC  sortC * log 2                                                       | red |
                       | red |                                 When segment arrive to the reducer it is placed in
Where outm is output from map functions, buf is buffer           shuffle buffer or if size of segment is greater than 25%
for this stage. The buffer is divided into two parts, there      of buffer size then it is spilled into disk without in-
are serialization buffer (SB), that contains key-value           memory buffer. The buffer size is determined by the
pairs and an accounting buffer (AB) that contains the            configuration parameters as:
metadata. So, the number of pairs in buffer is:                   s(buf )  shuBP * memT. If buffer reaches size
p(buf )  min{ p(SB), p( AB)}                                    threshold (s(thr)) or the number of segments is greater
                                                                 than memMT, then segments are merged, sort and spill
          sortMB * 2 20 * (1  sortRP) * sortSP 
p( SB)                                                        into disk. s(thr )  s(buf ) * shuMP . The number of
                           wid                                 segments (|segF|) in shuffle file and the number of such
                                                                 files (|shF|) are:
           sortMB * 2 20 * sortRP * sortSP                                1, s ( seg )  0,25  s (buf )
p( AB )                                                                  
                          16                                               s (thr ) 
The number of spilled files (N) from this stage is:
                                                                 | segF |              , s ( seg )  0,25 * s (buf )
                                                                              s ( seg ) 
    p(out )                                                               memMT , | segF | memMT
N          
    p(buf ) 
Then all spilled files must be merged with such                            | map | 
                                                                 | shF |             
features:                                                                  | segF | 
   the number of spill files are merged at once is F,           If the number of shuffle files is greater than (2*F-1)
      assume that the following N  F ,
                                               2
                                                                then all files are merged into one. So, all segments may
     at first pass it is merged so spill files that remain      be divided on three states: in-memory buffer (segMB),
      files is multiplies F                                      shuffle unmerged files (segUF) and shuffle merged files
     at final merge if needed the combiner will be              (segMF).
      used.
  | segMB || map | mod | segF |                            In map function source tag is assign to each pair
                                                            (consider that input map pair is equal to output map
              0, | shF | 2 * F  1                        pair):
              
  | segMF |  | shF | 2 * F  1                          GRSJ
                                                            сMap   p(outm) * ct , wid  wid  0,000000953
                       F           1
              
                                                            In reduce function pairs with different tags are joined
  | segUF || shF |  F * | segMF |                         (nested-loop):
The cost of shuffle stage is:
                                                                        p(inpr )  2                   
сsfuffle | segF | *s ( seg ) * selC * rwl * (| shF |       GRSJ
                                                            сreduce                * selР  p(inpr )  * ct
                                                                       2                              
 | segMF | *2) | segMF | * | segF | * p( seg ) *                                                      
                                                            As opposite to General reducer-side join, the cost of
* mC  | map | * p( seg ) * (mC  cC ) * I                  Optimized reducer-side join includes the cost of
                                                            combine function and the cost of reduce function is less
   1, s ( seg )  0,25 * s (buf )
I                                                                GRSJ
                                                            then сreduce :
   0
Thereafter, segMB,segUF, segMF files must be merged.                  p(inpr )  p(inpr )  2        
Some segments from memory (segE) are spilled to disk
                                                             ORSJ
                                                            сreduce                       * selР  * ct ,
                                                                      2         2                  
by redBP constraint.                                                                                 
           | segMB | *s( seg )  redBP * memT            сMap  сMap , wid  wid  0,00000190734
                                                             ORSJ     GRSJ

          
| segE |                s( seg )            
                                                           In contrast to the previous join, MR program of the
          0, | segMB | *s( seg )  redBP * memT            Hybrid Hadoop join consist of pre-processing job and
                                                           join job. The pre-processing job is partition one dataset
                                                            into |red| parts, and besides these partitions may be got
If the number of files from disk is less than F then segE   from other MR job or from default MR job. The costs
files are merged separately.                                of default map and reduce functions are:
        | segE | *s( seg )                                   prep
                                                             сMap   creduce
                                                                       prep
                                                                              p(in1) * ct
s(m1)  
        0, | segUF |  | segMF | F                        There are two ways to deliver full one dataset to the
                                                            mapper: read file from HDFS or by using distributed
After the merging, the number of files from disk is:
                                                            cache. And if distributed cache is used then the
          | segUF |  | segMF | 1, s(m1)  0              necessary files are copied to the slave nodes before the
| segD |                                                  job is started. So, the ctr cost is added. The costs of with
          | segUF |  | segMF |  | segE |                 and without distributed cache deliver are:
Then the process of merging is similar to c merge, where
N=|segD|.                                                   ccache  s(in1) * wrl  p(in1) * ct
           SIP                                          c  s(in1) * rh  p(in1) * ct
s(m2)         * ( s( segUF )  s( segMF )  s( segE )) hdfs
            N                                          The map and reduce functions costs of join job are:
At final it is merged remained files.                        hyb
                                                            сMap  сMap
                                                                    prep
                                                                         (in 2),
| segR | SFP * (| segMB |  | segE |)
                                                                     ccache  p(in1) * p(in 2) * selР * ct
 N | segR |                                                 hyb
                                                            creduce 
                                                                     chdfs  p(in1) * p(in 2) * selР * ct
          SIP
s (m3)       * | map | *s( seg )                           4.4 Cost model for Map-Side join
           N
The final cost of this phase is:                            The join job doesn’t have reducer phase.
                                            mC            Map-side partition join consists of pre-processing jobs
сmergeR  ( s(m1)  s(m2)  s(m3)) *  rwl                for two input datasets (or partitions are got from another
                                            wid           job) and join job.
                                                            The map function of join job is:
Since the join algorithms are known in advance we can        MSPJ
                                                            creduce  сreduce
                                                                       hyb
more accurately than the approach in [28] is to estimate
the cost of user-defined functions Map and Reduce.          In-Memory Join the small dataset (in1) is broadcast to
                                                            all reducers.
4.3 Cost model for Reduce-Side join                                                     p(in1)
                                                                   ccache  p(in 2) * | red | * selР * ct
In case of General reducer-side join, MR program             IMMJ
                                                            сMap  
consists of one job and cost for combining is equal to 0.
                                                                    c  p(in 2) * p(in1) * selР * c
                                                                     hdfs            | red |
                                                                                                         t
In reversed join the datasets are reversed, in2 (the          value, where value is the remaining attributes.
bigger one) is broadcast, in1 is split of smaller dataset     Generation of synthetic data was done as in [4]. Join
and it is loaded in hash table.                               keys are distributed randomly.
        p (in1) * ct  p (in 2) * wrl  p (in1) *
        p (in 2)
       *                                                     5.2 Cluster configuration
                    * selР * ct , cache
        | red |                                              Cluster consists of three virtual machines, where one of
сMap  
 rev
                                                              them is master and slave at the same time, the
        p (in1) * ct  p (in 2) * rh  p (in1) *             remaining two are the slaves. Host configuration
        p (in 2)                                             consists of 1 processor, 512 mb of memory for the
       *           * selР * ct                               master, for others nodes have by 512 mb, 5 gb is the
        | red |                                              disk size. Hadoop 20.203.0 runs on Ubuntu 10.10.
Multi-phase map join cost equal to sum of immj job
                                     s(in1)                   5.3 The General Case
costs. The number of summands is             1.
                                     memT                     The base idea of this experiment is to compare
4.5 The semi-join cost                                        executions time of different phases of various
                                                              algorithms. Some parameters are fixed: the number of
The semi-join with selection consists of two jobs:            Map and Reduce tasks is 3, the input size is
finding unique keys and filter the dataset by unique          10000*100000 and 1000000*1000000 tuples.
keys. The cost of map function of finding unique keys is
sum of filling hash table and producing the output costs.
The input for this job is one dataset.
 find
сMap   p(in1) * 2 * ct . The reduce function of that
job is run on the one reducer and the same as default
reduce function.
The filtering job consists of one map phase, where the
file with unique key from previous job is loaded into
hash table and then the split of another dataset is probe.
      ccache  p(in ) * ct
 fil
сMap 
      chdfs  p(in ) * ct
                                                              Figure 1: Executions time of different phases of various
                                                              algorithms. Size 10000*100000.
The Adaptive semi-join is similar to reduce-side join.
The two datasets are read and tagged by label in map
function. And at reducer the pairs with different tags are
output. The cost is equal to default job. But at the actual
join it is needed to add some cost of loading file with
unique keys, filling hash table and filtering useless pairs
    fil
as сMap .
 In case of semi-join with bloom-filter the program
consists of two jobs: creating bloom filter and filtering
the dataset. In the map function, bloom filter for split
constructed and the output all filter as one pair.
                                                              Figure 2: Executions time of different phases of various
 bloom
сMap    p(in1) * ct  s(bloom) * lo                          algorithms. Size 1000000*1000000.
Where lo is the cost for processing bloom filter. The
reducer is one and it is combine all bloom-filter into        For a small amount of data, Map phase, in which all
one.                                                          tuples are tagged, and Shuffle phase, in which data are
                                                              transferred from one phase to another, are more costly
    duce | map | *s(bloom) * lo
 bloom
сRe
                                                              in Reduce-Side joins. It should be noted that GRSJ is
At another job the constructed bloom-filter is loaded         better than ORSJ on small data, but it is the same on big
and the second dataset is probed.                             data. It is because in first case time does not spend on
 filb
сMap   s(bloom) * rh  p(in 2) * ct                          combining tuples. Possible, on the larger data ORSJ
                                                              outperform GRSJ when the usefulness of grouping by
5 Experiments                                                 key will be more significant. Also for algorithms with
                                                              pre-processing more time are spent on partitioning data.
                                                              The algorithms in memory (IMMJ and REV) are similar
5.1 Dataset
                                                              in small data. Two algorithms are not shown in the
Data are the set of tuples, which attributes are separated    graph because of their bad times: JDBM-based map join
by a comma. Tuple is split into a pair of a key and a         and Multi-phase map join. In large data IMMJ
algorithm could not be executed because of memory              used: size of two dataset is 2000000, one of the data set
overflow.                                                      has skew 500000 of 5, and another has 10 or 1 of 5. In
                                                               case with IMMJ was memory overflow.

   5.4 Semi-Join

The main idea of this experiment is to compare different
semi-join algorithms. These parameters are fixed: the
number of Map and Reduce tasks is 3, the bitmap size
of Bloom-filter is 2500000, the number of hash-
functions in Bloom-filter is 173, built-in Jenkins hash
algorithm is used in Bloom-filter. Adaptive semi-join
(ASGRSJ) does not finish because of memory
overflow. The abbreviation of Bloom-filter semi-join
for GRSJ is BGRSJ. The abbreviation of semi-join with
selection for GRSJ is SGRSJ respectively.                      Figure 5: Processing the data skew.

                                                                   Although these experiments do not completely cover
                                                               the tuneable set of Hadoop parameters, they are shown
                                                               the advantages and disadvantages of the proposed
                                                               algorithms. The main problems of these algorithms are
                                                               time spent on pre-processing, transferring data, the data
                                                               skew, and memory overflow.
                                                                   Each of the optimization techniques introduces
                                                               additional cost to the implementation of the join, so the
                                                               algorithm based on the tuneable settings and specific
Figure 3: Comparison of different semi-join implementations.   data should be carefully chosen. Also important are the
5.5 Distributed cache                                          parameters of the network bandwidth when distributed
                                                               cache are used or not used and a hardware specification
In [21] was showed that using of distributed cache is          of nodes because of it is importance when speculative
not always good strategy. They suggested that the              executions are on. Speculative execution reduces
problem can be a high speed network. This experiment           negative effects of non-uniform performance of
was carried out for Reversed Map-Side join, because for        physical nodes.
which a distributed cache can be important. Replication            Based on the collected statistics such as data size,
was varied as 1, 2, 3 and size of data is fixed –              how many keys will be taking part in the join, these
1000000*1000000 tuples. When data is small, the                statistics may be collected as well as the construction of
difference is not always visible. In large data algorithms     a range partitioner, the query planner can choose an
with distributed cache outperform approach of reading          efficient variant of the join. For example, in [5] was
from a globally distributed system.                            proposed what-if analyses and cost-based optimization.

                                                               6 Future work

                                                               The algorithms discussed in this paper, only two sets
                                                               are joined. It is interesting to extend from binary
                                                               operation to multi argument joins. Among the proposed
                                                               algorithms, there is no effective universal solution.
                                                               Therefore, it is necessary to evaluate the proposed cost
                                                               models for join algorithms. And for this problem it is
                                                               need to use real cluster with more than three nodes in it
Figure 4: Performance of Reversed Map-Side join with and       and more powerful to process bigger data, due to the
without using distributed cache.                               fact that the execution time on the virtual machine may
                                                               be different from the real cluster in reading/writing,
5.6 Skew data                                                  transferring data over the network and so on.
                                                                    Also the idea of processing the data skew in
It is known that many of the presented algorithms are          MapReduce applications from [19] can be applied to the
sensitive to the data skew. In this experiment take part       join algorithms. Another direction to future work is to
such algorithms as Reduce-side join with Simple                extend algorithm to support a theta-join and outer join.
Range-based Partitioner for GRSJ (GRSJRange) and               An interesting area for future work is to develop,
Virtual Processor Partitionerfor GRSJ (GRSJVirtual),           implement and evaluate algorithms or extended
and also for comparing in memory join: IMMJ, REV               algebraic operations suitable for complex similarity
because of resistant to the skew. Fixed parameters are
queries in an open distributed heterogeneous                      [8] Surajit Chaudhuri, Raghu Ramakrishnan, and Gerhard
environment. The reasons to evaluate complex                          Weikum. Integrating db and ir technologies: What is the
structured queries are: a need to combine search criteria             sound of one hand clapping? In CIDR, pages 1–12, 2005.
for different types of information; a query refinement            [9] Jeffrey Cohen, Brian Dolan, Mark Dunlap, Joseph M.
e.g. based on user profile or feedback; advanced users                Hellerstein, and Caleb Welton. Mad skills: new analysis
may need query structuring. The execution model and                   practices for big data. Proc. VLDB Endow., 2:1481–
algebraic operation to be implemented are outlined in                 1492, August 2009.
[31]. The main goal is to solve the problems presented            [10] Jeffrey Dean and Sanjay Ghemawat. Mapreduce: a
in [8] as a problem.                                                   flexible data processing tool. Commun. ACM, 53:72–77,
    In addition, one of the issues is efficient physical               January 2010.
representation of data. Binary formats are known to               [11] Jeffrey Dean, Sanjay Ghemawat, and Google Inc.
outperform the text both in speed reading and                          Mapreduce: simplified data processing on large clusters.
partitioning key / value pairs, and the transmission of                In In OSDI04: Proceedings of the 6th conference on
compressed data over the network. Along with the                       Symposium on Opearting Systems Design &
                                                                       Implementation. USENIX Association, 2004.
binary data format, column storage has already been
proposed for paradigm MapReduce. It is interesting to             [12] Leonidas Fegaras, Chengkai Li, and Upa Gupta. An
find the best representation for specific data.                        optimization framework for map-reduce queries. In
                                                                       EDBT 2012, march 2012.

7 Conclusion                                                      [13] Avrilia Floratou, Jignesh M. Patel, Eugene J. Shekita,
                                                                       and Sandeep Tata. Column-oriented storage techniques
                                                                       for mapreduce. Proc. VLDB Endow., 4:419–429, April
In this work we describe the state of the art in the area              2011.
of massive parallel processing, presented our                     [14] Alan F Gates. Programming Pig. O’Reilly Media, 2011.
comparative study of these algorithms, cost models and
our outline directions of future work.                            [15] Herodotos Herodotou. Hadoop performance models.
                                                                       CoRR, abs/1106.0940, 2011.
                                                                  [16] Herodotos Herodotou and Shivnath Babu. Profiling,
                                                                       what-if analysis, and cost-based optimization of
References                                                             mapreduce programs. PVLDB, 4(11):1111– 1122, 2011.
[1] Daniel J. Abadi, Samuel R. Madden, and Nabil Hachem.          [17] Eaman Jahani, Michael J. Cafarella, and Christopher R´e.
    Column-stores vs. row-stores: how different are they               Automatic optimization for mapreduce programs. Proc.
    really? In Proceedings of the 2008 ACM SIGMOD                      VLDB Endow., 4:385–396, mar 2011.
    international conference on Management of data,
    SIGMOD ’08, pages 967–980, New York, NY, USA,                 [18] Dawei Jiang, Anthony K. H. Tung, and Gang Chen.
    2008. ACM.                                                         Map-join-reduce: Toward scalable and efficient data
                                                                       analysis on large clusters. IEEE Transactions on
[2] Azza Abouzeid, Kamil Bajda-Pawlikowski, Daniel                     Knowledge and Data Engineering, 23:1299– 1311, 2011.
    Abadi, Avi Silberschatz, and Alexander Rasin.
    Hadoopdb: an architectural hybrid of mapreduce and            [19] YongChul Kwon, Magdalena Balazinska, Bill Howe, and
    dbms technologies for analytical workloads. Proc. VLDB             Jerome Rolia. A study of skew in mapreduce
    Endow., 2:922–933, August 2009.                                    applications. Moskow, Russia, june 2011. In the 5th
                                                                       Open Cirrus Summit.
[3] Foto N. Afrati and Jeffrey D. Ullman. Optimizing joins
    in a map-reduce environment. In Proceedings of the 13th       [20] Yuting Lin, Divyakant Agrawal, Chun Chen, Beng Chin
                                                                       Ooi, and Sai Wu. Llama: leveraging columnar storage for
    International Conference on Extending Database
    Technology, EDBT ’10, pages 99–110, New York, NY,                  scalable join processing in the mapreduce framework. In
    USA, 2010. ACM.                                                    Proceedings of the 2011 international conference on
                                                                       Management of data, SIGMOD ’11, pages 961–972,
[4] Fariha Atta. Implementation and analysis of join                   New York, NY, USA, 2011. ACM.
    algorithms to handle skew for the hadoop mapreduce
    framework. Master’s thesis, MSc Informatics, School of        [21] Gang Luo and Liang Dong. Adaptive join plan
    Informatics, University of Edinburgh, 2010.                        generation in hadoop. Technical report, Duke University,
                                                                       2010.
[5]    Shivnath Babu. Towards automatic optimization of
      mapreduce programs. In Proceedings of the 1st ACM           [22] Christine Morin and Gilles Muller, editors. European
      symposium on Cloud computing, SoCC ’10, pages 137–               Conference on Computer Systems, Proceedings of the
      142, New York, NY, USA, 2010. ACM.                               5th European conference on Computer systems, EuroSys
                                                                       2010, Paris, France, April 13-16, 2010. ACM, 2010.
[6]    Spyros Blanas, Jignesh M. Patel, Vuk Ercegovac, Jun
      Rao, Eugene J. Shekita, and Yuanyuan Tian. A                [23] Alper Okcan and Mirek Riedewald. Processing theta-
      comparison of join algorithms for log processing in              joins using mapreduce. In Proceedings of the 2011
      mapreduce. In Proceedings of the 2010 international              international conference on Management of data,
      conference on Management of data, SIGMOD ’10, pages              SIGMOD ’11, pages 949–960, New York, NY, USA,
      975–986, New York, NY, USA, 2010. ACM.                           2011. ACM.

[7]    A Chatzistergiou. Designing a parallel query engine over   [24] Konstantina Palla. A comparative analysis of join
      map/reduce. Master’s thesis, MSc Informatics, School of          algorithms using the hadoop map/reduce framework.
      Informatics, University of Edinburgh, 2010.                      Master’s thesis, MSc Informatics, School of Informatics,
                                                                       University of Edinburgh, 2009.
[25] Andrew Pavlo, Erik Paulson, Alexander Rasin, Daniel J.
     Abadi, David J. DeWitt, Samuel Madden, and Michael
     Stonebraker. A comparison of approaches to large-scale
     data analysis. In Proceedings of the 35th SIGMOD
     international conference on Management of data,
     SIGMOD ’09, pages 165–178, New York, NY, USA,
     2009. ACM.
[26] Donovan A. Schneider and David J. DeWitt. A
     performance evaluation of four parallel join algorithms in
     a shared-nothing multiprocessor environment. SIGMOD
     Rec., 18:110–121, June 1989.
[27] Rares Vernica, Michael J. Carey, and Chen Li. Efficient
     parallel set-similarity joins using mapreduce. In
     Proceedings of the 2010 international conference on
     Management of data, SIGMOD ’10, pages 495–506,
     New York, NY, USA, 2010. ACM.
[28] Vertica Systems, Inc. Managing Big Data with Hadoop
     & Vertica, 2009.
[29] Guanying Wang, Ali Raza Butt, Prashant Pandey, and
     Karan Gupta. A simulation approach to evaluating design
     decisions in mapreduce setups. In MASCOTS, pages 1–
     11. IEEE, 2009.
[30] Hung-chih Yang, Ali Dasdan, Ruey-Lung Hsiao, and D.
     Stott Parker. Map-reduce-merge: simplified relational
     data processing on large clusters. In Proceedings of the
     2007 ACM SIGMOD international conference on
     Management of data, SIGMOD ’07, pages 1029–1040,
     New York, NY, USA, 2007. ACM.
[31] Anna Yarygina, Boris Novikov, and Natalia Vassilieva.
     Processing complex similarity queries: A systematic
     approach. In Maria Bielikova, Johann Eder, and A Min
     Tjoa, editors, ABDIS 2011 Research Communications:
     Proceedings II of the 5th East-European Conference on
     Advances in Databases and Information Systems 20 – 23
     September 2011, Vienna, pages 212–221. Austrian
     Computer Society, September 2011.
[32] Minqi Zhou, Rong Zhang, Dadan Zeng, Weining Qian,
     and Aoying Zhou. Join optimization in the mapreduce
     environment for column-wise data store. In Proceedings
     of the 2010 Sixth International Conference on Semantics,
     Knowledge and Grids, SKG ’10, pages 97–104,
     Washington, DC.