This commit is contained in:
Liang Guo 2015-06-18 17:49:11 -07:00
Родитель 81813d2db7
Коммит 390078c10e
1 изменённых файлов: 24 добавлений и 24 удалений

Просмотреть файл

@ -23,7 +23,7 @@ import (
var (
srvTopoCacheTTL = flag.Duration("srv_topo_cache_ttl", 1*time.Second, "how long to use cached entries for topology")
enableRemoteMaster = flag.Bool("enable_remote_master", false, "enable remote master access")
srvTopoTimeout = flag.Duration("srv_topo_timeout", 1*time.Second, "topo server timeout")
srvTopoTimeout = flag.Duration("srv_topo_timeout", 2*time.Second, "topo server timeout")
)
const (
@ -242,17 +242,17 @@ func (server *ResilientSrvTopoServer) GetSrvKeyspaceNames(ctx context.Context, c
}
// not in cache or too old, get the real value
ctx, cancel := context.WithTimeout(context.Background(), *srvTopoTimeout)
newCtx, cancel := context.WithTimeout(context.Background(), *srvTopoTimeout)
defer cancel()
result, err := server.topoServer.GetSrvKeyspaceNames(ctx, cell)
result, err := server.topoServer.GetSrvKeyspaceNames(newCtx, cell)
if err != nil {
if entry.insertionTime.IsZero() {
server.counts.Add(errorCategory, 1)
log.Errorf("GetSrvKeyspaceNames(%v, %v) failed: %v (no cached value, caching and returning error)", ctx, cell, err)
log.Errorf("GetSrvKeyspaceNames(%v, %v) failed: %v (no cached value, caching and returning error)", newCtx, cell, err)
} else {
server.counts.Add(cachedCategory, 1)
log.Warningf("GetSrvKeyspaceNames(%v, %v) failed: %v (returning cached value: %v %v)", ctx, cell, err, entry.value, entry.lastError)
log.Warningf("GetSrvKeyspaceNames(%v, %v) failed: %v (returning cached value: %v %v)", newCtx, cell, err, entry.value, entry.lastError)
return entry.value, entry.lastError
}
}
@ -261,7 +261,7 @@ func (server *ResilientSrvTopoServer) GetSrvKeyspaceNames(ctx context.Context, c
entry.insertionTime = time.Now()
entry.value = result
entry.lastError = err
entry.lastErrorCtx = ctx
entry.lastErrorCtx = newCtx
return result, err
}
@ -294,17 +294,17 @@ func (server *ResilientSrvTopoServer) GetSrvKeyspace(ctx context.Context, cell,
}
// not in cache or too old, get the real value
ctx, cancel := context.WithTimeout(context.Background(), *srvTopoTimeout)
newCtx, cancel := context.WithTimeout(context.Background(), *srvTopoTimeout)
defer cancel()
result, err := server.topoServer.GetSrvKeyspace(ctx, cell, keyspace)
result, err := server.topoServer.GetSrvKeyspace(newCtx, cell, keyspace)
if err != nil {
if entry.insertionTime.IsZero() {
server.counts.Add(errorCategory, 1)
log.Errorf("GetSrvKeyspace(%v, %v, %v) failed: %v (no cached value, caching and returning error)", ctx, cell, keyspace, err)
log.Errorf("GetSrvKeyspace(%v, %v, %v) failed: %v (no cached value, caching and returning error)", newCtx, cell, keyspace, err)
} else {
server.counts.Add(cachedCategory, 1)
log.Warningf("GetSrvKeyspace(%v, %v, %v) failed: %v (returning cached value: %v %v)", ctx, cell, keyspace, err, entry.value, entry.lastError)
log.Warningf("GetSrvKeyspace(%v, %v, %v) failed: %v (returning cached value: %v %v)", newCtx, cell, keyspace, err, entry.value, entry.lastError)
return entry.value, entry.lastError
}
}
@ -313,7 +313,7 @@ func (server *ResilientSrvTopoServer) GetSrvKeyspace(ctx context.Context, cell,
entry.insertionTime = time.Now()
entry.value = result
entry.lastError = err
entry.lastErrorCtx = ctx
entry.lastErrorCtx = newCtx
return result, err
}
@ -347,17 +347,17 @@ func (server *ResilientSrvTopoServer) GetSrvShard(ctx context.Context, cell, key
}
// not in cache or too old, get the real value
ctx, cancel := context.WithTimeout(context.Background(), *srvTopoTimeout)
newCtx, cancel := context.WithTimeout(context.Background(), *srvTopoTimeout)
defer cancel()
result, err := server.topoServer.GetSrvShard(ctx, cell, keyspace, shard)
result, err := server.topoServer.GetSrvShard(newCtx, cell, keyspace, shard)
if err != nil {
if entry.insertionTime.IsZero() {
server.counts.Add(errorCategory, 1)
log.Errorf("GetSrvShard(%v, %v, %v, %v) failed: %v (no cached value, caching and returning error)", ctx, cell, keyspace, shard, err)
log.Errorf("GetSrvShard(%v, %v, %v, %v) failed: %v (no cached value, caching and returning error)", newCtx, cell, keyspace, shard, err)
} else {
server.counts.Add(cachedCategory, 1)
log.Warningf("GetSrvShard(%v, %v, %v, %v) failed: %v (returning cached value: %v %v)", ctx, cell, keyspace, shard, err, entry.value, entry.lastError)
log.Warningf("GetSrvShard(%v, %v, %v, %v) failed: %v (returning cached value: %v %v)", newCtx, cell, keyspace, shard, err, entry.value, entry.lastError)
return entry.value, entry.lastError
}
}
@ -366,7 +366,7 @@ func (server *ResilientSrvTopoServer) GetSrvShard(ctx context.Context, cell, key
entry.insertionTime = time.Now()
entry.value = result
entry.lastError = err
entry.lastErrorCtx = ctx
entry.lastErrorCtx = newCtx
return result, err
}
@ -431,25 +431,25 @@ func (server *ResilientSrvTopoServer) GetEndPoints(ctx context.Context, cell, ke
}
// not in cache or too old, get the real value
ctx, cancel := context.WithTimeout(context.Background(), *srvTopoTimeout)
newCtx, cancel := context.WithTimeout(context.Background(), *srvTopoTimeout)
defer cancel()
result, _, err = server.topoServer.GetEndPoints(ctx, cell, keyspace, shard, tabletType)
result, _, err = server.topoServer.GetEndPoints(newCtx, cell, keyspace, shard, tabletType)
// get remote endpoints for master if enabled
if err != nil && server.enableRemoteMaster && tabletType == topo.TYPE_MASTER {
remote = true
server.counts.Add(remoteQueryCategory, 1)
server.endPointCounters.remoteLookups.Add(key, 1)
var ss *topo.SrvShard
ss, err = server.topoServer.GetSrvShard(ctx, cell, keyspace, shard)
ss, err = server.topoServer.GetSrvShard(newCtx, cell, keyspace, shard)
if err != nil {
server.counts.Add(remoteErrorCategory, 1)
server.endPointCounters.remoteLookupErrors.Add(key, 1)
log.Errorf("GetEndPoints(%v, %v, %v, %v, %v) failed to get SrvShard for remote master: %v",
ctx, cell, keyspace, shard, tabletType, err)
newCtx, cell, keyspace, shard, tabletType, err)
} else {
if ss.MasterCell != "" && ss.MasterCell != cell {
result, _, err = server.topoServer.GetEndPoints(ctx, ss.MasterCell, keyspace, shard, tabletType)
result, _, err = server.topoServer.GetEndPoints(newCtx, ss.MasterCell, keyspace, shard, tabletType)
}
}
}
@ -457,11 +457,11 @@ func (server *ResilientSrvTopoServer) GetEndPoints(ctx context.Context, cell, ke
server.endPointCounters.lookupErrors.Add(key, 1)
if entry.insertionTime.IsZero() {
server.counts.Add(errorCategory, 1)
log.Errorf("GetEndPoints(%v, %v, %v, %v, %v) failed: %v (no cached value, caching and returning error)", ctx, cell, keyspace, shard, tabletType, err)
log.Errorf("GetEndPoints(%v, %v, %v, %v, %v) failed: %v (no cached value, caching and returning error)", newCtx, cell, keyspace, shard, tabletType, err)
} else {
server.counts.Add(cachedCategory, 1)
server.endPointCounters.staleCacheFallbacks.Add(key, 1)
log.Warningf("GetEndPoints(%v, %v, %v, %v, %v) failed: %v (returning cached value: %v %v)", ctx, cell, keyspace, shard, tabletType, err, entry.value, entry.lastError)
log.Warningf("GetEndPoints(%v, %v, %v, %v, %v) failed: %v (returning cached value: %v %v)", newCtx, cell, keyspace, shard, tabletType, err, entry.value, entry.lastError)
return entry.value, -1, entry.lastError
}
}
@ -471,7 +471,7 @@ func (server *ResilientSrvTopoServer) GetEndPoints(ctx context.Context, cell, ke
entry.originalValue = result
entry.value = filterUnhealthyServers(result)
entry.lastError = err
entry.lastErrorCtx = ctx
entry.lastErrorCtx = newCtx
entry.remote = remote
return entry.value, -1, err
}