Introduce infrastructure for rewriting boot configs and refreshed

persistence support for debians.
  * Boot parameters have to be 'persistence' and 'persistence-path'
  rather than 'persistent' and 'persistent-path'.
  * Backing file must be named 'persistence' rather than 'live-rw'
  * Filesystem on the backing file must contain 'persistence.conf'.
Avoid rewriting of paths starting with '/cdrom/' or '/dev/'.
Try somedir/vmlinuz if somedir/vmlinuz.efi is unavailable.
Check availability of 'e2fsck' and 'resize2fs'.
Added data/tools/persistence.gz that has initialized ext3 file
system that contains 'persistence.conf' file.
pull/299/head
Shinji Suzuki 6 years ago
parent 03077403d2
commit 7a18b71349

Binary file not shown.

@ -516,6 +516,12 @@ Are you SURE you want to enable it?",
'Please select the partition (ending '
'with a digit eg. /dev/sdb1)\nfrom the drop down list.')
self.ui_enable_controls()
elif 0 < config.persistence and \
persistence.detect_missing_tools(config.distro):
QtWidgets.QMessageBox.information(
self, 'Missing tools...!',
persistence.detect_missing_tools(config.distro))
self.ui_enable_controls()
else:
# clean_iso_cfg_ext_dir(os.path.join(multibootusb_host_dir(), "iso_cfg_ext_dir")) # Need to be cleaned.
# extract_cfg_file(config.image_path) # Extract files from ISO

@ -0,0 +1,84 @@
from functools import partial
# Operations
def op_add_tokens(tokens, params):
return params + [t for t in tokens if t not in params]
def add_tokens(*tokens):
return partial(op_add_tokens, tokens)
def op_remove_tokens(tokens, params):
return [x for x in params if x not in tokens]
def remove_tokens(*tokens):
return partial(op_remove_tokens, tokens)
def op_replace_token(old_token, new_token, params):
return [new_token if x==old_token else x for x in params]
def replace_token(old_token, new_token):
return partial(op_replace_token, old_token, new_token)
def op_add_or_replace_kv(key, value, params):
assert key.endswith('=')
if any([x.startswith(key) for x in params]):
return op_replace_kv(key, value, params)
return params + [(key + (value(key, None, params)
if callable(value) else value))]
def add_or_replace_kv(key, value):
assert key.endswith('=')
return partial(op_add_or_replace_kv, key, value)
def op_replace_kv(key, value, params):
''' Replace {value} of {key} if the key already exists in {params}.
{value} may be a callable. In that case {value} will be called with
key, definition to be replaced and {params}.
'''
assert key.endswith('=')
return [key +
(value(key, x[len(key):], params) if callable(value) else value)
if x.startswith(key)
else x for x in params]
def replace_kv(key, value):
assert key.endswith('=')
return partial(op_replace_kv, key, value)
def op_remove_keys(keys, params):
return [x for x in params if all([not x.startswith(k) for k in keys])]
def remove_keys(*keys):
assert all([k.endswith('=') for k in keys])
return partial(op_remove_keys, keys)
# Predicates
def always(params):
return True
def contains_token(token):
return lambda params: token in params
def contains_all_tokens(*tokens):
return lambda params: all([t in params for t in tokens])
def contains_any_token(*tokens):
return lambda params: any([t in params for t in tokens])
def contains_key(key):
assert type(key)==str
return lambda params: any(x.startswith(key) for x in params)
def contains_all_keys(*keys):
assert all([k.endswith('=') for k in keys])
return lambda params: all(any(p.startswith(k) for p in params)
for k in keys)
def contains_any_key(*keys):
return lambda params: any(any(p.startswith(k) for p in params)
for k in keys)
def _not(another_predicate):
return lambda params: not another_predicate(params)

