You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Comrad/komrade/api/api.py

519 lines
15 KiB
Python

# ### Constants
import os,sys; sys.path.append(os.path.abspath(os.path.join(os.path.abspath(os.path.join(os.path.dirname(__file__),'..')),'..')))
from komrade import *
from komrade.constants import *
LAST_N_IN_INBOX = 10
### Imports
import os,time,sys,logging
from pathlib import Path
import asyncio,time,sys
from base64 import b64encode,b64decode
sys.path.append(os.path.dirname(__file__))
import logging
import asyncio
import shelve
from collections import OrderedDict
import pickle,os
from threading import Thread
from pathlib import Path
# local imports
from komrade import *
CACHE_DIR = os.path.join(os.path.expanduser('~'),'.komrade','.cache')
if not os.path.exists(CACHE_DIR): os.makedirs(CACHE_DIR)
MEMCACHE_FNFN=os.path.join(CACHE_DIR,'.memory')
### Logging
def logger():
import logging
handler = logging.StreamHandler()
formatter = logging.Formatter('[%(asctime)s]\n%(message)s\n')
handler.setFormatter(formatter)
logger = logging.getLogger('komrade')
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
return logger
LOG = None
def log(*x):
global LOG
if not LOG: LOG=logger().debug
tolog=' '.join(str(_) for _ in x)
LOG(tolog)
## func
def bytes_from_file(filename,chunksize=8192):
with open(filename, 'rb') as f:
while True:
piece = f.read(chunksize)
if not piece:
break
yield piece
def get_random_id():
import uuid
return uuid.uuid4().hex
def get_random_binary_id():
import base64
idstr = get_random_id()
return base64.b64encode(idstr.encode())
### Headless API
def boot_lonely_selfless_node(port=8467):
async def go():
api = Api(log=log, port=port)
await api.connect_forever()
asyncio.run(go())
class NetworkStillConnectingError(OSError): pass
PORT_LISTEN = 5111
async def _getdb(self=None,port=PORT_LISTEN):
return None
from kademlia.network import Server
if self:
self.log('starting server on port %s..' % port)
import os
if self: self.log(os.getcwd())
node = Server(log=self.log if self else None) #fn='../p2p/data.db',log=(self.log if self else print)))
try:
if self: self.log('listening on port %s...' % format(port))
await node.listen(port)
except OSError:
raise NetworkStillConnectingError('Still connecting...')
#await asyncio.sleep(3)
if self: self.log('bootstrapping server..')
await node.bootstrap(NODES_PRIME)
if self: node.log = self.log
self.log('NODE:',node)
# if self and self.app:
# self.app.close_dialog()
return node
def logg(*x):
print(*x)
class Api(object):
def __init__(self,log=None,port=PORT_LISTEN):
self.log = log if log is not None else logg
self.port=port
# load file-based keys
self.load_keys()
async def connect_forever(self,save_every=60):
try:
i = 0
self._node = await self.connect()
while True:
if not i%90: self.log(f'Node status (tick {i}): {self._node}')
if i and not i%save_every: await self.flush()
i += 1
await asyncio.sleep(1)
# asyncio.sleep(0)
except (asyncio.CancelledError,KeyboardInterrupt) as e:
self.log('P2P node cancelled', e)
await self.flush()
finally:
# when canceled, print that it finished
self.log('P2P node shutting down')
pass
@property
async def node(self):
if not hasattr(self,'_node'):
await self.connect()
self._node.log=self.log
return self._node
async def connect(self):
port=self.port
# if self.app: self.app.open_dialog('hello?')
self.log('connecting on port %s...' % port)
node = await _getdb(self,port)
self.log(f'connect() has node {node}')
self._node = node
return node
def get_tor_python_session(self):
# from torpy.http.requests import TorRequests
# with TorRequests() as tor_requests:
# with tor_requests.get_session() as s:
# # return s
# from torpy.http.requests import tor_requests_session
# with tor_requests_session() as s: # returns requests.Session() object
# return s
pass
def get_tor_proxy_session(self):
session = requests.session()
# Tor uses the 9050 port as the default socks port
session.proxies = {'http': 'socks5://127.0.0.1:9050',
'https': 'socks5://127.0.0.1:9050'}
return session
def get_async_tor_proxy_session(self):
from requests_futures.sessions import FuturesSession
session = FuturesSession()
# Tor uses the 9050 port as the default socks port
session.proxies = {'http': 'socks5://127.0.0.1:9050',
'https': 'socks5://127.0.0.1:9050'}
return session
def tor_request(self,url,method='get',data=None):
stopfixthis
with self.get_tor_proxy_session() as s:
if method=='get':
return s.get(url)
elif method=='post':
self.log('data',data)
return s.post(url,data=data)
def request(self,Q,**kwargs):
self.log('request() Q:',Q)
res = self.tor_request(Q,**kwargs)
self.log('reqeust() <-',res)
return res
#@property
def load_keys(self):
# get key names
KEY_PATH_PUB ='.'
KEY_PATH_PRIV ='.'
pub_key_names = [x.split('.')[1] for x in os.listdir(KEY_PATH_PUB) if x.count('.')==2 and x.endswith('.loc')]
priv_key_names = [x.split('.')[1] for x in os.listdir(KEY_PATH_PRIV) if x.count('.')==2 and x.endswith('.key')]
key_names = set(pub_key_names)|set(priv_key_names)
self.log('get_keys() found public key names:',pub_key_names)
self.log('get_keys() found private key names:',priv_key_names)
# load and find all local
self._keys = {}
for key_name in key_names:
self.log('key_name =',key_name)
self._keys[key_name] = Komrade(key_name,api=self,create_if_missing=False)
# break into types
self.accounts = [self._keys[name] for name in priv_key_names]
self.contacts = [self._keys[name] for name in pub_key_names]
self.log('get_keys() loaded accounts:',self.accounts)
self.log('get_keys() loaded contacts:',self.contacts)
@property
def keys(self):
if not hasattr(self,'_keys'): self.load_keys()
return self._keys
#async
def personate(self,persona_name,create_if_missing=True):
komrade = Komrade(persona_name,api=self,create_if_missing=create_if_missing)
res = persona.boot()
self.log('personate() res =',res)
return persona
# komrade = self.keys[persona_name] if persona_name in self.keys else None
# if komrade is None and create_if_missing:
# self.keys[persona_name] = komrade = Komrade(persona_name, api=self, create_if_missing=create_if_missing)
# res = await persona.boot()
# self.log('BOOT RESULT:',res)
# return persona
async def upload(self,filename,file_id=None, uri='/file/',uri_part='/part/'):
import sys
if not file_id: file_id = get_random_id()
part_ids = []
part_keys = []
parts=[]
PARTS=[]
buffer_size=100
for part in bytes_from_file(filename,chunksize=1024*2):
part_id = get_random_id()
part_ids.append(part_id)
part_key='/part/'+part_id
part_keys.append(part_key)
parts.append(part)
# PARTS.append(part)
# self.log('part!:',sys.getsizeof(part))
#self.set(part_key,part)
if len(parts)>=buffer_size:
# self.log('setting...')
await self.set(part_keys,parts)
part_keys=[]
PARTS+=parts
parts=[]
# set all parts
#self.set(part_keys,PARTS)
# self.log('# parts:',len(PARTS))
if parts and part_keys:
await self.set(part_keys, parts)
# how many parts?
# self.log('# pieces!',len(part_ids))
file_store = {'ext':os.path.splitext(filename)[-1][1:], 'parts':part_ids}
# self.log('FILE STORE??',file_store)
await self.set_json(uri+file_id,file_store)
# file_store['data'].seek(0)
file_store['id']=file_id
return file_store
async def download(self,file_id):
self.log('file_id =',file_id)
file_store = await self.get_json_val('/file/'+file_id)
self.log('file_store =',file_store)
if file_store is None: return
self.log('file_store!?',file_store)
keys = ['/part/'+x for x in file_store['parts']]
#time,pieces,pub,sign = await self.get_json_val(keys)
pieces = await self.get_json_val(keys)
self.log('pieces = ',pieces)
file_store['parts_data']=pieces
return file_store
async def flush(self):
#self.log('saving back to db file...')
node = await self.node
node.storage.dump()
# self.log('DONE saving back to db file...')
async def get_posts(self,uri='/inbox/world'):
# get IDs
post_ids = await self.get_post_ids(uri)
# get posts
posts = [self.get_post(post_id) for post_id in post_ids]
return await asyncio.gather(*posts)
async def read_inbox(self,uri_inbox=None):
if uri_inbox is None: uri_inbox = P2P_PREFIX_INBOX+self.name.encode()
node = await self.node
inbox_ids = await node.get(uri_inbox)
if inbox_ids is not None:
inbox_ids = inbox_ids.split(BSEP)
self.log('found inbox IDs:',inbox_ids)
msgs_toread = [self.read_msg(msg_id) for msg_id in inbox_ids]
msgs = await asyncio.gather(*msgs_toread)
self.log('read_inbox() msgs = ',msgs)
return msgs
return []
async def read_outbox(self,uri_outbox=None):
if uri_outbox is None: uri_outbox = P2P_PREFIX_OUTBOX+self.name.encode()
return await self.read_inbox(uri_outbox)
async def read_msg(self,msg_id):
self.log(f'Persona.read_msg({msg_id}) ?')
uri_msg=P2P_PREFIX_POST+msg_id
node = await self.node
komrade = await self.komrade
res = await node.get(uri_msg)
self.log('res = ',res)
if res is not None:
double_encrypted_payload_b64 = res
single_encrypted_payload = self.decrypt(double_encrypted_payload_b64, komrade.pubkey_b64)
self.log('GOT ENRYPTED PAYLOAD:',single_encrypted_payload)
signed_encrypted_payload_b64,from_pubkey_b64,name_b64,time_b64 = single_encrypted_payload.split(BSEP)
self.log('signed_encrypted_payload =',signed_encrypted_payload_b64)
self.log('from_pubkey_b64 =',from_pubkey_b64)
self.log('time_b64 =',time_b64)
from_name = b64decode(name_b64).decode()
self.log('from_name =',from_name)
timestamp = b64decode(time_b64).decode()
tmpP = Komrade(from_name)
await tmpP.boot()
from_pubkey_b64_accto_name = tmpP.pubkey_b64
assert from_pubkey_b64==from_pubkey_b64_accto_name
encrypted_payload_b64 = self.verify(signed_encrypted_payload_b64, from_pubkey_b64)
self.log('encrypted_payload_b64 =',encrypted_payload_b64)
payload = self.decrypt(encrypted_payload_b64, from_pubkey_b64)
self.log('payload =',payload)
return {
'success':True,
'content':payload,
'from_name':from_name,
'from_pubkey_b64':from_pubkey_b64,
'timestamp':timestamp
}
return {'error':'Unknown'}
async def refresh_inboxes(self):
uris_to_get=[]
for komrade in self.accounts:
inbox = await persona.load_inbox(decrypt_msg_uri=True, last=LAST_N_IN_INBOX)
for decr_msg_uri in inbox:
uris_to_get.append(self.get_msg(decr_msg_uri))
# uris_to_get+=inbox
res = await asyncio.gather(*uris_to_get)
for decr_msg_uri,encr_msg in res:
self.memcache[decr_msg_uri]=encr_msg
self.memcache_save()
@property
def memcache(self):
if not hasattr(self,'_memcache'):
self._memcache = OrderedDict()
if os.path.exists(MEMCACHE_FNFN):
import pickle
try:
self._memcache = pickle.load(open(MEMCACHE_FNFN,'rb'))
except EOFError:
pass
return self._memcache
def memcache_save(self):
import pickle
with open(MEMCACHE_FNFN,'wb') as of:
pickle.dump(self.memcache, of)
self.log('>> saved:',MEMCACHE_FNFN)
async def get_msg(self,decr_msg_uri):
self.log('get_msg()',decr_msg_uri)
rval=self.memcache.get(decr_msg_uri)
self.log('got <--',rval)
if rval is not None:
self.log('in memcache')
encr_msg = rval
else:
self.log('>> downloading',decr_msg_uri,'...')
node = await self.node
encr_msg = await node.get(decr_msg_uri)
self.log('downloaded:',encr_msg)
return (decr_msg_uri,encr_msg)
#self.memcache.
async def see(self,decr_msg_id):
res=await self.get(decr_msg_id)
self.log('see() saw',res)
return decr_msg_id
async def test1():
api = Api()
marx=await api.personate('marx')
elon=await api.personate('elon')
res = await marx.send(b'secret',to=elon)
print(marx,elon,res)
async def test():
api = Api()
# message?
marx=await api.personate('marx')
elon=await api.personate('elon')
res = await marx.send(b'secret',to=elon)
res = await elon.send(b'secret back',to=marx)
# print(marx,elon,res)
# get overall inbox
meta_inbox = await api.refresh_inboxes()
api.log('meta_inbox',meta_inbox)
keys = api.memcache.keys()
api.log('ALL KEYS =',keys)
for key in keys:
val = api.memcache.get(key)
api.log(key,'-->',val)
# stop
#async
async def test_keyserver():
api = Api()
marx = api.personate('marx')
elon = api.personate('elon')
zuck = api.personate('zuck')
print('marx',marx.pubkey_b64)
print('elon',elon.pubkey_b64)
print('zuck',zuck.pubkey_b64)
#marx = await api.personate(marx)
#res = await api.get_externally_signed_pubkey('marx')
#res = await api.get_externally_signed_pubkey('marx')
#return res
await elon.send(b'oh no',to=marx)
if __name__=='__main__':
asyncio.run(test_keyserver())