Remote: Allow to manually specify a timeout.

This commit is contained in:
Sarah 2021-10-27 10:29:47 +02:00
parent adc1e28af2
commit e039b5f508
No known key found for this signature in database
GPG key ID: 708F7ACE058F0186
3 changed files with 12 additions and 9 deletions

View file

@ -14,6 +14,7 @@ parser = argparse.ArgumentParser(description="Peerix nix binary cache.")
parser.add_argument("--verbose", action="store_const", const=logging.DEBUG, default=logging.INFO, dest="loglevel") parser.add_argument("--verbose", action="store_const", const=logging.DEBUG, default=logging.INFO, dest="loglevel")
parser.add_argument("--port", default=12304, type=int) parser.add_argument("--port", default=12304, type=int)
parser.add_argument("--private-key", required=False) parser.add_argument("--private-key", required=False)
parser.add_argument("--timeout", type=int, default=50)
def run(): def run():
args = parser.parse_args() args = parser.parse_args()
@ -23,14 +24,14 @@ def run():
logging.basicConfig(level=args.loglevel) logging.basicConfig(level=args.loglevel)
uvloop.install() uvloop.install()
asyncio.run(main(args.port)) asyncio.run(main(args.port, args.timeout / 1000.0))
async def main(port: int): async def main(port: int, timeout: float):
config = Config() config = Config()
config.bind = [f"0.0.0.0:{port}"] config.bind = [f"0.0.0.0:{port}"]
async with setup_stores(port): async with setup_stores(port, timeout):
await serve(app, config) await serve(app, config)

View file

@ -13,12 +13,12 @@ from peerix.prefix import PrefixStore
@contextlib.asynccontextmanager @contextlib.asynccontextmanager
async def setup_stores(local_port: int): async def setup_stores(local_port: int, timeout: float):
global l_access, r_access global l_access, r_access
async with local() as l: async with local() as l:
l_access = PrefixStore("local/nar", l) l_access = PrefixStore("local/nar", l)
lp = PrefixStore("local", l) lp = PrefixStore("local", l)
async with remote(lp, local_port, "0.0.0.0", lp.prefix) as r: async with remote(lp, local_port, "0.0.0.0", lp.prefix, timeout) as r:
r_access = PrefixStore("v2/remote", r) r_access = PrefixStore("v2/remote", r)
yield yield

View file

@ -47,14 +47,16 @@ class DiscoveryProtocol(asyncio.DatagramProtocol, Store):
session: aiohttp.ClientSession session: aiohttp.ClientSession
local_port: int local_port: int
prefix: str prefix: str
timeout: float
def __init__(self, store: Store, session: aiohttp.ClientSession, local_port: int, prefix: str): def __init__(self, store: Store, session: aiohttp.ClientSession, local_port: int, prefix: str, timeout: float):
self.idx = 0 self.idx = 0
self.waiters = {} self.waiters = {}
self.store = store self.store = store
self.session = session self.session = session
self.local_port = local_port self.local_port = local_port
self.prefix = prefix self.prefix = prefix
self.timeout = timeout
def connection_made(self, transport): def connection_made(self, transport):
self.transport = transport self.transport = transport
@ -114,7 +116,7 @@ class DiscoveryProtocol(asyncio.DatagramProtocol, Store):
try: try:
# This must have a short timeout so it does not noticably slow down # This must have a short timeout so it does not noticably slow down
# querying of other caches. # querying of other caches.
port, url, addr = await asyncio.wait_for(fut, 0.05) port, url, addr = await asyncio.wait_for(fut, self.timeout)
except asyncio.TimeoutError: except asyncio.TimeoutError:
logging.debug(f"No response for {hsh}") logging.debug(f"No response for {hsh}")
return None return None
@ -164,11 +166,11 @@ class DiscoveryProtocol(asyncio.DatagramProtocol, Store):
@contextlib.asynccontextmanager @contextlib.asynccontextmanager
async def remote(store: Store, local_port: int, local_addr: str="0.0.0.0", prefix: str="local"): async def remote(store: Store, local_port: int, local_addr: str="0.0.0.0", prefix: str="local", timeout: float = 0.05):
protocol: DiscoveryProtocol protocol: DiscoveryProtocol
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
_, protocol = await asyncio.get_running_loop().create_datagram_endpoint( _, protocol = await asyncio.get_running_loop().create_datagram_endpoint(
lambda: DiscoveryProtocol(store, session, local_port, prefix), lambda: DiscoveryProtocol(store, session, local_port, prefix, timeout),
local_addr=(local_addr, local_port), local_addr=(local_addr, local_port),
family=socket.AF_INET, family=socket.AF_INET,
allow_broadcast=True allow_broadcast=True