зеркало из https://github.com/github/vitess-gh.git
tabletserver: tabletenv.Env, heartbeat work
Signed-off-by: Sugu Sougoumarane <ssougou@gmail.com>
This commit is contained in:
Родитель
58c7d09f9d
Коммит
d6d255ac63
|
@ -69,7 +69,8 @@ type Reader struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewReader returns a new heartbeat reader.
|
// NewReader returns a new heartbeat reader.
|
||||||
func NewReader(checker connpool.MySQLChecker, config tabletenv.TabletConfig) *Reader {
|
func NewReader(env tabletenv.Env) *Reader {
|
||||||
|
config := env.Config()
|
||||||
if !config.HeartbeatEnable {
|
if !config.HeartbeatEnable {
|
||||||
return &Reader{}
|
return &Reader{}
|
||||||
}
|
}
|
||||||
|
@ -80,7 +81,7 @@ func NewReader(checker connpool.MySQLChecker, config tabletenv.TabletConfig) *Re
|
||||||
interval: config.HeartbeatInterval,
|
interval: config.HeartbeatInterval,
|
||||||
ticks: timer.NewTimer(config.HeartbeatInterval),
|
ticks: timer.NewTimer(config.HeartbeatInterval),
|
||||||
errorLog: logutil.NewThrottledLogger("HeartbeatReporter", 60*time.Second),
|
errorLog: logutil.NewThrottledLogger("HeartbeatReporter", 60*time.Second),
|
||||||
pool: connpool.New(config.PoolNamePrefix+"HeartbeatReadPool", 1, 0, time.Duration(config.IdleTimeout*1e9), checker),
|
pool: connpool.New(config.PoolNamePrefix+"HeartbeatReadPool", 1, 0, time.Duration(config.IdleTimeout*1e9), env),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -110,7 +110,7 @@ func newReader(db *fakesqldb.DB, nowFunc func() time.Time) *Reader {
|
||||||
cp := *params
|
cp := *params
|
||||||
dbc := dbconfigs.NewTestDBConfigs(cp, cp, "")
|
dbc := dbconfigs.NewTestDBConfigs(cp, cp, "")
|
||||||
|
|
||||||
tr := NewReader(&fakeMysqlChecker{}, config)
|
tr := NewReader(tabletenv.NewTestEnv(&config, nil))
|
||||||
tr.dbName = sqlescape.EscapeID(dbc.SidecarDBName.Get())
|
tr.dbName = sqlescape.EscapeID(dbc.SidecarDBName.Get())
|
||||||
tr.keyspaceShard = "test:0"
|
tr.keyspaceShard = "test:0"
|
||||||
tr.now = nowFunc
|
tr.now = nowFunc
|
||||||
|
|
|
@ -73,7 +73,8 @@ type Writer struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewWriter creates a new Writer.
|
// NewWriter creates a new Writer.
|
||||||
func NewWriter(checker connpool.MySQLChecker, alias topodatapb.TabletAlias, config tabletenv.TabletConfig) *Writer {
|
func NewWriter(env tabletenv.Env, alias topodatapb.TabletAlias) *Writer {
|
||||||
|
config := env.Config()
|
||||||
if !config.HeartbeatEnable {
|
if !config.HeartbeatEnable {
|
||||||
return &Writer{}
|
return &Writer{}
|
||||||
}
|
}
|
||||||
|
@ -84,7 +85,7 @@ func NewWriter(checker connpool.MySQLChecker, alias topodatapb.TabletAlias, conf
|
||||||
interval: config.HeartbeatInterval,
|
interval: config.HeartbeatInterval,
|
||||||
ticks: timer.NewTimer(config.HeartbeatInterval),
|
ticks: timer.NewTimer(config.HeartbeatInterval),
|
||||||
errorLog: logutil.NewThrottledLogger("HeartbeatWriter", 60*time.Second),
|
errorLog: logutil.NewThrottledLogger("HeartbeatWriter", 60*time.Second),
|
||||||
pool: connpool.New(config.PoolNamePrefix+"HeartbeatWritePool", 1, 0, time.Duration(config.IdleTimeout*1e9), checker),
|
pool: connpool.New(config.PoolNamePrefix+"HeartbeatWritePool", 1, 0, time.Duration(config.IdleTimeout*1e9), env),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -115,9 +115,7 @@ func newTestWriter(db *fakesqldb.DB, nowFunc func() time.Time) *Writer {
|
||||||
cp := *params
|
cp := *params
|
||||||
dbc := dbconfigs.NewTestDBConfigs(cp, cp, "")
|
dbc := dbconfigs.NewTestDBConfigs(cp, cp, "")
|
||||||
|
|
||||||
tw := NewWriter(&fakeMysqlChecker{},
|
tw := NewWriter(tabletenv.NewTestEnv(&config, nil), topodatapb.TabletAlias{Cell: "test", Uid: 1111})
|
||||||
topodatapb.TabletAlias{Cell: "test", Uid: 1111},
|
|
||||||
config)
|
|
||||||
tw.dbName = sqlescape.EscapeID(dbc.SidecarDBName.Get())
|
tw.dbName = sqlescape.EscapeID(dbc.SidecarDBName.Get())
|
||||||
tw.keyspaceShard = "test:0"
|
tw.keyspaceShard = "test:0"
|
||||||
tw.now = nowFunc
|
tw.now = nowFunc
|
||||||
|
@ -125,7 +123,3 @@ func newTestWriter(db *fakesqldb.DB, nowFunc func() time.Time) *Writer {
|
||||||
|
|
||||||
return tw
|
return tw
|
||||||
}
|
}
|
||||||
|
|
||||||
type fakeMysqlChecker struct{}
|
|
||||||
|
|
||||||
func (f fakeMysqlChecker) CheckMySQL() {}
|
|
||||||
|
|
|
@ -224,8 +224,8 @@ func NewTabletServer(config tabletenv.TabletConfig, topoServer *topo.Server, ali
|
||||||
tsv.se = schema.NewEngine(tsv)
|
tsv.se = schema.NewEngine(tsv)
|
||||||
tsv.qe = NewQueryEngine(tsv, tsv.se)
|
tsv.qe = NewQueryEngine(tsv, tsv.se)
|
||||||
tsv.te = NewTxEngine(tsv)
|
tsv.te = NewTxEngine(tsv)
|
||||||
tsv.hw = heartbeat.NewWriter(tsv, alias, config)
|
tsv.hw = heartbeat.NewWriter(tsv, alias)
|
||||||
tsv.hr = heartbeat.NewReader(tsv, config)
|
tsv.hr = heartbeat.NewReader(tsv)
|
||||||
tsv.txThrottler = txthrottler.CreateTxThrottlerFromTabletConfig(topoServer)
|
tsv.txThrottler = txthrottler.CreateTxThrottlerFromTabletConfig(topoServer)
|
||||||
// FIXME(alainjobart) could we move this to the Register method below?
|
// FIXME(alainjobart) could we move this to the Register method below?
|
||||||
// So that vtcombo doesn't even call it once, on the first tablet.
|
// So that vtcombo doesn't even call it once, on the first tablet.
|
||||||
|
|
Загрузка…
Ссылка в новой задаче