Add flag and enable rpc timeout for gorpctabletconn

This commit is contained in:
Liang Guo 2015-02-12 10:26:28 -08:00
Родитель 66edf57d68
Коммит d97b94c37e
4 изменённых файлов: 79 добавлений и 16 удалений

Просмотреть файл

@ -73,6 +73,23 @@ func DialTablet(ctx context.Context, endPoint topo.EndPoint, keyspace, shard str
return conn, nil
}
func (conn *TabletBson) withTimeout(ctx context.Context, action func() error) error {
var err error
var errAction error
done := make(chan int)
go func() {
errAction = action()
close(done)
}()
select {
case <-ctx.Done():
err = ctx.Err()
case <-done:
err = errAction
}
return err
}
// Execute sends the query to VTTablet.
func (conn *TabletBson) Execute(ctx context.Context, query string, bindVars map[string]interface{}, transactionID int64) (*mproto.QueryResult, error) {
conn.mu.RLock()
@ -88,7 +105,10 @@ func (conn *TabletBson) Execute(ctx context.Context, query string, bindVars map[
SessionId: conn.sessionID,
}
qr := new(mproto.QueryResult)
if err := conn.rpcClient.Call(ctx, "SqlQuery.Execute", req, qr); err != nil {
action := func() error {
return conn.rpcClient.Call(ctx, "SqlQuery.Execute", req, qr)
}
if err := conn.withTimeout(ctx, action); err != nil {
return nil, tabletError(err)
}
return qr, nil
@ -108,7 +128,10 @@ func (conn *TabletBson) ExecuteBatch(ctx context.Context, queries []tproto.Bound
SessionId: conn.sessionID,
}
qrs := new(tproto.QueryResultList)
if err := conn.rpcClient.Call(ctx, "SqlQuery.ExecuteBatch", req, qrs); err != nil {
action := func() error {
return conn.rpcClient.Call(ctx, "SqlQuery.ExecuteBatch", req, qrs)
}
if err := conn.withTimeout(ctx, action); err != nil {
return nil, tabletError(err)
}
return qrs, nil
@ -157,7 +180,10 @@ func (conn *TabletBson) Begin(ctx context.Context) (transactionID int64, err err
SessionId: conn.sessionID,
}
var txInfo tproto.TransactionInfo
err = conn.rpcClient.Call(ctx, "SqlQuery.Begin", req, &txInfo)
action := func() error {
return conn.rpcClient.Call(ctx, "SqlQuery.Begin", req, &txInfo)
}
err = conn.withTimeout(ctx, action)
return txInfo.TransactionId, tabletError(err)
}
@ -173,7 +199,11 @@ func (conn *TabletBson) Commit(ctx context.Context, transactionID int64) error {
SessionId: conn.sessionID,
TransactionId: transactionID,
}
return tabletError(conn.rpcClient.Call(ctx, "SqlQuery.Commit", req, &rpc.Unused{}))
action := func() error {
return conn.rpcClient.Call(ctx, "SqlQuery.Commit", req, &rpc.Unused{})
}
err := conn.withTimeout(ctx, action)
return tabletError(err)
}
// Rollback rolls back the ongoing transaction.
@ -188,7 +218,11 @@ func (conn *TabletBson) Rollback(ctx context.Context, transactionID int64) error
SessionId: conn.sessionID,
TransactionId: transactionID,
}
return tabletError(conn.rpcClient.Call(ctx, "SqlQuery.Rollback", req, &rpc.Unused{}))
action := func() error {
return conn.rpcClient.Call(ctx, "SqlQuery.Rollback", req, &rpc.Unused{})
}
err := conn.withTimeout(ctx, action)
return tabletError(err)
}
// SplitQuery is the stub for SqlQuery.SplitQuery RPC
@ -204,7 +238,10 @@ func (conn *TabletBson) SplitQuery(ctx context.Context, query tproto.BoundQuery,
SplitCount: splitCount,
}
reply := new(tproto.SplitQueryResult)
if err := conn.rpcClient.Call(ctx, "SqlQuery.SplitQuery", req, reply); err != nil {
action := func() error {
return conn.rpcClient.Call(ctx, "SqlQuery.SplitQuery", req, reply)
}
if err := conn.withTimeout(ctx, action); err != nil {
return nil, tabletError(err)
}
return reply.Queries, nil

Просмотреть файл

@ -6,6 +6,9 @@
package gorpcvtgateservice
import (
"flag"
"time"
"github.com/youtube/vitess/go/vt/rpc"
"github.com/youtube/vitess/go/vt/servenv"
"github.com/youtube/vitess/go/vt/vtgate"
@ -13,35 +16,53 @@ import (
"golang.org/x/net/context"
)
var (
rpcTimeout = flag.Duration("bsonrpc_timeout", 20*time.Second, "rpc timeout")
)
type VTGate struct {
server *vtgate.VTGate
}
func (vtg *VTGate) Execute(ctx context.Context, query *proto.Query, reply *proto.QueryResult) error {
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*rpcTimeout))
defer cancel()
return vtg.server.Execute(ctx, query, reply)
}
func (vtg *VTGate) ExecuteShard(ctx context.Context, query *proto.QueryShard, reply *proto.QueryResult) error {
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*rpcTimeout))
defer cancel()
return vtg.server.ExecuteShard(ctx, query, reply)
}
func (vtg *VTGate) ExecuteKeyspaceIds(ctx context.Context, query *proto.KeyspaceIdQuery, reply *proto.QueryResult) error {
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*rpcTimeout))
defer cancel()
return vtg.server.ExecuteKeyspaceIds(ctx, query, reply)
}
func (vtg *VTGate) ExecuteKeyRanges(ctx context.Context, query *proto.KeyRangeQuery, reply *proto.QueryResult) error {
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*rpcTimeout))
defer cancel()
return vtg.server.ExecuteKeyRanges(ctx, query, reply)
}
func (vtg *VTGate) ExecuteEntityIds(ctx context.Context, query *proto.EntityIdsQuery, reply *proto.QueryResult) error {
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*rpcTimeout))
defer cancel()
return vtg.server.ExecuteEntityIds(ctx, query, reply)
}
func (vtg *VTGate) ExecuteBatchShard(ctx context.Context, batchQuery *proto.BatchQueryShard, reply *proto.QueryResultList) error {
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*rpcTimeout))
defer cancel()
return vtg.server.ExecuteBatchShard(ctx, batchQuery, reply)
}
func (vtg *VTGate) ExecuteBatchKeyspaceIds(ctx context.Context, batchQuery *proto.KeyspaceIdBatchQuery, reply *proto.QueryResultList) error {
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*rpcTimeout))
defer cancel()
return vtg.server.ExecuteBatchKeyspaceIds(ctx, batchQuery, reply)
}
@ -70,18 +91,26 @@ func (vtg *VTGate) StreamExecuteKeyspaceIds(ctx context.Context, query *proto.Ke
}
func (vtg *VTGate) Begin(ctx context.Context, noInput *rpc.Unused, outSession *proto.Session) error {
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*rpcTimeout))
defer cancel()
return vtg.server.Begin(ctx, outSession)
}
func (vtg *VTGate) Commit(ctx context.Context, inSession *proto.Session, noOutput *rpc.Unused) error {
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*rpcTimeout))
defer cancel()
return vtg.server.Commit(ctx, inSession)
}
func (vtg *VTGate) Rollback(ctx context.Context, inSession *proto.Session, noOutput *rpc.Unused) error {
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*rpcTimeout))
defer cancel()
return vtg.server.Rollback(ctx, inSession)
}
func (vtg *VTGate) SplitQuery(ctx context.Context, req *proto.SplitQueryRequest, reply *proto.SplitQueryResult) error {
ctx, cancel := context.WithDeadline(ctx, time.Now().Add(*rpcTimeout))
defer cancel()
return vtg.server.SplitQuery(ctx, req, reply)
}

Просмотреть файл

@ -360,6 +360,7 @@ def vtgate_start(vtport=None, cell='test_nj', retry_delay=1, retry_count=1,
'-log_dir', environment.vtlogroot,
'-srv_topo_cache_ttl', cache_ttl,
'-conn-timeout', timeout,
'-bsonrpc_timeout', '5s',
] + protocols_flavor().tabletconn_protocol_flags()
if topo_impl:
args.extend(['-topo_implementation', topo_impl])

Просмотреть файл

@ -153,9 +153,8 @@ def setup_tablets():
vtgate_server, vtgate_port = utils.vtgate_start()
def get_connection(user=None, password=None):
def get_connection(user=None, password=None, timeout=10.0):
global vtgate_port
timeout = 10.0
conn = None
vtgate_addrs = {"vt": ["localhost:%s" % (vtgate_port),]}
conn = conn_class.connect(vtgate_addrs, timeout,
@ -774,25 +773,24 @@ class TestFailures(unittest.TestCase):
vtgate_conn.commit()
# test timeout between py client and vtgate
# the default timeout is 10 seconds
def test_vtgate_timeout(self):
try:
vtgate_conn = get_connection()
vtgate_conn = get_connection(timeout=3.0)
except Exception, e:
self.fail("Connection to vtgate failed with error %s" % str(e))
with self.assertRaises(dbexceptions.TimeoutError):
vtgate_conn._execute(
"select sleep(12) from dual", {},
"select sleep(4) from dual", {},
KEYSPACE_NAME, 'replica',
keyranges=[self.keyrange])
try:
vtgate_conn = get_connection()
vtgate_conn = get_connection(timeout=3.0)
except Exception, e:
self.fail("Connection to vtgate failed with error %s" % str(e))
with self.assertRaises(dbexceptions.TimeoutError):
vtgate_conn._execute(
"select sleep(12) from dual", {},
"select sleep(4) from dual", {},
KEYSPACE_NAME, 'master',
keyranges=[self.keyrange])
# Currently this is causing vttablet to become unreachable at
@ -803,10 +801,8 @@ class TestFailures(unittest.TestCase):
time.sleep(3)
# test timeout between vtgate and vttablet
# the default timeout is 5 seconds
# the timeout is set to 5 seconds
def test_tablet_timeout(self):
# disable the test till bsonrpc supports deadline
return
try:
vtgate_conn = get_connection()
except Exception, e: