load balancer
This commit is contained in:
Родитель
b062a3c003
Коммит
9c2d8546bf
|
@ -0,0 +1,137 @@
|
|||
package grpc
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc/transport"
|
||||
)
|
||||
|
||||
type Address struct {
|
||||
// Addr is the peer address on which a connection will be established.
|
||||
Addr string
|
||||
// Metadata is the information associated with Addr, which may be used
|
||||
// to make load balancing decision. This is from the metadata attached
|
||||
// in the address updates from name resolver.
|
||||
Metadata interface{}
|
||||
}
|
||||
|
||||
// Balancer chooses network addresses for RPCs.
|
||||
type Balancer interface {
|
||||
// Up informs the balancer that gRPC has a connection to the server at
|
||||
// addr. It returns down which will be called once the connection gets
|
||||
// lost. Once down is called, addr may no longer be returned by Get.
|
||||
Up(addr Address) (down func(error))
|
||||
// Get gets the address of a server for the rpc corresponding to ctx.
|
||||
// It may block if there is no server available. It respects the
|
||||
// timeout or cancellation of ctx when blocking. It returns put which
|
||||
// is called once the rpc has completed or failed. put can collect and
|
||||
// report rpc stats to remote load balancer.
|
||||
Get(ctx context.Context) (addr Address, put func(), err error)
|
||||
// Close shuts down the balancer.
|
||||
Close() error
|
||||
}
|
||||
|
||||
func RoundRobin() Balancer {
|
||||
return &roundRobin{}
|
||||
}
|
||||
|
||||
type roundRobin struct {
|
||||
mu sync.Mutex
|
||||
addrs []Address
|
||||
next int
|
||||
waitCh chan struct{}
|
||||
pending int
|
||||
}
|
||||
|
||||
func (rr *roundRobin) Up(addr Address) func() {
|
||||
rr.mu.Lock()
|
||||
defer rr.mu.Unlock()
|
||||
for _, a := range rr.addrs {
|
||||
if a == addr {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
rr.addrs = append(rr.addrs, addr)
|
||||
if len(rr.addrs) == 1 {
|
||||
if rr.waitCh != nil {
|
||||
close(rr.waitCh)
|
||||
rr.waitCh = nil
|
||||
}
|
||||
}
|
||||
return func() {
|
||||
rr.down(addr)
|
||||
}
|
||||
}
|
||||
|
||||
func (rr *roundRobin) down(addr Address) {
|
||||
rr.mu.Lock()
|
||||
defer rr.mu.Unlock()
|
||||
for i, a := range rr.addrs {
|
||||
if a == addr {
|
||||
copy(rr.addrs[i:], rr.addrs[i+1:])
|
||||
rr.addrs = rr.addrs[:len(rr.addrs)-1]
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (rr *roundRobin) Get(ctx context.Context) (addr Address, put func(), err error) {
|
||||
var ch chan struct{}
|
||||
rr.mu.Lock()
|
||||
if rr.next >= len(rr.addrs) {
|
||||
rr.next = 0
|
||||
}
|
||||
if len(rr.addrs) > 0 {
|
||||
addr = rr.addrs[rr.next]
|
||||
rr.next++
|
||||
rr.pending++
|
||||
rr.mu.Unlock()
|
||||
put = func() {
|
||||
rr.put(ctx, addr)
|
||||
}
|
||||
return
|
||||
}
|
||||
if rr.waitCh == nil {
|
||||
ch = make(chan struct{})
|
||||
rr.waitCh = ch
|
||||
} else {
|
||||
ch = rr.waitCh
|
||||
}
|
||||
rr.mu.Unlock()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
err = transport.ContextErr(ctx.Err())
|
||||
return
|
||||
case <-ch:
|
||||
rr.mu.Lock()
|
||||
if len(rr.addrs) == 0 {
|
||||
// The newly added addr got removed by Down() again.
|
||||
rr.mu.Unlock()
|
||||
continue
|
||||
}
|
||||
if rr.next >= len(rr.addrs) {
|
||||
rr.next = 0
|
||||
}
|
||||
addr = rr.addrs[rr.next]
|
||||
rr.next++
|
||||
rr.pending++
|
||||
rr.mu.Unlock()
|
||||
put = func() {
|
||||
rr.put(ctx, addr)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (rr *roundRobin) put(ctx context.Context, addr Address) {
|
||||
rr.mu.Lock()
|
||||
defer rr.mu.Unlock()
|
||||
rr.pending--
|
||||
}
|
||||
|
||||
func (rr *roundRobin) Close() error {
|
||||
return nil
|
||||
}
|
6
call.go
6
call.go
|
@ -134,6 +134,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
|
|||
}
|
||||
var (
|
||||
lastErr error // record the error that happened
|
||||
put func()
|
||||
)
|
||||
for {
|
||||
var (
|
||||
|
@ -152,7 +153,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
|
|||
if cc.dopts.cp != nil {
|
||||
callHdr.SendCompress = cc.dopts.cp.Type()
|
||||
}
|
||||
t, err = cc.dopts.picker.Pick(ctx)
|
||||
t, put, err = cc.getTransport(ctx)
|
||||
if err != nil {
|
||||
if lastErr != nil {
|
||||
// This was a retry; return the error from the last attempt.
|
||||
|
@ -165,6 +166,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
|
|||
}
|
||||
stream, err = sendRequest(ctx, cc.dopts.codec, cc.dopts.cp, callHdr, t, args, topts)
|
||||
if err != nil {
|
||||
put()
|
||||
if _, ok := err.(transport.ConnectionError); ok {
|
||||
lastErr = err
|
||||
continue
|
||||
|
@ -177,12 +179,14 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
|
|||
// Receive the response
|
||||
lastErr = recvResponse(cc.dopts, t, &c, stream, reply)
|
||||
if _, ok := lastErr.(transport.ConnectionError); ok {
|
||||
put()
|
||||
continue
|
||||
}
|
||||
if c.traceInfo.tr != nil {
|
||||
c.traceInfo.tr.LazyLog(&payload{sent: false, msg: reply}, true)
|
||||
}
|
||||
t.CloseStream(stream, lastErr)
|
||||
put()
|
||||
if lastErr != nil {
|
||||
return toRPCErr(lastErr)
|
||||
}
|
||||
|
|
571
clientconn.go
571
clientconn.go
|
@ -43,14 +43,14 @@ import (
|
|||
|
||||
"golang.org/x/net/context"
|
||||
"golang.org/x/net/trace"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
"google.golang.org/grpc/naming"
|
||||
"google.golang.org/grpc/transport"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrUnspecTarget indicates that the target address is unspecified.
|
||||
ErrUnspecTarget = errors.New("grpc: target is unspecified")
|
||||
// ErrNoTransportSecurity indicates that there is no transport security
|
||||
// being set for ClientConn. Users should either set one or explicitly
|
||||
// call WithInsecure DialOption to disable security.
|
||||
|
@ -60,11 +60,16 @@ var (
|
|||
// connection.
|
||||
ErrCredentialsMisuse = errors.New("grpc: the credentials require transport level security (use grpc.WithTransportAuthenticator() to set)")
|
||||
// ErrClientConnClosing indicates that the operation is illegal because
|
||||
// the session is closing.
|
||||
// the ClientConn is closing.
|
||||
ErrClientConnClosing = errors.New("grpc: the client connection is closing")
|
||||
// ErrClientConnTimeout indicates that the connection could not be
|
||||
// established or re-established within the specified timeout.
|
||||
ErrClientConnTimeout = errors.New("grpc: timed out trying to connect")
|
||||
// ErrNetworkIP indicates that the connection is down due to some network I/O error.
|
||||
ErrNetworkIO = errors.New("grpc: failed with network I/O error")
|
||||
// ErrConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
|
||||
ErrConnDrain = errors.New("grpc: ")
|
||||
errConnClosing = errors.New("grpc: the addrConn is closing")
|
||||
// minimum time to give a connection to complete
|
||||
minConnectTimeout = 20 * time.Second
|
||||
)
|
||||
|
@ -76,7 +81,8 @@ type dialOptions struct {
|
|||
cp Compressor
|
||||
dc Decompressor
|
||||
bs backoffStrategy
|
||||
picker Picker
|
||||
resolver naming.Resolver
|
||||
balancer Balancer
|
||||
block bool
|
||||
insecure bool
|
||||
copts transport.ConnectOptions
|
||||
|
@ -108,10 +114,9 @@ func WithDecompressor(dc Decompressor) DialOption {
|
|||
}
|
||||
}
|
||||
|
||||
// WithPicker returns a DialOption which sets a picker for connection selection.
|
||||
func WithPicker(p Picker) DialOption {
|
||||
func WithResolver(r naming.Resolver) DialOption {
|
||||
return func(o *dialOptions) {
|
||||
o.picker = p
|
||||
o.resolver = r
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -201,6 +206,7 @@ func WithUserAgent(s string) DialOption {
|
|||
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
|
||||
cc := &ClientConn{
|
||||
target: target,
|
||||
infos: make(map[Address]*addrInfo),
|
||||
}
|
||||
for _, opt := range opts {
|
||||
opt(&cc.dopts)
|
||||
|
@ -214,14 +220,44 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
|
|||
cc.dopts.bs = DefaultBackoffConfig
|
||||
}
|
||||
|
||||
if cc.dopts.picker == nil {
|
||||
cc.dopts.picker = &unicastPicker{
|
||||
target: target,
|
||||
cc.balancer = cc.dopts.balancer
|
||||
if cc.balancer == nil {
|
||||
cc.balancer = RoundRobin()
|
||||
}
|
||||
|
||||
// Start the first connection
|
||||
if cc.dopts.resolver == nil {
|
||||
addr := Address{
|
||||
Addr: cc.target,
|
||||
}
|
||||
if err := cc.dopts.picker.Init(cc); err != nil {
|
||||
ac, err := cc.newAddrConn(addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cc.mu.Lock()
|
||||
cc.infos[addr] = &addrInfo{
|
||||
ac: ac,
|
||||
}
|
||||
cc.mu.Unlock()
|
||||
} else {
|
||||
w, err := cc.dopts.resolver.Resolve(cc.target)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cc.watcher = w
|
||||
// Get the initial name resolution which starts dialing.
|
||||
if err := cc.watchAddrUpdates(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
go func() {
|
||||
for {
|
||||
if err := cc.watchAddrUpdates(); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
colonPos := strings.LastIndex(target, ":")
|
||||
if colonPos == -1 {
|
||||
colonPos = len(target)
|
||||
|
@ -263,61 +299,88 @@ func (s ConnectivityState) String() string {
|
|||
}
|
||||
}
|
||||
|
||||
type addrInfo struct {
|
||||
ac *addrConn
|
||||
}
|
||||
|
||||
// ClientConn represents a client connection to an RPC service.
|
||||
type ClientConn struct {
|
||||
target string
|
||||
watcher naming.Watcher
|
||||
balancer Balancer
|
||||
authority string
|
||||
dopts dialOptions
|
||||
|
||||
mu sync.RWMutex
|
||||
infos map[Address]*addrInfo
|
||||
}
|
||||
|
||||
// State returns the connectivity state of cc.
|
||||
// This is EXPERIMENTAL API.
|
||||
func (cc *ClientConn) State() (ConnectivityState, error) {
|
||||
return cc.dopts.picker.State()
|
||||
func (cc *ClientConn) watchAddrUpdates() error {
|
||||
updates, err := cc.watcher.Next()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, update := range updates {
|
||||
switch update.Op {
|
||||
case naming.Add:
|
||||
cc.mu.Lock()
|
||||
addr := Address{
|
||||
Addr: update.Addr,
|
||||
Metadata: update.Metadata,
|
||||
}
|
||||
if _, ok := cc.infos[addr]; ok {
|
||||
cc.mu.Unlock()
|
||||
grpclog.Println("grpc: The name resolver wanted to add an existing address: ", addr)
|
||||
continue
|
||||
}
|
||||
cc.mu.Unlock()
|
||||
ac, err := cc.newAddrConn(addr)
|
||||
if err != nil {
|
||||
cc.mu.Unlock()
|
||||
return err
|
||||
}
|
||||
cc.mu.Lock()
|
||||
cc.infos[addr] = &addrInfo{
|
||||
ac: ac,
|
||||
}
|
||||
cc.mu.Unlock()
|
||||
case naming.Delete:
|
||||
cc.mu.Lock()
|
||||
addr := Address{
|
||||
Addr: update.Addr,
|
||||
Metadata: update.Metadata,
|
||||
}
|
||||
i, ok := cc.infos[addr]
|
||||
if !ok {
|
||||
cc.mu.Unlock()
|
||||
grpclog.Println("grpc: The name resolver wanted to delete a non-exist address: ", addr)
|
||||
continue
|
||||
}
|
||||
delete(cc.infos, addr)
|
||||
cc.mu.Unlock()
|
||||
i.ac.startDrain()
|
||||
default:
|
||||
grpclog.Println("Unknown update.Op ", update.Op)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// WaitForStateChange blocks until the state changes to something other than the sourceState.
|
||||
// It returns the new state or error.
|
||||
// This is EXPERIMENTAL API.
|
||||
func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) {
|
||||
return cc.dopts.picker.WaitForStateChange(ctx, sourceState)
|
||||
}
|
||||
|
||||
// Close starts to tear down the ClientConn.
|
||||
func (cc *ClientConn) Close() error {
|
||||
return cc.dopts.picker.Close()
|
||||
}
|
||||
|
||||
// Conn is a client connection to a single destination.
|
||||
type Conn struct {
|
||||
target string
|
||||
dopts dialOptions
|
||||
resetChan chan int
|
||||
shutdownChan chan struct{}
|
||||
events trace.EventLog
|
||||
|
||||
mu sync.Mutex
|
||||
state ConnectivityState
|
||||
stateCV *sync.Cond
|
||||
// ready is closed and becomes nil when a new transport is up or failed
|
||||
// due to timeout.
|
||||
ready chan struct{}
|
||||
transport transport.ClientTransport
|
||||
}
|
||||
|
||||
// NewConn creates a Conn.
|
||||
func NewConn(cc *ClientConn) (*Conn, error) {
|
||||
func (cc *ClientConn) newAddrConn(addr Address) (*addrConn, error) {
|
||||
/*
|
||||
if cc.target == "" {
|
||||
return nil, ErrUnspecTarget
|
||||
}
|
||||
c := &Conn{
|
||||
target: cc.target,
|
||||
*/
|
||||
c := &addrConn{
|
||||
cc: cc,
|
||||
addr: addr,
|
||||
dopts: cc.dopts,
|
||||
resetChan: make(chan int, 1),
|
||||
//resetChan: make(chan int, 1),
|
||||
shutdownChan: make(chan struct{}),
|
||||
}
|
||||
if EnableTracing {
|
||||
c.events = trace.NewEventLog("grpc.ClientConn", c.target)
|
||||
c.events = trace.NewEventLog("grpc.ClientConn", c.addr.Addr)
|
||||
}
|
||||
if !c.dopts.insecure {
|
||||
var ok bool
|
||||
|
@ -339,7 +402,7 @@ func NewConn(cc *ClientConn) (*Conn, error) {
|
|||
c.stateCV = sync.NewCond(&c.mu)
|
||||
if c.dopts.block {
|
||||
if err := c.resetTransport(false); err != nil {
|
||||
c.Close()
|
||||
c.tearDown(err)
|
||||
return nil, err
|
||||
}
|
||||
// Start to monitor the error status of transport.
|
||||
|
@ -348,108 +411,200 @@ func NewConn(cc *ClientConn) (*Conn, error) {
|
|||
// Start a goroutine connecting to the server asynchronously.
|
||||
go func() {
|
||||
if err := c.resetTransport(false); err != nil {
|
||||
grpclog.Printf("Failed to dial %s: %v; please retry.", c.target, err)
|
||||
c.Close()
|
||||
grpclog.Printf("Failed to dial %s: %v; please retry.", c.addr.Addr, err)
|
||||
c.tearDown(err)
|
||||
return
|
||||
}
|
||||
grpclog.Println("DEBUG ugh here resetTransport")
|
||||
c.transportMonitor()
|
||||
}()
|
||||
}
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// printf records an event in cc's event log, unless cc has been closed.
|
||||
// REQUIRES cc.mu is held.
|
||||
func (cc *Conn) printf(format string, a ...interface{}) {
|
||||
if cc.events != nil {
|
||||
cc.events.Printf(format, a...)
|
||||
func (cc *ClientConn) getTransport(ctx context.Context) (transport.ClientTransport, func(), error) {
|
||||
addr, put, err := cc.balancer.Get(ctx)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
cc.mu.RLock()
|
||||
if cc.infos == nil {
|
||||
cc.mu.RUnlock()
|
||||
return nil, nil, ErrClientConnClosing
|
||||
}
|
||||
info, ok := cc.infos[addr]
|
||||
cc.mu.RUnlock()
|
||||
if !ok {
|
||||
put()
|
||||
return nil, nil, transport.StreamErrorf(codes.Internal, "grpc: failed to find the transport to send the rpc")
|
||||
}
|
||||
t, err := info.ac.wait(ctx)
|
||||
if err != nil {
|
||||
put()
|
||||
return nil, nil, err
|
||||
}
|
||||
return t, put, nil
|
||||
}
|
||||
|
||||
// errorf records an error in cc's event log, unless cc has been closed.
|
||||
// REQUIRES cc.mu is held.
|
||||
func (cc *Conn) errorf(format string, a ...interface{}) {
|
||||
if cc.events != nil {
|
||||
cc.events.Errorf(format, a...)
|
||||
}
|
||||
}
|
||||
|
||||
// State returns the connectivity state of the Conn
|
||||
func (cc *Conn) State() ConnectivityState {
|
||||
cc.mu.Lock()
|
||||
defer cc.mu.Unlock()
|
||||
return cc.state
|
||||
/*
|
||||
// State returns the connectivity state of cc.
|
||||
// This is EXPERIMENTAL API.
|
||||
func (cc *ClientConn) State() (ConnectivityState, error) {
|
||||
return cc.dopts.picker.State()
|
||||
}
|
||||
|
||||
// WaitForStateChange blocks until the state changes to something other than the sourceState.
|
||||
func (cc *Conn) WaitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) {
|
||||
// It returns the new state or error.
|
||||
// This is EXPERIMENTAL API.
|
||||
func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) {
|
||||
return cc.dopts.picker.WaitForStateChange(ctx, sourceState)
|
||||
}
|
||||
*/
|
||||
|
||||
// Close starts to tear down the ClientConn.
|
||||
func (cc *ClientConn) Close() error {
|
||||
cc.mu.Lock()
|
||||
defer cc.mu.Unlock()
|
||||
if sourceState != cc.state {
|
||||
return cc.state, nil
|
||||
if cc.infos == nil {
|
||||
cc.mu.Unlock()
|
||||
return ErrClientConnClosing
|
||||
}
|
||||
infos := cc.infos
|
||||
cc.infos = nil
|
||||
cc.mu.Unlock()
|
||||
cc.balancer.Close()
|
||||
if cc.watcher != nil {
|
||||
cc.watcher.Close()
|
||||
}
|
||||
for _, i := range infos {
|
||||
i.ac.tearDown(ErrClientClosing)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// addrConn is a network connection to a given address.
|
||||
type addrConn struct {
|
||||
cc *ClientConn
|
||||
addr Address
|
||||
dopts dialOptions
|
||||
//resetChan chan int
|
||||
shutdownChan chan struct{}
|
||||
events trace.EventLog
|
||||
|
||||
mu sync.Mutex
|
||||
state ConnectivityState
|
||||
stateCV *sync.Cond
|
||||
down func(error) // the handler called when a connection is down.
|
||||
drain bool
|
||||
// ready is closed and becomes nil when a new transport is up or failed
|
||||
// due to timeout.
|
||||
ready chan struct{}
|
||||
transport transport.ClientTransport
|
||||
}
|
||||
|
||||
func (ac *addrConn) startDrain() {
|
||||
ac.mu.Lock()
|
||||
t := ac.transport
|
||||
ac.drain = true
|
||||
if ac.down != nil {
|
||||
ac.down(ErrConnDrain)
|
||||
ac.down = nil
|
||||
}
|
||||
ac.mu.Unlock()
|
||||
t.GracefulClose()
|
||||
}
|
||||
|
||||
// printf records an event in ac's event log, unless ac has been closed.
|
||||
// REQUIRES ac.mu is held.
|
||||
func (ac *addrConn) printf(format string, a ...interface{}) {
|
||||
if ac.events != nil {
|
||||
ac.events.Printf(format, a...)
|
||||
}
|
||||
}
|
||||
|
||||
// errorf records an error in ac's event log, unless ac has been closed.
|
||||
// REQUIRES ac.mu is held.
|
||||
func (ac *addrConn) errorf(format string, a ...interface{}) {
|
||||
if ac.events != nil {
|
||||
ac.events.Errorf(format, a...)
|
||||
}
|
||||
}
|
||||
|
||||
// getState returns the connectivity state of the Conn
|
||||
func (ac *addrConn) getState() ConnectivityState {
|
||||
ac.mu.Lock()
|
||||
defer ac.mu.Unlock()
|
||||
return ac.state
|
||||
}
|
||||
|
||||
// waitForStateChange blocks until the state changes to something other than the sourceState.
|
||||
func (ac *addrConn) waitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) {
|
||||
ac.mu.Lock()
|
||||
defer ac.mu.Unlock()
|
||||
if sourceState != ac.state {
|
||||
return ac.state, nil
|
||||
}
|
||||
done := make(chan struct{})
|
||||
var err error
|
||||
go func() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
cc.mu.Lock()
|
||||
ac.mu.Lock()
|
||||
err = ctx.Err()
|
||||
cc.stateCV.Broadcast()
|
||||
cc.mu.Unlock()
|
||||
ac.stateCV.Broadcast()
|
||||
ac.mu.Unlock()
|
||||
case <-done:
|
||||
}
|
||||
}()
|
||||
defer close(done)
|
||||
for sourceState == cc.state {
|
||||
cc.stateCV.Wait()
|
||||
for sourceState == ac.state {
|
||||
ac.stateCV.Wait()
|
||||
if err != nil {
|
||||
return cc.state, err
|
||||
return ac.state, err
|
||||
}
|
||||
}
|
||||
return cc.state, nil
|
||||
return ac.state, nil
|
||||
}
|
||||
|
||||
// NotifyReset tries to signal the underlying transport needs to be reset due to
|
||||
// for example a name resolution change in flight.
|
||||
func (cc *Conn) NotifyReset() {
|
||||
select {
|
||||
case cc.resetChan <- 0:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (cc *Conn) resetTransport(closeTransport bool) error {
|
||||
func (ac *addrConn) resetTransport(closeTransport bool) error {
|
||||
var retries int
|
||||
start := time.Now()
|
||||
for {
|
||||
cc.mu.Lock()
|
||||
cc.printf("connecting")
|
||||
if cc.state == Shutdown {
|
||||
// cc.Close() has been invoked.
|
||||
cc.mu.Unlock()
|
||||
return ErrClientConnClosing
|
||||
ac.mu.Lock()
|
||||
ac.printf("connecting")
|
||||
if ac.state == Shutdown {
|
||||
// ac.tearDown(...) has been invoked.
|
||||
ac.mu.Unlock()
|
||||
return errConnClosing
|
||||
}
|
||||
cc.state = Connecting
|
||||
cc.stateCV.Broadcast()
|
||||
cc.mu.Unlock()
|
||||
if closeTransport {
|
||||
cc.transport.Close()
|
||||
if ac.drain {
|
||||
ac.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
if ac.down != nil {
|
||||
ac.down(ErrNetworkIO)
|
||||
ac.down = nil
|
||||
}
|
||||
ac.state = Connecting
|
||||
ac.stateCV.Broadcast()
|
||||
t := ac.transport
|
||||
ac.mu.Unlock()
|
||||
if closeTransport && t != nil {
|
||||
t.Close()
|
||||
}
|
||||
// Adjust timeout for the current try.
|
||||
copts := cc.dopts.copts
|
||||
copts := ac.dopts.copts
|
||||
if copts.Timeout < 0 {
|
||||
cc.Close()
|
||||
ac.tearDown(ErrClientConnTimeout)
|
||||
return ErrClientConnTimeout
|
||||
}
|
||||
if copts.Timeout > 0 {
|
||||
copts.Timeout -= time.Since(start)
|
||||
if copts.Timeout <= 0 {
|
||||
cc.Close()
|
||||
ac.tearDown(ErrClientConnTimeout)
|
||||
return ErrClientConnTimeout
|
||||
}
|
||||
}
|
||||
sleepTime := cc.dopts.bs.backoff(retries)
|
||||
sleepTime := ac.dopts.bs.backoff(retries)
|
||||
timeout := sleepTime
|
||||
if timeout < minConnectTimeout {
|
||||
timeout = minConnectTimeout
|
||||
|
@ -458,130 +613,134 @@ func (cc *Conn) resetTransport(closeTransport bool) error {
|
|||
copts.Timeout = timeout
|
||||
}
|
||||
connectTime := time.Now()
|
||||
addr, err := cc.dopts.picker.PickAddr()
|
||||
var newTransport transport.ClientTransport
|
||||
if err == nil {
|
||||
newTransport, err = transport.NewClientTransport(addr, &copts)
|
||||
}
|
||||
grpclog.Println("DEBUG reach inside resetTransport 1")
|
||||
newTransport, err := transport.NewClientTransport(ac.addr.Addr, &copts)
|
||||
if err != nil {
|
||||
cc.mu.Lock()
|
||||
if cc.state == Shutdown {
|
||||
// cc.Close() has been invoked.
|
||||
cc.mu.Unlock()
|
||||
return ErrClientConnClosing
|
||||
ac.mu.Lock()
|
||||
if ac.state == Shutdown {
|
||||
// ac.tearDown(...) has been invoked.
|
||||
ac.mu.Unlock()
|
||||
return errConnClosing
|
||||
}
|
||||
cc.errorf("transient failure: %v", err)
|
||||
cc.state = TransientFailure
|
||||
cc.stateCV.Broadcast()
|
||||
if cc.ready != nil {
|
||||
close(cc.ready)
|
||||
cc.ready = nil
|
||||
ac.errorf("transient failure: %v", err)
|
||||
ac.state = TransientFailure
|
||||
ac.stateCV.Broadcast()
|
||||
if ac.ready != nil {
|
||||
close(ac.ready)
|
||||
ac.ready = nil
|
||||
}
|
||||
cc.mu.Unlock()
|
||||
ac.mu.Unlock()
|
||||
sleepTime -= time.Since(connectTime)
|
||||
if sleepTime < 0 {
|
||||
sleepTime = 0
|
||||
}
|
||||
// Fail early before falling into sleep.
|
||||
if cc.dopts.copts.Timeout > 0 && cc.dopts.copts.Timeout < sleepTime+time.Since(start) {
|
||||
cc.mu.Lock()
|
||||
cc.errorf("connection timeout")
|
||||
cc.mu.Unlock()
|
||||
cc.Close()
|
||||
if ac.dopts.copts.Timeout > 0 && ac.dopts.copts.Timeout < sleepTime+time.Since(start) {
|
||||
ac.mu.Lock()
|
||||
ac.errorf("connection timeout")
|
||||
ac.mu.Unlock()
|
||||
ac.tearDown(ErrClientTimeout)
|
||||
return ErrClientConnTimeout
|
||||
}
|
||||
closeTransport = false
|
||||
time.Sleep(sleepTime)
|
||||
retries++
|
||||
grpclog.Printf("grpc: Conn.resetTransport failed to create client transport: %v; Reconnecting to %q", err, cc.target)
|
||||
grpclog.Printf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %q", err, ac.addr)
|
||||
continue
|
||||
}
|
||||
cc.mu.Lock()
|
||||
cc.printf("ready")
|
||||
if cc.state == Shutdown {
|
||||
// cc.Close() has been invoked.
|
||||
cc.mu.Unlock()
|
||||
ac.mu.Lock()
|
||||
grpclog.Println("DEBUG reach inside resetTransport 2")
|
||||
ac.printf("ready")
|
||||
if ac.state == Shutdown {
|
||||
// ac.tearDown(...) has been invoked.
|
||||
ac.mu.Unlock()
|
||||
newTransport.Close()
|
||||
return ErrClientConnClosing
|
||||
return errConnClosing
|
||||
}
|
||||
cc.state = Ready
|
||||
cc.stateCV.Broadcast()
|
||||
cc.transport = newTransport
|
||||
if cc.ready != nil {
|
||||
close(cc.ready)
|
||||
cc.ready = nil
|
||||
grpclog.Println("DEBUG reach inside resetTransport 3: ", ac.addr)
|
||||
ac.state = Ready
|
||||
ac.stateCV.Broadcast()
|
||||
ac.transport = newTransport
|
||||
ac.down = ac.cc.balancer.Up(ac.addr)
|
||||
if ac.ready != nil {
|
||||
close(ac.ready)
|
||||
ac.ready = nil
|
||||
}
|
||||
cc.mu.Unlock()
|
||||
ac.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (cc *Conn) reconnect() bool {
|
||||
cc.mu.Lock()
|
||||
if cc.state == Shutdown {
|
||||
// cc.Close() has been invoked.
|
||||
cc.mu.Unlock()
|
||||
return false
|
||||
}
|
||||
cc.state = TransientFailure
|
||||
cc.stateCV.Broadcast()
|
||||
cc.mu.Unlock()
|
||||
if err := cc.resetTransport(true); err != nil {
|
||||
// The ClientConn is closing.
|
||||
cc.mu.Lock()
|
||||
cc.printf("transport exiting: %v", err)
|
||||
cc.mu.Unlock()
|
||||
grpclog.Printf("grpc: Conn.transportMonitor exits due to: %v", err)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// Run in a goroutine to track the error in transport and create the
|
||||
// new transport if an error happens. It returns when the channel is closing.
|
||||
func (cc *Conn) transportMonitor() {
|
||||
func (ac *addrConn) transportMonitor() {
|
||||
for {
|
||||
ac.mu.Lock()
|
||||
t := ac.transport
|
||||
ac.mu.Unlock()
|
||||
select {
|
||||
// shutdownChan is needed to detect the teardown when
|
||||
// the ClientConn is idle (i.e., no RPC in flight).
|
||||
case <-cc.shutdownChan:
|
||||
// the addrConn is idle (i.e., no RPC in flight).
|
||||
case <-ac.shutdownChan:
|
||||
return
|
||||
case <-cc.resetChan:
|
||||
if !cc.reconnect() {
|
||||
/*
|
||||
case <-ac.resetChan:
|
||||
if !ac.reconnect() {
|
||||
return
|
||||
}
|
||||
case <-cc.transport.Error():
|
||||
if !cc.reconnect() {
|
||||
*/
|
||||
case <-t.Error():
|
||||
ac.mu.Lock()
|
||||
if ac.state == Shutdown {
|
||||
// ac.tearDown(...) has been invoked.
|
||||
ac.mu.Unlock()
|
||||
return
|
||||
}
|
||||
ac.state = TransientFailure
|
||||
ac.stateCV.Broadcast()
|
||||
ac.mu.Unlock()
|
||||
if err := ac.resetTransport(true); err != nil {
|
||||
ac.mu.Lock()
|
||||
ac.printf("transport exiting: %v", err)
|
||||
ac.mu.Unlock()
|
||||
grpclog.Printf("grpc: addrConn.transportMonitor exits due to: %v", err)
|
||||
return
|
||||
}
|
||||
/*
|
||||
if !ac.reconnect() {
|
||||
return
|
||||
}
|
||||
*/
|
||||
/*
|
||||
// Tries to drain reset signal if there is any since it is out-dated.
|
||||
select {
|
||||
case <-cc.resetChan:
|
||||
case <-ac.resetChan:
|
||||
default:
|
||||
}
|
||||
*/
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Wait blocks until i) the new transport is up or ii) ctx is done or iii) cc is closed.
|
||||
func (cc *Conn) Wait(ctx context.Context) (transport.ClientTransport, error) {
|
||||
// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed.
|
||||
func (ac *addrConn) wait(ctx context.Context) (transport.ClientTransport, error) {
|
||||
for {
|
||||
cc.mu.Lock()
|
||||
ac.mu.Lock()
|
||||
switch {
|
||||
case cc.state == Shutdown:
|
||||
cc.mu.Unlock()
|
||||
return nil, ErrClientConnClosing
|
||||
case cc.state == Ready:
|
||||
ct := cc.transport
|
||||
cc.mu.Unlock()
|
||||
case ac.state == Shutdown:
|
||||
ac.mu.Unlock()
|
||||
return nil, errConnClosing
|
||||
case ac.state == Ready:
|
||||
ct := ac.transport
|
||||
ac.mu.Unlock()
|
||||
return ct, nil
|
||||
default:
|
||||
ready := cc.ready
|
||||
ready := ac.ready
|
||||
if ready == nil {
|
||||
ready = make(chan struct{})
|
||||
cc.ready = ready
|
||||
ac.ready = ready
|
||||
}
|
||||
cc.mu.Unlock()
|
||||
ac.mu.Unlock()
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil, transport.ContextErr(ctx.Err())
|
||||
|
@ -592,32 +751,36 @@ func (cc *Conn) Wait(ctx context.Context) (transport.ClientTransport, error) {
|
|||
}
|
||||
}
|
||||
|
||||
// Close starts to tear down the Conn. Returns ErrClientConnClosing if
|
||||
// tearDown starts to tear down the Conn. Returns errConnClosing if
|
||||
// it has been closed (mostly due to dial time-out).
|
||||
// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
|
||||
// some edge cases (e.g., the caller opens and closes many ClientConn's in a
|
||||
// some edge cases (e.g., the caller opens and closes many addrConn's in a
|
||||
// tight loop.
|
||||
func (cc *Conn) Close() error {
|
||||
cc.mu.Lock()
|
||||
defer cc.mu.Unlock()
|
||||
if cc.state == Shutdown {
|
||||
return ErrClientConnClosing
|
||||
func (ac *addrConn) tearDown(err error) {
|
||||
ac.mu.Lock()
|
||||
defer ac.mu.Unlock()
|
||||
if ac.down != nil {
|
||||
ac.down(err)
|
||||
ac.down = nil
|
||||
}
|
||||
cc.state = Shutdown
|
||||
cc.stateCV.Broadcast()
|
||||
if cc.events != nil {
|
||||
cc.events.Finish()
|
||||
cc.events = nil
|
||||
if ac.state == Shutdown {
|
||||
return
|
||||
}
|
||||
if cc.ready != nil {
|
||||
close(cc.ready)
|
||||
cc.ready = nil
|
||||
ac.state = Shutdown
|
||||
ac.stateCV.Broadcast()
|
||||
if ac.events != nil {
|
||||
ac.events.Finish()
|
||||
ac.events = nil
|
||||
}
|
||||
if cc.transport != nil {
|
||||
cc.transport.Close()
|
||||
if ac.ready != nil {
|
||||
close(ac.ready)
|
||||
ac.ready = nil
|
||||
}
|
||||
if cc.shutdownChan != nil {
|
||||
close(cc.shutdownChan)
|
||||
if ac.transport != nil {
|
||||
ac.transport.Close()
|
||||
}
|
||||
return nil
|
||||
if ac.shutdownChan != nil {
|
||||
close(ac.shutdownChan)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
|
|
@ -0,0 +1,67 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type memAddr string
|
||||
|
||||
func (a memAddr) Network() string { return "mem" }
|
||||
func (a memAddr) String() string { return string(a) }
|
||||
|
||||
type memListener struct {
|
||||
c chan net.Conn
|
||||
}
|
||||
|
||||
func (ln *memListener) Accept() (net.Conn, error) {
|
||||
conn, ok := <-ln.c
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("closed")
|
||||
}
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
func (ln *memListener) Addr() net.Addr {
|
||||
return memAddr(fmt.Sprintf("%p", ln))
|
||||
}
|
||||
|
||||
func (ln *memListener) Close() error {
|
||||
close(ln.c)
|
||||
return nil
|
||||
}
|
||||
|
||||
func main() {
|
||||
grpc.EnableTracing = true
|
||||
|
||||
ln := &memListener{
|
||||
c: make(chan net.Conn, 1),
|
||||
}
|
||||
|
||||
go func() {
|
||||
s := grpc.NewServer()
|
||||
log.Fatal(s.Serve(ln))
|
||||
}()
|
||||
|
||||
log.Printf("Dialing to server over a synchronous pipe...")
|
||||
serverConn, err := grpc.Dial("inmemory",
|
||||
grpc.WithInsecure(),
|
||||
grpc.WithBlock(),
|
||||
grpc.WithDialer(func(_ string, _ time.Duration) (net.Conn, error) {
|
||||
c1, c2 := net.Pipe()
|
||||
log.Printf("Pipe created: %v %v", c1, c2)
|
||||
ln.c <- c2
|
||||
log.Printf("Pipe accepted: %v %v", c1, c2)
|
||||
return c1, nil
|
||||
}))
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
// BUG: never reached
|
||||
log.Printf("SUCCESS! Connected to server: %v", serverConn)
|
||||
}
|
243
picker.go
243
picker.go
|
@ -1,243 +0,0 @@
|
|||
/*
|
||||
*
|
||||
* Copyright 2014, Google Inc.
|
||||
* All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
* modification, are permitted provided that the following conditions are
|
||||
* met:
|
||||
*
|
||||
* * Redistributions of source code must retain the above copyright
|
||||
* notice, this list of conditions and the following disclaimer.
|
||||
* * Redistributions in binary form must reproduce the above
|
||||
* copyright notice, this list of conditions and the following disclaimer
|
||||
* in the documentation and/or other materials provided with the
|
||||
* distribution.
|
||||
* * Neither the name of Google Inc. nor the names of its
|
||||
* contributors may be used to endorse or promote products derived from
|
||||
* this software without specific prior written permission.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*
|
||||
*/
|
||||
|
||||
package grpc
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
"google.golang.org/grpc/naming"
|
||||
"google.golang.org/grpc/transport"
|
||||
)
|
||||
|
||||
// Picker picks a Conn for RPC requests.
|
||||
// This is EXPERIMENTAL and please do not implement your own Picker for now.
|
||||
type Picker interface {
|
||||
// Init does initial processing for the Picker, e.g., initiate some connections.
|
||||
Init(cc *ClientConn) error
|
||||
// Pick blocks until either a transport.ClientTransport is ready for the upcoming RPC
|
||||
// or some error happens.
|
||||
Pick(ctx context.Context) (transport.ClientTransport, error)
|
||||
// PickAddr picks a peer address for connecting. This will be called repeated for
|
||||
// connecting/reconnecting.
|
||||
PickAddr() (string, error)
|
||||
// State returns the connectivity state of the underlying connections.
|
||||
State() (ConnectivityState, error)
|
||||
// WaitForStateChange blocks until the state changes to something other than
|
||||
// the sourceState. It returns the new state or error.
|
||||
WaitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error)
|
||||
// Close closes all the Conn's owned by this Picker.
|
||||
Close() error
|
||||
}
|
||||
|
||||
// unicastPicker is the default Picker which is used when there is no custom Picker
|
||||
// specified by users. It always picks the same Conn.
|
||||
type unicastPicker struct {
|
||||
target string
|
||||
conn *Conn
|
||||
}
|
||||
|
||||
func (p *unicastPicker) Init(cc *ClientConn) error {
|
||||
c, err := NewConn(cc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.conn = c
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *unicastPicker) Pick(ctx context.Context) (transport.ClientTransport, error) {
|
||||
return p.conn.Wait(ctx)
|
||||
}
|
||||
|
||||
func (p *unicastPicker) PickAddr() (string, error) {
|
||||
return p.target, nil
|
||||
}
|
||||
|
||||
func (p *unicastPicker) State() (ConnectivityState, error) {
|
||||
return p.conn.State(), nil
|
||||
}
|
||||
|
||||
func (p *unicastPicker) WaitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) {
|
||||
return p.conn.WaitForStateChange(ctx, sourceState)
|
||||
}
|
||||
|
||||
func (p *unicastPicker) Close() error {
|
||||
if p.conn != nil {
|
||||
return p.conn.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// unicastNamingPicker picks an address from a name resolver to set up the connection.
|
||||
type unicastNamingPicker struct {
|
||||
cc *ClientConn
|
||||
resolver naming.Resolver
|
||||
watcher naming.Watcher
|
||||
mu sync.Mutex
|
||||
// The list of the addresses are obtained from watcher.
|
||||
addrs *list.List
|
||||
// It tracks the current picked addr by PickAddr(). The next PickAddr may
|
||||
// push it forward on addrs.
|
||||
pickedAddr *list.Element
|
||||
conn *Conn
|
||||
}
|
||||
|
||||
// NewUnicastNamingPicker creates a Picker to pick addresses from a name resolver
|
||||
// to connect.
|
||||
func NewUnicastNamingPicker(r naming.Resolver) Picker {
|
||||
return &unicastNamingPicker{
|
||||
resolver: r,
|
||||
addrs: list.New(),
|
||||
}
|
||||
}
|
||||
|
||||
type addrInfo struct {
|
||||
addr string
|
||||
// Set to true if this addrInfo needs to be deleted in the next PickAddrr() call.
|
||||
deleting bool
|
||||
}
|
||||
|
||||
// processUpdates calls Watcher.Next() once and processes the obtained updates.
|
||||
func (p *unicastNamingPicker) processUpdates() error {
|
||||
updates, err := p.watcher.Next()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for _, update := range updates {
|
||||
switch update.Op {
|
||||
case naming.Add:
|
||||
p.mu.Lock()
|
||||
p.addrs.PushBack(&addrInfo{
|
||||
addr: update.Addr,
|
||||
})
|
||||
p.mu.Unlock()
|
||||
// Initial connection setup
|
||||
if p.conn == nil {
|
||||
conn, err := NewConn(p.cc)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.conn = conn
|
||||
}
|
||||
case naming.Delete:
|
||||
p.mu.Lock()
|
||||
for e := p.addrs.Front(); e != nil; e = e.Next() {
|
||||
if update.Addr == e.Value.(*addrInfo).addr {
|
||||
if e == p.pickedAddr {
|
||||
// Do not remove the element now if it is the current picked
|
||||
// one. We leave the deletion to the next PickAddr() call.
|
||||
e.Value.(*addrInfo).deleting = true
|
||||
// Notify Conn to close it. All the live RPCs on this connection
|
||||
// will be aborted.
|
||||
p.conn.NotifyReset()
|
||||
} else {
|
||||
p.addrs.Remove(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
p.mu.Unlock()
|
||||
default:
|
||||
grpclog.Println("Unknown update.Op ", update.Op)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// monitor runs in a standalone goroutine to keep watching name resolution updates until the watcher
|
||||
// is closed.
|
||||
func (p *unicastNamingPicker) monitor() {
|
||||
for {
|
||||
if err := p.processUpdates(); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *unicastNamingPicker) Init(cc *ClientConn) error {
|
||||
w, err := p.resolver.Resolve(cc.target)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.watcher = w
|
||||
p.cc = cc
|
||||
// Get the initial name resolution.
|
||||
if err := p.processUpdates(); err != nil {
|
||||
return err
|
||||
}
|
||||
go p.monitor()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *unicastNamingPicker) Pick(ctx context.Context) (transport.ClientTransport, error) {
|
||||
return p.conn.Wait(ctx)
|
||||
}
|
||||
|
||||
func (p *unicastNamingPicker) PickAddr() (string, error) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
if p.pickedAddr == nil {
|
||||
p.pickedAddr = p.addrs.Front()
|
||||
} else {
|
||||
pa := p.pickedAddr
|
||||
p.pickedAddr = pa.Next()
|
||||
if pa.Value.(*addrInfo).deleting {
|
||||
p.addrs.Remove(pa)
|
||||
}
|
||||
if p.pickedAddr == nil {
|
||||
p.pickedAddr = p.addrs.Front()
|
||||
}
|
||||
}
|
||||
if p.pickedAddr == nil {
|
||||
return "", fmt.Errorf("there is no address available to pick")
|
||||
}
|
||||
return p.pickedAddr.Value.(*addrInfo).addr, nil
|
||||
}
|
||||
|
||||
func (p *unicastNamingPicker) State() (ConnectivityState, error) {
|
||||
return 0, fmt.Errorf("State() is not supported for unicastNamingPicker")
|
||||
}
|
||||
|
||||
func (p *unicastNamingPicker) WaitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) {
|
||||
return 0, fmt.Errorf("WaitForStateChange is not supported for unicastNamingPciker")
|
||||
}
|
||||
|
||||
func (p *unicastNamingPicker) Close() error {
|
||||
p.watcher.Close()
|
||||
p.conn.Close()
|
||||
return nil
|
||||
}
|
188
picker_test.go
188
picker_test.go
|
@ -1,188 +0,0 @@
|
|||
/*
|
||||
*
|
||||
* Copyright 2014, Google Inc.
|
||||
* All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
* modification, are permitted provided that the following conditions are
|
||||
* met:
|
||||
*
|
||||
* * Redistributions of source code must retain the above copyright
|
||||
* notice, this list of conditions and the following disclaimer.
|
||||
* * Redistributions in binary form must reproduce the above
|
||||
* copyright notice, this list of conditions and the following disclaimer
|
||||
* in the documentation and/or other materials provided with the
|
||||
* distribution.
|
||||
* * Neither the name of Google Inc. nor the names of its
|
||||
* contributors may be used to endorse or promote products derived from
|
||||
* this software without specific prior written permission.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*
|
||||
*/
|
||||
|
||||
package grpc
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc/naming"
|
||||
)
|
||||
|
||||
type testWatcher struct {
|
||||
// the channel to receives name resolution updates
|
||||
update chan *naming.Update
|
||||
// the side channel to get to know how many updates in a batch
|
||||
side chan int
|
||||
// the channel to notifiy update injector that the update reading is done
|
||||
readDone chan int
|
||||
}
|
||||
|
||||
func (w *testWatcher) Next() (updates []*naming.Update, err error) {
|
||||
n := <-w.side
|
||||
if n == 0 {
|
||||
return nil, fmt.Errorf("w.side is closed")
|
||||
}
|
||||
for i := 0; i < n; i++ {
|
||||
u := <-w.update
|
||||
if u != nil {
|
||||
updates = append(updates, u)
|
||||
}
|
||||
}
|
||||
w.readDone <- 0
|
||||
return
|
||||
}
|
||||
|
||||
func (w *testWatcher) Close() {
|
||||
}
|
||||
|
||||
func (w *testWatcher) inject(updates []*naming.Update) {
|
||||
w.side <- len(updates)
|
||||
for _, u := range updates {
|
||||
w.update <- u
|
||||
}
|
||||
<-w.readDone
|
||||
}
|
||||
|
||||
type testNameResolver struct {
|
||||
w *testWatcher
|
||||
addr string
|
||||
}
|
||||
|
||||
func (r *testNameResolver) Resolve(target string) (naming.Watcher, error) {
|
||||
r.w = &testWatcher{
|
||||
update: make(chan *naming.Update, 1),
|
||||
side: make(chan int, 1),
|
||||
readDone: make(chan int),
|
||||
}
|
||||
r.w.side <- 1
|
||||
r.w.update <- &naming.Update{
|
||||
Op: naming.Add,
|
||||
Addr: r.addr,
|
||||
}
|
||||
go func() {
|
||||
<-r.w.readDone
|
||||
}()
|
||||
return r.w, nil
|
||||
}
|
||||
|
||||
func startServers(t *testing.T, numServers, port int, maxStreams uint32) ([]*server, *testNameResolver) {
|
||||
var servers []*server
|
||||
for i := 0; i < numServers; i++ {
|
||||
s := newTestServer()
|
||||
servers = append(servers, s)
|
||||
go s.start(t, port, maxStreams)
|
||||
s.wait(t, 2*time.Second)
|
||||
}
|
||||
// Point to server1
|
||||
addr := "127.0.0.1:" + servers[0].port
|
||||
return servers, &testNameResolver{
|
||||
addr: addr,
|
||||
}
|
||||
}
|
||||
|
||||
func TestNameDiscovery(t *testing.T) {
|
||||
// Start 3 servers on 3 ports.
|
||||
servers, r := startServers(t, 3, 0, math.MaxUint32)
|
||||
cc, err := Dial("foo.bar.com", WithPicker(NewUnicastNamingPicker(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create ClientConn: %v", err)
|
||||
}
|
||||
var reply string
|
||||
if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc); err != nil || reply != expectedResponse {
|
||||
t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want <nil>", err)
|
||||
}
|
||||
// Inject name resolution change to point to the second server now.
|
||||
var updates []*naming.Update
|
||||
updates = append(updates, &naming.Update{
|
||||
Op: naming.Delete,
|
||||
Addr: "127.0.0.1:" + servers[0].port,
|
||||
})
|
||||
updates = append(updates, &naming.Update{
|
||||
Op: naming.Add,
|
||||
Addr: "127.0.0.1:" + servers[1].port,
|
||||
})
|
||||
r.w.inject(updates)
|
||||
servers[0].stop()
|
||||
if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc); err != nil || reply != expectedResponse {
|
||||
t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want <nil>", err)
|
||||
}
|
||||
// Add another server address (server#3) to name resolution
|
||||
updates = nil
|
||||
updates = append(updates, &naming.Update{
|
||||
Op: naming.Add,
|
||||
Addr: "127.0.0.1:" + servers[2].port,
|
||||
})
|
||||
r.w.inject(updates)
|
||||
// Stop server#2. The library should direct to server#3 automatically.
|
||||
servers[1].stop()
|
||||
if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc); err != nil || reply != expectedResponse {
|
||||
t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want <nil>", err)
|
||||
}
|
||||
cc.Close()
|
||||
servers[2].stop()
|
||||
}
|
||||
|
||||
func TestEmptyAddrs(t *testing.T) {
|
||||
servers, r := startServers(t, 1, 0, math.MaxUint32)
|
||||
cc, err := Dial("foo.bar.com", WithPicker(NewUnicastNamingPicker(r)), WithBlock(), WithInsecure(), WithCodec(testCodec{}))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create ClientConn: %v", err)
|
||||
}
|
||||
var reply string
|
||||
if err := Invoke(context.Background(), "/foo/bar", &expectedRequest, &reply, cc); err != nil || reply != expectedResponse {
|
||||
t.Fatalf("grpc.Invoke(_, _, _, _, _) = %v, want <nil>", err)
|
||||
}
|
||||
// Inject name resolution change to remove the server address so that there is no address
|
||||
// available after that.
|
||||
var updates []*naming.Update
|
||||
updates = append(updates, &naming.Update{
|
||||
Op: naming.Delete,
|
||||
Addr: "127.0.0.1:" + servers[0].port,
|
||||
})
|
||||
r.w.inject(updates)
|
||||
// Loop until the above updates apply.
|
||||
for {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
ctx, _ := context.WithTimeout(context.Background(), 10*time.Millisecond)
|
||||
if err := Invoke(ctx, "/foo/bar", &expectedRequest, &reply, cc); err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
cc.Close()
|
||||
servers[0].stop()
|
||||
}
|
10
stream.go
10
stream.go
|
@ -103,8 +103,10 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
|
|||
var (
|
||||
t transport.ClientTransport
|
||||
err error
|
||||
put func()
|
||||
)
|
||||
t, err = cc.dopts.picker.Pick(ctx)
|
||||
//t, err = cc.dopts.picker.Pick(ctx)
|
||||
t, put, err = cc.getTransport(ctx)
|
||||
if err != nil {
|
||||
return nil, toRPCErr(err)
|
||||
}
|
||||
|
@ -119,6 +121,7 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
|
|||
}
|
||||
cs := &clientStream{
|
||||
desc: desc,
|
||||
put: put,
|
||||
codec: cc.dopts.codec,
|
||||
cp: cc.dopts.cp,
|
||||
dc: cc.dopts.dc,
|
||||
|
@ -174,6 +177,7 @@ type clientStream struct {
|
|||
tracing bool // set to EnableTracing when the clientStream is created.
|
||||
|
||||
mu sync.Mutex
|
||||
put func()
|
||||
closed bool
|
||||
// trInfo.tr is set when the clientStream is created (if EnableTracing is true),
|
||||
// and is set to nil when the clientStream's finish method is called.
|
||||
|
@ -311,6 +315,10 @@ func (cs *clientStream) finish(err error) {
|
|||
}
|
||||
cs.mu.Lock()
|
||||
defer cs.mu.Unlock()
|
||||
if cs.put != nil {
|
||||
cs.put()
|
||||
cs.put = nil
|
||||
}
|
||||
if cs.trInfo.tr != nil {
|
||||
if err == nil || err == io.EOF {
|
||||
cs.trInfo.tr.LazyPrintf("RPC: [OK]")
|
||||
|
|
|
@ -332,14 +332,15 @@ func TestReconnectTimeout(t *testing.T) {
|
|||
ResponseSize: proto.Int32(respSize),
|
||||
Payload: payload,
|
||||
}
|
||||
if _, err := tc.UnaryCall(context.Background(), req); err == nil {
|
||||
ctx, _ := context.WithTimeout(context.Background(), 10*time.Millisecond)
|
||||
if _, err := tc.UnaryCall(ctx, req); err == nil {
|
||||
t.Errorf("TestService/UnaryCall(_, _) = _, <nil>, want _, non-nil")
|
||||
return
|
||||
}
|
||||
}()
|
||||
// Block until reconnect times out.
|
||||
<-waitC
|
||||
if err := conn.Close(); err != grpc.ErrClientConnClosing {
|
||||
if err := conn.Close(); err != nil {
|
||||
t.Fatalf("%v.Close() = %v, want %v", conn, err, grpc.ErrClientConnClosing)
|
||||
}
|
||||
}
|
||||
|
@ -594,36 +595,36 @@ func testTimeoutOnDeadServer(t *testing.T, e env) {
|
|||
|
||||
cc := te.clientConn()
|
||||
tc := testpb.NewTestServiceClient(cc)
|
||||
ctx, _ := context.WithTimeout(context.Background(), time.Second)
|
||||
if _, err := cc.WaitForStateChange(ctx, grpc.Idle); err != nil {
|
||||
t.Fatalf("cc.WaitForStateChange(_, %s) = _, %v, want _, <nil>", grpc.Idle, err)
|
||||
}
|
||||
ctx, _ = context.WithTimeout(context.Background(), time.Second)
|
||||
if _, err := cc.WaitForStateChange(ctx, grpc.Connecting); err != nil {
|
||||
t.Fatalf("cc.WaitForStateChange(_, %s) = _, %v, want _, <nil>", grpc.Connecting, err)
|
||||
}
|
||||
if state, err := cc.State(); err != nil || state != grpc.Ready {
|
||||
t.Fatalf("cc.State() = %s, %v, want %s, <nil>", state, err, grpc.Ready)
|
||||
}
|
||||
ctx, _ = context.WithTimeout(context.Background(), time.Second)
|
||||
if _, err := cc.WaitForStateChange(ctx, grpc.Ready); err != context.DeadlineExceeded {
|
||||
t.Fatalf("cc.WaitForStateChange(_, %s) = _, %v, want _, %v", grpc.Ready, err, context.DeadlineExceeded)
|
||||
}
|
||||
//ctx, _ := context.WithTimeout(context.Background(), time.Second)
|
||||
//if _, err := cc.WaitForStateChange(ctx, grpc.Idle); err != nil {
|
||||
// t.Fatalf("cc.WaitForStateChange(_, %s) = _, %v, want _, <nil>", grpc.Idle, err)
|
||||
//}
|
||||
//ctx, _ = context.WithTimeout(context.Background(), time.Second)
|
||||
//if _, err := cc.WaitForStateChange(ctx, grpc.Connecting); err != nil {
|
||||
// t.Fatalf("cc.WaitForStateChange(_, %s) = _, %v, want _, <nil>", grpc.Connecting, err)
|
||||
//}
|
||||
//if state, err := cc.State(); err != nil || state != grpc.Ready {
|
||||
// t.Fatalf("cc.State() = %s, %v, want %s, <nil>", state, err, grpc.Ready)
|
||||
//}
|
||||
//ctx, _ = context.WithTimeout(context.Background(), time.Second)
|
||||
//if _, err := cc.WaitForStateChange(ctx, grpc.Ready); err != context.DeadlineExceeded {
|
||||
// t.Fatalf("cc.WaitForStateChange(_, %s) = _, %v, want _, %v", grpc.Ready, err, context.DeadlineExceeded)
|
||||
//}
|
||||
te.srv.Stop()
|
||||
// Set -1 as the timeout to make sure if transportMonitor gets error
|
||||
// notification in time the failure path of the 1st invoke of
|
||||
// ClientConn.wait hits the deadline exceeded error.
|
||||
ctx, _ = context.WithTimeout(context.Background(), -1)
|
||||
ctx, _ := context.WithTimeout(context.Background(), -1)
|
||||
if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); grpc.Code(err) != codes.DeadlineExceeded {
|
||||
t.Fatalf("TestService/EmptyCall(%v, _) = _, error %v, want _, error code: %d", ctx, err, codes.DeadlineExceeded)
|
||||
}
|
||||
ctx, _ = context.WithTimeout(context.Background(), time.Second)
|
||||
if _, err := cc.WaitForStateChange(ctx, grpc.Ready); err != nil {
|
||||
t.Fatalf("cc.WaitForStateChange(_, %s) = _, %v, want _, <nil>", grpc.Ready, err)
|
||||
}
|
||||
if state, err := cc.State(); err != nil || (state != grpc.Connecting && state != grpc.TransientFailure) {
|
||||
t.Fatalf("cc.State() = %s, %v, want %s or %s, <nil>", state, err, grpc.Connecting, grpc.TransientFailure)
|
||||
}
|
||||
//ctx, _ = context.WithTimeout(context.Background(), time.Second)
|
||||
//if _, err := cc.WaitForStateChange(ctx, grpc.Ready); err != nil {
|
||||
// t.Fatalf("cc.WaitForStateChange(_, %s) = _, %v, want _, <nil>", grpc.Ready, err)
|
||||
//}
|
||||
//if state, err := cc.State(); err != nil || (state != grpc.Connecting && state != grpc.TransientFailure) {
|
||||
// t.Fatalf("cc.State() = %s, %v, want %s or %s, <nil>", state, err, grpc.Connecting, grpc.TransientFailure)
|
||||
//}
|
||||
cc.Close()
|
||||
awaitNewConnLogOutput()
|
||||
}
|
||||
|
@ -784,21 +785,23 @@ func testEmptyUnaryWithUserAgent(t *testing.T, e env) {
|
|||
cc := te.clientConn()
|
||||
|
||||
// Wait until cc is connected.
|
||||
ctx, _ := context.WithTimeout(context.Background(), time.Second)
|
||||
if _, err := cc.WaitForStateChange(ctx, grpc.Idle); err != nil {
|
||||
t.Fatalf("cc.WaitForStateChange(_, %s) = _, %v, want _, <nil>", grpc.Idle, err)
|
||||
}
|
||||
ctx, _ = context.WithTimeout(context.Background(), time.Second)
|
||||
if _, err := cc.WaitForStateChange(ctx, grpc.Connecting); err != nil {
|
||||
t.Fatalf("cc.WaitForStateChange(_, %s) = _, %v, want _, <nil>", grpc.Connecting, err)
|
||||
}
|
||||
if state, err := cc.State(); err != nil || state != grpc.Ready {
|
||||
t.Fatalf("cc.State() = %s, %v, want %s, <nil>", state, err, grpc.Ready)
|
||||
}
|
||||
//ctx, _ := context.WithTimeout(context.Background(), time.Second)
|
||||
//if _, err := cc.WaitForStateChange(ctx, grpc.Idle); err != nil {
|
||||
// t.Fatalf("cc.WaitForStateChange(_, %s) = _, %v, want _, <nil>", grpc.Idle, err)
|
||||
//}
|
||||
//ctx, _ = context.WithTimeout(context.Background(), time.Second)
|
||||
//if _, err := cc.WaitForStateChange(ctx, grpc.Connecting); err != nil {
|
||||
// t.Fatalf("cc.WaitForStateChange(_, %s) = _, %v, want _, <nil>", grpc.Connecting, err)
|
||||
//}
|
||||
//if state, err := cc.State(); err != nil || state != grpc.Ready {
|
||||
// t.Fatalf("cc.State() = %s, %v, want %s, <nil>", state, err, grpc.Ready)
|
||||
//}
|
||||
/*
|
||||
ctx, _ = context.WithTimeout(context.Background(), time.Second)
|
||||
if _, err := cc.WaitForStateChange(ctx, grpc.Ready); err == nil {
|
||||
t.Fatalf("cc.WaitForStateChange(_, %s) = _, <nil>, want _, %v", grpc.Ready, context.DeadlineExceeded)
|
||||
}
|
||||
*/
|
||||
tc := testpb.NewTestServiceClient(cc)
|
||||
var header metadata.MD
|
||||
reply, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Header(&header))
|
||||
|
@ -811,7 +814,7 @@ func testEmptyUnaryWithUserAgent(t *testing.T, e env) {
|
|||
|
||||
te.srv.Stop()
|
||||
cc.Close()
|
||||
|
||||
/*
|
||||
ctx, _ = context.WithTimeout(context.Background(), 5*time.Second)
|
||||
if _, err := cc.WaitForStateChange(ctx, grpc.Ready); err != nil {
|
||||
t.Fatalf("cc.WaitForStateChange(_, %s) = _, %v, want _, <nil>", grpc.Ready, err)
|
||||
|
@ -819,6 +822,7 @@ func testEmptyUnaryWithUserAgent(t *testing.T, e env) {
|
|||
if state, err := cc.State(); err != nil || state != grpc.Shutdown {
|
||||
t.Fatalf("cc.State() = %s, %v, want %s, <nil>", state, err, grpc.Shutdown)
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
func TestFailedEmptyUnary(t *testing.T) {
|
||||
|
@ -1000,7 +1004,6 @@ func testRetry(t *testing.T, e env) {
|
|||
|
||||
cc := te.clientConn()
|
||||
tc := testpb.NewTestServiceClient(cc)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
numRPC := 1000
|
||||
|
|
Разница между файлами не показана из-за своего большого размера
Загрузить разницу
|
@ -272,6 +272,10 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
|||
}
|
||||
}
|
||||
t.mu.Lock()
|
||||
if t.activeStreams == nil {
|
||||
t.mu.Unlock()
|
||||
return nil, ErrConnClosing
|
||||
}
|
||||
if t.state != reachable {
|
||||
t.mu.Unlock()
|
||||
return nil, ErrConnClosing
|
||||
|
@ -390,6 +394,11 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
|||
func (t *http2Client) CloseStream(s *Stream, err error) {
|
||||
var updateStreams bool
|
||||
t.mu.Lock()
|
||||
if t.activeStreams == nil {
|
||||
t.mu.Unlock()
|
||||
t.Close()
|
||||
return
|
||||
}
|
||||
if t.streamsQuota != nil {
|
||||
updateStreams = true
|
||||
}
|
||||
|
@ -457,6 +466,17 @@ func (t *http2Client) Close() (err error) {
|
|||
return
|
||||
}
|
||||
|
||||
func (t *http2Client) GracefulClose() error {
|
||||
t.mu.Lock()
|
||||
active := len(t.activeStreams)
|
||||
t.activeStreams = nil
|
||||
t.mu.Unlock()
|
||||
if active == 0 {
|
||||
return t.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Write formats the data into HTTP2 data frame(s) and sends it out. The caller
|
||||
// should proceed only if Write returns nil.
|
||||
// TODO(zhaoq): opts.Delay is ignored in this implementation. Support it later
|
||||
|
|
|
@ -391,6 +391,8 @@ type ClientTransport interface {
|
|||
// is called only once.
|
||||
Close() error
|
||||
|
||||
GracefulClose() error
|
||||
|
||||
// Write sends the data for the given stream. A nil stream indicates
|
||||
// the write is to be performed on the transport as a whole.
|
||||
Write(s *Stream, data []byte, opts *Options) error
|
||||
|
|
Загрузка…
Ссылка в новой задаче