зеркало из https://github.com/github/vitess-gh.git
tabletserver: Make ReloadSchema synchronous.
This commit is contained in:
Родитель
cc3f0c9550
Коммит
b4f4f8ddc9
|
@ -9,6 +9,8 @@ import (
|
|||
"fmt"
|
||||
"net/url"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/youtube/vitess/go/streamlog"
|
||||
"github.com/youtube/vitess/go/vt/dbconfigs"
|
||||
"github.com/youtube/vitess/go/vt/mysqlctl"
|
||||
|
@ -162,7 +164,7 @@ type Controller interface {
|
|||
ClearQueryPlanCache()
|
||||
|
||||
// ReloadSchema makes the quey service reload its schema cache
|
||||
ReloadSchema()
|
||||
ReloadSchema(ctx context.Context) error
|
||||
|
||||
// RegisterQueryRuleSource adds a query rule source
|
||||
RegisterQueryRuleSource(ruleSource string)
|
||||
|
|
|
@ -172,7 +172,7 @@ func (dbc *DBConn) Recycle() {
|
|||
func (dbc *DBConn) Kill(reason string) error {
|
||||
dbc.queryServiceStats.KillStats.Add("Queries", 1)
|
||||
log.Infof("Due to %s, killing query %s", reason, dbc.Current())
|
||||
killConn, err := dbc.pool.dbaPool.Get(0)
|
||||
killConn, err := dbc.pool.dbaPool.Get(context.TODO())
|
||||
if err != nil {
|
||||
log.Warningf("Failed to get conn from dba pool: %v", err)
|
||||
// TODO(aaijazi): Find the right error code for an internal error that we don't want to retry
|
||||
|
|
|
@ -14,6 +14,8 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/youtube/vitess/go/mysql"
|
||||
"github.com/youtube/vitess/go/sqltypes"
|
||||
"github.com/youtube/vitess/go/vt/tabletserver/endtoend/framework"
|
||||
|
@ -231,7 +233,7 @@ func TestSchemaReload(t *testing.T) {
|
|||
_, _ = conn.ExecuteFetch("drop table vitess_temp", 10, false)
|
||||
conn.Close()
|
||||
}()
|
||||
framework.Server.ReloadSchema()
|
||||
framework.Server.ReloadSchema(context.Background())
|
||||
client := framework.NewClient()
|
||||
waitTime := 50 * time.Millisecond
|
||||
for i := 0; i < 10; i++ {
|
||||
|
|
|
@ -21,6 +21,7 @@ import (
|
|||
"github.com/youtube/vitess/go/stats"
|
||||
"github.com/youtube/vitess/go/timer"
|
||||
"github.com/youtube/vitess/go/trace"
|
||||
"github.com/youtube/vitess/go/vt/concurrency"
|
||||
querypb "github.com/youtube/vitess/go/vt/proto/query"
|
||||
vtrpcpb "github.com/youtube/vitess/go/vt/proto/vtrpc"
|
||||
"github.com/youtube/vitess/go/vt/schema"
|
||||
|
@ -90,6 +91,10 @@ type SchemaInfo struct {
|
|||
lastChange int64
|
||||
reloadTime time.Duration
|
||||
|
||||
// actionMutex serializes all calls to state-altering methods:
|
||||
// Open, Close, Reload, DropTable, CreateOrUpdateTable.
|
||||
actionMutex sync.Mutex
|
||||
|
||||
// The following vars are either read-only or have
|
||||
// their own synchronization.
|
||||
queries *cache.LRUCache
|
||||
|
@ -144,6 +149,9 @@ func NewSchemaInfo(
|
|||
|
||||
// Open initializes the current SchemaInfo for service by loading the necessary info from the specified database.
|
||||
func (si *SchemaInfo) Open(dbaParams *sqldb.ConnParams, strictMode bool) {
|
||||
si.actionMutex.Lock()
|
||||
defer si.actionMutex.Unlock()
|
||||
|
||||
ctx := context.Background()
|
||||
si.connPool.Open(dbaParams, dbaParams)
|
||||
// Get time first because it needs a connection from the pool.
|
||||
|
@ -183,7 +191,8 @@ func (si *SchemaInfo) Open(dbaParams *sqldb.ConnParams, strictMode bool) {
|
|||
row[3].String(), // table_comment
|
||||
)
|
||||
if err != nil {
|
||||
si.recordSchemaError(err, tableName)
|
||||
si.queryServiceStats.InternalErrors.Add("Schema", 1)
|
||||
log.Errorf("SchemaInfo.Open: failed to create TableInfo for table %s: %v", tableName, err)
|
||||
// Skip over the table that had an error and move on to the next one
|
||||
return
|
||||
}
|
||||
|
@ -207,19 +216,20 @@ func (si *SchemaInfo) Open(dbaParams *sqldb.ConnParams, strictMode bool) {
|
|||
}()
|
||||
// Clear is not really needed. Doing it for good measure.
|
||||
si.queries.Clear()
|
||||
si.ticks.Start(si.Reload)
|
||||
}
|
||||
|
||||
// Records an error that occurs when getting the schema for a table.
|
||||
func (si *SchemaInfo) recordSchemaError(err error, tableName string) {
|
||||
terr := PrefixTabletError(vtrpcpb.ErrorCode_INTERNAL_ERROR, err,
|
||||
fmt.Sprintf("Could not load schema for table %s: ", tableName))
|
||||
log.Error(terr)
|
||||
si.queryServiceStats.InternalErrors.Add("Schema", 1)
|
||||
si.ticks.Start(func() {
|
||||
if err := si.Reload(ctx); err != nil {
|
||||
log.Errorf("periodic schema reload failed: %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// Close shuts down SchemaInfo. It can be re-opened after Close.
|
||||
func (si *SchemaInfo) Close() {
|
||||
// Make sure there are no reloads going on, as they depend on SchemaInfo
|
||||
// staying open during the reload.
|
||||
si.actionMutex.Lock()
|
||||
defer si.actionMutex.Unlock()
|
||||
|
||||
si.ticks.Stop()
|
||||
si.connPool.Close()
|
||||
si.queries.Clear()
|
||||
|
@ -229,11 +239,28 @@ func (si *SchemaInfo) Close() {
|
|||
si.tables = nil
|
||||
}
|
||||
|
||||
// Reload reloads the schema info from the db. Any tables that have changed
|
||||
// since the last load are updated.
|
||||
func (si *SchemaInfo) Reload() {
|
||||
// IsClosed returns true if the SchemaInfo is closed.
|
||||
func (si *SchemaInfo) IsClosed() bool {
|
||||
si.mu.Lock()
|
||||
defer si.mu.Unlock()
|
||||
return si.tables == nil
|
||||
}
|
||||
|
||||
// Reload reloads the schema info from the db.
|
||||
// Any tables that have changed since the last load are updated.
|
||||
// This is a no-op if the SchemaInfo is closed.
|
||||
func (si *SchemaInfo) Reload(ctx context.Context) error {
|
||||
defer logError(si.queryServiceStats)
|
||||
ctx := context.Background()
|
||||
|
||||
// Reload() gets called both from the ticker, and from external RPCs.
|
||||
// We don't want them to race over writing data that was read concurrently.
|
||||
si.actionMutex.Lock()
|
||||
defer si.actionMutex.Unlock()
|
||||
|
||||
if si.IsClosed() {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get time first because it needs a connection from the pool.
|
||||
curTime := si.mysqlTime(ctx)
|
||||
|
||||
|
@ -245,33 +272,34 @@ func (si *SchemaInfo) Reload() {
|
|||
tableData, err = conn.Exec(ctx, baseShowTables, maxTableCount, false)
|
||||
}()
|
||||
if err != nil {
|
||||
log.Warningf("Could not get table list for reload: %v", err)
|
||||
return
|
||||
return fmt.Errorf("could not get table list for reload: %v", err)
|
||||
}
|
||||
log.Infof("Reloading schema")
|
||||
|
||||
// Reload any tables that have changed. We try every table even if some fail,
|
||||
// but we return success only if all tables succeed.
|
||||
// The following section requires us to hold mu.
|
||||
func() {
|
||||
si.mu.Lock()
|
||||
defer si.mu.Unlock()
|
||||
for _, row := range tableData.Rows {
|
||||
tableName := row[0].String()
|
||||
createTime, _ := row[2].ParseInt64()
|
||||
// Check if we know about the table or it has been recreated.
|
||||
if _, ok := si.tables[tableName]; !ok || createTime >= si.lastChange {
|
||||
func() {
|
||||
// Unlock so CreateOrUpdateTable can lock.
|
||||
si.mu.Unlock()
|
||||
defer si.mu.Lock()
|
||||
log.Infof("Reloading: %s", tableName)
|
||||
si.CreateOrUpdateTable(ctx, tableName)
|
||||
}()
|
||||
continue
|
||||
}
|
||||
// Only update table_rows, data_length, index_length
|
||||
si.tables[tableName].SetMysqlStats(row[4], row[5], row[6], row[7])
|
||||
errs := &concurrency.AllErrorRecorder{}
|
||||
si.mu.Lock()
|
||||
defer si.mu.Unlock()
|
||||
for _, row := range tableData.Rows {
|
||||
tableName := row[0].String()
|
||||
createTime, _ := row[2].ParseInt64()
|
||||
// Check if we know about the table or it has been recreated.
|
||||
if _, ok := si.tables[tableName]; !ok || createTime >= si.lastChange {
|
||||
func() {
|
||||
// Unlock so CreateOrUpdateTable can lock.
|
||||
si.mu.Unlock()
|
||||
defer si.mu.Lock()
|
||||
log.Infof("Reloading schema for table: %s", tableName)
|
||||
errs.RecordError(si.createOrUpdateTableLocked(ctx, tableName))
|
||||
}()
|
||||
continue
|
||||
}
|
||||
si.lastChange = curTime
|
||||
}()
|
||||
// Only update table_rows, data_length, index_length
|
||||
si.tables[tableName].SetMysqlStats(row[4], row[5], row[6], row[7])
|
||||
}
|
||||
si.lastChange = curTime
|
||||
return errs.Error()
|
||||
}
|
||||
|
||||
func (si *SchemaInfo) mysqlTime(ctx context.Context) int64 {
|
||||
|
@ -291,29 +319,31 @@ func (si *SchemaInfo) mysqlTime(ctx context.Context) int64 {
|
|||
return t
|
||||
}
|
||||
|
||||
// safe to call this if Close has been called, as si.ticks will be stopped
|
||||
// and won't fire
|
||||
func (si *SchemaInfo) triggerReload() {
|
||||
si.ticks.Trigger()
|
||||
}
|
||||
|
||||
// ClearQueryPlanCache should be called if query plan cache is potentially obsolete
|
||||
func (si *SchemaInfo) ClearQueryPlanCache() {
|
||||
si.queries.Clear()
|
||||
}
|
||||
|
||||
// CreateOrUpdateTable must be called if a DDL was applied to that table.
|
||||
func (si *SchemaInfo) CreateOrUpdateTable(ctx context.Context, tableName string) {
|
||||
func (si *SchemaInfo) CreateOrUpdateTable(ctx context.Context, tableName string) error {
|
||||
si.actionMutex.Lock()
|
||||
defer si.actionMutex.Unlock()
|
||||
return si.createOrUpdateTableLocked(ctx, tableName)
|
||||
}
|
||||
|
||||
// createOrUpdateTableLocked must only be called while holding actionMutex.
|
||||
func (si *SchemaInfo) createOrUpdateTableLocked(ctx context.Context, tableName string) error {
|
||||
conn := getOrPanic(ctx, si.connPool)
|
||||
defer conn.Recycle()
|
||||
tableData, err := conn.Exec(ctx, fmt.Sprintf("%s and table_name = '%s'", baseShowTables, tableName), 1, false)
|
||||
if err != nil {
|
||||
si.recordSchemaError(err, tableName)
|
||||
return
|
||||
si.queryServiceStats.InternalErrors.Add("Schema", 1)
|
||||
return PrefixTabletError(vtrpcpb.ErrorCode_INTERNAL_ERROR, err,
|
||||
fmt.Sprintf("CreateOrUpdateTable: information_schema query failed for table %s: ", tableName))
|
||||
}
|
||||
if len(tableData.Rows) != 1 {
|
||||
// This can happen if DDLs race with each other.
|
||||
return
|
||||
return nil
|
||||
}
|
||||
row := tableData.Rows[0]
|
||||
tableInfo, err := NewTableInfo(
|
||||
|
@ -323,8 +353,9 @@ func (si *SchemaInfo) CreateOrUpdateTable(ctx context.Context, tableName string)
|
|||
row[3].String(), // table_comment
|
||||
)
|
||||
if err != nil {
|
||||
si.recordSchemaError(err, tableName)
|
||||
return
|
||||
si.queryServiceStats.InternalErrors.Add("Schema", 1)
|
||||
return PrefixTabletError(vtrpcpb.ErrorCode_INTERNAL_ERROR, err,
|
||||
fmt.Sprintf("CreateOrUpdateTable: failed to create TableInfo for table %s: ", tableName))
|
||||
}
|
||||
// table_rows, data_length, index_length
|
||||
tableInfo.SetMysqlStats(row[4], row[5], row[6], row[7])
|
||||
|
@ -347,10 +378,14 @@ func (si *SchemaInfo) CreateOrUpdateTable(ctx context.Context, tableName string)
|
|||
case schema.Sequence:
|
||||
log.Infof("Initialized sequence: %s", tableName)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// DropTable must be called if a table was dropped.
|
||||
func (si *SchemaInfo) DropTable(tableName string) {
|
||||
si.actionMutex.Lock()
|
||||
defer si.actionMutex.Unlock()
|
||||
|
||||
si.mu.Lock()
|
||||
defer si.mu.Unlock()
|
||||
|
||||
|
|
|
@ -143,6 +143,7 @@ func TestSchemaInfoOpenFailedDueToTableInfoErr(t *testing.T) {
|
|||
|
||||
func TestSchemaInfoReload(t *testing.T) {
|
||||
db := fakesqldb.Register()
|
||||
ctx := context.Background()
|
||||
for query, result := range getSchemaInfoTestSupportedQueries() {
|
||||
db.AddQuery(query, result)
|
||||
}
|
||||
|
@ -157,7 +158,7 @@ func TestSchemaInfoReload(t *testing.T) {
|
|||
if tableInfo != nil {
|
||||
t.Fatalf("table: %s exists; expecting nil", newTable)
|
||||
}
|
||||
schemaInfo.Reload()
|
||||
schemaInfo.Reload(ctx)
|
||||
tableInfo = schemaInfo.GetTable(newTable)
|
||||
if tableInfo != nil {
|
||||
t.Fatalf("table: %s exists; expecting nil", newTable)
|
||||
|
@ -193,7 +194,7 @@ func TestSchemaInfoReload(t *testing.T) {
|
|||
Rows: [][]sqltypes.Value{createTestTableShowIndex("pk")},
|
||||
})
|
||||
|
||||
schemaInfo.Reload()
|
||||
schemaInfo.Reload(ctx)
|
||||
tableInfo = schemaInfo.GetTable(newTable)
|
||||
if tableInfo != nil {
|
||||
t.Fatalf("table: %s exists; expecting nil", newTable)
|
||||
|
@ -210,7 +211,9 @@ func TestSchemaInfoReload(t *testing.T) {
|
|||
if tableInfo != nil {
|
||||
t.Fatalf("table: %s exists; expecting nil", newTable)
|
||||
}
|
||||
schemaInfo.Reload()
|
||||
if err := schemaInfo.Reload(ctx); err != nil {
|
||||
t.Fatalf("schemaInfo.Reload() error: %v", err)
|
||||
}
|
||||
tableInfo = schemaInfo.GetTable(newTable)
|
||||
if tableInfo == nil {
|
||||
t.Fatalf("table: %s should exist", newTable)
|
||||
|
@ -379,6 +382,7 @@ func TestSchemaInfoExportVars(t *testing.T) {
|
|||
|
||||
func TestUpdatedMysqlStats(t *testing.T) {
|
||||
db := fakesqldb.Register()
|
||||
ctx := context.Background()
|
||||
for query, result := range getSchemaInfoTestSupportedQueries() {
|
||||
db.AddQuery(query, result)
|
||||
}
|
||||
|
@ -421,7 +425,9 @@ func TestUpdatedMysqlStats(t *testing.T) {
|
|||
Rows: [][]sqltypes.Value{createTestTableShowIndex("pk")},
|
||||
})
|
||||
|
||||
schemaInfo.Reload()
|
||||
if err := schemaInfo.Reload(ctx); err != nil {
|
||||
t.Fatalf("schemaInfo.Reload() error: %v", err)
|
||||
}
|
||||
tableInfo := schemaInfo.GetTable(tableName)
|
||||
if tableInfo == nil {
|
||||
t.Fatalf("table: %s should exist", tableName)
|
||||
|
@ -437,7 +443,9 @@ func TestUpdatedMysqlStats(t *testing.T) {
|
|||
createTestTableUpdatedStats(tableName),
|
||||
},
|
||||
})
|
||||
schemaInfo.Reload()
|
||||
if err := schemaInfo.Reload(ctx); err != nil {
|
||||
t.Fatalf("schemaInfo.Reload() error: %v", err)
|
||||
}
|
||||
tableInfo = schemaInfo.GetTable(tableName)
|
||||
tr2 := tableInfo.TableRows
|
||||
dl2 := tableInfo.DataLength
|
||||
|
|
|
@ -500,10 +500,9 @@ func (tsv *TabletServer) isMySQLReachable() bool {
|
|||
}
|
||||
|
||||
// ReloadSchema reloads the schema.
|
||||
// If the query service is not running, it's a no-op.
|
||||
func (tsv *TabletServer) ReloadSchema() {
|
||||
func (tsv *TabletServer) ReloadSchema(ctx context.Context) error {
|
||||
defer logError(tsv.qe.queryServiceStats)
|
||||
tsv.qe.schemaInfo.triggerReload()
|
||||
return tsv.qe.schemaInfo.Reload(ctx)
|
||||
}
|
||||
|
||||
// ClearQueryPlanCache clears internal query plan cache
|
||||
|
|
|
@ -8,6 +8,8 @@ package tabletservermock
|
|||
import (
|
||||
"sync"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"github.com/youtube/vitess/go/vt/dbconfigs"
|
||||
"github.com/youtube/vitess/go/vt/mysqlctl"
|
||||
querypb "github.com/youtube/vitess/go/vt/proto/query"
|
||||
|
@ -127,7 +129,8 @@ func (tqsc *Controller) IsHealthy() error {
|
|||
}
|
||||
|
||||
// ReloadSchema is part of the tabletserver.Controller interface
|
||||
func (tqsc *Controller) ReloadSchema() {
|
||||
func (tqsc *Controller) ReloadSchema(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
//ClearQueryPlanCache is part of the tabletserver.Controller interface
|
||||
|
|
Загрузка…
Ссылка в новой задаче