You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Comrad/comrad/cli/cli.py

954 lines
34 KiB
Python

4 years ago
import os,sys; sys.path.append(os.path.abspath(os.path.join(os.path.abspath(os.path.join(os.path.dirname(__file__),'..')),'..')))
from comrad import *
from comrad.backend import *
4 years ago
import art,asyncio
4 years ago
import textwrap as tw
4 years ago
import readline,logging
4 years ago
# Only TAB delimits completion tokens, so a whole "/command" is completed as one unit.
readline.set_completer_delims('\t')
# from tab_completer import tabCompleter
tabber=tabCompleter()
# libedit-backed readline needs its own binding syntax. `__doc__` is None when
# Python runs with -OO, so fall back to '' instead of raising TypeError.
if 'libedit' in (readline.__doc__ or ''):
    readline.parse_and_bind("bind ^I rl_complete")
else:
    readline.parse_and_bind("tab: complete")

# Stop these third-party loggers from propagating records to the root logger.
torpy_logger = logging.getLogger('torpy')
logging.getLogger('urllib3').propagate=False
logging.getLogger('shapely').propagate=False
logging.getLogger('pyproj').propagate=False
logging.getLogger('rtree').propagate=False
logging.getLogger('asyncio').setLevel(logging.WARNING)
torpy_logger.propagate=False

from shutil import get_terminal_size

# Terminal geometry captured once at import time; one row is held back from the height.
CLI_WIDTH = get_terminal_size().columns
CLI_HEIGHT = get_terminal_size().lines-1
4 years ago
4 years ago
class CLI(Logger):
    """Interactive command-line front end.

    ``ROUTES`` maps each slash-command name to a one-line description. A
    command is dispatched only when its name is a key here *and* a method of
    the same name exists on the instance.
    """
    ROUTES = {
        'help':'seek help',
        'register':'join the comrades',
        'login':'log back in',
        'meet':'meet a comrad',
        'who':'show contacts or info',
        'dm':'write people',
        'refresh':'refresh feed/DMs',
        'dms':'read DMs',
        # 'feed' used to be listed twice; the later value silently won, so that
        # effective value is the one kept here.
        'feed':'fetch posts',
        'verbose':'show/hide log output',
        'post':'post to world',
        'exit':'exit comrad',
        'clearnet':'switch to clearnet',
        'tor':'switch to tor',
    }
4 years ago
def __init__(self,name='',cmd='',persona=None):
    """Set up session state, tab completion, Tor hop logging and IP lookup.

    Args:
        name: user name shown in the prompt until login/registration sets it.
        cmd: stored on the instance as ``self.cmd``.
        persona: stored as ``self.persona``; the keymaker dialogue reads
            ``self.persona.crypt_keys``.
    """
    self.name=name
    self.cmd=cmd
    # Previously dropped, although the keymaker dialogue reads self.persona.
    self.persona=persona
    self.comrad=None
    self.loggedin=False
    self.tabber=tabber
    self.log('ROUTES:',self.ROUTES)
    self.hops=[]
    # Curses map screen; created lazily on the first Tor hop callback.
    self._mapscr=None

    # Routes: offer every command, with its leading slash, to the tab completer.
    rts=['/'+k for k in self.ROUTES]
    tabber.createListCompleter(rts)
    readline.set_completer(tabber.listCompleter)

    # logging: the walker is attached as a handler on the 'torpy' logger.
    from comrad.backend.mazes import MazeWalker
    self.walker=MazeWalker(callbacks=self.callbacks)
    self.torpy_logger = logging.getLogger('torpy')
    self.torpy_logger.propagate=False
    self.torpy_logger.addHandler(self.walker)

    import ipinfo
    # NOTE(review): access token is hard-coded in source; consider moving it to configuration.
    ipinfo_access_token = '90df1baf7c373a'
    self.ipinfo_handler = ipinfo.getHandler(ipinfo_access_token)
4 years ago
def verbose(self,*x):
    """Route handler for ``/verbose``: toggle log output. Arguments are ignored."""
    self.toggle_log()
4 years ago
def run(self,inp='',name=''):
    """Show the banner, run any start-up commands, then loop on user input.

    Args:
        inp: optional start-up commands separated by '/', e.g.
            ``'/register elon'``; each non-empty segment is routed in order.
        name: accepted for compatibility; currently unused.

    The loop ends on Ctrl+C / Ctrl+D by exiting the interpreter.
    """
    clear_screen()
    self.boot()

    if not inp:
        self.help()
    else:
        for inpx in inp.split('/'):
            inpx=inpx.strip()
            # A leading '/' yields an empty first segment; it names no command.
            if inpx:
                self.route('/'+inpx)

    while True:
        try:
            inp=input(f'\n@{self.name if self.name else "?"}: ')
            self.route(inp)
        except (KeyboardInterrupt,EOFError):
            self.stat('Goodbye.')
            # sys.exit rather than the site-provided exit(), which is not
            # guaranteed to exist (e.g. under `python -S`).
            sys.exit()
        except ComradException as e:
            self.stat(f'I could not handle your request. {e}\n')
        self.walker.walk=[] # reset logger's walk?
