зеркало из https://github.com/github/putty.git
New test script 'agenttest.py' for testing Pageant.
Well, actually, two new test programs. agenttest.py is the actual
test; it depends on agenttestgen.py which generates a collection of
test private keys, using the newly exposed testcrypt interface to our
key generation code.
In this commit I've also factored out some Python SSH marshalling code
from cryptsuite, and moved it into a module ssh.py which the agent
tests can reuse.
(cherry picked from commit 8c7b0a787f
)
This commit is contained in:
Родитель
3b1f458a0d
Коммит
7ccc368a57
|
@ -0,0 +1,252 @@
|
|||
#!/usr/bin/python3
|
||||
|
||||
import sys
|
||||
import os
|
||||
import socket
|
||||
import base64
|
||||
import itertools
|
||||
import collections
|
||||
|
||||
from ssh import *
|
||||
import agenttestdata
|
||||
|
||||
test_session_id = b'Test16ByteSessId'
|
||||
assert len(test_session_id) == 16
|
||||
test_message_to_sign = b'test message to sign'
|
||||
|
||||
TestSig2 = collections.namedtuple("TestSig2", "flags sig")
|
||||
|
||||
class Key2(collections.namedtuple("Key2", "comment public sigs openssh")):
|
||||
def public_only(self):
|
||||
return Key2(self.comment, self.public, None, None)
|
||||
|
||||
def Add(self):
|
||||
alg = ssh_decode_string(self.public)
|
||||
msg = (ssh_byte(SSH2_AGENTC_ADD_IDENTITY) +
|
||||
ssh_string(alg) +
|
||||
self.openssh +
|
||||
ssh_string(self.comment))
|
||||
return agent_query(msg)
|
||||
|
||||
verb = "sign"
|
||||
def Use(self, flags):
|
||||
msg = (ssh_byte(SSH2_AGENTC_SIGN_REQUEST) +
|
||||
ssh_string(self.public) +
|
||||
ssh_string(test_message_to_sign))
|
||||
if flags is not None:
|
||||
msg += ssh_uint32(flags)
|
||||
rsp = agent_query(msg)
|
||||
t, rsp = ssh_decode_byte(rsp, True)
|
||||
assert t == SSH2_AGENT_SIGN_RESPONSE
|
||||
sig, rsp = ssh_decode_string(rsp, True)
|
||||
assert len(rsp) == 0
|
||||
return sig
|
||||
|
||||
def Del(self):
|
||||
msg = (ssh_byte(SSH2_AGENTC_REMOVE_IDENTITY) +
|
||||
ssh_string(self.public))
|
||||
return agent_query(msg)
|
||||
|
||||
@staticmethod
|
||||
def DelAll():
|
||||
msg = (ssh_byte(SSH2_AGENTC_REMOVE_ALL_IDENTITIES))
|
||||
return agent_query(msg)
|
||||
|
||||
@staticmethod
|
||||
def List():
|
||||
msg = (ssh_byte(SSH2_AGENTC_REQUEST_IDENTITIES))
|
||||
rsp = agent_query(msg)
|
||||
t, rsp = ssh_decode_byte(rsp, True)
|
||||
assert t == SSH2_AGENT_IDENTITIES_ANSWER
|
||||
nk, rsp = ssh_decode_uint32(rsp, True)
|
||||
keylist = []
|
||||
for _ in range(nk):
|
||||
p, rsp = ssh_decode_string(rsp, True)
|
||||
c, rsp = ssh_decode_string(rsp, True)
|
||||
keylist.append(Key2(c, p, None, None))
|
||||
assert len(rsp) == 0
|
||||
return keylist
|
||||
|
||||
@classmethod
|
||||
def make_examples(cls):
|
||||
cls.examples = agenttestdata.key2examples(cls, TestSig2)
|
||||
|
||||
def iter_testsigs(self):
|
||||
for testsig in self.sigs:
|
||||
if testsig.flags == 0:
|
||||
yield testsig._replace(flags=None)
|
||||
yield testsig
|
||||
|
||||
def iter_tests(self):
|
||||
for testsig in self.iter_testsigs():
|
||||
yield ([testsig.flags],
|
||||
" (flags={})".format(testsig.flags),
|
||||
testsig.sig)
|
||||
|
||||
class Key1(collections.namedtuple(
|
||||
"Key1", "comment public challenge response private")):
|
||||
def public_only(self):
|
||||
return Key1(self.comment, self.public, None, None, None)
|
||||
|
||||
def Add(self):
|
||||
msg = (ssh_byte(SSH1_AGENTC_ADD_RSA_IDENTITY) +
|
||||
self.private +
|
||||
ssh_string(self.comment))
|
||||
return agent_query(msg)
|
||||
|
||||
verb = "decrypt"
|
||||
def Use(self, challenge):
|
||||
msg = (ssh_byte(SSH1_AGENTC_RSA_CHALLENGE) +
|
||||
self.public +
|
||||
ssh1_mpint(challenge) +
|
||||
test_session_id +
|
||||
ssh_uint32(1))
|
||||
rsp = agent_query(msg)
|
||||
t, rsp = ssh_decode_byte(rsp, True)
|
||||
assert t == SSH1_AGENT_RSA_RESPONSE
|
||||
assert len(rsp) == 16
|
||||
return rsp
|
||||
|
||||
def Del(self):
|
||||
msg = (ssh_byte(SSH1_AGENTC_REMOVE_RSA_IDENTITY) +
|
||||
self.public)
|
||||
return agent_query(msg)
|
||||
|
||||
@staticmethod
|
||||
def DelAll():
|
||||
msg = (ssh_byte(SSH1_AGENTC_REMOVE_ALL_RSA_IDENTITIES))
|
||||
return agent_query(msg)
|
||||
|
||||
@staticmethod
|
||||
def List():
|
||||
msg = (ssh_byte(SSH1_AGENTC_REQUEST_RSA_IDENTITIES))
|
||||
rsp = agent_query(msg)
|
||||
t, rsp = ssh_decode_byte(rsp, True)
|
||||
assert t == SSH1_AGENT_RSA_IDENTITIES_ANSWER
|
||||
nk, rsp = ssh_decode_uint32(rsp, True)
|
||||
keylist = []
|
||||
for _ in range(nk):
|
||||
b, rsp = ssh_decode_uint32(rsp, True)
|
||||
e, rsp = ssh1_get_mpint(rsp, True)
|
||||
m, rsp = ssh1_get_mpint(rsp, True)
|
||||
c, rsp = ssh_decode_string(rsp, True)
|
||||
keylist.append(Key1(c, ssh_uint32(b)+e+m, None, None, None))
|
||||
assert len(rsp) == 0
|
||||
return keylist
|
||||
|
||||
@classmethod
|
||||
def make_examples(cls):
|
||||
cls.examples = agenttestdata.key1examples(cls)
|
||||
|
||||
def iter_tests(self):
|
||||
yield [self.challenge], "", self.response
|
||||
|
||||
def agent_query(msg):
|
||||
msg = ssh_string(msg)
|
||||
s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
||||
s.connect(os.environ["SSH_AUTH_SOCK"])
|
||||
s.send(msg)
|
||||
length = ssh_decode_uint32(s.recv(4))
|
||||
assert length < AGENT_MAX_MSGLEN
|
||||
return s.recv(length)
|
||||
|
||||
def enumerate_bits(iterable):
|
||||
return ((1<<j, item) for j,item in enumerate(iterable))
|
||||
|
||||
def gray_code(nbits):
|
||||
old = 0
|
||||
for i in itertools.chain(range(1, 1 << nbits), [0]):
|
||||
new = i ^ (i>>1)
|
||||
diff = new ^ old
|
||||
assert diff != 0 and (diff & (diff-1)) == 0
|
||||
yield old, new, diff
|
||||
old = new
|
||||
assert old == 0
|
||||
|
||||
class TestRunner:
|
||||
def __init__(self):
|
||||
self.ok = True
|
||||
|
||||
@staticmethod
|
||||
def fmt_response(response):
|
||||
return "'{}'".format(
|
||||
base64.encodebytes(response).decode("ASCII").replace("\n",""))
|
||||
|
||||
@staticmethod
|
||||
def fmt_keylist(keys):
|
||||
return "{{{}}}".format(
|
||||
",".join(key.comment.decode("ASCII") for key in sorted(keys)))
|
||||
|
||||
def expect_success(self, text, response):
|
||||
if response == ssh_byte(SSH_AGENT_SUCCESS):
|
||||
print(text, "=> success")
|
||||
elif response == ssh_byte(SSH_AGENT_FAILURE):
|
||||
print("FAIL!", text, "=> failure")
|
||||
self.ok = False
|
||||
else:
|
||||
print("FAIL!", text, "=>", self.fmt_response(response))
|
||||
self.ok = False
|
||||
|
||||
def check_keylist(self, K, expected_keys):
|
||||
keys = K.List()
|
||||
print("list keys =>", self.fmt_keylist(keys))
|
||||
if set(keys) != set(expected_keys):
|
||||
print("FAIL! Should have been", self.fmt_keylist(expected_keys))
|
||||
self.ok = False
|
||||
|
||||
def gray_code_test(self, K):
|
||||
bks = list(enumerate_bits(K.examples))
|
||||
|
||||
self.check_keylist(K, {})
|
||||
|
||||
for old, new, diff in gray_code(len(K.examples)):
|
||||
bit, key = next((bit, key) for bit, key in bks if diff & bit)
|
||||
|
||||
if new & bit:
|
||||
self.expect_success("insert " + key.comment.decode("ASCII"),
|
||||
key.Add())
|
||||
else:
|
||||
self.expect_success("delete " + key.comment.decode("ASCII"),
|
||||
key.Del())
|
||||
|
||||
self.check_keylist(K, [key.public_only() for bit, key in bks
|
||||
if new & bit])
|
||||
|
||||
def sign_test(self, K):
|
||||
for key in K.examples:
|
||||
for params, message, expected_answer in key.iter_tests():
|
||||
key.Add()
|
||||
actual_answer = key.Use(*params)
|
||||
key.Del()
|
||||
record = "{} with {}{}".format(
|
||||
K.verb, key.comment.decode("ASCII"), message)
|
||||
if actual_answer == expected_answer:
|
||||
print(record, "=> success")
|
||||
else:
|
||||
print("FAIL!", record, "=> {} but expected {}".format(
|
||||
self.fmt_response(actual_answer),
|
||||
self.fmt_response(expected_answer)))
|
||||
self.ok = False
|
||||
|
||||
def run(self):
|
||||
self.expect_success("init: delete all ssh2 keys", Key2.DelAll())
|
||||
|
||||
for K in [Key2, Key1]:
|
||||
self.gray_code_test(K)
|
||||
self.sign_test(K)
|
||||
|
||||
# TODO: negative tests of all kinds.
|
||||
|
||||
def main():
|
||||
Key2.make_examples()
|
||||
Key1.make_examples()
|
||||
|
||||
tr = TestRunner()
|
||||
tr.run()
|
||||
if tr.ok:
|
||||
print("Test run passed")
|
||||
else:
|
||||
sys.exit("Test run failed!")
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Различия файлов скрыты, потому что одна или несколько строк слишком длинны
|
@ -0,0 +1,89 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
def generate():
|
||||
import hashlib
|
||||
|
||||
print("""\
|
||||
# DO NOT EDIT DIRECTLY! Autogenerated by agenttestgen.py
|
||||
#
|
||||
# To regenerate, run
|
||||
# python3 agenttestgen.py > agenttestdata.py
|
||||
#
|
||||
# agenttestgen.py depends on the testcrypt system, so you must also
|
||||
# have built testcrypt in the parent directory, or else set
|
||||
# PUTTY_TESTCRYPT to point at a working implementation of it.
|
||||
|
||||
""")
|
||||
|
||||
from testcrypt import (rsa_generate, dsa_generate, ecdsa_generate,
|
||||
eddsa_generate, random_clear, random_queue,
|
||||
ssh_key_public_blob, ssh_key_openssh_blob,
|
||||
ssh_key_sign, rsa1_generate, rsa_ssh1_encrypt,
|
||||
rsa_ssh1_public_blob, rsa_ssh1_private_blob_agent,
|
||||
mp_from_bytes_be)
|
||||
from agenttest import (Key2, TestSig2, test_message_to_sign,
|
||||
Key1, test_session_id)
|
||||
import ssh
|
||||
|
||||
keygen2 = [
|
||||
('RSA-1024', lambda: rsa_generate(1024),
|
||||
(ssh.SSH_AGENT_RSA_SHA2_256, ssh.SSH_AGENT_RSA_SHA2_512)),
|
||||
('DSA-1024', lambda: dsa_generate(1024)),
|
||||
('ECDSA-p256', lambda: ecdsa_generate(256)),
|
||||
('Ed25519', lambda: eddsa_generate(256)),
|
||||
]
|
||||
|
||||
keys2 = []
|
||||
|
||||
for record in keygen2:
|
||||
if len(record) == 2:
|
||||
record += ((),)
|
||||
comment, genfn, flaglist = record
|
||||
flaglist = (0,) + flaglist
|
||||
|
||||
random_clear()
|
||||
random_queue(b''.join(hashlib.sha512('{}{:d}'.format(comment, j)
|
||||
.encode('ASCII')).digest()
|
||||
for j in range(1000)))
|
||||
key = genfn()
|
||||
sigs = [TestSig2(flags, ssh_key_sign(key, test_message_to_sign, flags))
|
||||
for flags in flaglist]
|
||||
|
||||
keys2.append(Key2(comment.encode("ASCII"),
|
||||
ssh_key_public_blob(key),
|
||||
sigs,
|
||||
ssh_key_openssh_blob(key)))
|
||||
|
||||
print("def key2examples(Key2, TestSig2):\n return {!r}".format(keys2))
|
||||
|
||||
keygen1 = [
|
||||
('RSA-1024a', 1024),
|
||||
('RSA-1024b', 1024),
|
||||
('RSA-768c', 768),
|
||||
('RSA-768d', 768),
|
||||
]
|
||||
|
||||
keys1 = []
|
||||
|
||||
for comment, bits in keygen1:
|
||||
random_clear()
|
||||
random_queue(b''.join(hashlib.sha512('{}{:d}'.format(comment, j)
|
||||
.encode('ASCII')).digest()
|
||||
for j in range(1000)))
|
||||
key = rsa1_generate(bits)
|
||||
preimage = b'Test128BitRSA1ChallengeCleartext'
|
||||
assert len(preimage) == 32
|
||||
challenge_bytes = rsa_ssh1_encrypt(preimage, key)
|
||||
assert len(challenge_bytes) > 0
|
||||
challenge = int(mp_from_bytes_be(challenge_bytes))
|
||||
response = hashlib.md5(preimage + test_session_id).digest()
|
||||
|
||||
keys1.append(Key1(comment.encode("ASCII"),
|
||||
rsa_ssh1_public_blob(key, 'exponent_first'),
|
||||
challenge, response,
|
||||
rsa_ssh1_private_blob_agent(key)))
|
||||
|
||||
print("def key1examples(Key1):\n return {!r}".format(keys1))
|
||||
|
||||
if __name__ == "__main__":
|
||||
generate()
|
|
@ -15,41 +15,16 @@ except ImportError:
|
|||
|
||||
from eccref import *
|
||||
from testcrypt import *
|
||||
from ssh import *
|
||||
|
||||
try:
|
||||
base64decode = base64.decodebytes
|
||||
except AttributeError:
|
||||
base64decode = base64.decodestring
|
||||
|
||||
def nbits(n):
|
||||
# Mimic mp_get_nbits for ordinary Python integers.
|
||||
assert 0 <= n
|
||||
smax = next(s for s in itertools.count() if (n >> (1 << s)) == 0)
|
||||
toret = 0
|
||||
for shift in reversed([1 << s for s in range(smax)]):
|
||||
if n >> shift != 0:
|
||||
n >>= shift
|
||||
toret += shift
|
||||
assert n <= 1
|
||||
if n == 1:
|
||||
toret += 1
|
||||
return toret
|
||||
|
||||
def unhex(s):
|
||||
return binascii.unhexlify(s.replace(" ", "").replace("\n", ""))
|
||||
|
||||
def ssh_uint32(n):
|
||||
return struct.pack(">L", n)
|
||||
def ssh_string(s):
|
||||
return ssh_uint32(len(s)) + s
|
||||
def ssh1_mpint(x):
|
||||
bits = nbits(x)
|
||||
bytevals = [0xFF & (x >> (8*n)) for n in range((bits-1)//8, -1, -1)]
|
||||
return struct.pack(">H" + "B" * len(bytevals), bits, *bytevals)
|
||||
def ssh2_mpint(x):
|
||||
bytevals = [0xFF & (x >> (8*n)) for n in range(nbits(x)//8, -1, -1)]
|
||||
return struct.pack(">L" + "B" * len(bytevals), len(bytevals), *bytevals)
|
||||
|
||||
def rsa_bare(e, n):
|
||||
rsa = rsa_new()
|
||||
get_rsa_ssh1_pub(ssh_uint32(nbits(n)) + ssh1_mpint(e) + ssh1_mpint(n),
|
||||
|
|
|
@ -0,0 +1,98 @@
|
|||
import struct
|
||||
import itertools
|
||||
|
||||
def nbits(n):
|
||||
# Mimic mp_get_nbits for ordinary Python integers.
|
||||
assert 0 <= n
|
||||
smax = next(s for s in itertools.count() if (n >> (1 << s)) == 0)
|
||||
toret = 0
|
||||
for shift in reversed([1 << s for s in range(smax)]):
|
||||
if n >> shift != 0:
|
||||
n >>= shift
|
||||
toret += shift
|
||||
assert n <= 1
|
||||
if n == 1:
|
||||
toret += 1
|
||||
return toret
|
||||
|
||||
def ssh_byte(n):
|
||||
return struct.pack("B", n)
|
||||
|
||||
def ssh_uint32(n):
|
||||
return struct.pack(">L", n)
|
||||
|
||||
def ssh_string(s):
|
||||
return ssh_uint32(len(s)) + s
|
||||
|
||||
def ssh1_mpint(x):
|
||||
bits = nbits(x)
|
||||
bytevals = [0xFF & (x >> (8*n)) for n in range((bits-1)//8, -1, -1)]
|
||||
return struct.pack(">H" + "B" * len(bytevals), bits, *bytevals)
|
||||
|
||||
def ssh2_mpint(x):
|
||||
bytevals = [0xFF & (x >> (8*n)) for n in range(nbits(x)//8, -1, -1)]
|
||||
return struct.pack(">L" + "B" * len(bytevals), len(bytevals), *bytevals)
|
||||
|
||||
def decoder(fn):
|
||||
def decode(s, return_rest = False):
|
||||
item, length_consumed = fn(s)
|
||||
if return_rest:
|
||||
return item, s[length_consumed:]
|
||||
else:
|
||||
return item
|
||||
return decode
|
||||
|
||||
@decoder
|
||||
def ssh_decode_byte(s):
|
||||
return struct.unpack_from("B", s, 0)[0], 1
|
||||
|
||||
@decoder
|
||||
def ssh_decode_uint32(s):
|
||||
return struct.unpack_from(">L", s, 0)[0], 4
|
||||
|
||||
@decoder
|
||||
def ssh_decode_string(s):
|
||||
length = ssh_decode_uint32(s)
|
||||
assert length + 4 <= len(s)
|
||||
return s[4:length+4], length+4
|
||||
|
||||
@decoder
|
||||
def ssh1_get_mpint(s): # returns it unconsumed, still in wire encoding
|
||||
nbits = struct.unpack_from(">H", s, 0)[0]
|
||||
nbytes = (nbits + 7) // 8
|
||||
assert nbytes + 2 <= len(s)
|
||||
return s[:nbytes+2], nbytes+2
|
||||
|
||||
@decoder
|
||||
def ssh1_decode_mpint(s):
|
||||
nbits = struct.unpack_from(">H", s, 0)[0]
|
||||
nbytes = (nbits + 7) // 8
|
||||
assert nbytes + 2 <= len(s)
|
||||
data = s[2:nbytes+2]
|
||||
v = 0
|
||||
for b in struct.unpack("B" * len(data), data):
|
||||
v = (v << 8) | b
|
||||
return v, nbytes+2
|
||||
|
||||
AGENT_MAX_MSGLEN = 262144
|
||||
|
||||
SSH1_AGENTC_REQUEST_RSA_IDENTITIES = 1
|
||||
SSH1_AGENT_RSA_IDENTITIES_ANSWER = 2
|
||||
SSH1_AGENTC_RSA_CHALLENGE = 3
|
||||
SSH1_AGENT_RSA_RESPONSE = 4
|
||||
SSH1_AGENTC_ADD_RSA_IDENTITY = 7
|
||||
SSH1_AGENTC_REMOVE_RSA_IDENTITY = 8
|
||||
SSH1_AGENTC_REMOVE_ALL_RSA_IDENTITIES = 9
|
||||
SSH_AGENT_FAILURE = 5
|
||||
SSH_AGENT_SUCCESS = 6
|
||||
SSH2_AGENTC_REQUEST_IDENTITIES = 11
|
||||
SSH2_AGENT_IDENTITIES_ANSWER = 12
|
||||
SSH2_AGENTC_SIGN_REQUEST = 13
|
||||
SSH2_AGENT_SIGN_RESPONSE = 14
|
||||
SSH2_AGENTC_ADD_IDENTITY = 17
|
||||
SSH2_AGENTC_REMOVE_IDENTITY = 18
|
||||
SSH2_AGENTC_REMOVE_ALL_IDENTITIES = 19
|
||||
SSH2_AGENTC_EXTENSION = 27
|
||||
|
||||
SSH_AGENT_RSA_SHA2_256 = 2
|
||||
SSH_AGENT_RSA_SHA2_512 = 4
|
Загрузка…
Ссылка в новой задаче