* Ingest at the last for dup blobs option (#404)

This commit is contained in:
Ramachandran A G 2024-10-08 14:18:08 +05:30 committed by GitHub
Parent 175ac6081f
Commit da0963fdbf
No known key found for this signature
GPG key ID: B5690EEEBB952194
2 changed files with 44 additions and 28 deletions

View file

@ -47,6 +47,7 @@ import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success, Try}
import java.time.ZoneId
import java.time.format.DateTimeFormatter
import java.util.concurrent.ConcurrentHashMap
object KustoWriter {
private val className = this.getClass.getSimpleName
@ -392,6 +393,8 @@ object KustoWriter {
batchIdForTracing: String): Unit = {
val partitionId = TaskContext.getPartitionId
val partitionIdString = TaskContext.getPartitionId.toString
val taskMap = new ConcurrentHashMap[String, BlobWriteResource]()
def ingest(
blobResource: BlobWriteResource,
size: Long,
@ -486,26 +489,30 @@ object KustoWriter {
if (shouldNotCommitBlockBlob) {
blobWriter
} else {
KDSU.logInfo(
className,
s"Sealing blob in partition $partitionIdString for requestId: '${parameters.writeOptions.requestId}', " +
s"blob number ${row._2}, with size $count")
finalizeBlobWrite(blobWriter)
ingest(
blobWriter,
blobWriter.csvWriter.getCounter,
blobWriter.sas,
flushImmediately = !parameters.writeOptions.disableFlushImmediately,
curBlobUUID,
kustoClient)
curBlobUUID = UUID.randomUUID().toString
createBlobWriter(
parameters.coordinates,
parameters.tmpTableName,
kustoClient,
partitionIdString,
row._2,
curBlobUUID)
if (parameters.writeOptions.ensureNoDupBlobs) {
taskMap.put(curBlobUUID, blobWriter)
} else {
KDSU.logInfo(
className,
s"Sealing blob in partition $partitionIdString for requestId: '${parameters.writeOptions.requestId}', " +
s"blob number ${row._2}, with size $count")
finalizeBlobWrite(blobWriter)
ingest(
blobWriter,
blobWriter.csvWriter.getCounter,
blobWriter.sas,
flushImmediately = !parameters.writeOptions.disableFlushImmediately,
curBlobUUID,
kustoClient)
curBlobUUID = UUID.randomUUID().toString
createBlobWriter(
parameters.coordinates,
parameters.tmpTableName,
kustoClient,
partitionIdString,
row._2,
curBlobUUID)
}
}
}
@ -515,13 +522,22 @@ object KustoWriter {
s"requestId: '${parameters.writeOptions.requestId}' ")
finalizeBlobWrite(lastBlobWriter)
if (lastBlobWriter.csvWriter.getCounter > 0) {
ingest(
lastBlobWriter,
lastBlobWriter.csvWriter.getCounter,
lastBlobWriter.sas,
flushImmediately = false,
curBlobUUID,
kustoClient)
if (parameters.writeOptions.ensureNoDupBlobs) {
taskMap.put(curBlobUUID, lastBlobWriter)
} else {
ingest(
lastBlobWriter,
lastBlobWriter.csvWriter.getCounter,
lastBlobWriter.sas,
flushImmediately = false,
curBlobUUID,
kustoClient)
}
}
if (parameters.writeOptions.ensureNoDupBlobs && taskMap.size() > 0) {
taskMap.forEach((uuid, bw) => {
ingest(bw, bw.csvWriter.getCounter, bw.sas, flushImmediately = false, uuid, kustoClient)
})
}
}

View file

@ -8,7 +8,7 @@
<packaging>pom</packaging>
<version>${revision}</version>
<properties>
<revision>5.2.2</revision>
<revision>5.2.3</revision>
<!-- Spark dependencies -->
<scala.version.major>2.12</scala.version.major>
<scalafmt.plugin.version>1.1.1640084764.9f463a9</scalafmt.plugin.version>