A new SingleDb flag has been added to Begin, which would survive
through the Session. If set, any transaction that exceeds a single
shard will be failed.

VTGate acquires a command line flag: transaction_mode, which can be
single, multi or 2pc. In single mode, it will fail Begins that request
multi-shard. In multi mode, it will fail commits that request 2pc.
In 2pc mode, everything will be allowed.

The per-request flags specify what the app wants, and the vtgate flags
specify what it allows.

* 2pc: Go and python clients

For Go, the transaction mode settings are supported through
the context. This means that it will only work with go1.8.

For Python, it's a cursor constructor parameter.

* 2pc: end-to-end test
This commit is contained in:
sougou 2016-12-05 16:28:44 -08:00 коммит произвёл GitHub
Родитель 608708fe40
Коммит 7933b66e43
32 изменённых файлов: 741 добавлений и 315 удалений

Просмотреть файл

@ -142,7 +142,7 @@ func main() {
topodatapb.TabletType_REPLICA,
topodatapb.TabletType_RDONLY,
}
vtgate.Init(context.Background(), healthCheck, ts, resilientSrvTopoServer, tpb.Cells[0], 2 /*retryCount*/, tabletTypesToWait)
vtgate.Init(context.Background(), healthCheck, ts, resilientSrvTopoServer, tpb.Cells[0], 2 /*retryCount*/, tabletTypesToWait, vtgate.TxMulti)
// vtctld configuration and init
vtctld.InitVtctld(ts)

Просмотреть файл

