Add support for modified disposition and including an error with reject. (#97)

This changes the signature of `Message.Reject` to accept a `*Error` and is therefor a breaking change.
This commit is contained in:
Kale Blankenship 2018-05-14 08:23:18 -07:00 коммит произвёл GitHub
Родитель 970864c81c
Коммит 390d7eaeac
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
3 изменённых файлов: 45 добавлений и 52 удалений

Просмотреть файл

@ -1542,20 +1542,12 @@ func (r *Receiver) Close(ctx context.Context) error {
}
type messageDisposition struct {
id deliveryID
disposition disposition
id deliveryID
state interface{}
}
type deliveryID uint32
type disposition int
const (
dispositionAccept disposition = iota
dispositionReject
dispositionRelease
)
func (r *Receiver) dispositionBatcher() {
// batch operations:
// Keep track of the first and last delivery ID, incrementing as
@ -1580,16 +1572,17 @@ func (r *Receiver) dispositionBatcher() {
case msgDis := <-r.dispositions:
// not accepted or batch out of order
if msgDis.disposition != dispositionAccept || (batchStarted && last+1 != msgDis.id) {
_, isAccept := msgDis.state.(*stateAccepted)
if !isAccept || (batchStarted && last+1 != msgDis.id) {
// send the current batch, if any
if batchStarted {
lastCopy := last
r.sendDisposition(first, &lastCopy, dispositionAccept)
r.sendDisposition(first, &lastCopy, &stateAccepted{})
batchStarted = false
}
// send the current message
r.sendDisposition(msgDis.id, nil, msgDis.disposition)
r.sendDisposition(msgDis.id, nil, msgDis.state)
continue
}
@ -1607,7 +1600,7 @@ func (r *Receiver) dispositionBatcher() {
// send batch if current size == batchSize
if uint32(last-first+1) >= batchSize {
lastCopy := last
r.sendDisposition(first, &lastCopy, dispositionAccept)
r.sendDisposition(first, &lastCopy, &stateAccepted{})
batchStarted = false
if !batchTimer.Stop() {
<-batchTimer.C // batch timer must be drained if stop returns false
@ -1617,7 +1610,7 @@ func (r *Receiver) dispositionBatcher() {
// maxBatchAge elapsed, send batch
case <-batchTimer.C:
lastCopy := last
r.sendDisposition(first, &lastCopy, dispositionAccept)
r.sendDisposition(first, &lastCopy, &stateAccepted{})
batchStarted = false
batchTimer.Stop()
@ -1628,49 +1621,25 @@ func (r *Receiver) dispositionBatcher() {
}
// sendDisposition sends a disposition frame to the peer
func (r *Receiver) sendDisposition(first deliveryID, last *deliveryID, disp disposition) {
func (r *Receiver) sendDisposition(first deliveryID, last *deliveryID, state interface{}) {
fr := &performDisposition{
Role: roleReceiver,
First: uint32(first),
Last: (*uint32)(last),
Settled: r.link.receiverSettleMode == nil || *r.link.receiverSettleMode == ModeFirst,
}
switch disp {
case dispositionAccept:
fr.State = new(stateAccepted)
case dispositionReject:
fr.State = new(stateRejected)
case dispositionRelease:
fr.State = new(stateReleased)
State: state,
}
debug(1, "TX: %s", fr)
r.link.session.txFrame(fr, nil)
}
func (r *Receiver) acceptMessage(id deliveryID) {
func (r *Receiver) messageDisposition(id deliveryID, state interface{}) {
if r.batching {
r.dispositions <- messageDisposition{id: id, disposition: dispositionAccept}
r.dispositions <- messageDisposition{id: id, state: state}
return
}
r.sendDisposition(id, nil, dispositionAccept)
}
func (r *Receiver) rejectMessage(id deliveryID) {
if r.batching {
r.dispositions <- messageDisposition{id: id, disposition: dispositionReject}
return
}
r.sendDisposition(id, nil, dispositionReject)
}
func (r *Receiver) releaseMessage(id deliveryID) {
if r.batching {
r.dispositions <- messageDisposition{id: id, disposition: dispositionRelease}
return
}
r.sendDisposition(id, nil, dispositionRelease)
r.sendDisposition(id, nil, state)
}
const maxTransferFrameHeader = 66 // determined by calcMaxTransferFrameHeader

Просмотреть файл

@ -510,7 +510,7 @@ var (
&stateModified{
DeliveryFailed: true,
UndeliverableHere: true,
MessageAnnotations: map[symbol]interface{}{
MessageAnnotations: Annotations{
"more": "annotations",
},
},

Просмотреть файл

@ -1750,14 +1750,18 @@ func (m *Message) GetData() []byte {
// accepted and does not require redelivery.
func (m *Message) Accept() {
if m.shouldSendDisposition() {
m.receiver.acceptMessage(m.id)
m.receiver.messageDisposition(m.id, &stateAccepted{})
}
}
// Reject notifies the server that the message is invalid.
func (m *Message) Reject() {
//
// Rejection error is optional.
func (m *Message) Reject(e *Error) {
if m.shouldSendDisposition() {
m.receiver.rejectMessage(m.id)
m.receiver.messageDisposition(m.id, &stateRejected{
Error: e,
})
}
}
@ -1765,7 +1769,29 @@ func (m *Message) Reject() {
// may be redelivered to this or another consumer.
func (m *Message) Release() {
if m.shouldSendDisposition() {
m.receiver.releaseMessage(m.id)
m.receiver.messageDisposition(m.id, &stateReleased{})
}
}
// Modify notifies the server that the message was not acted upon
// and should be modifed.
//
// deliveryFailed indicates that the server must consider this and
// unsuccessful delivery attempt and increment the delivery count.
//
// undeliverableHere indicates that the server must not redeliver
// the message to this link.
//
// messageAnnotations is an optional annotation map to be merged
// with the existing message annotations, overwriting existing keys
// if necessary.
func (m *Message) Modify(deliveryFailed, undeliverableHere bool, messageAnnotations Annotations) {
if m.shouldSendDisposition() {
m.receiver.messageDisposition(m.id, &stateModified{
DeliveryFailed: deliveryFailed,
UndeliverableHere: undeliverableHere,
MessageAnnotations: messageAnnotations,
})
}
}
@ -1780,8 +1806,6 @@ func (m *Message) shouldSendDisposition() bool {
return !m.settled || (m.receiver.link.receiverSettleMode != nil && *m.receiver.link.receiverSettleMode == ModeSecond)
}
// TODO: add support for sending Modified disposition
func (m *Message) marshal(wr *buffer) error {
if m.Header != nil {
err := m.Header.marshal(wr)
@ -2312,7 +2336,7 @@ type stateModified struct {
// the value in this field associated with that key replaces the one in the
// existing headers; where the existing message-annotations has no such value,
// the value in this map is added.
MessageAnnotations map[symbol]interface{}
MessageAnnotations Annotations
}
func (sm *stateModified) marshal(wr *buffer) error {