зеркало из https://github.com/github/vitess-gh.git
* Retry error if it happens after timeout Signed-off-by: Rohit Nayak <rohit@planetscale.com> * Address review comments Signed-off-by: Rohit Nayak <rohit@planetscale.com> * Address review comments Signed-off-by: Rohit Nayak <rohit@planetscale.com> * Improve unit tests Signed-off-by: Rohit Nayak <rohit@planetscale.com> --------- Signed-off-by: Rohit Nayak <rohit@planetscale.com> Co-authored-by: Rohit Nayak <rohit@planetscale.com>
This commit is contained in:
Родитель
b632186a2d
Коммит
6998097494
|
@ -72,13 +72,12 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor
|
|||
}
|
||||
|
||||
ct := &controller{
|
||||
vre: vre,
|
||||
dbClientFactory: dbClientFactory,
|
||||
mysqld: mysqld,
|
||||
blpStats: blpStats,
|
||||
done: make(chan struct{}),
|
||||
source: &binlogdatapb.BinlogSource{},
|
||||
lastWorkflowError: newLastError("VReplication Controller", maxTimeToRetryError),
|
||||
vre: vre,
|
||||
dbClientFactory: dbClientFactory,
|
||||
mysqld: mysqld,
|
||||
blpStats: blpStats,
|
||||
done: make(chan struct{}),
|
||||
source: &binlogdatapb.BinlogSource{},
|
||||
}
|
||||
log.Infof("creating controller with cell: %v, tabletTypes: %v, and params: %v", cell, tabletTypesStr, params)
|
||||
|
||||
|
@ -89,6 +88,7 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor
|
|||
}
|
||||
ct.id = uint32(id)
|
||||
ct.workflow = params["workflow"]
|
||||
ct.lastWorkflowError = newLastError(fmt.Sprintf("VReplication controller %d for workflow %q", ct.id, ct.workflow), maxTimeToRetryError)
|
||||
|
||||
state := params["state"]
|
||||
blpStats.State.Set(state)
|
||||
|
|
|
@ -26,18 +26,20 @@ import (
|
|||
|
||||
/*
|
||||
* lastError tracks the most recent error for any ongoing process and how long it has persisted.
|
||||
* The err field should be a vterror so as to ensure we have meaningful error codes, causes, stack
|
||||
* The err field should be a vterror to ensure we have meaningful error codes, causes, stack
|
||||
* traces, etc.
|
||||
*/
|
||||
type lastError struct {
|
||||
name string
|
||||
err error
|
||||
firstSeen time.Time
|
||||
lastSeen time.Time
|
||||
mu sync.Mutex
|
||||
maxTimeInError time.Duration // if error persists for this long, shouldRetry() will return false
|
||||
}
|
||||
|
||||
func newLastError(name string, maxTimeInError time.Duration) *lastError {
|
||||
log.Infof("Created last error: %s, with maxTimeInError: %s", name, maxTimeInError)
|
||||
return &lastError{
|
||||
name: name,
|
||||
maxTimeInError: maxTimeInError,
|
||||
|
@ -48,15 +50,27 @@ func (le *lastError) record(err error) {
|
|||
le.mu.Lock()
|
||||
defer le.mu.Unlock()
|
||||
if err == nil {
|
||||
log.Infof("Resetting last error: %s", le.name)
|
||||
le.err = nil
|
||||
le.firstSeen = time.Time{}
|
||||
le.lastSeen = time.Time{}
|
||||
return
|
||||
}
|
||||
if !vterrors.Equals(err, le.err) {
|
||||
log.Infof("Got new last error %+v for %s, was %+v", err, le.name, le.err)
|
||||
le.firstSeen = time.Now()
|
||||
le.lastSeen = time.Now()
|
||||
le.err = err
|
||||
} else {
|
||||
// same error seen
|
||||
log.Infof("Got the same last error for %q: %+v ; first seen at %s and last seen %dms ago", le.name, le.err, le.firstSeen, int(time.Since(le.lastSeen).Milliseconds()))
|
||||
if time.Since(le.lastSeen) > le.maxTimeInError {
|
||||
// reset firstSeen, since it has been long enough since the last time we saw this error
|
||||
log.Infof("Resetting firstSeen for %s, since it is too long since the last one", le.name)
|
||||
le.firstSeen = time.Now()
|
||||
}
|
||||
le.lastSeen = time.Now()
|
||||
}
|
||||
// The error is unchanged so we don't need to do anything
|
||||
}
|
||||
|
||||
func (le *lastError) shouldRetry() bool {
|
||||
|
|
|
@ -24,32 +24,63 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestLastError(t *testing.T) {
|
||||
le := newLastError("test", 100*time.Millisecond)
|
||||
const shortWait = 1 * time.Millisecond
|
||||
const longWait = 150 * time.Millisecond
|
||||
const maxTimeInError = 100 * time.Millisecond
|
||||
|
||||
t.Run("long running error", func(t *testing.T) {
|
||||
err1 := fmt.Errorf("test1")
|
||||
le.record(err1)
|
||||
require.True(t, le.shouldRetry())
|
||||
time.Sleep(150 * time.Millisecond)
|
||||
require.False(t, le.shouldRetry())
|
||||
})
|
||||
|
||||
t.Run("new long running error", func(t *testing.T) {
|
||||
err2 := fmt.Errorf("test2")
|
||||
le.record(err2)
|
||||
require.True(t, le.shouldRetry())
|
||||
for i := 1; i < 10; i++ {
|
||||
le.record(err2)
|
||||
}
|
||||
require.True(t, le.shouldRetry())
|
||||
time.Sleep(150 * time.Millisecond)
|
||||
le.record(err2)
|
||||
require.False(t, le.shouldRetry())
|
||||
})
|
||||
|
||||
t.Run("no error", func(t *testing.T) {
|
||||
le.record(nil)
|
||||
require.True(t, le.shouldRetry())
|
||||
})
|
||||
// TestLastErrorZeroMaxTime tests maxTimeInError = 0, should always retry
|
||||
func TestLastErrorZeroMaxTime(t *testing.T) {
|
||||
le := newLastError("test", 0)
|
||||
err1 := fmt.Errorf("error1")
|
||||
le.record(err1)
|
||||
require.True(t, le.shouldRetry())
|
||||
time.Sleep(shortWait)
|
||||
require.True(t, le.shouldRetry())
|
||||
time.Sleep(longWait)
|
||||
require.True(t, le.shouldRetry())
|
||||
}
|
||||
|
||||
// TestLastErrorNoError ensures that an uninitialized lastError always retries
|
||||
func TestLastErrorNoError(t *testing.T) {
|
||||
le := newLastError("test", maxTimeInError)
|
||||
require.True(t, le.shouldRetry())
|
||||
err1 := fmt.Errorf("error1")
|
||||
le.record(err1)
|
||||
require.True(t, le.shouldRetry())
|
||||
le.record(nil)
|
||||
require.True(t, le.shouldRetry())
|
||||
}
|
||||
|
||||
// TestLastErrorOneError validates that we retry an error if happening within the maxTimeInError, but not after
|
||||
func TestLastErrorOneError(t *testing.T) {
|
||||
le := newLastError("test", maxTimeInError)
|
||||
err1 := fmt.Errorf("error1")
|
||||
le.record(err1)
|
||||
require.True(t, le.shouldRetry())
|
||||
time.Sleep(shortWait)
|
||||
require.True(t, le.shouldRetry())
|
||||
time.Sleep(shortWait)
|
||||
require.True(t, le.shouldRetry())
|
||||
time.Sleep(longWait)
|
||||
require.False(t, le.shouldRetry())
|
||||
}
|
||||
|
||||
// TestLastErrorRepeatedError confirms that if same error is repeated we don't retry
|
||||
// unless it happens after maxTimeInError
|
||||
func TestLastErrorRepeatedError(t *testing.T) {
|
||||
le := newLastError("test", maxTimeInError)
|
||||
err1 := fmt.Errorf("error1")
|
||||
le.record(err1)
|
||||
require.True(t, le.shouldRetry())
|
||||
for i := 1; i < 10; i++ {
|
||||
le.record(err1)
|
||||
time.Sleep(shortWait)
|
||||
}
|
||||
require.True(t, le.shouldRetry())
|
||||
|
||||
// same error happens after maxTimeInError, so it should retry
|
||||
time.Sleep(longWait)
|
||||
require.False(t, le.shouldRetry())
|
||||
le.record(err1)
|
||||
require.True(t, le.shouldRetry())
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче