=Paper=
{{Paper
|id=None
|storemode=property
|title=Scalable and Robust Management of Dynamic Graph Data
|pdfUrl=https://ceur-ws.org/Vol-1018/paper7.pdf
|volume=Vol-1018
|dblpUrl=https://dblp.org/rec/conf/vldb/LabouseurOH13
}}
==Scalable and Robust Management of Dynamic Graph Data==
Alan G. Labouseur, Paul W. Olsen Jr., and Jeong-Hyon Hwang
{alan, polsen, jhh}@cs.albany.edu
Department of Computer Science, University at Albany – State University of New York, USA
∗ This work is supported by NSF CAREER Award IIS-1149372.

ABSTRACT

Most real-world networks evolve over time. This evolution can be modeled as a series of graphs that represent a network at different points in time. Our G* system enables efficient storage and querying of these graph snapshots by taking advantage of the commonalities among them. We are extending G* for highly scalable and robust operation. This paper shows that the classic challenges of data distribution and replication are imbued with renewed significance given continuously generated graph snapshots. Our data distribution technique adjusts the set of worker servers for storing each graph snapshot in a manner optimized for popular queries. Our data replication approach maintains each snapshot replica on a different number of workers, making available the most efficient replica configurations for different types of queries.

1. INTRODUCTION

Real-world networks, including social networks and the Web, constantly evolve over time [3]. Periodic snapshots of such a network can be represented as graphs where vertices represent entities and edges represent relationships between entities. These graph snapshots allow us to analyze the evolution of a network over time by examining variations of certain features, such as the distribution of vertex degrees and clustering coefficients [17], network density [20], the size of each connected component [17, 18], the shortest distance between pairs of vertices [20, 23], and the centrality or eccentricity of vertices [23]. Trends discovered by these analyses play a crucial role in sociopolitical science, marketing, security, transportation, epidemiology, and many other areas. For example, when vertices represent people, credit cards, and consumer goods, and edges represent ownership and purchasing relationships, disruptions in degree distribution, viewed over time, may indicate anomalous behavior, perhaps even fraud.

Several single-graph systems are available today: Google's Pregel [21], Microsoft's Trinity [29], Stanford's GPS [24], the open source Neo4j [22], and others [2, 5, 6, 7, 12, 14]. They, however, lack support for efficiently managing large graph snapshots. Our G* system [13, 27] efficiently stores and queries graph snapshots on multiple worker servers by taking advantage of the commonalities among snapshots. DeltaGraph [16] achieves a similar goal. Our work is complementary to DeltaGraph in that it focuses on new challenges in data distribution and robustness in the context of continuously creating large graph snapshots.

Single-graph systems typically distribute the entirety of a single graph over all workers to maximize the benefits of parallelism. When there are multiple graph snapshots, however, distributing each snapshot on all workers may slow down query execution. In particular, if multiple snapshots are usually queried together, it is more advantageous to store each snapshot on fewer workers as long as the overall queried data are balanced over all workers. In this way, the system can reduce network overhead (i.e., improve query speed) while benefiting from high degrees of parallelism. We present a technique that automatically adjusts the number of workers in a manner optimized for popular queries.

As implied above, there are vast differences in execution time depending on the distribution configuration and the number of snapshots queried together. Replication gives us, in addition to enhanced system reliability, the opportunity to utilize as many distribution configurations as there are replicas. G* constructs r replicas for each snapshot to tolerate up to r − 1 simultaneous worker failures. Our technique classifies queries into r categories and optimizes the distribution of each replica for one of the query categories.
In this paper, we make the following contributions:

• We define the problem of distributing graph snapshots and present a solution that expedites queries by adjusting the set of workers for storing each snapshot.
• We provide a technique for adaptively determining replica placement to improve system performance and reliability.
• We present preliminary evaluation results that show the effectiveness of the above techniques.
• We discuss our research plans to complete the construction of a highly scalable and reliable system for managing large graph snapshots.

The remainder of the paper is organized as follows: Section 2 presents the research context and provides formal definitions of the problems studied in the paper. Sections 3 and 4 describe our new techniques for distributing and replicating graph snapshots. Section 5 presents our preliminary evaluation results. Section 6 discusses related work. Section 7 concludes this paper.

2. BACKGROUND

2.1 Summary of G*

[Figure 1: Parallel Calculation of Average Degree]
[Figure 2: Storage of Snapshots G1, G2, G3 and CGI]

G* is a distributed system for managing large graph snapshots that represent an evolving network at different points in time [13, 27]. As Figure 1 shows, these graph snapshots (e.g., G1, G2, and G3) are distributed over workers (e.g., α, β, and γ) that both store and query the graph data assigned to them. The master of the system (not shown in Figure 1) transforms each submitted query into a network of operators that process graph data on workers in a parallel fashion. Our previous work on G* can be summarized as follows:

Graph Storage. In G*, each worker efficiently stores its data by taking advantage of commonalities among graph snapshots. Figure 2 shows how worker γ from Figure 1 incrementally stores its portion of snapshots G1, G2, and G3 on disk. The worker stores c1 and d1, the first versions of c and d, when it stores G1. When vertex c obtains a new edge to e in G2, the worker stores c2, the second version of c, which shares commonalities with the previous version and also contains a new edge to e. When vertex d obtains a new edge to f in G3, the worker stores d2, the second version of d, which contains a new edge to f. All of these vertex versions are stored on disk only once regardless of how many graph snapshots they belong to.

To track all of these vertex versions, each worker maintains a Compact Graph Index (CGI) that maps each combination of vertex ID and graph ID onto the disk location that stores the corresponding vertex version. For each vertex version (e.g., c2), the CGI stores only one (vertex ID, disk location) pair in a collection for the combination of snapshots that contain that vertex version (e.g., {G2, G3}). In this manner, the CGI handles only vertex IDs and disk locations while all of the vertex and edge attributes are stored on disk. Therefore, the CGI can be kept fully or mostly in memory, enabling fast lookups and updates. To prevent the CGI from becoming overburdened by managing too many snapshot combinations, each worker automatically groups snapshots and then separately indexes each group of snapshots [13, 27].
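The CGI can be pictured as a two-level map: one small (vertex ID → disk location) table per combination of snapshots that share vertex versions. The following Java sketch conveys that idea only; the class and method names are hypothetical, and G*'s actual layout is described in [13, 27].

<pre>
import java.util.*;

/** Minimal sketch of a CGI-like index: one (vertexId -> diskLocation) map
 *  per combination of snapshots that share a vertex version.
 *  Names are illustrative; G*'s actual implementation differs. */
class CompactGraphIndexSketch {
    // key: combination of graph IDs (e.g., {"G2","G3"}); value: vertexId -> disk offset
    private final Map<Set<String>, Map<Long, Long>> index = new HashMap<>();

    /** Record that the vertex version shared by 'graphs' is stored at 'diskLocation'. */
    void put(Set<String> graphs, long vertexId, long diskLocation) {
        index.computeIfAbsent(graphs, g -> new HashMap<>()).put(vertexId, diskLocation);
    }

    /** Find the disk location of the version of 'vertexId' that belongs to snapshot 'graphId'. */
    OptionalLong lookup(String graphId, long vertexId) {
        for (Map.Entry<Set<String>, Map<Long, Long>> e : index.entrySet()) {
            if (e.getKey().contains(graphId)) {
                Long loc = e.getValue().get(vertexId);
                if (loc != null) return OptionalLong.of(loc);
            }
        }
        return OptionalLong.empty();
    }
}
</pre>

Because such an index holds only IDs and disk offsets, it can usually stay in memory while the attribute data remain on disk, matching the description above.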
Query Processing. Like traditional database systems, G* supports sophisticated queries using a dataflow approach where operators process data in parallel. To quickly process queries on multiple graph snapshots, however, G* supports special operators that share computations across snapshots. Figure 1 shows how the average degree calculation takes place in parallel over three workers. The vertex and degree operators in Figure 1 compute the degree of each vertex only once while associating the result with all of the relevant graph snapshots (e.g., the degree of vertex a is shared across G1, G2, and G3). In the example, the count_sum operators aggregate the degree data, the union operator merges these data, and the avg operator produces the final result. Details of our graph processing operators and programming primitives for easy implementation of custom operators are provided in our earlier papers [13, 27].

2.2 Problem Statements

Our previous work [13, 27] focused on efficiently storing and querying graph snapshots. We now take up the challenge of doing so in a highly scalable and robust manner.

2.2.1 Multiple Snapshot Distribution

Accelerating computation by distributing data over multiple servers has been a popular approach in parallel databases [10] and distributed systems [9]. Furthermore, techniques for partitioning graphs to facilitate parallel computation have also been developed [15, 24, 25, 26]. However, distributing large graph snapshots over multiple workers raises new challenges. In particular, it is not desirable to use traditional graph partitioning techniques, which consider only one graph at a time and incur high overhead given a large number of vertices and edges. Solutions to this problem must (re)distribute, with low overhead, graph snapshots that are continuously generated, and must take advantage of the property that query execution time depends on both the number of snapshots queried and the distribution of the graph snapshots, as illustrated below.

Example. Consider a scenario where each of 100 similarly-sized graph snapshots contains approximately 1 million vertices and 100 million edges. Assume also that the system consists of one master and 100 workers. Table 1 compares two snapshot distribution configurations: Shared-Nothing, where each of the 100 snapshots is stored on one distinct worker, and Shared-Everything, where each snapshot is evenly distributed over all of the 100 workers.

Table 1: Impact of Graph Snapshot Configuration
PageRank Query | Shared-Nothing | Shared-Everything
One snapshot   | 285 seconds    | 22 seconds
All snapshots  | 285 seconds    | 2,205 seconds

For each of these configurations, two types of queries for computing the PageRank of each vertex are executed: Query One Snapshot and Query All Snapshots. The explanations below are based on our evaluation results (see Section 5 for details). In the case of Shared-Nothing, querying one snapshot using only one worker takes 285 seconds (205 seconds to construct the snapshot from disk and 80 seconds to run 20 iterations of PageRank). Querying all snapshots on all workers in parallel takes the same amount of time. When the Shared-Everything configuration is used, querying one snapshot on all workers takes approximately 22 seconds, mainly due to network communications for the edges that cross worker boundaries (the disk I/O and CPU costs correspond to only 205/100 seconds and 80/100 seconds, respectively, due to the distribution of the snapshot over 100 workers). In this configuration, querying 100 snapshots takes 2,205 seconds as the PageRank of each vertex varies across graph snapshots, thereby causing 100 times more message transmissions than in the previous case. This example shows the benefits of different snapshot distribution approaches for different types of queries (e.g., Shared-Nothing for queries on all snapshots and Shared-Everything for queries on one snapshot).
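The figures in Table 1 can be reproduced, to a first approximation, from the per-snapshot costs quoted in the example. The decomposition below is our own back-of-envelope simplification for illustration, not G*'s cost model:

<pre>
/** Back-of-envelope reproduction of the Table 1 figures from the numbers in the
 *  example above. The decomposition is a simplification for illustration only. */
public class DistributionCostSketch {
    public static void main(String[] args) {
        double load = 205.0;     // seconds to construct one snapshot from disk (one worker)
        double compute = 80.0;   // seconds for 20 PageRank iterations on one snapshot (one worker)
        int workers = 100, snapshots = 100;

        // Shared-Nothing: one snapshot per worker, no cross-worker edges.
        double snOne = load + compute;   // 285 s for one snapshot on its single worker
        double snAll = load + compute;   // still 285 s: the 100 workers run in parallel

        // Shared-Everything: per-worker disk/CPU cost shrinks by 1/100, but edges that
        // cross worker boundaries add network cost (about 19 s per queried snapshot here).
        double network = 22.0 - (load + compute) / workers;
        double seOne = (load + compute) / workers + network;                 // ~22 s
        double seAll = snapshots * ((load + compute) / workers + network);   // ~2,200 s

        System.out.printf("Shared-Nothing:    one=%.0fs all=%.0fs%n", snOne, snAll);
        System.out.printf("Shared-Everything: one=%.0fs all=%.0fs%n", seOne, seAll);
    }
}
</pre>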
Formal Definition. Our ultimate goal is to keep track of the popularity of graph snapshots and to optimize the storage/distribution of unpopular snapshots for space efficiency (Section 2.1) and popular snapshots for query speed. In this paper, we focus on the problem of distributing popular snapshots over workers in a manner that minimizes the execution time of queries on these snapshots. This problem can be formally defined as follows:

Problem 1. (Snapshot Distribution) Given a series of graph snapshots {Gi(Vi, Ei) : i = 1, 2, ...}, n workers, and a set of queries Q on some or all of the snapshots, find a distribution {Vi,w : i = 1, 2, ... ∧ w = 1, 2, ..., n} that minimizes ∑_{q∈Q} time(q, {Vi,w}), where Vi,w denotes the set of vertices that are from snapshot Gi(Vi, Ei) and that are assigned to worker w, and time(q, {Vi,w}) represents the execution time of query q ∈ Q on the distributed snapshots {Vi,w}, satisfying (1) ∪_{w=1,...,n} Vi,w = Vi (i.e., the parts of a snapshot on all workers cover the original snapshot) and (2) Vi,w ∩ Vi,w′ = ∅ if w ≠ w′ (i.e., workers are assigned disjoint parts of a snapshot).

Our solution to the above problem is presented in Section 3.
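For readability, the objective and constraints of Problem 1 can also be written in display form. This is a direct restatement of the definition above; Problem 2 below is analogous, with an additional replica index j and the extra constraint that no worker holds two replicas of the same vertex.

<pre>
\min_{\{V_{i,w}\}} \; \sum_{q \in Q} \mathrm{time}\bigl(q, \{V_{i,w}\}\bigr)
\quad \text{subject to} \quad
\bigcup_{w=1}^{n} V_{i,w} = V_i \;\; \text{for all } i,
\qquad
V_{i,w} \cap V_{i,w'} = \emptyset \;\; \text{for } w \neq w'.
</pre>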
2.2.2 Snapshot Replication

There have been various techniques for replicating data to improve availability and access speed [8, 11, 28]. A central data replication challenge in G* is to distribute each replica of a snapshot over a possibly different number of workers to maximize both performance and availability. For each query, the most beneficial replica also needs to be found according to the characteristics of the query (e.g., the number of snapshots queried). If two replicas of a graph snapshot are distributed using the Shared-Nothing and Shared-Everything approaches, queries on a single snapshot should use the Shared-Everything replica configuration rather than the other. In practice, however, each query can access an arbitrary number of graph snapshots (not necessarily one or all), thereby complicating the above challenges. The problem of replicating graph snapshots can be defined as follows:

Problem 2. (Snapshot Replication) Given a series of graph snapshots {Gi(Vi, Ei) : i = 1, 2, ...}, the degree of replication r, n workers, and a set of queries Q on some or all of the snapshots, find a replica distribution {Vi,j,w : i = 1, 2, ... ∧ j = 1, 2, ..., r ∧ w = 1, 2, ..., n} that minimizes ∑_{q∈Q} time(q, {Vi,j,w}), where Vi,j,w denotes the set of vertices that are from the j-th replica Gi,j(Vi,j, Ei,j) of snapshot Gi(Vi, Ei) and that are assigned to worker w, and time(q, {Vi,j,w}) denotes the execution time of query q on the distributed snapshot replicas {Vi,j,w}, satisfying (1) ∪_{w=1,...,n} Vi,j,w = Vi,j = Vi for j = 1, 2, ..., r (i.e., the parts of a snapshot replica on all workers cover the original replica), (2) Vi,j,w ∩ Vi,j,w′ = ∅ if w ≠ w′ (i.e., workers are assigned disjoint parts of a snapshot replica), and (3) Vi,j,w ∩ Vi,j′,w = ∅ if j ≠ j′ (i.e., no worker w contains multiple copies of a vertex and its edges, which tolerates r − 1 simultaneous worker failures).

Section 4 presents our solution to the above problem.

3. GRAPH SNAPSHOT DISTRIBUTION

As mentioned in Section 2.2.1, G* needs to store each graph snapshot on an appropriate number of workers while balancing the utilization of network and CPU resources. In contrast to traditional methods for partitioning a static graph [15, 25], G* must determine the location of each vertex and its edges on the fly in response to a continuous influx of data from external sources.

Our dynamic data distribution approach meets the above requirements. In this approach, each G* worker partitions its graph data into segments with a certain maximum size (e.g., 10GB) so that it can control its load by migrating some segments to other workers (Section 3.1). Our approach continuously routes incoming messages for updating vertices and edges to appropriate workers with low latency (Section 3.2). When a segment becomes full, G* splits that segment into two that are similar in size while maintaining data locality by keeping data accessed together within the same segment (Section 3.3). It does all of the above while supporting G*'s graph processing operators (Section 3.4).
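A skeleton of the segment abstraction assumed by Sections 3.1 to 3.3 might look as follows. The names and the 10GB constant are illustrative, and the split and migration bodies are left as stubs since these operations are detailed in the following subsections:

<pre>
import java.util.*;

/** Illustrative skeleton only: a worker keeps bounded-size segments that it can
 *  split when full or migrate to another worker for load balancing. */
class Segment {
    static final long MAX_BYTES = 10L * 1024 * 1024 * 1024;  // e.g., a 10GB cap per segment
    final int id;
    final Map<Long, byte[]> vertices = new HashMap<>();      // vertex ID -> serialized vertex + edges
    long bytesUsed = 0;
    Segment(int id) { this.id = id; }
    boolean isFull() { return bytesUsed >= MAX_BYTES; }
}

class WorkerSketch {
    final List<Segment> segments = new ArrayList<>();

    // Section 3.3: when a segment is full, split it into two similarly sized halves.
    Segment split(Segment full) {
        Segment fresh = new Segment(segments.size());
        // ... move roughly half of the vertices, keeping co-accessed data together ...
        segments.add(fresh);
        return fresh;
    }

    // Section 3.1: migrate a segment to another worker during load balancing.
    void migrate(Segment s, WorkerSketch target) {
        segments.remove(s);
        target.segments.add(s);
    }
}
</pre>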
3.1 Load Balancing

In G*, each worker periodically communicates with a randomly chosen worker to balance graph data. Our key principles in load balancing are to (1) maximize the benefits of parallelism by uniformly distributing data that are queried together and (2) minimize network overhead by co-locating data from the same snapshot. Consider Figure 3(a), where three snapshots are partitioned into a total of 6 similarly-sized segments. In this example, each of workers α and β is assigned a segment from snapshot G1, α is assigned two segments from G2, and β is assigned two segments from G3.

[Figure 3: Exchanging Segments. (a) Before Exchange; (b) After Exchange. If snapshots G1 and G2 are queried together frequently, workers α and β in Figure 3(a) can better balance the workload and reduce the network overhead by swapping G1,1 and G3,1.]

If snapshots G1 and G2 are frequently queried together (see those shaded in Figure 3(a)), this snapshot distribution leads to inefficient query execution due to the imbalanced workload between the workers and the network communications for the edges between G1,1 and G1,2. This problem can be remedied by exchanging G1,1 and G3,1 between the workers, which results in a balanced distribution of the data queried together (i.e., G1 and G2) and localized processing of G1 on β and G2 on α, respectively.

Given a pair of workers, our technique estimates, for each segment, the benefit of migrating that segment to the other worker, and then performs the most beneficial migration. This process is repeated a maximum number of times or until the migration benefit falls below a predefined threshold. The benefit of migrating a segment is calculated by multiplying the probability that the segment is queried with the expected reduction in query time (i.e., the difference between the expected query time before and after migration).

For a set Si of segments on worker i and another set Sj of segments on worker j, the expected query time is computed as ∑_{q∈Qk} p(q) · time(q, Si, Sj), where Qk is a collection of k popular query patterns, p(q) is the probability that query pattern q is executed, and time(q, Si, Sj) denotes the estimated duration of q given segment placements Si and Sj. Our technique obtains Qk (equivalently, k popular combinations of segments queried together) as follows: Sort the segments from Si ∪ Sj in order of decreasing popularity. Initialize Qk (for storing k popular query patterns) with the first segment. Then, for each of the remaining segments, combine it with each element from Qk and insert the result back into Qk. Whenever |Qk| > k, remove its least popular element.

We estimate the popularity of each combination of segments by consolidating the counting synopses [4] for those segments. Whenever a query accesses a segment, the associated synopsis is updated using the ID of the query. We compute time(q, Si, Sj) as max(c(q, Si), c(q, Sj)) + c′(q, Si, Sj), where c(q, Si) is the estimated duration of processing the segments from Si for query q, and c′(q, Si, Sj) represents the estimated time for exchanging messages between workers i and j for query q.
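The pairwise exchange step can be sketched as follows. This is a simplification: it folds the query probabilities directly into the expected-time difference, and it assumes the popularity statistics and cost estimators (p, time) are supplied by the counting synopses described above; all names are hypothetical.

<pre>
import java.util.*;

/** Illustrative sketch of the pairwise load-balancing step described above.
 *  Cost estimators come from an assumed model; segments and query patterns
 *  are identified by integer IDs for brevity. */
class LoadBalancerSketch {
    interface CostModel {
        double p(Set<Integer> queryPattern);                                   // probability pattern is queried
        double time(Set<Integer> pattern, Set<Integer> si, Set<Integer> sj);   // estimated duration
    }

    /** Expected query time for placements (si, sj) over the k popular patterns Qk. */
    static double expectedTime(List<Set<Integer>> qk, Set<Integer> si, Set<Integer> sj, CostModel m) {
        double total = 0;
        for (Set<Integer> q : qk) total += m.p(q) * m.time(q, si, sj);
        return total;
    }

    /** One round: try moving each segment to the other worker, keep the best improvement. */
    static void rebalanceOnce(Set<Integer> si, Set<Integer> sj, List<Set<Integer>> qk,
                              CostModel m, double threshold) {
        double base = expectedTime(qk, si, sj, m);
        Integer best = null; boolean fromI = true; double bestGain = threshold;
        for (Integer seg : si) {
            Set<Integer> si2 = new HashSet<>(si); si2.remove(seg);
            Set<Integer> sj2 = new HashSet<>(sj); sj2.add(seg);
            double gain = base - expectedTime(qk, si2, sj2, m);
            if (gain > bestGain) { bestGain = gain; best = seg; fromI = true; }
        }
        for (Integer seg : sj) {
            Set<Integer> si2 = new HashSet<>(si); si2.add(seg);
            Set<Integer> sj2 = new HashSet<>(sj); sj2.remove(seg);
            double gain = base - expectedTime(qk, si2, sj2, m);
            if (gain > bestGain) { bestGain = gain; best = seg; fromI = false; }
        }
        if (best != null) {   // perform the most beneficial migration, if it beats the threshold
            if (fromI) { si.remove(best); sj.add(best); } else { sj.remove(best); si.add(best); }
        }
    }
}
</pre>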
3.2 Updates of Vertices and Edges

Each new vertex (or any edge that emanates from the vertex) is first routed to a worker chosen according to the hash value of the vertex ID. That worker assigns such a vertex to one of its data segments while saving the (vertex ID, segment ID) pair in an index similar to the CGI (Section 2.1). If a worker receives an edge that emanates from an existing vertex v, it assigns that edge to the segment that contains v. If a worker w has created a segment S and then migrated it to another worker w′ for load balancing reasons (Section 3.1), worker w forwards the data bound for S to w′. To support such data forwarding, each worker keeps track of the worker location of each data segment that it has created. Updates of vertices and edges, including changes in their attribute values, are handled as in the case of edge additions. This assignment of graph data to workers is scalable because it distributes the overhead of managing data over the workers. It also proceeds in a parallel, pipelined fashion without any blocking operations.

3.3 Splitting a Full Segment

If the size of a data segment reaches the maximum (e.g., 10GB), the worker that manages the segment creates a new segment and then moves half of the data from the previous segment to the new segment. To minimize the number of edges that cross segment boundaries, we use a traditional graph partitioning method [15]. Whenever a segment is split as above, the worker also updates the (vertex ID, segment ID) pairs for all of the vertices migrated to the new segment. This update process incurs relatively low overhead since the index can usually be kept in memory, as in the case of the CGI (Section 2.1). If a worker splits a segment which was obtained from another worker, it sends the update information to the worker that originally created the segment in order to enable data forwarding as mentioned in Section 3.2.
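A minimal sketch of this routing and forwarding logic is shown below. The hash-on-vertex-ID rule and the forwarding to the segment's current owner follow the description above; the index layout and method names are assumptions.

<pre>
/** Illustrative sketch of the hash-based routing and forwarding in Section 3.2.
 *  Worker choice by hashing the vertex ID is from the text; everything else is
 *  a hypothetical simplification. */
class UpdateRouterSketch {
    final int n;                                   // number of workers
    UpdateRouterSketch(int n) { this.n = n; }

    /** Initial destination of an update for 'vertexId': the hash-designated worker. */
    int homeWorker(long vertexId) {
        return Math.floorMod(Long.hashCode(vertexId), n);
    }

    /** On the home worker: look up the vertex's segment and either apply the
     *  update locally or forward it to the worker that now owns that segment. */
    void route(long vertexId, byte[] update,
               java.util.Map<Long, Integer> vertexToSegment,    // vertex ID -> segment ID
               java.util.Map<Integer, Integer> segmentLocation, // segment ID -> current worker
               int thisWorker) {
        Integer segment = vertexToSegment.get(vertexId);
        if (segment == null) { /* new vertex: assign it to one of this worker's segments */ return; }
        int owner = segmentLocation.getOrDefault(segment, thisWorker);
        if (owner == thisWorker) { /* apply the update to the local segment */ }
        else { /* forward the update to 'owner', since the segment was migrated there */ }
    }
}
</pre>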
3.4 Supporting Graph Processing Operators

G*'s graph processing operators, such as those for computing clustering coefficients, PageRank, or the shortest distance between vertices, are usually instantiated on every worker that stores relevant graph data [13]. These operators may exchange messages to compute a value for each vertex (e.g., the current shortest distance from a source vertex). If an operator needs to send a message to a vertex, the message is first sent to the worker whose ID corresponds to the hash value of the vertex ID. This worker then forwards the message to the worker that currently stores the vertex. This forwarding mechanism is similar to that for handling updates of vertices and edges (Section 3.2).

4. GRAPH SNAPSHOT REPLICATION

G* masks up to r − 1 simultaneous worker failures by creating r copies of each graph data segment. As discussed in Sections 2.2 and 3, the optimal distribution of each graph snapshot over workers may vary with the number of snapshots frequently queried together. Based on this observation, we developed a new data replication technique that speeds up queries by configuring the storage of replicas to benefit different categories of queries. This approach uses an online clustering algorithm [1] to classify queries into r categories based on the number of graphs that they access. It then assigns the j-th replica of each data segment to a worker in a manner optimized for the j-th query category.
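The classification step might be approximated as follows. The paper relies on the stream-clustering framework of [1]; the sketch below is a far simpler stand-in (an online k-means-style update on a single dimension, the number of snapshots a query accesses) meant only to convey the idea.

<pre>
/** Much-simplified stand-in for the query classification step: queries are grouped
 *  into r categories by the number of snapshots they access, using an online
 *  k-means-style update on that single dimension. Not the algorithm of [1]. */
class QueryCategorizerSketch {
    private final double[] centroid;   // typical #snapshots accessed per category
    private final long[] count;

    QueryCategorizerSketch(int r) {
        centroid = new double[r];
        count = new long[r];
        for (int j = 0; j < r; j++) centroid[j] = 1 + j;   // arbitrary initial seeds
    }

    /** Classify a query by how many snapshots it touches and update that category. */
    int classify(int snapshotsAccessed) {
        int best = 0;
        for (int j = 1; j < centroid.length; j++)
            if (Math.abs(centroid[j] - snapshotsAccessed) < Math.abs(centroid[best] - snapshotsAccessed))
                best = j;
        count[best]++;
        centroid[best] += (snapshotsAccessed - centroid[best]) / count[best];  // running mean
        return best;    // the j-th replica's placement is tuned for this category
    }
}
</pre>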
The master and workers support this approach as follows:

4.1 Updates of Vertices and Edges

Updates of vertices and edges are handled as described in Section 3.2 except that they are routed to r data segment replicas on different workers. For this reason, each worker keeps a mapping that associates each segment ID with the r workers that store a replica of the segment. Our approach protects this mapping on worker w by replicating it on workers (w+1)%n, (w+2)%n, ..., (w+r−1)%n, where n denotes the number of workers. If a worker fails, the master assigns another worker to take over.

4.2 Splitting a Full Segment

The replicas of a data segment are split in the same way due to the use of a deterministic partition method. For each vertex migrated from one data segment to another, the r workers that keep track of that vertex update their (vertex ID, segment ID) pairs accordingly.

4.3 Query-Aware Replica Selection

For each query, the master identifies the worker locations of the data segment replicas to process. To this end, the master keeps track of the mapping between graph snapshots and the data segments that constitute them. The master also maintains the mapping between data segment replicas and the workers that store them. Using these mappings, the master selects one replica for each data segment such that the overall processing load is uniformly distributed over a large number of workers and the expected network overhead is low. Next, as Figure 1 shows, the master instantiates operators on these workers and starts executing the query.
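One plausible greedy realization of this selection step is sketched below; the mapping layouts and the least-loaded rule are our own illustration rather than the master's actual algorithm.

<pre>
import java.util.*;

/** Illustrative greedy version of query-aware replica selection: for each needed
 *  segment, pick the replica whose worker currently has the least assigned load. */
class ReplicaSelectorSketch {
    /** @param segmentsOfQuery  data segments that the query must process
     *  @param replicaWorkers   segment ID -> the r workers holding its replicas
     *  @return chosen worker per segment */
    static Map<Integer, Integer> select(List<Integer> segmentsOfQuery,
                                        Map<Integer, List<Integer>> replicaWorkers) {
        Map<Integer, Integer> assignment = new HashMap<>(); // segment -> worker
        Map<Integer, Integer> load = new HashMap<>();       // worker -> #segments assigned
        for (int segment : segmentsOfQuery) {
            int best = -1;
            for (int worker : replicaWorkers.get(segment)) {
                if (best == -1 || load.getOrDefault(worker, 0) < load.getOrDefault(best, 0))
                    best = worker;
            }
            assignment.put(segment, best);
            load.merge(best, 1, Integer::sum);
        }
        return assignment;   // the master instantiates operators on these workers
    }
}
</pre>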
4.4 Load Balancing

Each worker balances its graph data as explained in Section 3. The only difference is that whenever a query of category j accesses a replica of a data segment, the counting synopsis of the j-th replica of the data segment is updated using the ID of the query (Section 3.1). In this way, the j-th replica of each segment is assigned to a worker in a manner optimized for query category j.

5. PRELIMINARY EVALUATION

This section presents our preliminary results obtained by running G* on a six-node, 48-core cluster. In this cluster, each machine has two Quad-Core Xeon E5430 2.67 GHz CPUs, 16GB RAM, and a 2TB hard drive. We plan to extend these experiments with more queries on larger data sets in a bigger cluster (Section 7).

To construct a realistic example in Section 2.2.1, we measured the overhead of key operations, summarized in Table 2. In our evaluation, a worker was able to transmit up to 1 million messages to other workers within a second, although a 1Gbps connection may enable 10 million transmissions of 12-byte messages in theory. The reason behind this result is that there is inherent overhead in writing message objects to, and creating them from, TCP sockets in Java. Furthermore, reading approximately 1 Gbyte of data from disk to construct a graph snapshot took 5 seconds. However, constructing a snapshot in memory by creating 100 million edge objects and registering them in an internal data structure took approximately 200 seconds.

Table 2: Speed and Bandwidth Observations
Message passing (12 bytes/message) : 1M messages/sec
Disk I/O bandwidth                 : 200 Mbytes/sec
Snapshot construction in memory    : 200 seconds
PageRank iteration per snapshot    : 4 seconds

In the next set of experiments, we created a series of 500 graph snapshots using a binary tree generator. Each snapshot in the series was constructed by first cloning the previous snapshot and then inserting 20,000 additional vertices and edges into the new graph. Therefore, the last graph in the series contained 10 million vertices. We ran a query that computes, for each graph, the distribution of the shortest distances from the root to all other vertices. Table 3 shows, for the shortest distance query, the speedup achieved by distributing the snapshots over more workers. The highest speedup was achieved with 48 workers. This table also shows that the relative benefit of data distribution (i.e., the speedup relative to the number of workers) tends to decrease with more workers. This is mainly due to increased network traffic, which shows the importance of balancing CPU and network resources in the context of continuously creating large graph snapshots.

Table 3: Actual Speedup Result
# Cores : 1    2    4    8    16   24    48
Speedup : 1.0  1.9  3.7  5.9  9.7  12.5  14.7

The effectiveness of two different distributions is demonstrated in Table 4. If most queries access only the largest snapshot, then it is beneficial to distribute that snapshot over all workers to maximize query speed. On the other hand, if all of the snapshots are queried together, our approach stores each graph on a smaller subset of workers to reduce network overhead. In this case, all of the workers can still be used in parallel since the entire graph data is distributed over all workers. The benefits of distribution configurations are less pronounced in Table 4 than in Table 1 due to a smaller number of message transmissions and fewer workers. Table 4 also demonstrates the benefit of G* in executing queries on multiple snapshots. In particular, the time for processing 500 snapshots (e.g., 80.5 seconds) is only up to 10 times longer than that for processing the largest snapshot (e.g., 8.2 seconds) since the computations on the largest snapshot are shared across smaller snapshots.

Table 4: Impact of Graph Data Distribution
SSSP Query    | All Workers  | Subset of Workers
One snapshot  | 8.2 seconds  | 19.2 seconds
All snapshots | 80.5 seconds | 53.2 seconds

6. RELATED WORK

In this section, we briefly summarize related research, focusing on previous graph systems, data distribution, and data replication.

Previous Graph Systems. In contrast to systems which process one graph at a time [2, 5, 6, 7, 12, 14, 21, 22, 24, 29], G* efficiently executes sophisticated queries on multiple graph snapshots. G*'s benefits over previous systems are experimentally demonstrated in our prior work [13]. DeltaGraph [16] and GraphChi [19] are promising systems for dynamic graphs but do not directly address the data distribution/replication issues considered in this paper.

Data Distribution. Traditional graph partitioning techniques split a static graph into subgraphs in a manner that minimizes the number of crossing edges [15, 25]. There are also recent graph repartitioning schemes that observe communication patterns and then move vertices to reduce network overhead [24, 26]. In contrast to them, our technique dynamically adjusts the number of workers that store each graph snapshot according to the real-time influx of graph data and popular types of queries (Section 3).

Data Replication. There has been extensive work on data replication focused on improving data availability and performance [8, 11, 28]. Researchers have developed techniques for ensuring replica consistency [11] and for finding the most advantageous replica placement [8]. Stonebraker et al. proposed an approach that stores each database replica differently, optimized for a different query type [28]. While our replication approach has some similarity in terms of high-level ideas, it is substantially different in that it distributes each graph snapshot over a different number of workers to speed up different types of queries.
7. CONCLUSIONS AND FUTURE WORK

We presented G*, a scalable and robust system for storing and querying large graph snapshots. G* tackles new data distribution and replication challenges that arise in the context of continuously creating large graph snapshots. Our data distribution technique efficiently stores graph data on the fly using multiple worker servers in parallel. This technique also gradually adjusts the number of workers that store each graph snapshot while balancing network and CPU overhead to maximize overall performance. Our data replication technique maintains each graph replica on a different number of workers, making available the most efficient storage configurations for various combinations of queries.

We are working on full implementations of the techniques presented in this paper to enable new experiments with additional queries on larger data sets. We will analyze these techniques to classify their complexity. We plan to look into the challenges of scheduling groups of queries, dealing with varying degrees of parallelism, resource utilization, and user-generated performance preferences. We are exploring failure recovery techniques for long-running queries while exposing the tradeoff between recovery speed and execution time. We also want to study opportunities for more granular splitting, merging, and exchanging of data at the vertex and edge level rather than in the large segments discussed in this paper. We intend to seek opportunities for gains in execution speed at the expense of storage space by segregating recent and popular "hot" data (which we could store in a less compressed manner) from less popular "cold" data (which could be highly compressed).

8. REFERENCES

[1] C. C. Aggarwal, J. Han, J. Wang, and P. S. Yu. A Framework for Clustering Evolving Data Streams. In VLDB, pages 81–92, 2003.
[2] Apache Hama. http://hama.apache.org.
[3] B. Bahmani, R. Kumar, M. Mahdian, and E. Upfal. PageRank on an Evolving Graph. In KDD, pages 24–32, 2012.
[4] K. S. Beyer, P. J. Haas, B. Reinwald, Y. Sismanis, and R. Gemulla. On Synopses for Distinct-Value Estimation Under Multiset Operations. In SIGMOD, pages 199–210, 2007.
[5] Cassovary. Open sourced from Twitter. https://github.com/twitter/cassovary.
[6] A. Chan, F. K. H. A. Dehne, and R. Taylor. CGMGRAPH/CGMLIB: Implementing and Testing CGM Graph Algorithms on PC Clusters and Shared Memory Machines. IJHPCA, 19(1):81–97, 2005.
[7] R. Chen, X. Weng, B. He, and M. Yang. Large Graph Processing in the Cloud. In SIGMOD, pages 1123–1126, 2010.
[8] Y. Chen, R. H. Katz, and J. Kubiatowicz. Dynamic Replica Placement for Scalable Content Delivery. In IPTPS, pages 306–318, 2002.
[9] J. Dean and S. Ghemawat. MapReduce: Simplified Data Processing on Large Clusters. In OSDI, pages 137–150, 2004.
[10] D. DeWitt, R. Gerber, G. Graefe, M. Heytens, K. Kumar, and M. Muralikrishna. Gamma - A High Performance Dataflow Database Machine. In VLDB, pages 228–237, 1986.
[11] J. Gray, P. Helland, P. E. O'Neil, and D. Shasha. The Dangers of Replication and a Solution. In SIGMOD, pages 173–182, 1996.
[12] D. Gregor and A. Lumsdaine. The Parallel BGL: A Generic Library for Distributed Graph Computations. In POOSC, 2005.
[13] J.-H. Hwang, J. Birnbaum, A. Labouseur, P. W. Olsen Jr., S. R. Spillane, J. Vijayan, and W.-S. Han. G*: A System for Efficiently Managing Large Graphs. Technical Report SUNYA-CS-12-04, CS Department, University at Albany – SUNY, 2012.
[14] U. Kang, C. E. Tsourakakis, and C. Faloutsos. PEGASUS: A Peta-Scale Graph Mining System. In ICDM, pages 229–238, 2009.
[15] G. Karypis and V. Kumar. Analysis of Multilevel Graph Partitioning. In SC, page 29, 1995.
[16] U. Khurana and A. Deshpande. Efficient Snapshot Retrieval over Historical Graph Data. CoRR, abs/1207.5777, 2012.
[17] G. Kossinets and D. J. Watts. Empirical Analysis of an Evolving Social Network. Science, 311(5757):88–90, 2006.
[18] R. Kumar, J. Novak, and A. Tomkins. Structure and Evolution of Online Social Networks. In KDD, pages 611–617, 2006.
[19] A. Kyrola, G. Blelloch, and C. Guestrin. GraphChi: Large-Scale Graph Computation on Just a PC. In OSDI, pages 31–46, 2012.
[20] J. Leskovec, J. M. Kleinberg, and C. Faloutsos. Graphs over Time: Densification Laws, Shrinking Diameters and Possible Explanations. In KDD, pages 177–187, 2005.
[21] G. Malewicz, M. H. Austern, A. J. C. Bik, J. C. Dehnert, I. Horn, N. Leiser, and G. Czajkowski. Pregel: A System for Large-Scale Graph Processing. In SIGMOD, pages 135–146, 2010.
[22] Neo4j: The Graph Database. http://neo4j.org/.
[23] C. Ren, E. Lo, B. Kao, X. Zhu, and R. Cheng. On Querying Historical Evolving Graph Sequences. PVLDB, 4(11):726–737, 2011.
[24] S. Salihoglu and J. Widom. GPS: A Graph Processing System. In SSDBM, 2013.
[25] K. Schloegel, G. Karypis, and V. Kumar. Graph Partitioning for High Performance Scientific Simulations. Technical Report TR 00-018, Computer Science and Engineering, University of Minnesota, 2000.
[26] Z. Shang and J. X. Yu. Catch the Wind: Graph Workload Balancing on Cloud. In ICDE, pages 553–564, 2013.
[27] S. R. Spillane, J. Birnbaum, D. Bokser, D. Kemp, A. Labouseur, P. W. Olsen Jr., J. Vijayan, and J.-H. Hwang. A Demonstration of the G* Graph Database System. In ICDE, pages 1356–1359, 2013.
[28] M. Stonebraker et al. C-Store: A Column-oriented DBMS. In VLDB, pages 553–564, 2005.
[29] Trinity. http://research.microsoft.com/en-us/projects/trinity/.