Merge branch 'master' into resharding

This commit is contained in:
Alain Jobart 2015-05-14 11:58:57 -07:00
Родитель 0829bd0c85 aefde8715e
Коммит d2acc23b5a
12 изменённых файлов: 224 добавлений и 59 удалений

Просмотреть файл

@ -9,7 +9,6 @@ import (
"time"
"github.com/youtube/vitess/go/vt/concurrency"
"github.com/youtube/vitess/go/vt/topo"
)
var (
@ -26,11 +25,11 @@ func init() {
type Reporter interface {
// Report returns the replication delay gathered by this
// module (or 0 if it thinks it's not behind), assuming that
// its tablet type is TabletType, and that its query service
// it is a slave type or not, and that its query service
// should be running or not. If Report returns an error it
// implies that the tablet is in a bad shape and not able to
// handle queries.
Report(tabletType topo.TabletType, shouldQueryServiceBeRunning bool) (replicationDelay time.Duration, err error)
Report(isSlaveType, shouldQueryServiceBeRunning bool) (replicationDelay time.Duration, err error)
// HTMLName returns a displayable name for the module.
// Can be used to be displayed in the status page.
@ -38,11 +37,11 @@ type Reporter interface {
}
// FunctionReporter is a function that may act as a Reporter.
type FunctionReporter func(topo.TabletType, bool) (time.Duration, error)
type FunctionReporter func(bool, bool) (time.Duration, error)
// Report implements Reporter.Report
func (fc FunctionReporter) Report(tabletType topo.TabletType, shouldQueryServiceBeRunning bool) (time.Duration, error) {
return fc(tabletType, shouldQueryServiceBeRunning)
func (fc FunctionReporter) Report(isSlaveType, shouldQueryServiceBeRunning bool) (time.Duration, error) {
return fc(isSlaveType, shouldQueryServiceBeRunning)
}
// HTMLName implements Reporter.HTMLName
@ -71,7 +70,7 @@ func NewAggregator() *Aggregator {
// The returned replication delay will be the highest of all the replication
// delays returned by the Reporter implementations (although typically
// only one implementation will actually return a meaningful one).
func (ag *Aggregator) Report(tabletType topo.TabletType, shouldQueryServiceBeRunning bool) (time.Duration, error) {
func (ag *Aggregator) Report(isSlaveType, shouldQueryServiceBeRunning bool) (time.Duration, error) {
var (
wg sync.WaitGroup
rec concurrency.AllErrorRecorder
@ -83,7 +82,7 @@ func (ag *Aggregator) Report(tabletType topo.TabletType, shouldQueryServiceBeRun
wg.Add(1)
go func(name string, rep Reporter) {
defer wg.Done()
replicationDelay, err := rep.Report(tabletType, shouldQueryServiceBeRunning)
replicationDelay, err := rep.Report(isSlaveType, shouldQueryServiceBeRunning)
if err != nil {
rec.RecordError(fmt.Errorf("%v: %v", name, err))
return

Просмотреть файл

@ -4,23 +4,21 @@ import (
"errors"
"testing"
"time"
"github.com/youtube/vitess/go/vt/topo"
)
func TestReporters(t *testing.T) {
ag := NewAggregator()
ag.Register("a", FunctionReporter(func(topo.TabletType, bool) (time.Duration, error) {
ag.Register("a", FunctionReporter(func(bool, bool) (time.Duration, error) {
return 10 * time.Second, nil
}))
ag.Register("b", FunctionReporter(func(topo.TabletType, bool) (time.Duration, error) {
ag.Register("b", FunctionReporter(func(bool, bool) (time.Duration, error) {
return 5 * time.Second, nil
}))
delay, err := ag.Report(topo.TYPE_REPLICA, true)
delay, err := ag.Report(true, true)
if err != nil {
t.Error(err)
@ -29,10 +27,10 @@ func TestReporters(t *testing.T) {
t.Errorf("delay=%v, want 10s", delay)
}
ag.Register("c", FunctionReporter(func(topo.TabletType, bool) (time.Duration, error) {
ag.Register("c", FunctionReporter(func(bool, bool) (time.Duration, error) {
return 0, errors.New("e error")
}))
if _, err := ag.Report(topo.TYPE_REPLICA, false); err == nil {
if _, err := ag.Report(true, false); err == nil {
t.Errorf("ag.Run: expected error")
}

Просмотреть файл

@ -116,9 +116,14 @@ func (hook *Hook) Execute() (result *HookResult) {
// Execute an optional hook, returns a printable error
func (hook *Hook) ExecuteOptional() error {
hr := hook.Execute()
if hr.ExitStatus == HOOK_DOES_NOT_EXIST {
switch hr.ExitStatus {
case HOOK_DOES_NOT_EXIST:
log.Infof("%v hook doesn't exist", hook.Name)
} else if hr.ExitStatus != HOOK_SUCCESS {
case HOOK_VTROOT_ERROR:
log.Infof("VTROOT not set, so %v hook doesn't exist", hook.Name)
case HOOK_SUCCESS:
// nothing to do here
default:
return fmt.Errorf("%v hook failed(%v): %v", hook.Name, hr.ExitStatus, hr.Stderr)
}
return nil

Просмотреть файл

@ -6,7 +6,6 @@ import (
"time"
"github.com/youtube/vitess/go/vt/health"
"github.com/youtube/vitess/go/vt/topo"
)
// mysqlReplicationLag implements health.Reporter
@ -15,8 +14,8 @@ type mysqlReplicationLag struct {
}
// Report is part of the health.Reporter interface
func (mrl *mysqlReplicationLag) Report(tabletType topo.TabletType, shouldQueryServiceBeRunning bool) (time.Duration, error) {
if !topo.IsSlaveType(tabletType) {
func (mrl *mysqlReplicationLag) Report(isSlaveType, shouldQueryServiceBeRunning bool) (time.Duration, error) {
if !isSlaveType {
return 0, nil
}

Просмотреть файл

@ -9,7 +9,6 @@ import (
"encoding/hex"
"fmt"
"regexp"
"sort"
"strings"
"github.com/youtube/vitess/go/jscfg"
@ -17,10 +16,13 @@ import (
)
const (
// TABLE_BASE_TABLE indicates the table type is a base table.
TABLE_BASE_TABLE = "BASE TABLE"
TABLE_VIEW = "VIEW"
// TABLE_VIEW indicates the table type is a view.
TABLE_VIEW = "VIEW"
)
// TableDefinition contains all schema information about a table.
type TableDefinition struct {
Name string // the table name
Schema string // the SQL to run to create the table
@ -32,26 +34,20 @@ type TableDefinition struct {
// be approximate count)
}
// helper methods for sorting
// TableDefinitions is a list of TableDefinition.
type TableDefinitions []*TableDefinition
// Len returns TableDefinitions length.
func (tds TableDefinitions) Len() int {
return len(tds)
}
// Swap used for sorting TableDefinitions.
func (tds TableDefinitions) Swap(i, j int) {
tds[i], tds[j] = tds[j], tds[i]
}
// sort by reverse DataLength
type ByReverseDataLength struct {
TableDefinitions
}
func (bdl ByReverseDataLength) Less(i, j int) bool {
return bdl.TableDefinitions[j].DataLength < bdl.TableDefinitions[i].DataLength
}
// SchemaDefinition defines schema for a certain database.
type SchemaDefinition struct {
// the 'CREATE DATABASE...' statement, with db name as {{.DatabaseName}}
DatabaseSchema string
@ -67,10 +63,6 @@ func (sd *SchemaDefinition) String() string {
return jscfg.ToJSON(sd)
}
func (sd *SchemaDefinition) SortByReverseDataLength() {
sort.Sort(ByReverseDataLength{sd.TableDefinitions})
}
// FilterTables returns a copy which includes only
// whitelisted tables (tables), no blacklisted tables (excludeTables) and optionally views (includeViews).
func (sd *SchemaDefinition) FilterTables(tables, excludeTables []string, includeViews bool) (*SchemaDefinition, error) {
@ -141,6 +133,8 @@ func (sd *SchemaDefinition) FilterTables(tables, excludeTables []string, include
return &copy, nil
}
// GenerateSchemaVersion return a unique schema version string based on
// its TableDefinitions.
func (sd *SchemaDefinition) GenerateSchemaVersion() {
hasher := md5.New()
for _, td := range sd.TableDefinitions {
@ -151,6 +145,7 @@ func (sd *SchemaDefinition) GenerateSchemaVersion() {
sd.Version = hex.EncodeToString(hasher.Sum(nil))
}
// GetTable returns TableDefinition for a given table name.
func (sd *SchemaDefinition) GetTable(table string) (td *TableDefinition, ok bool) {
for _, td := range sd.TableDefinitions {
if td.Name == table {
@ -186,9 +181,16 @@ func (sd *SchemaDefinition) ToSQLStrings() []string {
return append(sqlStrings, createViewSql...)
}
// generates a report on what's different between two SchemaDefinition
// for now, we skip the VIEW entirely.
// DiffSchema generates a report on what's different between two SchemaDefinitions
// including views.
func DiffSchema(leftName string, left *SchemaDefinition, rightName string, right *SchemaDefinition, er concurrency.ErrorRecorder) {
if left == nil && right == nil {
return
}
if left == nil || right == nil {
er.RecordError(fmt.Errorf("%v and %v are different, %s: %v, %s: %v", leftName, rightName, leftName, left, rightName, right))
return
}
if left.DatabaseSchema != right.DatabaseSchema {
er.RecordError(fmt.Errorf("%v and %v don't agree on database creation command:\n%v\n differs from:\n%v", leftName, rightName, left.DatabaseSchema, right.DatabaseSchema))
}
@ -196,16 +198,6 @@ func DiffSchema(leftName string, left *SchemaDefinition, rightName string, right
leftIndex := 0
rightIndex := 0
for leftIndex < len(left.TableDefinitions) && rightIndex < len(right.TableDefinitions) {
// skip views
if left.TableDefinitions[leftIndex].Type == TABLE_VIEW {
leftIndex++
continue
}
if right.TableDefinitions[rightIndex].Type == TABLE_VIEW {
rightIndex++
continue
}
// extra table on the left side
if left.TableDefinitions[leftIndex].Name < right.TableDefinitions[rightIndex].Name {
er.RecordError(fmt.Errorf("%v has an extra table named %v", leftName, left.TableDefinitions[leftIndex].Name))
@ -224,6 +216,11 @@ func DiffSchema(leftName string, left *SchemaDefinition, rightName string, right
if left.TableDefinitions[leftIndex].Schema != right.TableDefinitions[rightIndex].Schema {
er.RecordError(fmt.Errorf("%v and %v disagree on schema for table %v:\n%v\n differs from:\n%v", leftName, rightName, left.TableDefinitions[leftIndex].Name, left.TableDefinitions[leftIndex].Schema, right.TableDefinitions[rightIndex].Schema))
}
if left.TableDefinitions[leftIndex].Type != right.TableDefinitions[rightIndex].Type {
er.RecordError(fmt.Errorf("%v and %v disagree on table type for table %v:\n%v\n differs from:\n%v", leftName, rightName, left.TableDefinitions[leftIndex].Name, left.TableDefinitions[leftIndex].Type, right.TableDefinitions[rightIndex].Type))
}
leftIndex++
rightIndex++
}
@ -232,26 +229,33 @@ func DiffSchema(leftName string, left *SchemaDefinition, rightName string, right
if left.TableDefinitions[leftIndex].Type == TABLE_BASE_TABLE {
er.RecordError(fmt.Errorf("%v has an extra table named %v", leftName, left.TableDefinitions[leftIndex].Name))
}
if left.TableDefinitions[leftIndex].Type == TABLE_VIEW {
er.RecordError(fmt.Errorf("%v has an extra view named %v", leftName, left.TableDefinitions[leftIndex].Name))
}
leftIndex++
}
for rightIndex < len(right.TableDefinitions) {
if right.TableDefinitions[rightIndex].Type == TABLE_BASE_TABLE {
er.RecordError(fmt.Errorf("%v has an extra table named %v", rightName, right.TableDefinitions[rightIndex].Name))
}
if right.TableDefinitions[rightIndex].Type == TABLE_VIEW {
er.RecordError(fmt.Errorf("%v has an extra view named %v", rightName, right.TableDefinitions[rightIndex].Name))
}
rightIndex++
}
}
// DiffSchemaToArray diffs two schemas and return the schema diffs if there is any.
func DiffSchemaToArray(leftName string, left *SchemaDefinition, rightName string, right *SchemaDefinition) (result []string) {
er := concurrency.AllErrorRecorder{}
DiffSchema(leftName, left, rightName, right, &er)
if er.HasErrors() {
return er.ErrorStrings()
} else {
return nil
}
return nil
}
// SchemaChange contains all necessary information to apply a schema change.
type SchemaChange struct {
Sql string
Force bool
@ -260,6 +264,8 @@ type SchemaChange struct {
AfterSchema *SchemaDefinition
}
// SchemaChangeResult contains before and after table schemas for
// a schema change sql.
type SchemaChangeResult struct {
BeforeSchema *SchemaDefinition
AfterSchema *SchemaDefinition

Просмотреть файл

@ -6,6 +6,7 @@ package proto
import (
"errors"
"fmt"
"reflect"
"testing"
)
@ -161,11 +162,70 @@ func TestSchemaDiff(t *testing.T) {
},
},
}
testDiff(t, sd1, sd1, "sd1", "sd2", []string{})
sd2 := &SchemaDefinition{TableDefinitions: make([]*TableDefinition, 0, 2)}
sd3 := &SchemaDefinition{
TableDefinitions: []*TableDefinition{
&TableDefinition{
Name: "table2",
Schema: "schema2",
Type: TABLE_BASE_TABLE,
},
},
}
sd4 := &SchemaDefinition{
TableDefinitions: []*TableDefinition{
&TableDefinition{
Name: "table2",
Schema: "table2",
Type: TABLE_VIEW,
},
},
}
sd5 := &SchemaDefinition{
TableDefinitions: []*TableDefinition{
&TableDefinition{
Name: "table2",
Schema: "table2",
Type: TABLE_BASE_TABLE,
},
},
}
testDiff(t, sd1, sd1, "sd1", "sd2", []string{})
testDiff(t, sd2, sd2, "sd2", "sd2", []string{})
// two schemas are considered the same if both nil
testDiff(t, nil, nil, "sd1", "sd2", nil)
testDiff(t, sd1, nil, "sd1", "sd2", []string{
fmt.Sprintf("sd1 and sd2 are different, sd1: %v, sd2: null", sd1),
})
testDiff(t, sd1, sd3, "sd1", "sd3", []string{
"sd1 has an extra table named table1",
})
testDiff(t, sd3, sd1, "sd3", "sd1", []string{
"sd1 has an extra table named table1",
})
testDiff(t, sd2, sd4, "sd2", "sd4", []string{
"sd4 has an extra view named table2",
})
testDiff(t, sd4, sd2, "sd4", "sd2", []string{
"sd4 has an extra view named table2",
})
testDiff(t, sd4, sd5, "sd4", "sd5", []string{
fmt.Sprintf("sd4 and sd5 disagree on table type for table table2:\nVIEW\n differs from:\nBASE TABLE"),
})
sd1.DatabaseSchema = "CREATE DATABASE {{.DatabaseName}}"
sd2.DatabaseSchema = "DONT CREATE DATABASE {{.DatabaseName}}"
testDiff(t, sd1, sd2, "sd1", "sd2", []string{"sd1 and sd2 don't agree on database creation command:\nCREATE DATABASE {{.DatabaseName}}\n differs from:\nDONT CREATE DATABASE {{.DatabaseName}}", "sd1 has an extra table named table1", "sd1 has an extra table named table2"})

Просмотреть файл

@ -5,6 +5,9 @@
package schemamanager
import (
"encoding/json"
"fmt"
log "github.com/golang/glog"
mproto "github.com/youtube/vitess/go/mysql/proto"
)
@ -81,5 +84,9 @@ func Run(sourcer DataSourcer,
handler.OnValidationSuccess(sqls)
result := exec.Execute(sqls)
handler.OnExecutorComplete(result)
if result.ExecutorErr != "" || len(result.FailedShards) > 0 {
out, _ := json.MarshalIndent(result, "", " ")
return fmt.Errorf("Schema change failed, ExecuteResult: %v\n", string(out))
}
return nil
}

Просмотреть файл

@ -9,8 +9,10 @@ import (
"fmt"
"testing"
"github.com/youtube/vitess/go/vt/mysqlctl/proto"
"github.com/youtube/vitess/go/vt/tabletmanager/faketmclient"
_ "github.com/youtube/vitess/go/vt/tabletmanager/gorpctmclient"
"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
"github.com/youtube/vitess/go/vt/topo"
"golang.org/x/net/context"
)
@ -60,7 +62,7 @@ func TestRunSchemaChangesExecutorOpenFail(t *testing.T) {
dataSourcer := newFakeDataSourcer([]string{"create table test_table (pk int);"}, false, false, false)
handler := newFakeHandler()
exec := NewTabletExecutor(
faketmclient.NewFakeTabletManagerClient(),
newFakeTabletManagerClient(),
newFakeTopo(),
"unknown_keyspace")
err := Run(dataSourcer, exec, handler)
@ -70,9 +72,29 @@ func TestRunSchemaChangesExecutorOpenFail(t *testing.T) {
}
func TestRunSchemaChanges(t *testing.T) {
dataSourcer := NewSimpleDataSourcer("create table test_table (pk int);")
sql := "create table test_table (pk int)"
dataSourcer := NewSimpleDataSourcer(sql)
handler := newFakeHandler()
exec := newFakeExecutor()
fakeTmc := newFakeTabletManagerClient()
fakeTmc.AddSchemaChange(sql, &proto.SchemaChangeResult{
BeforeSchema: &proto.SchemaDefinition{},
AfterSchema: &proto.SchemaDefinition{
DatabaseSchema: "CREATE DATABASE `{{.DatabaseName}}` /*!40100 DEFAULT CHARACTER SET utf8 */",
TableDefinitions: []*proto.TableDefinition{
&proto.TableDefinition{
Name: "test_table",
Schema: sql,
Type: proto.TABLE_BASE_TABLE,
},
},
},
})
exec := NewTabletExecutor(
fakeTmc,
newFakeTopo(),
"test_keyspace")
err := Run(dataSourcer, exec, handler)
if err != nil {
t.Fatalf("schema change should success but get error: %v", err)
@ -96,11 +118,37 @@ func TestRunSchemaChanges(t *testing.T) {
func newFakeExecutor() *TabletExecutor {
return NewTabletExecutor(
faketmclient.NewFakeTabletManagerClient(),
newFakeTabletManagerClient(),
newFakeTopo(),
"test_keyspace")
}
func newFakeTabletManagerClient() *fakeTabletManagerClient {
return &fakeTabletManagerClient{
TabletManagerClient: faketmclient.NewFakeTabletManagerClient(),
preflightSchemas: make(map[string]*proto.SchemaChangeResult),
}
}
type fakeTabletManagerClient struct {
tmclient.TabletManagerClient
preflightSchemas map[string]*proto.SchemaChangeResult
}
func (client *fakeTabletManagerClient) AddSchemaChange(
sql string, schemaResult *proto.SchemaChangeResult) {
client.preflightSchemas[sql] = schemaResult
}
func (client *fakeTabletManagerClient) PreflightSchema(ctx context.Context, tablet *topo.TabletInfo, change string) (*proto.SchemaChangeResult, error) {
result, ok := client.preflightSchemas[change]
if !ok {
var scr proto.SchemaChangeResult
return &scr, nil
}
return result, nil
}
type fakeTopo struct{}
func newFakeTopo() *fakeTopo {

Просмотреть файл

@ -9,6 +9,7 @@ import (
"sync"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/vt/mysqlctl/proto"
"github.com/youtube/vitess/go/vt/sqlparser"
"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
"github.com/youtube/vitess/go/vt/topo"
@ -21,6 +22,7 @@ type TabletExecutor struct {
tmClient tmclient.TabletManagerClient
topoServer topo.Server
tabletInfos []*topo.TabletInfo
schemaDiffs []*proto.SchemaChangeResult
isClosed bool
}
@ -72,13 +74,38 @@ func (exec *TabletExecutor) Validate(sqls []string) error {
if err != nil {
return err
}
if _, ok := stat.(*sqlparser.DDL); !ok {
_, ok := stat.(*sqlparser.DDL)
if !ok {
return fmt.Errorf("schema change works for DDLs only, but get non DDL statement: %s", sql)
}
}
return nil
}
func (exec *TabletExecutor) preflightSchemaChanges(sqls []string) error {
if len(exec.tabletInfos) == 0 {
return nil
}
exec.schemaDiffs = make([]*proto.SchemaChangeResult, len(sqls))
for i := range sqls {
schemaDiff, err := exec.tmClient.PreflightSchema(
context.Background(), exec.tabletInfos[0], sqls[i])
if err != nil {
return err
}
exec.schemaDiffs[i] = schemaDiff
diffs := proto.DiffSchemaToArray(
"BeforeSchema",
exec.schemaDiffs[i].BeforeSchema,
"AfterSchema",
exec.schemaDiffs[i].AfterSchema)
if len(diffs) == 0 {
return fmt.Errorf("Schema change: '%s' does not introduce any table definition change.", sqls[i])
}
}
return nil
}
// Execute applies schema changes
func (exec *TabletExecutor) Execute(sqls []string) *ExecuteResult {
execResult := ExecuteResult{}
@ -87,6 +114,13 @@ func (exec *TabletExecutor) Execute(sqls []string) *ExecuteResult {
execResult.ExecutorErr = "executor is closed"
return &execResult
}
// make sure every schema change introduces a table definition change
if err := exec.preflightSchemaChanges(sqls); err != nil {
execResult.ExecutorErr = err.Error()
return &execResult
}
for index, sql := range sqls {
execResult.CurSqlIndex = index
exec.executeOnAllTablets(&execResult, sql)

Просмотреть файл

@ -156,7 +156,7 @@ func (agent *ActionAgent) runHealthCheck(targetTabletType topo.TabletType) {
if tablet.Type == topo.TYPE_MASTER {
typeForHealthCheck = topo.TYPE_MASTER
}
replicationDelay, err := agent.HealthReporter.Report(typeForHealthCheck, shouldQueryServiceBeRunning)
replicationDelay, err := agent.HealthReporter.Report(topo.IsSlaveType(typeForHealthCheck), shouldQueryServiceBeRunning)
health := make(map[string]string)
if err == nil {
if replicationDelay > *unhealthyThreshold {

Просмотреть файл

@ -103,7 +103,7 @@ type fakeHealthCheck struct {
reportError error
}
func (fhc *fakeHealthCheck) Report(tabletType topo.TabletType, shouldQueryServiceBeRunning bool) (replicationDelay time.Duration, err error) {
func (fhc *fakeHealthCheck) Report(isSlaveType, shouldQueryServiceBeRunning bool) (replicationDelay time.Duration, err error) {
return fhc.reportReplicationDelay, fhc.reportError
}

Просмотреть файл

@ -179,7 +179,6 @@ class TestSchema(unittest.TestCase):
self._create_test_table_sql('vt_select_test01'),
self._create_test_table_sql('vt_select_test02'),
self._create_test_table_sql('vt_select_test03'),
self._alter_test_table_sql('vt_select_test03', 'msg'),
self._create_test_table_sql('vt_select_test04')])
tables = ','.join([
@ -203,5 +202,15 @@ class TestSchema(unittest.TestCase):
self.assertEqual(shard_0_schema, shard_1_schema)
self.assertEqual(shard_0_schema, shard_2_schema)
self._apply_schema(test_keyspace, self._alter_test_table_sql('vt_select_test03', 'msg'))
shard_0_schema = self._get_schema(shard_0_master.tablet_alias, tables)
shard_1_schema = self._get_schema(shard_1_master.tablet_alias, tables)
shard_2_schema = self._get_schema(shard_2_master.tablet_alias, tables)
# all shards should have the same schema
self.assertEqual(shard_0_schema, shard_1_schema)
self.assertEqual(shard_0_schema, shard_2_schema)
if __name__ == '__main__':
utils.main()