Couple of fixes related to configuration and improve logging. (#493)
* several minor fixes * add a check for operation timeout
This commit is contained in:
Родитель
fbc606f0fd
Коммит
bda705a13f
|
@ -355,6 +355,10 @@ final class EventHubsConf private (private val connectionStr: String)
|
|||
* @return the updated [[EventHubsConf]] instance
|
||||
*/
|
||||
def setReceiverTimeout(d: Duration): EventHubsConf = {
|
||||
if (d.toMillis > operationTimeout.getOrElse(DefaultOperationTimeout).toMillis) {
|
||||
throw new IllegalArgumentException("receiver timeout is greater than operation timeout")
|
||||
}
|
||||
|
||||
set(ReceiverTimeoutKey, d)
|
||||
}
|
||||
|
||||
|
@ -372,6 +376,10 @@ final class EventHubsConf private (private val connectionStr: String)
|
|||
* @return the updated [[EventHubsConf]] instance
|
||||
*/
|
||||
def setOperationTimeout(d: Duration): EventHubsConf = {
|
||||
if (d.toMillis < receiverTimeout.getOrElse(DefaultReceiverTimeout).toMillis) {
|
||||
throw new IllegalArgumentException("operation timeout is less than receiver timeout")
|
||||
}
|
||||
|
||||
set(OperationTimeoutKey, d)
|
||||
}
|
||||
|
||||
|
|
|
@ -51,7 +51,7 @@ private class ClientConnectionPool(val ehConf: EventHubsConf) extends Logging {
|
|||
s"No clients left to borrow. EventHub name: ${ehConf.name}, " +
|
||||
s"ConsumerGroup name: $consumerGroup. Creating client ${count.incrementAndGet()}")
|
||||
val connStr = ConnectionStringBuilder(ehConf.connectionString)
|
||||
connStr.setOperationTimeout(ehConf.receiverTimeout.getOrElse(DefaultOperationTimeout))
|
||||
connStr.setOperationTimeout(ehConf.receiverTimeout.getOrElse(DefaultReceiverTimeout))
|
||||
EventHubsClient.userAgent =
|
||||
s"SparkConnector-$SparkConnectorVersion-[${ehConf.name}]-[$consumerGroup]"
|
||||
while (client == null) {
|
||||
|
|
|
@ -181,6 +181,10 @@ private[spark] class EventHubsSource private[eventhubs] (sqlContext: SQLContext,
|
|||
from.map {
|
||||
case (nAndP, seqNo) =>
|
||||
if (seqNo < earliestSeqNos.get(nAndP)) {
|
||||
reportDataLoss(
|
||||
s"Starting seqNo $seqNo in partition ${nAndP.partitionId} of EventHub ${nAndP.ehName} " +
|
||||
s"is behind the earliest sequence number ${earliestSeqNos.get(nAndP)} " +
|
||||
s"present in the service. Some events may have expired and been missed.")
|
||||
nAndP -> earliestSeqNos.get(nAndP)
|
||||
} else {
|
||||
nAndP -> seqNo
|
||||
|
|
|
@ -294,4 +294,22 @@ class EventHubsConfSuite extends FunSuite with BeforeAndAfterAll {
|
|||
eventHubConfig.partitionPreferredLocationStrategy ==
|
||||
PartitionPreferredLocationStrategy.BalancedHash)
|
||||
}
|
||||
|
||||
test("validate - receiver and operation timeout") {
|
||||
val eventHubConfig = testUtils.getEventHubsConf()
|
||||
intercept[IllegalArgumentException] {
|
||||
eventHubConfig.setOperationTimeout(Duration.ofMinutes(10))
|
||||
eventHubConfig.setReceiverTimeout(Duration.ofMinutes(11))
|
||||
}
|
||||
intercept[IllegalArgumentException] {
|
||||
eventHubConfig.setReceiverTimeout(Duration.ofMinutes(2))
|
||||
eventHubConfig.setOperationTimeout(Duration.ofMinutes(1))
|
||||
}
|
||||
|
||||
eventHubConfig.setReceiverTimeout(Duration.ofMinutes(2))
|
||||
assert(eventHubConfig.receiverTimeout.get.toMinutes == 2)
|
||||
|
||||
eventHubConfig.setOperationTimeout(Duration.ofMinutes(3))
|
||||
assert(eventHubConfig.operationTimeout.get.toMinutes == 3)
|
||||
}
|
||||
}
|
||||
|
|
Загрузка…
Ссылка в новой задаче