This commit is contained in:
Strong Liu 2021-11-22 00:09:03 +08:00
Родитель 3ced59c5e6
Коммит f807de2e30
10 изменённых файлов: 383 добавлений и 376 удалений

Просмотреть файл

@ -555,32 +555,32 @@ Legacy code:
[source,java]
----
@Bean
public EventHubInboundChannelAdapter messageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, EventHubOperation eventhubOperation) {
eventhubOperation.setCheckpointConfig(CheckpointConfig.builder().checkpointMode(CheckpointMode.MANUAL).build());
EventHubInboundChannelAdapter adapter = new EventHubInboundChannelAdapter(EVENTHUB_NAME,
eventhubOperation, CONSUMER_GROUP);
adapter.setOutputChannel(inputChannel);
return adapter;
}
public EventHubInboundChannelAdapter messageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, EventHubOperation eventhubOperation) {
eventhubOperation.setCheckpointConfig(CheckpointConfig.builder().checkpointMode(CheckpointMode.MANUAL).build());
EventHubInboundChannelAdapter adapter = new EventHubInboundChannelAdapter(EVENTHUB_NAME,
eventhubOperation, CONSUMER_GROUP);
adapter.setOutputChannel(inputChannel);
return adapter;
}
----
Modern code:
[source,java]
----
@Bean
public EventHubsInboundChannelAdapter messageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
EventHubsProcessorContainer processorContainer) {
CheckpointConfig config = new CheckpointConfig(CheckpointMode.MANUAL);
@Bean
public EventHubsInboundChannelAdapter messageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
EventHubsProcessorContainer processorContainer) {
CheckpointConfig config = new CheckpointConfig(CheckpointMode.MANUAL);
EventHubsInboundChannelAdapter adapter =
new EventHubsInboundChannelAdapter(processorContainer, EVENTHUB_NAME,
CONSUMER_GROUP, config);
adapter.setOutputChannel(inputChannel);
return adapter;
}
EventHubsInboundChannelAdapter adapter =
new EventHubsInboundChannelAdapter(processorContainer, EVENTHUB_NAME,
CONSUMER_GROUP, config);
adapter.setOutputChannel(inputChannel);
return adapter;
}
----
2.DefaultMessageHandler sample code:
@ -589,24 +589,23 @@ Legacy code:
[source,java]
----
@Bean
@ServiceActivator(inputChannel = OUTPUT_CHANNEL)
public MessageHandler messageSender(EventHubOperation queueOperation) {
DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, queueOperation);
handler.setSendCallback(new ListenableFutureCallback<Void>() {
@Override
public void onSuccess(Void result) {
LOGGER.info("Message was sent successfully.");
}
@Bean
@ServiceActivator(inputChannel = OUTPUT_CHANNEL)
public MessageHandler messageSender(EventHubOperation queueOperation) {
DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, queueOperation);
handler.setSendCallback(new ListenableFutureCallback<Void>() {
@Override
public void onSuccess(Void result) {
LOGGER.info("Message was sent successfully.");
}
@Override
public void onFailure(Throwable ex) {
LOGGER.error("There was an error sending the message.", ex);
}
});
return handler;
}
@Override
public void onFailure(Throwable ex) {
LOGGER.error("There was an error sending the message.", ex);
}
});
return handler;
}
----
Modern code:
@ -614,23 +613,23 @@ Modern code:
[source,java]
----
@Bean
@ServiceActivator(inputChannel = OUTPUT_CHANNEL)
public MessageHandler messageSender(EventHubsTemplate queueOperation) {
DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, queueOperation);
handler.setSendCallback(new ListenableFutureCallback<Void>() {
@Override
public void onSuccess(Void result) {
LOGGER.info("Message was sent successfully.");
}
@ServiceActivator(inputChannel = OUTPUT_CHANNEL)
public MessageHandler messageSender(EventHubsTemplate queueOperation) {
DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, queueOperation);
handler.setSendCallback(new ListenableFutureCallback<Void>() {
@Override
public void onSuccess(Void result) {
LOGGER.info("Message was sent successfully.");
}
@Override
public void onFailure(Throwable ex) {
LOGGER.error("There was an error sending the message.", ex);
}
});
@Override
public void onFailure(Throwable ex) {
LOGGER.error("There was an error sending the message.", ex);
}
});
return handler;
}
return handler;
}
----
===== Package path changes
@ -672,77 +671,78 @@ Legacy code:
[source,java]
----
@Bean
public ServiceBusQueueInboundChannelAdapter queueMessageChannelAdapter(
@Qualifier("INPUT_CHANNEL_NAME") MessageChannel inputChannel, ServiceBusQueueOperation queueOperation) {
queueOperation.setCheckpointConfig(CheckpointConfig.builder().checkpointMode(CheckpointMode.MANUAL).build());
ServiceBusQueueInboundChannelAdapter adapter = new ServiceBusQueueInboundChannelAdapter("QUEUE_NAME",
queueOperation);
adapter.setOutputChannel(inputChannel);
return adapter;
}
@Bean
public ServiceBusQueueInboundChannelAdapter queueMessageChannelAdapter(
@Qualifier("INPUT_CHANNEL_NAME") MessageChannel inputChannel, ServiceBusQueueOperation queueOperation) {
queueOperation.setCheckpointConfig(CheckpointConfig.builder().checkpointMode(CheckpointMode.MANUAL).build());
ServiceBusQueueInboundChannelAdapter adapter = new ServiceBusQueueInboundChannelAdapter("QUEUE_NAME",
queueOperation);
adapter.setOutputChannel(inputChannel);
return adapter;
}
@Bean
public ServiceBusTopicInboundChannelAdapter topicMessageChannelAdapter(
@Qualifier("INPUT_CHANNEL_NAME") MessageChannel inputChannel, ServiceBusTopicOperation topicOperation) {
topicOperation.setCheckpointConfig(CheckpointConfig.builder().checkpointMode(CheckpointMode.MANUAL).build());
ServiceBusTopicInboundChannelAdapter adapter = new ServiceBusTopicInboundChannelAdapter("TOPIC_NAME",
topicOperation, "SUBSCRIPTION_NAME");
adapter.setOutputChannel(inputChannel);
return adapter;
}
@Bean
public ServiceBusTopicInboundChannelAdapter topicMessageChannelAdapter(
@Qualifier("INPUT_CHANNEL_NAME") MessageChannel inputChannel, ServiceBusTopicOperation topicOperation) {
topicOperation.setCheckpointConfig(CheckpointConfig.builder().checkpointMode(CheckpointMode.MANUAL).build());
ServiceBusTopicInboundChannelAdapter adapter = new ServiceBusTopicInboundChannelAdapter("TOPIC_NAME",
topicOperation, "SUBSCRIPTION_NAME");
adapter.setOutputChannel(inputChannel);
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "OUTPUT_CHANNEL_NAME")
public MessageHandler queueMessageSender(ServiceBusQueueOperation queueOperation) {
DefaultMessageHandler handler = new DefaultMessageHandler("QUEUE_NAME", queueOperation);
handler.setSendCallback(new ListenableFutureCallback<Void>() {
@Override
public void onSuccess(Void result) {
LOGGER.info("Message was sent successfully.");
}
@Override
public void onFailure(Throwable ex) {
LOGGER.info("There was an error sending the message.");
}
});
return handler;
}
@Bean
@ServiceActivator(inputChannel = "OUTPUT_CHANNEL_NAME")
public MessageHandler queueMessageSender(ServiceBusQueueOperation queueOperation) {
DefaultMessageHandler handler = new DefaultMessageHandler("QUEUE_NAME", queueOperation);
handler.setSendCallback(new ListenableFutureCallback<Void>() {
@Override
public void onSuccess(Void result) {
LOGGER.info("Message was sent successfully.");
}
@Override
public void onFailure(Throwable ex) {
LOGGER.info("There was an error sending the message.");
}
});
return handler;
}
----
* Modern code:
====
[source,java]
----
public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
@Qualifier("INPUT_CHANNEL_NAME") MessageChannel inputChannel, ServiceBusProcessorContainer processorContainer) {
ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(processorContainer, "QUEUE_NAME",
new CheckpointConfig(CheckpointMode.MANUAL));
adapter.setOutputChannel(inputChannel);
return adapter;
}
public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
@Qualifier("INPUT_CHANNEL_NAME") MessageChannel inputChannel, ServiceBusProcessorContainer processorContainer) {
ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(processorContainer, "QUEUE_NAME",
new CheckpointConfig(CheckpointMode.MANUAL));
adapter.setOutputChannel(inputChannel);
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "OUTPUT_CHANNEL_NAME")
public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) {
serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE);
DefaultMessageHandler handler = new DefaultMessageHandler("QUEUE_NAME", serviceBusTemplate);
handler.setSendCallback(new ListenableFutureCallback<Void>() {
@Override
public void onSuccess(Void result) {
LOGGER.info("Message was sent successfully for {}.", "QUEUE_NAME");
}
@Bean
@ServiceActivator(inputChannel = "OUTPUT_CHANNEL_NAME")
public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) {
serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE);
DefaultMessageHandler handler = new DefaultMessageHandler("QUEUE_NAME", serviceBusTemplate);
handler.setSendCallback(new ListenableFutureCallback<Void>() {
@Override
public void onSuccess(Void result) {
LOGGER.info("Message was sent successfully for {}.", "QUEUE_NAME);
}
@Override
public void onFailure(Throwable ex) {
LOGGER.info("There was an error sending the message.");
}
});
return handler;
}
@Override
public void onFailure(Throwable ex) {
LOGGER.info("There was an error sending the message.");
}
});
return handler;
}
----
====
==== spring-cloud-azure-starter-integration-storage-queue

