This commit is contained in:
Sugu Sougoumarane 2013-12-01 15:53:16 -08:00
Родитель 6cd89f07c3
Коммит 35746ec3fd
12 изменённых файлов: 216 добавлений и 166 удалений

Просмотреть файл

@ -14,10 +14,11 @@ import (
log "github.com/golang/glog"
"github.com/youtube/vitess/go/mysql"
"github.com/youtube/vitess/go/mysql/proto"
mproto "github.com/youtube/vitess/go/mysql/proto"
"github.com/youtube/vitess/go/rpcplus"
"github.com/youtube/vitess/go/stats"
"github.com/youtube/vitess/go/vt/key"
"github.com/youtube/vitess/go/vt/mysqlctl/proto"
)
var (
@ -33,7 +34,7 @@ type VtClient interface {
Commit() error
Rollback() error
Close()
ExecuteFetch(query string, maxrows int, wantfields bool) (qr *proto.QueryResult, err error)
ExecuteFetch(query string, maxrows int, wantfields bool) (qr *mproto.QueryResult, err error)
}
// DummyVtClient is a VtClient that writes to a writer instead of executing
@ -67,9 +68,9 @@ func (dc DummyVtClient) Close() {
return
}
func (dc DummyVtClient) ExecuteFetch(query string, maxrows int, wantfields bool) (qr *proto.QueryResult, err error) {
func (dc DummyVtClient) ExecuteFetch(query string, maxrows int, wantfields bool) (qr *mproto.QueryResult, err error) {
dc.stdout.WriteString(string(query) + ";\n")
return &proto.QueryResult{Fields: nil, RowsAffected: 1, InsertId: 0, Rows: nil}, nil
return &mproto.QueryResult{Fields: nil, RowsAffected: 1, InsertId: 0, Rows: nil}, nil
}
// DBClient is a real VtClient backed by a mysql connection
@ -139,14 +140,14 @@ func (dc *DBClient) Close() {
}
}
func (dc *DBClient) ExecuteFetch(query string, maxrows int, wantfields bool) (*proto.QueryResult, error) {
func (dc *DBClient) ExecuteFetch(query string, maxrows int, wantfields bool) (*mproto.QueryResult, error) {
mqr, err := dc.dbConn.ExecuteFetch(query, maxrows, wantfields)
if err != nil {
log.Errorf("ExecuteFetch failed w/ error %v", err)
dc.handleError(err)
return nil, err
}
qr := proto.QueryResult(*mqr)
qr := mproto.QueryResult(*mqr)
return &qr, nil
}
@ -243,7 +244,7 @@ func ReadStartPosition(dbClient VtClient, uid uint32) (*BlpPosition, error) {
}, nil
}
func (blp *BinlogPlayer) processTransaction(tx *BinlogTransaction) (ok bool, err error) {
func (blp *BinlogPlayer) processTransaction(tx *proto.BinlogTransaction) (ok bool, err error) {
txnStartTime := time.Now()
if err = blp.dbClient.Begin(); err != nil {
return false, fmt.Errorf("failed query BEGIN, err: %s", err)
@ -273,7 +274,7 @@ func (blp *BinlogPlayer) processTransaction(tx *BinlogTransaction) (ok bool, err
return true, nil
}
func (blp *BinlogPlayer) exec(sql string) (*proto.QueryResult, error) {
func (blp *BinlogPlayer) exec(sql string) (*mproto.QueryResult, error) {
queryStartTime := time.Now()
qr, err := blp.dbClient.ExecuteFetch(sql, 0, false)
blp.blplStats.queryCount.Add("QueryCount", 1)
@ -301,8 +302,8 @@ func (blp *BinlogPlayer) ApplyBinlogEvents(interrupted chan struct{}) error {
return fmt.Errorf("error dialing binlog server: %v", err)
}
responseChan := make(chan *BinlogTransaction)
req := &KeyrangeRequest{
responseChan := make(chan *proto.BinlogTransaction)
req := &proto.KeyrangeRequest{
Keyrange: blp.keyRange,
GroupId: blp.blpPos.GroupId,
}

Просмотреть файл

@ -17,17 +17,7 @@ import (
log "github.com/golang/glog"
"github.com/youtube/vitess/go/sync2"
)
// Valid statement types in the binlogs.
const (
BL_UNRECOGNIZED = iota
BL_BEGIN
BL_COMMIT
BL_ROLLBACK
BL_DML
BL_DDL
BL_SET
"github.com/youtube/vitess/go/vt/mysqlctl/proto"
)
var (
@ -52,18 +42,18 @@ var (
// statementPrefixes are normal sql statement prefixes.
statementPrefixes = map[string]int{
"begin": BL_BEGIN,
"commit": BL_COMMIT,
"rollback": BL_ROLLBACK,
"insert": BL_DML,
"update": BL_DML,
"delete": BL_DML,
"create": BL_DDL,
"alter": BL_DDL,
"drop": BL_DDL,
"truncate": BL_DDL,
"rename": BL_DDL,
"set": BL_SET,
"begin": proto.BL_BEGIN,
"commit": proto.BL_COMMIT,
"rollback": proto.BL_ROLLBACK,
"insert": proto.BL_DML,
"update": proto.BL_DML,
"delete": proto.BL_DML,
"create": proto.BL_DDL,
"alter": proto.BL_DDL,
"drop": proto.BL_DDL,
"truncate": proto.BL_DDL,
"rename": proto.BL_DDL,
"set": proto.BL_SET,
}
// Misc vars.
@ -73,17 +63,6 @@ var (
DEFAULT_DELIM = []byte(";")
)
// TODO: Move to proto once finalized
type BinlogTransaction struct {
Statements []Statement
GroupId string
}
type Statement struct {
Category int
Sql []byte
}
type binlogPosition struct {
GroupId, ServerId int64
}
@ -109,8 +88,8 @@ type BinlogStreamer struct {
}
// sendTransactionFunc is used to send binlog events.
// reply is of type BinlogTransaction.
type sendTransactionFunc func(trans *BinlogTransaction) error
// reply is of type proto.BinlogTransaction.
type sendTransactionFunc func(trans *proto.BinlogTransaction) error
// NewBinlogStreamer creates a BinlogStreamer. dbname specifes
// the db to stream events for, and binlogPrefix is as defined
@ -183,7 +162,7 @@ func (bls *BinlogStreamer) run(sendTransaction sendTransactionFunc) (err error)
func (bls *BinlogStreamer) parseEvents(sendTransaction sendTransactionFunc, reader io.Reader) (err error) {
bls.delim = DEFAULT_DELIM
bufReader := bufio.NewReader(reader)
var statements []Statement
var statements []proto.Statement
for {
sql, err := bls.nextStatement(bufReader)
if sql == nil {
@ -191,16 +170,16 @@ func (bls *BinlogStreamer) parseEvents(sendTransaction sendTransactionFunc, read
}
prefix := string(bytes.ToLower(bytes.SplitN(sql, SPACE, 2)[0]))
switch category := statementPrefixes[prefix]; category {
case BL_UNRECOGNIZED:
case proto.BL_UNRECOGNIZED:
return fmt.Errorf("unrecognized: %s", sql)
// We trust that mysqlbinlog doesn't send BL_DMLs withot a BL_BEGIN
case BL_BEGIN, BL_ROLLBACK:
// We trust that mysqlbinlog doesn't send proto.BL_DMLs withot a proto.BL_BEGIN
case proto.BL_BEGIN, proto.BL_ROLLBACK:
statements = nil
case BL_DDL:
statements = append(statements, Statement{Category: category, Sql: sql})
case proto.BL_DDL:
statements = append(statements, proto.Statement{Category: category, Sql: sql})
fallthrough
case BL_COMMIT:
trans := &BinlogTransaction{
case proto.BL_COMMIT:
trans := &proto.BinlogTransaction{
Statements: statements,
GroupId: strconv.Itoa(int(bls.blPos.GroupId)),
}
@ -211,9 +190,9 @@ func (bls *BinlogStreamer) parseEvents(sendTransaction sendTransactionFunc, read
return fmt.Errorf("send reply error: %v", err)
}
statements = nil
// BL_DML & BL_SET
// proto.BL_DML & proto.BL_SET
default:
statements = append(statements, Statement{Category: category, Sql: sql})
statements = append(statements, proto.Statement{Category: category, Sql: sql})
}
}
}

Просмотреть файл

@ -15,6 +15,7 @@ import (
"time"
"github.com/youtube/vitess/go/sync2"
"github.com/youtube/vitess/go/vt/mysqlctl/proto"
)
func TestPosParse(t *testing.T) {
@ -303,7 +304,7 @@ func TestStream(t *testing.T) {
curTransaction := 0
bls := NewBinlogStreamer("db", "test/vt-0000041983-bin")
err = bls.Stream("vt-0000041983-bin.000001", 0, func(tx *BinlogTransaction) error {
err = bls.Stream("vt-0000041983-bin.000001", 0, func(tx *proto.BinlogTransaction) error {
for i, stmt := range tx.Statements {
if transactions[curTransaction].Statements[i].Sql != string(stmt.Sql) {
t.Errorf("want %s, got %s", transactions[curTransaction].Statements[i].Sql, stmt.Sql)
@ -347,7 +348,7 @@ func TestRotation(t *testing.T) {
defer cleanup(env)
bls := NewBinlogStreamer("db", "test/vt-0000041983-bin")
err := bls.Stream("vt-0000041983-bin.000004", 2682, func(tx *BinlogTransaction) error {
err := bls.Stream("vt-0000041983-bin.000004", 2682, func(tx *proto.BinlogTransaction) error {
bls.Stop()
return nil
})

Просмотреть файл

@ -11,6 +11,7 @@ import (
"strconv"
"time"
"github.com/youtube/vitess/go/vt/mysqlctl/proto"
"github.com/youtube/vitess/go/vt/sqlparser"
)
@ -20,26 +21,7 @@ var (
STREAM_COMMENT_START = []byte("/* _stream ")
)
type StreamEvent struct {
// Category can be "DML", "DDL", "ERR" or "POS"
Category string
// DML
TableName string
PkColNames []string
PkValues [][]interface{}
// DDL or ERR
Sql string
// Timestamp is set for DML, DDL or ERR
Timestamp int64
// POS
GroupId string
}
type sendEventFunc func(event *StreamEvent) error
type sendEventFunc func(event *proto.StreamEvent) error
type EventStreamer struct {
bls *BinlogStreamer
@ -68,13 +50,13 @@ func (evs *EventStreamer) Stop() {
evs.bls.Stop()
}
func (evs *EventStreamer) transactionToEvent(trans *BinlogTransaction) error {
func (evs *EventStreamer) transactionToEvent(trans *proto.BinlogTransaction) error {
var err error
var timestamp int64
var insertid int64
for _, stmt := range trans.Statements {
switch stmt.Category {
case BL_SET:
case proto.BL_SET:
if bytes.HasPrefix(stmt.Sql, BINLOG_SET_TIMESTAMP) {
if timestamp, err = strconv.ParseInt(string(stmt.Sql[len(BINLOG_SET_TIMESTAMP):]), 10, 64); err != nil {
return fmt.Errorf("%v: %s", err, stmt.Sql)
@ -86,8 +68,8 @@ func (evs *EventStreamer) transactionToEvent(trans *BinlogTransaction) error {
} else {
return fmt.Errorf("unrecognized: %s", stmt.Sql)
}
case BL_DML:
var dmlEvent *StreamEvent
case proto.BL_DML:
var dmlEvent *proto.StreamEvent
dmlEvent, insertid, err = evs.buildDMLEvent(stmt.Sql, insertid)
if err != nil {
return fmt.Errorf("%v: %s", err, stmt.Sql)
@ -97,15 +79,15 @@ func (evs *EventStreamer) transactionToEvent(trans *BinlogTransaction) error {
return err
}
evs.DmlCount++
case BL_DDL:
ddlEvent := &StreamEvent{Category: "DDL", Sql: string(stmt.Sql), Timestamp: timestamp}
case proto.BL_DDL:
ddlEvent := &proto.StreamEvent{Category: "DDL", Sql: string(stmt.Sql), Timestamp: timestamp}
if err = evs.sendEvent(ddlEvent); err != nil {
return err
}
evs.DdlCount++
}
}
posEvent := &StreamEvent{Category: "POS", GroupId: trans.GroupId}
posEvent := &proto.StreamEvent{Category: "POS", GroupId: trans.GroupId}
if err = evs.sendEvent(posEvent); err != nil {
return err
}
@ -113,11 +95,11 @@ func (evs *EventStreamer) transactionToEvent(trans *BinlogTransaction) error {
return nil
}
func (evs *EventStreamer) buildDMLEvent(sql []byte, insertid int64) (dmlEvent *StreamEvent, newinsertid int64, err error) {
func (evs *EventStreamer) buildDMLEvent(sql []byte, insertid int64) (dmlEvent *proto.StreamEvent, newinsertid int64, err error) {
commentIndex := bytes.LastIndex(sql, STREAM_COMMENT_START)
if commentIndex == -1 {
evs.DmlErrors++
return &StreamEvent{Category: "ERR", Sql: string(sql)}, insertid, nil
return &proto.StreamEvent{Category: "ERR", Sql: string(sql)}, insertid, nil
}
streamComment := string(sql[commentIndex+len(STREAM_COMMENT_START):])
eventTree, err := parseStreamComment(streamComment)
@ -136,7 +118,7 @@ func (evs *EventStreamer) buildDMLEvent(sql []byte, insertid int64) (dmlEvent *S
}
pkColLen := pkColNamesNode.Len()
dmlEvent = new(StreamEvent)
dmlEvent = new(proto.StreamEvent)
dmlEvent.Category = "DML"
dmlEvent.TableName = tableName
dmlEvent.PkColNames = pkColNames

Просмотреть файл

@ -7,6 +7,8 @@ package mysqlctl
import (
"fmt"
"testing"
"github.com/youtube/vitess/go/vt/mysqlctl/proto"
)
type eventErrorCase struct {
@ -17,47 +19,47 @@ type eventErrorCase struct {
var eventErrorCases = []eventErrorCase{
{
Category: BL_SET,
Category: proto.BL_SET,
Sql: "abcd",
want: `unrecognized: abcd`,
}, {
Category: BL_SET,
Category: proto.BL_SET,
Sql: "SET TIMESTAMP=abcd",
want: `strconv.ParseInt: parsing "abcd": invalid syntax: SET TIMESTAMP=abcd`,
}, {
Category: BL_SET,
Category: proto.BL_SET,
Sql: "SET INSERT_ID=abcd",
want: `strconv.ParseInt: parsing "abcd": invalid syntax: SET INSERT_ID=abcd`,
}, {
Category: BL_DML,
Category: proto.BL_DML,
Sql: "query /* _stream 10 (eid id name ) (null 1 'bmFtZQ==' ); */",
want: `expecting table name in stream comment: query /* _stream 10 (eid id name ) (null 1 'bmFtZQ==' ); */`,
}, {
Category: BL_DML,
Category: proto.BL_DML,
Sql: "query /* _stream vtocc_e eid id name ) (null 1 'bmFtZQ==' ); */",
want: `expecting '(': query /* _stream vtocc_e eid id name ) (null 1 'bmFtZQ==' ); */`,
}, {
Category: BL_DML,
Category: proto.BL_DML,
Sql: "query /* _stream vtocc_e (10 id name ) (null 1 'bmFtZQ==' ); */",
want: `expecting column name: 10: query /* _stream vtocc_e (10 id name ) (null 1 'bmFtZQ==' ); */`,
}, {
Category: BL_DML,
Category: proto.BL_DML,
Sql: "query /* _stream vtocc_e (eid id name (null 1 'bmFtZQ==' ); */",
want: `unexpected token: '(': query /* _stream vtocc_e (eid id name (null 1 'bmFtZQ==' ); */`,
}, {
Category: BL_DML,
Category: proto.BL_DML,
Sql: "query /* _stream vtocc_e (eid id name) (null 'aaa' 'bmFtZQ==' ); */",
want: `illegal base64 data at input byte 0: query /* _stream vtocc_e (eid id name) (null 'aaa' 'bmFtZQ==' ); */`,
}, {
Category: BL_DML,
Category: proto.BL_DML,
Sql: "query /* _stream vtocc_e (eid id name) (null 'bmFtZQ==' ); */",
want: `length mismatch in values: query /* _stream vtocc_e (eid id name) (null 'bmFtZQ==' ); */`,
}, {
Category: BL_DML,
Category: proto.BL_DML,
Sql: "query /* _stream vtocc_e (eid id name) (null 1.1 'bmFtZQ==' ); */",
want: `strconv.ParseUint: parsing "1.1": invalid syntax: query /* _stream vtocc_e (eid id name) (null 1.1 'bmFtZQ==' ); */`,
}, {
Category: BL_DML,
Category: proto.BL_DML,
Sql: "query /* _stream vtocc_e (eid id name) (null a 'bmFtZQ==' ); */",
want: `unexpected token: 'a': query /* _stream vtocc_e (eid id name) (null a 'bmFtZQ==' ); */`,
},
@ -65,13 +67,13 @@ var eventErrorCases = []eventErrorCase{
func TestEventErrors(t *testing.T) {
evs := &EventStreamer{
sendEvent: func(event *StreamEvent) error {
sendEvent: func(event *proto.StreamEvent) error {
return nil
},
}
for _, ecase := range eventErrorCases {
trans := &BinlogTransaction{
Statements: []Statement{
trans := &proto.BinlogTransaction{
Statements: []proto.Statement{
{
Category: ecase.Category,
Sql: []byte(ecase.Sql),
@ -86,26 +88,26 @@ func TestEventErrors(t *testing.T) {
}
func TestDMLEvent(t *testing.T) {
trans := &BinlogTransaction{
Statements: []Statement{
trans := &proto.BinlogTransaction{
Statements: []proto.Statement{
{
Category: BL_SET,
Category: proto.BL_SET,
Sql: []byte("SET TIMESTAMP=1"),
}, {
Category: BL_SET,
Category: proto.BL_SET,
Sql: []byte("SET INSERT_ID=10"),
}, {
Category: BL_DML,
Category: proto.BL_DML,
Sql: []byte("query /* _stream vtocc_e (eid id name) (null -1 'bmFtZQ==' ) (null 18446744073709551615 'bmFtZQ==' ); */"),
}, {
Category: BL_DML,
Category: proto.BL_DML,
Sql: []byte("query"),
},
},
GroupId: "20",
}
evs := &EventStreamer{
sendEvent: func(event *StreamEvent) error {
sendEvent: func(event *proto.StreamEvent) error {
switch event.Category {
case "DML":
want := `&{DML vtocc_e [eid id name] [[10 -1 name] [11 18446744073709551615 name]] 1 }`
@ -144,20 +146,20 @@ func TestDMLEvent(t *testing.T) {
}
func TestDDLEvent(t *testing.T) {
trans := &BinlogTransaction{
Statements: []Statement{
trans := &proto.BinlogTransaction{
Statements: []proto.Statement{
{
Category: BL_SET,
Category: proto.BL_SET,
Sql: []byte("SET TIMESTAMP=1"),
}, {
Category: BL_DDL,
Category: proto.BL_DDL,
Sql: []byte("DDL"),
},
},
GroupId: "20",
}
evs := &EventStreamer{
sendEvent: func(event *StreamEvent) error {
sendEvent: func(event *proto.StreamEvent) error {
switch event.Category {
case "DDL":
want := `&{DDL [] [] DDL 1 }`

Просмотреть файл

@ -10,6 +10,7 @@ import (
log "github.com/golang/glog"
"github.com/youtube/vitess/go/vt/key"
"github.com/youtube/vitess/go/vt/mysqlctl/proto"
)
var KEYSPACE_ID_COMMENT = []byte("/* EMD keyspace_id:")
@ -20,17 +21,17 @@ var SPACE = []byte(" ")
// passed into the BinlogStreamer: bls.Stream(file, pos, sendTransaction) ->
// bls.Stream(file, pos, KeyrangeFilterFunc(sendTransaction))
func KeyrangeFilterFunc(keyrange key.KeyRange, sendReply sendTransactionFunc) sendTransactionFunc {
return func(reply *BinlogTransaction) error {
return func(reply *proto.BinlogTransaction) error {
matched := false
filtered := make([]Statement, 0, len(reply.Statements))
filtered := make([]proto.Statement, 0, len(reply.Statements))
for _, statement := range reply.Statements {
switch statement.Category {
case BL_SET:
case proto.BL_SET:
filtered = append(filtered, statement)
case BL_DDL:
case proto.BL_DDL:
filtered = append(filtered, statement)
matched = true
case BL_DML:
case proto.BL_DML:
keyspaceIndex := bytes.LastIndex(statement.Sql, KEYSPACE_ID_COMMENT)
if keyspaceIndex == -1 {
// TODO(sougou): increment error counter

Просмотреть файл

@ -9,6 +9,7 @@ import (
"testing"
"github.com/youtube/vitess/go/vt/key"
"github.com/youtube/vitess/go/vt/mysqlctl/proto"
)
var testKeyrange = key.KeyRange{
@ -17,23 +18,23 @@ var testKeyrange = key.KeyRange{
}
func TestKeyrangeFilterPass(t *testing.T) {
input := BinlogTransaction{
Statements: []Statement{
input := proto.BinlogTransaction{
Statements: []proto.Statement{
{
Category: BL_SET,
Category: proto.BL_SET,
Sql: []byte("set1"),
}, {
Category: BL_DML,
Category: proto.BL_DML,
Sql: []byte("dml1 /* EMD keyspace_id:20 */"),
}, {
Category: BL_DML,
Category: proto.BL_DML,
Sql: []byte("dml2 /* EMD keyspace_id:2 */"),
},
},
GroupId: "1",
}
var got string
f := KeyrangeFilterFunc(testKeyrange, func(reply *BinlogTransaction) error {
f := KeyrangeFilterFunc(testKeyrange, func(reply *proto.BinlogTransaction) error {
got = bltToString(reply)
return nil
})
@ -45,20 +46,20 @@ func TestKeyrangeFilterPass(t *testing.T) {
}
func TestKeyrangeFilterSkip(t *testing.T) {
input := BinlogTransaction{
Statements: []Statement{
input := proto.BinlogTransaction{
Statements: []proto.Statement{
{
Category: BL_SET,
Category: proto.BL_SET,
Sql: []byte("set1"),
}, {
Category: BL_DML,
Category: proto.BL_DML,
Sql: []byte("dml1 /* EMD keyspace_id:20 */"),
},
},
GroupId: "1",
}
var got string
f := KeyrangeFilterFunc(testKeyrange, func(reply *BinlogTransaction) error {
f := KeyrangeFilterFunc(testKeyrange, func(reply *proto.BinlogTransaction) error {
got = bltToString(reply)
return nil
})
@ -70,20 +71,20 @@ func TestKeyrangeFilterSkip(t *testing.T) {
}
func TestKeyrangeFilterDDL(t *testing.T) {
input := BinlogTransaction{
Statements: []Statement{
input := proto.BinlogTransaction{
Statements: []proto.Statement{
{
Category: BL_SET,
Category: proto.BL_SET,
Sql: []byte("set1"),
}, {
Category: BL_DDL,
Category: proto.BL_DDL,
Sql: []byte("ddl"),
},
},
GroupId: "1",
}
var got string
f := KeyrangeFilterFunc(testKeyrange, func(reply *BinlogTransaction) error {
f := KeyrangeFilterFunc(testKeyrange, func(reply *proto.BinlogTransaction) error {
got = bltToString(reply)
return nil
})
@ -95,26 +96,26 @@ func TestKeyrangeFilterDDL(t *testing.T) {
}
func TestKeyrangeFilterMalformed(t *testing.T) {
input := BinlogTransaction{
Statements: []Statement{
input := proto.BinlogTransaction{
Statements: []proto.Statement{
{
Category: BL_SET,
Category: proto.BL_SET,
Sql: []byte("set1"),
}, {
Category: BL_DML,
Category: proto.BL_DML,
Sql: []byte("ddl"),
}, {
Category: BL_DML,
Category: proto.BL_DML,
Sql: []byte("dml1 /* EMD keyspace_id:20*/"),
}, {
Category: BL_DML,
Category: proto.BL_DML,
Sql: []byte("dml1 /* EMD keyspace_id:2a */"),
},
},
GroupId: "1",
}
var got string
f := KeyrangeFilterFunc(testKeyrange, func(reply *BinlogTransaction) error {
f := KeyrangeFilterFunc(testKeyrange, func(reply *proto.BinlogTransaction) error {
got = bltToString(reply)
return nil
})
@ -125,7 +126,7 @@ func TestKeyrangeFilterMalformed(t *testing.T) {
}
}
func bltToString(tx *BinlogTransaction) string {
func bltToString(tx *proto.BinlogTransaction) string {
result := ""
for _, statement := range tx.Statements {
result += fmt.Sprintf("statement: <%d, \"%s\"> ", statement.Category, statement.Sql)

Просмотреть файл

@ -0,0 +1,31 @@
// Copyright 2012, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package proto
import ()
// Valid statement types in the binlogs.
const (
BL_UNRECOGNIZED = iota
BL_BEGIN
BL_COMMIT
BL_ROLLBACK
BL_DML
BL_DDL
BL_SET
)
// BinlogTransaction represents one transaction as read from
// the binlog.
type BinlogTransaction struct {
Statements []Statement
GroupId string
}
// Statement represents one statement as read from the binlog.
type Statement struct {
Category int
Sql []byte
}

Просмотреть файл

@ -0,0 +1,27 @@
// Copyright 2012, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package proto
import ()
// StreamEvent represents one event for the update stream.
type StreamEvent struct {
// Category can be "DML", "DDL", "ERR" or "POS"
Category string
// DML
TableName string
PkColNames []string
PkValues [][]interface{}
// DDL or ERR
Sql string
// Timestamp is set for DML, DDL or ERR
Timestamp int64
// POS
GroupId string
}

Просмотреть файл

@ -0,0 +1,33 @@
// Copyright 2012, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package proto
import (
"github.com/youtube/vitess/go/rpcwrap"
"github.com/youtube/vitess/go/vt/key"
)
// UpdateStreamRequest is used to make a request for ServeUpdateStream.
type UpdateStreamRequest struct {
GroupId string
}
// KeyrangeRequest is used to make a request for StreamKeyrange.
type KeyrangeRequest struct {
GroupId string
Keyrange key.KeyRange
}
// UpdateStream defines the rpc API for the update stream service.
type UpdateStream interface {
ServeUpdateStream(req *UpdateStreamRequest, sendReply func(reply interface{}) error) (err error)
StreamKeyrange(req *KeyrangeRequest, sendReply func(reply interface{}) error) (err error)
}
// RegisterAuthenticated regiesters a varaiable that satisfies the UpdateStream interface
// as an rpc service that requires authentication.
func RegisterAuthenticated(service UpdateStream) {
rpcwrap.RegisterAuthenticated(service)
}

Просмотреть файл

@ -1,6 +1,7 @@
// Copyright 2012, Google Inc. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package mysqlctl
import (
@ -8,11 +9,10 @@ import (
"sync"
log "github.com/golang/glog"
"github.com/youtube/vitess/go/rpcwrap"
"github.com/youtube/vitess/go/stats"
"github.com/youtube/vitess/go/sync2"
"github.com/youtube/vitess/go/vt/dbconfigs"
"github.com/youtube/vitess/go/vt/key"
"github.com/youtube/vitess/go/vt/mysqlctl/proto"
)
/* API and config for UpdateStream Service */
@ -38,11 +38,6 @@ type UpdateStream struct {
streams streamList
}
type KeyrangeRequest struct {
GroupId string
Keyrange key.KeyRange
}
type streamer interface {
Stop()
}
@ -89,7 +84,7 @@ func RegisterUpdateStreamService(mycnf *Mycnf) {
stats.Publish("UpdateStreamState", stats.StringFunc(func() string {
return usStateNames[UpdateStreamRpcService.state.Get()]
}))
rpcwrap.RegisterAuthenticated(UpdateStreamRpcService)
proto.RegisterAuthenticated(UpdateStreamRpcService)
}
func logError() {
@ -108,7 +103,7 @@ func DisableUpdateStreamService() {
UpdateStreamRpcService.disable()
}
func ServeUpdateStream(req *UpdateStreamRequest, sendReply func(reply interface{}) error) error {
func ServeUpdateStream(req *proto.UpdateStreamRequest, sendReply func(reply interface{}) error) error {
return UpdateStreamRpcService.ServeUpdateStream(req, sendReply)
}
@ -166,11 +161,7 @@ func (updateStream *UpdateStream) isEnabled() bool {
return updateStream.state.Get() == ENABLED
}
type UpdateStreamRequest struct {
GroupId string
}
func (updateStream *UpdateStream) ServeUpdateStream(req *UpdateStreamRequest, sendReply func(reply interface{}) error) (err error) {
func (updateStream *UpdateStream) ServeUpdateStream(req *proto.UpdateStreamRequest, sendReply func(reply interface{}) error) (err error) {
defer func() {
if x := recover(); x != nil {
err = x.(error)
@ -197,13 +188,13 @@ func (updateStream *UpdateStream) ServeUpdateStream(req *UpdateStreamRequest, se
updateStream.streams.Add(evs)
defer updateStream.streams.Delete(evs)
// Calls cascade like this: BinlogStreamer->func(*StreamEvent)->sendReply
return evs.Stream(rp.MasterLogFile, int64(rp.MasterLogPosition), func(reply *StreamEvent) error {
// Calls cascade like this: BinlogStreamer->func(*proto.StreamEvent)->sendReply
return evs.Stream(rp.MasterLogFile, int64(rp.MasterLogPosition), func(reply *proto.StreamEvent) error {
return sendReply(reply)
})
}
func (updateStream *UpdateStream) StreamKeyrange(req *KeyrangeRequest, sendReply func(reply interface{}) error) (err error) {
func (updateStream *UpdateStream) StreamKeyrange(req *proto.KeyrangeRequest, sendReply func(reply interface{}) error) (err error) {
defer func() {
if x := recover(); x != nil {
err = x.(error)
@ -230,8 +221,8 @@ func (updateStream *UpdateStream) StreamKeyrange(req *KeyrangeRequest, sendReply
updateStream.streams.Add(bls)
defer updateStream.streams.Delete(bls)
// Calls cascade like this: BinlogStreamer->KeyrangeFilterFunc->func(*BinlogTransaction)->sendReply
f := KeyrangeFilterFunc(req.Keyrange, func(reply *BinlogTransaction) error {
// Calls cascade like this: BinlogStreamer->KeyrangeFilterFunc->func(*proto.BinlogTransaction)->sendReply
f := KeyrangeFilterFunc(req.Keyrange, func(reply *proto.BinlogTransaction) error {
return sendReply(reply)
})
return bls.Stream(rp.MasterLogFile, int64(rp.MasterLogPosition), f)

Просмотреть файл

@ -13,6 +13,7 @@ import (
"github.com/youtube/vitess/go/stats"
"github.com/youtube/vitess/go/sync2"
"github.com/youtube/vitess/go/vt/mysqlctl"
myproto "github.com/youtube/vitess/go/vt/mysqlctl/proto"
"github.com/youtube/vitess/go/vt/tabletserver/proto"
)
@ -82,9 +83,9 @@ func (rowCache *InvalidationProcessor) runInvalidationLoop() {
}
log.Infof("Starting rowcache invalidator")
req := &mysqlctl.UpdateStreamRequest{GroupId: groupId}
req := &myproto.UpdateStreamRequest{GroupId: groupId}
err = mysqlctl.ServeUpdateStream(req, func(reply interface{}) error {
return rowCache.processEvent(reply.(*mysqlctl.StreamEvent))
return rowCache.processEvent(reply.(*myproto.StreamEvent))
})
if err != nil {
log.Errorf("mysqlctl.ServeUpdateStream returned err '%v'", err.Error())
@ -92,7 +93,7 @@ func (rowCache *InvalidationProcessor) runInvalidationLoop() {
log.Infof("Rowcache invalidator stopped")
}
func (rowCache *InvalidationProcessor) processEvent(event *mysqlctl.StreamEvent) error {
func (rowCache *InvalidationProcessor) processEvent(event *myproto.StreamEvent) error {
if rowCache.state.Get() != RCINV_ENABLED {
return io.EOF
}
@ -112,7 +113,7 @@ func (rowCache *InvalidationProcessor) processEvent(event *mysqlctl.StreamEvent)
return nil
}
func (rowCache *InvalidationProcessor) handleDmlEvent(event *mysqlctl.StreamEvent) {
func (rowCache *InvalidationProcessor) handleDmlEvent(event *myproto.StreamEvent) {
dml := new(proto.DmlType)
dml.Table = event.TableName
dml.Keys = make([]string, 0, len(event.PkValues))