<!DOCTYPE article PUBLIC "-//NLM//DTD JATS (Z39.96) Journal Archiving and Interchange DTD v1.0 20120330//EN" "JATS-archivearticle1.dtd">
<article xmlns:xlink="http://www.w3.org/1999/xlink">
  <front>
    <journal-meta>
      <journal-title-group>
        <journal-title>Multiple workflows scheduling in multi-tenant
distributed systems: A taxonomy and future directions ACM Comput. Surv.</journal-title>
      </journal-title-group>
    </journal-meta>
    <article-meta>
      <title-group>
        <article-title>Static-dynamic algorithm for managing asynchronous computations in distributed environments</article-title>
      </title-group>
      <contrib-group>
        <contrib contrib-type="author">
          <string-name>S A Gorsky</string-name>
          <email>gorsky@icc.ru</email>
          <xref ref-type="aff" rid="aff0">0</xref>
        </contrib>
        <contrib contrib-type="author">
          <string-name>A G Feoktistov</string-name>
          <xref ref-type="aff" rid="aff0">0</xref>
        </contrib>
        <aff id="aff0">
          <label>0</label>
          <institution>Matrosov Institute for System Dynamics and Control Theory of SB RAS</institution>
          ,
          <addr-line>Lermontov St. 134, Irkutsk, Russia, 664033</addr-line>
        </aff>
      </contrib-group>
      <pub-date>
        <year>2020</year>
      </pub-date>
      <volume>53</volume>
      <issue>1</issue>
      <fpage>198</fpage>
      <lpage>205</lpage>
      <abstract>
        <p>The paper addresses the relevant problem of scheduling computations in scientific applications (distributed applied software packages) executed in distributed environments. Forming an optimal schedule of jobs that execute applied software (modules) is an NP-hard problem. Therefore, heuristic scheduling methods are often used in practice. In this regard, we propose a new static-dynamic algorithm for managing computations in heterogeneous distributed environments. We simulate the operation of the proposed algorithm and compare it with other computation management scenarios. The results show that the algorithm achieves a rational balance between the scheduling time and the computation makespan.</p>
      </abstract>
    </article-meta>
  </front>
  <body>
    <sec id="sec-1">
      <title>1. Introduction</title>
      <p>Applications developed to support scientific workflows include a variety of software that
implements data processing and analysis, various computational methods, and processes for modeling
the systems under study. Typically, such software follows a modular approach and is oriented toward
high-performance computing [1]. The inputs and outputs of modules determine the relations between them. In this
case, a computational job describes the executed module, its inputs and outputs, the features of launching the
module, and the requirements for the computing environment. Thus, a workflow consists of a set of
interconnected jobs. An additional job specification includes information about the relations between parent
and child jobs.</p>
      <p>When a workflow is executed in a public-access parallel or distributed computing environment,
non-trivial problems arise in determining the order in which workflow jobs are executed on the limited resources of
the environment. In general, forming an optimal job schedule is NP-hard [2].</p>
      <p>We focus on asynchronous computations, in which workflow modules are launched as soon as their
input data are ready.</p>
      <p>In this paper, we consider workflow execution under uncertainty in distributed applied software
packages, which belong to a special class of scalable scientific applications [3]. These uncertainties
call for heuristic approaches to scheduling computations.</p>
      <p>Such heuristics should rely on known relations between modules. Data placement during the
execution of workflow modules must also be taken into account.
executable programs (modules) and requirements (number of processors or cores, sizes of RAM and
disk memory, etc.) to resources of a computing environment.</p>
      <p>In this paper, we consider abstract workflows whose jobs and data are not bound to specific
resources of computing environments. In general, such environments can integrate public-access
resources with additional resources from various grid and cloud platforms.</p>
      <p>Well-known Workflow Management Systems (WMSs) implement both resource allocation and
job execution for abstract workflows on dedicated resources [5-7]. An abstract workflow is often
represented as a Directed Acyclic Graph (DAG) that reflects the relations between the modules or
jobs of the workflow [8]. Therefore, many WMSs (for example, Pegasus [9]) use popular tools such as
Condor DAGMan [10] or Gridway [11] that manage computing tasks described by such dependency
structures. Condor DAGMan is also often used on its own to manage workflows.</p>
      <p>In our study, we assume uncertainty in job execution times and in the amount of transferred
data. Under these conditions, such tools select resources using simple heuristics, for example,
allocating the first available node with suitable characteristics or the node with the fewest jobs [12].</p>
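      <p>The two simple resource-selection rules mentioned above can be sketched as follows. This is a minimal illustration under stated assumptions, not the code of any particular tool: the Node record, its fields (free_cores, free_ram_gb, running_jobs), and the functions first_fit() and least_loaded() are hypothetical.</p>

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Node:
    # Hypothetical node description used only for this illustration.
    name: str
    free_cores: int
    free_ram_gb: int
    running_jobs: int


def suitable(node: Node, cores: int, ram_gb: int) -> bool:
    # A node is suitable if it satisfies the job's resource requirements.
    return node.free_cores >= cores and node.free_ram_gb >= ram_gb


def first_fit(nodes: list[Node], cores: int, ram_gb: int) -> Optional[Node]:
    # Rule 1: take the first available node with suitable characteristics.
    return next((n for n in nodes if suitable(n, cores, ram_gb)), None)


def least_loaded(nodes: list[Node], cores: int, ram_gb: int) -> Optional[Node]:
    # Rule 2: among suitable nodes, take the one with the fewest jobs.
    candidates = [n for n in nodes if suitable(n, cores, ram_gb)]
    return min(candidates, key=lambda n: n.running_jobs, default=None)


nodes = [
    Node("n1", free_cores=1, free_ram_gb=2, running_jobs=0),
    Node("n2", free_cores=4, free_ram_gb=8, running_jobs=3),
    Node("n3", free_cores=4, free_ram_gb=16, running_jobs=1),
]
print(first_fit(nodes, cores=2, ram_gb=4).name)     # n2
print(least_loaded(nodes, cores=2, ram_gb=4).name)  # n3
```

      <p>Neither rule takes into account the relations between jobs or the current placement of their input data.</p>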
      <p>Another popular approach is to split the workflow into a set of micro-workflows, each of which runs
on a separate resource [13]. However, such a decomposition is not always possible for an arbitrary
workflow, because the necessary structural relations between workflow jobs may be absent [14].</p>
      <p>We therefore propose a new heuristic that, in some cases, improves resource allocation compared
with the Condor DAGMan scheduler by taking the relations between jobs into account. With this
heuristic, the scheduling time remains close to that of Condor DAGMan.</p>
    </sec>
    <sec id="sec-2">
      <title>3. Static-dynamic algorithm</title>
      <p>We have developed an algorithm with static and dynamic stages. In the static stage, we assign
priorities to workflow jobs according to our proposed heuristic. These priorities can then change
dynamically depending on the readiness of the input data for module execution.</p>
      <sec id="sec-2-1">
        <title>A. Model Workflow</title>
        <p>As an example, consider the workflow shown in Figure 1 as a bipartite DAG with two disjoint sets
of vertices: the parameters d1, d2, …, d11 and the modules m1, m2, …, m10. Arcs reflect data transfers
between modules. Each ith module corresponds one-to-one to the ith job.</p>
        <p>Figure 1. Model workflow represented as a bipartite DAG of the parameters d1, …, d11 and the modules m1, …, m10.</p>
        <p>The parameter d1 represents the initial data. The parameters d6, d10, and d11 are target parameters
whose values must be computed. The workflow is weakly structured with respect to decomposition into a
set of micro-workflows.</p>
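        <p>A bipartite workflow such as the one in Figure 1 can be stored as two maps from each module to its input and output parameters. The sketch below assumes a small hypothetical workflow rather than the one in Figure 1 (whose arcs are not listed in the text), and it also illustrates the asynchronous launch rule: a module becomes ready as soon as all of its input parameters have been computed. The names INPUTS, OUTPUTS, ready_modules(), and run_in_waves() are introduced here for illustration only.</p>

```python
# Hypothetical four-module workflow (not Figure 1): module -> parameters.
INPUTS = {
    "m1": {"d1"},
    "m2": {"d2"},
    "m3": {"d3"},
    "m4": {"d4", "d5"},
}
OUTPUTS = {
    "m1": {"d2", "d3"},
    "m2": {"d4"},
    "m3": {"d5"},
    "m4": {"d6"},
}


def ready_modules(available: set[str], done: set[str]) -> list[str]:
    """Modules not yet executed whose inputs are all available."""
    return sorted(
        m for m, ins in INPUTS.items()
        if m not in done and ins.issubset(available)
    )


def run_in_waves(initial: set[str]) -> list[list[str]]:
    """Simulate asynchronous execution, assuming unlimited resources."""
    available, done, waves = set(initial), set(), []
    while True:
        wave = ready_modules(available, done)
        if not wave:
            return waves
        for m in wave:
            done.add(m)
            available |= OUTPUTS[m]  # outputs become inputs for later modules
        waves.append(wave)


print(run_in_waves({"d1"}))  # [['m1'], ['m2', 'm3'], ['m4']]
```

        <p>The waves only reflect dependency levels; with limited resources, a scheduler must also decide which ready module runs first, which is the role of job priorities.</p>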
      </sec>
      <sec id="sec-2-2">
        <title>B. Heuristic</title>
        <p>The proposed heuristic analyzes causal relations between modules in the static stage of the
algorithm. The execution of each module causes the subsequent execution of a certain number of
modules and a certain number of parameter transfers. This analysis determines the priorities of the
jobs that execute the modules.</p>
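        <p>One way to count the modules and parameters whose processing a module's execution causes is to walk the DAG downstream from that module. The sketch below is only an assumption about how exact values could be computed on a small hypothetical workflow: it takes na as the number of descendant modules and ntd as the number of distinct parameters produced by the module and its descendants. The paper itself uses estimates of these characteristics obtained with the algorithm from [16], and the names WORKFLOW, descendants(), and caused_counts() are hypothetical.</p>

```python
from functools import lru_cache

# Hypothetical workflow: module -> (input parameters, output parameters).
WORKFLOW = {
    "m1": ({"d1"}, {"d2", "d3"}),
    "m2": ({"d2"}, {"d4"}),
    "m3": ({"d3"}, {"d5"}),
    "m4": ({"d4", "d5"}, {"d6"}),
}


def consumers(param: str) -> set[str]:
    """Modules that take the given parameter as an input."""
    return {m for m, (ins, _) in WORKFLOW.items() if param in ins}


@lru_cache(maxsize=None)
def descendants(module: str) -> frozenset[str]:
    """All modules whose launch is caused, directly or transitively, by module."""
    result: set[str] = set()
    for param in WORKFLOW[module][1]:
        for child in consumers(param):
            result.add(child)
            result |= descendants(child)
    return frozenset(result)


def caused_counts(module: str) -> tuple[int, int]:
    """Return (na, ntd) under this sketch's assumed definitions."""
    later = descendants(module)
    produced = set(WORKFLOW[module][1])
    for m in later:
        produced |= WORKFLOW[m][1]
    return len(later), len(produced)


for m in WORKFLOW:
    print(m, caused_counts(m))
# m1 (3, 5)
# m2 (1, 2)
# m3 (1, 2)
# m4 (0, 1)
```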
        <p>Job priorities are assigned starting with the modules on which no other module depends. We assign
them using a multi-criteria lexicographic rule based on the following four characteristics and their
optimality conditions [15]:</p>
        <list list-type="bullet">
          <list-item>
            <p>na → max, the number of modules whose future launches are caused by the execution of the module;</p>
          </list-item>
          <list-item>
            <p>ntd → max, the number of parameters whose future transfers are caused by the execution of the module;</p>
          </list-item>
          <list-item>
            <p>np → max, the sum of the priorities of the modules whose future launches are caused by the execution of the module;</p>
          </list-item>
          <list-item>
            <p>nrd → min, the number of parameters received by the module.</p>
          </list-item>
        </list>
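        <p>A lexicographic rule of this kind can be expressed as a single sort key: modules are compared by na first, then by ntd, then by np, and only then by nrd, maximizing the first three characteristics and minimizing the last. In the sketch below, the fields n_a, n_td, n_p, and n_rd stand for na, ntd, np, and nrd; the criterion values are hypothetical and are not taken from Table I.</p>

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Criteria:
    """Characteristics of one module used by the lexicographic rule."""
    name: str
    n_a: int    # modules whose launch is caused later (maximize)
    n_td: int   # parameter transfers caused later (maximize)
    n_p: float  # sum of priorities of the caused modules (maximize)
    n_rd: int   # parameters received by the module (minimize)


def lexicographic_key(c: Criteria) -> tuple:
    # Tuples compare element by element, so negating the maximized
    # criteria turns the whole rule into one ascending sort.
    return (-c.n_a, -c.n_td, -c.n_p, c.n_rd)


def rank(group: list[Criteria]) -> list[str]:
    """Module names ordered from the highest to the lowest priority."""
    return [c.name for c in sorted(group, key=lexicographic_key)]


# Hypothetical group of modules that needs tie-breaking.
group = [
    Criteria("mA", n_a=2, n_td=3, n_p=1.0, n_rd=1),
    Criteria("mB", n_a=2, n_td=3, n_p=1.0, n_rd=2),
    Criteria("mC", n_a=3, n_td=1, n_p=0.5, n_rd=2),
]
print(rank(group))  # ['mC', 'mA', 'mB']
```

        <p>Here mC wins on the first characteristic alone, while mA and mB tie on the first three, so the smaller number of received parameters decides in favor of mA.</p>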
        <p>Unlike the lexicographic method described in [15], we use estimates of the characteristic values
instead of the values themselves. To obtain these estimates, we apply the original algorithm
proposed in [16].</p>
        <p>Table I presents the priorities assigned to the jobs of the workflow shown in Figure 1. The
lexicographic rule was applied three times, to the groups of modules {m5, m9, m10}, {m6, m7}, and
{m2, m3}.</p>
        <p>The general job scheduling scheme is the main algorithm, implemented by the function
JobScheduling(). This scheme includes the following operations:</p>
        <list list-type="bullet">
          <list-item>
            <p>finding available resources that can be allocated;</p>
          </list-item>
          <list-item>
            <p>determining ready jobs, whose inputs have already been computed;</p>
          </list-item>
          <list-item>
            <p>allocating resources to the ready jobs;</p>
          </list-item>
          <list-item>
            <p>preparing data transfers to the allocated resources.</p>
          </list-item>
        </list>
        <p>These operations are implemented by the functions ResourceMonitoring(), JobMonitoring(),
ResourceAllocation(), and DataTransferPreparation(), respectively. We also use the auxiliary functions
GetDedicatedResources(), ResourseAvailable(), JobReady(), CheckPriority(), PrioritiesChanging(),
ReadyJobAllocation(), and JobAllocationPrediction(), which are not discussed in detail in this paper.</p>
        <p>Simplified Algorithms 1-5 for the key functions (resource monitoring, job monitoring, data transfer
preparation, resource allocation, and job scheduling) are given below. The monitoring system is
described in [16]. Data transfers for a module are prepared only when the module is guaranteed to be
launched on the allocated resource in accordance with the priority of its job.</p>
        <p>The variables used in these algorithms have the following meaning:</p>
        <list list-type="bullet">
          <list-item>
            <p>wf is the workflow DAG;</p>
          </list-item>
          <list-item>
            <p>r = (r1, r2, …, rk) is an integer vector of dedicated resources, where k is the number of resources;</p>
          </list-item>
          <list-item>
            <p>v = (v1, v2, …, vk) is a Boolean vector of resource states, where vi = 1 (vi = 0) means that the ith resource is available (not available);</p>
          </list-item>
          <list-item>
            <p>w = (w1, w2, …, wl) is a Boolean vector of job states, where wi = 1 (wi = 0) means that the ith job is ready (not ready) and l is the number of jobs;</p>
          </list-item>
          <list-item>
            <p>p = (p1, p2, …, pl) is a real vector of job priorities;</p>
          </list-item>
          <list-item>
            <p>A is a Boolean l × k matrix of resource allocation for the ready jobs, where aij = 1 (aij = 0) means that the jth resource is allocated (not allocated) to the ith job;</p>
          </list-item>
          <list-item>
            <p>X is a Boolean m × k matrix of parameter placement on resources, where xij = 1 (xij = 0) means that the ith parameter is placed (not placed) on the jth resource and m is the number of parameters;</p>
          </list-item>
          <list-item>
            <p>Y is a Boolean m × k matrix of parameter transfers, where yij = 1 (yij = 0) means that the ith parameter will be transferred (will not be transferred) to the jth resource.</p>
          </list-item>
        </list>
        <p>Algorithms 1-5 form the basis of the proposed static-dynamic algorithm. This
algorithm is implemented in the Orlando Tools framework used to develop distributed applied software
packages [17].</p>
        <sec id="sec-2-2-1">
          <title>Algorithm 1 for resource monitoring and Algorithm 2 for job monitoring</title>
          <preformat>function ResourceMonitoring()
  // Getting a list of dedicated resources
  r ← GetDedicatedResources()
  // Initializing resource states
  v ← (0, 0, …, 0)
  // Determining the current state of each resource
  for i = 1..k do
    if ResourseAvailable(ri) = 1 then
      vi = 1
    else
      vi = 0
    end if
  end do
  return v
end function</preformat>
          <preformat>function JobMonitoring()
  // Initializing job states
  w ← (0, 0, …, 0)
  // Determining the job states (ready or not ready)
  for i = 1..l do
    if JobReady(i) = 1 then
      wi = 1
    end if
  end do
  return w
end function</preformat>
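          <p>Algorithms 1 and 2 translate almost line for line into Python. In this sketch, the probes that the paper leaves unspecified (ResourseAvailable() and JobReady()) are passed in as callables named is_available and is_ready, which are hypothetical stand-ins; 0-based indices replace the 1-based ones of the pseudocode.</p>

```python
from typing import Callable, Sequence


def resource_monitoring(
    resources: Sequence[str], is_available: Callable[[str], bool]
) -> list[int]:
    """Algorithm 1: v[i] = 1 if the ith dedicated resource is available."""
    return [1 if is_available(r) else 0 for r in resources]


def job_monitoring(num_jobs: int, is_ready: Callable[[int], bool]) -> list[int]:
    """Algorithm 2: w[i] = 1 if all inputs of the ith job are ready."""
    return [1 if is_ready(i) else 0 for i in range(num_jobs)]


# Hypothetical probes: VM2 is busy; jobs 0 and 2 have all their inputs.
busy = {"VM2"}
ready_jobs = {0, 2}
v = resource_monitoring(["VM1", "VM2", "VM3"], lambda r: r not in busy)
w = job_monitoring(4, lambda i: i in ready_jobs)
print(v, w)  # [1, 0, 1] [1, 0, 1, 0]
```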
        </sec>
        <sec id="sec-2-2-2">
          <title>Algorithm 3 for data transfer preparation and Algorithm 4 for resource allocation</title>
          <preformat>function DataTransferPreparation(A, X)
  // Initializing data transferring
  Y ← (0, 0, …, 0; 0, 0, …, 0; …; 0, 0, …, 0)
  for i = 1..l do
    for j = 1..k do
      // Analyzing the jth resource allocation for the ith module
      if aij = 1 then
        // Getting inputs of the ith module
        b ← GetInputs(i)
        for q = 1..m do
          // Checking the qth parameter placement on the jth resource
          if bq = 1 ∧ xqj = 0 then
            // Preparing the qth parameter transfer to the jth resource
            yqj = 1
          end if
        end do
      end if
    end do
  end do
  return Y
end function</preformat>
          <preformat>function ResourceAllocation(wf, v, w, p, X)
  // Initializing resources allocation
  A ← (0, 0, …, 0; 0, 0, …, 0; …; 0, 0, …, 0)
  // If the priority of the ready job is less than
  // the priorities of the unready jobs,
  // then changing the priorities
  for i = 1..l do
    if CheckPriority(wf, i, p) then
      p ← PrioritiesChanging(w, p)
    end if
  end do
  // Allocation of the ready jobs
  A ← ReadyJobAllocation(wf, v, w, p, A)
  // Predicting the allocation for the unready jobs
  // taking into account the job priorities and
  // data placement
  A ← JobAllocationPrediction(wf, v, w, p, A, X)
  return (p, A)
end function</preformat>
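          <p>Algorithm 3 depends only on the matrices A and X and on the module inputs, so it can be sketched directly. The version below assumes that GetInputs(i) returns the ith row of a Boolean l × m matrix B of module inputs (B is a name introduced here), uses 0-based indices, and writes the transfer flag to y[q][j], so that Y has the same m × k shape as X.</p>

```python
Matrix = list[list[int]]


def data_transfer_preparation(A: Matrix, X: Matrix, B: Matrix) -> Matrix:
    """Algorithm 3: mark the inputs that must be sent to allocated resources.

    A[i][j] = 1 if resource j is allocated to job i (l x k matrix)
    X[q][j] = 1 if parameter q is already on resource j (m x k matrix)
    B[i][q] = 1 if parameter q is an input of module i (l x m matrix);
              row B[i] plays the role of GetInputs(i)
    """
    m = len(X)
    k = len(X[0]) if X else 0
    Y = [[0] * k for _ in range(m)]
    for i, row in enumerate(A):
        for j, allocated in enumerate(row):
            if allocated == 1:
                b = B[i]
                for q in range(m):
                    # Send only the inputs that are not on resource j yet.
                    if b[q] == 1 and X[q][j] == 0:
                        Y[q][j] = 1
    return Y


# Two jobs, two resources, three parameters.
A = [[1, 0], [0, 1]]          # job 0 -> resource 0, job 1 -> resource 1
X = [[1, 0], [0, 0], [0, 1]]  # where each parameter currently resides
B = [[1, 1, 0], [1, 0, 1]]    # inputs of modules 0 and 1
print(data_transfer_preparation(A, X, B))  # [[0, 1], [1, 0], [0, 0]]
```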
          <p>Algorithm 5 for job scheduling</p>
          <preformat>function JobScheduling(wf, p, X)
  // Getting current states of resources and jobs
  v ← ResourceMonitoring()
  w ← JobMonitoring()
  // No allocation by default
  A ← (0, 0, …, 0; 0, 0, …, 0; …; 0, 0, …, 0)
  // Checking available resources and ready jobs
  if (v1 ∨ v2 ∨ … ∨ vk = 1) ∧ (w1 ∨ w2 ∨ … ∨ wl = 1) then
    // Resources allocation
    (p, A) ← ResourceAllocation(wf, v, w, p, X)
  end if
  // Data preparation
  Y ← DataTransferPreparation(A, X)
  for i = 1..m do
    for j = 1..k do
      xij = xij ∨ yij
    end do
  end do
  return (p, A, X, Y)
end function</preformat>
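          <p>The control flow of Algorithm 5 can be sketched in the same way. Here the monitoring, allocation, and transfer-preparation steps are passed in as callables (the parameter names monitor_resources, monitor_jobs, allocate, and prepare_transfers are hypothetical stand-ins for Algorithms 1 to 4), A defaults to an all-zero matrix when nothing can be allocated, and the placement update x_ij ← x_ij ∨ y_ij is applied element-wise.</p>

```python
from typing import Callable

Matrix = list[list[int]]


def job_scheduling(
    wf,
    p: list[float],
    X: Matrix,
    num_jobs: int,
    num_resources: int,
    monitor_resources: Callable[[], list[int]],
    monitor_jobs: Callable[[], list[int]],
    allocate: Callable[..., tuple[list[float], Matrix]],
    prepare_transfers: Callable[[Matrix, Matrix], Matrix],
):
    """Sketch of Algorithm 5 with the other steps passed in as callables."""
    v = monitor_resources()  # Algorithm 1
    w = monitor_jobs()       # Algorithm 2
    # Assumption: nothing is allocated unless a resource and a job are ready.
    A = [[0] * num_resources for _ in range(num_jobs)]
    if any(v) and any(w):
        p, A = allocate(wf, v, w, p, X)  # Algorithm 4
    Y = prepare_transfers(A, X)          # Algorithm 3
    # x_ij = x_ij OR y_ij: scheduled transfers count as placed from now on.
    X = [[x | y for x, y in zip(x_row, y_row)] for x_row, y_row in zip(X, Y)]
    return p, A, X, Y


# Usage with trivial stand-ins: one job, two resources, two parameters.
p, A, X, Y = job_scheduling(
    wf=None,
    p=[1.0],
    X=[[1, 0], [0, 0]],
    num_jobs=1,
    num_resources=2,
    monitor_resources=lambda: [0, 1],
    monitor_jobs=lambda: [1],
    allocate=lambda wf, v, w, p, X: (p, [[0, 1]]),
    prepare_transfers=lambda A, X: [[0, 1], [0, 0]],
)
print(A, X)  # [[0, 1]] [[1, 1], [0, 0]]
```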
        </sec>
      </sec>
      <sec id="sec-2-3">
        <title>D. Experiment</title>
        <p>In the first example, suppose that we need to execute a flow of workflows in a computing
environment consisting of a set of virtual machines (VMs). Two single-core VMs are allocated to run
each workflow. All workflows have the structure shown in Figure 1.</p>
        <p>This job flow implements parameter sweep computations [18]. Each job is executed with a unique
variant of the source data. Therefore, data sizes, transfer times, and execution times may differ
between jobs.</p>
        <p>To demonstrate how the algorithms operate, we consider a simple example with model time for a
single workflow. Let the execution times of jobs j1, j2, …, j10 be t1 = 2, t2 = 2, t3 = 3, t4 = 2, t5 = 1,
t6 = 2, t7 = 2, t8 = 5, t9 = 2, and t10 = 1. The transfer time of any parameter (d1, d2, …, d11) is assumed
to be 1. These times are proportional to the average computation results obtained when solving a real
problem at the public-access Irkutsk Supercomputer Center [19].</p>
        <p>Figures 2 and 3 show the job schedules produced by the Condor DAGMan algorithm and by the
proposed static-dynamic algorithm. The optimal workflow makespan is 16. With Condor DAGMan, the
makespan is 19, whereas the static-dynamic algorithm reduces it to 17. This is a significant
advantage for such a small example.</p>
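        <p>The makespans quoted above translate into relative numbers as follows; the snippet only restates the arithmetic for the values 16, 19, and 17 given in the text.</p>

```python
optimal, dagman, static_dynamic = 16, 19, 17

# Relative makespan reduction compared with Condor DAGMan.
reduction = (dagman - static_dynamic) / dagman
# Excess over the optimal makespan for each algorithm.
dagman_gap = (dagman - optimal) / optimal
sd_gap = (static_dynamic - optimal) / optimal

print(f"{reduction:.1%} {dagman_gap:.2%} {sd_gap:.2%}")  # 10.5% 18.75% 6.25%
```

        <p>In other words, the static-dynamic schedule closes two thirds of the gap between Condor DAGMan and the optimum, from 3 model-time units to 1.</p>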
        <p>Figures 2 and 3. Job schedules produced by Condor DAGMan and by the static-dynamic algorithm (axis: model time).</p>
        <p>Both algorithms start in the same way. The modules m1, m2, and m4 are launched sequentially on
VM1 in accordance with the inter-module relations. Meanwhile, the input data for the module m3 are
prepared on VM2.</p>
        <p>After that, the algorithms diverge. Condor DAGMan launches the module m5 on VM1, whereas our
algorithm selects the module m6.</p>
        <p>According to the heuristic used by our algorithm, the module m6 has a higher priority than the
module m5: more modules depend on m6 than on m5 in the subsequent computations of the workflow.</p>
        <p>Having launched m5, Condor DAGMan has to delay the execution of m6, because the input data of m6
must first be prepared on VM2.</p>
        <p>In this example, our algorithm also shortens the workflow runtime by transferring the input data
for the modules m8 and m9 in advance. Figures 4 and 5 show these transfers at the corresponding times.</p>
        <p>The parameters d4 and d8 are transferred even though the modules that need them are not yet ready
to run. The parameter d4 for the module m8 is transferred while the module m6 is executing (Figure 4),
because m8 will be launched on the VM where m6 is running.</p>
        <p>Figures 4 and 5. Preliminary transfers of the input data for the modules m8 and m9 (axis: model time).</p>
        <p>The decision to transfer the parameter d8 (Figure 5) is also straightforward, since the decision to
launch the module m10 has already been made. After that, only the module m9 remains to be run, and the
module m8 will compute the parameter d9 required by m9.</p>
        <p>Consider the problem of selecting a module to launch on a resource that is about to be freed. The
JobAllocationPrediction() function selects the module to run on a resource once it becomes free. This
check is performed for all resources on which workflow modules are running. The candidate set examined
by this check can include modules that are not yet ready to run (for example, by model time 8 in
Figure 5).</p>
        <p>The JobAllocationPrediction() function checks how the completion of the running module, and the
resulting release of its resource, will affect the readiness of the remaining unexecuted modules. It can
also assign to resources modules that are not yet ready for launch. Such an assignment allows the
necessary input data to be prepared by the time these modules are launched.</p>
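        <p>The paper does not detail JobAllocationPrediction(), so the following is only a hypothetical sketch of the idea described above. For a resource whose running module is about to finish, it looks for unexecuted modules whose inputs will all exist once that module completes, picks the one with the highest priority, and lists the existing inputs that could be sent to the resource in advance. The function predict_next() and the sample data are assumptions introduced for illustration.</p>

```python
def predict_next(running, inputs, outputs, available, placed, priority, done):
    """Hypothetical look-ahead for the resource that executes `running`.

    inputs, outputs: module -> set of parameters
    available: parameters that have already been computed
    placed: parameters already stored on this resource
    priority: module -> job priority (larger means more urgent)
    done: modules that have already been executed or started
    """
    # Parameters that will exist once the running module completes.
    after = available | outputs[running]
    candidates = [
        m for m in inputs
        if m != running and m not in done and inputs[m].issubset(after)
    ]
    if not candidates:
        return None, set()
    chosen = max(candidates, key=lambda m: priority[m])
    # Inputs that already exist but are missing on this resource can be
    # sent ahead of time, while `running` is still executing.
    prefetch = inputs[chosen].intersection(available) - placed
    return chosen, prefetch


inputs = {"mX": {"d1"}, "mY": {"d1", "d2"}, "mZ": {"d2", "d3"}, "mW": {"d4"}}
outputs = {"mX": {"d2"}}
print(predict_next(
    running="mX", inputs=inputs, outputs=outputs,
    available={"d1", "d3"}, placed={"d1"},
    priority={"mY": 1, "mZ": 2, "mW": 5}, done={"mX"},
))  # ('mZ', {'d3'})
```

        <p>In this toy case, mW has the highest priority but is not a candidate, because its input d4 will not exist yet. The input d2 of mZ will be produced on this very resource by mX, so only d3 needs to be sent ahead, in the same spirit as the transfer of d4 to the VM running m6.</p>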
        <p>The example includes one more background data transfer: the parameter d3 is transferred to the
module m5. However, this transfer does not affect the workflow runtime.</p>
        <p>In theory, the input data for modules could be prepared by redundantly sending every computed
parameter to all available computing resources. However, this approach has significant drawbacks. In
practice, data transfers can degrade computing performance and slow down other transfers of current
data. The negative impact of such redundant sending typically grows with the number of resources used
and the number of modules executed. Therefore, redundant data sending is highly undesirable for
scalable workflows that target a large number of resources.</p>
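        <p>A rough count shows why redundant sending scales poorly. Assuming, purely for illustration, that every computed parameter is broadcast to all other resources, the number of transfers grows linearly with the number of resources k, whereas targeted transfers depend only on where the consuming modules actually run. The function names and the sample placement below are hypothetical.</p>

```python
def broadcast_transfers(num_params: int, k: int) -> int:
    # Every computed parameter goes to each of the other k - 1 resources.
    return num_params * (k - 1)


def targeted_transfers(producer_at: dict, consumers_at: dict) -> int:
    """Count (parameter, resource) pairs actually needed by consumers.

    producer_at: parameter -> resource where it is computed
    consumers_at: parameter -> set of resources where its consumers run
    """
    return sum(
        len(resources - {producer_at[param]})
        for param, resources in consumers_at.items()
    )


# Hypothetical placement of three parameters on two resources.
producer_at = {"d2": "VM1", "d3": "VM1", "d4": "VM2"}
consumers_at = {"d2": {"VM1"}, "d3": {"VM2"}, "d4": {"VM2"}}
print(targeted_transfers(producer_at, consumers_at))       # 1
print([broadcast_transfers(3, k) for k in (2, 8, 32)])     # [3, 21, 93]
```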
        <p>For the job flow as a whole, the proposed algorithm reduces the total computation time by more
than 11%, taking into account the variation in module execution times across jobs. At the same time,
the scheduling time of the proposed algorithm does not exceed that of Condor DAGMan.</p>
        <p>The next example addresses a workflow whose important feature is the use of parallel data lists
(Figure 6).</p>
        <p>Let the execution times of jobs j1, j2, and j3, which execute the modules m1, m2, and m3, be
t1 = 1, t2 = 2, and t3 = 1. The transfer time of any data item (d1, d2, d3, d4) in Figure 6 is assumed
to be 1.</p>
        <p>The parameters d2 and d3 are parallel data lists. Each element d2i of the list d2 is processed by
the ith instance of the module m2 (Figure 7). The result of executing m2i is stored in the element d3i
of the list d3.</p>
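        <p>The element-wise semantics of a parallel data list can be illustrated with a thread pool: each element of d2 is handed to its own instance of m2, and the ith result is stored as the ith element of d3. The body of m2 below is a hypothetical placeholder, and the thread pool merely stands in for launching module instances on separate resources.</p>

```python
from concurrent.futures import ThreadPoolExecutor


def m2(element: int) -> int:
    # Hypothetical module body; a real m2 would be an applied program.
    return element * element


def run_parallel_list(d2: list[int], workers: int = 4) -> list[int]:
    """Apply one instance of m2 per element of d2 and collect d3."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # Executor.map yields results in input order, so d3[i] always
        # corresponds to d2[i], even if instances finish out of order.
        return list(pool.map(m2, d2))


d2 = [1, 2, 3, 4]
d3 = run_parallel_list(d2)
print(d3)  # [1, 4, 9, 16]
```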
        <p>Figures 8 and 9 show the simulation of both algorithms for this example. As in the first example,
scheduling data transfers in advance reduces the overhead of sending input data to modules. In this
example, the proposed algorithm reduces the workflow makespan by 13%.</p>
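        <p>Figures 6 and 7. Workflow with parallel data lists (d1, m1, d2, m2, d3, m3, d4) and the instances of the module m2 processing the elements of the list d2.</p>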
      </sec>
    </sec>
    <sec id="sec-3">
      <title>4. Conclusions</title>
      <p>In this paper, we present a new algorithm for static-dynamic scheduling of asynchronous
computations in distributed applied software packages. The algorithm targets heterogeneous
distributed environments.</p>
      <p>The experimental results demonstrate the advantages of the proposed algorithm for different
workflow types. Under uncertainty in module execution times, the algorithm determines the order of
module launches more rationally than the other scheduling approaches considered.</p>
      <p>The advantages of the proposed algorithm for managing asynchronous computations in distributed
environments have been demonstrated on specific examples. In future work, we hope to obtain theoretical
estimates that support these advantages for a broader range of cases.</p>
      <p>We also plan to use additional heuristics based on analyzing module execution times on test data
at environment nodes within the continuous integration, delivery, and deployment of package software.</p>
    </sec>
    <sec id="sec-4">
      <title>Acknowledgments</title>
      <p>The study was supported by the Ministry of Science and Higher Education of the Russian Federation,
project no. 121032400051-9 «Technologies for the development and analysis of subject-oriented
intelligent group control systems in non-deterministic distributed environments».</p>
    </sec>
  </body>
  <back>
    <ref-list />
  </back>
</article>