This commit is contained in:
David Justice 2019-01-03 09:31:32 -08:00
Родитель 66358a9819
Коммит 4276ebc5b7
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 2B44C6BF9F416319
1 изменённых файлов: 10 добавлений и 0 удалений

Просмотреть файл

@ -14,12 +14,14 @@ type StepSessionHandler struct {
messageSession *servicebus.MessageSession messageSession *servicebus.MessageSession
} }
// Start is called when a new session is started
func (ssh *StepSessionHandler) Start(ms *servicebus.MessageSession) error { func (ssh *StepSessionHandler) Start(ms *servicebus.MessageSession) error {
ssh.messageSession = ms ssh.messageSession = ms
fmt.Println("Begin session: ", *ssh.messageSession.SessionID()) fmt.Println("Begin session: ", *ssh.messageSession.SessionID())
return nil return nil
} }
// Handle is called when a new session message is received
func (ssh *StepSessionHandler) Handle(ctx context.Context, msg *servicebus.Message) error { func (ssh *StepSessionHandler) Handle(ctx context.Context, msg *servicebus.Message) error {
var step RecipeStep var step RecipeStep
if err := json.Unmarshal(msg.Data, &step); err != nil { if err := json.Unmarshal(msg.Data, &step); err != nil {
@ -35,6 +37,8 @@ func (ssh *StepSessionHandler) Handle(ctx context.Context, msg *servicebus.Messa
return msg.Complete(ctx) return msg.Complete(ctx)
} }
// End is called when the message session is closed. Service Bus will not automatically end your message session. Be
// sure to know when to terminate your own session.
func (ssh *StepSessionHandler) End() { func (ssh *StepSessionHandler) End() {
fmt.Println("End session: ", *ssh.messageSession.SessionID()) fmt.Println("End session: ", *ssh.messageSession.SessionID())
fmt.Println("") fmt.Println("")
@ -57,6 +61,8 @@ func Example_messageSessions() {
return return
} }
// Create a Service Bus Queue with required sessions enabled. This will ensure that all messages sent and received
// are bound to a session.
qm := ns.NewQueueManager() qm := ns.NewQueueManager()
qEntity, err := ensureQueue(ctx, qm, "MessageSessionsExample", servicebus.QueueEntityWithRequiredSessions()) qEntity, err := ensureQueue(ctx, qm, "MessageSessionsExample", servicebus.QueueEntityWithRequiredSessions())
if err != nil { if err != nil {
@ -72,9 +78,13 @@ func Example_messageSessions() {
sessions := []string{"foo", "bar", "bazz", "buzz"} sessions := []string{"foo", "bar", "bazz", "buzz"}
for _, session := range sessions { for _, session := range sessions {
// send recipe steps
// note that order is preserved within a given session
sendSessionRecipeSteps(ctx, session, q) sendSessionRecipeSteps(ctx, session, q)
} }
// receive messages for each session
// you can also call q.NewSession(nil) to receive from any available session
for _, session := range sessions { for _, session := range sessions {
queueSession := q.NewSession(&session) queueSession := q.NewSession(&session)
err := queueSession.ReceiveOne(ctx, new(StepSessionHandler)) err := queueSession.ReceiveOne(ctx, new(StepSessionHandler))