have encryption working!

macdev
quadrismegistus 4 years ago
parent e6929ae7c7
commit 8cbf1307ea

@ -8,7 +8,7 @@ HORIZONTAL = False #random.choice([True,True,True,False])
FACTOR=1
WINDOW_SIZE = (1136*FACTOR,640*FACTOR) if HORIZONTAL else (640*FACTOR,1136*FACTOR)
BG_IMG='assets/bg-russiangreen.png'
BG_IMG='assets/bg-brown.png'
grass=(201,203,163)
russiangreen = (109,140,96)
@ -44,8 +44,8 @@ dutchwhite=229,219,181
COLOR_TOOLBAR= smokyblack #5,5,5 #russiangreen #pinetreegreen #kombugreen #(12,5,5) #russiangreen
COLOR_BG = (0,73,54)
# COLOR_ICON = (201,203,163)
COLOR_LOGO = dutchwhite#russiangreen #(0,0,0) #(0,0,0) #(151,177,140) #(132,162,118) #(109,140,106)
COLOR_ICON = dutchwhite#russiangreen #(0,0,0) #COLOR_LOGO
COLOR_LOGO = grass#russiangreen #(0,0,0) #(0,0,0) #(151,177,140) #(132,162,118) #(109,140,106)
COLOR_ICON = grass#russiangreen #(0,0,0) #COLOR_LOGO
COLOR_TEXT =dutchwhite #(241,233,203) #COLOR_ICON #(207,219,204) #(239,235,206) # (194,211,187) # (171,189,163) # (222,224,198) # COLOR_LOGO #(223, 223, 212)
COLOR_CARD = smokyblack #skin2 #huntergreen #(30,23,20) #(51,73,45) # (67,92,61) #(12,9,10)
# COLOR_TOOLBAR = (8s9,59,43)

