зеркало из https://github.com/github/vitess-gh.git
Merge pull request #2039 from alainjobart/eventtoken
Adding vttablet background binlog watcher.
This commit is contained in:
Коммит
cd8afc0f13
|
@ -131,8 +131,10 @@ func (bls *Streamer) Stream(ctx context.Context) (err error) {
|
|||
var events <-chan replication.BinlogEvent
|
||||
if bls.timestamp != 0 {
|
||||
events, err = bls.conn.StartBinlogDumpFromTimestamp(ctx, bls.timestamp)
|
||||
} else {
|
||||
} else if !bls.startPos.IsZero() {
|
||||
events, err = bls.conn.StartBinlogDumpFromPosition(ctx, bls.startPos)
|
||||
} else {
|
||||
bls.startPos, events, err = bls.conn.StartBinlogDumpFromCurrent(ctx)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -269,7 +269,15 @@ func FindSlaves(mysqld MysqlDaemon) ([]string, error) {
|
|||
for _, row := range qr.Rows {
|
||||
// Check for prefix, since it could be "Binlog Dump GTID".
|
||||
if strings.HasPrefix(row[colCommand].String(), binlogDumpCommand) {
|
||||
host, _, err := netutil.SplitHostPort(row[colClientAddr].String())
|
||||
host := row[colClientAddr].String()
|
||||
if host == "localhost" {
|
||||
// If we have a local binlog streamer, it will
|
||||
// show up as being connected
|
||||
// from 'localhost' through the local
|
||||
// socket. Ignore it.
|
||||
continue
|
||||
}
|
||||
host, _, err = netutil.SplitHostPort(host)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("FindSlaves: malformed addr %v", err)
|
||||
}
|
||||
|
|
|
@ -84,6 +84,25 @@ func (mysqld *Mysqld) connectForReplication() (sqldb.Conn, error) {
|
|||
// slaveIDPool is the IDPool for server IDs used to connect as a slave.
|
||||
var slaveIDPool = pools.NewIDPool()
|
||||
|
||||
// StartBinlogDumpFromCurrent requests a replication binlog dump from
|
||||
// the current position.
|
||||
func (sc *SlaveConnection) StartBinlogDumpFromCurrent(ctx context.Context) (replication.Position, <-chan replication.BinlogEvent, error) {
|
||||
ctx, sc.cancel = context.WithCancel(ctx)
|
||||
|
||||
flavor, err := sc.mysqld.flavor()
|
||||
if err != nil {
|
||||
return replication.Position{}, nil, fmt.Errorf("StartBinlogDump needs flavor: %v", err)
|
||||
}
|
||||
|
||||
masterPosition, err := flavor.MasterPosition(sc.mysqld)
|
||||
if err != nil {
|
||||
return replication.Position{}, nil, fmt.Errorf("failed to get master position: %v", err)
|
||||
}
|
||||
|
||||
c, err := sc.StartBinlogDumpFromPosition(ctx, masterPosition)
|
||||
return masterPosition, c, err
|
||||
}
|
||||
|
||||
// StartBinlogDumpFromPosition requests a replication binlog dump from
|
||||
// the master mysqld at the given Position and then sends binlog
|
||||
// events to the provided channel.
|
||||
|
|
|
@ -1164,8 +1164,9 @@ func (m *StreamHealthResponse) GetRealtimeStats() *RealtimeStats {
|
|||
return nil
|
||||
}
|
||||
|
||||
// UpdateStreamRequest is the payload for UpdateStream. Only one of
|
||||
// position and timestamp can be set.
|
||||
// UpdateStreamRequest is the payload for UpdateStream. At most one of
|
||||
// position and timestamp can be set. If neither is set, we will start
|
||||
// streaming from the current binlog position.
|
||||
type UpdateStreamRequest struct {
|
||||
EffectiveCallerId *vtrpc.CallerID `protobuf:"bytes,1,opt,name=effective_caller_id,json=effectiveCallerId" json:"effective_caller_id,omitempty"`
|
||||
ImmediateCallerId *VTGateCallerID `protobuf:"bytes,2,opt,name=immediate_caller_id,json=immediateCallerId" json:"immediate_caller_id,omitempty"`
|
||||
|
|
|
@ -21,8 +21,9 @@ import (
|
|||
)
|
||||
|
||||
var (
|
||||
queryLogHandler = flag.String("query-log-stream-handler", "/debug/querylog", "URL handler for streaming queries log")
|
||||
txLogHandler = flag.String("transaction-log-stream-handler", "/debug/txlog", "URL handler for streaming transactions log")
|
||||
queryLogHandler = flag.String("query-log-stream-handler", "/debug/querylog", "URL handler for streaming queries log")
|
||||
txLogHandler = flag.String("transaction-log-stream-handler", "/debug/txlog", "URL handler for streaming transactions log")
|
||||
watchReplicationStream = flag.Bool("watch_replication_stream", false, "When enabled, vttablet will stream the MySQL replication stream from the local server, and use it to support the include_event_token ExecuteOptions.")
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
|
|
@ -35,6 +35,7 @@ import (
|
|||
"github.com/youtube/vitess/go/vt/utils"
|
||||
"golang.org/x/net/context"
|
||||
|
||||
binlogdatapb "github.com/youtube/vitess/go/vt/proto/binlogdata"
|
||||
querypb "github.com/youtube/vitess/go/vt/proto/query"
|
||||
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
|
||||
vtrpcpb "github.com/youtube/vitess/go/vt/proto/vtrpc"
|
||||
|
@ -119,6 +120,13 @@ type TabletServer struct {
|
|||
// history records changes in state for display on the status page.
|
||||
// It has its own internal mutex.
|
||||
history *history.History
|
||||
|
||||
// UpdateStream helpers.
|
||||
updateStreamCancel context.CancelFunc
|
||||
|
||||
// eventTokenMutex protects the current EventToken
|
||||
eventTokenMutex sync.RWMutex
|
||||
eventToken *querypb.EventToken
|
||||
}
|
||||
|
||||
// RegisterFunction is a callback type to be called when we
|
||||
|
@ -159,6 +167,22 @@ func NewTabletServer(config Config) *TabletServer {
|
|||
stats.Publish(config.StatsPrefix+"QueryTimeout", stats.DurationFunc(tsv.QueryTimeout.Get))
|
||||
stats.Publish(config.StatsPrefix+"BeginTimeout", stats.DurationFunc(tsv.BeginTimeout.Get))
|
||||
stats.Publish(config.StatsPrefix+"TabletStateName", stats.StringFunc(tsv.GetState))
|
||||
stats.Publish(config.StatsPrefix+"EventTokenPosition", stats.StringFunc(func() string {
|
||||
tsv.eventTokenMutex.RLock()
|
||||
defer tsv.eventTokenMutex.RUnlock()
|
||||
if tsv.eventToken != nil {
|
||||
return tsv.eventToken.Position
|
||||
}
|
||||
return ""
|
||||
}))
|
||||
stats.Publish(config.StatsPrefix+"EventTokenTimestamp", stats.IntFunc(func() int64 {
|
||||
tsv.eventTokenMutex.RLock()
|
||||
defer tsv.eventTokenMutex.RUnlock()
|
||||
if tsv.eventToken != nil {
|
||||
return tsv.eventToken.Timestamp
|
||||
}
|
||||
return 0
|
||||
}))
|
||||
}
|
||||
return tsv
|
||||
}
|
||||
|
@ -352,6 +376,7 @@ func (tsv *TabletServer) fullStart() (err error) {
|
|||
log.Errorf("Could not start tabletserver: %v", x)
|
||||
tsv.qe.Close()
|
||||
tsv.updateStreamList.Stop()
|
||||
tsv.stopReplicationStreamer()
|
||||
tsv.transition(StateNotConnected)
|
||||
err = x.(error)
|
||||
}
|
||||
|
@ -374,10 +399,14 @@ func (tsv *TabletServer) serveNewType() (err error) {
|
|||
log.Errorf("Could not start tabletserver: %v", x)
|
||||
tsv.qe.Close()
|
||||
tsv.updateStreamList.Stop()
|
||||
tsv.stopReplicationStreamer()
|
||||
tsv.transition(StateNotConnected)
|
||||
err = x.(error)
|
||||
}
|
||||
}()
|
||||
if tsv.target.TabletType != topodatapb.TabletType_MASTER {
|
||||
tsv.startReplicationStreamer()
|
||||
}
|
||||
tsv.transition(StateServing)
|
||||
return nil
|
||||
}
|
||||
|
@ -420,6 +449,7 @@ func (tsv *TabletServer) waitForShutdown() {
|
|||
tsv.qe.WaitForTxEmpty()
|
||||
tsv.qe.streamQList.TerminateAll()
|
||||
tsv.updateStreamList.Stop()
|
||||
tsv.stopReplicationStreamer()
|
||||
tsv.requests.Wait()
|
||||
}
|
||||
|
||||
|
@ -521,11 +551,68 @@ func (tsv *TabletServer) QueryService() queryservice.QueryService {
|
|||
return tsv
|
||||
}
|
||||
|
||||
// QueryServiceStats returns the QueryServiceStats instance of the TabletServer's QueryEngine.
|
||||
// QueryServiceStats returns the QueryServiceStats instance of the
|
||||
// TabletServer's QueryEngine.
|
||||
func (tsv *TabletServer) QueryServiceStats() *QueryServiceStats {
|
||||
return tsv.qe.queryServiceStats
|
||||
}
|
||||
|
||||
func (tsv *TabletServer) startReplicationStreamer() {
|
||||
if !*watchReplicationStream {
|
||||
return
|
||||
}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
tsv.updateStreamCancel = cancel
|
||||
go tsv.replicationStreamer(ctx)
|
||||
}
|
||||
|
||||
func (tsv *TabletServer) stopReplicationStreamer() {
|
||||
if tsv.updateStreamCancel != nil {
|
||||
tsv.updateStreamCancel()
|
||||
tsv.updateStreamCancel = nil
|
||||
}
|
||||
}
|
||||
|
||||
// replicationStreamer is the background thread that reads the
|
||||
// replication stream. It will run in a loop. If it errors out, it
|
||||
// will wait for 5 seconds before restarting.
|
||||
func (tsv *TabletServer) replicationStreamer(ctx context.Context) {
|
||||
for {
|
||||
log.Infof("Starting a binlog Streamer from current replication position to monitor binlogs")
|
||||
streamer := binlog.NewStreamer(tsv.dbconfigs.App.DbName, tsv.mysqld, nil /*clientCharset*/, replication.Position{}, 0 /*timestamp*/, func(trans *binlogdatapb.BinlogTransaction) error {
|
||||
// Save the event token.
|
||||
tsv.eventTokenMutex.Lock()
|
||||
tsv.eventToken = trans.EventToken
|
||||
tsv.eventTokenMutex.Unlock()
|
||||
|
||||
// If it's a DDL, trigger a schema reload.
|
||||
isDDL := false
|
||||
for _, statement := range trans.Statements {
|
||||
if statement.Category == binlogdatapb.BinlogTransaction_Statement_BL_DDL {
|
||||
isDDL = true
|
||||
}
|
||||
}
|
||||
if isDDL {
|
||||
err := tsv.ReloadSchema(ctx)
|
||||
log.Infof("Streamer triggered a schema reload, with result: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
if err := streamer.Stream(ctx); err != nil {
|
||||
log.Infof("Streamer stopped: %v", err)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-time.After(5 * time.Second):
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Begin starts a new transaction. This is allowed only if the state is StateServing.
|
||||
func (tsv *TabletServer) Begin(ctx context.Context, target *querypb.Target) (transactionID int64, err error) {
|
||||
logStats := newLogStats("Begin", ctx)
|
||||
|
@ -1116,12 +1203,14 @@ func (tsv *TabletServer) UpdateStream(ctx context.Context, target *querypb.Targe
|
|||
var p replication.Position
|
||||
var err error
|
||||
if timestamp == 0 {
|
||||
p, err = replication.DecodePosition(position)
|
||||
if err != nil {
|
||||
return NewTabletError(vtrpcpb.ErrorCode_BAD_INPUT, "cannot parse position: %v", err)
|
||||
if position != "" {
|
||||
p, err = replication.DecodePosition(position)
|
||||
if err != nil {
|
||||
return NewTabletError(vtrpcpb.ErrorCode_BAD_INPUT, "cannot parse position: %v", err)
|
||||
}
|
||||
}
|
||||
} else if position != "" {
|
||||
return NewTabletError(vtrpcpb.ErrorCode_BAD_INPUT, "only one of position and timestamp should be specified")
|
||||
return NewTabletError(vtrpcpb.ErrorCode_BAD_INPUT, "at most one of position and timestamp should be specified")
|
||||
}
|
||||
|
||||
// Validate proper target is used.
|
||||
|
|
|
@ -468,8 +468,9 @@ message StreamHealthResponse {
|
|||
RealtimeStats realtime_stats = 4;
|
||||
}
|
||||
|
||||
// UpdateStreamRequest is the payload for UpdateStream. Only one of
|
||||
// position and timestamp can be set.
|
||||
// UpdateStreamRequest is the payload for UpdateStream. At most one of
|
||||
// position and timestamp can be set. If neither is set, we will start
|
||||
// streaming from the current binlog position.
|
||||
message UpdateStreamRequest {
|
||||
vtrpc.CallerID effective_caller_id = 1;
|
||||
VTGateCallerID immediate_caller_id = 2;
|
||||
|
|
|
@ -301,7 +301,8 @@ class Tablet(object):
|
|||
rows = self.mquery('', 'show databases')
|
||||
for row in rows:
|
||||
dbname = row[0]
|
||||
if dbname in ['information_schema', 'performance_schema', 'mysql', 'sys', '_vt']:
|
||||
if dbname in ['information_schema', 'performance_schema', 'mysql', 'sys',
|
||||
'_vt']:
|
||||
continue
|
||||
self.drop_db(dbname)
|
||||
|
||||
|
@ -427,6 +428,7 @@ class Tablet(object):
|
|||
args.extend(['-health_check_interval', '2s'])
|
||||
args.extend(['-enable_replication_reporter'])
|
||||
args.extend(['-degraded_threshold', '5s'])
|
||||
args.extend(['-watch_replication_stream'])
|
||||
if enable_semi_sync:
|
||||
args.append('-enable_semi_sync')
|
||||
if self.use_mysqlctld:
|
||||
|
|
|
@ -347,6 +347,28 @@ class TestUpdateStream(unittest.TestCase):
|
|||
self.fail("Update stream service should be 'Enabled' but is '%s'" %
|
||||
v['UpdateStreamState'])
|
||||
|
||||
def test_event_token(self):
|
||||
"""Checks the background binlog monitor thread works."""
|
||||
replica_position = _get_repl_current_position()
|
||||
timeout = 10
|
||||
while True:
|
||||
value = None
|
||||
v = utils.get_vars(replica_tablet.port)
|
||||
if 'EventTokenPosition' in v:
|
||||
value = v['EventTokenPosition']
|
||||
if value == replica_position:
|
||||
logging.debug('got expected EventTokenPosition vars: %s', value)
|
||||
ts = v['EventTokenTimestamp']
|
||||
now = long(time.time())
|
||||
self.assertTrue(ts >= now - 120,
|
||||
'EventTokenTimestamp is too old: %d < %d' %
|
||||
(ts, now-120))
|
||||
self.assertTrue(ts <= now,
|
||||
'EventTokenTimestamp is too recent: %d > %d' %(ts, now))
|
||||
break
|
||||
timeout = utils.wait_step(
|
||||
'EventTokenPosition must be up to date but got %s' % value, timeout)
|
||||
|
||||
def test_update_stream_interrupt(self):
|
||||
"""Checks that a running query is terminated on going non-serving."""
|
||||
# Make sure the replica is replica type.
|
||||
|
|
Загрузка…
Ссылка в новой задаче