From d52f295326caa93b0580b458cbcd7974dcc76791 Mon Sep 17 00:00:00 2001 From: Roman Zeyde Date: Thu, 2 Nov 2017 17:10:09 +0200 Subject: [PATCH] gpg: use shutil.which() for Python 3 --- libagent/gpg/__init__.py | 3 +-- libagent/gpg/keyring.py | 20 +++++++++++--------- libagent/util.py | 11 +++++++++++ 3 files changed, 23 insertions(+), 11 deletions(-) diff --git a/libagent/gpg/__init__.py b/libagent/gpg/__init__.py index 0a10e21..24f2523 100644 --- a/libagent/gpg/__init__.py +++ b/libagent/gpg/__init__.py @@ -129,8 +129,7 @@ def run_init(device_type, args): check_call(['mkdir', '-p', homedir]) check_call(['chmod', '700', homedir]) - agent_path = check_output(['which', '{}-gpg-agent'.format(device_name)]) - agent_path = agent_path.strip() + agent_path = util.which('{}-gpg-agent'.format(device_name)) # Prepare GPG configuration file with open(os.path.join(homedir, 'gpg.conf'), 'w') as f: diff --git a/libagent/gpg/keyring.py b/libagent/gpg/keyring.py index 76c8090..cd73d99 100644 --- a/libagent/gpg/keyring.py +++ b/libagent/gpg/keyring.py @@ -16,10 +16,11 @@ log = logging.getLogger(__name__) def get_agent_sock_path(env=None, sp=subprocess): """Parse gpgconf output to find out GPG agent UNIX socket path.""" - output = sp.check_output(['gpgconf', '--list-dirs'], env=env) + args = [util.which('gpgconf'), '--list-dirs'] + output = sp.check_output(args=args, env=env) lines = output.strip().split(b'\n') dirs = dict(line.split(b':', 1) for line in lines) - log.debug('gpgconf --list-dirs: %s', dirs) + log.debug('%s: %s', args, dirs) return dirs[b'agent-socket'] @@ -179,22 +180,23 @@ def sign_digest(sock, keygrip, digest, sp=subprocess, environ=None): def get_gnupg_components(sp=subprocess): """Parse GnuPG components' paths.""" - output = sp.check_output(['gpgconf', '--list-components']) + output = sp.check_output([util.which('gpgconf'), '--list-components']) components = dict(re.findall('(.*):.*:(.*)', output.decode('ascii'))) log.debug('gpgconf --list-components: %s', components) return components +@util.memoize def get_gnupg_binary(sp=subprocess): """Starting GnuPG 2.2.x, the default installation uses `gpg`.""" return get_gnupg_components(sp=sp)['gpg'] -def gpg_command(args, env=None, sp=subprocess): +def gpg_command(args, env=None): """Prepare common GPG command line arguments.""" if env is None: env = os.environ - cmd = [get_gnupg_binary(sp=sp)] + cmd = [get_gnupg_binary()] homedir = env.get('GNUPGHOME') if homedir: cmd.extend(['--homedir', homedir]) @@ -203,14 +205,14 @@ def gpg_command(args, env=None, sp=subprocess): def get_keygrip(user_id, sp=subprocess): """Get a keygrip of the primary GPG key of the specified user.""" - args = gpg_command(['--list-keys', '--with-keygrip', user_id], sp=sp) + args = gpg_command(['--list-keys', '--with-keygrip', user_id]) output = sp.check_output(args).decode('ascii') return re.findall(r'Keygrip = (\w+)', output)[0] def gpg_version(sp=subprocess): """Get a keygrip of the primary GPG key of the specified user.""" - args = gpg_command(['--version'], sp=sp) + args = gpg_command(['--version']) output = sp.check_output(args) line = output.split(b'\n')[0] # b'gpg (GnuPG) 2.1.11' return line.split(b' ')[-1] # b'2.1.11' @@ -218,7 +220,7 @@ def gpg_version(sp=subprocess): def export_public_key(user_id, env=None, sp=subprocess): """Export GPG public key for specified `user_id`.""" - args = gpg_command(['--export', user_id], sp=sp) + args = gpg_command(['--export', user_id]) result = sp.check_output(args=args, env=env) if not result: log.error('could not find public key %r in local GPG keyring', user_id) @@ -228,7 +230,7 @@ def export_public_key(user_id, env=None, sp=subprocess): def export_public_keys(env=None, sp=subprocess): """Export all GPG public keys.""" - args = gpg_command(['--export'], sp=sp) + args = gpg_command(['--export']) result = sp.check_output(args=args, env=env) if not result: raise KeyError('No GPG public keys found at env: {!r}'.format(env)) diff --git a/libagent/util.py b/libagent/util.py index c10b0b6..82e60ee 100644 --- a/libagent/util.py +++ b/libagent/util.py @@ -4,6 +4,7 @@ import contextlib import functools import io import logging +import shutil import struct log = logging.getLogger(__name__) @@ -213,3 +214,13 @@ def memoize(func): return result return wrapper + + +@memoize +def which(cmd): + """Return full path to specified command, or raise OSError if missing.""" + full_path = shutil.which(cmd) + if full_path is None: + raise OSError('Cannot find {!r} in $PATH'.format(cmd)) + log.debug('which %r => %r', cmd, full_path) + return full_path