This commit is contained in:
Liang Guo 2015-09-17 15:33:37 -07:00
Родитель 7e8c841d8e
Коммит 126d914059
3 изменённых файлов: 40 добавлений и 21 удалений

Просмотреть файл

@ -7,14 +7,15 @@ package vtgate
import (
"errors"
"flag"
"strings"
"time"
"golang.org/x/net/context"
mproto "github.com/youtube/vitess/go/mysql/proto"
"github.com/youtube/vitess/go/stats"
"github.com/youtube/vitess/go/vt/discovery"
pbq "github.com/youtube/vitess/go/vt/proto/query"
pb "github.com/youtube/vitess/go/vt/proto/topodata"
pbt "github.com/youtube/vitess/go/vt/proto/topodata"
tproto "github.com/youtube/vitess/go/vt/tabletserver/proto"
"github.com/youtube/vitess/go/vt/tabletserver/tabletconn"
@ -22,8 +23,9 @@ import (
)
var (
cellsToWatch = flag.String("cells-to-watch", "", "comma-separated list of cells for watching endpoints")
refreshInterval = flag.Duration("endpoint-refresh-interval", 1*time.Minute, "endpoint refresh interval")
cellsToWatch = flag.String("cells-to-watch", "", "comma-separated list of cells for watching endpoints")
refreshInterval = flag.Duration("endpoint-refresh-interval", 1*time.Minute, "endpoint refresh interval")
topoReadConcurrency = flag.Int("topo-read-concurrency", 32, "concurrent topo reads")
)
var errNotImplemented = errors.New("Not implemented")
@ -38,10 +40,11 @@ func init() {
func createDiscoveryGateway(topoServer topo.Server, serv SrvTopoServer, cell string, retryDelay time.Duration, retryCount int, connTimeoutTotal, connTimeoutPerConn, connLife time.Duration, connTimings *stats.MultiTimings) Gateway {
return &discoveryGateway{
topoServer: topoServer,
localCell: cell,
retryDelay: retryDelay,
connTimeout: connTimeoutTotal,
topoServer: topoServer,
localCell: cell,
retryDelay: retryDelay,
connTimeout: connTimeoutTotal,
tabletsWatchers: make([]*discovery.CellTabletsWatcher, 0, 1),
}
}
@ -51,58 +54,63 @@ type discoveryGateway struct {
retryDelay time.Duration
connTimeout time.Duration
//epWatcher *discovery.EndPointWatcher
tabletsWatchers []*discovery.CellTabletsWatcher
}
// InitializeConnections creates connections to VTTablets.
func (dg *discoveryGateway) InitializeConnections(ctx context.Context) error {
//hc := discovery.NewHealthCheck(dg, dg.connTimeout, dg.retryDelay)
//epWatcher := discovery.NewEndPointWatcher(dg.topoServer, hc, *cellsToWatch, *refreshInterval)
//dg.epWatcher = epWatcher
hc := discovery.NewHealthCheck(dg, dg.connTimeout, dg.retryDelay)
for _, cell := range strings.Split(*cellsToWatch, ",") {
ctw := discovery.NewCellTabletsWatcher(dg.topoServer, hc, cell, *refreshInterval, *topoReadConcurrency)
dg.tabletsWatchers = append(dg.tabletsWatchers, ctw)
}
return nil
}
// Execute executes the non-streaming query for the specified keyspace, shard, and tablet type.
func (dg *discoveryGateway) Execute(ctx context.Context, keyspace, shard string, tabletType pb.TabletType, query string, bindVars map[string]interface{}, transactionID int64) (*mproto.QueryResult, error) {
func (dg *discoveryGateway) Execute(ctx context.Context, keyspace, shard string, tabletType pbt.TabletType, query string, bindVars map[string]interface{}, transactionID int64) (*mproto.QueryResult, error) {
return nil, errNotImplemented
}
// ExecuteBatch executes a group of queries for the specified keyspace, shard, and tablet type.
func (dg *discoveryGateway) ExecuteBatch(ctx context.Context, keyspace, shard string, tabletType pb.TabletType, queries []tproto.BoundQuery, asTransaction bool, transactionID int64) (*tproto.QueryResultList, error) {
func (dg *discoveryGateway) ExecuteBatch(ctx context.Context, keyspace, shard string, tabletType pbt.TabletType, queries []tproto.BoundQuery, asTransaction bool, transactionID int64) (*tproto.QueryResultList, error) {
return nil, errNotImplemented
}
// StreamExecute executes a streaming query for the specified keyspace, shard, and tablet type.
func (dg *discoveryGateway) StreamExecute(ctx context.Context, keyspace, shard string, tabletType pb.TabletType, query string, bindVars map[string]interface{}, transactionID int64) (<-chan *mproto.QueryResult, tabletconn.ErrFunc) {
func (dg *discoveryGateway) StreamExecute(ctx context.Context, keyspace, shard string, tabletType pbt.TabletType, query string, bindVars map[string]interface{}, transactionID int64) (<-chan *mproto.QueryResult, tabletconn.ErrFunc) {
return nil, func() error { return errNotImplemented }
}
// Begin starts a transaction for the specified keyspace, shard, and tablet type.
// It returns the transaction ID.
func (dg *discoveryGateway) Begin(ctx context.Context, keyspace string, shard string, tabletType pb.TabletType) (int64, error) {
func (dg *discoveryGateway) Begin(ctx context.Context, keyspace string, shard string, tabletType pbt.TabletType) (int64, error) {
return 0, errNotImplemented
}
// Commit commits the current transaction for the specified keyspace, shard, and tablet type.
func (dg *discoveryGateway) Commit(ctx context.Context, keyspace, shard string, tabletType pb.TabletType, transactionID int64) error {
func (dg *discoveryGateway) Commit(ctx context.Context, keyspace, shard string, tabletType pbt.TabletType, transactionID int64) error {
return errNotImplemented
}
// Rollback rolls back the current transaction for the specified keyspace, shard, and tablet type.
func (dg *discoveryGateway) Rollback(ctx context.Context, keyspace, shard string, tabletType pb.TabletType, transactionID int64) error {
func (dg *discoveryGateway) Rollback(ctx context.Context, keyspace, shard string, tabletType pbt.TabletType, transactionID int64) error {
return errNotImplemented
}
// SplitQuery splits a query into sub-queries for the specified keyspace, shard, and tablet type.
func (dg *discoveryGateway) SplitQuery(ctx context.Context, keyspace, shard string, tabletType pb.TabletType, sql string, bindVariables map[string]interface{}, splitColumn string, splitCount int) ([]tproto.QuerySplit, error) {
func (dg *discoveryGateway) SplitQuery(ctx context.Context, keyspace, shard string, tabletType pbt.TabletType, sql string, bindVariables map[string]interface{}, splitColumn string, splitCount int) ([]tproto.QuerySplit, error) {
return nil, errNotImplemented
}
// Close shuts down underlying connections.
func (dg *discoveryGateway) Close() error {
//dg.epWatcher.Stop()
for _, ctw := range dg.tabletsWatchers {
ctw.Stop()
}
return nil
}
// StatsUpdate receives updates about target and realtime stats changes.
func (dg *discoveryGateway) StatsUpdate(endPoint *pbt.EndPoint, cell string, target *pbq.Target, tabletExternallyReparentedTimestamp int64, stats *pbq.RealtimeStats) {
}

Просмотреть файл

@ -74,6 +74,17 @@ func GetGatewayCreator() GatewayCreator {
gc, ok := gatewayCreators[*GatewayImplementation]
if !ok {
log.Fatalf("No gateway registered as %s", *GatewayImplementation)
return nil
}
return gc
}
// GetGatewayCreatorByName returns the GatewayCreator specified by the given name.
func GetGatewayCreatorByName(name string) GatewayCreator {
gc, ok := gatewayCreators[name]
if !ok {
log.Errorf("No gateway registered as %s", name)
return nil
}
return gc
}

Просмотреть файл

@ -31,7 +31,7 @@ type ScatterConn struct {
timings *stats.MultiTimings
tabletCallErrorCount *stats.MultiCounters
gateway Gateway
additionalGateway Gateway
additionalGateway Gateway // temporarily to enable health checking
}
// shardActionFunc defines the contract for a shard action. Every such function
@ -61,7 +61,7 @@ func NewScatterConn(topoServer topo.Server, serv SrvTopoServer, statsName, cell
// temporarily start other gateways for health checking
if additionalGateway != "" {
if gc, ok := gatewayCreators[additionalGateway]; ok {
if gc := GetGatewayCreatorByName(additionalGateway); gc != nil {
sc.additionalGateway = gc(topoServer, serv, cell, retryDelay, retryCount, connTimeoutTotal, connTimeoutPerConn, connLife, connTimings)
}
}