From 9e1ff351b878a356faea96164a6515d8ab97b47f Mon Sep 17 00:00:00 2001 From: Harshit Gangal Date: Fri, 30 Apr 2021 19:01:01 +0530 Subject: [PATCH] added schema change detection and schema copy to health streamer Signed-off-by: Harshit Gangal --- go/mysql/endtoend/schema_change_test.go | 71 ++------------ go/mysql/schema.go | 57 ++++++++++++ .../vttablet/tabletserver/health_streamer.go | 92 +++++++++++++++++++ 3 files changed, 156 insertions(+), 64 deletions(-) diff --git a/go/mysql/endtoend/schema_change_test.go b/go/mysql/endtoend/schema_change_test.go index 8ee2122bae..e4db4de6a3 100644 --- a/go/mysql/endtoend/schema_change_test.go +++ b/go/mysql/endtoend/schema_change_test.go @@ -28,66 +28,9 @@ import ( var ctx = context.Background() const ( - createSchemaCopyTable = ` -CREATE TABLE _vt.schemacopy ( - table_schema varchar(64) NOT NULL, - table_name varchar(64) NOT NULL, - column_name varchar(64) NOT NULL, - ordinal_position bigint(21) unsigned NOT NULL, - character_set_name varchar(32) DEFAULT NULL, - collation_name varchar(32) DEFAULT NULL, - column_type longtext NOT NULL, - column_key varchar(3) NOT NULL, - PRIMARY KEY (table_schema, table_name, ordinal_position) -)` - createDb = `create database if not exists _vt` - - insertIntoSchemaCopy = `insert _vt.schemacopy -select table_schema, table_name, column_name, ordinal_position, character_set_name, collation_name, column_type, column_key -from information_schema.columns -where table_schema = "vttest"` - - cleanSchemaCopy = `delete from _vt.schemacopy` - dropTestTable = `drop table if exists product` - + createDb = `create database if not exists _vt` createUserTable = `create table vttest.product (id bigint(20) primary key, name char(10) CHARACTER SET utf8 COLLATE utf8_unicode_ci, created bigint(20))` - - detectNewColumns = ` -select 1 -from information_schema.columns as ISC - left join _vt.schemacopy as c on - ISC.table_name = c.table_name and - ISC.table_schema=c.table_schema and - ISC.ordinal_position = c.ordinal_position -where ISC.table_schema = "vttest" AND c.table_schema is null -` - - detectChangeColumns = ` -select 1 -from information_schema.columns as ISC - join _vt.schemacopy as c on - ISC.table_name = c.table_name and - ISC.table_schema=c.table_schema and - ISC.ordinal_position = c.ordinal_position -where ISC.table_schema = "vttest" - AND (not(c.column_name <=> ISC.column_name) - OR not(ISC.character_set_name <=> c.character_set_name) - OR not(ISC.collation_name <=> c.collation_name) - OR not(ISC.column_type <=> c.column_type) - OR not(ISC.column_key <=> c.column_key)) -` - - detectRemoveColumns = ` -select 1 -from information_schema.columns as ISC - right join _vt.schemacopy as c on - ISC.table_name = c.table_name and - ISC.table_schema=c.table_schema and - ISC.ordinal_position = c.ordinal_position -where c.table_schema = "vttest" AND ISC.table_schema is null -` - - detectChange = detectChangeColumns + " UNION " + detectNewColumns + " UNION " + detectRemoveColumns + dropTestTable = `drop table if exists product` ) func TestChangeSchemaIsNoticed(t *testing.T) { @@ -97,7 +40,7 @@ func TestChangeSchemaIsNoticed(t *testing.T) { _, err = conn.ExecuteFetch(createDb, 1000, true) require.NoError(t, err) - _, err = conn.ExecuteFetch(createSchemaCopyTable, 1000, true) + _, err = conn.ExecuteFetch(mysql.CreateSchemaCopyTable, 1000, true) require.NoError(t, err) tests := []struct { @@ -141,18 +84,18 @@ func TestChangeSchemaIsNoticed(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { // reset schemacopy - _, err := conn.ExecuteFetch(cleanSchemaCopy, 1000, true) + _, err := conn.ExecuteFetch(mysql.ClearSchemaCopy, 1000, true) require.NoError(t, err) _, err = conn.ExecuteFetch(dropTestTable, 1000, true) require.NoError(t, err) _, err = conn.ExecuteFetch(createUserTable, 1000, true) require.NoError(t, err) - rs, err := conn.ExecuteFetch(insertIntoSchemaCopy, 1000, true) + rs, err := conn.ExecuteFetch(mysql.InsertIntoSchemaCopy, 1000, true) require.NoError(t, err) require.NotZero(t, rs.RowsAffected) // make sure no changes are detected - rs, err = conn.ExecuteFetch(detectChange, 1000, true) + rs, err = conn.ExecuteFetch(mysql.DetectSchemaChange, 1000, true) require.NoError(t, err) require.Empty(t, rs.Rows) @@ -161,7 +104,7 @@ func TestChangeSchemaIsNoticed(t *testing.T) { require.NoError(t, err) // make sure the change is detected - rs, err = conn.ExecuteFetch(detectChange, 1000, true) + rs, err = conn.ExecuteFetch(mysql.DetectSchemaChange, 1000, true) require.NoError(t, err) require.NotEmpty(t, rs.Rows) }) diff --git a/go/mysql/schema.go b/go/mysql/schema.go index 0184142973..cd739605b5 100644 --- a/go/mysql/schema.go +++ b/go/mysql/schema.go @@ -34,6 +34,63 @@ const ( BaseShowPrimary = "SELECT table_name, column_name FROM information_schema.key_column_usage WHERE table_schema=database() AND constraint_name='PRIMARY' ORDER BY table_name, ordinal_position" // ShowRowsRead is the query used to find the number of rows read. ShowRowsRead = "show status like 'Innodb_rows_read'" + + // CreateSchemaCopyTable query creates schemacopy table in _vt schema. + CreateSchemaCopyTable = ` +CREATE TABLE if not exists _vt.schemacopy ( + table_schema varchar(64) NOT NULL, + table_name varchar(64) NOT NULL, + column_name varchar(64) NOT NULL, + ordinal_position bigint(21) unsigned NOT NULL, + character_set_name varchar(32) DEFAULT NULL, + collation_name varchar(32) DEFAULT NULL, + column_type longtext NOT NULL, + column_key varchar(3) NOT NULL, + PRIMARY KEY (table_schema, table_name, ordinal_position))` + + detectNewColumns = ` +select 1 +from information_schema.columns as ISC + left join _vt.schemacopy as c on + ISC.table_name = c.table_name and + ISC.table_schema=c.table_schema and + ISC.ordinal_position = c.ordinal_position +where ISC.table_schema = database() AND c.table_schema is null` + + detectChangeColumns = ` +select 1 +from information_schema.columns as ISC + join _vt.schemacopy as c on + ISC.table_name = c.table_name and + ISC.table_schema=c.table_schema and + ISC.ordinal_position = c.ordinal_position +where ISC.table_schema = database() + AND (not(c.column_name <=> ISC.column_name) + OR not(ISC.character_set_name <=> c.character_set_name) + OR not(ISC.collation_name <=> c.collation_name) + OR not(ISC.column_type <=> c.column_type) + OR not(ISC.column_key <=> c.column_key))` + + detectRemoveColumns = ` +select 1 +from information_schema.columns as ISC + right join _vt.schemacopy as c on + ISC.table_name = c.table_name and + ISC.table_schema=c.table_schema and + ISC.ordinal_position = c.ordinal_position +where c.table_schema = database() AND ISC.table_schema is null` + + // DetectSchemaChange query detects if there is any schema change from previous copy. + DetectSchemaChange = detectChangeColumns + " UNION " + detectNewColumns + " UNION " + detectRemoveColumns + + // ClearSchemaCopy query clears the schemacopy table. + ClearSchemaCopy = `delete from _vt.schemacopy` + + // InsertIntoSchemaCopy query copies over the schema information from information_schema.columns table. + InsertIntoSchemaCopy = `insert _vt.schemacopy +select table_schema, table_name, column_name, ordinal_position, character_set_name, collation_name, column_type, column_key +from information_schema.columns +where table_schema = database()` ) // BaseShowTablesFields contains the fields returned by a BaseShowTables or a BaseShowTablesForTable command. diff --git a/go/vt/vttablet/tabletserver/health_streamer.go b/go/vt/vttablet/tabletserver/health_streamer.go index 36e856d3e8..b1d7bfcd8b 100644 --- a/go/vt/vttablet/tabletserver/health_streamer.go +++ b/go/vt/vttablet/tabletserver/health_streamer.go @@ -23,6 +23,10 @@ import ( "sync" "time" + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/timer" + "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" + "context" "github.com/golang/protobuf/proto" @@ -61,9 +65,14 @@ type healthStreamer struct { state *querypb.StreamHealthResponse history *history.History + + ticks *timer.Timer + conns *connpool.Pool + initSuccess bool } func newHealthStreamer(env tabletenv.Env, alias topodatapb.TabletAlias) *healthStreamer { + reloadTime := env.Config().SchemaReloadIntervalSeconds.Get() return &healthStreamer{ stats: env.Stats(), degradedThreshold: env.Config().Healthcheck.DegradedThresholdSeconds.Get(), @@ -79,6 +88,13 @@ func newHealthStreamer(env tabletenv.Env, alias topodatapb.TabletAlias) *healthS }, history: history.New(5), + + ticks: timer.NewTimer(reloadTime), + // We need one connection for the reloader. + conns: connpool.NewPool(env, "", tabletenv.ConnPoolConfig{ + Size: 1, + IdleTimeoutSeconds: env.Config().OltpReadPool.IdleTimeoutSeconds, + }), } } @@ -97,6 +113,12 @@ func (hs *healthStreamer) Open() { return } hs.ctx, hs.cancel = context.WithCancel(context.TODO()) + hs.ticks.Start(func() { + if err := hs.Reload(); err != nil { + log.Errorf("periodic schema reload failed in health stream: %v", err) + } + }) + } func (hs *healthStreamer) Close() { @@ -255,3 +277,73 @@ func (hs *healthStreamer) SetUnhealthyThreshold(v time.Duration) { } } } + +func (hs *healthStreamer) Reload() error { + hs.mu.Lock() + defer hs.mu.Unlock() + + // Schema Reload to happen only on master. + if hs.state.Target.TabletType != topodatapb.TabletType_MASTER { + return nil + } + + ctx := hs.ctx + conn, err := hs.conns.Get(ctx) + if err != nil { + return err + } + defer conn.Recycle() + + if !hs.initSuccess { + hs.initSuccess, err = hs.InitSchemaLocked(conn) + if err != nil { + return err + } + } + + qr, err := conn.Exec(ctx, mysql.DetectSchemaChange, 5, false) + if err != nil { + return err + } + + // If no change detected, then return + if len(qr.Rows) == 0 { + return nil + } + + log.Info("schema change detected") + // TODO: add logic to notify vtgate + + // Reload the schema in a transaction. + + _, err = conn.Exec(ctx, "begin", 1, false) + if err != nil { + return err + } + defer conn.Exec(ctx, "rollback", 1, false) + + _, err = conn.Exec(ctx, mysql.ClearSchemaCopy, 1, false) + if err != nil { + return err + } + + _, err = conn.Exec(ctx, mysql.InsertIntoSchemaCopy, 1, false) + if err != nil { + return err + } + + _, err = conn.Exec(ctx, "commit", 1, false) + if err != nil { + return err + } + + return nil +} + +func (hs *healthStreamer) InitSchemaLocked(conn *connpool.DBConn) (bool, error) { + _, err := conn.Exec(hs.ctx, mysql.CreateSchemaCopyTable, 1, false) + if err != nil { + return false, err + } + return true, nil +}