Merge pull request #378 from shinji-s/devel

Unify imager code again.
pull/379/head
multibootusb 6 years ago committed by GitHub
commit b12d053a68
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -13,13 +13,12 @@ import platform
import shutil
import string
import zipfile
import tempfile
import re
import ctypes
from . import config
from .osdriver import get_physical_disk_number, wmi_get_drive_info, \
log, resource_path
log, resource_path, multibootusb_host_dir
def scripts_dir_path():
return os.path.dirname(os.path.realpath(__file__))
@ -71,20 +70,6 @@ def sys_64bits():
return sys.maxsize > 2**32
def multibootusb_host_dir():
"""
Cross platform way to detect multibootusb directory on host system.
:return: Path to multibootusb directory of host system.
"""
if platform.system() == "Linux":
home_dir = os.path.expanduser('~')
mbusb_dir = os.path.join(home_dir, ".multibootusb")
elif platform.system() == "Windows":
mbusb_dir = os.path.join(tempfile.gettempdir(), "multibootusb")
return mbusb_dir
def iso_cfg_ext_dir():
"""
Function to return the path to ISO configuration file extraction directory.

@ -24,7 +24,6 @@ from . import iso
from . import osdriver
from . import progressbar
from . import osdriver
from . import udisks
from . import usb
@ -32,16 +31,14 @@ if platform.system() == "Windows":
import win32com.client
def dd_iso_image(dd_progress_thread):
if platform.system() == "Windows":
dd_win()
else:
try:
_dd_iso_image(dd_progress_thread)
except:
o = io.StringIO()
traceback.print_exc(None, o)
log(o.getvalue())
dd_progress_thread.set_error(o.getvalue())
try:
_dd_iso_image(dd_progress_thread)
except:
# config.imager_return = False
o = io.StringIO()
traceback.print_exc(None, o)
log(o.getvalue())
dd_progress_thread.set_error(o.getvalue())
def _dd_iso_image(dd_progress_thread):
pbar = progressbar.ProgressBar(
@ -59,73 +56,31 @@ def _dd_iso_image(dd_progress_thread):
config.imager_percentage = percentage
pbar.update(percentage)
unmounted_contexts = [
(usb.UnmountedContext(p[0], config.update_usb_mount), p[0]) for p
in udisks.find_partitions_on(config.usb_disk)]
def status_update(text):
config.status_text = text
mounted_partitions = osdriver.find_mounted_partitions_on(config.usb_disk)
really_unmounted = []
try:
for c, pname in unmounted_contexts:
c.__enter__()
really_unmounted.append((c, pname))
for x in mounted_partitions:
partition_dev, mount_point = x[:2]
c = usb.UnmountedContext(partition_dev, config.update_usb_mount)
c.__enter__()
really_unmounted.append((c, partition_dev))
error = osdriver.dd_iso_image(
config.image_path, config.usb_disk, gui_update)
config.image_path, config.usb_disk, gui_update, status_update)
if error:
dd_progress_thread.set_error(error)
log('Error writing iso image...')
# config.imager_return = False
else:
log('ISO has been written to USB disk...')
# config.imager_return = True
finally:
for c, pname in really_unmounted:
for c, partition_dev in really_unmounted:
c.__exit__(None, None, None)
def dd_win_clean_usb(usb_disk_no):
"""
"""
host_dir = multibootusb_host_dir()
temp_file = os.path.join(host_dir, "preference", "disk_part.txt")
diskpart_text_feed = 'select disk ' + str(usb_disk_no) + '\nclean\nexit'
write_to_file(temp_file, diskpart_text_feed)
config.status_text = 'Cleaning the disk...'
if subprocess.call('diskpart.exe -s ' + temp_file ) == 0:
return True
else:
return False
def dd_win():
windd = quote(resource_path(os.path.join("data", "tools", "dd", "dd.exe")))
usb_disk_no = osdriver.get_physical_disk_number(config.usb_disk)
_input = "if=" + config.image_path.strip()
in_file_size = float(os.path.getsize(config.image_path) / 1024 / 1024)
_output = "of=\\\.\\PhysicalDrive" + str(usb_disk_no)
if dd_win_clean_usb(usb_disk_no) is False:
return
# _output = "od=" + config.usb_disk # 'od=' option should also work.
# command = [windd, _input, _output, "bs=1M", "--progress"]
command = windd + ' ' + _input + ' ' + _output + " bs=1M" + " --progress"
#log("Executing ==> " + " ".join(command))
log("Executing ==> " + command)
dd_process = subprocess.Popen(command, universal_newlines=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE,
shell=False)
time.sleep(0.1)
while dd_process.poll() is None:
for line in iter(dd_process.stderr.readline, ''):
line = line.strip()
if 'error' in line.lower() or 'invalid' in line.lower():
config.imager_return = False
break
if line and line[-1] == 'M':
copied = float(line.strip('M').replace(',', ''))
config.imager_percentage = round((copied / float(in_file_size) * 100))
if config.imager_return is False:
log("Error writing to disk...")
else:
log("ISO has been written to USB disk...")
return
class Imager(QtWidgets.QMainWindow, Ui_MainWindow):
"""
Raw write to USB disk using dd.

