зеркало из https://github.com/github/vitess-gh.git
Revert the remove of the --rpc-error-only-in-reply flag for Gorpc VtGate, as there are clients that depend on it for now
This commit is contained in:
Родитель
5a705431c6
Коммит
5e00421dab
|
@ -45,7 +45,10 @@ func (vtg *VTGate) Execute(ctx context.Context, request *proto.Query, reply *pro
|
|||
request.NotInTransaction,
|
||||
reply)
|
||||
vtgate.AddVtGateErrorToQueryResult(vtgErr, reply)
|
||||
return nil
|
||||
if *vtgate.RPCErrorOnlyInReply {
|
||||
return nil
|
||||
}
|
||||
return vtgErr
|
||||
}
|
||||
|
||||
// ExecuteShard is the RPC version of vtgateservice.VTGateService method
|
||||
|
@ -66,7 +69,10 @@ func (vtg *VTGate) ExecuteShard(ctx context.Context, request *proto.QueryShard,
|
|||
request.NotInTransaction,
|
||||
reply)
|
||||
vtgate.AddVtGateErrorToQueryResult(vtgErr, reply)
|
||||
return nil
|
||||
if *vtgate.RPCErrorOnlyInReply {
|
||||
return nil
|
||||
}
|
||||
return vtgErr
|
||||
}
|
||||
|
||||
// ExecuteKeyspaceIds is the RPC version of vtgateservice.VTGateService method
|
||||
|
@ -87,7 +93,10 @@ func (vtg *VTGate) ExecuteKeyspaceIds(ctx context.Context, request *proto.Keyspa
|
|||
request.NotInTransaction,
|
||||
reply)
|
||||
vtgate.AddVtGateErrorToQueryResult(vtgErr, reply)
|
||||
return nil
|
||||
if *vtgate.RPCErrorOnlyInReply {
|
||||
return nil
|
||||
}
|
||||
return vtgErr
|
||||
}
|
||||
|
||||
// ExecuteKeyRanges is the RPC version of vtgateservice.VTGateService method
|
||||
|
@ -108,7 +117,10 @@ func (vtg *VTGate) ExecuteKeyRanges(ctx context.Context, request *proto.KeyRange
|
|||
request.NotInTransaction,
|
||||
reply)
|
||||
vtgate.AddVtGateErrorToQueryResult(vtgErr, reply)
|
||||
return nil
|
||||
if *vtgate.RPCErrorOnlyInReply {
|
||||
return nil
|
||||
}
|
||||
return vtgErr
|
||||
}
|
||||
|
||||
// ExecuteEntityIds is the RPC version of vtgateservice.VTGateService method
|
||||
|
@ -130,7 +142,10 @@ func (vtg *VTGate) ExecuteEntityIds(ctx context.Context, request *proto.EntityId
|
|||
request.NotInTransaction,
|
||||
reply)
|
||||
vtgate.AddVtGateErrorToQueryResult(vtgErr, reply)
|
||||
return nil
|
||||
if *vtgate.RPCErrorOnlyInReply {
|
||||
return nil
|
||||
}
|
||||
return vtgErr
|
||||
}
|
||||
|
||||
// ExecuteBatchShard is the RPC version of vtgateservice.VTGateService method
|
||||
|
@ -148,7 +163,10 @@ func (vtg *VTGate) ExecuteBatchShard(ctx context.Context, request *proto.BatchQu
|
|||
request.Session,
|
||||
reply)
|
||||
vtgate.AddVtGateErrorToQueryResultList(vtgErr, reply)
|
||||
return nil
|
||||
if *vtgate.RPCErrorOnlyInReply {
|
||||
return nil
|
||||
}
|
||||
return vtgErr
|
||||
}
|
||||
|
||||
// ExecuteBatchKeyspaceIds is the RPC version of
|
||||
|
@ -167,7 +185,10 @@ func (vtg *VTGate) ExecuteBatchKeyspaceIds(ctx context.Context, request *proto.K
|
|||
request.Session,
|
||||
reply)
|
||||
vtgate.AddVtGateErrorToQueryResultList(vtgErr, reply)
|
||||
return nil
|
||||
if *vtgate.RPCErrorOnlyInReply {
|
||||
return nil
|
||||
}
|
||||
return vtgErr
|
||||
}
|
||||
|
||||
// StreamExecute is the RPC version of vtgateservice.VTGateService method
|
||||
|
@ -201,10 +222,18 @@ func (vtg *VTGate) StreamExecute2(ctx context.Context, request *proto.Query, sen
|
|||
if vtgErr == nil {
|
||||
return nil
|
||||
}
|
||||
// If there was an app error, send a QueryResult back with it.
|
||||
qr := new(proto.QueryResult)
|
||||
vtgate.AddVtGateErrorToQueryResult(vtgErr, qr)
|
||||
return sendReply(qr)
|
||||
if *vtgate.RPCErrorOnlyInReply {
|
||||
// If there was an app error, send a QueryResult back with it.
|
||||
qr := new(proto.QueryResult)
|
||||
vtgate.AddVtGateErrorToQueryResult(vtgErr, qr)
|
||||
// Sending back errors this way is not backwards compatible. If a (new) server sends an additional
|
||||
// QueryResult with an error, and the (old) client doesn't know how to read it, it will cause
|
||||
// problems where the client will get out of sync with the number of QueryResults sent.
|
||||
// That's why this the error is only sent this way when the --rpc_errors_only_in_reply flag is set
|
||||
// (signalling that all clients are able to handle new-style errors).
|
||||
return sendReply(qr)
|
||||
}
|
||||
return vtgErr
|
||||
}
|
||||
|
||||
// StreamExecuteShard is the RPC version of vtgateservice.VTGateService method
|
||||
|
@ -242,10 +271,18 @@ func (vtg *VTGate) StreamExecuteShard2(ctx context.Context, request *proto.Query
|
|||
if vtgErr == nil {
|
||||
return nil
|
||||
}
|
||||
// If there was an app error, send a QueryResult back with it.
|
||||
qr := new(proto.QueryResult)
|
||||
vtgate.AddVtGateErrorToQueryResult(vtgErr, qr)
|
||||
return sendReply(qr)
|
||||
if *vtgate.RPCErrorOnlyInReply {
|
||||
// If there was an app error, send a QueryResult back with it.
|
||||
qr := new(proto.QueryResult)
|
||||
vtgate.AddVtGateErrorToQueryResult(vtgErr, qr)
|
||||
// Sending back errors this way is not backwards compatible. If a (new) server sends an additional
|
||||
// QueryResult with an error, and the (old) client doesn't know how to read it, it will cause
|
||||
// problems where the client will get out of sync with the number of QueryResults sent.
|
||||
// That's why this the error is only sent this way when the --rpc_errors_only_in_reply flag is set
|
||||
// (signalling that all clients are able to handle new-style errors).
|
||||
return sendReply(qr)
|
||||
}
|
||||
return vtgErr
|
||||
}
|
||||
|
||||
// StreamExecuteKeyspaceIds is the RPC version of
|
||||
|
@ -285,10 +322,18 @@ func (vtg *VTGate) StreamExecuteKeyspaceIds2(ctx context.Context, request *proto
|
|||
if vtgErr == nil {
|
||||
return nil
|
||||
}
|
||||
// If there was an app error, send a QueryResult back with it.
|
||||
qr := new(proto.QueryResult)
|
||||
vtgate.AddVtGateErrorToQueryResult(vtgErr, qr)
|
||||
return sendReply(qr)
|
||||
if *vtgate.RPCErrorOnlyInReply {
|
||||
// If there was an app error, send a QueryResult back with it.
|
||||
qr := new(proto.QueryResult)
|
||||
vtgate.AddVtGateErrorToQueryResult(vtgErr, qr)
|
||||
// Sending back errors this way is not backwards compatible. If a (new) server sends an additional
|
||||
// QueryResult with an error, and the (old) client doesn't know how to read it, it will cause
|
||||
// problems where the client will get out of sync with the number of QueryResults sent.
|
||||
// That's why this the error is only sent this way when the --rpc_errors_only_in_reply flag is set
|
||||
// (signalling that all clients are able to handle new-style errors).
|
||||
return sendReply(qr)
|
||||
}
|
||||
return vtgErr
|
||||
}
|
||||
|
||||
// StreamExecuteKeyRanges is the RPC version of
|
||||
|
@ -328,10 +373,18 @@ func (vtg *VTGate) StreamExecuteKeyRanges2(ctx context.Context, request *proto.K
|
|||
if vtgErr == nil {
|
||||
return nil
|
||||
}
|
||||
// If there was an app error, send a QueryResult back with it.
|
||||
qr := new(proto.QueryResult)
|
||||
vtgate.AddVtGateErrorToQueryResult(vtgErr, qr)
|
||||
return sendReply(qr)
|
||||
if *vtgate.RPCErrorOnlyInReply {
|
||||
// If there was an app error, send a QueryResult back with it.
|
||||
qr := new(proto.QueryResult)
|
||||
vtgate.AddVtGateErrorToQueryResult(vtgErr, qr)
|
||||
// Sending back errors this way is not backwards compatible. If a (new) server sends an additional
|
||||
// QueryResult with an error, and the (old) client doesn't know how to read it, it will cause
|
||||
// problems where the client will get out of sync with the number of QueryResults sent.
|
||||
// That's why this the error is only sent this way when the --rpc_errors_only_in_reply flag is set
|
||||
// (signalling that all clients are able to handle new-style errors).
|
||||
return sendReply(qr)
|
||||
}
|
||||
return vtgErr
|
||||
}
|
||||
|
||||
// Begin is the RPC version of vtgateservice.VTGateService method
|
||||
|
@ -370,7 +423,10 @@ func (vtg *VTGate) Begin2(ctx context.Context, request *proto.BeginRequest, repl
|
|||
reply.Session = &proto.Session{}
|
||||
vtgErr := vtg.server.Begin(ctx, reply.Session)
|
||||
vtgate.AddVtGateErrorToBeginResponse(vtgErr, reply)
|
||||
return nil
|
||||
if *vtgate.RPCErrorOnlyInReply {
|
||||
return nil
|
||||
}
|
||||
return vtgErr
|
||||
}
|
||||
|
||||
// Commit2 is the RPC version of vtgateservice.VTGateService method
|
||||
|
@ -383,7 +439,10 @@ func (vtg *VTGate) Commit2(ctx context.Context, request *proto.CommitRequest, re
|
|||
callerid.NewImmediateCallerID("gorpc client"))
|
||||
vtgErr := vtg.server.Commit(ctx, request.Session)
|
||||
vtgate.AddVtGateErrorToCommitResponse(vtgErr, reply)
|
||||
return nil
|
||||
if *vtgate.RPCErrorOnlyInReply {
|
||||
return nil
|
||||
}
|
||||
return vtgErr
|
||||
}
|
||||
|
||||
// Rollback2 is the RPC version of vtgateservice.VTGateService method
|
||||
|
@ -396,7 +455,10 @@ func (vtg *VTGate) Rollback2(ctx context.Context, request *proto.RollbackRequest
|
|||
callerid.NewImmediateCallerID("gorpc client"))
|
||||
vtgErr := vtg.server.Rollback(ctx, request.Session)
|
||||
vtgate.AddVtGateErrorToRollbackResponse(vtgErr, reply)
|
||||
return nil
|
||||
if *vtgate.RPCErrorOnlyInReply {
|
||||
return nil
|
||||
}
|
||||
return vtgErr
|
||||
}
|
||||
|
||||
// SplitQuery is the RPC version of vtgateservice.VTGateService method
|
||||
|
@ -415,7 +477,10 @@ func (vtg *VTGate) SplitQuery(ctx context.Context, request *proto.SplitQueryRequ
|
|||
request.SplitCount,
|
||||
reply)
|
||||
vtgate.AddVtGateErrorToSplitQueryResult(vtgErr, reply)
|
||||
return nil
|
||||
if *vtgate.RPCErrorOnlyInReply {
|
||||
return nil
|
||||
}
|
||||
return vtgErr
|
||||
}
|
||||
|
||||
// GetSrvKeyspace is the RPC version of vtgateservice.VTGateService method
|
||||
|
|
|
@ -97,7 +97,7 @@ var RegisterVTGates []RegisterVTGate
|
|||
|
||||
var (
|
||||
// RPCErrorOnlyInReply informs vtgateservice(s) about how to return errors.
|
||||
RPCErrorOnlyInReply = flag.Bool("rpc-error-only-in-reply", true, "if true, supported RPC calls from vtgateservice(s) will only return errors as part of the RPC server response")
|
||||
RPCErrorOnlyInReply = flag.Bool("rpc-error-only-in-reply", false, "if true, supported RPC calls from vtgateservice(s) will only return errors as part of the RPC server response")
|
||||
)
|
||||
|
||||
// Init initializes VTGate server.
|
||||
|
|
|
@ -30,22 +30,26 @@ def setUpModule():
|
|||
global vtgateclienttest_port
|
||||
global vtgateclienttest_grpc_port
|
||||
|
||||
environment.topo_server().setup()
|
||||
try:
|
||||
environment.topo_server().setup()
|
||||
|
||||
vtgateclienttest_port = environment.reserve_ports(1)
|
||||
args = environment.binary_args('vtgateclienttest') + [
|
||||
'-log_dir', environment.vtlogroot,
|
||||
'-port', str(vtgateclienttest_port),
|
||||
]
|
||||
vtgateclienttest_port = environment.reserve_ports(1)
|
||||
args = environment.binary_args('vtgateclienttest') + [
|
||||
'-log_dir', environment.vtlogroot,
|
||||
'-port', str(vtgateclienttest_port),
|
||||
]
|
||||
|
||||
if protocols_flavor().vtgate_python_protocol() == 'grpc':
|
||||
vtgateclienttest_grpc_port = environment.reserve_ports(1)
|
||||
args.extend(['-grpc_port', str(vtgateclienttest_grpc_port)])
|
||||
if protocols_flavor().service_map():
|
||||
args.extend(['-service_map', ','.join(protocols_flavor().service_map())])
|
||||
if protocols_flavor().vtgate_python_protocol() == 'grpc':
|
||||
vtgateclienttest_grpc_port = environment.reserve_ports(1)
|
||||
args.extend(['-grpc_port', str(vtgateclienttest_grpc_port)])
|
||||
if protocols_flavor().service_map():
|
||||
args.extend(['-service_map', ','.join(protocols_flavor().service_map())])
|
||||
|
||||
vtgateclienttest_process = utils.run_bg(args)
|
||||
utils.wait_for_vars('vtgateclienttest', vtgateclienttest_port)
|
||||
vtgateclienttest_process = utils.run_bg(args)
|
||||
utils.wait_for_vars('vtgateclienttest', vtgateclienttest_port)
|
||||
except:
|
||||
tearDownModule()
|
||||
raise
|
||||
|
||||
|
||||
def tearDownModule():
|
||||
|
|
Загрузка…
Ссылка в новой задаче