=Paper=
{{Paper
|id=None
|storemode=property
|title=Dynamic Partitioning of Big Hierarchical Graphs
|pdfUrl=https://ceur-ws.org/Vol-1018/paper2.pdf
|volume=Vol-1018
|dblpUrl=https://dblp.org/rec/conf/vldb/SpyropoulosK13
}}
==Dynamic Partitioning of Big Hierarchical Graphs==
Vasilis Spyropoulos (vasspyrop@aueb.gr) and Yannis Kotidis (kotidis@aueb.gr)
Athens University of Economics and Business, 76 Patission Street, Athens, Greece
ABSTRACT
Hierarchical graphs are multigraphs which have as vertices the leaf nodes of a tree that lays out a hierarchy, and as edges the interactions between the entities represented by these nodes. In this paper we deal with the management of records that are the edges of such a graph, describing a model that fits well in a number of applications, many of which deal with very big volumes of streaming distributed data that have to be stored in a way that makes their future retrieval and analysis efficient. We formally define a partitioning scheme that respects the hierarchy tree, and apply these ideas using well-known open source big data tools, such as Apache Hadoop and HBase, on a small cluster. We built a framework on which we examine some basic policies for the partitioning of such graphs, and draw interesting conclusions regarding the quality of the partitions produced and their effectiveness in processing analytical queries drawn from the imposed hierarchy.

Figure 1: Example of a hierarchical graph - the graph consists of the unlabeled nodes, which are also the leaves of a hierarchy tree, and the edges/interactions between them. (The hierarchy shown has Greece at the root, the states Attiki and Messinia below it, and the cities Athens, Piraeus and Kalamata at the next level.)
1. INTRODUCTION
There are numerous applications, such as management and visualization of telecommunications data [1], Web log mining [2] or Internet traffic analysis [3], in which data records can be described as edges between vertices of a hierarchical graph, i.e. a directed multigraph whose vertices are also the leaf nodes in a hierarchy tree. As an example, Call Detail Records (CDRs) can be naturally depicted via a massive graph structure in which nodes represent customers' phone numbers and edges between them their calls. At the same time, the nodes of this graph are the leaves of a tree that indicates their location and superimposes a geographical hierarchy over this data [4].

You can see such an example in Figure 1, which presents a small part of the hierarchy of locations in Greece. In this Figure, Attiki and Messinia are states of Greece, while Athens, Piraeus and Kalamata are cities in these states. The unlabeled nodes represent subscribers of a telephony network. Then, each call serviced by the telephony network instantiates a new directed edge in the graph, between the respective vertices (caller and callee) located at the leaves of the hierarchy tree, as is shown in the Figure. This hierarchy is exploited in order to help pose queries that seek to retrieve certain records for further analysis. For instance, in order to calculate statistics on the out-of-state calls originating in Athens, this intention may be described by a query edge between Athens and Greece in the tree. Similarly, query edge (Attiki, Attiki) denotes the set of calls originating and terminating within this state. The aforementioned query edges are shown in Figure 1 (dashed lines). Another example where similar hierarchical graphs exist is social networks, where users are organized in groups according to their location or other characteristics, such as age or interests, and we need to record the interactions between them. Users (e.g. analysts) of such data often need to answer queries regarding interactions or distributions of records between hierarchy groups not necessarily belonging to the same level of the hierarchy.

In the aforementioned applications, this kind of graph can grow to enormous size. For instance, a large telecom provider may service hundreds of millions of calls per day, each triggering a new edge in the graph. Moreover, this data is distributed by nature, as it is being streamed from distant locations (e.g. call centers, web hosts, IP routers). Thus, we need solutions that can cope with the volume, but also with the streaming and distributed nature that characterize this kind of data.

∗ This research has been co-financed by the European Union (European Social Fund ESF) and Greek national funds through the Operational Program "Education and Lifelong Learning" of the National Strategic Reference Framework (NSRF) - Research Funding Program: RECOST
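The query-edge semantics sketched in the introduction can be made concrete in a few lines of code. The following is our own illustrative sketch, not the authors' implementation; the hierarchy, the phone-number leaves and all function names are assumptions made for the example:

```python
# Minimal sketch of a hierarchical CDR graph (all names are illustrative).
# The hierarchy maps each node to its parent; CDR edges connect leaves.

hierarchy = {  # child -> parent
    "Attiki": "Greece", "Messinia": "Greece",
    "Athens": "Attiki", "Piraeus": "Attiki", "Kalamata": "Messinia",
    "6971": "Athens", "6972": "Piraeus", "6973": "Kalamata",  # phone leaves
}

def is_descendant(node, ancestor):
    """True if `node` lies in the subtree rooted at `ancestor` (or equals it)."""
    while node is not None:
        if node == ancestor:
            return True
        node = hierarchy.get(node)
    return False

# Each CDR is a directed edge (caller, callee) between leaves.
cdrs = [("6971", "6972"), ("6971", "6973"), ("6972", "6971")]

def answer_query_edge(src_anc, dst_anc):
    """All CDRs whose caller is under src_anc and callee is under dst_anc."""
    return [(s, d) for s, d in cdrs
            if is_descendant(s, src_anc) and is_descendant(d, dst_anc)]

# Query edge (Athens, Greece): all calls originating in Athens.
athens_calls = answer_query_edge("Athens", "Greece")
```

A query edge between two inner nodes, such as (Attiki, Attiki), selects every call whose endpoints both fall under the corresponding subtrees, matching the description of Figure 1.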
In our work we address these challenges by moving the storage and processing of these graphs to the cloud. We propose a system that uses the distributed data store HBase [5] running on the Hadoop distributed file system [6], but also MapReduce [7] techniques, so as to handle a continuous stream of updates efficiently. Our system leverages the available degree of hardware parallelism by devising a dynamic partitioning scheme over the streamed edges of the hierarchical graph. Our techniques aim at generating partitions that correspond to clusters of graph edges, which are naturally mapped to collections of nodes in the hierarchy tree, while respecting the distribution of the streamed records. In this way, analysis of the records based on the superimposed hierarchy can be performed in an efficient manner. Our contributions are:

• We revisit the problem of managing massive hierarchical graphs that are streamed by many applications of interest. Our techniques utilize emerging computational and data management platforms for manipulating large, dynamic and distributed collections of records in a cluster of machines. Available parallelism is exploited via a dynamic partitioning scheme we propose for the streamed records.

• We formally define the space of choices for partitioning the streamed graph, while respecting the hierarchy tree that is superimposed over its nodes. We then present a number of interesting partitioning policies, and describe the details of the system we built for implementing our framework while utilizing off-the-shelf tools.

• We present an experimental evaluation of our system using a small cluster of machines. Our results demonstrate the efficiency of our system in managing massive graphs scaling to millions of edges. We also provide a comparison among the partitioning policies we implemented, based on the results of a number of experiments that we conducted.

The rest of the paper is organized as follows. In Section 2 we discuss related work. In Section 3 we formally introduce our framework and discuss the type of graph data and queries we consider. Then, we describe a partitioning scheme based on the tree hierarchy that accompanies the graph data and a number of partitioning policies that we implemented, and discuss the architecture of our system. Section 4 presents our experiments and Section 5 contains concluding remarks.

2. RELATED WORK
Interest in graphs and their applications in data management has been renewed due to the wide spread of fields such as social networks and the semantic web. At the same time, there is a profound need for the efficient management of big distributed data. As a result, we can see a lot of recent work done in the area, ranging from graph databases to distributed graph processing or graph partitioning techniques. The latter mainly cope with the problem of splitting a large graph by assigning its vertices into independent partitions. While there are several variations of the problem, a typical objective is to obtain partitions such that the sum of the vertex weights across partitions is even, while the sum of the inter-partition edges is minimized [8, 9]. The work in [10] proposes data partitioning that is guided by the user's queries. Another approach, which aims at partitioning graphs across clusters of servers in a dynamic way by using queries during the runtime of the system, can be found in [11]. Our query-driven partitioning policy described in Section 3 is motivated by these ideas, but the actual setting is different. In [12] the authors present SPAR, a social partitioning and replication middleware that uses the social graph structure in order to achieve data locality. In our work we also use a structure to guide the partitioning, but the structure we use is a hierarchy tree.

The objective in our work is quite different from vertex partitioning, since we actually partition the edges of a hierarchical multigraph. These edges represent interactions between nodes that need to be investigated according to the imposed hierarchy. In a typical scenario, aggregation of these edges at the higher levels of the hierarchy tree is more important for the application, while in certain applications, such as analysis of call detail records, decision making based on fine-granularity statistics (i.e. low-level aggregations) is in fact prohibited by law, so that certain carriers cannot obtain unfair advantage over their competitors. Our techniques can benefit systems built for visualizing hierarchical multigraphs (e.g. [1]). Moreover, indexing techniques for hierarchical multigraphs such as [4], and multi-dimensional indexes over key-value stores as in [13], can be incorporated in our system for providing fast access to individual records within the partitions created by our framework.

3. SYSTEM OVERVIEW

3.1 Definitions
Assuming a rooted hierarchy tree T, we denote the set of its leaves as Leaves(T). We refer to the subtree rooted at a vertex x ∈ T as Tx. For vertices u and v of T, we say that v is a descendant of u if there is a path descending from u to v, including the case that u equals v. Vertices u and v are called comparable in T if one of them is a descendant of the other, and incomparable in T if neither u nor v is a descendant of the other. Also, by depth(u) we refer to the number of edges that have to be crossed so as to get from u to the tree root node.

A hierarchical graph is a multigraph G(T, V, E), where T is a tree, V is the set of vertices in the graph, which are also the leaves of T (V = Leaves(T)), and E is a multiset of edges between the nodes in V. We assume that each instance of an edge is labeled with a unique identifier, in order to be able to assign data to it (i.e. in the CDRs example, each edge is associated with a unique key that identifies the corresponding CDR).

In order to represent the set of edges between nodes in T and their relationships, we define the graph GT². Each possible pair of nodes u and v in T is a vertex [u, v] in GT², and we refer to u as the source node in T and v as the destination node in T. The edges in GT² imply a hierarchy inherited from the hierarchy described by tree T. In particular, there is an edge between nodes [u1, v1] and [u2, v2] in GT² if exactly one of the following conditions holds: (i) u2 is a child of u1 in T, (ii) v2 is a child of v1 in T.

Graph GT² is used in our framework in order to define our partitioning scheme on the edges of the hierarchical graph. Moreover, the nodes of GT², as will be explained in Section 3.2, are used in order to model possible queries on this data. Then, the edges of GT² will determine partitions that contain relevant data for a query. Each vertex [u, v] ∈ GT² is a candidate partition Pu,v to be materialized by our partitioning scheme. Keeping all of GT² in memory is not a feasible solution, as its size is quadratic in the size of T. As will be explained, our partitioning process progressively splits the hierarchical graph and constructs a partition tree TP, which is a subgraph of GT². When we need to locate partitions so as to insert new records or answer queries, we use TP as our lookup structure. An example partition tree TP which follows the hierarchy of Figure 1 is shown in Figure 2.

Finally, by Cu,v we refer to a counter of how many records are contained in the partition Pu,v, and by threshold to the value such that, when the number of records in a partition grows bigger than it, the partition has to be split into smaller ones and then get dropped.

Figure 2: Example state of a partition tree TP. (The tree shown has root [Greece, Greece], its children [Attiki, Greece] and [Messinia, Greece], the children [Attiki, Attiki] and [Attiki, Messinia] of the former, and leaves [Attiki, Athens] and [Attiki, Piraeus].)

3.2 Hierarchical Queries
In our framework, retrieval of edges belonging to the hierarchical graph is accomplished via queries that are modeled using the hierarchy tree T. In particular, a query Qu′,v′ is denoted as a query edge in T (see Figure 1). This query denotes our intention to retrieve all edges that have as source vertices the leaves of Tu′ and as destination vertices the leaves of Tv′. When such a query arrives, we need to be able to decide which of the materialized partitions in TP may contain relevant graph edges. In order to achieve that, we traverse TP in a top-down fashion and check each vertex Pu,v against the query Qu′,v′. The partition is considered useful in answering the query when their respective source and destination vertices are comparable in T. We continue traversing TP, descending the useful partitions, until we get to the active partitions (active partitions are those that contain data and are pointed to by the leaf nodes of TP, as explained in Section 3.3), which are returned to the query for further processing (e.g. filtering of relevant edges). In case that during the traversal we come across a Pu,v for which u is equal to u′ and v is equal to v′, we stop the traversal and retrieve the leaf nodes in the subtree rooted at Pu,v. In that case, all the edges in the respective partitions are returned to the user.

In our running example, assuming that the partition tree TP is at the state shown in Figure 2 and that we have to answer query QAthens,Messinia, which retrieves all CDRs from locations in Athens to locations in the state of Messinia, we first check the root of TP, which is [Greece, Greece]. Since Athens is comparable to Greece and Messinia is comparable to Greece, we continue with examining the children of [Greece, Greece], which are [Attiki, Greece] and [Messinia, Greece]. Athens is comparable to Attiki and so is Messinia to Greece, so node [Attiki, Greece] is useful, but Athens is incomparable to Messinia, so we do not have to further investigate the node [Messinia, Greece] or any node in TMessinia,Greece. Next we have to check nodes [Attiki, Attiki] and [Attiki, Messinia], which are the children of [Attiki, Greece]. Athens is comparable to Attiki but Messinia is incomparable to Attiki, so [Attiki, Attiki] is not considered useful. On the other hand, [Attiki, Messinia] is useful, since Athens is comparable to Attiki and Messinia is comparable to Messinia. [Attiki, Messinia] is a leaf node in TP, and so the traversal ends here and the partition pointed to by [Attiki, Messinia] is the result returned to the query.

3.3 Overview of the Partitioning Process
A high level description of the partitioning process is as follows. In the beginning, let TP consist of just one vertex [r, r], where r refers to the root of T. That means that we initially materialize just one global partition Pr,r containing all possible edges amongst leaves in T. When Cr,r grows greater than threshold, the split process is triggered. The split process decides whether Pr,r will be split by its source or destination node, depending on the rules of the chosen partitioning policy, which is discussed later. Each outcome is encoded by a set of nodes that are reachable from node [r, r] in graph GT², depending on whether the respective edge denotes a parent-child relationship on the source or destination node.

After the split, the vertices representing the new partitions are added to TP. Vertex [r, r] in TP no longer points to an active partition, but we keep it, since it describes the records contained in the active partitions pointed to by its descendants (the new vertices that we added), and we use this information when we traverse TP in order to insert new records or answer a query. This process takes place for every active partition Pu,v when Cu,v grows greater than threshold after the insertion of new records. This way, the vertices in TP that point, or have previously pointed, to an active partition form a hierarchical tree. At any moment, the leaves of TP point to the active partitions, while the inner nodes, including the root, are "aggregations" of these partitions.

Any node in TP can optionally maintain a series of useful application-specific statistics, such as the number of records in the partition, aggregations over measures of these records, calculations regarding heavy hitters such as top-k sources and top-k destinations, etc. Furthermore, when fast approximate answers are desired by the application (for example during exploratory data analysis, or as a preview while the exact answer is computed), it is also possible to maintain synopses such as Sketches [14], Histograms [15, 16] or Wavelets [17, 18] on the nodes of TP. Since these nodes are traversed while new data is added to the partitions, maintenance of these synopses can be easily incorporated in the process. While these extensions are applicable in our framework, their discussion is beyond the scope of this paper.

In what follows we describe the different split/partitioning policies that we implemented in our system and used in our experiments. First we describe two simple query-agnostic policies and next, in more detail, a partitioning policy that we call Query-Driven Partitioning, which decides the split to materialize by taking into consideration a set of queries that are most important to the user, and makes the split decisions according to them.

Query-Agnostic Policies: The first two policies assume no previous knowledge about the interests of the user. Each of them, though, utilizes a different heuristic, as explained below.

• Round-Robin Partitioning: Round-Robin is a simple approach to partitioning the hierarchical graph. Partitions in TP that need to split are split alternately by source or destination. This process results in creating balanced partitions, in the sense that source and destination nodes in a partition have a maximum distance of one hierarchy level. That way, the partitions created are not biased towards the source or destination nodes of the constituent edges.

• Min-Split Partitioning: The Min-Split partitioning policy is a heuristic method which tries to create the minimum number of new partitions when an active partition overflows. This policy seems preferable when the goal is to create as few active partitions as possible, while keeping their size close to the selected threshold. Thus, when it has to make a split choice, it simply chooses, between the candidate splits, the one containing fewer partitions.

Query-Driven Partitioning: In Query-Driven Partitioning we assume that we have prior knowledge of the queries that the users of the system are mostly interested in. So, when we have to make a split choice, we choose the candidate split that suggests a partitioning better suited to answer the set of queries. In what follows we present the idea of Query-Driven Partitioning for hierarchical graphs in a formal way.
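The two query-agnostic split choices described above can be sketched in code. This is our own illustrative sketch, not the authors' implementation; the sample hierarchy and all names are assumptions. Splitting a partition [u, v] "by source" replaces it with one partition [c, v] per child c of u; splitting "by destination" is symmetric:

```python
# Illustrative sketch of the query-agnostic split policies (names are ours).

children = {  # hierarchy tree T: node -> list of children
    "Greece": ["Attiki", "Messinia"],
    "Attiki": ["Athens", "Piraeus"],
    "Messinia": ["Kalamata"],
}

def candidate_splits(u, v):
    """The two ways to split partition [u, v]: by source or by destination."""
    by_source = [(c, v) for c in children.get(u, [])]
    by_dest = [(u, c) for c in children.get(v, [])]
    return {"source": by_source, "destination": by_dest}

def round_robin_split(u, v, last_choice):
    """Alternate between source and destination splits."""
    cands = candidate_splits(u, v)
    choice = "destination" if last_choice == "source" else "source"
    if not cands[choice]:  # e.g. that side is already a leaf of T
        choice = "destination" if choice == "source" else "source"
    return choice, cands[choice]

def min_split(u, v):
    """Pick the feasible candidate split that yields fewer new partitions."""
    feasible = {k: p for k, p in candidate_splits(u, v).items() if p}
    return min(feasible.items(), key=lambda kv: len(kv[1]))
```

Note that `min` breaks ties by iteration order here; how a real implementation breaks ties between equally sized candidate splits is not specified in the paper.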
Let Pu,v be a partition in the partition tree TP, and Qu′,v′ be a query asking for all records having as source and destination the nodes that are leaves of the hierarchy tree T's subtrees Tu′ and Tv′, respectively. Recall that a partition is useful for answering a query if their respective source and destination nodes are comparable in T; otherwise the partition is pruned while navigating the partition tree in search of answers to the query.
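This pruning rule can be stated compactly in code. The sketch below is our own, with illustrative names; comparability is as defined in Section 3.1:

```python
# Sketch of the usefulness test used to prune the partition tree (names ours).

parent = {  # hierarchy tree T: child -> parent
    "Attiki": "Greece", "Messinia": "Greece",
    "Athens": "Attiki", "Piraeus": "Attiki", "Kalamata": "Messinia",
}

def comparable(a, b):
    """True if a is a descendant of b, or b is a descendant of a, in T."""
    def desc(x, anc):
        while x is not None:
            if x == anc:
                return True
            x = parent.get(x)
        return False
    return desc(a, b) or desc(b, a)

def useful(partition, query):
    """Partition [u, v] may hold answers to query (u', v') iff both the
    source pair and the destination pair are comparable in T."""
    (u, v), (uq, vq) = partition, query
    return comparable(u, uq) and comparable(v, vq)

# Running example: query (Athens, Messinia) against nodes of Figure 2.
assert useful(("Attiki", "Greece"), ("Athens", "Messinia"))        # descend
assert not useful(("Messinia", "Greece"), ("Athens", "Messinia"))  # pruned
```

The two assertions reproduce the traversal of the running example in Section 3.2: [Attiki, Greece] is descended into, while [Messinia, Greece] and its subtree are pruned.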
For a useful partition, we can define a measure of the overhead that the retrieval of Pu,v adds to the overall cost of answering the query, by considering the number of records that belong to Pu,v but are not part of the result of Qu′,v′. We can use this measure to make the split choice for a partition to be split, by calculating the query answering overhead for each of the candidate splits. Extending this, we can calculate the overhead not just for one query but for a set of queries that the users are mostly interested in.

In order to measure the overhead of a useful partition for a given query, we have to estimate the portion of the partition records that are not useful for the query, but will have to be retrieved when scanning the partition for relevant data. Since both the partition and the query follow the hierarchy implied by the hierarchy tree T, we should check the partition's source and destination nodes u and v against the query's source and destination nodes u′ and v′, respectively. We will describe the procedure for u and u′, but whatever we mention holds true also for v and v′.

Since nodes u and u′ are comparable (otherwise the partition is not useful), we have to consider the following three cases: (i) u equals u′, (ii) u is a descendant of u′ (depth(u) > depth(u′)), and (iii) u′ is a descendant of u (depth(u) < depth(u′)). Let fitness(u′, u) denote the portion of the leaf nodes in Tu that are also leaves in the subtree Tu′. In the first and second cases, we can safely infer that fitness(u′, u) equals 1, since Tu is contained in Tu′. On the contrary, in the third case, Tu′ is contained in Tu and, thus, fitness(u′, u) is calculated by considering the ratio of the leaves of Tu′ over the leaves of Tu:

fitness(u′, u) =
  0, if u and u′ are not comparable
  1, if u and u′ are comparable and depth(u) ≥ depth(u′)
  |leaves(Tu′)| / |leaves(Tu)|, otherwise

Then, the fitness of the partition for the query is computed as:

fitness(Qu′,v′, Pu,v) = fitness(u′, u) · fitness(v′, v)

Intuitively, this measure estimates the percentage of records in the partition that are useful for the query, assuming no additional knowledge on the data distribution is given.

In case the partition is split into k sub-partitions, assuming a uniform distribution of the records in Pu,v, each of these partitions will receive Cu,v/k records, where Cu,v is the size of the partition. Then, given that we have calculated the fitness for each of these smaller partitions, denoted as f1, ..., fk respectively, we compute the overhead of the split as the number of records non-useful to the query that are expected to be retrieved from the set of partitions belonging to the candidate split:

overhead(Qu′,v′, Split(Pu,v)) = (Cu,v / k) · Σ_{i=1...k, fi>0} (1 − fi)

For a set of queries, the cumulative overhead of the split is computed by summing the estimated overhead for each query. Thus, given a choice of splitting the partition by source or destination, we compare the overheads that we calculated for each of the splits and select to materialize the one with the lowest number. As has been explained, our calculations are based on the assumption that the data distribution within the partition is uniform. Of course, this is a bold assumption and we expect that the system may occasionally make wrong decisions, leading to suboptimal splits. An easy workaround is to consider additional statistics, for instance in the form of sketches, that will help better estimate the distribution of records within a partition, at the cost of increased overhead due to bookkeeping of these synopses. We leave exploration of such choices as future work.

3.4 Implementation Overview
We implemented our system as a framework consisting of four main modules: the Buffer Storage, the Admission Manager, the Partitioning Engine and the Query Engine. It uses the well-known distributed data store HBase running on top of HDFS (the Hadoop Distributed File System), and also the Hadoop MapReduce engine, in order to accomplish tasks such as loading and repartitioning of the data. We materialize each partition as an HBase table, so that retrieving the records belonging to one partition involves the scan of one and only one table. Another choice would be to use one big table to store all the records and define each partition space by applying a compound row-key design.

Figure 3: Framework overview. (Modules shown: Buffer Storage, Admission Manager, Partitioning Engine and Query Engine, fed by new data and clients; the Insertion Job, Partitioning Job and Query Job run over MapReduce; metadata and data are stored in HBase over HDFS.)

Figure 3 provides a high-level view of the framework. The Buffer Storage module is located at the input of the system and temporarily stores the new records that are streamed from distributed sources into text files. When the size of these records grows bigger than a maximum buffer size, the Buffer Storage module sends a message to the Admission Manager module, which executes the task of pulling the records from Buffer Storage and loading them into HBase. After each load of new records, the Admission Manager checks for partitions that grew over the partitioning threshold. For any such partition, the Admission Manager passes the required instructions to the Partitioning Engine, which decides a split according to the selected policy, amongst the ones described earlier, and applies it. For the support of these tasks, the system maintains a set of metadata, such as the hierarchy and partition trees, that are used, updated and shared by all the different modules. The actual execution of the tasks takes place as a number of MapReduce jobs, the most important of them being the Insertion Job, which puts each new record in the appropriate partition by looking up the hierarchy and partition trees, and the Partitioning Job, which moves the data from a splitting partition to the new partitions.

Finally, we also implemented a Query Engine, which accepts a
user query or set of queries and, by looking up the hierarchy and partition trees, discovers the partitions that possibly contain relevant records, scans them and returns the results to the user.

4. EXPERIMENTS

4.1 Experimental Setup
All experiments were conducted on a cluster of seven virtual machines located at GRNET's cloud infrastructure Okeanos¹. Each machine had 4 processors and 4 GB of memory, while the disk sizes varied from 40 to 100 GB. The operating system installed was Ubuntu Server 12.04, and we used Hadoop version 1.1.2 and HBase version 0.94.6.1. We used one machine as the system master, running the NameNode, JobTracker and HMaster daemons, while each of the remaining six slave machines was running instances of the DataNode, TaskTracker and HRegionServer daemons.

In order to be able to generate massive graph data records, we wrote a custom CDR data generator. We used the geopolitical hierarchy of Greece, as described by the Ministry of Interior², to create the hierarchy tree, and then we generated a number of phone numbers for each city in it. The resulting hierarchy tree consisted of five levels, which from top to bottom are Country, District, State, City and Phone Number, having node counts of 1, 13, 58, 324 and 1134698 nodes, respectively. The phone numbers consist of regular phone numbers that make and receive calls, inbound-only phones, as those found in customer service departments or telemarketing, and outbound-only phones, as those used by marketing agencies. Each record in the experiment data sets consists of a source and destination phone number and a unique call record id. Last, we created a random set of 10 queries, which we used to guide the Query-Driven policy, but also to examine the performance of each policy in answering them. The queries were picked randomly amongst all possible queries having as source or destination inner nodes of the hierarchy tree, meaning we excluded the root (country) and the leaves (single phone numbers).

4.2 Loading/Partitioning Time
The first experiment we conducted is a comparison of the loading times for each of the partitioning policies we implemented. We created a data set of a total of 50 million CDR records and broke it into an initial ingest set of 20 million records and 10 append sets of 3 million records each. For each of these 11 steps we traced the total time it took to insert the records into HBase and perform the repartitioning of the schema, when needed. The results of this experiment are summarized in Figure 5. We can see that the Min-Split and Query-Driven policies spent slightly more time in repartitioning the hierarchical graph. This is explained since Min-Split follows a conservative approach of repartitioning the data by taking the minimum number of splits at each step, resulting in more subsequent splits. The Query-Driven policy is often keen to repartition the data by extending the partitioning tree in order to best fit the input queries that drive its selection process. Finally, in Figure 4 we compare the scalability of our framework using a larger volume of input data (200 million records) for each policy. Compared to the smaller dataset, the Figure suggests a sublinear increase of the loading times, something that is attributed to the overhead times of the underlying frameworks (e.g. MapReduce job setup), which affect the smaller input more (proportionally).

Figure 4: 20M vs 200M Records Loading Times

4.3 Partitions' Quality
In order to examine the quality of the partitions created by each of the policies tested, we propose a measure that we call distance. This metric can be used in applications where the goal is to derive the fewest number of partitions, each smaller than or equal to the selected threshold. Given the actual final partition sizes sizei, where i = 1...pm and pm is the number of the active partitions for policy m at the end of the loading phase, we define distancem to be:

distancem = sqrt( Σ_{i=1...pm} (sizei − threshold)² )

Intuitively, a smaller distance value denotes a set of partitions that are created evenly near, but not exceeding, the selected threshold. We have calculated and present the value of the distance metric for each policy, after each loading/partitioning job, in Figure 6(a). What is worth noticing in this Figure is that the Min-Split policy, while in the beginning it was the best amongst the others, later on created partitions that had a larger collective distance. This is explained by the fact that the choice of the smaller split leads to many deep splits of the partition tree and, subsequently, when the leaves are reached, the deep partitions that need to be split are getting split at high levels of the hierarchy (in the opposite direction), since this is the only feasible split left. This fact also leads to the increase of the number of active partitions that is evident in Figure 6(b). Thus, even though at each step Min-Split makes (locally) optimal decisions regarding the split that leads to the smaller increase of the distance metric, the final resulting partitioning is worse than the ones achieved by the other policies.

4.4 Queries Answering
In order to examine the effectiveness of the dynamic partitioning scheme, for each of the policies we ran the set of queries mentioned in Section 4.1. We used these same queries to guide the Query-Driven policy. In Figure 6(c) we can see a comparison of the total records retrieved by each of the policies, and the fraction of them that were useful in answering these queries. As expected, the Query-Driven policy has better precision than the other two policies, while the Min-Split policy, which has gone deep in the hierarchy tree, was not able to create partitions suitable for the selected set of random queries.
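The distance metric of Section 4.3 is straightforward to compute. The following is a minimal sketch of our own (the function name, sizes and threshold value are illustrative, not taken from the paper's experiments):

```python
import math

# Sketch of the partition-quality "distance" metric (Section 4.3).
# A smaller value means the partitions sit evenly near, without
# exceeding, the chosen split threshold.

def distance(partition_sizes, threshold):
    """Square root of the sum of squared deviations of each active
    partition's size from the threshold, for one policy."""
    return math.sqrt(sum((s - threshold) ** 2 for s in partition_sizes))

# Two hypothetical partitionings under an illustrative threshold of 1000:
near_threshold = [900, 950, 980, 970]  # partitions evenly near the threshold
uneven = [400, 300, 1000, 100]         # one full partition, several small ones

assert distance(near_threshold, 1000) < distance(uneven, 1000)
```

The assertion illustrates why the metric rewards the behavior the authors aim for: many nearly full partitions score lower than a mix of full and nearly empty ones.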
¹ https://okeanos.grnet.gr/
² http://www.ypes.gr/

5. CONCLUSIONS
In this paper we considered the problem of managing big hierarchical graphs by exploiting the implied hierarchy so as to partition the data edges in a way that would better support future retrieval
Figure 5: Policies' loading and partitioning times. [Three panels, (a) Round-Robin, (b) Min-Split, (c) Query-Driven, each plotting loading and partitioning time in seconds against loading/partitioning jobs 1-11.]
Figure 6: Comparison between policies. [Three panels: (a) Partitions' distance, plotting distance per loading/partitioning job for the round-robin, min-split and query-driven policies; (b) Number of active partitions per job for the same policies; (c) Queries, plotting the number of retrieved records per policy, split into result and overhead.]
We evaluated a number of dynamic partitioning policies using open source big data tools on a small cluster of nodes. Of the policies considered, the Query-Driven partitioning policy led to partition schemes that enable faster analysis of the records, assuming that some a priori knowledge of the user queries is given. In an uncertain environment, the Round-Robin policy seems to result in more balanced partitions with good query performance. Moreover, it has been shown to have the best performance in the loading and partitioning processes.

6. REFERENCES
[1] J. Abello and J. Korn, "Visualizing massive multi-digraphs," in Proceedings of INFOVIS, pp. 39-47, 2000.
[2] A. Broder, R. Kumar, F. Maghoul, P. Raghavan, S. Rajagopalan, R. Stata, A. Tomkins, and J. Wiener, "Graph structure in the web," Comput. Netw., vol. 33, June 2000.
[3] M. Faloutsos, P. Faloutsos, and C. Faloutsos, "On power-law relationships of the internet topology," SIGCOMM Comput. Commun. Rev., vol. 29, pp. 251-262, Aug. 1999.
[4] J. Abello and Y. Kotidis, "Hierarchical graph indexing," in Proceedings of CIKM, 2003.
[5] "Apache HBase." http://hbase.apache.org/.
[6] "Apache Hadoop." http://hadoop.apache.org/.
[7] J. Dean and S. Ghemawat, "MapReduce: Simplified data processing on large clusters," Communications of the ACM, vol. 51, no. 1, pp. 1-13, 2008.
[8] A. Abou-Rjeili and G. Karypis, "Multilevel algorithms for partitioning power-law graphs," in Proceedings of IPDPS, pp. 124-124, 2006.
[9] I. S. Dhillon, Y. Guan, and B. Kulis, "Weighted graph cuts without eigenvectors: a multilevel approach," IEEE Trans. Pattern Anal. Mach. Intell., vol. 29, Nov. 2007.
[10] K. Tzoumas, A. Deshpande, and C. S. Jensen, "Sharing-aware horizontal partitioning for exploiting correlations during query processing," Proc. VLDB Endow., vol. 3, pp. 542-553, Sept. 2010.
[11] S. Yang, X. Yan, B. Zong, and A. Khan, "Towards effective partition management for large graphs," in Proceedings of ACM SIGMOD, pp. 517-528, 2012.
[12] J. M. Pujol, V. Erramilli, G. Siganos, X. Yang, N. Laoutaris, P. Chhabra, and P. Rodriguez, "The little engine(s) that could: scaling online social networks," SIGCOMM Comput. Commun. Rev., vol. 40, pp. 375-386, Aug. 2010.
[13] S. Nishimura, S. Das, D. Agrawal, and A. El Abbadi, "MD-HBase: A scalable multi-dimensional data infrastructure for location aware services," 2011.
[14] G. Cormode and S. Muthukrishnan, "An improved data stream summary: the count-min sketch and its applications," J. Algorithms, vol. 55, no. 1, pp. 58-75, 2005.
[15] F. Reiss, M. N. Garofalakis, and J. M. Hellerstein, "Compact histograms for hierarchical identifiers," in VLDB, 2006.
[16] A. C. Gilbert, S. Guha, P. Indyk, Y. Kotidis, S. Muthukrishnan, and M. Strauss, "Fast, small-space algorithms for approximate histogram maintenance," in STOC, pp. 389-398, 2002.
[17] A. Deligiannakis, M. N. Garofalakis, and N. Roussopoulos, "Extended wavelets for multiple measures," ACM Trans. Database Syst., vol. 32, no. 2, 2007.
[18] A. C. Gilbert, Y. Kotidis, S. Muthukrishnan, and M. Strauss, "One-pass wavelet decompositions of data streams," IEEE Trans. Knowl. Data Eng., vol. 15, no. 3, pp. 541-554, 2003.