docker-py/tests/unit/auth_test.py

799 строки
28 KiB
Python

import base64
import json
import os
import os.path
import random
import shutil
import tempfile
import unittest
from unittest import mock
import pytest
from docker import auth, credentials, errors
class RegressionTest(unittest.TestCase):
def test_803_urlsafe_encode(self):
auth_data = {
'username': 'root',
'password': 'GR?XGR?XGR?XGR?X'
}
encoded = auth.encode_header(auth_data)
assert b'/' not in encoded
assert b'_' in encoded
class ResolveRepositoryNameTest(unittest.TestCase):
def test_resolve_repository_name_hub_library_image(self):
assert auth.resolve_repository_name('image') == (
'docker.io', 'image'
)
def test_resolve_repository_name_dotted_hub_library_image(self):
assert auth.resolve_repository_name('image.valid') == (
'docker.io', 'image.valid'
)
def test_resolve_repository_name_hub_image(self):
assert auth.resolve_repository_name('username/image') == (
'docker.io', 'username/image'
)
def test_explicit_hub_index_library_image(self):
assert auth.resolve_repository_name('docker.io/image') == (
'docker.io', 'image'
)
def test_explicit_legacy_hub_index_library_image(self):
assert auth.resolve_repository_name('index.docker.io/image') == (
'docker.io', 'image'
)
def test_resolve_repository_name_private_registry(self):
assert auth.resolve_repository_name('my.registry.net/image') == (
'my.registry.net', 'image'
)
def test_resolve_repository_name_private_registry_with_port(self):
assert auth.resolve_repository_name('my.registry.net:5000/image') == (
'my.registry.net:5000', 'image'
)
def test_resolve_repository_name_private_registry_with_username(self):
assert auth.resolve_repository_name(
'my.registry.net/username/image'
) == ('my.registry.net', 'username/image')
def test_resolve_repository_name_no_dots_but_port(self):
assert auth.resolve_repository_name('hostname:5000/image') == (
'hostname:5000', 'image'
)
def test_resolve_repository_name_no_dots_but_port_and_username(self):
assert auth.resolve_repository_name(
'hostname:5000/username/image'
) == ('hostname:5000', 'username/image')
def test_resolve_repository_name_localhost(self):
assert auth.resolve_repository_name('localhost/image') == (
'localhost', 'image'
)
def test_resolve_repository_name_localhost_with_username(self):
assert auth.resolve_repository_name('localhost/username/image') == (
'localhost', 'username/image'
)
def test_invalid_index_name(self):
with pytest.raises(errors.InvalidRepository):
auth.resolve_repository_name('-gecko.com/image')
def encode_auth(auth_info):
return base64.b64encode(
auth_info.get('username', '').encode('utf-8') + b':' +
auth_info.get('password', '').encode('utf-8'))
class ResolveAuthTest(unittest.TestCase):
index_config = {'auth': encode_auth({'username': 'indexuser'})}
private_config = {'auth': encode_auth({'username': 'privateuser'})}
legacy_config = {'auth': encode_auth({'username': 'legacyauth'})}
auth_config = auth.AuthConfig({
'auths': auth.parse_auth({
'https://index.docker.io/v1/': index_config,
'my.registry.net': private_config,
'http://legacy.registry.url/v1/': legacy_config,
})
})
def test_resolve_authconfig_hostname_only(self):
assert auth.resolve_authconfig(
self.auth_config, 'my.registry.net'
)['username'] == 'privateuser'
def test_resolve_authconfig_no_protocol(self):
assert auth.resolve_authconfig(
self.auth_config, 'my.registry.net/v1/'
)['username'] == 'privateuser'
def test_resolve_authconfig_no_path(self):
assert auth.resolve_authconfig(
self.auth_config, 'http://my.registry.net'
)['username'] == 'privateuser'
def test_resolve_authconfig_no_path_trailing_slash(self):
assert auth.resolve_authconfig(
self.auth_config, 'http://my.registry.net/'
)['username'] == 'privateuser'
def test_resolve_authconfig_no_path_wrong_secure_proto(self):
assert auth.resolve_authconfig(
self.auth_config, 'https://my.registry.net'
)['username'] == 'privateuser'
def test_resolve_authconfig_no_path_wrong_insecure_proto(self):
assert auth.resolve_authconfig(
self.auth_config, 'http://index.docker.io'
)['username'] == 'indexuser'
def test_resolve_authconfig_path_wrong_proto(self):
assert auth.resolve_authconfig(
self.auth_config, 'https://my.registry.net/v1/'
)['username'] == 'privateuser'
def test_resolve_authconfig_default_registry(self):
assert auth.resolve_authconfig(
self.auth_config
)['username'] == 'indexuser'
def test_resolve_authconfig_default_explicit_none(self):
assert auth.resolve_authconfig(
self.auth_config, None
)['username'] == 'indexuser'
def test_resolve_authconfig_fully_explicit(self):
assert auth.resolve_authconfig(
self.auth_config, 'http://my.registry.net/v1/'
)['username'] == 'privateuser'
def test_resolve_authconfig_legacy_config(self):
assert auth.resolve_authconfig(
self.auth_config, 'legacy.registry.url'
)['username'] == 'legacyauth'
def test_resolve_authconfig_no_match(self):
assert auth.resolve_authconfig(
self.auth_config, 'does.not.exist'
) is None
def test_resolve_registry_and_auth_library_image(self):
image = 'image'
assert auth.resolve_authconfig(
self.auth_config, auth.resolve_repository_name(image)[0]
)['username'] == 'indexuser'
def test_resolve_registry_and_auth_hub_image(self):
image = 'username/image'
assert auth.resolve_authconfig(
self.auth_config, auth.resolve_repository_name(image)[0]
)['username'] == 'indexuser'
def test_resolve_registry_and_auth_explicit_hub(self):
image = 'docker.io/username/image'
assert auth.resolve_authconfig(
self.auth_config, auth.resolve_repository_name(image)[0]
)['username'] == 'indexuser'
def test_resolve_registry_and_auth_explicit_legacy_hub(self):
image = 'index.docker.io/username/image'
assert auth.resolve_authconfig(
self.auth_config, auth.resolve_repository_name(image)[0]
)['username'] == 'indexuser'
def test_resolve_registry_and_auth_private_registry(self):
image = 'my.registry.net/image'
assert auth.resolve_authconfig(
self.auth_config, auth.resolve_repository_name(image)[0]
)['username'] == 'privateuser'
def test_resolve_registry_and_auth_unauthenticated_registry(self):
image = 'other.registry.net/image'
assert auth.resolve_authconfig(
self.auth_config, auth.resolve_repository_name(image)[0]
) is None
def test_resolve_auth_with_empty_credstore_and_auth_dict(self):
auth_config = auth.AuthConfig({
'auths': auth.parse_auth({
'https://index.docker.io/v1/': self.index_config,
}),
'credsStore': 'blackbox'
})
with mock.patch(
'docker.auth.AuthConfig._resolve_authconfig_credstore'
) as m:
m.return_value = None
assert 'indexuser' == auth.resolve_authconfig(
auth_config, None
)['username']
class LoadConfigTest(unittest.TestCase):
def test_load_config_no_file(self):
folder = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, folder)
cfg = auth.load_config(folder)
assert cfg is not None
def test_load_legacy_config(self):
folder = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, folder)
cfg_path = os.path.join(folder, '.dockercfg')
auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
with open(cfg_path, 'w') as f:
f.write(f'auth = {auth_}\n')
f.write('email = sakuya@scarlet.net')
cfg = auth.load_config(cfg_path)
assert auth.resolve_authconfig(cfg) is not None
assert cfg.auths[auth.INDEX_NAME] is not None
cfg = cfg.auths[auth.INDEX_NAME]
assert cfg['username'] == 'sakuya'
assert cfg['password'] == 'izayoi'
assert cfg['email'] == 'sakuya@scarlet.net'
assert cfg.get('Auth') is None
def test_load_json_config(self):
folder = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, folder)
cfg_path = os.path.join(folder, '.dockercfg')
auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
email = 'sakuya@scarlet.net'
with open(cfg_path, 'w') as f:
json.dump(
{auth.INDEX_URL: {'auth': auth_, 'email': email}}, f
)
cfg = auth.load_config(cfg_path)
assert auth.resolve_authconfig(cfg) is not None
assert cfg.auths[auth.INDEX_URL] is not None
cfg = cfg.auths[auth.INDEX_URL]
assert cfg['username'] == 'sakuya'
assert cfg['password'] == 'izayoi'
assert cfg['email'] == email
assert cfg.get('Auth') is None
def test_load_modern_json_config(self):
folder = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, folder)
cfg_path = os.path.join(folder, 'config.json')
auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
email = 'sakuya@scarlet.net'
with open(cfg_path, 'w') as f:
json.dump({
'auths': {
auth.INDEX_URL: {
'auth': auth_, 'email': email
}
}
}, f)
cfg = auth.load_config(cfg_path)
assert auth.resolve_authconfig(cfg) is not None
assert cfg.auths[auth.INDEX_URL] is not None
cfg = cfg.auths[auth.INDEX_URL]
assert cfg['username'] == 'sakuya'
assert cfg['password'] == 'izayoi'
assert cfg['email'] == email
def test_load_config_with_random_name(self):
folder = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, folder)
dockercfg_path = os.path.join(
folder,
f'.{random.randrange(100000)}.dockercfg',
)
registry = 'https://your.private.registry.io'
auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
config = {
registry: {
'auth': f'{auth_}',
'email': 'sakuya@scarlet.net'
}
}
with open(dockercfg_path, 'w') as f:
json.dump(config, f)
cfg = auth.load_config(dockercfg_path).auths
assert registry in cfg
assert cfg[registry] is not None
cfg = cfg[registry]
assert cfg['username'] == 'sakuya'
assert cfg['password'] == 'izayoi'
assert cfg['email'] == 'sakuya@scarlet.net'
assert cfg.get('auth') is None
def test_load_config_custom_config_env(self):
folder = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, folder)
dockercfg_path = os.path.join(folder, 'config.json')
registry = 'https://your.private.registry.io'
auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
config = {
registry: {
'auth': f'{auth_}',
'email': 'sakuya@scarlet.net'
}
}
with open(dockercfg_path, 'w') as f:
json.dump(config, f)
with mock.patch.dict(os.environ, {'DOCKER_CONFIG': folder}):
cfg = auth.load_config(None).auths
assert registry in cfg
assert cfg[registry] is not None
cfg = cfg[registry]
assert cfg['username'] == 'sakuya'
assert cfg['password'] == 'izayoi'
assert cfg['email'] == 'sakuya@scarlet.net'
assert cfg.get('auth') is None
def test_load_config_custom_config_env_with_auths(self):
folder = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, folder)
dockercfg_path = os.path.join(folder, 'config.json')
registry = 'https://your.private.registry.io'
auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
config = {
'auths': {
registry: {
'auth': f'{auth_}',
'email': 'sakuya@scarlet.net'
}
}
}
with open(dockercfg_path, 'w') as f:
json.dump(config, f)
with mock.patch.dict(os.environ, {'DOCKER_CONFIG': folder}):
cfg = auth.load_config(None)
assert registry in cfg.auths
cfg = cfg.auths[registry]
assert cfg['username'] == 'sakuya'
assert cfg['password'] == 'izayoi'
assert cfg['email'] == 'sakuya@scarlet.net'
assert cfg.get('auth') is None
def test_load_config_custom_config_env_utf8(self):
folder = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, folder)
dockercfg_path = os.path.join(folder, 'config.json')
registry = 'https://your.private.registry.io'
auth_ = base64.b64encode(
b'sakuya\xc3\xa6:izayoi\xc3\xa6').decode('ascii')
config = {
'auths': {
registry: {
'auth': f'{auth_}',
'email': 'sakuya@scarlet.net'
}
}
}
with open(dockercfg_path, 'w') as f:
json.dump(config, f)
with mock.patch.dict(os.environ, {'DOCKER_CONFIG': folder}):
cfg = auth.load_config(None)
assert registry in cfg.auths
cfg = cfg.auths[registry]
assert cfg['username'] == b'sakuya\xc3\xa6'.decode('utf8')
assert cfg['password'] == b'izayoi\xc3\xa6'.decode('utf8')
assert cfg['email'] == 'sakuya@scarlet.net'
assert cfg.get('auth') is None
def test_load_config_unknown_keys(self):
folder = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, folder)
dockercfg_path = os.path.join(folder, 'config.json')
config = {
'detachKeys': 'ctrl-q, ctrl-u, ctrl-i'
}
with open(dockercfg_path, 'w') as f:
json.dump(config, f)
cfg = auth.load_config(dockercfg_path)
assert dict(cfg) == {'auths': {}}
def test_load_config_invalid_auth_dict(self):
folder = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, folder)
dockercfg_path = os.path.join(folder, 'config.json')
config = {
'auths': {
'scarlet.net': {'sakuya': 'izayoi'}
}
}
with open(dockercfg_path, 'w') as f:
json.dump(config, f)
cfg = auth.load_config(dockercfg_path)
assert dict(cfg) == {'auths': {'scarlet.net': {}}}
def test_load_config_identity_token(self):
folder = tempfile.mkdtemp()
registry = 'scarlet.net'
token = '1ce1cebb-503e-7043-11aa-7feb8bd4a1ce'
self.addCleanup(shutil.rmtree, folder)
dockercfg_path = os.path.join(folder, 'config.json')
auth_entry = encode_auth({'username': 'sakuya'}).decode('ascii')
config = {
'auths': {
registry: {
'auth': auth_entry,
'identitytoken': token
}
}
}
with open(dockercfg_path, 'w') as f:
json.dump(config, f)
cfg = auth.load_config(dockercfg_path)
assert registry in cfg.auths
cfg = cfg.auths[registry]
assert 'IdentityToken' in cfg
assert cfg['IdentityToken'] == token
class CredstoreTest(unittest.TestCase):
def setUp(self):
self.authconfig = auth.AuthConfig({'credsStore': 'default'})
self.default_store = InMemoryStore('default')
self.authconfig._stores['default'] = self.default_store
self.default_store.store(
'https://gensokyo.jp/v2', 'sakuya', 'izayoi',
)
self.default_store.store(
'https://default.com/v2', 'user', 'hunter2',
)
def test_get_credential_store(self):
auth_config = auth.AuthConfig({
'credHelpers': {
'registry1.io': 'truesecret',
'registry2.io': 'powerlock'
},
'credsStore': 'blackbox',
})
assert auth_config.get_credential_store('registry1.io') == 'truesecret'
assert auth_config.get_credential_store('registry2.io') == 'powerlock'
assert auth_config.get_credential_store('registry3.io') == 'blackbox'
def test_get_credential_store_no_default(self):
auth_config = auth.AuthConfig({
'credHelpers': {
'registry1.io': 'truesecret',
'registry2.io': 'powerlock'
},
})
assert auth_config.get_credential_store('registry2.io') == 'powerlock'
assert auth_config.get_credential_store('registry3.io') is None
def test_get_credential_store_default_index(self):
auth_config = auth.AuthConfig({
'credHelpers': {
'https://index.docker.io/v1/': 'powerlock'
},
'credsStore': 'truesecret'
})
assert auth_config.get_credential_store(None) == 'powerlock'
assert auth_config.get_credential_store('docker.io') == 'powerlock'
assert auth_config.get_credential_store('images.io') == 'truesecret'
def test_get_credential_store_with_plain_dict(self):
auth_config = {
'credHelpers': {
'registry1.io': 'truesecret',
'registry2.io': 'powerlock'
},
'credsStore': 'blackbox',
}
assert auth.get_credential_store(
auth_config, 'registry1.io'
) == 'truesecret'
assert auth.get_credential_store(
auth_config, 'registry2.io'
) == 'powerlock'
assert auth.get_credential_store(
auth_config, 'registry3.io'
) == 'blackbox'
def test_get_all_credentials_credstore_only(self):
assert self.authconfig.get_all_credentials() == {
'https://gensokyo.jp/v2': {
'Username': 'sakuya',
'Password': 'izayoi',
'ServerAddress': 'https://gensokyo.jp/v2',
},
'gensokyo.jp': {
'Username': 'sakuya',
'Password': 'izayoi',
'ServerAddress': 'https://gensokyo.jp/v2',
},
'https://default.com/v2': {
'Username': 'user',
'Password': 'hunter2',
'ServerAddress': 'https://default.com/v2',
},
'default.com': {
'Username': 'user',
'Password': 'hunter2',
'ServerAddress': 'https://default.com/v2',
},
}
def test_get_all_credentials_with_empty_credhelper(self):
self.authconfig['credHelpers'] = {
'registry1.io': 'truesecret',
}
self.authconfig._stores['truesecret'] = InMemoryStore()
assert self.authconfig.get_all_credentials() == {
'https://gensokyo.jp/v2': {
'Username': 'sakuya',
'Password': 'izayoi',
'ServerAddress': 'https://gensokyo.jp/v2',
},
'gensokyo.jp': {
'Username': 'sakuya',
'Password': 'izayoi',
'ServerAddress': 'https://gensokyo.jp/v2',
},
'https://default.com/v2': {
'Username': 'user',
'Password': 'hunter2',
'ServerAddress': 'https://default.com/v2',
},
'default.com': {
'Username': 'user',
'Password': 'hunter2',
'ServerAddress': 'https://default.com/v2',
},
'registry1.io': None,
}
def test_get_all_credentials_with_credhelpers_only(self):
del self.authconfig['credsStore']
assert self.authconfig.get_all_credentials() == {}
self.authconfig['credHelpers'] = {
'https://gensokyo.jp/v2': 'default',
'https://default.com/v2': 'default',
}
assert self.authconfig.get_all_credentials() == {
'https://gensokyo.jp/v2': {
'Username': 'sakuya',
'Password': 'izayoi',
'ServerAddress': 'https://gensokyo.jp/v2',
},
'gensokyo.jp': {
'Username': 'sakuya',
'Password': 'izayoi',
'ServerAddress': 'https://gensokyo.jp/v2',
},
'https://default.com/v2': {
'Username': 'user',
'Password': 'hunter2',
'ServerAddress': 'https://default.com/v2',
},
'default.com': {
'Username': 'user',
'Password': 'hunter2',
'ServerAddress': 'https://default.com/v2',
},
}
def test_get_all_credentials_with_auths_entries(self):
self.authconfig.add_auth('registry1.io', {
'ServerAddress': 'registry1.io',
'Username': 'reimu',
'Password': 'hakurei',
})
assert self.authconfig.get_all_credentials() == {
'https://gensokyo.jp/v2': {
'Username': 'sakuya',
'Password': 'izayoi',
'ServerAddress': 'https://gensokyo.jp/v2',
},
'gensokyo.jp': {
'Username': 'sakuya',
'Password': 'izayoi',
'ServerAddress': 'https://gensokyo.jp/v2',
},
'https://default.com/v2': {
'Username': 'user',
'Password': 'hunter2',
'ServerAddress': 'https://default.com/v2',
},
'default.com': {
'Username': 'user',
'Password': 'hunter2',
'ServerAddress': 'https://default.com/v2',
},
'registry1.io': {
'ServerAddress': 'registry1.io',
'Username': 'reimu',
'Password': 'hakurei',
},
}
def test_get_all_credentials_with_empty_auths_entry(self):
self.authconfig.add_auth('default.com', {})
assert self.authconfig.get_all_credentials() == {
'https://gensokyo.jp/v2': {
'Username': 'sakuya',
'Password': 'izayoi',
'ServerAddress': 'https://gensokyo.jp/v2',
},
'gensokyo.jp': {
'Username': 'sakuya',
'Password': 'izayoi',
'ServerAddress': 'https://gensokyo.jp/v2',
},
'https://default.com/v2': {
'Username': 'user',
'Password': 'hunter2',
'ServerAddress': 'https://default.com/v2',
},
'default.com': {
'Username': 'user',
'Password': 'hunter2',
'ServerAddress': 'https://default.com/v2',
},
}
def test_get_all_credentials_credstore_overrides_auth_entry(self):
self.authconfig.add_auth('default.com', {
'Username': 'shouldnotsee',
'Password': 'thisentry',
'ServerAddress': 'https://default.com/v2',
})
assert self.authconfig.get_all_credentials() == {
'https://gensokyo.jp/v2': {
'Username': 'sakuya',
'Password': 'izayoi',
'ServerAddress': 'https://gensokyo.jp/v2',
},
'gensokyo.jp': {
'Username': 'sakuya',
'Password': 'izayoi',
'ServerAddress': 'https://gensokyo.jp/v2',
},
'https://default.com/v2': {
'Username': 'user',
'Password': 'hunter2',
'ServerAddress': 'https://default.com/v2',
},
'default.com': {
'Username': 'user',
'Password': 'hunter2',
'ServerAddress': 'https://default.com/v2',
},
}
def test_get_all_credentials_helpers_override_default(self):
self.authconfig['credHelpers'] = {
'https://default.com/v2': 'truesecret',
}
truesecret = InMemoryStore('truesecret')
truesecret.store('https://default.com/v2', 'reimu', 'hakurei')
self.authconfig._stores['truesecret'] = truesecret
assert self.authconfig.get_all_credentials() == {
'https://gensokyo.jp/v2': {
'Username': 'sakuya',
'Password': 'izayoi',
'ServerAddress': 'https://gensokyo.jp/v2',
},
'gensokyo.jp': {
'Username': 'sakuya',
'Password': 'izayoi',
'ServerAddress': 'https://gensokyo.jp/v2',
},
'https://default.com/v2': {
'Username': 'reimu',
'Password': 'hakurei',
'ServerAddress': 'https://default.com/v2',
},
'default.com': {
'Username': 'reimu',
'Password': 'hakurei',
'ServerAddress': 'https://default.com/v2',
},
}
def test_get_all_credentials_3_sources(self):
self.authconfig['credHelpers'] = {
'registry1.io': 'truesecret',
}
truesecret = InMemoryStore('truesecret')
truesecret.store('registry1.io', 'reimu', 'hakurei')
self.authconfig._stores['truesecret'] = truesecret
self.authconfig.add_auth('registry2.io', {
'ServerAddress': 'registry2.io',
'Username': 'reimu',
'Password': 'hakurei',
})
assert self.authconfig.get_all_credentials() == {
'https://gensokyo.jp/v2': {
'Username': 'sakuya',
'Password': 'izayoi',
'ServerAddress': 'https://gensokyo.jp/v2',
},
'gensokyo.jp': {
'Username': 'sakuya',
'Password': 'izayoi',
'ServerAddress': 'https://gensokyo.jp/v2',
},
'https://default.com/v2': {
'Username': 'user',
'Password': 'hunter2',
'ServerAddress': 'https://default.com/v2',
},
'default.com': {
'Username': 'user',
'Password': 'hunter2',
'ServerAddress': 'https://default.com/v2',
},
'registry1.io': {
'ServerAddress': 'registry1.io',
'Username': 'reimu',
'Password': 'hakurei',
},
'registry2.io': {
'ServerAddress': 'registry2.io',
'Username': 'reimu',
'Password': 'hakurei',
}
}
class InMemoryStore(credentials.Store):
def __init__(self, *args, **kwargs):
self.__store = {}
def get(self, server):
try:
return self.__store[server]
except KeyError as ke:
raise credentials.errors.CredentialsNotFound() from ke
def store(self, server, username, secret):
self.__store[server] = {
'ServerURL': server,
'Username': username,
'Secret': secret,
}
def list(self):
return {
k: v['Username'] for k, v in self.__store.items()
}
def erase(self, server):
del self.__store[server]