macdev
quadrismegistus 4 years ago
parent e9b86c4846
commit 868976ce66

@ -47,12 +47,13 @@ def start_selfless_thread():
return asyncio.run(_go())
async def _getdb(self=None,port=PORT_LISTEN):
from kademlia.network import Server
if self: self.log('starting server..')
import os
if self: self.log(os.getcwd())
node = KadServer(storage=HalfForgetfulStorage()) #fn='../p2p/data.db',log=(self.log if self else print)))
node = Server() #fn='../p2p/data.db',log=(self.log if self else print)))
if self: self.log('listening..')
await node.listen(port)

@ -1,255 +1,255 @@
###
# Kademlia patches
###
# ###
# # Kademlia patches
# ###
from kademlia.storage import *
from kademlia.network import *
from kademlia.routing import RoutingTable
from rpcudp.protocol import RPCProtocol
import os
# from kademlia.storage import *
# from kademlia.network import *
# from kademlia.routing import RoutingTable
# from rpcudp.protocol import RPCProtocol
# import os
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
log = logging.getLogger('kademlia')
log.addHandler(handler)
log.setLevel(logging.DEBUG)
# handler = logging.StreamHandler()
# formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
# handler.setFormatter(formatter)
# log = logging.getLogger('kademlia')
# log.addHandler(handler)
# log.setLevel(logging.DEBUG)
PROXY_ADDR = ('0.0.0.0',8368)
# PROXY_ADDR = ('0.0.0.0',8368)
"""UDP proxy server."""
# """UDP proxy server."""
import asyncio
# import asyncio
class ProxyDatagramProtocol(asyncio.DatagramProtocol):
# class ProxyDatagramProtocol(asyncio.DatagramProtocol):
def __init__(self, remote_address=PROXY_ADDR):
self.remote_address = remote_address
self.remotes_d = {}
super().__init__()
# def __init__(self, remote_address=PROXY_ADDR):
# self.remote_address = remote_address
# self.remotes_d = {}
# super().__init__()
def connection_made(self, transport):
self.transport = transport
# def connection_made(self, transport):
# self.transport = transport
def datagram_received(self, data, addr):
if addr in self.remotes_d:
self.remotes_d[addr].transport.sendto(data)
return
loop = asyncio.get_event_loop()
self.remotes_d[addr] = RemoteDatagramProtocol(self, addr, data)
coro = loop.create_datagram_endpoint(
lambda: self.remotes_d[addr], remote_addr=self.remote_address)
asyncio.ensure_future(coro)
# def datagram_received(self, data, addr):
# if addr in self.remotes_d:
# self.remotes_d[addr].transport.sendto(data)
# return
# loop = asyncio.get_event_loop()
# self.remotes_d[addr] = RemoteDatagramProtocol(self, addr, data)
# coro = loop.create_datagram_endpoint(
# lambda: self.remotes_d[addr], remote_addr=self.remote_address)
# asyncio.ensure_future(coro)
class RemoteDatagramProtocol(asyncio.DatagramProtocol):
# class RemoteDatagramProtocol(asyncio.DatagramProtocol):
def __init__(self, proxy, addr, data):
print('RemoteDP got:',proxy,addr,data)
self.proxy = proxy
self.addr = addr
self.data = data
super().__init__()
def connection_made(self, transport):
self.transport = transport
self.transport.sendto(self.data)
# def __init__(self, proxy, addr, data):
# print('RemoteDP got:',proxy,addr,data)
# self.proxy = proxy
# self.addr = addr
# self.data = data
# super().__init__()
# def connection_made(self, transport):
# self.transport = transport
# self.transport.sendto(self.data)
def datagram_received(self, data, _):
self.proxy.transport.sendto(data, self.addr)
# def datagram_received(self, data, _):
# self.proxy.transport.sendto(data, self.addr)
def connection_lost(self, exc):
self.proxy.remotes.pop(self.attr)
# def connection_lost(self, exc):
# self.proxy.remotes.pop(self.attr)
async def start_datagram_proxy(protocol_class, bind, port, remote_host, remote_port):
loop = asyncio.get_event_loop()
protocol = protocol_class((remote_host, remote_port))
return await loop.create_datagram_endpoint(
lambda: protocol, local_addr=(bind, port))
def main(bind='0.0.0.0', port=8888,
remote_host='0.0.0.0', remote_port=9999):
loop = asyncio.get_event_loop()
print("Starting datagram proxy...")
coro = start_datagram_proxy(bind, port, remote_host, remote_port)
transport, _ = loop.run_until_complete(coro)
print("Datagram proxy is running...")
try:
loop.run_forever()
except KeyboardInterrupt:
pass
print("Closing transport...")
transport.close()
loop.close()
log = logging.getLogger('kademlia') # pylint: disable=invalid-name
class KadProtocol(KademliaProtocol):
# remote_address = PROXY_ADDR
# REMOTES_D={}
# def __init__(self, source_node, storage, ksize):
# RPCProtocol.__init__(self,wait_timeout=5)
# self.router = RoutingTable(self, ksize, source_node)
# self.storage = storage
# self.source_node = source_node
# def datagram_received(self, data, addr):
# #if not hasattr(self,'remotes_d'): self.remotes_d={}
# # print('\n\n!?!?!?',self.REMOTES_D, type(self.REMOTES_D))
# # if addr in self.REMOTES_D:
# # self.REMOTES_D[addr].transport.sendto(data)
# # return
# loop = asyncio.get_event_loop()
# # self.REMOTES_D[addr] = RemoteDatagramProtocol(self, addr, data)
# RDP = RemoteDatagramProtocol(self, addr, data)
# coro = loop.create_datagram_endpoint(lambda: RDP, remote_addr=self.remote_address)
# asyncio.ensure_future(coro)
def handle_call_response(self, result, node):
"""
If we get a response, add the node to the routing table. If
we get no response, make sure it's removed from the routing table.
"""
if not result[0]:
log.warning("no response from %s, ?removing from router", node)
self.router.remove_contact(node)
return result
log.info("got successful response from %s", node)
self.welcome_if_new(node)
return result
class KadServer(Server):
protocol_class = KademliaProtocol # KadProtocol #KademliaProtocol
# def __init__(self, *x, **y):
# self.storage = y['storage']
# # raise Exception(str(self.storage))
# super().__init__(*x,**y)
# log.info(f'Storage has {len(self.storage.data)} keys')
def __repr__(self):
repr = f"""
KadServer()
ksize = {self.ksize}
alpha = {self.alpha}
storage = {len(self.storage.data)} keys
node = {self.node}
transport = {self.transport}
protocol = {self.protocol}
refresh_loop = {self.refresh_loop}
save_state_loop = {self.save_state_loop}
bootstrappable_neighbors = {self.bootstrappable_neighbors()}
"""
return repr
# async def get(self, key):
# """
# Get a key if the network has it.
# Returns:
# :class:`None` if not found, the value otherwise.
# """
# log.info("Looking up key %s", key)
# dkey = digest(key)
# # if this node has it, return it
# if self.storage.get(dkey) is not None:
# log.info('I already have this')
# return self.storage.get(dkey)
# node = Node(dkey)
# nearest = self.protocol.router.find_neighbors(node)
# log.info(f'My nearest nodes are: {nearest}')
# if not nearest:
# log.warning("There are no known neighbors to get key %s", key)
# return None
# spider = ValueSpiderCrawl(self.protocol, node, nearest,
# self.ksize, self.alpha)
# found = await spider.find()
# log.info(f'spider done crawling: {spider}')
# log.info(f'spider found value: {found}')
# self.storage[dkey]=found
# return found
# async def set(self, key, value):
# """
# Set the given string key to the given value in the network.
# """
# if not check_dht_value_type(value):
# raise TypeError(
# "Value must be of type int, float, bool, str, or bytes"
# )
# log.info("setting '%s' = '%s' on network", key, value)
# dkey = digest(key)
# print('STORE??',type(self.storage),self.storage)
# self.storage.set(dkey,value)
# return await self.set_digest(dkey, value)
# async def set_digest(self, dkey, value):
# """
# Set the given SHA1 digest key (bytes) to the given value in the
# network.
# """
# node = Node(dkey)
# nearest = self.protocol.router.find_neighbors(node)
# if not nearest:
# log.warning("There are no known neighbors to set key %s",
# dkey.hex())
# #return False
# spider = NodeSpiderCrawl(self.protocol, node, nearest,
# self.ksize, self.alpha)
# nodes = await spider.find()
# log.info("setting '%s' on %s", dkey.hex(), list(map(str, nodes)))
# # if this node is close too, then store here as well
# neighbs=[n.distance_to(node) for n in nodes]
# log.info('setting on %s neighbors', neighbs)
# biggest = max(neighbs) if neighbs else 0
# log.info('my distance to node is %s, biggest distance is %s',
# self.node.distance_to(node),biggest)
# if self.node.distance_to(node) < biggest:
# self.storage.set(dkey,value)
# async def start_datagram_proxy(protocol_class, bind, port, remote_host, remote_port):
# loop = asyncio.get_event_loop()
# protocol = protocol_class((remote_host, remote_port))
# return await loop.create_datagram_endpoint(
# lambda: protocol, local_addr=(bind, port))
# def main(bind='0.0.0.0', port=8888,
# remote_host='0.0.0.0', remote_port=9999):
# loop = asyncio.get_event_loop()
# print("Starting datagram proxy...")
# coro = start_datagram_proxy(bind, port, remote_host, remote_port)
# transport, _ = loop.run_until_complete(coro)
# print("Datagram proxy is running...")
# try:
# loop.run_forever()
# except KeyboardInterrupt:
# pass
# print("Closing transport...")
# transport.close()
# loop.close()
# log = logging.getLogger('kademlia') # pylint: disable=invalid-name
# class KadProtocol(KademliaProtocol):
# # remote_address = PROXY_ADDR
# # REMOTES_D={}
# # def __init__(self, source_node, storage, ksize):
# # RPCProtocol.__init__(self,wait_timeout=5)
# # self.router = RoutingTable(self, ksize, source_node)
# # self.storage = storage
# # self.source_node = source_node
# # def datagram_received(self, data, addr):
# # #if not hasattr(self,'remotes_d'): self.remotes_d={}
# # # print('\n\n!?!?!?',self.REMOTES_D, type(self.REMOTES_D))
# # # if addr in self.REMOTES_D:
# # # self.REMOTES_D[addr].transport.sendto(data)
# # # return
# # loop = asyncio.get_event_loop()
# # # self.REMOTES_D[addr] = RemoteDatagramProtocol(self, addr, data)
# # RDP = RemoteDatagramProtocol(self, addr, data)
# # coro = loop.create_datagram_endpoint(lambda: RDP, remote_addr=self.remote_address)
# # asyncio.ensure_future(coro)
# def handle_call_response(self, result, node):
# """
# If we get a response, add the node to the routing table. If
# we get no response, make sure it's removed from the routing table.
# """
# if not result[0]:
# log.warning("no response from %s, ?removing from router", node)
# self.router.remove_contact(node)
# return result
# log.info("got successful response from %s", node)
# self.welcome_if_new(node)
# return result
# class KadServer(Server):
# protocol_class = KademliaProtocol # KadProtocol #KademliaProtocol
# # def __init__(self, *x, **y):
# # self.storage = y['storage']
# # # raise Exception(str(self.storage))
# # super().__init__(*x,**y)
# # log.info(f'Storage has {len(self.storage.data)} keys')
# def __repr__(self):
# repr = f"""
# KadServer()
# ksize = {self.ksize}
# alpha = {self.alpha}
# storage = {len(self.storage.data)} keys
# node = {self.node}
# transport = {self.transport}
# protocol = {self.protocol}
# refresh_loop = {self.refresh_loop}
# save_state_loop = {self.save_state_loop}
# bootstrappable_neighbors = {self.bootstrappable_neighbors()}
# """
# return repr
# # async def get(self, key):
# # """
# # Get a key if the network has it.
# # Returns:
# # :class:`None` if not found, the value otherwise.
# # """
# # log.info("Looking up key %s", key)
# # dkey = digest(key)
# # # if this node has it, return it
# # if self.storage.get(dkey) is not None:
# # log.info('I already have this')
# # return self.storage.get(dkey)
# # node = Node(dkey)
# # nearest = self.protocol.router.find_neighbors(node)
# # log.info(f'My nearest nodes are: {nearest}')
# # if not nearest:
# # log.warning("There are no known neighbors to get key %s", key)
# # return None
# # spider = ValueSpiderCrawl(self.protocol, node, nearest,
# # self.ksize, self.alpha)
# # found = await spider.find()
# # log.info(f'spider done crawling: {spider}')
# # log.info(f'spider found value: {found}')
# # self.storage[dkey]=found
# # return found
# # async def set(self, key, value):
# # """
# # Set the given string key to the given value in the network.
# # """
# # if not check_dht_value_type(value):
# # raise TypeError(
# # "Value must be of type int, float, bool, str, or bytes"
# # )
# # log.info("setting '%s' = '%s' on network", key, value)
# # dkey = digest(key)
# # print('STORE??',type(self.storage),self.storage)
# # self.storage.set(dkey,value)
# # return await self.set_digest(dkey, value)
# # async def set_digest(self, dkey, value):
# # """
# # Set the given SHA1 digest key (bytes) to the given value in the
# # network.
# # """
# # node = Node(dkey)
# # nearest = self.protocol.router.find_neighbors(node)
# # if not nearest:
# # log.warning("There are no known neighbors to set key %s",
# # dkey.hex())
# # #return False
# # spider = NodeSpiderCrawl(self.protocol, node, nearest,
# # self.ksize, self.alpha)
# # nodes = await spider.find()
# # log.info("setting '%s' on %s", dkey.hex(), list(map(str, nodes)))
# # # if this node is close too, then store here as well
# # neighbs=[n.distance_to(node) for n in nodes]
# # log.info('setting on %s neighbors', neighbs)
# # biggest = max(neighbs) if neighbs else 0
# # log.info('my distance to node is %s, biggest distance is %s',
# # self.node.distance_to(node),biggest)
# # if self.node.distance_to(node) < biggest:
# # self.storage.set(dkey,value)
# log.info('here are the nodes %s' % nodes)
# results = [self.protocol.call_store(n, dkey, value) for n in nodes]
# log.info('here are the results')
# # log.info('here are the nodes %s' % nodes)
# # results = [self.protocol.call_store(n, dkey, value) for n in nodes]
# # log.info('here are the results')
# # return true only if at least one store call succeeded
# return any(await asyncio.gather(*results))
# # # return true only if at least one store call succeeded
# # return any(await asyncio.gather(*results))
#### NEVERMIND
# KadServer = Server
# #### NEVERMIND
# # KadServer = Server
import time
if __name__=='__main__':
# test
hfs = HalfForgetfulStorage(fn='test.db')
# import time
# if __name__=='__main__':
# # test
# hfs = HalfForgetfulStorage(fn='test.db')
#hfs['a']=1
# time.sleep(2)
hfs['a']=1000
# #hfs['a']=1
# # time.sleep(2)
# hfs['a']=1000
print(hfs['a'])
# print(hfs['a'])
print(hfs['a'])
# print(hfs['a'])

