=Paper=
{{Paper
|id=Vol-3002/paper22
|storemode=property
|title=Convergence Verification of Declarative Distributed Systems
|pdfUrl=https://ceur-ws.org/Vol-3002/paper22.pdf
|volume=Vol-3002
|authors=Diego Calvanese,Francesco Di Cosmo,Jorge Lobo,Marco Montali
|dblpUrl=https://dblp.org/rec/conf/cilc/CalvaneseC0M21
}}
==Convergence Verification of Declarative Distributed Systems==
Convergence Verification of Declarative
Distributed Systems?
Diego Calvanese1 , Francesco Di Cosmo1 , Jorge Lobo2 , and Marco Montali1
1
Free University of Bozen-Bolzano
2
University of Pompeu Fabra
Abstract. Logic-based languages, such as Datalog and Answer Set Pro-
gramming, have been recently put forward as a data-centric model to
effectively specify and implement network services and protocols, see-
ing them as dynamic systems of distributed computational nodes where
each node evolves an internal database and exchanges data with the other
nodes of the network. This approach provides the basis for declarative
distributed computing. However, a rigorous, comprehensive characteri-
zation of the decidability and complexity of verification in declarative
distributed systems is yet to come. This paper charts the decidability
border of the verification of convergence properties, considering the case
where the network is a fixed connected graph, nodes can incorporate
fresh data from the external world into the system, and can communi-
cate asynchronously by means of reliable but unordered channels.
1 Introduction
In the past years we have seen how declarative database query languages, such as
Datalog, can naturally be used to specify and implement network services and
protocols [19]. The approach, referred to as declarative networking [5], makes
the specifications of complex network protocols concise, intuitive, and directly
executable through distributed query processing algorithms [25]. The compi-
lation of the rules constituting the specification into actual implementations
performs well when compared with imperative C/C++ implementations of the
same protocols [20]. Applications for declarative networking go far beyond net-
work protocols, and languages and techniques developed in this setting provide
the basis for declarative distributed computing. This paradigm has been used for
security and provenance in distributed query processing [29,30], in the analysis
of asynchronous event systems [2], and as the core of the Webdam language
for distributed Web applications [3]. We refer to these systems as declarative
distributed systems (ddss).
There are several variants of concrete languages for specifying ddss [20,4,3,22],
but their common denominator is data-centricity: computations in a single node
are limited to evaluations of queries on a relational database (DB), and messages
?
Copyright © 2021 for this paper by its authors. Use permitted under Creative
Commons License Attribution 4.0 International (CC BY 4.0).
passed between nodes are snippets of DBs, providing a close correspondence be-
tween the programs and a formal specification in logic of their computation. This
facilitates the development of program analysis tools [26,22]. However, in spite of
several studies on the foundations of ddss [17,8], a rigorous, comprehensive char-
acterization of the decidability and complexity of verification in such systems, is
yet to come. On the one hand, verification techniques and tools for ddss have
been exploited in various settings, but providing only an empirical/experimental
assessment [26,22,12,13]. On the other hand, formal models for ddss have been
mainly developed to study their ability to compute queries in a distributed man-
ner, and not to assess their temporal/dynamic evolution [8,7,6]. Instead, in this
paper we provide a step towards a formal, systematic characterization of verifi-
cation of ddss by focusing on convergence properties. We show that, in general,
verification is undecidable even for specific convergence properties and extremely
limited, single-node ddss. To tame this strong negative result, we leverage on
the notion of state-boundedness [11,9], and detail the decidability frontier of
verification when a bound is imposed on the system data-sources.
In the next Section 2 we introduce the dds model, provide its execution
semantics, and define the verification problem over convergence properties. In
Section 3 we chart the decidability boundary of verification of convergence. Fi-
nally, Section 4 provides the conclusions.
2 Declarative Distributed Systems
The general computational model of ddss can be described as a network of
uniquely identified nodes, each running an input/output state machine, where
outputs of some machines become inputs of others [21]. We assume familiarity
with Datalog and the stable model semantics [15] and adopt the standard con-
ventions: variables are denoted with uppercase letters, constants by lowercase
letters, and ‘ ’ is used as a placeholder for an anonymous variable (i.e., one that
does not appear elsewhere in the rule).
2.1 Computational Model in a Node
A state in the state machine of a node is represented by a (relational) DB. As
customary, a DB defines a set of relations conforming to a given schema. A DB
schema is a finite set of relation schemas R/n, with a relation name R and an
arity n, representing the number of components (or attributes) it contains. By
a slight abuse of notation, sometimes we drop the arity and interchangeably use
the same symbol to indicate the relation schema and its name. We fix a countable
data domain ∆ denoting an infinite set of constants. An instance of a relation
schema R/n is a finite set of n-tuples over ∆, and a DB is the union of relation
instances over its schema. Given a DB instance I, we denote by adom(I) the
active domain of I, that is the (finite) set of constants explicitly appearing in I.
State transitions occur when the node receives inputs, in the form of DBs,
from external components (such as applications running in the node, or humans),
and/or from state machines running in other nodes. A state change can also
produce an output in the form of a DB that can be delivered to another node
fact by fact. Thus, every node has an input schema I, a state schema S, and
a transport schema T . The latter is used to communicate between nodes. Let
I, S, and T denote all possible instances of I, S, and T , respectively. The state
transition mapping in a node is a subset of S × T × I × S × T: given a pair (S , T )
of state and transport DBs, and an input DB I , the transition results in a new
pair (Sn , Tn ) of state and transport DBs.
Specifically, that mapping is specified by means of d2c, a declarative pro-
gramming language introduced in [22], which is an extension of plain Datalog,
enhanced with process communication and state changes capabilities. A state
transition mapping in d2c is defined by rules of the form:
H if L1 , . . . , Ln , prev Ln+1 , . . . , Lm , C
Like in Datalog, H is the head atom, but in d2c it is possibly annotated with
a term of the form @t, where t is a variable or constant from a fixed domain in
the language used to identify nodes (concretely, such domain would correspond
to objects such as IP addresses or URLs). The Li s are literals (i.e., atoms or
negated atoms), again possibly annotated with a term of the form @t. Finally,
C is a set of (in)equality constraints over variables and constants.
The predicate names follow the standard correspondence of predicates and
DB tables made by Datalog. All variables appearing in H, in C, or in any
negated atom inside a rule must also appear in a non-negated atom among the
Li s of the same rule3 . The name of annotated atoms must correspond to relation
names in the transport schema T of the node, and only names that correspond
to transport or state tables can appear in H. Informally, a ground instance
h if l1 , . . . , ln , prev ln+1 , . . . , lm , c of a rule says that h is in the current state
or is an output of the current state if l1 , . . . , ln are true in the current state,
ln+1 , . . . , lm were true in the previous state, and c is valid. As in Datalog with
negation, we make the usual assumption on the stratification of negated atoms
in l1 , . . . , ln w.r.t. h, so that the transition mapping can be computed using the
standard Datalog fixpoint evaluation of Datalog. Notice that the literals in the
prev scope do not concur to determine stratification, since they are over a
fixed database already computed in the previous computation step. Also anno-
tated predicates are not involved in stratification, since they address incoming
messages received in the form of an extensional DB fact. The meaning of an
annotation depends on whether it appears in the head or the body of a rule: in
the former case, it indicates the destination of the transport tuple, in the latter
it points to the source.
3
We can relax this requirement for negated atoms containing anonymous variables.
~ . . . , ) appearing in a rule where
Consider in particular a negated atom not P (X,
~
variables X also appear in positive atoms. Such an atom can be replaced by
~ where N is a fresh predicate, provided we introduce the additional rule
not N (X),
~ if P (X,
N (X) ~ , . . . , ).
Example 1. Consider the input predicate echo/2, state predicate alive/1, and
transport predicate say/1. Rule say(M)@D if echo(M, D) models that the node
sends m to the node with ID d (if it is a node neighbor) when the corresponding
echo(m, d) fact is given as input. M and D are variables, which in this case are
bound to constants m and d respectively. Rule alive(S) if say(M)@S models
that the node adds to its state the information that node s is alive, whenever the
transport tuple say(m) is received from node s. Rule say(M)@Src if say(M)@Src
“echoes” the say tuple back to the source node. t
u
To support also non-deterministic transitions, D2C features the special pred-
icate choice by Saccà and Zaniolo [27], where choice(X, Y) is a positive atom
among the literals L1 , . . . , Ln . Variables X and Y must appear also in another
positive atom among the Li s, and once we fix X, a single value for Y is chosen
by picking it inside the set of all values that can substitute it, so as to enforce
a functional dependency X −→ Y. Also the variant choice(Y) is admitted, to
enforce a functional dependency with trivial domain. It is known that the data
complexity of finding a stable model of a Datalog program with choice remains
polynomial [16].
2.2 The Network Model
A dds relies on an underlying network N of communicating nodes, each running
a d2c program. In distributed computing, N is typically represented as a graph
hV, Ai, where V are the computation nodes, and the arcs in A reflect the ability
to communicate according to the physical network configuration. Directed arcs
denote non-symmetric communication.
There is a complex spectrum of different network models, depending on the
topology of the graph, the degree of mobility of nodes, and on which extent the
network may vary over time. Here we consider networks with (i) fixed topol-
ogy; (ii) bidirectional communication channels; (iii) strongly connected nodes;
(iv) the ability for every node to communicate with itself. This class of networks
can be represented by fixed undirected, connected graphs, where each node has a
self-loop. From now on, we always assume that the network is in this class. To
make nodes aware of their own name and that of their neighbors, each node has
the following rules:
my name(M) if prev my name(M).
neighbor(N) if prev neighbor(N).
This information is read-only, so no other rule can use my name and neighbor
in its head.
2.3 DDS Formal Model
We now formalize ddss adopting homogeneous nodes, i.e., all nodes run the same
program. This also means that the local DB schemas are the same for all nodes.
Still, the behavior of different nodes might diverge over time, depending on:
(i) their location in the network, (ii) the presence of non-deterministic choices
in the program, and (iii) the data obtained from the interaction with other nodes
and with the external world. A dds M is a tuple hN , I, T , S, P, D0 i, where:
• N is the network graph (obeying to the assumptions of Section 2.2);
• I, T , and S respectively denote the input, transport, and state schemas of
every node in M;
• P is the d2c program run by every node in M;
• D0 is a local state DB over S representing the initial state of each node, and so
that it assigns no tuples to my name and neighbor (these are in fact implicitly
set differently for each node, depending on the network topology); its active
domain is denoted by ∆0 , while its extension with node names is denoted by
∆0,M
Commonly, the execution semantics of dynamic systems over relational data
is given in terms of a relational transition system (RTS), i.e., a transition sys-
tem where each state is labeled by a DB, representing the configuration of the
current memory of the system in that state [28]. However, in our distributed
setting, such a global memory should account for two peculiar local aspects.
On the one hand, it needs to store the current local configuration of nodes in
the network, and so it contains one DB per node. On the other hand, it must
track the state of the network in terms of messages being exchanged (and not
yet processed by their recipient nodes). The representation of this latter aspect
depends on the chosen communication model and, in turn, how communication
channels operate. Hence, we represent communication channels using a generic
data structure CT defined over the transport schema T . Given a pair of con-
nected nodes i and j in the network, each local channel configuration stores
the state of the communication channel linking i and j, by instantiating CT
to store the pending messages present in the network that have been sent by i
and have not yet been received/processed by j. We denote by C the set of all
possible instantiations of CT . Once the communication model is fixed, this ab-
stract data structure is grounded into a specific one, which must suitably reflect
the functioning of the communication mode in terms of ordering and reliabil-
ity. For example, queueT indicates an order-preserving, reliable communication
channel over pairs of nodes, while multisetT represents an unordered, reliable
communication channel where messages sent in a given order may be processed
in reverse.
To account for those local aspects, we reorganize the global memory into
many local ones attached to nodes and channels. Technically, given a dds hN , I, T , S, P, D0 i
with N = hV, Ai, and given a data structure CT over T for communication, the
execution semantics of the dds under CT is given via a so-called distributed RTS
(DRTS) Υ of the form hN , ∆, S, T , Σ, σ0 , ndb, nch, ⇒i, where:
• Σ is a (possibly infinite) set of states;
• σ0 ∈ Σ is the initial state;
• ndb is a function that, given a state σ ∈ Σ and a node n ∈ V , returns a
corresponding DB instance of S over ∆ for n;
• nch is a function that, given a state σ ∈ Σ and two nodes n1 , n2 ∈ V such that
hn1 , n2 i ∈ A, returns an instance of CT storing the pending messages from n1
to n2 ;
• ⇒ ⊆ Σ × Σ is a transition relation between states.
Υ is serial if, for every state σ ∈ Σ, there exists σ 0 ∈ Σ such that σ ⇒ σ 0 .
Notice that the input schema and databases are not mentioned in the general
DRT S definition, but will be used later to define concrete transition relations.
Moreover, in case the instantiations of CT can be represented by means of a DB,
it is possible to boil down a DRTS to a standard RTS by labeling states with the
disjoint union of all the state and channel DBs mentioned in the DRTS states.
To build a DRTS capturing the execution semantics of a dds M, one has
to choose which communication model is used by M to handle the exchange of
messages in the network, and how does M interact with the external users that
exchange data with the computation nodes. Specifically, we consider the rele-
vant setting where communication is reliable and asynchronous, i.e., messages
are never lost and message exchanges occur independently from each other. Con-
sistently with the fact that each transport atom in a D2C program is attached
to a sender/receiver, each message consists of a single transport fact.
As for user interaction, nodes may receive a new input DB when they start
processing an incoming message. We call ddss obeying to that input-policy inter-
active ddss (iddss). Coupling the processing of input DBs with that of incoming
messages is without loss of generality, since a node may send dummy messages
to itself just to signal that it is ready to process a user input.
2.4 Execution Semantics
Due to the aforementioned asynchronicity and reliability, we can assume that,
at each computation step, only one node reacts to the delivery of an incoming
message. Within the computed result, there may be transport facts labeled with
corresponding destination nodes; these are all simultaneously emitted, and will
be asynchronously received by their respective recipient nodes fact by fact. Since
we assume no guarantees on the order in which messages are received, commu-
nication can be abstractly captured by equipping each node with one message
multiset per neighbor. Hence, from now on, we always assume that the data
structure for communication channel is multiset, and make use of the usual
operators to manipulate and inspect multisets. Relying on multisets instead of
sets is important to capture the fact that two distinct messages exchanged be-
tween the same two nodes and carrying exactly the same payload may be both
on their way to the recipient node.
The dds evolution is then captured by iterating through these steps:
• a non-empty multiset is non-deterministically picked, non-deterministically
extracting a message M .
• the destination node performs a computation step triggered by M , possibly
considering also external input data;
• the node state is updated and the produced messages are inserted in the
corresponding destination multiset.
Formally, given a program P , an input DB I, a previous state DB S, and
a labeled transport tuple t@n, we denote by state(P, I, S, t@n) the new state
database computed by the program P over S∪I∪{t@n}, by transp(P, I, S, t@n, d)
the set of computed output tuples (over the trasnport signature) labeled by @d,
and by transp ↓ (P, I, S, t@n, d) the set of tuples in transp(P, I, S, t@n, d) where
the label @d has been dropped. Let M = hN , I, T , S, P, D0 i be a dds system
whose network is N = hV, Ai. To formalize the execution semantics, we intro-
duce a relation c-stepM that substantiates the generic state transition mapping
introduced in Section 2.1:
Y Y Y Y
c-stepM ⊆ S× C×A×I× S× C
n∈V a∈A n∈V a∈A
Specifically, given S, S 0 ∈ n∈V S, and C, C 0 ∈ n∈V C, a channel (s, d) ∈ A,
Q Q
and an input database I ∈ I, we have that hS, C, (s, d), I, S 0 , C 0 i ∈ c-stepM if
and only if there exists a message tuple t ∈ C(s,d) such that:
(
0 state(P, I, S, t@s) if n = d
Sn =
Sn otherwise
C(s,d) \ {t}
if n = s and m = d
0
C(n,m) = C(d,m) ∪ transp ↓ (P, I, Sn , t@d, m) if n = d
C(n,m) otherwise
int
Finally, we define the transition system of M, written ΥM , as the DRTS
hN , ∆, S, T , Σ, σ0 , ndb, nch, ⇒i, where:
Q Q
– Σ ⊂ n∈V S× a∈A C and, for each σ ∈ Σ of the form σ = ((Sn )n∈V , ((Cc )c∈A )),
ndb(σ, n) = Sn and nch(σ, s, d) = C(s,d) ;
– σ0 = ((D0 )n∈V , (Cc )c∈A ), where C(s,d) = ∅ if s 6= d and C(s, d) = {start}
otherwise, where start is a special 0-ary transport tuple used to guarantee
at least one computation step to each node;
– The extensions of Σ and ⇒ are defined by simultaneous induction as follows:
1. σ0 ∈ Σ;
2. if (S, C) ∈ Σ, then, for each hS, C, (s, d), I, S 0 , C 0 i ∈ c-stepM , it is true
that (S 0 , C 0 ) ∈ Σ and (S, C) ⇒ (S 0 , C 0 ).
3. if (S, C) ∈ Σ and, for each c ∈ A, Cc = ∅, then (S, C) ⇒ (S, C).
Notice that ⇒ is guaranteed to be serial.
2.5 Convergence Properties
Convergence is a generalization of termination for ddss, indicating that the dis-
tributed computation run by the whole dds eventually reaches a stable situation
where all nodes are quiescent, i.e., do not change anymore their state DBs. This
typically occurs when no messages are exchanged. However, we want to rule out
those cases in which quiescence is reached because one or more nodes stop their
computation due to an error (e.g., because the node has received an unexpected
message/input, or has entered an undesired state). We assume that a node de-
clares that it is faulty by inserting the special flag error in its state. Under this
assumption, d2c programs employ error to indicate under which circumstances
a node becomes faulty.
Convergence properties are then defined by mixing two dimensions: (i) num-
ber of faulty nodes in a run, with the two extreme cases of total (no faulty
node) vs partial (at least one non-faulty node) correctness; (ii) quantification
over runs, considering the case in which the dds sometimes (i.e., for at least one
run) vs always (i.e., for all runs) converges. Given a dds M, this gives rise to
four variants of convergence:
– M sometimes converges with total correctness if there exists a run of M
eventually reaching a state where all nodes are quiescent, and none is faulty.
– M sometimes converges with partial correctness if there exists a run of M
eventually reaching a state where all nodes are quiescent, and at least one is
not faulty.
– M always converges with total correctness if every run in which all nodes of
M stay non-faulty eventually converges.
– M always converges with partial correctness if every run in which at least
one node of M stays non-faulty eventually converges.
Moreover, convergence properties can be formulated in sophisticated logics
that allow to specify properties over the data flowing in the DDS and the DDS
temporal behavior. A suitable language over RTSs is FO-CTL, which mixes first-
order (FO) logic with the computation three logic (CTL). It is possible to adapt
that logic to DRTSs, called DDS − CT L. That can be achieved by exploiting
some D2C rule to transfer the previous state configuration into a copy in the
current one, using the verification formula to check that the two available state
configurations are identical and, finally, using temporal operators to propagate
that condition in time. Thus, the next undecidability results apply also to that
DDS-CTL, but also the decidability proofs can be extended to address the full
language.
3 Convergence Verification of IDDS
While input, state, and transport schemas are fixed in advance, the extension
of such relations is not, and could grow unboundedly over time. If no bound
is imposed on the data manipulated by the dds, verification of convergence
properties is undecidable even for a single-node dds [9].
Taking inspiration from the well-studied notion of state-boundedness [11,9,10],
we hence study how decidability is affected when a (pre-defined and known)
bound is imposed on the different information sources of the dds. Specifically,
given an integer b, we say that a dds M is input b-bounded if the input DB is
constrained to mention at most b values during each single step, i.e., the car-
dinality of the input database active domain is always at most b. In this case,
unboundedly many values can still be input over time, provided that they do
not accumulate in a single computation step. When the fixed value b is under-
stood or arbitrary, we simply talk about boundedness. We define state b-bounded
(state-bounded ) and transport b-bounded (transport-bounded ) ddss analogously.
However, this time the constraints are a consequence of the combination of the
dds program and of the constraints on the input.
A bound independent from the network size fits with the idea that the declar-
ative programs run by nodes are not tailored to a specific network. Moreover,
state-boundedness does not interfere with the information each node has about
its neighbors, since our networks are fixed, thus the relation neighbor is trivially
bounded by the fixed number of nodes (cf. Section 2.2). In fact, the next decid-
ability results deal with network complexity, i.e., the complexity of the global
initial configuration projected over the neighbor and my name predicates. Thus,
the data complexity of the initial configuration incorporates the network com-
plexity.
Additionally, a channel b-bounded (channel-bounded ) condition imposes a
similar constraint on the channel multiset cardinality, i.e., in any reachable con-
figuration there are at most b facts laying on any channel. In channel-bounded
ddss the instantiations of the multiset channel data-structure can be represented
by means of a finite DB, since the multiplicity of messages is bounded and a finite
domain of constants suffice to represent them. Thus, in this case, the respective
DRTSs can be translated into standard RTSs. We also say that the whole dds is
bounded if all of its information sources are so, i.e., it is input-, state-, transport-,
and channel-bounded at the same time.
In our analysis, a key observation is that the notion of uniformity [11] can
be straightforwardly recast for ddss. Intuitively, uniformity (corresponding to
genericity in databases) states that the dynamics of a dds M are invariant under
permutation of values in the node DBs, modulo the finite subset ∆0,M of ∆: the
system exhibits the same behavior (modulo renaming of values) when nodes
compute over isomorphic DBs. Technically, we recast the notion of uniformity
in [11] to the case of ddss as follows. Given two DBs D1 , D2 over schema R
with |adom(D1 )| = |adom(D2 )|, we say that D1 and D2 are isomorphic if there
exists a bijection h : ∆1 −→ ∆2 , with adom(D1 ) ⊆ ∆1 and adom(D2 ) ⊆ ∆2 ,
such that for every relation R/n ∈ R and every fact R(d1 , . . . , dn ) ∈ D1 , we
have R(h(d1 ), . . . , h(dn )) ∈ D2 . With some abuse of notation, in this case we
write D2 = h(D1 ). With this notion at hand, given a channel-bounded dds M
with DRTS ΥM = hN , ∆, S, T , Σ, σ0 , ndb, nch, ⇒i, we say that ΥM is uniform if
0
Q
for
Q every σ, σnext , σ ∈ Σ and every pair D = ((S n ) n∈V , ((C c ) c∈A )) ∈ n∈V S ×
a∈A C, where Cc is a DB instance of the multiset data-structure over ∆: if
1. σ ⇒ σnext ;
2. the number of constants mentioned in σ and σnext together is the same as
that of constants mentioned in σ 0 and D;
3. there exists a bijection ∆ −→ ∆ that fixes ∆0,M and maps each node DB
and channel DB of σ and σnext into those of σ 0 and D, thus enforcing an
isomorphism over all components of the various states and D;
then D ∈ Σ and σ 0 ⇒ D. Making use of uniformity, we get:
Theorem 1. Verification of convergence properties over bounded i ddss is de-
cidable in pspace in the network size.
Proof (sketch). First of all, it can be easily proven that ddss are uniform. Uni-
formity comes from the fact that: (i) d2c is based on Datalog, which, as virtually
all DB query languages, enjoys genericity; (ii) external inputs are provided “uni-
formly”, i.e., whenever an input DB is delivered to a node, there is an alternative
execution in which the node receives an isomorphic variant of the same input.
For a uniform dynamic system, with a pre-defined bound on the size of its DBs,
thus including the RTS version of the DRTSs of bounded-dds, verification of
FO-CTL properties, including translations of convergence, is decidable with a
pspace (tight) bound in data complexity [11], which in our case comprises also
the size of the network. The intuition behind the decidability proof in [11], is
i
that given an RTS ΥM and a FO-CTL property Φ, a finite domain ∆f ⊂ ∆
i i i
(with ∆0,M ⊆ ∆f ) can be found such that ΥM |= Φ iff ΘM |= Φ, where: (i) ΥM
i
is the RTS version of the DRTS of M; (ii) ΘM is the RTS obtained by applying
i
the same construction for ΥM but using the finite domain ∆f instead of ∆. Note
i
that ΘM is finite-state, hence the standard on-the-fly model checking algorithm
for CTL can be applied to check whether Φ holds.
We next show that bounding the channel multisets is necessary towards de-
cidability of verification over iddss.
Theorem 2. Verification of convergence over input- and state-bounded i ddss
is undecidable, even when the dds employs: (i) a single-node network; (ii) a
single unary, 1-bounded input relation; (iii) 0-ary state relations; (iv) two 2-ary
transport relations.
Proof (sketch). The proof is via a reduction from the undecidable halting prob-
lem of deterministic 2-counter machines [23] to convergence of iddss as in the
theorem statement. The 2-counter machine M states are encoded in the dds
state by means of 0-ary state predicates (state-flags). The counters are encoded
in the node self-loop channel as the lengths of two cyclic graphs whose edges are
specified by the transport relations counter1/2 and counter2/2, respectively.
The initial dds state DB contains the state-flag encoding the initial state of M.
At the first computation step the node, say named me, sends to himself two
messages: counter1(me, me) and counter2(me, me). Then, the program triggers
the increment and conditional decrement instructions according to the current
dds state-flag.
To increment a counter, say counter 1, the node extracts a single constant
from the current input DB, checks whether it is fresh with respect to the counter,
and, in that case, puts the fresh constant in the appropriate cyclic graph. To
perform the freshness test, the node expects to receive (and then sends it back on
the channel), fact by fact, the counter1 cyclic-graph in the right order, i.e. from
counter1(me, ) to counter1( , me). In case the input constant was not available
in the input DB, i.e., the DB was empty, or it already appeared in the incoming
message, i.e., it was not fresh, or the incoming message is a counter2 or an
unexpected counter1 message, the node enters in an error state.
To perform a conditional decrement on a counter, say again counter 1, the
node expects to receive a counter1 message. If it is a counter1(me, me) message,
then the node sends back the message and detects that the counter is zero. If the
message is a different counter1 message, the node stores it in the state, expects
to receive the next counter1 edge and then sends back only one edge where
the middle constant has been dropped. Again, in case unexpected messages are
delivered, the node enters in an error state.
After performing each increment or decrement instruction, the node transi-
tions to the next state. In case the error state is reached, the node starts sending
at each step a foo/0 message, whose reception triggers a random state transition
(excluding the final state of M and the dds current state-flag). Thus, M termi-
nates if and only if the dds sometimes (always) converges with total (partial)
correctness.
Considering Theorem 2, in the following we assume that ddss are channel-
bounded. Notice that channel-boundedness implies transport-boundedness, since
at each computation step the whole transport DB is always sent in the reliable
channel. This means that it makes no sense to study the verification of ddss that
are channel-bounded but not transport-bounded. Thus, we consider now what
happens when the dds is input- and transport-bounded, but have unconstrained
state.
Theorem 3. Verification of convergence over input- and channel-bounded i ddss
is undecidable, even when i dds employs: (i) a single computation node; (ii) a
single unary, 1-bounded input relation; (iii) Two 1-ary state relations. (iv) a
single 0-ary transport relation;
Proof (sketch). The proof is similar to that of theorem 2, but the counters of M
are now encoded in the state of M by means of two state predicates counter1/1
and counter2/1. The node continually sends to himself a wakeup/0 message,
unless it reaches the final state of M. Thus, the channel gets empty, causing the
dds to trivially converge, if and only if the dds reaches the final state of M.
In this case, freshness and zero checks can be done in one step by means of a
couple of D2C rules that inspect the state DB. In case the freshness check fails
or the input constant is not available, the node enters in a state error as above,
triggering a run that never converges. Hence, M terminates if and only if the
dds sometimes (always) converges with total (partial) correctness.
We investigate now what happens when the state DBs and channels are
bounded, but the input is not. This is a subtle case: unboundedly many input
data can be delivered to a node, but not inserted into its state/transport. In
fact:
Theorem 4. Verification of convergence properties over state- and channel-
bounded i ddss is in pspace in the network size.
Proof. Let M be an iddss, with DRTS hN , ∆, S, T , Σ, σ0 , ndb, nch, ⇒i, whose
state and transport are, together, bounded by b, and let ϕ be a convergence
property. While Υ could be infinite, we can specify and build a finite DRTS
Υ 0 equivalent to Υ under ϕ. Thus, to check whether Υ |= ϕ amounts to check
whether Υ 0 |= ϕ.
Let vars(ϕ) and Con be the set of variables and constants, respectively,
occurring in ϕ. Fix a set C ⊂ ∆ of 2b + |vars(ϕ)| + |Con| constants, disjoint
from those in ∆0M , i.e., the initial state active domain, and call ∆0 = ∆0,M ∪ C.
Then, Υ 0 is the DRTS hN , ∆, S, T , Σ 0 , σ0 , ndb 0 , nch 0 , ⇒0 i such that: (i) Σ 0 is the
set of all the states σ ∈ Σ such that the set of constants mentioned in σ are at
most b and all contained in ∆0 ; (ii) ndb 0 , nch 0 , and ⇒0 are the restrictions to
Σ 0 of ndb, nch, and ⇒ respectively. Since both the schemas S and T , and the
domain ∆0 are finite, also Σ 0 is finite, so that Υ 0 is a finite DRTS. Moreover, since
|∆0 | ≥ 2b + |Con| + |vars(ϕ)| and Υ is uniform (see proof 1), it is possible to
adapt Th. 3.18 in [11] to obtain that the RTS versions of Υ and Υ 0 are equivalent
under ϕ. In fact, recall that those RTS versions exist because, since the channels
are bounded, the instances of the channel multisets can be encoded in a DB.
We now show a procedure to effectively build Υ 0 from M: we will compute a
number of grounded versions of the program of M, which can be recursively
applied starting from σ0 up to a fix-point to build Υ 0 .
Given a rule in a D2C program, we call input-, state-, and transport-variables
those variables occurring only in input-, state-, and transport-predicates respec-
tively. While the semantics of D2C requires a preliminary full grounding of the
rules under the state and the input DBs active domains, we start by grounding
only the state- and transport-variables with constants in the (finite) full domain
∆0 of Υ 0 , resulting in the program P 0 . Its variables occur only in input-predicates,
hence the heads are fully grounded in ∆0 . Nevertheless, the resulting program is
suitable to compute all the transitions in Υ 0 since, by construction, each state
in Υ 0 has a DB over ∆0 .
While we should ground also the input-variables with constants in the un-
constrained input domains, we can avoid it by noting that they act like variables
in a Boolean query over the input, independent of the grounded part of the pro-
gram. Specifically, for each semi-grounded rule ρ in P 0 with at least one variable,
consider the existential closure qρ of the conjunction of all input literals in ρ.
These qi form a finite family of existential sentences. The effect of an input DB
I is just to discard those rules ρ such that I 6|= qρ . To capture the effects of all
possible input DBs at once, we initialize a table with one column for each qρ
and populate it with all the distinct rows rj containing a possible sequence of
the symbols > and ⊥. For each row rj in the table, consider the conjunction
of: (i) all qρ such that the corresponding cell contains >, and (ii) of all ¬qρ
such that the corresponding cell contains ⊥. Written in negative normal form,
this is a conjunction of existential and universal sentences, which can be trans-
formed into a prenex formula Ξj of the form ∃≤n ∀≤n , where n is the number
of all variables in P 0 times the number of queries qρ . This is a fragment of the
Bernays-Schönfinkel class enjoying a finite satisfiablity problem in nexptime in
the length of the sentence, but in O(1) in the network size. If the sentence Ξj
is not satisfiable in the finite, then there is no input DB enabling the effects
described by rj , thus it has to be deleted from the table. Otherwise, it has to
be retained. After this pruning, for each row rj consider the fully-grounded sub-
program Pj0 of P 0 containing only those rules ρ, pruned of the input predicates,
such that the corresponding cell in rj contains >. If the body results empty, fill
it with true.
Given a state σ 0 in Υ 0 , the finite family of programs Pj0 , with no input predi-
cates, capture the effects of all the infinitely many input DBs. Thus, to compute a
successor of a given configuration in Σ 0 , it is sufficient to compute a stable model
of a program Pj0 over the configuration DB, which can be done in ptime in data
complexity. Thus, by applying on-the-fly verification techniques for finite RTSs
against FO-CTL, which can express convergence translated over those RTSs, we
can check Υ 0 in npspace in data complexity, which amounts to pspace.
4 Conclusions
In the wide spectrum of declarative distributed computing, we have formalized
and studied verification of convergence in the important case of reliable com-
munication with unordered asynchronous communication and interactive input
policy. While the problem is undecidable in general, decidability can be regained
by imposing boundedness conditions on the channels and state DBs.
We foresee two main lines of future research. First, we plan to build on our
foundational results to implement verification techniques for dds cases with de-
cidable verification . To this aim, we will rely on existing ASP techniques for d2c,
and in particular on the implementation in [18], which can simulate runs of ddss
according to our formalization. Remarkably, those cases are all channel-bounded,
and thus pose no problem for the implementation of the multiset channels. How-
ever, great care should be put in handling the message passing mechanism, since
the reception of a random incoming message requires to analyze many differ-
ent branches, resulting in a bottleneck. Moreover, the technique in the proof
for theorem 4 could be exploited to abstract away the interaction policy, thus
avoiding the necessity of providing random input DBs at each step. Second, we
want to study how our verification results carry over the setting in which the
network topology can change, both with respect to node connection, and for
what concerns the creation and deactivation of computation nodes. In this light,
it is worth noting that, following the approach in [24], the results here presented
can be seamlessly generalized to the case where node connections are arbitrar-
ily changed over time, and nodes can be created and deactived, provided that
their overall number does not exceed a pre-defined bound. On the other hand,
the case where unboundedly many computation nodes can be created is left for
interesting future work. In fact, we intend to leverage parameterized verification
[1,14] to study data-centric distributed systems whose topology can change over
time in an unbounded, but controlled way, i.e., respecting certain patterns (see,
for example, [13], where constraints about topology structures are added to the
specification).
We are also interested in exploring the complexity of weaker properties be-
sides convergence, like safety and liveness.
References
1. Abdulla, P.A., Delzanno, G., Rezine, A.: Approximated parameterized verification
of infinite-state processes with global conditions. Formal Methods in System Design
34(2), 126–156 (2009)
2. Abiteboul, S., Abrams, Z., Haar, S., Milo, T.: Diagnosis of asynchronous discrete
event systems: Datalog to the rescue! In: Proc. of the 24th ACM SIGACT SIGMOD
SIGART Symp. on Principles of Database Systems (PODS). pp. 358–367. ACM
Press (2005)
3. Abiteboul, S., Bienvenu, M., Galland, A., Antoine, É.: A rule-based language for
web data management. In: Proc. of the 30th ACM SIGACT SIGMOD SIGART
Symp. on Principles of Database Systems (PODS). pp. 293–304. ACM Press (2011)
4. Alvaro, P., Ameloot, T.J., Hellerstein, J.M., Marczak, W., Van den Bussche, J.:
A declarative semantics for Dedalus. Tech. Rep. UCB/EECS-2011-120, EECS De-
partment, University of California, Berkeley (Nov 2011), http://www.eecs.berkeley.
edu/Pubs/TechRpts/2011/EECS-2011-120.html
5. Ameloot, T.J.: Declarative networking: Recent theoretical work on coordination,
correctness, and declarative semantics. SIGMOD Record 43(2), 5–16 (2014)
6. Ameloot, T.J., Geck, G., Ketsman, B., Neven, F., Schwentick, T.: Parallel-
correctness and transferability for conjunctive queries. In: Proc. of the 34th ACM
SIGACT SIGMOD SIGAI Symp. on Principles of Database Systems (PODS). pp.
47–58 (2015)
7. Ameloot, T.J., Ketsman, B., Neven, F., Zinn, D.: Weaker forms of monotonicity for
declarative networking: A more fine-grained answer to the CALM-conjecture. In:
Proc. of the 33rd ACM SIGACT SIGMOD SIGAI Symp. on Principles of Database
Systems (PODS). pp. 64–75 (2014)
8. Ameloot, T.J., Neven, F., Van den Bussche, J.: Relational transducers for declar-
ative networking. J. of the ACM 60(2), 15:1–15:38 (2013)
9. Bagheri Hariri, B., Calvanese, D., De Giacomo, G., Deutsch, A., Montali, M.:
Verification of relational data-centric dynamic systems with external services. In:
Proc. of the 32nd ACM SIGACT SIGMOD SIGAI Symp. on Principles of Database
Systems (PODS). pp. 163–174 (2013)
10. Bagheri Hariri, B., Calvanese, D., Deutsch, A., Montali, M.: State-boundedness in
data-aware dynamic systems. In: Proc. of the 14th Int. Conf. on the Principles of
Knowledge Representation and Reasoning (KR). AAAI Press (2014)
11. Belardinelli, F., Lomuscio, A., Patrizi, F.: Verification of agent-based ar-
tifact systems. J. of Artificial Intelligence Research 51, 333–376 (2014).
https://doi.org/10.1613/jair.4424
12. Chen, C., Jia, L., Xu, H., Luo, C., Zhou, W., Loo, B.T.: A program logic for
verifying secure routing protocols. In: 34th IFIP Int. Conf. on Formal Techniques
for Distributed Objects, Components and Systems (FORTE 2014). Lecture Notes
in Computer Science, vol. 8461, pp. 117–132. Springer (2014)
13. Chen, C., Loh, L.K., Jia, L., Zhou, W., Loo, B.T.: Automated verification of safety
properties of declarative networking programs. In: Proc. of the 17th Int. Sympo-
sium on Principles and Practice of Declarative Programming (PPDP). pp. 79–90
(2015)
14. Delzanno, G., Sangnier, A., Traverso, R.: Parameterized verification of broadcast
networks of register automata. In: Proc. of the 7th Int. Workshop on Reachabil-
ity Problems (RP). Lecture Notes in Computer Science, vol. 8169, pp. 109–121.
Springer (2013)
15. Gelfond, M., Lifschitz, V.: The stable model semantics for logic programming. In:
Proc. of the 5th Int. Conf. on Logic Programming (ICLP). pp. 1070–1080. The
MIT Press (1988)
16. Giannotti, F., Pedreschi, D.: Datalog with non-deterministic choice computes
NDB-PTIME. J. of Logic Programming 35(1), 79–101 (1998)
17. Hellerstein, J.M.: The declarative imperative: Experiences and conjectures in dis-
tributed logic. SIGMOD Record 39(1), 5–19 (2010)
18. Lobo, J., Wood, D., Verma, D., Calo, S.: Distributed state machines: A declarative
framework for the management of distributed systems. In: Proc. of the 8th Int.
Conf. on Network and Service Management (CNSM). pp. 224–228 (2012)
19. Loo, B.T., Condie, T., Garofalakis, M., Gay, D.E., Hellerstein, J.M., Maniatis, P.,
Ramakrishnan, R., Roscoe, T., Stoica, I.: Declarative networking. Communications
of the ACM 52(11), 87–95 (2009)
20. Loo, B.T., Condie, T., Hellerstein, J.M., Maniatis, P., Roscoe, T., Stoica, I.: Im-
plementing declarative overlays. Operating Systems Review 39(5), 75–90 (2005)
21. Lynch, N.A.: Distributed Algorithms. Morgan Kaufmann (1996)
22. Ma, J., Le, F., Wood, D., Russo, A., Lobo, J.: A declarative approach to distributed
computing: Specification, execution and analysis. Theory and Practice of Logic
Programming 13, 815–830 (2013)
23. Minsky, M.L.: Computation: Finite and Infinite Machines. Prentice-Hall (1967)
24. Montali, M., Calvanese, D., De Giacomo, G.: Verification of data-aware
commitment-based multiagent systems. In: Proc. of the 13th Int. Conf. on Au-
tonomous Agents and Multiagent Systems (AAMAS). pp. 157–164 (2014)
25. Nigam, V., Jia, L., Loo, B.T., Scedrov, A.: Maintaining distributed logic programs
incrementally. Computer Languages, Systems & Structures 38(2), 158–180 (2012)
26. Ren, Y., Zhou, W., Wang, A., Jia, L., Gurney, A.J., Loo, B.T., Rexford, J.: FSR:
Formal analysis and implementation toolkit for safe inter-domain routing. Com-
puter Communication Review 41(4), 440–441 (2011)
27. Saccà, D., Zaniolo, C.: Stable models and non-determinism in logic programs with
negation. In: Proc. of the 9th ACM SIGACT SIGMOD SIGART Symp. on Prin-
ciples of Database Systems (PODS). pp. 205–217 (1990)
28. Vardi, M.Y.: Model checking for database theoreticians. In: Proc. of the 10th Int.
Conf. on Database Theory (ICDT). Lecture Notes in Computer Science, vol. 3363,
pp. 1–16. Springer (2005)
29. Zaychik Moffitt, V., Stoyanovich, J., Abiteboul, S., Miklau, G.: Collaborative access
control in WebdamLog. In: Proc. of the ACM SIGMOD Int. Conf. on Management
of Data. pp. 197–211. ACM (2015)
30. Zhou, W., Mapara, S., Ren, Y., Li, Y., Haeberlen, A., Ives, Z., Loo, B.T., Sherr, M.:
Distributed time-aware provenance. Proc. of the VLDB Endowment 6(2), 49–60
(2012)