Add unit test for binlog player / streamer RPC. Handle panics correctly too.

This commit is contained in:
Alain Jobart 2015-05-11 07:55:50 -07:00
Родитель 8f3e17ee5d
Коммит 9302678856
5 изменённых файлов: 310 добавлений и 24 удалений

Просмотреть файл

@ -0,0 +1,208 @@
// Copyright 2015, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package binlogplayertest
import (
"fmt"
"reflect"
"strings"
"testing"
"time"
mproto "github.com/youtube/vitess/go/mysql/proto"
"github.com/youtube/vitess/go/vt/binlog/binlogplayer"
"github.com/youtube/vitess/go/vt/binlog/proto"
"github.com/youtube/vitess/go/vt/key"
myproto "github.com/youtube/vitess/go/vt/mysqlctl/proto"
)
// FakeBinlogStreamer is our implementation of UpdateStream
type FakeBinlogStreamer struct {
t *testing.T
panics bool
}
// NewFakeBinlogStreamer returns the test instance for UpdateStream
func NewFakeBinlogStreamer(t *testing.T) *FakeBinlogStreamer {
return &FakeBinlogStreamer{
t: t,
panics: false,
}
}
// ServeUpdateStream is part of the the UpdateStream interface
func (fake *FakeBinlogStreamer) ServeUpdateStream(req *proto.UpdateStreamRequest, sendReply func(reply *proto.StreamEvent) error) error {
if fake.panics {
panic(fmt.Errorf("test-triggered panic"))
}
return nil
}
//
// StreamKeyRange tests
//
var testKeyRangeRequest = &proto.KeyRangeRequest{
Position: myproto.ReplicationPosition{
GTIDSet: myproto.MariadbGTID{
Domain: 1,
Server: 3456,
Sequence: 7890,
},
},
KeyspaceIdType: key.KIT_UINT64,
KeyRange: key.KeyRange{
Start: key.Uint64Key(0x7000000000000000).KeyspaceId(),
End: key.Uint64Key(0x9000000000000000).KeyspaceId(),
},
Charset: &mproto.Charset{
Client: 12,
Conn: 13,
Server: 14,
},
}
var testBinlogTransaction = &proto.BinlogTransaction{
Statements: []proto.Statement{
proto.Statement{
Category: proto.BL_ROLLBACK,
Charset: &mproto.Charset{
Client: 120,
Conn: 130,
Server: 140,
},
Sql: []byte("my statement"),
},
},
Timestamp: 78,
GTIDField: myproto.GTIDField{
Value: myproto.MariadbGTID{
Domain: 1,
Server: 345,
Sequence: 789,
},
},
}
// StreamKeyRange is part of the the UpdateStream interface
func (fake *FakeBinlogStreamer) StreamKeyRange(req *proto.KeyRangeRequest, sendReply func(reply *proto.BinlogTransaction) error) error {
if fake.panics {
panic(fmt.Errorf("test-triggered panic"))
}
sendReply(testBinlogTransaction)
return nil
}
func testStreamKeyRange(t *testing.T, bpc binlogplayer.BinlogPlayerClient) {
c := make(chan *proto.BinlogTransaction)
bpr := bpc.StreamKeyRange(testKeyRangeRequest, c)
if se, ok := <-c; !ok {
t.Fatalf("got no response")
} else {
if !reflect.DeepEqual(*se, *testBinlogTransaction) {
t.Errorf("got wrong result, got %v expected %v", *se, *testBinlogTransaction)
}
}
if se, ok := <-c; ok {
t.Fatalf("got a response when error expected: %v", se)
}
if err := bpr.Error(); err != nil {
t.Errorf("got unexpected error: %v", err)
}
}
func testStreamKeyRangePanics(t *testing.T, bpc binlogplayer.BinlogPlayerClient) {
c := make(chan *proto.BinlogTransaction)
bpr := bpc.StreamKeyRange(testKeyRangeRequest, c)
if se, ok := <-c; ok {
t.Fatalf("got a response when error expected: %v", se)
}
err := bpr.Error()
if err == nil || !strings.Contains(err.Error(), "test-triggered panic") {
t.Errorf("wrong error from panic: %v", err)
}
}
//
// StreamTables test
//
var testTablesRequest = &proto.TablesRequest{
Position: myproto.ReplicationPosition{
GTIDSet: myproto.MariadbGTID{
Domain: 1,
Server: 345,
Sequence: 789,
},
},
Tables: []string{"table1", "table2"},
Charset: &mproto.Charset{
Client: 12,
Conn: 13,
Server: 14,
},
}
// StreamTables is part of the the UpdateStream interface
func (fake *FakeBinlogStreamer) StreamTables(req *proto.TablesRequest, sendReply func(reply *proto.BinlogTransaction) error) error {
if fake.panics {
panic(fmt.Errorf("test-triggered panic"))
}
sendReply(testBinlogTransaction)
return nil
}
func testStreamTables(t *testing.T, bpc binlogplayer.BinlogPlayerClient) {
c := make(chan *proto.BinlogTransaction)
bpr := bpc.StreamTables(testTablesRequest, c)
if se, ok := <-c; !ok {
t.Fatalf("got no response")
} else {
if !reflect.DeepEqual(*se, *testBinlogTransaction) {
t.Errorf("got wrong result, got %v expected %v", *se, *testBinlogTransaction)
}
}
if se, ok := <-c; ok {
t.Fatalf("got a response when error expected: %v", se)
}
if err := bpr.Error(); err != nil {
t.Errorf("got unexpected error: %v", err)
}
}
func testStreamTablesPanics(t *testing.T, bpc binlogplayer.BinlogPlayerClient) {
c := make(chan *proto.BinlogTransaction)
bpr := bpc.StreamTables(testTablesRequest, c)
if se, ok := <-c; ok {
t.Fatalf("got a response when error expected: %v", se)
}
err := bpr.Error()
if err == nil || !strings.Contains(err.Error(), "test-triggered panic") {
t.Errorf("wrong error from panic: %v", err)
}
}
// HandlePanic is part of the the UpdateStream interface
func (fake *FakeBinlogStreamer) HandlePanic(err *error) {
if x := recover(); x != nil {
*err = fmt.Errorf("Caught panic: %v", x)
}
}
// Run runs the test suite
func Run(t *testing.T, bpc binlogplayer.BinlogPlayerClient, addr string, fake *FakeBinlogStreamer) {
if err := bpc.Dial(addr, 30*time.Second); err != nil {
t.Fatalf("Dial failed: %v", err)
}
// no panic
testStreamKeyRange(t, bpc)
testStreamTables(t, bpc)
// panic now, and test
fake.panics = true
testStreamKeyRangePanics(t, bpc)
testStreamTablesPanics(t, bpc)
}

