Make an EventHubs API call for every partition in each batch
This commit is contained in:
Родитель
b1e026758a
Коммит
612cb99d30
|
@ -17,9 +17,6 @@
|
|||
|
||||
package org.apache.spark.eventhubscommon
|
||||
|
||||
import scala.collection.mutable.ListBuffer
|
||||
import scala.reflect.ClassTag
|
||||
|
||||
import org.apache.spark.eventhubscommon.client.{EventHubClient, EventHubsClientWrapper, EventHubsOffsetTypes}
|
||||
import org.apache.spark.eventhubscommon.client.EventHubsOffsetTypes.EventHubsOffsetType
|
||||
import org.apache.spark.internal.Logging
|
||||
|
@ -73,35 +70,13 @@ private[spark] object RateControlUtils extends Logging {
|
|||
defaultRateControl(currentOffsetsAndSeqNums, highestEndpoints, eventhubsParams)
|
||||
}
|
||||
|
||||
/**
 * Picks the partitions that the caller should query again for their endpoint.
 *
 * If no previously fetched endpoints are available (the argument is `null`), every
 * partition present in `currentOffsetsAndSeqNums` is selected. Otherwise a partition is
 * selected when the second tuple element (the sequence number) recorded for it in
 * `currentOffsetsAndSeqNums` is greater than or equal to the fetched one.
 *
 * Looking up a fetched partition that is absent from `currentOffsetsAndSeqNums` throws
 * `NoSuchElementException`, as `Map.apply` does.
 *
 * @param fetchedHighestOffsetsAndSeqNums previously fetched (offset, sequence number) per
 *                                        partition; may be `null`
 * @param currentOffsetsAndSeqNums per-partition pair whose second element is compared with
 *                                 the fetched sequence number
 * @return the selected partitions, in the iteration order of the map that was traversed
 */
private def collectPartitionsNeedingLargerProcessingRange(
    fetchedHighestOffsetsAndSeqNums: Map[EventHubNameAndPartition, (Long, Long)],
    currentOffsetsAndSeqNums: Map[EventHubNameAndPartition, (Long, Long)]):
  List[EventHubNameAndPartition] = {
  Option(fetchedHighestOffsetsAndSeqNums) match {
    case Some(fetched) =>
      // Keep only partitions whose current sequence number has caught up with
      // (or passed) the highest sequence number fetched so far.
      fetched.collect {
        case (partition, (_, highestSeqNum))
          if currentOffsetsAndSeqNums(partition)._2 >= highestSeqNum => partition
      }.toList
    case None =>
      // Nothing fetched yet: every known partition is selected.
      currentOffsetsAndSeqNums.keySet.toList
  }
}
|
||||
|
||||
private[spark] def fetchLatestOffset(
|
||||
eventHubClient: EventHubClient,
|
||||
retryIfFail: Boolean,
|
||||
fetchedHighestOffsetsAndSeqNums: Map[EventHubNameAndPartition, (Long, Long)],
|
||||
currentOffsetsAndSeqNums: Map[EventHubNameAndPartition, (Long, Long)]):
|
||||
Option[Map[EventHubNameAndPartition, (Long, Long)]] = {
|
||||
// check if there is any eventhubs partition which potentially has newly arrived message (
|
||||
// the fetched highest message id is within the next batch's processing engine)
|
||||
val demandingEhNameAndPartitions = collectPartitionsNeedingLargerProcessingRange(
|
||||
fetchedHighestOffsetsAndSeqNums, currentOffsetsAndSeqNums)
|
||||
val r = eventHubClient.endPointOfPartition(retryIfFail, demandingEhNameAndPartitions)
|
||||
val r = eventHubClient.endPointOfPartition(retryIfFail, currentOffsetsAndSeqNums.keySet.toList)
|
||||
if (r.isDefined) {
|
||||
// merge results
|
||||
val mergedOffsets = if (fetchedHighestOffsetsAndSeqNums != null) {
|
||||
|
|
Загрузка…
Ссылка в новой задаче