Add concurrent receive example
This commit is contained in:
Родитель
9fb31b39cb
Коммит
330db51df7
|
@ -110,6 +110,56 @@ func ExampleQueue_Receive() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func ExampleQueue_Receive_second() {
|
||||||
|
// Set concurrent number
|
||||||
|
const concurrentNum = 5
|
||||||
|
// Define msg chan
|
||||||
|
msgChan := make(chan *servicebus.Message, concurrentNum)
|
||||||
|
// Define a function that should be executed when a message is received.
|
||||||
|
var concurrentHandler servicebus.HandlerFunc = func(ctx context.Context, msg *servicebus.Message) error {
|
||||||
|
msgChan <- msg
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Define msg workers
|
||||||
|
for i := 0; i < concurrentNum; i++ {
|
||||||
|
go func() {
|
||||||
|
for msg := range msgChan {
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 30 * time.Millisecond)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
fmt.Println(string(msg.Data))
|
||||||
|
msg.Complete(ctx)
|
||||||
|
}
|
||||||
|
} ()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Instantiate the clients needed to communicate with a Service Bus Queue.
|
||||||
|
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString("<your connection string here>"))
|
||||||
|
if err != nil {
|
||||||
|
close(msgChan)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Init queue client with prefetch count
|
||||||
|
client, err := ns.NewQueue("myqueue", servicebus.QueueWithPrefetchCount(concurrentNum))
|
||||||
|
if err != nil {
|
||||||
|
close(msgChan)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Define a context to limit how long we will block to receive messages, then start serving our function.
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
if err := client.Receive(ctx, concurrentHandler); err != nil {
|
||||||
|
fmt.Println("FATAL: ", err)
|
||||||
|
}
|
||||||
|
// Close the message chan
|
||||||
|
close(msgChan)
|
||||||
|
}
|
||||||
|
|
||||||
func ExampleQueue_scheduleAndCancelMessages() {
|
func ExampleQueue_scheduleAndCancelMessages() {
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute+40*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute+40*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
Загрузка…
Ссылка в новой задаче