Просмотреть файл

@ -28,14 +28,20 @@ If you want to retrieve the connection string using Azure Resource Manager, plea
=== Configuration
Below properties could be configured when using Kafka support:
[cols="2*", options="header"]
|===
|Properties |Description
|*spring.cloud.azure.eventhubs*.kafka.enabled |Whether to enable the Azure Event Hubs Kafka support, defult to true.
|*spring.cloud.azure.eventhubs*.connection-string |Azure Event Hubs connection string. Should be provided when want to provide the connection string directly.
|*spring.cloud.azure.eventhubs*.namespace |Azure Event Hubs namespace. Should be provided when want to retrieve the connection information through Azure Resource Manager.
|*spring.cloud.azure.eventhubs*.resource.resource-group |The resource group of Azure Event Hubs namespace. Should be provided when want to retrieve the connection information through Azure Resource Manager.
|*spring.cloud.azure*.profile.subscription-id| The subscription id. Should be provided when want to retrieve the connection information through Azure Resource Manager.|
|Properties
|Description
|*spring.cloud.azure.eventhubs*.kafka.enabled
|Whether to enable the Azure Event Hubs Kafka support, defult to true.
|*spring.cloud.azure.eventhubs*.connection-string
|Azure Event Hubs connection string. Should be provided when want to provide the connection string directly.
|*spring.cloud.azure.eventhubs*.namespace
|Azure Event Hubs namespace. Should be provided when want to retrieve the connection information through Azure Resource Manager.
|*spring.cloud.azure.eventhubs*.resource.resource-group
|The resource group of Azure Event Hubs namespace. Should be provided when want to retrieve the connection information through Azure Resource Manager.
|*spring.cloud.azure*.profile.subscription-id
| The subscription id. Should be provided when want to retrieve the connection information through Azure Resource Manager.
|===
NOTE: Authentication information is also required for authenticating for Azure Resource Manager. The credential related configurations of Resource Manager should be configured under prefix `spring.cloud.azure`. Please refer to link:index.html#authentication[Authentication] for more details.