Просмотреть файл

@ -0,0 +1,45 @@
// Copyright 2014, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package gorpcbinlogplayer
import (
"net"
"net/http"
"testing"
"github.com/youtube/vitess/go/rpcplus"
"github.com/youtube/vitess/go/rpcwrap/bsonrpc"
"github.com/youtube/vitess/go/vt/binlog/binlogplayertest"
"github.com/youtube/vitess/go/vt/binlog/gorpcbinlogstreamer"
)
// the test here creates a fake server implementation, a fake client
// implementation, and runs the test suite against the setup.
func TestGoRPCBinlogStreamer(t *testing.T) {
// Listen on a random port
listener, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatalf("Cannot listen: %v", err)
}
// Create a Go Rpc server and listen on the port
server := rpcplus.NewServer()
fakeUpdateStream := binlogplayertest.NewFakeBinlogStreamer(t)
server.Register(gorpcbinlogstreamer.New(fakeUpdateStream))
// create the HTTP server, serve the server from it
handler := http.NewServeMux()
bsonrpc.ServeCustomRPC(handler, server, false)
httpServer := http.Server{
Handler: handler,
}
go httpServer.Serve(listener)
// Create a Go Rpc client to talk to the fake tablet
client := &GoRpcBinlogPlayerClient{}
// and send it to the test suite
binlogplayertest.Run(t, client, listener.Addr().String(), fakeUpdateStream)
}

