Initial commit

This commit is contained in:
Sarah 2021-10-17 02:09:38 +02:00
commit c96063470a
No known key found for this signature in database
GPG key ID: 708F7ACE058F0186
14 changed files with 830 additions and 0 deletions

0
peerix/__init__.py Normal file
View file

13
peerix/__main__.py Normal file
View file

@ -0,0 +1,13 @@
import asyncio
import uvloop
from hypercorn import Config
from hypercorn.asyncio import serve
from peerix.app import app
if __name__ == "__main__":
uvloop.install()
config = Config()
config.bind = ["0.0.0.0:12304"]
asyncio.run(serve(app, config))

71
peerix/app.py Normal file
View file

@ -0,0 +1,71 @@
import asyncio
import contextlib
from starlette.requests import Request
from starlette.responses import Response, StreamingResponse
from starlette.applications import Starlette
from peerix.local import local
from peerix.remote import remote
from peerix.prefix import PrefixStore
@contextlib.asynccontextmanager
async def _setup_stores(local_port: int):
global l_access, r_access
async with local() as l:
l_access = PrefixStore("local/nar", l)
lp = PrefixStore("local", l)
async with remote(lp, local_port, "0.0.0.0") as r:
r_access = PrefixStore("remote", r)
yield
setup_store = _setup_stores(12304)
app = Starlette()
@app.on_event("startup")
async def _setup_stores_init():
await setup_store.__aenter__()
@app.on_event("shutdown")
async def _setup_stores_deinit():
await setup_store.__aexit__(None, None, None)
@app.route("/nix-cache-info")
async def cache_info(_: Request) -> Response:
ci = await l_access.cache_info()
ci = ci._replace(priority=20)
return Response(content=ci.dump())
@app.route("/{hash:str}.narinfo")
async def narinfo(req: Request) -> Response:
if req.client.host != "127.0.0.1":
return Response(content="Permission denied.", status_code=403)
ni = await r_access.narinfo(req.path_params["hash"])
if ni is None:
return Response(content="Not found", status_code=404)
return Response(content=ni.dump(), status_code=200, media_type="text/x-nix-narinfo")
@app.route("/local/{hash:str}.narinfo")
async def access_narinfo(req: Request) -> Response:
ni = await l_access.narinfo(req.path_params["hash"])
if ni is None:
return Response(content="Not found", status_code=404)
return Response(content=ni.dump(), status_code=200, media_type="text/x-nix-narinfo")
@app.route("/local/nar/{path:str}")
async def push_nar(req: Request) -> Response:
return StreamingResponse(l_access.nar(f"local/nar/{req.path_params['path']}"), media_type="text/plain")
@app.route("/remote/{path:path}")
async def pull_nar(req: Request) -> Response:
return StreamingResponse(l_access.nar(f"remote/{req.path_params['path']}"), media_type="text/plain")

106
peerix/local.py Normal file
View file

