KAFKA-6244; Dynamic update of log cleaner configuration (#4465)

This commit is contained in:
Rajini Sivaram 2018-01-26 14:38:46 -08:00 committed by Jason Gustafson
Parent 1fef10d47f
Commit 70cecb6881
8 changed files with 244 additions and 39 deletions

Просмотреть файл

@ -20,13 +20,14 @@ package kafka.log
import java.io.{File, IOException}
import java.nio._
import java.nio.file.Files
import java.util
import java.util.Date
import java.util.concurrent.TimeUnit
import com.yammer.metrics.core.Gauge
import kafka.common._
import kafka.metrics.KafkaMetricsGroup
import kafka.server.LogDirFailureChannel
import kafka.server.{BrokerReconfigurable, KafkaConfig, LogDirFailureChannel}
import kafka.utils._
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Time
@ -35,7 +36,7 @@ import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.record.MemoryRecords.RecordFilter
import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
import scala.collection.mutable
import scala.collection.{Set, mutable}
import scala.collection.JavaConverters._
/**
@ -83,16 +84,20 @@ import scala.collection.JavaConverters._
* data from the transaction prior to reaching the offset of the marker. This follows the same logic used for
* tombstone deletion.
*
* @param config Configuration parameters for the cleaner
* @param initialConfig Initial configuration parameters for the cleaner. Actual config may be dynamically updated.
* @param logDirs The directories where offset checkpoints reside
* @param logs The pool of logs
* @param time A way to control the passage of time
*/
class LogCleaner(val config: CleanerConfig,
class LogCleaner(initialConfig: CleanerConfig,
val logDirs: Seq[File],
val logs: Pool[TopicPartition, Log],
val logDirFailureChannel: LogDirFailureChannel,
time: Time = Time.SYSTEM) extends Logging with KafkaMetricsGroup {
time: Time = Time.SYSTEM) extends Logging with KafkaMetricsGroup with BrokerReconfigurable
{
/* Log cleaner configuration which may be dynamically updated */
@volatile private var config = initialConfig
/* for managing the state of partitions being cleaned. package-private to allow access in tests */
private[log] val cleanerManager = new LogCleanerManager(logDirs, logs, logDirFailureChannel)
@ -106,7 +111,7 @@ class LogCleaner(val config: CleanerConfig,
time = time)
/* the threads */
private val cleaners = (0 until config.numThreads).map(new CleanerThread(_))
private val cleaners = mutable.ArrayBuffer[CleanerThread]()
/* a metric to track the maximum utilization of any thread's buffer in the last cleaning */
newGauge("max-buffer-utilization-percent",
@ -133,7 +138,11 @@ class LogCleaner(val config: CleanerConfig,
*/
/**
 * Start the background cleaner threads.
 * Threads are created here (rather than at construction) from the current value of
 * `config.numThreads`, so a restart after a dynamic config update picks up the new count.
 */
def startup() {
  info("Starting the log cleaner")
  for (threadId <- 0 until config.numThreads) {
    val thread = new CleanerThread(threadId)
    cleaners += thread
    thread.start()
  }
}
/**
@ -142,6 +151,27 @@ class LogCleaner(val config: CleanerConfig,
/**
 * Stop all cleaner threads and forget them, so a subsequent startup()
 * builds a fresh set of threads from the current config.
 */
def shutdown() {
  info("Shutting down the log cleaner.")
  cleaners.foreach { cleaner => cleaner.shutdown() }
  cleaners.clear()
}
/** The set of broker config names the log cleaner can apply dynamically. */
override def reconfigurableConfigs(): Set[String] = LogCleaner.ReconfigurableConfigs
/**
 * Validate a proposed broker config for the cleaner: the new thread count must be
 * at least 1 and within a factor of two of the current thread count, limiting how
 * abruptly the cleaner's parallelism can change in one update.
 */
override def validateReconfiguration(newConfig: KafkaConfig): Boolean = {
  val currentThreads = config.numThreads
  val proposedThreads = LogCleaner.cleanerConfig(newConfig).numThreads
  proposedThreads >= 1 &&
    proposedThreads >= currentThreads / 2 &&
    proposedThreads <= currentThreads * 2
}
/**
 * Reconfigure the log cleaner by stopping the current cleaner threads and creating new ones.
 * Recreating the threads also replaces any cleaners that previously died, so the running
 * thread count always matches the new config. Note that `config` is updated before
 * startup() so the new threads are built from the new settings.
 */
override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
  config = LogCleaner.cleanerConfig(newConfig)
  shutdown()
  startup()
}
/**
@ -210,6 +240,12 @@ class LogCleaner(val config: CleanerConfig,
isCleaned
}
// Only for testing: exposes the current (possibly dynamically updated) cleaner config
private[kafka] def currentConfig: CleanerConfig = config
// Only for testing: the number of cleaner threads currently registered
private[log] def cleanerCount: Int = cleaners.size
/**
* The cleaner threads do the actual log cleaning. Each thread processes does its cleaning repeatedly by
* choosing the dirtiest log, cleaning it, and then swapping in the cleaned segments.
@ -317,6 +353,30 @@ class LogCleaner(val config: CleanerConfig,
}
}
object LogCleaner {

  /** Broker config names that may be updated dynamically and applied by the log cleaner. */
  val ReconfigurableConfigs = Set(
    KafkaConfig.LogCleanerThreadsProp,
    KafkaConfig.LogCleanerDedupeBufferSizeProp,
    KafkaConfig.LogCleanerDedupeBufferLoadFactorProp,
    KafkaConfig.LogCleanerIoBufferSizeProp,
    KafkaConfig.MessageMaxBytesProp,
    KafkaConfig.LogCleanerIoMaxBytesPerSecondProp,
    KafkaConfig.LogCleanerBackoffMsProp
  )

  /** Build a [[CleanerConfig]] from the cleaner-related entries of a broker config. */
  def cleanerConfig(config: KafkaConfig): CleanerConfig = {
    CleanerConfig(
      numThreads = config.logCleanerThreads,
      dedupeBufferSize = config.logCleanerDedupeBufferSize,
      dedupeBufferLoadFactor = config.logCleanerDedupeBufferLoadFactor,
      ioBufferSize = config.logCleanerIoBufferSize,
      maxMessageSize = config.messageMaxBytes,
      maxIoBytesPerSecond = config.logCleanerIoMaxBytesPerSecond,
      backOffMs = config.logCleanerBackoffMs,
      enableCleaner = config.logCleanerEnable)
  }
}
/**
* This class holds the actual logic for cleaning a log
* @param id An identifier used for logging

Просмотреть файл

@ -893,18 +893,11 @@ object LogManager {
val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
val defaultLogConfig = LogConfig(defaultProps)
// read the log configurations from zookeeper
val (topicConfigs, failed) = zkClient.getLogConfigs(zkClient.getAllTopicsInCluster, defaultProps)
if (!failed.isEmpty) throw failed.head._2
// read the log configurations from zookeeper
val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads,
dedupeBufferSize = config.logCleanerDedupeBufferSize,
dedupeBufferLoadFactor = config.logCleanerDedupeBufferLoadFactor,
ioBufferSize = config.logCleanerIoBufferSize,
maxMessageSize = config.messageMaxBytes,
maxIoBytesPerSecond = config.logCleanerIoMaxBytesPerSecond,
backOffMs = config.logCleanerBackoffMs,
enableCleaner = config.logCleanerEnable)
val cleanerConfig = LogCleaner.cleanerConfig(config)
new LogManager(logDirs = config.logDirs.map(new File(_).getAbsoluteFile),
initialOfflineDirs = initialOfflineDirs.map(new File(_).getAbsoluteFile),

Просмотреть файл

@ -22,6 +22,7 @@ import java.util
import java.util.Properties
import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.log.LogCleaner
import kafka.server.DynamicBrokerConfig._
import kafka.utils.{CoreUtils, Logging}
import kafka.zk.{AdminZkClient, KafkaZkClient}
@ -77,6 +78,7 @@ object DynamicBrokerConfig {
val AllDynamicConfigs = mutable.Set[String]()
AllDynamicConfigs ++= DynamicSecurityConfigs
AllDynamicConfigs ++= LogCleaner.ReconfigurableConfigs
private val PerBrokerConfigs = DynamicSecurityConfigs
@ -115,6 +117,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
private val dynamicDefaultConfigs = mutable.Map[String, String]()
private val brokerId = kafkaConfig.brokerId
private val reconfigurables = mutable.Buffer[Reconfigurable]()
private val brokerReconfigurables = mutable.Buffer[BrokerReconfigurable]()
private val lock = new ReentrantReadWriteLock
private var currentConfig = kafkaConfig
@ -124,11 +127,21 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
updateBrokerConfig(brokerId, adminZkClient.fetchEntityConfig(ConfigType.Broker, brokerId.toString))
}
/**
 * Register the server's broker-level reconfigurables.
 * The log cleaner may be absent (null reference when disabled), in which case
 * there is nothing to register.
 */
def addReconfigurables(kafkaServer: KafkaServer): Unit = {
  Option(kafkaServer.logManager.cleaner).foreach(addBrokerReconfigurable)
}
/** Register a dynamically reconfigurable component; every config it declares must be a known dynamic config. */
def addReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock) {
  val declaredConfigs = reconfigurable.reconfigurableConfigs.asScala
  require(declaredConfigs.forall(AllDynamicConfigs.contains))
  reconfigurables += reconfigurable
}
/** Register a broker-level reconfigurable; every config it declares must be a known dynamic config. */
def addBrokerReconfigurable(reconfigurable: BrokerReconfigurable): Unit = CoreUtils.inWriteLock(lock) {
  val unsupported = reconfigurable.reconfigurableConfigs.filterNot(AllDynamicConfigs.contains)
  require(unsupported.isEmpty)
  brokerReconfigurables += reconfigurable
}
// Deregister a previously added reconfigurable so it no longer receives config updates
def removeReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock) {
  reconfigurables -= reconfigurable
}
@ -327,9 +340,15 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
val oldValues = currentConfig.valuesWithPrefixOverride(listenerName.configPrefix)
val newValues = newConfig.valuesFromThisConfigWithPrefixOverride(listenerName.configPrefix)
val updatedKeys = updatedConfigs(newValues, oldValues).keySet
processReconfigurable(listenerReconfigurable, updatedKeys, newValues, customConfigs, validateOnly)
if (needsReconfiguration(listenerReconfigurable.reconfigurableConfigs, updatedKeys))
processReconfigurable(listenerReconfigurable, newValues, customConfigs, validateOnly)
case reconfigurable =>
processReconfigurable(reconfigurable, updatedMap.keySet, newConfig.valuesFromThisConfig, customConfigs, validateOnly)
if (needsReconfiguration(reconfigurable.reconfigurableConfigs, updatedMap.keySet))
processReconfigurable(reconfigurable, newConfig.valuesFromThisConfig, customConfigs, validateOnly)
}
brokerReconfigurables.foreach { reconfigurable =>
if (needsReconfiguration(reconfigurable.reconfigurableConfigs.asJava, updatedMap.keySet))
processBrokerReconfigurable(reconfigurable, currentConfig, newConfig, validateOnly)
}
newConfig
} catch {
@ -343,18 +362,41 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
currentConfig
}
private def processReconfigurable(reconfigurable: Reconfigurable, updatedKeys: Set[String],
allNewConfigs: util.Map[String, _], newCustomConfigs: util.Map[String, Object],
/** True if any of the component's reconfigurable config names appears among the updated keys. */
private def needsReconfiguration(reconfigurableConfigs: util.Set[String], updatedKeys: Set[String]): Boolean = {
  updatedKeys.exists(reconfigurableConfigs.contains)
}
private def processReconfigurable(reconfigurable: Reconfigurable,
allNewConfigs: util.Map[String, _],
newCustomConfigs: util.Map[String, Object],
validateOnly: Boolean): Unit = {
if (reconfigurable.reconfigurableConfigs.asScala.intersect(updatedKeys).nonEmpty) {
val newConfigs = new util.HashMap[String, Object]
allNewConfigs.asScala.foreach { case (k, v) => newConfigs.put(k, v.asInstanceOf[AnyRef]) }
newConfigs.putAll(newCustomConfigs)
if (validateOnly) {
if (!reconfigurable.validateReconfiguration(newConfigs))
throw new ConfigException("Validation of dynamic config update failed")
} else
reconfigurable.reconfigure(newConfigs)
}
val newConfigs = new util.HashMap[String, Object]
allNewConfigs.asScala.foreach { case (k, v) => newConfigs.put(k, v.asInstanceOf[AnyRef]) }
newConfigs.putAll(newCustomConfigs)
if (validateOnly) {
if (!reconfigurable.validateReconfiguration(newConfigs))
throw new ConfigException("Validation of dynamic config update failed")
} else
reconfigurable.reconfigure(newConfigs)
}
/**
 * Apply (or, if validateOnly, just validate) a config update for a [[BrokerReconfigurable]],
 * which receives typed KafkaConfig instances rather than raw config maps.
 *
 * @throws ConfigException if validateOnly is true and the new config fails validation
 */
private def processBrokerReconfigurable(reconfigurable: BrokerReconfigurable,
                                        oldConfig: KafkaConfig,
                                        newConfig: KafkaConfig,
                                        validateOnly: Boolean): Unit = {
  if (!validateOnly)
    reconfigurable.reconfigure(oldConfig, newConfig)
  else if (!reconfigurable.validateReconfiguration(newConfig))
    throw new ConfigException("Validation of dynamic config update failed")
}
}
/**
 * A broker component whose configs can be updated dynamically at runtime.
 * Unlike the map-based [[Reconfigurable]] interface, implementations receive
 * typed [[KafkaConfig]] instances.
 */
