зеркало из https://github.com/microsoft/redplex.git
95 строки
2.1 KiB
Go
95 строки
2.1 KiB
Go
package redplex
|
|
|
|
import (
|
|
"net"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/garyburd/redigo/redis"
|
|
"github.com/stretchr/testify/require"
|
|
"github.com/stretchr/testify/suite"
|
|
)
|
|
|
|
const redisAddress = "127.0.0.1:6379"
|
|
|
|
type EndToEndServerSuite struct {
|
|
suite.Suite
|
|
server *Server
|
|
redplexConn redis.Conn
|
|
directConn redis.Conn
|
|
}
|
|
|
|
func TestEndToEndServerSuite(t *testing.T) {
|
|
suite.Run(t, new(EndToEndServerSuite))
|
|
}
|
|
|
|
func (e *EndToEndServerSuite) SetupSuite() {
|
|
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
|
require.Nil(e.T(), err)
|
|
|
|
e.server = NewServer(listener, NewPubsub(
|
|
NewDirectDialer("tcp", redisAddress, "", false, 0),
|
|
time.Second*5,
|
|
))
|
|
go e.server.Listen()
|
|
|
|
directConn, err := redis.Dial("tcp", redisAddress)
|
|
require.Nil(e.T(), err)
|
|
e.directConn = directConn
|
|
|
|
redplexConn, err := redis.Dial("tcp", listener.Addr().String())
|
|
require.Nil(e.T(), err)
|
|
e.redplexConn = redplexConn
|
|
}
|
|
|
|
func (e *EndToEndServerSuite) TearDownSuite() {
|
|
e.server.Close()
|
|
e.redplexConn.Close()
|
|
e.directConn.Close()
|
|
}
|
|
|
|
func (e *EndToEndServerSuite) TestSubscribesAndGetsMessages() {
|
|
psc := redis.PubSubConn{Conn: e.redplexConn}
|
|
require.Nil(e.T(), psc.Subscribe("foo"))
|
|
require.Equal(e.T(), redis.Subscription{Kind: "subscribe", Channel: "foo", Count: 1}, psc.Receive())
|
|
require.Nil(e.T(), psc.PSubscribe("ba*"))
|
|
require.Equal(e.T(), redis.Subscription{Kind: "psubscribe", Channel: "ba*", Count: 1}, psc.Receive())
|
|
|
|
e.retryUntilReturns(
|
|
func() {
|
|
_, err := e.directConn.Do("PUBLISH", "foo", "bar")
|
|
require.Nil(e.T(), err)
|
|
},
|
|
func() {
|
|
require.Equal(e.T(), redis.Message{Channel: "foo", Data: []byte("bar")}, psc.Receive())
|
|
},
|
|
)
|
|
|
|
e.retryUntilReturns(
|
|
func() {
|
|
_, err := e.directConn.Do("PUBLISH", "bar", "heyo!")
|
|
require.Nil(e.T(), err)
|
|
},
|
|
func() {
|
|
require.Equal(e.T(), redis.PMessage{Pattern: "ba*", Channel: "bar", Data: []byte("heyo!")}, psc.Receive())
|
|
},
|
|
)
|
|
}
|
|
|
|
func (e *EndToEndServerSuite) retryUntilReturns(retried func(), awaitedFn func()) {
|
|
ok := make(chan struct{})
|
|
go func() {
|
|
for {
|
|
retried()
|
|
select {
|
|
case <-time.After(time.Millisecond * 500):
|
|
case <-ok:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
awaitedFn()
|
|
ok <- struct{}{}
|
|
}
|