rewrite with dataclasses

Adds 'managed' option and yaml support, renames env vars and can load profiles by name.
Closes #4.
pull/8/head v2.0.0
dadevel 2 years ago
parent 42f67672f0
commit 59a67b445c
No known key found for this signature in database
GPG Key ID: 1A8A9735430193D5

@ -10,6 +10,7 @@ Requirements:
- Python 3.7 or newer
- `ip` from iproute2
- `wg` from wireguard-tools
- optional: [pyyaml](https://pypi.org/project/PyYAML/) python package for configuration files in YAML format, otherwise only JSON is supported
Installation:
@ -21,9 +22,9 @@ sudo ./wg-netns/setup.sh
## Usage
First, create a configuration profile.
You can find two examples below.
JSON and YAML file formats are supported.
`./mini.json`:
Minimal JSON example:
~~~ json
{
@ -45,59 +46,76 @@ You can find two examples below.
}
~~~
`./maxi.json`:
~~~ json
{
"name": "ns-example",
"dns-server": ["10.10.10.1", "10.10.10.2"],
"pre-up": "some shell command",
"post-up": "some shell command",
"pred-own": "some shell command",
"post-down": "some shell command",
"interfaces": [
{
"name": "wg-site-a",
"address": ["10.10.11.172/32", "fc00:dead:beef:1::172/128"],
"listen-port": 51821,
"fwmark": 51821,
"private-key": "nFkQQjN+...",
"mtu": 1420,
"peers": [
{
"public-key": "Kx+wpJpj...",
"preshared-key": "5daskLoW...",
"endpoint": "a.example.com:51821",
"persistent-keepalive": 25,
"allowed-ips": ["10.10.11.0/24", "fc00:dead:beef:1::/64"]
}
]
},
{
"name": "wg-site-b",
"address": ["10.10.12.172/32", "fc00:dead:beef:2::172/128"],
"listen-port": 51822,
"fwmark": 51822,
"private-key": "guYPuE3X...",
"mtu": 1420,
"peers": [
{
"public-key": "NvZMoyrg...",
"preshared-key": "cFQuyIX/...",
"endpoint": "b.example.com:51822",
"persistent-keepalive": 25,
"allowed-ips": ["10.10.12.0/24", "fc00:dead:beef:2::/64"]
}
]
}
]
}
Full YAML example:
~~~ yaml
# name of the network namespace
name: ns-example
# if false, the netns itself won't be created or deleted, just the interfaces inside it
managed: true
# list of dns servers, if empty dns servers from default netns will be used
dns-server: [10.10.10.1, 10.10.10.2]
# shell hooks, e.g. to set firewall rules
pre-up: echo pre-up
post-up: echo post-up
pre-own: echo pre-down
post-down: echo post-down
# list of wireguard interfaces inside the netns
interfaces:
# interface name, required
- name: wg-site-a
# list of ip addresses, at least one entry required
address:
- 10.10.11.172/32
- fc00:dead:beef:1::172/128
private-key: nFkQQjN+...
# optional settings
listen-port: 51821
fwmark: 21
mtu: 1420
# list of wireguard peers
peers:
# public key is required
- public-key: Kx+wpJpj...
# optional settings
preshared-key: 5daskLoW...
endpoint: a.example.com:51821
persistent-keepalive: 25
# list of ips the peer is allowed to use, at least one entry required
allowed-ips:
- 10.10.11.0/24
- fc00:dead:beef:1::/64
# by default the networks specified in 'allowed-ips' are routed over the interface, 'routes' can be used to overwrite this behaivor
routes:
- 10.10.11.0/24
- fc00:dead:beef:1::/64
- name: wg-site-b
address:
- 10.10.12.172/32
- fc00:dead:beef:2::172/128
private-key: guYPuE3X...
listen-port: 51822
fwmark: 22
peers:
- public-key: NvZMoyrg...
preshared-key: cFQuyIX/...
endpoint: b.example.com:51822
persistent-keepalive: 25
allowed-ips:
- 10.10.12.0/24
- fc00:dead:beef:2::/64
~~~
Now it's time to setup your new network namespace and all associated wireguard interfaces.
~~~ bash
wg-netns up ./example.json
wg-netns up ./example.yaml
~~~
Profiles stored under `/etc/wireguard/` can be referenced by their name.
~~~ bash
wg-netns up example
~~~
You can verify the success with a combination of `ip` and `wg`.
@ -115,7 +133,7 @@ ip netns exec ns-example bash -i
Or connect a container to it.
~~~ bash
podman run -it --rm --network ns:/var/run/netns/ns-example docker.io/alpine wget -O - https://ipinfo.io
podman run -it --rm --network ns:/run/netns/ns-example alpine wget -O - https://ipinfo.io
~~~
Or do whatever else you want.
@ -123,6 +141,11 @@ Or do whatever else you want.
### System Service
You can find a `wg-quick@.service` equivalent at [wg-netns@.service](./wg-netns@.service).
Place your profile in `/etc/wireguard/`, e.g. `example.json`, then start the service.
~~~ bash
systemctl start wg-netns@example.service
~~~
### Port Forwarding
@ -132,12 +155,12 @@ With `socat` you can forward TCP traffic from outside a network namespace to a p
socat tcp-listen:$OUTSIDE_PORT,reuseaddr,fork "exec:ip netns exec $NETNS_NAME socat stdio 'tcp-connect:$INSIDE_PORT',nofork"
~~~
Example: All connections to port 1234/tcp in the main netns are forwarded into the *ns-example* namespace to port 5678/tcp.
Example: All connections to port 1234/tcp in the main/default netns are forwarded to port 5678/tcp in the *ns-example* namespace.
~~~ bash
# terminal 1, create netns and start http server inside
wg-netns up ns-example
hello > ./hello.txt
echo 'Hello from ns-example!' > ./hello.txt
ip netns exec ns-example python3 -m http.server 5678
# terminal 2, setup port forwarding
socat tcp-listen:1234,reuseaddr,fork "exec:ip netns exec ns-example socat stdio 'tcp-connect:127.0.0.1:5678',nofork"

@ -1,200 +1,288 @@
#!/usr/bin/env python3
from __future__ import annotations
from argparse import ArgumentParser, RawDescriptionHelpFormatter
from pathlib import Path
from typing import Any, Optional
import dataclasses
import json
import os
import subprocess
import sys
NETNS_CONFIG_DIR = '/etc/netns'
DEBUG_LEVEL = 0
SHELL = '/bin/sh'
try:
import yaml
YAML_SUPPORTED = True
except ModuleNotFoundError:
YAML_SUPPORTED = False
WIREGUARD_DIR = Path('/etc/wireguard')
NETNS_DIR = Path('/etc/netns')
VERBOSE = 0
SHELL = Path('/bin/sh')
def main(args):
global NETNS_CONFIG_DIR
global DEBUG_LEVEL
global WIREGUARD_DIR
global NETNS_DIR
global VERBOSE
global SHELL
entrypoint = ArgumentParser(
formatter_class=RawDescriptionHelpFormatter,
epilog=(
'environment variables:\n'
f' NETNS_CONFIG_DIR network namespace config directory, default: {NETNS_CONFIG_DIR}\n'
f' DEBUG_LEVEL print stack traces, default: {DEBUG_LEVEL}\n'
f' SHELL program for execution of shell hooks, default: {SHELL}\n'
f' WG_PROFILE_DIR wireguard config dir, default: {WIREGUARD_DIR}\n'
f' WG_NETNS_DIR network namespace config dir, default: {NETNS_DIR}\n'
f' WG_VERBOSE print detailed output if 1, default: {VERBOSE}\n'
f' WG_SHELL program for execution of shell hooks, default: {SHELL}\n'
),
)
subparsers = entrypoint.add_subparsers(dest='action', required=True)
subparsers = entrypoint.add_subparsers(dest='action', required=True, metavar='ACTION')
parser = subparsers.add_parser('up', help='setup namespace and associated interfaces')
parser.add_argument('profile', type=lambda x: Path(x).expanduser(), help='path to profile')
parser.add_argument('profile', type=lambda x: Path(x).expanduser(), metavar='PROFILE', help='name or path of profile')
parser = subparsers.add_parser('down', help='teardown namespace and associated interfaces')
parser.add_argument('-f', '--force', action='store_true', help='ignore errors')
parser.add_argument('profile', type=lambda x: Path(x).expanduser(), help='path to profile')
parser.add_argument('profile', type=lambda x: Path(x).expanduser(), metavar='PROFILE', help='name or path of profile')
opts = entrypoint.parse_args(args)
try:
NETNS_CONFIG_DIR = Path(os.environ.get('NETNS_CONFIG_DIR', NETNS_CONFIG_DIR))
DEBUG_LEVEL = int(os.environ.get('DEBUG_LEVEL', DEBUG_LEVEL))
SHELL = Path(os.environ.get('SHELL', SHELL))
WIREGUARD_DIR = Path(os.environ.get('WG_PROFILE_DIR', WIREGUARD_DIR))
NETNS_DIR = Path(os.environ.get('WG_NETNS_DIR', NETNS_DIR))
VERBOSE = int(os.environ.get('WG_VERBOSE', VERBOSE))
SHELL = Path(os.environ.get('WG_SHELL', SHELL))
except Exception as e:
raise RuntimeError(f'failed to load environment variable: {e} (e.__class__.__name__)') from e
namespace = Namespace.from_profile(opts.profile)
if opts.action == 'up':
setup_action(opts.profile)
try:
namespace.setup()
except KeyboardInterrupt:
namespace.teardown(check=False)
except Exception:
namespace.teardown(check=False)
raise
elif opts.action == 'down':
teardown_action(opts.profile, check=not opts.force)
namespace.teardown(check=not opts.force)
else:
raise RuntimeError('congratulations, you reached unreachable code')
def setup_action(path):
namespace = profile_read(path)
try:
namespace_setup(namespace)
except KeyboardInterrupt:
namespace_teardown(namespace, check=False)
except Exception as e:
namespace_teardown(namespace, check=False)
raise
def teardown_action(path, check=True):
namespace = profile_read(path)
namespace_teardown(namespace, check=check)
def profile_read(path):
with open(path) as file:
return json.load(file)
def namespace_setup(namespace):
if namespace.get('pre-up'):
ip_netns_shell(namespace['pre-up'], netns=namespace)
namespace_create(namespace)
namespace_resolvconf_write(namespace)
for interface in namespace['interfaces']:
interface_setup(interface, namespace)
if namespace.get('post-up'):
ip_netns_shell(namespace['post-up'], netns=namespace)
def namespace_create(namespace):
ip('netns', 'add', namespace['name'])
ip('-n', namespace['name'], 'link', 'set', 'dev', 'lo', 'up')
def namespace_resolvconf_write(namespace):
content = '\n'.join(f'nameserver {server}' for server in namespace.get('dns-server', ()))
if content:
NETNS_CONFIG_DIR.joinpath(namespace['name']).mkdir(parents=True, exist_ok=True)
NETNS_CONFIG_DIR.joinpath(namespace['name']).joinpath('resolv.conf').write_text(content)
def namespace_teardown(namespace, check=True):
if namespace.get('pre-down'):
ip_netns_shell(namespace['pre-down'], netns=namespace)
for interface in namespace['interfaces']:
interface_teardown(interface, namespace)
namespace_delete(namespace)
namespace_resolvconf_delete(namespace)
if namespace.get('post-down'):
ip_netns_shell(namespace['post-down'], netns=namespace)
def namespace_delete(namespace, check=True):
ip('netns', 'delete', namespace['name'], check=check)
def namespace_resolvconf_delete(namespace):
path = NETNS_CONFIG_DIR/namespace['name']/'resolv.conf'
if path.exists():
path.unlink()
try:
NETNS_CONFIG_DIR.rmdir()
except OSError:
pass
def interface_setup(interface, namespace):
interface_create(interface, namespace)
interface_configure_wireguard(interface, namespace)
for peer in interface['peers']:
peer_setup(peer, interface, namespace)
interface_assign_addresses(interface, namespace)
interface_bring_up(interface, namespace)
interface_create_routes(interface, namespace)
def interface_create(interface, namespace):
ip('link', 'add', interface['name'], 'type', 'wireguard')
ip('link', 'set', interface['name'], 'netns', namespace['name'])
def interface_configure_wireguard(interface, namespace):
wg('set', interface['name'], 'listen-port', interface.get('listen-port', 0), netns=namespace)
wg('set', interface['name'], 'fwmark', interface.get('fwmark', 0), netns=namespace)
wg('set', interface['name'], 'private-key', '/dev/stdin', stdin=interface['private-key'], netns=namespace)
def interface_assign_addresses(interface, namespace):
for address in interface['address']:
ip('-n', namespace['name'], '-6' if ':' in address else '-4', 'address', 'add', address, 'dev', interface['name'])
def interface_bring_up(interface, namespace):
ip('-n', namespace['name'], 'link', 'set', 'dev', interface['name'], 'mtu', interface.get('mtu', 1420), 'up')
def interface_create_routes(interface, namespace):
for peer in interface['peers']:
networks = peer['routes'] if 'routes' in peer else peer.get('allowed-ips', ())
for network in networks:
ip('-n', namespace['name'], '-6' if ':' in network else '-4', 'route', 'add', network, 'dev', interface['name'])
def interface_teardown(interface, namespace, check=True):
ip('-n', namespace['name'], 'link', 'set', interface['name'], 'down', check=check)
ip('-n', namespace['name'], 'link', 'delete', interface['name'], check=check)
def peer_setup(peer, interface, namespace):
options = [
'peer', peer['public-key'],
'preshared-key', '/dev/stdin' if peer.get('preshared-key') else '/dev/null',
'persistent-keepalive', peer.get('persistent-keepalive', 0),
]
if peer.get('endpoint'):
options.extend(('endpoint', peer.get('endpoint')))
if peer.get('allowed-ips'):
options.extend(('allowed-ips', ','.join(peer['allowed-ips'])))
wg('set', interface['name'], *options, stdin=peer.get('preshared-key'), netns=namespace)
def wg(*args, **kwargs):
return ip_netns_exec('wg', *args, **kwargs)
def ip_netns_shell(*args, **kwargs):
return ip_netns_exec(SHELL, '-c', *args, **kwargs)
def ip_netns_exec(*args, netns=None, **kwargs):
return ip('netns', 'exec', netns['name'], *args, **kwargs)
def ip(*args, **kwargs):
return run('ip', *args, **kwargs)
def run(*args, stdin=None, check=True, capture=False):
@dataclasses.dataclass
class Peer:
name: str
public_key: str
preshared_key: Optional[str] = None
endpoint: Optional[str] = None
persistent_keepalive: int = 0
allowed_ips: list[str] = dataclasses.field(default_factory=list)
routes: Optional[list[str]] = None
@classmethod
def from_dict(cls, data: dict[str, Any]) -> Peer:
data = {key.replace('-', '_'): value for key, value in data.items()}
return cls(**data)
def setup(self, interface: Interface, namespace: Namespace) -> Peer:
options = [
'peer', self.public_key,
'preshared-key', '/dev/stdin' if self.preshared_key else '/dev/null',
'persistent-keepalive', self.persistent_keepalive,
]
if self.endpoint:
options.extend(('endpoint', self.endpoint))
if self.allowed_ips:
options.extend(('allowed-ips', ','.join(self.allowed_ips)))
wg('set', interface.name, *options, stdin=self.preshared_key, netns=namespace.name)
return self
@dataclasses.dataclass
class Interface:
name: str
public_key: str
private_key: str
address: list[str] = dataclasses.field(default_factory=list)
listen_port: int = 0
fwmark: int = 0
mtu: int = 1420
peers: list[Peer] = dataclasses.field(default_factory=list)
@classmethod
def from_dict(cls, data: dict[str, Any]) -> Interface:
peers = data.pop('peers', list())
peers = [Peer.from_dict({key.replace('-', '_'): value for key, value in peer.items()}) for peer in peers]
return cls(**data, peers=peers)
def setup(self, namespace: Namespace) -> Interface:
self._create(namespace)
self._configure_wireguard(namespace)
for peer in self.peers:
peer.setup(self, namespace)
self._assign_addresses(namespace)
self._bring_up(namespace)
self._create_routes(namespace)
return self
def _create(self, namespace: Namespace) -> None:
ip('link', 'add', self.name, 'type', 'wireguard')
ip('link', 'set', self.name, 'netns', namespace.name)
def _configure_wireguard(self, namespace: Namespace) -> None:
wg('set', self.name, 'listen-port', self.listen_port, netns=namespace.name)
wg('set', self.name, 'fwmark', self.fwmark, netns=namespace.name)
wg('set', self.name, 'private-key', '/dev/stdin', stdin=self.private_key, netns=namespace.name)
def _assign_addresses(self, namespace: Namespace) -> None:
for address in self.address:
ip('-n', namespace.name, '-6' if ':' in address else '-4', 'address', 'add', address, 'dev', self.name)
def _bring_up(self, namespace: Namespace) -> None:
ip('-n', namespace.name, 'link', 'set', 'dev', self.name, 'mtu', self.mtu, 'up')
def _create_routes(self, namespace: Namespace):
for peer in self.peers:
networks = peer.routes if peer.routes is not None else peer.allowed_ips
for network in networks:
ip('-n', namespace.name, '-6' if ':' in network else '-4', 'route', 'add', network, 'dev', self.name)
def teardown(self, namespace: Namespace, check=True) -> Interface:
if self.exists(namespace):
ip('-n', namespace.name, 'link', 'set', self.name, 'down', check=check)
ip('-n', namespace.name, 'link', 'delete', self.name, check=check)
return self
def exists(self, namespace: Namespace) -> bool:
try:
ip('-n', namespace.name, 'link', 'show', self.name, capture=True)
return True
except Exception:
return False
@dataclasses.dataclass
class Namespace:
name: str
pre_up: Optional[str] = None
post_up: Optional[str] = None
pre_down: Optional[str] = None
post_down: Optional[str] = None
managed: bool = True
dns_server: list[str] = dataclasses.field(default_factory=list)
interfaces: list[Interface] = dataclasses.field(default_factory=list)
@classmethod
def from_profile(cls, path: Path) -> Namespace:
try:
return cls.from_dict(cls._read_profile(cls._find_profile(path)))
except Exception as e:
raise RuntimeError('failed to load profile') from e
@staticmethod
def _find_profile(profile: Path) -> Path:
if not profile.is_file() and profile.name == profile.as_posix(): # path does not contain '/' and '.'
for extension in ('yaml', 'yml', 'json'):
path = WIREGUARD_DIR/f'{profile.name}.{extension}'
if path.is_file():
return path
return profile
@staticmethod
def _read_profile(profile: Path) -> dict[str, Any]:
with open(profile) as file:
if profile.suffix in ('.yaml', '.yml'):
if not YAML_SUPPORTED:
raise RuntimeError(f'can not load profile in yaml format if pyyaml library is not installed')
return yaml.safe_load(file)
elif profile.suffix == '.json':
return json.load(file)
else:
raise RuntimeError(f'unsupported file format {profile.suffix.removeprefix(".")}')
@classmethod
def from_dict(cls, data: dict[str, Any]) -> Namespace:
data = {key.replace('-', '_'): value for key, value in data.items()}
interfaces = data.pop('interfaces', list())
interfaces = [Interface.from_dict({key.replace('-', '_'): value for key, value in interface.items()}) for interface in interfaces]
return cls(**data, interfaces=interfaces)
def setup(self) -> Namespace:
if self.pre_up:
ip_netns_eval(self.pre_up, netns=self.name)
if self.managed:
self._create()
self._write_resolvconf()
for interface in self.interfaces:
interface.setup(self)
if self.post_up:
ip_netns_eval(self.post_up, netns=self.name)
return self
def teardown(self, check=True) -> Namespace:
if self.pre_down:
ip_netns_eval(self.pre_down, netns=self.name)
for interface in self.interfaces:
interface.teardown(self, check=check)
if self.managed and self.exists():
self._delete(check)
self._delete_resolvconf()
if self.post_down:
ip_netns_eval(self.post_down, netns=self.name)
return self
def exists(self) -> bool:
namespaces = json.loads(ip('-j', 'netns', 'list', capture=True))
return self.name in {namespace['name'] for namespace in namespaces}
def _create(self) -> None:
ip('netns', 'add', self.name)
ip('-n', self.name, 'link', 'set', 'dev', 'lo', 'up')
def _delete(self, check=True) -> None:
ip('netns', 'delete', self.name, check=check)
@property
def _resolvconf_path(self) -> Path:
return NETNS_DIR/self.name/'resolv.conf'
def _write_resolvconf(self) -> None:
if self.dns_server:
self._resolvconf_path.parent.mkdir(parents=True, exist_ok=True)
content = '\n'.join(f'nameserver {server}' for server in self.dns_server)
self._resolvconf_path.write_text(content)
def _delete_resolvconf(self) -> None:
if self._resolvconf_path.exists():
self._resolvconf_path.unlink()
try:
NETNS_DIR.rmdir()
except OSError:
pass
def wg(*args, netns: str = None, stdin: str = None, check=True, capture=False) -> str:
return ip_netns_exec('wg', *args, netns=netns, stdin=stdin, check=check, capture=capture)
def ip_netns_eval(*args, netns: str = None, stdin: str = None, check=True, capture=False) -> str:
return ip_netns_exec(SHELL, '-c', *args, netns=netns, stdin=stdin, check=check, capture=capture)
def ip_netns_exec(*args, netns: str = None, stdin: str = None, check=True, capture=False) -> str:
return ip('netns', 'exec', netns, *args, stdin=stdin, check=check, capture=capture)
def ip(*args, stdin: str = None, check=True, capture=False) -> str:
return run('ip', *args, stdin=stdin, check=check, capture=capture)
def run(*args, stdin: str = None, check=True, capture=False) -> str:
args = [str(item) if item is not None else '' for item in args]
if DEBUG_LEVEL:
if VERBOSE:
print('>', ' '.join(args), file=sys.stderr)
process = subprocess.run(args, input=stdin, text=True, capture_output=capture)
if check and process.returncode != 0:
@ -208,7 +296,7 @@ if __name__ == '__main__':
main(sys.argv[1:])
sys.exit(0)
except Exception as e:
if DEBUG_LEVEL:
raise
print(f'error: {e} ({e.__class__.__name__})', file=sys.stderr)
sys.exit(2)
if VERBOSE:
raise
sys.exit(1)

@ -6,13 +6,13 @@ After=network-online.target nss-lookup.target
[Service]
Type=oneshot
Environment=WG_ENDPOINT_RESOLUTION_RETRIES=infinity
Environment=DEBUG_LEVEL=1
ExecStart=wg-netns up ./%i.json
ExecStop=wg-netns down ./%i.json
Environment=WG_VERBOSE=1
ExecStart=wg-netns up %i
ExecStop=wg-netns down %i
RemainAfterExit=yes
WorkingDirectory=%E/wg-netns
ConfigurationDirectory=wg-netns
WorkingDirectory=%E/wireguard
ConfigurationDirectory=wireguard
[Install]
WantedBy=multi-user.target

Loading…
Cancel
Save