Support DCERPC.V5 library and SMB2 for smb_login and smb_lookupsid modules

pull/12/head
asolino 9 years ago
parent e6b7be52f7
commit fccc584098

@ -2546,15 +2546,18 @@ class LDAP_login:
# SMB {{{
try:
from impacket import smb as impacket_smb
from impacket.dcerpc import dcerpc, transport, lsarpc
from impacket.smbconnection import SMBConnection, SessionError
from impacket import nt_errors
from impacket.dcerpc.v5 import transport, lsat, lsad
from impacket.dcerpc.v5.dtypes import MAXIMUM_ALLOWED
from impacket.dcerpc.v5.samr import SID_NAME_USE
except ImportError:
notfound.append('impacket')
class SMB_Connection(TCP_Connection):
def close(self):
self.fp.get_socket().close()
self.fp.getSMBServer().get_socket().close()
class Response_SMB(Response_Base):
indicatorsfmt = [('code', -8), ('size', -4), ('time', 6)]
@ -2579,34 +2582,9 @@ class SMB_login(TCP_Cache):
Response = Response_SMB
# ripped from medusa smbnt.c
error_map = {
0xffff: 'UNKNOWN_ERROR_CODE',
0x0000: 'STATUS_SUCCESS',
0x000d: 'STATUS_INVALID_PARAMETER',
0x005e: 'STATUS_NO_LOGON_SERVERS',
0x006d: 'STATUS_LOGON_FAILURE',
0x006e: 'STATUS_ACCOUNT_RESTRICTION',
0x006f: 'STATUS_INVALID_LOGON_HOURS',
0x0002: 'STATUS_INVALID_LOGON_HOURS (LM Authentication)',
0x0070: 'STATUS_INVALID_WORKSTATION',
0x0002: 'STATUS_INVALID_WORKSTATION (LM Authentication)',
0x0071: 'STATUS_PASSWORD_EXPIRED',
0x0072: 'STATUS_ACCOUNT_DISABLED',
0x015b: 'STATUS_LOGON_TYPE_NOT_GRANTED',
0x018d: 'STATUS_TRUSTED_RELATIONSHIP_FAILURE',
0x0193: 'STATUS_ACCOUNT_EXPIRED',
0x0002: 'STATUS_ACCOUNT_EXPIRED_OR_DISABLED (LM Authentication)',
0x0224: 'STATUS_PASSWORD_MUST_CHANGE',
0x0002: 'STATUS_PASSWORD_MUST_CHANGE (LM Authentication)',
0x0234: 'STATUS_ACCOUNT_LOCKED_OUT (No LM Authentication Code)',
0x0001: 'AS400_STATUS_LOGON_FAILURE',
0x0064: 'The machine you are logging onto is protected by an authentication firewall.',
}
def connect(self, host, port):
# if port == 445, impacket will use <host> instead of '*SMBSERVER' as the remote_name
fp = impacket_smb.SMB('*SMBSERVER', host, sess_port=int(port))
fp = SMBConnection('*SMBSERVER', host, sess_port=int(port))
return SMB_Connection(fp)
@ -2632,28 +2610,14 @@ class SMB_login(TCP_Cache):
fp.login(user, password, domain)
logger.debug('No error')
code, mesg = '0', '%s\\%s (%s)' % (fp.get_server_domain(), fp.get_server_name(), fp.get_server_os())
code, mesg = '0', '%s\\%s (%s)' % (fp.getServerDomain(), fp.getServerName(), fp.getServerOS())
self.reset()
except impacket_smb.SessionError as e:
if e.error_class == 0:
code = '%04x' % e.error_code
mesg = self.error_map.get(e.error_code & 0x0000fffff, '')
else:
code = '%x%04x' % (e.error_class, e.error_code)
error_class = e.error_classes.get(e.error_class, None) # -> ("ERRNT", nt_msgs)
if error_class:
class_str = error_class[0] # "ERRNT"
error_tuple = error_class[1].get(e.error_code, None) # -> ("STATUS_LOGON_FAILURE","Logon failure: unknown user name or bad password.") or None
if error_tuple:
mesg = '%s %s' % error_tuple
else:
mesg = '%s' % class_str
except SessionError as e:
# Something failed, bye
code = str(hex(e.getErrorCode()))
mesg = nt_errors.ERROR_MESSAGES[e.getErrorCode()][0]
if persistent == '0':
self.reset()
@ -2691,37 +2655,33 @@ class SMB_lookupsid(TCP_Cache):
def connect(self, host, port, user, password):
smbt = transport.SMBTransport(host, int(port), r'\lsarpc', user, password)
dce = dcerpc.DCERPC_v5(smbt)
dce = smbt.get_dce_rpc()
dce.connect()
dce.bind(lsarpc.MSRPC_UUID_LSARPC)
fp = lsarpc.DCERPCLsarpc(dce)
return DCE_Connection(fp, smbt)
dce.bind(lsat.MSRPC_UUID_LSAT)
# http://msdn.microsoft.com/en-us/library/windows/desktop/hh448528%28v=vs.85%29.aspx
SID_NAME_USER = [0, 'User', 'Group', 'Domain', 'Alias', 'WellKnownGroup', 'DeletedAccount', 'Invalid', 'Unknown', 'Computer', 'Label']
return DCE_Connection(dce, smbt)
def execute(self, host, port='139', user='', password='', sid=None, rid=None, persistent='1'):
fp, _ = self.bind(host, port, user, password)
try:
fp, _ = self.bind(host, port, user, password)
except SessionError, e:
error_code = nt_errors.ERROR_MESSAGES[e.getErrorCode()][0]
# Something failed, bye
return self.Response(1, error_code )
if rid:
sid = '%s-%s' % (sid, rid)
op2 = fp.LsarOpenPolicy2('\\', access_mask=0x02000000)
res = fp.LsarLookupSids(op2['ContextHandle'], [sid])
if res['ErrorCode'] == 0:
op2 = lsat.hLsarOpenPolicy2(fp, MAXIMUM_ALLOWED | lsat.POLICY_LOOKUP_NAMES)
try:
res = lsat.hLsarLookupSids(fp, op2['PolicyHandle'], [sid],lsat.LSAP_LOOKUP_LEVEL.LsapLookupWksta)
code, names = 0, []
for d in res.formatDict():
if 'types' in d: # http://code.google.com/p/impacket/issues/detail?id=10
names.append(','.join('%s\\%s (%s)' % (d['domain'], n, self.SID_NAME_USER[t]) for n, t in zip(d['names'], d['types'])))
else:
names.append(','.join('%s\\%s' % (d['domain'], n) for n in d['names']))
else:
for n, item in enumerate(res['TranslatedNames']['Names']):
if item['Use'] != SID_NAME_USE.SidTypeUnknown:
names.append("%s\\%s (%s)" % (res['ReferencedDomains']['Domains'][item['DomainIndex']]['Name'], item['Name'], SID_NAME_USE.enumItems(item['Use']).name))
except Exception, e:
code, names = 1, ['unknown'] # STATUS_SOME_NOT_MAPPED
if persistent == '0':

Loading…
Cancel
Save