This commit is contained in:
iamqizhao 2015-07-29 10:51:07 -07:00
Родитель 4e2e070c77
Коммит 199ad57e0d
3 изменённых файлов: 83 добавлений и 2 удалений

Просмотреть файл

@ -44,6 +44,8 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/transport"
"google.golang.org/grpc/grpclog"
)
type streamHandler func(srv interface{}, stream ServerStream) error
@ -187,7 +189,9 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
}
func (cs *clientStream) RecvMsg(m interface{}) (err error) {
grpclog.Println("start recv")
err = recv(cs.p, cs.codec, m)
grpclog.Println("recv sth")
defer func() {
// err != nil indicates the termination of the stream.
if err != nil {

Просмотреть файл

@ -826,9 +826,9 @@ func TestFailedServerStreaming(t *testing.T) {
}
func testFailedServerStreaming(t *testing.T, e env) {
s, cc := setUp(nil, math.MaxUint32, "", e)
_, cc := setUp(nil, math.MaxUint32, "", e)
tc := testpb.NewTestServiceClient(cc)
defer tearDown(s, cc)
//defer tearDown(s, cc)
respParam := make([]*testpb.ResponseParameters, len(respSizes))
for i, s := range respSizes {
respParam[i] = &testpb.ResponseParameters{

Просмотреть файл

@ -355,6 +355,83 @@ func TestLargeMessageSuspension(t *testing.T) {
server.stop()
}
func TestMaxStreams(t *testing.T) {
server, ct := setUp(t, 0, 1, suspended)
callHdr := &CallHdr{
Host: "localhost",
Method: "foo.Large",
}
// Have a pending stream which takes all streams quota.
s, err := ct.NewStream(context.Background(), callHdr)
if err != nil {
t.Fatalf("failed to open stream: %v", err)
}
cc, ok := ct.(*http2Client)
if !ok {
t.Fatalf("Failed to convert %v to *http2Client", ct)
}
done := make(chan struct{})
ch := make(chan int)
go func() {
for {
select {
case <-time.After(5 * time.Millisecond):
ch <- 0
case <-time.After(5 * time.Second):
close(done)
return
}
}
}()
for {
select {
case <-ch:
case <-done:
t.Fatalf("Client has not received the max stream setting in 5 seconds.")
}
cc.mu.Lock()
// cc.streamsQuota should be initialized once receiving the 1st setting frame from
// the server.
if cc.streamsQuota != nil {
cc.mu.Unlock()
select {
case <-cc.streamsQuota.acquire():
t.Fatalf("streamsQuota.acquire() becomes readable mistakenly.")
default:
if cc.streamsQuota.quota != 0 {
t.Fatalf("streamsQuota.quota got non-zero quota mistakenly.")
}
}
break
}
cc.mu.Unlock()
}
// Close the pending stream so that the streams quota becomes available for the next new stream.
ct.CloseStream(s, nil)
for {
select {
case <-ch:
case <-done:
t.Fatalf("Client has not received the max stream setting in 5 seconds.")
}
select {
case i:=<-cc.streamsQuota.acquire():
if i != 1 {
t.Fatalf("fadfa")
}
cc.streamsQuota.add(i)
default:
t.Fatalf("43092")
}
break
}
if _, err := ct.NewStream(context.Background(), callHdr); err != nil {
t.Fatalf("failed to open stream: %v", err)
}
ct.Close()
server.stop()
}
func TestServerWithMisbehavedClient(t *testing.T) {
server, ct := setUp(t, 0, math.MaxUint32, suspended)
callHdr := &CallHdr{