add an example
This commit is contained in:
Родитель
6add3316dd
Коммит
2bb8cc5082
|
@ -0,0 +1 @@
|
|||
bin
|
|
@ -0,0 +1,14 @@
|
|||
# Go parameters
|
||||
GOCMD=go
|
||||
GOBUILD=$(GOCMD) build
|
||||
GOCLEAN=$(GOCMD) clean
|
||||
GOTEST=$(GOCMD) test
|
||||
GOGET=$(GOCMD) get
|
||||
|
||||
all: clean build
|
||||
build:
|
||||
$(GOBUILD) -o ./bin/consumer ./consumer/main.go
|
||||
$(GOBUILD) -o ./bin/producer ./producer/main.go
|
||||
clean:
|
||||
$(GOCLEAN)
|
||||
rm -rf ./bin
|
|
@ -0,0 +1,80 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"github.com/Azure/azure-event-hubs-go"
|
||||
"fmt"
|
||||
"time"
|
||||
"os"
|
||||
"github.com/Azure/go-autorest/autorest/azure"
|
||||
"log"
|
||||
"context"
|
||||
"pack.ag/amqp"
|
||||
)
|
||||
|
||||
const (
|
||||
Location = "eastus"
|
||||
ResourceGroupName = "ehtest"
|
||||
HubName = "producerConsumer"
|
||||
)
|
||||
|
||||
func main() {
|
||||
hub, partitions := initHub()
|
||||
exit := make(chan struct{})
|
||||
|
||||
handler := func(ctx context.Context, msg *amqp.Message) error {
|
||||
text := string(msg.Data)
|
||||
if text == "exit\n" {
|
||||
fmt.Println("Someone told me to exit!")
|
||||
exit <- *new(struct{})
|
||||
} else {
|
||||
fmt.Println(string(msg.Data))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, partitionID := range *partitions {
|
||||
hub.Receive(partitionID, handler)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-exit:
|
||||
fmt.Println("closing after 2 seconds")
|
||||
select {
|
||||
case <-time.After(2 * time.Second):
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func initHub() (eventhub.SenderReceiver, *[]string) {
|
||||
subscriptionID := mustGetenv("AZURE_SUBSCRIPTION_ID")
|
||||
namespace := mustGetenv("EVENTHUB_NAMESPACE")
|
||||
creds := eventhub.ServicePrincipalCredentials{
|
||||
TenantID: mustGetenv("AZURE_TENANT_ID"),
|
||||
ApplicationID: mustGetenv("AZURE_CLIENT_ID"),
|
||||
Secret: mustGetenv("AZURE_CLIENT_SECRET"),
|
||||
}
|
||||
ns, err := eventhub.NewNamespaceWithServicePrincipalCredentials(subscriptionID, ResourceGroupName, namespace, creds, azure.PublicCloud)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
hubMgmt, err := ns.EnsureEventHub(context.Background(), HubName)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
hub, err := ns.NewEventHub(HubName)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
return hub, hubMgmt.PartitionIds
|
||||
}
|
||||
|
||||
func mustGetenv(key string) string {
|
||||
v := os.Getenv(key)
|
||||
if v == "" {
|
||||
panic("Environment variable '" + key + "' required for integration tests.")
|
||||
}
|
||||
return v
|
||||
}
|
|
@ -0,0 +1,65 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"github.com/Azure/azure-event-hubs-go"
|
||||
"fmt"
|
||||
"os"
|
||||
"github.com/Azure/go-autorest/autorest/azure"
|
||||
"log"
|
||||
"context"
|
||||
"pack.ag/amqp"
|
||||
"bufio"
|
||||
)
|
||||
|
||||
const (
|
||||
Location = "eastus"
|
||||
ResourceGroupName = "ehtest"
|
||||
HubName = "producerConsumer"
|
||||
)
|
||||
|
||||
func main() {
|
||||
hub, _ := initHub()
|
||||
|
||||
reader := bufio.NewReader(os.Stdin)
|
||||
for {
|
||||
fmt.Print("Enter text: ")
|
||||
text, _ := reader.ReadString('\n')
|
||||
hub.Send(context.Background(), &amqp.Message{Data: []byte(text)})
|
||||
if text == "exit\n" {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func initHub() (eventhub.SenderReceiver, *[]string) {
|
||||
subscriptionID := mustGetenv("AZURE_SUBSCRIPTION_ID")
|
||||
namespace := mustGetenv("EVENTHUB_NAMESPACE")
|
||||
creds := eventhub.ServicePrincipalCredentials{
|
||||
TenantID: mustGetenv("AZURE_TENANT_ID"),
|
||||
ApplicationID: mustGetenv("AZURE_CLIENT_ID"),
|
||||
Secret: mustGetenv("AZURE_CLIENT_SECRET"),
|
||||
}
|
||||
ns, err := eventhub.NewNamespaceWithServicePrincipalCredentials(subscriptionID, ResourceGroupName, namespace, creds, azure.PublicCloud)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
hubMgmt, err := ns.EnsureEventHub(context.Background(), HubName)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
hub, err := ns.NewEventHub(HubName)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
return hub, hubMgmt.PartitionIds
|
||||
}
|
||||
|
||||
func mustGetenv(key string) string {
|
||||
v := os.Getenv(key)
|
||||
if v == "" {
|
||||
panic("Environment variable '" + key + "' required for integration tests.")
|
||||
}
|
||||
return v
|
||||
}
|
Загрузка…
Ссылка в новой задаче