trezor: add support for Ed25519 SSH keys

nistp521
Roman Zeyde 9 years ago
parent 34cecb276a
commit 60571e65dd

@ -1,2 +1,2 @@
[MESSAGES CONTROL]
disable=invalid-name, missing-docstring, locally-disabled
disable=invalid-name, missing-docstring, locally-disabled,no-member

@ -3,14 +3,14 @@ from setuptools import setup
setup(
name='trezor_agent',
version='0.4.2',
version='0.5.0',
description='Using Trezor as hardware SSH agent',
author='Roman Zeyde',
author_email='roman.zeyde@gmail.com',
license='MIT',
url='http://github.com/romanz/trezor-agent',
packages=['trezor_agent', 'trezor_agent.trezor'],
install_requires=['ecdsa>=0.13', 'trezor>=0.6.6'],
install_requires=['ecdsa>=0.13', 'ed25519>=1.4', 'Cython>=0.23.4', 'trezor>=0.6.6'],
platforms=['POSIX'],
classifiers=[
'Development Status :: 3 - Alpha',

@ -1,6 +1,5 @@
[tox]
envlist = py27,py34
skipsdist = True
envlist = py27
[testenv]
deps=
pytest
@ -9,9 +8,8 @@ deps=
coverage
pylint
six
ecdsa
commands=
pep8 trezor_agent
pylint --report=no --rcfile .pylintrc trezor_agent
coverage run --omit='trezor_agent/__main__.py,trezor_agent/trezor/_library.py' --source trezor_agent/ -m py.test -v
coverage run --omit='trezor_agent/__main__.py' --source trezor_agent -m py.test -v trezor_agent
coverage report

@ -3,9 +3,11 @@ import re
import sys
import argparse
import subprocess
import functools
from . import trezor
from . import server
from . import formats
import logging
log = logging.getLogger(__name__)
@ -48,9 +50,13 @@ def create_agent_parser():
g = p.add_mutually_exclusive_group()
g.add_argument('-s', '--shell', default=False, action='store_true',
help='run $SHELL as subprocess under SSH agent')
help='run ${SHELL} as subprocess under SSH agent')
g.add_argument('-c', '--connect', default=False, action='store_true',
help='connect to specified host via SSH')
curves = ', '.join(sorted(formats.SUPPORTED_CURVES))
p.add_argument('-e', '--ecdsa-curve-name', metavar='CURVE',
default=formats.CURVE_NIST256,
help='specify ECDSA curve name: ' + curves)
p.add_argument('command', type=str, nargs='*', metavar='ARGUMENT',
help='command to run under the SSH agent')
return p
@ -64,21 +70,15 @@ def setup_logging(verbosity):
logging.basicConfig(format=fmt, level=level)
def ssh_command(identity):
command = ['ssh', identity.host]
if identity.user:
command += ['-l', identity.user]
if identity.port:
command += ['-p', identity.port]
return command
def ssh_sign(client, label, blob):
return client.sign_ssh_challenge(label=label, blob=blob)
def trezor_agent():
def run_agent(client_factory):
args = create_agent_parser().parse_args()
setup_logging(verbosity=args.verbose)
with trezor.Client() as client:
with client_factory(curve=args.ecdsa_curve_name) as client:
label = args.identity
command = args.command
@ -88,12 +88,11 @@ def trezor_agent():
if command:
command = ['git'] + command
identity = client.get_identity(label=label)
public_key = client.get_public_key(identity=identity)
public_key = client.get_public_key(label=label)
use_shell = False
if args.connect:
command = ssh_command(identity) + args.command
command = ['ssh', label] + args.command
log.debug('SSH connect: %r', command)
if args.shell:
@ -104,13 +103,14 @@ def trezor_agent():
sys.stdout.write(public_key)
return
def signer(label, blob):
identity = client.get_identity(label=label)
return client.sign_ssh_challenge(identity=identity, blob=blob)
try:
signer = functools.partial(ssh_sign, client=client)
with server.serve(public_keys=[public_key], signer=signer) as env:
return server.run_process(command=command, environ=env,
use_shell=use_shell)
except KeyboardInterrupt:
log.info('server stopped')
def trezor_agent():
run_agent(trezor.Client)

@ -2,15 +2,25 @@ import io
import hashlib
import base64
import ecdsa
import ed25519
from . import util
import logging
log = logging.getLogger(__name__)
DER_OCTET_STRING = b'\x04'
ECDSA_KEY_PREFIX = b'ecdsa-sha2-'
ECDSA_CURVE_NAME = b'nistp256'
# Supported ECDSA curves
CURVE_NIST256 = b'nist256p1'
CURVE_ED25519 = b'ed25519'
SUPPORTED_CURVES = {CURVE_NIST256, CURVE_ED25519}
# SSH key types
SSH_NIST256_DER_OCTET = b'\x04'
SSH_NIST256_KEY_PREFIX = b'ecdsa-sha2-'
SSH_NIST256_CURVE_NAME = b'nistp256'
SSH_NIST256_KEY_TYPE = SSH_NIST256_KEY_PREFIX + SSH_NIST256_CURVE_NAME
SSH_ED25519_KEY_TYPE = b'ssh-ed25519'
SUPPORTED_KEY_TYPES = {SSH_NIST256_KEY_TYPE, SSH_ED25519_KEY_TYPE}
hashfunc = hashlib.sha256
@ -20,73 +30,111 @@ def fingerprint(blob):
return ':'.join('{:02x}'.format(c) for c in bytearray(digest))
def parse_pubkey(blob, curve=ecdsa.NIST256p):
def parse_pubkey(blob):
fp = fingerprint(blob)
s = io.BytesIO(blob)
key_type = util.read_frame(s)
log.debug('key type: %s', key_type)
curve_name = util.read_frame(s)
log.debug('curve name: %s', curve_name)
point = util.read_frame(s)
assert s.read() == b''
_type, point = point[:1], point[1:]
assert _type == DER_OCTET_STRING
size = len(point) // 2
assert len(point) == 2 * size
coords = (util.bytes2num(point[:size]), util.bytes2num(point[size:]))
log.debug('coordinates: %s', coords)
fp = fingerprint(blob)
assert key_type in SUPPORTED_KEY_TYPES, key_type
result = {'blob': blob, 'type': key_type, 'fingerprint': fp}
if key_type == SSH_NIST256_KEY_TYPE:
curve_name = util.read_frame(s)
log.debug('curve name: %s', curve_name)
point = util.read_frame(s)
assert s.read() == b''
_type, point = point[:1], point[1:]
assert _type == SSH_NIST256_DER_OCTET
size = len(point) // 2
assert len(point) == 2 * size
coords = (util.bytes2num(point[:size]), util.bytes2num(point[size:]))
curve = ecdsa.NIST256p
point = ecdsa.ellipticcurve.Point(curve.curve, *coords)
vk = ecdsa.VerifyingKey.from_public_point(point, curve, hashfunc)
def ecdsa_verifier(sig, msg):
assert len(sig) == 2 * size
sig_decode = ecdsa.util.sigdecode_string
vk.verify(signature=sig, data=msg, sigdecode=sig_decode)
parts = [sig[:size], sig[size:]]
return b''.join([util.frame(b'\x00' + p) for p in parts])
result.update(point=coords, curve=CURVE_NIST256,
verifier=ecdsa_verifier)
if key_type == SSH_ED25519_KEY_TYPE:
pubkey = util.read_frame(s)
assert s.read() == b''
vk = ed25519.VerifyingKey(pubkey)
def ed25519_verify(sig, msg):
assert len(sig) == 64
vk.verify(sig, msg)
return sig
result.update(curve=CURVE_ED25519, verifier=ed25519_verify)
point = ecdsa.ellipticcurve.Point(curve.curve, *coords)
vk = ecdsa.VerifyingKey.from_public_point(point, curve, hashfunc)
result = {
'point': coords,
'curve': curve_name,
'fingerprint': fp,
'type': key_type,
'blob': blob,
'size': size,
'verifying_key': vk
}
return result
def decompress_pubkey(pub, curve=ecdsa.NIST256p):
P = curve.curve.p()
A = curve.curve.a()
B = curve.curve.b()
x = util.bytes2num(pub[1:33])
beta = pow(int(x*x*x+A*x+B), int((P+1)//4), int(P))
def decompress_pubkey(pub):
if pub[:1] == b'\x00':
# set by Trezor fsm_msgSignIdentity() and fsm_msgGetPublicKey()
return ed25519.VerifyingKey(pub[1:])
if pub[:1] in {b'\x02', b'\x03'}: # set by ecdsa_get_public_key33()
curve = ecdsa.NIST256p
P = curve.curve.p()
A = curve.curve.a()
B = curve.curve.b()
x = util.bytes2num(pub[1:33])
beta = pow(int(x * x * x + A * x + B), int((P + 1) // 4), int(P))
p0 = util.bytes2num(pub[:1])
y = (P-beta) if ((beta + p0) % 2) else beta
p0 = util.bytes2num(pub[:1])
y = (P - beta) if ((beta + p0) % 2) else beta
point = ecdsa.ellipticcurve.Point(curve.curve, x, y)
return ecdsa.VerifyingKey.from_public_point(point, curve=curve,
hashfunc=hashfunc)
point = ecdsa.ellipticcurve.Point(curve.curve, x, y)
return ecdsa.VerifyingKey.from_public_point(point, curve=curve,
hashfunc=hashfunc)
raise ValueError('invalid {!r}', pub)
def serialize_verifying_key(vk):
key_type = ECDSA_KEY_PREFIX + ECDSA_CURVE_NAME
curve_name = ECDSA_CURVE_NAME
key_blob = DER_OCTET_STRING + vk.to_string()
parts = [key_type, curve_name, key_blob]
return b''.join([util.frame(p) for p in parts])
if isinstance(vk, ed25519.keys.VerifyingKey):
pubkey = vk.to_bytes()
key_type = SSH_ED25519_KEY_TYPE
blob = util.frame(SSH_ED25519_KEY_TYPE) + util.frame(pubkey)
return key_type, blob
if isinstance(vk, ecdsa.keys.VerifyingKey):
curve_name = SSH_NIST256_CURVE_NAME
key_blob = SSH_NIST256_DER_OCTET + vk.to_string()
parts = [SSH_NIST256_KEY_TYPE, curve_name, key_blob]
key_type = SSH_NIST256_KEY_TYPE
blob = b''.join([util.frame(p) for p in parts])
return key_type, blob
raise TypeError('unsupported {!r}'.format(vk))
def export_public_key(pubkey, label):
blob = serialize_verifying_key(decompress_pubkey(pubkey))
assert len(pubkey) == 33
key_type, blob = serialize_verifying_key(decompress_pubkey(pubkey))
log.debug('fingerprint: %s', fingerprint(blob))
b64 = base64.b64encode(blob).decode('ascii')
key_type = ECDSA_KEY_PREFIX + ECDSA_CURVE_NAME
return '{} {} {}\n'.format(key_type.decode('ascii'), b64, label)
def import_public_key(line):
''' Parse public key textual format, as saved at .pub file '''
log.debug('loading SSH public key: %r', line)
file_type, base64blob, name = line.split()
blob = base64.b64decode(base64blob)
result = parse_pubkey(blob)
result['name'] = name.encode('ascii')
assert result['type'] == file_type.encode('ascii')
log.debug('loaded %s %s', file_type, result['fingerprint'])
log.debug('loaded %s public key: %s', file_type, result['fingerprint'])
return result

@ -1,4 +1,5 @@
import io
import binascii
from . import util
from . import formats
@ -9,15 +10,10 @@ log = logging.getLogger(__name__)
SSH_AGENTC_REQUEST_RSA_IDENTITIES = 1
SSH_AGENT_RSA_IDENTITIES_ANSWER = 2
SSH_AGENTC_REMOVE_ALL_RSA_IDENTITIES = 9
SSH2_AGENTC_REQUEST_IDENTITIES = 11
SSH2_AGENT_IDENTITIES_ANSWER = 12
SSH2_AGENTC_SIGN_REQUEST = 13
SSH2_AGENT_SIGN_RESPONSE = 14
SSH2_AGENTC_ADD_IDENTITY = 17
SSH2_AGENTC_REMOVE_IDENTITY = 18
SSH2_AGENTC_REMOVE_ALL_IDENTITIES = 19
class Error(Exception):
@ -91,23 +87,16 @@ class Handler(object):
raise MissingKey('key not found')
log.debug('signing %d-byte blob', len(blob))
r, s = self.signer(label=key['name'], blob=blob)
signature = (r, s)
log.debug('signature: %s', signature)
signature = self.signer(label=key['name'], blob=blob)
log.debug('signature: %s', binascii.hexlify(signature))
try:
key['verifying_key'].verify(signature=signature, data=blob,
sigdecode=lambda sig, _: sig)
sig_bytes = key['verifier'](sig=signature, msg=blob)
log.info('signature status: OK')
except formats.ecdsa.BadSignatureError:
log.exception('signature status: ERROR')
raise BadSignature('invalid ECDSA signature')
sig_bytes = io.BytesIO()
for x in signature:
x_frame = util.frame(b'\x00' + util.num2bytes(x, key['size']))
sig_bytes.write(x_frame)
sig_bytes = sig_bytes.getvalue()
log.debug('signature size: %d bytes', len(sig_bytes))
data = util.frame(util.frame(key['type']), util.frame(sig_bytes))

@ -1,4 +1,5 @@
import binascii
import pytest
from .. import formats
@ -27,13 +28,48 @@ def test_parse_public_key():
assert key['name'] == b'home'
assert key['point'] == _point
assert key['curve'] == b'nistp256'
assert key['curve'] == b'nist256p1'
assert key['fingerprint'] == '4b:19:bc:0f:c8:7e:dc:fa:1a:e3:c2:ff:6f:e0:80:a2' # nopep8
assert key['type'] == b'ecdsa-sha2-nistp256'
assert key['size'] == 32
def test_decompress():
blob = '036236ceabde25207e81e404586e3a3af1acda1dfed2abbbb4876c1fc5b296b575'
result = formats.export_public_key(binascii.unhexlify(blob), label='home')
assert result == _public_key
def test_parse_ed25519():
pubkey = ('ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIFBdF2tj'
'fSO8nLIi736is+f0erq28RTc7CkM11NZtTKR hello\n')
p = formats.import_public_key(pubkey)
assert p['name'] == b'hello'
assert p['curve'] == b'ed25519'
BLOB = (b'\x00\x00\x00\x0bssh-ed25519\x00\x00\x00 P]\x17kc}#'
b'\xbc\x9c\xb2"\xef~\xa2\xb3\xe7\xf4z\xba\xb6\xf1\x14'
b'\xdc\xec)\x0c\xd7SY\xb52\x91')
assert p['blob'] == BLOB
assert p['fingerprint'] == '6b:b0:77:af:e5:3a:21:6d:17:82:9b:06:19:03:a1:97' # nopep8
assert p['type'] == b'ssh-ed25519'
def test_export_ed25519():
pub = (b'\x00P]\x17kc}#\xbc\x9c\xb2"\xef~\xa2\xb3\xe7\xf4'
b'z\xba\xb6\xf1\x14\xdc\xec)\x0c\xd7SY\xb52\x91')
vk = formats.decompress_pubkey(pub)
result = formats.serialize_verifying_key(vk)
assert result == (b'ssh-ed25519',
b'\x00\x00\x00\x0bssh-ed25519\x00\x00\x00 P]\x17kc}#\xbc'
b'\x9c\xb2"\xef~\xa2\xb3\xe7\xf4z\xba\xb6\xf1\x14\xdc'
b'\xec)\x0c\xd7SY\xb52\x91')
def test_decompress_error():
with pytest.raises(ValueError):
formats.decompress_pubkey('')
def test_serialize_error():
with pytest.raises(TypeError):
formats.serialize_verifying_key(None)

@ -5,52 +5,73 @@ import pytest
# pylint: disable=line-too-long
KEY = 'ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBEUksojS/qRlTKBKLQO7CBX7a7oqFkysuFn1nJ6gzlR3wNuQXEgd7qb2bjmiiBHsjNxyWvH5SxVi3+fghrqODWo= ssh://localhost' # nopep8
BLOB = b'\x00\x00\x00 !S^\xe7\xf8\x1cKN\xde\xcbo\x0c\x83\x9e\xc48\r\xac\xeb,]"\xc1\x9bA\x0eit\xc1\x81\xd4E2\x00\x00\x00\x05roman\x00\x00\x00\x0essh-connection\x00\x00\x00\tpublickey\x01\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00h\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00\x08nistp256\x00\x00\x00A\x04E$\xb2\x88\xd2\xfe\xa4eL\xa0J-\x03\xbb\x08\x15\xfbk\xba*\x16L\xac\xb8Y\xf5\x9c\x9e\xa0\xceTw\xc0\xdb\x90\\H\x1d\xee\xa6\xf6n9\xa2\x88\x11\xec\x8c\xdcrZ\xf1\xf9K\x15b\xdf\xe7\xe0\x86\xba\x8e\rj' # nopep8
SIG = (61640221631134565789126560951398335114074531708367858563384221818711312348703, 51535548700089687831159696283235534298026173963719263249292887877395159425513) # nopep8
NIST256_KEY = 'ecdsa-sha2-nistp256 AAAAE2VjZHNhLXNoYTItbmlzdHAyNTYAAAAIbmlzdHAyNTYAAABBBEUksojS/qRlTKBKLQO7CBX7a7oqFkysuFn1nJ6gzlR3wNuQXEgd7qb2bjmiiBHsjNxyWvH5SxVi3+fghrqODWo= ssh://localhost' # nopep8
NIST256_BLOB = b'\x00\x00\x00 !S^\xe7\xf8\x1cKN\xde\xcbo\x0c\x83\x9e\xc48\r\xac\xeb,]"\xc1\x9bA\x0eit\xc1\x81\xd4E2\x00\x00\x00\x05roman\x00\x00\x00\x0essh-connection\x00\x00\x00\tpublickey\x01\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00h\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00\x08nistp256\x00\x00\x00A\x04E$\xb2\x88\xd2\xfe\xa4eL\xa0J-\x03\xbb\x08\x15\xfbk\xba*\x16L\xac\xb8Y\xf5\x9c\x9e\xa0\xceTw\xc0\xdb\x90\\H\x1d\xee\xa6\xf6n9\xa2\x88\x11\xec\x8c\xdcrZ\xf1\xf9K\x15b\xdf\xe7\xe0\x86\xba\x8e\rj' # nopep8
NIST256_SIG = b'\x88G!\x0c\n\x16:\xbeF\xbe\xb9\xd2\xa9&e\x89\xad\xc4}\x10\xf8\xbc\xdc\xef\x0e\x8d_\x8a6.\xb6\x1fq\xf0\x16>,\x9a\xde\xe7(\xd6\xd7\x93\x1f\xed\xf9\x94ddw\xfe\xbdq\x13\xbb\xfc\xa9K\xea\x9dC\xa1\xe9' # nopep8
LIST_MSG = b'\x0b'
LIST_REPLY = b'\x00\x00\x00\x84\x0c\x00\x00\x00\x01\x00\x00\x00h\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00\x08nistp256\x00\x00\x00A\x04E$\xb2\x88\xd2\xfe\xa4eL\xa0J-\x03\xbb\x08\x15\xfbk\xba*\x16L\xac\xb8Y\xf5\x9c\x9e\xa0\xceTw\xc0\xdb\x90\\H\x1d\xee\xa6\xf6n9\xa2\x88\x11\xec\x8c\xdcrZ\xf1\xf9K\x15b\xdf\xe7\xe0\x86\xba\x8e\rj\x00\x00\x00\x0fssh://localhost' # nopep8
LIST_NIST256_REPLY = b'\x00\x00\x00\x84\x0c\x00\x00\x00\x01\x00\x00\x00h\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00\x08nistp256\x00\x00\x00A\x04E$\xb2\x88\xd2\xfe\xa4eL\xa0J-\x03\xbb\x08\x15\xfbk\xba*\x16L\xac\xb8Y\xf5\x9c\x9e\xa0\xceTw\xc0\xdb\x90\\H\x1d\xee\xa6\xf6n9\xa2\x88\x11\xec\x8c\xdcrZ\xf1\xf9K\x15b\xdf\xe7\xe0\x86\xba\x8e\rj\x00\x00\x00\x0fssh://localhost' # nopep8
SIGN_MSG = b'\r\x00\x00\x00h\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00\x08nistp256\x00\x00\x00A\x04E$\xb2\x88\xd2\xfe\xa4eL\xa0J-\x03\xbb\x08\x15\xfbk\xba*\x16L\xac\xb8Y\xf5\x9c\x9e\xa0\xceTw\xc0\xdb\x90\\H\x1d\xee\xa6\xf6n9\xa2\x88\x11\xec\x8c\xdcrZ\xf1\xf9K\x15b\xdf\xe7\xe0\x86\xba\x8e\rj\x00\x00\x00\xd1\x00\x00\x00 !S^\xe7\xf8\x1cKN\xde\xcbo\x0c\x83\x9e\xc48\r\xac\xeb,]"\xc1\x9bA\x0eit\xc1\x81\xd4E2\x00\x00\x00\x05roman\x00\x00\x00\x0essh-connection\x00\x00\x00\tpublickey\x01\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00h\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00\x08nistp256\x00\x00\x00A\x04E$\xb2\x88\xd2\xfe\xa4eL\xa0J-\x03\xbb\x08\x15\xfbk\xba*\x16L\xac\xb8Y\xf5\x9c\x9e\xa0\xceTw\xc0\xdb\x90\\H\x1d\xee\xa6\xf6n9\xa2\x88\x11\xec\x8c\xdcrZ\xf1\xf9K\x15b\xdf\xe7\xe0\x86\xba\x8e\rj\x00\x00\x00\x00' # nopep8
SIGN_REPLY = b'\x00\x00\x00j\x0e\x00\x00\x00e\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00J\x00\x00\x00!\x00\x88G!\x0c\n\x16:\xbeF\xbe\xb9\xd2\xa9&e\x89\xad\xc4}\x10\xf8\xbc\xdc\xef\x0e\x8d_\x8a6.\xb6\x1f\x00\x00\x00!\x00q\xf0\x16>,\x9a\xde\xe7(\xd6\xd7\x93\x1f\xed\xf9\x94ddw\xfe\xbdq\x13\xbb\xfc\xa9K\xea\x9dC\xa1\xe9' # nopep8
NIST256_SIGN_MSG = b'\r\x00\x00\x00h\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00\x08nistp256\x00\x00\x00A\x04E$\xb2\x88\xd2\xfe\xa4eL\xa0J-\x03\xbb\x08\x15\xfbk\xba*\x16L\xac\xb8Y\xf5\x9c\x9e\xa0\xceTw\xc0\xdb\x90\\H\x1d\xee\xa6\xf6n9\xa2\x88\x11\xec\x8c\xdcrZ\xf1\xf9K\x15b\xdf\xe7\xe0\x86\xba\x8e\rj\x00\x00\x00\xd1\x00\x00\x00 !S^\xe7\xf8\x1cKN\xde\xcbo\x0c\x83\x9e\xc48\r\xac\xeb,]"\xc1\x9bA\x0eit\xc1\x81\xd4E2\x00\x00\x00\x05roman\x00\x00\x00\x0essh-connection\x00\x00\x00\tpublickey\x01\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00h\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00\x08nistp256\x00\x00\x00A\x04E$\xb2\x88\xd2\xfe\xa4eL\xa0J-\x03\xbb\x08\x15\xfbk\xba*\x16L\xac\xb8Y\xf5\x9c\x9e\xa0\xceTw\xc0\xdb\x90\\H\x1d\xee\xa6\xf6n9\xa2\x88\x11\xec\x8c\xdcrZ\xf1\xf9K\x15b\xdf\xe7\xe0\x86\xba\x8e\rj\x00\x00\x00\x00' # nopep8
NIST256_SIGN_REPLY = b'\x00\x00\x00j\x0e\x00\x00\x00e\x00\x00\x00\x13ecdsa-sha2-nistp256\x00\x00\x00J\x00\x00\x00!\x00\x88G!\x0c\n\x16:\xbeF\xbe\xb9\xd2\xa9&e\x89\xad\xc4}\x10\xf8\xbc\xdc\xef\x0e\x8d_\x8a6.\xb6\x1f\x00\x00\x00!\x00q\xf0\x16>,\x9a\xde\xe7(\xd6\xd7\x93\x1f\xed\xf9\x94ddw\xfe\xbdq\x13\xbb\xfc\xa9K\xea\x9dC\xa1\xe9' # nopep8
def test_list():
key = formats.import_public_key(KEY)
key = formats.import_public_key(NIST256_KEY)
h = protocol.Handler(keys=[key], signer=None)
reply = h.handle(LIST_MSG)
assert reply == LIST_REPLY
assert reply == LIST_NIST256_REPLY
def signer(label, blob):
def ecdsa_signer(label, blob):
assert label == b'ssh://localhost'
assert blob == BLOB
return SIG
assert blob == NIST256_BLOB
return NIST256_SIG
def test_sign():
key = formats.import_public_key(KEY)
h = protocol.Handler(keys=[key], signer=signer)
reply = h.handle(SIGN_MSG)
assert reply == SIGN_REPLY
def test_ecdsa_sign():
key = formats.import_public_key(NIST256_KEY)
h = protocol.Handler(keys=[key], signer=ecdsa_signer)
reply = h.handle(NIST256_SIGN_MSG)
assert reply == NIST256_SIGN_REPLY
def test_sign_missing():
h = protocol.Handler(keys=[], signer=signer)
h = protocol.Handler(keys=[], signer=ecdsa_signer)
with pytest.raises(protocol.MissingKey):
h.handle(SIGN_MSG)
h.handle(NIST256_SIGN_MSG)
def test_sign_wrong():
def wrong_signature(label, blob):
assert label == b'ssh://localhost'
assert blob == BLOB
return (0, 0)
assert blob == NIST256_BLOB
return b'\x00' * 64
key = formats.import_public_key(KEY)
key = formats.import_public_key(NIST256_KEY)
h = protocol.Handler(keys=[key], signer=wrong_signature)
with pytest.raises(protocol.BadSignature):
h.handle(SIGN_MSG)
h.handle(NIST256_SIGN_MSG)
ED25519_KEY = 'ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIFBdF2tjfSO8nLIi736is+f0erq28RTc7CkM11NZtTKR ssh://localhost' # nopep8
ED25519_SIGN_MSG = b'''\r\x00\x00\x003\x00\x00\x00\x0bssh-ed25519\x00\x00\x00 P]\x17kc}#\xbc\x9c\xb2"\xef~\xa2\xb3\xe7\xf4z\xba\xb6\xf1\x14\xdc\xec)\x0c\xd7SY\xb52\x91\x00\x00\x00\x94\x00\x00\x00 i3\xae}yk\\\xa1L\xb9\xe1\xbf\xbc\x8e\x87\r\x0e\xc0\x9f\x97\x0fTC!\x80\x07\x91\xdb^8\xc1\xd62\x00\x00\x00\x05roman\x00\x00\x00\x0essh-connection\x00\x00\x00\tpublickey\x01\x00\x00\x00\x0bssh-ed25519\x00\x00\x003\x00\x00\x00\x0bssh-ed25519\x00\x00\x00 P]\x17kc}#\xbc\x9c\xb2"\xef~\xa2\xb3\xe7\xf4z\xba\xb6\xf1\x14\xdc\xec)\x0c\xd7SY\xb52\x91\x00\x00\x00\x00''' # nopep8
ED25519_SIGN_REPLY = b'''\x00\x00\x00X\x0e\x00\x00\x00S\x00\x00\x00\x0bssh-ed25519\x00\x00\x00@\x8eb)\xa6\xe9P\x83VE\xfbq\xc6\xbf\x1dV3\xe3<O\x11\xc0\xfa\xe4\xed\xb8\x81.\x81\xc8\xa6\xba\x10RA'a\xbc\xa9\xd3\xdb\x98\x07\xf0\x1a\x9c4\x84<\xaf\x99\xb7\xe5G\xeb\xf7$\xc1\r\x86f\x16\x8e\x08\x05''' # nopep8
ED25519_BLOB = b'''\x00\x00\x00 i3\xae}yk\\\xa1L\xb9\xe1\xbf\xbc\x8e\x87\r\x0e\xc0\x9f\x97\x0fTC!\x80\x07\x91\xdb^8\xc1\xd62\x00\x00\x00\x05roman\x00\x00\x00\x0essh-connection\x00\x00\x00\tpublickey\x01\x00\x00\x00\x0bssh-ed25519\x00\x00\x003\x00\x00\x00\x0bssh-ed25519\x00\x00\x00 P]\x17kc}#\xbc\x9c\xb2"\xef~\xa2\xb3\xe7\xf4z\xba\xb6\xf1\x14\xdc\xec)\x0c\xd7SY\xb52\x91''' # nopep8
ED25519_SIG = b'''\x8eb)\xa6\xe9P\x83VE\xfbq\xc6\xbf\x1dV3\xe3<O\x11\xc0\xfa\xe4\xed\xb8\x81.\x81\xc8\xa6\xba\x10RA'a\xbc\xa9\xd3\xdb\x98\x07\xf0\x1a\x9c4\x84<\xaf\x99\xb7\xe5G\xeb\xf7$\xc1\r\x86f\x16\x8e\x08\x05''' # nopep8
def ed25519_signer(label, blob):
assert label == b'ssh://localhost'
assert blob == ED25519_BLOB
return ED25519_SIG
def test_ed25519_sign():
key = formats.import_public_key(ED25519_KEY)
h = protocol.Handler(keys=[key], signer=ed25519_signer)
reply = h.handle(ED25519_SIGN_MSG)
assert reply == ED25519_SIGN_REPLY

@ -34,10 +34,10 @@ class ConnectionMock(object):
def clear_session(self):
self.closed = True
def get_public_node(self, n, ecdsa_curve_name='secp256k1'):
def get_public_node(self, n, ecdsa_curve_name=b'secp256k1'):
assert not self.closed
assert n == ADDR
assert ecdsa_curve_name in {'secp256k1', 'nist256p1'}
assert ecdsa_curve_name in {b'secp256k1', b'nist256p1'}
result = mock.Mock(spec=[])
result.node = mock.Mock(spec=[])
result.node.public_key = PUBKEY
@ -80,8 +80,9 @@ SIG = (b'\x00R\x19T\xf2\x84$\xef#\x0e\xee\x04X\xc6\xc3\x99T`\xd1\xd8\xf7!'
def test_ssh_agent():
label = 'localhost:22'
c = client.Client(factory=FactoryMock)
ident = c.get_identity(label='localhost:22')
ident = c.get_identity(label=label)
assert ident.host == 'localhost'
assert ident.proto == 'ssh'
assert ident.port == '22'
@ -89,14 +90,13 @@ def test_ssh_agent():
assert ident.path is None
with c:
assert c.get_public_key(ident) == PUBKEY_TEXT
assert c.get_public_key(label) == PUBKEY_TEXT
def ssh_sign_identity(identity, challenge_hidden,
challenge_visual, ecdsa_curve_name):
assert identity is ident
assert challenge_hidden == BLOB
assert challenge_visual == identity.path
assert ecdsa_curve_name == 'nist256p1'
assert ecdsa_curve_name == b'nist256p1'
result = mock.Mock(spec=[])
result.public_key = PUBKEY
@ -104,11 +104,10 @@ def test_ssh_agent():
return result
c.client.sign_identity = ssh_sign_identity
signature = c.sign_ssh_challenge(identity=ident, blob=BLOB)
signature = c.sign_ssh_challenge(label=label, blob=BLOB)
key = formats.import_public_key(PUBKEY_TEXT)
assert key['verifying_key'].verify(signature=signature, data=BLOB,
sigdecode=lambda sig, _: sig)
assert key['verifier'](sig=signature, msg=BLOB)
def test_utils():

@ -15,7 +15,8 @@ class Client(object):
MIN_VERSION = [1, 3, 4]
def __init__(self, factory=TrezorFactory):
def __init__(self, factory=TrezorFactory, curve=formats.CURVE_NIST256):
self.curve = curve
self.factory = factory
self.client = self.factory.client()
f = self.client.features
@ -46,20 +47,20 @@ class Client(object):
identity.proto = 'ssh'
return identity
def get_public_key(self, identity):
assert identity.proto == 'ssh'
label = identity_to_string(identity)
log.info('getting "%s" public key from Trezor...', label)
def get_public_key(self, label):
identity = self.get_identity(label=label)
label = identity_to_string(identity) # canonize key label
log.info('getting "%s" public key (%s) from Trezor...',
label, self.curve)
addr = _get_address(identity)
node = self.client.get_public_node(n=addr,
ecdsa_curve_name='nist256p1')
ecdsa_curve_name=self.curve)
pubkey = node.node.public_key
return formats.export_public_key(pubkey=pubkey, label=label)
def sign_ssh_challenge(self, identity, blob):
assert identity.proto == 'ssh'
label = identity_to_string(identity)
def sign_ssh_challenge(self, label, blob):
identity = self.get_identity(label=label)
msg = _parse_ssh_blob(blob)
log.info('please confirm user "%s" login to "%s" using Trezor...',
@ -69,21 +70,16 @@ class Client(object):
result = self.client.sign_identity(identity=identity,
challenge_hidden=blob,
challenge_visual=visual,
ecdsa_curve_name='nist256p1')
ecdsa_curve_name=self.curve)
verifying_key = formats.decompress_pubkey(result.public_key)
public_key_blob = formats.serialize_verifying_key(verifying_key)
assert public_key_blob == msg['public_key']['blob']
key_type, blob = formats.serialize_verifying_key(verifying_key)
assert blob == msg['public_key']['blob']
assert key_type == msg['key_type']
assert len(result.signature) == 65
assert result.signature[:1] == bytearray([0])
return parse_signature(result.signature)
def parse_signature(blob):
sig = blob[1:]
r = util.bytes2num(sig[:32])
s = util.bytes2num(sig[32:])
return (r, s)
return result.signature[1:]
_identity_regexp = re.compile(''.join([

Loading…
Cancel
Save