Add tests to all the things, fix some minor bugs/races

This commit is contained in:
Connor Peet 2017-10-20 11:00:09 -07:00
Родитель 5f937ecb7e
Коммит ac5796090a
12 изменённых файлов: 637 добавлений и 57 удалений

2
.gitignore поставляемый
Просмотреть файл

@ -30,8 +30,6 @@ _testmain.go
/.idea
/vendor/*/
/node_modules
/redplex
/bench
/x.js
*.profile

41
Jenkinsfile поставляемый Normal file
Просмотреть файл

@ -0,0 +1,41 @@
properties([[$class: 'BuildDiscarderProperty', strategy: [$class: 'LogRotator', artifactNumToKeepStr: '2', numToKeepStr: '2']]])
node {
def projectName = "redplex"
def gopath = pwd() + "/gopath"
def projectDir = "${gopath}/src/github.com/mixer/${projectName}"
env.GOPATH = "${gopath}"
env.PATH = env.PATH + ":${gopath}/bin"
try {
sh "mkdir -p '${projectDir}'"
dir (projectDir) {
stage("Checkout") {
checkout scm
}
stage("Initialize services") {
sh 'sudo /usr/bin/systemctl start redis'
}
stage("Prepare") {
sh 'go get github.com/kardianos/govendor'
sh 'go get github.com/golang/lint/golint'
sh 'govendor sync'
}
stage("Test") {
sh 'make check'
}
stage("Compile") {
sh 'make redplex'
}
stage("artifacts") {
archiveArtifacts artifacts: "${projectName}", fingerprint: true
}
currentBuild.result = "SUCCESS"
}
} catch(e) {
currentBuild.result = "FAILURE"
throw e
}
}

16
Makefile Normal file
Просмотреть файл

@ -0,0 +1,16 @@
GO_SRC = $(wildcard *.go)
redplex: $(GO_SRC)
@printf " → Compiling %s \n" $@
@go build ./cmd/$@
@printf " ✔ Compiled %s \n" $@
lint: $(GO_SRC)
@go vet ./ && printf " ✔ Vet passed \n"
@golint ./ && printf " ✔ Lint passed \n"
check: $(GO_SRC) lint
@go test -v -race ./
@printf " ✔ Tests passed \n"
.PHONY: lint check

Просмотреть файл

@ -18,7 +18,7 @@ import (
var (
benchData = []byte("*3\r\n$7\r\nmessage\r\n$7\r\nchannel\r\n$345\r\n" + `{"channel":314,"id":"fd269030-aacd-11e7-b70f-fd0fddd98285","user_name":"Jeff","user_id":1355,"user_roles":["Pro","User"],"user_level":94,"user_avatar":"https://uploads.beam.pro/avatar/qryjcpn1-1355.jpg","message":{"message":[{"type":"text","data":"Finally. We're back on the computer.","text":"Finally. We're back on the computer."}],"meta":{}}}` + "\r\n")
subscribe = redplex.NewRequest("subscribe", 1).Append([]byte(`channel`)).Bytes()
subscribe = redplex.NewRequest("subscribe", 1).Bulk([]byte(`channel`)).Bytes()
remoteAddress = "127.0.0.1:3100"
redplexAddress = "127.0.0.1:3101"
benchedBytes = 1024 * 1024 * 10

1
io.go
Просмотреть файл

@ -1 +0,0 @@
package redplex

Просмотреть файл

@ -10,11 +10,20 @@ import (
)
const (
MessageError = '-'
// MessageError is the prefix for Redis line errors in the protocol.
MessageError = '-'
// MessageStatus is the prefix for Redis line statues in the protocol.
MessageStatus = '+'
MessageInt = ':'
MessageBulk = '$'
MessageMutli = '*'
// MessageInt is the prefix for Redis line integers in the protocol.
// It's followed by the plain text number
MessageInt = ':'
// MessageBulk is the prefix for Redis bulk messages. It's followed by the
// bulk message size, and CRLF, and then the full bulk message bytes.
MessageBulk = '$'
// MessageMutli is the prefix for Redis "multi" messages (arrays).
// It's followed by the array length, and CRLF, and then the next N messages
// as elements of the array/
MessageMutli = '*'
)
var (
@ -24,9 +33,9 @@ var (
messagePrefix = []byte("*3\r\n$7\r\nmessage\r\n")
// pmessagePrefix is the prefix for pattern pubsub messages on the protocol.
pmessagePrefix = []byte("*4\r\n$8\r\npmessage\r\n")
// WrongMessageErr is returned in Parse commands if the command
// ErrWrongMessage is returned in Parse commands if the command
// is not a pubsub command.
WrongMessageErr = errors.New("redplex/protocol: unexpected message type")
ErrWrongMessage = errors.New("redplex/protocol: unexpected message type")
commandSubscribe = `subscribe`
commandPSubscribe = `psubscribe`
@ -96,7 +105,7 @@ type PublishCommand struct {
// trailing delimiter.
func ParseBulkMessage(line []byte) ([]byte, error) {
if line[0] != MessageBulk {
return nil, WrongMessageErr
return nil, ErrWrongMessage
}
delimiter := bytes.IndexByte(line, '\n')
@ -105,15 +114,19 @@ func ParseBulkMessage(line []byte) ([]byte, error) {
if err != nil {
return nil, err
}
if n <= 0 {
return nil, nil
}
if len(line) <= delimiter+1+int(n) {
return nil, WrongMessageErr
return nil, ErrWrongMessage
}
return line[delimiter+1 : delimiter+1+int(n)], nil
}
// Parses the given pubsub command efficiently. Returns a NotPubsubError if the
// command isn't a pubsub command.
// ParsePublishCommand parses the given pubsub command efficiently. Returns a
// NotPubsubError if the command isn't a pubsub command.
func ParsePublishCommand(b []byte) (cmd PublishCommand, err error) {
switch {
case bytes.HasPrefix(b, messagePrefix):
@ -130,11 +143,11 @@ func ParsePublishCommand(b []byte) (cmd PublishCommand, err error) {
}
return PublishCommand{IsPattern: true, ChannelOrPattern: name}, nil
default:
return cmd, WrongMessageErr
return cmd, ErrWrongMessage
}
}
// SubscribeCommand is a command to subscribe to one or more Redis channels.
// Request is a byte slice with utility methods for building up Redis commands.
type Request []byte
// NewRequest creates a new request to send to the Redis server.
@ -143,11 +156,11 @@ func NewRequest(name string, argCount int) *Request {
b = append(b, []byte(strconv.Itoa(argCount+1))...)
b = append(b, messageDelimiter...)
r := Request(b)
return (&r).Append([]byte(name))
return (&r).Bulk([]byte(name))
}
// Append adds a new argument value to the request.
func (r *Request) Append(arg []byte) *Request {
// Bulk adds a new bulk argument value to the request.
func (r *Request) Bulk(arg []byte) *Request {
data := *r
data = append(data, MessageBulk)
data = append(data, []byte(strconv.Itoa(len(arg)))...)
@ -159,6 +172,17 @@ func (r *Request) Append(arg []byte) *Request {
return r
}
// Int adds a new integer argument value to the request.
func (r *Request) Int(n int) *Request {
data := *r
data = append(data, MessageInt)
data = append(data, []byte(strconv.Itoa(n))...)
data = append(data, messageDelimiter...)
*r = data
return r
}
// Bytes returns the request bytes.
func (r *Request) Bytes() []byte { return *r }
@ -210,5 +234,5 @@ func copyBytes(b []byte) (dup []byte) {
// SubscribeResponse returns an appropriate response to the given subscribe
// or unsubscribe command.
func SubscribeResponse(command string, channel []byte) []byte {
return NewRequest(command, 2).Append(channel).Append([]byte(`:1`)).Bytes()
return NewRequest(command, 2).Bulk(channel).Int(1).Bytes()
}

Просмотреть файл

@ -1 +1,123 @@
package redplex
import (
"bufio"
"bytes"
"strings"
"testing"
"github.com/stretchr/testify/require"
)
func TestReadNextFull(t *testing.T) {
tt := [][]string{
{"$7", "he\r\nllo"},
{"$0", ""},
{"$-1"},
{":1"},
{"+OK"},
{"-hello world!"},
{"*0"},
{"*-1"},
{"*2", "$5", "hello", ":2"},
}
for _, tcase := range tt {
output := bytes.NewBuffer(nil)
expected := []byte(strings.Join(tcase, "\r\n") + "\r\n")
reader := bytes.NewReader(expected)
bufferedReader := bufio.NewReader(reader)
require.Nil(t, ReadNextFull(output, bufferedReader), "unexpected error in %+v", tcase)
require.Equal(t, output.Bytes(), expected, "expected parsed %+v to be equal", tcase)
require.Zero(t, reader.Len()+bufferedReader.Buffered(), "should have consumed all of %+v", tcase)
}
}
func TestParseBulkMessage(t *testing.T) {
tt := []struct {
Message string
Expected string
Error error
}{
{"$-1\r\n", "", nil},
{"$0\r\n", "", nil},
{"$5\r\nhello\r\n", "hello", nil},
{"$5\r\nhe", "", ErrWrongMessage},
{":1\r\nasdf\r\n", "", ErrWrongMessage},
}
for _, tcase := range tt {
actual, err := ParseBulkMessage([]byte(tcase.Message))
if tcase.Error != nil {
require.Equal(t, tcase.Error, err, "unexpected error parsing %s", tcase.Message)
} else if len(tcase.Expected) == 0 {
require.Nil(t, actual, "expected empty byte slice parsing %s", tcase.Message)
} else {
require.Equal(t, []byte(tcase.Expected), actual, "unexpected result parsing %s", tcase.Message)
}
}
}
func TestParsePublishCommand(t *testing.T) {
tt := []struct {
Message string
Expected PublishCommand
Error error
}{
{"$-1\r\n", PublishCommand{}, ErrWrongMessage},
{"*3\r\n$7\r\nmessage\r\n$6\r\nsecond\r\n$5\r\nHello\r\n", PublishCommand{false, []byte("second")}, nil},
{"*4\r\n$8\r\npmessage\r\n$6\r\nsecond\r\n$2\r\ns*\r\n$5\r\nHello\r\n", PublishCommand{true, []byte("second")}, nil},
}
for _, tcase := range tt {
actual, err := ParsePublishCommand([]byte(tcase.Message))
if err != nil {
require.Equal(t, tcase.Error, err, "unexpected error parsing %s", tcase.Message)
} else {
require.Equal(t, tcase.Expected, actual, "unexpected result parsing %s", tcase.Message)
}
}
}
func TestNewRequest(t *testing.T) {
require.Equal(t,
NewRequest("PING", 0).Bytes(),
[]byte("*1\r\n$4\r\nPING\r\n"),
)
require.Equal(t,
NewRequest("SUBSCRIBE", 1).Bulk([]byte("channel-name")).Bytes(),
[]byte("*2\r\n$9\r\nSUBSCRIBE\r\n$12\r\nchannel-name\r\n"),
)
}
func TestParseRequest(t *testing.T) {
tt := [][]string{
{"ping"},
{"subscribe", "foo"},
{"subscribe", "foo", "bar"},
}
for _, tcase := range tt {
var args [][]byte
cmd := NewRequest(tcase[0], len(tcase)-1)
for _, arg := range tcase[1:] {
args = append(args, []byte(arg))
cmd.Bulk([]byte(arg))
}
method, actualArgs, err := ParseRequest(bufio.NewReader(bytes.NewReader(cmd.Bytes())))
require.Nil(t, err, "unexpected error parsing %+v", tcase)
require.Equal(t, tcase[0], method)
require.Equal(t, args, actualArgs)
}
}
func TestSubscribeResponse(t *testing.T) {
require.Equal(t,
SubscribeResponse("subscribe", []byte("first")),
[]byte("*3\r\n$9\r\nsubscribe\r\n$5\r\nfirst\r\n:1\r\n"),
)
}

110
pubsub.go
Просмотреть файл

@ -13,15 +13,21 @@ import (
"github.com/sirupsen/logrus"
)
// Writable is an interface passed into Pubsub. It's called when we want to
// publish data.
type Writable interface {
Write(b []byte)
}
// The Listener wraps a function that's called when a pubsub message it sent.
type Listener struct {
IsPattern bool
Channel string
Conn *connection
Conn Writable
}
// listenerMap is a map of patterns or channels to Listeners.
type listenerMap map[string][]*connection
type listenerMap map[string][]Writable
// broadcast pushes the byte slice asynchronously to the list of listeners.
// Blocks until all listeners have been called.
@ -31,13 +37,13 @@ func (l listenerMap) broadcast(pattern []byte, b []byte) {
var wg sync.WaitGroup
wg.Add(len(listeners))
for _, l := range listeners {
go func() { l.write(b); wg.Done() }()
go func(l Writable) { l.Write(b); wg.Done() }(l)
}
wg.Wait()
}
// add inserts the listener into the pattern's set of listeners.
func (l listenerMap) add(channel string, listener *connection) (shouldSubscribe bool) {
func (l listenerMap) add(channel string, listener Writable) (shouldSubscribe bool) {
list := l[channel]
shouldSubscribe = len(list) == 0
l[channel] = append(list, listener)
@ -45,7 +51,7 @@ func (l listenerMap) add(channel string, listener *connection) (shouldSubscribe
}
// remove pulls the listener out of the map.
func (l listenerMap) remove(channel string, listener *connection) (shouldUnsubscribe bool) {
func (l listenerMap) remove(channel string, listener Writable) (shouldUnsubscribe bool) {
list := l[channel]
changed := false
for i, other := range list {
@ -72,7 +78,7 @@ func (l listenerMap) remove(channel string, listener *connection) (shouldUnsubsc
}
// removeAll removes all channels the listener is connected to.
func (l listenerMap) removeAll(conn *connection, p *Pubsub) (toUnsub [][]byte) {
func (l listenerMap) removeAll(conn Writable) (toUnsub [][]byte) {
for channel, list := range l {
for i := 0; i < len(list); i++ {
if list[i] == conn {
@ -112,6 +118,7 @@ func NewPubsub(dialer Dialer, writeTimeout time.Duration) *Pubsub {
writeTimeout: writeTimeout,
patterns: listenerMap{},
channels: listenerMap{},
closer: make(chan struct{}),
}
}
@ -122,34 +129,48 @@ func (p *Pubsub) Start() {
for {
cnx, err := p.dial()
if err == nil {
backoff.Reset()
if err := p.read(cnx); err != nil {
logrus.WithError(err).Info("redplex/pubsub: lost connection to pubsub server")
if err != nil {
logrus.WithError(err).Info("redplex/pubsub: error dialing to pubsub master")
select {
case <-time.After(backoff.NextBackOff()):
continue
case <-p.closer:
return
}
continue
}
logrus.WithError(err).Info("redplex/pubsub: error dialing to pubsub master")
backoff.Reset()
err = p.read(cnx)
select {
case <-time.After(backoff.NextBackOff()):
case <-p.closer:
return
default:
logrus.WithError(err).Info("redplex/pubsub: lost connection to pubsub server")
}
}
}
// Close frees resources associated with the pubsub server.
func (p *Pubsub) Close() {
close(p.closer)
p.mu.Lock()
if p.connection != nil {
p.connection.Close()
}
p.mu.Unlock()
}
// Subscribe adds the listener to the channel.
func (p *Pubsub) Subscribe(listener Listener) {
p.mu.Lock()
if listener.IsPattern {
if p.patterns.add(listener.Channel, listener.Conn) && p.connection != nil {
p.command(NewRequest(commandPSubscribe, 1).Append([]byte(listener.Channel)))
if p.patterns.add(listener.Channel, listener.Conn) {
p.command(NewRequest(commandPSubscribe, 1).Bulk([]byte(listener.Channel)))
}
} else {
if p.channels.add(listener.Channel, listener.Conn) && p.connection != nil {
p.command(NewRequest(commandSubscribe, 1).Append([]byte(listener.Channel)))
if p.channels.add(listener.Channel, listener.Conn) {
p.command(NewRequest(commandSubscribe, 1).Bulk([]byte(listener.Channel)))
}
}
p.mu.Unlock()
@ -159,41 +180,54 @@ func (p *Pubsub) Subscribe(listener Listener) {
func (p *Pubsub) Unsubscribe(listener Listener) {
p.mu.Lock()
if listener.IsPattern {
if p.patterns.remove(listener.Channel, listener.Conn) && p.connection != nil {
p.command(NewRequest(commandPUnsubscribe, 1).Append([]byte(listener.Channel)))
if p.patterns.remove(listener.Channel, listener.Conn) {
p.command(NewRequest(commandPUnsubscribe, 1).Bulk([]byte(listener.Channel)))
}
} else {
if p.channels.remove(listener.Channel, listener.Conn) && p.connection != nil {
p.command(NewRequest(commandUnsubscribe, 1).Append([]byte(listener.Channel)))
if p.channels.remove(listener.Channel, listener.Conn) {
p.command(NewRequest(commandUnsubscribe, 1).Bulk([]byte(listener.Channel)))
}
}
p.mu.Unlock()
}
// UnsubscribeAll removes all channels the writer is subscribed to.
func (p *Pubsub) UnsubscribeAll(c *connection) {
toUnsub := p.patterns.removeAll(c, p)
func (p *Pubsub) UnsubscribeAll(c Writable) {
p.mu.Lock()
var (
toUnsub = p.patterns.removeAll(c)
command []byte
)
if len(toUnsub) > 0 {
r := NewRequest(commandPUnsubscribe, len(toUnsub))
for _, p := range toUnsub {
r.Append(p)
r.Bulk(p)
}
p.command(r)
command = append(command, r.Bytes()...)
}
toUnsub = p.channels.removeAll(c, p)
toUnsub = p.channels.removeAll(c)
if len(toUnsub) > 0 {
r := NewRequest(commandUnsubscribe, len(toUnsub))
for _, p := range toUnsub {
r.Append(p)
r.Bulk(p)
}
p.command(r)
command = append(command, r.Bytes()...)
}
if p.connection != nil && len(command) > 0 {
p.connection.SetWriteDeadline(time.Now().Add(p.writeTimeout))
go p.connection.Write(command)
}
p.mu.Unlock()
}
// command sends the request to the pubsub server.
// command sends the request to the pubsub server asynchonrously.
func (p *Pubsub) command(r *Request) {
if p.connection != nil {
p.connection.SetWriteDeadline(time.Now().Add(p.writeTimeout))
@ -201,6 +235,14 @@ func (p *Pubsub) command(r *Request) {
}
}
// command sends the request to the pubsub server and blocks until it sends.
func (p *Pubsub) commandSync(r *Request) {
if p.connection != nil {
p.connection.SetWriteDeadline(time.Now().Add(p.writeTimeout))
p.connection.Write(r.Bytes())
}
}
func (p *Pubsub) resubscribe(cnx net.Conn) {
p.mu.Lock()
p.connection = cnx
@ -208,17 +250,17 @@ func (p *Pubsub) resubscribe(cnx net.Conn) {
if len(p.channels) > 0 {
cmd := NewRequest(commandSubscribe, len(p.channels))
for channel := range p.channels {
cmd.Append([]byte(channel))
cmd.Bulk([]byte(channel))
}
p.command(cmd)
p.commandSync(cmd)
}
if len(p.patterns) > 0 {
cmd := NewRequest(commandPSubscribe, len(p.patterns))
for pattern := range p.patterns {
cmd.Append([]byte(pattern))
cmd.Bulk([]byte(pattern))
}
p.command(cmd)
p.commandSync(cmd)
}
p.mu.Unlock()
@ -238,11 +280,9 @@ func (p *Pubsub) read(cnx net.Conn) error {
for {
buffer.Reset()
if err := ReadNextFull(buffer, reader); err != nil {
logrus.WithError(err).Debug("redplex/protocol: error reading from remote")
p.mu.Lock()
p.connection = nil
p.mu.Unlock()
return err
}

195
pubsub_test.go Normal file
Просмотреть файл

@ -0,0 +1,195 @@
package redplex
import (
"testing"
"net"
"time"
"sync"
"io"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)
type mockWritable struct{ mock.Mock }
func (m *mockWritable) Write(b []byte) { m.Called(b) }
func makeWritableMocks() []*mockWritable {
var mocks []*mockWritable
for i := 0; i < 10; i++ {
mocks = append(mocks, &mockWritable{})
}
return mocks
}
func assertMocks(t *testing.T, mocks []*mockWritable) {
for _, mock := range mocks {
mock.AssertExpectations(t)
}
}
func TestListenersMap(t *testing.T) {
mocks := makeWritableMocks()
l := listenerMap{}
t.Run("AddsSubscribers", func(t *testing.T) {
require.True(t, l.add("foo", mocks[0]))
require.False(t, l.add("foo", mocks[1]))
require.True(t, l.add("bar", mocks[2]))
})
t.Run("Broadcasts", func(t *testing.T) {
mocks[0].On("Write", []byte{1, 2, 3}).Return()
mocks[1].On("Write", []byte{1, 2, 3}).Return()
l.broadcast([]byte("foo"), []byte{1, 2, 3})
assertMocks(t, mocks)
})
t.Run("Unsubscribes", func(t *testing.T) {
require.Equal(t, 2, len(l))
require.True(t, l.remove("bar", mocks[2]))
require.Equal(t, 1, len(l))
require.False(t, l.remove("foo", mocks[0]))
require.True(t, l.remove("foo", mocks[1]))
require.Equal(t, 0, len(l))
})
t.Run("RemoveAll", func(t *testing.T) {
require.True(t, l.add("foo", mocks[0]))
require.False(t, l.add("foo", mocks[1]))
require.True(t, l.add("bar", mocks[0]))
require.Equal(t,
[][]byte{[]byte("bar")},
l.removeAll(mocks[0]),
)
require.Equal(t, 1, len(l))
})
}
type PubsubSuite struct {
suite.Suite
server net.Listener
connections <-chan net.Conn
pubsubWg sync.WaitGroup
pubsub *Pubsub
}
func TestPubsubSuite(t *testing.T) {
suite.Run(t, new(PubsubSuite))
}
func (p *PubsubSuite) SetupSuite() {
server, err := net.Listen("tcp", "127.0.0.1:0")
require.Nil(p.T(), err)
connections := make(chan net.Conn)
p.connections = connections
p.server = server
go func() {
for {
cnx, err := server.Accept()
if err != nil {
return
}
connections <- cnx
}
}()
}
func (p *PubsubSuite) SetupTest() {
p.pubsub = NewPubsub(
func() (net.Conn, error) { return net.Dial("tcp", p.server.Addr().String()) },
time.Second,
)
p.pubsubWg.Add(1)
go func() { p.pubsub.Start(); p.pubsubWg.Done() }()
}
func (p *PubsubSuite) TearDownTest() {
p.pubsub.Close()
p.pubsubWg.Wait()
}
func (p *PubsubSuite) setupSubscription() (cnx net.Conn, mw *mockWritable) {
cnx = <-p.connections
mw = &mockWritable{}
p.pubsub.Subscribe(Listener{IsPattern: false, Channel: "foo", Conn: mw})
assertReads(p.T(), cnx, "*2\r\n$9\r\nsubscribe\r\n$3\r\nfoo\r\n")
p.pubsub.Subscribe(Listener{IsPattern: true, Channel: "ba*", Conn: mw})
assertReads(p.T(), cnx, "*2\r\n$10\r\npsubscribe\r\n$3\r\nba*\r\n")
return cnx, mw
}
func (p *PubsubSuite) TestMakeSimpleSubscription() {
cnx, mw := p.setupSubscription()
p.assertReceivesMessage(cnx, mw, "*3\r\n$7\r\nmessage\r\n$3\r\nfoo\r\n$5\r\nheyo!\r\n")
p.assertReceivesMessage(cnx, mw, "*4\r\n$8\r\npmessage\r\n$3\r\nba*\r\n$3\r\nbar\r\n$5\r\nheyo!\r\n")
mw.AssertExpectations(p.T())
}
func (p *PubsubSuite) TestUnsubscribesAll() {
cnx, mw := p.setupSubscription()
p.pubsub.UnsubscribeAll(mw)
assertReads(p.T(), cnx, "*2\r\n$12\r\npunsubscribe\r\n$3\r\nba*\r\n")
assertReads(p.T(), cnx, "*2\r\n$11\r\nunsubscribe\r\n$3\r\nfoo\r\n")
}
func (p *PubsubSuite) TestUnsubscribesChannel() {
cnx, mw := p.setupSubscription()
p.pubsub.Unsubscribe(Listener{IsPattern: false, Channel: "foo", Conn: mw})
assertReads(p.T(), cnx, "*2\r\n$11\r\nunsubscribe\r\n$3\r\nfoo\r\n")
p.pubsub.Unsubscribe(Listener{IsPattern: true, Channel: "ba*", Conn: mw})
assertReads(p.T(), cnx, "*2\r\n$12\r\npunsubscribe\r\n$3\r\nba*\r\n")
}
func (p *PubsubSuite) TestResubscribesOnFailure() {
cnx, mw := p.setupSubscription()
cnx.Close()
newCnx := <-p.connections
assertReads(p.T(), newCnx, "*2\r\n$9\r\nsubscribe\r\n$3\r\nfoo\r\n")
assertReads(p.T(), newCnx, "*2\r\n$10\r\npsubscribe\r\n$3\r\nba*\r\n")
p.assertReceivesMessage(newCnx, mw, "*3\r\n$7\r\nmessage\r\n$3\r\nfoo\r\n$5\r\nheyo!\r\n")
}
func (p *PubsubSuite) TestIgnoresExtraneousMessages() {
cnx, mw := p.setupSubscription()
cnx.Write([]byte("+OK\r\n"))
p.assertReceivesMessage(cnx, mw, "*3\r\n$7\r\nmessage\r\n$3\r\nfoo\r\n$5\r\nheyo!\r\n")
}
func (p *PubsubSuite) assertReceivesMessage(cnx net.Conn, mw *mockWritable, message string) {
ok := make(chan struct{})
mw.On("Write", []byte(message)).Run(func(_ mock.Arguments) { ok <- struct{}{} }).Return()
cnx.Write([]byte(message))
select {
case <-ok:
case <-time.After(time.Second):
p.Fail("Expected to read sub message, but didn't")
}
mw.AssertExpectations(p.T())
}
func assertReads(t *testing.T, cnx net.Conn, message string) {
cnx.SetReadDeadline(time.Now().Add(time.Second * 5))
actual := make([]byte, len(message))
_, err := io.ReadFull(cnx, actual)
require.Nil(t, err, "error reading expected message %q", message)
require.Equal(t, message, string(actual))
}
func (p *PubsubSuite) TearDownSuite() {
p.server.Close()
}

Просмотреть файл

@ -17,6 +17,8 @@ const toSendQueueLimit = 128
// the pubsub master.
type Dialer func() (net.Conn, error)
// Server is the redplex server which accepts connections and talks to the
// underlying Pubsub implementation.
type Server struct {
l net.Listener
pubsub *Pubsub
@ -49,6 +51,12 @@ func (s *Server) Listen() error {
}
}
// Close frees resources associated with the server.
func (s *Server) Close() {
s.l.Close()
s.pubsub.Close()
}
type connection struct {
cnx net.Conn
pubsub *Pubsub
@ -101,7 +109,7 @@ func (s *connection) Start() {
logrus.Debug("redplex/server: terminating connection at client's request")
return
default:
s.cnx.Write([]byte(fmt.Sprintf("-ERR unknown command '%s'", method)))
s.cnx.Write([]byte(fmt.Sprintf("-ERR unknown command '%s'\r\n", method)))
continue
}
@ -131,7 +139,8 @@ func (s *connection) loopWrite() {
}
}
func (s *connection) write(b []byte) {
// Write implements Writable.Write.
func (s *connection) Write(b []byte) {
s.toSendCond.L.Lock()
if len(s.toSend) < cap(s.toSend) && !s.isClosed {
s.toSend = append(s.toSend, b)

94
server_test.go Normal file
Просмотреть файл

@ -0,0 +1,94 @@
package redplex
import (
"net"
"testing"
"time"
"github.com/garyburd/redigo/redis"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)
const redisAddress = "127.0.0.1:6379"
type EndToEndServerSuite struct {
suite.Suite
server *Server
redplexConn redis.Conn
directConn redis.Conn
}
func TestEndToEndServerSuite(t *testing.T) {
suite.Run(t, new(EndToEndServerSuite))
}
func (e *EndToEndServerSuite) SetupSuite() {
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.Nil(e.T(), err)
e.server = NewServer(listener, NewPubsub(
func() (net.Conn, error) { return net.Dial("tcp", redisAddress) },
time.Second*5,
))
go e.server.Listen()
directConn, err := redis.Dial("tcp", redisAddress)
require.Nil(e.T(), err)
e.directConn = directConn
redplexConn, err := redis.Dial("tcp", listener.Addr().String())
require.Nil(e.T(), err)
e.redplexConn = redplexConn
}
func (e *EndToEndServerSuite) TearDownSuite() {
e.server.Close()
e.redplexConn.Close()
e.directConn.Close()
}
func (e *EndToEndServerSuite) TestSubscribesAndGetsMessages() {
psc := redis.PubSubConn{Conn: e.redplexConn}
require.Nil(e.T(), psc.Subscribe("foo"))
require.Equal(e.T(), redis.Subscription{Kind: "subscribe", Channel: "foo", Count: 1}, psc.Receive())
require.Nil(e.T(), psc.PSubscribe("ba*"))
require.Equal(e.T(), redis.Subscription{Kind: "psubscribe", Channel: "ba*", Count: 1}, psc.Receive())
e.retryUntilReturns(
func() {
_, err := e.directConn.Do("PUBLISH", "foo", "bar")
require.Nil(e.T(), err)
},
func() {
require.Equal(e.T(), redis.Message{Channel: "foo", Data: []byte("bar")}, psc.Receive())
},
)
e.retryUntilReturns(
func() {
_, err := e.directConn.Do("PUBLISH", "bar", "heyo!")
require.Nil(e.T(), err)
},
func() {
require.Equal(e.T(), redis.PMessage{Pattern: "ba*", Channel: "bar", Data: []byte("heyo!")}, psc.Receive())
},
)
}
func (e *EndToEndServerSuite) retryUntilReturns(retried func(), awaitedFn func()) {
ok := make(chan struct{})
go func() {
for {
retried()
select {
case <-time.After(time.Millisecond * 500):
case <-ok:
return
}
}
}()
awaitedFn()
ok <- struct{}{}
}

42
vendor/vendor.json поставляемый
Просмотреть файл

@ -32,6 +32,12 @@
"revision": "61ba96c4d1002f22e06acb8e34a7650611125a63",
"revisionTime": "2017-09-19T12:10:20Z"
},
{
"checksumSHA1": "mrz/kicZiUaHxkyfvC/DyQcr8Do=",
"path": "github.com/davecgh/go-spew/spew",
"revision": "ecdeabc65495df2dec95d7c4a4c3e021903035e5",
"revisionTime": "2017-10-02T20:02:53Z"
},
{
"checksumSHA1": "nZSBUg/MfVC82n2cKc713xKhQiQ=",
"path": "github.com/docker/go-redis-server",
@ -50,6 +56,12 @@
"revision": "34a326de1fea52965fa5ad664d3fc7163dd4b0a1",
"revisionTime": "2017-10-09T00:43:22Z"
},
{
"checksumSHA1": "LuFv4/jlrmFNnDb/5SCSEPAM9vU=",
"path": "github.com/pmezard/go-difflib/difflib",
"revision": "792786c7400a136282c1664665ae0a8db921c6c2",
"revisionTime": "2016-01-10T10:55:54Z"
},
{
"checksumSHA1": "zcqzAha7CiVPN+wzm+rjySACre8=",
"path": "github.com/quorzz/redis-protocol",
@ -62,6 +74,36 @@
"revision": "89742aefa4b206dcf400792f3bd35b542998eb3b",
"revisionTime": "2017-08-22T13:27:46Z"
},
{
"checksumSHA1": "K0crHygPTP42i1nLKWphSlvOQJw=",
"path": "github.com/stretchr/objx",
"revision": "1a9d0bb9f541897e62256577b352fdbc1fb4fd94",
"revisionTime": "2015-09-28T12:21:52Z"
},
{
"checksumSHA1": "mGbTYZ8dHVTiPTTJu3ktp+84pPI=",
"path": "github.com/stretchr/testify/assert",
"revision": "2aa2c176b9dab406a6970f6a55f513e8a8c8b18f",
"revisionTime": "2017-08-14T20:04:35Z"
},
{
"checksumSHA1": "hs0IfAV4wNExbAXc0aUU9V2SuFc=",
"path": "github.com/stretchr/testify/mock",
"revision": "2aa2c176b9dab406a6970f6a55f513e8a8c8b18f",
"revisionTime": "2017-08-14T20:04:35Z"
},
{
"checksumSHA1": "7vs6dSc1PPGBKyzb/SCIyeMJPLQ=",
"path": "github.com/stretchr/testify/require",
"revision": "2aa2c176b9dab406a6970f6a55f513e8a8c8b18f",
"revisionTime": "2017-08-14T20:04:35Z"
},
{
"checksumSHA1": "uefllr2OtKBGo/kQSAPbW3w6p0A=",
"path": "github.com/stretchr/testify/suite",
"revision": "2aa2c176b9dab406a6970f6a55f513e8a8c8b18f",
"revisionTime": "2017-08-14T20:04:35Z"
},
{
"checksumSHA1": "nqWNlnMmVpt628zzvyo6Yv2CX5Q=",
"path": "golang.org/x/crypto/ssh/terminal",