Просмотреть файл

@ -21,7 +21,7 @@ Adding below dependencies if you want to use the Spring Cloud Azure Redis suppor
=== Configuration
Below properties could be configured when using Redis support:
[cols="4*", options="header"]
|===
|Properties |Description |Default Value | Required
|*spring.cloud.azure.redis*.enabled |Azure Cache for Redis instance name.|true | No

Просмотреть файл

@ -28,7 +28,7 @@ Connect to Azure Storage using Spring Resource abstraction and Azure Storage lib
=== Configuration
This `spring-cloud-azure-starter-storage-blob` provides the following properties:
[cols="3*", options="header"]
|===
|Name | Default | Description
|spring.cloud.azure.storage.blob.enabled | `true` | Whether an Azure Blob Storage Service is enabled
@ -38,7 +38,7 @@ This `spring-cloud-azure-starter-storage-blob` provides the following properties
|===
This `spring-cloud-azure-starter-storage-file-share` provides the following properties:
[cols="3*", options="header"]
|===
|Name | Default | Description
|spring.cloud.azure.storage.fileshare.enabled | `true` | Whether Azure File Storage Service is enabled
@ -59,13 +59,13 @@ spring:
azure:
storage:
blob:
account-name: ${STORAGE-ACCOUNT-NAME}
account-key: ${STORAGE-ACCOUNT-PRIVATE-KEY}
endpoint: ${STORAGE-BLOB-ENDPOINT}
account-name: ${STORAGE_ACCOUNT_NAME}
account-key: ${STORAGE_ACCOUNT_PRIVATE_KEY}
endpoint: ${STORAGE_BLOB_ENDPOINT}
fileshare:
account-name: ${STORAGE-ACCOUNT-NAME}
account-key: ${STORAGE-ACCOUNT-PRIVATE-KEY}
endpoint: ${STORAGE-FILESHARE-ENDPOINT}
account-name: ${STORAGE_ACCOUNT_NAME}
account-key: ${STORAGE_ACCOUNT_PRIVATE_KEY}
endpoint: ${STORAGE_FILESHARE_ENDPOINT}
----

Просмотреть файл

@ -16,10 +16,9 @@ Construct `TokenCredential` by using various credential information, and then co
=== Configuration
This Spring Cloud Azure Resource Manager provides the following properties:
[cols="2*", options="header"]
|===
|Properties |Description
|*spring.cloud.azure.resource-manager*.enabled |Whether the Resource Manager is enabled. Default is true.
|*spring.cloud.azure.credential*.client-certificate-password |Password of the certificate file.
|*spring.cloud.azure.credential*.client-certificate-path |Path of a PEM certificate file to use when performing service principal authentication with Azure.

Просмотреть файл

@ -17,7 +17,7 @@ spring-cloud-azure-starter-keyvault-secrets adds Azure Key Vault as one of the S
=== Configuration
Configurable items
[cols="2*", options="header"]
|===
|Properties |Description
| spring.cloud.azure.keyvault.secret.endpoint | Key Vault uri
@ -30,7 +30,6 @@ Configurable items
| spring.cloud.azure.keyvault.secret.property-sources[].case-sensitive | Whether the secret name is case-sensitive
| spring.cloud.azure.keyvault.secret.property-sources[].secret-keys | The supported secret names. If not configured, it will retrieve all secret names.
| spring.cloud.azure.keyvault.secret.property-sources[].refresh-interval | Refresh interval
|
|===
=== Basic Usage
@ -52,7 +51,7 @@ spring:
keyvault:
secret:
property-source-enabled: true
endpoint: ${KEY_VAULT_ENDPOINT}
endpoint: ${AZURE_KEYVAULT_ENDPOINT}
----
===== Java code

Просмотреть файл

