KAFKA-1888: rolling upgrade test

ewencp gwenshap
This needs some refactoring to avoid the duplicated code between replication test and upgrade test, but in shape for initial feedback.

I'm interested in feedback on the added `KafkaConfig` class and `kafka_props` file. This addition makes it:
- easier to attach different configs to different nodes (e.g. during broker upgrade process)
- easier to reason about the configuration of a particular node

Notes:
- in the default values in the KafkaConfig class, I removed many properties which were in kafka.properties before. This is because most of those properties were set to what is already the default value.
- when running non-trunk VerifiableProducer, I append the trunk tools jar to the classpath, and run it with the non-trunk kafka-run-class.sh script

Author: Geoff Anderson <geoff@confluent.io>

Reviewers: Dong Lin, Ewen Cheslack-Postava

Closes #229 from granders/KAFKA-1888-upgrade-test
This commit is contained in:
Geoff Anderson 2015-10-27 15:23:47 -07:00 коммит произвёл Guozhang Wang
Родитель af42c37899
Коммит e6b343302f
31 изменённых файлов: 1238 добавлений и 331 удалений

Просмотреть файл

@ -14,3 +14,13 @@
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
# This determines the version of kafkatest that can be published to PyPi and installed with pip
#
# Note that in development, this version name can't follow Kafka's convention of having a trailing "-SNAPSHOT"
# due to python version naming restrictions, which are enforced by python packaging tools
# (see https://www.python.org/dev/peps/pep-0440/)
#
# Instead, in trunk, the version should have a suffix of the form ".devN"
#
# For example, when Kafka is at version 0.9.0.0-SNAPSHOT, this should be something like "0.9.0.0.dev0"
__version__ = '0.9.0.0.dev0'

Просмотреть файл

@ -20,11 +20,16 @@ from ducktape.mark import matrix
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.kafka.version import LATEST_0_8_2
from kafkatest.services.console_consumer import ConsoleConsumer
from kafkatest.utils.remote_account import line_count, file_exists
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.utils.security_config import SecurityConfig
import time
class ConsoleConsumerTest(Test):
"""Sanity checks on console consumer service class."""
def __init__(self, test_context):
@ -32,24 +37,29 @@ class ConsoleConsumerTest(Test):
self.topic = "topic"
self.zk = ZookeeperService(test_context, num_nodes=1)
self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk,
topics={self.topic: {"partitions": 1, "replication-factor": 1}})
self.consumer = ConsoleConsumer(self.test_context, num_nodes=1, kafka=self.kafka, topic=self.topic)
def setUp(self):
self.zk.start()
@parametrize(security_protocol='SSL', new_consumer=True)
@matrix(security_protocol=['PLAINTEXT'], new_consumer=[False, True])
@parametrize(security_protocol=SecurityConfig.SSL, new_consumer=True)
@matrix(security_protocol=[SecurityConfig.PLAINTEXT], new_consumer=[False, True])
def test_lifecycle(self, security_protocol, new_consumer):
self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk,
security_protocol=security_protocol,
topics={self.topic: {"partitions": 1, "replication-factor": 1}})
"""Check that console consumer starts/stops properly, and that we are capturing log output."""
self.kafka.security_protocol = security_protocol
self.kafka.start()
self.consumer.security_protocol = security_protocol
self.consumer.new_consumer = new_consumer
t0 = time.time()
self.consumer = ConsoleConsumer(self.test_context, num_nodes=1, kafka=self.kafka, topic=self.topic, security_protocol=security_protocol, new_consumer=new_consumer)
self.consumer.start()
node = self.consumer.nodes[0]
wait_until(lambda: self.consumer.alive(node),
wait_until(lambda: self.consumer.alive(node),
timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start")
self.logger.info("consumer started in %s seconds " % str(time.time() - t0))
@ -62,3 +72,22 @@ class ConsoleConsumerTest(Test):
assert line_count(node, ConsoleConsumer.STDOUT_CAPTURE) == 0
self.consumer.stop_node(node)
def test_version(self):
"""Check that console consumer v0.8.2.X successfully starts and consumes messages."""
self.kafka.start()
num_messages = 1000
self.producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka, topic=self.topic,
max_messages=num_messages, throughput=1000)
self.producer.start()
self.producer.wait()
self.consumer.nodes[0].version = LATEST_0_8_2
self.consumer.consumer_timeout_ms = 1000
self.consumer.start()
self.consumer.wait()
num_consumed = len(self.consumer.messages_consumed[1])
num_produced = self.producer.num_acked
assert num_produced == num_consumed, "num_produced: %d, num_consumed: %d" % (num_produced, num_consumed)

Просмотреть файл

@ -0,0 +1,55 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.tests.test import Test
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService, config_property
from kafkatest.services.kafka.version import LATEST_0_8_2, TRUNK
from kafkatest.utils import is_version
class KafkaVersionTest(Test):
"""Sanity checks on kafka versioning."""
def __init__(self, test_context):
super(KafkaVersionTest, self).__init__(test_context)
self.topic = "topic"
self.zk = ZookeeperService(test_context, num_nodes=1)
def setUp(self):
self.zk.start()
def test_0_8_2(self):
"""Test kafka service node-versioning api - verify that we can bring up a single-node 0.8.2.X cluster."""
self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk,
topics={self.topic: {"partitions": 1, "replication-factor": 1}})
node = self.kafka.nodes[0]
node.version = LATEST_0_8_2
self.kafka.start()
assert is_version(node, [LATEST_0_8_2])
def test_multi_version(self):
"""Test kafka service node-versioning api - ensure we can bring up a 2-node cluster, one on version 0.8.2.X,
the other on trunk."""
self.kafka = KafkaService(self.test_context, num_nodes=2, zk=self.zk,
topics={self.topic: {"partitions": 1, "replication-factor": 2}})
self.kafka.nodes[1].version = LATEST_0_8_2
self.kafka.nodes[1].config[config_property.INTER_BROKER_PROTOCOL_VERSION] = "0.8.2.X"
self.kafka.start()
assert is_version(self.kafka.nodes[0], [TRUNK.vstring])
assert is_version(self.kafka.nodes[1], [LATEST_0_8_2])

Просмотреть файл

@ -0,0 +1,70 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until
from ducktape.mark import parametrize
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.kafka.version import LATEST_0_8_2, TRUNK, KafkaVersion
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.utils import is_version
class TestVerifiableProducer(Test):
"""Sanity checks on verifiable producer service class."""
def __init__(self, test_context):
super(TestVerifiableProducer, self).__init__(test_context)
self.topic = "topic"
self.zk = ZookeeperService(test_context, num_nodes=1)
self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk,
topics={self.topic: {"partitions": 1, "replication-factor": 1}})
self.num_messages = 100
# This will produce to source kafka cluster
self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.kafka, topic=self.topic,
max_messages=self.num_messages, throughput=1000)
def setUp(self):
self.zk.start()
self.kafka.start()
@parametrize(producer_version=str(LATEST_0_8_2))
@parametrize(producer_version=str(TRUNK))
def test_simple_run(self, producer_version=TRUNK):
"""
Test that we can start VerifiableProducer on trunk or against the 0.8.2 jar, and
verify that we can produce a small number of messages.
"""
node = self.producer.nodes[0]
node.version = KafkaVersion(producer_version)
self.producer.start()
wait_until(lambda: self.producer.num_acked > 5, timeout_sec=5,
err_msg="Producer failed to start in a reasonable amount of time.")
# using version.vstring (distutils.version.LooseVersion) is a tricky way of ensuring
# that this check works with TRUNK
# When running VerifiableProducer 0.8.X, both trunk version and 0.8.X should show up because of the way
# verifiable producer pulls in some trunk directories into its classpath
assert is_version(node, [node.version.vstring, TRUNK.vstring])
self.producer.wait()
num_produced = self.producer.num_acked
assert num_produced == self.num_messages, "num_produced: %d, num_messages: %d" % (num_produced, self.num_messages)

Просмотреть файл