@ -46,6 +46,22 @@ class Server:
self.save_state_loop = None
def __repr__(self):
repr = f"""
kademlia.network.Server status:
ksize = {self.ksize}
alpha = {self.alpha}
storage = {self.storage}
node = {self.node}
transport = {self.transport}
protocol = {self.protocol}
refresh_loop = {self.refresh_loop}
save_state_loop = {self.save_state_loop}
bootstrappable_neighbors = {self.bootstrappable_neighbors()}
"""
return repr
def stop(self):
if self.transport is not None:
@ -165,14 +181,20 @@ class Server:
"Value must be of type int, float, bool, str, or bytes"
)
log.info("setting '%s' = '%s' on network", key, value)
dkey = digest(key)
return await self.set_digest(dkey, value)
async def set_digest(self, dkey, value):
#dkey = digest(key)
return await self.set_digest(key, value)
async def set_digest(self, key, value):
"""
Set the given SHA1 digest key (bytes) to the given value in the
network.
"""
dkey=digest(key)
node = Node(dkey)
nearest = self.protocol.router.find_neighbors(node)
@ -189,7 +211,10 @@ class Server:
# if this node is close too, then store here as well
biggest = max([n.distance_to(node) for n in nodes])
if self.node.distance_to(node) < biggest:
self.storage[dkey] = value
#self.storage[dkey] = value
## IMPOSSIBLE STORING UNDIGESTED IN LOCAL STORAGE FOR NOW @DEBUG @HACK
#self.storage.data_debug[key]=value
self.storage.set(dkey,value,undigested_too=key)
results = [self.protocol.call_store(n, dkey, value) for n in nodes]
# return true only if at least one store call succeeded
return any(await asyncio.gather(*results))