@ -31,6 +31,7 @@ var (
healthCheckRetryDelay = flag.Duration("healthcheck_retry_delay", 2*time.Millisecond, "health check retry delay")
healthCheckTimeout = flag.Duration("healthcheck_timeout", time.Minute, "the health check timeout period")
tabletTypesToWait = flag.String("tablet_types_to_wait", "", "wait till connected for specified tablet types during Gateway initialization")
transactionMode = flag.String("transaction_mode", "multi", "single: disallow multi-db transactions, multi: allow multi-db transactions with best effort commit, twopc: allow multi-db transactions with 2pc commit")
)
var resilientSrvTopoServer *vtgate.ResilientSrvTopoServer
@ -71,7 +72,22 @@ func main() {
tabletTypes = append(tabletTypes, tt)
}
}
vtg := vtgate.Init(context.Background(), healthCheck, ts, resilientSrvTopoServer, *cell, *retryCount, tabletTypes)
txMode := vtgate.TxMulti
switch *transactionMode {
case "single":
log.Infof("Transaction mode: '%s'", *transactionMode)
txMode = vtgate.TxSingle
case "multi":
log.Infof("Transaction mode: '%s'", *transactionMode)
case "twopc":
log.Infof("Transaction mode: '%s'", *transactionMode)
txMode = vtgate.TxTwoPC
default:
log.Warningf("Unrecognized transactionMode '%s'. Continuing with default 'multi'", *transactionMode)
}
vtg := vtgate.Init(context.Background(), healthCheck, ts, resilientSrvTopoServer, *cell, *retryCount, tabletTypes, txMode)
servenv.OnRun(func() {
addStatusParts(vtg)

Просмотреть файл

@ -235,14 +235,14 @@ func (c *errorClient) StreamExecuteKeyRanges(ctx context.Context, sql string, bi
return c.fallbackClient.StreamExecuteKeyRanges(ctx, sql, bindVariables, keyspace, keyRanges, tabletType, options, sendReply)
}
func (c *errorClient) Begin(ctx context.Context) (*vtgatepb.Session, error) {
func (c *errorClient) Begin(ctx context.Context, singledb bool) (*vtgatepb.Session, error) {
// The client sends the error request through the callerid, as there are no other parameters
cid := callerid.EffectiveCallerIDFromContext(ctx)
request := callerid.GetPrincipal(cid)
if err := requestToError(request); err != nil {
return nil, err
}
return c.fallbackClient.Begin(ctx)
return c.fallbackClient.Begin(ctx, singledb)
}
func (c *errorClient) Commit(ctx context.Context, twopc bool, session *vtgatepb.Session) error {

Просмотреть файл

@ -71,8 +71,8 @@ func (c fallbackClient) StreamExecuteKeyRanges(ctx context.Context, sql string,
return c.fallback.StreamExecuteKeyRanges(ctx, sql, bindVariables, keyspace, keyRanges, tabletType, options, sendReply)
}
func (c fallbackClient) Begin(ctx context.Context) (*vtgatepb.Session, error) {
return c.fallback.Begin(ctx)
func (c fallbackClient) Begin(ctx context.Context, singledb bool) (*vtgatepb.Session, error) {
return c.fallback.Begin(ctx, singledb)
}
func (c fallbackClient) Commit(ctx context.Context, twopc bool, session *vtgatepb.Session) error {

Просмотреть файл

@ -26,9 +26,10 @@ func newSuccessClient(fallback vtgateservice.VTGateService) *successClient {
}
}
func (c *successClient) Begin(ctx context.Context) (*vtgatepb.Session, error) {
func (c *successClient) Begin(ctx context.Context, singledb bool) (*vtgatepb.Session, error) {
return &vtgatepb.Session{
InTransaction: true,
SingleDb: singledb,
}, nil
}

Просмотреть файл

@ -76,7 +76,7 @@ func (c *terminalClient) StreamExecuteKeyRanges(ctx context.Context, sql string,
return errTerminal
}
func (c *terminalClient) Begin(ctx context.Context) (*vtgatepb.Session, error) {
func (c *terminalClient) Begin(ctx context.Context, singledb bool) (*vtgatepb.Session, error) {
return nil, errTerminal
}

Просмотреть файл

@ -75,6 +75,9 @@ const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type Session struct {
InTransaction bool `protobuf:"varint,1,opt,name=in_transaction,json=inTransaction" json:"in_transaction,omitempty"`
ShardSessions []*Session_ShardSession `protobuf:"bytes,2,rep,name=shard_sessions,json=shardSessions" json:"shard_sessions,omitempty"`
// single_db specifies if the transaction should be restricted
// to a single database.
SingleDb bool `protobuf:"varint,3,opt,name=single_db,json=singleDb" json:"single_db,omitempty"`
}
func (m *Session) Reset() { *m = Session{} }
@ -1104,6 +1107,9 @@ type BeginRequest struct {
// caller_id identifies the caller. This is the effective caller ID,
// set by the application to further identify the caller.
CallerId *vtrpc.CallerID `protobuf:"bytes,1,opt,name=caller_id,json=callerId" json:"caller_id,omitempty"`
// single_db specifies if the transaction should be restricted
// to a single database.
SingleDb bool `protobuf:"varint,2,opt,name=single_db,json=singleDb" json:"single_db,omitempty"`
}
func (m *BeginRequest) Reset() { *m = BeginRequest{} }
@ -1597,100 +1603,102 @@ func init() {
func init() { proto.RegisterFile("vtgate.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 1518 bytes of a gzipped FileDescriptorProto
// 1543 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xd4, 0x59, 0x5d, 0x6f, 0x1b, 0x45,
0x17, 0xd6, 0xae, 0xbf, 0x8f, 0x3f, 0x92, 0x4e, 0x9d, 0xd6, 0xf5, 0x9b, 0xb7, 0x71, 0x57, 0x44,
0x75, 0x21, 0x72, 0xd5, 0x94, 0x2f, 0x21, 0x24, 0x20, 0x26, 0x42, 0x51, 0xa1, 0x94, 0x49, 0x40,
0x5c, 0x50, 0xad, 0x36, 0xf6, 0x28, 0x5d, 0xec, 0xdd, 0x75, 0x77, 0x66, 0x5d, 0xcc, 0x05, 0xea,
0x3f, 0xe8, 0x15, 0x12, 0x42, 0x48, 0x08, 0x89, 0x5b, 0x6e, 0x91, 0xb8, 0xe3, 0x02, 0x89, 0x1f,
0xc0, 0x05, 0xf7, 0xfc, 0x01, 0x04, 0xbf, 0x00, 0xed, 0xcc, 0xec, 0x87, 0x37, 0xb1, 0xe3, 0x38,
0x49, 0xe5, 0x5c, 0x65, 0x67, 0xe6, 0xcc, 0xcc, 0x33, 0xcf, 0x79, 0xe6, 0x9c, 0x39, 0x31, 0x94,
0x86, 0xec, 0xc0, 0x60, 0xa4, 0x35, 0x70, 0x1d, 0xe6, 0xa0, 0xac, 0x68, 0xd5, 0x8b, 0x8f, 0x3d,
0xe2, 0x8e, 0x44, 0x67, 0xbd, 0xc2, 0x9c, 0x81, 0xd3, 0x35, 0x98, 0x21, 0xdb, 0xc5, 0x21, 0x73,
0x07, 0x1d, 0xd1, 0xd0, 0xfe, 0x50, 0x20, 0xb7, 0x4b, 0x28, 0x35, 0x1d, 0x1b, 0xad, 0x43, 0xc5,
0xb4, 0x75, 0xe6, 0x1a, 0x36, 0x35, 0x3a, 0xcc, 0x74, 0xec, 0x9a, 0xd2, 0x50, 0x9a, 0x79, 0x5c,
0x36, 0xed, 0xbd, 0xa8, 0x13, 0xb5, 0xa1, 0x42, 0x1f, 0x19, 0x6e, 0x57, 0xa7, 0x62, 0x1e, 0xad,
0xa9, 0x8d, 0x54, 0xb3, 0xb8, 0xb9, 0xda, 0x92, 0x58, 0xe4, 0x7a, 0xad, 0x5d, 0xdf, 0x4a, 0x36,
0x70, 0x99, 0xc6, 0x5a, 0xb4, 0xfe, 0x19, 0x94, 0xe2, 0xc3, 0x68, 0x1d, 0xb2, 0xcc, 0x70, 0x0f,
0x08, 0xe3, 0x7b, 0x16, 0x37, 0xcb, 0x2d, 0x71, 0x84, 0x3d, 0xde, 0x89, 0xe5, 0xa0, 0x0f, 0x31,
0x86, 0x4f, 0x37, 0xbb, 0x35, 0xb5, 0xa1, 0x34, 0x53, 0xb8, 0x1c, 0xeb, 0xdd, 0xe9, 0x6a, 0xbf,
0xa9, 0x50, 0xd9, 0xfe, 0x82, 0x74, 0x3c, 0x46, 0x30, 0x79, 0xec, 0x11, 0xca, 0xd0, 0x06, 0x14,
0x3a, 0x46, 0xbf, 0x4f, 0x5c, 0x7f, 0x92, 0xd8, 0x63, 0xa9, 0x25, 0x98, 0x68, 0xf3, 0xfe, 0x9d,
0x77, 0x71, 0x5e, 0x58, 0xec, 0x74, 0xd1, 0x2d, 0xc8, 0xc9, 0xd3, 0xf1, 0x0d, 0x84, 0x6d, 0xfc,
0x70, 0x38, 0x18, 0x47, 0x37, 0x21, 0xc3, 0xa1, 0xd6, 0x52, 0xdc, 0xf0, 0x92, 0x04, 0xbe, 0xe5,
0x78, 0x76, 0xf7, 0x23, 0xff, 0x13, 0x8b, 0x71, 0xf4, 0x0a, 0x14, 0x99, 0xb1, 0xdf, 0x27, 0x4c,
0x67, 0xa3, 0x01, 0xa9, 0xa5, 0x1b, 0x4a, 0xb3, 0xb2, 0x59, 0x6d, 0x85, 0xde, 0xd9, 0xe3, 0x83,
0x7b, 0xa3, 0x01, 0xc1, 0xc0, 0xc2, 0x6f, 0xb4, 0x01, 0xc8, 0x76, 0x98, 0x9e, 0xf0, 0x4c, 0x86,
0x7b, 0x66, 0xd9, 0x76, 0xd8, 0xce, 0x98, 0x73, 0xea, 0x90, 0xef, 0x91, 0x11, 0x1d, 0x18, 0x1d,
0x52, 0xcb, 0x36, 0x94, 0x66, 0x01, 0x87, 0x6d, 0x74, 0x1b, 0x72, 0xce, 0x80, 0x71, 0x8f, 0xe5,
0x38, 0xd6, 0x15, 0x89, 0x55, 0x52, 0xf5, 0xa1, 0x18, 0xc4, 0x81, 0x95, 0xf6, 0x4c, 0x81, 0xa5,
0x90, 0x46, 0x3a, 0x70, 0x6c, 0x4a, 0xd0, 0x3a, 0x64, 0x88, 0xeb, 0x3a, 0x6e, 0x82, 0x43, 0xfc,
0xa0, 0xbd, 0xed, 0x77, 0x63, 0x31, 0x7a, 0x12, 0x02, 0x5f, 0x84, 0xac, 0x4b, 0xa8, 0xd7, 0x67,
0x92, 0x41, 0x24, 0x51, 0x09, 0xf2, 0xf8, 0x08, 0x96, 0x16, 0xda, 0x5f, 0x2a, 0x54, 0x25, 0x22,
0x2e, 0x1f, 0xba, 0x38, 0xee, 0x8d, 0x33, 0x9f, 0x4e, 0x30, 0x7f, 0x05, 0xb2, 0x5c, 0xfe, 0xb4,
0x96, 0x69, 0xa4, 0x9a, 0x05, 0x2c, 0x5b, 0x49, 0x49, 0x64, 0x4f, 0x25, 0x89, 0xdc, 0x04, 0x49,
0xc4, 0xdc, 0x9e, 0x9f, 0xc9, 0xed, 0x5f, 0x2b, 0xb0, 0x92, 0x20, 0x79, 0x21, 0x9c, 0xff, 0xaf,
0x0a, 0xd7, 0x24, 0xae, 0x7b, 0x92, 0xd9, 0x9d, 0x8b, 0xa2, 0x80, 0x1b, 0x50, 0x0a, 0xbe, 0x75,
0x53, 0xea, 0xa0, 0x84, 0x8b, 0xbd, 0xe8, 0x1c, 0x0b, 0x2a, 0x86, 0x6f, 0x15, 0xa8, 0x1f, 0x45,
0xfa, 0x42, 0x28, 0xe2, 0x69, 0x0a, 0xae, 0x46, 0xe0, 0xb0, 0x61, 0x1f, 0x90, 0x0b, 0xa2, 0x87,
0x3b, 0x00, 0x3d, 0x32, 0xd2, 0x5d, 0x0e, 0x99, 0xab, 0xc1, 0x3f, 0x69, 0xe8, 0xeb, 0xe0, 0x34,
0xb8, 0xd0, 0x0b, 0xce, 0xb5, 0xa0, 0xfa, 0xf8, 0x46, 0x81, 0xda, 0x61, 0x17, 0x2c, 0x84, 0x3a,
0x7e, 0x49, 0x87, 0xea, 0xd8, 0xb6, 0x99, 0xc9, 0x46, 0x17, 0x26, 0x5a, 0x6c, 0x00, 0x22, 0x1c,
0xb1, 0xde, 0x71, 0xfa, 0x9e, 0x65, 0xeb, 0xb6, 0x61, 0x11, 0x9e, 0xf3, 0x0b, 0x78, 0x59, 0x8c,
0xb4, 0xf9, 0xc0, 0x7d, 0xc3, 0x22, 0xe8, 0x53, 0xb8, 0x2c, 0xad, 0xc7, 0x42, 0x4c, 0x96, 0x8b,
0xaa, 0x19, 0x20, 0x9d, 0xc0, 0x44, 0x2b, 0xe8, 0xc0, 0x97, 0xc4, 0x22, 0xf7, 0x26, 0x87, 0xa4,
0xdc, 0xa9, 0x24, 0x97, 0x3f, 0x5e, 0x72, 0x85, 0x59, 0x24, 0x57, 0xdf, 0x87, 0x7c, 0x00, 0x1a,
0xad, 0x41, 0x9a, 0x43, 0x53, 0x38, 0xb4, 0x62, 0xf0, 0x6a, 0xf4, 0x11, 0xf1, 0x01, 0x54, 0x85,
0xcc, 0xd0, 0xe8, 0x7b, 0x84, 0x3b, 0xae, 0x84, 0x45, 0x03, 0xad, 0x41, 0x31, 0xc6, 0x15, 0xf7,
0x55, 0x09, 0x43, 0x14, 0x8d, 0xe3, 0xb2, 0x8e, 0x31, 0xb6, 0x10, 0xb2, 0xb6, 0x61, 0x89, 0xab,
0x89, 0xe7, 0x66, 0x6e, 0x10, 0x89, 0x4e, 0x39, 0x81, 0xe8, 0xd4, 0x89, 0x8f, 0x94, 0x54, 0xfc,
0x91, 0xa2, 0xfd, 0x1c, 0xa5, 0xdd, 0x2d, 0x83, 0x75, 0x1e, 0x3d, 0xa7, 0x87, 0xd7, 0x1d, 0xc8,
0xf9, 0x98, 0x4d, 0x22, 0xf0, 0x14, 0x37, 0xaf, 0x06, 0xa6, 0x89, 0xd3, 0xe3, 0xc0, 0x6e, 0xde,
0x17, 0xf6, 0x3a, 0x54, 0x0c, 0x7a, 0xc4, 0xeb, 0xba, 0x6c, 0xd0, 0x09, 0x3a, 0xcd, 0xce, 0x14,
0x1a, 0xbf, 0x8b, 0x52, 0xe7, 0x18, 0x71, 0xe7, 0xa6, 0xa2, 0x0d, 0xc8, 0x09, 0x8d, 0x04, 0x94,
0x1d, 0x25, 0xa3, 0xc0, 0x44, 0xfb, 0x0a, 0xaa, 0x9c, 0xc9, 0xe8, 0xc2, 0x9f, 0xa1, 0x98, 0x92,
0xef, 0x9d, 0xd4, 0xa1, 0xf7, 0x8e, 0xf6, 0xab, 0x0a, 0xd7, 0xe3, 0xf4, 0x3c, 0xcf, 0x37, 0xdd,
0xab, 0x49, 0x71, 0xad, 0x8e, 0x89, 0x2b, 0x41, 0xc9, 0xc2, 0x2a, 0xec, 0x07, 0x05, 0xd6, 0x26,
0x52, 0xb8, 0x20, 0x32, 0xfb, 0x47, 0x81, 0xea, 0x2e, 0x73, 0x89, 0x61, 0x9d, 0xaa, 0x22, 0x0f,
0x55, 0xa9, 0x9e, 0xac, 0xcc, 0x4e, 0xcd, 0xe8, 0xa2, 0x69, 0xe9, 0x38, 0xe6, 0x97, 0xcc, 0x4c,
0x7e, 0x69, 0xc3, 0x4a, 0xe2, 0xc8, 0xd2, 0x19, 0x51, 0x9c, 0x57, 0x8e, 0x8d, 0xf3, 0xcf, 0x54,
0xa8, 0x8f, 0xad, 0x72, 0x9a, 0xc0, 0x3b, 0x33, 0x7d, 0x71, 0x1e, 0x52, 0x13, 0x33, 0x44, 0x7a,
0x5a, 0x19, 0x9b, 0x99, 0x91, 0xf2, 0x13, 0xcb, 0x7d, 0x07, 0xfe, 0x77, 0x24, 0x21, 0x73, 0x90,
0xfb, 0xbd, 0x0a, 0x6b, 0x63, 0x6b, 0x9d, 0x3a, 0xfa, 0x9c, 0x09, 0xc3, 0xc9, 0xb0, 0x99, 0x3e,
0xb6, 0x4c, 0x3c, 0x37, 0xb2, 0xef, 0x43, 0x63, 0x32, 0x41, 0x73, 0x30, 0xfe, 0x93, 0x0a, 0xff,
0x4f, 0x2e, 0x78, 0x9a, 0x8a, 0xed, 0x4c, 0xf8, 0x1e, 0x2f, 0xc3, 0xd2, 0x73, 0x94, 0x61, 0xe7,
0xc6, 0xff, 0xfb, 0x70, 0x7d, 0x12, 0x5d, 0x73, 0xb0, 0xff, 0x26, 0x94, 0xb6, 0xc8, 0x81, 0x69,
0xcf, 0xc5, 0xb5, 0xf6, 0x06, 0x94, 0xe5, 0x6c, 0xb9, 0x75, 0x2c, 0x5b, 0x28, 0xd3, 0xb3, 0x85,
0xf6, 0x54, 0x81, 0x72, 0xdb, 0xb1, 0x2c, 0x93, 0x9d, 0x7b, 0x56, 0xbf, 0x02, 0x59, 0x83, 0x39,
0x96, 0xd9, 0xe1, 0x7e, 0xce, 0x63, 0xd9, 0xd2, 0x96, 0xa1, 0x12, 0x20, 0x10, 0xf8, 0xb5, 0xcf,
0x61, 0x09, 0x3b, 0xfd, 0xfe, 0xbe, 0xd1, 0xe9, 0x9d, 0x37, 0x2a, 0x0d, 0xc1, 0x72, 0xb4, 0x97,
0xdc, 0xff, 0x21, 0x5c, 0xc3, 0x84, 0x3a, 0xfd, 0x21, 0x89, 0xe5, 0xff, 0xf9, 0x90, 0x20, 0x48,
0x77, 0x99, 0xfc, 0x47, 0x78, 0x01, 0xf3, 0x6f, 0x6d, 0x15, 0xea, 0x47, 0x2d, 0x2f, 0x37, 0xff,
0x5b, 0x85, 0x4b, 0xbb, 0x83, 0xbe, 0xc9, 0xa4, 0x50, 0xe6, 0xd9, 0x75, 0xda, 0xdb, 0x6f, 0xe6,
0x12, 0xf8, 0x06, 0x94, 0xa8, 0x8f, 0x43, 0x56, 0xb9, 0x32, 0xab, 0x14, 0x79, 0x9f, 0xa8, 0x6f,
0xfd, 0x42, 0x2d, 0x30, 0xf1, 0x6c, 0xc6, 0x6f, 0x5b, 0x0a, 0x83, 0xb4, 0xf0, 0x6c, 0x86, 0x5e,
0x86, 0xab, 0xb6, 0x67, 0xe9, 0xae, 0xf3, 0x84, 0xea, 0x03, 0xe2, 0xea, 0x7c, 0x65, 0x7d, 0x60,
0xb8, 0x8c, 0xdf, 0xb3, 0x14, 0xbe, 0x6c, 0x7b, 0x16, 0x76, 0x9e, 0xd0, 0x07, 0xc4, 0xe5, 0x9b,
0x3f, 0x30, 0x5c, 0x86, 0xde, 0x86, 0x82, 0xd1, 0x3f, 0x70, 0x5c, 0x93, 0x3d, 0xb2, 0x64, 0x59,
0xab, 0x49, 0x98, 0x87, 0x98, 0x69, 0xbd, 0x13, 0x58, 0xe2, 0x68, 0x12, 0x7a, 0x09, 0x90, 0x47,
0x89, 0x2e, 0xc0, 0x89, 0x4d, 0x87, 0x9b, 0xb2, 0xc6, 0x5d, 0xf2, 0x28, 0x89, 0x96, 0xf9, 0x64,
0x53, 0xfb, 0x3d, 0x05, 0x28, 0xbe, 0xae, 0xbc, 0x45, 0xaf, 0x41, 0x96, 0xcf, 0xa7, 0x35, 0x85,
0x47, 0x9e, 0xb5, 0x50, 0x43, 0x87, 0x6c, 0x5b, 0x3e, 0x6c, 0x2c, 0xcd, 0xeb, 0x0f, 0xa1, 0x14,
0x84, 0x03, 0x7e, 0x9c, 0xb8, 0x37, 0x94, 0xa9, 0x21, 0x4e, 0x9d, 0x21, 0xc4, 0xd5, 0xdf, 0x82,
0x02, 0x4f, 0xad, 0xc7, 0xae, 0x1d, 0x3d, 0x08, 0xd4, 0xf8, 0x83, 0xa0, 0xfe, 0xa7, 0x02, 0x69,
0x3e, 0x79, 0xe6, 0x5a, 0xe2, 0x03, 0xa8, 0x84, 0x28, 0x85, 0xf7, 0xc4, 0xb5, 0xba, 0x39, 0x85,
0x92, 0x38, 0x05, 0xb8, 0xd4, 0x8b, 0x13, 0xd2, 0x06, 0x10, 0xbf, 0x51, 0xf1, 0xa5, 0x84, 0x0e,
0x5f, 0x98, 0xb2, 0x54, 0x78, 0x5c, 0x5c, 0xa0, 0xe1, 0xc9, 0x11, 0xa4, 0xa9, 0xf9, 0xa5, 0x78,
0x0e, 0xa6, 0x30, 0xff, 0xd6, 0xee, 0xc2, 0xca, 0x7b, 0x84, 0xed, 0xba, 0xc3, 0x20, 0x1d, 0x06,
0xd7, 0x67, 0x0a, 0x4d, 0x1a, 0x86, 0x2b, 0xc9, 0x49, 0x52, 0x01, 0xaf, 0x43, 0x89, 0xba, 0x43,
0x7d, 0x6c, 0xa6, 0x9f, 0x1a, 0x42, 0xf7, 0xc4, 0x27, 0x15, 0x69, 0xd4, 0xd0, 0x7e, 0x54, 0xe1,
0xf2, 0xc7, 0x83, 0xae, 0xc1, 0x88, 0xc8, 0x12, 0x67, 0x7f, 0x8d, 0xab, 0x90, 0xe1, 0x5c, 0xc8,
0xa4, 0x29, 0x1a, 0xe8, 0x36, 0x14, 0x42, 0x47, 0x71, 0x66, 0x8e, 0x56, 0x53, 0x3e, 0x70, 0xc7,
0xbc, 0xf9, 0x72, 0x15, 0x0a, 0xcc, 0xb4, 0x08, 0x65, 0x86, 0x35, 0x90, 0x37, 0x39, 0xea, 0xf0,
0x75, 0x45, 0x86, 0xc4, 0x66, 0xf2, 0x87, 0xac, 0x40, 0x57, 0xdb, 0x7e, 0xdf, 0x9e, 0xd3, 0x23,
0x36, 0x16, 0xe3, 0x5a, 0x0f, 0xaa, 0xe3, 0x2c, 0x49, 0xe2, 0x9b, 0xc1, 0x02, 0xe3, 0xa9, 0x53,
0x66, 0x5c, 0x7f, 0x44, 0xae, 0x80, 0x6e, 0xc1, 0xb2, 0x9f, 0x43, 0x2d, 0xa2, 0x47, 0x78, 0xc4,
0x8f, 0x8e, 0x4b, 0xa2, 0x7f, 0x2f, 0xe8, 0xde, 0xaa, 0x43, 0xad, 0xe3, 0x58, 0xad, 0x91, 0xe3,
0x31, 0x6f, 0x9f, 0xb4, 0x86, 0x26, 0x23, 0x94, 0x8a, 0x1f, 0x5a, 0xf7, 0xb3, 0xfc, 0xcf, 0xdd,
0xff, 0x02, 0x00, 0x00, 0xff, 0xff, 0x36, 0x13, 0x91, 0xa1, 0xb1, 0x1d, 0x00, 0x00,
0x17, 0xd6, 0xae, 0xbf, 0x8f, 0x3f, 0x92, 0x4e, 0x9d, 0xd6, 0x75, 0xf3, 0x36, 0xee, 0xea, 0x8d,
0xea, 0x42, 0xe4, 0xaa, 0x2e, 0x5f, 0xe2, 0x06, 0x88, 0x1b, 0xa1, 0xa8, 0x50, 0xca, 0x24, 0x20,
0x90, 0xa8, 0x56, 0x6b, 0x7b, 0x94, 0x2e, 0xf6, 0xee, 0xba, 0x3b, 0xb3, 0x2e, 0xe6, 0x02, 0xf5,
0x1f, 0xf4, 0x0a, 0x09, 0x21, 0x24, 0x84, 0xc4, 0x2d, 0xb7, 0x48, 0xdc, 0x71, 0x81, 0xc4, 0x4f,
0xe0, 0x9e, 0x3f, 0xc0, 0xc7, 0x2f, 0x40, 0x3b, 0x33, 0xeb, 0x5d, 0x6f, 0x62, 0xc7, 0x71, 0x92,
0xca, 0xbd, 0xca, 0xce, 0xcc, 0x99, 0x33, 0xcf, 0x3c, 0xe7, 0x99, 0x39, 0x73, 0x62, 0x28, 0x0c,
0xd9, 0x81, 0xc1, 0x48, 0x63, 0xe0, 0x3a, 0xcc, 0x41, 0x69, 0xd1, 0xaa, 0xe6, 0x1f, 0x7b, 0xc4,
0x1d, 0x89, 0xce, 0x6a, 0x89, 0x39, 0x03, 0xa7, 0x6b, 0x30, 0x43, 0xb6, 0xf3, 0x43, 0xe6, 0x0e,
0x3a, 0xa2, 0xa1, 0xfd, 0xad, 0x40, 0x66, 0x8f, 0x50, 0x6a, 0x3a, 0x36, 0xda, 0x84, 0x92, 0x69,
0xeb, 0xcc, 0x35, 0x6c, 0x6a, 0x74, 0x98, 0xe9, 0xd8, 0x15, 0xa5, 0xa6, 0xd4, 0xb3, 0xb8, 0x68,
0xda, 0xfb, 0x61, 0x27, 0x6a, 0x41, 0x89, 0x3e, 0x32, 0xdc, 0xae, 0x4e, 0xc5, 0x3c, 0x5a, 0x51,
0x6b, 0x89, 0x7a, 0xbe, 0xb9, 0xde, 0x90, 0x58, 0xa4, 0xbf, 0xc6, 0x9e, 0x6f, 0x25, 0x1b, 0xb8,
0x48, 0x23, 0x2d, 0x8a, 0xae, 0x42, 0x8e, 0x9a, 0xf6, 0x41, 0x9f, 0xe8, 0xdd, 0x76, 0x25, 0xc1,
0x97, 0xc9, 0x8a, 0x8e, 0xbb, 0xed, 0xea, 0x67, 0x50, 0x88, 0xce, 0x45, 0x9b, 0x90, 0x66, 0x86,
0x7b, 0x40, 0x18, 0x07, 0x94, 0x6f, 0x16, 0x1b, 0x62, 0x7f, 0xfb, 0xbc, 0x13, 0xcb, 0x41, 0x1f,
0x7f, 0x04, 0xbc, 0x6e, 0x76, 0x2b, 0x6a, 0x4d, 0xa9, 0x27, 0x70, 0x31, 0xd2, 0xbb, 0xdb, 0xd5,
0x7e, 0x53, 0xa1, 0xb4, 0xf3, 0x05, 0xe9, 0x78, 0x8c, 0x60, 0xf2, 0xd8, 0x23, 0x94, 0xa1, 0x2d,
0xc8, 0x75, 0x8c, 0x7e, 0x9f, 0xb8, 0xfe, 0x24, 0xb1, 0xc6, 0x4a, 0x43, 0xd0, 0xd4, 0xe2, 0xfd,
0xbb, 0x77, 0x71, 0x56, 0x58, 0xec, 0x76, 0xd1, 0x4d, 0xc8, 0xc8, 0xad, 0xf3, 0x05, 0x84, 0x6d,
0x74, 0xe7, 0x38, 0x18, 0x47, 0x37, 0x20, 0xc5, 0xa1, 0xf2, 0x2d, 0xe6, 0x9b, 0x17, 0x24, 0xf0,
0x6d, 0xc7, 0xb3, 0xbb, 0x1f, 0xfa, 0x9f, 0x58, 0x8c, 0xa3, 0x57, 0x21, 0xcf, 0x8c, 0x76, 0x9f,
0x30, 0x9d, 0x8d, 0x06, 0xa4, 0x92, 0xac, 0x29, 0xf5, 0x52, 0xb3, 0xdc, 0x18, 0x87, 0x6e, 0x9f,
0x0f, 0xee, 0x8f, 0x06, 0x04, 0x03, 0x1b, 0x7f, 0xa3, 0x2d, 0x40, 0xb6, 0xc3, 0xf4, 0x58, 0xd8,
0x52, 0x9c, 0xcf, 0x55, 0xdb, 0x61, 0xbb, 0x13, 0x91, 0xab, 0x42, 0xb6, 0x47, 0x46, 0x74, 0x60,
0x74, 0x48, 0x25, 0x5d, 0x53, 0xea, 0x39, 0x3c, 0x6e, 0xa3, 0x5b, 0x90, 0x71, 0x06, 0x8c, 0x87,
0x33, 0xc3, 0xb1, 0xae, 0x49, 0xac, 0x92, 0xaa, 0x0f, 0xc4, 0x20, 0x0e, 0xac, 0xb4, 0x67, 0x0a,
0xac, 0x8c, 0x69, 0xa4, 0x03, 0xc7, 0xa6, 0x04, 0x6d, 0x42, 0x8a, 0xb8, 0xae, 0xe3, 0xc6, 0x38,
0xc4, 0x0f, 0x5a, 0x3b, 0x7e, 0x37, 0x16, 0xa3, 0x27, 0x21, 0xf0, 0x25, 0x48, 0xbb, 0x84, 0x7a,
0x7d, 0x26, 0x19, 0x44, 0x12, 0x95, 0x20, 0x8f, 0x8f, 0x60, 0x69, 0xa1, 0xfd, 0xa9, 0x42, 0x59,
0x22, 0xe2, 0xf2, 0xa1, 0xcb, 0x13, 0xde, 0x28, 0xf3, 0xc9, 0x18, 0xf3, 0x97, 0x20, 0xcd, 0xcf,
0x06, 0xad, 0xa4, 0x6a, 0x89, 0x7a, 0x0e, 0xcb, 0x56, 0x5c, 0x12, 0xe9, 0x53, 0x49, 0x22, 0x33,
0x45, 0x12, 0x91, 0xb0, 0x67, 0xe7, 0x0a, 0xfb, 0xd7, 0x0a, 0xac, 0xc5, 0x48, 0x5e, 0x8a, 0xe0,
0xff, 0xab, 0xc2, 0x15, 0x89, 0xeb, 0x9e, 0x64, 0x76, 0xf7, 0x45, 0x51, 0xc0, 0x75, 0x28, 0x04,
0xdf, 0xba, 0x29, 0x75, 0x50, 0xc0, 0xf9, 0x5e, 0xb8, 0x8f, 0x25, 0x15, 0xc3, 0xb7, 0x0a, 0x54,
0x8f, 0x22, 0x7d, 0x29, 0x14, 0xf1, 0x34, 0x01, 0x97, 0x43, 0x70, 0xd8, 0xb0, 0x0f, 0xc8, 0x0b,
0xa2, 0x87, 0xdb, 0x00, 0x3d, 0x32, 0xd2, 0x5d, 0x0e, 0x99, 0xab, 0xc1, 0xdf, 0xe9, 0x38, 0xd6,
0xc1, 0x6e, 0x70, 0xae, 0x17, 0xec, 0x6b, 0x49, 0xf5, 0xf1, 0x8d, 0x02, 0x95, 0xc3, 0x21, 0x58,
0x0a, 0x75, 0xfc, 0x92, 0x1c, 0xab, 0x63, 0xc7, 0x66, 0x26, 0x1b, 0xbd, 0x30, 0xb7, 0xc5, 0x16,
0x20, 0xc2, 0x11, 0xeb, 0x1d, 0xa7, 0xef, 0x59, 0xb6, 0x6e, 0x1b, 0x16, 0xe1, 0x39, 0x3f, 0x87,
0x57, 0xc5, 0x48, 0x8b, 0x0f, 0xdc, 0x37, 0x2c, 0x82, 0x3e, 0x81, 0x8b, 0xd2, 0x7a, 0xe2, 0x8a,
0x49, 0x73, 0x51, 0xd5, 0x03, 0xa4, 0x53, 0x98, 0x68, 0x04, 0x1d, 0xf8, 0x82, 0x70, 0x72, 0x6f,
0xfa, 0x95, 0x94, 0x39, 0x95, 0xe4, 0xb2, 0xc7, 0x4b, 0x2e, 0x37, 0x8f, 0xe4, 0xaa, 0x6d, 0xc8,
0x06, 0xa0, 0xd1, 0x06, 0x24, 0x39, 0x34, 0x85, 0x43, 0xcb, 0x07, 0xaf, 0x46, 0x1f, 0x11, 0x1f,
0x40, 0x65, 0x48, 0x0d, 0x8d, 0xbe, 0x47, 0x78, 0xe0, 0x0a, 0x58, 0x34, 0xd0, 0x06, 0xe4, 0x23,
0x5c, 0xf1, 0x58, 0x15, 0x30, 0x84, 0xb7, 0x71, 0x54, 0xd6, 0x11, 0xc6, 0x96, 0x42, 0xd6, 0x36,
0xac, 0x70, 0x35, 0xf1, 0xdc, 0xcc, 0x0d, 0x42, 0xd1, 0x29, 0x27, 0x10, 0x9d, 0x3a, 0xf5, 0x91,
0x92, 0x88, 0x3e, 0x52, 0xb4, 0x9f, 0xc3, 0xb4, 0xbb, 0x6d, 0xb0, 0xce, 0xa3, 0xe7, 0xf4, 0xf0,
0xba, 0x0d, 0x19, 0x1f, 0xb3, 0x49, 0x04, 0x9e, 0x7c, 0xf3, 0x72, 0x60, 0x1a, 0xdb, 0x3d, 0x0e,
0xec, 0x16, 0x7d, 0x61, 0x6f, 0x42, 0xc9, 0xa0, 0x47, 0xbc, 0xae, 0x8b, 0x06, 0x9d, 0xa2, 0xd3,
0xf4, 0x5c, 0x57, 0xe3, 0x77, 0x61, 0xea, 0x9c, 0x20, 0xee, 0xdc, 0x54, 0xb4, 0x05, 0x19, 0xa1,
0x91, 0x80, 0xb2, 0xa3, 0x64, 0x14, 0x98, 0x68, 0x5f, 0x41, 0x99, 0x33, 0x19, 0x1e, 0xf8, 0x33,
0x14, 0x53, 0xfc, 0xbd, 0x93, 0x38, 0xf4, 0xde, 0xd1, 0x7e, 0x55, 0xe1, 0x5a, 0x94, 0x9e, 0xe7,
0xf9, 0xa6, 0x7b, 0x2d, 0x2e, 0xae, 0xf5, 0x09, 0x71, 0xc5, 0x28, 0x59, 0x5a, 0x85, 0xfd, 0xa0,
0xc0, 0xc6, 0x54, 0x0a, 0x97, 0x44, 0x66, 0xff, 0x28, 0x50, 0xde, 0x63, 0x2e, 0x31, 0xac, 0x53,
0x55, 0xe4, 0x63, 0x55, 0xaa, 0x27, 0x2b, 0xb3, 0x13, 0x73, 0x86, 0x68, 0x56, 0x3a, 0x8e, 0xc4,
0x25, 0x35, 0x57, 0x5c, 0x5a, 0xb0, 0x16, 0xdb, 0xb2, 0x0c, 0x46, 0x78, 0xcf, 0x2b, 0xc7, 0xde,
0xf3, 0xcf, 0x54, 0xa8, 0x4e, 0x78, 0x39, 0xcd, 0xc5, 0x3b, 0x37, 0x7d, 0x51, 0x1e, 0x12, 0x53,
0x33, 0x44, 0x72, 0x56, 0x19, 0x9b, 0x9a, 0x93, 0xf2, 0x13, 0xcb, 0x7d, 0x17, 0xae, 0x1e, 0x49,
0xc8, 0x02, 0xe4, 0x7e, 0xaf, 0xc2, 0xc6, 0x84, 0xaf, 0x53, 0xdf, 0x3e, 0x67, 0xc2, 0x70, 0xfc,
0xda, 0x4c, 0x1e, 0x5b, 0x26, 0x9e, 0x1b, 0xd9, 0xf7, 0xa1, 0x36, 0x9d, 0xa0, 0x05, 0x18, 0xff,
0x49, 0x85, 0xff, 0xc5, 0x1d, 0x9e, 0xa6, 0x62, 0x3b, 0x13, 0xbe, 0x27, 0xcb, 0xb0, 0xe4, 0x02,
0x65, 0xd8, 0xb9, 0xf1, 0xff, 0x1e, 0x5c, 0x9b, 0x46, 0xd7, 0x02, 0xec, 0x7f, 0x0a, 0x85, 0x6d,
0x72, 0x60, 0xda, 0x8b, 0x71, 0x3d, 0xf1, 0xaf, 0x5c, 0x75, 0xf2, 0x5f, 0xb9, 0xda, 0x9b, 0x50,
0x94, 0xae, 0x25, 0xae, 0x48, 0x2a, 0x51, 0x66, 0xa7, 0x12, 0xed, 0xa9, 0x02, 0xc5, 0x96, 0x63,
0x59, 0x26, 0x3b, 0xf7, 0x94, 0x7f, 0x09, 0xd2, 0x06, 0x73, 0x2c, 0xb3, 0x23, 0xff, 0x17, 0x2d,
0x5b, 0xda, 0x2a, 0x94, 0x02, 0x04, 0x02, 0xbf, 0xf6, 0x39, 0xac, 0x60, 0xa7, 0xdf, 0x6f, 0x1b,
0x9d, 0xde, 0x79, 0xa3, 0xd2, 0x10, 0xac, 0x86, 0x6b, 0xc9, 0xf5, 0x1f, 0xc2, 0x15, 0x4c, 0xa8,
0xd3, 0x1f, 0x92, 0xc8, 0xe3, 0x60, 0x31, 0x24, 0x08, 0x92, 0x5d, 0x26, 0xff, 0x4b, 0x9e, 0xc3,
0xfc, 0x5b, 0x5b, 0x87, 0xea, 0x51, 0xee, 0xe5, 0xe2, 0x7f, 0xa9, 0x70, 0x61, 0x6f, 0xd0, 0x37,
0x99, 0x54, 0xd1, 0x22, 0xab, 0xce, 0x7a, 0x18, 0xce, 0x5d, 0x1f, 0x5f, 0x87, 0x02, 0xf5, 0x71,
0xc8, 0x12, 0x58, 0xa6, 0x9c, 0x3c, 0xef, 0x13, 0xc5, 0xaf, 0x5f, 0xc5, 0x05, 0x26, 0x9e, 0xcd,
0xf8, 0x51, 0x4c, 0x60, 0x90, 0x16, 0x9e, 0xcd, 0xd0, 0x2b, 0x70, 0xd9, 0xf6, 0x2c, 0xdd, 0x75,
0x9e, 0x50, 0x7d, 0x40, 0x5c, 0x9d, 0x7b, 0xd6, 0x07, 0x86, 0xcb, 0xf8, 0x21, 0x4c, 0xe0, 0x8b,
0xb6, 0x67, 0x61, 0xe7, 0x09, 0x7d, 0x40, 0x5c, 0xbe, 0xf8, 0x03, 0xc3, 0x65, 0xe8, 0x6d, 0xc8,
0x19, 0xfd, 0x03, 0xc7, 0x35, 0xd9, 0x23, 0x4b, 0xd6, 0xbc, 0x9a, 0x84, 0x79, 0x88, 0x99, 0xc6,
0x3b, 0x81, 0x25, 0x0e, 0x27, 0xa1, 0x97, 0x01, 0x79, 0x94, 0xe8, 0x02, 0x9c, 0x58, 0x74, 0xd8,
0x94, 0x05, 0xf0, 0x8a, 0x47, 0x49, 0xe8, 0xe6, 0xe3, 0xa6, 0xf6, 0x7b, 0x02, 0x50, 0xd4, 0xaf,
0x3c, 0x45, 0xaf, 0x43, 0x9a, 0xcf, 0xa7, 0x15, 0x85, 0x5f, 0x4b, 0x1b, 0x63, 0x0d, 0x1d, 0xb2,
0x6d, 0xf8, 0xb0, 0xb1, 0x34, 0xaf, 0x3e, 0x84, 0x42, 0x70, 0x57, 0xf0, 0xed, 0x44, 0xa3, 0xa1,
0xcc, 0xbc, 0xff, 0xd4, 0x39, 0xee, 0xbf, 0xea, 0x5b, 0x90, 0xe3, 0x79, 0xf7, 0x58, 0xdf, 0xe1,
0x6b, 0x41, 0x8d, 0xbe, 0x16, 0xaa, 0x7f, 0x28, 0x90, 0xe4, 0x93, 0xe7, 0x2e, 0x34, 0xde, 0x87,
0xd2, 0x18, 0xa5, 0x88, 0x9e, 0x38, 0x56, 0x37, 0x66, 0x50, 0x12, 0xa5, 0x00, 0x17, 0x7a, 0x51,
0x42, 0x5a, 0x00, 0xe2, 0xd7, 0x2d, 0xee, 0x4a, 0xe8, 0xf0, 0xff, 0x33, 0x5c, 0x8d, 0xb7, 0x8b,
0x73, 0x74, 0xbc, 0x73, 0x04, 0x49, 0x6a, 0x7e, 0x29, 0xde, 0x8a, 0x09, 0xcc, 0xbf, 0xb5, 0x3b,
0xb0, 0xf6, 0x2e, 0x61, 0x7b, 0xee, 0x30, 0xc8, 0x95, 0xc1, 0xf1, 0x99, 0x41, 0x93, 0x86, 0xe1,
0x52, 0x7c, 0x92, 0x54, 0xc0, 0x1b, 0x50, 0xa0, 0xee, 0x50, 0x9f, 0x98, 0xe9, 0xe7, 0x8d, 0x71,
0x78, 0xa2, 0x93, 0xf2, 0x34, 0x6c, 0x68, 0x3f, 0xaa, 0x70, 0xf1, 0xa3, 0x41, 0xd7, 0x60, 0x44,
0xa4, 0x90, 0xb3, 0x3f, 0xc6, 0x65, 0x48, 0x71, 0x2e, 0x64, 0x46, 0x15, 0x0d, 0x74, 0x0b, 0x72,
0xe3, 0x40, 0x71, 0x66, 0x8e, 0x56, 0x53, 0x36, 0x08, 0xc7, 0xa2, 0xc9, 0x74, 0x1d, 0x72, 0xcc,
0xb4, 0x08, 0x65, 0x86, 0x35, 0x90, 0x27, 0x39, 0xec, 0xf0, 0x75, 0x45, 0x86, 0xc4, 0x66, 0xf2,
0x57, 0xae, 0x40, 0x57, 0x3b, 0x7e, 0xdf, 0xbe, 0xd3, 0x23, 0x36, 0x16, 0xe3, 0x5a, 0x0f, 0xca,
0x93, 0x2c, 0x49, 0xe2, 0xeb, 0x81, 0x83, 0xc9, 0xbc, 0x2a, 0xd3, 0xb1, 0x3f, 0x22, 0x3d, 0xa0,
0x9b, 0xb0, 0xea, 0x27, 0x58, 0x8b, 0xe8, 0x21, 0x1e, 0xf1, 0x8b, 0xe4, 0x8a, 0xe8, 0xdf, 0x0f,
0xba, 0xb7, 0xab, 0x50, 0xe9, 0x38, 0x56, 0x63, 0xe4, 0x78, 0xcc, 0x6b, 0x93, 0xc6, 0xd0, 0x64,
0x84, 0x52, 0xf1, 0x13, 0x6d, 0x3b, 0xcd, 0xff, 0xdc, 0xf9, 0x2f, 0x00, 0x00, 0xff, 0xff, 0x92,
0xc6, 0x3b, 0xdf, 0xeb, 0x1d, 0x00, 0x00,
}

Просмотреть файл

@ -14,6 +14,8 @@ import (
"database/sql/driver"
"errors"
"fmt"
"github.com/youtube/vitess/go/vt/vtgate/vtgateconn"
)
var (
@ -29,6 +31,28 @@ var (
_ driver.StmtExecContext = &stmt{}
)
// These are synonyms of the constants defined in vtgateconn.
const (
// AtomicityMulti is the default level. It allows distributed transactions
// with best effort commits. Partial commits are possible.
AtomicityMulti = vtgateconn.AtomicityMulti
// AtomicitySingle prevents a transaction from crossing the boundary of
// a single database.
AtomicitySingle = vtgateconn.AtomicitySingle
// Atomicity2PC allows distributed transactions, and performs 2PC commits.
Atomicity2PC = vtgateconn.Atomicity2PC
)
// WithAtomicity returns a context with the atomicity level set.
func WithAtomicity(ctx context.Context, level vtgateconn.Atomicity) context.Context {
return vtgateconn.WithAtomicity(ctx, level)
}
// AtomicityFromContext returns the atomicity of the context.
func AtomicityFromContext(ctx context.Context) vtgateconn.Atomicity {
return vtgateconn.AtomicityFromContext(ctx)
}
func (c *conn) BeginContext(ctx context.Context) (driver.Tx, error) {
if c.Streaming {
return nil, errors.New("transaction not allowed for streaming connection")

Просмотреть файл

@ -131,7 +131,7 @@ func (f *fakeVTGateService) StreamExecuteKeyRanges(ctx context.Context, sql stri
}
// Begin is part of the VTGateService interface
func (f *fakeVTGateService) Begin(ctx context.Context) (*vtgatepb.Session, error) {
func (f *fakeVTGateService) Begin(ctx context.Context, singledb bool) (*vtgatepb.Session, error) {
return session1, nil
}

Просмотреть файл

@ -315,14 +315,15 @@ func (conn *FakeVTGateConn) StreamExecuteKeyspaceIds(ctx context.Context, query
}
// Begin please see vtgateconn.Impl.Begin
func (conn *FakeVTGateConn) Begin(ctx context.Context) (interface{}, error) {
func (conn *FakeVTGateConn) Begin(ctx context.Context, singledb bool) (interface{}, error) {
return &vtgatepb.Session{
InTransaction: true,
SingleDb: singledb,
}, nil
}
// Commit please see vtgateconn.Impl.Commit
func (conn *FakeVTGateConn) Commit(ctx context.Context, session interface{}) error {
func (conn *FakeVTGateConn) Commit(ctx context.Context, session interface{}, twopc bool) error {
if session == nil {
return errors.New("commit: not in transaction")
}

Просмотреть файл

@ -374,9 +374,10 @@ func (conn *vtgateConn) StreamExecuteKeyspaceIds(ctx context.Context, query stri
}, nil
}
func (conn *vtgateConn) Begin(ctx context.Context) (interface{}, error) {
func (conn *vtgateConn) Begin(ctx context.Context, singledb bool) (interface{}, error) {
request := &vtgatepb.BeginRequest{
CallerId: callerid.EffectiveCallerIDFromContext(ctx),
SingleDb: singledb,
}
response, err := conn.c.Begin(ctx, request)
if err != nil {
@ -385,10 +386,11 @@ func (conn *vtgateConn) Begin(ctx context.Context) (interface{}, error) {
return response.Session, nil
}
func (conn *vtgateConn) Commit(ctx context.Context, session interface{}) error {
func (conn *vtgateConn) Commit(ctx context.Context, session interface{}, twopc bool) error {
request := &vtgatepb.CommitRequest{
CallerId: callerid.EffectiveCallerIDFromContext(ctx),
Session: session.(*vtgatepb.Session),
Atomic: twopc,
}
_, err := conn.c.Commit(ctx, request)
return vterrors.FromGRPCError(err)

Просмотреть файл

@ -328,7 +328,7 @@ func (vtg *VTGate) StreamExecuteKeyRanges(request *vtgatepb.StreamExecuteKeyRang
func (vtg *VTGate) Begin(ctx context.Context, request *vtgatepb.BeginRequest) (response *vtgatepb.BeginResponse, err error) {
defer vtg.server.HandlePanic(&err)
ctx = withCallerIDContext(ctx, request.CallerId)
session, vtgErr := vtg.server.Begin(ctx)
session, vtgErr := vtg.server.Begin(ctx, request.SingleDb)
if vtgErr == nil {
return &vtgatepb.BeginResponse{
Session: session,

Просмотреть файл

@ -5,10 +5,14 @@
package vtgate
import (
"fmt"
"sync"
"github.com/youtube/vitess/go/vt/vterrors"
topodatapb "github.com/youtube/vitess/go/vt/proto/topodata"
vtgatepb "github.com/youtube/vitess/go/vt/proto/vtgate"
vtrpcpb "github.com/youtube/vitess/go/vt/proto/vtrpc"
)
// SafeSession is a mutex-protected version of the Session.
@ -16,7 +20,8 @@ import (
// (the use pattern is 'Find', if not found, then 'Append',
// for a single shard)
type SafeSession struct {
mu sync.Mutex
mu sync.Mutex
mustRollback bool
*vtgatepb.Session
}
@ -51,14 +56,44 @@ func (session *SafeSession) Find(keyspace, shard string, tabletType topodatapb.T
}
// Append adds a new ShardSession
func (session *SafeSession) Append(shardSession *vtgatepb.Session_ShardSession) {
func (session *SafeSession) Append(shardSession *vtgatepb.Session_ShardSession) error {
session.mu.Lock()
defer session.mu.Unlock()
// Always append, in order for rollback to succeed.
session.ShardSessions = append(session.ShardSessions, shardSession)
if session.SingleDb && len(session.ShardSessions) > 1 {
session.mustRollback = true
return vterrors.FromError(vtrpcpb.ErrorCode_BAD_INPUT, fmt.Errorf("multi-db transaction attempted: %v", session.ShardSessions))
}
return nil
}
// SetRollback sets the flag indicating that the transaction must be rolled back.
// The call is a no-op if the session is not in a transaction.
func (session *SafeSession) SetRollback() {
if session == nil || session.Session == nil || !session.Session.InTransaction {
return
}
session.mu.Lock()
defer session.mu.Unlock()
session.mustRollback = true
}
// MustRollback returns true if the transaction must be rolled back.
func (session *SafeSession) MustRollback() bool {
if session == nil {
return false
}
session.mu.Lock()
defer session.mu.Unlock()
return session.mustRollback
}
// Reset clears the session
func (session *SafeSession) Reset() {
if session == nil || session.Session == nil {
return
}
session.mu.Lock()
defer session.mu.Unlock()
session.Session.InTransaction = false

Просмотреть файл

@ -74,7 +74,7 @@ func (stc *ScatterConn) startAction(name string, target *querypb.Target) (time.T
return startTime, statsKey
}
func (stc *ScatterConn) endAction(startTime time.Time, allErrors *concurrency.AllErrorRecorder, statsKey []string, err *error) {
func (stc *ScatterConn) endAction(startTime time.Time, allErrors *concurrency.AllErrorRecorder, statsKey []string, err *error, session *SafeSession) {
if *err != nil {
allErrors.RecordError(*err)
// Don't increment the error counter for duplicate
@ -84,6 +84,9 @@ func (stc *ScatterConn) endAction(startTime time.Time, allErrors *concurrency.Al
if ec != vtrpcpb.ErrorCode_INTEGRITY_ERROR && ec != vtrpcpb.ErrorCode_BAD_INPUT {
stc.tabletCallErrorCount.Add(statsKey, 1)
}
if ec == vtrpcpb.ErrorCode_RESOURCE_EXHAUSTED || ec == vtrpcpb.ErrorCode_NOT_IN_TX {
session.SetRollback()
}
}
stc.timings.Record(statsKey, startTime)
}
@ -105,7 +108,7 @@ func (stc *ScatterConn) Execute(
var mu sync.Mutex
qr := new(sqltypes.Result)
allErrors := stc.multiGoTransaction(
err := stc.multiGoTransaction(
ctx,
"Execute",
keyspace,
@ -134,13 +137,7 @@ func (stc *ScatterConn) Execute(
appendResult(qr, innerqr)
return transactionID, nil
})
if allErrors.HasErrors() {
err := allErrors.AggrError(stc.aggregateErrors)
stc.txConn.RollbackIfNeeded(ctx, err, session)
return nil, err
}
return qr, nil
return qr, err
}
// ExecuteMulti is like Execute,
@ -161,7 +158,7 @@ func (stc *ScatterConn) ExecuteMulti(
var mu sync.Mutex
qr := new(sqltypes.Result)
allErrors := stc.multiGoTransaction(
err := stc.multiGoTransaction(
ctx,
"Execute",
keyspace,
@ -190,13 +187,7 @@ func (stc *ScatterConn) ExecuteMulti(
appendResult(qr, innerqr)
return transactionID, nil
})
if allErrors.HasErrors() {
err := allErrors.AggrError(stc.aggregateErrors)
stc.txConn.RollbackIfNeeded(ctx, err, session)
return nil, err
}
return qr, nil
return qr, err
}
// ExecuteEntityIds executes queries that are shard specific.
@ -216,7 +207,7 @@ func (stc *ScatterConn) ExecuteEntityIds(
var mu sync.Mutex
qr := new(sqltypes.Result)
allErrors := stc.multiGoTransaction(
err := stc.multiGoTransaction(
ctx,
"ExecuteEntityIds",
keyspace,
@ -248,12 +239,7 @@ func (stc *ScatterConn) ExecuteEntityIds(
appendResult(qr, innerqr)
return transactionID, nil
})
if allErrors.HasErrors() {
err := allErrors.AggrError(stc.aggregateErrors)
stc.txConn.RollbackIfNeeded(ctx, err, session)
return nil, err
}
return qr, nil
return qr, err
}
// scatterBatchRequest needs to be built to perform a scatter batch query.
@ -299,17 +285,19 @@ func (stc *ScatterConn) ExecuteBatch(
var err error
startTime, statsKey := stc.startAction("ExecuteBatch", target)
defer stc.endAction(startTime, allErrors, statsKey, &err)
defer stc.endAction(startTime, allErrors, statsKey, &err, session)
shouldBegin, transactionID := transactionInfo(target, session, false)
var innerqrs []sqltypes.Result
if shouldBegin {
innerqrs, transactionID, err = stc.gateway.BeginExecuteBatch(ctx, target, req.Queries, asTransaction, options)
if transactionID != 0 {
session.Append(&vtgatepb.Session_ShardSession{
if appendErr := session.Append(&vtgatepb.Session_ShardSession{
Target: target,
TransactionId: transactionID,
})
}); appendErr != nil {
err = appendErr
}
}
if err != nil {
return
@ -329,12 +317,12 @@ func (stc *ScatterConn) ExecuteBatch(
}(req)
}
wg.Wait()
// If we want to rollback, we have to do it before closing results
// so that the session is updated to be not InTransaction.
if session.MustRollback() {
stc.txConn.Rollback(ctx, session)
}
if allErrors.HasErrors() {
err := allErrors.AggrError(stc.aggregateErrors)
stc.txConn.RollbackIfNeeded(ctx, err, session)
return nil, err
return nil, allErrors.AggrError(stc.aggregateErrors)
}
return results, nil
}
@ -644,7 +632,7 @@ func (stc *ScatterConn) multiGo(
TabletType: tabletType,
}
startTime, statsKey := stc.startAction(name, target)
defer stc.endAction(startTime, allErrors, statsKey, &err)
defer stc.endAction(startTime, allErrors, statsKey, &err, nil)
err = action(target)
}
@ -683,13 +671,13 @@ func (stc *ScatterConn) multiGoTransaction(
session *SafeSession,
notInTransaction bool,
action shardActionTransactionFunc,
) (allErrors *concurrency.AllErrorRecorder) {
allErrors = new(concurrency.AllErrorRecorder)
) error {
shardMap := unique(shards)
if len(shardMap) == 0 {
return allErrors
return nil
}
allErrors := new(concurrency.AllErrorRecorder)
oneShard := func(shard string) {
var err error
target := &querypb.Target{
@ -698,27 +686,29 @@ func (stc *ScatterConn) multiGoTransaction(
TabletType: tabletType,
}
startTime, statsKey := stc.startAction(name, target)
defer stc.endAction(startTime, allErrors, statsKey, &err)
defer stc.endAction(startTime, allErrors, statsKey, &err, session)
shouldBegin, transactionID := transactionInfo(target, session, notInTransaction)
transactionID, err = action(target, shouldBegin, transactionID)
if shouldBegin && transactionID != 0 {
session.Append(&vtgatepb.Session_ShardSession{
if appendErr := session.Append(&vtgatepb.Session_ShardSession{
Target: target,
TransactionId: transactionID,
})
}
}
if len(shardMap) == 1 {
// only one shard, do it synchronously.
for shard := range shardMap {
oneShard(shard)
return allErrors
}); appendErr != nil {
err = appendErr
}
}
}
var wg sync.WaitGroup
if len(shardMap) == 1 {
// only one shard, do it synchronously.
for shard := range shardMap {
oneShard(shard)
goto end
}
}
for shard := range shardMap {
wg.Add(1)
go func(shard string) {
@ -727,7 +717,15 @@ func (stc *ScatterConn) multiGoTransaction(
}(shard)
}
wg.Wait()
return allErrors
end:
if session.MustRollback() {
stc.txConn.Rollback(ctx, session)
}
if allErrors.HasErrors() {
return allErrors.AggrError(stc.aggregateErrors)
}
return nil
}
// transactionInfo looks at the current session, and returns:

Просмотреть файл

@ -7,6 +7,7 @@ package vtgate
import (
"fmt"
"reflect"
"strings"
"testing"
"github.com/youtube/vitess/go/sqltypes"
@ -401,6 +402,44 @@ func TestScatterConnQueryNotInTransaction(t *testing.T) {
}
}
func TestScatterConnSingleDB(t *testing.T) {
createSandbox("TestScatterConnSingleDB")
hc := discovery.NewFakeHealthCheck()
hc.Reset()
sc := newTestScatterConn(hc, new(sandboxTopo), "aa")
hc.AddTestTablet("aa", "0", 1, "TestScatterConnSingleDB", "0", topodatapb.TabletType_MASTER, true, 1, nil)
hc.AddTestTablet("aa", "1", 1, "TestScatterConnSingleDB", "1", topodatapb.TabletType_MASTER, true, 1, nil)
session := NewSafeSession(&vtgatepb.Session{InTransaction: true, SingleDb: true})
_, err := sc.Execute(context.Background(), "query1", nil, "TestScatterConnSingleDB", []string{"0"}, topodatapb.TabletType_MASTER, session, false, nil)
if err != nil {
t.Error(err)
}
_, err = sc.Execute(context.Background(), "query1", nil, "TestScatterConnSingleDB", []string{"1"}, topodatapb.TabletType_MASTER, session, false, nil)
want := "multi-db transaction attempted"
if err == nil || !strings.Contains(err.Error(), want) {
t.Errorf("Multi DB exec: %v, must contain %s", err, want)
}
session = NewSafeSession(&vtgatepb.Session{InTransaction: true, SingleDb: true})
queries := []*vtgatepb.BoundShardQuery{{
Query: &querypb.BoundQuery{
Sql: "query",
BindVariables: nil,
},
Keyspace: "TestScatterConnSingleDB",
Shards: []string{"0", "1"},
}}
scatterRequest, err := boundShardQueriesToScatterBatchRequest(queries)
if err != nil {
t.Error(err)
}
_, err = sc.ExecuteBatch(context.Background(), scatterRequest, topodatapb.TabletType_MASTER, false, session, nil)
if err == nil || !strings.Contains(err.Error(), want) {
t.Errorf("Multi DB exec: %v, must contain %s", err, want)
}
}
func TestAppendResult(t *testing.T) {
qr := new(sqltypes.Result)
innerqr1 := &sqltypes.Result{

Просмотреть файл

@ -113,7 +113,7 @@ func (txc *TxConn) commit2PC(ctx context.Context, session *SafeSession) error {
// Rollback rolls back the current transaction. There are no retries on this operation.
func (txc *TxConn) Rollback(ctx context.Context, session *SafeSession) error {
if session == nil {
if !session.InTransaction() {
return nil
}
defer session.Reset()
@ -122,17 +122,6 @@ func (txc *TxConn) Rollback(ctx context.Context, session *SafeSession) error {
})
}
// RollbackIfNeeded rolls back the current transaction if the error implies that the
// transaction can never be completed.
func (txc *TxConn) RollbackIfNeeded(ctx context.Context, err error, session *SafeSession) {
if session.InTransaction() {
ec := vterrors.RecoverVtErrorCode(err)
if ec == vtrpcpb.ErrorCode_RESOURCE_EXHAUSTED || ec == vtrpcpb.ErrorCode_NOT_IN_TX {
txc.Rollback(ctx, session)
}
}
}
// Resolve resolves the specified 2PC transaction.
func (txc *TxConn) Resolve(ctx context.Context, dtid string) error {
mmShard, err := dtids.ShardSession(dtid)

Просмотреть файл

@ -7,6 +7,7 @@
package vtgate
import (
"errors"
"fmt"
"math"
"net/http"
@ -39,6 +40,13 @@ import (
vtrpcpb "github.com/youtube/vitess/go/vt/proto/vtrpc"
)
// Transaction modes. The value specifies what's allowed.
const (
TxSingle = iota
TxMulti
TxTwoPC
)
var (
rpcVTGate *VTGate
@ -61,6 +69,8 @@ var (
// VTGate is the rpc interface to vtgate. Only one instance
// can be created. It implements vtgateservice.VTGateService
type VTGate struct {
transactionMode int
// router and resolver are top-level objects
// that make routing decisions.
router *Router
@ -105,7 +115,7 @@ var RegisterVTGates []RegisterVTGate
var vtgateOnce sync.Once
// Init initializes VTGate server.
func Init(ctx context.Context, hc discovery.HealthCheck, topoServer topo.Server, serv topo.SrvTopoServer, cell string, retryCount int, tabletTypesToWait []topodatapb.TabletType) *VTGate {
func Init(ctx context.Context, hc discovery.HealthCheck, topoServer topo.Server, serv topo.SrvTopoServer, cell string, retryCount int, tabletTypesToWait []topodatapb.TabletType, transactionMode int) *VTGate {
if rpcVTGate != nil {
log.Fatalf("VTGate already initialized")
}
@ -123,13 +133,14 @@ func Init(ctx context.Context, hc discovery.HealthCheck, topoServer topo.Server,
sc := NewScatterConn("VttabletCall", tc, gw)
rpcVTGate = &VTGate{
router: NewRouter(ctx, serv, cell, "VTGateRouter", sc),
resolver: NewResolver(serv, cell, sc),
scatterConn: sc,
txConn: tc,
gateway: gw,
timings: stats.NewMultiTimings("VtgateApi", []string{"Operation", "Keyspace", "DbType"}),
rowsReturned: stats.NewMultiCounters("VtgateApiRowsReturned", []string{"Operation", "Keyspace", "DbType"}),
transactionMode: transactionMode,
router: NewRouter(ctx, serv, cell, "VTGateRouter", sc),
resolver: NewResolver(serv, cell, sc),
scatterConn: sc,
txConn: tc,
gateway: gw,
timings: stats.NewMultiTimings("VtgateApi", []string{"Operation", "Keyspace", "DbType"}),
rowsReturned: stats.NewMultiCounters("VtgateApiRowsReturned", []string{"Operation", "Keyspace", "DbType"}),
logExecute: logutil.NewThrottledLogger("Execute", 5*time.Second),
logExecuteShards: logutil.NewThrottledLogger("ExecuteShards", 5*time.Second),
@ -567,14 +578,23 @@ func (vtg *VTGate) StreamExecuteShards(ctx context.Context, sql string, bindVari
}
// Begin begins a transaction. It has to be concluded by a Commit or Rollback.
func (vtg *VTGate) Begin(ctx context.Context) (*vtgatepb.Session, error) {
func (vtg *VTGate) Begin(ctx context.Context, singledb bool) (*vtgatepb.Session, error) {
if !singledb && vtg.transactionMode == TxSingle {
return nil, vterrors.FromError(vtrpcpb.ErrorCode_BAD_INPUT, errors.New("multi-db transaction disallowed"))
}
return &vtgatepb.Session{
InTransaction: true,
SingleDb: singledb,
}, nil
}
// Commit commits a transaction.
func (vtg *VTGate) Commit(ctx context.Context, twopc bool, session *vtgatepb.Session) error {
if twopc && vtg.transactionMode != TxTwoPC {
// Rollback the transaction to prevent future deadlocks.
vtg.txConn.Rollback(ctx, NewSafeSession(session))
return vterrors.FromError(vtrpcpb.ErrorCode_BAD_INPUT, errors.New("2pc transaction disallowed"))
}
return formatError(vtg.txConn.Commit(ctx, twopc, NewSafeSession(session)))
}

Просмотреть файл

@ -48,7 +48,148 @@ func init() {
}
`
hcVTGateTest = discovery.NewFakeHealthCheck()
Init(context.Background(), hcVTGateTest, topo.Server{}, new(sandboxTopo), "aa", 10, nil)
Init(context.Background(), hcVTGateTest, topo.Server{}, new(sandboxTopo), "aa", 10, nil, TxMulti)
}
func TestVTGateBegin(t *testing.T) {
save := rpcVTGate.transactionMode
defer func() {
rpcVTGate.transactionMode = save
}()
rpcVTGate.transactionMode = TxSingle
got, err := rpcVTGate.Begin(context.Background(), true)
if err != nil {
t.Error(err)
}
wantSession := &vtgatepb.Session{
InTransaction: true,
SingleDb: true,
}
if !reflect.DeepEqual(got, wantSession) {
t.Errorf("Begin(single): %v, want %v", got, wantSession)
}
_, err = rpcVTGate.Begin(context.Background(), false)
wantErr := "multi-db transaction disallowed"
if err == nil || err.Error() != wantErr {
t.Errorf("Begin(multi): %v, want %s", err, wantErr)
}
rpcVTGate.transactionMode = TxMulti
got, err = rpcVTGate.Begin(context.Background(), true)
if err != nil {
t.Error(err)
}
wantSession = &vtgatepb.Session{
InTransaction: true,
SingleDb: true,
}
if !reflect.DeepEqual(got, wantSession) {
t.Errorf("Begin(single): %v, want %v", got, wantSession)
}
got, err = rpcVTGate.Begin(context.Background(), false)
if err != nil {
t.Error(err)
}
wantSession = &vtgatepb.Session{
InTransaction: true,
}
if !reflect.DeepEqual(got, wantSession) {
t.Errorf("Begin(single): %v, want %v", got, wantSession)
}
rpcVTGate.transactionMode = TxTwoPC
got, err = rpcVTGate.Begin(context.Background(), true)
if err != nil {
t.Error(err)
}
wantSession = &vtgatepb.Session{
InTransaction: true,
SingleDb: true,
}
if !reflect.DeepEqual(got, wantSession) {
t.Errorf("Begin(single): %v, want %v", got, wantSession)
}
got, err = rpcVTGate.Begin(context.Background(), false)
if err != nil {
t.Error(err)
}
wantSession = &vtgatepb.Session{
InTransaction: true,
}
if !reflect.DeepEqual(got, wantSession) {
t.Errorf("Begin(single): %v, want %v", got, wantSession)
}
}
func TestVTGateCommit(t *testing.T) {
save := rpcVTGate.transactionMode
defer func() {
rpcVTGate.transactionMode = save
}()
session := &vtgatepb.Session{
InTransaction: true,
}
rpcVTGate.transactionMode = TxSingle
err := rpcVTGate.Commit(context.Background(), true, session)
wantErr := "2pc transaction disallowed"
if err == nil || err.Error() != wantErr {
t.Errorf("Begin(multi): %v, want %s", err, wantErr)
}
session = &vtgatepb.Session{
InTransaction: true,
}
err = rpcVTGate.Commit(context.Background(), false, session)
if err != nil {
t.Error(err)
}
rpcVTGate.transactionMode = TxMulti
session = &vtgatepb.Session{
InTransaction: true,
}
err = rpcVTGate.Commit(context.Background(), true, session)
if err == nil || err.Error() != wantErr {
t.Errorf("Begin(multi): %v, want %s", err, wantErr)
}
session = &vtgatepb.Session{
InTransaction: true,
}
err = rpcVTGate.Commit(context.Background(), false, session)
if err != nil {
t.Error(err)
}
rpcVTGate.transactionMode = TxTwoPC
session = &vtgatepb.Session{
InTransaction: true,
}
err = rpcVTGate.Commit(context.Background(), true, session)
if err != nil {
t.Error(err)
}
session = &vtgatepb.Session{
InTransaction: true,
}
err = rpcVTGate.Commit(context.Background(), false, session)
if err != nil {
t.Error(err)
}
}
func TestVTGateRollbackNil(t *testing.T) {
err := rpcVTGate.Rollback(context.Background(), nil)
if err != nil {
t.Error(err)
}
}
func TestVTGateExecute(t *testing.T) {
@ -73,7 +214,7 @@ func TestVTGateExecute(t *testing.T) {
t.Errorf("got ExecuteOptions \n%+v, want \n%+v", sbc.Options[0], executeOptions)
}
session, err := rpcVTGate.Begin(context.Background())
session, err := rpcVTGate.Begin(context.Background(), false)
if !session.InTransaction {
t.Errorf("want true, got false")
}
@ -105,7 +246,7 @@ func TestVTGateExecute(t *testing.T) {
t.Errorf("want 1, got %d", commitCount)
}
session, err = rpcVTGate.Begin(context.Background())
session, err = rpcVTGate.Begin(context.Background(), false)
rpcVTGate.Execute(context.Background(),
"select id from t1",
nil,
@ -177,7 +318,7 @@ func TestVTGateExecuteShards(t *testing.T) {
t.Errorf("got ExecuteOptions \n%+v, want \n%+v", sbc.Options[0], executeOptions)
}
session, err := rpcVTGate.Begin(context.Background())
session, err := rpcVTGate.Begin(context.Background(), false)
if !session.InTransaction {
t.Errorf("want true, got false")
}
@ -210,7 +351,7 @@ func TestVTGateExecuteShards(t *testing.T) {
t.Errorf("want 1, got %d", commitCount)
}
session, err = rpcVTGate.Begin(context.Background())
session, err = rpcVTGate.Begin(context.Background(), false)
rpcVTGate.ExecuteShards(context.Background(),
"query",
nil,
@ -257,7 +398,7 @@ func TestVTGateExecuteKeyspaceIds(t *testing.T) {
t.Errorf("got ExecuteOptions \n%+v, want \n%+v", sbc1.Options[0], executeOptions)
}
// Test for successful execution in transaction
session, err := rpcVTGate.Begin(context.Background())
session, err := rpcVTGate.Begin(context.Background(), false)
if !session.InTransaction {
t.Errorf("want true, got false")
}
@ -337,7 +478,7 @@ func TestVTGateExecuteKeyRanges(t *testing.T) {
t.Errorf("got ExecuteOptions \n%+v, want \n%+v", sbc1.Options[0], executeOptions)
}
// Test for successful execution in transaction
session, err := rpcVTGate.Begin(context.Background())
session, err := rpcVTGate.Begin(context.Background(), false)
if !session.InTransaction {
t.Errorf("want true, got false")
}
@ -426,7 +567,7 @@ func TestVTGateExecuteEntityIds(t *testing.T) {
t.Errorf("got ExecuteOptions \n%+v, want \n%+v", sbc1.Options[0], executeOptions)
}
// Test for successful execution in transaction
session, err := rpcVTGate.Begin(context.Background())
session, err := rpcVTGate.Begin(context.Background(), false)
if !session.InTransaction {
t.Errorf("want true, got false")
}
@ -535,7 +676,7 @@ func TestVTGateExecuteBatchShards(t *testing.T) {
t.Errorf("got ExecuteOptions \n%+v, want \n%+v", sbc1.Options[0], executeOptions)
}
session, err := rpcVTGate.Begin(context.Background())
session, err := rpcVTGate.Begin(context.Background(), false)
rpcVTGate.ExecuteBatchShards(context.Background(),
[]*vtgatepb.BoundShardQuery{{
Query: &querypb.BoundQuery{
@ -604,7 +745,7 @@ func TestVTGateExecuteBatchKeyspaceIds(t *testing.T) {
t.Errorf("got ExecuteOptions \n%+v, want \n%+v", sbc1.Options[0], executeOptions)
}
session, err := rpcVTGate.Begin(context.Background())
session, err := rpcVTGate.Begin(context.Background(), false)
rpcVTGate.ExecuteBatchKeyspaceIds(context.Background(),
[]*vtgatepb.BoundKeyspaceIdQuery{{
Query: &querypb.BoundQuery{
@ -1847,7 +1988,7 @@ func TestErrorIssuesRollback(t *testing.T) {
// Start a transaction, send one statement.
// Simulate an error that should trigger a rollback:
// vtrpcpb.ErrorCode_NOT_IN_TX case.
session, err := rpcVTGate.Begin(context.Background())
session, err := rpcVTGate.Begin(context.Background(), false)
if err != nil {
t.Fatalf("cannot start a transaction: %v", err)
}
@ -1886,7 +2027,7 @@ func TestErrorIssuesRollback(t *testing.T) {
// Start a transaction, send one statement.
// Simulate an error that should trigger a rollback:
// vtrpcpb.ErrorCode_RESOURCE_EXHAUSTED case.
session, err = rpcVTGate.Begin(context.Background())
session, err = rpcVTGate.Begin(context.Background(), false)
if err != nil {
t.Fatalf("cannot start a transaction: %v", err)
}
@ -1925,7 +2066,7 @@ func TestErrorIssuesRollback(t *testing.T) {
// Start a transaction, send one statement.
// Simulate an error that should *not* trigger a rollback:
// vtrpcpb.ErrorCode_INTEGRITY_ERROR case.
session, err = rpcVTGate.Begin(context.Background())
session, err = rpcVTGate.Begin(context.Background(), false)
if err != nil {
t.Fatalf("cannot start a transaction: %v", err)
}

Просмотреть файл

@ -23,6 +23,31 @@ var (
VtgateProtocol = flag.String("vtgate_protocol", "grpc", "how to talk to vtgate")
)
// Atomicity specifies atomicity level of a transaction.
type Atomicity int
const (
// AtomicityMulti is the default level. It allows distributed transactions
// with best effort commits. Partial commits are possible.
AtomicityMulti = Atomicity(iota)
// AtomicitySingle prevents a transaction from crossing the boundary of
// a single database.
AtomicitySingle
// Atomicity2PC allows distributed transactions, and performs 2PC commits.
Atomicity2PC
)
// WithAtomicity returns a context with the atomicity level set.
func WithAtomicity(ctx context.Context, level Atomicity) context.Context {
return context.WithValue(ctx, Atomicity(0), level)
}
// AtomicityFromContext returns the atomicity of the context.
func AtomicityFromContext(ctx context.Context) Atomicity {
v, _ := ctx.Value(Atomicity(0)).(Atomicity)
return v
}
// VTGateConn is the client API object to talk to vtgate.
// It is constructed using the Dial method.
// It can be used concurrently across goroutines.
@ -118,14 +143,16 @@ func (conn *VTGateConn) ResolveTransaction(ctx context.Context, dtid string) err
// Begin starts a transaction and returns a VTGateTX.
func (conn *VTGateConn) Begin(ctx context.Context) (*VTGateTx, error) {
session, err := conn.impl.Begin(ctx)
atomicity := AtomicityFromContext(ctx)
session, err := conn.impl.Begin(ctx, atomicity == AtomicitySingle)
if err != nil {
return nil, err
}
return &VTGateTx{
conn: conn,
session: session,
conn: conn,
session: session,
atomicity: atomicity,
}, nil
}
@ -176,8 +203,9 @@ func (conn *VTGateConn) UpdateStream(ctx context.Context, shard string, keyRange
// VTGateTx defines an ongoing transaction.
// It should not be concurrently used across goroutines.
type VTGateTx struct {
conn *VTGateConn
session interface{}
conn *VTGateConn
session interface{}
atomicity Atomicity
}
// Execute executes a query on vtgate within the current transaction.
@ -255,7 +283,7 @@ func (tx *VTGateTx) Commit(ctx context.Context) error {
if tx.session == nil {
return fmt.Errorf("commit: not in transaction")
}
err := tx.conn.impl.Commit(ctx, tx.session)
err := tx.conn.impl.Commit(ctx, tx.session, tx.atomicity == Atomicity2PC)
tx.session = nil
return err
}
@ -311,9 +339,9 @@ type Impl interface {
StreamExecuteKeyspaceIds(ctx context.Context, query string, keyspace string, keyspaceIds [][]byte, bindVars map[string]interface{}, tabletType topodatapb.TabletType, options *querypb.ExecuteOptions) (sqltypes.ResultStream, error)
// Begin starts a transaction and returns a VTGateTX.
Begin(ctx context.Context) (interface{}, error)
Begin(ctx context.Context, singledb bool) (interface{}, error)
// Commit commits the current transaction.
Commit(ctx context.Context, session interface{}) error
Commit(ctx context.Context, session interface{}, twopc bool) error
// Rollback rolls back the current transaction.
Rollback(ctx context.Context, session interface{}) error
// ResolveTransaction resolves the specified 2pc transaction.

Просмотреть файл

@ -36,3 +36,14 @@ func TestGetDialerWithProtocol(t *testing.T) {
t.Errorf("not setting keyspace properly.")
}
}
func TestAtomicity(t *testing.T) {
ctx := context.Background()
if v := AtomicityFromContext(ctx); v != AtomicityMulti {
t.Errorf("Atomicity: %v, want %d", v, AtomicityMulti)
}
ctx = WithAtomicity(ctx, Atomicity2PC)
if v := AtomicityFromContext(ctx); v != Atomicity2PC {
t.Errorf("Atomicity: %v, want %d", v, Atomicity2PC)
}
}

Просмотреть файл

@ -585,7 +585,7 @@ func (f *fakeVTGateService) StreamExecuteKeyRanges(ctx context.Context, sql stri
}
// Begin is part of the VTGateService interface
func (f *fakeVTGateService) Begin(ctx context.Context) (*vtgatepb.Session, error) {
func (f *fakeVTGateService) Begin(ctx context.Context, singledb bool) (*vtgatepb.Session, error) {
f.checkCallerID(ctx, "Begin")
switch {
case f.forceBeginSuccess:

Просмотреть файл

@ -38,7 +38,7 @@ type VTGateService interface {
// Transaction management
Begin(ctx context.Context) (*vtgatepb.Session, error)
Begin(ctx context.Context, singledb bool) (*vtgatepb.Session, error)
Commit(ctx context.Context, twopc bool, session *vtgatepb.Session) error
Rollback(ctx context.Context, session *vtgatepb.Session) error
ResolveTransaction(ctx context.Context, dtid string) error

Просмотреть файл

@ -21,6 +21,10 @@ message Session {
int64 transaction_id = 2;
}
repeated ShardSession shard_sessions = 2;
// single_db specifies if the transaction should be restricted
// to a single database.
bool single_db = 3;
}
// ExecuteRequest is the payload to Execute.
@ -482,6 +486,10 @@ message BeginRequest {
// caller_id identifies the caller. This is the effective caller ID,
// set by the application to further identify the caller.
vtrpc.CallerID caller_id = 1;
// single_db specifies if the transaction should be restricted
// to a single database.
bool single_db = 2;
}
// BeginResponse is the returned value from Begin.

Просмотреть файл

@ -84,10 +84,12 @@ class BaseListCursor(BasePEP0249Cursor):
"""
arraysize = 1
def __init__(self):
def __init__(self, single_db=False, twopc=False):
super(BaseListCursor, self).__init__()
self._clear_list_state()
self.effective_caller_id = None
self.single_db = single_db
self.twopc = twopc
def _clear_list_state(self):
self._clear_common_state()
@ -101,10 +103,12 @@ class BaseListCursor(BasePEP0249Cursor):
self.effective_caller_id = effective_caller_id
def begin(self):
return self.connection.begin(self.effective_caller_id)
return self.connection.begin(
effective_caller_id=self.effective_caller_id,
single_db=self.single_db)
def commit(self):
return self.connection.commit()
return self.connection.commit(self.twopc)
def rollback(self):
return self.connection.rollback()
@ -116,7 +120,6 @@ class BaseListCursor(BasePEP0249Cursor):
def _handle_transaction_sql(self, sql):
sql_check = sql.strip().lower()
if sql_check == 'begin':
self.set_effective_caller_id(self.effective_caller_id)
self.begin()
return True
elif sql_check == 'commit':

Просмотреть файл

@ -7,8 +7,8 @@ from vtdb import base_cursor
class Cursor(base_cursor.BaseListCursor):
def __init__(self, connection, tablet_type):
super(Cursor, self).__init__()
def __init__(self, connection, tablet_type, single_db=False, twopc=False):
super(Cursor, self).__init__(single_db=single_db, twopc=twopc)
self._conn = connection
self.tablet_type = tablet_type

Просмотреть файл

@ -90,17 +90,17 @@ class GRPCVTGateConnection(vtgate_client.VTGateClient,
cursorclass = kwargs.pop('cursorclass', None) or vtgate_cursor.VTGateCursor
return cursorclass(self, *pargs, **kwargs)
def begin(self, effective_caller_id=None):
def begin(self, effective_caller_id=None, single_db=False):
try:
request = self.begin_request(effective_caller_id)
request = self.begin_request(effective_caller_id, single_db)
response = self.stub.Begin(request, self.timeout)
self.update_session(response)
except (grpc.RpcError, vtgate_utils.VitessError) as e:
raise _convert_exception(e, 'Begin')
def commit(self):
def commit(self, twopc=False):
try:
request = self.commit_request()
request = self.commit_request(twopc)
self.stub.Commit(request, self.timeout)
except (grpc.RpcError, vtgate_utils.VitessError) as e:
raise _convert_exception(e, 'Commit')

Просмотреть файл

@ -290,7 +290,7 @@ class Proto3Connection(object):
lastrowid = query_result.insert_id
return results, rowcount, lastrowid, fields
def begin_request(self, effective_caller_id):
def begin_request(self, effective_caller_id, single_db):
"""Builds a vtgate_pb2.BeginRequest object.
Also remembers the effective caller id for next call to
@ -298,25 +298,31 @@ class Proto3Connection(object):
Args:
effective_caller_id: optional vtgate_client.CallerID.
single_db: True if single db transaction is needed.
Returns:
A vtgate_pb2.BeginRequest object.
"""
request = vtgate_pb2.BeginRequest()
request.single_db = single_db
self._add_caller_id(request, effective_caller_id)
self._effective_caller_id = effective_caller_id
return request
def commit_request(self):
def commit_request(self, twopc):
"""Builds a vtgate_pb2.CommitRequest object.
Uses the effective_caller_id saved from begin_request().
It will also clear the saved effective_caller_id.
Args:
twopc: perform 2-phase commit.
Returns:
A vtgate_pb2.CommitRequest object.
"""
request = vtgate_pb2.CommitRequest()
request.atomic = twopc
self._add_caller_id(request, self._effective_caller_id)
self._add_session(request)
self._effective_caller_id = None

Просмотреть файл

@ -124,7 +124,7 @@ class VTGateClient(object):
cursorclass = kwargs.pop('cursorclass', None) or vtgate_cursor.VTGateCursor
return cursorclass(self, *pargs, **kwargs)
def begin(self, effective_caller_id=None):
def begin(self, effective_caller_id=None, single_db=False):
"""Starts a transaction.
FIXME(alainjobart): instead of storing the Session as member variable,
@ -132,6 +132,7 @@ class VTGateClient(object):
Args:
effective_caller_id: CallerID Object.
single_db: True if single db transaction is needed.
Raises:
dbexceptions.TimeoutError: for connection timeout.
@ -146,11 +147,14 @@ class VTGateClient(object):
"""
raise NotImplementedError('Child class needs to implement this')
def commit(self):
def commit(self, twopc=False):
"""Commits the current transaction.
FIXME(alainjobart): should take the session in.
Args:
twopc: perform 2-phase commit.
Raises:
dbexceptions.TimeoutError: for connection timeout.
dbexceptions.TransientError: the server is overloaded, and this query

Просмотреть файл

@ -37,7 +37,8 @@ class VTGateCursor(base_cursor.BaseListCursor, VTGateCursorMixin):
def __init__(
self, connection, tablet_type, keyspace=None,
shards=None, keyspace_ids=None, keyranges=None,
writable=False, as_transaction=False):
writable=False, as_transaction=False, single_db=False,
twopc=False):
"""Init VTGateCursor.
Args:
@ -49,8 +50,10 @@ class VTGateCursor(base_cursor.BaseListCursor, VTGateCursorMixin):
keyranges: Str keyranges.
writable: True if writable.
as_transaction: True if an executemany call is its own transaction.
single_db: True if single db transaction is needed.
twopc: True if 2-phase commit is needed.
"""
super(VTGateCursor, self).__init__()
super(VTGateCursor, self).__init__(single_db=single_db, twopc=twopc)
self._conn = connection
self._writable = writable
self.description = None

Различия файлов скрыты, потому что одна или несколько строк слишком длинны

Просмотреть файл

@ -21,7 +21,9 @@ class TestEnv(object):
self.tablet_map = {}
def launch(
self, keyspace, shards=None, replica_count=1, rdonly_count=0, ddls=None):
self, keyspace, shards=None,
replica_count=1, rdonly_count=0, ddls=None,
twopc_coordinator_address=None):
"""Launch test environment."""
if replica_count < 1:
@ -57,11 +59,14 @@ class TestEnv(object):
# Start tablets.
for shard in shards:
self._start_tablet(keyspace, shard, 'master', None)
self._start_tablet(
keyspace, shard, 'master', None, twopc_coordinator_address)
for i in xrange(replica_count):
self._start_tablet(keyspace, shard, 'replica', i)
self._start_tablet(
keyspace, shard, 'replica', i, twopc_coordinator_address)
for i in xrange(rdonly_count):
self._start_tablet(keyspace, shard, 'rdonly', i)
self._start_tablet(
keyspace, shard, 'rdonly', i, twopc_coordinator_address)
for t in self.tablets:
t.wait_for_vttablet_state('NOT_SERVING')
@ -110,7 +115,8 @@ class TestEnv(object):
t = self.tablet_map[key]
t.init_tablet(init_tablet_type, keyspace, shard, tablet_index=tablet_index)
def _start_tablet(self, keyspace, shard, tablet_type, index):
def _start_tablet(
self, keyspace, shard, tablet_type, index, twopc_coordinator_address):
"""Start a tablet."""
init_tablet_type = tablet_type
if tablet_type == 'master':
@ -120,7 +126,14 @@ class TestEnv(object):
key = '%s.%s.%s.%s' % (keyspace, shard, tablet_type, index)
t = self.tablet_map[key]
t.create_db('vt_' + keyspace)
extra_args = ['-queryserver-config-schema-reload-time', '1']
if twopc_coordinator_address:
extra_args.extend([
'-twopc_enable',
'-twopc_coordinator_address', twopc_coordinator_address,
'-twopc_abandon_age', '3600',
])
return t.start_vttablet(
wait_for_state=None, init_tablet_type=init_tablet_type,
init_keyspace=keyspace, init_shard=shard,
extra_args=['-queryserver-config-schema-reload-time', '1'])
extra_args=extra_args)

Просмотреть файл

@ -71,6 +71,12 @@ info varchar(128),
primary key (name)
) Engine=InnoDB'''
create_twopc_user = '''create table twopc_user (
user_id bigint,
val varchar(128),
primary key (user_id)
) Engine=InnoDB'''
create_vt_user_seq = '''create table vt_user_seq (
id int,
next_id bigint,
@ -116,6 +122,12 @@ val varchar(128),
primary key(id)
) Engine=InnoDB'''
create_twopc_lookup = '''create table twopc_lookup (
id bigint,
val varchar(128),
primary key (id)
) Engine=InnoDB'''
vschema = {
'user': '''{
"sharded": true,
@ -229,6 +241,14 @@ vschema = {
"name": "unicode_hash"
}
]
},
"twopc_user": {
"column_vindexes": [
{
"column": "user_id",
"name": "user_index"
}
]
}
}
}''',
@ -251,7 +271,8 @@ vschema = {
"column": "id",
"sequence": "vt_main_seq"
}
}
},
"twopc_lookup": {}
}
}''',
}
@ -280,8 +301,10 @@ def setUpModule():
create_join_user,
create_join_user_extra,
create_join_name_info,
create_twopc_user,
],
rdonly_count=1, # to test SplitQuery
twopc_coordinator_address='localhost:15028', # enables 2pc
)
keyspace_env.launch(
'lookup',
@ -292,7 +315,9 @@ def setUpModule():
create_music_user_map,
create_name_user2_map,
create_main,
create_twopc_lookup,
],
twopc_coordinator_address='localhost:15028', # enables 2pc
)
shard_0_master = keyspace_env.tablet_map['user.-80.master']
shard_1_master = keyspace_env.tablet_map['user.80-.master']
@ -300,7 +325,8 @@ def setUpModule():
utils.apply_vschema(vschema)
utils.VtGate().start(
tablets=[shard_0_master, shard_1_master, lookup_master])
tablets=[shard_0_master, shard_1_master, lookup_master],
extra_args=['-transaction_mode', 'twopc'])
utils.vtgate.wait_for_endpoints('user.-80.master', 1)
utils.vtgate.wait_for_endpoints('user.80-.master', 1)
utils.vtgate.wait_for_endpoints('lookup.0.master', 1)
@ -1020,6 +1046,42 @@ class TestVTGateFunctions(unittest.TestCase):
finally:
vtgate_conn.rollback()
def test_transaction_modes(self):
vtgate_conn = get_connection()
cursor = vtgate_conn.cursor(
tablet_type='master', keyspace=None, writable=True, single_db=True)
cursor.begin()
cursor.execute(
'insert into twopc_user (user_id, val) values(1, \'val\')', {})
with self.assertRaisesRegexp(
dbexceptions.DatabaseError, '.*multi-db transaction attempted.*'):
cursor.execute(
'insert into twopc_lookup (id, val) values(1, \'val\')', {})
cursor = vtgate_conn.cursor(
tablet_type='master', keyspace=None, writable=True, twopc=True)
cursor.begin()
cursor.execute(
'insert into twopc_user (user_id, val) values(1, \'val\')', {})
cursor.execute(
'insert into twopc_lookup (id, val) values(1, \'val\')', {})
cursor.commit()
cursor.execute('select user_id, val from twopc_user where user_id = 1', {})
self.assertEqual(cursor.fetchall(), [(1, 'val')])
cursor.execute('select id, val from twopc_lookup where id = 1', {})
self.assertEqual(cursor.fetchall(), [(1, 'val')])
cursor.begin()
cursor.execute('delete from twopc_user where user_id = 1', {})
cursor.execute('delete from twopc_lookup where id = 1', {})
cursor.commit()
cursor.execute('select user_id, val from twopc_user where user_id = 1', {})
self.assertEqual(cursor.fetchall(), [])
cursor.execute('select id, val from twopc_lookup where id = 1', {})
self.assertEqual(cursor.fetchall(), [])
def test_vtclient(self):
"""This test uses vtclient to send and receive various queries.
"""