Improve UI.

This commit is contained in:
Sarah 2021-10-18 14:50:16 +02:00
parent a19570395c
commit bd97110a25
No known key found for this signature in database
GPG key ID: 708F7ACE058F0186
7 changed files with 116 additions and 32 deletions

View file

@ -48,6 +48,20 @@ in
The user the service will use. The user the service will use.
''; '';
}; };
globalCacheTTL = lib.mkOption {
type = types.nullOr types.int;
default = null;
description = ''
How long should nix store narinfo files.
If not defined, the module will not reconfigure the entry.
If it is defined, this will define how many seconds a cache entry will
be stored.
By default not given, as it affects the UX of the nix installation.
'';
}
}; };
}; };
@ -120,6 +134,11 @@ in
binaryCachePublicKeys = lib.mkIf (cfg.publicKeyFile != null) [ binaryCachePublicKeys = lib.mkIf (cfg.publicKeyFile != null) [
(builtins.readFile cfg.publicKeyFile) (builtins.readFile cfg.publicKeyFile)
]; ];
extraOptions = lib.mkIf (cfg.globalCacheTTL != null) ''
narinfo-cache-negative-ttl ${cfg.globalCacheTTL}
narinfo-cache-positive-ttl ${cfg.globalCacheTTL}
'';
}; };
networking.firewall = lib.mkIf (cfg.openFirewall) { networking.firewall = lib.mkIf (cfg.openFirewall) {

View file

@ -1,19 +1,36 @@
import os
import logging import logging
import asyncio import asyncio
import argparse
import uvloop import uvloop
from hypercorn import Config from hypercorn import Config
from hypercorn.asyncio import serve from hypercorn.asyncio import serve
from peerix.app import app from peerix.app import app, setup_stores
parser = argparse.ArgumentParser(description="Peerix nix binary cache.")
parser.add_argument("--verbose", action="store_const", const=logging.DEBUG, default=logging.INFO, dest="loglevel")
parser.add_argument("--port", default=12304, type=int)
parser.add_argument("--private-key", required=False)
def run(): def run():
logging.basicConfig() args = parser.parse_args()
os.environ["NIX_SECRET_KEY_FILE"] = os.path.abspath(os.path.expanduser(args.private_key))
logging.basicConfig(level=args.loglevel)
uvloop.install() uvloop.install()
asyncio.run(main(args.port))
async def main(port: int):
config = Config() config = Config()
config.bind = ["0.0.0.0:12304"] config.bind = [f"0.0.0.0:{port}"]
asyncio.run(serve(app, config))
async with setup_stores(port):
await serve(app, config)
if __name__ == "__main__": if __name__ == "__main__":

View file

@ -1,4 +1,5 @@
import logging import logging
import datetime
import contextlib import contextlib
from starlette.requests import Request from starlette.requests import Request
@ -12,27 +13,18 @@ from peerix.prefix import PrefixStore
@contextlib.asynccontextmanager @contextlib.asynccontextmanager
async def _setup_stores(local_port: int): async def setup_stores(local_port: int):
global l_access, r_access global l_access, r_access
async with local() as l: async with local() as l:
l_access = PrefixStore("local/nar", l) l_access = PrefixStore("local/nar", l)
lp = PrefixStore("local", l) lp = PrefixStore("local", l)
async with remote(lp, local_port, "0.0.0.0", lp.prefix) as r: async with remote(lp, local_port, "0.0.0.0", lp.prefix) as r:
r_access = PrefixStore("remote", r) r_access = PrefixStore("v2/remote", r)
yield yield
setup_store = _setup_stores(12304)
app = Starlette() app = Starlette()
@app.on_event("startup")
async def _setup_stores_init():
await setup_store.__aenter__()
@app.on_event("shutdown")
async def _setup_stores_deinit():
await setup_store.__aexit__(None, None, None)
@app.route("/nix-cache-info") @app.route("/nix-cache-info")
async def cache_info(_: Request) -> Response: async def cache_info(_: Request) -> Response:
@ -43,10 +35,14 @@ async def cache_info(_: Request) -> Response:
@app.route("/{hash:str}.narinfo") @app.route("/{hash:str}.narinfo")
async def narinfo(req: Request) -> Response: async def narinfo(req: Request) -> Response:
if req.client.host != "127.0.0.1": if req.client.host != "127.0.0.1":
return Response(content="Permission denied.", status_code=403) return Response(content="Permission denied.", status_code=403)
# We do not cache nar-infos.
# Therefore, dynamically recompute expires at.
ni = await r_access.narinfo(req.path_params["hash"]) ni = await r_access.narinfo(req.path_params["hash"])
if ni is None: if ni is None:
return Response(content="Not found", status_code=404) return Response(content="Not found", status_code=404)
@ -62,9 +58,18 @@ async def access_narinfo(req: Request) -> Response:
@app.route("/local/nar/{path:str}") @app.route("/local/nar/{path:str}")
async def push_nar(req: Request) -> Response: async def push_nar(req: Request) -> Response:
return StreamingResponse(l_access.nar(f"local/nar/{req.path_params['path']}"), media_type="text/plain") try:
return StreamingResponse(
await l_access.nar(f"local/nar/{req.path_params['path']}"),
media_type="text/plain"
)
except FileNotFoundError:
return Response(content="Gone", status_code=404)
# Paths must be versioned as nix is caching the NAR urls.
@app.route("/remote/{path:path}") @app.route("/v2/remote/{path:path}")
async def pull_nar(req: Request) -> Response: async def pull_nar(req: Request) -> Response:
return StreamingResponse(r_access.nar(f"remote/{req.path_params['path']}"), media_type="text/plain") try:
return StreamingResponse(await r_access.nar(f"remote/{req.path_params['path']}"), media_type="text/plain")
except FileNotFoundError:
return Response(content="Gone", status_code=404)

View file

@ -67,13 +67,20 @@ class LocalStore(Store):
info = NarInfo.parse(await resp.text()) info = NarInfo.parse(await resp.text())
return info._replace(url=base64.b64encode(info.storePath.encode("utf-8")).replace(b"/", b"_").decode("ascii")+".nar") return info._replace(url=base64.b64encode(info.storePath.encode("utf-8")).replace(b"/", b"_").decode("ascii")+".nar")
async def nar(self, sp: str) -> t.AsyncIterable[bytes]: async def nar(self, sp: str) -> t.Awaitable[t.AsyncIterable[bytes]]:
if sp.endswith(".nar"): if sp.endswith(".nar"):
sp = sp[:-4] sp = sp[:-4]
path = base64.b64decode(sp.replace("_", "/")).decode("utf-8") path = base64.b64decode(sp.replace("_", "/")).decode("utf-8")
if not path.startswith((await self.cache_info()).storeDir): if not path.startswith((await self.cache_info()).storeDir):
raise FileNotFoundError() raise FileNotFoundError()
if not os.path.exists(path):
raise FileNotFoundError()
return self._nar_pull(path)
async def _nar_pull(self, path: str) -> t.AsyncIterable[bytes]:
logger.info(f"Serving {path}")
process = await asyncio.create_subprocess_exec( process = await asyncio.create_subprocess_exec(
nix, "dump-path", "--", path, nix, "dump-path", "--", path,
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
@ -85,11 +92,19 @@ class LocalStore(Store):
while not process.stdout.at_eof(): while not process.stdout.at_eof():
yield await process.stdout.read(10*1024*1024) yield await process.stdout.read(10*1024*1024)
logger.debug(f"Served {path}")
try:
process.terminate()
except ProcessLookupError:
pass
@contextlib.asynccontextmanager @contextlib.asynccontextmanager
async def local(): async def local():
with tempfile.TemporaryDirectory() as tmpdir: with tempfile.TemporaryDirectory() as tmpdir:
sock = f"{tmpdir}/server.sock" sock = f"{tmpdir}/server.sock"
logger.info("Launching nix-serve.")
process = await asyncio.create_subprocess_exec( process = await asyncio.create_subprocess_exec(
nix_serve, "--listen", sock, nix_serve, "--listen", sock,
stdin=subprocess.DEVNULL, stdin=subprocess.DEVNULL,
@ -108,5 +123,10 @@ async def local():
async with aiohttp.ClientSession(connector_owner=True, connector=connector) as session: async with aiohttp.ClientSession(connector_owner=True, connector=connector) as session:
yield LocalStore(session) yield LocalStore(session)
finally: finally:
process.terminate() try:
process.terminate()
except ProcessLookupError:
pass
logger.info("nix-serve exited.")

View file

@ -16,10 +16,9 @@ class PrefixStore(Store):
return None return None
return info._replace(url=f"{self.prefix}/{info.url}") return info._replace(url=f"{self.prefix}/{info.url}")
async def nar(self, path: str) -> t.AsyncIterable[bytes]: def nar(self, path: str) -> t.Awaitable[t.AsyncIterable[bytes]]:
if not path.startswith(self.prefix + "/"): if not path.startswith(self.prefix + "/"):
raise FileNotFoundError("Not found.") raise FileNotFoundError("Not found.")
async for chunk in self.backend.nar(path[len(self.prefix)+1:]): return self.backend.nar(path[len(self.prefix)+1:])
yield chunk

View file

@ -126,17 +126,41 @@ class DiscoveryProtocol(asyncio.DatagramProtocol, Store):
return return
info = NarInfo.parse(await resp.text()) info = NarInfo.parse(await resp.text())
return info._replace(url = f"{addr[0]}/{port}/{info.url}") return info._replace(url = f"{addr[0]}/{port}/{hsh}/{info.url}")
async def nar(self, sp: str) -> t.AsyncIterable[bytes]: async def nar(self, sp: str) -> t.Awaitable[t.AsyncIterable[bytes]]:
addr1, addr2, p = sp.split("/", 2) try:
async with self.session.get(f"http://{addr1}:{addr2}/{p}") as resp: return await self._nar_req(sp)
if resp.status != 200: except FileNotFoundError:
raise FileNotFoundError("Not found.") addr1, addr2, hsh, _ = sp.split("/", 2)
logging.warn(f"Remote({addr1}:{addr2})-store path is dead: {sp}")
pass
_, _, hsh, _ = sp.split("/", 2)
narinfo = await self.narinfo(hsh)
if narinfo is None:
logging.warn(f"All sources are gone.")
raise FileNotFoundError()
return await self._nar_req(narinfo.url)
async def _nar_req(self, url: str) -> t.Awaitable[t.AsyncIterable[bytes]]:
addr1, addr2, _, p = url.split("/", 2)
resp = await self.session.get(f"http://{addr1}:{addr2}/{p}")
if resp.status == 200:
return self._nar_direct(resp)
else:
raise FileNotFoundError()
async def _nar_direct(self, resp: aiohttp.ClientResponse) -> t.AsyncIterable[bytes]:
try:
content = resp.content content = resp.content
while not content.at_eof(): while not content.at_eof():
yield await content.readany() yield await content.readany()
finally:
resp.close()
await resp.wait_for_close()
@contextlib.asynccontextmanager @contextlib.asynccontextmanager

View file

@ -86,6 +86,6 @@ class Store:
async def narinfo(self, hsh: str) -> t.Optional[NarInfo]: async def narinfo(self, hsh: str) -> t.Optional[NarInfo]:
raise NotImplementedError() raise NotImplementedError()
async def nar(self, url: str) -> t.AsyncIterable[bytes]: def nar(self, url: str) -> t.Awaitable[t.AsyncIterable[bytes]]:
raise NotImplementedError() raise NotImplementedError()