Signed-off-by: Nitin Shatma <nitin.sharma@airbnb.com>
This commit is contained in:
Nitin Shatma 2019-09-24 15:00:49 -07:00
Родитель b736b17924
Коммит a4ce7d81e0
1 изменённых файлов: 19 добавлений и 20 удалений

Просмотреть файл

@ -348,7 +348,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e
for i, typ := range tm.Types {
t, err := sqltypes.MySQLToType(int64(typ), int64(tm.Flags))
if err != nil {
return nil, fmt.Errorf("unsupported type: %d", typ)
return nil, fmt.Errorf("unsupported type: %d, position: %d", typ, i)
}
cols = append(cols, schema.TableColumn{
Name: sqlparser.NewColIdent(fmt.Sprintf("@%d", i+1)),
@ -358,27 +358,26 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e
st := vs.se.GetTable(sqlparser.NewTableIdent(tm.Name))
if st == nil && !vs.filter.BestEffortNameInFieldEvent {
return nil, fmt.Errorf("unknown table %v in schema", tm.Name)
} else {
if len(st.Columns) < len(tm.Types) && !vs.filter.BestEffortNameInFieldEvent {
return nil, fmt.Errorf("cannot determine table columns for %s: event has %d columns, current schema has %d: %#v", tm.Name, len(tm.Types), len(st.Columns), ev)
}
tableName = st.Name.String()
// check if the schema returned by schema.Engine matches with row.
schemaMatch := true
if len(tm.Types) == len(st.Columns) {
for i := range tm.Types {
t, _ := sqltypes.MySQLToType(int64(tm.Types[i]), int64(tm.Flags))
if t != st.Columns[i].Type {
schemaMatch = false
break
}
}
if len(st.Columns) < len(tm.Types) && !vs.filter.BestEffortNameInFieldEvent {
return nil, fmt.Errorf("cannot determine table columns for %s: event has %d columns, current schema has %d: %#v", tm.Name, len(tm.Types), len(st.Columns), ev)
}
tableName = st.Name.String()
// check if the schema returned by schema.Engine matches with row.
schemaMatch := true
if len(tm.Types) == len(st.Columns) {
for i := range tm.Types {
t, _ := sqltypes.MySQLToType(int64(tm.Types[i]), int64(tm.Flags))
if t != st.Columns[i].Type {
schemaMatch = false
break
}
} else {
schemaMatch = false
}
if schemaMatch || !vs.filter.BestEffortNameInFieldEvent {
cols = st.Columns[:len(tm.Types)]
}
} else {
schemaMatch = false
}
if schemaMatch || !vs.filter.BestEffortNameInFieldEvent {
cols = st.Columns[:len(tm.Types)]
}
table := &Table{