Golint recommendations in BinlogStreamer.

This commit is contained in:
Anthony Yeh 2015-04-30 16:34:07 -07:00
Родитель 76555b2878
Коммит 57290ce3d4
2 изменённых файлов: 24 добавлений и 19 удалений

Просмотреть файл

@ -21,8 +21,13 @@ import (
var (
binlogStreamerErrors = stats.NewCounters("BinlogStreamerErrors")
ClientEOF = fmt.Errorf("binlog stream consumer ended the reply stream")
ServerEOF = fmt.Errorf("binlog stream connection was closed by mysqld")
// ErrClientEOF is returned by BinlogStreamer if the stream ended because the
// consumer of the stream indicated it doesn't want any more events.
ErrClientEOF = fmt.Errorf("binlog stream consumer ended the reply stream")
// ErrServerEOF is returned by BinlogStreamer if the stream ended because the
// connection to the mysqld server was lost, or the stream was terminated by
// mysqld.
ErrServerEOF = fmt.Errorf("binlog stream connection was closed by mysqld")
// statementPrefixes are normal sql statement prefixes.
statementPrefixes = map[string]int{
@ -132,8 +137,8 @@ func (bls *BinlogStreamer) Stream(ctx *sync2.ServiceContext) (err error) {
// at a time, and groups them into transactions. It is called from within the
// service function launched by Stream().
//
// If the sendTransaction func returns io.EOF, parseEvents returns ClientEOF.
// If the events channel is closed, parseEvents returns ServerEOF.
// If the sendTransaction func returns io.EOF, parseEvents returns ErrClientEOF.
// If the events channel is closed, parseEvents returns ErrServerEOF.
func (bls *BinlogStreamer) parseEvents(ctx *sync2.ServiceContext, events <-chan proto.BinlogEvent) (myproto.ReplicationPosition, error) {
var statements []proto.Statement
var format proto.BinlogFormat
@ -162,7 +167,7 @@ func (bls *BinlogStreamer) parseEvents(ctx *sync2.ServiceContext, events <-chan
}
if err = bls.sendTransaction(trans); err != nil {
if err == io.EOF {
return ClientEOF
return ErrClientEOF
}
return fmt.Errorf("send reply error: %v", err)
}
@ -181,7 +186,7 @@ func (bls *BinlogStreamer) parseEvents(ctx *sync2.ServiceContext, events <-chan
if !ok {
// events channel has been closed, which means the connection died.
log.Infof("reached end of binlog event stream")
return pos, ServerEOF
return pos, ErrServerEOF
}
case <-ctx.ShuttingDown:
log.Infof("stopping early due to BinlogStreamer service shutdown")

Просмотреть файл

@ -193,7 +193,7 @@ func TestBinlogStreamerParseEventsXID(t *testing.T) {
_, err := bls.parseEvents(ctx, events)
return err
})
if err := svm.Join(); err != ServerEOF {
if err := svm.Join(); err != ErrServerEOF {
t.Errorf("unexpected error: %v", err)
}
@ -237,7 +237,7 @@ func TestBinlogStreamerParseEventsCommit(t *testing.T) {
_, err := bls.parseEvents(ctx, events)
return err
})
if err := svm.Join(); err != ServerEOF {
if err := svm.Join(); err != ErrServerEOF {
t.Errorf("unexpected error: %v", err)
}
@ -285,7 +285,7 @@ func TestBinlogStreamerParseEventsClientEOF(t *testing.T) {
queryEvent{query: proto.Query{Database: "vt_test_keyspace", Sql: []byte("insert into vt_a(eid, id) values (1, 1) /* _stream vt_a (eid id ) (1 1 ); */")}},
xidEvent{},
}
want := ClientEOF
want := ErrClientEOF
events := make(chan proto.BinlogEvent)
@ -310,7 +310,7 @@ func TestBinlogStreamerParseEventsClientEOF(t *testing.T) {
}
func TestBinlogStreamerParseEventsServerEOF(t *testing.T) {
want := ServerEOF
want := ErrServerEOF
events := make(chan proto.BinlogEvent)
close(events)
@ -577,7 +577,7 @@ func TestBinlogStreamerParseEventsRollback(t *testing.T) {
_, err := bls.parseEvents(ctx, events)
return err
})
if err := svm.Join(); err != ServerEOF {
if err := svm.Join(); err != ErrServerEOF {
t.Errorf("unexpected error: %v", err)
}
@ -626,7 +626,7 @@ func TestBinlogStreamerParseEventsDMLWithoutBegin(t *testing.T) {
_, err := bls.parseEvents(ctx, events)
return err
})
if err := svm.Join(); err != ServerEOF {
if err := svm.Join(); err != ErrServerEOF {
t.Errorf("unexpected error: %v", err)
}
@ -676,7 +676,7 @@ func TestBinlogStreamerParseEventsBeginWithoutCommit(t *testing.T) {
_, err := bls.parseEvents(ctx, events)
return err
})
if err := svm.Join(); err != ServerEOF {
if err := svm.Join(); err != ErrServerEOF {
t.Errorf("unexpected error: %v", err)
}
@ -722,7 +722,7 @@ func TestBinlogStreamerParseEventsSetInsertID(t *testing.T) {
_, err := bls.parseEvents(ctx, events)
return err
})
if err := svm.Join(); err != ServerEOF {
if err := svm.Join(); err != ErrServerEOF {
t.Errorf("unexpected error: %v", err)
}
@ -801,7 +801,7 @@ func TestBinlogStreamerParseEventsOtherDB(t *testing.T) {
_, err := bls.parseEvents(ctx, events)
return err
})
if err := svm.Join(); err != ServerEOF {
if err := svm.Join(); err != ErrServerEOF {
t.Errorf("unexpected error: %v", err)
}
@ -846,7 +846,7 @@ func TestBinlogStreamerParseEventsOtherDBBegin(t *testing.T) {
_, err := bls.parseEvents(ctx, events)
return err
})
if err := svm.Join(); err != ServerEOF {
if err := svm.Join(); err != ErrServerEOF {
t.Errorf("unexpected error: %v", err)
}
@ -878,7 +878,7 @@ func TestBinlogStreamerParseEventsBeginAgain(t *testing.T) {
_, err := bls.parseEvents(ctx, events)
return err
})
if err := svm.Join(); err != ServerEOF {
if err := svm.Join(); err != ErrServerEOF {
t.Errorf("unexpected error: %v", err)
}
after := binlogStreamerErrors.Counts()["ParseEvents"]
@ -922,7 +922,7 @@ func TestBinlogStreamerParseEventsMariadbBeginGTID(t *testing.T) {
_, err := bls.parseEvents(ctx, events)
return err
})
if err := svm.Join(); err != ServerEOF {
if err := svm.Join(); err != ErrServerEOF {
t.Errorf("unexpected error: %v", err)
}
@ -965,7 +965,7 @@ func TestBinlogStreamerParseEventsMariadbStandaloneGTID(t *testing.T) {
_, err := bls.parseEvents(ctx, events)
return err
})
if err := svm.Join(); err != ServerEOF {
if err := svm.Join(); err != ErrServerEOF {
t.Errorf("unexpected error: %v", err)
}