зеркало из https://github.com/github/vitess-gh.git
Move consolidator, and related fix
This commit is contained in:
Родитель
c7dfbf7b37
Коммит
d2300c57f4
|
@ -2,21 +2,20 @@
|
|||
// Use of this source code is governed by a BSD-style
|
||||
// license that can be found in the LICENSE file.
|
||||
|
||||
package tabletserver
|
||||
package sync2
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/youtube/vitess/go/acl"
|
||||
"github.com/youtube/vitess/go/cache"
|
||||
)
|
||||
|
||||
var (
|
||||
waitError = NewTabletError(ErrFail, "Error waiting for consolidation")
|
||||
waitError = fmt.Errorf("Error waiting for consolidation")
|
||||
)
|
||||
|
||||
// Consolidator consolidates duplicate queries from executing simulaneously
|
||||
|
@ -28,19 +27,15 @@ type Consolidator struct {
|
|||
}
|
||||
|
||||
// NewConsolidator creates a new Consolidator
|
||||
func NewConsolidator(httpHandle bool) *Consolidator {
|
||||
co := &Consolidator{queries: make(map[string]*Result), consolidations: cache.NewLRUCache(1000)}
|
||||
if httpHandle {
|
||||
http.Handle("/debug/consolidations", co)
|
||||
}
|
||||
return co
|
||||
func NewConsolidator() *Consolidator {
|
||||
return &Consolidator{queries: make(map[string]*Result), consolidations: cache.NewLRUCache(1000)}
|
||||
}
|
||||
|
||||
// Result is a wrapper for result of a query.
|
||||
type Result struct {
|
||||
executing sync.RWMutex
|
||||
consolidator *Consolidator
|
||||
sql string
|
||||
query string
|
||||
Result interface{}
|
||||
Err error
|
||||
}
|
||||
|
@ -48,17 +43,17 @@ type Result struct {
|
|||
// Create adds a query to currently executing queries and acquires a
|
||||
// lock on its Result if it is not already present. If the query is
|
||||
// a duplicate, Create returns false.
|
||||
func (co *Consolidator) Create(sql string) (r *Result, created bool) {
|
||||
func (co *Consolidator) Create(query string) (r *Result, created bool) {
|
||||
co.mu.Lock()
|
||||
defer co.mu.Unlock()
|
||||
if r, ok := co.queries[sql]; ok {
|
||||
if r, ok := co.queries[query]; ok {
|
||||
return r, false
|
||||
}
|
||||
// Preset the error. If there was an unexpected panic during the main
|
||||
// query, then all those who waited will return the waitError.
|
||||
r = &Result{consolidator: co, sql: sql, Err: waitError}
|
||||
r = &Result{consolidator: co, query: query, Err: waitError}
|
||||
r.executing.Lock()
|
||||
co.queries[sql] = r
|
||||
co.queries[query] = r
|
||||
return r, true
|
||||
}
|
||||
|
||||
|
@ -79,12 +74,12 @@ func (co *Consolidator) ServeHTTP(response http.ResponseWriter, request *http.Re
|
|||
}
|
||||
}
|
||||
|
||||
func (co *Consolidator) record(sql string) {
|
||||
if v, ok := co.consolidations.Get(sql); ok {
|
||||
func (co *Consolidator) record(query string) {
|
||||
if v, ok := co.consolidations.Get(query); ok {
|
||||
v.(*ccount).Add(1)
|
||||
} else {
|
||||
c := ccount(1)
|
||||
co.consolidations.Set(sql, &c)
|
||||
co.consolidations.Set(query, &c)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -94,15 +89,14 @@ func (co *Consolidator) record(sql string) {
|
|||
func (rs *Result) Broadcast() {
|
||||
rs.consolidator.mu.Lock()
|
||||
defer rs.consolidator.mu.Unlock()
|
||||
delete(rs.consolidator.queries, rs.sql)
|
||||
delete(rs.consolidator.queries, rs.query)
|
||||
rs.executing.Unlock()
|
||||
}
|
||||
|
||||
// Wait waits for the original query to complete execution. Wait should
|
||||
// be invoked for duplicate queries.
|
||||
func (rs *Result) Wait() {
|
||||
rs.consolidator.record(rs.sql)
|
||||
defer waitStats.Record("Consolidations", time.Now())
|
||||
rs.consolidator.record(rs.query)
|
||||
rs.executing.RLock()
|
||||
}
|
||||
|
|
@ -1,38 +1,38 @@
|
|||
package tabletserver
|
||||
package sync2
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/youtube/vitess/go/mysql/proto"
|
||||
)
|
||||
import "testing"
|
||||
|
||||
func TestConsolidator(t *testing.T) {
|
||||
qe := NewQueryEngine(qsConfig)
|
||||
con := NewConsolidator()
|
||||
sql := "select * from SomeTable"
|
||||
|
||||
orig, added := qe.consolidator.Create(sql)
|
||||
orig, added := con.Create(sql)
|
||||
if !added {
|
||||
t.Errorf("expected consolidator to register a new entry")
|
||||
}
|
||||
|
||||
dup, added := qe.consolidator.Create(sql)
|
||||
dup, added := con.Create(sql)
|
||||
if added {
|
||||
t.Errorf("did not expect consolidator to register a new entry")
|
||||
}
|
||||
|
||||
result := 1
|
||||
go func() {
|
||||
orig.Result = &proto.QueryResult{InsertId: 145}
|
||||
orig.Result = &result
|
||||
orig.Broadcast()
|
||||
}()
|
||||
dup.Wait()
|
||||
|
||||
if orig.Result.(*proto.QueryResult).InsertId != dup.Result.(*proto.QueryResult).InsertId {
|
||||
if *orig.Result.(*int) != result {
|
||||
t.Errorf("failed to pass result")
|
||||
}
|
||||
if *orig.Result.(*int) != *dup.Result.(*int) {
|
||||
t.Errorf("failed to share the result")
|
||||
}
|
||||
|
||||
// Running the query again should add a new entry since the original
|
||||
// query execution completed
|
||||
_, added = qe.consolidator.Create(sql)
|
||||
_, added = con.Create(sql)
|
||||
if !added {
|
||||
t.Errorf("expected consolidator to register a new entry")
|
||||
}
|
|
@ -5,6 +5,7 @@
|
|||
package tabletserver
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
|
@ -48,7 +49,7 @@ type QueryEngine struct {
|
|||
|
||||
// Services
|
||||
txPool *TxPool
|
||||
consolidator *Consolidator
|
||||
consolidator *sync2.Consolidator
|
||||
invalidator *RowcacheInvalidator
|
||||
streamQList *QueryList
|
||||
tasks sync.WaitGroup
|
||||
|
@ -148,7 +149,8 @@ func NewQueryEngine(config Config) *QueryEngine {
|
|||
time.Duration(config.TxPoolTimeout*1e9),
|
||||
time.Duration(config.IdleTimeout*1e9),
|
||||
)
|
||||
qe.consolidator = NewConsolidator(true)
|
||||
qe.consolidator = sync2.NewConsolidator()
|
||||
http.Handle("/debug/consolidations", qe.consolidator)
|
||||
qe.invalidator = NewRowcacheInvalidator(qe)
|
||||
qe.streamQList = NewQueryList()
|
||||
|
||||
|
|
|
@ -48,6 +48,10 @@ func (rqc *RequestContext) qFetch(logStats *SQLQueryStats, parsedQuery *sqlparse
|
|||
q, ok := rqc.qe.consolidator.Create(string(sql))
|
||||
if ok {
|
||||
defer q.Broadcast()
|
||||
// Wrap default error to TabletError
|
||||
if q.Err != nil {
|
||||
q.Err = NewTabletError(ErrFail, q.Err.Error())
|
||||
}
|
||||
waitingForConnectionStart := time.Now()
|
||||
timeout, err := rqc.deadline.Timeout()
|
||||
if err != nil {
|
||||
|
@ -63,7 +67,9 @@ func (rqc *RequestContext) qFetch(logStats *SQLQueryStats, parsedQuery *sqlparse
|
|||
}
|
||||
} else {
|
||||
logStats.QuerySources |= QUERY_SOURCE_CONSOLIDATOR
|
||||
startTime := time.Now()
|
||||
q.Wait()
|
||||
waitStats.Record("Consolidations", startTime)
|
||||
}
|
||||
if q.Err != nil {
|
||||
panic(q.Err)
|
||||
|
|
|
@ -15,7 +15,6 @@ import (
|
|||
"github.com/youtube/vitess/go/sync2"
|
||||
"github.com/youtube/vitess/go/vt/concurrency"
|
||||
kproto "github.com/youtube/vitess/go/vt/key"
|
||||
"github.com/youtube/vitess/go/vt/tabletserver"
|
||||
tproto "github.com/youtube/vitess/go/vt/tabletserver/proto"
|
||||
"github.com/youtube/vitess/go/vt/tabletserver/tabletconn"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
|
@ -28,13 +27,12 @@ var idGen sync2.AtomicInt64
|
|||
// ScatterConn is used for executing queries across
|
||||
// multiple ShardConn connections.
|
||||
type ScatterConn struct {
|
||||
toposerv SrvTopoServer
|
||||
cell string
|
||||
retryDelay time.Duration
|
||||
retryCount int
|
||||
connTimeout time.Duration
|
||||
timings *stats.MultiTimings
|
||||
consolidator *tabletserver.Consolidator
|
||||
toposerv SrvTopoServer
|
||||
cell string
|
||||
retryDelay time.Duration
|
||||
retryCount int
|
||||
connTimeout time.Duration
|
||||
timings *stats.MultiTimings
|
||||
|
||||
mu sync.Mutex
|
||||
shardConns map[string]*ShardConn
|
||||
|
@ -51,14 +49,13 @@ type shardActionFunc func(conn *ShardConn, transactionId int64, sResults chan<-
|
|||
// for creating the appropriate ShardConn.
|
||||
func NewScatterConn(serv SrvTopoServer, statsName, cell string, retryDelay time.Duration, retryCount int, connTimeout time.Duration) *ScatterConn {
|
||||
return &ScatterConn{
|
||||
toposerv: serv,
|
||||
cell: cell,
|
||||
retryDelay: retryDelay,
|
||||
retryCount: retryCount,
|
||||
connTimeout: connTimeout,
|
||||
timings: stats.NewMultiTimings(statsName, []string{"Operation", "Keyspace", "ShardName", "DbType"}),
|
||||
consolidator: tabletserver.NewConsolidator(false),
|
||||
shardConns: make(map[string]*ShardConn),
|
||||
toposerv: serv,
|
||||
cell: cell,
|
||||
retryDelay: retryDelay,
|
||||
retryCount: retryCount,
|
||||
connTimeout: connTimeout,
|
||||
timings: stats.NewMultiTimings(statsName, []string{"Operation", "Keyspace", "ShardName", "DbType"}),
|
||||
shardConns: make(map[string]*ShardConn),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -556,7 +553,7 @@ func (stc *ScatterConn) getConnection(context context.Context, keyspace, shard s
|
|||
key := fmt.Sprintf("%s.%s.%s", keyspace, shard, tabletType)
|
||||
sdc, ok := stc.shardConns[key]
|
||||
if !ok {
|
||||
sdc = NewShardConn(context, stc.toposerv, stc.cell, keyspace, shard, tabletType, stc.retryDelay, stc.retryCount, stc.connTimeout, stc.consolidator)
|
||||
sdc = NewShardConn(context, stc.toposerv, stc.cell, keyspace, shard, tabletType, stc.retryDelay, stc.retryCount, stc.connTimeout)
|
||||
stc.shardConns[key] = sdc
|
||||
}
|
||||
return sdc
|
||||
|
|
|
@ -10,8 +10,8 @@ import (
|
|||
"time"
|
||||
|
||||
mproto "github.com/youtube/vitess/go/mysql/proto"
|
||||
"github.com/youtube/vitess/go/sync2"
|
||||
"github.com/youtube/vitess/go/vt/concurrency"
|
||||
"github.com/youtube/vitess/go/vt/tabletserver"
|
||||
tproto "github.com/youtube/vitess/go/vt/tabletserver/proto"
|
||||
"github.com/youtube/vitess/go/vt/tabletserver/tabletconn"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
|
@ -23,7 +23,6 @@ import (
|
|||
// be concurrently used across goroutines. Such requests are
|
||||
// interleaved on the same underlying connection.
|
||||
type ShardConn struct {
|
||||
cell string
|
||||
keyspace string
|
||||
shard string
|
||||
tabletType topo.TabletType
|
||||
|
@ -31,7 +30,7 @@ type ShardConn struct {
|
|||
retryCount int
|
||||
connTimeout time.Duration
|
||||
balancer *Balancer
|
||||
consolidator *tabletserver.Consolidator
|
||||
consolidator *sync2.Consolidator
|
||||
|
||||
// conn needs a mutex because it can change during the lifetime of ShardConn.
|
||||
mu sync.Mutex
|
||||
|
@ -41,7 +40,7 @@ type ShardConn struct {
|
|||
// NewShardConn creates a new ShardConn. It creates a Balancer using
|
||||
// serv, cell, keyspace, tabletType and retryDelay. retryCount is the max
|
||||
// number of retries before a ShardConn returns an error on an operation.
|
||||
func NewShardConn(ctx context.Context, serv SrvTopoServer, cell, keyspace, shard string, tabletType topo.TabletType, retryDelay time.Duration, retryCount int, connTimeout time.Duration, consolidator *tabletserver.Consolidator) *ShardConn {
|
||||
func NewShardConn(ctx context.Context, serv SrvTopoServer, cell, keyspace, shard string, tabletType topo.TabletType, retryDelay time.Duration, retryCount int, connTimeout time.Duration) *ShardConn {
|
||||
getAddresses := func() (*topo.EndPoints, error) {
|
||||
endpoints, err := serv.GetEndPoints(ctx, cell, keyspace, shard, tabletType)
|
||||
if err != nil {
|
||||
|
@ -51,7 +50,6 @@ func NewShardConn(ctx context.Context, serv SrvTopoServer, cell, keyspace, shard
|
|||
}
|
||||
blc := NewBalancer(getAddresses, retryDelay)
|
||||
return &ShardConn{
|
||||
cell: cell,
|
||||
keyspace: keyspace,
|
||||
shard: shard,
|
||||
tabletType: tabletType,
|
||||
|
@ -59,7 +57,7 @@ func NewShardConn(ctx context.Context, serv SrvTopoServer, cell, keyspace, shard
|
|||
retryCount: retryCount,
|
||||
connTimeout: connTimeout,
|
||||
balancer: blc,
|
||||
consolidator: consolidator,
|
||||
consolidator: sync2.NewConsolidator(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -221,10 +219,10 @@ func (sdc *ShardConn) getConn(ctx context.Context) (conn tabletconn.TabletConn,
|
|||
sdc.mu.Unlock()
|
||||
return conn, endPoint, false, nil
|
||||
}
|
||||
sdc.mu.Unlock()
|
||||
|
||||
key := fmt.Sprintf("%s.%s.%s.%s", sdc.cell, sdc.keyspace, sdc.shard, sdc.tabletType)
|
||||
key := fmt.Sprintf("%s.%s.%s", sdc.keyspace, sdc.shard, sdc.tabletType)
|
||||
q, ok := sdc.consolidator.Create(key)
|
||||
sdc.mu.Unlock()
|
||||
if ok {
|
||||
defer q.Broadcast()
|
||||
conn, endPoint, isTimeout, err := sdc.getNewConn(ctx)
|
||||
|
|
|
@ -9,7 +9,6 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/youtube/vitess/go/vt/tabletserver"
|
||||
tproto "github.com/youtube/vitess/go/vt/tabletserver/proto"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"golang.org/x/net/context"
|
||||
|
@ -18,20 +17,19 @@ import (
|
|||
// This file uses the sandbox_test framework.
|
||||
|
||||
var (
|
||||
consolidator = tabletserver.NewConsolidator(false)
|
||||
retryCount = 3
|
||||
retryDelay = 1 * time.Millisecond
|
||||
connTimeout = 1 * time.Millisecond
|
||||
retryCount = 3
|
||||
retryDelay = 1 * time.Millisecond
|
||||
connTimeout = 1 * time.Millisecond
|
||||
)
|
||||
|
||||
func TestShardConnExecute(t *testing.T) {
|
||||
testShardConnGeneric(t, "TestShardConnExecute", func() error {
|
||||
sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnExecute", "0", "", retryDelay, retryCount, connTimeout, consolidator)
|
||||
sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnExecute", "0", "", retryDelay, retryCount, connTimeout)
|
||||
_, err := sdc.Execute(context.Background(), "query", nil, 0)
|
||||
return err
|
||||
})
|
||||
testShardConnTransact(t, "TestShardConnExecute", func() error {
|
||||
sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnExecute", "0", "", retryDelay, retryCount, connTimeout, consolidator)
|
||||
sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnExecute", "0", "", retryDelay, retryCount, connTimeout)
|
||||
_, err := sdc.Execute(context.Background(), "query", nil, 1)
|
||||
return err
|
||||
})
|
||||
|
@ -39,13 +37,13 @@ func TestShardConnExecute(t *testing.T) {
|
|||
|
||||
func TestShardConnExecuteBatch(t *testing.T) {
|
||||
testShardConnGeneric(t, "TestShardConnExecuteBatch", func() error {
|
||||
sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnExecuteBatch", "0", "", 1*time.Millisecond, 3, 1*time.Millisecond, consolidator)
|
||||
sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnExecuteBatch", "0", "", 1*time.Millisecond, 3, 1*time.Millisecond)
|
||||
queries := []tproto.BoundQuery{{"query", nil}}
|
||||
_, err := sdc.ExecuteBatch(context.Background(), queries, 0)
|
||||
return err
|
||||
})
|
||||
testShardConnTransact(t, "TestShardConnExecuteBatch", func() error {
|
||||
sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnExecuteBatch", "0", "", 1*time.Millisecond, 3, 1*time.Millisecond, consolidator)
|
||||
sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnExecuteBatch", "0", "", 1*time.Millisecond, 3, 1*time.Millisecond)
|
||||
queries := []tproto.BoundQuery{{"query", nil}}
|
||||
_, err := sdc.ExecuteBatch(context.Background(), queries, 1)
|
||||
return err
|
||||
|
@ -54,12 +52,12 @@ func TestShardConnExecuteBatch(t *testing.T) {
|
|||
|
||||
func TestShardConnExecuteStream(t *testing.T) {
|
||||
testShardConnGeneric(t, "TestShardConnExecuteStream", func() error {
|
||||
sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnExecuteStream", "0", "", 1*time.Millisecond, 3, 1*time.Millisecond, consolidator)
|
||||
sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnExecuteStream", "0", "", 1*time.Millisecond, 3, 1*time.Millisecond)
|
||||
_, errfunc := sdc.StreamExecute(context.Background(), "query", nil, 0)
|
||||
return errfunc()
|
||||
})
|
||||
testShardConnTransact(t, "TestShardConnExecuteStream", func() error {
|
||||
sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnExecuteStream", "0", "", 1*time.Millisecond, 3, 1*time.Millisecond, consolidator)
|
||||
sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnExecuteStream", "0", "", 1*time.Millisecond, 3, 1*time.Millisecond)
|
||||
_, errfunc := sdc.StreamExecute(context.Background(), "query", nil, 1)
|
||||
return errfunc()
|
||||
})
|
||||
|
@ -67,7 +65,7 @@ func TestShardConnExecuteStream(t *testing.T) {
|
|||
|
||||
func TestShardConnBegin(t *testing.T) {
|
||||
testShardConnGeneric(t, "TestShardConnBegin", func() error {
|
||||
sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnBegin", "0", "", 1*time.Millisecond, 3, 1*time.Millisecond, consolidator)
|
||||
sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnBegin", "0", "", 1*time.Millisecond, 3, 1*time.Millisecond)
|
||||
_, err := sdc.Begin(context.Background())
|
||||
return err
|
||||
})
|
||||
|
@ -75,14 +73,14 @@ func TestShardConnBegin(t *testing.T) {
|
|||
|
||||
func TestShardConnCommit(t *testing.T) {
|
||||
testShardConnTransact(t, "TestShardConnCommit", func() error {
|
||||
sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnCommit", "0", "", 1*time.Millisecond, 3, 1*time.Millisecond, consolidator)
|
||||
sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnCommit", "0", "", 1*time.Millisecond, 3, 1*time.Millisecond)
|
||||
return sdc.Commit(context.Background(), 1)
|
||||
})
|
||||
}
|
||||
|
||||
func TestShardConnRollback(t *testing.T) {
|
||||
testShardConnTransact(t, "TestShardConnRollback", func() error {
|
||||
sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnRollback", "0", "", 1*time.Millisecond, 3, 1*time.Millisecond, consolidator)
|
||||
sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnRollback", "0", "", 1*time.Millisecond, 3, 1*time.Millisecond)
|
||||
return sdc.Rollback(context.Background(), 1)
|
||||
})
|
||||
}
|
||||
|
@ -255,7 +253,7 @@ func TestShardConnBeginOther(t *testing.T) {
|
|||
s := createSandbox("TestShardConnBeginOther")
|
||||
sbc := &sandboxConn{mustFailTxPool: 1}
|
||||
s.MapTestConn("0", sbc)
|
||||
sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnBeginOther", "0", "", 10*time.Millisecond, 3, 1*time.Millisecond, consolidator)
|
||||
sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnBeginOther", "0", "", 10*time.Millisecond, 3, 1*time.Millisecond)
|
||||
_, err := sdc.Begin(context.Background())
|
||||
if err != nil {
|
||||
t.Errorf("want nil, got %v", err)
|
||||
|
@ -275,7 +273,7 @@ func TestShardConnReconnect(t *testing.T) {
|
|||
retryCount := 5
|
||||
s := createSandbox("TestShardConnReconnect")
|
||||
// case 1: resolved 0 endpoint, return error
|
||||
sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond, consolidator)
|
||||
sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond)
|
||||
startTime := time.Now()
|
||||
_, err := sdc.Execute(context.Background(), "query", nil, 0)
|
||||
execDuration := time.Now().Sub(startTime)
|
||||
|
@ -297,7 +295,7 @@ func TestShardConnReconnect(t *testing.T) {
|
|||
s.DialMustFail = 1
|
||||
sbc := &sandboxConn{}
|
||||
s.MapTestConn("0", sbc)
|
||||
sdc = NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond, consolidator)
|
||||
sdc = NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond)
|
||||
timeStart := time.Now()
|
||||
sdc.Execute(context.Background(), "query", nil, 0)
|
||||
timeDuration := time.Now().Sub(timeStart)
|
||||
|
@ -315,7 +313,7 @@ func TestShardConnReconnect(t *testing.T) {
|
|||
s.Reset()
|
||||
sbc = &sandboxConn{mustFailConn: 1}
|
||||
s.MapTestConn("0", sbc)
|
||||
sdc = NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond, consolidator)
|
||||
sdc = NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond)
|
||||
timeStart = time.Now()
|
||||
sdc.Execute(context.Background(), "query", nil, 0)
|
||||
timeDuration = time.Now().Sub(timeStart)
|
||||
|
@ -338,7 +336,7 @@ func TestShardConnReconnect(t *testing.T) {
|
|||
s.MapTestConn("0", sbc0)
|
||||
s.MapTestConn("0", sbc1)
|
||||
s.MapTestConn("0", sbc2)
|
||||
sdc = NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond, consolidator)
|
||||
sdc = NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond)
|
||||
timeStart = time.Now()
|
||||
sdc.Execute(context.Background(), "query", nil, 0)
|
||||
timeDuration = time.Now().Sub(timeStart)
|
||||
|
@ -367,7 +365,7 @@ func TestShardConnReconnect(t *testing.T) {
|
|||
s.MapTestConn("0", sbc0)
|
||||
s.MapTestConn("0", sbc1)
|
||||
s.MapTestConn("0", sbc2)
|
||||
sdc = NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond, consolidator)
|
||||
sdc = NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond)
|
||||
timeStart = time.Now()
|
||||
sdc.Execute(context.Background(), "query", nil, 0)
|
||||
timeDuration = time.Now().Sub(timeStart)
|
||||
|
@ -400,7 +398,7 @@ func TestShardConnReconnect(t *testing.T) {
|
|||
s.MapTestConn("0", sbc0)
|
||||
s.MapTestConn("0", sbc1)
|
||||
s.MapTestConn("0", sbc2)
|
||||
sdc = NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond, consolidator)
|
||||
sdc = NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond)
|
||||
timeStart = time.Now()
|
||||
sdc.Execute(context.Background(), "query", nil, 0)
|
||||
timeDuration = time.Now().Sub(timeStart)
|
||||
|
@ -433,7 +431,7 @@ func TestShardConnReconnect(t *testing.T) {
|
|||
s.MapTestConn("0", sbc0)
|
||||
s.MapTestConn("0", sbc1)
|
||||
s.MapTestConn("0", sbc2)
|
||||
sdc = NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond, consolidator)
|
||||
sdc = NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond)
|
||||
timeStart = time.Now()
|
||||
sdc.Execute(context.Background(), "query", nil, 0)
|
||||
timeDuration = time.Now().Sub(timeStart)
|
||||
|
@ -480,7 +478,7 @@ func TestShardConnReconnect(t *testing.T) {
|
|||
}
|
||||
countGetEndPoints++
|
||||
}
|
||||
sdc = NewShardConn(context.Background(), &sandboxTopo{callbackGetEndPoints: onGetEndPoints}, "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond, consolidator)
|
||||
sdc = NewShardConn(context.Background(), &sandboxTopo{callbackGetEndPoints: onGetEndPoints}, "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond)
|
||||
timeStart = time.Now()
|
||||
sdc.Execute(context.Background(), "query", nil, 0)
|
||||
timeDuration = time.Now().Sub(timeStart)
|
||||
|
@ -530,7 +528,7 @@ func TestShardConnReconnect(t *testing.T) {
|
|||
}
|
||||
countGetEndPoints++
|
||||
}
|
||||
sdc = NewShardConn(context.Background(), &sandboxTopo{callbackGetEndPoints: onGetEndPoints}, "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond, consolidator)
|
||||
sdc = NewShardConn(context.Background(), &sandboxTopo{callbackGetEndPoints: onGetEndPoints}, "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond)
|
||||
timeStart = time.Now()
|
||||
sdc.Execute(context.Background(), "query", nil, 0)
|
||||
timeDuration = time.Now().Sub(timeStart)
|
||||
|
@ -586,7 +584,7 @@ func TestShardConnReconnect(t *testing.T) {
|
|||
}
|
||||
countGetEndPoints++
|
||||
}
|
||||
sdc = NewShardConn(context.Background(), &sandboxTopo{callbackGetEndPoints: onGetEndPoints}, "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond, consolidator)
|
||||
sdc = NewShardConn(context.Background(), &sandboxTopo{callbackGetEndPoints: onGetEndPoints}, "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond)
|
||||
timeStart = time.Now()
|
||||
sdc.Execute(context.Background(), "query", nil, 0)
|
||||
timeDuration = time.Now().Sub(timeStart)
|
||||
|
|
Загрузка…
Ссылка в новой задаче