diff --git a/go/rpcwrap/auth/authentication.go b/go/rpcwrap/auth/authentication.go index ff473b28a2..c6f5a8f8cf 100644 --- a/go/rpcwrap/auth/authentication.go +++ b/go/rpcwrap/auth/authentication.go @@ -1,3 +1,4 @@ +// Package auth provides authentication codecs package auth import ( @@ -46,7 +47,10 @@ func (c *cramMD5Credentials) Load(filename string) error { const CRAMMD5MaxRequests = 2 var ( - AuthenticationServer = rpc.NewServer() + // AuthenticationServer holds a default server for authentication + AuthenticationServer = rpc.NewServer() + + // DefaultAuthenticatorCRAMMD5 holds a default authenticator DefaultAuthenticatorCRAMMD5 = NewAuthenticatorCRAMMD5() ) @@ -54,6 +58,7 @@ var ( // authenticate. var AuthenticationFailed = errors.New("authentication error: authentication failed") +// NewAuthenticatorCRAMMD5 creates a new CRAM-MD5 authenticator func NewAuthenticatorCRAMMD5() *AuthenticatorCRAMMD5 { return &AuthenticatorCRAMMD5{make(cramMD5Credentials)} } @@ -113,12 +118,15 @@ func (a *AuthenticatorCRAMMD5) Authenticate(ctx context.Context, req *Authentica return AuthenticationFailed } +// GetNewChallengeReply holds reply data for Challenge type GetNewChallengeReply struct { Challenge string } +// AuthenticateReply holds reply data for Authenticate type AuthenticateReply struct{} +// AuthenticateRequest holds request data for AuthenticateRequest type AuthenticateRequest struct { Proof string state authenticationState diff --git a/go/rpcwrap/auth/crammd5.go b/go/rpcwrap/auth/crammd5.go index bc3a4cf132..cdec2a7323 100644 --- a/go/rpcwrap/auth/crammd5.go +++ b/go/rpcwrap/auth/crammd5.go @@ -11,6 +11,7 @@ import ( "time" ) +// CRAMMD5GetChallenge creates a new RFC822 compatible ID func CRAMMD5GetChallenge() (string, error) { var randDigits uint32 err := binary.Read(rand.Reader, binary.LittleEndian, &randDigits) @@ -27,6 +28,7 @@ func CRAMMD5GetChallenge() (string, error) { return fmt.Sprintf("<%d.%d@%s>", randDigits, timestamp, hostname), nil } +// CRAMMD5GetExpected creates a "possible" ID with the given credentials func CRAMMD5GetExpected(username, secret, challenge string) string { var ret []byte hash := hmac.New(md5.New, []byte(secret)) diff --git a/go/rpcwrap/bsonrpc/codecs.go b/go/rpcwrap/bsonrpc/codecs.go index de093f0970..0831f0fa21 100644 --- a/go/rpcwrap/bsonrpc/codecs.go +++ b/go/rpcwrap/bsonrpc/codecs.go @@ -2,6 +2,7 @@ // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. +// Package bsonrpc provides codecs for bsonrpc communication package bsonrpc import ( @@ -20,16 +21,21 @@ const ( codecName = "bson" ) +// ClientCodec holds required parameters for providing a client codec for +// bsonrpc type ClientCodec struct { rwc io.ReadWriteCloser } +// NewClientCodec creates a new client codec for bsonrpc communication func NewClientCodec(conn io.ReadWriteCloser) rpc.ClientCodec { return &ClientCodec{conn} } +// DefaultBufferSize holds the default value for buffer size const DefaultBufferSize = 4096 +// WriteRequest sends the request to the server func (cc *ClientCodec) WriteRequest(r *rpc.Request, body interface{}) error { buf := bytes2.NewChunkedWriter(DefaultBufferSize) if err := bson.MarshalToBuffer(buf, &RequestBson{r}); err != nil { @@ -42,35 +48,44 @@ func (cc *ClientCodec) WriteRequest(r *rpc.Request, body interface{}) error { return err } +// ReadResponseHeader reads the header of server response func (cc *ClientCodec) ReadResponseHeader(r *rpc.Response) error { return bson.UnmarshalFromStream(cc.rwc, &ResponseBson{r}) } +// ReadResponseBody reads the body of server response func (cc *ClientCodec) ReadResponseBody(body interface{}) error { return bson.UnmarshalFromStream(cc.rwc, body) } +// Close closes the codec func (cc *ClientCodec) Close() error { return cc.rwc.Close() } +// ServerCodec holds required parameters for providing a server codec for +// bsonrpc type ServerCodec struct { rwc io.ReadWriteCloser cw *bytes2.ChunkedWriter } +// NewServerCodec creates a new server codec for bsonrpc communication func NewServerCodec(conn io.ReadWriteCloser) rpc.ServerCodec { return &ServerCodec{conn, bytes2.NewChunkedWriter(DefaultBufferSize)} } +// ReadRequestHeader reads the header of the request func (sc *ServerCodec) ReadRequestHeader(r *rpc.Request) error { return bson.UnmarshalFromStream(sc.rwc, &RequestBson{r}) } +// ReadRequestBody reads the body of the request func (sc *ServerCodec) ReadRequestBody(body interface{}) error { return bson.UnmarshalFromStream(sc.rwc, body) } +// WriteResponse send the response of the request to the client func (sc *ServerCodec) WriteResponse(r *rpc.Response, body interface{}, last bool) error { if err := bson.MarshalToBuffer(sc.cw, &ResponseBson{r}); err != nil { return err @@ -83,26 +98,33 @@ func (sc *ServerCodec) WriteResponse(r *rpc.Response, body interface{}, last boo return err } +// Close closes the codec func (sc *ServerCodec) Close() error { return sc.rwc.Close() } +// DialHTTP dials a HTTP endpoint with bsonrpc codec func DialHTTP(network, address string, connectTimeout time.Duration, config *tls.Config) (*rpc.Client, error) { return rpcwrap.DialHTTP(network, address, codecName, NewClientCodec, connectTimeout, config) } +// DialAuthHTTP dials a HTTP endpoint with bsonrpc codec as authentication enabled func DialAuthHTTP(network, address, user, password string, connectTimeout time.Duration, config *tls.Config) (*rpc.Client, error) { return rpcwrap.DialAuthHTTP(network, address, user, password, codecName, NewClientCodec, connectTimeout, config) } +// ServeRPC serves bsonrpc codec with the default rpc server func ServeRPC() { rpcwrap.ServeRPC(codecName, NewServerCodec) } +// ServeAuthRPC serves bsonrpc codec with the default authentication enabled rpc +// server func ServeAuthRPC() { rpcwrap.ServeAuthRPC(codecName, NewServerCodec) } +// ServeCustomRPC serves bsonrpc codec with a custom rpc server func ServeCustomRPC(handler *http.ServeMux, server *rpc.Server, useAuth bool) { rpcwrap.ServeCustomRPC(handler, server, useAuth, codecName, NewServerCodec) } diff --git a/go/rpcwrap/bsonrpc/custom_codecs.go b/go/rpcwrap/bsonrpc/custom_codecs.go index fc50ca18c9..2988afd0ba 100644 --- a/go/rpcwrap/bsonrpc/custom_codecs.go +++ b/go/rpcwrap/bsonrpc/custom_codecs.go @@ -12,10 +12,12 @@ import ( rpc "github.com/youtube/vitess/go/rpcplus" ) +// RequestBson provides bson rpc request parameters type RequestBson struct { *rpc.Request } +// MarshalBson marshals request to the given writer with optional prefix func (req *RequestBson) MarshalBson(buf *bytes2.ChunkedWriter, key string) { bson.EncodeOptionalPrefix(buf, bson.Object, key) lenWriter := bson.NewLenWriter(buf) @@ -26,6 +28,8 @@ func (req *RequestBson) MarshalBson(buf *bytes2.ChunkedWriter, key string) { lenWriter.Close() } +// UnmarshalBson unmarshals request to the given byte buffer as verifying the +// kind func (req *RequestBson) UnmarshalBson(buf *bytes.Buffer, kind byte) { bson.VerifyObject(kind) bson.Next(buf, 4) @@ -45,10 +49,12 @@ func (req *RequestBson) UnmarshalBson(buf *bytes.Buffer, kind byte) { } } +// ResponseBson provides bson rpc request parameters type ResponseBson struct { *rpc.Response } +// MarshalBson marshals response to the given writer with optional prefix func (resp *ResponseBson) MarshalBson(buf *bytes2.ChunkedWriter, key string) { bson.EncodeOptionalPrefix(buf, bson.Object, key) lenWriter := bson.NewLenWriter(buf) @@ -60,6 +66,8 @@ func (resp *ResponseBson) MarshalBson(buf *bytes2.ChunkedWriter, key string) { lenWriter.Close() } +// UnmarshalBson unmarshals response to the given byte buffer as verifying the +// kind func (resp *ResponseBson) UnmarshalBson(buf *bytes.Buffer, kind byte) { bson.VerifyObject(kind) bson.Next(buf, 4) diff --git a/go/rpcwrap/jsonrpc/wrapper.go b/go/rpcwrap/jsonrpc/wrapper.go index ba1977a009..1bbc91de71 100644 --- a/go/rpcwrap/jsonrpc/wrapper.go +++ b/go/rpcwrap/jsonrpc/wrapper.go @@ -2,6 +2,7 @@ // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. +// Package jsonrpc provides wrappers for json rpc communication package jsonrpc import ( @@ -13,14 +14,18 @@ import ( "github.com/youtube/vitess/go/rpcwrap" ) +// DialHTTP dials a json rpc HTTP endpoint with optional TLS config func DialHTTP(network, address string, connectTimeout time.Duration, config *tls.Config) (*rpc.Client, error) { return rpcwrap.DialHTTP(network, address, "json", oldjson.NewClientCodec, connectTimeout, config) } +// ServeRPC serves a json rpc endpoint using default server func ServeRPC() { rpcwrap.ServeRPC("json", oldjson.NewServerCodec) } +// ServeAuthRPC serves a json rpc endpoint using authentication enabled default +// server func ServeAuthRPC() { rpcwrap.ServeAuthRPC("json", oldjson.NewServerCodec) } diff --git a/go/rpcwrap/proto/proto.go b/go/rpcwrap/proto/proto.go index a11a7fe867..b3211232d8 100644 --- a/go/rpcwrap/proto/proto.go +++ b/go/rpcwrap/proto/proto.go @@ -1,3 +1,4 @@ +// Package proto provides protocol functions package proto import ( @@ -56,6 +57,7 @@ func SetUsername(ctx context.Context, username string) (ok bool) { return true } +// NewContext creates a default context satisfying context.Context func NewContext(remoteAddr string) context.Context { return &rpcContext{remoteAddr: remoteAddr} } @@ -70,14 +72,17 @@ func (ctx *rpcContext) Deadline() (deadline time.Time, ok bool) { return time.Time{}, false } +// Done channel for cancelation. func (ctx *rpcContext) Done() <-chan struct{} { return nil } +// Err is stub for interface function func (ctx *rpcContext) Err() error { return nil } +// Value returns only predefined variables if already set func (ctx *rpcContext) Value(key interface{}) interface{} { k, ok := key.(contextKey) if !ok { diff --git a/go/rpcwrap/rpcwrap.go b/go/rpcwrap/rpcwrap.go index d7be43882a..735e244ca9 100644 --- a/go/rpcwrap/rpcwrap.go +++ b/go/rpcwrap/rpcwrap.go @@ -2,6 +2,7 @@ // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. +// Package rpcwrap provides wrappers for rpcplus package package rpcwrap import ( @@ -31,20 +32,24 @@ var ( connAccepted = stats.NewInt("connection-accepted") ) +// ClientCodecFactory holds pattern for other client codec factories type ClientCodecFactory func(conn io.ReadWriteCloser) rpc.ClientCodec +// BufferedConnection holds connection data for codecs type BufferedConnection struct { isClosed bool *bufio.Reader io.WriteCloser } +// NewBufferedConnection creates a new Buffered Connection func NewBufferedConnection(conn io.ReadWriteCloser) *BufferedConnection { connCount.Add(1) connAccepted.Add(1) return &BufferedConnection{false, bufio.NewReader(conn), conn} } +// Close closes the buffered connection // FIXME(sougou/szopa): Find a better way to track connection count. func (bc *BufferedConnection) Close() error { if !bc.isClosed { @@ -119,6 +124,7 @@ func dialHTTP(network, address, codecName string, cFactory ClientCodecFactory, a return nil, &net.OpError{Op: "dial-http", Net: network + " " + address, Addr: nil, Err: err} } +// ServerCodecFactory holds pattern for other server codec factories type ServerCodecFactory func(conn io.ReadWriteCloser) rpc.ServerCodec // ServeRPC handles rpc requests using the hijack scheme of rpc @@ -186,7 +192,7 @@ func GetRpcPath(codecName string, auth bool) string { return path } -// ServeCustomRPC serves the given http rpc requests with the provided ServeMux, +// ServeHTTPRPC serves the given http rpc requests with the provided ServeMux, // does not support built-in authentication func ServeHTTPRPC( handler *http.ServeMux, @@ -197,7 +203,7 @@ func ServeHTTPRPC( handler.Handle( GetRpcPath(codecName, false), - &httpRpcHandler{ + &httpRPCHandler{ cFactory: cFactory, server: server, contextCreator: contextCreator, @@ -205,9 +211,9 @@ func ServeHTTPRPC( ) } -// httpRpcHandler handles rpc queries for a all types of HTTP requests, does not +// httpRPCHandler handles rpc queries for a all types of HTTP requests, does not // maintain a persistent connection. -type httpRpcHandler struct { +type httpRPCHandler struct { cFactory ServerCodecFactory server *rpc.Server // contextCreator creates an application specific context, while creating @@ -217,7 +223,7 @@ type httpRpcHandler struct { } // ServeHTTP implements http.Handler's ServeHTTP -func (h *httpRpcHandler) ServeHTTP(c http.ResponseWriter, req *http.Request) { +func (h *httpRPCHandler) ServeHTTP(c http.ResponseWriter, req *http.Request) { codec := h.cFactory(&httpReadWriteCloser{rw: c, req: req}) var ctx context.Context