|
|
|
@ -4,6 +4,7 @@ from argparse import ArgumentParser, RawDescriptionHelpFormatter
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
from typing import Any, Optional
|
|
|
|
|
import dataclasses
|
|
|
|
|
import itertools
|
|
|
|
|
import json
|
|
|
|
|
import os
|
|
|
|
|
import subprocess
|
|
|
|
@ -58,6 +59,11 @@ def cli(args):
|
|
|
|
|
parser.add_argument('-f', '--force', action='store_true', help='ignore errors')
|
|
|
|
|
parser.add_argument('profile', type=lambda x: Path(x).expanduser(), metavar='PROFILE', help='name or path of profile')
|
|
|
|
|
|
|
|
|
|
parser = subparsers.add_parser('list', help='list network namespaces')
|
|
|
|
|
|
|
|
|
|
parser = subparsers.add_parser('switch', help='open shell in namespace')
|
|
|
|
|
parser.add_argument('netns', metavar='NETNS', help='network namespace name')
|
|
|
|
|
|
|
|
|
|
opts = entrypoint.parse_args(args)
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
@ -68,8 +74,9 @@ def cli(args):
|
|
|
|
|
except Exception as e:
|
|
|
|
|
raise RuntimeError(f'failed to load environment variable: {e} (e.__class__.__name__)') from e
|
|
|
|
|
|
|
|
|
|
namespace = Namespace.from_profile(opts.profile)
|
|
|
|
|
if opts.action == 'up':
|
|
|
|
|
_conditional_elevate()
|
|
|
|
|
namespace = Namespace.from_profile(opts.profile)
|
|
|
|
|
try:
|
|
|
|
|
namespace.setup()
|
|
|
|
|
except KeyboardInterrupt:
|
|
|
|
@ -78,11 +85,24 @@ def cli(args):
|
|
|
|
|
namespace.teardown(check=False)
|
|
|
|
|
raise
|
|
|
|
|
elif opts.action == 'down':
|
|
|
|
|
_conditional_elevate()
|
|
|
|
|
namespace = Namespace.from_profile(opts.profile)
|
|
|
|
|
namespace.teardown(check=not opts.force)
|
|
|
|
|
elif opts.action == 'list':
|
|
|
|
|
output = ip('-json', 'netns', capture=True)
|
|
|
|
|
data = json.loads(output)
|
|
|
|
|
print('\n'.join(item['name'] for item in data))
|
|
|
|
|
elif opts.action == 'switch':
|
|
|
|
|
os.execvp('sudo', ['ip', 'ip', 'netns', 'exec', opts.netns, 'sudo', '-u', os.getlogin(), '-D', Path.cwd().as_posix(), os.environ['SHELL'], '-i'])
|
|
|
|
|
else:
|
|
|
|
|
raise RuntimeError('congratulations, you reached unreachable code')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _conditional_elevate() -> None:
|
|
|
|
|
if os.getuid() != 0 and os.isatty(sys.stdin.fileno()):
|
|
|
|
|
os.execvp('sudo', [sys.argv[0], *sys.argv])
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@dataclasses.dataclass
|
|
|
|
|
class Peer:
|
|
|
|
|
public_key: str
|
|
|
|
|