Use bufconn in end2end tests.
This commit is contained in:
Родитель
9a71c7940b
Коммит
c7e2c00ed1
|
@ -53,6 +53,7 @@ import (
|
||||||
"google.golang.org/grpc/peer"
|
"google.golang.org/grpc/peer"
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
"google.golang.org/grpc/tap"
|
"google.golang.org/grpc/tap"
|
||||||
|
"google.golang.org/grpc/test/bufconn"
|
||||||
testpb "google.golang.org/grpc/test/grpc_testing"
|
testpb "google.golang.org/grpc/test/grpc_testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -354,7 +355,7 @@ const tlsDir = "testdata/"
|
||||||
|
|
||||||
type env struct {
|
type env struct {
|
||||||
name string
|
name string
|
||||||
network string // The type of network such as tcp, unix, etc.
|
network string // The type of network such as tcp, unix, bufconn, etc.
|
||||||
security string // The security protocol such as TLS, SSH, etc.
|
security string // The security protocol such as TLS, SSH, etc.
|
||||||
httpHandler bool // whether to use the http.Handler ServerTransport; requires TLS
|
httpHandler bool // whether to use the http.Handler ServerTransport; requires TLS
|
||||||
balancer bool // whether to use balancer
|
balancer bool // whether to use balancer
|
||||||
|
@ -367,21 +368,19 @@ func (e env) runnable() bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e env) dialer(addr string, timeout time.Duration) (net.Conn, error) {
|
const bufconnNetwork = "bufconn"
|
||||||
return net.DialTimeout(e.network, addr, timeout)
|
|
||||||
}
|
|
||||||
|
|
||||||
var (
|
var (
|
||||||
tcpClearEnv = env{name: "tcp-clear", network: "tcp", balancer: true}
|
tcpClearEnv = env{name: "tcp-clear", network: "tcp", balancer: true}
|
||||||
tcpTLSEnv = env{name: "tcp-tls", network: "tcp", security: "tls", balancer: true}
|
tcpTLSEnv = env{name: "tcp-tls", network: "tcp", security: "tls", balancer: true}
|
||||||
unixClearEnv = env{name: "unix-clear", network: "unix", balancer: true}
|
unixClearEnv = env{name: "unix-clear", network: "unix", balancer: true}
|
||||||
unixTLSEnv = env{name: "unix-tls", network: "unix", security: "tls", balancer: true}
|
unixTLSEnv = env{name: "unix-tls", network: "unix", security: "tls", balancer: true}
|
||||||
handlerEnv = env{name: "handler-tls", network: "tcp", security: "tls", httpHandler: true, balancer: true}
|
handlerEnv = env{name: "handler-tls", network: bufconnNetwork, security: "tls", httpHandler: true, balancer: true}
|
||||||
noBalancerEnv = env{name: "no-balancer", network: "tcp", security: "tls", balancer: false}
|
noBalancerEnv = env{name: "no-balancer", network: bufconnNetwork, security: "tls", balancer: false}
|
||||||
allEnv = []env{tcpClearEnv, tcpTLSEnv, unixClearEnv, unixTLSEnv, handlerEnv, noBalancerEnv}
|
allEnv = []env{tcpClearEnv, tcpTLSEnv, unixClearEnv, unixTLSEnv, handlerEnv, noBalancerEnv}
|
||||||
)
|
)
|
||||||
|
|
||||||
var onlyEnv = flag.String("only_env", "", "If non-empty, one of 'tcp-clear', 'tcp-tls', 'unix-clear', 'unix-tls', or 'handler-tls' to only run the tests for that environment. Empty means all.")
|
var onlyEnv = flag.String("only_env", "", "Restrict tests to the named environment. Empty means all.")
|
||||||
|
|
||||||
func listTestEnv() (envs []env) {
|
func listTestEnv() (envs []env) {
|
||||||
if *onlyEnv != "" {
|
if *onlyEnv != "" {
|
||||||
|
@ -442,6 +441,7 @@ type test struct {
|
||||||
// srv and srvAddr are set once startServer is called.
|
// srv and srvAddr are set once startServer is called.
|
||||||
srv *grpc.Server
|
srv *grpc.Server
|
||||||
srvAddr string
|
srvAddr string
|
||||||
|
lis net.Listener
|
||||||
|
|
||||||
cc *grpc.ClientConn // nil until requested via clientConn
|
cc *grpc.ClientConn // nil until requested via clientConn
|
||||||
restoreLogs func() // nil unless declareLogNoise is used
|
restoreLogs func() // nil unless declareLogNoise is used
|
||||||
|
@ -523,7 +523,12 @@ func (te *test) startServer(ts testpb.TestServiceServer) {
|
||||||
la = "/tmp/testsock" + fmt.Sprintf("%d", time.Now().UnixNano())
|
la = "/tmp/testsock" + fmt.Sprintf("%d", time.Now().UnixNano())
|
||||||
syscall.Unlink(la)
|
syscall.Unlink(la)
|
||||||
}
|
}
|
||||||
lis, err := net.Listen(te.e.network, la)
|
var err error
|
||||||
|
if te.e.network == bufconnNetwork {
|
||||||
|
te.lis = bufconn.Listen(1024 * 1024)
|
||||||
|
} else {
|
||||||
|
te.lis, err = net.Listen(te.e.network, la)
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
te.t.Fatalf("Failed to listen: %v", err)
|
te.t.Fatalf("Failed to listen: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -556,24 +561,36 @@ func (te *test) startServer(ts testpb.TestServiceServer) {
|
||||||
addr := la
|
addr := la
|
||||||
switch te.e.network {
|
switch te.e.network {
|
||||||
case "unix":
|
case "unix":
|
||||||
|
case "bufconn":
|
||||||
|
addr = te.lis.Addr().String()
|
||||||
default:
|
default:
|
||||||
_, port, err := net.SplitHostPort(lis.Addr().String())
|
_, port, err := net.SplitHostPort(te.lis.Addr().String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
te.t.Fatalf("Failed to parse listener address: %v", err)
|
te.t.Fatalf("Failed to parse listener address: %v", err)
|
||||||
}
|
}
|
||||||
addr = "localhost:" + port
|
addr = "localhost:" + port
|
||||||
}
|
}
|
||||||
|
|
||||||
go s.Serve(lis)
|
go s.Serve(te.lis)
|
||||||
te.srvAddr = addr
|
te.srvAddr = addr
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (te *test) dialer(addr string, timeout time.Duration) (net.Conn, error) {
|
||||||
|
if te.lis == nil {
|
||||||
|
return nil, fmt.Errorf("no listener")
|
||||||
|
}
|
||||||
|
if te.e.network == bufconnNetwork {
|
||||||
|
return te.lis.(*bufconn.Listener).Dial()
|
||||||
|
}
|
||||||
|
return net.DialTimeout(te.e.network, addr, timeout)
|
||||||
|
}
|
||||||
|
|
||||||
func (te *test) clientConn() *grpc.ClientConn {
|
func (te *test) clientConn() *grpc.ClientConn {
|
||||||
if te.cc != nil {
|
if te.cc != nil {
|
||||||
return te.cc
|
return te.cc
|
||||||
}
|
}
|
||||||
opts := []grpc.DialOption{
|
opts := []grpc.DialOption{
|
||||||
grpc.WithDialer(te.e.dialer),
|
grpc.WithDialer(te.dialer),
|
||||||
grpc.WithUserAgent(te.userAgent),
|
grpc.WithUserAgent(te.userAgent),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -644,7 +661,7 @@ func (te *test) declareLogNoise(phrases ...string) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (te *test) withServerTester(fn func(st *serverTester)) {
|
func (te *test) withServerTester(fn func(st *serverTester)) {
|
||||||
c, err := te.e.dialer(te.srvAddr, 10*time.Second)
|
c, err := te.dialer(te.srvAddr, 10*time.Second)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
te.t.Fatal(err)
|
te.t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
@ -1089,7 +1106,7 @@ func testFailFast(t *testing.T, e env) {
|
||||||
if grpc.Code(err) == codes.Unavailable {
|
if grpc.Code(err) == codes.Unavailable {
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
fmt.Printf("%v.EmptyCall(_, _) = _, %v", tc, err)
|
t.Logf("%v.EmptyCall(_, _) = _, %v", tc, err)
|
||||||
time.Sleep(10 * time.Millisecond)
|
time.Sleep(10 * time.Millisecond)
|
||||||
}
|
}
|
||||||
// The client keeps reconnecting and ongoing fail-fast RPCs should fail with code.Unavailable.
|
// The client keeps reconnecting and ongoing fail-fast RPCs should fail with code.Unavailable.
|
||||||
|
@ -2222,7 +2239,8 @@ func testPeerClientSide(t *testing.T, e env) {
|
||||||
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
|
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
|
||||||
}
|
}
|
||||||
pa := peer.Addr.String()
|
pa := peer.Addr.String()
|
||||||
if e.network == "unix" {
|
switch e.network {
|
||||||
|
case "unix", "bufconn":
|
||||||
if pa != te.srvAddr {
|
if pa != te.srvAddr {
|
||||||
t.Fatalf("peer.Addr = %v, want %v", pa, te.srvAddr)
|
t.Fatalf("peer.Addr = %v, want %v", pa, te.srvAddr)
|
||||||
}
|
}
|
||||||
|
@ -3968,12 +3986,8 @@ func TestDialWithBlockErrorOnBadCertificates(t *testing.T) {
|
||||||
te.startServer(&testServer{security: te.e.security})
|
te.startServer(&testServer{security: te.e.security})
|
||||||
defer te.tearDown()
|
defer te.tearDown()
|
||||||
|
|
||||||
var (
|
var err error
|
||||||
err error
|
te.cc, err = grpc.Dial(te.srvAddr, grpc.WithTransportCredentials(clientAlwaysFailCred{}), grpc.WithBlock(), grpc.WithDialer(te.dialer))
|
||||||
opts []grpc.DialOption
|
|
||||||
)
|
|
||||||
opts = append(opts, grpc.WithTransportCredentials(clientAlwaysFailCred{}), grpc.WithBlock())
|
|
||||||
te.cc, err = grpc.Dial(te.srvAddr, opts...)
|
|
||||||
if err != errClientAlwaysFailCred {
|
if err != errClientAlwaysFailCred {
|
||||||
te.t.Fatalf("Dial(%q) = %v, want %v", te.srvAddr, err, errClientAlwaysFailCred)
|
te.t.Fatalf("Dial(%q) = %v, want %v", te.srvAddr, err, errClientAlwaysFailCred)
|
||||||
}
|
}
|
||||||
|
@ -4435,10 +4449,8 @@ func (ss *stubServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallSer
|
||||||
|
|
||||||
// Start starts the server and creates a client connected to it.
|
// Start starts the server and creates a client connected to it.
|
||||||
func (ss *stubServer) Start() error {
|
func (ss *stubServer) Start() error {
|
||||||
lis, err := net.Listen("tcp", "localhost:0")
|
lis := bufconn.Listen(1024 * 1024)
|
||||||
if err != nil {
|
dialer := func(string, time.Duration) (net.Conn, error) { return lis.Dial() }
|
||||||
return fmt.Errorf(`net.Listen("tcp", "localhost:0") = %v`, err)
|
|
||||||
}
|
|
||||||
ss.cleanups = append(ss.cleanups, func() { lis.Close() })
|
ss.cleanups = append(ss.cleanups, func() { lis.Close() })
|
||||||
|
|
||||||
s := grpc.NewServer()
|
s := grpc.NewServer()
|
||||||
|
@ -4446,7 +4458,7 @@ func (ss *stubServer) Start() error {
|
||||||
go s.Serve(lis)
|
go s.Serve(lis)
|
||||||
ss.cleanups = append(ss.cleanups, s.Stop)
|
ss.cleanups = append(ss.cleanups, s.Stop)
|
||||||
|
|
||||||
cc, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure(), grpc.WithBlock())
|
cc, err := grpc.Dial(lis.Addr().String(), grpc.WithInsecure(), grpc.WithBlock(), grpc.WithDialer(dialer))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("grpc.Dial(%q) = %v", lis.Addr().String(), err)
|
return fmt.Errorf("grpc.Dial(%q) = %v", lis.Addr().String(), err)
|
||||||
}
|
}
|
||||||
|
|
Загрузка…
Ссылка в новой задаче