Adding missing APIs to go client.

This commit is contained in:
Alain Jobart 2015-05-29 10:25:33 -07:00
Родитель e83e214573
Коммит b048ef916d
3 изменённых файлов: 335 добавлений и 7 удалений

Просмотреть файл

@ -18,6 +18,7 @@ import (
mproto "github.com/youtube/vitess/go/mysql/proto"
"github.com/youtube/vitess/go/sqltypes"
"github.com/youtube/vitess/go/vt/key"
tproto "github.com/youtube/vitess/go/vt/tabletserver/proto"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/vtgate/proto"
@ -149,6 +150,31 @@ func (conn *FakeVTGateConn) ExecuteShard(ctx context.Context, sql string, keyspa
return &reply, s, nil
}
// ExecuteKeyspaceIds please see vtgateconn.Impl.ExecuteKeyspaceIds
func (conn *FakeVTGateConn) ExecuteKeyspaceIds(ctx context.Context, query string, keyspace string, keyspaceIds []key.KeyspaceId, bindVars map[string]interface{}, tabletType topo.TabletType, session interface{}) (*mproto.QueryResult, interface{}, error) {
panic("not implemented")
}
// ExecuteKeyRanges please see vtgateconn.Impl.ExecuteKeyRanges
func (conn *FakeVTGateConn) ExecuteKeyRanges(ctx context.Context, query string, keyspace string, keyRanges []key.KeyRange, bindVars map[string]interface{}, tabletType topo.TabletType, session interface{}) (*mproto.QueryResult, interface{}, error) {
panic("not implemented")
}
// ExecuteEntityIds please see vtgateconn.Impl.ExecuteEntityIds
func (conn *FakeVTGateConn) ExecuteEntityIds(ctx context.Context, query string, keyspace string, entityColumnName string, entityKeyspaceIDs []proto.EntityId, bindVars map[string]interface{}, tabletType topo.TabletType, session interface{}) (*mproto.QueryResult, interface{}, error) {
panic("not implemented")
}
// ExecuteBatchShard please see vtgateconn.Impl.ExecuteBatchShard
func (conn *FakeVTGateConn) ExecuteBatchShard(ctx context.Context, queries []tproto.BoundQuery, keyspace string, shards []string, tabletType topo.TabletType, session interface{}) ([]mproto.QueryResult, interface{}, error) {
panic("not implemented")
}
// ExecuteBatchKeyspaceIds please see vtgateconn.Impl.ExecuteBatchKeyspaceIds
func (conn *FakeVTGateConn) ExecuteBatchKeyspaceIds(ctx context.Context, queries []tproto.BoundQuery, keyspace string, keyspaceIds []key.KeyspaceId, tabletType topo.TabletType, session interface{}) ([]mproto.QueryResult, interface{}, error) {
panic("not implemented")
}
// StreamExecute please see vtgateconn.Impl.StreamExecute
func (conn *FakeVTGateConn) StreamExecute(ctx context.Context, query string, bindVars map[string]interface{}, tabletType topo.TabletType) (<-chan *mproto.QueryResult, vtgateconn.ErrFunc) {
@ -184,6 +210,21 @@ func (conn *FakeVTGateConn) StreamExecute(ctx context.Context, query string, bin
return resultChan, nil
}
// StreamExecuteShard please see vtgateconn.Impl.StreamExecuteShard
func (conn *FakeVTGateConn) StreamExecuteShard(ctx context.Context, query string, keyspace string, shards []string, bindVars map[string]interface{}, tabletType topo.TabletType) (<-chan *mproto.QueryResult, vtgateconn.ErrFunc) {
panic("not implemented")
}
// StreamExecuteKeyRanges please see vtgateconn.Impl.StreamExecuteKeyRanges
func (conn *FakeVTGateConn) StreamExecuteKeyRanges(ctx context.Context, query string, keyspace string, keyRanges []key.KeyRange, bindVars map[string]interface{}, tabletType topo.TabletType) (<-chan *mproto.QueryResult, vtgateconn.ErrFunc) {
panic("not implemented")
}
// StreamExecuteKeyspaceIds please see vtgateconn.Impl.StreamExecuteKeyspaceIds
func (conn *FakeVTGateConn) StreamExecuteKeyspaceIds(ctx context.Context, query string, keyspace string, keyspaceIds []key.KeyspaceId, bindVars map[string]interface{}, tabletType topo.TabletType) (<-chan *mproto.QueryResult, vtgateconn.ErrFunc) {
panic("not implemented")
}
// Begin please see vtgateconn.Impl.Begin
func (conn *FakeVTGateConn) Begin(ctx context.Context) (interface{}, error) {
return &proto.Session{

Просмотреть файл

@ -13,6 +13,7 @@ import (
mproto "github.com/youtube/vitess/go/mysql/proto"
"github.com/youtube/vitess/go/rpcplus"
"github.com/youtube/vitess/go/rpcwrap/bsonrpc"
"github.com/youtube/vitess/go/vt/key"
"github.com/youtube/vitess/go/vt/rpc"
tproto "github.com/youtube/vitess/go/vt/tabletserver/proto"
"github.com/youtube/vitess/go/vt/topo"
@ -85,6 +86,120 @@ func (conn *vtgateConn) ExecuteShard(ctx context.Context, query string, keyspace
return result.Result, result.Session, nil
}
func (conn *vtgateConn) ExecuteKeyspaceIds(ctx context.Context, query string, keyspace string, keyspaceIds []key.KeyspaceId, bindVars map[string]interface{}, tabletType topo.TabletType, session interface{}) (*mproto.QueryResult, interface{}, error) {
var s *proto.Session
if session != nil {
s = session.(*proto.Session)
}
request := proto.KeyspaceIdQuery{
Sql: query,
BindVariables: bindVars,
Keyspace: keyspace,
KeyspaceIds: keyspaceIds,
TabletType: tabletType,
Session: s,
}
var result proto.QueryResult
if err := conn.rpcConn.Call(ctx, "VTGate.ExecuteKeyspaceIds", request, &result); err != nil {
return nil, session, err
}
if result.Error != "" {
return nil, result.Session, errors.New(result.Error)
}
return result.Result, result.Session, nil
}
func (conn *vtgateConn) ExecuteKeyRanges(ctx context.Context, query string, keyspace string, keyRanges []key.KeyRange, bindVars map[string]interface{}, tabletType topo.TabletType, session interface{}) (*mproto.QueryResult, interface{}, error) {
var s *proto.Session
if session != nil {
s = session.(*proto.Session)
}
request := proto.KeyRangeQuery{
Sql: query,
BindVariables: bindVars,
Keyspace: keyspace,
KeyRanges: keyRanges,
TabletType: tabletType,
Session: s,
}
var result proto.QueryResult
if err := conn.rpcConn.Call(ctx, "VTGate.ExecuteKeyRanges", request, &result); err != nil {
return nil, session, err
}
if result.Error != "" {
return nil, result.Session, errors.New(result.Error)
}
return result.Result, result.Session, nil
}
func (conn *vtgateConn) ExecuteEntityIds(ctx context.Context, query string, keyspace string, entityColumnName string, entityKeyspaceIDs []proto.EntityId, bindVars map[string]interface{}, tabletType topo.TabletType, session interface{}) (*mproto.QueryResult, interface{}, error) {
var s *proto.Session
if session != nil {
s = session.(*proto.Session)
}
request := proto.EntityIdsQuery{
Sql: query,
BindVariables: bindVars,
Keyspace: keyspace,
EntityColumnName: entityColumnName,
EntityKeyspaceIDs: entityKeyspaceIDs,
TabletType: tabletType,
Session: s,
}
var result proto.QueryResult
if err := conn.rpcConn.Call(ctx, "VTGate.ExecuteEntityIds", request, &result); err != nil {
return nil, session, err
}
if result.Error != "" {
return nil, result.Session, errors.New(result.Error)
}
return result.Result, result.Session, nil
}
func (conn *vtgateConn) ExecuteBatchShard(ctx context.Context, queries []tproto.BoundQuery, keyspace string, shards []string, tabletType topo.TabletType, session interface{}) ([]mproto.QueryResult, interface{}, error) {
var s *proto.Session
if session != nil {
s = session.(*proto.Session)
}
request := proto.BatchQueryShard{
Queries: queries,
Keyspace: keyspace,
Shards: shards,
TabletType: tabletType,
Session: s,
}
var result proto.QueryResultList
if err := conn.rpcConn.Call(ctx, "VTGate.ExecuteBatchShard", request, &result); err != nil {
return nil, session, err
}
if result.Error != "" {
return nil, result.Session, errors.New(result.Error)
}
return result.List, result.Session, nil
}
func (conn *vtgateConn) ExecuteBatchKeyspaceIds(ctx context.Context, queries []tproto.BoundQuery, keyspace string, keyspaceIds []key.KeyspaceId, tabletType topo.TabletType, session interface{}) ([]mproto.QueryResult, interface{}, error) {
var s *proto.Session
if session != nil {
s = session.(*proto.Session)
}
request := proto.KeyspaceIdBatchQuery{
Queries: queries,
Keyspace: keyspace,
KeyspaceIds: keyspaceIds,
TabletType: tabletType,
Session: s,
}
var result proto.QueryResultList
if err := conn.rpcConn.Call(ctx, "VTGate.ExecuteBatchKeyspaceIds", request, &result); err != nil {
return nil, session, err
}
if result.Error != "" {
return nil, result.Session, errors.New(result.Error)
}
return result.List, result.Session, nil
}
func (conn *vtgateConn) StreamExecute(ctx context.Context, query string, bindVars map[string]interface{}, tabletType topo.TabletType) (<-chan *mproto.QueryResult, vtgateconn.ErrFunc) {
req := &proto.Query{
Sql: query,
@ -94,6 +209,52 @@ func (conn *vtgateConn) StreamExecute(ctx context.Context, query string, bindVar
}
sr := make(chan *proto.QueryResult, 10)
c := conn.rpcConn.StreamGo("VTGate.StreamExecute", req, sr)
return sendStreamResults(c, sr)
}
func (conn *vtgateConn) StreamExecuteShard(ctx context.Context, query string, keyspace string, shards []string, bindVars map[string]interface{}, tabletType topo.TabletType) (<-chan *mproto.QueryResult, vtgateconn.ErrFunc) {
req := &proto.QueryShard{
Sql: query,
BindVariables: bindVars,
Keyspace: keyspace,
Shards: shards,
TabletType: tabletType,
Session: nil,
}
sr := make(chan *proto.QueryResult, 10)
c := conn.rpcConn.StreamGo("VTGate.StreamExecuteShard", req, sr)
return sendStreamResults(c, sr)
}
func (conn *vtgateConn) StreamExecuteKeyRanges(ctx context.Context, query string, keyspace string, keyRanges []key.KeyRange, bindVars map[string]interface{}, tabletType topo.TabletType) (<-chan *mproto.QueryResult, vtgateconn.ErrFunc) {
req := &proto.KeyRangeQuery{
Sql: query,
BindVariables: bindVars,
Keyspace: keyspace,
KeyRanges: keyRanges,
TabletType: tabletType,
Session: nil,
}
sr := make(chan *proto.QueryResult, 10)
c := conn.rpcConn.StreamGo("VTGate.StreamExecuteKeyRanges", req, sr)
return sendStreamResults(c, sr)
}
func (conn *vtgateConn) StreamExecuteKeyspaceIds(ctx context.Context, query string, keyspace string, keyspaceIds []key.KeyspaceId, bindVars map[string]interface{}, tabletType topo.TabletType) (<-chan *mproto.QueryResult, vtgateconn.ErrFunc) {
req := &proto.KeyspaceIdQuery{
Sql: query,
BindVariables: bindVars,
Keyspace: keyspace,
KeyspaceIds: keyspaceIds,
TabletType: tabletType,
Session: nil,
}
sr := make(chan *proto.QueryResult, 10)
c := conn.rpcConn.StreamGo("VTGate.StreamExecuteKeyspaceIds", req, sr)
return sendStreamResults(c, sr)
}
func sendStreamResults(c *rpcplus.Call, sr chan *proto.QueryResult) (<-chan *mproto.QueryResult, vtgateconn.ErrFunc) {
srout := make(chan *mproto.QueryResult, 1)
go func() {
defer close(srout)

Просмотреть файл

@ -11,6 +11,7 @@ import (
log "github.com/golang/glog"
mproto "github.com/youtube/vitess/go/mysql/proto"
"github.com/youtube/vitess/go/vt/key"
tproto "github.com/youtube/vitess/go/vt/tabletserver/proto"
"github.com/youtube/vitess/go/vt/topo"
"github.com/youtube/vitess/go/vt/vtgate/proto"
@ -48,6 +49,7 @@ type VTGateConn struct {
}
// Execute executes a non-streaming query on vtgate.
// This is using v3 API.
func (conn *VTGateConn) Execute(ctx context.Context, query string, bindVars map[string]interface{}, tabletType topo.TabletType) (*mproto.QueryResult, error) {
res, _, err := conn.impl.Execute(ctx, query, bindVars, tabletType, nil)
return res, err
@ -59,14 +61,67 @@ func (conn *VTGateConn) ExecuteShard(ctx context.Context, query string, keyspace
return res, err
}
// StreamExecute executes a streaming query on vtgate. It returns a channel, ErrFunc and error.
// If error is non-nil, it means that the StreamExecute failed to send the request. Otherwise,
// you can pull values from the channel till it's closed. Following this, you can call ErrFunc
// ExecuteKeyspaceIds executes a non-streaming query for multiple keyspace_ids.
func (conn *VTGateConn) ExecuteKeyspaceIds(ctx context.Context, query string, keyspace string, keyspaceIds []key.KeyspaceId, bindVars map[string]interface{}, tabletType topo.TabletType) (*mproto.QueryResult, error) {
res, _, err := conn.impl.ExecuteKeyspaceIds(ctx, query, keyspace, keyspaceIds, bindVars, tabletType, nil)
return res, err
}
// ExecuteKeyRanges executes a non-streaming query on a key range.
func (conn *VTGateConn) ExecuteKeyRanges(ctx context.Context, query string, keyspace string, keyRanges []key.KeyRange, bindVars map[string]interface{}, tabletType topo.TabletType) (*mproto.QueryResult, error) {
res, _, err := conn.impl.ExecuteKeyRanges(ctx, query, keyspace, keyRanges, bindVars, tabletType, nil)
return res, err
}
// ExecuteEntityIds executes a non-streaming query for multiple entities.
func (conn *VTGateConn) ExecuteEntityIds(ctx context.Context, query string, keyspace string, entityColumnName string, entityKeyspaceIDs []proto.EntityId, bindVars map[string]interface{}, tabletType topo.TabletType) (*mproto.QueryResult, error) {
res, _, err := conn.impl.ExecuteEntityIds(ctx, query, keyspace, entityColumnName, entityKeyspaceIDs, bindVars, tabletType, nil)
return res, err
}
// ExecuteBatchShard executes a set of non-streaming queries for multiple shards.
func (conn *VTGateConn) ExecuteBatchShard(ctx context.Context, queries []tproto.BoundQuery, keyspace string, shards []string, tabletType topo.TabletType) ([]mproto.QueryResult, error) {
res, _, err := conn.impl.ExecuteBatchShard(ctx, queries, keyspace, shards, tabletType, nil)
return res, err
}
// ExecuteBatchKeyspaceIds executes a set of non-streaming queries for multiple keyspace ids.
func (conn *VTGateConn) ExecuteBatchKeyspaceIds(ctx context.Context, queries []tproto.BoundQuery, keyspace string, keyspaceIds []key.KeyspaceId, tabletType topo.TabletType) ([]mproto.QueryResult, error) {
res, _, err := conn.impl.ExecuteBatchKeyspaceIds(ctx, queries, keyspace, keyspaceIds, tabletType, nil)
return res, err
}
// StreamExecute executes a streaming query on vtgate. It returns a channel, and ErrFunc.
// You can pull values from the channel till it's closed. Following this, you can call ErrFunc
// to see if the stream ended normally or due to a failure.
func (conn *VTGateConn) StreamExecute(ctx context.Context, query string, bindVars map[string]interface{}, tabletType topo.TabletType) (<-chan *mproto.QueryResult, ErrFunc) {
return conn.impl.StreamExecute(ctx, query, bindVars, tabletType)
}
// StreamExecuteShard executes a streaming query on vtgate, on a set of shards.
// It returns a channel, and ErrFunc.
// You can pull values from the channel till it's closed. Following this, you can call ErrFunc
// to see if the stream ended normally or due to a failure.
func (conn *VTGateConn) StreamExecuteShard(ctx context.Context, query string, keyspace string, shards []string, bindVars map[string]interface{}, tabletType topo.TabletType) (<-chan *mproto.QueryResult, ErrFunc) {
return conn.impl.StreamExecuteShard(ctx, query, keyspace, shards, bindVars, tabletType)
}
// StreamExecuteKeyRanges executes a streaming query on vtgate, on a set of keyranges.
// It returns a channel, and ErrFunc.
// You can pull values from the channel till it's closed. Following this, you can call ErrFunc
// to see if the stream ended normally or due to a failure.
func (conn *VTGateConn) StreamExecuteKeyRanges(ctx context.Context, query string, keyspace string, keyRanges []key.KeyRange, bindVars map[string]interface{}, tabletType topo.TabletType) (<-chan *mproto.QueryResult, ErrFunc) {
return conn.impl.StreamExecuteKeyRanges(ctx, query, keyspace, keyRanges, bindVars, tabletType)
}
// StreamExecuteKeyspaceIds executes a streaming query on vtgate, for the given keyspaceIds.
// It returns a channel, and ErrFunc.
// You can pull values from the channel till it's closed. Following this, you can call ErrFunc
// to see if the stream ended normally or due to a failure.
func (conn *VTGateConn) StreamExecuteKeyspaceIds(ctx context.Context, query string, keyspace string, keyspaceIds []key.KeyspaceId, bindVars map[string]interface{}, tabletType topo.TabletType) (<-chan *mproto.QueryResult, ErrFunc) {
return conn.impl.StreamExecuteKeyspaceIds(ctx, query, keyspace, keyspaceIds, bindVars, tabletType)
}
// Begin starts a transaction and returns a VTGateTX.
func (conn *VTGateConn) Begin(ctx context.Context) (*VTGateTx, error) {
session, err := conn.impl.Begin(ctx)
@ -119,6 +174,56 @@ func (tx *VTGateTx) ExecuteShard(ctx context.Context, query string, keyspace str
return res, err
}
// ExecuteKeyspaceIds executes a non-streaming query for multiple keyspace_ids.
func (tx *VTGateTx) ExecuteKeyspaceIds(ctx context.Context, query string, keyspace string, keyspaceIds []key.KeyspaceId, bindVars map[string]interface{}, tabletType topo.TabletType) (*mproto.QueryResult, error) {
if tx.session == nil {
return nil, fmt.Errorf("executeKeyspaceIds: not in transaction")
}
res, session, err := tx.impl.ExecuteKeyspaceIds(ctx, query, keyspace, keyspaceIds, bindVars, tabletType, tx.session)
tx.session = session
return res, err
}
// ExecuteKeyRanges executes a non-streaming query on a key range.
func (tx *VTGateTx) ExecuteKeyRanges(ctx context.Context, query string, keyspace string, keyRanges []key.KeyRange, bindVars map[string]interface{}, tabletType topo.TabletType) (*mproto.QueryResult, error) {
if tx.session == nil {
return nil, fmt.Errorf("executeKeyRanges: not in transaction")
}
res, session, err := tx.impl.ExecuteKeyRanges(ctx, query, keyspace, keyRanges, bindVars, tabletType, tx.session)
tx.session = session
return res, err
}
// ExecuteEntityIds executes a non-streaming query for multiple entities.
func (tx *VTGateTx) ExecuteEntityIds(ctx context.Context, query string, keyspace string, entityColumnName string, entityKeyspaceIDs []proto.EntityId, bindVars map[string]interface{}, tabletType topo.TabletType) (*mproto.QueryResult, error) {
if tx.session == nil {
return nil, fmt.Errorf("executeEntityIds: not in transaction")
}
res, session, err := tx.impl.ExecuteEntityIds(ctx, query, keyspace, entityColumnName, entityKeyspaceIDs, bindVars, tabletType, tx.session)
tx.session = session
return res, err
}
// ExecuteBatchShard executes a set of non-streaming queries for multiple shards.
func (tx *VTGateTx) ExecuteBatchShard(ctx context.Context, queries []tproto.BoundQuery, keyspace string, shards []string, tabletType topo.TabletType) ([]mproto.QueryResult, error) {
if tx.session == nil {
return nil, fmt.Errorf("executeBatchShard: not in transaction")
}
res, session, err := tx.impl.ExecuteBatchShard(ctx, queries, keyspace, shards, tabletType, tx.session)
tx.session = session
return res, err
}
// ExecuteBatchKeyspaceIds executes a set of non-streaming queries for multiple keyspace ids.
func (tx *VTGateTx) ExecuteBatchKeyspaceIds(ctx context.Context, queries []tproto.BoundQuery, keyspace string, keyspaceIds []key.KeyspaceId, tabletType topo.TabletType) ([]mproto.QueryResult, error) {
if tx.session == nil {
return nil, fmt.Errorf("executeBatchKeyspaceIds: not in transaction")
}
res, session, err := tx.impl.ExecuteBatchKeyspaceIds(ctx, queries, keyspace, keyspaceIds, tabletType, tx.session)
tx.session = session
return res, err
}
// Commit commits the current transaction.
func (tx *VTGateTx) Commit(ctx context.Context) error {
if tx.session == nil {
@ -155,12 +260,33 @@ type Impl interface {
// ExecuteShard executes a non-streaming query for multiple shards on vtgate.
ExecuteShard(ctx context.Context, query string, keyspace string, shards []string, bindVars map[string]interface{}, tabletType topo.TabletType, session interface{}) (*mproto.QueryResult, interface{}, error)
// StreamExecute executes a streaming query on vtgate. It returns a channel, ErrFunc and error.
// If error is non-nil, it means that the StreamExecute failed to send the request. Otherwise,
// you can pull values from the channel till it's closed. Following this, you can call ErrFunc
// to see if the stream ended normally or due to a failure.
// ExecuteKeyspaceIds executes a non-streaming query for multiple keyspace_ids.
ExecuteKeyspaceIds(ctx context.Context, query string, keyspace string, keyspaceIds []key.KeyspaceId, bindVars map[string]interface{}, tabletType topo.TabletType, session interface{}) (*mproto.QueryResult, interface{}, error)
// ExecuteKeyRanges executes a non-streaming query on a key range.
ExecuteKeyRanges(ctx context.Context, query string, keyspace string, keyRanges []key.KeyRange, bindVars map[string]interface{}, tabletType topo.TabletType, session interface{}) (*mproto.QueryResult, interface{}, error)
// ExecuteEntityIds executes a non-streaming query for multiple entities.
ExecuteEntityIds(ctx context.Context, query string, keyspace string, entityColumnName string, entityKeyspaceIDs []proto.EntityId, bindVars map[string]interface{}, tabletType topo.TabletType, session interface{}) (*mproto.QueryResult, interface{}, error)
// ExecuteBatchShard executes a set of non-streaming queries for multiple shards.
ExecuteBatchShard(ctx context.Context, queries []tproto.BoundQuery, keyspace string, shards []string, tabletType topo.TabletType, session interface{}) ([]mproto.QueryResult, interface{}, error)
// ExecuteBatchKeyspaceIds executes a set of non-streaming queries for multiple keyspace ids.
ExecuteBatchKeyspaceIds(ctx context.Context, queries []tproto.BoundQuery, keyspace string, keyspaceIds []key.KeyspaceId, tabletType topo.TabletType, session interface{}) ([]mproto.QueryResult, interface{}, error)
// StreamExecute executes a streaming query on vtgate.
StreamExecute(ctx context.Context, query string, bindVars map[string]interface{}, tabletType topo.TabletType) (<-chan *mproto.QueryResult, ErrFunc)
// StreamExecuteShard executes a streaming query on vtgate, on a set of shards.
StreamExecuteShard(ctx context.Context, query string, keyspace string, shards []string, bindVars map[string]interface{}, tabletType topo.TabletType) (<-chan *mproto.QueryResult, ErrFunc)
// StreamExecuteKeyRanges executes a streaming query on vtgate, on a set of keyranges.
StreamExecuteKeyRanges(ctx context.Context, query string, keyspace string, keyRanges []key.KeyRange, bindVars map[string]interface{}, tabletType topo.TabletType) (<-chan *mproto.QueryResult, ErrFunc)
// StreamExecuteKeyspaceIds executes a streaming query on vtgate, for the given keyspaceIds.
StreamExecuteKeyspaceIds(ctx context.Context, query string, keyspace string, keyspaceIds []key.KeyspaceId, bindVars map[string]interface{}, tabletType topo.TabletType) (<-chan *mproto.QueryResult, ErrFunc)
// Begin starts a transaction and returns a VTGateTX.
Begin(ctx context.Context) (interface{}, error)