Addition of a LoadKeyspace method to the schema tracker

Co-authored-by: Andres Taylor <andres@planetscale.com>
Signed-off-by: Florent Poinsard <florent.poinsard@outlook.fr>
This commit is contained in:
Florent Poinsard 2021-05-25 17:52:14 +02:00
Родитель 92d8635a0c
Коммит 09481f2880
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 87A9DEBFB0824A2D
4 изменённых файлов: 57 добавлений и 6 удалений

Просмотреть файл

@ -100,7 +100,13 @@ where table_schema = database()`
from _vt.schemacopy
where table_schema = database() and
table_name in :tableNames
order by ordinal_position`
order by table_name, ordinal_position`
// FetchTables queries fetches all information about tables
FetchTables = `select table_name, column_name, data_type
from _vt.schemacopy
where table_schema = database()
order by table_name, ordinal_position`
)
// VTDatabaseInit contains all the schema creation queries needed to

Просмотреть файл

@ -20,6 +20,8 @@ import (
"context"
"sync"
"vitess.io/vitess/go/vt/vttablet/queryservice"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
@ -57,6 +59,16 @@ func NewTracker(ch chan *discovery.TabletHealth) *Tracker {
return &Tracker{ch: ch, tables: &tableMap{m: map[keyspace]map[tableName][]vindexes.Column{}}}
}
// LoadKeyspace loads the keyspace schema.
func (t *Tracker) LoadKeyspace(conn queryservice.QueryService, target *querypb.Target) error {
res, err := conn.Execute(context.Background(), target, mysql.FetchTables, nil, 0, 0, nil)
if err != nil {
return err
}
t.updateTables(target.Keyspace, res)
return nil
}
// Start starts the schema tracking.
func (t *Tracker) Start() {
log.Info("Starting schema tracking")
@ -129,7 +141,10 @@ func (t *Tracker) updateSchema(th *discovery.TabletHealth) {
for _, tbl := range th.TablesUpdated {
t.tables.delete(th.Target.Keyspace, tbl)
}
t.updateTables(th.Target.Keyspace, res)
}
func (t *Tracker) updateTables(keyspace string, res *sqltypes.Result) {
for _, row := range res.Rows {
tbl := row[0].ToString()
colName := row[1].ToString()
@ -137,9 +152,9 @@ func (t *Tracker) updateSchema(th *discovery.TabletHealth) {
cType := sqlparser.ColumnType{Type: colType}
col := vindexes.Column{Name: sqlparser.NewColIdent(colName), Type: cType.SQLType()}
cols := t.tables.get(th.Target.Keyspace, tbl)
cols := t.tables.get(keyspace, tbl)
t.tables.set(th.Target.Keyspace, tbl, append(cols, col))
t.tables.set(keyspace, tbl, append(cols, col))
}
}

Просмотреть файл

@ -48,10 +48,17 @@ func TestTracking(t *testing.T) {
sbc := sandboxconn.NewSandboxConn(tablet)
ch := make(chan *discovery.TabletHealth)
tracker := NewTracker(ch)
fields := sqltypes.MakeTestFields("table_name|col_name|col_type", "varchar|varchar|varchar")
sbc.SetResults([]*sqltypes.Result{sqltypes.MakeTestResult(
fields,
"prior|id|int",
)})
err := tracker.LoadKeyspace(sbc, target)
require.NoError(t, err)
tracker.Start()
defer tracker.Stop()
fields := sqltypes.MakeTestFields("table_name|col_name|col_type", "varchar|varchar|varchar")
testcases := []struct {
tName string
result *sqltypes.Result
@ -72,16 +79,18 @@ func TestTracking(t *testing.T) {
{Name: sqlparser.NewColIdent("name"), Type: querypb.Type_VARCHAR}},
"t2": {
{Name: sqlparser.NewColIdent("id"), Type: querypb.Type_VARCHAR}},
"prior": {
{Name: sqlparser.NewColIdent("id"), Type: querypb.Type_INT32}},
},
}, {
tName: "delete t1, updated t2 and new t3",
tName: "delete t1 and prior, updated t2 and new t3",
result: sqltypes.MakeTestResult(
fields,
"t2|id|varchar",
"t2|name|varchar",
"t3|id|datetime",
),
updTbl: []string{"t1", "t2", "t3"},
updTbl: []string{"prior", "t1", "t2", "t3"},
exp: map[string][]vindexes.Column{
"t2": {
{Name: sqlparser.NewColIdent("id"), Type: querypb.Type_VARCHAR},

Просмотреть файл

@ -26,6 +26,8 @@ import (
"strings"
"time"
"vitess.io/vitess/go/vt/key"
"context"
"vitess.io/vitess/go/acl"
@ -202,6 +204,7 @@ func Init(ctx context.Context, serv srvtopo.Server, cell string, tabletTypesToWa
MaxMemoryUsage: *queryPlanCacheMemory,
LFU: *queryPlanCacheLFU,
}
addKeyspaceToTracker(ctx, srvResolver, st, gw)
executor := NewExecutor(ctx, serv, cell, resolver, *normalizeQueries, *warnShardedOnly, *streamBufferSize, cacheCfg, st)
@ -265,6 +268,24 @@ func Init(ctx context.Context, serv srvtopo.Server, cell string, tabletTypesToWa
return rpcVTGate
}
func addKeyspaceToTracker(ctx context.Context, srvResolver *srvtopo.Resolver, st *vtschema.Tracker, gw *TabletGateway) {
keyspaces, err := srvResolver.GetAllKeyspaces(ctx)
if err != nil {
log.Warningf("Unable to get all keyspaces: %v", err)
return
}
for _, keyspace := range keyspaces {
dest, err := srvResolver.ResolveDestination(ctx, keyspace, topodatapb.TabletType_MASTER, key.DestinationAnyShard{})
if err != nil {
log.Warningf("Unable to resolve destination: %v", err)
}
err = st.LoadKeyspace(gw, dest[0].Target)
if err != nil {
log.Warningf("Unable to add keyspace to tracker: %v", err)
}
}
}
func (vtg *VTGate) registerDebugHealthHandler() {
http.HandleFunc("/debug/health", func(w http.ResponseWriter, r *http.Request) {
if err := acl.CheckAccessHTTP(r, acl.MONITORING); err != nil {