Simplified error types for invalidator.

This commit is contained in:
Shruti Patil 2013-07-12 00:14:08 -07:00
Родитель bc1ccfac22
Коммит e9955bf3dd
1 изменённых файлов: 12 добавлений и 18 удалений

Просмотреть файл

@ -29,8 +29,7 @@ const (
// Error types for rowcache invalidator.
const (
// Fatal Errors
CODE_ERROR = "Code Error"
SERVICE_ERROR = "Service Error"
FATAL_ERROR = "Fatal Error"
// Skippable errors, recorded and skipped.
INVALID_EVENT = "Invalid Event"
@ -190,18 +189,14 @@ func (rowCache *InvalidationProcessor) updateErrCounters(err *InvalidationError)
func (rowCache *InvalidationProcessor) invalidateEvent(response interface{}) error {
if !shouldInvalidatorRun() || !rowCache.isServiceEnabled() {
return NewInvalidationError(SERVICE_ERROR, "Rowcache invalidator is not available", "")
return NewInvalidationError(FATAL_ERROR, "Rowcache invalidator is not available", "")
}
updateResponse, ok := response.(*mysqlctl.UpdateResponse)
if !ok {
return NewInvalidationError(CODE_ERROR, "Invalid Reponse type", "")
return NewInvalidationError(FATAL_ERROR, "Invalid Reponse type", "")
}
rowCache.currentPosition = &updateResponse.BinlogPosition
err := rowCache.processEvent(updateResponse)
if err != nil {
return err
}
return nil
return rowCache.processEvent(updateResponse)
}
func (rowCache *InvalidationProcessor) getCheckpoint() (*mysqlctl.BinlogPosition, bool) {
@ -243,7 +238,7 @@ func (rowCache *InvalidationProcessor) runInvalidationLoop() {
replPos, err := mysqlctl.GetReplicationPosition()
if err != nil {
rErr := NewInvalidationError(SERVICE_ERROR, fmt.Sprintf("Cannot determine replication position %v", err), "")
rErr := NewInvalidationError(FATAL_ERROR, fmt.Sprintf("Cannot determine replication position %v", err), "")
rowCache.updateErrCounters(rErr)
rowCache.stopCache(rErr.Error())
return
@ -278,9 +273,8 @@ func (rowCache *InvalidationProcessor) runInvalidationLoop() {
req := &mysqlctl.UpdateStreamRequest{StartPosition: *startPosition}
err = mysqlctl.ServeUpdateStream(req, rowCache.receiveEvent)
if err != nil {
rErr, ok := err.(*InvalidationError)
if ok && rErr.isFatal() {
relog.Error("Fatal Error: '%v'", rErr)
relog.Error("mysqlctl.ServeUpdateStream returned err '%v'", err.Error())
if rErr, ok := err.(*InvalidationError); ok {
rowCache.updateErrCounters(rErr)
}
rowCache.stopCache(fmt.Sprintf("Unexpected or fatal error, '%v'", err.Error()))
@ -308,7 +302,7 @@ func (rowCache *InvalidationProcessor) processEvent(event *mysqlctl.UpdateRespon
// Check if update stream error is fatal, else record it and move on.
if strings.HasPrefix(event.Error, mysqlctl.FATAL) {
relog.Info("Returning Service Error")
return NewInvalidationError(SERVICE_ERROR, event.Error, position)
return NewInvalidationError(FATAL_ERROR, event.Error, position)
}
rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, event.Error, position))
return nil
@ -404,7 +398,7 @@ func (rowCache *InvalidationProcessor) handleTxn(commitEvent *mysqlctl.UpdateRes
if terr, ok := x.(*TabletError); ok {
rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, terr.Error(), commitEvent.BinlogPosition.String()))
} else {
err = NewInvalidationError(SERVICE_ERROR, "handleTxn failed", commitEvent.BinlogPosition.String())
err = NewInvalidationError(FATAL_ERROR, "handleTxn failed", commitEvent.BinlogPosition.String())
}
}
}()
@ -416,7 +410,7 @@ func (rowCache *InvalidationProcessor) handleTxn(commitEvent *mysqlctl.UpdateRes
cacheInvalidate := new(proto.CacheInvalidate)
rowCache.encBuf, err = bson.Marshal(commitEvent.BinlogPosition)
if err != nil {
return NewInvalidationError(CODE_ERROR, fmt.Sprintf("Error in encoding position, %v", err), commitEvent.BinlogPosition.String())
return NewInvalidationError(FATAL_ERROR, fmt.Sprintf("Error in encoding position, %v", err), commitEvent.BinlogPosition.String())
}
cacheInvalidate.Position = rowCache.encBuf
cacheInvalidate.Dmls = make([]proto.DmlType, 0, len(rowCache.dmlBuffer))
@ -434,7 +428,7 @@ func (rowCache *InvalidationProcessor) handleDdlEvent(ddlEvent *mysqlctl.UpdateR
if terr, ok := x.(*TabletError); ok {
rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, terr.Error(), ddlEvent.BinlogPosition.String()))
} else {
err = NewInvalidationError(SERVICE_ERROR, "ddlEvent failed", ddlEvent.BinlogPosition.String())
err = NewInvalidationError(FATAL_ERROR, "ddlEvent failed", ddlEvent.BinlogPosition.String())
}
}
}()
@ -448,7 +442,7 @@ func (rowCache *InvalidationProcessor) handleDdlEvent(ddlEvent *mysqlctl.UpdateR
ddlInvalidate := new(proto.DDLInvalidate)
rowCache.encBuf, err = bson.Marshal(ddlEvent.BinlogPosition)
if err != nil {
return NewInvalidationError(CODE_ERROR, fmt.Sprintf("Error in encoding position, %v", err), ddlEvent.BinlogPosition.String())
return NewInvalidationError(FATAL_ERROR, fmt.Sprintf("Error in encoding position, %v", err), ddlEvent.BinlogPosition.String())
}
ddlInvalidate.Position = rowCache.encBuf
ddlInvalidate.DDL = ddlEvent.Sql