protocol: use Handler class and fix pylint warnings

nistp521
Roman Zeyde 9 years ago
parent 7ef71df81b
commit 8596537a54

@ -20,70 +20,80 @@ SSH2_AGENTC_REMOVE_IDENTITY = 18
SSH2_AGENTC_REMOVE_ALL_IDENTITIES = 19 SSH2_AGENTC_REMOVE_ALL_IDENTITIES = 19
def legacy_pubs(buf, keys, signer): class Handler(object):
code = util.pack('B', SSH_AGENT_RSA_IDENTITIES_ANSWER)
num = util.pack('L', 0) # no SSH v1 keys def __init__(self, keys, signer):
return util.frame(code, num) self.public_keys = keys
self.signer = signer
def list_pubs(buf, keys, signer): self.methods = {
code = util.pack('B', SSH2_AGENT_IDENTITIES_ANSWER) SSH_AGENTC_REQUEST_RSA_IDENTITIES: Handler.legacy_pubs,
num = util.pack('L', len(keys)) SSH2_AGENTC_REQUEST_IDENTITIES: self.list_pubs,
log.debug('available keys: %s', [k['name'] for k in keys]) SSH2_AGENTC_SIGN_REQUEST: self.sign_message,
for i, k in enumerate(keys): }
log.debug('%2d) %s', i+1, k['fingerprint'])
pubs = [util.frame(k['blob']) + util.frame(k['name']) for k in keys] def handle(self, msg):
return util.frame(code, num, *pubs) log.debug('request: %d bytes', len(msg))
buf = io.BytesIO(msg)
code, = util.recv(buf, '>B')
def sign_message(buf, keys, signer): method = self.methods[code]
key = formats.parse_pubkey(util.read_frame(buf)) log.debug('calling %s()', method.__name__)
log.debug('looking for %s', key['fingerprint']) reply = method(buf=buf)
blob = util.read_frame(buf) log.debug('reply: %d bytes', len(reply))
return reply
for k in keys:
if (k['fingerprint']) == (key['fingerprint']): @staticmethod
log.debug('using key %r (%s)', k['name'], k['fingerprint']) def legacy_pubs(buf):
key = k ''' SSH v1 public keys are not supported '''
break assert not buf.read()
else: code = util.pack('B', SSH_AGENT_RSA_IDENTITIES_ANSWER)
raise ValueError('key not found') num = util.pack('L', 0) # no SSH v1 keys
return util.frame(code, num)
log.debug('signing %d-byte blob', len(blob))
r, s = signer(label=key['name'], blob=blob) def list_pubs(self, buf):
signature = (r, s) ''' SSH v2 public keys are serialized and returned. '''
log.debug('signature: %s', signature) assert not buf.read()
keys = self.public_keys
success = key['verifying_key'].verify(signature=signature, data=blob, code = util.pack('B', SSH2_AGENT_IDENTITIES_ANSWER)
sigdecode=lambda sig, _: sig) num = util.pack('L', len(keys))
log.info('signature status: %s', 'OK' if success else 'ERROR') log.debug('available keys: %s', [k['name'] for k in keys])
if not success: for i, k in enumerate(keys):
raise ValueError('invalid signature') log.debug('%2d) %s', i+1, k['fingerprint'])
pubs = [util.frame(k['blob']) + util.frame(k['name']) for k in keys]
sig_bytes = io.BytesIO() return util.frame(code, num, *pubs)
for x in signature:
sig_bytes.write(util.frame(b'\x00' + util.num2bytes(x, key['size']))) def sign_message(self, buf):
sig_bytes = sig_bytes.getvalue() ''' SSH v2 public key authentication is performed. '''
log.debug('signature size: %d bytes', len(sig_bytes)) key = formats.parse_pubkey(util.read_frame(buf))
log.debug('looking for %s', key['fingerprint'])
data = util.frame(util.frame(key['type']), util.frame(sig_bytes)) blob = util.read_frame(buf)
code = util.pack('B', SSH2_AGENT_SIGN_RESPONSE)
return util.frame(code, data) for k in self.public_keys:
if (k['fingerprint']) == (key['fingerprint']):
log.debug('using key %r (%s)', k['name'], k['fingerprint'])
handlers = { key = k
SSH_AGENTC_REQUEST_RSA_IDENTITIES: legacy_pubs, break
SSH2_AGENTC_REQUEST_IDENTITIES: list_pubs, else:
SSH2_AGENTC_SIGN_REQUEST: sign_message, raise ValueError('key not found')
}
log.debug('signing %d-byte blob', len(blob))
r, s = self.signer(label=key['name'], blob=blob)
def handle_message(msg, keys, signer): signature = (r, s)
log.debug('request: %d bytes', len(msg)) log.debug('signature: %s', signature)
buf = io.BytesIO(msg)
code, = util.recv(buf, '>B') success = key['verifying_key'].verify(signature=signature, data=blob,
handler = handlers[code] sigdecode=lambda sig, _: sig)
log.debug('calling %s()', handler.__name__) log.info('signature status: %s', 'OK' if success else 'ERROR')
reply = handler(buf=buf, keys=keys, signer=signer) if not success:
log.debug('reply: %d bytes', len(reply)) raise ValueError('invalid signature')
return reply
sig_bytes = io.BytesIO()
for x in signature:
x_frame = util.frame(b'\x00' + util.num2bytes(x, key['size']))
sig_bytes.write(x_frame)
sig_bytes = sig_bytes.getvalue()
log.debug('signature size: %d bytes', len(sig_bytes))
data = util.frame(util.frame(key['type']), util.frame(sig_bytes))
code = util.pack('B', SSH2_AGENT_SIGN_RESPONSE)
return util.frame(code, data)

@ -31,12 +31,12 @@ def unix_domain_socket_server(sock_path):
os.remove(sock_path) os.remove(sock_path)
def handle_connection(conn, keys, signer): def handle_connection(conn, handler):
try: try:
log.debug('welcome agent') log.debug('welcome agent')
while True: while True:
msg = util.read_frame(conn) msg = util.read_frame(conn)
reply = protocol.handle_message(msg=msg, keys=keys, signer=signer) reply = handler.handle(msg=msg)
util.send(conn, reply) util.send(conn, reply)
except EOFError: except EOFError:
log.debug('goodbye agent') log.debug('goodbye agent')
@ -47,6 +47,7 @@ def handle_connection(conn, keys, signer):
def server_thread(server, keys, signer): def server_thread(server, keys, signer):
log.debug('server thread started') log.debug('server thread started')
handler = protocol.Handler(keys=keys, signer=signer)
while True: while True:
log.debug('waiting for connection on %s', server.getsockname()) log.debug('waiting for connection on %s', server.getsockname())
try: try:
@ -55,7 +56,7 @@ def server_thread(server, keys, signer):
log.debug('server error: %s', e, exc_info=True) log.debug('server error: %s', e, exc_info=True)
break break
with contextlib.closing(conn): with contextlib.closing(conn):
handle_connection(conn, keys, signer) handle_connection(conn, handler)
log.debug('server thread stopped') log.debug('server thread stopped')

Loading…
Cancel
Save