Several bug fixes and improvements for 2.3.14 release (#465)

* [Bug fix] fix an IO pipe issue that caused send calls to block
* [Bug fix] fix a broken TCP connection issue that caused prolonged delays in receiving events
* [Configuration] provide a configuration knob to specify whether an epoch (exclusive) receiver should be used. The default is true; see the usage sketch below.
* [Tracing] add the Spark task ID to log messages and emit latency data for receive calls
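The new knob lives on EventHubsConf as setUseExclusiveReceiver (see the EventHubsConf diff below); the two bug fixes appear to be picked up through the azure-eventhubs client bump from 2.3.2 to 3.0.2 in the parent pom rather than connector-side code. A minimal usage sketch, with a placeholder connection string:

    import org.apache.spark.eventhubs.EventHubsConf

    val connectionString = "<your Event Hubs connection string>" // placeholder
    val ehConf = EventHubsConf(connectionString)
      .setUseExclusiveReceiver(false) // opt out of epoch (exclusive) receivers

    ehConf.useExclusiveReceiver // false here; the default is true when never set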
This commit is contained in:
SJ 2019-10-23 09:44:13 -07:00, committed by GitHub
Parent d5ad0d6caa
Commit 33ee66e23a
No key found matching this signature
GPG Key ID: 4AEE18F83AFDEB23
11 changed files: 193 additions and 75 deletions

View File

@@ -23,7 +23,7 @@
<parent>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-spark-parent_2.11</artifactId>
-<version>2.3.13</version>
+<version>2.3.14</version>
<relativePath>../pom.xml</relativePath>
</parent>
<artifactId>azure-eventhubs-spark_2.11</artifactId>

View File

@@ -435,6 +435,22 @@ final class EventHubsConf private (private val connectionStr: String)
set(MaxEventsPerTriggerKey, limit)
}
/**
* Set whether the connector uses an exclusive (epoch) receiver.
* Default: [[DefaultUseExclusiveReceiver]]
*
* @param b the flag which specifies whether the connector uses an epoch receiver
* @return the updated [[EventHubsConf]] instance
*/
def setUseExclusiveReceiver(b: Boolean): EventHubsConf = {
set(UseExclusiveReceiverKey, b)
}
/** Whether the connector uses an exclusive (epoch) receiver. */
def useExclusiveReceiver: Boolean = {
self.get(UseExclusiveReceiverKey).getOrElse(DefaultUseExclusiveReceiver).toBoolean
}
// The simulated client (and simulated eventhubs) will be used. These
// can be found in EventHubsTestUtils.
private[spark] def setUseSimulatedClient(b: Boolean): EventHubsConf = {
@@ -484,6 +500,7 @@ object EventHubsConf extends Logging {
val ThreadPoolSizeKey = "eventhubs.threadPoolSize"
val MaxEventsPerTriggerKey = "maxEventsPerTrigger"
val UseSimulatedClientKey = "useSimulatedClient"
val UseExclusiveReceiverKey = "useExclusiveReceiver"
/** Creates an EventHubsConf */
def apply(connectionString: String) = new EventHubsConf(connectionString)

View File

@@ -17,14 +17,22 @@
package org.apache.spark.eventhubs
-import com.microsoft.azure.eventhubs.EventData
+import java.util.concurrent.CompletableFuture
+import com.microsoft.azure.eventhubs.{
+EventData,
+EventHubClient,
+PartitionReceiver,
+ReceiverOptions,
+EventPosition => ehep
+}
+import org.apache.spark.api.java.{ JavaRDD, JavaSparkContext }
import org.apache.spark.eventhubs.client.EventHubsClient
import org.apache.spark.eventhubs.rdd.{ EventHubsRDD, OffsetRange }
import org.apache.spark.streaming.StreamingContext
-import org.apache.spark.streaming.eventhubs.EventHubsDirectDStream
-import org.apache.spark.SparkContext
-import org.apache.spark.api.java.{ JavaRDD, JavaSparkContext }
import org.apache.spark.streaming.api.java.{ JavaInputDStream, JavaStreamingContext }
+import org.apache.spark.streaming.eventhubs.EventHubsDirectDStream
+import org.apache.spark.{ SparkContext, TaskContext }
/**
* Helper to create Direct DStreams which consume events from Event Hubs.
@@ -35,7 +43,7 @@ object EventHubsUtils {
* Creates a Direct DStream which consumes from the Event Hubs instance
* specified in the [[EventHubsConf]].
*
* @param ssc the StreamingContext this DStream belongs to
* @param ehConf the parameters for your EventHubs instance
* @return An [[EventHubsDirectDStream]]
*/
@@ -47,7 +55,7 @@ object EventHubsUtils {
* Creates a Direct DStream which consumes from the Event Hubs instance
* specified in the [[EventHubsConf]].
*
* @param jssc the JavaStreamingContext this DStream belongs to
* @param ehConf the parameters for your EventHubs instance
* @return A [[JavaInputDStream]] containing [[EventData]]
*/
@@ -60,8 +68,8 @@ object EventHubsUtils {
* Creates an RDD which contains events from an EventHubs instance.
* Starting and ending offsets are specified in advance.
*
* @param sc the SparkContext the RDD belongs to
* @param ehConf contains EventHubs-specific configurations
* @param offsetRanges offset ranges that define the EventHubs data belonging to this RDD
* @return An [[EventHubsRDD]]
*
@@ -76,8 +84,8 @@ object EventHubsUtils {
* Creates an RDD which contains events from an EventHubs instance.
* Starting and ending offsets are specified in advance.
*
* @param jsc the JavaSparkContext the RDD belongs to
* @param ehConf contains EventHubs-specific configurations
* @param offsetRanges offset ranges that define the EventHubs data belonging to this RDD
* @return A [[JavaRDD]] containing [[EventData]]
*
@@ -87,4 +95,30 @@ object EventHubsUtils {
offsetRanges: Array[OffsetRange]): JavaRDD[EventData] = {
new JavaRDD(createRDD(jsc.sc, ehConf, offsetRanges))
}
def createReceiverInner(
client: EventHubClient,
useExclusiveReceiver: Boolean,
consumerGroup: String,
partitionId: String,
eventPosition: ehep,
receiverOptions: ReceiverOptions): CompletableFuture[PartitionReceiver] = {
if (useExclusiveReceiver) {
client.createEpochReceiver(consumerGroup,
partitionId,
eventPosition,
DefaultEpoch,
receiverOptions)
} else {
client.createReceiver(consumerGroup, partitionId, eventPosition, receiverOptions)
}
}
def getTaskId: Long = {
val taskContext = TaskContext.get()
if (taskContext != null) {
taskContext.taskAttemptId()
} else -1
}
}
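For background: an epoch ("exclusive") receiver owns its partition within a consumer group and disconnects competing readers, while a non-epoch receiver tolerates a few concurrent readers (up to five per partition and consumer group); createReceiverInner merely selects between the two client calls. A hedged sketch of calling it directly, assuming an already-open EventHubClient and placeholder partition/position values:

    import java.util.concurrent.CompletableFuture
    import com.microsoft.azure.eventhubs.{ EventHubClient, PartitionReceiver, ReceiverOptions, EventPosition => ehep }
    import org.apache.spark.eventhubs.EventHubsUtils

    def openReceiver(client: EventHubClient): CompletableFuture[PartitionReceiver] =
      EventHubsUtils.createReceiverInner(
        client = client,
        useExclusiveReceiver = true, // epoch receiver: evicts other readers of this partition
        consumerGroup = EventHubClient.DEFAULT_CONSUMER_GROUP_NAME,
        partitionId = "0", // placeholder
        eventPosition = ehep.fromSequenceNumber(0L), // placeholder starting position
        receiverOptions = new ReceiverOptions
      )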

View File

@@ -18,15 +18,23 @@
package org.apache.spark.eventhubs.client
import java.time.Duration
import java.util.concurrent.TimeUnit
import com.microsoft.azure.eventhubs._
-import org.apache.spark.eventhubs.utils.RetryUtils.{ retryJava, retryNotNull }
-import org.apache.spark.eventhubs.{ EventHubsConf, NameAndPartition, SequenceNumber }
-import org.apache.spark.SparkEnv
+import org.apache.spark.eventhubs.utils.RetryUtils.{ after, retryJava, retryNotNull }
+import org.apache.spark.eventhubs.{
+DefaultConsumerGroup,
+EventHubsConf,
+EventHubsUtils,
+NameAndPartition,
+SequenceNumber
+}
import org.apache.spark.internal.Logging
+import org.apache.spark.{ SparkEnv, TaskContext }
import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{ Await, Awaitable, Future }
private[spark] trait CachedReceiver {
@@ -63,58 +71,86 @@ private[client] class CachedEventHubsReceiver private (ehConf: EventHubsConf,
private lazy val client: EventHubClient = ClientConnectionPool.borrowClient(ehConf)
-private var receiver: Future[PartitionReceiver] = createReceiver(startSeqNo)
+private var receiver: PartitionReceiver = createReceiver(startSeqNo)
-private def createReceiver(seqNo: SequenceNumber): Future[PartitionReceiver] = {
+private def createReceiver(seqNo: SequenceNumber): PartitionReceiver = {
+val taskId = EventHubsUtils.getTaskId
logInfo(
-s"creating receiver for Event Hub ${nAndP.ehName} on partition ${nAndP.partitionId}. seqNo: $seqNo")
+s"(TID $taskId) creating receiver for Event Hub ${nAndP.ehName} on partition ${nAndP.partitionId}. seqNo: $seqNo")
val consumerGroup = ehConf.consumerGroup.getOrElse(DefaultConsumerGroup)
val receiverOptions = new ReceiverOptions
receiverOptions.setReceiverRuntimeMetricEnabled(true)
receiverOptions.setPrefetchCount(ehConf.prefetchCount.getOrElse(DefaultPrefetchCount))
-receiverOptions.setIdentifier(
-s"spark-${SparkEnv.get.executorId}-${TaskContext.get.taskAttemptId}")
-val epochReceiver = retryJava(
-client.createEpochReceiver(consumerGroup,
-nAndP.partitionId.toString,
-EventPosition.fromSequenceNumber(seqNo).convert,
-DefaultEpoch,
-receiverOptions),
+receiverOptions.setIdentifier(s"spark-${SparkEnv.get.executorId}-$taskId")
+val consumer = retryJava(
+EventHubsUtils.createReceiverInner(client,
+ehConf.useExclusiveReceiver,
+consumerGroup,
+nAndP.partitionId.toString,
+EventPosition.fromSequenceNumber(seqNo).convert,
+receiverOptions),
"CachedReceiver creation."
)
-epochReceiver
+Await.result(consumer, ehConf.internalOperationTimeout)
}
private def lastReceivedOffset(): Future[Long] = {
-receiver
-.flatMap { r =>
-if (r.getEventPosition.getSequenceNumber != null) {
-Future.successful(r.getEventPosition.getSequenceNumber)
-} else {
-Future.successful(-1)
-}
-}
+if (receiver.getEventPosition.getSequenceNumber != null) {
+Future.successful(receiver.getEventPosition.getSequenceNumber)
+} else {
+Future.successful(-1)
+}
}
private def receiveOne(timeout: Duration, msg: String): Future[Iterable[EventData]] = {
-receiver
-.flatMap { r =>
-r.setReceiveTimeout(timeout)
-retryNotNull(r.receive(1), msg)
+def receiveOneWithRetry(timeout: Duration,
+msg: String,
+retryCount: Int): Future[Iterable[EventData]] = {
+if (!receiver.getIsOpen && retryCount < RetryCount) {
+val taskId = EventHubsUtils.getTaskId
+logInfo(
+s"(TID $taskId) receiver is not opened yet. Will retry.. $nAndP, consumer group: ${ehConf.consumerGroup
+.getOrElse(DefaultConsumerGroup)}")
+after(WaitInterval.milliseconds)(receiveOneWithRetry(timeout, msg, retryCount + 1))
+}
-.map {
+receiver.setReceiveTimeout(timeout)
+retryNotNull(receiver.receive(1), msg).map(
_.asScala
-}
+)
+}
+receiveOneWithRetry(timeout, msg, retryCount = 0)
}
private def closeReceiver(): Future[Void] = {
retryJava(receiver.close(), "closing a receiver")
}
private def recreateReceiver(seqNo: SequenceNumber): Unit = {
val taskId = EventHubsUtils.getTaskId
val startTimeNs = System.nanoTime()
def elapsedTimeNs = System.nanoTime() - startTimeNs
Await.result(closeReceiver(), ehConf.internalOperationTimeout)
receiver = createReceiver(seqNo)
val elapsedTimeMs = TimeUnit.NANOSECONDS.toMillis(elapsedTimeNs)
logInfo(s"(TID $taskId) Finished recreating a receiver for $nAndP, ${ehConf.consumerGroup
.getOrElse(DefaultConsumerGroup)}: $elapsedTimeMs ms")
}
private def checkCursor(requestSeqNo: SequenceNumber): Future[Iterable[EventData]] = {
val taskId = EventHubsUtils.getTaskId
val lastReceivedSeqNo =
Await.result(lastReceivedOffset(), ehConf.internalOperationTimeout)
if (lastReceivedSeqNo > -1 && lastReceivedSeqNo + 1 != requestSeqNo) {
-logInfo(
-s"checkCursor. Recreating a receiver for $nAndP, ${ehConf.consumerGroup}. requestSeqNo: $requestSeqNo, lastReceivedSeqNo: $lastReceivedSeqNo")
-receiver = createReceiver(requestSeqNo)
+logInfo(s"(TID $taskId) checkCursor. Recreating a receiver for $nAndP, ${ehConf.consumerGroup.getOrElse(
+DefaultConsumerGroup)}. requestSeqNo: $requestSeqNo, lastReceivedSeqNo: $lastReceivedSeqNo")
+recreateReceiver(requestSeqNo)
}
val event = awaitReceiveMessage(
@@ -129,8 +165,9 @@ private[client] class CachedEventHubsReceiver private (ehConf: EventHubsConf,
// 2) Your desired event has expired from the service.
// First, we'll check for case (1).
logInfo(
s"checkCursor. Recreating a receiver for $nAndP, ${ehConf.consumerGroup}. requestSeqNo: $requestSeqNo, receivedSeqNo: $receivedSeqNo")
receiver = createReceiver(requestSeqNo)
s"(TID $taskId) checkCursor. Recreating a receiver for $nAndP, ${ehConf.consumerGroup.getOrElse(
DefaultConsumerGroup)}. requestSeqNo: $requestSeqNo, receivedSeqNo: $receivedSeqNo")
recreateReceiver(requestSeqNo)
val movedEvent = awaitReceiveMessage(
receiveOne(ehConf.receiverTimeout.getOrElse(DefaultReceiverTimeout), "checkCursor move"),
requestSeqNo)
@@ -167,6 +204,10 @@ private[client] class CachedEventHubsReceiver private (ehConf: EventHubsConf,
}
private def receive(requestSeqNo: SequenceNumber, batchSize: Int): Iterator[EventData] = {
val taskId = EventHubsUtils.getTaskId
val startTimeNs = System.nanoTime()
def elapsedTimeNs = System.nanoTime() - startTimeNs
// Retrieve the events. First, we get the first event in the batch.
// Then, if that succeeds, we collect the rest of the data.
val first = Await.result(checkCursor(requestSeqNo), ehConf.internalOperationTimeout)
@@ -190,18 +231,26 @@ private[client] class CachedEventHubsReceiver private (ehConf: EventHubsConf,
val (result, validate) = sorted.duplicate
assert(validate.size == newBatchSize)
val elapsedTimeMs = TimeUnit.NANOSECONDS.toMillis(elapsedTimeNs)
logInfo(s"(TID $taskId) Finished receiving for $nAndP, consumer group: ${ehConf.consumerGroup
.getOrElse(DefaultConsumerGroup)}, batchSize: $batchSize, elapsed time: $elapsedTimeMs ms")
result
}
private def awaitReceiveMessage[T](awaitable: Awaitable[T], requestSeqNo: SequenceNumber): T = {
val taskId = EventHubsUtils.getTaskId
try {
Await.result(awaitable, ehConf.internalOperationTimeout)
} catch {
case e: AwaitTimeoutException =>
logError(
s"awaitReceiveMessage call failed with timeout. Event Hub $nAndP, ConsumerGroup ${ehConf.consumerGroup}. requestSeqNo: $requestSeqNo")
s"(TID $taskId) awaitReceiveMessage call failed with timeout. Event Hub $nAndP, ConsumerGroup ${ehConf.consumerGroup
.getOrElse(DefaultConsumerGroup)}. requestSeqNo: $requestSeqNo")
receiver = createReceiver(requestSeqNo)
recreateReceiver(requestSeqNo)
throw e
}
}
@@ -226,8 +275,10 @@ private[spark] object CachedEventHubsReceiver extends CachedReceiver with Loggin
nAndP: NameAndPartition,
requestSeqNo: SequenceNumber,
batchSize: Int): Iterator[EventData] = {
-logInfo(
-s"EventHubsCachedReceiver look up. For $nAndP, ${ehConf.consumerGroup}. requestSeqNo: $requestSeqNo, batchSize: $batchSize")
+val taskId = EventHubsUtils.getTaskId
+logInfo(s"(TID $taskId) EventHubsCachedReceiver look up. For $nAndP, ${ehConf.consumerGroup
+.getOrElse(DefaultConsumerGroup)}. requestSeqNo: $requestSeqNo, batchSize: $batchSize")
var receiver: CachedEventHubsReceiver = null
receivers.synchronized {
receiver = receivers.getOrElseUpdate(key(ehConf, nAndP), {
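The (truncated) lookup above caches one CachedEventHubsReceiver per Event Hub/partition key. A minimal sketch of the same synchronized get-or-create pattern, with hypothetical key and value types standing in for key(ehConf, nAndP) and the receiver class:

    import scala.collection.mutable

    final case class Receiver(id: String) // hypothetical stand-in
    val receivers = mutable.Map.empty[String, Receiver]

    def lookup(cacheKey: String)(create: => Receiver): Receiver =
      receivers.synchronized {
        // `create` is evaluated at most once per key, while the lock is held
        receivers.getOrElseUpdate(cacheKey, create)
      }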

View File

@@ -45,16 +45,22 @@ private class ClientConnectionPool(val ehConf: EventHubsConf) extends Logging {
*/
private def borrowClient: EventHubClient = {
var client = pool.poll()
val consumerGroup = ehConf.consumerGroup.getOrElse(DefaultConsumerGroup)
if (client == null) {
logInfo(
s"No clients left to borrow. EventHub name: ${ehConf.name}. Creating client ${count.incrementAndGet()}")
s"No clients left to borrow. EventHub name: ${ehConf.name}, " +
s"ConsumerGroup name: $consumerGroup. Creating client ${count.incrementAndGet()}")
val connStr = ConnectionStringBuilder(ehConf.connectionString)
connStr.setOperationTimeout(ehConf.operationTimeout.getOrElse(DefaultOperationTimeout))
EventHubsClient.userAgent =
s"SparkConnector-$SparkConnectorVersion-[${ehConf.name}]-[$consumerGroup]"
while (client == null) {
-client = EventHubClient.createSync(connStr.toString, ClientThreadPool.get(ehConf))
+client = EventHubClient.createFromConnectionStringSync(connStr.toString,
+ClientThreadPool.get(ehConf))
}
} else {
logInfo(s"Borrowing client. EventHub name: ${ehConf.name}")
logInfo(
s"Borrowing client. EventHub name: ${ehConf.name}, ConsumerGroup name: $consumerGroup")
}
logInfo(s"Available clients: {${pool.size}}. Total clients: ${count.get}")
client
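The switch from createSync to createFromConnectionStringSync tracks the azure-eventhubs 3.x API (the parent pom below moves the client from 2.3.2 to 3.0.2); the 3.x factory takes a connection string plus a ScheduledExecutorService. A standalone sketch, assuming azure-eventhubs 3.x on the classpath and a placeholder connection string:

    import java.util.concurrent.{ Executors, ScheduledExecutorService }
    import com.microsoft.azure.eventhubs.EventHubClient

    val connectionString = "<your Event Hubs connection string>" // placeholder
    val executor: ScheduledExecutorService = Executors.newScheduledThreadPool(4)
    val client: EventHubClient =
      EventHubClient.createFromConnectionStringSync(connectionString, executor)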

View File

@@ -266,6 +266,7 @@ private[spark] class EventHubsClient(private val ehConf: EventHubsConf)
case _ =>
val runtimeInfo =
Await.result(getRunTimeInfoF(nAndP.partitionId), ehConf.internalOperationTimeout)
var receiver: Future[PartitionReceiver] = null
val seqNo =
if (runtimeInfo.getIsEmpty || (pos.enqueuedTime != null &&
runtimeInfo.getLastEnqueuedTimeUtc.isBefore(pos.enqueuedTime.toInstant))) {
@@ -277,12 +278,15 @@ private[spark] class EventHubsClient(private val ehConf: EventHubsConf)
receiverOptions.setPrefetchCount(1)
receiverOptions.setIdentifier(s"spark-${SparkEnv.get.executorId}")
-val receiver = retryJava(client.createEpochReceiver(consumerGroup,
-nAndP.partitionId.toString,
-pos.convert,
-DefaultEpoch,
-receiverOptions),
-"translate: epoch receiver creation.")
+receiver = retryJava(
+EventHubsUtils.createReceiverInner(client,
+ehConf.useExclusiveReceiver,
+consumerGroup,
+nAndP.partitionId.toString,
+pos.convert,
+receiverOptions),
+"translate: receiver creation."
+)
receiver
.flatMap { r =>
r.setReceiveTimeout(ehConf.receiverTimeout.getOrElse(DefaultReceiverTimeout))
@@ -292,6 +296,7 @@ private[spark] class EventHubsClient(private val ehConf: EventHubsConf)
e.iterator.next.getSystemProperties.getSequenceNumber
}
}
(nAndP.partitionId, seqNo)
}

View File

@@ -44,16 +44,18 @@ package object eventhubs {
val DefaultPrefetchCount: Int = PartitionReceiver.DEFAULT_PREFETCH_COUNT
val DefaultFailOnDataLoss = "true"
val DefaultUseSimulatedClient = "false"
val DefaultUseExclusiveReceiver = "true"
val StartingSequenceNumber = 0L
val DefaultThreadPoolSize = 16
val DefaultEpoch = 0L
val RetryCount = 3
val WaitInterval = 3000
val OffsetAnnotation = "x-opt-offset"
val EnqueuedTimeAnnotation = "x-opt-enqueued-time"
val SequenceNumberAnnotation = "x-opt-sequence-number"
val SparkConnectorVersion = "2.3.13"
val SparkConnectorVersion = "2.3.14"
type PartitionId = Int
val PartitionId: Int.type = Int
@@ -82,5 +84,4 @@ package object eventhubs {
def toSequenceNumber: SequenceNumber = str.toLong
}
}

View File

@@ -94,7 +94,7 @@ private[spark] class EventHubsRDD(sc: SparkContext,
}
private def errBeginAfterEnd(part: EventHubsRDDPartition): String =
s"The beginning sequence number ${part.fromSeqNo} is larger than thet ending " +
s"The beginning sequence number ${part.fromSeqNo} is larger than the ending " +
s"sequence number ${part.untilSeqNo} for EventHubs ${part.name} on partition " +
s"${part.partitionId}."
@@ -104,13 +104,13 @@
if (part.fromSeqNo == part.untilSeqNo) {
logInfo(
s"Beginning sequence number ${part.fromSeqNo} is equal to the ending sequence " +
s"(TID ${context.taskAttemptId()}) Beginning sequence number ${part.fromSeqNo} is equal to the ending sequence " +
s"number ${part.untilSeqNo}. Returning empty partition for EH: ${part.name} " +
s"on partition: ${part.partitionId}")
Iterator.empty
} else {
logInfo(
s"Computing EventHubs ${part.name}, partition ${part.partitionId} " +
s"(TID ${context.taskAttemptId()}) Computing EventHubs ${part.name}, partition ${part.partitionId} " +
s"sequence numbers ${part.fromSeqNo} => ${part.untilSeqNo}")
val cachedReceiver = if (ehConf.useSimulatedClient) {
SimulatedCachedReceiver

View File

@@ -23,10 +23,9 @@ import com.microsoft.azure.eventhubs.EventHubException
import org.apache.spark.internal.Logging
import scala.compat.java8.FutureConverters.toScala
-import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.duration.FiniteDuration
-import scala.concurrent.duration._
+import scala.concurrent.duration.{ FiniteDuration, _ }
+import scala.concurrent.{ ExecutionContext, Future, Promise }
// TODO tests
@@ -109,25 +108,26 @@ private[spark] object RetryUtils extends Logging {
maxRetry: Int = RetryCount,
delay: Int = 10): Future[T] = {
def retryHelper(fn: => Future[T], retryCount: Int): Future[T] = {
val taskId = EventHubsUtils.getTaskId
fn.recoverWith {
case eh: EventHubException if eh.getIsTransient =>
if (retryCount >= maxRetry) {
logInfo(s"failure: $opName")
logInfo(s"(TID $taskId) failure: $opName")
throw eh
}
logInfo(s"retrying $opName after $delay ms")
logInfo(s"(TID $taskId) retrying $opName after $delay ms")
after(delay.milliseconds)(retryHelper(fn, retryCount + 1))
case t: Throwable =>
t.getCause match {
case eh: EventHubException if eh.getIsTransient =>
if (retryCount >= maxRetry) {
logInfo(s"failure: $opName")
logInfo(s"(TID $taskId) failure: $opName")
throw eh
}
logInfo(s"retrying $opName after $delay ms")
logInfo(s"(TID $taskId) retrying $opName after $delay ms")
after(delay.milliseconds)(retryHelper(fn, retryCount + 1))
case _ =>
logInfo(s"failure: $opName")
logInfo(s"(TID $taskId) failure: $opName")
throw t
}
}

View File

@@ -215,7 +215,7 @@ private[spark] class EventHubsSource private[eventhubs] (sqlContext: SQLContext,
* the start and end [[Offset]]s provided.
*
* @param start the start positions (inclusive)
* @param end the end positions (exclusive)
* @return the [[DataFrame]] with Event Hubs data
*/
override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
@@ -328,6 +328,10 @@ private[eventhubs] object EventHubsSource {
}
private def compare(a: ExecutorCacheTaskLocation, b: ExecutorCacheTaskLocation): Boolean = {
-if (a.host == b.host) { a.executorId > b.executorId } else { a.host > b.host }
+if (a.host == b.host) {
+a.executorId > b.executorId
+} else {
+a.host > b.host
+}
}
}
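Since executorId is a String, the comparison above is lexicographic ("2" sorts after "10"); all the source needs is a total, stable ordering so that each partition is pinned to the same executor batch after batch. A hypothetical illustration (ExecutorCacheTaskLocation is Spark-internal, and compare is private to this object):

    import org.apache.spark.scheduler.ExecutorCacheTaskLocation

    val a = ExecutorCacheTaskLocation("host-1", "2")
    val b = ExecutorCacheTaskLocation("host-1", "10")
    compare(a, b) // true: "2" > "10" when compared as Strings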

View File

@@ -25,7 +25,7 @@
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-spark-parent_2.11</artifactId>
-<version>2.3.13</version>
+<version>2.3.14</version>
<packaging>pom</packaging>
<name>EventHubs+Spark Parent POM</name>
@@ -136,7 +136,7 @@
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs</artifactId>
-<version>2.3.2</version>
+<version>3.0.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>