зеркало из https://github.com/microsoft/ivy.git
repstore2 and repstore2bug are working
This commit is contained in:
Родитель
744700667c
Коммит
19a645e6f2
|
@ -0,0 +1,522 @@
|
|||
#lang ivy1.6
|
||||
|
||||
# Exercise: A buggy replicating key-value store
|
||||
# =============================================
|
||||
#
|
||||
# In this version of the [replicating key-value
|
||||
# store](repstore2.html), we've added a bug as an exercise. Try to
|
||||
# find the bug using compositional testing (don't use diff, it's too
|
||||
# easy). Fix the bug and test again.
|
||||
#
|
||||
# ### Bonus exercise
|
||||
#
|
||||
# At the end of the file, there is an isolate `iso_sys` that includes
|
||||
# the complete implmentation (including both servers and the
|
||||
# network). Try testing this isolate to see if you can find error.
|
||||
# If not, can you explain why not?
|
||||
|
||||
# Change the interpretation of type `key` to give it a larger range (say, `strbv[4]`).
|
||||
# Try testing isolates `iso_prim` and `iso_sec`. Do you still find the bug. If not,
|
||||
# can you explain why not?
|
||||
|
||||
# Basic data types
|
||||
# ----------------
|
||||
|
||||
# As with he simple one-process server (see
|
||||
# [repstore1.ivy](repstore1.html)), we begin by declaring the data
|
||||
# types needed to represent transactions and the messages that carry
|
||||
# them.
|
||||
|
||||
# The key and value types are uninterpreted for the moment.
|
||||
|
||||
|
||||
type key
|
||||
type value
|
||||
ghost type txid
|
||||
|
||||
type request_kind = {write,read}
|
||||
|
||||
object request_body = {
|
||||
type t = struct {
|
||||
knd : request_kind,
|
||||
ky : key,
|
||||
vl : value
|
||||
}
|
||||
}
|
||||
|
||||
object request = {
|
||||
type t = struct {
|
||||
tx : txid,
|
||||
bd : request_body.t
|
||||
}
|
||||
}
|
||||
|
||||
object response_body = {
|
||||
type t = struct {
|
||||
vl : value
|
||||
}
|
||||
}
|
||||
|
||||
object response = {
|
||||
type t = struct {
|
||||
tx : txid,
|
||||
bd : response_body.t
|
||||
}
|
||||
}
|
||||
|
||||
# The definition of a replica is also the same as before.
|
||||
|
||||
module replica = {
|
||||
function store(K:key) : value
|
||||
after init {
|
||||
store(K) := 0
|
||||
}
|
||||
action exec(inp : request_body.t) returns (out:response_body.t) = {
|
||||
if inp.knd = write {
|
||||
store(inp.ky) := inp.vl;
|
||||
}
|
||||
else if inp.knd = read {
|
||||
out.vl := store(inp.ky);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
# The first difference is that we now have two servers, so the
|
||||
# enumerated type `dest.t` has two values, one for the primary and one
|
||||
# for the secondary.
|
||||
|
||||
object dest = {
|
||||
ghost type t = {prim,sec}
|
||||
}
|
||||
|
||||
# Reference object
|
||||
# ----------------
|
||||
|
||||
# Our reference model is the same as before.
|
||||
|
||||
object ref = {
|
||||
action create(inp : request_body.t, dst:dest.t) returns (tx : txid)
|
||||
action commit(tx : txid,dst:dest.t)
|
||||
action eval(tx : txid) returns (res : response_body.t)
|
||||
|
||||
instance rep : replica
|
||||
|
||||
individual next : txid
|
||||
function txs(X:txid) : request_body.t
|
||||
function txres(X:txid) : response_body.t
|
||||
relation committed(X:txid)
|
||||
|
||||
after init {
|
||||
next := 0;
|
||||
committed(X) := false;
|
||||
}
|
||||
|
||||
implement create {
|
||||
tx := next;
|
||||
txs(tx) := inp;
|
||||
next := next + 1;
|
||||
}
|
||||
|
||||
implement commit {
|
||||
assert 0 <= tx & tx < next;
|
||||
assert ~committed(tx);
|
||||
txres(tx) := rep.exec(txs(tx));
|
||||
committed(tx) := true;
|
||||
}
|
||||
delegate commit
|
||||
|
||||
implement eval {
|
||||
assert committed(tx);
|
||||
res := txres(tx);
|
||||
}
|
||||
delegate eval
|
||||
|
||||
interpret txid -> int
|
||||
}
|
||||
|
||||
|
||||
|
||||
# This time, however, we'll add another reference object to keep track
|
||||
# of the serialization order, so we can specify the interface between
|
||||
# primary and secondary. It has one action `serialize` that indicates when
|
||||
# a write transaction is serialized.
|
||||
|
||||
object ser = {
|
||||
|
||||
action serialize(tx : txid)
|
||||
|
||||
# The serializer object keeps track of which transactions have
|
||||
# been serialized.
|
||||
|
||||
relation serialized(X:txid)
|
||||
|
||||
after init {
|
||||
serialized(X) := false;
|
||||
}
|
||||
|
||||
# To serialzie a write, we must guarantee that transaction exists,
|
||||
# and that it has not already been serialized. Further, we can
|
||||
# only serialize write transactions.
|
||||
|
||||
implement serialize {
|
||||
assert 0 <= tx & tx < ref.next;
|
||||
assert ~serialized(tx);
|
||||
assert ref.txs(tx).knd = write;
|
||||
serialized(tx) := true;
|
||||
}
|
||||
delegate serialize
|
||||
|
||||
# Further,we specify that a write transaction cannot be committed
|
||||
# until it is serialized.
|
||||
|
||||
before ref.commit(tx : txid,dst:dest.t) {
|
||||
assert ref.txs(tx).knd = write -> serialized(tx);
|
||||
}
|
||||
}
|
||||
|
||||
# The implementation
|
||||
# ------------------
|
||||
|
||||
# Now we are ready to define our system implemention, consisting of
|
||||
# client endpoints, the primary server and the secondary server.
|
||||
|
||||
# Notice we include the `tcp` module here, since we will need it for
|
||||
# the ordered channels between the servers.
|
||||
|
||||
include tcp
|
||||
include udp
|
||||
|
||||
# Again, we have an uninterpreted type of client ids, and a request
|
||||
# message structure that encapsulates the request with its client id
|
||||
# for routing the response.
|
||||
|
||||
type client_id
|
||||
|
||||
type req_msg = struct {
|
||||
cid : client_id,
|
||||
req : request.t
|
||||
}
|
||||
|
||||
# The client endpoint is the same as before, except that we now have
|
||||
# two servers, so the client must decide which of the servers to send
|
||||
# a request to.
|
||||
|
||||
module client(cid,prim_chan,sec_chan,cl_chans) = {
|
||||
|
||||
action client_request(req : request_body.t, the_dst: dest.t)
|
||||
action client_response(req : response_body.t, tx : txid)
|
||||
|
||||
# To generate a request, we build a request message and send it to
|
||||
# the server. The server endpoint we send to is determined by the
|
||||
# parameter `the_dst`.
|
||||
|
||||
implement client_request {
|
||||
local m : req_msg {
|
||||
m.cid := cid;
|
||||
m.req.tx := ref.create(req,the_dst);
|
||||
m.req.bd := req;
|
||||
if the_dst = dest.prim {
|
||||
call prim_chan.send(m);
|
||||
} else {
|
||||
call sec_chan.send(m);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
# To handle a response from the server, we simply pass it to the
|
||||
# `client_response` callback. The transaction id parameter is
|
||||
# "ghost" and is only used for specification.
|
||||
|
||||
implement cl_chans.recv(resp : response.t) {
|
||||
call client_response(resp.bd,resp.tx)
|
||||
}
|
||||
}
|
||||
|
||||
# The primary server module now has two additional parameters, for the
|
||||
# forward channel (forwarding writes to the secondary) and the reverse
|
||||
# channel (returning acks).
|
||||
|
||||
module primary_node(port, fwd_chan, rev_chan, cl_chans) = {
|
||||
instance rep : replica
|
||||
|
||||
# Again, we have an endpoint for receiving requests from clients.
|
||||
|
||||
instance req_chan : nondup_endpoint(port,req_msg)
|
||||
|
||||
# We have to remember how many pending writes each key has. We use
|
||||
# a map from keys to counters for this.
|
||||
|
||||
instance counter : unbounded_sequence
|
||||
function pending(K:key) : counter.t
|
||||
|
||||
# Initially, all the counters are zero.
|
||||
|
||||
after init {
|
||||
pending(K) := 0;
|
||||
}
|
||||
|
||||
# When receiving a request message from a client, the primary
|
||||
# node must first check whether it is a read or a write. In the case
|
||||
# of a read, we check if the key has a pending write. If not, we commit
|
||||
# the read, execute it, and return the response to the client. If there is
|
||||
# a pending write, we postpone the read by forwarding it to ourself. In
|
||||
# case of a write, we serialize the write, forward it to the secondary,
|
||||
# increment the pending count of the key, and finally execute it.
|
||||
#
|
||||
|
||||
implement req_chan.recv(inp : req_msg) {
|
||||
var rr := inp.req.bd;
|
||||
if rr.knd = read {
|
||||
if pending(rr.ky) = 0 {
|
||||
call ref.commit(inp.req.tx,dest.prim);
|
||||
var res : response.t;
|
||||
res.tx := inp.req.tx;
|
||||
res.bd := rep.exec(rr);
|
||||
call cl_chans(inp.cid).send(res)
|
||||
} else {
|
||||
call req_chan.send(inp); # if cannot execute, recirculate
|
||||
}
|
||||
} else if rr.knd = write {
|
||||
call ser.serialize(inp.req.tx); # this is ghost!
|
||||
call fwd_chan.send(inp);
|
||||
var res := rep.exec(inp.req.bd);
|
||||
}
|
||||
}
|
||||
|
||||
# When we receive a write request on the acknowledgement channel,
|
||||
# we decrement the pending count of the key.
|
||||
|
||||
implement rev_chan.recv(inp : req_msg) {
|
||||
var rr := inp.req.bd;
|
||||
if rr.knd = write {
|
||||
pending(rr.ky) := pending(rr.ky).prev;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
# The secondary server handles only read request from clients. Since
|
||||
# the secondary's replica refelects only committed writes, we cann
|
||||
# immediately response to a read. We commit the read, build a response
|
||||
# message by executing the read on the replica, and send the response
|
||||
# back to the client.
|
||||
|
||||
module secondary_node(port, fwd_chan, rev_chan, cl_chans) = {
|
||||
instance rep : replica
|
||||
|
||||
instance req_chan : nondup_endpoint(port,req_msg)
|
||||
|
||||
implement req_chan.recv(inp : req_msg) {
|
||||
var rr := inp.req.bd;
|
||||
if rr.knd = read {
|
||||
call ref.commit(inp.req.tx,dest.sec);
|
||||
var res : response.t;
|
||||
res.tx := inp.req.tx;
|
||||
res.bd := rep.exec(rr);
|
||||
call cl_chans(inp.cid).send(res);
|
||||
}
|
||||
# ignore writes!
|
||||
}
|
||||
|
||||
# When the secondary receives a forwarded write from the primary,
|
||||
# it commits it, executes it, and sends the response to the
|
||||
# client.
|
||||
#
|
||||
|
||||
implement fwd_chan.recv(inp : req_msg) {
|
||||
var res : response.t;
|
||||
res.tx := inp.req.tx;
|
||||
res.bd := rep.exec(inp.req.bd);
|
||||
call cl_chans(inp.cid).send(res);
|
||||
call rev_chan.send(inp);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
# The plumbing
|
||||
# ------------
|
||||
|
||||
# Now we two servers, a primary and a secondary, and an array of
|
||||
# client endpoints, connecting them up with network channels.
|
||||
|
||||
# We have two TCP (ordered) channels beteween the primary and
|
||||
# secondary, one forward (listening on port 44090) and one reverse
|
||||
# (listening on port 44091). Both carry request messages.
|
||||
|
||||
instance fwd_chan : tcp_channel("localhost:44090",req_msg)
|
||||
instance rev_chan : tcp_channel("localhost:44091",req_msg)
|
||||
|
||||
# As before, we use a parameterized array of (unordered) endpoints for
|
||||
# the clients called `cl_chans`. The endpoints will use a range of
|
||||
# port numbers beginning with `441000`.
|
||||
|
||||
instance cl_chans : nondup_endpoint_set(client_id,44100,response.t)
|
||||
|
||||
# We create a corresponding array of clients.
|
||||
instance cl(X:client_id) : client(X,prim.req_chan,sec.req_chan,cl_chans)
|
||||
|
||||
# We create the primary and secondary servers, connecting them with
|
||||
# channels. The port numbers are for the client request channels of
|
||||
# the servers.
|
||||
|
||||
instance prim : primary_node(44200,fwd_chan.sndr,rev_chan.rcvr,cl_chans)
|
||||
instance sec : secondary_node(44201,fwd_chan.rcvr,rev_chan.sndr,cl_chans)
|
||||
|
||||
# The interface specifications
|
||||
# ----------------------------
|
||||
|
||||
# The service specification is unchanged from the simple sequential
|
||||
# server.
|
||||
|
||||
object service_spec = {
|
||||
|
||||
relation responded(X:txid)
|
||||
|
||||
after init {
|
||||
responded(X) := false;
|
||||
}
|
||||
|
||||
before cl_chans.send(p : client_id, m : response.t) {
|
||||
assert ~responded(m.tx);
|
||||
assert m.bd = ref.eval(m.tx);
|
||||
responded(m.tx) := true;
|
||||
}
|
||||
}
|
||||
|
||||
# Now, however, we need a specification for the interface between the
|
||||
# primary and the secondary. This will allow us to test the two
|
||||
# servers in isolation. We call this specification `mid_spec`. The state
|
||||
# of this interface keeps a FIFO queue of messages that have been serialized
|
||||
# but not yet transmitted over the interface. When a message is serialize,
|
||||
# we insert it in this queue
|
||||
|
||||
object mid_spec = {
|
||||
instance queue : unbounded_queue(txid)
|
||||
|
||||
after ser.serialize(tx:txid) {
|
||||
call queue.push(tx);
|
||||
}
|
||||
|
||||
# When a message is received at the secondary on the forward
|
||||
# channel, its contents must be correct for the given transaction
|
||||
# id. Moreover, the transaction id must be the next on in the
|
||||
# FIFO queue (in other words, the writes must arrive in
|
||||
# serialization order). To make things a little easier, we commit
|
||||
# the message when it is received. This gurantees that commits
|
||||
# from the secondary always occur in order, without our having to
|
||||
# explicitly specify that. Because the commit operation is
|
||||
# "ghost", it doesn't matter whether we execute in implementation
|
||||
# or specification code. We can put it wherever it is most
|
||||
# convenient.
|
||||
|
||||
before fwd_chan.rcvr.recv(inp : req_msg) {
|
||||
assert inp.req.bd = ref.txs(inp.req.tx);
|
||||
assert inp.req.tx = queue.pop;
|
||||
call ref.commit(inp.req.tx,dest.sec);
|
||||
}
|
||||
|
||||
# This tells IVy that the above specification is a guarantee for
|
||||
# the primary server, not for the network, which is the actual
|
||||
# caller of `fwd_chan.rcvr.recv`. This means that the assertions
|
||||
# will be verified when we test the primary. Usually, IVy's
|
||||
# default assignment of guarantees to object is what we want, but
|
||||
# in this case the caller is not the guarantor of the assertions.
|
||||
|
||||
delegate fwd_chan_rcvr_recv[before] -> prim
|
||||
|
||||
# The interface also requires that a given write message be acked
|
||||
# at most once. Otherwise, the pending counts in the primary will
|
||||
# be wrong. To ensure this, we maintain a set `acked` of
|
||||
# acknowledgef transactions.
|
||||
|
||||
relation acked(X:txid)
|
||||
|
||||
after init {
|
||||
acked(X) := false;
|
||||
}
|
||||
|
||||
# When an ack message is sent on the reverse channel, it must have
|
||||
# been committed (this is the meaning of the message) and it must
|
||||
# not already be acked. We record the fact that it is now acked.
|
||||
|
||||
before rev_chan.sndr.send(inp : req_msg) {
|
||||
assert ref.committed(inp.req.tx);
|
||||
assert ~acked(inp.req.tx);
|
||||
acked(inp.req.tx) := true;
|
||||
}
|
||||
|
||||
# When a request is created, we record the destination server. In
|
||||
# this way we can determine which server is allowed to commit the
|
||||
# request. This is a very common situation in specifications of
|
||||
# distributed services. Much of the interface state tends to be
|
||||
# related to the question of who is allowed to commit shich
|
||||
# transations.
|
||||
|
||||
function req_dest(X:txid) : dest.t
|
||||
|
||||
after ref.create(inp : request_body.t, dst:dest.t) returns (tx : txid) {
|
||||
req_dest(tx) := dst
|
||||
}
|
||||
|
||||
# We now specify that all reads must be commited by the server
|
||||
# they are addressed to, while all writes are committed by the
|
||||
# secondary
|
||||
|
||||
before ref.commit(tx : txid,dst:dest.t) {
|
||||
assert ref.txs(tx).knd = read -> req_dest(tx) = dst;
|
||||
assert ref.txs(tx).knd = write -> dst = dest.sec
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
# We export/import our API.
|
||||
|
||||
export cl.client_request
|
||||
import cl.client_response
|
||||
|
||||
# Finally, we create two isolates the allow us to verify our two
|
||||
# servers in isolation. Each isolate verifies just the the guarentees
|
||||
# of given server, either the primary or secondary. In both cases we
|
||||
# use our interface specifications and our reference object. The other
|
||||
# server and the network are abstracted away. Their role is played by
|
||||
# the test generator. Because we placed our interface specification
|
||||
# `mid_spec` at the secondary end of the channels `fwd_chan` and
|
||||
# `rev_chan`, those channels are used in the primary's isolate, but
|
||||
# not in the secondary's. Notice also that the client endpoints `cl`
|
||||
# are present in both isolates. Since they are trivial "stubs", there
|
||||
# is no reason to separate them into their own isolate.
|
||||
|
||||
trusted isolate iso_prim = prim with cl,cl_chans,ref,ser,service_spec,mid_spec,fwd_chan,rev_chan
|
||||
trusted isolate iso_sec = sec with cl,cl_chans,ref,ser,service_spec,mid_spec
|
||||
|
||||
# Here is an extra isolate you can use for integration testing:
|
||||
|
||||
trusted isolate iso_sys = this
|
||||
|
||||
# As before, we have to give concrete interpretations for the abstract
|
||||
# types in order to test.
|
||||
|
||||
interpret txid -> int
|
||||
interpret key -> strbv[1]
|
||||
interpret value -> bv[16]
|
||||
interpret client_id -> bv[1]
|
||||
|
||||
# The source file for this example is [here](repstore2.ivy). Try running this
|
||||
# example using commands like this:
|
||||
#
|
||||
# $ ivy_to_cpp target=test isolate=iso_prim build=true repstore2.ivy
|
||||
# $ ./repstore1 iters=100 runs=10 out=file.iev
|
||||
#
|
||||
# This tests the primary server. Look at the trace file `file.iev`
|
||||
# and notice which actions are generated by the tester and which by
|
||||
# the code.
|
||||
#
|
||||
# To test the secondary, use these commands:
|
||||
#
|
||||
# $ ivy_to_cpp target=test isolate=iso_sec build=true repstore2.ivy
|
||||
# $ ./repstore1 iters=100 runs=10 out=file.iev
|
||||
#
|
||||
# Again, see what actions the tester is generating. Try putting some
|
||||
# errors in this example and see if they produce assertions
|
||||
# failures. If not, why not?
|
||||
#
|
Загрузка…
Ссылка в новой задаче