|
|
|
@ -134,7 +134,7 @@ class Peer:
|
|
|
|
|
@dataclasses.dataclass
|
|
|
|
|
class Interface:
|
|
|
|
|
name: str
|
|
|
|
|
base_netns: str
|
|
|
|
|
base_netns: str|None = None
|
|
|
|
|
private_key: Optional[str] = None
|
|
|
|
|
public_key: Optional[str] = None
|
|
|
|
|
address: list[str] = dataclasses.field(default_factory=list)
|
|
|
|
@ -144,7 +144,7 @@ class Interface:
|
|
|
|
|
peers: list[Peer] = dataclasses.field(default_factory=list)
|
|
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
|
def from_dict(cls, data: dict[str, Any], base_netns=None) -> Interface:
|
|
|
|
|
def from_dict(cls, data: dict[str, Any], base_netns: str|None = None) -> Interface:
|
|
|
|
|
peers = data.pop('peers', list())
|
|
|
|
|
peers = [Peer.from_dict({key.replace('-', '_'): value for key, value in peer.items()}) for peer in peers]
|
|
|
|
|
return cls(**data, peers=peers, base_netns=base_netns)
|
|
|
|
@ -293,7 +293,7 @@ class Namespace:
|
|
|
|
|
interfaces = data.pop('interfaces', list())
|
|
|
|
|
base_netns = data.pop('base_netns', None)
|
|
|
|
|
interfaces = [Interface.from_dict({key.replace('-', '_'): value for key, value in interface.items()}, base_netns=base_netns) for interface in interfaces]
|
|
|
|
|
return cls(**data, **scriptlets, interfaces=interfaces)
|
|
|
|
|
return cls(**data, **scriptlets, interfaces=interfaces) # type: ignore
|
|
|
|
|
|
|
|
|
|
def setup(self) -> Namespace:
|
|
|
|
|
if self.managed:
|
|
|
|
@ -349,27 +349,27 @@ class Namespace:
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def wg(*args, netns: str = None, stdin: str = None, check=True, capture=False) -> str:
|
|
|
|
|
def wg(*args, netns: str|None = None, stdin: str|None = None, check=True, capture=False) -> str:
|
|
|
|
|
return ip_netns_exec('wg', *args, netns=netns, stdin=stdin, check=check, capture=capture)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def ip_netns_eval(*args, netns: str = None, stdin: str = None, check=True, capture=False) -> str:
|
|
|
|
|
def ip_netns_eval(*args, netns: str|None = None, stdin: str|None = None, check=True, capture=False) -> str:
|
|
|
|
|
return ip_netns_exec(SHELL, '-c', *args, netns=netns, stdin=stdin, check=check, capture=capture)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def ip_netns_exec(*args, netns: str = None, stdin: str = None, check=True, capture=False) -> str:
|
|
|
|
|
def ip_netns_exec(*args, netns: str|None = None, stdin: str|None = None, check=True, capture=False) -> str:
|
|
|
|
|
return ip('netns', 'exec', netns, *args, stdin=stdin, check=check, capture=capture)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def ip(*args, stdin: str = None, netns=None, check=True, capture=False) -> str:
|
|
|
|
|
def ip(*args, stdin: str|None = None, netns=None, check=True, capture=False) -> str:
|
|
|
|
|
return run('ip', *([] if netns is None else ['-n', netns]), *args, stdin=stdin, check=check, capture=capture)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def host_eval(*args, stdin: str = None, check=True, capture=False) -> str:
|
|
|
|
|
def host_eval(*args, stdin: str|None = None, check=True, capture=False) -> str:
|
|
|
|
|
return run(SHELL, '-c', *args, stdin=stdin, check=check, capture=capture)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def run(*args, stdin: str = None, check=True, capture=False) -> str:
|
|
|
|
|
def run(*args, stdin: str|None = None, check=True, capture=False) -> str:
|
|
|
|
|
args = [str(item) if item is not None else '' for item in args]
|
|
|
|
|
if VERBOSE:
|
|
|
|
|
print('>', ' '.join(args), file=sys.stderr)
|
|
|
|
|