Change UnsupportedNewCommit and UnsupportedNewRollback to match the new proto3 interface, revert client changes to Rollback and Commit

This commit is contained in:
Ammar Aijazi 2015-06-11 19:01:55 -07:00
Родитель 820d9dbffd
Коммит 3da1d6d4de
5 изменённых файлов: 102 добавлений и 99 удалений

Просмотреть файл

@ -45,10 +45,14 @@ func (sq *SqlQuery) Begin(ctx context.Context, session *proto.Session, txInfo *p
// UnsupportedNewCommit should not be used by anything other than tests.
// It will eventually replace Commit, but it breaks compatibility with older clients.
// Once all clients are upgraded, it can be replaced.
func (sq *SqlQuery) UnsupportedNewCommit(ctx context.Context, session *proto.Session, reply *proto.ErrorOnly) (err error) {
func (sq *SqlQuery) UnsupportedNewCommit(ctx context.Context, commitRequest *proto.CommitRequest, commitResponse *proto.CommitResponse) (err error) {
defer sq.server.HandlePanic(&err)
session := &proto.Session{
SessionId: commitRequest.SessionId,
TransactionId: commitRequest.TransactionId,
}
tErr := sq.server.Commit(callinfo.RPCWrapCallInfo(ctx), session)
tabletserver.AddTabletErrorToErrorOnly(tErr, reply)
tabletserver.AddTabletErrorToCommitResponse(tErr, commitResponse)
if *tabletserver.RPCErrorOnlyInReply {
return nil
}
@ -64,10 +68,14 @@ func (sq *SqlQuery) Commit(ctx context.Context, session *proto.Session, noOutput
// UnsupportedNewRollback should not be used by anything other than tests.
// It will eventually replace Rollback, but it breaks compatibility with older clients.
// Once all clients are upgraded, it can be replaced.
func (sq *SqlQuery) UnsupportedNewRollback(ctx context.Context, session *proto.Session, reply *proto.ErrorOnly) (err error) {
func (sq *SqlQuery) UnsupportedNewRollback(ctx context.Context, rollbackRequest *proto.RollbackRequest, rollbackResponse *proto.RollbackResponse) (err error) {
defer sq.server.HandlePanic(&err)
session := &proto.Session{
SessionId: rollbackRequest.SessionId,
TransactionId: rollbackRequest.TransactionId,
}
tErr := sq.server.Rollback(callinfo.RPCWrapCallInfo(ctx), session)
tabletserver.AddTabletErrorToErrorOnly(tErr, reply)
tabletserver.AddTabletErrorToRollbackResponse(tErr, rollbackResponse)
if *tabletserver.RPCErrorOnlyInReply {
return nil
}

Просмотреть файл

@ -16,6 +16,7 @@ import (
"github.com/youtube/vitess/go/netutil"
"github.com/youtube/vitess/go/rpcplus"
"github.com/youtube/vitess/go/rpcwrap/bsonrpc"
"github.com/youtube/vitess/go/vt/rpc"
tproto "github.com/youtube/vitess/go/vt/tabletserver/proto"
"github.com/youtube/vitess/go/vt/tabletserver/tabletconn"
"github.com/youtube/vitess/go/vt/topo"
@ -240,14 +241,8 @@ func (conn *TabletBson) Commit(ctx context.Context, transactionID int64) error {
SessionId: conn.sessionID,
TransactionId: transactionID,
}
var errReply tproto.ErrorOnly
action := func() error {
err := conn.rpcClient.Call(ctx, "SqlQuery.Commit", req, &errReply)
if err != nil {
return err
}
// SqlQuery.Commit might return an application error inside the ErrorOnly
return vterrors.FromRPCError(errReply.Err)
return conn.rpcClient.Call(ctx, "SqlQuery.Commit", req, &rpc.Unused{})
}
err := conn.withTimeout(ctx, action)
return tabletError(err)
@ -263,18 +258,18 @@ func (conn *TabletBson) UnsupportedNewCommit(ctx context.Context, transactionID
return tabletconn.ConnClosed
}
req := &tproto.Session{
commitRequest := &tproto.CommitRequest{
SessionId: conn.sessionID,
TransactionId: transactionID,
}
var errReply tproto.ErrorOnly
commitResponse := new(tproto.CommitResponse)
action := func() error {
err := conn.rpcClient.Call(ctx, "SqlQuery.UnsupportedNewCommit", req, &errReply)
err := conn.rpcClient.Call(ctx, "SqlQuery.UnsupportedNewCommit", commitRequest, commitResponse)
if err != nil {
return err
}
// SqlQuery.Commit might return an application error inside the ErrorOnly
return vterrors.FromRPCError(errReply.Err)
return vterrors.FromRPCError(commitResponse.Err)
}
err := conn.withTimeout(ctx, action)
return tabletError(err)
@ -292,14 +287,8 @@ func (conn *TabletBson) Rollback(ctx context.Context, transactionID int64) error
SessionId: conn.sessionID,
TransactionId: transactionID,
}
var errReply tproto.ErrorOnly
action := func() error {
err := conn.rpcClient.Call(ctx, "SqlQuery.Rollback", req, &errReply)
if err != nil {
return err
}
// SqlQuery.Rollback might return an application error inside the ErrorOnly
return vterrors.FromRPCError(errReply.Err)
return conn.rpcClient.Call(ctx, "SqlQuery.Rollback", req, &rpc.Unused{})
}
err := conn.withTimeout(ctx, action)
return tabletError(err)
@ -315,18 +304,18 @@ func (conn *TabletBson) UnsupportedNewRollback(ctx context.Context, transactionI
return tabletconn.ConnClosed
}
req := &tproto.Session{
rollbackRequest := &tproto.RollbackRequest{
SessionId: conn.sessionID,
TransactionId: transactionID,
}
var errReply tproto.ErrorOnly
rollbackResponse := new(tproto.RollbackResponse)
action := func() error {
err := conn.rpcClient.Call(ctx, "SqlQuery.UnsupportedNewRollback", req, &errReply)
err := conn.rpcClient.Call(ctx, "SqlQuery.UnsupportedNewRollback", rollbackRequest, rollbackResponse)
if err != nil {
return err
}
// SqlQuery.Rollback might return an application error inside the ErrorOnly
return vterrors.FromRPCError(errReply.Err)
return vterrors.FromRPCError(rollbackResponse.Err)
}
err := conn.withTimeout(ctx, action)
return tabletError(err)

Просмотреть файл

@ -1,57 +0,0 @@
// Copyright 2012, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package proto
// DO NOT EDIT.
// FILE GENERATED BY BSONGEN.
import (
"bytes"
"github.com/youtube/vitess/go/bson"
"github.com/youtube/vitess/go/bytes2"
mproto "github.com/youtube/vitess/go/mysql/proto"
)
// MarshalBson bson-encodes ErrorOnly.
func (errorOnly *ErrorOnly) MarshalBson(buf *bytes2.ChunkedWriter, key string) {
bson.EncodeOptionalPrefix(buf, bson.Object, key)
lenWriter := bson.NewLenWriter(buf)
// *mproto.RPCError
if errorOnly.Err == nil {
bson.EncodePrefix(buf, bson.Null, "Err")
} else {
(*errorOnly.Err).MarshalBson(buf, "Err")
}
lenWriter.Close()
}
// UnmarshalBson bson-decodes into ErrorOnly.
func (errorOnly *ErrorOnly) UnmarshalBson(buf *bytes.Buffer, kind byte) {
switch kind {
case bson.EOO, bson.Object:
// valid
case bson.Null:
return
default:
panic(bson.NewBsonError("unexpected kind %v for ErrorOnly", kind))
}
bson.Next(buf, 4)
for kind := bson.NextByte(buf); kind != bson.EOO; kind = bson.NextByte(buf) {
switch bson.ReadCString(buf) {
case "Err":
// *mproto.RPCError
if kind != bson.Null {
errorOnly.Err = new(mproto.RPCError)
(*errorOnly.Err).UnmarshalBson(buf, kind)
}
default:
bson.Skip(buf, kind)
}
}
}

Просмотреть файл

@ -27,13 +27,6 @@ type SessionInfo struct {
//go:generate bsongen -file $GOFILE -type SessionInfo -o session_info_bson.go
// ErrorOnly is the response from an RPC call that has no return value except an error..
type ErrorOnly struct {
Err *mproto.RPCError
}
//go:generate bsongen -file $GOFILE -type ErrorOnly -o error_only_bson.go
// Query is the payload to Execute.
type Query struct {
Sql string
@ -138,3 +131,64 @@ type SplitQueryResult struct {
Queries []QuerySplit
Err *mproto.RPCError
}
// CallerID is the BSON implementation of the proto3 vtrpc.CallerID
type CallerID struct {
Principal string
Component string
Subcomponent string
}
// VTGateCallerID is the BSON implementation of the proto3 query.VTGateCallerID
type VTGateCallerID struct {
Username string
}
// TabletType is the BSON implementation of the proto3 query.TabletType.
// Assumes that enums are expressed as int64 in BSON.
type TabletType int64
// Target is the BSON implementation of the proto3 query.Target
type Target struct {
Keyspace string
Shard string
TabletType TabletType
}
// CommitRequest is the BSON implementation of the proto3 query.CommitRequest
type CommitRequest struct {
EffectiveCallerID *CallerID
ImmediateCallerID *VTGateCallerID
Target *Target
TransactionId int64
// Although SessionId is not part of the proto3 interface, we're adding it here
// for backwards compatibility reasons. The proto3 interface defines the future,
// where we think there might not be a need for SessionID.
SessionId int64
}
// CommitResponse is the BSON implementation of the proto3 query.CommitResponse
type CommitResponse struct {
// Err is named 'Err' instead of 'Error' (as the proto3 version is) to remain
// consistent with other BSON structs.
Err *mproto.RPCError
}
// RollbackRequest is the BSON implementation of the proto3 query.RollbackRequest
type RollbackRequest struct {
EffectiveCallerID *CallerID
ImmediateCallerID *VTGateCallerID
Target *Target
TransactionId int64
// Although SessionId is not part of the proto3 interface, we're adding it here
// for backwards compatibility reasons. The proto3 interface defines the future,
// where we think there might not be a need for SessionID.
SessionId int64
}
// RollbackResponse is the BSON implementation of the proto3 query.RollbackResponse
type RollbackResponse struct {
// Err is named 'Err' instead of 'Error' (as the proto3 version is) to remain
// consistent with other BSON structs.
Err *mproto.RPCError
}

Просмотреть файл

@ -237,15 +237,6 @@ func rpcErrFromTabletError(err error) *mproto.RPCError {
}
}
// AddTabletErrorToErrorOnly will mutate a ErrorOnly struct to fill in the Err
// field with details from the TabletError.
func AddTabletErrorToErrorOnly(err error, reply *proto.ErrorOnly) {
if err == nil {
return
}
reply.Err = rpcErrFromTabletError(err)
}
// AddTabletErrorToQueryResult will mutate a QueryResult struct to fill in the Err
// field with details from the TabletError.
func AddTabletErrorToQueryResult(err error, reply *mproto.QueryResult) {
@ -290,3 +281,21 @@ func AddTabletErrorToSplitQueryResult(err error, reply *proto.SplitQueryResult)
}
reply.Err = rpcErrFromTabletError(err)
}
// AddTabletErrorToCommitResponse will mutate a CommitResponse struct to fill in the Err
// field with details from the TabletError.
func AddTabletErrorToCommitResponse(err error, reply *proto.CommitResponse) {
if err == nil {
return
}
reply.Err = rpcErrFromTabletError(err)
}
// AddTabletErrorToRollbackResponse will mutate a RollbackResponse struct to fill in the Err
// field with details from the TabletError.
func AddTabletErrorToRollbackResponse(err error, reply *proto.RollbackResponse) {
if err == nil {
return
}
reply.Err = rpcErrFromTabletError(err)
}