зеркало из https://github.com/Azure/go-amqp.git
prevent hang if unexpected frame received
This commit is contained in:
Родитель
70eb87968e
Коммит
c9844db300
6
Makefile
6
Makefile
|
@ -2,9 +2,13 @@ PACKAGE := pack.ag/amqp
|
|||
|
||||
.PHONY: fuzz
|
||||
fuzz:
|
||||
go-fuzz-build -o fuzz.zip $(PACKAGE)
|
||||
go-fuzz-build -o ./go-fuzz/fuzz.zip $(PACKAGE)
|
||||
go-fuzz -bin ./go-fuzz/fuzz.zip -workdir go-fuzz
|
||||
|
||||
.PHONY: fuzzclean
|
||||
fuzzclean:
|
||||
rm -f ./go-fuzz/{crashers,suppressions}/*
|
||||
|
||||
.PHONY: test
|
||||
test:
|
||||
go test -v -race ./...
|
||||
|
|
73
conn.go
73
conn.go
|
@ -4,7 +4,6 @@ import (
|
|||
"bytes"
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/url"
|
||||
"time"
|
||||
|
@ -201,49 +200,6 @@ func (c *Conn) NewSession() (*Session, error) {
|
|||
return s, nil
|
||||
}
|
||||
|
||||
func parseFrame(payload []byte) (preformative, error) {
|
||||
pType, err := preformativeType(payload)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var t preformative
|
||||
switch pType {
|
||||
case preformativeOpen:
|
||||
t = new(performativeOpen)
|
||||
case preformativeBegin:
|
||||
t = new(performativeBegin)
|
||||
case preformativeAttach:
|
||||
t = new(performativeAttach)
|
||||
case preformativeFlow:
|
||||
t = new(flow)
|
||||
case preformativeTransfer:
|
||||
t = new(performativeTransfer)
|
||||
case preformativeDisposition:
|
||||
t = new(performativeDisposition)
|
||||
case preformativeDetach:
|
||||
t = new(performativeDetach)
|
||||
case preformativeEnd:
|
||||
t = new(performativeEnd)
|
||||
case preformativeClose:
|
||||
t = new(performativeClose)
|
||||
case typeSASLMechanism:
|
||||
t = new(saslMechanisms)
|
||||
case typeSASLOutcome:
|
||||
t = new(saslOutcome)
|
||||
default:
|
||||
return nil, errors.Errorf("unknown preformative type %0x", pType)
|
||||
}
|
||||
|
||||
err = unmarshal(bytes.NewReader(payload), t)
|
||||
return t, err
|
||||
}
|
||||
|
||||
type frame struct {
|
||||
channel uint16
|
||||
preformative preformative
|
||||
}
|
||||
|
||||
var keepaliveFrame = []byte{0x00, 0x00, 0x00, 0x08, 0x02, 0x00, 0x00, 0x00}
|
||||
|
||||
func (c *Conn) startMux() {
|
||||
|
@ -325,46 +281,30 @@ outer:
|
|||
negotiating = false
|
||||
}
|
||||
|
||||
fmt.Printf("got: %#v\n", p)
|
||||
select {
|
||||
case <-c.done:
|
||||
return
|
||||
case c.rxProto <- p:
|
||||
}
|
||||
fmt.Printf("sent: %#v\n", p)
|
||||
continue
|
||||
}
|
||||
|
||||
frameHeader, err := parseFrameHeader(buf.Bytes())
|
||||
frameHeader, err := parseFrameHeader(buf)
|
||||
if err != nil {
|
||||
c.readErr <- err
|
||||
return
|
||||
}
|
||||
|
||||
if buf.Len() < int(frameHeader.size) {
|
||||
if buf.Len() < int(frameHeader.Size)-8 {
|
||||
continue outer
|
||||
}
|
||||
|
||||
if frameHeader.size < 8 {
|
||||
c.readErr <- errors.Errorf("invalid header size: %#v", frameHeader)
|
||||
return
|
||||
}
|
||||
|
||||
payload := make([]byte, frameHeader.size)
|
||||
_, err = io.ReadFull(buf, payload)
|
||||
p, err := parseFrame(buf)
|
||||
if err != nil {
|
||||
c.readErr <- err
|
||||
return
|
||||
}
|
||||
|
||||
p, err := parseFrame(payload[8:])
|
||||
if err != nil {
|
||||
c.readErr <- err
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Printf("got: %#v\n", p)
|
||||
|
||||
if o, ok := p.(*performativeOpen); ok && o.MaxFrameSize < c.maxFrameSize {
|
||||
c.maxFrameSize = o.MaxFrameSize
|
||||
}
|
||||
|
@ -372,9 +312,8 @@ outer:
|
|||
select {
|
||||
case <-c.done:
|
||||
return
|
||||
case c.rxFrame <- frame{channel: frameHeader.channel, preformative: p}:
|
||||
case c.rxFrame <- frame{channel: frameHeader.Channel, preformative: p}:
|
||||
}
|
||||
fmt.Printf("sent: %#v\n", p)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -415,6 +354,8 @@ func (c *Conn) exchangeProtoHeader(protoID uint8) stateFunc {
|
|||
case p = <-c.rxProto:
|
||||
case c.err = <-c.readErr:
|
||||
return nil
|
||||
case fr := <-c.rxFrame:
|
||||
c.err = errors.Errorf("unexpected frame %#v", fr)
|
||||
case <-time.After(1 * time.Second):
|
||||
c.err = ErrTimeout
|
||||
return nil
|
||||
|
@ -580,6 +521,8 @@ func (c *Conn) readFrame() (frame, error) {
|
|||
return fr, nil
|
||||
case err := <-c.readErr:
|
||||
return fr, err
|
||||
case p := <-c.rxProto:
|
||||
return fr, errors.Errorf("unexpected protocol header %#v", p)
|
||||
case <-time.After(1 * time.Second):
|
||||
return fr, ErrTimeout
|
||||
}
|
||||
|
|
72
framing.go
72
framing.go
|
@ -4,23 +4,25 @@ import (
|
|||
"encoding/binary"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type frameHeader struct {
|
||||
// size: an unsigned 32-bit integer that MUST contain the total frame size of the frame header,
|
||||
// extended header, and frame body. The frame is malformed if the size is less than the size of
|
||||
// the frame header (8 bytes).
|
||||
size uint32
|
||||
Size uint32
|
||||
// doff: gives the position of the body within the frame. The value of the data offset is an
|
||||
// unsigned, 8-bit integer specifying a count of 4-byte words. Due to the mandatory 8-byte
|
||||
// frame header, the frame is malformed if the value is less than 2.
|
||||
dataOffset uint8
|
||||
frameType uint8
|
||||
channel uint16
|
||||
DataOffset uint8
|
||||
FrameType uint8
|
||||
Channel uint16
|
||||
}
|
||||
|
||||
func (fh frameHeader) dataOffsetBytes() int {
|
||||
return int(fh.dataOffset) * 4
|
||||
return int(fh.DataOffset) * 4
|
||||
}
|
||||
|
||||
// Frame Types
|
||||
|
@ -29,21 +31,10 @@ const (
|
|||
frameTypeSASL = 0x1
|
||||
)
|
||||
|
||||
func parseFrameHeader(buf []byte) (frameHeader, error) {
|
||||
func parseFrameHeader(r io.Reader) (frameHeader, error) {
|
||||
var fh frameHeader
|
||||
// err := binary.Read(r, binary.BigEndian, &fh)
|
||||
// return fh, err
|
||||
|
||||
if len(buf) < 8 {
|
||||
return fh, fmt.Errorf("frame size %d, must be at least 8 bytes", len(buf))
|
||||
}
|
||||
|
||||
fh.size = binary.BigEndian.Uint32(buf)
|
||||
fh.dataOffset = buf[4]
|
||||
fh.frameType = buf[5]
|
||||
fh.channel = binary.BigEndian.Uint16(buf[6:])
|
||||
|
||||
return fh, nil
|
||||
err := binary.Read(r, binary.BigEndian, &fh)
|
||||
return fh, err
|
||||
}
|
||||
|
||||
type proto struct {
|
||||
|
@ -69,6 +60,49 @@ func parseProto(r io.Reader) (proto, error) {
|
|||
return p, nil
|
||||
}
|
||||
|
||||
func parseFrame(r byteReader) (preformative, error) {
|
||||
pType, err := peekPreformativeType(r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var t preformative
|
||||
switch pType {
|
||||
case preformativeOpen:
|
||||
t = new(performativeOpen)
|
||||
case preformativeBegin:
|
||||
t = new(performativeBegin)
|
||||
case preformativeAttach:
|
||||
t = new(performativeAttach)
|
||||
case preformativeFlow:
|
||||
t = new(flow)
|
||||
case preformativeTransfer:
|
||||
t = new(performativeTransfer)
|
||||
case preformativeDisposition:
|
||||
t = new(performativeDisposition)
|
||||
case preformativeDetach:
|
||||
t = new(performativeDetach)
|
||||
case preformativeEnd:
|
||||
t = new(performativeEnd)
|
||||
case preformativeClose:
|
||||
t = new(performativeClose)
|
||||
case typeSASLMechanism:
|
||||
t = new(saslMechanisms)
|
||||
case typeSASLOutcome:
|
||||
t = new(saslOutcome)
|
||||
default:
|
||||
return nil, errors.Errorf("unknown preformative type %0x", pType)
|
||||
}
|
||||
|
||||
err = unmarshal(r, t)
|
||||
return t, err
|
||||
}
|
||||
|
||||
type frame struct {
|
||||
channel uint16
|
||||
preformative preformative
|
||||
}
|
||||
|
||||
/*
|
||||
header (8 bytes)
|
||||
0-3: SIZE (total size, at least 8 bytes for header, uint32)
|
||||
|
|
28
fuzz_test.go
28
fuzz_test.go
|
@ -125,6 +125,34 @@ func TestFuzzCrashers(t *testing.T) {
|
|||
"1\x01\xe0/\x04\xb3\x00\x00\x00\aMSSBCB\xff\x00\x00\x00" +
|
||||
"\x05PLAIN\xfa\x00\x00\tAN\xcfNYMOUS\x00" +
|
||||
"\x00\x00\bE\xef\xbf\x02\x00\fU ",
|
||||
14: "AMQP\x03\x01\x00\x00\x00\x00\x00?\x02\x01\x00\x00\x00S@\xc0" +
|
||||
"2\x01\xe0/\x04\xb3\x00\x00\x00\aMSSBCBS\x00\x00\x00" +
|
||||
"\x05PLAIN\x00\x00\x00\tANONYMOUR\xff" +
|
||||
"\xff\xed\bEXTERNAL\x00\x00\x00\x1a\x02\x01\x00\x00\x00" +
|
||||
"SD\xc0\r\x02P\x00\xa0\bWelcome!AMQ" +
|
||||
"P\x00\x01\x00\x00\x00\x00\x00G\x02\x00\x00\x00\x00S\x10\xc0:\n\xa1" +
|
||||
"$83a29bedd884468ba2e" +
|
||||
"37f3017eeab1d_G29@p\x00" +
|
||||
"\x00\x02\x00`\x00\x01p\x00\x03\xa9\x80@@@@@\x00\x00\x00\x1f" +
|
||||
"\x02\x00\x00\x00\x00S\x11\xc0\x12\b`\x00\x00R\x01p\x00\x00\x13\x88" +
|
||||
"R\x01R\xff@@@\x00\x00\x00d\x02\x00\x00\x00\x00S\x12\xc0W" +
|
||||
"\x0e\xa1(oJnNPGsiuzytMOJPa" +
|
||||
"twtPilfsfykSBGplhxtx" +
|
||||
"VSGCB@P\x01\x00S(\xc0\x12\v\xa1\x05/tes" +
|
||||
"t@@@@@@@@@@@C\x80\x00\x00\x00\x00\x00\x04" +
|
||||
"\x10\x00@@@\x00\x00\x01y\x02\x00\x00\x00\x00S\x14\xc0\x1d\vC" +
|
||||
"C\xa0\x10F>\xc6\\\x06&\xfaE\x9c\x03\xa8\x8e\xe7\x83\xe3;C" +
|
||||
"@B@@@@A\x00Sp\xc0\n\x05@@pH\x19\b\x00" +
|
||||
"@C\x00Sr\xc1\\\x06\xa3\x13x-opt-enqu" +
|
||||
"eued-time\x83\x00\x00\x01[\x9c_)ѣ\x15" +
|
||||
"x-opt-sequence-numbe" +
|
||||
"r\x81\x00\x00\x00\x00\x00\x00\x03x\xa3\x12x-opt-lo" +
|
||||
"cked-until\x83\x00\x00\x01[\x9c_\x9f\x11\x00" +
|
||||
"Ss\xc0H\r\xa1$5e84053f-81c9" +
|
||||
"-49fc-ae42-ff0ab353d" +
|
||||
"998@@\xa1\x14Service Bus E" +
|
||||
"xplorer@@@@@@@@@\x00St\xc1" +
|
||||
"8\x04\xa1\vMachineNam",
|
||||
}
|
||||
|
||||
for i, tt := range tests {
|
||||
|
|
Двоичный файл не отображается.
Двоичный файл не отображается.
Двоичный файл не отображается.
Двоичный файл не отображается.
Двоичный файл не отображается.
Двоичный файл не отображается.
Двоичный файл не отображается.
Двоичный файл не отображается.
Двоичный файл не отображается.
21
message.go
21
message.go
|
@ -119,16 +119,6 @@ func (m *Message) Release() {
|
|||
m.sendDisposition(&StateReleased{})
|
||||
}
|
||||
|
||||
// func (h *Header) marshal() ([]byte, error) {
|
||||
// return marshalComposite(TypeMessageHeader, []field{
|
||||
// {value: h.Durable, omit: !h.Durable},
|
||||
// {value: h.Priority, omit: h.Priority == 4},
|
||||
// {value: h.TTL, omit: h.TTL.Duration == 0},
|
||||
// {value: h.FirstAcquirer, omit: !h.FirstAcquirer},
|
||||
// {value: h.DeliveryCount, omit: h.DeliveryCount == 0},
|
||||
// }...)
|
||||
// }
|
||||
|
||||
func peekMessageType(buf []byte) (uint8, error) {
|
||||
if len(buf) < 3 {
|
||||
return 0, errors.New("invalid message")
|
||||
|
@ -138,7 +128,7 @@ func peekMessageType(buf []byte) (uint8, error) {
|
|||
return 0, errors.Errorf("invalid composite header %0x", buf[0])
|
||||
}
|
||||
|
||||
v, err := readInt(bytes.NewReader(buf[1:]))
|
||||
v, err := readInt(bytes.NewBuffer(buf[1:]))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
@ -156,15 +146,8 @@ func consumeBytes(r io.ByteReader, n int) error {
|
|||
}
|
||||
|
||||
func (m *Message) unmarshal(r byteReader) error {
|
||||
byter, ok := r.(interface {
|
||||
Bytes() []byte
|
||||
})
|
||||
if !ok {
|
||||
return errors.New("unmarshal message requires Bytes() method")
|
||||
}
|
||||
|
||||
for {
|
||||
buf := byter.Bytes()
|
||||
buf := r.Bytes()
|
||||
if len(buf) == 0 {
|
||||
break
|
||||
}
|
||||
|
|
|
@ -28,7 +28,8 @@ const (
|
|||
typeError = 0x1d
|
||||
)
|
||||
|
||||
func preformativeType(payload []byte) (uint8, error) {
|
||||
func peekPreformativeType(r byteReader) (uint8, error) {
|
||||
payload := r.Bytes()
|
||||
if len(payload) == 0 {
|
||||
return preformativeEmpty, nil
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ type byteReader interface {
|
|||
io.Reader
|
||||
io.ByteReader
|
||||
UnreadByte() error
|
||||
Bytes() []byte
|
||||
}
|
||||
|
||||
type byteWriter interface {
|
||||
|
|
Загрузка…
Ссылка в новой задаче