From 2bb8cc50824c671f18e99529e2920451614a3d84 Mon Sep 17 00:00:00 2001 From: David Justice Date: Thu, 15 Feb 2018 14:40:17 -0800 Subject: [PATCH] add an example --- _examples/helloworld/.gitignore | 1 + _examples/helloworld/Makefile | 14 +++++ _examples/helloworld/consumer/main.go | 80 +++++++++++++++++++++++++++ _examples/helloworld/producer/main.go | 65 ++++++++++++++++++++++ 4 files changed, 160 insertions(+) create mode 100644 _examples/helloworld/.gitignore create mode 100644 _examples/helloworld/Makefile create mode 100644 _examples/helloworld/consumer/main.go create mode 100644 _examples/helloworld/producer/main.go diff --git a/_examples/helloworld/.gitignore b/_examples/helloworld/.gitignore new file mode 100644 index 0000000..c5e82d7 --- /dev/null +++ b/_examples/helloworld/.gitignore @@ -0,0 +1 @@ +bin \ No newline at end of file diff --git a/_examples/helloworld/Makefile b/_examples/helloworld/Makefile new file mode 100644 index 0000000..3b5f6f8 --- /dev/null +++ b/_examples/helloworld/Makefile @@ -0,0 +1,14 @@ +# Go parameters +GOCMD=go +GOBUILD=$(GOCMD) build +GOCLEAN=$(GOCMD) clean +GOTEST=$(GOCMD) test +GOGET=$(GOCMD) get + +all: clean build +build: + $(GOBUILD) -o ./bin/consumer ./consumer/main.go + $(GOBUILD) -o ./bin/producer ./producer/main.go +clean: + $(GOCLEAN) + rm -rf ./bin \ No newline at end of file diff --git a/_examples/helloworld/consumer/main.go b/_examples/helloworld/consumer/main.go new file mode 100644 index 0000000..faa4e6b --- /dev/null +++ b/_examples/helloworld/consumer/main.go @@ -0,0 +1,80 @@ +package main + +import ( + "github.com/Azure/azure-event-hubs-go" + "fmt" + "time" + "os" + "github.com/Azure/go-autorest/autorest/azure" + "log" + "context" + "pack.ag/amqp" +) + +const ( + Location = "eastus" + ResourceGroupName = "ehtest" + HubName = "producerConsumer" +) + +func main() { + hub, partitions := initHub() + exit := make(chan struct{}) + + handler := func(ctx context.Context, msg *amqp.Message) error { + text := string(msg.Data) + if text == "exit\n" { + fmt.Println("Someone told me to exit!") + exit <- *new(struct{}) + } else { + fmt.Println(string(msg.Data)) + } + return nil + } + + for _, partitionID := range *partitions { + hub.Receive(partitionID, handler) + } + + select { + case <-exit: + fmt.Println("closing after 2 seconds") + select { + case <-time.After(2 * time.Second): + return + } + } +} + +func initHub() (eventhub.SenderReceiver, *[]string) { + subscriptionID := mustGetenv("AZURE_SUBSCRIPTION_ID") + namespace := mustGetenv("EVENTHUB_NAMESPACE") + creds := eventhub.ServicePrincipalCredentials{ + TenantID: mustGetenv("AZURE_TENANT_ID"), + ApplicationID: mustGetenv("AZURE_CLIENT_ID"), + Secret: mustGetenv("AZURE_CLIENT_SECRET"), + } + ns, err := eventhub.NewNamespaceWithServicePrincipalCredentials(subscriptionID, ResourceGroupName, namespace, creds, azure.PublicCloud) + if err != nil { + panic(err) + } + + hubMgmt, err := ns.EnsureEventHub(context.Background(), HubName) + if err != nil { + log.Fatal(err) + } + + hub, err := ns.NewEventHub(HubName) + if err != nil { + log.Fatal(err) + } + return hub, hubMgmt.PartitionIds +} + +func mustGetenv(key string) string { + v := os.Getenv(key) + if v == "" { + panic("Environment variable '" + key + "' required for integration tests.") + } + return v +} diff --git a/_examples/helloworld/producer/main.go b/_examples/helloworld/producer/main.go new file mode 100644 index 0000000..7837867 --- /dev/null +++ b/_examples/helloworld/producer/main.go @@ -0,0 +1,65 @@ +package main + +import ( + "github.com/Azure/azure-event-hubs-go" + "fmt" + "os" + "github.com/Azure/go-autorest/autorest/azure" + "log" + "context" + "pack.ag/amqp" + "bufio" +) + +const ( + Location = "eastus" + ResourceGroupName = "ehtest" + HubName = "producerConsumer" +) + +func main() { + hub, _ := initHub() + + reader := bufio.NewReader(os.Stdin) + for { + fmt.Print("Enter text: ") + text, _ := reader.ReadString('\n') + hub.Send(context.Background(), &amqp.Message{Data: []byte(text)}) + if text == "exit\n" { + break + } + } +} + +func initHub() (eventhub.SenderReceiver, *[]string) { + subscriptionID := mustGetenv("AZURE_SUBSCRIPTION_ID") + namespace := mustGetenv("EVENTHUB_NAMESPACE") + creds := eventhub.ServicePrincipalCredentials{ + TenantID: mustGetenv("AZURE_TENANT_ID"), + ApplicationID: mustGetenv("AZURE_CLIENT_ID"), + Secret: mustGetenv("AZURE_CLIENT_SECRET"), + } + ns, err := eventhub.NewNamespaceWithServicePrincipalCredentials(subscriptionID, ResourceGroupName, namespace, creds, azure.PublicCloud) + if err != nil { + panic(err) + } + + hubMgmt, err := ns.EnsureEventHub(context.Background(), HubName) + if err != nil { + log.Fatal(err) + } + + hub, err := ns.NewEventHub(HubName) + if err != nil { + log.Fatal(err) + } + return hub, hubMgmt.PartitionIds +} + +func mustGetenv(key string) string { + v := os.Getenv(key) + if v == "" { + panic("Environment variable '" + key + "' required for integration tests.") + } + return v +}