From c7fc80e65ae37943d776feb9062317cc0c42969b Mon Sep 17 00:00:00 2001 From: quadrismegistus Date: Tue, 18 Aug 2020 13:08:06 +0100 Subject: [PATCH] abandoning udp proxying for now --- p2p/api.py | 8 ++--- p2p/p2p.py | 56 ++++++++++++++++++++++++++++++++- p2p/run.sh | 2 +- p2p/udp2tcp.py | 80 ++++++++++++++++++++++++++++++++++++++++++++++++ p2p/udp_proxy.py | 43 -------------------------- requirements.txt | 15 --------- 6 files changed, 140 insertions(+), 64 deletions(-) create mode 100644 p2p/udp2tcp.py delete mode 100644 p2p/udp_proxy.py diff --git a/p2p/api.py b/p2p/api.py index ecf5e38..e97e786 100644 --- a/p2p/api.py +++ b/p2p/api.py @@ -9,7 +9,6 @@ # from werkzeug.utils import secure_filename import os,time from pathlib import Path -from flask_api import status import asyncio from .crypto import * from main import log @@ -63,7 +62,9 @@ class Api(object): node.stop() return res - return asyncio.run(_get()) + loop = asyncio.get_event_loop() + return loop.run_until_complete(_get) + # return asyncio.run(_get()) def get_json(self,key_or_keys): @@ -131,8 +132,7 @@ class Api(object): if not (name and passkey): error('name and passkey not set') - return {'error':'Register failed'},status.HTTP_400_BAD_REQUEST - + return {'error':'Register failed'} person = self.get_person(name) if person is not None: log('error! person exists') diff --git a/p2p/p2p.py b/p2p/p2p.py index 07389f0..568b922 100644 --- a/p2p/p2p.py +++ b/p2p/p2p.py @@ -8,10 +8,52 @@ import pickle,os NODES_PRIME = [("128.232.229.63",8468), ("68.66.241.111",8468)] +def start_udp_tcp_bridge(): + from twisted.internet.protocol import Protocol, Factory, DatagramProtocol + from twisted.internet import reactor + class TCPServer(Protocol): + def connectionMade(self): + self.port = reactor.listenUDP(8000, UDPServer(self)) + def connectionLost(self, reason): + self.port.stopListening() -def start_first_node(port=8468): + + + + # class UDPServer(DatagramProtocol): + # def __init__(self, stream): + # self.stream = stream + + # def datagramReceived(self, datagram, address): + # self.stream.transport.write(datagram) + +async def tcp_echo_client(message): + + reader, writer = await asyncio.open_connection( + '127.0.0.1', 8888) + + for n in range(5): + print(f'Send: {message!r}') + writer.write(message.encode()) + await writer.drain() + import time + await asyncio.sleep(1) + + #data = await reader.read(100) + #print(f'Received: {data.decode()!r}') + + #asyncio.sleep(1) + + print('Close the connection') + #writer.close() + #await writer.wait_closed() + +async def echo(msg): + print('echo',msg) + +def boot_selfless_node(port=8468): handler = logging.StreamHandler() formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') handler.setFormatter(formatter) @@ -22,9 +64,21 @@ def start_first_node(port=8468): loop = asyncio.get_event_loop() loop.set_debug(True) + # loop.create_task(tcp_echo_client('hello')) + # loop.create_task(echo('hello??')) + + # ## UDP <-> TCP bridge + # print("Starting datagram proxy...") + # coro = start_datagram_proxy(bind, port, remote_host, remote_port) + # transport, _ = loop.run_until_complete(coro) + # print("Datagram proxy is running on " + str(port)) + + + # shelf = HalfForgetfulStorage() shelf = None + print('starting kad server') #server = Server(storage=shelf) from kad import KadServer,HalfForgetfulStorage diff --git a/p2p/run.sh b/p2p/run.sh index 1000eaf..34b42db 100755 --- a/p2p/run.sh +++ b/p2p/run.sh @@ -1 +1 @@ -python -c "import p2p; p2p.start_first_node()" \ No newline at end of file +python -c "import p2p; p2p.boot_selfless_node()" \ No newline at end of file diff --git a/p2p/udp2tcp.py b/p2p/udp2tcp.py new file mode 100644 index 0000000..a15344c --- /dev/null +++ b/p2p/udp2tcp.py @@ -0,0 +1,80 @@ +""" +1-way UDP to TCP relay. + +Test with netcat +1) Run TCP server: + nc -l 999 +2) Run UDP proxy: + python udpproxy.py +3) Run UDP client: + nc -u 127.0.0.1 8888 +4) Type some strings, type enter, they should show on the TCP server +""" + +import asyncio + + +class ProxyDatagramProtocol(asyncio.DatagramProtocol): + + def __init__(self, remote_address): + self.remote_address = remote_address + self.remotes = {} + self.transport = None + super().__init__() + + def connection_made(self, transport): + self.transport = transport + + def datagram_received(self, data, addr): + if addr in self.remotes: + self.remotes[addr].transport.write(data) + return + loop = asyncio.get_event_loop() + self.remotes[addr] = RemoteStreamProtocol(self, data) + coro = loop.create_connection( + lambda: self.remotes[addr], host=self.remote_address[0], port=int(self.remote_address[1])) + asyncio.ensure_future(coro) + + +class RemoteStreamProtocol(asyncio.Protocol): + + def __init__(self, proxy, data): + self.proxy = proxy + self.data = data + self.transport = None + super().__init__() + + def connection_made(self, transport): + self.transport = transport + self.transport.write(self.data) + + def data_received(self, data, _): + pass + + def eof_received(self): + pass + + +def start_datagram_proxy(bind, port, remote_host, remote_port): + loop = asyncio.get_event_loop() + protocol = ProxyDatagramProtocol((remote_host, remote_port)) + return (yield from loop.create_datagram_endpoint(lambda: protocol, local_addr=(bind, port))) + + +def main(bind='0.0.0.0', port=8888, remote_host='127.0.0.1', remote_port=9999): + loop = asyncio.get_event_loop() + print("Starting datagram proxy...") + coro = start_datagram_proxy(bind, port, remote_host, remote_port) + transport, _ = loop.run_until_complete(coro) + print("Datagram proxy is running on " + str(port)) + try: + loop.run_forever() + except KeyboardInterrupt: + pass + print("Closing transport...") + transport.close() + loop.close() + + +if __name__ == '__main__': + main() \ No newline at end of file diff --git a/p2p/udp_proxy.py b/p2p/udp_proxy.py deleted file mode 100644 index 386b043..0000000 --- a/p2p/udp_proxy.py +++ /dev/null @@ -1,43 +0,0 @@ -import socket -from threading import Thread - -class Proxy(Thread): - """ used to proxy single udp connection - """ - BUFFER_SIZE = 4096 - def __init__(self, listening_address, forward_address): - print " Server started on", listening_address - Thread.__init__(self) - self.bind = listening_address - self.target = forward_address - - def run(self): - # listen for incoming connections: - target = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - target.connect(self.target) - - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - try: - s.bind(self.bind) - except socket.error, err: - print "Couldn't bind server on %r" % (self.bind, ) - raise SystemExit - while 1: - datagram = s.recv(self.BUFFER_SIZE) - if not datagram: - break - length = len(datagram) - sent = target.send(datagram) - if length != sent: - print 'cannot send to %r, %r !+ %r' % (self.target, length, sent) - s.close() - - -if __name__ == "__main__": - LISTEN = ("0.0.0.0", 53) - TARGET = ("172.30.14.11", 53) - while 1: - proxy = Proxy(LISTEN, TARGET) - proxy.start() - proxy.join() - print ' [restarting] ' \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index e1e227a..3958982 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,38 +1,23 @@ -astroid==2.4.2 certifi==2020.6.20 cffi==1.14.2 chardet==3.0.4 -click==7.1.2 cryptography==3.0 docutils==0.16 -Flask==1.1.2 -Flask-API==2.0 idna==2.10 -isort==4.3.21 -itsdangerous==1.1.0 -Jinja2==2.11.2 kademlia==2.2.1 Kivy==1.11.1 Kivy-Garden==0.1.4 kivymd==0.104.1 -lazy-object-proxy==1.4.3 llp==0.2.2 -MarkupSafe==1.1.1 -mccabe==0.6.1 mpi-slingshot==0.2.0 pathtools==0.1.2 Pillow==7.2.0 plyer==1.4.3 pycparser==2.20 Pygments==2.6.1 -pylint==2.5.3 requests==2.24.0 rpcudp==4.0.1 six==1.15.0 -toml==0.10.1 -typed-ast==1.4.1 u-msgpack-python==2.7.0 urllib3==1.25.10 watchdog==0.10.3 -Werkzeug==1.0.1 -wrapt==1.12.1