4 years ago
def route(self,inp):
    """Dispatch one line of input to the matching command method.

    Lines that do not start with '/' are ignored. The first whitespace-separated
    token (minus its slash) is the command; the remainder, stripped, is passed
    to the handler as a single string. Unknown commands are ignored.
    """
    line = inp.strip()
    if not line.startswith('/'):
        return

    head, *tail = line.split(None,1)
    cmd = head[1:]
    dat = tail[0].strip() if tail else ''

    if cmd not in self.ROUTES or not hasattr(self,cmd):
        return

    handler = getattr(self,cmd)
    try:
        res = handler(dat)
    except ComradException as e:
        self.stat('Message not sent.',str(e),'\n')
4 years ago
def stat(self,*msgs,use_prefix=True,prefix=None,comrad_name=None,pause=False,clear=False,min_prefix_len=12,**kwargs):
    """Print one or more status messages, wrapped and prefixed with a speaker name.

    Any open map screen is closed first and the recorded hops are cleared.
    When ``prefix`` is empty it is built as ``'@<comrad_name>: '``, with
    'Telephone' as the default speaker. ``pause`` and ``clear`` run
    ``do_pause()`` / ``clear_screen()`` after printing. Extra keyword
    arguments are accepted and ignored.
    """
    live_map = getattr(self,'_mapscr',None)
    if live_map:
        live_map.endwin()
        self._mapscr = None
        self.hops = []

    if not prefix:
        prefix = '@' + (comrad_name or 'Telephone') + ': '

    # wrap msg
    if use_prefix and prefix:
        shown_prefix, shown_len = prefix, min_prefix_len
    else:
        shown_prefix, shown_len = '', 0
    print(wrapp(
        *msgs,
        use_prefix=use_prefix,
        prefix=shown_prefix,
        min_prefix_len=shown_len
    ))

    if pause:
        do_pause()
    if clear:
        clear_screen()
4 years ago
def print(self,*x):
    """Write the given values to stdout via the built-in ``print``."""
    # Inside the method body the bare name resolves to the built-in, not this method.
    print(*x)
4 years ago
4 years ago
def boot(self,indent=None,scan=True):
    """Render the title as ASCII art, centre every row, and display it.

    With ``scan`` true the banner goes through ``scan_print`` (speed 5);
    otherwise it is printed in one go. ``indent`` is unused.
    """
    banner = art.text2art(CLI_TITLE,font=CLI_FONT).replace('\r\n','\n')
    centred = '\n'.join(row.center(CLI_WIDTH,' ') for row in banner.split('\n'))
    if scan:
        scan_print(centred,speed=5)
    else:
        self.print(centred)
4 years ago
4 years ago
def status_str(self,unr,tot):
    """Return an unread-count badge: ``'(N*)'`` when N is non-zero, else ``'(N)'``.

    ``tot`` is accepted for interface compatibility but does not affect the
    result. The former unused ``tot-unr`` computation was dropped because it
    raised whenever ``tot`` was not a number.
    """
    return f'({unr}*)' if unr else f'({unr})'
@property
def post_status_str(self):
    """Unread badge for posts, or '' when no comrad is loaded."""
    who = self.comrad
    if who:
        return self.status_str(unr=who.num_unread_posts,tot=who.num_posts)
    return ''
@property
def msg_status_str(self):
    """Unread badge for direct messages, or '' when no comrad is loaded."""
    who = self.comrad
    if who:
        return self.status_str(unr=who.num_unread_msgs,tot=who.num_msgs)
    return ''
4 years ago
def clearnet(self,_=''):
    """Route handler for ``/clearnet``: set the clearnet flag and clear the Tor flag in the environment."""
    os.environ.update({
        'COMRAD_USE_CLEARNET': '1',
        'COMRAD_USE_TOR': '0',
    })
4 years ago
def tor(self,_=''):
    """Route handler for ``/tor``: set the Tor flag and clear the clearnet flag in the environment."""
    os.environ.update({
        'COMRAD_USE_CLEARNET': '0',
        'COMRAD_USE_TOR': '1',
    })
4 years ago
4 years ago
4 years ago
def help(self,*x,**y):
    """Clear the screen, show the banner, and print the command menu.

    The menu lists login/register while logged out and the full command set
    (with unread badges) while logged in. Each row is stripped, padded to the
    widest row and centred, so the menu shows as one left-aligned column in
    the middle of the terminal. Arguments are ignored.
    """
    clear_screen()
    self.boot(scan=False)

    if self.logged_in:
        HELPSTR=f"""
/refresh --> get new data
/feed --> scroll feed {self.post_status_str}
/post --> post to all
/dms --> see DMs {self.msg_status_str}
/dm [name] --> send a DM
/meet [name] --> exchange info
/who [name] --> show contacts
"""
    else:
        HELPSTR="""
/login [name] --> log back in
/register [name] --> new comrad"""
    HELPSTR+="""
/help --> seek help
/exit --> exit app
"""

    helpstr = tw.indent(HELPSTR.strip()+'\n\n',' '*11)
    rows = [row.strip() for row in helpstr.split('\n')]
    widest = max(len(row) for row in rows)
    for row in rows:
        print(row.ljust(widest).center(CLI_WIDTH))
