=Paper= {{Paper |id=Vol-2578/DARLIAP14 |storemode=property |title=A Balanced Solution for the Partition-based Spatial Merge Join in MapReduce |pdfUrl=https://ceur-ws.org/Vol-2578/DARLIAP14.pdf |volume=Vol-2578 |authors=Sara Migliorini,Alberto Belussi |dblpUrl=https://dblp.org/rec/conf/edbt/MiglioriniB20 }} ==A Balanced Solution for the Partition-based Spatial Merge Join in MapReduce== https://ceur-ws.org/Vol-2578/DARLIAP14.pdf
                          A Balanced Solution for the Partition-based
                               Spatial Merge Join in MapReduce
                                Sara Migliorini                                                            Alberto Belussi
         Dept. of Computer Science, University of Verona                                 Dept. of Computer Science, University of Verona
                    sara.migliorini@univr.it                                                         alberto.belussi@univr.it

ABSTRACT                                                                               of bytes as splitting criteria, without considering the content of
Several MapReduce frameworks have been developed in recent                             such data. In other words, given the original dataset, its records
years in order to cope with the need to process an increasing                          are placed inside the current partition (or split) until a given size
amount of data. Moreover, some extensions of them have been                            threshold is reached, then another partition is initialized, and so
proposed to deal with particular kinds of information, like the                        on. Such an approach can be suitable in the general case when
spatial one. In this paper we will refer to SpatialHadoop, a spatial                   all the dataset content has to be processed. However, in other
extension of Apache Hadoop which provides a rich set of spatial                        cases when data are analyzed using selective queries based on
data types and operations. In the geo-spatial domain, spatial                          some attributes, such as time intervals or spatial regions, this
join is considered a fundamental operation for performing data                         approach may be very inefficient, since the correlation between
analysis. However, the join operation is generally classified as                       data instances is completely ignored. For this reason, in the spatial
a critical task to be performed in MapReduce, since it requires                        context, some global indexing (or partitioning) techniques have
processing two datasets at a time. Several different solutions have                    been developed, which try to place inside the same partition data
been proposed in literature for efficiently performing a spatial                       which are spatially correlated in some way (i.e., placing inside
join which may or may not require the presence of a spatial                            the same partition nearby objects).
index computed on both datasets or only one of them. As already                            In geo-spatial applications, spatial join is considered an es-
discussed in literature, the efficiency of such operation depends                      sential operation for data analysis [4, 5], since it allows the data
on the ability to both prune unnecessary data as soon as possible                      analyst to discover important correlations between data or enrich
and to provide a balanced amount of work to be done by each                            the available information. A spatial join is a multi-dimensional
task executed in parallel.                                                             join specifically tailored for spatial data. In particular, given two
   In this paper, we take a step forward in this direction by                          datasets A and B, each one characterized by a geometric attribute,
proposing an evolution of the Partition-based Spatial Merge Join                       called g and f respectively, a spatial join A ⋈ B returns the set of
algorithm which tries to completely exploit the benefit of the                         pairs (a, b) such that a ∈ A, b ∈ B and the geometric
parallelism induced by the MapReduce framework. In particular, we                      intersection between the attributes a.g and b.f is not empty. For instance,
concentrate on the partition phase, which has to produce filtered,                     we can consider the case in which a geographer needs to compute
balanced and meaningful subdivisions of the original datasets.                         the spatial join between two huge datasets, one containing the
                                                                                       main roads and one the water areas, in order to predict the main
KEYWORDS                                                                               roads of her country which could be subject to flooding risks.
                                                                                       Similarly, a geographer may need to perform a join between a
Big Data, Partitioning, Spatial join, Balanced tasks, MapReduce
                                                                                       dataset containing the main roads and another one representing
                                                                                       the administrative subdivisions of her country, in order to study
1    INTRODUCTION                                                                      the density of the road network in each state.
The MapReduce paradigm has been specifically developed for                                 However, the join operation has been traditionally considered
processing huge amounts of data in an efficient way. In particular,                    a critical operation in MapReduce for two main reasons: (1) the
it requires subdividing the desired analysis operation into two                        need to process two distinct datasets (files) at the same time, (2)
subsequent phases: the first one is called map and it performs in                      the need to perform some pruning or filtering operation in order
parallel the same operation on independent chunks of data,                             to reduce the amount of unnecessary comparisons. In order to
producing a set of partial results. These partial results are (possibly)               solve such problems, many efforts have been devoted in recent
combined by the second phase, which is called reduce and may                           years, leading to the development of different MapReduce
be parallelized as well. Nowadays there are several frameworks                         implementations of the join [6, 10], each one applicable to a particular
that implement the MapReduce paradigm; undoubtedly the most                            context. This also holds in the spatial context, where several
famous ones are Apache Hadoop [15] and Apache Spark [17].                              algorithms have been defined and implemented, which
Moreover, in order to cope with the specific needs of particular                       essentially differ in the use of a spatial index and in the way this
kinds of information, such as the spatial and temporal ones, some                      index is built and used. In this regard, SpatialHadoop is one of
extensions have been developed which provide the                                       the available frameworks which provides an implementation for
implementation of the necessary data types and operations. As regards                  all these algorithms [9] that can also be combined with different
the spatial context, notable systems are SpatialHadoop [8] and                         kinds of indexes (partitioning techniques) [7]. In general, none
GeoSpark [16].                                                                         of these algorithms can be considered better than the others, but
    The basic partitioning approach traditionally implemented by                       the choice might depend on the characteristics of the involved
a MapReduce framework essentially uses a predefined amount                             datasets [2].
© 2020 Copyright for this paper by its author(s). Published in the Workshop Proceedings
of the EDBT/ICDT 2020 Joint Conference (March 30-April 2, 2020, Copenhagen,
Denmark) on CEUR-WS.org. Use permitted under Creative Commons License
Attribution 4.0 International (CC BY 4.0)

    As we will describe in Sect. 2, the application of a spatial partitioning technique
before the execution of a join operation is particularly useful in order to either
discard entire partitions from the analysis or balance the amount of work to be done in
parallel (typically by each map task). In this respect, the importance  intersection, potentially reducing the number of map tasks and
of balancing the amount of work to be done by each map task, in         the number of comparisons to be performed inside them.
order to completely exploit the parallelism induced by the MapReduce       Fig. 1 illustrates the application of Djni and Djgi to the same
paradigm, has been extensively studied in the literature [1, 3].        pair of datasets containing rectangles: dataset D i contains the
However, the preliminary application of a spatial                       blue rectangles, while D j contains the orange ones; for both
partitioning technique comes with its cost and sometimes it is          datasets the corresponding MBR is also depicted. As you can no-
justified only if such new organization of data can be reused           tice for both algorithms, the input datasets have been subdivided
several times, absorbing such initial cost. Moreover, the right         into a certain number of partitions (or splits), but while for Djni
partitioning technique to be applied may depend not only on the         each split can contain data coming from any zone of the space,
dataset characteristics, such as its distribution [3], but also on      for Djgi each split contains only data that are located nearby,
the type of analysis that will be performed [12].                       reducing both the number of split combinations to be considered
    From these considerations, since the join operation requires        and the amount of unnecessary comparisons to be done. This
to process two distinct datasets together, the choice of the right      is a consequence of the fact that the splits of Djni cover almost
partitioning technique has necessarily to consider both datasets        the whole MBR of the dataset (see spliti and split j rectangles in
together in order to balance the amount of work to be done by           Fig. 1), while the splits (cells) of Djgi cover a small part of the
each map task. As we will better explain in Sect. 3, given two          reference space (see c i and k j cells in Fig. 1), thus reducing the
datasets A and B, each one individually partitioned with the most       number of geometric intersections to compute.
appropriate partitioning technique, it may happen that the splits
obtained by combining them for the join will not necessarily
produce the overall best setting w.r.t. the balancing criterion.
    In this paper we consider the implementation of the Partition
Based Spatial Merge Join [13] provided by SpatialHadoop,
denoted as Sjmr, which is the only spatial join algorithm that does
not rely on the preliminary application of individual spatial
partitioning techniques on the input datasets, but defines a partition
grid based on both datasets together. Given this algorithm, we
enhance the definition of a common partitioning grid in order
to both preliminarily prune unnecessary data and promote the
balancing of the obtained splits. Such a partitioning technique has
to consider the spatial characteristics of both datasets together,
such as the spatial distribution of their objects as well as the global
covered area.


2   BACKGROUND                                                          Figure 1: Example of execution of Djni and Djgi on the
SpatialHadoop provides four different spatial join algorithms           same pair of datasets.
which essentially differ for the use of spatial indexes or for the
way data are repartitioned on the fly. In particular, the simplest          As you can notice, when the Djgi algorithm is applied, the
solution is represented by the Djni algorithm. Djni stands for          dimension of each combined split can be very different from
Distributed Join with No Index, it uses the default random parti-       each other, depending on the size and shape of the intersection
tioning technique, which is provided by any MapReduce frame-            between the original splits. In order to promote the definition of
work and is based only on the size constraint. In other words, it       balanced map tasks, the Djre algorithm has been developed. Djre
does not involve any preliminary repartition of the data based          stands for Distributed Join with Repartition, in this case only one
on their spatial properties (no index or global repartitioning is       of the two input datasets has been previously partitioned by
applied). Since Djni cannot rely on any spatial property, given         using the most appropriate spatial index, while the other one is
two datasets D i and D j subdivided into n and m partitions, re-        repartitioned by using the subdivision (or grid) induced by the
spectively, the number of map tasks to be instantiated is equal to      first one. In this case, the number of map tasks to be instantiated
the Cartesian product n × m. This case represents the worst case        is equal to the number of partitions of the previous dataset that
scenario for both the number of map tasks to be instantiated and        intersect also the other one. Moreover, while the shape of the
the amount of data to be processed. Moreover, since each pair of        obtained splits is uniform, there could be great differences in the
input partitions can contain data from any region of the space,         number of objects contained in each split, particularly if the two
there could be some map tasks that produce a huge number of             datasets cover only partially overlapping spaces, or they cover
intersecting pairs, while others may have to compare only very          the same space with different distributions.
distant geometries that do not participate in the result.                   Sjmr is the only spatial join algorithm which does not assume
   A first improvement of Djni is represented by the Djgi               that the data have been preliminarily partitioned with respect
algorithm, which starts from indexed data. Djgi stands for Distributed  to spatial criteria, but it computes the best subdivision itself by
Join with Grid Index; in this case both input datasets are assumed      considering both datasets together. Sjmr stands for Spatial Join
to be previously repartitioned by using one of the available            MapReduce and it is the MapReduce implementation of the
spatial indexes: the most suitable one given the dataset                Partition Based Spatial Merge Join [13]. It uses a common global grid
characteristics [3]. In this case, each partition is identified by a global MBR   for partitioning data before executing the required comparisons.
(Minimum Bounding Rectangle) and map tasks are instantiated             As illustrated in Fig. 2, this grid is regular, namely the shape and
only for those pairs of partitions that have a non-empty MBR            dimension of its cells are uniform and are automatically determined
based only on the input file size. The grid definition takes into      in bytes of the dataset A ∪ B, while sp is the default split size
account neither the dataset distribution nor the reference space       in bytes.
individually covered by the two input datasets. In this paper,
we extend the global partitioning performed by Sjmr at the
beginning in order to also take these aspects into account.
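
As a rough illustration of the default Sjmr grid, the sketch below reads the grid-size rule as ⌈√(ds/sp)⌉ cells per axis and maps a point to its cell. The function names `cells_per_side` and `cell_of`, the row/column convention and the sample sizes are assumptions made for this example; they are not part of the SpatialHadoop API.

```python
import math

def cells_per_side(ds_bytes: int, sp_bytes: int) -> int:
    """Cells per axis, assuming the rule ceil(sqrt(ds / sp)).

    ds_bytes: size in bytes of the merged input (A union B).
    sp_bytes: default split size in bytes.
    """
    return math.ceil(math.sqrt(ds_bytes / sp_bytes))

def cell_of(x: float, y: float, mbr, k: int):
    """Return the (row, col) of the uniform k x k grid cell containing (x, y).

    mbr is (x_min, y_min, x_max, y_max); points on the upper boundary are
    clamped into the last row/column.
    """
    x_min, y_min, x_max, y_max = mbr
    col = min(int((x - x_min) / (x_max - x_min) * k), k - 1)
    row = min(int((y - y_min) / (y_max - y_min) * k), k - 1)
    return row, col
```

Because the rule depends only on byte sizes, two inputs with the same total size get the same grid regardless of how their geometries are distributed, which is the limitation discussed in the text.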

[Figure 2: diagram labels recovered from the image: Di, Dj, Grid ixj, cij, Join cij]


Figure 2: Example of execution of the Sjmr algorithm. In               Figure 3: (a) Example of application of Sjmr to two
this case a global partitioning grid I∪ is built which                 completely overlapping datasets. (b) Example of unbalanced
includes the union of the MBRs of the two datasets. Each               grid produced by Djgi: the four upper left cells have been
cell c i j is separately processed considering the geometries          highlighted to show their different shape and
coming from both datasets.                                             dimension.
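
The per-cell processing depicted in Figure 2 can be sketched as follows, under several stated assumptions: geometries are axis-aligned rectangles `(x_min, y_min, x_max, y_max)` lying inside the grid MBR, the in-cell join is a plain nested loop rather than the plane-sweep used by Sjmr, and duplicates are avoided with a reference-point test (a common technique, used here only for illustration; the text does not say which expedient SpatialHadoop adopts). All function names are hypothetical.

```python
from collections import defaultdict
from itertools import product

def intersects(a, b):
    """True if two rectangles share at least one point."""
    return a[0] <= b[2] and b[0] <= a[2] and a[1] <= b[3] and b[1] <= a[3]

def cells_for(rect, mbr, k):
    """All (row, col) cells of a uniform k x k grid over mbr that rect overlaps."""
    x_min, y_min, x_max, y_max = mbr
    w = (x_max - x_min) / k
    h = (y_max - y_min) / k
    c0 = min(int((rect[0] - x_min) / w), k - 1)
    c1 = min(int((rect[2] - x_min) / w), k - 1)
    r0 = min(int((rect[1] - y_min) / h), k - 1)
    r1 = min(int((rect[3] - y_min) / h), k - 1)
    return [(r, c) for r in range(r0, r1 + 1) for c in range(c0, c1 + 1)]

def grid_join(A, B, mbr, k):
    """Join A and B cell by cell, reporting each intersecting pair once."""
    cells = defaultdict(lambda: ([], []))
    # "Map" step: send every rectangle, tagged by its source, to each cell it overlaps.
    for src, data in enumerate((A, B)):
        for rect in data:
            for cell in cells_for(rect, mbr, k):
                cells[cell][src].append(rect)
    result = []
    # "Reduce" step: each cell is processed independently of the others.
    for cell, (left, right) in cells.items():
        for a, b in product(left, right):
            if intersects(a, b):
                # Reference point: lower-left corner of the intersection.
                # Only the cell containing it reports the pair.
                px, py = max(a[0], b[0]), max(a[1], b[1])
                if cells_for((px, py, px, py), mbr, k)[0] == cell:
                    result.append((a, b))
    return result
```

A geometry spanning several cells is replicated to each of them, so without the reference-point test the same pair would be emitted once per shared cell.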
                                                                          This subdivision may be preferable to the application of indi-
    Tab. 1 provides a brief comparison between the available spa-      vidual indexes w.r.t. the balancing criteria, because it considers
tial join algorithms. The first column Op reports the name of          both datasets together. Fig. 3.b illustrates an example in which
the spatial join algorithm. Column BR indicates if the algorithm       the splits produced by using separate individual indexes (one in
requires the use of a modified binary reader in order to load two      green and one in red) can generate not only a greater number
datasets at a time. In particular, a tick indicates that the algorithm of combinations of intersecting splits, but also very unbalanced
considers each dataset individually and simultaneously loads a         combinations, like the ones highlighted in the upper left corner
partition from each of them. Conversely, in the Sjmr case, the         of Fig. 3.b where one green cell is combined with four red cells.
algorithm uses the default Hadoop reader and is able to load data         The default construction of the grid in Sjmr considers the
coming from the two inputs by simply merging the original files.       union of the two datasets. However, if their original reference
Notice that the modified binary reader induces some additional         spaces are not completely overlapping, this grid can contain cells
problems w.r.t. the locality principle: the system guarantees that     outside the join reference space that include data coming only
at least one of the two inputs is locally loaded by the computation    from one dataset. Some exemplifying situations are illustrated in
node, but the second one can be read either locally or remotely,       Fig. 4: in the first case the reference space of one dataset is
inducing some overhead. Conversely, with Sjmr each map                 contained inside the other one, while in the second case the reference
task is always able to locally read its input, with great advan-       spaces of the two datasets are shifted. These situations reveal
tage in terms of performance of the I/O operations. Column In          that it is not necessary to compute the global grid starting from
reports the number of datasets that are assumed to be indexed          the union of the MBRs of the two datasets, but in order to reduce
before the spatial join execution, while column Rep indicates          the amount of unnecessary work, we can instead consider the
if a repartition of one dataset is applied before the join. Finally,   intersection between the MBRs of the two input datasets during
column Ref reports a reference to the original algorithm.              the grid definition. Moreover, in case of uniformly distributed
                                                                       datasets, the definition of the grid starting from the intersection
 Table 1: Summary of the various spatial join operators.               of the MBRs ensures that the obtained splits are also more likely
                                                                       to be balanced.
    Op     BR   In   Rep   Ref
    Djni   ✓    0    ✗     Block Nested Loop Join
    Djgi   ✓    2    ✗     Grid File Spatial Join algorithm [11]
    Djre   ✓    1    ✓     Bulk-Index Join [14]
    Sjmr   ✗    0    ✗     Partition Based Spatial Merge Join [13]
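
The idea of building the grid on the intersection of the two dataset MBRs rather than on their union (the situations of Fig. 4) can be illustrated with a small sketch. Rectangles are assumed to be tuples `(x_min, y_min, x_max, y_max)`; the helper names and the sample coordinates are hypothetical and chosen only for this example.

```python
def mbr(points):
    """Minimum Bounding Rectangle of a non-empty collection of (x, y) points."""
    xs, ys = zip(*points)
    return (min(xs), min(ys), max(xs), max(ys))

def mbr_union(a, b):
    """Smallest rectangle containing both a and b."""
    return (min(a[0], b[0]), min(a[1], b[1]), max(a[2], b[2]), max(a[3], b[3]))

def mbr_intersection(a, b):
    """Common area of a and b, or None when they are disjoint."""
    x0, y0 = max(a[0], b[0]), max(a[1], b[1])
    x1, y1 = min(a[2], b[2]), min(a[3], b[3])
    return (x0, y0, x1, y1) if x0 <= x1 and y0 <= y1 else None

def area(r):
    return (r[2] - r[0]) * (r[3] - r[1])

# Two shifted reference spaces, as in Fig. 4(b).
mbr_a = mbr([(0, 0), (10, 10)])
mbr_b = mbr([(5, 5), (20, 20)])
union = mbr_union(mbr_a, mbr_b)
common = mbr_intersection(mbr_a, mbr_b)
```

In this example the union covers 400 area units while the intersection covers 25, so a grid laid over the union would mostly contain cells that can never contribute to the join result.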



3        MOTIVATING EXAMPLES
In this section we illustrate some examples of situations in which
the application of the classical partitioning technique provided
by the Sjmr algorithm can produce unbalanced situations.               Figure 4: Example of reference spaces that are not com-
   We will start by considering the best case regarding the join       pletely overlapping: (a) one reference space inside the
between two spatial datasets A and B whose geometries are uni-         other, (b) two shifted reference spaces.
formly distributed around the same reference space. This situa-
tion is illustrated in Fig. 3.a. The global uniform grid is computed       Finally, we consider the case in which the two datasets are not
starting from the MBR of the union of the two datasets (i.e.,          uniformly distributed. In this case the use of cells with the same
MBR(A ∪ B)) and by uniformly subdividing the space both                shape and dimension can induce unbalanced split dimensions, as
horizontally and vertically into ⌈√(ds/sp)⌉ cells, where ds is the size   illustrated in Fig. 5.
                                                                                        Definition 4.5 (Balanced Partitioning). A partitioning P for a
                                                                                      dataset D is said to be balanced if and only if:
                                                                                                       ∀pi , p j ∈ P : abs(|pi | − |p j |) < ε         (2)
                                                                                      where |pi | denotes the cardinality of partition pi .
                                                                                         The aim of this paper is to obtain balanced spatially-enhanced
                                                                                      partitions which contain data from both input datasets involved
                                                                                      in the join operation.
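
Definition 4.5 can be checked directly on a candidate partitioning, as in the minimal sketch below. Partitions are assumed to be plain Python lists of records, and the threshold `eps` is an arbitrary illustrative value, since the text does not fix ε; the name `is_balanced` is hypothetical.

```python
from itertools import combinations

def is_balanced(partitions, eps):
    """True if every pair of partitions differs in cardinality by less than eps."""
    sizes = [len(p) for p in partitions]
    return all(abs(a - b) < eps for a, b in combinations(sizes, 2))
```

Checking all pairs is equivalent to comparing the largest and smallest partition sizes, so in practice the test reduces to `max(sizes) - min(sizes) < eps`.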

Figure 5: Unbalanced situation: the cells located closer to                           5   PROPOSED SOLUTION
the boundary are more populated than the central ones.                                This section presents the proposed solution, called Esjmr
                                                                                      (Enhanced Sjmr), together with a detailed description of the
                                                                                      differences introduced w.r.t. the original Sjmr algorithm provided by
4     PROBLEM FORMULATION                                                             SpatialHadoop.
This section formalizes the problem of obtaining balanced parti-                          Sjmr is composed of three MapReduce jobs: the first two are
tions in the context of the spatial join execution.                                   responsible for computing the MBR of the two involved datasets
                                                                                      separately. The union of these two MBRs is then used by the
   Definition 4.1 (Spatial Dataset). A spatial dataset D = {r 1 , . . . , r n }       third job for computing the global uniform grid and performing
is a collection of records r i , each one characterized by a spatial                  the spatial join. In particular, as regards the third job, during
attribute g.                                                                          the map phase, each mapper assigns its input geometries to one
                                                                                      or more cells of the uniform grid; then the reducers (potentially
  In the following, we will use the notation r i .g in order to                       one for each grid cell) receive the geometries contained in a
denote the spatial attribute g contained in the record r i .                          cell and compute the join on them by executing a plane-sweep
   Definition 4.2 (Partitioning). Given a dataset D = {r 1 , . . . , r n },           algorithm. Some expedients are used to avoid the production of
a partitioning P is a collection of subsets of D:                                     duplicated tuples in the final output.
                                                                                          In Esjmr we propose a unique job for computing the MBR of
    P = {p1 , . . . , ph } such that ∀pi ∈ P (pi ⊆ D ∧ D = ∪i pi )             (1)    both datasets together, as illustrated in Algorithm 1. In particular,
                                                                                      the produced output is already the intersection of the dataset
   The general notion of partitioning does not require that any
                                                                                      MBRs. Notice that, in order to read the two input datasets, the job
other specific property holds between the contained objects. Con-
                                                                                      uses a strategy already known in literature [6, 15] that combines
versely, the notion of spatially-enhanced partitioning is defined
                                                                                      the two inputs into a unique file by keeping a reference to the
in order to place nearby objects inside the same partition.
                                                                                      source. In the algorithms this additional reference is denoted
   Given a subdivision of the space covered by a dataset D and
                                                                                      by representing the value of a tuple as ⟨r i , f ⟩ where r i is the
represented by a set of cells G = {c 1 , . . . , ch }, each cell c k will
                                                                                      original record contained in the input file, while f with f =
contain only the records of D whose spatial attribute g has a
                                                                                      {1, 2} denotes the fact that the record r i originally belongs to the
non-empty intersection with c k . Partitions are then defined starting
                                                                                      first or second input file. Moreover, together with the MBR of
from this grid subdivision, so that each partition corresponds to
                                                                                      the intersection, we produce also an estimation of the number
a cell of G.
                                                                                      of geometries contained in such intersection. This estimation
   Definition 4.3 (Minimum Bounding Rectangle). Given a                               is useful in order to properly instantiate the partitioning grid;
geometry g defined by a non-empty set of 2D coordinates, i.e. g =                     indeed, the intersection should contain fewer geometries than the
{(x 1 , y1 ), . . . , (x n , yn )}, the Minimum Bounding Rectangle (MBR)              union of the two original datasets, particularly in the case their
of g is the rectangle representing the maximum extension of                           reference spaces are not completely overlapping.
g. In other words, the MBR of g is defined by the coordinates                             In Algorithm 1 each mapper is responsible for updating the
min({x 1 , . . . , x n }), min({y1 , . . . , yn }), max({x 1 , . . . , x n }),        dataset MBRs based on the geometries contained in its split; in
max({y1 , . . . , yn }). The definition of MBR can be easily extended to a            the pseudo-code r i .g stands for the geometric attribute g contained
set of geometries as well.                                                            in the record r i . Notice that instead of building only two
                                                                                      separate MBRs, each mapper maintains and updates two ordered
  In the following we will use MBR(g), MBR(D) or MBR(pi ) to                          lists of partial MBRs (one for each input file). When a new
denote the MBR of a generic geometry g, of a dataset D or a                           geometry is processed, the first overlapping MBR intersecting it
partition pi , respectively.                                                          is updated accordingly, or a new partial MBR is added to the
                                                                                      corresponding list. A counter is also maintained for each partial
   Definition 4.4 (Spatially-enhanced partitioning). Given a dataset
                                                                                      MBR which represents the number of geometries intersecting it.
D and a grid G = {c 1 , . . . , ch } covering MBR(D), a spatially-
                                                                                      This counter will be used for estimating the number of geome-
enhanced partitioning P of D is a collection of subsets of D such
                                                                                      tries belonging to the dataset intersection. Clearly, this may be
that:
                                                                                      an overestimation, but it is more indicative than considering the
    P = {p1 , . . . , ph } such that ∀pi ∈ P                                          sum of the cardinality of the two datasets. At the end of each
         (pi ⊆ D ∧ D = ∪i pi ∧ ∀r j ∈ pi (r j .д ∩ MBR(pi ) , ∅))                     map, the Cleanup procedure will perform some aggregations of
                                                                                      adjacent overlapping MBRs, so that the unique reducer will re-
   Independently from the kind of considered partitions (i.e., spa-                   ceive a limited amount of MBRs. The reducer can easily compute
tially-enhanced or not), we can define the concept of balanced                        both the intersection of the dataset MBRs and an estimation of
partitioning as follows.                                                              the number of geometries in the intersection. Notice that in all
algorithms, the symbol “_” denotes a dummy serial identifier for a MapReduce input whose value we
do not care about.

 Algorithm 1: MBR computation
 class Mapper
     mbr_i^1 ← mbr_i^2 ← ∅
     method Map(_, ⟨r_i, f⟩)
         if f = 1 then
             if ∃x ∈ mbr_i^1 (x.mbr ∩ Mbr(r_i.g)) then
                 x.mbr ← Extend(x.mbr, Mbr(r_i.g))
                 x.count ← x.count + 1
             else
                 x.mbr ← Mbr(r_i.g)
                 x.count ← 1
                 mbr_i^1.SortedAdd(x)
         else
             if ∃y ∈ mbr_i^2 (y.mbr ∩ Mbr(r_i.g)) then
                 y.mbr ← Extend(y.mbr, Mbr(r_i.g))
                 y.count ← y.count + 1
             else
                 y.mbr ← Mbr(r_i.g)
                 y.count ← 1
                 mbr_i^2.SortedAdd(y)
     method Cleanup()
         Compact(mbr_i^1)
         for x ∈ mbr_i^1 do
             Write(_, ⟨x.mbr, x.count, 1⟩)
         Compact(mbr_i^2)
         for y ∈ mbr_i^2 do
             Write(_, ⟨y.mbr, y.count, 2⟩)
 class Reducer
     mbr_1 ← mbr_2 ← mbr ← EmptyMbr
     l_1 ← l_2 ← ∅
     method Reduce(_, ⟨mbr_i, count_i, f⟩)
         if f = 1 then
             mbr_1 ← Extend(mbr_1, mbr_i)
             l_1 ← l_1 ∪ {(mbr_i, count_i)}
         else
             mbr_2 ← Extend(mbr_2, mbr_i)
             l_2 ← l_2 ∪ {(mbr_i, count_i)}
     method Cleanup()
         mbr ← mbr_1 ∩ mbr_2
         for x ∈ l_1 ∪ l_2 do
             if x.mbr ∩ mbr then
                 count ← count + x.count
         Write(_, ⟨mbr, count⟩)

   Given the MBR of the dataset intersection, which represents the grid extension, the second
task is responsible for performing the balanced partitioning of the two input datasets. It is very
similar to the map phase of the second Sjmr task, but it also uses a reduce phase for refining the
obtained partitions and producing more balanced splits. This second job is described in
Algorithm 2, where it is assumed that each map task knows: (1) the initial uniform grid $G$, which
has an extension equal to the MBR of the datasets intersection and cells with uniform size
computed using the dimension (number of objects) of the intersection and the given split size,
(2) a threshold value $th$ representing the maximum number of objects to be included in each split.

 Algorithm 2: Partitioning computation
 class Mapper
     G ← grid computed using the previous job
     th ← cell occupation threshold
     method Map(_, ⟨r_i, f⟩)
         cl ← IntersectingCells(G, r_i.g)
         for c ∈ cl do
             Write(c, ⟨r_i, f⟩)
 class Reducer
     method Reduce(c, l = {⟨r_i, f⟩, ...})
         if |l| > th then
             ll ← ∅
             while ll = ∅ ∨ BigSplits(ll) do
                 ll ← Split(c, ll)
             for l ∈ ll do
                 WriteInSplit(l)
         else
             WriteInSplit(l)

    More specifically, given the dataset $D^\cap$ obtained from the intersection of the two input
datasets, whose size in bytes is denoted as $size(D^\cap)$, and the desired size in bytes of a
split, denoted as $size(split)$, the estimated initial number of cells of the grid $G$ is computed
as $\#cells = \lceil size(D^\cap)/size(split) \rceil$. Given such estimation, the grid $G$ will
have the dimension $\lceil \sqrt{\#cells} \rceil \times \lceil \sqrt{\#cells} \rceil$, while the
cell width will be equal to $width(MBR(D^\cap))/\lceil \sqrt{\#cells} \rceil$ and the cell height
will be equal to $height(MBR(D^\cap))/\lceil \sqrt{\#cells} \rceil$.
    Function IntersectingCells(G, g) returns the set of cells of G which have a non-empty
intersection with g. This set can be efficiently obtained by considering the MBR of g, thanks to
the fixed dimension of the cells. In other words, the index of the first intersected cell is
obtained by dividing the minimum x and the minimum y of Mbr(g) by the cell width and height,
respectively. In this way the map tasks produce an initial uniform subdivision of the geometries,
which can be enough if geometries are uniformly distributed. Otherwise, some recursive
subdivisions of an overcrowded cell can be necessary.
    The reducers, potentially one for each non-empty cell produced by the mappers, are responsible
for checking the degree of occupancy of such cells. In particular, if the degree of occupancy does
not exceed the given threshold, the geometries contained in the cell can be directly written in a
split by the function WriteInSplit(). Otherwise, the geometries inside the cell will be
partitioned again by recursively subdividing the cell into four splits (like in a quadtree index).
Function Split is responsible for doing such recursive subdivision, while function BigSplits
returns true if the degree of occupancy of c exceeds the threshold th.
    The last job is responsible for performing the spatial join between the geometries that belong
to the intersection, namely the geometries of $D^\cap$. This is done during a map phase, where
each mapper receives a split, namely the geometries of both datasets that belong to a particular
cell. The first operation to be done is
subdividing the geometries into two lists (one for each dataset) and then the plane sweep
algorithm is performed to compute the intersecting pairs. Some expedients can be applied in order
to avoid the production of duplicated pairs during the writing performed by the mappers, as
already done by SpatialHadoop.

 Algorithm 3: Spatial Join
 class Mapper
     l_1 ← l_2 ← {}
     method Map(_, ⟨r_i, f⟩)
         if f = 1 then
             l_1 ← l_1.SortedAdd(r_i)
         else
             l_2 ← l_2.SortedAdd(r_i)
     method Cleanup()
         pl ← PlaneSweep(l_1, l_2)
         for ⟨r_1, r_2⟩ ∈ pl do
             Write(r_1, r_2)

6    EXPERIMENTS AND VALIDATION
This section presents some preliminary results obtained by applying the proposed Esjmr technique,
together with a comparison with the partitioning used by Sjmr. In particular, we are interested
in analysing the degree of balancing obtained by the two partitioning techniques.
   Such results are illustrated in Table 2, where two real world cases are considered: (1) the
join between the water areas (WA) and the primary roads of USA (PR), and (2) the join between the
roads (RD) and the administrative subdivisions (AS) of Australia. In the table, column |Mbr∪|
reports the number of geometries contained in the union of the two MBRs, while |Mbr∩| reports the
number of geometries in the intersection between the two MBRs. As you can notice, in both cases
the two numbers differ: the use of the intersection allows unnecessary geometries to be pruned as
soon as possible. Column #splits reports the number of non-empty cells produced by the two
techniques, while column %RDS reports the relative standard deviation of the split sizes. This
last measure is intended as a measure of the balancing degree between the split sizes.
   As regards the number of splits, while Sjmr produces fewer splits (smaller number of join
mappers), Esjmr guarantees more balanced parallel tasks. Consider for example the first case:
given the original uniform grid, six splits contain a number of geometries less than the desired
threshold, while the remaining two contain more than half of the geometries, so they are
recursively subdivided by Esjmr.

Table 2: Comparison of the experimental results obtained by applying Sjmr and Esjmr algorithms.

 Datasets                 Sjmr                            Esjmr
               |Mbr∪|      #splits   %RDS      |Mbr∩|      #splits   %RDS
 WA ⋈ PR      2,305,162      8       181%     2,007,414      22       50%
 AS ⋈ PR      1,245,200      5       188%     1,244,800      14       65%

   The preliminary experiments encourage the investigation in this direction, since the proposed
technique ensures the construction of more balanced splits.

7     CONCLUSION
In recent years the amount of spatial data to be processed has been continuously increasing,
thanks also to the spread of IoT and mobile devices with geo-spatial capabilities. For this
reason, some MapReduce frameworks have been developed which are specifically tailored for modeling
and processing spatial data.
    Among the possible spatial operations, spatial join is certainly considered a fundamental one
for performing meaningful geo-spatial analysis. However, the join is traditionally considered a
critical operation to be performed in the MapReduce context, since it requires processing two
distinct datasets (files) at a time. For this reason, several join solutions have been developed
both in the general case and in the spatial one. As already discussed in literature, the
performance of such algorithms greatly depends on the ability to prune unnecessary data as soon
as possible, and to produce balanced partitions. Having balanced partitions means that the
parallel map tasks have essentially the same amount of work to do. In this way, we can maximize
the benefits induced by the parallelism.
    This paper takes inspiration from the Sjmr algorithm provided by SpatialHadoop, which is the
only one that does not require a preliminary construction of a spatial index on both input
datasets and also considers both datasets together during the repartition. Starting from this
algorithm, we propose an enhanced version of Sjmr, called Esjmr, with the aim to produce both
more balanced and filtered partitions. Some preliminary experiments have been performed in order
to check the benefits of the partitioning induced by Esjmr w.r.t. the one produced by Sjmr. Such
initial results encourage further development in this direction and will guide the definition of
future works.

REFERENCES
 [1] A. Belussi, D. Carra, S. Migliorini, M. Negri, and G. Pelagatti. 2018. What Makes Spatial
     Data Big? A Discussion on How to Partition Spatial Data. In 10th International Conference
     on Geographic Information Science. 1–15. https://doi.org/10.1109/ICDE.2015.7113382
 [2] A. Belussi, S. Migliorini, and A. Eldawy. 2018. A Cost Model for Spatial Join Operations in
     SpatialHadoop. Technical Report RR108/2018. Dept. of Computer Science, University of
     Verona. https://iris.univr.it/handle/11562/981957
 [3] A. Belussi, S. Migliorini, and A. Eldawy. 2018. Detecting Skewness of Big Spatial Data in
     SpatialHadoop. In Proc. of the 26th ACM SIGSPATIAL International Conference on Advances in
     Geographic Information Systems. 432–435. https://doi.org/10.1145/3274895.3274923
 [4] A. Belussi, S. Migliorini, M. Negri, and G. Pelagatti. 2015. Validation of Spatial Integrity
     Constraints in City Models. In Proc. of the 4th ACM SIGSPATIAL Int. Workshop on Mobile
     Geographic Information Systems (MobiGIS ’15). 70–79. https://doi.org/10.1145/2834126.2834137
 [5] A. Belussi, M. Negri, and G. Pelagatti. 2006. Modelling Spatial Whole-Part Relationships
     using an ISO-TC211 Conformant Approach. Inf. Softw. Technol. 48, 11 (2006), 1095–1103.
     https://doi.org/10.1016/j.infsof.2006.02.002
 [6] S. Blanas, J. M. Patel, V. Ercegovac, J. Rao, E. J. Shekita, and Y. Tian. 2010. A Comparison
     of Join Algorithms for Log Processing in MapReduce. In Proc. of the 2010 ACM SIGMOD Int.
     Conf. on Management of Data. 975–986. https://doi.org/10.1145/1807167.1807273
 [7] A. Eldawy, L. Alarabi, and M. F. Mokbel. 2015. Spatial Partitioning Techniques in
     SpatialHadoop. Proc. VLDB Endow. 8, 12 (2015), 1602–1605.
     https://doi.org/10.14778/2824032.2824057
 [8] A. Eldawy and M. F. Mokbel. 2015. SpatialHadoop: A MapReduce framework for spatial data. In
     2015 IEEE 31st International Conference on Data Engineering. 1352–1363.
     https://doi.org/10.1109/ICDE.2015.7113382
 [9] A. Eldawy and M. F. Mokbel. 2017. Spatial Join with Hadoop. In Encyclopedia of GIS. Springer,
     2032–2036. https://doi.org/10.1007/978-3-319-17885-1_1570
[10] J. Gu, S. Peng, X. S. Wang, W. Rao, M. Yang, and Y. Cao. 2014. Cost-Based Join Algorithm
     Selection in Hadoop. In 15th Int. Conf. on Web Information Systems Engineering. 246–261.
     https://doi.org/10.1007/978-3-319-11746-1_18
[11] L. Harada, M. Nakano, M. Kitsuregawa, and M. Takagi. 1990. Query Processing for
     Multi-Attribute Clustered Records. In Proceedings of 16th Int. Conf. on Very Large Data
     Bases. 59–70.
[12] S. Migliorini, A. Belussi, E. Quintarelli, and D. Carra. 2020. A Context-based Approach for
     Partitioning Big Data. In Proceedings 23rd International Conference on Extending Database
     Technology (EDBT). 1–4. https://doi.org/10.1109/ICDE.1999.754937
[13] J. M. Patel and D. J. DeWitt. 1996. Partition Based Spatial-merge Join. SIGMOD
     Rec. 25, 2 (June 1996), 259–270. https://doi.org/10.1145/235968.233338
[14] J. van den Bercken, B. Seeger, and P. Widmayer. 1999. The bulk index join: a
     generic approach to processing non-equijoins. In Proceedings 15th Int. Conf.
     on Data Engineering. 257–. https://doi.org/10.1109/ICDE.1999.754937
[15] T. White. 2015. Hadoop: The Definitive Guide (4th ed.). O’Reilly Media, Inc.
[16] J. Yu, J. Wu, and M. Sarwat. 2015. GeoSpark: a cluster computing framework
     for processing large-scale spatial data. In Proceedings of the 23rd SIGSPATIAL
     International Conference on Advances in Geographic Information Systems. 70:1–
     70:4. https://doi.org/10.1145/2820783.2820860
[17] M. Zaharia, R. S. Xin, P. Wendell, T. Das, M. Armbrust, A. Dave, X. Meng,
     J. Rosen, S. Venkataraman, M. J. Franklin, and et al. 2016. Apache Spark: A
     Unified Engine for Big Data Processing. Commun. ACM 59, 11 (2016), 56–65.
     https://doi.org/10.1145/2934664
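
As a complement to the description of Algorithm 2 in Section 5, the following is a minimal Python sketch of how the initial uniform grid could be sized and how IntersectingCells could be evaluated from the MBR of a geometry. The names `Grid`, `build_grid` and `intersecting_cells`, the tuple layout of an MBR, and the choice of measuring coordinates relative to the lower-left corner of the grid are assumptions made for illustration; they are not taken from the Esjmr implementation.

```python
import math
from dataclasses import dataclass

# Assumed layout of an MBR: (min_x, min_y, max_x, max_y).
Mbr = tuple[float, float, float, float]


@dataclass(frozen=True)
class Grid:
    origin_x: float     # lower-left corner of MBR(D∩)
    origin_y: float
    side: int           # number of cells per row and per column
    cell_width: float
    cell_height: float


def build_grid(mbr: Mbr, size_intersection: int, size_split: int) -> Grid:
    """Size the grid from #cells = ceil(size(D∩) / size(split)).

    Assumes a non-degenerate MBR (positive width and height).
    """
    n_cells = (size_intersection + size_split - 1) // size_split  # integer ceiling
    side = max(1, math.ceil(math.sqrt(n_cells)))
    min_x, min_y, max_x, max_y = mbr
    return Grid(min_x, min_y, side, (max_x - min_x) / side, (max_y - min_y) / side)


def intersecting_cells(grid: Grid, g_mbr: Mbr) -> list[tuple[int, int]]:
    """Return the (column, row) indexes of the cells overlapped by the MBR of a geometry."""

    def index(value: float, origin: float, step: float) -> int:
        # Clamp so that coordinates on the upper border fall into the last cell.
        return min(grid.side - 1, max(0, int((value - origin) // step)))

    first_col = index(g_mbr[0], grid.origin_x, grid.cell_width)
    first_row = index(g_mbr[1], grid.origin_y, grid.cell_height)
    last_col = index(g_mbr[2], grid.origin_x, grid.cell_width)
    last_row = index(g_mbr[3], grid.origin_y, grid.cell_height)
    return [(col, row)
            for row in range(first_row, last_row + 1)
            for col in range(first_col, last_col + 1)]
```

With a hypothetical intersection of 1000 bytes and a split size of 128 bytes, `build_grid((0, 0, 100, 50), 1000, 128)` yields 8 estimated cells and therefore a 3 × 3 grid; a geometry whose MBR is `(10, 5, 40, 20)` is then emitted once for each of the four cells it overlaps, as done by the Map method of Algorithm 2.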
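
The recursive refinement performed by the reducers of Algorithm 2 (Section 5) can be sketched in Python as follows. This is only an illustration under stated assumptions: `Rect` and `quad_split` are hypothetical names, geometries are represented by their MBRs, a geometry is replicated in every quadrant it intersects, and the `max_depth` bound is an addition of this sketch that guarantees termination when replicated geometries cannot be separated. Unlike the while loop of Algorithm 2, the sketch subdivides only the quadrants that are still above the threshold.

```python
from typing import NamedTuple


class Rect(NamedTuple):
    min_x: float
    min_y: float
    max_x: float
    max_y: float

    def intersects(self, other: "Rect") -> bool:
        # Closed-interval test: rectangles that only touch are considered intersecting.
        return (self.min_x <= other.max_x and other.min_x <= self.max_x
                and self.min_y <= other.max_y and other.min_y <= self.max_y)


def quad_split(cell: Rect, mbrs: list[Rect], th: int, max_depth: int = 8) -> list[list[Rect]]:
    """Split an overcrowded cell into quadrants until each split holds at most th items."""
    if len(mbrs) <= th or max_depth == 0:
        # Small enough (or depth exhausted): the content becomes one split; empty cells vanish.
        return [mbrs] if mbrs else []
    mid_x = (cell.min_x + cell.max_x) / 2
    mid_y = (cell.min_y + cell.max_y) / 2
    quadrants = [
        Rect(cell.min_x, cell.min_y, mid_x, mid_y),   # lower-left
        Rect(mid_x, cell.min_y, cell.max_x, mid_y),   # lower-right
        Rect(cell.min_x, mid_y, mid_x, cell.max_y),   # upper-left
        Rect(mid_x, mid_y, cell.max_x, cell.max_y),   # upper-right
    ]
    splits: list[list[Rect]] = []
    for quadrant in quadrants:
        inside = [m for m in mbrs if m.intersects(quadrant)]
        splits.extend(quad_split(quadrant, inside, th, max_depth - 1))
    return splits
```

For instance, five point-like geometries placed in an 8 × 8 cell with `th = 2` are distributed over four splits, none of which exceeds the threshold; the three geometries crowding the lower-left quadrant are separated by a second level of subdivision.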