зеркало из https://github.com/github/vitess-gh.git
Handle unrecognized DDLs as non-fatal.
Stopping query service for unrecognized DDLs is too abrupt. Instead, increment the InternalErrors.Invalidation counter so we can tie that variable to an alert.
This commit is contained in:
Родитель
6893e7c70e
Коммит
50b5cf6bd2
|
@ -87,7 +87,7 @@ func (rowCache *InvalidationProcessor) runInvalidationLoop() {
|
|||
return rowCache.processEvent(reply)
|
||||
})
|
||||
if err != nil {
|
||||
log.Errorf("mysqlctl.ServeUpdateStream returned err '%v'", err.Error())
|
||||
log.Errorf("binlog.ServeUpdateStream returned err '%v'", err.Error())
|
||||
}
|
||||
log.Infof("Rowcache invalidator stopped")
|
||||
}
|
||||
|
|
|
@ -240,10 +240,24 @@ func (sq *SqlQuery) Rollback(context *rpcproto.Context, session *proto.Session,
|
|||
return nil
|
||||
}
|
||||
|
||||
func handleInvalidationError(request interface{}) {
|
||||
if x := recover(); x != nil {
|
||||
terr, ok := x.(*TabletError)
|
||||
if !ok {
|
||||
log.Errorf("Uncaught panic for %v:\n%v\n%s", request, x, tb.Stack(4))
|
||||
internalErrors.Add("Panic", 1)
|
||||
return
|
||||
}
|
||||
log.Errorf("%s: %v", terr.Message, request)
|
||||
internalErrors.Add("Invalidation", 1)
|
||||
}
|
||||
}
|
||||
|
||||
func (sq *SqlQuery) invalidateForDml(dml *proto.DmlType) {
|
||||
if sq.state.Get() != SERVING {
|
||||
return
|
||||
}
|
||||
defer handleInvalidationError(dml)
|
||||
sq.qe.InvalidateForDml(dml)
|
||||
}
|
||||
|
||||
|
@ -251,6 +265,7 @@ func (sq *SqlQuery) invalidateForDDL(ddlInvalidate *proto.DDLInvalidate) {
|
|||
if sq.state.Get() != SERVING {
|
||||
return
|
||||
}
|
||||
defer handleInvalidationError(ddlInvalidate)
|
||||
sq.qe.InvalidateForDDL(ddlInvalidate)
|
||||
}
|
||||
|
||||
|
|
|
@ -113,13 +113,16 @@ class RowCacheInvalidator(unittest.TestCase):
|
|||
def perform_delete(self):
|
||||
self._exec_vt_txn(['delete from vt_insert_test',])
|
||||
|
||||
def test_cache_invalidation(self):
|
||||
def _wait_for_replica(self):
|
||||
master_position = utils.mysql_query(master_tablet.tablet_uid,
|
||||
'vt_test_keyspace',
|
||||
'show master status')
|
||||
replica_tablet.mquery('vt_test_keyspace',
|
||||
"select MASTER_POS_WAIT('%s', %d)" %
|
||||
(master_position[0][0], master_position[0][1]), 5)
|
||||
|
||||
def test_cache_invalidation(self):
|
||||
self._wait_for_replica()
|
||||
invalidations = self.replica_stats()['Totals']['Invalidations']
|
||||
invalidatorStats = self.replica_vars()
|
||||
logging.debug("Invalidations %d InvalidatorStats %s" %
|
||||
|
@ -148,6 +151,24 @@ class RowCacheInvalidator(unittest.TestCase):
|
|||
self.assertEqual(stats_dict['Hits'] - hits, 1,
|
||||
"This should have hit the cache")
|
||||
|
||||
def test_invalidation_failure(self):
|
||||
start = self.replica_vars()['InternalErrors'].get('Invalidation', 0)
|
||||
self.perform_insert(10)
|
||||
utils.mysql_write_query(master_tablet.tablet_uid,
|
||||
'vt_test_keyspace',
|
||||
"update vt_insert_test set msg = 'foo' where id = 1")
|
||||
self._wait_for_replica()
|
||||
time.sleep(1.0)
|
||||
end1 = self.replica_vars()['InternalErrors'].get('Invalidation', 0)
|
||||
self.assertEqual(start+1, end1)
|
||||
utils.mysql_query(master_tablet.tablet_uid,
|
||||
'vt_test_keyspace',
|
||||
"truncate table vt_insert_test")
|
||||
self._wait_for_replica()
|
||||
time.sleep(1.0)
|
||||
end2 = self.replica_vars()['InternalErrors'].get('Invalidation', 0)
|
||||
self.assertEqual(end1+1, end2)
|
||||
|
||||
def test_stop_replication(self):
|
||||
# restart the replica tablet so the stats are reset
|
||||
replica_tablet.kill_vttablet()
|
||||
|
|
Загрузка…
Ссылка в новой задаче