diff --git a/libagent/device/ui.py b/libagent/device/ui.py index 20c0434..d73d91d 100644 --- a/libagent/device/ui.py +++ b/libagent/device/ui.py @@ -4,7 +4,7 @@ import logging import os import subprocess -from ..gpg import agent +from .. import util log = logging.getLogger(__name__) @@ -93,17 +93,17 @@ def interact(title, description, prompt, binary, options): env=os.environ) expect(p, [b'OK']) - title = agent.serialize(title.encode('ascii')) + title = util.assuan_serialize(title.encode('ascii')) write(p, b'SETTITLE ' + title + b'\n') expect(p, [b'OK']) if description: - description = agent.serialize(description.encode('ascii')) + description = util.assuan_serialize(description.encode('ascii')) write(p, b'SETDESC ' + description + b'\n') expect(p, [b'OK']) if prompt: - prompt = agent.serialize(prompt.encode('ascii')) + prompt = util.assuan_serialize(prompt.encode('ascii')) write(p, b'SETPROMPT ' + prompt + b'\n') expect(p, [b'OK']) diff --git a/libagent/gpg/agent.py b/libagent/gpg/agent.py index c4271e5..1bfad8f 100644 --- a/libagent/gpg/agent.py +++ b/libagent/gpg/agent.py @@ -21,25 +21,17 @@ def yield_connections(sock): yield conn -def serialize(data): - """Serialize data according to ASSUAN protocol.""" - for c in [b'%', b'\n', b'\r']: - escaped = '%{:02X}'.format(ord(c)).encode('ascii') - data = data.replace(c, escaped) - return data - - def sig_encode(r, s): """Serialize ECDSA signature data into GPG S-expression.""" - r = serialize(util.num2bytes(r, 32)) - s = serialize(util.num2bytes(s, 32)) + r = util.assuan_serialize(util.num2bytes(r, 32)) + s = util.assuan_serialize(util.num2bytes(s, 32)) return b'(7:sig-val(5:ecdsa(1:r32:' + r + b')(1:s32:' + s + b')))' def _serialize_point(data): prefix = '{}:'.format(len(data)).encode('ascii') # https://www.gnupg.org/documentation/manuals/assuan/Server-responses.html - return b'(5:value' + serialize(prefix + data) + b')' + return b'(5:value' + util.assuan_serialize(prefix + data) + b')' def parse_ecdh(line): diff --git a/libagent/gpg/tests/test_agent.py b/libagent/gpg/tests/test_agent.py new file mode 100644 index 0000000..2f82522 --- /dev/null +++ b/libagent/gpg/tests/test_agent.py @@ -0,0 +1,11 @@ +from .. import agent + + +def test_sig_encode(): + SIG = ( + b'(7:sig-val(5:ecdsa(1:r32:\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x00\x00\x00\x00\x0c)(1:s32:\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00' + b'\x00\x00\x00\x00")))') + assert agent.sig_encode(12, 34) == SIG diff --git a/libagent/tests/test_util.py b/libagent/tests/test_util.py index ead8970..2cef300 100644 --- a/libagent/tests/test_util.py +++ b/libagent/tests/test_util.py @@ -115,3 +115,9 @@ def test_memoize(): assert g(1) == g(1) assert g(1) != g(2) assert f.mock_calls == [mock.call(1), mock.call(2)] + + +def test_assuan_serialize(): + assert util.assuan_serialize(b'') == b'' + assert util.assuan_serialize(b'123\n456') == b'123%0A456' + assert util.assuan_serialize(b'\r\n') == b'%0D%0A' diff --git a/libagent/util.py b/libagent/util.py index 15c172b..623ddda 100644 --- a/libagent/util.py +++ b/libagent/util.py @@ -229,3 +229,11 @@ def which(cmd): raise OSError('Cannot find {!r} in $PATH'.format(cmd)) log.debug('which %r => %r', cmd, full_path) return full_path + + +def assuan_serialize(data): + """Serialize data according to ASSUAN protocol (for GPG daemon communication).""" + for c in [b'%', b'\n', b'\r']: + escaped = '%{:02X}'.format(ord(c)).encode('ascii') + data = data.replace(c, escaped) + return data