added keyfile option to ssh_login

pull/10/head
lanjelot 10 years ago
parent 576be76d99
commit c37ff75fa7

@ -1981,6 +1981,15 @@ try:
except ImportError:
warnings.append('paramiko')
def load_keyfile(keyfile):
for cls in (paramiko.RSAKey, paramiko.DSSKey, paramiko.ECDSAKey):
try:
return cls.from_private_key_file(keyfile)
except paramiko.SSHException:
pass
else:
raise
class SSH_login(TCP_Cache):
'''Brute-force SSH'''
@ -1993,7 +2002,8 @@ class SSH_login(TCP_Cache):
('port', 'target port [22]'),
('user', 'usernames to test'),
('password', 'passwords to test'),
('auth_type', 'auth type to use [password|keyboard-interactive|auto]'),
('auth_type', 'type of password authentication to use [password|keyboard-interactive|auto]'),
('keyfile', 'file with RSA, DSA or ECDSA private key to test'),
)
available_options += TCP_Cache.available_options
@ -2005,25 +2015,34 @@ class SSH_login(TCP_Cache):
return TCP_Connection(fp, fp.remote_version)
def execute(self, host, port='22', user=None, password=None, auth_type='password', persistent='1'):
def execute(self, host, port='22', user=None, password=None, auth_type='password', keyfile=None, persistent='1'):
with Timing() as timing:
fp, banner = self.bind(host, port, user)
try:
if user is not None and password is not None:
if user is not None:
if keyfile is not None:
key = load_keyfile(keyfile)
with Timing() as timing:
if auth_type == 'password':
fp.auth_password(user, password, fallback=False)
elif auth_type == 'keyboard-interactive':
fp.auth_interactive(user, lambda a,b,c: [password] if len(c) == 1 else [])
if keyfile is not None:
fp.auth_publickey(user, key)
elif auth_type == 'auto':
fp.auth_password(user, password, fallback=True)
elif password is not None:
if auth_type == 'password':
fp.auth_password(user, password, fallback=False)
else:
raise NotImplementedError("Incorrect auth_type '%s'" % auth_type)
elif auth_type == 'keyboard-interactive':
fp.auth_interactive(user, lambda a,b,c: [password] if len(c) == 1 else [])
elif auth_type == 'auto':
fp.auth_password(user, password, fallback=True)
else:
raise NotImplementedError("Incorrect auth_type '%s'" % auth_type)
logger.debug('No error')
code, mesg = '0', banner
@ -3101,10 +3120,10 @@ class Response_HTTP(Response_Base):
def __str__(self):
lines = re.findall('^(HTTP/.+)$', self.mesg, re.M)
if not lines:
return 'Unexpected HTTP response'
else:
if lines:
return lines[-1]
else:
return self.mesg
def match_clen(self, val):
return match_range(self.content_length, val)
@ -3989,22 +4008,24 @@ class Dummy_test:
usage_hints = (
"""%prog data=RANGE0 0=hex:0x00-0xffff""",
"""%prog data=PROG0 0='seq 0 80'""",
"""%prog data=RANGE0 0=int:10-0""",
"""%prog data=PROG0 0='seq -w 10 -1 0'""",
"""%prog data=PROG0 0='mp64.bin -i ?l?l?l',$(mp64.bin --combination -i ?l?l?l)""",
)
available_options = (
('data', 'data to test'),
('data2', 'data2 to test'),
('delay', 'fake random delay'),
)
available_actions = ()
Response = Response_Base
def execute(self, data, data2=''):
def execute(self, data, data2='', delay='1'):
code, mesg = 0, '%s / %s' % (data, data2)
with Timing() as timing:
sleep(random.random())
sleep(random.randint(0, int(delay)*1000)/1000.0)
return self.Response(code, mesg, timing)

Loading…
Cancel
Save