@ -182,12 +182,12 @@ To see the list of all Spring Cloud Azure related configuration properties pleas
this channel is open by default, you can handle the error message in this way:
----
// Replace destination with spring.cloud.stream.bindings.input.destination
// Replace group with spring.cloud.stream.bindings.input.group
@ServiceActivator(inputChannel == "{destination}.{group}.errors")
public void consumerError(Message<?> message) {
LOGGER.error("Handling customer ERROR: " + message);
}
// Replace destination with spring.cloud.stream.bindings.input.destination
// Replace group with spring.cloud.stream.bindings.input.group
@ServiceActivator(inputChannel == "{destination}.{group}.errors")
public void consumerError(Message<?> message) {
LOGGER.error("Handling customer ERROR: " + message);
}
----
*_producer error channel_*
@ -201,11 +201,11 @@ spring.cloud.stream.default.producer.errorChannelEnabled=true
you can handle the error message in this way:
----
// Replace destination with spring.cloud.stream.bindings.output.destination
@ServiceActivator(inputChannel == "{destination}.errors")
public void producerError(Message<?> message) {
LOGGER.error("Handling Producer ERROR: " + message);
}
// Replace destination with spring.cloud.stream.bindings.output.destination
@ServiceActivator(inputChannel == "{destination}.errors")
public void producerError(Message<?> message) {
LOGGER.error("Handling Producer ERROR: " + message);
}
----
===== Batch Consumer Sample
@ -221,8 +221,8 @@ spring:
stream:
bindings:
consume-in-0:
destination: {event-hub-name}
group: [consumer-group-name]
destination: ${AZURE_EVENTHUB_NAME}
group: ${AZURE_EVENTHUB_CONSUMER_GROUP}
consumer:
batch-mode: true
eventhubs:
@ -242,49 +242,49 @@ For checkpointing mode as BATCH, you can use below code to send messages and con
[source,java]
----
@Bean
public Consumer<List<String>> consume() {
return list -> list.forEach(event -> LOGGER.info("New event received: '{}'",event));
}
@Bean
public Supplier<Message<String>> supply() {
return () -> {
LOGGER.info("Sending message, sequence " + i);
return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
};
}
@Bean
public Consumer<List<String>> consume() {
return list -> list.forEach(event -> LOGGER.info("New event received: '{}'",event));
}
@Bean
public Supplier<Message<String>> supply() {
return () -> {
LOGGER.info("Sending message, sequence " + i);
return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
};
}
----
For checkpointing mode as MANUAL, you can use below code to send messages and consume/checkpoint in batches.
[source,java]
----
@Bean
public Consumer<Message<List<String>>> consume() {
return message -> {
for (int i == 0; i < message.getPayload().size(); i++) {
LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
message.getPayload().get(i),
((List<Object>) message.getHeaders().get(EventHubsHeaders.PARTITION_KEY)).get(i),
((List<Object>) message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER)).get(i),
((List<Object>) message.getHeaders().get(EventHubsHeaders.OFFSET)).get(i),
((List<Object>) message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)).get(i));
}
Checkpointer checkpointer == (Checkpointer) message.getHeaders().get(CHECKPOINTER);
checkpointer.success()
.doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
.doOnError(error -> LOGGER.error("Exception found", error))
.subscribe();
};
}
@Bean
public Supplier<Message<String>> supply() {
return () -> {
LOGGER.info("Sending message, sequence " + i);
return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
};
}
@Bean
public Consumer<Message<List<String>>> consume() {
return message -> {
for (int i == 0; i < message.getPayload().size(); i++) {
LOGGER.info("New message received: '{}', partition key: {}, sequence number: {}, offset: {}, enqueued time: {}",
message.getPayload().get(i),
((List<Object>) message.getHeaders().get(EventHubsHeaders.PARTITION_KEY)).get(i),
((List<Object>) message.getHeaders().get(EventHubsHeaders.SEQUENCE_NUMBER)).get(i),
((List<Object>) message.getHeaders().get(EventHubsHeaders.OFFSET)).get(i),
((List<Object>) message.getHeaders().get(EventHubsHeaders.ENQUEUED_TIME)).get(i));
}
Checkpointer checkpointer == (Checkpointer) message.getHeaders().get(CHECKPOINTER);
checkpointer.success()
.doOnSuccess(success -> LOGGER.info("Message '{}' successfully checkpointed", message.getPayload()))
.doOnError(error -> LOGGER.error("Exception found", error))
.subscribe();
};
}
@Bean
public Supplier<Message<String>> supply() {
return () -> {
LOGGER.info("Sending message, sequence " + i);
return MessageBuilder.withPayload("\"test"+ i++ +"\"").build();
};
}
----
=== Spring Cloud Stream Binder for Azure Service Bus

Просмотреть файл

@ -20,7 +20,7 @@ Connect to Cosmos DB using Spring Data and CosmosDB libraries.
=== Configuration
Below are some properties you can configure when using Spring Data CosmosDB Support:
[cols="3*", options="header"]
|===
|Name | Description | Default
|spring.cloud.azure.cosmos.enabled | Whether Azure Cosmos Service is enabled. | `true`
@ -49,9 +49,9 @@ spring:
cloud:
azure:
cosmos:
key: ${YOUR_COSMOS_KEY}
endpoint: ${YOUR_COSMOS_ENDPOINT}
database: ${YOUR_COSMOS_DATABASE}
key: ${AZURE_COSMOS_KEY}
endpoint: ${AZURE_COSMOS_ENDPOINT}
database: ${AZURE_COSMOS_DATABASE}
----
=== Samples

Просмотреть файл

