зеркало из https://github.com/github/vitess-gh.git
add schema diffs in schemamanager
1. Make DiffSchema compare table views. 2. Add schema diffs in schemamanager. Each schema changes have to change table structure and schemanager rejects a sql that does not change any table definition.
This commit is contained in:
Родитель
7bd078f8af
Коммит
0245dfb5ae
|
@ -189,6 +189,12 @@ func (sd *SchemaDefinition) ToSQLStrings() []string {
|
|||
// generates a report on what's different between two SchemaDefinition
|
||||
// for now, we skip the VIEW entirely.
|
||||
func DiffSchema(leftName string, left *SchemaDefinition, rightName string, right *SchemaDefinition, er concurrency.ErrorRecorder) {
|
||||
if left == nil && right == nil {
|
||||
return
|
||||
}
|
||||
if left == nil || right == nil {
|
||||
er.RecordError(fmt.Errorf("%s and %s are different, %s: %v, %s: %v", leftName, rightName, leftName, rightName, left, right))
|
||||
}
|
||||
if left.DatabaseSchema != right.DatabaseSchema {
|
||||
er.RecordError(fmt.Errorf("%v and %v don't agree on database creation command:\n%v\n differs from:\n%v", leftName, rightName, left.DatabaseSchema, right.DatabaseSchema))
|
||||
}
|
||||
|
@ -196,16 +202,6 @@ func DiffSchema(leftName string, left *SchemaDefinition, rightName string, right
|
|||
leftIndex := 0
|
||||
rightIndex := 0
|
||||
for leftIndex < len(left.TableDefinitions) && rightIndex < len(right.TableDefinitions) {
|
||||
// skip views
|
||||
if left.TableDefinitions[leftIndex].Type == TABLE_VIEW {
|
||||
leftIndex++
|
||||
continue
|
||||
}
|
||||
if right.TableDefinitions[rightIndex].Type == TABLE_VIEW {
|
||||
rightIndex++
|
||||
continue
|
||||
}
|
||||
|
||||
// extra table on the left side
|
||||
if left.TableDefinitions[leftIndex].Name < right.TableDefinitions[rightIndex].Name {
|
||||
er.RecordError(fmt.Errorf("%v has an extra table named %v", leftName, left.TableDefinitions[leftIndex].Name))
|
||||
|
@ -224,6 +220,11 @@ func DiffSchema(leftName string, left *SchemaDefinition, rightName string, right
|
|||
if left.TableDefinitions[leftIndex].Schema != right.TableDefinitions[rightIndex].Schema {
|
||||
er.RecordError(fmt.Errorf("%v and %v disagree on schema for table %v:\n%v\n differs from:\n%v", leftName, rightName, left.TableDefinitions[leftIndex].Name, left.TableDefinitions[leftIndex].Schema, right.TableDefinitions[rightIndex].Schema))
|
||||
}
|
||||
|
||||
if left.TableDefinitions[leftIndex].Type != right.TableDefinitions[rightIndex].Type {
|
||||
er.RecordError(fmt.Errorf("%v and %v disagree on table type for table %v:\n%v\n differs from:\n%v", leftName, rightName, left.TableDefinitions[leftIndex].Name, left.TableDefinitions[leftIndex].Type, right.TableDefinitions[rightIndex].Type))
|
||||
}
|
||||
|
||||
leftIndex++
|
||||
rightIndex++
|
||||
}
|
||||
|
@ -232,12 +233,18 @@ func DiffSchema(leftName string, left *SchemaDefinition, rightName string, right
|
|||
if left.TableDefinitions[leftIndex].Type == TABLE_BASE_TABLE {
|
||||
er.RecordError(fmt.Errorf("%v has an extra table named %v", leftName, left.TableDefinitions[leftIndex].Name))
|
||||
}
|
||||
if left.TableDefinitions[leftIndex].Type == TABLE_VIEW {
|
||||
er.RecordError(fmt.Errorf("%v has an extra view named %v", leftName, left.TableDefinitions[leftIndex].Name))
|
||||
}
|
||||
leftIndex++
|
||||
}
|
||||
for rightIndex < len(right.TableDefinitions) {
|
||||
if right.TableDefinitions[rightIndex].Type == TABLE_BASE_TABLE {
|
||||
er.RecordError(fmt.Errorf("%v has an extra table named %v", rightName, right.TableDefinitions[rightIndex].Name))
|
||||
}
|
||||
if right.TableDefinitions[rightIndex].Type == TABLE_VIEW {
|
||||
er.RecordError(fmt.Errorf("%v has an extra view named %v", rightName, right.TableDefinitions[rightIndex].Name))
|
||||
}
|
||||
rightIndex++
|
||||
}
|
||||
}
|
||||
|
|
|
@ -9,8 +9,10 @@ import (
|
|||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/youtube/vitess/go/vt/mysqlctl/proto"
|
||||
"github.com/youtube/vitess/go/vt/tabletmanager/faketmclient"
|
||||
_ "github.com/youtube/vitess/go/vt/tabletmanager/gorpctmclient"
|
||||
"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
@ -60,7 +62,7 @@ func TestRunSchemaChangesExecutorOpenFail(t *testing.T) {
|
|||
dataSourcer := newFakeDataSourcer([]string{"create table test_table (pk int);"}, false, false, false)
|
||||
handler := newFakeHandler()
|
||||
exec := NewTabletExecutor(
|
||||
faketmclient.NewFakeTabletManagerClient(),
|
||||
newFakeTabletManagerClient(),
|
||||
newFakeTopo(),
|
||||
"unknown_keyspace")
|
||||
err := Run(dataSourcer, exec, handler)
|
||||
|
@ -70,9 +72,29 @@ func TestRunSchemaChangesExecutorOpenFail(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestRunSchemaChanges(t *testing.T) {
|
||||
dataSourcer := NewSimpleDataSourcer("create table test_table (pk int);")
|
||||
sql := "create table test_table (pk int)"
|
||||
dataSourcer := NewSimpleDataSourcer(sql)
|
||||
handler := newFakeHandler()
|
||||
exec := newFakeExecutor()
|
||||
fakeTmc := newFakeTabletManagerClient()
|
||||
fakeTmc.AddSchemaChange(sql, &proto.SchemaChangeResult{
|
||||
BeforeSchema: &proto.SchemaDefinition{},
|
||||
AfterSchema: &proto.SchemaDefinition{
|
||||
DatabaseSchema: "CREATE DATABASE `{{.DatabaseName}}` /*!40100 DEFAULT CHARACTER SET utf8 */",
|
||||
TableDefinitions: []*proto.TableDefinition{
|
||||
&proto.TableDefinition{
|
||||
Name: "test_table",
|
||||
Schema: sql,
|
||||
Type: proto.TABLE_BASE_TABLE,
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
exec := NewTabletExecutor(
|
||||
fakeTmc,
|
||||
newFakeTopo(),
|
||||
"test_keyspace")
|
||||
|
||||
err := Run(dataSourcer, exec, handler)
|
||||
if err != nil {
|
||||
t.Fatalf("schema change should success but get error: %v", err)
|
||||
|
@ -96,11 +118,37 @@ func TestRunSchemaChanges(t *testing.T) {
|
|||
|
||||
func newFakeExecutor() *TabletExecutor {
|
||||
return NewTabletExecutor(
|
||||
faketmclient.NewFakeTabletManagerClient(),
|
||||
newFakeTabletManagerClient(),
|
||||
newFakeTopo(),
|
||||
"test_keyspace")
|
||||
}
|
||||
|
||||
func newFakeTabletManagerClient() *fakeTabletManagerClient {
|
||||
return &fakeTabletManagerClient{
|
||||
TabletManagerClient: faketmclient.NewFakeTabletManagerClient(),
|
||||
preflightSchemas: make(map[string]*proto.SchemaChangeResult),
|
||||
}
|
||||
}
|
||||
|
||||
type fakeTabletManagerClient struct {
|
||||
tmclient.TabletManagerClient
|
||||
preflightSchemas map[string]*proto.SchemaChangeResult
|
||||
}
|
||||
|
||||
func (client *fakeTabletManagerClient) AddSchemaChange(
|
||||
sql string, schemaResult *proto.SchemaChangeResult) {
|
||||
client.preflightSchemas[sql] = schemaResult
|
||||
}
|
||||
|
||||
func (client *fakeTabletManagerClient) PreflightSchema(ctx context.Context, tablet *topo.TabletInfo, change string) (*proto.SchemaChangeResult, error) {
|
||||
result, ok := client.preflightSchemas[change]
|
||||
if !ok {
|
||||
var scr proto.SchemaChangeResult
|
||||
return &scr, nil
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
type fakeTopo struct{}
|
||||
|
||||
func newFakeTopo() *fakeTopo {
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"sync"
|
||||
|
||||
log "github.com/golang/glog"
|
||||
"github.com/youtube/vitess/go/vt/mysqlctl/proto"
|
||||
"github.com/youtube/vitess/go/vt/sqlparser"
|
||||
"github.com/youtube/vitess/go/vt/tabletmanager/tmclient"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
|
@ -21,6 +22,7 @@ type TabletExecutor struct {
|
|||
tmClient tmclient.TabletManagerClient
|
||||
topoServer topo.Server
|
||||
tabletInfos []*topo.TabletInfo
|
||||
schemaDiffs []*proto.SchemaChangeResult
|
||||
isClosed bool
|
||||
}
|
||||
|
||||
|
@ -72,13 +74,38 @@ func (exec *TabletExecutor) Validate(sqls []string) error {
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, ok := stat.(*sqlparser.DDL); !ok {
|
||||
_, ok := stat.(*sqlparser.DDL)
|
||||
if !ok {
|
||||
return fmt.Errorf("schema change works for DDLs only, but get non DDL statement: %s", sql)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (exec *TabletExecutor) preflightSchemaChanges(sqls []string) error {
|
||||
if len(exec.tabletInfos) == 0 {
|
||||
return nil
|
||||
}
|
||||
exec.schemaDiffs = make([]*proto.SchemaChangeResult, len(sqls))
|
||||
for i := range sqls {
|
||||
schemaDiff, err := exec.tmClient.PreflightSchema(
|
||||
context.Background(), exec.tabletInfos[0], sqls[i])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
exec.schemaDiffs[i] = schemaDiff
|
||||
diffs := proto.DiffSchemaToArray(
|
||||
"BeforeSchema",
|
||||
exec.schemaDiffs[i].BeforeSchema,
|
||||
"AfterSchema",
|
||||
exec.schemaDiffs[i].AfterSchema)
|
||||
if len(diffs) == 0 {
|
||||
return fmt.Errorf("Schema change: '%s' does not introduce any table definition change.", sqls[i])
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Execute applies schema changes
|
||||
func (exec *TabletExecutor) Execute(sqls []string) *ExecuteResult {
|
||||
execResult := ExecuteResult{}
|
||||
|
@ -87,6 +114,13 @@ func (exec *TabletExecutor) Execute(sqls []string) *ExecuteResult {
|
|||
execResult.ExecutorErr = "executor is closed"
|
||||
return &execResult
|
||||
}
|
||||
|
||||
// make sure every schema change introduces a table definition change
|
||||
if err := exec.preflightSchemaChanges(sqls); err != nil {
|
||||
execResult.ExecutorErr = err.Error()
|
||||
return &execResult
|
||||
}
|
||||
|
||||
for index, sql := range sqls {
|
||||
execResult.CurSqlIndex = index
|
||||
exec.executeOnAllTablets(&execResult, sql)
|
||||
|
|
|
@ -179,7 +179,6 @@ class TestSchema(unittest.TestCase):
|
|||
self._create_test_table_sql('vt_select_test01'),
|
||||
self._create_test_table_sql('vt_select_test02'),
|
||||
self._create_test_table_sql('vt_select_test03'),
|
||||
self._alter_test_table_sql('vt_select_test03', 'msg'),
|
||||
self._create_test_table_sql('vt_select_test04')])
|
||||
|
||||
tables = ','.join([
|
||||
|
@ -203,5 +202,15 @@ class TestSchema(unittest.TestCase):
|
|||
self.assertEqual(shard_0_schema, shard_1_schema)
|
||||
self.assertEqual(shard_0_schema, shard_2_schema)
|
||||
|
||||
self._apply_schema(test_keyspace, self._alter_test_table_sql('vt_select_test03', 'msg'))
|
||||
|
||||
shard_0_schema = self._get_schema(shard_0_master.tablet_alias, tables)
|
||||
shard_1_schema = self._get_schema(shard_1_master.tablet_alias, tables)
|
||||
shard_2_schema = self._get_schema(shard_2_master.tablet_alias, tables)
|
||||
|
||||
# all shards should have the same schema
|
||||
self.assertEqual(shard_0_schema, shard_1_schema)
|
||||
self.assertEqual(shard_0_schema, shard_2_schema)
|
||||
|
||||
if __name__ == '__main__':
|
||||
utils.main()
|
||||
|
|
Загрузка…
Ссылка в новой задаче