diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index 7425427ac5..60c478a078 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -72,13 +72,12 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor } ct := &controller{ - vre: vre, - dbClientFactory: dbClientFactory, - mysqld: mysqld, - blpStats: blpStats, - done: make(chan struct{}), - source: &binlogdatapb.BinlogSource{}, - lastWorkflowError: newLastError("VReplication Controller", maxTimeToRetryError), + vre: vre, + dbClientFactory: dbClientFactory, + mysqld: mysqld, + blpStats: blpStats, + done: make(chan struct{}), + source: &binlogdatapb.BinlogSource{}, } log.Infof("creating controller with cell: %v, tabletTypes: %v, and params: %v", cell, tabletTypesStr, params) @@ -89,6 +88,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor } ct.id = uint32(id) ct.workflow = params["workflow"] + ct.lastWorkflowError = newLastError(fmt.Sprintf("VReplication controller %d for workflow %q", ct.id, ct.workflow), maxTimeToRetryError) state := params["state"] blpStats.State.Set(state) diff --git a/go/vt/vttablet/tabletmanager/vreplication/last_error.go b/go/vt/vttablet/tabletmanager/vreplication/last_error.go index e3a73d5b06..5d51fc44f0 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/last_error.go +++ b/go/vt/vttablet/tabletmanager/vreplication/last_error.go @@ -26,18 +26,20 @@ import ( /* * lastError tracks the most recent error for any ongoing process and how long it has persisted. - * The err field should be a vterror so as to ensure we have meaningful error codes, causes, stack + * The err field should be a vterror to ensure we have meaningful error codes, causes, stack * traces, etc. */ type lastError struct { name string err error firstSeen time.Time + lastSeen time.Time mu sync.Mutex maxTimeInError time.Duration // if error persists for this long, shouldRetry() will return false } func newLastError(name string, maxTimeInError time.Duration) *lastError { + log.Infof("Created last error: %s, with maxTimeInError: %s", name, maxTimeInError) return &lastError{ name: name, maxTimeInError: maxTimeInError, @@ -48,15 +50,27 @@ func (le *lastError) record(err error) { le.mu.Lock() defer le.mu.Unlock() if err == nil { + log.Infof("Resetting last error: %s", le.name) le.err = nil le.firstSeen = time.Time{} + le.lastSeen = time.Time{} return } if !vterrors.Equals(err, le.err) { + log.Infof("Got new last error %+v for %s, was %+v", err, le.name, le.err) le.firstSeen = time.Now() + le.lastSeen = time.Now() le.err = err + } else { + // same error seen + log.Infof("Got the same last error for %q: %+v ; first seen at %s and last seen %dms ago", le.name, le.err, le.firstSeen, int(time.Since(le.lastSeen).Milliseconds())) + if time.Since(le.lastSeen) > le.maxTimeInError { + // reset firstSeen, since it has been long enough since the last time we saw this error + log.Infof("Resetting firstSeen for %s, since it is too long since the last one", le.name) + le.firstSeen = time.Now() + } + le.lastSeen = time.Now() } - // The error is unchanged so we don't need to do anything } func (le *lastError) shouldRetry() bool { diff --git a/go/vt/vttablet/tabletmanager/vreplication/last_error_test.go b/go/vt/vttablet/tabletmanager/vreplication/last_error_test.go index 8d0e353478..08eaa67f3b 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/last_error_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/last_error_test.go @@ -24,32 +24,63 @@ import ( "github.com/stretchr/testify/require" ) -func TestLastError(t *testing.T) { - le := newLastError("test", 100*time.Millisecond) +const shortWait = 1 * time.Millisecond +const longWait = 150 * time.Millisecond +const maxTimeInError = 100 * time.Millisecond - t.Run("long running error", func(t *testing.T) { - err1 := fmt.Errorf("test1") - le.record(err1) - require.True(t, le.shouldRetry()) - time.Sleep(150 * time.Millisecond) - require.False(t, le.shouldRetry()) - }) - - t.Run("new long running error", func(t *testing.T) { - err2 := fmt.Errorf("test2") - le.record(err2) - require.True(t, le.shouldRetry()) - for i := 1; i < 10; i++ { - le.record(err2) - } - require.True(t, le.shouldRetry()) - time.Sleep(150 * time.Millisecond) - le.record(err2) - require.False(t, le.shouldRetry()) - }) - - t.Run("no error", func(t *testing.T) { - le.record(nil) - require.True(t, le.shouldRetry()) - }) +// TestLastErrorZeroMaxTime tests maxTimeInError = 0, should always retry +func TestLastErrorZeroMaxTime(t *testing.T) { + le := newLastError("test", 0) + err1 := fmt.Errorf("error1") + le.record(err1) + require.True(t, le.shouldRetry()) + time.Sleep(shortWait) + require.True(t, le.shouldRetry()) + time.Sleep(longWait) + require.True(t, le.shouldRetry()) +} + +// TestLastErrorNoError ensures that an uninitialized lastError always retries +func TestLastErrorNoError(t *testing.T) { + le := newLastError("test", maxTimeInError) + require.True(t, le.shouldRetry()) + err1 := fmt.Errorf("error1") + le.record(err1) + require.True(t, le.shouldRetry()) + le.record(nil) + require.True(t, le.shouldRetry()) +} + +// TestLastErrorOneError validates that we retry an error if happening within the maxTimeInError, but not after +func TestLastErrorOneError(t *testing.T) { + le := newLastError("test", maxTimeInError) + err1 := fmt.Errorf("error1") + le.record(err1) + require.True(t, le.shouldRetry()) + time.Sleep(shortWait) + require.True(t, le.shouldRetry()) + time.Sleep(shortWait) + require.True(t, le.shouldRetry()) + time.Sleep(longWait) + require.False(t, le.shouldRetry()) +} + +// TestLastErrorRepeatedError confirms that if same error is repeated we don't retry +// unless it happens after maxTimeInError +func TestLastErrorRepeatedError(t *testing.T) { + le := newLastError("test", maxTimeInError) + err1 := fmt.Errorf("error1") + le.record(err1) + require.True(t, le.shouldRetry()) + for i := 1; i < 10; i++ { + le.record(err1) + time.Sleep(shortWait) + } + require.True(t, le.shouldRetry()) + + // same error happens after maxTimeInError, so it should retry + time.Sleep(longWait) + require.False(t, le.shouldRetry()) + le.record(err1) + require.True(t, le.shouldRetry()) }