@ -4,7 +4,7 @@ import operator
from collections import OrderedDict
from abc import abstractmethod, ABC
import asyncio
from kademlia.utils import digest
BSEP_ST = b'||||'
import base64,json
@ -119,7 +119,10 @@ class HalfForgetfulStorage(IStorage):
self.fn = fn
self.ttl = ttl
self.log = logger.info
self.data = OrderedDict() if not os.path.exists(self.fn) else self.load()
self.data_root = {} if not os.path.exists(self.fn) else self.load()
for x in ['_digest','_plain']: self.data_root[x]=OrderedDict()
self.data = self.data_root['_digest']
self.data_plain = self.data_root['_plain']
# import pickledb
# self.data = pickledb.load(self.fn,auto_dump=True)
@ -127,9 +130,14 @@ class HalfForgetfulStorage(IStorage):
#self.data = shelve.open(self.fn,flag='cs')
def dump(self):
def dump(self,show_keys=100):
async def do():
self.log('[async!!] dumping %s keys...' % len(self.keys()))
msg='[async!!] dumping %s keys...' % len(self.keys())
if show_keys:
keystr=list(sorted(self.keys()))[:show_keys]
if keystr:
msg+='\n'+', '.join(keystr)
with open(self.fn,'wb') as of:
pickle.dump(self.data, of)
asyncio.create_task(do())
@ -146,16 +154,22 @@ class HalfForgetfulStorage(IStorage):
def items(self): return [(k,v) for k,v in zip(self.keys(),self.values())]
def values(self): return [self.data[k] for k in self.keys()]
def set(self,key,value):
def set(self,dkey,value,undigested_too=None):
# log(f'HFS.set({key}) -> {value}')
newval = (time.monotonic(), value)
# store
if key in self.data:
del self.data[key]
newval = (time.monotonic(), value)
self.data[key] = newval
# self.data.set(key,newval)
if dkey in self.data:
del self.data[dkey]
self.data[dkey]=newval
if undigested_too:
key=undigested_too
if key in self.data:
del self.data[key]
self.data_plain[key]=newval
# save and prune
# self.dump()
@ -185,9 +199,16 @@ class HalfForgetfulStorage(IStorage):
#self.cull()
return self.get(key)
def __repr__(self):
def __repr__(self,lim_eg=20):
#self.cull()
return repr(self.data)
#return repr(self.data)
eg = list(sorted(self.data_plain.keys()))[:lim_eg]
msg=f"""HalfForgetfulStorage()
# digested keys = {len(self.data)}
# undigested keys = {len(self.data_plain)}
e.g., {eg}
"""
return msg
def iter_older_than(self, seconds_old):
min_birthday = time.monotonic() - seconds_old

@ -15,6 +15,7 @@ async def gather_dict(dic):
def digest(string):
if not isinstance(string, bytes):
string = str(string).encode('utf8')
return hashlib.sha1(string).digest()

Loading…
Cancel
Save