Merge pull request #962 from michael-berlin/fix_racy_test

Fix data race in vtgate unit test.
This commit is contained in:
Michael Berlin 2015-08-05 12:04:11 -07:00
Родитель 953859e44c f2c527a2a6
Коммит 883b9313bc
12 изменённых файлов: 240 добавлений и 197 удалений

Просмотреть файл

@ -65,8 +65,8 @@ func NewResourcePool(factory Factory, capacity, maxCap int, idleTimeout time.Dur
rp := &ResourcePool{
resources: make(chan resourceWrapper, maxCap),
factory: factory,
capacity: sync2.AtomicInt64(capacity),
idleTimeout: sync2.AtomicDuration(idleTimeout),
capacity: sync2.NewAtomicInt64(int64(capacity)),
idleTimeout: sync2.NewAtomicDuration(idleTimeout),
}
for i := 0; i < capacity; i++ {
rp.resources <- resourceWrapper{}

Просмотреть файл

@ -10,76 +10,94 @@ import (
"time"
)
type AtomicInt32 int32
// AtomicInt32 is a wrapper with a simpler interface around atomic.(Add|Store|Load|CompareAndSwap)Int32 functions.
type AtomicInt32 struct {
int32
}
// NewAtomicInt32 initializes a new AtomicInt32 with a given value.
func NewAtomicInt32(n int32) AtomicInt32 {
return AtomicInt32{n}
}
// Add atomically adds n to the value.
func (i *AtomicInt32) Add(n int32) int32 {
return atomic.AddInt32((*int32)(i), n)
return atomic.AddInt32(&i.int32, n)
}
// Set atomically sets n as new value.
func (i *AtomicInt32) Set(n int32) {
atomic.StoreInt32((*int32)(i), n)
atomic.StoreInt32(&i.int32, n)
}
// Get atomically returns the current value.
func (i *AtomicInt32) Get() int32 {
return atomic.LoadInt32((*int32)(i))
return atomic.LoadInt32(&i.int32)
}
// CompareAndSwap atomatically swaps the old with the new value.
func (i *AtomicInt32) CompareAndSwap(oldval, newval int32) (swapped bool) {
return atomic.CompareAndSwapInt32((*int32)(i), oldval, newval)
return atomic.CompareAndSwapInt32(&i.int32, oldval, newval)
}
type AtomicUint32 uint32
func (i *AtomicUint32) Add(n uint32) uint32 {
return atomic.AddUint32((*uint32)(i), n)
// AtomicInt64 is a wrapper with a simpler interface around atomic.(Add|Store|Load|CompareAndSwap)Int64 functions.
type AtomicInt64 struct {
int64
}
func (i *AtomicUint32) Set(n uint32) {
atomic.StoreUint32((*uint32)(i), n)
// NewAtomicInt64 initializes a new AtomicInt64 with a given value.
func NewAtomicInt64(n int64) AtomicInt64 {
return AtomicInt64{n}
}
func (i *AtomicUint32) Get() uint32 {
return atomic.LoadUint32((*uint32)(i))
}
func (i *AtomicUint32) CompareAndSwap(oldval, newval uint32) (swapped bool) {
return atomic.CompareAndSwapUint32((*uint32)(i), oldval, newval)
}
type AtomicInt64 int64
// Add atomically adds n to the value.
func (i *AtomicInt64) Add(n int64) int64 {
return atomic.AddInt64((*int64)(i), n)
return atomic.AddInt64(&i.int64, n)
}
// Set atomically sets n as new value.
func (i *AtomicInt64) Set(n int64) {
atomic.StoreInt64((*int64)(i), n)
atomic.StoreInt64(&i.int64, n)
}
// Get atomically returns the current value.
func (i *AtomicInt64) Get() int64 {
return atomic.LoadInt64((*int64)(i))
return atomic.LoadInt64(&i.int64)
}
// CompareAndSwap atomatically swaps the old with the new value.
func (i *AtomicInt64) CompareAndSwap(oldval, newval int64) (swapped bool) {
return atomic.CompareAndSwapInt64((*int64)(i), oldval, newval)
return atomic.CompareAndSwapInt64(&i.int64, oldval, newval)
}
type AtomicDuration int64
// AtomicDuration is a wrapper with a simpler interface around atomic.(Add|Store|Load|CompareAndSwap)Int64 functions.
type AtomicDuration struct {
int64
}
// NewAtomicDuration initializes a new AtomicDuration with a given value.
func NewAtomicDuration(duration time.Duration) AtomicDuration {
return AtomicDuration{int64(duration)}
}
// Add atomically adds duration to the value.
func (d *AtomicDuration) Add(duration time.Duration) time.Duration {
return time.Duration(atomic.AddInt64((*int64)(d), int64(duration)))
return time.Duration(atomic.AddInt64(&d.int64, int64(duration)))
}
// Set atomically sets duration as new value.
func (d *AtomicDuration) Set(duration time.Duration) {
atomic.StoreInt64((*int64)(d), int64(duration))
atomic.StoreInt64(&d.int64, int64(duration))
}
// Get atomically returns the current value.
func (d *AtomicDuration) Get() time.Duration {
return time.Duration(atomic.LoadInt64((*int64)(d)))
return time.Duration(atomic.LoadInt64(&d.int64))
}
// CompareAndSwap atomatically swaps the old with the new value.
func (d *AtomicDuration) CompareAndSwap(oldval, newval time.Duration) (swapped bool) {
return atomic.CompareAndSwapInt64((*int64)(d), int64(oldval), int64(newval))
return atomic.CompareAndSwapInt64(&d.int64, int64(oldval), int64(newval))
}
// AtomicString gives you atomic-style APIs for string, but
@ -90,12 +108,14 @@ type AtomicString struct {
str string
}
// Set atomically sets str as new value.
func (s *AtomicString) Set(str string) {
s.mu.Lock()
s.str = str
s.mu.Unlock()
}
// Get atomically returns the current value.
func (s *AtomicString) Get() string {
s.mu.Lock()
str := s.str
@ -103,6 +123,7 @@ func (s *AtomicString) Get() string {
return str
}
// CompareAndSwap atomatically swaps the old with the new value.
func (s *AtomicString) CompareAndSwap(oldval, newval string) (swqpped bool) {
s.mu.Lock()
defer s.mu.Unlock()

Просмотреть файл

@ -168,16 +168,16 @@ func NewQueryEngine(config Config) *QueryEngine {
// Vars
qe.queryTimeout.Set(time.Duration(config.QueryTimeout * 1e9))
qe.spotCheckFreq = sync2.AtomicInt64(config.SpotCheckRatio * spotCheckMultiplier)
qe.spotCheckFreq = sync2.NewAtomicInt64(int64(config.SpotCheckRatio * spotCheckMultiplier))
if config.StrictMode {
qe.strictMode.Set(1)
}
qe.strictTableAcl = config.StrictTableAcl
qe.enableTableAclDryRun = config.EnableTableAclDryRun
qe.exemptACL = config.TableAclExemptACL
qe.maxResultSize = sync2.AtomicInt64(config.MaxResultSize)
qe.maxDMLRows = sync2.AtomicInt64(config.MaxDMLRows)
qe.streamBufferSize = sync2.AtomicInt64(config.StreamBufferSize)
qe.maxResultSize = sync2.NewAtomicInt64(int64(config.MaxResultSize))
qe.maxDMLRows = sync2.NewAtomicInt64(int64(config.MaxDMLRows))
qe.streamBufferSize = sync2.NewAtomicInt64(int64(config.StreamBufferSize))
// Loggers
qe.accessCheckerLogger = logutil.NewThrottledLogger("accessChecker", 1*time.Second)

Просмотреть файл

@ -77,9 +77,9 @@ func NewTxPool(
axp := &TxPool{
pool: NewConnPool(name, capacity, idleTimeout, enablePublishStats, qStats),
activePool: pools.NewNumbered(),
lastID: sync2.AtomicInt64(time.Now().UnixNano()),
timeout: sync2.AtomicDuration(timeout),
poolTimeout: sync2.AtomicDuration(poolTimeout),
lastID: sync2.NewAtomicInt64(time.Now().UnixNano()),
timeout: sync2.NewAtomicDuration(timeout),
poolTimeout: sync2.NewAtomicDuration(poolTimeout),
ticks: timer.NewTimer(timeout / 10),
txStats: stats.NewTimings(txStatsName),
queryServiceStats: qStats,

Просмотреть файл

@ -26,7 +26,7 @@ func testHandler(req *http.Request, t *testing.T) {
StartTime: time.Now(),
Queries: []string{"select * from test"},
Conclusion: "unknown",
LogToFile: sync2.AtomicInt32(0),
LogToFile: sync2.AtomicInt32{},
}
txConn.EndTime = txConn.StartTime
response = httptest.NewRecorder()

Просмотреть файл

@ -221,11 +221,11 @@ func testResolverGeneric(t *testing.T, name string, action func() (*mproto.Query
if err != nil {
t.Errorf("want nil, got %v", err)
}
if sbc0.ExecCount != 1 {
t.Errorf("want 1, got %v", sbc0.ExecCount)
if execCount := sbc0.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v", execCount)
}
if sbc1.ExecCount != 1 {
t.Errorf("want 1, got %v", sbc1.ExecCount)
if execCount := sbc1.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v", execCount)
}
// non-retryable failure
@ -249,11 +249,11 @@ func testResolverGeneric(t *testing.T, name string, action func() (*mproto.Query
}
}
// Ensure that we tried only once
if sbc0.ExecCount != 1 {
t.Errorf("want 1, got %v", sbc0.ExecCount)
if execCount := sbc0.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v", execCount)
}
if sbc1.ExecCount != 1 {
t.Errorf("want 1, got %v", sbc1.ExecCount)
if execCount := sbc1.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v", execCount)
}
// Ensure that we tried topo only once when mapping KeyspaceId/KeyRange to shards
if s.SrvKeyspaceCounter != 1 {
@ -281,11 +281,11 @@ func testResolverGeneric(t *testing.T, name string, action func() (*mproto.Query
}
}
// Ensure that we tried only once.
if sbc0.ExecCount != 1 {
t.Errorf("want 1, got %v", sbc0.ExecCount)
if execCount := sbc0.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v", execCount)
}
if sbc1.ExecCount != 1 {
t.Errorf("want 1, got %v", sbc1.ExecCount)
if execCount := sbc1.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v", execCount)
}
// Ensure that we tried topo only twice.
if s.SrvKeyspaceCounter != 2 {
@ -308,15 +308,15 @@ func testResolverGeneric(t *testing.T, name string, action func() (*mproto.Query
t.Errorf("want nil, got %v", err)
}
// Ensure original keyspace is not used.
if sbc0.ExecCount != 0 {
t.Errorf("want 0, got %v", sbc0.ExecCount)
if execCount := sbc0.ExecCount.Get(); execCount != 0 {
t.Errorf("want 0, got %v", execCount)
}
if sbc1.ExecCount != 0 {
t.Errorf("want 0, got %v", sbc1.ExecCount)
if execCount := sbc1.ExecCount.Get(); execCount != 0 {
t.Errorf("want 0, got %v", execCount)
}
// Ensure redirected keyspace is accessed once.
if sbc2.ExecCount != 1 {
t.Errorf("want 1, got %v", sbc2.ExecCount)
if execCount := sbc2.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v", execCount)
}
// Ensure that we tried each keyspace only once.
if s.SrvKeyspaceCounter != 1 {
@ -345,11 +345,11 @@ func testResolverGeneric(t *testing.T, name string, action func() (*mproto.Query
t.Errorf("want nil, got %v", err)
}
// Ensure that we tried only twice.
if sbc0.ExecCount != 2 {
t.Errorf("want 2, got %v", sbc0.ExecCount)
if execCount := sbc0.ExecCount.Get(); execCount != 2 {
t.Errorf("want 2, got %v", execCount)
}
if sbc1.ExecCount != 2 {
t.Errorf("want 2, got %v", sbc1.ExecCount)
if execCount := sbc1.ExecCount.Get(); execCount != 2 {
t.Errorf("want 2, got %v", execCount)
}
// Ensure that we tried topo only 3 times.
if s.SrvKeyspaceCounter != 3 {
@ -376,11 +376,11 @@ func testResolverGeneric(t *testing.T, name string, action func() (*mproto.Query
t.Errorf("want nil, got %v", err)
}
// Ensure that we tried only twice.
if sbc0.ExecCount != 2 {
t.Errorf("want 2, got %v", sbc0.ExecCount)
if execCount := sbc0.ExecCount.Get(); execCount != 2 {
t.Errorf("want 2, got %v", execCount)
}
if sbc1.ExecCount != 2 {
t.Errorf("want 2, got %v", sbc1.ExecCount)
if execCount := sbc1.ExecCount.Get(); execCount != 2 {
t.Errorf("want 2, got %v", execCount)
}
// Ensure that we tried topo only twice.
if s.SrvKeyspaceCounter != 2 {
@ -399,8 +399,8 @@ func testResolverStreamGeneric(t *testing.T, name string, action func() (*mproto
if err != nil {
t.Errorf("want nil, got %v", err)
}
if sbc0.ExecCount != 1 {
t.Errorf("want 1, got %v", sbc0.ExecCount)
if execCount := sbc0.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v", execCount)
}
// failure
@ -415,8 +415,8 @@ func testResolverStreamGeneric(t *testing.T, name string, action func() (*mproto
t.Errorf("want\n%s\ngot\n%v", want, err)
}
// Ensure that we tried only once.
if sbc0.ExecCount != 1 {
t.Errorf("want 1, got %v", sbc0.ExecCount)
if execCount := sbc0.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v", execCount)
}
// Ensure that we tried topo only once
if s.SrvKeyspaceCounter != 1 {
@ -539,8 +539,8 @@ func TestResolverExecBatchAsTransaction(t *testing.T) {
if callcount != 2 {
t.Errorf("want 2, got %v", callcount)
}
if sbc.AsTransactionCount != 0 {
t.Errorf("want 0, got %v", sbc.AsTransactionCount)
if count := sbc.AsTransactionCount.Get(); count != 0 {
t.Errorf("want 0, got %v", count)
}
callcount = 0
@ -552,8 +552,8 @@ func TestResolverExecBatchAsTransaction(t *testing.T) {
if callcount != 1 {
t.Errorf("want 1, got %v", callcount)
}
if sbc.AsTransactionCount != 1 {
t.Errorf("want 1, got %v", sbc.AsTransactionCount)
if count := sbc.AsTransactionCount.Get(); count != 1 {
t.Errorf("want 1, got %v", count)
}
}

Просмотреть файл

@ -164,8 +164,8 @@ func TestSelectEqual(t *testing.T) {
if !reflect.DeepEqual(sbc2.Queries, wantQueries) {
t.Errorf("sbc2.Queries: %+v, want %+v\n", sbc2.Queries, wantQueries)
}
if sbc1.ExecCount != 1 {
t.Errorf("sbc1.ExecCount: %v, want 1\n", sbc1.ExecCount)
if execCount := sbc1.ExecCount.Get(); execCount != 1 {
t.Errorf("sbc1.ExecCount: %v, want 1\n", execCount)
}
if sbc1.Queries != nil {
t.Errorf("sbc1.Queries: %+v, want nil\n", sbc1.Queries)

Просмотреть файл

@ -105,8 +105,8 @@ func testScatterConnGeneric(t *testing.T, name string, f func(shards []string) (
t.Errorf("want %s, got %v", want, err)
}
// Ensure that we tried only once.
if sbc.ExecCount != 1 {
t.Errorf("want 1, got %v", sbc.ExecCount)
if execCount := sbc.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v", execCount)
}
// two shards
@ -123,11 +123,11 @@ func testScatterConnGeneric(t *testing.T, name string, f func(shards []string) (
t.Errorf("\nwant\n%s\ngot\n%v", want1, err)
}
// Ensure that we tried only once.
if sbc0.ExecCount != 1 {
t.Errorf("want 1, got %v", sbc0.ExecCount)
if execCount := sbc0.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v", execCount)
}
if sbc1.ExecCount != 1 {
t.Errorf("want 1, got %v", sbc1.ExecCount)
if execCount := sbc1.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v", execCount)
}
// duplicate shards
@ -136,8 +136,8 @@ func testScatterConnGeneric(t *testing.T, name string, f func(shards []string) (
s.MapTestConn("0", sbc)
qr, err = f([]string{"0", "0"})
// Ensure that we executed only once.
if sbc.ExecCount != 1 {
t.Errorf("want 1, got %v", sbc.ExecCount)
if execCount := sbc.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v", execCount)
}
// no errors
@ -150,11 +150,11 @@ func testScatterConnGeneric(t *testing.T, name string, f func(shards []string) (
if err != nil {
t.Errorf("want nil, got %v", err)
}
if sbc0.ExecCount != 1 {
t.Errorf("want 1, got %v", sbc0.ExecCount)
if execCount := sbc0.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v", execCount)
}
if sbc1.ExecCount != 1 {
t.Errorf("want 1, got %v", sbc1.ExecCount)
if execCount := sbc1.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v", execCount)
}
if qr.RowsAffected != 2 {
t.Errorf("want 2, got %v", qr.RowsAffected)
@ -287,11 +287,11 @@ func TestScatterConnCommitSuccess(t *testing.T) {
if !reflect.DeepEqual(wantSession, *session.Session) {
t.Errorf("want\n%+v, got\n%+v", wantSession, *session.Session)
}
if sbc0.CommitCount != 1 {
t.Errorf("want 1, got %d", sbc0.CommitCount)
if commitCount := sbc0.CommitCount.Get(); commitCount != 1 {
t.Errorf("want 1, got %d", commitCount)
}
if sbc1.RollbackCount != 1 {
t.Errorf("want 1, got %d", sbc1.RollbackCount)
if rollbackCount := sbc1.RollbackCount.Get(); rollbackCount != 1 {
t.Errorf("want 1, got %d", rollbackCount)
}
}
@ -315,11 +315,11 @@ func TestScatterConnRollback(t *testing.T) {
if !reflect.DeepEqual(wantSession, *session.Session) {
t.Errorf("want\n%#v, got\n%#v", wantSession, *session.Session)
}
if sbc0.RollbackCount != 1 {
t.Errorf("want 1, got %d", sbc0.RollbackCount)
if rollbackCount := sbc0.RollbackCount.Get(); rollbackCount != 1 {
t.Errorf("want 1, got %d", rollbackCount)
}
if sbc1.RollbackCount != 1 {
t.Errorf("want 1, got %d", sbc1.RollbackCount)
if rollbackCount := sbc1.RollbackCount.Get(); rollbackCount != 1 {
t.Errorf("want 1, got %d", rollbackCount)
}
}
@ -331,8 +331,8 @@ func TestScatterConnClose(t *testing.T) {
stc.Execute(context.Background(), "query1", nil, "TestScatterConnClose", []string{"0"}, "", nil, false)
stc.Close()
time.Sleep(1)
if sbc.CloseCount.Get() != 1 {
t.Errorf("want 1, got %d", sbc.CloseCount)
if closeCount := sbc.CloseCount.Get(); closeCount != 1 {
t.Errorf("want 1, got %d (test may be flaky because connections are closed asynchronously)", closeCount)
}
}
@ -380,14 +380,18 @@ func TestScatterConnQueryNotInTransaction(t *testing.T) {
t.Errorf("want\n%+v\ngot\n%+v", wantSession, *session.Session)
}
stc.Commit(context.Background(), session)
if sbc0.ExecCount != 1 || sbc1.ExecCount != 3 {
t.Errorf("want 1/3, got %d/%d", sbc0.ExecCount, sbc1.ExecCount)
{
execCount0 := sbc0.ExecCount.Get()
execCount1 := sbc1.ExecCount.Get()
if execCount0 != 1 || execCount1 != 3 {
t.Errorf("want 1/3, got %d/%d", execCount0, execCount1)
}
}
if sbc0.CommitCount != 0 {
t.Errorf("want 0, got %d", sbc0.CommitCount)
if commitCount := sbc0.CommitCount.Get(); commitCount != 0 {
t.Errorf("want 0, got %d", commitCount)
}
if sbc1.CommitCount != 1 {
t.Errorf("want 1, got %d", sbc1.CommitCount)
if commitCount := sbc1.CommitCount.Get(); commitCount != 1 {
t.Errorf("want 1, got %d", commitCount)
}
// case 2: write query followed by read query (not in transaction), not in the same shard.
@ -414,14 +418,18 @@ func TestScatterConnQueryNotInTransaction(t *testing.T) {
t.Errorf("want\n%+v\ngot\n%+v", wantSession, *session.Session)
}
stc.Commit(context.Background(), session)
if sbc0.ExecCount != 3 || sbc1.ExecCount != 1 {
t.Errorf("want 3/1, got %d/%d", sbc0.ExecCount, sbc1.ExecCount)
{
execCount0 := sbc0.ExecCount.Get()
execCount1 := sbc1.ExecCount.Get()
if execCount0 != 3 || execCount1 != 1 {
t.Errorf("want 3/1, got %d/%d", execCount0, execCount1)
}
}
if sbc0.CommitCount != 1 {
t.Errorf("want 1, got %d", sbc0.CommitCount)
if commitCount := sbc0.CommitCount.Get(); commitCount != 1 {
t.Errorf("want 1, got %d", commitCount)
}
if sbc1.CommitCount != 0 {
t.Errorf("want 0, got %d", sbc1.CommitCount)
if commitCount := sbc1.CommitCount.Get(); commitCount != 0 {
t.Errorf("want 0, got %d", commitCount)
}
// case 3: write query followed by read query, in the same shard.
@ -448,14 +456,18 @@ func TestScatterConnQueryNotInTransaction(t *testing.T) {
t.Errorf("want\n%+v\ngot\n%+v", wantSession, *session.Session)
}
stc.Commit(context.Background(), session)
if sbc0.ExecCount != 4 || sbc1.ExecCount != 1 {
t.Errorf("want 4/1, got %d/%d", sbc0.ExecCount, sbc1.ExecCount)
{
execCount0 := sbc0.ExecCount.Get()
execCount1 := sbc1.ExecCount.Get()
if execCount0 != 4 || execCount1 != 1 {
t.Errorf("want 4/1, got %d/%d", execCount0, execCount1)
}
}
if sbc0.CommitCount != 1 {
t.Errorf("want 1, got %d", sbc0.CommitCount)
if commitCount := sbc0.CommitCount.Get(); commitCount != 1 {
t.Errorf("want 1, got %d", commitCount)
}
if sbc1.CommitCount != 0 {
t.Errorf("want 0, got %d", sbc1.CommitCount)
if commitCount := sbc1.CommitCount.Get(); commitCount != 0 {
t.Errorf("want 0, got %d", commitCount)
}
}

Просмотреть файл

@ -133,8 +133,8 @@ func testShardConnGeneric(t *testing.T, name string, f func() error) {
t.Errorf("want 2, got %v", s.DialCounter)
}
// Ensure we executed 2 times before failing.
if sbc.ExecCount != 2 {
t.Errorf("want 2, got %v", sbc.ExecCount)
if execCount := sbc.ExecCount.Get(); execCount != 2 {
t.Errorf("want 2, got %v", execCount)
}
// retry error (one failure)
@ -150,8 +150,8 @@ func testShardConnGeneric(t *testing.T, name string, f func() error) {
t.Errorf("want 2, got %v", s.DialCounter)
}
// Ensure we executed twice (second one succeeded)
if sbc.ExecCount != 2 {
t.Errorf("want 2, got %v", sbc.ExecCount)
if execCount := sbc.ExecCount.Get(); execCount != 2 {
t.Errorf("want 2, got %v", execCount)
}
// fatal error (one failure)
@ -167,8 +167,8 @@ func testShardConnGeneric(t *testing.T, name string, f func() error) {
t.Errorf("want 2, got %v", s.DialCounter)
}
// Ensure we executed twice (second one succeeded)
if sbc.ExecCount != 2 {
t.Errorf("want 2, got %v", sbc.ExecCount)
if execCount := sbc.ExecCount.Get(); execCount != 2 {
t.Errorf("want 2, got %v", execCount)
}
// server error
@ -185,8 +185,8 @@ func testShardConnGeneric(t *testing.T, name string, f func() error) {
t.Errorf("want 1, got %v", s.DialCounter)
}
// Ensure we did not re-execute.
if sbc.ExecCount != 1 {
t.Errorf("want 1, got %v", sbc.ExecCount)
if execCount := sbc.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v", execCount)
}
// conn error (one failure)
@ -204,8 +204,8 @@ func testShardConnGeneric(t *testing.T, name string, f func() error) {
t.Errorf("want 1, got %v", s.DialCounter)
}
// Ensure we did not re-execute.
if sbc.ExecCount != 1 {
t.Errorf("want 1, got %v", sbc.ExecCount)
if execCount := sbc.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v", execCount)
}
// no failures
@ -219,8 +219,8 @@ func testShardConnGeneric(t *testing.T, name string, f func() error) {
if s.DialCounter != 1 {
t.Errorf("want 1, got %v", s.DialCounter)
}
if sbc.ExecCount != 1 {
t.Errorf("want 1, got %v", sbc.ExecCount)
if execCount := sbc.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v", execCount)
}
}
@ -235,8 +235,8 @@ func testShardConnTransact(t *testing.T, name string, f func() error) {
t.Errorf("want %s, got %v", want, err)
}
// Should not retry if we're in transaction
if sbc.ExecCount != 1 {
t.Errorf("want 1, got %v", sbc.ExecCount)
if execCount := sbc.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v", execCount)
}
// conn error
@ -249,8 +249,8 @@ func testShardConnTransact(t *testing.T, name string, f func() error) {
t.Errorf("want %s, got %v", want, err)
}
// Should not retry if we're in transaction
if sbc.ExecCount != 1 {
t.Errorf("want 1, got %v", sbc.ExecCount)
if execCount := sbc.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v", execCount)
}
}
@ -270,8 +270,8 @@ func TestShardConnBeginOther(t *testing.T) {
t.Errorf("want 1, got %v", s.DialCounter)
}
// Account for 1 call to Begin.
if sbc.ExecCount != 1 {
t.Errorf("want 1, got %v", sbc.ExecCount)
if execCount := sbc.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v", execCount)
}
}
@ -289,8 +289,8 @@ func TestShardConnStreamingRetry(t *testing.T) {
if s.DialCounter != 2 {
t.Errorf("want 2, got %v", s.DialCounter)
}
if sbc.ExecCount != 2 {
t.Errorf("want 2, got %v", sbc.ExecCount)
if execCount := sbc.ExecCount.Get(); execCount != 2 {
t.Errorf("want 2, got %v", execCount)
}
// ERR_FATAL
@ -307,8 +307,8 @@ func TestShardConnStreamingRetry(t *testing.T) {
if s.DialCounter != 1 {
t.Errorf("want 1, got %v", s.DialCounter)
}
if sbc.ExecCount != 1 {
t.Errorf("want 1, got %v", sbc.ExecCount)
if execCount := sbc.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v", execCount)
}
}
@ -486,8 +486,8 @@ func TestShardConnReconnect(t *testing.T) {
if timeDuration >= retryDelay {
t.Errorf("want no delay, got %v", timeDuration)
}
if sbc0.ExecCount+sbc1.ExecCount+sbc2.ExecCount != 1 {
t.Errorf("want 1, got %v", sbc0.ExecCount+sbc1.ExecCount+sbc2.ExecCount)
if execCount := sbc0.ExecCount.Get() + sbc1.ExecCount.Get() + sbc2.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v", execCount)
}
if s.EndPointCounter != 1 {
t.Errorf("want 1, got %v", s.EndPointCounter)
@ -515,11 +515,16 @@ func TestShardConnReconnect(t *testing.T) {
if timeDuration >= retryDelay {
t.Errorf("want no delay, got %v", timeDuration)
}
if sbc0.ExecCount+sbc1.ExecCount+sbc2.ExecCount != 2 {
t.Errorf("want 2, got %v", sbc0.ExecCount+sbc1.ExecCount+sbc2.ExecCount)
if execCount := sbc0.ExecCount.Get() + sbc1.ExecCount.Get() + sbc2.ExecCount.Get(); execCount != 2 {
t.Errorf("want 2, got %v", execCount)
}
if sbc0.ExecCount > 1 || sbc1.ExecCount > 1 || sbc2.ExecCount > 1 {
t.Errorf("want no more than 1, got %v,%v,%v", sbc0.ExecCount, sbc1.ExecCount, sbc2.ExecCount)
{
execCount0 := sbc0.ExecCount.Get()
execCount1 := sbc1.ExecCount.Get()
execCount2 := sbc2.ExecCount.Get()
if execCount0 > 1 || execCount1 > 1 || execCount2 > 1 {
t.Errorf("want no more than 1, got %v,%v,%v", execCount0, execCount1, execCount2)
}
}
if s.EndPointCounter != 2 {
t.Errorf("want 2, got %v", s.EndPointCounter)
@ -548,11 +553,16 @@ func TestShardConnReconnect(t *testing.T) {
if timeDuration >= retryDelay {
t.Errorf("want no delay, got %v", timeDuration)
}
if sbc0.ExecCount+sbc1.ExecCount+sbc2.ExecCount != 2 {
t.Errorf("want 2, got %v", sbc0.ExecCount+sbc1.ExecCount+sbc2.ExecCount)
}
if sbc0.ExecCount > 1 || sbc1.ExecCount > 1 || sbc2.ExecCount > 1 {
t.Errorf("want no more than 1, got %v,%v,%v", sbc0.ExecCount, sbc1.ExecCount, sbc2.ExecCount)
{
execCount0 := sbc0.ExecCount.Get()
execCount1 := sbc1.ExecCount.Get()
execCount2 := sbc2.ExecCount.Get()
if sum := execCount0 + execCount1 + execCount2; sum != 2 {
t.Errorf("want 2, got %v", sum)
}
if execCount0 > 1 || execCount1 > 1 || execCount2 > 1 {
t.Errorf("want no more than 1, got %v,%v,%v", execCount0, execCount1, execCount2)
}
}
if s.EndPointCounter != 2 {
t.Errorf("want 2, got %v", s.EndPointCounter)
@ -585,12 +595,12 @@ func TestShardConnReconnect(t *testing.T) {
t.Errorf("want instant resolve %v, got %v", retryDelay, timeDuration)
}
for _, conn := range []*sandboxConn{sbc0, sbc1, sbc2} {
wantExecCount := 1
var wantExecCount int64 = 1
if conn == firstConn {
wantExecCount = 2
}
if int(conn.ExecCount) != wantExecCount {
t.Errorf("want %v, got %v", wantExecCount, conn.ExecCount)
if execCount := conn.ExecCount.Get(); execCount != wantExecCount {
t.Errorf("want %v, got %v", wantExecCount, execCount)
}
}
if s.EndPointCounter != 5 {
@ -628,12 +638,12 @@ func TestShardConnReconnect(t *testing.T) {
if timeDuration >= retryDelay {
t.Errorf("want no delay, got %v", timeDuration)
}
if firstConn.ExecCount != 1 {
t.Errorf("want 1, got %v", firstConn.ExecCount)
if execCount := firstConn.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v", execCount)
}
totalExecCount := 0
var totalExecCount int64
for _, conn := range s.TestConns["0"] {
totalExecCount += int(conn.(*sandboxConn).ExecCount)
totalExecCount += conn.(*sandboxConn).ExecCount.Get()
}
if totalExecCount != 1 {
t.Errorf("want 1, got %v", totalExecCount)
@ -681,15 +691,15 @@ func TestShardConnReconnect(t *testing.T) {
if timeDuration > retryDelay*2 {
t.Errorf("want instant resolve %v, got %v", retryDelay, timeDuration)
}
if secondConn.ExecCount != 1 {
t.Errorf("want 1, got %v", secondConn.ExecCount)
if execCount := secondConn.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v", execCount)
}
if firstConn.ExecCount != 2 {
t.Errorf("want 2, got %v", firstConn.ExecCount)
if execCount := firstConn.ExecCount.Get(); execCount != 2 {
t.Errorf("want 2, got %v", execCount)
}
for _, conn := range s.TestConns["0"] {
if conn != firstConn && conn.(*sandboxConn).ExecCount != 1 {
t.Errorf("want 1, got %v", conn.(*sandboxConn).ExecCount)
if execCount := conn.(*sandboxConn).ExecCount.Get(); conn != firstConn && execCount != 1 {
t.Errorf("want 1, got %v", execCount)
}
}
if s.EndPointCounter != 6 {
@ -734,16 +744,16 @@ func TestShardConnReconnect(t *testing.T) {
if timeDuration >= retryDelay {
t.Errorf("want no delay, got %v", timeDuration)
}
if firstConn.ExecCount != 1 {
t.Errorf("want 1, got %v", firstConn.ExecCount)
if execCount := firstConn.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v", execCount)
}
for _, conn := range []*sandboxConn{sbc0, sbc1, sbc2} {
if conn != firstConn && conn.ExecCount != 0 {
t.Errorf("want 0, got %v", conn.ExecCount)
if execCount := conn.ExecCount.Get(); conn != firstConn && execCount != 0 {
t.Errorf("want 0, got %v", execCount)
}
}
if sbc3.ExecCount+sbc4.ExecCount+sbc5.ExecCount != 1 {
t.Errorf("want 1, got %v", sbc3.ExecCount+sbc4.ExecCount+sbc5.ExecCount)
if sum := sbc3.ExecCount.Get() + sbc4.ExecCount.Get() + sbc5.ExecCount.Get(); sum != 1 {
t.Errorf("want 1, got %v", sum)
}
if s.EndPointCounter != 2 {
t.Errorf("want 2, got %v", s.EndPointCounter)

Просмотреть файл

@ -75,8 +75,8 @@ func TestInTransactionKeyspaceAlias(t *testing.T) {
}
// Ensure that we tried once, no retry here
// since we are in a transaction.
if sbc.ExecCount != 1 {
t.Errorf("want 1, got %v", sbc.ExecCount)
if execCount := sbc.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v", execCount)
}
}
@ -90,8 +90,8 @@ func testVerticalSplitGeneric(t *testing.T, isStreaming bool, f func(shards []st
t.Errorf("want nil, got %v", err)
}
// Ensure that we tried 2 times, 1 for retry and 1 for redirect.
if sbc.ExecCount != 2 {
t.Errorf("want 2, got %v", sbc.ExecCount)
if execCount := sbc.ExecCount.Get(); execCount != 2 {
t.Errorf("want 2, got %v", execCount)
}
// Fatal Error, for keyspace that is redirected should succeed.
@ -105,16 +105,16 @@ func testVerticalSplitGeneric(t *testing.T, isStreaming bool, f func(shards []st
t.Errorf("want '%v', got '%v'", want, err)
}
// Ensure that we tried only once.
if sbc.ExecCount != 1 {
t.Errorf("want 1, got %v", sbc.ExecCount)
if execCount := sbc.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v", execCount)
}
} else {
if err != nil {
t.Errorf("want nil, got %v", err)
}
// Ensure that we tried 2 times, 1 for retry and 1 for redirect.
if sbc.ExecCount != 2 {
t.Errorf("want 2, got %v", sbc.ExecCount)
if execCount := sbc.ExecCount.Get(); execCount != 2 {
t.Errorf("want 2, got %v", execCount)
}
}
@ -128,7 +128,7 @@ func testVerticalSplitGeneric(t *testing.T, isStreaming bool, f func(shards []st
t.Errorf("want '%v', got '%v'", want, err)
}
// Ensure that we tried once, no retry here.
if sbc.ExecCount != 1 {
t.Errorf("want 1, got %v", sbc.ExecCount)
if execCount := sbc.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v", execCount)
}
}

Просмотреть файл

@ -102,7 +102,7 @@ func Init(serv SrvTopoServer, schema *planbuilder.Schema, cell string, retryDela
rowsReturned: stats.NewMultiCounters("VtgateApiRowsReturned", []string{"Operation", "Keyspace", "DbType"}),
maxInFlight: int64(maxInFlight),
inFlight: 0,
inFlight: sync2.NewAtomicInt64(0),
logExecute: logutil.NewThrottledLogger("Execute", 5*time.Second),
logExecuteShard: logutil.NewThrottledLogger("ExecuteShard", 5*time.Second),

Просмотреть файл

@ -79,8 +79,8 @@ func TestVTGateExecute(t *testing.T) {
}
rpcVTGate.Commit(context.Background(), q.Session)
if sbc.CommitCount != 1 {
t.Errorf("want 1, got %d", sbc.CommitCount)
if commitCount := sbc.CommitCount.Get(); commitCount != 1 {
t.Errorf("want 1, got %d", commitCount)
}
q.Session = new(proto.Session)
@ -131,8 +131,8 @@ func TestVTGateExecuteShard(t *testing.T) {
}
rpcVTGate.Commit(context.Background(), q.Session)
if sbc.CommitCount != 1 {
t.Errorf("want 1, got %d", sbc.CommitCount)
if commitCount := sbc.CommitCount.Get(); commitCount != 1 {
t.Errorf("want 1, got %d", commitCount)
}
q.Session = new(proto.Session)
@ -178,8 +178,8 @@ func TestVTGateExecuteKeyspaceIds(t *testing.T) {
if qr.Session != nil {
t.Errorf("want nil, got %+v\n", qr.Session)
}
if sbc1.ExecCount != 1 {
t.Errorf("want 1, got %v\n", sbc1.ExecCount)
if execCount := sbc1.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v\n", execCount)
}
// Test for successful execution in transaction
q.Session = new(proto.Session)
@ -201,8 +201,8 @@ func TestVTGateExecuteKeyspaceIds(t *testing.T) {
t.Errorf("want \n%+v, got \n%+v", wantSession, q.Session)
}
rpcVTGate.Commit(context.Background(), q.Session)
if sbc1.CommitCount.Get() != 1 {
t.Errorf("want 1, got %d", sbc1.CommitCount.Get())
if commitCount := sbc1.CommitCount.Get(); commitCount != 1 {
t.Errorf("want 1, got %d", commitCount)
}
// Test for multiple shards
kid30, err := key.HexKeyspaceId("30").Unhex()
@ -243,8 +243,8 @@ func TestVTGateExecuteKeyRanges(t *testing.T) {
if qr.Session != nil {
t.Errorf("want nil, got %+v\n", qr.Session)
}
if sbc1.ExecCount != 1 {
t.Errorf("want 1, got %v\n", sbc1.ExecCount)
if execCount := sbc1.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v\n", execCount)
}
// Test for successful execution in transaction
q.Session = new(proto.Session)
@ -269,8 +269,8 @@ func TestVTGateExecuteKeyRanges(t *testing.T) {
t.Errorf("want \n%+v, got \n%+v", wantSession, q.Session)
}
rpcVTGate.Commit(context.Background(), q.Session)
if sbc1.CommitCount.Get() != 1 {
t.Errorf("want 1, got %v", sbc1.CommitCount.Get())
if commitCount := sbc1.CommitCount.Get(); commitCount != 1 {
t.Errorf("want 1, got %v", commitCount)
}
// Test for multiple shards
kr, err = key.ParseKeyRangeParts("10", "30")
@ -317,8 +317,8 @@ func TestVTGateExecuteEntityIds(t *testing.T) {
if qr.Session != nil {
t.Errorf("want nil, got %+v\n", qr.Session)
}
if sbc1.ExecCount != 1 {
t.Errorf("want 1, got %v\n", sbc1.ExecCount)
if execCount := sbc1.ExecCount.Get(); execCount != 1 {
t.Errorf("want 1, got %v\n", execCount)
}
// Test for successful execution in transaction
q.Session = new(proto.Session)
@ -340,8 +340,8 @@ func TestVTGateExecuteEntityIds(t *testing.T) {
t.Errorf("want \n%+v, got \n%+v", wantSession, q.Session)
}
rpcVTGate.Commit(context.Background(), q.Session)
if sbc1.CommitCount.Get() != 1 {
t.Errorf("want 1, got %d", sbc1.CommitCount.Get())
if commitCount := sbc1.CommitCount.Get(); commitCount != 1 {
t.Errorf("want 1, got %d", commitCount)
}
// Test for multiple shards
kid30, err := key.HexKeyspaceId("30").Unhex()
@ -526,8 +526,8 @@ func TestVTGateStreamExecuteKeyspaceIds(t *testing.T) {
t.Errorf("want\n%#v\ngot\n%#v", want, qrs)
}
rpcVTGate.Commit(context.Background(), sq.Session)
if sbc.CommitCount.Get() != 1 {
t.Errorf("want 1, got %d", sbc.CommitCount.Get())
if commitCount := sbc.CommitCount.Get(); commitCount != 1 {
t.Errorf("want 1, got %d", commitCount)
}
// Test for successful execution - multiple keyspaceids in single shard
sq.Session = nil