Removed exit channel dependency in receive.go and moved receiveFromQueue call to app.go from acknowledge.go

This commit is contained in:
Jackie Elliott 2018-06-27 10:58:52 -07:00
Родитель 3206dbe137
Коммит b9a4f020d7
3 изменённых файлов: 21 добавлений и 33 удалений

Просмотреть файл

@ -8,6 +8,7 @@ import (
"github.com/unrolled/secure"
"github.com/Azure/buffalo-azure/sdk/eventgrid"
"github.com/Azure/spec-sla-bot/messages"
"github.com/gobuffalo/x/sessions"
"github.com/rs/cors"
)
@ -54,6 +55,8 @@ func App() *buffalo.App {
app.POST("/event/listen", EventListen)
eventgrid.RegisterSubscriber(app, "/specsla", NewSpecslaSubscriber(&eventgrid.BaseSubscriber{}))
//app.POST("/receiver/message", ReceiverMessage)
//Create AMQP Listener
messages.ReceiveFromQueue()
}
return app

Просмотреть файл

@ -24,7 +24,6 @@ func CheckAcknowledgement(event github.PullRequestEvent) {
message := fmt.Sprintf("PR id, %d, URL, %s, Assignee, %s", *event.PullRequest.ID, *event.PullRequest.URL, *event.PullRequest.Assignee.Login)
log.Print(message)
err := SendToQueue(message)
receiveFromQueue()
log.Print("SENT TO QUEUE")
if err != nil {
log.Printf("Message for event %d not delivered", *event.PullRequest.ID)

Просмотреть файл

@ -17,7 +17,7 @@ type Message struct {
Assignee string
}
func receiveFromQueue() {
func ReceiveFromQueue() {
connStr := mustGetenv("SERVICEBUS_CONNECTION_STRING")
//connStr :=
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
@ -33,53 +33,39 @@ func receiveFromQueue() {
os.Exit(1)
}
exit := make(chan struct{})
//exit := make(chan struct{})
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
listenHandle, err := q.Receive(ctx, func(ctx context.Context, message *servicebus.Message) servicebus.DispositionAction {
text := string(message.Data)
log.Print(text)
if text == "exit\n" {
//This will not be a part of my project
log.Println("Oh snap!! Someone told me to exit!")
exit <- *new(struct{})
} else {
//The message is not invalid so parse
log.Print("MADE IT TO RECEIVE")
log.Print(message.Data)
messageStruct, err := parseMessage(message.Data)
if err != nil {
log.Println(err)
//os.Exit(1)
return message.DeadLetter(err)
}
err = SendEmailToAssignee(messageStruct)
if err != nil {
log.Println(err)
os.Exit(1)
}
//The message is not invalid so parse
log.Print("MADE IT TO RECEIVE")
log.Print(message.Data)
messageStruct, err := parseMessage(message.Data)
if err != nil {
log.Println(err)
//os.Exit(1)
return message.DeadLetter(err)
}
err = SendEmailToAssignee(messageStruct)
if err != nil {
log.Println(err)
os.Exit(1)
}
return message.Complete()
})
defer listenHandle.Close(context.Background())
//Not sure if this should stay
if err != nil {
log.Println(err)
os.Exit(1)
}
log.Println("I am listening...")
select {
case <-exit:
log.Println("closing after 2 seconds")
select {
case <-time.After(2 * time.Second):
listenHandle.Close(context.Background())
return
}
}
}
func getQueueToReceive(ns *servicebus.Namespace, queueName string) (*servicebus.Queue, error) {
@ -126,5 +112,5 @@ func parseMessage(data []byte) (*Message, error) {
}
func Split(r rune) bool {
return r == ','
return r == '-' || r == ','
}