Make sure all in-flight streams close when ClientConn.Close() is called. (#1136)
* Make sure all in-flight streams close when ClientConn.Close() is called. * added test
This commit is contained in:
Родитель
6d0e6b04b3
Коммит
2d949be2fe
|
@ -237,6 +237,9 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
|
||||||
select {
|
select {
|
||||||
case <-t.Error():
|
case <-t.Error():
|
||||||
// Incur transport error, simply exit.
|
// Incur transport error, simply exit.
|
||||||
|
case <-cc.ctx.Done():
|
||||||
|
cs.finish(ErrClientConnClosing)
|
||||||
|
cs.closeTransportStream(ErrClientConnClosing)
|
||||||
case <-s.Done():
|
case <-s.Done():
|
||||||
// TODO: The trace of the RPC is terminated here when there is no pending
|
// TODO: The trace of the RPC is terminated here when there is no pending
|
||||||
// I/O, which is probably not the optimal solution.
|
// I/O, which is probably not the optimal solution.
|
||||||
|
|
|
@ -995,6 +995,41 @@ func testConcurrentServerStopAndGoAway(t *testing.T, e env) {
|
||||||
awaitNewConnLogOutput()
|
awaitNewConnLogOutput()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestClientConnCloseAfterGoAwayWithActiveStream(t *testing.T) {
|
||||||
|
defer leakCheck(t)()
|
||||||
|
for _, e := range listTestEnv() {
|
||||||
|
if e.name == "handler-tls" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
testClientConnCloseAfterGoAwayWithActiveStream(t, e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testClientConnCloseAfterGoAwayWithActiveStream(t *testing.T, e env) {
|
||||||
|
te := newTest(t, e)
|
||||||
|
te.startServer(&testServer{security: e.security})
|
||||||
|
defer te.tearDown()
|
||||||
|
cc := te.clientConn()
|
||||||
|
tc := testpb.NewTestServiceClient(cc)
|
||||||
|
|
||||||
|
if _, err := tc.FullDuplexCall(context.Background()); err != nil {
|
||||||
|
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want _, <nil>", tc, err)
|
||||||
|
}
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
te.srv.GracefulStop()
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
cc.Close()
|
||||||
|
timeout := time.NewTimer(time.Second)
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-timeout.C:
|
||||||
|
t.Fatalf("Test timed-out.")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestFailFast(t *testing.T) {
|
func TestFailFast(t *testing.T) {
|
||||||
defer leakCheck(t)()
|
defer leakCheck(t)()
|
||||||
for _, e := range listTestEnv() {
|
for _, e := range listTestEnv() {
|
||||||
|
|
Загрузка…
Ссылка в новой задаче