зеркало из https://github.com/microsoft/kafka.git
Options in SyncProducerConfig and AsyncProducerConfig can leak, KAFKA-83
git-svn-id: https://svn.apache.org/repos/asf/incubator/kafka/trunk@1154804 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Родитель
96b8e03dd1
Коммит
f7046c247d
|
@ -69,24 +69,15 @@ class ProducerPool[V](private val config: ProducerConfig,
|
|||
* @param port the port of the broker
|
||||
*/
|
||||
def addProducer(broker: Broker) {
|
||||
val props = new Properties()
|
||||
props.put("host", broker.host)
|
||||
props.put("port", broker.port.toString)
|
||||
props.putAll(config.props)
|
||||
if(sync) {
|
||||
val props = new Properties()
|
||||
props.put("host", broker.host)
|
||||
props.put("port", broker.port.toString)
|
||||
props.put("buffer.size", config.bufferSize.toString)
|
||||
props.put("connect.timeout.ms", config.connectTimeoutMs.toString)
|
||||
props.put("reconnect.interval", config.reconnectInterval.toString)
|
||||
val producer = new SyncProducer(new SyncProducerConfig(props))
|
||||
logger.info("Creating sync producer for broker id = " + broker.id + " at " + broker.host + ":" + broker.port)
|
||||
syncProducers.put(broker.id, producer)
|
||||
} else {
|
||||
val props = new Properties()
|
||||
props.put("host", broker.host)
|
||||
props.put("port", broker.port.toString)
|
||||
props.put("queue.time", config.queueTime.toString)
|
||||
props.put("queue.size", config.queueSize.toString)
|
||||
props.put("batch.size", config.batchSize.toString)
|
||||
props.put("serializer.class", config.serializerClass)
|
||||
val producer = new AsyncProducer[V](new AsyncProducerConfig(props),
|
||||
new SyncProducer(new SyncProducerConfig(props)),
|
||||
serializer,
|
||||
|
|
Загрузка…
Ссылка в новой задаче