Adding the ability to cancel scheduled messges

This commit is contained in:
Martin Strobel 2018-11-02 14:37:07 -07:00
Родитель c03b0ce8d4
Коммит 3884a90a86
3 изменённых файлов: 46 добавлений и 14 удалений

Просмотреть файл

@ -7,6 +7,7 @@ const (
lockRenewalOperationName = vendorPrefix + "renew-lock"
peekMessageOperationID = vendorPrefix + "peek-message"
scheduleMessageOperationID = vendorPrefix + "schedule-message"
cancelScheduledOperationID = vendorPrefix + "cancel-scheduled-message"
)
// Field Descriptions

Просмотреть файл

@ -243,13 +243,51 @@ func (q *Queue) ScheduleAt(ctx context.Context, enqueueTime time.Time, messages
}
return retval, nil
}
return nil, newErrIncorrectType(sequenceFieldName, []interface{}{}, rawArr)
return nil, newErrIncorrectType(sequenceFieldName, []int64{}, rawArr)
}
return nil, ErrMissingField(sequenceFieldName)
}
return nil, newErrIncorrectType("value", map[string]interface{}{}, resp.Message.Value)
}
// CancelScheduled allows for removal of messages that have been handed to the Service Bus broker for later delivery,
// but have not yet ben enqueued.
func (q *Queue) CancelScheduled(ctx context.Context, seq ...int64) error {
msg := &amqp.Message{
ApplicationProperties: map[string]interface{}{
operationFieldName: cancelScheduledOperationID,
},
Value: map[string]interface{}{
"sequence-numbers": seq,
},
}
if deadline, ok := ctx.Deadline(); ok {
msg.ApplicationProperties[serverTimeoutFieldName] = uint(time.Until(deadline) / time.Millisecond)
}
err := q.ensureSender(ctx)
if err != nil {
return err
}
link, err := rpc.NewLink(q.sender.connection, q.ManagementPath())
if err != nil {
return err
}
resp, err := link.RetryableRPC(ctx, 5, 5*time.Second, msg)
if err != nil {
return err
}
if resp.Code != 200 {
return ErrAMQP(*resp)
}
return nil
}
// Peek fetches a list of Messages from the Service Bus broker without acquiring a lock or committing to a disposition.
// The messages are delivered as close to sequence order as possible.
//

Просмотреть файл

@ -104,7 +104,7 @@ func ExampleQueue_Receive() {
client.Receive(ctx, printMessage)
}
func ExampleQueue_ScheduleAt() {
func ExampleQueue_scheduleAndCancelMessages() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute+40*time.Second)
defer cancel()
@ -137,28 +137,21 @@ func ExampleQueue_ScheduleAt() {
expectedTime := time.Now().Add(waitTime)
msg := servicebus.NewMessageFromString("to the future!!")
_, err = client.ScheduleAt(ctx, expectedTime, msg)
scheduled, err := client.ScheduleAt(ctx, expectedTime, msg)
if err != nil {
fmt.Println("FATAL: ", err)
return
}
err = client.ReceiveOne(ctx,
servicebus.HandlerFunc(func(ctx context.Context, msg *servicebus.Message) servicebus.DispositionAction {
received := time.Now()
if received.Before(expectedTime.Add(buffer)) && received.After(expectedTime.Add(-buffer)) {
fmt.Println("Received when expected!")
} else {
fmt.Println("Received outside the expected window.")
}
return msg.Complete()
}))
err = client.CancelScheduled(ctx, scheduled...)
if err != nil {
fmt.Println("FATAL: ", err)
return
}
// Output: Received when expected!
fmt.Println("All Messages Scheduled and Cancelled")
// Output: All Messages Scheduled and Cancelled
}
func ExampleQueue_sessionsRoundTrip() {