This commit is contained in:
Sugu Sougoumarane 2014-01-27 15:44:06 -08:00
Родитель 1022febc83 50b5cf6bd2
Коммит 41f5435e17
3 изменённых файлов: 38 добавлений и 2 удалений

Просмотреть файл

@ -87,7 +87,7 @@ func (rowCache *InvalidationProcessor) runInvalidationLoop() {
return rowCache.processEvent(reply)
})
if err != nil {
log.Errorf("mysqlctl.ServeUpdateStream returned err '%v'", err.Error())
log.Errorf("binlog.ServeUpdateStream returned err '%v'", err.Error())
}
log.Infof("Rowcache invalidator stopped")
}

Просмотреть файл

@ -240,10 +240,24 @@ func (sq *SqlQuery) Rollback(context *rpcproto.Context, session *proto.Session,
return nil
}
func handleInvalidationError(request interface{}) {
if x := recover(); x != nil {
terr, ok := x.(*TabletError)
if !ok {
log.Errorf("Uncaught panic for %v:\n%v\n%s", request, x, tb.Stack(4))
internalErrors.Add("Panic", 1)
return
}
log.Errorf("%s: %v", terr.Message, request)
internalErrors.Add("Invalidation", 1)
}
}
func (sq *SqlQuery) invalidateForDml(dml *proto.DmlType) {
if sq.state.Get() != SERVING {
return
}
defer handleInvalidationError(dml)
sq.qe.InvalidateForDml(dml)
}
@ -251,6 +265,7 @@ func (sq *SqlQuery) invalidateForDDL(ddlInvalidate *proto.DDLInvalidate) {
if sq.state.Get() != SERVING {
return
}
defer handleInvalidationError(ddlInvalidate)
sq.qe.InvalidateForDDL(ddlInvalidate)
}

Просмотреть файл

@ -113,13 +113,16 @@ class RowCacheInvalidator(unittest.TestCase):
def perform_delete(self):
self._exec_vt_txn(['delete from vt_insert_test',])
def test_cache_invalidation(self):
def _wait_for_replica(self):
master_position = utils.mysql_query(master_tablet.tablet_uid,
'vt_test_keyspace',
'show master status')
replica_tablet.mquery('vt_test_keyspace',
"select MASTER_POS_WAIT('%s', %d)" %
(master_position[0][0], master_position[0][1]), 5)
def test_cache_invalidation(self):
self._wait_for_replica()
invalidations = self.replica_stats()['Totals']['Invalidations']
invalidatorStats = self.replica_vars()
logging.debug("Invalidations %d InvalidatorStats %s" %
@ -148,6 +151,24 @@ class RowCacheInvalidator(unittest.TestCase):
self.assertEqual(stats_dict['Hits'] - hits, 1,
"This should have hit the cache")
def test_invalidation_failure(self):
start = self.replica_vars()['InternalErrors'].get('Invalidation', 0)
self.perform_insert(10)
utils.mysql_write_query(master_tablet.tablet_uid,
'vt_test_keyspace',
"update vt_insert_test set msg = 'foo' where id = 1")
self._wait_for_replica()
time.sleep(1.0)
end1 = self.replica_vars()['InternalErrors'].get('Invalidation', 0)
self.assertEqual(start+1, end1)
utils.mysql_query(master_tablet.tablet_uid,
'vt_test_keyspace',
"truncate table vt_insert_test")
self._wait_for_replica()
time.sleep(1.0)
end2 = self.replica_vars()['InternalErrors'].get('Invalidation', 0)
self.assertEqual(end1+1, end2)
def test_stop_replication(self):
# restart the replica tablet so the stats are reset
replica_tablet.kill_vttablet()