From e9955bf3dda339e34e191ff55ddf9c945d08774a Mon Sep 17 00:00:00 2001 From: Shruti Patil Date: Fri, 12 Jul 2013 00:14:08 -0700 Subject: [PATCH] Simplified error types for invalidator. --- go/vt/tabletserver/rowcache_invalidator.go | 30 +++++++++------------- 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/go/vt/tabletserver/rowcache_invalidator.go b/go/vt/tabletserver/rowcache_invalidator.go index 4e29f3e772..536fee71c8 100644 --- a/go/vt/tabletserver/rowcache_invalidator.go +++ b/go/vt/tabletserver/rowcache_invalidator.go @@ -29,8 +29,7 @@ const ( // Error types for rowcache invalidator. const ( // Fatal Errors - CODE_ERROR = "Code Error" - SERVICE_ERROR = "Service Error" + FATAL_ERROR = "Fatal Error" // Skippable errors, recorded and skipped. INVALID_EVENT = "Invalid Event" @@ -190,18 +189,14 @@ func (rowCache *InvalidationProcessor) updateErrCounters(err *InvalidationError) func (rowCache *InvalidationProcessor) invalidateEvent(response interface{}) error { if !shouldInvalidatorRun() || !rowCache.isServiceEnabled() { - return NewInvalidationError(SERVICE_ERROR, "Rowcache invalidator is not available", "") + return NewInvalidationError(FATAL_ERROR, "Rowcache invalidator is not available", "") } updateResponse, ok := response.(*mysqlctl.UpdateResponse) if !ok { - return NewInvalidationError(CODE_ERROR, "Invalid Reponse type", "") + return NewInvalidationError(FATAL_ERROR, "Invalid Reponse type", "") } rowCache.currentPosition = &updateResponse.BinlogPosition - err := rowCache.processEvent(updateResponse) - if err != nil { - return err - } - return nil + return rowCache.processEvent(updateResponse) } func (rowCache *InvalidationProcessor) getCheckpoint() (*mysqlctl.BinlogPosition, bool) { @@ -243,7 +238,7 @@ func (rowCache *InvalidationProcessor) runInvalidationLoop() { replPos, err := mysqlctl.GetReplicationPosition() if err != nil { - rErr := NewInvalidationError(SERVICE_ERROR, fmt.Sprintf("Cannot determine replication position %v", err), "") + rErr := NewInvalidationError(FATAL_ERROR, fmt.Sprintf("Cannot determine replication position %v", err), "") rowCache.updateErrCounters(rErr) rowCache.stopCache(rErr.Error()) return @@ -278,9 +273,8 @@ func (rowCache *InvalidationProcessor) runInvalidationLoop() { req := &mysqlctl.UpdateStreamRequest{StartPosition: *startPosition} err = mysqlctl.ServeUpdateStream(req, rowCache.receiveEvent) if err != nil { - rErr, ok := err.(*InvalidationError) - if ok && rErr.isFatal() { - relog.Error("Fatal Error: '%v'", rErr) + relog.Error("mysqlctl.ServeUpdateStream returned err '%v'", err.Error()) + if rErr, ok := err.(*InvalidationError); ok { rowCache.updateErrCounters(rErr) } rowCache.stopCache(fmt.Sprintf("Unexpected or fatal error, '%v'", err.Error())) @@ -308,7 +302,7 @@ func (rowCache *InvalidationProcessor) processEvent(event *mysqlctl.UpdateRespon // Check if update stream error is fatal, else record it and move on. if strings.HasPrefix(event.Error, mysqlctl.FATAL) { relog.Info("Returning Service Error") - return NewInvalidationError(SERVICE_ERROR, event.Error, position) + return NewInvalidationError(FATAL_ERROR, event.Error, position) } rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, event.Error, position)) return nil @@ -404,7 +398,7 @@ func (rowCache *InvalidationProcessor) handleTxn(commitEvent *mysqlctl.UpdateRes if terr, ok := x.(*TabletError); ok { rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, terr.Error(), commitEvent.BinlogPosition.String())) } else { - err = NewInvalidationError(SERVICE_ERROR, "handleTxn failed", commitEvent.BinlogPosition.String()) + err = NewInvalidationError(FATAL_ERROR, "handleTxn failed", commitEvent.BinlogPosition.String()) } } }() @@ -416,7 +410,7 @@ func (rowCache *InvalidationProcessor) handleTxn(commitEvent *mysqlctl.UpdateRes cacheInvalidate := new(proto.CacheInvalidate) rowCache.encBuf, err = bson.Marshal(commitEvent.BinlogPosition) if err != nil { - return NewInvalidationError(CODE_ERROR, fmt.Sprintf("Error in encoding position, %v", err), commitEvent.BinlogPosition.String()) + return NewInvalidationError(FATAL_ERROR, fmt.Sprintf("Error in encoding position, %v", err), commitEvent.BinlogPosition.String()) } cacheInvalidate.Position = rowCache.encBuf cacheInvalidate.Dmls = make([]proto.DmlType, 0, len(rowCache.dmlBuffer)) @@ -434,7 +428,7 @@ func (rowCache *InvalidationProcessor) handleDdlEvent(ddlEvent *mysqlctl.UpdateR if terr, ok := x.(*TabletError); ok { rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, terr.Error(), ddlEvent.BinlogPosition.String())) } else { - err = NewInvalidationError(SERVICE_ERROR, "ddlEvent failed", ddlEvent.BinlogPosition.String()) + err = NewInvalidationError(FATAL_ERROR, "ddlEvent failed", ddlEvent.BinlogPosition.String()) } } }() @@ -448,7 +442,7 @@ func (rowCache *InvalidationProcessor) handleDdlEvent(ddlEvent *mysqlctl.UpdateR ddlInvalidate := new(proto.DDLInvalidate) rowCache.encBuf, err = bson.Marshal(ddlEvent.BinlogPosition) if err != nil { - return NewInvalidationError(CODE_ERROR, fmt.Sprintf("Error in encoding position, %v", err), ddlEvent.BinlogPosition.String()) + return NewInvalidationError(FATAL_ERROR, fmt.Sprintf("Error in encoding position, %v", err), ddlEvent.BinlogPosition.String()) } ddlInvalidate.Position = rowCache.encBuf ddlInvalidate.DDL = ddlEvent.Sql