4 years ago
4 years ago
def exit(self,dat=''):
    """Route handler for ``/exit``: leave the program with a farewell message.

    Raises:
        SystemExit: always, carrying the message 'Goodbye.'.
    """
    # sys.exit instead of the site-provided exit(), which is meant for the
    # interactive shell and is absent when the site module is not loaded.
    sys.exit('Goodbye.')
4 years ago
@property
def border(self):
    """A terminal-wide rule of dashes, indented by two spaces."""
    return tw.indent('-' * CLI_WIDTH, ' '*2)
4 years ago
def intro(self):
    """Call ``self.status`` with a single ``None`` argument."""
    self.status(None)
4 years ago
def who(self,whom):
    """Route handler for ``/who``: print the name of every contact, one per line.

    Requires login. ``whom`` is currently unused.
    """
    if not self.with_required_login():
        return
    names = [person.name for person in self.comrad.contacts()]
    self.print(' ' + '\n '.join(names))
4 years ago
4 years ago
@property
def callbacks(self):
    """Event-name to handler mapping; both Tor circuit events go to ``callback_on_hop``."""
    events = ('torpy_guard_node_connect', 'torpy_extend_circuit')
    return {event: self.callback_on_hop for event in events}
4 years ago
def callback_on_hop(self,rtr):
    """Record a newly seen Tor relay and redraw the world map with it.

    Args:
        rtr: relay object; ``.ip`` and ``.nickname`` are read.

    A relay whose IP is already in ``self.hops`` is ignored. The check now
    runs before the ipinfo lookup, so a repeated hop no longer costs a lookup
    whose result was never used.
    NOTE(review): the original nesting was lost in this copy of the file; the
    redraw is assumed to belong to the "new hop" branch - confirm.
    """
    if any(seen.ip == rtr.ip for seen,_ in self.hops):
        return

    from worldmap_curses import make_map #,print_map_simple
    mapscr = getattr(self,'_mapscr',None)
    if not mapscr:
        # First hop of this walk: create the screen and draw the base map once.
        self._mapscr = mapscr = make_map()
        mapscr.add_base_map()

    deets = self.ipinfo_handler.getDetails(rtr.ip)
    self.hops.append((rtr,deets))

    # `loc` is split on ',' and both parts converted to floats.
    places = [(deets.city,tuple(float(coord) for coord in deets.loc.split(',')))]
    msg = ['@Tor: Hiding your IP by hopping it around the globe:'] + [
        f'{_deets.city}, {_deets.country_name} ({_rtr.nickname})'
        for _rtr,_deets in self.hops
    ]
    mapscr.run_print_map(places,msg=msg)
# self._mapscr=None
4 years ago
4 years ago
4 years ago
def register(self,name=None):
    """Route handler for ``/register``: create a new account.

    Prompts for a name when none is given; an empty answer aborts. On success
    the session is marked logged in. On failure the session is reset and any
    status text returned by the backend is shown.
    """
    if not name: name=input('name: ')
    if not name: return
    self.comrad = Comrad(name,callbacks=self.callbacks)

    def logfunc(*x,comrad_name='Keymaker',**y):
        # Registration narration is spoken by the Keymaker unless overridden.
        self.stat(*x,comrad_name=comrad_name,**y)

    res=self.comrad.register(logfunc=logfunc)
    # Anything that is not a dict counts as a failed registration with no status text.
    reply = res if isinstance(res,dict) else {}

    if reply.get('success'):
        self.name=self.comrad.name
        self.loggedin=True
        self.help()
    else:
        self.name=None
        self.loggedin=False
        self.comrad=None
        self.help()
        if 'status' in reply:
            self.stat(reply.get('status','?'),comrad_name='Operator')
4 years ago
4 years ago
def login(self,name):
    """Route handler for ``/login``: unlock a locally stored account.

    Prompts for a name when none is given, then for the password. Login
    succeeds when the keychain yields a private key. On any failure the
    session is reset, so an earlier login cannot stay marked as active while
    ``self.comrad`` points at a different, unauthenticated account.
    """
    if not name: name=input('name: ')
    if not name: return

    commie=Comrad(name,callbacks=self.callbacks)
    self.comrad=commie
    if not commie.exists_locally_as_account():
        self._reset_session()
        self.stat('This is not an account of yours.')
        return

    from getpass import getpass
    pw=getpass('password: ')
    commie.keychain(passphrase=pw)
    # NOTE(review): this writes the keychain contents to the log - confirm that is intended.
    self.log(f'updated keychain: {dict_format(commie.keychain())}')
    self.log('is account')

    if commie.privkey:
        self.log('passkey login succeeded')
        self.stat(f'Welcome back, Comrad @{commie.name}')
        self.loggedin=True
        self.name=commie.name
        self.help()
    else:
        # self.log, as everywhere else in this class, instead of a bare `logger`.
        self.log('passkey login failed')
        self._reset_session()
        self.stat('Login failed...')

