This commit is contained in:
tracyboehrer 2021-03-24 09:39:13 -05:00 коммит произвёл GitHub
Родитель c01ecdd56d
Коммит 9d60e67189
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
11 изменённых файлов: 510 добавлений и 400 удалений

Просмотреть файл

@ -1,247 +1,264 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.microsoft.bot.applicationinsights;
import com.microsoft.applicationinsights.TelemetryClient;
import com.microsoft.applicationinsights.telemetry.EventTelemetry;
import com.microsoft.applicationinsights.telemetry.ExceptionTelemetry;
import com.microsoft.applicationinsights.telemetry.PageViewTelemetry;
import com.microsoft.applicationinsights.telemetry.RemoteDependencyTelemetry;
import com.microsoft.applicationinsights.telemetry.SeverityLevel;
import com.microsoft.applicationinsights.telemetry.TraceTelemetry;
import com.microsoft.bot.builder.BotTelemetryClient;
import com.microsoft.bot.builder.Severity;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* A logging client for bot telemetry.
*/
public class BotTelemetryClientImpl implements BotTelemetryClient {
private final TelemetryClient telemetryClient;
/**
* Initializes a new instance of the {@link BotTelemetryClient}.
*
* @param withTelemetryClient The telemetry client to forward bot events to.
*/
public BotTelemetryClientImpl(TelemetryClient withTelemetryClient) {
if (withTelemetryClient == null) {
throw new IllegalArgumentException("withTelemetry should be provided");
}
this.telemetryClient = withTelemetryClient;
}
/**
* Send information about availability of an application.
*
* @param name Availability test name.
* @param timeStamp The time when the availability was captured.
* @param duration The time taken for the availability test to run.
* @param runLocation Name of the location the availability test was run from.
* @param success True if the availability test ran successfully.
* @param message Error message on availability test run failure.
* @param properties Named string values you can use to classify and search for this availability telemetry.
* @param metrics Additional values associated with this availability telemetry.
*/
@SuppressWarnings("checkstyle:ParameterNumber")
@Override
public void trackAvailability(String name,
OffsetDateTime timeStamp,
Duration duration,
String runLocation,
boolean success,
String message,
Map<String, String> properties,
Map<String, Double> metrics) {
com.microsoft.applicationinsights.telemetry.Duration durationTelemetry =
new com.microsoft.applicationinsights.telemetry.Duration(duration.toNanos());
ConcurrentMap<String, String> concurrentProperties = new ConcurrentHashMap<>(properties);
ConcurrentMap<String, Double> concurrentMetrics = new ConcurrentHashMap<>(metrics);
AvailabilityTelemetry telemetry = new AvailabilityTelemetry(
name,
durationTelemetry,
runLocation,
message,
success,
concurrentMetrics,
concurrentProperties);
if (properties != null) {
for (Map.Entry<String, String> pair: properties.entrySet()) {
telemetry.getProperties().put(pair.getKey(), pair.getValue());
}
}
if (metrics != null) {
for (Map.Entry<String, Double> pair: metrics.entrySet()) {
telemetry.getMetrics().put(pair.getKey(), pair.getValue());
}
}
/**
* This should be telemetryClient.trackAvailability(telemetry).
* However, it is not present in TelemetryClient class
*/
telemetryClient.track(telemetry);
}
/**
* Send information about an external dependency (outgoing call) in the application.
*
* @param dependencyTypeName Name of the command initiated with this dependency call. Low cardinality value.
* Examples are SQL, Azure table, and HTTP.
* @param target External dependency target.
* @param dependencyName Name of the command initiated with this dependency call. Low cardinality value.
* Examples are stored procedure name and URL path template.
* @param data Command initiated by this dependency call. Examples are SQL statement and HTTP
* URL's with all query parameters.
* @param startTime The time when the dependency was called.
* @param duration The time taken by the external dependency to handle the call.
* @param resultCode Result code of dependency call execution.
* @param success True if the dependency call was handled successfully.
*/
@SuppressWarnings("checkstyle:ParameterNumber")
@Override
public void trackDependency(String dependencyTypeName,
String target,
String dependencyName,
String data,
OffsetDateTime startTime,
Duration duration,
String resultCode,
boolean success) {
com.microsoft.applicationinsights.telemetry.Duration durationTelemetry =
new com.microsoft.applicationinsights.telemetry.Duration(duration.toNanos());
RemoteDependencyTelemetry telemetry =
new RemoteDependencyTelemetry(dependencyName, data, durationTelemetry, success);
telemetry.setType(dependencyTypeName);
telemetry.setTarget(target);
telemetry.setTimestamp(new Date(startTime.toInstant().toEpochMilli()));
telemetry.setResultCode(resultCode);
telemetryClient.trackDependency(telemetry);
}
/**
* Logs custom events with extensible named fields.
*
* @param eventName A name for the event.
* @param properties Named string values you can use to search and classify events.
* @param metrics Measurements associated with this event.
*/
@Override
public void trackEvent(String eventName, Map<String, String> properties, Map<String, Double> metrics) {
EventTelemetry telemetry = new EventTelemetry(eventName);
if (properties != null) {
for (Map.Entry<String, String> pair: properties.entrySet()) {
telemetry.getProperties().put(pair.getKey(), pair.getValue());
}
}
if (metrics != null) {
for (Map.Entry<String, Double> pair: metrics.entrySet()) {
telemetry.getMetrics().put(pair.getKey(), pair.getValue());
}
}
telemetryClient.trackEvent(telemetry);
}
/**
* Logs a system exception.
*
* @param exception The exception to log.
* @param properties Named string values you can use to classify and search for this exception.
* @param metrics Additional values associated with this exception
*/
@Override
public void trackException(Exception exception, Map<String, String> properties, Map<String, Double> metrics) {
ExceptionTelemetry telemetry = new ExceptionTelemetry(exception);
if (properties != null) {
for (Map.Entry<String, String> pair: properties.entrySet()) {
telemetry.getProperties().put(pair.getKey(), pair.getValue());
}
}
if (metrics != null) {
for (Map.Entry<String, Double> pair: metrics.entrySet()) {
telemetry.getMetrics().put(pair.getKey(), pair.getValue());
}
}
telemetryClient.trackException(telemetry);
}
/**
* Send a trace message.
*
* @param message Message to display.
* @param severityLevel Trace severity level {@link Severity}.
* @param properties Named string values you can use to search and classify events.
*/
@Override
public void trackTrace(String message, Severity severityLevel, Map<String, String> properties) {
TraceTelemetry telemetry = new TraceTelemetry(message);
telemetry.setSeverityLevel(SeverityLevel.values()[severityLevel.ordinal()]);
if (properties != null) {
for (Map.Entry<String, String> pair: properties.entrySet()) {
telemetry.getProperties().put(pair.getKey(), pair.getValue());
}
}
telemetryClient.trackTrace(telemetry);
}
/**
* We implemented this method calling the tracePageView method from {@link BotTelemetryClientImpl} as the
* IBotPageViewTelemetryClient has not been implemented.
* {@inheritDoc}
*/
@Override
public void trackDialogView(String dialogName, Map<String, String> properties, Map<String, Double> metrics) {
trackPageView(dialogName, properties, metrics);
}
/**
* Logs a dialog entry / as an Application Insights page view.
*
* @param dialogName The name of the dialog to log the entry / start for.
* @param properties Named string values you can use to search and classify events.
* @param metrics Measurements associated with this event.
*/
public void trackPageView(String dialogName, Map<String, String> properties, Map<String, Double> metrics) {
PageViewTelemetry telemetry = new PageViewTelemetry(dialogName);
if (properties != null) {
for (Map.Entry<String, String> pair: properties.entrySet()) {
telemetry.getProperties().put(pair.getKey(), pair.getValue());
}
}
if (metrics != null) {
for (Map.Entry<String, Double> pair: metrics.entrySet()) {
telemetry.getMetrics().put(pair.getKey(), pair.getValue());
}
}
telemetryClient.trackPageView(telemetry);
}
/**
* Flushes the in-memory buffer and any metrics being pre-aggregated.
*/
@Override
public void flush() {
telemetryClient.flush();
}
}
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.microsoft.bot.applicationinsights;
import com.microsoft.applicationinsights.TelemetryClient;
import com.microsoft.applicationinsights.telemetry.EventTelemetry;
import com.microsoft.applicationinsights.telemetry.ExceptionTelemetry;
import com.microsoft.applicationinsights.telemetry.PageViewTelemetry;
import com.microsoft.applicationinsights.telemetry.RemoteDependencyTelemetry;
import com.microsoft.applicationinsights.telemetry.SeverityLevel;
import com.microsoft.applicationinsights.telemetry.TraceTelemetry;
import com.microsoft.bot.builder.BotTelemetryClient;
import com.microsoft.bot.builder.Severity;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* A logging client for bot telemetry.
*/
public class ApplicationInsightsBotTelemetryClient implements BotTelemetryClient {
private final TelemetryClient telemetryClient;
/**
* Initializes a new instance of the {@link BotTelemetryClient}.
*
* @param withTelemetryClient The telemetry client to forward bot events to.
*/
public ApplicationInsightsBotTelemetryClient(TelemetryClient withTelemetryClient) {
if (withTelemetryClient == null) {
throw new IllegalArgumentException("withTelemetry should be provided");
}
this.telemetryClient = withTelemetryClient;
}
/**
* Send information about availability of an application.
*
* @param name Availability test name.
* @param timeStamp The time when the availability was captured.
* @param duration The time taken for the availability test to run.
* @param runLocation Name of the location the availability test was run from.
* @param success True if the availability test ran successfully.
* @param message Error message on availability test run failure.
* @param properties Named string values you can use to classify and search for
* this availability telemetry.
* @param metrics Additional values associated with this availability
* telemetry.
*/
@SuppressWarnings("checkstyle:ParameterNumber")
@Override
public void trackAvailability(
String name,
OffsetDateTime timeStamp,
Duration duration,
String runLocation,
boolean success,
String message,
Map<String, String> properties,
Map<String, Double> metrics
) {
com.microsoft.applicationinsights.telemetry.Duration durationTelemetry =
new com.microsoft.applicationinsights.telemetry.Duration(duration.toNanos());
ConcurrentMap<String, String> concurrentProperties = new ConcurrentHashMap<>(properties);
ConcurrentMap<String, Double> concurrentMetrics = new ConcurrentHashMap<>(metrics);
AvailabilityTelemetry telemetry = new AvailabilityTelemetry(
name,
durationTelemetry,
runLocation,
message,
success,
concurrentMetrics,
concurrentProperties
);
if (properties != null) {
for (Map.Entry<String, String> pair : properties.entrySet()) {
telemetry.getProperties().put(pair.getKey(), pair.getValue());
}
}
if (metrics != null) {
for (Map.Entry<String, Double> pair : metrics.entrySet()) {
telemetry.getMetrics().put(pair.getKey(), pair.getValue());
}
}
/**
* This should be telemetryClient.trackAvailability(telemetry). However, it is
* not present in TelemetryClient class
*/
telemetryClient.track(telemetry);
}
/**
* Send information about an external dependency (outgoing call) in the
* application.
*
* @param dependencyTypeName Name of the command initiated with this dependency
* call. Low cardinality value. Examples are SQL,
* Azure table, and HTTP.
* @param target External dependency target.
* @param dependencyName Name of the command initiated with this dependency
* call. Low cardinality value. Examples are stored
* procedure name and URL path template.
* @param data Command initiated by this dependency call. Examples
* are SQL statement and HTTP URL's with all query
* parameters.
* @param startTime The time when the dependency was called.
* @param duration The time taken by the external dependency to handle
* the call.
* @param resultCode Result code of dependency call execution.
* @param success True if the dependency call was handled
* successfully.
*/
@SuppressWarnings("checkstyle:ParameterNumber")
@Override
public void trackDependency(
String dependencyTypeName,
String target,
String dependencyName,
String data,
OffsetDateTime startTime,
Duration duration,
String resultCode,
boolean success
) {
com.microsoft.applicationinsights.telemetry.Duration durationTelemetry =
new com.microsoft.applicationinsights.telemetry.Duration(duration.toNanos());
RemoteDependencyTelemetry telemetry =
new RemoteDependencyTelemetry(dependencyName, data, durationTelemetry, success);
telemetry.setType(dependencyTypeName);
telemetry.setTarget(target);
telemetry.setTimestamp(new Date(startTime.toInstant().toEpochMilli()));
telemetry.setResultCode(resultCode);
telemetryClient.trackDependency(telemetry);
}
/**
* Logs custom events with extensible named fields.
*
* @param eventName A name for the event.
* @param properties Named string values you can use to search and classify
* events.
* @param metrics Measurements associated with this event.
*/
@Override
public void trackEvent(String eventName, Map<String, String> properties, Map<String, Double> metrics) {
EventTelemetry telemetry = new EventTelemetry(eventName);
if (properties != null) {
for (Map.Entry<String, String> pair : properties.entrySet()) {
telemetry.getProperties().put(pair.getKey(), pair.getValue());
}
}
if (metrics != null) {
for (Map.Entry<String, Double> pair : metrics.entrySet()) {
telemetry.getMetrics().put(pair.getKey(), pair.getValue());
}
}
telemetryClient.trackEvent(telemetry);
}
/**
* Logs a system exception.
*
* @param exception The exception to log.
* @param properties Named string values you can use to classify and search for
* this exception.
* @param metrics Additional values associated with this exception
*/
@Override
public void trackException(Exception exception, Map<String, String> properties, Map<String, Double> metrics) {
ExceptionTelemetry telemetry = new ExceptionTelemetry(exception);
if (properties != null) {
for (Map.Entry<String, String> pair : properties.entrySet()) {
telemetry.getProperties().put(pair.getKey(), pair.getValue());
}
}
if (metrics != null) {
for (Map.Entry<String, Double> pair : metrics.entrySet()) {
telemetry.getMetrics().put(pair.getKey(), pair.getValue());
}
}
telemetryClient.trackException(telemetry);
}
/**
* Send a trace message.
*
* @param message Message to display.
* @param severityLevel Trace severity level {@link Severity}.
* @param properties Named string values you can use to search and classify
* events.
*/
@Override
public void trackTrace(String message, Severity severityLevel, Map<String, String> properties) {
TraceTelemetry telemetry = new TraceTelemetry(message);
telemetry.setSeverityLevel(SeverityLevel.values()[severityLevel.ordinal()]);
if (properties != null) {
for (Map.Entry<String, String> pair : properties.entrySet()) {
telemetry.getProperties().put(pair.getKey(), pair.getValue());
}
}
telemetryClient.trackTrace(telemetry);
}
/**
* We implemented this method calling the tracePageView method from
* {@link ApplicationInsightsBotTelemetryClient} as the
* IBotPageViewTelemetryClient has not been implemented. {@inheritDoc}
*/
@Override
public void trackDialogView(String dialogName, Map<String, String> properties, Map<String, Double> metrics) {
trackPageView(dialogName, properties, metrics);
}
/**
* Logs a dialog entry / as an Application Insights page view.
*
* @param dialogName The name of the dialog to log the entry / start for.
* @param properties Named string values you can use to search and classify
* events.
* @param metrics Measurements associated with this event.
*/
public void trackPageView(String dialogName, Map<String, String> properties, Map<String, Double> metrics) {
PageViewTelemetry telemetry = new PageViewTelemetry(dialogName);
if (properties != null) {
for (Map.Entry<String, String> pair : properties.entrySet()) {
telemetry.getProperties().put(pair.getKey(), pair.getValue());
}
}
if (metrics != null) {
for (Map.Entry<String, Double> pair : metrics.entrySet()) {
telemetry.getMetrics().put(pair.getKey(), pair.getValue());
}
}
telemetryClient.trackPageView(telemetry);
}
/**
* Flushes the in-memory buffer and any metrics being pre-aggregated.
*/
@Override
public void flush() {
telemetryClient.flush();
}
}

