Refactor direct config accesses in crypto

- Refactor os path calls to pathlib
This commit is contained in:
Fred Park 2016-11-12 09:13:09 -08:00
Родитель 285f86ae9b
Коммит 6e20e1b512
2 изменённых файлов: 115 добавлений и 61 удалений

Просмотреть файл

@ -34,7 +34,6 @@ import base64
import collections
import getpass
import logging
import os
try:
import pathlib2 as pathlib
except ImportError:
@ -71,31 +70,26 @@ def generate_ssh_keypair(export_path):
:rtype: tuple
:return: (private key filename, public key filename)
"""
privkey = str(pathlib.Path(export_path, _SSH_KEY_PREFIX))
pubkey = str(pathlib.Path(export_path, _SSH_KEY_PREFIX + '.pub'))
try:
if os.path.exists(privkey):
old = privkey + '.old'
if os.path.exists(old):
os.unlink(old)
os.rename(privkey, old)
except OSError:
pass
try:
if os.path.exists(pubkey):
old = pubkey + '.old'
if os.path.exists(old):
os.unlink(old)
os.rename(pubkey, old)
except OSError:
pass
privkey = pathlib.Path(export_path, _SSH_KEY_PREFIX)
pubkey = pathlib.Path(export_path, _SSH_KEY_PREFIX + '.pub')
if privkey.exists():
old = pathlib.Path(privkey + '.old')
if old.exists():
old.unlink()
privkey.rename(old)
if pubkey.exists():
old = pathlib.Path(pubkey + '.old')
if old.exists():
old.unlink()
pubkey.rename(old)
logger.info('generating ssh key pair to path: {}'.format(export_path))
sprivkey = str(privkey)
subprocess.check_call(
['ssh-keygen', '-f', privkey, '-t', 'rsa', '-N', ''''''])
return (privkey, pubkey)
['ssh-keygen', '-f', sprivkey, '-t', 'rsa', '-N', ''''''])
return (sprivkey, str(pubkey))
def derive_pem_from_pfx(pfxfile, passphrase=None, pemfile=None):
def derive_private_key_pem_from_pfx(pfxfile, passphrase=None, pemfile=None):
# type: (str, str, str) -> str
"""Derive a private key pem file from a pfx
:param str pfxfile: pfx file
@ -114,12 +108,52 @@ def derive_pem_from_pfx(pfxfile, passphrase=None, pemfile=None):
f.close()
pemfile = f.name
try:
# create pem from pfx
subprocess.check_call(
['openssl', 'pkcs12', '-nodes', '-in', pfxfile, '-out',
pemfile, '-password', 'pass:' + passphrase]
)
except Exception:
os.unlink(pemfile)
fp = pathlib.Path(pemfile)
if fp.exists():
fp.unlink()
pemfile = None
return pemfile
def derive_public_key_pem_from_pfx(pfxfile, passphrase=None, pemfile=None):
# type: (str, str, str) -> str
"""Derive a public key pem file from a pfx
:param str pfxfile: pfx file
:param str passphrase: passphrase for pfx
:param str pemfile: path of pem file to write to
:rtype: str
:return: path of pem file
"""
if pfxfile is None:
raise ValueError('pfx file is invalid')
if passphrase is None:
passphrase = getpass.getpass('Enter password for PFX: ')
# convert pfx to pem
if pemfile is None:
f = tempfile.NamedTemporaryFile(mode='wb', delete=False)
f.close()
pemfile = f.name
try:
# create pem from pfx
subprocess.check_call(
['openssl', 'pkcs12', '-nodes', '-in', pfxfile, '-out',
pemfile, '-password', 'pass:' + passphrase]
)
# extract public key from private key
subprocess.check_call(
['openssl', 'rsa', '-in', pemfile, '-pubout', '-outform',
'PEM', '-out', pemfile]
)
except Exception:
fp = pathlib.Path(pemfile)
if fp.exists():
fp.unlink()
pemfile = None
return pemfile
@ -161,25 +195,15 @@ def generate_pem_pfx_certificates(config):
:return: sha1 thumbprint of pfx
"""
# gather input
try:
pemfile = config['batch_shipyard']['encryption']['public_key_pem']
except KeyError:
pemfile = None
try:
pfxfile = config['batch_shipyard']['encryption']['pfx']['filename']
except KeyError:
pfxfile = None
try:
passphrase = config['batch_shipyard']['encryption']['pfx'][
'passphrase']
except KeyError:
passphrase = None
pemfile = settings.batch_shipyard_encryption_public_key_pem(config)
pfxfile = settings.batch_shipyard_encryption_pfx_filename(config)
passphrase = settings.batch_shipyard_encryption_pfx_passphrase(config)
if pemfile is None:
pemfile = util.get_input('Enter public key PEM filename to create: ')
if pfxfile is None:
pfxfile = util.get_input('Enter PFX filename to create: ')
if passphrase is None:
while passphrase is None or len(passphrase) == 0:
while util.is_none_or_empty(passphrase):
passphrase = getpass.getpass('Enter password for PFX: ')
if len(passphrase) == 0:
print('passphrase cannot be empty')
@ -208,12 +232,13 @@ def generate_pem_pfx_certificates(config):
logger.debug('created PFX file: {}'.format(pfxfile))
finally:
# remove rsa private key file
try:
os.unlink(privatekey)
except OSError:
pass
fp = pathlib.Path(privatekey)
if fp.exists():
fp.unlink()
# remove temp cert pem
os.unlink(f.name)
fp = pathlib.Path(f.name)
if fp.exists():
fp.unlink()
# get sha1 thumbprint of pfx
return get_sha1_thumbprint_pfx(pfxfile, passphrase)
@ -248,18 +273,35 @@ def _rsa_encrypt_string(data, config):
:rtype: str
:return: base64-encoded cipher text
"""
if data is None or len(data) == 0:
if util.is_none_or_empty(data):
raise ValueError('invalid data to encrypt')
inkey = settings.batch_shipyard_encryption_public_key_pem(config)
derived = False
if inkey is None:
# derive pem from pfx
derived = True
pfxfile = settings.batch_shipyard_encryption_pfx_filename(config)
pfx_passphrase = settings.batch_shipyard_encryption_pfx_passphrase(
config)
inkey = derive_public_key_pem_from_pfx(pfxfile, pfx_passphrase, None)
try:
inkey = config['batch_shipyard']['encryption']['public_key_pem']
except KeyError:
pass
proc = subprocess.Popen(
['openssl', 'rsautl', '-encrypt', '-pubin', '-inkey', inkey],
stdin=subprocess.PIPE, stdout=subprocess.PIPE)
ciphertext = util.base64_encode_string(
proc.communicate(input=util.encode_string(data))[0])
return ciphertext
if inkey is None:
raise RuntimeError('public encryption key is invalid')
proc = subprocess.Popen(
['openssl', 'rsautl', '-encrypt', '-pubin', '-inkey', inkey],
stdin=subprocess.PIPE, stdout=subprocess.PIPE)
ciphertext = util.base64_encode_string(
proc.communicate(input=util.encode_string(data))[0])
if proc.returncode != 0:
raise RuntimeError(
'openssl encryption failed with returncode: {}'.format(
proc.returncode))
return ciphertext
finally:
if derived:
fp = pathlib.Path(inkey)
if fp.exists():
fp.unlink()
def _rsa_decrypt_string_with_pfx(ciphertext, config):
@ -270,15 +312,11 @@ def _rsa_decrypt_string_with_pfx(ciphertext, config):
:rtype: str
:return: decrypted cipher text
"""
if ciphertext is None or len(ciphertext) == 0:
if util.is_none_or_empty(ciphertext):
raise ValueError('invalid ciphertext to decrypt')
pfxfile = config['batch_shipyard']['encryption']['pfx']['filename']
try:
pfx_passphrase = config['batch_shipyard']['encryption']['pfx'][
'passphrase']
except KeyError:
pfx_passphrase = None
pemfile = derive_pem_from_pfx(pfxfile, pfx_passphrase, None)
pfxfile = settings.batch_shipyard_encryption_pfx_filename(config)
pfx_passphrase = settings.batch_shipyard_encryption_pfx_passphrase(config)
pemfile = derive_private_key_pem_from_pfx(pfxfile, pfx_passphrase, None)
if pemfile is None:
raise RuntimeError('cannot decrypt without valid private key')
cleartext = None
@ -289,7 +327,9 @@ def _rsa_decrypt_string_with_pfx(ciphertext, config):
stdin=subprocess.PIPE, stdout=subprocess.PIPE)
cleartext = proc.communicate(input=data)[0]
finally:
os.unlink(pemfile)
fp = pathlib.Path(pemfile)
if fp.exists():
fp.unlink()
return cleartext

Просмотреть файл

@ -566,6 +566,20 @@ def set_batch_shipyard_encryption_pfx_sha1_thumbprint(config, tp):
config['batch_shipyard']['encryption']['pfx']['sha1_thumbprint'] = tp
def batch_shipyard_encryption_public_key_pem(config):
# type: (dict) -> str
"""Get filename of pem public key
:param dict config: configuration object
:rtype: str
:return: pem filename
"""
try:
pem = config['batch_shipyard']['encryption']['public_key_pem']
except KeyError:
pem = None
return pem
def docker_registry_private_allow_public_pull(config):
# type: (dict) -> bool
"""Get public docker hub pull on missing setting