@ -130,7 +130,7 @@ class Api(object):
return await _get()
def well_documented_val(self,val,sep=BSEP,encrypt=True):
def well_documented_val(self,val,sep=BSEP,do_encrypt=True,receiver_pubkey=None):
"""
What do we want to store with
@ -139,6 +139,7 @@ class Api(object):
3) Public key of author
4) Signature of value by author
"""
import time
timestamp=time.time()
self.log('timestamp =',timestamp)
@ -153,22 +154,32 @@ class Api(object):
# encrypt?
self.log('value while unencrypted =',value_bytes)
value_bytes = encrypt_msg(value_bytes, self.public_key_global)
self.log('value while encrypted = ',value_bytes)
if not receiver_pubkey: receiver_pubkey=self.public_key_global
# self.log('value while unencrypted =',value_bytes)
# value_bytes = encrypt_msg(value_bytes, self.public_key_global)
res = encrypt(value_bytes, self.private_key, receiver_pubkey)
#aes_ciphertext, encry_aes_key, hmac, hmac_signature, iv, metadata
self.log('value while encrypted = ',res)
# stop
aes_ciphertext, encry_aes_key, iv = res
#self.log('value =',value)
pem_public_key = save_public_key(self.public_key,return_instead=True)
pem_public_key = serialize_pubkey(receiver_pubkey)
#self.log('pem_public_key =',pem_public_key)
# stop
self.log('aes_ciphertext = ',aes_ciphertext,type(aes_ciphertext))
signature = sign_msg(value_bytes, self.private_key)
signature = sign(aes_ciphertext, self.private_key)
self.log('signature =',signature)
## Verify!
authentic = verify_msg(value_bytes,signature,self.public_key)
authentic = verify_signature(signature, aes_ciphertext, self.public_key)
self.log('message is authentic for set??',authentic)
# value_bytes_ascii = ''.join([chr(x) for x in value_bytes])
@ -176,36 +187,69 @@ class Api(object):
# jsond = {'time':timestamp, 'val':value_bytes_ascii, 'pub':pem_public_key, 'sign':signature}
# jsonstr=jsonify(jsond)
WDV = sep.join([str(timestamp).encode(), value_bytes,pem_public_key,signature])
# self.log('WDV = 'mWD)
# self.log(WDV.split(sep))
# self.log('well_documented_val() =',WDV)
time_b=str(timestamp).encode()
val_encr = aes_ciphertext
val_encr_key = encry_aes_key
sender_pubkey_b = serialize_pubkey(self.public_key)
receiver_pubkey_b = serialize_pubkey(receiver_pubkey)
signature = signature
WDV = sep.join([
time_b,
val_encr,
val_encr_key,
iv,
receiver_pubkey_b,
sender_pubkey_b,
signature
])
self.log('well_documented_val() =',WDV)
return WDV
async def unpack_well_documented_val(self,WDV,sep=BSEP):
async def unpack_well_documented_val(self,WDV,sep=BSEP,private_key=None):
self.log('WDV???',WDV)
time,val,pub,sign = WDV.split(sep)
pub=load_public_key(pub)
if WDV is None: return WDV
#WDV=[base64.b64decode(x) for x in WDV.split(sep)]
WDV=WDV.split(sep)
self.log('WDV NEW:',WDV)
time_b,val_encr,val_encr_key,iv,to_pub_b,from_pub_b,signature = WDV #.split(sep)
to_pub = load_pubkey(to_pub_b)
from_pub = load_pubkey(from_pub_b)
# verify
authentic = verify_msg(val,sign,pub)
self.log('message is authentic for GET?',authentic)
val_encr_decode = base64.b64decode(val_encr)
authentic = verify_signature(signature,val_encr,from_pub)
self.log('message is authentic for GET?',authentic,signature,val_encr)
if not authentic:
self.log('inauthentic message!')
return None
return {}
# decrypt
self.log('val before decryption = ',val)
val = decrypt_msg(val, self.private_key_global)
self.log('val after decryption = ',val)
# self.log('val before decryption = ',val_encr)
if private_key is None: private_key=self.private_key_global
val = decrypt(val_encr, val_encr_key, iv, private_key)
# self.log('val after decryption = ',val)
time=float(time.decode())
# time=float(time.decode())
#val=val.decode()
return time,val,pub,sign
# return time,val,pub,sign
WDV={
'time':float(time_b.decode()),
'val':val,
'to':to_pub,
'from':from_pub,
'sign':signature
}
self.log('GOT WDV:',WDV)
return WDV
#,signature
async def set(self,key_or_keys,value_or_values):
@ -234,27 +278,21 @@ class Api(object):
async def get_json(self,key_or_keys):
res = await self.get(key_or_keys)
self.log('GET_JSON',res)
if res is None: return res
def jsonize(entry):
if not 'val' in entry: return entry
val=entry['val']
dat=json.loads(val.decode())
entry['val']=dat
return entry
if type(res)==list:
jsonl=[]
for time,val,pub,sign in res:
dat=json.loads(val.decode())
if type(dat)==dict:
dat['_time']=time
dat['_pub']=pub
dat['_sign']=sign
jsonl.append(dat)
jsonl=[jsonize(entry) for entry in res]
return jsonl
else:
#log('RES!!!',res)
time,val,pub,sign = res
dat = None if res is None else json.loads(val.decode())
if type(dat)==dict:
dat['_time']=time
dat['_pub']=pub
dat['_sign']=sign
self.log('RETURNING -->',dat)
return dat
entry = res
return jsonize(entry)
async def set_json(self,key,value):
@ -269,11 +307,12 @@ class Api(object):
## PERSONS
async def get_person(self,username):
return await self.get_json('/person/'+username)
res=await self.get_json('/person/'+username)
return res.get('val') if res and type(res)==dict else res
async def set_person(self,username,public_key):
pem_public_key = save_public_key(public_key,return_instead=True)
obj = {'name':username, 'public_key':pem_public_key.decode()}
async def set_person(self,username,pem_public_key):
# pem_public_key = save_public_key(public_key,return_instead=True)
obj = {'name':username, 'public_key':pem_public_key}
await self.set_json('/person/'+username,obj)
@ -286,16 +325,18 @@ class Api(object):
person = await self.get_person(name)
if person is not None: return {'error':'Username already exists'}
private_key,public_key = new_keys(password=passkey,save=False)
self._private_key = private_key
self._public_key = public_key
pem_private_key = save_private_key(private_key,password=passkey,return_instead=True)
pem_public_key = save_public_key(public_key,return_instead=True)
self._private_key = private_key = generate_rsa_key()
# self._public_key = public_key = self.private_key.public_key()
pem_private_key = serialize_privkey(self.private_key, password=passkey)# save_private_key(private_key,password=passkey,return_instead=True)
#pem_public_key = save_public_key(public_key,return_instead=True)
pem_public_key = serialize_pubkey(self.public_key)
await self.set_person(name,pem_public_key.decode())
self.app_storage.put('_keys',
private=str(pem_private_key.decode()),
public=str(pem_public_key.decode())) #(private_key,password=passkey)
await self.set_person(name,public_key)
private=pem_private_key.decode(),
public=pem_public_key.decode()) #(private_key,password=passkey)
return {'success':'Account created', 'username':name}
@ -304,8 +345,9 @@ class Api(object):
def load_private_key(self,password):
if not self.app_storage.exists('_keys'): return {'error':'No login keys present on this device'}
pem_private_key=self.app_storage.get('_keys').get('private')
self.log('my private key ====',pem_private_key)
try:
return {'success':load_private_key(pem_private_key.encode(),password)}
return {'success':load_privkey(pem_private_key,password)}
except ValueError as e:
self.log('!!',e)
return {'error':'Incorrect password'}
@ -334,7 +376,7 @@ class Api(object):
# verify keys
person_public_key_pem = person['public_key']
public_key = load_public_key(person_public_key_pem.encode())
public_key = load_pubkey(person_public_key_pem) #load_public_key(person_public_key_pem.encode())
self._public_key = real_public_key = private_key.public_key()
#log('PUBLIC',public_key.public_numbers())
@ -347,7 +389,10 @@ class Api(object):
@property
def public_key(self):
if not hasattr(self,'_public_key'):
self.app.root.change_screen('login')
if not hasattr(self,'_private_key'):
self.app.root.change_screen('login')
else:
self._public_key=self.private_key.public_key()
return self._public_key
@property

