Merge pull request #1186 from youtube/vtgate

Implement gateway with discovery module.
This commit is contained in:
Liang 2015-10-09 22:33:51 -07:00
Родитель 7ee787d822 7210629aa2
Коммит 3e6152f752
6 изменённых файлов: 591 добавлений и 22 удалений

Просмотреть файл

@ -6,6 +6,7 @@ package main
import (
"flag"
"math/rand"
"time"
log "github.com/golang/glog"
@ -38,6 +39,7 @@ var healthCheck discovery.HealthCheck
var initFakeZK func()
func init() {
rand.Seed(time.Now().UnixNano())
servenv.RegisterDefaultFlags()
}

Просмотреть файл

@ -1,6 +1,9 @@
package discovery
import pbt "github.com/youtube/vitess/go/vt/proto/topodata"
import (
pbt "github.com/youtube/vitess/go/vt/proto/topodata"
"github.com/youtube/vitess/go/vt/tabletserver/tabletconn"
)
func newFakeHealthCheck() *fakeHealthCheck {
return &fakeHealthCheck{endPoints: make(map[string]*pbt.EndPoint)}
@ -36,6 +39,11 @@ func (fhc *fakeHealthCheck) GetEndPointStatsFromTarget(keyspace, shard string, t
return nil
}
// GetConnection returns the TabletConn of the given endpoint.
func (fhc *fakeHealthCheck) GetConnection(endPoint *pbt.EndPoint) tabletconn.TabletConn {
return nil
}
// CacheStatus returns a displayable version of the cache.
func (fhc *fakeHealthCheck) CacheStatus() EndPointsCacheStatusList {
return nil

Просмотреть файл

@ -54,6 +54,8 @@ type HealthCheck interface {
GetEndPointStatsFromKeyspaceShard(keyspace, shard string) []*EndPointStats
// GetEndPointStatsFromTarget returns all EndPointStats for the given target.
GetEndPointStatsFromTarget(keyspace, shard string, tabletType pbt.TabletType) []*EndPointStats
// GetConnection returns the TabletConn of the given endpoint.
GetConnection(endPoint *pbt.EndPoint) tabletconn.TabletConn
// CacheStatus returns a displayable version of the cache.
CacheStatus() EndPointsCacheStatusList
}
@ -344,6 +346,19 @@ func (hc *HealthCheckImpl) GetEndPointStatsFromTarget(keyspace, shard string, ta
return res
}
// GetConnection returns the TabletConn of the given endpoint.
func (hc *HealthCheckImpl) GetConnection(endPoint *pbt.EndPoint) tabletconn.TabletConn {
hc.mu.RLock()
defer hc.mu.RUnlock()
hcc := hc.addrToConns[EndPointToMapKey(endPoint)]
if hcc == nil {
return nil
}
hcc.mu.RLock()
defer hcc.mu.RUnlock()
return hcc.conn
}
// addEndPointToTargetProtected adds the endpoint to the given target.
// LOCK_REQUIRED hc.mu
func (hc *HealthCheckImpl) addEndPointToTargetProtected(target *pbq.Target, endPoint *pbt.EndPoint) {

Просмотреть файл

@ -201,7 +201,3 @@ func shuffle(addressNodes []*addressStatus, length int) {
addressNodes[i], addressNodes[index] = addressNodes[index], addressNodes[i]
}
}
func init() {
rand.Seed(time.Now().UnixNano())
}

Просмотреть файл

@ -5,8 +5,9 @@
package vtgate
import (
"errors"
"flag"
"fmt"
"math/rand"
"strings"
"time"
@ -17,9 +18,11 @@ import (
"github.com/youtube/vitess/go/vt/discovery"
pbq "github.com/youtube/vitess/go/vt/proto/query"
pbt "github.com/youtube/vitess/go/vt/proto/topodata"
"github.com/youtube/vitess/go/vt/proto/vtrpc"
tproto "github.com/youtube/vitess/go/vt/tabletserver/proto"
"github.com/youtube/vitess/go/vt/tabletserver/tabletconn"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/vterrors"
)
var (
@ -28,11 +31,7 @@ var (
topoReadConcurrency = flag.Int("topo_read_concurrency", 32, "concurrent topo reads")
)
var errNotImplemented = errors.New("Not implemented")
const (
gatewayImplementationDiscovery = "discoverygateway"
)
const gatewayImplementationDiscovery = "discoverygateway"
func init() {
RegisterGatewayCreator(gatewayImplementationDiscovery, createDiscoveryGateway)
@ -43,6 +42,7 @@ func createDiscoveryGateway(hc discovery.HealthCheck, topoServer topo.Server, se
hc: hc,
topoServer: topoServer,
localCell: cell,
retryCount: retryCount,
tabletsWatchers: make([]*discovery.CellTabletsWatcher, 0, 1),
}
}
@ -51,6 +51,7 @@ type discoveryGateway struct {
hc discovery.HealthCheck
topoServer topo.Server
localCell string
retryCount int
tabletsWatchers []*discovery.CellTabletsWatcher
}
@ -66,39 +67,81 @@ func (dg *discoveryGateway) InitializeConnections(ctx context.Context) error {
}
// Execute executes the non-streaming query for the specified keyspace, shard, and tablet type.
func (dg *discoveryGateway) Execute(ctx context.Context, keyspace, shard string, tabletType pbt.TabletType, query string, bindVars map[string]interface{}, transactionID int64) (*mproto.QueryResult, error) {
return nil, errNotImplemented
func (dg *discoveryGateway) Execute(ctx context.Context, keyspace, shard string, tabletType pbt.TabletType, query string, bindVars map[string]interface{}, transactionID int64) (qr *mproto.QueryResult, err error) {
err = dg.withRetry(ctx, keyspace, shard, tabletType, func(conn tabletconn.TabletConn) error {
var innerErr error
qr, innerErr = conn.Execute2(ctx, query, bindVars, transactionID)
return innerErr
}, transactionID, false)
return qr, err
}
// ExecuteBatch executes a group of queries for the specified keyspace, shard, and tablet type.
func (dg *discoveryGateway) ExecuteBatch(ctx context.Context, keyspace, shard string, tabletType pbt.TabletType, queries []tproto.BoundQuery, asTransaction bool, transactionID int64) (*tproto.QueryResultList, error) {
return nil, errNotImplemented
func (dg *discoveryGateway) ExecuteBatch(ctx context.Context, keyspace, shard string, tabletType pbt.TabletType, queries []tproto.BoundQuery, asTransaction bool, transactionID int64) (qrs *tproto.QueryResultList, err error) {
err = dg.withRetry(ctx, keyspace, shard, tabletType, func(conn tabletconn.TabletConn) error {
var innerErr error
qrs, innerErr = conn.ExecuteBatch2(ctx, queries, asTransaction, transactionID)
return innerErr
}, transactionID, false)
return qrs, err
}
// StreamExecute executes a streaming query for the specified keyspace, shard, and tablet type.
func (dg *discoveryGateway) StreamExecute(ctx context.Context, keyspace, shard string, tabletType pbt.TabletType, query string, bindVars map[string]interface{}, transactionID int64) (<-chan *mproto.QueryResult, tabletconn.ErrFunc) {
return nil, func() error { return errNotImplemented }
var usedConn tabletconn.TabletConn
var erFunc tabletconn.ErrFunc
var results <-chan *mproto.QueryResult
err := dg.withRetry(ctx, keyspace, shard, tabletType, func(conn tabletconn.TabletConn) error {
var err error
results, erFunc, err = conn.StreamExecute2(ctx, query, bindVars, transactionID)
usedConn = conn
return err
}, transactionID, true)
if err != nil {
return results, func() error { return err }
}
inTransaction := (transactionID != 0)
return results, func() error {
return WrapError(erFunc(), keyspace, shard, tabletType, usedConn.EndPoint(), inTransaction)
}
}
// Begin starts a transaction for the specified keyspace, shard, and tablet type.
// It returns the transaction ID.
func (dg *discoveryGateway) Begin(ctx context.Context, keyspace string, shard string, tabletType pbt.TabletType) (int64, error) {
return 0, errNotImplemented
func (dg *discoveryGateway) Begin(ctx context.Context, keyspace string, shard string, tabletType pbt.TabletType) (transactionID int64, err error) {
err = dg.withRetry(ctx, keyspace, shard, tabletType, func(conn tabletconn.TabletConn) error {
var innerErr error
transactionID, innerErr = conn.Begin2(ctx)
return innerErr
}, 0, false)
return transactionID, err
}
// Commit commits the current transaction for the specified keyspace, shard, and tablet type.
func (dg *discoveryGateway) Commit(ctx context.Context, keyspace, shard string, tabletType pbt.TabletType, transactionID int64) error {
return errNotImplemented
return dg.withRetry(ctx, keyspace, shard, tabletType, func(conn tabletconn.TabletConn) error {
return conn.Commit2(ctx, transactionID)
}, transactionID, false)
}
// Rollback rolls back the current transaction for the specified keyspace, shard, and tablet type.
func (dg *discoveryGateway) Rollback(ctx context.Context, keyspace, shard string, tabletType pbt.TabletType, transactionID int64) error {
return errNotImplemented
return dg.withRetry(ctx, keyspace, shard, tabletType, func(conn tabletconn.TabletConn) error {
return conn.Rollback2(ctx, transactionID)
}, transactionID, false)
}
// SplitQuery splits a query into sub-queries for the specified keyspace, shard, and tablet type.
func (dg *discoveryGateway) SplitQuery(ctx context.Context, keyspace, shard string, tabletType pbt.TabletType, sql string, bindVariables map[string]interface{}, splitColumn string, splitCount int) ([]tproto.QuerySplit, error) {
return nil, errNotImplemented
func (dg *discoveryGateway) SplitQuery(ctx context.Context, keyspace, shard string, tabletType pbt.TabletType, sql string, bindVariables map[string]interface{}, splitColumn string, splitCount int) (queries []tproto.QuerySplit, err error) {
err = dg.withRetry(ctx, keyspace, shard, tabletType, func(conn tabletconn.TabletConn) error {
var innerErr error
queries, innerErr = conn.SplitQuery(ctx, tproto.BoundQuery{
Sql: sql,
BindVariables: bindVariables,
}, splitColumn, splitCount)
return innerErr
}, 0, false)
return
}
// Close shuts down underlying connections.
@ -112,3 +155,169 @@ func (dg *discoveryGateway) Close(ctx context.Context) error {
// StatsUpdate receives updates about target and realtime stats changes.
func (dg *discoveryGateway) StatsUpdate(endPoint *pbt.EndPoint, cell string, target *pbq.Target, tabletExternallyReparentedTimestamp int64, stats *pbq.RealtimeStats) {
}
// withRetry gets available connections and executes the action. If there are retryable errors,
// it retries retryCount times before failing. It does not retry if the connection is in
// the middle of a transaction. While returning the error check if it maybe a result of
// a resharding event, and set the re-resolve bit and let the upper layers
// re-resolve and retry.
func (dg *discoveryGateway) withRetry(ctx context.Context, keyspace, shard string, tabletType pbt.TabletType, action func(conn tabletconn.TabletConn) error, transactionID int64, isStreaming bool) error {
var endPointLastUsed *pbt.EndPoint
var err error
inTransaction := (transactionID != 0)
invalidEndPoints := make(map[string]bool)
for i := 0; i < dg.retryCount+1; i++ {
var endPoint *pbt.EndPoint
endPoints := dg.getEndPoints(keyspace, shard, tabletType)
if len(endPoints) == 0 {
// fail fast if there is no endpoint
err = vterrors.FromError(vtrpc.ErrorCode_INTERNAL_ERROR, fmt.Errorf("no valid endpoint"))
break
}
shuffleEndPoints(endPoints)
// skip endpoints we tried before
for _, ep := range endPoints {
if _, ok := invalidEndPoints[discovery.EndPointToMapKey(ep)]; !ok {
endPoint = ep
break
}
}
if endPoint == nil {
if err == nil {
// do not override error from last attempt.
err = vterrors.FromError(vtrpc.ErrorCode_INTERNAL_ERROR, fmt.Errorf("no available connection"))
}
break
}
// execute
endPointLastUsed = endPoint
conn := dg.hc.GetConnection(endPoint)
if conn == nil {
err = vterrors.FromError(vtrpc.ErrorCode_INTERNAL_ERROR, fmt.Errorf("no connection for %+v", endPoint))
invalidEndPoints[discovery.EndPointToMapKey(endPoint)] = true
continue
}
err = action(conn)
if dg.canRetry(ctx, err, transactionID, isStreaming) {
invalidEndPoints[discovery.EndPointToMapKey(endPoint)] = true
continue
}
break
}
return WrapError(err, keyspace, shard, tabletType, endPointLastUsed, inTransaction)
}
// canRetry determines whether a query can be retried or not.
// OperationalErrors like retry/fatal are retryable if query is not in a txn.
// All other errors are non-retryable.
func (dg *discoveryGateway) canRetry(ctx context.Context, err error, transactionID int64, isStreaming bool) bool {
if err == nil {
return false
}
// Do not retry if ctx.Done() is closed.
select {
case <-ctx.Done():
return false
default:
}
if serverError, ok := err.(*tabletconn.ServerError); ok {
switch serverError.Code {
case tabletconn.ERR_FATAL:
// Do not retry on fatal error for streaming query.
// For streaming query, vttablet sends:
// - RETRY, if streaming is not started yet;
// - FATAL, if streaming is broken halfway.
// For non-streaming query, handle as ERR_RETRY.
if isStreaming {
return false
}
fallthrough
case tabletconn.ERR_RETRY:
// Retry on RETRY and FATAL if not in a transaction.
inTransaction := (transactionID != 0)
return !inTransaction
default:
// Not retry for TX_POOL_FULL and normal server errors.
return false
}
}
// Do not retry on operational error.
return false
}
func shuffleEndPoints(endPoints []*pbt.EndPoint) {
index := 0
length := len(endPoints)
for i := length - 1; i > 0; i-- {
index = rand.Intn(i + 1)
endPoints[i], endPoints[index] = endPoints[index], endPoints[i]
}
}
// getEndPoints gets all available endpoints from HealthCheck,
// and selects the usable ones based several rules:
// master - return one from any cells with latest reparent timestamp;
// replica - return all from local cell.
// TODO(liang): instead of checking eps.LastError, check eps.serving flag.
// TODO(liang): select replica by replication lag.
func (dg *discoveryGateway) getEndPoints(keyspace, shard string, tabletType pbt.TabletType) []*pbt.EndPoint {
epsList := dg.hc.GetEndPointStatsFromTarget(keyspace, shard, tabletType)
// for master, use any cells and return the one with max reparent timestamp.
if tabletType == pbt.TabletType_MASTER {
var maxTimestamp int64
var ep *pbt.EndPoint
for _, eps := range epsList {
if eps.LastError != nil {
continue
}
if eps.TabletExternallyReparentedTimestamp >= maxTimestamp {
maxTimestamp = eps.TabletExternallyReparentedTimestamp
ep = eps.EndPoint
}
}
if ep == nil {
return nil
}
return []*pbt.EndPoint{ep}
}
// for non-master, use only endpoints from local cell.
var epList []*pbt.EndPoint
for _, eps := range epsList {
if eps.LastError != nil {
continue
}
if dg.localCell != eps.Cell {
continue
}
epList = append(epList, eps.EndPoint)
}
return epList
}
// WrapError returns ShardConnError which preserves the original error code if possible,
// adds the connection context
// and adds a bit to determine whether the keyspace/shard needs to be
// re-resolved for a potential sharding event.
func WrapError(in error, keyspace, shard string, tabletType pbt.TabletType, endPoint *pbt.EndPoint, inTransaction bool) (wrapped error) {
if in == nil {
return nil
}
shardIdentifier := fmt.Sprintf("%s.%s.%s, %+v", keyspace, shard, strings.ToLower(tabletType.String()), endPoint)
code := tabletconn.ERR_NORMAL
serverError, ok := in.(*tabletconn.ServerError)
if ok {
code = serverError.Code
}
shardConnErr := &ShardConnError{
Code: code,
ShardIdentifier: shardIdentifier,
InTransaction: inTransaction,
Err: in,
endpointCode: vterrors.RecoverVtErrorCode(in),
}
return shardConnErr
}

Просмотреть файл

@ -0,0 +1,339 @@
package vtgate
import (
"fmt"
"testing"
"time"
"golang.org/x/net/context"
"github.com/youtube/vitess/go/vt/discovery"
"github.com/youtube/vitess/go/vt/proto/vtrpc"
"github.com/youtube/vitess/go/vt/tabletserver/tabletconn"
"github.com/youtube/vitess/go/vt/topo"
pbq "github.com/youtube/vitess/go/vt/proto/query"
pbt "github.com/youtube/vitess/go/vt/proto/topodata"
tproto "github.com/youtube/vitess/go/vt/tabletserver/proto"
)
func TestDiscoveryGatewayExecute(t *testing.T) {
testDiscoveryGatewayGeneric(t, false, func(dg Gateway, keyspace, shard string, tabletType pbt.TabletType) error {
_, err := dg.Execute(context.Background(), keyspace, shard, tabletType, "query", nil, 0)
return err
})
testDiscoveryGatewayTransact(t, false, func(dg Gateway, keyspace, shard string, tabletType pbt.TabletType) error {
_, err := dg.Execute(context.Background(), keyspace, shard, tabletType, "query", nil, 1)
return err
})
}
func TestDiscoveryGatewayExecuteBatch(t *testing.T) {
testDiscoveryGatewayGeneric(t, false, func(dg Gateway, keyspace, shard string, tabletType pbt.TabletType) error {
queries := []tproto.BoundQuery{{"query", nil}}
_, err := dg.ExecuteBatch(context.Background(), keyspace, shard, tabletType, queries, false, 0)
return err
})
testDiscoveryGatewayTransact(t, false, func(dg Gateway, keyspace, shard string, tabletType pbt.TabletType) error {
queries := []tproto.BoundQuery{{"query", nil}}
_, err := dg.ExecuteBatch(context.Background(), keyspace, shard, tabletType, queries, false, 1)
return err
})
}
func TestDiscoveryGatewayExecuteStream(t *testing.T) {
testDiscoveryGatewayGeneric(t, true, func(dg Gateway, keyspace, shard string, tabletType pbt.TabletType) error {
_, errfunc := dg.StreamExecute(context.Background(), keyspace, shard, tabletType, "query", nil, 0)
return errfunc()
})
testDiscoveryGatewayTransact(t, true, func(dg Gateway, keyspace, shard string, tabletType pbt.TabletType) error {
_, errfunc := dg.StreamExecute(context.Background(), keyspace, shard, tabletType, "query", nil, 1)
return errfunc()
})
}
func TestDiscoveryGatewayBegin(t *testing.T) {
testDiscoveryGatewayGeneric(t, false, func(dg Gateway, keyspace, shard string, tabletType pbt.TabletType) error {
_, err := dg.Begin(context.Background(), keyspace, shard, tabletType)
return err
})
}
func TestDiscoveryGatewayCommit(t *testing.T) {
testDiscoveryGatewayTransact(t, false, func(dg Gateway, keyspace, shard string, tabletType pbt.TabletType) error {
return dg.Commit(context.Background(), keyspace, shard, tabletType, 1)
})
}
func TestDiscoveryGatewayRollback(t *testing.T) {
testDiscoveryGatewayTransact(t, false, func(dg Gateway, keyspace, shard string, tabletType pbt.TabletType) error {
return dg.Rollback(context.Background(), keyspace, shard, tabletType, 1)
})
}
func TestDiscoveryGatewayGetEndPoints(t *testing.T) {
keyspace := "ks"
shard := "0"
hc := newFakeHealthCheck()
dg := createDiscoveryGateway(hc, topo.Server{}, nil, "local", time.Millisecond, 2, time.Second, time.Second, time.Second, nil).(*discoveryGateway)
// replica should only use local ones
hc.addTestEndPoint("remote", "1.1.1.1", 1001, keyspace, shard, pbt.TabletType_REPLICA, 10, nil, nil)
ep1 := hc.addTestEndPoint("local", "2.2.2.2", 1001, keyspace, shard, pbt.TabletType_REPLICA, 10, nil, nil)
eps := dg.getEndPoints(keyspace, shard, pbt.TabletType_REPLICA)
if len(eps) != 1 || !topo.EndPointEquality(eps[0], ep1) {
t.Errorf("want %+v, got %+v", ep1, eps[0])
}
// master should use the one with newer timestamp regardless of cell
hc.addTestEndPoint("remote", "1.1.1.1", 1001, keyspace, shard, pbt.TabletType_MASTER, 5, nil, nil)
ep1 = hc.addTestEndPoint("remote", "2.2.2.2", 1001, keyspace, shard, pbt.TabletType_MASTER, 10, nil, nil)
eps = dg.getEndPoints(keyspace, shard, pbt.TabletType_MASTER)
if len(eps) != 1 || !topo.EndPointEquality(eps[0], ep1) {
t.Errorf("want %+v, got %+v", ep1, eps[0])
}
}
func testDiscoveryGatewayGeneric(t *testing.T, streaming bool, f func(dg Gateway, keyspace, shard string, tabletType pbt.TabletType) error) {
keyspace := "ks"
shard := "0"
tabletType := pbt.TabletType_REPLICA
hc := newFakeHealthCheck()
dg := createDiscoveryGateway(hc, topo.Server{}, nil, "cell", time.Millisecond, 2, time.Second, time.Second, time.Second, nil)
// no endpoint
hc.Reset()
want := "shard, host: ks.0.replica, <nil>, no valid endpoint"
err := f(dg, keyspace, shard, tabletType)
verifyShardConnError(t, err, want, vtrpc.ErrorCode_INTERNAL_ERROR)
if hc.GetStatsFromTargetCounter != 1 {
t.Errorf("hc.GetStatsFromTargetCounter = %v; want 1", hc.GetStatsFromTargetCounter)
}
// endpoint with error
hc.Reset()
hc.addTestEndPoint("cell", "1.1.1.1", 1001, keyspace, shard, tabletType, 10, fmt.Errorf("no connection"), nil)
want = "shard, host: ks.0.replica, <nil>, no valid endpoint"
err = f(dg, keyspace, shard, tabletType)
verifyShardConnError(t, err, want, vtrpc.ErrorCode_INTERNAL_ERROR)
if hc.GetStatsFromTargetCounter != 1 {
t.Errorf("hc.GetStatsFromTargetCounter = %v; want 1", hc.GetStatsFromTargetCounter)
}
// endpoint without connection
hc.Reset()
ep1 := hc.addTestEndPoint("cell", "1.1.1.1", 1001, keyspace, shard, tabletType, 10, nil, nil)
want = fmt.Sprintf(`shard, host: ks.0.replica, %+v, no connection for %+v`, ep1, ep1)
err = f(dg, keyspace, shard, tabletType)
verifyShardConnError(t, err, want, vtrpc.ErrorCode_INTERNAL_ERROR)
if hc.GetStatsFromTargetCounter != 2 {
t.Errorf("hc.GetStatsFromTargetCounter = %v; want 2", hc.GetStatsFromTargetCounter)
}
// retry error
hc.Reset()
ep1 = hc.addTestEndPoint("cell", "1.1.1.1", 1001, keyspace, shard, tabletType, 10, nil, &sandboxConn{mustFailRetry: 1})
ep2 := hc.addTestEndPoint("cell", "1.1.1.1", 1002, keyspace, shard, tabletType, 10, nil, &sandboxConn{mustFailRetry: 1})
wants := map[string]int{
fmt.Sprintf(`shard, host: ks.0.replica, %+v, retry: err`, ep1): 0,
fmt.Sprintf(`shard, host: ks.0.replica, %+v, retry: err`, ep2): 0,
}
err = f(dg, keyspace, shard, tabletType)
if _, ok := wants[fmt.Sprintf("%v", err)]; !ok {
t.Errorf("wanted error: %+v, got error: %v", wants, err)
}
if hc.GetStatsFromTargetCounter != 3 {
t.Errorf("hc.GetStatsFromTargetCounter = %v; want 3", hc.GetStatsFromTargetCounter)
}
// fatal error
hc.Reset()
ep1 = hc.addTestEndPoint("cell", "1.1.1.1", 1001, keyspace, shard, tabletType, 10, nil, &sandboxConn{mustFailFatal: 1})
ep2 = hc.addTestEndPoint("cell", "1.1.1.1", 1002, keyspace, shard, tabletType, 10, nil, &sandboxConn{mustFailFatal: 1})
wants = map[string]int{
fmt.Sprintf(`shard, host: ks.0.replica, %+v, fatal: err`, ep1): 0,
fmt.Sprintf(`shard, host: ks.0.replica, %+v, fatal: err`, ep2): 0,
}
err = f(dg, keyspace, shard, tabletType)
if _, ok := wants[fmt.Sprintf("%v", err)]; !ok {
t.Errorf("wanted error: %+v, got error: %v", wants, err)
}
wantCounter := 3
if streaming {
// streaming query does not retry on fatal
wantCounter = 1
}
if hc.GetStatsFromTargetCounter != wantCounter {
t.Errorf("hc.GetStatsFromTargetCounter = %v; want %v", hc.GetStatsFromTargetCounter, wantCounter)
}
// server error - no retry
hc.Reset()
ep1 = hc.addTestEndPoint("cell", "1.1.1.1", 1001, keyspace, shard, tabletType, 10, nil, &sandboxConn{mustFailServer: 1})
want = fmt.Sprintf(`shard, host: ks.0.replica, %+v, error: err`, ep1)
err = f(dg, keyspace, shard, tabletType)
verifyShardConnError(t, err, want, vtrpc.ErrorCode_BAD_INPUT)
if hc.GetStatsFromTargetCounter != 1 {
t.Errorf("hc.GetStatsFromTargetCounter = %v; want 1", hc.GetStatsFromTargetCounter)
}
// conn error - no retry
hc.Reset()
ep1 = hc.addTestEndPoint("cell", "1.1.1.1", 1001, keyspace, shard, tabletType, 10, nil, &sandboxConn{mustFailConn: 1})
want = fmt.Sprintf(`shard, host: ks.0.replica, %+v, error: conn`, ep1)
err = f(dg, keyspace, shard, tabletType)
verifyShardConnError(t, err, want, vtrpc.ErrorCode_UNKNOWN_ERROR)
if hc.GetStatsFromTargetCounter != 1 {
t.Errorf("hc.GetStatsFromTargetCounter = %v; want 1", hc.GetStatsFromTargetCounter)
}
// no failure
hc.Reset()
hc.addTestEndPoint("cell", "1.1.1.1", 1001, keyspace, shard, tabletType, 10, nil, &sandboxConn{})
err = f(dg, keyspace, shard, tabletType)
if err != nil {
t.Errorf("want nil, got %v", err)
}
if hc.GetStatsFromTargetCounter != 1 {
t.Errorf("hc.GetStatsFromTargetCounter = %v; want 1", hc.GetStatsFromTargetCounter)
}
}
func testDiscoveryGatewayTransact(t *testing.T, streaming bool, f func(dg Gateway, keyspace, shard string, tabletType pbt.TabletType) error) {
keyspace := "ks"
shard := "0"
tabletType := pbt.TabletType_REPLICA
hc := newFakeHealthCheck()
dg := createDiscoveryGateway(hc, topo.Server{}, nil, "cell", time.Millisecond, 2, time.Second, time.Second, time.Second, nil)
// retry error - no retry
hc.Reset()
ep1 := hc.addTestEndPoint("cell", "1.1.1.1", 1001, keyspace, shard, tabletType, 10, nil, &sandboxConn{mustFailRetry: 1})
ep2 := hc.addTestEndPoint("cell", "1.1.1.1", 1002, keyspace, shard, tabletType, 10, nil, &sandboxConn{mustFailRetry: 1})
wants := map[string]int{
fmt.Sprintf(`shard, host: ks.0.replica, %+v, retry: err`, ep1): 0,
fmt.Sprintf(`shard, host: ks.0.replica, %+v, retry: err`, ep2): 0,
}
err := f(dg, keyspace, shard, tabletType)
if _, ok := wants[fmt.Sprintf("%v", err)]; !ok {
t.Errorf("wanted error: %+v, got error: %v", wants, err)
}
if hc.GetStatsFromTargetCounter != 1 {
t.Errorf("hc.GetStatsFromTargetCounter = %v; want 1", hc.GetStatsFromTargetCounter)
}
// conn error - no retry
hc.Reset()
ep1 = hc.addTestEndPoint("cell", "1.1.1.1", 1001, keyspace, shard, tabletType, 10, nil, &sandboxConn{mustFailConn: 1})
want := fmt.Sprintf(`shard, host: ks.0.replica, %+v, error: conn`, ep1)
err = f(dg, keyspace, shard, tabletType)
verifyShardConnError(t, err, want, vtrpc.ErrorCode_UNKNOWN_ERROR)
if hc.GetStatsFromTargetCounter != 1 {
t.Errorf("hc.GetStatsFromTargetCounter = %v; want 1", hc.GetStatsFromTargetCounter)
}
}
func newFakeHealthCheck() *fakeHealthCheck {
return &fakeHealthCheck{items: make(map[string]*fhcItem)}
}
type fhcItem struct {
eps *discovery.EndPointStats
conn tabletconn.TabletConn
}
type fakeHealthCheck struct {
items map[string]*fhcItem
// stats
GetStatsFromTargetCounter int
GetStatsFromKeyspaceShardCounter int
}
func (fhc *fakeHealthCheck) Reset() {
fhc.GetStatsFromTargetCounter = 0
fhc.GetStatsFromKeyspaceShardCounter = 0
fhc.items = make(map[string]*fhcItem)
}
// SetListener sets the listener for healthcheck updates.
func (fhc *fakeHealthCheck) SetListener(listener discovery.HealthCheckStatsListener) {
}
// AddEndPoint adds the endpoint, and starts health check.
func (fhc *fakeHealthCheck) AddEndPoint(cell string, endPoint *pbt.EndPoint) {
key := discovery.EndPointToMapKey(endPoint)
item := &fhcItem{
eps: &discovery.EndPointStats{
EndPoint: endPoint,
Cell: cell,
},
}
fhc.items[key] = item
}
// RemoveEndPoint removes the endpoint, and stops the health check.
func (fhc *fakeHealthCheck) RemoveEndPoint(endPoint *pbt.EndPoint) {
key := discovery.EndPointToMapKey(endPoint)
delete(fhc.items, key)
}
// GetEndPointStatsFromKeyspaceShard returns all EndPointStats for the given keyspace/shard.
func (fhc *fakeHealthCheck) GetEndPointStatsFromKeyspaceShard(keyspace, shard string) []*discovery.EndPointStats {
fhc.GetStatsFromKeyspaceShardCounter++
var res []*discovery.EndPointStats
for _, item := range fhc.items {
if item.eps.Target == nil {
continue
}
if item.eps.Target.Keyspace == keyspace && item.eps.Target.Shard == shard {
res = append(res, item.eps)
}
}
return res
}
// GetEndPointStatsFromTarget returns all EndPointStats for the given target.
func (fhc *fakeHealthCheck) GetEndPointStatsFromTarget(keyspace, shard string, tabletType pbt.TabletType) []*discovery.EndPointStats {
fhc.GetStatsFromTargetCounter++
var res []*discovery.EndPointStats
for _, item := range fhc.items {
if item.eps.Target == nil {
continue
}
if item.eps.Target.Keyspace == keyspace && item.eps.Target.Shard == shard && item.eps.Target.TabletType == tabletType {
res = append(res, item.eps)
}
}
return res
}
// GetConnection returns the TabletConn of the given endpoint.
func (fhc *fakeHealthCheck) GetConnection(endPoint *pbt.EndPoint) tabletconn.TabletConn {
key := discovery.EndPointToMapKey(endPoint)
if item := fhc.items[key]; item != nil {
return item.conn
}
return nil
}
// CacheStatus returns a displayable version of the cache.
func (fhc *fakeHealthCheck) CacheStatus() discovery.EndPointsCacheStatusList {
return nil
}
func (fhc *fakeHealthCheck) addTestEndPoint(cell, host string, port int32, keyspace, shard string, tabletType pbt.TabletType, reparentTS int64, err error, conn tabletconn.TabletConn) *pbt.EndPoint {
ep := topo.NewEndPoint(0, host)
ep.PortMap["vt"] = port
key := discovery.EndPointToMapKey(ep)
item := fhc.items[key]
if item == nil {
fhc.AddEndPoint(cell, ep)
item = fhc.items[key]
}
item.eps.Target = &pbq.Target{Keyspace: keyspace, Shard: shard, TabletType: tabletType}
item.eps.TabletExternallyReparentedTimestamp = reparentTS
item.eps.LastError = err
item.conn = conn
return ep
}