macdev
quadrismegistus 4 years ago
parent f01b98a0e4
commit b335e4eb37

@ -52,43 +52,28 @@ class Api(object):
def connect(self):
self.log('connecting...')
#loop=asyncio.get_event_loop()
async def _getdb():
from .kad import KadServer
self.log('starting server..')
node = KadServer() #storage=HalfForgetfulStorage())
self.log('listening..')
await node.listen(PORT_LISTEN)
self.log('bootstrapping server..')
await node.bootstrap(NODES_PRIME)
return node
async def _connect():
self._node0 = node = await _getdb() #await loop.create_task(_getdb())
# self.log('!!!',type(self._node))
#await node
#self.node = node
return node
return await _getdb()
# return asyncio.run(_connect())
# loop.set_debug(True)
# self.log('loop???',loop)
return asyncio.run(_connect())
def get(self,key_or_keys):
from .kad import KadServer
# loop=asyncio.get_event_loop()
# asyncio.set_event_loop(loop)
async def _get():
try:
await self.node
except TypeError:
pass
self.log('wtf??',self.node)
# self.log('async _get()',self.node)
node = self.node
# node=self.node
@ -120,21 +105,14 @@ class Api(object):
return None if res is None else json.loads(res)
def set(self,key_or_keys,value_or_values):
# self.log('hello?')
# loop=asyncio.get_event_loop()
async def _set():
try:
await self.node
except TypeError:
pass
# self.log('async _set()',self.node)
node=self.node
if type(key_or_keys) in {list,tuple,dict}:
keys = key_or_keys
values = value_or_values
self.log(len(keys),len(values))
self.log('# keys and values?',len(keys),len(values))
assert len(keys)==len(values)
res = await asyncio.gather(*[node.set(key,value) for key,value in zip(keys,values)])
# self.log('RES?',res)
@ -302,14 +280,14 @@ class Api(object):
def post(self,data):
post_id=get_random_id()
res = self.set_json('/post/'+post_id, data)
self.log('got data:',data)
self.log('Api.post() got data back from set_json():',res)
## add to channels
self.append_json('/posts/channel/earth', post_id)
# ## add to channels
# self.append_json('/posts/channel/earth', post_id)
## add to user
un=data.get('author')
if un: self.append_json('/posts/author/'+un, post_id)
# ## add to user
# un=data.get('author')
# if un: self.append_json('/posts/author/'+un, post_id)
if res:
return {'success':'Posted! %s' % post_id, 'post_id':post_id}

@ -8,6 +8,13 @@ from kademlia.routing import RoutingTable
from rpcudp.protocol import RPCProtocol
import os
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
log = logging.getLogger('kademlia')
log.addHandler(handler)
log.setLevel(logging.DEBUG)
PROXY_ADDR = ('0.0.0.0',8368)
@ -43,7 +50,7 @@ class HalfForgetfulStorage(ForgetfulStorage):
def get(self, key, default=None):
# self.cull()
print('looking for key: ', key.decode())
print('looking for key: ', key)
if key in self.data:
val=self[key]
print('...found it! = %s' % val)
@ -138,7 +145,7 @@ class KadProtocol(KademliaProtocol):
# REMOTES_D={}
def __init__(self, source_node, storage, ksize):
RPCProtocol.__init__(self,wait_timeout=5)
RPCProtocol.__init__(self,wait_timeout=15)
self.router = RoutingTable(self, ksize, source_node)
self.storage = storage
self.source_node = source_node
@ -173,6 +180,59 @@ class KadProtocol(KademliaProtocol):
class KadServer(Server):
protocol_class = KadProtocol # KadProtocol #KademliaProtocol
def __repr__(self):
repr = f"""
KadServer()
ksize = {self.ksize}
alpha = {self.alpha}
storage = {self.storage}
node = {self.node}
transport = {self.transport}
protocol = {self.protocol}
refresh_loop = {self.refresh_loop}
save_state_loop = {self.save_state_loop}
bootstrappable_neighbors = {self.bootstrappable_neighbors()}
"""
return repr
async def get(self, key):
"""
Get a key if the network has it.
Returns:
:class:`None` if not found, the value otherwise.
"""
log.info("Looking up key %s", key)
dkey = digest(key)
# if this node has it, return it
if self.storage.get(dkey) is not None:
log.info('I already have this')
return self.storage.get(dkey)
node = Node(dkey)
nearest = self.protocol.router.find_neighbors(node)
log.info(f'My nearest nodes are: {nearest}')
if not nearest:
log.warning("There are no known neighbors to get key %s", key)
return None
spider = ValueSpiderCrawl(self.protocol, node, nearest,
self.ksize, self.alpha)
await spider.find()
log.info(f'spider done crawling: {spider}')
async def set(self, key, value):
"""
Set the given string key to the given value in the network.
"""
if not check_dht_value_type(value):
raise TypeError(
"Value must be of type int, float, bool, str, or bytes"
)
log.info("setting '%s' = '%s' on network", key, value)
dkey = digest(key)
return await self.set_digest(dkey, value)
async def set_digest(self, dkey, value):
"""
@ -185,7 +245,7 @@ class KadServer(Server):
if not nearest:
log.warning("There are no known neighbors to set key %s",
dkey.hex())
return False
#return False
spider = NodeSpiderCrawl(self.protocol, node, nearest,
self.ksize, self.alpha)
@ -195,10 +255,8 @@ class KadServer(Server):
# if this node is close too, then store here as well
neighbs=[n.distance_to(node) for n in nodes]
biggest = max(neighbs) if neighbs else 0
if self.node.distance_to(node) < biggest:
self.storage[dkey] = value
#if self.node.distance_to(node) < biggest:
self.storage[dkey] = value
results = [self.protocol.call_store(n, dkey, value) for n in nodes]
# return true only if at least one store call succeeded
return any(await asyncio.gather(*results))
return any(await asyncio.gather(*results))
Loading…
Cancel
Save