diff --git a/core/pom.xml b/core/pom.xml
index 02c5802c..f0920b7f 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -23,7 +23,7 @@
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-spark-parent_2.11</artifactId>
- <version>2.3.13</version>
+ <version>2.3.14</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>azure-eventhubs-spark_2.11</artifactId>
diff --git a/core/src/main/scala/org/apache/spark/eventhubs/EventHubsConf.scala b/core/src/main/scala/org/apache/spark/eventhubs/EventHubsConf.scala
index a091f1b7..53d28b0a 100644
--- a/core/src/main/scala/org/apache/spark/eventhubs/EventHubsConf.scala
+++ b/core/src/main/scala/org/apache/spark/eventhubs/EventHubsConf.scala
@@ -435,6 +435,22 @@ final class EventHubsConf private (private val connectionStr: String)
set(MaxEventsPerTriggerKey, limit)
}
+ /**
+ * Set whether the connector uses an exclusive (epoch) receiver.
+ * Default: [[DefaultUseExclusiveReceiver]]
+ *
+ * @param b the flag which specifies whether the connector uses an epoch receiver
+ * @return the updated [[EventHubsConf]] instance
+ */
+ def setUseExclusiveReceiver(b: Boolean): EventHubsConf = {
+ set(UseExclusiveReceiverKey, b)
+ }
+
+ /** Whether the connector uses an exclusive (epoch) receiver. */
+ def useExclusiveReceiver: Boolean = {
+ self.get(UseExclusiveReceiverKey).getOrElse(DefaultUseExclusiveReceiver).toBoolean
+ }
+
// The simulated client (and simulated eventhubs) will be used. These
// can be found in EventHubsTestUtils.
private[spark] def setUseSimulatedClient(b: Boolean): EventHubsConf = {
@@ -484,6 +500,7 @@ object EventHubsConf extends Logging {
val ThreadPoolSizeKey = "eventhubs.threadPoolSize"
val MaxEventsPerTriggerKey = "maxEventsPerTrigger"
val UseSimulatedClientKey = "useSimulatedClient"
+ val UseExclusiveReceiverKey = "useExclusiveReceiver"
/** Creates an EventHubsConf */
def apply(connectionString: String) = new EventHubsConf(connectionString)
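The new setter, getter, and UseExclusiveReceiverKey together expose an opt-out from
exclusive (epoch) receivers. A minimal usage sketch against the public EventHubsConf
API; the connection string and consumer group below are placeholders, not values from
this change:

    // Hypothetical configuration: allow non-exclusive receivers so that several
    // readers can share a partition within the same consumer group.
    val ehConf = EventHubsConf("Endpoint=sb://<namespace>...;EntityPath=<event-hub>")
      .setConsumerGroup("$Default")
      .setUseExclusiveReceiver(false) // defaults to true (epoch receiver)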
diff --git a/core/src/main/scala/org/apache/spark/eventhubs/EventHubsUtils.scala b/core/src/main/scala/org/apache/spark/eventhubs/EventHubsUtils.scala
index dffbb213..e1e60d4a 100644
--- a/core/src/main/scala/org/apache/spark/eventhubs/EventHubsUtils.scala
+++ b/core/src/main/scala/org/apache/spark/eventhubs/EventHubsUtils.scala
@@ -17,14 +17,22 @@
package org.apache.spark.eventhubs
-import com.microsoft.azure.eventhubs.EventData
+import java.util.concurrent.CompletableFuture
+
+import com.microsoft.azure.eventhubs.{
+ EventData,
+ EventHubClient,
+ PartitionReceiver,
+ ReceiverOptions,
+ EventPosition => ehep
+}
+import org.apache.spark.api.java.{ JavaRDD, JavaSparkContext }
import org.apache.spark.eventhubs.client.EventHubsClient
import org.apache.spark.eventhubs.rdd.{ EventHubsRDD, OffsetRange }
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.eventhubs.EventHubsDirectDStream
-import org.apache.spark.SparkContext
-import org.apache.spark.api.java.{ JavaRDD, JavaSparkContext }
import org.apache.spark.streaming.api.java.{ JavaInputDStream, JavaStreamingContext }
+import org.apache.spark.streaming.eventhubs.EventHubsDirectDStream
+import org.apache.spark.{ SparkContext, TaskContext }
/**
* Helper to create Direct DStreams which consume events from Event Hubs.
@@ -35,7 +43,7 @@ object EventHubsUtils {
* Creates a Direct DStream which consumes from the Event Hubs instance
* specified in the [[EventHubsConf]].
*
- * @param ssc the StreamingContext this DStream belongs to
+ * @param ssc the StreamingContext this DStream belongs to
* @param ehConf the parameters for your EventHubs instance
* @return An [[EventHubsDirectDStream]]
*/
@@ -47,7 +55,7 @@ object EventHubsUtils {
* Creates a Direct DStream which consumes from the Event Hubs instance
* specified in the [[EventHubsConf]].
*
- * @param jssc the JavaStreamingContext this DStream belongs to
+ * @param jssc the JavaStreamingContext this DStream belongs to
* @param ehConf the parameters for your EventHubs instance
* @return A [[JavaInputDStream]] containing [[EventData]]
*/
@@ -60,8 +68,8 @@ object EventHubsUtils {
* Creates an RDD which contains events from an EventHubs instance.
* Starting and ending offsets are specified in advance.
*
- * @param sc the SparkContext the RDD belongs to
- * @param ehConf contains EventHubs-specific configurations
+ * @param sc the SparkContext the RDD belongs to
+ * @param ehConf contains EventHubs-specific configurations
* @param offsetRanges offset ranges that define the EventHubs data belonging to this RDD
* @return An [[EventHubsRDD]]
*
@@ -76,8 +84,8 @@ object EventHubsUtils {
* Creates an RDD which contains events from an EventHubs instance.
* Starting and ending offsets are specified in advance.
*
- * @param jsc the JavaSparkContext the RDD belongs to
- * @param ehConf contains EventHubs-specific configurations
+ * @param jsc the JavaSparkContext the RDD belongs to
+ * @param ehConf contains EventHubs-specific configurations
* @param offsetRanges offset ranges that define the EventHubs data belonging to this RDD
* @return A [[JavaRDD]] containing [[EventData]]
*
@@ -87,4 +95,30 @@ object EventHubsUtils {
offsetRanges: Array[OffsetRange]): JavaRDD[EventData] = {
new JavaRDD(createRDD(jsc.sc, ehConf, offsetRanges))
}
+
+ def createReceiverInner(
+ client: EventHubClient,
+ useExclusiveReceiver: Boolean,
+ consumerGroup: String,
+ partitionId: String,
+ eventPosition: ehep,
+ receiverOptions: ReceiverOptions): CompletableFuture[PartitionReceiver] = {
+ if (useExclusiveReceiver) {
+ client.createEpochReceiver(consumerGroup,
+ partitionId,
+ eventPosition,
+ DefaultEpoch,
+ receiverOptions)
+
+ } else {
+ client.createReceiver(consumerGroup, partitionId, eventPosition, receiverOptions)
+ }
+ }
+
+ def getTaskId: Long = {
+ val taskContext = TaskContext.get()
+ if (taskContext != null) {
+ taskContext.taskAttemptId()
+ } else -1
+ }
}
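createReceiverInner centralizes the choice between an epoch receiver (created with
DefaultEpoch when useExclusiveReceiver is true) and a plain receiver, and getTaskId
returns the current task attempt id, or -1 when called outside a task. A hypothetical
call-site sketch; the consumer group, partition id, and position are illustrative, and
ehep is the EventPosition alias from the import above:

    // Sketch only: request a receiver and keep the CompletableFuture for later awaiting.
    val futureReceiver = EventHubsUtils.createReceiverInner(
      client,                   // an already-open EventHubClient
      true,                     // use an exclusive (epoch) receiver
      "$Default",               // consumer group
      "0",                      // partition id
      ehep.fromStartOfStream(), // com.microsoft.azure.eventhubs.EventPosition
      new ReceiverOptions)
    // Elsewhere in this change, log lines are tagged with s"(TID ${EventHubsUtils.getTaskId}) ..."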
diff --git a/core/src/main/scala/org/apache/spark/eventhubs/client/CachedEventHubsReceiver.scala b/core/src/main/scala/org/apache/spark/eventhubs/client/CachedEventHubsReceiver.scala
index 0ceafd8b..1691569d 100644
--- a/core/src/main/scala/org/apache/spark/eventhubs/client/CachedEventHubsReceiver.scala
+++ b/core/src/main/scala/org/apache/spark/eventhubs/client/CachedEventHubsReceiver.scala
@@ -18,15 +18,23 @@
package org.apache.spark.eventhubs.client
import java.time.Duration
+import java.util.concurrent.TimeUnit
import com.microsoft.azure.eventhubs._
-import org.apache.spark.eventhubs.utils.RetryUtils.{ retryJava, retryNotNull }
-import org.apache.spark.eventhubs.{ EventHubsConf, NameAndPartition, SequenceNumber }
+import org.apache.spark.SparkEnv
+import org.apache.spark.eventhubs.utils.RetryUtils.{ after, retryJava, retryNotNull }
+import org.apache.spark.eventhubs.{
+ DefaultConsumerGroup,
+ EventHubsConf,
+ EventHubsUtils,
+ NameAndPartition,
+ SequenceNumber
+}
import org.apache.spark.internal.Logging
-import org.apache.spark.{ SparkEnv, TaskContext }
import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration._
import scala.concurrent.{ Await, Awaitable, Future }
private[spark] trait CachedReceiver {
@@ -63,58 +71,86 @@ private[client] class CachedEventHubsReceiver private (ehConf: EventHubsConf,
private lazy val client: EventHubClient = ClientConnectionPool.borrowClient(ehConf)
- private var receiver: Future[PartitionReceiver] = createReceiver(startSeqNo)
+ private var receiver: PartitionReceiver = createReceiver(startSeqNo)
- private def createReceiver(seqNo: SequenceNumber): Future[PartitionReceiver] = {
+ private def createReceiver(seqNo: SequenceNumber): PartitionReceiver = {
+ val taskId = EventHubsUtils.getTaskId
logInfo(
- s"creating receiver for Event Hub ${nAndP.ehName} on partition ${nAndP.partitionId}. seqNo: $seqNo")
+ s"(TID $taskId) creating receiver for Event Hub ${nAndP.ehName} on partition ${nAndP.partitionId}. seqNo: $seqNo")
val consumerGroup = ehConf.consumerGroup.getOrElse(DefaultConsumerGroup)
val receiverOptions = new ReceiverOptions
receiverOptions.setReceiverRuntimeMetricEnabled(true)
receiverOptions.setPrefetchCount(ehConf.prefetchCount.getOrElse(DefaultPrefetchCount))
- receiverOptions.setIdentifier(
- s"spark-${SparkEnv.get.executorId}-${TaskContext.get.taskAttemptId}")
- val epochReceiver = retryJava(
- client.createEpochReceiver(consumerGroup,
- nAndP.partitionId.toString,
- EventPosition.fromSequenceNumber(seqNo).convert,
- DefaultEpoch,
- receiverOptions),
+ receiverOptions.setIdentifier(s"spark-${SparkEnv.get.executorId}-$taskId")
+ val consumer = retryJava(
+ EventHubsUtils.createReceiverInner(client,
+ ehConf.useExclusiveReceiver,
+ consumerGroup,
+ nAndP.partitionId.toString,
+ EventPosition.fromSequenceNumber(seqNo).convert,
+ receiverOptions),
"CachedReceiver creation."
)
- epochReceiver
+ Await.result(consumer, ehConf.internalOperationTimeout)
}
private def lastReceivedOffset(): Future[Long] = {
- receiver
- .flatMap { r =>
- if (r.getEventPosition.getSequenceNumber != null) {
- Future.successful(r.getEventPosition.getSequenceNumber)
- } else {
- Future.successful(-1)
- }
- }
+ if (receiver.getEventPosition.getSequenceNumber != null) {
+ Future.successful(receiver.getEventPosition.getSequenceNumber)
+ } else {
+ Future.successful(-1)
+ }
}
private def receiveOne(timeout: Duration, msg: String): Future[Iterable[EventData]] = {
- receiver
- .flatMap { r =>
- r.setReceiveTimeout(timeout)
- retryNotNull(r.receive(1), msg)
+ def receiveOneWithRetry(timeout: Duration,
+ msg: String,
+ retryCount: Int): Future[Iterable[EventData]] = {
+ if (!receiver.getIsOpen && retryCount < RetryCount) {
+ val taskId = EventHubsUtils.getTaskId
+ logInfo(
+ s"(TID $taskId) receiver is not opened yet. Will retry.. $nAndP, consumer group: ${ehConf.consumerGroup
+ .getOrElse(DefaultConsumerGroup)}")
+
+        after(WaitInterval.milliseconds)(receiveOneWithRetry(timeout, msg, retryCount + 1))
+      } else {
+        receiver.setReceiveTimeout(timeout)
+        retryNotNull(receiver.receive(1), msg).map(_.asScala)
      }
-      .map {
-        _.asScala
-      }
+    }
+    receiveOneWithRetry(timeout, msg, retryCount = 0)
+  }
+
+ private def closeReceiver(): Future[Void] = {
+ retryJava(receiver.close(), "closing a receiver")
+ }
+
+ private def recreateReceiver(seqNo: SequenceNumber): Unit = {
+ val taskId = EventHubsUtils.getTaskId
+ val startTimeNs = System.nanoTime()
+ def elapsedTimeNs = System.nanoTime() - startTimeNs
+
+ Await.result(closeReceiver(), ehConf.internalOperationTimeout)
+ receiver = createReceiver(seqNo)
+
+ val elapsedTimeMs = TimeUnit.NANOSECONDS.toMillis(elapsedTimeNs)
+ logInfo(s"(TID $taskId) Finished recreating a receiver for $nAndP, ${ehConf.consumerGroup
+ .getOrElse(DefaultConsumerGroup)}: $elapsedTimeMs ms")
}
private def checkCursor(requestSeqNo: SequenceNumber): Future[Iterable[EventData]] = {
+ val taskId = EventHubsUtils.getTaskId
+
val lastReceivedSeqNo =
Await.result(lastReceivedOffset(), ehConf.internalOperationTimeout)
if (lastReceivedSeqNo > -1 && lastReceivedSeqNo + 1 != requestSeqNo) {
- logInfo(
- s"checkCursor. Recreating a receiver for $nAndP, ${ehConf.consumerGroup}. requestSeqNo: $requestSeqNo, lastReceivedSeqNo: $lastReceivedSeqNo")
- receiver = createReceiver(requestSeqNo)
+ logInfo(s"(TID $taskId) checkCursor. Recreating a receiver for $nAndP, ${ehConf.consumerGroup.getOrElse(
+ DefaultConsumerGroup)}. requestSeqNo: $requestSeqNo, lastReceivedSeqNo: $lastReceivedSeqNo")
+
+ recreateReceiver(requestSeqNo)
}
val event = awaitReceiveMessage(
@@ -129,8 +165,9 @@ private[client] class CachedEventHubsReceiver private (ehConf: EventHubsConf,
// 2) Your desired event has expired from the service.
// First, we'll check for case (1).
logInfo(
- s"checkCursor. Recreating a receiver for $nAndP, ${ehConf.consumerGroup}. requestSeqNo: $requestSeqNo, receivedSeqNo: $receivedSeqNo")
- receiver = createReceiver(requestSeqNo)
+ s"(TID $taskId) checkCursor. Recreating a receiver for $nAndP, ${ehConf.consumerGroup.getOrElse(
+ DefaultConsumerGroup)}. requestSeqNo: $requestSeqNo, receivedSeqNo: $receivedSeqNo")
+ recreateReceiver(requestSeqNo)
val movedEvent = awaitReceiveMessage(
receiveOne(ehConf.receiverTimeout.getOrElse(DefaultReceiverTimeout), "checkCursor move"),
requestSeqNo)
@@ -167,6 +204,10 @@ private[client] class CachedEventHubsReceiver private (ehConf: EventHubsConf,
}
private def receive(requestSeqNo: SequenceNumber, batchSize: Int): Iterator[EventData] = {
+ val taskId = EventHubsUtils.getTaskId
+ val startTimeNs = System.nanoTime()
+ def elapsedTimeNs = System.nanoTime() - startTimeNs
+
// Retrieve the events. First, we get the first event in the batch.
// Then, if that succeeds, we collect the rest of the data.
val first = Await.result(checkCursor(requestSeqNo), ehConf.internalOperationTimeout)
@@ -190,18 +231,26 @@ private[client] class CachedEventHubsReceiver private (ehConf: EventHubsConf,
val (result, validate) = sorted.duplicate
assert(validate.size == newBatchSize)
+
+ val elapsedTimeMs = TimeUnit.NANOSECONDS.toMillis(elapsedTimeNs)
+ logInfo(s"(TID $taskId) Finished receiving for $nAndP, consumer group: ${ehConf.consumerGroup
+ .getOrElse(DefaultConsumerGroup)}, batchSize: $batchSize, elapsed time: $elapsedTimeMs ms")
+
result
}
private def awaitReceiveMessage[T](awaitable: Awaitable[T], requestSeqNo: SequenceNumber): T = {
+ val taskId = EventHubsUtils.getTaskId
+
try {
Await.result(awaitable, ehConf.internalOperationTimeout)
} catch {
case e: AwaitTimeoutException =>
logError(
- s"awaitReceiveMessage call failed with timeout. Event Hub $nAndP, ConsumerGroup ${ehConf.consumerGroup}. requestSeqNo: $requestSeqNo")
+ s"(TID $taskId) awaitReceiveMessage call failed with timeout. Event Hub $nAndP, ConsumerGroup ${ehConf.consumerGroup
+ .getOrElse(DefaultConsumerGroup)}. requestSeqNo: $requestSeqNo")
- receiver = createReceiver(requestSeqNo)
+ recreateReceiver(requestSeqNo)
throw e
}
}
@@ -226,8 +275,10 @@ private[spark] object CachedEventHubsReceiver extends CachedReceiver with Loggin
nAndP: NameAndPartition,
requestSeqNo: SequenceNumber,
batchSize: Int): Iterator[EventData] = {
- logInfo(
- s"EventHubsCachedReceiver look up. For $nAndP, ${ehConf.consumerGroup}. requestSeqNo: $requestSeqNo, batchSize: $batchSize")
+ val taskId = EventHubsUtils.getTaskId
+
+ logInfo(s"(TID $taskId) EventHubsCachedReceiver look up. For $nAndP, ${ehConf.consumerGroup
+ .getOrElse(DefaultConsumerGroup)}. requestSeqNo: $requestSeqNo, batchSize: $batchSize")
var receiver: CachedEventHubsReceiver = null
receivers.synchronized {
receiver = receivers.getOrElseUpdate(key(ehConf, nAndP), {
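With RetryCount = 3 and WaitInterval = 3000 (milliseconds, both from the eventhubs
package object), the open-check in receiveOneWithRetry waits through at most three
3-second intervals before attempting the receive anyway. A trimmed sketch of that
guard in the else-branch form used above (receiveWhenOpen is a hypothetical name, and
timeout is supplied by the caller):

    // Wait WaitInterval ms between attempts while the AMQP link is still opening,
    // at most RetryCount times, then perform the single-event receive.
    def receiveWhenOpen(retryCount: Int = 0): Future[Iterable[EventData]] =
      if (!receiver.getIsOpen && retryCount < RetryCount)
        after(WaitInterval.milliseconds)(receiveWhenOpen(retryCount + 1))
      else {
        receiver.setReceiveTimeout(timeout)
        retryNotNull(receiver.receive(1), "receive one").map(_.asScala)
      }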
diff --git a/core/src/main/scala/org/apache/spark/eventhubs/client/ClientConnectionPool.scala b/core/src/main/scala/org/apache/spark/eventhubs/client/ClientConnectionPool.scala
index 14472513..df912886 100644
--- a/core/src/main/scala/org/apache/spark/eventhubs/client/ClientConnectionPool.scala
+++ b/core/src/main/scala/org/apache/spark/eventhubs/client/ClientConnectionPool.scala
@@ -45,16 +45,22 @@ private class ClientConnectionPool(val ehConf: EventHubsConf) extends Logging {
*/
private def borrowClient: EventHubClient = {
var client = pool.poll()
+ val consumerGroup = ehConf.consumerGroup.getOrElse(DefaultConsumerGroup)
if (client == null) {
logInfo(
- s"No clients left to borrow. EventHub name: ${ehConf.name}. Creating client ${count.incrementAndGet()}")
+ s"No clients left to borrow. EventHub name: ${ehConf.name}, " +
+ s"ConsumerGroup name: $consumerGroup. Creating client ${count.incrementAndGet()}")
val connStr = ConnectionStringBuilder(ehConf.connectionString)
connStr.setOperationTimeout(ehConf.operationTimeout.getOrElse(DefaultOperationTimeout))
+ EventHubsClient.userAgent =
+ s"SparkConnector-$SparkConnectorVersion-[${ehConf.name}]-[$consumerGroup]"
while (client == null) {
- client = EventHubClient.createSync(connStr.toString, ClientThreadPool.get(ehConf))
+ client = EventHubClient.createFromConnectionStringSync(connStr.toString,
+ ClientThreadPool.get(ehConf))
}
} else {
- logInfo(s"Borrowing client. EventHub name: ${ehConf.name}")
+ logInfo(
+ s"Borrowing client. EventHub name: ${ehConf.name}, ConsumerGroup name: $consumerGroup")
}
logInfo(s"Available clients: {${pool.size}}. Total clients: ${count.get}")
client
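The factory call switches from EventHubClient.createSync to
createFromConnectionStringSync, tracking the azure-eventhubs 2.3.2 -> 3.0.2 bump at the
end of this diff, and the pool now stamps a hub- and consumer-group-specific user agent.
For illustration only, for a hub named "telemetry" read with "$Default" (and assuming
ehConf.name yields the bare hub name), the assignment above produces roughly:

    EventHubsClient.userAgent = "SparkConnector-2.3.14-[telemetry]-[$Default]"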
diff --git a/core/src/main/scala/org/apache/spark/eventhubs/client/EventHubsClient.scala b/core/src/main/scala/org/apache/spark/eventhubs/client/EventHubsClient.scala
index a3c6896d..0eaf1082 100644
--- a/core/src/main/scala/org/apache/spark/eventhubs/client/EventHubsClient.scala
+++ b/core/src/main/scala/org/apache/spark/eventhubs/client/EventHubsClient.scala
@@ -266,6 +266,7 @@ private[spark] class EventHubsClient(private val ehConf: EventHubsConf)
case _ =>
val runtimeInfo =
Await.result(getRunTimeInfoF(nAndP.partitionId), ehConf.internalOperationTimeout)
+ var receiver: Future[PartitionReceiver] = null
val seqNo =
if (runtimeInfo.getIsEmpty || (pos.enqueuedTime != null &&
runtimeInfo.getLastEnqueuedTimeUtc.isBefore(pos.enqueuedTime.toInstant))) {
@@ -277,12 +278,15 @@ private[spark] class EventHubsClient(private val ehConf: EventHubsConf)
receiverOptions.setPrefetchCount(1)
receiverOptions.setIdentifier(s"spark-${SparkEnv.get.executorId}")
- val receiver = retryJava(client.createEpochReceiver(consumerGroup,
- nAndP.partitionId.toString,
- pos.convert,
- DefaultEpoch,
- receiverOptions),
- "translate: epoch receiver creation.")
+ receiver = retryJava(
+ EventHubsUtils.createReceiverInner(client,
+ ehConf.useExclusiveReceiver,
+ consumerGroup,
+ nAndP.partitionId.toString,
+ pos.convert,
+ receiverOptions),
+ "translate: receiver creation."
+ )
receiver
.flatMap { r =>
r.setReceiveTimeout(ehConf.receiverTimeout.getOrElse(DefaultReceiverTimeout))
@@ -292,6 +296,7 @@ private[spark] class EventHubsClient(private val ehConf: EventHubsConf)
e.iterator.next.getSystemProperties.getSequenceNumber
}
}
+
(nAndP.partitionId, seqNo)
}
diff --git a/core/src/main/scala/org/apache/spark/eventhubs/package.scala b/core/src/main/scala/org/apache/spark/eventhubs/package.scala
index bea4c5a4..2986686e 100644
--- a/core/src/main/scala/org/apache/spark/eventhubs/package.scala
+++ b/core/src/main/scala/org/apache/spark/eventhubs/package.scala
@@ -44,16 +44,18 @@ package object eventhubs {
val DefaultPrefetchCount: Int = PartitionReceiver.DEFAULT_PREFETCH_COUNT
val DefaultFailOnDataLoss = "true"
val DefaultUseSimulatedClient = "false"
+ val DefaultUseExclusiveReceiver = "true"
val StartingSequenceNumber = 0L
val DefaultThreadPoolSize = 16
val DefaultEpoch = 0L
val RetryCount = 3
+ val WaitInterval = 3000
val OffsetAnnotation = "x-opt-offset"
val EnqueuedTimeAnnotation = "x-opt-enqueued-time"
val SequenceNumberAnnotation = "x-opt-sequence-number"
- val SparkConnectorVersion = "2.3.13"
+ val SparkConnectorVersion = "2.3.14"
type PartitionId = Int
val PartitionId: Int.type = Int
@@ -82,5 +84,4 @@ package object eventhubs {
def toSequenceNumber: SequenceNumber = str.toLong
}
-
}
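As with the other defaults in this package object, DefaultUseExclusiveReceiver is kept
as a string and parsed with .toBoolean by the EventHubsConf getter; WaitInterval is the
delay in milliseconds between receiver-open retries. A one-line sketch of the lookup
pattern, assuming settings is the underlying string-keyed configuration map:

    // String-typed default, boolean result.
    val useExclusive =
      settings.get(UseExclusiveReceiverKey).getOrElse(DefaultUseExclusiveReceiver).toBoolean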
diff --git a/core/src/main/scala/org/apache/spark/eventhubs/rdd/EventHubsRDD.scala b/core/src/main/scala/org/apache/spark/eventhubs/rdd/EventHubsRDD.scala
index 2bea6d40..d92c55b2 100644
--- a/core/src/main/scala/org/apache/spark/eventhubs/rdd/EventHubsRDD.scala
+++ b/core/src/main/scala/org/apache/spark/eventhubs/rdd/EventHubsRDD.scala
@@ -94,7 +94,7 @@ private[spark] class EventHubsRDD(sc: SparkContext,
}
private def errBeginAfterEnd(part: EventHubsRDDPartition): String =
- s"The beginning sequence number ${part.fromSeqNo} is larger than thet ending " +
+ s"The beginning sequence number ${part.fromSeqNo} is larger than the ending " +
s"sequence number ${part.untilSeqNo} for EventHubs ${part.name} on partition " +
s"${part.partitionId}."
@@ -104,13 +104,13 @@ private[spark] class EventHubsRDD(sc: SparkContext,
if (part.fromSeqNo == part.untilSeqNo) {
logInfo(
- s"Beginning sequence number ${part.fromSeqNo} is equal to the ending sequence " +
+ s"(TID ${context.taskAttemptId()}) Beginning sequence number ${part.fromSeqNo} is equal to the ending sequence " +
s"number ${part.untilSeqNo}. Returning empty partition for EH: ${part.name} " +
s"on partition: ${part.partitionId}")
Iterator.empty
} else {
logInfo(
- s"Computing EventHubs ${part.name}, partition ${part.partitionId} " +
+ s"(TID ${context.taskAttemptId()}) Computing EventHubs ${part.name}, partition ${part.partitionId} " +
s"sequence numbers ${part.fromSeqNo} => ${part.untilSeqNo}")
val cachedReceiver = if (ehConf.useSimulatedClient) {
SimulatedCachedReceiver
diff --git a/core/src/main/scala/org/apache/spark/eventhubs/utils/RetryUtils.scala b/core/src/main/scala/org/apache/spark/eventhubs/utils/RetryUtils.scala
index 1752cf9e..12ee4217 100644
--- a/core/src/main/scala/org/apache/spark/eventhubs/utils/RetryUtils.scala
+++ b/core/src/main/scala/org/apache/spark/eventhubs/utils/RetryUtils.scala
@@ -23,10 +23,9 @@ import com.microsoft.azure.eventhubs.EventHubException
import org.apache.spark.internal.Logging
import scala.compat.java8.FutureConverters.toScala
-import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.duration.FiniteDuration
-import scala.concurrent.duration._
+import scala.concurrent.duration.{ FiniteDuration, _ }
+import scala.concurrent.{ ExecutionContext, Future, Promise }
// TODO tests
@@ -109,25 +108,26 @@ private[spark] object RetryUtils extends Logging {
maxRetry: Int = RetryCount,
delay: Int = 10): Future[T] = {
def retryHelper(fn: => Future[T], retryCount: Int): Future[T] = {
+ val taskId = EventHubsUtils.getTaskId
fn.recoverWith {
case eh: EventHubException if eh.getIsTransient =>
if (retryCount >= maxRetry) {
- logInfo(s"failure: $opName")
+ logInfo(s"(TID $taskId) failure: $opName")
throw eh
}
- logInfo(s"retrying $opName after $delay ms")
+ logInfo(s"(TID $taskId) retrying $opName after $delay ms")
after(delay.milliseconds)(retryHelper(fn, retryCount + 1))
case t: Throwable =>
t.getCause match {
case eh: EventHubException if eh.getIsTransient =>
if (retryCount >= maxRetry) {
- logInfo(s"failure: $opName")
+ logInfo(s"(TID $taskId) failure: $opName")
throw eh
}
- logInfo(s"retrying $opName after $delay ms")
+ logInfo(s"(TID $taskId) retrying $opName after $delay ms")
after(delay.milliseconds)(retryHelper(fn, retryCount + 1))
case _ =>
- logInfo(s"failure: $opName")
+ logInfo(s"(TID $taskId) failure: $opName")
throw t
}
}
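retryJava converts a CompletableFuture from the Java SDK into a Scala Future and
retries transient EventHubException failures up to maxRetry times, while after delays a
future by the given duration; both back the receiver handling in this change. A minimal
sketch of the pattern with an illustrative SDK call (consumerGroup, partitionId, and
position stand in for real values):

    // Retry a transient-failure-prone SDK call, then block within the configured timeout.
    val futureReceiver: Future[PartitionReceiver] =
      RetryUtils.retryJava(client.createReceiver(consumerGroup, partitionId, position),
                           "receiver creation")
    val receiver = Await.result(futureReceiver, ehConf.internalOperationTimeout)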
diff --git a/core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSource.scala b/core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSource.scala
index 62671f8a..7f3c8561 100644
--- a/core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSource.scala
+++ b/core/src/main/scala/org/apache/spark/sql/eventhubs/EventHubsSource.scala
@@ -215,7 +215,7 @@ private[spark] class EventHubsSource private[eventhubs] (sqlContext: SQLContext,
* the start and end [[Offset]]s provided.
*
* @param start the start positions (inclusive)
- * @param end the end positions (exclusive)
+ * @param end the end positions (exclusive)
* @return the [[DataFrame]] with Event Hubs data
*/
override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
@@ -328,6 +328,10 @@ private[eventhubs] object EventHubsSource {
}
private def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): Boolean = {
- if (a.host == b.host) { a.executorId > b.executorId } else { a.host > b.host }
+ if (a.host == b.host) {
+ a.executorId > b.executorId
+ } else {
+ a.host > b.host
+ }
}
}
diff --git a/pom.xml b/pom.xml
index 014193c0..770ff158 100644
--- a/pom.xml
+++ b/pom.xml
@@ -25,7 +25,7 @@
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-spark-parent_2.11</artifactId>
- <version>2.3.13</version>
+ <version>2.3.14</version>
<packaging>pom</packaging>
<name>EventHubs+Spark Parent POM</name>
@@ -136,7 +136,7 @@
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId>
- <version>2.3.2</version>
+ <version>3.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>