Add tracking session state changes for transaction start (#11061)

* Add tracking session state changes for transaction start

This logic depends on the custom option that PlanetScale as in it's
MySQL fork to get the current committed GTID at the start of a
transaction snapshot.

With this GTID, we can optimize vreplication to avoid having to lock
tables around vreplication operations.

See
9fa67ec3eb
for the change in MySQL that is published in our fork.

The change here allows getting back this value through any query
operation that might start a transaction.

Signed-off-by: Dirkjan Bussink <d.bussink@gmail.com>

* Use 'session_track_gtids = START_GTID' where possible, and avoid LOCK tables while still getting a transaction consistent GTID

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>

Signed-off-by: Dirkjan Bussink <d.bussink@gmail.com>
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
Co-authored-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
This commit is contained in:
Dirkjan Bussink 2022-08-22 13:39:34 +02:00 коммит произвёл GitHub
Родитель 8846ffa490
Коммит ed9bf2427b
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
18 изменённых файлов: 1339 добавлений и 752 удалений

1330
go/vt/proto/query/query.pb.go сгенерированный

Разница между файлами не показана из-за своего большого размера Загрузить разницу

258
go/vt/proto/query/query_vtproto.pb.go сгенерированный
Просмотреть файл

@ -626,6 +626,13 @@ func (m *QueryResult) MarshalToSizedBufferVT(dAtA []byte) (int, error) {
i -= len(m.unknownFields)
copy(dAtA[i:], m.unknownFields)
}
if len(m.SessionStateChanges) > 0 {
i -= len(m.SessionStateChanges)
copy(dAtA[i:], m.SessionStateChanges)
i = encodeVarint(dAtA, i, uint64(len(m.SessionStateChanges)))
i--
dAtA[i] = 0x3a
}
if len(m.Info) > 0 {
i -= len(m.Info)
copy(dAtA[i:], m.Info)
@ -1274,6 +1281,13 @@ func (m *BeginResponse) MarshalToSizedBufferVT(dAtA []byte) (int, error) {
i -= len(m.unknownFields)
copy(dAtA[i:], m.unknownFields)
}
if len(m.SessionStateChanges) > 0 {
i -= len(m.SessionStateChanges)
copy(dAtA[i:], m.SessionStateChanges)
i = encodeVarint(dAtA, i, uint64(len(m.SessionStateChanges)))
i--
dAtA[i] = 0x1a
}
if m.TabletAlias != nil {
size, err := m.TabletAlias.MarshalToSizedBufferVT(dAtA[:i])
if err != nil {
@ -2497,6 +2511,13 @@ func (m *BeginExecuteResponse) MarshalToSizedBufferVT(dAtA []byte) (int, error)
i -= len(m.unknownFields)
copy(dAtA[i:], m.unknownFields)
}
if len(m.SessionStateChanges) > 0 {
i -= len(m.SessionStateChanges)
copy(dAtA[i:], m.SessionStateChanges)
i = encodeVarint(dAtA, i, uint64(len(m.SessionStateChanges)))
i--
dAtA[i] = 0x2a
}
if m.TabletAlias != nil {
size, err := m.TabletAlias.MarshalToSizedBufferVT(dAtA[:i])
if err != nil {
@ -2662,6 +2683,13 @@ func (m *BeginStreamExecuteResponse) MarshalToSizedBufferVT(dAtA []byte) (int, e
i -= len(m.unknownFields)
copy(dAtA[i:], m.unknownFields)
}
if len(m.SessionStateChanges) > 0 {
i -= len(m.SessionStateChanges)
copy(dAtA[i:], m.SessionStateChanges)
i = encodeVarint(dAtA, i, uint64(len(m.SessionStateChanges)))
i--
dAtA[i] = 0x2a
}
if m.TabletAlias != nil {
size, err := m.TabletAlias.MarshalToSizedBufferVT(dAtA[:i])
if err != nil {
@ -3399,6 +3427,13 @@ func (m *ReserveBeginExecuteResponse) MarshalToSizedBufferVT(dAtA []byte) (int,
i -= len(m.unknownFields)
copy(dAtA[i:], m.unknownFields)
}
if len(m.SessionStateChanges) > 0 {
i -= len(m.SessionStateChanges)
copy(dAtA[i:], m.SessionStateChanges)
i = encodeVarint(dAtA, i, uint64(len(m.SessionStateChanges)))
i--
dAtA[i] = 0x32
}
if m.TabletAlias != nil {
size, err := m.TabletAlias.MarshalToSizedBufferVT(dAtA[:i])
if err != nil {
@ -3573,6 +3608,13 @@ func (m *ReserveBeginStreamExecuteResponse) MarshalToSizedBufferVT(dAtA []byte)
i -= len(m.unknownFields)
copy(dAtA[i:], m.unknownFields)
}
if len(m.SessionStateChanges) > 0 {
i -= len(m.SessionStateChanges)
copy(dAtA[i:], m.SessionStateChanges)
i = encodeVarint(dAtA, i, uint64(len(m.SessionStateChanges)))
i--
dAtA[i] = 0x32
}
if m.TabletAlias != nil {
size, err := m.TabletAlias.MarshalToSizedBufferVT(dAtA[:i])
if err != nil {
@ -4341,6 +4383,10 @@ func (m *QueryResult) SizeVT() (n int) {
if l > 0 {
n += 1 + l + sov(uint64(l))
}
l = len(m.SessionStateChanges)
if l > 0 {
n += 1 + l + sov(uint64(l))
}
if m.unknownFields != nil {
n += len(m.unknownFields)
}
@ -4592,6 +4638,10 @@ func (m *BeginResponse) SizeVT() (n int) {
l = m.TabletAlias.SizeVT()
n += 1 + l + sov(uint64(l))
}
l = len(m.SessionStateChanges)
if l > 0 {
n += 1 + l + sov(uint64(l))
}
if m.unknownFields != nil {
n += len(m.unknownFields)
}
@ -5086,6 +5136,10 @@ func (m *BeginExecuteResponse) SizeVT() (n int) {
l = m.TabletAlias.SizeVT()
n += 1 + l + sov(uint64(l))
}
l = len(m.SessionStateChanges)
if l > 0 {
n += 1 + l + sov(uint64(l))
}
if m.unknownFields != nil {
n += len(m.unknownFields)
}
@ -5154,6 +5208,10 @@ func (m *BeginStreamExecuteResponse) SizeVT() (n int) {
l = m.TabletAlias.SizeVT()
n += 1 + l + sov(uint64(l))
}
l = len(m.SessionStateChanges)
if l > 0 {
n += 1 + l + sov(uint64(l))
}
if m.unknownFields != nil {
n += len(m.unknownFields)
}
@ -5458,6 +5516,10 @@ func (m *ReserveBeginExecuteResponse) SizeVT() (n int) {
l = m.TabletAlias.SizeVT()
n += 1 + l + sov(uint64(l))
}
l = len(m.SessionStateChanges)
if l > 0 {
n += 1 + l + sov(uint64(l))
}
if m.unknownFields != nil {
n += len(m.unknownFields)
}
@ -5532,6 +5594,10 @@ func (m *ReserveBeginStreamExecuteResponse) SizeVT() (n int) {
l = m.TabletAlias.SizeVT()
n += 1 + l + sov(uint64(l))
}
l = len(m.SessionStateChanges)
if l > 0 {
n += 1 + l + sov(uint64(l))
}
if m.unknownFields != nil {
n += len(m.unknownFields)
}
@ -7460,6 +7526,38 @@ func (m *QueryResult) UnmarshalVT(dAtA []byte) error {
}
m.Info = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 7:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field SessionStateChanges", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflow
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLength
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLength
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.SessionStateChanges = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skip(dAtA[iNdEx:])
@ -9023,6 +9121,38 @@ func (m *BeginResponse) UnmarshalVT(dAtA []byte) error {
return err
}
iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field SessionStateChanges", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflow
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLength
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLength
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.SessionStateChanges = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skip(dAtA[iNdEx:])
@ -12061,6 +12191,38 @@ func (m *BeginExecuteResponse) UnmarshalVT(dAtA []byte) error {
return err
}
iNdEx = postIndex
case 5:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field SessionStateChanges", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflow
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLength
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLength
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.SessionStateChanges = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skip(dAtA[iNdEx:])
@ -12521,6 +12683,38 @@ func (m *BeginStreamExecuteResponse) UnmarshalVT(dAtA []byte) error {
return err
}
iNdEx = postIndex
case 5:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field SessionStateChanges", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflow
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLength
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLength
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.SessionStateChanges = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skip(dAtA[iNdEx:])
@ -14523,6 +14717,38 @@ func (m *ReserveBeginExecuteResponse) UnmarshalVT(dAtA []byte) error {
return err
}
iNdEx = postIndex
case 6:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field SessionStateChanges", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflow
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLength
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLength
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.SessionStateChanges = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skip(dAtA[iNdEx:])
@ -15015,6 +15241,38 @@ func (m *ReserveBeginStreamExecuteResponse) UnmarshalVT(dAtA []byte) error {
return err
}
iNdEx = postIndex
case 6:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field SessionStateChanges", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflow
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLength
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLength
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.SessionStateChanges = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skip(dAtA[iNdEx:])

Просмотреть файл

@ -36,11 +36,12 @@ import (
// It's not thread safe, but you can create multiple clients that point to the
// same server.
type QueryClient struct {
ctx context.Context
target *querypb.Target
server *tabletserver.TabletServer
transactionID int64
reservedID int64
ctx context.Context
target *querypb.Target
server *tabletserver.TabletServer
transactionID int64
reservedID int64
sessionStateChanges string
}
// NewClient creates a new client for Server.
@ -94,6 +95,7 @@ func (client *QueryClient) Begin(clientFoundRows bool) error {
return err
}
client.transactionID = state.TransactionID
client.sessionStateChanges = state.SessionStateChanges
return nil
}
@ -192,6 +194,7 @@ func (client *QueryClient) BeginExecute(query string, bindvars map[string]*query
&querypb.ExecuteOptions{IncludedFields: querypb.ExecuteOptions_ALL},
)
client.transactionID = state.TransactionID
client.sessionStateChanges = state.SessionStateChanges
if err != nil {
return nil, err
}
@ -259,6 +262,7 @@ func (client *QueryClient) StreamBeginExecuteWithOptions(query string, preQuerie
},
)
client.transactionID = state.TransactionID
client.sessionStateChanges = state.SessionStateChanges
if err != nil {
return nil, err
}
@ -311,6 +315,7 @@ func (client *QueryClient) ReserveBeginExecute(query string, preQueries []string
state, qr, err := client.server.ReserveBeginExecute(client.ctx, client.target, preQueries, postBeginQueries, query, bindvars, &querypb.ExecuteOptions{IncludedFields: querypb.ExecuteOptions_ALL})
client.transactionID = state.TransactionID
client.reservedID = state.ReservedID
client.sessionStateChanges = state.SessionStateChanges
if err != nil {
return nil, err
}

Просмотреть файл

@ -85,8 +85,9 @@ func (q *query) Begin(ctx context.Context, request *querypb.BeginRequest) (respo
}
return &querypb.BeginResponse{
TransactionId: state.TransactionID,
TabletAlias: state.TabletAlias,
TransactionId: state.TransactionID,
TabletAlias: state.TabletAlias,
SessionStateChanges: state.SessionStateChanges,
}, nil
}
@ -252,9 +253,10 @@ func (q *query) BeginExecute(ctx context.Context, request *querypb.BeginExecuteR
return nil, vterrors.ToGRPC(err)
}
return &querypb.BeginExecuteResponse{
Result: sqltypes.ResultToProto3(result),
TransactionId: state.TransactionID,
TabletAlias: state.TabletAlias,
Result: sqltypes.ResultToProto3(result),
TransactionId: state.TransactionID,
TabletAlias: state.TabletAlias,
SessionStateChanges: state.SessionStateChanges,
}, nil
}
@ -271,8 +273,9 @@ func (q *query) BeginStreamExecute(request *querypb.BeginStreamExecuteRequest, s
})
})
errInLastPacket := stream.Send(&querypb.BeginStreamExecuteResponse{
TransactionId: state.TransactionID,
TabletAlias: state.TabletAlias,
TransactionId: state.TransactionID,
TabletAlias: state.TabletAlias,
SessionStateChanges: state.SessionStateChanges,
})
if err != nil {
return vterrors.ToGRPC(err)
@ -419,19 +422,21 @@ func (q *query) ReserveBeginExecute(ctx context.Context, request *querypb.Reserv
// if we have a valid reservedID, return the error in-band
if state.ReservedID != 0 {
return &querypb.ReserveBeginExecuteResponse{
Error: vterrors.ToVTRPC(err),
TransactionId: state.TransactionID,
ReservedId: state.ReservedID,
TabletAlias: state.TabletAlias,
Error: vterrors.ToVTRPC(err),
TransactionId: state.TransactionID,
ReservedId: state.ReservedID,
TabletAlias: state.TabletAlias,
SessionStateChanges: state.SessionStateChanges,
}, nil
}
return nil, vterrors.ToGRPC(err)
}
return &querypb.ReserveBeginExecuteResponse{
Result: sqltypes.ResultToProto3(result),
TransactionId: state.TransactionID,
ReservedId: state.ReservedID,
TabletAlias: state.TabletAlias,
Result: sqltypes.ResultToProto3(result),
TransactionId: state.TransactionID,
ReservedId: state.ReservedID,
TabletAlias: state.TabletAlias,
SessionStateChanges: state.SessionStateChanges,
}, nil
}
@ -448,9 +453,10 @@ func (q *query) ReserveBeginStreamExecute(request *querypb.ReserveBeginStreamExe
})
})
errInLastPacket := stream.Send(&querypb.ReserveBeginStreamExecuteResponse{
ReservedId: state.ReservedID,
TransactionId: state.TransactionID,
TabletAlias: state.TabletAlias,
ReservedId: state.ReservedID,
TransactionId: state.TransactionID,
TabletAlias: state.TabletAlias,
SessionStateChanges: state.SessionStateChanges,
})
if err != nil {
return vterrors.ToGRPC(err)

Просмотреть файл

@ -198,6 +198,7 @@ func (conn *gRPCQueryClient) Begin(ctx context.Context, target *querypb.Target,
}
state.TransactionID = br.TransactionId
state.TabletAlias = br.TabletAlias
state.SessionStateChanges = br.SessionStateChanges
return state, nil
}
@ -445,6 +446,7 @@ func (conn *gRPCQueryClient) BeginExecute(ctx context.Context, target *querypb.T
}
state.TransactionID = reply.TransactionId
state.TabletAlias = conn.tablet.Alias
state.SessionStateChanges = reply.SessionStateChanges
if reply.Error != nil {
return state, nil, tabletconn.ErrorFromVTRPC(reply.Error)
}
@ -496,6 +498,9 @@ func (conn *gRPCQueryClient) BeginStreamExecute(ctx context.Context, target *que
if state.TabletAlias == nil && ser.GetTabletAlias() != nil {
state.TabletAlias = ser.GetTabletAlias()
}
if state.SessionStateChanges == "" && ser.GetSessionStateChanges() != "" {
state.SessionStateChanges = ser.GetSessionStateChanges()
}
if err != nil {
return state, tabletconn.ErrorFromGRPC(err)
@ -779,6 +784,7 @@ func (conn *gRPCQueryClient) ReserveBeginExecute(ctx context.Context, target *qu
state.ReservedID = reply.ReservedId
state.TransactionID = reply.TransactionId
state.TabletAlias = conn.tablet.Alias
state.SessionStateChanges = reply.SessionStateChanges
if reply.Error != nil {
return state, nil, tabletconn.ErrorFromVTRPC(reply.Error)
}
@ -835,6 +841,9 @@ func (conn *gRPCQueryClient) ReserveBeginStreamExecute(ctx context.Context, targ
if state.TabletAlias == nil && ser.GetTabletAlias() != nil {
state.TabletAlias = ser.GetTabletAlias()
}
if state.SessionStateChanges == "" && ser.GetSessionStateChanges() != "" {
state.SessionStateChanges = ser.GetSessionStateChanges()
}
if err != nil {
return state, tabletconn.ErrorFromGRPC(err)

Просмотреть файл

@ -122,8 +122,9 @@ type QueryService interface {
}
type TransactionState struct {
TransactionID int64
TabletAlias *topodatapb.TabletAlias
TransactionID int64
TabletAlias *topodatapb.TabletAlias
SessionStateChanges string
}
type ReservedState struct {
@ -132,7 +133,8 @@ type ReservedState struct {
}
type ReservedTransactionState struct {
ReservedID int64
TransactionID int64
TabletAlias *topodatapb.TabletAlias
ReservedID int64
TransactionID int64
TabletAlias *topodatapb.TabletAlias
SessionStateChanges string
}

Просмотреть файл

@ -187,7 +187,7 @@ func (qre *QueryExecutor) execAutocommit(f func(conn *StatefulConnection) (*sqlt
}
qre.options.TransactionIsolation = querypb.ExecuteOptions_AUTOCOMMIT
conn, _, err := qre.tsv.te.txPool.Begin(qre.ctx, qre.options, false, 0, nil)
conn, _, _, err := qre.tsv.te.txPool.Begin(qre.ctx, qre.options, false, 0, nil)
if err != nil {
return nil, err
@ -198,7 +198,7 @@ func (qre *QueryExecutor) execAutocommit(f func(conn *StatefulConnection) (*sqlt
}
func (qre *QueryExecutor) execAsTransaction(f func(conn *StatefulConnection) (*sqltypes.Result, error)) (*sqltypes.Result, error) {
conn, beginSQL, err := qre.tsv.te.txPool.Begin(qre.ctx, qre.options, false, 0, nil)
conn, beginSQL, _, err := qre.tsv.te.txPool.Begin(qre.ctx, qre.options, false, 0, nil)
if err != nil {
return nil, err
}

Просмотреть файл

@ -17,6 +17,7 @@ limitations under the License.
package tabletserver
import (
"context"
"fmt"
"time"
@ -32,8 +33,6 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/tx"
"context"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
@ -103,14 +102,15 @@ func (sc *StatefulConnection) Exec(ctx context.Context, query string, maxrows in
return r, nil
}
func (sc *StatefulConnection) execWithRetry(ctx context.Context, query string, maxrows int, wantfields bool) error {
func (sc *StatefulConnection) execWithRetry(ctx context.Context, query string, maxrows int, wantfields bool) (string, error) {
if sc.IsClosed() {
return vterrors.New(vtrpcpb.Code_CANCELED, "connection is closed")
return "", vterrors.New(vtrpcpb.Code_CANCELED, "connection is closed")
}
if _, err := sc.dbConn.Exec(ctx, query, maxrows, wantfields); err != nil {
return err
res, err := sc.dbConn.Exec(ctx, query, maxrows, wantfields)
if err != nil {
return "", err
}
return nil
return res.SessionStateChanges, nil
}
// FetchNext returns the next result set.

Просмотреть файл

@ -491,8 +491,9 @@ func (tsv *TabletServer) begin(ctx context.Context, target *querypb.Target, preQ
if tsv.txThrottler.Throttle() {
return vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "Transaction throttled")
}
transactionID, beginSQL, err := tsv.te.Begin(ctx, preQueries, reservedID, options)
transactionID, beginSQL, sessionStateChanges, err := tsv.te.Begin(ctx, preQueries, reservedID, options)
state.TransactionID = transactionID
state.SessionStateChanges = sessionStateChanges
logStats.TransactionID = transactionID
logStats.ReservedID = reservedID
@ -1099,6 +1100,7 @@ func (tsv *TabletServer) VStreamResults(ctx context.Context, target *querypb.Tar
// ReserveBeginExecute implements the QueryService interface
func (tsv *TabletServer) ReserveBeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, postBeginQueries []string, sql string, bindVariables map[string]*querypb.BindVariable, options *querypb.ExecuteOptions) (state queryservice.ReservedTransactionState, result *sqltypes.Result, err error) {
var connID int64
var sessionStateChanges string
state.TabletAlias = tsv.alias
err = tsv.execRequest(
@ -1107,7 +1109,7 @@ func (tsv *TabletServer) ReserveBeginExecute(ctx context.Context, target *queryp
target, options, false, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
defer tsv.stats.QueryTimings.Record("RESERVE", time.Now())
connID, err = tsv.te.ReserveBegin(ctx, options, preQueries, postBeginQueries)
connID, sessionStateChanges, err = tsv.te.ReserveBegin(ctx, options, preQueries, postBeginQueries)
if err != nil {
return err
}
@ -1122,6 +1124,7 @@ func (tsv *TabletServer) ReserveBeginExecute(ctx context.Context, target *queryp
}
state.ReservedID = connID
state.TransactionID = connID
state.SessionStateChanges = sessionStateChanges
result, err = tsv.Execute(ctx, target, sql, bindVariables, state.TransactionID, state.ReservedID, options)
return state, result, err
@ -1139,7 +1142,7 @@ func (tsv *TabletServer) ReserveBeginStreamExecute(
callback func(*sqltypes.Result) error,
) (state queryservice.ReservedTransactionState, err error) {
var connID int64
state.TabletAlias = tsv.alias
var sessionStateChanges string
err = tsv.execRequest(
ctx, tsv.QueryTimeout.Get(),
@ -1147,7 +1150,7 @@ func (tsv *TabletServer) ReserveBeginStreamExecute(
target, options, false, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
defer tsv.stats.QueryTimings.Record("RESERVE", time.Now())
connID, err = tsv.te.ReserveBegin(ctx, options, preQueries, postBeginQueries)
connID, sessionStateChanges, err = tsv.te.ReserveBegin(ctx, options, preQueries, postBeginQueries)
if err != nil {
return err
}
@ -1163,6 +1166,7 @@ func (tsv *TabletServer) ReserveBeginStreamExecute(
state.ReservedID = connID
state.TransactionID = connID
state.TabletAlias = tsv.alias
state.SessionStateChanges = sessionStateChanges
err = tsv.StreamExecute(ctx, target, sql, bindVariables, state.TransactionID, state.ReservedID, options, callback)
return state, err

Просмотреть файл

@ -17,6 +17,7 @@ limitations under the License.
package tabletserver
import (
"context"
"fmt"
"sync"
"time"
@ -25,8 +26,6 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/tx"
"context"
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/concurrency"
@ -224,21 +223,21 @@ func (te *TxEngine) isTxPoolAvailable(addToWaitGroup func(int)) error {
// statement(s) used to execute the begin (if any).
//
// Subsequent statements can access the connection through the transaction id.
func (te *TxEngine) Begin(ctx context.Context, preQueries []string, reservedID int64, options *querypb.ExecuteOptions) (int64, string, error) {
func (te *TxEngine) Begin(ctx context.Context, preQueries []string, reservedID int64, options *querypb.ExecuteOptions) (int64, string, string, error) {
span, ctx := trace.NewSpan(ctx, "TxEngine.Begin")
defer span.Finish()
err := te.isTxPoolAvailable(te.beginRequests.Add)
if err != nil {
return 0, "", err
return 0, "", "", err
}
defer te.beginRequests.Done()
conn, beginSQL, err := te.txPool.Begin(ctx, options, te.state == AcceptingReadOnly, reservedID, preQueries)
conn, beginSQL, sessionStateChanges, err := te.txPool.Begin(ctx, options, te.state == AcceptingReadOnly, reservedID, preQueries)
if err != nil {
return 0, "", err
return 0, "", "", err
}
defer conn.UnlockUpdateTime()
return conn.ReservedID(), beginSQL, err
return conn.ReservedID(), beginSQL, sessionStateChanges, err
}
// Commit commits the specified transaction and renews connection id if one exists.
@ -390,7 +389,7 @@ outer:
if txid > maxid {
maxid = txid
}
conn, _, err := te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
conn, _, _, err := te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
if err != nil {
allErr.RecordError(err)
continue
@ -505,27 +504,27 @@ func (te *TxEngine) stopWatchdog() {
}
// ReserveBegin creates a reserved connection, and in it opens a transaction
func (te *TxEngine) ReserveBegin(ctx context.Context, options *querypb.ExecuteOptions, preQueries []string, postBeginQueries []string) (int64, error) {
func (te *TxEngine) ReserveBegin(ctx context.Context, options *querypb.ExecuteOptions, preQueries []string, postBeginQueries []string) (int64, string, error) {
span, ctx := trace.NewSpan(ctx, "TxEngine.ReserveBegin")
defer span.Finish()
err := te.isTxPoolAvailable(te.beginRequests.Add)
if err != nil {
return 0, err
return 0, "", err
}
defer te.beginRequests.Done()
conn, err := te.reserve(ctx, options, preQueries)
if err != nil {
return 0, err
return 0, "", err
}
defer conn.UnlockUpdateTime()
_, err = te.txPool.begin(ctx, options, te.state == AcceptingReadOnly, conn, postBeginQueries)
_, sessionStateChanges, err := te.txPool.begin(ctx, options, te.state == AcceptingReadOnly, conn, postBeginQueries)
if err != nil {
conn.Close()
conn.Release(tx.ConnInitFail)
return 0, err
return 0, "", err
}
return conn.ReservedID(), nil
return conn.ReservedID(), sessionStateChanges, nil
}
var noop = func(int) {}

Просмотреть файл

@ -17,6 +17,7 @@ limitations under the License.
package tabletserver
import (
"context"
"errors"
"fmt"
"strings"
@ -35,8 +36,6 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"context"
querypb "vitess.io/vitess/go/vt/proto/query"
)
@ -59,11 +58,11 @@ func TestTxEngineClose(t *testing.T) {
// Normal close with timeout wait.
te.AcceptReadWrite()
c, beginSQL, err := te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
c, beginSQL, _, err := te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
require.NoError(t, err)
require.Equal(t, "begin", beginSQL)
c.Unlock()
c, beginSQL, err = te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
c, beginSQL, _, err = te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
require.NoError(t, err)
require.Equal(t, "begin", beginSQL)
c.Unlock()
@ -75,7 +74,7 @@ func TestTxEngineClose(t *testing.T) {
// Immediate close.
te.AcceptReadOnly()
c, _, err = te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
c, _, _, err = te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
if err != nil {
t.Fatal(err)
}
@ -87,7 +86,7 @@ func TestTxEngineClose(t *testing.T) {
// Normal close with short grace period.
te.shutdownGracePeriod = 25 * time.Millisecond
te.AcceptReadWrite()
c, _, err = te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
c, _, _, err = te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
require.NoError(t, err)
c.Unlock()
start = time.Now()
@ -98,7 +97,7 @@ func TestTxEngineClose(t *testing.T) {
// Normal close with short grace period, but pool gets empty early.
te.shutdownGracePeriod = 25 * time.Millisecond
te.AcceptReadWrite()
c, _, err = te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
c, _, _, err = te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
require.NoError(t, err)
c.Unlock()
go func() {
@ -114,7 +113,7 @@ func TestTxEngineClose(t *testing.T) {
// Immediate close, but connection is in use.
te.AcceptReadOnly()
c, _, err = te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
c, _, _, err = te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
require.NoError(t, err)
go func() {
time.Sleep(100 * time.Millisecond)
@ -135,7 +134,7 @@ func TestTxEngineClose(t *testing.T) {
te.AcceptReadWrite()
_, err = te.Reserve(ctx, &querypb.ExecuteOptions{}, 0, nil)
require.NoError(t, err)
_, err = te.ReserveBegin(ctx, &querypb.ExecuteOptions{}, nil, nil)
_, _, err = te.ReserveBegin(ctx, &querypb.ExecuteOptions{}, nil, nil)
require.NoError(t, err)
start = time.Now()
te.Close()
@ -152,17 +151,17 @@ func TestTxEngineBegin(t *testing.T) {
config.DB = newDBConfigs(db)
te := NewTxEngine(tabletenv.NewEnv(config, "TabletServerTest"))
for _, exec := range []func() (int64, error){
func() (int64, error) {
tx, _, err := te.Begin(ctx, nil, 0, &querypb.ExecuteOptions{})
return tx, err
for _, exec := range []func() (int64, string, error){
func() (int64, string, error) {
tx, _, schemaStateChanges, err := te.Begin(ctx, nil, 0, &querypb.ExecuteOptions{})
return tx, schemaStateChanges, err
},
func() (int64, error) {
func() (int64, string, error) {
return te.ReserveBegin(ctx, &querypb.ExecuteOptions{}, nil, nil)
},
} {
te.AcceptReadOnly()
tx1, err := exec()
tx1, _, err := exec()
require.NoError(t, err)
_, _, err = te.Commit(ctx, tx1)
require.NoError(t, err)
@ -170,7 +169,7 @@ func TestTxEngineBegin(t *testing.T) {
db.ResetQueryLog()
te.AcceptReadWrite()
tx2, err := exec()
tx2, _, err := exec()
require.NoError(t, err)
_, _, err = te.Commit(ctx, tx2)
require.NoError(t, err)
@ -178,11 +177,11 @@ func TestTxEngineBegin(t *testing.T) {
db.ResetQueryLog()
te.transition(Transitioning)
_, err = exec()
_, _, err = exec()
assert.EqualError(t, err, "tx engine can't accept new connections in state Transitioning")
te.transition(NotServing)
_, err = exec()
_, _, err = exec()
assert.EqualError(t, err, "tx engine can't accept new connections in state NotServing")
}
@ -197,7 +196,7 @@ func TestTxEngineRenewFails(t *testing.T) {
te := NewTxEngine(tabletenv.NewEnv(config, "TabletServerTest"))
te.AcceptReadOnly()
options := &querypb.ExecuteOptions{}
connID, err := te.ReserveBegin(ctx, options, nil, nil)
connID, _, err := te.ReserveBegin(ctx, options, nil, nil)
require.NoError(t, err)
conn, err := te.txPool.GetAndLock(connID, "for test")
@ -552,7 +551,7 @@ func startTransaction(te *TxEngine, writeTransaction bool) error {
} else {
options.TransactionIsolation = querypb.ExecuteOptions_CONSISTENT_SNAPSHOT_READ_ONLY
}
_, _, err := te.Begin(context.Background(), nil, 0, options)
_, _, _, err := te.Begin(context.Background(), nil, 0, options)
return err
}
@ -568,7 +567,7 @@ func TestTxEngineFailReserve(t *testing.T) {
_, err := te.Reserve(ctx, options, 0, nil)
assert.EqualError(t, err, "tx engine can't accept new connections in state NotServing")
_, err = te.ReserveBegin(ctx, options, nil, nil)
_, _, err = te.ReserveBegin(ctx, options, nil, nil)
assert.EqualError(t, err, "tx engine can't accept new connections in state NotServing")
te.AcceptReadOnly()
@ -577,14 +576,14 @@ func TestTxEngineFailReserve(t *testing.T) {
_, err = te.Reserve(ctx, options, 0, []string{"dummy_query"})
assert.EqualError(t, err, "unknown error: failed executing dummy_query (errno 1105) (sqlstate HY000) during query: dummy_query")
_, err = te.ReserveBegin(ctx, options, []string{"dummy_query"}, nil)
_, _, err = te.ReserveBegin(ctx, options, []string{"dummy_query"}, nil)
assert.EqualError(t, err, "unknown error: failed executing dummy_query (errno 1105) (sqlstate HY000) during query: dummy_query")
nonExistingID := int64(42)
_, err = te.Reserve(ctx, options, nonExistingID, nil)
assert.EqualError(t, err, "transaction 42: not found")
txID, _, err := te.Begin(ctx, nil, 0, options)
txID, _, _, err := te.Begin(ctx, nil, 0, options)
require.NoError(t, err)
conn, err := te.txPool.GetAndLock(txID, "for test")
require.NoError(t, err)

Просмотреть файл

@ -17,12 +17,11 @@ limitations under the License.
package tabletserver
import (
"context"
"time"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tx"
"context"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/log"
@ -118,7 +117,7 @@ func (txe *TxExecutor) CommitPrepared(dtid string) error {
func (txe *TxExecutor) markFailed(ctx context.Context, dtid string) {
txe.te.env.Stats().InternalErrors.Add("TwopcCommit", 1)
txe.te.preparedPool.SetFailed(dtid)
conn, _, err := txe.te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
conn, _, _, err := txe.te.txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
if err != nil {
log.Errorf("markFailed: Begin failed for dtid %s: %v", dtid, err)
return
@ -261,7 +260,7 @@ func (txe *TxExecutor) ReadTwopcInflight() (distributed []*tx.DistributedTx, pre
}
func (txe *TxExecutor) inTransaction(f func(*StatefulConnection) error) error {
conn, _, err := txe.te.txPool.Begin(txe.ctx, &querypb.ExecuteOptions{}, false, 0, nil)
conn, _, _, err := txe.te.txPool.Begin(txe.ctx, &querypb.ExecuteOptions{}, false, 0, nil)
if err != nil {
return err
}

Просмотреть файл

@ -17,6 +17,7 @@ limitations under the License.
package tabletserver
import (
"context"
"sync"
"time"
@ -26,8 +27,6 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/tx"
"context"
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/trace"
@ -223,7 +222,7 @@ func (tp *TxPool) Rollback(ctx context.Context, txConn *StatefulConnection) erro
// the statements (if any) executed to initiate the transaction. In autocommit
// mode the statement will be "".
// The connection returned is locked for the callee and its responsibility is to unlock the connection.
func (tp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions, readOnly bool, reservedID int64, preQueries []string) (*StatefulConnection, string, error) {
func (tp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions, readOnly bool, reservedID int64, preQueries []string) (*StatefulConnection, string, string, error) {
span, ctx := trace.NewSpan(ctx, "TxPool.Begin")
defer span.Finish()
@ -232,13 +231,13 @@ func (tp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions, re
if reservedID != 0 {
conn, err = tp.scp.GetAndLock(reservedID, "start transaction on reserve conn")
if err != nil {
return nil, "", vterrors.Errorf(vtrpcpb.Code_ABORTED, "transaction %d: %v", reservedID, err)
return nil, "", "", vterrors.Errorf(vtrpcpb.Code_ABORTED, "transaction %d: %v", reservedID, err)
}
} else {
immediateCaller := callerid.ImmediateCallerIDFromContext(ctx)
effectiveCaller := callerid.EffectiveCallerIDFromContext(ctx)
if !tp.limiter.Get(immediateCaller, effectiveCaller) {
return nil, "", vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "per-user transaction pool connection limit exceeded")
return nil, "", "", vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "per-user transaction pool connection limit exceeded")
}
conn, err = tp.createConn(ctx, options)
defer func() {
@ -250,28 +249,28 @@ func (tp *TxPool) Begin(ctx context.Context, options *querypb.ExecuteOptions, re
}()
}
if err != nil {
return nil, "", err
return nil, "", "", err
}
sql, err := tp.begin(ctx, options, readOnly, conn, preQueries)
sql, sessionStateChanges, err := tp.begin(ctx, options, readOnly, conn, preQueries)
if err != nil {
conn.Close()
conn.Release(tx.ConnInitFail)
return nil, "", err
return nil, "", "", err
}
return conn, sql, nil
return conn, sql, sessionStateChanges, nil
}
func (tp *TxPool) begin(ctx context.Context, options *querypb.ExecuteOptions, readOnly bool, conn *StatefulConnection, preQueries []string) (string, error) {
func (tp *TxPool) begin(ctx context.Context, options *querypb.ExecuteOptions, readOnly bool, conn *StatefulConnection, preQueries []string) (string, string, error) {
immediateCaller := callerid.ImmediateCallerIDFromContext(ctx)
effectiveCaller := callerid.EffectiveCallerIDFromContext(ctx)
beginQueries, autocommit, err := createTransaction(ctx, options, conn, readOnly, preQueries)
beginQueries, autocommit, sessionStateChanges, err := createTransaction(ctx, options, conn, readOnly, preQueries)
if err != nil {
return "", err
return "", "", err
}
conn.txProps = tp.NewTxProps(immediateCaller, effectiveCaller, autocommit)
return beginQueries, nil
return beginQueries, sessionStateChanges, nil
}
func (tp *TxPool) createConn(ctx context.Context, options *querypb.ExecuteOptions) (*StatefulConnection, error) {
@ -291,39 +290,54 @@ func (tp *TxPool) createConn(ctx context.Context, options *querypb.ExecuteOption
return conn, nil
}
func createTransaction(ctx context.Context, options *querypb.ExecuteOptions, conn *StatefulConnection, readOnly bool, preQueries []string) (string, bool, error) {
func createTransaction(ctx context.Context, options *querypb.ExecuteOptions, conn *StatefulConnection, readOnly bool, preQueries []string) (string, bool, string, error) {
beginQueries := ""
autocommitTransaction := false
sessionStateChanges := ""
if queries, ok := txIsolations[options.GetTransactionIsolation()]; ok {
if options.GetTransactionIsolation() == querypb.ExecuteOptions_CONSISTENT_SNAPSHOT_READ_ONLY {
trackGtidQuery := "set session session_track_gtids = START_GTID"
_, err := conn.execWithRetry(ctx, trackGtidQuery, 1, false)
// We allow this to fail since this is a custom MySQL extension, but we return
// then if this query was executed or not.
//
// Callers also can know because the sessionStateChanges will be empty for a snapshot
// transaction and get GTID information in another (less efficient) way.
if err == nil {
beginQueries += trackGtidQuery + "; "
}
}
if queries.setIsolationLevel != "" {
txQuery := "set transaction isolation level " + queries.setIsolationLevel
if err := conn.execWithRetry(ctx, txQuery, 1, false); err != nil {
return "", false, err
if _, err := conn.execWithRetry(ctx, txQuery, 1, false); err != nil {
return "", false, "", err
}
beginQueries = queries.setIsolationLevel + "; "
beginQueries += queries.setIsolationLevel + "; "
}
beginSQL := queries.openTransaction
if readOnly &&
options.GetTransactionIsolation() != querypb.ExecuteOptions_CONSISTENT_SNAPSHOT_READ_ONLY {
beginSQL = "start transaction read only"
}
if err := conn.execWithRetry(ctx, beginSQL, 1, false); err != nil {
return "", false, err
var err error
sessionStateChanges, err = conn.execWithRetry(ctx, beginSQL, 1, false)
if err != nil {
return "", false, "", err
}
beginQueries = beginQueries + beginSQL
beginQueries += beginSQL
} else if options.GetTransactionIsolation() == querypb.ExecuteOptions_AUTOCOMMIT {
autocommitTransaction = true
} else {
return "", false, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "don't know how to open a transaction of this type: %v", options.GetTransactionIsolation())
return "", false, "", vterrors.Errorf(vtrpcpb.Code_INTERNAL, "don't know how to open a transaction of this type: %v", options.GetTransactionIsolation())
}
for _, preQuery := range preQueries {
if _, err := conn.Exec(ctx, preQuery, 1, false); err != nil {
return "", false, err
return "", false, "", err
}
}
return beginQueries, autocommitTransaction, nil
return beginQueries, autocommitTransaction, sessionStateChanges, nil
}
// LogActive causes all existing transactions to be logged when they complete.

Просмотреть файл

@ -44,7 +44,7 @@ func TestTxPoolExecuteCommit(t *testing.T) {
sql := "select 'this is a query'"
// begin a transaction and then return the connection
conn, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
conn, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
require.NoError(t, err)
id := conn.ReservedID()
@ -76,7 +76,7 @@ func TestTxPoolExecuteRollback(t *testing.T) {
db, txPool, _, closer := setup(t)
defer closer()
conn, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
conn, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
require.NoError(t, err)
defer conn.Release(tx.TxRollback)
@ -94,7 +94,7 @@ func TestTxPoolExecuteRollbackOnClosedConn(t *testing.T) {
db, txPool, _, closer := setup(t)
defer closer()
conn, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
conn, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
require.NoError(t, err)
defer conn.Release(tx.TxRollback)
@ -112,9 +112,9 @@ func TestTxPoolRollbackNonBusy(t *testing.T) {
defer closer()
// start two transactions, and mark one of them as unused
conn1, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
conn1, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
require.NoError(t, err)
conn2, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
conn2, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
require.NoError(t, err)
conn2.Unlock() // this marks conn2 as NonBusy
@ -138,7 +138,7 @@ func TestTxPoolTransactionIsolation(t *testing.T) {
db, txPool, _, closer := setup(t)
defer closer()
c2, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{TransactionIsolation: querypb.ExecuteOptions_READ_COMMITTED}, false, 0, nil)
c2, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{TransactionIsolation: querypb.ExecuteOptions_READ_COMMITTED}, false, 0, nil)
require.NoError(t, err)
c2.Release(tx.TxClose)
@ -153,7 +153,7 @@ func TestTxPoolAutocommit(t *testing.T) {
// to mysql.
// This test is meaningful because if txPool.Begin were to send a BEGIN statement to the connection, it will fatal
// because is not in the list of expected queries (i.e db.AddQuery hasn't been called).
conn1, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{TransactionIsolation: querypb.ExecuteOptions_AUTOCOMMIT}, false, 0, nil)
conn1, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{TransactionIsolation: querypb.ExecuteOptions_AUTOCOMMIT}, false, 0, nil)
require.NoError(t, err)
// run a query to see it in the query log
@ -182,7 +182,7 @@ func TestTxPoolBeginWithPoolConnectionError_Errno2006_Transient(t *testing.T) {
err := db.WaitForClose(2 * time.Second)
require.NoError(t, err)
txConn, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
txConn, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
require.NoError(t, err, "Begin should have succeeded after the retry in DBConn.Exec()")
txConn.Release(tx.TxCommit)
}
@ -201,7 +201,7 @@ func primeTxPoolWithConnection(t *testing.T) (*fakesqldb.DB, *TxPool) {
// reused by subsequent transactions.
db.AddQuery("begin", &sqltypes.Result{})
db.AddQuery("rollback", &sqltypes.Result{})
txConn, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
txConn, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
require.NoError(t, err)
txConn.Release(tx.TxCommit)
@ -221,7 +221,7 @@ func TestTxPoolBeginWithError(t *testing.T) {
}
ctxWithCallerID := callerid.NewContext(ctx, ef, im)
_, _, err := txPool.Begin(ctxWithCallerID, &querypb.ExecuteOptions{}, false, 0, nil)
_, _, _, err := txPool.Begin(ctxWithCallerID, &querypb.ExecuteOptions{}, false, 0, nil)
require.Error(t, err)
require.Contains(t, err.Error(), "error: rejected")
require.Equal(t, vtrpcpb.Code_UNKNOWN, vterrors.Code(err), "wrong error code for Begin error")
@ -247,7 +247,7 @@ func TestTxPoolBeginWithPreQueryError(t *testing.T) {
db, txPool, _, closer := setup(t)
defer closer()
db.AddRejectedQuery("pre_query", errRejected)
_, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, []string{"pre_query"})
_, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, []string{"pre_query"})
require.Error(t, err)
require.Contains(t, err.Error(), "error: rejected")
require.Equal(t, vtrpcpb.Code_UNKNOWN, vterrors.Code(err), "wrong error code for Begin error")
@ -261,7 +261,7 @@ func TestTxPoolCancelledContextError(t *testing.T) {
cancel()
// when
_, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
_, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
// then
require.Error(t, err)
@ -280,12 +280,12 @@ func TestTxPoolWaitTimeoutError(t *testing.T) {
defer closer()
// lock the only connection in the pool.
conn, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
conn, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
require.NoError(t, err)
defer conn.Unlock()
// try locking one more connection.
_, _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
_, _, _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
// then
require.Error(t, err)
@ -302,7 +302,7 @@ func TestTxPoolRollbackFailIsPassedThrough(t *testing.T) {
defer closer()
db.AddRejectedQuery("rollback", errRejected)
conn1, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
conn1, _, _, err := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
require.NoError(t, err)
_, err = conn1.Exec(ctx, sql, 1, true)
@ -319,7 +319,7 @@ func TestTxPoolRollbackFailIsPassedThrough(t *testing.T) {
func TestTxPoolGetConnRecentlyRemovedTransaction(t *testing.T) {
db, txPool, _, _ := setup(t)
defer db.Close()
conn1, _, _ := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
conn1, _, _, _ := txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
id := conn1.ReservedID()
conn1.Unlock()
txPool.Close()
@ -341,7 +341,7 @@ func TestTxPoolGetConnRecentlyRemovedTransaction(t *testing.T) {
txPool, _ = newTxPool()
txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams())
conn1, _, _ = txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
conn1, _, _, _ = txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
id = conn1.ReservedID()
_, err := txPool.Commit(ctx, conn1)
require.NoError(t, err)
@ -355,7 +355,7 @@ func TestTxPoolGetConnRecentlyRemovedTransaction(t *testing.T) {
txPool.Open(db.ConnParams(), db.ConnParams(), db.ConnParams())
defer txPool.Close()
conn1, _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
conn1, _, _, err = txPool.Begin(ctx, &querypb.ExecuteOptions{}, false, 0, nil)
require.NoError(t, err, "unable to start transaction: %v", err)
conn1.Unlock()
id = conn1.ReservedID()
@ -371,7 +371,7 @@ func TestTxPoolCloseKillsStrayTransactions(t *testing.T) {
startingStray := txPool.env.Stats().InternalErrors.Counts()["StrayTransactions"]
// Start stray transaction.
conn, _, err := txPool.Begin(context.Background(), &querypb.ExecuteOptions{}, false, 0, nil)
conn, _, _, err := txPool.Begin(context.Background(), &querypb.ExecuteOptions{}, false, 0, nil)
require.NoError(t, err)
conn.Unlock()
@ -400,7 +400,7 @@ func TestTxTimeoutKillsTransactions(t *testing.T) {
ctxWithCallerID := callerid.NewContext(ctx, ef, im)
// Start transaction.
conn, _, err := txPool.Begin(ctxWithCallerID, &querypb.ExecuteOptions{}, false, 0, nil)
conn, _, _, err := txPool.Begin(ctxWithCallerID, &querypb.ExecuteOptions{}, false, 0, nil)
require.NoError(t, err)
conn.Unlock()

Просмотреть файл

@ -47,7 +47,14 @@ func snapshotConnect(ctx context.Context, cp dbconfigs.Connector) (*snapshotConn
// startSnapshot starts a streaming query with a snapshot view of the specified table.
// It returns the gtid of the time when the snapshot was taken.
func (conn *snapshotConn) streamWithSnapshot(ctx context.Context, table, query string) (gtid string, err error) {
gtid, err = conn.startSnapshot(ctx, table)
_, err = conn.ExecuteFetch("set session session_track_gtids = START_GTID", 1, false)
if err != nil {
// session_track_gtids = START_GTID unsupported or cannot execute. Resort to LOCK-based snapshot
gtid, err = conn.startSnapshot(ctx, table)
} else {
// session_track_gtids = START_GTID supported. Get a transaction with consistent GTID without LOCKing tables.
gtid, err = conn.startSnapshotWithConsistentGTID(ctx)
}
if err != nil {
return "", err
}
@ -100,6 +107,29 @@ func (conn *snapshotConn) startSnapshot(ctx context.Context, table string) (gtid
return mysql.EncodePosition(mpos), nil
}
// startSnapshotWithConsistentGTID performs the snapshotting without locking tables. This assumes
// session_track_gtids = START_GTID, which is a contribution to MySQL and is not in vanilla MySQL at the
// time of this writing.
func (conn *snapshotConn) startSnapshotWithConsistentGTID(ctx context.Context) (gtid string, err error) {
if _, err := conn.ExecuteFetch("set transaction isolation level repeatable read", 1, false); err != nil {
return "", err
}
result, err := conn.ExecuteFetch("start transaction with consistent snapshot", 1, false)
if err != nil {
return "", err
}
// The "session_track_gtids = START_GTID" patch is only applicable to MySQL56 GTID, which is
// why we hardcode the position as mysql.Mysql56FlavorID
mpos, err := mysql.ParsePosition(mysql.Mysql56FlavorID, result.SessionStateChanges)
if err != nil {
return "", err
}
if _, err := conn.ExecuteFetch("set @@session.time_zone = '+00:00'", 1, false); err != nil {
return "", err
}
return mysql.EncodePosition(mpos), nil
}
// Close rollsback any open transactions and closes the connection.
func (conn *snapshotConn) Close() {
_, _ = conn.ExecuteFetch("rollback", 1, false)

Просмотреть файл

@ -388,6 +388,7 @@ message QueryResult {
uint64 insert_id = 3;
repeated Row rows = 4;
string info = 6;
string session_state_changes = 7;
}
// QueryWarning is used to convey out of band query execution warnings
@ -483,6 +484,9 @@ message BeginRequest {
message BeginResponse {
int64 transaction_id = 1;
topodata.TabletAlias tablet_alias = 2;
// The session_state_changes might be set if the transaction is a snapshot transaction
// and the MySQL implementation supports getting a start gtid on snapshot
string session_state_changes = 3;
}
// CommitRequest is the payload to Commit
@ -629,6 +633,9 @@ message BeginExecuteResponse {
// transaction_id might be non-zero even if an error is present.
int64 transaction_id = 3;
topodata.TabletAlias tablet_alias = 4;
// The session_state_changes might be set if the transaction is a snapshot transaction
// and the MySQL implementation supports getting a start gtid on snapshot
string session_state_changes = 5;
}
// BeginStreamExecuteRequest is the payload to BeginStreamExecute
@ -654,6 +661,9 @@ message BeginStreamExecuteResponse {
// transaction_id might be non-zero even if an error is present.
int64 transaction_id = 3;
topodata.TabletAlias tablet_alias = 4;
// The session_state_changes might be set if the transaction is a snapshot transaction
// and the MySQL implementation supports getting a start gtid on snapshot
string session_state_changes = 5;
}
// MessageStreamRequest is the request payload for MessageStream.
@ -753,6 +763,9 @@ message ReserveBeginExecuteResponse {
int64 transaction_id = 3;
int64 reserved_id = 4;
topodata.TabletAlias tablet_alias = 5;
// The session_state_changes might be set if the transaction is a snapshot transaction
// and the MySQL implementation supports getting a start gtid on snapshot
string session_state_changes = 6;
}
// ReserveBeginStreamExecuteRequest is the payload to ReserveBeginStreamExecute
@ -779,6 +792,9 @@ message ReserveBeginStreamExecuteResponse {
int64 transaction_id = 3;
int64 reserved_id = 4;
topodata.TabletAlias tablet_alias = 5;
// The session_state_changes might be set if the transaction is a snapshot transaction
// and the MySQL implementation supports getting a start gtid on snapshot
string session_state_changes = 6;
}
// ReleaseRequest is the payload to Release

36
web/vtadmin/src/proto/vtadmin.d.ts сгенерированный поставляемый
Просмотреть файл

@ -24026,6 +24026,9 @@ export namespace query {
/** QueryResult info */
info?: (string|null);
/** QueryResult session_state_changes */
session_state_changes?: (string|null);
}
/** Represents a QueryResult. */
@ -24052,6 +24055,9 @@ export namespace query {
/** QueryResult info. */
public info: string;
/** QueryResult session_state_changes. */
public session_state_changes: string;
/**
* Creates a new QueryResult instance using the specified properties.
* @param [properties] Properties to set
@ -25086,6 +25092,9 @@ export namespace query {
/** BeginResponse tablet_alias */
tablet_alias?: (topodata.ITabletAlias|null);
/** BeginResponse session_state_changes */
session_state_changes?: (string|null);
}
/** Represents a BeginResponse. */
@ -25103,6 +25112,9 @@ export namespace query {
/** BeginResponse tablet_alias. */
public tablet_alias?: (topodata.ITabletAlias|null);
/** BeginResponse session_state_changes. */
public session_state_changes: string;
/**
* Creates a new BeginResponse instance using the specified properties.
* @param [properties] Properties to set
@ -27282,6 +27294,9 @@ export namespace query {
/** BeginExecuteResponse tablet_alias */
tablet_alias?: (topodata.ITabletAlias|null);
/** BeginExecuteResponse session_state_changes */
session_state_changes?: (string|null);
}
/** Represents a BeginExecuteResponse. */
@ -27305,6 +27320,9 @@ export namespace query {
/** BeginExecuteResponse tablet_alias. */
public tablet_alias?: (topodata.ITabletAlias|null);
/** BeginExecuteResponse session_state_changes. */
public session_state_changes: string;
/**
* Creates a new BeginExecuteResponse instance using the specified properties.
* @param [properties] Properties to set
@ -27516,6 +27534,9 @@ export namespace query {
/** BeginStreamExecuteResponse tablet_alias */
tablet_alias?: (topodata.ITabletAlias|null);
/** BeginStreamExecuteResponse session_state_changes */
session_state_changes?: (string|null);
}
/** Represents a BeginStreamExecuteResponse. */
@ -27539,6 +27560,9 @@ export namespace query {
/** BeginStreamExecuteResponse tablet_alias. */
public tablet_alias?: (topodata.ITabletAlias|null);
/** BeginStreamExecuteResponse session_state_changes. */
public session_state_changes: string;
/**
* Creates a new BeginStreamExecuteResponse instance using the specified properties.
* @param [properties] Properties to set
@ -28623,6 +28647,9 @@ export namespace query {
/** ReserveBeginExecuteResponse tablet_alias */
tablet_alias?: (topodata.ITabletAlias|null);
/** ReserveBeginExecuteResponse session_state_changes */
session_state_changes?: (string|null);
}
/** Represents a ReserveBeginExecuteResponse. */
@ -28649,6 +28676,9 @@ export namespace query {
/** ReserveBeginExecuteResponse tablet_alias. */
public tablet_alias?: (topodata.ITabletAlias|null);
/** ReserveBeginExecuteResponse session_state_changes. */
public session_state_changes: string;
/**
* Creates a new ReserveBeginExecuteResponse instance using the specified properties.
* @param [properties] Properties to set
@ -28863,6 +28893,9 @@ export namespace query {
/** ReserveBeginStreamExecuteResponse tablet_alias */
tablet_alias?: (topodata.ITabletAlias|null);
/** ReserveBeginStreamExecuteResponse session_state_changes */
session_state_changes?: (string|null);
}
/** Represents a ReserveBeginStreamExecuteResponse. */
@ -28889,6 +28922,9 @@ export namespace query {
/** ReserveBeginStreamExecuteResponse tablet_alias. */
public tablet_alias?: (topodata.ITabletAlias|null);
/** ReserveBeginStreamExecuteResponse session_state_changes. */
public session_state_changes: string;
/**
* Creates a new ReserveBeginStreamExecuteResponse instance using the specified properties.
* @param [properties] Properties to set

132
web/vtadmin/src/proto/vtadmin.js сгенерированный
Просмотреть файл

@ -55701,6 +55701,7 @@ $root.query = (function() {
* @property {number|Long|null} [insert_id] QueryResult insert_id
* @property {Array.<query.IRow>|null} [rows] QueryResult rows
* @property {string|null} [info] QueryResult info
* @property {string|null} [session_state_changes] QueryResult session_state_changes
*/
/**
@ -55760,6 +55761,14 @@ $root.query = (function() {
*/
QueryResult.prototype.info = "";
/**
* QueryResult session_state_changes.
* @member {string} session_state_changes
* @memberof query.QueryResult
* @instance
*/
QueryResult.prototype.session_state_changes = "";
/**
* Creates a new QueryResult instance using the specified properties.
* @function create
@ -55796,6 +55805,8 @@ $root.query = (function() {
$root.query.Row.encode(message.rows[i], writer.uint32(/* id 4, wireType 2 =*/34).fork()).ldelim();
if (message.info != null && Object.hasOwnProperty.call(message, "info"))
writer.uint32(/* id 6, wireType 2 =*/50).string(message.info);
if (message.session_state_changes != null && Object.hasOwnProperty.call(message, "session_state_changes"))
writer.uint32(/* id 7, wireType 2 =*/58).string(message.session_state_changes);
return writer;
};
@ -55849,6 +55860,9 @@ $root.query = (function() {
case 6:
message.info = reader.string();
break;
case 7:
message.session_state_changes = reader.string();
break;
default:
reader.skipType(tag & 7);
break;
@ -55911,6 +55925,9 @@ $root.query = (function() {
if (message.info != null && message.hasOwnProperty("info"))
if (!$util.isString(message.info))
return "info: string expected";
if (message.session_state_changes != null && message.hasOwnProperty("session_state_changes"))
if (!$util.isString(message.session_state_changes))
return "session_state_changes: string expected";
return null;
};
@ -55966,6 +55983,8 @@ $root.query = (function() {
}
if (object.info != null)
message.info = String(object.info);
if (object.session_state_changes != null)
message.session_state_changes = String(object.session_state_changes);
return message;
};
@ -55998,6 +56017,7 @@ $root.query = (function() {
} else
object.insert_id = options.longs === String ? "0" : 0;
object.info = "";
object.session_state_changes = "";
}
if (message.fields && message.fields.length) {
object.fields = [];
@ -56021,6 +56041,8 @@ $root.query = (function() {
}
if (message.info != null && message.hasOwnProperty("info"))
object.info = message.info;
if (message.session_state_changes != null && message.hasOwnProperty("session_state_changes"))
object.session_state_changes = message.session_state_changes;
return object;
};
@ -58479,6 +58501,7 @@ $root.query = (function() {
* @interface IBeginResponse
* @property {number|Long|null} [transaction_id] BeginResponse transaction_id
* @property {topodata.ITabletAlias|null} [tablet_alias] BeginResponse tablet_alias
* @property {string|null} [session_state_changes] BeginResponse session_state_changes
*/
/**
@ -58512,6 +58535,14 @@ $root.query = (function() {
*/
BeginResponse.prototype.tablet_alias = null;
/**
* BeginResponse session_state_changes.
* @member {string} session_state_changes
* @memberof query.BeginResponse
* @instance
*/
BeginResponse.prototype.session_state_changes = "";
/**
* Creates a new BeginResponse instance using the specified properties.
* @function create
@ -58540,6 +58571,8 @@ $root.query = (function() {
writer.uint32(/* id 1, wireType 0 =*/8).int64(message.transaction_id);
if (message.tablet_alias != null && Object.hasOwnProperty.call(message, "tablet_alias"))
$root.topodata.TabletAlias.encode(message.tablet_alias, writer.uint32(/* id 2, wireType 2 =*/18).fork()).ldelim();
if (message.session_state_changes != null && Object.hasOwnProperty.call(message, "session_state_changes"))
writer.uint32(/* id 3, wireType 2 =*/26).string(message.session_state_changes);
return writer;
};
@ -58580,6 +58613,9 @@ $root.query = (function() {
case 2:
message.tablet_alias = $root.topodata.TabletAlias.decode(reader, reader.uint32());
break;
case 3:
message.session_state_changes = reader.string();
break;
default:
reader.skipType(tag & 7);
break;
@ -58623,6 +58659,9 @@ $root.query = (function() {
if (error)
return "tablet_alias." + error;
}
if (message.session_state_changes != null && message.hasOwnProperty("session_state_changes"))
if (!$util.isString(message.session_state_changes))
return "session_state_changes: string expected";
return null;
};
@ -58652,6 +58691,8 @@ $root.query = (function() {
throw TypeError(".query.BeginResponse.tablet_alias: object expected");
message.tablet_alias = $root.topodata.TabletAlias.fromObject(object.tablet_alias);
}
if (object.session_state_changes != null)
message.session_state_changes = String(object.session_state_changes);
return message;
};
@ -58675,6 +58716,7 @@ $root.query = (function() {
} else
object.transaction_id = options.longs === String ? "0" : 0;
object.tablet_alias = null;
object.session_state_changes = "";
}
if (message.transaction_id != null && message.hasOwnProperty("transaction_id"))
if (typeof message.transaction_id === "number")
@ -58683,6 +58725,8 @@ $root.query = (function() {
object.transaction_id = options.longs === String ? $util.Long.prototype.toString.call(message.transaction_id) : options.longs === Number ? new $util.LongBits(message.transaction_id.low >>> 0, message.transaction_id.high >>> 0).toNumber() : message.transaction_id;
if (message.tablet_alias != null && message.hasOwnProperty("tablet_alias"))
object.tablet_alias = $root.topodata.TabletAlias.toObject(message.tablet_alias, options);
if (message.session_state_changes != null && message.hasOwnProperty("session_state_changes"))
object.session_state_changes = message.session_state_changes;
return object;
};
@ -63706,6 +63750,7 @@ $root.query = (function() {
* @property {query.IQueryResult|null} [result] BeginExecuteResponse result
* @property {number|Long|null} [transaction_id] BeginExecuteResponse transaction_id
* @property {topodata.ITabletAlias|null} [tablet_alias] BeginExecuteResponse tablet_alias
* @property {string|null} [session_state_changes] BeginExecuteResponse session_state_changes
*/
/**
@ -63755,6 +63800,14 @@ $root.query = (function() {
*/
BeginExecuteResponse.prototype.tablet_alias = null;
/**
* BeginExecuteResponse session_state_changes.
* @member {string} session_state_changes
* @memberof query.BeginExecuteResponse
* @instance
*/
BeginExecuteResponse.prototype.session_state_changes = "";
/**
* Creates a new BeginExecuteResponse instance using the specified properties.
* @function create
@ -63787,6 +63840,8 @@ $root.query = (function() {
writer.uint32(/* id 3, wireType 0 =*/24).int64(message.transaction_id);
if (message.tablet_alias != null && Object.hasOwnProperty.call(message, "tablet_alias"))
$root.topodata.TabletAlias.encode(message.tablet_alias, writer.uint32(/* id 4, wireType 2 =*/34).fork()).ldelim();
if (message.session_state_changes != null && Object.hasOwnProperty.call(message, "session_state_changes"))
writer.uint32(/* id 5, wireType 2 =*/42).string(message.session_state_changes);
return writer;
};
@ -63833,6 +63888,9 @@ $root.query = (function() {
case 4:
message.tablet_alias = $root.topodata.TabletAlias.decode(reader, reader.uint32());
break;
case 5:
message.session_state_changes = reader.string();
break;
default:
reader.skipType(tag & 7);
break;
@ -63886,6 +63944,9 @@ $root.query = (function() {
if (error)
return "tablet_alias." + error;
}
if (message.session_state_changes != null && message.hasOwnProperty("session_state_changes"))
if (!$util.isString(message.session_state_changes))
return "session_state_changes: string expected";
return null;
};
@ -63925,6 +63986,8 @@ $root.query = (function() {
throw TypeError(".query.BeginExecuteResponse.tablet_alias: object expected");
message.tablet_alias = $root.topodata.TabletAlias.fromObject(object.tablet_alias);
}
if (object.session_state_changes != null)
message.session_state_changes = String(object.session_state_changes);
return message;
};
@ -63950,6 +64013,7 @@ $root.query = (function() {
} else
object.transaction_id = options.longs === String ? "0" : 0;
object.tablet_alias = null;
object.session_state_changes = "";
}
if (message.error != null && message.hasOwnProperty("error"))
object.error = $root.vtrpc.RPCError.toObject(message.error, options);
@ -63962,6 +64026,8 @@ $root.query = (function() {
object.transaction_id = options.longs === String ? $util.Long.prototype.toString.call(message.transaction_id) : options.longs === Number ? new $util.LongBits(message.transaction_id.low >>> 0, message.transaction_id.high >>> 0).toNumber() : message.transaction_id;
if (message.tablet_alias != null && message.hasOwnProperty("tablet_alias"))
object.tablet_alias = $root.topodata.TabletAlias.toObject(message.tablet_alias, options);
if (message.session_state_changes != null && message.hasOwnProperty("session_state_changes"))
object.session_state_changes = message.session_state_changes;
return object;
};
@ -64365,6 +64431,7 @@ $root.query = (function() {
* @property {query.IQueryResult|null} [result] BeginStreamExecuteResponse result
* @property {number|Long|null} [transaction_id] BeginStreamExecuteResponse transaction_id
* @property {topodata.ITabletAlias|null} [tablet_alias] BeginStreamExecuteResponse tablet_alias
* @property {string|null} [session_state_changes] BeginStreamExecuteResponse session_state_changes
*/
/**
@ -64414,6 +64481,14 @@ $root.query = (function() {
*/
BeginStreamExecuteResponse.prototype.tablet_alias = null;
/**
* BeginStreamExecuteResponse session_state_changes.
* @member {string} session_state_changes
* @memberof query.BeginStreamExecuteResponse
* @instance
*/
BeginStreamExecuteResponse.prototype.session_state_changes = "";
/**
* Creates a new BeginStreamExecuteResponse instance using the specified properties.
* @function create
@ -64446,6 +64521,8 @@ $root.query = (function() {
writer.uint32(/* id 3, wireType 0 =*/24).int64(message.transaction_id);
if (message.tablet_alias != null && Object.hasOwnProperty.call(message, "tablet_alias"))
$root.topodata.TabletAlias.encode(message.tablet_alias, writer.uint32(/* id 4, wireType 2 =*/34).fork()).ldelim();
if (message.session_state_changes != null && Object.hasOwnProperty.call(message, "session_state_changes"))
writer.uint32(/* id 5, wireType 2 =*/42).string(message.session_state_changes);
return writer;
};
@ -64492,6 +64569,9 @@ $root.query = (function() {
case 4:
message.tablet_alias = $root.topodata.TabletAlias.decode(reader, reader.uint32());
break;
case 5:
message.session_state_changes = reader.string();
break;
default:
reader.skipType(tag & 7);
break;
@ -64545,6 +64625,9 @@ $root.query = (function() {
if (error)
return "tablet_alias." + error;
}
if (message.session_state_changes != null && message.hasOwnProperty("session_state_changes"))
if (!$util.isString(message.session_state_changes))
return "session_state_changes: string expected";
return null;
};
@ -64584,6 +64667,8 @@ $root.query = (function() {
throw TypeError(".query.BeginStreamExecuteResponse.tablet_alias: object expected");
message.tablet_alias = $root.topodata.TabletAlias.fromObject(object.tablet_alias);
}
if (object.session_state_changes != null)
message.session_state_changes = String(object.session_state_changes);
return message;
};
@ -64609,6 +64694,7 @@ $root.query = (function() {
} else
object.transaction_id = options.longs === String ? "0" : 0;
object.tablet_alias = null;
object.session_state_changes = "";
}
if (message.error != null && message.hasOwnProperty("error"))
object.error = $root.vtrpc.RPCError.toObject(message.error, options);
@ -64621,6 +64707,8 @@ $root.query = (function() {
object.transaction_id = options.longs === String ? $util.Long.prototype.toString.call(message.transaction_id) : options.longs === Number ? new $util.LongBits(message.transaction_id.low >>> 0, message.transaction_id.high >>> 0).toNumber() : message.transaction_id;
if (message.tablet_alias != null && message.hasOwnProperty("tablet_alias"))
object.tablet_alias = $root.topodata.TabletAlias.toObject(message.tablet_alias, options);
if (message.session_state_changes != null && message.hasOwnProperty("session_state_changes"))
object.session_state_changes = message.session_state_changes;
return object;
};
@ -67312,6 +67400,7 @@ $root.query = (function() {
* @property {number|Long|null} [transaction_id] ReserveBeginExecuteResponse transaction_id
* @property {number|Long|null} [reserved_id] ReserveBeginExecuteResponse reserved_id
* @property {topodata.ITabletAlias|null} [tablet_alias] ReserveBeginExecuteResponse tablet_alias
* @property {string|null} [session_state_changes] ReserveBeginExecuteResponse session_state_changes
*/
/**
@ -67369,6 +67458,14 @@ $root.query = (function() {
*/
ReserveBeginExecuteResponse.prototype.tablet_alias = null;
/**
* ReserveBeginExecuteResponse session_state_changes.
* @member {string} session_state_changes
* @memberof query.ReserveBeginExecuteResponse
* @instance
*/
ReserveBeginExecuteResponse.prototype.session_state_changes = "";
/**
* Creates a new ReserveBeginExecuteResponse instance using the specified properties.
* @function create
@ -67403,6 +67500,8 @@ $root.query = (function() {
writer.uint32(/* id 4, wireType 0 =*/32).int64(message.reserved_id);
if (message.tablet_alias != null && Object.hasOwnProperty.call(message, "tablet_alias"))
$root.topodata.TabletAlias.encode(message.tablet_alias, writer.uint32(/* id 5, wireType 2 =*/42).fork()).ldelim();
if (message.session_state_changes != null && Object.hasOwnProperty.call(message, "session_state_changes"))
writer.uint32(/* id 6, wireType 2 =*/50).string(message.session_state_changes);
return writer;
};
@ -67452,6 +67551,9 @@ $root.query = (function() {
case 5:
message.tablet_alias = $root.topodata.TabletAlias.decode(reader, reader.uint32());
break;
case 6:
message.session_state_changes = reader.string();
break;
default:
reader.skipType(tag & 7);
break;
@ -67508,6 +67610,9 @@ $root.query = (function() {
if (error)
return "tablet_alias." + error;
}
if (message.session_state_changes != null && message.hasOwnProperty("session_state_changes"))
if (!$util.isString(message.session_state_changes))
return "session_state_changes: string expected";
return null;
};
@ -67556,6 +67661,8 @@ $root.query = (function() {
throw TypeError(".query.ReserveBeginExecuteResponse.tablet_alias: object expected");
message.tablet_alias = $root.topodata.TabletAlias.fromObject(object.tablet_alias);
}
if (object.session_state_changes != null)
message.session_state_changes = String(object.session_state_changes);
return message;
};
@ -67586,6 +67693,7 @@ $root.query = (function() {
} else
object.reserved_id = options.longs === String ? "0" : 0;
object.tablet_alias = null;
object.session_state_changes = "";
}
if (message.error != null && message.hasOwnProperty("error"))
object.error = $root.vtrpc.RPCError.toObject(message.error, options);
@ -67603,6 +67711,8 @@ $root.query = (function() {
object.reserved_id = options.longs === String ? $util.Long.prototype.toString.call(message.reserved_id) : options.longs === Number ? new $util.LongBits(message.reserved_id.low >>> 0, message.reserved_id.high >>> 0).toNumber() : message.reserved_id;
if (message.tablet_alias != null && message.hasOwnProperty("tablet_alias"))
object.tablet_alias = $root.topodata.TabletAlias.toObject(message.tablet_alias, options);
if (message.session_state_changes != null && message.hasOwnProperty("session_state_changes"))
object.session_state_changes = message.session_state_changes;
return object;
};
@ -68010,6 +68120,7 @@ $root.query = (function() {
* @property {number|Long|null} [transaction_id] ReserveBeginStreamExecuteResponse transaction_id
* @property {number|Long|null} [reserved_id] ReserveBeginStreamExecuteResponse reserved_id
* @property {topodata.ITabletAlias|null} [tablet_alias] ReserveBeginStreamExecuteResponse tablet_alias
* @property {string|null} [session_state_changes] ReserveBeginStreamExecuteResponse session_state_changes
*/
/**
@ -68067,6 +68178,14 @@ $root.query = (function() {
*/
ReserveBeginStreamExecuteResponse.prototype.tablet_alias = null;
/**
* ReserveBeginStreamExecuteResponse session_state_changes.
* @member {string} session_state_changes
* @memberof query.ReserveBeginStreamExecuteResponse
* @instance
*/
ReserveBeginStreamExecuteResponse.prototype.session_state_changes = "";
/**
* Creates a new ReserveBeginStreamExecuteResponse instance using the specified properties.
* @function create
@ -68101,6 +68220,8 @@ $root.query = (function() {
writer.uint32(/* id 4, wireType 0 =*/32).int64(message.reserved_id);
if (message.tablet_alias != null && Object.hasOwnProperty.call(message, "tablet_alias"))
$root.topodata.TabletAlias.encode(message.tablet_alias, writer.uint32(/* id 5, wireType 2 =*/42).fork()).ldelim();
if (message.session_state_changes != null && Object.hasOwnProperty.call(message, "session_state_changes"))
writer.uint32(/* id 6, wireType 2 =*/50).string(message.session_state_changes);
return writer;
};
@ -68150,6 +68271,9 @@ $root.query = (function() {
case 5:
message.tablet_alias = $root.topodata.TabletAlias.decode(reader, reader.uint32());
break;
case 6:
message.session_state_changes = reader.string();
break;
default:
reader.skipType(tag & 7);
break;
@ -68206,6 +68330,9 @@ $root.query = (function() {
if (error)
return "tablet_alias." + error;
}
if (message.session_state_changes != null && message.hasOwnProperty("session_state_changes"))
if (!$util.isString(message.session_state_changes))
return "session_state_changes: string expected";
return null;
};
@ -68254,6 +68381,8 @@ $root.query = (function() {
throw TypeError(".query.ReserveBeginStreamExecuteResponse.tablet_alias: object expected");
message.tablet_alias = $root.topodata.TabletAlias.fromObject(object.tablet_alias);
}
if (object.session_state_changes != null)
message.session_state_changes = String(object.session_state_changes);
return message;
};
@ -68284,6 +68413,7 @@ $root.query = (function() {
} else
object.reserved_id = options.longs === String ? "0" : 0;
object.tablet_alias = null;
object.session_state_changes = "";
}
if (message.error != null && message.hasOwnProperty("error"))
object.error = $root.vtrpc.RPCError.toObject(message.error, options);
@ -68301,6 +68431,8 @@ $root.query = (function() {
object.reserved_id = options.longs === String ? $util.Long.prototype.toString.call(message.reserved_id) : options.longs === Number ? new $util.LongBits(message.reserved_id.low >>> 0, message.reserved_id.high >>> 0).toNumber() : message.reserved_id;
if (message.tablet_alias != null && message.hasOwnProperty("tablet_alias"))
object.tablet_alias = $root.topodata.TabletAlias.toObject(message.tablet_alias, options);
if (message.session_state_changes != null && message.hasOwnProperty("session_state_changes"))
object.session_state_changes = message.session_state_changes;
return object;
};