trait BrokerReconfigurable {
  // The broker config names this component can apply dynamically
  def reconfigurableConfigs: Set[String]
  // Validate the proposed new broker config; return false to reject the update
  def validateReconfiguration(newConfig: KafkaConfig): Boolean
  // Apply the new broker config; the previous config is supplied for comparison
  def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit
}

Просмотреть файл

@ -290,6 +290,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
Mx4jLoader.maybeLoad()
/* Add all reconfigurables for config change notification before starting config handlers */
config.dynamicConfig.addReconfigurables(this)
/* start dynamic config manager */
dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers),
ConfigType.Client -> new ClientIdConfigHandler(quotaManagers),

Просмотреть файл

@ -242,6 +242,46 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
stopAndVerifyProduceConsume(producerThread, consumerThread, mayFailRequests = false)
}
/**
 * Verifies dynamic update of log cleaner configs: the new config values are visible via
 * currentConfig, the cleaner thread count changes to match, and killed cleaner threads
 * are recreated on a subsequent config update.
 */
@Test
def testLogCleanerConfig(): Unit = {
  val (producerThread, consumerThread) = startProduceConsume(0)

  verifyThreads("kafka-log-cleaner-thread-", countPerBroker = 1)

  val props = new Properties
  props.put(KafkaConfig.LogCleanerThreadsProp, "2")
  props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, "20000000")
  props.put(KafkaConfig.LogCleanerDedupeBufferLoadFactorProp, "0.8")
  props.put(KafkaConfig.LogCleanerIoBufferSizeProp, "300000")
  props.put(KafkaConfig.MessageMaxBytesProp, "40000")
  props.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, "50000000")
  props.put(KafkaConfig.LogCleanerBackoffMsProp, "6000")
  reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.LogCleanerThreadsProp, "2"))

  // Verify cleaner config was updated
  val newCleanerConfig = servers.head.logManager.cleaner.currentConfig
  assertEquals(2, newCleanerConfig.numThreads)
  assertEquals(20000000, newCleanerConfig.dedupeBufferSize)
  assertEquals(0.8, newCleanerConfig.dedupeBufferLoadFactor, 0.001)
  assertEquals(300000, newCleanerConfig.ioBufferSize)
  assertEquals(40000, newCleanerConfig.maxMessageSize)
  // Fixed: the original delta of 50000000 equalled the expected value, making this assertion vacuous
  assertEquals(50000000, newCleanerConfig.maxIoBytesPerSecond, 0.001)
  assertEquals(6000, newCleanerConfig.backOffMs)

  // Verify thread count
  verifyThreads("kafka-log-cleaner-thread-", countPerBroker = 2)

  // Stop a couple of threads and verify they are recreated if any config is updated
  def cleanerThreads = Thread.getAllStackTraces.keySet.asScala.filter(_.getName.startsWith("kafka-log-cleaner-thread-"))
  cleanerThreads.take(2).foreach(_.interrupt())
  TestUtils.waitUntilTrue(() => cleanerThreads.size == (2 * numServers) - 2, "Threads did not exit")

  props.put(KafkaConfig.LogCleanerBackoffMsProp, "8000")
  reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.LogCleanerBackoffMsProp, "8000"))
  verifyThreads("kafka-log-cleaner-thread-", countPerBroker = 2)

  stopAndVerifyProduceConsume(producerThread, consumerThread, mayFailRequests = false)
}
private def createProducer(trustStore: File, retries: Int,
clientId: String = "test-producer"): KafkaProducer[String, String] = {
val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName(SecureExternal))
@ -411,6 +451,19 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
props
}
// Names of all live threads in this JVM (converted to a List before mapping,
// so threads sharing a name are all retained)
private def currentThreads: List[String] = {
  val liveThreads = Thread.getAllStackTraces.keySet.asScala.toList
  liveThreads.map(_.getName)
}
/**
 * Wait until exactly countPerBroker * number-of-servers threads whose names start
 * with threadPrefix are alive, failing with a descriptive message otherwise.
 */