@ -5,6 +5,7 @@ from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.exceptions import InvalidSignature
import os
from .syfr import * #import syfr
key_dir = os.path.join(os.path.expanduser('~'),'.keys','komrade')
if not os.path.exists(key_dir): os.makedirs(key_dir)
@ -28,6 +29,8 @@ def new_keys(save=True,password=None):
return private_key,public_key
def save_private_key(private_key,fn=PATH_PRIVATE_KEY,password=None, return_instead=False):
pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
@ -75,7 +78,7 @@ def load_public_key_from_file(fn=PATH_PUBLIC_KEY):
### DE/ENCRYPTING
def encrypt_msg(message, public_key):
return public_key.encrypt(
str(message).encode(),
message,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA256()),
algorithm=hashes.SHA256(),
@ -83,6 +86,33 @@ def encrypt_msg(message, public_key):
)
)
def encrypt_msg_symmetric(message):
import os
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
backend = default_backend()
key = os.urandom(32)
iv = os.urandom(16)
cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=backend)
encryptor = cipher.encryptor()
ct = encryptor.update(message) + encryptor.finalize()
return ct
def decrypt_msg_symmetric():
import os
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
backend = default_backend()
key = os.urandom(32)
iv = os.urandom(16)
cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=backend)
return ct
decryptor = cipher.decryptor()
decryptor.update(ct) + decryptor.finalize()
b'a secret message'
def decrypt_msg(encrypted, private_key):
return private_key.decrypt(
encrypted,
@ -157,3 +187,16 @@ def gen_global_keys(fn='.keys.global.json'):
store.put('_keys',private=str(pem_private_key.decode()),public=str(pem_public_key.decode())) #(private_key,password=passkey)
"""
import os
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
backend = default_backend()
key = os.urandom(32)
iv = os.urandom(16)
cipher = Cipher(algorithms.AES(key), modes.CBC(iv), backend=backend)
encryptor = cipher.encryptor()
ct = encryptor.update(b"a secret message") + encryptor.finalize()
decryptor = cipher.decryptor()
decryptor.update(ct) + decryptor.finalize()
b'a secret message'"""

@ -0,0 +1,2 @@
from .syfr import *
from .loader import *

@ -0,0 +1,187 @@
import copy
import hashlib
import os
import requests
from .syfr import *
DATA_BLOCK_SIZE = 65536
def encrypt_file(file_path, rsa_priv, receiver_pubkey):
contents = open(file_path).read()
return blocks
def masters_from_children(children, rsa_priv, receiver_pubkey):
contents = []
masters_content = []
last_master_content = "MASTER:"
for child in children:
if len(last_master_content) + len(child['id']) > DATA_BLOCK_SIZE - bitsize_marker_length - 32:
masters_content.append(last_master_content)
last_master_content = "MASTER:"
last_master_content += "\n{0}".format(child['id'])
if len(last_master_content) > 8:
masters_content.append(last_master_content)
masters = [encrypt_block(long_pad(master_c), rsa_priv, receiver_pubkey) for master_c in masters_content]
return masters
def fetch_block(id):
url = "https://syfr.io/data/v0/{0}".format(id)
print("Fetching block {0} from {1}.".format(id, url))
return requests.get(url).content
def tree_decrypt_block(block, priv):
"""
In parsing a tree, determine if the block is a master block.
If so, return TRUE and a list of ids.
Else it is a leaf-content block, return FALSE and whole contents.
Whole block dict assumed to be passed in. If ID passed in, try to fetch.
"""
if isinstance(block, str):
block = fetch_block(id)
contents = long_unpad(full_decrypt_block(block, priv))
if contents[0:7] == "MASTER:":
return True, contents[7:].split('\n')[1:]
else:
return False, contents
def tree_decrypt(block, priv, cached_blocks=None):
"""
Decrypts and assembles an entire block tree.
If blocks are provided, checks here for cached blocks, otherwise it fetches
on Internet.
"""
content = ""
cont = True
blocks = [block]
if isinstance(cached_blocks, list):
cb = dict([(b['id'], b) for b in cached_blocks])
cached_blocks = cb
level = 0
while cont:
new_contents = []
print("Decrypting {0} blocks at level {1}.".format(len(blocks), level))
level += 1
for b in blocks:
if cached_blocks and isinstance(b, str) and b in cached_blocks:
new_contents.append(tree_decrypt_block(cached_blocks[b], priv))
else:
new_contents.append(tree_decrypt_block(b, priv))
blocks = []
for is_master, con in new_contents:
if not is_master:
content += con
else:
blocks += con
cont = len(blocks) > 0
return content
def assemble_block_tree(contents, rsa_priv, receiver_pubkey):
content_pieces = divide_contents(contents)
print("Encrypting {0} Leaf Blocks.".format(len(content_pieces)))
leaf_blocks = [encrypt_block(c, rsa_priv, receiver_pubkey) for c in content_pieces]
blocks = copy.copy(leaf_blocks)
n = len(blocks)
new_blocks = blocks
while n > 1:
new_blocks = masters_from_children(new_blocks, rsa_priv, receiver_pubkey)
print("APPENDING {0} Masters".format(len(new_blocks)))
blocks += new_blocks
n = len(new_blocks)
return blocks
def divide_contents(contents):
subcontents = []
n = 0
while n < len(contents):
m = min(len(contents), n + DATA_BLOCK_SIZE - bitsize_marker_length)
subcontent = contents[n:m]
subcontent = long_pad(subcontent, DATA_BLOCK_SIZE)
subcontents.append(subcontent)
n += DATA_BLOCK_SIZE - bitsize_marker_length
return subcontents
def unite_contents(content_blocks):
content = ""
for n, x in enumerate(content_blocks):
content += long_unpad(x)
return content
def compute_block_hash(block_dict):
s = ""
for k in sorted(block_dict.keys()):
if k in ['id']:
continue
s += "&{0}:{1}".format(k, block_dict[k])
return hashlib.sha256(s).hexdigest()
def decompose_metadata(metadata):
sender, receiver = [x.split(':')[-1] for x in metadata.split(';')]
return sender, receiver
def recompose_metadata(sender, receiver):
# TODO remove this
return "sender_pubkey:{0};receiver_pubkey:{1}".format(sender, receiver)
def encrypt_block(content, rsa_priv, receiver_pubkey):
assert len(content) == DATA_BLOCK_SIZE
aes_ciphertext, encry_aes_key, hmac, hmac_signature, iv, metadata = \
encrypt(content, rsa_priv, receiver_pubkey)
sender, receiver = decompose_metadata(metadata)
response = {
'aes_ciphertext': aes_ciphertext,
'encry_aes_key': encry_aes_key,
'hmac': hmac,
'hmac_signature': hmac_signature,
'iv': iv,
'sender_public_key': sender,
'receiver_public_key': receiver
}
response['id'] = compute_block_hash(response)
return response
def full_decrypt_block(response, receiver_privkey):
assert compute_block_hash(response) == response['id']
return decrypt(
response['aes_ciphertext'],
response['encry_aes_key'],
response['hmac'],
response['hmac_signature'],
receiver_privkey,
response['iv'],
recompose_metadata(
response['sender_public_key'], response['receiver_public_key'])
)
def aes_decrypt_block(response, aes_key):
return

@ -0,0 +1,220 @@
import base64
import hashlib
import os
import cryptography
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives import hashes, hmac
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.padding import PKCS7
from . import loader
bitsize_marker_length = 10
def generate_rsa_key(complexity=4096):
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=complexity,
backend=default_backend()
)
return private_key
def serialize_privkey(key, password=False):
return base64.b64encode(key.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption() if not password else serialization.BestAvailableEncryption(password.encode())
))
def serialize_pubkey(pubkey):
return base64.b64encode(pubkey.public_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PublicFormat.SubjectPublicKeyInfo
))
def load_pubkey(pubkey_text):
return serialization.load_der_public_key(
base64.b64decode(pubkey_text),
backend=default_backend())
def load_privkey(privkey_text,password=None):
return serialization.load_der_private_key(
base64.b64decode(privkey_text),
password=password.encode() if password else None,
backend=default_backend()
)
def write_key(key, file_path='mykey.pem'):
with open(file_path, 'w+') as fh:
fh.write(key)
def sign(message, private_key):
signature = private_key.sign(
message,
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH),
hashes.SHA256()
)
# signer.update(message)
return base64.b64encode(signature) #signer.finalize())
def verify_signature(signature, message, pubkey):
try:
verified = pubkey.verify(
base64.b64decode(signature),
message,
padding.PSS(
mgf=padding.MGF1(hashes.SHA256()),
salt_length=padding.PSS.MAX_LENGTH
),
hashes.SHA256()
)
return True
except cryptography.exceptions.InvalidSignature as e:
print('!?',e)
return False
return None
# verifier.update(message)
# try:
# verifier.verify()
# return True
# except cryptography.exceptions.InvalidSignature:
# print("Invalid Signature")
# return False
def rsa_encrypt(message, pubkey):
ciphertext = pubkey.encrypt(
message,
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA1()), # SHA1 is suspect
algorithm=hashes.SHA1(),
label=None
)
)
return base64.b64encode(ciphertext)
def rsa_decrypt(ciphertext, key):
plaintext = key.decrypt(
base64.b64decode(ciphertext),
padding.OAEP(
mgf=padding.MGF1(algorithm=hashes.SHA1()),
algorithm=hashes.SHA1(),
label=None
)
)
return plaintext
def create_aes_key(complexity=32):
return base64.b64encode(os.urandom(complexity))
def create_hmac(key, message_list):
h = hmac.HMAC(key, hashes.SHA256(), backend=default_backend())
message = "?".join([str(x) for x in message_list])
h.update(message)
return base64.b64encode(h.finalize())
def pad(message, blocksize=128):
padder = PKCS7(blocksize).padder()
padded_data = padder.update(message)
padded_data += padder.finalize()
return padded_data
def long_pad(message, goal_length=loader.DATA_BLOCK_SIZE):
assert len(message) + bitsize_marker_length <= goal_length
c = 0
for _ in range(goal_length - len(message) - bitsize_marker_length):
message += "0"
c += 1
d = str(c).zfill(bitsize_marker_length)
message += d
return message
def unpad(padded_data, blocksize=128):
unpadder = PKCS7(blocksize).unpadder()
data = unpadder.update(padded_data)
return data + unpadder.finalize()
def long_unpad(message):
assert len(message) <= 10**bitsize_marker_length
padding_size = int(message[-bitsize_marker_length:])
return message[0:-bitsize_marker_length-padding_size]
def aes_encrypt(message, key):
iv = os.urandom(16)
cipher = Cipher(
algorithms.AES(base64.b64decode(key)),
modes.CBC(iv),
backend=default_backend()
)
message = pad(message)
encryptor = cipher.encryptor()
ciphertext = encryptor.update(message) + encryptor.finalize()
return base64.b64encode(ciphertext), base64.b64encode(iv)
def aes_decrypt(ciphertext, key, iv):
cipher = Cipher(
algorithms.AES(base64.b64decode(key)),
modes.CBC(base64.b64decode(iv)),
backend=default_backend()
)
decryptor = cipher.decryptor()
padded = decryptor.update(base64.b64decode(ciphertext)) + decryptor.finalize()
return unpad(padded)
def encrypt(message, rsa_priv, recipient_rsa_pub):
aes_key = create_aes_key()
aes_ciphertext, iv = aes_encrypt(message, aes_key)
# hmac_key = hashlib.sha256(aes_key).hexdigest()
sender_pubkey = serialize_pubkey(rsa_priv.public_key())
# recipient_rsa_pub = load_pubkey(receiver_pubkey)
#recipient_rsa_pub = receiver_pubkey
# metadata = loader.recompose_metadata(sender_pubkey, receiver_pubkey)
encry_aes_key = rsa_encrypt(aes_key, recipient_rsa_pub)
# hmac_list = [metadata, iv, aes_ciphertext, encry_aes_key]
# hmac = create_hmac(hmac_key, hmac_list)
# hmac_signature = sign(hmac, rsa_priv)
# return aes_ciphertext, encry_aes_key, hmac, hmac_signature, iv, metadata
return aes_ciphertext, encry_aes_key, iv #, sign
def decrypt(aes_ciphertext, encry_aes_key, iv, rsa_priv): #, hmac, hmac_signature, rsa_priv, iv, metadata):
aes_key = rsa_decrypt(encry_aes_key, rsa_priv)
# hmac_key = hashlib.sha256(aes_key).hexdigest()
# hmac_list = [metadata, iv, aes_ciphertext, encry_aes_key]
# independent_hmac = create_hmac(hmac_key, hmac_list)
# assert hmac == independent_hmac
# sender_pub, receiver_pub = [x.split(':')[-1] for x in metadata.split(';')]
# sender_pub = load_pubkey(sender_pub)
#assert verify_signature(hmac_signature, hmac, sender_pub)
plaintext = aes_decrypt(aes_ciphertext, aes_key, iv)
return plaintext

