Drove: Tracking Execution Results of Workflows on Large Data

Sadeem Alsudais
Supervised by Chen Li
Department of Computer Science, UC Irvine, CA 92697, USA
Proceedings of the VLDB 2022 PhD Workshop, September 5, 2022, Sydney, Australia.

Abstract
Data analytics using workflows is an iterative process, in which an analyst makes many iterations of changes, such as additions, deletions, and alterations of operators and their links. In many cases, the analyst wants to compare these workflow versions and their execution results to help in deciding the next iterations of changes. Moreover, the analyst needs to know which versions produced undesired results to avoid refining the workflow in those versions. To enable the analyst to get an overview of the workflow versions and their results, we introduce Drove, a framework that manages the end-to-end lifecycle of constructing, refining, and executing workflows on large data sets and provides a dashboard to monitor these execution results. In many cases, the result of an execution is the same as the result of a prior execution. Identifying such equivalence between the execution results of different workflow versions is important for two reasons. First, it can help us reduce the storage cost of the results by storing equivalent results only once. Second, stored results of early executions can be reused for future executions with the same results. Existing tools that track such executions are geared towards small-scale data and lack the means to reuse existing results in future executions. In Drove, we reason about the semantic equivalence of the workflow versions to reduce the storage space and reuse the materialized results.

Keywords
workflow version control, workflow reproducibility, semantic workflow equivalence verification

1. Introduction

Data-processing workflows are extensively used by analysts to extract and analyze large volumes of data. Texera is an open-source system we have been developing in the past years that provides a GUI-based interface for users to construct a workflow as a DAG of operators, refine and fine-tune the workflow, execute it, and examine the final results [1]. The users may perform multiple iterations of refinement, execution, and examination before producing the final version of the workflow [2, 3]. A refinement of the workflow creates a new version. Tracking different versions of a workflow and their produced results is a growing area of interest [4, 5, 6, 7, 8, 9]. Due to the iterative process in data analytics, one would be interested in looking at past execution results to answer the following questions.

Q1. Which workflow versions generated these results?
Q2. How did the differences between two versions affect their results?

Figure 1: Multiple versions of a workflow for tweet analysis and their duplicate results of different executions.
(a) Version a: initial construction.
(b) Version b: after adding a filter operator (highlighted in green). The operator highlighted in blue has the same results as the corresponding operator in version a.
(c) Version c: after deleting the filter operator and adding it after the join operator. The two operators highlighted in blue have the same results as the corresponding operators in version a and version b.

Motivation. Figure 1 illustrates an example of an analysis workflow that evolved into three versions. In the example, a data analyst is interested in the tweets whose content contains the keyword "climate". In the first version shown in Figure 1a, she wants to look at the most discussed topics in a "wordcloud" visualization. She also uses an operator to further label the tweets as related to climate or not, using labels "related" and "unrelated". She wants to visualize the "related" tweets as a choropleth map that aggregates the tweet count by each state, and the "unrelated" tweets as a scatterplot by their location. After running this version, she notices from the wordcloud operator that the most associated topic is wildfire. Since wildfires are common in California, she decides to look at the spatial distribution of the tweets in this state. Hence, she refines the workflow to version b by adding a filter on the California region, as shown in Figure 1b. After she runs this version, she notices that adding the region filter caused the choropleth map to highlight only California. So she decides to apply the filter to the scatterplot result only, and generates version c in Figure 1c. Now she wants to compare the scatterplots of version a and version c to examine the density of "unrelated" tweets in California versus the entire US.

This example shows the importance of providing a dashboard for managing the different workflow versions and their execution results. In this work, we seek to provide such a dashboard to guide the analytics tasks.

Our solution. One way is to store each workflow version and its execution results [2, 3]. Its main limitation is that there can be many versions and executions of a workflow, and these results can be large, so this approach can consume a lot of storage space [10]. For example, one deployment of Texera recorded 2,039 executions from 91 different workflows in 75 days, and storing all these execution results required a lot of space.

To address this challenge, we want to develop a solution that leverages the fact that many of these results can be equivalent due to the iterative analytic process [3, 6], as illustrated in Figure 1. We assume the input relations are the same across the workflow versions and that the operators are deterministic. We present Drove, a holistic approach to managing the end-to-end lifecycle of orchestrating, refining, and executing workflows and examining their corresponding results. It is developed in Texera to provide the means for the user to conduct the analytics and to efficiently store and reuse the versions and results.

Related work. Existing solutions for workflows [11, 10] rely on identifying an exact match of the entire DAG or a sub-DAG of a workflow to reuse materialized results. These solutions cannot handle the case where two workflows are semantically equivalent but have different structures. The solution in [12] verifies the semantic equivalence of two Spark workflow jobs, and it supports a limited number of operators such as aggregation. It cannot verify the equivalence of the workflows in Figure 1. Another large body of work checks the semantic equivalence of two SQL queries, such as UDP [13], Equitas [14], and Spes [15]. One may want to solve our problem by treating a workflow as a SQL query, possibly with UDF functions, and then using these solutions. Unfortunately, these solutions have certain restrictions on the type of supported operators, such as relational operators and a restrictive class of user-defined function (UDF) operators. They cannot support operators that are common in workflows, such as labeling and unnest in the running example.

Our goal is to develop Drove to overcome these limitations. In particular, it should find the semantic equivalence of workflows even if they have different structures, and it should support a variety of operators, including relational operators and other types such as UDFs.

2. Drove Overview

Figure 2: Drove's Tracking Executions Modules.

Figure 2 depicts an overview of Drove. A user formulates a data-processing workflow through the UI of Texera. A workflow is a directed acyclic graph (DAG) 𝐺 = (𝑉, 𝐸), where vertices are operators and edges represent the direction of the data flow. A workflow can have multiple sink operators, each of which produces its own results. For each sink operator, we can view the sub-DAG consisting of its ancestors as a query that produces the results in this sink. Figure 3 shows the three sub-DAGs corresponding to the three sink operators in the running example (version a).

Figure 3: Sink operators and their corresponding sub-DAGs.
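To make the notion of a per-sink sub-DAG concrete, below is a minimal sketch in Python (not Drove's actual code) of a workflow DAG and of extracting, for each sink operator, the sub-DAG formed by the sink and its ancestors; the class and method names are illustrative.

```python
from collections import defaultdict

class Workflow:
    """A workflow DAG G = (V, E): operators as vertices, data-flow links as edges."""

    def __init__(self, operators, links):
        self.operators = set(operators)      # operator ids
        self.links = set(links)              # (src, dst) pairs
        self.parents = defaultdict(set)      # dst -> set of upstream operators
        for src, dst in self.links:
            self.parents[dst].add(src)

    def sinks(self):
        """Operators with no outgoing link; each produces its own results."""
        sources = {src for src, _ in self.links}
        return [op for op in self.operators if op not in sources]

    def sub_dag(self, sink):
        """The sink plus all of its ancestors, viewed as the query producing the sink's results."""
        visited, stack = set(), [sink]
        while stack:
            op = stack.pop()
            if op not in visited:
                visited.add(op)
                stack.extend(self.parents[op])
        kept_links = {(s, d) for (s, d) in self.links if s in visited and d in visited}
        return Workflow(visited, kept_links)
```

Applied to version a of the running example, this extraction yields the three sub-DAGs of Figure 3, one per sink (wordcloud, scatterplot, choropleth).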
Drove uses a catalog to store the following information: (1) workflows; (2) their versions and the differences between two adjacent versions; and (3) metadata about workflow executions, such as the version, the start and end times, states, etc. We also store the sink-operator results of each execution in a database. To reduce the storage space, the system allows users to delete the results of some of the executions. The Version Control Manager is in charge of the workflow versions and their creation, storage, and retrieval. When a workflow with a version 𝑣𝑛 is modified, a new version is created that includes those changes, i.e., 𝑣𝑛+1 = 𝑣𝑛 + Δ. The changes Δ can be any combination of DAG operations, such as the addition of a new operator, the deletion of an existing operator, the substitution of an operator, the editing of an operator's properties, and the addition or deletion of a link.
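As a rough illustration of this version model (an assumed data structure, not the system's actual schema), the delta between adjacent versions can be recorded as a set of DAG edit operations and replayed to materialize the next version. The sketch reuses the Workflow class from the earlier snippet, and a substitution is modeled as a deletion plus an addition.

```python
from dataclasses import dataclass, field

@dataclass
class Delta:
    """A change set between adjacent versions: v_{n+1} = v_n + delta (hypothetical representation)."""
    added_operators: set = field(default_factory=set)      # ids of newly added operators
    removed_operators: set = field(default_factory=set)    # ids of deleted operators
    edited_properties: dict = field(default_factory=dict)  # operator id -> changed property values
    added_links: set = field(default_factory=set)          # (src, dst) pairs added
    removed_links: set = field(default_factory=set)        # (src, dst) pairs removed

def apply_delta(version: "Workflow", delta: Delta) -> "Workflow":
    """Materialize the next version by applying the change set to the previous DAG."""
    operators = (version.operators | delta.added_operators) - delta.removed_operators
    links = (version.links | delta.added_links) - delta.removed_links
    # Property edits would be applied to the operators' configurations; omitted in this sketch.
    return Workflow(operators, links)
```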
For the remainder of this discussion, when we refer to a version, we mean a version that has been executed, because we are interested in the results. For each executed version, we keep at most one execution result. In the case where a version is executed multiple times, we only store the result of the first execution and reuse it for the later executions of the same version. A workflow submitted by the user is executed by the backend engine called Amber [16]. The Analyzer checks the semantic equivalence of DAGs. More details about these modules are explained shortly.

2.1. Answering Workflows Using Materialized Results

When the user submits a version of the workflow, the Execution Manager tries to see if one of the previous execution results can be used to answer the workflow. For instance, when the user submits the version 𝑣𝑐 in the running example, we try to find any reusable results from the executions of 𝑣𝑎 and 𝑣𝑏. In particular, consider the wordcloud sink operator 𝑤𝑐 in 𝑣𝑐 and its corresponding sink operator 𝑤𝑏 in 𝑣𝑏. Notice that the sub-DAG of 𝑤𝑐 and the sub-DAG of 𝑤𝑏 have the same structure. Such structural similarities can be identified using existing techniques [11]. In this case, we can use the results of 𝑤𝑏 as the result of 𝑤𝑐.

Now consider the scatterplot sink operator 𝑠𝑐 in 𝑣𝑐 and its corresponding sink operator 𝑠𝑏 in 𝑣𝑏. The sub-DAG of 𝑠𝑐 and the sub-DAG of 𝑠𝑏 have different structures. We push the pair of these two sub-DAGs to the Analyzer to verify their equivalence. Based on the positive answer from the Analyzer, the Execution Manager decides to reuse the results of 𝑠𝑏 to answer 𝑠𝑐.

Next we consider the choropleth sink operator ℎ𝑐 in 𝑣𝑐 and its corresponding sink operator ℎ𝑏 in 𝑣𝑏. The sub-DAG of ℎ𝑐 and the sub-DAG of ℎ𝑏 have different structures. We push the pair of these two sub-DAGs to the Analyzer to verify their equivalence. Since the answer from the Analyzer is negative, the Execution Manager cannot reuse the results of ℎ𝑏 to answer ℎ𝑐. In this case, it looks for an earlier version, 𝑣𝑎, and compares the sub-DAG of ℎ𝑐 with its corresponding one, ℎ𝑎, from 𝑣𝑎. The Analyzer gives a positive answer this time, so we can reuse the result of ℎ𝑎 to answer ℎ𝑐.

In general, for every sink in the execution request of a workflow, we do the following. For each previous version and the corresponding sink operator, the Execution Manager first checks whether the structures of the sub-DAGs are similar; otherwise, it contacts the Analyzer to verify their semantic equivalence. This process terminates when the Analyzer confirms an equivalence between the current sink and the sink of one of the earlier versions.
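A simplified sketch of this lookup is shown below. The names are assumptions rather than Drove's interfaces: structurally_equal stands for the structural-match techniques of [11], and catalog.stored_result and analyzer.equivalent are hypothetical handles to the catalog and the Analyzer.

```python
def find_reusable_result(sub_dag, previous_versions, catalog, analyzer):
    """Try to answer one sink of the submitted version from stored results of prior versions.

    previous_versions is ordered from the most recent backwards; the search stops at the
    first prior sink whose sub-DAG is structurally identical or semantically equivalent.
    """
    for version in previous_versions:
        for prior_sink in version.sinks():
            prior_sub_dag = version.sub_dag(prior_sink)
            # Cheap check first: an identical structure (with the same inputs and
            # deterministic operators) implies the same results.
            if structurally_equal(sub_dag, prior_sub_dag):
                return catalog.stored_result(version, prior_sink)
            # Otherwise, ask the Analyzer to verify semantic equivalence.
            if analyzer.equivalent(sub_dag, prior_sub_dag):
                return catalog.stored_result(version, prior_sink)
    return None  # no reusable result: execute the sub-DAG and store its result
```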
Open Problems. The number of versions for a single workflow can be large. For instance, in a Texera production system, some workflows have more than 80 executed versions. Checking the semantic equivalence of a sink operator against those of all the previous versions can have a high overhead. One interesting question that we are exploring is deciding when to stop comparing the version of the execution request with prior ones, instead of comparing with all the previous versions without finding any positive semantic equivalence match. We plan to devise an objective function to decide on the fly when to stop the comparison, considering a few factors such as the degree of differences between the versions, the size of the processed data, the expense of the execution job, and other indicators. Another direction is to avoid unnecessary equivalence verification by identifying the structural similarities of the sub-DAGs. For example, before calling the Analyzer to verify the equivalence of the sub-DAGs of ℎ𝑐 and ℎ𝑏, we first verify the structural similarity of the sub-DAG of ℎ𝑐 with an earlier sub-DAG that has the same structure, i.e., ℎ𝑎. To avoid storing all the different structures of all workflow versions, we only keep the structure of the sub-DAG that created the materialized result first.

2.2. Verifying Equivalence Between Two DAGs

When the Analyzer receives a pair of DAGs, it needs to verify their semantic equivalence. We view each DAG as a query, so essentially we want to check the equivalence of two queries. Notice that checking the equivalence of two queries is undecidable in general [17]. In the literature there are solutions for queries with certain constraints, such as UDP [13], Equitas [14], and Spes [15]. Table 1 describes the conditions that need to be satisfied to use Equitas and Spes to verify set equivalence. In our system we want to support different kinds of semantics, such as set semantics, bag semantics, and list semantics, depending on the needs of the user.

Table 1: Constraints the query pair should satisfy to use Equitas (♣) and Spes (♢) for set equivalence verification.

Condition / Operator                                      | SPJ | Outer Join | Agg (count, sum, avg) | Agg (max, min) | Union
Predicate conditions have to be linear                    | ♣♢  | ♣♢         | ♣♢                    | ♣♢             | ♢
The pair should have exactly 0 or 1 of the operator types |     | ♣          | ♣                     | ♣              |
Table is not scanned more than once                       |     | ♣          |                       |                |
Input must be SPJ                                         |     |            | ♣                     |                |
The pair is isomorphic                                    |     | ♢          | ♢                     | ♢              | ♢
Grouping columns must be the same                         |     |            | ♢                     | ♢              |

The Analyzer incorporates one of these solutions as a module called the "Equivalence Verifier" (𝐸𝑉). To use this module, we need to ensure that the queries passed to it meet its constraints. When the pair of DAGs violates the constraints, e.g., due to the inclusion of Unnest and Label in the running example, the Analyzer cannot pass the pair to the 𝐸𝑉. Notice that the two DAGs are isomorphic to each other except at those places with changes. We exploit this isomorphic mapping to break each DAG into smaller "segments", which reduces the problem of evaluating the equivalence of the entire DAGs to the problem of verifying segment-wise equivalence. The segments start from the left-most changes and end at the right-most changes following the topological ordering of the DAGs. For example, Figure 4 shows the pair of DAGs of the scatterplot sink operator in versions 𝑣𝑐 and 𝑣𝑏. The minimum segments are highlighted to show the inclusion of all the differences between the two versions, i.e., the addition of Filter before Join and the removal of Filter after Join in 𝑣𝑏, and the removal of Filter before Join and the addition of Filter after Join in 𝑣𝑐.

Figure 4: Minimum segments and maximal covering segments for the scatterplot sinks in 𝑣𝑏 and 𝑣𝑐.

Once the minimum segment in a version is identified, the Analyzer expands it in both directions to include as many operators as possible while they still satisfy the constraints of the 𝐸𝑉. In other words, the Analyzer aims to find a maximal covering segment that satisfies the constraints of the 𝐸𝑉. In the running example, to find a maximal covering segment for each minimum segment, we include Scan on the left and Filter on the right. The inclusion of these operators does not violate the constraints of the 𝐸𝑉, while adding Label on the left or Unnest on the right would violate them. In general, there can be more than one maximal covering segment.

Given two workflow DAGs, an 𝐸𝑉, and a type of equivalence such as set equivalence, bag equivalence, or list equivalence, we compute a pair of maximal segments that cover all the changes between these two DAGs and satisfy the constraints of the 𝐸𝑉. After that, the Analyzer passes the two segments as two queries to the 𝐸𝑉 to verify their equivalence.
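The following sketch illustrates the expansion step described above. For simplicity it treats the sub-DAG as a linear pipeline of operators, and ev.supports is a hypothetical check that the segment would still satisfy the 𝐸𝑉's constraints (Table 1) after admitting the operator; neither helper is part of Drove's actual code.

```python
def maximal_covering_segment(dag, min_segment, ev):
    """Expand a minimum segment (the operators covering all changes) in both directions
    while the segment remains acceptable to the given Equivalence Verifier (EV)."""
    order = dag.topological_order()    # operators of the sub-DAG in topological order (assumed helper)
    segment = list(min_segment)        # assumed contiguous in that order
    left = order.index(segment[0])
    right = order.index(segment[-1])

    # Grow towards the sources, e.g., admit Scan in the running example.
    while left > 0 and ev.supports(order[left - 1], segment):
        left -= 1
        segment.insert(0, order[left])

    # Grow towards the sink, e.g., admit Filter but stop before Unnest.
    while right < len(order) - 1 and ev.supports(order[right + 1], segment):
        right += 1
        segment.append(order[right])

    return segment
```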
2.3. Extending Equivalence Verifiers to Include Non-relational Operators

It is possible that the minimum segments that contain the changes do not satisfy the constraints required to use an existing 𝐸𝑉. For instance, in the running example, a change on the Label operator will result in that operator being included in the minimum segment. The equivalence of the two operators in the two versions cannot be verified by the existing 𝐸𝑉s because Label is a non-relational operator. In this case, the Analyzer cannot verify the equivalence of the pair. We plan to extend the existing 𝐸𝑉s to overcome these limitations.

We define the result of a sink operator as a list of tuples with a specific schema and a possible order. To reason about the semantics of a segment, we represent its results as ⟨Tuple⟩ and ⟨List⟩. ⟨Tuple⟩ represents a single tuple in the list. ⟨List⟩ represents the entire result list and contains information about how many times a tuple exists in the list and the order of the list. ⟨Tuple⟩ and ⟨List⟩ can be represented using the elements 𝑇, 𝑆, 𝐶, and 𝑂 as follows:

⟨Tuple⟩: 𝑇 ::= FOL and 𝑆 ::= columns
⟨List⟩: 𝐶 ::= SPNF and 𝑂 ::= constraint

𝑇 is a first-order-logic (FOL) formula that indicates whether an input tuple exists in the output result. 𝑆 represents the set of columns, i.e., the schema of the tuple in the output result. 𝐶 indicates the cardinality of a tuple in the entire relation and is represented in sum-product normal form (SPNF). 𝑂 contains the columns the list is ordered by. This result representation allows us to use only the ⟨Tuple⟩ elements when we need set semantics. We abstract the operators to show their impact on each part of the representation in Table 2.

Table 2: Impact of an operator on the elements of the ⟨Tuple⟩ and ⟨List⟩ representation (operators: Order By, Label, Unnest, Replicate; elements: 𝑇, 𝑆, 𝐶, 𝑂).

We construct the representation at each operator using its own logic based on its properties. Finally, to verify whether two segments are equivalent, we ask an SMT solver [18] to check whether 𝑠𝑒𝑔𝑚𝑒𝑛𝑡1 ≠ 𝑠𝑒𝑔𝑚𝑒𝑛𝑡2, expressed using their representations, is satisfiable.
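As a toy illustration of this last step (assuming the z3-solver Python bindings, and using made-up Boolean placeholders for the 𝑇 formulas rather than Drove's actual encoding), checking equivalence of two segments under set semantics amounts to asking the solver whether the two tuple-existence conditions can disagree on some input tuple; an unsat answer means the segments are equivalent.

```python
from z3 import And, Bool, Solver, Xor, unsat

# Placeholder tuple-existence formulas (the element T) of the two scatterplot segments:
# a tweet appears in the output iff it is labeled "unrelated" and its joined state is CA,
# regardless of whether the Filter runs before or after the Join.
label_unrelated = Bool("label_unrelated")   # the tuple passes filter:label=0
state_is_ca = Bool("state_is_ca")           # the tuple's state attribute equals "CA"

t_segment_vb = And(label_unrelated, state_is_ca)   # v_b: Filter on the state input, then Join
t_segment_vc = And(state_is_ca, label_unrelated)   # v_c: Join first, then Filter

solver = Solver()
# The segments differ (under set semantics) iff some tuple is kept by exactly one of them.
solver.add(Xor(t_segment_vb, t_segment_vc))
print("equivalent" if solver.check() == unsat else "not equivalent")
```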
3. Conclusion and future work

In this work, we introduced Drove, a framework that manages the end-to-end lifecycle of the execution results of a data-processing workflow. Drove includes in its core a few modules to achieve the tracking of the results. We presented a few optimizations that reuse previously stored results by reasoning about the semantics of the workflow versions that produced them. We showed a technique to decompose a complex workflow DAG into smaller segments that include the version changes in order to verify their equivalence. We also proposed a technique to capture the semantics of non-relational operators using a lightweight representation.

We plan to enhance the framework by studying the following topics. (1) We plan to extend the current prototype to highlight fine-grained differences between a pair of results. We also plan to include a high-level snapshot of the different versions and their results to give the user an overview of the different runs. (2) One challenge in the framework is to decide the number of versions to compare the current version with in order to maximize the chance of reusing earlier results. In the future, we plan to use a cost-based process to decide this number. (3) We plan to extend the verification to include the containment relation so that we can further reduce the storage and maximize reuse opportunities. We need a way to identify a delta query to be run on existing results to answer the contained version request. (4) We plan to study the degree of similarity between workflow versions so that we can quickly identify those with the highest similarity to the current version.

Acknowledgments

This work is supported by a graduate fellowship from KSU and partially supported by the National Science Foundation under the awards III 1745673 and III 2107150 and by the Orange County Health Care Agency.

References

[1] Z. Wang, A. Kumar, S. Ni, C. Li, Demonstration of interactive runtime debugging of distributed dataflows in Texera, VLDB 13 (2020).
[2] H. Miao, A. Deshpande, ProvDB: Provenance-enabled lifecycle management of collaborative data analysis workflows, IEEE Data Eng. Bull. (2018).
[3] S. Woodman, H. Hiden, P. Watson, P. Missier, Achieving reproducibility by combining provenance with service and workflow versioning, in: WORKS'11, 2011.
[4] Y. Zhang, F. Xu, E. Frise, S. Wu, B. Yu, W. Xu, DataLab: a version data management and analytics system, in: BIGDSE@ICSE'16, 2016.
[5] A. Chen, A. Chow, A. Davidson, A. DCunha, A. Ghodsi, S. A. Hong, A. Konwinski, C. Mewald, S. Murching, T. Nykodym, P. Ogilvie, M. Parkhe, A. Singh, F. Xie, M. Zaharia, R. Zang, J. Zheng, C. Zumar, Developments in MLflow: A system to accelerate the machine learning lifecycle, in: DEEM@SIGMOD'20, 2020.
[6] M. Vartak, H. Subramanyam, W. Lee, S. Viswanathan, S. Husnoo, S. Madden, M. Zaharia, ModelDB: a system for machine learning model management, in: HILDA@SIGMOD'16, 2016.
[7] K. Greff, A. Klein, M. Chovanec, F. Hutter, J. Schmidhuber, The Sacred Infrastructure for Computational Research, in: SciPy'17, 2017.
[8] G. Gharibi, V. Walunj, R. Alanazi, S. Rella, Y. Lee, Automated management of deep learning experiments, in: DEEM@SIGMOD'19, 2019.
[9] H. Miao, A. Li, L. S. Davis, A. Deshpande, Towards unified data and lifecycle management for deep learning, in: ICDE'17, 2017.
[10] I. Elghandour, A. Aboulnaga, ReStore: Reusing results of MapReduce jobs, VLDB'12 (2012).
[11] F. Nagel, P. A. Boncz, S. Viglas, Recycling in pipelined query evaluation, in: ICDE'13, 2013.
[12] S. Grossman, S. Cohen, S. Itzhaky, N. Rinetzky, M. Sagiv, Verifying equivalence of Spark programs, in: CAV'17, 2017.
[13] S. Chu, B. Murphy, J. Roesch, A. Cheung, D. Suciu, Axiomatic foundations and algorithms for deciding semantic equivalences of SQL queries, VLDB'18 (2018).
[14] Q. Zhou, J. Arulraj, S. B. Navathe, W. Harris, D. Xu, Automated verification of query equivalence using satisfiability modulo theories, VLDB'19 (2019).
[15] Q. Zhou, J. Arulraj, S. B. Navathe, W. Harris, J. Wu, SPES: A two-stage query equivalence verifier, CoRR (2020).
[16] A. Kumar, Z. Wang, S. Ni, C. Li, Amber: A debuggable dataflow system based on the actor model, VLDB 13 (2020).
[17] A. Mostowski, Impossibility of an algorithm for the decision problem in finite classes, Journal of Symbolic Logic 15 (1950).
[18] L. M. de Moura, N. S. Bjørner, Z3: an efficient SMT solver, in: TACAS'08, 2008.