added schema change detection and schema copy to health streamer

Signed-off-by: Harshit Gangal <harshit@planetscale.com>
This commit is contained in:
Harshit Gangal 2021-04-30 19:01:01 +05:30
Родитель 722832042b
Коммит 9e1ff351b8
3 изменённых файлов: 156 добавлений и 64 удалений

Просмотреть файл

@ -28,66 +28,9 @@ import (
var ctx = context.Background()
const (
createSchemaCopyTable = `
CREATE TABLE _vt.schemacopy (
table_schema varchar(64) NOT NULL,
table_name varchar(64) NOT NULL,
column_name varchar(64) NOT NULL,
ordinal_position bigint(21) unsigned NOT NULL,
character_set_name varchar(32) DEFAULT NULL,
collation_name varchar(32) DEFAULT NULL,
column_type longtext NOT NULL,
column_key varchar(3) NOT NULL,
PRIMARY KEY (table_schema, table_name, ordinal_position)
)`
createDb = `create database if not exists _vt`
insertIntoSchemaCopy = `insert _vt.schemacopy
select table_schema, table_name, column_name, ordinal_position, character_set_name, collation_name, column_type, column_key
from information_schema.columns
where table_schema = "vttest"`
cleanSchemaCopy = `delete from _vt.schemacopy`
dropTestTable = `drop table if exists product`
createDb = `create database if not exists _vt`
createUserTable = `create table vttest.product (id bigint(20) primary key, name char(10) CHARACTER SET utf8 COLLATE utf8_unicode_ci, created bigint(20))`
detectNewColumns = `
select 1
from information_schema.columns as ISC
left join _vt.schemacopy as c on
ISC.table_name = c.table_name and
ISC.table_schema=c.table_schema and
ISC.ordinal_position = c.ordinal_position
where ISC.table_schema = "vttest" AND c.table_schema is null
`
detectChangeColumns = `
select 1
from information_schema.columns as ISC
join _vt.schemacopy as c on
ISC.table_name = c.table_name and
ISC.table_schema=c.table_schema and
ISC.ordinal_position = c.ordinal_position
where ISC.table_schema = "vttest"
AND (not(c.column_name <=> ISC.column_name)
OR not(ISC.character_set_name <=> c.character_set_name)
OR not(ISC.collation_name <=> c.collation_name)
OR not(ISC.column_type <=> c.column_type)
OR not(ISC.column_key <=> c.column_key))
`
detectRemoveColumns = `
select 1
from information_schema.columns as ISC
right join _vt.schemacopy as c on
ISC.table_name = c.table_name and
ISC.table_schema=c.table_schema and
ISC.ordinal_position = c.ordinal_position
where c.table_schema = "vttest" AND ISC.table_schema is null
`
detectChange = detectChangeColumns + " UNION " + detectNewColumns + " UNION " + detectRemoveColumns
dropTestTable = `drop table if exists product`
)
func TestChangeSchemaIsNoticed(t *testing.T) {
@ -97,7 +40,7 @@ func TestChangeSchemaIsNoticed(t *testing.T) {
_, err = conn.ExecuteFetch(createDb, 1000, true)
require.NoError(t, err)
_, err = conn.ExecuteFetch(createSchemaCopyTable, 1000, true)
_, err = conn.ExecuteFetch(mysql.CreateSchemaCopyTable, 1000, true)
require.NoError(t, err)
tests := []struct {
@ -141,18 +84,18 @@ func TestChangeSchemaIsNoticed(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
// reset schemacopy
_, err := conn.ExecuteFetch(cleanSchemaCopy, 1000, true)
_, err := conn.ExecuteFetch(mysql.ClearSchemaCopy, 1000, true)
require.NoError(t, err)
_, err = conn.ExecuteFetch(dropTestTable, 1000, true)
require.NoError(t, err)
_, err = conn.ExecuteFetch(createUserTable, 1000, true)
require.NoError(t, err)
rs, err := conn.ExecuteFetch(insertIntoSchemaCopy, 1000, true)
rs, err := conn.ExecuteFetch(mysql.InsertIntoSchemaCopy, 1000, true)
require.NoError(t, err)
require.NotZero(t, rs.RowsAffected)
// make sure no changes are detected
rs, err = conn.ExecuteFetch(detectChange, 1000, true)
rs, err = conn.ExecuteFetch(mysql.DetectSchemaChange, 1000, true)
require.NoError(t, err)
require.Empty(t, rs.Rows)
@ -161,7 +104,7 @@ func TestChangeSchemaIsNoticed(t *testing.T) {
require.NoError(t, err)
// make sure the change is detected
rs, err = conn.ExecuteFetch(detectChange, 1000, true)
rs, err = conn.ExecuteFetch(mysql.DetectSchemaChange, 1000, true)
require.NoError(t, err)
require.NotEmpty(t, rs.Rows)
})

Просмотреть файл

@ -34,6 +34,63 @@ const (
BaseShowPrimary = "SELECT table_name, column_name FROM information_schema.key_column_usage WHERE table_schema=database() AND constraint_name='PRIMARY' ORDER BY table_name, ordinal_position"
// ShowRowsRead is the query used to find the number of rows read.
ShowRowsRead = "show status like 'Innodb_rows_read'"
// CreateSchemaCopyTable query creates schemacopy table in _vt schema.
CreateSchemaCopyTable = `
CREATE TABLE if not exists _vt.schemacopy (
table_schema varchar(64) NOT NULL,
table_name varchar(64) NOT NULL,
column_name varchar(64) NOT NULL,
ordinal_position bigint(21) unsigned NOT NULL,
character_set_name varchar(32) DEFAULT NULL,
collation_name varchar(32) DEFAULT NULL,
column_type longtext NOT NULL,
column_key varchar(3) NOT NULL,
PRIMARY KEY (table_schema, table_name, ordinal_position))`
detectNewColumns = `
select 1
from information_schema.columns as ISC
left join _vt.schemacopy as c on
ISC.table_name = c.table_name and
ISC.table_schema=c.table_schema and
ISC.ordinal_position = c.ordinal_position
where ISC.table_schema = database() AND c.table_schema is null`
detectChangeColumns = `
select 1
from information_schema.columns as ISC
join _vt.schemacopy as c on
ISC.table_name = c.table_name and
ISC.table_schema=c.table_schema and
ISC.ordinal_position = c.ordinal_position
where ISC.table_schema = database()
AND (not(c.column_name <=> ISC.column_name)
OR not(ISC.character_set_name <=> c.character_set_name)
OR not(ISC.collation_name <=> c.collation_name)
OR not(ISC.column_type <=> c.column_type)
OR not(ISC.column_key <=> c.column_key))`
detectRemoveColumns = `
select 1
from information_schema.columns as ISC
right join _vt.schemacopy as c on
ISC.table_name = c.table_name and
ISC.table_schema=c.table_schema and
ISC.ordinal_position = c.ordinal_position
where c.table_schema = database() AND ISC.table_schema is null`
// DetectSchemaChange query detects if there is any schema change from previous copy.
DetectSchemaChange = detectChangeColumns + " UNION " + detectNewColumns + " UNION " + detectRemoveColumns
// ClearSchemaCopy query clears the schemacopy table.
ClearSchemaCopy = `delete from _vt.schemacopy`
// InsertIntoSchemaCopy query copies over the schema information from information_schema.columns table.
InsertIntoSchemaCopy = `insert _vt.schemacopy
select table_schema, table_name, column_name, ordinal_position, character_set_name, collation_name, column_type, column_key
from information_schema.columns
where table_schema = database()`
)
// BaseShowTablesFields contains the fields returned by a BaseShowTables or a BaseShowTablesForTable command.

Просмотреть файл

@ -23,6 +23,10 @@ import (
"sync"
"time"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/vt/vttablet/tabletserver/connpool"
"context"
"github.com/golang/protobuf/proto"
@ -61,9 +65,14 @@ type healthStreamer struct {
state *querypb.StreamHealthResponse
history *history.History
ticks *timer.Timer
conns *connpool.Pool
initSuccess bool
}
func newHealthStreamer(env tabletenv.Env, alias topodatapb.TabletAlias) *healthStreamer {
reloadTime := env.Config().SchemaReloadIntervalSeconds.Get()
return &healthStreamer{
stats: env.Stats(),
degradedThreshold: env.Config().Healthcheck.DegradedThresholdSeconds.Get(),
@ -79,6 +88,13 @@ func newHealthStreamer(env tabletenv.Env, alias topodatapb.TabletAlias) *healthS
},
history: history.New(5),
ticks: timer.NewTimer(reloadTime),
// We need one connection for the reloader.
conns: connpool.NewPool(env, "", tabletenv.ConnPoolConfig{
Size: 1,
IdleTimeoutSeconds: env.Config().OltpReadPool.IdleTimeoutSeconds,
}),
}
}
@ -97,6 +113,12 @@ func (hs *healthStreamer) Open() {
return
}
hs.ctx, hs.cancel = context.WithCancel(context.TODO())
hs.ticks.Start(func() {
if err := hs.Reload(); err != nil {
log.Errorf("periodic schema reload failed in health stream: %v", err)
}
})
}
func (hs *healthStreamer) Close() {
@ -255,3 +277,73 @@ func (hs *healthStreamer) SetUnhealthyThreshold(v time.Duration) {
}
}
}
func (hs *healthStreamer) Reload() error {
hs.mu.Lock()
defer hs.mu.Unlock()
// Schema Reload to happen only on master.
if hs.state.Target.TabletType != topodatapb.TabletType_MASTER {
return nil
}
ctx := hs.ctx
conn, err := hs.conns.Get(ctx)
if err != nil {
return err
}
defer conn.Recycle()
if !hs.initSuccess {
hs.initSuccess, err = hs.InitSchemaLocked(conn)
if err != nil {
return err
}
}
qr, err := conn.Exec(ctx, mysql.DetectSchemaChange, 5, false)
if err != nil {
return err
}
// If no change detected, then return
if len(qr.Rows) == 0 {
return nil
}
log.Info("schema change detected")
// TODO: add logic to notify vtgate
// Reload the schema in a transaction.
_, err = conn.Exec(ctx, "begin", 1, false)
if err != nil {
return err
}
defer conn.Exec(ctx, "rollback", 1, false)
_, err = conn.Exec(ctx, mysql.ClearSchemaCopy, 1, false)
if err != nil {
return err
}
_, err = conn.Exec(ctx, mysql.InsertIntoSchemaCopy, 1, false)
if err != nil {
return err
}
_, err = conn.Exec(ctx, "commit", 1, false)
if err != nil {
return err
}
return nil
}
func (hs *healthStreamer) InitSchemaLocked(conn *connpool.DBConn) (bool, error) {
_, err := conn.Exec(hs.ctx, mysql.CreateSchemaCopyTable, 1, false)
if err != nil {
return false, err
}
return true, nil
}