This commit is contained in:
David Justice 2018-03-23 11:01:14 -07:00
Родитель 90202b957b
Коммит 9a0b4d0bd6
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 2B44C6BF9F416319
7 изменённых файлов: 313 добавлений и 17 удалений

Двоичные данные
_content/sas-policy.png Normal file

Двоичный файл не отображается.

После

Ширина:  |  Высота:  |  Размер: 47 KiB

Просмотреть файл

@ -5,6 +5,7 @@ import (
"fmt"
"log"
"os"
"os/signal"
"time"
"github.com/Azure/azure-event-hubs-go"
@ -51,6 +52,11 @@ func main() {
return
}
}
hub, err := eventhub.NewHubFromEnvironment()
if err != nil {
// handle err
}
}
func initHub() (*eventhub.Hub, []string) {

Просмотреть файл

@ -33,7 +33,7 @@ import (
type (
leasedReceiver struct {
handle eventhub.ListenerHandle
handle *eventhub.ListenerHandle
processor *EventProcessorHost
lease LeaseMarker
done func()

63
hub.go
Просмотреть файл

@ -104,7 +104,33 @@ func NewHub(namespace, name string, tokenProvider auth.TokenProvider, opts ...Hu
}
// NewHubWithNamespaceNameAndEnvironment creates a new Event Hub client for sending and receiving messages from
// environment variables with supplied namespace and name
// environment variables with supplied namespace and name which will attempt to build a token provider from
// environment variables. If unable to build a AAD Token Provider it will fall back to a SAS token provider. If neither
// can be built, it will return error.
//
// SAS TokenProvider environment variables:
// There are two sets of environment variables which can produce a SAS TokenProvider
//
// 1) Expected Environment Variables:
// - "EVENTHUB_NAMESPACE" the namespace of the Event Hub instance
// - "EVENTHUB_KEY_NAME" the name of the Event Hub key
// - "EVENTHUB_KEY_VALUE" the secret for the Event Hub key named in "EVENTHUB_KEY_NAME"
//
// 2) Expected Environment Variable:
// - "EVENTHUB_CONNECTION_STRING" connection string from the Azure portal
//
//
// AAD TokenProvider environment variables:
// 1. Client Credentials: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID" and
// "AZURE_CLIENT_SECRET"
//
// 2. Client Certificate: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID",
// "AZURE_CERTIFICATE_PATH" and "AZURE_CERTIFICATE_PASSWORD"
//
// 3. Managed Service Identity (MSI): attempt to authenticate via MSI
//
//
// The Azure Environment used can be specified using the name of the Azure Environment set in "AZURE_ENVIRONMENT" var.
func NewHubWithNamespaceNameAndEnvironment(namespace, name string, opts ...HubOption) (*Hub, error) {
var provider auth.TokenProvider
aadProvider, aadErr := aad.NewJWTProvider(aad.JWTProviderWithEnvironmentVars())
@ -133,6 +159,39 @@ func NewHubWithNamespaceNameAndEnvironment(namespace, name string, opts ...HubOp
}
// NewHubFromEnvironment creates a new Event Hub client for sending and receiving messages from environment variables
//
// Expected Environment Variables:
// - "EVENTHUB_NAMESPACE" the namespace of the Event Hub instance
// - "EVENTHUB_NAME" the name of the Event Hub instance
//
//
// This method depends on NewHubWithNamespaceNameAndEnvironment which will attempt to build a token provider from
// environment variables. If unable to build a AAD Token Provider it will fall back to a SAS token provider. If neither
// can be built, it will return error.
//
// SAS TokenProvider environment variables:
// There are two sets of environment variables which can produce a SAS TokenProvider
//
// 1) Expected Environment Variables:
// - "EVENTHUB_NAMESPACE" the namespace of the Event Hub instance
// - "EVENTHUB_KEY_NAME" the name of the Event Hub key
// - "EVENTHUB_KEY_VALUE" the secret for the Event Hub key named in "EVENTHUB_KEY_NAME"
//
// 2) Expected Environment Variable:
// - "EVENTHUB_CONNECTION_STRING" connection string from the Azure portal
//
//
// AAD TokenProvider environment variables:
// 1. Client Credentials: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID" and
// "AZURE_CLIENT_SECRET"
//
// 2. Client Certificate: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID",
// "AZURE_CERTIFICATE_PATH" and "AZURE_CERTIFICATE_PASSWORD"
//
// 3. Managed Service Identity (MSI): attempt to authenticate via MSI
//
//
// The Azure Environment used can be specified using the name of the Azure Environment set in "AZURE_ENVIRONMENT" var.
func NewHubFromEnvironment(opts ...HubOption) (*Hub, error) {
const envErrMsg = "environment var %s must not be empty"
var namespace, name string
@ -188,7 +247,7 @@ func (h *Hub) Close() error {
}
// Receive subscribes for messages sent to the provided entityPath.
func (h *Hub) Receive(ctx context.Context, partitionID string, handler Handler, opts ...ReceiveOption) (ListenerHandle, error) {
func (h *Hub) Receive(ctx context.Context, partitionID string, handler Handler, opts ...ReceiveOption) (*ListenerHandle, error) {
h.receiverMu.Lock()
defer h.receiverMu.Unlock()

Просмотреть файл

@ -22,7 +22,6 @@ package test
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE
import (
"context"
"errors"

235
readme.md
Просмотреть файл

@ -8,6 +8,241 @@ using any real-time analytics provider or with batching/storage adapters.
Refer to the [online documentation](https://azure.microsoft.com/services/event-hubs/) to learn more about Event Hubs in
general.
This library is a pure Golang implementation of Azure Event Hubs over AMQP.
## Preview of Event Hubs for Golang
This library is currently a preview. The may be breaking interface change until it reaches `1.0.0`. If you run into an
issue, please don't hesitate to log a [new issue](https://github.com/Azure/azure-event-hubs-go/issues/new) or open a
pull request.
## Installing the library
To more reliably manage dependencies in your application we recommend [golang/dep](https://github.com/golang/dep).
With dep:
```
dep ensure -add github.com/Azure/azure-event-hubs-go
```
With go get:
```
go get -u github.com/Azure/azure-event-hubs-go/...
```
If you need to install Go, follow [the official instructions](https://golang.org/dl/)
## Using the Event Hubs
In this section we'll cover some basics of the library to help you get started.
This library has two main dependencies, [vcabbage/amqp](https://github.com/vcabbage/amqp) and
[Azure AMQP Common](https://github.com/Azure/azure-amqp-common-go). The former provides the AMQP protocol implementation
and the later provides some common authentication, persistence and request-response message flows.
### Quick start
Let's send and receive `"hello, world!"`.
```go
// create a new Event Hub from environment variables
// the go docs for the func have a full description of the environment variables
hub, err := eventhub.NewHubFromEnvironment()
if err != nil {
// handle err
}
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
// send a single message into a random partition
hub.Send(ctx, eventhub.NewEventFromString("hello, world!"))
handler := func(c context.Context, event *eventhub.Event) error {
fmt.Println(string(event.Data))
return nil
}
// listen to each partition of the Event Hub
runtimeInfo, err := hub.GetRuntimeInformation(ctx)
for _, partitionID := range runtimeInfo.PartitionIDs {
_, err := hub.Receive(ctx, partitionID, handler, eventhub.ReceiveWithLatestOffset())
if err != nil {
// handle err
}
}
// Wait for a signal to quit:
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt, os.Kill)
<-signalChan
return hub.Close()
```
### Environment Variables
In the above example, the `Hub` instance was created using environment variables. Here is a list of environment
variables used in this project.
#### Event Hub env vars
- "EVENTHUB_NAMESPACE" the namespace of the Event Hub instance
- "EVENTHUB_NAME" the name of the Event Hub instance
#### SAS TokenProvider environment variables:
There are two sets of environment variables which can produce a SAS TokenProvider
1) Expected Environment Variables:
- "EVENTHUB_NAMESPACE" the namespace of the Event Hub instance
- "EVENTHUB_KEY_NAME" the name of the Event Hub key
- "EVENTHUB_KEY_VALUE" the secret for the Event Hub key named in "EVENTHUB_KEY_NAME"
2) Expected Environment Variable:
- "EVENTHUB_CONNECTION_STRING" connection string from the Azure portal like: `Endpoint=sb://foo.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=fluffypuppy`
#### AAD TokenProvider environment variables:
1) Client Credentials: attempt to authenticate with a [Service Principal](https://docs.microsoft.com/en-us/azure/azure-resource-manager/resource-group-create-service-principal-portal) via
- "AZURE_TENANT_ID" the Azure Tenant ID
- "AZURE_CLIENT_ID" the Azure Application ID
- "AZURE_CLIENT_SECRET" a key / secret for the corresponding application
2) Client Certificate: attempt to authenticate with a Service Principal via
- "AZURE_TENANT_ID" the Azure Tenant ID
- "AZURE_CLIENT_ID" the Azure Application ID
- "AZURE_CERTIFICATE_PATH" the path to the certificate file
- "AZURE_CERTIFICATE_PASSWORD" the password for the certificate
The Azure Environment used can be specified using the name of the Azure Environment set in "AZURE_ENVIRONMENT" var.
### Authentication
Event Hubs offers a couple different paths for authentication, shared access signatures (SAS) and Azure Active Directory (AAD)
JWT authentication. Both token types are available for use and are exposed through the `TokenProvider` interface.
```go
// TokenProvider abstracts the fetching of authentication tokens
TokenProvider interface {
GetToken(uri string) (*Token, error)
}
```
#### SAS token provider
The SAS token provider uses the namespace of the Event Hub, the name of the "Shared access policy" key and the value of
the key to produce a token.
You can create new Shared access policies through the Azure portal as shown below.
![SAS policies in the Azure portal](./_content/sas-policy.png)
You can create a SAS token provider in a couple different ways. You can build one with a namespace, key name and key
value like this.
```go
provider, err := sas.NewTokenProvider("mynamespace", "myKeyName", "myKeyValue")
```
Or, you can create a token provider from environment variables like this.
```go
// TokenProviderWithEnvironmentVars creates a new SAS TokenProvider from environment variables
//
// There are two sets of environment variables which can produce a SAS TokenProvider
//
// 1) Expected Environment Variables:
// - "EVENTHUB_NAMESPACE" the namespace of the Event Hub instance
// - "EVENTHUB_KEY_NAME" the name of the Event Hub key
// - "EVENTHUB_KEY_VALUE" the secret for the Event Hub key named in "EVENTHUB_KEY_NAME"
//
// 2) Expected Environment Variable:
// - "EVENTHUB_CONNECTION_STRING" connection string from the Azure portal
provider, err := sas.NewTokenProvider(sas.TokenProviderWithEnvironmentVars())
```
#### AAD JWT token provider
The AAD JWT token provider uses Azure Active Directory to authenticate the service and acquire a token (JWT) which is
used to authenticate with Event Hubs. The authenticated identity must have `Contributor` role based authorization for
the Event Hub instance. [This article](https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-role-based-access-control)
provides more information about this preview feature.
The easiest way to create a JWT token provider is via environment variables.
```go
// 1. Client Credentials: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID" and
// "AZURE_CLIENT_SECRET"
//
// 2. Client Certificate: attempt to authenticate with a Service Principal via "AZURE_TENANT_ID", "AZURE_CLIENT_ID",
// "AZURE_CERTIFICATE_PATH" and "AZURE_CERTIFICATE_PASSWORD"
//
// 3. Managed Service Identity (MSI): attempt to authenticate via MSI
//
//
// The Azure Environment used can be specified using the name of the Azure Environment set in "AZURE_ENVIRONMENT" var.
provider, err := aad.NewJWTProvider(aad.JWTProviderWithEnvironmentVars())
```
You can also provide your own `adal.ServicePrincipalToken`.
```go
config := &aad.TokenProviderConfiguration{
ResourceURI: azure.PublicCloud.ResourceManagerEndpoint,
Env: &azure.PublicCloud,
}
spToken, err := config.NewServicePrincipalToken()
if err != nil {
// handle err
}
provider, err := aad.NewJWTProvider(aadToken)
```
### Send And Receive
The basics of messaging are sending and receiving messages. Here are the different ways you can do that.
#### Sending to a particular partition
By default, a Hub will send messages any of the load balanced partitions. Sometimes you want to send to only a
particular partition. You can do this in two ways.
1) You can supply a partition key on an event
```go
event := NewEventFromString("foo")
event.PartitionKey = "bazz"
hub.Send(ctx, event) // send event to the partition ID to which partition key hashes
```
2) You can build a hub instance that will only send to one partition.
```go
partitionID := "0"
hub, err := eventhub.NewHubFromEnvironment(HubWithPartitionedSender(partitionID))
```
#### Receiving
When receiving messages from an Event Hub, you always need to specify the partition you'd like to receive from.
`Hub.Receive` is a non-blocking call, which takes a message handler func and options. Since Event Hub is just a long
log of messages, you also have to tell it where to start from. By default, a receiver will start from the beginning
of the log, but there are options to help you specify your starting offset.
The `Receive` func returns a handle to the running receiver and an error. If error is returned, the receiver was unable
to start. If error is nil, the receiver is running and can be stopped by calling `Close` on the `Hub` or the handle
returned.
- Receive messages from a partition from the beginning of the log
```go
handle, err := hub.Receive(ctx, partitionID, func(ctx context.Context, event *eventhub.Event) error {
// do stuff
})
```
- Receive from the latest message onward
```go
handle, err := hub.Receive(ctx, partitionID, handler, eventhub.ReceiveWithLatestOffset())
```
- Receive from a specified offset
```go
handle, err := hub.Receive(ctx, partitionID, handler, eventhub.ReceiveWithStartingOffset(offset))
```
At some point, a receiver process is going to stop. You will likely want it to start back up at the spot that it stopped
processing messages. This is where message offsets can be used to start from where you have left off.
The `Hub` struct can be customized to use an `persist.CheckpointPersister`. By default, a `Hub` uses an in-memory
`CheckpointPersister`, but accepts anything that implements the `perist.CheckpointPersister` interface.
```go
// CheckpointPersister provides persistence for the received offset for a given namespace, hub name, consumer group, partition Id and
// offset so that if a receiver where to be interrupted, it could resume after the last consumed event.
CheckpointPersister interface {
Write(namespace, name, consumerGroup, partitionID string, checkpoint Checkpoint) error
Read(namespace, name, consumerGroup, partitionID string) (Checkpoint, error)
}
```
For example, you could create a simple files system persister and use it likes so.
```go
hub, err := eventhub.NewHubFromEnvironment(HubWithOffsetPersistence(fileSystemPersister))
```
## Examples
- [HelloWorld: Producer and Consumer](./_examples/helloworld): an example of sending and receiving messages from an
Event Hub instance.

Просмотреть файл

@ -63,14 +63,8 @@ type (
// ReceiveOption provides a structure for configuring receivers
ReceiveOption func(receiver *receiver) error
// ListenerHandle provides a way to manage the lifespan of a listener
ListenerHandle interface {
Done() <-chan struct{}
Err() error
Close() error
}
listenerHandle struct {
// ListenerHandle provides the ability to close or listen to the close of a Receiver
ListenerHandle struct {
r *receiver
ctx context.Context
}
@ -162,7 +156,7 @@ func (r *receiver) Recover(ctx context.Context) error {
}
// Listen start a listener for messages sent to the entity path
func (r *receiver) Listen(handler Handler) ListenerHandle {
func (r *receiver) Listen(handler Handler) *ListenerHandle {
ctx, done := context.WithCancel(context.Background())
r.done = done
@ -170,7 +164,7 @@ func (r *receiver) Listen(handler Handler) ListenerHandle {
go r.listenForMessages(ctx, messages)
go r.handleMessages(ctx, messages, handler)
return &listenerHandle{
return &ListenerHandle{
r: r,
ctx: ctx,
}
@ -331,15 +325,18 @@ func (r *receiver) debugLogf(format string, args ...interface{}) {
log.Debugf(msg+" for entity identifier %q", r.getIdentifier())
}
func (lc *listenerHandle) Close() error {
// Close will close the listener
func (lc *ListenerHandle) Close() error {
return lc.r.Close()
}
func (lc *listenerHandle) Done() <-chan struct{} {
// Done will close the channel when the listener has stopped
func (lc *ListenerHandle) Done() <-chan struct{} {
return lc.ctx.Done()
}
func (lc *listenerHandle) Err() error {
// Err will return the last error encountered
func (lc *ListenerHandle) Err() error {
if lc.r.lastError != nil {
return lc.r.lastError
}