def _reset_session(self):
    """Forget the current user: mirrors the failure branch of ``register``/``do_login``."""
    self.name=None
    self.loggedin=False
    self.comrad=None
4 years ago
4 years ago
def do_login(self,res):
    """Apply a backend login response to the session.

    Args:
        res: response dict with optional 'success', 'name' and 'status' keys.
            ``None`` or any non-dict value counts as a failed login.

    Returns:
        bool: True when the response reports success.

    NOTE(review): the status display is assumed to run for both outcomes; the
    original nesting was lost in this copy of the file - confirm.
    """
    self.log('<- comrad.login() <-',res)

    # Normalise once so the final return cannot crash on a None response.
    reply = res if isinstance(res,dict) else {}
    succeeded = bool(reply.get('success'))

    if succeeded:
        self.name=reply['name']
        self.comrad=Comrad(reply['name'],callbacks=self.callbacks)
        self.loggedin=True
    else:
        self.name=None
        self.loggedin=False
        self.comrad=None

    if 'status' in reply:
        self.boot(scan=False)
        self.help()
        self.stat(reply.get('status','?'),comrad_name='Operator')

    return succeeded
4 years ago
4 years ago
@property
def logged_in(self):
    """Truthy only when the login flag, the comrad object and the name are all set.

    Returns the first falsy value among the three, or the name when all are
    truthy (the same value an ``and`` chain would produce).
    """
    state = self.loggedin
    if state:
        state = self.comrad
        if state:
            state = self.name
    return state
4 years ago
4 years ago
4 years ago
def with_required_login(self,quiet=False):
    """Return True when logged in; otherwise return False, printing a reminder unless ``quiet``."""
    if self.logged_in:
        return True
    if not quiet:
        self.stat('You must be logged in first.')
    return False
4 years ago
4 years ago
def meet(self,dat,returning=False):
    """Route handler for ``/meet``: send an introduction to another user.

    The first whitespace-separated token of ``dat`` is the name or public key.
    The backend's status text is printed. Requires login.
    """
    if not self.with_required_login():
        return

    tokens = dat.strip().split()
    if not tokens:
        self.stat('Meet whom?')
        return

    reply = self.comrad.meet(tokens[0],returning=returning)
    self.stat(reply.get('status'))
4 years ago
4 years ago
def dm(self,dat='',name_or_pubkey=None,msg_s=None):
    """Route handler for ``/dm``: send a direct message.

    Args:
        dat: ``'<recipient> [message...]'``. The first token names the
            recipient unless ``name_or_pubkey`` is given. The rest is the
            message unless ``msg_s`` is given.
        name_or_pubkey: explicit recipient; a leading '@' is dropped.
        msg_s: explicit message text.

    When no message text is available the user composes one interactively.
    Requires login.
    """
    if not self.with_required_login():
        return

    # Split once into recipient and the whole remainder, so a multi-word
    # inline message is kept intact (it used to be cut to its first word).
    parts = (dat or '').strip().split(None,1)

    if not name_or_pubkey:
        if not parts:
            self.stat('Message whom? Usage: /dm [name]')
            return
        name_or_pubkey = parts[0]

    if name_or_pubkey.startswith('@'):
        name_or_pubkey=name_or_pubkey[1:]

    if not msg_s:
        msg_s = parts[1].strip() if len(parts)>1 else ''
    if not msg_s:
        print()
        self.stat(f'Compose your message to @{name_or_pubkey} below.', 'Press Ctrl+D to complete, or Ctrl+C to cancel.')
        print()
        msg_s = multiline_input().strip()
        if not msg_s:
            print('\n')
            self.stat('Not sending. No message found.')
            return

    self.log(f'Composed msg to {name_or_pubkey}: {msg_s}')
    msg_obj = self.comrad.msg(
        name_or_pubkey,
        msg_s
    )
    self.log(f'Sent msg obj to {name_or_pubkey}: {msg_obj}')
    print()
    self.stat(f'Message successfully sent to @{name_or_pubkey}.',comrad_name='Operator',pause=True)
4 years ago
4 years ago
4 years ago
def refresh(self,dat=None,res=None,statd=None):
    """Route handler for ``/refresh``: fetch updates from the backend.

    Args:
        dat: unused beyond logging.
        res: logged on entry, then overwritten with the backend response.
        statd: unused; defaults to None rather than a shared mutable dict.

    Returns:
        The backend response on success; None when no account is loaded or
        the backend reports failure (its status text is printed).
    """
    import time

    self.log(f'<-- dat={dat}, res={res}')

    if not self.comrad:
        self.stat('You must login first.')
        # Without this return the next line called .refresh() on None.
        return

    ## get updates
    # this does login, msgs, and posts in one req
    time.sleep(0.25)
    res = self.comrad.refresh()
    if not res.get('success'):
        self.stat(res.get('status'),comrad_name='')
        return

    if hasattr(self,'_mapscr') and self._mapscr:
        # Append one last line to the map message, show it briefly, then close the map.
        msg=self._mapscr.msg + ['???, ??? (@Operator)']
        self._mapscr.run_print_map(msg=msg)
        time.sleep(1)
        self._mapscr.endwin()
        self._mapscr=None
        self.hops=[]

    self.log('<-- get_updates',res)
    self.help()
    return res
