import os , sys ; sys . path . append ( os . path . abspath ( os . path . join ( os . path . abspath ( os . path . join ( os . path . dirname ( __file__ ) , ' .. ' ) ) , ' .. ' ) ) )
from comrad import *
from comrad . backend import *
import art , asyncio
import textwrap as tw
import readline , logging
readline . set_completer_delims ( ' \t ' )
# from tab_completer import tabCompleter
tabber = tabCompleter ( )
if ' libedit ' in readline . __doc__ :
readline . parse_and_bind ( " bind ^I rl_complete " )
else :
readline . parse_and_bind ( " tab: complete " )
torpy_logger = logging . getLogger ( ' torpy ' )
logging . getLogger ( ' urllib3 ' ) . propagate = False
logging . getLogger ( ' shapely ' ) . propagate = False
logging . getLogger ( ' pyproj ' ) . propagate = False
logging . getLogger ( ' rtree ' ) . propagate = False
logging . getLogger ( ' asyncio ' ) . setLevel ( logging . WARNING )
torpy_logger . propagate = False
from shutil import get_terminal_size
CLI_WIDTH = get_terminal_size ( ) . columns
CLI_HEIGHT = get_terminal_size ( ) . lines - 1
class CLI ( Logger ) :
ROUTES = {
' help ' : ' seek help ' ,
' register ' : ' join the comrades ' ,
' login ' : ' log back in ' ,
' meet ' : ' meet a comrad ' ,
' who ' : ' show contacts or info ' ,
' dm ' : ' write people ' ,
' refresh ' : ' refresh feed/DMs ' ,
' dms ' : ' read DMs ' ,
' feed ' : ' read posts ' ,
' verbose ' : ' show/hide log output ' ,
' post ' : ' post to world ' ,
' feed ' : ' fetch posts ' ,
' exit ' : ' exit comrad ' ,
' clearnet ' : ' switch to clearnet ' ,
' tor ' : ' switch to tor ' ,
}
def __init__ ( self , name = ' ' , cmd = ' ' , persona = None ) :
self . name = name
self . cmd = cmd
self . comrad = None
self . loggedin = False
self . tabber = tabber
self . log ( ' ROUTES: ' , self . ROUTES )
self . hops = [ ]
# Routes
rts = [ ' / ' + k for k in self . ROUTES ]
tabber . createListCompleter ( rts )
readline . set_completer ( tabber . listCompleter )
# logging
from comrad . backend . mazes import MazeWalker
self . walker = MazeWalker ( callbacks = self . callbacks )
self . torpy_logger = logging . getLogger ( ' torpy ' )
self . torpy_logger . propagate = False
self . torpy_logger . addHandler ( self . walker )
import ipinfo
ipinfo_access_token = ' 90df1baf7c373a '
self . ipinfo_handler = ipinfo . getHandler ( ipinfo_access_token )
def verbose ( self , * x ) :
self . toggle_log ( )
def run ( self , inp = ' ' , name = ' ' ) :
# if name: self.name=name
clear_screen ( )
self . boot ( )
if not inp :
self . help ( )
if inp :
for inpx in inp . split ( ' / ' ) :
self . route ( ' / ' + inpx . strip ( ) )
while True :
try :
inp = input ( f ' \n @ { self . name if self . name else " ? " } : ' )
# self.print(inp,'??')
self . route ( inp )
except ( KeyboardInterrupt , EOFError ) as e :
self . stat ( ' Goodbye. ' )
exit ( )
except ComradException as e :
self . stat ( f ' I could not handle your request. { e } \n ' )
#await asyncio.sleep(0.5)
self . walker . walk = [ ] # reset logger's walk?
def route ( self , inp ) :
inp = inp . strip ( )
# self.print('route got:',[inp])
if not inp . startswith ( ' / ' ) : return
cmd = inp . split ( ) [ 0 ]
dat = inp [ len ( cmd ) : ] . strip ( )
cmd = cmd [ 1 : ]
# self.print([cmd,dat])
if cmd in self . ROUTES and hasattr ( self , cmd ) :
f = getattr ( self , cmd )
try :
res = f ( dat )
except ComradException as e :
self . stat ( ' Message not sent. ' , str ( e ) , ' \n ' )
def stat ( self , * msgs , use_prefix = True , prefix = None , comrad_name = None , pause = False , clear = False , min_prefix_len = 12 , * * kwargs ) :
if hasattr ( self , ' _mapscr ' ) and self . _mapscr :
self . _mapscr . endwin ( )
self . _mapscr = None
self . hops = [ ]
if not prefix :
if not comrad_name : comrad_name = ' Telephone '
# prefix='Comrad @'+comrad_name+': '
prefix = ' @ ' + comrad_name + ' : '
# wrap msg
total_msg = wrapp (
* msgs ,
use_prefix = use_prefix ,
prefix = prefix if use_prefix and prefix else ' ' ,
min_prefix_len = min_prefix_len if use_prefix and prefix else 0
)
print ( total_msg )
# print()
if pause : do_pause ( )
if clear : clear_screen ( )
def print ( self , * x ) :
print ( * x )
# x=' '.join(str(xx) for xx in x)
# x=str(x).replace('\r\n','\n').replace('\r','\n')
# for ln in x.split('\n'):
# # #scan_print(ln+'\n\n')
# if not ln: print()
# for ln2 in tw.wrap(ln,CLI_WIDTH):
# print(ln2)
# # x='\n'.join(tw.wrap(x,CLI_WIDTH))
# # print(x)
def boot ( self , indent = None , scan = True ) :
logo = art . text2art ( CLI_TITLE , font = CLI_FONT ) . replace ( ' \r \n ' , ' \n ' )
newlogo = [ ]
for logo_ln in logo . split ( ' \n ' ) :
logo_ln = logo_ln . center ( CLI_WIDTH , ' ' )
newlogo . append ( logo_ln )
newlogo_s = ' \n ' . join ( newlogo )
scan_print ( newlogo_s , speed = 5 ) if scan else self . print ( newlogo_s )
def status_str ( self , unr , tot ) :
read = tot - unr
return f ' ( { unr } *) ' if unr else f ' ( { unr } ) '
# return f'({unr}*)' if unr else f'({unr})'
# return f'{unr}* of {tot}' if tot else str(tot)
@property
def post_status_str ( self ) :
if not self . comrad : return ' '
return self . status_str (
unr = self . comrad . num_unread_posts ,
tot = self . comrad . num_posts
)
@property
def msg_status_str ( self ) :
if not self . comrad : return ' '
return self . status_str (
unr = self . comrad . num_unread_msgs ,
tot = self . comrad . num_msgs
)
def clearnet ( self , _ = ' ' ) :
os . environ [ ' COMRAD_USE_CLEARNET ' ] = ' 1 '
os . environ [ ' COMRAD_USE_TOR ' ] = ' 0 '
def tor ( self , _ = ' ' ) :
os . environ [ ' COMRAD_USE_CLEARNET ' ] = ' 0 '
os . environ [ ' COMRAD_USE_TOR ' ] = ' 1 '
def help ( self , * x , * * y ) :
clear_screen ( )
self . boot ( scan = False )
if not self . logged_in :
HELPSTR = f """
/ login [ name ] - - > log back in
/ register [ name ] - - > new comrad """
else :
HELPSTR = f """
/ refresh - - > get new data
/ feed - - > scroll feed { self . post_status_str }
/ post - - > post to all
/ dms - - > see DMs { self . msg_status_str }
/ dm [ name ] - - > send a DM
/ meet [ name ] - - > exchange info
/ who [ name ] - - > show contacts
"""
HELPSTR + = f """
/ help - - > seek help
/ exit - - > exit app
"""
helpstr = tw . indent ( HELPSTR . strip ( ) + ' \n \n ' , ' ' * 11 )
# self.print(helpstr)
# self.print(border+helpstr+'\n'+self.border)
lns = [ ln . strip ( ) for ln in helpstr . split ( ' \n ' ) ]
maxlen = max ( [ len ( ln ) for ln in lns ] )
for ln in lns :
for n in range ( len ( ln ) , maxlen ) : ln + = ' '
print ( ln . center ( CLI_WIDTH ) )
def exit ( self , dat = ' ' ) :
exit ( ' Goodbye. ' )
@property
def border ( self ) :
border = ' - ' * CLI_WIDTH # (len(logo.strip().split('\n')[0]))
border = tw . indent ( border , ' ' * 2 )
return border
def intro ( self ) :
self . status ( None )
def who ( self , whom ) :
if self . with_required_login ( ) :
contacts_obj = self . comrad . contacts ( )
contacts = [ p . name for p in contacts_obj ]
self . print ( ' ' + ' \n ' . join ( contacts ) )
@property
def callbacks ( self ) :
return {
' torpy_guard_node_connect ' : self . callback_on_hop ,
' torpy_extend_circuit ' : self . callback_on_hop ,
}
def callback_on_hop ( self , rtr ) :
def run_map ( ) :
from worldmap_curses import make_map #,print_map_simple
# clear_screen()
if not hasattr ( self , ' _mapscr ' ) or not self . _mapscr :
self . _mapscr = mapscr = make_map ( )
mapscr . add_base_map ( )
# stop
else :
mapscr = self . _mapscr
# return
# mapscr.stdscr.getch()
deets = self . ipinfo_handler . getDetails ( rtr . ip )
if rtr . ip not in { hop [ 0 ] . ip for hop in self . hops } :
self . hops . append ( ( rtr , deets ) )
places = [
( _deets . city , tuple ( float ( _ ) for _ in _deets . loc . split ( ' , ' ) ) )
for ( _rtr , _deets ) in [ ( rtr , deets ) ]
]
msg = [ ' @Tor: Hiding your IP by hopping it around the globe: ' ] + [ f ' { _deets . city } , { _deets . country_name } ( { _rtr . nickname } ) ' for _rtr , _deets in self . hops
]
mapscr . run_print_map ( places , msg = msg )
# self.stat(
# msg,
# comrad_name='Tor'
# )
# print()
# input('pausing on callback: '+msg)
run_map ( )
# self._mapscr=None
def register ( self , name = None ) :
if not name : name = input ( ' name: ' )
if not name : return
self . comrad = Comrad ( name , callbacks = self . callbacks )
was_off = self . off
# if was_off: self.show_log()
def logfunc ( * x , comrad_name = ' Keymaker ' , * * y ) :
self . stat ( * x , comrad_name = comrad_name , * * y )
res = self . comrad . register ( logfunc = logfunc )
# if was_off: self.toggle_log()
if res and type ( res ) == dict and ' success ' in res and res [ ' success ' ] :
self . name = self . comrad . name
self . loggedin = True
self . help ( )
# self.stat(f'Welcome, Comrad @{self.name}.')
else :
self . name = None
self . loggedin = False
self . comrad = None
self . help ( )
if res and ' status ' in res :
# self.boot()
self . stat ( res . get ( ' status ' , ' ? ' ) , comrad_name = ' Operator ' )
def login ( self , name ) :
# self.print(self,name,self.name,self.comrad,self.loggedin)
if not name : name = input ( ' name: ' )
if not name : return
self . comrad = commie = Comrad ( name , callbacks = self . callbacks )
if commie . exists_locally_as_account ( ) :
from getpass import getpass
# pw=getpass(f'\n@Telephone: Welcome back. Your password please?\n@{name}: ')
pw = getpass ( f ' password: ' )
commie . keychain ( passphrase = pw )
self . log ( f ' updated keychain: { dict_format ( commie . keychain ( ) ) } ' )
self . log ( f ' is account ' )
# self.login_status.text='You should be able to log into this account.'
if commie . privkey :
self . log ( f ' passkey login succeeded ' )
self . stat ( f ' Welcome back, Comrad @ { commie . name } ' )
self . loggedin = True
self . name = commie . name
self . help ( )
else :
logger . info ( f ' passkey login failed ' )
self . stat ( ' Login failed... ' )
else :
self . stat ( ' This is not an account of yours. ' )
# return self.refresh()
# res = self.comrad.login()
# return self.do_login(res)
def do_login ( self , res ) :
# print('got login res:',res)
self . log ( ' <- comrad.login() <- ' , res )
if res and type ( res ) == dict and ' success ' in res and res [ ' success ' ] :
self . name = res [ ' name ' ]
self . comrad = Comrad ( res [ ' name ' ] , callbacks = self . callbacks )
self . loggedin = True
else :
self . name = None
self . loggedin = False
self . comrad = None
if res and ' status ' in res :
self . boot ( scan = False )
self . help ( )
self . stat ( res . get ( ' status ' , ' ? ' ) , comrad_name = ' Operator ' )
return bool ( res . get ( ' success ' ) )
@property
def logged_in ( self ) :
return ( self . loggedin and self . comrad and self . name )
def with_required_login ( self , quiet = False ) :
if not self . logged_in :
if not quiet :
self . stat ( ' You must be logged in first. ' )
return False
return True
def meet ( self , dat , returning = False ) :
if self . with_required_login ( ) :
datl = dat . strip ( ) . split ( )
if not datl :
self . stat ( ' Meet whom? ' )
return
name_or_pubkey = datl [ 0 ]
res = self . comrad . meet ( name_or_pubkey , returning = returning )
status = res . get ( ' status ' )
self . stat ( status )
def dm ( self , dat = ' ' , name_or_pubkey = None , msg_s = None ) :
if self . with_required_login ( ) :
if not name_or_pubkey :
dat = dat . strip ( )
if not dat :
self . status ( ' Message whom? Usage: /msg [name] ' )
return
datl = dat . split ( ' ' , 1 )
name_or_pubkey = datl [ 0 ]
if name_or_pubkey . startswith ( ' @ ' ) :
name_or_pubkey = name_or_pubkey [ 1 : ]
if not msg_s :
datl = dat . split ( )
if len ( datl ) == 1 :
print ( )
self . stat ( f ' Compose your message to @ { name_or_pubkey } below. ' , ' Press Ctrl+D to complete, or Ctrl+C to cancel. ' )
print ( )
msg_s = multiline_input ( ) . strip ( )
if not msg_s :
print ( ' \n ' )
self . stat ( ' Not sending. No message found. ' )
return
else :
msg_s = datl [ 1 ]
self . log ( f ' Composed msg to { name_or_pubkey } : { msg_s } ' )
msg_obj = self . comrad . msg (
name_or_pubkey ,
msg_s
)
self . log ( f ' Sent msg obj to { name_or_pubkey } : { msg_obj } ' )
print ( )
self . stat ( f ' Message successfully sent to @ { name_or_pubkey } . ' , comrad_name = ' Operator ' , pause = True )
def refresh ( self , dat = None , res = None , statd = { } ) :
self . log ( f ' <-- dat= { dat } , res= { res } ' )
# stop
## get updates
# this does login, msgs, and posts in one req
time . sleep ( 0.25 )
if not self . comrad :
self . stat ( ' You must login first. ' )
res = self . comrad . refresh ( )
#print(res.get('success'))
if not res . get ( ' success ' ) :
self . stat ( res . get ( ' status ' ) , comrad_name = ' ' )
return
# self.stat('Patching you through to the @Operator. One moment please...')
if hasattr ( self , ' _mapscr ' ) and self . _mapscr :
#stop
msg = self . _mapscr . msg + [ ' ???, ??? (@Operator) ' ]
self . _mapscr . run_print_map ( msg = msg )
time . sleep ( 1 )
self . _mapscr . endwin ( )
self . _mapscr = None
self . hops = [ ]
self . log ( ' <-- get_updates ' , res )
# check logged in
# res_login=res.get('res_login',{})
# if not self.do_login(res_login): return
# self.stat('',res['status'],comrad_name='Operator',**statd)
self . help ( )
return res
def prompt_adduser ( self , msg ) :
# self.print('prompt got:',msg)
# self.print(msg.data)
do_pause ( )
# clear_screen()
# print(dict_format(msg.data))
# do_pause()
meet_name = msg . data . get ( ' meet_name ' )
meet_uri = msg . data . get ( ' meet ' )
qrstr = self . comrad . qr_str ( meet_uri )
self . stat ( f " Add @ { meet_name } ' s public key to your address book? " , f ' It will allow you and @ { meet_name } to read and write encrypted messages to one another. ' )
do_adduser = input ( f ''' \n { self . comrad } [y/N]: ''' )
if do_adduser . strip ( ) . lower ( ) == ' y ' :
import pyqrcode
print ( ' meet_uri ' , meet_uri , ' ??? ' )
qr = pyqrcode . create ( meet_uri )
fnfn = os . path . join ( PATH_QRCODES , meet_name + ' .png ' ) # self.get_path_qrcode(name=name)
qr . png ( fnfn , scale = 5 )
clear_screen ( )
self . stat ( f ' The public key of @ { meet_name } has been saved as a QRcode to { fnfn } ' )
print ( qrstr )
do_pause ( )
clear_screen ( )
# print(msg.data)
# if not msg.data.get('returning'):
# bit hacky way to tell if this has been returned or not!
if ' has agreed ' not in msg . data . get ( ' txt ' , ' ' ) :
self . stat ( ' Send this comrad your public key as well? ' )
do_senduser = input ( f ''' \n { self . comrad } [y/N]: ''' )
if do_senduser . strip ( ) . lower ( ) == ' y ' :
res = self . comrad . meet ( meet_name , returning = True )
if res . get ( ' success ' ) :
self . log ( ' res?! ' , res )
self . stat ( ' Returning the invitation: ' , f ' " { res . get ( " res " , { } ) . get ( " msg_d " , { } ) . get ( " msg " , { } ) . get ( " txt " , " [?] " ) } " ' , use_prefix = True )
do_pause ( )
else :
self . stat ( msg . get ( ' status ' ) )
def prompt_msg ( self , msg ) :
clear_screen ( )
print ( msg )
self . stat ( ' Type " r " to reply to this message, " d " to delete it, hit Enter to continue, or type " q " to return to main menu. ' , use_prefix = False )
do = input ( f ' \n { self . comrad } : ' )
do = do . strip ( ) . lower ( )
if do == ' d ' :
# self.print('del',msg.post_id)
res = self . comrad . delete_post ( msg . post_id )
if res . get ( ' success ' ) :
self . stat ( ' Deleted message. ' )
else :
self . stat ( ' Could not delete message. ' )
do_pause ( )
elif do == ' r ' :
# self.print('@todo: replying...')
return self . dm ( msg . from_name )
elif do == ' q ' :
raise EOFError
else :
# seen this msg!
self . comrad . seen_msg ( msg )
pass
def dms ( self , dat = ' ' ) :
if self . with_required_login ( ) :
msgs = self . comrad . messages ( )
return self . read ( msgs )
def feed ( self , dat = ' ' ) :
if self . with_required_login ( ) :
posts = self . comrad . posts ( )
return self . read ( posts )
def read ( self , msgs ) :
if not msgs :
self . stat ( ' No messages. ' )
else :
clear_screen ( )
for i , msg in enumerate ( msgs ) :
try :
self . stat ( f ' Showing message { i + 1 } of { len ( msgs ) } , from newest to oldest. Hit Ctrl+D to exit. ' )
print ( )
print ( msg )
self . log ( ' DATA ' , msg . msg_d )
if msg . data . get ( ' prompt_id ' ) == ' addcontact ' :
self . prompt_adduser ( msg )
self . prompt_msg ( msg )
clear_screen ( )
except EOFError :
break
self . help ( )
def post ( self , dat = ' ' , maxlen = MAX_POST_LEN ) :
if self . with_required_login ( ) :
self . stat ( f ' Write your post below. Maximum of 1000 characters. ' , ' Press Ctrl+D to complete, or Ctrl+C to cancel. ' )
print ( )
msg_s = multiline_input ( ) #.strip()
if not msg_s :
print ( ' \n ' )
self . stat ( ' Not sending. No text entered. ' )
return
if len ( msg_s ) > maxlen :
msg1 = f ' Not sending. Message is { len ( msg_s ) - maxlen } characters over the character limit of { maxlen } . '
msg2 = ' The message you wanted to send is copied below (in case you want to copy/paste it to edit it): ' ,
msg3 = ' The message you wanted to send is copied above (in case you want to copy/paste it to edit it). '
err = f ' { msg1 } \n \n { msg2 } \n \n { msg_s } '
err2 = f ' { msg1 } \n \n { msg3 } '
print ( )
self . stat ( msg1 )
# self.stat(err2)
return
self . log ( f ' Post written: { msg_s } ' )
msg_obj = self . comrad . post (
msg_s
)
self . log ( f ' Posted: { msg_obj } ' )
print ( )
self . stat ( f ' Post sent to @ { WORLD_NAME } . ' , comrad_name = ' Operator ' , pause = True )
### DIALOGUES
# hello, op?
def status_keymaker_part1 ( self , name ) :
self . status ( None , { ART_OLDPHONE4 + ' \n ' , True } , 3 ) #,scan=False,width=None,pause=None,clear=None)
nm = name if name else ' ? '
self . status (
f ' \n \n \n @ { nm } : Uh yes hello, Operator? I would like to join Comrad, the socialist network. Could you patch me through? ' , clear = False )
while not name :
name = self . status ( ( ' name ' , ' @TheTelephone: Of course, Comrad...? \n @ ' ) ) . get ( ' vals ' ) . get ( ' name ' ) . strip ( )
self . print ( )
self . status (
f ' @TheTelephone: Of course, Comrad @ { name } . A fine name. ' ,
''' @TheTelephone: However, I ' m just the local operator who lives on your device; my only job is to communicate with the remote operator securely. ''' ,
''' Comrad @TheOperator lives on the deep web. She ' s the one you want to speak with. ''' ,
None , { ART_OLDPHONE4 } , f ''' @ { name } : Hm, ok. Well, could you patch me through to the remote operator then? ''' ,
f ''' @ { TELEPHONEname } : I could, but it ' s not safe yet. Your information could be exposed. You need to cut your encryption keys first. ''' ,
f ' @ { name } : Fine, but how do I do that? ' ,
f ' @ { TELEPHONEname } : Visit the Keymaker. ' ,
clear = False , pause = True )
### KEYMAKER
self . status ( None , { tw . indent ( ART_KEY , ' ' * 5 ) + ' \n ' , True } , 3 ) #,clear=False,indent=10,pause=False)
# convo
self . status (
f ' \n @ { name } : Hello, Comrad @Keymaker? I would like help forging a new set of keys. ' ,
f ' @Keymaker: Of course, Comrad @ { name } . ' ,
)
self . status (
' I will cut for you two matching keys, part of an " asymmetric " pair. ' ,
' Please, watch me work. ' ,
None , { tw . indent ( ART_KEY , ' ' * 5 ) + ' \n ' } ,
' I use a high-level cryptographic function from Themis, a well-respected open-source cryptography library. ' ,
' I use the iron-clad Elliptic Curve algorthm to generate the asymmetric keypair. ' ,
' > GenerateKeyPair(KEY_PAIR_TYPE.EC) ' ,
3
)
self . status (
None ,
{ ART_KEY_PAIR , True }
) #,clear=False,indent=10,pause=False)
return name
def status_keymaker_part2 ( self , name , passphrase , pubkey , privkey , hasher , persona ) :
from getpass import getpass
# gen what we need
uri_id = pubkey . data_b64
qr_str = get_qr_str ( uri_id )
qr_path = os . path . join ( PATH_QRCODES , name + ' .png ' )
# what are pub/priv?
# self.status(
# None,{ART_KEY_PAIR},
# 'A matching set of keys have been generated.',
# None,{ART_KEY_PAIR2A+'\nA matching set of keys have been generated.'+'\n'},
# 'First, I have made a "public key" which you can share with anyone:',
# f'(1) {pubkey.data_b64.decode()}',
# 'This key is a randomly-generated binary string, which acts as your "address" on Comrad.',
# 'With it, someone can write you an encrypted message which only you can read.'
# )
# self.status(
# None,{ART_KEY_PAIR2A},
# f'You can share your public key by copy/pasting it to them over a secure channel (e.g. Signal).',
# 'Or, you can share it as a QR code, especially IRL:',
# {qr_str+'\n\n',True,5},
# f'\n\n(If registration is successful, this QR code be saved as an image to your device at: {qr_path}.)'
# )
# private keys
self . status ( None ,
{ ART_KEY_PAIR2B } ,
' Second, I have cut a matching " private key " . ' ,
" It ' s too dangerous to show in full, so here it is 66 % r edacted: " ,
f ' (2) { make_key_discreet ( privkey . data_b64 , 0.3 ) } ' ,
' With it, you can decrypt and read any message sent to you via your public key. ' ,
' You can also encrypt and send messages to other people whose public keys you have. ' ,
)
# private keys
self . status ( None ,
{ CUBEKEY } ,
' So if someone were to steal your private key, they could read your mail and forge your signature. '
' You you should never, ever give your private key to anyone. ' ,
' In fact, this key is so dangerous that we need to lock it away immediately. ' ,
" We ' ll even throw away the key we use to lock this private key with! " ,
" How? By regenerating it each time from your password. " ,
)
if not passphrase :
from getpass import getpass
self . status (
' And it looks like you haven \' t yet chosen a password. ' ,
3 , " Don ' t tell it to me! Never tell it to anyone. " ,
" Ideally, don ' t even save it on your computer; just remember it, or write it down on paper. " ,
" Instead, whisper it to Comrad @Hasher, who scrambles information ' 1-way ' , like a blender. " ,
)
res = self . status ( None ,
{ indent_str ( ART_FROG_BLENDER , 10 ) , True } ,
" @Keymaker: Go ahead, try it. Type anything to @Hasher. " ,
( ' str_to_hash ' , f ' @ { name } : ' , input )
)
str_to_hash = res . get ( ' vals ' ) . get ( ' str_to_hash ' )
hashed_str1 = hasher ( str_to_hash . encode ( ) )
res = self . status (
' @Hasher: ' + hashed_str1
)
res = self . status (
' @Keymaker: Whatever you typed, there \' s no way to reconstruct it from that garbled mess. ' ,
' But whatever you typed will always produce the *same* garbled mess. ' ,
( ' str_to_hash ' , f ' Try typing the exact same thing over again: \n @ { name } : ' , input )
)
str_to_hash = res . get ( ' vals ' ) . get ( ' str_to_hash ' )
hashed_str2 = hasher ( str_to_hash . encode ( ) )
res = self . status (
' @Hasher: ' + hashed_str2
)
if hashed_str1 == hashed_str2 :
self . status ( ' See how the hashed values are also exactly the same? ' )
else :
self . status ( ' See how the hashed values have also changed? ' )
res = self . status (
( ' str_to_hash ' , f ' Now try typing something just a little bit different: \n @ { name } : ' , input )
)
str_to_hash = res . get ( ' vals ' ) . get ( ' str_to_hash ' )
hashed_str3 = hasher ( str_to_hash . encode ( ) )
res = self . status (
' @Hasher: ' + hashed_str3
)
if hashed_str2 == hashed_str3 :
self . status ( ' See how the hashed values are also the same? ' )
else :
self . status ( ' See how the hashed values have also changed? ' )
self . status (
None , { indent_str ( ART_FROG_BLENDER , 10 ) } ,
' @Keymaker: Behind the scenes, @Hasher is using the SHA-256 hashing function, which was designed by the NSA. ' ,
' But @Hasher also adds a secret " salt " to the recipe, as it \' s called. ' ,
' To whatever you type in, @Hasher adds a secret phrase: another random string of characters which never changes. ' ,
" By doing so, the hash output is \" salted \" : made even more idiosyncratic to outside observers. " ,
)
self . status (
None , { indent_str ( ART_FROG_BLENDER , 10 ) } ,
f " I ' ve taken the liberty of generating a random secret for your device, which I show here mostly redacted: " ,
make_key_discreet_str ( persona . crypt_keys . secret . decode ( ) , 0.25 ) ,
' The full version of this secret is silently added to every input you type into @Hasher. ' ,
" I ' ve saved this secret phrase to a hidden location on your device hardware. " ,
)
self . status (
None , { indent_str ( ART_FROG_BLENDER , 10 ) } ,
' However, this means that you will be unable to log in to your account from any other device. ' ,
' This limitation provides yet another level of hardware protection against network attacks. ' ,
' However, you can always choose (not recommended) to the secret file with another device by a secure channel. ' ,
3 , f ' But, please excuse me Comrad @ { name } -- I digress. '
)
while not passphrase :
res = self . status ( None ,
{ indent_str ( ART_FROG_BLENDER , 10 ) } ,
" @Keymaker: Please type your chosen password into @Hasher. " ,
( ' str_to_hash ' , f ' \n @ { name } : ' , getpass ) ,
pause = False
)
str_to_hash = res . get ( ' vals ' ) . get ( ' str_to_hash ' )
hashed_pass1 = hasher ( str_to_hash . encode ( ) )
res = self . status (
' \n @Hasher: ' + hashed_pass1 ,
pause = False
)
res = self . status (
' \n Now type in the same password one more time to verify it: ' ,
( ' str_to_hash ' , f ' \n @ { name } : ' , getpass ) ,
pause = False
)
str_to_hash = res . get ( ' vals ' ) . get ( ' str_to_hash ' )
hashed_pass2 = hasher ( str_to_hash . encode ( ) )
res = self . status (
' \n @Hasher: ' + hashed_pass2 ,
pause = False
)
if hashed_pass1 == hashed_pass2 :
self . status ( ' ' , ' @Keymaker: Excellent. The passwords clearly matched, because the hashed values matched. ' , pause = False )
passphrase = hashed_pass1
else :
self . status ( ' @Keymaker: A pity. It looks like the passwords didn \' t match, since the hashed values didn \' t match either. Try again? ' )
return passphrase
def status_keymaker_part3 ( self , privkey , privkey_decr , privkey_encr , passphrase ) :
self . status (
None , { tw . indent ( ART_KEY , ' ' * 5 ) + ' \n ' , True } ,
# None,{ART_+'\n',True},
' Now that we have a hashed passphrase, we can generate the (2A) encryption key. ' ,
{ ART_KEY_KEY2A , True , 0.1 } ,
''' The key is formed using Themis ' s high-level symmetric encryption library: SecureCell, using Seal mode. ''' ,
' This key (2A) then uses the AES-256 encryption algorithm to encrypt the super-sensitive private key (2): '
)
s0 = str . center ( ' [Encryption Process] ' , CLI_WIDTH )
s1 = s0 + ' \n \n ' + self . printt ( ' Now that we have (2A), we can use it to encrypt the super-sensitive private key (2): ' , ret = True )
s2a = self . printt ( f " (2A) { make_key_discreet_str ( passphrase ) } " , ret = True )
s2 = self . printt ( f " (2) { make_key_discreet ( privkey . data_b64 ) } " , ret = True )
s2b = self . printt ( f " (2B) { make_key_discreet ( b64encode ( privkey_encr ) ) } " , ret = True )
self . status (
# screen 1
None , { f ' { s1 } ' } ,
False ,
# 2
None , { f ' { s1 } \n \n { ART_KEY_PAIR_SPLITTING1 } ' } ,
{ s2a , True } ,
False ,
# 3
None , { f ' { s1 } \n \n { ART_KEY_PAIR_SPLITTING2 } \n { s2a } ' } ,
{ ' \n ' + s2 , True } ,
False ,
# 4
None , { f ' { s1 } \n \n { ART_KEY_PAIR_SPLITTING3 } \n { s2a } \n \n { s2 } ' } ,
{ ' \n ' + s2b , True } ,
False ,
)
shdr = str . center ( ' [Decryption Process] ' , CLI_WIDTH ) + ' \n \n ' + self . printt ( ' Once we have (2B), we don \' t need (2A) or (2) anymore. We can regenerate them! ' , ret = True )
from getpass import getpass
passhash = None
while passhash != passphrase :
res = self . status (
None , { shdr } , False if passhash is None else True ,
( " pass " , self . printt ( f " Let ' s try. Re-type your password into @Hasher: " , ret = True ) + f " \n " , getpass )
)
passhash = self . persona . crypt_keys . hash ( res . get ( ' vals ' ) . get ( ' pass ' ) . encode ( ) )
if passhash != passphrase :
self . status ( { ' Looks like they don \' t match. Try again? ' } , False )
self . status (
{ ' Excellent. We can now regenerate the decryption key: ' } , False ,
{ s2a , True } , False ,
)
# self.status('great')
# shdr2=
self . status (
# 2
None , { f ' { shdr } \n \n { ART_KEY_PAIR_SPLITTING1 } ' } ,
{ s2a , True } ,
False ,
# 3
# None,{f'{s1}\n\n{ART_KEY_PAIR_SPLITTING2}\n{s2a}'},
# {'\n'+s2,True},
# False,
# 4
None , { f ' { s1 } \n \n { ART_KEY_PAIR_SPLITTING4 } \n { s2a } \n \n \n \n ' } ,
{ ' \n ' + s2b , True } ,
False ,
)
def run_cli ( inp ) :
cli = CLI ( )
cli . run ( inp ) #'/register elon') #'/register',name='elon')
if __name__ == ' __main__ ' :
inp = ' ' . join ( sys . argv [ 1 : ] )
run_cli ( inp )
# asyncio.run(test_async())
"""
Outtakes
self . status ( None ,
{ ART_KEY_PAIR31A } ,
{ ART_KEY_PAIR3B + ' \n ' , True } ,
3 , ' Allow me to explain. ' ,
' (2A) is a separate encryption key generated by your password. ' ,
' (2B) is a version of (2) which has been encrypted by (2A). ' ,
" Because (2) will be destroyed, to rebuild it requires decrypting (2B) with (2A). " ,
)
self . status (
None , { ART_KEY_PAIR5 + ' \n ' } ,
" However, in a final move, I will now destroy (2A), too. " ,
None , { ART_KEY_PAIR4Z1 + ' \n ' } ,
' Why? Because now only you can regenerate it by remembering the password which created it. ' ,
# None,{ART_KEY_PAIR4Z1+'\n'},
' However, this also means that if you lose or forget your password, you \' re screwed. ' ,
None , { ART_KEY_PAIR4Z2 + ' \n ' } ,
" Because without key (2A),you couldn never unlock (2B). " ,
None , { ART_KEY_PAIR4Z3 + ' \n ' } ,
" And without (2B) and (2A) together, you could never re-assemble the private key of (2). " ,
None , { ART_KEY_PAIR4Z42 + ' \n ' } ,
" And without (2), you couldn ' t read messages sent to your public key. " ,
)
"""