зеркало из https://github.com/github/vitess-gh.git
Merge pull request #10 from youtube/update-stream-fix
Fix for update stream test.
This commit is contained in:
Коммит
f8617ad83c
|
@ -175,8 +175,8 @@ func (pos *BinlogPosition) decodeReplCoordBson(buf *bytes.Buffer, kind byte) {
|
|||
//Api Interface
|
||||
type UpdateResponse struct {
|
||||
Error string
|
||||
BinlogPosition
|
||||
EventData
|
||||
Coord BinlogPosition
|
||||
Data EventData
|
||||
}
|
||||
|
||||
type EventData struct {
|
||||
|
@ -189,7 +189,7 @@ type EventData struct {
|
|||
|
||||
//Raw event buffer used to gather data during parsing.
|
||||
type eventBuffer struct {
|
||||
BinlogPosition
|
||||
Coord BinlogPosition
|
||||
LogLine []byte
|
||||
firstKw string
|
||||
}
|
||||
|
@ -202,11 +202,11 @@ func NewEventBuffer(pos *BinlogPosition, line []byte) *eventBuffer {
|
|||
relog.Warning("Logline not properly copied while creating new event written: %v len: %v", written, len(line))
|
||||
}
|
||||
//The RelayPosition is never used, so using the default value
|
||||
buf.BinlogPosition.Position = ReplicationCoordinates{RelayFilename: pos.Position.RelayFilename,
|
||||
buf.Coord.Position = ReplicationCoordinates{RelayFilename: pos.Position.RelayFilename,
|
||||
MasterFilename: pos.Position.MasterFilename,
|
||||
MasterPosition: pos.Position.MasterPosition,
|
||||
}
|
||||
buf.BinlogPosition.Timestamp = pos.Timestamp
|
||||
buf.Coord.Timestamp = pos.Timestamp
|
||||
return buf
|
||||
}
|
||||
|
||||
|
@ -579,7 +579,7 @@ func (blp *Blp) extractEventTimestamp(event *eventBuffer) {
|
|||
panic(NewBinlogParseError(CODE_ERROR, fmt.Sprintf("Error in extracting timestamp %v", err)))
|
||||
}
|
||||
blp.currentPosition.Timestamp = currentTimestamp
|
||||
event.BinlogPosition.Timestamp = currentTimestamp
|
||||
event.Coord.Timestamp = currentTimestamp
|
||||
}
|
||||
|
||||
func (blp *Blp) parseRotateEvent(line []byte) {
|
||||
|
@ -649,7 +649,7 @@ func (blp *Blp) handleCommitEvent(sendReply SendUpdateStreamResponse, commitEven
|
|||
}
|
||||
}
|
||||
}
|
||||
commitEvent.BinlogPosition.Xid = blp.currentPosition.Xid
|
||||
commitEvent.Coord.Xid = blp.currentPosition.Xid
|
||||
blp.txnLineBuffer = append(blp.txnLineBuffer, commitEvent)
|
||||
//txn block for DMLs, parse it and send events for a txn
|
||||
var dmlCount uint
|
||||
|
@ -756,8 +756,8 @@ func buildTxnResponse(trxnLineBuffer []*eventBuffer) (txnResponseList []*UpdateR
|
|||
line = event.LogLine
|
||||
if bytes.HasPrefix(line, BINLOG_BEGIN) {
|
||||
streamBuf := new(UpdateResponse)
|
||||
streamBuf.BinlogPosition = event.BinlogPosition
|
||||
streamBuf.SqlType = BEGIN
|
||||
streamBuf.Coord = event.Coord
|
||||
streamBuf.Data.SqlType = BEGIN
|
||||
txnResponseList = append(txnResponseList, streamBuf)
|
||||
continue
|
||||
}
|
||||
|
@ -792,7 +792,7 @@ func buildTxnResponse(trxnLineBuffer []*eventBuffer) (txnResponseList []*UpdateR
|
|||
streamComment := string(line[commentIndex+len(STREAM_COMMENT_START):])
|
||||
eventNodeTree = parseStreamComment(streamComment, autoincId)
|
||||
dmlType = GetDmlType(event.firstKw)
|
||||
response := createUpdateResponse(eventNodeTree, dmlType, event.BinlogPosition)
|
||||
response := createUpdateResponse(eventNodeTree, dmlType, event.Coord)
|
||||
txnResponseList = append(txnResponseList, response)
|
||||
autoincId = 0
|
||||
dmlBuffer = dmlBuffer[:0]
|
||||
|
@ -900,11 +900,11 @@ func createUpdateResponse(eventTree *parser.Node, dmlType string, blpPos BinlogP
|
|||
pkColLen := pkColNamesNode.Len()
|
||||
|
||||
response = new(UpdateResponse)
|
||||
response.BinlogPosition = blpPos
|
||||
response.SqlType = dmlType
|
||||
response.TableName = tableName
|
||||
response.PkColNames = pkColNames
|
||||
response.PkValues = make([][]interface{}, 0, len(eventTree.Sub[2:]))
|
||||
response.Coord = blpPos
|
||||
response.Data.SqlType = dmlType
|
||||
response.Data.TableName = tableName
|
||||
response.Data.PkColNames = pkColNames
|
||||
response.Data.PkValues = make([][]interface{}, 0, len(eventTree.Sub[2:]))
|
||||
|
||||
rowPk := make([]interface{}, pkColLen)
|
||||
for _, node := range eventTree.Sub[2:] {
|
||||
|
@ -913,7 +913,7 @@ func createUpdateResponse(eventTree *parser.Node, dmlType string, blpPos BinlogP
|
|||
panic(NewBinlogParseError(EVENT_ERROR, "Error in the stream comment, length of pk values doesn't match column names."))
|
||||
}
|
||||
rowPk = encodePkValues(node.Sub)
|
||||
response.PkValues = append(response.PkValues, rowPk)
|
||||
response.Data.PkValues = append(response.Data.PkValues, rowPk)
|
||||
}
|
||||
return response
|
||||
}
|
||||
|
@ -959,7 +959,7 @@ func SendError(sendReply SendUpdateStreamResponse, inputErr error, blpPos *Binlo
|
|||
streamBuf := new(UpdateResponse)
|
||||
streamBuf.Error = inputErr.Error()
|
||||
if blpPos != nil {
|
||||
streamBuf.BinlogPosition = *blpPos
|
||||
streamBuf.Coord = *blpPos
|
||||
}
|
||||
buf := []*UpdateResponse{streamBuf}
|
||||
_ = sendStream(sendReply, buf)
|
||||
|
@ -968,17 +968,17 @@ func SendError(sendReply SendUpdateStreamResponse, inputErr error, blpPos *Binlo
|
|||
//This creates the response for COMMIT event.
|
||||
func createCommitEvent(eventBuf *eventBuffer) (streamBuf *UpdateResponse) {
|
||||
streamBuf = new(UpdateResponse)
|
||||
streamBuf.BinlogPosition = eventBuf.BinlogPosition
|
||||
streamBuf.SqlType = COMMIT
|
||||
streamBuf.Coord = eventBuf.Coord
|
||||
streamBuf.Data.SqlType = COMMIT
|
||||
return
|
||||
}
|
||||
|
||||
//This creates the response for DDL event.
|
||||
func createDdlStream(lineBuffer *eventBuffer) (ddlStream *UpdateResponse) {
|
||||
ddlStream = new(UpdateResponse)
|
||||
ddlStream.BinlogPosition = lineBuffer.BinlogPosition
|
||||
ddlStream.SqlType = DDL
|
||||
ddlStream.Sql = string(lineBuffer.LogLine)
|
||||
ddlStream.Coord = lineBuffer.Coord
|
||||
ddlStream.Data.SqlType = DDL
|
||||
ddlStream.Data.Sql = string(lineBuffer.LogLine)
|
||||
return ddlStream
|
||||
}
|
||||
|
||||
|
|
|
@ -178,7 +178,7 @@ func (rowCache *InvalidationProcessor) invalidateEvent(response interface{}) err
|
|||
if !ok {
|
||||
return NewInvalidationError(FATAL_ERROR, "Invalid Reponse type", "")
|
||||
}
|
||||
rowCache.currentPosition = &updateResponse.BinlogPosition
|
||||
rowCache.currentPosition = &updateResponse.Coord
|
||||
return rowCache.processEvent(updateResponse)
|
||||
}
|
||||
|
||||
|
@ -277,8 +277,8 @@ func isCheckpointValid(checkpoint, repl *mysqlctl.ReplicationCoordinates) bool {
|
|||
|
||||
func (rowCache *InvalidationProcessor) processEvent(event *mysqlctl.UpdateResponse) error {
|
||||
position := ""
|
||||
if event.BinlogPosition.Valid() {
|
||||
position = event.BinlogPosition.String()
|
||||
if event.Coord.Valid() {
|
||||
position = event.Coord.String()
|
||||
}
|
||||
if event.Error != "" {
|
||||
relog.Error("Update stream returned error '%v'", event.Error)
|
||||
|
@ -291,13 +291,13 @@ func (rowCache *InvalidationProcessor) processEvent(event *mysqlctl.UpdateRespon
|
|||
return nil
|
||||
}
|
||||
|
||||
if !event.BinlogPosition.Valid() {
|
||||
if !event.Coord.Valid() {
|
||||
rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, "no error, position is not set", ""))
|
||||
return nil
|
||||
}
|
||||
|
||||
var err error
|
||||
switch event.EventData.SqlType {
|
||||
switch event.Data.SqlType {
|
||||
case mysqlctl.DDL:
|
||||
err = rowCache.handleDdlEvent(event)
|
||||
if err != nil {
|
||||
|
@ -330,8 +330,8 @@ func (rowCache *InvalidationProcessor) processEvent(event *mysqlctl.UpdateRespon
|
|||
rowCache.dmlBuffer = append(rowCache.dmlBuffer, dml)
|
||||
}
|
||||
default:
|
||||
rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, fmt.Sprintf("Unknown SqlType, %v %v", event.EventData.SqlType, event.EventData.Sql), position))
|
||||
//return NewInvalidationError(INVALID_EVENT, fmt.Sprintf("Unknown SqlType, %v %v", event.EventData.SqlType, event.EventData.Sql))
|
||||
rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, fmt.Sprintf("Unknown SqlType, %v %v", event.Data.SqlType, event.Data.Sql), position))
|
||||
//return NewInvalidationError(INVALID_EVENT, fmt.Sprintf("Unknown SqlType, %v %v", event.Data.SqlType, event.Data.Sql))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -345,15 +345,15 @@ func isDmlEvent(sqlType string) bool {
|
|||
}
|
||||
|
||||
func (rowCache *InvalidationProcessor) buildDmlData(event *mysqlctl.UpdateResponse) (*proto.DmlType, error) {
|
||||
if !isDmlEvent(event.SqlType) {
|
||||
rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, fmt.Sprintf("Bad Dml type, '%v'", event.SqlType), event.BinlogPosition.String()))
|
||||
if !isDmlEvent(event.Data.SqlType) {
|
||||
rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, fmt.Sprintf("Bad Dml type, '%v'", event.Data.SqlType), event.Coord.String()))
|
||||
return nil, nil
|
||||
}
|
||||
dml := new(proto.DmlType)
|
||||
dml.Table = event.TableName
|
||||
dml.Keys = make([]interface{}, 0, len(event.PkValues))
|
||||
sqlTypeKeys := make([]sqltypes.Value, 0, len(event.PkColNames))
|
||||
for _, pkTuple := range event.PkValues {
|
||||
dml.Table = event.Data.TableName
|
||||
dml.Keys = make([]interface{}, 0, len(event.Data.PkValues))
|
||||
sqlTypeKeys := make([]sqltypes.Value, 0, len(event.Data.PkColNames))
|
||||
for _, pkTuple := range event.Data.PkValues {
|
||||
sqlTypeKeys = sqlTypeKeys[:0]
|
||||
if len(pkTuple) == 0 {
|
||||
continue
|
||||
|
@ -361,7 +361,7 @@ func (rowCache *InvalidationProcessor) buildDmlData(event *mysqlctl.UpdateRespon
|
|||
for _, pkVal := range pkTuple {
|
||||
key, err := sqltypes.BuildValue(pkVal)
|
||||
if err != nil {
|
||||
rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, fmt.Sprintf("Error building invalidation key '%v'", err), event.BinlogPosition.String()))
|
||||
rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, fmt.Sprintf("Error building invalidation key '%v'", err), event.Coord.String()))
|
||||
return nil, nil
|
||||
}
|
||||
sqlTypeKeys = append(sqlTypeKeys, key)
|
||||
|
@ -379,9 +379,9 @@ func (rowCache *InvalidationProcessor) handleTxn(commitEvent *mysqlctl.UpdateRes
|
|||
defer func() {
|
||||
if x := recover(); x != nil {
|
||||
if terr, ok := x.(*TabletError); ok {
|
||||
rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, terr.Error(), commitEvent.BinlogPosition.String()))
|
||||
rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, terr.Error(), commitEvent.Coord.String()))
|
||||
} else {
|
||||
err = NewInvalidationError(FATAL_ERROR, "handleTxn failed", commitEvent.BinlogPosition.String())
|
||||
err = NewInvalidationError(FATAL_ERROR, "handleTxn failed", commitEvent.Coord.String())
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
@ -391,9 +391,9 @@ func (rowCache *InvalidationProcessor) handleTxn(commitEvent *mysqlctl.UpdateRes
|
|||
}
|
||||
rowCache.encBuf = rowCache.encBuf[:0]
|
||||
cacheInvalidate := new(proto.CacheInvalidate)
|
||||
rowCache.encBuf, err = bson.Marshal(&commitEvent.BinlogPosition)
|
||||
rowCache.encBuf, err = bson.Marshal(&commitEvent.Coord)
|
||||
if err != nil {
|
||||
return NewInvalidationError(FATAL_ERROR, fmt.Sprintf("Error in encoding position, %v", err), commitEvent.BinlogPosition.String())
|
||||
return NewInvalidationError(FATAL_ERROR, fmt.Sprintf("Error in encoding position, %v", err), commitEvent.Coord.String())
|
||||
}
|
||||
cacheInvalidate.Position = rowCache.encBuf
|
||||
cacheInvalidate.Dmls = make([]proto.DmlType, 0, len(rowCache.dmlBuffer))
|
||||
|
@ -409,26 +409,26 @@ func (rowCache *InvalidationProcessor) handleDdlEvent(ddlEvent *mysqlctl.UpdateR
|
|||
defer func() {
|
||||
if x := recover(); x != nil {
|
||||
if terr, ok := x.(*TabletError); ok {
|
||||
rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, terr.Error(), ddlEvent.BinlogPosition.String()))
|
||||
rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, terr.Error(), ddlEvent.Coord.String()))
|
||||
} else {
|
||||
err = NewInvalidationError(FATAL_ERROR, "ddlEvent failed", ddlEvent.BinlogPosition.String())
|
||||
err = NewInvalidationError(FATAL_ERROR, "ddlEvent failed", ddlEvent.Coord.String())
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
if ddlEvent.Sql == "" {
|
||||
rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, "Empty ddl sql", ddlEvent.BinlogPosition.String()))
|
||||
if ddlEvent.Data.Sql == "" {
|
||||
rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, "Empty ddl sql", ddlEvent.Coord.String()))
|
||||
return nil
|
||||
//return NewInvalidationError(INVALID_EVENT, "Empty ddl sql", ddlEvent.BinlogPosition.String())
|
||||
//return NewInvalidationError(INVALID_EVENT, "Empty ddl sql", ddlEvent.Coord.String())
|
||||
}
|
||||
rowCache.encBuf = rowCache.encBuf[:0]
|
||||
ddlInvalidate := new(proto.DDLInvalidate)
|
||||
rowCache.encBuf, err = bson.Marshal(&ddlEvent.BinlogPosition)
|
||||
rowCache.encBuf, err = bson.Marshal(&ddlEvent.Coord)
|
||||
if err != nil {
|
||||
return NewInvalidationError(FATAL_ERROR, fmt.Sprintf("Error in encoding position, %v", err), ddlEvent.BinlogPosition.String())
|
||||
return NewInvalidationError(FATAL_ERROR, fmt.Sprintf("Error in encoding position, %v", err), ddlEvent.Coord.String())
|
||||
}
|
||||
ddlInvalidate.Position = rowCache.encBuf
|
||||
ddlInvalidate.DDL = ddlEvent.Sql
|
||||
ddlInvalidate.DDL = ddlEvent.Data.Sql
|
||||
InvalidateForDDL(ddlInvalidate)
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -17,7 +17,7 @@ class ReplPosition(object):
|
|||
self.MasterFilename = master_filename
|
||||
self.MasterPosition = master_position
|
||||
|
||||
class BinlogPosition(object):
|
||||
class Coord(object):
|
||||
Position = None
|
||||
Timestamp = None
|
||||
Xid = None
|
||||
|
@ -51,8 +51,8 @@ class EventData(object):
|
|||
|
||||
|
||||
class UpdateStreamResponse(object):
|
||||
BinlogPosition = None
|
||||
EventData = None
|
||||
Coord = None
|
||||
Data = None
|
||||
Error = None
|
||||
|
||||
def __init__(self, response_dict):
|
||||
|
@ -64,8 +64,8 @@ class UpdateStreamResponse(object):
|
|||
self.Error = None
|
||||
else:
|
||||
self.Error = self.raw_response['Error']
|
||||
self.BinlogPosition = self.raw_response['BinlogPosition']
|
||||
self.EventData = EventData(self.raw_response['EventData']).__dict__
|
||||
self.Coord = self.raw_response['Coord']
|
||||
self.Data = EventData(self.raw_response['Data']).__dict__
|
||||
|
||||
class UpdateStreamConnection(object):
|
||||
def __init__(self, addr, timeout, user=None, password=None, encrypted=False, keyfile=None, certfile=None):
|
||||
|
@ -90,7 +90,7 @@ class UpdateStreamConnection(object):
|
|||
except:
|
||||
logging.exception('gorpc low-level error')
|
||||
raise
|
||||
return update_stream_response.BinlogPosition, update_stream_response.EventData, update_stream_response.Error
|
||||
return update_stream_response.Coord, update_stream_response.Data, update_stream_response.Error
|
||||
|
||||
def stream_next(self):
|
||||
try:
|
||||
|
@ -103,4 +103,4 @@ class UpdateStreamConnection(object):
|
|||
except:
|
||||
logging.exception('gorpc low-level error')
|
||||
raise
|
||||
return update_stream_response.BinlogPosition, update_stream_response.EventData, update_stream_response.Error
|
||||
return update_stream_response.Coord, update_stream_response.Data, update_stream_response.Error
|
||||
|
|
|
@ -41,7 +41,7 @@ primary key (id)
|
|||
|
||||
def _get_master_current_position():
|
||||
res = utils.mysql_query(62344, 'vt_test_keyspace', 'show master status')
|
||||
start_position = update_stream_service.BinlogPosition(res[0][0], res[0][1])
|
||||
start_position = update_stream_service.Coord(res[0][0], res[0][1])
|
||||
return start_position.__dict__
|
||||
|
||||
|
||||
|
@ -55,7 +55,7 @@ def _get_repl_current_position():
|
|||
slave_dict = res[0]
|
||||
master_log = slave_dict['File']
|
||||
master_pos = slave_dict['Position']
|
||||
start_position = update_stream_service.BinlogPosition(master_log, master_pos)
|
||||
start_position = update_stream_service.Coord(master_log, master_pos)
|
||||
return start_position.__dict__
|
||||
|
||||
|
||||
|
|
|
@ -28,7 +28,7 @@ GLOBAL_MASTER_START_POSITION = None
|
|||
|
||||
def _get_master_current_position():
|
||||
res = utils.mysql_query(62344, 'vt_test_keyspace', 'show master status')
|
||||
start_position = update_stream_service.BinlogPosition(res[0][0], res[0][1])
|
||||
start_position = update_stream_service.Coord(res[0][0], res[0][1])
|
||||
return start_position.__dict__
|
||||
|
||||
|
||||
|
@ -42,7 +42,7 @@ def _get_repl_current_position():
|
|||
slave_dict = res[0]
|
||||
master_log = slave_dict['File']
|
||||
master_pos = slave_dict['Position']
|
||||
start_position = update_stream_service.BinlogPosition(master_log, master_pos)
|
||||
start_position = update_stream_service.Coord(master_log, master_pos)
|
||||
return start_position.__dict__
|
||||
|
||||
|
||||
|
@ -205,10 +205,10 @@ def run_test_service_enabled():
|
|||
#time.sleep(20)
|
||||
while(1):
|
||||
binlog_pos, data, err = replica_conn.stream_next()
|
||||
if err != None and err == "Disconnecting because the Update Stream service has been disabled":
|
||||
if err is not None and err == "Fatal Service Error: Disconnecting because the Update Stream service has been disabled":
|
||||
disabled_err = True
|
||||
break
|
||||
if data['SqlType'] == 'COMMIT':
|
||||
if data is not None and data['SqlType'] == 'COMMIT':
|
||||
txn_count +=1
|
||||
|
||||
if not disabled_err:
|
||||
|
@ -217,6 +217,7 @@ def run_test_service_enabled():
|
|||
except Exception, e:
|
||||
print "Exception: %s" % str(e)
|
||||
print traceback.print_exc()
|
||||
raise utils.TestError("Update stream returned error '%s'", str(e))
|
||||
utils.debug("Streamed %d transactions before exiting" % txn_count)
|
||||
|
||||
def _vtdb_conn(host):
|
||||
|
@ -278,7 +279,6 @@ def run_test_stream_parity():
|
|||
break
|
||||
if len(master_tuples) != len(replica_tuples):
|
||||
utils.debug("Test Failed - # of records mismatch, master %s replica %s" % (master_tuples, replica_tuples))
|
||||
print len(master_tuples), len(replica_tuples)
|
||||
for master_val, replica_val in zip(master_tuples, replica_tuples):
|
||||
master_data = master_val[1]
|
||||
replica_data = replica_val[1]
|
||||
|
|
|
@ -55,7 +55,7 @@ index 8158783..8b137c0 100644
|
|||
$(top_srcdir)/mysys/base64.c
|
||||
mysqlbinlog_LDADD = $(LDADD) $(CXXLDFLAGS)
|
||||
|
||||
+vt_mysqlbinlog_SOURCES = mysqlbinlog.cc \
|
||||
+vt_mysqlbinlog_SOURCES = vt_mysqlbinlog.cc \
|
||||
+ $(top_srcdir)/mysys/checksum.c \
|
||||
+ $(top_srcdir)/mysys/mf_tempdir.c \
|
||||
+ $(top_srcdir)/mysys/my_new.cc \
|
||||
|
|
Загрузка…
Ссылка в новой задаче