schema: use vitess type system

This commit is contained in:
Sugu Sougoumarane 2015-11-05 21:03:18 -08:00
Родитель 90c5c2d5ad
Коммит be45074302
18 изменённых файлов: 166 добавлений и 93 удалений

Просмотреть файл

@ -8,32 +8,24 @@ package schema
// It contains a data structure that's shared between sqlparser & tabletserver
import (
"strings"
"github.com/youtube/vitess/go/sqltypes"
"github.com/youtube/vitess/go/sync2"
)
// Column categories
const (
CAT_OTHER = iota
CAT_NUMBER
CAT_VARBINARY
"github.com/youtube/vitess/go/vt/proto/query"
)
// Cache types
const (
CACHE_NONE = 0
CACHE_RW = 1
CACHE_W = 2
CacheNone = 0
CacheRW = 1
CacheW = 2
)
// TableColumn contains info about a table's column.
type TableColumn struct {
Name string
Category int
IsAuto bool
Default sqltypes.Value
Name string
Type query.Type
IsAuto bool
Default sqltypes.Value
}
// Table contains info about a table.
@ -61,16 +53,10 @@ func NewTable(name string) *Table {
}
// AddColumn adds a column to the Table.
func (ta *Table) AddColumn(name string, columnType string, defval sqltypes.Value, extra string) {
func (ta *Table) AddColumn(name string, columnType query.Type, defval sqltypes.Value, extra string) {
index := len(ta.Columns)
ta.Columns = append(ta.Columns, TableColumn{Name: name})
if strings.Contains(columnType, "int") {
ta.Columns[index].Category = CAT_NUMBER
} else if strings.HasPrefix(columnType, "varbinary") {
ta.Columns[index].Category = CAT_VARBINARY
} else {
ta.Columns[index].Category = CAT_OTHER
}
ta.Columns[index].Type = columnType
if extra == "auto_increment" {
ta.Columns[index].IsAuto = true
// Ignore default value, if any
@ -79,7 +65,7 @@ func (ta *Table) AddColumn(name string, columnType string, defval sqltypes.Value
if defval.IsNull() {
return
}
if ta.Columns[index].Category == CAT_NUMBER {
if sqltypes.IsIntegral(ta.Columns[index].Type) {
ta.Columns[index].Default = sqltypes.MakeNumeric(defval.Raw())
} else {
ta.Columns[index].Default = sqltypes.MakeString(defval.Raw())

Просмотреть файл

@ -176,12 +176,11 @@ func validateValue(col *schema.TableColumn, value sqltypes.Value) error {
if value.IsNull() {
return nil
}
switch col.Category {
case schema.CAT_NUMBER:
if sqltypes.IsIntegral(col.Type) {
if !value.IsNumeric() {
return NewTabletError(ErrFail, vtrpc.ErrorCode_BAD_INPUT, "type mismatch, expecting numeric type for %v for column: %v", value, col)
}
case schema.CAT_VARBINARY:
} else if col.Type == sqltypes.VarBinary {
if !value.IsString() {
return NewTabletError(ErrFail, vtrpc.ErrorCode_BAD_INPUT, "type mismatch, expecting string type for %v for column: %v", value, col)
}

Просмотреть файл

@ -11,13 +11,14 @@ import (
"testing"
"github.com/youtube/vitess/go/sqltypes"
"github.com/youtube/vitess/go/vt/proto/query"
"github.com/youtube/vitess/go/vt/schema"
)
func TestCodexBuildValuesList(t *testing.T) {
tableInfo := createTableInfo("Table",
[]string{"pk1", "pk2", "col1"},
[]string{"int", "varbinary(128)", "int"},
[]query.Type{sqltypes.Int64, sqltypes.VarBinary, sqltypes.Int32},
[]string{"pk1", "pk2"})
// simple PK clause. e.g. where pk1 = 1
@ -201,7 +202,7 @@ func TestCodexResolvePKValues(t *testing.T) {
testUtils := newTestUtils()
tableInfo := createTableInfo("Table",
[]string{"pk1", "pk2", "col1"},
[]string{"int", "varbinary(128)", "int"},
[]query.Type{sqltypes.Int64, sqltypes.VarBinary, sqltypes.Int32},
[]string{"pk1", "pk2"})
key := "var"
bindVariables := make(map[string]interface{})
@ -237,7 +238,7 @@ func TestCodexResolveListArg(t *testing.T) {
testUtils := newTestUtils()
tableInfo := createTableInfo("Table",
[]string{"pk1", "pk2", "col1"},
[]string{"int", "varbinary(128)", "int"},
[]query.Type{sqltypes.Int64, sqltypes.VarBinary, sqltypes.Int32},
[]string{"pk1", "pk2"})
key := "var"
@ -264,7 +265,7 @@ func TestCodexBuildSecondaryList(t *testing.T) {
pk2 := "pk2"
tableInfo := createTableInfo("Table",
[]string{"pk1", "pk2", "col1"},
[]string{"int", "varbinary(128)", "int"},
[]query.Type{sqltypes.Int64, sqltypes.VarBinary, sqltypes.Int32},
[]string{pk1, pk2})
// set pk2 = 'xyz' where pk1=1 and pk2 = 'abc'
@ -295,7 +296,7 @@ func TestCodexBuildStreamComment(t *testing.T) {
pk2 := "pk2"
tableInfo := createTableInfo("Table",
[]string{"pk1", "pk2", "col1"},
[]string{"int", "varbinary(128)", "int"},
[]query.Type{sqltypes.Int64, sqltypes.VarBinary, sqltypes.Int32},
[]string{pk1, pk2})
// set pk2 = 'xyz' where pk1=1 and pk2 = 'abc'
@ -318,7 +319,7 @@ func TestCodexResolveValueWithIncompatibleValueType(t *testing.T) {
testUtils := newTestUtils()
tableInfo := createTableInfo("Table",
[]string{"pk1", "pk2", "col1"},
[]string{"int", "varbinary(128)", "int"},
[]query.Type{sqltypes.Int64, sqltypes.VarBinary, sqltypes.Int32},
[]string{"pk1", "pk2"})
_, err := resolveValue(tableInfo.GetPKColumn(0), 0, nil)
testUtils.checkTabletError(t, err, ErrFail, "incompatible value type ")
@ -328,7 +329,7 @@ func TestCodexValidateRow(t *testing.T) {
testUtils := newTestUtils()
tableInfo := createTableInfo("Table",
[]string{"pk1", "pk2", "col1"},
[]string{"int", "varbinary(128)", "int"},
[]query.Type{sqltypes.Int64, sqltypes.VarBinary, sqltypes.Int32},
[]string{"pk1", "pk2"})
// #columns and #rows do not match
err := validateRow(&tableInfo, []int{1}, []sqltypes.Value{})
@ -414,7 +415,7 @@ func TestCodexApplyFilterWithPKDefaults(t *testing.T) {
testUtils := newTestUtils()
tableInfo := createTableInfo("Table",
[]string{"pk1", "pk2", "col1"},
[]string{"int", "varbinary(128)", "int"},
[]query.Type{sqltypes.Int64, sqltypes.VarBinary, sqltypes.Int32},
[]string{"pk1", "pk2"})
output := applyFilterWithPKDefaults(&tableInfo, []int{-1}, []sqltypes.Value{})
if len(output) != 1 {
@ -432,7 +433,7 @@ func TestCodexValidateKey(t *testing.T) {
queryServiceStats := NewQueryServiceStats("", false)
tableInfo := createTableInfo("Table",
[]string{"pk1", "pk2", "col1"},
[]string{"int", "varbinary(128)", "int"},
[]query.Type{sqltypes.Int64, sqltypes.VarBinary, sqltypes.Int32},
[]string{"pk1", "pk2"})
// validate empty key
newKey := validateKey(&tableInfo, "", queryServiceStats)
@ -473,14 +474,14 @@ func TestCodexUnicoded(t *testing.T) {
}
func createTableInfo(
name string, colNames []string, colTypes []string, pKeys []string) TableInfo {
name string, colNames []string, colTypes []query.Type, pKeys []string) TableInfo {
table := schema.NewTable(name)
for i, colName := range colNames {
colType := colTypes[i]
defaultVal := sqltypes.Value{}
if strings.Contains(colType, "int") {
if sqltypes.IsIntegral(colType) {
defaultVal = sqltypes.MakeNumeric([]byte("0"))
} else if strings.HasPrefix(colType, "varbinary") {
} else if colType == sqltypes.VarBinary {
defaultVal = sqltypes.MakeString([]byte(""))
}
table.AddColumn(colName, colType, defaultVal, "")

Просмотреть файл

@ -42,8 +42,8 @@ func TestUncacheableTables(t *testing.T) {
t.Errorf("%s: table vitess_nocache not found in schema", tcase.create)
continue
}
if table.CacheType != schema.CACHE_NONE {
t.Errorf("CacheType: %d, want %d", table.CacheType, schema.CACHE_NONE)
if table.CacheType != schema.CacheNone {
t.Errorf("CacheType: %d, want %d", table.CacheType, schema.CacheNone)
}
}
}
@ -54,16 +54,16 @@ func TestOverrideTables(t *testing.T) {
cacheType int
}{{
table: "vitess_cached2",
cacheType: schema.CACHE_RW,
cacheType: schema.CacheRW,
}, {
table: "vitess_view",
cacheType: schema.CACHE_RW,
cacheType: schema.CacheRW,
}, {
table: "vitess_part1",
cacheType: schema.CACHE_W,
cacheType: schema.CacheW,
}, {
table: "vitess_part2",
cacheType: schema.CACHE_W,
cacheType: schema.CacheW,
}}
for _, tcase := range testCases {
table, ok := framework.DebugSchema()[tcase.table]

Просмотреть файл

@ -187,7 +187,7 @@ func analyzeSelect(sel *sqlparser.Select, getTable TableGetter) (plan *ExecPlan,
}
// Further improvements possible only if table is row-cached
if tableInfo.CacheType == schema.CACHE_NONE || tableInfo.CacheType == schema.CACHE_W {
if tableInfo.CacheType == schema.CacheNone || tableInfo.CacheType == schema.CacheW {
plan.Reason = ReasonNocache
return plan, nil
}

Просмотреть файл

@ -84,7 +84,7 @@ func (qre *QueryExecutor) Execute() (reply *mproto.QueryResult, err error) {
defer conn.Recycle()
conn.RecordQuery(qre.query)
var invalidator CacheInvalidator
if qre.plan.TableInfo != nil && qre.plan.TableInfo.CacheType != schema.CACHE_NONE {
if qre.plan.TableInfo != nil && qre.plan.TableInfo.CacheType != schema.CacheNone {
invalidator = conn.DirtyKeys(qre.plan.TableName)
}
switch qre.plan.PlanID {
@ -186,7 +186,7 @@ func (qre *QueryExecutor) execDmlAutoCommit() (reply *mproto.QueryResult, err er
defer conn.Recycle()
conn.RecordQuery(qre.query)
var invalidator CacheInvalidator
if qre.plan.TableInfo != nil && qre.plan.TableInfo.CacheType != schema.CACHE_NONE {
if qre.plan.TableInfo != nil && qre.plan.TableInfo.CacheType != schema.CacheNone {
invalidator = conn.DirtyKeys(qre.plan.TableName)
}
switch qre.plan.PlanID {

Просмотреть файл

@ -1063,9 +1063,9 @@ func initQueryExecutorTestDB(db *fakesqldb.DB) {
func getTestTableFields() []mproto.Field {
return []mproto.Field{
mproto.Field{Name: "pk", Type: mproto.VT_LONG},
mproto.Field{Name: "name", Type: mproto.VT_LONG},
mproto.Field{Name: "addr", Type: mproto.VT_LONG},
mproto.Field{Name: "pk", Type: mysql.TypeLong},
mproto.Field{Name: "name", Type: mysql.TypeLong},
mproto.Field{Name: "addr", Type: mysql.TypeLong},
}
}
@ -1125,6 +1125,18 @@ func getQueryExecutorSupportedQueries() map[string]*mproto.QueryResult {
},
},
},
"select * from `test_table` where 1 != 1": &mproto.QueryResult{
Fields: []mproto.Field{{
Name: "pk",
Type: mysql.TypeLong,
}, {
Name: "name",
Type: mysql.TypeLong,
}, {
Name: "addr",
Type: mysql.TypeLong,
}},
},
"describe `test_table`": &mproto.QueryResult{
RowsAffected: 3,
Rows: [][]sqltypes.Value{

Просмотреть файл

@ -19,9 +19,9 @@ func getSchemaInfo() *SchemaInfo {
Name: "test_table",
}
zero, _ := sqltypes.BuildValue(0)
table.AddColumn("id", "int", zero, "")
table.AddColumn("id2", "int", zero, "")
table.AddColumn("count", "int", zero, "")
table.AddColumn("id", sqltypes.Int64, zero, "")
table.AddColumn("id2", sqltypes.Int64, zero, "")
table.AddColumn("count", sqltypes.Int64, zero, "")
table.PKColumns = []int{0}
primaryIndex := table.AddIndex("PRIMARY")
primaryIndex.AddColumn("id", 12345)
@ -35,7 +35,7 @@ func getSchemaInfo() *SchemaInfo {
tableNoPK := &schema.Table{
Name: "test_table_no_pk",
}
tableNoPK.AddColumn("id", "int", zero, "")
tableNoPK.AddColumn("id", sqltypes.Int64, zero, "")
tableNoPK.PKColumns = []int{}
tables["test_table_no_pk"] = &TableInfo{Table: tableNoPK}

Просмотреть файл

@ -12,7 +12,6 @@ import (
"github.com/youtube/vitess/go/sqltypes"
"github.com/youtube/vitess/go/stats"
"github.com/youtube/vitess/go/vt/proto/vtrpc"
"github.com/youtube/vitess/go/vt/schema"
"golang.org/x/net/context"
)
@ -170,7 +169,7 @@ func (rc *RowCache) decodeRow(b []byte) (row []sqltypes.Value) {
// Corrupt data
return nil
}
if rc.tableInfo.Columns[i].Category == schema.CAT_NUMBER {
if sqltypes.IsIntegral(rc.tableInfo.Columns[i].Type) {
row[i] = sqltypes.MakeNumeric(data[:length])
} else {
row[i] = sqltypes.MakeString(data[:length])

Просмотреть файл

@ -187,7 +187,7 @@ func (rci *RowcacheInvalidator) handleDMLEvent(event *blproto.StreamEvent) {
if tableInfo == nil {
panic(NewTabletError(ErrFail, vtrpc.ErrorCode_BAD_INPUT, "Table %s not found", event.TableName))
}
if tableInfo.CacheType == schema.CACHE_NONE {
if tableInfo.CacheType == schema.CacheNone {
return
}
@ -251,7 +251,7 @@ func (rci *RowcacheInvalidator) handleUnrecognizedEvent(sql string) {
rci.qe.queryServiceStats.InternalErrors.Add("Invalidation", 1)
return
}
if tableInfo.CacheType == schema.CACHE_NONE {
if tableInfo.CacheType == schema.CacheNone {
return
}

Просмотреть файл

@ -233,10 +233,10 @@ func (si *SchemaInfo) override() {
}
switch override.Cache.Type {
case "RW":
table.CacheType = schema.CACHE_RW
table.CacheType = schema.CacheRW
table.Cache = NewRowCache(table, si.cachePool)
case "W":
table.CacheType = schema.CACHE_W
table.CacheType = schema.CacheW
if override.Cache.Table == "" {
log.Warningf("Incomplete cache specs: %v", override)
continue
@ -381,7 +381,7 @@ func (si *SchemaInfo) CreateOrUpdateTable(ctx context.Context, tableName string)
}
si.tables[tableName] = tableInfo
if tableInfo.CacheType == schema.CACHE_NONE {
if tableInfo.CacheType == schema.CacheNone {
log.Infof("Initialized table: %s", tableName)
} else {
log.Infof("Initialized cached table: %s, prefix: %s", tableName, tableInfo.Cache.prefix)
@ -555,7 +555,7 @@ func (si *SchemaInfo) getRowcacheStats() map[string]int64 {
defer si.mu.Unlock()
tstats := make(map[string]int64)
for k, v := range si.tables {
if v.CacheType != schema.CACHE_NONE {
if v.CacheType != schema.CacheNone {
hits, absent, misses, _ := v.Stats()
tstats[k+".Hits"] = hits
tstats[k+".Absent"] = absent
@ -570,7 +570,7 @@ func (si *SchemaInfo) getRowcacheInvalidations() map[string]int64 {
defer si.mu.Unlock()
tstats := make(map[string]int64)
for k, v := range si.tables {
if v.CacheType != schema.CACHE_NONE {
if v.CacheType != schema.CacheNone {
_, _, _, invalidations := v.Stats()
tstats[k] = invalidations
}
@ -745,7 +745,7 @@ func (si *SchemaInfo) handleHTTPTableStats(response http.ResponseWriter, request
si.mu.Lock()
defer si.mu.Unlock()
for k, v := range si.tables {
if v.CacheType != schema.CACHE_NONE {
if v.CacheType != schema.CacheNone {
temp.hits, temp.absent, temp.misses, temp.invalidations = v.Stats()
tstats[k] = temp
totals.hits += temp.hits

Просмотреть файл

@ -14,6 +14,7 @@ import (
"testing"
"time"
"github.com/youtube/vitess/go/mysql"
mproto "github.com/youtube/vitess/go/mysql/proto"
"github.com/youtube/vitess/go/sqldb"
"github.com/youtube/vitess/go/sqltypes"
@ -148,7 +149,7 @@ func TestSchemaInfoOpenFailedDueToTableInfoErr(t *testing.T) {
createTestTableBaseShowTable("test_table"),
},
})
db.AddQuery("describe `test_table`", &mproto.QueryResult{
db.AddQuery("select * from `test_table` where 1 != 1", &mproto.QueryResult{
// this will cause NewTableInfo error
RowsAffected: math.MaxUint64,
})
@ -180,14 +181,14 @@ func TestSchemaInfoOpenWithSchemaOverride(t *testing.T) {
// test cache type RW
schemaInfo.Open(&appParams, &dbaParams, schemaOverrides, true)
testTableInfo := schemaInfo.GetTable("test_table_01")
if testTableInfo.Table.CacheType != schema.CACHE_RW {
if testTableInfo.Table.CacheType != schema.CacheRW {
t.Fatalf("test_table_01's cache type should be RW")
}
schemaInfo.Close()
// test cache type W
schemaInfo.Open(&appParams, &dbaParams, schemaOverrides, true)
testTableInfo = schemaInfo.GetTable("test_table_02")
if testTableInfo.Table.CacheType != schema.CACHE_W {
if testTableInfo.Table.CacheType != schema.CacheW {
t.Fatalf("test_table_02's cache type should be W")
}
schemaInfo.Close()
@ -235,11 +236,16 @@ func TestSchemaInfoReload(t *testing.T) {
},
})
db.AddQuery("select * from `test_table_04` where 1 != 1", &mproto.QueryResult{
Fields: []mproto.Field{{
Name: "pk",
Type: mysql.TypeLong,
}},
})
db.AddQuery("describe `test_table_04`", &mproto.QueryResult{
RowsAffected: 1,
Rows: [][]sqltypes.Value{createTestTableDescribe("pk")},
})
db.AddQuery("show index from `test_table_04`", &mproto.QueryResult{
RowsAffected: 1,
Rows: [][]sqltypes.Value{createTestTableShowIndex("pk")},
@ -490,6 +496,13 @@ func TestUpdatedMysqlStats(t *testing.T) {
createTestTableBaseShowTable(tableName),
},
})
q = fmt.Sprintf("select * from `%s` where 1 != 1", tableName)
db.AddQuery(q, &mproto.QueryResult{
Fields: []mproto.Field{{
Name: "pk",
Type: mysql.TypeLong,
}},
})
q = fmt.Sprintf("describe `%s`", tableName)
db.AddQuery(q, &mproto.QueryResult{
RowsAffected: 1,
@ -797,6 +810,12 @@ func getSchemaInfoTestSupportedQueries() map[string]*mproto.QueryResult {
},
},
},
"select * from `test_table_01` where 1 != 1": &mproto.QueryResult{
Fields: []mproto.Field{{
Name: "pk",
Type: mysql.TypeLong,
}},
},
"describe `test_table_01`": &mproto.QueryResult{
RowsAffected: 1,
Rows: [][]sqltypes.Value{
@ -810,6 +829,12 @@ func getSchemaInfoTestSupportedQueries() map[string]*mproto.QueryResult {
},
},
},
"select * from `test_table_02` where 1 != 1": &mproto.QueryResult{
Fields: []mproto.Field{{
Name: "pk",
Type: mysql.TypeLong,
}},
},
"describe `test_table_02`": &mproto.QueryResult{
RowsAffected: 1,
Rows: [][]sqltypes.Value{
@ -823,6 +848,12 @@ func getSchemaInfoTestSupportedQueries() map[string]*mproto.QueryResult {
},
},
},
"select * from `test_table_03` where 1 != 1": &mproto.QueryResult{
Fields: []mproto.Field{{
Name: "pk",
Type: mysql.TypeLong,
}},
},
"describe `test_table_03`": &mproto.QueryResult{
RowsAffected: 1,
Rows: [][]sqltypes.Value{

Просмотреть файл

@ -30,7 +30,7 @@ var (
schemazTmpl = template.Must(template.New("example").Parse(`
{{$top := .}}{{with .Table}}<tr class="low">
<td>{{.Name}}</td>
<td>{{range .Columns}}{{.Name}}: {{index $top.ColumnCategory .Category}}, {{if .IsAuto}}autoinc{{end}}, {{.Default}}<br>{{end}}</td>
<td>{{range .Columns}}{{.Name}}: {{.Type}}, {{if .IsAuto}}autoinc{{end}}, {{.Default}}<br>{{end}}</td>
<td>{{range .Indexes}}{{.Name}}: ({{range .Columns}}{{.}},{{end}}), ({{range .Cardinality}}{{.}},{{end}})<br>{{end}}</td>
<td>{{index $top.CacheType .CacheType}}</td>
<td>{{.TableRows.Get}}</td>
@ -75,12 +75,10 @@ func schemazHandler(tables []*schema.Table, w http.ResponseWriter, r *http.Reque
}
sort.Sort(&sorter)
envelope := struct {
ColumnCategory []string
CacheType []string
Table *schema.Table
CacheType []string
Table *schema.Table
}{
ColumnCategory: []string{"other", "number", "varbinary"},
CacheType: []string{"none", "read-write", "write-only"},
CacheType: []string{"none", "read-write", "write-only"},
}
for _, Value := range sorter.rows {
envelope.Table = Value

Просмотреть файл

@ -23,17 +23,17 @@ func TestSchamazHandler(t *testing.T) {
tableB := schema.NewTable("b")
tableC := schema.NewTable("c")
tableA.AddColumn("column1", "int", sqltypes.MakeNumeric([]byte("0")), "auto_increment")
tableA.AddColumn("column1", sqltypes.Int64, sqltypes.MakeNumeric([]byte("0")), "auto_increment")
tableA.AddIndex("index1").AddColumn("index_column", 1000)
tableA.CacheType = schema.CACHE_RW
tableA.CacheType = schema.CacheRW
tableB.AddColumn("column2", "string", sqltypes.MakeString([]byte("NULL")), "")
tableB.AddColumn("column2", sqltypes.VarChar, sqltypes.MakeString([]byte("NULL")), "")
tableB.AddIndex("index2").AddColumn("index_column2", 200)
tableB.CacheType = schema.CACHE_W
tableB.CacheType = schema.CacheW
tableC.AddColumn("column3", "string", sqltypes.MakeString([]byte("")), "")
tableC.AddColumn("column3", sqltypes.VarChar, sqltypes.MakeString([]byte("")), "")
tableC.AddIndex("index3").AddColumn("index_column3", 500)
tableC.CacheType = schema.CACHE_NONE
tableC.CacheType = schema.CacheNone
tables := []*schema.Table{
tableA, tableB, tableC,
@ -42,7 +42,7 @@ func TestSchamazHandler(t *testing.T) {
body, _ := ioutil.ReadAll(resp.Body)
tableCPattern := []string{
`<td>c</td>`,
`<td>column3: other, , <br></td>`,
`<td>column3: VARCHAR, , <br></td>`,
`<td>index3: \(index_column3,\), \(500,\)<br></td>`,
`<td>none</td>`,
}
@ -55,7 +55,7 @@ func TestSchamazHandler(t *testing.T) {
}
tableBPattern := []string{
`<td>b</td>`,
`<td>column2: other, , NULL<br></td>`,
`<td>column2: VARCHAR, , NULL<br></td>`,
`<td>index2: \(index_column2,\), \(200,\)<br></td>`,
`<td>write-only</td>`,
}
@ -68,7 +68,7 @@ func TestSchamazHandler(t *testing.T) {
}
tableAPattern := []string{
`<td>a</td>`,
`<td>column1: number, autoinc, <br></td>`,
`<td>column1: INT64, autoinc, <br></td>`,
`<td>index1: \(index_column,\), \(1000,\)<br></td>`,
`<td>read-write</td>`,
}

Просмотреть файл

@ -10,7 +10,9 @@ import (
"strings"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/sqltypes"
"github.com/youtube/vitess/go/sync2"
"github.com/youtube/vitess/go/vt/proto/query"
"github.com/youtube/vitess/go/vt/schema"
"golang.org/x/net/context"
)
@ -46,12 +48,29 @@ func loadTableInfo(conn *DBConn, tableName string) (ti *TableInfo, err error) {
}
func (ti *TableInfo) fetchColumns(conn *DBConn) error {
qr, err := conn.Exec(context.Background(), fmt.Sprintf("select * from `%s` where 1 != 1", ti.Name), 10000, true)
if err != nil {
return err
}
fieldTypes := make(map[string]query.Type, len(qr.Fields))
for _, field := range qr.Fields {
fieldTypes[field.Name], err = sqltypes.MySQLToType(field.Type, field.Flags)
if err != nil {
return err
}
}
columns, err := conn.Exec(context.Background(), fmt.Sprintf("describe `%s`", ti.Name), 10000, false)
if err != nil {
return err
}
for _, row := range columns.Rows {
ti.AddColumn(row[0].String(), row[1].String(), row[4], row[5].String())
name := row[0].String()
columnType, ok := fieldTypes[name]
if !ok {
log.Warningf("Table: %s, column %s not found in select list, skipping.", ti.Name, name)
continue
}
ti.AddColumn(name, columnType, row[4], row[5].String())
}
return nil
}
@ -155,13 +174,14 @@ func (ti *TableInfo) initRowCache(conn *DBConn, tableType string, comment string
return
}
for _, col := range ti.PKColumns {
if ti.Columns[col].Category == schema.CAT_OTHER {
log.Infof("Table %s pk has unsupported column types. Will not be cached.", ti.Name)
return
if sqltypes.IsIntegral(ti.Columns[col].Type) || ti.Columns[col].Type == sqltypes.VarBinary {
continue
}
log.Infof("Table %s pk has unsupported column types. Will not be cached.", ti.Name)
return
}
ti.CacheType = schema.CACHE_RW
ti.CacheType = schema.CacheRW
ti.Cache = NewRowCache(ti, cachePool)
}

Просмотреть файл

@ -11,6 +11,7 @@ import (
"testing"
"time"
"github.com/youtube/vitess/go/mysql"
mproto "github.com/youtube/vitess/go/mysql/proto"
"github.com/youtube/vitess/go/sqldb"
"github.com/youtube/vitess/go/sqltypes"
@ -102,6 +103,12 @@ func TestTableInfoWithoutRowCacheViaNoPKColumn(t *testing.T) {
fakecacheservice.Register()
db := fakesqldb.Register()
db.AddQuery("show index from `test_table`", &mproto.QueryResult{})
db.AddQuery("select * from `test_table` where 1 != 1", &mproto.QueryResult{
Fields: []mproto.Field{{
Name: "pk",
Type: mysql.TypeLong,
}},
})
db.AddQuery("describe `test_table`", &mproto.QueryResult{
RowsAffected: 1,
Rows: [][]sqltypes.Value{
@ -145,12 +152,18 @@ func TestTableInfoWithoutRowCacheViaUnknownPKColumnType(t *testing.T) {
},
},
})
db.AddQuery("select * from `test_table` where 1 != 1", &mproto.QueryResult{
Fields: []mproto.Field{{
Name: "pk",
Type: mysql.TypeNewDecimal,
}},
})
db.AddQuery("describe `test_table`", &mproto.QueryResult{
RowsAffected: 1,
Rows: [][]sqltypes.Value{
[]sqltypes.Value{
sqltypes.MakeString([]byte("pk")),
sqltypes.MakeString([]byte("unknown_type")),
sqltypes.MakeString([]byte("decimal")),
sqltypes.MakeString([]byte{}),
sqltypes.MakeString([]byte{}),
sqltypes.MakeString([]byte("1")),
@ -260,7 +273,7 @@ func TestTableInfoInvalidCardinalityInIndex(t *testing.T) {
defer cachePool.Close()
tableInfo, err := newTestTableInfo(cachePool, "USER_TABLE", "test table", db)
if err != nil {
t.Fatalf("failed to create a table info")
t.Fatalf("failed to create a table info: %v", err)
}
if len(tableInfo.PKColumns) != 1 {
t.Fatalf("table should have one PK column although the cardinality is invalid")
@ -309,6 +322,18 @@ func newTestTableInfoCachePool() *CachePool {
func getTestTableInfoQueries() map[string]*mproto.QueryResult {
return map[string]*mproto.QueryResult{
"select * from `test_table` where 1 != 1": &mproto.QueryResult{
Fields: []mproto.Field{{
Name: "pk",
Type: mysql.TypeLong,
}, {
Name: "name",
Type: mysql.TypeLong,
}, {
Name: "addr",
Type: mysql.TypeLong,
}},
},
"describe `test_table`": &mproto.QueryResult{
RowsAffected: 3,
Rows: [][]sqltypes.Value{

Просмотреть файл

@ -1490,6 +1490,9 @@ func getSupportedQueries() map[string]*mproto.QueryResult {
"select * from test_table where 1 != 1": &mproto.QueryResult{
Fields: getTestTableFields(),
},
"select * from `test_table` where 1 != 1": &mproto.QueryResult{
Fields: getTestTableFields(),
},
baseShowTables: &mproto.QueryResult{
RowsAffected: 1,
Rows: [][]sqltypes.Value{

Просмотреть файл

@ -8,7 +8,6 @@ import (
"github.com/youtube/vitess/go/vt/key"
tproto "github.com/youtube/vitess/go/vt/tabletserver/proto"
pbq "github.com/youtube/vitess/go/vt/proto/query"
pb "github.com/youtube/vitess/go/vt/proto/vtgate"
)