4 years ago
4 years ago
def prompt_adduser(self,msg):
    """Handle an 'add contact' message: offer to save the sender's key and to reply in kind.

    Args:
        msg: message object; ``msg.data`` supplies 'meet_name', 'meet' (the
            URI encoded into the QR code) and 'txt'.

    NOTE(review): the original nesting was lost in this copy of the file; the
    "send your key back" prompt is assumed to follow only after the user
    agreed to add the contact - confirm.
    """
    do_pause()

    meet_name = msg.data.get('meet_name')
    meet_uri = msg.data.get('meet')
    qrstr=self.comrad.qr_str(meet_uri)

    self.stat(f"Add @{meet_name}'s public key to your address book?",f'It will allow you and @{meet_name} to read and write encrypted messages to one another.')
    do_adduser = input(f'''\n{self.comrad} [y/N]: ''')
    if do_adduser.strip().lower()!='y':
        return

    import pyqrcode
    # Was a stray debug print to stdout; kept as a log record instead.
    self.log('meet_uri',meet_uri)
    qr = pyqrcode.create(meet_uri)
    fnfn = os.path.join(PATH_QRCODES,meet_name+'.png') # self.get_path_qrcode(name=name)
    qr.png(fnfn,scale=5)

    clear_screen()
    self.stat(f'The public key of @{meet_name} has been saved as a QRcode to {fnfn}')
    print(qrstr)
    do_pause()
    clear_screen()

    # bit hacky way to tell if this has been returned or not!
    if 'has agreed' in msg.data.get('txt',''):
        return

    self.stat('Send this comrad your public key as well?')
    do_senduser = input(f'''\n{self.comrad} [y/N]: ''')
    if do_senduser.strip().lower()!='y':
        return

    res = self.comrad.meet(meet_name,returning=True)
    if res.get('success'):
        self.log('res?!',res)
        self.stat('Returning the invitation:',f'"{res.get("res",{}).get("msg_d",{}).get("msg",{}).get("txt","[?]")}"',use_prefix=True)
        do_pause()
    else:
        # The failure text comes from the meet() response, not the incoming message.
        self.stat(res.get('status'))
4 years ago
4 years ago
def prompt_msg(self,msg):
    """Show one message and act on the reader's choice.

    'r' replies (returns the result of ``dm``), 'd' deletes, 'q' raises
    ``EOFError`` so the caller can stop reading; anything else marks the
    message as seen.
    """
    clear_screen()
    print(msg)
    self.stat('Type "r" to reply to this message, "d" to delete it, hit Enter to continue, or type "q" to return to main menu.',use_prefix=False)
    choice = input(f'\n{self.comrad}: ').strip().lower()

    if choice=='q':
        raise EOFError
    if choice=='r':
        return self.dm(msg.from_name)
    if choice=='d':
        outcome = self.comrad.delete_post(msg.post_id)
        if outcome.get('success'):
            self.stat('Deleted message.')
        else:
            self.stat('Could not delete message.')
        do_pause()
        return

    # seen this msg!
    self.comrad.seen_msg(msg)
4 years ago
4 years ago
4 years ago
def dms(self,dat=''):
    """Route handler for ``/dms``: page through direct messages. Requires login."""
    if not self.with_required_login():
        return None
    inbox = self.comrad.messages()
    return self.read(inbox)
def feed(self,dat=''):
    """Route handler for ``/feed``: page through posts. Requires login."""
    if not self.with_required_login():
        return None
    timeline = self.comrad.posts()
    return self.read(timeline)
def read(self,msgs):
    """Show messages one at a time, prompting after each, then return to the menu.

    'addcontact' messages first go through ``prompt_adduser``. An ``EOFError``
    from a prompt ends the loop early.
    """
    if not msgs:
        self.stat('No messages.')
        return

    clear_screen()
    for position,msg in enumerate(msgs,start=1):
        try:
            self.stat(f'Showing message {position} of {len(msgs)}, from newest to oldest. Hit Ctrl+D to exit.')
            print()
            print(msg)
            self.log('DATA',msg.msg_d)
            if msg.data.get('prompt_id')=='addcontact':
                self.prompt_adduser(msg)
            self.prompt_msg(msg)
            clear_screen()
        except EOFError:
            break
    self.help()
