зеркало из https://github.com/github/vitess-gh.git
schema: handle case where cardinality==0
tabletserver: copy read-only subquery results before normalizing tabletserver: add function to validate the invalidation keys tablet_server: bug fix. was not disabling cache if pk types were invalid
This commit is contained in:
Родитель
cc4d3d2878
Коммит
15c1443004
|
@ -105,6 +105,9 @@ func NewIndex(name string) *Index {
|
|||
|
||||
func (self *Index) AddColumn(name string, cardinality uint64) {
|
||||
self.Columns = append(self.Columns, name)
|
||||
if cardinality == 0 {
|
||||
cardinality = uint64(len(self.Cardinality) + 1)
|
||||
}
|
||||
self.Cardinality = append(self.Cardinality, cardinality)
|
||||
}
|
||||
|
||||
|
|
|
@ -33,11 +33,13 @@ package tabletserver
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"code.google.com/p/vitess/go/relog"
|
||||
"code.google.com/p/vitess/go/vt/schema"
|
||||
"code.google.com/p/vitess/go/vt/sqlparser"
|
||||
"encoding/base64"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func buildValueList(pkValues []interface{}, bindVars map[string]interface{}) [][]interface{} {
|
||||
|
@ -101,6 +103,15 @@ func resolveValue(value interface{}, bindVars map[string]interface{}) interface{
|
|||
return value
|
||||
}
|
||||
|
||||
func copyRows(rows [][]interface{}) (result [][]interface{}) {
|
||||
result = make([][]interface{}, len(rows))
|
||||
for i, row := range rows {
|
||||
result[i] = make([]interface{}, len(row))
|
||||
copy(result[i], row)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func normalizePKRows(tableInfo *TableInfo, pkRows [][]interface{}) {
|
||||
normalizeRows(tableInfo, tableInfo.PKColumns, pkRows)
|
||||
}
|
||||
|
@ -186,7 +197,6 @@ func encodePKValue(buf *bytes.Buffer, pkValue interface{}, category int) {
|
|||
default:
|
||||
buf.WriteByte('\'')
|
||||
encoder := base64.NewEncoder(base64.StdEncoding, buf)
|
||||
defer encoder.Close()
|
||||
switch val := pkValue.(type) {
|
||||
case string:
|
||||
encoder.Write([]byte(val))
|
||||
|
@ -195,10 +205,44 @@ func encodePKValue(buf *bytes.Buffer, pkValue interface{}, category int) {
|
|||
default:
|
||||
panic(NewTabletError(FAIL, "Type %T disallowed for non-number pk columns", val))
|
||||
}
|
||||
encoder.Close()
|
||||
buf.WriteByte('\'')
|
||||
}
|
||||
}
|
||||
|
||||
func validateKey(tableInfo *TableInfo, key string) (newKey string) {
|
||||
if key == "" {
|
||||
// TODO: Verify auto-increment table
|
||||
return
|
||||
}
|
||||
pieces := strings.Split(key, ".")
|
||||
if len(pieces) != len(tableInfo.PKColumns) {
|
||||
// TODO: Verify auto-increment table
|
||||
return ""
|
||||
}
|
||||
pkValues := make([]interface{}, len(tableInfo.PKColumns))
|
||||
for i, piece := range pieces {
|
||||
if piece[0] == '\'' {
|
||||
/*var err error
|
||||
pkValues[i], err = base64.StdEncoding.DecodeString(piece[1 : len(piece)-1])
|
||||
if err != nil {
|
||||
relog.Warning("Error decoding key %s for table %s: %v", key, tableInfo.Name, err)
|
||||
return
|
||||
}*/
|
||||
pkValues[i] = piece[1 : len(piece)-1]
|
||||
} else if piece == "null" {
|
||||
// TODO: Verify auto-increment table
|
||||
return ""
|
||||
} else {
|
||||
pkValues[i] = piece
|
||||
}
|
||||
}
|
||||
if newKey = buildKey(tableInfo, pkValues); newKey != key {
|
||||
relog.Warning("Error: Key mismatch, received: %s, computed: %s", key, newKey)
|
||||
}
|
||||
return buildKey(tableInfo, pkValues)
|
||||
}
|
||||
|
||||
// duplicated in vt/sqlparser/execution.go
|
||||
func tonumber(val string) (number interface{}) {
|
||||
var err error
|
||||
|
|
|
@ -76,7 +76,7 @@ type SqlQuery struct {
|
|||
|
||||
// stats are globals to allow anybody to set them
|
||||
var queryStats, waitStats *stats.Timings
|
||||
var killStats, errorStats *stats.Counters
|
||||
var killStats, errorStats, invalidationStats *stats.Counters
|
||||
var resultStats *stats.Histogram
|
||||
|
||||
var resultBuckets = []int64{0, 1, 5, 10, 50, 100, 500, 1000, 5000, 10000}
|
||||
|
@ -104,6 +104,7 @@ func NewSqlQuery(cachePoolCap, poolSize, transactionCap int, transactionTimeout
|
|||
waitStats = stats.NewTimings("Waits")
|
||||
killStats = stats.NewCounters("Kills")
|
||||
errorStats = stats.NewCounters("Errors")
|
||||
invalidationStats = stats.NewCounters("Invalidations")
|
||||
resultStats = stats.NewHistogram("Results", resultBuckets)
|
||||
return self
|
||||
}
|
||||
|
@ -393,6 +394,7 @@ func (self *SqlQuery) Invalidate(cacheInvalidate *CacheInvalidate, noOutput *str
|
|||
*noOutput = ""
|
||||
self.mu.RLock()
|
||||
defer self.mu.RUnlock()
|
||||
|
||||
if self.cachePool.IsClosed() {
|
||||
return nil
|
||||
}
|
||||
|
@ -400,12 +402,19 @@ func (self *SqlQuery) Invalidate(cacheInvalidate *CacheInvalidate, noOutput *str
|
|||
if tableInfo == nil {
|
||||
return NewTabletError(FAIL, "Table %s not found", cacheInvalidate.Table)
|
||||
}
|
||||
if tableInfo == nil || tableInfo.Cache == nil {
|
||||
if tableInfo.CacheType == 0 {
|
||||
return nil
|
||||
}
|
||||
invalidationStats.Add("DML", 1)
|
||||
for _, val := range cacheInvalidate.Keys {
|
||||
// TODO: Validate val
|
||||
tableInfo.Cache.Delete(val.(string))
|
||||
newKey := validateKey(tableInfo, val.(string))
|
||||
if newKey != "" {
|
||||
tableInfo.Cache.Delete(newKey)
|
||||
}
|
||||
/*
|
||||
if k := val.(string); k != "" {
|
||||
tableInfo.Cache.Delete(k)
|
||||
}*/
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -425,6 +434,7 @@ func (self *SqlQuery) InvalidateForDDL(ddl *DDLInvalidate, noOutput *string) (er
|
|||
if ddlPlan.Action == 0 {
|
||||
panic(NewTabletError(FAIL, "DDL is not understood"))
|
||||
}
|
||||
invalidationStats.Add("DDL", 1)
|
||||
self.schemaInfo.DropTable(ddlPlan.TableName)
|
||||
if ddlPlan.Action != sqlparser.DROP { // CREATE, ALTER, RENAME
|
||||
self.schemaInfo.CreateTable(ddlPlan.NewName)
|
||||
|
@ -486,7 +496,7 @@ func (self *SqlQuery) execPK(plan *CompiledPlan) (result *QueryResult) {
|
|||
|
||||
func (self *SqlQuery) execSubquery(plan *CompiledPlan) (result *QueryResult) {
|
||||
innerResult := self.qFetch(plan, plan.Subquery, nil)
|
||||
return self.fetchPKRows(plan, innerResult.Rows)
|
||||
return self.fetchPKRows(plan, copyRows(innerResult.Rows))
|
||||
}
|
||||
|
||||
func (self *SqlQuery) fetchPKRows(plan *CompiledPlan, pkRows [][]interface{}) (result *QueryResult) {
|
||||
|
@ -500,7 +510,9 @@ func (self *SqlQuery) fetchPKRows(plan *CompiledPlan, pkRows [][]interface{}) (r
|
|||
for _, pk := range pkRows {
|
||||
key := buildKey(tableInfo, pk)
|
||||
if cacheRow := tableInfo.Cache.Get(key); cacheRow != nil {
|
||||
//self.validateRow(plan, cacheRow, pk)
|
||||
/*if dbrow := self.validateRow(plan, cacheRow, pk); dbrow != nil {
|
||||
rows = append(rows, applyFilter(plan.ColumnNumbers, dbrow))
|
||||
}*/
|
||||
rows = append(rows, applyFilter(plan.ColumnNumbers, cacheRow))
|
||||
hits++
|
||||
} else {
|
||||
|
@ -528,8 +540,8 @@ func (self *SqlQuery) fetchPKRows(plan *CompiledPlan, pkRows [][]interface{}) (r
|
|||
func (self *SqlQuery) validateRow(plan *CompiledPlan, cacheRow []interface{}, pk []interface{}) (dbrow []interface{}) {
|
||||
resultFromdb := self.qFetch(plan, plan.OuterQuery, pk)
|
||||
if len(resultFromdb.Rows) != 1 {
|
||||
relog.Warning("dbrow not found for: %v", pk)
|
||||
return
|
||||
relog.Warning("unexpected number of rows for %v: %d", pk, len(resultFromdb.Rows))
|
||||
return nil
|
||||
}
|
||||
dbrow = resultFromdb.Rows[0]
|
||||
for i := 0; i < len(cacheRow); i++ {
|
||||
|
|
|
@ -148,6 +148,7 @@ func (self *TableInfo) initRowCache(conn *DBConnection, tableType string, create
|
|||
for col := range self.PKColumns {
|
||||
if self.ColumnCategory[col] == schema.CAT_OTHER {
|
||||
relog.Info("Table %s pk has unsupported column types. Will not be cached.", self.Name)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Загрузка…
Ссылка в новой задаче