зеркало из https://github.com/golang/tools.git
internal/jsonrpc2_v2: an updated jsonrpc2 library
Change-Id: I609173baa6842d33068a7e9596d54f03d89c5401 Reviewed-on: https://go-review.googlesource.com/c/tools/+/292169 Run-TryBot: Ian Cottrell <iancottrell@google.com> gopls-CI: kokoro <noreply+kokoro@google.com> TryBot-Result: Go Bot <gobot@golang.org> Trust: Ian Cottrell <iancottrell@google.com> Reviewed-by: Robert Findley <rfindley@google.com>
This commit is contained in:
Родитель
52cb77242b
Коммит
877f9c48b6
|
@ -0,0 +1,486 @@
|
|||
// Copyright 2018 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package jsonrpc2
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync/atomic"
|
||||
|
||||
"golang.org/x/tools/internal/event"
|
||||
"golang.org/x/tools/internal/event/label"
|
||||
"golang.org/x/tools/internal/lsp/debug/tag"
|
||||
errors "golang.org/x/xerrors"
|
||||
)
|
||||
|
||||
// Binder builds a connection configuration.
|
||||
// This may be used in servers to generate a new configuration per connection.
|
||||
// ConnectionOptions itself implements Binder returning itself unmodified, to
|
||||
// allow for the simple cases where no per connection information is needed.
|
||||
type Binder interface {
|
||||
// Bind is invoked when creating a new connection.
|
||||
// The connection is not ready to use when Bind is called.
|
||||
Bind(context.Context, *Connection) (ConnectionOptions, error)
|
||||
}
|
||||
|
||||
// ConnectionOptions holds the options for new connections.
|
||||
type ConnectionOptions struct {
|
||||
// Framer allows control over the message framing and encoding.
|
||||
// If nil, HeaderFramer will be used.
|
||||
Framer Framer
|
||||
// Preempter allows registration of a pre-queue message handler.
|
||||
// If nil, no messages will be preempted.
|
||||
Preempter Preempter
|
||||
// Handler is used as the queued message handler for inbound messages.
|
||||
// If nil, all responses will be ErrNotHandled.
|
||||
Handler Handler
|
||||
}
|
||||
|
||||
// Connection manages the jsonrpc2 protocol, connecting responses back to their
|
||||
// calls.
|
||||
// Connection is bidirectional; it does not have a designated server or client
|
||||
// end.
|
||||
type Connection struct {
|
||||
seq int64 // must only be accessed using atomic operations
|
||||
closer io.Closer
|
||||
writerBox chan Writer
|
||||
outgoingBox chan map[ID]chan<- *Response
|
||||
incomingBox chan map[ID]*incoming
|
||||
async async
|
||||
}
|
||||
|
||||
type AsyncCall struct {
|
||||
id ID
|
||||
response chan *Response // the channel a response will be delivered on
|
||||
resultBox chan asyncResult
|
||||
endSpan func() // close the tracing span when all processing for the message is complete
|
||||
}
|
||||
|
||||
type asyncResult struct {
|
||||
result []byte
|
||||
err error
|
||||
}
|
||||
|
||||
// incoming is used to track an incoming request as it is being handled
|
||||
type incoming struct {
|
||||
request *Request // the request being processed
|
||||
baseCtx context.Context // a base context for the message processing
|
||||
done func() // a function called when all processing for the message is complete
|
||||
handleCtx context.Context // the context for handling the message, child of baseCtx
|
||||
cancel func() // a function that cancels the handling context
|
||||
}
|
||||
|
||||
// Bind returns the options unmodified.
|
||||
func (o ConnectionOptions) Bind(context.Context, *Connection) (ConnectionOptions, error) {
|
||||
return o, nil
|
||||
}
|
||||
|
||||
// newConnection creates a new connection and runs it.
|
||||
// This is used by the Dial and Serve functions to build the actual connection.
|
||||
func newConnection(ctx context.Context, rwc io.ReadWriteCloser, binder Binder) (*Connection, error) {
|
||||
c := &Connection{
|
||||
closer: rwc,
|
||||
writerBox: make(chan Writer, 1),
|
||||
outgoingBox: make(chan map[ID]chan<- *Response, 1),
|
||||
incomingBox: make(chan map[ID]*incoming, 1),
|
||||
}
|
||||
|
||||
options, err := binder.Bind(ctx, c)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if options.Framer == nil {
|
||||
options.Framer = HeaderFramer()
|
||||
}
|
||||
if options.Preempter == nil {
|
||||
options.Preempter = defaultHandler{}
|
||||
}
|
||||
if options.Handler == nil {
|
||||
options.Handler = defaultHandler{}
|
||||
}
|
||||
c.outgoingBox <- make(map[ID]chan<- *Response)
|
||||
c.incomingBox <- make(map[ID]*incoming)
|
||||
c.async.init()
|
||||
// the goroutines started here will continue until the underlying stream is closed
|
||||
reader := options.Framer.Reader(rwc)
|
||||
readToQueue := make(chan *incoming)
|
||||
queueToDeliver := make(chan *incoming)
|
||||
go c.readIncoming(ctx, reader, readToQueue)
|
||||
go c.manageQueue(ctx, options.Preempter, readToQueue, queueToDeliver)
|
||||
go c.deliverMessages(ctx, options.Handler, queueToDeliver)
|
||||
// releaseing the writer must be the last thing we do in case any requests
|
||||
// are blocked waiting for the connection to be ready
|
||||
c.writerBox <- options.Framer.Writer(rwc)
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// Notify invokes the target method but does not wait for a response.
|
||||
// The params will be marshaled to JSON before sending over the wire, and will
|
||||
// be handed to the method invoked.
|
||||
func (c *Connection) Notify(ctx context.Context, method string, params interface{}) error {
|
||||
notify, err := NewNotification(method, params)
|
||||
if err != nil {
|
||||
return errors.Errorf("marshaling notify parameters: %v", err)
|
||||
}
|
||||
ctx, done := event.Start(ctx, method,
|
||||
tag.Method.Of(method),
|
||||
tag.RPCDirection.Of(tag.Outbound),
|
||||
)
|
||||
event.Metric(ctx, tag.Started.Of(1))
|
||||
err = c.write(ctx, notify)
|
||||
switch {
|
||||
case err != nil:
|
||||
event.Label(ctx, tag.StatusCode.Of("ERROR"))
|
||||
default:
|
||||
event.Label(ctx, tag.StatusCode.Of("OK"))
|
||||
}
|
||||
done()
|
||||
return err
|
||||
}
|
||||
|
||||
// Call invokes the target method and returns an object that can be used to await the response.
|
||||
// The params will be marshaled to JSON before sending over the wire, and will
|
||||
// be handed to the method invoked.
|
||||
// You do not have to wait for the response, it can just be ignored if not needed.
|
||||
// If sending the call failed, the response will be ready and have the error in it.
|
||||
func (c *Connection) Call(ctx context.Context, method string, params interface{}) *AsyncCall {
|
||||
result := &AsyncCall{
|
||||
id: Int64ID(atomic.AddInt64(&c.seq, 1)),
|
||||
resultBox: make(chan asyncResult, 1),
|
||||
}
|
||||
// generate a new request identifier
|
||||
call, err := NewCall(result.id, method, params)
|
||||
if err != nil {
|
||||
//set the result to failed
|
||||
result.resultBox <- asyncResult{err: errors.Errorf("marshaling call parameters: %w", err)}
|
||||
return result
|
||||
}
|
||||
ctx, endSpan := event.Start(ctx, method,
|
||||
tag.Method.Of(method),
|
||||
tag.RPCDirection.Of(tag.Outbound),
|
||||
tag.RPCID.Of(fmt.Sprintf("%q", result.id)),
|
||||
)
|
||||
result.endSpan = endSpan
|
||||
event.Metric(ctx, tag.Started.Of(1))
|
||||
// We have to add ourselves to the pending map before we send, otherwise we
|
||||
// are racing the response.
|
||||
// rchan is buffered in case the response arrives without a listener.
|
||||
result.response = make(chan *Response, 1)
|
||||
pending := <-c.outgoingBox
|
||||
pending[result.id] = result.response
|
||||
c.outgoingBox <- pending
|
||||
// now we are ready to send
|
||||
if err := c.write(ctx, call); err != nil {
|
||||
// sending failed, we will never get a response, so deliver a fake one
|
||||
r, _ := NewResponse(result.id, nil, err)
|
||||
c.incomingResponse(r)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// ID used for this call.
|
||||
// This can be used to cancel the call if needed.
|
||||
func (a *AsyncCall) ID() ID { return a.id }
|
||||
|
||||
// IsReady can be used to check if the result is already prepared.
|
||||
// This is guaranteed to return true on a result for which Await has already
|
||||
// returned, or a call that failed to send in the first place.
|
||||
func (a *AsyncCall) IsReady() bool {
|
||||
select {
|
||||
case r := <-a.resultBox:
|
||||
a.resultBox <- r
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// Await the results of a Call.
|
||||
// The response will be unmarshaled from JSON into the result.
|
||||
func (a *AsyncCall) Await(ctx context.Context, result interface{}) error {
|
||||
defer a.endSpan()
|
||||
var r asyncResult
|
||||
select {
|
||||
case response := <-a.response:
|
||||
// response just arrived, prepare the result
|
||||
switch {
|
||||
case response.Error != nil:
|
||||
r.err = response.Error
|
||||
event.Label(ctx, tag.StatusCode.Of("ERROR"))
|
||||
default:
|
||||
r.result = response.Result
|
||||
event.Label(ctx, tag.StatusCode.Of("OK"))
|
||||
}
|
||||
case r = <-a.resultBox:
|
||||
// result already available
|
||||
case <-ctx.Done():
|
||||
event.Label(ctx, tag.StatusCode.Of("CANCELLED"))
|
||||
return ctx.Err()
|
||||
}
|
||||
// refill the box for the next caller
|
||||
a.resultBox <- r
|
||||
// and unpack the result
|
||||
if r.err != nil {
|
||||
return r.err
|
||||
}
|
||||
if result == nil || len(r.result) == 0 {
|
||||
return nil
|
||||
}
|
||||
return json.Unmarshal(r.result, result)
|
||||
}
|
||||
|
||||
// Respond deliverers a response to an incoming Call.
|
||||
// It is an error to not call this exactly once for any message for which a
|
||||
// handler has previously returned ErrAsyncResponse. It is also an error to
|
||||
// call this for any other message.
|
||||
func (c *Connection) Respond(id ID, result interface{}, rerr error) error {
|
||||
pending := <-c.incomingBox
|
||||
defer func() { c.incomingBox <- pending }()
|
||||
entry, found := pending[id]
|
||||
if !found {
|
||||
return nil
|
||||
}
|
||||
delete(pending, id)
|
||||
return c.respond(entry, result, rerr)
|
||||
}
|
||||
|
||||
// Cancel is used to cancel an inbound message by ID, it does not cancel
|
||||
// outgoing messages.
|
||||
// This is only used inside a message handler that is layering a
|
||||
// cancellation protocol on top of JSON RPC 2.
|
||||
// It will not complain if the ID is not a currently active message, and it will
|
||||
// not cause any messages that have not arrived yet with that ID to be
|
||||
// cancelled.
|
||||
func (c *Connection) Cancel(id ID) {
|
||||
pending := <-c.incomingBox
|
||||
defer func() { c.incomingBox <- pending }()
|
||||
if entry, found := pending[id]; found && entry.cancel != nil {
|
||||
entry.cancel()
|
||||
entry.cancel = nil
|
||||
}
|
||||
}
|
||||
|
||||
// Wait blocks until the connection is fully closed, but does not close it.
|
||||
func (c *Connection) Wait() error {
|
||||
return c.async.wait()
|
||||
}
|
||||
|
||||
// Close can be used to close the underlying stream, and then wait for the connection to
|
||||
// fully shut down.
|
||||
// This does not cancel in flight requests, but waits for them to gracefully complete.
|
||||
func (c *Connection) Close() error {
|
||||
// close the underlying stream
|
||||
if err := c.closer.Close(); err != nil && !isClosingError(err) {
|
||||
return err
|
||||
}
|
||||
// and then wait for it to cause the connection to close
|
||||
if err := c.Wait(); err != nil && !isClosingError(err) {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// readIncoming collects inbound messages from the reader and delivers them, either responding
|
||||
// to outgoing calls or feeding requests to the queue.
|
||||
func (c *Connection) readIncoming(ctx context.Context, reader Reader, toQueue chan<- *incoming) {
|
||||
defer close(toQueue)
|
||||
for {
|
||||
// get the next message
|
||||
// no lock is needed, this is the only reader
|
||||
msg, n, err := reader.Read(ctx)
|
||||
if err != nil {
|
||||
// The stream failed, we cannot continue
|
||||
c.async.setError(err)
|
||||
return
|
||||
}
|
||||
switch msg := msg.(type) {
|
||||
case *Request:
|
||||
entry := &incoming{
|
||||
request: msg,
|
||||
}
|
||||
// add a span to the context for this request
|
||||
labels := append(make([]label.Label, 0, 3), // make space for the id if present
|
||||
tag.Method.Of(msg.Method),
|
||||
tag.RPCDirection.Of(tag.Inbound),
|
||||
)
|
||||
if msg.IsCall() {
|
||||
labels = append(labels, tag.RPCID.Of(fmt.Sprintf("%q", msg.ID)))
|
||||
}
|
||||
entry.baseCtx, entry.done = event.Start(ctx, msg.Method, labels...)
|
||||
event.Metric(entry.baseCtx,
|
||||
tag.Started.Of(1),
|
||||
tag.ReceivedBytes.Of(n))
|
||||
// in theory notifications cannot be cancelled, but we build them a cancel context anyway
|
||||
entry.handleCtx, entry.cancel = context.WithCancel(entry.baseCtx)
|
||||
// if the request is a call, add it to the incoming map so it can be
|
||||
// cancelled by id
|
||||
if msg.IsCall() {
|
||||
pending := <-c.incomingBox
|
||||
c.incomingBox <- pending
|
||||
pending[msg.ID] = entry
|
||||
}
|
||||
// send the message to the incoming queue
|
||||
toQueue <- entry
|
||||
case *Response:
|
||||
// If method is not set, this should be a response, in which case we must
|
||||
// have an id to send the response back to the caller.
|
||||
c.incomingResponse(msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Connection) incomingResponse(msg *Response) {
|
||||
pending := <-c.outgoingBox
|
||||
response, ok := pending[msg.ID]
|
||||
if ok {
|
||||
delete(pending, msg.ID)
|
||||
}
|
||||
c.outgoingBox <- pending
|
||||
if response != nil {
|
||||
response <- msg
|
||||
}
|
||||
}
|
||||
|
||||
// manageQueue reads incoming requests, attempts to proccess them with the preempter, or queue them
|
||||
// up for normal handling.
|
||||
func (c *Connection) manageQueue(ctx context.Context, preempter Preempter, fromRead <-chan *incoming, toDeliver chan<- *incoming) {
|
||||
defer close(toDeliver)
|
||||
q := []*incoming{}
|
||||
ok := true
|
||||
for {
|
||||
var nextReq *incoming
|
||||
if len(q) == 0 {
|
||||
// no messages in the queue
|
||||
// if we were closing, then we are done
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
// not closing, but nothing in the queue, so just block waiting for a read
|
||||
nextReq, ok = <-fromRead
|
||||
} else {
|
||||
// we have a non empty queue, so pick whichever of reading or delivering
|
||||
// that we can make progress on
|
||||
select {
|
||||
case nextReq, ok = <-fromRead:
|
||||
case toDeliver <- q[0]:
|
||||
//TODO: this causes a lot of shuffling, should we use a growing ring buffer? compaction?
|
||||
q = q[1:]
|
||||
}
|
||||
}
|
||||
if nextReq != nil {
|
||||
// TODO: should we allow to limit the queue size?
|
||||
var result interface{}
|
||||
rerr := nextReq.handleCtx.Err()
|
||||
if rerr == nil {
|
||||
// only preempt if not already cancelled
|
||||
result, rerr = preempter.Preempt(nextReq.handleCtx, nextReq.request)
|
||||
}
|
||||
switch {
|
||||
case rerr == ErrNotHandled:
|
||||
// message not handled, add it to the queue for the main handler
|
||||
q = append(q, nextReq)
|
||||
case rerr == ErrAsyncResponse:
|
||||
// message handled but the response will come later
|
||||
default:
|
||||
// anything else means the message is fully handled
|
||||
c.reply(nextReq, result, rerr)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Connection) deliverMessages(ctx context.Context, handler Handler, fromQueue <-chan *incoming) {
|
||||
defer c.async.done()
|
||||
for entry := range fromQueue {
|
||||
// cancel any messages in the queue that we have a pending cancel for
|
||||
var result interface{}
|
||||
rerr := entry.handleCtx.Err()
|
||||
if rerr == nil {
|
||||
// only deliver if not already cancelled
|
||||
result, rerr = handler.Handle(entry.handleCtx, entry.request)
|
||||
}
|
||||
switch {
|
||||
case rerr == ErrNotHandled:
|
||||
// message not handled, report it back to the caller as an error
|
||||
c.reply(entry, nil, errors.Errorf("%w: %q", ErrMethodNotFound, entry.request.Method))
|
||||
case rerr == ErrAsyncResponse:
|
||||
// message handled but the response will come later
|
||||
default:
|
||||
c.reply(entry, result, rerr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// reply is used to reply to an incoming request that has just been handled
|
||||
func (c *Connection) reply(entry *incoming, result interface{}, rerr error) {
|
||||
if entry.request.IsCall() {
|
||||
// we have a call finishing, remove it from the incoming map
|
||||
pending := <-c.incomingBox
|
||||
defer func() { c.incomingBox <- pending }()
|
||||
delete(pending, entry.request.ID)
|
||||
}
|
||||
if err := c.respond(entry, result, rerr); err != nil {
|
||||
// no way to propagate this error
|
||||
//TODO: should we do more than just log it?
|
||||
event.Error(entry.baseCtx, "jsonrpc2 message delivery failed", err)
|
||||
}
|
||||
}
|
||||
|
||||
// respond sends a response.
|
||||
// This is the code shared between reply and SendResponse.
|
||||
func (c *Connection) respond(entry *incoming, result interface{}, rerr error) error {
|
||||
var err error
|
||||
if entry.request.IsCall() {
|
||||
// send the response
|
||||
if result == nil && rerr == nil {
|
||||
// call with no response, send an error anyway
|
||||
rerr = errors.Errorf("%w: %q produced no response", ErrInternal, entry.request.Method)
|
||||
}
|
||||
var response *Response
|
||||
response, err = NewResponse(entry.request.ID, result, rerr)
|
||||
if err == nil {
|
||||
// we write the response with the base context, in case the message was cancelled
|
||||
err = c.write(entry.baseCtx, response)
|
||||
}
|
||||
} else {
|
||||
switch {
|
||||
case rerr != nil:
|
||||
// notification failed
|
||||
err = errors.Errorf("%w: %q notification failed: %v", ErrInternal, entry.request.Method, rerr)
|
||||
rerr = nil
|
||||
case result != nil:
|
||||
//notification produced a response, which is an error
|
||||
err = errors.Errorf("%w: %q produced unwanted response", ErrInternal, entry.request.Method)
|
||||
default:
|
||||
// normal notification finish
|
||||
}
|
||||
}
|
||||
switch {
|
||||
case rerr != nil || err != nil:
|
||||
event.Label(entry.baseCtx, tag.StatusCode.Of("ERROR"))
|
||||
default:
|
||||
event.Label(entry.baseCtx, tag.StatusCode.Of("OK"))
|
||||
}
|
||||
// and just to be clean, invoke and clear the cancel if needed
|
||||
if entry.cancel != nil {
|
||||
entry.cancel()
|
||||
entry.cancel = nil
|
||||
}
|
||||
// mark the entire request processing as done
|
||||
entry.done()
|
||||
return err
|
||||
}
|
||||
|
||||
// write is used by all things that write outgoing messages, including replies.
|
||||
// it makes sure that writes are atomic
|
||||
func (c *Connection) write(ctx context.Context, msg Message) error {
|
||||
writer := <-c.writerBox
|
||||
defer func() { c.writerBox <- writer }()
|
||||
n, err := writer.Write(ctx, msg)
|
||||
event.Metric(ctx, tag.SentBytes.Of(n))
|
||||
return err
|
||||
}
|
|
@ -0,0 +1,179 @@
|
|||
// Copyright 2018 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package jsonrpc2
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
errors "golang.org/x/xerrors"
|
||||
)
|
||||
|
||||
// Reader abstracts the transport mechanics from the JSON RPC protocol.
|
||||
// A Conn reads messages from the reader it was provided on construction,
|
||||
// and assumes that each call to Read fully transfers a single message,
|
||||
// or returns an error.
|
||||
// A reader is not safe for concurrent use, it is expected it will be used by
|
||||
// a single Conn in a safe manner.
|
||||
type Reader interface {
|
||||
// Read gets the next message from the stream.
|
||||
Read(context.Context) (Message, int64, error)
|
||||
}
|
||||
|
||||
// Writer abstracts the transport mechanics from the JSON RPC protocol.
|
||||
// A Conn writes messages using the writer it was provided on construction,
|
||||
// and assumes that each call to Write fully transfers a single message,
|
||||
// or returns an error.
|
||||
// A writer is not safe for concurrent use, it is expected it will be used by
|
||||
// a single Conn in a safe manner.
|
||||
type Writer interface {
|
||||
// Write sends a message to the stream.
|
||||
Write(context.Context, Message) (int64, error)
|
||||
}
|
||||
|
||||
// Framer wraps low level byte readers and writers into jsonrpc2 message
|
||||
// readers and writers.
|
||||
// It is responsible for the framing and encoding of messages into wire form.
|
||||
type Framer interface {
|
||||
// Reader wraps a byte reader into a message reader.
|
||||
Reader(rw io.Reader) Reader
|
||||
// Writer wraps a byte writer into a message writer.
|
||||
Writer(rw io.Writer) Writer
|
||||
}
|
||||
|
||||
// RawFramer returns a new Framer.
|
||||
// The messages are sent with no wrapping, and rely on json decode consistency
|
||||
// to determine message boundaries.
|
||||
func RawFramer() Framer { return rawFramer{} }
|
||||
|
||||
type rawFramer struct{}
|
||||
type rawReader struct{ in *json.Decoder }
|
||||
type rawWriter struct{ out io.Writer }
|
||||
|
||||
func (rawFramer) Reader(rw io.Reader) Reader {
|
||||
return &rawReader{in: json.NewDecoder(rw)}
|
||||
}
|
||||
|
||||
func (rawFramer) Writer(rw io.Writer) Writer {
|
||||
return &rawWriter{out: rw}
|
||||
}
|
||||
|
||||
func (r *rawReader) Read(ctx context.Context) (Message, int64, error) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, 0, ctx.Err()
|
||||
default:
|
||||
}
|
||||
var raw json.RawMessage
|
||||
if err := r.in.Decode(&raw); err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
msg, err := DecodeMessage(raw)
|
||||
return msg, int64(len(raw)), err
|
||||
}
|
||||
|
||||
func (w *rawWriter) Write(ctx context.Context, msg Message) (int64, error) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return 0, ctx.Err()
|
||||
default:
|
||||
}
|
||||
data, err := EncodeMessage(msg)
|
||||
if err != nil {
|
||||
return 0, errors.Errorf("marshaling message: %v", err)
|
||||
}
|
||||
n, err := w.out.Write(data)
|
||||
return int64(n), err
|
||||
}
|
||||
|
||||
// HeaderFramer returns a new Framer.
|
||||
// The messages are sent with HTTP content length and MIME type headers.
|
||||
// This is the format used by LSP and others.
|
||||
func HeaderFramer() Framer { return headerFramer{} }
|
||||
|
||||
type headerFramer struct{}
|
||||
type headerReader struct{ in *bufio.Reader }
|
||||
type headerWriter struct{ out io.Writer }
|
||||
|
||||
func (headerFramer) Reader(rw io.Reader) Reader {
|
||||
return &headerReader{in: bufio.NewReader(rw)}
|
||||
}
|
||||
|
||||
func (headerFramer) Writer(rw io.Writer) Writer {
|
||||
return &headerWriter{out: rw}
|
||||
}
|
||||
|
||||
func (r *headerReader) Read(ctx context.Context) (Message, int64, error) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, 0, ctx.Err()
|
||||
default:
|
||||
}
|
||||
var total, length int64
|
||||
// read the header, stop on the first empty line
|
||||
for {
|
||||
line, err := r.in.ReadString('\n')
|
||||
total += int64(len(line))
|
||||
if err != nil {
|
||||
return nil, total, errors.Errorf("failed reading header line: %w", err)
|
||||
}
|
||||
line = strings.TrimSpace(line)
|
||||
// check we have a header line
|
||||
if line == "" {
|
||||
break
|
||||
}
|
||||
colon := strings.IndexRune(line, ':')
|
||||
if colon < 0 {
|
||||
return nil, total, errors.Errorf("invalid header line %q", line)
|
||||
}
|
||||
name, value := line[:colon], strings.TrimSpace(line[colon+1:])
|
||||
switch name {
|
||||
case "Content-Length":
|
||||
if length, err = strconv.ParseInt(value, 10, 32); err != nil {
|
||||
return nil, total, errors.Errorf("failed parsing Content-Length: %v", value)
|
||||
}
|
||||
if length <= 0 {
|
||||
return nil, total, errors.Errorf("invalid Content-Length: %v", length)
|
||||
}
|
||||
default:
|
||||
// ignoring unknown headers
|
||||
}
|
||||
}
|
||||
if length == 0 {
|
||||
return nil, total, errors.Errorf("missing Content-Length header")
|
||||
}
|
||||
data := make([]byte, length)
|
||||
n, err := io.ReadFull(r.in, data)
|
||||
total += int64(n)
|
||||
if err != nil {
|
||||
return nil, total, err
|
||||
}
|
||||
msg, err := DecodeMessage(data)
|
||||
return msg, total, err
|
||||
}
|
||||
|
||||
func (w *headerWriter) Write(ctx context.Context, msg Message) (int64, error) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return 0, ctx.Err()
|
||||
default:
|
||||
}
|
||||
data, err := EncodeMessage(msg)
|
||||
if err != nil {
|
||||
return 0, errors.Errorf("marshaling message: %v", err)
|
||||
}
|
||||
n, err := fmt.Fprintf(w.out, "Content-Length: %v\r\n\r\n", len(data))
|
||||
total := int64(n)
|
||||
if err == nil {
|
||||
n, err = w.out.Write(data)
|
||||
total += int64(n)
|
||||
}
|
||||
return total, err
|
||||
}
|
|
@ -0,0 +1,84 @@
|
|||
// Copyright 2018 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// Package jsonrpc2 is a minimal implementation of the JSON RPC 2 spec.
|
||||
// https://www.jsonrpc.org/specification
|
||||
// It is intended to be compatible with other implementations at the wire level.
|
||||
package jsonrpc2
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrIdleTimeout is returned when serving timed out waiting for new connections.
|
||||
ErrIdleTimeout = errors.New("timed out waiting for new connections")
|
||||
// ErrNotHandled is returned from a handler to indicate it did not handle the
|
||||
// message.
|
||||
ErrNotHandled = errors.New("JSON RPC not handled")
|
||||
// ErrAsyncResponse is returned from a handler to indicate it will generate a
|
||||
// response asynchronously.
|
||||
ErrAsyncResponse = errors.New("JSON RPC asynchronous response")
|
||||
)
|
||||
|
||||
// Preempter handles messages on a connection before they are queued to the main
|
||||
// handler.
|
||||
// Primarily this is used for cancel handlers or notifications for which out of
|
||||
// order processing is not an issue.
|
||||
type Preempter interface {
|
||||
// Preempt is invoked for each incoming request before it is queued.
|
||||
// If the request is a call, it must return a value or an error for the reply.
|
||||
// Preempt should not block or start any new messages on the connection.
|
||||
Preempt(ctx context.Context, req *Request) (interface{}, error)
|
||||
}
|
||||
|
||||
// Handler handles messages on a connection.
|
||||
type Handler interface {
|
||||
// Handle is invoked for each incoming request.
|
||||
// If the request is a call, it must return a value or an error for the reply.
|
||||
Handle(ctx context.Context, req *Request) (interface{}, error)
|
||||
}
|
||||
|
||||
type defaultHandler struct{}
|
||||
|
||||
func (defaultHandler) Preempt(context.Context, *Request) (interface{}, error) {
|
||||
return nil, ErrNotHandled
|
||||
}
|
||||
|
||||
func (defaultHandler) Handle(context.Context, *Request) (interface{}, error) {
|
||||
return nil, ErrNotHandled
|
||||
}
|
||||
|
||||
// async is a small helper for things with an asynchronous result that you can
|
||||
// wait for.
|
||||
type async struct {
|
||||
ready chan struct{}
|
||||
errBox chan error
|
||||
}
|
||||
|
||||
func (a *async) init() {
|
||||
a.ready = make(chan struct{})
|
||||
a.errBox = make(chan error, 1)
|
||||
a.errBox <- nil
|
||||
}
|
||||
|
||||
func (a *async) done() {
|
||||
close(a.ready)
|
||||
}
|
||||
|
||||
func (a *async) wait() error {
|
||||
<-a.ready
|
||||
err := <-a.errBox
|
||||
a.errBox <- err
|
||||
return err
|
||||
}
|
||||
|
||||
func (a *async) setError(err error) {
|
||||
storedErr := <-a.errBox
|
||||
if storedErr == nil {
|
||||
storedErr = err
|
||||
}
|
||||
a.errBox <- storedErr
|
||||
}
|
|
@ -0,0 +1,389 @@
|
|||
// Copyright 2018 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package jsonrpc2_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"path"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"golang.org/x/tools/internal/event/export/eventtest"
|
||||
jsonrpc2 "golang.org/x/tools/internal/jsonrpc2_v2"
|
||||
"golang.org/x/tools/internal/stack/stacktest"
|
||||
errors "golang.org/x/xerrors"
|
||||
)
|
||||
|
||||
var callTests = []invoker{
|
||||
call{"no_args", nil, true},
|
||||
call{"one_string", "fish", "got:fish"},
|
||||
call{"one_number", 10, "got:10"},
|
||||
call{"join", []string{"a", "b", "c"}, "a/b/c"},
|
||||
sequence{"notify", []invoker{
|
||||
notify{"set", 3},
|
||||
notify{"add", 5},
|
||||
call{"get", nil, 8},
|
||||
}},
|
||||
sequence{"preempt", []invoker{
|
||||
async{"a", "wait", "a"},
|
||||
notify{"unblock", "a"},
|
||||
collect{"a", true, false},
|
||||
}},
|
||||
sequence{"basic cancel", []invoker{
|
||||
async{"b", "wait", "b"},
|
||||
cancel{"b"},
|
||||
collect{"b", nil, true},
|
||||
}},
|
||||
sequence{"queue", []invoker{
|
||||
async{"a", "wait", "a"},
|
||||
notify{"set", 1},
|
||||
notify{"add", 2},
|
||||
notify{"add", 3},
|
||||
notify{"add", 4},
|
||||
call{"peek", nil, 0}, // accumulator will not have any adds yet
|
||||
notify{"unblock", "a"},
|
||||
collect{"a", true, false},
|
||||
call{"get", nil, 10}, // accumulator now has all the adds
|
||||
}},
|
||||
sequence{"fork", []invoker{
|
||||
async{"a", "fork", "a"},
|
||||
notify{"set", 1},
|
||||
notify{"add", 2},
|
||||
notify{"add", 3},
|
||||
notify{"add", 4},
|
||||
call{"get", nil, 10}, // fork will not have blocked the adds
|
||||
notify{"unblock", "a"},
|
||||
collect{"a", true, false},
|
||||
}},
|
||||
}
|
||||
|
||||
type binder struct {
|
||||
framer jsonrpc2.Framer
|
||||
runTest func(*handler)
|
||||
}
|
||||
|
||||
type handler struct {
|
||||
conn *jsonrpc2.Connection
|
||||
accumulator int
|
||||
waitersBox chan map[string]chan struct{}
|
||||
calls map[string]*jsonrpc2.AsyncCall
|
||||
}
|
||||
|
||||
type invoker interface {
|
||||
Name() string
|
||||
Invoke(t *testing.T, ctx context.Context, h *handler)
|
||||
}
|
||||
|
||||
type notify struct {
|
||||
method string
|
||||
params interface{}
|
||||
}
|
||||
|
||||
type call struct {
|
||||
method string
|
||||
params interface{}
|
||||
expect interface{}
|
||||
}
|
||||
|
||||
type async struct {
|
||||
name string
|
||||
method string
|
||||
params interface{}
|
||||
}
|
||||
|
||||
type collect struct {
|
||||
name string
|
||||
expect interface{}
|
||||
fails bool
|
||||
}
|
||||
|
||||
type cancel struct {
|
||||
name string
|
||||
}
|
||||
|
||||
type sequence struct {
|
||||
name string
|
||||
tests []invoker
|
||||
}
|
||||
|
||||
type echo call
|
||||
|
||||
type cancelParams struct{ ID int64 }
|
||||
|
||||
func TestConnectionRaw(t *testing.T) {
|
||||
testConnection(t, jsonrpc2.RawFramer())
|
||||
}
|
||||
|
||||
func TestConnectionHeader(t *testing.T) {
|
||||
testConnection(t, jsonrpc2.HeaderFramer())
|
||||
}
|
||||
|
||||
func testConnection(t *testing.T, framer jsonrpc2.Framer) {
|
||||
stacktest.NoLeak(t)
|
||||
ctx := eventtest.NewContext(context.Background(), t)
|
||||
listener, err := jsonrpc2.NetPipe(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
server, err := jsonrpc2.Serve(ctx, listener, binder{framer, nil}, jsonrpc2.ServeOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer func() {
|
||||
listener.Close()
|
||||
server.Wait()
|
||||
}()
|
||||
|
||||
for _, test := range callTests {
|
||||
t.Run(test.Name(), func(t *testing.T) {
|
||||
client, err := jsonrpc2.Dial(ctx,
|
||||
listener.Dialer(), binder{framer, func(h *handler) {
|
||||
defer h.conn.Close()
|
||||
ctx := eventtest.NewContext(ctx, t)
|
||||
test.Invoke(t, ctx, h)
|
||||
if call, ok := test.(*call); ok {
|
||||
// also run all simple call tests in echo mode
|
||||
(*echo)(call).Invoke(t, ctx, h)
|
||||
}
|
||||
}})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
client.Wait()
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func (test notify) Name() string { return test.method }
|
||||
func (test notify) Invoke(t *testing.T, ctx context.Context, h *handler) {
|
||||
if err := h.conn.Notify(ctx, test.method, test.params); err != nil {
|
||||
t.Fatalf("%v:Notify failed: %v", test.method, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (test call) Name() string { return test.method }
|
||||
func (test call) Invoke(t *testing.T, ctx context.Context, h *handler) {
|
||||
results := newResults(test.expect)
|
||||
if err := h.conn.Call(ctx, test.method, test.params).Await(ctx, results); err != nil {
|
||||
t.Fatalf("%v:Call failed: %v", test.method, err)
|
||||
}
|
||||
verifyResults(t, test.method, results, test.expect)
|
||||
}
|
||||
|
||||
func (test echo) Invoke(t *testing.T, ctx context.Context, h *handler) {
|
||||
results := newResults(test.expect)
|
||||
if err := h.conn.Call(ctx, "echo", []interface{}{test.method, test.params}).Await(ctx, results); err != nil {
|
||||
t.Fatalf("%v:Echo failed: %v", test.method, err)
|
||||
}
|
||||
verifyResults(t, test.method, results, test.expect)
|
||||
}
|
||||
|
||||
func (test async) Name() string { return test.name }
|
||||
func (test async) Invoke(t *testing.T, ctx context.Context, h *handler) {
|
||||
h.calls[test.name] = h.conn.Call(ctx, test.method, test.params)
|
||||
}
|
||||
|
||||
func (test collect) Name() string { return test.name }
|
||||
func (test collect) Invoke(t *testing.T, ctx context.Context, h *handler) {
|
||||
o := h.calls[test.name]
|
||||
results := newResults(test.expect)
|
||||
err := o.Await(ctx, results)
|
||||
switch {
|
||||
case test.fails && err == nil:
|
||||
t.Fatalf("%v:Collect was supposed to fail", test.name)
|
||||
case !test.fails && err != nil:
|
||||
t.Fatalf("%v:Collect failed: %v", test.name, err)
|
||||
}
|
||||
verifyResults(t, test.name, results, test.expect)
|
||||
}
|
||||
|
||||
func (test cancel) Name() string { return test.name }
|
||||
func (test cancel) Invoke(t *testing.T, ctx context.Context, h *handler) {
|
||||
o := h.calls[test.name]
|
||||
if err := h.conn.Notify(ctx, "cancel", &cancelParams{o.ID().Raw().(int64)}); err != nil {
|
||||
t.Fatalf("%v:Collect failed: %v", test.name, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (test sequence) Name() string { return test.name }
|
||||
func (test sequence) Invoke(t *testing.T, ctx context.Context, h *handler) {
|
||||
for _, child := range test.tests {
|
||||
child.Invoke(t, ctx, h)
|
||||
}
|
||||
}
|
||||
|
||||
// newResults makes a new empty copy of the expected type to put the results into
|
||||
func newResults(expect interface{}) interface{} {
|
||||
switch e := expect.(type) {
|
||||
case []interface{}:
|
||||
var r []interface{}
|
||||
for _, v := range e {
|
||||
r = append(r, reflect.New(reflect.TypeOf(v)).Interface())
|
||||
}
|
||||
return r
|
||||
case nil:
|
||||
return nil
|
||||
default:
|
||||
return reflect.New(reflect.TypeOf(expect)).Interface()
|
||||
}
|
||||
}
|
||||
|
||||
// verifyResults compares the results to the expected values
|
||||
func verifyResults(t *testing.T, method string, results interface{}, expect interface{}) {
|
||||
if expect == nil {
|
||||
if results != nil {
|
||||
t.Errorf("%v:Got results %+v where none expeted", method, expect)
|
||||
}
|
||||
return
|
||||
}
|
||||
val := reflect.Indirect(reflect.ValueOf(results)).Interface()
|
||||
if !reflect.DeepEqual(val, expect) {
|
||||
t.Errorf("%v:Results are incorrect, got %+v expect %+v", method, val, expect)
|
||||
}
|
||||
}
|
||||
|
||||
func (b binder) Bind(ctx context.Context, conn *jsonrpc2.Connection) (jsonrpc2.ConnectionOptions, error) {
|
||||
h := &handler{
|
||||
conn: conn,
|
||||
waitersBox: make(chan map[string]chan struct{}, 1),
|
||||
calls: make(map[string]*jsonrpc2.AsyncCall),
|
||||
}
|
||||
h.waitersBox <- make(map[string]chan struct{})
|
||||
if b.runTest != nil {
|
||||
go b.runTest(h)
|
||||
}
|
||||
return jsonrpc2.ConnectionOptions{
|
||||
Framer: b.framer,
|
||||
Preempter: h,
|
||||
Handler: h,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (h *handler) waiter(name string) chan struct{} {
|
||||
waiters := <-h.waitersBox
|
||||
defer func() { h.waitersBox <- waiters }()
|
||||
waiter, found := waiters[name]
|
||||
if !found {
|
||||
waiter = make(chan struct{})
|
||||
waiters[name] = waiter
|
||||
}
|
||||
return waiter
|
||||
}
|
||||
|
||||
func (h *handler) Preempt(ctx context.Context, req *jsonrpc2.Request) (interface{}, error) {
|
||||
switch req.Method {
|
||||
case "unblock":
|
||||
var name string
|
||||
if err := json.Unmarshal(req.Params, &name); err != nil {
|
||||
return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err)
|
||||
}
|
||||
close(h.waiter(name))
|
||||
return nil, nil
|
||||
case "peek":
|
||||
if len(req.Params) > 0 {
|
||||
return nil, errors.Errorf("%w: expected no params", jsonrpc2.ErrInvalidParams)
|
||||
}
|
||||
return h.accumulator, nil
|
||||
case "cancel":
|
||||
var params cancelParams
|
||||
if err := json.Unmarshal(req.Params, ¶ms); err != nil {
|
||||
return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err)
|
||||
}
|
||||
h.conn.Cancel(jsonrpc2.Int64ID(params.ID))
|
||||
return nil, nil
|
||||
default:
|
||||
return nil, jsonrpc2.ErrNotHandled
|
||||
}
|
||||
}
|
||||
|
||||
func (h *handler) Handle(ctx context.Context, req *jsonrpc2.Request) (interface{}, error) {
|
||||
switch req.Method {
|
||||
case "no_args":
|
||||
if len(req.Params) > 0 {
|
||||
return nil, errors.Errorf("%w: expected no params", jsonrpc2.ErrInvalidParams)
|
||||
}
|
||||
return true, nil
|
||||
case "one_string":
|
||||
var v string
|
||||
if err := json.Unmarshal(req.Params, &v); err != nil {
|
||||
return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err)
|
||||
}
|
||||
return "got:" + v, nil
|
||||
case "one_number":
|
||||
var v int
|
||||
if err := json.Unmarshal(req.Params, &v); err != nil {
|
||||
return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err)
|
||||
}
|
||||
return fmt.Sprintf("got:%d", v), nil
|
||||
case "set":
|
||||
var v int
|
||||
if err := json.Unmarshal(req.Params, &v); err != nil {
|
||||
return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err)
|
||||
}
|
||||
h.accumulator = v
|
||||
return nil, nil
|
||||
case "add":
|
||||
var v int
|
||||
if err := json.Unmarshal(req.Params, &v); err != nil {
|
||||
return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err)
|
||||
}
|
||||
h.accumulator += v
|
||||
return nil, nil
|
||||
case "get":
|
||||
if len(req.Params) > 0 {
|
||||
return nil, errors.Errorf("%w: expected no params", jsonrpc2.ErrInvalidParams)
|
||||
}
|
||||
return h.accumulator, nil
|
||||
case "join":
|
||||
var v []string
|
||||
if err := json.Unmarshal(req.Params, &v); err != nil {
|
||||
return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err)
|
||||
}
|
||||
return path.Join(v...), nil
|
||||
case "echo":
|
||||
var v []interface{}
|
||||
if err := json.Unmarshal(req.Params, &v); err != nil {
|
||||
return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err)
|
||||
}
|
||||
var result interface{}
|
||||
err := h.conn.Call(ctx, v[0].(string), v[1]).Await(ctx, &result)
|
||||
return result, err
|
||||
case "wait":
|
||||
var name string
|
||||
if err := json.Unmarshal(req.Params, &name); err != nil {
|
||||
return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err)
|
||||
}
|
||||
select {
|
||||
case <-h.waiter(name):
|
||||
return true, nil
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
case <-time.After(time.Second):
|
||||
return nil, errors.Errorf("wait for %q timed out", name)
|
||||
}
|
||||
case "fork":
|
||||
var name string
|
||||
if err := json.Unmarshal(req.Params, &name); err != nil {
|
||||
return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err)
|
||||
}
|
||||
waitFor := h.waiter(name)
|
||||
go func() {
|
||||
select {
|
||||
case <-waitFor:
|
||||
h.conn.Respond(req.ID, true, nil)
|
||||
case <-ctx.Done():
|
||||
h.conn.Respond(req.ID, nil, ctx.Err())
|
||||
case <-time.After(time.Second):
|
||||
h.conn.Respond(req.ID, nil, errors.Errorf("wait for %q timed out", name))
|
||||
}
|
||||
}()
|
||||
return nil, jsonrpc2.ErrAsyncResponse
|
||||
default:
|
||||
return nil, jsonrpc2.ErrNotHandled
|
||||
}
|
||||
}
|
|
@ -0,0 +1,181 @@
|
|||
// Copyright 2018 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package jsonrpc2
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
|
||||
errors "golang.org/x/xerrors"
|
||||
)
|
||||
|
||||
// ID is a Request identifier.
|
||||
type ID struct {
|
||||
value interface{}
|
||||
}
|
||||
|
||||
// Message is the interface to all jsonrpc2 message types.
|
||||
// They share no common functionality, but are a closed set of concrete types
|
||||
// that are allowed to implement this interface. The message types are *Request
|
||||
// and *Response.
|
||||
type Message interface {
|
||||
// marshal builds the wire form from the API form.
|
||||
// It is private, which makes the set of Message implementations closed.
|
||||
marshal(to *wireCombined)
|
||||
}
|
||||
|
||||
// Request is a Message sent to a peer to request behavior.
|
||||
// If it has an ID it is a call, otherwise it is a notification.
|
||||
type Request struct {
|
||||
// ID of this request, used to tie the Response back to the request.
|
||||
// This will be nil for notifications.
|
||||
ID ID
|
||||
// Method is a string containing the method name to invoke.
|
||||
Method string
|
||||
// Params is either a struct or an array with the parameters of the method.
|
||||
Params json.RawMessage
|
||||
}
|
||||
|
||||
// Response is a Message used as a reply to a call Request.
|
||||
// It will have the same ID as the call it is a response to.
|
||||
type Response struct {
|
||||
// result is the content of the response.
|
||||
Result json.RawMessage
|
||||
// err is set only if the call failed.
|
||||
Error error
|
||||
// id of the request this is a response to.
|
||||
ID ID
|
||||
}
|
||||
|
||||
// StringID creates a new string request identifier.
|
||||
func StringID(s string) ID { return ID{value: s} }
|
||||
|
||||
// Int64ID creates a new integer request identifier.
|
||||
func Int64ID(i int64) ID { return ID{value: i} }
|
||||
|
||||
// IsValid returns true if the ID is a valid identifier.
|
||||
// The default value for ID will return false.
|
||||
func (id ID) IsValid() bool { return id.value != nil }
|
||||
|
||||
// Raw returns the underlying value of the ID.
|
||||
func (id ID) Raw() interface{} { return id.value }
|
||||
|
||||
// NewNotification constructs a new Notification message for the supplied
|
||||
// method and parameters.
|
||||
func NewNotification(method string, params interface{}) (*Request, error) {
|
||||
p, merr := marshalToRaw(params)
|
||||
return &Request{Method: method, Params: p}, merr
|
||||
}
|
||||
|
||||
// NewCall constructs a new Call message for the supplied ID, method and
|
||||
// parameters.
|
||||
func NewCall(id ID, method string, params interface{}) (*Request, error) {
|
||||
p, merr := marshalToRaw(params)
|
||||
return &Request{ID: id, Method: method, Params: p}, merr
|
||||
}
|
||||
|
||||
func (msg *Request) IsCall() bool { return msg.ID.IsValid() }
|
||||
|
||||
func (msg *Request) marshal(to *wireCombined) {
|
||||
to.ID = msg.ID.value
|
||||
to.Method = msg.Method
|
||||
to.Params = msg.Params
|
||||
}
|
||||
|
||||
// NewResponse constructs a new Response message that is a reply to the
|
||||
// supplied. If err is set result may be ignored.
|
||||
func NewResponse(id ID, result interface{}, rerr error) (*Response, error) {
|
||||
r, merr := marshalToRaw(result)
|
||||
return &Response{ID: id, Result: r, Error: rerr}, merr
|
||||
}
|
||||
|
||||
func (msg *Response) marshal(to *wireCombined) {
|
||||
to.ID = msg.ID.value
|
||||
to.Error = toWireError(msg.Error)
|
||||
to.Result = msg.Result
|
||||
}
|
||||
|
||||
func toWireError(err error) *wireError {
|
||||
if err == nil {
|
||||
// no error, the response is complete
|
||||
return nil
|
||||
}
|
||||
if err, ok := err.(*wireError); ok {
|
||||
// already a wire error, just use it
|
||||
return err
|
||||
}
|
||||
result := &wireError{Message: err.Error()}
|
||||
var wrapped *wireError
|
||||
if errors.As(err, &wrapped) {
|
||||
// if we wrapped a wire error, keep the code from the wrapped error
|
||||
// but the message from the outer error
|
||||
result.Code = wrapped.Code
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func EncodeMessage(msg Message) ([]byte, error) {
|
||||
wire := wireCombined{VersionTag: wireVersion}
|
||||
msg.marshal(&wire)
|
||||
data, err := json.Marshal(&wire)
|
||||
if err != nil {
|
||||
return data, errors.Errorf("marshaling jsonrpc message: %w", err)
|
||||
}
|
||||
return data, nil
|
||||
}
|
||||
|
||||
func DecodeMessage(data []byte) (Message, error) {
|
||||
msg := wireCombined{}
|
||||
if err := json.Unmarshal(data, &msg); err != nil {
|
||||
return nil, errors.Errorf("unmarshaling jsonrpc message: %w", err)
|
||||
}
|
||||
if msg.VersionTag != wireVersion {
|
||||
return nil, errors.Errorf("invalid message version tag %s expected %s", msg.VersionTag, wireVersion)
|
||||
}
|
||||
id := ID{}
|
||||
switch v := msg.ID.(type) {
|
||||
case nil:
|
||||
case float64:
|
||||
// coerce the id type to int64 if it is float64, the spec does not allow fractional parts
|
||||
id = Int64ID(int64(v))
|
||||
case int64:
|
||||
id = Int64ID(v)
|
||||
case string:
|
||||
id = StringID(v)
|
||||
default:
|
||||
return nil, errors.Errorf("invalid message id type <%T>%v", v, v)
|
||||
}
|
||||
if msg.Method != "" {
|
||||
// has a method, must be a call
|
||||
return &Request{
|
||||
Method: msg.Method,
|
||||
ID: id,
|
||||
Params: msg.Params,
|
||||
}, nil
|
||||
}
|
||||
// no method, should be a response
|
||||
if !id.IsValid() {
|
||||
return nil, ErrInvalidRequest
|
||||
}
|
||||
resp := &Response{
|
||||
ID: id,
|
||||
Result: msg.Result,
|
||||
}
|
||||
// we have to check if msg.Error is nil to avoid a typed error
|
||||
if msg.Error != nil {
|
||||
resp.Error = msg.Error
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func marshalToRaw(obj interface{}) (json.RawMessage, error) {
|
||||
if obj == nil {
|
||||
return nil, nil
|
||||
}
|
||||
data, err := json.Marshal(obj)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return json.RawMessage(data), nil
|
||||
}
|
|
@ -0,0 +1,129 @@
|
|||
// Copyright 2018 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package jsonrpc2
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"net"
|
||||
"os"
|
||||
"time"
|
||||
)
|
||||
|
||||
// This file contains implementations of the transport primitives that use the standard network
|
||||
// package.
|
||||
|
||||
// NetListenOptions is the optional arguments to the NetListen function.
|
||||
type NetListenOptions struct {
|
||||
NetListenConfig net.ListenConfig
|
||||
NetDialer net.Dialer
|
||||
}
|
||||
|
||||
// NetListener returns a new Listener that listents on a socket using the net package.
|
||||
func NetListener(ctx context.Context, network, address string, options NetListenOptions) (Listener, error) {
|
||||
ln, err := options.NetListenConfig.Listen(ctx, network, address)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &netListener{net: ln}, nil
|
||||
}
|
||||
|
||||
// netListener is the implementation of Listener for connections made using the net package.
|
||||
type netListener struct {
|
||||
net net.Listener
|
||||
}
|
||||
|
||||
// Accept blocks waiting for an incoming connection to the listener.
|
||||
func (l *netListener) Accept(ctx context.Context) (io.ReadWriteCloser, error) {
|
||||
return l.net.Accept()
|
||||
}
|
||||
|
||||
// Close will cause the listener to stop listening. It will not close any connections that have
|
||||
// already been accepted.
|
||||
func (l *netListener) Close() error {
|
||||
addr := l.net.Addr()
|
||||
err := l.net.Close()
|
||||
if addr.Network() == "unix" {
|
||||
rerr := os.Remove(addr.String())
|
||||
if rerr != nil && err == nil {
|
||||
err = rerr
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Dialer returns a dialer that can be used to connect to the listener.
|
||||
func (l *netListener) Dialer() Dialer {
|
||||
return NetDialer(l.net.Addr().Network(), l.net.Addr().String(), net.Dialer{
|
||||
Timeout: 5 * time.Second,
|
||||
})
|
||||
}
|
||||
|
||||
// NetDialer returns a Dialer using the supplied standard network dialer.
|
||||
func NetDialer(network, address string, nd net.Dialer) Dialer {
|
||||
return &netDialer{
|
||||
network: network,
|
||||
address: address,
|
||||
dialer: nd,
|
||||
}
|
||||
}
|
||||
|
||||
type netDialer struct {
|
||||
network string
|
||||
address string
|
||||
dialer net.Dialer
|
||||
}
|
||||
|
||||
func (n *netDialer) Dial(ctx context.Context) (io.ReadWriteCloser, error) {
|
||||
return n.dialer.DialContext(ctx, n.network, n.address)
|
||||
}
|
||||
|
||||
// NetPipe returns a new Listener that listens using net.Pipe.
|
||||
// It is only possibly to connect to it using the Dialier returned by the
|
||||
// Dialer method, each call to that method will generate a new pipe the other
|
||||
// side of which will be returnd from the Accept call.
|
||||
func NetPipe(ctx context.Context) (Listener, error) {
|
||||
return &netPiper{
|
||||
done: make(chan struct{}),
|
||||
dialed: make(chan io.ReadWriteCloser),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// netPiper is the implementation of Listener build on top of net.Pipes.
|
||||
type netPiper struct {
|
||||
done chan struct{}
|
||||
dialed chan io.ReadWriteCloser
|
||||
}
|
||||
|
||||
// Accept blocks waiting for an incoming connection to the listener.
|
||||
func (l *netPiper) Accept(ctx context.Context) (io.ReadWriteCloser, error) {
|
||||
// block until we have a listener, or are closed or cancelled
|
||||
select {
|
||||
case rwc := <-l.dialed:
|
||||
return rwc, nil
|
||||
case <-l.done:
|
||||
return nil, io.EOF
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
// Close will cause the listener to stop listening. It will not close any connections that have
|
||||
// already been accepted.
|
||||
func (l *netPiper) Close() error {
|
||||
// unblock any accept calls that are pending
|
||||
close(l.done)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (l *netPiper) Dialer() Dialer {
|
||||
return l
|
||||
}
|
||||
|
||||
func (l *netPiper) Dial(ctx context.Context) (io.ReadWriteCloser, error) {
|
||||
client, server := net.Pipe()
|
||||
l.dialed <- server
|
||||
return client, nil
|
||||
}
|
|
@ -0,0 +1,208 @@
|
|||
// Copyright 2020 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package jsonrpc2
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"time"
|
||||
|
||||
"golang.org/x/tools/internal/event"
|
||||
errors "golang.org/x/xerrors"
|
||||
)
|
||||
|
||||
// Listener is implemented by protocols to accept new inbound connections.
|
||||
type Listener interface {
|
||||
// Accept an inbound connection to a server.
|
||||
// It must block until an inbound connection is made, or the listener is
|
||||
// shut down.
|
||||
Accept(context.Context) (io.ReadWriteCloser, error)
|
||||
|
||||
// Close is used to ask a listener to stop accepting new connections.
|
||||
Close() error
|
||||
|
||||
// Dialer returns a dialer that can be used to connect to this listener
|
||||
// locally.
|
||||
// If a listener does not implement this it will return a nil.
|
||||
Dialer() Dialer
|
||||
}
|
||||
|
||||
// Dialer is used by clients to dial a server.
|
||||
type Dialer interface {
|
||||
// Dial returns a new communication byte stream to a listening server.
|
||||
Dial(ctx context.Context) (io.ReadWriteCloser, error)
|
||||
}
|
||||
|
||||
// Server is a running server that is accepting incoming connections.
|
||||
type Server struct {
|
||||
listener Listener
|
||||
binder Binder
|
||||
options ServeOptions // a copy of the config that started this server
|
||||
async async
|
||||
}
|
||||
|
||||
// ServeOptions holds the options to the Serve function.
|
||||
//TODO: kill ServeOptions and push timeout into the listener
|
||||
type ServeOptions struct {
|
||||
// IdleTimeout is the maximum amount of time to remain idle and running.
|
||||
IdleTimeout time.Duration
|
||||
}
|
||||
|
||||
// Dial uses the dialer to make a new connection, wraps the returned
|
||||
// reader and writer using the framer to make a stream, and then builds
|
||||
// a connection on top of that stream using the binder.
|
||||
func Dial(ctx context.Context, dialer Dialer, binder Binder) (*Connection, error) {
|
||||
// dial a server
|
||||
rwc, err := dialer.Dial(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return newConnection(ctx, rwc, binder)
|
||||
}
|
||||
|
||||
// Serve starts a new server listening for incoming connections and returns
|
||||
// it.
|
||||
// This returns a fully running and connected server, it does not block on
|
||||
// the listener.
|
||||
// You can call Wait to block on the server, or Shutdown to get the sever to
|
||||
// terminate gracefully.
|
||||
// To notice incoming connections, use an intercepting Binder.
|
||||
func Serve(ctx context.Context, listener Listener, binder Binder, options ServeOptions) (*Server, error) {
|
||||
server := &Server{
|
||||
listener: listener,
|
||||
binder: binder,
|
||||
options: options,
|
||||
}
|
||||
server.async.init()
|
||||
go server.run(ctx)
|
||||
return server, nil
|
||||
}
|
||||
|
||||
// Wait returns only when the server has shut down.
|
||||
func (s *Server) Wait() error {
|
||||
return s.async.wait()
|
||||
}
|
||||
|
||||
// run accepts incoming connections from the listener,
|
||||
// If IdleTimeout is non-zero, run exits after there are no clients for this
|
||||
// duration, otherwise it exits only on error.
|
||||
func (s *Server) run(ctx context.Context) {
|
||||
defer s.async.done()
|
||||
// Max duration: ~290 years; surely that's long enough.
|
||||
const forever = 1<<63 - 1
|
||||
idleTimeout := s.options.IdleTimeout
|
||||
if idleTimeout <= 0 {
|
||||
idleTimeout = forever
|
||||
}
|
||||
idleTimer := time.NewTimer(idleTimeout)
|
||||
|
||||
// run a goroutine that listens for incoming connections and posts them
|
||||
// back to the worker
|
||||
newStreams := make(chan io.ReadWriteCloser)
|
||||
go func() {
|
||||
for {
|
||||
// we never close the accepted connection, we rely on the other end
|
||||
// closing or the socket closing itself naturally
|
||||
rwc, err := s.listener.Accept(ctx)
|
||||
if err != nil {
|
||||
if !isClosingError(err) {
|
||||
event.Error(ctx, "Accept", err)
|
||||
}
|
||||
// signal we are done generating new connections for good
|
||||
close(newStreams)
|
||||
return
|
||||
}
|
||||
newStreams <- rwc
|
||||
}
|
||||
}()
|
||||
|
||||
closedConns := make(chan struct{})
|
||||
activeConns := 0
|
||||
lnClosed := false
|
||||
for {
|
||||
select {
|
||||
case rwc := <-newStreams:
|
||||
// whatever happes we are not idle anymore
|
||||
idleTimer.Stop()
|
||||
if rwc == nil {
|
||||
// the net listener has been closed
|
||||
lnClosed = true
|
||||
if activeConns == 0 {
|
||||
// accept is done and there are no active connections, so just stop now
|
||||
return
|
||||
}
|
||||
// replace the channel with one that will never trigger
|
||||
// this is save because the only writer has already quit
|
||||
newStreams = nil
|
||||
// and then wait for all active connections to stop
|
||||
continue
|
||||
}
|
||||
// a new inbound connection,
|
||||
conn, err := newConnection(ctx, rwc, s.binder)
|
||||
if err != nil {
|
||||
if !isClosingError(err) {
|
||||
event.Error(ctx, "NewConn", err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
// register the new conn as active
|
||||
activeConns++
|
||||
// wrap the conn in a close monitor
|
||||
//TODO: we do this to maintain our active count correctly, is there a better way?
|
||||
go func() {
|
||||
err := conn.Wait()
|
||||
if err != nil && !isClosingError(err) {
|
||||
event.Error(ctx, "closed a connection", err)
|
||||
}
|
||||
closedConns <- struct{}{}
|
||||
}()
|
||||
case <-closedConns:
|
||||
activeConns--
|
||||
if activeConns == 0 {
|
||||
// no more active connections, restart the idle timer
|
||||
if lnClosed {
|
||||
// we can never get a new connection, so we are done
|
||||
return
|
||||
}
|
||||
// we are idle, but might get a new connection still
|
||||
idleTimer.Reset(idleTimeout)
|
||||
}
|
||||
case <-idleTimer.C:
|
||||
// no activity for a while, time to stop serving
|
||||
s.async.setError(ErrIdleTimeout)
|
||||
return
|
||||
case <-ctx.Done():
|
||||
s.async.setError(ctx.Err())
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// isClosingError reports if the error occurs normally during the process of
|
||||
// closing a network connection. It uses imperfect heuristics that err on the
|
||||
// side of false negatives, and should not be used for anything critical.
|
||||
func isClosingError(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
// fully unwrap the error, so the following tests work
|
||||
for wrapped := err; wrapped != nil; wrapped = errors.Unwrap(err) {
|
||||
err = wrapped
|
||||
}
|
||||
|
||||
// was it based on an EOF error?
|
||||
if err == io.EOF {
|
||||
return true
|
||||
}
|
||||
|
||||
// Per https://github.com/golang/go/issues/4373, this error string should not
|
||||
// change. This is not ideal, but since the worst that could happen here is
|
||||
// some superfluous logging, it is acceptable.
|
||||
if err.Error() == "use of closed network connection" {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
|
@ -0,0 +1,144 @@
|
|||
// Copyright 2020 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package jsonrpc2_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
jsonrpc2 "golang.org/x/tools/internal/jsonrpc2_v2"
|
||||
"golang.org/x/tools/internal/stack/stacktest"
|
||||
)
|
||||
|
||||
func TestIdleTimeout(t *testing.T) {
|
||||
stacktest.NoLeak(t)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
|
||||
listener, err := jsonrpc2.NetListener(ctx, "tcp", "localhost:0", jsonrpc2.NetListenOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer listener.Close()
|
||||
|
||||
server, err := jsonrpc2.Serve(ctx, listener, jsonrpc2.ConnectionOptions{},
|
||||
jsonrpc2.ServeOptions{
|
||||
IdleTimeout: 100 * time.Millisecond,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
connect := func() *jsonrpc2.Connection {
|
||||
client, err := jsonrpc2.Dial(ctx,
|
||||
listener.Dialer(),
|
||||
jsonrpc2.ConnectionOptions{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
return client
|
||||
}
|
||||
// Exercise some connection/disconnection patterns, and then assert that when
|
||||
// our timer fires, the server exits.
|
||||
conn1 := connect()
|
||||
conn2 := connect()
|
||||
if err := conn1.Close(); err != nil {
|
||||
t.Fatalf("conn1.Close failed with error: %v", err)
|
||||
}
|
||||
if err := conn2.Close(); err != nil {
|
||||
t.Fatalf("conn2.Close failed with error: %v", err)
|
||||
}
|
||||
conn3 := connect()
|
||||
if err := conn3.Close(); err != nil {
|
||||
t.Fatalf("conn3.Close failed with error: %v", err)
|
||||
}
|
||||
|
||||
serverError := server.Wait()
|
||||
|
||||
if !errors.Is(serverError, jsonrpc2.ErrIdleTimeout) {
|
||||
t.Errorf("run() returned error %v, want %v", serverError, jsonrpc2.ErrIdleTimeout)
|
||||
}
|
||||
}
|
||||
|
||||
type msg struct {
|
||||
Msg string
|
||||
}
|
||||
|
||||
type fakeHandler struct{}
|
||||
|
||||
func (fakeHandler) Handle(ctx context.Context, req *jsonrpc2.Request) (interface{}, error) {
|
||||
switch req.Method {
|
||||
case "ping":
|
||||
return &msg{"pong"}, nil
|
||||
default:
|
||||
return nil, jsonrpc2.ErrNotHandled
|
||||
}
|
||||
}
|
||||
|
||||
func TestServe(t *testing.T) {
|
||||
stacktest.NoLeak(t)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
factory func(context.Context) (jsonrpc2.Listener, error)
|
||||
}{
|
||||
{"tcp", func(ctx context.Context) (jsonrpc2.Listener, error) {
|
||||
return jsonrpc2.NetListener(ctx, "tcp", "localhost:0", jsonrpc2.NetListenOptions{})
|
||||
}},
|
||||
{"pipe", func(ctx context.Context) (jsonrpc2.Listener, error) {
|
||||
return jsonrpc2.NetPipe(ctx)
|
||||
}},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.name, func(t *testing.T) {
|
||||
fake, err := test.factory(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
conn, shutdown, err := newFake(ctx, fake)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer shutdown(ctx)
|
||||
var got msg
|
||||
if err := conn.Call(ctx, "ping", &msg{"ting"}).Await(ctx, &got); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if want := "pong"; got.Msg != want {
|
||||
t.Errorf("conn.Call(...): returned %q, want %q", got, want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func newFake(ctx context.Context, l jsonrpc2.Listener) (*jsonrpc2.Connection, func(context.Context), error) {
|
||||
server, err := jsonrpc2.Serve(ctx, l, jsonrpc2.ConnectionOptions{
|
||||
Handler: fakeHandler{},
|
||||
}, jsonrpc2.ServeOptions{
|
||||
IdleTimeout: 100 * time.Millisecond,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
client, err := jsonrpc2.Dial(ctx,
|
||||
l.Dialer(),
|
||||
jsonrpc2.ConnectionOptions{
|
||||
Handler: fakeHandler{},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
return client, func(ctx context.Context) {
|
||||
l.Close()
|
||||
client.Close()
|
||||
server.Wait()
|
||||
}, nil
|
||||
}
|
|
@ -0,0 +1,74 @@
|
|||
// Copyright 2018 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package jsonrpc2
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
)
|
||||
|
||||
// This file contains the go forms of the wire specification.
|
||||
// see http://www.jsonrpc.org/specification for details
|
||||
|
||||
var (
|
||||
// ErrUnknown should be used for all non coded errors.
|
||||
ErrUnknown = NewError(-32001, "JSON RPC unknown error")
|
||||
// ErrParse is used when invalid JSON was received by the server.
|
||||
ErrParse = NewError(-32700, "JSON RPC parse error")
|
||||
// ErrInvalidRequest is used when the JSON sent is not a valid Request object.
|
||||
ErrInvalidRequest = NewError(-32600, "JSON RPC invalid request")
|
||||
// ErrMethodNotFound should be returned by the handler when the method does
|
||||
// not exist / is not available.
|
||||
ErrMethodNotFound = NewError(-32601, "JSON RPC method not found")
|
||||
// ErrInvalidParams should be returned by the handler when method
|
||||
// parameter(s) were invalid.
|
||||
ErrInvalidParams = NewError(-32602, "JSON RPC invalid params")
|
||||
// ErrInternal indicates a failure to process a call correctly
|
||||
ErrInternal = NewError(-32603, "JSON RPC internal error")
|
||||
|
||||
// The following errors are not part of the json specification, but
|
||||
// compliant extensions specific to this implimentation.
|
||||
|
||||
// ErrServerOverloaded is returned when a message was refused due to a
|
||||
// server being temporarily unable to accept any new messages.
|
||||
ErrServerOverloaded = NewError(-32000, "JSON RPC overloaded")
|
||||
)
|
||||
|
||||
const wireVersion = "2.0"
|
||||
|
||||
// wireCombined has all the fields of both Request and Response.
|
||||
// We can decode this and then work out which it is.
|
||||
type wireCombined struct {
|
||||
VersionTag string `json:"jsonrpc"`
|
||||
ID interface{} `json:"id,omitempty"`
|
||||
Method string `json:"method,omitempty"`
|
||||
Params json.RawMessage `json:"params,omitempty"`
|
||||
Result json.RawMessage `json:"result,omitempty"`
|
||||
Error *wireError `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
// wireError represents a structured error in a Response.
|
||||
type wireError struct {
|
||||
// Code is an error code indicating the type of failure.
|
||||
Code int64 `json:"code"`
|
||||
// Message is a short description of the error.
|
||||
Message string `json:"message"`
|
||||
// Data is optional structured data containing additional information about the error.
|
||||
Data json.RawMessage `json:"data,omitempty"`
|
||||
}
|
||||
|
||||
// NewError returns an error that will encode on the wire correctly.
|
||||
// The standard codes are made available from this package, this function should
|
||||
// only be used to build errors for application specific codes as allowed by the
|
||||
// specification.
|
||||
func NewError(code int64, message string) error {
|
||||
return &wireError{
|
||||
Code: code,
|
||||
Message: message,
|
||||
}
|
||||
}
|
||||
|
||||
func (err *wireError) Error() string {
|
||||
return err.Message
|
||||
}
|
|
@ -0,0 +1,118 @@
|
|||
// Copyright 2020 The Go Authors. All rights reserved.
|
||||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package jsonrpc2_test
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
jsonrpc2 "golang.org/x/tools/internal/jsonrpc2_v2"
|
||||
)
|
||||
|
||||
func TestWireMessage(t *testing.T) {
|
||||
for _, test := range []struct {
|
||||
name string
|
||||
msg jsonrpc2.Message
|
||||
encoded []byte
|
||||
}{{
|
||||
name: "notification",
|
||||
msg: newNotification("alive", nil),
|
||||
encoded: []byte(`{"jsonrpc":"2.0","method":"alive"}`),
|
||||
}, {
|
||||
name: "call",
|
||||
msg: newCall("msg1", "ping", nil),
|
||||
encoded: []byte(`{"jsonrpc":"2.0","id":"msg1","method":"ping"}`),
|
||||
}, {
|
||||
name: "response",
|
||||
msg: newResponse("msg2", "pong", nil),
|
||||
encoded: []byte(`{"jsonrpc":"2.0","id":"msg2","result":"pong"}`),
|
||||
}, {
|
||||
name: "numerical id",
|
||||
msg: newCall(1, "poke", nil),
|
||||
encoded: []byte(`{"jsonrpc":"2.0","id":1,"method":"poke"}`),
|
||||
}, {
|
||||
// originally reported in #39719, this checks that result is not present if
|
||||
// it is an error response
|
||||
name: "computing fix edits",
|
||||
msg: newResponse(3, nil, jsonrpc2.NewError(0, "computing fix edits")),
|
||||
encoded: []byte(`{
|
||||
"jsonrpc":"2.0",
|
||||
"id":3,
|
||||
"error":{
|
||||
"code":0,
|
||||
"message":"computing fix edits"
|
||||
}
|
||||
}`),
|
||||
}} {
|
||||
b, err := jsonrpc2.EncodeMessage(test.msg)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
checkJSON(t, b, test.encoded)
|
||||
msg, err := jsonrpc2.DecodeMessage(test.encoded)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if !reflect.DeepEqual(msg, test.msg) {
|
||||
t.Errorf("decoded message does not match\nGot:\n%+#v\nWant:\n%+#v", msg, test.msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func newNotification(method string, params interface{}) jsonrpc2.Message {
|
||||
msg, err := jsonrpc2.NewNotification(method, params)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return msg
|
||||
}
|
||||
|
||||
func newID(id interface{}) jsonrpc2.ID {
|
||||
switch v := id.(type) {
|
||||
case nil:
|
||||
return jsonrpc2.ID{}
|
||||
case string:
|
||||
return jsonrpc2.StringID(v)
|
||||
case int:
|
||||
return jsonrpc2.Int64ID(int64(v))
|
||||
case int64:
|
||||
return jsonrpc2.Int64ID(v)
|
||||
default:
|
||||
panic("invalid ID type")
|
||||
}
|
||||
}
|
||||
|
||||
func newCall(id interface{}, method string, params interface{}) jsonrpc2.Message {
|
||||
msg, err := jsonrpc2.NewCall(newID(id), method, params)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return msg
|
||||
}
|
||||
|
||||
func newResponse(id interface{}, result interface{}, rerr error) jsonrpc2.Message {
|
||||
msg, err := jsonrpc2.NewResponse(newID(id), result, rerr)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return msg
|
||||
}
|
||||
|
||||
func checkJSON(t *testing.T, got, want []byte) {
|
||||
// compare the compact form, to allow for formatting differences
|
||||
g := &bytes.Buffer{}
|
||||
if err := json.Compact(g, []byte(got)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
w := &bytes.Buffer{}
|
||||
if err := json.Compact(w, []byte(want)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if g.String() != w.String() {
|
||||
t.Errorf("encoded message does not match\nGot:\n%s\nWant:\n%s", g, w)
|
||||
}
|
||||
}
|
Загрузка…
Ссылка в новой задаче