Big leap: update_stream uses binlog_streamer

The update stream API also changes. It accepts a group id
based BinlogPosition as input, and streams events one at a
time. The position is also a separate event at the end of
each transaction.
This commit is contained in:
Sugu Sougoumarane 2013-11-18 00:41:05 -08:00
Родитель 402b9dad0b
Коммит 78c880dcad
14 изменённых файлов: 289 добавлений и 403 удалений

Просмотреть файл

@ -55,9 +55,6 @@ func main() {
ts.InitQueryService()
mysqlctl.RegisterUpdateStreamService(mycnf)
// Depends on both query and updateStream.
ts.RegisterCacheInvalidator()
// Depends on both query and updateStream.
if err := vttablet.InitAgent(tabletAlias, dbcfgs, mycnf, *dbCredentialsFile, *port, *securePort, *mycnfFile, *overridesFile); err != nil {
log.Fatal(err)

Просмотреть файл

@ -88,6 +88,10 @@ type BinlogPosition struct {
GroupId, ServerId int64
}
func (blp *BinlogPosition) String() string {
return fmt.Sprintf("%d:%d", blp.GroupId, blp.ServerId)
}
// BinlogStreamer streamer streams binlog events grouped
// by transactions.
type BinlogStreamer struct {
@ -201,6 +205,9 @@ func (bls *BinlogStreamer) parseEvents(sendTransaction sendTransactionFunc, read
Position: bls.blPos,
}
if err = sendTransaction(trans); err != nil {
if err == io.EOF {
return err
}
return fmt.Errorf("send reply error: %v", err)
}
statements = nil
@ -237,7 +244,10 @@ eventLoop:
}
values = rotateRE.FindSubmatch(event)
if values != nil {
bls.file.Rotate(path.Join(bls.dir, string(values[1])), mustParseInt64(values[2]))
err = bls.file.Rotate(path.Join(bls.dir, string(values[1])), mustParseInt64(values[2]))
if err != nil {
return nil, err
}
continue
}
values = delimRE.FindSubmatch(event)
@ -291,19 +301,40 @@ type fileInfo struct {
}
func (f *fileInfo) Init(name string, pos int64) error {
return f.Rotate(name, pos)
err := f.Rotate(name, pos)
if err != nil {
return err
}
// Make sure the current file hasn't rotated.
next := nextFileName(name)
fi, _ := os.Stat(next)
if fi == nil {
// Assume next file doesn't exist
return nil
}
// Next file exists. Check if current file size matches position
fi, err = f.handle.Stat()
if err != nil {
return err
}
if fi.Size() <= pos {
// The file has rotated
return f.Rotate(next, 4)
}
return nil
}
func (f *fileInfo) Rotate(name string, pos int64) (err error) {
if f.handle != nil {
f.handle.Close()
}
f.name = name
f.pos = pos
f.handle, err = os.Open(name)
if err != nil {
return fmt.Errorf("open error: %v", err)
}
f.name = name
f.pos = pos
return nil
}
@ -336,6 +367,17 @@ func (f *fileInfo) Close() (err error) {
return fmt.Errorf("close error: %v", err)
}
func nextFileName(name string) string {
newname := []byte(name)
index := len(newname) - 1
for newname[index] == '9' && index > 0 {
newname[index] = '0'
index--
}
newname[index] += 1
return string(newname)
}
// mustParseInt64 can be used if you don't expect to fail.
func mustParseInt64(b []byte) int64 {
val, err := strconv.ParseInt(string(b), 10, 64)

Просмотреть файл

@ -128,6 +128,29 @@ func TestFileInfo(t *testing.T) {
}
}
func TestNewName(t *testing.T) {
want := "0002"
got := nextFileName("0001")
if want != got {
t.Errorf("want %s, got %s", want, got)
}
want = "0010"
got = nextFileName("0009")
if want != got {
t.Errorf("want %s, got %s", want, got)
}
want = "0100"
got = nextFileName("0099")
if want != got {
t.Errorf("want %s, got %s", want, got)
}
want = ":000"
got = nextFileName("9999")
if want != got {
t.Errorf("want %s, got %s", want, got)
}
}
type fakeReader struct {
toSend []byte
err error
@ -317,3 +340,18 @@ func TestStream(t *testing.T) {
t.Error(err)
}
}
// TestRoration should not hang
func TestRotation(t *testing.T) {
env := setup("cat $3", 0)
defer cleanup(env)
bls := NewBinlogStreamer("db", "test/vt-0000041983-bin")
err := bls.Stream("vt-0000041983-bin.000004", 2682, func(tx *BinlogTransaction) error {
bls.Stop()
return nil
})
if err != nil {
t.Error(err)
}
}

Просмотреть файл

@ -11,7 +11,6 @@ import (
"time"
"encoding/base64"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/vt/sqlparser"
)
@ -22,7 +21,7 @@ var (
)
type StreamEvent struct {
// Category can be "DML", "DDL" or "POS"
// Category can be "DML", "DDL", "ERR" or "POS"
Category string
// DML
@ -30,10 +29,10 @@ type StreamEvent struct {
PkColNames []string
PkValues [][]interface{}
// DDL
// DDL or ERR
Sql string
// Timestamp is set for DML and DDL
// Timestamp is set for DML, DDL or ERR
Timestamp int64
// POS
@ -117,8 +116,8 @@ func (evs *EventStreamer) transactionToEvent(trans *BinlogTransaction) error {
func (evs *EventStreamer) buildDMLEvent(sql []byte, insertid int64) (dmlEvent *StreamEvent, newinsertid int64, err error) {
commentIndex := bytes.LastIndex(sql, STREAM_COMMENT_START)
if commentIndex == -1 {
log.Errorf("DML has no stream comment: %s", sql)
evs.DmlErrors++
return &StreamEvent{Category: "ERR", Sql: string(sql)}, insertid, nil
}
streamComment := string(sql[commentIndex+len(STREAM_COMMENT_START):])
eventTree, err := parseStreamComment(streamComment)

Просмотреть файл

@ -97,6 +97,9 @@ func TestDMLEvent(t *testing.T) {
}, {
Category: BL_DML,
Sql: []byte("query /* _stream vtocc_e (eid id namevent) (null -1 'bmFtZQ==' ) (null 18446744073709551615 'bmFtZQ==' ); */"),
}, {
Category: BL_DML,
Sql: []byte("query"),
},
},
Position: BinlogPosition{
@ -113,6 +116,12 @@ func TestDMLEvent(t *testing.T) {
if want != got {
t.Errorf("want %s, got %s", want, got)
}
case "ERR":
want := `&{ERR [] [] query 1 0 0}`
got := fmt.Sprintf("%v", event)
if want != got {
t.Errorf("want %s, got %s", want, got)
}
case "POS":
want := `&{POS [] [] 0 20 30}`
got := fmt.Sprintf("%v", event)
@ -129,7 +138,7 @@ func TestDMLEvent(t *testing.T) {
if err != nil {
t.Error(err)
}
if evs.DmlCount != 1 {
if evs.DmlCount != 2 {
t.Errorf("want 1, got %d", evs.DmlCount)
}
if evs.TransactionCount != 1 {

Просмотреть файл

@ -6,9 +6,11 @@ package mysqlctl
import (
"fmt"
"sync"
"time"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/rpcwrap"
"github.com/youtube/vitess/go/stats"
"github.com/youtube/vitess/go/sync2"
"github.com/youtube/vitess/go/vt/dbconfigs"
"github.com/youtube/vitess/go/vt/mysqlctl/proto"
@ -26,9 +28,42 @@ type UpdateStream struct {
actionLock sync.Mutex
state sync2.AtomicInt64
states *stats.States
mysqld *Mysqld
stateWaitGroup sync.WaitGroup
dbname string
streams streamList
}
type streamList struct {
sync.Mutex
streams map[*EventStreamer]bool
}
func (sl *streamList) Init() {
sl.Lock()
sl.streams = make(map[*EventStreamer]bool)
sl.Unlock()
}
func (sl *streamList) Add(e *EventStreamer) {
sl.Lock()
sl.streams[e] = true
sl.Unlock()
}
func (sl *streamList) Delete(e *EventStreamer) {
sl.Lock()
delete(sl.streams, e)
sl.Unlock()
}
func (sl *streamList) Stop() {
sl.Lock()
for stream := range sl.streams {
stream.Stop()
}
sl.Unlock()
}
var UpdateStreamRpcService *UpdateStream
@ -39,6 +74,10 @@ func RegisterUpdateStreamService(mycnf *Mycnf) {
}
UpdateStreamRpcService = &UpdateStream{mycnf: mycnf}
UpdateStreamRpcService.states = stats.NewStates("UpdateStreamState", []string{
"Disabled",
"Enabled",
}, time.Now(), DISABLED)
rpcwrap.RegisterAuthenticated(UpdateStreamRpcService)
}
@ -58,7 +97,7 @@ func DisableUpdateStreamService() {
UpdateStreamRpcService.disable()
}
func ServeUpdateStream(req *BinlogPosition, sendReply sendEventFunc) error {
func ServeUpdateStream(req *BinlogPosition, sendReply func(reply interface{}) error) error {
return UpdateStreamRpcService.ServeUpdateStream(req, sendReply)
}
@ -93,8 +132,10 @@ func (updateStream *UpdateStream) enable(dbcfgs dbconfigs.DBConfigs) {
}
updateStream.state.Set(ENABLED)
updateStream.states.SetState(ENABLED)
updateStream.mysqld = NewMysqld(updateStream.mycnf, dbcfgs.Dba, dbcfgs.Repl)
updateStream.dbname = dbcfgs.App.DbName
updateStream.streams.Init()
log.Infof("Enabling update stream, dbname: %s, binlogpath: %s", updateStream.dbname, updateStream.mycnf.BinLogPath)
}
@ -106,6 +147,8 @@ func (updateStream *UpdateStream) disable() {
}
updateStream.state.Set(DISABLED)
updateStream.states.SetState(DISABLED)
updateStream.streams.Stop()
updateStream.stateWaitGroup.Wait()
log.Infof("Update Stream Disabled")
}
@ -114,7 +157,7 @@ func (updateStream *UpdateStream) isEnabled() bool {
return updateStream.state.Get() == ENABLED
}
func (updateStream *UpdateStream) ServeUpdateStream(req *BinlogPosition, sendReply sendEventFunc) (err error) {
func (updateStream *UpdateStream) ServeUpdateStream(req *BinlogPosition, sendReply func(reply interface{}) error) (err error) {
defer func() {
if x := recover(); x != nil {
err = x.(error)
@ -138,11 +181,10 @@ func (updateStream *UpdateStream) ServeUpdateStream(req *BinlogPosition, sendRep
log.Infof("ServeUpdateStream starting @ %v", rp)
evs := NewEventStreamer(updateStream.dbname, updateStream.mycnf.BinLogPath)
updateStream.streams.Add(evs)
defer updateStream.streams.Delete(evs)
return evs.Stream(rp.MasterLogFile, int64(rp.MasterLogPosition), func(reply *StreamEvent) error {
if !updateStream.isEnabled() {
evs.Stop()
return nil
}
return sendReply(reply)
})
}

Просмотреть файл

@ -57,11 +57,7 @@ type TransactionInfo struct {
type DmlType struct {
Table string
Keys []interface{}
}
type CacheInvalidate struct {
Dmls []DmlType
Keys []string
}
type DDLInvalidate struct {

Просмотреть файл

@ -336,31 +336,29 @@ func (qe *QueryEngine) StreamExecute(logStats *sqlQueryStats, query *proto.Query
qe.fullStreamFetch(logStats, conn, fullQuery, query.BindVariables, nil, nil, sendReply)
}
func (qe *QueryEngine) InvalidateForDml(cacheInvalidate *proto.CacheInvalidate) {
func (qe *QueryEngine) InvalidateForDml(dml *proto.DmlType) {
if qe.cachePool.IsClosed() {
return
}
qe.mu.RLock()
defer qe.mu.RUnlock()
for _, dml := range cacheInvalidate.Dmls {
invalidations := int64(0)
tableInfo := qe.schemaInfo.GetTable(dml.Table)
if tableInfo == nil {
panic(NewTabletError(FAIL, "Table %s not found", dml.Table))
}
if tableInfo.CacheType == schema.CACHE_NONE {
break
}
for _, val := range dml.Keys {
newKey := validateKey(tableInfo, val.(string))
if newKey != "" {
tableInfo.Cache.Delete(newKey)
}
invalidations++
}
tableInfo.invalidations.Add(invalidations)
invalidations := int64(0)
tableInfo := qe.schemaInfo.GetTable(dml.Table)
if tableInfo == nil {
panic(NewTabletError(FAIL, "Table %s not found", dml.Table))
}
if tableInfo.CacheType == schema.CACHE_NONE {
return
}
for _, val := range dml.Keys {
newKey := validateKey(tableInfo, val)
if newKey != "" {
tableInfo.Cache.Delete(newKey)
}
invalidations++
}
tableInfo.invalidations.Add(invalidations)
}
func (qe *QueryEngine) InvalidateForDDL(ddlInvalidate *proto.DDLInvalidate) {

Просмотреть файл

@ -169,8 +169,8 @@ func IsCachePoolAvailable() bool {
return !SqlQueryRpcService.qe.cachePool.IsClosed()
}
func InvalidateForDml(cacheInvalidate *proto.CacheInvalidate) {
SqlQueryRpcService.invalidateForDml(cacheInvalidate)
func InvalidateForDml(dml *proto.DmlType) {
SqlQueryRpcService.invalidateForDml(dml)
}
func InvalidateForDDL(ddlInvalidate *proto.DDLInvalidate) {

Просмотреть файл

@ -5,290 +5,137 @@
package tabletserver
import (
"encoding/gob"
"fmt"
"io"
"strconv"
"sync"
"time"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/bson"
"github.com/youtube/vitess/go/sqltypes"
estats "github.com/youtube/vitess/go/stats" // stats is a private type defined somewhere else in this package, so it would conflict
"github.com/youtube/vitess/go/stats"
"github.com/youtube/vitess/go/sync2"
"github.com/youtube/vitess/go/vt/mysqlctl"
cproto "github.com/youtube/vitess/go/vt/mysqlctl/proto"
"github.com/youtube/vitess/go/vt/tabletserver/proto"
)
const (
DISABLED = iota
ENABLED
RCINV_DISABLED = iota
RCINV_ENABLED
RCINV_SHUTTING_DOWN
)
// Error types for rowcache invalidator.
const (
// Fatal Errors
FATAL_ERROR = "InvalidatorFatal"
// Skippable errors, recorded and skipped.
INVALID_EVENT = "InvalidatorEvent"
)
type InvalidationError struct {
errPos string
errType string
msg string
}
func NewInvalidationError(errType, msg, pos string) *InvalidationError {
invErr := &InvalidationError{errType: errType, msg: msg, errPos: pos}
return invErr
}
func (err *InvalidationError) Error() string {
return fmt.Sprintf("%v: '%v' @ '%v'", err.errType, err.msg, err.errPos)
}
func (err *InvalidationError) isFatal() bool {
return (err.errType != INVALID_EVENT)
}
type InvalidationProcessor struct {
currentPosition *cproto.BinlogPosition
currentPosition mysqlctl.BinlogPosition
state sync2.AtomicInt64
states *estats.States
stateLock sync.Mutex
inTxn bool
dmlBuffer []*proto.DmlType
receiveEvent func(reply *mysqlctl.StreamEvent) error
encBuf []byte
states *stats.States
}
var CacheInvalidationProcessor *InvalidationProcessor
func NewInvalidationProcessor() *InvalidationProcessor {
invalidator := &InvalidationProcessor{}
invalidator.dmlBuffer = make([]*proto.DmlType, 10)
invalidator.receiveEvent = func(response *mysqlctl.StreamEvent) error {
return invalidator.invalidateEvent(response)
}
gob.Register(cproto.BinlogPosition{})
invalidator.encBuf = make([]byte, 0, 100)
return invalidator
}
func RegisterCacheInvalidator() {
if CacheInvalidationProcessor != nil {
return
}
CacheInvalidationProcessor = NewInvalidationProcessor()
CacheInvalidationProcessor.states = estats.NewStates("RowcacheInvalidationState", []string{
func init() {
CacheInvalidationProcessor = new(InvalidationProcessor)
CacheInvalidationProcessor.states = stats.NewStates("RowcacheInvalidationState", []string{
"Disabled",
"Enabled",
}, time.Now(), DISABLED)
estats.Publish("RowcacheInvalidationCheckPoint", estats.StringFunc(func() string {
if pos := CacheInvalidationProcessor.currentPosition; pos != nil {
return pos.String()
"ShuttingDown",
}, time.Now(), RCINV_DISABLED)
stats.Publish("RowcacheInvalidationCheckPoint", stats.StringFunc(func() string {
if CacheInvalidationProcessor.currentPosition.GroupId != 0 {
return CacheInvalidationProcessor.currentPosition.String()
}
return ""
}))
}
func StartRowCacheInvalidation() {
if !shouldInvalidatorRun() {
log.Infof("Row-cache invalidator not being enabled, criteria not met")
CacheInvalidationProcessor.stopRowCacheInvalidation()
return
}
if CacheInvalidationProcessor.isServiceEnabled() {
log.Infof("Row-cache invalidator service is already enabled")
return
}
CacheInvalidationProcessor.stateLock.Lock()
if shouldInvalidatorRun() {
CacheInvalidationProcessor.setState(ENABLED)
CacheInvalidationProcessor.stateLock.Unlock()
} else {
CacheInvalidationProcessor.setState(DISABLED)
CacheInvalidationProcessor.stateLock.Unlock()
return
}
log.Infof("Starting RowCacheInvalidation Service")
CacheInvalidationProcessor.runInvalidationLoop()
go CacheInvalidationProcessor.runInvalidationLoop()
}
func StopRowCacheInvalidation() {
if !CacheInvalidationProcessor.isServiceEnabled() {
log.Infof("Invalidator is already disabled - NOP")
return
}
CacheInvalidationProcessor.stopRowCacheInvalidation()
log.Infof("Rowcache invalidator stopped")
}
func ShouldInvalidatorRun() bool {
return shouldInvalidatorRun()
}
func shouldInvalidatorRun() bool {
return IsCachePoolAvailable() && mysqlctl.IsUpdateStreamEnabled()
}
func (rowCache *InvalidationProcessor) stopRowCacheInvalidation() {
rowCache.stateLock.Lock()
rowCache.setState(DISABLED)
rowCache.stateLock.Unlock()
}
func (rowCache *InvalidationProcessor) setState(state int64) {
rowCache.state.Set(state)
rowCache.states.SetState(state)
}
func (rowCache *InvalidationProcessor) isServiceEnabled() bool {
return rowCache.state.Get() == ENABLED
}
func (rowCache *InvalidationProcessor) updateErrCounters(err *InvalidationError) {
log.Errorf(err.Error())
if errorStats == nil {
log.Warningf("errorStats is not initialized")
return
}
errorStats.Add(err.errType, 1)
}
func (rowCache *InvalidationProcessor) invalidateEvent(response interface{}) error {
if !shouldInvalidatorRun() || !rowCache.isServiceEnabled() {
return NewInvalidationError(FATAL_ERROR, "Rowcache invalidator is not available", "")
}
updateResponse, ok := response.(*mysqlctl.UpdateResponse)
if !ok {
return NewInvalidationError(FATAL_ERROR, "Invalid Reponse type", "")
}
rowCache.currentPosition = &updateResponse.Coord
return rowCache.processEvent(updateResponse)
}
func (rowCache *InvalidationProcessor) stopCache(reason string) {
log.Warningf("Stopping rowcache invalidation, reason: '%v'", reason)
rowCache.stopRowCacheInvalidation()
if IsCachePoolAvailable() {
log.Warningf("Disallowing Query Service as row-cache invalidator cannot run")
DisallowQueries()
if !rowCache.state.CompareAndSwap(RCINV_ENABLED, RCINV_SHUTTING_DOWN) {
log.Infof("Rowcache invalidator is not enabled")
}
rowCache.states.SetState(RCINV_SHUTTING_DOWN)
}
func (rowCache *InvalidationProcessor) runInvalidationLoop() {
var err error
if !IsCachePoolAvailable() {
log.Infof("Rowcache is not enabled. Not running invalidator.")
return
}
if !rowCache.state.CompareAndSwap(RCINV_DISABLED, RCINV_ENABLED) {
log.Infof("Rowcache invalidator already running")
return
}
rowCache.states.SetState(RCINV_ENABLED)
defer func() {
rowCache.state.Set(RCINV_DISABLED)
rowCache.states.SetState(RCINV_DISABLED)
DisallowQueries()
}()
replPos, err := mysqlctl.GetReplicationPosition()
if err != nil {
rErr := NewInvalidationError(FATAL_ERROR, fmt.Sprintf("Cannot determine replication position %v", err), "")
rowCache.updateErrCounters(rErr)
rowCache.stopCache(rErr.Error())
log.Errorf("Rowcache invalidator could not start: cannot determine replication position: %v", err)
return
}
// TODO(sougou): change GroupId to be int64
groupid, err := strconv.Atoi(replPos.GroupId)
if err != nil {
rErr := NewInvalidationError(FATAL_ERROR, fmt.Sprintf("Cannot determine replication position %v", err), "")
rowCache.updateErrCounters(rErr)
rowCache.stopCache(rErr.Error())
log.Errorf("Rowcache invalidator could not start: could not read group id: %v", err)
return
}
log.Infof("Starting rowcache invalidator")
req := &mysqlctl.BinlogPosition{GroupId: int64(groupid)}
err = mysqlctl.ServeUpdateStream(req, rowCache.receiveEvent)
err = mysqlctl.ServeUpdateStream(req, func(reply interface{}) error {
return rowCache.processEvent(reply.(*mysqlctl.StreamEvent))
})
if err != nil {
log.Errorf("mysqlctl.ServeUpdateStream returned err '%v'", err.Error())
if rErr, ok := err.(*InvalidationError); ok {
rowCache.updateErrCounters(rErr)
}
rowCache.stopCache(fmt.Sprintf("Unexpected or fatal error, '%v'", err.Error()))
}
log.Infof("Rowcache invalidator stopped")
}
func (rowCache *InvalidationProcessor) processEvent(event *mysqlctl.UpdateResponse) error {
if !event.Coord.Valid() {
rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, "no error, position is not set", ""))
return nil
func (rowCache *InvalidationProcessor) processEvent(event *mysqlctl.StreamEvent) error {
if rowCache.state.Get() != RCINV_ENABLED {
return io.EOF
}
position := event.Coord.String()
var err error
switch event.Data.SqlType {
case mysqlctl.DDL:
err = rowCache.handleDdlEvent(event)
if err != nil {
return err
}
case mysqlctl.BEGIN:
rowCache.dmlBuffer = rowCache.dmlBuffer[:0]
if rowCache.inTxn {
rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, "Invalid 'BEGIN' event, transaction already in progress", position))
return nil
}
rowCache.inTxn = true
case mysqlctl.COMMIT:
if !rowCache.inTxn {
rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, "Invalid 'COMMIT' event for a non-transaction", position))
return nil
}
err = rowCache.handleTxn(event)
if err != nil {
return err
}
rowCache.inTxn = false
rowCache.dmlBuffer = rowCache.dmlBuffer[:0]
case "insert", "update", "delete":
dml, err := rowCache.buildDmlData(event)
if err != nil {
return err
}
if dml != nil {
rowCache.dmlBuffer = append(rowCache.dmlBuffer, dml)
}
switch event.Category {
case "DDL":
InvalidateForDDL(&proto.DDLInvalidate{DDL: event.Sql})
case "DML":
rowCache.handleDmlEvent(event)
case "ERR":
log.Errorf("Unrecognized: %s", event.Sql)
errorStats.Add("Invalidation", 1)
case "POS":
rowCache.currentPosition.GroupId = event.GroupId
rowCache.currentPosition.ServerId = event.ServerId
default:
rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, fmt.Sprintf("Unknown SqlType, %v %v", event.Data.SqlType, event.Data.Sql), position))
//return NewInvalidationError(INVALID_EVENT, fmt.Sprintf("Unknown SqlType, %v %v", event.Data.SqlType, event.Data.Sql))
panic(fmt.Errorf("unknown event: %#v", event))
}
return nil
}
func isDmlEvent(sqlType string) bool {
switch sqlType {
case "insert", "update", "delete":
return true
}
return false
}
func (rowCache *InvalidationProcessor) buildDmlData(event *mysqlctl.UpdateResponse) (*proto.DmlType, error) {
if !isDmlEvent(event.Data.SqlType) {
rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, fmt.Sprintf("Bad Dml type, '%v'", event.Data.SqlType), event.Coord.String()))
return nil, nil
}
func (rowCache *InvalidationProcessor) handleDmlEvent(event *mysqlctl.StreamEvent) {
dml := new(proto.DmlType)
dml.Table = event.Data.TableName
dml.Keys = make([]interface{}, 0, len(event.Data.PkValues))
sqlTypeKeys := make([]sqltypes.Value, 0, len(event.Data.PkColNames))
for _, pkTuple := range event.Data.PkValues {
dml.Table = event.TableName
dml.Keys = make([]string, 0, len(event.PkValues))
sqlTypeKeys := make([]sqltypes.Value, 0, len(event.PkColNames))
for _, pkTuple := range event.PkValues {
sqlTypeKeys = sqlTypeKeys[:0]
if len(pkTuple) == 0 {
continue
}
for _, pkVal := range pkTuple {
key, err := sqltypes.BuildValue(pkVal)
if err != nil {
rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, fmt.Sprintf("Error building invalidation key '%v'", err), event.Coord.String()))
return nil, nil
log.Errorf("Error building invalidation key for %#v: '%v'", event, err)
errorStats.Add("Invalidation", 1)
return
}
sqlTypeKeys = append(sqlTypeKeys, key)
}
@ -297,62 +144,5 @@ func (rowCache *InvalidationProcessor) buildDmlData(event *mysqlctl.UpdateRespon
dml.Keys = append(dml.Keys, invalidateKey)
}
}
return dml, nil
}
func (rowCache *InvalidationProcessor) handleTxn(commitEvent *mysqlctl.UpdateResponse) error {
var err error
defer func() {
if x := recover(); x != nil {
if terr, ok := x.(*TabletError); ok {
rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, terr.Error(), commitEvent.Coord.String()))
} else {
err = NewInvalidationError(FATAL_ERROR, "handleTxn failed", commitEvent.Coord.String())
}
}
}()
if len(rowCache.dmlBuffer) == 0 {
return nil
}
rowCache.encBuf = rowCache.encBuf[:0]
cacheInvalidate := new(proto.CacheInvalidate)
rowCache.encBuf, err = bson.Marshal(&commitEvent.Coord)
if err != nil {
return NewInvalidationError(FATAL_ERROR, fmt.Sprintf("Error in encoding position, %v", err), commitEvent.Coord.String())
}
cacheInvalidate.Dmls = make([]proto.DmlType, 0, len(rowCache.dmlBuffer))
for _, dml := range rowCache.dmlBuffer {
cacheInvalidate.Dmls = append(cacheInvalidate.Dmls, *dml)
}
InvalidateForDml(cacheInvalidate)
return nil
}
func (rowCache *InvalidationProcessor) handleDdlEvent(ddlEvent *mysqlctl.UpdateResponse) error {
var err error
defer func() {
if x := recover(); x != nil {
if terr, ok := x.(*TabletError); ok {
rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, terr.Error(), ddlEvent.Coord.String()))
} else {
err = NewInvalidationError(FATAL_ERROR, "ddlEvent failed", ddlEvent.Coord.String())
}
}
}()
if ddlEvent.Data.Sql == "" {
rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, "Empty ddl sql", ddlEvent.Coord.String()))
return nil
//return NewInvalidationError(INVALID_EVENT, "Empty ddl sql", ddlEvent.Coord.String())
}
rowCache.encBuf = rowCache.encBuf[:0]
ddlInvalidate := new(proto.DDLInvalidate)
rowCache.encBuf, err = bson.Marshal(&ddlEvent.Coord)
if err != nil {
return NewInvalidationError(FATAL_ERROR, fmt.Sprintf("Error in encoding position, %v", err), ddlEvent.Coord.String())
}
ddlInvalidate.DDL = ddlEvent.Data.Sql
InvalidateForDDL(ddlInvalidate)
return nil
InvalidateForDml(dml)
}

Просмотреть файл

@ -257,11 +257,11 @@ func (sq *SqlQuery) CloseReserved(session *proto.Session, noOutput *string) (err
return nil
}
func (sq *SqlQuery) invalidateForDml(cacheInvalidate *proto.CacheInvalidate) {
func (sq *SqlQuery) invalidateForDml(dml *proto.DmlType) {
if sq.state.Get() != SERVING {
return
}
sq.qe.InvalidateForDml(cacheInvalidate)
sq.qe.InvalidateForDml(dml)
}
func (sq *SqlQuery) invalidateForDDL(ddlInvalidate *proto.DDLInvalidate) {

Просмотреть файл

@ -7,30 +7,24 @@ from net import gorpc
from net import bsonrpc
from vtdb import dbexceptions
class ReplicationCoordinates(object):
MasterFilename = None
MasterPosition = None
GroupId = None
def __init__(self, master_filename, master_position, group_id):
self.MasterFilename = master_filename
self.MasterPosition = master_position
self.GroupId = group_id
class Coord(object):
Position = None
Timestamp = None
Xid = None
GroupId = None
ServerId = None
def __init__(self, master_filename, master_position, group_id=None):
self.Position = ReplicationCoordinates(master_filename, master_position, group_id).__dict__
def __init__(self, group_id, server_id = None):
self.GroupId = group_id
self.ServerId = server_id
class EventData(object):
SqlType = None
Cateory = None
TableName = None
PkColNames = None
PkValues = None
Sql = None
PkRows = None
Timestamp = None
GroupId = None
ServerId = None
def __init__(self, raw_response):
for key, val in raw_response.iteritems():
@ -47,19 +41,6 @@ class EventData(object):
pk_row = [(col_name, col_value) for col_name, col_value in izip(raw_response['PkColNames'], pkList)]
self.PkRows.append(pk_row)
class UpdateStreamResponse(object):
Coord = None
Data = None
def __init__(self, response_dict):
self.raw_response = response_dict
self.format()
def format(self):
self.Coord = self.raw_response['Coord']
self.Data = EventData(self.raw_response['Data']).__dict__
class UpdateStreamConnection(object):
def __init__(self, addr, timeout, user=None, password=None, encrypted=False, keyfile=None, certfile=None):
self.client = bsonrpc.BsonRpcClient(addr, timeout, user, password, encrypted, keyfile, certfile)
@ -71,26 +52,24 @@ class UpdateStreamConnection(object):
self.client.close()
def stream_start(self, start_position):
req = {'StartPosition':start_position}
try:
self.client.stream_call('UpdateStream.ServeUpdateStream', req)
first_response = self.client.stream_next()
update_stream_response = UpdateStreamResponse(first_response.reply)
self.client.stream_call('UpdateStream.ServeUpdateStream', start_position)
response = self.client.stream_next()
if response is None:
return None
return EventData(response.reply).__dict__
except gorpc.GoRpcError as e:
raise dbexceptions.OperationalError(*e.args)
except:
logging.exception('gorpc low-level error')
raise
return update_stream_response.Coord, update_stream_response.Data
def stream_next(self):
try:
response = self.client.stream_next()
if response is None:
return None, None
update_stream_response = UpdateStreamResponse(response.reply)
return None
return EventData(response.reply).__dict__
except gorpc.AppError as e:
raise dbexceptions.DatabaseError(*e.args)
except gorpc.GoRpcError as e:
@ -98,4 +77,3 @@ class UpdateStreamConnection(object):
except:
logging.exception('gorpc low-level error')
raise
return update_stream_response.Coord, update_stream_response.Data

Просмотреть файл

@ -170,18 +170,18 @@ class RowCacheInvalidator(unittest.TestCase):
master_position = utils.mysql_query(62344, 'vt_test_keyspace', 'show master status')
#The sleep is needed here, so the invalidator can catch up and the number can be tested.
replica_tablet.mquery('vt_test_keyspace', "select MASTER_POS_WAIT('%s', %d)" % (master_position[0][0], master_position[0][1]), 5)
time.sleep(5)
time.sleep(2)
inv_count1 = framework.MultiDict(json.load(urllib2.urlopen("http://%s/debug/table_stats" % replica_host)))['Totals']['Invalidations']
replica_tablet.mquery('vt_test_keyspace', "stop slave")
perform_insert(100)
# EOF is returned after 30s, sleeping a bit more to ensure we catch the EOF
# and can test replication stop effectively.
time.sleep(35)
time.sleep(2)
replica_tablet.mquery('vt_test_keyspace', "start slave")
master_position = utils.mysql_query(62344, 'vt_test_keyspace', 'show master status')
#The sleep is needed here, so the invalidator can catch up and the number can be tested.
replica_tablet.mquery('vt_test_keyspace', "select MASTER_POS_WAIT('%s', %d)" % (master_position[0][0], master_position[0][1]), 5)
time.sleep(10)
time.sleep(2)
invalidatorStats = framework.MultiDict(json.load(urllib2.urlopen("http://%s/debug/vars" % replica_host)))
logging.debug("invalidatorStats %s" % invalidatorStats['RowcacheInvalidationCheckPoint'])
inv_count2 = framework.MultiDict(json.load(urllib2.urlopen("http://%s/debug/table_stats" % replica_host)))['Totals']['Invalidations']

Просмотреть файл

@ -32,7 +32,7 @@ master_start_position = None
def _get_master_current_position():
res = utils.mysql_query(62344, 'vt_test_keyspace', 'show master status')
start_position = update_stream_service.Coord(res[0][0], res[0][1])
start_position = update_stream_service.Coord(int(res[0][4]))
return start_position.__dict__
@ -44,9 +44,7 @@ def _get_repl_current_position():
cursor.execute('show master status')
res = cursor.fetchall()
slave_dict = res[0]
master_log = slave_dict['File']
master_pos = slave_dict['Position']
start_position = update_stream_service.Coord(master_log, master_pos)
start_position = update_stream_service.Coord(slave_dict['Group_ID'])
return start_position.__dict__
@ -140,10 +138,10 @@ class TestUpdateStream(unittest.TestCase):
logging.debug("dialing replica update stream service")
replica_conn.dial()
try:
binlog_pos, data = replica_conn.stream_start(start_position)
data = replica_conn.stream_start(start_position)
except Exception, e:
logging.debug(str(e))
if str(e) == "Update stream service is not enabled yet":
if str(e) == "update stream service is not enabled":
logging.debug("Test Service Disabled: Pass")
else:
self.fail("Test Service Disabled: Fail - did not throw the correct exception")
@ -171,10 +169,10 @@ class TestUpdateStream(unittest.TestCase):
replica_conn.dial()
try:
binlog_pos, data = replica_conn.stream_start(start_position)
data = replica_conn.stream_start(start_position)
for i in xrange(10):
binlog_pos, data = replica_conn.stream_next()
if data['SqlType'] == 'COMMIT' and utils.options.verbose == 2:
data = replica_conn.stream_next()
if data['Category'] == 'DML' and utils.options.verbose == 2:
logging.debug("Test Service Enabled: Pass")
break
except Exception, e:
@ -191,15 +189,14 @@ class TestUpdateStream(unittest.TestCase):
disabled_err = False
txn_count = 0
try:
binlog_pos, data = replica_conn.stream_start(start_position)
data = replica_conn.stream_start(start_position)
utils.run_vtctl(['ChangeSlaveType', replica_tablet.tablet_alias, 'spare'])
#logging.debug("Sleeping a bit for the spare action to complete")
#time.sleep(20)
while binlog_pos:
binlog_pos, data = replica_conn.stream_next()
if data is not None and data['SqlType'] == 'COMMIT':
while data:
data = replica_conn.stream_next()
if data is not None and data['Category'] == 'POS':
txn_count +=1
logging.error("Test Service Switch: FAIL")
return
except dbexceptions.DatabaseError, e:
@ -240,31 +237,31 @@ class TestUpdateStream(unittest.TestCase):
self._exec_vt_txn(master_host, ['delete from vt_b',])
master_conn = self._get_master_stream_conn()
master_conn.dial()
master_tuples = []
binlog_pos, data = master_conn.stream_start(master_start_position)
master_tuples.append((binlog_pos, data))
master_events = []
data = master_conn.stream_start(master_start_position)
master_events.append(data)
for i in xrange(21):
binlog_pos, data = master_conn.stream_next()
master_tuples.append((binlog_pos, data))
if data['SqlType'] == 'COMMIT':
data = master_conn.stream_next()
master_events.append(data)
if data['Category'] == 'POS':
master_txn_count +=1
break
replica_tuples = []
replica_events = []
replica_conn = self._get_replica_stream_conn()
replica_conn.dial()
binlog_pos, data = replica_conn.stream_start(replica_start_position)
replica_tuples.append((binlog_pos, data))
data = replica_conn.stream_start(replica_start_position)
replica_events.append(data)
for i in xrange(21):
binlog_pos, data = replica_conn.stream_next()
replica_tuples.append((binlog_pos, data))
if data['SqlType'] == 'COMMIT':
data = replica_conn.stream_next()
replica_events.append(data)
if data['Category'] == 'POS':
replica_txn_count +=1
break
if len(master_tuples) != len(replica_tuples):
logging.debug("Test Failed - # of records mismatch, master %s replica %s" % (master_tuples, replica_tuples))
for master_val, replica_val in zip(master_tuples, replica_tuples):
master_data = master_val[1]
replica_data = replica_val[1]
if len(master_events) != len(replica_events):
logging.debug("Test Failed - # of records mismatch, master %s replica %s" % (master_events, replica_events))
for master_val, replica_val in zip(master_events, replica_events):
master_data = master_val
replica_data = replica_val
self.assertEqual(master_data, replica_data, "Test failed, data mismatch - master '%s' and replica position '%s'" % (master_data, replica_data))
logging.debug("Test Writes: PASS")
@ -275,8 +272,8 @@ class TestUpdateStream(unittest.TestCase):
logging.debug("test_ddl: starting @ %s" % start_position)
master_conn = self._get_master_stream_conn()
master_conn.dial()
binlog_pos, data = master_conn.stream_start(start_position)
self.assertEqual(data['Sql'], _create_vt_insert_test.replace('\n', ''), "DDL didn't match original")
data = master_conn.stream_start(start_position)
self.assertEqual(data['Sql'], _create_vt_insert_test, "DDL didn't match original")
#This tests the service switch from disable -> enable -> disable
def test_service_switch(self):
@ -292,17 +289,17 @@ class TestUpdateStream(unittest.TestCase):
self._exec_vt_txn(master_host, ['delete from vt_a',])
master_conn = self._get_master_stream_conn()
master_conn.dial()
binlog_pos, data = master_conn.stream_start(start_position)
data = master_conn.stream_start(start_position)
master_txn_count = 0
logs_correct = False
while master_txn_count <=2:
binlog_pos, data = master_conn.stream_next()
if start_position['Position']['MasterFilename'] < binlog_pos['Position']['MasterFilename']:
logs_correct = True
logging.debug("Log rotation correctly interpreted")
break
if data['SqlType'] == 'COMMIT':
data = master_conn.stream_next()
if data['Category'] == 'POS':
master_txn_count +=1
if start_position['GroupId'] < data['GroupId']:
logs_correct = True
logging.debug("Log rotation correctly interpreted")
break
if not logs_correct:
self.fail("Flush logs didn't get properly interpreted")