@ -26,7 +26,13 @@ Provide Spring Integration support for these Azure services: Event Hubs, Service
This starter provides the following 2 parts of configuration options:
===== Azure Common Configuration Options
<<<<<<< HEAD
Below properties can also be configured with the default Spring Cloud Azure unified properties by changing the prefix from *spring.cloud.azure.eventhubs.* to *spring.cloud.azure.*.
=======
Below properties can also be configured with the default Spring Cloud Azure unified properties,
of which the prefix is changed from *spring.cloud.azure.eventhubs.* to *spring.cloud.azure.*.
[cols="3*", options="header"]
>>>>>>> 1ffefa6e (update properties)
|===
|Properties | Type |Description
@ -68,6 +74,7 @@ Below properties can also be configured with the default Spring Cloud Azure unif
===== Azure Event Hubs Client Configuration Options
Below options are used to configure Azure Event Hubs SDK Client.
[cols="3*", options="header"]
|===
|Properties | Type |Description
@ -97,7 +104,7 @@ spring:
cloud:
azure:
eventhubs:
connection-string: [servicebus-connection-string]
connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
----
* For credentials as MSI, configure below properties in `application.yml`:
@ -107,17 +114,17 @@ spring:
cloud:
azure:
credential:
managed-identity-client-id: [managed-identity-client-id]
managed-identity-client-id: ${AZURE_CLIENT_ID}
profile:
tenant-id: [tenant-id]
tenant-id: ${AZURE_TENANT_ID}
# Uncomment below configurations if you want to enable auto creating resources.
# subscription-id: [subscription-id]
# subscription-id: ${AZURE_SUBSCRIPTION_ID}
# cloud: Azure
# resource:
# region: [region]
eventhubs:
namespace: [servicebus-namespace]
namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
----
* For credentials as service principal, configure below properties in application.yml:
@ -127,59 +134,59 @@ spring:
cloud:
azure:
credential:
client-id: [client-id]
client-secret: [client-secret]
client-id: ${AZURE_CLIENT_ID}
client-secret: ${AZURE_CLIENT_SECRET}
profile:
tenant-id: [tenant-id]
tenant-id: ${AZURE_TENANT_ID}
# Uncomment below configurations if you want to enable auto creating resources.
# subscription-id: [subscription-id]
# subscription-id: ${AZURE_SUBSCRIPTION_ID}
# cloud: Azure
# resource:
# region: [region]
eventhubs:
namespace: [namespace]
namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
----
- Step 2. Create `DefaultMessageHandler` with the bean of `EventHubsTemplate` to send messages to Event Hubs.
[source,java]
----
private static final String OUTPUT_CHANNEL = "output";
private static final String EVENTHUB_NAME = "eh1";
private static final String OUTPUT_CHANNEL = "output";
private static final String EVENTHUB_NAME = "eh1";
@Bean
@ServiceActivator(inputChannel = OUTPUT_CHANNEL)
public MessageHandler messageSender(EventHubsTemplate queueOperation) {
DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, queueOperation);
handler.setSendCallback(new ListenableFutureCallback<Void>() {
@Override
public void onSuccess(Void result) {
LOGGER.info("Message was sent successfully.");
}
@Override
public void onFailure(Throwable ex) {
LOGGER.error("There was an error sending the message.", ex);
}
});
return handler;
}
@Bean
@ServiceActivator(inputChannel = OUTPUT_CHANNEL)
public MessageHandler messageSender(EventHubsTemplate queueOperation) {
DefaultMessageHandler handler = new DefaultMessageHandler(EVENTHUB_NAME, queueOperation);
handler.setSendCallback(new ListenableFutureCallback<Void>() {
@Override
public void onSuccess(Void result) {
LOGGER.info("Message was sent successfully.");
}
@Override
public void onFailure(Throwable ex) {
LOGGER.error("There was an error sending the message.", ex);
}
});
return handler;
}
----
- Step 3. Create a Message gateway binding with the message handler created in the last step via a message channel
[source,java]
----
@Autowired
EventHubOutboundGateway messagingGateway;
@Autowired
EventHubOutboundGateway messagingGateway;
@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
public interface EventHubOutboundGateway {
void send(String text);
}
@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
public interface EventHubOutboundGateway {
void send(String text);
}
----
- Step 4. Send messages using the gateway
[source,java]
----
this.messagingGateway.send(message);
this.messagingGateway.send(message);
----
===== Receive messages from Azure Event Hubs
@ -187,43 +194,43 @@ spring:
- Step 2. Create a bean of message channel as the input channel.
[source,java]
----
private static final String INPUT_CHANNEL = "input";
private static final String EVENTHUB_NAME = "eh1";
private static final String CONSUMER_GROUP = "$Default";
private static final String INPUT_CHANNEL = "input";
private static final String EVENTHUB_NAME = "eh1";
private static final String CONSUMER_GROUP = "$Default";
@Bean
public MessageChannel input() {
return new DirectChannel();
}
@Bean
public MessageChannel input() {
return new DirectChannel();
}
----
- Step 3. Create `EventHubsInboundChannelAdapter` with the bean of `EventHubsProcessorContainer` to receive messages to Event Hubs.
[source,java]
----
@Bean
public EventHubsInboundChannelAdapter messageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
EventHubsProcessorContainer processorContainer) {
CheckpointConfig config = new CheckpointConfig(CheckpointMode.MANUAL);
@Bean
public EventHubsInboundChannelAdapter messageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel,
EventHubsProcessorContainer processorContainer) {
CheckpointConfig config = new CheckpointConfig(CheckpointMode.MANUAL);
EventHubsInboundChannelAdapter adapter =
new EventHubsInboundChannelAdapter(processorContainer, EVENTHUB_NAME,
CONSUMER_GROUP, config);
adapter.setOutputChannel(inputChannel);
return adapter;
}
EventHubsInboundChannelAdapter adapter =
new EventHubsInboundChannelAdapter(processorContainer, EVENTHUB_NAME,
CONSUMER_GROUP, config);
adapter.setOutputChannel(inputChannel);
return adapter;
}
----
- Step 4. Create a message receiver binding with EventHubsInboundChannelAdapter created in the last step via the message channel we created before.
[source,java]
----
@ServiceActivator(inputChannel = INPUT_CHANNEL)
public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
String message = new String(payload);
LOGGER.info("New message received: '{}'", message);
checkpointer.success()
.doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
.doOnError(e -> LOGGER.error("Error found", e))
.subscribe();
}
@ServiceActivator(inputChannel = INPUT_CHANNEL)
public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
String message = new String(payload);
LOGGER.info("New message received: '{}'", message);
checkpointer.success()
.doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
.doOnError(e -> LOGGER.error("Error found", e))
.subscribe();
}
----
==== Samples
@ -310,7 +317,7 @@ spring:
cloud:
azure:
servicebus:
connection-string: [servicebus-connection-string]
connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
----
- For credentials as MSI, configure below properties in application.yml:
@ -320,17 +327,17 @@ spring:
cloud:
azure:
credential:
managed-identity-client-id: [managed-identity-client-id]
managed-identity-client-id: ${AZURE_CLIENT_ID}
profile:
tenant-id: [tenant-id]
tenant-id: ${AZURE_TENANT_ID}
# Uncomment below configurations if you want to enable auto creating resources.
# subscription-id: [subscription-id]
# subscription-id: ${AZURE_SUBSCRIPTION_ID}
# cloud: Azure
# resource:
# region: [region]
servicebus:
namespace: [servicebus-namespace]
namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
----
- For credentials as service principal, configure below properties in application.yml:
@ -340,64 +347,63 @@ spring:
cloud:
azure:
credential:
client-id: [client-id]
client-secret: [client-secret]
client-id: ${AZURE_CLIENT_ID}
client-secret: ${AZURE_CLIENT_SECRET}
profile:
tenant-id: [tenant-id]
tenant-id: ${AZURE_TENANT_ID}
# Uncomment below configurations if you want to enable auto creating resources.
# subscription-id: [subscription-id]
# subscription-id: ${AZURE_SUBSCRIPTION_ID}
# cloud: Azure
# resource:
# region: [region]
servicebus:
namespace: [namespace]
namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
----
* Step 2. Create `DefaultMessageHandler` with the bean of `ServiceBusTemplate` to send messages to Service Bus,
set the entity type for the ServiceBusTemplate.
[source,java]
----
private static final String OUTPUT_CHANNEL = "queue.output";
private static final String OUTPUT_CHANNEL = "queue.output";
@Bean
@ServiceActivator(inputChannel = OUTPUT_CHANNEL)
public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) {
serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE);
DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate);
handler.setSendCallback(new ListenableFutureCallback<Void>() {
@Override
public void onSuccess(Void result) {
LOGGER.info("Message was sent successfully.");
}
@Bean
@ServiceActivator(inputChannel = OUTPUT_CHANNEL)
public MessageHandler queueMessageSender(ServiceBusTemplate serviceBusTemplate) {
serviceBusTemplate.setDefaultEntityType(ServiceBusEntityType.QUEUE);
DefaultMessageHandler handler = new DefaultMessageHandler(QUEUE_NAME, serviceBusTemplate);
handler.setSendCallback(new ListenableFutureCallback<Void>() {
@Override
public void onSuccess(Void result) {
LOGGER.info("Message was sent successfully.");
}
@Override
public void onFailure(Throwable ex) {
LOGGER.info("There was an error sending the message.");
}
});
@Override
public void onFailure(Throwable ex) {
LOGGER.info("There was an error sending the message.");
}
});
return handler;
}
return handler;
}
----
* Step 3. Create a Message gateway binding with the message handler created in the last stop via a message channel
[source,java]
----
@Autowired
QueueOutboundGateway messagingGateway;
@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
public interface QueueOutboundGateway {
void send(String text);
}
@Autowired
QueueOutboundGateway messagingGateway;
@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
public interface QueueOutboundGateway {
void send(String text);
}
----
* Step 4. Send messages using the gateway
[source,java]
----
this.messagingGateway.send(message);
this.messagingGateway.send(message);
----
===== Receive messages from Azure Service Bus
@ -405,39 +411,39 @@ set the entity type for the ServiceBusTemplate.
* Step 2. Create a bean of message channel as the input channel.
[source,java]
----
private static final String INPUT_CHANNEL = "input";
private static final String INPUT_CHANNEL = "input";
@Bean
public MessageChannel input() {
return new DirectChannel();
}
@Bean
public MessageChannel input() {
return new DirectChannel();
}
----
* Step 3. Create `ServiceBusInboundChannelAdapter` with the bean of `ServiceBusProcessorContainer` to receive messages to Service Bus.
[source,java]
----
private static final String QUEUE_NAME = "queue1";
private static final String QUEUE_NAME = "queue1";
@Bean
public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, ServiceBusProcessorContainer processorContainer) {
ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(processorContainer, QUEUE_NAME,
new CheckpointConfig(CheckpointMode.MANUAL));
adapter.setOutputChannel(inputChannel);
return adapter;
}
@Bean
public ServiceBusInboundChannelAdapter queueMessageChannelAdapter(
@Qualifier(INPUT_CHANNEL) MessageChannel inputChannel, ServiceBusProcessorContainer processorContainer) {
ServiceBusInboundChannelAdapter adapter = new ServiceBusInboundChannelAdapter(processorContainer, QUEUE_NAME,
new CheckpointConfig(CheckpointMode.MANUAL));
adapter.setOutputChannel(inputChannel);
return adapter;
}
----
* Step 4. Create a message receiver binding with ServiceBusInboundChannelAdapter created in the last step via the message channel we created before.
[source,java]
----
@ServiceActivator(inputChannel = INPUT_CHANNEL)
public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
String message = new String(payload);
LOGGER.info("New message received: '{}'", message);
checkpointer.success()
.doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
.doOnError(e -> LOGGER.error("Error found", e))
.subscribe();
}
@ServiceActivator(inputChannel = INPUT_CHANNEL)
public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
String message = new String(payload);
LOGGER.info("New message received: '{}'", message);
checkpointer.success()
.doOnSuccess(s -> LOGGER.info("Message '{}' successfully checkpointed", message))
.doOnError(e -> LOGGER.error("Error found", e))
.subscribe();
}
----
==== Samples
@ -590,7 +596,7 @@ spring:
azure:
storage:
queue:
connection-string: [servicebus-connection-string]
connection-string: ${AZURE_SERVICE_BUS_CONNECTION_STRING}
----
- For credentials as MSI, configure below properties in application.yml:
@ -600,18 +606,18 @@ spring:
cloud:
azure:
credential:
managed-identity-client-id: [managed-identity-client-id]
managed-identity-client-id: ${AZURE_CLIENT_ID}
profile:
tenant-id: [tenant-id]
tenant-id: ${AZURE_TENANT_ID}
# Uncomment below configurations if you want to enable auto creating resources.
# subscription-id: [subscription-id]
# subscription-id: ${AZURE_SUBSCRIPTION_ID}
# cloud: Azure
# resource:
# region: [region]
storage:
queue:
namespace: [servicebus-namespace]
namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
----
- For credentials as service principal, configure below properties in application.yml:
@ -621,63 +627,62 @@ spring:
cloud:
azure:
credential:
client-id: [client-id]
client-secret: [client-secret]
client-id: ${AZURE_CLIENT_ID}
client-secret: ${AZURE_CLIENT_SECRET}
profile:
tenant-id: [tenant-id]
tenant-id: ${AZURE_TENANT_ID}
# Uncomment below configurations if you want to enable auto creating resources.
# subscription-id: [subscription-id]
# subscription-id: ${AZURE_SUBSCRIPTION_ID}
# cloud: Azure
# resource:
# region: [region]
storage:
queue:
namespace: [servicebus-namespace]
namespace: ${AZURE_SERVICE_BUS_NAMESPACE}
----
* Step 2. Create `DefaultMessageHandler` with the bean of `StorageQueueOperation` to send messages to Storage Queue.
[source,java]
----
private static final String STORAGE_QUEUE_NAME = "example";
private static final String OUTPUT_CHANNEL = "output";
private static final String STORAGE_QUEUE_NAME = "example";
private static final String OUTPUT_CHANNEL = "output";
@Bean
@ServiceActivator(inputChannel = OUTPUT_CHANNEL)
public MessageHandler messageSender(StorageQueueOperation storageQueueOperation) {
DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueOperation);
handler.setSendCallback(new ListenableFutureCallback<Void>() {
@Override
public void onSuccess(Void result) {
LOGGER.info("Message was sent successfully.");
}
@Bean
@ServiceActivator(inputChannel = OUTPUT_CHANNEL)
public MessageHandler messageSender(StorageQueueOperation storageQueueOperation) {
DefaultMessageHandler handler = new DefaultMessageHandler(STORAGE_QUEUE_NAME, storageQueueOperation);
handler.setSendCallback(new ListenableFutureCallback<Void>() {
@Override
public void onSuccess(Void result) {
LOGGER.info("Message was sent successfully.");
}
@Override
public void onFailure(Throwable ex) {
LOGGER.info("There was an error sending the message.");
}
});
return handler;
}
@Override
public void onFailure(Throwable ex) {
LOGGER.info("There was an error sending the message.");
}
});
return handler;
}
----
* Step 3. Create a Message gateway binding with the message handler created in the last stop via a message channel
[source,java]
----
@Autowired
StorageQueueOutboundGateway storageQueueOutboundGateway;
@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
public interface StorageQueueOutboundGateway {
void send(String text);
}
@Autowired
StorageQueueOutboundGateway storageQueueOutboundGateway;
@MessagingGateway(defaultRequestChannel = OUTPUT_CHANNEL)
public interface StorageQueueOutboundGateway {
void send(String text);
}
----
* Step 4. Send messages using the gateway
[source,java]
----
this.storageQueueOutboundGateway.send(message);
this.storageQueueOutboundGateway.send(message);
----
===== Receive messages from Azure Storage Queue
@ -685,39 +690,39 @@ spring:
* Step 2. Create a bean of message channel as the input channel.
[source,java]
----
private static final String INPUT_CHANNEL = "input";
private static final String INPUT_CHANNEL = "input";
@Bean
public MessageChannel input() {
return new DirectChannel();
}
@Bean
public MessageChannel input() {
return new DirectChannel();
}
----
* Step 3. Create `StorageQueueMessageSource` with the bean of `StorageQueueOperation` to receive messages to Storage Queue.
[source,java]
----
private static final String STORAGE_QUEUE_NAME = "example";
private static final String STORAGE_QUEUE_NAME = "example";
@Bean
@InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000"))
public StorageQueueMessageSource storageQueueMessageSource(StorageQueueOperation storageQueueOperation) {
storageQueueOperation.setCheckpointMode(CheckpointMode.MANUAL);
storageQueueOperation.setVisibilityTimeoutInSeconds(10);
@Bean
@InboundChannelAdapter(channel = INPUT_CHANNEL, poller = @Poller(fixedDelay = "1000"))
public StorageQueueMessageSource storageQueueMessageSource(StorageQueueOperation storageQueueOperation) {
storageQueueOperation.setCheckpointMode(CheckpointMode.MANUAL);
storageQueueOperation.setVisibilityTimeoutInSeconds(10);
return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueOperation);
}
return new StorageQueueMessageSource(STORAGE_QUEUE_NAME, storageQueueOperation);
}
----
* Step 4. Create a message receiver binding with StorageQueueMessageSource created in the last step via the message channel we created before.
[source,java]
----
@ServiceActivator(inputChannel = INPUT_CHANNEL)
public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
String message = new String(payload);
LOGGER.info("New message received: '{}'", message);
checkpointer.success()
.doOnError(Throwable::printStackTrace)
.doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message))
.subscribe();
}
@ServiceActivator(inputChannel = INPUT_CHANNEL)
public void messageReceiver(byte[] payload, @Header(AzureHeaders.CHECKPOINTER) Checkpointer checkpointer) {
String message = new String(payload);
LOGGER.info("New message received: '{}'", message);
checkpointer.success()
.doOnError(Throwable::printStackTrace)
.doOnSuccess(t -> LOGGER.info("Message '{}' successfully checkpointed", message))
.subscribe();
}
----
==== Samples

