2013-01-26 05:09:32 +04:00
|
|
|
// Copyright 2012, Google Inc. All rights reserved.
|
|
|
|
// Use of this source code is governed by a BSD-style
|
|
|
|
// license that can be found in the LICENSE file.
|
|
|
|
|
|
|
|
package tabletserver
|
|
|
|
|
|
|
|
import (
|
2013-07-12 02:09:27 +04:00
|
|
|
"encoding/gob"
|
2013-01-26 05:09:32 +04:00
|
|
|
"fmt"
|
2013-07-12 02:09:27 +04:00
|
|
|
"strings"
|
2013-01-26 05:09:32 +04:00
|
|
|
"sync"
|
2013-07-12 02:09:27 +04:00
|
|
|
"time"
|
2013-01-26 05:09:32 +04:00
|
|
|
|
Automatic rewrite of relog import paths and calls to use glog.
Commands run:
find go -name "*.go" | xargs sed --in-place -r 's,"github.com/youtube/vitess/go/relog",log "github.com/golang/glog",g; s,relog.Info,log.Infof,g; s,relog.Warning,log.Warningf,g; s,relog.Error,log.Errorf,g; s,relog.Fatal,log.Fatalf,g; s,relog.Debug,log.V(6).Infof,g'
find . -name '*.go' -exec gofmt -w {} \;
2013-08-07 01:56:00 +04:00
|
|
|
log "github.com/golang/glog"
|
2013-07-19 05:18:20 +04:00
|
|
|
"github.com/youtube/vitess/go/bson"
|
|
|
|
"github.com/youtube/vitess/go/sqltypes"
|
|
|
|
estats "github.com/youtube/vitess/go/stats" // stats is a private type defined somewhere else in this package, so it would conflict
|
|
|
|
"github.com/youtube/vitess/go/sync2"
|
|
|
|
"github.com/youtube/vitess/go/vt/mysqlctl"
|
2013-08-02 05:08:10 +04:00
|
|
|
cproto "github.com/youtube/vitess/go/vt/mysqlctl/proto"
|
2013-07-19 05:18:20 +04:00
|
|
|
"github.com/youtube/vitess/go/vt/tabletserver/proto"
|
2013-01-26 05:09:32 +04:00
|
|
|
)
|
|
|
|
|
|
|
|
const (
|
|
|
|
DISABLED = iota
|
|
|
|
ENABLED
|
|
|
|
)
|
|
|
|
|
2013-07-12 02:09:27 +04:00
|
|
|
// Error types for rowcache invalidator.
|
|
|
|
const (
|
|
|
|
// Fatal Errors
|
2013-07-13 03:41:41 +04:00
|
|
|
FATAL_ERROR = "InvalidatorFatal"
|
2013-07-12 02:09:27 +04:00
|
|
|
|
|
|
|
// Skippable errors, recorded and skipped.
|
2013-07-13 03:41:41 +04:00
|
|
|
INVALID_EVENT = "InvalidatorEvent"
|
2013-07-12 02:09:27 +04:00
|
|
|
)
|
|
|
|
|
|
|
|
type InvalidationError struct {
|
|
|
|
errPos string
|
|
|
|
errType string
|
|
|
|
msg string
|
|
|
|
}
|
|
|
|
|
|
|
|
func NewInvalidationError(errType, msg, pos string) *InvalidationError {
|
|
|
|
invErr := &InvalidationError{errType: errType, msg: msg, errPos: pos}
|
|
|
|
return invErr
|
|
|
|
}
|
|
|
|
|
|
|
|
func (err *InvalidationError) Error() string {
|
|
|
|
return fmt.Sprintf("%v: '%v' @ '%v'", err.errType, err.msg, err.errPos)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (err *InvalidationError) isFatal() bool {
|
|
|
|
return (err.errType != INVALID_EVENT)
|
|
|
|
}
|
|
|
|
|
2013-01-26 05:09:32 +04:00
|
|
|
type InvalidationProcessor struct {
|
2013-08-02 05:08:10 +04:00
|
|
|
currentPosition *cproto.BinlogPosition
|
2013-08-12 05:37:28 +04:00
|
|
|
state sync2.AtomicInt64
|
2013-07-12 02:09:27 +04:00
|
|
|
states *estats.States
|
2013-01-26 05:09:32 +04:00
|
|
|
stateLock sync.Mutex
|
|
|
|
inTxn bool
|
|
|
|
dmlBuffer []*proto.DmlType
|
|
|
|
receiveEvent mysqlctl.SendUpdateStreamResponse
|
2013-07-12 02:09:27 +04:00
|
|
|
encBuf []byte
|
2013-01-26 05:09:32 +04:00
|
|
|
}
|
|
|
|
|
|
|
|
var CacheInvalidationProcessor *InvalidationProcessor
|
|
|
|
|
|
|
|
func NewInvalidationProcessor() *InvalidationProcessor {
|
|
|
|
invalidator := &InvalidationProcessor{}
|
|
|
|
invalidator.dmlBuffer = make([]*proto.DmlType, 10)
|
|
|
|
|
|
|
|
invalidator.receiveEvent = func(response interface{}) error {
|
|
|
|
return invalidator.invalidateEvent(response)
|
|
|
|
}
|
2013-08-02 05:08:10 +04:00
|
|
|
gob.Register(cproto.BinlogPosition{})
|
2013-07-12 02:09:27 +04:00
|
|
|
invalidator.encBuf = make([]byte, 0, 100)
|
2013-01-26 05:09:32 +04:00
|
|
|
return invalidator
|
|
|
|
}
|
|
|
|
|
|
|
|
func RegisterCacheInvalidator() {
|
|
|
|
if CacheInvalidationProcessor != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
CacheInvalidationProcessor = NewInvalidationProcessor()
|
2013-08-15 02:26:49 +04:00
|
|
|
CacheInvalidationProcessor.states = estats.NewStates("CacheInvalidationState", []string{
|
2013-07-12 02:09:27 +04:00
|
|
|
"Disabled",
|
|
|
|
"Enabled",
|
|
|
|
}, time.Now(), DISABLED)
|
2013-08-15 02:26:49 +04:00
|
|
|
estats.Publish("CacheInvalidationCheckPoint", estats.StringFunc(func() string {
|
|
|
|
if pos := CacheInvalidationProcessor.currentPosition; pos != nil {
|
|
|
|
return pos.String()
|
|
|
|
}
|
|
|
|
return ""
|
|
|
|
}))
|
2013-01-26 05:09:32 +04:00
|
|
|
}
|
|
|
|
|
|
|
|
func StartRowCacheInvalidation() {
|
|
|
|
if !shouldInvalidatorRun() {
|
Automatic rewrite of relog import paths and calls to use glog.
Commands run:
find go -name "*.go" | xargs sed --in-place -r 's,"github.com/youtube/vitess/go/relog",log "github.com/golang/glog",g; s,relog.Info,log.Infof,g; s,relog.Warning,log.Warningf,g; s,relog.Error,log.Errorf,g; s,relog.Fatal,log.Fatalf,g; s,relog.Debug,log.V(6).Infof,g'
find . -name '*.go' -exec gofmt -w {} \;
2013-08-07 01:56:00 +04:00
|
|
|
log.Warningf("Row-cache invalidator not being enabled, criteria not met")
|
2013-01-26 05:09:32 +04:00
|
|
|
CacheInvalidationProcessor.stopRowCacheInvalidation()
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
if CacheInvalidationProcessor.isServiceEnabled() {
|
Automatic rewrite of relog import paths and calls to use glog.
Commands run:
find go -name "*.go" | xargs sed --in-place -r 's,"github.com/youtube/vitess/go/relog",log "github.com/golang/glog",g; s,relog.Info,log.Infof,g; s,relog.Warning,log.Warningf,g; s,relog.Error,log.Errorf,g; s,relog.Fatal,log.Fatalf,g; s,relog.Debug,log.V(6).Infof,g'
find . -name '*.go' -exec gofmt -w {} \;
2013-08-07 01:56:00 +04:00
|
|
|
log.Warningf("Row-cache invalidator service is already enabled")
|
2013-01-26 05:09:32 +04:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
CacheInvalidationProcessor.stateLock.Lock()
|
|
|
|
if shouldInvalidatorRun() {
|
2013-07-12 02:09:27 +04:00
|
|
|
CacheInvalidationProcessor.setState(ENABLED)
|
2013-01-26 05:09:32 +04:00
|
|
|
CacheInvalidationProcessor.stateLock.Unlock()
|
|
|
|
} else {
|
2013-07-12 02:09:27 +04:00
|
|
|
CacheInvalidationProcessor.setState(DISABLED)
|
2013-01-26 05:09:32 +04:00
|
|
|
CacheInvalidationProcessor.stateLock.Unlock()
|
|
|
|
return
|
|
|
|
}
|
Automatic rewrite of relog import paths and calls to use glog.
Commands run:
find go -name "*.go" | xargs sed --in-place -r 's,"github.com/youtube/vitess/go/relog",log "github.com/golang/glog",g; s,relog.Info,log.Infof,g; s,relog.Warning,log.Warningf,g; s,relog.Error,log.Errorf,g; s,relog.Fatal,log.Fatalf,g; s,relog.Debug,log.V(6).Infof,g'
find . -name '*.go' -exec gofmt -w {} \;
2013-08-07 01:56:00 +04:00
|
|
|
log.Infof("Starting RowCacheInvalidation Service")
|
2013-07-12 02:09:27 +04:00
|
|
|
|
|
|
|
CacheInvalidationProcessor.runInvalidationLoop()
|
2013-01-26 05:09:32 +04:00
|
|
|
}
|
|
|
|
|
|
|
|
func StopRowCacheInvalidation() {
|
2013-07-12 02:09:27 +04:00
|
|
|
if !CacheInvalidationProcessor.isServiceEnabled() {
|
Automatic rewrite of relog import paths and calls to use glog.
Commands run:
find go -name "*.go" | xargs sed --in-place -r 's,"github.com/youtube/vitess/go/relog",log "github.com/golang/glog",g; s,relog.Info,log.Infof,g; s,relog.Warning,log.Warningf,g; s,relog.Error,log.Errorf,g; s,relog.Fatal,log.Fatalf,g; s,relog.Debug,log.V(6).Infof,g'
find . -name '*.go' -exec gofmt -w {} \;
2013-08-07 01:56:00 +04:00
|
|
|
log.Infof("Invalidator is already disabled - NOP")
|
2013-07-12 02:09:27 +04:00
|
|
|
return
|
|
|
|
}
|
2013-01-26 05:09:32 +04:00
|
|
|
CacheInvalidationProcessor.stopRowCacheInvalidation()
|
Automatic rewrite of relog import paths and calls to use glog.
Commands run:
find go -name "*.go" | xargs sed --in-place -r 's,"github.com/youtube/vitess/go/relog",log "github.com/golang/glog",g; s,relog.Info,log.Infof,g; s,relog.Warning,log.Warningf,g; s,relog.Error,log.Errorf,g; s,relog.Fatal,log.Fatalf,g; s,relog.Debug,log.V(6).Infof,g'
find . -name '*.go' -exec gofmt -w {} \;
2013-08-07 01:56:00 +04:00
|
|
|
log.Infof("Rowcache invalidator stopped")
|
2013-07-12 02:09:27 +04:00
|
|
|
}
|
|
|
|
|
|
|
|
func ShouldInvalidatorRun() bool {
|
|
|
|
return shouldInvalidatorRun()
|
|
|
|
}
|
|
|
|
|
|
|
|
func shouldInvalidatorRun() bool {
|
|
|
|
return IsCachePoolAvailable() && mysqlctl.IsUpdateStreamEnabled()
|
2013-01-26 05:09:32 +04:00
|
|
|
}
|
|
|
|
|
|
|
|
func (rowCache *InvalidationProcessor) stopRowCacheInvalidation() {
|
|
|
|
rowCache.stateLock.Lock()
|
2013-07-12 02:09:27 +04:00
|
|
|
rowCache.setState(DISABLED)
|
2013-01-26 05:09:32 +04:00
|
|
|
rowCache.stateLock.Unlock()
|
|
|
|
}
|
|
|
|
|
2013-08-12 05:37:28 +04:00
|
|
|
func (rowCache *InvalidationProcessor) setState(state int64) {
|
2013-07-12 02:09:27 +04:00
|
|
|
rowCache.state.Set(state)
|
2013-08-12 05:37:28 +04:00
|
|
|
rowCache.states.SetState(state)
|
2013-01-26 05:09:32 +04:00
|
|
|
}
|
|
|
|
|
|
|
|
func (rowCache *InvalidationProcessor) isServiceEnabled() bool {
|
2013-04-11 02:43:10 +04:00
|
|
|
return rowCache.state.Get() == ENABLED
|
2013-01-26 05:09:32 +04:00
|
|
|
}
|
|
|
|
|
2013-07-12 02:09:27 +04:00
|
|
|
func (rowCache *InvalidationProcessor) updateErrCounters(err *InvalidationError) {
|
Automatic rewrite of relog import paths and calls to use glog.
Commands run:
find go -name "*.go" | xargs sed --in-place -r 's,"github.com/youtube/vitess/go/relog",log "github.com/golang/glog",g; s,relog.Info,log.Infof,g; s,relog.Warning,log.Warningf,g; s,relog.Error,log.Errorf,g; s,relog.Fatal,log.Fatalf,g; s,relog.Debug,log.V(6).Infof,g'
find . -name '*.go' -exec gofmt -w {} \;
2013-08-07 01:56:00 +04:00
|
|
|
log.Errorf(err.Error())
|
2013-07-13 03:41:41 +04:00
|
|
|
if errorStats == nil {
|
Automatic rewrite of relog import paths and calls to use glog.
Commands run:
find go -name "*.go" | xargs sed --in-place -r 's,"github.com/youtube/vitess/go/relog",log "github.com/golang/glog",g; s,relog.Info,log.Infof,g; s,relog.Warning,log.Warningf,g; s,relog.Error,log.Errorf,g; s,relog.Fatal,log.Fatalf,g; s,relog.Debug,log.V(6).Infof,g'
find . -name '*.go' -exec gofmt -w {} \;
2013-08-07 01:56:00 +04:00
|
|
|
log.Warningf("errorStats is not initialized")
|
2013-07-13 03:41:41 +04:00
|
|
|
return
|
2013-07-12 02:09:27 +04:00
|
|
|
}
|
2013-07-13 03:41:41 +04:00
|
|
|
errorStats.Add(err.errType, 1)
|
2013-07-12 02:09:27 +04:00
|
|
|
}
|
|
|
|
|
2013-01-26 05:09:32 +04:00
|
|
|
func (rowCache *InvalidationProcessor) invalidateEvent(response interface{}) error {
|
|
|
|
if !shouldInvalidatorRun() || !rowCache.isServiceEnabled() {
|
2013-07-12 11:14:08 +04:00
|
|
|
return NewInvalidationError(FATAL_ERROR, "Rowcache invalidator is not available", "")
|
2013-01-26 05:09:32 +04:00
|
|
|
}
|
|
|
|
updateResponse, ok := response.(*mysqlctl.UpdateResponse)
|
|
|
|
if !ok {
|
2013-07-12 11:14:08 +04:00
|
|
|
return NewInvalidationError(FATAL_ERROR, "Invalid Reponse type", "")
|
2013-01-26 05:09:32 +04:00
|
|
|
}
|
2013-07-20 05:38:13 +04:00
|
|
|
rowCache.currentPosition = &updateResponse.Coord
|
2013-07-12 11:14:08 +04:00
|
|
|
return rowCache.processEvent(updateResponse)
|
2013-01-26 05:09:32 +04:00
|
|
|
}
|
|
|
|
|
2013-07-12 02:09:27 +04:00
|
|
|
func (rowCache *InvalidationProcessor) stopCache(reason string) {
|
Automatic rewrite of relog import paths and calls to use glog.
Commands run:
find go -name "*.go" | xargs sed --in-place -r 's,"github.com/youtube/vitess/go/relog",log "github.com/golang/glog",g; s,relog.Info,log.Infof,g; s,relog.Warning,log.Warningf,g; s,relog.Error,log.Errorf,g; s,relog.Fatal,log.Fatalf,g; s,relog.Debug,log.V(6).Infof,g'
find . -name '*.go' -exec gofmt -w {} \;
2013-08-07 01:56:00 +04:00
|
|
|
log.Warningf("Stopping rowcache invalidation, reason: '%v'", reason)
|
2013-07-12 02:09:27 +04:00
|
|
|
rowCache.stopRowCacheInvalidation()
|
|
|
|
if IsCachePoolAvailable() {
|
Automatic rewrite of relog import paths and calls to use glog.
Commands run:
find go -name "*.go" | xargs sed --in-place -r 's,"github.com/youtube/vitess/go/relog",log "github.com/golang/glog",g; s,relog.Info,log.Infof,g; s,relog.Warning,log.Warningf,g; s,relog.Error,log.Errorf,g; s,relog.Fatal,log.Fatalf,g; s,relog.Debug,log.V(6).Infof,g'
find . -name '*.go' -exec gofmt -w {} \;
2013-08-07 01:56:00 +04:00
|
|
|
log.Warningf("Disallowing Query Service as row-cache invalidator cannot run")
|
2013-07-12 02:09:27 +04:00
|
|
|
DisallowQueries(false)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (rowCache *InvalidationProcessor) runInvalidationLoop() {
|
|
|
|
var err error
|
|
|
|
|
|
|
|
replPos, err := mysqlctl.GetReplicationPosition()
|
|
|
|
if err != nil {
|
2013-07-12 11:14:08 +04:00
|
|
|
rErr := NewInvalidationError(FATAL_ERROR, fmt.Sprintf("Cannot determine replication position %v", err), "")
|
2013-07-12 02:09:27 +04:00
|
|
|
rowCache.updateErrCounters(rErr)
|
|
|
|
rowCache.stopCache(rErr.Error())
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2013-08-02 05:08:10 +04:00
|
|
|
startPosition := &cproto.BinlogPosition{Position: *replPos}
|
2013-01-26 05:09:32 +04:00
|
|
|
|
Automatic rewrite of relog import paths and calls to use glog.
Commands run:
find go -name "*.go" | xargs sed --in-place -r 's,"github.com/youtube/vitess/go/relog",log "github.com/golang/glog",g; s,relog.Info,log.Infof,g; s,relog.Warning,log.Warningf,g; s,relog.Error,log.Errorf,g; s,relog.Fatal,log.Fatalf,g; s,relog.Debug,log.V(6).Infof,g'
find . -name '*.go' -exec gofmt -w {} \;
2013-08-07 01:56:00 +04:00
|
|
|
log.Infof("Starting @ %v", startPosition.String())
|
2013-07-12 02:09:27 +04:00
|
|
|
req := &mysqlctl.UpdateStreamRequest{StartPosition: *startPosition}
|
|
|
|
err = mysqlctl.ServeUpdateStream(req, rowCache.receiveEvent)
|
2013-01-26 05:09:32 +04:00
|
|
|
if err != nil {
|
Automatic rewrite of relog import paths and calls to use glog.
Commands run:
find go -name "*.go" | xargs sed --in-place -r 's,"github.com/youtube/vitess/go/relog",log "github.com/golang/glog",g; s,relog.Info,log.Infof,g; s,relog.Warning,log.Warningf,g; s,relog.Error,log.Errorf,g; s,relog.Fatal,log.Fatalf,g; s,relog.Debug,log.V(6).Infof,g'
find . -name '*.go' -exec gofmt -w {} \;
2013-08-07 01:56:00 +04:00
|
|
|
log.Errorf("mysqlctl.ServeUpdateStream returned err '%v'", err.Error())
|
2013-07-12 11:14:08 +04:00
|
|
|
if rErr, ok := err.(*InvalidationError); ok {
|
2013-07-12 02:09:27 +04:00
|
|
|
rowCache.updateErrCounters(rErr)
|
2013-01-26 05:09:32 +04:00
|
|
|
}
|
2013-07-12 02:09:27 +04:00
|
|
|
rowCache.stopCache(fmt.Sprintf("Unexpected or fatal error, '%v'", err.Error()))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2013-01-26 05:09:32 +04:00
|
|
|
func (rowCache *InvalidationProcessor) processEvent(event *mysqlctl.UpdateResponse) error {
|
2013-07-12 02:09:27 +04:00
|
|
|
position := ""
|
2013-07-20 05:38:13 +04:00
|
|
|
if event.Coord.Valid() {
|
|
|
|
position = event.Coord.String()
|
2013-07-12 02:09:27 +04:00
|
|
|
}
|
2013-01-26 05:09:32 +04:00
|
|
|
if event.Error != "" {
|
Automatic rewrite of relog import paths and calls to use glog.
Commands run:
find go -name "*.go" | xargs sed --in-place -r 's,"github.com/youtube/vitess/go/relog",log "github.com/golang/glog",g; s,relog.Info,log.Infof,g; s,relog.Warning,log.Warningf,g; s,relog.Error,log.Errorf,g; s,relog.Fatal,log.Fatalf,g; s,relog.Debug,log.V(6).Infof,g'
find . -name '*.go' -exec gofmt -w {} \;
2013-08-07 01:56:00 +04:00
|
|
|
log.Errorf("Update stream returned error '%v'", event.Error)
|
2013-07-12 02:09:27 +04:00
|
|
|
// Check if update stream error is fatal, else record it and move on.
|
|
|
|
if strings.HasPrefix(event.Error, mysqlctl.FATAL) {
|
Automatic rewrite of relog import paths and calls to use glog.
Commands run:
find go -name "*.go" | xargs sed --in-place -r 's,"github.com/youtube/vitess/go/relog",log "github.com/golang/glog",g; s,relog.Info,log.Infof,g; s,relog.Warning,log.Warningf,g; s,relog.Error,log.Errorf,g; s,relog.Fatal,log.Fatalf,g; s,relog.Debug,log.V(6).Infof,g'
find . -name '*.go' -exec gofmt -w {} \;
2013-08-07 01:56:00 +04:00
|
|
|
log.Infof("Returning Service Error")
|
2013-07-12 11:14:08 +04:00
|
|
|
return NewInvalidationError(FATAL_ERROR, event.Error, position)
|
2013-01-26 05:09:32 +04:00
|
|
|
}
|
2013-07-12 02:09:27 +04:00
|
|
|
rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, event.Error, position))
|
|
|
|
return nil
|
2013-01-26 05:09:32 +04:00
|
|
|
}
|
|
|
|
|
2013-07-20 05:38:13 +04:00
|
|
|
if !event.Coord.Valid() {
|
2013-07-12 02:09:27 +04:00
|
|
|
rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, "no error, position is not set", ""))
|
|
|
|
return nil
|
2013-01-26 05:09:32 +04:00
|
|
|
}
|
|
|
|
|
2013-07-12 02:09:27 +04:00
|
|
|
var err error
|
2013-07-20 05:38:13 +04:00
|
|
|
switch event.Data.SqlType {
|
2013-01-26 05:09:32 +04:00
|
|
|
case mysqlctl.DDL:
|
2013-07-12 02:09:27 +04:00
|
|
|
err = rowCache.handleDdlEvent(event)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2013-01-26 05:09:32 +04:00
|
|
|
case mysqlctl.BEGIN:
|
|
|
|
rowCache.dmlBuffer = rowCache.dmlBuffer[:0]
|
|
|
|
if rowCache.inTxn {
|
2013-07-12 02:09:27 +04:00
|
|
|
rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, "Invalid 'BEGIN' event, transaction already in progress", position))
|
|
|
|
return nil
|
2013-01-26 05:09:32 +04:00
|
|
|
}
|
|
|
|
rowCache.inTxn = true
|
|
|
|
case mysqlctl.COMMIT:
|
|
|
|
if !rowCache.inTxn {
|
2013-07-12 02:09:27 +04:00
|
|
|
rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, "Invalid 'COMMIT' event for a non-transaction", position))
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
err = rowCache.handleTxn(event)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
2013-01-26 05:09:32 +04:00
|
|
|
}
|
|
|
|
rowCache.inTxn = false
|
|
|
|
rowCache.dmlBuffer = rowCache.dmlBuffer[:0]
|
|
|
|
case "insert", "update", "delete":
|
2013-07-12 02:09:27 +04:00
|
|
|
dml, err := rowCache.buildDmlData(event)
|
2013-01-26 05:09:32 +04:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2013-07-12 02:09:27 +04:00
|
|
|
if dml != nil {
|
|
|
|
rowCache.dmlBuffer = append(rowCache.dmlBuffer, dml)
|
|
|
|
}
|
2013-01-26 05:09:32 +04:00
|
|
|
default:
|
2013-07-20 05:38:13 +04:00
|
|
|
rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, fmt.Sprintf("Unknown SqlType, %v %v", event.Data.SqlType, event.Data.Sql), position))
|
|
|
|
//return NewInvalidationError(INVALID_EVENT, fmt.Sprintf("Unknown SqlType, %v %v", event.Data.SqlType, event.Data.Sql))
|
2013-01-26 05:09:32 +04:00
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func isDmlEvent(sqlType string) bool {
|
|
|
|
switch sqlType {
|
|
|
|
case "insert", "update", "delete":
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
2013-07-12 02:09:27 +04:00
|
|
|
func (rowCache *InvalidationProcessor) buildDmlData(event *mysqlctl.UpdateResponse) (*proto.DmlType, error) {
|
2013-07-20 05:38:13 +04:00
|
|
|
if !isDmlEvent(event.Data.SqlType) {
|
|
|
|
rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, fmt.Sprintf("Bad Dml type, '%v'", event.Data.SqlType), event.Coord.String()))
|
2013-07-12 02:09:27 +04:00
|
|
|
return nil, nil
|
2013-01-26 05:09:32 +04:00
|
|
|
}
|
|
|
|
dml := new(proto.DmlType)
|
2013-07-20 05:38:13 +04:00
|
|
|
dml.Table = event.Data.TableName
|
|
|
|
dml.Keys = make([]interface{}, 0, len(event.Data.PkValues))
|
|
|
|
sqlTypeKeys := make([]sqltypes.Value, 0, len(event.Data.PkColNames))
|
|
|
|
for _, pkTuple := range event.Data.PkValues {
|
2013-01-26 05:09:32 +04:00
|
|
|
sqlTypeKeys = sqlTypeKeys[:0]
|
|
|
|
if len(pkTuple) == 0 {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
for _, pkVal := range pkTuple {
|
|
|
|
key, err := sqltypes.BuildValue(pkVal)
|
|
|
|
if err != nil {
|
2013-07-20 05:38:13 +04:00
|
|
|
rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, fmt.Sprintf("Error building invalidation key '%v'", err), event.Coord.String()))
|
2013-07-12 02:09:27 +04:00
|
|
|
return nil, nil
|
2013-01-26 05:09:32 +04:00
|
|
|
}
|
|
|
|
sqlTypeKeys = append(sqlTypeKeys, key)
|
|
|
|
}
|
|
|
|
invalidateKey := buildKey(sqlTypeKeys)
|
|
|
|
if invalidateKey != "" {
|
|
|
|
dml.Keys = append(dml.Keys, invalidateKey)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return dml, nil
|
|
|
|
}
|
|
|
|
|
2013-07-12 02:09:27 +04:00
|
|
|
func (rowCache *InvalidationProcessor) handleTxn(commitEvent *mysqlctl.UpdateResponse) error {
|
|
|
|
var err error
|
|
|
|
defer func() {
|
|
|
|
if x := recover(); x != nil {
|
|
|
|
if terr, ok := x.(*TabletError); ok {
|
2013-07-20 05:38:13 +04:00
|
|
|
rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, terr.Error(), commitEvent.Coord.String()))
|
2013-07-12 02:09:27 +04:00
|
|
|
} else {
|
2013-07-20 05:38:13 +04:00
|
|
|
err = NewInvalidationError(FATAL_ERROR, "handleTxn failed", commitEvent.Coord.String())
|
2013-07-12 02:09:27 +04:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
if len(rowCache.dmlBuffer) == 0 {
|
|
|
|
return nil
|
2013-01-26 05:09:32 +04:00
|
|
|
}
|
2013-07-12 02:09:27 +04:00
|
|
|
rowCache.encBuf = rowCache.encBuf[:0]
|
2013-01-26 05:09:32 +04:00
|
|
|
cacheInvalidate := new(proto.CacheInvalidate)
|
2013-07-20 05:38:13 +04:00
|
|
|
rowCache.encBuf, err = bson.Marshal(&commitEvent.Coord)
|
2013-07-12 02:09:27 +04:00
|
|
|
if err != nil {
|
2013-07-20 05:38:13 +04:00
|
|
|
return NewInvalidationError(FATAL_ERROR, fmt.Sprintf("Error in encoding position, %v", err), commitEvent.Coord.String())
|
2013-07-12 02:09:27 +04:00
|
|
|
}
|
2013-01-26 05:09:32 +04:00
|
|
|
cacheInvalidate.Dmls = make([]proto.DmlType, 0, len(rowCache.dmlBuffer))
|
|
|
|
for _, dml := range rowCache.dmlBuffer {
|
|
|
|
cacheInvalidate.Dmls = append(cacheInvalidate.Dmls, *dml)
|
|
|
|
}
|
|
|
|
InvalidateForDml(cacheInvalidate)
|
2013-07-12 02:09:27 +04:00
|
|
|
return nil
|
2013-01-26 05:09:32 +04:00
|
|
|
}
|
|
|
|
|
2013-07-12 02:09:27 +04:00
|
|
|
func (rowCache *InvalidationProcessor) handleDdlEvent(ddlEvent *mysqlctl.UpdateResponse) error {
|
|
|
|
var err error
|
|
|
|
defer func() {
|
|
|
|
if x := recover(); x != nil {
|
|
|
|
if terr, ok := x.(*TabletError); ok {
|
2013-07-20 05:38:13 +04:00
|
|
|
rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, terr.Error(), ddlEvent.Coord.String()))
|
2013-07-12 02:09:27 +04:00
|
|
|
} else {
|
2013-07-20 05:38:13 +04:00
|
|
|
err = NewInvalidationError(FATAL_ERROR, "ddlEvent failed", ddlEvent.Coord.String())
|
2013-07-12 02:09:27 +04:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
2013-07-20 05:38:13 +04:00
|
|
|
if ddlEvent.Data.Sql == "" {
|
|
|
|
rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, "Empty ddl sql", ddlEvent.Coord.String()))
|
2013-07-12 02:09:27 +04:00
|
|
|
return nil
|
2013-07-20 05:38:13 +04:00
|
|
|
//return NewInvalidationError(INVALID_EVENT, "Empty ddl sql", ddlEvent.Coord.String())
|
2013-01-26 05:09:32 +04:00
|
|
|
}
|
2013-07-12 02:09:27 +04:00
|
|
|
rowCache.encBuf = rowCache.encBuf[:0]
|
2013-01-26 05:09:32 +04:00
|
|
|
ddlInvalidate := new(proto.DDLInvalidate)
|
2013-07-20 05:38:13 +04:00
|
|
|
rowCache.encBuf, err = bson.Marshal(&ddlEvent.Coord)
|
2013-07-12 02:09:27 +04:00
|
|
|
if err != nil {
|
2013-07-20 05:38:13 +04:00
|
|
|
return NewInvalidationError(FATAL_ERROR, fmt.Sprintf("Error in encoding position, %v", err), ddlEvent.Coord.String())
|
2013-07-12 02:09:27 +04:00
|
|
|
}
|
2013-07-20 05:38:13 +04:00
|
|
|
ddlInvalidate.DDL = ddlEvent.Data.Sql
|
2013-01-26 05:09:32 +04:00
|
|
|
InvalidateForDDL(ddlInvalidate)
|
2013-07-12 02:09:27 +04:00
|
|
|
return nil
|
2013-01-26 05:09:32 +04:00
|
|
|
}
|