=Paper=
{{Paper
|id=Vol-3186/paper10
|storemode=property
|title=Drove: Tracking Execution Results of Workflows on Large Data
|pdfUrl=https://ceur-ws.org/Vol-3186/paper_10.pdf
|volume=Vol-3186
|authors=Sadeem Alsudais
|dblpUrl=https://dblp.org/rec/conf/vldb/Alsudais22
}}
==Drove: Tracking Execution Results of Workflows on Large Data==
Drove: Tracking Execution Results of Workflows on Large Data
Sadeem Alsudais
Supervised by Chen Li
Department of Computer Science, UC Irvine, CA 92697, USA
salsudai@uci.edu (S. Alsudais), ORCID 0000-0003-3928-690X
Proceedings of the VLDB 2022 PhD Workshop, September 5, 2022, Sydney, Australia.
Copyright © 2022 for this paper by its authors. Use permitted under Creative Commons License Attribution 4.0 International (CC BY 4.0).
Abstract
Data analytics using workflows is an iterative process, in which an analyst makes many iterations of changes, such as
additions, deletions, and alterations of operators and their links. In many cases, the analyst wants to compare these workflow
versions and their execution results to help in deciding the next iterations of changes. Moreover, the analyst needs to know
which versions produced undesired results to avoid refining the workflow in those versions. To enable the analyst to get an
overview of the workflow versions and their results, we introduce Drove, a framework that manages the end-to-end lifecycle
of constructing, refining, and executing workflows on large data sets and provides a dashboard to monitor these execution
results. In many cases, the result of an execution is the same as the result of a prior execution. Identifying such equivalence
between the execution results of different workflow versions is important for two reasons. First, it can help us reduce the
storage cost of the results by storing equivalent results only once. Second, stored results of early executions can be reused for
future executions with the same results. Existing tools that track such executions are geared towards small-scale data and
lack the means to reuse existing results in future executions. In Drove, we reason about the semantic equivalence of the workflow versions to reduce the storage space and reuse the materialized results.
Keywords
workflow version control, workflow reproducibility, semantic workflow equivalence verification
1. Introduction
Data-processing workflows are extensively used by analysts to extract and analyze data over large volumes. Texera is an open-source system we have been developing in the past years that provides a GUI-based interface for users to construct a workflow as a DAG of operators, refine and fine-tune the workflow, execute it, and examine the final results [1]. The users may perform multiple iterations of refinement, execution, and examination before producing the final version of the workflow [2, 3]. A refinement of the workflow creates a new version. Tracking different versions of a workflow and its produced results is a growing area of interest [4, 5, 6, 7, 8, 9]. Due to the iterative process in data analytics, one would be interested in looking at past execution results to get answers to the following questions.

Q1. Which workflow versions generated these results?

Q2. How did the differences between two versions affect their results?

Figure 1: Multiple versions of a workflow for tweet analysis and their duplicate results of different executions. (a) Version a: initial construction. (b) Version b: after adding a filter operator (highlighted in green); the operator highlighted in blue has the same results as the corresponding operator in version a. (c) Version c: after deleting the filter operator and adding it after the join operator; the two operators highlighted in blue have the same results as the corresponding operators in version a and version b.
Motivation. Figure 1 illustrates an example of an analysis workflow that evolved into three versions. In the example, a data analyst is interested in the tweets whose content contains the keyword climate. In the first version, shown in Figure 1a, she wants to look at the most discussed topics in a “wordcloud” visualization. She also uses an operator to further label the tweets as related to climate or not, using the labels “related” and “unrelated”. She wants to visualize the “related” tweets as a choropleth map that aggregates the tweet count by each state. She wants to visualize the “unrelated” tweets as a scatterplot by their location. After running this version, she notices from the wordcloud operator that the most associated topic is wildfire. Since wildfires are common in California, she decides to look at the spatial distribution of the tweets in this state. Hence, she refines the workflow to version b by adding a filter on the California region, as shown in Figure 1b. After she runs this version, she notices that adding the region filter caused the choropleth map to highlight only California. So she decides to apply the filter to the scatterplot result only, and generates version c in Figure 1c. Now, she wants to compare the scatterplots of version a and version c to examine the density of “unrelated” tweets in California versus the entire US.

This example shows the importance of providing a dashboard for managing the different workflow versions and their execution results. In this work, we seek to provide such a dashboard to guide the analytics tasks.

Our solution. One way is to store each workflow version and its execution results [2, 3]. Its main limitation is that there can be many versions and executions of a workflow, and these results can be large. Thus this approach can consume a lot of storage space [10]. For example, one deployment of Texera recorded 2,039 executions from 91 different workflows in 75 days. Storing all these execution results required a lot of space.

To address this challenge, we want to develop a solution that leverages the fact that many of these results can be equivalent due to the iterative analytic process [3, 6], as illustrated in Figure 1. We assume that the input relations of the workflow versions are the same and that the operators are deterministic. We present Drove, a holistic approach to managing the end-to-end lifecycle of orchestrating, refining, and executing workflows and examining their corresponding results. It is developed in Texera to provide the means for the user to conduct the analytics and efficiently store and reuse the versions and results.

Related work. Existing solutions for workflows [11, 10] rely on identifying an exact match of the entire DAG or a sub-DAG of a workflow to reuse materialized results. These solutions cannot handle the case where two workflows are semantically equivalent but have different structures. The solution in [12] verifies the semantic equivalence of two Spark workflow jobs, and it supports a limited number of operators such as aggregation. It cannot verify the equivalence of the workflows in Figure 1. Another large body of work is about checking the semantic equivalence of two SQL queries, such as UDP [13], Equitas [14], and Spes [15]. One may want to solve our problem by treating a workflow as a SQL query, possibly with UDFs, and then using these solutions. Unfortunately, these solutions have certain restrictions on the types of supported operators, such as relational operators and a restrictive class of user-defined function (UDF) operators. They cannot support those operators that are common in workflows, such as labeling and unnest in the running example.

Our goal is to develop Drove to overcome these limitations. In particular, it should find semantic equivalence of workflows even if they have different structures, and support a variety of operators including relational operators and other types such as UDFs.

Figure 2: Drove's Tracking Executions Modules (Version Control Manager, Execution Manager, Analyzer, Execution Engine (Amber), and the catalog and results database).

2. Drove Overview

Figure 2 depicts an overview of Drove. A user formulates a data-processing workflow through the UI of Texera. A workflow is a directed acyclic graph (DAG) 𝐺 = (𝑉, 𝐸), where vertices are operators and edges represent the direction of the data flow. A workflow can have multiple sink operators, each of which produces its own results. For each sink operator, we can view the sub-DAG consisting of its ancestors as a query that produces the results in this sink. Figure 3 shows the three sub-DAGs corresponding to the three sink operators in the running example (version a).

Figure 3: Sink operators and their corresponding sub-DAGs.
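To make the notions of a workflow DAG and a sink's sub-DAG concrete, below is a small illustrative sketch in Python (not Drove's implementation): a workflow is stored as parent links, and the sub-DAG of a sink is the sink together with all of its ancestors. The operator names and the exact topology of version a are assumptions reconstructed from the running example.

```python
from collections import defaultdict

class WorkflowDAG:
    """A workflow as a DAG G = (V, E): vertices are operators, edges carry the data flow."""
    def __init__(self):
        self.operators = set()
        self.parents = defaultdict(set)   # operator -> operators feeding into it

    def add_edge(self, src, dst):
        self.operators.update((src, dst))
        self.parents[dst].add(src)

    def sub_dag_of(self, sink):
        """Return the sink plus all of its ancestors, i.e., the query producing the sink's result."""
        seen, stack = set(), [sink]
        while stack:
            op = stack.pop()
            if op not in seen:
                seen.add(op)
                stack.extend(self.parents[op])
        return seen

# Version a of the running example (wordcloud, scatterplot, and choropleth sinks);
# the edge list below is an assumed reconstruction of Figure 1a.
v_a = WorkflowDAG()
for src, dst in [("scan:tweet", "regex:climate"), ("regex:climate", "wordcloud"),
                 ("regex:climate", "label:related?"), ("label:related?", "join:location"),
                 ("scan:state", "join:location"), ("join:location", "filter:label=0"),
                 ("filter:label=0", "unnest:coordinate"), ("unnest:coordinate", "scatterplot"),
                 ("join:location", "filter:label=1"), ("filter:label=1", "aggregate:state"),
                 ("aggregate:state", "choropleth")]:
    v_a.add_edge(src, dst)

print(sorted(v_a.sub_dag_of("wordcloud")))  # the sub-DAG behind the wordcloud sink
```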
Drove uses a catalog to store the following information: (1) workflows; (2) their versions and the differences between two adjacent versions; (3) metadata about workflow executions, such as the version, the start and end times, states, etc. We also store the sink-operator results of each execution in a database. To reduce the storage space, the system allows users to delete the results of some of the executions. The Version Control Manager is in charge of the workflow versions and their creation, storage, and retrieval. When a workflow with a version 𝑣𝑛 is modified, a new version is created that includes those changes, i.e., 𝑣𝑛+1 = 𝑣𝑛 + Δ. The changes Δ can be any combination of DAG operations, such as addition of a new operator, deletion of an existing operator, substitution of an operator, property editing of an operator, and addition or deletion of a link. For the remainder of this discussion, when we refer to a version, it is a version that has been executed, because we are interested in their results. For each executed version, we keep at most one execution result. In the case where a version is executed multiple times, we only store the result of the first execution and reuse it for the later executions of the same version. A workflow submitted by the user is executed by the backend engine called Amber [16]. The Analyzer checks the semantic equivalence of DAGs. More details about those modules are explained shortly.
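For illustration, the catalog described above could be modeled along the following lines; this is a hedged sketch, and the record and field names (e.g., VersionRecord, result_table) are hypothetical rather than Drove's actual schema.

```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional

@dataclass
class WorkflowRecord:
    workflow_id: int
    name: str

@dataclass
class VersionRecord:
    workflow_id: int
    version_id: int                  # v_{n+1} = v_n + delta
    parent_version_id: Optional[int]
    delta: list = field(default_factory=list)  # e.g., [("add_operator", "filter:state=CA"), ("add_link", ("scan:state", "filter:state=CA"))]

@dataclass
class ExecutionRecord:
    workflow_id: int
    version_id: int
    start_time: datetime
    end_time: Optional[datetime]
    state: str                       # e.g., "completed", "failed"
    result_table: Optional[str]      # sink results in the results database; at most one per executed version
```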
2.1. Answering Workflows Using Materialized Results

When the user submits a version of the workflow, the Execution Manager tries to see if one of the previous execution results can be used to answer the workflow. For instance, when the user submits the version 𝑣𝑐 in the running example, we try to find any reusable results from the executions of 𝑣𝑎 and 𝑣𝑏. In particular, consider the wordcloud sink operator 𝑤𝑐 in 𝑣𝑐 and its corresponding sink operator 𝑤𝑏 in 𝑣𝑏. Notice that the sub-DAG of 𝑤𝑐 and the sub-DAG of 𝑤𝑏 have the same structure. Such structural similarities can be identified using existing techniques [11]. In this case, we can use the results of 𝑤𝑏 as the result of 𝑤𝑐.

Now consider the scatterplot sink operator 𝑠𝑐 in 𝑣𝑐 and its corresponding sink operator 𝑠𝑏 in 𝑣𝑏. The sub-DAG of 𝑠𝑐 and the sub-DAG of 𝑠𝑏 have different structures. We push this pair of sub-DAGs to the Analyzer to verify their equivalence. Based on the positive answer from the Analyzer, the Execution Manager decides to reuse the results of 𝑠𝑏 to answer 𝑠𝑐.

Next we consider the choropleth sink operator ℎ𝑐 in 𝑣𝑐 and its corresponding sink operator ℎ𝑏 in 𝑣𝑏. The sub-DAG of ℎ𝑐 and the sub-DAG of ℎ𝑏 have different structures. We push this pair of sub-DAGs to the Analyzer to verify their equivalence. Since the answer from the Analyzer is negative, the Execution Manager cannot reuse the results of ℎ𝑏 to answer ℎ𝑐. In this case, it looks for an earlier version, 𝑣𝑎. It compares the sub-DAG of ℎ𝑐 with the corresponding sub-DAG of ℎ𝑎 in 𝑣𝑎. The Analyzer gives a positive answer this time, so we can reuse the result of ℎ𝑎 to answer ℎ𝑐.

In general, for every sink in the execution request of a workflow, we do the following. For each previous version and the corresponding sink operator, the Execution Manager first identifies whether the structures of the sub-DAGs are similar; otherwise, it contacts the Analyzer to verify their semantic equivalence. This process terminates when the Analyzer confirms an equivalence between the current sink and the sink of one of the earlier versions.
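This reuse procedure can be summarized as the following sketch of the control flow; same_structure, analyzer_equivalent, and stored_result are hypothetical helpers standing in for the structural check [11], the Analyzer, and the results database, and the sink correspondence between versions is assumed to be given.

```python
def answer_sink_from_history(sink, current_version, previous_versions,
                             same_structure, analyzer_equivalent, stored_result):
    """Try to answer a sink of the current version from materialized results of earlier versions.

    Walks earlier versions (most recent first); reuses a stored result when the two sub-DAGs
    are structurally identical, or when the Analyzer verifies their semantic equivalence.
    """
    current_sub_dag = current_version.sub_dag_of(sink)
    for version in previous_versions:           # e.g., ordered from v_{n-1} back to v_1
        prior_sub_dag = version.sub_dag_of(sink)
        if prior_sub_dag is None:               # the sink has no counterpart in this version
            continue
        if same_structure(current_sub_dag, prior_sub_dag) or \
           analyzer_equivalent(current_sub_dag, prior_sub_dag):
            return stored_result(version, sink)  # reuse and terminate the search
    return None                                  # no equivalent earlier result; execute the sub-DAG
```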
Open Problems. The number of versions for a single workflow can be large. For instance, in a Texera production system, some workflows have more than 80 executed versions. Checking the semantic equivalence between a sink operator and those of all the previous versions can have a high overhead. One interesting question that we are exploring is deciding when to stop comparing the version in the execution request with prior ones, instead of comparing it with all the previous versions without finding any positive semantic-equivalence match. We plan to devise an objective function to decide on the fly when to stop the comparison, considering a few factors such as the degree of differences between the versions, the size of the processed data, the expense of the execution job, and other indicators. Another direction is to avoid unnecessary equivalence verification by identifying the structural similarities of the sub-DAGs. For example, before calling the Analyzer to verify the equivalence of the sub-DAGs of ℎ𝑐 and ℎ𝑏, we first verify the structural similarity of the sub-DAG of ℎ𝑐 with an earlier sub-DAG that has the same structure, i.e., ℎ𝑎. To avoid storing all the different structures of all workflow versions, we only keep the structure of the sub-DAG that first created the materialized result.
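Since the stopping criterion is an open problem, the following is only one possible shape of such an objective function, assuming the listed factors are available as numeric estimates; the weights and the cost model are placeholders, not results from this work.

```python
def should_stop(num_compared, accumulated_delta, verification_cost, expected_savings,
                w_compared=1.0, w_delta=1.0, fraction=0.1):
    """Return True when the effort spent searching for an equivalent prior version
    is no longer justified by what reuse would save.

    expected_savings: estimated cost of re-executing the sub-DAG on the input data.
    verification_cost: estimated cost of one more equivalence check.
    accumulated_delta: how much the candidate versions have drifted from the current one.
    """
    effort_so_far = w_compared * num_compared * verification_cost + w_delta * accumulated_delta
    return effort_so_far + verification_cost > fraction * expected_savings
```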
2.2. Verifying equivalence between two DAGs

When the Analyzer receives a pair of DAGs, it needs to verify their semantic equivalence. We view each DAG as a query, so essentially we want to check the equivalence of two queries. Notice that checking the equivalence of two queries is undecidable in general [17]. In the literature there are solutions for queries with certain constraints, such as UDP [13], Equitas [14], and Spes [15]. Table 1 describes the conditions that need to be satisfied to use Equitas and Spes to verify set equivalence. In our system we want to support different kinds of semantics, such as set semantics, bag semantics, and list semantics, depending on the needs of the user.

Table 1: Constraints the query pair should satisfy to use Equitas (♣) and Spes (♢) for set equivalence verification. Operator classes: SPJ, Outer Join, Agg (count, sum, avg), Agg (max, min), Union.
- Predicate conditions have to be linear: ♣♢ / ♣♢ / ♣♢ / ♣♢ / ♢
- The pair should have exactly 0 or 1 of the operator types: ♣ / ♣ / ♣
- Table is not scanned more than once: ♣
- Input must be SPJ: ♣
- The pair is isomorphic: ♢ / ♢ / ♢ / ♢
- Grouping columns must be the same: ♢ / ♢

The Analyzer incorporates one of these solutions as a module called the “Equivalence Verifier” (𝐸𝑉). To use this module, we need to ensure the queries passed to it meet its constraints. When the pair of DAGs violates the constraints, e.g., through the inclusion of Unnest and Label in the running example, the Analyzer cannot pass the pair to the 𝐸𝑉. Notice that the two DAGs are isomorphic to each other except at those places with changes. We exploit this isomorphic mapping to break each DAG into smaller “segments”. We can then reduce the problem of evaluating the equivalence of entire DAGs to the problem of verifying segment-wise equivalence. The segments start from the left-most changes and end at the right-most changes following the topological ordering of the DAGs. For example, Figure 4 shows the pair of DAGs of the scatterplot sink operator in versions 𝑣𝑐 and 𝑣𝑏. The minimum segments are highlighted to show the inclusion of all the differences between the two versions, i.e., the addition of Filter before Join and the removal of Filter after Join in 𝑣𝑏, and the removal of Filter before Join and the addition of Filter after Join in 𝑣𝑐.

Figure 4: Minimum segments and maximal covering segments for the scatterplot sinks in 𝑣𝑏 and 𝑣𝑐 (the two DAGs are related by an isomorphic mapping).

Once the minimum segment in a version is identified, the Analyzer expands it in both directions to include as many operators as possible while they still satisfy the constraints of the 𝐸𝑉. In other words, the Analyzer aims to find a maximal covering segment that satisfies the constraints of the 𝐸𝑉. In the running example, to find a maximal covering segment for each minimum segment, we include Scan on the left and Filter on the right. The inclusion of these operators does not violate the constraints of the 𝐸𝑉. Adding Label on the left or Unnest on the right would violate the constraints. In general, there can be more than one maximal covering segment.

Given two workflow DAGs, an 𝐸𝑉, and a type of equivalence such as set equivalence, bag equivalence, or list equivalence, we compute a pair of maximal segments that cover all the changes between these two DAGs and satisfy the constraints of the 𝐸𝑉. After that, the Analyzer passes the two segments as two queries to the 𝐸𝑉 to verify their equivalence.
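The expansion step can be sketched as follows, under two simplifying assumptions: the sub-DAG is linearized into a chain of operators, and ev_accepts is a stand-in predicate for the 𝐸𝑉 constraints of Table 1 (here it only rejects segments containing non-relational operators).

```python
def maximal_covering_segment(chain, change_positions, ev_accepts):
    """Grow the minimum segment (the span between the left-most and right-most changes)
    in both directions while the enclosed operators still satisfy the EV's constraints."""
    lo, hi = min(change_positions), max(change_positions)   # minimum segment
    # Expand to the left (towards the sources), e.g., to include Scan in the running example.
    while lo > 0 and ev_accepts(chain[lo - 1: hi + 1]):
        lo -= 1
    # Expand to the right (towards the sink), e.g., to include the downstream Filter.
    while hi < len(chain) - 1 and ev_accepts(chain[lo: hi + 2]):
        hi += 1
    return chain[lo: hi + 1]

def ev_accepts(segment):
    # Illustrative constraint: reject segments containing non-relational operators.
    unsupported = {"label", "unnest"}
    return all(op.split(":")[0] not in unsupported for op in segment)

# One branch of the scatterplot sub-DAG in v_c, linearized; the change is the Filter
# moved to after the Join (positions 1 and 2 form the minimum segment).
chain_c = ["scan:state", "join:location", "filter:state=CA",
           "filter:label=0", "unnest:coordinate", "scatterplot"]
print(maximal_covering_segment(chain_c, {1, 2}, ev_accepts))
# -> ['scan:state', 'join:location', 'filter:state=CA', 'filter:label=0']
```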
2.3. Extending Equivalence Verifiers to Include Non-relational Operators

It is possible that the minimum segments that contain the changes do not satisfy the constraints needed to use an existing 𝐸𝑉. For instance, in the running example, a change on the Label operator will result in that operator being included in the minimum segment. The equivalence of the two operators in the two versions cannot be verified by the existing 𝐸𝑉s because it is a non-relational operator. In this case, the Analyzer cannot verify the equivalence of the pair. We plan to extend the existing 𝐸𝑉s to overcome these limitations.

We define the result of a sink operator as a list of tuples with a specific schema and a possible order. To reason about the semantics of a segment, we represent its results as ⟨Tuple⟩ and ⟨List⟩. ⟨Tuple⟩ represents a single tuple on the list. ⟨List⟩ represents the entire result list and contains information about how many times a tuple exists in the list and the order of the list. ⟨Tuple⟩ and ⟨List⟩ can be represented using the elements 𝑇, 𝑆, 𝐶, and 𝑂 as follows.

⟨Tuple⟩: 𝑇 ::= FOL and 𝑆 ::= columns
⟨List⟩: 𝐶 ::= SPNF and 𝑂 ::= constraint

𝑇 is a first-order-logic (FOL) formula that indicates whether an input tuple exists in the output result or not. 𝑆 represents the set of columns, i.e., the schema of the tuple in the output result. 𝐶 indicates the cardinality of a tuple in the entire relation and is represented in a sum-product normal form (SPNF). 𝑂 contains the columns the list is ordered by. This result representation allows us to use only the ⟨Tuple⟩ elements when we need set semantics. We abstract the operators to show their impact on each part of the representation in Table 2. We construct the representation at each operator using its own logic based on its properties. Finally, to verify whether two segments are equivalent, we ask an SMT solver [18] to verify whether 𝑠𝑒𝑔𝑚𝑒𝑛𝑡1 ≠ 𝑠𝑒𝑔𝑚𝑒𝑛𝑡2, expressed using their representations, is satisfiable or not.

Table 2: Impact of an operator on the elements of the ⟨Tuple⟩ and ⟨List⟩ representation (operators: Order By, Label, Unnest, Replicate; elements: 𝑇 and 𝑆 for ⟨Tuple⟩, 𝐶 and 𝑂 for ⟨List⟩).
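As an illustration of the final SMT step under set semantics, the sketch below uses the Z3 Python bindings to ask whether the ⟨Tuple⟩ conditions of two segments can disagree on any input tuple; the two example predicates (the same pair of filters applied in different orders) are illustrative and not taken from the paper.

```python
from z3 import Ints, And, Xor, Solver, unsat  # pip install z3-solver

# Columns referenced by the two segments (illustrative; states encoded as integers).
label, state = Ints("label state")

# <Tuple> conditions T1 and T2 of two segments that apply the same two filters in a
# different order; under set semantics each T says whether an input tuple survives.
t1 = And(label == 0, state == 6)   # filter:label=0 followed by filter:state=CA
t2 = And(state == 6, label == 0)   # filter:state=CA followed by filter:label=0

solver = Solver()
solver.add(Xor(t1, t2))            # is "segment1 != segment2" satisfiable?
if solver.check() == unsat:
    print("segments are set-equivalent")
else:
    print("not equivalent; counterexample:", solver.model())
```

If the solver reports unsat, no input tuple can be kept by one segment and dropped by the other, which is exactly the set-equivalence condition described above.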
3. Conclusion and future work

In this work, we introduced Drove, a framework that manages the end-to-end lifecycle of the execution results of a data-processing workflow. Drove includes in its core a few modules to achieve the tracking of the results. We presented a few optimizations to reuse previously stored results by reasoning about the semantics of the workflow versions that produced the results. We showed a unique technique to decompose a complex workflow DAG into smaller segments that include the version changes in order to verify their equivalence. We also proposed a technique to capture the semantics of non-relational operators using a lightweight representation.

We plan to enhance the framework by studying the following topics. (1) We plan to extend the current prototype to highlight fine-grained differences between a pair of results. We also plan to include a high-level snapshot of the different versions and their results to give the user an overview of the different runs. (2) One challenge in the framework is to decide the number of versions to compare the current version with in order to maximize the chance of reusing earlier results. In the future, we plan to use a cost-based process to decide this number. (3) We plan to extend the verification to include the containment relation so that we can further reduce the storage and maximize reuse opportunities. We need a way to identify a delta query to be run on existing results to answer the contained version request. (4) We plan to study the degree of similarity between workflow versions so that we can quickly identify those with the highest similarity to the current version.

Acknowledgments

This work is supported by a graduate fellowship from KSU and partially supported by the National Science Foundation under the awards III 1745673 and III 2107150 and the Orange County Health Care Agency.

References

[1] Z. Wang, A. Kumar, S. Ni, C. Li, Demonstration of interactive runtime debugging of distributed dataflows in Texera, VLDB 13 (2020).
[2] H. Miao, A. Deshpande, ProvDB: Provenance-enabled lifecycle management of collaborative data analysis workflows, IEEE Data Eng. Bull. (2018).
[3] S. Woodman, H. Hiden, P. Watson, P. Missier, Achieving reproducibility by combining provenance with service and workflow versioning, in: WORKS'11, 2011.
[4] Y. Zhang, F. Xu, E. Frise, S. Wu, B. Yu, W. Xu, DataLab: a version data management and analytics system, in: BIGDSE@ICSE'16, 2016.
[5] A. Chen, A. Chow, A. Davidson, A. DCunha, A. Ghodsi, S. A. Hong, A. Konwinski, C. Mewald, S. Murching, T. Nykodym, P. Ogilvie, M. Parkhe, A. Singh, F. Xie, M. Zaharia, R. Zang, J. Zheng, C. Zumar, Developments in MLflow: A system to accelerate the machine learning lifecycle, in: DEEM@SIGMOD'20, 2020.
[6] M. Vartak, H. Subramanyam, W. Lee, S. Viswanathan, S. Husnoo, S. Madden, M. Zaharia, ModelDB: a system for machine learning model management, in: HILDA@SIGMOD'16, 2016.
[7] K. Greff, A. Klein, M. Chovanec, F. Hutter, J. Schmidhuber, The Sacred Infrastructure for Computational Research, in: SciPy'17, 2017.
[8] G. Gharibi, V. Walunj, R. Alanazi, S. Rella, Y. Lee, Automated management of deep learning experiments, in: DEEM@SIGMOD'19, 2019.
[9] H. Miao, A. Li, L. S. Davis, A. Deshpande, Towards unified data and lifecycle management for deep learning, in: ICDE'17, 2017.
[10] I. Elghandour, A. Aboulnaga, ReStore: Reusing results of MapReduce jobs, VLDB'12 (2012).
[11] F. Nagel, P. A. Boncz, S. Viglas, Recycling in pipelined query evaluation, in: ICDE'13, 2013.
[12] S. Grossman, S. Cohen, S. Itzhaky, N. Rinetzky, M. Sagiv, Verifying equivalence of Spark programs, in: CAV'17, 2017.
[13] S. Chu, B. Murphy, J. Roesch, A. Cheung, D. Suciu, Axiomatic foundations and algorithms for deciding semantic equivalences of SQL queries, VLDB'18 (2018).
[14] Q. Zhou, J. Arulraj, S. B. Navathe, W. Harris, D. Xu, Automated verification of query equivalence using satisfiability modulo theories, VLDB'19 (2019).
[15] Q. Zhou, J. Arulraj, S. B. Navathe, W. Harris, J. Wu, SPES: A two-stage query equivalence verifier, CoRR'20 (2020).
[16] A. Kumar, Z. Wang, S. Ni, C. Li, Amber: A debuggable dataflow system based on the actor model, VLDB 13 (2020).
[17] A. Mostowski, Impossibility of an algorithm for the decision problem in finite classes, Journal of Symbolic Logic 15 (1950).
[18] L. M. de Moura, N. S. Bjørner, Z3: an efficient SMT solver, in: TACAS'08, 2008.