From bf31119e76f8ea2bf8a49c49ccda46d3089cf232 Mon Sep 17 00:00:00 2001 From: shrutip Date: Fri, 19 Jul 2013 18:38:13 -0700 Subject: [PATCH] Fix for update stream test. --- go/vt/mysqlctl/binlog_parser.go | 44 +++++++++--------- go/vt/tabletserver/rowcache_invalidator.go | 52 +++++++++++----------- py/vtdb/update_stream_service.py | 14 +++--- test/rowcache_invalidator.py | 4 +- test/update_stream.py | 10 ++--- third_party/mysql.patch | 2 +- 6 files changed, 63 insertions(+), 63 deletions(-) diff --git a/go/vt/mysqlctl/binlog_parser.go b/go/vt/mysqlctl/binlog_parser.go index 375c879ce0..9be6c81533 100644 --- a/go/vt/mysqlctl/binlog_parser.go +++ b/go/vt/mysqlctl/binlog_parser.go @@ -175,8 +175,8 @@ func (pos *BinlogPosition) decodeReplCoordBson(buf *bytes.Buffer, kind byte) { //Api Interface type UpdateResponse struct { Error string - BinlogPosition - EventData + Coord BinlogPosition + Data EventData } type EventData struct { @@ -189,7 +189,7 @@ type EventData struct { //Raw event buffer used to gather data during parsing. type eventBuffer struct { - BinlogPosition + Coord BinlogPosition LogLine []byte firstKw string } @@ -202,11 +202,11 @@ func NewEventBuffer(pos *BinlogPosition, line []byte) *eventBuffer { relog.Warning("Logline not properly copied while creating new event written: %v len: %v", written, len(line)) } //The RelayPosition is never used, so using the default value - buf.BinlogPosition.Position = ReplicationCoordinates{RelayFilename: pos.Position.RelayFilename, + buf.Coord.Position = ReplicationCoordinates{RelayFilename: pos.Position.RelayFilename, MasterFilename: pos.Position.MasterFilename, MasterPosition: pos.Position.MasterPosition, } - buf.BinlogPosition.Timestamp = pos.Timestamp + buf.Coord.Timestamp = pos.Timestamp return buf } @@ -579,7 +579,7 @@ func (blp *Blp) extractEventTimestamp(event *eventBuffer) { panic(NewBinlogParseError(CODE_ERROR, fmt.Sprintf("Error in extracting timestamp %v", err))) } blp.currentPosition.Timestamp = currentTimestamp - event.BinlogPosition.Timestamp = currentTimestamp + event.Coord.Timestamp = currentTimestamp } func (blp *Blp) parseRotateEvent(line []byte) { @@ -649,7 +649,7 @@ func (blp *Blp) handleCommitEvent(sendReply SendUpdateStreamResponse, commitEven } } } - commitEvent.BinlogPosition.Xid = blp.currentPosition.Xid + commitEvent.Coord.Xid = blp.currentPosition.Xid blp.txnLineBuffer = append(blp.txnLineBuffer, commitEvent) //txn block for DMLs, parse it and send events for a txn var dmlCount uint @@ -756,8 +756,8 @@ func buildTxnResponse(trxnLineBuffer []*eventBuffer) (txnResponseList []*UpdateR line = event.LogLine if bytes.HasPrefix(line, BINLOG_BEGIN) { streamBuf := new(UpdateResponse) - streamBuf.BinlogPosition = event.BinlogPosition - streamBuf.SqlType = BEGIN + streamBuf.Coord = event.Coord + streamBuf.Data.SqlType = BEGIN txnResponseList = append(txnResponseList, streamBuf) continue } @@ -792,7 +792,7 @@ func buildTxnResponse(trxnLineBuffer []*eventBuffer) (txnResponseList []*UpdateR streamComment := string(line[commentIndex+len(STREAM_COMMENT_START):]) eventNodeTree = parseStreamComment(streamComment, autoincId) dmlType = GetDmlType(event.firstKw) - response := createUpdateResponse(eventNodeTree, dmlType, event.BinlogPosition) + response := createUpdateResponse(eventNodeTree, dmlType, event.Coord) txnResponseList = append(txnResponseList, response) autoincId = 0 dmlBuffer = dmlBuffer[:0] @@ -900,11 +900,11 @@ func createUpdateResponse(eventTree *parser.Node, dmlType string, blpPos BinlogP pkColLen := pkColNamesNode.Len() response = new(UpdateResponse) - response.BinlogPosition = blpPos - response.SqlType = dmlType - response.TableName = tableName - response.PkColNames = pkColNames - response.PkValues = make([][]interface{}, 0, len(eventTree.Sub[2:])) + response.Coord = blpPos + response.Data.SqlType = dmlType + response.Data.TableName = tableName + response.Data.PkColNames = pkColNames + response.Data.PkValues = make([][]interface{}, 0, len(eventTree.Sub[2:])) rowPk := make([]interface{}, pkColLen) for _, node := range eventTree.Sub[2:] { @@ -913,7 +913,7 @@ func createUpdateResponse(eventTree *parser.Node, dmlType string, blpPos BinlogP panic(NewBinlogParseError(EVENT_ERROR, "Error in the stream comment, length of pk values doesn't match column names.")) } rowPk = encodePkValues(node.Sub) - response.PkValues = append(response.PkValues, rowPk) + response.Data.PkValues = append(response.Data.PkValues, rowPk) } return response } @@ -959,7 +959,7 @@ func SendError(sendReply SendUpdateStreamResponse, inputErr error, blpPos *Binlo streamBuf := new(UpdateResponse) streamBuf.Error = inputErr.Error() if blpPos != nil { - streamBuf.BinlogPosition = *blpPos + streamBuf.Coord = *blpPos } buf := []*UpdateResponse{streamBuf} _ = sendStream(sendReply, buf) @@ -968,17 +968,17 @@ func SendError(sendReply SendUpdateStreamResponse, inputErr error, blpPos *Binlo //This creates the response for COMMIT event. func createCommitEvent(eventBuf *eventBuffer) (streamBuf *UpdateResponse) { streamBuf = new(UpdateResponse) - streamBuf.BinlogPosition = eventBuf.BinlogPosition - streamBuf.SqlType = COMMIT + streamBuf.Coord = eventBuf.Coord + streamBuf.Data.SqlType = COMMIT return } //This creates the response for DDL event. func createDdlStream(lineBuffer *eventBuffer) (ddlStream *UpdateResponse) { ddlStream = new(UpdateResponse) - ddlStream.BinlogPosition = lineBuffer.BinlogPosition - ddlStream.SqlType = DDL - ddlStream.Sql = string(lineBuffer.LogLine) + ddlStream.Coord = lineBuffer.Coord + ddlStream.Data.SqlType = DDL + ddlStream.Data.Sql = string(lineBuffer.LogLine) return ddlStream } diff --git a/go/vt/tabletserver/rowcache_invalidator.go b/go/vt/tabletserver/rowcache_invalidator.go index 5b7513d755..bd0386d4bb 100644 --- a/go/vt/tabletserver/rowcache_invalidator.go +++ b/go/vt/tabletserver/rowcache_invalidator.go @@ -178,7 +178,7 @@ func (rowCache *InvalidationProcessor) invalidateEvent(response interface{}) err if !ok { return NewInvalidationError(FATAL_ERROR, "Invalid Reponse type", "") } - rowCache.currentPosition = &updateResponse.BinlogPosition + rowCache.currentPosition = &updateResponse.Coord return rowCache.processEvent(updateResponse) } @@ -277,8 +277,8 @@ func isCheckpointValid(checkpoint, repl *mysqlctl.ReplicationCoordinates) bool { func (rowCache *InvalidationProcessor) processEvent(event *mysqlctl.UpdateResponse) error { position := "" - if event.BinlogPosition.Valid() { - position = event.BinlogPosition.String() + if event.Coord.Valid() { + position = event.Coord.String() } if event.Error != "" { relog.Error("Update stream returned error '%v'", event.Error) @@ -291,13 +291,13 @@ func (rowCache *InvalidationProcessor) processEvent(event *mysqlctl.UpdateRespon return nil } - if !event.BinlogPosition.Valid() { + if !event.Coord.Valid() { rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, "no error, position is not set", "")) return nil } var err error - switch event.EventData.SqlType { + switch event.Data.SqlType { case mysqlctl.DDL: err = rowCache.handleDdlEvent(event) if err != nil { @@ -330,8 +330,8 @@ func (rowCache *InvalidationProcessor) processEvent(event *mysqlctl.UpdateRespon rowCache.dmlBuffer = append(rowCache.dmlBuffer, dml) } default: - rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, fmt.Sprintf("Unknown SqlType, %v %v", event.EventData.SqlType, event.EventData.Sql), position)) - //return NewInvalidationError(INVALID_EVENT, fmt.Sprintf("Unknown SqlType, %v %v", event.EventData.SqlType, event.EventData.Sql)) + rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, fmt.Sprintf("Unknown SqlType, %v %v", event.Data.SqlType, event.Data.Sql), position)) + //return NewInvalidationError(INVALID_EVENT, fmt.Sprintf("Unknown SqlType, %v %v", event.Data.SqlType, event.Data.Sql)) } return nil } @@ -345,15 +345,15 @@ func isDmlEvent(sqlType string) bool { } func (rowCache *InvalidationProcessor) buildDmlData(event *mysqlctl.UpdateResponse) (*proto.DmlType, error) { - if !isDmlEvent(event.SqlType) { - rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, fmt.Sprintf("Bad Dml type, '%v'", event.SqlType), event.BinlogPosition.String())) + if !isDmlEvent(event.Data.SqlType) { + rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, fmt.Sprintf("Bad Dml type, '%v'", event.Data.SqlType), event.Coord.String())) return nil, nil } dml := new(proto.DmlType) - dml.Table = event.TableName - dml.Keys = make([]interface{}, 0, len(event.PkValues)) - sqlTypeKeys := make([]sqltypes.Value, 0, len(event.PkColNames)) - for _, pkTuple := range event.PkValues { + dml.Table = event.Data.TableName + dml.Keys = make([]interface{}, 0, len(event.Data.PkValues)) + sqlTypeKeys := make([]sqltypes.Value, 0, len(event.Data.PkColNames)) + for _, pkTuple := range event.Data.PkValues { sqlTypeKeys = sqlTypeKeys[:0] if len(pkTuple) == 0 { continue @@ -361,7 +361,7 @@ func (rowCache *InvalidationProcessor) buildDmlData(event *mysqlctl.UpdateRespon for _, pkVal := range pkTuple { key, err := sqltypes.BuildValue(pkVal) if err != nil { - rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, fmt.Sprintf("Error building invalidation key '%v'", err), event.BinlogPosition.String())) + rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, fmt.Sprintf("Error building invalidation key '%v'", err), event.Coord.String())) return nil, nil } sqlTypeKeys = append(sqlTypeKeys, key) @@ -379,9 +379,9 @@ func (rowCache *InvalidationProcessor) handleTxn(commitEvent *mysqlctl.UpdateRes defer func() { if x := recover(); x != nil { if terr, ok := x.(*TabletError); ok { - rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, terr.Error(), commitEvent.BinlogPosition.String())) + rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, terr.Error(), commitEvent.Coord.String())) } else { - err = NewInvalidationError(FATAL_ERROR, "handleTxn failed", commitEvent.BinlogPosition.String()) + err = NewInvalidationError(FATAL_ERROR, "handleTxn failed", commitEvent.Coord.String()) } } }() @@ -391,9 +391,9 @@ func (rowCache *InvalidationProcessor) handleTxn(commitEvent *mysqlctl.UpdateRes } rowCache.encBuf = rowCache.encBuf[:0] cacheInvalidate := new(proto.CacheInvalidate) - rowCache.encBuf, err = bson.Marshal(&commitEvent.BinlogPosition) + rowCache.encBuf, err = bson.Marshal(&commitEvent.Coord) if err != nil { - return NewInvalidationError(FATAL_ERROR, fmt.Sprintf("Error in encoding position, %v", err), commitEvent.BinlogPosition.String()) + return NewInvalidationError(FATAL_ERROR, fmt.Sprintf("Error in encoding position, %v", err), commitEvent.Coord.String()) } cacheInvalidate.Position = rowCache.encBuf cacheInvalidate.Dmls = make([]proto.DmlType, 0, len(rowCache.dmlBuffer)) @@ -409,26 +409,26 @@ func (rowCache *InvalidationProcessor) handleDdlEvent(ddlEvent *mysqlctl.UpdateR defer func() { if x := recover(); x != nil { if terr, ok := x.(*TabletError); ok { - rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, terr.Error(), ddlEvent.BinlogPosition.String())) + rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, terr.Error(), ddlEvent.Coord.String())) } else { - err = NewInvalidationError(FATAL_ERROR, "ddlEvent failed", ddlEvent.BinlogPosition.String()) + err = NewInvalidationError(FATAL_ERROR, "ddlEvent failed", ddlEvent.Coord.String()) } } }() - if ddlEvent.Sql == "" { - rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, "Empty ddl sql", ddlEvent.BinlogPosition.String())) + if ddlEvent.Data.Sql == "" { + rowCache.updateErrCounters(NewInvalidationError(INVALID_EVENT, "Empty ddl sql", ddlEvent.Coord.String())) return nil - //return NewInvalidationError(INVALID_EVENT, "Empty ddl sql", ddlEvent.BinlogPosition.String()) + //return NewInvalidationError(INVALID_EVENT, "Empty ddl sql", ddlEvent.Coord.String()) } rowCache.encBuf = rowCache.encBuf[:0] ddlInvalidate := new(proto.DDLInvalidate) - rowCache.encBuf, err = bson.Marshal(&ddlEvent.BinlogPosition) + rowCache.encBuf, err = bson.Marshal(&ddlEvent.Coord) if err != nil { - return NewInvalidationError(FATAL_ERROR, fmt.Sprintf("Error in encoding position, %v", err), ddlEvent.BinlogPosition.String()) + return NewInvalidationError(FATAL_ERROR, fmt.Sprintf("Error in encoding position, %v", err), ddlEvent.Coord.String()) } ddlInvalidate.Position = rowCache.encBuf - ddlInvalidate.DDL = ddlEvent.Sql + ddlInvalidate.DDL = ddlEvent.Data.Sql InvalidateForDDL(ddlInvalidate) return nil } diff --git a/py/vtdb/update_stream_service.py b/py/vtdb/update_stream_service.py index 6f28ed1f47..d9ddf4ec1a 100755 --- a/py/vtdb/update_stream_service.py +++ b/py/vtdb/update_stream_service.py @@ -17,7 +17,7 @@ class ReplPosition(object): self.MasterFilename = master_filename self.MasterPosition = master_position -class BinlogPosition(object): +class Coord(object): Position = None Timestamp = None Xid = None @@ -51,8 +51,8 @@ class EventData(object): class UpdateStreamResponse(object): - BinlogPosition = None - EventData = None + Coord = None + Data = None Error = None def __init__(self, response_dict): @@ -64,8 +64,8 @@ class UpdateStreamResponse(object): self.Error = None else: self.Error = self.raw_response['Error'] - self.BinlogPosition = self.raw_response['BinlogPosition'] - self.EventData = EventData(self.raw_response['EventData']).__dict__ + self.Coord = self.raw_response['Coord'] + self.Data = EventData(self.raw_response['Data']).__dict__ class UpdateStreamConnection(object): def __init__(self, addr, timeout, user=None, password=None, encrypted=False, keyfile=None, certfile=None): @@ -90,7 +90,7 @@ class UpdateStreamConnection(object): except: logging.exception('gorpc low-level error') raise - return update_stream_response.BinlogPosition, update_stream_response.EventData, update_stream_response.Error + return update_stream_response.Coord, update_stream_response.Data, update_stream_response.Error def stream_next(self): try: @@ -103,4 +103,4 @@ class UpdateStreamConnection(object): except: logging.exception('gorpc low-level error') raise - return update_stream_response.BinlogPosition, update_stream_response.EventData, update_stream_response.Error + return update_stream_response.Coord, update_stream_response.Data, update_stream_response.Error diff --git a/test/rowcache_invalidator.py b/test/rowcache_invalidator.py index 7398981eed..a27e848875 100755 --- a/test/rowcache_invalidator.py +++ b/test/rowcache_invalidator.py @@ -41,7 +41,7 @@ primary key (id) def _get_master_current_position(): res = utils.mysql_query(62344, 'vt_test_keyspace', 'show master status') - start_position = update_stream_service.BinlogPosition(res[0][0], res[0][1]) + start_position = update_stream_service.Coord(res[0][0], res[0][1]) return start_position.__dict__ @@ -55,7 +55,7 @@ def _get_repl_current_position(): slave_dict = res[0] master_log = slave_dict['File'] master_pos = slave_dict['Position'] - start_position = update_stream_service.BinlogPosition(master_log, master_pos) + start_position = update_stream_service.Coord(master_log, master_pos) return start_position.__dict__ diff --git a/test/update_stream.py b/test/update_stream.py index f63d1d36e0..58ea38ec86 100755 --- a/test/update_stream.py +++ b/test/update_stream.py @@ -28,7 +28,7 @@ GLOBAL_MASTER_START_POSITION = None def _get_master_current_position(): res = utils.mysql_query(62344, 'vt_test_keyspace', 'show master status') - start_position = update_stream_service.BinlogPosition(res[0][0], res[0][1]) + start_position = update_stream_service.Coord(res[0][0], res[0][1]) return start_position.__dict__ @@ -42,7 +42,7 @@ def _get_repl_current_position(): slave_dict = res[0] master_log = slave_dict['File'] master_pos = slave_dict['Position'] - start_position = update_stream_service.BinlogPosition(master_log, master_pos) + start_position = update_stream_service.Coord(master_log, master_pos) return start_position.__dict__ @@ -205,10 +205,10 @@ def run_test_service_enabled(): #time.sleep(20) while(1): binlog_pos, data, err = replica_conn.stream_next() - if err != None and err == "Disconnecting because the Update Stream service has been disabled": + if err is not None and err == "Fatal Service Error: Disconnecting because the Update Stream service has been disabled": disabled_err = True break - if data['SqlType'] == 'COMMIT': + if data is not None and data['SqlType'] == 'COMMIT': txn_count +=1 if not disabled_err: @@ -217,6 +217,7 @@ def run_test_service_enabled(): except Exception, e: print "Exception: %s" % str(e) print traceback.print_exc() + raise utils.TestError("Update stream returned error '%s'", str(e)) utils.debug("Streamed %d transactions before exiting" % txn_count) def _vtdb_conn(host): @@ -278,7 +279,6 @@ def run_test_stream_parity(): break if len(master_tuples) != len(replica_tuples): utils.debug("Test Failed - # of records mismatch, master %s replica %s" % (master_tuples, replica_tuples)) - print len(master_tuples), len(replica_tuples) for master_val, replica_val in zip(master_tuples, replica_tuples): master_data = master_val[1] replica_data = replica_val[1] diff --git a/third_party/mysql.patch b/third_party/mysql.patch index 098ef791e1..dd8cd82b23 100644 --- a/third_party/mysql.patch +++ b/third_party/mysql.patch @@ -55,7 +55,7 @@ index 8158783..8b137c0 100644 $(top_srcdir)/mysys/base64.c mysqlbinlog_LDADD = $(LDADD) $(CXXLDFLAGS) -+vt_mysqlbinlog_SOURCES = mysqlbinlog.cc \ ++vt_mysqlbinlog_SOURCES = vt_mysqlbinlog.cc \ + $(top_srcdir)/mysys/checksum.c \ + $(top_srcdir)/mysys/mf_tempdir.c \ + $(top_srcdir)/mysys/my_new.cc \