Merge pull request #973 from alainjobart/replication

Replication
This commit is contained in:
Alain Jobart 2015-08-07 13:21:01 -07:00
Родитель fb684f89d4 7d1225e19c
Коммит f5b1c7697c
16 изменённых файлов: 115 добавлений и 115 удалений

Просмотреть файл

@ -82,7 +82,7 @@ func (th *tabletHealth) stream(ctx context.Context, ts topo.Server, tabletAlias
if err != nil {
return err
}
ep, err := ti.EndPoint()
ep, err := topo.TabletEndPoint(ti.Tablet)
if err != nil {
return err
}

Просмотреть файл

@ -40,9 +40,9 @@ import (
// |----01
// |--- create_table_table_03.sql
// Schema Change Files: ${keyspace}/input/*.sql
// Error Files: ${keysapce}/error/${YYYY}/${MM}/${DD}/*.sql
// Log Files: ${keysapce}/log/${YYYY}/${MM}/${DD}/*.sql
// Complete Files: ${keysapce}/compelte/${YYYY}/${MM}/${DD}/*.sql
// Error Files: ${keyspace}/error/${YYYY}/${MM}/${DD}/*.sql
// Log Files: ${keyspace}/log/${YYYY}/${MM}/${DD}/*.sql
// Complete Files: ${keyspace}/complete/${YYYY}/${MM}/${DD}/*.sql
type LocalController struct {
schemaChangeDir string
keyspace string

Просмотреть файл

@ -54,7 +54,7 @@ func (agent *ActionAgent) allowQueries(tablet *topo.Tablet, blacklistedTables []
if agent.DBConfigs != nil {
// Update our DB config to match the info we have in the tablet
if agent.DBConfigs.App.DbName == "" {
agent.DBConfigs.App.DbName = tablet.DbName()
agent.DBConfigs.App.DbName = topo.TabletDbName(tablet)
}
agent.DBConfigs.App.Keyspace = tablet.Keyspace
agent.DBConfigs.App.Shard = tablet.Shard
@ -114,7 +114,7 @@ func (agent *ActionAgent) loadKeyspaceAndBlacklistRules(tablet *topo.Tablet, bla
blacklistRules := tabletserver.NewQueryRules()
if len(blacklistedTables) > 0 {
// tables, first resolve wildcards
tables, err := mysqlctl.ResolveTables(agent.MysqlDaemon, tablet.DbName(), blacklistedTables)
tables, err := mysqlctl.ResolveTables(agent.MysqlDaemon, topo.TabletDbName(tablet), blacklistedTables)
if err != nil {
return err
}
@ -149,7 +149,7 @@ func (agent *ActionAgent) changeCallback(ctx context.Context, oldTablet, newTabl
span.StartLocal("ActionAgent.changeCallback")
defer span.Finish()
allowQuery := newTablet.IsRunningQueryService()
allowQuery := topo.IsRunningQueryService(newTablet.Type)
// Read the shard to get SourceShards / TabletControlMap if
// we're going to use it.

Просмотреть файл

@ -376,7 +376,7 @@ func (agent *ActionAgent) verifyTopology(ctx context.Context) error {
func (agent *ActionAgent) verifyServingAddrs(ctx context.Context) error {
ti := agent.Tablet()
if !ti.IsRunningQueryService() {
if !topo.IsRunningQueryService(ti.Type) {
return nil
}

Просмотреть файл

@ -434,7 +434,7 @@ func (blm *BinlogPlayerMap) RefreshMap(ctx context.Context, tablet *topo.Tablet,
blm.mu.Lock()
if blm.dbConfig.DbName == "" {
blm.dbConfig.DbName = tablet.DbName()
blm.dbConfig.DbName = topo.TabletDbName(tablet)
}
// get the existing sources and build a map of sources to remove
@ -447,7 +447,7 @@ func (blm *BinlogPlayerMap) RefreshMap(ctx context.Context, tablet *topo.Tablet,
// for each source, add it if not there, and delete from toRemove
for _, sourceShard := range shardInfo.SourceShards {
blm.addPlayer(ctx, tablet.Alias.Cell, keyspaceInfo.ShardingColumnType, key.KeyRangeToProto(tablet.KeyRange), sourceShard, tablet.DbName())
blm.addPlayer(ctx, tablet.Alias.Cell, keyspaceInfo.ShardingColumnType, key.KeyRangeToProto(tablet.KeyRange), sourceShard, topo.TabletDbName(tablet))
delete(toRemove, sourceShard.Uid)
}
hasPlayers := len(shardInfo.SourceShards) > 0

Просмотреть файл

@ -291,7 +291,7 @@ func (agent *ActionAgent) runHealthCheck(targetTabletType topo.TabletType) {
if tablet.Type == topo.TYPE_SPARE {
newTabletType = targetTabletType
}
if tablet.Type == newTabletType && tablet.IsHealthEqual(health) {
if tablet.Type == newTabletType && topo.IsHealthEqual(health, tablet.Health) {
// no change in health, not logging anything,
// and we're done
return

Просмотреть файл

@ -164,8 +164,8 @@ func (agent *ActionAgent) InitTablet(port, gRPCPort int) error {
if gRPCPort != 0 {
tablet.Portmap["grpc"] = gRPCPort
}
if err := tablet.Complete(); err != nil {
return fmt.Errorf("InitTablet tablet.Complete failed: %v", err)
if err := topo.TabletComplete(tablet); err != nil {
return fmt.Errorf("InitTablet TabletComplete failed: %v", err)
}
// now try to create the record
@ -173,7 +173,7 @@ func (agent *ActionAgent) InitTablet(port, gRPCPort int) error {
switch err {
case nil:
// it worked, we're good, can update the replication graph
if tablet.IsInReplicationGraph() {
if topo.IsInReplicationGraph(tablet.Type) {
if err := topo.UpdateTabletReplicationData(ctx, agent.TopoServer, tablet); err != nil {
return fmt.Errorf("UpdateTabletReplicationData failed: %v", err)
}

Просмотреть файл

@ -105,7 +105,7 @@ func (agent *ActionAgent) TabletExternallyReparented(ctx context.Context, extern
// this will be enough for clients to re-resolve the new master.
event.DispatchUpdate(ev, "writing new master endpoint")
log.Infof("fastTabletExternallyReparented: writing new master endpoint to serving graph")
ep, err := tablet.EndPoint()
ep, err := topo.TabletEndPoint(tablet.Tablet)
if err != nil {
return fmt.Errorf("fastTabletExternallyReparented: failed to generate EndPoint for tablet %v: %v", tablet.Alias, err)
}

Просмотреть файл

@ -15,7 +15,6 @@ import (
"golang.org/x/net/context"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/jscfg"
"github.com/youtube/vitess/go/netutil"
"github.com/youtube/vitess/go/trace"
"github.com/youtube/vitess/go/vt/key"
@ -356,20 +355,25 @@ type Tablet struct {
KeyRange key.KeyRange
}
// String returns a string describing the tablet.
func (tablet *Tablet) String() string {
return fmt.Sprintf("Tablet{%v}", tablet.Alias)
}
// ValidatePortmap returns an error if the tablet's portmap doesn't
// contain all the necessary ports for the tablet to be fully
// operational. We only care about vt port now, as mysql may not even
// be running.
func (tablet *Tablet) ValidatePortmap() error {
func TabletValidatePortMap(tablet *Tablet) error {
if _, ok := tablet.Portmap["vt"]; !ok {
return fmt.Errorf("no vt port available")
}
return nil
}
// EndPoint returns an EndPoint associated with the tablet record
func (tablet *Tablet) EndPoint() (*pb.EndPoint, error) {
if err := tablet.ValidatePortmap(); err != nil {
// TabletEndPoint returns an EndPoint associated with the tablet record
func TabletEndPoint(tablet *Tablet) (*pb.EndPoint, error) {
if err := TabletValidatePortMap(tablet); err != nil {
return nil, err
}
@ -387,24 +391,9 @@ func (tablet *Tablet) EndPoint() (*pb.EndPoint, error) {
return entry, nil
}
// Addr returns hostname:vt port.
func (tablet *Tablet) Addr() string {
return netutil.JoinHostPort(tablet.Hostname, int32(tablet.Portmap["vt"]))
}
// MysqlAddr returns hostname:mysql port.
func (tablet *Tablet) MysqlAddr() string {
return netutil.JoinHostPort(tablet.Hostname, int32(tablet.Portmap["mysql"]))
}
// MysqlIPAddr returns ip:mysql port.
func (tablet *Tablet) MysqlIPAddr() string {
return netutil.JoinHostPort(tablet.IPAddr, int32(tablet.Portmap["mysql"]))
}
// DbName is usually implied by keyspace. Having the shard information in the
// TabletDbName is usually implied by keyspace. Having the shard information in the
// database name complicates mysql replication.
func (tablet *Tablet) DbName() string {
func TabletDbName(tablet *Tablet) string {
if tablet.DbNameOverride != "" {
return tablet.DbNameOverride
}
@ -414,58 +403,9 @@ func (tablet *Tablet) DbName() string {
return vtDbPrefix + tablet.Keyspace
}
// IsInServingGraph returns if this tablet is in the serving graph
func (tablet *Tablet) IsInServingGraph() bool {
return IsInServingGraph(tablet.Type)
}
// IsRunningQueryService returns if this tablet should be running
// the query service.
func (tablet *Tablet) IsRunningQueryService() bool {
return IsRunningQueryService(tablet.Type)
}
// IsInReplicationGraph returns if this tablet is in the replication graph.
func (tablet *Tablet) IsInReplicationGraph() bool {
return IsInReplicationGraph(tablet.Type)
}
// IsSlaveType returns if this tablet's type is a slave
func (tablet *Tablet) IsSlaveType() bool {
return IsSlaveType(tablet.Type)
}
// IsAssigned returns if this tablet ever assigned data? A "scrap" node will
// show up as assigned even though its data cannot be used for serving.
func (tablet *Tablet) IsAssigned() bool {
return tablet.Keyspace != "" && tablet.Shard != ""
}
// String returns a string describing the tablet.
func (tablet *Tablet) String() string {
return fmt.Sprintf("Tablet{%v}", tablet.Alias)
}
// JSON returns a json verison of the tablet.
func (tablet *Tablet) JSON() string {
return jscfg.ToJSON(tablet)
}
// TabletInfo is the container for a Tablet, read from the topology server.
type TabletInfo struct {
version int64 // node version - used to prevent stomping concurrent writes
*Tablet
}
// Version returns the version of this tablet from last time it was read or
// updated.
func (ti *TabletInfo) Version() int64 {
return ti.version
}
// Complete validates and normalizes the tablet. If the shard name
// TabletComplete validates and normalizes the tablet. If the shard name
// contains a '-' it is going to try to infer the keyrange from it.
func (tablet *Tablet) Complete() error {
func TabletComplete(tablet *Tablet) error {
shard, kr, err := ValidateShardName(tablet.Shard)
if err != nil {
return err
@ -475,14 +415,64 @@ func (tablet *Tablet) Complete() error {
return nil
}
// IsHealthEqual compares the tablet's health with the passed one, and
// TabletInfo is the container for a Tablet, read from the topology server.
type TabletInfo struct {
version int64 // node version - used to prevent stomping concurrent writes
*Tablet
}
// Addr returns hostname:vt port.
func (tablet *TabletInfo) Addr() string {
return netutil.JoinHostPort(tablet.Hostname, int32(tablet.Portmap["vt"]))
}
// MysqlAddr returns hostname:mysql port.
func (tablet *TabletInfo) MysqlAddr() string {
return netutil.JoinHostPort(tablet.Hostname, int32(tablet.Portmap["mysql"]))
}
// IsAssigned returns if this tablet ever assigned data?
// A "scrap" node will show up as assigned even though its data
// cannot be used for serving.
func (tablet *TabletInfo) IsAssigned() bool {
return tablet.Keyspace != "" && tablet.Shard != ""
}
// DbName is usually implied by keyspace. Having the shard information in the
// database name complicates mysql replication.
func (tablet *TabletInfo) DbName() string {
return TabletDbName(tablet.Tablet)
}
// Version returns the version of this tablet from last time it was read or
// updated.
func (ti *TabletInfo) Version() int64 {
return ti.version
}
// IsInServingGraph returns if this tablet is in the serving graph
func (tablet *TabletInfo) IsInServingGraph() bool {
return IsInServingGraph(tablet.Type)
}
// IsInReplicationGraph returns if this tablet is in the replication graph.
func (tablet *TabletInfo) IsInReplicationGraph() bool {
return IsInReplicationGraph(tablet.Type)
}
// IsSlaveType returns if this tablet's type is a slave
func (tablet *TabletInfo) IsSlaveType() bool {
return IsSlaveType(tablet.Type)
}
// IsHealthEqual compares the two health maps, and
// returns true if they're equivalent.
func (tablet *Tablet) IsHealthEqual(health map[string]string) bool {
if len(health) == 0 && len(tablet.Health) == 0 {
func IsHealthEqual(left, right map[string]string) bool {
if len(left) == 0 && len(right) == 0 {
return true
}
return reflect.DeepEqual(health, tablet.Health)
return reflect.DeepEqual(left, right)
}
// NewTabletInfo returns a TabletInfo basing on tablet with the
@ -593,7 +583,7 @@ func CreateTablet(ctx context.Context, ts Server, tablet *Tablet) error {
}
// Then add the tablet to the replication graphs
if !tablet.IsInReplicationGraph() {
if !IsInReplicationGraph(tablet.Type) {
return nil
}

Просмотреть файл

@ -110,7 +110,7 @@ func rebuildCellSrvShard(ctx context.Context, log logutil.Logger, ts topo.Server
endpoints = topo.NewEndPoints()
serving[tablet.Type] = endpoints
}
entry, err := tablet.EndPoint()
entry, err := topo.TabletEndPoint(tablet.Tablet)
if err != nil {
log.Warningf("EndPointForTablet failed for tablet %v: %v", tablet.Alias, err)
continue
@ -365,8 +365,8 @@ func UpdateTabletEndpoints(ctx context.Context, ts topo.Server, tablet *topo.Tab
errs := concurrency.AllErrorRecorder{}
// Update the list that the tablet is supposed to be in (if any).
if tablet.IsInServingGraph() {
endpoint, err := tablet.EndPoint()
if topo.IsInServingGraph(tablet.Type) {
endpoint, err := topo.TabletEndPoint(tablet)
if err != nil {
return err
}

Просмотреть файл

@ -41,7 +41,7 @@ func (tn *TabletNode) ShortName() string {
}
func newTabletNodeFromTabletInfo(ti *topo.TabletInfo) *TabletNode {
if err := ti.ValidatePortmap(); err != nil {
if err := topo.TabletValidatePortMap(ti.Tablet); err != nil {
log.Errorf("ValidatePortmap(%v): %v", ti.Alias, err)
}
return &TabletNode{

Просмотреть файл

@ -218,7 +218,7 @@ func commandVtTabletExecute(ctx context.Context, wr *wrangler.Wrangler, subFlags
if err != nil {
return err
}
ep, err := tabletInfo.EndPoint()
ep, err := topo.TabletEndPoint(tabletInfo.Tablet)
if err != nil {
return fmt.Errorf("cannot get EndPoint from tablet record: %v", err)
}
@ -255,7 +255,7 @@ func commandVtTabletStreamHealth(ctx context.Context, wr *wrangler.Wrangler, sub
return err
}
ep, err := tabletInfo.EndPoint()
ep, err := topo.TabletEndPoint(tabletInfo.Tablet)
if err != nil {
return fmt.Errorf("cannot get EndPoint from tablet record: %v", err)
}

Просмотреть файл

@ -40,7 +40,7 @@ func NewQueryResultReaderForTablet(ctx context.Context, ts topo.Server, tabletAl
return nil, err
}
endPoint, err := tablet.EndPoint()
endPoint, err := topo.TabletEndPoint(tablet.Tablet)
if err != nil {
return nil, err
}

Просмотреть файл

@ -346,7 +346,7 @@ func (wr *Wrangler) applySchemaShardComplex(ctx context.Context, statusArray []*
if err != nil {
return nil, err
}
typeChangeRequired := ti.Tablet.IsInServingGraph()
typeChangeRequired := ti.IsInServingGraph()
if typeChangeRequired {
// note we want to update the serving graph there
err = wr.changeTypeInternal(ctx, ti.Alias, topo.TYPE_SCHEMA_UPGRADE)
@ -454,13 +454,13 @@ func (wr *Wrangler) CopySchemaShard(ctx context.Context, sourceTabletAlias topo.
}
}
createSql := sourceSd.ToSQLStrings()
createSQL := sourceSd.ToSQLStrings()
destTabletInfo, err := wr.ts.GetTablet(ctx, topo.ProtoToTabletAlias(destShardInfo.MasterAlias))
if err != nil {
return err
}
for i, sqlLine := range createSql {
err = wr.applySqlShard(ctx, destTabletInfo, sqlLine, i == len(createSql)-1)
for i, sqlLine := range createSQL {
err = wr.applySQLShard(ctx, destTabletInfo, sqlLine, i == len(createSQL)-1)
if err != nil {
return err
}
@ -468,14 +468,14 @@ func (wr *Wrangler) CopySchemaShard(ctx context.Context, sourceTabletAlias topo.
return nil
}
// applySqlShard applies a given SQL change on a given tablet alias. It allows executing arbitrary
// applySQLShard applies a given SQL change on a given tablet alias. It allows executing arbitrary
// SQL statements, but doesn't return any results, so it's only useful for SQL statements
// that would be run for their effects (e.g., CREATE).
// It works by applying the SQL statement on the shard's master tablet with replication turned on.
// Thus it should be used only for changes that can be applied on a live instance without causing issues;
// it shouldn't be used for anything that will require a pivot.
// The SQL statement string is expected to have {{.DatabaseName}} in place of the actual db name.
func (wr *Wrangler) applySqlShard(ctx context.Context, tabletInfo *topo.TabletInfo, change string, reloadSchema bool) error {
func (wr *Wrangler) applySQLShard(ctx context.Context, tabletInfo *topo.TabletInfo, change string, reloadSchema bool) error {
filledChange, err := fillStringTemplate(change, map[string]string{"DatabaseName": tabletInfo.DbName()})
if err != nil {
return fmt.Errorf("fillStringTemplate failed: %v", err)

Просмотреть файл

@ -25,11 +25,11 @@ import (
// If Force is true, and a tablet with the same ID already exists, it
// will be scrapped and deleted, and then recreated.
func (wr *Wrangler) InitTablet(ctx context.Context, tablet *topo.Tablet, force, createShardAndKeyspace, update bool) error {
if err := tablet.Complete(); err != nil {
if err := topo.TabletComplete(tablet); err != nil {
return err
}
if tablet.IsInReplicationGraph() {
if topo.IsInReplicationGraph(tablet.Type) {
// get the shard, possibly creating it
var err error
var si *topo.ShardInfo
@ -75,7 +75,7 @@ func (wr *Wrangler) InitTablet(ctx context.Context, tablet *topo.Tablet, force,
wr.Logger().Warningf("failed updating tablet %v: %v", tablet.Alias, err)
// now fall through the Scrap case
} else {
if !tablet.IsInReplicationGraph() {
if !topo.IsInReplicationGraph(tablet.Type) {
return nil
}
@ -115,7 +115,7 @@ func (wr *Wrangler) Scrap(ctx context.Context, tabletAlias topo.TabletAlias, for
if err != nil {
return err
}
rebuildRequired := ti.Tablet.IsInServingGraph()
rebuildRequired := ti.IsInServingGraph()
wasMaster := ti.Type == topo.TYPE_MASTER
if force {
@ -219,13 +219,13 @@ func (wr *Wrangler) ChangeTypeNoRebuild(ctx context.Context, tabletAlias topo.Ta
}
}
if !ti.Tablet.IsInServingGraph() {
if !ti.IsInServingGraph() {
// re-read the tablet, see if we become serving
ti, err = wr.ts.GetTablet(ctx, tabletAlias)
if err != nil {
return false, "", "", "", err
}
if !ti.Tablet.IsInServingGraph() {
if !ti.IsInServingGraph() {
return false, "", "", "", nil
}
}
@ -240,7 +240,7 @@ func (wr *Wrangler) changeTypeInternal(ctx context.Context, tabletAlias topo.Tab
if err != nil {
return err
}
rebuildRequired := ti.Tablet.IsInServingGraph()
rebuildRequired := ti.IsInServingGraph()
// change the type
if err := wr.tmc.ChangeType(ctx, ti, dbType); err != nil {

Просмотреть файл

@ -52,8 +52,13 @@ func tabletInfoFromJSON(data string, version int64) (*topo.TabletInfo, error) {
func (zkts *Server) CreateTablet(ctx context.Context, tablet *topo.Tablet) error {
zkTabletPath := TabletPathForAlias(tablet.Alias)
data, err := json.MarshalIndent(tablet, " ", " ")
if err != nil {
return err
}
// Create /zk/<cell>/vt/tablets/<uid>
_, err := zk.CreateRecursive(zkts.zconn, zkTabletPath, tablet.JSON(), 0, zookeeper.WorldACL(zookeeper.PERM_ALL))
_, err = zk.CreateRecursive(zkts.zconn, zkTabletPath, string(data), 0, zookeeper.WorldACL(zookeeper.PERM_ALL))
if err != nil {
if zookeeper.IsError(err, zookeeper.ZNODEEXISTS) {
err = topo.ErrNodeExists
@ -71,7 +76,12 @@ func (zkts *Server) CreateTablet(ctx context.Context, tablet *topo.Tablet) error
// UpdateTablet is part of the topo.Server interface
func (zkts *Server) UpdateTablet(ctx context.Context, tablet *topo.TabletInfo, existingVersion int64) (int64, error) {
zkTabletPath := TabletPathForAlias(tablet.Alias)
stat, err := zkts.zconn.Set(zkTabletPath, tablet.JSON(), int(existingVersion))
data, err := json.MarshalIndent(tablet.Tablet, " ", " ")
if err != nil {
return 0, err
}
stat, err := zkts.zconn.Set(zkTabletPath, string(data), int(existingVersion))
if err != nil {
if zookeeper.IsError(err, zookeeper.ZBADVERSION) {
err = topo.ErrBadVersion