Now moving BinlogPosition into mysqlctl/proto.

And unifying it with BlPosition.
This commit is contained in:
Alain Jobart 2013-08-01 18:08:10 -07:00
Родитель 078c91ba1a
Коммит d93747081a
7 изменённых файлов: 86 добавлений и 92 удалений

Просмотреть файл

@ -576,8 +576,8 @@ func (blp *BinlogPlayer) processBinlogEvent(binlogResponse *mysqlctl.BinlogRespo
blp.flushTxnBatch()
}
}
if binlogResponse.BlPosition.Position.MasterFilename != "" {
panic(fmt.Errorf("Error encountered at position %v, err: '%v'", binlogResponse.BlPosition.Position.String(), binlogResponse.Error))
if binlogResponse.BinlogPosition.Position.MasterFilename != "" {
panic(fmt.Errorf("Error encountered at position %v, err: '%v'", binlogResponse.BinlogPosition.Position.String(), binlogResponse.Error))
} else {
panic(fmt.Errorf("Error encountered from server %v", binlogResponse.Error))
}

Просмотреть файл

@ -15,8 +15,6 @@ import (
"strings"
"time"
"github.com/youtube/vitess/go/bson"
"github.com/youtube/vitess/go/bytes2"
"github.com/youtube/vitess/go/relog"
"github.com/youtube/vitess/go/vt/mysqlctl/proto"
parser "github.com/youtube/vitess/go/vt/sqlparser"
@ -73,64 +71,10 @@ var (
SEMICOLON_BYTE = []byte(";")
)
type BinlogPosition struct {
Position proto.ReplicationCoordinates
Timestamp int64
Xid uint64
}
func (pos *BinlogPosition) String() string {
return fmt.Sprintf("%v:%v", pos.Position.MasterFilename, pos.Position.MasterPosition)
}
func (pos *BinlogPosition) Valid() bool {
if pos.Position.MasterFilename == "" || pos.Position.MasterPosition == 0 {
return false
}
return true
}
func (pos *BinlogPosition) MarshalBson(buf *bytes2.ChunkedWriter) {
lenWriter := bson.NewLenWriter(buf)
bson.EncodePrefix(buf, bson.Object, "Position")
pos.Position.MarshalBson(buf)
bson.EncodePrefix(buf, bson.Long, "Timestamp")
bson.EncodeUint64(buf, uint64(pos.Timestamp))
bson.EncodePrefix(buf, bson.Ulong, "Xid")
bson.EncodeUint64(buf, pos.Xid)
buf.WriteByte(0)
lenWriter.RecordLen()
}
func (pos *BinlogPosition) UnmarshalBson(buf *bytes.Buffer) {
bson.Next(buf, 4)
kind := bson.NextByte(buf)
for kind != bson.EOO {
key := bson.ReadCString(buf)
switch key {
case "Position":
pos.Position = proto.ReplicationCoordinates{}
pos.Position.UnmarshalBson(buf)
case "Timestamp":
pos.Timestamp = bson.DecodeInt64(buf, kind)
case "Xid":
pos.Xid = bson.DecodeUint64(buf, kind)
default:
panic(bson.NewBsonError("Unrecognized tag %s", key))
}
kind = bson.NextByte(buf)
}
}
//Api Interface
type UpdateResponse struct {
Error string
Coord BinlogPosition
Coord proto.BinlogPosition
Data EventData
}
@ -144,12 +88,12 @@ type EventData struct {
//Raw event buffer used to gather data during parsing.
type eventBuffer struct {
Coord BinlogPosition
Coord proto.BinlogPosition
LogLine []byte
firstKw string
}
func NewEventBuffer(pos *BinlogPosition, line []byte) *eventBuffer {
func NewEventBuffer(pos *proto.BinlogPosition, line []byte) *eventBuffer {
buf := &eventBuffer{}
buf.LogLine = make([]byte, len(line))
written := copy(buf.LogLine, line)
@ -192,7 +136,7 @@ type Blp struct {
responseStream []*UpdateResponse
initialSeek bool
startPosition *proto.ReplicationCoordinates
currentPosition *BinlogPosition
currentPosition *proto.BinlogPosition
globalState *UpdateStream
dbmatch bool
blpStats
@ -201,7 +145,7 @@ type Blp struct {
func NewBlp(startCoordinates *proto.ReplicationCoordinates, updateStream *UpdateStream) *Blp {
blp := &Blp{}
blp.startPosition = startCoordinates
blp.currentPosition = &BinlogPosition{Position: *startCoordinates}
blp.currentPosition = &proto.BinlogPosition{Position: *startCoordinates}
blp.inTxn = false
blp.initialSeek = true
blp.txnLineBuffer = make([]*eventBuffer, 0, MAX_TXN_BATCH)
@ -760,7 +704,7 @@ func parseStreamComment(dmlComment string, autoincId uint64) (EventNode *parser.
}
//This builds UpdateResponse from the parsed tree, also handles a multi-row update.
func createUpdateResponse(eventTree *parser.Node, dmlType string, blpPos BinlogPosition) (response *UpdateResponse) {
func createUpdateResponse(eventTree *parser.Node, dmlType string, blpPos proto.BinlogPosition) (response *UpdateResponse) {
if eventTree.Len() < 3 {
panic(NewBinlogParseError(EVENT_ERROR, fmt.Sprintf("Invalid comment structure, len of tree %v", eventTree.Len())))
}
@ -832,7 +776,7 @@ func sendStream(sendReply SendUpdateStreamResponse, responseBuf []*UpdateRespons
}
//This sends the error to the client.
func SendError(sendReply SendUpdateStreamResponse, inputErr error, blpPos *BinlogPosition) {
func SendError(sendReply SendUpdateStreamResponse, inputErr error, blpPos *proto.BinlogPosition) {
streamBuf := new(UpdateResponse)
streamBuf.Error = inputErr.Error()
if blpPos != nil {

Просмотреть файл

@ -8,15 +8,9 @@ import (
"github.com/youtube/vitess/go/vt/mysqlctl/proto"
)
type BlPosition struct {
Position proto.ReplicationCoordinates
Timestamp int64
Xid uint64
}
type BinlogResponse struct {
Error string
BlPosition
proto.BinlogPosition
BinlogData
}

Просмотреть файл

@ -65,12 +65,12 @@ type BinlogServer struct {
//Raw event buffer used to gather data during parsing.
type blsEventBuffer struct {
BlPosition
proto.BinlogPosition
LogLine []byte
firstKw string
}
func NewBlsEventBuffer(pos *BlPosition, line []byte) *blsEventBuffer {
func NewBlsEventBuffer(pos *proto.BinlogPosition, line []byte) *blsEventBuffer {
buf := &blsEventBuffer{}
buf.LogLine = make([]byte, len(line))
//buf.LogLine = append(buf.LogLine, line...)
@ -78,8 +78,8 @@ func NewBlsEventBuffer(pos *BlPosition, line []byte) *blsEventBuffer {
if written < len(line) {
relog.Warning("Problem in copying logline to new buffer, written %v, len %v", written, len(line))
}
buf.BlPosition = *pos
buf.BlPosition.Timestamp = pos.Timestamp
buf.BinlogPosition = *pos
buf.BinlogPosition.Timestamp = pos.Timestamp
return buf
}
@ -90,7 +90,7 @@ type Bls struct {
responseStream []*BinlogResponse
initialSeek bool
startPosition *proto.ReplicationCoordinates
currentPosition *BlPosition
currentPosition *proto.BinlogPosition
dbmatch bool
keyspaceRange key.KeyRange
keyrangeTag string
@ -105,7 +105,7 @@ func NewBls(startCoordinates *proto.ReplicationCoordinates, blServer *BinlogServ
blp := &Bls{}
blp.startPosition = startCoordinates
blp.keyspaceRange = *keyRange
blp.currentPosition = &BlPosition{}
blp.currentPosition = &proto.BinlogPosition{}
blp.currentPosition.Position = *startCoordinates
blp.inTxn = false
blp.initialSeek = true
@ -415,7 +415,7 @@ func (blp *Bls) extractEventTimestamp(event *blsEventBuffer) {
panic(NewBinlogServerError(fmt.Sprintf("Error in extracting timestamp %v, sql %v", err, string(line))))
}
blp.currentPosition.Timestamp = currentTimestamp
event.BlPosition.Timestamp = currentTimestamp
event.BinlogPosition.Timestamp = currentTimestamp
}
func (blp *Bls) parseRotateEvent(line []byte) {
@ -458,7 +458,7 @@ func (blp *Bls) handleBeginEvent(event *blsEventBuffer) {
//This creates the response for DDL event.
func blsCreateDdlStream(lineBuffer *blsEventBuffer) (ddlStream *BinlogResponse) {
ddlStream = new(BinlogResponse)
ddlStream.BlPosition = lineBuffer.BlPosition
ddlStream.BinlogPosition = lineBuffer.BinlogPosition
ddlStream.SqlType = DDL
ddlStream.Sql = make([]string, 0, 1)
ddlStream.Sql = append(ddlStream.Sql, string(lineBuffer.LogLine))
@ -479,8 +479,8 @@ func (blp *Bls) handleCommitEvent(sendReply SendUpdateStreamResponse, commitEven
return
}
commitEvent.BlPosition.Xid = blp.currentPosition.Xid
commitEvent.BlPosition.Position.GroupId = blp.currentPosition.Position.GroupId
commitEvent.BinlogPosition.Xid = blp.currentPosition.Xid
commitEvent.BinlogPosition.Position.GroupId = blp.currentPosition.Position.GroupId
blp.txnLineBuffer = append(blp.txnLineBuffer, commitEvent)
//txn block for DMLs, parse it and send events for a txn
var dmlCount int64
@ -512,7 +512,7 @@ func (blp *Bls) buildTxnResponse() (txnResponseList []*BinlogResponse, dmlCount
line = event.LogLine
if bytes.HasPrefix(line, BINLOG_BEGIN) {
streamBuf := new(BinlogResponse)
streamBuf.BlPosition = event.BlPosition
streamBuf.BinlogPosition = event.BinlogPosition
streamBuf.SqlType = BEGIN
txnResponseList = append(txnResponseList, streamBuf)
continue
@ -557,7 +557,7 @@ func (blp *Bls) createDmlEvent(eventBuf *blsEventBuffer, keyspaceId string) (dml
//parse keyspace id
//for inserts check for index comments
dmlEvent = new(BinlogResponse)
dmlEvent.BlPosition = eventBuf.BlPosition
dmlEvent.BinlogPosition = eventBuf.BinlogPosition
dmlEvent.SqlType = GetDmlType(eventBuf.firstKw)
dmlEvent.KeyspaceId = keyspaceId
indexType, indexId, userId := parseIndex(eventBuf.LogLine)
@ -633,7 +633,7 @@ func parseIndex(sql []byte) (indexName string, indexId interface{}, userId uint6
//This creates the response for COMMIT event.
func blsCreateCommitEvent(eventBuf *blsEventBuffer) (streamBuf *BinlogResponse) {
streamBuf = new(BinlogResponse)
streamBuf.BlPosition = eventBuf.BlPosition
streamBuf.BinlogPosition = eventBuf.BinlogPosition
streamBuf.SqlType = COMMIT
return
}
@ -660,12 +660,12 @@ func blsSendStream(sendReply SendUpdateStreamResponse, responseBuf []*BinlogResp
}
//This sends the error to the client.
func sendError(sendReply SendUpdateStreamResponse, reqIdentifier string, inputErr error, blpPos *BlPosition) {
func sendError(sendReply SendUpdateStreamResponse, reqIdentifier string, inputErr error, blpPos *proto.BinlogPosition) {
var err error
streamBuf := new(BinlogResponse)
streamBuf.Error = inputErr.Error()
if blpPos != nil {
streamBuf.BlPosition = *blpPos
streamBuf.BinlogPosition = *blpPos
}
buf := []*BinlogResponse{streamBuf}
err = blsSendStream(sendReply, buf)

Просмотреть файл

@ -66,3 +66,58 @@ func (repl *ReplicationCoordinates) UnmarshalBson(buf *bytes.Buffer) {
kind = bson.NextByte(buf)
}
}
// BinlogPosition keeps track of a server binlog position
type BinlogPosition struct {
Position ReplicationCoordinates
Timestamp int64
Xid uint64
}
func (pos *BinlogPosition) String() string {
return fmt.Sprintf("%v:%v", pos.Position.MasterFilename, pos.Position.MasterPosition)
}
func (pos *BinlogPosition) Valid() bool {
if pos.Position.MasterFilename == "" || pos.Position.MasterPosition == 0 {
return false
}
return true
}
func (pos *BinlogPosition) MarshalBson(buf *bytes2.ChunkedWriter) {
lenWriter := bson.NewLenWriter(buf)
bson.EncodePrefix(buf, bson.Object, "Position")
pos.Position.MarshalBson(buf)
bson.EncodePrefix(buf, bson.Long, "Timestamp")
bson.EncodeUint64(buf, uint64(pos.Timestamp))
bson.EncodePrefix(buf, bson.Ulong, "Xid")
bson.EncodeUint64(buf, pos.Xid)
buf.WriteByte(0)
lenWriter.RecordLen()
}
func (pos *BinlogPosition) UnmarshalBson(buf *bytes.Buffer) {
bson.Next(buf, 4)
kind := bson.NextByte(buf)
for kind != bson.EOO {
key := bson.ReadCString(buf)
switch key {
case "Position":
pos.Position = ReplicationCoordinates{}
pos.Position.UnmarshalBson(buf)
case "Timestamp":
pos.Timestamp = bson.DecodeInt64(buf, kind)
case "Xid":
pos.Xid = bson.DecodeUint64(buf, kind)
default:
panic(bson.NewBsonError("Unrecognized tag %s", key))
}
kind = bson.NextByte(buf)
}
}

Просмотреть файл

@ -39,7 +39,7 @@ type UpdateStream struct {
}
type UpdateStreamRequest struct {
StartPosition BinlogPosition
StartPosition proto.BinlogPosition
}
var UpdateStreamRpcService *UpdateStream
@ -218,7 +218,7 @@ func IsMasterPositionValid(startCoordinates *proto.ReplicationCoordinates) bool
return true
}
func IsStartPositionValid(startPos *BinlogPosition) bool {
func IsStartPositionValid(startPos *proto.BinlogPosition) bool {
startCoord := &startPos.Position
return IsMasterPositionValid(startCoord)
}

Просмотреть файл

@ -18,6 +18,7 @@ import (
estats "github.com/youtube/vitess/go/stats" // stats is a private type defined somewhere else in this package, so it would conflict
"github.com/youtube/vitess/go/sync2"
"github.com/youtube/vitess/go/vt/mysqlctl"
cproto "github.com/youtube/vitess/go/vt/mysqlctl/proto"
"github.com/youtube/vitess/go/vt/tabletserver/proto"
)
@ -55,7 +56,7 @@ func (err *InvalidationError) isFatal() bool {
}
type InvalidationProcessor struct {
currentPosition *mysqlctl.BinlogPosition
currentPosition *cproto.BinlogPosition
state sync2.AtomicUint32
states *estats.States
stateLock sync.Mutex
@ -74,7 +75,7 @@ func NewInvalidationProcessor() *InvalidationProcessor {
invalidator.receiveEvent = func(response interface{}) error {
return invalidator.invalidateEvent(response)
}
gob.Register(mysqlctl.BinlogPosition{})
gob.Register(cproto.BinlogPosition{})
invalidator.encBuf = make([]byte, 0, 100)
return invalidator
}
@ -202,7 +203,7 @@ func (rowCache *InvalidationProcessor) runInvalidationLoop() {
return
}
startPosition := &mysqlctl.BinlogPosition{Position: *replPos}
startPosition := &cproto.BinlogPosition{Position: *replPos}
relog.Info("Starting @ %v", startPosition.String())
req := &mysqlctl.UpdateStreamRequest{StartPosition: *startPosition}