Add receive, send listener for getting performance metrics (#473)

Co-authored-by: Lucas Yang <luyan@microsoft.com>
Lucas Yang 2020-04-10 19:36:07 -07:00 committed by GitHub
Parent d90bc73303
Commit 0d6205a65d
No known key found for this signature
GPG key ID: 4AEE18F83AFDEB23
7 changed files: 133 additions and 32 deletions

@@ -24,7 +24,9 @@ import org.apache.http.annotation.Experimental
import org.apache.spark.eventhubs.PartitionPreferredLocationStrategy.PartitionPreferredLocationStrategy
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.json4s.NoTypeHints
import org.apache.spark.eventhubs.utils.MetricPlugin
import org.apache.spark.internal.Logging
import org.json4s.{DefaultFormats, NoTypeHints}
import org.json4s.jackson.Serialization
import scala.collection.JavaConverters._
@@ -53,7 +55,6 @@ final class EventHubsConf private (private val connectionStr: String)
self =>
import EventHubsConf._
private val settings = new ConcurrentHashMap[String, String]()
this.setConnectionString(connectionStr)
@@ -167,7 +168,8 @@ final class EventHubsConf private (private val connectionStr: String)
"eventhubs.prefetchCount",
"eventhubs.threadPoolSize",
"eventhubs.useExclusiveReceiver",
"useSimulatedClient"
"useSimulatedClient",
"eventhubs.metricPlugin"
).map(_.toLowerCase).toSet
val trimmedConfig = EventHubsConf(connectionString)
@@ -445,6 +447,18 @@ final class EventHubsConf private (private val connectionStr: String)
set(MaxEventsPerTriggerKey, limit)
}
def setMetricPlugin(metricPlugin: MetricPlugin): EventHubsConf = {
set(MetricPluginKey, metricPlugin.getClass.getName)
}
def metricPlugin(): Option[MetricPlugin] = {
self.get(MetricPluginKey).map(
className => {
Class.forName(className).newInstance().asInstanceOf[MetricPlugin]
}
)
}
/**
* Set the size of thread pool.
* Default: [[DefaultUseExclusiveReceiver]]
@@ -536,11 +550,13 @@ object EventHubsConf extends Logging {
val UseExclusiveReceiverKey = "eventhubs.useExclusiveReceiver"
val MaxEventsPerTriggerKey = "maxEventsPerTrigger"
val UseSimulatedClientKey = "useSimulatedClient"
val MetricPluginKey = "eventhubs.metricPlugin"
val PartitionPreferredLocationStrategyKey = "partitionPreferredLocationStrategy"
/** Creates an EventHubsConf */
def apply(connectionString: String) = new EventHubsConf(connectionString)
private[spark] def toConf(params: Map[String, String]): EventHubsConf = {
val connectionString =
EventHubsUtils.decrypt(
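Worth noting: setMetricPlugin stores only the plugin's class name, and metricPlugin() rebuilds the instance with Class.forName(...).newInstance(), so an implementation needs a public no-arg constructor. A minimal usage sketch, assuming a hypothetical LoggingMetricPlugin implementation (see the trait added below) and a placeholder connection string:

// Hypothetical sketch: register a metric plugin on the conf, then read it back.
val ehConf = EventHubsConf("Endpoint=sb://example/;EntityPath=myhub") // placeholder
  .setMetricPlugin(new LoggingMetricPlugin)

// Only the class name was stored, so this returns a fresh, reflectively
// constructed instance rather than the original object.
val plugin: Option[MetricPlugin] = ehConf.metricPlugin()

This reflective round-trip is also why the set/get test further down compares the plugins' id fields via reflection instead of using object equality.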

@@ -22,20 +22,15 @@ import java.util.concurrent.TimeUnit
import com.microsoft.azure.eventhubs._
import org.apache.spark.SparkEnv
import org.apache.spark.eventhubs.utils.RetryUtils.{ after, retryJava, retryNotNull }
import org.apache.spark.eventhubs.{
DefaultConsumerGroup,
EventHubsConf,
EventHubsUtils,
NameAndPartition,
SequenceNumber
}
import org.apache.spark.eventhubs.utils.MetricPlugin
import org.apache.spark.eventhubs.utils.RetryUtils.{after, retryJava, retryNotNull}
import org.apache.spark.eventhubs.{DefaultConsumerGroup, EventHubsConf, EventHubsUtils, NameAndPartition, SequenceNumber}
import org.apache.spark.internal.Logging
import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{ Await, Awaitable, Future }
import scala.concurrent.{Await, Awaitable, Future}
private[spark] trait CachedReceiver {
private[eventhubs] def receive(ehConf: EventHubsConf,
@@ -59,16 +54,18 @@ private[spark] trait CachedReceiver {
*
* @param ehConf the [[EventHubsConf]] which contains the connection string used to connect to Event Hubs
* @param nAndP the Event Hub name and partition that the receiver is connected to.
*/
private[client] class CachedEventHubsReceiver private (ehConf: EventHubsConf,
nAndP: NameAndPartition,
startSeqNo: SequenceNumber)
extends Logging {
type AwaitTimeoutException = java.util.concurrent.TimeoutException
import org.apache.spark.eventhubs._
private lazy val metricPlugin: Option[MetricPlugin] = ehConf.metricPlugin()
private lazy val client: EventHubClient = ClientConnectionPool.borrowClient(ehConf)
private var receiver: PartitionReceiver = createReceiver(startSeqNo)
@@ -219,15 +216,15 @@ private[client] class CachedEventHubsReceiver private (ehConf: EventHubsConf,
// Then, if that succeeds, we collect the rest of the data.
val first = Await.result(checkCursor(requestSeqNo), ehConf.internalOperationTimeout)
val firstSeqNo = first.head.getSystemProperties.getSequenceNumber
val newBatchSize = (requestSeqNo + batchSize - firstSeqNo).toInt
val batchCount = (requestSeqNo + batchSize - firstSeqNo).toInt
if (newBatchSize <= 0) {
if (batchCount <= 0) {
return Iterator.empty
}
val theRest = for { i <- 1 until newBatchSize } yield
val theRest = for { i <- 1 until batchCount } yield
awaitReceiveMessage(receiveOne(ehConf.receiverTimeout.getOrElse(DefaultReceiverTimeout),
s"receive; $nAndP; seqNo: ${requestSeqNo + i}"),
s"receive; $nAndP; seqNo: ${requestSeqNo + i}"),
requestSeqNo)
// Combine and sort the data.
val combined = first ++ theRest.flatten
@@ -237,12 +234,22 @@ private[client] class CachedEventHubsReceiver private (ehConf: EventHubsConf,
.iterator
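// Duplicate the sorted iterator: 'result' goes back to the caller, while
// 'validate' is consumed below for the size check and, when a plugin is set,
// for computing the receive metrics.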
val (result, validate) = sorted.duplicate
assert(validate.size == newBatchSize)
val elapsedTimeMs = TimeUnit.NANOSECONDS.toMillis(elapsedTimeNs)
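// When a plugin is configured, fold over the validate copy once to get both
// the event count and the total payload bytes, then report them; otherwise
// just assert that the batch has the expected size.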
if (metricPlugin.isDefined) {
val (validateSize, batchSizeInBytes) =
validate.map(eventData => (1, eventData.getBytes.length.toLong)).reduceOption {
(countAndSize1, countAndSize2) =>
(countAndSize1._1 + countAndSize2._1, countAndSize1._2 + countAndSize2._2)
}.getOrElse((0, 0L))
metricPlugin.foreach(listener => {
listener.onReceiveMetric(nAndP, batchCount, batchSizeInBytes, elapsedTimeMs)
})
assert(validateSize == batchCount)
} else {
assert(validate.size == batchCount)
}
logInfo(s"(TID $taskId) Finished receiving for $nAndP, consumer group: ${ehConf.consumerGroup
.getOrElse(DefaultConsumerGroup)}, batchSize: $batchSize, elapsed time: $elapsedTimeMs ms")
result
}
@@ -281,7 +288,8 @@ private[spark] object CachedEventHubsReceiver extends CachedReceiver with Logging {
private[eventhubs] override def receive(ehConf: EventHubsConf,
nAndP: NameAndPartition,
requestSeqNo: SequenceNumber,
batchSize: Int): Iterator[EventData] = {
batchSize: Int
): Iterator[EventData] = {
val taskId = EventHubsUtils.getTaskId
logInfo(s"(TID $taskId) EventHubsCachedReceiver look up. For $nAndP, ${ehConf.consumerGroup

@@ -0,0 +1,12 @@
package org.apache.spark.eventhubs.utils
import com.microsoft.azure.eventhubs.EventData
import org.apache.spark.eventhubs.{NameAndPartition, SequenceNumber}
trait MetricPlugin extends Serializable {
def onReceiveMetric(partitionInfo: NameAndPartition, batchCount: Int, batchSizeInBytes: Long, elapsedTimeInMillis: Long): Unit
def onSendMetric(eventHubName: String, batchCount: Int, batchSizeInBytes: Long, elapsedTimeInMillis: Long, isSuccess: Boolean): Unit
}
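For illustration, here is a minimal hypothetical implementation (not part of this commit) that simply logs both callbacks; it keeps a no-arg constructor so EventHubsConf can rebuild it reflectively:

// Hypothetical example implementation: log every receive/send metric.
class LoggingMetricPlugin extends MetricPlugin {
  override def onReceiveMetric(partitionInfo: NameAndPartition,
                               batchCount: Int,
                               batchSizeInBytes: Long,
                               elapsedTimeInMillis: Long): Unit =
    println(s"receive $partitionInfo: $batchCount events, " +
            s"$batchSizeInBytes bytes, $elapsedTimeInMillis ms")

  override def onSendMetric(eventHubName: String,
                            batchCount: Int,
                            batchSizeInBytes: Long,
                            elapsedTimeInMillis: Long,
                            isSuccess: Boolean): Unit =
    println(s"send $eventHubName: $batchCount events, " +
            s"$batchSizeInBytes bytes, $elapsedTimeInMillis ms, success=$isSuccess")
}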

@@ -17,9 +17,10 @@
package org.apache.spark.sql.eventhubs
import com.microsoft.azure.eventhubs.{ EventData, EventHubClient }
import org.apache.spark.eventhubs.EventHubsConf
import com.microsoft.azure.eventhubs.{EventData, EventHubClient}
import org.apache.spark.eventhubs.{EventHubsConf, RetryCount}
import org.apache.spark.eventhubs.client.ClientConnectionPool
import org.apache.spark.eventhubs.utils.MetricPlugin
import org.apache.spark.eventhubs.utils.RetryUtils._
import org.apache.spark.sql.ForeachWriter
@@ -38,9 +39,14 @@ import org.apache.spark.sql.ForeachWriter
* for the Event Hub which will receive the sent events
*/
case class EventHubsForeachWriter(ehConf: EventHubsConf) extends ForeachWriter[String] {
private lazy val metricPlugin: Option[MetricPlugin] = ehConf.metricPlugin()
var client: EventHubClient = _
var totalMessageSizeInBytes = 0
var totalMessageCount = 0
var writerOpenTime = 0L
def open(partitionId: Long, version: Long): Boolean = {
writerOpenTime = System.currentTimeMillis()
client = ClientConnectionPool.borrowClient(ehConf)
true
}
@@ -48,12 +54,27 @@ case class EventHubsForeachWriter(ehConf: EventHubsConf) extends ForeachWriter[String] {
def process(body: String): Unit = {
val event = EventData.create(s"$body".getBytes("UTF-8"))
retryJava(client.send(event), "ForeachWriter")
totalMessageCount += 1
totalMessageSizeInBytes += event.getBytes.length
}
def close(errorOrNull: Throwable): Unit = {
errorOrNull match {
case t: Throwable => throw t
case t: Throwable =>
metricPlugin.foreach(_.onSendMetric(
ehConf.name,
totalMessageCount,
totalMessageSizeInBytes,
System.currentTimeMillis() - writerOpenTime,
isSuccess = false))
throw t
case _ =>
metricPlugin.foreach(_.onSendMetric(
ehConf.name,
totalMessageCount,
totalMessageSizeInBytes,
System.currentTimeMillis() - writerOpenTime,
isSuccess = true))
ClientConnectionPool.returnClient(ehConf, client)
}
}
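A hedged sketch of how the writer might be attached to a streaming query (df, spark, and ehConf are assumed to already exist; spark.implicits._ must be in scope for .as[String]):

// Hypothetical wiring: each partition opens one writer, which counts the events
// and bytes it sends and reports them through onSendMetric when it closes.
import spark.implicits._

val query = df
  .selectExpr("CAST(body AS STRING)")
  .as[String]
  .writeStream
  .foreach(EventHubsForeachWriter(ehConf))
  .start()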

@@ -20,6 +20,7 @@ package org.apache.spark.sql.eventhubs
import com.microsoft.azure.eventhubs.EventData
import org.apache.spark.eventhubs.EventHubsConf
import org.apache.spark.eventhubs.client.Client
import org.apache.spark.eventhubs.utils.MetricPlugin
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{
Attribute,
@@ -28,9 +29,9 @@ import org.apache.spark.sql.catalyst.expressions.{
UnsafeMapData,
UnsafeProjection
}
import org.apache.spark.unsafe.types.UTF8String.IntWrapper
import org.apache.spark.sql.types.{ BinaryType, MapType, StringType }
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.unsafe.types.UTF8String.IntWrapper
/**
* Writes out data in a single Spark task, without any concerns about how
@@ -43,6 +44,10 @@ private[eventhubs] class EventHubsWriteTask(parameters: Map[String, String],
private val ehConf = EventHubsConf.toConf(parameters)
private var sender: Client = _
private lazy val metricPlugin: Option[MetricPlugin] = ehConf.metricPlugin()
var totalMessageSizeInBytes = 0
var totalMessageCount = 0
var writerOpenTime = 0L
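// Running totals for this write task; reported through onSendMetric when
// close() is called.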
/**
* Writes data out to EventHubs
@@ -51,14 +56,22 @@
*/
def execute(iterator: Iterator[InternalRow]): Unit = {
sender = EventHubsSourceProvider.clientFactory(parameters)(ehConf)
writerOpenTime = System.currentTimeMillis()
while (iterator.hasNext) {
val currentRow = iterator.next
sendRow(currentRow, sender)
totalMessageSizeInBytes += sendRow(currentRow, sender)
totalMessageCount += 1
}
}
def close(): Unit = {
if (sender != null) {
metricPlugin.foreach(
_.onSendMetric(ehConf.name,
totalMessageCount,
totalMessageSizeInBytes,
System.currentTimeMillis() - writerOpenTime,
isSuccess = true))
sender.close()
sender = null
}
@@ -109,7 +122,7 @@ private[eventhubs] abstract class EventHubsRowWriter(inputSchema: Seq[Attribute]
protected def sendRow(
row: InternalRow,
sender: Client
): Unit = {
): Int = {
val projectedRow = projection(row)
val body = projectedRow.getBinary(0)
val partitionKey = toPartitionKey(projectedRow.getUTF8String(1))
@@ -123,6 +136,7 @@ private[eventhubs] abstract class EventHubsRowWriter(inputSchema: Seq[Attribute]
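// Send the event and return its serialized size so that execute() can
// accumulate totalMessageSizeInBytes.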
val event = EventData.create(body)
sender.send(event, partitionId, partitionKey, properties)
event.getBytes.length
}
private def createProjection = {

@@ -20,12 +20,12 @@ package org.apache.spark.eventhubs
import java.time.Duration
import java.util.NoSuchElementException
import org.apache.spark.eventhubs.utils.EventHubsTestUtils
import org.apache.spark.eventhubs.utils.{EventHubsTestUtils, MetricPluginMock}
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization
import org.json4s.jackson.Serialization.{ read => sread }
import org.json4s.jackson.Serialization.{ write => swrite }
import org.scalatest.{ BeforeAndAfterAll, FunSuite }
import org.json4s.jackson.Serialization.{read => sread}
import org.json4s.jackson.Serialization.{write => swrite}
import org.scalatest.{BeforeAndAfterAll, FunSuite}
/**
* Tests [[EventHubsConf]] for correctness.
@@ -207,6 +207,17 @@ class EventHubsConfSuite extends FunSuite with BeforeAndAfterAll {
assert(actual.equals(expected))
}
test("metricPlugin set/get") {
val expectedListener = new MetricPluginMock
val conf = testUtils.getEventHubsConf().setMetricPlugin(expectedListener)
val actualListener = conf.metricPlugin().get
val idField = actualListener.getClass.getDeclaredField("id")
idField.setAccessible(true)
assert(idField.getInt(actualListener) == expectedListener.id)
}
test("trimmedConfig") {
val originalConf = testUtils
.getEventHubsConf()

@@ -0,0 +1,19 @@
package org.apache.spark.eventhubs.utils
import org.apache.spark.eventhubs.{NameAndPartition, Rate, SequenceNumber}
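// Rate and SequenceNumber are type aliases (Int and Long respectively) from
// the org.apache.spark.eventhubs package object, so these signatures match the
// trait's (Int, Long) parameters.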
class MetricPluginMock extends MetricPlugin {
val id = 1
override def onReceiveMetric(partitionInfo: NameAndPartition,
batchCount: Rate,
batchSizeInBytes: SequenceNumber,
elapsedTimeInMillis: SequenceNumber): Unit = ???
override def onSendMetric(eventHubName: String,
batchCount: Rate,
batchSizeInBytes: SequenceNumber,
elapsedTimeInMillis: SequenceNumber,
isSuccess: Boolean): Unit = ???
}