Просмотреть файл

@ -12,8 +12,9 @@ import java.util.Date;
import java.util.concurrent.ConcurrentMap;
/**
* We took this class from https://github.com/microsoft/ApplicationInsights-Java/issues/1099
* as this is not already migrated in ApplicationInsights-Java library.
* We took this class from
* https://github.com/microsoft/ApplicationInsights-Java/issues/1099 as this is
* not already migrated in ApplicationInsights-Java library.
*/
public final class AvailabilityTelemetry extends BaseSampleSourceTelemetry<AvailabilityData> {
private Double samplingPercentage;
@ -23,7 +24,6 @@ public final class AvailabilityTelemetry extends BaseSampleSourceTelemetry<Avail
public static final String BASE_TYPE = "AvailabilityData";
/**
* Initializes a new instance of the AvailabilityTelemetry class.
*/
@ -38,19 +38,26 @@ public final class AvailabilityTelemetry extends BaseSampleSourceTelemetry<Avail
}
/**
* Initializes a new instance of the AvailabilityTelemetry class with the given name,
* time stamp, duration, HTTP response code and success property values.
* @param name A user-friendly name for the request.
* @param duration The time of the request.
* @param runLocation The duration, in milliseconds, of the request processing.
* @param message The HTTP response code.
* @param success 'true' if the request was a success, 'false' otherwise.
* Initializes a new instance of the AvailabilityTelemetry class with the given
* name, time stamp, duration, HTTP response code and success property values.
*
* @param name A user-friendly name for the request.
* @param duration The time of the request.
* @param runLocation The duration, in milliseconds, of the request processing.
* @param message The HTTP response code.
* @param success 'true' if the request was a success, 'false' otherwise.
* @param measurements The measurements.
* @param properties The corresponding properties.
* @param properties The corresponding properties.
*/
public AvailabilityTelemetry(String name, Duration duration, String runLocation, String message,
boolean success, ConcurrentMap<String, Double> measurements,
ConcurrentMap<String, String> properties) {
public AvailabilityTelemetry(
String name,
Duration duration,
String runLocation,
String message,
boolean success,
ConcurrentMap<String, Double> measurements,
ConcurrentMap<String, String> properties
) {
this.data = new AvailabilityData();
@ -70,9 +77,9 @@ public final class AvailabilityTelemetry extends BaseSampleSourceTelemetry<Avail
setSuccess(success);
}
/**
* Gets the ver value from the data object.
*
* @return The ver value.
*/
@Override
@ -82,6 +89,7 @@ public final class AvailabilityTelemetry extends BaseSampleSourceTelemetry<Avail
/**
* Gets a map of application-defined request metrics.
*
* @return The map of metrics
*/
public ConcurrentMap<String, Double> getMetrics() {
@ -89,7 +97,9 @@ public final class AvailabilityTelemetry extends BaseSampleSourceTelemetry<Avail
}
/**
* Sets the StartTime. Uses the default behavior and sets the property on the 'data' start time.
* Sets the StartTime. Uses the default behavior and sets the property on the
* 'data' start time.
*
* @param timestamp The timestamp as Date.
*/
@Override
@ -103,6 +113,7 @@ public final class AvailabilityTelemetry extends BaseSampleSourceTelemetry<Avail
/**
* Gets or human-readable name of the requested page.
*
* @return A human-readable name.
*/
public String getName() {
@ -111,6 +122,7 @@ public final class AvailabilityTelemetry extends BaseSampleSourceTelemetry<Avail
/**
* Sets or human-readable name of the requested page.
*
* @param name A human-readable name.
*/
public void setName(String name) {
@ -119,6 +131,7 @@ public final class AvailabilityTelemetry extends BaseSampleSourceTelemetry<Avail
/**
* Gets or human-readable name of the run location.
*
* @return A human-readable name.
*/
public String getRunLocation() {
@ -127,6 +140,7 @@ public final class AvailabilityTelemetry extends BaseSampleSourceTelemetry<Avail
/**
* Sets or human-readable name of the run location.
*
* @param runLocation A human-readable name
*/
public void setRunLocation(String runLocation) {
@ -135,6 +149,7 @@ public final class AvailabilityTelemetry extends BaseSampleSourceTelemetry<Avail
/**
* Gets the unique identifier of the request.
*
* @return Unique identifier.
*/
public String getId() {
@ -143,6 +158,7 @@ public final class AvailabilityTelemetry extends BaseSampleSourceTelemetry<Avail
/**
* Sets the unique identifier of the request.
*
* @param id Unique identifier.
*/
public void setId(String id) {
@ -151,6 +167,7 @@ public final class AvailabilityTelemetry extends BaseSampleSourceTelemetry<Avail
/**
* Gets a value indicating whether application handled the request successfully.
*
* @return Success indication.
*/
public boolean isSuccess() {
@ -159,6 +176,7 @@ public final class AvailabilityTelemetry extends BaseSampleSourceTelemetry<Avail
/**
* Sets a value indicating whether application handled the request successfully.
*
* @param success Success indication.
*/
public void setSuccess(boolean success) {
@ -167,6 +185,7 @@ public final class AvailabilityTelemetry extends BaseSampleSourceTelemetry<Avail
/**
* Gets the amount of time it took the application to handle the request.
*
* @return Amount of time in milliseconds.
*/
public Duration getDuration() {
@ -175,7 +194,9 @@ public final class AvailabilityTelemetry extends BaseSampleSourceTelemetry<Avail
/**
* Sets the amount of time it took the application to handle the request.
* @param duration Amount of time in captured in a {@link com.microsoft.applicationinsights.telemetry.Duration}.
*
* @param duration Amount of time in captured in a
* {@link com.microsoft.applicationinsights.telemetry.Duration}.
*/
public void setDuration(Duration duration) {
data.setDuration(duration);
@ -214,4 +235,3 @@ public final class AvailabilityTelemetry extends BaseSampleSourceTelemetry<Avail
return BASE_TYPE;
}
}

Просмотреть файл

@ -27,11 +27,15 @@ public class TelemetryInitializerMiddleware implements Middleware {
/**
* Initializes a new instance of the {@link TelemetryInitializerMiddleware}.
*
* @param withTelemetryLoggerMiddleware The TelemetryLoggerMiddleware to use.
* @param withLogActivityTelemetry Boolean determining if you want to log telemetry activity
* @param withLogActivityTelemetry Boolean determining if you want to log
* telemetry activity
*/
public TelemetryInitializerMiddleware(TelemetryLoggerMiddleware withTelemetryLoggerMiddleware,
Boolean withLogActivityTelemetry) {
public TelemetryInitializerMiddleware(
TelemetryLoggerMiddleware withTelemetryLoggerMiddleware,
Boolean withLogActivityTelemetry
) {
telemetryLoggerMiddleware = withTelemetryLoggerMiddleware;
if (withLogActivityTelemetry == null) {
withLogActivityTelemetry = true;
@ -40,9 +44,11 @@ public class TelemetryInitializerMiddleware implements Middleware {
}
/**
* Stores the incoming activity as JSON in the items collection on the HttpContext.
* Stores the incoming activity as JSON in the items collection on the
* HttpContext.
*
* @param context The incoming TurnContext
* @param next Delegate to run next on
* @param next Delegate to run next on
* @return Returns a CompletableFuture with Void value
*/
public CompletableFuture<Void> onTurn(TurnContext context, NextDelegate next) {
@ -71,4 +77,3 @@ public class TelemetryInitializerMiddleware implements Middleware {
}
}
}

Просмотреть файл

@ -38,13 +38,13 @@ public class BotTelemetryClientTests {
telemetryConfiguration.setChannel(mockTelemetryChannel);
TelemetryClient telemetryClient = new TelemetryClient(telemetryConfiguration);
botTelemetryClient = new BotTelemetryClientImpl(telemetryClient);
botTelemetryClient = new ApplicationInsightsBotTelemetryClient(telemetryClient);
}
@Test
public void nullTelemetryClientThrows() {
Assert.assertThrows(IllegalArgumentException.class, () -> {
new BotTelemetryClientImpl(null);
new ApplicationInsightsBotTelemetryClient(null);
});
}
@ -52,7 +52,7 @@ public class BotTelemetryClientTests {
public void nonNullTelemetryClientSucceeds() {
TelemetryClient telemetryClient = new TelemetryClient();
BotTelemetryClient botTelemetryClient = new BotTelemetryClientImpl(telemetryClient);
BotTelemetryClient botTelemetryClient = new ApplicationInsightsBotTelemetryClient(telemetryClient);
}
@Test

Просмотреть файл

@ -10,7 +10,7 @@ import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Map;
public class MyBotTelemetryClient extends BotTelemetryClientImpl {
public class MyBotTelemetryClient extends ApplicationInsightsBotTelemetryClient {
public MyBotTelemetryClient(TelemetryClient telemetryClient) {
super(telemetryClient);
}

Просмотреть файл

@ -48,8 +48,8 @@ public final class CosmosDbKeyEscape {
* means a key of "?test?" would be escaped as "*3ftest*3f".
*/
private static final Map<Character, String> ILLEGAL_KEY_CHARACTER_REPLACEMENT_MAP = Arrays
.stream(ArrayUtils.toObject(ILLEGAL_KEYS))
.collect(Collectors.toMap(c -> c, c -> "*" + String.format("%02x", (int) c)));
.stream(ArrayUtils.toObject(ILLEGAL_KEYS))
.collect(Collectors.toMap(c -> c, c -> "*" + String.format("%02x", (int) c)));
/**
* Converts the key into a DocumentID that can be used safely with Cosmos DB.
@ -94,8 +94,8 @@ public final class CosmosDbKeyEscape {
// Allocate a builder that assumes that all remaining characters might be
// replaced
// to avoid any extra allocations
StringBuilder sanitizedKeyBuilder = new StringBuilder(
key.length() + ((key.length() - firstIllegalCharIndex) * ESCAPE_LENGTH));
StringBuilder sanitizedKeyBuilder =
new StringBuilder(key.length() + ((key.length() - firstIllegalCharIndex) * ESCAPE_LENGTH));
// Add all good characters up to the first bad character to the builder first
for (Integer index = 0; index < firstIllegalCharIndex; index++) {

Просмотреть файл

@ -85,7 +85,8 @@ public class CosmosDbPartitionedStorage implements Storage {
if (StringUtils.isNotBlank(withCosmosDbStorageOptions.getKeySuffix())) {
if (withCosmosDbStorageOptions.getCompatibilityMode()) {
throw new IllegalArgumentException(
"CompatibilityMode cannot be 'true' while using a KeySuffix: withCosmosDbStorageOptions");
"CompatibilityMode cannot be 'true' while using a KeySuffix: withCosmosDbStorageOptions"
);
}
// In order to reduce key complexity, we do not allow invalid characters in a
@ -93,18 +94,28 @@ public class CosmosDbPartitionedStorage implements Storage {
// If the KeySuffix has invalid characters, the EscapeKey will not match
String suffixEscaped = CosmosDbKeyEscape.escapeKey(withCosmosDbStorageOptions.getKeySuffix());
if (!withCosmosDbStorageOptions.getKeySuffix().equals(suffixEscaped)) {
throw new IllegalArgumentException(String.format("Cannot use invalid Row Key characters: %s %s",
withCosmosDbStorageOptions.getKeySuffix(), "withCosmosDbStorageOptions"));
throw new IllegalArgumentException(
String.format(
"Cannot use invalid Row Key characters: %s %s",
withCosmosDbStorageOptions.getKeySuffix(),
"withCosmosDbStorageOptions"
)
);
}
}
cosmosDbStorageOptions = withCosmosDbStorageOptions;
objectMapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.findAndRegisterModules().enableDefaultTyping();
.findAndRegisterModules()
.enableDefaultTyping();
client = new DocumentClient(cosmosDbStorageOptions.getCosmosDbEndpoint(), cosmosDbStorageOptions.getAuthKey(),
cosmosDbStorageOptions.getConnectionPolicy(), cosmosDbStorageOptions.getConsistencyLevel());
client = new DocumentClient(
cosmosDbStorageOptions.getCosmosDbEndpoint(),
cosmosDbStorageOptions.getAuthKey(),
cosmosDbStorageOptions.getConnectionPolicy(),
cosmosDbStorageOptions.getConsistencyLevel()
);
}
/**
@ -128,8 +139,15 @@ public class CosmosDbPartitionedStorage implements Storage {
// Issue all of the reads at once
List<CompletableFuture<Document>> documentFutures = new ArrayList<>();
for (String key : keys) {
documentFutures.add(getDocumentById(CosmosDbKeyEscape.escapeKey(key,
cosmosDbStorageOptions.getKeySuffix(), cosmosDbStorageOptions.getCompatibilityMode())));
documentFutures.add(
getDocumentById(
CosmosDbKeyEscape.escapeKey(
key,
cosmosDbStorageOptions.getKeySuffix(),
cosmosDbStorageOptions.getCompatibilityMode()
)
)
);
}
// Map each returned Document to it's original value.
@ -190,8 +208,13 @@ public class CosmosDbPartitionedStorage implements Storage {
DocumentStoreItem documentChange = new DocumentStoreItem() {
{
setId(CosmosDbKeyEscape.escapeKey(change.getKey(), cosmosDbStorageOptions.getKeySuffix(),
cosmosDbStorageOptions.getCompatibilityMode()));
setId(
CosmosDbKeyEscape.escapeKey(
change.getKey(),
cosmosDbStorageOptions.getKeySuffix(),
cosmosDbStorageOptions.getCompatibilityMode()
)
);
setReadId(change.getKey());
setDocument(node.toString());
setType(change.getValue().getClass().getTypeName());
@ -243,8 +266,8 @@ public class CosmosDbPartitionedStorage implements Storage {
// issue the deletes in parallel
return getCollection().thenCompose(collection -> Arrays.stream(keys).map(key -> {
String escapedKey = CosmosDbKeyEscape.escapeKey(key, cosmosDbStorageOptions.getKeySuffix(),
cosmosDbStorageOptions.getCompatibilityMode());
String escapedKey = CosmosDbKeyEscape
.escapeKey(key, cosmosDbStorageOptions.getKeySuffix(), cosmosDbStorageOptions.getCompatibilityMode());
return getDocumentById(escapedKey).thenApplyAsync(document -> {
if (document != null) {
try {
@ -266,10 +289,10 @@ public class CosmosDbPartitionedStorage implements Storage {
private Database getDatabase() {
if (databaseCache == null) {
// Get the database if it exists
List<Database> databaseList = client
.queryDatabases("SELECT * FROM root r WHERE r.id='" + cosmosDbStorageOptions.getDatabaseId() + "'",
null)
.getQueryIterable().toList();
List<Database> databaseList = client.queryDatabases(
"SELECT * FROM root r WHERE r.id='" + cosmosDbStorageOptions.getDatabaseId() + "'",
null
).getQueryIterable().toList();
if (databaseList.size() > 0) {
// Cache the database object so we won't have to query for it
@ -306,9 +329,11 @@ public class CosmosDbPartitionedStorage implements Storage {
return CompletableFuture.supplyAsync(() -> {
// Get the collection if it exists.
List<DocumentCollection> collectionList = client.queryCollections(getDatabase().getSelfLink(),
"SELECT * FROM root r WHERE r.id='" + cosmosDbStorageOptions.getContainerId() + "'", null)
.getQueryIterable().toList();
List<DocumentCollection> collectionList = client.queryCollections(
getDatabase().getSelfLink(),
"SELECT * FROM root r WHERE r.id='" + cosmosDbStorageOptions.getContainerId() + "'",
null
).getQueryIterable().toList();
if (collectionList.size() > 0) {
// Cache the collection object so we won't have to query for it
@ -331,8 +356,8 @@ public class CosmosDbPartitionedStorage implements Storage {
};
collectionCache = client
.createCollection(getDatabase().getSelfLink(), collectionDefinition, options)
.getResource();
.createCollection(getDatabase().getSelfLink(), collectionDefinition, options)
.getResource();
} catch (DocumentClientException e) {
// able to query or create the collection.
// Verify your connection, endpoint, and key.
@ -350,8 +375,9 @@ public class CosmosDbPartitionedStorage implements Storage {
return getCollection().thenApplyAsync(collection -> {
// Retrieve the document using the DocumentClient.
List<Document> documentList = client
.queryDocuments(collection.getSelfLink(), "SELECT * FROM root r WHERE r.id='" + id + "'", null)
.getQueryIterable().toList();
.queryDocuments(collection.getSelfLink(), "SELECT * FROM root r WHERE r.id='" + id + "'", null)
.getQueryIterable()
.toList();
if (documentList.size() > 0) {
return documentList.get(0);

Просмотреть файл

@ -201,6 +201,7 @@ public class CosmosDbPartitionedStorageOptions {
* also allow for using older collections where no PartitionKey was specified.
*
* Note: CompatibilityMode cannot be 'true' if KeySuffix is used.
*
* @return The compatibilityMode
*/
public Boolean getCompatibilityMode() {

Просмотреть файл

@ -32,13 +32,14 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
/**
* Implements {@link Storage} using Azure Storage Blobs.
* This class uses a single Azure Storage Blob Container.
* Each entity or {@link StoreItem} is serialized into a JSON string and stored in an individual text blob.
* Each blob is named after the store item key, which is encoded so that it conforms a valid blob name.
* an entity is an {@link StoreItem}, the storage object will set the entity's {@link StoreItem}
* property value to the blob's ETag upon read. Afterward, an {@link BlobRequestConditions} with the ETag value
* will be generated during Write. New entities start with a null ETag.
* Implements {@link Storage} using Azure Storage Blobs. This class uses a
* single Azure Storage Blob Container. Each entity or {@link StoreItem} is
* serialized into a JSON string and stored in an individual text blob. Each
* blob is named after the store item key, which is encoded so that it conforms
* a valid blob name. an entity is an {@link StoreItem}, the storage object will
* set the entity's {@link StoreItem} property value to the blob's ETag upon
* read. Afterward, an {@link BlobRequestConditions} with the ETag value will be
* generated during Write. New entities start with a null ETag.
*/
public class BlobsStorage implements Storage {
@ -50,8 +51,10 @@ public class BlobsStorage implements Storage {
/**
* Initializes a new instance of the {@link BlobsStorage} class.
*
* @param dataConnectionString Azure Storage connection string.
* @param containerName Name of the Blob container where entities will be stored.
* @param containerName Name of the Blob container where entities will be
* stored.
*/
public BlobsStorage(String dataConnectionString, String containerName) {
if (StringUtils.isBlank(dataConnectionString)) {
@ -62,19 +65,18 @@ public class BlobsStorage implements Storage {
throw new IllegalArgumentException("containerName is required.");
}
objectMapper = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
objectMapper = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.findAndRegisterModules()
.enableDefaultTyping();
containerClient = new BlobContainerClientBuilder()
.connectionString(dataConnectionString)
.containerName(containerName)
.buildClient();
containerClient = new BlobContainerClientBuilder().connectionString(dataConnectionString)
.containerName(containerName)
.buildClient();
}
/**
* Deletes entity blobs from the configured container.
*
* @param keys An array of entity keys.
* @return A task that represents the work queued to execute.
*/
@ -84,7 +86,7 @@ public class BlobsStorage implements Storage {
throw new IllegalArgumentException("The 'keys' parameter is required.");
}
for (String key: keys) {
for (String key : keys) {
String blobName = getBlobName(key);
BlobClient blobClient = containerClient.getBlobClient(blobName);
if (blobClient.exists()) {
@ -102,6 +104,7 @@ public class BlobsStorage implements Storage {
/**
* Retrieve entities from the configured blob container.
*
* @param keys An array of entity keys.
* @return A task that represents the work queued to execute.
*/
@ -136,6 +139,7 @@ public class BlobsStorage implements Storage {
/**
* Stores a new entity in the configured blob container.
*
* @param changes The changes to write to storage.
* @return A task that represents the work queued to execute.
*/
@ -157,28 +161,37 @@ public class BlobsStorage implements Storage {
StoreItem storeItem = newValue instanceof StoreItem ? (StoreItem) newValue : null;
// "*" eTag in StoreItem converts to null condition for AccessCondition
boolean isNullOrEmpty = storeItem == null || StringUtils.isBlank(storeItem.getETag())
|| storeItem.getETag().equals("*");
BlobRequestConditions accessCondition = !isNullOrEmpty
? new BlobRequestConditions().setIfMatch(storeItem.getETag())
: null;
boolean isNullOrEmpty =
storeItem == null || StringUtils.isBlank(storeItem.getETag()) || storeItem.getETag().equals("*");
BlobRequestConditions accessCondition =
!isNullOrEmpty ? new BlobRequestConditions().setIfMatch(storeItem.getETag()) : null;
String blobName = getBlobName(keyValuePair.getKey());
BlobClient blobReference = containerClient.getBlobClient(blobName);
try {
String json = objectMapper.writeValueAsString(newValue);
InputStream stream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
//verify the corresponding length
blobReference.uploadWithResponse(stream, stream.available(),
null, null,
null, null, accessCondition, null, Context.NONE);
// verify the corresponding length
blobReference.uploadWithResponse(
stream,
stream.available(),
null,
null,
null,
null,
accessCondition,
null,
Context.NONE
);
} catch (HttpResponseException e) {
if (e.getResponse().getStatusCode() == HttpStatus.SC_BAD_REQUEST) {
StringBuilder sb =
new StringBuilder("An error occurred while trying to write an object. The underlying ");
sb.append(BlobErrorCode.INVALID_BLOCK_LIST);
sb.append(" error is commonly caused due to "
+ "concurrently uploading an object larger than 128MB in size.");
sb.append(
" error is commonly caused due to "
+ "concurrently uploading an object larger than 128MB in size."
);
throw new HttpResponseException(sb.toString(), e.getResponse());
}
@ -210,12 +223,13 @@ public class BlobsStorage implements Storage {
while (true) {
try (ByteArrayOutputStream outputStream = new ByteArrayOutputStream()) {
blobReference.download(outputStream);
String contentString = outputStream.toString();
String contentString = outputStream.toString();
Object obj;
// We are doing this try/catch because we are receiving String or HashMap
try {
// We need to deserialize to an Object class since there are contentString which has an Object type
// We need to deserialize to an Object class since there are contentString which
// has an Object type
obj = objectMapper.readValue(contentString, Object.class);
} catch (MismatchedInputException ex) {
// In case of the contentString has the structure of a HashMap,
@ -232,7 +246,8 @@ public class BlobsStorage implements Storage {
} catch (HttpResponseException e) {
if (e.getResponse().getStatusCode() == HttpStatus.SC_PRECONDITION_FAILED) {
// additional retry logic,
// even though this is a read operation blob storage can return 412 if there is contention
// even though this is a read operation blob storage can return 412 if there is
// contention
if (i++ < retryTimes) {
try {
TimeUnit.MILLISECONDS.sleep(millisecondsTimeout);

Просмотреть файл

@ -68,8 +68,10 @@ public class BlobsTranscriptStore implements TranscriptStore {
/**
* Initializes a new instance of the {@link BlobsTranscriptStore} class.
*
* @param dataConnectionString Azure Storage connection string.
* @param containerName Name of the Blob container where entities will be stored.
* @param containerName Name of the Blob container where entities will be
* stored.
*/
public BlobsTranscriptStore(String dataConnectionString, String containerName) {
if (StringUtils.isBlank(dataConnectionString)) {
@ -80,8 +82,7 @@ public class BlobsTranscriptStore implements TranscriptStore {
throw new IllegalArgumentException("containerName");
}
jsonSerializer = new ObjectMapper()
.setSerializationInclusion(JsonInclude.Include.NON_NULL)
jsonSerializer = new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL)
.enable(SerializationFeature.INDENT_OUTPUT)
.findAndRegisterModules();
@ -91,6 +92,7 @@ public class BlobsTranscriptStore implements TranscriptStore {
/**
* Log an activity to the transcript.
*
* @param activity Activity being logged.
* @return A CompletableFuture that represents the work queued to execute.
*/
@ -101,8 +103,8 @@ public class BlobsTranscriptStore implements TranscriptStore {
case ActivityTypes.MESSAGE_UPDATE:
Activity updatedActivity = null;
try {
updatedActivity = jsonSerializer
.readValue(jsonSerializer.writeValueAsString(activity), Activity.class);
updatedActivity =
jsonSerializer.readValue(jsonSerializer.writeValueAsString(activity), Activity.class);
} catch (IOException ex) {
ex.printStackTrace();
}
@ -122,6 +124,7 @@ public class BlobsTranscriptStore implements TranscriptStore {
});
return CompletableFuture.completedFuture(null);
case ActivityTypes.MESSAGE_DELETE:
innerReadBlob(activity).thenAccept(activityAndBlob -> {
if (activityAndBlob != null && activityAndBlob.getLeft() != null) {
@ -147,19 +150,20 @@ public class BlobsTranscriptStore implements TranscriptStore {
logActivityToBlobClient(tombstonedActivity, activityAndBlob.getRight(), true)
.thenApply(task -> CompletableFuture.completedFuture(null));
}
}
});
return CompletableFuture.completedFuture(null);
default:
this.innerLogActivity(activity)
.thenApply(task -> CompletableFuture.completedFuture(null));
this.innerLogActivity(activity).thenApply(task -> CompletableFuture.completedFuture(null));
return CompletableFuture.completedFuture(null);
}
}
/**
* Get activities for a conversation (Aka the transcript).
*
* @param channelId The ID of the channel the conversation is in.
* @param conversationId The ID of the conversation.
* @param continuationToken The continuation token (if available).
@ -167,9 +171,12 @@ public class BlobsTranscriptStore implements TranscriptStore {
* not included.
* @return PagedResult of activities.
*/
public CompletableFuture<PagedResult<Activity>> getTranscriptActivities(String channelId, String conversationId,
@Nullable String continuationToken,
OffsetDateTime startDate) {
public CompletableFuture<PagedResult<Activity>> getTranscriptActivities(
String channelId,
String conversationId,
@Nullable String continuationToken,
OffsetDateTime startDate
) {
if (startDate == null) {
startDate = OffsetDateTime.MIN;
}
@ -194,11 +201,10 @@ public class BlobsTranscriptStore implements TranscriptStore {
.listBlobsByHierarchy("/", this.getOptionsWithMetadata(prefix), null)
.iterableByPage(token);
token = null;
for (PagedResponse<BlobItem> blobPage: resultSegment) {
for (BlobItem blobItem: blobPage.getValue()) {
for (PagedResponse<BlobItem> blobPage : resultSegment) {
for (BlobItem blobItem : blobPage.getValue()) {
OffsetDateTime parseDateTime = OffsetDateTime.parse(blobItem.getMetadata().get("Timestamp"));
if (parseDateTime.isAfter(startDate)
|| parseDateTime.isEqual(startDate)) {
if (parseDateTime.isAfter(startDate) || parseDateTime.isEqual(startDate)) {
if (continuationToken != null) {
if (blobItem.getName().equals(continuationToken)) {
// we found continuation token
@ -218,14 +224,10 @@ public class BlobsTranscriptStore implements TranscriptStore {
}
} while (!StringUtils.isBlank(token) && blobs.size() < pageSize);
pagedResult.setItems(blobs
.stream()
.map(bl -> {
BlobClient blobClient = containerClient.getBlobClient(bl.getName());
return this.getActivityFromBlobClient(blobClient);
})
.map(t -> t.join())
.collect(Collectors.toList()));
pagedResult.setItems(blobs.stream().map(bl -> {
BlobClient blobClient = containerClient.getBlobClient(bl.getName());
return this.getActivityFromBlobClient(blobClient);
}).map(t -> t.join()).collect(Collectors.toList()));
if (pagedResult.getItems().size() == pageSize) {
pagedResult.setContinuationToken(blobs.get(blobs.size() - 1).getName());
@ -236,12 +238,15 @@ public class BlobsTranscriptStore implements TranscriptStore {
/**
* List conversations in the channelId.
*
* @param channelId The ID of the channel.
* @param continuationToken The continuation token (if available).
* @return A CompletableFuture that represents the work queued to execute.
*/
public CompletableFuture<PagedResult<TranscriptInfo>> listTranscripts(String channelId,
@Nullable String continuationToken) {
public CompletableFuture<PagedResult<TranscriptInfo>> listTranscripts(
String channelId,
@Nullable String continuationToken
) {
final int pageSize = 20;
if (StringUtils.isBlank(channelId)) {
@ -253,16 +258,17 @@ public class BlobsTranscriptStore implements TranscriptStore {
List<TranscriptInfo> conversations = new ArrayList<TranscriptInfo>();
do {
String prefix = String.format("%s/", sanitizeKey(channelId));
Iterable<PagedResponse<BlobItem>> resultSegment = containerClient.
listBlobsByHierarchy("/", this.getOptionsWithMetadata(prefix), null)
Iterable<PagedResponse<BlobItem>> resultSegment = containerClient
.listBlobsByHierarchy("/", this.getOptionsWithMetadata(prefix), null)
.iterableByPage(token);
token = null;
for (PagedResponse<BlobItem> blobPage: resultSegment) {
for (BlobItem blobItem: blobPage.getValue()) {
for (PagedResponse<BlobItem> blobPage : resultSegment) {
for (BlobItem blobItem : blobPage.getValue()) {
// Unescape the Id we escaped when we saved it
String conversationId = new String();
String lastName = Arrays.stream(blobItem.getName().split("/"))
.reduce((first, second) -> second.length() > 0 ? second : first).get();
.reduce((first, second) -> second.length() > 0 ? second : first)
.get();
try {
conversationId = URLDecoder.decode(lastName, StandardCharsets.UTF_8.name());
} catch (UnsupportedEncodingException ex) {
@ -299,6 +305,7 @@ public class BlobsTranscriptStore implements TranscriptStore {
/**
* Delete a specific conversation and all of it's activities.
*
* @param channelId The ID of the channel the conversation is in.
* @param conversationId The ID of the conversation to delete.
* @return A CompletableFuture that represents the work queued to execute.
@ -316,11 +323,12 @@ public class BlobsTranscriptStore implements TranscriptStore {
do {
String prefix = String.format("%s/%s/", sanitizeKey(channelId), sanitizeKey(conversationId));
Iterable<PagedResponse<BlobItem>> resultSegment = containerClient
.listBlobsByHierarchy("/", this.getOptionsWithMetadata(prefix), null).iterableByPage(token);
.listBlobsByHierarchy("/", this.getOptionsWithMetadata(prefix), null)
.iterableByPage(token);
token = null;
for (PagedResponse<BlobItem> blobPage: resultSegment) {
for (BlobItem blobItem: blobPage.getValue()) {
for (PagedResponse<BlobItem> blobPage : resultSegment) {
for (BlobItem blobItem : blobPage.getValue()) {
BlobClient blobClient = containerClient.getBlobClient(blobItem.getName());
if (blobClient.exists()) {
try {
@ -345,19 +353,22 @@ public class BlobsTranscriptStore implements TranscriptStore {
try {
String token = null;
do {
String prefix = String.format("%s/%s/",
sanitizeKey(activity.getChannelId()), sanitizeKey(activity.getConversation().getId()));
String prefix = String.format(
"%s/%s/",
sanitizeKey(activity.getChannelId()),
sanitizeKey(activity.getConversation().getId())
);
Iterable<PagedResponse<BlobItem>> resultSegment = containerClient
.listBlobsByHierarchy("/",
this.getOptionsWithMetadata(prefix), null).iterableByPage(token);
.listBlobsByHierarchy("/", this.getOptionsWithMetadata(prefix), null)
.iterableByPage(token);
token = null;
for (PagedResponse<BlobItem> blobPage: resultSegment) {
for (BlobItem blobItem: blobPage.getValue()) {
for (PagedResponse<BlobItem> blobPage : resultSegment) {
for (BlobItem blobItem : blobPage.getValue()) {
if (blobItem.getMetadata().get("Id").equals(activity.getId())) {
BlobClient blobClient = containerClient.getBlobClient(blobItem.getName());
return this.getActivityFromBlobClient(blobClient)
.thenApply(blobActivity ->
new Pair<Activity, BlobClient>(blobActivity, blobClient));
return this.getActivityFromBlobClient(
blobClient
).thenApply(blobActivity -> new Pair<Activity, BlobClient>(blobActivity, blobClient));
}
}
@ -370,7 +381,8 @@ public class BlobsTranscriptStore implements TranscriptStore {
} catch (HttpResponseException ex) {
if (ex.getResponse().getStatusCode() == HttpStatus.SC_PRECONDITION_FAILED) {
// additional retry logic,
// even though this is a read operation blob storage can return 412 if there is contention
// even though this is a read operation blob storage can return 412 if there is
// contention
if (i++ < retryTimes) {
try {
TimeUnit.MILLISECONDS.sleep(milisecondsTimeout);
@ -405,8 +417,11 @@ public class BlobsTranscriptStore implements TranscriptStore {
return logActivityToBlobClient(activity, blobClient, null);
}
private CompletableFuture<Void> logActivityToBlobClient(Activity activity, BlobClient blobClient,
Boolean overwrite) {
private CompletableFuture<Void> logActivityToBlobClient(
Activity activity,
BlobClient blobClient,
Boolean overwrite
) {
if (overwrite == null) {
overwrite = false;
}
@ -440,9 +455,13 @@ public class BlobsTranscriptStore implements TranscriptStore {
}
private String getBlobName(Activity activity) {
String blobName = String.format("%s/%s/%s-%s.json",
sanitizeKey(activity.getChannelId()), sanitizeKey(activity.getConversation().getId()),
this.formatTicks(activity.getTimestamp()), sanitizeKey(activity.getId()));
String blobName = String.format(
"%s/%s/%s-%s.json",
sanitizeKey(activity.getChannelId()),
sanitizeKey(activity.getConversation().getId()),
this.formatTicks(activity.getTimestamp()),
sanitizeKey(activity.getId())
);
return blobName;
}
@ -459,8 +478,7 @@ public class BlobsTranscriptStore implements TranscriptStore {
private BlobContainerClient getContainerClient(String dataConnectionString, String containerName) {
containerName = containerName.toLowerCase();
containerClient = new BlobContainerClientBuilder()
.connectionString(dataConnectionString)
containerClient = new BlobContainerClientBuilder().connectionString(dataConnectionString)
.containerName(containerName)
.buildClient();
if (!CHECKED_CONTAINERS.contains(containerName)) {
@ -478,12 +496,12 @@ public class BlobsTranscriptStore implements TranscriptStore {
/**
* Formats a timestamp in a way that is consistent with the C# SDK.
*
* @param dateTime The dateTime used to get the ticks
* @return The String representing the ticks.
*/
private String formatTicks(OffsetDateTime dateTime) {
final Instant begin = ZonedDateTime.of(1, 1, 1, 0, 0, 0, 0,
ZoneOffset.UTC).toInstant();
final Instant begin = ZonedDateTime.of(1, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC).toInstant();
final Instant end = dateTime.toInstant();
long secsDiff = Math.subtractExact(end.getEpochSecond(), begin.getEpochSecond());
long totalHundredNanos = Math.multiplyExact(secsDiff, multipleProductValue);

Просмотреть файл

@ -27,8 +27,10 @@ public class AzureQueueStorage extends QueueStorage {
/**
* Initializes a new instance of the {@link AzureQueueStorage} class.
*
* @param queuesStorageConnectionString Azure Storage connection string.
* @param queueName Name of the storage queue where entities will be queued.
* @param queueName Name of the storage queue where entities
* will be queued.
*/
public AzureQueueStorage(String queuesStorageConnectionString, String queueName) {
if (StringUtils.isBlank(queuesStorageConnectionString)) {
@ -39,27 +41,32 @@ public class AzureQueueStorage extends QueueStorage {
throw new IllegalArgumentException("queueName is required.");
}
queueClient = new QueueClientBuilder()
.connectionString(queuesStorageConnectionString)
.queueName(queueName)
.buildClient();
queueClient =
new QueueClientBuilder().connectionString(queuesStorageConnectionString).queueName(queueName).buildClient();
}
/**
* Queue an Activity to an Azure.Storage.Queues.QueueClient.
* The visibility timeout specifies how long the message should be invisible
* to Dequeue and Peek operations. The message content must be a UTF-8 encoded string that is up to 64KB in size.
* @param activity This is expected to be an {@link Activity} retrieved from a call to
* activity.GetConversationReference().GetContinuationActivity().
* This enables restarting the conversation using BotAdapter.ContinueConversationAsync.
* Queue an Activity to an Azure.Storage.Queues.QueueClient. The visibility
* timeout specifies how long the message should be invisible to Dequeue and
* Peek operations. The message content must be a UTF-8 encoded string that is
* up to 64KB in size.
*
* @param activity This is expected to be an {@link Activity} retrieved
* from a call to
* activity.GetConversationReference().GetContinuationActivity().
* This enables restarting the conversation using
* BotAdapter.ContinueConversationAsync.
* @param visibilityTimeout Default value of 0. Cannot be larger than 7 days.
* @param timeToLive Specifies the time-to-live interval for the message.
* @return {@link SendMessageResult} as a Json string, from the QueueClient SendMessageAsync operation.
* @param timeToLive Specifies the time-to-live interval for the message.
* @return {@link SendMessageResult} as a Json string, from the QueueClient
* SendMessageAsync operation.
*/
@Override
public CompletableFuture<String> queueActivity(Activity activity,
@Nullable Duration visibilityTimeout,
@Nullable Duration timeToLive) {
public CompletableFuture<String> queueActivity(
Activity activity,
@Nullable Duration visibilityTimeout,
@Nullable Duration timeToLive
) {
return CompletableFuture.supplyAsync(() -> {
if (createQueueIfNotExists) {
try {
@ -68,7 +75,8 @@ public class AzureQueueStorage extends QueueStorage {
throw new RuntimeException(e);
}
// This is an optimization flag to check if the container creation call has been made.
// This is an optimization flag to check if the container creation call has been
// made.
// It is okay if this is called more than once.
createQueueIfNotExists = false;
}