Send Err in QueryService.StreamExecute's QueryResult responses, handle them in tabletconn

This commit is contained in:
Ammar Aijazi 2015-06-09 18:38:06 -07:00
Родитель 876c4a658b
Коммит 856856f234
3 изменённых файлов: 74 добавлений и 4 удалений

Просмотреть файл

@ -78,9 +78,24 @@ func (sq *SqlQuery) Execute(ctx context.Context, query *proto.Query, reply *mpro
// StreamExecute is exposing tabletserver.SqlQuery.StreamExecute
func (sq *SqlQuery) StreamExecute(ctx context.Context, query *proto.Query, sendReply func(reply interface{}) error) (err error) {
defer sq.server.HandlePanic(&err)
return sq.server.StreamExecute(callinfo.RPCWrapCallInfo(ctx), query, func(reply *mproto.QueryResult) error {
tErr := sq.server.StreamExecute(callinfo.RPCWrapCallInfo(ctx), query, func(reply *mproto.QueryResult) error {
return sendReply(reply)
})
if tErr == nil {
return nil
}
if *tabletserver.RPCErrorOnlyInReply {
// If there was an app error, send a QueryResult back with it.
qr := new(mproto.QueryResult)
tabletserver.AddTabletErrorToQueryResult(tErr, qr)
// Sending back errors this way is not backwards compatible. If a (new) server sends an additional
// QueryResult with an error, and the (old) client doesn't know how to read it, it will cause
// problems where the client will get out of sync with the number of QueryResults sent.
// That's why this the error is only sent this way when the --rpc_errors_only_in_reply flag is set
// (signalling that all clients are able to handle new-style errors).
return sendReply(qr)
}
return tErr
}
// ExecuteBatch is exposing tabletserver.SqlQuery.ExecuteBatch

Просмотреть файл

@ -172,15 +172,36 @@ func (conn *TabletBson) StreamExecute(ctx context.Context, query string, bindVar
if !ok {
return nil, nil, tabletError(c.Error)
}
// SqlQuery.StreamExecute might return an application error inside the QueryResult
vtErr := vterrors.FromRPCError(firstResult.Err)
if vtErr != nil {
return nil, nil, tabletError(vtErr)
}
srout := make(chan *mproto.QueryResult, 1)
go func() {
defer close(srout)
srout <- firstResult
for r := range sr {
srout <- r
vtErr = vterrors.FromRPCError(r.Err)
// If we get a QueryResult with an RPCError, that was an extra QueryResult sent by
// the server specifically to indicate an error, and we shouldn't surface it to clients.
if vtErr == nil {
srout <- r
}
}
}()
return srout, func() error { return tabletError(c.Error) }, nil
// errFunc will return either an RPC-layer error or an application error, if one exists.
// It will only return the most recent application error (i.e, from the QueryResult that
// most recently contained an error). It will prioritize an RPC-layer error over an apperror,
// if both exist.
errFunc := func() error {
rpcErr := tabletError(c.Error)
if rpcErr != nil {
return rpcErr
}
return tabletError(vtErr)
}
return srout, errFunc, nil
}
// Begin starts a transaction.

Просмотреть файл

@ -323,6 +323,9 @@ func (f *FakeQueryService) StreamExecute(ctx context.Context, query *proto.Query
<-panicWait
panic(fmt.Errorf("test-triggered panic late"))
}
if f.hasError {
return testTabletError
}
if err := sendReply(&streamExecuteQueryResult2); err != nil {
f.t.Errorf("sendReply2 failed: %v", err)
}
@ -399,6 +402,37 @@ func testStreamExecute(t *testing.T, conn tabletconn.TabletConn) {
}
}
func testStreamExecuteError(t *testing.T, conn tabletconn.TabletConn) {
t.Log("testStreamExecuteError")
ctx := context.Background()
stream, errFunc, err := conn.StreamExecute(ctx, streamExecuteQuery, streamExecuteBindVars, streamExecuteTransactionId)
if err != nil {
t.Fatalf("StreamExecute failed: %v", err)
}
qr, ok := <-stream
if !ok {
t.Fatalf("StreamExecute failed: cannot read result1")
}
if len(qr.Rows) == 0 {
qr.Rows = nil
}
if !reflect.DeepEqual(*qr, streamExecuteQueryResult1) {
t.Errorf("Unexpected result1 from StreamExecute: got %v wanted %v", qr, streamExecuteQueryResult1)
}
// After 1 result, we expect to get an error (no more results).
qr, ok = <-stream
if ok {
t.Fatalf("StreamExecute channel wasn't closed")
}
err = errFunc()
if err == nil {
t.Fatalf("StreamExecute was expecting an error, didn't get one")
}
if err.Error() != expectedErr {
t.Errorf("Unexpected error from StreamExecute: got %v wanted %v", err, expectedErr)
}
}
func testStreamExecutePanics(t *testing.T, conn tabletconn.TabletConn, fake *FakeQueryService) {
t.Log("testStreamExecutePanics")
// early panic is before sending the Fields, that is returned
@ -643,7 +677,7 @@ func TestSuite(t *testing.T, conn tabletconn.TabletConn, fake *FakeQueryService)
testCommitError(t, conn)
testRollbackError(t, conn)
testExecuteError(t, conn)
// testStreamExecuteError(t, conn)
testStreamExecuteError(t, conn)
testExecuteBatchError(t, conn)
testSplitQueryError(t, conn)
fake.hasError = false