grpc-go/balancer.go

375 строки
11 KiB
Go
Исходник Обычный вид История

2016-05-17 01:47:46 +03:00
/*
*
* Copyright 2016, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
2016-05-07 01:47:09 +03:00
package grpc
import (
2016-05-25 21:28:45 +03:00
"fmt"
2016-05-07 01:47:09 +03:00
"sync"
"golang.org/x/net/context"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/naming"
2016-05-07 01:47:09 +03:00
"google.golang.org/grpc/transport"
)
2016-05-13 04:52:24 +03:00
// Address describes a single server that the client may connect to.
// This is the EXPERIMENTAL API and may be changed or extended in the future.
type Address struct {
	// Addr is the network address used to establish the connection.
	Addr string
	// Metadata carries arbitrary information attached to Addr that a
	// Balancer may consult when making load balancing decisions.
	Metadata interface{}
}
2016-05-25 21:52:51 +03:00
// BalancerGetOptions holds the per-call options for Balancer.Get.
// This is the EXPERIMENTAL API and may be changed or extended in the future.
type BalancerGetOptions struct {
	// BlockingWait, when true, asks Get to block while there is no
	// connected address.
	BlockingWait bool
}
2016-05-07 01:47:09 +03:00
// Balancer chooses network addresses for RPCs.
2016-05-17 03:12:44 +03:00
// This is the EXPERIMENTAL API and may be changed or extended in the future.
2016-05-07 01:47:09 +03:00
type Balancer interface {
// Start does the initialization work to bootstrap a Balancer. For example,
2016-05-27 00:53:32 +03:00
// this function may start the name resolution and watch the updates. It will
// be called when dialing.
Start(target string) error
2016-05-25 03:37:44 +03:00
// Up informs the Balancer that gRPC has a connection to the server at
2016-05-17 02:36:40 +03:00
// addr. It returns down which is called once the connection to addr gets
2016-05-25 03:37:44 +03:00
// lost or closed.
2016-06-02 00:09:21 +03:00
// TODO: It is not clear how to construct and take advantage the meaningful error
// parameter for down. Need realistic demands to guide.
2016-05-07 01:47:09 +03:00
Up(addr Address) (down func(error))
2016-05-25 04:14:24 +03:00
// Get gets the address of a server for the RPC corresponding to ctx.
2016-05-25 21:38:29 +03:00
// i) If it returns a connected address, gRPC internals issues the RPC on the
// connection to this address;
// ii) If it returns an address on which the connection is under construction
// (initiated by Notify(...)) but not connected, gRPC internals
2016-06-02 00:09:21 +03:00
// * fails RPC if the RPC is fail-fast and connection is in the TransientFailure or
// Shutdown state;
2016-05-25 21:38:29 +03:00
// or
2016-05-25 04:14:24 +03:00
// * issues RPC on the connection otherwise.
2016-05-25 21:38:29 +03:00
// iii) If it returns an address on which the connection does not exist, gRPC
// internals treats it as an error and will fail the corresponding RPC.
2016-05-25 04:14:24 +03:00
//
// Therefore, the following is the recommended rule when writing a custom Balancer.
2016-05-25 21:38:29 +03:00
// If opts.BlockingWait is true, it should return a connected address or
2016-05-26 01:31:54 +03:00
// block if there is no connected address. It should respect the timeout or
2016-05-25 04:14:24 +03:00
// cancellation of ctx when blocking. If opts.BlockingWait is false (for fail-fast
2016-05-25 21:38:29 +03:00
// RPCs), it should return an address it has notified via Notify(...) immediately
// instead of blocking.
2016-05-25 04:14:24 +03:00
//
// The function returns put which is called once the rpc has completed or failed.
2016-06-02 00:09:21 +03:00
// put can collect and report RPC stats to a remote load balancer. gRPC internals
// will try to call this again if err is non-nil (unless err is ErrClientConnClosing).
//
// TODO: Add other non-recoverable errors?
2016-05-25 03:19:44 +03:00
Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error)
// Notify returns a channel that is used by gRPC internals to watch the addresses
// gRPC needs to connect. The addresses might be from a name resolver or remote
// load balancer. gRPC internals will compare it with the existing connected
// addresses. If the address Balancer notified is not in the existing connected
// addresses, gRPC starts to connect the address. If an address in the existing
// connected addresses is not in the notification list, the corresponding connection
// is shutdown gracefully. Otherwise, there are no operations to take. Note that
// the Address slice must be the full list of the Addresses which should be connected.
// It is NOT delta.
Notify() <-chan []Address
2016-05-07 01:47:09 +03:00
// Close shuts down the balancer.
Close() error
}
2016-05-25 21:52:51 +03:00
// downErr is the error value constructed by gRPC internals and handed to the
// down callback returned by Balancer.Up. Besides Error it provides Timeout and
// Temporary, so it satisfies net.Error.
type downErr struct {
	timeout   bool
	temporary bool
	desc      string
}

// Error returns the human-readable description of the failure.
func (d downErr) Error() string { return d.desc }

// Timeout reports whether the failure was flagged as a timeout.
func (d downErr) Timeout() bool { return d.timeout }

// Temporary reports whether the failure was flagged as temporary.
func (d downErr) Temporary() bool { return d.temporary }

// downErrorf builds a downErr carrying the given flags, with its description
// produced by formatting a according to format (fmt.Sprintf semantics).
func downErrorf(timeout, temporary bool, format string, a ...interface{}) downErr {
	var d downErr
	d.timeout = timeout
	d.temporary = temporary
	d.desc = fmt.Sprintf(format, a...)
	return d
}
// RoundRobin returns a Balancer that selects addresses round-robin. It uses r to watch
// the name resolution updates and updates the addresses available correspondingly.
func RoundRobin(r naming.Resolver) Balancer {
return &roundRobin{r: r}
2016-05-07 01:47:09 +03:00
}
2016-06-22 03:15:31 +03:00
// addrInfo pairs one candidate address with whether gRPC currently reports a
// live connection to it (set by roundRobin.Up, cleared by roundRobin.down).
type addrInfo struct {
	addr      Address
	connected bool
}
2016-05-07 01:47:09 +03:00
type roundRobin struct {
2016-06-22 03:15:31 +03:00
r naming.Resolver
w naming.Watcher
addrs []*addrInfo // all the addresses the client should potentially connect
mu sync.Mutex
addrCh chan []Address // the channel to notify gRPC internals the list of addresses the client should connect to.
next int // index of the next address to return for Get()
waitCh chan struct{} // the channel to block when there is no connected address available
done bool // The Balancer is closed.
}
func (rr *roundRobin) watchAddrUpdates() error {
updates, err := rr.w.Next()
if err != nil {
2016-05-26 01:31:54 +03:00
grpclog.Println("grpc: the naming watcher stops working due to %v.", err)
return err
}
2016-05-25 03:50:02 +03:00
rr.mu.Lock()
defer rr.mu.Unlock()
for _, update := range updates {
addr := Address{
Addr: update.Addr,
}
switch update.Op {
case naming.Add:
2016-05-25 03:50:02 +03:00
var exist bool
2016-06-22 03:15:31 +03:00
for _, v := range rr.addrs {
if addr == v.addr {
2016-05-25 03:50:02 +03:00
exist = true
grpclog.Println("grpc: The name resolver wanted to add an existing address: ", addr)
break
}
}
2016-05-25 03:50:02 +03:00
if exist {
continue
}
2016-06-22 03:15:31 +03:00
rr.addrs = append(rr.addrs, &addrInfo{addr: addr})
case naming.Delete:
2016-06-22 03:15:31 +03:00
for i, v := range rr.addrs {
if addr == v.addr {
copy(rr.addrs[i:], rr.addrs[i+1:])
rr.addrs = rr.addrs[:len(rr.addrs)-1]
break
}
}
default:
grpclog.Println("Unknown update.Op ", update.Op)
}
}
2016-06-22 03:15:31 +03:00
// Make a copy of rr.addrs and write it onto rr.addrCh so that gRPC internals gets notified.
2016-06-23 21:08:27 +03:00
open := make([]Address, len(rr.addrs))
for i, v := range rr.addrs {
open[i] = v.addr
2016-06-22 03:15:31 +03:00
}
if rr.done {
return ErrClientConnClosing
}
rr.addrCh <- open
return nil
}
func (rr *roundRobin) Start(target string) error {
if rr.r == nil {
2016-05-25 03:50:02 +03:00
// If there is no name resolver installed, it is not needed to
2016-06-22 03:15:31 +03:00
// do name resolution. In this case, target is added into rr.addrs
// as the only address available and rr.addrCh stays nil.
rr.addrs = append(rr.addrs, &addrInfo{addr: Address{Addr: target}})
return nil
}
w, err := rr.r.Resolve(target)
if err != nil {
return err
}
rr.w = w
rr.addrCh = make(chan []Address)
go func() {
for {
if err := rr.watchAddrUpdates(); err != nil {
return
}
}
}()
return nil
2016-05-07 01:47:09 +03:00
}
2016-06-22 03:15:31 +03:00
// Up sets the connected state of addr and sends notification if there are pending
// Get() calls.
2016-05-11 05:29:44 +03:00
func (rr *roundRobin) Up(addr Address) func(error) {
2016-05-07 01:47:09 +03:00
rr.mu.Lock()
defer rr.mu.Unlock()
2016-06-22 03:15:31 +03:00
var cnt int
for _, a := range rr.addrs {
if a.addr == addr {
if a.connected {
return nil
}
a.connected = true
2016-05-07 01:47:09 +03:00
}
2016-06-22 03:15:31 +03:00
if a.connected {
cnt++
2016-05-07 01:47:09 +03:00
}
}
2016-06-22 03:15:31 +03:00
// addr is only one which is connected. Notify the Get() callers who are blocking.
if cnt == 1 && rr.waitCh != nil {
close(rr.waitCh)
rr.waitCh = nil
}
2016-05-11 05:29:44 +03:00
return func(err error) {
rr.down(addr, err)
2016-05-07 01:47:09 +03:00
}
}
2016-06-22 03:15:31 +03:00
// down unsets the connected state of addr.
2016-05-11 05:29:44 +03:00
func (rr *roundRobin) down(addr Address, err error) {
2016-05-07 01:47:09 +03:00
rr.mu.Lock()
defer rr.mu.Unlock()
2016-06-22 03:15:31 +03:00
for _, a := range rr.addrs {
if addr == a.addr {
a.connected = false
break
2016-05-07 01:47:09 +03:00
}
}
}
2016-05-25 03:50:02 +03:00
// Get returns the next addr in the rotation.
2016-05-25 03:19:44 +03:00
func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) {
2016-05-07 01:47:09 +03:00
var ch chan struct{}
rr.mu.Lock()
if rr.done {
rr.mu.Unlock()
err = ErrClientConnClosing
return
}
2016-06-22 03:15:31 +03:00
if len(rr.addrs) > 0 {
if rr.next >= len(rr.addrs) {
rr.next = 0
}
next := rr.next
for {
a := rr.addrs[next]
next = (next + 1) % len(rr.addrs)
if a.connected {
addr = a.addr
rr.next = next
rr.mu.Unlock()
return
}
if next == rr.next {
// Has iterated all the possible address but none is connected.
break
}
}
2016-05-07 01:47:09 +03:00
}
2016-05-13 04:52:24 +03:00
// There is no address available. Wait on rr.waitCh.
2016-05-25 03:19:44 +03:00
// TODO(zhaoq): Handle the case when opts.BlockingWait is false.
2016-05-07 01:47:09 +03:00
if rr.waitCh == nil {
ch = make(chan struct{})
rr.waitCh = ch
} else {
ch = rr.waitCh
}
rr.mu.Unlock()
for {
select {
case <-ctx.Done():
err = transport.ContextErr(ctx.Err())
return
case <-ch:
rr.mu.Lock()
if rr.done {
rr.mu.Unlock()
err = ErrClientConnClosing
return
}
2016-06-22 03:15:31 +03:00
if len(rr.addrs) > 0 {
if rr.next >= len(rr.addrs) {
rr.next = 0
}
next := rr.next
for {
a := rr.addrs[next]
next = (next + 1) % len(rr.addrs)
if a.connected {
addr = a.addr
rr.next = next
rr.mu.Unlock()
return
}
if next == rr.next {
// Has iterated all the possible address but none is connected.
break
}
}
2016-05-07 01:47:09 +03:00
}
2016-06-22 03:15:31 +03:00
// The newly added addr got removed by Down() again.
if rr.waitCh == nil {
ch = make(chan struct{})
rr.waitCh = ch
} else {
ch = rr.waitCh
2016-05-07 01:47:09 +03:00
}
rr.mu.Unlock()
}
}
}
func (rr *roundRobin) Notify() <-chan []Address {
return rr.addrCh
2016-05-07 01:47:09 +03:00
}
func (rr *roundRobin) Close() error {
rr.mu.Lock()
defer rr.mu.Unlock()
rr.done = true
if rr.w != nil {
rr.w.Close()
2016-05-26 01:31:54 +03:00
}
if rr.waitCh != nil {
close(rr.waitCh)
rr.waitCh = nil
}
if rr.addrCh != nil {
close(rr.addrCh)
}
2016-05-07 01:47:09 +03:00
return nil
}