azure-service-bus-go/queue_examples_test.go

344 строки
9.5 KiB
Go

package servicebus_test
import (
"bytes"
"context"
"fmt"
"math/rand"
"os"
"time"
"github.com/Azure/azure-amqp-common-go/v3/uuid"
"github.com/Azure/azure-service-bus-go"
"github.com/joho/godotenv"
)
func init() {
if err := godotenv.Load(); err != nil {
fmt.Println("FATAL: ", err)
}
}
func ExampleQueue_getOrBuildQueue() {
const queueName = "myqueue"
connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
if connStr == "" {
fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
return
}
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
if err != nil {
fmt.Println(err)
return
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
qm := ns.NewQueueManager()
qe, err := qm.Get(ctx, queueName)
if err != nil && !servicebus.IsErrNotFound(err) {
fmt.Println(err)
return
}
if qe == nil {
_, err := qm.Put(ctx, queueName)
if err != nil {
fmt.Println(err)
return
}
}
q, err := ns.NewQueue(queueName)
if err != nil {
fmt.Println(err)
return
}
fmt.Println(q.Name)
// Output: myqueue
}
func ExampleQueue_Send() {
// Instantiate the clients needed to communicate with a Service Bus Queue.
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString("<your connection string here>"))
if err != nil {
return
}
client, err := ns.NewQueue("myqueue")
if err != nil {
return
}
// Create a context to limit how long we will try to send, then push the message over the wire.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := client.Send(ctx, servicebus.NewMessageFromString("Hello World!!!")); err != nil {
fmt.Println("FATAL: ", err)
}
}
func ExampleQueue_Receive() {
// Define a function that should be executed when a message is received.
var printMessage servicebus.HandlerFunc = func(ctx context.Context, msg *servicebus.Message) error {
fmt.Println(string(msg.Data))
return msg.Complete(ctx)
}
// Instantiate the clients needed to communicate with a Service Bus Queue.
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString("<your connection string here>"))
if err != nil {
return
}
client, err := ns.NewQueue("myqueue")
if err != nil {
return
}
// Define a context to limit how long we will block to receive messages, then start serving our function.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()
if err := client.Receive(ctx, printMessage); err != nil {
fmt.Println("FATAL: ", err)
}
}
func ExampleQueue_Receive_second() {
// Set concurrent number
const concurrentNum = 5
// Define msg chan
msgChan := make(chan *servicebus.Message, concurrentNum)
// Define a function that should be executed when a message is received.
var concurrentHandler servicebus.HandlerFunc = func(ctx context.Context, msg *servicebus.Message) error {
msgChan <- msg
return nil
}
// Define msg workers
for i := 0; i < concurrentNum; i++ {
go func() {
for msg := range msgChan {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Millisecond)
defer cancel()
fmt.Println(string(msg.Data))
msg.Complete(ctx)
}
}()
}
// Instantiate the clients needed to communicate with a Service Bus Queue.
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString("<your connection string here>"))
if err != nil {
close(msgChan)
return
}
// Init queue client with prefetch count
client, err := ns.NewQueue("myqueue", servicebus.QueueWithPrefetchCount(concurrentNum))
if err != nil {
close(msgChan)
return
}
// Define a context to limit how long we will block to receive messages, then start serving our function.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()
if err := client.Receive(ctx, concurrentHandler); err != nil {
fmt.Println("FATAL: ", err)
}
// Close the message chan
close(msgChan)
}
func ExampleQueue_scheduleAndCancelMessages() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute+40*time.Second)
defer cancel()
connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
if connStr == "" {
fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
return
}
// Create a client to communicate with a Service Bus Namespace.
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
if err != nil {
fmt.Println("FATAL: ", err)
return
}
client, err := ns.NewQueue("schedulewithqueue")
if err != nil {
fmt.Println("FATAL: ", err)
return
}
// The delay that we should schedule a message for.
const waitTime = 1 * time.Minute
expectedTime := time.Now().Add(waitTime)
msg := servicebus.NewMessageFromString("to the future!!")
scheduled, err := client.ScheduleAt(ctx, expectedTime, msg)
if err != nil {
fmt.Println("FATAL: ", err)
return
}
err = client.CancelScheduled(ctx, scheduled...)
if err != nil {
fmt.Println("FATAL: ", err)
return
}
fmt.Println("All Messages Scheduled and Cancelled")
// Output: All Messages Scheduled and Cancelled
}
func ExampleQueue_sessionsRoundTrip() {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Setup the required clients for communicating with Service Bus. //
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
connStr := os.Getenv("SERVICEBUS_CONNECTION_STRING")
if connStr == "" {
fmt.Println("FATAL: expected environment variable SERVICEBUS_CONNECTION_STRING not set")
return
}
ns, err := servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(connStr))
if err != nil {
fmt.Println("FATAL: ", err)
return
}
client, err := ns.NewQueue("receivesession")
if err != nil {
fmt.Println("FATAL: ", err)
return
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Publish five session's worth of data. //
// //
// The sessions are deliberately interleaved to demonstrate consumption semantics. //
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
const numSessions = 5
adjectives := []string{"Doltish", "Foolish", "Juvenile"}
nouns := []string{"Automaton", "Luddite", "Monkey", "Neanderthal"}
// seed chosen arbitrarily, see https://en.wikipedia.org/wiki/Taxicab_number
generator := rand.New(rand.NewSource(1729))
sessionIDs := make([]string, numSessions)
// Establish a set of sessions
for i := 0; i < numSessions; i++ {
if rawSessionID, err := uuid.NewV4(); err == nil {
sessionIDs[i] = rawSessionID.String()
} else {
fmt.Println("FATAL: ", err)
return
}
}
// Publish an adjective for each session
for i := 0; i < numSessions; i++ {
adj := adjectives[generator.Intn(len(adjectives))]
msg := servicebus.NewMessageFromString(adj)
msg.SessionID = &sessionIDs[i]
if err := client.Send(ctx, msg); err != nil {
fmt.Println("FATAL: ", err)
return
}
}
// Publish a noun for each session
for i := 0; i < numSessions; i++ {
noun := nouns[generator.Intn(len(nouns))]
msg := servicebus.NewMessageFromString(noun)
msg.SessionID = &sessionIDs[i]
if err := client.Send(ctx, msg); err != nil {
fmt.Println("FATAL: ", err)
return
}
}
// Publish a numeric suffix for each session
for i := 0; i < numSessions; i++ {
suffix := fmt.Sprintf("%02d", generator.Intn(100))
msg := servicebus.NewMessageFromString(suffix)
msg.SessionID = &sessionIDs[i]
if err := client.Send(ctx, msg); err != nil {
fmt.Println("FATAL: ", err)
return
}
}
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// Receive and process the previously published sessions. //
// //
// The order the sessions are received in is not guaranteed, so the expected output must be "Unordered output". //
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
for i := 0; i < numSessions; i++ {
handler := &SessionPrinter{}
qs := client.NewSession(nil)
if err := qs.ReceiveOne(ctx, handler); err != nil {
fmt.Println("FATAL: ", err)
return
}
}
// Unordered output:
// FoolishMonkey63
// FoolishLuddite05
// JuvenileMonkey80
// JuvenileLuddite84
// FoolishLuddite68
}
type SessionPrinter struct {
builder *bytes.Buffer
messageSession *servicebus.MessageSession
messagesReceived uint
}
func (sp *SessionPrinter) Start(ms *servicebus.MessageSession) error {
if sp.builder == nil {
sp.builder = &bytes.Buffer{}
} else {
sp.builder.Reset()
}
sp.messagesReceived = 0
sp.messageSession = ms
return nil
}
func (sp *SessionPrinter) Handle(ctx context.Context, msg *servicebus.Message) error {
sp.builder.Write(msg.Data)
sp.messagesReceived++
if sp.messagesReceived >= 3 {
defer sp.messageSession.Close()
}
return msg.Complete(ctx)
}
func (sp *SessionPrinter) End() {
fmt.Println(sp.builder.String())
}