diff --git a/tests/kafkatest/__init__.py b/tests/kafkatest/__init__.py index 28d269bc4..e346811e6 100644 --- a/tests/kafkatest/__init__.py +++ b/tests/kafkatest/__init__.py @@ -14,3 +14,13 @@ # limitations under the License. # see kafka.server.KafkaConfig for additional details and defaults +# This determines the version of kafkatest that can be published to PyPi and installed with pip +# +# Note that in development, this version name can't follow Kafka's convention of having a trailing "-SNAPSHOT" +# due to python version naming restrictions, which are enforced by python packaging tools +# (see https://www.python.org/dev/peps/pep-0440/) +# +# Instead, in trunk, the version should have a suffix of the form ".devN" +# +# For example, when Kafka is at version 0.9.0.0-SNAPSHOT, this should be something like "0.9.0.0.dev0" +__version__ = '0.9.0.0.dev0' diff --git a/tests/kafkatest/sanity_checks/test_console_consumer.py b/tests/kafkatest/sanity_checks/test_console_consumer.py index a9c4d53f0..0d2c1fd92 100644 --- a/tests/kafkatest/sanity_checks/test_console_consumer.py +++ b/tests/kafkatest/sanity_checks/test_console_consumer.py @@ -20,11 +20,16 @@ from ducktape.mark import matrix from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka.version import LATEST_0_8_2 from kafkatest.services.console_consumer import ConsoleConsumer from kafkatest.utils.remote_account import line_count, file_exists +from kafkatest.services.verifiable_producer import VerifiableProducer +from kafkatest.utils.security_config import SecurityConfig + import time + class ConsoleConsumerTest(Test): """Sanity checks on console consumer service class.""" def __init__(self, test_context): @@ -32,24 +37,29 @@ class ConsoleConsumerTest(Test): self.topic = "topic" self.zk = ZookeeperService(test_context, num_nodes=1) + self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, + topics={self.topic: {"partitions": 1, "replication-factor": 1}}) + self.consumer = ConsoleConsumer(self.test_context, num_nodes=1, kafka=self.kafka, topic=self.topic) def setUp(self): self.zk.start() - @parametrize(security_protocol='SSL', new_consumer=True) - @matrix(security_protocol=['PLAINTEXT'], new_consumer=[False, True]) + @parametrize(security_protocol=SecurityConfig.SSL, new_consumer=True) + @matrix(security_protocol=[SecurityConfig.PLAINTEXT], new_consumer=[False, True]) def test_lifecycle(self, security_protocol, new_consumer): - self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, - security_protocol=security_protocol, - topics={self.topic: {"partitions": 1, "replication-factor": 1}}) + """Check that console consumer starts/stops properly, and that we are capturing log output.""" + + self.kafka.security_protocol = security_protocol self.kafka.start() + self.consumer.security_protocol = security_protocol + self.consumer.new_consumer = new_consumer + t0 = time.time() - self.consumer = ConsoleConsumer(self.test_context, num_nodes=1, kafka=self.kafka, topic=self.topic, security_protocol=security_protocol, new_consumer=new_consumer) self.consumer.start() node = self.consumer.nodes[0] - wait_until(lambda: self.consumer.alive(node), + wait_until(lambda: self.consumer.alive(node), timeout_sec=10, backoff_sec=.2, err_msg="Consumer was too slow to start") self.logger.info("consumer started in %s seconds " % str(time.time() - t0)) @@ -62,3 +72,22 @@ class ConsoleConsumerTest(Test): assert line_count(node, ConsoleConsumer.STDOUT_CAPTURE) == 0 
self.consumer.stop_node(node) + + def test_version(self): + """Check that console consumer v0.8.2.X successfully starts and consumes messages.""" + self.kafka.start() + + num_messages = 1000 + self.producer = VerifiableProducer(self.test_context, num_nodes=1, kafka=self.kafka, topic=self.topic, + max_messages=num_messages, throughput=1000) + self.producer.start() + self.producer.wait() + + self.consumer.nodes[0].version = LATEST_0_8_2 + self.consumer.consumer_timeout_ms = 1000 + self.consumer.start() + self.consumer.wait() + + num_consumed = len(self.consumer.messages_consumed[1]) + num_produced = self.producer.num_acked + assert num_produced == num_consumed, "num_produced: %d, num_consumed: %d" % (num_produced, num_consumed) diff --git a/tests/kafkatest/sanity_checks/test_kafka_version.py b/tests/kafkatest/sanity_checks/test_kafka_version.py new file mode 100644 index 000000000..f5f5d5fe8 --- /dev/null +++ b/tests/kafkatest/sanity_checks/test_kafka_version.py @@ -0,0 +1,55 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from ducktape.tests.test import Test + +from kafkatest.services.zookeeper import ZookeeperService +from kafkatest.services.kafka import KafkaService, config_property +from kafkatest.services.kafka.version import LATEST_0_8_2, TRUNK +from kafkatest.utils import is_version + + +class KafkaVersionTest(Test): + """Sanity checks on kafka versioning.""" + def __init__(self, test_context): + super(KafkaVersionTest, self).__init__(test_context) + + self.topic = "topic" + self.zk = ZookeeperService(test_context, num_nodes=1) + + def setUp(self): + self.zk.start() + + def test_0_8_2(self): + """Test kafka service node-versioning api - verify that we can bring up a single-node 0.8.2.X cluster.""" + self.kafka = KafkaService(self.test_context, num_nodes=1, zk=self.zk, + topics={self.topic: {"partitions": 1, "replication-factor": 1}}) + node = self.kafka.nodes[0] + node.version = LATEST_0_8_2 + self.kafka.start() + + assert is_version(node, [LATEST_0_8_2]) + + def test_multi_version(self): + """Test kafka service node-versioning api - ensure we can bring up a 2-node cluster, one on version 0.8.2.X, + the other on trunk.""" + self.kafka = KafkaService(self.test_context, num_nodes=2, zk=self.zk, + topics={self.topic: {"partitions": 1, "replication-factor": 2}}) + self.kafka.nodes[1].version = LATEST_0_8_2 + self.kafka.nodes[1].config[config_property.INTER_BROKER_PROTOCOL_VERSION] = "0.8.2.X" + self.kafka.start() + + assert is_version(self.kafka.nodes[0], [TRUNK.vstring]) + assert is_version(self.kafka.nodes[1], [LATEST_0_8_2]) diff --git a/tests/kafkatest/sanity_checks/test_verifiable_producer.py b/tests/kafkatest/sanity_checks/test_verifiable_producer.py new file mode 100644 index 000000000..4155279ca --- /dev/null +++ b/tests/kafkatest/sanity_checks/test_verifiable_producer.py @@ -0,0 +1,70 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from ducktape.tests.test import Test +from ducktape.utils.util import wait_until +from ducktape.mark import parametrize + +from kafkatest.services.zookeeper import ZookeeperService +from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka.version import LATEST_0_8_2, TRUNK, KafkaVersion +from kafkatest.services.verifiable_producer import VerifiableProducer +from kafkatest.utils import is_version + + +class TestVerifiableProducer(Test): + """Sanity checks on verifiable producer service class.""" + def __init__(self, test_context): + super(TestVerifiableProducer, self).__init__(test_context) + + self.topic = "topic" + self.zk = ZookeeperService(test_context, num_nodes=1) + self.kafka = KafkaService(test_context, num_nodes=1, zk=self.zk, + topics={self.topic: {"partitions": 1, "replication-factor": 1}}) + + self.num_messages = 100 + # This will produce to source kafka cluster + self.producer = VerifiableProducer(test_context, num_nodes=1, kafka=self.kafka, topic=self.topic, + max_messages=self.num_messages, throughput=1000) + + def setUp(self): + self.zk.start() + self.kafka.start() + + @parametrize(producer_version=str(LATEST_0_8_2)) + @parametrize(producer_version=str(TRUNK)) + def test_simple_run(self, producer_version=TRUNK): + """ + Test that we can start VerifiableProducer on trunk or against the 0.8.2 jar, and + verify that we can produce a small number of messages. + """ + node = self.producer.nodes[0] + node.version = KafkaVersion(producer_version) + self.producer.start() + wait_until(lambda: self.producer.num_acked > 5, timeout_sec=5, + err_msg="Producer failed to start in a reasonable amount of time.") + + # using version.vstring (distutils.version.LooseVersion) is a tricky way of ensuring + # that this check works with TRUNK + # When running VerifiableProducer 0.8.X, both trunk version and 0.8.X should show up because of the way + # verifiable producer pulls in some trunk directories into its classpath + assert is_version(node, [node.version.vstring, TRUNK.vstring]) + + self.producer.wait() + num_produced = self.producer.num_acked + assert num_produced == self.num_messages, "num_produced: %d, num_messages: %d" % (num_produced, self.num_messages) + + diff --git a/tests/kafkatest/services/console_consumer.py b/tests/kafkatest/services/console_consumer.py index 96fe7775e..07343e84a 100644 --- a/tests/kafkatest/services/console_consumer.py +++ b/tests/kafkatest/services/console_consumer.py @@ -13,15 +13,18 @@ # See the License for the specific language governing permissions and # limitations under the License. -from ducktape.services.background_thread import BackgroundThreadService from ducktape.utils.util import wait_until -from kafkatest.services.performance.jmx_mixin import JmxMixin -from kafkatest.services.performance import PerformanceService +from ducktape.services.background_thread import BackgroundThreadService + +from kafkatest.services.kafka.directory import kafka_dir +from kafkatest.services.kafka.version import TRUNK, LATEST_0_8_2 +from kafkatest.services.monitor.jmx import JmxMixin from kafkatest.utils.security_config import SecurityConfig +import itertools import os import subprocess -import itertools + def is_int(msg): """Default method used to check whether text pulled from console consumer is a message. 
@@ -30,7 +33,7 @@ def is_int(msg): """ try: return int(msg) - except: + except ValueError: return None """ @@ -74,7 +77,7 @@ Option Description """ -class ConsoleConsumer(JmxMixin, PerformanceService): +class ConsoleConsumer(JmxMixin, BackgroundThreadService): # Root directory for persistent output PERSISTENT_ROOT = "/mnt/console_consumer" STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "console_consumer.stdout") @@ -94,10 +97,10 @@ class ConsoleConsumer(JmxMixin, PerformanceService): "consumer_log": { "path": LOG_FILE, "collect_default": True} - } + } - def __init__(self, context, num_nodes, kafka, topic, security_protocol=None, new_consumer=None, message_validator=None, - from_beginning=True, consumer_timeout_ms=None, client_id="console-consumer", jmx_object_names=None, jmx_attributes=[]): + def __init__(self, context, num_nodes, kafka, topic, security_protocol=SecurityConfig.PLAINTEXT, new_consumer=False, message_validator=None, + from_beginning=True, consumer_timeout_ms=None, version=TRUNK, client_id="console-consumer", jmx_object_names=None, jmx_attributes=[]): """ Args: context: standard context @@ -114,7 +117,7 @@ class ConsoleConsumer(JmxMixin, PerformanceService): in a topic. """ JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes) - PerformanceService.__init__(self, context, num_nodes) + BackgroundThreadService.__init__(self, context, num_nodes) self.kafka = kafka self.new_consumer = new_consumer self.args = { @@ -122,47 +125,70 @@ class ConsoleConsumer(JmxMixin, PerformanceService): } self.consumer_timeout_ms = consumer_timeout_ms + for node in self.nodes: + node.version = version self.from_beginning = from_beginning self.message_validator = message_validator self.messages_consumed = {idx: [] for idx in range(1, num_nodes + 1)} self.client_id = client_id + self.security_protocol = security_protocol - # Process client configuration - self.prop_file = self.render('console_consumer.properties', consumer_timeout_ms=self.consumer_timeout_ms, client_id=self.client_id) - - # Add security properties to the config. If security protocol is not specified, - # use the default in the template properties. - self.security_config = SecurityConfig(security_protocol, self.prop_file) - self.security_protocol = self.security_config.security_protocol + # Validate a few configs if self.new_consumer is None: self.new_consumer = self.security_protocol == SecurityConfig.SSL if self.security_protocol == SecurityConfig.SSL and not self.new_consumer: raise Exception("SSL protocol is supported only with the new consumer") - self.prop_file += str(self.security_config) - @property - def start_cmd(self): + def prop_file(self, node): + """Return a string which can be used to create a configuration file appropriate for the given node.""" + # Process client configuration + prop_file = self.render('console_consumer.properties') + if hasattr(node, "version") and node.version <= LATEST_0_8_2: + # in 0.8.2.X and earlier, console consumer does not have --timeout-ms option + # instead, we have to pass it through the config file + prop_file += "\nconsumer.timeout.ms=%s\n" % str(self.consumer_timeout_ms) + + # Add security properties to the config. If security protocol is not specified, + # use the default in the template properties. 
+ self.security_config = SecurityConfig(self.security_protocol, prop_file) + self.security_protocol = self.security_config.security_protocol + + prop_file += str(self.security_config) + return prop_file + + def start_cmd(self, node): + """Return the start command appropriate for the given node.""" args = self.args.copy() args['zk_connect'] = self.kafka.zk.connect_setting() args['stdout'] = ConsoleConsumer.STDOUT_CAPTURE args['stderr'] = ConsoleConsumer.STDERR_CAPTURE + args['log_dir'] = ConsoleConsumer.LOG_DIR + args['log4j_config'] = ConsoleConsumer.LOG4J_CONFIG args['config_file'] = ConsoleConsumer.CONFIG_FILE + args['stdout'] = ConsoleConsumer.STDOUT_CAPTURE args['jmx_port'] = self.jmx_port + args['kafka_dir'] = kafka_dir(node) + args['broker_list'] = self.kafka.bootstrap_servers() - cmd = "export LOG_DIR=%s;" % ConsoleConsumer.LOG_DIR - cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % ConsoleConsumer.LOG4J_CONFIG - cmd += " JMX_PORT=%(jmx_port)d /opt/kafka/bin/kafka-console-consumer.sh --topic %(topic)s" \ - " --consumer.config %(config_file)s" % args + cmd = "export JMX_PORT=%(jmx_port)s; " \ + "export LOG_DIR=%(log_dir)s; " \ + "export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j_config)s\"; " \ + "/opt/%(kafka_dir)s/bin/kafka-console-consumer.sh " \ + "--topic %(topic)s --consumer.config %(config_file)s" % args if self.new_consumer: - cmd += " --new-consumer --bootstrap-server %s" % self.kafka.bootstrap_servers() + cmd += " --new-consumer --bootstrap-server %(broker_list)s" % args else: cmd += " --zookeeper %(zk_connect)s" % args if self.from_beginning: cmd += " --from-beginning" + if self.consumer_timeout_ms is not None: - cmd += " --timeout-ms %s" % self.consumer_timeout_ms + # version 0.8.X and below do not support --timeout-ms option + # This will be added in the properties file instead + if node.version > LATEST_0_8_2: + cmd += " --timeout-ms %s" % self.consumer_timeout_ms cmd += " 2>> %(stderr)s | tee -a %(stdout)s &" % args return cmd @@ -183,8 +209,10 @@ class ConsoleConsumer(JmxMixin, PerformanceService): # Create and upload config file self.logger.info("console_consumer.properties:") - self.logger.info(self.prop_file) - node.account.create_file(ConsoleConsumer.CONFIG_FILE, self.prop_file) + + prop_file = self.prop_file(node) + self.logger.info(prop_file) + node.account.create_file(ConsoleConsumer.CONFIG_FILE, prop_file) self.security_config.setup_node(node) # Create and upload log properties @@ -192,23 +220,26 @@ class ConsoleConsumer(JmxMixin, PerformanceService): node.account.create_file(ConsoleConsumer.LOG4J_CONFIG, log_config) # Run and capture output - cmd = self.start_cmd + cmd = self.start_cmd(node) self.logger.debug("Console consumer %d command: %s", idx, cmd) consumer_output = node.account.ssh_capture(cmd, allow_fail=False) - first_line = consumer_output.next() - self.start_jmx_tool(idx, node) - for line in itertools.chain([first_line], consumer_output): - msg = line.strip() - if self.message_validator is not None: - msg = self.message_validator(msg) - if msg is not None: - self.messages_consumed[idx].append(msg) + first_line = next(consumer_output, None) - self.read_jmx_output(idx, node) + if first_line is not None: + self.start_jmx_tool(idx, node) + + for line in itertools.chain([first_line], consumer_output): + msg = line.strip() + if self.message_validator is not None: + msg = self.message_validator(msg) + if msg is not None: + self.messages_consumed[idx].append(msg) + + self.read_jmx_output(idx, node) def start_node(self, node): 
- PerformanceService.start_node(self, node) + BackgroundThreadService.start_node(self, node) def stop_node(self, node): node.account.kill_process("console_consumer", allow_fail=True) @@ -220,6 +251,6 @@ class ConsoleConsumer(JmxMixin, PerformanceService): self.logger.warn("%s %s was still alive at cleanup time. Killing forcefully..." % (self.__class__.__name__, node.account)) JmxMixin.clean_node(self, node) - PerformanceService.clean_node(self, node) + node.account.kill_process("java", clean_shutdown=False, allow_fail=True) node.account.ssh("rm -rf %s" % ConsoleConsumer.PERSISTENT_ROOT, allow_fail=False) self.security_config.clean_node(node) diff --git a/tests/kafkatest/services/copycat.py b/tests/kafkatest/services/copycat.py index 45ef330b9..831a93258 100644 --- a/tests/kafkatest/services/copycat.py +++ b/tests/kafkatest/services/copycat.py @@ -15,8 +15,9 @@ from ducktape.services.service import Service from ducktape.utils.util import wait_until -import subprocess, signal +from kafkatest.services.kafka.directory import kafka_dir +import signal class CopycatServiceBase(Service): """Base class for Copycat services providing some common settings and functionality""" @@ -99,7 +100,7 @@ class CopycatStandaloneService(CopycatServiceBase): self.logger.info("Starting Copycat standalone process") with node.account.monitor_log("/mnt/copycat.log") as monitor: - node.account.ssh("/opt/kafka/bin/copycat-standalone.sh /mnt/copycat.properties " + + node.account.ssh("/opt/%s/bin/copycat-standalone.sh /mnt/copycat.properties " % kafka_dir(node) + " ".join(remote_connector_configs) + " 1>> /mnt/copycat.log 2>> /mnt/copycat.log & echo $! > /mnt/copycat.pid") monitor.wait_until('Copycat started', timeout_sec=10, err_msg="Never saw message indicating Copycat finished startup") @@ -108,7 +109,6 @@ class CopycatStandaloneService(CopycatServiceBase): raise RuntimeError("No process ids recorded") - class CopycatDistributedService(CopycatServiceBase): """Runs Copycat in distributed mode.""" @@ -128,7 +128,7 @@ class CopycatDistributedService(CopycatServiceBase): self.logger.info("Starting Copycat distributed process") with node.account.monitor_log("/mnt/copycat.log") as monitor: - cmd = "/opt/kafka/bin/copycat-distributed.sh /mnt/copycat.properties " + cmd = "/opt/%s/bin/copycat-distributed.sh /mnt/copycat.properties " % kafka_dir(node) # Only submit connectors on the first node so they don't get submitted multiple times. Also only submit them # the first time the node is started so if self.first_start and node == self.nodes[0]: @@ -140,4 +140,3 @@ class CopycatDistributedService(CopycatServiceBase): if len(self.pids(node)) == 0: raise RuntimeError("No process ids recorded") - self.first_start = False diff --git a/tests/kafkatest/services/kafka/__init__.py b/tests/kafkatest/services/kafka/__init__.py new file mode 100644 index 000000000..6408b5904 --- /dev/null +++ b/tests/kafkatest/services/kafka/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from kafka import KafkaService diff --git a/tests/kafkatest/services/kafka/config.py b/tests/kafkatest/services/kafka/config.py new file mode 100644 index 000000000..0accf2084 --- /dev/null +++ b/tests/kafkatest/services/kafka/config.py @@ -0,0 +1,53 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import config_property + + +class KafkaConfig(dict): + """A dictionary-like container class which allows for definition of overridable default values, + which is also capable of "rendering" itself as a useable server.properties file. + """ + + DEFAULTS = { + config_property.PORT: 9092, + config_property.SOCKET_RECEIVE_BUFFER_BYTES: 65536, + config_property.LOG_DIRS: "/mnt/kafka-logs", + config_property.ZOOKEEPER_CONNECTION_TIMEOUT_MS: 2000 + } + + def __init__(self, **kwargs): + super(KafkaConfig, self).__init__(**kwargs) + + # Set defaults + for key, val in self.DEFAULTS.items(): + if not self.has_key(key): + self[key] = val + + def render(self): + """Render self as a series of lines key=val\n, and do so in a consistent order. """ + keys = [k for k in self.keys()] + keys.sort() + + s = "" + for k in keys: + s += "%s=%s\n" % (k, str(self[k])) + return s + + + + + + diff --git a/tests/kafkatest/services/kafka/config_property.py b/tests/kafkatest/services/kafka/config_property.py new file mode 100644 index 000000000..cc685aad6 --- /dev/null +++ b/tests/kafkatest/services/kafka/config_property.py @@ -0,0 +1,177 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Define Kafka configuration property names here. 
+""" + +BROKER_ID = "broker.id" +PORT = "port" +ADVERTISED_HOSTNAME = "advertised.host.name" + +NUM_NETWORK_THREADS = "num.network.threads" +NUM_IO_THREADS = "num.io.threads" +SOCKET_SEND_BUFFER_BYTES = "socket.send.buffer.bytes" +SOCKET_RECEIVE_BUFFER_BYTES = "socket.receive.buffer.bytes" +SOCKET_REQUEST_MAX_BYTES = "socket.request.max.bytes" +LOG_DIRS = "log.dirs" +NUM_PARTITIONS = "num.partitions" +NUM_RECOVERY_THREADS_PER_DATA_DIR = "num.recovery.threads.per.data.dir" + +LOG_RETENTION_HOURS = "log.retention.hours" +LOG_SEGMENT_BYTES = "log.segment.bytes" +LOG_RETENTION_CHECK_INTERVAL_MS = "log.retention.check.interval.ms" +LOG_CLEANER_ENABLE = "log.cleaner.enable" + +AUTO_CREATE_TOPICS_ENABLE = "auto.create.topics.enable" + +ZOOKEEPER_CONNECT = "zookeeper.connect" +ZOOKEEPER_CONNECTION_TIMEOUT_MS = "zookeeper.connection.timeout.ms" +INTER_BROKER_PROTOCOL_VERSION = "inter.broker.protocol.version" + + +""" +From KafkaConfig.scala + + /** ********* General Configuration ***********/ + val MaxReservedBrokerIdProp = "reserved.broker.max.id" + val MessageMaxBytesProp = "message.max.bytes" + val NumIoThreadsProp = "num.io.threads" + val BackgroundThreadsProp = "background.threads" + val QueuedMaxRequestsProp = "queued.max.requests" + /** ********* Socket Server Configuration ***********/ + val PortProp = "port" + val HostNameProp = "host.name" + val ListenersProp = "listeners" + val AdvertisedPortProp = "advertised.port" + val AdvertisedListenersProp = "advertised.listeners" + val SocketSendBufferBytesProp = "socket.send.buffer.bytes" + val SocketReceiveBufferBytesProp = "socket.receive.buffer.bytes" + val SocketRequestMaxBytesProp = "socket.request.max.bytes" + val MaxConnectionsPerIpProp = "max.connections.per.ip" + val MaxConnectionsPerIpOverridesProp = "max.connections.per.ip.overrides" + val ConnectionsMaxIdleMsProp = "connections.max.idle.ms" + /** ********* Log Configuration ***********/ + val NumPartitionsProp = "num.partitions" + val LogDirsProp = "log.dirs" + val LogDirProp = "log.dir" + val LogSegmentBytesProp = "log.segment.bytes" + + val LogRollTimeMillisProp = "log.roll.ms" + val LogRollTimeHoursProp = "log.roll.hours" + + val LogRollTimeJitterMillisProp = "log.roll.jitter.ms" + val LogRollTimeJitterHoursProp = "log.roll.jitter.hours" + + val LogRetentionTimeMillisProp = "log.retention.ms" + val LogRetentionTimeMinutesProp = "log.retention.minutes" + val LogRetentionTimeHoursProp = "log.retention.hours" + + val LogRetentionBytesProp = "log.retention.bytes" + val LogCleanupIntervalMsProp = "log.retention.check.interval.ms" + val LogCleanupPolicyProp = "log.cleanup.policy" + val LogCleanerThreadsProp = "log.cleaner.threads" + val LogCleanerIoMaxBytesPerSecondProp = "log.cleaner.io.max.bytes.per.second" + val LogCleanerDedupeBufferSizeProp = "log.cleaner.dedupe.buffer.size" + val LogCleanerIoBufferSizeProp = "log.cleaner.io.buffer.size" + val LogCleanerDedupeBufferLoadFactorProp = "log.cleaner.io.buffer.load.factor" + val LogCleanerBackoffMsProp = "log.cleaner.backoff.ms" + val LogCleanerMinCleanRatioProp = "log.cleaner.min.cleanable.ratio" + val LogCleanerEnableProp = "log.cleaner.enable" + val LogCleanerDeleteRetentionMsProp = "log.cleaner.delete.retention.ms" + val LogIndexSizeMaxBytesProp = "log.index.size.max.bytes" + val LogIndexIntervalBytesProp = "log.index.interval.bytes" + val LogFlushIntervalMessagesProp = "log.flush.interval.messages" + val LogDeleteDelayMsProp = "log.segment.delete.delay.ms" + val LogFlushSchedulerIntervalMsProp = "log.flush.scheduler.interval.ms" + 
val LogFlushIntervalMsProp = "log.flush.interval.ms" + val LogFlushOffsetCheckpointIntervalMsProp = "log.flush.offset.checkpoint.interval.ms" + val LogPreAllocateProp = "log.preallocate" + val NumRecoveryThreadsPerDataDirProp = "num.recovery.threads.per.data.dir" + val MinInSyncReplicasProp = "min.insync.replicas" + /** ********* Replication configuration ***********/ + val ControllerSocketTimeoutMsProp = "controller.socket.timeout.ms" + val DefaultReplicationFactorProp = "default.replication.factor" + val ReplicaLagTimeMaxMsProp = "replica.lag.time.max.ms" + val ReplicaSocketTimeoutMsProp = "replica.socket.timeout.ms" + val ReplicaSocketReceiveBufferBytesProp = "replica.socket.receive.buffer.bytes" + val ReplicaFetchMaxBytesProp = "replica.fetch.max.bytes" + val ReplicaFetchWaitMaxMsProp = "replica.fetch.wait.max.ms" + val ReplicaFetchMinBytesProp = "replica.fetch.min.bytes" + val ReplicaFetchBackoffMsProp = "replica.fetch.backoff.ms" + val NumReplicaFetchersProp = "num.replica.fetchers" + val ReplicaHighWatermarkCheckpointIntervalMsProp = "replica.high.watermark.checkpoint.interval.ms" + val FetchPurgatoryPurgeIntervalRequestsProp = "fetch.purgatory.purge.interval.requests" + val ProducerPurgatoryPurgeIntervalRequestsProp = "producer.purgatory.purge.interval.requests" + val AutoLeaderRebalanceEnableProp = "auto.leader.rebalance.enable" + val LeaderImbalancePerBrokerPercentageProp = "leader.imbalance.per.broker.percentage" + val LeaderImbalanceCheckIntervalSecondsProp = "leader.imbalance.check.interval.seconds" + val UncleanLeaderElectionEnableProp = "unclean.leader.election.enable" + val InterBrokerSecurityProtocolProp = "security.inter.broker.protocol" + val InterBrokerProtocolVersionProp = "inter.broker.protocol.version" + /** ********* Controlled shutdown configuration ***********/ + val ControlledShutdownMaxRetriesProp = "controlled.shutdown.max.retries" + val ControlledShutdownRetryBackoffMsProp = "controlled.shutdown.retry.backoff.ms" + val ControlledShutdownEnableProp = "controlled.shutdown.enable" + /** ********* Consumer coordinator configuration ***********/ + val ConsumerMinSessionTimeoutMsProp = "consumer.min.session.timeout.ms" + val ConsumerMaxSessionTimeoutMsProp = "consumer.max.session.timeout.ms" + /** ********* Offset management configuration ***********/ + val OffsetMetadataMaxSizeProp = "offset.metadata.max.bytes" + val OffsetsLoadBufferSizeProp = "offsets.load.buffer.size" + val OffsetsTopicReplicationFactorProp = "offsets.topic.replication.factor" + val OffsetsTopicPartitionsProp = "offsets.topic.num.partitions" + val OffsetsTopicSegmentBytesProp = "offsets.topic.segment.bytes" + val OffsetsTopicCompressionCodecProp = "offsets.topic.compression.codec" + val OffsetsRetentionMinutesProp = "offsets.retention.minutes" + val OffsetsRetentionCheckIntervalMsProp = "offsets.retention.check.interval.ms" + val OffsetCommitTimeoutMsProp = "offsets.commit.timeout.ms" + val OffsetCommitRequiredAcksProp = "offsets.commit.required.acks" + /** ********* Quota Configuration ***********/ + val ProducerQuotaBytesPerSecondDefaultProp = "quota.producer.default" + val ConsumerQuotaBytesPerSecondDefaultProp = "quota.consumer.default" + val ProducerQuotaBytesPerSecondOverridesProp = "quota.producer.bytes.per.second.overrides" + val ConsumerQuotaBytesPerSecondOverridesProp = "quota.consumer.bytes.per.second.overrides" + val NumQuotaSamplesProp = "quota.window.num" + val QuotaWindowSizeSecondsProp = "quota.window.size.seconds" + + val DeleteTopicEnableProp = "delete.topic.enable" + val 
CompressionTypeProp = "compression.type" + + /** ********* Kafka Metrics Configuration ***********/ + val MetricSampleWindowMsProp = CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG + val MetricNumSamplesProp: String = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG + val MetricReporterClassesProp: String = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG + + /** ********* SSL Configuration ****************/ + val PrincipalBuilderClassProp = SSLConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG + val SSLProtocolProp = SSLConfigs.SSL_PROTOCOL_CONFIG + val SSLProviderProp = SSLConfigs.SSL_PROVIDER_CONFIG + val SSLCipherSuitesProp = SSLConfigs.SSL_CIPHER_SUITES_CONFIG + val SSLEnabledProtocolsProp = SSLConfigs.SSL_ENABLED_PROTOCOLS_CONFIG + val SSLKeystoreTypeProp = SSLConfigs.SSL_KEYSTORE_TYPE_CONFIG + val SSLKeystoreLocationProp = SSLConfigs.SSL_KEYSTORE_LOCATION_CONFIG + val SSLKeystorePasswordProp = SSLConfigs.SSL_KEYSTORE_PASSWORD_CONFIG + val SSLKeyPasswordProp = SSLConfigs.SSL_KEY_PASSWORD_CONFIG + val SSLTruststoreTypeProp = SSLConfigs.SSL_TRUSTSTORE_TYPE_CONFIG + val SSLTruststoreLocationProp = SSLConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG + val SSLTruststorePasswordProp = SSLConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG + val SSLKeyManagerAlgorithmProp = SSLConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG + val SSLTrustManagerAlgorithmProp = SSLConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG + val SSLEndpointIdentificationAlgorithmProp = SSLConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG + val SSLClientAuthProp = SSLConfigs.SSL_CLIENT_AUTH_CONFIG +""" + + diff --git a/tests/kafkatest/services/kafka/directory.py b/tests/kafkatest/services/kafka/directory.py new file mode 100644 index 000000000..59af1fcad --- /dev/null +++ b/tests/kafkatest/services/kafka/directory.py @@ -0,0 +1,32 @@ +# Copyright 2015 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# "trunk" installation of kafka +KAFKA_TRUNK = "kafka-trunk" + + +def kafka_dir(node=None): + """Return name of kafka directory for the given node. + + This provides a convenient way to support different versions of kafka or kafka tools running + on different nodes. 
+ """ + if node is None: + return KAFKA_TRUNK + + if not hasattr(node, "version"): + return KAFKA_TRUNK + + return "kafka-" + str(node.version) \ No newline at end of file diff --git a/tests/kafkatest/services/kafka.py b/tests/kafkatest/services/kafka/kafka.py similarity index 58% rename from tests/kafkatest/services/kafka.py rename to tests/kafkatest/services/kafka/kafka.py index 5c4b22f75..5e4a1e1fa 100644 --- a/tests/kafkatest/services/kafka.py +++ b/tests/kafkatest/services/kafka/kafka.py @@ -15,11 +15,18 @@ from ducktape.services.service import Service from ducktape.utils.util import wait_until -from kafkatest.services.performance.jmx_mixin import JmxMixin + +from config import KafkaConfig +from kafkatest.services.kafka import config_property +from kafkatest.services.kafka.version import TRUNK +from kafkatest.services.kafka.directory import kafka_dir, KAFKA_TRUNK + +from kafkatest.services.monitor.jmx import JmxMixin from kafkatest.utils.security_config import SecurityConfig import json import re import signal +import subprocess import time @@ -29,13 +36,16 @@ class KafkaService(JmxMixin, Service): "kafka_log": { "path": "/mnt/kafka.log", "collect_default": True}, + "kafka_operational_logs": { + "path": "/mnt/kafka-operational-logs", + "collect_default": True}, "kafka_data": { - "path": "/mnt/kafka-logs", + "path": "/mnt/kafka-data-logs", "collect_default": False} } def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT, interbroker_security_protocol=SecurityConfig.PLAINTEXT, - topics=None, quota_config=None, jmx_object_names=None, jmx_attributes=[]): + topics=None, version=TRUNK, quota_config=None, jmx_object_names=None, jmx_attributes=[]): """ :type context :type zk: ZookeeperService @@ -43,16 +53,28 @@ class KafkaService(JmxMixin, Service): """ Service.__init__(self, context, num_nodes) JmxMixin.__init__(self, num_nodes, jmx_object_names, jmx_attributes) + self.zk = zk - if security_protocol == SecurityConfig.SSL or interbroker_security_protocol == SecurityConfig.SSL: - self.security_config = SecurityConfig(SecurityConfig.SSL) - else: - self.security_config = SecurityConfig(SecurityConfig.PLAINTEXT) + self.quota_config = quota_config + self.security_protocol = security_protocol self.interbroker_security_protocol = interbroker_security_protocol - self.port = 9092 if security_protocol == SecurityConfig.PLAINTEXT else 9093 self.topics = topics - self.quota_config = quota_config + + for node in self.nodes: + node.version = version + node.config = KafkaConfig(**{config_property.BROKER_ID: self.idx(node)}) + + @property + def security_config(self): + if self.security_protocol == SecurityConfig.SSL or self.interbroker_security_protocol == SecurityConfig.SSL: + return SecurityConfig(SecurityConfig.SSL) + else: + return SecurityConfig(SecurityConfig.PLAINTEXT) + + @property + def port(self): + return 9092 if self.security_protocol == SecurityConfig.PLAINTEXT else 9093 def start(self): Service.start(self) @@ -66,20 +88,37 @@ class KafkaService(JmxMixin, Service): topic_cfg["topic"] = topic self.create_topic(topic_cfg) + def prop_file(self, node): + cfg = KafkaConfig(**node.config) + cfg[config_property.ADVERTISED_HOSTNAME] = node.account.hostname + cfg[config_property.ZOOKEEPER_CONNECT] = self.zk.connect_setting() + + # TODO - clean up duplicate configuration logic + prop_file = cfg.render() + prop_file += self.render('kafka.properties', node=node, broker_id=self.idx(node), + security_config=self.security_config, port=self.port) + return prop_file + + def 
start_cmd(self, node):
+        cmd = "export JMX_PORT=%d; " % self.jmx_port
+        cmd += "export LOG_DIR=/mnt/kafka-operational-logs/; "
+        cmd += "/opt/" + kafka_dir(node) + "/bin/kafka-server-start.sh /mnt/kafka.properties 1>> /mnt/kafka.log 2>> /mnt/kafka.log &"
+        return cmd
+
     def start_node(self, node):
-        props_file = self.render('kafka.properties', node=node, broker_id=self.idx(node),
-                                 port = self.port, security_protocol = self.security_protocol, quota_config=self.quota_config,
-                                 interbroker_security_protocol=self.interbroker_security_protocol)
+        prop_file = self.prop_file(node)
         self.logger.info("kafka.properties:")
-        self.logger.info(props_file)
-        node.account.create_file("/mnt/kafka.properties", props_file)
+        self.logger.info(prop_file)
+        node.account.create_file("/mnt/kafka.properties", prop_file)
+
         self.security_config.setup_node(node)
-        cmd = "JMX_PORT=%d /opt/kafka/bin/kafka-server-start.sh /mnt/kafka.properties 1>> /mnt/kafka.log 2>> /mnt/kafka.log & echo $! > /mnt/kafka.pid" % self.jmx_port
+        cmd = self.start_cmd(node)
         self.logger.debug("Attempting to start KafkaService on %s with command: %s" % (str(node.account), cmd))
         with node.account.monitor_log("/mnt/kafka.log") as monitor:
             node.account.ssh(cmd)
             monitor.wait_until("Kafka Server.*started", timeout_sec=30, err_msg="Kafka server didn't finish startup")
+        self.start_jmx_tool(self.idx(node), node)
 
         if len(self.pids(node)) == 0:
             raise Exception("No process ids recorded on node %s" % str(node))
@@ -87,8 +126,11 @@
     def pids(self, node):
         """Return process ids associated with running processes on the given node."""
         try:
-            return [pid for pid in node.account.ssh_capture("cat /mnt/kafka.pid", callback=int)]
-        except:
+            cmd = "ps ax | grep -i kafka | grep java | grep -v grep | awk '{print $1}'"
+
+            pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)]
+            return pid_arr
+        except (subprocess.CalledProcessError, ValueError) as e:
             return []
 
     def signal_node(self, node, sig=signal.SIGTERM):
@@ -106,21 +148,27 @@
         for pid in pids:
             node.account.signal(pid, sig, allow_fail=False)
-
-        node.account.ssh("rm -f /mnt/kafka.pid", allow_fail=False)
+        wait_until(lambda: len(self.pids(node)) == 0, timeout_sec=20, err_msg="Kafka node failed to stop")
 
     def clean_node(self, node):
         JmxMixin.clean_node(self, node)
-        node.account.kill_process("kafka", clean_shutdown=False, allow_fail=True)
-        node.account.ssh("rm -rf /mnt/kafka-logs /mnt/kafka.properties /mnt/kafka.log /mnt/kafka.pid", allow_fail=False)
         self.security_config.clean_node(node)
+        node.account.kill_process("kafka", clean_shutdown=False, allow_fail=True)
+        node.account.ssh("rm -rf /mnt/*", allow_fail=False)
 
-    def create_topic(self, topic_cfg):
-        node = self.nodes[0] # any node is fine here
+    def create_topic(self, topic_cfg, node=None):
+        """Run the admin tool create topic command.
+        Specifying node is optional, and may be done if different kafka nodes have different versions,
+        and we care where the command gets run.
+ + If the node is not specified, run the command from self.nodes[0] + """ + if node is None: + node = self.nodes[0] self.logger.info("Creating topic %s with settings %s", topic_cfg["topic"], topic_cfg) - cmd = "/opt/kafka/bin/kafka-topics.sh --zookeeper %(zk_connect)s --create "\ - "--topic %(topic)s --partitions %(partitions)d --replication-factor %(replication)d" % { + cmd = "/opt/%s/bin/kafka-topics.sh " % kafka_dir(node) + cmd += "--zookeeper %(zk_connect)s --create --topic %(topic)s --partitions %(partitions)d --replication-factor %(replication)d" % { 'zk_connect': self.zk.connect_setting(), 'topic': topic_cfg.get("topic"), 'partitions': topic_cfg.get('partitions', 1), @@ -139,20 +187,23 @@ class KafkaService(JmxMixin, Service): for line in self.describe_topic(topic_cfg["topic"]).split("\n"): self.logger.info(line) - def describe_topic(self, topic): - node = self.nodes[0] - cmd = "/opt/kafka/bin/kafka-topics.sh --zookeeper %s --topic %s --describe" % \ - (self.zk.connect_setting(), topic) + def describe_topic(self, topic, node=None): + if node is None: + node = self.nodes[0] + cmd = "/opt/%s/bin/kafka-topics.sh --zookeeper %s --topic %s --describe" % \ + (kafka_dir(node), self.zk.connect_setting(), topic) output = "" for line in node.account.ssh_capture(cmd): output += line return output - def verify_reassign_partitions(self, reassignment): + def verify_reassign_partitions(self, reassignment, node=None): """Run the reassign partitions admin tool in "verify" mode """ - node = self.nodes[0] - json_file = "/tmp/" + str(time.time()) + "_reassign.json" + if node is None: + node = self.nodes[0] + + json_file = "/tmp/%s_reassign.json" % str(time.time()) # reassignment to json json_str = json.dumps(reassignment) @@ -160,12 +211,11 @@ class KafkaService(JmxMixin, Service): # create command cmd = "echo %s > %s && " % (json_str, json_file) - cmd += "/opt/kafka/bin/kafka-reassign-partitions.sh "\ - "--zookeeper %(zk_connect)s "\ - "--reassignment-json-file %(reassignment_file)s "\ - "--verify" % {'zk_connect': self.zk.connect_setting(), - 'reassignment_file': json_file} - cmd += " && sleep 1 && rm -f %s" % json_file + cmd += "/opt/%s/bin/kafka-reassign-partitions.sh " % kafka_dir(node) + cmd += "--zookeeper %s " % self.zk.connect_setting() + cmd += "--reassignment-json-file %s " % json_file + cmd += "--verify " + cmd += "&& sleep 1 && rm -f %s" % json_file # send command self.logger.info("Verifying parition reassignment...") @@ -181,11 +231,12 @@ class KafkaService(JmxMixin, Service): return True - def execute_reassign_partitions(self, reassignment): + def execute_reassign_partitions(self, reassignment, node=None): """Run the reassign partitions admin tool in "verify" mode """ - node = self.nodes[0] - json_file = "/tmp/" + str(time.time()) + "_reassign.json" + if node is None: + node = self.nodes[0] + json_file = "/tmp/%s_reassign.json" % str(time.time()) # reassignment to json json_str = json.dumps(reassignment) @@ -193,11 +244,10 @@ class KafkaService(JmxMixin, Service): # create command cmd = "echo %s > %s && " % (json_str, json_file) - cmd += "/opt/kafka/bin/kafka-reassign-partitions.sh "\ - "--zookeeper %(zk_connect)s "\ - "--reassignment-json-file %(reassignment_file)s "\ - "--execute" % {'zk_connect': self.zk.connect_setting(), - 'reassignment_file': json_file} + cmd += "/opt/%s/bin/kafka-reassign-partitions.sh " % kafka_dir(node) + cmd += "--zookeeper %s " % self.zk.connect_setting() + cmd += "--reassignment-json-file %s " % json_file + cmd += "--execute" cmd += " && sleep 1 && rm -f %s" 
% json_file
 
         # send command
@@ -210,28 +260,29 @@
         self.logger.debug("Verify partition reassignment:")
         self.logger.debug(output)
 
-    def restart_node(self, node, wait_sec=0, clean_shutdown=True):
-        """Restart the given node, waiting wait_sec in between stopping and starting up again."""
+    def restart_node(self, node, clean_shutdown=True):
+        """Restart the given node."""
         self.stop_node(node, clean_shutdown)
-        time.sleep(wait_sec)
         self.start_node(node)
 
     def leader(self, topic, partition=0):
         """ Get the leader replica for the given topic and partition.
         """
-        cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.ZooKeeperMainWrapper -server %s " \
-              % self.zk.connect_setting()
+        kafka_dir = KAFKA_TRUNK
+        cmd = "/opt/%s/bin/kafka-run-class.sh kafka.tools.ZooKeeperMainWrapper -server %s " %\
+              (kafka_dir, self.zk.connect_setting())
         cmd += "get /brokers/topics/%s/partitions/%d/state" % (topic, partition)
         self.logger.debug(cmd)
 
-        node = self.nodes[0]
+        node = self.zk.nodes[0]
         self.logger.debug("Querying zookeeper to find leader replica for topic %s: \n%s" % (cmd, topic))
         partition_state = None
         for line in node.account.ssh_capture(cmd):
-            match = re.match("^({.+})$", line)
-            if match is not None:
-                partition_state = match.groups()[0]
-                break
+            # loop through all lines in the output, but only hold on to the first match
+            if partition_state is None:
+                match = re.match("^({.+})$", line)
+                if match is not None:
+                    partition_state = match.groups()[0]
 
         if partition_state is None:
             raise Exception("Error finding partition state for topic %s and partition %d." % (topic, partition))
@@ -244,10 +295,9 @@
         return self.get_node(leader_idx)
 
     def bootstrap_servers(self):
-        """Get the broker list to connect to Kafka using the specified security protocol
-        """
-        return ','.join([node.account.hostname + ":" + `self.port` for node in self.nodes])
+        """Return comma-delimited list of brokers in this cluster formatted as HOSTNAME1:PORT1,HOSTNAME2:PORT2,...
+        using the port for the configured security protocol.
 
-    def read_jmx_output_all_nodes(self):
-        for node in self.nodes:
-            self.read_jmx_output(self.idx(node), node)
\ No newline at end of file
+        This is the format expected by many config files.
+ """ + return ','.join([node.account.hostname + ":" + str(self.port) for node in self.nodes]) diff --git a/tests/kafkatest/services/templates/kafka.properties b/tests/kafkatest/services/kafka/templates/kafka.properties similarity index 90% rename from tests/kafkatest/services/templates/kafka.properties rename to tests/kafkatest/services/kafka/templates/kafka.properties index a7f66043d..4db1120b0 100644 --- a/tests/kafkatest/services/templates/kafka.properties +++ b/tests/kafkatest/services/kafka/templates/kafka.properties @@ -15,11 +15,8 @@ # see kafka.server.KafkaConfig for additional details and defaults -broker.id={{ broker_id }} -port=9092 -#host.name=localhost advertised.host.name={{ node.account.hostname }} -#advertised.port= + {% if security_protocol == interbroker_security_protocol %} listeners={{ security_protocol }}://:{{ port }} advertised.listeners={{ security_protocol }}://{{ node.account.hostname }}:{{ port }} @@ -27,26 +24,20 @@ advertised.listeners={{ security_protocol }}://{{ node.account.hostname }}:{{ po listeners=PLAINTEXT://:9092,SSL://:9093 advertised.listeners=PLAINTEXT://{{ node.account.hostname }}:9092,SSL://{{ node.account.hostname }}:9093 {% endif %} + num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=65536 socket.request.max.bytes=104857600 -log.dirs=/mnt/kafka-logs num.partitions=1 num.recovery.threads.per.data.dir=1 -#log.flush.interval.messages=10000 -#log.flush.interval.ms=1000 log.retention.hours=168 -#log.retention.bytes=1073741824 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 log.cleaner.enable=false -zookeeper.connect={{ zk.connect_setting() }} -zookeeper.connection.timeout.ms=2000 - {% if quota_config.quota_producer_default is defined and quota_config.quota_producer_default is not none %} quota.producer.default={{ quota_config.quota_producer_default }} {% endif %} @@ -64,6 +55,7 @@ quota.consumer.bytes.per.second.overrides={{ quota_config.quota_consumer_bytes_p {% endif %} security.inter.broker.protocol={{ interbroker_security_protocol }} + ssl.keystore.location=/mnt/ssl/test.keystore.jks ssl.keystore.password=test-ks-passwd ssl.key.password=test-key-passwd @@ -71,4 +63,3 @@ ssl.keystore.type=JKS ssl.truststore.location=/mnt/ssl/test.truststore.jks ssl.truststore.password=test-ts-passwd ssl.truststore.type=JKS - diff --git a/tests/kafkatest/services/kafka/version.py b/tests/kafkatest/services/kafka/version.py new file mode 100644 index 000000000..95f3448f8 --- /dev/null +++ b/tests/kafkatest/services/kafka/version.py @@ -0,0 +1,61 @@ +# Copyright 2015 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from kafkatest.utils import kafkatest_version + +from distutils.version import LooseVersion + + +class KafkaVersion(LooseVersion): + """Container for kafka versions which makes versions simple to compare. + + distutils.version.LooseVersion (and StrictVersion) has robust comparison and ordering logic. 
+ + Example: + + v10 = KafkaVersion("0.10.0") + v9 = KafkaVersion("0.9.0.1") + assert v10 > v9 # assertion passes! + """ + def __init__(self, version_string): + self.is_trunk = (version_string.lower() == "trunk") + if self.is_trunk: + # Since "trunk" may actually be a branch that is not trunk, + # use kafkatest_version() for comparison purposes, + # and track whether we're in "trunk" with a flag + version_string = kafkatest_version() + + # Drop dev suffix if present + dev_suffix_index = version_string.find(".dev") + if dev_suffix_index >= 0: + version_string = version_string[:dev_suffix_index] + + # Don't use the form super.(...).__init__(...) because + # LooseVersion is an "old style" python class + LooseVersion.__init__(self, version_string) + + def __str__(self): + if self.is_trunk: + return "trunk" + else: + return LooseVersion.__str__(self) + + +TRUNK = KafkaVersion("trunk") + +# 0.8.2.X versions +V_0_8_2_1 = KafkaVersion("0.8.2.1") +V_0_8_2_2 = KafkaVersion("0.8.2.2") +LATEST_0_8_2 = V_0_8_2_2 + diff --git a/tests/kafkatest/services/kafka_log4j_appender.py b/tests/kafkatest/services/kafka_log4j_appender.py index ff6bb1879..46ad82ecb 100644 --- a/tests/kafkatest/services/kafka_log4j_appender.py +++ b/tests/kafkatest/services/kafka_log4j_appender.py @@ -15,6 +15,8 @@ from ducktape.services.background_thread import BackgroundThreadService +from kafkatest.services.kafka.directory import kafka_dir + class KafkaLog4jAppender(BackgroundThreadService): @@ -32,14 +34,15 @@ class KafkaLog4jAppender(BackgroundThreadService): self.max_messages = max_messages def _worker(self, idx, node): - cmd = self.start_cmd + cmd = self.start_cmd(node) self.logger.debug("VerifiableKafkaLog4jAppender %d command: %s" % (idx, cmd)) node.account.ssh(cmd) - @property - def start_cmd(self): - cmd = "/opt/kafka/bin/kafka-run-class.sh org.apache.kafka.tools.VerifiableLog4jAppender" \ - " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers()) + def start_cmd(self, node): + cmd = "/opt/%s/bin/" % kafka_dir(node) + cmd += "kafka-run-class.sh org.apache.kafka.tools.VerifiableLog4jAppender" + cmd += " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers()) + if self.max_messages > 0: cmd += " --max-messages %s" % str(self.max_messages) diff --git a/tests/kafkatest/services/mirror_maker.py b/tests/kafkatest/services/mirror_maker.py index afbed133e..a3b49282e 100644 --- a/tests/kafkatest/services/mirror_maker.py +++ b/tests/kafkatest/services/mirror_maker.py @@ -17,6 +17,8 @@ from ducktape.services.service import Service from ducktape.utils.util import wait_until +from kafkatest.services.kafka.directory import kafka_dir + import os import subprocess @@ -63,7 +65,6 @@ class MirrorMaker(Service): LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties") PRODUCER_CONFIG = os.path.join(PERSISTENT_ROOT, "producer.properties") CONSUMER_CONFIG = os.path.join(PERSISTENT_ROOT, "consumer.properties") - KAFKA_HOME = "/opt/kafka/" logs = { "mirror_maker_log": { @@ -101,7 +102,7 @@ class MirrorMaker(Service): def start_cmd(self, node): cmd = "export LOG_DIR=%s;" % MirrorMaker.LOG_DIR cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % MirrorMaker.LOG4J_CONFIG - cmd += " %s/bin/kafka-run-class.sh kafka.tools.MirrorMaker" % MirrorMaker.KAFKA_HOME + cmd += " /opt/%s/bin/kafka-run-class.sh kafka.tools.MirrorMaker" % kafka_dir(node) cmd += " --consumer.config %s" % MirrorMaker.CONSUMER_CONFIG cmd += " --producer.config %s" % MirrorMaker.PRODUCER_CONFIG if 
isinstance(self.num_streams, int): diff --git a/tests/kafkatest/services/monitor/__init__.py b/tests/kafkatest/services/monitor/__init__.py new file mode 100644 index 000000000..ec2014340 --- /dev/null +++ b/tests/kafkatest/services/monitor/__init__.py @@ -0,0 +1,14 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/tests/kafkatest/services/performance/jmx_mixin.py b/tests/kafkatest/services/monitor/jmx.py similarity index 83% rename from tests/kafkatest/services/performance/jmx_mixin.py rename to tests/kafkatest/services/monitor/jmx.py index 7e198398a..06c7dc82d 100644 --- a/tests/kafkatest/services/performance/jmx_mixin.py +++ b/tests/kafkatest/services/monitor/jmx.py @@ -13,8 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -class JmxMixin(object): +from kafkatest.services.kafka.directory import kafka_dir +class JmxMixin(object): + """This mixin helps existing service subclasses start JmxTool on their worker nodes and collect jmx stats. + + Note that this is not a service in its own right. 
+ """ def __init__(self, num_nodes, jmx_object_names=None, jmx_attributes=[]): self.jmx_object_names = jmx_object_names self.jmx_attributes = jmx_attributes @@ -30,12 +35,11 @@ class JmxMixin(object): node.account.ssh("rm -rf /mnt/jmx_tool.log", allow_fail=False) def start_jmx_tool(self, idx, node): - if self.started[idx-1] == True or self.jmx_object_names == None: + if self.started[idx-1] or self.jmx_object_names is None: return - self.started[idx-1] = True - cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.JmxTool " \ - "--reporting-interval 1000 --jmx-url service:jmx:rmi:///jndi/rmi://127.0.0.1:%d/jmxrmi" % self.jmx_port + cmd = "/opt/%s/bin/kafka-run-class.sh kafka.tools.JmxTool " \ + "--reporting-interval 1000 --jmx-url service:jmx:rmi:///jndi/rmi://127.0.0.1:%d/jmxrmi" % (kafka_dir(node), self.jmx_port) for jmx_object_name in self.jmx_object_names: cmd += " --object-name %s" % jmx_object_name for jmx_attribute in self.jmx_attributes: @@ -46,11 +50,12 @@ class JmxMixin(object): jmx_output = node.account.ssh_capture(cmd, allow_fail=False) jmx_output.next() + self.started[idx-1] = True + def read_jmx_output(self, idx, node): if self.started[idx-1] == False: return - self.maximum_jmx_value = {} - self.average_jmx_value = {} + object_attribute_names = [] cmd = "cat /mnt/jmx_tool.log" @@ -64,7 +69,7 @@ class JmxMixin(object): self.jmx_stats[idx-1][time_sec] = {name : stats[i+1] for i, name in enumerate(object_attribute_names)} # do not calculate average and maximum of jmx stats until we have read output from all nodes - if any(len(time_to_stats)==0 for time_to_stats in self.jmx_stats): + if any(len(time_to_stats) == 0 for time_to_stats in self.jmx_stats): return start_time_sec = min([min(time_to_stats.keys()) for time_to_stats in self.jmx_stats]) @@ -72,10 +77,14 @@ class JmxMixin(object): for name in object_attribute_names: aggregates_per_time = [] - for time_sec in xrange(start_time_sec, end_time_sec+1): + for time_sec in xrange(start_time_sec, end_time_sec + 1): # assume that value is 0 if it is not read by jmx tool at the given time. This is appropriate for metrics such as bandwidth values_per_node = [time_to_stats.get(time_sec, {}).get(name, 0) for time_to_stats in self.jmx_stats] # assume that value is aggregated across nodes by sum. This is appropriate for metrics such as bandwidth aggregates_per_time.append(sum(values_per_node)) - self.average_jmx_value[name] = sum(aggregates_per_time)/len(aggregates_per_time) - self.maximum_jmx_value[name] = max(aggregates_per_time) \ No newline at end of file + self.average_jmx_value[name] = sum(aggregates_per_time) / len(aggregates_per_time) + self.maximum_jmx_value[name] = max(aggregates_per_time) + + def read_jmx_output_all_nodes(self): + for node in self.nodes: + self.read_jmx_output(self.idx(node), node) \ No newline at end of file diff --git a/tests/kafkatest/services/performance/consumer_performance.py b/tests/kafkatest/services/performance/consumer_performance.py index 053059b3b..e52220c9d 100644 --- a/tests/kafkatest/services/performance/consumer_performance.py +++ b/tests/kafkatest/services/performance/consumer_performance.py @@ -14,6 +14,7 @@ # limitations under the License. 
from kafkatest.services.performance import PerformanceService +from kafkatest.services.kafka.directory import kafka_dir from kafkatest.utils.security_config import SecurityConfig import os @@ -120,11 +121,10 @@ class ConsumerPerformanceService(PerformanceService): return args - @property - def start_cmd(self): + def start_cmd(self, node): cmd = "export LOG_DIR=%s;" % ConsumerPerformanceService.LOG_DIR cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\";" % ConsumerPerformanceService.LOG4J_CONFIG - cmd += " /opt/kafka/bin/kafka-consumer-perf-test.sh" + cmd += " /opt/%s/bin/kafka-consumer-perf-test.sh" % kafka_dir(node) for key, value in self.args.items(): cmd += " --%s %s" % (key, value) cmd += " --consumer.config %s" % ConsumerPerformanceService.CONFIG_FILE @@ -144,7 +144,7 @@ class ConsumerPerformanceService(PerformanceService): node.account.create_file(ConsumerPerformanceService.CONFIG_FILE, str(self.security_config)) self.security_config.setup_node(node) - cmd = self.start_cmd + cmd = self.start_cmd(node) self.logger.debug("Consumer performance %d command: %s", idx, cmd) last = None for line in node.account.ssh_capture(cmd): diff --git a/tests/kafkatest/services/performance/end_to_end_latency.py b/tests/kafkatest/services/performance/end_to_end_latency.py index 0559a4317..2be1621fa 100644 --- a/tests/kafkatest/services/performance/end_to_end_latency.py +++ b/tests/kafkatest/services/performance/end_to_end_latency.py @@ -16,6 +16,8 @@ from kafkatest.services.performance import PerformanceService from kafkatest.utils.security_config import SecurityConfig +from kafkatest.services.kafka.directory import kafka_dir + class EndToEndLatencyService(PerformanceService): @@ -51,10 +53,8 @@ class EndToEndLatencyService(PerformanceService): 'ssl_config_file': ssl_config_file }) - cmd = "/opt/kafka/bin/kafka-run-class.sh kafka.tools.EndToEndLatency "\ - "%(bootstrap_servers)s %(topic)s %(num_records)d "\ - "%(acks)d 20 %(ssl_config_file)s" % args - + cmd = "/opt/%s/bin/kafka-run-class.sh kafka.tools.EndToEndLatency " % kafka_dir(node) + cmd += "%(bootstrap_servers)s %(topic)s %(num_records)d %(acks)d 20 %(ssl_config_file)s" % args cmd += " | tee /mnt/end-to-end-latency.log" self.logger.debug("End-to-end latency %d command: %s", idx, cmd) diff --git a/tests/kafkatest/services/performance/producer_performance.py b/tests/kafkatest/services/performance/producer_performance.py index 25911af02..401d6f742 100644 --- a/tests/kafkatest/services/performance/producer_performance.py +++ b/tests/kafkatest/services/performance/producer_performance.py @@ -13,10 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. 
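Context for the recurring change from the hard-coded "/opt/kafka" to "/opt/%s" % kafka_dir(node): kafka_dir lives in kafkatest/services/kafka/directory.py, which is not shown in this section, so the helper below is only an illustrative assumption of the idea, namely that the install path now depends on the version pinned to each node. That dependence on the node is also why start_cmd changes from a @property to a method taking the node.
    # Hypothetical sketch; the real kafka_dir is defined elsewhere in this patch.
    KAFKA_TRUNK = "kafka-trunk"

    def kafka_dir(node=None):
        """Directory name under /opt for the Kafka distribution this node should run."""
        if node is None or getattr(node, "version", None) is None:
            return KAFKA_TRUNK
        return "kafka-" + str(node.version)  # e.g. "kafka-0.8.2.2", matching vagrant/base.sh below

    def start_cmd(node):
        # the node-dependent path is the reason this can no longer be a @property
        return "/opt/%s/bin/kafka-consumer-perf-test.sh" % kafka_dir(node)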
-from kafkatest.services.performance.jmx_mixin import JmxMixin +from kafkatest.services.monitor.jmx import JmxMixin from kafkatest.services.performance import PerformanceService import itertools from kafkatest.utils.security_config import SecurityConfig +from kafkatest.services.kafka.directory import kafka_dir class ProducerPerformanceService(JmxMixin, PerformanceService): @@ -45,8 +46,13 @@ class ProducerPerformanceService(JmxMixin, PerformanceService): def _worker(self, idx, node): args = self.args.copy() - args.update({'bootstrap_servers': self.kafka.bootstrap_servers(), 'jmx_port': self.jmx_port, 'client_id': self.client_id}) - cmd = "JMX_PORT=%(jmx_port)d /opt/kafka/bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance " \ + args.update({ + 'bootstrap_servers': self.kafka.bootstrap_servers(), + 'jmx_port': self.jmx_port, + 'client_id': self.client_id, + 'kafka_directory': kafka_dir(node) + }) + cmd = "JMX_PORT=%(jmx_port)d /opt/%(kafka_directory)s/bin/kafka-run-class.sh org.apache.kafka.tools.ProducerPerformance " \ "%(topic)s %(num_records)d %(record_size)d %(throughput)d bootstrap.servers=%(bootstrap_servers)s client.id=%(client_id)s" % args self.security_config.setup_node(node) @@ -73,19 +79,21 @@ class ProducerPerformanceService(JmxMixin, PerformanceService): } last = None producer_output = node.account.ssh_capture(cmd) - first_line = producer_output.next() - self.start_jmx_tool(idx, node) - for line in itertools.chain([first_line], producer_output): - if self.intermediate_stats: - try: - self.stats[idx-1].append(parse_stats(line)) - except: - # Sometimes there are extraneous log messages - pass + first_line = next(producer_output, None) - last = line - try: - self.results[idx-1] = parse_stats(last) - except: - raise Exception("Unable to parse aggregate performance statistics on node %d: %s" % (idx, last)) - self.read_jmx_output(idx, node) + if first_line is not None: + self.start_jmx_tool(idx, node) + for line in itertools.chain([first_line], producer_output): + if self.intermediate_stats: + try: + self.stats[idx-1].append(parse_stats(line)) + except: + # Sometimes there are extraneous log messages + pass + + last = line + try: + self.results[idx-1] = parse_stats(last) + except: + raise Exception("Unable to parse aggregate performance statistics on node %d: %s" % (idx, last)) + self.read_jmx_output(idx, node) diff --git a/tests/kafkatest/services/verifiable_producer.py b/tests/kafkatest/services/verifiable_producer.py index 7ae7988b6..a95a0d606 100644 --- a/tests/kafkatest/services/verifiable_producer.py +++ b/tests/kafkatest/services/verifiable_producer.py @@ -14,28 +14,49 @@ # limitations under the License. 
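The switch above from producer_output.next() to next(producer_output, None) guards against an empty output stream. A minimal self-contained sketch of that pattern follows; the handler and sample data are placeholders.
    import itertools

    def consume_output(lines, handle):
        first_line = next(lines, None)  # None instead of raising StopIteration on empty output
        if first_line is None:
            return  # nothing was produced; skip JMX startup and stats parsing entirely
        for line in itertools.chain([first_line], lines):
            handle(line)

    consume_output(iter([]), handle=lambda line: None)           # no output: no exception
    consume_output(iter(["a", "b"]), handle=lambda line: None)   # both lines still handled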
from ducktape.services.background_thread import BackgroundThreadService + +from kafkatest.services.kafka.directory import kafka_dir, KAFKA_TRUNK +from kafkatest.services.kafka.version import TRUNK, LATEST_0_8_2 from kafkatest.utils.security_config import SecurityConfig import json +import os +import subprocess +import time class VerifiableProducer(BackgroundThreadService): + PERSISTENT_ROOT = "/mnt/verifiable_producer" + STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_producer.stdout") + STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_producer.stderr") + LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs") + LOG_FILE = os.path.join(LOG_DIR, "verifiable_producer.log") + LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties") + CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "verifiable_producer.properties") - CONFIG_FILE = "/mnt/verifiable_producer.properties" logs = { - "producer_log": { - "path": "/mnt/producer.log", - "collect_default": False} - } + "verifiable_producer_stdout": { + "path": STDOUT_CAPTURE, + "collect_default": False}, + "verifiable_producer_stderr": { + "path": STDERR_CAPTURE, + "collect_default": False}, + "verifiable_producer_log": { + "path": LOG_FILE, + "collect_default": True} + } - def __init__(self, context, num_nodes, kafka, topic, security_protocol=None, max_messages=-1, throughput=100000): + def __init__(self, context, num_nodes, kafka, topic, security_protocol=SecurityConfig.PLAINTEXT, max_messages=-1, throughput=100000, version=TRUNK): super(VerifiableProducer, self).__init__(context, num_nodes) + self.log_level = "TRACE" self.kafka = kafka self.topic = topic self.max_messages = max_messages self.throughput = throughput + for node in self.nodes: + node.version = version self.acked_values = [] self.not_acked_values = [] @@ -45,15 +66,24 @@ class VerifiableProducer(BackgroundThreadService): self.prop_file += str(self.security_config) def _worker(self, idx, node): + node.account.ssh("mkdir -p %s" % VerifiableProducer.PERSISTENT_ROOT, allow_fail=False) + + # Create and upload log properties + log_config = self.render('tools_log4j.properties', log_file=VerifiableProducer.LOG_FILE) + node.account.create_file(VerifiableProducer.LOG4J_CONFIG, log_config) + # Create and upload config file self.logger.info("verifiable_producer.properties:") self.logger.info(self.prop_file) node.account.create_file(VerifiableProducer.CONFIG_FILE, self.prop_file) self.security_config.setup_node(node) - cmd = self.start_cmd + cmd = self.start_cmd(node) self.logger.debug("VerifiableProducer %d command: %s" % (idx, cmd)) + + last_produced_time = time.time() + prev_msg = None for line in node.account.ssh_capture(cmd): line = line.strip() @@ -68,9 +98,30 @@ class VerifiableProducer(BackgroundThreadService): elif data["name"] == "producer_send_success": self.acked_values.append(int(data["value"])) - @property - def start_cmd(self): - cmd = "/opt/kafka/bin/kafka-verifiable-producer.sh" \ + # Log information if there is a large gap between successively acknowledged messages + t = time.time() + time_delta_sec = t - last_produced_time + if time_delta_sec > 2 and prev_msg is not None: + self.logger.debug( + "Time delta between successively acked messages is large: " + + "delta_t_sec: %s, prev_message: %s, current_message: %s" % (str(time_delta_sec), str(prev_msg), str(data))) + + last_produced_time = t + prev_msg = data + + def start_cmd(self, node): + + cmd = "" + if node.version <= LATEST_0_8_2: + # 0.8.2.X releases do not have VerifiableProducer.java, so cheat and add 
+ # the tools jar from trunk to the classpath + cmd += "for file in /opt/%s/tools/build/libs/kafka-tools*.jar; do CLASSPATH=$CLASSPATH:$file; done; " % KAFKA_TRUNK + cmd += "for file in /opt/%s/tools/build/dependant-libs-${SCALA_VERSION}*/*.jar; do CLASSPATH=$CLASSPATH:$file; done; " % KAFKA_TRUNK + cmd += "export CLASSPATH; " + + cmd += "export LOG_DIR=%s;" % VerifiableProducer.LOG_DIR + cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % VerifiableProducer.LOG4J_CONFIG + cmd += "/opt/" + kafka_dir(node) + "/bin/kafka-run-class.sh org.apache.kafka.tools.VerifiableProducer" \ " --topic %s --broker-list %s" % (self.topic, self.kafka.bootstrap_servers()) if self.max_messages > 0: cmd += " --max-messages %s" % str(self.max_messages) @@ -78,9 +129,20 @@ class VerifiableProducer(BackgroundThreadService): cmd += " --throughput %s" % str(self.throughput) cmd += " --producer.config %s" % VerifiableProducer.CONFIG_FILE - cmd += " 2>> /mnt/producer.log | tee -a /mnt/producer.log &" + cmd += " 2>> %s | tee -a %s &" % (VerifiableProducer.STDOUT_CAPTURE, VerifiableProducer.STDOUT_CAPTURE) return cmd + def pids(self, node): + try: + cmd = "ps ax | grep -i VerifiableProducer | grep java | grep -v grep | awk '{print $1}'" + pid_arr = [pid for pid in node.account.ssh_capture(cmd, allow_fail=True, callback=int)] + return pid_arr + except (subprocess.CalledProcessError, ValueError) as e: + return [] + + def alive(self, node): + return len(self.pids(node)) > 0 + @property def acked(self): with self.lock: @@ -113,7 +175,7 @@ class VerifiableProducer(BackgroundThreadService): def clean_node(self, node): node.account.kill_process("VerifiableProducer", clean_shutdown=False, allow_fail=False) - node.account.ssh("rm -rf /mnt/producer.log /mnt/verifiable_producer.properties", allow_fail=False) + node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False) self.security_config.clean_node(node) def try_parse_json(self, string): diff --git a/tests/kafkatest/services/zookeeper.py b/tests/kafkatest/services/zookeeper.py index 09bec35d4..9a9047c05 100644 --- a/tests/kafkatest/services/zookeeper.py +++ b/tests/kafkatest/services/zookeeper.py @@ -16,6 +16,8 @@ from ducktape.services.service import Service +from kafkatest.services.kafka.directory import kafka_dir + import subprocess import time @@ -46,9 +48,9 @@ class ZookeeperService(Service): self.logger.info(config_file) node.account.create_file("/mnt/zookeeper.properties", config_file) - node.account.ssh( - "/opt/kafka/bin/zookeeper-server-start.sh /mnt/zookeeper.properties 1>> %(path)s 2>> %(path)s &" - % self.logs["zk_log"]) + start_cmd = "/opt/%s/bin/zookeeper-server-start.sh " % kafka_dir(node) + start_cmd += "/mnt/zookeeper.properties 1>> %(path)s 2>> %(path)s &" % self.logs["zk_log"] + node.account.ssh(start_cmd) time.sleep(5) # give it some time to start diff --git a/tests/kafkatest/tests/produce_consume_validate.py b/tests/kafkatest/tests/produce_consume_validate.py new file mode 100644 index 000000000..aa2fe5380 --- /dev/null +++ b/tests/kafkatest/tests/produce_consume_validate.py @@ -0,0 +1,106 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ducktape.tests.test import Test +from ducktape.utils.util import wait_until + + +class ProduceConsumeValidateTest(Test): + """This class provides a shared template for tests which follow the common pattern of: + + - produce to a topic in the background + - consume from that topic in the background + - run some logic, e.g. fail topic leader etc. + - perform validation + """ + + def __init__(self, test_context): + super(ProduceConsumeValidateTest, self).__init__(test_context=test_context) + + def setup_producer_and_consumer(self): + raise NotImplementedError("Subclasses should implement this") + + def start_producer_and_consumer(self): + # Start background producer and consumer + self.producer.start() + wait_until(lambda: self.producer.num_acked > 5, timeout_sec=10, + err_msg="Producer failed to start in a reasonable amount of time.") + self.consumer.start() + wait_until(lambda: len(self.consumer.messages_consumed[1]) > 0, timeout_sec=10, + err_msg="Consumer failed to start in a reasonable amount of time.") + + def stop_producer_and_consumer(self): + for node in self.consumer.nodes: + if not self.consumer.alive(node): + self.logger.warn("Consumer on %s is not alive and probably should be." % str(node.account)) + for node in self.producer.nodes: + if not self.producer.alive(node): + self.logger.warn("Producer on %s is not alive and probably should be." % str(node.account)) + + # Check that producer is still successfully producing + currently_acked = self.producer.num_acked + wait_until(lambda: self.producer.num_acked > currently_acked + 5, timeout_sec=10, + err_msg="Expected producer to still be producing.") + + self.producer.stop() + self.consumer.wait() + + def run_produce_consume_validate(self, core_test_action): + """Top-level template for simple produce/consume/validate tests.""" + + self.start_producer_and_consumer() + core_test_action() + self.stop_producer_and_consumer() + self.validate() + + def validate(self): + """Check that each acked message was consumed.""" + + self.acked = self.producer.acked + self.not_acked = self.producer.not_acked + + # Check produced vs consumed + self.consumed = self.consumer.messages_consumed[1] + self.logger.info("num consumed: %d" % len(self.consumed)) + + success = True + msg = "" + + if len(set(self.consumed)) != len(self.consumed): + # There are duplicates. This is ok, so report it but don't fail the test + msg += "There are duplicate messages in the log\n" + + if not set(self.consumed).issuperset(set(self.acked)): + # Every acked message must appear in the logs. I.e. consumed messages must be superset of acked messages. + acked_minus_consumed = set(self.producer.acked) - set(self.consumed) + success = False + + msg += "At least one acked message did not appear in the consumed messages. 
acked_minus_consumed: " + if len(acked_minus_consumed) < 20: + msg += str(acked_minus_consumed) + else: + for i in range(20): + msg += str(acked_minus_consumed.pop()) + ", " + msg += "...plus " + str(len(acked_minus_consumed) - 20) + " more" + + if not success: + # Collect all the data logs if there was a failure + self.mark_for_collect(self.kafka) + + if not success: + self.mark_for_collect(self.producer) + + assert success, msg + diff --git a/tests/kafkatest/tests/quota_test.py b/tests/kafkatest/tests/quota_test.py index 4ae2e0830..6ba6aa7a6 100644 --- a/tests/kafkatest/tests/quota_test.py +++ b/tests/kafkatest/tests/quota_test.py @@ -14,18 +14,13 @@ # limitations under the License. from ducktape.tests.test import Test -from ducktape.utils.util import wait_until from ducktape.mark import parametrize from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService -from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.performance import ProducerPerformanceService -from kafkatest.services.console_consumer import ConsoleConsumer, is_int +from kafkatest.services.console_consumer import ConsoleConsumer -import random -import signal -import time class QuotaTest(Test): """ @@ -73,7 +68,10 @@ class QuotaTest(Test): """Override this since we're adding services outside of the constructor""" return super(QuotaTest, self).min_cluster_size() + self.num_producers + self.num_consumers - def run_clients(self, producer_id, producer_num, consumer_id, consumer_num): + @parametrize(producer_id='default_id', producer_num=1, consumer_id='default_id', consumer_num=1) + @parametrize(producer_id='overridden_id', producer_num=1, consumer_id='overridden_id', consumer_num=1) + @parametrize(producer_id='overridden_id', producer_num=1, consumer_id='overridden_id', consumer_num=2) + def test_quota(self, producer_id='default_id', producer_num=1, consumer_id='default_id', consumer_num=1): # Produce all messages producer = ProducerPerformanceService( self.test_context, producer_num, self.kafka, security_protocol=self.security_protocol, @@ -91,7 +89,7 @@ class QuotaTest(Test): consumer.run() for idx, messages in consumer.messages_consumed.iteritems(): - assert len(messages)>0, "consumer %d didn't consume any message before timeout" % idx + assert len(messages) > 0, "consumer %d didn't consume any message before timeout" % idx success, msg = self.validate(self.kafka, producer, consumer) assert success, msg @@ -172,9 +170,3 @@ class QuotaTest(Test): if client_id in overridden_quotas: return float(overridden_quotas[client_id]) return self.quota_config['quota_consumer_default'] - - @parametrize(producer_id='default_id', producer_num=1, consumer_id='default_id', consumer_num=1) - @parametrize(producer_id='overridden_id', producer_num=1, consumer_id='overridden_id', consumer_num=1) - @parametrize(producer_id='overridden_id', producer_num=1, consumer_id='overridden_id', consumer_num=2) - def test_quota(self, producer_id='default_id', producer_num=1, consumer_id='default_id', consumer_num=1): - self.run_clients(producer_id, producer_num, consumer_id, consumer_num) diff --git a/tests/kafkatest/tests/replication_test.py b/tests/kafkatest/tests/replication_test.py index d20cc220b..16aa944c9 100644 --- a/tests/kafkatest/tests/replication_test.py +++ b/tests/kafkatest/tests/replication_test.py @@ -13,24 +13,76 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from ducktape.tests.test import Test from ducktape.utils.util import wait_until -from ducktape.mark import parametrize + from ducktape.mark import matrix from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.kafka import KafkaService from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.console_consumer import ConsoleConsumer, is_int +from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest import signal -import time -class ReplicationTest(Test): - """Replication tests. - These tests verify that replication provides simple durability guarantees by checking that data acked by - brokers is still available for consumption in the face of various failure scenarios.""" +def clean_shutdown(test): + """Discover leader node for our topic and shut it down cleanly.""" + test.kafka.signal_leader(test.topic, partition=0, sig=signal.SIGTERM) + + +def hard_shutdown(test): + """Discover leader node for our topic and shut it down with a hard kill.""" + test.kafka.signal_leader(test.topic, partition=0, sig=signal.SIGKILL) + + +def clean_bounce(test): + """Chase the leader of one partition and restart it cleanly.""" + for i in range(5): + prev_leader_node = test.kafka.leader(topic=test.topic, partition=0) + test.kafka.restart_node(prev_leader_node, clean_shutdown=True) + + +def hard_bounce(test): + """Chase the leader and restart it with a hard kill.""" + for i in range(5): + prev_leader_node = test.kafka.leader(topic=test.topic, partition=0) + test.kafka.signal_node(prev_leader_node, sig=signal.SIGKILL) + + # Since this is a hard kill, we need to make sure the process is down and that + # zookeeper and the broker cluster have registered the loss of the leader. + # Waiting for a new leader to be elected on the topic-partition is a reasonable heuristic for this. + + def leader_changed(): + current_leader = test.kafka.leader(topic=test.topic, partition=0) + return current_leader is not None and current_leader != prev_leader_node + + wait_until(lambda: len(test.kafka.pids(prev_leader_node)) == 0, timeout_sec=5) + wait_until(leader_changed, timeout_sec=10, backoff_sec=.5) + test.kafka.start_node(prev_leader_node) + +failures = { + "clean_shutdown": clean_shutdown, + "hard_shutdown": hard_shutdown, + "clean_bounce": clean_bounce, + "hard_bounce": hard_bounce +} + + +class ReplicationTest(ProduceConsumeValidateTest): + """ + Note that consuming is a bit tricky, at least with console consumer. The goal is to consume all messages + (foreach partition) in the topic. In this case, waiting for the last message may cause the consumer to stop + too soon since console consumer is consuming multiple partitions from a single thread and therefore we lose + ordering guarantees. + + Waiting on a count of consumed messages can be unreliable: if we stop consuming when num_consumed == num_acked, + we might exit early if some messages are duplicated (though not an issue here since producer retries==0) + + Therefore rely here on the consumer.timeout.ms setting which times out on the interval between successively + consumed messages. Since we run the producer to completion before running the consumer, this is a reliable + indicator that nothing is left to consume. 
+ """ def __init__(self, test_context): """:type test_context: ducktape.tests.test.TestContext""" @@ -38,6 +90,11 @@ class ReplicationTest(Test): self.topic = "test_topic" self.zk = ZookeeperService(test_context, num_nodes=1) + self.kafka = KafkaService(test_context, num_nodes=3, zk=self.zk, topics={self.topic: { + "partitions": 3, + "replication-factor": 3, + "min.insync.replicas": 2} + }) self.producer_throughput = 10000 self.num_producers = 1 self.num_consumers = 1 @@ -49,125 +106,27 @@ class ReplicationTest(Test): """Override this since we're adding services outside of the constructor""" return super(ReplicationTest, self).min_cluster_size() + self.num_producers + self.num_consumers - def run_with_failure(self, failure, interbroker_security_protocol): - """This is the top-level test template. - The steps are: - Produce messages in the background while driving some failure condition - When done driving failures, immediately stop producing - Consume all messages - Validate that messages acked by brokers were consumed + @matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"], + interbroker_security_protocol=["PLAINTEXT", "SSL"]) + def test_replication_with_broker_failure(self, failure_mode, interbroker_security_protocol="PLAINTEXT"): + """Replication tests. + These tests verify that replication provides simple durability guarantees by checking that data acked by + brokers is still available for consumption in the face of various failure scenarios. - Note that consuming is a bit tricky, at least with console consumer. The goal is to consume all messages - (foreach partition) in the topic. In this case, waiting for the last message may cause the consumer to stop - too soon since console consumer is consuming multiple partitions from a single thread and therefore we lose - ordering guarantees. - - Waiting on a count of consumed messages can be unreliable: if we stop consuming when num_consumed == num_acked, - we might exit early if some messages are duplicated (though not an issue here since producer retries==0) - - Therefore rely here on the consumer.timeout.ms setting which times out on the interval between successively - consumed messages. Since we run the producer to completion before running the consumer, this is a reliable - indicator that nothing is left to consume. 
+ Setup: 1 zk, 3 kafka nodes, 1 topic with partitions=3, replication-factor=3, and min.insync.replicas=2 + - Produce messages in the background + - Consume messages in the background + - Drive broker failures (shutdown, or bounce repeatedly with kill -15 or kill -9) + - When done driving failures, stop producing, and finish consuming + - Validate that every acked message was consumed """ - security_protocol='PLAINTEXT' - self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, - security_protocol=security_protocol, - interbroker_security_protocol=interbroker_security_protocol, - topics={self.topic: { - "partitions": 3, - "replication-factor": 3, - "min.insync.replicas": 2} - }) + client_security_protocol = 'PLAINTEXT' + self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, security_protocol=client_security_protocol, throughput=self.producer_throughput) + self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, security_protocol=client_security_protocol, consumer_timeout_ms=60000, message_validator=is_int) + + self.kafka.interbroker_security_protocol = interbroker_security_protocol self.kafka.start() - self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, security_protocol=security_protocol, throughput=self.producer_throughput) - self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, security_protocol=security_protocol, new_consumer=False, consumer_timeout_ms=3000, message_validator=is_int) - - # Produce in a background thread while driving broker failures - self.producer.start() - wait_until(lambda: self.producer.num_acked > 5, timeout_sec=5, - err_msg="Producer failed to start in a reasonable amount of time.") - failure() - self.producer.stop() - - self.acked = self.producer.acked - self.not_acked = self.producer.not_acked - self.logger.info("num not acked: %d" % self.producer.num_not_acked) - self.logger.info("num acked: %d" % self.producer.num_acked) - - # Consume all messages - self.consumer.start() - self.consumer.wait() - self.consumed = self.consumer.messages_consumed[1] - self.logger.info("num consumed: %d" % len(self.consumed)) - - # Check produced vs consumed - success, msg = self.validate() - - if not success: - self.mark_for_collect(self.producer) - - assert success, msg - - def clean_shutdown(self): - """Discover leader node for our topic and shut it down cleanly.""" - self.kafka.signal_leader(self.topic, partition=0, sig=signal.SIGTERM) - - def hard_shutdown(self): - """Discover leader node for our topic and shut it down with a hard kill.""" - self.kafka.signal_leader(self.topic, partition=0, sig=signal.SIGKILL) - - def clean_bounce(self): - """Chase the leader of one partition and restart it cleanly.""" - for i in range(5): - prev_leader_node = self.kafka.leader(topic=self.topic, partition=0) - self.kafka.restart_node(prev_leader_node, wait_sec=5, clean_shutdown=True) - - def hard_bounce(self): - """Chase the leader and restart it cleanly.""" - for i in range(5): - prev_leader_node = self.kafka.leader(topic=self.topic, partition=0) - self.kafka.restart_node(prev_leader_node, wait_sec=5, clean_shutdown=False) - - # Wait long enough for previous leader to probably be awake again - time.sleep(6) - - def validate(self): - """Check that produced messages were consumed.""" - - success = True - msg = "" - - if len(set(self.consumed)) != len(self.consumed): - # There are duplicates. 
This is ok, so report it but don't fail the test - msg += "There are duplicate messages in the log\n" - - if not set(self.consumed).issuperset(set(self.acked)): - # Every acked message must appear in the logs. I.e. consumed messages must be superset of acked messages. - acked_minus_consumed = set(self.producer.acked) - set(self.consumed) - success = False - msg += "At least one acked message did not appear in the consumed messages. acked_minus_consumed: " + str(acked_minus_consumed) - - if not success: - # Collect all the data logs if there was a failure - self.mark_for_collect(self.kafka) - - return success, msg - - - @matrix(interbroker_security_protocol=['PLAINTEXT', 'SSL']) - def test_clean_shutdown(self, interbroker_security_protocol): - self.run_with_failure(self.clean_shutdown, interbroker_security_protocol) - - @matrix(interbroker_security_protocol=['PLAINTEXT', 'SSL']) - def test_hard_shutdown(self, interbroker_security_protocol): - self.run_with_failure(self.hard_shutdown, interbroker_security_protocol) - - @matrix(interbroker_security_protocol=['PLAINTEXT', 'SSL']) - def test_clean_bounce(self, interbroker_security_protocol): - self.run_with_failure(self.clean_bounce, interbroker_security_protocol) - - @matrix(interbroker_security_protocol=['PLAINTEXT', 'SSL']) - def test_hard_bounce(self, interbroker_security_protocol): - self.run_with_failure(self.hard_bounce, interbroker_security_protocol) + + self.run_produce_consume_validate(core_test_action=lambda: failures[failure_mode](self)) diff --git a/tests/kafkatest/tests/upgrade_test.py b/tests/kafkatest/tests/upgrade_test.py new file mode 100644 index 000000000..97605cd76 --- /dev/null +++ b/tests/kafkatest/tests/upgrade_test.py @@ -0,0 +1,81 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from kafkatest.services.zookeeper import ZookeeperService +from kafkatest.services.kafka import KafkaService +from kafkatest.services.kafka.version import LATEST_0_8_2, TRUNK +from kafkatest.services.verifiable_producer import VerifiableProducer +from kafkatest.services.console_consumer import ConsoleConsumer, is_int +from kafkatest.services.kafka import config_property +from kafkatest.tests.produce_consume_validate import ProduceConsumeValidateTest + + +class TestUpgrade(ProduceConsumeValidateTest): + + def __init__(self, test_context): + super(TestUpgrade, self).__init__(test_context=test_context) + + def setUp(self): + self.topic = "test_topic" + self.zk = ZookeeperService(self.test_context, num_nodes=1) + self.kafka = KafkaService(self.test_context, num_nodes=3, zk=self.zk, version=LATEST_0_8_2, topics={self.topic: { + "partitions": 3, + "replication-factor": 3, + "min.insync.replicas": 2}}) + self.zk.start() + self.kafka.start() + + # Producer and consumer + self.producer_throughput = 10000 + self.num_producers = 1 + self.num_consumers = 1 + self.producer = VerifiableProducer( + self.test_context, self.num_producers, self.kafka, self.topic, + throughput=self.producer_throughput, version=LATEST_0_8_2) + + # TODO - reduce the timeout + self.consumer = ConsoleConsumer( + self.test_context, self.num_consumers, self.kafka, self.topic, + consumer_timeout_ms=30000, message_validator=is_int, version=LATEST_0_8_2) + + def perform_upgrade(self): + self.logger.info("First pass bounce - rolling upgrade") + for node in self.kafka.nodes: + self.kafka.stop_node(node) + node.version = TRUNK + node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] = "0.8.2.X" + self.kafka.start_node(node) + + self.logger.info("Second pass bounce - remove inter.broker.protocol.version config") + for node in self.kafka.nodes: + self.kafka.stop_node(node) + del node.config[config_property.INTER_BROKER_PROTOCOL_VERSION] + self.kafka.start_node(node) + + def test_upgrade(self): + """Test upgrade of Kafka broker cluster from 0.8.2 to 0.9.0 + + - Start 3 node broker cluster on version 0.8.2 + - Start producer and consumer in the background + - Perform two-phase rolling upgrade + - First phase: upgrade brokers to 0.9.0 with inter.broker.protocol.version set to 0.8.2.X + - Second phase: remove inter.broker.protocol.version config with rolling bounce + - Finally, validate that every message acked by the producer was consumed by the consumer + """ + + self.run_produce_consume_validate(core_test_action=self.perform_upgrade) + + diff --git a/tests/kafkatest/utils/__init__.py b/tests/kafkatest/utils/__init__.py index cff6d2b21..46c71f0f1 100644 --- a/tests/kafkatest/utils/__init__.py +++ b/tests/kafkatest/utils/__init__.py @@ -12,4 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -# see kafka.server.KafkaConfig for additional details and defaults \ No newline at end of file +# see kafka.server.KafkaConfig for additional details and defaults + +from util import kafkatest_version, is_version \ No newline at end of file diff --git a/tests/kafkatest/utils/util.py b/tests/kafkatest/utils/util.py new file mode 100644 index 000000000..2b1e49c91 --- /dev/null +++ b/tests/kafkatest/utils/util.py @@ -0,0 +1,42 @@ +# Copyright 2015 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from kafkatest import __version__ as __kafkatest_version__ + +import re + + +def kafkatest_version(): + """Return string representation of current kafkatest version.""" + return __kafkatest_version__ + + +def _kafka_jar_versions(proc_string): + """Use a rough heuristic to find all kafka versions explicitly in the process classpath""" + versions = re.findall("kafka-[a-z]+-([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)", proc_string) + versions.extend(re.findall("kafka-([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)", proc_string)) + + return set(versions) + + +def is_version(node, version_list, proc_grep_string="kafka"): + """Heuristic to check that only the specified version appears in the classpath of the process + A useful tool to aid in checking that service version apis are working correctly. + """ + lines = [l for l in node.account.ssh_capture("ps ax | grep %s | grep -v grep" % proc_grep_string)] + assert len(lines) == 1 + + versions = _kafka_jar_versions(lines[0]) + return versions == {str(v) for v in version_list} + diff --git a/tests/setup.py b/tests/setup.py index d637eb8dd..f555fd314 100644 --- a/tests/setup.py +++ b/tests/setup.py @@ -14,15 +14,21 @@ # limitations under the License. # see kafka.server.KafkaConfig for additional details and defaults +import re from setuptools import find_packages, setup +version = '' +with open('kafkatest/__init__.py', 'r') as fd: + version = re.search(r'^__version__\s*=\s*[\'"]([^\'"]*)[\'"]', + fd.read(), re.MULTILINE).group(1) + setup(name="kafkatest", - version="0.9.0.dev0", + version=version, description="Apache Kafka System Tests", author="Apache Kafka", platforms=["any"], license="apache2.0", packages=find_packages(), include_package_data=True, - install_requires=["ducktape==0.3.2"] + install_requires=["ducktape==0.3.8"] ) diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java index dd695cfc6..0cd90c0bb 100644 --- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java +++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java @@ -23,12 +23,14 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.utils.Utils; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.IOException; +import java.io.InputStream; import java.util.HashMap; import java.util.Map; import java.util.Properties; @@ -138,7 +140,29 @@ public class VerifiableProducer { return parser; } - + + /** + * Read a properties file from the given path + * @param filename The path of the file to read + * + * Note: this duplication of org.apache.kafka.common.utils.Utils.loadProps is unfortunate + * but *intentional*.
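Because is_version above depends on a remote "ps" capture, the jar-version heuristic itself is the part that can be sanity-checked locally. In the sketch below the process line is made up; the regexes are the ones added in util.py.
    import re

    def _kafka_jar_versions(proc_string):
        versions = re.findall("kafka-[a-z]+-([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)", proc_string)
        versions.extend(re.findall("kafka-([0-9]+\.[0-9]+\.[0-9]+\.[0-9]+)", proc_string))
        return set(versions)

    fake_ps_line = "java -cp /opt/kafka-0.8.2.2/libs/kafka-clients-0.8.2.2.jar kafka.Kafka server.properties"
    assert _kafka_jar_versions(fake_ps_line) == {"0.8.2.2"}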
In order to use VerifiableProducer in compatibility and upgrade tests, + * we use VerifiableProducer from trunk tools package, and run it against 0.8.X.X kafka jars. + * Since this method is not in Utils in the 0.8.X.X jars, we have to cheat a bit and duplicate. + */ + public static Properties loadProps(String filename) throws IOException, FileNotFoundException { + Properties props = new Properties(); + InputStream propStream = null; + try { + propStream = new FileInputStream(filename); + props.load(propStream); + } finally { + if (propStream != null) + propStream.close(); + } + return props; + } + /** Construct a VerifiableProducer object from command-line arguments. */ public static VerifiableProducer createFromArgs(String[] args) { ArgumentParser parser = argParser(); @@ -164,7 +188,7 @@ public class VerifiableProducer { producerProps.put("retries", "0"); if (configFile != null) { try { - producerProps.putAll(Utils.loadProps(configFile)); + producerProps.putAll(loadProps(configFile)); } catch (IOException e) { throw new ArgumentParserException(e.getMessage(), parser); } diff --git a/vagrant/base.sh b/vagrant/base.sh index 133f10a95..2c2e5c2ef 100644 --- a/vagrant/base.sh +++ b/vagrant/base.sh @@ -38,9 +38,31 @@ if [ -z `which javac` ]; then fi chmod a+rw /opt -if [ ! -e /opt/kafka ]; then - ln -s /vagrant /opt/kafka +if [ -h /opt/kafka-trunk ]; then + # reset symlink + rm /opt/kafka-trunk fi +ln -s /vagrant /opt/kafka-trunk + +get_kafka() { + version=$1 + + kafka_dir=/opt/kafka-$version + url=https://s3-us-west-2.amazonaws.com/kafka-packages-$version/kafka_2.10-$version.tgz + if [ ! -d /opt/kafka-$version ]; then + pushd /tmp + curl -O $url + file_tgz=`basename $url` + tar -xzf $file_tgz + rm -rf $file_tgz + + file=`basename $file_tgz .tgz` + mv $file $kafka_dir + popd + fi +} + +get_kafka 0.8.2.2 # For EC2 nodes, we want to use /mnt, which should have the local disk. On local # VMs, we can just create it if it doesn't exist and use it like we'd use