KAFKA-2693: Ducktape tests for SASL/PLAIN and multiple mechanisms

Run a sanity test with SASL/PLAIN and a couple of replication tests with SASL/PLAIN and multiple mechanisms.

Author: Rajini Sivaram <rajinisivaram@googlemail.com>

Reviewers: Ismael Juma <ismael@juma.me.uk>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1282 from rajinisivaram/KAFKA-2693
This commit is contained in:
Rajini Sivaram 2016-04-29 09:41:12 -07:00 коммит произвёл Ewen Cheslack-Postava
Родитель 4ab4e4af81
Коммит cea01af125
6 изменённых файлов: 97 добавлений и 64 удалений

Просмотреть файл

@ -45,11 +45,14 @@ class ConsoleConsumerTest(Test):
self.zk.start()
@parametrize(security_protocol='PLAINTEXT', new_consumer=False)
@parametrize(security_protocol='SASL_SSL', sasl_mechanism='PLAIN')
@matrix(security_protocol=['PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL'])
def test_lifecycle(self, security_protocol, new_consumer=True):
def test_lifecycle(self, security_protocol, new_consumer=True, sasl_mechanism='GSSAPI'):
"""Check that console consumer starts/stops properly, and that we are capturing log output."""
self.kafka.security_protocol = security_protocol
self.kafka.client_sasl_mechanism = sasl_mechanism
self.kafka.interbroker_sasl_mechanism = sasl_mechanism
self.kafka.start()
self.consumer.security_protocol = security_protocol

Просмотреть файл

