зеркало из https://github.com/github/vitess-gh.git
tabletserver: qFetch should handle conn pool errors, and guarantee broadcast for consolidation.
This commit is contained in:
Родитель
e7b71fa85e
Коммит
351ee8bb3a
|
@ -38,6 +38,15 @@ func (self *ConnectionPool) Get() PoolConnection {
|
|||
return r.(*pooledConnection)
|
||||
}
|
||||
|
||||
// You must call Recycle on the PoolConnection once done.
|
||||
func (self *ConnectionPool) SafeGet() (PoolConnection, error) {
|
||||
r, err := self.RoundRobin.Get()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return r.(*pooledConnection), nil
|
||||
}
|
||||
|
||||
// You must call Recycle on the PoolConnection once done.
|
||||
func (self *ConnectionPool) TryGet() PoolConnection {
|
||||
r, err := self.RoundRobin.TryGet()
|
||||
|
|
|
@ -14,6 +14,10 @@ import (
|
|||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
waitError = NewTabletError(FAIL, "Error waiting for consolidation")
|
||||
)
|
||||
|
||||
type Consolidator struct {
|
||||
mu sync.Mutex
|
||||
queries map[string]*Result
|
||||
|
@ -40,7 +44,9 @@ func (self *Consolidator) Create(sql string) (r *Result, created bool) {
|
|||
if r, ok := self.queries[sql]; ok {
|
||||
return r, false
|
||||
}
|
||||
r = &Result{consolidator: self, sql: sql}
|
||||
// Preset the error. If there was an unexpected panic during the main
|
||||
// query, then all those who waited will return the waitError.
|
||||
r = &Result{consolidator: self, sql: sql, Err: waitError}
|
||||
r.executing.Lock()
|
||||
self.queries[sql] = r
|
||||
return r, true
|
||||
|
|
|
@ -641,12 +641,16 @@ func (qe *QueryEngine) qFetch(logStats *sqlQueryStats, plan *CompiledPlan, parse
|
|||
sql := qe.generateFinalSql(parsed_query, plan.BindVars, listVars, nil)
|
||||
q, ok := qe.consolidator.Create(string(sql))
|
||||
if ok {
|
||||
defer q.Broadcast()
|
||||
waitingForConnectionStart := time.Now()
|
||||
conn := qe.connPool.Get()
|
||||
conn, err := qe.connPool.SafeGet()
|
||||
logStats.WaitingForConnection += time.Now().Sub(waitingForConnectionStart)
|
||||
defer conn.Recycle()
|
||||
q.Result, q.Err = qe.executeSql(logStats, conn, sql, false)
|
||||
q.Broadcast()
|
||||
if err != nil {
|
||||
q.Err = err
|
||||
} else {
|
||||
defer conn.Recycle()
|
||||
q.Result, q.Err = qe.executeSql(logStats, conn, sql, false)
|
||||
}
|
||||
} else {
|
||||
logStats.QuerySources |= QUERY_SOURCE_CONSOLIDATOR
|
||||
q.Wait()
|
||||
|
|
Загрузка…
Ссылка в новой задаче