Merge branch 'dev' into updateRelease

Annie Liang 2024-02-28 11:33:15 -08:00 committed by GitHub
Parents f6ae7652be 00ee1f020a
Commit 9eeaa5ea16
No key found matching this signature
GPG key ID: B5690EEEBB952194
4 changed files with 78 additions and 18 deletions

View File: CHANGELOG.md

@@ -1,6 +1,22 @@
 ## Release History
-### 1.13.0-beta.1 (Unreleased)
+### 1.14.0 (2024-02-28)
+#### New Features
+* Updated `azure-cosmos` version to 4.56.0.
+#### Key Bug Fixes
+* Fixed a `NullPointerException` in `CosmosDBSinkConnector` that occurred when `TRACE` level logging was enabled and the `SinkRecord` value was null. [PR 549](https://github.com/microsoft/kafka-connect-cosmosdb/pull/549)
+#### Other Changes
+* Added more DEBUG level logs in `CosmosDBSourceConnector`. [PR 549](https://github.com/microsoft/kafka-connect-cosmosdb/pull/549)
 ### 1.13.0 (2024-01-25)
 #### New Features
 * Updated `azure-cosmos` version to 4.54.0.
 #### Key Bug Fixes
 * Upgraded `com.jayway.jsonpath:json-path` from 2.8.0 to 2.9.0 to address the security vulnerability [CVE-2023-51074](https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2023-51074). [PR 544](https://github.com/microsoft/kafka-connect-cosmosdb/pull/544)
 * Fixed an issue where the source connector could get stuck in an infinite loop when its task was cancelled. [PR 545](https://github.com/microsoft/kafka-connect-cosmosdb/pull/545)
 ### 1.12.0 (2023-12-18)

View File: pom.xml

@@ -7,7 +7,7 @@
     <groupId>com.azure.cosmos.kafka</groupId>
     <artifactId>kafka-connect-cosmos</artifactId>
-    <version>1.13.0-beta.1</version>
+    <version>1.14.0</version>
     <name> kafka-connect-cosmos</name>
     <url>https://github.com/microsoft/kafka-connect-cosmosdb</url>
@@ -48,7 +48,7 @@
         <dependency>
             <groupId>com.azure</groupId>
             <artifactId>azure-cosmos</artifactId>
-            <version>4.53.1</version>
+            <version>4.56.0</version>
         </dependency>
         <dependency>
             <groupId>com.jayway.jsonpath</groupId>

View File: CosmosDBSinkTask.java

@@ -104,9 +104,14 @@ public class CosmosDBSinkTask extends SinkTask {
         if (record.key() != null) {
             MDC.put(String.format("CosmosDbSink-%s", containerName), record.key().toString());
         }
-        logger.trace("Writing record, value type: {}", record.value().getClass().getName());
         logger.trace("Key Schema: {}", record.keySchema());
-        logger.trace("Value schema: {}", record.valueSchema());
+        if (record.value() != null) {
+            logger.trace("Writing record, value type: {}", record.value().getClass().getName());
+            logger.trace("Value schema: {}", record.valueSchema());
+        } else {
+            logger.trace("Record value is null");
+        }
         Object recordValue;
         if (record.value() instanceof Struct) {
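
For context on the fix above: Kafka delete events (tombstones) carry a null value by design, so the old unconditional `record.value().getClass()` call in the TRACE statement threw the `NullPointerException` described in the changelog. A minimal, self-contained sketch of the failure mode and the guard, built with the public `SinkRecord` constructor (the topic name, key, and offset here are purely illustrative):

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.sink.SinkRecord;

    public class TombstoneTraceDemo {
        public static void main(String[] args) {
            // A tombstone record: key present, value and value schema both null.
            SinkRecord tombstone = new SinkRecord(
                    "hotels", 0,                      // topic, partition (illustrative)
                    Schema.STRING_SCHEMA, "doc-1",    // key schema, key
                    null, null,                       // value schema, value: null by design
                    42L);                             // offset (illustrative)

            // The pre-fix code effectively did this, which throws NullPointerException:
            // tombstone.value().getClass().getName()

            // The patched task checks first:
            if (tombstone.value() != null) {
                System.out.println("value type: " + tombstone.value().getClass().getName());
            } else {
                System.out.println("Record value is null");
            }
        }
    }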

View File: CosmosDBSourceTask.java

@@ -35,6 +35,7 @@ import java.util.Map;
 import java.util.concurrent.LinkedTransferQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 import static java.lang.Thread.sleep;
 import static java.util.Collections.singletonMap;
@@ -62,11 +63,11 @@ public class CosmosDBSourceTask extends SourceTask {
     @Override
     public void start(Map<String, String> map) {
-        logger.info("Starting CosmosDBSourceTask.");
+        logger.info("Worker {} Starting CosmosDBSourceTask.", this.config.getWorkerName());
         config = new CosmosDBSourceConfig(map);
         this.queue = new LinkedTransferQueue<>();
-        logger.info("Creating the client.");
+        logger.info("Worker {} Creating the client.", this.config.getWorkerName());
         client = getCosmosClient(config);
         // Initialize the database, feed and lease containers
@@ -101,7 +102,7 @@
             }
         } // Wait for ChangeFeedProcessor to start.
-        logger.info("Started CosmosDB source task.");
+        logger.info("Worker {} Started CosmosDB source task.", this.config.getWorkerName());
     }

     private String getItemLsn(JsonNode item) {
@@ -121,11 +122,27 @@
         while (running.get()) {
             fillRecords(records, topic);
             if (records.isEmpty() || System.currentTimeMillis() > maxWaitTime) {
-                logger.info("Sending {} documents.", records.size());
                 break;
             }
         }
+        logger.info("Worker {}, Sending {} documents.", this.config.getWorkerName(), records.size());
+        if (logger.isDebugEnabled()) {
+            List<String> recordDetails =
+                records
+                    .stream()
+                    .map(sourceRecord -> String.format("[key %s - offset %s]", sourceRecord.key(), sourceRecord.sourceOffset()))
+                    .collect(Collectors.toList());
+            logger.debug(
+                "Worker {}, sending {} documents",
+                this.config.getWorkerName(),
+                recordDetails
+            );
+        }
+        logger.debug("Worker {}, shouldFillMoreRecords {}", this.config.getWorkerName(), true);
         this.shouldFillMoreRecords.set(true);
         return records;
     }
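
The `isDebugEnabled()` check above is the standard SLF4J pattern for log statements whose arguments are expensive to build: the stream that formats a `[key - offset]` string per record walks the whole batch, so that cost should only be paid when DEBUG output will actually be written. A self-contained sketch of the same pattern (the class and method names here are illustrative, not from the connector):

    import java.util.List;
    import java.util.stream.Collectors;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class GuardedDebugSketch {
        private static final Logger logger = LoggerFactory.getLogger(GuardedDebugSketch.class);

        static void logBatch(List<String> keys) {
            // Formatting a detail string per record walks the whole batch,
            // so skip the work entirely unless DEBUG is enabled.
            if (logger.isDebugEnabled()) {
                String details = keys.stream()
                        .map(key -> String.format("[key %s]", key))
                        .collect(Collectors.joining(", "));
                logger.debug("sending {} documents: {}", keys.size(), details);
            }
        }

        public static void main(String[] args) {
            logBatch(List.of("doc-1", "doc-2", "doc-3"));
        }
    }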
@@ -155,9 +172,6 @@
             // Get the latest lsn and record as offset
             Map<String, Object> sourceOffset = singletonMap(OFFSET_KEY, getItemLsn(node));
-            if (logger.isDebugEnabled()) {
-                logger.debug("Latest offset is {}.", sourceOffset.get(OFFSET_KEY));
-            }
             // Convert JSON to Kafka Connect struct and JSON schema
             SchemaAndValue schemaAndValue = jsonToStruct.recordToSchemaAndValue(node);
@ -176,13 +190,16 @@ public class CosmosDBSourceTask extends SourceTask {
} else {
// If the buffer Size exceeds then do not remove the node.
if (logger.isDebugEnabled()) {
logger.debug("Adding record back to the queue since adding it exceeds the allowed buffer size {}", config.getTaskBufferSize());
logger.debug(
"Worker {} Adding record back to the queue since adding it exceeds the allowed buffer size {}",
this.config.getWorkerName(),
config.getTaskBufferSize());
}
this.queue.add(node);
break;
}
} catch (Exception e) {
logger.error("Failed to fill Source Records for Topic {}", topic);
logger.error("Worker {} Failed to fill Source Records for Topic {}", this.config.getWorkerName(), topic);
throw e;
}
}
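
The push-back above is a simple form of backpressure: when the next converted record would overflow the configured task buffer (`config.getTaskBufferSize()`), the raw node is returned to the queue and the batch is cut short, leaving the record for the next `poll()` cycle. A minimal sketch of the idea, where `byteSize()` and the tiny budget are hypothetical stand-ins for the connector's real record sizing and task buffer setting:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.LinkedTransferQueue;

    public class BufferBackpressureSketch {
        // Hypothetical stand-in for the connector's record sizing.
        static int byteSize(String doc) {
            return doc.getBytes().length;
        }

        public static void main(String[] args) {
            LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();
            queue.add("{\"id\":\"a\"}");
            queue.add("{\"id\":\"b\"}");

            int bufferBudget = 10; // deliberately tiny stand-in for the task buffer size
            int used = 0;
            List<String> batch = new ArrayList<>();

            String doc;
            while ((doc = queue.poll()) != null) {
                if (used + byteSize(doc) > bufferBudget) {
                    queue.add(doc); // push back; it is picked up on the next cycle
                    break;
                }
                used += byteSize(doc);
                batch.add(doc);
            }
            System.out.println("batched " + batch.size() + ", pushed back " + queue.size());
        }
    }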
@@ -190,7 +207,7 @@
     @Override
     public void stop() {
-        logger.info("Stopping CosmosDB source task.");
+        logger.info("Worker {} Stopping CosmosDB source task.", this.config.getWorkerName());
         // NOTE: poll() method and stop() method are both called from the same thread,
         // so it is important not to include any changes which may block both places forever
         running.set(false);
@@ -200,10 +217,14 @@
             changeFeedProcessor.stop().block();
             changeFeedProcessor = null;
         }
+        if (this.client != null) {
+            this.client.close();
+        }
     }

     private CosmosAsyncClient getCosmosClient(CosmosDBSourceConfig config) {
-        logger.info("Creating Cosmos Client.");
+        logger.info("Worker {} Creating Cosmos Client.", this.config.getWorkerName());
         CosmosClientBuilder cosmosClientBuilder = new CosmosClientBuilder()
             .endpoint(config.getConnEndpoint())
@@ -211,7 +232,7 @@
             .consistencyLevel(ConsistencyLevel.SESSION)
             .contentResponseOnWriteEnabled(true)
             .connectionSharingAcrossClientsEnabled(config.isConnectionSharingEnabled())
-            .userAgentSuffix(CosmosDBConfig.COSMOS_CLIENT_USER_AGENT_SUFFIX + version());
+            .userAgentSuffix(getUserAgentSuffix());

         if (config.isGatewayModeEnabled()) {
             cosmosClientBuilder.gatewayMode();
@@ -220,6 +241,10 @@
         return cosmosClientBuilder.buildAsyncClient();
     }

+    private String getUserAgentSuffix() {
+        return CosmosDBConfig.COSMOS_CLIENT_USER_AGENT_SUFFIX + version() + "|" + this.config.getWorkerName();
+    }

     private ChangeFeedProcessor getChangeFeedProcessor(
         String hostName,
         CosmosAsyncContainer feedContainer,
@@ -243,11 +268,24 @@
     }

     protected void handleCosmosDbChanges(List<JsonNode> docs) {
+        if (docs != null) {
+            List<String> docIds =
+                docs
+                    .stream()
+                    .map(jsonNode -> jsonNode.get("id") != null ? jsonNode.get("id").asText() : "null")
+                    .collect(Collectors.toList());
+            logger.debug(
+                "handleCosmosDbChanges - Worker {}, total docs {}, Details [{}].",
+                this.config.getWorkerName(),
+                docIds.size(),
+                docIds);
+        }
         for (JsonNode document : docs) {
             // Blocks for each transfer till it is processed by the poll method.
             // If we fail before checkpointing then the new worker starts again.
             try {
-                logger.trace("Queuing document");
+                logger.trace("Queuing document {}", document.get("id") != null ? document.get("id").asText() : "null");
                 // The item is being transferred to the queue, and the method will only return if the item has been polled from the queue.
                 // The queue is being continuously polled and then put into a batch list, but the batch list is not being flushed right away
@ -264,6 +302,7 @@ public class CosmosDBSourceTask extends SourceTask {
if (docs.size() > 0) {
// it is important to flush the current batches to kafka as currently we are using lease container continuationToken for book marking
// so we would only want to move ahead of the book marking when all the records have been returned to kafka
logger.debug("Worker {}, shouldFillMoreRecords {}", this.config.getWorkerName(), false);
this.shouldFillMoreRecords.set(false);
}
}
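
The "blocks for each transfer" comment above refers to `LinkedTransferQueue.transfer()`, which, unlike `add()`, hands the element directly to a consumer and does not return until one has taken it; that is what throttles the change feed handler to the speed of `poll()`. A standalone sketch of that handoff (the 500 ms sleep just simulates a slow poll cycle):

    import java.util.concurrent.LinkedTransferQueue;

    public class TransferQueueDemo {
        public static void main(String[] args) throws InterruptedException {
            LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();

            Thread consumer = new Thread(() -> {
                try {
                    Thread.sleep(500);                 // simulate a slow poll() cycle
                    System.out.println("polled: " + queue.take());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumer.start();

            long start = System.nanoTime();
            queue.transfer("doc-1");                   // blocks until take() receives it
            System.out.printf("transfer() returned after ~%d ms%n",
                    (System.nanoTime() - start) / 1_000_000);
            consumer.join();
        }
    }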