зеркало из https://github.com/github/vitess-gh.git
rpc: fix go lint issues without breaking the API
This commit is contained in:
Родитель
27380cc193
Коммит
40affe702e
|
@ -1,3 +1,4 @@
|
|||
// Package auth provides authentication codecs
|
||||
package auth
|
||||
|
||||
import (
|
||||
|
@ -46,7 +47,10 @@ func (c *cramMD5Credentials) Load(filename string) error {
|
|||
const CRAMMD5MaxRequests = 2
|
||||
|
||||
var (
|
||||
AuthenticationServer = rpc.NewServer()
|
||||
// AuthenticationServer holds a default server for authentication
|
||||
AuthenticationServer = rpc.NewServer()
|
||||
|
||||
// DefaultAuthenticatorCRAMMD5 holds a default authenticator
|
||||
DefaultAuthenticatorCRAMMD5 = NewAuthenticatorCRAMMD5()
|
||||
)
|
||||
|
||||
|
@ -54,6 +58,7 @@ var (
|
|||
// authenticate.
|
||||
var AuthenticationFailed = errors.New("authentication error: authentication failed")
|
||||
|
||||
// NewAuthenticatorCRAMMD5 creates a new CRAM-MD5 authenticator
|
||||
func NewAuthenticatorCRAMMD5() *AuthenticatorCRAMMD5 {
|
||||
return &AuthenticatorCRAMMD5{make(cramMD5Credentials)}
|
||||
}
|
||||
|
@ -113,12 +118,15 @@ func (a *AuthenticatorCRAMMD5) Authenticate(ctx context.Context, req *Authentica
|
|||
return AuthenticationFailed
|
||||
}
|
||||
|
||||
// GetNewChallengeReply holds reply data for Challenge
|
||||
type GetNewChallengeReply struct {
|
||||
Challenge string
|
||||
}
|
||||
|
||||
// AuthenticateReply holds reply data for Authenticate
|
||||
type AuthenticateReply struct{}
|
||||
|
||||
// AuthenticateRequest holds request data for AuthenticateRequest
|
||||
type AuthenticateRequest struct {
|
||||
Proof string
|
||||
state authenticationState
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
// CRAMMD5GetChallenge creates a new RFC822 compatible ID
|
||||
func CRAMMD5GetChallenge() (string, error) {
|
||||
var randDigits uint32
|
||||
err := binary.Read(rand.Reader, binary.LittleEndian, &randDigits)
|
||||
|
@ -27,6 +28,7 @@ func CRAMMD5GetChallenge() (string, error) {
|
|||
return fmt.Sprintf("<%d.%d@%s>", randDigits, timestamp, hostname), nil
|
||||
}
|
||||
|
||||
// CRAMMD5GetExpected creates a "possible" ID with the given credentials
|
||||
func CRAMMD5GetExpected(username, secret, challenge string) string {
|
||||
var ret []byte
|
||||
hash := hmac.New(md5.New, []byte(secret))
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// Package bsonrpc provides codecs for bsonrpc communication
|
||||
package bsonrpc
|
||||
|
||||
import (
|
||||
|
@ -20,16 +21,21 @@ const (
|
|||
codecName = "bson"
|
||||
)
|
||||
|
||||
// ClientCodec holds required parameters for providing a client codec for
|
||||
// bsonrpc
|
||||
type ClientCodec struct {
|
||||
rwc io.ReadWriteCloser
|
||||
}
|
||||
|
||||
// NewClientCodec creates a new client codec for bsonrpc communication
|
||||
func NewClientCodec(conn io.ReadWriteCloser) rpc.ClientCodec {
|
||||
return &ClientCodec{conn}
|
||||
}
|
||||
|
||||
// DefaultBufferSize holds the default value for buffer size
|
||||
const DefaultBufferSize = 4096
|
||||
|
||||
// WriteRequest sends the request to the server
|
||||
func (cc *ClientCodec) WriteRequest(r *rpc.Request, body interface{}) error {
|
||||
buf := bytes2.NewChunkedWriter(DefaultBufferSize)
|
||||
if err := bson.MarshalToBuffer(buf, &RequestBson{r}); err != nil {
|
||||
|
@ -42,35 +48,44 @@ func (cc *ClientCodec) WriteRequest(r *rpc.Request, body interface{}) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// ReadResponseHeader reads the header of server response
|
||||
func (cc *ClientCodec) ReadResponseHeader(r *rpc.Response) error {
|
||||
return bson.UnmarshalFromStream(cc.rwc, &ResponseBson{r})
|
||||
}
|
||||
|
||||
// ReadResponseBody reads the body of server response
|
||||
func (cc *ClientCodec) ReadResponseBody(body interface{}) error {
|
||||
return bson.UnmarshalFromStream(cc.rwc, body)
|
||||
}
|
||||
|
||||
// Close closes the codec
|
||||
func (cc *ClientCodec) Close() error {
|
||||
return cc.rwc.Close()
|
||||
}
|
||||
|
||||
// ServerCodec holds required parameters for providing a server codec for
|
||||
// bsonrpc
|
||||
type ServerCodec struct {
|
||||
rwc io.ReadWriteCloser
|
||||
cw *bytes2.ChunkedWriter
|
||||
}
|
||||
|
||||
// NewServerCodec creates a new server codec for bsonrpc communication
|
||||
func NewServerCodec(conn io.ReadWriteCloser) rpc.ServerCodec {
|
||||
return &ServerCodec{conn, bytes2.NewChunkedWriter(DefaultBufferSize)}
|
||||
}
|
||||
|
||||
// ReadRequestHeader reads the header of the request
|
||||
func (sc *ServerCodec) ReadRequestHeader(r *rpc.Request) error {
|
||||
return bson.UnmarshalFromStream(sc.rwc, &RequestBson{r})
|
||||
}
|
||||
|
||||
// ReadRequestBody reads the body of the request
|
||||
func (sc *ServerCodec) ReadRequestBody(body interface{}) error {
|
||||
return bson.UnmarshalFromStream(sc.rwc, body)
|
||||
}
|
||||
|
||||
// WriteResponse send the response of the request to the client
|
||||
func (sc *ServerCodec) WriteResponse(r *rpc.Response, body interface{}, last bool) error {
|
||||
if err := bson.MarshalToBuffer(sc.cw, &ResponseBson{r}); err != nil {
|
||||
return err
|
||||
|
@ -83,26 +98,33 @@ func (sc *ServerCodec) WriteResponse(r *rpc.Response, body interface{}, last boo
|
|||
return err
|
||||
}
|
||||
|
||||
// Close closes the codec
|
||||
func (sc *ServerCodec) Close() error {
|
||||
return sc.rwc.Close()
|
||||
}
|
||||
|
||||
// DialHTTP dials a HTTP endpoint with bsonrpc codec
|
||||
func DialHTTP(network, address string, connectTimeout time.Duration, config *tls.Config) (*rpc.Client, error) {
|
||||
return rpcwrap.DialHTTP(network, address, codecName, NewClientCodec, connectTimeout, config)
|
||||
}
|
||||
|
||||
// DialAuthHTTP dials a HTTP endpoint with bsonrpc codec as authentication enabled
|
||||
func DialAuthHTTP(network, address, user, password string, connectTimeout time.Duration, config *tls.Config) (*rpc.Client, error) {
|
||||
return rpcwrap.DialAuthHTTP(network, address, user, password, codecName, NewClientCodec, connectTimeout, config)
|
||||
}
|
||||
|
||||
// ServeRPC serves bsonrpc codec with the default rpc server
|
||||
func ServeRPC() {
|
||||
rpcwrap.ServeRPC(codecName, NewServerCodec)
|
||||
}
|
||||
|
||||
// ServeAuthRPC serves bsonrpc codec with the default authentication enabled rpc
|
||||
// server
|
||||
func ServeAuthRPC() {
|
||||
rpcwrap.ServeAuthRPC(codecName, NewServerCodec)
|
||||
}
|
||||
|
||||
// ServeCustomRPC serves bsonrpc codec with a custom rpc server
|
||||
func ServeCustomRPC(handler *http.ServeMux, server *rpc.Server, useAuth bool) {
|
||||
rpcwrap.ServeCustomRPC(handler, server, useAuth, codecName, NewServerCodec)
|
||||
}
|
||||
|
|
|
@ -12,10 +12,12 @@ import (
|
|||
rpc "github.com/youtube/vitess/go/rpcplus"
|
||||
)
|
||||
|
||||
// RequestBson provides bson rpc request parameters
|
||||
type RequestBson struct {
|
||||
*rpc.Request
|
||||
}
|
||||
|
||||
// MarshalBson marshals request to the given writer with optional prefix
|
||||
func (req *RequestBson) MarshalBson(buf *bytes2.ChunkedWriter, key string) {
|
||||
bson.EncodeOptionalPrefix(buf, bson.Object, key)
|
||||
lenWriter := bson.NewLenWriter(buf)
|
||||
|
@ -26,6 +28,8 @@ func (req *RequestBson) MarshalBson(buf *bytes2.ChunkedWriter, key string) {
|
|||
lenWriter.Close()
|
||||
}
|
||||
|
||||
// UnmarshalBson unmarshals request to the given byte buffer as verifying the
|
||||
// kind
|
||||
func (req *RequestBson) UnmarshalBson(buf *bytes.Buffer, kind byte) {
|
||||
bson.VerifyObject(kind)
|
||||
bson.Next(buf, 4)
|
||||
|
@ -45,10 +49,12 @@ func (req *RequestBson) UnmarshalBson(buf *bytes.Buffer, kind byte) {
|
|||
}
|
||||
}
|
||||
|
||||
// ResponseBson provides bson rpc request parameters
|
||||
type ResponseBson struct {
|
||||
*rpc.Response
|
||||
}
|
||||
|
||||
// MarshalBson marshals response to the given writer with optional prefix
|
||||
func (resp *ResponseBson) MarshalBson(buf *bytes2.ChunkedWriter, key string) {
|
||||
bson.EncodeOptionalPrefix(buf, bson.Object, key)
|
||||
lenWriter := bson.NewLenWriter(buf)
|
||||
|
@ -60,6 +66,8 @@ func (resp *ResponseBson) MarshalBson(buf *bytes2.ChunkedWriter, key string) {
|
|||
lenWriter.Close()
|
||||
}
|
||||
|
||||
// UnmarshalBson unmarshals response to the given byte buffer as verifying the
|
||||
// kind
|
||||
func (resp *ResponseBson) UnmarshalBson(buf *bytes.Buffer, kind byte) {
|
||||
bson.VerifyObject(kind)
|
||||
bson.Next(buf, 4)
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// Package jsonrpc provides wrappers for json rpc communication
|
||||
package jsonrpc
|
||||
|
||||
import (
|
||||
|
@ -13,14 +14,18 @@ import (
|
|||
"github.com/youtube/vitess/go/rpcwrap"
|
||||
)
|
||||
|
||||
// DialHTTP dials a json rpc HTTP endpoint with optional TLS config
|
||||
func DialHTTP(network, address string, connectTimeout time.Duration, config *tls.Config) (*rpc.Client, error) {
|
||||
return rpcwrap.DialHTTP(network, address, "json", oldjson.NewClientCodec, connectTimeout, config)
|
||||
}
|
||||
|
||||
// ServeRPC serves a json rpc endpoint using default server
|
||||
func ServeRPC() {
|
||||
rpcwrap.ServeRPC("json", oldjson.NewServerCodec)
|
||||
}
|
||||
|
||||
// ServeAuthRPC serves a json rpc endpoint using authentication enabled default
|
||||
// server
|
||||
func ServeAuthRPC() {
|
||||
rpcwrap.ServeAuthRPC("json", oldjson.NewServerCodec)
|
||||
}
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
// Package proto provides protocol functions
|
||||
package proto
|
||||
|
||||
import (
|
||||
|
@ -56,6 +57,7 @@ func SetUsername(ctx context.Context, username string) (ok bool) {
|
|||
return true
|
||||
}
|
||||
|
||||
// NewContext creates a default context satisfying context.Context
|
||||
func NewContext(remoteAddr string) context.Context {
|
||||
return &rpcContext{remoteAddr: remoteAddr}
|
||||
}
|
||||
|
@ -70,14 +72,17 @@ func (ctx *rpcContext) Deadline() (deadline time.Time, ok bool) {
|
|||
return time.Time{}, false
|
||||
}
|
||||
|
||||
// Done channel for cancelation.
|
||||
func (ctx *rpcContext) Done() <-chan struct{} {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Err is stub for interface function
|
||||
func (ctx *rpcContext) Err() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Value returns only predefined variables if already set
|
||||
func (ctx *rpcContext) Value(key interface{}) interface{} {
|
||||
k, ok := key.(contextKey)
|
||||
if !ok {
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
// Package rpcwrap provides wrappers for rpcplus package
|
||||
package rpcwrap
|
||||
|
||||
import (
|
||||
|
@ -31,20 +32,24 @@ var (
|
|||
connAccepted = stats.NewInt("connection-accepted")
|
||||
)
|
||||
|
||||
// ClientCodecFactory holds pattern for other client codec factories
|
||||
type ClientCodecFactory func(conn io.ReadWriteCloser) rpc.ClientCodec
|
||||
|
||||
// BufferedConnection holds connection data for codecs
|
||||
type BufferedConnection struct {
|
||||
isClosed bool
|
||||
*bufio.Reader
|
||||
io.WriteCloser
|
||||
}
|
||||
|
||||
// NewBufferedConnection creates a new Buffered Connection
|
||||
func NewBufferedConnection(conn io.ReadWriteCloser) *BufferedConnection {
|
||||
connCount.Add(1)
|
||||
connAccepted.Add(1)
|
||||
return &BufferedConnection{false, bufio.NewReader(conn), conn}
|
||||
}
|
||||
|
||||
// Close closes the buffered connection
|
||||
// FIXME(sougou/szopa): Find a better way to track connection count.
|
||||
func (bc *BufferedConnection) Close() error {
|
||||
if !bc.isClosed {
|
||||
|
@ -119,6 +124,7 @@ func dialHTTP(network, address, codecName string, cFactory ClientCodecFactory, a
|
|||
return nil, &net.OpError{Op: "dial-http", Net: network + " " + address, Addr: nil, Err: err}
|
||||
}
|
||||
|
||||
// ServerCodecFactory holds pattern for other server codec factories
|
||||
type ServerCodecFactory func(conn io.ReadWriteCloser) rpc.ServerCodec
|
||||
|
||||
// ServeRPC handles rpc requests using the hijack scheme of rpc
|
||||
|
@ -186,7 +192,7 @@ func GetRpcPath(codecName string, auth bool) string {
|
|||
return path
|
||||
}
|
||||
|
||||
// ServeCustomRPC serves the given http rpc requests with the provided ServeMux,
|
||||
// ServeHTTPRPC serves the given http rpc requests with the provided ServeMux,
|
||||
// does not support built-in authentication
|
||||
func ServeHTTPRPC(
|
||||
handler *http.ServeMux,
|
||||
|
@ -197,7 +203,7 @@ func ServeHTTPRPC(
|
|||
|
||||
handler.Handle(
|
||||
GetRpcPath(codecName, false),
|
||||
&httpRpcHandler{
|
||||
&httpRPCHandler{
|
||||
cFactory: cFactory,
|
||||
server: server,
|
||||
contextCreator: contextCreator,
|
||||
|
@ -205,9 +211,9 @@ func ServeHTTPRPC(
|
|||
)
|
||||
}
|
||||
|
||||
// httpRpcHandler handles rpc queries for a all types of HTTP requests, does not
|
||||
// httpRPCHandler handles rpc queries for a all types of HTTP requests, does not
|
||||
// maintain a persistent connection.
|
||||
type httpRpcHandler struct {
|
||||
type httpRPCHandler struct {
|
||||
cFactory ServerCodecFactory
|
||||
server *rpc.Server
|
||||
// contextCreator creates an application specific context, while creating
|
||||
|
@ -217,7 +223,7 @@ type httpRpcHandler struct {
|
|||
}
|
||||
|
||||
// ServeHTTP implements http.Handler's ServeHTTP
|
||||
func (h *httpRpcHandler) ServeHTTP(c http.ResponseWriter, req *http.Request) {
|
||||
func (h *httpRPCHandler) ServeHTTP(c http.ResponseWriter, req *http.Request) {
|
||||
codec := h.cFactory(&httpReadWriteCloser{rw: c, req: req})
|
||||
|
||||
var ctx context.Context
|
||||
|
|
Загрузка…
Ссылка в новой задаче