@ -0,0 +1,106 @@
import typing as t
import contextlib
import subprocess
import tempfile
import asyncio
import shutil
import base64
import sys
import os
import aiohttp
from peerix.store import NarInfo, CacheInfo, Store
nix_serve = shutil.which("nix-serve")
if nix_serve is None:
raise RuntimeError("nix-serve is not installed.")
nix = shutil.which("nix")
if nix is None:
raise RuntimeError("nix is not installed.")
assert nix_serve is not None
assert nix is not None
class LocalStore(Store):
def __init__(self, session: aiohttp.ClientSession):
self.session = session
self._cache: t.Optional[CacheInfo] = None
async def cache_info(self) -> CacheInfo:
if self._cache is None:
async with self.session.get("http://_/nix-cache-info") as resp:
storeDir = ""
wantMassQuery = -1
priority = 50
for line in (await resp.text()).splitlines():
k, v = line.split(":", 1)
v = v.strip()
k = k.strip()
if k == "StoreDir":
storeDir = v
elif k == "WantMassQuery":
wantMassQuery = int(v)
elif k == "Priority":
priority = int(v)
self._cache = CacheInfo(storeDir, wantMassQuery, priority)
return self._cache
async def narinfo(self, hsh: str) -> t.Optional[NarInfo]:
async with self.session.get(f"http://_/{hsh}.narinfo") as resp:
if resp.status == 404:
return None
info = NarInfo.parse(await resp.text())
return info._replace(url=base64.b64encode(info.storePath.encode("utf-8")).replace(b"/", b"_").decode("ascii"))
async def nar(self, sp: str) -> t.AsyncIterable[bytes]:
path = base64.b64decode(sp.replace("_", "/")).decode("utf-8")
if not path.startswith((await self.cache_info()).storeDir):
raise FileNotFoundError()
process = await asyncio.create_subprocess_exec(
nix, "dump-path", "--", path,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
stdin=subprocess.DEVNULL,
)
assert process.stdout is not None
while not process.stdout.at_eof():
yield await process.stdout.read(10*1024*1024)
@contextlib.asynccontextmanager
async def local():
with tempfile.TemporaryDirectory() as tmpdir:
sock = f"{tmpdir}/server.sock"
process = await asyncio.create_subprocess_exec(
nix_serve, "--listen", sock,
stdin=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
stderr=sys.stderr
)
for _ in range(10):
if os.path.exists(sock):
break
await asyncio.sleep(1)
else:
raise RuntimeError("Failed to start up local store.")
try:
connector = aiohttp.UnixConnector(sock)
async with aiohttp.ClientSession(connector_owner=True, connector=connector) as session:
yield LocalStore(session)
finally:
process.terminate()

25
peerix/prefix.py Normal file
View file

@ -0,0 +1,25 @@
import typing as t
from peerix.store import NarInfo, Store
class PrefixStore(Store):
def __init__(self, prefix: str, backend: Store):
self.backend = backend
self.prefix = prefix
async def cache_info(self):
return await self.backend.cache_info()
async def narinfo(self, hsh: str) -> t.Optional[NarInfo]:
info = await self.backend.narinfo(hsh)
if info is None:
return None
return info._replace(url=f"{self.prefix}/{info.url}")
async def nar(self, path: str) -> t.AsyncIterable[bytes]:
if not path.startswith(self.prefix + "/"):
raise FileNotFoundError("Not found.")
async for chunk in self.backend.nar(path[len(self.prefix)+1:]):
yield chunk

108
peerix/remote.py Normal file
View file

