internal/jsonrpc2: remove request state

This removes the state machine from the request.
It adds a done handler and uses that to manage the unlock channel instead
This also allows us to remove the nextRequest channel from request.
This is the last major piece that allows us to split up the run method into
composable handlers.

Change-Id: I5517ed5a51e30534754522a58453c27b5178ffa8
Reviewed-on: https://go-review.googlesource.com/c/tools/+/226839
Run-TryBot: Ian Cottrell <iancottrell@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Robert Findley <rfindley@google.com>
This commit is contained in:
Ian Cottrell 2020-03-31 15:39:10 -04:00
Родитель 44c82bac18
Коммит 5c4bdbc02c
2 изменённых файлов: 33 добавлений и 28 удалений

Просмотреть файл

@ -74,7 +74,7 @@ func MethodNotFound(ctx context.Context, r *Request) error {
func MustReply(handler Handler) Handler {
return func(ctx context.Context, req *Request) error {
err := handler(ctx, req)
if req.state < requestReplied {
if req.done != nil {
panic(fmt.Errorf("request %q was never replied to", req.Method))
}
return err

Просмотреть файл

@ -38,22 +38,13 @@ type Conn struct {
handling map[ID]*Request
}
type requestState int
const (
requestWaiting = requestState(iota)
requestSerial
requestParallel
requestReplied
requestDone
)
// Request is sent to a server to represent a Call or Notify operaton.
type Request struct {
conn *Conn
cancel context.CancelFunc
state requestState
nextRequest chan struct{}
conn *Conn
cancel context.CancelFunc
// done holds set of callbacks added by OnReply, and is set back to nil if
// Reply has been called.
done []func()
// The Wire values of the request.
WireRequest
@ -222,16 +213,17 @@ func (r *Request) IsNotify() bool {
// This will mark the request as done, triggering any done
// handlers
func (r *Request) Reply(ctx context.Context, result interface{}, err error) error {
if r.state >= requestReplied {
if r.done == nil {
return fmt.Errorf("reply invoked more than once")
}
if r.state < requestParallel {
r.state = requestParallel
close(r.nextRequest)
}
r.state = requestReplied
recordStatus(ctx, nil)
defer func() {
recordStatus(ctx, err)
for i := len(r.done); i > 0; i-- {
r.done[i-1]()
}
r.done = nil
}()
if r.IsNotify() {
return nil
@ -280,6 +272,17 @@ func setHandling(r *Request, active bool) {
}
}
// OnReply adds a done callback to the request.
// All added callbacks are invoked during the one required call to Reply, and
// then dropped.
// It is an error to call this after Reply.
// This call is not safe for concurrent use, but should only be invoked by
// handlers and in general only one handler should be working on a request
// at any time.
func (r *Request) OnReply(do func()) {
r.done = append(r.done, do)
}
// combined has all the fields of both Request and Response.
// We can decode this and then work out which it is.
type combined struct {
@ -322,12 +325,12 @@ func (c *Conn) Run(runCtx context.Context, handler Handler) error {
case msg.Method != "":
// If method is set it must be a request.
reqCtx, cancelReq := context.WithCancel(runCtx)
thisRequest := nextRequest
waitForPrevious := nextRequest
nextRequest = make(chan struct{})
unlockNext := nextRequest
req := &Request{
conn: c,
cancel: cancelReq,
nextRequest: nextRequest,
conn: c,
cancel: cancelReq,
WireRequest: WireRequest{
VersionTag: msg.VersionTag,
Method: msg.Method,
@ -335,6 +338,9 @@ func (c *Conn) Run(runCtx context.Context, handler Handler) error {
ID: msg.ID,
},
}
req.OnReply(func() {
close(unlockNext)
})
if c.LegacyHooks != nil {
reqCtx = c.LegacyHooks.Request(reqCtx, c, Receive, &req.WireRequest)
}
@ -349,9 +355,8 @@ func (c *Conn) Run(runCtx context.Context, handler Handler) error {
setHandling(req, true)
_, queueDone := event.StartSpan(reqCtx, "queued")
go func() {
<-thisRequest
<-waitForPrevious
queueDone()
req.state = requestSerial
defer func() {
setHandling(req, false)
done()