@ -13,15 +13,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.services.background_thread import BackgroundThreadService
from ducktape.utils.util import wait_until
from kafkatest.services.performance.jmx_mixin import JmxMixin
from kafkatest.services.performance import PerformanceService
from ducktape.services.background_thread import BackgroundThreadService
from kafkatest.services.kafka.directory import kafka_dir
from kafkatest.services.kafka.version import TRUNK, LATEST_0_8_2
from kafkatest.services.monitor.jmx import JmxMixin
from kafkatest.utils.security_config import SecurityConfig
import itertools
import os
import subprocess
import itertools
def is_int(msg):
"""Default method used to check whether text pulled from console consumer is a message.
@ -30,7 +33,7 @@ def is_int(msg):
"""
try:
return int(msg)
except:
except ValueError:
return None
"""
@ -74,7 +77,7 @@ Option Description
"""
class ConsoleConsumer(JmxMixin, PerformanceService):
class ConsoleConsumer(JmxMixin, BackgroundThreadService):
# Root directory for persistent output
PERSISTENT_ROOT = "/mnt/console_consumer"
STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "console_consumer.stdout")
@ -94,10 +97,10 @@ class ConsoleConsumer(JmxMixin, PerformanceService):
"consumer_log": {
"path": LOG_FILE,
"collect_default": True}
}
}
def __init__(self, context, num_nodes, kafka, topic, security_protocol=None, new_consumer=None, message_validator=None,
from_beginning=True, consumer_timeout_ms=None, client_id="console-consumer", jmx_object_names=None, jmx_attributes=[]):
def __init__(self, context, num_nodes, kafka, topic, security_protocol=SecurityConfig.PLAINTEXT, new_consumer=False, message_validator=None,
from_beginning=True, consumer_timeout_ms=None, version=TRUNK, client_id="console-consumer", jmx_object_names=None, jmx_attributes=[]):
"""
Args:
context: standard context
@ -114,7 +117,7 @@ class ConsoleConsumer(JmxMixin, PerformanceService):
in a topic.
"""
JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes)
PerformanceService.__init__(self, context, num_nodes)
BackgroundThreadService.__init__(self, context, num_nodes)
self.kafka = kafka
self.new_consumer = new_consumer
self.args = {
@ -122,47 +125,70 @@ class ConsoleConsumer(JmxMixin, PerformanceService):
}
self.consumer_timeout_ms = consumer_timeout_ms
for node in self.nodes:
node.version = version
self.from_beginning = from_beginning
self.message_validator = message_validator
self.messages_consumed = {idx: [] for idx in range(1, num_nodes + 1)}
self.client_id = client_id
self.security_protocol = security_protocol
# Process client configuration
self.prop_file = self.render('console_consumer.properties', consumer_timeout_ms=self.consumer_timeout_ms, client_id=self.client_id)
# Add security properties to the config. If security protocol is not specified,
# use the default in the template properties.
self.security_config = SecurityConfig(security_protocol, self.prop_file)
self.security_protocol = self.security_config.security_protocol
# Validate a few configs
if self.new_consumer is None:
self.new_consumer = self.security_protocol == SecurityConfig.SSL
if self.security_protocol == SecurityConfig.SSL and not self.new_consumer:
raise Exception("SSL protocol is supported only with the new consumer")
self.prop_file += str(self.security_config)
@property
def start_cmd(self):
def prop_file(self, node):
"""Return a string which can be used to create a configuration file appropriate for the given node."""
# Process client configuration
prop_file = self.render('console_consumer.properties')
if hasattr(node, "version") and node.version <= LATEST_0_8_2:
# in 0.8.2.X and earlier, console consumer does not have --timeout-ms option
# instead, we have to pass it through the config file
prop_file += "\nconsumer.timeout.ms=%s\n" % str(self.consumer_timeout_ms)
# Add security properties to the config. If security protocol is not specified,
# use the default in the template properties.
self.security_config = SecurityConfig(self.security_protocol, prop_file)
self.security_protocol = self.security_config.security_protocol
prop_file += str(self.security_config)
return prop_file
def start_cmd(self, node):
"""Return the start command appropriate for the given node."""
args = self.args.copy()
args['zk_connect'] = self.kafka.zk.connect_setting()
args['stdout'] = ConsoleConsumer.STDOUT_CAPTURE
args['stderr'] = ConsoleConsumer.STDERR_CAPTURE
args['log_dir'] = ConsoleConsumer.LOG_DIR
args['log4j_config'] = ConsoleConsumer.LOG4J_CONFIG
args['config_file'] = ConsoleConsumer.CONFIG_FILE
args['stdout'] = ConsoleConsumer.STDOUT_CAPTURE
args['jmx_port'] = self.jmx_port
args['kafka_dir'] = kafka_dir(node)
args['broker_list'] = self.kafka.bootstrap_servers()
cmd = "export LOG_DIR=%s;" % ConsoleConsumer.LOG_DIR
cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % ConsoleConsumer.LOG4J_CONFIG
cmd += " JMX_PORT=%(jmx_port)d /opt/kafka/bin/kafka-console-consumer.sh --topic %(topic)s" \
" --consumer.config %(config_file)s" % args
cmd = "export JMX_PORT=%(jmx_port)s; " \
"export LOG_DIR=%(log_dir)s; " \
"export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j_config)s\"; " \
"/opt/%(kafka_dir)s/bin/kafka-console-consumer.sh " \
"--topic %(topic)s --consumer.config %(config_file)s" % args
if self.new_consumer:
cmd += " --new-consumer --bootstrap-server %s" % self.kafka.bootstrap_servers()
cmd += " --new-consumer --bootstrap-server %(broker_list)s" % args
else:
cmd += " --zookeeper %(zk_connect)s" % args
if self.from_beginning:
cmd += " --from-beginning"
if self.consumer_timeout_ms is not None:
cmd += " --timeout-ms %s" % self.consumer_timeout_ms
# version 0.8.X and below do not support --timeout-ms option
# This will be added in the properties file instead
if node.version > LATEST_0_8_2:
cmd += " --timeout-ms %s" % self.consumer_timeout_ms
cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args
return cmd
@ -183,8 +209,10 @@ class ConsoleConsumer(JmxMixin, PerformanceService):
# Create and upload config file
self.logger.info("console_consumer.properties:")
self.logger.info(self.prop_file)
node.account.create_file(ConsoleConsumer.CONFIG_FILE, self.prop_file)
prop_file = self.prop_file(node)
self.logger.info(prop_file)
node.account.create_file(ConsoleConsumer.CONFIG_FILE, prop_file)
self.security_config.setup_node(node)
# Create and upload log properties
@ -192,23 +220,26 @@ class ConsoleConsumer(JmxMixin, PerformanceService):
node.account.create_file(ConsoleConsumer.LOG4J_CONFIG, log_config)
# Run and capture output
cmd = self.start_cmd
cmd = self.start_cmd(node)
self.logger.debug("Console consumer %d command: %s", idx, cmd)
consumer_output = node.account.ssh_capture(cmd, allow_fail=False)
first_line = consumer_output.next()
self.start_jmx_tool(idx, node)
for line in itertools.chain([first_line], consumer_output):
msg = line.strip()
if self.message_validator is not None:
msg = self.message_validator(msg)
if msg is not None:
self.messages_consumed[idx].append(msg)
first_line = next(consumer_output, None)
self.read_jmx_output(idx, node)
if first_line is not None:
self.start_jmx_tool(idx, node)
for line in itertools.chain([first_line], consumer_output):
msg = line.strip()
if self.message_validator is not None:
msg = self.message_validator(msg)
if msg is not None:
self.messages_consumed[idx].append(msg)
self.read_jmx_output(idx, node)
def start_node(self, node):
PerformanceService.start_node(self, node)
BackgroundThreadService.start_node(self, node)
def stop_node(self, node):
node.account.kill_process("console_consumer", allow_fail=True)
@ -220,6 +251,6 @@ class ConsoleConsumer(JmxMixin, PerformanceService):
self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." %
(self.__class__.__name__, node.account))
JmxMixin.clean_node(self, node)
PerformanceService.clean_node(self, node)
node.account.kill_process("java", clean_shutdown=False, allow_fail=True)
node.account.ssh("rm -rf %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False)
self.security_config.clean_node(node)

Просмотреть файл

@ -15,8 +15,9 @@
from ducktape.services.service import Service
from ducktape.utils.util import wait_until
import subprocess, signal
from kafkatest.services.kafka.directory import kafka_dir
import signal
class CopycatServiceBase(Service):
"""Base class for Copycat services providing some common settings and functionality"""
@ -99,7 +100,7 @@ class CopycatStandaloneService(CopycatServiceBase):
self.logger.info("Starting Copycat standalone process")
with node.account.monitor_log("/mnt/copycat.log") as monitor:
node.account.ssh("/opt/kafka/bin/copycat-standalone.sh /mnt/copycat.properties " +
node.account.ssh("/opt/%s/bin/copycat-standalone.sh /mnt/copycat.properties " % kafka_dir(node) +
" ".join(remote_connector_configs) +
" 1>> /mnt/copycat.log 2>> /mnt/copycat.log & echo $! > /mnt/copycat.pid")
monitor.wait_until('Copycat started', timeout_sec=10, err_msg="Never saw message indicating Copycat finished startup")
@ -108,7 +109,6 @@ class CopycatStandaloneService(CopycatServiceBase):
raise RuntimeError("No process ids recorded")
class CopycatDistributedService(CopycatServiceBase):
"""Runs Copycat in distributed mode."""
@ -128,7 +128,7 @@ class CopycatDistributedService(CopycatServiceBase):
self.logger.info("Starting Copycat distributed process")
with node.account.monitor_log("/mnt/copycat.log") as monitor:
cmd = "/opt/kafka/bin/copycat-distributed.sh /mnt/copycat.properties "
cmd = "/opt/%s/bin/copycat-distributed.sh /mnt/copycat.properties " % kafka_dir(node)
# Only submit connectors on the first node so they don't get submitted multiple times. Also only submit them
# the first time the node is started so
if self.first_start and node == self.nodes[0]:
@ -140,4 +140,3 @@ class CopycatDistributedService(CopycatServiceBase):
if len(self.pids(node)) == 0:
raise RuntimeError("No process ids recorded")
self.first_start = False

Просмотреть файл

@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from kafka import KafkaService

Просмотреть файл

@ -0,0 +1,53 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import config_property
class KafkaConfig(dict):
"""A dictionary-like container class which allows for definition of overridable default values,
which is also capable of "rendering" itself as a useable server.properties file.
"""
DEFAULTS = {
config_property.PORT: 9092,
config_property.SOCKET_RECEIVE_BUFFER_BYTES: 65536,
config_property.LOG_DIRS: "/mnt/kafka-logs",
config_property.ZOOKEEPER_CONNECTION_TIMEOUT_MS: 2000
}
def __init__(self, **kwargs):
super(KafkaConfig, self).__init__(**kwargs)
# Set defaults
for key, val in self.DEFAULTS.items():
if not self.has_key(key):
self[key] = val
def render(self):
"""Render self as a series of lines key=val\n, and do so in a consistent order. """
keys = [k for k in self.keys()]
keys.sort()
s = ""
for k in keys:
s += "%s=%s\n" % (k, str(self[k]))
return s

Просмотреть файл

@ -0,0 +1,177 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Define Kafka configuration property names here.
"""
BROKER_ID = "broker.id"
PORT = "port"
ADVERTISED_HOSTNAME = "advertised.host.name"
NUM_NETWORK_THREADS = "num.network.threads"
NUM_IO_THREADS = "num.io.threads"
SOCKET_SEND_BUFFER_BYTES = "socket.send.buffer.bytes"
SOCKET_RECEIVE_BUFFER_BYTES = "socket.receive.buffer.bytes"
SOCKET_REQUEST_MAX_BYTES = "socket.request.max.bytes"
LOG_DIRS = "log.dirs"
NUM_PARTITIONS = "num.partitions"
NUM_RECOVERY_THREADS_PER_DATA_DIR = "num.recovery.threads.per.data.dir"
LOG_RETENTION_HOURS = "log.retention.hours"
LOG_SEGMENT_BYTES = "log.segment.bytes"
LOG_RETENTION_CHECK_INTERVAL_MS = "log.retention.check.interval.ms"
LOG_CLEANER_ENABLE = "log.cleaner.enable"
AUTO_CREATE_TOPICS_ENABLE = "auto.create.topics.enable"
ZOOKEEPER_CONNECT = "zookeeper.connect"
ZOOKEEPER_CONNECTION_TIMEOUT_MS = "zookeeper.connection.timeout.ms"
INTER_BROKER_PROTOCOL_VERSION = "inter.broker.protocol.version"
"""
From KafkaConfig.scala
/** ********* General Configuration ***********/
val MaxReservedBrokerIdProp = "reserved.broker.max.id"
val MessageMaxBytesProp = "message.max.bytes"
val NumIoThreadsProp = "num.io.threads"
val BackgroundThreadsProp = "background.threads"
val QueuedMaxRequestsProp = "queued.max.requests"
/** ********* Socket Server Configuration ***********/
val PortProp = "port"
val HostNameProp = "host.name"
val ListenersProp = "listeners"
val AdvertisedPortProp = "advertised.port"
val AdvertisedListenersProp = "advertised.listeners"
val SocketSendBufferBytesProp = "socket.send.buffer.bytes"
val SocketReceiveBufferBytesProp = "socket.receive.buffer.bytes"
val SocketRequestMaxBytesProp = "socket.request.max.bytes"
val MaxConnectionsPerIpProp = "max.connections.per.ip"
val MaxConnectionsPerIpOverridesProp = "max.connections.per.ip.overrides"
val ConnectionsMaxIdleMsProp = "connections.max.idle.ms"
/** ********* Log Configuration ***********/
val NumPartitionsProp = "num.partitions"
val LogDirsProp = "log.dirs"
val LogDirProp = "log.dir"
val LogSegmentBytesProp = "log.segment.bytes"
val LogRollTimeMillisProp = "log.roll.ms"
val LogRollTimeHoursProp = "log.roll.hours"
val LogRollTimeJitterMillisProp = "log.roll.jitter.ms"
val LogRollTimeJitterHoursProp = "log.roll.jitter.hours"
val LogRetentionTimeMillisProp = "log.retention.ms"
val LogRetentionTimeMinutesProp = "log.retention.minutes"
val LogRetentionTimeHoursProp = "log.retention.hours"
val LogRetentionBytesProp = "log.retention.bytes"
val LogCleanupIntervalMsProp = "log.retention.check.interval.ms"
val LogCleanupPolicyProp = "log.cleanup.policy"
val LogCleanerThreadsProp = "log.cleaner.threads"
val LogCleanerIoMaxBytesPerSecondProp = "log.cleaner.io.max.bytes.per.second"
val LogCleanerDedupeBufferSizeProp = "log.cleaner.dedupe.buffer.size"
val LogCleanerIoBufferSizeProp = "log.cleaner.io.buffer.size"
val LogCleanerDedupeBufferLoadFactorProp = "log.cleaner.io.buffer.load.factor"
val LogCleanerBackoffMsProp = "log.cleaner.backoff.ms"
val LogCleanerMinCleanRatioProp = "log.cleaner.min.cleanable.ratio"
val LogCleanerEnableProp = "log.cleaner.enable"
val LogCleanerDeleteRetentionMsProp = "log.cleaner.delete.retention.ms"
val LogIndexSizeMaxBytesProp = "log.index.size.max.bytes"
val LogIndexIntervalBytesProp = "log.index.interval.bytes"
val LogFlushIntervalMessagesProp = "log.flush.interval.messages"
val LogDeleteDelayMsProp = "log.segment.delete.delay.ms"
val LogFlushSchedulerIntervalMsProp = "log.flush.scheduler.interval.ms"
val LogFlushIntervalMsProp = "log.flush.interval.ms"
val LogFlushOffsetCheckpointIntervalMsProp = "log.flush.offset.checkpoint.interval.ms"
val LogPreAllocateProp = "log.preallocate"
val NumRecoveryThreadsPerDataDirProp = "num.recovery.threads.per.data.dir"
val MinInSyncReplicasProp = "min.insync.replicas"
/** ********* Replication configuration ***********/
val ControllerSocketTimeoutMsProp = "controller.socket.timeout.ms"
val DefaultReplicationFactorProp = "default.replication.factor"
val ReplicaLagTimeMaxMsProp = "replica.lag.time.max.ms"
val ReplicaSocketTimeoutMsProp = "replica.socket.timeout.ms"
val ReplicaSocketReceiveBufferBytesProp = "replica.socket.receive.buffer.bytes"
val ReplicaFetchMaxBytesProp = "replica.fetch.max.bytes"
val ReplicaFetchWaitMaxMsProp = "replica.fetch.wait.max.ms"
val ReplicaFetchMinBytesProp = "replica.fetch.min.bytes"
val ReplicaFetchBackoffMsProp = "replica.fetch.backoff.ms"
val NumReplicaFetchersProp = "num.replica.fetchers"
val ReplicaHighWatermarkCheckpointIntervalMsProp = "replica.high.watermark.checkpoint.interval.ms"
val FetchPurgatoryPurgeIntervalRequestsProp = "fetch.purgatory.purge.interval.requests"
val ProducerPurgatoryPurgeIntervalRequestsProp = "producer.purgatory.purge.interval.requests"
val AutoLeaderRebalanceEnableProp = "auto.leader.rebalance.enable"
val LeaderImbalancePerBrokerPercentageProp = "leader.imbalance.per.broker.percentage"
val LeaderImbalanceCheckIntervalSecondsProp = "leader.imbalance.check.interval.seconds"
val UncleanLeaderElectionEnableProp = "unclean.leader.election.enable"
val InterBrokerSecurityProtocolProp = "security.inter.broker.protocol"
val InterBrokerProtocolVersionProp = "inter.broker.protocol.version"
/** ********* Controlled shutdown configuration ***********/
val ControlledShutdownMaxRetriesProp = "controlled.shutdown.max.retries"
val ControlledShutdownRetryBackoffMsProp = "controlled.shutdown.retry.backoff.ms"
val ControlledShutdownEnableProp = "controlled.shutdown.enable"
/** ********* Consumer coordinator configuration ***********/
val ConsumerMinSessionTimeoutMsProp = "consumer.min.session.timeout.ms"
val ConsumerMaxSessionTimeoutMsProp = "consumer.max.session.timeout.ms"
/** ********* Offset management configuration ***********/
val OffsetMetadataMaxSizeProp = "offset.metadata.max.bytes"
val OffsetsLoadBufferSizeProp = "offsets.load.buffer.size"
val OffsetsTopicReplicationFactorProp = "offsets.topic.replication.factor"
val OffsetsTopicPartitionsProp = "offsets.topic.num.partitions"
val OffsetsTopicSegmentBytesProp = "offsets.topic.segment.bytes"
val OffsetsTopicCompressionCodecProp = "offsets.topic.compression.codec"
val OffsetsRetentionMinutesProp = "offsets.retention.minutes"
val OffsetsRetentionCheckIntervalMsProp = "offsets.retention.check.interval.ms"
val OffsetCommitTimeoutMsProp = "offsets.commit.timeout.ms"
val OffsetCommitRequiredAcksProp = "offsets.commit.required.acks"
/** ********* Quota Configuration ***********/
val ProducerQuotaBytesPerSecondDefaultProp = "quota.producer.default"
val ConsumerQuotaBytesPerSecondDefaultProp = "quota.consumer.default"
val ProducerQuotaBytesPerSecondOverridesProp = "quota.producer.bytes.per.second.overrides"
val ConsumerQuotaBytesPerSecondOverridesProp = "quota.consumer.bytes.per.second.overrides"
val NumQuotaSamplesProp = "quota.window.num"
val QuotaWindowSizeSecondsProp = "quota.window.size.seconds"
val DeleteTopicEnableProp = "delete.topic.enable"
val CompressionTypeProp = "compression.type"
/** ********* Kafka Metrics Configuration ***********/
val MetricSampleWindowMsProp = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG
val MetricNumSamplesProp: String = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG
val MetricReporterClassesProp: String = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG
/** ********* SSL Configuration ****************/
val PrincipalBuilderClassProp = SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG
val SSLProtocolProp = SSLConfigs.SSL_PROTOCOL_CONFIG
val SSLProviderProp = SSLConfigs.SSL_PROVIDER_CONFIG
val SSLCipherSuitesProp = SSLConfigs.SSL_CIPHER_SUITES_CONFIG
val SSLEnabledProtocolsProp = SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG
val SSLKeystoreTypeProp = SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG
val SSLKeystoreLocationProp = SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG
val SSLKeystorePasswordProp = SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG
val SSLKeyPasswordProp = SSLConfigs.SSL_KEY_PASSWORD_CONFIG
val SSLTruststoreTypeProp = SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG
val SSLTruststoreLocationProp = SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG
val SSLTruststorePasswordProp = SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG
val SSLKeyManagerAlgorithmProp = SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG
val SSLTrustManagerAlgorithmProp = SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG
val SSLEndpointIdentificationAlgorithmProp = SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG
val SSLClientAuthProp = SSLConfigs.SSL_CLIENT_AUTH_CONFIG
"""

Просмотреть файл

@ -0,0 +1,32 @@
# Copyright 2015 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# "trunk" installation of kafka
KAFKA_TRUNK = "kafka-trunk"
def kafka_dir(node=None):
"""Return name of kafka directory for the given node.
This provides a convenient way to support different versions of kafka or kafka tools running
on different nodes.
"""
if node is None:
return KAFKA_TRUNK
if not hasattr(node, "version"):
return KAFKA_TRUNK
return "kafka-" + str(node.version)

Просмотреть файл

@ -15,11 +15,18 @@
from ducktape.services.service import Service
from ducktape.utils.util import wait_until
from kafkatest.services.performance.jmx_mixin import JmxMixin
from config import KafkaConfig
from kafkatest.services.kafka import config_property
from kafkatest.services.kafka.version import TRUNK
from kafkatest.services.kafka.directory import kafka_dir, KAFKA_TRUNK
from kafkatest.services.monitor.jmx import JmxMixin
from kafkatest.utils.security_config import SecurityConfig
import json
import re
import signal
import subprocess
import time
@ -29,13 +36,16 @@ class KafkaService(JmxMixin, Service):
"kafka_log": {
"path": "/mnt/kafka.log",
"collect_default": True},
"kafka_operational_logs": {
"path": "/mnt/kafka-operational-logs",
"collect_default": True},
"kafka_data": {
"path": "/mnt/kafka-logs",
"path": "/mnt/kafka-data-logs",
"collect_default": False}
}
def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT, interbroker_security_protocol=SecurityConfig.PLAINTEXT,
topics=None, quota_config=None, jmx_object_names=None, jmx_attributes=[]):
topics=None, version=TRUNK, quota_config=None, jmx_object_names=None, jmx_attributes=[]):
"""
:type context
:type zk: ZookeeperService
@ -43,16 +53,28 @@ class KafkaService(JmxMixin, Service):
"""
Service.__init__(self, context, num_nodes)
JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes)
self.zk = zk
if security_protocol == SecurityConfig.SSL or interbroker_security_protocol == SecurityConfig.SSL:
self.security_config = SecurityConfig(SecurityConfig.SSL)
else:
self.security_config = SecurityConfig(SecurityConfig.PLAINTEXT)
self.quota_config = quota_config
self.security_protocol = security_protocol
self.interbroker_security_protocol = interbroker_security_protocol
self.port = 9092 if security_protocol == SecurityConfig.PLAINTEXT else 9093
self.topics = topics
self.quota_config = quota_config
for node in self.nodes:
node.version = version
node.config = KafkaConfig(**{config_property.BROKER_ID: self.idx(node)})
@property
def security_config(self):
if self.security_protocol == SecurityConfig.SSL or self.interbroker_security_protocol == SecurityConfig.SSL:
return SecurityConfig(SecurityConfig.SSL)
else:
return SecurityConfig(SecurityConfig.PLAINTEXT)
@property
def port(self):
return 9092 if self.security_protocol == SecurityConfig.PLAINTEXT else 9093
def start(self):
Service.start(self)
@ -66,20 +88,37 @@ class KafkaService(JmxMixin, Service):
topic_cfg["topic"] = topic
self.create_topic(topic_cfg)
def prop_file(self, node):
cfg = KafkaConfig(**node.config)
cfg[config_property.ADVERTISED_HOSTNAME] = node.account.hostname
cfg[config_property.ZOOKEEPER_CONNECT] = self.zk.connect_setting()
# TODO - clean up duplicate configuration logic
prop_file = cfg.render()
prop_file += self.render('kafka.properties', node=node, broker_id=self.idx(node),
security_config=self.security_config, port=self.port)
return prop_file
def start_cmd(self, node):
cmd = "export JMX_PORT=%d; " % self.jmx_port
cmd += "export LOG_DIR=/mnt/kafka-operational-logs/; "
cmd += "/opt/" + kafka_dir(node) + "/bin/kafka-server-start.sh /mnt/kafka.properties 1>> /mnt/kafka.log 2>> /mnt/kafka.log &"
return cmd
def start_node(self, node):
props_file = self.render('kafka.properties', node=node, broker_id=self.idx(node),
port = self.port, security_protocol = self.security_protocol, quota_config=self.quota_config,
interbroker_security_protocol=self.interbroker_security_protocol)
prop_file = self.prop_file(node)
self.logger.info("kafka.properties:")
self.logger.info(props_file)
node.account.create_file("/mnt/kafka.properties", props_file)
self.logger.info(prop_file)
node.account.create_file("/mnt/kafka.properties", prop_file)
self.security_config.setup_node(node)
cmd = "JMX_PORT=%d /opt/kafka/bin/kafka-server-start.sh /mnt/kafka.properties 1>> /mnt/kafka.log 2>> /mnt/kafka.log & echo $! > /mnt/kafka.pid" % self.jmx_port
cmd = self.start_cmd(node)
self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account), cmd))
with node.account.monitor_log("/mnt/kafka.log") as monitor:
node.account.ssh(cmd)
monitor.wait_until("Kafka Server.*started", timeout_sec=30, err_msg="Kafka server didn't finish startup")
self.start_jmx_tool(self.idx(node), node)
if len(self.pids(node)) == 0:
raise Exception("No process ids recorded on node %s" % str(node))
@ -87,8 +126,11 @@ class KafkaService(JmxMixin, Service):
def pids(self, node):
"""Return process ids associated with running processes on the given node."""
try:
return [pid for pid in node.account.ssh_capture("cat /mnt/kafka.pid", callback=int)]
except:
cmd = "ps ax | grep -i kafka | grep java | grep -v grep | awk '{print $1}'"
pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
return pid_arr
except (subprocess.CalledProcessError, ValueError) as e:
return []
def signal_node(self, node, sig=signal.SIGTERM):
@ -106,21 +148,27 @@ class KafkaService(JmxMixin, Service):
for pid in pids:
node.account.signal(pid, sig, allow_fail=False)
node.account.ssh("rm -f /mnt/kafka.pid", allow_fail=False)
wait_until(lambda: len(self.pids(node)) == 0, timeout_sec=20, err_msg="Kafka node failed to stop")
def clean_node(self, node):
JmxMixin.clean_node(self, node)
node.account.kill_process("kafka", clean_shutdown=False, allow_fail=True)
node.account.ssh("rm -rf /mnt/kafka-logs /mnt/kafka.properties /mnt/kafka.log /mnt/kafka.pid", allow_fail=False)
self.security_config.clean_node(node)
node.account.kill_process("kafka", clean_shutdown=False, allow_fail=True)
node.account.ssh("rm -rf /mnt/*", allow_fail=False)
def create_topic(self, topic_cfg):
node = self.nodes[0] # any node is fine here
def create_topic(self, topic_cfg, node=None):
"""Run the admin tool create topic command.
Specifying node is optional, and may be done if for different kafka nodes have different versions,
and we care where command gets run.
If the node is not specified, run the command from self.nodes[0]
"""
if node is None:
node = self.nodes[0]
self.logger.info("Creating topic %s with settings %s", topic_cfg["topic"], topic_cfg)
cmd = "/opt/kafka/bin/kafka-topics.sh --zookeeper %(zk_connect)s --create "\
"--topic %(topic)s --partitions %(partitions)d --replication-factor %(replication)d" % {
cmd = "/opt/%s/bin/kafka-topics.sh " % kafka_dir(node)
cmd += "--zookeeper %(zk_connect)s --create --topic %(topic)s --partitions %(partitions)d --replication-factor %(replication)d" % {
'zk_connect': self.zk.connect_setting(),
'topic': topic_cfg.get("topic"),
'partitions': topic_cfg.get('partitions', 1),
@ -139,20 +187,23 @@ class KafkaService(JmxMixin, Service):
for line in self.describe_topic(topic_cfg["topic"]).split("\n"):
self.logger.info(line)
def describe_topic(self, topic):
node = self.nodes[0]
cmd = "/opt/kafka/bin/kafka-topics.sh --zookeeper %s --topic %s --describe" % \
(self.zk.connect_setting(), topic)
def describe_topic(self, topic, node=None):
if node is None:
node = self.nodes[0]
cmd = "/opt/%s/bin/kafka-topics.sh --zookeeper %s --topic %s --describe" % \
(kafka_dir(node), self.zk.connect_setting(), topic)
output = ""
for line in node.account.ssh_capture(cmd):
output += line
return output
def verify_reassign_partitions(self, reassignment):
def verify_reassign_partitions(self, reassignment, node=None):
"""Run the reassign partitions admin tool in "verify" mode
"""
node = self.nodes[0]
json_file = "/tmp/" + str(time.time()) + "_reassign.json"
if node is None:
node = self.nodes[0]
json_file = "/tmp/%s_reassign.json" % str(time.time())
# reassignment to json
json_str = json.dumps(reassignment)
@ -160,12 +211,11 @@ class KafkaService(JmxMixin, Service):
# create command
cmd = "echo %s > %s && " % (json_str, json_file)
cmd += "/opt/kafka/bin/kafka-reassign-partitions.sh "\
"--zookeeper %(zk_connect)s "\
"--reassignment-json-file %(reassignment_file)s "\
"--verify" % {'zk_connect': self.zk.connect_setting(),
'reassignment_file': json_file}
cmd += " && sleep 1 && rm -f %s" % json_file
cmd += "/opt/%s/bin/kafka-reassign-partitions.sh " % kafka_dir(node)
cmd += "--zookeeper %s " % self.zk.connect_setting()
cmd += "--reassignment-json-file %s " % json_file
cmd += "--verify "
cmd += "&& sleep 1 && rm -f %s" % json_file
# send command
self.logger.info("Verifying parition reassignment...")
@ -181,11 +231,12 @@ class KafkaService(JmxMixin, Service):
return True
def execute_reassign_partitions(self, reassignment):
def execute_reassign_partitions(self, reassignment, node=None):
"""Run the reassign partitions admin tool in "verify" mode
"""
node = self.nodes[0]
json_file = "/tmp/" + str(time.time()) + "_reassign.json"
if node is None:
node = self.nodes[0]
json_file = "/tmp/%s_reassign.json" % str(time.time())
# reassignment to json
json_str = json.dumps(reassignment)
@ -193,11 +244,10 @@ class KafkaService(JmxMixin, Service):
# create command
cmd = "echo %s > %s && " % (json_str, json_file)
cmd += "/opt/kafka/bin/kafka-reassign-partitions.sh "\
"--zookeeper %(zk_connect)s "\
"--reassignment-json-file %(reassignment_file)s "\
"--execute" % {'zk_connect': self.zk.connect_setting(),
'reassignment_file': json_file}
cmd += "/opt/%s/bin/kafka-reassign-partitions.sh " % kafka_dir(node)
cmd += "--zookeeper %s " % self.zk.connect_setting()
cmd += "--reassignment-json-file %s " % json_file
cmd += "--execute"
cmd += " && sleep 1 && rm -f %s" % json_file
# send command
@ -210,28 +260,29 @@ class KafkaService(JmxMixin, Service):
self.logger.debug("Verify partition reassignment:")
self.logger.debug(output)
def restart_node(self, node, wait_sec=0, clean_shutdown=True):
"""Restart the given node, waiting wait_sec in between stopping and starting up again."""
def restart_node(self, node, clean_shutdown=True):
"""Restart the given node."""
self.stop_node(node, clean_shutdown)
time.sleep(wait_sec)
self.start_node(node)
def leader(self, topic, partition=0):
""" Get the leader replica for the given topic and partition.
"""
cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.ZooKeeperMainWrapper -server %s " \
% self.zk.connect_setting()
kafka_dir = KAFKA_TRUNK
cmd = "/opt/%s/bin/kafka-run-class.sh kafka.tools.ZooKeeperMainWrapper -server %s " %\
(kafka_dir, self.zk.connect_setting())
cmd += "get /brokers/topics/%s/partitions/%d/state" % (topic, partition)
self.logger.debug(cmd)
node = self.nodes[0]
node = self.zk.nodes[0]
self.logger.debug("Querying zookeeper to find leader replica for topic %s: \n%s" % (cmd, topic))
partition_state = None
for line in node.account.ssh_capture(cmd):
match = re.match("^({.+})$", line)
if match is not None:
partition_state = match.groups()[0]
break
# loop through all lines in the output, but only hold on to the first match
if partition_state is None:
match = re.match("^({.+})$", line)
if match is not None:
partition_state = match.groups()[0]
if partition_state is None:
raise Exception("Error finding partition state for topic %s and partition %d." % (topic, partition))
@ -244,10 +295,9 @@ class KafkaService(JmxMixin, Service):
return self.get_node(leader_idx)
def bootstrap_servers(self):
"""Get the broker list to connect to Kafka using the specified security protocol
"""
return ','.join([node.account.hostname + ":" + `self.port` for node in self.nodes])
"""Return comma-delimited list of brokers in this cluster formatted as HOSTNAME1:PORT1,HOSTNAME:PORT2,...
using the port for the configured security protocol.
def read_jmx_output_all_nodes(self):
for node in self.nodes:
self.read_jmx_output(self.idx(node), node)
This is the format expected by many config files.
"""
return ','.join([node.account.hostname + ":" + str(self.port) for node in self.nodes])

Просмотреть файл

@ -15,11 +15,8 @@
# see kafka.server.KafkaConfig for additional details and defaults
broker.id={{ broker_id }}
port=9092
#host.name=localhost
advertised.host.name={{ node.account.hostname }}
#advertised.port=<port accessible by clients>
{% if security_protocol == interbroker_security_protocol %}
listeners={{ security_protocol }}://:{{ port }}
advertised.listeners={{ security_protocol }}://{{ node.account.hostname }}:{{ port }}
@ -27,26 +24,20 @@ advertised.listeners={{ security_protocol }}://{{ node.account.hostname }}:{{ po
listeners=PLAINTEXT://:9092,SSL://:9093
advertised.listeners=PLAINTEXT://{{ node.account.hostname }}:9092,SSL://{{ node.account.hostname }}:9093
{% endif %}
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=65536
socket.request.max.bytes=104857600
log.dirs=/mnt/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
#log.flush.interval.messages=10000
#log.flush.interval.ms=1000
log.retention.hours=168
#log.retention.bytes=1073741824
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect={{ zk.connect_setting() }}
zookeeper.connection.timeout.ms=2000
{% if quota_config.quota_producer_default is defined and quota_config.quota_producer_default is not none %}
quota.producer.default={{ quota_config.quota_producer_default }}
{% endif %}
@ -64,6 +55,7 @@ quota.consumer.bytes.per.second.overrides={{ quota_config.quota_consumer_bytes_p
{% endif %}
security.inter.broker.protocol={{ interbroker_security_protocol }}
ssl.keystore.location=/mnt/ssl/test.keystore.jks
ssl.keystore.password=test-ks-passwd
ssl.key.password=test-key-passwd
@ -71,4 +63,3 @@ ssl.keystore.type=JKS
ssl.truststore.location=/mnt/ssl/test.truststore.jks
ssl.truststore.password=test-ts-passwd
ssl.truststore.type=JKS

Просмотреть файл

@ -0,0 +1,61 @@
# Copyright 2015 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from kafkatest.utils import kafkatest_version
from distutils.version import LooseVersion
class KafkaVersion(LooseVersion):
"""Container for kafka versions which makes versions simple to compare.
distutils.version.LooseVersion (and StrictVersion) has robust comparison and ordering logic.
Example:
v10 = KafkaVersion("0.10.0")
v9 = KafkaVersion("0.9.0.1")
assert v10 > v9 # assertion passes!
"""
def __init__(self, version_string):
self.is_trunk = (version_string.lower() == "trunk")
if self.is_trunk:
# Since "trunk" may actually be a branch that is not trunk,
# use kafkatest_version() for comparison purposes,
# and track whether we're in "trunk" with a flag
version_string = kafkatest_version()
# Drop dev suffix if present
dev_suffix_index = version_string.find(".dev")
if dev_suffix_index >= 0:
version_string = version_string[:dev_suffix_index]
# Don't use the form super.(...).__init__(...) because
# LooseVersion is an "old style" python class
LooseVersion.__init__(self, version_string)
def __str__(self):
if self.is_trunk:
return "trunk"
else:
return LooseVersion.__str__(self)
TRUNK = KafkaVersion("trunk")
# 0.8.2.X versions
V_0_8_2_1 = KafkaVersion("0.8.2.1")
V_0_8_2_2 = KafkaVersion("0.8.2.2")
LATEST_0_8_2 = V_0_8_2_2

Просмотреть файл

@ -15,6 +15,8 @@
from ducktape.services.background_thread import BackgroundThreadService
from kafkatest.services.kafka.directory import kafka_dir
class KafkaLog4jAppender(BackgroundThreadService):
@ -32,14 +34,15 @@ class KafkaLog4jAppender(BackgroundThreadService):
self.max_messages = max_messages
def _worker(self, idx, node):
cmd = self.start_cmd
cmd = self.start_cmd(node)
self.logger.debug("VerifiableKafkaLog4jAppender %d command: %s" % (idx, cmd))
node.account.ssh(cmd)
@property
def start_cmd(self):
cmd = "/opt/kafka/bin/kafka-run-class.sh org.apache.kafka.tools.VerifiableLog4jAppender" \
" --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers())
def start_cmd(self, node):
cmd = "/opt/%s/bin/" % kafka_dir(node)
cmd += "kafka-run-class.sh org.apache.kafka.tools.VerifiableLog4jAppender"
cmd += " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers())
if self.max_messages > 0:
cmd += " --max-messages %s" % str(self.max_messages)

Просмотреть файл

@ -17,6 +17,8 @@
from ducktape.services.service import Service
from ducktape.utils.util import wait_until
from kafkatest.services.kafka.directory import kafka_dir
import os
import subprocess
@ -63,7 +65,6 @@ class MirrorMaker(Service):
LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
PRODUCER_CONFIG = os.path.join(PERSISTENT_ROOT, "producer.properties")
CONSUMER_CONFIG = os.path.join(PERSISTENT_ROOT, "consumer.properties")
KAFKA_HOME = "/opt/kafka/"
logs = {
"mirror_maker_log": {
@ -101,7 +102,7 @@ class MirrorMaker(Service):
def start_cmd(self, node):
cmd = "export LOG_DIR=%s;" % MirrorMaker.LOG_DIR
cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % MirrorMaker.LOG4J_CONFIG
cmd += " %s/bin/kafka-run-class.sh kafka.tools.MirrorMaker" % MirrorMaker.KAFKA_HOME
cmd += " /opt/%s/bin/kafka-run-class.sh kafka.tools.MirrorMaker" % kafka_dir(node)
cmd += " --consumer.config %s" % MirrorMaker.CONSUMER_CONFIG
cmd += " --producer.config %s" % MirrorMaker.PRODUCER_CONFIG
if isinstance(self.num_streams, int):

Просмотреть файл

@ -0,0 +1,14 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

Просмотреть файл

@ -13,8 +13,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
class JmxMixin(object):
from kafkatest.services.kafka.directory import kafka_dir
class JmxMixin(object):
"""This mixin helps existing service subclasses start JmxTool on their worker nodes and collect jmx stats.
Note that this is not a service in its own right.
"""
def __init__(self, num_nodes, jmx_object_names=None, jmx_attributes=[]):
self.jmx_object_names = jmx_object_names
self.jmx_attributes = jmx_attributes
@ -30,12 +35,11 @@ class JmxMixin(object):
node.account.ssh("rm -rf /mnt/jmx_tool.log", allow_fail=False)
def start_jmx_tool(self, idx, node):
if self.started[idx-1] == True or self.jmx_object_names == None:
if self.started[idx-1] or self.jmx_object_names is None:
return
self.started[idx-1] = True
cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.JmxTool " \
"--reporting-interval 1000 --jmx-url service:jmx:rmi:///jndi/rmi://127.0.0.1:%d/jmxrmi" % self.jmx_port
cmd = "/opt/%s/bin/kafka-run-class.sh kafka.tools.JmxTool " \
"--reporting-interval 1000 --jmx-url service:jmx:rmi:///jndi/rmi://127.0.0.1:%d/jmxrmi" % (kafka_dir(node), self.jmx_port)
for jmx_object_name in self.jmx_object_names:
cmd += " --object-name %s" % jmx_object_name
for jmx_attribute in self.jmx_attributes:
@ -46,11 +50,12 @@ class JmxMixin(object):
jmx_output = node.account.ssh_capture(cmd, allow_fail=False)
jmx_output.next()
self.started[idx-1] = True
def read_jmx_output(self, idx, node):
if self.started[idx-1] == False:
return
self.maximum_jmx_value = {}
self.average_jmx_value = {}
object_attribute_names = []
cmd = "cat /mnt/jmx_tool.log"
@ -64,7 +69,7 @@ class JmxMixin(object):
self.jmx_stats[idx-1][time_sec] = {name : stats[i+1] for i, name in enumerate(object_attribute_names)}
# do not calculate average and maximum of jmx stats until we have read output from all nodes
if any(len(time_to_stats)==0 for time_to_stats in self.jmx_stats):
if any(len(time_to_stats) == 0 for time_to_stats in self.jmx_stats):
return
start_time_sec = min([min(time_to_stats.keys()) for time_to_stats in self.jmx_stats])
@ -72,10 +77,14 @@ class JmxMixin(object):
for name in object_attribute_names:
aggregates_per_time = []
for time_sec in xrange(start_time_sec, end_time_sec+1):
for time_sec in xrange(start_time_sec, end_time_sec + 1):
# assume that value is 0 if it is not read by jmx tool at the given time. This is appropriate for metrics such as bandwidth
values_per_node = [time_to_stats.get(time_sec, {}).get(name, 0) for time_to_stats in self.jmx_stats]
# assume that value is aggregated across nodes by sum. This is appropriate for metrics such as bandwidth
aggregates_per_time.append(sum(values_per_node))
self.average_jmx_value[name] = sum(aggregates_per_time)/len(aggregates_per_time)
self.maximum_jmx_value[name] = max(aggregates_per_time)
self.average_jmx_value[name] = sum(aggregates_per_time) / len(aggregates_per_time)
self.maximum_jmx_value[name] = max(aggregates_per_time)
def read_jmx_output_all_nodes(self):
for node in self.nodes:
self.read_jmx_output(self.idx(node), node)

Просмотреть файл

@ -14,6 +14,7 @@
# limitations under the License.
from kafkatest.services.performance import PerformanceService
from kafkatest.services.kafka.directory import kafka_dir
from kafkatest.utils.security_config import SecurityConfig
import os
@ -120,11 +121,10 @@ class ConsumerPerformanceService(PerformanceService):
return args
@property
def start_cmd(self):
def start_cmd(self, node):
cmd = "export LOG_DIR=%s;" % ConsumerPerformanceService.LOG_DIR
cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % ConsumerPerformanceService.LOG4J_CONFIG
cmd += " /opt/kafka/bin/kafka-consumer-perf-test.sh"
cmd += " /opt/%s/bin/kafka-consumer-perf-test.sh" % kafka_dir(node)
for key, value in self.args.items():
cmd += " --%s %s" % (key, value)
cmd += " --consumer.config %s" % ConsumerPerformanceService.CONFIG_FILE
@ -144,7 +144,7 @@ class ConsumerPerformanceService(PerformanceService):
node.account.create_file(ConsumerPerformanceService.CONFIG_FILE, str(self.security_config))
self.security_config.setup_node(node)
cmd = self.start_cmd
cmd = self.start_cmd(node)
self.logger.debug("Consumer performance %d command: %s", idx, cmd)
last = None
for line in node.account.ssh_capture(cmd):

Просмотреть файл

@ -16,6 +16,8 @@
from kafkatest.services.performance import PerformanceService
from kafkatest.utils.security_config import SecurityConfig
from kafkatest.services.kafka.directory import kafka_dir
class EndToEndLatencyService(PerformanceService):
@ -51,10 +53,8 @@ class EndToEndLatencyService(PerformanceService):
'ssl_config_file': ssl_config_file
})
cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.EndToEndLatency "\
"%(bootstrap_servers)s %(topic)s %(num_records)d "\
"%(acks)d 20 %(ssl_config_file)s" % args
cmd = "/opt/%s/bin/kafka-run-class.sh kafka.tools.EndToEndLatency " % kafka_dir(node)
cmd += "%(bootstrap_servers)s %(topic)s %(num_records)d %(acks)d 20 %(ssl_config_file)s" % args
cmd += " | tee /mnt/end-to-end-latency.log"
self.logger.debug("End-to-end latency %d command: %s", idx, cmd)

Просмотреть файл

@ -13,10 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from kafkatest.services.performance.jmx_mixin import JmxMixin
from kafkatest.services.monitor.jmx import JmxMixin
from kafkatest.services.performance import PerformanceService
import itertools
from kafkatest.utils.security_config import SecurityConfig
from kafkatest.services.kafka.directory import kafka_dir
class ProducerPerformanceService(JmxMixin, PerformanceService):
@ -45,8 +46,13 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
def _worker(self, idx, node):
args = self.args.copy()
args.update({'bootstrap_servers': self.kafka.bootstrap_servers(), 'jmx_port': self.jmx_port, 'client_id': self.client_id})
cmd = "JMX_PORT=%(jmx_port)d /opt/kafka/bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance " \
args.update({
'bootstrap_servers': self.kafka.bootstrap_servers(),
'jmx_port': self.jmx_port,
'client_id': self.client_id,
'kafka_directory': kafka_dir(node)
})
cmd = "JMX_PORT=%(jmx_port)d /opt/%(kafka_directory)s/bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance " \
"%(topic)s %(num_records)d %(record_size)d %(throughput)d bootstrap.servers=%(bootstrap_servers)s client.id=%(client_id)s" % args
self.security_config.setup_node(node)
@ -73,19 +79,21 @@ class ProducerPerformanceService(JmxMixin, PerformanceService):
}
last = None
producer_output = node.account.ssh_capture(cmd)
first_line = producer_output.next()
self.start_jmx_tool(idx, node)
for line in itertools.chain([first_line], producer_output):
if self.intermediate_stats:
try:
self.stats[idx-1].append(parse_stats(line))
except:
# Sometimes there are extraneous log messages
pass
first_line = next(producer_output, None)
last = line
try:
self.results[idx-1] = parse_stats(last)
except:
raise Exception("Unable to parse aggregate performance statistics on node %d: %s" % (idx, last))
self.read_jmx_output(idx, node)
if first_line is not None:
self.start_jmx_tool(idx, node)
for line in itertools.chain([first_line], producer_output):
if self.intermediate_stats:
try:
self.stats[idx-1].append(parse_stats(line))
except:
# Sometimes there are extraneous log messages
pass
last = line
try:
self.results[idx-1] = parse_stats(last)
except:
raise Exception("Unable to parse aggregate performance statistics on node %d: %s" % (idx, last))
self.read_jmx_output(idx, node)

Просмотреть файл

@ -14,28 +14,49 @@
# limitations under the License.
from ducktape.services.background_thread import BackgroundThreadService
from kafkatest.services.kafka.directory import kafka_dir, KAFKA_TRUNK
from kafkatest.services.kafka.version import TRUNK, LATEST_0_8_2
from kafkatest.utils.security_config import SecurityConfig
import json
import os
import subprocess
import time
class VerifiableProducer(BackgroundThreadService):
PERSISTENT_ROOT = "/mnt/verifiable_producer"
STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_producer.stdout")
STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_producer.stderr")
LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs")
LOG_FILE = os.path.join(LOG_DIR, "verifiable_producer.log")
LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "verifiable_producer.properties")
CONFIG_FILE = "/mnt/verifiable_producer.properties"
logs = {
"producer_log": {
"path": "/mnt/producer.log",
"collect_default": False}
}
"verifiable_producer_stdout": {
"path": STDOUT_CAPTURE,
"collect_default": False},
"verifiable_producer_stderr": {
"path": STDERR_CAPTURE,
"collect_default": False},
"verifiable_producer_log": {
"path": LOG_FILE,
"collect_default": True}
}
def __init__(self, context, num_nodes, kafka, topic, security_protocol=None, max_messages=-1, throughput=100000):
def __init__(self, context, num_nodes, kafka, topic, security_protocol=SecurityConfig.PLAINTEXT, max_messages=-1, throughput=100000, version=TRUNK):
super(VerifiableProducer, self).__init__(context, num_nodes)
self.log_level = "TRACE"
self.kafka = kafka
self.topic = topic
self.max_messages = max_messages
self.throughput = throughput
for node in self.nodes:
node.version = version
self.acked_values = []
self.not_acked_values = []
@ -45,15 +66,24 @@ class VerifiableProducer(BackgroundThreadService):
self.prop_file += str(self.security_config)
def _worker(self, idx, node):
node.account.ssh("mkdir -p %s" % VerifiableProducer.PERSISTENT_ROOT, allow_fail=False)
# Create and upload log properties
log_config = self.render('tools_log4j.properties', log_file=VerifiableProducer.LOG_FILE)
node.account.create_file(VerifiableProducer.LOG4J_CONFIG, log_config)
# Create and upload config file
self.logger.info("verifiable_producer.properties:")
self.logger.info(self.prop_file)
node.account.create_file(VerifiableProducer.CONFIG_FILE, self.prop_file)
self.security_config.setup_node(node)
cmd = self.start_cmd
cmd = self.start_cmd(node)
self.logger.debug("VerifiableProducer %d command: %s" % (idx, cmd))
last_produced_time = time.time()
prev_msg = None
for line in node.account.ssh_capture(cmd):
line = line.strip()
@ -68,9 +98,30 @@ class VerifiableProducer(BackgroundThreadService):
elif data["name"] == "producer_send_success":
self.acked_values.append(int(data["value"]))
@property
def start_cmd(self):
cmd = "/opt/kafka/bin/kafka-verifiable-producer.sh" \
# Log information if there is a large gap between successively acknowledged messages
t = time.time()
time_delta_sec = t - last_produced_time
if time_delta_sec > 2 and prev_msg is not None:
self.logger.debug(
"Time delta between successively acked messages is large: " +
"delta_t_sec: %s, prev_message: %s, current_message: %s" % (str(time_delta_sec), str(prev_msg), str(data)))
last_produced_time = t
prev_msg = data
def start_cmd(self, node):
cmd = ""
if node.version <= LATEST_0_8_2:
# 0.8.2.X releases do not have VerifiableProducer.java, so cheat and add
# the tools jar from trunk to the classpath
cmd += "for file in /opt/%s/tools/build/libs/kafka-tools*.jar; do CLASSPATH=$CLASSPATH:$file; done; " % KAFKA_TRUNK
cmd += "for file in /opt/%s/tools/build/dependant-libs-${SCALA_VERSION}*/*.jar; do CLASSPATH=$CLASSPATH:$file; done; " % KAFKA_TRUNK
cmd += "export CLASSPATH; "
cmd += "export LOG_DIR=%s;" % VerifiableProducer.LOG_DIR
cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % VerifiableProducer.LOG4J_CONFIG
cmd += "/opt/" + kafka_dir(node) + "/bin/kafka-run-class.sh org.apache.kafka.tools.VerifiableProducer" \
" --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers())
if self.max_messages > 0:
cmd += " --max-messages %s" % str(self.max_messages)
@ -78,9 +129,20 @@ class VerifiableProducer(BackgroundThreadService):
cmd += " --throughput %s" % str(self.throughput)
cmd += " --producer.config %s" % VerifiableProducer.CONFIG_FILE
cmd += " 2>> /mnt/producer.log | tee -a /mnt/producer.log &"
cmd += " 2>> %s | tee -a %s &" % (VerifiableProducer.STDOUT_CAPTURE, VerifiableProducer.STDOUT_CAPTURE)
return cmd
def pids(self, node):
try:
cmd = "ps ax | grep -i VerifiableProducer | grep java | grep -v grep | awk '{print $1}'"
pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
return pid_arr
except (subprocess.CalledProcessError, ValueError) as e:
return []
def alive(self, node):
return len(self.pids(node)) > 0
@property
def acked(self):
with self.lock:
@ -113,7 +175,7 @@ class VerifiableProducer(BackgroundThreadService):
def clean_node(self, node):
node.account.kill_process("VerifiableProducer", clean_shutdown=False, allow_fail=False)
node.account.ssh("rm -rf /mnt/producer.log /mnt/verifiable_producer.properties", allow_fail=False)
node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False)
self.security_config.clean_node(node)
def try_parse_json(self, string):

Просмотреть файл

@ -16,6 +16,8 @@
from ducktape.services.service import Service
from kafkatest.services.kafka.directory import kafka_dir
import subprocess
import time
@ -46,9 +48,9 @@ class ZookeeperService(Service):
self.logger.info(config_file)
node.account.create_file("/mnt/zookeeper.properties", config_file)
node.account.ssh(
"/opt/kafka/bin/zookeeper-server-start.sh /mnt/zookeeper.properties 1>> %(path)s 2>> %(path)s &"
% self.logs["zk_log"])
start_cmd = "/opt/%s/bin/zookeeper-server-start.sh " % kafka_dir(node)
start_cmd += "/mnt/zookeeper.properties 1>> %(path)s 2>> %(path)s &" % self.logs["zk_log"]
node.account.ssh(start_cmd)
time.sleep(5) # give it some time to start

Просмотреть файл

@ -0,0 +1,106 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until
class ProduceConsumeValidateTest(Test):
"""This class provides a shared template for tests which follow the common pattern of:
- produce to a topic in the background
- consume from that topic in the background
- run some logic, e.g. fail topic leader etc.
- perform validation
"""
def __init__(self, test_context):
super(ProduceConsumeValidateTest, self).__init__(test_context=test_context)
def setup_producer_and_consumer(self):
raise NotImplementedError("Subclasses should implement this")
def start_producer_and_consumer(self):
# Start background producer and consumer
self.producer.start()
wait_until(lambda: self.producer.num_acked > 5, timeout_sec=10,
err_msg="Producer failed to start in a reasonable amount of time.")
self.consumer.start()
wait_until(lambda: len(self.consumer.messages_consumed[1]) > 0, timeout_sec=10,
err_msg="Consumer failed to start in a reasonable amount of time.")
def stop_producer_and_consumer(self):
for node in self.consumer.nodes:
if not self.consumer.alive(node):
self.logger.warn("Consumer on %s is not alive and probably should be." % str(node.account))
for node in self.producer.nodes:
if not self.producer.alive(node):
self.logger.warn("Producer on %s is not alive and probably should be." % str(node.account))
# Check that producer is still successfully producing
currently_acked = self.producer.num_acked
wait_until(lambda: self.producer.num_acked > currently_acked + 5, timeout_sec=10,
err_msg="Expected producer to still be producing.")
self.producer.stop()
self.consumer.wait()
def run_produce_consume_validate(self, core_test_action):
"""Top-level template for simple produce/consume/validate tests."""
self.start_producer_and_consumer()
core_test_action()
self.stop_producer_and_consumer()
self.validate()
def validate(self):
"""Check that each acked message was consumed."""
self.acked = self.producer.acked
self.not_acked = self.producer.not_acked
# Check produced vs consumed
self.consumed = self.consumer.messages_consumed[1]
self.logger.info("num consumed: %d" % len(self.consumed))
success = True
msg = ""
if len(set(self.consumed)) != len(self.consumed):
# There are duplicates. This is ok, so report it but don't fail the test
msg += "There are duplicate messages in the log\n"
if not set(self.consumed).issuperset(set(self.acked)):
# Every acked message must appear in the logs. I.e. consumed messages must be superset of acked messages.
acked_minus_consumed = set(self.producer.acked) - set(self.consumed)
success = False
msg += "At least one acked message did not appear in the consumed messages. acked_minus_consumed: "
if len(acked_minus_consumed) < 20:
msg += str(acked_minus_consumed)
else:
for i in range(20):
msg += str(acked_minus_consumed.pop()) + ", "
msg += "...plus " + str(len(acked_minus_consumed) - 20) + " more"
if not success:
# Collect all the data logs if there was a failure
self.mark_for_collect(self.kafka)
if not success:
self.mark_for_collect(self.producer)
assert success, msg

Просмотреть файл

@ -14,18 +14,13 @@
# limitations under the License.
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until
from ducktape.mark import parametrize
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.performance import ProducerPerformanceService
from kafkatest.services.console_consumer import ConsoleConsumer, is_int
from kafkatest.services.console_consumer import ConsoleConsumer
import random
import signal
import time
class QuotaTest(Test):
"""
@ -73,7 +68,10 @@ class QuotaTest(Test):
"""Override this since we're adding services outside of the constructor"""
return super(QuotaTest, self).min_cluster_size() + self.num_producers + self.num_consumers
def run_clients(self, producer_id, producer_num, consumer_id, consumer_num):
@parametrize(producer_id='default_id', producer_num=1, consumer_id='default_id', consumer_num=1)
@parametrize(producer_id='overridden_id', producer_num=1, consumer_id='overridden_id', consumer_num=1)
@parametrize(producer_id='overridden_id', producer_num=1, consumer_id='overridden_id', consumer_num=2)
def test_quota(self, producer_id='default_id', producer_num=1, consumer_id='default_id', consumer_num=1):
# Produce all messages
producer = ProducerPerformanceService(
self.test_context, producer_num, self.kafka, security_protocol=self.security_protocol,
@ -91,7 +89,7 @@ class QuotaTest(Test):
consumer.run()
for idx, messages in consumer.messages_consumed.iteritems():
assert len(messages)>0, "consumer %d didn't consume any message before timeout" % idx
assert len(messages) > 0, "consumer %d didn't consume any message before timeout" % idx
success, msg = self.validate(self.kafka, producer, consumer)
assert success, msg
@ -172,9 +170,3 @@ class QuotaTest(Test):
if client_id in overridden_quotas:
return float(overridden_quotas[client_id])
return self.quota_config['quota_consumer_default']
@parametrize(producer_id='default_id', producer_num=1, consumer_id='default_id', consumer_num=1)
@parametrize(producer_id='overridden_id', producer_num=1, consumer_id='overridden_id', consumer_num=1)
@parametrize(producer_id='overridden_id', producer_num=1, consumer_id='overridden_id', consumer_num=2)
def test_quota(self, producer_id='default_id', producer_num=1, consumer_id='default_id', consumer_num=1):
self.run_clients(producer_id, producer_num, consumer_id, consumer_num)

Просмотреть файл

@ -13,24 +13,76 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until
from ducktape.mark import parametrize
from ducktape.mark import matrix
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.console_consumer import ConsoleConsumer, is_int
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
import signal
import time
class ReplicationTest(Test):
"""Replication tests.
These tests verify that replication provides simple durability guarantees by checking that data acked by
brokers is still available for consumption in the face of various failure scenarios."""
def clean_shutdown(test):
"""Discover leader node for our topic and shut it down cleanly."""
test.kafka.signal_leader(test.topic, partition=0, sig=signal.SIGTERM)
def hard_shutdown(test):
"""Discover leader node for our topic and shut it down with a hard kill."""
test.kafka.signal_leader(test.topic, partition=0, sig=signal.SIGKILL)
def clean_bounce(test):
"""Chase the leader of one partition and restart it cleanly."""
for i in range(5):
prev_leader_node = test.kafka.leader(topic=test.topic, partition=0)
test.kafka.restart_node(prev_leader_node, clean_shutdown=True)
def hard_bounce(test):
"""Chase the leader and restart it with a hard kill."""
for i in range(5):
prev_leader_node = test.kafka.leader(topic=test.topic, partition=0)
test.kafka.signal_node(prev_leader_node, sig=signal.SIGKILL)
# Since this is a hard kill, we need to make sure the process is down and that
# zookeeper and the broker cluster have registered the loss of the leader.
# Waiting for a new leader to be elected on the topic-partition is a reasonable heuristic for this.
def leader_changed():
current_leader = test.kafka.leader(topic=test.topic, partition=0)
return current_leader is not None and current_leader != prev_leader_node
wait_until(lambda: len(test.kafka.pids(prev_leader_node)) == 0, timeout_sec=5)
wait_until(leader_changed, timeout_sec=10, backoff_sec=.5)
test.kafka.start_node(prev_leader_node)
failures = {
"clean_shutdown": clean_shutdown,
"hard_shutdown": hard_shutdown,
"clean_bounce": clean_bounce,
"hard_bounce": hard_bounce
}
class ReplicationTest(ProduceConsumeValidateTest):
"""
Note that consuming is a bit tricky, at least with console consumer. The goal is to consume all messages
(foreach partition) in the topic. In this case, waiting for the last message may cause the consumer to stop
too soon since console consumer is consuming multiple partitions from a single thread and therefore we lose
ordering guarantees.
Waiting on a count of consumed messages can be unreliable: if we stop consuming when num_consumed == num_acked,
we might exit early if some messages are duplicated (though not an issue here since producer retries==0)
Therefore rely here on the consumer.timeout.ms setting which times out on the interval between successively
consumed messages. Since we run the producer to completion before running the consumer, this is a reliable
indicator that nothing is left to consume.
"""
def __init__(self, test_context):
""":type test_context: ducktape.tests.test.TestContext"""
@ -38,6 +90,11 @@ class ReplicationTest(Test):
self.topic = "test_topic"
self.zk = ZookeeperService(test_context, num_nodes=1)
self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics={self.topic: {
"partitions": 3,
"replication-factor": 3,
"min.insync.replicas": 2}
})
self.producer_throughput = 10000
self.num_producers = 1
self.num_consumers = 1
@ -49,125 +106,27 @@ class ReplicationTest(Test):
"""Override this since we're adding services outside of the constructor"""
return super(ReplicationTest, self).min_cluster_size() + self.num_producers + self.num_consumers
def run_with_failure(self, failure, interbroker_security_protocol):
"""This is the top-level test template.
The steps are:
Produce messages in the background while driving some failure condition
When done driving failures, immediately stop producing
Consume all messages
Validate that messages acked by brokers were consumed
@matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
interbroker_security_protocol=["PLAINTEXT", "SSL"])
def test_replication_with_broker_failure(self, failure_mode, interbroker_security_protocol="PLAINTEXT"):
"""Replication tests.
These tests verify that replication provides simple durability guarantees by checking that data acked by
brokers is still available for consumption in the face of various failure scenarios.
Note that consuming is a bit tricky, at least with console consumer. The goal is to consume all messages
(foreach partition) in the topic. In this case, waiting for the last message may cause the consumer to stop
too soon since console consumer is consuming multiple partitions from a single thread and therefore we lose
ordering guarantees.
Waiting on a count of consumed messages can be unreliable: if we stop consuming when num_consumed == num_acked,
we might exit early if some messages are duplicated (though not an issue here since producer retries==0)
Therefore rely here on the consumer.timeout.ms setting which times out on the interval between successively
consumed messages. Since we run the producer to completion before running the consumer, this is a reliable
indicator that nothing is left to consume.
Setup: 1 zk, 3 kafka nodes, 1 topic with partitions=3, replication-factor=3, and min.insync.replicas=2
- Produce messages in the background
- Consume messages in the background
- Drive broker failures (shutdown, or bounce repeatedly with kill -15 or kill -9)
- When done driving failures, stop producing, and finish consuming
- Validate that every acked message was consumed
"""
security_protocol='PLAINTEXT'
self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk,
security_protocol=security_protocol,
interbroker_security_protocol=interbroker_security_protocol,
topics={self.topic: {
"partitions": 3,
"replication-factor": 3,
"min.insync.replicas": 2}
})
client_security_protocol = 'PLAINTEXT'
self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, security_protocol=client_security_protocol, throughput=self.producer_throughput)
self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, security_protocol=client_security_protocol, consumer_timeout_ms=60000, message_validator=is_int)
self.kafka.interbroker_security_protocol = interbroker_security_protocol
self.kafka.start()
self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, security_protocol=security_protocol, throughput=self.producer_throughput)
self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, security_protocol=security_protocol, new_consumer=False, consumer_timeout_ms=3000, message_validator=is_int)
# Produce in a background thread while driving broker failures
self.producer.start()
wait_until(lambda: self.producer.num_acked > 5, timeout_sec=5,
err_msg="Producer failed to start in a reasonable amount of time.")
failure()
self.producer.stop()
self.acked = self.producer.acked
self.not_acked = self.producer.not_acked
self.logger.info("num not acked: %d" % self.producer.num_not_acked)
self.logger.info("num acked: %d" % self.producer.num_acked)
# Consume all messages
self.consumer.start()
self.consumer.wait()
self.consumed = self.consumer.messages_consumed[1]
self.logger.info("num consumed: %d" % len(self.consumed))
# Check produced vs consumed
success, msg = self.validate()
if not success:
self.mark_for_collect(self.producer)
assert success, msg
def clean_shutdown(self):
"""Discover leader node for our topic and shut it down cleanly."""
self.kafka.signal_leader(self.topic, partition=0, sig=signal.SIGTERM)
def hard_shutdown(self):
"""Discover leader node for our topic and shut it down with a hard kill."""
self.kafka.signal_leader(self.topic, partition=0, sig=signal.SIGKILL)
def clean_bounce(self):
"""Chase the leader of one partition and restart it cleanly."""
for i in range(5):
prev_leader_node = self.kafka.leader(topic=self.topic, partition=0)
self.kafka.restart_node(prev_leader_node, wait_sec=5, clean_shutdown=True)
def hard_bounce(self):
"""Chase the leader and restart it cleanly."""
for i in range(5):
prev_leader_node = self.kafka.leader(topic=self.topic, partition=0)
self.kafka.restart_node(prev_leader_node, wait_sec=5, clean_shutdown=False)
# Wait long enough for previous leader to probably be awake again
time.sleep(6)
def validate(self):
"""Check that produced messages were consumed."""
success = True
msg = ""
if len(set(self.consumed)) != len(self.consumed):
# There are duplicates. This is ok, so report it but don't fail the test
msg += "There are duplicate messages in the log\n"
if not set(self.consumed).issuperset(set(self.acked)):
# Every acked message must appear in the logs. I.e. consumed messages must be superset of acked messages.
acked_minus_consumed = set(self.producer.acked) - set(self.consumed)
success = False
msg += "At least one acked message did not appear in the consumed messages. acked_minus_consumed: " + str(acked_minus_consumed)
if not success:
# Collect all the data logs if there was a failure
self.mark_for_collect(self.kafka)
return success, msg
@matrix(interbroker_security_protocol=['PLAINTEXT', 'SSL'])
def test_clean_shutdown(self, interbroker_security_protocol):
self.run_with_failure(self.clean_shutdown, interbroker_security_protocol)
@matrix(interbroker_security_protocol=['PLAINTEXT', 'SSL'])
def test_hard_shutdown(self, interbroker_security_protocol):
self.run_with_failure(self.hard_shutdown, interbroker_security_protocol)
@matrix(interbroker_security_protocol=['PLAINTEXT', 'SSL'])
def test_clean_bounce(self, interbroker_security_protocol):
self.run_with_failure(self.clean_bounce, interbroker_security_protocol)
@matrix(interbroker_security_protocol=['PLAINTEXT', 'SSL'])
def test_hard_bounce(self, interbroker_security_protocol):
self.run_with_failure(self.hard_bounce, interbroker_security_protocol)
self.run_produce_consume_validate(core_test_action=lambda: failures[failure_mode](self))

Просмотреть файл

@ -0,0 +1,81 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.kafka import KafkaService
from kafkatest.services.kafka.version import LATEST_0_8_2, TRUNK
from kafkatest.services.verifiable_producer import VerifiableProducer
from kafkatest.services.console_consumer import ConsoleConsumer, is_int
from kafkatest.services.kafka import config_property
from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest
class TestUpgrade(ProduceConsumeValidateTest):
def __init__(self, test_context):
super(TestUpgrade, self).__init__(test_context=test_context)
def setUp(self):
self.topic = "test_topic"
self.zk = ZookeeperService(self.test_context, num_nodes=1)
self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=LATEST_0_8_2, topics={self.topic: {
"partitions": 3,
"replication-factor": 3,
"min.insync.replicas": 2}})
self.zk.start()
self.kafka.start()
# Producer and consumer
self.producer_throughput = 10000
self.num_producers = 1
self.num_consumers = 1
self.producer = VerifiableProducer(
self.test_context, self.num_producers, self.kafka, self.topic,
throughput=self.producer_throughput, version=LATEST_0_8_2)
# TODO - reduce the timeout
self.consumer = ConsoleConsumer(
self.test_context, self.num_consumers, self.kafka, self.topic,
consumer_timeout_ms=30000, message_validator=is_int, version=LATEST_0_8_2)
def perform_upgrade(self):
self.logger.info("First pass bounce - rolling upgrade")
for node in self.kafka.nodes:
self.kafka.stop_node(node)
node.version = TRUNK
node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] = "0.8.2.X"
self.kafka.start_node(node)
self.logger.info("Second pass bounce - remove inter.broker.protocol.version config")
for node in self.kafka.nodes:
self.kafka.stop_node(node)
del node.config[config_property.INTER_BROKER_PROTOCOL_VERSION]
self.kafka.start_node(node)
def test_upgrade(self):
"""Test upgrade of Kafka broker cluster from 0.8.2 to 0.9.0
- Start 3 node broker cluster on version 0.8.2
- Start producer and consumer in the background
- Perform two-phase rolling upgrade
- First phase: upgrade brokers to 0.9.0 with inter.broker.protocol.version set to 0.8.2.X
- Second phase: remove inter.broker.protocol.version config with rolling bounce
- Finally, validate that every message acked by the producer was consumed by the consumer
"""
self.run_produce_consume_validate(core_test_action=self.perform_upgrade)

Просмотреть файл

@ -12,4 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
# see kafka.server.KafkaConfig for additional details and defaults
from util import kafkatest_version, is_version

Просмотреть файл

@ -0,0 +1,42 @@
# Copyright 2015 Confluent Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from kafkatest import __version__ as __kafkatest_version__
import re
def kafkatest_version():
"""Return string representation of current ducktape version."""
return __kafkatest_version__
def _kafka_jar_versions(proc_string):
"""Use a rough heuristic to find all kafka versions explicitly in the process classpath"""
versions = re.findall("kafka-[a-z]+-([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)", proc_string)
versions.extend(re.findall("kafka-([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)", proc_string))
return set(versions)
def is_version(node, version_list, proc_grep_string="kafka"):
"""Heuristic to check that only the specified version appears in the classpath of the process
A useful tool to aid in checking that service version apis are working correctly.
"""
lines = [l for l in node.account.ssh_capture("ps ax | grep %s | grep -v grep" % proc_grep_string)]
assert len(lines) == 1
versions = _kafka_jar_versions(lines[0])
return versions == {str(v) for v in version_list}

Просмотреть файл

@ -14,15 +14,21 @@
# limitations under the License.
# see kafka.server.KafkaConfig for additional details and defaults
import re
from setuptools import find_packages, setup
version = ''
with open('kafkatest/__init__.py', 'r') as fd:
version = re.search(r'^__version__\s*=\s*[\'"]([^\'"]*)[\'"]',
fd.read(), re.MULTILINE).group(1)
setup(name="kafkatest",
version="0.9.0.dev0",
version=version,
description="Apache Kafka System Tests",
author="Apache Kafka",
platforms=["any"],
license="apache2.0",
packages=find_packages(),
include_package_data=True,
install_requires=["ducktape==0.3.2"]
install_requires=["ducktape==0.3.8"]
)

Просмотреть файл

@ -23,12 +23,14 @@ import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.utils.Utils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@ -138,7 +140,29 @@ public class VerifiableProducer {
return parser;
}
/**
* Read a properties file from the given path
* @param filename The path of the file to read
*
* Note: this duplication of org.apache.kafka.common.utils.Utils.loadProps is unfortunate
* but *intentional*. In order to use VerifiableProducer in compatibility and upgrade tests,
* we use VerifiableProducer from trunk tools package, and run it against 0.8.X.X kafka jars.
* Since this method is not in Utils in the 0.8.X.X jars, we have to cheat a bit and duplicate.
*/
public static Properties loadProps(String filename) throws IOException, FileNotFoundException {
Properties props = new Properties();
InputStream propStream = null;
try {
propStream = new FileInputStream(filename);
props.load(propStream);
} finally {
if (propStream != null)
propStream.close();
}
return props;
}
/** Construct a VerifiableProducer object from command-line arguments. */
public static VerifiableProducer createFromArgs(String[] args) {
ArgumentParser parser = argParser();
@ -164,7 +188,7 @@ public class VerifiableProducer {
producerProps.put("retries", "0");
if (configFile != null) {
try {
producerProps.putAll(Utils.loadProps(configFile));
producerProps.putAll(loadProps(configFile));
} catch (IOException e) {
throw new ArgumentParserException(e.getMessage(), parser);
}

Просмотреть файл

@ -38,9 +38,31 @@ if [ -z `which javac` ]; then
fi
chmod a+rw /opt
if [ ! -e /opt/kafka ]; then
ln -s /vagrant /opt/kafka
if [ -h /opt/kafka-trunk ]; then
# reset symlink
rm /opt/kafka-trunk
fi
ln -s /vagrant /opt/kafka-trunk
get_kafka() {
version=$1
kafka_dir=/opt/kafka-$version
url=https://s3-us-west-2.amazonaws.com/kafka-packages-$version/kafka_2.10-$version.tgz
if [ ! -d /opt/kafka-$version ]; then
pushd /tmp
curl -O $url
file_tgz=`basename $url`
tar -xzf $file_tgz
rm -rf $file_tgz
file=`basename $file_tgz .tgz`
mv $file $kafka_dir
popd
fi
}
get_kafka 0.8.2.2
# For EC2 nodes, we want to use /mnt, which should have the local disk. On local
# VMs, we can just create it if it doesn't exist and use it like we'd use