From cc8abec5a0b5a2b856b17580e1dba97a36e40a67 Mon Sep 17 00:00:00 2001 From: Sarah Date: Sun, 17 Oct 2021 02:57:44 +0200 Subject: [PATCH] some better broadcasting. --- peerix/remote.py | 52 +++++++++++++++++++++++++++++++++++++----------- requirements.txt | 1 + 2 files changed, 41 insertions(+), 12 deletions(-) diff --git a/peerix/remote.py b/peerix/remote.py index 6cb18a4..9c31169 100644 --- a/peerix/remote.py +++ b/peerix/remote.py @@ -2,14 +2,37 @@ import typing as t import socket import asyncio +import ipaddress import contextlib +import psutil import aiohttp from peerix.store import NarInfo, Store +def get_brdcasts(): + for interface, iaddrs in psutil.net_if_addrs().items(): + for iaddr in iaddrs: + if iaddr.broadcast is None or iaddr.family != socket.AF_INET: + continue + + ifa = ipaddress.IPv4Interface(f"{iaddr.address}/{iaddr.netmask}") + if not ifa.network.is_private: + continue + + yield str(ifa.network.broadcast_address) + + +def get_myself(): + for interface, iaddrs in psutil.net_if_addrs().items(): + for iaddr in iaddrs: + if iaddr.broadcast is None or iaddr.family != socket.AF_INET: + continue + + yield str(iaddr.address) + class DiscoveryProtocol(asyncio.DatagramProtocol, Store): idx: int @@ -29,14 +52,19 @@ class DiscoveryProtocol(asyncio.DatagramProtocol, Store): def connection_made(self, transport): self.transport = transport - def datagram_received(self, data: bytes, addr: None) -> None: + def datagram_received(self, data: bytes, addr: t.Tuple[str, int]) -> None: + print(set(get_myself())) + if addr[0] in set(get_myself()): + print(f"Ignoring packet from {addr[0]}") + return + # 0 => Response to a command of mine. if data[0] == 1: idx = int.from_bytes(data[1:5], "big") if idx not in self.waiters: return - - self.waiters[idx].set_result((data[6:].decode("utf-8"), addr)) + + self.waiters[idx].set_result((int.from_bytes(data[5:9], "big"), data[9:].decode("utf-8"), addr)) # 1 => Request from another server. elif data[0] == 0: @@ -48,13 +76,14 @@ class DiscoveryProtocol(asyncio.DatagramProtocol, Store): async def cache_info(self): return await self.store.cache_info() - async def respond(self, data: bytes, addr: None) -> None: - hsh = data[6:].decode("utf-8") - print("Got request from {addr[0]}:{addr[1]} for {hsh}") + async def respond(self, data: bytes, addr: t.Tuple[str, int]) -> None: + hsh = data[5:].decode("utf-8") + print(f"Got request from {addr[0]}:{addr[1]} for {hsh}") narinfo = await self.store.narinfo(hsh) if narinfo is None: return - self.transport.sendto(b"\x01" + data[1:5] + self.local_port.to_bytes(4, "big") + narinfo.url.encode("utf-8")) + + self.transport.sendto(b"\x01" + data[1:5] + self.local_port.to_bytes(4, "big") + narinfo.url.encode("utf-8"), addr) async def narinfo(self, hsh: str) -> t.Optional[NarInfo]: fut = asyncio.get_running_loop().create_future() @@ -62,16 +91,15 @@ class DiscoveryProtocol(asyncio.DatagramProtocol, Store): self.waiters[idx] = fut fut.add_done_callback(lambda _: self.waiters.pop(idx, None)) print(f"Requesting {hsh} from direct local network.") - self.transport.sendto(b"".join([b"\x00", idx.to_bytes(4, "big"), hsh.encode("utf-8")]), ("255.255.255.255", self.local_port)) + for addr in set(get_brdcasts()): + self.transport.sendto(b"".join([b"\x00", idx.to_bytes(4, "big"), hsh.encode("utf-8")]), (addr, self.local_port)) + try: - data, addr = await asyncio.wait_for(fut, 0.5) + port, url, addr = await asyncio.wait_for(fut, 0.5) except asyncio.TimeoutError: print(f"No response for {hsh}") return None - port = int.from_bytes(data[0:4], "big") - url = data[5:].decode("utf-8") - print(f"{addr[0]}:{addr[1]} responded for {hsh} with http://{addr[0]}:{port}/{url}") async with self.session.get(f"http://{addr[0]}:{port}/{url}") as resp: diff --git a/requirements.txt b/requirements.txt index 1ac41c6..9ab7a4c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,4 @@ aiohttp uvloop hypercorn starlette +psutil