<!DOCTYPE article PUBLIC "-//NLM//DTD JATS (Z39.96) Journal Archiving and Interchange DTD v1.0 20120330//EN" "JATS-archivearticle1.dtd">
<article xmlns:xlink="http://www.w3.org/1999/xlink">
  <front>
    <journal-meta>
      <journal-title-group>
        <journal-title>CEUR Workshop Proceedings (CEUR-WS.org)</journal-title>
      </journal-title-group>
    </journal-meta>
    <article-meta>
      <title-group>
        <article-title>Drove: Tracking Execution Results of Workflows on Large Data</article-title>
      </title-group>
      <contrib-group>
        <contrib contrib-type="author">
          <string-name>Sadeem Alsudais</string-name>
          <xref ref-type="aff" rid="aff0">0</xref>
        </contrib>
        <contrib contrib-type="author">
          <string-name>Supervised by Chen Li</string-name>
          <xref ref-type="aff" rid="aff0">0</xref>
        </contrib>
        <aff id="aff0">
          <label>0</label>
          <institution>Department of Computer Science</institution>
          ,
          <addr-line>UC Irvine, CA 92697</addr-line>
          ,
          <country country="US">USA</country>
        </aff>
      </contrib-group>
      <abstract>
        <p>Data analytics using workflows is an iterative process, in which an analyst makes many iterations of changes, such as additions, deletions, and alterations of operators and their links. In many cases, the analyst wants to compare these workflow versions and their execution results to help in deciding the next iterations of changes. Moreover, the analyst needs to know which versions produced undesired results to avoid refining the workflow in those versions. To enable the analyst to get an overview of the workflow versions and their results, we introduce Drove, a framework that manages the end-to-end lifecycle of constructing, refining, and executing workflows on large data sets and provides a dashboard to monitor these execution results. In many cases, the result of an execution is the same as the result of a prior execution. Identifying such equivalence between the execution results of different workflow versions is important for two reasons. First, it can help us reduce the storage cost of the results by storing equivalent results only once. Second, stored results of early executions can be reused for future executions with the same results. Existing tools that track such executions are geared towards small-scale data and lack the means to reuse existing results in future executions. In Drove, we reason about the semantic equivalence of the workflow versions to reduce the storage space and reuse the materialized results.</p>
        <p>Keywords: workflow version control, workflow reproducibility, semantic workflow equivalence verification.</p>
      </abstract>
    </article-meta>
  </front>
  <body>
    <sec id="sec-1">
      <title>1. Introduction</title>
      <sec id="sec-1-1">
        <p>Data-processing workflows are extensively used by analysts to extract and analyze data over large volumes. Texera is an open source system we have been developing in the past years that provides a GUI-based interface for users to construct a workflow as a DAG of operators, refine and fine-tune the workflow, execute it, and examine the final results [1]. The users may perform multiple iterations of refinement, execution, and examination before producing the final version of the workflow [2, 3]. A refinement of the workflow creates a new version. Tracking different versions of a workflow and its produced results is a growing area of interest [4, 5, 6, 7, 8, 9]. Due to the iterative process in data analytics, one would be interested in looking at the past execution results to get answers for the following questions.</p>
        <p>Q1. Which workflow versions generated these results?</p>
        <p>Q2. How did the differences between two versions affect their results?</p>
        <p>Figure 1: An example of an analysis workflow that evolved into three versions and their duplicate results of different executions. The workflow operators include scan:tweet, regex:"climate", wordcloud, label:relative?, scan:state, filter:state=CA, join:location, filter:label=0, unnest:coordinate, scatterplot, filter:label=1, aggregate:state, and choropleth. (a) Version a: initial construction. (b) Version b: after adding a filter operator (highlighted in green); the operator highlighted in blue has the same results as the corresponding operator in version a. (c) Version c: after deleting the filter operator and adding it after the join operator; the two operators highlighted in blue have the same results as the corresponding operators in version a and version b.</p>
        <p>Motivation. Figure 1 illustrates an example of an analysis workflow that evolved into three versions. In the example, a data analyst is interested in the tweets whose content contains the keyword climate. In the first version shown in Figure 1a, she wants to look at the most discussed topics in a “wordcloud” visualization. She also uses an operator to further label the tweets as related to climate or not, using labels “related” and “unrelated”. She wants to visualize the “related” tweets as a choropleth map that aggregates the tweet count by each state. She wants to visualize those “unrelated” tweets as a scatterplot by their location. After running this version, she notices from the wordcloud operator that the most associated topic is wildfire. Since wildfires are common in California, she decides to look at the spatial distribution of the tweets in this state. Hence, she refines the workflow to version b by adding a filter on the California region, as shown in Figure 1b. After she runs this version, she notices that adding the region filter caused the choropleth map to highlight only California. So she decides to delete this filter and instead add it after the join operator, as shown in Figure 1c. Now she wants to compare and examine the results of these versions. This example shows the importance of providing users the means to track the different workflow versions and their results.</p>
        <p>Our solution. One way is to provide a dashboard that shows each workflow version and its execution results [2, 3]. Its main limitation is that there can be many versions and executions of a workflow, and these results can be large. Thus this approach can consume a lot of storage space [10]. For example, in one deployment of Texera, it recorded 2,039 executions from 91 different workflows in 75 days. Storing all these execution results required a lot of space.</p>
        <p>Figure 2: Drove’s Tracking Executions Modules.</p>
        <p>To address this challenge, we want to develop a solution that leverages the fact that many of these results can be equivalent due to the iterative analytic process [3, 6], as illustrated in Figure 1. We assume the input relations of the workflow versions to be the same and the operators to be deterministic. We present Drove, a holistic approach to managing the end-to-end lifecycle of orchestrating, refining, and executing workflows and examining their corresponding results. It is developed in Texera to provide the means for the user to conduct the analytics and efficiently store and reuse the versions and results.</p>
        <p>Related work. Existing solutions for workflows [11, 10] rely on identifying an exact match of the entire DAG or a sub-DAG of a workflow to reuse materialized results. These solutions cannot handle the case where two workflows are semantically equivalent but have different structures. The solution in [12] verifies the semantic equivalence of two Spark workflow jobs, and it supports a limited number of operators such as aggregation. It cannot verify the equivalence of the workflows in Figure 1. Another large body of work is about checking the semantic equivalence of two SQL queries, such as UDP [13], Equitas [14], and Spes [15]. One may want to solve our problem by treating a workflow as a SQL query, possibly with UDF functions, and then using these solutions. Unfortunately, these solutions have certain restrictions on the types of supported operators, such as relational operators and a restrictive class of user-defined function (UDF) operators. They cannot support those operators that are common in workflows, such as labeling and unnest in the running example. Our goal is to develop Drove to overcome these limitations. In particular, it should find the semantic equivalence of workflows even if they have different structures, and support a variety of operators including relational operators and other types such as UDFs.</p>
        <p><bold>2. Drove Overview</bold></p>
        <p>Figure 2 depicts an overview of Drove. A user formulates a data-processing workflow through the UI of Texera. A workflow is a directed-acyclic graph (DAG) G = (V, E), where the vertices are operators and the edges represent the direction of the data flow. A workflow can have multiple sink operators, each of which produces its own results. For each sink operator, we can view the sub-DAG consisting of its ancestors as a query that produces the results in this sink. Figure 3 shows the three sub-DAGs corresponding to the three sink operators in the running example (version a).</p>
        <p>Figure 3: Sink operators and their corresponding sub-DAGs.</p>
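<p>To make the sub-DAG notion concrete, the sketch below builds, for each sink, the set of its ancestor operators over an edge-list DAG. The data model and operator names are illustrative only (they follow the running example but omit the scan:state/join branch for brevity); Drove and Texera use their own internal representations.</p>

```python
from collections import defaultdict

def ancestors(edges, node):
    """All ancestors of `node` in a DAG given as (src, dst) edge pairs."""
    parents = defaultdict(set)
    for src, dst in edges:
        parents[dst].add(src)
    seen, stack = set(), [node]
    while stack:
        for p in parents[stack.pop()]:
            if p not in seen:
                seen.add(p)
                stack.append(p)
    return seen

def sink_sub_dags(operators, edges):
    """Map each sink operator (one with no outgoing edge) to the operator set
    of its sub-DAG, i.e., the query that produces the results in that sink."""
    non_sinks = {src for src, _ in edges}
    return {op: ancestors(edges, op) | {op}
            for op in operators if op not in non_sinks}

# Simplified version a of the running example, with three sinks.
ops = ["scan:tweet", "regex", "label", "wordcloud", "filter:label=0",
       "unnest", "scatterplot", "filter:label=1", "aggregate", "choropleth"]
edges = [("scan:tweet", "regex"), ("regex", "wordcloud"), ("regex", "label"),
         ("label", "filter:label=0"), ("filter:label=0", "unnest"),
         ("unnest", "scatterplot"), ("label", "filter:label=1"),
         ("filter:label=1", "aggregate"), ("aggregate", "choropleth")]
sub_dags = sink_sub_dags(ops, edges)
```

Each sink maps to the full "query" that feeds it, which is the unit Drove compares across versions.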
        <p>Drove uses a catalog to store the following information: (1) workflows; (2) their versions and the differences between two adjacent versions; (3) metadata about workflow executions, such as the version, the start and end times, states, etc. We also store the sink-operator results of each execution in a database. To reduce the storage space, the system allows users to delete the results of some of the executions. The Version Control Manager is in charge of the workflow versions and their creation, storage, and retrieval.</p>
        <p>When a workflow with a version vᵢ is modified, a new version is created that includes those changes, i.e., vᵢ₊₁ = vᵢ + Δ. The changes Δ can be any combination of DAG operations, such as the addition of a new operator, the deletion of an existing operator, the substitution of an operator, the property editing of an operator, and the addition or deletion of a link. For the remainder of this discussion, when we refer to a version, it is a version that has been executed, because we are interested in their results. For each executed version, we keep at most one execution result. In the case where a version is executed multiple times, we only store the result of the first execution and reuse it for the later executions of the same version. A workflow submitted by the user is executed by the backend engine called Amber [16]. The Analyzer checks the semantic equivalence of DAGs. More details about those modules will be explained shortly.</p>
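<p>The version delta can be illustrated with a small sketch. The data model below (a dict of operators plus a link list) and the change-operation names are hypothetical, not Drove's actual schema; they only show how vᵢ₊₁ = vᵢ + Δ might be materialized.</p>

```python
from copy import deepcopy

def apply_delta(version, delta):
    """Create version v+1 = v + delta, where delta is a list of DAG changes."""
    new = deepcopy(version)
    for change in delta:
        kind = change["op"]
        if kind == "add_operator":
            new["operators"][change["id"]] = change["props"]
        elif kind == "delete_operator":
            del new["operators"][change["id"]]
            # Drop any link touching the removed operator.
            new["links"] = [l for l in new["links"] if change["id"] not in l]
        elif kind == "edit_property":
            new["operators"][change["id"]].update(change["props"])
        elif kind == "add_link":
            new["links"].append((change["src"], change["dst"]))
        elif kind == "delete_link":
            new["links"].remove((change["src"], change["dst"]))
    return new

# Version a -> version b of the running example (adding a filter on a link).
v1 = {"operators": {"scan": {}, "wordcloud": {}}, "links": [("scan", "wordcloud")]}
delta = [{"op": "add_operator", "id": "filter", "props": {"predicate": "state=CA"}},
         {"op": "delete_link", "src": "scan", "dst": "wordcloud"},
         {"op": "add_link", "src": "scan", "dst": "filter"},
         {"op": "add_link", "src": "filter", "dst": "wordcloud"}]
v2 = apply_delta(v1, delta)
```

Because each version is derived from the previous one by a delta, the catalog only needs to store the differences between adjacent versions.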
        <sec id="sec-1-1-1">
          <title>2.1. Answering Workflows Using Materialized Results</title>
          <p>When the user submits a version of the workflow, the Execution Manager tries to see if one of the previous execution results can be used to answer the workflow. For instance, when the user submits version c in the running example, we try to find any reusable results from the executions of versions a and b. In particular, consider the wordcloud sink operator in version c and its corresponding sink operator in version b. Notice that the two sub-DAGs have the same structure. Such structural similarities can be identified using existing techniques [11]. In this case, we can use the stored results of the wordcloud sink in version b to answer the wordcloud sink in version c.</p>
          <p>Now consider the scatterplot sink operator in version c and its corresponding sink operator in version b. The two sub-DAGs have different structures. We push the pair of these two sub-DAGs to the Analyzer to verify their equivalence. Based on the positive answer from the Analyzer, the Execution Manager decides to reuse the stored results of version b to answer this sink.</p>
          <p>Next we consider the choropleth sink operator in version c and its corresponding sink operator in version b. These two sub-DAGs also have different structures, so we push the pair to the Analyzer to verify their equivalence. Since the answer from the Analyzer is negative, the Execution Manager cannot reuse the result of version b to answer this sink. In this case, it looks for an earlier version and compares the sub-DAG of the choropleth sink in version c with its corresponding one from version a. The Analyzer gives a positive answer this time, so we can reuse the result of version a to answer the choropleth sink in version c.</p>
          <p>In general, for every sink in the execution request of a workflow, we do the following. For each previous version and the corresponding sink operator, the Execution Manager first identifies whether the structures of the sub-DAGs are similar; otherwise it contacts the Analyzer to verify their semantic equivalence. This process terminates when the Analyzer confirms an equivalence between the current sink and the sink of one of the earlier versions. To avoid storing all the different structures of all workflow versions, we only keep the structure of the sub-DAG that created the materialized result first.</p>
          <p>Open Problems. The number of versions for a single workflow can be large. For instance, in the Texera production system, some workflows can have more than 80 executed versions. Checking the semantic equivalence of a sink operator against those of all the previous versions can have a high overhead. One interesting question that we are exploring is deciding when to stop comparing the version of the execution request with prior ones, instead of comparing with all the previous versions without finding any positive semantic equivalence match. We plan to devise an objective function to decide on the fly when to stop the comparison, considering a few factors such as the degree of differences between the versions, the size of the processed data, the expense of the execution job, and other indicators. Another direction is to avoid unnecessary equivalence verification by identifying the structural similarities of the sub-DAGs before calling the Analyzer.</p>
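<p>The reuse procedure above can be sketched as a linear scan over prior executed versions, trying the cheap structural check before consulting the Analyzer. The helper names (same_structure, semantically_equivalent) are placeholders for Drove's actual components, and the toy "structure" is just a set of edges.</p>

```python
def find_reusable_result(current_sub_dag, prior_versions,
                         same_structure, semantically_equivalent):
    """Scan prior executed versions and return the first stored result whose
    sub-DAG matches structurally or is proven semantically equivalent."""
    for version in prior_versions:          # e.g., most recent first
        prior = version["sub_dag"]
        if same_structure(current_sub_dag, prior):
            return version["result"]        # cheap structural match
        if semantically_equivalent(current_sub_dag, prior):
            return version["result"]        # the Analyzer confirms equivalence
    return None                             # no reuse; execute from scratch

# Toy usage: structures are frozensets of edges; the Analyzer is stubbed out.
v_b = {"sub_dag": frozenset({("scan", "filter"), ("filter", "wordcloud")}),
       "result": "R_b"}
v_a = {"sub_dag": frozenset({("scan", "wordcloud")}), "result": "R_a"}
same = lambda x, y: x == y
equiv = lambda x, y: False   # pretend the Analyzer found no equivalence
hit = find_reusable_result(frozenset({("scan", "wordcloud")}), [v_b, v_a],
                           same, equiv)
```

Here the request misses version b structurally but matches version a, so the stored result R_a is reused.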
        </sec>
        <sec id="sec-1-1-2">
          <title>2.2. Verifying equivalence between two DAGs</title>
          <p>When the Analyzer receives a pair of DAGs, it needs to verify their semantic equivalence. We view each DAG as a query, so essentially we want to check the equivalence of two queries. Notice that checking the equivalence of two queries is undecidable in general [17]. In the literature there are solutions for queries with certain constraints, such as UDP [13], Equitas [14], and Spes [15]. Table 1 describes the conditions that need to be satisfied to use Equitas and Spes to verify set equivalence. In our system we want to support different kinds of semantics, such as set semantics, bag semantics, and list semantics, depending on the needs of the user.</p>
        </sec>
      </sec>
      <sec id="sec-1-2">
        <p>The Analyzer incorporates one of these solutions as a module called an “Equivalence Verifier” (EV). To use this module, we need to ensure that the queries passed to it meet its constraints.</p>
        <p>Table 1: Conditions that need to be satisfied to use Equitas (♣) and Spes (♢) to verify set equivalence, for the operator types SPJ, Outer Join, Agg (count, sum, avg), Agg (max, min), and Union: the predicate conditions have to be linear (♣ ♢); the pair should have exactly 0 or 1 of the operator types; the table is not scanned more than once; the input must be SPJ (♣); the pair is isomorphic (♢); and the grouping columns must be the same (♢).</p>
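<p>A precheck in the spirit of Table 1 can be sketched as follows. The encoding of the conditions is illustrative only (the real Equitas and Spes conditions are richer than this dictionary), but it shows how the Analyzer could reject a segment before invoking an EV.</p>

```python
# Hypothetical, simplified encoding of Table 1-style conditions: each EV
# declares the operator types it supports plus extra per-type requirements.
EQUITAS_LIKE = {"supported": {"SPJ", "OuterJoin", "AggSum", "Union"},
                "needs_spj_input": {"Union"}}

def satisfies_constraints(segment_ops, ev):
    """segment_ops: operator-type names of a segment in topological order.
    Reject the segment if any operator is unsupported, or if an operator
    requiring pure-SPJ input is preceded by anything other than SPJ."""
    for i, op in enumerate(segment_ops):
        if op not in ev["supported"]:
            return False
        if op in ev["needs_spj_input"] and any(o != "SPJ" for o in segment_ops[:i]):
            return False
    return True

assert satisfies_constraints(["SPJ", "AggSum"], EQUITAS_LIKE)
assert not satisfies_constraints(["SPJ", "Unnest"], EQUITAS_LIKE)  # non-relational
```

A segment that fails the precheck is handled by the decomposition described next rather than handed to the EV.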
        <p>When a pair of DAGs violates the constraints, e.g., by including the Unnest and Label operators as in the running example, the Analyzer cannot pass the pair to the EV. Notice that the two DAGs are isomorphic to each other except at the places with changes. We exploit this isomorphic mapping to break each DAG into smaller “segments”. We can then reduce the problem of evaluating the equivalence of the entire DAGs to the problem of verifying segment-wise equivalence. The segments start from the leftmost changes and end at the rightmost changes, following the topological ordering of the DAGs. For example, Figure 4 shows the pair of DAGs of the scatterplot sink operator in versions b and c. The minimum segments are highlighted to show the inclusion of all the differences between the two versions, i.e., the addition of Filter before Join and the removal of Filter after Join in version b, and the removal of Filter before Join and the addition of Filter after Join in version c.</p>
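<p>Identifying a minimum segment can be sketched as taking the span from the first to the last changed operator in the topological order of a sub-DAG. This is a simplification of the general DAG case (where the segment is induced over a partial order rather than a single sequence), and the operator names are illustrative:</p>

```python
def minimum_segment(topo_order, changed):
    """Return the contiguous slice of the topologically ordered operators
    that spans all changed positions (the minimum segment)."""
    positions = [i for i, op in enumerate(topo_order) if op in changed]
    if not positions:
        return []
    return topo_order[min(positions): max(positions) + 1]

# Scatterplot sub-DAG of version b in topological order; the changes are the
# added filter and the join whose downstream filter was removed.
topo = ["scan:tweet", "filter:CA", "join", "filter:label", "unnest", "scatterplot"]
changed = {"filter:CA", "join"}
seg = minimum_segment(topo, changed)
```

The Analyzer then grows this slice outward (toward Scan on the left and Filter on the right in the running example) as long as the EV constraints still hold, yielding the maximal covering segment.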
        <p>Figure 4: Minimum segments and maximal covering segments for the scatterplot sinks in versions b and c.</p>
        <p>Once the minimum segment in a version is identified, the Analyzer expands it in both directions to include as many operators as possible while they still satisfy the constraints of the EV. In other words, the Analyzer aims to find a maximal covering segment that satisfies the constraints of the EV. In the running example, to find a maximal covering segment for each minimum segment, we include Scan on the left and Filter on the right. The inclusion of these operators does not violate the constraints.</p>
        <p><bold>2.3. Extending Equivalence Verifiers to Include Non-relational Operators</bold></p>
        <p>It is possible that the minimum segments that contain the changes do not satisfy the constraints of any existing EV. For instance, in the running example, a change on the Label operator will result in that operator being included in the minimum segment. The equivalence of the two operators in the two versions cannot be verified by the existing EVs because Label is a non-relational operator. In this case, the Analyzer cannot verify the equivalence of the pair. We plan to extend the existing EVs to overcome these limitations. We define the result of a sink operator as a list of tuples with a specific schema and a possible order. To reason about the semantics of a segment, we represent its results as ⟨Tuple⟩ and ⟨List⟩. ⟨Tuple⟩ represents a single tuple in the list. ⟨List⟩ represents the entire result list and contains information about how many times a tuple exists in the list and the order of the list.</p>
        <p>⟨Tuple⟩ and ⟨List⟩ can be represented using the elements P, C, M, and O as follows. P is a first-order-logic (FOL) formula that indicates whether an input tuple exists in the output result or not. C represents the set of columns, i.e., the schema of the tuple in the output result. M indicates the cardinality of a tuple in the entire relation and is represented in a sum-product normal form (SPNF). O contains the columns the list is ordered by. This result representation allows us to only use the ⟨Tuple⟩ elements when we need the set semantics. We abstract the operators to show their impact on each part of the representation in Table 2. We construct the representation at each operator using its own logic based on its properties. Finally, to verify if two segments are equivalent, we ask an SMT solver [18] to verify whether the formula stating that the two representations differ is satisfiable or not.</p>
        <p>Table 2: Impact of an operator (e.g., Order By, Label, Unnest, Replicate) on each element of the ⟨Tuple⟩ and ⟨List⟩ representation.</p>
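<p>The final satisfiability check is delegated to an SMT solver such as Z3 [18]. The sketch below conveys the idea with a brute-force check over a tiny finite domain instead of a real solver: two predicates are equivalent exactly when "pred1(t) ≠ pred2(t)" is unsatisfiable. The predicates, modeling a filter applied before versus after combining two attributes, are illustrative.</p>

```python
from itertools import product

def equivalent(pred1, pred2, domain):
    """Equivalent iff 'pred1(t) != pred2(t)' is unsatisfiable over the domain.
    A real implementation hands the FOL formulas to an SMT solver instead of
    enumerating a finite domain."""
    return not any(pred1(t) != pred2(t) for t in product(*domain))

# A selection commutes with a conjunction: testing state == "CA" before or
# after the count condition yields the same overall predicate.
before = lambda t: t[0] == "CA" and t[1] > 0   # filter first, then test count
after  = lambda t: t[1] > 0 and t[0] == "CA"   # test count first, then filter
states, counts = ["CA", "NY"], [0, 1, 2]
assert equivalent(before, after, [states, counts])
assert not equivalent(before, lambda t: t[1] > 0, [states, counts])
```

An unsatisfiable difference formula is a proof of segment equivalence; a satisfying assignment is a concrete counterexample tuple.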
        <p><bold>3. Conclusion and future work</bold></p>
        <p>In this work, we introduced Drove, a framework that manages the end-to-end lifecycle of the execution results of a data-processing workflow. Drove includes in its core a few modules to achieve the tracking of the results. We presented a few optimizations to reuse previously-stored results by reasoning about the semantics of the workflow versions that produced the results. We showed a unique technique to decompose a complex workflow DAG into smaller segments that include the version changes in order to verify their equivalence. We also proposed a technique to capture the semantics of non-relational operators using a lightweight representation.</p>
        <p>We plan to enhance the framework by studying the following topics. (1) We plan to extend the current prototype to highlight fine-grain differences between a pair of results. We also plan to include a high-level snapshot of the different versions and their results to give the user an overview of the different runs. (2) One challenge in the framework is to decide the number of versions to compare the current version with in order to maximize the chance of reusing earlier results. In the future, we plan to use a cost-based process to decide the number. (3) We plan to extend the verification to include the containment relation so that we can further reduce the storage and maximize reuse opportunities. We need a way to identify a delta query to be run on existing results to answer the contained version request. (4) We plan to study the degree of similarity between workflow versions so that we can quickly identify those with the highest similarity to the current version.</p>
      </sec>
    </sec>
    <sec id="sec-2">
      <title>Acknowledgments</title>
      <p>This work is supported by a graduate fellowship from KSU and partially supported by the National Science Foundation under the awards III-1745673 and III-2107150 and the Orange County Health Care Agency.</p>
      <p>References</p>
      <p>[1] Z. Wang, A. Kumar, S. Ni, C. Li, Demonstration of interactive runtime debugging of distributed dataflows in Texera, VLDB 13 (2020).</p>
      <p>[2] H. Miao, A. Deshpande, ProvDB: Provenance-enabled lifecycle management of collaborative data analysis workflows, IEEE Data Eng. Bull. (2018).</p>
      <p>[3] S. Woodman, H. Hiden, P. Watson, P. Missier, Achieving reproducibility by combining provenance with service and workflow versioning, in: WORKS'11, 2011.</p>
      <p>[4] Y. Zhang, F. Xu, E. Frise, S. Wu, B. Yu, W. Xu, DataLab: a version data management and analytics system, in: BIGDSE@ICSE'16, 2016.</p>
      <p>[5] A. Chen, A. Chow, A. Davidson, A. DCunha, A. Ghodsi, S. A. Hong, A. Konwinski, C. Mewald, S. Murching, T. Nykodym, P. Ogilvie, M. Parkhe, A. Singh, F. Xie, M. Zaharia, R. Zang, J. Zheng, C. Zumar, Developments in MLflow: A system to accelerate the machine learning lifecycle, in: DEEM@SIGMOD'20, 2020.</p>
      <p>[6] M. Vartak, H. Subramanyam, W. Lee, S. Viswanathan, S. Husnoo, S. Madden, M. Zaharia, ModelDB: a system for machine learning model management, in: HILDA@SIGMOD'16, 2016.</p>
      <p>[7] K. Greff, A. Klein, M. Chovanec, F. Hutter, J. Schmidhuber, The Sacred Infrastructure for Computational Research, in: SciPy'17, 2017.</p>
      <p>[8] G. Gharibi, V. Walunj, R. Alanazi, S. Rella, Y. Lee, Automated management of deep learning experiments, in: DEEM@SIGMOD'19, 2019.</p>
      <p>[9] H. Miao, A. Li, L. S. Davis, A. Deshpande, Towards unified data and lifecycle management for deep learning, in: ICDE'17, 2017.</p>
      <p>[10] I. Elghandour, A. Aboulnaga, ReStore: Reusing results of MapReduce jobs, VLDB'12 (2012).</p>
      <p>[11] F. Nagel, P. A. Boncz, S. Viglas, Recycling in pipelined query evaluation, in: ICDE'13, 2013.</p>
      <p>[12] S. Grossman, S. Cohen, S. Itzhaky, N. Rinetzky, M. Sagiv, Verifying equivalence of Spark programs, in: CAV'17, 2017.</p>
      <p>[13] S. Chu, B. Murphy, J. Roesch, A. Cheung, D. Suciu, Axiomatic foundations and algorithms for deciding semantic equivalences of SQL queries, VLDB'18 (2018).</p>
      <p>[14] Q. Zhou, J. Arulraj, S. B. Navathe, W. Harris, D. Xu, Automated verification of query equivalence using satisfiability modulo theories, VLDB'19 (2019).</p>
      <p>[15] Q. Zhou, J. Arulraj, S. B. Navathe, W. Harris, J. Wu, SPES: A two-stage query equivalence verifier, CoRR'20 (2020).</p>
      <p>[16] A. Kumar, Z. Wang, S. Ni, C. Li, Amber: A debuggable dataflow system based on the actor model, VLDB 13 (2020).</p>
      <p>[17] A. Mostowski, Impossibility of an algorithm for the decision problem in finite classes, Journal of Symbolic Logic 15 (1950).</p>
      <p>[18] L. M. de Moura, N. S. Bjørner, Z3: an efficient SMT solver, in: TACAS'08, 2008.</p>
    </sec>
  </body>
  <back>
    <ref-list />
  </back>
</article>