From 65a7b17255ebfb29e1d2942d201e83ca09bfb851 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Tue, 21 Jun 2016 17:15:31 -0700 Subject: [PATCH 1/3] Refactor round-robin --- balancer.go | 155 ++++++++++++++++++++++++++++++++-------------------- 1 file changed, 95 insertions(+), 60 deletions(-) diff --git a/balancer.go b/balancer.go index 348bf975..324e3572 100644 --- a/balancer.go +++ b/balancer.go @@ -139,16 +139,20 @@ func RoundRobin(r naming.Resolver) Balancer { return &roundRobin{r: r} } +type addrInfo struct { + addr Address + connected bool +} + type roundRobin struct { - r naming.Resolver - w naming.Watcher - open []Address // all the addresses the client should potentially connect - mu sync.Mutex - addrCh chan []Address // the channel to notify gRPC internals the list of addresses the client should connect to. - connected []Address // all the connected addresses - next int // index of the next address to return for Get() - waitCh chan struct{} // the channel to block when there is no connected address available - done bool // The Balancer is closed. + r naming.Resolver + w naming.Watcher + addrs []*addrInfo // all the addresses the client should potentially connect + mu sync.Mutex + addrCh chan []Address // the channel to notify gRPC internals the list of addresses the client should connect to. + next int // index of the next address to return for Get() + waitCh chan struct{} // the channel to block when there is no connected address available + done bool // The Balancer is closed. } func (rr *roundRobin) watchAddrUpdates() error { @@ -166,8 +170,8 @@ func (rr *roundRobin) watchAddrUpdates() error { switch update.Op { case naming.Add: var exist bool - for _, v := range rr.open { - if addr == v { + for _, v := range rr.addrs { + if addr == v.addr { exist = true grpclog.Println("grpc: The name resolver wanted to add an existing address: ", addr) break @@ -176,12 +180,12 @@ func (rr *roundRobin) watchAddrUpdates() error { if exist { continue } - rr.open = append(rr.open, addr) + rr.addrs = append(rr.addrs, &addrInfo{addr: addr}) case naming.Delete: - for i, v := range rr.open { - if v == addr { - copy(rr.open[i:], rr.open[i+1:]) - rr.open = rr.open[:len(rr.open)-1] + for i, v := range rr.addrs { + if addr == v.addr { + copy(rr.addrs[i:], rr.addrs[i+1:]) + rr.addrs = rr.addrs[:len(rr.addrs)-1] break } } @@ -189,9 +193,11 @@ func (rr *roundRobin) watchAddrUpdates() error { grpclog.Println("Unknown update.Op ", update.Op) } } - // Make a copy of rr.open and write it onto rr.addrCh so that gRPC internals gets notified. - open := make([]Address, len(rr.open), len(rr.open)) - copy(open, rr.open) + // Make a copy of rr.addrs and write it onto rr.addrCh so that gRPC internals gets notified. + open := make([]Address, 0, len(rr.addrs)) + for _, v := range rr.addrs { + open = append(open, v.addr) + } if rr.done { return ErrClientConnClosing } @@ -202,7 +208,9 @@ func (rr *roundRobin) watchAddrUpdates() error { func (rr *roundRobin) Start(target string) error { if rr.r == nil { // If there is no name resolver installed, it is not needed to - // do name resolution. In this case, rr.addrCh stays nil. + // do name resolution. In this case, target is added into rr.addrs + // as the only address available and rr.addrCh stays nil. + rr.addrs = append(rr.addrs, &addrInfo{addr: Address{Addr: target}}) return nil } w, err := rr.r.Resolve(target) @@ -221,38 +229,41 @@ func (rr *roundRobin) Start(target string) error { return nil } -// Up appends addr to the end of rr.connected and sends notification if there -// are pending Get() calls. +// Up sets the connected state of addr and sends notification if there are pending +// Get() calls. func (rr *roundRobin) Up(addr Address) func(error) { rr.mu.Lock() defer rr.mu.Unlock() - for _, a := range rr.connected { - if a == addr { - return nil + var cnt int + for _, a := range rr.addrs { + if a.addr == addr { + if a.connected { + return nil + } + a.connected = true + } + if a.connected { + cnt++ } } - rr.connected = append(rr.connected, addr) - if len(rr.connected) == 1 { - // addr is only one available. Notify the Get() callers who are blocking. - if rr.waitCh != nil { - close(rr.waitCh) - rr.waitCh = nil - } + // addr is only one which is connected. Notify the Get() callers who are blocking. + if cnt == 1 && rr.waitCh != nil { + close(rr.waitCh) + rr.waitCh = nil } return func(err error) { rr.down(addr, err) } } -// down removes addr from rr.connected and moves the remaining addrs forward. +// down unsets the connected state of addr. func (rr *roundRobin) down(addr Address, err error) { rr.mu.Lock() defer rr.mu.Unlock() - for i, a := range rr.connected { - if a == addr { - copy(rr.connected[i:], rr.connected[i+1:]) - rr.connected = rr.connected[:len(rr.connected)-1] - return + for _, a := range rr.addrs { + if addr == a.addr { + a.connected = false + break } } } @@ -266,14 +277,26 @@ func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Ad err = ErrClientConnClosing return } - if rr.next >= len(rr.connected) { - rr.next = 0 - } - if len(rr.connected) > 0 { - addr = rr.connected[rr.next] - rr.next++ - rr.mu.Unlock() - return + + if len(rr.addrs) > 0 { + if rr.next >= len(rr.addrs) { + rr.next = 0 + } + next := rr.next + for { + a := rr.addrs[next] + next = (next + 1) % len(rr.addrs) + if a.connected { + addr = a.addr + rr.next = next + rr.mu.Unlock() + return + } + if next == rr.next { + // Has iterated all the possible address but none is connected. + break + } + } } // There is no address available. Wait on rr.waitCh. // TODO(zhaoq): Handle the case when opts.BlockingWait is false. @@ -296,24 +319,36 @@ func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Ad err = ErrClientConnClosing return } - if len(rr.connected) == 0 { - // The newly added addr got removed by Down() again. - if rr.waitCh == nil { - ch = make(chan struct{}) - rr.waitCh = ch - } else { - ch = rr.waitCh + + if len(rr.addrs) > 0 { + if rr.next >= len(rr.addrs) { + rr.next = 0 + } + next := rr.next + for { + a := rr.addrs[next] + next = (next + 1) % len(rr.addrs) + if a.connected { + addr = a.addr + rr.next = next + rr.mu.Unlock() + return + } + if next == rr.next { + // Has iterated all the possible address but none is connected. + break + } } - rr.mu.Unlock() - continue } - if rr.next >= len(rr.connected) { - rr.next = 0 + // The newly added addr got removed by Down() again. + if rr.waitCh == nil { + ch = make(chan struct{}) + rr.waitCh = ch + } else { + ch = rr.waitCh } - addr = rr.connected[rr.next] - rr.next++ rr.mu.Unlock() - return + continue } } } From f5c974be39e12d81de9e4bbc9df904bbeca9d8fe Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Tue, 21 Jun 2016 18:21:27 -0700 Subject: [PATCH 2/3] delete unnecessary continue --- balancer.go | 1 - 1 file changed, 1 deletion(-) diff --git a/balancer.go b/balancer.go index 324e3572..274f3be2 100644 --- a/balancer.go +++ b/balancer.go @@ -348,7 +348,6 @@ func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Ad ch = rr.waitCh } rr.mu.Unlock() - continue } } } From a4e91b972417cf67d46e951e2ce05018693be35e Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 23 Jun 2016 11:08:27 -0700 Subject: [PATCH 3/3] addressed the comments --- balancer.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/balancer.go b/balancer.go index 274f3be2..c298ae91 100644 --- a/balancer.go +++ b/balancer.go @@ -194,9 +194,9 @@ func (rr *roundRobin) watchAddrUpdates() error { } } // Make a copy of rr.addrs and write it onto rr.addrCh so that gRPC internals gets notified. - open := make([]Address, 0, len(rr.addrs)) - for _, v := range rr.addrs { - open = append(open, v.addr) + open := make([]Address, len(rr.addrs)) + for i, v := range rr.addrs { + open[i] = v.addr } if rr.done { return ErrClientConnClosing