Add UpdateTabletEndpoints for lock-free single-tablet updates.

This commit is contained in:
Anthony Yeh 2015-06-05 01:39:58 -07:00
Родитель e7feb071ba
Коммит 439b778999
2 изменённых файлов: 233 добавлений и 0 удалений

Просмотреть файл

@@ -232,3 +232,155 @@ func rebuildCellSrvShard(ctx context.Context, log logutil.Logger, ts topo.Server
wg.Wait()
return rec.Error()
}
// updateEndpoint makes sure the EndPoints list for the given cell, keyspace,
// shard and tablet type holds the supplied endpoint. An entry with the same
// Uid is overwritten when it differs from endpoint and left alone when it is
// already equal; if no entry has that Uid, endpoint is appended. The change is
// applied through retryUpdateEndpoints with create enabled, and its error is
// returned as-is.
func updateEndpoint(ctx context.Context, ts topo.Server, cell, keyspace, shard string, tabletType topo.TabletType, endpoint *topo.EndPoint) error {
	// upsert reports whether it actually modified the list it was handed.
	upsert := func(list *topo.EndPoints) bool {
		for idx := range list.Entries {
			current := &list.Entries[idx]
			if current.Uid != endpoint.Uid {
				continue
			}
			// This is the tablet's entry; rewrite it only if it is stale.
			if topo.EndPointEquality(current, endpoint) {
				return false
			}
			*current = *endpoint
			return true
		}
		// No entry carries this Uid yet, so add one.
		list.Entries = append(list.Entries, *endpoint)
		return true
	}
	return retryUpdateEndpoints(ctx, ts, cell, keyspace, shard, tabletType, true /* create */, upsert)
}
// removeEndpoint strips every entry whose Uid equals tabletUID from the
// EndPoints list for the given cell, keyspace, shard and tablet type. The
// change is applied through retryUpdateEndpoints with create disabled. A
// topo.ErrNoNode result is reported as success, because a missing list
// already lacks the endpoint; any other error is returned unchanged.
func removeEndpoint(ctx context.Context, ts topo.Server, cell, keyspace, shard string, tabletType topo.TabletType, tabletUID uint32) error {
	// prune reports whether it dropped at least one entry from the list.
	prune := func(list *topo.EndPoints) bool {
		// Collect the survivors into a fresh slice.
		kept := make([]topo.EndPoint, 0, len(list.Entries))
		for idx := range list.Entries {
			if list.Entries[idx].Uid == tabletUID {
				continue
			}
			kept = append(kept, list.Entries[idx])
		}
		if len(kept) != len(list.Entries) {
			// Something was filtered out, so the list needs to be written back.
			list.Entries = kept
			return true
		}
		// Same length means the UID was not present; skip the write.
		return false
	}

	err := retryUpdateEndpoints(ctx, ts, cell, keyspace, shard, tabletType, false /* create */, prune)
	if err != topo.ErrNoNode {
		return err
	}
	// The list doesn't exist, so the endpoint is already gone.
	return nil
}
// retryUpdateEndpoints applies updateFunc to the EndPoints list for the given
// cell, keyspace, shard and tablet type, and writes the result back with a
// version check, looping until the write goes through, nothing needs to
// change, the context is done, or an unexpected error occurs.
//
// updateFunc mutates the list it receives and returns true if it changed
// anything; returning false ends the call with a nil error and no write.
//
// When the list does not exist and create is true, updateFunc is run against
// an empty list and the result is created. When create is false, a missing
// list surfaces as topo.ErrNoNode. If updateFunc leaves an existing list with
// no entries, the list is deleted instead of being stored empty.
func retryUpdateEndpoints(ctx context.Context, ts topo.Server, cell, keyspace, shard string, tabletType topo.TabletType, create bool, updateFunc func(*topo.EndPoints) bool) error {
	for {
		// Bail out before each attempt if the caller's context is done.
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		current, version, getErr := ts.GetEndPoints(ctx, cell, keyspace, shard, tabletType)

		if getErr == topo.ErrNoNode && create {
			// There is no list yet: build one starting from empty.
			fresh := &topo.EndPoints{}
			if changed := updateFunc(fresh); !changed {
				return nil
			}
			createErr := ts.CreateEndPoints(ctx, cell, keyspace, shard, tabletType, fresh)
			if createErr != topo.ErrNodeExists {
				return createErr
			}
			// Another writer created the list first; start over and update it.
			continue
		}
		if getErr != nil {
			return getErr
		}

		// The list exists. Apply the change to what we read.
		if changed := updateFunc(current); !changed {
			return nil
		}

		// An emptied list is removed rather than stored with zero entries.
		if len(current.Entries) == 0 {
			delErr := ts.DeleteEndPoints(ctx, cell, keyspace, shard, tabletType, version)
			if delErr == topo.ErrNoNode {
				// It is already gone, which is the outcome we wanted.
				return nil
			}
			if delErr == topo.ErrBadVersion {
				// The list changed since we read it; re-read and retry.
				continue
			}
			return delErr
		}

		updErr := ts.UpdateEndPoints(ctx, cell, keyspace, shard, tabletType, current, version)
		switch {
		case updErr == topo.ErrBadVersion:
			// Concurrent modification; retry from a fresh read.
			continue
		case updErr == topo.ErrNoNode && create:
			// The list was deleted underneath us; retry so it gets re-created.
			continue
		}
		return updErr
	}
}
// UpdateTabletEndpoints fixes up any entries in the serving graph that relate
// to a given tablet.
//
// If the tablet reports that it belongs in the serving graph, its endpoint is
// added to (or refreshed in) the list for its current type. For every other
// tablet type returned by GetSrvTabletTypesPerShard for the tablet's cell,
// keyspace and shard, the tablet's UID is removed from that type's list.
// These operations run concurrently; the errors they record are combined
// into the returned error.
func UpdateTabletEndpoints(ctx context.Context, ts topo.Server, tablet *topo.Tablet) error {
	cell, keyspace, shard := tablet.Alias.Cell, tablet.Keyspace, tablet.Shard

	srvTypes, err := ts.GetSrvTabletTypesPerShard(ctx, cell, keyspace, shard)
	switch err {
	case nil:
		// Got the list of existing types.
	case topo.ErrNoNode:
		// Having no existing types is fine; there is just nothing to clean up.
		srvTypes = nil
	default:
		return err
	}

	var (
		wg  sync.WaitGroup
		rec concurrency.AllErrorRecorder
	)

	// Add or refresh the tablet in the list it belongs to, if any.
	if tablet.IsInServingGraph() {
		endpoint, err := tablet.EndPoint()
		if err != nil {
			return err
		}
		wg.Add(1)
		go func() {
			defer wg.Done()
			rec.RecordError(updateEndpoint(ctx, ts, cell, keyspace, shard, tablet.Type, endpoint))
		}()
	}

	// Drop the tablet from the lists of all other types.
	for _, srvType := range srvTypes {
		if srvType == tablet.Type {
			continue
		}
		wg.Add(1)
		go func(staleType topo.TabletType) {
			defer wg.Done()
			rec.RecordError(removeEndpoint(ctx, ts, cell, keyspace, shard, staleType, tablet.Alias.Uid))
		}(srvType)
	}

	wg.Wait()
	return rec.Error()
}