@ -63,7 +63,8 @@ class KafkaService(JmxMixin, Service):
}
def __init__(self, context, num_nodes, zk, security_protocol=SecurityConfig.PLAINTEXT, interbroker_security_protocol=SecurityConfig.PLAINTEXT,
sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, authorizer_class_name=None, topics=None, version=TRUNK, quota_config=None, jmx_object_names=None,
client_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SecurityConfig.SASL_MECHANISM_GSSAPI,
authorizer_class_name=None, topics=None, version=TRUNK, quota_config=None, jmx_object_names=None,
jmx_attributes=[], zk_connect_timeout=5000):
"""
:type context
@ -78,7 +79,8 @@ class KafkaService(JmxMixin, Service):
self.security_protocol = security_protocol
self.interbroker_security_protocol = interbroker_security_protocol
self.sasl_mechanism = sasl_mechanism
self.client_sasl_mechanism = client_sasl_mechanism
self.interbroker_sasl_mechanism = interbroker_sasl_mechanism
self.topics = topics
self.minikdc = None
self.authorizer_class_name = authorizer_class_name
@ -108,7 +110,9 @@ class KafkaService(JmxMixin, Service):
@property
def security_config(self):
return SecurityConfig(self.security_protocol, self.interbroker_security_protocol, zk_sasl = self.zk.zk_sasl , sasl_mechanism=self.sasl_mechanism)
return SecurityConfig(self.security_protocol, self.interbroker_security_protocol,
zk_sasl = self.zk.zk_sasl,
client_sasl_mechanism=self.client_sasl_mechanism, interbroker_sasl_mechanism=self.interbroker_sasl_mechanism)
def open_port(self, protocol):
self.port_mappings[protocol] = self.port_mappings[protocol]._replace(open=True)
@ -163,9 +167,7 @@ class KafkaService(JmxMixin, Service):
# TODO - clean up duplicate configuration logic
prop_file = cfg.render()
prop_file += self.render('kafka.properties', node=node, broker_id=self.idx(node),
security_config=self.security_config,
interbroker_security_protocol=self.interbroker_security_protocol,
sasl_mechanism=self.sasl_mechanism)
security_config=self.security_config)
return prop_file
def start_cmd(self, node):

Просмотреть файл

@ -50,7 +50,7 @@ quota.producer.bytes.per.second.overrides={{ quota_config.quota_producer_bytes_p
quota.consumer.bytes.per.second.overrides={{ quota_config.quota_consumer_bytes_per_second_overrides }}
{% endif %}
security.inter.broker.protocol={{ interbroker_security_protocol }}
security.inter.broker.protocol={{ security_config.interbroker_security_protocol }}
ssl.keystore.location=/mnt/security/test.keystore.jks
ssl.keystore.password=test-ks-passwd
@ -59,7 +59,8 @@ ssl.keystore.type=JKS
ssl.truststore.location=/mnt/security/test.truststore.jks
ssl.truststore.password=test-ts-passwd
ssl.truststore.type=JKS
sasl.mechanism={{ sasl_mechanism }}
sasl.mechanism.inter.broker.protocol={{ security_config.interbroker_sasl_mechanism }}
sasl.enabled.mechanisms={{ ",".join(security_config.enabled_sasl_mechanisms) }}
sasl.kerberos.service.name=kafka
{% if authorizer_class_name is not none %}
ssl.client.auth=required

Просмотреть файл

@ -75,7 +75,9 @@ class SecurityConfig(TemplateRenderer):
ssl_stores = Keytool.generate_keystore_truststore('.')
def __init__(self, security_protocol=None, interbroker_security_protocol=None, sasl_mechanism=SASL_MECHANISM_GSSAPI, zk_sasl=False, template_props=""):
def __init__(self, security_protocol=None, interbroker_security_protocol=None,
client_sasl_mechanism=SASL_MECHANISM_GSSAPI, interbroker_sasl_mechanism=SASL_MECHANISM_GSSAPI,
zk_sasl=False, template_props=""):
"""
Initialize the security properties for the node and copy
keystore and truststore to the remote node if the transport protocol
@ -104,13 +106,14 @@ class SecurityConfig(TemplateRenderer):
'ssl.key.password' : SecurityConfig.ssl_stores['ssl.key.password'],
'ssl.truststore.location' : SecurityConfig.TRUSTSTORE_PATH,
'ssl.truststore.password' : SecurityConfig.ssl_stores['ssl.truststore.password'],
'sasl.mechanism' : sasl_mechanism,
'sasl.mechanism' : client_sasl_mechanism,
'sasl.mechanism.inter.broker.protocol' : interbroker_sasl_mechanism,
'sasl.kerberos.service.name' : 'kafka'
}
def client_config(self, template_props=""):
return SecurityConfig(self.security_protocol, sasl_mechanism=self.sasl_mechanism, template_props=template_props)
return SecurityConfig(self.security_protocol, client_sasl_mechanism=self.client_sasl_mechanism, template_props=template_props)
def setup_node(self, node):
if self.has_ssl:
@ -120,13 +123,15 @@ class SecurityConfig(TemplateRenderer):
if self.has_sasl:
node.account.ssh("mkdir -p %s" % SecurityConfig.CONFIG_DIR, allow_fail=False)
jaas_conf_file = self.sasl_mechanism.lower() + "_jaas.conf"
jaas_conf_file = "jaas.conf"
java_version = node.account.ssh_capture("java -version")
if any('IBM' in line for line in java_version):
is_ibm_jdk = True
else:
is_ibm_jdk = False
jaas_conf = self.render(jaas_conf_file, node=node, is_ibm_jdk=is_ibm_jdk)
jaas_conf = self.render(jaas_conf_file, node=node, is_ibm_jdk=is_ibm_jdk,
client_sasl_mechanism=self.client_sasl_mechanism,
enabled_sasl_mechanisms=self.enabled_sasl_mechanisms)
node.account.create_file(SecurityConfig.JAAS_CONF_PATH, jaas_conf)
if self.has_sasl_kerberos:
node.account.scp_to(MiniKdc.LOCAL_KEYTAB_FILE, SecurityConfig.KEYTAB_PATH)
@ -159,12 +164,20 @@ class SecurityConfig(TemplateRenderer):
return self.properties['security.protocol']
@property
def sasl_mechanism(self):
def client_sasl_mechanism(self):
return self.properties['sasl.mechanism']
@property
def interbroker_sasl_mechanism(self):
return self.properties['sasl.mechanism.inter.broker.protocol']
@property
def enabled_sasl_mechanisms(self):
return set([self.client_sasl_mechanism, self.interbroker_sasl_mechanism])
@property
def has_sasl_kerberos(self):
return self.has_sasl and self.sasl_mechanism == SecurityConfig.SASL_MECHANISM_GSSAPI
return self.has_sasl and (SecurityConfig.SASL_MECHANISM_GSSAPI in self.enabled_sasl_mechanisms)
@property
def kafka_opts(self):

Просмотреть файл