4 years ago
4 years ago
4 years ago
def post(self,dat='',maxlen=MAX_POST_LEN):
    """Route handler for ``/post``: compose and publish a post.

    Args:
        dat: unused.
        maxlen: maximum number of characters allowed in the post.

    Requires login. Empty or over-long posts are rejected with a message.
    """
    if not self.with_required_login():
        return

    # The advertised limit follows maxlen instead of a hard-coded 1000.
    self.stat(f'Write your post below. Maximum of {maxlen} characters.', 'Press Ctrl+D to complete, or Ctrl+C to cancel.')
    print()

    msg_s = multiline_input() #.strip()
    if not msg_s:
        print('\n')
        self.stat('Not sending. No text entered.')
        return

    if len(msg_s)>maxlen:
        print()
        self.stat(f'Not sending. Message is {len(msg_s)-maxlen} characters over the character limit of {maxlen}.')
        return

    self.log(f'Post written: {msg_s}')
    msg_obj = self.comrad.post(
        msg_s
    )
    self.log(f'Posted: {msg_obj}')
    print()
    self.stat(f'Post sent to @{WORLD_NAME}.',comrad_name='Operator',pause=True)
4 years ago
4 years ago
4 years ago
4 years ago
### DIALOGUES
4 years ago
4 years ago
# hello, op?
4 years ago
def status_keymaker_part1(self,name):
    # First act of the registration dialogue: the user phones the operator, is
    # asked for a name if none was supplied, and is sent on to the Keymaker.
    # Returns the (possibly newly entered) name.
    # NOTE(review): self.status is defined outside this file; judging only by
    # the calls here, strings are spoken lines, {..} set literals carry art, a
    # (key, prompt) tuple collects input returned under 'vals', and bare
    # numbers/None act as control values - confirm against its definition.
    self.status(None,{ART_OLDPHONE4+'\n',True},3) #,scan=False,width=None,pause=None,clear=None)
    nm=name if name else '?'
    self.status(
        f'\n\n\n@{nm}: Uh yes hello, Operator? I would like to join Comrad, the socialist network. Could you patch me through?',clear=False)
    # Keep asking until a non-empty name has been entered.
    while not name:
        name=self.status(('name','@TheTelephone: Of course, Comrad...?\n@')).get('vals').get('name').strip()
    self.print()
    self.status(
        f'@TheTelephone: Of course, Comrad @{name}. A fine name.',
        '''@TheTelephone: However, I'm just the local operator who lives on your device; my only job is to communicate with the remote operator securely.''',
        '''Comrad @TheOperator lives on the deep web. She's the one you want to speak with.''',
        None,{ART_OLDPHONE4},f'''@{name}: Hm, ok. Well, could you patch me through to the remote operator then?''',
        f'''@{TELEPHONEname}: I could, but it's not safe yet. Your information could be exposed. You need to cut your encryption keys first.''',
        f'@{name}: Fine, but how do I do that?',
        f'@{TELEPHONEname}: Visit the Keymaker.',
        clear=False,pause=True)
    ### KEYMAKER
    self.status(None,{tw.indent(ART_KEY,' '*5)+'\n',True},3) #,clear=False,indent=10,pause=False)
    # convo
    self.status(
        f'\n@{name}: Hello, Comrad @Keymaker? I would like help forging a new set of keys.',
        f'@Keymaker: Of course, Comrad @{name}.',
    )
    self.status(
        'I will cut for you two matching keys, part of an "asymmetric" pair.',
        'Please, watch me work.',
        None,{tw.indent(ART_KEY,' '*5)+'\n'},
        'I use a high-level cryptographic function from Themis, a well-respected open-source cryptography library.',
        'I use the iron-clad Elliptic Curve algorthm to generate the asymmetric keypair.',
        '> GenerateKeyPair(KEY_PAIR_TYPE.EC)',
        3
    )
    self.status(
        None,
        {ART_KEY_PAIR,True}
    ) #,clear=False,indent=10,pause=False)
    return name
