@ -1,28 +1,76 @@
import os , sys ; sys . path . append ( os . path . abspath ( os . path . join ( os . path . abspath ( os . path . join ( os . path . dirname ( __file__ ) , ' .. ' ) ) , ' .. ' ) ) )
from komrade import *
from komrade . backend . crypt import *
from abc import ABC , abstractmethod
class KomradeKey ( ABC ) :
@abstractmethod
def encrypt ( self , msg , * * kwargs ) : pass
@abstractmethod
def decrypt ( self , msg , * * kwargs ) : pass
@abstractmethod
def data ( self ) : pass
class KomradeSymmetric ( SCellSeal ) :
class KomradeSymmetric Key( KomradeKey ) :
@property
def cell ( self ) :
if not hasattr ( self , ' _cell ' ) :
if hasattr ( self , ' passphrase ' ) and self . passphrase :
self . _cell = SCellSeal ( passphrase = passphrase )
self . _cell = SCellSeal ( passphrase = self . passphrase )
elif hasattr ( self , ' key ' ) and self . key :
self . _cell = SCellSeal ( key = key )
self . _cell = SCellSeal ( key = self . key )
return self . _cell
def encrypt ( self , msg , * * kwargs ) :
if issubclass ( type ( msg ) , KomradeKey ) : msg = msg . data
return self . cell . encrypt ( msg , * * kwargs )
def decrypt ( self , msg , * * kwargs ) :
return self . cell . decrypt ( msg , * * kwargs )
class KomradeSymmetricPass ( KomradeSymmetric ) :
class KomradeSymmetric KeyWith Passphrase ( KomradeSymmetric Key ) :
def __init__ ( self , passphrase = None , why = WHY_MSG ) :
self . passphrase = passphrase
if not self . passphrase :
self . passphrase = getpass . getpass ( why )
return self . passphrase
#return self.passphrase
@property
def data ( self ) : return KEY_TYPE_SYMMETRIC_WITH_PASSPHRASE . encode ( ' utf-8 ' )
class KomradeSymmetricKey ( SCellSeal ) :
class KomradeSymmetricKey WithoutPassphrase( KomradeSymmetricKey ) :
def __init__ ( self ) :
self . key = GenerateSymmetricKey ( )
@property
def data ( self ) : return self . key
class KomradeAsymmetricKey ( KomradeKey ) :
def __init__ ( self , pubkey , privkey ) :
self . pubkey = pubkey
self . privkey = privkey
def encrypt ( self , msg , pubkey = None , privkey = None ) :
if issubclass ( type ( msg ) , KomradeKey ) : msg = msg . data
pubkey = pubkey if pubkey else self . pubkey
privkey = privkey if privkey else self . privkey
return SMessage ( privkey , pubkey ) . wrap ( msg )
def decrypt ( self , msg , pubkey = None , privkey = None ) :
pubkey = pubkey if pubkey else self . pubkey
privkey = privkey if privkey else self . privkey
return SMessage ( privkey , pubkey ) . unwrap ( msg )
@property
def data ( self ) : return self . key
class KomradeAsymmetricPublicKey ( KomradeAsymmetricKey ) :
@property
def key ( self ) : return self . pubkey
class KomradeAsymmetricPrivateKey ( KomradeAsymmetricKey ) :
@property
def key ( self ) : return self . privkey
class Keymaker ( Logger ) :
@ -272,85 +320,135 @@ class Keymaker(Logger):
asymmetric_pubkey = None
asymmetric_privkey = None
keychain = defaultdict ( None )
for key_name , key_type_desc r in key_types . items ( ) :
if key_type_desc r in { KEY_TYPE_ASYMMETRIC_P RIV KEY, KEY_TYPE_ASYMMETRIC_PRIVKEY } :
for key_name , key_type_desc in key_types . items ( ) :
if key_type_desc in { KEY_TYPE_ASYMMETRIC_P UB KEY, KEY_TYPE_ASYMMETRIC_PRIVKEY } :
if not asymmetric_privkey or not asymmetric_pubkey :
keypair = GenerateKeyPair ( KEY_PAIR_TYPE . EC )
asymmetric_privkey = keypair . export_private_key ( )
asymmetric_pubkey = keypair . export_public_key ( )
if key_type_desc r == KEY_TYPE_ASYMMETRIC_PRIVKEY :
keychain [ key_name ] = asymmetric_privkey
elif key_type_desc r == KEY_TYPE_ASYMMETRIC_PUBKEY :
keychain [ key_name ] = asymmetric_pubkey
if key_type_desc == KEY_TYPE_ASYMMETRIC_PRIVKEY :
keychain [ key_name ] = KomradeAsymmetricPrivateKey( asymmetric_pubkey , asymmetric_privkey)
elif key_type_desc == KEY_TYPE_ASYMMETRIC_PUBKEY :
keychain [ key_name ] = KomradeAsymmetricPublicKey( asymmetric_pubkey, asymmetric_privkey )
elif key_type_desc == KEY_TYPE_SYMMETRIC_WITHOUT_PASSPHRASE :
keychain [ key_name ] = KomradeSymmetricKey ( )
keychain [ key_name ] = KomradeSymmetricKey WithoutPassphrase ( )
elif key_type_desc == KEY_TYPE_SYMMETRIC_WITH_PASSPHRASE :
if not passphrase and not self . passphrase :
self . passphrase = getpass . getpass ( WHY_MSG )
keychain [ key_name ] = KomradeSymmetricPass ( passphrase = passphrase if passphrase else self . passphrase )
if not passphrase and not self . passphrase : self . passphrase = getpass . getpass ( WHY_MSG )
passphrase = passphrase if passphrase else self . passphrase
keychain [ key_name ] = KomradeSymmetricKeyWithPassphrase ( passphrase = passphrase )
return keychain
def forge_new_keys ( self ,
name ,
name = None ,
keys_to_save = KEYMAKER_DEFAULT_KEYS_TO_SAVE ,
keys_to_return = KEYMAKER_DEFAULT_KEYS_TO_RETURN ,
keys_to_gen = KEYMAKER_DEFAULT_KEYS_TO_GEN ,
key_types = KEYMAKER_DEFAULT_KEY_TYPES ) :
self . log ( ' forging new keys... ' )
if not name : name = self . name
keys_to_gen = set ( keys_to_gen ) | set ( keys_to_save ) | set ( keys_to_return )
keys_to_gen = sorted ( list ( keys_to_gen ) , key = lambda x : x . count ( ' _ ' ) )
self . log ( ' keys_to_gen = ' , keys_to_gen )
key_types = dict ( [ ( k , key_types [ k ] ) for k in keys_to_gen ] )
self . log ( ' key_types = ' , key_types )
keychain = self . gen_keys_from_types ( key_types )
self . log ( ' keychain = ' , keychain )
self . log ( ' !!!! ' , keychain )
# stop
#keychain_tosave = defaultdict(None)
#keychain_toreturn = defaultdict(None)
self . log ( ' keys_to_save = ' , keys_to_save )
self . log ( ' keys_to_return = ' , keys_to_return )
for key_name in keys_to_gen :
if key_name . endswith ( ' _encr ' ) and key_name not in keychain :
# encrypt it with the associated decr
name_of_what_to_encrypt = key_name [ : - len ( ' _encr ' ) ]
the_key_to_encrypt_it_with = name_of_what_to_encrypt + ' _decr '
if the_key_to_encrypt_it_with in keychain and name_of_what_to_encrypt in keychain :
_key_decr = keychain [ the_key_to_encrypt_it_with ]
_key = keychain [ name_of_what_to_encrypt ]
self . log ( f ' about to encrypt key { name_of_what_to_encrypt } , using { the_key_to_encrypt_it_with } , which is a type { key_types [ the_key_to_encrypt_it_with ] } and has value { keychain [ the_key_to_encrypt_it_with ] } ' )
_key_encr = _key_decr . encrypt ( _key )
self . log ( f ' { _key } \n -- encrypting -----> \n { _key_encr } ' )
keychain [ key_name ] = _key_encr
self . log ( ' once more, with encryption! ' , keychain )
# filter for transfer
for k , v in keychain . items ( ) :
if issubclass ( type ( v ) , KomradeKey ) :
v = v . data
v = b64encode ( v )
keychain [ k ] = v
self . log ( ' --> ' , v )
# stop
# keychain_tosave = dict([(k,keychain[k]) for k in keys_to_save if k in keychain])
# Create public and private keys
keychain = { }
keypair = GenerateKeyPair ( KEY_PAIR_TYPE . EC )
keychain [ ' privkey ' ] = keypair . export_private_key ( )
keychain [ ' pubkey ' ] = keypair . export_public_key ( )
keychain [ ' adminkey ' ] = GenerateSymmetricKey ( )
# # Create decryption/permission keys
# keychain['pubkey_decr'] = GenerateSymmetricKey()
# keychain['privkey_decr'] = GenerateSymmetricKey()
# keychain['adminkey_decr'] = GenerateSymmetricKey() #SCellSeal(passphrase=passphrase)
# for k,v in keychain_tosave.items():
if ' pubkey ' in keys_to_save or ' privkey ' in keys_to_save or ' adminkey ' in keys_to_save :
raise KomradeException ( ' there is no private property in a socialist network! all keys must be split between komrades ' )
### SAVE ENCRYPTED KEYS?
if ' pubkey_encr ' in keys_to_save :
self . crypt_keys . set ( name , keychain [ ' pubkey_encr ' ] , prefix = ' /pubkey_encr/ ' )
if ' privkey_encr ' in keys_to_save :
self . crypt_keys . set ( keychain [ ' pubkey ' ] , keychain [ ' privkey_encr ' ] , prefix = ' /privkey_encr/ ' )
if ' adminkey_encr ' in keys_to_save :
self . crypt_keys . set ( keychain [ ' privkey ' ] , keychain [ ' adminkey_encr ' ] , prefix = ' /adminkey_encr/ ' )
if ' pubkey_encr_encr ' in keys_to_save :
self . crypt_keys . set ( self . name , keychain [ ' pubkey_decr_encr ' ] , prefix = ' /pubkey_decr_encr/ ' )
if ' privkey_encr_encr ' in keys_to_save :
self . crypt_keys . set ( keychain [ ' pubkey_decr ' ] , keychain [ ' privkey_decr_encr ' ] , prefix = ' /privkey_decr_encr/ ' )
if ' adminkey_encr_encr ' in keys_to_save :
self . crypt_keys . set ( keychain [ ' privkey_decr ' ] , keychain [ ' adminkey_decr_encr ' ] , prefix = ' /adminkey_decr_encr/ ' )
if ' pubkey_decr_encr ' in keys_to_save :
self . crypt_keys . set ( self . name , keychain [ ' pubkey_decr_encr ' ] , prefix = ' /pubkey_decr_encr/ ' )
if ' privkey_decr_encr ' in keys_to_save :
self . crypt_keys . set ( keychain [ ' pubkey_decr ' ] , keychain [ ' privkey_decr_encr ' ] , prefix = ' /privkey_decr_encr/ ' )
if ' adminkey_decr_encr ' in keys_to_save :
self . crypt_keys . set ( keychain [ ' privkey_decr ' ] , keychain [ ' adminkey_decr_encr ' ] , prefix = ' /adminkey_decr_encr/ ' )
# # Encrypt original keys
# keychain['pubkey_encr'] = SCellSeal(key=pubkey_decr).encrypt(pubkey)
# keychain['privkey_encr'] = SCellSeal(key=privkey_decr).encrypt(privkey)
# keychain['adminkey_encr'] = SCellSeal(key=adminkey_decr).encrypt(adminkey)
double_enc = [ ' pubkey_decr_encr ' , ' pubkey_encr_encr ' , ' pubkey_decr_encr ' , ' pubkey_encr_encr ' , ' pubkey_decr_encr ' , ' pubkey_encr_encr ' ]
for xkey in double_encr :
if xkey in to_return or xkey in to_save :
xkey_orig = xkey [ : - len ( ' _encr ' ) ]
keychain [ xkey ] = self . cell_dblencr . encrypt ( * * )
# store encrypted on my hardware
if save_encrypted :
self . crypt_keys . set ( name , pubkey_encr , prefix = ' /pubkey_encr/ ' )
self . crypt_keys . set ( pubkey , privkey_encr , prefix = ' /privkey_encr/ ' )
self . crypt_keys . set ( privkey , adminkey_encr , prefix = ' /adminkey_encr/ ' )
if ' pubkey_decr ' in keys_to_save :
self . crypt_keys . set ( name , keychain [ ' pubkey_decr ' ] , prefix = ' /pubkey_decr/ ' )
if ' privkey_decr ' in keys_to_save :
self . crypt_keys . set ( pubkey , keychain [ ' privkey_decr ' ] , prefix = ' /privkey_decr/ ' )
if ' adminkey_decr ' in keys_to_save :
self . crypt_keys . set ( privkey , keychain [ ' adminkey_decr ' ] , prefix = ' /adminkey_decr/ ' )
if ' pubkey_decr_decr ' in keys_to_save :
self . crypt_keys . set ( self . name , keychain [ ' pubkey_decr_decr ' ] , prefix = ' /pubkey_decr_decr/ ' )
if ' privkey_decr_decr ' in keys_to_save :
self . crypt_keys . set ( keychain [ ' pubkey_decr ' ] , keychain [ ' privkey_decr_decr ' ] , prefix = ' /privkey_decr_decr/ ' )
if ' adminkey_decr_decr ' in keys_to_save :
self . crypt_keys . set ( keychain [ ' privkey_decr ' ] , keychain [ ' adminkey_decr_decr ' ] , prefix = ' /adminkey_decr_decr/ ' )
if ' pubkey_decr_decr ' in keys_to_save :
self . crypt_keys . set ( self . name , keychain [ ' pubkey_decr_decr ' ] , prefix = ' /pubkey_decr_decr/ ' )
if ' privkey_decr_decr ' in keys_to_save :
self . crypt_keys . set ( keychain [ ' pubkey_decr ' ] , keychain [ ' privkey_decr_decr ' ] , prefix = ' /privkey_decr_decr/ ' )
if ' adminkey_decr_decr ' in keys_to_save :
self . crypt_keys . set ( keychain [ ' privkey_decr ' ] , keychain [ ' adminkey_decr_decr ' ] , prefix = ' /adminkey_decr_decr/ ' )
# store permissions file?
secret_admin_val = pubkey_encr + BSEP + b ' find,read,admin '
if pubkey_is_public : secret_admin_val + = b ' * ' + BSEP + b ' find '
secret_admin_val_encr = SCellSeal ( key = adminkey ) . encrypt ( secret_admin_val )
if save_encrypted :
self . crypt_keys . set ( adminkey , secret_admin_val_encr , prefix = ' /permkey_encr/ ' )
# keep public key?
if pubkey_is_public :
self . crypt_keys . set ( name , pubkey_decr , prefix = ' /pubkey_decr/ ' )
# send back decryption keys to client
toreturn = { }
if return_decrypted :
toreturn [ ' pubkey_decr ' ] = pubkey_decr
toreturn [ ' privkey_decr ' ] = privkey_decr
toreturn [ ' adminkey_decr ' ] = adminkey_decr
# if 'permkey_encr' in keys_to_save:
# secret_admin_val = keychain['pubkey_encr'] + BSEP + b'find,read,admin'
# secret_admin_val_encr = SCellSeal(key=keychain['adminkey']).encrypt(secret_admin_val)
# self.crypt_keys.set(adminkey,secret_admin_val_encr,prefix='/permkey_encr/')
if return_encrypted :
toreturn [ ' pubkey_encr ' ] = pubkey_encr
toreturn [ ' privkey_encr ' ] = privkey_encr
toreturn [ ' adminkey_encr ' ] = adminkey_encr
return toreturn
keychain_toreturn = defaultdict ( None )
for k in keys_to_return :
if k in keychain :
keychain_toreturn [ k ] = keychain [ k ]
return keychain_toreturn
@property
def cell_dblencr ( self ) :
@ -437,4 +535,11 @@ class Keymaker(Logger):
if res :
_keychain [ keyname ] = res
return _keychain
if __name__ == ' __main__ ' :
keymaker = Keymaker ( ' marx69 ' )
keychain = keymaker . forge_new_keys ( )
print ( keychain )