diff --git a/go/vt/vtgate/discoverygateway.go b/go/vt/vtgate/discoverygateway.go index b42e6926d3..7cac2a999b 100644 --- a/go/vt/vtgate/discoverygateway.go +++ b/go/vt/vtgate/discoverygateway.go @@ -7,14 +7,15 @@ package vtgate import ( "errors" "flag" + "strings" "time" "golang.org/x/net/context" mproto "github.com/youtube/vitess/go/mysql/proto" "github.com/youtube/vitess/go/stats" + "github.com/youtube/vitess/go/vt/discovery" pbq "github.com/youtube/vitess/go/vt/proto/query" - pb "github.com/youtube/vitess/go/vt/proto/topodata" pbt "github.com/youtube/vitess/go/vt/proto/topodata" tproto "github.com/youtube/vitess/go/vt/tabletserver/proto" "github.com/youtube/vitess/go/vt/tabletserver/tabletconn" @@ -22,8 +23,9 @@ import ( ) var ( - cellsToWatch = flag.String("cells-to-watch", "", "comma-separated list of cells for watching endpoints") - refreshInterval = flag.Duration("endpoint-refresh-interval", 1*time.Minute, "endpoint refresh interval") + cellsToWatch = flag.String("cells-to-watch", "", "comma-separated list of cells for watching endpoints") + refreshInterval = flag.Duration("endpoint-refresh-interval", 1*time.Minute, "endpoint refresh interval") + topoReadConcurrency = flag.Int("topo-read-concurrency", 32, "concurrent topo reads") ) var errNotImplemented = errors.New("Not implemented") @@ -38,10 +40,11 @@ func init() { func createDiscoveryGateway(topoServer topo.Server, serv SrvTopoServer, cell string, retryDelay time.Duration, retryCount int, connTimeoutTotal, connTimeoutPerConn, connLife time.Duration, connTimings *stats.MultiTimings) Gateway { return &discoveryGateway{ - topoServer: topoServer, - localCell: cell, - retryDelay: retryDelay, - connTimeout: connTimeoutTotal, + topoServer: topoServer, + localCell: cell, + retryDelay: retryDelay, + connTimeout: connTimeoutTotal, + tabletsWatchers: make([]*discovery.CellTabletsWatcher, 0, 1), } } @@ -51,58 +54,63 @@ type discoveryGateway struct { retryDelay time.Duration connTimeout time.Duration - //epWatcher *discovery.EndPointWatcher + tabletsWatchers []*discovery.CellTabletsWatcher } // InitializeConnections creates connections to VTTablets. func (dg *discoveryGateway) InitializeConnections(ctx context.Context) error { - //hc := discovery.NewHealthCheck(dg, dg.connTimeout, dg.retryDelay) - //epWatcher := discovery.NewEndPointWatcher(dg.topoServer, hc, *cellsToWatch, *refreshInterval) - //dg.epWatcher = epWatcher + hc := discovery.NewHealthCheck(dg, dg.connTimeout, dg.retryDelay) + for _, cell := range strings.Split(*cellsToWatch, ",") { + ctw := discovery.NewCellTabletsWatcher(dg.topoServer, hc, cell, *refreshInterval, *topoReadConcurrency) + dg.tabletsWatchers = append(dg.tabletsWatchers, ctw) + } return nil } // Execute executes the non-streaming query for the specified keyspace, shard, and tablet type. -func (dg *discoveryGateway) Execute(ctx context.Context, keyspace, shard string, tabletType pb.TabletType, query string, bindVars map[string]interface{}, transactionID int64) (*mproto.QueryResult, error) { +func (dg *discoveryGateway) Execute(ctx context.Context, keyspace, shard string, tabletType pbt.TabletType, query string, bindVars map[string]interface{}, transactionID int64) (*mproto.QueryResult, error) { return nil, errNotImplemented } // ExecuteBatch executes a group of queries for the specified keyspace, shard, and tablet type. -func (dg *discoveryGateway) ExecuteBatch(ctx context.Context, keyspace, shard string, tabletType pb.TabletType, queries []tproto.BoundQuery, asTransaction bool, transactionID int64) (*tproto.QueryResultList, error) { +func (dg *discoveryGateway) ExecuteBatch(ctx context.Context, keyspace, shard string, tabletType pbt.TabletType, queries []tproto.BoundQuery, asTransaction bool, transactionID int64) (*tproto.QueryResultList, error) { return nil, errNotImplemented } // StreamExecute executes a streaming query for the specified keyspace, shard, and tablet type. -func (dg *discoveryGateway) StreamExecute(ctx context.Context, keyspace, shard string, tabletType pb.TabletType, query string, bindVars map[string]interface{}, transactionID int64) (<-chan *mproto.QueryResult, tabletconn.ErrFunc) { +func (dg *discoveryGateway) StreamExecute(ctx context.Context, keyspace, shard string, tabletType pbt.TabletType, query string, bindVars map[string]interface{}, transactionID int64) (<-chan *mproto.QueryResult, tabletconn.ErrFunc) { return nil, func() error { return errNotImplemented } } // Begin starts a transaction for the specified keyspace, shard, and tablet type. // It returns the transaction ID. -func (dg *discoveryGateway) Begin(ctx context.Context, keyspace string, shard string, tabletType pb.TabletType) (int64, error) { +func (dg *discoveryGateway) Begin(ctx context.Context, keyspace string, shard string, tabletType pbt.TabletType) (int64, error) { return 0, errNotImplemented } // Commit commits the current transaction for the specified keyspace, shard, and tablet type. -func (dg *discoveryGateway) Commit(ctx context.Context, keyspace, shard string, tabletType pb.TabletType, transactionID int64) error { +func (dg *discoveryGateway) Commit(ctx context.Context, keyspace, shard string, tabletType pbt.TabletType, transactionID int64) error { return errNotImplemented } // Rollback rolls back the current transaction for the specified keyspace, shard, and tablet type. -func (dg *discoveryGateway) Rollback(ctx context.Context, keyspace, shard string, tabletType pb.TabletType, transactionID int64) error { +func (dg *discoveryGateway) Rollback(ctx context.Context, keyspace, shard string, tabletType pbt.TabletType, transactionID int64) error { return errNotImplemented } // SplitQuery splits a query into sub-queries for the specified keyspace, shard, and tablet type. -func (dg *discoveryGateway) SplitQuery(ctx context.Context, keyspace, shard string, tabletType pb.TabletType, sql string, bindVariables map[string]interface{}, splitColumn string, splitCount int) ([]tproto.QuerySplit, error) { +func (dg *discoveryGateway) SplitQuery(ctx context.Context, keyspace, shard string, tabletType pbt.TabletType, sql string, bindVariables map[string]interface{}, splitColumn string, splitCount int) ([]tproto.QuerySplit, error) { return nil, errNotImplemented } // Close shuts down underlying connections. func (dg *discoveryGateway) Close() error { - //dg.epWatcher.Stop() + for _, ctw := range dg.tabletsWatchers { + ctw.Stop() + } return nil } +// StatsUpdate receives updates about target and realtime stats changes. func (dg *discoveryGateway) StatsUpdate(endPoint *pbt.EndPoint, cell string, target *pbq.Target, tabletExternallyReparentedTimestamp int64, stats *pbq.RealtimeStats) { } diff --git a/go/vt/vtgate/gateway.go b/go/vt/vtgate/gateway.go index 97addb7059..5ac0e4d0cf 100644 --- a/go/vt/vtgate/gateway.go +++ b/go/vt/vtgate/gateway.go @@ -74,6 +74,17 @@ func GetGatewayCreator() GatewayCreator { gc, ok := gatewayCreators[*GatewayImplementation] if !ok { log.Fatalf("No gateway registered as %s", *GatewayImplementation) + return nil + } + return gc +} + +// GetGatewayCreatorByName returns the GatewayCreator specified by the given name. +func GetGatewayCreatorByName(name string) GatewayCreator { + gc, ok := gatewayCreators[name] + if !ok { + log.Errorf("No gateway registered as %s", name) + return nil } return gc } diff --git a/go/vt/vtgate/scatter_conn.go b/go/vt/vtgate/scatter_conn.go index d820eb21e2..ae1b52f29e 100644 --- a/go/vt/vtgate/scatter_conn.go +++ b/go/vt/vtgate/scatter_conn.go @@ -31,7 +31,7 @@ type ScatterConn struct { timings *stats.MultiTimings tabletCallErrorCount *stats.MultiCounters gateway Gateway - additionalGateway Gateway + additionalGateway Gateway // temporarily to enable health checking } // shardActionFunc defines the contract for a shard action. Every such function @@ -61,7 +61,7 @@ func NewScatterConn(topoServer topo.Server, serv SrvTopoServer, statsName, cell // temporarily start other gateways for health checking if additionalGateway != "" { - if gc, ok := gatewayCreators[additionalGateway]; ok { + if gc := GetGatewayCreatorByName(additionalGateway); gc != nil { sc.additionalGateway = gc(topoServer, serv, cell, retryDelay, retryCount, connTimeoutTotal, connTimeoutPerConn, connLife, connTimings) } }