Moving gorpc specific callerid structs into its own package.

This commit is contained in:
Alain Jobart 2015-11-30 13:11:25 -08:00
Родитель ad4cc365f5
Коммит 933b136ff2
8 изменённых файлов: 82 добавлений и 133 удалений

Просмотреть файл

@ -1,25 +0,0 @@
package callerid
import (
querypb "github.com/youtube/vitess/go/vt/proto/query"
vtrpcpb "github.com/youtube/vitess/go/vt/proto/vtrpc"
"github.com/youtube/vitess/go/vt/tabletserver/proto"
)
// GoRPCImmediateCallerID creates new ImmediateCallerID(querypb.VTGateCallerID)
// from GoRPC's VTGateCallerID
func GoRPCImmediateCallerID(v *proto.VTGateCallerID) *querypb.VTGateCallerID {
if v == nil {
return nil
}
return NewImmediateCallerID(v.Username)
}
// GoRPCEffectiveCallerID creates new EffectiveCallerID(vtrpcpb.CallerID)
// from GoRPC's CallerID
func GoRPCEffectiveCallerID(c *proto.CallerID) *vtrpcpb.CallerID {
if c == nil {
return nil
}
return NewEffectiveCallerID(c.Principal, c.Component, c.Subcomponent)
}

Просмотреть файл

@ -0,0 +1,38 @@
package gorpccallerid
import (
"github.com/youtube/vitess/go/vt/callerid"
querypb "github.com/youtube/vitess/go/vt/proto/query"
vtrpcpb "github.com/youtube/vitess/go/vt/proto/vtrpc"
)
// CallerID is the BSON implementation of the proto3 vtrpc.CallerID
type CallerID struct {
Principal string
Component string
Subcomponent string
}
// VTGateCallerID is the BSON implementation of the proto3 query.VTGateCallerID
type VTGateCallerID struct {
Username string
}
// GoRPCImmediateCallerID creates new ImmediateCallerID(querypb.VTGateCallerID)
// from GoRPC's VTGateCallerID
func GoRPCImmediateCallerID(v *VTGateCallerID) *querypb.VTGateCallerID {
if v == nil {
return nil
}
return callerid.NewImmediateCallerID(v.Username)
}
// GoRPCEffectiveCallerID creates new EffectiveCallerID(vtrpcpb.CallerID)
// from GoRPC's CallerID
func GoRPCEffectiveCallerID(c *CallerID) *vtrpcpb.CallerID {
if c == nil {
return nil
}
return callerid.NewEffectiveCallerID(c.Principal, c.Component, c.Subcomponent)
}

Просмотреть файл

