Merge pull request #51 from romanz/curve25519

Curve25519
nistp521
Roman Zeyde 8 years ago committed by GitHub
commit b4a382d22e

@ -11,11 +11,15 @@ from . import util
log = logging.getLogger(__name__)
# Supported ECDSA curves
# Supported ECDSA curves (for SSH and GPG)
CURVE_NIST256 = 'nist256p1'
CURVE_ED25519 = 'ed25519'
SUPPORTED_CURVES = {CURVE_NIST256, CURVE_ED25519}
# Supported ECDH curves (for GPG)
ECDH_NIST256 = 'nist256p1'
ECDH_CURVE25519 = 'curve25519'
# SSH key types
SSH_NIST256_DER_OCTET = b'\x04'
SSH_NIST256_KEY_PREFIX = b'ecdsa-sha2-'
@ -134,7 +138,8 @@ def decompress_pubkey(pubkey, curve_name):
if len(pubkey) == 33:
decompress = {
CURVE_NIST256: _decompress_nist256,
CURVE_ED25519: _decompress_ed25519
CURVE_ED25519: _decompress_ed25519,
ECDH_CURVE25519: _decompress_ed25519,
}[curve_name]
vk = decompress(pubkey)
@ -192,3 +197,12 @@ def import_public_key(line):
assert result['type'] == file_type.encode('ascii')
log.debug('loaded %s public key: %s', file_type, result['fingerprint'])
return result
def get_ecdh_curve_name(signature_curve_name):
"""Return appropriate curve for ECDH for specified signing curve."""
return {
CURVE_NIST256: ECDH_NIST256,
CURVE_ED25519: ECDH_CURVE25519,
ECDH_CURVE25519: ECDH_CURVE25519,
}[signature_curve_name]

