Consolidated network writes to connWriter. Started removing uses of bufPool.

This commit is contained in:
Kale Blankenship 2017-05-03 23:12:18 -07:00
Родитель 758bf6c37d
Коммит 8b03c127af
5 изменённых файлов: 173 добавлений и 143 удалений

Просмотреть файл

@ -60,46 +60,9 @@ func Dial(addr string, opts ...ConnOption) (*Client, error) {
// New establishes an AMQP client connection on a pre-established
// net.Conn.
func New(netConn net.Conn, opts ...ConnOption) (*Client, error) {
c := &conn{
net: netConn,
maxFrameSize: defaultMaxFrameSize,
peerMaxFrameSize: defaultMaxFrameSize,
channelMax: defaultChannelMax,
idleTimeout: defaultIdleTimeout,
done: make(chan struct{}),
readErr: make(chan error, 1), // buffered to ensure connReader doesn't leak
rxProto: make(chan protoHeader),
rxFrame: make(chan frame),
newSession: make(chan *Session),
delSession: make(chan *Session),
}
// apply options
for _, opt := range opts {
if err := opt(c); err != nil {
return nil, err
}
}
// start connReader
go c.connReader()
// run connection establishment state machine
for state := c.negotiateProto; state != nil; {
state = state()
}
// check if err occurred
if c.err != nil {
c.close()
return nil, c.err
}
// start multiplexor
go c.mux()
return &Client{conn: c}, nil
func New(conn net.Conn, opts ...ConnOption) (*Client, error) {
c, err := newConn(conn, opts...)
return &Client{conn: c}, err
}
// Close disconnects the connection.
@ -118,14 +81,10 @@ func (c *Client) NewSession() (*Session, error) {
}
// send Begin to server
err := s.txFrame(&performBegin{
s.txFrame(&performBegin{
NextOutgoingID: 0,
IncomingWindow: 1,
})
if err != nil {
s.Close()
return nil, err
}
// wait for response
var fr frame
@ -184,8 +143,8 @@ func (s *Session) Close() error {
}
}
func (s *Session) txFrame(p frameBody) error {
return s.conn.txFrame(frame{
func (s *Session) txFrame(p frameBody) {
s.conn.wantWriteFrame(frame{
typ: frameTypeAMQP,
channel: s.channel,
body: p,
@ -251,10 +210,7 @@ func (s *Session) NewReceiver(opts ...LinkOption) (*Receiver, error) {
l.senderDeliveryCount = resp.InitialDeliveryCount
return &Receiver{
link: l,
buf: bufPool.New().(*bytes.Buffer),
}, nil
return &Receiver{link: l}, nil
}
func (s *Session) mux() {
@ -389,16 +345,15 @@ func LinkCredit(credit uint32) LinkOption { // TODO: make receiver specific?
// Receiver receives messages on a single AMQP link.
type Receiver struct {
link *link
buf *bytes.Buffer
buf bytes.Buffer
}
// sendFlow transmits a flow frame with enough credits to bring the sender's
// link credits up to l.link.linkCredit.
func (r *Receiver) sendFlow() error {
func (r *Receiver) sendFlow() {
newLinkCredit := r.link.linkCredit - (r.link.linkCredit - r.link.creditUsed)
r.link.senderDeliveryCount += r.link.creditUsed
err := r.link.session.txFrame(&performFlow{
r.link.session.txFrame(&performFlow{
IncomingWindow: 2147483647,
NextOutgoingID: 0,
OutgoingWindow: 0,
@ -407,7 +362,6 @@ func (r *Receiver) sendFlow() error {
LinkCredit: &newLinkCredit,
})
r.link.creditUsed = 0
return err
}
// Receive returns the next message from the sender.
@ -422,10 +376,7 @@ func (r *Receiver) Receive(ctx context.Context) (*Message, error) {
outer:
for {
if r.link.creditUsed > r.link.linkCredit/2 {
err := r.sendFlow()
if err != nil {
return nil, err
}
r.sendFlow()
}
var fr frameBody
@ -461,14 +412,12 @@ outer:
}
}
_, err := unmarshal(r.buf, msg)
_, err := unmarshal(&r.buf, msg)
return msg, err
}
// Close closes the Receiver and AMQP link.
func (r *Receiver) Close() error {
r.link.close()
bufPool.Put(r.buf)
r.buf = nil
return r.link.err
}

215
conn.go
Просмотреть файл

@ -140,11 +140,62 @@ type conn struct {
done chan struct{} // indicates the connection is done
// mux
readErr chan error // connReader notifications of an error
rxProto chan protoHeader // protoHeaders received by connReader
rxFrame chan frame // AMQP frames received by connReader
newSession chan *Session // new Sessions are requested from mux by reading off this channel
delSession chan *Session // session completion is indicated to mux by sending the Session on this channel
newSession chan *Session // new Sessions are requested from mux by reading off this channel
delSession chan *Session // session completion is indicated to mux by sending the Session on this channel
connErr chan error // connReader/Writer notifications of an error
// connReader
rxProto chan protoHeader // protoHeaders received by connReader
rxFrame chan frame // AMQP frames received by connReader
// connWriter
txProto chan protoID // protoHeaders to be sent by connWriter
txFrame chan frame // AMQP frames to be sent by connWriter
}
func newConn(netConn net.Conn, opts ...ConnOption) (*conn, error) {
c := &conn{
net: netConn,
maxFrameSize: defaultMaxFrameSize,
peerMaxFrameSize: defaultMaxFrameSize,
channelMax: defaultChannelMax,
idleTimeout: defaultIdleTimeout,
done: make(chan struct{}),
connErr: make(chan error, 2), // buffered to ensure connReader/Writer won't leak
rxProto: make(chan protoHeader),
rxFrame: make(chan frame),
newSession: make(chan *Session),
delSession: make(chan *Session),
txProto: make(chan protoID),
txFrame: make(chan frame),
}
// apply options
for _, opt := range opts {
if err := opt(c); err != nil {
return nil, err
}
}
// start reader/writer
go c.connReader()
go c.connWriter()
// run connection establishment state machine
for state := c.negotiateProto; state != nil; {
state = state()
}
// check if err occurred
if c.err != nil {
c.close()
return nil, c.err
}
// start multiplexor
go c.mux()
return c, c.err
}
func (c *conn) close() error {
@ -167,9 +218,6 @@ func (c *conn) closeDone() {
c.doneOnce.Do(func() { close(c.done) })
}
// keepaliveFrame is an AMQP frame with no body, used for keepalives
var keepaliveFrame = []byte{0x00, 0x00, 0x00, 0x08, 0x02, 0x00, 0x00, 0x00}
// mux is start in it's own goroutine after initial connection establishment.
// It handles muxing of sessions, keepalives, and connection errors.
func (c *conn) mux() {
@ -179,17 +227,6 @@ func (c *conn) mux() {
// map channel to sessions
sessions := make(map[uint16]*Session)
// if conn.peerIdleTimeout is 0, keepalive will be nil and
// no keepalives will be sent
var keepalive <-chan time.Time
// per spec, keepalives should be sent every 0.5 * idle timeout
if kaInterval := c.peerIdleTimeout / 2; kaInterval > 0 {
ticker := time.NewTicker(kaInterval)
defer ticker.Stop()
keepalive = ticker.C
}
// we hold the errMu lock until error or done
c.errMu.Lock()
defer c.errMu.Unlock()
@ -203,7 +240,7 @@ func (c *conn) mux() {
select {
// error from connReader
case c.err = <-c.readErr:
case c.err = <-c.connErr:
// new frame from connReader
case fr := <-c.rxFrame:
@ -232,11 +269,6 @@ func (c *conn) mux() {
case s := <-c.delSession:
delete(sessions, s.channel)
// keepalive timer
case <-keepalive:
// TODO: reset timer on non-keepalive transmit
_, c.err = c.net.Write(keepaliveFrame)
// connection is complete
case <-c.done:
return
@ -264,9 +296,7 @@ func (f *frameReader) Read(p []byte) (int, error) {
// connReader reads from the net.Conn, decodes frames, and passes them
// up via the conn.rxFrame and conn.rxProto channels.
func (c *conn) connReader() {
buf := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(buf)
buf.Reset()
buf := new(bytes.Buffer)
var (
negotiating = true // true during conn establishment, we should check for protoHeaders
@ -295,7 +325,7 @@ func (c *conn) connReader() {
continue
}
c.readErr <- err
c.connErr <- err
return
}
}
@ -309,7 +339,7 @@ func (c *conn) connReader() {
if negotiating && bytes.Equal(buf.Bytes()[:4], []byte{'A', 'M', 'Q', 'P'}) {
p, err := parseProtoHeader(buf)
if err != nil {
c.readErr <- err
c.connErr <- err
return
}
@ -335,7 +365,7 @@ func (c *conn) connReader() {
var err error
currentHeader, err = parseFrameHeader(buf)
if err != nil {
c.readErr <- err
c.connErr <- err
return
}
frameInProgress = true
@ -343,7 +373,7 @@ func (c *conn) connReader() {
// check size is reasonable
if currentHeader.Size > math.MaxInt32 { // make max size configurable
c.readErr <- errorNew("payload too large")
c.connErr <- errorNew("payload too large")
return
}
@ -364,7 +394,7 @@ func (c *conn) connReader() {
payload := bytes.NewBuffer(buf.Next(bodySize))
parsedBody, err := parseFrameBody(payload)
if err != nil {
c.readErr <- err
c.connErr <- err
return
}
@ -377,6 +407,78 @@ func (c *conn) connReader() {
}
}
func (c *conn) connWriter() {
// if conn.peerIdleTimeout is 0, keepalive will be nil and
// no keepalives will be sent
var keepalive <-chan time.Time
// per spec, keepalives should be sent every 0.5 * idle timeout
if kaInterval := c.peerIdleTimeout / 2; kaInterval > 0 {
ticker := time.NewTicker(kaInterval)
defer ticker.Stop()
keepalive = ticker.C
}
buf := new(bytes.Buffer)
var err error
for {
if err != nil {
c.connErr <- err
return
}
select {
// frame write request
case fr := <-c.txFrame:
if c.connectTimeout != 0 {
c.net.SetWriteDeadline(time.Now().Add(c.connectTimeout))
}
buf.Reset()
err = writeFrame(buf, fr)
if err != nil {
continue
}
if uint64(buf.Len()) > uint64(c.peerMaxFrameSize) {
err = errorErrorf("frame larger than peer ")
continue
}
_, err = c.net.Write(buf.Bytes())
case pID := <-c.txProto:
if c.connectTimeout != 0 {
c.net.SetWriteDeadline(time.Now().Add(c.connectTimeout))
}
_, err = c.net.Write([]byte{'A', 'M', 'Q', 'P', byte(pID), 1, 0, 0})
// keepalive timer
case <-keepalive:
// TODO: reset timer on non-keepalive transmit
_, err = c.net.Write(keepaliveFrame)
case <-c.done:
}
}
}
// keepaliveFrame is an AMQP frame with no body, used for keepalives
var keepaliveFrame = []byte{0x00, 0x00, 0x00, 0x08, 0x02, 0x00, 0x00, 0x00}
func (c *conn) wantWriteFrame(fr frame) {
select {
case c.txFrame <- fr:
case <-c.done:
}
}
func (c *conn) wantWriteProtoHeader(p protoID) {
select {
case c.txProto <- p:
case <-c.done:
}
}
// stateFunc is a state in a state machine.
//
// The state is advanced by returning the next state.
@ -396,25 +498,20 @@ func (c *conn) negotiateProto() stateFunc {
}
}
type protoID uint8
// protocol IDs received in protoHeaders
const (
protoAMQP = 0x0
protoTLS = 0x2
protoSASL = 0x3
protoAMQP protoID = 0x0
protoTLS protoID = 0x2
protoSASL protoID = 0x3
)
// exchangeProtoHeader performs the round trip exchange of protocol
// headers, validation, and returns the protoID specific next state.
func (c *conn) exchangeProtoHeader(protoID uint8) stateFunc {
func (c *conn) exchangeProtoHeader(pID protoID) stateFunc {
// write the proto header
if c.connectTimeout != 0 {
c.net.SetWriteDeadline(time.Now().Add(c.connectTimeout))
}
_, c.err = c.net.Write([]byte{'A', 'M', 'Q', 'P', protoID, 1, 0, 0})
if c.err != nil {
c.err = errorWrapf(c.err, "writing to network")
return nil
}
c.wantWriteProtoHeader(pID)
// read response header
var deadline <-chan time.Time
@ -424,7 +521,7 @@ func (c *conn) exchangeProtoHeader(protoID uint8) stateFunc {
var p protoHeader
select {
case p = <-c.rxProto:
case c.err = <-c.readErr:
case c.err = <-c.connErr:
return nil
case fr := <-c.rxFrame:
c.err = errorErrorf("unexpected frame %#v", fr)
@ -434,13 +531,13 @@ func (c *conn) exchangeProtoHeader(protoID uint8) stateFunc {
return nil
}
if protoID != p.ProtoID {
c.err = errorErrorf("unexpected protocol header %#00x, expected %#00x", p.ProtoID, protoID)
if pID != p.ProtoID {
c.err = errorErrorf("unexpected protocol header %#00x, expected %#00x", p.ProtoID, pID)
return nil
}
// go to the proto specific state
switch protoID {
switch pID {
case protoAMQP:
return c.openAMQP
case protoTLS:
@ -491,20 +588,10 @@ func (c *conn) startTLS() stateFunc {
return c.negotiateProto
}
// txFrame encodes and transmits a frame on the connection
func (c *conn) txFrame(fr frame) error {
// BUG: This should respect c.peerMaxFrameSize. Should not affect current functionality;
// only transfer frames should be larger than min-max frame size (512).
if c.connectTimeout != 0 {
c.net.SetWriteDeadline(time.Now().Add(c.connectTimeout))
}
return writeFrame(c.net, fr) // TODO: buffer?
}
// openAMQP round trips the AMQP open performative
func (c *conn) openAMQP() stateFunc {
// send open frame
c.err = c.txFrame(frame{
c.wantWriteFrame(frame{
typ: frameTypeAMQP,
body: &performOpen{
ContainerID: randString(),
@ -515,9 +602,6 @@ func (c *conn) openAMQP() stateFunc {
},
channel: 0,
})
if c.err != nil {
return nil
}
// get the response
fr, err := c.readFrame()
@ -610,16 +694,15 @@ func (c *conn) readFrame() (frame, error) {
if c.connectTimeout != 0 {
deadline = time.After(c.connectTimeout)
}
var fr frame
select {
case fr = <-c.rxFrame:
return fr, nil
case err := <-c.readErr:
case err := <-c.connErr:
return fr, err
case p := <-c.rxProto:
return fr, errorErrorf("unexpected protocol header %#v", p)
// fail if we don't get a response after 1 second
case <-deadline:
return fr, ErrTimeout // TODO: move to connReader
}

Просмотреть файл

@ -27,8 +27,8 @@ var bufPool = sync.Pool{
},
}
// writeFrame encodes and writes fr to wr.
func writeFrame(wr io.Writer, fr frame) error {
// writesFrame encodes fr into buf.
func writeFrame(buf *bytes.Buffer, fr frame) error {
header := frameHeader{
Size: 0, // overwrite later
DataOffset: 2, // see frameHeader.DataOffset comment
@ -36,11 +36,6 @@ func writeFrame(wr io.Writer, fr frame) error {
Channel: fr.channel,
}
// get a buffer
buf := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(buf)
buf.Reset()
// write header
err := binary.Write(buf, binary.BigEndian, header)
if err != nil {
@ -63,10 +58,7 @@ func writeFrame(wr io.Writer, fr frame) error {
// write correct size
binary.BigEndian.PutUint32(bufBytes, uint32(len(bufBytes)))
// frame needs be written to network as a single chunk
_, err = wr.Write(bufBytes) // TODO: have a connWriter, like connReader
return err
return nil
}
type marshaler interface {
@ -150,6 +142,12 @@ type marshalField struct {
// omit set to true will be encoded as null or omitted altogether if there are
// no non-null fields after them.
func marshalComposite(wr writer, code amqpType, fields ...marshalField) error {
if len(fields) == 0 {
// write header only
_, err := wr.Write([]byte{0x0, byte(typeCodeSmallUlong), byte(code), byte(typeCodeList0)})
return err
}
var (
rawFields = make([][]byte, len(fields)) // sized to the total number of fields
@ -207,7 +205,7 @@ func writeSymbolArray(w writer, symbols []Symbol) error {
}
}
buf := bufPool.New().(*bytes.Buffer)
buf := bufPool.Get().(*bytes.Buffer)
defer bufPool.Put(buf)
var elems [][]byte

Просмотреть файл

@ -36,7 +36,7 @@ func ConnSASLPlain(username, password string) ConnOption {
// add the handler the the map
c.saslHandlers[saslMechanismPLAIN] = func() stateFunc {
// send saslInit with PLAIN payload
c.txFrame(frame{
c.wantWriteFrame(frame{
typ: frameTypeSASL,
body: &saslInit{
Mechanism: "PLAIN",

Просмотреть файл

@ -140,7 +140,7 @@ const (
// protoHeader in a structure appropriate for use with binary.Read()
type protoHeader struct {
Proto [4]byte
ProtoID uint8
ProtoID protoID
Major uint8
Minor uint8
Revision uint8