Просмотреть файл

@@ -142,3 +142,84 @@ func TestRebuildShardRace(t *testing.T) {
t.Errorf("second change was overwritten by first rebuild finishing late")
}
}
// TestUpdateTabletEndpoints drives UpdateTabletEndpoints through a sequence of
// tablet changes (initial registration, identical re-update, in-place change,
// type changes, leaving and re-entering serving) and checks the number of
// entries in the MASTER and REPLICA EndPoints lists after each step.
func TestUpdateTabletEndpoints(t *testing.T) {
	ctx := context.Background()
	cell := "test_cell"

	// Set up topology.
	ts := zktopo.NewTestServer(t, []string{cell})
	si, err := GetOrCreateShard(ctx, ts, testKeyspace, testShard)
	if err != nil {
		t.Fatalf("GetOrCreateShard: %v", err)
	}
	si.Cells = append(si.Cells, cell)
	if err := topo.UpdateShard(ctx, ts, si); err != nil {
		t.Fatalf("UpdateShard: %v", err)
	}

	tablet1 := addTablet(ctx, t, ts, 1, cell, topo.TYPE_MASTER).Tablet
	tablet2 := addTablet(ctx, t, ts, 2, cell, topo.TYPE_REPLICA).Tablet

	// update runs UpdateTabletEndpoints and aborts the test on failure.
	update := func(tablet *topo.Tablet) {
		if err := UpdateTabletEndpoints(ctx, ts, tablet); err != nil {
			t.Fatalf("UpdateTabletEndpoints(%v): %v", tablet, err)
		}
	}

	// expect checks that the EndPoints list for tabletType has exactly want
	// entries. A missing list (ErrNoNode) counts as zero entries; a list that
	// exists but is empty is reported as an error.
	expect := func(tabletType topo.TabletType, want int) {
		eps, _, err := ts.GetEndPoints(ctx, cell, testKeyspace, testShard, tabletType)
		if err != nil && err != topo.ErrNoNode {
			t.Errorf("GetEndPoints(%v): %v", tabletType, err)
			return
		}

		var got int
		if err == nil {
			got = len(eps.Entries)
			if got == 0 {
				t.Errorf("len(EndPoints) = 0, expected ErrNoNode instead")
			}
		}
		if got != want {
			// Report got rather than len(eps.Entries): eps may be nil when
			// GetEndPoints returned ErrNoNode, and dereferencing it here would
			// panic instead of reporting the mismatch.
			t.Errorf("len(GetEndPoints(%v)) = %v, want %v. EndPoints = %v", tabletType, got, want, eps)
		}
	}

	// Update tablets. This should create the serving graph dirs too.
	update(tablet1)
	expect(topo.TYPE_MASTER, 1)
	update(tablet2)
	expect(topo.TYPE_REPLICA, 1)

	// Re-update an identical tablet.
	update(tablet1)
	expect(topo.TYPE_MASTER, 1)

	// Change a tablet, but keep it the same type.
	tablet2.Hostname += "extra"
	update(tablet2)
	expect(topo.TYPE_REPLICA, 1)

	// Move the master to replica.
	tablet1.Type = topo.TYPE_REPLICA
	update(tablet1)
	expect(topo.TYPE_MASTER, 0)
	expect(topo.TYPE_REPLICA, 2)

	// Take a replica out of serving.
	tablet1.Type = topo.TYPE_SPARE
	update(tablet1)
	expect(topo.TYPE_MASTER, 0)
	expect(topo.TYPE_REPLICA, 1)

	// Put it back to serving.
	tablet1.Type = topo.TYPE_REPLICA
	update(tablet1)
	expect(topo.TYPE_MASTER, 0)
	expect(topo.TYPE_REPLICA, 2)

	// Move a replica to master.
	tablet2.Type = topo.TYPE_MASTER
	update(tablet2)
	expect(topo.TYPE_MASTER, 1)
	expect(topo.TYPE_REPLICA, 1)
}