diff --git a/go/vt/tabletserver/consolidator.go b/go/sync2/consolidator.go similarity index 76% rename from go/vt/tabletserver/consolidator.go rename to go/sync2/consolidator.go index 7c0f276b24..6a0617716a 100644 --- a/go/vt/tabletserver/consolidator.go +++ b/go/sync2/consolidator.go @@ -2,21 +2,20 @@ // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. -package tabletserver +package sync2 import ( "fmt" "net/http" "sync" "sync/atomic" - "time" "github.com/youtube/vitess/go/acl" "github.com/youtube/vitess/go/cache" ) var ( - waitError = NewTabletError(ErrFail, "Error waiting for consolidation") + waitError = fmt.Errorf("Error waiting for consolidation") ) // Consolidator consolidates duplicate queries from executing simulaneously @@ -28,19 +27,15 @@ type Consolidator struct { } // NewConsolidator creates a new Consolidator -func NewConsolidator(httpHandle bool) *Consolidator { - co := &Consolidator{queries: make(map[string]*Result), consolidations: cache.NewLRUCache(1000)} - if httpHandle { - http.Handle("/debug/consolidations", co) - } - return co +func NewConsolidator() *Consolidator { + return &Consolidator{queries: make(map[string]*Result), consolidations: cache.NewLRUCache(1000)} } // Result is a wrapper for result of a query. type Result struct { executing sync.RWMutex consolidator *Consolidator - sql string + query string Result interface{} Err error } @@ -48,17 +43,17 @@ type Result struct { // Create adds a query to currently executing queries and acquires a // lock on its Result if it is not already present. If the query is // a duplicate, Create returns false. -func (co *Consolidator) Create(sql string) (r *Result, created bool) { +func (co *Consolidator) Create(query string) (r *Result, created bool) { co.mu.Lock() defer co.mu.Unlock() - if r, ok := co.queries[sql]; ok { + if r, ok := co.queries[query]; ok { return r, false } // Preset the error. If there was an unexpected panic during the main // query, then all those who waited will return the waitError. - r = &Result{consolidator: co, sql: sql, Err: waitError} + r = &Result{consolidator: co, query: query, Err: waitError} r.executing.Lock() - co.queries[sql] = r + co.queries[query] = r return r, true } @@ -79,12 +74,12 @@ func (co *Consolidator) ServeHTTP(response http.ResponseWriter, request *http.Re } } -func (co *Consolidator) record(sql string) { - if v, ok := co.consolidations.Get(sql); ok { +func (co *Consolidator) record(query string) { + if v, ok := co.consolidations.Get(query); ok { v.(*ccount).Add(1) } else { c := ccount(1) - co.consolidations.Set(sql, &c) + co.consolidations.Set(query, &c) } } @@ -94,15 +89,14 @@ func (co *Consolidator) record(sql string) { func (rs *Result) Broadcast() { rs.consolidator.mu.Lock() defer rs.consolidator.mu.Unlock() - delete(rs.consolidator.queries, rs.sql) + delete(rs.consolidator.queries, rs.query) rs.executing.Unlock() } // Wait waits for the original query to complete execution. Wait should // be invoked for duplicate queries. func (rs *Result) Wait() { - rs.consolidator.record(rs.sql) - defer waitStats.Record("Consolidations", time.Now()) + rs.consolidator.record(rs.query) rs.executing.RLock() } diff --git a/go/vt/tabletserver/consolidator_test.go b/go/sync2/consolidator_test.go similarity index 56% rename from go/vt/tabletserver/consolidator_test.go rename to go/sync2/consolidator_test.go index 1fd955eadf..cc70066aaf 100644 --- a/go/vt/tabletserver/consolidator_test.go +++ b/go/sync2/consolidator_test.go @@ -1,38 +1,38 @@ -package tabletserver +package sync2 -import ( - "testing" - - "github.com/youtube/vitess/go/mysql/proto" -) +import "testing" func TestConsolidator(t *testing.T) { - qe := NewQueryEngine(qsConfig) + con := NewConsolidator() sql := "select * from SomeTable" - orig, added := qe.consolidator.Create(sql) + orig, added := con.Create(sql) if !added { t.Errorf("expected consolidator to register a new entry") } - dup, added := qe.consolidator.Create(sql) + dup, added := con.Create(sql) if added { t.Errorf("did not expect consolidator to register a new entry") } + result := 1 go func() { - orig.Result = &proto.QueryResult{InsertId: 145} + orig.Result = &result orig.Broadcast() }() dup.Wait() - if orig.Result.(*proto.QueryResult).InsertId != dup.Result.(*proto.QueryResult).InsertId { + if *orig.Result.(*int) != result { + t.Errorf("failed to pass result") + } + if *orig.Result.(*int) != *dup.Result.(*int) { t.Errorf("failed to share the result") } // Running the query again should add a new entry since the original // query execution completed - _, added = qe.consolidator.Create(sql) + _, added = con.Create(sql) if !added { t.Errorf("expected consolidator to register a new entry") } diff --git a/go/vt/tabletserver/query_engine.go b/go/vt/tabletserver/query_engine.go index fbb3f3e953..e0b1de285d 100644 --- a/go/vt/tabletserver/query_engine.go +++ b/go/vt/tabletserver/query_engine.go @@ -5,6 +5,7 @@ package tabletserver import ( + "net/http" "sync" "time" @@ -48,7 +49,7 @@ type QueryEngine struct { // Services txPool *TxPool - consolidator *Consolidator + consolidator *sync2.Consolidator invalidator *RowcacheInvalidator streamQList *QueryList tasks sync.WaitGroup @@ -148,7 +149,8 @@ func NewQueryEngine(config Config) *QueryEngine { time.Duration(config.TxPoolTimeout*1e9), time.Duration(config.IdleTimeout*1e9), ) - qe.consolidator = NewConsolidator(true) + qe.consolidator = sync2.NewConsolidator() + http.Handle("/debug/consolidations", qe.consolidator) qe.invalidator = NewRowcacheInvalidator(qe) qe.streamQList = NewQueryList() diff --git a/go/vt/tabletserver/request_context.go b/go/vt/tabletserver/request_context.go index 4bf54ce8d9..44a6c7ab9d 100644 --- a/go/vt/tabletserver/request_context.go +++ b/go/vt/tabletserver/request_context.go @@ -48,6 +48,10 @@ func (rqc *RequestContext) qFetch(logStats *SQLQueryStats, parsedQuery *sqlparse q, ok := rqc.qe.consolidator.Create(string(sql)) if ok { defer q.Broadcast() + // Wrap default error to TabletError + if q.Err != nil { + q.Err = NewTabletError(ErrFail, q.Err.Error()) + } waitingForConnectionStart := time.Now() timeout, err := rqc.deadline.Timeout() if err != nil { @@ -63,7 +67,9 @@ func (rqc *RequestContext) qFetch(logStats *SQLQueryStats, parsedQuery *sqlparse } } else { logStats.QuerySources |= QUERY_SOURCE_CONSOLIDATOR + startTime := time.Now() q.Wait() + waitStats.Record("Consolidations", startTime) } if q.Err != nil { panic(q.Err) diff --git a/go/vt/vtgate/scatter_conn.go b/go/vt/vtgate/scatter_conn.go index 8c19071b34..a703756b99 100644 --- a/go/vt/vtgate/scatter_conn.go +++ b/go/vt/vtgate/scatter_conn.go @@ -15,7 +15,6 @@ import ( "github.com/youtube/vitess/go/sync2" "github.com/youtube/vitess/go/vt/concurrency" kproto "github.com/youtube/vitess/go/vt/key" - "github.com/youtube/vitess/go/vt/tabletserver" tproto "github.com/youtube/vitess/go/vt/tabletserver/proto" "github.com/youtube/vitess/go/vt/tabletserver/tabletconn" "github.com/youtube/vitess/go/vt/topo" @@ -28,13 +27,12 @@ var idGen sync2.AtomicInt64 // ScatterConn is used for executing queries across // multiple ShardConn connections. type ScatterConn struct { - toposerv SrvTopoServer - cell string - retryDelay time.Duration - retryCount int - connTimeout time.Duration - timings *stats.MultiTimings - consolidator *tabletserver.Consolidator + toposerv SrvTopoServer + cell string + retryDelay time.Duration + retryCount int + connTimeout time.Duration + timings *stats.MultiTimings mu sync.Mutex shardConns map[string]*ShardConn @@ -51,14 +49,13 @@ type shardActionFunc func(conn *ShardConn, transactionId int64, sResults chan<- // for creating the appropriate ShardConn. func NewScatterConn(serv SrvTopoServer, statsName, cell string, retryDelay time.Duration, retryCount int, connTimeout time.Duration) *ScatterConn { return &ScatterConn{ - toposerv: serv, - cell: cell, - retryDelay: retryDelay, - retryCount: retryCount, - connTimeout: connTimeout, - timings: stats.NewMultiTimings(statsName, []string{"Operation", "Keyspace", "ShardName", "DbType"}), - consolidator: tabletserver.NewConsolidator(false), - shardConns: make(map[string]*ShardConn), + toposerv: serv, + cell: cell, + retryDelay: retryDelay, + retryCount: retryCount, + connTimeout: connTimeout, + timings: stats.NewMultiTimings(statsName, []string{"Operation", "Keyspace", "ShardName", "DbType"}), + shardConns: make(map[string]*ShardConn), } } @@ -556,7 +553,7 @@ func (stc *ScatterConn) getConnection(context context.Context, keyspace, shard s key := fmt.Sprintf("%s.%s.%s", keyspace, shard, tabletType) sdc, ok := stc.shardConns[key] if !ok { - sdc = NewShardConn(context, stc.toposerv, stc.cell, keyspace, shard, tabletType, stc.retryDelay, stc.retryCount, stc.connTimeout, stc.consolidator) + sdc = NewShardConn(context, stc.toposerv, stc.cell, keyspace, shard, tabletType, stc.retryDelay, stc.retryCount, stc.connTimeout) stc.shardConns[key] = sdc } return sdc diff --git a/go/vt/vtgate/shard_conn.go b/go/vt/vtgate/shard_conn.go index fa2e5ef82e..f5309ded20 100644 --- a/go/vt/vtgate/shard_conn.go +++ b/go/vt/vtgate/shard_conn.go @@ -10,8 +10,8 @@ import ( "time" mproto "github.com/youtube/vitess/go/mysql/proto" + "github.com/youtube/vitess/go/sync2" "github.com/youtube/vitess/go/vt/concurrency" - "github.com/youtube/vitess/go/vt/tabletserver" tproto "github.com/youtube/vitess/go/vt/tabletserver/proto" "github.com/youtube/vitess/go/vt/tabletserver/tabletconn" "github.com/youtube/vitess/go/vt/topo" @@ -23,7 +23,6 @@ import ( // be concurrently used across goroutines. Such requests are // interleaved on the same underlying connection. type ShardConn struct { - cell string keyspace string shard string tabletType topo.TabletType @@ -31,7 +30,7 @@ type ShardConn struct { retryCount int connTimeout time.Duration balancer *Balancer - consolidator *tabletserver.Consolidator + consolidator *sync2.Consolidator // conn needs a mutex because it can change during the lifetime of ShardConn. mu sync.Mutex @@ -41,7 +40,7 @@ type ShardConn struct { // NewShardConn creates a new ShardConn. It creates a Balancer using // serv, cell, keyspace, tabletType and retryDelay. retryCount is the max // number of retries before a ShardConn returns an error on an operation. -func NewShardConn(ctx context.Context, serv SrvTopoServer, cell, keyspace, shard string, tabletType topo.TabletType, retryDelay time.Duration, retryCount int, connTimeout time.Duration, consolidator *tabletserver.Consolidator) *ShardConn { +func NewShardConn(ctx context.Context, serv SrvTopoServer, cell, keyspace, shard string, tabletType topo.TabletType, retryDelay time.Duration, retryCount int, connTimeout time.Duration) *ShardConn { getAddresses := func() (*topo.EndPoints, error) { endpoints, err := serv.GetEndPoints(ctx, cell, keyspace, shard, tabletType) if err != nil { @@ -51,7 +50,6 @@ func NewShardConn(ctx context.Context, serv SrvTopoServer, cell, keyspace, shard } blc := NewBalancer(getAddresses, retryDelay) return &ShardConn{ - cell: cell, keyspace: keyspace, shard: shard, tabletType: tabletType, @@ -59,7 +57,7 @@ func NewShardConn(ctx context.Context, serv SrvTopoServer, cell, keyspace, shard retryCount: retryCount, connTimeout: connTimeout, balancer: blc, - consolidator: consolidator, + consolidator: sync2.NewConsolidator(), } } @@ -221,10 +219,10 @@ func (sdc *ShardConn) getConn(ctx context.Context) (conn tabletconn.TabletConn, sdc.mu.Unlock() return conn, endPoint, false, nil } - sdc.mu.Unlock() - key := fmt.Sprintf("%s.%s.%s.%s", sdc.cell, sdc.keyspace, sdc.shard, sdc.tabletType) + key := fmt.Sprintf("%s.%s.%s", sdc.keyspace, sdc.shard, sdc.tabletType) q, ok := sdc.consolidator.Create(key) + sdc.mu.Unlock() if ok { defer q.Broadcast() conn, endPoint, isTimeout, err := sdc.getNewConn(ctx) diff --git a/go/vt/vtgate/shard_conn_test.go b/go/vt/vtgate/shard_conn_test.go index a8915b21cb..32bb35ea04 100644 --- a/go/vt/vtgate/shard_conn_test.go +++ b/go/vt/vtgate/shard_conn_test.go @@ -9,7 +9,6 @@ import ( "testing" "time" - "github.com/youtube/vitess/go/vt/tabletserver" tproto "github.com/youtube/vitess/go/vt/tabletserver/proto" "github.com/youtube/vitess/go/vt/topo" "golang.org/x/net/context" @@ -18,20 +17,19 @@ import ( // This file uses the sandbox_test framework. var ( - consolidator = tabletserver.NewConsolidator(false) - retryCount = 3 - retryDelay = 1 * time.Millisecond - connTimeout = 1 * time.Millisecond + retryCount = 3 + retryDelay = 1 * time.Millisecond + connTimeout = 1 * time.Millisecond ) func TestShardConnExecute(t *testing.T) { testShardConnGeneric(t, "TestShardConnExecute", func() error { - sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnExecute", "0", "", retryDelay, retryCount, connTimeout, consolidator) + sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnExecute", "0", "", retryDelay, retryCount, connTimeout) _, err := sdc.Execute(context.Background(), "query", nil, 0) return err }) testShardConnTransact(t, "TestShardConnExecute", func() error { - sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnExecute", "0", "", retryDelay, retryCount, connTimeout, consolidator) + sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnExecute", "0", "", retryDelay, retryCount, connTimeout) _, err := sdc.Execute(context.Background(), "query", nil, 1) return err }) @@ -39,13 +37,13 @@ func TestShardConnExecute(t *testing.T) { func TestShardConnExecuteBatch(t *testing.T) { testShardConnGeneric(t, "TestShardConnExecuteBatch", func() error { - sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnExecuteBatch", "0", "", 1*time.Millisecond, 3, 1*time.Millisecond, consolidator) + sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnExecuteBatch", "0", "", 1*time.Millisecond, 3, 1*time.Millisecond) queries := []tproto.BoundQuery{{"query", nil}} _, err := sdc.ExecuteBatch(context.Background(), queries, 0) return err }) testShardConnTransact(t, "TestShardConnExecuteBatch", func() error { - sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnExecuteBatch", "0", "", 1*time.Millisecond, 3, 1*time.Millisecond, consolidator) + sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnExecuteBatch", "0", "", 1*time.Millisecond, 3, 1*time.Millisecond) queries := []tproto.BoundQuery{{"query", nil}} _, err := sdc.ExecuteBatch(context.Background(), queries, 1) return err @@ -54,12 +52,12 @@ func TestShardConnExecuteBatch(t *testing.T) { func TestShardConnExecuteStream(t *testing.T) { testShardConnGeneric(t, "TestShardConnExecuteStream", func() error { - sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnExecuteStream", "0", "", 1*time.Millisecond, 3, 1*time.Millisecond, consolidator) + sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnExecuteStream", "0", "", 1*time.Millisecond, 3, 1*time.Millisecond) _, errfunc := sdc.StreamExecute(context.Background(), "query", nil, 0) return errfunc() }) testShardConnTransact(t, "TestShardConnExecuteStream", func() error { - sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnExecuteStream", "0", "", 1*time.Millisecond, 3, 1*time.Millisecond, consolidator) + sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnExecuteStream", "0", "", 1*time.Millisecond, 3, 1*time.Millisecond) _, errfunc := sdc.StreamExecute(context.Background(), "query", nil, 1) return errfunc() }) @@ -67,7 +65,7 @@ func TestShardConnExecuteStream(t *testing.T) { func TestShardConnBegin(t *testing.T) { testShardConnGeneric(t, "TestShardConnBegin", func() error { - sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnBegin", "0", "", 1*time.Millisecond, 3, 1*time.Millisecond, consolidator) + sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnBegin", "0", "", 1*time.Millisecond, 3, 1*time.Millisecond) _, err := sdc.Begin(context.Background()) return err }) @@ -75,14 +73,14 @@ func TestShardConnBegin(t *testing.T) { func TestShardConnCommit(t *testing.T) { testShardConnTransact(t, "TestShardConnCommit", func() error { - sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnCommit", "0", "", 1*time.Millisecond, 3, 1*time.Millisecond, consolidator) + sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnCommit", "0", "", 1*time.Millisecond, 3, 1*time.Millisecond) return sdc.Commit(context.Background(), 1) }) } func TestShardConnRollback(t *testing.T) { testShardConnTransact(t, "TestShardConnRollback", func() error { - sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnRollback", "0", "", 1*time.Millisecond, 3, 1*time.Millisecond, consolidator) + sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnRollback", "0", "", 1*time.Millisecond, 3, 1*time.Millisecond) return sdc.Rollback(context.Background(), 1) }) } @@ -255,7 +253,7 @@ func TestShardConnBeginOther(t *testing.T) { s := createSandbox("TestShardConnBeginOther") sbc := &sandboxConn{mustFailTxPool: 1} s.MapTestConn("0", sbc) - sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnBeginOther", "0", "", 10*time.Millisecond, 3, 1*time.Millisecond, consolidator) + sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnBeginOther", "0", "", 10*time.Millisecond, 3, 1*time.Millisecond) _, err := sdc.Begin(context.Background()) if err != nil { t.Errorf("want nil, got %v", err) @@ -275,7 +273,7 @@ func TestShardConnReconnect(t *testing.T) { retryCount := 5 s := createSandbox("TestShardConnReconnect") // case 1: resolved 0 endpoint, return error - sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond, consolidator) + sdc := NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond) startTime := time.Now() _, err := sdc.Execute(context.Background(), "query", nil, 0) execDuration := time.Now().Sub(startTime) @@ -297,7 +295,7 @@ func TestShardConnReconnect(t *testing.T) { s.DialMustFail = 1 sbc := &sandboxConn{} s.MapTestConn("0", sbc) - sdc = NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond, consolidator) + sdc = NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond) timeStart := time.Now() sdc.Execute(context.Background(), "query", nil, 0) timeDuration := time.Now().Sub(timeStart) @@ -315,7 +313,7 @@ func TestShardConnReconnect(t *testing.T) { s.Reset() sbc = &sandboxConn{mustFailConn: 1} s.MapTestConn("0", sbc) - sdc = NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond, consolidator) + sdc = NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond) timeStart = time.Now() sdc.Execute(context.Background(), "query", nil, 0) timeDuration = time.Now().Sub(timeStart) @@ -338,7 +336,7 @@ func TestShardConnReconnect(t *testing.T) { s.MapTestConn("0", sbc0) s.MapTestConn("0", sbc1) s.MapTestConn("0", sbc2) - sdc = NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond, consolidator) + sdc = NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond) timeStart = time.Now() sdc.Execute(context.Background(), "query", nil, 0) timeDuration = time.Now().Sub(timeStart) @@ -367,7 +365,7 @@ func TestShardConnReconnect(t *testing.T) { s.MapTestConn("0", sbc0) s.MapTestConn("0", sbc1) s.MapTestConn("0", sbc2) - sdc = NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond, consolidator) + sdc = NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond) timeStart = time.Now() sdc.Execute(context.Background(), "query", nil, 0) timeDuration = time.Now().Sub(timeStart) @@ -400,7 +398,7 @@ func TestShardConnReconnect(t *testing.T) { s.MapTestConn("0", sbc0) s.MapTestConn("0", sbc1) s.MapTestConn("0", sbc2) - sdc = NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond, consolidator) + sdc = NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond) timeStart = time.Now() sdc.Execute(context.Background(), "query", nil, 0) timeDuration = time.Now().Sub(timeStart) @@ -433,7 +431,7 @@ func TestShardConnReconnect(t *testing.T) { s.MapTestConn("0", sbc0) s.MapTestConn("0", sbc1) s.MapTestConn("0", sbc2) - sdc = NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond, consolidator) + sdc = NewShardConn(context.Background(), new(sandboxTopo), "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond) timeStart = time.Now() sdc.Execute(context.Background(), "query", nil, 0) timeDuration = time.Now().Sub(timeStart) @@ -480,7 +478,7 @@ func TestShardConnReconnect(t *testing.T) { } countGetEndPoints++ } - sdc = NewShardConn(context.Background(), &sandboxTopo{callbackGetEndPoints: onGetEndPoints}, "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond, consolidator) + sdc = NewShardConn(context.Background(), &sandboxTopo{callbackGetEndPoints: onGetEndPoints}, "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond) timeStart = time.Now() sdc.Execute(context.Background(), "query", nil, 0) timeDuration = time.Now().Sub(timeStart) @@ -530,7 +528,7 @@ func TestShardConnReconnect(t *testing.T) { } countGetEndPoints++ } - sdc = NewShardConn(context.Background(), &sandboxTopo{callbackGetEndPoints: onGetEndPoints}, "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond, consolidator) + sdc = NewShardConn(context.Background(), &sandboxTopo{callbackGetEndPoints: onGetEndPoints}, "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond) timeStart = time.Now() sdc.Execute(context.Background(), "query", nil, 0) timeDuration = time.Now().Sub(timeStart) @@ -586,7 +584,7 @@ func TestShardConnReconnect(t *testing.T) { } countGetEndPoints++ } - sdc = NewShardConn(context.Background(), &sandboxTopo{callbackGetEndPoints: onGetEndPoints}, "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond, consolidator) + sdc = NewShardConn(context.Background(), &sandboxTopo{callbackGetEndPoints: onGetEndPoints}, "aa", "TestShardConnReconnect", "0", "", retryDelay, retryCount, 1*time.Millisecond) timeStart = time.Now() sdc.Execute(context.Background(), "query", nil, 0) timeDuration = time.Now().Sub(timeStart)