@ -7,8 +7,10 @@ import shutil
import signal
import subprocess
import sys
import tempfile
import time
from . import udisks
def log(message, info=True, error=False, debug=False, _print=True):
"""
@ -130,6 +132,26 @@ def wmi_get_drive_info_ex(usb_disk):
return r
def dd_win_clean_usb(usb_disk_no, status_update):
"""
"""
host_dir = multibootusb_host_dir()
temp_file = os.path.join(host_dir, "preference", "disk_part.txt")
diskpart_text_feed = 'select disk ' + str(usb_disk_no) + '\nclean\nexit'
with open(temp_file, 'wb') as fp:
fp.write(bytes(diskpart_text_feed, 'utf-8'))
status_update('Cleaning the disk...')
if subprocess.call('diskpart.exe -s ' + temp_file ) == 0:
for i in range(40):
# Make sure the drive reappears if it has ever gone away.
time.sleep(0.25)
with open('\\\\.\\PhysicalDrive%d' % usb_disk_no, 'rb'):
return
raise RuntimeError('PhysicalDrive%d is now gone!' % usb_disk_no)
raise RuntimeError("Execution dd(.exe) has failed.")
class Base:
def run_dd(self, input, output, bs, count):
@ -142,19 +164,19 @@ class Base:
log("%s succeeded." % str(cmd))
def dd_iso_image(self, input_, output, gui_update):
def dd_iso_image(self, input_, output, gui_update, status_update):
in_file_size = os.path.getsize(input_)
cmd = [self.dd_exe, 'if=' + input_,
'of=' + self.physical_disk(output), 'bs=1M']
self.dd_iso_image_add_args(cmd, input_, output)
log('Executing => ' + str(cmd))
kw_args = {
'stdout' : subprocess.PIPE,
'stderr' : subprocess.PIPE,
'shell' : False,
}
self.add_dd_iso_image_popen_args(kw_args)
self.dd_iso_image_prepare(input, output, status_update)
log('Executing => ' + str(cmd))
dd_process = subprocess.Popen(cmd, **kw_args)
errors = queue.Queue()
while dd_process.poll() is None:
@ -172,6 +194,9 @@ class Windows(Base):
def dd_add_args(self, cmd_vec, input, output, bs, count):
pass
def dd_iso_image_prepare(self, input, output, status_update):
return dd_win_clean_usb(get_physical_disk_number(output), status_update)
def dd_iso_image_add_args(self, cmd_vec, input_, output):
cmd_vec.append('--progress')
@ -211,11 +236,20 @@ class Windows(Base):
def mbusb_log_file(self):
return os.path.join(os.getcwd(), 'multibootusb.log')
def find_mounted_partitions_on(self, usb_disk):
return [] # No-op until UnmountedContext() get implemented for Windows
def multibootusb_host_dir(self):
return os.path.join(tempfile.gettempdir(), "multibootusb")
class Linux(Base):
def __init__(self):
self.dd_exe = 'dd'
def dd_iso_image_prepare(self, input, output, status_update):
pass
def dd_add_args(self, cmd_vec, input, output, bs, count):
cmd_vec.append('conv=notrunc')
@ -256,6 +290,11 @@ class Linux(Base):
def mbusb_log_file(self):
return '/var/log/multibootusb.log'
def find_mounted_partitions_on(self, usb_disk):
return udisks.find_mounted_partitions_on(usb_disk)
def multibootusb_host_dir(self):
return os.path.join(os.path.expanduser('~'), ".multibootusb")
driverClass = {
'Windows' : Windows,
@ -270,6 +309,8 @@ for func_name in [
'physical_disk',
'mbusb_log_file',
'dd_iso_image',
'find_mounted_partitions_on',
'multibootusb_host_dir',
]:
globals()[func_name] = getattr(osdriver, func_name)

@ -27,7 +27,7 @@ def node_mountpoint(node):
return de_mangle_mountpoint(line[1])
return None
def find_partitions_on(disk):
def find_mounted_partitions_on(disk):
assert not disk[-1:].isdigit()
with open('/proc/mounts') as f:
relevant_lines = [l.split(' ') for l in f.readlines()

Loading…
Cancel
Save