[AIRFLOW-987] pass kerberos cli args keytab and principal to kerberos.run() (#4238)
This commit is contained in:
Родитель
92abe32db0
Коммит
e947c6c034
|
@ -1334,12 +1334,12 @@ def kerberos(args):
|
|||
)
|
||||
|
||||
with ctx:
|
||||
airflow.security.kerberos.run()
|
||||
airflow.security.kerberos.run(principal=args.principal, keytab=args.keytab)
|
||||
|
||||
stdout.close()
|
||||
stderr.close()
|
||||
else:
|
||||
airflow.security.kerberos.run()
|
||||
airflow.security.kerberos.run(principal=args.principal, keytab=args.keytab)
|
||||
|
||||
|
||||
@cli_utils.action_logging
|
||||
|
@ -1678,8 +1678,7 @@ class CLIFactory(object):
|
|||
help="Delete a variable"),
|
||||
# kerberos
|
||||
'principal': Arg(
|
||||
("principal",), "kerberos principal",
|
||||
nargs='?', default=conf.get('kerberos', 'principal')),
|
||||
("principal",), "kerberos principal", nargs='?'),
|
||||
'keytab': Arg(
|
||||
("-kt", "--keytab"), "keytab",
|
||||
nargs='?', default=conf.get('kerberos', 'keytab')),
|
||||
|
|
|
@ -27,11 +27,13 @@ NEED_KRB181_WORKAROUND = None
|
|||
log = LoggingMixin().log
|
||||
|
||||
|
||||
def renew_from_kt():
|
||||
def renew_from_kt(principal, keytab):
|
||||
# The config is specified in seconds. But we ask for that same amount in
|
||||
# minutes to give ourselves a large renewal buffer.
|
||||
|
||||
renewal_lifetime = "%sm" % configuration.conf.getint('kerberos', 'reinit_frequency')
|
||||
principal = configuration.conf.get('kerberos', 'principal').replace(
|
||||
|
||||
cmd_principal = principal or configuration.conf.get('kerberos', 'principal').replace(
|
||||
"_HOST", socket.getfqdn()
|
||||
)
|
||||
|
||||
|
@ -39,9 +41,9 @@ def renew_from_kt():
|
|||
configuration.conf.get('kerberos', 'kinit_path'),
|
||||
"-r", renewal_lifetime,
|
||||
"-k", # host ticket
|
||||
"-t", configuration.conf.get('kerberos', 'keytab'), # specify keytab
|
||||
"-t", keytab, # specify keytab
|
||||
"-c", configuration.conf.get('kerberos', 'ccache'), # specify credentials cache
|
||||
principal
|
||||
cmd_principal
|
||||
]
|
||||
log.info("Reinitting kerberos from keytab: " + " ".join(cmdv))
|
||||
|
||||
|
@ -55,8 +57,8 @@ def renew_from_kt():
|
|||
if subp.returncode != 0:
|
||||
log.error("Couldn't reinit from keytab! `kinit' exited with %s.\n%s\n%s" % (
|
||||
subp.returncode,
|
||||
b"\n".join(subp.stdout.readlines()),
|
||||
b"\n".join(subp.stderr.readlines())))
|
||||
"\n".join(subp.stdout.readlines()),
|
||||
"\n".join(subp.stderr.readlines())))
|
||||
sys.exit(subp.returncode)
|
||||
|
||||
global NEED_KRB181_WORKAROUND
|
||||
|
@ -66,10 +68,10 @@ def renew_from_kt():
|
|||
# (From: HUE-640). Kerberos clock have seconds level granularity. Make sure we
|
||||
# renew the ticket after the initial valid time.
|
||||
time.sleep(1.5)
|
||||
perform_krb181_workaround()
|
||||
perform_krb181_workaround(principal)
|
||||
|
||||
|
||||
def perform_krb181_workaround():
|
||||
def perform_krb181_workaround(principal):
|
||||
cmdv = [configuration.conf.get('kerberos', 'kinit_path'),
|
||||
"-c", configuration.conf.get('kerberos', 'ccache'),
|
||||
"-R"] # Renew ticket_cache
|
||||
|
@ -80,10 +82,8 @@ def perform_krb181_workaround():
|
|||
ret = subprocess.call(cmdv, close_fds=True)
|
||||
|
||||
if ret != 0:
|
||||
principal = "%s/%s" % (
|
||||
configuration.conf.get('kerberos', 'principal'),
|
||||
socket.getfqdn()
|
||||
)
|
||||
principal = "%s/%s" % (principal or configuration.conf.get('kerberos', 'principal'),
|
||||
socket.getfqdn())
|
||||
fmt_dict = dict(princ=principal,
|
||||
ccache=configuration.conf.get('kerberos', 'principal'))
|
||||
log.error("Couldn't renew kerberos ticket in order to work around "
|
||||
|
@ -110,11 +110,11 @@ def detect_conf_var():
|
|||
return b'X-CACHECONF:' in f.read()
|
||||
|
||||
|
||||
def run():
|
||||
if configuration.conf.get('kerberos', 'keytab') is None:
|
||||
def run(principal, keytab):
|
||||
if not keytab:
|
||||
log.debug("Keytab renewer not starting, no keytab configured")
|
||||
sys.exit(0)
|
||||
|
||||
while True:
|
||||
renew_from_kt()
|
||||
renew_from_kt(principal, keytab)
|
||||
time.sleep(configuration.conf.getint('kerberos', 'reinit_frequency'))
|
||||
|
|
|
@ -18,10 +18,14 @@
|
|||
# under the License.
|
||||
|
||||
import os
|
||||
import unittest
|
||||
|
||||
try:
|
||||
import unittest2 as unittest # PY27
|
||||
except ImportError:
|
||||
import unittest
|
||||
from argparse import Namespace
|
||||
from airflow import configuration
|
||||
from airflow.security.kerberos import renew_from_kt
|
||||
from airflow import LoggingMixin
|
||||
|
||||
|
||||
@unittest.skipIf('KRB5_KTNAME' not in os.environ,
|
||||
|
@ -32,13 +36,34 @@ class KerberosTest(unittest.TestCase):
|
|||
|
||||
if not configuration.conf.has_section("kerberos"):
|
||||
configuration.conf.add_section("kerberos")
|
||||
|
||||
configuration.conf.set("kerberos",
|
||||
"keytab",
|
||||
configuration.conf.set("kerberos", "keytab",
|
||||
os.environ['KRB5_KTNAME'])
|
||||
keytab_from_cfg = configuration.conf.get("kerberos", "keytab")
|
||||
self.args = Namespace(keytab=keytab_from_cfg, principal=None, pid=None,
|
||||
daemon=None, stdout=None, stderr=None, log_file=None)
|
||||
|
||||
def test_renew_from_kt(self):
|
||||
"""
|
||||
We expect no result, but a successful run. No more TypeError
|
||||
"""
|
||||
self.assertIsNone(renew_from_kt())
|
||||
self.assertIsNone(renew_from_kt(principal=self.args.principal,
|
||||
keytab=self.args.keytab))
|
||||
|
||||
def test_args_from_cli(self):
|
||||
"""
|
||||
We expect no result, but a run with sys.exit(1) because keytab not exist.
|
||||
"""
|
||||
configuration.conf.set("kerberos", "keytab", "")
|
||||
self.args.keytab = "test_keytab"
|
||||
|
||||
with self.assertRaises(SystemExit) as se:
|
||||
renew_from_kt(principal=self.args.principal,
|
||||
keytab=self.args.keytab)
|
||||
|
||||
with self.assertLogs(LoggingMixin().log) as log:
|
||||
self.assertIn(
|
||||
'kinit: krb5_init_creds_set_keytab: Failed to find '
|
||||
'airflow@LUPUS.GRIDDYNAMICS.NET in keytab FILE:{} '
|
||||
'(unknown enctype)'.format(self.args.keytab), log.output)
|
||||
|
||||
self.assertEqual(se.exception.code, 1)
|
||||
|
|
Загрузка…
Ссылка в новой задаче