@ -7,8 +7,8 @@ import os
import sys
import time
from . import agent, encode, keyring, protocol
from .. import server
from . import agent, device, encode, keyring, protocol
from .. import formats, server
log = logging.getLogger(__name__)
@ -19,7 +19,7 @@ def run_create(args):
log.warning('NOTE: in order to re-generate the exact same GPG key later, '
'run this command with "--time=%d" commandline flag (to set '
'the timestamp of the GPG key manually).', args.time)
conn = encode.HardwareSigner(user_id=user_id,
conn = device.HardwareSigner(user_id=user_id,
curve_name=args.ecdsa_curve)
verifying_key = conn.pubkey(ecdh=False)
decryption_key = conn.pubkey(ecdh=True)
@ -32,8 +32,8 @@ def run_create(args):
verifying_key=verifying_key, ecdh=False)
# subkey for encryption
encryption_key = protocol.PublicKey(
curve_name=args.ecdsa_curve, created=args.time,
verifying_key=decryption_key, ecdh=True)
curve_name=formats.get_ecdh_curve_name(args.ecdsa_curve),
created=args.time, verifying_key=decryption_key, ecdh=True)
result = encode.create_subkey(primary_bytes=primary_bytes,
pubkey=signing_key,
signer_func=conn.sign)
@ -47,8 +47,8 @@ def run_create(args):
verifying_key=verifying_key, ecdh=False)
# subkey for encryption
subkey = protocol.PublicKey(
curve_name=args.ecdsa_curve, created=args.time,
verifying_key=decryption_key, ecdh=True)
curve_name=formats.get_ecdh_curve_name(args.ecdsa_curve),
created=args.time, verifying_key=decryption_key, ecdh=True)
result = encode.create_primary(user_id=user_id,
pubkey=primary,

@ -92,11 +92,7 @@ def pkdecrypt(keygrip, conn):
pubkey, conn = encode.load_from_public_key(pubkey_dict=local_pubkey)
with contextlib.closing(conn):
assert pubkey.keygrip == binascii.unhexlify(keygrip)
shared_secret = conn.ecdh(remote_pubkey)
assert len(shared_secret) == 65
assert shared_secret[:1] == b'\x04'
return _serialize_point(shared_secret)
return _serialize_point(conn.ecdh(remote_pubkey))
def handle_connection(conn):

@ -83,9 +83,15 @@ def _parse_ed25519_verifier(mpi):
return _ed25519_verify, vk
def _parse_curve25519_verifier(_):
log.warning('Curve25519 ECDH is not verified')
return None, None
SUPPORTED_CURVES = {
b'\x2A\x86\x48\xCE\x3D\x03\x01\x07': _parse_nist256p1_verifier,
b'\x2B\x06\x01\x04\x01\xDA\x47\x0F\x01': _parse_ed25519_verifier,
b'\x2B\x06\x01\x04\x01\x97\x55\x01\x05\x01': _parse_curve25519_verifier,
}
RSA_ALGO_IDS = {1, 2, 3}
@ -168,6 +174,7 @@ def _parse_pubkey(stream, packet_type='pubkey'):
oid_size = stream.readfmt('B')
oid = stream.read(oid_size)
assert oid in SUPPORTED_CURVES, util.hexlify(oid)
p['curve_oid'] = oid
parser = SUPPORTED_CURVES[oid]
mpi = parse_mpi(stream)

@ -0,0 +1,54 @@
"""Device abstraction layer for GPG operations."""
from .. import factory, formats, util
class HardwareSigner(object):
"""Sign messages and get public keys from a hardware device."""
def __init__(self, user_id, curve_name):
"""Connect to the device and retrieve required public key."""
self.client_wrapper = factory.load()
self.identity = self.client_wrapper.identity_type()
self.identity.proto = 'gpg'
self.identity.host = user_id
self.curve_name = curve_name
def pubkey(self, ecdh=False):
"""Return public key as VerifyingKey object."""
addr = util.get_bip32_address(identity=self.identity, ecdh=ecdh)
if ecdh:
curve_name = formats.get_ecdh_curve_name(self.curve_name)
else:
curve_name = self.curve_name
public_node = self.client_wrapper.connection.get_public_node(
n=addr, ecdsa_curve_name=curve_name)
return formats.decompress_pubkey(
pubkey=public_node.node.public_key,
curve_name=curve_name)
def sign(self, digest):
"""Sign the digest and return a serialized signature."""
result = self.client_wrapper.connection.sign_identity(
identity=self.identity,
challenge_hidden=digest,
challenge_visual='',
ecdsa_curve_name=self.curve_name)
assert result.signature[:1] == b'\x00'
sig = result.signature[1:]
return (util.bytes2num(sig[:32]), util.bytes2num(sig[32:]))
def ecdh(self, pubkey):
"""Derive shared secret using ECDH from remote public key."""
result = self.client_wrapper.connection.get_ecdh_session_key(
identity=self.identity,
peer_public_key=pubkey,
ecdsa_curve_name=formats.get_ecdh_curve_name(self.curve_name))
assert len(result.session_key) in {65, 33} # NIST256 or Curve25519
assert result.session_key[:1] == b'\x04'
return result.session_key
def close(self):
"""Close the connection to the device."""
self.client_wrapper.connection.close()

@ -2,77 +2,12 @@
import logging
import time
from . import decode, keyring, protocol
from .. import factory, formats, util
from . import decode, device, keyring, protocol
from .. import util
log = logging.getLogger(__name__)
class HardwareSigner(object):
"""Sign messages and get public keys from a hardware device."""
def __init__(self, user_id, curve_name):
"""Connect to the device and retrieve required public key."""
self.client_wrapper = factory.load()
self.identity = self.client_wrapper.identity_type()
self.identity.proto = 'gpg'
self.identity.host = user_id
self.curve_name = curve_name
def pubkey(self, ecdh=False):
"""Return public key as VerifyingKey object."""
addr = util.get_bip32_address(identity=self.identity, ecdh=ecdh)
public_node = self.client_wrapper.connection.get_public_node(
n=addr, ecdsa_curve_name=self.curve_name)
return formats.decompress_pubkey(
pubkey=public_node.node.public_key,
curve_name=self.curve_name)
def sign(self, digest):
"""Sign the digest and return a serialized signature."""
result = self.client_wrapper.connection.sign_identity(
identity=self.identity,
challenge_hidden=digest,
challenge_visual='',
ecdsa_curve_name=self.curve_name)
assert result.signature[:1] == b'\x00'
sig = result.signature[1:]
return (util.bytes2num(sig[:32]), util.bytes2num(sig[32:]))
def ecdh(self, pubkey):
"""Derive shared secret using ECDH from remote public key."""
result = self.client_wrapper.connection.get_ecdh_session_key(
identity=self.identity,
peer_public_key=pubkey,
ecdsa_curve_name=self.curve_name)
assert len(result.session_key) == 65
assert result.session_key[:1] == b'\x04'
return result.session_key
def close(self):
"""Close the connection to the device."""
self.client_wrapper.connection.close()
class AgentSigner(object):
"""Sign messages and get public keys using gpg-agent tool."""
def __init__(self, user_id):
"""Connect to the agent and retrieve required public key."""
self.sock = keyring.connect_to_agent()
self.keygrip = keyring.get_keygrip(user_id)
def sign(self, digest):
"""Sign the digest and return an ECDSA/RSA/DSA signature."""
return keyring.sign_digest(sock=self.sock,
keygrip=self.keygrip, digest=digest)
def close(self):
"""Close the connection to gpg-agent."""
self.sock.close()
def _time_format(t):
return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(t))
@ -160,7 +95,7 @@ def create_subkey(primary_bytes, pubkey, signer_func):
log.info('confirm signing with primary key')
if not primary['_is_custom']:
signer_func = AgentSigner(primary['user_id']).sign
signer_func = keyring.create_agent_signer(primary['user_id'])
signature = protocol.make_signature(
signer_func=signer_func,
@ -175,13 +110,13 @@ def create_subkey(primary_bytes, pubkey, signer_func):
def load_from_public_key(pubkey_dict):
"""Load correct public key from the device."""
log.debug('pubkey_dict: %s', pubkey_dict)
user_id = pubkey_dict['user_id']
created = pubkey_dict['created']
curve_name = protocol.find_curve_by_algo_id(pubkey_dict['algo'])
assert curve_name in formats.SUPPORTED_CURVES
curve_name = protocol.get_curve_name_by_oid(pubkey_dict['curve_oid'])
ecdh = (pubkey_dict['algo'] == protocol.ECDH_ALGO_ID)
conn = HardwareSigner(user_id, curve_name=curve_name)
conn = device.HardwareSigner(user_id, curve_name=curve_name)
pubkey = protocol.PublicKey(
curve_name=curve_name, created=created,
verifying_key=conn.pubkey(ecdh=ecdh), ecdh=ecdh)

@ -204,3 +204,15 @@ def export_public_key(user_id, sp=subprocess):
log.error('could not find public key %r in local GPG keyring', user_id)
raise KeyError(user_id)
return result
def create_agent_signer(user_id):
"""Sign digest with existing GPG keys using gpg-agent tool."""
sock = connect_to_agent()
keygrip = get_keygrip(user_id)
def sign(digest):
"""Sign the digest and return an ECDSA/RSA/DSA signature."""
return sign_digest(sock=sock, keygrip=keygrip, digest=digest)
return sign

@ -128,6 +128,18 @@ def _keygrip_ed25519(vk):
])
def _keygrip_curve25519(vk):
# pylint: disable=line-too-long
return _compute_keygrip([
['p', util.num2bytes(0x7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFED, size=32)], # nopep8
['a', b'\x01\xDB\x41'],
['b', b'\x01'],
['g', util.num2bytes(0x04000000000000000000000000000000000000000000000000000000000000000920ae19a1b8a086b4e01edd2c7748d14c923d4d7e6d7c61b229e9c5a27eced3d9, size=65)], # nopep8
['n', util.num2bytes(0x1000000000000000000000000000000014DEF9DEA2F79CD65812631A5CF5D3ED, size=32)], # nopep8
['q', vk.to_bytes()],
])
SUPPORTED_CURVES = {
formats.CURVE_NIST256: {
# https://tools.ietf.org/html/rfc6637#section-11
@ -141,7 +153,13 @@ SUPPORTED_CURVES = {
'algo_id': 22,
'serialize': _serialize_ed25519,
'keygrip': _keygrip_ed25519,
}
},
formats.ECDH_CURVE25519: {
'oid': b'\x2B\x06\x01\x04\x01\x97\x55\x01\x05\x01',
'algo_id': 18,
'serialize': _serialize_ed25519,
'keygrip': _keygrip_curve25519,
},
}
ECDH_ALGO_ID = 18
@ -149,14 +167,12 @@ ECDH_ALGO_ID = 18
CUSTOM_SUBPACKET = subpacket(100, b'TREZOR-GPG') # marks "our" pubkey
def find_curve_by_algo_id(algo_id):
"""Find curve name that matches a public key algorith ID."""
if algo_id == ECDH_ALGO_ID:
return formats.CURVE_NIST256
curve_name, = [name for name, info in SUPPORTED_CURVES.items()
if info['algo_id'] == algo_id]
return curve_name
def get_curve_name_by_oid(oid):
"""Return curve name matching specified OID, or raise KeyError."""
for curve_name, info in SUPPORTED_CURVES.items():
if info['oid'] == oid:
return curve_name
raise KeyError('Unknown OID: {!r}'.format(oid))
class PublicKey(object):
@ -167,7 +183,7 @@ class PublicKey(object):
self.curve_info = SUPPORTED_CURVES[curve_name]
self.created = int(created) # time since Epoch
self.verifying_key = verifying_key
self.ecdh = ecdh
self.ecdh = bool(ecdh)
if ecdh:
self.algo_id = ECDH_ALGO_ID
self.ecdh_packet = b'\x03\x01\x08\x07'

@ -30,11 +30,6 @@ def test_mpi():
assert protocol.mpi(0x123) == b'\x00\x09\x01\x23'
def test_find():
assert protocol.find_curve_by_algo_id(19) == formats.CURVE_NIST256
assert protocol.find_curve_by_algo_id(22) == formats.CURVE_ED25519
def test_armor():
data = bytearray(range(256))
assert protocol.armor(data, 'TEST') == '''-----BEGIN PGP TEST-----

Loading…
Cancel
Save