From 7006d0f58b9a72f181b13bab6f1f64b0e7b4117e Mon Sep 17 00:00:00 2001 From: John Roesler Date: Thu, 15 Mar 2018 14:42:43 -0700 Subject: [PATCH] MINOR: Streams system tests fixes/updates (#4689) Some changes required to get the Streams system tests working via Docker To test: TC_PATHS="tests/kafkatest/tests/streams" bash tests/docker/run_tests.sh That command will take about 3.5 hours, and should pass. Note there are a couple of ignored tests. Reviewers: Guozhang Wang , Bill Bejeck --- .gitignore | 4 + tests/docker/Dockerfile | 23 +++- tests/kafkatest/services/streams.py | 5 +- .../tests/streams/streams_upgrade_test.py | 124 +++++++++--------- tests/kafkatest/version.py | 9 +- tests/setup.py | 2 +- vagrant/base.sh | 8 +- 7 files changed, 101 insertions(+), 74 deletions(-) diff --git a/.gitignore b/.gitignore index 60883492f..ba594ff03 100644 --- a/.gitignore +++ b/.gitignore @@ -47,3 +47,7 @@ tests/venv docs/generated/ .release-settings.json + +kafkatest.egg-info/ +systest/ + diff --git a/tests/docker/Dockerfile b/tests/docker/Dockerfile index 149e3911b..57ca2423e 100644 --- a/tests/docker/Dockerfile +++ b/tests/docker/Dockerfile @@ -40,12 +40,23 @@ RUN ssh-keygen -q -t rsa -N '' -f /root/.ssh/id_rsa && cp -f /root/.ssh/id_rsa.p # Install binary test dependencies. ARG KAFKA_MIRROR="https://s3-us-west-2.amazonaws.com/kafka-packages" -RUN mkdir -p "/opt/kafka-0.8.2.2" && curl -s "$KAFKA_MIRROR/kafka_2.10-0.8.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.8.2.2" -RUN mkdir -p "/opt/kafka-0.9.0.1" && curl -s "$KAFKA_MIRROR/kafka_2.11-0.9.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.9.0.1" -RUN mkdir -p "/opt/kafka-0.10.0.1" && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.0.1" -RUN mkdir -p "/opt/kafka-0.10.1.1" && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.1.1" -RUN mkdir -p "/opt/kafka-0.10.2.1" && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.2.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.2.1" -RUN mkdir -p "/opt/kafka-0.11.0.0" && curl -s "$KAFKA_MIRROR/kafka_2.11-0.11.0.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.0" +RUN mkdir -p "/opt/kafka-0.8.2.2" && chmod a+rw /opt/kafka-0.8.2.2 && curl -s "$KAFKA_MIRROR/kafka_2.10-0.8.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.8.2.2" +RUN mkdir -p "/opt/kafka-0.9.0.1" && chmod a+rw /opt/kafka-0.9.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.9.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.9.0.1" +RUN mkdir -p "/opt/kafka-0.10.0.1" && chmod a+rw /opt/kafka-0.10.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.0.1" +RUN mkdir -p "/opt/kafka-0.10.1.1" && chmod a+rw /opt/kafka-0.10.1.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.1.1" +RUN mkdir -p "/opt/kafka-0.10.2.1" && chmod a+rw /opt/kafka-0.10.2.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.2.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.2.1" +RUN mkdir -p "/opt/kafka-0.11.0.0" && chmod a+rw /opt/kafka-0.11.0.0 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.11.0.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.0" +RUN mkdir -p "/opt/kafka-0.11.0.2" && chmod a+rw /opt/kafka-0.11.0.2 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.11.0.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.2" +RUN mkdir -p "/opt/kafka-1.0.0" && chmod a+rw /opt/kafka-1.0.0 && curl -s "$KAFKA_MIRROR/kafka_2.11-1.0.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-1.0.0" +RUN mkdir -p "/opt/kafka-1.0.1" && chmod a+rw /opt/kafka-1.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-1.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-1.0.1" + +# Streams test dependencies +RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.1.1-test.jar" -o /opt/kafka-0.10.1.1/libs/kafka-streams-0.10.1.1-test.jar && \ + curl -s "$KAFKA_MIRROR/kafka-streams-0.10.2.1-test.jar" -o /opt/kafka-0.10.2.1/libs/kafka-streams-0.10.2.1-test.jar && \ + curl -s "$KAFKA_MIRROR/kafka-streams-0.11.0.0-test.jar" -o /opt/kafka-0.11.0.0/libs/kafka-streams-0.11.0.0-test.jar && \ + curl -s "$KAFKA_MIRROR/kafka-streams-0.11.0.2-test.jar" -o /opt/kafka-0.11.0.2/libs/kafka-streams-0.11.0.2-test.jar && \ + curl -s "$KAFKA_MIRROR/kafka-streams-1.0.0-test.jar" -o /opt/kafka-1.0.0/libs/kafka-streams-1.0.0-test.jar && \ + curl -s "$KAFKA_MIRROR/kafka-streams-1.0.1-test.jar" -o /opt/kafka-1.0.1/libs/kafka-streams-1.0.1-test.jar # The version of Kibosh to use for testing. # If you update this, also update vagrant/base.sy diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py index e552a39ea..e886c94d5 100644 --- a/tests/kafkatest/services/streams.py +++ b/tests/kafkatest/services/streams.py @@ -123,6 +123,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service): "INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \ " %(kafka)s %(state_dir)s %(user_test_args)s %(user_test_args1)s %(user_test_args2)s" \ " %(user_test_args3)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args + self.logger.info("Executing: " + cmd) return cmd @@ -163,7 +164,7 @@ class StreamsEosTestBaseService(StreamsTestBaseService): def clean_node(self, node): if self.clean_node_enabled: - super.clean_node(self, node) + super(StreamsEosTestBaseService, self).clean_node(node) class StreamsSmokeTestDriverService(StreamsSmokeTestBaseService): @@ -233,7 +234,7 @@ class StreamsBrokerDownResilienceService(StreamsTestBaseService): "INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \ " %(kafka)s %(state_dir)s %(user_test_args)s %(user_test_args1)s %(user_test_args2)s" \ " %(user_test_args3)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args - + self.logger.info("Executing: " + cmd) return cmd diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py index 81b7ffe70..6ac5939fa 100644 --- a/tests/kafkatest/tests/streams/streams_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py @@ -15,13 +15,14 @@ from ducktape.mark.resource import cluster from ducktape.tests.test import Test -from ducktape.mark import parametrize, ignore +from ducktape.mark import matrix from kafkatest.services.kafka import KafkaService from kafkatest.services.zookeeper import ZookeeperService from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService -from kafkatest.version import LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, DEV_BRANCH, KafkaVersion +from kafkatest.version import LATEST_0_10_2, LATEST_0_11, LATEST_1_0, DEV_BRANCH, KafkaVersion import time +upgrade_versions = [str(LATEST_0_10_2), str(LATEST_0_11), str(LATEST_1_0), str(DEV_BRANCH)] class StreamsUpgradeTest(Test): """ @@ -55,7 +56,7 @@ class StreamsUpgradeTest(Test): 'tagg' : { 'partitions': self.partitions, 'replication-factor': self.replication, 'configs': {"min.insync.replicas": self.isr} } } - + def perform_streams_upgrade(self, to_version): self.logger.info("First pass bounce - rolling streams upgrade") @@ -76,14 +77,8 @@ class StreamsUpgradeTest(Test): node.version = KafkaVersion(to_version) self.kafka.start_node(node) - @cluster(num_nodes=6) - @parametrize(from_version=str(LATEST_0_10_1), to_version=str(DEV_BRANCH)) - @parametrize(from_version=str(LATEST_0_10_2), to_version=str(DEV_BRANCH)) - @parametrize(from_version=str(LATEST_0_10_1), to_version=str(LATEST_0_11_0)) - @parametrize(from_version=str(LATEST_0_10_2), to_version=str(LATEST_0_11_0)) - @parametrize(from_version=str(LATEST_0_11_0), to_version=str(LATEST_0_10_2)) - @parametrize(from_version=str(DEV_BRANCH), to_version=str(LATEST_0_10_2)) + @matrix(from_version=upgrade_versions, to_version=upgrade_versions) def test_upgrade_downgrade_streams(self, from_version, to_version): """ Start a smoke test client, then abort (kill -9) and restart it a few times. @@ -94,74 +89,79 @@ class StreamsUpgradeTest(Test): (search for get_kafka()). For streams in particular, that means that someone has manually copies the kafka-stream-$version-test.jar in the right S3 bucket as shown in base.sh. """ - # Setup phase - self.zk = ZookeeperService(self.test_context, num_nodes=1) - self.zk.start() + if from_version != to_version: + # Setup phase + self.zk = ZookeeperService(self.test_context, num_nodes=1) + self.zk.start() - # number of nodes needs to be >= 3 for the smoke test - self.kafka = KafkaService(self.test_context, num_nodes=3, - zk=self.zk, version=KafkaVersion(from_version), topics=self.topics) - self.kafka.start() - - # allow some time for topics to be created - time.sleep(10) - - self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka) - self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka) + # number of nodes needs to be >= 3 for the smoke test + self.kafka = KafkaService(self.test_context, num_nodes=3, + zk=self.zk, version=KafkaVersion(from_version), topics=self.topics) + self.kafka.start() - - self.driver.start() - self.processor1.start() - time.sleep(15) + # allow some time for topics to be created + time.sleep(10) - self.perform_streams_upgrade(to_version) + self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka) + self.driver.node.version = KafkaVersion(from_version) + self.driver.start() - time.sleep(15) - self.driver.wait() - self.driver.stop() + self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka) + self.processor1.node.version = KafkaVersion(from_version) + self.processor1.start() - self.processor1.stop() + time.sleep(15) - node = self.driver.node - node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False) - self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False) + self.perform_streams_upgrade(to_version) + + time.sleep(15) + self.driver.wait() + self.driver.stop() + + self.processor1.stop() + + self.driver.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False) + self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False) @cluster(num_nodes=6) - @parametrize(from_version=str(LATEST_0_10_2), to_version=str(DEV_BRANCH)) + @matrix(from_version=upgrade_versions, to_version=upgrade_versions) def test_upgrade_brokers(self, from_version, to_version): """ - Start a smoke test client then perform rolling upgrades on the broker. + Start a smoke test client then perform rolling upgrades on the broker. """ - # Setup phase - self.zk = ZookeeperService(self.test_context, num_nodes=1) - self.zk.start() + if from_version != to_version: + # Setup phase + self.zk = ZookeeperService(self.test_context, num_nodes=1) + self.zk.start() - # number of nodes needs to be >= 3 for the smoke test - self.kafka = KafkaService(self.test_context, num_nodes=3, - zk=self.zk, version=KafkaVersion(from_version), topics=self.topics) - self.kafka.start() - - # allow some time for topics to be created - time.sleep(10) - - self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka) - self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka) + # number of nodes needs to be >= 3 for the smoke test + self.kafka = KafkaService(self.test_context, num_nodes=3, + zk=self.zk, version=KafkaVersion(from_version), topics=self.topics) + self.kafka.start() - - self.driver.start() - self.processor1.start() - time.sleep(15) + # allow some time for topics to be created + time.sleep(10) - self.perform_broker_upgrade(to_version) + # use the current (dev) version driver + self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka) + self.driver.node.version = KafkaVersion(from_version) + self.driver.start() - time.sleep(15) - self.driver.wait() - self.driver.stop() + self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka) + self.processor1.node.version = KafkaVersion(from_version) + self.processor1.start() - self.processor1.stop() + time.sleep(15) - node = self.driver.node - node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False) - self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False) + self.perform_broker_upgrade(to_version) + + time.sleep(15) + self.driver.wait() + self.driver.stop() + + self.processor1.stop() + + self.driver.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False) + self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False) diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py index f63a7c17e..f88fd31d9 100644 --- a/tests/kafkatest/version.py +++ b/tests/kafkatest/version.py @@ -91,5 +91,12 @@ LATEST_0_10 = LATEST_0_10_2 # 0.11.0.0 versions V_0_11_0_0 = KafkaVersion("0.11.0.0") -LATEST_0_11_0 = V_0_11_0_0 +V_0_11_0_1 = KafkaVersion("0.11.0.1") +V_0_11_0_2 = KafkaVersion("0.11.0.2") +LATEST_0_11_0 = V_0_11_0_2 LATEST_0_11 = LATEST_0_11_0 + +# 1.0.x versions +V_1_0_0 = KafkaVersion("1.0.0") +V_1_0_1 = KafkaVersion("1.0.1") +LATEST_1_0 = V_1_0_1 \ No newline at end of file diff --git a/tests/setup.py b/tests/setup.py index 24ee4eb9e..7d7c4a460 100644 --- a/tests/setup.py +++ b/tests/setup.py @@ -38,7 +38,7 @@ class PyTest(TestCommand): def run_tests(self): # import here, cause outside the eggs aren't loaded import pytest - print self.pytest_args + print(self.pytest_args) errno = pytest.main(self.pytest_args) sys.exit(errno) diff --git a/vagrant/base.sh b/vagrant/base.sh index 4b5540652..bfc349673 100755 --- a/vagrant/base.sh +++ b/vagrant/base.sh @@ -110,8 +110,12 @@ get_kafka 0.10.1.1 2.11 chmod a+rw /opt/kafka-0.10.1.1 get_kafka 0.10.2.1 2.11 chmod a+rw /opt/kafka-0.10.2.1 -get_kafka 0.11.0.0 2.11 -chmod a+rw /opt/kafka-0.11.0.0 +get_kafka 0.11.0.2 2.11 +chmod a+rw /opt/kafka-0.11.0.2 +get_kafka 1.0.0 2.11 +chmod a+rw /opt/kafka-1.0.0 +get_kafka 1.0.1 2.11 +chmod a+rw /opt/kafka-1.0.1 # For EC2 nodes, we want to use /mnt, which should have the local disk. On local