def status_keymaker_part2(self,name,passphrase,pubkey,privkey,hasher,persona):
    """Second act of the registration dialogue: explain the private key and obtain a hashed password.

    Args:
        name: user name used in the dialogue lines.
        passphrase: already-hashed password, or a falsy value to have the
            user choose one interactively.
        pubkey, privkey: key objects; ``.data_b64`` is read from each.
        hasher: callable applied to the encoded text the user types.
        persona: ``persona.crypt_keys.secret`` is shown, mostly redacted.

    Returns:
        The passphrase: the one passed in, or the hash of the password the
        user typed twice identically.

    NOTE(review): the original nesting was lost in this copy of the file; the
    whole Hasher tutorial is assumed to belong to the "no passphrase yet"
    branch - confirm.
    """
    from getpass import getpass
    # gen what we need
    uri_id = pubkey.data_b64
    qr_str = get_qr_str(uri_id)
    qr_path = os.path.join(PATH_QRCODES,name+'.png')

    # private keys
    self.status(None,
        {ART_KEY_PAIR2B},
        'Second, I have cut a matching "private key".',
        "It's too dangerous to show in full, so here it is 66% redacted:",
        f'(2) {make_key_discreet(privkey.data_b64,0.3)}',
        'With it, you can decrypt and read any message sent to you via your public key.',
        'You can also encrypt and send messages to other people whose public keys you have.',
    )
    # private keys
    self.status(None,
        {CUBEKEY},
        # A missing comma used to fuse the next two sentences into one line
        # with no space between them; "You you" was a typo.
        'So if someone were to steal your private key, they could read your mail and forge your signature.',
        'You should never, ever give your private key to anyone.',
        'In fact, this key is so dangerous that we need to lock it away immediately.',
        "We'll even throw away the key we use to lock this private key with!",
        "How? By regenerating it each time from your password.",
    )

    if not passphrase:
        self.status(
            'And it looks like you haven\'t yet chosen a password.',
            3,"Don't tell it to me! Never tell it to anyone.",
            "Ideally, don't even save it on your computer; just remember it, or write it down on paper.",
            "Instead, whisper it to Comrad @Hasher, who scrambles information '1-way', like a blender.",
        )

        # Demo round 1: hash whatever the user types.
        res = self.status(None,
            {indent_str(ART_FROG_BLENDER,10),True},
            "@Keymaker: Go ahead, try it. Type anything to @Hasher.",
            ('str_to_hash',f'@{name}: ',input)
        )
        str_to_hash = res.get('vals').get('str_to_hash')
        hashed_str1 = hasher(str_to_hash.encode())
        res = self.status(
            '@Hasher: '+hashed_str1
        )

        # Demo round 2: the same input should give the same hash.
        res = self.status(
            '@Keymaker: Whatever you typed, there\'s no way to reconstruct it from that garbled mess.',
            'But whatever you typed will always produce the *same* garbled mess.',
            ('str_to_hash',f'Try typing the exact same thing over again:\n@{name}: ',input)
        )
        str_to_hash = res.get('vals').get('str_to_hash')
        hashed_str2 = hasher(str_to_hash.encode())
        res = self.status(
            '@Hasher: '+hashed_str2
        )
        if hashed_str1==hashed_str2:
            self.status('See how the hashed values are also exactly the same?')
        else:
            self.status('See how the hashed values have also changed?')

        # Demo round 3: a slightly different input should give a different hash.
        res = self.status(
            ('str_to_hash',f'Now try typing something just a little bit different:\n@{name}: ',input)
        )
        str_to_hash = res.get('vals').get('str_to_hash')
        hashed_str3 = hasher(str_to_hash.encode())
        res = self.status(
            '@Hasher: '+hashed_str3
        )
        if hashed_str2==hashed_str3:
            self.status('See how the hashed values are also the same?')
        else:
            self.status('See how the hashed values have also changed?')

        self.status(
            None,{indent_str(ART_FROG_BLENDER,10)},
            '@Keymaker: Behind the scenes, @Hasher is using the SHA-256 hashing function, which was designed by the NSA.',
            'But @Hasher also adds a secret "salt" to the recipe, as it\'s called.',
            'To whatever you type in, @Hasher adds a secret phrase: another random string of characters which never changes.',
            "By doing so, the hash output is \"salted\": made even more idiosyncratic to outside observers.",
        )
        self.status(
            None,{indent_str(ART_FROG_BLENDER,10)},
            f"I've taken the liberty of generating a random secret for your device, which I show here mostly redacted:",
            make_key_discreet_str(persona.crypt_keys.secret.decode(),0.25),
            'The full version of this secret is silently added to every input you type into @Hasher.',
            "I've saved this secret phrase to a hidden location on your device hardware.",
        )
        self.status(
            None,{indent_str(ART_FROG_BLENDER,10)},
            'However, this means that you will be unable to log in to your account from any other device.',
            'This limitation provides yet another level of hardware protection against network attacks.',
            'However, you can always choose (not recommended) to the secret file with another device by a secure channel.',
            3,f'But, please excuse me Comrad @{name} -- I digress.'
        )

        # Ask for the real password twice; accept it once both hashes agree.
        while not passphrase:
            res = self.status(None,
                {indent_str(ART_FROG_BLENDER,10)},
                "@Keymaker: Please type your chosen password into @Hasher.",
                ('str_to_hash',f'\n@{name}: ',getpass),
                pause=False
            )
            str_to_hash = res.get('vals').get('str_to_hash')
            hashed_pass1 = hasher(str_to_hash.encode())
            res = self.status(
                '\n@Hasher: '+hashed_pass1,
                pause=False
            )
            res = self.status(
                '\nNow type in the same password one more time to verify it:',
                ('str_to_hash',f'\n@{name}: ',getpass),
                pause=False
            )
            str_to_hash = res.get('vals').get('str_to_hash')
            hashed_pass2 = hasher(str_to_hash.encode())
            res = self.status(
                '\n@Hasher: '+hashed_pass2,
                pause=False
            )
            if hashed_pass1==hashed_pass2:
                self.status('','@Keymaker: Excellent. The passwords clearly matched, because the hashed values matched.',pause=False)
                passphrase = hashed_pass1
            else:
                self.status('@Keymaker: A pity. It looks like the passwords didn\'t match, since the hashed values didn\'t match either. Try again?')
    return passphrase
