Merge pull request #727 from iamqizhao/master

Refactor round-robin impl
This commit is contained in:
Menghan Li 2016-06-23 14:14:58 -07:00 коммит произвёл GitHub
Родитель e78224b060 a4e91b9724
Коммит 69420784d4
1 изменённых файлов: 94 добавлений и 60 удалений

Просмотреть файл

@ -139,16 +139,20 @@ func RoundRobin(r naming.Resolver) Balancer {
return &roundRobin{r: r}
}
type addrInfo struct {
addr Address
connected bool
}
type roundRobin struct {
r naming.Resolver
w naming.Watcher
open []Address // all the addresses the client should potentially connect
mu sync.Mutex
addrCh chan []Address // the channel to notify gRPC internals the list of addresses the client should connect to.
connected []Address // all the connected addresses
next int // index of the next address to return for Get()
waitCh chan struct{} // the channel to block when there is no connected address available
done bool // The Balancer is closed.
r naming.Resolver
w naming.Watcher
addrs []*addrInfo // all the addresses the client should potentially connect
mu sync.Mutex
addrCh chan []Address // the channel to notify gRPC internals the list of addresses the client should connect to.
next int // index of the next address to return for Get()
waitCh chan struct{} // the channel to block when there is no connected address available
done bool // The Balancer is closed.
}
func (rr *roundRobin) watchAddrUpdates() error {
@ -166,8 +170,8 @@ func (rr *roundRobin) watchAddrUpdates() error {
switch update.Op {
case naming.Add:
var exist bool
for _, v := range rr.open {
if addr == v {
for _, v := range rr.addrs {
if addr == v.addr {
exist = true
grpclog.Println("grpc: The name resolver wanted to add an existing address: ", addr)
break
@ -176,12 +180,12 @@ func (rr *roundRobin) watchAddrUpdates() error {
if exist {
continue
}
rr.open = append(rr.open, addr)
rr.addrs = append(rr.addrs, &addrInfo{addr: addr})
case naming.Delete:
for i, v := range rr.open {
if v == addr {
copy(rr.open[i:], rr.open[i+1:])
rr.open = rr.open[:len(rr.open)-1]
for i, v := range rr.addrs {
if addr == v.addr {
copy(rr.addrs[i:], rr.addrs[i+1:])
rr.addrs = rr.addrs[:len(rr.addrs)-1]
break
}
}
@ -189,9 +193,11 @@ func (rr *roundRobin) watchAddrUpdates() error {
grpclog.Println("Unknown update.Op ", update.Op)
}
}
// Make a copy of rr.open and write it onto rr.addrCh so that gRPC internals gets notified.
open := make([]Address, len(rr.open), len(rr.open))
copy(open, rr.open)
// Make a copy of rr.addrs and write it onto rr.addrCh so that gRPC internals gets notified.
open := make([]Address, len(rr.addrs))
for i, v := range rr.addrs {
open[i] = v.addr
}
if rr.done {
return ErrClientConnClosing
}
@ -202,7 +208,9 @@ func (rr *roundRobin) watchAddrUpdates() error {
func (rr *roundRobin) Start(target string) error {
if rr.r == nil {
// If there is no name resolver installed, it is not needed to
// do name resolution. In this case, rr.addrCh stays nil.
// do name resolution. In this case, target is added into rr.addrs
// as the only address available and rr.addrCh stays nil.
rr.addrs = append(rr.addrs, &addrInfo{addr: Address{Addr: target}})
return nil
}
w, err := rr.r.Resolve(target)
@ -221,38 +229,41 @@ func (rr *roundRobin) Start(target string) error {
return nil
}
// Up appends addr to the end of rr.connected and sends notification if there
// are pending Get() calls.
// Up sets the connected state of addr and sends notification if there are pending
// Get() calls.
func (rr *roundRobin) Up(addr Address) func(error) {
rr.mu.Lock()
defer rr.mu.Unlock()
for _, a := range rr.connected {
if a == addr {
return nil
var cnt int
for _, a := range rr.addrs {
if a.addr == addr {
if a.connected {
return nil
}
a.connected = true
}
if a.connected {
cnt++
}
}
rr.connected = append(rr.connected, addr)
if len(rr.connected) == 1 {
// addr is only one available. Notify the Get() callers who are blocking.
if rr.waitCh != nil {
close(rr.waitCh)
rr.waitCh = nil
}
// addr is only one which is connected. Notify the Get() callers who are blocking.
if cnt == 1 && rr.waitCh != nil {
close(rr.waitCh)
rr.waitCh = nil
}
return func(err error) {
rr.down(addr, err)
}
}
// down removes addr from rr.connected and moves the remaining addrs forward.
// down unsets the connected state of addr.
func (rr *roundRobin) down(addr Address, err error) {
rr.mu.Lock()
defer rr.mu.Unlock()
for i, a := range rr.connected {
if a == addr {
copy(rr.connected[i:], rr.connected[i+1:])
rr.connected = rr.connected[:len(rr.connected)-1]
return
for _, a := range rr.addrs {
if addr == a.addr {
a.connected = false
break
}
}
}
@ -266,14 +277,26 @@ func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Ad
err = ErrClientConnClosing
return
}
if rr.next >= len(rr.connected) {
rr.next = 0
}
if len(rr.connected) > 0 {
addr = rr.connected[rr.next]
rr.next++
rr.mu.Unlock()
return
if len(rr.addrs) > 0 {
if rr.next >= len(rr.addrs) {
rr.next = 0
}
next := rr.next
for {
a := rr.addrs[next]
next = (next + 1) % len(rr.addrs)
if a.connected {
addr = a.addr
rr.next = next
rr.mu.Unlock()
return
}
if next == rr.next {
// Has iterated all the possible address but none is connected.
break
}
}
}
// There is no address available. Wait on rr.waitCh.
// TODO(zhaoq): Handle the case when opts.BlockingWait is false.
@ -296,24 +319,35 @@ func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Ad
err = ErrClientConnClosing
return
}
if len(rr.connected) == 0 {
// The newly added addr got removed by Down() again.
if rr.waitCh == nil {
ch = make(chan struct{})
rr.waitCh = ch
} else {
ch = rr.waitCh
if len(rr.addrs) > 0 {
if rr.next >= len(rr.addrs) {
rr.next = 0
}
next := rr.next
for {
a := rr.addrs[next]
next = (next + 1) % len(rr.addrs)
if a.connected {
addr = a.addr
rr.next = next
rr.mu.Unlock()
return
}
if next == rr.next {
// Has iterated all the possible address but none is connected.
break
}
}
rr.mu.Unlock()
continue
}
if rr.next >= len(rr.connected) {
rr.next = 0
// The newly added addr got removed by Down() again.
if rr.waitCh == nil {
ch = make(chan struct{})
rr.waitCh = ch
} else {
ch = rr.waitCh
}
addr = rr.connected[rr.next]
rr.next++
rr.mu.Unlock()
return
}
}
}