some better broadcasting.

This commit is contained in:
Sarah 2021-10-17 02:57:44 +02:00
parent c96063470a
commit cc8abec5a0
No known key found for this signature in database
GPG key ID: 708F7ACE058F0186
2 changed files with 41 additions and 12 deletions

View file

@ -2,14 +2,37 @@ import typing as t
import socket
import asyncio
import ipaddress
import contextlib
import psutil
import aiohttp
from peerix.store import NarInfo, Store
def get_brdcasts():
for interface, iaddrs in psutil.net_if_addrs().items():
for iaddr in iaddrs:
if iaddr.broadcast is None or iaddr.family != socket.AF_INET:
continue
ifa = ipaddress.IPv4Interface(f"{iaddr.address}/{iaddr.netmask}")
if not ifa.network.is_private:
continue
yield str(ifa.network.broadcast_address)
def get_myself():
for interface, iaddrs in psutil.net_if_addrs().items():
for iaddr in iaddrs:
if iaddr.broadcast is None or iaddr.family != socket.AF_INET:
continue
yield str(iaddr.address)
class DiscoveryProtocol(asyncio.DatagramProtocol, Store):
idx: int
@ -29,14 +52,19 @@ class DiscoveryProtocol(asyncio.DatagramProtocol, Store):
def connection_made(self, transport):
self.transport = transport
def datagram_received(self, data: bytes, addr: None) -> None:
def datagram_received(self, data: bytes, addr: t.Tuple[str, int]) -> None:
print(set(get_myself()))
if addr[0] in set(get_myself()):
print(f"Ignoring packet from {addr[0]}")
return
# 0 => Response to a command of mine.
if data[0] == 1:
idx = int.from_bytes(data[1:5], "big")
if idx not in self.waiters:
return
self.waiters[idx].set_result((data[6:].decode("utf-8"), addr))
self.waiters[idx].set_result((int.from_bytes(data[5:9], "big"), data[9:].decode("utf-8"), addr))
# 1 => Request from another server.
elif data[0] == 0:
@ -48,13 +76,14 @@ class DiscoveryProtocol(asyncio.DatagramProtocol, Store):
async def cache_info(self):
return await self.store.cache_info()
async def respond(self, data: bytes, addr: None) -> None:
hsh = data[6:].decode("utf-8")
print("Got request from {addr[0]}:{addr[1]} for {hsh}")
async def respond(self, data: bytes, addr: t.Tuple[str, int]) -> None:
hsh = data[5:].decode("utf-8")
print(f"Got request from {addr[0]}:{addr[1]} for {hsh}")
narinfo = await self.store.narinfo(hsh)
if narinfo is None:
return
self.transport.sendto(b"\x01" + data[1:5] + self.local_port.to_bytes(4, "big") + narinfo.url.encode("utf-8"))
self.transport.sendto(b"\x01" + data[1:5] + self.local_port.to_bytes(4, "big") + narinfo.url.encode("utf-8"), addr)
async def narinfo(self, hsh: str) -> t.Optional[NarInfo]:
fut = asyncio.get_running_loop().create_future()
@ -62,16 +91,15 @@ class DiscoveryProtocol(asyncio.DatagramProtocol, Store):
self.waiters[idx] = fut
fut.add_done_callback(lambda _: self.waiters.pop(idx, None))
print(f"Requesting {hsh} from direct local network.")
self.transport.sendto(b"".join([b"\x00", idx.to_bytes(4, "big"), hsh.encode("utf-8")]), ("255.255.255.255", self.local_port))
for addr in set(get_brdcasts()):
self.transport.sendto(b"".join([b"\x00", idx.to_bytes(4, "big"), hsh.encode("utf-8")]), (addr, self.local_port))
try:
data, addr = await asyncio.wait_for(fut, 0.5)
port, url, addr = await asyncio.wait_for(fut, 0.5)
except asyncio.TimeoutError:
print(f"No response for {hsh}")
return None
port = int.from_bytes(data[0:4], "big")
url = data[5:].decode("utf-8")
print(f"{addr[0]}:{addr[1]} responded for {hsh} with http://{addr[0]}:{port}/{url}")
async with self.session.get(f"http://{addr[0]}:{port}/{url}") as resp:

View file

@ -2,3 +2,4 @@ aiohttp
uvloop
hypercorn
starlette
psutil