def status_keymaker_part3(self,privkey,privkey_decr,privkey_encr,passphrase):
    # Third act of the registration dialogue: show how the hashed passphrase
    # (2A) encrypts the private key (2) into (2B), then have the user re-type
    # the password until its hash equals `passphrase`.
    # `privkey_decr` is not used in this method.
    self.status(
        None,{tw.indent(ART_KEY,' '*5)+'\n',True},
        # None,{ART_+'\n',True},
        'Now that we have a hashed passphrase, we can generate the (2A) encryption key.',
        {ART_KEY_KEY2A,True,0.1},
        '''The key is formed using Themis's high-level symmetric encryption library: SecureCell, using Seal mode.''',
        'This key (2A) then uses the AES-256 encryption algorithm to encrypt the super-sensitive private key (2):'
    )
    # Pre-rendered lines reused across the screens below.
    s0=str.center('[Encryption Process]',CLI_WIDTH)
    s1=s0 + '\n\n' + self.printt('Now that we have (2A), we can use it to encrypt the super-sensitive private key (2):',ret=True)
    s2a = self.printt(f"(2A) {make_key_discreet_str(passphrase)}",ret=True)
    s2 = self.printt(f"(2) {make_key_discreet(privkey.data_b64)}",ret=True)
    s2b = self.printt(f"(2B) {make_key_discreet(b64encode(privkey_encr))}",ret=True)
    self.status(
        # screen 1
        None,{f'{s1}'},
        False,
        # 2
        None,{f'{s1}\n\n{ART_KEY_PAIR_SPLITTING1}'},
        {s2a,True},
        False,
        # 3
        None,{f'{s1}\n\n{ART_KEY_PAIR_SPLITTING2}\n{s2a}'},
        {'\n'+s2,True},
        False,
        # 4
        None,{f'{s1}\n\n{ART_KEY_PAIR_SPLITTING3}\n{s2a}\n\n{s2}'},
        {'\n'+s2b,True},
        False,
    )
    shdr=str.center('[Decryption Process]',CLI_WIDTH) + '\n\n' + self.printt('Once we have (2B), we don\'t need (2A) or (2) anymore. We can regenerate them!',ret=True)
    from getpass import getpass
    passhash = None
    # Loop until the hash of the re-typed password equals the stored passphrase.
    while passhash!=passphrase:
        res = self.status(
            None,{shdr},False if passhash is None else True,
            ("pass",self.printt(f"Let's try. Re-type your password into @Hasher:",ret=True)+f" \n ",getpass)
        )
        # NOTE(review): reads self.persona, which the constructor visible in
        # this file does not assign - confirm where it is set.
        passhash = self.persona.crypt_keys.hash(res.get('vals').get('pass').encode())
        if passhash!=passphrase:
            self.status({' Looks like they don\'t match. Try again?'},False)
    self.status(
        {' Excellent. We can now regenerate the decryption key:'},False,
        {s2a,True},False,
    )
    # self.status('great')
    # shdr2=
    self.status(
        # 2
        None,{f'{shdr}\n\n{ART_KEY_PAIR_SPLITTING1}'},
        {s2a,True},
        False,
        # 3
        # None,{f'{s1}\n\n{ART_KEY_PAIR_SPLITTING2}\n{s2a}'},
        # {'\n'+s2,True},
        # False,
        # 4
        None,{f'{s1}\n\n{ART_KEY_PAIR_SPLITTING4}\n{s2a}\n\n\n\n'},
        {'\n'+s2b,True},
        False,
    )
4 years ago
4 years ago
4 years ago
def run_cli(inp):
    """Create a ``CLI`` and run it, routing ``inp`` as start-up commands."""
    CLI().run(inp) #'/register elon') #'/register',name='elon')
4 years ago
if __name__=='__main__':
    # All command-line arguments, joined by spaces, become the start-up command string.
    run_cli(' '.join(sys.argv[1:]))
4 years ago
# asyncio.run(test_async())
"""
Outtakes
self.status(None,
{ART_KEY_PAIR31A},
{ART_KEY_PAIR3B+'\n',True},
3,'Allow me to explain.',
'(2A) is a separate encryption key generated by your password.',
'(2B) is a version of (2) which has been encrypted by (2A).',
"Because (2) will be destroyed, to rebuild it requires decrypting (2B) with (2A).",
)
self.status(
None,{ART_KEY_PAIR5+'\n'},
"However, in a final move, I will now destroy (2A), too.",
None,{ART_KEY_PAIR4Z1+'\n'},
'Why? Because now only you can regenerate it by remembering the password which created it.',
# None,{ART_KEY_PAIR4Z1+'\n'},
'However, this also means that if you lose or forget your password, you\'re screwed.',
None,{ART_KEY_PAIR4Z2+'\n'},
"Because without key (2A), you could never unlock (2B).",
None,{ART_KEY_PAIR4Z3+'\n'},
"And without (2B) and (2A) together, you could never re-assemble the private key of (2).",
None,{ART_KEY_PAIR4Z42+'\n'},
"And without (2), you couldn't read messages sent to your public key.",
)
"""