Refactor EventHubClientFactory to return functional creator

This commit is contained in:
Warren Zhu 2018-07-02 13:38:10 +08:00
Родитель e2538d16c4
Коммит ade99e09c4
6 изменённых файлов: 116 добавлений и 200 удалений

Просмотреть файл

@ -12,11 +12,23 @@ public class Tuple<T, U> {
private final T first;
private final U second;
public Tuple(T first, U second) {
private Tuple(T first, U second) {
this.first = first;
this.second = second;
}
public static <T, U> Tuple<T, U> of(T first, U second) {
return new Tuple(first, second);
}
public T getFirst() {
return first;
}
public U getSecond() {
return second;
}
@Override
public boolean equals(Object o) {
if (this == o) {

Просмотреть файл

@ -13,8 +13,10 @@ import com.microsoft.azure.eventhubs.PartitionSender;
import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
import com.microsoft.azure.management.eventhub.AuthorizationRule;
import com.microsoft.azure.management.eventhub.EventHubAuthorizationKey;
import com.microsoft.azure.management.eventhub.EventHubNamespace;
import com.microsoft.azure.spring.cloud.context.core.AzureAdmin;
import com.microsoft.azure.spring.cloud.context.core.AzureUtil;
import com.microsoft.azure.spring.integration.core.Memoizer;
import com.microsoft.azure.spring.integration.core.Tuple;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -22,10 +24,11 @@ import org.springframework.beans.factory.DisposableBean;
import org.springframework.util.Assert;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.stream.Stream;
import java.util.function.Function;
/**
* Default implementation of {@link EventHubClientFactory}.
@ -34,22 +37,17 @@ import java.util.stream.Stream;
*/
public class DefaultEventHubClientFactory implements EventHubClientFactory, DisposableBean {
private static final Log LOGGER = LogFactory.getLog(DefaultEventHubClientFactory.class);
// eventHubName -> eventHubClient
private final ConcurrentHashMap<String, EventHubClient> clientMap = new ConcurrentHashMap<>();
// eventHubName -> connectionString
private final ConcurrentHashMap<String, String> connectionStringMap = new ConcurrentHashMap<>();
private final Map<String, EventHubClient> clientsByName = new ConcurrentHashMap<>();
// (eventHubName, partitionId) -> partitionSender
private final ConcurrentHashMap<Tuple<String, String>, PartitionSender> partitionSenderMap =
new ConcurrentHashMap<>();
// (eventHubClient, partitionId) -> partitionSender
private final Map<Tuple<EventHubClient, String>, PartitionSender> partitionSenderMap = new ConcurrentHashMap<>();
// (eventHubName, consumerGroup) -> eventProcessorHost
private final ConcurrentHashMap<Tuple<String, String>, EventProcessorHost> processorHostMap =
new ConcurrentHashMap<>();
private final Map<Tuple<String, String>, EventProcessorHost> processorHostMap = new ConcurrentHashMap<>();
private final AzureAdmin azureAdmin;
private final String namespace;
private final EventHubNamespace namespace;
private String checkpointStorageAccountContainer;
private String checkpointStorageConnectionString;
@ -57,7 +55,7 @@ public class DefaultEventHubClientFactory implements EventHubClientFactory, Disp
Assert.notNull(azureAdmin, "azureAdmin can't be null.");
Assert.hasText(namespace, "namespace can't be null or empty");
this.azureAdmin = azureAdmin;
this.namespace = namespace;
this.namespace = azureAdmin.getOrCreateEventHubNamespace(namespace);
}
public void initCheckpointConnectionString(String checkpointStorageAccount) {
@ -71,62 +69,72 @@ public class DefaultEventHubClientFactory implements EventHubClientFactory, Disp
this.checkpointStorageAccountContainer = checkpointStorageAccountContainer;
}
@Override
public EventHubClient getOrCreateEventHubClient(String eventHubName) {
return this.clientMap.computeIfAbsent(eventHubName, key -> {
try {
return EventHubClient
.createSync(getOrCreateConnectionString(eventHubName), Executors.newSingleThreadExecutor());
} catch (EventHubException | IOException e) {
throw new EventHubRuntimeException("Error when creating event hub client", e);
}
});
@Override
public Function<String, EventHubClient> getEventHubClientCreator() {
return Memoizer.memoize(clientsByName, this::createEventHubClient);
}
@Override
public PartitionSender getOrCreatePartitionSender(String eventHubName, String partitionId) {
return this.partitionSenderMap.computeIfAbsent(new Tuple(eventHubName, partitionId), key -> {
try {
return getOrCreateEventHubClient(eventHubName).createPartitionSenderSync(partitionId);
} catch (EventHubException e) {
throw new EventHubRuntimeException("Error when creating event hub partition sender", e);
}
});
public Function<Tuple<EventHubClient, String>, PartitionSender> getPartitionSenderCreator() {
return Memoizer.memoize(partitionSenderMap, this::createPartitionSender);
}
@Override
public EventProcessorHost getOrCreateEventProcessorHost(String eventHubName, String consumerGroup) {
return this.processorHostMap.computeIfAbsent(new Tuple(eventHubName, consumerGroup),
key -> new EventProcessorHost(EventProcessorHost.createHostName("hostNamePrefix"), eventHubName,
consumerGroup, getOrCreateConnectionString(eventHubName), checkpointStorageConnectionString,
checkpointStorageAccountContainer));
public Function<Tuple<String, String>, EventProcessorHost> getProcessorHostCreator() {
return Memoizer.memoize(processorHostMap, this::createEventProcessorHost);
}
private String getOrCreateConnectionString(String eventHubName) {
private EventHubClient createEventHubClient(String eventHubName) {
try {
return EventHubClient
.createSync(connectionStringCreator().apply(eventHubName), Executors.newSingleThreadExecutor());
} catch (EventHubException | IOException e) {
throw new EventHubRuntimeException("Error when creating event hub client", e);
}
}
return this.connectionStringMap.computeIfAbsent(eventHubName,
name -> azureAdmin.getOrCreateEventHubNamespace(namespace).listAuthorizationRules().stream().findFirst()
.map(AuthorizationRule::getKeys)
.map(EventHubAuthorizationKey::primaryConnectionString)
.map(s -> new ConnectionStringBuilder(s).setEventHubName(name).toString())
.orElseThrow(() -> new EventHubRuntimeException(
String.format("Failed to fetch connection string of '%s'", eventHubName),
null)));
private PartitionSender createPartitionSender(Tuple<EventHubClient, String> clientAndPartitionId) {
try {
return clientAndPartitionId.getFirst().createPartitionSenderSync(clientAndPartitionId.getSecond());
} catch (EventHubException e) {
throw new EventHubRuntimeException("Error when creating event hub partition sender", e);
}
}
private EventProcessorHost createEventProcessorHost(Tuple<String, String> nameAndConsumerGroup) {
return new EventProcessorHost(EventProcessorHost.createHostName("hostNamePrefix"),
nameAndConsumerGroup.getFirst(), nameAndConsumerGroup.getSecond(),
connectionStringCreator().apply(nameAndConsumerGroup.getFirst()), checkpointStorageConnectionString,
checkpointStorageAccountContainer);
}
private Function<String, String> connectionStringCreator() {
return Memoizer.memoize(this::getConnectionString);
}
private String getConnectionString(String eventHubName) {
return namespace.listAuthorizationRules().stream().findFirst().map(AuthorizationRule::getKeys)
.map(EventHubAuthorizationKey::primaryConnectionString)
.map(s -> new ConnectionStringBuilder(s).setEventHubName(eventHubName).toString()).orElseThrow(
() -> new RuntimeException(
String.format("Failed to fetch connection string of '%s'", eventHubName), null));
}
private <K, V> void close(Map<K, V> map, Function<V, CompletableFuture<Void>> close) {
CompletableFuture.allOf(map.values().stream().map(close).toArray(CompletableFuture[]::new))
.exceptionally((ex) -> {
LOGGER.warn("Failed to clean event hub client factory", ex);
return null;
});
}
@Override
public void destroy() throws Exception {
Stream<CompletableFuture<Void>> closeClientFutures = clientMap.values().stream().map(EventHubClient::close);
Stream<CompletableFuture<Void>> closeSenderFutures =
partitionSenderMap.values().stream().map(PartitionSender::close);
Stream<CompletableFuture<Void>> closeProcessorFutures =
processorHostMap.values().stream().map(EventProcessorHost::unregisterEventProcessor);
CompletableFuture.allOf(Stream.of(closeClientFutures, closeSenderFutures, closeProcessorFutures)
.toArray(CompletableFuture[]::new)).exceptionally((ex) -> {
LOGGER.warn("Failed to clean event hub client factory", ex);
return null;
});
close(clientsByName, EventHubClient::close);
close(partitionSenderMap, PartitionSender::close);
close(processorHostMap, EventProcessorHost::unregisterEventProcessor);
}
}

Просмотреть файл

@ -9,15 +9,28 @@ package com.microsoft.azure.spring.integration.eventhub;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.PartitionSender;
import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
import com.microsoft.azure.spring.integration.core.Tuple;
import java.util.function.Function;
/**
* @author Warren Zhu
*/
public interface EventHubClientFactory {
EventHubClient getOrCreateEventHubClient(String eventHubName);
/**
* Return a function which accepts event hub name, then returns {@link EventHubClient}
*/
Function<String, EventHubClient> getEventHubClientCreator();
PartitionSender getOrCreatePartitionSender(String eventHubName, String partitionId);
/**
* Return a function which accepts {@link EventHubClient} and partition id, then returns {@link PartitionSender}
*/
Function<Tuple<EventHubClient, String>, PartitionSender> getPartitionSenderCreator();
/**
* Return a function which accepts event hub name and consumer group, then returns {@link EventProcessorHost}
*/
Function<Tuple<String, String>, EventProcessorHost> getProcessorHostCreator();
EventProcessorHost getOrCreateEventProcessorHost(String eventHubName, String consumerGroup);
}

Просмотреть файл

@ -31,7 +31,7 @@ import java.util.function.Consumer;
* Default implementation of {@link EventHubOperation}.
*
* <p>
* The main eventhub component for sending to and consuming from event hub
* The main event hub component for sending to and consuming from event hub
*
* @author Warren Zhu
*/
@ -57,12 +57,13 @@ public class EventHubTemplate implements EventHubOperation {
Assert.hasText(eventHubName, "eventHubName can't be null or empty");
Assert.notNull(message, "message can't be null");
try {
EventHubClient client = this.clientFactory.getOrCreateEventHubClient(eventHubName);
EventHubClient client = this.clientFactory.getEventHubClientCreator().apply(eventHubName);
if (partitionSupplier == null) {
return client.send(message);
} else if (!Strings.isNullOrEmpty(partitionSupplier.getPartitionId())) {
return this.clientFactory.getOrCreatePartitionSender(eventHubName, partitionSupplier.getPartitionId())
return this.clientFactory.getPartitionSenderCreator().apply(Tuple.of(client, partitionSupplier
.getPartitionId()))
.send(message);
} else if (!Strings.isNullOrEmpty(partitionSupplier.getPartitionKey())) {
return client.send(message, partitionSupplier.getPartitionKey());
@ -79,13 +80,13 @@ public class EventHubTemplate implements EventHubOperation {
@Override
public Checkpointer<EventData> getCheckpointer(String destination, String consumerGroup) {
return checkpointersByNameAndConsumerGroup.get(new Tuple<>(destination, consumerGroup));
return checkpointersByNameAndConsumerGroup.get(Tuple.of(destination, consumerGroup));
}
@Override
public synchronized boolean subscribe(String destination, Consumer<Iterable<EventData>> consumer,
String consumerGroup) {
Tuple<String, String> nameAndConsumerGroup = new Tuple<>(destination, consumerGroup);
Tuple<String, String> nameAndConsumerGroup = Tuple.of(destination, consumerGroup);
consumersByNameAndConsumerGroup.putIfAbsent(nameAndConsumerGroup, new CopyOnWriteArraySet<>());
boolean added = consumersByNameAndConsumerGroup.get(nameAndConsumerGroup).add(consumer);
@ -94,14 +95,15 @@ public class EventHubTemplate implements EventHubOperation {
}
processorHostsByNameAndConsumerGroup.computeIfAbsent(nameAndConsumerGroup, key -> {
EventProcessorHost host = this.clientFactory.getOrCreateEventProcessorHost(destination, consumerGroup);
EventProcessorHost host = this.clientFactory.getProcessorHostCreator().apply(Tuple.of(destination,
consumerGroup));
host.registerEventProcessorFactory(context -> new IEventProcessor() {
@Override
public void onOpen(PartitionContext context) throws Exception {
LOGGER.info(String.format("Partition %s is opening", context.getPartitionId()));
checkpointersByNameAndConsumerGroup.putIfAbsent(nameAndConsumerGroup, new EventHubCheckpointer());
checkpointersByNameAndConsumerGroup.get(new Tuple<>(destination, consumerGroup))
checkpointersByNameAndConsumerGroup.get(Tuple.of(destination, consumerGroup))
.addPartitionContext(context);
}
@ -109,7 +111,7 @@ public class EventHubTemplate implements EventHubOperation {
public void onClose(PartitionContext context, CloseReason reason) throws Exception {
LOGGER.info(
String.format("Partition %s is closing for reason %s", context.getPartitionId(), reason));
checkpointersByNameAndConsumerGroup.get(new Tuple<>(destination, consumerGroup))
checkpointersByNameAndConsumerGroup.get(Tuple.of(destination, consumerGroup))
.removePartitionContext(context);
}
@ -132,7 +134,7 @@ public class EventHubTemplate implements EventHubOperation {
@Override
public synchronized boolean unsubscribe(String destination, Consumer<Iterable<EventData>> consumer,
String consumerGroup) {
Tuple<String, String> nameAndConsumerGroup = new Tuple<>(destination, consumerGroup);
Tuple<String, String> nameAndConsumerGroup = Tuple.of(destination, consumerGroup);
boolean existed = consumersByNameAndConsumerGroup.get(nameAndConsumerGroup).remove(consumer);
if (consumersByNameAndConsumerGroup.get(nameAndConsumerGroup).isEmpty()) {
processorHostsByNameAndConsumerGroup.remove(nameAndConsumerGroup).unregisterEventProcessor();

Просмотреть файл

@ -1,119 +0,0 @@
/*
* Copyright (c) Microsoft Corporation. All rights reserved.
* Licensed under the MIT License. See LICENSE in the project root for
* license information.
*/
package com.microsoft.azure.spring.integration.core;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.PartitionSender;
import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
import com.microsoft.azure.management.eventhub.EventHub;
import com.microsoft.azure.management.eventhub.EventHubNamespace;
import com.microsoft.azure.spring.cloud.context.core.AzureAdmin;
import com.microsoft.azure.spring.integration.eventhub.DefaultEventHubClientFactory;
import com.microsoft.azure.spring.integration.eventhub.EventHubClientFactory;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import java.util.concurrent.Executor;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.when;
import static org.powermock.api.mockito.PowerMockito.whenNew;
@RunWith(PowerMockRunner.class)
@PrepareForTest(EventHubClient.class)
public class DefaultEventHubClientFactoryTest {
@Mock
private AzureAdmin azureAdmin;
@Mock
private EventHubNamespace eventHubNamespace;
@Mock
private EventHubClient eventHubClient;
@Mock
private PartitionSender partitionSender;
@Mock
private EventProcessorHost processorHost;
@Mock
private EventHub eventHub;
private EventHubClientFactory factory;
private String namespace = "namespace";
private String eventHubName = "eventHub";
private String consumerGroup = "group";
private String partitionId = "1";
@Before
public void setUp() throws Exception {
this.factory = new DefaultEventHubClientFactory(azureAdmin, namespace);
when(azureAdmin.getOrCreateEventHubNamespace(eq(namespace))).thenReturn(eventHubNamespace);
when(azureAdmin.getOrCreateEventHub(eq(namespace), eq(eventHubName))).thenReturn(eventHub);
when(eventHubClient.getEventHubName()).thenReturn(eventHubName);
when(partitionSender.getPartitionId()).thenReturn(partitionId);
PowerMockito.mockStatic(EventHubClient.class);
when(EventHubClient.createSync(isA(String.class), isA(Executor.class))).thenReturn(eventHubClient);
when(eventHubClient.createPartitionSenderSync(eq(partitionId))).thenReturn(partitionSender);
whenNew(EventProcessorHost.class).withAnyArguments().thenReturn(processorHost);
}
@Test
@Ignore
public void testGetClient() {
EventHubClient client = factory.getOrCreateEventHubClient(eventHubName);
assertEquals(eventHubName, client.getEventHubName());
EventHubClient same = factory.getOrCreateEventHubClient(eventHubName);
assertEquals(same, client);
}
@Test
@Ignore
public void testGetSender() {
PartitionSender sender = factory.getOrCreatePartitionSender(eventHubName, partitionId);
assertEquals(partitionId, sender.getPartitionId());
PartitionSender same = factory.getOrCreatePartitionSender(eventHubName, partitionId);
assertEquals(same, sender);
}
@Test
@Ignore
public void testGetEventProcessorHost() {
EventProcessorHost host = factory.getOrCreateEventProcessorHost(eventHubName, consumerGroup);
EventProcessorHost same = factory.getOrCreateEventProcessorHost(eventHubName, consumerGroup);
assertEquals(host, same);
}
@Test(expected = IllegalArgumentException.class)
public void testNewDefaultPublisherFactory_nullProjectIdProvider() {
new DefaultEventHubClientFactory(null,
namespace);
}
@Test(expected = IllegalArgumentException.class)
public void testNewDefaultPublisherFactory_nullProjectId() {
new DefaultEventHubClientFactory(azureAdmin, null);
}
}

Просмотреть файл

@ -58,17 +58,15 @@ public class EventHubTemplateTest {
public void setUp() {
this.eventHubTemplate = new EventHubTemplate(this.mockClientFactory);
when(this.mockClientFactory.getOrCreateEventHubClient(eventHubName)).thenReturn(this.mockClient);
when(this.mockClientFactory.getEventHubClientCreator()).thenReturn(s -> this.mockClient);
when(this.mockClient.send(isA(EventData.class))).thenReturn(this.future);
when(this.mockClient.send(isA(EventData.class), eq(partitionKey))).thenReturn(this.future);
when(this.mockClientFactory.getOrCreatePartitionSender(eq(eventHubName), isA(String.class)))
.thenReturn(this.mockSender);
when(this.mockClientFactory.getPartitionSenderCreator()).thenReturn(t -> this.mockSender);
when(this.mockSender.send(isA(EventData.class))).thenReturn(this.future);
when(this.mockClientFactory.getOrCreateEventProcessorHost(eq(eventHubName), eq(consumerGroup))).thenReturn
(this.host);
when(this.host.registerEventProcessorFactory(isA(IEventProcessorFactory.class
))).thenReturn(new CompletableFuture<>());
when(this.mockClientFactory.getProcessorHostCreator()).thenReturn(t -> this.host);
when(this.host.registerEventProcessorFactory(isA(IEventProcessorFactory.class)))
.thenReturn(new CompletableFuture<>());
}
@Test
@ -99,7 +97,7 @@ public class EventHubTemplateTest {
assertEquals(null, future.get());
verify(this.mockSender, times(1)).send(isA(EventData.class));
verify(this.mockClientFactory, times(1)).getOrCreatePartitionSender(eq(eventHubName), eq(partitionId));
verify(this.mockClientFactory, times(1)).getPartitionSenderCreator();
}
@Test
@ -111,13 +109,14 @@ public class EventHubTemplateTest {
assertEquals(null, future.get());
verify(this.mockClient, times(1)).send(isA(EventData.class), eq(partitionKey));
verify(this.mockClientFactory, times(1)).getOrCreateEventHubClient(eq(eventHubName));
verify(this.mockClientFactory, times(1)).getEventHubClientCreator();
}
@Test(expected = EventHubRuntimeException.class)
public void testSendCreateSenderFailure() throws Throwable {
when(this.mockClientFactory.getOrCreateEventHubClient(eventHubName))
.thenThrow(new EventHubRuntimeException("couldn't create the event hub client."));
when(this.mockClientFactory.getEventHubClientCreator()).thenReturn((s) -> {
throw new EventHubRuntimeException("couldn't create the event hub client.");
});
try {
this.eventHubTemplate.sendAsync(eventHubName, this.message, null).get();
@ -145,8 +144,9 @@ public class EventHubTemplateTest {
public void testSubscribe() {
this.eventHubTemplate.subscribe(eventHubName, this::handleMessage, consumerGroup);
verify(this.mockClientFactory, times(1)).getOrCreateEventProcessorHost(eq(eventHubName), eq(consumerGroup));
verify(this.mockClientFactory, times(1)).getProcessorHostCreator();
}
private void handleMessage(Iterable<EventData> events){}
private void handleMessage(Iterable<EventData> events) {
}
}