This commit is contained in:
iamqizhao 2016-05-12 18:52:24 -07:00
Родитель f93b6bbfb5
Коммит 32eec1acef
2 изменённых файлов: 13 добавлений и 12 удалений

Просмотреть файл

@ -7,8 +7,9 @@ import (
"google.golang.org/grpc/transport"
)
// Address represents a server the client connects to.
type Address struct {
// Addr is the peer address on which a connection will be established.
// Addr is the server address on which a connection will be established.
Addr string
// Metadata is the information associated with Addr, which may be used
// to make load balancing decision. This is from the metadata attached
@ -32,18 +33,20 @@ type Balancer interface {
Close() error
}
// RoundRobin returns a Balancer that selects addresses round-robin.
func RoundRobin() Balancer {
return &roundRobin{}
}
type roundRobin struct {
mu sync.Mutex
addrs []Address
next int
waitCh chan struct{}
pending int
mu sync.Mutex
addrs []Address
next int // index of the next address to return for Get()
waitCh chan struct{}
}
// Up appends addr to the end of rr.addrs and sends notification if there
// are pending Get() calls.
func (rr *roundRobin) Up(addr Address) func(error) {
rr.mu.Lock()
defer rr.mu.Unlock()
@ -64,6 +67,7 @@ func (rr *roundRobin) Up(addr Address) func(error) {
}
}
// down removes addr from rr.addrs and moves the remaining addrs forward.
func (rr *roundRobin) down(addr Address, err error) {
rr.mu.Lock()
defer rr.mu.Unlock()
@ -76,6 +80,7 @@ func (rr *roundRobin) down(addr Address, err error) {
}
}
// Get returns the next addr in the rotation. It blocks if there is no address available.
func (rr *roundRobin) Get(ctx context.Context) (addr Address, put func(), err error) {
var ch chan struct{}
rr.mu.Lock()
@ -85,13 +90,13 @@ func (rr *roundRobin) Get(ctx context.Context) (addr Address, put func(), err er
if len(rr.addrs) > 0 {
addr = rr.addrs[rr.next]
rr.next++
rr.pending++
rr.mu.Unlock()
put = func() {
rr.put(ctx, addr)
}
return
}
// There is no address available. Wait on rr.waitCh.
if rr.waitCh == nil {
ch = make(chan struct{})
rr.waitCh = ch
@ -116,7 +121,6 @@ func (rr *roundRobin) Get(ctx context.Context) (addr Address, put func(), err er
}
addr = rr.addrs[rr.next]
rr.next++
rr.pending++
rr.mu.Unlock()
put = func() {
rr.put(ctx, addr)
@ -127,9 +131,6 @@ func (rr *roundRobin) Get(ctx context.Context) (addr Address, put func(), err er
}
func (rr *roundRobin) put(ctx context.Context, addr Address) {
rr.mu.Lock()
defer rr.mu.Unlock()
rr.pending--
}
func (rr *roundRobin) Close() error {

Просмотреть файл

@ -618,7 +618,7 @@ func (ac *addrConn) resetTransport(closeTransport bool) error {
closeTransport = false
select {
case <-time.After(sleepTime):
case <-cc.shutdownChan:
case <-ac.shutdownChan:
}
retries++
grpclog.Printf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %q", err, ac.addr)