зеркало из https://github.com/golang/tools.git
813 строки
26 KiB
Go
813 строки
26 KiB
Go
// Copyright 2018 The Go Authors. All rights reserved.
|
|
// Use of this source code is governed by a BSD-style
|
|
// license that can be found in the LICENSE file.
|
|
|
|
package jsonrpc2
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"golang.org/x/tools/internal/event"
|
|
"golang.org/x/tools/internal/event/keys"
|
|
"golang.org/x/tools/internal/event/label"
|
|
"golang.org/x/tools/internal/jsonrpc2"
|
|
)
|
|
|
|
// Binder builds a connection configuration.
|
|
// This may be used in servers to generate a new configuration per connection.
|
|
// ConnectionOptions itself implements Binder returning itself unmodified, to
|
|
// allow for the simple cases where no per connection information is needed.
|
|
type Binder interface {
|
|
// Bind returns the ConnectionOptions to use when establishing the passed-in
|
|
// Connection.
|
|
//
|
|
// The connection is not ready to use when Bind is called,
|
|
// but Bind may close it without reading or writing to it.
|
|
Bind(context.Context, *Connection) ConnectionOptions
|
|
}
|
|
|
|
// A BinderFunc implements the Binder interface for a standalone Bind function.
|
|
type BinderFunc func(context.Context, *Connection) ConnectionOptions
|
|
|
|
func (f BinderFunc) Bind(ctx context.Context, c *Connection) ConnectionOptions {
|
|
return f(ctx, c)
|
|
}
|
|
|
|
var _ Binder = BinderFunc(nil)
|
|
|
|
// ConnectionOptions holds the options for new connections.
|
|
type ConnectionOptions struct {
|
|
// Framer allows control over the message framing and encoding.
|
|
// If nil, HeaderFramer will be used.
|
|
Framer Framer
|
|
// Preempter allows registration of a pre-queue message handler.
|
|
// If nil, no messages will be preempted.
|
|
Preempter Preempter
|
|
// Handler is used as the queued message handler for inbound messages.
|
|
// If nil, all responses will be ErrNotHandled.
|
|
Handler Handler
|
|
// OnInternalError, if non-nil, is called with any internal errors that occur
|
|
// while serving the connection, such as protocol errors or invariant
|
|
// violations. (If nil, internal errors result in panics.)
|
|
OnInternalError func(error)
|
|
}
|
|
|
|
// Connection manages the jsonrpc2 protocol, connecting responses back to their
|
|
// calls.
|
|
// Connection is bidirectional; it does not have a designated server or client
|
|
// end.
|
|
type Connection struct {
|
|
seq int64 // must only be accessed using atomic operations
|
|
|
|
stateMu sync.Mutex
|
|
state inFlightState // accessed only in updateInFlight
|
|
done chan struct{} // closed (under stateMu) when state.closed is true and all goroutines have completed
|
|
|
|
writer chan Writer // 1-buffered; stores the writer when not in use
|
|
|
|
handler Handler
|
|
|
|
onInternalError func(error)
|
|
onDone func()
|
|
}
|
|
|
|
// inFlightState records the state of the incoming and outgoing calls on a
|
|
// Connection.
|
|
type inFlightState struct {
|
|
connClosing bool // true when the Connection's Close method has been called
|
|
reading bool // true while the readIncoming goroutine is running
|
|
readErr error // non-nil when the readIncoming goroutine exits (typically io.EOF)
|
|
writeErr error // non-nil if a call to the Writer has failed with a non-canceled Context
|
|
|
|
// closer shuts down and cleans up the Reader and Writer state, ideally
|
|
// interrupting any Read or Write call that is currently blocked. It is closed
|
|
// when the state is idle and one of: connClosing is true, readErr is non-nil,
|
|
// or writeErr is non-nil.
|
|
//
|
|
// After the closer has been invoked, the closer field is set to nil
|
|
// and the closeErr field is simultaneously set to its result.
|
|
closer io.Closer
|
|
closeErr error // error returned from closer.Close
|
|
|
|
outgoingCalls map[ID]*AsyncCall // calls only
|
|
outgoingNotifications int // # of notifications awaiting "write"
|
|
|
|
// incoming stores the total number of incoming calls and notifications
|
|
// that have not yet written or processed a result.
|
|
incoming int
|
|
|
|
incomingByID map[ID]*incomingRequest // calls only
|
|
|
|
// handlerQueue stores the backlog of calls and notifications that were not
|
|
// already handled by a preempter.
|
|
// The queue does not include the request currently being handled (if any).
|
|
handlerQueue []*incomingRequest
|
|
handlerRunning bool
|
|
}
|
|
|
|
// updateInFlight locks the state of the connection's in-flight requests, allows
|
|
// f to mutate that state, and closes the connection if it is idle and either
|
|
// is closing or has a read or write error.
|
|
func (c *Connection) updateInFlight(f func(*inFlightState)) {
|
|
c.stateMu.Lock()
|
|
defer c.stateMu.Unlock()
|
|
|
|
s := &c.state
|
|
|
|
f(s)
|
|
|
|
select {
|
|
case <-c.done:
|
|
// The connection was already completely done at the start of this call to
|
|
// updateInFlight, so it must remain so. (The call to f should have noticed
|
|
// that and avoided making any updates that would cause the state to be
|
|
// non-idle.)
|
|
if !s.idle() {
|
|
panic("jsonrpc2_v2: updateInFlight transitioned to non-idle when already done")
|
|
}
|
|
return
|
|
default:
|
|
}
|
|
|
|
if s.idle() && s.shuttingDown(ErrUnknown) != nil {
|
|
if s.closer != nil {
|
|
s.closeErr = s.closer.Close()
|
|
s.closer = nil // prevent duplicate Close calls
|
|
}
|
|
if s.reading {
|
|
// The readIncoming goroutine is still running. Our call to Close should
|
|
// cause it to exit soon, at which point it will make another call to
|
|
// updateInFlight, set s.reading to false, and mark the Connection done.
|
|
} else {
|
|
// The readIncoming goroutine has exited, or never started to begin with.
|
|
// Since everything else is idle, we're completely done.
|
|
if c.onDone != nil {
|
|
c.onDone()
|
|
}
|
|
close(c.done)
|
|
}
|
|
}
|
|
}
|
|
|
|
// idle reports whether the connection is in a state with no pending calls or
|
|
// notifications.
|
|
//
|
|
// If idle returns true, the readIncoming goroutine may still be running,
|
|
// but no other goroutines are doing work on behalf of the connection.
|
|
func (s *inFlightState) idle() bool {
|
|
return len(s.outgoingCalls) == 0 && s.outgoingNotifications == 0 && s.incoming == 0 && !s.handlerRunning
|
|
}
|
|
|
|
// shuttingDown reports whether the connection is in a state that should
|
|
// disallow new (incoming and outgoing) calls. It returns either nil or
|
|
// an error that is or wraps the provided errClosing.
|
|
func (s *inFlightState) shuttingDown(errClosing error) error {
|
|
if s.connClosing {
|
|
// If Close has been called explicitly, it doesn't matter what state the
|
|
// Reader and Writer are in: we shouldn't be starting new work because the
|
|
// caller told us not to start new work.
|
|
return errClosing
|
|
}
|
|
if s.readErr != nil {
|
|
// If the read side of the connection is broken, we cannot read new call
|
|
// requests, and cannot read responses to our outgoing calls.
|
|
return fmt.Errorf("%w: %v", errClosing, s.readErr)
|
|
}
|
|
if s.writeErr != nil {
|
|
// If the write side of the connection is broken, we cannot write responses
|
|
// for incoming calls, and cannot write requests for outgoing calls.
|
|
return fmt.Errorf("%w: %v", errClosing, s.writeErr)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// incomingRequest is used to track an incoming request as it is being handled
|
|
type incomingRequest struct {
|
|
*Request // the request being processed
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
endSpan func() // called (and set to nil) when the response is sent
|
|
}
|
|
|
|
// Bind returns the options unmodified.
|
|
func (o ConnectionOptions) Bind(context.Context, *Connection) ConnectionOptions {
|
|
return o
|
|
}
|
|
|
|
// newConnection creates a new connection and runs it.
|
|
//
|
|
// This is used by the Dial and Serve functions to build the actual connection.
|
|
//
|
|
// The connection is closed automatically (and its resources cleaned up) when
|
|
// the last request has completed after the underlying ReadWriteCloser breaks,
|
|
// but it may be stopped earlier by calling Close (for a clean shutdown).
|
|
func newConnection(bindCtx context.Context, rwc io.ReadWriteCloser, binder Binder, onDone func()) *Connection {
|
|
// TODO: Should we create a new event span here?
|
|
// This will propagate cancellation from ctx; should it?
|
|
ctx := notDone{bindCtx}
|
|
|
|
c := &Connection{
|
|
state: inFlightState{closer: rwc},
|
|
done: make(chan struct{}),
|
|
writer: make(chan Writer, 1),
|
|
onDone: onDone,
|
|
}
|
|
// It's tempting to set a finalizer on c to verify that the state has gone
|
|
// idle when the connection becomes unreachable. Unfortunately, the Binder
|
|
// interface makes that unsafe: it allows the Handler to close over the
|
|
// Connection, which could create a reference cycle that would cause the
|
|
// Connection to become uncollectable.
|
|
|
|
options := binder.Bind(bindCtx, c)
|
|
framer := options.Framer
|
|
if framer == nil {
|
|
framer = HeaderFramer()
|
|
}
|
|
c.handler = options.Handler
|
|
if c.handler == nil {
|
|
c.handler = defaultHandler{}
|
|
}
|
|
c.onInternalError = options.OnInternalError
|
|
|
|
c.writer <- framer.Writer(rwc)
|
|
reader := framer.Reader(rwc)
|
|
|
|
c.updateInFlight(func(s *inFlightState) {
|
|
select {
|
|
case <-c.done:
|
|
// Bind already closed the connection; don't start a goroutine to read it.
|
|
return
|
|
default:
|
|
}
|
|
|
|
// The goroutine started here will continue until the underlying stream is closed.
|
|
//
|
|
// (If the Binder closed the Connection already, this should error out and
|
|
// return almost immediately.)
|
|
s.reading = true
|
|
go c.readIncoming(ctx, reader, options.Preempter)
|
|
})
|
|
return c
|
|
}
|
|
|
|
// Notify invokes the target method but does not wait for a response.
|
|
// The params will be marshaled to JSON before sending over the wire, and will
|
|
// be handed to the method invoked.
|
|
func (c *Connection) Notify(ctx context.Context, method string, params interface{}) (err error) {
|
|
ctx, done := event.Start(ctx, method,
|
|
jsonrpc2.Method.Of(method),
|
|
jsonrpc2.RPCDirection.Of(jsonrpc2.Outbound),
|
|
)
|
|
attempted := false
|
|
|
|
defer func() {
|
|
labelStatus(ctx, err)
|
|
done()
|
|
if attempted {
|
|
c.updateInFlight(func(s *inFlightState) {
|
|
s.outgoingNotifications--
|
|
})
|
|
}
|
|
}()
|
|
|
|
c.updateInFlight(func(s *inFlightState) {
|
|
// If the connection is shutting down, allow outgoing notifications only if
|
|
// there is at least one call still in flight. The number of calls in flight
|
|
// cannot increase once shutdown begins, and allowing outgoing notifications
|
|
// may permit notifications that will cancel in-flight calls.
|
|
if len(s.outgoingCalls) == 0 && len(s.incomingByID) == 0 {
|
|
err = s.shuttingDown(ErrClientClosing)
|
|
if err != nil {
|
|
return
|
|
}
|
|
}
|
|
s.outgoingNotifications++
|
|
attempted = true
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
notify, err := NewNotification(method, params)
|
|
if err != nil {
|
|
return fmt.Errorf("marshaling notify parameters: %v", err)
|
|
}
|
|
|
|
event.Metric(ctx, jsonrpc2.Started.Of(1))
|
|
return c.write(ctx, notify)
|
|
}
|
|
|
|
// Call invokes the target method and returns an object that can be used to await the response.
|
|
// The params will be marshaled to JSON before sending over the wire, and will
|
|
// be handed to the method invoked.
|
|
// You do not have to wait for the response, it can just be ignored if not needed.
|
|
// If sending the call failed, the response will be ready and have the error in it.
|
|
func (c *Connection) Call(ctx context.Context, method string, params interface{}) *AsyncCall {
|
|
// Generate a new request identifier.
|
|
id := Int64ID(atomic.AddInt64(&c.seq, 1))
|
|
ctx, endSpan := event.Start(ctx, method,
|
|
jsonrpc2.Method.Of(method),
|
|
jsonrpc2.RPCDirection.Of(jsonrpc2.Outbound),
|
|
jsonrpc2.RPCID.Of(fmt.Sprintf("%q", id)),
|
|
)
|
|
|
|
ac := &AsyncCall{
|
|
id: id,
|
|
ready: make(chan struct{}),
|
|
ctx: ctx,
|
|
endSpan: endSpan,
|
|
}
|
|
// When this method returns, either ac is retired, or the request has been
|
|
// written successfully and the call is awaiting a response (to be provided by
|
|
// the readIncoming goroutine).
|
|
|
|
call, err := NewCall(ac.id, method, params)
|
|
if err != nil {
|
|
ac.retire(&Response{ID: id, Error: fmt.Errorf("marshaling call parameters: %w", err)})
|
|
return ac
|
|
}
|
|
|
|
c.updateInFlight(func(s *inFlightState) {
|
|
err = s.shuttingDown(ErrClientClosing)
|
|
if err != nil {
|
|
return
|
|
}
|
|
if s.outgoingCalls == nil {
|
|
s.outgoingCalls = make(map[ID]*AsyncCall)
|
|
}
|
|
s.outgoingCalls[ac.id] = ac
|
|
})
|
|
if err != nil {
|
|
ac.retire(&Response{ID: id, Error: err})
|
|
return ac
|
|
}
|
|
|
|
event.Metric(ctx, jsonrpc2.Started.Of(1))
|
|
if err := c.write(ctx, call); err != nil {
|
|
// Sending failed. We will never get a response, so deliver a fake one if it
|
|
// wasn't already retired by the connection breaking.
|
|
c.updateInFlight(func(s *inFlightState) {
|
|
if s.outgoingCalls[ac.id] == ac {
|
|
delete(s.outgoingCalls, ac.id)
|
|
ac.retire(&Response{ID: id, Error: err})
|
|
} else {
|
|
// ac was already retired by the readIncoming goroutine:
|
|
// perhaps our write raced with the Read side of the connection breaking.
|
|
}
|
|
})
|
|
}
|
|
return ac
|
|
}
|
|
|
|
type AsyncCall struct {
|
|
id ID
|
|
ready chan struct{} // closed after response has been set and span has been ended
|
|
response *Response
|
|
ctx context.Context // for event logging only
|
|
endSpan func() // close the tracing span when all processing for the message is complete
|
|
}
|
|
|
|
// ID used for this call.
|
|
// This can be used to cancel the call if needed.
|
|
func (ac *AsyncCall) ID() ID { return ac.id }
|
|
|
|
// IsReady can be used to check if the result is already prepared.
|
|
// This is guaranteed to return true on a result for which Await has already
|
|
// returned, or a call that failed to send in the first place.
|
|
func (ac *AsyncCall) IsReady() bool {
|
|
select {
|
|
case <-ac.ready:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
// retire processes the response to the call.
|
|
func (ac *AsyncCall) retire(response *Response) {
|
|
select {
|
|
case <-ac.ready:
|
|
panic(fmt.Sprintf("jsonrpc2: retire called twice for ID %v", ac.id))
|
|
default:
|
|
}
|
|
|
|
ac.response = response
|
|
labelStatus(ac.ctx, response.Error)
|
|
ac.endSpan()
|
|
// Allow the trace context, which may retain a lot of reachable values,
|
|
// to be garbage-collected.
|
|
ac.ctx, ac.endSpan = nil, nil
|
|
|
|
close(ac.ready)
|
|
}
|
|
|
|
// Await waits for (and decodes) the results of a Call.
|
|
// The response will be unmarshaled from JSON into the result.
|
|
func (ac *AsyncCall) Await(ctx context.Context, result interface{}) error {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-ac.ready:
|
|
}
|
|
if ac.response.Error != nil {
|
|
return ac.response.Error
|
|
}
|
|
if result == nil {
|
|
return nil
|
|
}
|
|
return json.Unmarshal(ac.response.Result, result)
|
|
}
|
|
|
|
// Respond delivers a response to an incoming Call.
|
|
//
|
|
// Respond must be called exactly once for any message for which a handler
|
|
// returns ErrAsyncResponse. It must not be called for any other message.
|
|
func (c *Connection) Respond(id ID, result interface{}, err error) error {
|
|
var req *incomingRequest
|
|
c.updateInFlight(func(s *inFlightState) {
|
|
req = s.incomingByID[id]
|
|
})
|
|
if req == nil {
|
|
return c.internalErrorf("Request not found for ID %v", id)
|
|
}
|
|
|
|
if err == ErrAsyncResponse {
|
|
// Respond is supposed to supply the asynchronous response, so it would be
|
|
// confusing to call Respond with an error that promises to call Respond
|
|
// again.
|
|
err = c.internalErrorf("Respond called with ErrAsyncResponse for %q", req.Method)
|
|
}
|
|
return c.processResult("Respond", req, result, err)
|
|
}
|
|
|
|
// Cancel cancels the Context passed to the Handle call for the inbound message
|
|
// with the given ID.
|
|
//
|
|
// Cancel will not complain if the ID is not a currently active message, and it
|
|
// will not cause any messages that have not arrived yet with that ID to be
|
|
// cancelled.
|
|
func (c *Connection) Cancel(id ID) {
|
|
var req *incomingRequest
|
|
c.updateInFlight(func(s *inFlightState) {
|
|
req = s.incomingByID[id]
|
|
})
|
|
if req != nil {
|
|
req.cancel()
|
|
}
|
|
}
|
|
|
|
// Wait blocks until the connection is fully closed, but does not close it.
|
|
func (c *Connection) Wait() error {
|
|
var err error
|
|
<-c.done
|
|
c.updateInFlight(func(s *inFlightState) {
|
|
err = s.closeErr
|
|
})
|
|
return err
|
|
}
|
|
|
|
// Close stops accepting new requests, waits for in-flight requests and enqueued
|
|
// Handle calls to complete, and then closes the underlying stream.
|
|
//
|
|
// After the start of a Close, notification requests (that lack IDs and do not
|
|
// receive responses) will continue to be passed to the Preempter, but calls
|
|
// with IDs will receive immediate responses with ErrServerClosing, and no new
|
|
// requests (not even notifications!) will be enqueued to the Handler.
|
|
func (c *Connection) Close() error {
|
|
// Stop handling new requests, and interrupt the reader (by closing the
|
|
// connection) as soon as the active requests finish.
|
|
c.updateInFlight(func(s *inFlightState) { s.connClosing = true })
|
|
|
|
return c.Wait()
|
|
}
|
|
|
|
// readIncoming collects inbound messages from the reader and delivers them, either responding
|
|
// to outgoing calls or feeding requests to the queue.
|
|
func (c *Connection) readIncoming(ctx context.Context, reader Reader, preempter Preempter) {
|
|
var err error
|
|
for {
|
|
var (
|
|
msg Message
|
|
n int64
|
|
)
|
|
msg, n, err = reader.Read(ctx)
|
|
if err != nil {
|
|
break
|
|
}
|
|
|
|
switch msg := msg.(type) {
|
|
case *Request:
|
|
c.acceptRequest(ctx, msg, n, preempter)
|
|
|
|
case *Response:
|
|
c.updateInFlight(func(s *inFlightState) {
|
|
if ac, ok := s.outgoingCalls[msg.ID]; ok {
|
|
delete(s.outgoingCalls, msg.ID)
|
|
ac.retire(msg)
|
|
} else {
|
|
// TODO: How should we report unexpected responses?
|
|
}
|
|
})
|
|
|
|
default:
|
|
c.internalErrorf("Read returned an unexpected message of type %T", msg)
|
|
}
|
|
}
|
|
|
|
c.updateInFlight(func(s *inFlightState) {
|
|
s.reading = false
|
|
s.readErr = err
|
|
|
|
// Retire any outgoing requests that were still in flight: with the Reader no
|
|
// longer being processed, they necessarily cannot receive a response.
|
|
for id, ac := range s.outgoingCalls {
|
|
ac.retire(&Response{ID: id, Error: err})
|
|
}
|
|
s.outgoingCalls = nil
|
|
})
|
|
}
|
|
|
|
// acceptRequest either handles msg synchronously or enqueues it to be handled
|
|
// asynchronously.
|
|
func (c *Connection) acceptRequest(ctx context.Context, msg *Request, msgBytes int64, preempter Preempter) {
|
|
// Add a span to the context for this request.
|
|
labels := append(make([]label.Label, 0, 3), // Make space for the ID if present.
|
|
jsonrpc2.Method.Of(msg.Method),
|
|
jsonrpc2.RPCDirection.Of(jsonrpc2.Inbound),
|
|
)
|
|
if msg.IsCall() {
|
|
labels = append(labels, jsonrpc2.RPCID.Of(fmt.Sprintf("%q", msg.ID)))
|
|
}
|
|
ctx, endSpan := event.Start(ctx, msg.Method, labels...)
|
|
event.Metric(ctx,
|
|
jsonrpc2.Started.Of(1),
|
|
jsonrpc2.ReceivedBytes.Of(msgBytes))
|
|
|
|
// In theory notifications cannot be cancelled, but we build them a cancel
|
|
// context anyway.
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
req := &incomingRequest{
|
|
Request: msg,
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
endSpan: endSpan,
|
|
}
|
|
|
|
// If the request is a call, add it to the incoming map so it can be
|
|
// cancelled (or responded) by ID.
|
|
var err error
|
|
c.updateInFlight(func(s *inFlightState) {
|
|
s.incoming++
|
|
|
|
if req.IsCall() {
|
|
if s.incomingByID[req.ID] != nil {
|
|
err = fmt.Errorf("%w: request ID %v already in use", ErrInvalidRequest, req.ID)
|
|
req.ID = ID{} // Don't misattribute this error to the existing request.
|
|
return
|
|
}
|
|
|
|
if s.incomingByID == nil {
|
|
s.incomingByID = make(map[ID]*incomingRequest)
|
|
}
|
|
s.incomingByID[req.ID] = req
|
|
|
|
// When shutting down, reject all new Call requests, even if they could
|
|
// theoretically be handled by the preempter. The preempter could return
|
|
// ErrAsyncResponse, which would increase the amount of work in flight
|
|
// when we're trying to ensure that it strictly decreases.
|
|
err = s.shuttingDown(ErrServerClosing)
|
|
}
|
|
})
|
|
if err != nil {
|
|
c.processResult("acceptRequest", req, nil, err)
|
|
return
|
|
}
|
|
|
|
if preempter != nil {
|
|
result, err := preempter.Preempt(req.ctx, req.Request)
|
|
|
|
if req.IsCall() && errors.Is(err, ErrAsyncResponse) {
|
|
// This request will remain in flight until Respond is called for it.
|
|
return
|
|
}
|
|
|
|
if !errors.Is(err, ErrNotHandled) {
|
|
c.processResult("Preempt", req, result, err)
|
|
return
|
|
}
|
|
}
|
|
|
|
c.updateInFlight(func(s *inFlightState) {
|
|
// If the connection is shutting down, don't enqueue anything to the
|
|
// handler — not even notifications. That ensures that if the handler
|
|
// continues to make progress, it will eventually become idle and
|
|
// close the connection.
|
|
err = s.shuttingDown(ErrServerClosing)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
// We enqueue requests that have not been preempted to an unbounded slice.
|
|
// Unfortunately, we cannot in general limit the size of the handler
|
|
// queue: we have to read every response that comes in on the wire
|
|
// (because it may be responding to a request issued by, say, an
|
|
// asynchronous handler), and in order to get to that response we have
|
|
// to read all of the requests that came in ahead of it.
|
|
s.handlerQueue = append(s.handlerQueue, req)
|
|
if !s.handlerRunning {
|
|
// We start the handleAsync goroutine when it has work to do, and let it
|
|
// exit when the queue empties.
|
|
//
|
|
// Otherwise, in order to synchronize the handler we would need some other
|
|
// goroutine (probably readIncoming?) to explicitly wait for handleAsync
|
|
// to finish, and that would complicate error reporting: either the error
|
|
// report from the goroutine would be blocked on the handler emptying its
|
|
// queue (which was tried, and introduced a deadlock detected by
|
|
// TestCloseCallRace), or the error would need to be reported separately
|
|
// from synchronizing completion. Allowing the handler goroutine to exit
|
|
// when idle seems simpler than trying to implement either of those
|
|
// alternatives correctly.
|
|
s.handlerRunning = true
|
|
go c.handleAsync()
|
|
}
|
|
})
|
|
if err != nil {
|
|
c.processResult("acceptRequest", req, nil, err)
|
|
}
|
|
}
|
|
|
|
// handleAsync invokes the handler on the requests in the handler queue
|
|
// sequentially until the queue is empty.
|
|
func (c *Connection) handleAsync() {
|
|
for {
|
|
var req *incomingRequest
|
|
c.updateInFlight(func(s *inFlightState) {
|
|
if len(s.handlerQueue) > 0 {
|
|
req, s.handlerQueue = s.handlerQueue[0], s.handlerQueue[1:]
|
|
} else {
|
|
s.handlerRunning = false
|
|
}
|
|
})
|
|
if req == nil {
|
|
return
|
|
}
|
|
|
|
// Only deliver to the Handler if not already canceled.
|
|
if err := req.ctx.Err(); err != nil {
|
|
c.updateInFlight(func(s *inFlightState) {
|
|
if s.writeErr != nil {
|
|
// Assume that req.ctx was canceled due to s.writeErr.
|
|
// TODO(#51365): use a Context API to plumb this through req.ctx.
|
|
err = fmt.Errorf("%w: %v", ErrServerClosing, s.writeErr)
|
|
}
|
|
})
|
|
c.processResult("handleAsync", req, nil, err)
|
|
continue
|
|
}
|
|
|
|
result, err := c.handler.Handle(req.ctx, req.Request)
|
|
c.processResult(c.handler, req, result, err)
|
|
}
|
|
}
|
|
|
|
// processResult processes the result of a request and, if appropriate, sends a response.
|
|
func (c *Connection) processResult(from interface{}, req *incomingRequest, result interface{}, err error) error {
|
|
switch err {
|
|
case ErrAsyncResponse:
|
|
if !req.IsCall() {
|
|
return c.internalErrorf("%#v returned ErrAsyncResponse for a %q Request without an ID", from, req.Method)
|
|
}
|
|
return nil // This request is still in flight, so don't record the result yet.
|
|
case ErrNotHandled, ErrMethodNotFound:
|
|
// Add detail describing the unhandled method.
|
|
err = fmt.Errorf("%w: %q", ErrMethodNotFound, req.Method)
|
|
}
|
|
|
|
if req.endSpan == nil {
|
|
return c.internalErrorf("%#v produced a duplicate %q Response", from, req.Method)
|
|
}
|
|
|
|
if result != nil && err != nil {
|
|
c.internalErrorf("%#v returned a non-nil result with a non-nil error for %s:\n%v\n%#v", from, req.Method, err, result)
|
|
result = nil // Discard the spurious result and respond with err.
|
|
}
|
|
|
|
if req.IsCall() {
|
|
if result == nil && err == nil {
|
|
err = c.internalErrorf("%#v returned a nil result and nil error for a %q Request that requires a Response", from, req.Method)
|
|
}
|
|
|
|
response, respErr := NewResponse(req.ID, result, err)
|
|
|
|
// The caller could theoretically reuse the request's ID as soon as we've
|
|
// sent the response, so ensure that it is removed from the incoming map
|
|
// before sending.
|
|
c.updateInFlight(func(s *inFlightState) {
|
|
delete(s.incomingByID, req.ID)
|
|
})
|
|
if respErr == nil {
|
|
writeErr := c.write(notDone{req.ctx}, response)
|
|
if err == nil {
|
|
err = writeErr
|
|
}
|
|
} else {
|
|
err = c.internalErrorf("%#v returned a malformed result for %q: %w", from, req.Method, respErr)
|
|
}
|
|
} else { // req is a notification
|
|
if result != nil {
|
|
err = c.internalErrorf("%#v returned a non-nil result for a %q Request without an ID", from, req.Method)
|
|
} else if err != nil {
|
|
err = fmt.Errorf("%w: %q notification failed: %v", ErrInternal, req.Method, err)
|
|
}
|
|
if err != nil {
|
|
// TODO: can/should we do anything with this error beyond writing it to the event log?
|
|
// (Is this the right label to attach to the log?)
|
|
event.Label(req.ctx, keys.Err.Of(err))
|
|
}
|
|
}
|
|
|
|
labelStatus(req.ctx, err)
|
|
|
|
// Cancel the request and finalize the event span to free any associated resources.
|
|
req.cancel()
|
|
req.endSpan()
|
|
req.endSpan = nil
|
|
c.updateInFlight(func(s *inFlightState) {
|
|
if s.incoming == 0 {
|
|
panic("jsonrpc2_v2: processResult called when incoming count is already zero")
|
|
}
|
|
s.incoming--
|
|
})
|
|
return nil
|
|
}
|
|
|
|
// write is used by all things that write outgoing messages, including replies.
|
|
// it makes sure that writes are atomic
|
|
func (c *Connection) write(ctx context.Context, msg Message) error {
|
|
writer := <-c.writer
|
|
defer func() { c.writer <- writer }()
|
|
n, err := writer.Write(ctx, msg)
|
|
event.Metric(ctx, jsonrpc2.SentBytes.Of(n))
|
|
|
|
if err != nil && ctx.Err() == nil {
|
|
// The call to Write failed, and since ctx.Err() is nil we can't attribute
|
|
// the failure (even indirectly) to Context cancellation. The writer appears
|
|
// to be broken, and future writes are likely to also fail.
|
|
//
|
|
// If the read side of the connection is also broken, we might not even be
|
|
// able to receive cancellation notifications. Since we can't reliably write
|
|
// the results of incoming calls and can't receive explicit cancellations,
|
|
// cancel the calls now.
|
|
c.updateInFlight(func(s *inFlightState) {
|
|
if s.writeErr == nil {
|
|
s.writeErr = err
|
|
for _, r := range s.incomingByID {
|
|
r.cancel()
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
// internalErrorf reports an internal error. By default it panics, but if
|
|
// c.onInternalError is non-nil it instead calls that and returns an error
|
|
// wrapping ErrInternal.
|
|
func (c *Connection) internalErrorf(format string, args ...interface{}) error {
|
|
err := fmt.Errorf(format, args...)
|
|
if c.onInternalError == nil {
|
|
panic("jsonrpc2: " + err.Error())
|
|
}
|
|
c.onInternalError(err)
|
|
|
|
return fmt.Errorf("%w: %v", ErrInternal, err)
|
|
}
|
|
|
|
// labelStatus labels the status of the event in ctx based on whether err is nil.
|
|
func labelStatus(ctx context.Context, err error) {
|
|
if err == nil {
|
|
event.Label(ctx, jsonrpc2.StatusCode.Of("OK"))
|
|
} else {
|
|
event.Label(ctx, jsonrpc2.StatusCode.Of("ERROR"))
|
|
}
|
|
}
|
|
|
|
// notDone wraps a context.Context so that it never reports completion: Done
// returns a nil channel, Err returns nil, and no deadline is reported. Values
// are still looked up in the wrapped Context.
type notDone struct{ ctx context.Context }

// Value returns the value associated with key in the wrapped Context.
func (nd notDone) Value(key interface{}) interface{} {
	parent := nd.ctx
	return parent.Value(key)
}

// Done returns a nil channel.
func (notDone) Done() <-chan struct{} {
	var never <-chan struct{}
	return never
}

// Err always returns nil.
func (notDone) Err() error {
	var none error
	return none
}

// Deadline reports that no deadline is set.
func (notDone) Deadline() (deadline time.Time, ok bool) {
	return deadline, ok
}
|