put cache vars in the class instead of object (#577)
This commit is contained in:
Родитель
816181dd84
Коммит
7da32f69b4
|
@ -54,6 +54,9 @@ private[spark] class EventHubsClient(private val ehConf: EventHubsConf)
|
||||||
|
|
||||||
private var _client: EventHubClient = _
|
private var _client: EventHubClient = _
|
||||||
|
|
||||||
|
private var partitionCountCache: Int = 0
|
||||||
|
private var partitionCountCacheUpdateTimestamp: Long = 0
|
||||||
|
|
||||||
private def client = synchronized {
|
private def client = synchronized {
|
||||||
if (_client == null) {
|
if (_client == null) {
|
||||||
_client = ClientConnectionPool.borrowClient(ehConf)
|
_client = ClientConnectionPool.borrowClient(ehConf)
|
||||||
|
@ -180,7 +183,7 @@ private[spark] class EventHubsClient(private val ehConf: EventHubsConf)
|
||||||
lazy val partitionCountLazyVal: Int = {
|
lazy val partitionCountLazyVal: Int = {
|
||||||
try {
|
try {
|
||||||
logDebug(
|
logDebug(
|
||||||
s"partitionCountLazyVal makes a call to runTimeInfo to read the number of partitions.")
|
s"partitionCountLazyVal makes a call to runTimeInfo to read the number of partitions for EventHub ${client.getEventHubName}.")
|
||||||
val runtimeInfo = client.getRuntimeInformation.get
|
val runtimeInfo = client.getRuntimeInformation.get
|
||||||
runtimeInfo.getPartitionCount
|
runtimeInfo.getPartitionCount
|
||||||
} catch {
|
} catch {
|
||||||
|
@ -196,7 +199,8 @@ private[spark] class EventHubsClient(private val ehConf: EventHubsConf)
|
||||||
partitionCountCache = runtimeInfo.getPartitionCount
|
partitionCountCache = runtimeInfo.getPartitionCount
|
||||||
partitionCountCacheUpdateTimestamp = currentTimeStamp
|
partitionCountCacheUpdateTimestamp = currentTimeStamp
|
||||||
logDebug(
|
logDebug(
|
||||||
s"partitionCountDynamic made a call to runTimeInfo to read the number of partitions = ${partitionCountCache} at timestamp = ${partitionCountCacheUpdateTimestamp}")
|
s"partitionCountDynamic made a call to runTimeInfo to read the number of partitions = ${partitionCountCache}" +
|
||||||
|
s" at timestamp = ${partitionCountCacheUpdateTimestamp} for EventHub ${client.getEventHubName}")
|
||||||
}
|
}
|
||||||
partitionCountCache
|
partitionCountCache
|
||||||
} catch {
|
} catch {
|
||||||
|
@ -350,8 +354,6 @@ private[spark] class EventHubsClient(private val ehConf: EventHubsConf)
|
||||||
}
|
}
|
||||||
|
|
||||||
private[spark] object EventHubsClient {
|
private[spark] object EventHubsClient {
|
||||||
private var partitionCountCache: Int = 0
|
|
||||||
private var partitionCountCacheUpdateTimestamp: Long = 0
|
|
||||||
|
|
||||||
private[spark] def apply(ehConf: EventHubsConf): EventHubsClient =
|
private[spark] def apply(ehConf: EventHubsConf): EventHubsClient =
|
||||||
new EventHubsClient(ehConf)
|
new EventHubsClient(ehConf)
|
||||||
|
|
Загрузка…
Ссылка в новой задаче