@ -11,76 +11,85 @@
* specific language governing permissions and limitations under the License.
*/
{% if is_ibm_jdk %}
KafkaClient {
{% if client_sasl_mechanism == "GSSAPI" %}
{% if is_ibm_jdk %}
com.ibm.security.auth.module.Krb5LoginModule required debug=false
credsType=both
useKeytab="file:/mnt/security/keytab"
principal="client@EXAMPLE.COM";
{% else %}
com.sun.security.auth.module.Krb5LoginModule required debug=false
doNotPrompt=true
useKeyTab=true
storeKey=true
keyTab="/mnt/security/keytab"
principal="client@EXAMPLE.COM";
{% endif %}
{% elif client_sasl_mechanism == "PLAIN" %}
org.apache.kafka.common.security.plain.PlainLoginModule required
username="client"
password="client-secret";
{% endif %}
};
KafkaServer {
{% if "GSSAPI" in enabled_sasl_mechanisms %}
{% if is_ibm_jdk %}
com.ibm.security.auth.module.Krb5LoginModule required debug=false
credsType=both
useKeytab="file:/mnt/security/keytab"
principal="kafka/{{ node.account.hostname }}@EXAMPLE.COM";
{% else %}
com.sun.security.auth.module.Krb5LoginModule required debug=false
doNotPrompt=true
useKeyTab=true
storeKey=true
keyTab="/mnt/security/keytab"
principal="kafka/{{ node.account.hostname }}@EXAMPLE.COM";
{% endif %}
{% endif %}
{% if "PLAIN" in enabled_sasl_mechanisms %}
org.apache.kafka.common.security.plain.PlainLoginModule required
username="kafka"
password="kafka-secret"
user_client="client-secret"
user_kafka="kafka-secret";
{% endif %}
};
{% if zk_sasl %}
Client {
{% if is_ibm_jdk %}
com.ibm.security.auth.module.Krb5LoginModule required debug=false
credsType=both
useKeytab="file:/mnt/security/keytab"
principal="zkclient@EXAMPLE.COM";
};
Server {
com.ibm.security.auth.module.Krb5LoginModule required debug=false
credsType=both
useKeyTab="file:/mnt/security/keytab"
principal="zookeeper/{{ node.account.hostname }}@EXAMPLE.COM";
};
{% endif %}
{% else %}
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required debug=false
doNotPrompt=true
useKeyTab=true
storeKey=true
keyTab="/mnt/security/keytab"
principal="client@EXAMPLE.COM";
};
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required debug=false
doNotPrompt=true
useKeyTab=true
storeKey=true
keyTab="/mnt/security/keytab"
principal="kafka/{{ node.account.hostname }}@EXAMPLE.COM";
};
{% if zk_sasl %}
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/mnt/security/keytab"
storeKey=true
useTicketCache=false
principal="zkclient@EXAMPLE.COM";
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/mnt/security/keytab"
storeKey=true
useTicketCache=false
principal="zkclient@EXAMPLE.COM";
{% endif %}
};
Server {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/mnt/security/keytab"
storeKey=true
useTicketCache=false
principal="zookeeper/{{ node.account.hostname }}@EXAMPLE.COM";
{% if is_ibm_jdk %}
com.ibm.security.auth.module.Krb5LoginModule required debug=false
credsType=both
useKeyTab="file:/mnt/security/keytab"
principal="zookeeper/{{ node.account.hostname }}@EXAMPLE.COM";
{% else %}
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/mnt/security/keytab"
storeKey=true
useTicketCache=false
principal="zookeeper/{{ node.account.hostname }}@EXAMPLE.COM";
{% endif %}
};
{% endif %}
{% endif %}

Просмотреть файл

@ -128,7 +128,10 @@ class ReplicationTest(ProduceConsumeValidateTest):
@matrix(failure_mode=["clean_shutdown", "hard_shutdown", "clean_bounce", "hard_bounce"],
broker_type=["controller"],
security_protocol=["PLAINTEXT", "SASL_SSL"])
def test_replication_with_broker_failure(self, failure_mode, security_protocol, broker_type):
@matrix(failure_mode=["hard_bounce"],
broker_type=["leader"],
security_protocol=["SASL_SSL"], client_sasl_mechanism=["PLAIN"], interbroker_sasl_mechanism=["PLAIN", "GSSAPI"])
def test_replication_with_broker_failure(self, failure_mode, security_protocol, broker_type, client_sasl_mechanism="GSSAPI", interbroker_sasl_mechanism="GSSAPI"):
"""Replication tests.
These tests verify that replication provides simple durability guarantees by checking that data acked by
brokers is still available for consumption in the face of various failure scenarios.
@ -144,6 +147,8 @@ class ReplicationTest(ProduceConsumeValidateTest):
self.kafka.security_protocol = security_protocol
self.kafka.interbroker_security_protocol = security_protocol
self.kafka.client_sasl_mechanism = client_sasl_mechanism
self.kafka.interbroker_sasl_mechanism = interbroker_sasl_mechanism
new_consumer = False if self.kafka.security_protocol == "PLAINTEXT" else True
self.producer = VerifiableProducer(self.test_context, self.num_producers, self.kafka, self.topic, throughput=self.producer_throughput)
self.consumer = ConsoleConsumer(self.test_context, self.num_consumers, self.kafka, self.topic, new_consumer=new_consumer, consumer_timeout_ms=60000, message_validator=is_int)