Replace ADAL authentication library with MSAL (#155)

* Convert from ADAL to MSAL (include a few questions to be resolved during code review)

* Resolve some TODOs

* Some minor changes and some changes from code review

* Incorporate most review comments

* Add '/' to authority to resolve issue.

* Per Yochai's request, first try to access the token silently, and if that fails authenticate from scratch

* Make the method determineAadAuthorityUrl() less confusing

* Fix error

* Fix test
This commit is contained in:
Yihezkel Schoenbrun 2020-12-20 19:42:18 +02:00 коммит произвёл GitHub
Родитель 8e6e6ffe95
Коммит 2e05191f2a
Не найден ключ, соответствующий данной подписи
Идентификатор ключа GPG: 4AEE18F83AFDEB23
15 изменённых файлов: 386 добавлений и 385 удалений

Просмотреть файл

@ -128,11 +128,11 @@
<artifactId>nimbus-jose-jwt</artifactId>
<version>[6.0.1,9.0.0)</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.microsoft.azure/adal4j -->
<!-- https://mvnrepository.com/artifact/com.microsoft.azure/msal4j -->
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>adal4j</artifactId>
<version>${adal4j.version}</version>
<artifactId>msal4j</artifactId>
<version>${msal4j.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpclient -->
@ -162,6 +162,11 @@
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.bouncycastle</groupId>

Просмотреть файл

@ -3,300 +3,259 @@
package com.microsoft.azure.kusto.data;
import com.microsoft.aad.adal4j.*;
import com.microsoft.aad.msal4j.*;
import com.microsoft.azure.kusto.data.exceptions.DataClientException;
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.naming.ServiceUnavailableException;
import java.awt.*;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.PrivateKey;
import java.security.cert.X509Certificate;
import java.util.Date;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
class AadAuthenticationHelper {
private final static String DEFAULT_AAD_TENANT = "microsoft.com";
private final static String CLIENT_ID = "db662dc1-0cfe-4e1c-a843-19a68e65be58";
final static long MIN_ACCESS_TOKEN_VALIDITY_IN_MILLISECS = 60000;
private ClientCredential clientCredential;
private String userUsername;
private String userPassword;
private String clusterUrl;
private String aadAuthorityUri;
private X509Certificate x509Certificate;
private PrivateKey privateKey;
private AuthenticationType authenticationType;
private String accessToken;
private AuthenticationResult lastAuthenticationResult;
private Lock lastAuthenticationResultLock = new ReentrantLock();
protected static final String ORGANIZATION_URI_SUFFIX = "organizations";
// TODO: Get ClientId from CM endpoint
protected static final String CLIENT_ID = "db662dc1-0cfe-4e1c-a843-19a68e65be58";
protected static final int MIN_ACCESS_TOKEN_VALIDITY_IN_MILLISECS = 60 * 1000;
protected static final String ERROR_INVALID_AUTHORITY_URL = "Error acquiring ApplicationAccessToken due to invalid Authority URL";
protected static final String ERROR_ACQUIRING_APPLICATION_ACCESS_TOKEN = "Error acquiring ApplicationAccessToken";
private final Logger logger = LoggerFactory.getLogger(getClass());
private static final ExecutorService executor = Executors.newCachedThreadPool();
private static final int TIMEOUT_MS = 20 * 1000;
private static final int USER_PROMPT_TIMEOUT_MS = 120 * 1000;
private final String clusterUrl;
private final Set<String> scopes;
private final String scope;
private final AuthenticationType authenticationType;
private final String aadAuthorityUrl;
private String clientId;
private IClientSecret clientSecret;
private IClientCertificate clientCertificate;
private String applicationClientId;
private String userUsername;
private URI redirectUri;
private String accessToken;
private Callable<String> tokenProvider;
private IPublicClientApplication publicClientApplication;
private IConfidentialClientApplication confidentialClientApplication;
private enum AuthenticationType {
AAD_USERNAME_PASSWORD,
AAD_APPLICATION_KEY,
AAD_DEVICE_LOGIN,
AAD_APPLICATION_CERTIFICATE,
AAD_ACCESS_TOKEN,
AAD_ACCESS_TOKEN_PROVIDER
AAD_USER_PROMPT(ClientApplicationType.PUBLIC),
AAD_APPLICATION_KEY(ClientApplicationType.CONFIDENTIAL),
AAD_APPLICATION_CERTIFICATE(ClientApplicationType.CONFIDENTIAL),
AAD_ACCESS_TOKEN(ClientApplicationType.NOT_APPLICABLE),
AAD_ACCESS_TOKEN_PROVIDER(ClientApplicationType.NOT_APPLICABLE);
private final ClientApplicationType clientApplicationType;
AuthenticationType(ClientApplicationType clientApplicationType) {
this.clientApplicationType = clientApplicationType;
}
public ClientApplicationType getClientApplicationType() {
return this.clientApplicationType;
}
}
private enum ClientApplicationType {
PUBLIC,
CONFIDENTIAL,
NOT_APPLICABLE
}
AadAuthenticationHelper(@NotNull ConnectionStringBuilder csb) throws URISyntaxException {
URI clusterUri = new URI(csb.getClusterUrl());
clusterUrl = String.format("%s://%s", clusterUri.getScheme(), clusterUri.getHost());
if (StringUtils.isNotEmpty(csb.getApplicationClientId()) && StringUtils.isNotEmpty(csb.getApplicationKey())) {
clientCredential = new ClientCredential(csb.getApplicationClientId(), csb.getApplicationKey());
authenticationType = AuthenticationType.AAD_APPLICATION_KEY;
} else if (StringUtils.isNotEmpty(csb.getUserUsername()) && StringUtils.isNotEmpty(csb.getUserPassword())) {
userUsername = csb.getUserUsername();
userPassword = csb.getUserPassword();
authenticationType = AuthenticationType.AAD_USERNAME_PASSWORD;
} else if (csb.getX509Certificate() != null && csb.getPrivateKey() != null) {
x509Certificate = csb.getX509Certificate();
privateKey = csb.getPrivateKey();
applicationClientId = csb.getApplicationClientId();
authenticationType = AuthenticationType.AAD_APPLICATION_CERTIFICATE;
} else if (StringUtils.isNotBlank(csb.getAccessToken())) {
authenticationType = AuthenticationType.AAD_ACCESS_TOKEN;
accessToken = csb.getAccessToken();
} else if(csb.getTokenProvider() != null){
authenticationType = AuthenticationType.AAD_ACCESS_TOKEN_PROVIDER;
tokenProvider = csb.getTokenProvider();
} else {
authenticationType = AuthenticationType.AAD_DEVICE_LOGIN;
}
scope = String.format("%s/%s", clusterUrl, ".default");
scopes = new HashSet<>();
scopes.add(scope);
// Set the AAD Authority URI
String aadAuthorityId = (csb.getAuthorityId() == null ? DEFAULT_AAD_TENANT : csb.getAuthorityId());
String aadAuthorityFromEnv = System.getenv("AadAuthorityUri");
if (aadAuthorityFromEnv == null){
aadAuthorityUri = String.format("https://login.microsoftonline.com/%s", aadAuthorityId);
} else {
aadAuthorityUri = String.format("%s%s%s", aadAuthorityFromEnv, aadAuthorityFromEnv.endsWith("/") ? "" : "/", aadAuthorityId);
aadAuthorityUrl = determineAadAuthorityUrl(csb.getAuthorityId());
try {
if (StringUtils.isNotBlank(csb.getApplicationClientId()) && StringUtils.isNotBlank(csb.getApplicationKey())) {
clientId = csb.getApplicationClientId();
clientSecret = ClientCredentialFactory.createFromSecret(csb.getApplicationKey());
confidentialClientApplication = ConfidentialClientApplication.builder(clientId, clientSecret).authority(aadAuthorityUrl).build();
authenticationType = AuthenticationType.AAD_APPLICATION_KEY;
} else if (csb.getX509Certificate() != null && csb.getPrivateKey() != null && StringUtils.isNotBlank(csb.getApplicationClientId())) {
clientCertificate = ClientCredentialFactory.createFromCertificate(csb.getPrivateKey(), csb.getX509Certificate());
applicationClientId = csb.getApplicationClientId();
confidentialClientApplication = ConfidentialClientApplication.builder(applicationClientId, clientCertificate).authority(aadAuthorityUrl).validateAuthority(false).build();
authenticationType = AuthenticationType.AAD_APPLICATION_CERTIFICATE;
} else if (StringUtils.isNotBlank(csb.getAccessToken())) {
accessToken = csb.getAccessToken();
authenticationType = AuthenticationType.AAD_ACCESS_TOKEN;
} else if (csb.getTokenProvider() != null) {
tokenProvider = csb.getTokenProvider();
authenticationType = AuthenticationType.AAD_ACCESS_TOKEN_PROVIDER;
} else {
if (StringUtils.isNotBlank(csb.getUserUsernameHint())) {
userUsername = csb.getUserUsernameHint();
}
redirectUri = new URI("http://localhost");
publicClientApplication = PublicClientApplication.builder(CLIENT_ID).authority(aadAuthorityUrl).build();
authenticationType = AuthenticationType.AAD_USER_PROMPT;
}
} catch (MalformedURLException e) {
throw new URISyntaxException(aadAuthorityUrl, ERROR_INVALID_AUTHORITY_URL);
}
}
String acquireAccessToken() throws DataServiceException, DataClientException {
private String determineAadAuthorityUrl(String authorityId) {
if (authorityId == null) {
authorityId = ORGANIZATION_URI_SUFFIX;
}
String authorityUrl;
String aadAuthorityFromEnv = System.getenv("AadAuthorityUri");
if (aadAuthorityFromEnv == null) {
authorityUrl = String.format("https://login.microsoftonline.com/%s/", authorityId);
} else {
authorityUrl = String.format("%s%s%s/", aadAuthorityFromEnv, aadAuthorityFromEnv.endsWith("/") ? "" : "/", authorityId);
}
return authorityUrl;
}
protected String acquireAccessToken() throws DataServiceException, DataClientException {
if (authenticationType == AuthenticationType.AAD_ACCESS_TOKEN) {
return accessToken;
}
if (authenticationType == AuthenticationType.AAD_ACCESS_TOKEN_PROVIDER) {
try {
return tokenProvider.call();
Future<String> future = executor.submit(tokenProvider);
return future.get(USER_PROMPT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (Exception e) {
throw new DataClientException(clusterUrl, e.getMessage(), e);
}
}
if (lastAuthenticationResult == null) {
acquireToken();
} else if (IsInvalidToken()) {
if (lastAuthenticationResult.getRefreshToken() == null) {
acquireToken();
} else {
lastAuthenticationResultLock.lock();
try {
if (IsInvalidToken()) {
lastAuthenticationResult = acquireAccessTokenByRefreshToken();
}
} finally {
lastAuthenticationResultLock.unlock();
}
}
try {
return acquireAccessTokenSilently().accessToken();
} catch (DataServiceException ex) {
logger.error("Failed to acquire access token silently (via cache or refresh token). Attempting to get new access token.", ex);
return acquireNewAccessToken().accessToken();
}
return lastAuthenticationResult.getAccessToken();
}
private AuthenticationResult acquireAadUserAccessToken() throws DataServiceException, DataClientException {
AuthenticationContext context;
AuthenticationResult result;
ExecutorService service = null;
try {
service = Executors.newSingleThreadExecutor();
context = new AuthenticationContext(aadAuthorityUri, true, service);
protected IAuthenticationResult acquireNewAccessToken() throws DataServiceException, DataClientException {
switch (authenticationType) {
case AAD_APPLICATION_KEY:
return acquireWithAadApplicationKey();
case AAD_APPLICATION_CERTIFICATE:
return acquireWithAadApplicationClientCertificate();
case AAD_USER_PROMPT:
return acquireWithUserPrompt();
default:
throw new DataClientException(String.format("Authentication type '%s' is invalid (cluster URL '%s')", authenticationType.name(), clusterUrl));
}
}
Future<AuthenticationResult> future = context.acquireToken(
clusterUrl, CLIENT_ID, userUsername, userPassword,
null);
result = future.get();
} catch (InterruptedException | ExecutionException | MalformedURLException e) {
throw new DataClientException(clusterUrl, "Error in acquiring UserAccessToken", e);
} finally {
if (service != null) {
service.shutdown();
}
protected IAuthenticationResult acquireWithAadApplicationKey() throws DataServiceException {
IAuthenticationResult result;
try {
CompletableFuture<IAuthenticationResult> future = confidentialClientApplication.acquireToken(ClientCredentialParameters.builder(scopes).build());
result = future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (ExecutionException | TimeoutException e) {
throw new DataServiceException(clusterUrl, ERROR_ACQUIRING_APPLICATION_ACCESS_TOKEN, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new DataServiceException(clusterUrl, ERROR_ACQUIRING_APPLICATION_ACCESS_TOKEN, e);
}
if (result == null) {
throw new DataServiceException(clusterUrl, "acquireAadUserAccessToken got 'null' authentication result");
throw new DataServiceException(clusterUrl, "acquireWithAadApplicationKey got 'null' authentication result");
}
return result;
}
private AuthenticationResult acquireAadApplicationAccessToken() throws DataServiceException, DataClientException {
AuthenticationContext context;
AuthenticationResult result;
ExecutorService service = null;
protected IAuthenticationResult acquireWithAadApplicationClientCertificate() throws DataServiceException {
IAuthenticationResult result;
try {
service = Executors.newSingleThreadExecutor();
context = new AuthenticationContext(aadAuthorityUri, true, service);
Future<AuthenticationResult> future = context.acquireToken(clusterUrl, clientCredential, null);
result = future.get();
} catch (InterruptedException | ExecutionException | MalformedURLException e) {
throw new DataClientException(clusterUrl, "Error in acquiring ApplicationAccessToken", e);
} finally {
if (service != null) {
service.shutdown();
}
CompletableFuture<IAuthenticationResult> future = confidentialClientApplication.acquireToken(ClientCredentialParameters.builder(scopes).build());
result = future.get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (ExecutionException | TimeoutException e) {
throw new DataServiceException(clusterUrl, ERROR_ACQUIRING_APPLICATION_ACCESS_TOKEN, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new DataServiceException(clusterUrl, ERROR_ACQUIRING_APPLICATION_ACCESS_TOKEN, e);
}
if (result == null) {
throw new DataServiceException(clusterUrl, "acquireAadApplicationAccessToken got 'null' authentication result");
throw new DataServiceException(clusterUrl, "acquireWithAadApplicationClientCertificate got 'null' authentication result");
}
return result;
}
private AuthenticationResult acquireAccessTokenUsingDeviceCodeFlow() throws Exception {
AuthenticationContext context = null;
AuthenticationResult result = null;
ExecutorService service = null;
protected IAuthenticationResult acquireWithUserPrompt() throws DataServiceException, DataClientException {
IAuthenticationResult result;
try {
service = Executors.newSingleThreadExecutor();
context = new AuthenticationContext(aadAuthorityUri, true, service);
Future<DeviceCode> future = context.acquireDeviceCode(CLIENT_ID, clusterUrl, null);
DeviceCode deviceCode = future.get();
System.out.println(deviceCode.getMessage());
if (Desktop.isDesktopSupported()) {
Desktop.getDesktop().browse(new URI(deviceCode.getVerificationUrl()));
}
result = waitAndAcquireTokenByDeviceCode(deviceCode, context);
} finally {
if (service != null) {
service.shutdown();
}
}
if (result == null) {
throw new ServiceUnavailableException("authentication result was null");
}
return result;
}
private AuthenticationResult waitAndAcquireTokenByDeviceCode(DeviceCode deviceCode, AuthenticationContext context)
throws InterruptedException {
int timeout = 60 * 1000;
AuthenticationResult result = null;
while (timeout > 0) {
try {
Future<AuthenticationResult> futureResult = context.acquireTokenByDeviceCode(deviceCode, null);
return futureResult.get();
} catch (ExecutionException e) {
Thread.sleep(1000);
timeout -= 1000;
}
}
return result;
}
AuthenticationResult acquireWithClientCertificate()
throws InterruptedException, ExecutionException, ServiceUnavailableException {
AuthenticationContext context;
AuthenticationResult result = null;
ExecutorService service = null;
try {
service = Executors.newSingleThreadExecutor();
context = new AuthenticationContext(aadAuthorityUri, false, service);
AsymmetricKeyCredential asymmetricKeyCredential = AsymmetricKeyCredential.create(applicationClientId,
privateKey, x509Certificate);
// pass null value for optional callback function and acquire access token
result = context.acquireToken(clusterUrl, asymmetricKeyCredential, null).get();
// This is the only auth method that allows the same application to be used for multiple distinct accounts, so reset account cache between sign-ins
publicClientApplication = PublicClientApplication.builder(CLIENT_ID).authority(aadAuthorityUrl).build();
CompletableFuture<IAuthenticationResult> future = publicClientApplication.acquireToken(InteractiveRequestParameters.builder(redirectUri).scopes(scopes).loginHint(userUsername).build());
result = future.get(USER_PROMPT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
} catch (MalformedURLException e) {
e.printStackTrace();
} finally {
if (service != null) {
service.shutdown();
}
throw new DataClientException(clusterUrl, ERROR_INVALID_AUTHORITY_URL, e);
} catch (TimeoutException | ExecutionException e) {
throw new DataServiceException(clusterUrl, ERROR_ACQUIRING_APPLICATION_ACCESS_TOKEN, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new DataServiceException(clusterUrl, ERROR_ACQUIRING_APPLICATION_ACCESS_TOKEN, e);
}
if (result == null) {
throw new ServiceUnavailableException("authentication result was null");
throw new DataServiceException(clusterUrl, "acquireWithUserPrompt got 'null' authentication result");
}
return result;
}
private void acquireToken() throws DataServiceException {
lastAuthenticationResultLock.lock();
protected IAuthenticationResult acquireAccessTokenSilently() throws DataServiceException, DataClientException {
CompletableFuture<Set<IAccount>> accounts;
try {
if (IsInvalidToken()) {
switch (authenticationType) {
case AAD_APPLICATION_KEY:
lastAuthenticationResult = acquireAadApplicationAccessToken();
break;
case AAD_USERNAME_PASSWORD:
lastAuthenticationResult = acquireAadUserAccessToken();
break;
case AAD_DEVICE_LOGIN:
lastAuthenticationResult = acquireAccessTokenUsingDeviceCodeFlow();
break;
case AAD_APPLICATION_CERTIFICATE:
lastAuthenticationResult = acquireWithClientCertificate();
break;
default:
throw new DataServiceException("Authentication type: " + authenticationType.name() + " is invalid");
}
}
} catch (Exception e) {
throw new DataServiceException(e.getMessage(), e);
} finally {
lastAuthenticationResultLock.unlock();
}
}
private boolean IsInvalidToken() {
return lastAuthenticationResult == null || lastAuthenticationResult.getExpiresOnDate().before(dateInAMinute());
}
AuthenticationResult acquireAccessTokenByRefreshToken() throws DataServiceException {
AuthenticationContext context;
ExecutorService service = null;
try {
service = Executors.newSingleThreadExecutor();
context = new AuthenticationContext(aadAuthorityUri, false, service);
switch (authenticationType) {
case AAD_APPLICATION_KEY:
case AAD_APPLICATION_CERTIFICATE:
return context.acquireTokenByRefreshToken(lastAuthenticationResult.getRefreshToken(), clientCredential, null).get();
case AAD_USERNAME_PASSWORD:
case AAD_DEVICE_LOGIN:
return context.acquireTokenByRefreshToken(lastAuthenticationResult.getRefreshToken(), CLIENT_ID, clusterUrl, null).get();
switch (authenticationType.getClientApplicationType()) {
case CONFIDENTIAL:
accounts = confidentialClientApplication.getAccounts();
return confidentialClientApplication.acquireTokenSilently(getSilentParameters(accounts)).get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
case PUBLIC:
accounts = publicClientApplication.getAccounts();
return publicClientApplication.acquireTokenSilently(getSilentParameters(accounts)).get(TIMEOUT_MS, TimeUnit.MILLISECONDS);
case NOT_APPLICABLE:
throw new DataClientException("Cannot obtain a refresh token for Authentication type '" + authenticationType.name());
default:
throw new DataServiceException("Authentication type: " + authenticationType.name() + " is invalid");
}
} catch (Exception e) {
throw new DataServiceException(e.getMessage(), e);
} finally {
if (service != null) {
service.shutdown();
throw new DataClientException("Authentication type '" + authenticationType.name() + "' is invalid");
}
} catch (MalformedURLException e) {
throw new DataClientException(clusterUrl, ERROR_INVALID_AUTHORITY_URL, e);
} catch (TimeoutException | ExecutionException e) {
throw new DataServiceException(clusterUrl, ERROR_ACQUIRING_APPLICATION_ACCESS_TOKEN, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new DataServiceException(clusterUrl, ERROR_ACQUIRING_APPLICATION_ACCESS_TOKEN, e);
}
}
Date dateInAMinute() {
return new Date(System.currentTimeMillis() + MIN_ACCESS_TOKEN_VALIDITY_IN_MILLISECS);
private SilentParameters getSilentParameters(CompletableFuture<Set<IAccount>> accounts) {
IAccount account;
Set<IAccount> accountSet = accounts.join();
if (StringUtils.isNotBlank(userUsername)) {
account = accountSet.stream().filter(u -> userUsername.equalsIgnoreCase(u.username())).findAny().orElse(null);
} else {
if (accountSet.isEmpty()) {
account = null;
} else {
// Normally we would filter accounts by the user authenticating, but there's only 1 per AadAuthenticationHelper instance
account = accountSet.iterator().next();
}
}
if (account == null) {
return SilentParameters.builder(scopes).authorityUrl(aadAuthorityUrl).build();
}
return SilentParameters.builder(scopes).account(account).authorityUrl(aadAuthorityUrl).build();
}
}

Просмотреть файл

@ -6,6 +6,9 @@ package com.microsoft.azure.kusto.data;
import java.net.URISyntaxException;
public class ClientFactory {
private ClientFactory() {
// Hide the default constructor, as this is a factory with static methods
}
public static Client createClient(ConnectionStringBuilder csb) throws URISyntaxException {
return new ClientImpl(csb);

Просмотреть файл

@ -24,8 +24,8 @@ public class ClientImpl implements Client, StreamingClient {
private static final String QUERY_ENDPOINT_VERSION = "v2";
private static final String STREAMING_VERSION = "v1";
private static final String DEFAULT_DATABASE_NAME = "NetDefaultDb";
private static final Long COMMAND_TIMEOUT_IN_MILLISECS = TimeUnit.MINUTES.toMillis(10) + TimeUnit.SECONDS.toMillis(30);
private static final Long QUERY_TIMEOUT_IN_MILLISECS = TimeUnit.MINUTES.toMillis(4) + TimeUnit.SECONDS.toMillis(30);
private static final Long COMMAND_TIMEOUT_IN_MILLISECS = TimeUnit.MINUTES.toMillis(10);
private static final Long QUERY_TIMEOUT_IN_MILLISECS = TimeUnit.MINUTES.toMillis(4);
private static final Long STREAMING_INGEST_TIMEOUT_IN_MILLISECS = TimeUnit.MINUTES.toMillis(10);
private static final int CLIENT_SERVER_DELTA_IN_MILLISECS = (int) TimeUnit.SECONDS.toMillis(30);
public static final String FEDERATED_SECURITY_POSTFIX = ";fed=true";
@ -152,7 +152,7 @@ public class ClientImpl implements Client, StreamingClient {
if (timeoutMs == null) {
timeoutMs = STREAMING_INGEST_TIMEOUT_IN_MILLISECS;
}
return Utils.post(clusterEndpoint, null, stream, timeoutMs.intValue() + CLIENT_SERVER_DELTA_IN_MILLISECS, headers, leaveOpen);
return Utils.post(clusterEndpoint, null, stream, timeoutMs.intValue() + CLIENT_SERVER_DELTA_IN_MILLISECS, headers, leaveOpen);
}
private HashMap<String, String> initHeaders() throws DataServiceException, DataClientException {

Просмотреть файл

@ -11,10 +11,9 @@ import java.util.concurrent.Callable;
public class ConnectionStringBuilder {
private final static String DEFAULT_DEVICE_AUTH_TENANT = "common";
private static final String DEFAULT_DEVICE_AUTH_TENANT = "common";
private String clusterUri;
private String username;
private String password;
private String usernameHint;
private String applicationClientId;
private String applicationKey;
private X509Certificate x509Certificate;
@ -27,8 +26,7 @@ public class ConnectionStringBuilder {
private ConnectionStringBuilder(String resourceUri) {
clusterUri = resourceUri;
username = null;
password = null;
usernameHint = null;
applicationClientId = null;
applicationKey = null;
aadAuthorityId = null;
@ -46,12 +44,8 @@ public class ConnectionStringBuilder {
this.clusterUri = clusterUri;
}
String getUserUsername() {
return username;
}
String getUserPassword() {
return password;
String getUserUsernameHint() {
return usernameHint;
}
String getApplicationClientId() {
@ -98,37 +92,16 @@ public class ConnectionStringBuilder {
return tokenProvider;
}
public static ConnectionStringBuilder createWithAadUserCredentials(String resourceUri,
String username,
String password,
String authorityId) {
if (StringUtils.isEmpty(resourceUri)) {
throw new IllegalArgumentException("resourceUri cannot be null or empty");
}
if (StringUtils.isEmpty(username)) {
throw new IllegalArgumentException("username cannot be null or empty");
}
if (StringUtils.isEmpty(password)) {
throw new IllegalArgumentException("password cannot be null or empty");
}
ConnectionStringBuilder csb = new ConnectionStringBuilder(resourceUri);
csb.username = username;
csb.password = password;
csb.aadAuthorityId = authorityId;
return csb;
}
public static ConnectionStringBuilder createWithAadUserCredentials(String resourceUri,
String username,
String password) {
return createWithAadUserCredentials(resourceUri, username, password, null);
public static ConnectionStringBuilder createWithAadApplicationCredentials(String resourceUri,
String applicationClientId,
String applicationKey) {
return createWithAadApplicationCredentials(resourceUri, applicationClientId, applicationKey, null);
}
public static ConnectionStringBuilder createWithAadApplicationCredentials(String resourceUri,
String applicationClientId,
String applicationKey,
String authorityId) {
if (StringUtils.isEmpty(resourceUri)) {
throw new IllegalArgumentException("resourceUri cannot be null or empty");
}
@ -146,32 +119,32 @@ public class ConnectionStringBuilder {
return csb;
}
public static ConnectionStringBuilder createWithAadApplicationCredentials(String resourceUri,
String applicationClientId,
String applicationKey) {
return createWithAadApplicationCredentials(resourceUri, applicationClientId, applicationKey, null);
public static ConnectionStringBuilder createWithUserPrompt(String resourceUri) {
return createWithUserPrompt(resourceUri, DEFAULT_DEVICE_AUTH_TENANT, null);
}
public static ConnectionStringBuilder createWithDeviceCodeCredentials(String resourceUri, String authorityId) {
public static ConnectionStringBuilder createWithUserPrompt(String resourceUri, String usernameHint) {
return createWithUserPrompt(resourceUri, DEFAULT_DEVICE_AUTH_TENANT, usernameHint);
}
public static ConnectionStringBuilder createWithUserPrompt(String resourceUri, String authorityId, String usernameHint) {
if (StringUtils.isEmpty(resourceUri)) {
throw new IllegalArgumentException("resourceUri cannot be null or empty");
}
ConnectionStringBuilder csb = new ConnectionStringBuilder(resourceUri);
csb.aadAuthorityId = authorityId;
csb.usernameHint = usernameHint;
return csb;
}
public static ConnectionStringBuilder createWithDeviceCodeCredentials(String resourceUri) {
return createWithDeviceCodeCredentials(resourceUri, DEFAULT_DEVICE_AUTH_TENANT);
}
public static ConnectionStringBuilder createWithAadApplicationCertificate(String resourceUri,
String applicationClientId,
X509Certificate x509Certificate,
PrivateKey privateKey) {
return createWithAadApplicationCertificate(resourceUri, applicationClientId, x509Certificate, privateKey, null);
}
public static ConnectionStringBuilder createWithAadApplicationCertificate(String resourceUri,
String applicationClientId,
X509Certificate x509Certificate,

Просмотреть файл

@ -32,7 +32,8 @@ public interface StreamingClient {
/**
* <p>Execute the provided command against the default database.</p>
*
* @param command The command to execute
* @param command The command to execute
* @return {@link KustoOperationResult} object including the ingestion result
* @throws DataClientException An exception originating from a client activity
* @throws DataServiceException An exception returned from the service
*/

Просмотреть файл

@ -6,7 +6,9 @@ package com.microsoft.azure.kusto.data.exceptions;
public class DataClientException extends Exception {
private String ingestionSource;
public String getIngestionSource() { return ingestionSource; }
public String getIngestionSource() {
return ingestionSource;
}
public DataClientException(String message) {
super(message);

Просмотреть файл

@ -3,8 +3,9 @@
package com.microsoft.azure.kusto.data;
import com.microsoft.aad.adal4j.AuthenticationResult;
import com.microsoft.aad.adal4j.UserInfo;
import com.microsoft.aad.msal4j.IAccount;
import com.microsoft.aad.msal4j.IAuthenticationResult;
import com.microsoft.aad.msal4j.ITenantProfile;
import com.microsoft.azure.kusto.data.exceptions.DataClientException;
import com.microsoft.azure.kusto.data.exceptions.DataServiceException;
import org.bouncycastle.asn1.pkcs.PrivateKeyInfo;
@ -31,23 +32,21 @@ import java.security.PrivateKey;
import java.security.Security;
import java.security.cert.CertificateException;
import java.security.cert.X509Certificate;
import java.util.Collections;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import javax.naming.ServiceUnavailableException;
import java.util.concurrent.TimeoutException;
import static com.microsoft.azure.kusto.data.AadAuthenticationHelper.MIN_ACCESS_TOKEN_VALIDITY_IN_MILLISECS;
import static org.mockito.Mockito.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.*;
public class AadAuthenticationHelperTest {
@Test
@DisplayName("validate auth with certificate throws exception when missing or invalid parameters")
void acquireWithClientCertificateNullKey() throws CertificateException, OperatorCreationException,
PKCSException, IOException, URISyntaxException {
String certFilePath = Paths.get("src", "test", "resources", "cert.cer").toString();
String privateKeyPath = Paths.get("src", "test", "resources", "key.pem").toString();
@ -59,15 +58,15 @@ public class AadAuthenticationHelperTest {
AadAuthenticationHelper aadAuthenticationHelper = new AadAuthenticationHelper(csb);
Assertions.assertThrows(ExecutionException.class,
() -> aadAuthenticationHelper.acquireWithClientCertificate());
Assertions.assertThrows(DataServiceException.class,
() -> aadAuthenticationHelper.acquireWithAadApplicationClientCertificate());
}
static KeyCert readPem(String path, String password)
throws IOException, CertificateException, OperatorCreationException, PKCSException {
Security.addProvider(new BouncyCastleProvider());
PEMParser pemParser = new PEMParser(new FileReader(new File(path)));
PEMParser pemParser = new PEMParser(new FileReader(path));
PrivateKey privateKey = null;
X509Certificate cert = null;
Object object = pemParser.readObject();
@ -98,7 +97,7 @@ public class AadAuthenticationHelperTest {
@Test
@DisplayName("validate cached token. Refresh if needed. Call regularly if no refresh token")
void useCachedTokenAndRefreshWhenNeeded() throws InterruptedException, ExecutionException, ServiceUnavailableException, IOException, DataServiceException, URISyntaxException, CertificateException, OperatorCreationException, PKCSException, DataClientException {
void useCachedTokenAndRefreshWhenNeeded() throws IOException, DataServiceException, URISyntaxException, CertificateException, OperatorCreationException, PKCSException, DataClientException {
String certFilePath = Paths.get("src", "test", "resources", "cert.cer").toString();
String privateKeyPath = Paths.get("src", "test", "resources", "key.pem").toString();
@ -110,25 +109,107 @@ public class AadAuthenticationHelperTest {
AadAuthenticationHelper aadAuthenticationHelperSpy = spy(new AadAuthenticationHelper(csb));
AuthenticationResult authenticationResult = new AuthenticationResult("testType", "firstToken", "refreshToken", 0, "id", mock(UserInfo.class), false);
AuthenticationResult authenticationResultFromRefresh = new AuthenticationResult("testType", "fromRefresh", null, 90, "id", mock(UserInfo.class), false);
AuthenticationResult authenticationResultNullRefreshTokenResult = new AuthenticationResult("testType", "nullRefreshResult", null, 0, "id", mock(UserInfo.class), false);
IAuthenticationResult authenticationResult = new MockAuthenticationResult("firstToken", "firstToken", new MockAccount("homeAccountId", "environment", "username", Collections.emptyMap()), "environment", "environment", new Date());
IAuthenticationResult authenticationResultFromRefresh = new MockAuthenticationResult("fromRefresh", "fromRefresh", new MockAccount("homeAccountId", "environment", "username", Collections.emptyMap()), "environment", "environment", new Date());
IAuthenticationResult authenticationResultNullRefreshTokenResult = new MockAuthenticationResult("nullRefreshResult", "nullRefreshResult", new MockAccount("homeAccountId", "environment", "username", Collections.emptyMap()), "environment", "environment", new Date());
doReturn(authenticationResultFromRefresh).when(aadAuthenticationHelperSpy).acquireAccessTokenByRefreshToken();
doReturn(authenticationResult).when(aadAuthenticationHelperSpy).acquireWithClientCertificate();
doThrow(DataServiceException.class).when(aadAuthenticationHelperSpy).acquireAccessTokenSilently();
doReturn(authenticationResult).when(aadAuthenticationHelperSpy).acquireWithAadApplicationClientCertificate();
assertEquals("firstToken", aadAuthenticationHelperSpy.acquireAccessToken());
doReturn(authenticationResultFromRefresh).when(aadAuthenticationHelperSpy).acquireAccessTokenSilently();
// Token was passed as expired - expected to be refreshed
assertEquals("fromRefresh", aadAuthenticationHelperSpy.acquireAccessToken());
// Token is still valid - expected to return the same
assertEquals("fromRefresh", aadAuthenticationHelperSpy.acquireAccessToken());
doReturn(new Date(System.currentTimeMillis() + MIN_ACCESS_TOKEN_VALIDITY_IN_MILLISECS * 2)).when(aadAuthenticationHelperSpy).dateInAMinute();
doReturn(authenticationResultNullRefreshTokenResult).when(aadAuthenticationHelperSpy).acquireWithClientCertificate();
doReturn(authenticationResultNullRefreshTokenResult).when(aadAuthenticationHelperSpy).acquireWithAadApplicationClientCertificate();
// Null refresh token + token is now expired- expected to authenticate again and reacquire token
assertEquals("nullRefreshResult", aadAuthenticationHelperSpy.acquireAccessToken());
assertEquals("fromRefresh", aadAuthenticationHelperSpy.acquireAccessToken());
}
class MockAccount implements IAccount {
private final String homeAccountId;
private final String environment;
private final String username;
private final Map<String, ITenantProfile> getTenantProfiles;
public MockAccount(String homeAccountId, String environment, String username, Map<String, ITenantProfile> getTenantProfiles) {
this.homeAccountId = homeAccountId;
this.environment = environment;
this.username = username;
this.getTenantProfiles = getTenantProfiles;
}
@Override
public String homeAccountId() {
return homeAccountId;
}
@Override
public String environment() {
return environment;
}
@Override
public String username() {
return username;
}
@Override
public Map<String, ITenantProfile> getTenantProfiles() {
return getTenantProfiles;
}
}
class MockAuthenticationResult implements IAuthenticationResult {
private final String accessToken;
private final String idToken;
private final MockAccount account;
private final String environment;
private final String scopes;
private final Date expiresOnDate;
public MockAuthenticationResult(String accessToken, String idToken, MockAccount account, String environment, String scopes, Date expiresOnDate) {
this.accessToken = accessToken;
this.idToken = idToken;
this.account = account;
this.environment = environment;
this.scopes = scopes;
this.expiresOnDate = expiresOnDate;
}
@Override
public String accessToken() {
return accessToken;
}
@Override
public String idToken() {
return idToken;
}
@Override
public IAccount account() {
return account;
}
@Override
public String environment() {
return environment;
}
@Override
public String scopes() {
return scopes;
}
@Override
public Date expiresOnDate() {
return expiresOnDate;
}
}
}

Просмотреть файл

@ -19,37 +19,9 @@ import java.security.cert.X509Certificate;
import static com.microsoft.azure.kusto.data.AadAuthenticationHelperTest.readPem;
public class ConnectionStringBuilderTest {
@Test
@DisplayName("validate createWithAadUserCredentials throws IllegalArgumentException exception when missing or invalid parameters")
void createWithAadUserCredentials(){
//nullOrEmpty username
Assertions.assertThrows(IllegalArgumentException.class,
() -> ConnectionStringBuilder
.createWithAadUserCredentials("resource.uri", null, "password"));
Assertions.assertThrows(IllegalArgumentException.class,
() -> ConnectionStringBuilder
.createWithAadUserCredentials("resource.uri", "", "password"));
//nullOrEmpty password
Assertions.assertThrows(IllegalArgumentException.class,
() -> ConnectionStringBuilder
.createWithAadUserCredentials("resource.uri", "username", null));
Assertions.assertThrows(IllegalArgumentException.class,
() -> ConnectionStringBuilder
.createWithAadUserCredentials("resource.uri", "username", ""));
//nullOrEmpty resourceUri
Assertions.assertThrows(IllegalArgumentException.class,
() -> ConnectionStringBuilder
.createWithAadUserCredentials(null, "username", "password"));
Assertions.assertThrows(IllegalArgumentException.class,
() -> ConnectionStringBuilder
.createWithAadUserCredentials("", "username", "password"));
}
@Test
@DisplayName("validate createWithAadApplicationCredentials throws IllegalArgumentException exception when missing or invalid parameters")
void createWithAadApplicationCredentials(){
void createWithAadApplicationCredentials() {
//nullOrEmpty appId
Assertions.assertThrows(IllegalArgumentException.class,
@ -75,16 +47,15 @@ public class ConnectionStringBuilderTest {
}
@Test
@DisplayName("validate createWithDeviceCodeCredentials throws IllegalArgumentException exception when missing or invalid parameters")
void createWithDeviceCodeCredentials(){
@DisplayName("validate createWithUserPrompt throws IllegalArgumentException exception when missing or invalid parameters")
void createWithUserPrompt() {
//nullOrEmpty resourceUri
Assertions.assertThrows(IllegalArgumentException.class,
() -> ConnectionStringBuilder
.createWithDeviceCodeCredentials(null));
.createWithUserPrompt(null, null));
Assertions.assertThrows(IllegalArgumentException.class,
() -> ConnectionStringBuilder
.createWithDeviceCodeCredentials(""));
.createWithUserPrompt("", ""));
}
@Test
@ -92,8 +63,8 @@ public class ConnectionStringBuilderTest {
void createWithAadApplicationCertificate() throws CertificateException, OperatorCreationException,
PKCSException, IOException {
String certFilePath = Paths.get("src","test","resources", "cert.cer").toString();
String privateKeyPath = Paths.get("src","test","resources","key.pem").toString();
String certFilePath = Paths.get("src", "test", "resources", "cert.cer").toString();
String privateKeyPath = Paths.get("src", "test", "resources", "key.pem").toString();
X509Certificate x509Certificate = readPem(certFilePath, "basic").getCertificate();
PrivateKey privateKey = readPem(privateKeyPath, "basic").getKey();
@ -125,27 +96,27 @@ public class ConnectionStringBuilderTest {
@Test
@DisplayName("validate createWithAadAccessTokenAuthentication throws IllegalArgumentException exception when missing or invalid parameters")
void createWithAadAccessTokenAuthentication(){
void createWithAadAccessTokenAuthentication() {
Assertions.assertThrows(IllegalArgumentException.class,
() -> ConnectionStringBuilder
.createWithAadAccessTokenAuthentication(null, "token"));
Assertions.assertThrows(IllegalArgumentException.class,
() -> ConnectionStringBuilder
.createWithAadAccessTokenAuthentication("","token"));
.createWithAadAccessTokenAuthentication("", "token"));
Assertions.assertThrows(IllegalArgumentException.class,
() -> ConnectionStringBuilder
.createWithAadAccessTokenAuthentication("resource.uri", null));
Assertions.assertThrows(IllegalArgumentException.class,
() -> ConnectionStringBuilder
.createWithAadAccessTokenAuthentication("resource.uri",""));
Assertions.assertDoesNotThrow( () -> ConnectionStringBuilder
.createWithAadAccessTokenAuthentication("resource.uri","token"));
.createWithAadAccessTokenAuthentication("resource.uri", ""));
Assertions.assertDoesNotThrow(() -> ConnectionStringBuilder
.createWithAadAccessTokenAuthentication("resource.uri", "token"));
}
@Test
@DisplayName("validate createWithAadTokenProviderAuthentication throws IllegalArgumentException exception when missing or invalid parameters")
void createWithAadTokenProviderAuthentication(){
void createWithAadTokenProviderAuthentication() {
Assertions.assertThrows(IllegalArgumentException.class,
() -> ConnectionStringBuilder
@ -156,7 +127,7 @@ public class ConnectionStringBuilderTest {
Assertions.assertThrows(IllegalArgumentException.class,
() -> ConnectionStringBuilder
.createWithAadTokenProviderAuthentication("resource.uri", null));
Assertions.assertDoesNotThrow( () -> ConnectionStringBuilder
Assertions.assertDoesNotThrow(() -> ConnectionStringBuilder
.createWithAadTokenProviderAuthentication("resource.uri", () -> "token"));
}

Просмотреть файл

@ -4,10 +4,14 @@
package com.microsoft.azure.kusto.ingest;
import com.microsoft.azure.kusto.data.ConnectionStringBuilder;
import com.microsoft.azure.kusto.data.exceptions.DataClientException;
import java.net.URISyntaxException;
public class IngestClientFactory {
private IngestClientFactory() {
// Hide the default constructor, as this is a factory with static methods
}
public static IngestClient createClient(ConnectionStringBuilder csb) throws URISyntaxException {
return new QueuedIngestClient(csb);

Просмотреть файл

@ -12,7 +12,6 @@ import com.microsoft.azure.kusto.ingest.source.BlobSourceInfo;
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
import com.microsoft.azure.kusto.ingest.source.ResultSetSourceInfo;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import org.json.JSONException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -34,7 +33,7 @@ import java.net.URISyntaxException;
*/
public class ManagedStreamingIngestClient implements IngestClient {
private final static Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final int MAX_RETRY_CALLS = 3;
private final QueuedIngestClient queuedIngestClient;
private final StreamingIngestClient streamingIngestClient;
@ -68,7 +67,6 @@ public class ManagedStreamingIngestClient implements IngestClient {
log.error("File not found when ingesting a file.", e);
throw new IngestionClientException("IO exception - check file path.", e);
}
}
/**
@ -122,23 +120,20 @@ public class ManagedStreamingIngestClient implements IngestClient {
for (int i = 0; i < MAX_RETRY_CALLS; i++) {
try {
return streamingIngestClient.ingestFromStream(streamSourceInfo, ingestionProperties);
}
catch (Exception e) {
} catch (Exception e) {
if (e instanceof IngestionServiceException
&& e.getCause() != null
&& e.getCause() instanceof DataServiceException
&& e.getCause() instanceof DataServiceException
&& e.getCause().getCause() != null
&& e.getCause().getCause() instanceof DataWebException) {
DataWebException webException = (DataWebException) e.getCause().getCause();
try {
OneApiError oneApiError = webException.getApiError();
if (oneApiError.isPermanent())
{
if (oneApiError.isPermanent()) {
log.error("Error is permanent, stopping.");
throw e;
}
} catch (JSONException je)
{
} catch (JSONException je) {
log.info("Failed to parse json in exception, continuing.", je);
}
}

Просмотреть файл

@ -15,10 +15,7 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Timer;
import java.util.TimerTask;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -59,8 +56,8 @@ class ResourceManager implements Closeable {
private final Timer timer;
private ReadWriteLock ingestionResourcesLock = new ReentrantReadWriteLock();
private ReadWriteLock authTokenLock = new ReentrantReadWriteLock();
private static final long REFRESH_INGESTION_RESOURCES_PERIOD = 1000 * 60 * 60; // 1 hour
private static final long REFRESH_INGESTION_RESOURCES_PERIOD_ON_FAILURE = 1000 * 60 * 15; // 15 minutes
private static final long REFRESH_INGESTION_RESOURCES_PERIOD = 1000L * 60 * 60; // 1 hour
private static final long REFRESH_INGESTION_RESOURCES_PERIOD_ON_FAILURE = 1000L * 60 * 15; // 15 minutes
private Long defaultRefreshTime;
private Long refreshTimeOnFailure;
public static final String SERVICE_TYPE_COLUMN_NAME = "ServiceType";
@ -186,7 +183,7 @@ class ResourceManager implements Closeable {
}
}
private void putIngestionResourceValues(ConcurrentHashMap<ResourceType, IngestionResource> ingestionResources, HashMap<ResourceType, IngestionResource> newIngestionResources) {
private void putIngestionResourceValues(ConcurrentHashMap<ResourceType, IngestionResource> ingestionResources, Map<ResourceType, IngestionResource> newIngestionResources) {
// Update the values in the original resources map:
newIngestionResources.keySet().forEach(
k -> ingestionResources.put(k, newIngestionResources.get(k))

Просмотреть файл

@ -19,7 +19,6 @@ import com.microsoft.azure.kusto.ingest.source.ResultSetSourceInfo;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.utils.URIBuilder;
import org.slf4j.Logger;

Просмотреть файл

@ -3,18 +3,29 @@
package com.microsoft.azure.kusto.ingest;
import com.microsoft.azure.kusto.data.*;
import com.microsoft.azure.kusto.ingest.IngestionMapping.*;
import com.microsoft.azure.kusto.data.ClientImpl;
import com.microsoft.azure.kusto.data.ConnectionStringBuilder;
import com.microsoft.azure.kusto.data.KustoOperationResult;
import com.microsoft.azure.kusto.data.KustoResultSetTable;
import com.microsoft.azure.kusto.ingest.IngestionMapping.IngestionMappingKind;
import com.microsoft.azure.kusto.ingest.IngestionProperties.DATA_FORMAT;
import com.microsoft.azure.kusto.ingest.source.*;
import org.junit.jupiter.api.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
import com.microsoft.azure.kusto.ingest.source.CompressionType;
import com.microsoft.azure.kusto.ingest.source.FileSourceInfo;
import com.microsoft.azure.kusto.ingest.source.StreamSourceInfo;
import org.apache.commons.codec.binary.Base64;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.io.*;
import java.net.URISyntaxException;
import java.nio.file.*;
import java.util.*;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class E2ETest {
private static IngestClient ingestClient;
@ -68,7 +79,7 @@ public class E2ETest {
Assertions.fail("Failed to drop and create new table", ex);
}
resourcesPath = Paths.get(System.getProperty("user.dir"), "src","test", "resources").toString();
resourcesPath = Paths.get(System.getProperty("user.dir"), "src", "test", "resources").toString();
try {
String mappingAsString = new String(Files.readAllBytes(Paths.get(resourcesPath, "dataset_mapping.json")));
queryClient.execute(databaseName, String.format(".create table %s ingestion json mapping '%s' '%s'",
@ -94,7 +105,7 @@ public class E2ETest {
first.setPath("$.rownumber");
ColumnMapping second = new ColumnMapping("rowguid", "string");
second.setPath("$.rowguid");
ColumnMapping[] columnMapping = new ColumnMapping[] { first, second };
ColumnMapping[] columnMapping = new ColumnMapping[]{first, second};
ingestionPropertiesWithColumnMapping.setIngestionMapping(columnMapping, IngestionMappingKind.Json);
dataForTests = Arrays.asList(new TestDataItem() {
@ -139,7 +150,7 @@ public class E2ETest {
}
private void assertRowCount(int expectedRowsCount) {
KustoOperationResult result = null;
KustoOperationResult result;
int timeoutInSec = 100;
int actualRowsCount = 0;
@ -172,20 +183,20 @@ public class E2ETest {
} catch (Exception ex) {
Assertions.fail("Failed to execute show database principal command", ex);
}
KustoResultSetTable mainTableResultSet= result.getPrimaryResults();
while (mainTableResultSet.next()){
KustoResultSetTable mainTableResultSet = result.getPrimaryResults();
while (mainTableResultSet.next()) {
if (mainTableResultSet.getString("PrincipalFQN").equals(principalFqn)) {
found = true;
}
}
Assertions.assertTrue(found, "Faile to find authorized AppId in the database principals");
Assertions.assertTrue(found, "Failed to find authorized AppId in the database principals");
}
@Test
void testIngestFromFile() {
for (TestDataItem item : dataForTests) {
FileSourceInfo fileSourceInfo = new FileSourceInfo(item.file.getPath(),item.file.length());
FileSourceInfo fileSourceInfo = new FileSourceInfo(item.file.getPath(), item.file.length());
try {
ingestClient.ingestFromFile(fileSourceInfo, item.ingestionProperties);
} catch (Exception ex) {
@ -213,10 +224,10 @@ public class E2ETest {
}
@Test
void testStramingIngestFromFile() {
void testStreamingIngestFromFile() {
for (TestDataItem item : dataForTests) {
if (item.testOnstreamingIngestion) {
FileSourceInfo fileSourceInfo = new FileSourceInfo(item.file.getPath(),item.file.length());
FileSourceInfo fileSourceInfo = new FileSourceInfo(item.file.getPath(), item.file.length());
try {
streamingIngestClient.ingestFromFile(fileSourceInfo, item.ingestionProperties);
} catch (Exception ex) {
@ -228,7 +239,7 @@ public class E2ETest {
}
@Test
void testStramingIngestFromStream() throws FileNotFoundException {
void testStreamingIngestFromStream() throws FileNotFoundException {
for (TestDataItem item : dataForTests) {
if (item.testOnstreamingIngestion) {
InputStream stream = new FileInputStream(item.file);

Просмотреть файл

@ -40,7 +40,7 @@
<!-- Compile dependencies -->
<slf4j.version>1.8.0-beta4</slf4j.version>
<commons-lang3.version>3.9</commons-lang3.version>
<adal4j.version>1.6.5</adal4j.version>
<msal4j.version>1.7.0</msal4j.version>
<httpclient.version>4.5.8</httpclient.version>
<httpcore.version>4.4.11</httpcore.version>
<json.version>20190722</json.version>