This commit is contained in:
Sugu Sougoumarane 2014-02-14 17:25:46 -08:00
Родитель c26a034302 3e7db5bc00
Коммит 09dcc4ea57
19 изменённых файлов: 351 добавлений и 186 удалений

Просмотреть файл

@ -258,7 +258,7 @@ func main() {
}
fmt.Fprintf(os.Stderr, "\n")
}
dbconfigs.RegisterCommonFlags()
dbconfigs.RegisterFlags()
flag.Parse()
tabletAddr = fmt.Sprintf("%v:%v", "localhost", *port)

Просмотреть файл

@ -36,7 +36,7 @@ func init() {
}
func main() {
dbconfigs.RegisterCommonFlags()
dbconfigs.RegisterFlags()
flag.Parse()
servenv.Init()
defer servenv.Close()

Просмотреть файл

@ -12,28 +12,38 @@ import (
log "github.com/golang/glog"
"github.com/youtube/vitess/go/vt/dbconfigs"
"github.com/youtube/vitess/go/vt/mysqlctl"
"github.com/youtube/vitess/go/vt/servenv"
ts "github.com/youtube/vitess/go/vt/tabletserver"
)
var (
port = flag.Int("port", 6510, "tcp port to serve on")
overridesFile = flag.String("schema-override", "", "schema overrides file")
port = flag.Int("port", 6510, "tcp port to serve on")
overridesFile = flag.String("schema-override", "", "schema overrides file")
enableRowcache = flag.Bool("enable-rowcache", false, "enable rowcacche")
enableInvalidator = flag.Bool("enable-invalidator", false, "enable rowcache invalidator")
binlogPath = flag.String("binlog-path", "", "binlog path used by rowcache invalidator")
)
var schemaOverrides []ts.SchemaOverride
func main() {
defaultDBConfig := dbconfigs.DefaultDBConfigs.App
defaultDBConfig.Host = "localhost"
dbconfigs.RegisterAppFlags(defaultDBConfig)
dbconfigs.RegisterFlags()
flag.Parse()
servenv.Init()
dbConfig, err := dbconfigs.InitApp("")
dbConfigs, err := dbconfigs.Init("")
if err != nil {
log.Fatalf("Cannot initialize App dbconfig: %v", err)
}
if *enableRowcache {
dbConfigs.App.EnableRowcache = true
if *enableInvalidator {
dbConfigs.App.EnableInvalidator = true
}
}
mycnf := &mysqlctl.Mycnf{BinLogPath: *binlogPath}
mysqld := mysqlctl.NewMysqld(mycnf, &dbConfigs.Dba, &dbConfigs.Repl)
unmarshalFile(*overridesFile, &schemaOverrides)
data, _ := json.MarshalIndent(schemaOverrides, "", " ")
@ -41,7 +51,7 @@ func main() {
ts.InitQueryService()
ts.AllowQueries(dbConfig, schemaOverrides, ts.LoadCustomRules())
ts.AllowQueries(&dbConfigs.App, schemaOverrides, ts.LoadCustomRules(), mysqld)
log.Infof("starting vtocc %v", *port)
servenv.OnClose(func() {

Просмотреть файл

@ -21,10 +21,11 @@ import (
)
var (
port = flag.Int("port", 6509, "port for the server")
tabletPath = flag.String("tablet-path", "", "tablet alias or path to zk node representing the tablet")
mycnfFile = flag.String("mycnf-file", "", "my.cnf file")
overridesFile = flag.String("schema-override", "", "schema overrides file")
port = flag.Int("port", 6509, "port for the server")
tabletPath = flag.String("tablet-path", "", "tablet alias or path to zk node representing the tablet")
mycnfFile = flag.String("mycnf-file", "", "my.cnf file")
enableRowcache = flag.Bool("enable-rowcache", false, "enable rowcacche")
overridesFile = flag.String("schema-override", "", "schema overrides file")
securePort = flag.Int("secure-port", 0, "port for the secure server")
cert = flag.String("cert", "", "cert file")
@ -35,7 +36,7 @@ var (
)
func main() {
dbconfigs.RegisterCommonFlags()
dbconfigs.RegisterFlags()
flag.Parse()
servenv.Init()
@ -55,6 +56,7 @@ func main() {
if err != nil {
log.Warning(err)
}
dbcfgs.App.EnableRowcache = *enableRowcache
ts.InitQueryService()
binlog.RegisterUpdateStreamService(mycnf)

Просмотреть файл

@ -0,0 +1,82 @@
// Copyright 2013, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package sync2
import (
"sync"
)
// These are the three predefined states of a service.
const (
SERVICE_STOPPED = iota
SERVICE_RUNNING
SERVICE_SHUTTING_DOWN
)
var stateNames = []string{
"Stopped",
"Running",
"ShuttingDown",
}
// ServiceManager manages the state of a service
// through its lifecycle.
type ServiceManager struct {
mu sync.Mutex
wg sync.WaitGroup
state AtomicInt64
}
// Go tries to change the state from SERVICE_STOPPED to SERVICE_RUNNING.
// If the current state is not SERVICE_STOPPED (already running),
// it returns false immediately.
// On successful transition, it launches the service as a goroutine and returns true.
// The service func is required to regularly check the state of the service manager.
// If the state is not SERVICE_RUNNING, it must treat it as end of service and return.
// When the service func returns, the state is reverted to SERVICE_STOPPED.
func (svm *ServiceManager) Go(service func(svm *ServiceManager)) bool {
svm.mu.Lock()
defer svm.mu.Unlock()
if !svm.state.CompareAndSwap(SERVICE_STOPPED, SERVICE_RUNNING) {
return false
}
svm.wg.Add(1)
go func() {
service(svm)
svm.state.Set(SERVICE_STOPPED)
svm.wg.Done()
}()
return true
}
// Stop tries to change the state from SERVICE_RUNNING to SERVICE_SHUTTING_DOWN.
// If the current state is not SERVICE_RUNNING, it returns false immediately.
// On successul transition, it waits for the service to finish, and returns true.
// You are allowed to 'Go' again after a Stop.
func (svm *ServiceManager) Stop() bool {
svm.mu.Lock()
defer svm.mu.Unlock()
if !svm.state.CompareAndSwap(SERVICE_RUNNING, SERVICE_SHUTTING_DOWN) {
return false
}
svm.wg.Wait()
return true
}
// Wait waits for the service to terminate if it's currently running.
func (svm *ServiceManager) Wait() {
svm.wg.Wait()
}
// State returns the current state of the service.
// This should only be used to report the current state.
func (svm *ServiceManager) State() int64 {
return svm.state.Get()
}
// StateName returns the name of the current state.
func (svm *ServiceManager) StateName() string {
return stateNames[svm.State()]
}

Просмотреть файл

@ -0,0 +1,60 @@
// Copyright 2013, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package sync2
import (
"runtime"
"testing"
"time"
)
func TestServiceManager(t *testing.T) {
activated := AtomicInt64(0)
service := func(svm *ServiceManager) {
if !activated.CompareAndSwap(0, 1) {
t.Fatalf("service called more than once")
}
for svm.State() == SERVICE_RUNNING {
time.Sleep(10 * time.Millisecond)
}
if !activated.CompareAndSwap(1, 0) {
t.Fatalf("service ended more than once")
}
}
var sm ServiceManager
if sm.StateName() != "Stopped" {
t.Errorf("want Stopped, got %s", sm.StateName())
}
result := sm.Go(service)
if !result {
t.Errorf("want true, got false")
}
if sm.StateName() != "Running" {
t.Errorf("want Running, got %s", sm.StateName())
}
runtime.Gosched()
if val := activated.Get(); val != 1 {
t.Errorf("want 1, got %d", val)
}
result = sm.Go(service)
if result {
t.Errorf("want false, got true")
}
result = sm.Stop()
if !result {
t.Errorf("want true, got false")
}
if val := activated.Get(); val != 0 {
t.Errorf("want 0, got %d", val)
}
result = sm.Stop()
if result {
t.Errorf("want false, got true")
}
sm.state.Set(SERVICE_SHUTTING_DOWN)
if sm.StateName() != "ShuttingDown" {
t.Errorf("want ShuttingDown, got %s", sm.StateName())
}
}

Просмотреть файл

@ -74,8 +74,7 @@ type BinlogStreamer struct {
dbname string
dir string
// running is set when streaming begins.
running sync2.AtomicInt32
svm sync2.ServiceManager
// file, blPos & delim are updated during streaming.
file fileInfo
@ -99,27 +98,26 @@ func NewBinlogStreamer(dbname, binlogPrefix string) *BinlogStreamer {
// Stream starts streaming binlog events from file & pos by repeatedly calling sendTransaction.
func (bls *BinlogStreamer) Stream(file string, pos int64, sendTransaction sendTransactionFunc) (err error) {
if !bls.running.CompareAndSwap(0, 1) {
return fmt.Errorf("already streaming or stopped.")
}
if err = bls.file.Init(path.Join(bls.dir, file), pos); err != nil {
return err
}
defer func() {
bls.file.Close()
bls.Stop()
}()
defer bls.file.Close()
for {
if err = bls.run(sendTransaction); err != nil {
goto end
// Launch using service manager so we can stop this
// as needed.
bls.svm.Go(func(_ *sync2.ServiceManager) {
for {
if err = bls.run(sendTransaction); err != nil {
return
}
if err = bls.file.WaitForChange(&bls.svm); err != nil {
return
}
}
if err = bls.file.WaitForChange(&bls.running); err != nil {
goto end
}
}
})
end:
// Wait for service to exit, and handle errors if any.
bls.svm.Wait()
if err == io.EOF {
log.Infof("Stream ended @ %#v", bls.file)
return nil
@ -129,9 +127,8 @@ end:
}
// Stop stops the currently executing Stream if there is one.
// You cannot resume with the current BinlogStreamer after you've stopped.
func (bls *BinlogStreamer) Stop() {
bls.running.Set(-1)
bls.svm.Stop()
}
// run launches mysqlbinlog and starts the stream. It takes care of
@ -199,7 +196,8 @@ func (bls *BinlogStreamer) parseEvents(sendTransaction sendTransactionFunc, read
func (bls *BinlogStreamer) nextStatement(bufReader *bufio.Reader) (stmt []byte, err error) {
eventLoop:
for {
if bls.running.Get() != 1 {
// Stop processing if we're shutting down
if bls.svm.State() != sync2.SERVICE_RUNNING {
return nil, io.EOF
}
event, err := bls.readEvent(bufReader)
@ -316,9 +314,10 @@ func (f *fileInfo) Set(pos int64) {
f.pos = pos
}
func (f *fileInfo) WaitForChange(running *sync2.AtomicInt32) error {
func (f *fileInfo) WaitForChange(svm *sync2.ServiceManager) error {
for {
if running.Get() != 1 {
// Stop waiting if we're shutting down
if svm.State() != sync2.SERVICE_RUNNING {
return io.EOF
}
time.Sleep(100 * time.Millisecond)

Просмотреть файл

@ -83,10 +83,10 @@ func TestFileInfo(t *testing.T) {
t.Fatal(err)
}
ch := make(chan []byte, 10)
var running = sync2.AtomicInt32(1)
go func() {
for {
file.WaitForChange(&running)
var svm = sync2.ServiceManager{}
svm.Go(func(_ *sync2.ServiceManager) {
for svm.State() == sync2.SERVICE_RUNNING {
file.WaitForChange(&svm)
b := make([]byte, 128)
n, err := file.handle.Read(b)
if err != nil {
@ -95,7 +95,7 @@ func TestFileInfo(t *testing.T) {
file.Set(file.pos + int64(n))
ch <- b[:n]
}
}()
})
want := "Message1"
writer.WriteString(want)
@ -123,7 +123,7 @@ func TestFileInfo(t *testing.T) {
}
want = "EOF"
running.Set(-1)
svm.Stop()
got = string(<-ch)
if want != got {
t.Errorf("want %v, got %v", want, got)
@ -319,7 +319,8 @@ func TestStream(t *testing.T) {
}
curTransaction++
if curTransaction == len(transactions) {
bls.Stop()
// Launch as goroutine to prevent deadlock.
go bls.Stop()
}
// Uncomment the following lines to produce a different set of
// expected outputs. You'll need to massage the file a bit afterwards.
@ -350,7 +351,8 @@ func TestRotation(t *testing.T) {
bls := NewBinlogStreamer("db", testfiles.Locate("mysqlctl_test/vt-0000041983-bin"))
err := bls.Stream("vt-0000041983-bin.000004", 2682, func(tx *proto.BinlogTransaction) error {
bls.Stop()
// Launch as goroutine to prevent deadlock.
go bls.Stop()
return nil
})
if err != nil {

Просмотреть файл

@ -54,19 +54,13 @@ func registerConnFlags(connParams *mysql.ConnectionParams, name string, defaultP
}
// vtocc will only register the app flags, it doesn't do any dba or repl
// access.
func RegisterAppFlags(defaultDBConfig DBConfig) {
registerConnFlags(&dbConfigs.App.ConnectionParams, "app", defaultDBConfig.ConnectionParams)
flag.StringVar(&dbConfigs.App.Keyspace, "db-config-app-keyspace", defaultDBConfig.Keyspace, "db app connection keyspace")
flag.StringVar(&dbConfigs.App.Shard, "db-config-app-shard", defaultDBConfig.Shard, "db app connection shard")
}
// vttablet will register client, dba and repl.
func RegisterCommonFlags() {
func RegisterFlags() {
registerConnFlags(&dbConfigs.Dba, "dba", DefaultDBConfigs.Dba)
registerConnFlags(&dbConfigs.Repl, "repl", DefaultDBConfigs.Repl)
RegisterAppFlags(DefaultDBConfigs.App)
registerConnFlags(&dbConfigs.App.ConnectionParams, "app", DefaultDBConfigs.App.ConnectionParams)
flag.StringVar(&dbConfigs.App.Keyspace, "db-config-app-keyspace", DefaultDBConfigs.App.Keyspace, "db app connection keyspace")
flag.StringVar(&dbConfigs.App.Shard, "db-config-app-shard", DefaultDBConfigs.App.Shard, "db app connection shard")
}
// InitConnectionParams may overwrite the socket file,
@ -106,8 +100,10 @@ func MysqlParams(cp *mysql.ConnectionParams) (mysql.ConnectionParams, error) {
// shard.
type DBConfig struct {
mysql.ConnectionParams
Keyspace string `json:"keyspace"`
Shard string `json:"shard"`
Keyspace string `json:"keyspace"`
Shard string `json:"shard"`
EnableRowcache bool `json:"enable_rowcache"`
EnableInvalidator bool `json:"enable_invalidator"`
}
func (d *DBConfig) String() string {
@ -146,21 +142,11 @@ func (dbcfgs *DBConfigs) Redact() {
dbcfgs.Repl.Redact()
}
// Initialize only the app side of the db configs (for vtocc)
func InitApp(socketFile string) (*DBConfig, error) {
// Initialize app, dba and repl configs
func Init(socketFile string) (*DBConfigs, error) {
if err := InitConnectionParams(&dbConfigs.App.ConnectionParams, socketFile); err != nil {
return nil, err
}
return &dbConfigs.App, nil
}
// Initialize app, dba and repl configs
func Init(socketFile string) (*DBConfigs, error) {
if _, err := InitApp(socketFile); err != nil {
return nil, err
}
// init configs
if err := InitConnectionParams(&dbConfigs.Dba, socketFile); err != nil {
return nil, err
}

Просмотреть файл

@ -55,6 +55,10 @@ func NewMysqld(config *Mycnf, dba, repl *mysql.ConnectionParams) *Mysqld {
}
}
func (mt *Mysqld) Cnf() *Mycnf {
return mt.config
}
func (mt *Mysqld) createDbaConnection() (*mysql.Connection, error) {
params, err := dbconfigs.MysqlParams(mt.dbaParams)
if err != nil {

Просмотреть файл

@ -97,8 +97,7 @@ func NewCachePool(name string, rowCacheConfig RowCacheConfig, queryTimeout time.
func (cp *CachePool) Open() {
if cp.rowCacheConfig.Binary == "" {
log.Infof("rowcache not enabled")
return
panic(NewTabletError(FATAL, "rowcache binary not specified"))
}
cp.startMemcache()
log.Infof("rowcache is enabled")

Просмотреть файл

@ -78,6 +78,9 @@ type CacheInvalidator interface {
Delete(key string) bool
}
// NewQueryEngine creates a new QueryEngine.
// This is a singleton class.
// You must call this only once.
func NewQueryEngine(config Config) *QueryEngine {
qe := &QueryEngine{}
@ -115,14 +118,18 @@ func NewQueryEngine(config Config) *QueryEngine {
return qe
}
func (qe *QueryEngine) Open(info *mysql.ConnectionParams, schemaOverrides []SchemaOverride, qrs *QueryRules) {
func (qe *QueryEngine) Open(info *mysql.ConnectionParams, schemaOverrides []SchemaOverride, qrs *QueryRules, enableRowcache bool) {
// Wait for Close, in case it's running
qe.mu.Lock()
defer qe.mu.Unlock()
connFactory := GenericConnectionCreator(info)
qe.cachePool.Open()
if enableRowcache {
qe.cachePool.Open()
} else {
log.Infof("rowcache not enabled")
}
start := time.Now()
qe.schemaInfo.Open(connFactory, schemaOverrides, qe.cachePool, qrs)
log.Infof("Time taken to load the schema: %v", time.Now().Sub(start))

Просмотреть файл

@ -13,6 +13,7 @@ import (
log "github.com/golang/glog"
mproto "github.com/youtube/vitess/go/mysql/proto"
"github.com/youtube/vitess/go/vt/dbconfigs"
"github.com/youtube/vitess/go/vt/mysqlctl"
"github.com/youtube/vitess/go/vt/tabletserver/proto"
)
@ -146,9 +147,9 @@ func RegisterQueryService() {
// AllowQueries can take an indefinite amount of time to return because
// it keeps retrying until it obtains a valid connection to the database.
func AllowQueries(dbconfig *dbconfigs.DBConfig, schemaOverrides []SchemaOverride, qrs *QueryRules) {
func AllowQueries(dbconfig *dbconfigs.DBConfig, schemaOverrides []SchemaOverride, qrs *QueryRules, mysqld *mysqlctl.Mysqld) {
defer logError()
SqlQueryRpcService.allowQueries(dbconfig, schemaOverrides, qrs)
SqlQueryRpcService.allowQueries(dbconfig, schemaOverrides, qrs, mysqld)
}
// DisallowQueries can take a long time to return (not indefinite) because

Просмотреть файл

@ -6,7 +6,7 @@ package tabletserver
import (
"fmt"
"io"
"sync"
"time"
log "github.com/golang/glog"
@ -16,109 +16,112 @@ import (
"github.com/youtube/vitess/go/tb"
"github.com/youtube/vitess/go/vt/binlog"
blproto "github.com/youtube/vitess/go/vt/binlog/proto"
"github.com/youtube/vitess/go/vt/mysqlctl"
"github.com/youtube/vitess/go/vt/sqlparser"
"github.com/youtube/vitess/go/vt/tabletserver/proto"
)
const (
RCINV_DISABLED int64 = iota
RCINV_ENABLED
RCINV_SHUTTING_DOWN
)
type RowcacheInvalidator struct {
qe *QueryEngine
svm sync2.ServiceManager
var rcinvStateNames = map[int64]string{
RCINV_DISABLED: "Disabled",
RCINV_ENABLED: "Enabled",
RCINV_SHUTTING_DOWN: "ShuttingDown",
}
type InvalidationProcessor struct {
// mu mainly protects access to evs by Open and Close.
mu sync.Mutex
dbname string
mysqld *mysqlctl.Mysqld
evs *binlog.EventStreamer
GroupId sync2.AtomicInt64
state sync2.AtomicInt64
}
var CacheInvalidationProcessor *InvalidationProcessor
func init() {
CacheInvalidationProcessor = new(InvalidationProcessor)
stats.Publish("RowcacheInvalidationState", stats.StringFunc(func() string {
return rcinvStateNames[CacheInvalidationProcessor.state.Get()]
}))
stats.Publish("RowcacheInvalidationCheckPoint", stats.IntFunc(func() int64 {
return CacheInvalidationProcessor.GroupId.Get()
}))
// NewRowcacheInvalidator creates a new RowcacheInvalidator.
// Just like QueryEngine, this is a singleton class.
// You must call this only once.
func NewRowcacheInvalidator(qe *QueryEngine) *RowcacheInvalidator {
rci := &RowcacheInvalidator{qe: qe}
stats.Publish("RowcacheInvalidatorState", stats.StringFunc(rci.svm.StateName))
stats.Publish("RowcacheInvalidatorPosition", stats.IntFunc(rci.GroupId.Get))
return rci
}
func StartRowCacheInvalidation() {
go CacheInvalidationProcessor.runInvalidationLoop()
}
func StopRowCacheInvalidation() {
CacheInvalidationProcessor.stopRowCacheInvalidation()
}
func (rowCache *InvalidationProcessor) stopRowCacheInvalidation() {
if !rowCache.state.CompareAndSwap(RCINV_ENABLED, RCINV_SHUTTING_DOWN) {
log.Infof("Rowcache invalidator is not enabled")
}
}
func (rowCache *InvalidationProcessor) runInvalidationLoop() {
if !IsCachePoolAvailable() {
log.Infof("Rowcache is not enabled. Not running invalidator.")
return
}
if !rowCache.state.CompareAndSwap(RCINV_DISABLED, RCINV_ENABLED) {
log.Infof("Rowcache invalidator already running")
return
}
defer func() {
rowCache.state.Set(RCINV_DISABLED)
}()
groupId, err := binlog.GetReplicationPosition()
// Open runs the invalidation loop.
func (rci *RowcacheInvalidator) Open(dbname string, mysqld *mysqlctl.Mysqld) {
rp, err := mysqld.MasterStatus()
if err != nil {
log.Errorf("Rowcache invalidator could not start: cannot determine replication position: %v", err)
return
panic(NewTabletError(FATAL, "Rowcache invalidator aborting: cannot determine replication position: %v", err))
}
if mysqld.Cnf().BinLogPath == "" {
panic(NewTabletError(FATAL, "Rowcache invalidator aborting: binlog path not specified"))
}
rowCache.GroupId.Set(groupId)
log.Infof("Starting rowcache invalidator at: %d", groupId)
ok := rci.svm.Go(func(_ *sync2.ServiceManager) {
rci.mu.Lock()
rci.dbname = dbname
rci.mysqld = mysqld
rci.evs = binlog.NewEventStreamer(dbname, mysqld.Cnf().BinLogPath)
rci.GroupId.Set(rp.MasterLogGroupId)
rci.mu.Unlock()
rci.run()
rci.mu.Lock()
rci.evs = nil
rci.mu.Unlock()
})
if ok {
log.Infof("Rowcache invalidator starting, dbname: %s, path: %s, logfile: %s, position: %d", dbname, mysqld.Cnf().BinLogPath, rp.MasterLogFile, rp.MasterLogPosition)
} else {
log.Infof("Rowcache invalidator already running")
}
}
// Close terminates the invalidation loop. It returns only of the
// loop has terminated.
func (rci *RowcacheInvalidator) Close() {
rci.mu.Lock()
defer rci.mu.Unlock()
if rci.evs == nil {
log.Infof("Rowcache is not running")
return
}
rci.evs.Stop()
rci.evs = nil
}
func (rci *RowcacheInvalidator) run() {
for {
// We wrap this code in a func so we can catch all panics.
// If an error is returned, we log it, wait 1 second, and retry.
// Rowcache can only be stopped by calling StopRowCacheInvalidation.
// This loop can only be stopped by calling Close.
err := func() (inner error) {
defer func() {
if x := recover(); x != nil {
inner = fmt.Errorf("%v: uncaught panic:\n%s", x, tb.Stack(4))
}
}()
req := &blproto.UpdateStreamRequest{GroupId: rowCache.GroupId.Get()}
return binlog.ServeUpdateStream(req, func(reply *blproto.StreamEvent) error {
return rowCache.processEvent(reply)
rp, err := rci.mysqld.BinlogInfo(rci.GroupId.Get())
if err != nil {
return err
}
return rci.evs.Stream(rp.MasterLogFile, int64(rp.MasterLogPosition), func(reply *blproto.StreamEvent) error {
return rci.processEvent(reply)
})
}()
if err == nil {
break
}
log.Errorf("binlog.ServeUpdateStream returned err '%v'", err.Error())
log.Errorf("binlog.ServeUpdateStream returned err '%v', retrying in 1 second.", err.Error())
internalErrors.Add("Invalidation", 1)
time.Sleep(1 * time.Second)
}
log.Infof("Rowcache invalidator stopped")
}
func (rowCache *InvalidationProcessor) processEvent(event *blproto.StreamEvent) error {
if rowCache.state.Get() != RCINV_ENABLED {
return io.EOF
}
func (rci *RowcacheInvalidator) processEvent(event *blproto.StreamEvent) error {
switch event.Category {
case "DDL":
InvalidateForDDL(&proto.DDLInvalidate{DDL: event.Sql})
case "DML":
rowCache.handleDmlEvent(event)
rci.handleDmlEvent(event)
case "ERR":
dbname, err := sqlparser.GetDBName(event.Sql)
// TODO(sougou): Also check if dbname matches current db name
@ -130,7 +133,7 @@ func (rowCache *InvalidationProcessor) processEvent(event *blproto.StreamEvent)
infoErrors.Add("Invalidation", 1)
}
case "POS":
rowCache.GroupId.Set(event.GroupId)
rci.GroupId.Set(event.GroupId)
default:
log.Errorf("unknown event: %#v", event)
internalErrors.Add("Invalidation", 1)
@ -138,7 +141,7 @@ func (rowCache *InvalidationProcessor) processEvent(event *blproto.StreamEvent)
return nil
}
func (rowCache *InvalidationProcessor) handleDmlEvent(event *blproto.StreamEvent) {
func (rci *RowcacheInvalidator) handleDmlEvent(event *blproto.StreamEvent) {
dml := new(proto.DmlType)
dml.Table = event.TableName
dml.Keys = make([]string, 0, len(event.PKValues))

Просмотреть файл

@ -19,6 +19,7 @@ import (
"github.com/youtube/vitess/go/sync2"
"github.com/youtube/vitess/go/tb"
"github.com/youtube/vitess/go/vt/dbconfigs"
"github.com/youtube/vitess/go/vt/mysqlctl"
"github.com/youtube/vitess/go/vt/tabletserver/proto"
)
@ -66,6 +67,7 @@ type SqlQuery struct {
state sync2.AtomicInt64
qe *QueryEngine
rci *RowcacheInvalidator
sessionId int64
dbconfig *dbconfigs.DBConfig
}
@ -73,6 +75,7 @@ type SqlQuery struct {
func NewSqlQuery(config Config) *SqlQuery {
sq := &SqlQuery{}
sq.qe = NewQueryEngine(config)
sq.rci = NewRowcacheInvalidator(sq.qe)
stats.PublishJSONFunc("Voltron", sq.statsJSON)
stats.Publish("TabletState", stats.IntFunc(sq.state.Get))
stats.Publish("TabletStateName", stats.StringFunc(sq.GetState))
@ -90,12 +93,13 @@ func (sq *SqlQuery) setState(state int64) {
sq.state.Set(state)
}
func (sq *SqlQuery) allowQueries(dbconfig *dbconfigs.DBConfig, schemaOverrides []SchemaOverride, qrs *QueryRules) {
func (sq *SqlQuery) allowQueries(dbconfig *dbconfigs.DBConfig, schemaOverrides []SchemaOverride, qrs *QueryRules, mysqld *mysqlctl.Mysqld) {
sq.statemu.Lock()
defer sq.statemu.Unlock()
v := sq.state.Get()
switch v {
case CONNECTING, ABORT, SERVING:
sq.statemu.Unlock()
log.Infof("Ignoring allowQueries request, current state: %v", v)
return
case INITIALIZING, SHUTTING_DOWN:
@ -103,38 +107,34 @@ func (sq *SqlQuery) allowQueries(dbconfig *dbconfigs.DBConfig, schemaOverrides [
}
// state is NOT_SERVING
sq.setState(CONNECTING)
sq.statemu.Unlock()
// Try connecting. disallowQueries can change the state to ABORT during this time.
waitTime := time.Second
for {
params, err := dbconfigs.MysqlParams(&dbconfig.ConnectionParams)
if err == nil {
c, err := mysql.Connect(params)
// When this function exits, state can be CONNECTING or ABORT
func() {
sq.statemu.Unlock()
defer sq.statemu.Lock()
waitTime := time.Second
// disallowQueries can change the state to ABORT during this time.
for sq.state.Get() != ABORT {
params, err := dbconfigs.MysqlParams(&dbconfig.ConnectionParams)
if err == nil {
c.Close()
break
c, err := mysql.Connect(params)
if err == nil {
c.Close()
break
}
log.Errorf("mysql.Connect() error: %v", err)
} else {
log.Errorf("dbconfigs.MysqlParams error: %v", err)
}
time.Sleep(waitTime)
// Cap at 32 seconds
if waitTime < 30*time.Second {
waitTime = waitTime * 2
}
log.Errorf("mysql.Connect() error: %v", err)
} else {
log.Errorf("dbconfigs.MysqlParams error: %v", err)
}
time.Sleep(waitTime)
// Cap at 32 seconds
if waitTime < 30*time.Second {
waitTime = waitTime * 2
}
if sq.state.Get() == ABORT {
// Exclusive transition. No need to lock statemu.
sq.setState(NOT_SERVING)
log.Infof("allowQueries aborting")
return
}
}
}()
// Connection successful. Keep statemu locked.
sq.statemu.Lock()
defer sq.statemu.Unlock()
if sq.state.Get() == ABORT {
sq.setState(NOT_SERVING)
log.Infof("allowQueries aborting")
@ -146,13 +146,17 @@ func (sq *SqlQuery) allowQueries(dbconfig *dbconfigs.DBConfig, schemaOverrides [
if x := recover(); x != nil {
log.Errorf("%s", x.(*TabletError).Message)
sq.qe.Close()
sq.rci.Close()
sq.setState(NOT_SERVING)
return
}
sq.setState(SERVING)
}()
sq.qe.Open(&dbconfig.ConnectionParams, schemaOverrides, qrs)
sq.qe.Open(&dbconfig.ConnectionParams, schemaOverrides, qrs, dbconfig.EnableRowcache)
if dbconfig.EnableRowcache && dbconfig.EnableInvalidator {
sq.rci.Open(dbconfig.DbName, mysqld)
}
sq.dbconfig = dbconfig
sq.sessionId = Rand()
log.Infof("Session id: %d", sq.sessionId)
@ -161,6 +165,7 @@ func (sq *SqlQuery) allowQueries(dbconfig *dbconfigs.DBConfig, schemaOverrides [
func (sq *SqlQuery) disallowQueries() {
sq.statemu.Lock()
defer sq.statemu.Unlock()
switch sq.state.Get() {
case CONNECTING:
sq.setState(ABORT)
@ -178,6 +183,7 @@ func (sq *SqlQuery) disallowQueries() {
log.Infof("Stopping query service: %d", sq.sessionId)
sq.qe.Close()
sq.rci.Close()
sq.sessionId = 0
sq.dbconfig = &dbconfigs.DBConfig{}
}

Просмотреть файл

@ -42,7 +42,8 @@ func InitAgent(
dbcfgs *dbconfigs.DBConfigs,
mycnf *mysqlctl.Mycnf,
port, securePort int,
overridesFile string) (agent *tabletmanager.ActionAgent, err error) {
overridesFile string,
) (agent *tabletmanager.ActionAgent, err error) {
schemaOverrides := loadSchemaOverrides(overridesFile)
topoServer := topo.GetServer()
@ -98,6 +99,11 @@ func InitAgent(
}
dbcfgs.App.Keyspace = newTablet.Keyspace
dbcfgs.App.Shard = newTablet.Shard
if newTablet.Type != topo.TYPE_MASTER {
dbcfgs.App.EnableInvalidator = true
} else {
dbcfgs.App.EnableInvalidator = false
}
// Transitioning from replica to master, first disconnect
// existing connections. "false" indicateds that clients must
// re-resolve their endpoint before reconnecting.
@ -115,16 +121,12 @@ func InitAgent(
qrs.Add(qr)
}
}
ts.AllowQueries(&dbcfgs.App, schemaOverrides, qrs)
ts.AllowQueries(&dbcfgs.App, schemaOverrides, qrs, mysqld)
// Disable before enabling to force existing streams to stop.
binlog.DisableUpdateStreamService()
binlog.EnableUpdateStreamService(dbcfgs)
if newTablet.Type != topo.TYPE_MASTER {
ts.StartRowCacheInvalidation()
}
} else {
ts.DisallowQueries()
ts.StopRowCacheInvalidation()
binlog.DisableUpdateStreamService()
}

Просмотреть файл

@ -313,6 +313,7 @@ class VtoccTestEnv(TestEnv):
memcache = self.mysqldir+"/memcache.sock"
occ_args.extend(["-rowcache-bin", "memcached"])
occ_args.extend(["-rowcache-socket", memcache])
occ_args.extend(["-enable-rowcache"])
self.vtstderr = open("/tmp/vtocc_stderr.log", "a+")
self.vtstdout = open("/tmp/vtocc_stdout.log", "a+")

Просмотреть файл

@ -127,7 +127,7 @@ class RowCacheInvalidator(unittest.TestCase):
invalidatorStats = self.replica_vars()
logging.debug("Invalidations %d InvalidatorStats %s" %
(invalidations,
invalidatorStats['RowcacheInvalidationCheckPoint']))
invalidatorStats['RowcacheInvalidatorPosition']))
self.assertTrue(invalidations > 0, "Invalidations are not flowing through.")
res = replica_tablet.mquery('vt_test_keyspace',
@ -220,8 +220,8 @@ class RowCacheInvalidator(unittest.TestCase):
# check and display some stats
invalidatorStats = self.replica_vars()
logging.debug("invalidatorStats %s" %
invalidatorStats['RowcacheInvalidationCheckPoint'])
self.assertEqual(invalidatorStats["RowcacheInvalidationState"], "Enabled",
invalidatorStats['RowcacheInvalidatorPosition'])
self.assertEqual(invalidatorStats["RowcacheInvalidatorState"], "Running",
"Row-cache invalidator should be enabled")
def test_cache_hit(self):
@ -256,17 +256,17 @@ class RowCacheInvalidator(unittest.TestCase):
time.sleep(0.1)
invStats_after = self.replica_vars()
logging.debug("Got state %s" %
invStats_after["RowcacheInvalidationState"])
if invStats_after["RowcacheInvalidationState"] == "Disabled":
invStats_after["RowcacheInvalidatorState"])
if invStats_after["RowcacheInvalidatorState"] == "Stopped":
break
# check all data is right
inv_after = self.replica_stats()['Totals']['Invalidations']
invStats_after = self.replica_vars()
logging.debug("Tablet Replica->Spare\n\tBefore: Invalidations: %d InvalidatorStats %s\n\tAfter: Invalidations: %d InvalidatorStats %s" % (inv_before, invStats_before['RowcacheInvalidationCheckPoint'], inv_after, invStats_after['RowcacheInvalidationCheckPoint']))
logging.debug("Tablet Replica->Spare\n\tBefore: Invalidations: %d InvalidatorStats %s\n\tAfter: Invalidations: %d InvalidatorStats %s" % (inv_before, invStats_before['RowcacheInvalidatorPosition'], inv_after, invStats_after['RowcacheInvalidatorPosition']))
self.assertEqual(inv_after, 0,
"Row-cache invalid. should be disabled, no invalidations")
self.assertEqual(invStats_after["RowcacheInvalidationState"], "Disabled",
self.assertEqual(invStats_after["RowcacheInvalidatorState"], "Stopped",
"Row-cache invalidator should be disabled")
# and restore the type

Просмотреть файл

@ -301,6 +301,7 @@ class Tablet(object):
args.extend(["-rowcache-bin", "memcached"])
memcache_socket = os.path.join(self.tablet_dir, "memcache.sock")
args.extend(["-rowcache-socket", memcache_socket])
args.extend(["-enable-rowcache"])
if auth:
args.extend(['-auth-credentials', os.path.join(environment.vttop, 'test', 'test_data', 'authcredentials_test.json')])