зеркало из https://github.com/github/vitess-gh.git
Adding GetSrvKeyspace to vtgate API.
This commit is contained in:
Родитель
6565b96218
Коммит
766c52b88e
|
@ -10,6 +10,7 @@ import (
|
|||
log "github.com/golang/glog"
|
||||
|
||||
"github.com/youtube/vitess/go/tb"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/vtgate/proto"
|
||||
"github.com/youtube/vitess/go/vt/vtgate/vtgateservice"
|
||||
"golang.org/x/net/context"
|
||||
|
@ -115,6 +116,13 @@ func (c *errorClient) SplitQuery(ctx context.Context, req *proto.SplitQueryReque
|
|||
return c.fallback.SplitQuery(ctx, req, reply)
|
||||
}
|
||||
|
||||
func (c *errorClient) GetSrvKeyspace(ctx context.Context, keyspace string) (*topo.SrvKeyspace, error) {
|
||||
if keyspace == "error" {
|
||||
return nil, fmt.Errorf("vtgate test client, errorClient.GetSrvKeyspace returning error")
|
||||
}
|
||||
return c.fallback.GetSrvKeyspace(ctx, keyspace)
|
||||
}
|
||||
|
||||
func (c *errorClient) HandlePanic(err *error) {
|
||||
if x := recover(); x != nil {
|
||||
log.Errorf("Uncaught panic:\n%v\n%s", x, tb.Stack(4))
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
log "github.com/golang/glog"
|
||||
|
||||
"github.com/youtube/vitess/go/tb"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/vtgate/proto"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
@ -85,6 +86,10 @@ func (c *terminalClient) SplitQuery(ctx context.Context, req *proto.SplitQueryRe
|
|||
return errTerminal
|
||||
}
|
||||
|
||||
func (c *terminalClient) GetSrvKeyspace(ctx context.Context, keyspace string) (*topo.SrvKeyspace, error) {
|
||||
return nil, errTerminal
|
||||
}
|
||||
|
||||
func (c *terminalClient) HandlePanic(err *error) {
|
||||
if x := recover(); x != nil {
|
||||
log.Errorf("Uncaught panic:\n%v\n%s", x, tb.Stack(4))
|
||||
|
|
|
@ -131,6 +131,11 @@ func (f *fakeVTGateService) SplitQuery(ctx context.Context, req *proto.SplitQuer
|
|||
return nil
|
||||
}
|
||||
|
||||
// GetSrvKeyspace is part of the VTGateService interface
|
||||
func (f *fakeVTGateService) GetSrvKeyspace(ctx context.Context, keyspace string) (*topo.SrvKeyspace, error) {
|
||||
return &topo.SrvKeyspace{}, nil
|
||||
}
|
||||
|
||||
// HandlePanic is part of the VTGateService interface
|
||||
func (f *fakeVTGateService) HandlePanic(err *error) {
|
||||
if x := recover(); x != nil {
|
||||
|
|
|
@ -42,6 +42,8 @@ It has these top-level messages:
|
|||
RollbackResponse
|
||||
SplitQueryRequest
|
||||
SplitQueryResponse
|
||||
GetSrvKeyspaceRequest
|
||||
GetSrvKeyspaceResponse
|
||||
*/
|
||||
package vtgate
|
||||
|
||||
|
@ -1084,6 +1086,31 @@ func (m *SplitQueryResponse_Part) GetShardPart() *SplitQueryResponse_ShardPart {
|
|||
return nil
|
||||
}
|
||||
|
||||
// GetSrvKeyspaceRequest is the payload to GetSrvKeyspace
|
||||
type GetSrvKeyspaceRequest struct {
|
||||
Keyspace string `protobuf:"bytes,1,opt,name=keyspace" json:"keyspace,omitempty"`
|
||||
}
|
||||
|
||||
func (m *GetSrvKeyspaceRequest) Reset() { *m = GetSrvKeyspaceRequest{} }
|
||||
func (m *GetSrvKeyspaceRequest) String() string { return proto.CompactTextString(m) }
|
||||
func (*GetSrvKeyspaceRequest) ProtoMessage() {}
|
||||
|
||||
// GetSrvKeyspaceResponse is the returned value from GetSrvKeyspace
|
||||
type GetSrvKeyspaceResponse struct {
|
||||
SrvKeyspace *topodata.SrvKeyspace `protobuf:"bytes,1,opt,name=srv_keyspace" json:"srv_keyspace,omitempty"`
|
||||
}
|
||||
|
||||
func (m *GetSrvKeyspaceResponse) Reset() { *m = GetSrvKeyspaceResponse{} }
|
||||
func (m *GetSrvKeyspaceResponse) String() string { return proto.CompactTextString(m) }
|
||||
func (*GetSrvKeyspaceResponse) ProtoMessage() {}
|
||||
|
||||
func (m *GetSrvKeyspaceResponse) GetSrvKeyspace() *topodata.SrvKeyspace {
|
||||
if m != nil {
|
||||
return m.SrvKeyspace
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
proto.RegisterEnum("vtgate.ExecuteEntityIdsRequest_EntityId_Type", ExecuteEntityIdsRequest_EntityId_Type_name, ExecuteEntityIdsRequest_EntityId_Type_value)
|
||||
}
|
||||
|
|
|
@ -65,6 +65,14 @@ type VitessClient interface {
|
|||
Rollback(ctx context.Context, in *vtgate.RollbackRequest, opts ...grpc.CallOption) (*vtgate.RollbackResponse, error)
|
||||
// Split a query into non-overlapping sub queries
|
||||
SplitQuery(ctx context.Context, in *vtgate.SplitQueryRequest, opts ...grpc.CallOption) (*vtgate.SplitQueryResponse, error)
|
||||
// GetSrvKeyspace returns a SrvKeyspace object (as seen by this vtgate).
|
||||
// This method is provided as a convenient way for clients to take a
|
||||
// look at the sharding configuration for a Keyspace. Looking at the
|
||||
// sharding information should not be used for routing queries (as the
|
||||
// information may change, use the Execute calls for that).
|
||||
// It is convenient for monitoring applications for instance, or if
|
||||
// using custom sharding.
|
||||
GetSrvKeyspace(ctx context.Context, in *vtgate.GetSrvKeyspaceRequest, opts ...grpc.CallOption) (*vtgate.GetSrvKeyspaceResponse, error)
|
||||
}
|
||||
|
||||
type vitessClient struct {
|
||||
|
@ -302,6 +310,15 @@ func (c *vitessClient) SplitQuery(ctx context.Context, in *vtgate.SplitQueryRequ
|
|||
return out, nil
|
||||
}
|
||||
|
||||
func (c *vitessClient) GetSrvKeyspace(ctx context.Context, in *vtgate.GetSrvKeyspaceRequest, opts ...grpc.CallOption) (*vtgate.GetSrvKeyspaceResponse, error) {
|
||||
out := new(vtgate.GetSrvKeyspaceResponse)
|
||||
err := grpc.Invoke(ctx, "/vtgateservice.Vitess/GetSrvKeyspace", in, out, c.cc, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// Server API for Vitess service
|
||||
|
||||
type VitessServer interface {
|
||||
|
@ -340,6 +357,14 @@ type VitessServer interface {
|
|||
Rollback(context.Context, *vtgate.RollbackRequest) (*vtgate.RollbackResponse, error)
|
||||
// Split a query into non-overlapping sub queries
|
||||
SplitQuery(context.Context, *vtgate.SplitQueryRequest) (*vtgate.SplitQueryResponse, error)
|
||||
// GetSrvKeyspace returns a SrvKeyspace object (as seen by this vtgate).
|
||||
// This method is provided as a convenient way for clients to take a
|
||||
// look at the sharding configuration for a Keyspace. Looking at the
|
||||
// sharding information should not be used for routing queries (as the
|
||||
// information may change, use the Execute calls for that).
|
||||
// It is convenient for monitoring applications for instance, or if
|
||||
// using custom sharding.
|
||||
GetSrvKeyspace(context.Context, *vtgate.GetSrvKeyspaceRequest) (*vtgate.GetSrvKeyspaceResponse, error)
|
||||
}
|
||||
|
||||
func RegisterVitessServer(s *grpc.Server, srv VitessServer) {
|
||||
|
@ -562,6 +587,18 @@ func _Vitess_SplitQuery_Handler(srv interface{}, ctx context.Context, codec grpc
|
|||
return out, nil
|
||||
}
|
||||
|
||||
func _Vitess_GetSrvKeyspace_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) {
|
||||
in := new(vtgate.GetSrvKeyspaceRequest)
|
||||
if err := codec.Unmarshal(buf, in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out, err := srv.(VitessServer).GetSrvKeyspace(ctx, in)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
var _Vitess_serviceDesc = grpc.ServiceDesc{
|
||||
ServiceName: "vtgateservice.Vitess",
|
||||
HandlerType: (*VitessServer)(nil),
|
||||
|
@ -610,6 +647,10 @@ var _Vitess_serviceDesc = grpc.ServiceDesc{
|
|||
MethodName: "SplitQuery",
|
||||
Handler: _Vitess_SplitQuery_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "GetSrvKeyspace",
|
||||
Handler: _Vitess_GetSrvKeyspace_Handler,
|
||||
},
|
||||
},
|
||||
Streams: []grpc.StreamDesc{
|
||||
{
|
||||
|
|
|
@ -85,3 +85,60 @@ func ProtoToTablet(t *pb.Tablet) *Tablet {
|
|||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// SrvKeyspaceToProto turns a Tablet into a proto
|
||||
func SrvKeyspaceToProto(s *SrvKeyspace) *pb.SrvKeyspace {
|
||||
result := &pb.SrvKeyspace{
|
||||
ShardingColumnName: s.ShardingColumnName,
|
||||
ShardingColumnType: key.KeyspaceIdTypeToProto(s.ShardingColumnType),
|
||||
SplitShardCount: s.SplitShardCount,
|
||||
}
|
||||
for tt, p := range s.Partitions {
|
||||
partition := &pb.SrvKeyspace_KeyspacePartition{
|
||||
ServedType: TabletTypeToProto(tt),
|
||||
}
|
||||
for _, sr := range p.ShardReferences {
|
||||
partition.ShardReferences = append(partition.ShardReferences, &pb.ShardReference{
|
||||
Name: sr.Name,
|
||||
KeyRange: key.KeyRangeToProto(sr.KeyRange),
|
||||
})
|
||||
}
|
||||
result.Partitions = append(result.Partitions, partition)
|
||||
}
|
||||
for tt, k := range s.ServedFrom {
|
||||
result.ServedFrom = append(result.ServedFrom, &pb.SrvKeyspace_ServedFrom{
|
||||
TabletType: TabletTypeToProto(tt),
|
||||
Keyspace: k,
|
||||
})
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// ProtoToSrvKeyspace turns a proto to a Tablet
|
||||
func ProtoToSrvKeyspace(s *pb.SrvKeyspace) *SrvKeyspace {
|
||||
result := &SrvKeyspace{
|
||||
Partitions: make(map[TabletType]*KeyspacePartition),
|
||||
ShardingColumnName: s.ShardingColumnName,
|
||||
ShardingColumnType: key.ProtoToKeyspaceIdType(s.ShardingColumnType),
|
||||
SplitShardCount: s.SplitShardCount,
|
||||
}
|
||||
for _, p := range s.Partitions {
|
||||
tt := ProtoToTabletType(p.ServedType)
|
||||
partition := &KeyspacePartition{}
|
||||
for _, sr := range p.ShardReferences {
|
||||
partition.ShardReferences = append(partition.ShardReferences, ShardReference{
|
||||
Name: sr.Name,
|
||||
KeyRange: key.ProtoToKeyRange(sr.KeyRange),
|
||||
})
|
||||
}
|
||||
result.Partitions[tt] = partition
|
||||
}
|
||||
if len(s.ServedFrom) > 0 {
|
||||
result.ServedFrom = make(map[TabletType]string)
|
||||
for _, sf := range s.ServedFrom {
|
||||
tt := ProtoToTabletType(sf.TabletType)
|
||||
result.ServedFrom[tt] = sf.Keyspace
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
|
|
@ -278,6 +278,11 @@ func (conn *FakeVTGateConn) SplitQuery(ctx context.Context, keyspace string, que
|
|||
return reply, nil
|
||||
}
|
||||
|
||||
// GetSrvKeyspace please see vtgateconn.Impl.SplitQuery
|
||||
func (conn *FakeVTGateConn) GetSrvKeyspace(ctx context.Context, keyspace string) (*topo.SrvKeyspace, error) {
|
||||
return nil, fmt.Errorf("NYI")
|
||||
}
|
||||
|
||||
// Close please see vtgateconn.Impl.Close
|
||||
func (conn *FakeVTGateConn) Close() {
|
||||
}
|
||||
|
|
|
@ -366,6 +366,15 @@ func (conn *vtgateConn) SplitQuery(ctx context.Context, keyspace string, query t
|
|||
return result.Splits, nil
|
||||
}
|
||||
|
||||
func (conn *vtgateConn) GetSrvKeyspace(ctx context.Context, keyspace string) (*topo.SrvKeyspace, error) {
|
||||
request := &keyspace
|
||||
result := &topo.SrvKeyspace{}
|
||||
if err := conn.rpcConn.Call(ctx, "VTGate.GetSrvKeyspace", request, result); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (conn *vtgateConn) Close() {
|
||||
conn.rpcConn.Close()
|
||||
}
|
||||
|
|
|
@ -11,6 +11,7 @@ import (
|
|||
|
||||
"github.com/youtube/vitess/go/vt/rpc"
|
||||
"github.com/youtube/vitess/go/vt/servenv"
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/vtgate"
|
||||
"github.com/youtube/vitess/go/vt/vtgate/proto"
|
||||
"github.com/youtube/vitess/go/vt/vtgate/vtgateservice"
|
||||
|
@ -230,6 +231,19 @@ func (vtg *VTGate) SplitQuery(ctx context.Context, req *proto.SplitQueryRequest,
|
|||
return vtgErr
|
||||
}
|
||||
|
||||
// GetSrvKeyspace is the RPC version of vtgateservice.VTGateService method
|
||||
func (vtg *VTGate) GetSrvKeyspace(ctx context.Context, keyspace *string, reply *topo.SrvKeyspace) (err error) {
|
||||
defer vtg.server.HandlePanic(&err)
|
||||
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*rpcTimeout))
|
||||
defer cancel()
|
||||
ks, err := vtg.server.GetSrvKeyspace(ctx, *keyspace)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
*reply = *ks
|
||||
return nil
|
||||
}
|
||||
|
||||
// New returns a new VTGate service
|
||||
func New(vtGate vtgateservice.VTGateService) *VTGate {
|
||||
return &VTGate{vtGate}
|
||||
|
|
|
@ -409,6 +409,17 @@ func (conn *vtgateConn) SplitQuery(ctx context.Context, keyspace string, query t
|
|||
return proto.ProtoToSplitQueryParts(response), nil
|
||||
}
|
||||
|
||||
func (conn *vtgateConn) GetSrvKeyspace(ctx context.Context, keyspace string) (*topo.SrvKeyspace, error) {
|
||||
request := &pb.GetSrvKeyspaceRequest{
|
||||
Keyspace: keyspace,
|
||||
}
|
||||
response, err := conn.c.GetSrvKeyspace(ctx, request)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return topo.ProtoToSrvKeyspace(response.SrvKeyspace), nil
|
||||
}
|
||||
|
||||
func (conn *vtgateConn) Close() {
|
||||
conn.cc.Close()
|
||||
}
|
||||
|
|
|
@ -361,6 +361,18 @@ func (vtg *VTGate) SplitQuery(ctx context.Context, request *pb.SplitQueryRequest
|
|||
return proto.SplitQueryPartsToProto(reply.Splits), nil
|
||||
}
|
||||
|
||||
// GetSrvKeyspace is the RPC version of vtgateservice.VTGateService method
|
||||
func (vtg *VTGate) GetSrvKeyspace(ctx context.Context, request *pb.GetSrvKeyspaceRequest) (response *pb.GetSrvKeyspaceResponse, err error) {
|
||||
defer vtg.server.HandlePanic(&err)
|
||||
sk, err := vtg.server.GetSrvKeyspace(ctx, request.Keyspace)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &pb.GetSrvKeyspaceResponse{
|
||||
SrvKeyspace: topo.SrvKeyspaceToProto(sk),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
vtgate.RegisterVTGates = append(vtgate.RegisterVTGates, func(vtGate vtgateservice.VTGateService) {
|
||||
if servenv.GRPCCheckServiceMap("vtgateservice") {
|
||||
|
|
|
@ -563,6 +563,11 @@ func (vtg *VTGate) SplitQuery(ctx context.Context, req *proto.SplitQueryRequest,
|
|||
return nil
|
||||
}
|
||||
|
||||
// GetSrvKeyspace is part of the vtgate service API.
|
||||
func (vtg *VTGate) GetSrvKeyspace(ctx context.Context, keyspace string) (*topo.SrvKeyspace, error) {
|
||||
return vtg.router.serv.GetSrvKeyspace(ctx, vtg.router.cell, keyspace)
|
||||
}
|
||||
|
||||
// Any errors that are caused by VTGate dependencies (e.g, VtTablet) should be logged
|
||||
// as errors in those components, but logged to Info in VTGate itself.
|
||||
func logError(err error, query interface{}, logger *logutil.ThrottledLogger) {
|
||||
|
|
|
@ -166,6 +166,11 @@ func (conn *VTGateConn) SplitQuery(ctx context.Context, keyspace string, query t
|
|||
return conn.impl.SplitQuery(ctx, keyspace, query, splitColumn, splitCount)
|
||||
}
|
||||
|
||||
// GetSrvKeyspace returns a topo.SrvKeyspace object.
|
||||
func (conn *VTGateConn) GetSrvKeyspace(ctx context.Context, keyspace string) (*topo.SrvKeyspace, error) {
|
||||
return conn.impl.GetSrvKeyspace(ctx, keyspace)
|
||||
}
|
||||
|
||||
// VTGateTx defines an ongoing transaction.
|
||||
// It should not be concurrently used across goroutines.
|
||||
type VTGateTx struct {
|
||||
|
@ -348,6 +353,9 @@ type Impl interface {
|
|||
// appending primary key range clauses to the original query.
|
||||
SplitQuery(ctx context.Context, keyspace string, query tproto.BoundQuery, splitColumn string, splitCount int) ([]proto.SplitQueryPart, error)
|
||||
|
||||
// GetSrvKeyspace returns a topo.SrvKeyspace.
|
||||
GetSrvKeyspace(ctx context.Context, keyspace string) (*topo.SrvKeyspace, error)
|
||||
|
||||
// Close must be called for releasing resources.
|
||||
Close()
|
||||
}
|
||||
|
|
|
@ -375,6 +375,20 @@ func (f *fakeVTGateService) SplitQuery(ctx context.Context, req *proto.SplitQuer
|
|||
return nil
|
||||
}
|
||||
|
||||
// GetSrvKeyspace is part of the VTGateService interface
|
||||
func (f *fakeVTGateService) GetSrvKeyspace(ctx context.Context, keyspace string) (*topo.SrvKeyspace, error) {
|
||||
if f.hasError {
|
||||
return nil, testVtGateError
|
||||
}
|
||||
if f.panics {
|
||||
panic(fmt.Errorf("test forced panic"))
|
||||
}
|
||||
if keyspace != getSrvKeyspaceKeyspace {
|
||||
f.t.Errorf("GetSrvKeyspace has wrong input: got %v wanted %v", keyspace, getSrvKeyspaceKeyspace)
|
||||
}
|
||||
return getSrvKeyspaceResult, nil
|
||||
}
|
||||
|
||||
// CreateFakeServer returns the fake server for the tests
|
||||
func CreateFakeServer(t *testing.T) vtgateservice.VTGateService {
|
||||
return &fakeVTGateService{
|
||||
|
@ -418,6 +432,7 @@ func TestSuite(t *testing.T, impl vtgateconn.Impl, fakeServer vtgateservice.VTGa
|
|||
testTx2PassNotInTransaction(t, conn)
|
||||
testTx2Fail(t, conn)
|
||||
testSplitQuery(t, conn)
|
||||
testGetSrvKeyspace(t, conn)
|
||||
|
||||
// return an error for every call, make sure they're handled properly
|
||||
fakeServer.(*fakeVTGateService).hasError = true
|
||||
|
@ -445,6 +460,7 @@ func TestSuite(t *testing.T, impl vtgateconn.Impl, fakeServer vtgateservice.VTGa
|
|||
testCommit2Error(t, conn)
|
||||
testRollback2Error(t, conn)
|
||||
testSplitQueryError(t, conn)
|
||||
testGetSrvKeyspaceError(t, conn)
|
||||
fakeServer.(*fakeVTGateService).hasError = false
|
||||
|
||||
// force a panic at every call, then test that works
|
||||
|
@ -473,6 +489,7 @@ func TestSuite(t *testing.T, impl vtgateconn.Impl, fakeServer vtgateservice.VTGa
|
|||
testCommit2Panic(t, conn)
|
||||
testRollback2Panic(t, conn)
|
||||
testSplitQueryPanic(t, conn)
|
||||
testGetSrvKeyspacePanic(t, conn)
|
||||
fakeServer.(*fakeVTGateService).panics = false
|
||||
}
|
||||
|
||||
|
@ -1590,8 +1607,8 @@ func testSplitQuery(t *testing.T, conn *vtgateconn.VTGateConn) {
|
|||
t.Fatalf("SplitQuery failed: %v", err)
|
||||
}
|
||||
if !reflect.DeepEqual(qsl, splitQueryResult.Splits) {
|
||||
t.Errorf("SplitQuery returned worng result: got %+v wanted %+v", qsl, splitQueryResult.Splits)
|
||||
t.Errorf("SplitQuery returned worng result: got %+v wanted %+v", qsl[0].Query, splitQueryResult.Splits[0].Query)
|
||||
t.Errorf("SplitQuery returned wrong result: got %+v wanted %+v", qsl, splitQueryResult.Splits)
|
||||
t.Errorf("SplitQuery returned wrong result: got %+v wanted %+v", qsl[0].Query, splitQueryResult.Splits[0].Query)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1607,6 +1624,29 @@ func testSplitQueryPanic(t *testing.T, conn *vtgateconn.VTGateConn) {
|
|||
expectPanic(t, err)
|
||||
}
|
||||
|
||||
func testGetSrvKeyspace(t *testing.T, conn *vtgateconn.VTGateConn) {
|
||||
ctx := context.Background()
|
||||
sk, err := conn.GetSrvKeyspace(ctx, getSrvKeyspaceKeyspace)
|
||||
if err != nil {
|
||||
t.Fatalf("GetSrvKeyspace failed: %v", err)
|
||||
}
|
||||
if !reflect.DeepEqual(sk, getSrvKeyspaceResult) {
|
||||
t.Errorf("GetSrvKeyspace returned wrong result: got %+v wanted %+v", sk, getSrvKeyspaceResult)
|
||||
}
|
||||
}
|
||||
|
||||
func testGetSrvKeyspaceError(t *testing.T, conn *vtgateconn.VTGateConn) {
|
||||
ctx := context.Background()
|
||||
_, err := conn.GetSrvKeyspace(ctx, getSrvKeyspaceKeyspace)
|
||||
verifyError(t, err, "GetSrvKeyspace")
|
||||
}
|
||||
|
||||
func testGetSrvKeyspacePanic(t *testing.T, conn *vtgateconn.VTGateConn) {
|
||||
ctx := context.Background()
|
||||
_, err := conn.GetSrvKeyspace(ctx, getSrvKeyspaceKeyspace)
|
||||
expectPanic(t, err)
|
||||
}
|
||||
|
||||
var execMap = map[string]struct {
|
||||
execQuery *proto.Query
|
||||
shardQuery *proto.QueryShard
|
||||
|
@ -2099,3 +2139,27 @@ var splitQueryResult = &proto.SplitQueryResult{
|
|||
},
|
||||
},
|
||||
}
|
||||
|
||||
var getSrvKeyspaceKeyspace = "test_keyspace"
|
||||
|
||||
var getSrvKeyspaceResult = &topo.SrvKeyspace{
|
||||
Partitions: map[topo.TabletType]*topo.KeyspacePartition{
|
||||
topo.TYPE_REPLICA: &topo.KeyspacePartition{
|
||||
ShardReferences: []topo.ShardReference{
|
||||
topo.ShardReference{
|
||||
Name: "shard0",
|
||||
KeyRange: key.KeyRange{
|
||||
Start: key.KeyspaceId("s"),
|
||||
End: key.KeyspaceId("e"),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
ShardingColumnName: "sharding_column_name",
|
||||
ShardingColumnType: key.KIT_UINT64,
|
||||
ServedFrom: map[topo.TabletType]string{
|
||||
topo.TYPE_MASTER: "other_keyspace",
|
||||
},
|
||||
SplitShardCount: 128,
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@
|
|||
package vtgateservice
|
||||
|
||||
import (
|
||||
"github.com/youtube/vitess/go/vt/topo"
|
||||
"github.com/youtube/vitess/go/vt/vtgate/proto"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
@ -37,6 +38,9 @@ type VTGateService interface {
|
|||
// Map Reduce support
|
||||
SplitQuery(ctx context.Context, req *proto.SplitQueryRequest, reply *proto.SplitQueryResult) error
|
||||
|
||||
// Topology support
|
||||
GetSrvKeyspace(ctx context.Context, keyspace string) (*topo.SrvKeyspace, error)
|
||||
|
||||
// HandlePanic should be called with defer at the beginning of each
|
||||
// RPC implementation method, before calling any of the previous methods
|
||||
HandlePanic(err *error)
|
||||
|
|
|
@ -293,3 +293,13 @@ message SplitQueryResponse {
|
|||
}
|
||||
repeated Part splits = 1;
|
||||
}
|
||||
|
||||
// GetSrvKeyspaceRequest is the payload to GetSrvKeyspace
|
||||
message GetSrvKeyspaceRequest {
|
||||
string keyspace = 1;
|
||||
}
|
||||
|
||||
// GetSrvKeyspaceResponse is the returned value from GetSrvKeyspace
|
||||
message GetSrvKeyspaceResponse {
|
||||
topodata.SrvKeyspace srv_keyspace = 1;
|
||||
}
|
||||
|
|
|
@ -57,4 +57,13 @@ service Vitess {
|
|||
|
||||
// Split a query into non-overlapping sub queries
|
||||
rpc SplitQuery(vtgate.SplitQueryRequest) returns (vtgate.SplitQueryResponse) {};
|
||||
|
||||
// GetSrvKeyspace returns a SrvKeyspace object (as seen by this vtgate).
|
||||
// This method is provided as a convenient way for clients to take a
|
||||
// look at the sharding configuration for a Keyspace. Looking at the
|
||||
// sharding information should not be used for routing queries (as the
|
||||
// information may change, use the Execute calls for that).
|
||||
// It is convenient for monitoring applications for instance, or if
|
||||
// using custom sharding.
|
||||
rpc GetSrvKeyspace(vtgate.GetSrvKeyspaceRequest) returns (vtgate.GetSrvKeyspaceResponse) {};
|
||||
}
|
||||
|
|
Различия файлов скрыты, потому что одна или несколько строк слишком длинны
|
@ -20,7 +20,7 @@ DESCRIPTOR = _descriptor.FileDescriptor(
|
|||
name='vtgateservice.proto',
|
||||
package='vtgateservice',
|
||||
syntax='proto3',
|
||||
serialized_pb=_b('\n\x13vtgateservice.proto\x12\rvtgateservice\x1a\x0cvtgate.proto2\x85\n\n\x06Vitess\x12<\n\x07\x45xecute\x12\x16.vtgate.ExecuteRequest\x1a\x17.vtgate.ExecuteResponse\"\x00\x12N\n\rExecuteShards\x12\x1c.vtgate.ExecuteShardsRequest\x1a\x1d.vtgate.ExecuteShardsResponse\"\x00\x12]\n\x12\x45xecuteKeyspaceIds\x12!.vtgate.ExecuteKeyspaceIdsRequest\x1a\".vtgate.ExecuteKeyspaceIdsResponse\"\x00\x12W\n\x10\x45xecuteKeyRanges\x12\x1f.vtgate.ExecuteKeyRangesRequest\x1a .vtgate.ExecuteKeyRangesResponse\"\x00\x12W\n\x10\x45xecuteEntityIds\x12\x1f.vtgate.ExecuteEntityIdsRequest\x1a .vtgate.ExecuteEntityIdsResponse\"\x00\x12]\n\x12\x45xecuteBatchShards\x12!.vtgate.ExecuteBatchShardsRequest\x1a\".vtgate.ExecuteBatchShardsResponse\"\x00\x12l\n\x17\x45xecuteBatchKeyspaceIds\x12&.vtgate.ExecuteBatchKeyspaceIdsRequest\x1a\'.vtgate.ExecuteBatchKeyspaceIdsResponse\"\x00\x12P\n\rStreamExecute\x12\x1c.vtgate.StreamExecuteRequest\x1a\x1d.vtgate.StreamExecuteResponse\"\x00\x30\x01\x12\x62\n\x13StreamExecuteShards\x12\".vtgate.StreamExecuteShardsRequest\x1a#.vtgate.StreamExecuteShardsResponse\"\x00\x30\x01\x12q\n\x18StreamExecuteKeyspaceIds\x12\'.vtgate.StreamExecuteKeyspaceIdsRequest\x1a(.vtgate.StreamExecuteKeyspaceIdsResponse\"\x00\x30\x01\x12k\n\x16StreamExecuteKeyRanges\x12%.vtgate.StreamExecuteKeyRangesRequest\x1a&.vtgate.StreamExecuteKeyRangesResponse\"\x00\x30\x01\x12\x36\n\x05\x42\x65gin\x12\x14.vtgate.BeginRequest\x1a\x15.vtgate.BeginResponse\"\x00\x12\x39\n\x06\x43ommit\x12\x15.vtgate.CommitRequest\x1a\x16.vtgate.CommitResponse\"\x00\x12?\n\x08Rollback\x12\x17.vtgate.RollbackRequest\x1a\x18.vtgate.RollbackResponse\"\x00\x12\x45\n\nSplitQuery\x12\x19.vtgate.SplitQueryRequest\x1a\x1a.vtgate.SplitQueryResponse\"\x00\x62\x06proto3')
|
||||
serialized_pb=_b('\n\x13vtgateservice.proto\x12\rvtgateservice\x1a\x0cvtgate.proto2\xd8\n\n\x06Vitess\x12<\n\x07\x45xecute\x12\x16.vtgate.ExecuteRequest\x1a\x17.vtgate.ExecuteResponse\"\x00\x12N\n\rExecuteShards\x12\x1c.vtgate.ExecuteShardsRequest\x1a\x1d.vtgate.ExecuteShardsResponse\"\x00\x12]\n\x12\x45xecuteKeyspaceIds\x12!.vtgate.ExecuteKeyspaceIdsRequest\x1a\".vtgate.ExecuteKeyspaceIdsResponse\"\x00\x12W\n\x10\x45xecuteKeyRanges\x12\x1f.vtgate.ExecuteKeyRangesRequest\x1a .vtgate.ExecuteKeyRangesResponse\"\x00\x12W\n\x10\x45xecuteEntityIds\x12\x1f.vtgate.ExecuteEntityIdsRequest\x1a .vtgate.ExecuteEntityIdsResponse\"\x00\x12]\n\x12\x45xecuteBatchShards\x12!.vtgate.ExecuteBatchShardsRequest\x1a\".vtgate.ExecuteBatchShardsResponse\"\x00\x12l\n\x17\x45xecuteBatchKeyspaceIds\x12&.vtgate.ExecuteBatchKeyspaceIdsRequest\x1a\'.vtgate.ExecuteBatchKeyspaceIdsResponse\"\x00\x12P\n\rStreamExecute\x12\x1c.vtgate.StreamExecuteRequest\x1a\x1d.vtgate.StreamExecuteResponse\"\x00\x30\x01\x12\x62\n\x13StreamExecuteShards\x12\".vtgate.StreamExecuteShardsRequest\x1a#.vtgate.StreamExecuteShardsResponse\"\x00\x30\x01\x12q\n\x18StreamExecuteKeyspaceIds\x12\'.vtgate.StreamExecuteKeyspaceIdsRequest\x1a(.vtgate.StreamExecuteKeyspaceIdsResponse\"\x00\x30\x01\x12k\n\x16StreamExecuteKeyRanges\x12%.vtgate.StreamExecuteKeyRangesRequest\x1a&.vtgate.StreamExecuteKeyRangesResponse\"\x00\x30\x01\x12\x36\n\x05\x42\x65gin\x12\x14.vtgate.BeginRequest\x1a\x15.vtgate.BeginResponse\"\x00\x12\x39\n\x06\x43ommit\x12\x15.vtgate.CommitRequest\x1a\x16.vtgate.CommitResponse\"\x00\x12?\n\x08Rollback\x12\x17.vtgate.RollbackRequest\x1a\x18.vtgate.RollbackResponse\"\x00\x12\x45\n\nSplitQuery\x12\x19.vtgate.SplitQueryRequest\x1a\x1a.vtgate.SplitQueryResponse\"\x00\x12Q\n\x0eGetSrvKeyspace\x12\x1d.vtgate.GetSrvKeyspaceRequest\x1a\x1e.vtgate.GetSrvKeyspaceResponse\"\x00\x62\x06proto3')
|
||||
,
|
||||
dependencies=[vtgate__pb2.DESCRIPTOR,])
|
||||
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
|
||||
|
@ -80,6 +80,9 @@ class EarlyAdopterVitessServicer(object):
|
|||
@abc.abstractmethod
|
||||
def SplitQuery(self, request, context):
|
||||
raise NotImplementedError()
|
||||
@abc.abstractmethod
|
||||
def GetSrvKeyspace(self, request, context):
|
||||
raise NotImplementedError()
|
||||
class EarlyAdopterVitessServer(object):
|
||||
"""<fill me in later!>"""
|
||||
__metaclass__ = abc.ABCMeta
|
||||
|
@ -152,6 +155,10 @@ class EarlyAdopterVitessStub(object):
|
|||
def SplitQuery(self, request):
|
||||
raise NotImplementedError()
|
||||
SplitQuery.async = None
|
||||
@abc.abstractmethod
|
||||
def GetSrvKeyspace(self, request):
|
||||
raise NotImplementedError()
|
||||
GetSrvKeyspace.async = None
|
||||
def early_adopter_create_Vitess_server(servicer, port, private_key=None, certificate_chain=None):
|
||||
import vtgate_pb2
|
||||
import vtgate_pb2
|
||||
|
@ -183,6 +190,8 @@ def early_adopter_create_Vitess_server(servicer, port, private_key=None, certifi
|
|||
import vtgate_pb2
|
||||
import vtgate_pb2
|
||||
import vtgate_pb2
|
||||
import vtgate_pb2
|
||||
import vtgate_pb2
|
||||
method_service_descriptions = {
|
||||
"Begin": utilities.unary_unary_service_description(
|
||||
servicer.Begin,
|
||||
|
@ -229,6 +238,11 @@ def early_adopter_create_Vitess_server(servicer, port, private_key=None, certifi
|
|||
vtgate_pb2.ExecuteShardsRequest.FromString,
|
||||
vtgate_pb2.ExecuteShardsResponse.SerializeToString,
|
||||
),
|
||||
"GetSrvKeyspace": utilities.unary_unary_service_description(
|
||||
servicer.GetSrvKeyspace,
|
||||
vtgate_pb2.GetSrvKeyspaceRequest.FromString,
|
||||
vtgate_pb2.GetSrvKeyspaceResponse.SerializeToString,
|
||||
),
|
||||
"Rollback": utilities.unary_unary_service_description(
|
||||
servicer.Rollback,
|
||||
vtgate_pb2.RollbackRequest.FromString,
|
||||
|
@ -292,6 +306,8 @@ def early_adopter_create_Vitess_stub(host, port, metadata_transformer=None, secu
|
|||
import vtgate_pb2
|
||||
import vtgate_pb2
|
||||
import vtgate_pb2
|
||||
import vtgate_pb2
|
||||
import vtgate_pb2
|
||||
method_invocation_descriptions = {
|
||||
"Begin": utilities.unary_unary_invocation_description(
|
||||
vtgate_pb2.BeginRequest.SerializeToString,
|
||||
|
@ -329,6 +345,10 @@ def early_adopter_create_Vitess_stub(host, port, metadata_transformer=None, secu
|
|||
vtgate_pb2.ExecuteShardsRequest.SerializeToString,
|
||||
vtgate_pb2.ExecuteShardsResponse.FromString,
|
||||
),
|
||||
"GetSrvKeyspace": utilities.unary_unary_invocation_description(
|
||||
vtgate_pb2.GetSrvKeyspaceRequest.SerializeToString,
|
||||
vtgate_pb2.GetSrvKeyspaceResponse.FromString,
|
||||
),
|
||||
"Rollback": utilities.unary_unary_invocation_description(
|
||||
vtgate_pb2.RollbackRequest.SerializeToString,
|
||||
vtgate_pb2.RollbackResponse.FromString,
|
||||
|
|
Загрузка…
Ссылка в новой задаче