Update approach to not require another binary to run vtshovel

Signed-off-by: Rafael Chacon <rafael@slack-corp.com>
This commit is contained in:
Rafael Chacon 2019-10-17 16:27:56 -07:00
Родитель 8bf38cc699
Коммит b56bf67d7a
12 изменённых файлов: 254 добавлений и 399 удалений

Просмотреть файл

@ -1,196 +0,0 @@
/*
Copyright 2017 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"context"
"encoding/json"
"flag"
"io/ioutil"
"math/rand"
"time"
"vitess.io/vitess/go/exit"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)
var (
vtShovelConfigFile = flag.String("vtshovel-config-file", "/etc/slack.d/vtshovel.json", "VTShovel Config file")
dryRun = flag.Bool("dry-run", false, "When present, only log DML that are going to be performed in target database")
)
func init() {
rand.Seed(time.Now().UnixNano())
servenv.RegisterDefaultFlags()
}
// VtShovelConfig fields to configure vtshovel client
type VtShovelConfig struct {
// Source MySQL client information
// MySQLSourceHost ...
MySQLSourceHost string `json:"mysql_source_host"`
// MySQLSourcePort ...
MySQLSourcePort int `json:"mysql_source_port"`
// MySQLSourceUser ...
MySQLSourceUser string `json:"mysql_source_user"`
// MySQLSourcePassword ...
MySQLSourcePassword string `json:"mysql_source_password"`
// MySQLSourceBinlogStartPos ...
MySQLSourceBinlogStartPos string `json:"mysql_source_binlog_start_pos"`
// MySQLSourceDatabase ...
MySQLSourceDBName string `json:"mysql_source_dbname"`
}
func main() {
defer exit.Recover()
dbconfigs.RegisterFlags(dbconfigs.Dba)
mysqlctl.RegisterFlags()
servenv.ParseFlags("vtshovel")
servenv.Init()
servenv.OnRun(func() {
//vreplication.MySQLAddStatusPart()
// Flags are parsed now. Parse the template using the actual flag value and overwrite the current template.
//addStatusParts(vtg)
})
vtShovelConfig, err := loadConfigFromFile(*vtShovelConfigFile)
if err != nil {
log.Fatal(err)
}
sourceConnParams := mysql.ConnParams{
Host: vtShovelConfig.MySQLSourceHost,
Port: vtShovelConfig.MySQLSourcePort,
Pass: vtShovelConfig.MySQLSourcePassword,
Uname: vtShovelConfig.MySQLSourceUser,
DbName: vtShovelConfig.MySQLSourceDBName,
}
source := binlogdatapb.BinlogSource{
Filter: &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{
&binlogdatapb.Rule{
Match: "/.*",
},
},
},
}
var mycnf *mysqlctl.Mycnf
var socketFile string
// If no connection parameters were specified, load the mycnf file
// and use the socket from it. If connection parameters were specified,
// we assume that the mysql is not local, and we skip loading mycnf.
// This also means that backup and restore will not be allowed.
if !dbconfigs.HasConnectionParams() {
var err error
if mycnf, err = mysqlctl.NewMycnfFromFlags(123213123); err != nil {
log.Exitf("mycnf read failed: %v", err)
}
socketFile = mycnf.SocketFile
} else {
log.Info("connection parameters were specified. Not loading my.cnf.")
}
// If connection parameters were specified, socketFile will be empty.
// Otherwise, the socketFile (read from mycnf) will be used to initialize
// dbconfigs.
dbcfgs, err := dbconfigs.Init(socketFile)
if err != nil {
log.Warning(err)
}
mysqld := mysqlctl.NewMysqld(dbcfgs)
servenv.OnClose(mysqld.Close)
destConnParams := dbcfgs.Dba()
// Hack to make sure dbname is set correctly given that this is not a tablet
// and SetDBName is not called.
destConnParams.DbName = destConnParams.DeprecatedDBName
log.Infof("This are the destConnParams:%v", destConnParams)
destDbClient := binlogplayer.NewDBClient(destConnParams)
if err := destDbClient.Connect(); err != nil {
log.Fatal(vterrors.Wrap(err, "can't connect to database"))
}
servenv.OnClose(destDbClient.Close)
for _, query := range binlogplayer.CreateVReplicationTable() {
if _, err := destDbClient.ExecuteFetch(query, 0); err != nil {
log.Fatalf("Failed to ensure vreplication table exists: %v", err)
}
}
newVReplicatorStmt := binlogplayer.CreateVReplication("VTshovel", &source, vtShovelConfig.MySQLSourceBinlogStartPos, int64(1000), int64(100000), time.Now().Unix(), destDbClient.DBName())
res, err := destDbClient.ExecuteFetch(newVReplicatorStmt, 0)
if err != nil {
log.Fatalf("Failed to create vreplication stream: %v", err)
}
sourceVstreamClient := vreplication.NewMySQLVStreamerClient(&sourceConnParams)
go func() {
ctx := context.Background()
replicator := vreplication.NewVReplicator(
uint32(res.InsertID),
&source,
sourceVstreamClient,
binlogplayer.NewStats(),
destDbClient,
mysqld,
)
replicator.Replicate(ctx)
if err != nil {
log.Infof("Error with stream: %v", err)
}
return
}()
servenv.RunDefault()
}
func loadConfigFromFile(file string) (*VtShovelConfig, error) {
data, err := ioutil.ReadFile(file)
if err != nil {
return nil, vterrors.Wrapf(err, "Failed to read %v file", file)
}
vtShovelConfig := &VtShovelConfig{}
err = json.Unmarshal(data, vtShovelConfig)
if err != nil {
return nil, vterrors.Wrap(err, "Error parsing auth server config")
}
return vtShovelConfig, nil
}
type vtShovelDbClient struct {
dbClient binlogplayer.DBClient
startPos string
}

Просмотреть файл

@ -73,10 +73,11 @@ const (
Dba = "dba" Dba = "dba"
Filtered = "filtered" Filtered = "filtered"
Repl = "repl" Repl = "repl"
ExternalRepl = "erepl"
) )
// All can be used to register all flags: RegisterFlags(All...) // All can be used to register all flags: RegisterFlags(All...)
var All = []string{App, AppDebug, AllPrivs, Dba, Filtered, Repl} var All = []string{App, AppDebug, AllPrivs, Dba, Filtered, Repl, ExternalRepl}
// RegisterFlags registers the flags for the given DBConfigFlag. // RegisterFlags registers the flags for the given DBConfigFlag.
// For instance, vttablet will register client, dba and repl. // For instance, vttablet will register client, dba and repl.
@ -157,16 +158,26 @@ func (dbcfgs *DBConfigs) DbaWithDB() *mysql.ConnParams {
return dbcfgs.makeParams(Dba, true) return dbcfgs.makeParams(Dba, true)
} }
// FilteredWithDB returns connection parameters for appdebug with dbname set. // FilteredWithDB returns connection parameters for filtered with dbname set.
func (dbcfgs *DBConfigs) FilteredWithDB() *mysql.ConnParams { func (dbcfgs *DBConfigs) FilteredWithDB() *mysql.ConnParams {
return dbcfgs.makeParams(Filtered, true) return dbcfgs.makeParams(Filtered, true)
} }
// Repl returns connection parameters for appdebug with no dbname set. // Repl returns connection parameters for repl with no dbname set.
func (dbcfgs *DBConfigs) Repl() *mysql.ConnParams { func (dbcfgs *DBConfigs) Repl() *mysql.ConnParams {
return dbcfgs.makeParams(Repl, false) return dbcfgs.makeParams(Repl, false)
} }
// ExternalRepl returns connection parameters for repl with no dbname set.
func (dbcfgs *DBConfigs) ExternalRepl() *mysql.ConnParams {
return dbcfgs.makeParams(Repl, false)
}
// ExternalReplWithDb returns connection parameters for repl with dbname set.
func (dbcfgs *DBConfigs) ExternalReplWithDb() *mysql.ConnParams {
return dbcfgs.makeParams(Repl, true)
}
// AppWithDB returns connection parameters for app with dbname set. // AppWithDB returns connection parameters for app with dbname set.
func (dbcfgs *DBConfigs) makeParams(userKey string, withDB bool) *mysql.ConnParams { func (dbcfgs *DBConfigs) makeParams(userKey string, withDB bool) *mysql.ConnParams {
orig := dbcfgs.userConfigs[userKey] orig := dbcfgs.userConfigs[userKey]

Просмотреть файл

@ -716,6 +716,9 @@ type BinlogSource struct {
Filter *Filter `protobuf:"bytes,6,opt,name=filter,proto3" json:"filter,omitempty"` Filter *Filter `protobuf:"bytes,6,opt,name=filter,proto3" json:"filter,omitempty"`
// on_ddl specifies the action to be taken when a DDL is encountered. // on_ddl specifies the action to be taken when a DDL is encountered.
OnDdl OnDDLAction `protobuf:"varint,7,opt,name=on_ddl,json=onDdl,proto3,enum=binlogdata.OnDDLAction" json:"on_ddl,omitempty"` OnDdl OnDDLAction `protobuf:"varint,7,opt,name=on_ddl,json=onDdl,proto3,enum=binlogdata.OnDDLAction" json:"on_ddl,omitempty"`
// Source is an external mysql. This attribute should be set to the username
// to use in the connection
ExternalMysql string `protobuf:"bytes,8,opt,name=external_mysql,json=externalMysql,proto3" json:"external_mysql,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"` XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"` XXX_sizecache int32 `json:"-"`
@ -795,6 +798,13 @@ func (m *BinlogSource) GetOnDdl() OnDDLAction {
return OnDDLAction_IGNORE return OnDDLAction_IGNORE
} }
func (m *BinlogSource) GetExternalMysql() string {
if m != nil {
return m.ExternalMysql
}
return ""
}
// RowChange represents one row change // RowChange represents one row change
type RowChange struct { type RowChange struct {
Before *query.Row `protobuf:"bytes,1,opt,name=before,proto3" json:"before,omitempty"` Before *query.Row `protobuf:"bytes,1,opt,name=before,proto3" json:"before,omitempty"`
@ -1568,107 +1578,109 @@ func init() {
func init() { proto.RegisterFile("binlogdata.proto", fileDescriptor_5fd02bcb2e350dad) } func init() { proto.RegisterFile("binlogdata.proto", fileDescriptor_5fd02bcb2e350dad) }
var fileDescriptor_5fd02bcb2e350dad = []byte{ var fileDescriptor_5fd02bcb2e350dad = []byte{
// 1630 bytes of a gzipped FileDescriptorProto // 1652 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xd4, 0x57, 0xcb, 0x72, 0xf3, 0x48, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xd4, 0x57, 0xdd, 0x72, 0xe3, 0x48,
0x15, 0x8e, 0x2d, 0xf9, 0x76, 0x94, 0x38, 0x4a, 0xe7, 0x82, 0x49, 0x31, 0x54, 0x46, 0xc5, 0x90, 0x15, 0x1e, 0x5b, 0xf2, 0xdf, 0x51, 0xe2, 0x28, 0x9d, 0x4c, 0x30, 0x53, 0x2c, 0x95, 0x55, 0x31,
0x90, 0x2a, 0x1c, 0x30, 0xf0, 0xb3, 0x1a, 0x06, 0x5f, 0x94, 0xc4, 0x89, 0x6c, 0xe7, 0x6f, 0x2b, 0x4c, 0x48, 0x15, 0x0e, 0x18, 0x18, 0xae, 0x96, 0xc5, 0x3f, 0x4a, 0xc6, 0x13, 0xd9, 0xce, 0xb4,
0x19, 0x6a, 0x36, 0x2a, 0xc5, 0xea, 0x24, 0x22, 0xb2, 0xe4, 0x5f, 0x6a, 0x3b, 0xe4, 0x01, 0x28, 0x95, 0x2c, 0xb5, 0x37, 0x2a, 0xc5, 0xea, 0x24, 0x22, 0xfa, 0xf1, 0x48, 0x6d, 0x67, 0xfd, 0x00,
0x1e, 0x80, 0x2d, 0x2f, 0xc0, 0x1a, 0xb6, 0x6c, 0xd9, 0xf3, 0x10, 0x3c, 0x00, 0x6f, 0x40, 0xf5, 0x14, 0x0f, 0xc0, 0x53, 0x70, 0x0d, 0xb7, 0x5c, 0x51, 0xc5, 0x3d, 0x0f, 0xc1, 0x03, 0xf0, 0x06,
0x45, 0xb2, 0x95, 0x0c, 0xf3, 0x67, 0xa8, 0x62, 0xc1, 0x46, 0x75, 0xfa, 0xf4, 0x39, 0xa7, 0xcf, 0x54, 0xff, 0x48, 0xb6, 0x32, 0xcb, 0x4e, 0x96, 0x2a, 0x2e, 0xf6, 0x46, 0x75, 0xfa, 0xf4, 0x39,
0xf9, 0xfa, 0x3b, 0xdd, 0x6a, 0xd0, 0x6f, 0xfd, 0x30, 0x88, 0xee, 0x3d, 0x97, 0xba, 0xcd, 0x59, 0xa7, 0xcf, 0xf9, 0xfa, 0xfc, 0xa8, 0x41, 0xbf, 0xf6, 0xa3, 0x20, 0xbe, 0xf5, 0x5c, 0xea, 0xb6,
0x1c, 0xd1, 0x08, 0xc1, 0x52, 0xb3, 0xaf, 0x2d, 0x68, 0x3c, 0x9b, 0x88, 0x89, 0x7d, 0xed, 0xc3, 0xe7, 0x49, 0x4c, 0x63, 0x04, 0x6b, 0xce, 0x0b, 0x6d, 0x49, 0x93, 0xf9, 0x4c, 0x6c, 0xbc, 0xd0,
0x9c, 0xc4, 0xcf, 0x72, 0x50, 0xa7, 0xd1, 0x2c, 0x5a, 0x7a, 0x19, 0x03, 0xa8, 0x74, 0x1f, 0xdc, 0xde, 0x2f, 0x48, 0xb2, 0x92, 0x8b, 0x26, 0x8d, 0xe7, 0xf1, 0x5a, 0xcb, 0x18, 0x41, 0xad, 0x7f,
0x38, 0x21, 0x14, 0xed, 0x41, 0x79, 0x12, 0xf8, 0x24, 0xa4, 0x8d, 0xc2, 0x41, 0xe1, 0xa8, 0x84, 0xe7, 0x26, 0x29, 0xa1, 0xe8, 0x00, 0xaa, 0xb3, 0xc0, 0x27, 0x11, 0x6d, 0x95, 0x0e, 0x4b, 0x47,
0xe5, 0x08, 0x21, 0x50, 0x27, 0x51, 0x18, 0x36, 0x8a, 0x5c, 0xcb, 0x65, 0x66, 0x9b, 0x90, 0x78, 0x15, 0x2c, 0x57, 0x08, 0x81, 0x3a, 0x8b, 0xa3, 0xa8, 0x55, 0xe6, 0x5c, 0x4e, 0x33, 0xd9, 0x94,
0x41, 0xe2, 0x86, 0x22, 0x6c, 0xc5, 0xc8, 0xf8, 0xa7, 0x02, 0x5b, 0x1d, 0x9e, 0x87, 0x1d, 0xbb, 0x24, 0x4b, 0x92, 0xb4, 0x14, 0x21, 0x2b, 0x56, 0xc6, 0xbf, 0x14, 0xd8, 0xed, 0x71, 0x3f, 0xec,
0x61, 0xe2, 0x4e, 0xa8, 0x1f, 0x85, 0xe8, 0x0c, 0x20, 0xa1, 0x2e, 0x25, 0x53, 0x12, 0xd2, 0xa4, 0xc4, 0x8d, 0x52, 0x77, 0x46, 0xfd, 0x38, 0x42, 0x67, 0x00, 0x29, 0x75, 0x29, 0x09, 0x49, 0x44,
0x51, 0x38, 0x50, 0x8e, 0xb4, 0xd6, 0x61, 0x73, 0xa5, 0x82, 0x57, 0x2e, 0xcd, 0x71, 0x6a, 0x8f, 0xd3, 0x56, 0xe9, 0x50, 0x39, 0xd2, 0x3a, 0xaf, 0xda, 0x1b, 0x11, 0x7c, 0xa0, 0xd2, 0x9e, 0x66,
0x57, 0x5c, 0x51, 0x0b, 0x34, 0xb2, 0x20, 0x21, 0x75, 0x68, 0xf4, 0x48, 0xc2, 0x86, 0x7a, 0x50, 0xf2, 0x78, 0x43, 0x15, 0x75, 0x40, 0x23, 0x4b, 0x12, 0x51, 0x87, 0xc6, 0xf7, 0x24, 0x6a, 0xa9,
0x38, 0xd2, 0x5a, 0x5b, 0x4d, 0x51, 0xa0, 0xc9, 0x66, 0x6c, 0x36, 0x81, 0x81, 0x64, 0xf2, 0xfe, 0x87, 0xa5, 0x23, 0xad, 0xb3, 0xdb, 0x16, 0x01, 0x9a, 0x6c, 0xc7, 0x66, 0x1b, 0x18, 0x48, 0x4e,
0xdf, 0x8b, 0x50, 0xcb, 0xa2, 0x21, 0x0b, 0xaa, 0x13, 0x97, 0x92, 0xfb, 0x28, 0x7e, 0xe6, 0x65, 0xbf, 0xf8, 0x47, 0x19, 0x1a, 0xb9, 0x35, 0x64, 0x41, 0x7d, 0xe6, 0x52, 0x72, 0x1b, 0x27, 0x2b,
0xd6, 0x5b, 0x3f, 0x79, 0x63, 0x22, 0xcd, 0xae, 0xf4, 0xc3, 0x59, 0x04, 0xf4, 0x63, 0xa8, 0x4c, 0x1e, 0x66, 0xb3, 0xf3, 0xb3, 0x27, 0x3a, 0xd2, 0xee, 0x4b, 0x3d, 0x9c, 0x5b, 0x40, 0x3f, 0x85,
0x04, 0x7a, 0x1c, 0x1d, 0xad, 0xb5, 0xbd, 0x1a, 0x4c, 0x02, 0x8b, 0x53, 0x1b, 0xa4, 0x83, 0x92, 0xda, 0x4c, 0xa0, 0xc7, 0xd1, 0xd1, 0x3a, 0x7b, 0x9b, 0xc6, 0x24, 0xb0, 0x38, 0x93, 0x41, 0x3a,
0x7c, 0x08, 0x38, 0x64, 0xeb, 0x98, 0x89, 0xc6, 0x9f, 0x0b, 0x50, 0x4d, 0xe3, 0xa2, 0x6d, 0xd8, 0x28, 0xe9, 0xfb, 0x80, 0x43, 0xb6, 0x85, 0x19, 0x69, 0xfc, 0xb9, 0x04, 0xf5, 0xcc, 0x2e, 0xda,
0xec, 0x58, 0xce, 0xf5, 0x10, 0x9b, 0xdd, 0xd1, 0xd9, 0xb0, 0xff, 0x95, 0xd9, 0xd3, 0xd7, 0xd0, 0x83, 0x9d, 0x9e, 0xe5, 0x5c, 0x8e, 0xb1, 0xd9, 0x9f, 0x9c, 0x8d, 0x87, 0x5f, 0x9a, 0x03, 0xfd,
0x3a, 0x54, 0x3b, 0x96, 0xd3, 0x31, 0xcf, 0xfa, 0x43, 0xbd, 0x80, 0x36, 0xa0, 0xd6, 0xb1, 0x9c, 0x19, 0xda, 0x82, 0x7a, 0xcf, 0x72, 0x7a, 0xe6, 0xd9, 0x70, 0xac, 0x97, 0xd0, 0x36, 0x34, 0x7a,
0xee, 0x68, 0x30, 0xe8, 0xdb, 0x7a, 0x11, 0x6d, 0x82, 0xd6, 0xb1, 0x1c, 0x3c, 0xb2, 0xac, 0x4e, 0x96, 0xd3, 0x9f, 0x8c, 0x46, 0x43, 0x5b, 0x2f, 0xa3, 0x1d, 0xd0, 0x7a, 0x96, 0x83, 0x27, 0x96,
0xbb, 0x7b, 0xa9, 0x2b, 0x68, 0x17, 0xb6, 0x3a, 0x96, 0xd3, 0x1b, 0x58, 0x4e, 0xcf, 0xbc, 0xc2, 0xd5, 0xeb, 0xf6, 0xcf, 0x75, 0x05, 0x3d, 0x87, 0xdd, 0x9e, 0xe5, 0x0c, 0x46, 0x96, 0x33, 0x30,
0x66, 0xb7, 0x6d, 0x9b, 0x3d, 0x5d, 0x45, 0x00, 0x65, 0xa6, 0xee, 0x59, 0x7a, 0x49, 0xca, 0x63, 0x2f, 0xb0, 0xd9, 0xef, 0xda, 0xe6, 0x40, 0x57, 0x11, 0x40, 0x95, 0xb1, 0x07, 0x96, 0x5e, 0x91,
0xd3, 0xd6, 0xcb, 0x32, 0x5c, 0x7f, 0x38, 0x36, 0xb1, 0xad, 0x57, 0xe4, 0xf0, 0xfa, 0xaa, 0xd7, 0xf4, 0xd4, 0xb4, 0xf5, 0xaa, 0x34, 0x37, 0x1c, 0x4f, 0x4d, 0x6c, 0xeb, 0x35, 0xb9, 0xbc, 0xbc,
0xb6, 0x4d, 0xbd, 0x2a, 0x87, 0x3d, 0xd3, 0x32, 0x6d, 0x53, 0xaf, 0x5d, 0xa8, 0xd5, 0xa2, 0xae, 0x18, 0x74, 0x6d, 0x53, 0xaf, 0xcb, 0xe5, 0xc0, 0xb4, 0x4c, 0xdb, 0xd4, 0x1b, 0x6f, 0xd5, 0x7a,
0x5c, 0xa8, 0x55, 0x45, 0x57, 0x8d, 0x3f, 0x16, 0x60, 0x77, 0x4c, 0x63, 0xe2, 0x4e, 0x2f, 0xc9, 0x59, 0x57, 0xde, 0xaa, 0x75, 0x45, 0x57, 0x8d, 0x3f, 0x95, 0xe0, 0xf9, 0x94, 0x26, 0xc4, 0x0d,
0x33, 0x76, 0xc3, 0x7b, 0x82, 0xc9, 0x87, 0x39, 0x49, 0x28, 0xda, 0x87, 0xea, 0x2c, 0x4a, 0x7c, 0xcf, 0xc9, 0x0a, 0xbb, 0xd1, 0x2d, 0xc1, 0xe4, 0xfd, 0x82, 0xa4, 0x14, 0xbd, 0x80, 0xfa, 0x3c,
0x86, 0x1d, 0x07, 0xb8, 0x86, 0xb3, 0x31, 0x3a, 0x81, 0xda, 0x23, 0x79, 0x76, 0x62, 0x66, 0x2f, 0x4e, 0x7d, 0x86, 0x1d, 0x07, 0xb8, 0x81, 0xf3, 0x35, 0x3a, 0x81, 0xc6, 0x3d, 0x59, 0x39, 0x09,
0x01, 0x43, 0xcd, 0x8c, 0x90, 0x59, 0xa4, 0xea, 0xa3, 0x94, 0x56, 0xf1, 0x55, 0x3e, 0x8e, 0xaf, 0x93, 0x97, 0x80, 0xa1, 0x76, 0x9e, 0x90, 0xb9, 0xa5, 0xfa, 0xbd, 0xa4, 0x36, 0xf1, 0x55, 0x3e,
0x71, 0x07, 0x7b, 0x2f, 0x93, 0x4a, 0x66, 0x51, 0x98, 0x10, 0x64, 0x01, 0x12, 0x8e, 0x0e, 0x5d, 0x8e, 0xaf, 0x71, 0x03, 0x07, 0x8f, 0x9d, 0x4a, 0xe7, 0x71, 0x94, 0x12, 0x64, 0x01, 0x12, 0x8a,
0xee, 0x2d, 0xcf, 0x4f, 0x6b, 0x7d, 0xf2, 0x8d, 0x04, 0xc0, 0x5b, 0xb7, 0x2f, 0x55, 0xc6, 0xef, 0x0e, 0x5d, 0xdf, 0x2d, 0xf7, 0x4f, 0xeb, 0x7c, 0xf2, 0x8d, 0x09, 0x80, 0x77, 0xaf, 0x1f, 0xb3,
0x60, 0x5b, 0xac, 0x63, 0xbb, 0xb7, 0x01, 0x49, 0xde, 0x52, 0xfa, 0x1e, 0x94, 0x29, 0x37, 0x6e, 0x8c, 0xaf, 0x60, 0x4f, 0x9c, 0x63, 0xbb, 0xd7, 0x01, 0x49, 0x9f, 0x12, 0xfa, 0x01, 0x54, 0x29,
0x14, 0x0f, 0x94, 0xa3, 0x1a, 0x96, 0xa3, 0x6f, 0x5b, 0xa1, 0x07, 0x3b, 0xf9, 0x95, 0xff, 0x27, 0x17, 0x6e, 0x95, 0x0f, 0x95, 0xa3, 0x06, 0x96, 0xab, 0x6f, 0x1b, 0xa1, 0x07, 0xfb, 0xc5, 0x93,
0xf5, 0xfd, 0x1c, 0x54, 0x3c, 0x0f, 0x08, 0xda, 0x81, 0xd2, 0xd4, 0xa5, 0x93, 0x07, 0x59, 0x8d, 0xff, 0x2f, 0xf1, 0xfd, 0x12, 0x54, 0xbc, 0x08, 0x08, 0xda, 0x87, 0x4a, 0xe8, 0xd2, 0xd9, 0x9d,
0x18, 0xb0, 0x52, 0xee, 0xfc, 0x80, 0x92, 0x98, 0x6f, 0x61, 0x0d, 0xcb, 0x91, 0xf1, 0x97, 0x02, 0x8c, 0x46, 0x2c, 0x58, 0x28, 0x37, 0x7e, 0x40, 0x49, 0xc2, 0xaf, 0xb0, 0x81, 0xe5, 0xca, 0xf8,
0x94, 0x4f, 0xb9, 0x88, 0x7e, 0x08, 0xa5, 0x78, 0xce, 0x8a, 0x15, 0xbd, 0xae, 0xaf, 0x66, 0xc0, 0x4b, 0x09, 0xaa, 0xa7, 0x9c, 0x44, 0x3f, 0x86, 0x4a, 0xb2, 0x60, 0xc1, 0x8a, 0x5a, 0xd7, 0x37,
0x22, 0x63, 0x31, 0x8d, 0xfa, 0x50, 0xbf, 0xf3, 0x49, 0xe0, 0xf1, 0xd6, 0x1d, 0x44, 0x9e, 0x60, 0x3d, 0x60, 0x96, 0xb1, 0xd8, 0x46, 0x43, 0x68, 0xde, 0xf8, 0x24, 0xf0, 0x78, 0xe9, 0x8e, 0x62,
0x45, 0xbd, 0xf5, 0xe9, 0xaa, 0x83, 0x88, 0xd9, 0x3c, 0xcd, 0x19, 0xe2, 0x17, 0x8e, 0xc6, 0x3b, 0x4f, 0x64, 0x45, 0xb3, 0xf3, 0xe9, 0xa6, 0x82, 0xb0, 0xd9, 0x3e, 0x2d, 0x08, 0xe2, 0x47, 0x8a,
0xa8, 0xe7, 0x2d, 0x58, 0x3b, 0x99, 0x18, 0x3b, 0xa3, 0xa1, 0x33, 0xe8, 0x8f, 0x07, 0x6d, 0xbb, 0xc6, 0x6b, 0x68, 0x16, 0x25, 0x58, 0x39, 0x99, 0x18, 0x3b, 0x93, 0xb1, 0x33, 0x1a, 0x4e, 0x47,
0x7b, 0xae, 0xaf, 0xf1, 0x8e, 0x31, 0xc7, 0xb6, 0x63, 0x9e, 0x9e, 0x8e, 0xb0, 0xad, 0x17, 0x8c, 0x5d, 0xbb, 0xff, 0x46, 0x7f, 0xc6, 0x2b, 0xc6, 0x9c, 0xda, 0x8e, 0x79, 0x7a, 0x3a, 0xc1, 0xb6,
0x3f, 0x15, 0x61, 0x5d, 0x80, 0x32, 0x8e, 0xe6, 0xf1, 0x84, 0xb0, 0x5d, 0x7c, 0x24, 0xcf, 0xc9, 0x5e, 0x32, 0xfe, 0x5e, 0x86, 0x2d, 0x01, 0xca, 0x34, 0x5e, 0x24, 0x33, 0xc2, 0x6e, 0xf1, 0x9e,
0xcc, 0x9d, 0x90, 0x74, 0x17, 0xd3, 0x31, 0x03, 0x24, 0x79, 0x70, 0x63, 0x4f, 0x56, 0x2e, 0x06, 0xac, 0xd2, 0xb9, 0x3b, 0x23, 0xd9, 0x2d, 0x66, 0x6b, 0x06, 0x48, 0x7a, 0xe7, 0x26, 0x9e, 0x8c,
0xe8, 0x17, 0xa0, 0xf1, 0xdd, 0xa4, 0x0e, 0x7d, 0x9e, 0x11, 0xbe, 0x8f, 0xf5, 0xd6, 0xce, 0x92, 0x5c, 0x2c, 0xd0, 0xaf, 0x40, 0xe3, 0xb7, 0x49, 0x1d, 0xba, 0x9a, 0x13, 0x7e, 0x8f, 0xcd, 0xce,
0xd8, 0x7c, 0xaf, 0xa8, 0xfd, 0x3c, 0x23, 0x18, 0x68, 0x26, 0xe7, 0xbb, 0x41, 0x7d, 0x43, 0x37, 0xfe, 0x3a, 0xb1, 0xf9, 0x5d, 0x51, 0x7b, 0x35, 0x27, 0x18, 0x68, 0x4e, 0x17, 0xab, 0x41, 0x7d,
0x2c, 0x39, 0x54, 0xca, 0x71, 0xe8, 0x38, 0xdb, 0x90, 0xb2, 0x8c, 0xf2, 0x0a, 0xbd, 0x74, 0x93, 0x42, 0x35, 0xac, 0x73, 0xa8, 0x52, 0xc8, 0xa1, 0xe3, 0xfc, 0x42, 0xaa, 0xd2, 0xca, 0x07, 0xe8,
0x50, 0x13, 0xca, 0x51, 0xe8, 0x78, 0x5e, 0xd0, 0xa8, 0xf0, 0x34, 0xbf, 0xb3, 0x6a, 0x3b, 0x0a, 0x65, 0x97, 0x84, 0xda, 0x50, 0x8d, 0x23, 0xc7, 0xf3, 0x82, 0x56, 0x8d, 0xbb, 0xf9, 0xbd, 0x4d,
0x7b, 0x3d, 0xab, 0x2d, 0x68, 0x51, 0x8a, 0xc2, 0x9e, 0x17, 0x18, 0xef, 0xa1, 0x86, 0xa3, 0xa7, 0xd9, 0x49, 0x34, 0x18, 0x58, 0x5d, 0x91, 0x16, 0x95, 0x38, 0x1a, 0x78, 0x01, 0x7a, 0x09, 0x4d,
0xee, 0x03, 0x4f, 0xc0, 0x80, 0xf2, 0x2d, 0xb9, 0x8b, 0x62, 0x22, 0x99, 0x05, 0xf2, 0xe4, 0xc5, 0xf2, 0x15, 0x25, 0x49, 0xe4, 0x06, 0x4e, 0xb8, 0x62, 0xdd, 0xab, 0xce, 0x43, 0xdf, 0xce, 0xb8,
0xd1, 0x13, 0x96, 0x33, 0xe8, 0x00, 0x4a, 0xee, 0x5d, 0x4a, 0x8e, 0xbc, 0x89, 0x98, 0x30, 0x5c, 0x23, 0xc6, 0x34, 0xde, 0x41, 0x03, 0xc7, 0x0f, 0xfd, 0x3b, 0xee, 0xa7, 0x01, 0xd5, 0x6b, 0x72,
0xa8, 0xe2, 0xe8, 0x89, 0xef, 0x13, 0xfa, 0x04, 0x04, 0x22, 0x4e, 0xe8, 0x4e, 0x53, 0xb8, 0x6b, 0x13, 0x27, 0x44, 0x26, 0x20, 0xc8, 0x06, 0x8d, 0xe3, 0x07, 0x2c, 0x77, 0xd0, 0x21, 0x54, 0xdc,
0x5c, 0x33, 0x74, 0xa7, 0x04, 0xbd, 0x03, 0x2d, 0x8e, 0x9e, 0x9c, 0x09, 0x5f, 0x5e, 0xb4, 0x8e, 0x9b, 0x2c, 0x87, 0x8a, 0x22, 0x62, 0xc3, 0x70, 0xa1, 0x8e, 0xe3, 0x07, 0x7e, 0x9d, 0xe8, 0x13,
0xd6, 0xda, 0xcd, 0xb1, 0x29, 0x4d, 0x0e, 0x43, 0x9c, 0x8a, 0x89, 0xf1, 0x1e, 0x60, 0x49, 0x86, 0x10, 0xc0, 0x39, 0x91, 0x1b, 0x66, 0xb7, 0xd2, 0xe0, 0x9c, 0xb1, 0x1b, 0x12, 0xf4, 0x1a, 0xb4,
0x8f, 0x2d, 0xf2, 0x03, 0x06, 0x1f, 0x09, 0xbc, 0x34, 0xfe, 0xba, 0x4c, 0x99, 0x47, 0xc0, 0x72, 0x24, 0x7e, 0x70, 0x66, 0xfc, 0x78, 0x51, 0x61, 0x5a, 0xe7, 0x79, 0x21, 0xe9, 0x32, 0xe7, 0x30,
0x8e, 0x01, 0x31, 0x66, 0xbb, 0x7d, 0x46, 0x7d, 0xef, 0xbf, 0xe0, 0x08, 0x02, 0xf5, 0x9e, 0xfa, 0x24, 0x19, 0x99, 0x1a, 0xef, 0x00, 0xd6, 0x39, 0xf3, 0xb1, 0x43, 0x7e, 0xc4, 0x50, 0x26, 0x81,
0x1e, 0x27, 0x47, 0x0d, 0x73, 0xd9, 0xf8, 0x02, 0x4a, 0x37, 0x3c, 0xdc, 0x3b, 0xd0, 0xb8, 0x95, 0x97, 0xd9, 0xdf, 0x92, 0x2e, 0x73, 0x0b, 0x58, 0xee, 0x31, 0x20, 0xa6, 0x2c, 0x29, 0xce, 0xa8,
0xc3, 0xd4, 0x69, 0xd3, 0xe4, 0xca, 0xcc, 0x96, 0xc6, 0x90, 0xa4, 0x62, 0x62, 0xb4, 0x61, 0xe3, 0xef, 0xfd, 0x0f, 0xa9, 0x84, 0x40, 0xbd, 0xa5, 0xbe, 0xc7, 0x73, 0xa8, 0x81, 0x39, 0x6d, 0x7c,
0x52, 0x2e, 0xcb, 0x0d, 0xbe, 0x7d, 0x5e, 0xc6, 0x5f, 0x8b, 0x50, 0xb9, 0x88, 0xe6, 0x71, 0xe8, 0x0e, 0x95, 0x2b, 0x6e, 0xee, 0x35, 0x68, 0x5c, 0xca, 0x61, 0xec, 0xac, 0xb6, 0x0a, 0x61, 0xe6,
0x06, 0xa8, 0x0e, 0x45, 0xdf, 0xe3, 0x7e, 0x0a, 0x2e, 0xfa, 0x1e, 0xfa, 0x35, 0xd4, 0xa7, 0xfe, 0x47, 0x63, 0x48, 0x33, 0x32, 0x35, 0xba, 0xb0, 0x7d, 0x2e, 0x8f, 0xe5, 0x02, 0xdf, 0xde, 0x2f,
0x7d, 0xec, 0x32, 0x3e, 0x08, 0x6a, 0x8b, 0xee, 0xfc, 0xee, 0x6a, 0x66, 0x83, 0xd4, 0x82, 0xf3, 0xe3, 0xaf, 0x65, 0xa8, 0xbd, 0x8d, 0x17, 0xec, 0xc2, 0x51, 0x13, 0xca, 0xbe, 0xc7, 0xf5, 0x14,
0x7b, 0x63, 0xba, 0x3a, 0x5c, 0x61, 0xac, 0x92, 0x63, 0xec, 0x67, 0x50, 0x0f, 0xa2, 0x89, 0x1b, 0x5c, 0xf6, 0x3d, 0xf4, 0x5b, 0x68, 0x86, 0xfe, 0x6d, 0xe2, 0xb2, 0xb4, 0x11, 0x15, 0x20, 0x8a,
0x38, 0xd9, 0x79, 0xa9, 0xf2, 0xa4, 0x36, 0xb8, 0xf6, 0x2a, 0x3d, 0x34, 0x5f, 0xe0, 0x52, 0x7a, 0xf8, 0xfb, 0x9b, 0x9e, 0x8d, 0x32, 0x09, 0x5e, 0x06, 0xdb, 0xe1, 0xe6, 0x72, 0x23, 0xb1, 0x95,
0x23, 0x2e, 0xe8, 0x73, 0x58, 0x9f, 0xb9, 0x31, 0xf5, 0x27, 0xfe, 0xcc, 0x65, 0x7f, 0x1c, 0x65, 0x42, 0x62, 0xbf, 0x84, 0x66, 0x10, 0xcf, 0xdc, 0xc0, 0xc9, 0xdb, 0xaa, 0x2a, 0x92, 0x8f, 0x73,
0xee, 0x98, 0x4b, 0x3b, 0x87, 0x1b, 0xce, 0x99, 0xa3, 0x4f, 0x61, 0x3d, 0x26, 0x0b, 0x12, 0x27, 0x2f, 0xb2, 0xde, 0xfa, 0x08, 0x97, 0xca, 0x13, 0x71, 0x41, 0x9f, 0xc1, 0xd6, 0xdc, 0x4d, 0xa8,
0xc4, 0x73, 0xd8, 0xba, 0x95, 0x03, 0xe5, 0x48, 0xc1, 0x5a, 0xaa, 0xeb, 0x7b, 0x89, 0xf1, 0xaf, 0x3f, 0xf3, 0xe7, 0x2e, 0xfb, 0x31, 0xa9, 0x72, 0xc5, 0x82, 0xdb, 0x05, 0xdc, 0x70, 0x41, 0x1c,
0x22, 0x94, 0x6f, 0x04, 0xbb, 0x8e, 0x41, 0xe5, 0xd8, 0x88, 0xbf, 0x89, 0xbd, 0xd5, 0x45, 0x84, 0x7d, 0x0a, 0x5b, 0x09, 0x59, 0x92, 0x24, 0x25, 0x9e, 0xc3, 0xce, 0xad, 0x1d, 0x2a, 0x47, 0x0a,
0x05, 0x07, 0x86, 0xdb, 0xa0, 0xef, 0x41, 0x8d, 0xfa, 0x53, 0x92, 0x50, 0x77, 0x3a, 0xe3, 0x60, 0xd6, 0x32, 0xde, 0xd0, 0x4b, 0x8d, 0x7f, 0x97, 0xa1, 0x7a, 0x25, 0xb2, 0xeb, 0x18, 0x54, 0x8e,
0x2a, 0x78, 0xa9, 0xf8, 0x3a, 0x8e, 0xb0, 0x5f, 0x06, 0xd6, 0xac, 0x02, 0x1e, 0x26, 0xa2, 0x9f, 0x8d, 0xf8, 0xe9, 0x38, 0xd8, 0x3c, 0x44, 0x48, 0x70, 0x60, 0xb8, 0x0c, 0xfa, 0x01, 0x34, 0xa8,
0x42, 0x8d, 0xf5, 0x04, 0xff, 0xc3, 0x69, 0x94, 0x78, 0x93, 0xed, 0xbc, 0xe8, 0x08, 0xbe, 0x2c, 0x1f, 0x92, 0x94, 0xba, 0xe1, 0x9c, 0x83, 0xa9, 0xe0, 0x35, 0xe3, 0xeb, 0x72, 0x84, 0xfd, 0x59,
0xae, 0xc6, 0x69, 0x97, 0xfd, 0x12, 0x34, 0xce, 0x62, 0xe9, 0x24, 0x4e, 0x89, 0xbd, 0xfc, 0x29, 0xb0, 0x9a, 0x16, 0xf0, 0x30, 0x12, 0xfd, 0x1c, 0x1a, 0xac, 0x26, 0xf8, 0x8f, 0x50, 0xab, 0xc2,
0x91, 0x76, 0x0b, 0x86, 0xe5, 0xc1, 0x8a, 0x0e, 0xa1, 0xb4, 0xe0, 0x29, 0x55, 0xe4, 0x9f, 0xd6, 0x8b, 0x6c, 0xff, 0x51, 0x45, 0xf0, 0x63, 0x71, 0x3d, 0xc9, 0xaa, 0xec, 0xd7, 0xa0, 0xf1, 0x2c,
0x6a, 0x71, 0x1c, 0x76, 0x31, 0xcf, 0xae, 0xb1, 0xdf, 0x0a, 0x16, 0x35, 0xaa, 0xaf, 0xaf, 0x31, 0x96, 0x4a, 0xa2, 0x99, 0x1c, 0x14, 0x9b, 0x49, 0x56, 0x2d, 0x18, 0xd6, 0xfd, 0x17, 0xbd, 0x82,
0x49, 0x30, 0x9c, 0xda, 0xf0, 0xaa, 0xa6, 0x41, 0xa3, 0x26, 0xab, 0x9a, 0x06, 0x0c, 0xf3, 0xc9, 0xca, 0x92, 0xbb, 0x54, 0x93, 0x3f, 0x64, 0x9b, 0xc1, 0x71, 0xd8, 0xc5, 0x3e, 0x9b, 0x76, 0xbf,
0x3c, 0x8e, 0xf9, 0xbf, 0x9d, 0x3f, 0x25, 0x8d, 0x1d, 0x0e, 0x8e, 0x26, 0x75, 0xb6, 0x3f, 0x25, 0x17, 0x59, 0xc4, 0xdb, 0xc8, 0xa3, 0x69, 0x27, 0x13, 0x0c, 0x67, 0x32, 0x3c, 0xaa, 0x30, 0x68,
0xc6, 0x1f, 0x8a, 0x50, 0xbf, 0x11, 0xb7, 0x5f, 0x7a, 0xe3, 0x7e, 0x01, 0xdb, 0xe4, 0xee, 0x8e, 0x35, 0x64, 0x54, 0x61, 0xc0, 0x30, 0x9f, 0x2d, 0x92, 0x84, 0xff, 0x02, 0xfa, 0x21, 0x69, 0xed,
0x4c, 0xa8, 0xbf, 0x20, 0xce, 0xc4, 0x0d, 0x02, 0x12, 0x3b, 0x92, 0xc2, 0x5a, 0x6b, 0xb3, 0x29, 0x73, 0x70, 0x34, 0xc9, 0xb3, 0xfd, 0x90, 0x18, 0x7f, 0x2c, 0x43, 0xf3, 0x4a, 0x0c, 0xc9, 0x6c,
0xfe, 0x82, 0xbb, 0x5c, 0xdf, 0xef, 0xe1, 0xad, 0xcc, 0x56, 0xaa, 0x3c, 0x64, 0xc2, 0xb6, 0x3f, 0x30, 0x7f, 0x0e, 0x7b, 0xe4, 0xe6, 0x86, 0xcc, 0xa8, 0xbf, 0x24, 0xce, 0xcc, 0x0d, 0x02, 0x92,
0x9d, 0x12, 0xcf, 0x77, 0xe9, 0x6a, 0x00, 0x71, 0x76, 0xed, 0xca, 0x83, 0xe0, 0xc6, 0x3e, 0x73, 0x38, 0x32, 0x85, 0xb5, 0xce, 0x4e, 0x5b, 0xfc, 0x2c, 0xf7, 0x39, 0x7f, 0x38, 0xc0, 0xbb, 0xb9,
0x29, 0x59, 0x86, 0xc9, 0x3c, 0xb2, 0x30, 0x9f, 0x31, 0x9e, 0xc7, 0xf7, 0xd9, 0x25, 0xbe, 0x21, 0xac, 0x64, 0x79, 0xc8, 0x84, 0x3d, 0x3f, 0x0c, 0x89, 0xe7, 0xbb, 0x74, 0xd3, 0x80, 0xe8, 0x5d,
0x3d, 0x6d, 0xae, 0xc4, 0x72, 0x32, 0xf7, 0x83, 0xa0, 0xbe, 0xf8, 0x41, 0x58, 0x1e, 0xe2, 0xa5, 0xcf, 0x65, 0x23, 0xb8, 0xb2, 0xcf, 0x5c, 0x4a, 0xd6, 0x66, 0x72, 0x8d, 0xdc, 0xcc, 0x4b, 0x96,
0x8f, 0x1d, 0xe2, 0xc6, 0xe7, 0xb0, 0x99, 0x01, 0x21, 0x7f, 0x00, 0x8e, 0xa1, 0xcc, 0x37, 0x37, 0xe7, 0xc9, 0x6d, 0x3e, 0xeb, 0xb7, 0xa5, 0xa6, 0xcd, 0x99, 0x58, 0x6e, 0x16, 0xfe, 0x23, 0xd4,
0x3d, 0x3d, 0xd0, 0x6b, 0x1e, 0x62, 0x69, 0x61, 0xfc, 0xbe, 0x08, 0x28, 0xf5, 0x8f, 0x9e, 0x92, 0x47, 0xff, 0x11, 0xeb, 0x5e, 0x5f, 0xf9, 0x58, 0xaf, 0x37, 0x3e, 0x83, 0x9d, 0x1c, 0x08, 0xf9,
0xff, 0x53, 0x30, 0x77, 0xa0, 0xc4, 0xf5, 0x12, 0x49, 0x31, 0x60, 0x38, 0x04, 0x6e, 0x42, 0x67, 0x9f, 0x70, 0x0c, 0x55, 0x7e, 0xb9, 0x59, 0xf7, 0x40, 0x1f, 0xe6, 0x21, 0x96, 0x12, 0xc6, 0x1f,
0x8f, 0x19, 0x8c, 0xc2, 0xf9, 0x3d, 0xfb, 0x62, 0x92, 0xcc, 0x03, 0x8a, 0xa5, 0x85, 0xf1, 0xb7, 0xca, 0x80, 0x32, 0xfd, 0xf8, 0x21, 0xfd, 0x8e, 0x82, 0xb9, 0x0f, 0x15, 0xce, 0x97, 0x48, 0x8a,
0x02, 0x6c, 0xe7, 0x70, 0x90, 0x58, 0x2e, 0x2f, 0x84, 0xc2, 0x7f, 0xbe, 0x10, 0xd0, 0x11, 0x54, 0x05, 0xc3, 0x21, 0x70, 0x53, 0x3a, 0xbf, 0xcf, 0x61, 0x14, 0xca, 0xef, 0xd8, 0x17, 0x93, 0x74,
0x67, 0x8f, 0xdf, 0x70, 0x71, 0x64, 0xb3, 0x5f, 0xdb, 0xd7, 0xdf, 0x07, 0x35, 0x8e, 0x9e, 0x92, 0x11, 0x50, 0x2c, 0x25, 0x8c, 0xbf, 0x95, 0x60, 0xaf, 0x80, 0x83, 0xc4, 0x72, 0x3d, 0x10, 0x4a,
0x86, 0xca, 0x3d, 0x57, 0x6f, 0x49, 0xae, 0x67, 0x57, 0x6d, 0xae, 0x8e, 0xdc, 0x55, 0x2b, 0x66, 0xff, 0x7d, 0x20, 0xa0, 0x23, 0xa8, 0xcf, 0xef, 0xbf, 0x61, 0x70, 0xe4, 0xbb, 0x5f, 0x5b, 0xd7,
0x8e, 0x7f, 0x05, 0xda, 0xca, 0x8d, 0xcd, 0x7e, 0xec, 0xfb, 0x67, 0xc3, 0x11, 0x36, 0xf5, 0x35, 0x3f, 0x04, 0x35, 0x89, 0x1f, 0xd2, 0x96, 0xca, 0x35, 0x37, 0xa7, 0x24, 0xe7, 0xb3, 0x51, 0x5b,
0x54, 0x05, 0x75, 0x6c, 0x8f, 0xae, 0xf4, 0x02, 0x93, 0xcc, 0xdf, 0x98, 0x5d, 0xf1, 0x58, 0x60, 0x88, 0xa3, 0x30, 0x6a, 0xc5, 0xce, 0xf1, 0x6f, 0x40, 0xdb, 0x18, 0xec, 0xec, 0xff, 0x7f, 0x78,
0x92, 0x23, 0x8d, 0x94, 0xe3, 0x7f, 0x14, 0x00, 0x96, 0x47, 0x14, 0xd2, 0xa0, 0x72, 0x3d, 0xbc, 0x36, 0x9e, 0x60, 0x53, 0x7f, 0x86, 0xea, 0xa0, 0x4e, 0xed, 0xc9, 0x85, 0x5e, 0x62, 0x94, 0xf9,
0x1c, 0x8e, 0xbe, 0x1c, 0x8a, 0x00, 0x67, 0x76, 0xbf, 0xa7, 0x17, 0x50, 0x0d, 0x4a, 0xe2, 0xf5, 0x3b, 0xb3, 0x2f, 0xde, 0x14, 0x8c, 0x72, 0xa4, 0x90, 0x72, 0xfc, 0xcf, 0x12, 0xc0, 0xba, 0x45,
0x51, 0x64, 0x2b, 0xc8, 0xa7, 0x87, 0xc2, 0xde, 0x25, 0xd9, 0xbb, 0x43, 0x45, 0x15, 0x50, 0xb2, 0x21, 0x0d, 0x6a, 0x97, 0xe3, 0xf3, 0xf1, 0xe4, 0x8b, 0xb1, 0x30, 0x70, 0x66, 0x0f, 0x07, 0x7a,
0xd7, 0x85, 0x7c, 0x4e, 0x94, 0x59, 0x40, 0x6c, 0x5e, 0x59, 0xed, 0xae, 0xa9, 0x57, 0xd8, 0x44, 0x09, 0x35, 0xa0, 0x22, 0x1e, 0x29, 0x65, 0x76, 0x82, 0x7c, 0xa1, 0x28, 0xec, 0xf9, 0x92, 0x3f,
0xf6, 0xb0, 0x00, 0x28, 0xa7, 0xaf, 0x0a, 0xe6, 0xc9, 0xde, 0x22, 0xc0, 0xd6, 0x19, 0xd9, 0xe7, 0x4f, 0x54, 0x54, 0x03, 0x25, 0x7f, 0x84, 0xc8, 0x57, 0x47, 0x95, 0x19, 0xc4, 0xe6, 0x85, 0xd5,
0x26, 0xd6, 0x35, 0xa6, 0xc3, 0xa3, 0x2f, 0xf5, 0x75, 0xa6, 0x3b, 0xed, 0x9b, 0x56, 0x4f, 0xdf, 0xed, 0x9b, 0x7a, 0x8d, 0x6d, 0xe4, 0xef, 0x0f, 0x80, 0x6a, 0xf6, 0xf8, 0x60, 0x9a, 0xec, 0xc9,
0x60, 0x8f, 0x91, 0x73, 0xb3, 0x8d, 0xed, 0x8e, 0xd9, 0xb6, 0xf5, 0x3a, 0x9b, 0xb9, 0xe1, 0x09, 0x02, 0xec, 0x9c, 0x89, 0xfd, 0xc6, 0xc4, 0xba, 0xc6, 0x78, 0x78, 0xf2, 0x85, 0xbe, 0xc5, 0x78,
0x6e, 0xb2, 0x65, 0x2e, 0x46, 0xd7, 0x78, 0xd8, 0xb6, 0x74, 0xfd, 0xf8, 0x10, 0x36, 0x72, 0x37, 0xa7, 0x43, 0xd3, 0x1a, 0xe8, 0xdb, 0xec, 0xcd, 0xf2, 0xc6, 0xec, 0x62, 0xbb, 0x67, 0x76, 0x6d,
0x12, 0x5b, 0xcb, 0x6e, 0x77, 0x2c, 0x73, 0xac, 0xaf, 0x31, 0x79, 0x7c, 0xde, 0xc6, 0xbd, 0xb1, 0xbd, 0xc9, 0x76, 0xae, 0xb8, 0x83, 0x3b, 0xec, 0x98, 0xb7, 0x93, 0x4b, 0x3c, 0xee, 0x5a, 0xba,
0x5e, 0xe8, 0xfc, 0xe8, 0xab, 0xc3, 0x85, 0x4f, 0x49, 0x92, 0x34, 0xfd, 0xe8, 0x44, 0x48, 0x27, 0x7e, 0xfc, 0x0a, 0xb6, 0x0b, 0x13, 0x89, 0x9d, 0x65, 0x77, 0x7b, 0x96, 0x39, 0xd5, 0x9f, 0x31,
0xf7, 0xd1, 0xc9, 0x82, 0x9e, 0xf0, 0x87, 0xf1, 0xc9, 0xb2, 0x7d, 0x6e, 0xcb, 0x5c, 0xf3, 0xb3, 0x7a, 0xfa, 0xa6, 0x8b, 0x07, 0x53, 0xbd, 0xd4, 0xfb, 0xc9, 0x97, 0xaf, 0x96, 0x3e, 0x25, 0x69,
0x7f, 0x07, 0x00, 0x00, 0xff, 0xff, 0xc1, 0x08, 0x9a, 0x3a, 0x74, 0x0f, 0x00, 0x00, 0xda, 0xf6, 0xe3, 0x13, 0x41, 0x9d, 0xdc, 0xc6, 0x27, 0x4b, 0x7a, 0xc2, 0xdf, 0xcf, 0x27, 0xeb,
0xf2, 0xb9, 0xae, 0x72, 0xce, 0x2f, 0xfe, 0x13, 0x00, 0x00, 0xff, 0xff, 0xc2, 0x82, 0x3a, 0x5d,
0x9b, 0x0f, 0x00, 0x00,
} }

Просмотреть файл

@ -290,7 +290,10 @@ func NewActionAgent(
// The db name is set by the Start function called above // The db name is set by the Start function called above
agent.VREngine = vreplication.NewEngine(ts, tabletAlias.Cell, mysqld, func() binlogplayer.DBClient { agent.VREngine = vreplication.NewEngine(ts, tabletAlias.Cell, mysqld, func() binlogplayer.DBClient {
return binlogplayer.NewDBClient(agent.DBConfigs.FilteredWithDB()) return binlogplayer.NewDBClient(agent.DBConfigs.FilteredWithDB())
}, agent.DBConfigs.FilteredWithDB().DbName) },
agent.DBConfigs.ExternalRepl(),
agent.DBConfigs.FilteredWithDB().DbName,
)
servenv.OnTerm(agent.VREngine.Close) servenv.OnTerm(agent.VREngine.Close)
// Run a background task to rebuild the SrvKeyspace in our cell/keyspace // Run a background task to rebuild the SrvKeyspace in our cell/keyspace
@ -357,7 +360,7 @@ func NewTestActionAgent(batchCtx context.Context, ts *topo.Server, tabletAlias *
Cnf: nil, Cnf: nil,
MysqlDaemon: mysqlDaemon, MysqlDaemon: mysqlDaemon,
DBConfigs: &dbconfigs.DBConfigs{}, DBConfigs: &dbconfigs.DBConfigs{},
VREngine: vreplication.NewEngine(ts, tabletAlias.Cell, mysqlDaemon, binlogplayer.NewFakeDBClient, ti.DbName()), VREngine: vreplication.NewEngine(ts, tabletAlias.Cell, mysqlDaemon, binlogplayer.NewFakeDBClient, nil, ti.DbName()),
History: history.New(historyLength), History: history.New(historyLength),
_healthy: fmt.Errorf("healthcheck not run yet"), _healthy: fmt.Errorf("healthcheck not run yet"),
} }
@ -396,7 +399,7 @@ func NewComboActionAgent(batchCtx context.Context, ts *topo.Server, tabletAlias
Cnf: nil, Cnf: nil,
MysqlDaemon: mysqlDaemon, MysqlDaemon: mysqlDaemon,
DBConfigs: dbcfgs, DBConfigs: dbcfgs,
VREngine: vreplication.NewEngine(nil, "", nil, nil, ""), VREngine: vreplication.NewEngine(nil, "", nil, nil, nil, ""),
gotMysqlPort: true, gotMysqlPort: true,
History: history.New(historyLength), History: history.New(historyLength),
_healthy: fmt.Errorf("healthcheck not run yet"), _healthy: fmt.Errorf("healthcheck not run yet"),

Просмотреть файл

@ -27,6 +27,7 @@ import (
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"golang.org/x/net/context" "golang.org/x/net/context"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sync2" "vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/tb" "vitess.io/vitess/go/tb"
"vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/binlog/binlogplayer"
@ -44,6 +45,7 @@ var retryDelay = flag.Duration("vreplication_retry_delay", 5*time.Second, "delay
// either read-only or self-synchronized. // either read-only or self-synchronized.
type controller struct { type controller struct {
dbClientFactory func() binlogplayer.DBClient dbClientFactory func() binlogplayer.DBClient
sourceDbConnParams *mysql.ConnParams
mysqld mysqlctl.MysqlDaemon mysqld mysqlctl.MysqlDaemon
blpStats *binlogplayer.Stats blpStats *binlogplayer.Stats
@ -61,12 +63,13 @@ type controller struct {
// newController creates a new controller. Unless a stream is explicitly 'Stopped', // newController creates a new controller. Unless a stream is explicitly 'Stopped',
// this function launches a goroutine to perform continuous vreplication. // this function launches a goroutine to perform continuous vreplication.
func newController(ctx context.Context, params map[string]string, dbClientFactory func() binlogplayer.DBClient, mysqld mysqlctl.MysqlDaemon, ts *topo.Server, cell, tabletTypesStr string, blpStats *binlogplayer.Stats) (*controller, error) { func newController(ctx context.Context, params map[string]string, dbClientFactory func() binlogplayer.DBClient, sourceDbConnParams *mysql.ConnParams, mysqld mysqlctl.MysqlDaemon, ts *topo.Server, cell, tabletTypesStr string, blpStats *binlogplayer.Stats) (*controller, error) {
if blpStats == nil { if blpStats == nil {
blpStats = binlogplayer.NewStats() blpStats = binlogplayer.NewStats()
} }
ct := &controller{ ct := &controller{
dbClientFactory: dbClientFactory, dbClientFactory: dbClientFactory,
sourceDbConnParams: sourceDbConnParams,
mysqld: mysqld, mysqld: mysqld,
blpStats: blpStats, blpStats: blpStats,
done: make(chan struct{}), done: make(chan struct{}),
@ -92,6 +95,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor
} }
ct.stopPos = params["stop_pos"] ct.stopPos = params["stop_pos"]
if ct.source.GetExternalMysql() == "" {
// tabletPicker // tabletPicker
if v, ok := params["cell"]; ok { if v, ok := params["cell"]; ok {
cell = v cell = v
@ -104,6 +108,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor
return nil, err return nil, err
} }
ct.tabletPicker = tp ct.tabletPicker = tp
}
// cancel // cancel
ctx, ct.cancel = context.WithCancel(ctx) ctx, ct.cancel = context.WithCancel(ctx)
@ -199,7 +204,14 @@ func (ct *controller) runBlp(ctx context.Context) (err error) {
if _, err := dbClient.ExecuteFetch("set names binary", 10000); err != nil { if _, err := dbClient.ExecuteFetch("set names binary", 10000); err != nil {
return err return err
} }
vsClient := NewTabletVStreamerClient(tablet)
var vsClient VStreamerClient
if ct.source.GetExternalMysql() == "" {
vsClient = NewTabletVStreamerClient(tablet)
} else {
vsClient = NewMySQLVStreamerClient(ct.sourceDbConnParams)
}
vreplicator := NewVReplicator(ct.id, &ct.source, vsClient, ct.blpStats, dbClient, ct.mysqld) vreplicator := NewVReplicator(ct.id, &ct.source, vsClient, ct.blpStats, dbClient, ct.mysqld)
return vreplicator.Replicate(ctx) return vreplicator.Replicate(ctx)
} }

Просмотреть файл

@ -76,7 +76,7 @@ func TestControllerKeyRange(t *testing.T) {
dbClientFactory := func() binlogplayer.DBClient { return dbClient } dbClientFactory := func() binlogplayer.DBClient { return dbClient }
mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306} mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306}
ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "replica", nil) ct, err := newController(context.Background(), params, dbClientFactory, nil, mysqld, env.TopoServ, env.Cells[0], "replica", nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -136,7 +136,7 @@ func TestControllerTables(t *testing.T) {
}, },
} }
ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "replica", nil) ct, err := newController(context.Background(), params, dbClientFactory, nil, mysqld, env.TopoServ, env.Cells[0], "replica", nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -153,7 +153,7 @@ func TestControllerBadID(t *testing.T) {
params := map[string]string{ params := map[string]string{
"id": "bad", "id": "bad",
} }
_, err := newController(context.Background(), params, nil, nil, nil, "", "", nil) _, err := newController(context.Background(), params, nil, nil, nil, nil, "", "", nil)
want := `strconv.Atoi: parsing "bad": invalid syntax` want := `strconv.Atoi: parsing "bad": invalid syntax`
if err == nil || err.Error() != want { if err == nil || err.Error() != want {
t.Errorf("newController err: %v, want %v", err, want) t.Errorf("newController err: %v, want %v", err, want)
@ -166,7 +166,7 @@ func TestControllerStopped(t *testing.T) {
"state": binlogplayer.BlpStopped, "state": binlogplayer.BlpStopped,
} }
ct, err := newController(context.Background(), params, nil, nil, nil, "", "", nil) ct, err := newController(context.Background(), params, nil, nil, nil, nil, "", "", nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -203,7 +203,7 @@ func TestControllerOverrides(t *testing.T) {
dbClientFactory := func() binlogplayer.DBClient { return dbClient } dbClientFactory := func() binlogplayer.DBClient { return dbClient }
mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306} mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306}
ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "rdonly", nil) ct, err := newController(context.Background(), params, dbClientFactory, nil, mysqld, env.TopoServ, env.Cells[0], "rdonly", nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -227,7 +227,7 @@ func TestControllerCanceledContext(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
cancel() cancel()
ct, err := newController(ctx, params, nil, nil, env.TopoServ, env.Cells[0], "rdonly", nil) ct, err := newController(ctx, params, nil, nil, nil, env.TopoServ, env.Cells[0], "rdonly", nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -269,7 +269,7 @@ func TestControllerRetry(t *testing.T) {
dbClientFactory := func() binlogplayer.DBClient { return dbClient } dbClientFactory := func() binlogplayer.DBClient { return dbClient }
mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306} mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306}
ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "rdonly", nil) ct, err := newController(context.Background(), params, dbClientFactory, nil, mysqld, env.TopoServ, env.Cells[0], "rdonly", nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -315,7 +315,7 @@ func TestControllerStopPosition(t *testing.T) {
dbClientFactory := func() binlogplayer.DBClient { return dbClient } dbClientFactory := func() binlogplayer.DBClient { return dbClient }
mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306} mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306}
ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "replica", nil) ct, err := newController(context.Background(), params, dbClientFactory, nil, mysqld, env.TopoServ, env.Cells[0], "replica", nil)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

Просмотреть файл

@ -81,18 +81,20 @@ type Engine struct {
cell string cell string
mysqld mysqlctl.MysqlDaemon mysqld mysqlctl.MysqlDaemon
dbClientFactory func() binlogplayer.DBClient dbClientFactory func() binlogplayer.DBClient
sourceDbConnParams *mysql.ConnParams
dbName string dbName string
} }
// NewEngine creates a new Engine. // NewEngine creates a new Engine.
// A nil ts means that the Engine is disabled. // A nil ts means that the Engine is disabled.
func NewEngine(ts *topo.Server, cell string, mysqld mysqlctl.MysqlDaemon, dbClientFactory func() binlogplayer.DBClient, dbName string) *Engine { func NewEngine(ts *topo.Server, cell string, mysqld mysqlctl.MysqlDaemon, dbClientFactory func() binlogplayer.DBClient, sourceDbConnParams *mysql.ConnParams, dbName string) *Engine {
vre := &Engine{ vre := &Engine{
controllers: make(map[int]*controller), controllers: make(map[int]*controller),
ts: ts, ts: ts,
cell: cell, cell: cell,
mysqld: mysqld, mysqld: mysqld,
dbClientFactory: dbClientFactory, dbClientFactory: dbClientFactory,
sourceDbConnParams: sourceDbConnParams,
dbName: dbName, dbName: dbName,
} }
return vre return vre
@ -187,7 +189,7 @@ func (vre *Engine) initAll() error {
return err return err
} }
for _, row := range rows { for _, row := range rows {
ct, err := newController(vre.ctx, row, vre.dbClientFactory, vre.mysqld, vre.ts, vre.cell, *tabletTypesStr, nil) ct, err := newController(vre.ctx, row, vre.dbClientFactory, vre.sourceDbConnParams, vre.mysqld, vre.ts, vre.cell, *tabletTypesStr, nil)
if err != nil { if err != nil {
return err return err
} }
@ -280,7 +282,7 @@ func (vre *Engine) Exec(query string) (*sqltypes.Result, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
ct, err := newController(vre.ctx, params, vre.dbClientFactory, vre.mysqld, vre.ts, vre.cell, *tabletTypesStr, nil) ct, err := newController(vre.ctx, params, vre.dbClientFactory, vre.sourceDbConnParams, vre.mysqld, vre.ts, vre.cell, *tabletTypesStr, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -318,7 +320,7 @@ func (vre *Engine) Exec(query string) (*sqltypes.Result, error) {
} }
// Create a new controller in place of the old one. // Create a new controller in place of the old one.
// For continuity, the new controller inherits the previous stats. // For continuity, the new controller inherits the previous stats.
ct, err := newController(vre.ctx, params, vre.dbClientFactory, vre.mysqld, vre.ts, vre.cell, *tabletTypesStr, blpStats[id]) ct, err := newController(vre.ctx, params, vre.dbClientFactory, vre.sourceDbConnParams, vre.mysqld, vre.ts, vre.cell, *tabletTypesStr, blpStats[id])
if err != nil { if err != nil {
return nil, err return nil, err
} }

Просмотреть файл

@ -41,7 +41,7 @@ func TestEngineOpen(t *testing.T) {
// Test Insert // Test Insert
vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName()) vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, nil, dbClient.DBName())
if vre.IsOpen() { if vre.IsOpen() {
t.Errorf("IsOpen: %v, want false", vre.IsOpen()) t.Errorf("IsOpen: %v, want false", vre.IsOpen())
} }
@ -89,7 +89,7 @@ func TestEngineExec(t *testing.T) {
// Test Insert // Test Insert
vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName()) vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, nil, dbClient.DBName())
dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil)
if err := vre.Open(context.Background()); err != nil { if err := vre.Open(context.Background()); err != nil {
@ -249,7 +249,7 @@ func TestEngineBadInsert(t *testing.T) {
dbClientFactory := func() binlogplayer.DBClient { return dbClient } dbClientFactory := func() binlogplayer.DBClient { return dbClient }
mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306} mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306}
vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName()) vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, nil, dbClient.DBName())
dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil)
if err := vre.Open(context.Background()); err != nil { if err := vre.Open(context.Background()); err != nil {
@ -279,7 +279,7 @@ func TestEngineSelect(t *testing.T) {
dbClientFactory := func() binlogplayer.DBClient { return dbClient } dbClientFactory := func() binlogplayer.DBClient { return dbClient }
mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306} mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306}
vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName()) vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, nil, dbClient.DBName())
dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil)
if err := vre.Open(context.Background()); err != nil { if err := vre.Open(context.Background()); err != nil {
@ -314,7 +314,7 @@ func TestWaitForPos(t *testing.T) {
dbClient := binlogplayer.NewMockDBClient(t) dbClient := binlogplayer.NewMockDBClient(t)
mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306} mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306}
dbClientFactory := func() binlogplayer.DBClient { return dbClient } dbClientFactory := func() binlogplayer.DBClient { return dbClient }
vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName()) vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, nil, dbClient.DBName())
dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil)
if err := vre.Open(context.Background()); err != nil { if err := vre.Open(context.Background()); err != nil {
@ -344,7 +344,7 @@ func TestWaitForPosError(t *testing.T) {
dbClient := binlogplayer.NewMockDBClient(t) dbClient := binlogplayer.NewMockDBClient(t)
mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306} mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306}
dbClientFactory := func() binlogplayer.DBClient { return dbClient } dbClientFactory := func() binlogplayer.DBClient { return dbClient }
vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName()) vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, nil, dbClient.DBName())
err := vre.WaitForPos(context.Background(), 1, "MariaDB/0-1-1084") err := vre.WaitForPos(context.Background(), 1, "MariaDB/0-1-1084")
want := `vreplication engine is closed` want := `vreplication engine is closed`
@ -386,7 +386,7 @@ func TestWaitForPosCancel(t *testing.T) {
dbClient := binlogplayer.NewMockDBClient(t) dbClient := binlogplayer.NewMockDBClient(t)
mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306} mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: 3306}
dbClientFactory := func() binlogplayer.DBClient { return dbClient } dbClientFactory := func() binlogplayer.DBClient { return dbClient }
vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName()) vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, nil, dbClient.DBName())
dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil) dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", &sqltypes.Result{}, nil)
if err := vre.Open(context.Background()); err != nil { if err := vre.Open(context.Background()); err != nil {
@ -433,7 +433,7 @@ func TestCreateDBAndTable(t *testing.T) {
// Test Insert // Test Insert
vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClient.DBName()) vre := NewEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, nil, dbClient.DBName())
tableNotFound := mysql.SQLError{Num: 1146, Message: "table not found"} tableNotFound := mysql.SQLError{Num: 1146, Message: "table not found"}
dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", nil, &tableNotFound) dbClient.ExpectRequest("select * from _vt.vreplication where db_name='db'", nil, &tableNotFound)

Просмотреть файл

@ -96,7 +96,7 @@ func TestMain(m *testing.M) {
return 1 return 1
} }
playerEngine = NewEngine(env.TopoServ, env.Cells[0], env.Mysqld, realDBClientFactory, vrepldb) playerEngine = NewEngine(env.TopoServ, env.Cells[0], env.Mysqld, realDBClientFactory, nil, vrepldb)
if err := playerEngine.Open(context.Background()); err != nil { if err := playerEngine.Open(context.Background()); err != nil {
fmt.Fprintf(os.Stderr, "%v", err) fmt.Fprintf(os.Stderr, "%v", err)
return 1 return 1

Просмотреть файл

@ -315,8 +315,8 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e
if err != nil { if err != nil {
return nil, fmt.Errorf("can't get query from binlog event: %v, event data: %#v", err, ev) return nil, fmt.Errorf("can't get query from binlog event: %v, event data: %#v", err, ev)
} }
// Insert/Delete/Update are supported are in here only to have support for vtshovel with // Insert/Delete/Update are supported only to be used in the context of vtshovel where source databases
// SBR streams. Vitess itself should never run into cases where it needs to consume non rbr statements. // could be using SBR. Vitess itself should never run into cases where it needs to consume non rbr statements.
switch cat := sqlparser.Preview(q.SQL); cat { switch cat := sqlparser.Preview(q.SQL); cat {
case sqlparser.StmtInsert: case sqlparser.StmtInsert:
mustSend := mustSendStmt(q, vs.cp.DbName) mustSend := mustSendStmt(q, vs.cp.DbName)

Просмотреть файл

@ -170,6 +170,10 @@ message BinlogSource {
// on_ddl specifies the action to be taken when a DDL is encountered. // on_ddl specifies the action to be taken when a DDL is encountered.
OnDDLAction on_ddl = 7; OnDDLAction on_ddl = 7;
// Source is an external mysql. This attribute should be set to the username
// to use in the connection
string external_mysql = 8;
} }
// VEventType enumerates the event types. // VEventType enumerates the event types.

Различия файлов скрыты, потому что одна или несколько строк слишком длинны