Просмотреть файл

@ -15,10 +15,9 @@
==== Configuration
This starter provides the following properties:
[cols="2*", options="header"]
|===
|Properties |Description
|*spring.cloud.azure.active-directory*.app-id-uri |It used in resource server, used to validate the audience in access_token. access_token is valid only when the audience in access_token equal to client-id or app-id-uri
|*spring.cloud.azure.active-directory*.authorization-clients |A map configure the resource APIs the application is going to visit. Each item corresponding to one resource API the application is going to visit. In Spring code, each item corresponding to one OAuth2AuthorizedClient object
|*spring.cloud.azure.active-directory*.authorization-clients.${AZURE_CLIENT_NAME}.scopes |API permissions of a resource server that the application is going to acquire.
@ -44,10 +43,9 @@ Here are some examples about how to use these properties:
===== Property example 1: Application type
This property(`spring.cloud.azure.active-directory.application-type`) is optional, its value can be inferred by dependencies, only `web_application_and_resource_server` must be configured manually: `spring.cloud.azure.active-directory.application-type=web_application_and_resource_server`.
[cols="4*", options="header"]
|===
|Has dependency: spring-security-oauth2-client |Has dependency: spring-security-oauth2-resource-server |Valid values of application type |Default value
|Yes |No |`web_application` |`web_application`
|No |Yes |`resource_server` |`resource_server`
|Yes |Yes |`web_application`,`resource_server`,`resource_server_with_obo`, `web_application_and_resource_server` |`resource_server_with_obo`
@ -354,10 +352,10 @@ spring:
cloud:
azure:
active-directory:
tenant-id: <Tenant-id-registered-by-application>
client-id: <Web-API-A-client-id>
client-secret: <Web-API-A-client-secret>
app-id-uri: <Web-API-A-app-id-url>
tenant-id: ${AZURE_TENANT_ID}
client-id: ${AZURE_CLIENT_ID}
client-secret: ${AZURE_CLIENT_SECRET}
app-id-uri: ${WEB_API_ID_URI}
authorization-clients:
graph:
scopes:
@ -414,10 +412,10 @@ spring:
cloud:
azure:
active-directory:
tenant-id: <Tenant-id-registered-by-application>
client-id: <Web-API-C-client-id>
client-secret: <Web-API-C-client-secret>
app-id-uri: <Web-API-C-app-id-url>
tenant-id: ${AZURE_TENANT_ID}
client-id: ${AZURE_CLIENT_ID}
client-secret: ${AZURE_CLIENT_SECRET}
app-id-uri: ${WEB_API_ID_URI}
application-type: web_application_and_resource_server # This is required.
authorization-clients:
graph: