vrepl: CreateLookupVindex tweaks

Signed-off-by: Sugu Sougoumarane <ssougou@gmail.com>
This commit is contained in:
Sugu Sougoumarane 2019-12-14 18:43:54 -08:00
Родитель bfb6a42028
Коммит f87c943ea6
2 изменённых файлов: 28 добавлений и 18 удалений

2
go.mod
Просмотреть файл

@ -20,7 +20,7 @@ require (
github.com/evanphx/json-patch v4.5.0+incompatible
github.com/go-critic/go-critic v0.4.0 // indirect
github.com/go-ini/ini v1.12.0 // indirect
github.com/gogo/protobuf v1.3.1 // indirect
github.com/gogo/protobuf v1.3.1
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect
github.com/golang/mock v1.3.1

Просмотреть файл

@ -22,6 +22,7 @@ import (
"sync"
"text/template"
"github.com/gogo/protobuf/proto"
"golang.org/x/net/context"
"vitess.io/vitess/go/json2"
@ -116,6 +117,7 @@ func (wr *Wrangler) Migrate(ctx context.Context, workflow, sourceKeyspace, targe
// CreateLookupVindex creates a lookup vindex and sets up the backfill.
func (wr *Wrangler) CreateLookupVindex(ctx context.Context, keyspace, specs, cell, tabletTypes string) error {
// Important variables are pulled out here.
var (
// vschemas
sourceVSchema *vschemapb.Keyspace
@ -130,9 +132,12 @@ func (wr *Wrangler) CreateLookupVindex(ctx context.Context, keyspace, specs, cel
vindexToCol string
// source table info
sourceTableName string
sourceTable *vschemapb.Table
sourceVSchemaTable *vschemapb.Table
sourceTableName string
// sourceTable is the supplied table info
sourceTable *vschemapb.Table
// sourceVSchemaTable is the table info present in the vschema
sourceVSchemaTable *vschemapb.Table
// sourceVindexColumns are computed from the input sourceTable
sourceVindexColumns []string
// target table info
@ -176,6 +181,10 @@ func (wr *Wrangler) CreateLookupVindex(ctx context.Context, keyspace, specs, cel
}
}
vindexToCol = vindex.Params["to"]
// Make the vindex write_only. If one exists already in the vschema,
// it will need to match this vindex exactly, including the write_only setting.
vindex.Params["write_only"] = "true"
// See if we can create the vindex without errors.
if _, err := vindexes.CreateVindex(vindex.Type, vindexName, vindex.Params); err != nil {
return err
}
@ -192,11 +201,11 @@ func (wr *Wrangler) CreateLookupVindex(ctx context.Context, keyspace, specs, cel
sourceTableName = k
sourceTable = ti
}
// Validate input table and vindex consistency
if sourceTable.ColumnVindexes[0].Name != vindexName {
return fmt.Errorf("ColumnVindex name must match vindex name: %s vs %s", sourceTable.ColumnVindexes[0].Name, vindexName)
}
// Validate input table and vindex consistency
if vindex.Owner != "" && vindex.Owner != sourceTableName {
return fmt.Errorf("vindex owner must match table name: %v vs %v", vindex.Owner, sourceTableName)
}
@ -220,8 +229,10 @@ func (wr *Wrangler) CreateLookupVindex(ctx context.Context, keyspace, specs, cel
if sourceVSchema.Vindexes == nil {
sourceVSchema.Vindexes = make(map[string]*vschemapb.Vindex)
}
if _, ok := sourceVSchema.Vindexes[vindexName]; ok {
return fmt.Errorf("vindex %s already exists in vschema, please delete and try again", vindexName)
if existing, ok := sourceVSchema.Vindexes[vindexName]; ok {
if !proto.Equal(existing, vindex) {
return fmt.Errorf("a conflicting vindex named %s already exists in the source vschema", vindexName)
}
}
sourceVSchemaTable = sourceVSchema.Tables[sourceTableName]
if sourceVSchemaTable == nil {
@ -254,7 +265,7 @@ func (wr *Wrangler) CreateLookupVindex(ctx context.Context, keyspace, specs, cel
return fmt.Errorf("unexpected number of tables returned from schema: %v", tableSchema.TableDefinitions)
}
// Generate "create table statement"
// Generate "create table" statement
lines := strings.Split(tableSchema.TableDefinitions[0].Schema, "\n")
if len(lines) < 3 {
return fmt.Errorf("schema looks incorrect: %s, expecting at least four lines", tableSchema.TableDefinitions[0].Schema)
@ -306,29 +317,29 @@ func (wr *Wrangler) CreateLookupVindex(ctx context.Context, keyspace, specs, cel
}
if targetVSchema.Sharded {
// Choose a primary vindex type for target table based on source specs
found := false
var targetVindex *vschemapb.Vindex
for _, field := range tableSchema.TableDefinitions[0].Fields {
if sourceVindexColumns[0] == field.Name {
targetVindexType, err = vindexes.ChooseVindexForType(field.Type)
if err != nil {
return err
}
found = true
targetVindex = &vschemapb.Vindex{
Type: targetVindexType,
}
break
}
}
if !found {
if targetVindex == nil {
// Unreachable. We validated column names when generating the DDL.
return fmt.Errorf("column %s not found in schema %v", sourceVindexColumns[0], tableSchema.TableDefinitions[0])
}
if existing, ok := targetVSchema.Vindexes[targetVindexType]; ok {
if existing.Type != targetVindexType {
if !proto.Equal(existing, targetVindex) {
return fmt.Errorf("a conflicting vindex named %s already exists in the target vschema", targetVindexType)
}
} else {
targetVSchema.Vindexes[targetVindexType] = &vschemapb.Vindex{
Type: targetVindexType,
}
targetVSchema.Vindexes[targetVindexType] = targetVindex
}
if _, ok := targetVSchema.Tables[targetTableName]; ok {
@ -365,7 +376,6 @@ func (wr *Wrangler) CreateLookupVindex(ctx context.Context, keyspace, specs, cel
}
// Create source Vindex
vindex.Params["write_only"] = "true"
sourceVSchema.Vindexes[vindexName] = vindex
// Update source table
@ -374,7 +384,7 @@ func (wr *Wrangler) CreateLookupVindex(ctx context.Context, keyspace, specs, cel
return err
}
return nil
return wr.ts.RebuildSrvVSchema(ctx, nil)
}
func generateColDef(lines []string, sourceVindexCol, vindexFromCol string) (string, error) {