diff --git a/trezor_agent/gpg/README b/trezor_agent/gpg/README new file mode 100644 index 0000000..b080a8a --- /dev/null +++ b/trezor_agent/gpg/README @@ -0,0 +1,22 @@ +# Generate new stand-along GPG identity + +``` +$ USER_ID="Satoshi Nakamoto " +$ trezor-gpg create "${USER_ID}" > identity.pub # create new TREZOR-based GPG identity +$ gpg2 --import identity.pub # import into local GPG public keyring +$ gpg2 --edit "${USER_ID}" trust # OPTIONAL: mark the key as trusted +``` + +# Generate new subkey for existing GPG identity +``` +$ USER_ID="Satoshi Nakamoto " +$ gpg2 --list-keys "${USER_ID}" # make sure this identity already exists +$ trezor-gpg create --subkey "${USER_ID}" > identity.pub # create new TREZOR-based GPG public key +$ gpg2 --import identity.pub # append it to existing identity +``` + +# Generate signatures using the TREZOR device +``` +$ trezor-gpg sign EXAMPLE > EXAMPLE.sig # confirm signature using the device +$ gpg2 --verify EXAMPLE.sig # verify using standard GPG binary +``` diff --git a/trezor_agent/gpg/check.py b/trezor_agent/gpg/check.py deleted file mode 100755 index f31f6dd..0000000 --- a/trezor_agent/gpg/check.py +++ /dev/null @@ -1,51 +0,0 @@ -#!/usr/bin/env python -"""Check GPG v2 signature for a given public key.""" -import argparse -import base64 -import io -import logging - -from . import decode -from .. import util - -log = logging.getLogger(__name__) - - -def original_data(filename): - """Locate and load original file data, whose signature is provided.""" - parts = filename.rsplit('.', 1) - if len(parts) == 2 and parts[1] in ('sig', 'asc'): - log.debug('loading file %s', parts[0]) - return open(parts[0], 'rb').read() - - -def verify(pubkey, sig_file): - """Verify correctness of public key and signature.""" - stream = open(sig_file, 'rb') - if stream.name.endswith('.asc'): - lines = stream.readlines()[3:-1] - data = base64.b64decode(''.join(lines)) - payload, checksum = data[:-3], data[-3:] - assert util.crc24(payload) == checksum - stream = io.BytesIO(payload) - - signature, digest = decode.load_signature(stream, original_data(sig_file)) - decode.verify_digest(pubkey=pubkey, digest=digest, - signature=signature['sig'], label='GPG signature') - log.info('%s OK', sig_file) - - -def main(): - """Main function.""" - p = argparse.ArgumentParser() - p.add_argument('pubkey') - p.add_argument('signature') - p.add_argument('-v', '--verbose', action='store_true', default=False) - args = p.parse_args() - logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO, - format='%(asctime)s %(levelname)-10s %(message)s') - verify(pubkey=decode.load_public_key(open(args.pubkey, 'rb')), - sig_file=args.signature) - -if __name__ == '__main__': - main() diff --git a/trezor_agent/gpg/decode.py b/trezor_agent/gpg/decode.py index 515353b..5cbbf6c 100644 --- a/trezor_agent/gpg/decode.py +++ b/trezor_agent/gpg/decode.py @@ -1,4 +1,5 @@ """Decoders for GPG v2 data structures.""" +import base64 import functools import hashlib import io @@ -268,6 +269,7 @@ def load_public_key(stream, use_custom=False): packet, signature = packets[:2] packets = packets[2:] + packet['user_id'] = userid['value'] return packet @@ -281,7 +283,8 @@ def load_signature(stream, original_data): def load_from_gpg(user_id, use_custom=False): """Load existing GPG public key for `user_id` from local keyring.""" - pubkey_bytes = subprocess.check_output(['gpg2', '--export', user_id]) + args = ['gpg2', '--export'] + ([user_id] if user_id else []) + pubkey_bytes = subprocess.check_output(args=args) if pubkey_bytes: return load_public_key(io.BytesIO(pubkey_bytes), use_custom=use_custom) else: @@ -298,3 +301,19 @@ def verify_digest(pubkey, digest, signature, label): except ecdsa.keys.BadSignatureError: log.error('Bad %s!', label) raise + + +def verify(pubkey, signature, original_data): + """Verify correctness of public key and signature.""" + stream = io.BytesIO(signature) + + # remove GPG armor + lines = stream.readlines()[3:-1] + data = base64.b64decode(''.join(lines)) + payload, checksum = data[:-3], data[-3:] + assert util.crc24(payload) == checksum + stream = io.BytesIO(payload) + + signature, digest = load_signature(stream, original_data) + verify_digest(pubkey=pubkey, digest=digest, + signature=signature['sig'], label='GPG signature') diff --git a/trezor_agent/gpg/encode.py b/trezor_agent/gpg/encode.py index 7a3e18a..c672aa8 100644 --- a/trezor_agent/gpg/encode.py +++ b/trezor_agent/gpg/encode.py @@ -125,8 +125,9 @@ class Signer(object): curve_name=curve_name, created=created, verifying_key=self.conn.pubkey()) - log.info('%s GPG public key %s created at %s', curve_name, - self.pubkey, util.time_format(self.pubkey.created)) + log.info('%s GPG public key %s created at %s for "%s"', + curve_name, self.pubkey, + util.time_format(self.pubkey.created), user_id) @classmethod def from_public_key(cls, pubkey, user_id): diff --git a/trezor_agent/gpg/signer.py b/trezor_agent/gpg/signer.py index a303413..b8c6014 100755 --- a/trezor_agent/gpg/signer.py +++ b/trezor_agent/gpg/signer.py @@ -6,73 +6,65 @@ import subprocess as sp import sys import time -from . import check, decode, encode +from . import decode, encode log = logging.getLogger(__name__) -def _open_output(filename): - return sys.stdout if filename == '-' else open(filename, 'wb') +def run_create(args): + """Generate a new pubkey for a new/existing GPG identity.""" + s = encode.Signer(user_id=args.user_id, created=args.time, + curve_name=args.ecdsa_curve) + if args.subkey: + subkey = s.subkey() + primary = sp.check_output(['gpg2', '--export', args.user_id]) + result = primary + subkey + else: + result = s.export() + s.close() + + return encode.armor(result, 'PUBLIC KEY BLOCK') + +def run_sign(args): + """Generate a GPG signature using hardware-based device.""" + pubkey = decode.load_from_gpg(user_id=None, use_custom=True) + s = encode.Signer.from_public_key(pubkey=pubkey, user_id=pubkey['user_id']) + if args.filename: + data = open(args.filename, 'rb').read() + else: + data = sys.stdin.read() + sig = s.sign(data) + s.close() -def _call_with_input(args, blob): - p = sp.Popen(args=args, stdin=sp.PIPE) - p.stdin.write(blob) - p.stdin.close() - exit_code = p.wait() - assert exit_code == 0, exit_code + sig = encode.armor(sig, 'SIGNATURE') + decode.verify(pubkey=pubkey, signature=sig, original_data=data) + return sig def main(): """Main function.""" p = argparse.ArgumentParser() - p.add_argument('user_id') - p.add_argument('filename', nargs='?') - p.add_argument('-t', '--time', type=int, default=int(time.time())) - p.add_argument('-a', '--armor', action='store_true', default=False) p.add_argument('-v', '--verbose', action='store_true', default=False) - p.add_argument('-s', '--subkey', action='store_true', default=False) - p.add_argument('-e', '--ecdsa-curve', default='nist256p1') - p.add_argument('-o', '--output', - help='Output file name for the results. ' - 'Use "-" to write the results to stdout or "GPG" ' - 'to import a public key into the local keyring.') + subparsers = p.add_subparsers() + + create = subparsers.add_parser('create') + create.add_argument('user_id', help='e.g. ' + '"Satoshi Nakamoto "') + create.add_argument('-s', '--subkey', action='store_true', default=False) + create.add_argument('-e', '--ecdsa-curve', default='nist256p1') + create.add_argument('-t', '--time', type=int, default=int(time.time())) + create.set_defaults(run=run_create) + + sign = subparsers.add_parser('sign') + sign.add_argument('filename', nargs='?') + sign.set_defaults(run=run_sign) args = p.parse_args() logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO, format='%(asctime)s %(levelname)-10s %(message)s') - user_id = args.user_id.encode('ascii') - if not args.filename: - s = encode.Signer(user_id=user_id, created=args.time, - curve_name=args.ecdsa_curve) - if args.subkey: - pubkey = s.subkey() - else: - pubkey = s.export() - - ext = '.pub' - if args.armor: - pubkey = encode.armor(pubkey, 'PUBLIC KEY BLOCK') - ext = '.asc' - filename = args.output or '-' # use stdout if no file specified - if filename == 'GPG': - log.info('importing public key to local keyring') - _call_with_input(['gpg2', '--import'], pubkey) - else: - _open_output(filename).write(pubkey) - else: - pubkey = decode.load_from_gpg(user_id, use_custom=True) - s = encode.Signer.from_public_key(pubkey=pubkey, user_id=user_id) - data = open(args.filename, 'rb').read() - sig, ext = s.sign(data), '.sig' - if args.armor: - sig = encode.armor(sig, 'SIGNATURE') - ext = '.asc' - filename = args.output or (args.filename + ext) - _open_output(filename).write(sig) - check.verify(pubkey=pubkey, sig_file=filename) - - s.close() + result = args.run(args) + sys.stdout.write(result) if __name__ == '__main__':