=Paper= {{Paper |id=None |storemode=property |title=Scalable and Robust Management of Dynamic Graph Data |pdfUrl=https://ceur-ws.org/Vol-1018/paper7.pdf |volume=Vol-1018 |dblpUrl=https://dblp.org/rec/conf/vldb/LabouseurOH13 }} ==Scalable and Robust Management of Dynamic Graph Data== https://ceur-ws.org/Vol-1018/paper7.pdf
Scalable and Robust Management of Dynamic Graph Data∗
                      Alan G. Labouseur, Paul W. Olsen Jr., and Jeong-Hyon Hwang
                                              {alan, polsen, jhh}@cs.albany.edu
             Department of Computer Science, University at Albany – State University of New York, USA



ABSTRACT
Most real-world networks evolve over time. This evolution can be modeled as a series of graphs that represent a network at different points in time. Our G* system enables efficient storage and querying of these graph snapshots by taking advantage of the commonalities among them. We are extending G* for highly scalable and robust operation.

This paper shows that the classic challenges of data distribution and replication are imbued with renewed significance given continuously generated graph snapshots. Our data distribution technique adjusts the set of worker servers for storing each graph snapshot in a manner optimized for popular queries. Our data replication approach maintains each snapshot replica on a different number of workers, making available the most efficient replica configurations for different types of queries.

1.   INTRODUCTION
Real-world networks, including social networks and the Web, constantly evolve over time [3]. Periodic snapshots of such a network can be represented as graphs where vertices represent entities and edges represent relationships between entities. These graph snapshots allow us to analyze the evolution of a network over time by examining variations of certain features, such as the distribution of vertex degrees and clustering coefficients [17], network density [20], the size of each connected component [17, 18], the shortest distance between pairs of vertices [20, 23], and the centrality or eccentricity of vertices [23]. Trends discovered by these analyses play a crucial role in sociopolitical science, marketing, security, transportation, epidemiology, and many other areas. For example, when vertices represent people, credit cards, and consumer goods, and edges represent ownership and purchasing relationships, disruptions in degree distribution, viewed over time, may indicate anomalous behavior, perhaps even fraud.

Several single-graph systems are available today: Google’s Pregel [21], Microsoft’s Trinity [29], Stanford’s GPS [24], the open source Neo4j [22], and others [2, 5, 6, 7, 12, 14]. They, however, lack support for efficiently managing large graph snapshots. Our G* system [13, 27] efficiently stores and queries graph snapshots on multiple worker servers by taking advantage of the commonalities among snapshots. DeltaGraph [16] achieves a similar goal. Our work is complementary to DeltaGraph in that it focuses on new challenges in data distribution and robustness in the context of continuously creating large graph snapshots.

Single-graph systems typically distribute the entirety of a single graph over all workers to maximize the benefits of parallelism. When there are multiple graph snapshots, however, distributing each snapshot on all workers may slow down query execution. In particular, if multiple snapshots are usually queried together, it is more advantageous to store each snapshot on fewer workers as long as the overall queried data are balanced over all workers. In this way, the system can reduce network overhead (i.e., improve query speed) while benefiting from high degrees of parallelism. We present a technique that automatically adjusts the number of workers in a manner optimized for popular queries.

As implied above, there are vast differences in execution time depending on the distribution configurations and the number of snapshots queried together. Replication gives us, in addition to enhanced system reliability, the opportunity to utilize as many distribution configurations as there are replicas. G* constructs r replicas for each snapshot to tolerate up to r − 1 simultaneous worker failures. Our technique classifies queries into r categories and optimizes the distribution of each replica for one of the query categories.

In this paper, we make the following contributions:

   • We define the problem of distributing graph snapshots and present a solution that expedites queries by adjusting the set of workers for storing each snapshot.
   • We provide a technique for adaptively determining replica placement to improve system performance and reliability.
   • We present preliminary evaluation results that show the effectiveness of the above techniques.
   • We discuss our research plans to complete the construction of a highly scalable and reliable system for managing large graph snapshots.

The remainder of the paper is organized as follows: Section 2 presents the research context and provides formal definitions of the problems studied in the paper. Sections 3 and 4 describe our new techniques for distributing and replicating graph snapshots. Section 5 presents our preliminary evaluation results. Section 6 discusses related work. Section 7 concludes this paper.

∗This work is supported by NSF CAREER Award IIS-1149372.


[Figure 1: workers α, β, and γ each run vertex, degree, and count_sum operators on their portions of snapshots G1, G2, and G3; a union operator merges the partial results and an avg operator produces the average degree of each snapshot.]
Figure 1: Parallel Calculation of Average Degree

[Figure 2: the vertex versions c1, c2, e1, d1, d2, and f1 that worker γ stores on disk for snapshots G1, G2, and G3, together with the CGI entries that point to them.]
Figure 2: Storage of Snapshots G1, G2, G3 and CGI

PageRank Query      Shared-Nothing      Shared-Everything
One snapshot        285 seconds         22 seconds
All snapshots       285 seconds         2,205 seconds
Table 1: Impact of Graph Snapshot Configuration

2.   BACKGROUND
2.1   Summary of G*
G* is a distributed system for managing large graph snapshots that represent an evolving network at different points in time [13, 27]. As Figure 1 shows, these graph snapshots (e.g., G1, G2, and G3) are distributed over workers (e.g., α, β, and γ) that both store and query the graph data assigned to them. The master of the system (not shown in Figure 1) transforms each submitted query into a network of operators that process graph data on workers in a parallel fashion. Our previous work on G* can be summarized as follows:

Graph Storage. In G*, each worker efficiently stores its data by taking advantage of commonalities among graph snapshots. Figure 2 shows how worker γ from Figure 1 incrementally stores its portion of snapshots G1, G2, and G3 on disk. The worker stores c1 and d1, the first versions of c and d, when it stores G1. When vertex c obtains a new edge to e in G2, the worker stores c2, the second version of c, which shares commonalities with the previous version and also contains a new edge to e. When vertex d obtains a new edge to f in G3, the worker stores d2, the second version of d which contains a new edge to f. All of these vertex versions are stored on disk only once regardless of how many graph snapshots they belong to.

To track all of these vertex versions, each worker maintains a Compact Graph Index (CGI) that maps each combination of vertex ID and graph ID onto the disk location that stores the corresponding vertex version. For each vertex version (e.g., c2), the CGI stores only one (vertex ID, disk location) pair in a collection for the combination of snapshots that contain that vertex version (e.g., {G2, G3}). In this manner, the CGI handles only vertex IDs and disk locations while all of the vertex and edge attributes are stored on disk. Therefore, the CGI can be kept fully or mostly in memory, enabling fast lookups and updates. To prevent the CGI from becoming overburdened by managing too many snapshot combinations, each worker automatically groups snapshots and then separately indexes each group of snapshots [13, 27].

Query Processing. Like traditional database systems, G* supports sophisticated queries using a dataflow approach where operators process data in parallel. To quickly process queries on multiple graph snapshots, however, G* supports special operators that share computations across snapshots. Figure 1 shows how the average degree calculation takes place in parallel over three workers. The vertex and degree operators in Figure 1 compute the degree of each vertex only once while associating the result with all of the relevant graph snapshots (e.g., the degree of vertex a is shared across G1, G2, and G3). In the example, the count_sum operators aggregate the degree data, the union operator merges these data, and the avg operator produces the final result. Details of our graph processing operators and programming primitives for easy implementation of custom operators are provided in our earlier papers [13, 27].

2.2   Problem Statements
Our previous work [13, 27] focused on efficiently storing and querying graph snapshots. We now take up the challenge of doing so in a highly scalable and robust manner.

2.2.1   Multiple Snapshot Distribution
Accelerating computation by distributing data over multiple servers has been a popular approach in parallel databases [10] and distributed systems [9]. Furthermore, techniques for partitioning graphs to facilitate parallel computation have also been developed [15, 24, 25, 26]. However, distributing large graph snapshots over multiple workers raises new challenges. In particular, it is not desirable to use traditional graph partitioning techniques which consider only one graph at a time and incur high overhead given a large number of vertices and edges. Solutions to this problem must (re)distribute with low overhead graph snapshots that are continuously generated and take advantage of the property that query execution time depends on both the number of snapshots queried and the distribution of the graph snapshots as illustrated below.

Example. Consider a scenario where each of 100 similarly-sized graph snapshots contains approximately 1 million vertices and 100 million edges. Assume also that the system consists of one master and 100 workers. Table 1 compares two snapshot distribution configurations: Shared-Nothing, where each of the 100 snapshots is stored on one distinct worker, and Shared-Everything, where each snapshot is evenly distributed over all of the 100 workers. For each of these configurations, two types of queries for computing the PageRank of each vertex are executed: Query One Snapshot, and Query All Snapshots. The explanations below are based on our evaluation results (see Section 5 for details).
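
The two configurations in this example can be written down as placement maps. The following is a minimal sketch, not G* code: the function names (`shared_nothing`, `shared_everything`, `load_per_worker`) and the "fraction of a snapshot" load measure are assumptions made for illustration, and the sketch says nothing about network cost or execution time.

```python
from collections import defaultdict
from fractions import Fraction

NUM_SNAPSHOTS = 100
NUM_WORKERS = 100


def shared_nothing(num_snapshots, num_workers):
    """Hypothetical placement: snapshot i is stored whole on one worker."""
    return {i: {i % num_workers: Fraction(1)} for i in range(num_snapshots)}


def shared_everything(num_snapshots, num_workers):
    """Hypothetical placement: every snapshot is split evenly over all workers."""
    share = Fraction(1, num_workers)
    return {i: {w: share for w in range(num_workers)}
            for i in range(num_snapshots)}


def load_per_worker(placement, queried):
    """Amount of data (in units of one snapshot) each worker holds for a query."""
    load = defaultdict(Fraction)
    for snapshot in queried:
        for worker, share in placement[snapshot].items():
            load[worker] += share
    return dict(load)


if __name__ == "__main__":
    placements = {
        "Shared-Nothing": shared_nothing(NUM_SNAPSHOTS, NUM_WORKERS),
        "Shared-Everything": shared_everything(NUM_SNAPSHOTS, NUM_WORKERS),
    }
    queries = {"one snapshot": [0], "all snapshots": range(NUM_SNAPSHOTS)}
    for config, placement in placements.items():
        for label, queried in queries.items():
            load = load_per_worker(placement, queried)
            print(config, label, "workers involved:", len(load),
                  "max load:", max(load.values()))
```

Under these assumptions, a one-snapshot query involves a single worker in Shared-Nothing and all 100 workers in Shared-Everything, while an all-snapshots query leaves every worker with exactly one snapshot's worth of data in both configurations.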
In the case of Shared-Nothing, querying one snapshot using only one worker takes 285 seconds (205 seconds to construct the snapshot from disk and 80 seconds to run 20 iterations of PageRank). Querying all snapshots on all workers in parallel takes the same amount of time. When the Shared-Everything configuration is used, querying one snapshot on all workers takes approximately 22 seconds, mainly due to network communications for the edges that cross worker boundaries (the disk I/O and CPU costs correspond to only 205/100 seconds and 80/100 seconds, respectively, due to the distribution of the snapshot over 100 workers). In this configuration, querying 100 snapshots takes 2,205 seconds as the PageRank of each vertex varies across graph snapshots, thereby causing 100 times more message transmissions than the previous case. This example shows the benefits of different snapshot distribution approaches for different types of queries (e.g., Shared-Nothing for queries on all snapshots and Shared-Everything for queries on one snapshot).

Formal Definition. Our ultimate goal is to keep track of the popularity of graph snapshots and to optimize the storage/distribution of unpopular snapshots for space efficiency (Section 2.1) and popular snapshots for query speed. In this paper, we focus on the problem of distributing popular snapshots over workers in a manner that minimizes the execution time of queries on these snapshots. This problem can be formally defined as follows:

   Problem 1. (Snapshot Distribution) Given a series of graph snapshots $\{G_i(V_i, E_i) : i = 1, 2, \cdots\}$, $n$ workers, and a set of queries $Q$ on some or all of the snapshots, find a distribution $\{V_{i,w} : i = 1, 2, \cdots \wedge w = 1, 2, \cdots, n\}$ that minimizes $\sum_{q \in Q} time(q, \{V_{i,w}\})$ where $V_{i,w}$ denotes the set of vertices that are from snapshot $G_i(V_i, E_i)$ and that are assigned to worker $w$, and $time(q, \{V_{i,w}\})$ represents the execution time of query $q \in Q$ on the distributed snapshots $\{V_{i,w}\}$ satisfying (1) $\cup_{w=1}^{n} V_{i,w} = V_i$ (i.e., the parts of a snapshot on all workers cover the original snapshot) and (2) $V_{i,w} \cap V_{i,w'} = \emptyset$ if $w \neq w'$ (i.e., workers are assigned disjoint parts of a snapshot).

Our solution to the above problem is presented in Section 3.

2.2.2   Snapshot Replication
There have been various techniques for replicating data to improve availability and access speed [8, 11, 28]. A central data replication challenge in G* is to distribute each replica of a snapshot over a possibly different number of workers to maximize both performance and availability. For each query, the most beneficial replica also needs to be found according to the characteristics of the query (e.g., the number of snapshots queried). If two replicas of a graph snapshot are distributed using the Shared-Nothing and Shared-Everything approaches, queries on a single snapshot should use the Shared-Everything replica configuration rather than the other. In practice, however, each query can access an arbitrary number of graph snapshots (not necessarily one or all), thereby complicating the above challenges. The problem of replicating graph snapshots can be defined as follows:

   Problem 2. (Snapshot Replication) Given a series of graph snapshots $\{G_i(V_i, E_i) : i = 1, 2, \cdots\}$, the degree of replication $r$, $n$ workers, and a set of queries $Q$ on some or all of the snapshots, find a replica distribution $\{V_{i,j,w} : i = 1, 2, \cdots \wedge j = 1, 2, \cdots, r \wedge w = 1, 2, \cdots, n\}$ that minimizes $\sum_{q \in Q} time(q, \{V_{i,j,w}\})$ where $V_{i,j,w}$ denotes the set of vertices that are from the $j$th replica $G_{i,j}(V_{i,j}, E_{i,j})$ of snapshot $G_i(V_i, E_i)$ and that are assigned to worker $w$, and $time(q, \{V_{i,j,w}\})$ denotes the execution time of query $q$ on the distributed snapshot replicas $\{V_{i,j,w}\}$ satisfying (1) $\cup_{w=1}^{n} V_{i,j,w} = V_{i,j} = V_i$ for $j = 1, 2, \cdots, r$ (i.e., the parts of a snapshot replica on all workers cover the original replica), (2) $V_{i,j,w} \cap V_{i,j,w'} = \emptyset$ if $w \neq w'$ (i.e., workers are assigned disjoint parts of a snapshot replica), and (3) $V_{i,j,w} \cap V_{i,j',w} = \emptyset$ if $j \neq j'$ (i.e., no worker $w$ contains multiple copies of a vertex and its edges, which tolerates $r - 1$ simultaneous worker failures).

Section 4 presents our solution to the above problem.

[Figure 3: (a) Before Exchange: worker α holds segments G1,1, G2,1, and G2,2, and worker β holds G1,2, G3,1, and G3,2. (b) After Exchange: worker α holds G3,1, G2,1, and G2,2, and worker β holds G3,2, G1,1, and G1,2.]
Figure 3: Exchanging Segments. If snapshots G1 and G2 are queried together frequently, workers α and β in Figure 3(a) can better balance the workload and reduce the network overhead by swapping G1,1 and G3,1.

3.   GRAPH SNAPSHOT DISTRIBUTION
As mentioned in Section 2.2.1, G* needs to store each graph snapshot on an appropriate number of workers while balancing the utilization of network and CPU resources. In contrast to traditional methods for partitioning a static graph [15, 25], G* must determine the location of each vertex and its edges on the fly in response to a continuous influx of data from external sources.

Our dynamic data distribution approach meets the above requirements. In this approach, each G* worker partitions its graph data into segments with a certain maximum size (e.g., 10GB) so that it can control its load by migrating some segments to other workers (Section 3.1). Our approach continuously routes incoming messages for updating vertices and edges to appropriate workers with low latency (Section 3.2). When a segment becomes full, G* splits that segment into two that are similar in size while maintaining data locality by keeping data accessed together within the same segment (Section 3.3). It does all of the above while supporting G*’s graph processing operators (Section 3.4).

3.1   Load Balancing
In G*, each worker periodically communicates with a randomly chosen worker to balance graph data. Our key principles in load balancing are to (1) maximize the benefits of parallelism by uniformly distributing data that are queried together and (2) minimize network overhead by co-locating data from the same snapshot. Consider Figure 3(a) where three snapshots are partitioned into a total of 6 similarly-sized segments. In this example, workers α and β are each assigned a segment from snapshot G1, α is assigned two segments from G2, and β is assigned two segments from G3. If snapshots G1 and G2 are frequently queried together


(see those shaded in Figure 3(a)), this snapshot distribu-               segment and then moves a half of the data from the previ-
tion leads to inefficient query execution due to imbalanced              ous segment to the new segment. To minimize the number
workload between the workers and network communications                  of edges that cross segment boundaries, we use a traditional
for the edges between G1,1 and G1,2 . This problem can be                graph partitioning method [15]. Whenever a segment is split
remedied by exchanging G1,1 and G3,1 between the workers,                as above, the worker also updates the (vertex ID, segment
which results in a balanced distribution of the data queried             ID) pairs for all of the vertices migrated to the new segment.
together (i.e., G1 and G2 ) and localized processing of G1 on            This update process incurs relatively low overhead since the
β and G2 on α, respectively.                                             index can usually be kept in memory as in the case of the
    Given a pair of workers, our technique estimates, for each           CGI (Section 2.1). If a worker splits a segment which was
segment, the benefit of migrating that segment to the other              obtained from another worker, it sends the update infor-
worker, and then performs the most beneficial migration.                 mation to the worker that originally created it in order to
This process is repeated a maximum number of times or un-                enable data forwarding as mentioned in Section 3.2.
til the migration benefit falls below a predefined threshold.
The benefit of migrating a segment is calculated by multi-               3.4   Supporting Graph Processing Operators
plying the probability that the segment is queried with the                G*’s graph processing operators, such as those for com-
expected reduction in query time (i.e., the difference be-               puting clustering coefficients, PageRank, or the shortest dis-
tween expected query time before and after migration).                   tance between vertices, are usually instantiated on every
    For a set Si of segments on worker i and another set Sj of           worker that stores relevant graph data [13]. These opera-
segments
    P         on worker j, the expected query time is computed           tors may exchange messages to compute a value for each
as q∈Qk p(q) · time(q, Si , Sj ) where Qk is a collection of k           vertex (e.g., the current shortest distance from a source ver-
popular query patterns, p(q) is the probability that query               tex). If an operator needs to send a message to a vertex, the
pattern q is executed, and time(q, Si , Sj ) denotes the esti-           message is first sent to the worker whose ID corresponds to
mated duration of q given segment placements Si and Sj .                 the hash value of the vertex ID. This worker then forwards
    Our technique obtains Qk (equivalently, k popular combi-             the message to the worker that currently stores the vertex.
nations of segments queried together) as follows: Sort seg-              This forwarding mechanism is similar to that for handing
ments from Si ∪ Sj in order of decreasing popularity. Ini-               updates of vertices and edges (Section 3.2).
tialize Qk (for storing k popular query patterns) with the
first segment. Then, for each of the remaining segments,                 4.    GRAPH SNAPSHOT REPLICATION
combine it with each element from Qk and insert the result                  G* masks up to r − 1 simultaneous worker failures by cre-
back into Qk . Whenever |Qk | > k, remove its least popular              ating r copies of each graph data segment. As discussed in
element. We estimate the popularity of each combination                  Sections 2.2 and 3, the optimal distribution of each graph
of segments by consolidating the counting synopses [4] for               snapshot over workers may vary with the number of snap-
those segments. Whenever a query accesses a segment, the                 shots frequently queried together. Based on this observa-
associated synopsis is updated using the ID of the query.                tion, we developed a new data replication technique that
    We compute time(q, Si , Sj ) as max(c(q, Si ), c(q, Sj )) +          speeds up queries by configuring the storage of replicas to
c0 (q, Si , Sj ) where c(q, Si ) is the estimated duration of pro-       benefit different categories of queries. This approach uses
cessing the segments from Si for query q, and c0 (q, Si , Sj )           an online clustering algorithm [1] to classify queries into r
represents the estimated time for exchanging messages be-                categories based on the number of graphs that they access.
tween workers i and j for query q.                                       It then assigns the j-th replica of each data segment to a
                                                                         worker in a manner optimized for the j-th query category.
3.2    Updates of Vertices and Edges                                     The master and workers support this approach as follows:
   Each new vertex (or any edge that emanates from the ver-
tex) is first routed to a worker chosen according to the hash            4.1   Updates of Vertices and Edges
value of the vertex ID. That worker assigns such a vertex to               Updates of vertices and edges are handled as described in
one of its data segments while saving the (vertex ID, segment            Section 3.2 except that they are routed to r data segment
ID) pair in an index similar to the CGI (Section 2.1). If a              replicas on different workers. For this reason, each worker
worker receives an edge that emanates from an existing ver-              keeps a mapping that associates each segment ID with the
tex v, it assigns that edge to the segment that contains v. If           r workers that store a replica of the segment. Our approach
a worker w has created a segment S and then migrated it to               protects this mapping on worker w by replicating it on work-
another worker w′ for load balancing reasons (Section 3.1),              ers (w+1)%n, (w+2)%n, ..., (w+r−1)%n, where n denotes
worker w forwards the data bound to S to w′. To support                  the number of workers. If a worker fails, the master assigns
such data forwarding, each worker keeps track of the worker              another worker to take over.
location of each data segment that it has created before.
Updates of vertices and edges, including changes in their at-            4.2   Splitting a Full Segment
tribute values, are handled as in the case of edge additions.              The replicas of a data segment are split in the same way
This assignment of graph data to workers is scalable because             due to the use of a deterministic partition method. For each
it distributes the overhead of managing data over workers.               vertex migrated from one data segment to another, the r
It also proceeds in a parallel, pipelined fashion without any            workers that keep track of that vertex update their (vertex
blocking operations.                                                     ID, segment ID) pairs accordingly.
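
The routing in Section 3.2 can be sketched as follows. This is a minimal illustration under stated assumptions, not G*'s implementation: the names route, Worker, add, and migrate are hypothetical, CRC32 stands in for an unspecified hash function, and a migrated segment is assumed to move only once.

```python
import zlib


def route(vertex_id, num_workers):
    """Pick a worker index from a stable hash of the vertex ID (CRC32 is an assumption)."""
    return zlib.crc32(str(vertex_id).encode()) % num_workers


class Worker:
    """Hypothetical worker: owns segments and a (vertex ID -> segment ID) index."""

    def __init__(self, wid):
        self.wid = wid
        self.index = {}      # vertex ID -> segment ID
        self.segments = {}   # segment ID -> items stored locally
        self.moved = {}      # segment ID -> worker now holding a migrated segment
        self.count = 0
        self.current = self._new_segment()

    def _new_segment(self):
        seg = (self.wid, self.count)
        self.count += 1
        self.segments[seg] = []
        return seg

    def add(self, vertex_id, item):
        # New vertices join the current segment; data for a known vertex
        # follows the segment that already contains that vertex.
        seg = self.index.setdefault(vertex_id, self.current)
        holder = self.moved.get(seg, self)   # forward if the segment was migrated
        holder.segments[seg].append(item)
        return seg

    def migrate(self, seg, target):
        # Hand the segment to another worker and remember where it went.
        target.segments[seg] = self.segments.pop(seg)
        self.moved[seg] = target
        if seg == self.current:
            self.current = self._new_segment()


workers = [Worker(i) for i in range(4)]
w = workers[route("v42", len(workers))]
seg = w.add("v42", ("vertex", "v42"))
other = workers[(w.wid + 1) % len(workers)]
w.migrate(seg, other)
w.add("v42", ("edge", "v42", "v7"))   # forwarded to the worker now holding seg
```

After the migration, the creating worker still resolves v42 to its original segment and forwards the new edge, which mirrors the forwarding table that each worker keeps for segments it created.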

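The placement rule in Section 4.1 for protecting a worker's segment-to-replica mapping can be written out directly. The sketch below is illustrative only: the function name backup_workers and the argument checks are assumptions, while the modular arithmetic follows the (w+1)%n, ..., (w+r−1)%n rule stated in the text.

```python
def backup_workers(w, r, n):
    """Return the workers that hold copies of worker w's segment-to-replica mapping.

    Applies the (w+1)%n, ..., (w+r-1)%n rule from Section 4.1, where r is the
    replication degree and n is the number of workers.
    """
    if not 0 <= w < n:
        raise ValueError("worker ID must be in [0, n)")
    if not 1 <= r <= n:
        raise ValueError("replication degree must be in [1, n]")
    return [(w + k) % n for k in range(1, r)]


n, r = 6, 3
for w in range(n):
    print(w, backup_workers(w, r, n))
```

With n = 6 and r = 3, the mapping of worker 4 is copied to workers 5 and 0, so a copy remains available if worker 4 fails and the master assigns another worker to take over.
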
3.3    Splitting a Full Segment                                          4.3   Query-Aware Replica Selection
  If the size of a data segment reaches the maximum (e.g.,                 For each query, the master identifies the worker locations
10GB), the worker that manages the segment creates a new                 of the data segment replicas to process. To this end, the


 Message passing (12-bytes/message)       1M messages/sec                 SSSP Query       All Workers     Subset of Workers
 Disk I/O bandwidth                       200 Mbytes/sec                  One snapshot     8.2 seconds        19.2 seconds
 Snapshot construction in memory          200 seconds                     All snapshots    80.5 seconds       53.2 seconds
 PageRank iteration per snapshot          4 seconds
                                                                          Table 4: Impact of Graph Data Distribution
      Table 2: Speed and Bandwidth Observations

                                                                     shows that the relative benefit of data distribution (i.e., the
      # Cores    1    2     4     8     16    24     48              speedup relative to the number of workers) tends to decrease
      Speedup   1.0   1.9   3.7   5.9   9.7   12.5   14.7            with more workers. This is mainly due to increased network
                                                                     traffic, which shows the importance of balancing CPU and
           Table 3: Actual Speedup Result                            network resources in the context of continuously creating
                                                                     large graph snapshots.
master keeps track of the mapping between graph snapshots              The effectiveness of two different distributions is demon-
and the data segments that constitute them. The master               strated in Table 4. If most queries access only the largest
also maintains the mapping between data segment replicas             snapshot, then it is beneficial to distribute that snapshot
and the workers that store them. Using these mappings, the           over all workers to maximize query speed. On the other
master selects one replica for each data segment such that           hand, if all of the snapshots are queried together, our ap-
the overall processing load is uniformly distributed over a          proach stores each graph on a smaller subset of workers to
large number of workers and the expected network overhead            reduce network overhead. In this case, all of the workers
is low. Next, as Figure 1 shows, the master instantiates             can still be used in parallel since the entire graph data is
operators on these workers and starts executing the query.           distributed over all workers. The benefits of distribution
                                                                     configurations are less pronounced in Table 4 than Table 1
4.4     Load Balancing                                               due to a smaller number of message transmissions and fewer
   Each worker balances its graph data as explained in Sec-          workers. Table 4 also demonstrates the benefit of G* in
tion 3. The only difference is that whenever a query of cat-         executing queries on multiple snapshots. In particular, the
egory j accesses a replica of a data segment, the counting           time for processing all 500 snapshots (80.5 seconds) is less
synopsis of the j-th replica of the data segment is updated          than 10 times that for processing only the largest
using the ID of the query (Section 3.1). In this way, the j-th       snapshot (8.2 seconds) since the computations on the
replica of each segment is assigned to a worker in a manner          largest snapshot are shared across smaller snapshots.
optimized for query category j.
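
The replica selection step of Section 4.3 uses two mappings kept by the master: snapshots to segments, and segments to the workers holding their replicas. The paper does not specify the selection algorithm, so the sketch below substitutes a simple greedy heuristic as an assumption: each segment goes to its least-loaded replica holder, and network overhead is ignored. The names select_replicas, snapshot_segments, and replica_workers are hypothetical.

```python
from collections import Counter


def select_replicas(snapshot_segments, replica_workers, snapshots):
    """Choose one replica (worker) per segment needed by the queried snapshots.

    Greedy load balancing only; a stand-in for the master's actual policy.
    """
    # Segments shared by several snapshots are processed once.
    needed = sorted({seg for snap in snapshots for seg in snapshot_segments[snap]})
    load = Counter()   # worker -> number of segments assigned so far
    choice = {}
    for seg in needed:
        # Prefer the least-loaded holder; break ties by worker ID.
        worker = min(replica_workers[seg], key=lambda w: (load[w], w))
        choice[seg] = worker
        load[worker] += 1
    return choice


snapshot_segments = {"G1": ["s1", "s2"], "G2": ["s2", "s3"]}
replica_workers = {"s1": [0, 1], "s2": [0, 2], "s3": [1, 2]}
plan = select_replicas(snapshot_segments, replica_workers, ["G1", "G2"])
print(plan)   # {'s1': 0, 's2': 2, 's3': 1}
```

In this small example the three segments land on three different workers. A fuller version would also weigh the expected message traffic between the chosen workers.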

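The diminishing relative benefit reported for Table 3, and the sharing effect reported for Table 4, can be checked with a few lines of arithmetic. The numbers below are copied from Tables 3 and 4; the variable names and the use of speedup divided by core count as the measure of relative benefit are assumptions made for this illustration.

```python
# Table 3: speedup of the shortest-distance query by number of cores.
cores = [1, 2, 4, 8, 16, 24, 48]
speedup = [1.0, 1.9, 3.7, 5.9, 9.7, 12.5, 14.7]

# Relative benefit: speedup divided by the number of cores used.
efficiency = [s / c for s, c in zip(speedup, cores)]
for c, e in zip(cores, efficiency):
    print(f"{c:>2} cores: speedup per core = {e:.3f}")

# Table 4 (all workers): all 500 snapshots versus the largest snapshot alone.
all_snapshots_sec = 80.5
largest_snapshot_sec = 8.2
ratio = all_snapshots_sec / largest_snapshot_sec
print(f"500 snapshots cost {ratio:.1f}x the largest snapshot")
```

The speedup per core falls steadily from 1.0 on one core to roughly 0.31 on 48 cores, and querying all 500 snapshots costs just under 10 times as much as querying the largest one.
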
                                                                     6.   RELATED WORK
5.     PRELIMINARY EVALUATION
                                                                       In this section, we briefly summarize related research, fo-
   This section presents our preliminary results obtained by         cusing on previous graph systems, data distribution, and
running G* on a six-node, 48-core cluster. In this cluster,          data replication.
each machine has two Quad-Core Xeon E5430 2.67 GHz
CPUs, 16GB RAM, and a 2TB hard drive. We plan to                     Previous Graph Systems. In contrast to systems which
extend these experiments with more queries on larger data            process one graph at a time [2, 5, 6, 7, 12, 14, 21, 22, 24,
sets in a bigger cluster (Section 7).                                29], G* efficiently executes sophisticated queries on multi-
   To construct a realistic example in Section 2.2.1, we mea-        ple graph snapshots. G*’s benefits over previous systems are
sured the overhead of key operations summarized in Table 2.          experimentally demonstrated in our prior work [13]. Delt-
In our evaluation, a worker was able to transmit up to 1 mil-        aGraph [16] and GraphChi [19] are promising systems for
lion messages to other workers within a second, although a           dynamic graphs but do not directly address the data distri-
1Gbps connection may enable 10 million transmissions of              bution/replication issues considered in this paper.
12-byte messages in theory. The reason behind this result
is the inherent overhead of writing message objects to, and          Data Distribution. Traditional graph partitioning tech-
creating them from, TCP sockets in Java. Further-                    niques split a static graph into subgraphs in a manner that
more, reading approximately 1 Gbyte of data from disk to             minimizes the number of crossing edges [15, 25]. There are
construct a graph snapshot took 5 seconds. However, con-             also recent graph repartitioning schemes that observe com-
structing a snapshot in memory by creating 100 million edge          munication patterns and then move vertices to reduce net-
objects and registering them in an internal data structure           work overhead [24, 26]. In contrast, our technique
took approximately 200 seconds.                                      dynamically adjusts the number of workers that store each
   In the next set of experiments, we created a series of 500        graph snapshot according to the real-time influx of graph
                                                                     data and popular types of queries (Section 3).
graph snapshots using a binary tree generator. Each snap-
shot in the series was constructed by first cloning the pre-         Data Replication. There has been extensive work on data
vious snapshot and then inserting 20,000 additional vertices         replication that focused on improving data availability and
and edges into the new graph. Therefore, the last graph in           performance [8, 11, 28]. Researchers have developed techniques
the series contained 10 million vertices. We ran a query             for ensuring replica consistency [11] and finding the most advan-
that computes, for each graph, the distribution of the short-        tageous replica placement [8]. Stonebraker et al. proposed
est distances from the root to all other vertices. Table 3           an approach that stores each database replica differently,
shows, for the shortest distance query, the speedup achieved         optimized for a different query type [28]. While our repli-
by distributing the snapshots over more workers. The high-           cation approach has some similarity in terms of high-level
est speedup was achieved with 48 workers. This table also            ideas, it is substantially different in that it distributes each


graph snapshot over a different number of workers to speed           [10] D. DeWitt, R. Gerber, G. Graefe, M. Heytens,
up different types of queries.                                            K. Kumar, and M. Muralikrishna. Gamma - A High
                                                                          Performance Dataflow Database Machine. In VLDB,
7.   CONCLUSIONS AND FUTURE WORK                                          pages 228–237, 1986.
   We presented G*, a scalable and robust system for storing         [11] J. Gray, P. Helland, P. E. O’Neil, and D. Shasha. The
and querying large graph snapshots. G* tackles new data                   Dangers of Replication and a Solution. In SIGMOD,
distribution and replication challenges that arise in the con-            pages 173–182, 1996.
text of continuously creating large graph snapshots. Our             [12] D. Gregor and A. Lumsdaine. The Parallel BGL: A
data distribution technique efficiently stores graph data on              Generic Library for Distributed Graph Computations.
the fly using multiple worker servers in parallel. This tech-             In POOSC, 2005.
nique also gradually adjusts the number of workers that              [13] J.-H. Hwang, J. Birnbaum, A. Labouseur, P. W. Olsen
store each graph snapshot while balancing network and CPU                 Jr., S. R. Spillane, J. Vijayan, and W.-S. Han. G*: A
overhead to maximize overall performance. Our data repli-                 System for Efficiently Managing Large Graphs.
cation technique maintains each graph replica on a different              Technical Report SUNYA-CS-12-04, CS Department,
number of workers, making available the most efficient stor-              University at Albany – SUNY, 2012.
age configurations for various combinations of queries.              [14] U. Kang, C. E. Tsourakakis, and C. Faloutsos.
   We are working on full implementations of the techniques               PEGASUS: A Peta-Scale Graph Mining System. In
presented in this paper to enable new experiments with ad-                ICDM, pages 229–238, 2009.
ditional queries on larger data sets. We will analyze these          [15] G. Karypis and V. Kumar. Analysis of Multilevel
techniques to classify their complexity. We plan to look                  Graph Partitioning. In SC, page 29, 1995.
into the challenges of scheduling groups of queries, dealing         [16] U. Khurana and A. Deshpande. Efficient Snapshot
with varying degrees of parallelism, resource utilization, and            Retrieval over Historical Graph Data. CoRR,
user-generated performance preferences. We are exploring                  abs/1207.5777, 2012.
failure recovery techniques for long-running queries while           [17] G. Kossinets and D. J. Watts. Empirical Analysis of
exposing the tradeoff between recovery speed and execution                an Evolving Social Network. Science, 311(5757):88–90,
time. We also want to study opportunities for more granu-                 2006.
lar splitting, merging, and exchanging of data at the vertex         [18] R. Kumar, J. Novak, and A. Tomkins. Structure and
and edge level rather than in large segments as discussed                 Evolution of Online Social Networks. In KDD, pages
in this paper. We intend to seek opportunities for gains in               611–617, 2006.
execution speed at the expense of storage space by segre-            [19] A. Kyrola, G. Blelloch, and C. Guestrin. GraphChi:
gating recent and popular “hot” data (which we could store                Large-Scale Graph Computation on Just a PC. In OSDI,
in a less compressed manner) from less popular “cold” data                pages 31–46, 2012.
(which could be highly compressed).
                                                                     [20] J. Leskovec, J. M. Kleinberg, and C. Faloutsos.
                                                                          Graphs over Time: Densification Laws, Shrinking
8.   REFERENCES                                                           Diameters and Possible Explanations. In KDD, pages
 [1] C. C. Aggarwal, J. Han, J. Wang, and P. S. Yu. A                     177–187, 2005.
     Framework for Clustering Evolving Data Streams. In              [21] G. Malewicz, M. H. Austern, A. J. C. Bik, J. C.
     VLDB, pages 81–92, 2003.                                             Dehnert, I. Horn, N. Leiser, and G. Czajkowski.
 [2] Apache Hama. http://hama.apache.org.                                 Pregel: A System for Large-Scale Graph Processing.
 [3] B. Bahmani, R. Kumar, M. Mahdian, and E. Upfal.                      In SIGMOD, pages 135–146, 2010.
     PageRank on an Evolving Graph. In KDD, pages                    [22] Neo4j The Graph Database. http://neo4j.org/.
     24–32, 2012.                                                    [23] C. Ren, E. Lo, B. Kao, X. Zhu, and R. Cheng. On
 [4] K. S. Beyer, P. J. Haas, B. Reinwald, Y. Sismanis, and               Querying Historical Evolving Graph Sequences.
     R. Gemulla. On Synopses for Distinct-Value                           PVLDB, 4(11):726–737, 2011.
     Estimation Under Multiset Operations. In SIGMOD,                [24] S. Salihoglu and J. Widom. GPS: A Graph Processing
     pages 199–210, 2007.                                                 System. In SSDBM, 2013.
 [5] Cassovary. Open Sourced from Twitter                            [25] K. Schloegel, G. Karypis, and V. Kumar. Graph
     https://github.com/twitter/cassovary.                                Partitioning for High Performance Scientific
 [6] A. Chan, F. K. H. A. Dehne, and R. Taylor.                           Simulations. Technical Report TR 00-018, Computer
     CGMGRAPH/CGMLIB: Implementing and Testing                            Science and Engineering, U. of Minnesota, 2000.
     CGM Graph Algorithms on PC Clusters and Shared                  [26] Z. Shang and J. X. Yu. Catch the Wind: Graph
     Memory Machines. IJHPCA, 19(1):81–97, 2005.                          Workload Balancing on Cloud. In ICDE, pages
 [7] R. Chen, X. Weng, B. He, and M. Yang. Large Graph                    553–564, 2013.
     Processing in the Cloud. In SIGMOD, pages                       [27] S. R. Spillane, J. Birnbaum, D. Bokser, D. Kemp,
     1123–1126, 2010.                                                     A. Labouseur, P. W. Olsen Jr., J. Vijayan, and J.-H.
 [8] Y. Chen, R. H. Katz, and J. Kubiatowicz. Dynamic                     Hwang. A Demonstration of the G* Graph Database
     Replica Placement for Scalable Content Delivery. In                  System. In ICDE, pages 1356–1359, 2013.
     IPTPS, pages 306–318, 2002.                                     [28] M. Stonebraker et al. C-Store: A Column-oriented
 [9] J. Dean and S. Ghemawat. MapReduce: Simplified                       DBMS. In VLDB, pages 553–564, 2005.
     Data Processing on Large Clusters. In OSDI, pages               [29] Trinity.
     137–150, 2004.                                                       http://research.microsoft.com/en-us/projects/trinity/.