@ -1,19 +1,19 @@
package callerid
package gorpccallerid
import (
"testing"
"github.com/youtube/vitess/go/vt/tabletserver/proto"
"github.com/youtube/vitess/go/vt/callerid"
)
func TestGoRPCCallerID(t *testing.T) {
im := proto.VTGateCallerID{
Username: FakeUsername,
im := VTGateCallerID{
Username: callerid.FakeUsername,
}
ef := proto.CallerID{
Principal: FakePrincipal,
Component: FakeComponent,
Subcomponent: FakeSubcomponent,
ef := CallerID{
Principal: callerid.FakePrincipal,
Component: callerid.FakeComponent,
Subcomponent: callerid.FakeSubcomponent,
}
// Test nil cases
if n := GoRPCImmediateCallerID(nil); n != nil {
@ -22,5 +22,5 @@ func TestGoRPCCallerID(t *testing.T) {
if n := GoRPCEffectiveCallerID(nil); n != nil {
t.Errorf("Expect nil from GoRPCEffectiveCallerID(nil), but got %v", n)
}
Tests(t, GoRPCImmediateCallerID(&im), GoRPCEffectiveCallerID(&ef))
callerid.Tests(t, GoRPCImmediateCallerID(&im), GoRPCEffectiveCallerID(&ef))
}

Просмотреть файл

@ -1,53 +0,0 @@
// Copyright 2012, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package proto
// DO NOT EDIT.
// FILE GENERATED BY BSONGEN.
import (
"bytes"
"github.com/youtube/vitess/go/bson"
"github.com/youtube/vitess/go/bytes2"
)
// MarshalBson bson-encodes CallerID.
func (callerID *CallerID) MarshalBson(buf *bytes2.ChunkedWriter, key string) {
bson.EncodeOptionalPrefix(buf, bson.Object, key)
lenWriter := bson.NewLenWriter(buf)
bson.EncodeString(buf, "Principal", callerID.Principal)
bson.EncodeString(buf, "Component", callerID.Component)
bson.EncodeString(buf, "Subcomponent", callerID.Subcomponent)
lenWriter.Close()
}
// UnmarshalBson bson-decodes into CallerID.
func (callerID *CallerID) UnmarshalBson(buf *bytes.Buffer, kind byte) {
switch kind {
case bson.EOO, bson.Object:
// valid
case bson.Null:
return
default:
panic(bson.NewBsonError("unexpected kind %v for CallerID", kind))
}
bson.Next(buf, 4)
for kind := bson.NextByte(buf); kind != bson.EOO; kind = bson.NextByte(buf) {
switch bson.ReadCString(buf) {
case "Principal":
callerID.Principal = bson.DecodeString(buf, kind)
case "Component":
callerID.Component = bson.DecodeString(buf, kind)
case "Subcomponent":
callerID.Subcomponent = bson.DecodeString(buf, kind)
default:
bson.Skip(buf, kind)
}
}
}

Просмотреть файл

@ -52,17 +52,3 @@ type QuerySplit struct {
BindVariables map[string]interface{}
RowCount int64
}
// CallerID is the BSON implementation of the proto3 vtrpc.CallerID
type CallerID struct {
Principal string
Component string
Subcomponent string
}
//go:generate bsongen -file $GOFILE -type CallerID -o callerid_bson.go
// VTGateCallerID is the BSON implementation of the proto3 query.VTGateCallerID
type VTGateCallerID struct {
Username string
}

Просмотреть файл

@ -9,6 +9,7 @@ package gorpcvtgatecommon
import (
mproto "github.com/youtube/vitess/go/mysql/proto"
"github.com/youtube/vitess/go/sqltypes"
"github.com/youtube/vitess/go/vt/callerid/gorpccallerid"
tproto "github.com/youtube/vitess/go/vt/tabletserver/proto"
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
@ -17,7 +18,7 @@ import (
// Query represents a keyspace agnostic query request.
type Query struct {
CallerID *tproto.CallerID // only used by BSON
CallerID *gorpccallerid.CallerID
Sql string
BindVariables map[string]interface{}
TabletType topodatapb.TabletType
@ -28,7 +29,7 @@ type Query struct {
// QueryShard represents a query request for the
// specified list of shards.
type QueryShard struct {
CallerID *tproto.CallerID // only used by BSON
CallerID *gorpccallerid.CallerID
Sql string
BindVariables map[string]interface{}
Keyspace string
@ -41,7 +42,7 @@ type QueryShard struct {
// KeyspaceIdQuery represents a query request for the
// specified list of keyspace IDs.
type KeyspaceIdQuery struct {
CallerID *tproto.CallerID // only used by BSON
CallerID *gorpccallerid.CallerID
Sql string
BindVariables map[string]interface{}
Keyspace string
@ -54,7 +55,7 @@ type KeyspaceIdQuery struct {
// KeyRangeQuery represents a query request for the
// specified list of keyranges.
type KeyRangeQuery struct {
CallerID *tproto.CallerID // only used by BSON
CallerID *gorpccallerid.CallerID
Sql string
BindVariables map[string]interface{}
Keyspace string
@ -72,7 +73,7 @@ type EntityId struct {
// EntityIdsQuery represents a query request for the specified KeyspaceId map.
type EntityIdsQuery struct {
CallerID *tproto.CallerID // only used by BSON
CallerID *gorpccallerid.CallerID
Sql string
BindVariables map[string]interface{}
Keyspace string
@ -102,7 +103,7 @@ type BoundShardQuery struct {
// BatchQueryShard represents a batch query request
// for the specified shards.
type BatchQueryShard struct {
CallerID *tproto.CallerID // only used by BSON
CallerID *gorpccallerid.CallerID
Queries []BoundShardQuery
TabletType topodatapb.TabletType
AsTransaction bool
@ -121,7 +122,7 @@ type BoundKeyspaceIdQuery struct {
// KeyspaceIdBatchQuery represents a batch query request
// for the specified keyspace IDs.
type KeyspaceIdBatchQuery struct {
CallerID *tproto.CallerID // only used by BSON
CallerID *gorpccallerid.CallerID
Queries []BoundKeyspaceIdQuery
TabletType topodatapb.TabletType
AsTransaction bool
@ -137,7 +138,7 @@ type QueryResultList struct {
// SplitQueryRequest is a request to split a query into multiple parts
type SplitQueryRequest struct {
CallerID *tproto.CallerID // only used by BSON
CallerID *gorpccallerid.CallerID
Keyspace string
Query tproto.BoundQuery
SplitColumn string
@ -146,7 +147,7 @@ type SplitQueryRequest struct {
// BeginRequest is the BSON implementation of the proto3 query.BeginRequest
type BeginRequest struct {
CallerID *tproto.CallerID // only used by BSON
CallerID *gorpccallerid.CallerID
}
// BeginResponse is the BSON implementation of the proto3 vtgate.BeginResponse
@ -159,7 +160,7 @@ type BeginResponse struct {
// CommitRequest is the BSON implementation of the proto3 vtgate.CommitRequest
type CommitRequest struct {
CallerID *tproto.CallerID // only used by BSON
CallerID *gorpccallerid.CallerID
Session *vtgatepb.Session
}
@ -172,7 +173,7 @@ type CommitResponse struct {
// RollbackRequest is the BSON implementation of the proto3 vtgate.RollbackRequest
type RollbackRequest struct {
CallerID *tproto.CallerID // only used by BSON
CallerID *gorpccallerid.CallerID
Session *vtgatepb.Session
}

Просмотреть файл

@ -13,6 +13,7 @@ import (
"github.com/youtube/vitess/go/rpcwrap/bsonrpc"
"github.com/youtube/vitess/go/sqltypes"
"github.com/youtube/vitess/go/vt/callerid"
"github.com/youtube/vitess/go/vt/callerid/gorpccallerid"
"github.com/youtube/vitess/go/vt/rpc"
tproto "github.com/youtube/vitess/go/vt/tabletserver/proto"
"github.com/youtube/vitess/go/vt/vterrors"
@ -45,9 +46,9 @@ func dial(ctx context.Context, address string, timeout time.Duration) (vtgatecon
return &vtgateConn{rpcConn: rpcConn}, nil
}
func getEffectiveCallerID(ctx context.Context) *tproto.CallerID {
func getEffectiveCallerID(ctx context.Context) *gorpccallerid.CallerID {
if ef := callerid.EffectiveCallerIDFromContext(ctx); ef != nil {
return &tproto.CallerID{
return &gorpccallerid.CallerID{
Principal: ef.Principal,
Component: ef.Component,
Subcomponent: ef.Subcomponent,

Просмотреть файл

@ -13,6 +13,7 @@ import (
"github.com/youtube/vitess/go/sqltypes"
"github.com/youtube/vitess/go/vt/callerid"
"github.com/youtube/vitess/go/vt/callerid/gorpccallerid"
"github.com/youtube/vitess/go/vt/rpc"
"github.com/youtube/vitess/go/vt/servenv"
"github.com/youtube/vitess/go/vt/vterrors"
@ -61,7 +62,7 @@ func (vtg *VTGate) Execute(ctx context.Context, request *gorpcvtgatecommon.Query
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*rpcTimeout))
defer cancel()
ctx = callerid.NewContext(ctx,
callerid.GoRPCEffectiveCallerID(request.CallerID),
gorpccallerid.GoRPCEffectiveCallerID(request.CallerID),
callerid.NewImmediateCallerID("gorpc client"))
sessionFromRPC(request.Session)
var vtgErr error
@ -82,7 +83,7 @@ func (vtg *VTGate) ExecuteShard(ctx context.Context, request *gorpcvtgatecommon.
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*rpcTimeout))
defer cancel()
ctx = callerid.NewContext(ctx,
callerid.GoRPCEffectiveCallerID(request.CallerID),
gorpccallerid.GoRPCEffectiveCallerID(request.CallerID),
callerid.NewImmediateCallerID("gorpc client"))
sessionFromRPC(request.Session)
var vtgErr error
@ -105,7 +106,7 @@ func (vtg *VTGate) ExecuteKeyspaceIds(ctx context.Context, request *gorpcvtgatec
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*rpcTimeout))
defer cancel()
ctx = callerid.NewContext(ctx,
callerid.GoRPCEffectiveCallerID(request.CallerID),
gorpccallerid.GoRPCEffectiveCallerID(request.CallerID),
callerid.NewImmediateCallerID("gorpc client"))
sessionFromRPC(request.Session)
var vtgErr error
@ -128,7 +129,7 @@ func (vtg *VTGate) ExecuteKeyRanges(ctx context.Context, request *gorpcvtgatecom
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*rpcTimeout))
defer cancel()
ctx = callerid.NewContext(ctx,
callerid.GoRPCEffectiveCallerID(request.CallerID),
gorpccallerid.GoRPCEffectiveCallerID(request.CallerID),
callerid.NewImmediateCallerID("gorpc client"))
sessionFromRPC(request.Session)
var vtgErr error
@ -151,7 +152,7 @@ func (vtg *VTGate) ExecuteEntityIds(ctx context.Context, request *gorpcvtgatecom
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*rpcTimeout))
defer cancel()
ctx = callerid.NewContext(ctx,
callerid.GoRPCEffectiveCallerID(request.CallerID),
gorpccallerid.GoRPCEffectiveCallerID(request.CallerID),
callerid.NewImmediateCallerID("gorpc client"))
sessionFromRPC(request.Session)
var vtgErr error
@ -175,7 +176,7 @@ func (vtg *VTGate) ExecuteBatchShard(ctx context.Context, request *gorpcvtgateco
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*rpcTimeout))
defer cancel()
ctx = callerid.NewContext(ctx,
callerid.GoRPCEffectiveCallerID(request.CallerID),
gorpccallerid.GoRPCEffectiveCallerID(request.CallerID),
callerid.NewImmediateCallerID("gorpc client"))
sessionFromRPC(request.Session)
qs, err := gorpcvtgatecommon.BoundShardQueriesToProto(request.Queries)
@ -200,7 +201,7 @@ func (vtg *VTGate) ExecuteBatchKeyspaceIds(ctx context.Context, request *gorpcvt
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*rpcTimeout))
defer cancel()
ctx = callerid.NewContext(ctx,
callerid.GoRPCEffectiveCallerID(request.CallerID),
gorpccallerid.GoRPCEffectiveCallerID(request.CallerID),
callerid.NewImmediateCallerID("gorpc client"))
sessionFromRPC(request.Session)
qs, err := gorpcvtgatecommon.BoundKeyspaceIdQueriesToProto(request.Queries)
@ -222,7 +223,7 @@ func (vtg *VTGate) ExecuteBatchKeyspaceIds(ctx context.Context, request *gorpcvt
func (vtg *VTGate) StreamExecute(ctx context.Context, request *gorpcvtgatecommon.Query, sendReply func(interface{}) error) (err error) {
defer vtg.server.HandlePanic(&err)
ctx = callerid.NewContext(ctx,
callerid.GoRPCEffectiveCallerID(request.CallerID),
gorpccallerid.GoRPCEffectiveCallerID(request.CallerID),
callerid.NewImmediateCallerID("gorpc client"))
return vtg.server.StreamExecute(ctx,
request.Sql,
@ -239,7 +240,7 @@ func (vtg *VTGate) StreamExecute(ctx context.Context, request *gorpcvtgatecommon
func (vtg *VTGate) StreamExecute2(ctx context.Context, request *gorpcvtgatecommon.Query, sendReply func(interface{}) error) (err error) {
defer vtg.server.HandlePanic(&err)
ctx = callerid.NewContext(ctx,
callerid.GoRPCEffectiveCallerID(request.CallerID),
gorpccallerid.GoRPCEffectiveCallerID(request.CallerID),
callerid.NewImmediateCallerID("gorpc client"))
vtgErr := vtg.server.StreamExecute(ctx,
request.Sql,
@ -264,7 +265,7 @@ func (vtg *VTGate) StreamExecute2(ctx context.Context, request *gorpcvtgatecommo
func (vtg *VTGate) StreamExecuteShard(ctx context.Context, request *gorpcvtgatecommon.QueryShard, sendReply func(interface{}) error) (err error) {
defer vtg.server.HandlePanic(&err)
ctx = callerid.NewContext(ctx,
callerid.GoRPCEffectiveCallerID(request.CallerID),
gorpccallerid.GoRPCEffectiveCallerID(request.CallerID),
callerid.NewImmediateCallerID("gorpc client"))
return vtg.server.StreamExecuteShards(ctx,
request.Sql,
@ -283,7 +284,7 @@ func (vtg *VTGate) StreamExecuteShard(ctx context.Context, request *gorpcvtgatec
func (vtg *VTGate) StreamExecuteShard2(ctx context.Context, request *gorpcvtgatecommon.QueryShard, sendReply func(interface{}) error) (err error) {
defer vtg.server.HandlePanic(&err)
ctx = callerid.NewContext(ctx,
callerid.GoRPCEffectiveCallerID(request.CallerID),
gorpccallerid.GoRPCEffectiveCallerID(request.CallerID),
callerid.NewImmediateCallerID("gorpc client"))
vtgErr := vtg.server.StreamExecuteShards(ctx,
request.Sql,
@ -311,7 +312,7 @@ func (vtg *VTGate) StreamExecuteShard2(ctx context.Context, request *gorpcvtgate
func (vtg *VTGate) StreamExecuteKeyspaceIds(ctx context.Context, request *gorpcvtgatecommon.KeyspaceIdQuery, sendReply func(interface{}) error) (err error) {
defer vtg.server.HandlePanic(&err)
ctx = callerid.NewContext(ctx,
callerid.GoRPCEffectiveCallerID(request.CallerID),
gorpccallerid.GoRPCEffectiveCallerID(request.CallerID),
callerid.NewImmediateCallerID("gorpc client"))
return vtg.server.StreamExecuteKeyspaceIds(ctx,
request.Sql,
@ -331,7 +332,7 @@ func (vtg *VTGate) StreamExecuteKeyspaceIds(ctx context.Context, request *gorpcv
func (vtg *VTGate) StreamExecuteKeyspaceIds2(ctx context.Context, request *gorpcvtgatecommon.KeyspaceIdQuery, sendReply func(interface{}) error) (err error) {
defer vtg.server.HandlePanic(&err)
ctx = callerid.NewContext(ctx,
callerid.GoRPCEffectiveCallerID(request.CallerID),
gorpccallerid.GoRPCEffectiveCallerID(request.CallerID),
callerid.NewImmediateCallerID("gorpc client"))
vtgErr := vtg.server.StreamExecuteKeyspaceIds(ctx,
request.Sql,
@ -359,7 +360,7 @@ func (vtg *VTGate) StreamExecuteKeyspaceIds2(ctx context.Context, request *gorpc
func (vtg *VTGate) StreamExecuteKeyRanges(ctx context.Context, request *gorpcvtgatecommon.KeyRangeQuery, sendReply func(interface{}) error) (err error) {
defer vtg.server.HandlePanic(&err)
ctx = callerid.NewContext(ctx,
callerid.GoRPCEffectiveCallerID(request.CallerID),
gorpccallerid.GoRPCEffectiveCallerID(request.CallerID),
callerid.NewImmediateCallerID("gorpc client"))
return vtg.server.StreamExecuteKeyRanges(ctx,
request.Sql,
@ -379,7 +380,7 @@ func (vtg *VTGate) StreamExecuteKeyRanges(ctx context.Context, request *gorpcvtg
func (vtg *VTGate) StreamExecuteKeyRanges2(ctx context.Context, request *gorpcvtgatecommon.KeyRangeQuery, sendReply func(interface{}) error) (err error) {
defer vtg.server.HandlePanic(&err)
ctx = callerid.NewContext(ctx,
callerid.GoRPCEffectiveCallerID(request.CallerID),
gorpccallerid.GoRPCEffectiveCallerID(request.CallerID),
callerid.NewImmediateCallerID("gorpc client"))
vtgErr := vtg.server.StreamExecuteKeyRanges(ctx,
request.Sql,
@ -437,7 +438,7 @@ func (vtg *VTGate) Begin2(ctx context.Context, request *gorpcvtgatecommon.BeginR
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*rpcTimeout))
defer cancel()
ctx = callerid.NewContext(ctx,
callerid.GoRPCEffectiveCallerID(request.CallerID),
gorpccallerid.GoRPCEffectiveCallerID(request.CallerID),
callerid.NewImmediateCallerID("gorpc client"))
// Don't pass in a nil pointer
session, vtgErr := vtg.server.Begin(ctx)
@ -452,7 +453,7 @@ func (vtg *VTGate) Commit2(ctx context.Context, request *gorpcvtgatecommon.Commi
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*rpcTimeout))
defer cancel()
ctx = callerid.NewContext(ctx,
callerid.GoRPCEffectiveCallerID(request.CallerID),
gorpccallerid.GoRPCEffectiveCallerID(request.CallerID),
callerid.NewImmediateCallerID("gorpc client"))
sessionFromRPC(request.Session)
vtgErr := vtg.server.Commit(ctx, request.Session)
@ -466,7 +467,7 @@ func (vtg *VTGate) Rollback2(ctx context.Context, request *gorpcvtgatecommon.Rol
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*rpcTimeout))
defer cancel()
ctx = callerid.NewContext(ctx,
callerid.GoRPCEffectiveCallerID(request.CallerID),
gorpccallerid.GoRPCEffectiveCallerID(request.CallerID),
callerid.NewImmediateCallerID("gorpc client"))
sessionFromRPC(request.Session)
vtgErr := vtg.server.Rollback(ctx, request.Session)
@ -480,7 +481,7 @@ func (vtg *VTGate) SplitQuery(ctx context.Context, request *gorpcvtgatecommon.Sp
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*rpcTimeout))
defer cancel()
ctx = callerid.NewContext(ctx,
callerid.GoRPCEffectiveCallerID(request.CallerID),
gorpccallerid.GoRPCEffectiveCallerID(request.CallerID),
callerid.NewImmediateCallerID("gorpc client"))
var vtgErr error
reply.Splits, vtgErr = vtg.server.SplitQuery(ctx,