Removing flag that combines begin+execute.

It is now the default. The RPC endpoints exist since before 2.0 GA was released.
This commit is contained in:
Alain Jobart 2016-11-17 10:06:56 -08:00
Родитель e5fab6b8e4
Коммит fa619007b5
4 изменённых файлов: 20 добавлений и 107 удалений

Просмотреть файл

@ -27,11 +27,10 @@ import (
const protocolName = "grpc"
var (
cert = flag.String("tablet_grpc_cert", "", "the cert to use to connect")
key = flag.String("tablet_grpc_key", "", "the key to use to connect")
ca = flag.String("tablet_grpc_ca", "", "the server ca to use to validate servers when connecting")
name = flag.String("tablet_grpc_server_name", "", "the server name to use to validate server certificate")
combo = flag.Bool("tablet_grpc_combine_begin_execute", false, "combines Begin and Execute / ExecuteBatch calls in one when possible")
cert = flag.String("tablet_grpc_cert", "", "the cert to use to connect")
key = flag.String("tablet_grpc_key", "", "the key to use to connect")
ca = flag.String("tablet_grpc_ca", "", "the server ca to use to validate servers when connecting")
name = flag.String("tablet_grpc_server_name", "", "the server name to use to validate server certificate")
)
func init() {
@ -437,52 +436,21 @@ func (conn *gRPCQueryClient) BeginExecute(ctx context.Context, target *querypb.T
return nil, 0, err
}
if *combo {
// If combo is enabled, we combine both calls
req := &querypb.BeginExecuteRequest{
Target: target,
EffectiveCallerId: callerid.EffectiveCallerIDFromContext(ctx),
ImmediateCallerId: callerid.ImmediateCallerIDFromContext(ctx),
Query: q,
Options: options,
}
reply, err := conn.c.BeginExecute(ctx, req)
if err != nil {
return nil, 0, tabletconn.TabletErrorFromGRPC(err)
}
if reply.Error != nil {
return nil, reply.TransactionId, tabletconn.TabletErrorFromRPCError(reply.Error)
}
return sqltypes.Proto3ToResult(reply.Result), reply.TransactionId, nil
}
// Begin part.
breq := &querypb.BeginRequest{
req := &querypb.BeginExecuteRequest{
Target: target,
EffectiveCallerId: callerid.EffectiveCallerIDFromContext(ctx),
ImmediateCallerId: callerid.ImmediateCallerIDFromContext(ctx),
Query: q,
Options: options,
}
br, err := conn.c.Begin(ctx, breq)
reply, err := conn.c.BeginExecute(ctx, req)
if err != nil {
return nil, 0, tabletconn.TabletErrorFromGRPC(err)
}
transactionID = br.TransactionId
// Execute part.
ereq := &querypb.ExecuteRequest{
Target: target,
EffectiveCallerId: breq.EffectiveCallerId,
ImmediateCallerId: breq.ImmediateCallerId,
Query: q,
TransactionId: transactionID,
Options: options,
if reply.Error != nil {
return nil, reply.TransactionId, tabletconn.TabletErrorFromRPCError(reply.Error)
}
er, err := conn.c.Execute(ctx, ereq)
if err != nil {
return nil, transactionID, tabletconn.TabletErrorFromGRPC(err)
}
return sqltypes.Proto3ToResult(er.Result), transactionID, nil
return sqltypes.Proto3ToResult(reply.Result), reply.TransactionId, nil
}
// BeginExecuteBatch starts a transaction and runs an ExecuteBatch.
@ -493,52 +461,12 @@ func (conn *gRPCQueryClient) BeginExecuteBatch(ctx context.Context, target *quer
return nil, 0, tabletconn.ConnClosed
}
if *combo {
// If combo is enabled, we combine both calls
req := &querypb.BeginExecuteBatchRequest{
Target: target,
EffectiveCallerId: callerid.EffectiveCallerIDFromContext(ctx),
ImmediateCallerId: callerid.ImmediateCallerIDFromContext(ctx),
Queries: make([]*querypb.BoundQuery, len(queries)),
AsTransaction: asTransaction,
Options: options,
}
for i, q := range queries {
qq, err := querytypes.BoundQueryToProto3(q.Sql, q.BindVariables)
if err != nil {
return nil, transactionID, err
}
req.Queries[i] = qq
}
reply, err := conn.c.BeginExecuteBatch(ctx, req)
if err != nil {
return nil, 0, tabletconn.TabletErrorFromGRPC(err)
}
if reply.Error != nil {
return nil, reply.TransactionId, tabletconn.TabletErrorFromRPCError(reply.Error)
}
return sqltypes.Proto3ToResults(reply.Results), reply.TransactionId, nil
}
breq := &querypb.BeginRequest{
req := &querypb.BeginExecuteBatchRequest{
Target: target,
EffectiveCallerId: callerid.EffectiveCallerIDFromContext(ctx),
ImmediateCallerId: callerid.ImmediateCallerIDFromContext(ctx),
}
br, err := conn.c.Begin(ctx, breq)
if err != nil {
return nil, 0, tabletconn.TabletErrorFromGRPC(err)
}
transactionID = br.TransactionId
ereq := &querypb.ExecuteBatchRequest{
Target: target,
EffectiveCallerId: breq.EffectiveCallerId,
ImmediateCallerId: breq.ImmediateCallerId,
Queries: make([]*querypb.BoundQuery, len(queries)),
AsTransaction: asTransaction,
TransactionId: transactionID,
Options: options,
}
for i, q := range queries {
@ -546,13 +474,17 @@ func (conn *gRPCQueryClient) BeginExecuteBatch(ctx context.Context, target *quer
if err != nil {
return nil, transactionID, err
}
ereq.Queries[i] = qq
req.Queries[i] = qq
}
ebr, err := conn.c.ExecuteBatch(ctx, ereq)
reply, err := conn.c.BeginExecuteBatch(ctx, req)
if err != nil {
return nil, transactionID, tabletconn.TabletErrorFromGRPC(err)
return nil, 0, tabletconn.TabletErrorFromGRPC(err)
}
return sqltypes.Proto3ToResults(ebr.Results), transactionID, nil
if reply.Error != nil {
return nil, reply.TransactionId, tabletconn.TabletErrorFromRPCError(reply.Error)
}
return sqltypes.Proto3ToResults(reply.Results), reply.TransactionId, nil
}
// SplitQuery is the stub for TabletServer.SplitQuery RPC

Просмотреть файл

@ -44,17 +44,4 @@ func TestGRPCTabletConn(t *testing.T) {
"grpc": int32(port),
},
}, service)
// run it again with combo enabled
t.Log("Enabling combo Begin / Execute{,Batch}")
*combo = true
tabletconntest.TestSuite(t, protocolName, &topodatapb.Tablet{
Keyspace: tabletconntest.TestTarget.Keyspace,
Shard: tabletconntest.TestTarget.Shard,
Type: tabletconntest.TestTarget.TabletType,
Hostname: host,
PortMap: map[string]int32{
"grpc": int32(port),
},
}, service)
}

Просмотреть файл

@ -72,10 +72,6 @@ func TestGRPCDiscovery(t *testing.T) {
// run the test suite.
TestSuite(t, "discovery-grpc", dg, service)
// run it again with vtgate combining Begin and Execute
flag.Set("tablet_grpc_combine_begin_execute", "true")
TestSuite(t, "discovery-grpc-combo", dg, service)
}
// TestL2VTGateDiscovery tests the l2vtgate gateway with a gRPC

Просмотреть файл

@ -547,7 +547,6 @@ class VtGate(object):
'-log_dir', environment.vtlogroot,
'-srv_topo_cache_ttl', cache_ttl,
'-tablet_protocol', protocols_flavor().tabletconn_protocol(),
'-tablet_grpc_combine_begin_execute',
]
if l2vtgates:
args.extend([
@ -741,7 +740,6 @@ class L2VtGate(object):
'-healthcheck_conn_timeout', healthcheck_conn_timeout,
'-tablet_protocol', protocols_flavor().tabletconn_protocol(),
'-gateway_implementation', vtgate_gateway_flavor().flavor(),
'-tablet_grpc_combine_begin_execute',
]
args.extend(vtgate_gateway_flavor().flags(cell=cell, tablets=tablets))
if tablet_types_to_wait: