зеркало из https://github.com/github/vitess-gh.git
Better formatting of long lines.
This commit is contained in:
Родитель
56ff3cbb49
Коммит
e569392844
|
@ -52,15 +52,27 @@ func NewScatterConn(serv SrvTopoServer, cell string, retryDelay time.Duration, r
|
|||
}
|
||||
|
||||
// Execute executes a non-streaming query on the specified shards.
|
||||
func (stc *ScatterConn) Execute(query string, bindVars map[string]interface{}, keyspace string, tabletType topo.TabletType, shards []string, session *SafeSession) (*proto.QueryResult, error) {
|
||||
results, allErrors := stc.multiGo(keyspace, tabletType, shards, session, func(sdc *ShardConn, transactionId int64, sResults chan<- interface{}) error {
|
||||
innerqr, err := sdc.Execute(query, bindVars, transactionId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sResults <- innerqr
|
||||
return nil
|
||||
})
|
||||
func (stc *ScatterConn) Execute(
|
||||
query string,
|
||||
bindVars map[string]interface{},
|
||||
keyspace string,
|
||||
shards []string,
|
||||
tabletType topo.TabletType,
|
||||
session *SafeSession,
|
||||
) (*proto.QueryResult, error) {
|
||||
results, allErrors := stc.multiGo(
|
||||
keyspace,
|
||||
shards,
|
||||
tabletType,
|
||||
session,
|
||||
func(sdc *ShardConn, transactionId int64, sResults chan<- interface{}) error {
|
||||
innerqr, err := sdc.Execute(query, bindVars, transactionId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sResults <- innerqr
|
||||
return nil
|
||||
})
|
||||
|
||||
qr := new(proto.QueryResult)
|
||||
for innerqr := range results {
|
||||
|
@ -74,15 +86,26 @@ func (stc *ScatterConn) Execute(query string, bindVars map[string]interface{}, k
|
|||
}
|
||||
|
||||
// ExecuteBatch executes a batch of non-streaming queries on the specified shards.
|
||||
func (stc *ScatterConn) ExecuteBatch(queries []tproto.BoundQuery, keyspace string, tabletType topo.TabletType, shards []string, session *SafeSession) (qrs *proto.QueryResultList, err error) {
|
||||
results, allErrors := stc.multiGo(keyspace, tabletType, shards, session, func(sdc *ShardConn, transactionId int64, sResults chan<- interface{}) error {
|
||||
innerqrs, err := sdc.ExecuteBatch(queries, transactionId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sResults <- innerqrs
|
||||
return nil
|
||||
})
|
||||
func (stc *ScatterConn) ExecuteBatch(
|
||||
queries []tproto.BoundQuery,
|
||||
keyspace string,
|
||||
shards []string,
|
||||
tabletType topo.TabletType,
|
||||
session *SafeSession,
|
||||
) (qrs *proto.QueryResultList, err error) {
|
||||
results, allErrors := stc.multiGo(
|
||||
keyspace,
|
||||
shards,
|
||||
tabletType,
|
||||
session,
|
||||
func(sdc *ShardConn, transactionId int64, sResults chan<- interface{}) error {
|
||||
innerqrs, err := sdc.ExecuteBatch(queries, transactionId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
sResults <- innerqrs
|
||||
return nil
|
||||
})
|
||||
|
||||
qrs = &proto.QueryResultList{}
|
||||
qrs.List = make([]mproto.QueryResult, len(queries))
|
||||
|
@ -99,14 +122,27 @@ func (stc *ScatterConn) ExecuteBatch(queries []tproto.BoundQuery, keyspace strin
|
|||
}
|
||||
|
||||
// StreamExecute executes a streaming query on vttablet. The retry rules are the same.
|
||||
func (stc *ScatterConn) StreamExecute(query string, bindVars map[string]interface{}, keyspace string, tabletType topo.TabletType, shards []string, session *SafeSession, sendReply func(reply interface{}) error) error {
|
||||
results, allErrors := stc.multiGo(keyspace, tabletType, shards, session, func(sdc *ShardConn, transactionId int64, sResults chan<- interface{}) error {
|
||||
sr, errFunc := sdc.StreamExecute(query, bindVars, transactionId)
|
||||
for qr := range sr {
|
||||
sResults <- qr
|
||||
}
|
||||
return errFunc()
|
||||
})
|
||||
func (stc *ScatterConn) StreamExecute(
|
||||
query string,
|
||||
bindVars map[string]interface{},
|
||||
keyspace string,
|
||||
shards []string,
|
||||
tabletType topo.TabletType,
|
||||
session *SafeSession,
|
||||
sendReply func(reply interface{}) error,
|
||||
) error {
|
||||
results, allErrors := stc.multiGo(
|
||||
keyspace,
|
||||
shards,
|
||||
tabletType,
|
||||
session,
|
||||
func(sdc *ShardConn, transactionId int64, sResults chan<- interface{}) error {
|
||||
sr, errFunc := sdc.StreamExecute(query, bindVars, transactionId)
|
||||
for qr := range sr {
|
||||
sResults <- qr
|
||||
}
|
||||
return errFunc()
|
||||
})
|
||||
var replyErr error
|
||||
for innerqr := range results {
|
||||
// We still need to finish pumping
|
||||
|
@ -170,7 +206,13 @@ func (stc *ScatterConn) Close() error {
|
|||
// If there are any unrecoverable errors during a transaction, multiGo
|
||||
// rolls back the transaction for all shards.
|
||||
// The action function must match the shardActionFunc signature.
|
||||
func (stc *ScatterConn) multiGo(keyspace string, tabletType topo.TabletType, shards []string, session *SafeSession, action shardActionFunc) (rResults <-chan interface{}, allErrors *concurrency.AllErrorRecorder) {
|
||||
func (stc *ScatterConn) multiGo(
|
||||
keyspace string,
|
||||
shards []string,
|
||||
tabletType topo.TabletType,
|
||||
session *SafeSession,
|
||||
action shardActionFunc,
|
||||
) (rResults <-chan interface{}, allErrors *concurrency.AllErrorRecorder) {
|
||||
allErrors = new(concurrency.AllErrorRecorder)
|
||||
results := make(chan interface{}, len(shards))
|
||||
var wg sync.WaitGroup
|
||||
|
@ -223,7 +265,12 @@ func (stc *ScatterConn) getConnection(keyspace, shard string, tabletType topo.Ta
|
|||
return sdc
|
||||
}
|
||||
|
||||
func (stc *ScatterConn) updateSession(sdc *ShardConn, keyspace, shard string, tabletType topo.TabletType, session *SafeSession) (transactionId int64, err error) {
|
||||
func (stc *ScatterConn) updateSession(
|
||||
sdc *ShardConn,
|
||||
keyspace, shard string,
|
||||
tabletType topo.TabletType,
|
||||
session *SafeSession,
|
||||
) (transactionId int64, err error) {
|
||||
if !session.InTransaction() {
|
||||
return 0, nil
|
||||
}
|
||||
|
|
|
@ -34,7 +34,13 @@ func Init(serv SrvTopoServer, cell string, retryDelay time.Duration, retryCount
|
|||
|
||||
// ExecuteShard executes a non-streaming query on the specified shards.
|
||||
func (vtg *VTGate) ExecuteShard(context *rpcproto.Context, query *proto.QueryShard, reply *proto.QueryResult) error {
|
||||
qr, err := vtg.scatterConn.Execute(query.Sql, query.BindVariables, query.Keyspace, query.TabletType, query.Shards, NewSafeSession(query.Sessn))
|
||||
qr, err := vtg.scatterConn.Execute(
|
||||
query.Sql,
|
||||
query.BindVariables,
|
||||
query.Keyspace,
|
||||
query.Shards,
|
||||
query.TabletType,
|
||||
NewSafeSession(query.Sessn))
|
||||
if err == nil {
|
||||
*reply = *qr
|
||||
} else {
|
||||
|
@ -46,7 +52,12 @@ func (vtg *VTGate) ExecuteShard(context *rpcproto.Context, query *proto.QuerySha
|
|||
|
||||
// ExecuteBatchShard executes a group of queries on the specified shards.
|
||||
func (vtg *VTGate) ExecuteBatchShard(context *rpcproto.Context, batchQuery *proto.BatchQueryShard, reply *proto.QueryResultList) error {
|
||||
qrs, err := vtg.scatterConn.ExecuteBatch(batchQuery.Queries, batchQuery.Keyspace, batchQuery.TabletType, batchQuery.Shards, NewSafeSession(batchQuery.Sessn))
|
||||
qrs, err := vtg.scatterConn.ExecuteBatch(
|
||||
batchQuery.Queries,
|
||||
batchQuery.Keyspace,
|
||||
batchQuery.Shards,
|
||||
batchQuery.TabletType,
|
||||
NewSafeSession(batchQuery.Sessn))
|
||||
if err == nil {
|
||||
*reply = *qrs
|
||||
} else {
|
||||
|
@ -58,7 +69,13 @@ func (vtg *VTGate) ExecuteBatchShard(context *rpcproto.Context, batchQuery *prot
|
|||
|
||||
// StreamExecuteShard executes a streaming query on the specified shards.
|
||||
func (vtg *VTGate) StreamExecuteShard(context *rpcproto.Context, query *proto.QueryShard, sendReply func(interface{}) error) error {
|
||||
err := vtg.scatterConn.StreamExecute(query.Sql, query.BindVariables, query.Keyspace, query.TabletType, query.Shards, NewSafeSession(query.Sessn), sendReply)
|
||||
err := vtg.scatterConn.StreamExecute(
|
||||
query.Sql,
|
||||
query.BindVariables,
|
||||
query.Keyspace,
|
||||
query.Shards,
|
||||
query.TabletType,
|
||||
NewSafeSession(query.Sessn), sendReply)
|
||||
if err != nil {
|
||||
log.Errorf("StreamExecuteShard: %v, query: %#v", err, query)
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче