You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
76 lines
2.0 KiB
Python
76 lines
2.0 KiB
Python
from subprocess import run
|
|
import os
|
|
import re
|
|
|
|
_hdr = re.compile("^\[\w*\]", flags=re.MULTILINE)
|
|
|
|
|
|
class WireGuardConfig:
|
|
def __init__(self, conf: str = None) -> None:
|
|
self.Interface: dict[str, str] = {}
|
|
self.Peers: list[dict[str, str]] = []
|
|
if conf:
|
|
sp = _hdr.split(conf)
|
|
interface = sp[1]
|
|
peers = sp[2:]
|
|
self.Interface = self._lines2dict(interface.splitlines())
|
|
for peer in peers:
|
|
self.Peers.append(self._lines2dict(peer.splitlines()))
|
|
|
|
def get_from_interface(ifname: str):
|
|
cfg = wg_showconf(ifname)
|
|
return WireGuardConfig(cfg)
|
|
|
|
def get_peer(self, pubkey: str):
|
|
for peer in self.Peers:
|
|
if peer["PublicKey"] == pubkey:
|
|
return peer
|
|
return None
|
|
|
|
def _lines2dict(self, lns: list):
|
|
d = {}
|
|
for l in lns:
|
|
if "=" in l:
|
|
k, v = l.split("=", 1)
|
|
d[k.strip()] = v.strip()
|
|
return d
|
|
|
|
def _flat_dict(self, d: dict):
|
|
return [f"{k} = {v}" for k, v in d.items()] + [""]
|
|
|
|
def _flat_peer(self, peer: dict):
|
|
return ["[Peer]", *self._flat_dict(peer)]
|
|
|
|
def _flat_interface(self):
|
|
return ["[Interface]", *self._flat_dict(self.Interface)]
|
|
|
|
def _get_peer_lines(self):
|
|
return sum([self._flat_peer(p) for p in self.Peers], [])
|
|
|
|
def get_peers_cfg(self):
|
|
return os.linesep.join(self._get_peer_lines())
|
|
|
|
def __str__(self) -> str:
|
|
strs = self._flat_interface() + self._get_peer_lines()
|
|
return os.linesep.join(strs)
|
|
|
|
|
|
def wg_reload_config(ifname: str):
|
|
run(
|
|
f"wg syncconf {ifname} <(wg-quick strip {ifname})",
|
|
shell=True,
|
|
executable="/bin/bash",
|
|
)
|
|
|
|
|
|
def wg_showconf(ifname: str):
|
|
return run(
|
|
f"wg showconf {ifname}",
|
|
shell=True,
|
|
capture_output=True,
|
|
).stdout.decode()
|
|
|
|
|
|
def wg_syncconf(ifname: str, conf: str):
|
|
run(f"wg syncconf {ifname} /dev/stdin", shell=True, input=conf.encode())
|