@ -6,10 +6,13 @@
# Licence: This file is a part of multibootusb package. You can redistribute it or modify
# under the terms of GNU General Public License, v.2 or above
from functools import partial
import os
import platform
import tarfile
import stat
import subprocess
import tarfile
from . import iso
from . import gen
from . import config
@ -51,38 +54,29 @@ def persistence_distro(distro, iso_link):
else:
return None
def create_persistence():
if config.distro == "ubuntu":
fs_name = 'casper-rw'
elif config.distro == 'debian' or config.distro == "debian-install":
fs_name = 'live-rw'
elif config.distro == 'fedora':
fs_name = 'overlay-' + config.usb_label + '-' + config.usb_uuid
persistence = config.persistence / 1024 / 1024
if platform.system() == 'Linux':
def create_persistence_using_mkfs(persistence_fname, persistence_size):
persistence_path = os.path.join(config.usb_mount, 'multibootusb',
iso.iso_basename(config.image_path),
persistence_fname)
volume_name = os.path.basename(persistence_fname)
if platform.system() == 'Windows':
tools_dir = os.path.join('data', 'tools')
mke2fs_relative_path = os.path.join(tools_dir, 'mkfs', 'mke2fs.exe')
mkfs = gen.resource_path(mke2fs_relative_path)
dd_relative_path = os.path.join(tools_dir, 'dd', 'dd.exe')
dd = gen.resource_path(dd_relative_path)
persistence_mkfs_cmd = 'echo y|' + mkfs + ' -b 1024 ' + \
'-L ' + volume_name + ' ' + \
persistence_path
else:
mkfs = 'mkfs.ext3'
dd = 'dd'
persistence_mkfs_cmd = mkfs + ' -F ' + gen.quote(os.path.join(config.usb_mount, 'multibootusb',
iso.iso_basename(config.image_path),
fs_name))
elif platform.system() == 'Windows':
mkfs = gen.quote(gen.resource_path(os.path.join("data", "tools", "mkfs", "mke2fs.exe")))
dd = gen.quote(gen.resource_path(os.path.join("data", "tools", "dd", "dd.exe")))
persistence_mkfs_cmd = 'echo y|' + mkfs + ' -b 1024 -L ' + fs_name + ' ' + gen.quote(os.path.join(config.usb_mount, 'multibootusb',
iso.iso_basename(config.image_path), fs_name))
if config.distro == 'fedora':
persistence_dd_cmd = dd + ' if=/dev/zero ' \
'of=' + gen.quote(os.path.join(config.usb_mount, 'multibootusb',
iso.iso_basename(config.image_path), 'LiveOS', fs_name)) + \
' bs=1M count=' + str(int(persistence))
else:
persistence_dd_cmd = dd + ' if=/dev/zero of=' + gen.quote(os.path.join(config.usb_mount, 'multibootusb',
iso.iso_basename(config.image_path), fs_name)) +\
' bs=1M count=' + str(int(persistence))
persistence_mkfs_cmd = mkfs + ' -F ' + persistence_path
mbytes = persistence_size / 1024 / 1024
persistence_dd_cmd = dd + ' if=/dev/zero ' + \
'of=' + persistence_path + \
' bs=1M count=' + str(int(mbytes))
gen.log('Executing ==>' + persistence_dd_cmd)
config.status_text = 'Creating persistence file...'
@ -99,6 +93,88 @@ def create_persistence():
gen.log("\nSuccessfully applied filesystem...\n")
def create_persistence_using_resize2fs(persistence_fname, persistence_size):
outdir = os.path.join(config.usb_mount, 'multibootusb',
iso.iso_basename(config.image_path))
persistence_path = os.path.join(outdir, persistence_fname)
tools_dir = os.path.join('data', 'tools')
if platform.system()=='Windows':
_7zip_exe = gen.resource_path(
os.path.join(tools_dir, '7zip', '7z.exe'))
else:
_7zip_exe = '7z'
config.status_text = 'Copying persistence file...'
persistence_gz = gen.resource_path(
os.path.join(tools_dir, 'persistence.gz'))
_7zip_cmd = [_7zip_exe, 'x', '-aoa', persistence_gz, '-o' + outdir]
if subprocess.call(_7zip_cmd, shell=True)==0:
gen.log("Generated 'persistence' file in '%s'" % outdir)
if persistence_fname != 'persistence':
os.rename(os.path.join(outdir, 'persistence'), persistence_path)
gen.log("Renamed to '%s'." % persistence_path)
else:
gen.log("Failed to generate persistent file '%s'" % persistence_path)
return
current_size = os.stat(persistence_path)[stat.ST_SIZE]
if current_size < persistence_size:
msg = 'Extending the persistence file by %.1f MB...' % \
((persistence_size - current_size) / float(1024*1024))
config.status_text = msg
gen.log(msg)
with open(persistence_path, 'ab') as f:
_1M_block = b'\0' * (1024*1024)
bytes_left = persistence_size - current_size
while 0<bytes_left:
block_size = min(1024*1024, bytes_left)
f.write(_1M_block[:block_size])
bytes_left -= block_size
config.status_text = 'Resizing the persistence file...'
fsck_cmd = ['e2fsck', '-y', '-f', persistence_path]
if subprocess.call(fsck_cmd, shell=True)==0:
gen.log("Checking the persistence file.")
resize_cmd = ['resize2fs', persistence_path]
if subprocess.call(resize_cmd, shell=True)==0:
gen.log("Successfully resized the persistence file.")
creator_dict = {
'ubuntu' : (create_persistence_using_mkfs,
lambda C: ('casper-rw',)),
'debian' : (create_persistence_using_resize2fs,
lambda C: ('persistence',)),
'debian-install' : (
create_persistence_using_resize2fs,
lambda C: ('persistence',)),
'fedora' : (
create_persistence_using_mkfs,
lambda C: (os.path.join(
'LiveOS', 'overlay-%s-%s' % (C.usb_label, C.usb_uuid)),)),
}
def detect_missing_tools(distro):
if distro not in creator_dict or \
creator_dict[distro][0] is not create_persistence_using_resize2fs:
return None
try:
with open(os.devnull) as devnull:
for tool in ['e2fsck', 'resize2fs']:
subprocess.Popen([tool], stdout=devnull, stderr=devnull)
except FileNotFoundError: # Windows
return "'%s.exe' is not installed or not available for use." % tool
except OSError: # Linux
return "'%s' is not installed or not available for use." % tool
return None
def create_persistence():
x = creator_dict.get(config.distro)
assert x, "Persistence is not supported for '%s'." % config.distro
creator_func, args_generator = x
args = args_generator(config) + (config.persistence,)
creator_func(*args)
def extract_file(file_path, install_dir):
"""
Function to extract persistence files to distro install directory.
@ -109,3 +185,12 @@ def extract_file(file_path, install_dir):
tar = tarfile.open(file_path, "r:bz2")
tar.extractall(install_dir)
tar.close()
def test():
config.image_path = 'c:/Users/shinj/Downloads/' \
'Fedora-Workstation-Live-x86_64-27-1.6.iso'
config.persistence = 1024 * 1024 * 20
config.distro = 'fedora'
config.usb_mount = 'h:\\'
create_persistence()

@ -9,6 +9,8 @@
import os
import re
import shutil
from functools import partial
from .usb import *
from .gen import *
from .iso import *
@ -16,42 +18,80 @@ from . import config
from . import grub
from . import menus
from .param_rewrite import add_tokens, remove_tokens, replace_token, \
add_or_replace_kv, replace_kv, remove_keys, \
always, contains_all_tokens, contains_any_token, contains_key, \
contains_all_keys, contains_any_key, _not
def dont_require_tweaking(fname):
return fname.startswith(('cdrom/', 'dev/'))
def fix_abspath_r(pattern, string, install_dir, iso_name):
def fix_abspath_r(pattern, string, install_dir, iso_name, config_fname):
"""Return a list of tuples consisting of 'string' with replaced path and a bool representing if /boot/ was prepended in the expression."""
m = pattern.search(string)
if not m:
return [(string, False)]
start, end = m.span()
prologue, specified_path = m.group(1), m.group(2)
if dont_require_tweaking(specified_path):
return [(string[:start] + prologue + '/' + specified_path,
'/%s is white-listed and kept as is.' % specified_path)] \
+ fix_abspath_r(pattern, string[end:], install_dir, iso_name,
config_fname)
# See if a path that has 'boot/' prepended is a better choice.
# E.g. Debian debian-live-9.4.0-amd64-cinnamon has a loopback.cfg
# which contains "source /grub/grub.cfg".
if os.path.exists(os.path.join(install_dir, 'boot', specified_path)) \
and not os.path.exists(os.path.join(install_dir, specified_path)):
selected_path, fixed = 'boot/' + specified_path, True
specified_path_exists = os.path.exists(
os.path.join(install_dir, specified_path))
if specified_path_exists:
# Confidently accept what is specified.
selected_path, fixed = specified_path, False
elif os.path.exists(os.path.join(install_dir, 'boot', specified_path)):
selected_path, fixed = 'boot/' + specified_path, "Prepended '/boot/'"
# A path specified by 'preseed/file=' or 'file=' is utilized
# after OS boots up. Doing this for grub is moot.
#elif specified_path.startswith('cdrom/') and \
# os.path.exists(os.path.join(install_dir, # len('cdrom/') => 6
# specified_path[6:])):
# # See /boot/grub/loopback.cfg in
# # ubuntu-14.04.5-desktop-amd64.iso for an example of this case.
# selected_path, fixed = specified_path[6:], "Removed '/cdrom/'"
elif specified_path.endswith('.efi') and \
os.path.exists(os.path.join(install_dir, specified_path[:-4])):
# Avira-RS provides boot/grub/loopback.cfg which points
# to non-existent /boot/grub/vmlinuz.efi.
selected_path, fixed = specified_path[:-4], "Removed '.efi'"
else:
# Reluctantly accept what is specified.
log("Keeping path [%s] in '%s' though it does not exist." % (
specified_path, config_fname))
selected_path, fixed = specified_path, False
out = string[:start] + prologue + '/multibootusb/' + iso_name + '/' \
+ selected_path.replace('\\', '/')
return [(out, fixed)] \
+ fix_abspath_r(pattern, string[end:], install_dir, iso_name)
+ fix_abspath_r(pattern, string[end:], install_dir, iso_name,
config_fname)
def fix_abspath(string, install_dir, iso_name):
def fix_abspath(string, install_dir, iso_name, config_fname):
"""Rewrite what appear to be a path within 'string'. If a file does not exist with specified path, one with '/boot' prepended is tried."""
path_expression = re.compile(r'([ \t=])/(.*?)((?=[\s*])|$)')
chunks = fix_abspath_r(
path_expression, string, install_dir, iso_name)
num_boot_prefixing = len([c for c in chunks if c[1] is True])
if num_boot_prefixing == 0:
path_expression, string, install_dir, iso_name, config_fname)
tweaked_chunks = [c for c in chunks if c[1]]
if len(tweaked_chunks) == 0:
# Fallback to the legacy implementation so that
# this tweak brings as little breakage as possible.
replace_text = r'\1/multibootusb/' + iso_name + '/'
return re.sub(r'([ \t =,])/', replace_text, string)
else:
log("Prepended '/boot' to %s." %
(num_boot_prefixing==1 and 'a path' or
('%d paths' % num_boot_prefixing)))
log("Applied %s on '%s' as shown below:" %
(len(tweaked_chunks)==1 and 'a tweak' or
('%d tweaks' % len(tweaked_chunks)), config_fname))
for path, op_desc in tweaked_chunks:
log(" %s" % op_desc)
return ''.join([c[0] for c in chunks])
def update_distro_cfg_files(iso_link, usb_disk, distro, persistence=0):
@ -67,7 +107,19 @@ def update_distro_cfg_files(iso_link, usb_disk, distro, persistence=0):
config.status_text = "Updating config files..."
_iso_name = iso_basename(iso_link)
install_dir = os.path.join(usb_mount, "multibootusb", _iso_name)
install_dir_for_grub = '/multibootusb/%s' % _iso_name
log('Updating distro specific config files...')
tweaker_params = ConfigTweakerParam(
distro, '/multibootusb/%s' % iso_basename(iso_link),
persistence, usb_uuid)
tweaker_class_dict = {
'ubuntu' : UbuntuConfigTweaker,
'debian' : DebianConfigTweaker,
'debian-install' : DebianConfigTweaker,
}
tweaker_class = tweaker_class_dict.get(distro)
for dirpath, dirnames, filenames in os.walk(install_dir):
for f in filenames:
if f.endswith(".cfg") or f.endswith('.CFG') or f.endswith('.lst') or f.endswith('.conf'):
@ -78,28 +130,14 @@ def update_distro_cfg_files(iso_link, usb_disk, distro, persistence=0):
log("Unable to read %s" % cfg_file)
else:
if not distro == "generic":
string = fix_abspath(string, install_dir, _iso_name)
string = fix_abspath(string, install_dir, _iso_name,
os.path.join(dirpath, f))
string = re.sub(r'linuxefi', 'linux', string)
string = re.sub(r'initrdefi', 'initrd', string)
if distro == "ubuntu":
string = re.sub(r'boot=casper',
'boot=casper cdrom-detect/try-usb=true floppy.allowed_drive_mask=0 ignore_uuid '
'ignore_bootid root=UUID=' + usb_uuid + ' live-media-path=/multibootusb/'
+ iso_basename(iso_link) + '/casper', string)
# Point to correct .seed file
string = re.sub(r'/cdrom/preseed', '/preseed', string)
string = re.sub(r'live-media=\S*', 'live-media=/dev/disk/by-uuid/' + usb_uuid, string)
string = re.sub(r'ui gfxboot', '#ui gfxboot', string)
if persistence != 0:
string = re.sub(r'boot=casper', 'boot=casper persistent persistent-path=/multibootusb/' +
iso_basename(iso_link) + "/", string)
if tweaker_class:
tweaker = tweaker_class(tweaker_params)
string = tweaker.tweak(string)
elif distro in ["debian", "debian-install"]:
string = re.sub(r'boot=live', 'boot=live ignore_bootid live-media-path=/multibootusb/' +
iso_basename(iso_link) + '/live', string)
if persistence != 0:
string = re.sub(r'boot=live', 'boot=live persistent persistent-path=/multibootusb/' +
iso_basename(iso_link) + "/", string)
elif distro == 'grml':
string = re.sub(r'live-media-path=', 'ignore_bootid live-media-path=', string)
elif distro == "ubuntu-server":
@ -292,18 +330,59 @@ def update_distro_cfg_files(iso_link, usb_disk, distro, persistence=0):
update_mbusb_cfg_file(iso_link, usb_uuid, usb_mount, distro)
grub.mbusb_update_grub_cfg()
# Ensure that isolinux.cfg file is copied as syslinux.cfg to boot correctly.
# copy isolinux.cfg file to syslinux.cfg for grub to boot.
def copy_to_syslinux_cfg_callback(dir_, fname):
if not fname.lower().endswith('isolinux.cfg'):
return
isolinux_cfg_path = os.path.join(dir_, fname)
syslinux_cfg_fname = fname.replace('isolinux.cfg','syslinux.cfg')
syslinux_cfg_path = os.path.join(dir_, syslinux_cfg_fname)
if os.path.exists(syslinux_cfg_path):
return # don't overwrite.
try:
shutil.copyfile(isolinux_cfg_path, syslinux_cfg_path)
except Exception as e:
log('Copying %s %s to %s failed...' % (
fname, dir_, syslinux_cfg_fname))
log(e)
def fix_desktop_image_in_thema_callback(install_dir_for_grub,
dir_, fname):
if fname.lower() != 'theme.txt':
return
log("Probing '%s'!" % fname)
theme_file = os.path.join(dir_, fname)
with open(theme_file, 'r') as f:
lines = []
pattern = re.compile(r'^desktop-image\s*:\s*(.*)$')
for line in f.readlines():
line = line.strip()
m = pattern.match(line)
if m:
log("Updating '%s'" % line)
partial_path = m.group(1).strip('"').lstrip('/')
line = 'desktop-image: "%s/%s"' % \
(install_dir_for_grub, partial_path)
lines.append(line)
with open(theme_file, 'w') as f:
f.write('\n'.join(lines))
visitor_callbacks = [
# Ensure that isolinux.cfg file is copied as syslinux.cfg
# to boot correctly.
copy_to_syslinux_cfg_callback,
# Rewrite 'desktop-image: ...' line in a theme definition file
# so that a background image is displaymed during boot item selection.
# This tweak was first introduced for kali-linux-light-2018-1.
partial(fix_desktop_image_in_thema_callback, install_dir_for_grub),
]
# Now visit the tree.
for dirpath, dirnames, filenames in os.walk(install_dir):
for f in filenames:
if f.lower().endswith("isolinux.cfg"):
if not os.path.exists(os.path.join(dirpath, "syslinux.cfg")):
try:
shutil.copyfile(os.path.join(dirpath, f), os.path.join(dirpath, "syslinux.cfg"))
except Exception as e:
log('Copying isolinux to syslinux failed...')
log(e)
else:
continue
for callback in visitor_callbacks:
callback(dirpath, f)
# Assertain if the entry is made..
sys_cfg_file = os.path.join(config.usb_mount, "multibootusb", "syslinux.cfg")
@ -484,8 +563,10 @@ def update_menu_lst():
def update_grub4dos_iso_menu():
sys_cfg_file = os.path.join(config.usb_mount, "multibootusb", "syslinux.cfg")
install_dir = os.path.join(config.usb_mount, "multibootusb", iso_basename(config.image_path))
sys_cfg_file = os.path.join(config.usb_mount, "multibootusb",
"syslinux.cfg")
install_dir = os.path.join(config.usb_mount, "multibootusb",
iso_basename(config.image_path))
menu_lst_file = os.path.join(install_dir, 'menu.lst')
with open(menu_lst_file, "w") as f:
f.write("title Boot " + iso_name(config.image_path) + "\n")
@ -503,3 +584,397 @@ def update_grub4dos_iso_menu():
f.write("KERNEL grub.exe" + "\n")
f.write('APPEND --config-file=/multibootusb/' + iso_basename(config.image_path) + '/menu.lst' + "\n")
f.write("#end " + iso_basename(config.image_path) + "\n")
class ConfigTweakerParam:
def __init__(self, distro_name, distro_path, persistence_size, usb_uuid):
self.distro_name = distro_name
self.distro_path = distro_path # synonym for 'install_dir'
self.persistence_size = persistence_size
self.usb_uuid = usb_uuid
class ConfigTweaker:
BOOT_PARAMS_STARTER = 'kernel|append|linux'
def __init__(self, setup_params):
self.setup_params = setup_params
self.persistence_awareness_checking_re = re.compile(
r'^\s*(%s).*?\s%s(\s.*|)$' % \
(self.BOOT_PARAMS_STARTER, self.PERSISTENCY_TOKEN),
flags=re.I|re.MULTILINE)
def config_is_persistence_aware(self, content):
""" Used to restrict update of boot parameters to persistent-aware
menu entries if the distribution provides any.
"""
return self.persistence_awareness_checking_re.search(content) \
is not None
def tweak_first_match(self, content, kernel_param_line_pattern,
apply_persistence_to_all_lines,
param_operations,
param_operations_for_persistence):
"""Perofrm specified parameter modification to the first maching
line and return the concatination of the string leading up to the
match and the tweaked paramer line. If no match is found,
unmodified 'content' is returned.
"""
m = kernel_param_line_pattern.search(content)
if m is None:
return content
start, end = m.span()
upto_match, rest_of_content = content[:start], content[end:]
starter_part, params_part = [m.group(i) for i in [1, 3]]
params = params_part.split(' ')
if apply_persistence_to_all_lines or self.PERSISTENCY_TOKEN in params:
param_operations = param_operations + \
param_operations_for_persistence
for op_or_op_list, precondition in param_operations:
if not precondition(params):
continue
try:
iter(op_or_op_list)
op_list = op_or_op_list
except TypeError:
op_list = [op_or_op_list]
for op in op_list:
params = op(params)
# I see something special about this param. Place it at the end.
three_dashes = '---'
if three_dashes in params:
params.remove(three_dashes)
params.append(three_dashes)
return upto_match + starter_part + ' '.join(params) + \
self.tweak_first_match(
rest_of_content, kernel_param_line_pattern,
apply_persistence_to_all_lines,
param_operations, param_operations_for_persistence)
def tweak(self, content):
apply_persistence_to_all_lines = \
0 < self.setup_params.persistence_size and \
not self.config_is_persistence_aware(content)
matching_re = r'^(\s*(%s)\s*)(.*%s([ \t].*$|$))' % (
self.BOOT_PARAMS_STARTER, self.LIVE_BOOT_DETECT_PARAM)
kernel_parameter_line_pattern = re.compile(
matching_re,
flags = re.I | re.MULTILINE)
out = self.tweak_first_match(
content,
kernel_parameter_line_pattern,
apply_persistence_to_all_lines,
self.param_operations(),
self.param_operations_for_persistence())
return self.post_process(out)
class ParamTweakerWithDebianStylePersistenceParam(ConfigTweaker):
def param_operations_for_persistence(self):
return [
(add_tokens(self.PERSISTENCY_TOKEN), always),
(add_or_replace_kv('%s-path=' % self.PERSISTENCY_TOKEN,
self.setup_params.distro_path),
always),
]
class UbuntuConfigTweaker(ParamTweakerWithDebianStylePersistenceParam):
LIVE_BOOT_DETECT_PARAM = 'boot=casper'
PERSISTENCY_TOKEN = 'persistent'
def param_operations(self):
return [
(add_tokens('ignore_bootid'), always),
(add_or_replace_kv('live-media-path=',
'%s/casper' % self.setup_params.distro_path),
always),
(add_or_replace_kv('cdrom-detect/try-usb=', 'true'), always),
# Recently, correct param seems to be 'floppy=0,allowed_driver_mask
(add_or_replace_kv('floppy.allowed_drive_mask=', '0'), always),
(add_tokens('ignore_uuid'), always),
(add_or_replace_kv('root=UUID=', self.setup_params.usb_uuid),
always),
(replace_kv('live-media=',
'/dev/disk/by-uuid/%s' % self.setup_params.usb_uuid),
always),
]
def post_process(self, entire_string):
return entire_string.replace(r'ui gfxboot', '#ui gfxboot')
class DebianConfigTweaker(ParamTweakerWithDebianStylePersistenceParam):
LIVE_BOOT_DETECT_PARAM = 'boot=live'
PERSISTENCY_TOKEN = 'persistence'
def param_operations(self):
return [
(add_tokens('ignore_bootid'), always),
(add_or_replace_kv('live-media-path=',
'%s/live' % self.setup_params.distro_path),
always),
]
def post_process(self, entire_string):
return entire_string
def test_rewrite_machinary():
def transform(op_or_oplist, predicate, input_line):
params = input_line.split(' ')
if not predicate(params):
return input_line
# See if op_or_oplist is iterable
try:
iter(op_or_oplist)
except TypeError:
# Otherwise let's assume we have a singleton op here
op_or_oplist = [op_or_oplist]
for op in op_or_oplist:
params = op(params)
return ' '.join(params)
boot_line = "kernel /boot/vmlinuz bar=baz foo bar key2=value2"
print ('Test token addition')
assert transform(add_tokens('foo','more'), always, boot_line)==\
"kernel /boot/vmlinuz bar=baz foo bar key2=value2 more"
print ('Test token replacement')
assert transform(replace_token('foo', 'hum'), always, boot_line)==\
"kernel /boot/vmlinuz bar=baz hum bar key2=value2"
print ('Test token removal')
assert transform(remove_tokens('foo', 'bar'), always, boot_line)==\
"kernel /boot/vmlinuz bar=baz key2=value2"
print ('Test kv add_or_replace (results in append)')
assert transform(add_or_replace_kv('live-path=', '/lib/live'), always,
boot_line)==\
"kernel /boot/vmlinuz bar=baz foo bar key2=value2 " \
"live-path=/lib/live"
print ('Test kv add_or_replace (results in replace)')
assert transform(add_or_replace_kv('bar=', '/lib/live'),
always, boot_line)==\
"kernel /boot/vmlinuz bar=/lib/live foo bar key2=value2"
print ('Test kv replace (results in no change)')
assert transform(replace_kv('live-path=', '/lib/live'), always,
boot_line)==\
"kernel /boot/vmlinuz bar=baz foo bar key2=value2"
print ('Test kv replace (results in replace)')
assert transform(replace_kv('bar=', '/lib/live'), always, boot_line)==\
"kernel /boot/vmlinuz bar=/lib/live foo bar key2=value2"
print ('Test kv replace with computed value (bang! at head).')
assert transform(replace_kv('bar=', lambda k, old_v, params: '!' + old_v),
always, boot_line)==\
"kernel /boot/vmlinuz bar=!baz foo bar key2=value2"
print ('Test key removal')
assert transform(remove_keys('bar=','key2='), always, boot_line)==\
"kernel /boot/vmlinuz foo bar"
print ('Test strip everything (multi-op)')
assert transform([remove_tokens('foo', 'bar', 'kernel', '/boot/vmlinuz'),
remove_keys('bar=', 'key2=')],
always, boot_line)==''
print ('Test condition always')
assert transform(replace_token('bar', 'tail'), always,
boot_line)=="kernel /boot/vmlinuz bar=baz foo tail "\
"key2=value2"
print ('Test condition contains_token (positive)')
assert transform(replace_token('bar', 'tail'), contains_token('kernel'),
boot_line)=="kernel /boot/vmlinuz bar=baz foo tail"\
" key2=value2"
print ('Test condition contains_token (negative)')
assert transform(replace_token('bar', 'tail'), contains_token('initrd'),
boot_line)==boot_line
print ('Test condition contains_all_tokens (positive)')
assert transform(replace_token('bar', 'tail'),
contains_all_tokens('kernel', 'foo'),
boot_line)=="kernel /boot/vmlinuz bar=baz foo tail" \
" key2=value2"
print ('Test condition contains_all_tokens (negative)')
assert transform(replace_token('bar', 'tail'),
contains_all_tokens('kernel', 'nowhere'),
boot_line)==boot_line
print ('Test condition contains_any_token (positive)')
assert transform(replace_token('bar', 'tail'),
contains_any_token('kernel', 'nowhere'),
boot_line)=="kernel /boot/vmlinuz bar=baz foo tail" \
" key2=value2"
print ('Test condition contains_any_token (negative)')
assert transform(replace_token('bar', 'tail'),
contains_any_token('not_anywhere', 'nowhere'),
boot_line)==boot_line
print ('Test condition contains_any_key (positive)')
assert transform(replace_token('bar', 'tail'),
contains_any_key('key2', 'nowhere'),
boot_line)=="kernel /boot/vmlinuz bar=baz foo tail" \
" key2=value2"
print ('Test condition contains_any_key (negative)')
assert transform(replace_token('bar', 'tail'),
contains_any_key('anywhere', 'else'),
boot_line)==boot_line
print ('Test not() predicate on contains_any_token (positive)')
assert transform(replace_token('bar', 'tail'),
_not(contains_any_token('nowhere', 'else')),
boot_line)=="kernel /boot/vmlinuz bar=baz foo tail" \
" key2=value2"
print ('Test not() predicate on contains_any_token (negative)')
assert transform(replace_token('bar', 'tail'),
_not(contains_any_token('nowhere', 'kernel')),
boot_line)==boot_line
print ('Test not() predicate on contains_all_keys (positive)')
assert transform(replace_token('bar', 'tail'),
_not(contains_all_keys('bar=', 'nonexistent_key=')),
boot_line)=="kernel /boot/vmlinuz bar=baz foo tail" \
" key2=value2"
print ('Test not() predicate on contains_all_keys (negative)')
assert transform(replace_token('bar', 'tail'),
_not(contains_all_keys('bar=', 'key2=')),
boot_line)=="kernel /boot/vmlinuz bar=baz foo bar" \
" key2=value2"
def test_tweak_objects():
setup_params_no_persistence = ConfigTweakerParam(
'debian', '/multibootusb/debian', 0, '{usb-uuid}')
debian_tweaker = DebianConfigTweaker(setup_params_no_persistence)
ubuntu_tweaker = UbuntuConfigTweaker(setup_params_no_persistence)
# Test awareness on 'persistent'
content = """
append boot=live foo baz=1 double-spaced ignore_bootid persistent more stuff""".lstrip()
print ("Testing awareness on 'persistent' of ubuntu tweaker.")
assert ubuntu_tweaker.config_is_persistence_aware(content)
print ("Testing awareness on 'persistent' of debian tweaker.")
assert not debian_tweaker.config_is_persistence_aware(content)
content = """
append boot=live foo baz=1 double-spaced ignore_bootid persistence more stuff""".lstrip()
print ("Testing awareness on 'persistence' of ubuntu tweaker.")
assert not ubuntu_tweaker.config_is_persistence_aware(content)
print ("Testing awareness on 'persistence' of debian tweaker.")
assert debian_tweaker.config_is_persistence_aware(content)
print ("Testing if 'persistence' token is left at the original place.")
content = "\tkernel\tfoo persistence boot=live in the middle"
assert debian_tweaker.tweak(content) == "\tkernel\tfoo persistence boot=live in the middle ignore_bootid live-media-path=/multibootusb/debian/live persistence-path=/multibootusb/debian"""
print ("Testing if 'boot=live' at the very end is recognized.")
content = "menu\n\tkernel\tfoo persistence in the middle boot=live"
assert debian_tweaker.tweak(content) == "menu\n\tkernel\tfoo persistence in the middle boot=live ignore_bootid live-media-path=/multibootusb/debian/live persistence-path=/multibootusb/debian"""
print ("Testing if 'boot=live' at a line end is recognized.")
content = """\tkernel\tfoo persistence in the middle boot=live
kernel foo"""
assert debian_tweaker.tweak(content) == """\tkernel\tfoo persistence in the middle boot=live ignore_bootid live-media-path=/multibootusb/debian/live persistence-path=/multibootusb/debian
kernel foo"""
print ("Testing if \\tappend is recognized as a starter.")
content = """\tappend foo boot=live ignore_bootid persistence in the middle live-media-path=/foo/bar"""
assert debian_tweaker.tweak(content) == """\tappend foo boot=live ignore_bootid persistence in the middle live-media-path=/multibootusb/debian/live persistence-path=/multibootusb/debian"""
print ("Testing if debian tweaker does not get tickled by 'persistent'.")
content = """\tappend boot=live foo ignore_bootid persistent in the middle live-media-path=/foo/bar"""
assert debian_tweaker.tweak(content) == """\tappend boot=live foo ignore_bootid persistent in the middle live-media-path=/multibootusb/debian/live"""
print ("Testing replacement of 'live-media-path' value.")
content = " append boot=live foo live-media-path=/foo/bar more"
assert debian_tweaker.tweak(content) == """ append boot=live foo live-media-path=/multibootusb/debian/live more ignore_bootid"""
print ("Testing rewriting of 'file=' param by debian_tweaker.")
content = " kernel file=/cdrom/preseed/ubuntu.seed boot=live"
setup_params_persistent = ConfigTweakerParam(
'debian', '/multibootusb/debian', 128*1024*1024, '{usb-uuid}')
debian_persistence_tweaker = DebianConfigTweaker(
setup_params_persistent)
ubuntu_persistence_tweaker = UbuntuConfigTweaker(
setup_params_persistent)
print ("Testing if debian tweaker appends persistence parameters.")
content = """label foo
kernel foo bar
append boot=live foo live-media-path=/foo/bar more
"""
assert debian_persistence_tweaker.tweak(content) == """label foo
kernel foo bar
append boot=live foo live-media-path=/multibootusb/debian/live more ignore_bootid persistence persistence-path=/multibootusb/debian
"""
print ("Testing if ubuntu tweaker selectively appends persistence params.")
content = """label foo
kernel foo bar
append boot=casper foo live-media-path=/foo/bar more
"""
assert ubuntu_persistence_tweaker.tweak(content) == """label foo
kernel foo bar
append boot=casper foo live-media-path=/multibootusb/debian/casper more ignore_bootid cdrom-detect/try-usb=true floppy.allowed_drive_mask=0 ignore_uuid root=UUID={usb-uuid} persistent persistent-path=/multibootusb/debian
"""
# Test rewrite of persistence-aware configuration.
# Only 'live-persistence' line should receive 'persistence-path'
# parameter.
print ("Testing if debian tweaker appends persistence params "
"to relevant lines only.")
content = """label live-forensic
menu label Live (^forensic mode)
linux /live/vmlinuz
initrd /live/initrd.img
append boot=live noconfig=sudo username=root hostname=kali noswap noautomount
label live-persistence
menu label ^Live USB Persistence (check kali.org/prst)
linux /live/vmlinuz
initrd /live/initrd.img
append boot=live noconfig=sudo username=root hostname=kali persistence
"""
assert debian_persistence_tweaker.tweak(content)=="""label live-forensic
menu label Live (^forensic mode)
linux /live/vmlinuz
initrd /live/initrd.img
append boot=live noconfig=sudo username=root hostname=kali noswap noautomount ignore_bootid live-media-path=/multibootusb/debian/live
label live-persistence
menu label ^Live USB Persistence (check kali.org/prst)
linux /live/vmlinuz
initrd /live/initrd.img
append boot=live noconfig=sudo username=root hostname=kali persistence ignore_bootid live-media-path=/multibootusb/debian/live persistence-path=/multibootusb/debian
"""
def test_abspath_rewrite():
content = """menuentry "Install Ubuntu" {
linux /casper/vmlinuz.efi file=/cdrom/preseed/ubuntu.seed boot=casper only-ubiquity iso-scan/filename=${iso_path} quiet splash ---
initrd /casper/initrd.lz
}"""
assert fix_abspath(
content, 'g:/multibootusb/ubuntu-14.04.5-desktop-amd64',
'ubuntu-14.04.5-desktop-amd64', 'test_abspath_rewrite')==\
"""menuentry "Install Ubuntu" {
linux /multibootusb/ubuntu-14.04.5-desktop-amd64/casper/vmlinuz.efi file=/cdrom/preseed/ubuntu.seed boot=casper only-ubiquity iso-scan/filename=${iso_path} quiet splash ---
initrd /multibootusb/ubuntu-14.04.5-desktop-amd64/casper/initrd.lz
}"""

Loading…
Cancel
Save