Просмотреть файл

@ -10,32 +10,44 @@ import (
"github.com/youtube/vitess/go/vt/servenv"
)
// UpdateStream is the go rpc UpdateStream server
type UpdateStream struct {
updateStream *binlog.UpdateStream
updateStream proto.UpdateStream
}
// ServeUpdateStream is part of the gorpc UpdateStream service
func (server *UpdateStream) ServeUpdateStream(req *proto.UpdateStreamRequest, sendReply func(reply interface{}) error) (err error) {
defer server.updateStream.HandlePanic(&err)
return server.updateStream.ServeUpdateStream(req, func(reply *proto.StreamEvent) error {
return sendReply(reply)
})
}
// StreamKeyRange is part of the gorpc UpdateStream service
func (server *UpdateStream) StreamKeyRange(req *proto.KeyRangeRequest, sendReply func(reply interface{}) error) (err error) {
defer server.updateStream.HandlePanic(&err)
return server.updateStream.StreamKeyRange(req, func(reply *proto.BinlogTransaction) error {
return sendReply(reply)
})
}
// StreamTables is part of the gorpc UpdateStream service
func (server *UpdateStream) StreamTables(req *proto.TablesRequest, sendReply func(reply interface{}) error) (err error) {
defer server.updateStream.HandlePanic(&err)
return server.updateStream.StreamTables(req, func(reply *proto.BinlogTransaction) error {
return sendReply(reply)
})
}
// New returns a new go rpc server implementation stub for UpdateStream
func New(updateStream proto.UpdateStream) *UpdateStream {
return &UpdateStream{updateStream}
}
// registration mechanism
func init() {
binlog.RegisterUpdateStreamServices = append(binlog.RegisterUpdateStreamServices, func(updateStream *binlog.UpdateStream) {
servenv.Register("updatestream", &UpdateStream{updateStream})
binlog.RegisterUpdateStreamServices = append(binlog.RegisterUpdateStreamServices, func(updateStream proto.UpdateStream) {
servenv.Register("updatestream", New(updateStream))
})
}

Просмотреть файл

@ -29,3 +29,20 @@ type TablesRequest struct {
Tables []string
Charset *mproto.Charset
}
// UpdateStream is the interface for the server
type UpdateStream interface {
// ServeUpdateStream serves the query and streams the result
// for the full update stream
ServeUpdateStream(req *UpdateStreamRequest, sendReply func(reply *StreamEvent) error) error
// StreamKeyRange streams events related to a KeyRange only
StreamKeyRange(req *KeyRangeRequest, sendReply func(reply *BinlogTransaction) error) error
// StreamTables streams events related to a set of Tables only
StreamTables(req *TablesRequest, sendReply func(reply *BinlogTransaction) error) error
// HandlePanic should be called in a defer,
// first thing in the RPC implementation.
HandlePanic(*error)
}

Просмотреть файл

@ -11,6 +11,7 @@ import (
log "github.com/golang/glog"
"github.com/youtube/vitess/go/stats"
"github.com/youtube/vitess/go/sync2"
"github.com/youtube/vitess/go/tb"
"github.com/youtube/vitess/go/vt/binlog/proto"
"github.com/youtube/vitess/go/vt/mysqlctl"
myproto "github.com/youtube/vitess/go/vt/mysqlctl/proto"
@ -38,6 +39,7 @@ var (
tablesTransactions = stats.NewInt("UpdateStreamTablesTransactions")
)
// UpdateStream is the real implementation of proto.UpdateStream
type UpdateStream struct {
mycnf *mysqlctl.Mycnf
@ -80,13 +82,16 @@ func (sl *streamList) Stop() {
sl.Unlock()
}
// UpdateStreamRpcService is the singleton that gets initialized during
// UpdateStream is the singleton that gets initialized during
// startup and that gets called by all RPC server implementations
var UpdateStreamRpcService *UpdateStream
// Glue to delay registration of RPC servers until we have all the objects
type RegisterUpdateStreamServiceFunc func(*UpdateStream)
// RegisterUpdateStreamServiceFunc is the type to use for delayed
// registration of RPC servers until we have all the objects
type RegisterUpdateStreamServiceFunc func(proto.UpdateStream)
// RegisterUpdateStreamServices is the list of all registration
// callbacks to invoke
var RegisterUpdateStreamServices []RegisterUpdateStreamServiceFunc
// RegisterUpdateStreamService needs to be called to start listening
@ -115,24 +120,30 @@ func logError() {
}
}
// EnableUpdateStreamService enables the RPC service for UpdateStream
func EnableUpdateStreamService(dbname string, mysqld *mysqlctl.Mysqld) {
defer logError()
UpdateStreamRpcService.enable(dbname, mysqld)
}
// DisableUpdateStreamService disables the RPC service for UpdateStream
func DisableUpdateStreamService() {
defer logError()
UpdateStreamRpcService.disable()
}
// ServeUpdateStream sill serve one UpdateStream
func ServeUpdateStream(req *proto.UpdateStreamRequest, sendReply func(reply *proto.StreamEvent) error) error {
return UpdateStreamRpcService.ServeUpdateStream(req, sendReply)
}
// IsUpdateStreamEnabled returns true if the RPC service is enabled
func IsUpdateStreamEnabled() bool {
return UpdateStreamRpcService.isEnabled()
}
// GetReplicationPosition returns the current replication position of
// the service
func GetReplicationPosition() (myproto.ReplicationPosition, error) {
return UpdateStreamRpcService.getReplicationPosition()
}
@ -178,13 +189,8 @@ func (updateStream *UpdateStream) isEnabled() bool {
return updateStream.state.Get() == ENABLED
}
// ServeUpdateStream is part of the proto.UpdateStream interface
func (updateStream *UpdateStream) ServeUpdateStream(req *proto.UpdateStreamRequest, sendReply func(reply *proto.StreamEvent) error) (err error) {
defer func() {
if x := recover(); x != nil {
err = x.(error)
}
}()
updateStream.actionLock.Lock()
if !updateStream.isEnabled() {
updateStream.actionLock.Unlock()
@ -215,13 +221,8 @@ func (updateStream *UpdateStream) ServeUpdateStream(req *proto.UpdateStreamReque
return svm.Join()
}
// StreamKeyRange is part of the proto.UpdateStream interface
func (updateStream *UpdateStream) StreamKeyRange(req *proto.KeyRangeRequest, sendReply func(reply *proto.BinlogTransaction) error) (err error) {
defer func() {
if x := recover(); x != nil {
err = x.(error)
}
}()
updateStream.actionLock.Lock()
if !updateStream.isEnabled() {
updateStream.actionLock.Unlock()
@ -251,13 +252,8 @@ func (updateStream *UpdateStream) StreamKeyRange(req *proto.KeyRangeRequest, sen
return svm.Join()
}
// StreamTables is part of the proto.UpdateStream interface
func (updateStream *UpdateStream) StreamTables(req *proto.TablesRequest, sendReply func(reply *proto.BinlogTransaction) error) (err error) {
defer func() {
if x := recover(); x != nil {
err = x.(error)
}
}()
updateStream.actionLock.Lock()
if !updateStream.isEnabled() {
updateStream.actionLock.Unlock()
@ -287,6 +283,14 @@ func (updateStream *UpdateStream) StreamTables(req *proto.TablesRequest, sendRep
return svm.Join()
}
// HandlePanic is part of the proto.UpdateStream interface
func (updateStream *UpdateStream) HandlePanic(err *error) {
if x := recover(); x != nil {
log.Errorf("Uncaught panic:\n%v\n%s", x, tb.Stack(4))
*err = fmt.Errorf("uncaught panic: %v", x)
}
}
func (updateStream *UpdateStream) getReplicationPosition() (myproto.ReplicationPosition, error) {
updateStream.actionLock.Lock()
defer updateStream.actionLock.Unlock()