private def verifyThreads(threadPrefix: String, countPerBroker: Int): Unit = {
  val expectedCount = countPerBroker * servers.size
  def matchingThreads = currentThreads.filter(_.startsWith(threadPrefix))
  val (threads, resized) = TestUtils.computeUntilTrue(matchingThreads)(_.size == expectedCount)
  assertTrue(s"Invalid threads: expected $expectedCount, got ${threads.size}: $threads", resized)
}
private def startProduceConsume(retries: Int): (ProducerThread, ConsumerThread) = {
val producerThread = new ProducerThread(retries)
clientThreads += producerThread

Просмотреть файл

@ -79,6 +79,7 @@ abstract class AbstractLogCleanerIntegrationTest {
compactionLag: Long = defaultCompactionLag,
deleteDelay: Int = defaultDeleteDelay,
segmentSize: Int = defaultSegmentSize,
cleanerIoBufferSize: Option[Int] = None,
propertyOverrides: Properties = new Properties()): LogCleaner = {
val logMap = new Pool[TopicPartition, Log]()
@ -108,7 +109,7 @@ abstract class AbstractLogCleanerIntegrationTest {
val cleanerConfig = CleanerConfig(
numThreads = numThreads,
ioBufferSize = maxMessageSize / 2,
ioBufferSize = cleanerIoBufferSize.getOrElse(maxMessageSize / 2),
maxMessageSize = maxMessageSize,
backOffMs = backOffMs)
new LogCleaner(cleanerConfig,

Просмотреть файл

@ -18,10 +18,12 @@
package kafka.log
import java.io.File
import java.util
import java.util.Properties
import kafka.api.KAFKA_0_11_0_IV0
import kafka.api.{KAFKA_0_10_0_IV1, KAFKA_0_9_0}
import kafka.server.KafkaConfig
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.utils._
import org.apache.kafka.common.TopicPartition
@ -227,6 +229,56 @@ class LogCleanerIntegrationTest(compressionCodec: String) extends AbstractLogCle
checkLogAfterAppendingDups(log, startSize, appends)
}
/**
 * Verifies dynamic reconfiguration of the log cleaner: with an io buffer size of 1
 * no cleaning progress is possible; after reconfiguring with a larger buffer and two
 * threads, cleaning completes and the cleaner thread count matches the new config.
 */
@Test
def cleanerConfigUpdateTest() {
  val largeMessageKey = 20
  val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, RecordBatch.CURRENT_MAGIC_VALUE)
  val maxMessageSize = largeMessageSet.sizeInBytes
  // An io buffer of 1 byte prevents the cleaner from making any progress
  cleaner = makeCleaner(partitions = topicPartitions, backOffMs = 1, maxMessageSize = maxMessageSize,
    cleanerIoBufferSize = Some(1))
  val log = cleaner.logs.get(topicPartitions(0))

  val appends = writeDups(numKeys = 100, numDups = 3, log = log, codec = codec)
  val startSize = log.size
  cleaner.startup()
  assertEquals(1, cleaner.cleanerCount)

  // Verify no cleaning with LogCleanerIoBufferSizeProp=1
  val firstDirty = log.activeSegment.baseOffset
  val topicPartition = new TopicPartition("log", 0)
  cleaner.awaitCleaned(topicPartition, firstDirty, maxWaitMs = 10)
  assertTrue("Should not have cleaned", cleaner.cleanerManager.allCleanerCheckpoints.isEmpty)

  // Builds a KafkaConfig whose cleaner-related properties mirror the given CleanerConfig
  def kafkaConfigWithCleanerConfig(cleanerConfig: CleanerConfig): KafkaConfig = {
    val props = TestUtils.createBrokerConfig(0, "localhost:2181")
    props.put(KafkaConfig.LogCleanerThreadsProp, cleanerConfig.numThreads.toString)
    props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, cleanerConfig.dedupeBufferSize.toString)
    props.put(KafkaConfig.LogCleanerDedupeBufferLoadFactorProp, cleanerConfig.dedupeBufferLoadFactor.toString)
    props.put(KafkaConfig.LogCleanerIoBufferSizeProp, cleanerConfig.ioBufferSize.toString)
    props.put(KafkaConfig.MessageMaxBytesProp, cleanerConfig.maxMessageSize.toString)
    props.put(KafkaConfig.LogCleanerBackoffMsProp, cleanerConfig.backOffMs.toString)
    props.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, cleanerConfig.maxIoBytesPerSecond.toString)
    KafkaConfig.fromProps(props)
  }

  // Verify cleaning done with larger LogCleanerIoBufferSizeProp
  val oldConfig = kafkaConfigWithCleanerConfig(cleaner.currentConfig)
  val newConfig = kafkaConfigWithCleanerConfig(CleanerConfig(numThreads = 2,
    dedupeBufferSize = cleaner.currentConfig.dedupeBufferSize,
    dedupeBufferLoadFactor = cleaner.currentConfig.dedupeBufferLoadFactor,
    ioBufferSize = 100000,
    maxMessageSize = cleaner.currentConfig.maxMessageSize,
    maxIoBytesPerSecond = cleaner.currentConfig.maxIoBytesPerSecond,
    backOffMs = cleaner.currentConfig.backOffMs))
  cleaner.reconfigure(oldConfig, newConfig)

  assertEquals(2, cleaner.cleanerCount)
  checkLastCleaned("log", 0, firstDirty)
  val compactedSize = log.logSegments.map(_.size).sum
  assertTrue(s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize", startSize > compactedSize)
}
private def checkLastCleaned(topic: String, partitionId: Int, firstDirty: Long) {
// wait until cleaning up to base_offset, note that cleaning happens only when "log dirty ratio" is higher than
// LogConfig.MinCleanableDirtyRatioProp

Просмотреть файл

@ -104,23 +104,24 @@ class DynamicBrokerConfigTest {
verifyConfigUpdateWithInvalidConfig(validProps, securityPropsWithoutListenerPrefix)
val nonDynamicProps = Map(KafkaConfig.ZkConnectProp -> "somehost:2181")
verifyConfigUpdateWithInvalidConfig(validProps, nonDynamicProps)
val invalidProps = Map(KafkaConfig.LogCleanerThreadsProp -> "invalid")
verifyConfigUpdateWithInvalidConfig(validProps, invalidProps)
}
/**
 * Verifies which security config updates are accepted: listener-prefixed configs may be
 * updated per-broker, while unprefixed security configs and cluster-wide listener updates
 * are rejected.
 *
 * Note: the diff residue of the pre-change version (the `invalidValue` parameter and the
 * old call sites) has been removed; only the new implementation is kept.
 */
@Test
def testSecurityConfigs(): Unit = {
  def verifyUpdate(name: String, value: Object): Unit = {
    verifyConfigUpdate(name, value, perBrokerConfig = true, expectFailure = true)
    verifyConfigUpdate(s"listener.name.external.$name", value, perBrokerConfig = true, expectFailure = false)
    verifyConfigUpdate(name, value, perBrokerConfig = false, expectFailure = true)
    verifyConfigUpdate(s"listener.name.external.$name", value, perBrokerConfig = false, expectFailure = true)
  }
  verifyUpdate(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "ks.jks")
  verifyUpdate(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS")
  verifyUpdate(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "password")
  verifyUpdate(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "password")
}
private def verifyConfigUpdate(name: String, value: Object, perBrokerConfig: Boolean, expectFailure: Boolean) {