Merge branch 'gpg-agent'

nistp521
Roman Zeyde 8 years ago
commit 59b39ce81f

@ -1,2 +1,2 @@
[MESSAGES CONTROL]
disable=invalid-name, missing-docstring, locally-disabled, unbalanced-tuple-unpacking
disable=fixme, invalid-name, missing-docstring, locally-disabled, unbalanced-tuple-unpacking

@ -9,7 +9,6 @@ setup(
author_email='roman.zeyde@gmail.com',
url='http://github.com/romanz/trezor-agent',
packages=['trezor_agent', 'trezor_agent.gpg'],
scripts=['trezor_agent/gpg/trezor-git-gpg-wrapper.sh'],
install_requires=['ecdsa>=0.13', 'ed25519>=1.4', 'Cython>=0.23.4', 'protobuf>=2.6.1', 'trezor>=0.6.12', 'semver>=2.2'],
platforms=['POSIX'],
classifiers=[

@ -1,5 +1,6 @@
"""SSH-agent implementation using hardware authentication devices."""
import argparse
import functools
import logging
import os
import re
@ -115,6 +116,19 @@ def run_server(conn, public_key, command, debug, timeout):
log.info('server stopped')
def handle_connection_error(func):
"""Fail with non-zero exit code."""
@functools.wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except IOError as e:
log.error('Connection error: %s', e)
return 1
return wrapper
@handle_connection_error
def run_agent(client_factory=client.Client):
"""Run ssh-agent using given hardware client factory."""
args = create_agent_parser().parse_args()
@ -143,6 +157,7 @@ def run_agent(client_factory=client.Client):
debug=args.debug, timeout=args.timeout)
@handle_connection_error
def run_git(client_factory=client.Client):
"""Run git under ssh-agent using given hardware client factory."""
args = create_git_parser().parse_args()

@ -128,8 +128,8 @@ def identity_to_string(identity):
return ''.join(result)
def get_address(identity):
"""Compute BIP32 derivation address for SignIdentity API."""
def get_address(identity, ecdh=False):
"""Compute BIP32 derivation address according to SLIP-0013/0017."""
index = struct.pack('<L', identity.index)
addr = index + identity_to_string(identity).encode('ascii')
log.debug('address string: %r', addr)
@ -137,7 +137,8 @@ def get_address(identity):
s = io.BytesIO(bytearray(digest))
hardened = 0x80000000
address_n = [13] + list(util.recv(s, '<LLLL'))
addr_0 = [13, 17][bool(ecdh)]
address_n = [addr_0] + list(util.recv(s, '<LLLL'))
return [(hardened | value) for value in address_n]

@ -1,16 +1,17 @@
# Using TREZOR as hardware GPG agent
## Generate new GPG signing key:
First, verify that you have GPG 2.1+ [installed](https://gist.github.com/vt0r/a2f8c0bcb1400131ff51):
```
$ gpg2 --version | head -n1
gpg (GnuPG) 2.1.11
```
Update you TREZOR firmware to the latest version (at least [5430c82](https://github.com/trezor/trezor-mcu/commit/5430c82b2b1dbdd43c89de419ef92b754bed4c91)): see [a sample build log](https://gist.github.com/romanz/324c8e513abf5b5a452602ed648fa2cf).
Install the latest development version of `trezor-agent`:
Update you TREZOR firmware to the latest version (at least [c720614](https://github.com/trezor/trezor-mcu/commit/c720614f6e9b9c07f446c95bda0257980d942871)).
Install latest `trezor-agent` package from [gpg-agent](https://github.com/romanz/trezor-agent/commits/gpg-agent) branch:
```
$ pip install git+https://github.com/romanz/trezor-agent.git@master
$ pip install git+https://github.com/romanz/trezor-agent.git@gpg-agent
```
Define your GPG user ID as an environment variable:
@ -20,38 +21,80 @@ $ export TREZOR_GPG_USER_ID="John Doe <john@doe.bit>"
There are two ways to generate TREZOR-based GPG public keys, as described below.
### (1) create new GPG identity:
## 1. generate a new GPG identity:
```
$ trezor-gpg create > identity.pub # create new TREZOR-based GPG identity
$ gpg2 --import identity.pub # import into local GPG public keyring
$ gpg2 --list-keys # verify that the new identity is created correctly
$ gpg2 --edit "${TREZOR_GPG_USER_ID}" trust # OPTIONAL: mark the key as trusted
$ trezor-gpg create | gpg2 --import # use the TREZOR to confirm signing the primary key
gpg: key 5E4D684D: public key "John Doe <john@doe.bit>" imported
gpg: Total number processed: 1
gpg: imported: 1
$ gpg2 --edit "${TREZOR_GPG_USER_ID}" trust # set this key to ultimate trust (option #5)
$ gpg2 -k
/home/roman/.gnupg/pubring.kbx
------------------------------
pub nistp256/5E4D684D 2016-06-17 [SC]
uid [ultimate] John Doe <john@doe.bit>
sub nistp256/A31D9E25 2016-06-17 [E]
```
[![asciicast](https://asciinema.org/a/44880.png)](https://asciinema.org/a/44880)
### (2) create new subkey for an existing GPG identity:
## 2. generate a new subkey for an existing GPG identity:
```
$ gpg2 --list-keys "${TREZOR_GPG_USER_ID}" # make sure this identity already exists
$ trezor-gpg create --subkey > identity.pub # create new TREZOR-based GPG subkey
$ gpg2 --import identity.pub # append it to an existing identity
$ gpg2 --list-keys "${TREZOR_GPG_USER_ID}" # verify that the new subkey is added to keyring
$ gpg2 -k # suppose there is already a GPG primary key
/home/roman/.gnupg/pubring.kbx
------------------------------
pub rsa2048/87BB07B4 2016-06-17 [SC]
uid [ultimate] John Doe <john@doe.bit>
sub rsa2048/7176D31F 2016-06-17 [E]
$ trezor-gpg create --subkey | gpg2 --import # use the TREZOR to confirm signing the subkey
gpg: key 87BB07B4: "John Doe <john@doe.bit>" 2 new signatures
gpg: key 87BB07B4: "John Doe <john@doe.bit>" 2 new subkeys
gpg: Total number processed: 1
gpg: new subkeys: 2
gpg: new signatures: 2
$ gpg2 -k
/home/roman/.gnupg/pubring.kbx
------------------------------
pub rsa2048/87BB07B4 2016-06-17 [SC]
uid [ultimate] John Doe <john@doe.bit>
sub rsa2048/7176D31F 2016-06-17 [E]
sub nistp256/DDE80B36 2016-06-17 [S]
sub nistp256/E3D0BA19 2016-06-17 [E]
```
[![subkey](https://asciinema.org/a/8t78s6pqo5yocisaiolqnjp63.png)](https://asciinema.org/a/8t78s6pqo5yocisaiolqnjp63)
## Generate GPG signatures using a TREZOR device:
# Usage examples:
## Start the TREZOR-based gpg-agent:
```
$ trezor-gpg sign EXAMPLE # confirm signature using the device
$ gpg2 --verify EXAMPLE.asc # verify using standard GPG binary
$ trezor-gpg agent &
```
Note: this agent intercepts all GPG requests, so make sure to close it (e.g. by using `killall trezor-gpg`),
when you are done with the TREZOR-based GPG operations.
## Sign and verify GPG messages:
```
$ echo "Hello World!" | gpg2 --sign | gpg2 --verify
gpg: Signature made Fri 17 Jun 2016 08:55:13 PM IDT using ECDSA key ID 5E4D684D
gpg: Good signature from "Roman Zeyde <roman.zeyde@gmail.com>" [ultimate]
```
## Encrypt and decrypt GPG messages:
```
$ date | gpg2 --encrypt -r "${TREZOR_GPG_USER_ID}" | gpg2 --decrypt
gpg: encrypted with 256-bit ECDH key, ID A31D9E25, created 2016-06-17
"Roman Zeyde <roman.zeyde@gmail.com>"
Fri Jun 17 20:55:31 IDT 2016
```
[![sign](https://asciinema.org/a/f1unkptesb7anq09i8wugoko6.png)](https://asciinema.org/a/f1unkptesb7anq09i8wugoko6)
## Git commit & tag signatures:
Git can use GPG to sign and verify commits and tags (see [here](https://git-scm.com/book/en/v2/Git-Tools-Signing-Your-Work)):
```
$ git config --local gpg.program "trezor-git-gpg-wrapper.sh"
$ git config --local gpg.program gpg2
$ git commit --gpg-sign # create GPG-signed commit
$ git log --show-signature -1 # verify commit signature
$ git tag --sign "TAG" # create GPG-signed tag
$ git verify-tag "TAG" # verify tag signature
```
[![asciicast](https://asciinema.org/a/44879.png)](https://asciinema.org/a/44879)
```

@ -3,11 +3,12 @@
import argparse
import contextlib
import logging
import os
import sys
import time
import os
from . import decode, encode, keyring, proto
from . import agent, encode, keyring, proto
from .. import server
log = logging.getLogger(__name__)
@ -15,47 +16,54 @@ log = logging.getLogger(__name__)
def run_create(args):
"""Generate a new pubkey for a new/existing GPG identity."""
user_id = os.environ['TREZOR_GPG_USER_ID']
f = encode.Factory(user_id=user_id, created=args.time,
curve_name=args.ecdsa_curve)
with contextlib.closing(f):
if args.subkey:
primary_key = keyring.export_public_key(user_id=user_id)
result = f.create_subkey(primary_bytes=primary_key)
else:
result = f.create_primary()
conn = encode.HardwareSigner(user_id=user_id,
curve_name=args.ecdsa_curve)
verifying_key = conn.pubkey(ecdh=False)
decryption_key = conn.pubkey(ecdh=True)
if args.subkey:
primary_bytes = keyring.export_public_key(user_id=user_id)
# subkey for signing
signing_key = proto.PublicKey(
curve_name=args.ecdsa_curve, created=args.time,
verifying_key=verifying_key, ecdh=False)
# subkey for encryption
encryption_key = proto.PublicKey(
curve_name=args.ecdsa_curve, created=args.time,
verifying_key=decryption_key, ecdh=True)
result = encode.create_subkey(primary_bytes=primary_bytes,
pubkey=signing_key,
signer_func=conn.sign)
result = encode.create_subkey(primary_bytes=result,
pubkey=encryption_key,
signer_func=conn.sign)
else:
# primary key for signing
primary = proto.PublicKey(
curve_name=args.ecdsa_curve, created=args.time,
verifying_key=verifying_key, ecdh=False)
# subkey for encryption
subkey = proto.PublicKey(
curve_name=args.ecdsa_curve, created=args.time,
verifying_key=decryption_key, ecdh=True)
result = encode.create_primary(user_id=user_id,
pubkey=primary,
signer_func=conn.sign)
result = encode.create_subkey(primary_bytes=result,
pubkey=subkey,
signer_func=conn.sign)
sys.stdout.write(proto.armor(result, 'PUBLIC KEY BLOCK'))
def run_sign(args):
"""Generate a GPG signature using hardware-based device."""
pubkey = decode.load_public_key(keyring.export_public_key(user_id=None),
use_custom=True)
f = encode.Factory.from_public_key(pubkey=pubkey,
user_id=pubkey['user_id'])
with contextlib.closing(f):
if args.filename:
data = open(args.filename, 'rb').read()
else:
data = sys.stdin.read()
sig = f.sign_message(data)
sig = proto.armor(sig, 'SIGNATURE').encode('ascii')
decode.verify(pubkey=pubkey, signature=sig, original_data=data)
filename = '-' # write to stdout
if args.output:
filename = args.output
elif args.filename:
filename = args.filename + '.asc'
if filename == '-':
output = sys.stdout
else:
output = open(filename, 'wb')
output.write(sig)
def run_agent(args):
"""Run a simple GPG-agent server."""
sock_path = os.path.expanduser(args.sock_path)
with server.unix_domain_socket_server(sock_path) as sock:
for conn in agent.yield_connections(sock):
with contextlib.closing(conn):
agent.handle_connection(conn)
def main():
@ -66,18 +74,15 @@ def main():
subparsers.required = True
subparsers.dest = 'command'
create = subparsers.add_parser('create')
create.add_argument('-s', '--subkey', action='store_true', default=False)
create.add_argument('-e', '--ecdsa-curve', default='nist256p1')
create.add_argument('-t', '--time', type=int, default=int(time.time()))
create.set_defaults(run=run_create)
sign = subparsers.add_parser('sign')
sign.add_argument('filename', nargs='?',
help='Use stdin, if not specified.')
sign.add_argument('-o', '--output', default=None,
help='Use stdout, if equals to "-".')
sign.set_defaults(run=run_sign)
create_cmd = subparsers.add_parser('create')
create_cmd.add_argument('-s', '--subkey', action='store_true', default=False)
create_cmd.add_argument('-e', '--ecdsa-curve', default='nist256p1')
create_cmd.add_argument('-t', '--time', type=int, default=int(time.time()))
create_cmd.set_defaults(run=run_create)
agent_cmd = subparsers.add_parser('agent')
agent_cmd.add_argument('-s', '--sock-path', default='~/.gnupg/S.gpg-agent')
agent_cmd.set_defaults(run=run_agent)
args = p.parse_args()
logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO,

@ -0,0 +1,142 @@
"""GPG-agent utilities."""
import binascii
import contextlib
import logging
from . import decode, encode, keyring
from .. import util
log = logging.getLogger(__name__)
def yield_connections(sock):
"""Run a server on the specified socket."""
while True:
log.debug('waiting for connection on %s', sock.getsockname())
try:
conn, _ = sock.accept()
except KeyboardInterrupt:
return
conn.settimeout(None)
log.debug('accepted connection on %s', sock.getsockname())
yield conn
def serialize(data):
"""Serialize data according to ASSUAN protocol."""
for c in ['%', '\n', '\r']:
data = data.replace(c, '%{:02X}'.format(ord(c)))
return data
def sig_encode(r, s):
"""Serialize ECDSA signature data into GPG S-expression."""
r = serialize(util.num2bytes(r, 32))
s = serialize(util.num2bytes(s, 32))
return '(7:sig-val(5:ecdsa(1:r32:{})(1:s32:{})))'.format(r, s)
def pksign(keygrip, digest, algo):
"""Sign a message digest using a private EC key."""
assert algo == '8'
pubkey_dict = decode.load_public_key(
pubkey_bytes=keyring.export_public_key(user_id=None),
use_custom=True, ecdh=False)
pubkey, conn = encode.load_from_public_key(pubkey_dict=pubkey_dict)
with contextlib.closing(conn):
assert pubkey.keygrip == binascii.unhexlify(keygrip)
r, s = conn.sign(binascii.unhexlify(digest))
result = sig_encode(r, s)
log.debug('result: %r', result)
return result
def _serialize_point(data):
data = '{}:'.format(len(data)) + data
# https://www.gnupg.org/documentation/manuals/assuan/Server-responses.html
for c in ['%', '\n', '\r']:
data = data.replace(c, '%{:02X}'.format(ord(c)))
return '(5:value' + data + ')'
def parse_ecdh(line):
"""Parse ECDH request and return remote public key."""
prefix, line = line.split(' ', 1)
assert prefix == 'D'
exp, leftover = keyring.parse(keyring.unescape(line))
log.debug('ECDH s-exp: %r', exp)
assert not leftover
label, exp = exp
assert label == b'enc-val'
assert exp[0] == b'ecdh'
items = exp[1:]
log.debug('ECDH parameters: %r', items)
return dict(items)['e']
def pkdecrypt(keygrip, conn):
"""Handle decryption using ECDH."""
for msg in [b'S INQUIRE_MAXLEN 4096', b'INQUIRE CIPHERTEXT']:
keyring.sendline(conn, msg)
line = keyring.recvline(conn)
assert keyring.recvline(conn) == b'END'
remote_pubkey = parse_ecdh(line)
local_pubkey = decode.load_public_key(
pubkey_bytes=keyring.export_public_key(user_id=None),
use_custom=True, ecdh=True)
pubkey, conn = encode.load_from_public_key(pubkey_dict=local_pubkey)
with contextlib.closing(conn):
assert pubkey.keygrip == binascii.unhexlify(keygrip)
shared_secret = conn.ecdh(remote_pubkey)
assert len(shared_secret) == 65
assert shared_secret[:1] == b'\x04'
return _serialize_point(shared_secret)
def iterlines(conn):
"""Iterate over input, split by lines."""
while True:
line = keyring.recvline(conn)
if line is None:
break
yield line
def handle_connection(conn):
"""Handle connection from GPG binary using the ASSUAN protocol."""
keygrip = None
digest = None
algo = None
version = keyring.gpg_version()
keyring.sendline(conn, b'OK')
for line in iterlines(conn):
parts = line.split(' ')
command = parts[0]
args = parts[1:]
if command in {'RESET', 'OPTION', 'HAVEKEY', 'SETKEYDESC'}:
pass # reply with OK
elif command == 'GETINFO':
keyring.sendline(conn, b'D ' + version)
elif command == 'AGENT_ID':
keyring.sendline(conn, b'D TREZOR')
elif command in {'SIGKEY', 'SETKEY'}:
keygrip, = args
elif command == 'SETHASH':
algo, digest = args
elif command == 'PKSIGN':
sig = pksign(keygrip, digest, algo)
keyring.sendline(conn, b'D ' + sig)
elif command == 'PKDECRYPT':
sec = pkdecrypt(keygrip, conn)
keyring.sendline(conn, b'D ' + sec)
elif command == 'BYE':
return
else:
log.error('unknown request: %r', line)
return
keyring.sendline(conn, b'OK')

@ -268,7 +268,7 @@ def digest_packets(packets):
return hashlib.sha256(data_to_hash.getvalue()).digest()
def load_public_key(pubkey_bytes, use_custom=False):
def load_public_key(pubkey_bytes, use_custom=False, ecdh=False):
"""Parse and validate GPG public key from an input stream."""
stream = io.BytesIO(pubkey_bytes)
packets = list(parse_packets(util.Reader(stream)))
@ -288,13 +288,15 @@ def load_public_key(pubkey_bytes, use_custom=False):
packet = pubkey
while use_custom:
if packet['type'] in ('pubkey', 'subkey') and signature['_is_custom']:
log.debug('found custom %s', packet['type'])
break
if ecdh == (packet['algo'] == proto.ECDH_ALGO_ID):
log.debug('found custom %s', packet['type'])
break
packet, signature = packets[:2]
packets = packets[2:]
packet['user_id'] = userid['value']
packet['_is_custom'] = signature['_is_custom']
return packet

@ -19,9 +19,9 @@ class HardwareSigner(object):
self.identity.host = user_id
self.curve_name = curve_name
def pubkey(self):
def pubkey(self, ecdh=False):
"""Return public key as VerifyingKey object."""
addr = client.get_address(self.identity)
addr = client.get_address(identity=self.identity, ecdh=ecdh)
public_node = self.client_wrapper.connection.get_public_node(
n=addr, ecdsa_curve_name=self.curve_name)
@ -38,8 +38,17 @@ class HardwareSigner(object):
ecdsa_curve_name=self.curve_name)
assert result.signature[:1] == b'\x00'
sig = result.signature[1:]
return (proto.mpi(util.bytes2num(sig[:32])) +
proto.mpi(util.bytes2num(sig[32:])))
return (util.bytes2num(sig[:32]), util.bytes2num(sig[32:]))
def ecdh(self, pubkey):
"""Derive shared secret using ECDH from remote public key."""
result = self.client_wrapper.connection.get_ecdh_session_key(
identity=self.identity,
peer_public_key=pubkey,
ecdsa_curve_name=self.curve_name)
assert len(result.session_key) == 65
assert result.session_key[:1] == b'\x04'
return result.session_key
def close(self):
"""Close the connection to the device."""
@ -57,9 +66,8 @@ class AgentSigner(object):
def sign(self, digest):
"""Sign the digest and return an ECDSA/RSA/DSA signature."""
params = keyring.sign_digest(sock=self.sock,
keygrip=self.keygrip, digest=digest)
return b''.join(proto.mpi(p) for p in params)
return keyring.sign_digest(sock=self.sock,
keygrip=self.keygrip, digest=digest)
def close(self):
"""Close the connection to gpg-agent."""
@ -70,129 +78,133 @@ def _time_format(t):
return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(t))
class Factory(object):
"""Performs GPG signing operations."""
def __init__(self, user_id, created, curve_name):
"""Construct and loads a public key from the device."""
self.user_id = user_id
assert curve_name in formats.SUPPORTED_CURVES
self.conn = HardwareSigner(user_id, curve_name=curve_name)
self.pubkey = proto.PublicKey(
curve_name=curve_name, created=created,
verifying_key=self.conn.pubkey())
log.info('%s created at %s for "%s"',
self.pubkey, _time_format(self.pubkey.created), user_id)
@classmethod
def from_public_key(cls, pubkey, user_id):
"""Create from an existing GPG public key."""
s = cls(user_id=user_id,
created=pubkey['created'],
curve_name=proto.find_curve_by_algo_id(pubkey['algo']))
assert s.pubkey.key_id() == pubkey['key_id']
return s
def close(self):
"""Close connection and turn off the screen of the device."""
self.conn.close()
def create_primary(self):
"""Export new primary GPG public key, ready for "gpg2 --import"."""
pubkey_packet = proto.packet(tag=6, blob=self.pubkey.data())
user_id_packet = proto.packet(tag=13,
blob=self.user_id.encode('ascii'))
data_to_sign = (self.pubkey.data_to_hash() +
user_id_packet[:1] +
util.prefix_len('>L', self.user_id.encode('ascii')))
log.info('signing public key "%s"', self.user_id)
hashed_subpackets = [
proto.subpacket_time(self.pubkey.created), # signature time
# https://tools.ietf.org/html/rfc4880#section-5.2.3.4
proto.subpacket_byte(0x1B, 1 | 2), # key flags (certify & sign)
# https://tools.ietf.org/html/rfc4880#section-5.2.3.21
proto.subpacket_byte(0x15, 8), # preferred hash (SHA256)
# https://tools.ietf.org/html/rfc4880#section-5.2.3.8
proto.subpacket_byte(0x16, 0), # preferred compression (none)
# https://tools.ietf.org/html/rfc4880#section-5.2.3.9
proto.subpacket_byte(0x17, 0x80) # key server prefs (no-modify)
# https://tools.ietf.org/html/rfc4880#section-5.2.3.17
]
unhashed_subpackets = [
proto.subpacket(16, self.pubkey.key_id()), # issuer key id
proto.CUSTOM_SUBPACKET]
signature = proto.make_signature(
signer_func=self.conn.sign,
public_algo=self.pubkey.algo_id,
data_to_sign=data_to_sign,
sig_type=0x13, # user id & public key
hashed_subpackets=hashed_subpackets,
unhashed_subpackets=unhashed_subpackets)
sign_packet = proto.packet(tag=2, blob=signature)
return pubkey_packet + user_id_packet + sign_packet
def create_subkey(self, primary_bytes):
"""Export new subkey to `self.user_id` GPG primary key."""
subkey_packet = proto.packet(tag=14, blob=self.pubkey.data())
primary = decode.load_public_key(primary_bytes)
log.info('adding subkey to primary GPG key "%s" (%s)',
self.user_id, util.hexlify(primary['key_id']))
data_to_sign = primary['_to_hash'] + self.pubkey.data_to_hash()
def create_primary(user_id, pubkey, signer_func):
"""Export new primary GPG public key, ready for "gpg2 --import"."""
pubkey_packet = proto.packet(tag=6, blob=pubkey.data())
user_id_packet = proto.packet(tag=13,
blob=user_id.encode('ascii'))
data_to_sign = (pubkey.data_to_hash() +
user_id_packet[:1] +
util.prefix_len('>L', user_id.encode('ascii')))
log.info('creating primary GPG key "%s"', user_id)
hashed_subpackets = [
proto.subpacket_time(pubkey.created), # signature time
# https://tools.ietf.org/html/rfc4880#section-5.2.3.7
proto.subpacket_byte(0x0B, 9), # preferred symmetric algo (AES-256)
# https://tools.ietf.org/html/rfc4880#section-5.2.3.4
proto.subpacket_byte(0x1B, 1 | 2), # key flags (certify & sign)
# https://tools.ietf.org/html/rfc4880#section-5.2.3.21
proto.subpacket_byte(0x15, 8), # preferred hash (SHA256)
# https://tools.ietf.org/html/rfc4880#section-5.2.3.8
proto.subpacket_byte(0x16, 0), # preferred compression (none)
# https://tools.ietf.org/html/rfc4880#section-5.2.3.9
proto.subpacket_byte(0x17, 0x80) # key server prefs (no-modify)
# https://tools.ietf.org/html/rfc4880#section-5.2.3.17
]
unhashed_subpackets = [
proto.subpacket(16, pubkey.key_id()), # issuer key id
proto.CUSTOM_SUBPACKET]
log.info('confirm signing with primary key')
signature = proto.make_signature(
signer_func=signer_func,
public_algo=pubkey.algo_id,
data_to_sign=data_to_sign,
sig_type=0x13, # user id & public key
hashed_subpackets=hashed_subpackets,
unhashed_subpackets=unhashed_subpackets)
sign_packet = proto.packet(tag=2, blob=signature)
return pubkey_packet + user_id_packet + sign_packet
def create_subkey(primary_bytes, pubkey, signer_func):
"""Export new subkey to GPG primary key."""
subkey_packet = proto.packet(tag=14, blob=pubkey.data())
primary = decode.load_public_key(primary_bytes)
log.info('adding subkey to primary GPG key "%s"', primary['user_id'])
data_to_sign = primary['_to_hash'] + pubkey.data_to_hash()
if pubkey.ecdh:
embedded_sig = None
else:
# Primary Key Binding Signature
hashed_subpackets = [
proto.subpacket_time(self.pubkey.created)] # signature time
proto.subpacket_time(pubkey.created)] # signature time
unhashed_subpackets = [
proto.subpacket(16, self.pubkey.key_id())] # issuer key id
log.info('confirm signing subkey with hardware device')
proto.subpacket(16, pubkey.key_id())] # issuer key id
log.info('confirm signing with new subkey')
embedded_sig = proto.make_signature(
signer_func=self.conn.sign,
signer_func=signer_func,
data_to_sign=data_to_sign,
public_algo=self.pubkey.algo_id,
public_algo=pubkey.algo_id,
sig_type=0x19,
hashed_subpackets=hashed_subpackets,
unhashed_subpackets=unhashed_subpackets)
# Subkey Binding Signature
hashed_subpackets = [
proto.subpacket_time(self.pubkey.created), # signature time
proto.subpacket_byte(0x1B, 2)] # key flags (certify & sign)
unhashed_subpackets = [
proto.subpacket(16, primary['key_id']), # issuer key id
proto.subpacket(32, embedded_sig),
proto.CUSTOM_SUBPACKET]
log.info('confirm signing subkey with gpg-agent')
gpg_agent = AgentSigner(self.user_id)
signature = proto.make_signature(
signer_func=gpg_agent.sign,
data_to_sign=data_to_sign,
public_algo=primary['algo'],
sig_type=0x18,
hashed_subpackets=hashed_subpackets,
unhashed_subpackets=unhashed_subpackets)
sign_packet = proto.packet(tag=2, blob=signature)
return primary_bytes + subkey_packet + sign_packet
def sign_message(self, msg, sign_time=None):
"""Sign GPG message at specified time."""
if sign_time is None:
sign_time = int(time.time())
log.info('signing %d byte message at %s',
len(msg), _time_format(sign_time))
hashed_subpackets = [proto.subpacket_time(sign_time)]
unhashed_subpackets = [
proto.subpacket(16, self.pubkey.key_id())] # issuer key id
blob = proto.make_signature(
signer_func=self.conn.sign,
data_to_sign=msg,
public_algo=self.pubkey.algo_id,
hashed_subpackets=hashed_subpackets,
unhashed_subpackets=unhashed_subpackets)
return proto.packet(tag=2, blob=blob)
# Subkey Binding Signature
# Key flags: https://tools.ietf.org/html/rfc4880#section-5.2.3.21
# (certify & sign) (encrypt)
flags = (2) if (not pubkey.ecdh) else (4 | 8)
hashed_subpackets = [
proto.subpacket_time(pubkey.created), # signature time
proto.subpacket_byte(0x1B, flags)]
unhashed_subpackets = []
unhashed_subpackets.append(proto.subpacket(16, primary['key_id']))
if embedded_sig is not None:
unhashed_subpackets.append(proto.subpacket(32, embedded_sig))
unhashed_subpackets.append(proto.CUSTOM_SUBPACKET)
log.info('confirm signing with primary key')
if not primary['_is_custom']:
signer_func = AgentSigner(primary['user_id']).sign
signature = proto.make_signature(
signer_func=signer_func,
data_to_sign=data_to_sign,
public_algo=primary['algo'],
sig_type=0x18,
hashed_subpackets=hashed_subpackets,
unhashed_subpackets=unhashed_subpackets)
sign_packet = proto.packet(tag=2, blob=signature)
return primary_bytes + subkey_packet + sign_packet
def sign_message(signer_func, msg, pubkey, sign_time):
"""Sign GPG message at specified time."""
log.info('signing %d byte message at %s',
len(msg), _time_format(sign_time))
hashed_subpackets = [proto.subpacket_time(sign_time)]
unhashed_subpackets = [
proto.subpacket(16, pubkey.key_id())] # issuer key id
blob = proto.make_signature(
signer_func=signer_func,
data_to_sign=msg,
public_algo=pubkey.algo_id,
hashed_subpackets=hashed_subpackets,
unhashed_subpackets=unhashed_subpackets)
return proto.packet(tag=2, blob=blob)
def load_from_public_key(pubkey_dict):
"""Load correct public key from the device."""
user_id = pubkey_dict['user_id']
created = pubkey_dict['created']
curve_name = proto.find_curve_by_algo_id(pubkey_dict['algo'])
assert curve_name in formats.SUPPORTED_CURVES
ecdh = (pubkey_dict['algo'] == proto.ECDH_ALGO_ID)
conn = HardwareSigner(user_id, curve_name=curve_name)
pubkey = proto.PublicKey(
curve_name=curve_name, created=created,
verifying_key=conn.pubkey(ecdh=ecdh), ecdh=ecdh)
assert pubkey.key_id() == pubkey_dict['key_id']
log.info('%s created at %s for "%s"',
pubkey, _time_format(pubkey.created), user_id)
return pubkey, conn

@ -22,26 +22,33 @@ def connect_to_agent(sock_path='~/.gnupg/S.gpg-agent', sp=subprocess):
return sock
def _communicate(sock, msg):
msg += '\n'
sock.sendall(msg.encode('ascii'))
log.debug('-> %r', msg)
return _recvline(sock)
def communicate(sock, msg):
"""Send a message and receive a single line."""
sendline(sock, msg.encode('ascii'))
return recvline(sock)
def _recvline(sock):
def sendline(sock, msg):
"""Send a binary message, followed by EOL."""
log.debug('<- %r', msg)
sock.sendall(msg + b'\n')
def recvline(sock):
"""Receive a single line from the socket."""
reply = io.BytesIO()
while True:
c = sock.recv(1)
if not c:
raise EOFError
return None # socket is closed
if c == b'\n':
break
reply.write(c)
result = reply.getvalue()
log.debug('<- %r', result)
log.debug('-> %r', result)
return result
@ -86,8 +93,9 @@ def _parse_ecdsa_sig(args):
return (util.bytes2num(sig_r),
util.bytes2num(sig_s))
# DSA happens to have the same structure as ECDSA signatures
# DSA and EDDSA happen to have the same structure as ECDSA signatures
_parse_dsa_sig = _parse_ecdsa_sig
_parse_eddsa_sig = _parse_ecdsa_sig
def _parse_rsa_sig(args):
@ -103,6 +111,7 @@ def parse_sig(sig):
algo_name = sig[0]
parser = {b'rsa': _parse_rsa_sig,
b'ecdsa': _parse_ecdsa_sig,
b'eddsa': _parse_eddsa_sig,
b'dsa': _parse_dsa_sig}[algo_name]
return parser(args=sig[1:])
@ -112,7 +121,7 @@ def sign_digest(sock, keygrip, digest, sp=subprocess, environ=None):
hash_algo = 8 # SHA256
assert len(digest) == 32
assert _communicate(sock, 'RESET').startswith(b'OK')
assert communicate(sock, 'RESET').startswith(b'OK')
ttyname = sp.check_output(['tty']).strip()
options = ['ttyname={}'.format(ttyname)] # set TTY for passphrase entry
@ -122,17 +131,17 @@ def sign_digest(sock, keygrip, digest, sp=subprocess, environ=None):
options.append('display={}'.format(display))
for opt in options:
assert _communicate(sock, 'OPTION {}'.format(opt)) == b'OK'
assert communicate(sock, 'OPTION {}'.format(opt)) == b'OK'
assert _communicate(sock, 'SIGKEY {}'.format(keygrip)) == b'OK'
assert communicate(sock, 'SIGKEY {}'.format(keygrip)) == b'OK'
hex_digest = binascii.hexlify(digest).upper().decode('ascii')
assert _communicate(sock, 'SETHASH {} {}'.format(hash_algo,
hex_digest)) == b'OK'
assert communicate(sock, 'SETHASH {} {}'.format(hash_algo,
hex_digest)) == b'OK'
assert _communicate(sock, 'SETKEYDESC '
'Sign+a+new+TREZOR-based+subkey') == b'OK'
assert _communicate(sock, 'PKSIGN') == b'OK'
line = _recvline(sock).strip()
assert communicate(sock, 'SETKEYDESC '
'Sign+a+new+TREZOR-based+subkey') == b'OK'
assert communicate(sock, 'PKSIGN') == b'OK'
line = recvline(sock).strip()
line = unescape(line)
log.debug('unescaped: %r', line)
prefix, sig = line.split(b' ', 1)
@ -151,6 +160,14 @@ def get_keygrip(user_id, sp=subprocess):
return re.findall(r'Keygrip = (\w+)', output)[0]
def gpg_version(sp=subprocess):
"""Get a keygrip of the primary GPG key of the specified user."""
args = ['gpg2', '--version']
output = sp.check_output(args).decode('ascii')
line = output.split(b'\n')[0] # b'gpg (GnuPG) 2.1.11'
return line.split(b' ')[-1] # b'2.1.11'
def export_public_key(user_id, sp=subprocess):
"""Export GPG public key for specified `user_id`."""
args = ['gpg2', '--export'] + ([user_id] if user_id else [])

@ -77,26 +77,70 @@ def _serialize_ed25519(vk):
util.bytes2num(vk.to_bytes()))
def _compute_keygrip(params):
parts = []
for name, value in params:
exp = '1:{}{}:'.format(name, len(value))
parts.append(b'(' + exp.encode('ascii') + value + b')')
return hashlib.sha1(b''.join(parts)).digest()
def _keygrip_nist256(vk):
curve = vk.curve.curve
gen = vk.curve.generator
g = (4 << 512) | (gen.x() << 256) | gen.y()
point = vk.pubkey.point
q = (4 << 512) | (point.x() << 256) | point.y()
return _compute_keygrip([
['p', util.num2bytes(curve.p(), size=32)],
['a', util.num2bytes(curve.a() % curve.p(), size=32)],
['b', util.num2bytes(curve.b() % curve.p(), size=32)],
['g', util.num2bytes(g, size=65)],
['n', util.num2bytes(vk.curve.order, size=32)],
['q', util.num2bytes(q, size=65)],
])
def _keygrip_ed25519(vk):
# pylint: disable=line-too-long
return _compute_keygrip([
['p', util.num2bytes(0x7FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFED, size=32)], # nopep8
['a', b'\x01'],
['b', util.num2bytes(0x2DFC9311D490018C7338BF8688861767FF8FF5B2BEBE27548A14B235ECA6874A, size=32)], # nopep8
['g', util.num2bytes(0x04216936D3CD6E53FEC0A4E231FDD6DC5C692CC7609525A7B2C9562D608F25D51A6666666666666666666666666666666666666666666666666666666666666658, size=65)], # nopep8
['n', util.num2bytes(0x1000000000000000000000000000000014DEF9DEA2F79CD65812631A5CF5D3ED, size=32)], # nopep8
['q', vk.to_bytes()],
])
SUPPORTED_CURVES = {
formats.CURVE_NIST256: {
# https://tools.ietf.org/html/rfc6637#section-11
'oid': b'\x2A\x86\x48\xCE\x3D\x03\x01\x07',
'algo_id': 19,
'serialize': _serialize_nist256
'serialize': _serialize_nist256,
'keygrip': _keygrip_nist256,
},
formats.CURVE_ED25519: {
'oid': b'\x2B\x06\x01\x04\x01\xDA\x47\x0F\x01',
'algo_id': 22,
'serialize': _serialize_ed25519
'serialize': _serialize_ed25519,
'keygrip': _keygrip_ed25519,
}
}
ECDH_ALGO_ID = 18
CUSTOM_SUBPACKET = subpacket(100, b'TREZOR-GPG') # marks "our" pubkey
def find_curve_by_algo_id(algo_id):
"""Find curve name that matches a public key algorith ID."""
if algo_id == ECDH_ALGO_ID:
return formats.CURVE_NIST256
curve_name, = [name for name, info in SUPPORTED_CURVES.items()
if info['algo_id'] == algo_id]
return curve_name
@ -105,15 +149,27 @@ def find_curve_by_algo_id(algo_id):
class PublicKey(object):
"""GPG representation for public key packets."""
def __init__(self, curve_name, created, verifying_key):
def __init__(self, curve_name, created, verifying_key, ecdh=False):
"""Contruct using a ECDSA VerifyingKey object."""
self.curve_info = SUPPORTED_CURVES[curve_name]
self.created = int(created) # time since Epoch
self.verifying_key = verifying_key
self.algo_id = self.curve_info['algo_id']
self.ecdh = ecdh
if ecdh:
self.algo_id = ECDH_ALGO_ID
self.ecdh_packet = b'\x03\x01\x08\x07'
else:
self.algo_id = self.curve_info['algo_id']
self.ecdh_packet = b''
hex_key_id = util.hexlify(self.key_id())[-8:]
self.desc = 'GPG public key {}/{}'.format(curve_name, hex_key_id)
@property
def keygrip(self):
"""Compute GPG2 keygrip."""
return self.curve_info['keygrip'](self.verifying_key)
def data(self):
"""Data for packet creation."""
header = struct.pack('>BLB',
@ -122,7 +178,7 @@ class PublicKey(object):
self.algo_id) # public key algorithm ID
oid = util.prefix_len('>B', self.curve_info['oid'])
blob = self.curve_info['serialize'](self.verifying_key)
return header + oid + blob
return header + oid + blob + self.ecdh_packet
def data_to_hash(self):
"""Data for digest computation."""
@ -175,7 +231,8 @@ def make_signature(signer_func, data_to_sign, public_algo,
log.debug('hashing %d bytes', len(data_to_hash))
digest = hashlib.sha256(data_to_hash).digest()
log.debug('signing digest: %s', util.hexlify(digest))
sig = signer_func(digest=digest)
params = signer_func(digest=digest)
sig = b''.join(mpi(p) for p in params)
return bytes(header + hashed + unhashed +
digest[:2] + # used for decoder's sanity check

@ -55,7 +55,7 @@ def test_make_signature():
def signer_func(digest):
assert digest == (b'\xd0\xe5]|\x8bP\xe6\x91\xb3\xe8+\xf4A\xf0`(\xb1'
b'\xc7\xf4;\x86\x97s\xdb\x9a\xda\xee< \xcb\x9e\x00')
return b'SIGNATURE'
return (7, 8)
sig = proto.make_signature(
signer_func=signer_func,
@ -65,7 +65,7 @@ def test_make_signature():
unhashed_subpackets=[],
sig_type=25)
assert sig == (b'\x04\x19\x16\x08\x00\x06\x05\x02'
b'\x00\x00\x00\x01\x00\x00\xd0\xe5SIGNATURE')
b'\x00\x00\x00\x01\x00\x00\xd0\xe5\x00\x03\x07\x00\x04\x08')
def test_nist256p1():
@ -74,6 +74,16 @@ def test_nist256p1():
pk = proto.PublicKey(curve_name=formats.CURVE_NIST256,
created=42, verifying_key=vk)
assert repr(pk) == 'GPG public key nist256p1/F82361D9'
assert pk.keygrip == b'\x95\x85.\x91\x7f\xe2\xc3\x91R\xba\x99\x81\x92\xb5y\x1d\xb1\\\xdc\xf0'
def test_nist256p1_ecdh():
sk = ecdsa.SigningKey.from_secret_exponent(secexp=1, curve=ecdsa.NIST256p)
vk = sk.get_verifying_key()
pk = proto.PublicKey(curve_name=formats.CURVE_NIST256,
created=42, verifying_key=vk, ecdh=True)
assert repr(pk) == 'GPG public key nist256p1/5811DF46'
assert pk.keygrip == b'\x95\x85.\x91\x7f\xe2\xc3\x91R\xba\x99\x81\x92\xb5y\x1d\xb1\\\xdc\xf0'
def test_ed25519():
@ -82,3 +92,4 @@ def test_ed25519():
pk = proto.PublicKey(curve_name=formats.CURVE_ED25519,
created=42, verifying_key=vk)
assert repr(pk) == 'GPG public key ed25519/36B40FE6'
assert pk.keygrip == b'\xbf\x01\x90l\x17\xb64\xa3-\xf4\xc0gr\x99\x18<\xddBQ?'

@ -1,7 +0,0 @@
#!/bin/bash
if [[ "$*" == *"--verify"* ]]
then
gpg2 $* # verify using GPG2 (for ECDSA and EdDSA keys)
else
trezor-gpg sign -o- # sign using TREZOR and write the signature to stdout
fi
Loading…
Cancel
Save