MINOR: Streams system tests fixes/updates (#4689)

Some changes required to get the Streams system tests working via Docker

To test:

TC_PATHS="tests/kafkatest/tests/streams" bash tests/docker/run_tests.sh

That command will take about 3.5 hours, and should pass. Note there are a couple of ignored tests.

Reviewers: Guozhang Wang <wangguoz@gmail.com>, Bill Bejeck <bill@confluent.io>
This commit is contained in:
John Roesler 2018-03-15 14:42:43 -07:00 committed by Guozhang Wang
Parent a6fad27372
Commit 7006d0f58b
7 changed files with 101 additions and 74 deletions

4
.gitignore vendored
View file

@@ -47,3 +47,7 @@ tests/venv
docs/generated/
.release-settings.json
kafkatest.egg-info/
systest/

View file

@@ -40,12 +40,23 @@ RUN ssh-keygen -q -t rsa -N '' -f /root/.ssh/id_rsa && cp -f /root/.ssh/id_rsa.p
# Install binary test dependencies.
ARG KAFKA_MIRROR="https://s3-us-west-2.amazonaws.com/kafka-packages"
RUN mkdir -p "/opt/kafka-0.8.2.2" && curl -s "$KAFKA_MIRROR/kafka_2.10-0.8.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.8.2.2"
RUN mkdir -p "/opt/kafka-0.9.0.1" && curl -s "$KAFKA_MIRROR/kafka_2.11-0.9.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.9.0.1"
RUN mkdir -p "/opt/kafka-0.10.0.1" && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.0.1"
RUN mkdir -p "/opt/kafka-0.10.1.1" && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.1.1"
RUN mkdir -p "/opt/kafka-0.10.2.1" && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.2.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.2.1"
RUN mkdir -p "/opt/kafka-0.11.0.0" && curl -s "$KAFKA_MIRROR/kafka_2.11-0.11.0.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.0"
RUN mkdir -p "/opt/kafka-0.8.2.2" && chmod a+rw /opt/kafka-0.8.2.2 && curl -s "$KAFKA_MIRROR/kafka_2.10-0.8.2.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.8.2.2"
RUN mkdir -p "/opt/kafka-0.9.0.1" && chmod a+rw /opt/kafka-0.9.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.9.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.9.0.1"
RUN mkdir -p "/opt/kafka-0.10.0.1" && chmod a+rw /opt/kafka-0.10.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.0.1"
RUN mkdir -p "/opt/kafka-0.10.1.1" && chmod a+rw /opt/kafka-0.10.1.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.1.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.1.1"
RUN mkdir -p "/opt/kafka-0.10.2.1" && chmod a+rw /opt/kafka-0.10.2.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.10.2.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.10.2.1"
RUN mkdir -p "/opt/kafka-0.11.0.0" && chmod a+rw /opt/kafka-0.11.0.0 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.11.0.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.0"
RUN mkdir -p "/opt/kafka-0.11.0.2" && chmod a+rw /opt/kafka-0.11.0.2 && curl -s "$KAFKA_MIRROR/kafka_2.11-0.11.0.2.tgz" | tar xz --strip-components=1 -C "/opt/kafka-0.11.0.2"
RUN mkdir -p "/opt/kafka-1.0.0" && chmod a+rw /opt/kafka-1.0.0 && curl -s "$KAFKA_MIRROR/kafka_2.11-1.0.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-1.0.0"
RUN mkdir -p "/opt/kafka-1.0.1" && chmod a+rw /opt/kafka-1.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.11-1.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-1.0.1"
# Streams test dependencies
RUN curl -s "$KAFKA_MIRROR/kafka-streams-0.10.1.1-test.jar" -o /opt/kafka-0.10.1.1/libs/kafka-streams-0.10.1.1-test.jar && \
curl -s "$KAFKA_MIRROR/kafka-streams-0.10.2.1-test.jar" -o /opt/kafka-0.10.2.1/libs/kafka-streams-0.10.2.1-test.jar && \
curl -s "$KAFKA_MIRROR/kafka-streams-0.11.0.0-test.jar" -o /opt/kafka-0.11.0.0/libs/kafka-streams-0.11.0.0-test.jar && \
curl -s "$KAFKA_MIRROR/kafka-streams-0.11.0.2-test.jar" -o /opt/kafka-0.11.0.2/libs/kafka-streams-0.11.0.2-test.jar && \
curl -s "$KAFKA_MIRROR/kafka-streams-1.0.0-test.jar" -o /opt/kafka-1.0.0/libs/kafka-streams-1.0.0-test.jar && \
curl -s "$KAFKA_MIRROR/kafka-streams-1.0.1-test.jar" -o /opt/kafka-1.0.1/libs/kafka-streams-1.0.1-test.jar
# The version of Kibosh to use for testing.
# If you update this, also update vagrant/base.sh

View file

@@ -123,6 +123,7 @@ class StreamsTestBaseService(KafkaPathResolverMixin, Service):
"INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \
" %(kafka)s %(state_dir)s %(user_test_args)s %(user_test_args1)s %(user_test_args2)s" \
" %(user_test_args3)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
self.logger.info("Executing: " + cmd)
return cmd
@@ -163,7 +164,7 @@ class StreamsEosTestBaseService(StreamsTestBaseService):
def clean_node(self, node):
if self.clean_node_enabled:
super.clean_node(self, node)
super(StreamsEosTestBaseService, self).clean_node(node)
class StreamsSmokeTestDriverService(StreamsSmokeTestBaseService):
@@ -233,7 +234,7 @@ class StreamsBrokerDownResilienceService(StreamsTestBaseService):
"INCLUDE_TEST_JARS=true %(kafka_run_class)s %(streams_class_name)s " \
" %(kafka)s %(state_dir)s %(user_test_args)s %(user_test_args1)s %(user_test_args2)s" \
" %(user_test_args3)s & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
self.logger.info("Executing: " + cmd)
return cmd

View file

@@ -15,13 +15,14 @@
from ducktape.mark.resource import cluster
from ducktape.tests.test import Test
from ducktape.mark import parametrize, ignore
from ducktape.mark import matrix
from kafkatest.services.kafka import KafkaService
from kafkatest.services.zookeeper import ZookeeperService
from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
from kafkatest.version import LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, DEV_BRANCH, KafkaVersion
from kafkatest.version import LATEST_0_10_2, LATEST_0_11, LATEST_1_0, DEV_BRANCH, KafkaVersion
import time
upgrade_versions = [str(LATEST_0_10_2), str(LATEST_0_11), str(LATEST_1_0), str(DEV_BRANCH)]
class StreamsUpgradeTest(Test):
"""
@@ -55,7 +56,7 @@ class StreamsUpgradeTest(Test):
'tagg' : { 'partitions': self.partitions, 'replication-factor': self.replication,
'configs': {"min.insync.replicas": self.isr} }
}
def perform_streams_upgrade(self, to_version):
self.logger.info("First pass bounce - rolling streams upgrade")
@@ -76,14 +77,8 @@ class StreamsUpgradeTest(Test):
node.version = KafkaVersion(to_version)
self.kafka.start_node(node)
@cluster(num_nodes=6)
@parametrize(from_version=str(LATEST_0_10_1), to_version=str(DEV_BRANCH))
@parametrize(from_version=str(LATEST_0_10_2), to_version=str(DEV_BRANCH))
@parametrize(from_version=str(LATEST_0_10_1), to_version=str(LATEST_0_11_0))
@parametrize(from_version=str(LATEST_0_10_2), to_version=str(LATEST_0_11_0))
@parametrize(from_version=str(LATEST_0_11_0), to_version=str(LATEST_0_10_2))
@parametrize(from_version=str(DEV_BRANCH), to_version=str(LATEST_0_10_2))
@matrix(from_version=upgrade_versions, to_version=upgrade_versions)
def test_upgrade_downgrade_streams(self, from_version, to_version):
"""
Start a smoke test client, then abort (kill -9) and restart it a few times.
@@ -94,74 +89,79 @@ class StreamsUpgradeTest(Test):
(search for get_kafka()). For streams in particular, that means that someone has manually
copied the kafka-streams-$version-test.jar in the right S3 bucket as shown in base.sh.
"""
# Setup phase
self.zk = ZookeeperService(self.test_context, num_nodes=1)
self.zk.start()
if from_version != to_version:
# Setup phase
self.zk = ZookeeperService(self.test_context, num_nodes=1)
self.zk.start()
# number of nodes needs to be >= 3 for the smoke test
self.kafka = KafkaService(self.test_context, num_nodes=3,
zk=self.zk, version=KafkaVersion(from_version), topics=self.topics)
self.kafka.start()
# allow some time for topics to be created
time.sleep(10)
self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
# number of nodes needs to be >= 3 for the smoke test
self.kafka = KafkaService(self.test_context, num_nodes=3,
zk=self.zk, version=KafkaVersion(from_version), topics=self.topics)
self.kafka.start()
self.driver.start()
self.processor1.start()
time.sleep(15)
# allow some time for topics to be created
time.sleep(10)
self.perform_streams_upgrade(to_version)
self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
self.driver.node.version = KafkaVersion(from_version)
self.driver.start()
time.sleep(15)
self.driver.wait()
self.driver.stop()
self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
self.processor1.node.version = KafkaVersion(from_version)
self.processor1.start()
self.processor1.stop()
time.sleep(15)
node = self.driver.node
node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False)
self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False)
self.perform_streams_upgrade(to_version)
time.sleep(15)
self.driver.wait()
self.driver.stop()
self.processor1.stop()
self.driver.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False)
self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False)
@cluster(num_nodes=6)
@parametrize(from_version=str(LATEST_0_10_2), to_version=str(DEV_BRANCH))
@matrix(from_version=upgrade_versions, to_version=upgrade_versions)
def test_upgrade_brokers(self, from_version, to_version):
"""
Start a smoke test client then perform rolling upgrades on the broker.
Start a smoke test client then perform rolling upgrades on the broker.
"""
# Setup phase
self.zk = ZookeeperService(self.test_context, num_nodes=1)
self.zk.start()
if from_version != to_version:
# Setup phase
self.zk = ZookeeperService(self.test_context, num_nodes=1)
self.zk.start()
# number of nodes needs to be >= 3 for the smoke test
self.kafka = KafkaService(self.test_context, num_nodes=3,
zk=self.zk, version=KafkaVersion(from_version), topics=self.topics)
self.kafka.start()
# allow some time for topics to be created
time.sleep(10)
self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
# number of nodes needs to be >= 3 for the smoke test
self.kafka = KafkaService(self.test_context, num_nodes=3,
zk=self.zk, version=KafkaVersion(from_version), topics=self.topics)
self.kafka.start()
self.driver.start()
self.processor1.start()
time.sleep(15)
# allow some time for topics to be created
time.sleep(10)
self.perform_broker_upgrade(to_version)
# use the current (dev) version driver
self.driver = StreamsSmokeTestDriverService(self.test_context, self.kafka)
self.driver.node.version = KafkaVersion(from_version)
self.driver.start()
time.sleep(15)
self.driver.wait()
self.driver.stop()
self.processor1 = StreamsSmokeTestJobRunnerService(self.test_context, self.kafka)
self.processor1.node.version = KafkaVersion(from_version)
self.processor1.start()
self.processor1.stop()
time.sleep(15)
node = self.driver.node
node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False)
self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False)
self.perform_broker_upgrade(to_version)
time.sleep(15)
self.driver.wait()
self.driver.stop()
self.processor1.stop()
self.driver.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % self.driver.STDOUT_FILE, allow_fail=False)
self.processor1.node.account.ssh_capture("grep SMOKE-TEST-CLIENT-CLOSED %s" % self.processor1.STDOUT_FILE, allow_fail=False)

View file

@@ -91,5 +91,12 @@ LATEST_0_10 = LATEST_0_10_2
# 0.11.0.0 versions
V_0_11_0_0 = KafkaVersion("0.11.0.0")
LATEST_0_11_0 = V_0_11_0_0
V_0_11_0_1 = KafkaVersion("0.11.0.1")
V_0_11_0_2 = KafkaVersion("0.11.0.2")
LATEST_0_11_0 = V_0_11_0_2
LATEST_0_11 = LATEST_0_11_0
# 1.0.x versions
V_1_0_0 = KafkaVersion("1.0.0")
V_1_0_1 = KafkaVersion("1.0.1")
LATEST_1_0 = V_1_0_1

View file

@@ -38,7 +38,7 @@ class PyTest(TestCommand):
def run_tests(self):
# import here, cause outside the eggs aren't loaded
import pytest
print self.pytest_args
print(self.pytest_args)
errno = pytest.main(self.pytest_args)
sys.exit(errno)

View file

@@ -110,8 +110,12 @@ get_kafka 0.10.1.1 2.11
chmod a+rw /opt/kafka-0.10.1.1
get_kafka 0.10.2.1 2.11
chmod a+rw /opt/kafka-0.10.2.1
get_kafka 0.11.0.0 2.11
chmod a+rw /opt/kafka-0.11.0.0
get_kafka 0.11.0.2 2.11
chmod a+rw /opt/kafka-0.11.0.2
get_kafka 1.0.0 2.11
chmod a+rw /opt/kafka-1.0.0
get_kafka 1.0.1 2.11
chmod a+rw /opt/kafka-1.0.1
# For EC2 nodes, we want to use /mnt, which should have the local disk. On local