@ -0,0 +1,52 @@
import base64
import os
from . import syfr as crypto
def test_rsa_encrypt_and_decrypt():
priv = crypto.generate_rsa_key(complexity=512)
message = "Attack at Calais"
ciphertext = crypto.rsa_encrypt(message, priv.public_key())
assert crypto.rsa_decrypt(ciphertext, priv) == message
def test_aes_encrypt_decrypt():
priv = crypto.create_aes_key()
message = "Sell Watermelons"
ciphertext, iv = crypto.aes_encrypt(message, priv)
assert crypto.aes_decrypt(ciphertext, priv, iv) == message
def test_full_encrypt():
priv1 = crypto.generate_rsa_key()
priv2 = crypto.generate_rsa_key()
target_pubkey = crypto.serialize_pubkey(priv2.public_key())
message = "Santa is not real."
aes_ciphertext, encry_aes_key, hmac, hmac_signature, iv, metadata = \
crypto.encrypt(message, priv1, target_pubkey)
aes_key = crypto.rsa_decrypt(encry_aes_key, priv2)
assert crypto.aes_decrypt(aes_ciphertext, aes_key, iv) == message
assert message == \
crypto.decrypt(
aes_ciphertext, encry_aes_key, hmac, hmac_signature,
priv2, iv, metadata
)
def test_sign():
priv = crypto.generate_rsa_key(complexity=512)
message = "Secret wish list"
sig = crypto.sign(message, priv)
assert crypto.verify_signature(sig, message, priv.public_key())
def test_long_pad():
complexity = 10**3 # won't create exactly this length
contents = base64.b64encode(os.urandom(complexity))
padded = crypto.long_pad(contents, 3*complexity)
assert crypto.long_unpad(padded) == contents