@ -0,0 +1,108 @@
import typing as t
import socket
import asyncio
import contextlib
import aiohttp
from peerix.store import NarInfo, Store
class DiscoveryProtocol(asyncio.DatagramProtocol, Store):
idx: int
transport: asyncio.DatagramTransport
waiters: t.Dict[int, asyncio.Future]
store: Store
session: aiohttp.ClientSession
local_port: int
def __init__(self, store: Store, session: aiohttp.ClientSession, local_port: int):
self.idx = 0
self.waiters = {}
self.store = store
self.session = session
self.local_port = local_port
def connection_made(self, transport):
self.transport = transport
def datagram_received(self, data: bytes, addr: None) -> None:
# 0 => Response to a command of mine.
if data[0] == 1:
idx = int.from_bytes(data[1:5], "big")
if idx not in self.waiters:
return
self.waiters[idx].set_result((data[6:].decode("utf-8"), addr))
# 1 => Request from another server.
elif data[0] == 0:
asyncio.create_task(self.respond(data, addr))
def stop(self):
self.transport.close()
async def cache_info(self):
return await self.store.cache_info()
async def respond(self, data: bytes, addr: None) -> None:
hsh = data[6:].decode("utf-8")
print("Got request from {addr[0]}:{addr[1]} for {hsh}")
narinfo = await self.store.narinfo(hsh)
if narinfo is None:
return
self.transport.sendto(b"\x01" + data[1:5] + self.local_port.to_bytes(4, "big") + narinfo.url.encode("utf-8"))
async def narinfo(self, hsh: str) -> t.Optional[NarInfo]:
fut = asyncio.get_running_loop().create_future()
self.idx = (idx := self.idx)+1
self.waiters[idx] = fut
fut.add_done_callback(lambda _: self.waiters.pop(idx, None))
print(f"Requesting {hsh} from direct local network.")
self.transport.sendto(b"".join([b"\x00", idx.to_bytes(4, "big"), hsh.encode("utf-8")]), ("255.255.255.255", self.local_port))
try:
data, addr = await asyncio.wait_for(fut, 0.5)
except asyncio.TimeoutError:
print(f"No response for {hsh}")
return None
port = int.from_bytes(data[0:4], "big")
url = data[5:].decode("utf-8")
print(f"{addr[0]}:{addr[1]} responded for {hsh} with http://{addr[0]}:{port}/{url}")
async with self.session.get(f"http://{addr[0]}:{port}/{url}") as resp:
if resp.status != 200:
return
info = NarInfo.parse(await resp.text())
return info._replace(url = f"{addr[0]}/{port}/{info.url}")
async def nar(self, sp: str) -> t.AsyncIterable[bytes]:
addr1, addr2, p = sp.split("/", 2)
async with self.session.get(f"http://{addr1}:{addr2}/{p}") as resp:
if resp.status != 200:
raise FileNotFoundError("Not found.")
content = resp.content
while not content.at_eof():
yield await content.readany()
@contextlib.asynccontextmanager
async def remote(store: Store, local_port: int, local_addr: str="0.0.0.0"):
protocol: DiscoveryProtocol
async with aiohttp.ClientSession() as session:
_, protocol = await asyncio.get_running_loop().create_datagram_endpoint(
lambda: DiscoveryProtocol(store, session, local_port),
local_addr=(local_addr, local_port),
family=socket.AF_INET,
allow_broadcast=True
)
try:
yield protocol
finally:
protocol.stop()

91
peerix/store.py Normal file
View file

@ -0,0 +1,91 @@
import typing as t
class NarInfo(t.NamedTuple):
storePath: str
url: str
compression: t.Literal["none"]
narHash: str
narSize: int
references: t.Sequence[str]
deriver: t.Optional[str]
signatures: t.Sequence[str]
def dump(self) -> str:
lines = [
f"StorePath: {self.storePath}",
f"URL: {self.url}",
f"Compression: {self.compression}",
f"NarHash: {self.narHash}",
f"NarSize: {self.narSize}"
]
if self.references:
lines.append(f"References: {' '.join(self.references)}")
if self.deriver:
lines.append(f"Deriver: {self.deriver} ")
for sig in self.signatures:
lines.append(f"Sig: {sig}")
return "\n".join(lines)
@classmethod
def parse(cls, data: str) -> "NarInfo":
storePath = ""
url = ""
compression = "none"
narHash = ""
narSize = -1
references = []
deriver = None
signatures = []
for line in data.splitlines():
k, v = line.split(":", 1)
v = v.strip()
k = k.strip()
if k == "StorePath":
storePath = v
elif k == "URL":
url = v
elif k == "Compression" and v == "none":
compression = v
elif k == "NarHash":
narHash = v
elif k == "NarSize":
narSize = int(v)
elif k == "References":
references = v.split(" ")
elif k == "Deriver":
deriver = v
elif k == "Sig":
signatures.append(v)
return NarInfo(storePath, url, compression, narHash, narSize, references, deriver, signatures)
class CacheInfo(t.NamedTuple):
storeDir: str
wantMassQuery: int
priority: int
def dump(self) -> str:
return "\n".join((
f"StoreDir: {self.storeDir}",
f"WantMassQuery: {self.wantMassQuery}",
f"Priority: {self.priority}"
))
class Store:
async def cache_info(self) -> CacheInfo:
raise NotImplementedError()
async def narinfo(self, hsh: str) -> t.Optional[NarInfo]:
raise NotImplementedError()
async def nar(self, url: str) -> t.AsyncIterable[bytes]:
raise NotImplementedError()