MELOGRAPH: Multi-Engine WorkfLOw Graph Processing∗

Camelia Elena Ciolac
Chalmers University of Technology
Gothenburg, Sweden
camelia@chalmers.se

ABSTRACT
This paper introduces MELOGRAPH, a new system that exposes in the front-end a domain specific language (DSL) for graph processing tasks and in the back-end identifies, ranks and generates source code for the top-N ranked engines. This approach lets the specialized MELOGRAPH be part of a more general multi-engine workflow optimizer. The candidate execution engines are chosen from the contemporaneous Big Data ecosystem: graph databases (e.g. Neo4j, TitanDB, OrientDB, Sparksee/DEX) and robust graph processing frameworks with Java APIs or packaged libraries of algorithms (e.g. Giraph, Okapi, Flink Gelly, Hama, Gremlin). As MELOGRAPH is a work in progress, this paper focuses on the state of the art in this field, and provides a general architecture and some early implementation insights.

Keywords
Big Data ecosystem; multi-engine workflow; graph processing tasks

∗Big Data @ Chalmers, ICT Area of Advance

© 2016, Copyright is with the authors. Published in the Workshop Proceedings of the EDBT/ICDT 2016 Joint Conference (March 15, 2016, Bordeaux, France) on CEUR-WS.org (ISSN 1613-0073). Distribution of this paper is permitted under the terms of the Creative Commons license CC-by-nc-nd 4.0.

1. STATE OF THE ART
The multitude of frameworks and datastores in the Big Data ecosystem, with their different data models, libraries of implemented algorithms, available connectors and performance profiles, makes it challenging to select the right tools when building a Big Data storage and processing architecture. Instead of committing to a single tool, one can benefit from open-source licenses and community editions to set up a polyvalent architecture, with polyglot persistence and multiple processing engines.

On the data management side, with the emergence of NoSQL datastores that usually complement relational databases in the enterprise data architecture, there has been a continuous interest in polyglot persistence (also referred to as "multiparadigm data storage" [7]). Besides explaining this data strategy as a specialization in data representation to better reflect the application data types, [7] highlighted the necessity of multiparadigm programming with Big Data. At that time, in 2010, this collocation denoted the lack of a unified query language among NoSQL datastores and the need to develop an integration layer using the programming languages (e.g. Java, Python) in which these datastores provided query APIs.

This research topic remains an open challenge, and few robust solutions have been provided to date. From the industry, we cite Oracle Big Data SQL [13] with its "Query Franchising" strategy of unifying queries over Oracle databases, Hadoop and NoSQL datastores. From the open source community, we highlight the efforts of the Cascading Lingual project [1], which uses Apache Calcite (formerly named Optiq) to support SQL over a variety of data providers by pushing as much of the query processing as possible to the datastores that manage the data. In the scientific community, the most recent and remarkable development in this field is the BigDAWG polystore [4, 6], which promotes "islands of information", each with its own query language that is finally mapped to the native languages of the underlying storage engines through "shims" acting as translators.

Perhaps even more challenging than storing and querying Big Data, developing complex analytics workflows requires choosing among a plethora of candidate frameworks. Several Big Data architecture patterns have been promoted (e.g. [9]), in which scripting or a workflow manager orchestrates tasks on pre-established engines. Apache Oozie, Azkaban and Luigi are the most popular workflow managers in Hadoop, yet after inspecting their sets of supported engines we conclude that they were designed for ETL pipelines rather than for complex analytics workflows. In particular, out of the box they do not support scheduling on specialized graph processing frameworks.

A more mature approach to tackling the complexity of the problem is to enhance a workflow manager with some "intelligence": the ability to decide between several frameworks and to generate all the necessary ETLs between the chosen execution engines. This is why an increasing focus is nowadays put on designing multi-engine workflows [17], addressing the various facets of this problem: process modelling and abstraction of execution details, data formats and data sources [11]; execution engine selection modelled as a global optimization problem [12]; dynamic scheduling and resource management [3].

A few publications present prototypes of systems that address all these dimensions: QoX [14], IReS [3] and Musketeer [8]. Common to all of them is the decoupling of the front-end from the back-end engine, yet each one has its specific approach. For example, QoX exposes an XML-based "proprietary flow metadata language" (xLM) at the front-end and uses a library of operations in the back-end [14], while Musketeer exposes a SQL-like query language and a "Gather-Apply-Scatter style" domain specific language (DSL) for graph processing in the front-end and generates code from code templates in the back-end [8]. Also, all three systems use cost-based optimization, with specific search spaces and search algorithms for single- or multi-engine optimal scheduling of the workflow.

Case studies and demonstrations of some of these systems are anchored in the contemporaneous Big Data ecosystem: IReS with a practical demonstration over {Hadoop, Hama, Spark, PostgreSQL with HDFS, HBase, Elasticsearch} [3], and Musketeer with an evaluation over {Spark, Hadoop MapReduce, GraphChi, PowerGraph, GraphLINQ} [8]. These proofs of concept addressed graph processing tasks only to a limited extent, without the intention to cover the larger inventory of options currently available in the Big Data ecosystem.

Apart from these studies, few practical results of integrating graph analytics in larger workflows have been reported (e.g. graph processing and NLP [5]). An extensive review of the current state of the art of graph processing on Big Data [2] concludes that nowadays "most of the large scale graph processing platforms have the limitation that they are not able to connect their graph processing capabilities with the vast ecosystem of other analytics systems". The authors cite the pioneering industrial work of Teradata Aster 6.0, which "have started to tackle this challenge by extending its analytics capabilities with a multi-engine processing architecture" [2]. A deeper investigation into this system [15] reveals that graphs "can be derived, or projected, from many sources and types of business data" and that a "graph analytics program, or graph function, is modeled as a polymorphic table operator like Aster's existing SQL-MR analytics functions", which makes it possible to invoke such functions even from SQL queries.

Finally, let us highlight some differences between MELOGRAPH and the aforementioned systems. Compared to Aster's SQL-GR, MELOGRAPH aims to support not only a variety of data sources as input, but also various execution engines for each task; however, MELOGRAPH's DSL cannot be embedded in SQL queries. Compared to the other systems, we innovate in the following aspects: 1) MELOGRAPH addresses specific graph processing tasks, the suitability of storing data in a graph database prior to processing, and possible optimizations in cascaded graph tasks; 2) MELOGRAPH generates Java source code on-the-fly and does not treat operators as black boxes; 3) MELOGRAPH does not need any syntax validation, since this is already enforced by the MPS editor (based on the DSL structure).

2. SOME MOTIVATING EXAMPLES
Before diving into the discussion about MELOGRAPH, let us first depict some examples of workflows containing graph processing tasks. In a first example, from the citation network in Computer Science we first retain a subgraph whose nodes are only publications cited at least k times, then among them we find communities based on the papers' co-citation, and lastly we evaluate how close each community is to a clique structure. In this case we have a sequence of three graph processing tasks in the workflow: a k-core graph processing task, a label propagation (or another epidemics-based identification of network structure) and finally aggregations to compute local clustering coefficients.

Secondly, consider a use case from the entertainment industry, where one can use the YouTube Data API to extract the network of videos (retaining each video's unique identifier, metadata about its channel/user, video title and description) and their related-to relationships (a search for relatedToVideoId). Once the graph is obtained, two processing tasks are launched sequentially: a pattern matching query over the graph, and a text semantic analysis over the descriptions of the videos in the resulting subgraphs in order to extract entities (an entity recognition task).

We end by presenting an example from [15]: a workflow from the marketing domain, where a PageRank graph analysis task identifies the most influential customers, a different task performs sentiment analysis of customers' reviews to discover the satisfied ones, and finally a join of the two result sets is performed.

3. AN OVERVIEW OF MELOGRAPH
The main components that make up the MELOGRAPH internals are: the DSL kneaded in a language workbench, the Candidate Solutions Assembler, the Ranker and the Solution Packager. Let us now present some details for each component.

Our domain specific language, MELOGRAPHy, is designed and built to facilitate the definition of graph processing tasks in a manner agnostic to the execution engine. In a first stage, MELOGRAPHy supports only functionality based on the inventory of algorithms and queries available in the engines. In a second stage, we want to extend the language to allow custom vertex-centric iterations using the Bulk Synchronous Parallel model. We give some insights into our DSL in Section 4 of the paper.

Based on the inventory of engines and on a set of patterns, the Candidate Solutions Assembler (CSA) builds the possible pipelines for solving the graph processing task. The CSA is concerned with the feasibility of a solution, not with its optimality; therefore some of the candidate solutions it produces may exhibit weak performance at runtime.

It is the role of the Ranker component to reward or penalize candidate solutions based on a set of heuristics. Ideally, the Ranker should employ a cost-based model to build its final ranking of the candidate solutions; we will address this aspect in a future study. From the cost perspective, for the moment we anticipate adopting the approach of treating execution engines as black boxes; this idea has already won consensus among both Big Data management researchers (e.g. "A 'black box' approach makes a lot more sense when coping with disparate underlying engines" [16]) and multi-engine workflow researchers (e.g. [3] present an optimization of the workflow scheduling which is "orthogonal to (and in fact enhanced by) any optimization effort within a single engine").

Finally, the Solution Packager prepares all the necessary shell scripts to launch the various tools found in each of the top-N ranked candidate solutions. Therefore, the final result of MELOGRAPH consists of N folders in the local file system, each folder storing:

• the file containing the source code generated by the language workbench at compilation time;
• additional ETL scripts/programs to convert inputs to the required format or to load data into a graph database;
• a driver script that orchestrates all the tools in that solution.

The next section presents how MELOGRAPH works, from receiving the input task to packaging a set of alternative solutions.

4. MELOGRAPH IN ACTION
Before presenting the processing workflow of the current version of MELOGRAPH, let us first provide some insights into the basic MELOGRAPHy domain specific language. As the DSL development environment we opted for the JetBrains MPS metaprogramming system [10] and made use of its model-to-model transformation approach to code generation. JetBrains MPS separates language development concerns into:

• structure (types of nodes in the Abstract Syntax Tree and their relationships), made of concepts organized in hierarchies;
• editor, in charge of visualizing the syntax from the user's perspective, optionally exhibiting some customized behaviour;
• generator, which "defines the denotational semantics for the concepts in the language" [10].

The root concept in MELOGRAPHy's structure is the Task. For this concept, Figure 1 presents the three aforementioned concerns. Let us briefly make some comments.

Firstly, we highlight our choice to use the GraphAlgorithm concept to abstract both queries (e.g. pattern matching) and graph analytics (e.g. PageRank).

Secondly, in terms of data sources, the minimal information needed to build and analyze the graph is its structure, which is given by the edges together with the identifiers of their incident vertices. This is why dataSourceEdgeInfo is mandatory, whereas dataSourceVertexInfo is optional. We realize that this approach misses the isolated nodes, but the user still has the option to load both the edges and the vertices data sets and hence include isolated vertices too. Besides eliminating unnecessary data loads, our approach has a more subtle benefit: it easily accommodates polyglot persistence, where vertex information is stored in a different datastore than the information from which we build edges.

One final comment is that we are considering adding one more child to the Task's aforementioned structure, in order to let the user suggest his/her preferred execution engine.

Figure 2: Illustration of edge and vertex data source concepts in MELOGRAPHy

Figure 2 depicts the structure of concepts that extend EdgeDataSourceType and VertexDataSourceType for the specific case of a table data source (e.g. in Hive, but also in a relational database such as Oracle or MySQL). As a brief aside, given a relational database table as data source, MELOGRAPH will automatically include Sqoop in all candidate solution pipelines, and thus the actual input to the graph algorithm will be the HDFS file obtained after loading the data. Thus, in the source code generation templates we use file input directly in such cases.

Let us also remark that, except for graph databases (where this information is native), for all other data sources it is mandatory to specify how to access the edges' endpoints. In the illustrated case of a table data source the locators are column names; similarly, for a CSV file the locators are field indices, and for HBase the locators are fully qualified column names from some column family.

We now advance to the MELOGRAPH workflow. As shown in Figure 1, the use of mapping configurations makes it possible to generate code for multiple engines at the same time, according to a set of Java code templates developed by us. Details of this architecture are given in Figure 3.
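To make the roles of the Candidate Solutions Assembler and the Ranker more concrete, the following Java sketch shows how the two could interact under simplifying assumptions. All type names (Task, Engine, Candidate, MelographSketch), the engine capability table and the scoring rule are hypothetical illustrations. They are not MELOGRAPH's generated code or the MELOGRAPHy concepts. Only one pattern comes from the text: a relational table source is preceded by a Sqoop import. The graph database load/export steps and the "fewer data-movement steps is better" heuristic are assumptions.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified stand-ins for MELOGRAPHy concepts (not the real DSL).
enum Algorithm { PAGERANK, LABEL_PROPAGATION, PATTERN_MATCHING }

enum SourceKind { RELATIONAL_TABLE, HIVE_TABLE, CSV_FILE, HBASE, GRAPH_DB }

// A Task: the requested algorithm and the kind of store holding the edges.
record Task(Algorithm algorithm, SourceKind edgeSource) {}

// An entry in an illustrative (assumed) engine inventory.
record Engine(String name, Set<Algorithm> algorithms, boolean isGraphDb) {}

// A ranked candidate: ordered pipeline steps and a heuristic score (higher is better).
record Candidate(List<String> steps, int score) {}

final class MelographSketch {

    // Candidate Solutions Assembler: emits every feasible pipeline and ignores cost.
    static List<List<String>> assemble(Task task, List<Engine> inventory) {
        List<List<String>> pipelines = new ArrayList<>();
        for (Engine engine : inventory) {
            if (!engine.algorithms().contains(task.algorithm())) {
                continue; // this engine cannot run the requested algorithm
            }
            List<String> steps = new ArrayList<>();
            // Pattern from the text: relational tables are first imported into HDFS with Sqoop.
            if (task.edgeSource() == SourceKind.RELATIONAL_TABLE) {
                steps.add("sqoop-import");
            }
            // Assumed patterns: load data into a graph database, or export it from one.
            if (engine.isGraphDb() && task.edgeSource() != SourceKind.GRAPH_DB) {
                steps.add("load-into-" + engine.name());
            } else if (!engine.isGraphDb() && task.edgeSource() == SourceKind.GRAPH_DB) {
                steps.add("export-from-graph-db");
            }
            steps.add(engine.name()); // the execution engine itself is always the last step
            pipelines.add(steps);
        }
        return pipelines;
    }

    // Ranker: placeholder heuristic that penalizes every data-movement step.
    static List<Candidate> rank(List<List<String>> pipelines, int topN) {
        List<Candidate> ranked = new ArrayList<>();
        for (List<String> steps : pipelines) {
            int dataMovementSteps = steps.size() - 1;
            ranked.add(new Candidate(List.copyOf(steps), -10 * dataMovementSteps));
        }
        // List.sort is stable, so equally scored candidates keep their inventory order.
        ranked.sort(Comparator.comparingInt(Candidate::score).reversed());
        return List.copyOf(ranked.subList(0, Math.min(topN, ranked.size())));
    }

    public static void main(String[] args) {
        List<Engine> inventory = List.of(
                new Engine("Giraph", EnumSet.of(Algorithm.PAGERANK), false),
                new Engine("FlinkGelly", EnumSet.of(Algorithm.PAGERANK, Algorithm.LABEL_PROPAGATION), false),
                new Engine("Neo4j", EnumSet.of(Algorithm.PATTERN_MATCHING), true));
        Task task = new Task(Algorithm.PAGERANK, SourceKind.RELATIONAL_TABLE);
        // prints: [sqoop-import, Giraph] score=-10
        //         [sqoop-import, FlinkGelly] score=-10
        for (Candidate c : rank(assemble(task, inventory), 2)) {
            System.out.println(c.steps() + " score=" + c.score());
        }
    }
}
```

The assembler deliberately ignores cost, mirroring the CSA's focus on feasibility. All preference is pushed into the ranking step, where a cost-based model could later replace the placeholder score without touching the assembly logic.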
Along with the engine-specific code, we generate the class MELOGRAPHclass, which uses the functionality of all MELOGRAPH components discussed in the overview section of this paper. However, the ranking of the solutions can be obtained only when this Java program is run; consequently, some of the heuristics also need to be implemented in the model-to-model transformers.

Figure 1: Insights into MELOGRAPHy DSL

With the data sources and the algorithm specified, MELOGRAPH's Candidate Solutions Assembler will emit all feasible pipelines to accomplish the task. This involves assembling tools based on predefined patterns. Apart from the data movement scripts that MELOGRAPH will eventually generate, we need to embed these patterns in the model-to-model transformer for each individual engine too. This is required because all source files are generated at once and need to be consistent with one another. MELOGRAPH's Ranker will then rank the solutions and output a list of the top-N solutions.

Figure 3: MELOGRAPH generating multi-engine source code for a Task at runtime

5. CONCLUSIONS
The current state of the art in multi-engine processing workflows on Big Data displays an increasing interest in tackling the practical challenges raised by the contemporaneous Big Data ecosystem. We join this emerging research movement and present in this paper the first bricks that we have laid in developing MELOGRAPH.

MELOGRAPH is specialized in graph processing tasks, which are generally part of broader workflows; however, this requires the whole workflow to be developed in JetBrains MPS, too. In the design of MELOGRAPH we address heterogeneity at all levels: data sources, execution engines and the runtime computing environment (through the heuristics used in ranking candidate solutions).

6. REFERENCES
[1] Cascading Lingual. http://www.cascading.org/projects/lingual/.
[2] F. Bajaber, S. Sakr, O. Batarfi, A. Altalhi, R. Elshawi, and A. Barnawi. Big data processing systems: State-of-the-art and open challenges. In Proceedings of the ICCC 2015, pages 1–8, April 2015.
[3] K. Doka, N. Papailiou, D. Tsoumakos, C. Mantas, and N. Koziris. IReS: Intelligent, multi-engine resource scheduler for big data analytics workflows. In Proceedings of the 2015 ACM SIGMOD, SIGMOD '15, pages 1451–1456, New York, NY, USA, 2015. ACM.
[4] J. Duggan, A. J. Elmore, M. Stonebraker, M. Balazinska, B. Howe, J. Kepner, S. Madden, D. Maier, T. Mattson, and S. Zdonik. The BigDAWG polystore system. SIGMOD Rec., 44(2):11–16, Aug. 2015.
[5] D. Ediger, S. Appling, E. Briscoe, R. McColl, and J. Poovey. Real-time streaming intelligence: Integrating graph and NLP analytics. In Proceedings of IEEE HPEC 2014, pages 1–6, Sept. 2014.
[6] A. Elmore, J. Duggan, M. Stonebraker, M. Balazinska, U. Cetintemel, V. Gadepally, J. Heer, B. Howe, J. Kepner, T. Kraska, S. Madden, D. Maier, T. Mattson, S. Papadopoulos, J. Parkhurst, N. Tatbul, M. Vartak, and S. Zdonik. A demonstration of the BigDAWG polystore system. Proc. VLDB Endow., 8(12), 2015.
[7] D. Ghosh. Multiparadigm data storage for enterprise applications. IEEE Softw., 27(5):57–60, Sept. 2010.
[8] I. Gog, M. Schwarzkopf, N. Crooks, M. P. Grosvenor, A. Clement, and S. Hand. Musketeer: All for one, one for all in data processing systems. In Proceedings of EuroSys '15, pages 2:1–2:16, New York, NY, USA, 2015. ACM.
[9] M. Grover, T. Malaska, J. Seidman, and G. Shapira. Hadoop Application Architectures. O'Reilly, Beijing, 2015.
[10] JetBrains. MPS user's guide. https://confluence.jetbrains.com/display/mpsd32/mps+user
[11] V. Kantere and M. Filatov. Modelling processes of big data analytics. In J. Wang et al., editors, WISE (1), volume 9418 of Lecture Notes in Computer Science, pages 309–322. Springer, 2015.
[12] G. Kougka, A. Gounaris, and K. Tsichlas. Practical algorithms for execution engine selection in data flows. Future Generation Computer Systems, 45:133–148, 2015.
[13] Oracle. Unified query for big data management systems: Integrating big data systems with enterprise data warehouses. Technical report, Oracle, Jan. 2015.
[14] A. Simitsis, K. Wilkinson, M. Castellanos, and U. Dayal. Optimizing analytic data flows for multiple execution engines. In Proceedings of the 2012 ACM SIGMOD, pages 829–840, New York, NY, USA, 2012. ACM.
[15] D. Simmen, K. Schnaitter, J. Davis, Y. He, S. Lohariwala, A. Mysore, V. Shenoi, M. Tan, and Y. Xiao. Large-scale graph analytics in Aster 6: Bringing context to big data discovery. Proc. VLDB Endow., 7(13):1405–1416, Aug. 2014.
[16] M. Stonebraker. The case for polystores. http://wp.sigmod.org/?p=1629, July 2015.
[17] D. Tsoumakos and C. Mantas. The case for multi-engine data analytics. In D. an Mey et al., editors, Euro-Par 2013: Parallel Processing Workshops, volume 8374 of Lecture Notes in Computer Science, pages 406–415. Springer Berlin Heidelberg, 2014.