from subprocess import run import os import re _hdr = re.compile("^\[\w*\]", flags=re.MULTILINE) class WireGuardConfig: def __init__(self, conf: str = None) -> None: self.Interface: dict[str, str] = {} self.Peers: list[dict[str, str]] = [] if conf: sp = _hdr.split(conf) interface = sp[1] peers = sp[2:] self.Interface = self._lines2dict(interface.splitlines()) for peer in peers: self.Peers.append(self._lines2dict(peer.splitlines())) def get_from_interface(ifname: str): cfg = wg_showconf(ifname) return WireGuardConfig(cfg) def get_peer(self, pubkey: str): for peer in self.Peers: if peer["PublicKey"] == pubkey: return peer return None def _lines2dict(self, lns: list): d = {} for l in lns: if "=" in l: k, v = l.split("=", 1) d[k.strip()] = v.strip() return d def _flat_dict(self, d: dict): return [f"{k} = {v}" for k, v in d.items()] + [""] def _flat_peer(self, peer: dict): return ["[Peer]", *self._flat_dict(peer)] def _flat_interface(self): return ["[Interface]", *self._flat_dict(self.Interface)] def _get_peer_lines(self): return sum([self._flat_peer(p) for p in self.Peers], []) def get_peers_cfg(self): return os.linesep.join(self._get_peer_lines()) def __str__(self) -> str: strs = self._flat_interface() + self._get_peer_lines() return os.linesep.join(strs) def wg_reload_config(ifname: str): run( f"wg syncconf {ifname} <(wg-quick strip {ifname})", shell=True, executable="/bin/bash", ) def wg_showconf(ifname: str): return run( f"wg showconf {ifname}", shell=True, capture_output=True, ).stdout.decode() def wg_syncconf(ifname: str, conf: str): run(f"wg syncconf {ifname} /dev/stdin", shell=True, input=conf.encode())