azure-service-bus-go/receiver.go

146 строки
3.0 KiB
Go
Исходник Обычный вид История

package servicebus
2018-01-22 22:30:27 +03:00
import (
"context"
log "github.com/sirupsen/logrus"
"net"
"pack.ag/amqp"
"time"
)
// Receiver provides session and link handling for a receiving entity path
type Receiver struct {
client *amqp.Client
session *amqp.Session
receiver *amqp.Receiver
entityPath string
done chan struct{}
2018-01-22 22:30:27 +03:00
}
// NewReceiver creates a new Service Bus message listener given an AMQP client and an entity path
func NewReceiver(client *amqp.Client, entityPath string) (*Receiver, error) {
receiver := &Receiver{
client: client,
entityPath: entityPath,
done: make(chan struct{}),
2018-01-22 22:30:27 +03:00
}
err := receiver.newSessionAndLink()
if err != nil {
return nil, err
}
return receiver, nil
}
// Close will close the AMQP session and link of the receiver
func (r *Receiver) Close() error {
err := r.session.Close()
if err != nil {
return err
}
err = r.receiver.Close()
if err != nil {
return err
}
close(r.done)
2018-01-22 22:30:27 +03:00
return nil
}
// Recover will attempt to close the current session and link, then rebuild them
func (r *Receiver) Recover() error {
err := r.Close()
if err != nil {
return err
}
err = r.newSessionAndLink()
if err != nil {
return err
}
return nil
}
// Listen start a listener for messages sent to the entity path
func (r *Receiver) Listen(handler Handler) {
messages := make(chan *amqp.Message)
go r.listenForMessages(messages)
go r.handleMessages(messages, handler)
}
func (r *Receiver) handleMessages(messages chan *amqp.Message, handler Handler) {
for {
select {
case <-r.done:
log.Debug("done handling messages")
return
case msg := <-messages:
ctx := context.Background()
err := handler(ctx, msg)
id := interface{}("null")
if msg.Properties != nil {
id = msg.Properties.MessageID
}
if err != nil {
msg.Reject()
log.Debugf("Message rejected: id: %s", id)
} else {
// Accept message
msg.Accept()
log.Debugf("Message accepted: id: %s", id)
}
}
}
}
func (r *Receiver) listenForMessages(msgChan chan *amqp.Message) {
for {
select {
case <-r.done:
log.Debug("done listenting for messages")
close(msgChan)
return
default:
waitCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
msg, err := r.receiver.Receive(waitCtx)
cancel()
if err, ok := err.(net.Error); ok && err.Timeout() {
continue
} else if err != nil {
log.Fatalln(err)
}
if msg != nil {
id := interface{}("null")
if msg.Properties != nil {
id = msg.Properties.MessageID
2018-01-22 22:30:27 +03:00
}
log.Debugf("Message received: %s", id)
msgChan <- msg
2018-01-22 22:30:27 +03:00
}
}
}
2018-01-22 22:30:27 +03:00
}
// newSessionAndLink will replace the session and link on the receiver
func (r *Receiver) newSessionAndLink() error {
session, err := r.client.NewSession()
if err != nil {
return err
}
amqpReceiver, err := session.NewReceiver(
amqp.LinkAddress(r.entityPath),
amqp.LinkCredit(10),
amqp.LinkBatching(true))
if err != nil {
return err
}
r.session = session
r.receiver = amqpReceiver
return nil
}