replica transactions: initial implementation

Signed-off-by: deepthi <deepthi@planetscale.com>
This commit is contained in:
deepthi 2020-05-22 15:42:07 -07:00
Родитель f56e82b95e
Коммит 99faad4a5d
35 изменённых файлов: 415 добавлений и 268 удалений

Просмотреть файл

@ -43,7 +43,7 @@ func TestVtgateProcess(t *testing.T) {
defer conn.Close()
exec(t, conn, "insert into customer(id, email) values(1,'email1')")
_ = exec(t, conn, "begin")
qr := exec(t, conn, "select id, email from customer")
if got, want := fmt.Sprintf("%v", qr.Rows), `[[INT64(1) VARCHAR("email1")]]`; got != want {
t.Errorf("select:\n%v want\n%v", got, want)

Просмотреть файл

@ -36,7 +36,7 @@ import (
"vitess.io/vitess/go/test/endtoend/cluster"
)
func TestVtgateProcess(t *testing.T) {
func TestVtgateHealthCheck(t *testing.T) {
defer cluster.PanicHandler(t)
// Healthcheck interval on tablet is set to 1s, so sleep for 2s
time.Sleep(2 * time.Second)
@ -46,12 +46,8 @@ func TestVtgateProcess(t *testing.T) {
require.Nil(t, err)
defer conn.Close()
exec(t, conn, "insert into customer(id, email) values(1,'email1')")
qr := exec(t, conn, "select id, email from customer")
assert.Equal(t, fmt.Sprintf("%v", qr.Rows), `[[INT64(1) VARCHAR("email1")]]`, "select returned wrong result")
qr = exec(t, conn, "show vitess_tablets")
assert.Equal(t, len(qr.Rows), 3, "wrong number of results from show")
qr := exec(t, conn, "show vitess_tablets")
assert.Equal(t, 3, len(qr.Rows), "wrong number of results from show")
}
func verifyVtgateVariables(t *testing.T, url string) {
@ -78,6 +74,46 @@ func verifyVtgateVariables(t *testing.T, url string) {
assert.True(t, isMasterTabletPresent(healthCheckConnection), "Atleast one master tablet needs to be present")
}
func TestReplicaTransactions(t *testing.T) {
// TODO(deepthi): this test seems to depend on previous test. Fix tearDown so that tests are independent
defer cluster.PanicHandler(t)
// Healthcheck interval on tablet is set to 1s, so sleep for 2s
time.Sleep(2 * time.Second)
ctx := context.Background()
masterConn, err := mysql.Connect(ctx, &vtParams)
require.Nil(t, err)
replicaConn, err := mysql.Connect(ctx, &vtParams)
require.Nil(t, err)
defer masterConn.Close()
defer replicaConn.Close()
// insert a row using master
exec(t, masterConn, "insert into customer(id, email) values(1,'email1')")
// select using a replica
_ = exec(t, replicaConn, "use @replica")
qr := exec(t, replicaConn, "select id, email from customer")
// no data expected
assert.Equal(t, `[[INT64(1) VARCHAR("email1")]]`, fmt.Sprintf("%v", qr.Rows), "select returned wrong result")
// begin transaction on replica
_ = exec(t, replicaConn, "begin")
// insert more data on master
exec(t, masterConn, "insert into customer(id, email) values(2,'email2')")
// replica doesn't see new row because it hasn't been committed
qr = exec(t, replicaConn, "select id, email from customer")
assert.Equal(t, `[[INT64(1) VARCHAR("email1")]]`, fmt.Sprintf("%v", qr.Rows), "select returned wrong result")
// commit on master
_ = exec(t, masterConn, "commit")
// replica still doesn't see new row because it is in a transaction
qr = exec(t, replicaConn, "select id, email from customer")
assert.Equal(t, `[[INT64(1) VARCHAR("email1")]]`, fmt.Sprintf("%v", qr.Rows), "select returned wrong result")
// close transaction on replica
//_ = exec(t, replicaConn, "rollback")
// replica should now see new row
//qr = exec(t, replicaConn, "select id, email from customer")
//assert.Equal(t, `[[INT64(2) VARCHAR("email2")],[INT64(3) VARCHAR("email3")]]`, fmt.Sprintf("%v", qr.Rows), "select returned wrong result")
}
func getMapFromJSON(JSON map[string]interface{}, key string) map[string]interface{} {
result := make(map[string]interface{})
object := reflect.ValueOf(JSON[key])

Просмотреть файл

@ -45,6 +45,10 @@ import (
"sync"
"time"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/queryservice"
"vitess.io/vitess/go/flagutil"
"vitess.io/vitess/go/vt/topo"
@ -604,6 +608,19 @@ func (hc *HealthCheck) waitForTablets(ctx context.Context, targets []*query.Targ
}
}
// GetConnection returns the TabletConn of the given tablet.
func (hc *HealthCheck) GetConnection(alias *topodata.TabletAlias) (queryservice.QueryService, error) {
hc.mu.Lock()
th := hc.healthByAlias[topoproto.TabletAliasString(alias)]
hc.mu.Unlock()
if th == nil {
return nil, vterrors.New(vtrpc.Code_NOT_FOUND, fmt.Sprintf("No TabletHealth available for alias: %v", alias))
}
th.mu.Lock()
defer th.mu.Unlock()
return th.Conn, nil
}
// Target includes cell which we ignore here
// because tabletStatsCache is intended to be per-cell
func (hc *HealthCheck) keyFromTarget(target *query.Target) keyspaceShardTabletType {

Просмотреть файл

@ -282,11 +282,12 @@ func (m *Session) GetRowCount() int64 {
}
type Session_ShardSession struct {
Target *query.Target `protobuf:"bytes,1,opt,name=target,proto3" json:"target,omitempty"`
TransactionId int64 `protobuf:"varint,2,opt,name=transaction_id,json=transactionId,proto3" json:"transaction_id,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
Target *query.Target `protobuf:"bytes,1,opt,name=target,proto3" json:"target,omitempty"`
TransactionId int64 `protobuf:"varint,2,opt,name=transaction_id,json=transactionId,proto3" json:"transaction_id,omitempty"`
TabletAlias *topodata.TabletAlias `protobuf:"bytes,3,opt,name=tablet_alias,json=tabletAlias,proto3" json:"tablet_alias,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Session_ShardSession) Reset() { *m = Session_ShardSession{} }
@ -328,6 +329,13 @@ func (m *Session_ShardSession) GetTransactionId() int64 {
return 0
}
func (m *Session_ShardSession) GetTabletAlias() *topodata.TabletAlias {
if m != nil {
return m.TabletAlias
}
return nil
}
// ExecuteRequest is the payload to Execute.
type ExecuteRequest struct {
// caller_id identifies the caller. This is the effective caller ID,
@ -973,75 +981,76 @@ func init() {
func init() { proto.RegisterFile("vtgate.proto", fileDescriptor_aab96496ceaf1ebb) }
var fileDescriptor_aab96496ceaf1ebb = []byte{
// 1117 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x56, 0xdb, 0x6e, 0x1b, 0x37,
0x13, 0xce, 0xea, 0xac, 0xd1, 0x69, 0x7f, 0x46, 0xc9, 0xbf, 0x51, 0xd3, 0x42, 0x50, 0x12, 0x44,
0x71, 0x0b, 0xa9, 0x50, 0xd1, 0xa2, 0x28, 0x5a, 0x14, 0xb6, 0xac, 0x04, 0x2a, 0x6c, 0xcb, 0xa5,
0x64, 0x1b, 0x28, 0x52, 0x2c, 0xd6, 0x5a, 0x5a, 0x26, 0x22, 0x2f, 0x37, 0x24, 0x25, 0x57, 0x4f,
0xd1, 0xab, 0xde, 0xf4, 0x05, 0xfa, 0x2e, 0xbd, 0xeb, 0x1b, 0x15, 0x24, 0x57, 0xd2, 0x5a, 0x75,
0x1b, 0xc7, 0x81, 0x6f, 0x16, 0xe4, 0xcc, 0x70, 0x38, 0xf3, 0x7d, 0x33, 0xc3, 0x85, 0xe2, 0x5c,
0x4e, 0x3c, 0x49, 0x5a, 0x21, 0x67, 0x92, 0xa1, 0x8c, 0xd9, 0xd5, 0xec, 0x53, 0x1a, 0x4c, 0xd9,
0xc4, 0xf7, 0xa4, 0x67, 0x34, 0xb5, 0xc2, 0xdb, 0x19, 0xe1, 0x8b, 0x68, 0x53, 0x96, 0x2c, 0x64,
0x71, 0xe5, 0x5c, 0xf2, 0x70, 0x6c, 0x36, 0x8d, 0xdf, 0x72, 0x90, 0x1d, 0x12, 0x21, 0x28, 0x0b,
0xd0, 0x33, 0x28, 0xd3, 0xc0, 0x95, 0xdc, 0x0b, 0x84, 0x37, 0x96, 0x94, 0x05, 0x8e, 0x55, 0xb7,
0x9a, 0x39, 0x5c, 0xa2, 0xc1, 0x68, 0x2d, 0x44, 0x5d, 0x28, 0x8b, 0x73, 0x8f, 0xfb, 0xae, 0x30,
0xe7, 0x84, 0x93, 0xa8, 0x27, 0x9b, 0x85, 0xce, 0xe3, 0x56, 0x14, 0x5d, 0xe4, 0xaf, 0x35, 0x54,
0x56, 0xd1, 0x06, 0x97, 0x44, 0x6c, 0x27, 0xd0, 0x47, 0x90, 0x17, 0x34, 0x98, 0x4c, 0x89, 0xeb,
0x9f, 0x3a, 0x49, 0x7d, 0x4d, 0xce, 0x08, 0x76, 0x4f, 0xd1, 0x27, 0x00, 0xde, 0x4c, 0xb2, 0x31,
0xbb, 0xb8, 0xa0, 0xd2, 0x49, 0x69, 0x6d, 0x4c, 0x82, 0x9e, 0x40, 0x49, 0x7a, 0x7c, 0x42, 0xa4,
0x2b, 0x24, 0xa7, 0xc1, 0xc4, 0x49, 0xd7, 0xad, 0x66, 0x1e, 0x17, 0x8d, 0x70, 0xa8, 0x65, 0xa8,
0x0d, 0x59, 0x16, 0x4a, 0x1d, 0x5f, 0xa6, 0x6e, 0x35, 0x0b, 0x9d, 0x07, 0x2d, 0x83, 0x4a, 0xef,
0x17, 0x32, 0x9e, 0x49, 0x32, 0x30, 0x4a, 0xbc, 0xb4, 0x42, 0x3b, 0x60, 0xc7, 0x72, 0x77, 0x2f,
0x98, 0x4f, 0x9c, 0x6c, 0xdd, 0x6a, 0x96, 0x3b, 0xff, 0x5f, 0x66, 0x16, 0x83, 0x61, 0x9f, 0xf9,
0x04, 0x57, 0xe4, 0x55, 0x01, 0x6a, 0x43, 0xee, 0xd2, 0xe3, 0x01, 0x0d, 0x26, 0xc2, 0xc9, 0x69,
0x54, 0xee, 0x47, 0xb7, 0xfe, 0xa8, 0xbe, 0x27, 0x46, 0x87, 0x57, 0x46, 0xe8, 0x7b, 0x28, 0x86,
0x9c, 0xac, 0xa1, 0xcc, 0xdf, 0x00, 0xca, 0x42, 0xc8, 0xc9, 0x0a, 0xc8, 0x6d, 0x28, 0x85, 0x4c,
0xc8, 0xb5, 0x07, 0xb8, 0x81, 0x87, 0xa2, 0x3a, 0xb2, 0x72, 0xf1, 0x14, 0xca, 0x53, 0x4f, 0x48,
0x97, 0x06, 0x82, 0x70, 0xe9, 0x52, 0xdf, 0x29, 0xd4, 0xad, 0x66, 0x0a, 0x17, 0x95, 0xb4, 0xaf,
0x85, 0x7d, 0x1f, 0x7d, 0x0c, 0x70, 0xc6, 0x66, 0x81, 0xef, 0x72, 0x76, 0x29, 0x9c, 0xa2, 0xb6,
0xc8, 0x6b, 0x09, 0x66, 0x97, 0x02, 0xb9, 0xf0, 0x70, 0x26, 0x08, 0x77, 0x7d, 0x72, 0x46, 0x03,
0xe2, 0xbb, 0x73, 0x8f, 0x53, 0xef, 0x74, 0x4a, 0x84, 0x53, 0xd2, 0x01, 0xbd, 0xd8, 0x0c, 0xe8,
0x48, 0x10, 0xbe, 0x6b, 0x8c, 0x8f, 0x97, 0xb6, 0xbd, 0x40, 0xf2, 0x05, 0xae, 0xce, 0xae, 0x51,
0xa1, 0x01, 0xd8, 0x62, 0x21, 0x24, 0xb9, 0x88, 0xb9, 0x2e, 0x6b, 0xd7, 0x4f, 0xff, 0x91, 0xab,
0xb6, 0xdb, 0xf0, 0x5a, 0x11, 0x57, 0xa5, 0xaa, 0x04, 0x39, 0xbb, 0x74, 0xc7, 0x6c, 0x16, 0x48,
0xa7, 0x52, 0xb7, 0x9a, 0x49, 0x9c, 0xe3, 0xec, 0xb2, 0xab, 0xf6, 0xb5, 0xd7, 0x50, 0x8c, 0x23,
0x86, 0x9e, 0x41, 0xc6, 0x54, 0x97, 0xee, 0x89, 0x42, 0xa7, 0x14, 0xd1, 0x3a, 0xd2, 0x42, 0x1c,
0x29, 0x55, 0x0b, 0xc5, 0x6b, 0x88, 0xfa, 0x4e, 0x42, 0x3b, 0x2e, 0xc5, 0xa4, 0x7d, 0xbf, 0xf6,
0x1a, 0x1e, 0xfd, 0x6b, 0xfa, 0xc8, 0x86, 0xe4, 0x1b, 0xb2, 0xd0, 0xf7, 0xe4, 0xb1, 0x5a, 0xa2,
0x17, 0x90, 0x9e, 0x7b, 0xd3, 0x19, 0xd1, 0xce, 0xd6, 0x25, 0xb5, 0x43, 0x83, 0xd5, 0x59, 0x6c,
0x2c, 0xbe, 0x49, 0x7c, 0x6d, 0xd5, 0x76, 0xa0, 0x7a, 0x1d, 0x02, 0xd7, 0x38, 0xae, 0xc6, 0x1d,
0xe7, 0x63, 0x3e, 0x1a, 0x7f, 0x24, 0xa0, 0x1c, 0x35, 0x0a, 0x26, 0x6f, 0x67, 0x44, 0x48, 0xf4,
0x19, 0xe4, 0xc7, 0xde, 0x74, 0x4a, 0xb8, 0x4a, 0xcb, 0xa0, 0x50, 0x69, 0x99, 0x59, 0xd2, 0xd5,
0xf2, 0xfe, 0x2e, 0xce, 0x19, 0x8b, 0xbe, 0x8f, 0x5e, 0x40, 0x36, 0x2a, 0xc9, 0x28, 0xea, 0xca,
0x06, 0x4b, 0x78, 0xa9, 0x47, 0xcf, 0x21, 0xad, 0x13, 0xd2, 0x73, 0xa0, 0xd0, 0xf9, 0xdf, 0x32,
0x3d, 0x55, 0x5b, 0xba, 0x6d, 0xb0, 0xd1, 0xa3, 0x2f, 0xa1, 0x20, 0x55, 0x3e, 0xd2, 0x95, 0x8b,
0x90, 0xe8, 0xc1, 0x50, 0xee, 0x54, 0x5b, 0xab, 0xf9, 0x36, 0xd2, 0xca, 0xd1, 0x22, 0x24, 0x18,
0xe4, 0x6a, 0xad, 0x48, 0x79, 0x43, 0x16, 0x22, 0xf4, 0xc6, 0xc4, 0xd5, 0x53, 0x48, 0x0f, 0x84,
0x3c, 0x2e, 0x2d, 0xa5, 0x9a, 0xe9, 0xf8, 0xc0, 0xc8, 0xde, 0x64, 0x60, 0xfc, 0x90, 0xca, 0xa5,
0xed, 0x4c, 0xe3, 0x57, 0x0b, 0x2a, 0x2b, 0xa4, 0x44, 0xc8, 0x02, 0xa1, 0x6e, 0x4c, 0x13, 0xce,
0x19, 0xdf, 0x80, 0x09, 0x1f, 0x76, 0x7b, 0x4a, 0x8c, 0x8d, 0xf6, 0x7d, 0x30, 0xda, 0x82, 0x0c,
0x27, 0x62, 0x36, 0x95, 0x11, 0x48, 0x28, 0x3e, 0x56, 0xb0, 0xd6, 0xe0, 0xc8, 0xa2, 0xf1, 0x57,
0x02, 0xee, 0x47, 0x11, 0xed, 0x78, 0x72, 0x7c, 0x7e, 0xe7, 0x04, 0x7e, 0x0a, 0x59, 0x15, 0x0d,
0x25, 0xc2, 0x49, 0xea, 0x8e, 0xbc, 0x86, 0xc2, 0xa5, 0xc5, 0x07, 0x90, 0xe8, 0x89, 0x2b, 0x8f,
0x53, 0xda, 0x3c, 0x4e, 0x9e, 0x88, 0x3f, 0x4e, 0x77, 0xc4, 0x75, 0xe3, 0x77, 0x0b, 0xaa, 0x57,
0x31, 0xbd, 0x33, 0xaa, 0x3f, 0x87, 0xac, 0x21, 0x72, 0x89, 0xe6, 0xc3, 0x28, 0x36, 0x43, 0xf3,
0x09, 0x95, 0xe7, 0xc6, 0xf5, 0xd2, 0x4c, 0x35, 0x6b, 0x75, 0x28, 0x39, 0xf1, 0x2e, 0x3e, 0xa8,
0x65, 0x57, 0x7d, 0x98, 0x78, 0xbf, 0x3e, 0x4c, 0xde, 0xba, 0x0f, 0x53, 0xef, 0xe0, 0x26, 0x7d,
0xa3, 0x87, 0x3b, 0x86, 0x6d, 0xe6, 0xbf, 0xb1, 0x6d, 0x74, 0xe1, 0xc1, 0x06, 0x50, 0x11, 0x8d,
0xeb, 0xfe, 0xb2, 0xde, 0xd9, 0x5f, 0x3f, 0xc3, 0x23, 0x4c, 0x04, 0x9b, 0xce, 0x49, 0xac, 0xf2,
0x6e, 0x07, 0x39, 0x82, 0x94, 0x2f, 0xa3, 0x57, 0x22, 0x8f, 0xf5, 0xba, 0xf1, 0x18, 0x6a, 0xd7,
0xb9, 0x37, 0x81, 0x36, 0xfe, 0xb4, 0xa0, 0x7c, 0x6c, 0x72, 0xb8, 0xdd, 0x95, 0x1b, 0xe4, 0x25,
0x6e, 0x48, 0xde, 0x73, 0x48, 0xcf, 0x27, 0x2a, 0xd4, 0xe5, 0x90, 0x8e, 0xfd, 0x74, 0x1e, 0xbf,
0x92, 0xd4, 0xc7, 0x46, 0xaf, 0x90, 0x3c, 0xa3, 0x53, 0x49, 0xb8, 0x66, 0x57, 0x21, 0x19, 0xb3,
0x7c, 0xa9, 0x35, 0x38, 0xb2, 0x68, 0x7c, 0x07, 0x95, 0x55, 0x2e, 0x6b, 0x22, 0xc8, 0x9c, 0x04,
0x52, 0x38, 0x96, 0x2e, 0xfe, 0x2b, 0xc7, 0x8f, 0x7b, 0x4a, 0x85, 0x23, 0x8b, 0xad, 0x5d, 0xa8,
0x6c, 0xfc, 0x91, 0xa1, 0x0a, 0x14, 0x8e, 0x0e, 0x86, 0x87, 0xbd, 0x6e, 0xff, 0x65, 0xbf, 0xb7,
0x6b, 0xdf, 0x43, 0x00, 0x99, 0x61, 0xff, 0xe0, 0xd5, 0x5e, 0xcf, 0xb6, 0x50, 0x1e, 0xd2, 0xfb,
0x47, 0x7b, 0xa3, 0xbe, 0x9d, 0x50, 0xcb, 0xd1, 0xc9, 0xe0, 0xb0, 0x6b, 0x27, 0xb7, 0xbe, 0x85,
0x42, 0x57, 0xff, 0x57, 0x0e, 0xb8, 0x4f, 0xb8, 0x3a, 0x70, 0x30, 0xc0, 0xfb, 0xdb, 0x7b, 0xf6,
0x3d, 0x94, 0x85, 0xe4, 0x21, 0x56, 0x27, 0x73, 0x90, 0x3a, 0x1c, 0x0c, 0x47, 0x76, 0x02, 0x95,
0x01, 0xb6, 0x8f, 0x46, 0x83, 0xee, 0x60, 0x7f, 0xbf, 0x3f, 0xb2, 0x93, 0x3b, 0x5f, 0x41, 0x85,
0xb2, 0xd6, 0x9c, 0x4a, 0x22, 0x84, 0xf9, 0xa7, 0xfe, 0xe9, 0x49, 0xb4, 0xa3, 0xac, 0x6d, 0x56,
0xed, 0x09, 0x6b, 0xcf, 0x65, 0x5b, 0x6b, 0xdb, 0xa6, 0x34, 0x4f, 0x33, 0x7a, 0xf7, 0xc5, 0xdf,
0x01, 0x00, 0x00, 0xff, 0xff, 0x8d, 0xfe, 0x6c, 0x91, 0xd3, 0x0b, 0x00, 0x00,
// 1133 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x56, 0xdd, 0x6e, 0x1b, 0x45,
0x14, 0xee, 0xfa, 0xdf, 0xc7, 0x7f, 0xcb, 0xd4, 0x2d, 0x5b, 0x53, 0x90, 0xe5, 0xb6, 0xaa, 0x13,
0x90, 0x8d, 0x82, 0x40, 0x15, 0x02, 0xa1, 0xc4, 0x71, 0x2b, 0xa3, 0x24, 0x0e, 0x63, 0x27, 0x91,
0x10, 0x68, 0xb5, 0xf1, 0x4e, 0x9c, 0x51, 0x9d, 0x9d, 0xed, 0xcc, 0xd8, 0xc1, 0x4f, 0xc1, 0x1d,
0x17, 0xbc, 0x00, 0xef, 0xc2, 0x15, 0xbc, 0x11, 0x9a, 0x99, 0xb5, 0xbd, 0x31, 0x81, 0xa6, 0xa9,
0x72, 0xb3, 0x9a, 0x73, 0xbe, 0x33, 0x67, 0xcf, 0x7c, 0xe7, 0x67, 0x06, 0x8a, 0x33, 0x39, 0xf6,
0x24, 0x69, 0x85, 0x9c, 0x49, 0x86, 0x32, 0x46, 0xaa, 0xd9, 0xa7, 0x34, 0x98, 0xb0, 0xb1, 0xef,
0x49, 0xcf, 0x20, 0xb5, 0xc2, 0x9b, 0x29, 0xe1, 0xf3, 0x48, 0x28, 0x4b, 0x16, 0xb2, 0x38, 0x38,
0x93, 0x3c, 0x1c, 0x19, 0xa1, 0xf1, 0x57, 0x0e, 0xb2, 0x03, 0x22, 0x04, 0x65, 0x01, 0x7a, 0x06,
0x65, 0x1a, 0xb8, 0x92, 0x7b, 0x81, 0xf0, 0x46, 0x92, 0xb2, 0xc0, 0xb1, 0xea, 0x56, 0x33, 0x87,
0x4b, 0x34, 0x18, 0xae, 0x94, 0xa8, 0x03, 0x65, 0x71, 0xee, 0x71, 0xdf, 0x15, 0x66, 0x9f, 0x70,
0x12, 0xf5, 0x64, 0xb3, 0xb0, 0xf5, 0xb8, 0x15, 0x45, 0x17, 0xf9, 0x6b, 0x0d, 0x94, 0x55, 0x24,
0xe0, 0x92, 0x88, 0x49, 0x02, 0x7d, 0x04, 0x79, 0x41, 0x83, 0xf1, 0x84, 0xb8, 0xfe, 0xa9, 0x93,
0xd4, 0xbf, 0xc9, 0x19, 0xc5, 0xee, 0x29, 0xfa, 0x04, 0xc0, 0x9b, 0x4a, 0x36, 0x62, 0x17, 0x17,
0x54, 0x3a, 0x29, 0x8d, 0xc6, 0x34, 0xe8, 0x09, 0x94, 0xa4, 0xc7, 0xc7, 0x44, 0xba, 0x42, 0x72,
0x1a, 0x8c, 0x9d, 0x74, 0xdd, 0x6a, 0xe6, 0x71, 0xd1, 0x28, 0x07, 0x5a, 0x87, 0xda, 0x90, 0x65,
0xa1, 0xd4, 0xf1, 0x65, 0xea, 0x56, 0xb3, 0xb0, 0xf5, 0xa0, 0x65, 0x58, 0xe9, 0xfe, 0x42, 0x46,
0x53, 0x49, 0xfa, 0x06, 0xc4, 0x0b, 0x2b, 0xb4, 0x03, 0x76, 0xec, 0xec, 0xee, 0x05, 0xf3, 0x89,
0x93, 0xad, 0x5b, 0xcd, 0xf2, 0xd6, 0x87, 0x8b, 0x93, 0xc5, 0x68, 0xd8, 0x67, 0x3e, 0xc1, 0x15,
0x79, 0x55, 0x81, 0xda, 0x90, 0xbb, 0xf4, 0x78, 0x40, 0x83, 0xb1, 0x70, 0x72, 0x9a, 0x95, 0xfb,
0xd1, 0x5f, 0x7f, 0x50, 0xdf, 0x13, 0x83, 0xe1, 0xa5, 0x11, 0xfa, 0x0e, 0x8a, 0x21, 0x27, 0x2b,
0x2a, 0xf3, 0x37, 0xa0, 0xb2, 0x10, 0x72, 0xb2, 0x24, 0x72, 0x1b, 0x4a, 0x21, 0x13, 0x72, 0xe5,
0x01, 0x6e, 0xe0, 0xa1, 0xa8, 0xb6, 0x2c, 0x5d, 0x3c, 0x85, 0xf2, 0xc4, 0x13, 0xd2, 0xa5, 0x81,
0x20, 0x5c, 0xba, 0xd4, 0x77, 0x0a, 0x75, 0xab, 0x99, 0xc2, 0x45, 0xa5, 0xed, 0x69, 0x65, 0xcf,
0x47, 0x1f, 0x03, 0x9c, 0xb1, 0x69, 0xe0, 0xbb, 0x9c, 0x5d, 0x0a, 0xa7, 0xa8, 0x2d, 0xf2, 0x5a,
0x83, 0xd9, 0xa5, 0x40, 0x2e, 0x3c, 0x9c, 0x0a, 0xc2, 0x5d, 0x9f, 0x9c, 0xd1, 0x80, 0xf8, 0xee,
0xcc, 0xe3, 0xd4, 0x3b, 0x9d, 0x10, 0xe1, 0x94, 0x74, 0x40, 0x1b, 0xeb, 0x01, 0x1d, 0x09, 0xc2,
0x77, 0x8d, 0xf1, 0xf1, 0xc2, 0xb6, 0x1b, 0x48, 0x3e, 0xc7, 0xd5, 0xe9, 0x35, 0x10, 0xea, 0x83,
0x2d, 0xe6, 0x42, 0x92, 0x8b, 0x98, 0xeb, 0xb2, 0x76, 0xfd, 0xf4, 0x5f, 0x67, 0xd5, 0x76, 0x6b,
0x5e, 0x2b, 0xe2, 0xaa, 0x56, 0x95, 0x20, 0x67, 0x97, 0xee, 0x88, 0x4d, 0x03, 0xe9, 0x54, 0xea,
0x56, 0x33, 0x89, 0x73, 0x9c, 0x5d, 0x76, 0x94, 0x5c, 0xfb, 0xcd, 0x82, 0x62, 0x9c, 0x32, 0xf4,
0x0c, 0x32, 0xa6, 0xbc, 0x74, 0x53, 0x14, 0xb6, 0x4a, 0x51, 0x5e, 0x87, 0x5a, 0x89, 0x23, 0x50,
0xf5, 0x50, 0xbc, 0x88, 0xa8, 0xef, 0x24, 0xb4, 0xe7, 0x52, 0x4c, 0xdb, 0xf3, 0xd1, 0x0b, 0x28,
0x4a, 0x15, 0x85, 0x74, 0xbd, 0x09, 0xf5, 0x84, 0xee, 0x00, 0x55, 0xa1, 0xcb, 0x56, 0x1d, 0x6a,
0x74, 0x5b, 0x81, 0xb8, 0x20, 0x57, 0x42, 0xed, 0x27, 0x78, 0xf4, 0x9f, 0xcc, 0x21, 0x1b, 0x92,
0xaf, 0xc9, 0x5c, 0x47, 0x98, 0xc7, 0x6a, 0x89, 0x36, 0x20, 0x3d, 0xf3, 0x26, 0x53, 0xa2, 0xc3,
0x58, 0x55, 0xe3, 0x0e, 0x0d, 0x96, 0x7b, 0xb1, 0xb1, 0xf8, 0x3a, 0xf1, 0xc2, 0xaa, 0xed, 0x40,
0xf5, 0x3a, 0xf2, 0xae, 0x71, 0x5c, 0x8d, 0x3b, 0xce, 0xc7, 0x7c, 0x34, 0xfe, 0x48, 0x40, 0x39,
0xea, 0x31, 0x4c, 0xde, 0x4c, 0x89, 0x90, 0xe8, 0x33, 0xc8, 0x8f, 0xbc, 0xc9, 0x84, 0x70, 0x45,
0x88, 0xe1, 0xaf, 0xd2, 0x32, 0x63, 0xa8, 0xa3, 0xf5, 0xbd, 0x5d, 0x9c, 0x33, 0x16, 0x3d, 0x1f,
0x6d, 0x40, 0x36, 0xaa, 0xe6, 0x28, 0xea, 0xca, 0x5a, 0x82, 0xf1, 0x02, 0x47, 0xcf, 0x21, 0xad,
0x0f, 0x14, 0x11, 0xf8, 0xc1, 0xe2, 0x78, 0xaa, 0x2c, 0x75, 0xc7, 0x61, 0x83, 0xa3, 0x2f, 0x21,
0x62, 0xd1, 0x95, 0xf3, 0x90, 0xe8, 0x99, 0x52, 0xde, 0xaa, 0xae, 0xf3, 0x3d, 0x9c, 0x87, 0x04,
0x83, 0x5c, 0xae, 0x55, 0x3a, 0x5f, 0x93, 0xb9, 0x08, 0xbd, 0x11, 0x71, 0xf5, 0x00, 0xd3, 0xb3,
0x24, 0x8f, 0x4b, 0x0b, 0xad, 0xae, 0x91, 0xf8, 0xac, 0xc9, 0xde, 0x64, 0xd6, 0x7c, 0x9f, 0xca,
0xa5, 0xed, 0x4c, 0xe3, 0x57, 0x0b, 0x2a, 0x4b, 0xa6, 0x44, 0xc8, 0x02, 0xa1, 0xfe, 0x98, 0x26,
0x9c, 0x33, 0xbe, 0x46, 0x13, 0x3e, 0xec, 0x74, 0x95, 0x1a, 0x1b, 0xf4, 0x5d, 0x38, 0xda, 0x84,
0x0c, 0x27, 0x62, 0x3a, 0x91, 0x11, 0x49, 0x28, 0x3e, 0x91, 0xb0, 0x46, 0x70, 0x64, 0xd1, 0xf8,
0x3b, 0x01, 0xf7, 0xa3, 0x88, 0x76, 0x3c, 0x39, 0x3a, 0xbf, 0xf3, 0x04, 0x7e, 0x0a, 0x59, 0x15,
0x0d, 0x25, 0xaa, 0x07, 0x92, 0xd7, 0xa7, 0x70, 0x61, 0xf1, 0x1e, 0x49, 0xf4, 0xc4, 0x95, 0x7b,
0x2d, 0x6d, 0xee, 0x35, 0x4f, 0xc4, 0xef, 0xb5, 0x3b, 0xca, 0x75, 0xe3, 0x77, 0x0b, 0xaa, 0x57,
0x39, 0xbd, 0xb3, 0x54, 0x7f, 0x0e, 0x59, 0x93, 0xc8, 0x05, 0x9b, 0x0f, 0xa3, 0xd8, 0x4c, 0x9a,
0x4f, 0xa8, 0x3c, 0x37, 0xae, 0x17, 0x66, 0xaa, 0x59, 0xab, 0x03, 0xc9, 0x89, 0x77, 0xf1, 0x5e,
0x2d, 0xbb, 0xec, 0xc3, 0xc4, 0xbb, 0xf5, 0x61, 0xf2, 0xd6, 0x7d, 0x98, 0x7a, 0x4b, 0x6e, 0xd2,
0x37, 0xba, 0xf3, 0x63, 0xdc, 0x66, 0xfe, 0x9f, 0xdb, 0x46, 0x07, 0x1e, 0xac, 0x11, 0x15, 0xa5,
0x71, 0xd5, 0x5f, 0xd6, 0x5b, 0xfb, 0xeb, 0x67, 0x78, 0x84, 0x89, 0x60, 0x93, 0x19, 0x89, 0x55,
0xde, 0xed, 0x28, 0x47, 0x90, 0xf2, 0x65, 0x74, 0xbf, 0xe4, 0xb1, 0x5e, 0x37, 0x1e, 0x43, 0xed,
0x3a, 0xf7, 0x26, 0xd0, 0xc6, 0x9f, 0x16, 0x94, 0x8f, 0xcd, 0x19, 0x6e, 0xf7, 0xcb, 0xb5, 0xe4,
0x25, 0x6e, 0x98, 0xbc, 0xe7, 0x90, 0x9e, 0x8d, 0x55, 0xa8, 0x8b, 0x21, 0x1d, 0x7b, 0xaf, 0x1e,
0xbf, 0x92, 0xd4, 0xc7, 0x06, 0x57, 0x4c, 0x9e, 0xd1, 0x89, 0x24, 0x5c, 0x67, 0x57, 0x31, 0x19,
0xb3, 0x7c, 0xa9, 0x11, 0x1c, 0x59, 0x34, 0xbe, 0x85, 0xca, 0xf2, 0x2c, 0xab, 0x44, 0x90, 0x19,
0x09, 0xa4, 0x70, 0x2c, 0x5d, 0xfc, 0x57, 0xb6, 0x1f, 0x77, 0x15, 0x84, 0x23, 0x8b, 0xcd, 0x5d,
0xa8, 0xac, 0x3d, 0xe6, 0x50, 0x05, 0x0a, 0x47, 0x07, 0x83, 0xc3, 0x6e, 0xa7, 0xf7, 0xb2, 0xd7,
0xdd, 0xb5, 0xef, 0x21, 0x80, 0xcc, 0xa0, 0x77, 0xf0, 0x6a, 0xaf, 0x6b, 0x5b, 0x28, 0x0f, 0xe9,
0xfd, 0xa3, 0xbd, 0x61, 0xcf, 0x4e, 0xa8, 0xe5, 0xf0, 0xa4, 0x7f, 0xd8, 0xb1, 0x93, 0x9b, 0xdf,
0x40, 0xa1, 0xa3, 0x9f, 0xa4, 0x7d, 0xee, 0x13, 0xae, 0x36, 0x1c, 0xf4, 0xf1, 0xfe, 0xf6, 0x9e,
0x7d, 0x0f, 0x65, 0x21, 0x79, 0x88, 0xd5, 0xce, 0x1c, 0xa4, 0x0e, 0xfb, 0x83, 0xa1, 0x9d, 0x40,
0x65, 0x80, 0xed, 0xa3, 0x61, 0xbf, 0xd3, 0xdf, 0xdf, 0xef, 0x0d, 0xed, 0xe4, 0xce, 0x57, 0x50,
0xa1, 0xac, 0x35, 0xa3, 0x92, 0x08, 0x61, 0x9e, 0xe3, 0x3f, 0x3e, 0x89, 0x24, 0xca, 0xda, 0x66,
0xd5, 0x1e, 0xb3, 0xf6, 0x4c, 0xb6, 0x35, 0xda, 0x36, 0xa5, 0x79, 0x9a, 0xd1, 0xd2, 0x17, 0xff,
0x04, 0x00, 0x00, 0xff, 0xff, 0x07, 0xaa, 0x95, 0x79, 0x0e, 0x0c, 0x00, 0x00,
}

Просмотреть файл

@ -31,6 +31,16 @@ import (
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)
// A Gateway is the query processing module for each shard,
// which is used by ScatterConn.
type Gateway interface {
// the query service that this Gateway wraps around
queryservice.QueryService
// QueryServiceByAlias returns a QueryService
QueryServiceByAlias(alias *topodatapb.TabletAlias) (queryservice.QueryService, error)
}
// A Resolver can resolve keyspace ids and key ranges into ResolvedShard*
// objects. It uses an underlying srvtopo.Server to find the topology,
// and a TargetStats object to find the healthy destinations.
@ -38,8 +48,8 @@ type Resolver struct {
// topoServ is the srvtopo.Server to use for topo queries.
topoServ Server
// queryService is the actual query service that will be used to execute queries.
queryService queryservice.QueryService
// gateway
gateway Gateway
// localCell is the local cell for the queries.
localCell string
@ -50,11 +60,11 @@ type Resolver struct {
}
// NewResolver creates a new Resolver.
func NewResolver(topoServ Server, queryService queryservice.QueryService, localCell string) *Resolver {
func NewResolver(topoServ Server, gateway Gateway, localCell string) *Resolver {
return &Resolver{
topoServ: topoServ,
queryService: queryService,
localCell: localCell,
topoServ: topoServ,
gateway: gateway,
localCell: localCell,
}
}
@ -64,7 +74,7 @@ type ResolvedShard struct {
Target *querypb.Target
// QueryService is the actual way to execute the query.
QueryService queryservice.QueryService
Gateway Gateway
}
// ResolvedShardEqual is an equality check on *ResolvedShard.
@ -135,8 +145,8 @@ func (r *Resolver) GetAllShards(ctx context.Context, keyspace string, tabletType
// We would then need to read the SrvKeyspace there too.
target.Cell = ""
res[i] = &ResolvedShard{
Target: target,
QueryService: r.queryService,
Target: target,
Gateway: r.gateway,
}
}
return res, srvKeyspace, nil
@ -194,8 +204,8 @@ func (r *Resolver) ResolveDestinations(ctx context.Context, keyspace string, tab
target.Cell = ""
s = len(result)
result = append(result, &ResolvedShard{
Target: target,
QueryService: r.queryService,
Target: target,
Gateway: r.gateway,
})
if ids != nil {
values = append(values, nil)

Просмотреть файл

@ -327,12 +327,12 @@ func (itc *internalTabletConn) StreamExecute(ctx context.Context, target *queryp
}
// Begin is part of queryservice.QueryService
func (itc *internalTabletConn) Begin(ctx context.Context, target *querypb.Target, options *querypb.ExecuteOptions) (int64, error) {
transactionID, err := itc.tablet.qsc.QueryService().Begin(ctx, target, options)
func (itc *internalTabletConn) Begin(ctx context.Context, target *querypb.Target, options *querypb.ExecuteOptions) (int64, *topodatapb.TabletAlias, error) {
transactionID, alias, err := itc.tablet.qsc.QueryService().Begin(ctx, target, options)
if err != nil {
return 0, tabletconn.ErrorFromGRPC(vterrors.ToGRPC(err))
return 0, nil, tabletconn.ErrorFromGRPC(vterrors.ToGRPC(err))
}
return transactionID, nil
return transactionID, alias, nil
}
// Commit is part of queryservice.QueryService
@ -396,23 +396,23 @@ func (itc *internalTabletConn) ReadTransaction(ctx context.Context, target *quer
}
// BeginExecute is part of queryservice.QueryService
func (itc *internalTabletConn) BeginExecute(ctx context.Context, target *querypb.Target, query string, bindVars map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, error) {
transactionID, err := itc.Begin(ctx, target, options)
func (itc *internalTabletConn) BeginExecute(ctx context.Context, target *querypb.Target, query string, bindVars map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, *topodatapb.TabletAlias, error) {
transactionID, alias, err := itc.Begin(ctx, target, options)
if err != nil {
return nil, 0, err
return nil, 0, nil, err
}
result, err := itc.Execute(ctx, target, query, bindVars, transactionID, options)
return result, transactionID, err
return result, transactionID, alias, err
}
// BeginExecuteBatch is part of queryservice.QueryService
func (itc *internalTabletConn) BeginExecuteBatch(ctx context.Context, target *querypb.Target, queries []*querypb.BoundQuery, asTransaction bool, options *querypb.ExecuteOptions) ([]sqltypes.Result, int64, error) {
transactionID, err := itc.Begin(ctx, target, options)
func (itc *internalTabletConn) BeginExecuteBatch(ctx context.Context, target *querypb.Target, queries []*querypb.BoundQuery, asTransaction bool, options *querypb.ExecuteOptions) ([]sqltypes.Result, int64, *topodatapb.TabletAlias, error) {
transactionID, alias, err := itc.Begin(ctx, target, options)
if err != nil {
return nil, 0, err
return nil, 0, nil, err
}
results, err := itc.ExecuteBatch(ctx, target, queries, asTransaction, transactionID, options)
return results, transactionID, err
return results, transactionID, alias, err
}
// MessageStream is part of queryservice.QueryService

Просмотреть файл

@ -276,7 +276,7 @@ func commandVtTabletBegin(ctx context.Context, wr *wrangler.Wrangler, subFlags *
}
defer conn.Close(ctx)
transactionID, err := conn.Begin(ctx, &querypb.Target{
transactionID, _, err := conn.Begin(ctx, &querypb.Target{
Keyspace: tabletInfo.Tablet.Keyspace,
Shard: tabletInfo.Tablet.Shard,
TabletType: tabletInfo.Tablet.Type,

Просмотреть файл

@ -119,7 +119,7 @@ func newTablet(opts *Options, t *topodatapb.Tablet) *explainTablet {
var _ queryservice.QueryService = (*explainTablet)(nil) // compile-time interface check
// Begin is part of the QueryService interface.
func (t *explainTablet) Begin(ctx context.Context, target *querypb.Target, options *querypb.ExecuteOptions) (int64, error) {
func (t *explainTablet) Begin(ctx context.Context, target *querypb.Target, options *querypb.ExecuteOptions) (int64, *topodatapb.TabletAlias, error) {
t.mu.Lock()
t.currentTime = batchTime.Wait()
t.tabletQueries = append(t.tabletQueries, &TabletQuery{
@ -248,7 +248,7 @@ func (t *explainTablet) ExecuteBatch(ctx context.Context, target *querypb.Target
}
// BeginExecute is part of the QueryService interface.
func (t *explainTablet) BeginExecute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, error) {
func (t *explainTablet) BeginExecute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, *topodatapb.TabletAlias, error) {
t.mu.Lock()
t.currentTime = batchTime.Wait()
bindVariables = sqltypes.CopyBindVariables(bindVariables)

Просмотреть файл

@ -414,3 +414,8 @@ func NewShardError(in error, target *querypb.Target, tablet *topodatapb.Tablet)
func (dg *DiscoveryGateway) HealthCheck() *discovery.HealthCheck {
return nil
}
// QueryServiceByAlias satisfies the Gateway interface
func (dg *DiscoveryGateway) QueryServiceByAlias(_ *topodatapb.TabletAlias) (queryservice.QueryService, error) {
return nil, vterrors.New(vtrpcpb.Code_UNIMPLEMENTED, "Unimplemented")
}

Просмотреть файл

@ -71,7 +71,7 @@ func TestDiscoveryGatewayExecuteStream(t *testing.T) {
func TestDiscoveryGatewayBegin(t *testing.T) {
testDiscoveryGatewayGeneric(t, func(dg *DiscoveryGateway, target *querypb.Target) error {
_, err := dg.Begin(context.Background(), target, nil)
_, _, err := dg.Begin(context.Background(), target, nil)
return err
})
}
@ -90,7 +90,7 @@ func TestDiscoveryGatewayRollback(t *testing.T) {
func TestDiscoveryGatewayBeginExecute(t *testing.T) {
testDiscoveryGatewayGeneric(t, func(dg *DiscoveryGateway, target *querypb.Target) error {
_, _, err := dg.BeginExecute(context.Background(), target, "query", nil, nil)
_, _, _, err := dg.BeginExecute(context.Background(), target, "query", nil, nil)
return err
})
}
@ -98,7 +98,7 @@ func TestDiscoveryGatewayBeginExecute(t *testing.T) {
func TestDiscoveryGatewayBeginExecuteBatch(t *testing.T) {
testDiscoveryGatewayGeneric(t, func(dg *DiscoveryGateway, target *querypb.Target) error {
queries := []*querypb.BoundQuery{{Sql: "query", BindVariables: nil}}
_, _, err := dg.BeginExecuteBatch(context.Background(), target, queries, false, nil)
_, _, _, err := dg.BeginExecuteBatch(context.Background(), target, queries, false, nil)
return err
})
}

Просмотреть файл

@ -207,9 +207,10 @@ func (e *Executor) execute(ctx context.Context, safeSession *SafeSession, sql st
return 0, nil, err
}
if safeSession.InTransaction() && destTabletType != topodatapb.TabletType_MASTER {
return 0, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "transactions are supported only for master tablet types, current type: %v", destTabletType)
}
// TODO(deepthi): we should check for the right condition that allows transactions
//if safeSession.InTransaction() && destTabletType != topodatapb.TabletType_MASTER {
// return 0, nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "executor.execute: transactions are supported only for master tablet types, current type: %v", destTabletType)
//}
if bindVars == nil {
bindVars = make(map[string]*querypb.BindVariable)
}
@ -279,9 +280,6 @@ func (e *Executor) destinationExec(ctx context.Context, safeSession *SafeSession
}
func (e *Executor) handleBegin(ctx context.Context, safeSession *SafeSession, destTabletType topodatapb.TabletType, logStats *LogStats) (*sqltypes.Result, error) {
if destTabletType != topodatapb.TabletType_MASTER {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "transactions are supported only for master tablet types, current type: %v", destTabletType)
}
execStart := time.Now()
logStats.PlanTime = execStart.Sub(logStats.StartTime)
err := e.txConn.Begin(ctx, safeSession)
@ -1517,7 +1515,7 @@ func (e *Executor) prepare(ctx context.Context, safeSession *SafeSession, sql st
}
if safeSession.InTransaction() && destTabletType != topodatapb.TabletType_MASTER {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "transactions are supported only for master tablet types, current type: %v", destTabletType)
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "executor.prepare: transactions are supported only for master tablet types, current type: %v", destTabletType)
}
if bindVars == nil {
bindVars = make(map[string]*querypb.BindVariable)

Просмотреть файл

@ -65,6 +65,9 @@ type Gateway interface {
// HealthCheck returns a reference to the healthCheck being used by this gateway
HealthCheck() *discovery.HealthCheck
// TabletByAlias returns a QueryService
QueryServiceByAlias(alias *topodatapb.TabletAlias) (queryservice.QueryService, error)
}
// Creator is the factory method which can create the actual gateway object.

Просмотреть файл

@ -151,7 +151,7 @@ func (session *SafeSession) InTransaction() bool {
}
// Find returns the transactionId, if any, for a session
func (session *SafeSession) Find(keyspace, shard string, tabletType topodatapb.TabletType) int64 {
func (session *SafeSession) Find(keyspace, shard string, tabletType topodatapb.TabletType) (transactionID int64, alias *topodatapb.TabletAlias) {
session.mu.Lock()
defer session.mu.Unlock()
sessions := session.ShardSessions
@ -163,10 +163,10 @@ func (session *SafeSession) Find(keyspace, shard string, tabletType topodatapb.T
}
for _, shardSession := range sessions {
if keyspace == shardSession.Target.Keyspace && tabletType == shardSession.Target.TabletType && shard == shardSession.Target.Shard {
return shardSession.TransactionId
return shardSession.TransactionId, shardSession.TabletAlias
}
}
return 0
return 0, nil
}
// Append adds a new ShardSession

Просмотреть файл

@ -22,6 +22,8 @@ import (
"sync"
"time"
"vitess.io/vitess/go/vt/vttablet/queryservice"
"golang.org/x/net/context"
"vitess.io/vitess/go/sqltypes"
@ -66,7 +68,7 @@ type shardActionFunc func(rs *srvtopo.ResolvedShard, i int) error
// multiGoTransaction is capable of executing multiple
// shardActionTransactionFunc actions in parallel and consolidating
// the results and errors for the caller.
type shardActionTransactionFunc func(rs *srvtopo.ResolvedShard, i int, shouldBegin bool, transactionID int64) (int64, error)
type shardActionTransactionFunc func(rs *srvtopo.ResolvedShard, i int, shouldBegin bool, transactionID int64, alias *topodatapb.TabletAlias) (int64, *topodatapb.TabletAlias, error)
// LegacyNewScatterConn creates a new ScatterConn.
func LegacyNewScatterConn(statsName string, txConn *TxConn, gw Gateway, hc discovery.LegacyHealthCheck) *ScatterConn {
@ -159,7 +161,7 @@ func (stc *ScatterConn) Execute(
tabletType,
session,
notInTransaction,
func(rs *srvtopo.ResolvedShard, i int, shouldBegin bool, transactionID int64) (int64, error) {
func(rs *srvtopo.ResolvedShard, i int, shouldBegin bool, transactionID int64, alias *topodatapb.TabletAlias) (int64, *topodatapb.TabletAlias, error) {
var (
innerqr *sqltypes.Result
err error
@ -169,12 +171,20 @@ func (stc *ScatterConn) Execute(
case autocommit:
innerqr, err = stc.executeAutocommit(ctx, rs, query, bindVars, opts)
case shouldBegin:
innerqr, transactionID, err = rs.QueryService.BeginExecute(ctx, rs.Target, query, bindVars, options)
innerqr, transactionID, alias, err = rs.Gateway.BeginExecute(ctx, rs.Target, query, bindVars, options)
default:
innerqr, err = rs.QueryService.Execute(ctx, rs.Target, query, bindVars, transactionID, options)
var qs queryservice.QueryService
if *GatewayImplementation == GatewayImplementationDiscovery || transactionID == 0 {
qs = rs.Gateway
} else {
qs, err = rs.Gateway.QueryServiceByAlias(alias)
}
if err == nil {
innerqr, err = qs.Execute(ctx, rs.Target, query, bindVars, transactionID, options)
}
}
if err != nil {
return transactionID, err
return transactionID, alias, err
}
mu.Lock()
@ -183,7 +193,7 @@ func (stc *ScatterConn) Execute(
if len(qr.Rows) <= *maxMemoryRows {
qr.AppendResult(innerqr)
}
return transactionID, nil
return transactionID, alias, nil
},
)
@ -221,7 +231,7 @@ func (stc *ScatterConn) ExecuteMultiShard(
tabletType,
session,
notInTransaction,
func(rs *srvtopo.ResolvedShard, i int, shouldBegin bool, transactionID int64) (int64, error) {
func(rs *srvtopo.ResolvedShard, i int, shouldBegin bool, transactionID int64, alias *topodatapb.TabletAlias) (int64, *topodatapb.TabletAlias, error) {
var (
innerqr *sqltypes.Result
err error
@ -235,12 +245,20 @@ func (stc *ScatterConn) ExecuteMultiShard(
case autocommit:
innerqr, err = stc.executeAutocommit(ctx, rs, queries[i].Sql, queries[i].BindVariables, opts)
case shouldBegin:
innerqr, transactionID, err = rs.QueryService.BeginExecute(ctx, rs.Target, queries[i].Sql, queries[i].BindVariables, opts)
innerqr, transactionID, alias, err = rs.Gateway.BeginExecute(ctx, rs.Target, queries[i].Sql, queries[i].BindVariables, opts)
default:
innerqr, err = rs.QueryService.Execute(ctx, rs.Target, queries[i].Sql, queries[i].BindVariables, transactionID, opts)
var qs queryservice.QueryService
if *GatewayImplementation == GatewayImplementationDiscovery || transactionID == 0 {
qs = rs.Gateway
} else {
qs, err = rs.Gateway.QueryServiceByAlias(alias)
}
if err == nil {
innerqr, err = qs.Execute(ctx, rs.Target, queries[i].Sql, queries[i].BindVariables, transactionID, opts)
}
}
if err != nil {
return transactionID, err
return transactionID, alias, err
}
mu.Lock()
@ -249,7 +267,7 @@ func (stc *ScatterConn) ExecuteMultiShard(
if len(qr.Rows) <= *maxMemoryRows {
qr.AppendResult(innerqr)
}
return transactionID, nil
return transactionID, alias, nil
},
)
@ -267,7 +285,7 @@ func (stc *ScatterConn) executeAutocommit(ctx context.Context, rs *srvtopo.Resol
}}
// ExecuteBatch is a stop-gap because it's the only function that can currently do
// single round-trip commit.
qrs, err := rs.QueryService.ExecuteBatch(ctx, rs.Target, queries, true /* asTransaction */, 0, options)
qrs, err := rs.Gateway.ExecuteBatch(ctx, rs.Target, queries, true /* asTransaction */, 0, options)
if err != nil {
return nil, err
}
@ -311,7 +329,7 @@ func (stc *ScatterConn) StreamExecute(
fieldSent := false
allErrors := stc.multiGo(ctx, "StreamExecute", rss, tabletType, func(rs *srvtopo.ResolvedShard, i int) error {
return rs.QueryService.StreamExecute(ctx, rs.Target, query, bindVars, 0, options, func(qr *sqltypes.Result) error {
return rs.Gateway.StreamExecute(ctx, rs.Target, query, bindVars, 0, options, func(qr *sqltypes.Result) error {
return stc.processOneStreamingResult(&mu, &fieldSent, qr, callback)
})
})
@ -337,7 +355,7 @@ func (stc *ScatterConn) StreamExecuteMulti(
fieldSent := false
allErrors := stc.multiGo(ctx, "StreamExecute", rss, tabletType, func(rs *srvtopo.ResolvedShard, i int) error {
return rs.QueryService.StreamExecute(ctx, rs.Target, query, bindVars[i], 0, options, func(qr *sqltypes.Result) error {
return rs.Gateway.StreamExecute(ctx, rs.Target, query, bindVars[i], 0, options, func(qr *sqltypes.Result) error {
return stc.processOneStreamingResult(&mu, &fieldSent, qr, callback)
})
})
@ -395,7 +413,7 @@ func (stc *ScatterConn) MessageStream(ctx context.Context, rss []*srvtopo.Resolv
// an individual stream to end. If we don't succeed on the retries for
// messageStreamGracePeriod, we abort and return an error.
for {
err := rs.QueryService.MessageStream(ctx, rs.Target, name, func(qr *sqltypes.Result) error {
err := rs.Gateway.MessageStream(ctx, rs.Target, name, func(qr *sqltypes.Result) error {
lastErrors.Reset(rs.Target)
return stc.processOneStreamingResult(&mu, &fieldSent, qr, callback)
})
@ -532,12 +550,13 @@ func (stc *ScatterConn) multiGoTransaction(
startTime, statsKey := stc.startAction(name, rs.Target)
defer stc.endAction(startTime, allErrors, statsKey, &err, session)
shouldBegin, transactionID := transactionInfo(rs.Target, session, notInTransaction)
transactionID, err = action(rs, i, shouldBegin, transactionID)
shouldBegin, transactionID, alias := transactionInfo(rs.Target, session, notInTransaction)
transactionID, alias, err = action(rs, i, shouldBegin, transactionID, alias)
if shouldBegin && transactionID != 0 {
if appendErr := session.Append(&vtgatepb.Session_ShardSession{
Target: rs.Target,
TransactionId: transactionID,
TabletAlias: alias,
}, stc.txConn.mode); appendErr != nil {
err = appendErr
}
@ -576,24 +595,24 @@ func transactionInfo(
target *querypb.Target,
session *SafeSession,
notInTransaction bool,
) (shouldBegin bool, transactionID int64) {
) (shouldBegin bool, transactionID int64, alias *topodatapb.TabletAlias) {
if !session.InTransaction() {
return false, 0
return false, 0, nil
}
// No need to protect ourselves from the race condition between
// Find and Append. The higher level functions ensure that no
// duplicate (target) tuples can execute
// this at the same time.
transactionID = session.Find(target.Keyspace, target.Shard, target.TabletType)
transactionID, alias = session.Find(target.Keyspace, target.Shard, target.TabletType)
if transactionID != 0 {
return false, transactionID
return false, transactionID, alias
}
// We are in a transaction at higher level,
// but client requires not to start a transaction for this query.
// If a transaction was started on this conn, we will use it (as above).
if notInTransaction {
return false, 0
return false, 0, nil
}
return true, 0
return true, 0, nil
}

Просмотреть файл

@ -288,14 +288,14 @@ func TestMultiExecs(t *testing.T) {
Keyspace: "TestMultiExecs",
Shard: "0",
},
QueryService: sbc0,
Gateway: sbc0,
},
{
Target: &querypb.Target{
Keyspace: "TestMultiExecs",
Shard: "1",
},
QueryService: sbc1,
Gateway: sbc1,
},
}
queries := []*querypb.BoundQuery{
@ -338,14 +338,14 @@ func TestMultiExecs(t *testing.T) {
Keyspace: "TestMultiExecs",
Shard: "0",
},
QueryService: sbc0,
Gateway: sbc0,
},
{
Target: &querypb.Target{
Keyspace: "TestMultiExecs",
Shard: "1",
},
QueryService: sbc1,
Gateway: sbc1,
},
}
bvs := []map[string]*querypb.BindVariable{
@ -420,6 +420,7 @@ func TestScatterConnQueryNotInTransaction(t *testing.T) {
TabletType: topodatapb.TabletType_REPLICA,
},
TransactionId: 1,
TabletAlias: sbc0.Tablet().Alias,
}},
}
if !proto.Equal(&wantSession, session.Session) {
@ -470,6 +471,7 @@ func TestScatterConnQueryNotInTransaction(t *testing.T) {
TabletType: topodatapb.TabletType_REPLICA,
},
TransactionId: 1,
TabletAlias: sbc0.Tablet().Alias,
}},
}
if !proto.Equal(&wantSession, session.Session) {
@ -520,6 +522,7 @@ func TestScatterConnQueryNotInTransaction(t *testing.T) {
TabletType: topodatapb.TabletType_REPLICA,
},
TransactionId: 1,
TabletAlias: sbc0.Tablet().Alias,
}},
}
if !proto.Equal(&wantSession, session.Session) {

Просмотреть файл

@ -117,6 +117,11 @@ func NewTabletGateway(ctx context.Context, serv srvtopo.Server, localCell string
return gw
}
// QueryServiceByAlias satisfies the Gateway interface
func (gw *TabletGateway) QueryServiceByAlias(alias *topodatapb.TabletAlias) (queryservice.QueryService, error) {
return gw.hc.GetConnection(alias)
}
// RegisterStats registers the stats to export the lag since the last refresh
// and the checksum of the topology
func (gw *TabletGateway) RegisterStats() {

Просмотреть файл

@ -81,6 +81,7 @@ func (txc *TxConn) Commit(ctx context.Context, session *SafeSession) error {
func (txc *TxConn) commitNormal(ctx context.Context, session *SafeSession) error {
if err := txc.runSessions(session.PreSessions, func(s *vtgatepb.Session_ShardSession) error {
defer func() { s.TransactionId = 0 }()
// TODO(deepthi): this Commit needs to happen on the right tablet (that the transactionID belongs to)
return txc.gateway.Commit(ctx, s.Target, s.TransactionId)
}); err != nil {
_ = txc.Rollback(ctx, session)

Просмотреть файл

@ -80,6 +80,7 @@ func TestTxConnCommitSuccess(t *testing.T) {
TabletType: topodatapb.TabletType_MASTER,
},
TransactionId: 1,
TabletAlias: sbc0.Tablet().Alias,
}},
}
if !proto.Equal(session.Session, &wantSession) {
@ -95,6 +96,7 @@ func TestTxConnCommitSuccess(t *testing.T) {
TabletType: topodatapb.TabletType_MASTER,
},
TransactionId: 1,
TabletAlias: sbc0.Tablet().Alias,
}, {
Target: &querypb.Target{
Keyspace: "TestTxConn",
@ -102,6 +104,7 @@ func TestTxConnCommitSuccess(t *testing.T) {
TabletType: topodatapb.TabletType_MASTER,
},
TransactionId: 1,
TabletAlias: sbc0.Tablet().Alias,
}},
}
if !proto.Equal(session.Session, &wantSession) {
@ -263,6 +266,7 @@ func TestTxConnCommitOrderSuccess(t *testing.T) {
TabletType: topodatapb.TabletType_MASTER,
},
TransactionId: 1,
TabletAlias: sbc0.Tablet().Alias,
}},
}
if !proto.Equal(session.Session, &wantSession) {
@ -280,6 +284,7 @@ func TestTxConnCommitOrderSuccess(t *testing.T) {
TabletType: topodatapb.TabletType_MASTER,
},
TransactionId: 2,
TabletAlias: sbc0.Tablet().Alias,
}},
ShardSessions: []*vtgatepb.Session_ShardSession{{
Target: &querypb.Target{
@ -288,6 +293,7 @@ func TestTxConnCommitOrderSuccess(t *testing.T) {
TabletType: topodatapb.TabletType_MASTER,
},
TransactionId: 1,
TabletAlias: sbc0.Tablet().Alias,
}},
}
if !proto.Equal(session.Session, &wantSession) {
@ -305,6 +311,7 @@ func TestTxConnCommitOrderSuccess(t *testing.T) {
TabletType: topodatapb.TabletType_MASTER,
},
TransactionId: 2,
TabletAlias: sbc0.Tablet().Alias,
}},
ShardSessions: []*vtgatepb.Session_ShardSession{{
Target: &querypb.Target{
@ -313,6 +320,7 @@ func TestTxConnCommitOrderSuccess(t *testing.T) {
TabletType: topodatapb.TabletType_MASTER,
},
TransactionId: 1,
TabletAlias: sbc0.Tablet().Alias,
}},
PostSessions: []*vtgatepb.Session_ShardSession{{
Target: &querypb.Target{
@ -321,6 +329,7 @@ func TestTxConnCommitOrderSuccess(t *testing.T) {
TabletType: topodatapb.TabletType_MASTER,
},
TransactionId: 1,
TabletAlias: sbc0.Tablet().Alias,
}},
}
if !proto.Equal(session.Session, &wantSession) {

Просмотреть файл

@ -139,10 +139,11 @@ func newVCursorImpl(ctx context.Context, safeSession *SafeSession, marginComment
return nil, err
}
// TODO(deepthi): is it safe to remove this code block?
// Check for transaction to be only application in master.
if safeSession.InTransaction() && tabletType != topodatapb.TabletType_MASTER {
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "transactions are supported only for master tablet types, current type: %v", tabletType)
}
//if safeSession.InTransaction() && tabletType != topodatapb.TabletType_MASTER {
// return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "transactions are supported only for master tablet types, current type: %v", tabletType)
//}
return &vcursorImpl{
ctx: ctx,

Просмотреть файл

@ -212,7 +212,7 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha
return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "unexpected number or shards: %v", rss)
}
// Safe to access sgtid.Gtid here (because it can't change until streaming begins).
err = rss[0].QueryService.VStream(ctx, rss[0].Target, sgtid.Gtid, vs.filter, func(events []*binlogdatapb.VEvent) error {
err = rss[0].Gateway.VStream(ctx, rss[0].Target, sgtid.Gtid, vs.filter, func(events []*binlogdatapb.VEvent) error {
// We received a valid event. Reset error count.
errCount = 0

Просмотреть файл

@ -85,7 +85,7 @@ func (client *QueryClient) Begin(clientFoundRows bool) error {
if clientFoundRows {
options = &querypb.ExecuteOptions{ClientFoundRows: clientFoundRows}
}
transactionID, err := client.server.Begin(client.ctx, &client.target, options)
transactionID, _, err := client.server.Begin(client.ctx, &client.target, options)
if err != nil {
return err
}
@ -164,7 +164,7 @@ func (client *QueryClient) BeginExecute(query string, bindvars map[string]*query
if client.transactionID != 0 {
return nil, errors.New("already in transaction")
}
qr, transactionID, err := client.server.BeginExecute(
qr, transactionID, _, err := client.server.BeginExecute(
client.ctx,
&client.target,
query,

Просмотреть файл

@ -91,7 +91,8 @@ func (q *query) Begin(ctx context.Context, request *querypb.BeginRequest) (respo
request.EffectiveCallerId,
request.ImmediateCallerId,
)
transactionID, err := q.server.Begin(ctx, request.Target, request.Options)
// ignore returned TabletAlias here, we are not sending it in the grpc response
transactionID, _, err := q.server.Begin(ctx, request.Target, request.Options)
if err != nil {
return nil, vterrors.ToGRPC(err)
}
@ -248,8 +249,8 @@ func (q *query) BeginExecute(ctx context.Context, request *querypb.BeginExecuteR
request.EffectiveCallerId,
request.ImmediateCallerId,
)
result, transactionID, err := q.server.BeginExecute(ctx, request.Target, request.Query.Sql, request.Query.BindVariables, request.Options)
// ignore returned TabletAlias here, we are not sending it in the grpc response
result, transactionID, _, err := q.server.BeginExecute(ctx, request.Target, request.Query.Sql, request.Query.BindVariables, request.Options)
if err != nil {
// if we have a valid transactionID, return the error in-band
if transactionID != 0 {
@ -273,8 +274,8 @@ func (q *query) BeginExecuteBatch(ctx context.Context, request *querypb.BeginExe
request.EffectiveCallerId,
request.ImmediateCallerId,
)
results, transactionID, err := q.server.BeginExecuteBatch(ctx, request.Target, request.Queries, request.AsTransaction, request.Options)
// ignore returned TabletAlias here, we are not sending it in the grpc response
results, transactionID, _, err := q.server.BeginExecuteBatch(ctx, request.Target, request.Queries, request.AsTransaction, request.Options)
if err != nil {
// if we have a valid transactionID, return the error in-band
if transactionID != 0 {

Просмотреть файл

@ -197,11 +197,11 @@ func (conn *gRPCQueryClient) StreamExecute(ctx context.Context, target *querypb.
}
// Begin starts a transaction.
func (conn *gRPCQueryClient) Begin(ctx context.Context, target *querypb.Target, options *querypb.ExecuteOptions) (transactionID int64, err error) {
func (conn *gRPCQueryClient) Begin(ctx context.Context, target *querypb.Target, options *querypb.ExecuteOptions) (transactionID int64, alias *topodatapb.TabletAlias, err error) {
conn.mu.RLock()
defer conn.mu.RUnlock()
if conn.cc == nil {
return 0, tabletconn.ConnClosed
return 0, nil, tabletconn.ConnClosed
}
req := &querypb.BeginRequest{
@ -212,9 +212,9 @@ func (conn *gRPCQueryClient) Begin(ctx context.Context, target *querypb.Target,
}
br, err := conn.c.Begin(ctx, req)
if err != nil {
return 0, tabletconn.ErrorFromGRPC(err)
return 0, nil, tabletconn.ErrorFromGRPC(err)
}
return br.TransactionId, nil
return br.TransactionId, conn.tablet.Alias, nil
}
// Commit commits the ongoing transaction.
@ -436,11 +436,11 @@ func (conn *gRPCQueryClient) ReadTransaction(ctx context.Context, target *queryp
}
// BeginExecute starts a transaction and runs an Execute.
func (conn *gRPCQueryClient) BeginExecute(ctx context.Context, target *querypb.Target, query string, bindVars map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (result *sqltypes.Result, transactionID int64, err error) {
func (conn *gRPCQueryClient) BeginExecute(ctx context.Context, target *querypb.Target, query string, bindVars map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (result *sqltypes.Result, transactionID int64, alias *topodatapb.TabletAlias, err error) {
conn.mu.RLock()
defer conn.mu.RUnlock()
if conn.cc == nil {
return nil, 0, tabletconn.ConnClosed
return nil, 0, nil, tabletconn.ConnClosed
}
req := &querypb.BeginExecuteRequest{
@ -455,20 +455,20 @@ func (conn *gRPCQueryClient) BeginExecute(ctx context.Context, target *querypb.T
}
reply, err := conn.c.BeginExecute(ctx, req)
if err != nil {
return nil, 0, tabletconn.ErrorFromGRPC(err)
return nil, 0, nil, tabletconn.ErrorFromGRPC(err)
}
if reply.Error != nil {
return nil, reply.TransactionId, tabletconn.ErrorFromVTRPC(reply.Error)
return nil, reply.TransactionId, conn.tablet.Alias, tabletconn.ErrorFromVTRPC(reply.Error)
}
return sqltypes.Proto3ToResult(reply.Result), reply.TransactionId, nil
return sqltypes.Proto3ToResult(reply.Result), reply.TransactionId, conn.tablet.Alias, nil
}
// BeginExecuteBatch starts a transaction and runs an ExecuteBatch.
func (conn *gRPCQueryClient) BeginExecuteBatch(ctx context.Context, target *querypb.Target, queries []*querypb.BoundQuery, asTransaction bool, options *querypb.ExecuteOptions) (results []sqltypes.Result, transactionID int64, err error) {
func (conn *gRPCQueryClient) BeginExecuteBatch(ctx context.Context, target *querypb.Target, queries []*querypb.BoundQuery, asTransaction bool, options *querypb.ExecuteOptions) (results []sqltypes.Result, transactionID int64, alias *topodatapb.TabletAlias, err error) {
conn.mu.RLock()
defer conn.mu.RUnlock()
if conn.cc == nil {
return nil, 0, tabletconn.ConnClosed
return nil, 0, nil, tabletconn.ConnClosed
}
req := &querypb.BeginExecuteBatchRequest{
@ -482,12 +482,12 @@ func (conn *gRPCQueryClient) BeginExecuteBatch(ctx context.Context, target *quer
reply, err := conn.c.BeginExecuteBatch(ctx, req)
if err != nil {
return nil, 0, tabletconn.ErrorFromGRPC(err)
return nil, 0, nil, tabletconn.ErrorFromGRPC(err)
}
if reply.Error != nil {
return nil, reply.TransactionId, tabletconn.ErrorFromVTRPC(reply.Error)
return nil, reply.TransactionId, conn.tablet.Alias, tabletconn.ErrorFromVTRPC(reply.Error)
}
return sqltypes.Proto3ToResults(reply.Results), reply.TransactionId, nil
return sqltypes.Proto3ToResults(reply.Results), reply.TransactionId, conn.tablet.Alias, nil
}
// MessageStream streams messages.

Просмотреть файл

@ -56,8 +56,8 @@ func NewStreamHealthQueryService(target querypb.Target) *StreamHealthQueryServic
}
// Begin implemented as a no op
func (q *StreamHealthQueryService) Begin(ctx context.Context, target *querypb.Target, options *querypb.ExecuteOptions) (int64, error) {
return 0, nil
func (q *StreamHealthQueryService) Begin(ctx context.Context, target *querypb.Target, options *querypb.ExecuteOptions) (int64, *topodatapb.TabletAlias, error) {
return 0, nil, nil
}
// Execute implemented as a no op

Просмотреть файл

@ -21,6 +21,8 @@ package queryservice
import (
"io"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"golang.org/x/net/context"
"vitess.io/vitess/go/sqltypes"
@ -41,7 +43,7 @@ type QueryService interface {
// Transaction management
// Begin returns the transaction id to use for further operations
Begin(ctx context.Context, target *querypb.Target, options *querypb.ExecuteOptions) (int64, error)
Begin(ctx context.Context, target *querypb.Target, options *querypb.ExecuteOptions) (int64, *topodatapb.TabletAlias, error)
// Commit commits the current transaction
Commit(ctx context.Context, target *querypb.Target, transactionID int64) error
@ -85,8 +87,8 @@ type QueryService interface {
// Begin part. If err != nil, the transactionID may still be
// non-zero, and needs to be propagated back (like for a DB
// Integrity Error)
BeginExecute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, error)
BeginExecuteBatch(ctx context.Context, target *querypb.Target, queries []*querypb.BoundQuery, asTransaction bool, options *querypb.ExecuteOptions) ([]sqltypes.Result, int64, error)
BeginExecute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, *topodatapb.TabletAlias, error)
BeginExecuteBatch(ctx context.Context, target *querypb.Target, queries []*querypb.BoundQuery, asTransaction bool, options *querypb.ExecuteOptions) ([]sqltypes.Result, int64, *topodatapb.TabletAlias, error)
// Messaging methods.
MessageStream(ctx context.Context, target *querypb.Target, name string, callback func(*sqltypes.Result) error) error

Просмотреть файл

@ -18,6 +18,7 @@ package queryservice
import (
"golang.org/x/net/context"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/vterrors"
@ -83,13 +84,13 @@ type wrappedService struct {
wrapper WrapperFunc
}
func (ws *wrappedService) Begin(ctx context.Context, target *querypb.Target, options *querypb.ExecuteOptions) (transactionID int64, err error) {
func (ws *wrappedService) Begin(ctx context.Context, target *querypb.Target, options *querypb.ExecuteOptions) (transactionID int64, alias *topodatapb.TabletAlias, err error) {
err = ws.wrapper(ctx, target, ws.impl, "Begin", false, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
var innerErr error
transactionID, innerErr = conn.Begin(ctx, target, options)
transactionID, alias, innerErr = conn.Begin(ctx, target, options)
return canRetry(ctx, innerErr), innerErr
})
return transactionID, err
return transactionID, alias, err
}
func (ws *wrappedService) Commit(ctx context.Context, target *querypb.Target, transactionID int64) error {
@ -201,22 +202,22 @@ func (ws *wrappedService) ExecuteBatch(ctx context.Context, target *querypb.Targ
return qrs, err
}
func (ws *wrappedService) BeginExecute(ctx context.Context, target *querypb.Target, query string, bindVars map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (qr *sqltypes.Result, transactionID int64, err error) {
func (ws *wrappedService) BeginExecute(ctx context.Context, target *querypb.Target, query string, bindVars map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (qr *sqltypes.Result, transactionID int64, alias *topodatapb.TabletAlias, err error) {
err = ws.wrapper(ctx, target, ws.impl, "BeginExecute", false, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
var innerErr error
qr, transactionID, innerErr = conn.BeginExecute(ctx, target, query, bindVars, options)
qr, transactionID, alias, innerErr = conn.BeginExecute(ctx, target, query, bindVars, options)
return canRetry(ctx, innerErr), innerErr
})
return qr, transactionID, err
return qr, transactionID, alias, err
}
func (ws *wrappedService) BeginExecuteBatch(ctx context.Context, target *querypb.Target, queries []*querypb.BoundQuery, asTransaction bool, options *querypb.ExecuteOptions) (qrs []sqltypes.Result, transactionID int64, err error) {
func (ws *wrappedService) BeginExecuteBatch(ctx context.Context, target *querypb.Target, queries []*querypb.BoundQuery, asTransaction bool, options *querypb.ExecuteOptions) (qrs []sqltypes.Result, transactionID int64, alias *topodatapb.TabletAlias, err error) {
err = ws.wrapper(ctx, target, ws.impl, "BeginExecuteBatch", false, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
var innerErr error
qrs, transactionID, innerErr = conn.BeginExecuteBatch(ctx, target, queries, asTransaction, options)
qrs, transactionID, alias, innerErr = conn.BeginExecuteBatch(ctx, target, queries, asTransaction, options)
return canRetry(ctx, innerErr), innerErr
})
return qrs, transactionID, err
return qrs, transactionID, alias, err
}
func (ws *wrappedService) MessageStream(ctx context.Context, target *querypb.Target, name string, callback func(*sqltypes.Result) error) error {

Просмотреть файл

@ -177,13 +177,13 @@ func (sbc *SandboxConn) StreamExecute(ctx context.Context, target *querypb.Targe
}
// Begin is part of the QueryService interface.
func (sbc *SandboxConn) Begin(ctx context.Context, target *querypb.Target, options *querypb.ExecuteOptions) (int64, error) {
func (sbc *SandboxConn) Begin(ctx context.Context, target *querypb.Target, options *querypb.ExecuteOptions) (int64, *topodatapb.TabletAlias, error) {
sbc.BeginCount.Add(1)
err := sbc.getError()
if err != nil {
return 0, err
return 0, nil, err
}
return sbc.TransactionID.Add(1), nil
return sbc.TransactionID.Add(1), sbc.tablet.Alias, nil
}
// Commit is part of the QueryService interface.
@ -286,23 +286,23 @@ func (sbc *SandboxConn) ReadTransaction(ctx context.Context, target *querypb.Tar
}
// BeginExecute is part of the QueryService interface.
func (sbc *SandboxConn) BeginExecute(ctx context.Context, target *querypb.Target, query string, bindVars map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, error) {
transactionID, err := sbc.Begin(ctx, target, options)
func (sbc *SandboxConn) BeginExecute(ctx context.Context, target *querypb.Target, query string, bindVars map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, *topodatapb.TabletAlias, error) {
transactionID, alias, err := sbc.Begin(ctx, target, options)
if err != nil {
return nil, 0, err
return nil, 0, nil, err
}
result, err := sbc.Execute(ctx, target, query, bindVars, transactionID, options)
return result, transactionID, err
return result, transactionID, alias, err
}
// BeginExecuteBatch is part of the QueryService interface.
func (sbc *SandboxConn) BeginExecuteBatch(ctx context.Context, target *querypb.Target, queries []*querypb.BoundQuery, asTransaction bool, options *querypb.ExecuteOptions) ([]sqltypes.Result, int64, error) {
transactionID, err := sbc.Begin(ctx, target, options)
func (sbc *SandboxConn) BeginExecuteBatch(ctx context.Context, target *querypb.Target, queries []*querypb.BoundQuery, asTransaction bool, options *querypb.ExecuteOptions) ([]sqltypes.Result, int64, *topodatapb.TabletAlias, error) {
transactionID, alias, err := sbc.Begin(ctx, target, options)
if err != nil {
return nil, 0, err
return nil, 0, nil, err
}
results, err := sbc.ExecuteBatch(ctx, target, queries, asTransaction, transactionID, options)
return results, transactionID, err
return results, transactionID, alias, err
}
// MessageStream is part of the QueryService interface.
@ -375,6 +375,11 @@ func (sbc *SandboxConn) VStreamResults(ctx context.Context, target *querypb.Targ
return fmt.Errorf("not implemented in test")
}
// QueryServiceByAlias is part of the Gateway interface.
func (sbc *SandboxConn) QueryServiceByAlias(_ *topodatapb.TabletAlias) (queryservice.QueryService, error) {
return sbc, nil
}
// HandlePanic is part of the QueryService interface.
func (sbc *SandboxConn) HandlePanic(err *error) {
}

Просмотреть файл

@ -21,6 +21,8 @@ import (
"fmt"
"testing"
"vitess.io/vitess/go/vt/vttablet/queryservice"
"golang.org/x/net/context"
"github.com/golang/protobuf/proto"
@ -126,9 +128,9 @@ func (f *FakeQueryService) checkTargetCallerID(ctx context.Context, name string,
const BeginTransactionID int64 = 9990
// Begin is part of the queryservice.QueryService interface
func (f *FakeQueryService) Begin(ctx context.Context, target *querypb.Target, options *querypb.ExecuteOptions) (int64, error) {
func (f *FakeQueryService) Begin(ctx context.Context, target *querypb.Target, options *querypb.ExecuteOptions) (int64, *topodatapb.TabletAlias, error) {
if f.HasBeginError {
return 0, f.TabletError
return 0, nil, f.TabletError
}
if f.Panics {
panic(fmt.Errorf("test-triggered panic"))
@ -137,7 +139,8 @@ func (f *FakeQueryService) Begin(ctx context.Context, target *querypb.Target, op
if !proto.Equal(options, TestExecuteOptions) {
f.t.Errorf("invalid Execute.ExecuteOptions: got %v expected %v", options, TestExecuteOptions)
}
return BeginTransactionID, nil
// TODO(deepthi): what alias should we actually return here?
return BeginTransactionID, nil, nil
}
// CommitTransactionID is a test transaction id for Commit.
@ -564,25 +567,27 @@ func (f *FakeQueryService) ExecuteBatch(ctx context.Context, target *querypb.Tar
}
// BeginExecute combines Begin and Execute.
func (f *FakeQueryService) BeginExecute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, error) {
transactionID, err := f.Begin(ctx, target, options)
func (f *FakeQueryService) BeginExecute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, *topodatapb.TabletAlias, error) {
transactionID, _, err := f.Begin(ctx, target, options)
if err != nil {
return nil, 0, err
return nil, 0, nil, err
}
// TODO(deepthi): what alias should we actually return here?
result, err := f.Execute(ctx, target, sql, bindVariables, transactionID, options)
return result, transactionID, err
return result, transactionID, nil, err
}
// BeginExecuteBatch combines Begin and ExecuteBatch.
func (f *FakeQueryService) BeginExecuteBatch(ctx context.Context, target *querypb.Target, queries []*querypb.BoundQuery, asTransaction bool, options *querypb.ExecuteOptions) ([]sqltypes.Result, int64, error) {
transactionID, err := f.Begin(ctx, target, options)
func (f *FakeQueryService) BeginExecuteBatch(ctx context.Context, target *querypb.Target, queries []*querypb.BoundQuery, asTransaction bool, options *querypb.ExecuteOptions) ([]sqltypes.Result, int64, *topodatapb.TabletAlias, error) {
transactionID, _, err := f.Begin(ctx, target, options)
if err != nil {
return nil, 0, err
return nil, 0, nil, err
}
// TODO(deepthi): what alias should we actually return here?
results, err := f.ExecuteBatch(ctx, target, queries, asTransaction, transactionID, options)
return results, transactionID, err
return results, transactionID, nil, err
}
var (
@ -698,6 +703,11 @@ func (f *FakeQueryService) VStreamResults(ctx context.Context, target *querypb.T
panic("not implemented")
}
// QueryServiceByAlias satisfies the Gateway interface
func (f *FakeQueryService) QueryServiceByAlias(_ *topodatapb.TabletAlias) (queryservice.QueryService, error) {
panic("not implemented")
}
// CreateFakeServer returns the fake server for the tests
func CreateFakeServer(t *testing.T) *FakeQueryService {
return &FakeQueryService{

Просмотреть файл

@ -98,7 +98,8 @@ func testBegin(t *testing.T, conn queryservice.QueryService, f *FakeQueryService
t.Log("testBegin")
ctx := context.Background()
ctx = callerid.NewContext(ctx, TestCallerID, TestVTGateCallerID)
transactionID, err := conn.Begin(ctx, TestTarget, TestExecuteOptions)
// TODO(deepthi): should we test returned alias here?
transactionID, _, err := conn.Begin(ctx, TestTarget, TestExecuteOptions)
if err != nil {
t.Fatalf("Begin failed: %v", err)
}
@ -111,7 +112,7 @@ func testBeginError(t *testing.T, conn queryservice.QueryService, f *FakeQuerySe
t.Log("testBeginError")
f.HasBeginError = true
testErrorHelper(t, f, "Begin", func(ctx context.Context) error {
_, err := conn.Begin(ctx, TestTarget, nil)
_, _, err := conn.Begin(ctx, TestTarget, nil)
return err
})
f.HasBeginError = false
@ -120,7 +121,7 @@ func testBeginError(t *testing.T, conn queryservice.QueryService, f *FakeQuerySe
func testBeginPanics(t *testing.T, conn queryservice.QueryService, f *FakeQueryService) {
t.Log("testBeginPanics")
testPanicHelper(t, f, "Begin", func(ctx context.Context) error {
_, err := conn.Begin(ctx, TestTarget, nil)
_, _, err := conn.Begin(ctx, TestTarget, nil)
return err
})
}
@ -427,7 +428,7 @@ func testBeginExecute(t *testing.T, conn queryservice.QueryService, f *FakeQuery
f.ExpectedTransactionID = BeginTransactionID
ctx := context.Background()
ctx = callerid.NewContext(ctx, TestCallerID, TestVTGateCallerID)
qr, transactionID, err := conn.BeginExecute(ctx, TestTarget, ExecuteQuery, ExecuteBindVars, TestExecuteOptions)
qr, transactionID, _, err := conn.BeginExecute(ctx, TestTarget, ExecuteQuery, ExecuteBindVars, TestExecuteOptions)
if err != nil {
t.Fatalf("BeginExecute failed: %v", err)
}
@ -443,7 +444,7 @@ func testBeginExecuteErrorInBegin(t *testing.T, conn queryservice.QueryService,
t.Log("testBeginExecuteErrorInBegin")
f.HasBeginError = true
testErrorHelper(t, f, "BeginExecute.Begin", func(ctx context.Context) error {
_, transactionID, err := conn.BeginExecute(ctx, TestTarget, ExecuteQuery, ExecuteBindVars, TestExecuteOptions)
_, transactionID, _, err := conn.BeginExecute(ctx, TestTarget, ExecuteQuery, ExecuteBindVars, TestExecuteOptions)
if transactionID != 0 {
t.Errorf("Unexpected transactionID from BeginExecute: got %v wanted 0", transactionID)
}
@ -457,7 +458,7 @@ func testBeginExecuteErrorInExecute(t *testing.T, conn queryservice.QueryService
f.HasError = true
testErrorHelper(t, f, "BeginExecute.Execute", func(ctx context.Context) error {
ctx = callerid.NewContext(ctx, TestCallerID, TestVTGateCallerID)
_, transactionID, err := conn.BeginExecute(ctx, TestTarget, ExecuteQuery, ExecuteBindVars, TestExecuteOptions)
_, transactionID, _, err := conn.BeginExecute(ctx, TestTarget, ExecuteQuery, ExecuteBindVars, TestExecuteOptions)
if transactionID != BeginTransactionID {
t.Errorf("Unexpected transactionID from BeginExecute: got %v wanted %v", transactionID, BeginTransactionID)
}
@ -469,7 +470,7 @@ func testBeginExecuteErrorInExecute(t *testing.T, conn queryservice.QueryService
func testBeginExecutePanics(t *testing.T, conn queryservice.QueryService, f *FakeQueryService) {
t.Log("testBeginExecutePanics")
testPanicHelper(t, f, "BeginExecute", func(ctx context.Context) error {
_, _, err := conn.BeginExecute(ctx, TestTarget, ExecuteQuery, ExecuteBindVars, TestExecuteOptions)
_, _, _, err := conn.BeginExecute(ctx, TestTarget, ExecuteQuery, ExecuteBindVars, TestExecuteOptions)
return err
})
}
@ -611,7 +612,7 @@ func testBeginExecuteBatch(t *testing.T, conn queryservice.QueryService, f *Fake
f.ExpectedTransactionID = BeginTransactionID
ctx := context.Background()
ctx = callerid.NewContext(ctx, TestCallerID, TestVTGateCallerID)
qrl, transactionID, err := conn.BeginExecuteBatch(ctx, TestTarget, ExecuteBatchQueries, true, TestExecuteOptions)
qrl, transactionID, _, err := conn.BeginExecuteBatch(ctx, TestTarget, ExecuteBatchQueries, true, TestExecuteOptions)
if err != nil {
t.Fatalf("BeginExecuteBatch failed: %v", err)
}
@ -627,7 +628,7 @@ func testBeginExecuteBatchErrorInBegin(t *testing.T, conn queryservice.QueryServ
t.Log("testBeginExecuteBatchErrorInBegin")
f.HasBeginError = true
testErrorHelper(t, f, "BeginExecuteBatch.Begin", func(ctx context.Context) error {
_, transactionID, err := conn.BeginExecuteBatch(ctx, TestTarget, ExecuteBatchQueries, true, TestExecuteOptions)
_, transactionID, _, err := conn.BeginExecuteBatch(ctx, TestTarget, ExecuteBatchQueries, true, TestExecuteOptions)
if transactionID != 0 {
t.Errorf("Unexpected transactionID from BeginExecuteBatch: got %v wanted 0", transactionID)
}
@ -641,7 +642,7 @@ func testBeginExecuteBatchErrorInExecuteBatch(t *testing.T, conn queryservice.Qu
f.HasError = true
testErrorHelper(t, f, "BeginExecute.ExecuteBatch", func(ctx context.Context) error {
ctx = callerid.NewContext(ctx, TestCallerID, TestVTGateCallerID)
_, transactionID, err := conn.BeginExecuteBatch(ctx, TestTarget, ExecuteBatchQueries, true, TestExecuteOptions)
_, transactionID, _, err := conn.BeginExecuteBatch(ctx, TestTarget, ExecuteBatchQueries, true, TestExecuteOptions)
if transactionID != BeginTransactionID {
t.Errorf("Unexpected transactionID from BeginExecuteBatch: got %v wanted %v", transactionID, BeginTransactionID)
}
@ -653,7 +654,7 @@ func testBeginExecuteBatchErrorInExecuteBatch(t *testing.T, conn queryservice.Qu
func testBeginExecuteBatchPanics(t *testing.T, conn queryservice.QueryService, f *FakeQueryService) {
t.Log("testBeginExecuteBatchPanics")
testPanicHelper(t, f, "BeginExecuteBatch", func(ctx context.Context) error {
_, _, err := conn.BeginExecuteBatch(ctx, TestTarget, ExecuteBatchQueries, true, TestExecuteOptions)
_, _, _, err := conn.BeginExecuteBatch(ctx, TestTarget, ExecuteBatchQueries, true, TestExecuteOptions)
return err
})
}

Просмотреть файл

@ -229,8 +229,10 @@ func TestQueryExecutorPlans(t *testing.T) {
assert.Equal(t, tcase.logWant, qre.logStats.RewrittenSQL(), tcase.input)
// Test inside a transaction.
txid, err := tsv.Begin(ctx, &tsv.target, nil)
txid, alias, err := tsv.Begin(ctx, &tsv.target, nil)
require.NoError(t, err)
require.NotNil(t, alias, "alias should not be nil")
assert.Equal(t, tsv.alias, *alias, "Wrong alias returned by Begin")
defer tsv.Commit(ctx, &tsv.target, txid)
qre = newTestQueryExecutor(ctx, tsv, tcase.input, txid)
@ -293,8 +295,10 @@ func TestQueryExecutorSelectImpossible(t *testing.T) {
assert.Equal(t, tcase.resultWant, got, tcase.input)
assert.Equal(t, tcase.planWant, qre.logStats.PlanType, tcase.input)
assert.Equal(t, tcase.logWant, qre.logStats.RewrittenSQL(), tcase.input)
txid, err := tsv.Begin(ctx, &tsv.target, nil)
txid, alias, err := tsv.Begin(ctx, &tsv.target, nil)
require.NoError(t, err)
require.NotNil(t, alias, "alias should not be nil")
assert.Equal(t, tsv.alias, *alias, "Wrong tablet alias from Begin")
defer tsv.Commit(ctx, &tsv.target, txid)
qre = newTestQueryExecutor(ctx, tsv, tcase.input, txid)
@ -399,8 +403,10 @@ func TestQueryExecutorLimitFailure(t *testing.T) {
assert.Equal(t, tcase.logWant, qre.logStats.RewrittenSQL(), tcase.input)
// Test inside a transaction.
txid, err := tsv.Begin(ctx, &tsv.target, nil)
txid, alias, err := tsv.Begin(ctx, &tsv.target, nil)
require.NoError(t, err)
require.NotNil(t, alias, "alias should not be nil")
assert.Equal(t, tsv.alias, *alias, "Wrong tablet alias from Begin")
defer tsv.Commit(ctx, &tsv.target, txid)
qre = newTestQueryExecutor(ctx, tsv, tcase.input, txid)
@ -1074,7 +1080,7 @@ func newTestTabletServer(ctx context.Context, flags executorFlags, db *fakesqldb
}
func newTransaction(tsv *TabletServer, options *querypb.ExecuteOptions) int64 {
transactionID, err := tsv.Begin(context.Background(), &tsv.target, options)
transactionID, _, err := tsv.Begin(context.Background(), &tsv.target, options)
if err != nil {
panic(vterrors.Wrap(err, "failed to start a transaction"))
}

Просмотреть файл

@ -736,7 +736,7 @@ func (tsv *TabletServer) SchemaEngine() *schema.Engine {
}
// Begin starts a new transaction. This is allowed only if the state is StateServing.
func (tsv *TabletServer) Begin(ctx context.Context, target *querypb.Target, options *querypb.ExecuteOptions) (transactionID int64, err error) {
func (tsv *TabletServer) Begin(ctx context.Context, target *querypb.Target, options *querypb.ExecuteOptions) (transactionID int64, tablet *topodatapb.TabletAlias, err error) {
err = tsv.execRequest(
ctx, tsv.QueryTimeout.Get(),
"Begin", "begin", nil,
@ -763,7 +763,7 @@ func (tsv *TabletServer) Begin(ctx context.Context, target *querypb.Target, opti
return err
},
)
return transactionID, err
return transactionID, &tsv.alias, err
}
// Commit commits the specified transaction.
@ -1082,7 +1082,7 @@ func (tsv *TabletServer) ExecuteBatch(ctx context.Context, target *querypb.Targe
}
if asTransaction {
transactionID, err = tsv.Begin(ctx, target, options)
transactionID, _, err = tsv.Begin(ctx, target, options)
if err != nil {
return nil, err
}
@ -1113,24 +1113,24 @@ func (tsv *TabletServer) ExecuteBatch(ctx context.Context, target *querypb.Targe
}
// BeginExecute combines Begin and Execute.
func (tsv *TabletServer) BeginExecute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, error) {
func (tsv *TabletServer) BeginExecute(ctx context.Context, target *querypb.Target, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (*sqltypes.Result, int64, *topodatapb.TabletAlias, error) {
if tsv.enableHotRowProtection {
txDone, err := tsv.beginWaitForSameRangeTransactions(ctx, target, options, sql, bindVariables)
if err != nil {
return nil, 0, err
return nil, 0, nil, err
}
if txDone != nil {
defer txDone()
}
}
transactionID, err := tsv.Begin(ctx, target, options)
transactionID, alias, err := tsv.Begin(ctx, target, options)
if err != nil {
return nil, 0, err
return nil, 0, nil, err
}
result, err := tsv.Execute(ctx, target, sql, bindVariables, transactionID, options)
return result, transactionID, err
return result, transactionID, alias, err
}
func (tsv *TabletServer) beginWaitForSameRangeTransactions(ctx context.Context, target *querypb.Target, options *querypb.ExecuteOptions, sql string, bindVariables map[string]*querypb.BindVariable) (txserializer.DoneFunc, error) {
@ -1212,16 +1212,16 @@ func (tsv *TabletServer) computeTxSerializerKey(ctx context.Context, logStats *t
}
// BeginExecuteBatch combines Begin and ExecuteBatch.
func (tsv *TabletServer) BeginExecuteBatch(ctx context.Context, target *querypb.Target, queries []*querypb.BoundQuery, asTransaction bool, options *querypb.ExecuteOptions) ([]sqltypes.Result, int64, error) {
func (tsv *TabletServer) BeginExecuteBatch(ctx context.Context, target *querypb.Target, queries []*querypb.BoundQuery, asTransaction bool, options *querypb.ExecuteOptions) ([]sqltypes.Result, int64, *topodatapb.TabletAlias, error) {
// TODO(mberlin): Integrate hot row protection here as we did for BeginExecute()
// and ExecuteBatch(asTransaction=true).
transactionID, err := tsv.Begin(ctx, target, options)
transactionID, alias, err := tsv.Begin(ctx, target, options)
if err != nil {
return nil, 0, err
return nil, 0, nil, err
}
results, err := tsv.ExecuteBatch(ctx, target, queries, asTransaction, transactionID, options)
return results, transactionID, err
return results, transactionID, alias, err
}
// MessageStream streams messages from the requested table.
@ -1292,7 +1292,7 @@ func (tsv *TabletServer) execDML(ctx context.Context, target *querypb.Target, qu
return 0, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "%v", err)
}
transactionID, err := tsv.Begin(ctx, target, nil)
transactionID, _, err := tsv.Begin(ctx, target, nil)
if err != nil {
return 0, err
}

Просмотреть файл

@ -30,6 +30,7 @@ import (
"time"
"github.com/golang/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/net/context"
@ -484,12 +485,13 @@ func TestBeginOnReplica(t *testing.T) {
options := querypb.ExecuteOptions{
TransactionIsolation: querypb.ExecuteOptions_CONSISTENT_SNAPSHOT_READ_ONLY,
}
txID, err := tsv.Begin(ctx, &target1, &options)
txID, alias, err := tsv.Begin(ctx, &target1, &options)
if err != nil {
t.Errorf("err: %v, failed to create read only tx on replica", err)
}
require.NotNil(t, alias, "alias should not be nil")
assert.Equal(t, tsv.alias, *alias, "Wrong tablet alias from Begin")
err = tsv.Rollback(ctx, &target1, txID)
if err != nil {
t.Errorf("err: %v, failed to rollback read only tx", err)
@ -499,7 +501,7 @@ func TestBeginOnReplica(t *testing.T) {
options = querypb.ExecuteOptions{
TransactionIsolation: querypb.ExecuteOptions_DEFAULT,
}
_, err = tsv.Begin(ctx, &target1, &options)
_, _, err = tsv.Begin(ctx, &target1, &options)
if err == nil {
t.Error("expected write tx to be refused")
@ -512,7 +514,7 @@ func TestTabletServerMasterToReplica(t *testing.T) {
defer db.Close()
ctx := context.Background()
target := querypb.Target{TabletType: topodatapb.TabletType_MASTER}
txid1, err := tsv.Begin(ctx, &target, nil)
txid1, _, err := tsv.Begin(ctx, &target, nil)
require.NoError(t, err)
if _, err := tsv.Execute(ctx, &target, "update test_table set name = 2 where pk = 1", nil, txid1, nil); err != nil {
t.Error(err)
@ -520,7 +522,7 @@ func TestTabletServerMasterToReplica(t *testing.T) {
if err = tsv.Prepare(ctx, &target, txid1, "aa"); err != nil {
t.Error(err)
}
txid2, err := tsv.Begin(ctx, &target, nil)
txid2, _, err := tsv.Begin(ctx, &target, nil)
require.NoError(t, err)
// This makes txid2 busy
conn2, err := tsv.te.txPool.Get(txid2, "for query")
@ -805,6 +807,7 @@ func TestTabletServerReadTransaction(t *testing.T) {
got, err = tsv.ReadTransaction(ctx, &target, "aa")
require.NoError(t, err)
if !proto.Equal(got, want) {
t.Errorf("ReadTransaction: %v, want %v", got, want)
}
}
@ -838,7 +841,7 @@ func TestTabletServerBeginFail(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond)
defer cancel()
tsv.Begin(ctx, &target, nil)
_, err = tsv.Begin(ctx, &target, nil)
_, _, err = tsv.Begin(ctx, &target, nil)
want := "transaction pool aborting request due to already expired context"
if err == nil || err.Error() != want {
t.Fatalf("Begin err: %v, want %v", err, want)
@ -869,7 +872,7 @@ func TestTabletServerCommitTransaction(t *testing.T) {
}
defer tsv.StopService()
ctx := context.Background()
transactionID, err := tsv.Begin(ctx, &target, nil)
transactionID, _, err := tsv.Begin(ctx, &target, nil)
if err != nil {
t.Fatalf("call TabletServer.Begin failed: %v", err)
}
@ -929,7 +932,7 @@ func TestTabletServerRollback(t *testing.T) {
}
defer tsv.StopService()
ctx := context.Background()
transactionID, err := tsv.Begin(ctx, &target, nil)
transactionID, _, err := tsv.Begin(ctx, &target, nil)
if err != nil {
t.Fatalf("call TabletServer.Begin failed: %v", err)
}
@ -948,7 +951,7 @@ func TestTabletServerPrepare(t *testing.T) {
defer tsv.StopService()
ctx := context.Background()
target := querypb.Target{TabletType: topodatapb.TabletType_MASTER}
transactionID, err := tsv.Begin(ctx, &target, nil)
transactionID, _, err := tsv.Begin(ctx, &target, nil)
if err != nil {
t.Fatal(err)
}
@ -968,7 +971,7 @@ func TestTabletServerCommitPrepared(t *testing.T) {
defer tsv.StopService()
ctx := context.Background()
target := querypb.Target{TabletType: topodatapb.TabletType_MASTER}
transactionID, err := tsv.Begin(ctx, &target, nil)
transactionID, _, err := tsv.Begin(ctx, &target, nil)
if err != nil {
t.Fatal(err)
}
@ -991,7 +994,7 @@ func TestTabletServerRollbackPrepared(t *testing.T) {
defer tsv.StopService()
ctx := context.Background()
target := querypb.Target{TabletType: topodatapb.TabletType_MASTER}
transactionID, err := tsv.Begin(ctx, &target, nil)
transactionID, _, err := tsv.Begin(ctx, &target, nil)
if err != nil {
t.Fatal(err)
}
@ -1391,7 +1394,7 @@ func TestSerializeTransactionsSameRow(t *testing.T) {
go func() {
defer wg.Done()
_, tx1, err := tsv.BeginExecute(ctx, &target, q1, bvTx1, nil)
_, tx1, _, err := tsv.BeginExecute(ctx, &target, q1, bvTx1, nil)
if err != nil {
t.Errorf("failed to execute query: %s: %s", q1, err)
}
@ -1406,7 +1409,7 @@ func TestSerializeTransactionsSameRow(t *testing.T) {
defer wg.Done()
<-tx1Started
_, tx2, err := tsv.BeginExecute(ctx, &target, q2, bvTx2, nil)
_, tx2, _, err := tsv.BeginExecute(ctx, &target, q2, bvTx2, nil)
if err != nil {
t.Errorf("failed to execute query: %s: %s", q2, err)
}
@ -1426,7 +1429,7 @@ func TestSerializeTransactionsSameRow(t *testing.T) {
defer wg.Done()
<-tx1Started
_, tx3, err := tsv.BeginExecute(ctx, &target, q3, bvTx3, nil)
_, tx3, _, err := tsv.BeginExecute(ctx, &target, q3, bvTx3, nil)
if err != nil {
t.Errorf("failed to execute query: %s: %s", q3, err)
}
@ -1464,7 +1467,7 @@ func TestDMLQueryWithoutWhereClause(t *testing.T) {
db.AddQuery(q+" limit 10001", &sqltypes.Result{})
ctx := context.Background()
_, txid, err := tsv.BeginExecute(ctx, &target, q, nil, nil)
_, txid, _, err := tsv.BeginExecute(ctx, &target, q, nil, nil)
require.NoError(t, err)
err = tsv.Commit(ctx, &target, txid)
require.NoError(t, err)
@ -1654,7 +1657,7 @@ func TestSerializeTransactionsSameRow_ConcurrentTransactions(t *testing.T) {
go func() {
defer wg.Done()
_, tx1, err := tsv.BeginExecute(ctx, &target, q1, bvTx1, nil)
_, tx1, _, err := tsv.BeginExecute(ctx, &target, q1, bvTx1, nil)
if err != nil {
t.Errorf("failed to execute query: %s: %s", q1, err)
}
@ -1673,7 +1676,7 @@ func TestSerializeTransactionsSameRow_ConcurrentTransactions(t *testing.T) {
// In that case, we would see less than 3 pending transactions.
<-tx1Started
_, tx2, err := tsv.BeginExecute(ctx, &target, q2, bvTx2, nil)
_, tx2, _, err := tsv.BeginExecute(ctx, &target, q2, bvTx2, nil)
if err != nil {
t.Errorf("failed to execute query: %s: %s", q2, err)
}
@ -1692,7 +1695,7 @@ func TestSerializeTransactionsSameRow_ConcurrentTransactions(t *testing.T) {
// In that case, we would see less than 3 pending transactions.
<-tx1Started
_, tx3, err := tsv.BeginExecute(ctx, &target, q3, bvTx3, nil)
_, tx3, _, err := tsv.BeginExecute(ctx, &target, q3, bvTx3, nil)
if err != nil {
t.Errorf("failed to execute query: %s: %s", q3, err)
}
@ -1792,7 +1795,7 @@ func TestSerializeTransactionsSameRow_TooManyPendingRequests(t *testing.T) {
go func() {
defer wg.Done()
_, tx1, err := tsv.BeginExecute(ctx, &target, q1, bvTx1, nil)
_, tx1, _, err := tsv.BeginExecute(ctx, &target, q1, bvTx1, nil)
if err != nil {
t.Errorf("failed to execute query: %s: %s", q1, err)
}
@ -1808,7 +1811,7 @@ func TestSerializeTransactionsSameRow_TooManyPendingRequests(t *testing.T) {
defer close(tx2Failed)
<-tx1Started
_, _, err := tsv.BeginExecute(ctx, &target, q2, bvTx2, nil)
_, _, _, err := tsv.BeginExecute(ctx, &target, q2, bvTx2, nil)
if err == nil || vterrors.Code(err) != vtrpcpb.Code_RESOURCE_EXHAUSTED || err.Error() != "hot row protection: too many queued transactions (1 >= 1) for the same row (table + WHERE clause: 'test_table where pk = 1 and name = 1')" {
t.Errorf("tx2 should have failed because there are too many pending requests: %v", err)
}
@ -1977,7 +1980,7 @@ func TestSerializeTransactionsSameRow_RequestCanceled(t *testing.T) {
go func() {
defer wg.Done()
_, tx1, err := tsv.BeginExecute(ctx, &target, q1, bvTx1, nil)
_, tx1, _, err := tsv.BeginExecute(ctx, &target, q1, bvTx1, nil)
if err != nil {
t.Errorf("failed to execute query: %s: %s", q1, err)
}
@ -1997,7 +2000,7 @@ func TestSerializeTransactionsSameRow_RequestCanceled(t *testing.T) {
// Wait until tx1 has started to make the test deterministic.
<-tx1Started
_, _, err := tsv.BeginExecute(ctxTx2, &target, q2, bvTx2, nil)
_, _, _, err := tsv.BeginExecute(ctxTx2, &target, q2, bvTx2, nil)
if err == nil || vterrors.Code(err) != vtrpcpb.Code_CANCELED || err.Error() != "context canceled" {
t.Errorf("tx2 should have failed because the context was canceled: %v", err)
}
@ -2014,7 +2017,7 @@ func TestSerializeTransactionsSameRow_RequestCanceled(t *testing.T) {
t.Error(err)
}
_, tx3, err := tsv.BeginExecute(ctx, &target, q3, bvTx3, nil)
_, tx3, _, err := tsv.BeginExecute(ctx, &target, q3, bvTx3, nil)
if err != nil {
t.Errorf("failed to execute query: %s: %s", q3, err)
}

Просмотреть файл

@ -260,11 +260,12 @@ func (te *TxEngine) Begin(ctx context.Context, options *querypb.ExecuteOptions)
return 0, "", vterrors.Errorf(vtrpc.Code_UNAVAILABLE, "tx engine can't accept new transactions in state %v", te.state)
}
isWriteTransaction := options == nil || options.TransactionIsolation != querypb.ExecuteOptions_CONSISTENT_SNAPSHOT_READ_ONLY
if te.state == AcceptingReadOnly && isWriteTransaction {
te.stateLock.Unlock()
return 0, "", vterrors.Errorf(vtrpc.Code_UNAVAILABLE, "tx engine can only accept read-only transactions in current state")
}
// TODO(deepthi): Can this be deleted now that we do allow replica transactions?
//isWriteTransaction := options == nil || options.TransactionIsolation != querypb.ExecuteOptions_CONSISTENT_SNAPSHOT_READ_ONLY
//if te.state == AcceptingReadOnly && isWriteTransaction {
// te.stateLock.Unlock()
// return 0, "", vterrors.Errorf(vtrpc.Code_UNAVAILABLE, "tx engine can only accept read-only transactions in current state")
//}
// By Add() to beginRequests, we block others from initiating state
// changes until we have finished adding this transaction

Просмотреть файл

@ -660,7 +660,7 @@ func createTransactions(ctx context.Context, numberOfScanners int, wr *wrangler.
scanners := make([]int64, numberOfScanners)
for i := 0; i < numberOfScanners; i++ {
tx, err := queryService.Begin(ctx, target, &query.ExecuteOptions{
tx, _, err := queryService.Begin(ctx, target, &query.ExecuteOptions{
// Make sure our tx is not killed by tx sniper
Workload: query.ExecuteOptions_DBA,
TransactionIsolation: query.ExecuteOptions_CONSISTENT_SNAPSHOT_READ_ONLY,

Просмотреть файл

@ -72,6 +72,7 @@ message Session {
message ShardSession {
query.Target target = 1;
int64 transaction_id = 2;
topodata.TabletAlias tablet_alias = 3;
}
// shard_sessions keep track of per-shard transaction info.
repeated ShardSession shard_sessions = 2;