@ -0,0 +1,44 @@
import base64
import hashlib
import math
import os
from . import syfr as crypto
from . import loader
def random_content(pseudolength):
return base64.b64encode(os.urandom(pseudolength))
def test_divide_unite_contents():
# random contents
complexity = loader.DATA_BLOCK_SIZE * 100 # won't create exactly this length
contents = random_content(complexity)
size = len(contents)
subcontents = loader.divide_contents(contents)
assert len(subcontents) == math.ceil(float(size) / float(loader.DATA_BLOCK_SIZE))
assert all([len(x) == loader.DATA_BLOCK_SIZE for x in subcontents])
united = loader.unite_contents(subcontents)
assert hashlib.sha256(united).hexdigest() == hashlib.sha256(contents).hexdigest()
def test_encrypt_block():
content = crypto.long_pad(random_content(1000))
rsa_priv = crypto.generate_rsa_key()
priv2 = crypto.generate_rsa_key()
receiver_pubkey = crypto.serialize_pubkey(priv2.public_key())
response = loader.encrypt_block(content, rsa_priv, receiver_pubkey)
assert loader.full_decrypt_block(response, priv2) == content
def test_assemble_block_tree():
contents = random_content(10**6)
rsa_priv = crypto.generate_rsa_key()
priv2 = crypto.generate_rsa_key()
receiver_pubkey = crypto.serialize_pubkey(priv2.public_key())
blocks = loader.assemble_block_tree(contents, rsa_priv, receiver_pubkey)
derived_contents = loader.tree_decrypt(blocks[-1], priv2, cached_blocks=blocks)
assert derived_contents == contents
Loading…
Cancel
Save