Merge remote-tracking branch 'upstream/master' into fixes_397
commit
22089c63d3
Binary file not shown.
Binary file not shown.
@ -1 +1 @@
|
||||
9.2.0
|
||||
9.3.0
|
||||
|
@ -0,0 +1,463 @@
|
||||
import collections
|
||||
import logging
|
||||
import logging.handlers
|
||||
import os
|
||||
import platform
|
||||
import queue
|
||||
import shutil
|
||||
import signal
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
|
||||
if platform.system() == 'Windows':
|
||||
import wmi
|
||||
from scripts import win32
|
||||
else:
|
||||
try:
|
||||
from . import udisks
|
||||
except ImportError:
|
||||
import udisks
|
||||
|
||||
def log(message, info=True, error=False, debug=False, _print=True):
|
||||
"""
|
||||
Dirty function to log messages to file and also print on screen.
|
||||
:param message:
|
||||
:param info:
|
||||
:param error:
|
||||
:param debug:
|
||||
:return:
|
||||
"""
|
||||
if _print is True:
|
||||
print(message)
|
||||
|
||||
# remove ANSI color codes from logs
|
||||
# message_clean = re.compile(r'\x1b[^m]*m').sub('', message)
|
||||
|
||||
if info is True:
|
||||
logging.info(message)
|
||||
elif error is not False:
|
||||
logging.error(message)
|
||||
elif debug is not False:
|
||||
logging.debug(message)
|
||||
|
||||
def resource_path(relativePath):
|
||||
"""
|
||||
Function to detect the correct path of file when working with sourcecode/install or binary.
|
||||
:param relativePath: Path to file/data.
|
||||
:return: Modified path to file/data.
|
||||
"""
|
||||
# This is not strictly needed because Windows recognize '/'
|
||||
# as a path separator but we follow the discipline here.
|
||||
relativePath = relativePath.replace('/', os.sep)
|
||||
for dir_ in [
|
||||
os.path.abspath('.'),
|
||||
os.path.abspath('..'),
|
||||
getattr(sys, '_MEIPASS', None),
|
||||
os.path.dirname(os.path.dirname( # go up two levels
|
||||
os.path.realpath(__file__))),
|
||||
'/usr/share/multibootusb'.replace('/', os.sep),
|
||||
]:
|
||||
if dir_ is None:
|
||||
continue
|
||||
fullpath = os.path.join(dir_, relativePath)
|
||||
if os.path.exists(fullpath):
|
||||
return fullpath
|
||||
log("Could not find resource '%s'." % relativePath)
|
||||
|
||||
|
||||
def get_physical_disk_number(usb_disk):
|
||||
"""
|
||||
Get the physical disk number as detected ny Windows.
|
||||
:param usb_disk: USB disk (Like F:)
|
||||
:return: Disk number.
|
||||
"""
|
||||
partition, logical_disk = wmi_get_drive_info(usb_disk)
|
||||
log("Physical Device Number is %d" % partition.DiskIndex)
|
||||
return partition.DiskIndex
|
||||
|
||||
|
||||
def wmi_get_drive_info(usb_disk):
|
||||
assert platform.system() == 'Windows'
|
||||
for partition in wmi.WMI().Win32_DiskPartition():
|
||||
logical_disks = partition.associators("Win32_LogicalDiskToPartition")
|
||||
# Here, 'disk' is a windows logical drive rather than a physical drive
|
||||
for disk in logical_disks:
|
||||
if disk.Caption == usb_disk:
|
||||
return (partition, disk)
|
||||
raise RuntimeError('Failed to obtain drive information ' + usb_disk)
|
||||
|
||||
|
||||
def collect_relevant_info(obj, tuple_name, attributes, named_tuple):
|
||||
if len(named_tuple)==0:
|
||||
names = [x[0] for x in attributes]
|
||||
named_tuple.append(collections.namedtuple(tuple_name, names))
|
||||
L = []
|
||||
for (attr, convfunc) in attributes:
|
||||
v = getattr(obj, attr)
|
||||
L.append(None if v is None else convfunc(v))
|
||||
return named_tuple[0](*L)
|
||||
|
||||
|
||||
def collect_relevant_physicaldrive_info(d, physicaldrive_info_tuple=[]):
|
||||
attributes = [
|
||||
('BytesPerSector', int),
|
||||
('DeviceID', str),
|
||||
('Index', int),
|
||||
('Manufacturer', str),
|
||||
('MediaType', str),
|
||||
('Model', str),
|
||||
('Partitions', int),
|
||||
('SerialNumber', str),
|
||||
('Size', int),
|
||||
('TotalSectors', int),
|
||||
]
|
||||
return collect_relevant_info(d, 'PhysicalDrive', attributes,
|
||||
physicaldrive_info_tuple)
|
||||
|
||||
|
||||
def collect_relevant_volume_info(v, volume_info_tuple=[]):
|
||||
attributes = [
|
||||
('DeviceID', str),
|
||||
('DriveType', int),
|
||||
('FreeSpace', int),
|
||||
('FileSystem', str),
|
||||
('Size', int),
|
||||
]
|
||||
return collect_relevant_info(v, 'Volume', attributes, volume_info_tuple)
|
||||
|
||||
|
||||
def wmi_get_physicaldrive_info(usb_disk):
|
||||
"Return information about the drive that contains 'usb_disk'."
|
||||
partition, disk = wmi_get_drive_info(usb_disk)
|
||||
import wmi
|
||||
c = wmi.WMI()
|
||||
drv_list = [d for d in c.Win32_DiskDrive()
|
||||
if d.Index == partition.DiskIndex]
|
||||
assert len(drv_list)==1
|
||||
return collect_relevant_physicaldrive_info(drv_list[0])
|
||||
|
||||
|
||||
def wmi_get_physicaldrive_info_all():
|
||||
c = wmi.WMI()
|
||||
L = [collect_relevant_physicaldrive_info(d) for d in c.Win32_DiskDrive()]
|
||||
L.sort(key=lambda x: x.Index)
|
||||
return L
|
||||
|
||||
|
||||
def wmi_get_volume_info_on(diskIndex):
|
||||
L = [volumes for (dindex, volumes) in wmi_get_volume_info_all().items()
|
||||
if dindex==diskIndex]
|
||||
return [] if len(L)==0 else L[0]
|
||||
|
||||
|
||||
def wmi_get_volume_info_all():
|
||||
r = {}
|
||||
for dindex, volumes in [
|
||||
(p.DiskIndex, map(lambda d: collect_relevant_volume_info(d),
|
||||
p.associators("Win32_LogicalDiskToPartition")))
|
||||
for p in wmi.WMI().Win32_DiskPartition()]:
|
||||
r.setdefault(dindex, []).extend(volumes)
|
||||
for dindex, volumes in r.items():
|
||||
volumes.sort(key=lambda x: x.DeviceID)
|
||||
return r
|
||||
|
||||
|
||||
def wmi_get_volume_info_ex(usb_disk):
|
||||
assert platform.system() == 'Windows'
|
||||
partition, disk = wmi_get_drive_info(usb_disk)
|
||||
#print (disk.Caption, partition.StartingOffset, partition.DiskIndex,
|
||||
# disk.FileSystem, disk.VolumeName)
|
||||
|
||||
# Extract Volume serial number off of the boot sector because
|
||||
# retrieval via COM object 'Scripting.FileSystemObject' or wmi interface
|
||||
# truncates NTFS serial number to 32 bits.
|
||||
with open('//./Physicaldrive%d'%partition.DiskIndex, 'rb') as f:
|
||||
f.seek(int(partition.StartingOffset))
|
||||
bs_ = f.read(512)
|
||||
serial_extractor = {
|
||||
'NTFS' : lambda bs: \
|
||||
''.join('%02X' % c for c in reversed(bs[0x48:0x48+8])),
|
||||
'FAT32' : lambda bs: \
|
||||
'%02X%02X-%02X%02X' % tuple(
|
||||
map(int,reversed(bs[67:71])))
|
||||
}.get(disk.FileSystem, lambda bs: None)
|
||||
uuid = serial_extractor(bs_)
|
||||
mount_point = usb_disk + '\\'
|
||||
size_total, size_used, size_free \
|
||||
= shutil.disk_usage(mount_point)[:3]
|
||||
r = {
|
||||
'uuid' : uuid,
|
||||
'file_system' : disk.FileSystem,
|
||||
'label' : disk.VolumeName.strip() or 'No_label',
|
||||
'mount_point' : mount_point,
|
||||
'size_total' : size_total,
|
||||
'size_used' : size_used,
|
||||
'size_free' : size_free,
|
||||
'vendor' : 'Not_Found',
|
||||
'model' : 'Not_Found',
|
||||
'devtype' : 'partition',
|
||||
'mediatype' : {
|
||||
0 : 'Unknown',
|
||||
1 : 'Fixed Disk',
|
||||
2 : 'Removable Disk',
|
||||
3 : 'Local Disk',
|
||||
4 : 'Network Drive',
|
||||
5 : 'Compact Disc',
|
||||
6 : 'RAM Disk',
|
||||
}.get(disk.DriveType, 'DiskType(%d)' % disk.DriveType),
|
||||
'disk_index' : partition.DiskIndex,
|
||||
}
|
||||
# print (r)
|
||||
return r
|
||||
|
||||
def wmi_get_physicaldrive_info_ex(diskIndex):
|
||||
drv_list = [d for d in wmi.WMI().Win32_DiskDrive()
|
||||
if d.Index == diskIndex]
|
||||
assert len(drv_list)==1
|
||||
d = collect_relevant_physicaldrive_info(drv_list[0])
|
||||
r = {}
|
||||
for src, dst in [
|
||||
('Size', 'size_total'),
|
||||
('Model', 'model'),
|
||||
('Manufacturer', 'vendor'),
|
||||
('MediaType', 'mediatype'),
|
||||
('SerialNumber', 'uuid'),
|
||||
('DeviceID', 'label'),
|
||||
]:
|
||||
r[dst] = getattr(d, src)
|
||||
r['devtype'] = 'disk'
|
||||
r['size_free'] = 0
|
||||
r['file_system'] = 'N/A'
|
||||
r['mount_point'] = 'N/A'
|
||||
return r
|
||||
|
||||
|
||||
def win_physicaldrive_to_listbox_entry(pdrive):
|
||||
return '%d:%s' % (pdrive.Index,pdrive.Model)
|
||||
|
||||
|
||||
def win_volume_to_listbox_entry(v):
|
||||
return v.DeviceID
|
||||
|
||||
class Base:
|
||||
|
||||
def run_dd(self, input, output, bs, count):
|
||||
cmd = [self.dd_exe, 'if='+input, 'of='+output,
|
||||
'bs=%d' % bs, 'count=%d'%count]
|
||||
self.dd_add_args(cmd, input, output, bs, count)
|
||||
if subprocess.call(cmd) != 0:
|
||||
log('Failed to execute [%s]' % str(cmd))
|
||||
else:
|
||||
log("%s succeeded." % str(cmd))
|
||||
|
||||
|
||||
def dd_iso_image(self, input_, output, gui_update, status_update):
|
||||
''' Implementation for OS that use dd to write the iso image.
|
||||
'''
|
||||
in_file_size = os.path.getsize(input_)
|
||||
cmd = [self.dd_exe, 'if=' + input_,
|
||||
'of=' + self.physical_disk(output), 'bs=1M']
|
||||
self.dd_iso_image_add_args(cmd, input_, output)
|
||||
kw_args = {
|
||||
'stdout' : subprocess.PIPE,
|
||||
'stderr' : subprocess.PIPE,
|
||||
'shell' : False,
|
||||
}
|
||||
self.add_dd_iso_image_popen_args(kw_args)
|
||||
self.dd_iso_image_prepare(input, output, status_update)
|
||||
log('Executing => ' + str(cmd))
|
||||
dd_process = subprocess.Popen(cmd, **kw_args)
|
||||
output_q = queue.Queue()
|
||||
while dd_process.poll() is None:
|
||||
self.dd_iso_image_readoutput(dd_process, gui_update, in_file_size,
|
||||
output_q)
|
||||
output_lines = [output_q.get() for i in range(output_q.qsize())]
|
||||
for l in output_lines:
|
||||
log('dd: ' + l)
|
||||
return self.dd_iso_image_interpret_result(
|
||||
dd_process.returncode, output_lines)
|
||||
|
||||
class Windows(Base):
|
||||
|
||||
def __init__(self):
|
||||
self.dd_exe = resource_path('data/tools/dd/dd.exe')
|
||||
|
||||
def dd_add_args(self, cmd_vec, input, output, bs, count):
|
||||
pass
|
||||
|
||||
def dd_iso_image(self, input_, output, gui_update, status_update):
|
||||
assert type(output) is int
|
||||
status_update('Zapping PhyiscalDisk%d' % output)
|
||||
win32.ZapPhysicalDrive(output, wmi_get_volume_info_on, log)
|
||||
# Ouch. Needs sometime for the zapping to take effect...
|
||||
# Better way than sleeping constant time?
|
||||
time.sleep(3)
|
||||
status_update('Writing to PhysicalDisk%d' % output)
|
||||
in_file_size = os.path.getsize(input_)
|
||||
with win32.openHandle('\\\\.\\PhysicalDrive%d' % output,
|
||||
True, False, log) as hDrive:
|
||||
hDrive.LockPhysicalDrive()
|
||||
hDrive.CopyFrom(input_, lambda bytes_copied:
|
||||
gui_update(float(bytes_copied)/in_file_size*100.))
|
||||
|
||||
def physical_disk(self, usb_disk):
|
||||
if type(usb_disk) is str:
|
||||
usb_disk = get_physical_disk_number(usb_disk)
|
||||
return r'\\.\physicaldrive%d' % usb_disk
|
||||
|
||||
def mbusb_log_file(self):
|
||||
return os.path.join(os.getcwd(), 'multibootusb.log')
|
||||
|
||||
def find_mounted_partitions_on(self, usb_disk):
|
||||
return [] # No-op until UnmountedContext() get implemented for Windows
|
||||
|
||||
def multibootusb_host_dir(self):
|
||||
return os.path.join(tempfile.gettempdir(), "multibootusb")
|
||||
|
||||
def gpt_device(self, dev_name):
|
||||
if type(dev_name) is int:
|
||||
diskIndex = dev_name
|
||||
for p in wmi.WMI().Win32_DiskPartition():
|
||||
if p.DiskIndex == diskIndex:
|
||||
return p.Type.startswith('GPT:')
|
||||
log(usb_disk_desc(dev_name) + ' seems not partitioned. ' +
|
||||
'assuming msdos.')
|
||||
return False
|
||||
else:
|
||||
partition, disk = wmi_get_drive_info(dev_name)
|
||||
return partition.Type.startswith('GPT:')
|
||||
|
||||
def usb_disk_desc(self, dev_name):
|
||||
if type(dev_name) is int:
|
||||
return 'PhysicalDrive%d' % dev_name
|
||||
return dev_name
|
||||
|
||||
def listbox_entry_to_device(self, lb_entry):
|
||||
left = lb_entry.split(':', 1)[0]
|
||||
if left.isdigit():
|
||||
return int(left) # see win_physicaldrive_to_listbox_entry()
|
||||
else:
|
||||
return lb_entry # see win_volume_to_listbox_entry()
|
||||
|
||||
def qemu_more_params(self, disk):
|
||||
return ['-L', '.', '-boot', 'c', '-hda', self.physical_disk(disk)]
|
||||
|
||||
class Linux(Base):
|
||||
|
||||
def __init__(self):
|
||||
self.dd_exe = 'dd'
|
||||
|
||||
def dd_iso_image_prepare(self, input, output, status_update):
|
||||
pass
|
||||
|
||||
def dd_add_args(self, cmd_vec, input, output, bs, count):
|
||||
cmd_vec.append('conv=notrunc')
|
||||
|
||||
def dd_iso_image_add_args(self, cmd_vec, input_, output):
|
||||
cmd_vec.append('oflag=sync')
|
||||
|
||||
def add_dd_iso_image_popen_args(self, dd_iso_image_popen_args):
|
||||
pass
|
||||
|
||||
def dd_iso_image_readoutput(self, dd_process, gui_update, in_file_size,
|
||||
output_q):
|
||||
# If this time delay is not given, the Popen does not execute
|
||||
# the actual command
|
||||
time.sleep(0.1)
|
||||
dd_process.send_signal(signal.SIGUSR1)
|
||||
dd_process.stderr.flush()
|
||||
while True:
|
||||
time.sleep(0.1)
|
||||
out_error = dd_process.stderr.readline().decode()
|
||||
if out_error:
|
||||
if 'bytes' in out_error:
|
||||
bytes_copied = float(out_error.split(' ', 1)[0])
|
||||
gui_update( bytes_copied / in_file_size * 100. )
|
||||
break
|
||||
if 15 < output_q.qsize():
|
||||
output_q.get()
|
||||
output_q.put(out_error.rstrip())
|
||||
else:
|
||||
# stderr is closed
|
||||
break
|
||||
|
||||
def dd_iso_image_interpret_result(self, returncode, output_list):
|
||||
return None if returncode==0 else '\n'.join(output_list)
|
||||
|
||||
def physical_disk(self, usb_disk):
|
||||
return usb_disk.rstrip('0123456789')
|
||||
|
||||
def mbusb_log_file(self):
|
||||
return '/var/log/multibootusb.log'
|
||||
|
||||
def find_mounted_partitions_on(self, usb_disk):
|
||||
return udisks.find_mounted_partitions_on(usb_disk)
|
||||
|
||||
def multibootusb_host_dir(self):
|
||||
return os.path.join(os.path.expanduser('~'), ".multibootusb")
|
||||
|
||||
def gpt_device(self, dev_name):
|
||||
disk_dev = self.physical_disk(dev_name)
|
||||
cmd = ['parted', disk_dev, '-s', 'print']
|
||||
with open(os.devnull) as devnull:
|
||||
p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE, stdin=devnull)
|
||||
_cmd_out, _err_out = p.communicate()
|
||||
p.wait()
|
||||
if p.returncode != 0:
|
||||
lang = os.getenv('LANG')
|
||||
encoding = lang.rsplit('.')[-1] if lang else 'utf-8'
|
||||
raise RuntimeError(str(_err_out, encoding))
|
||||
subprocess.check_call(['partprobe', disk_dev])
|
||||
if b'msdos' in _cmd_out:
|
||||
return False
|
||||
if b'gpt' in _cmd_out:
|
||||
return True
|
||||
raise RuntimeError("Disk '%s' is uninitialized and not usable." %
|
||||
disk_dev)
|
||||
|
||||
def usb_disk_desc(self, dev_name):
|
||||
return dev_name
|
||||
|
||||
def listbox_entry_to_device(self, lb_entry):
|
||||
return lb_entry
|
||||
|
||||
def qemu_more_params(self, disk):
|
||||
return ['-hda', self.physical_disk(disk), '-vga', 'std']
|
||||
|
||||
driverClass = {
|
||||
'Windows' : Windows,
|
||||
'Linux' : Linux,
|
||||
}.get(platform.system(), None)
|
||||
if driverClass is None:
|
||||
raise Exception('Platform [%s] is not supported.' % platform.system())
|
||||
osdriver = driverClass()
|
||||
|
||||
for func_name in [
|
||||
'run_dd',
|
||||
'physical_disk',
|
||||
'mbusb_log_file',
|
||||
'dd_iso_image',
|
||||
'find_mounted_partitions_on',
|
||||
'multibootusb_host_dir',
|
||||
'gpt_device',
|
||||
'listbox_entry_to_device',
|
||||
'usb_disk_desc',
|
||||
'qemu_more_params',
|
||||
]:
|
||||
globals()[func_name] = getattr(osdriver, func_name)
|
||||
|
||||
def initialize():
|
||||
logging.root.setLevel(logging.DEBUG)
|
||||
fmt = '%(asctime)s.%(msecs)03d %(name)s %(levelname)s %(message)s'
|
||||
datefmt = '%H:%M:%S'
|
||||
the_handler = logging.handlers.RotatingFileHandler(
|
||||
osdriver.mbusb_log_file(), 'a', 1024*1024, 5)
|
||||
the_handler.setFormatter(logging.Formatter(fmt, datefmt))
|
||||
logging.root.addHandler(the_handler)
|
||||
|
||||
if platform.system() == 'Windows':
|
||||
import pythoncom
|
||||
pythoncom.CoInitialize()
|
@ -0,0 +1,258 @@
|
||||
import collections
|
||||
import ctypes
|
||||
import io
|
||||
import pywintypes
|
||||
import struct
|
||||
import sys
|
||||
import time
|
||||
import win32api
|
||||
import win32con
|
||||
import win32file
|
||||
import winerror
|
||||
import winioctlcon
|
||||
import wmi
|
||||
|
||||
from ctypes import wintypes
|
||||
from functools import reduce
|
||||
|
||||
kernel32 = ctypes.WinDLL('kernel32') # , use_last_error=True)
|
||||
|
||||
kernel32.FindFirstVolumeW.restype = wintypes.HANDLE
|
||||
kernel32.FindNextVolumeW.argtypes = (wintypes.HANDLE,
|
||||
wintypes.LPWSTR,
|
||||
wintypes.DWORD)
|
||||
kernel32.FindVolumeClose.argtypes = (wintypes.HANDLE,)
|
||||
|
||||
def FindFirstVolume():
|
||||
volume_name = ctypes.create_unicode_buffer(" " * 255)
|
||||
h = kernel32.FindFirstVolumeW(volume_name, 255)
|
||||
if h == win32file.INVALID_HANDLE_VALUE:
|
||||
raise RuntimeError("FindFirstVolume() returned an invalid handle.")
|
||||
return h, volume_name.value
|
||||
|
||||
def FindNextVolume(hSearch):
|
||||
volume_name = ctypes.create_unicode_buffer(" " * 255)
|
||||
if kernel32.FindNextVolumeW(hSearch, volume_name, 255) != 0:
|
||||
return volume_name.value
|
||||
else:
|
||||
errno = ctypes.GetLastError()
|
||||
if errno == winerror.ERROR_NO_MORE_FILES:
|
||||
FindVolumeClose(hSearch)
|
||||
return None
|
||||
raise RuntimeError("FindNextVolume failed (%s)" % errno)
|
||||
|
||||
def FindVolumeClose(hSearch):
|
||||
"""Close a search handle opened by FindFirstVolume, typically
|
||||
after the last volume has been returned.
|
||||
"""
|
||||
if kernel32.FindVolumeClose(hSearch) == 0:
|
||||
raise RuntimeError("FindVolumeClose() failed.")
|
||||
|
||||
def findAvailableDrives():
|
||||
return [(d, win32file.GetDriveType(d)) for d in
|
||||
win32api.GetLogicalDriveStrings().rstrip('\0').split('\0')]
|
||||
|
||||
def findNewDriveLetter(used_letters):
|
||||
all_letters = set([chr(i) for i in range(ord('C'), ord('Z')+1)])
|
||||
return min(list(all_letters - set([s[0] for s in used_letters])))
|
||||
|
||||
def _openHandle(path, bWriteAccess, bWriteShare,
|
||||
logfunc = lambda s: None):
|
||||
TIMEOUT, NUM_RETRIES = 10, 20
|
||||
for retry_count in range(6):
|
||||
try:
|
||||
access_flag = win32con.GENERIC_READ | \
|
||||
(bWriteAccess and win32con.GENERIC_WRITE or 0)
|
||||
share_flag = win32con.FILE_SHARE_READ | \
|
||||
(bWriteShare and win32con.FILE_SHARE_WRITE or 0)
|
||||
handle = win32file.CreateFile(
|
||||
path, access_flag, share_flag, None,
|
||||
win32con.OPEN_EXISTING, win32con.FILE_ATTRIBUTE_NORMAL, None)
|
||||
nth = { 0: 'first', 1:'second', 2:'third'}
|
||||
logfunc("Opening [%s]: success at the %s iteration" %
|
||||
(path, nth.get(retry_count, '%sth' % (retry_count+1))))
|
||||
return handle
|
||||
except pywintypes.error as e:
|
||||
logfunc('Exception=>'+str(e))
|
||||
if NUM_RETRIES/3 < retry_count:
|
||||
bWriteShare = True
|
||||
time.sleep(TIMEOUT / float(NUM_RETRIES))
|
||||
else:
|
||||
raise RuntimeError("Couldn't open handle for %s." % path)
|
||||
|
||||
def _closeHandle(h):
|
||||
x = win32file.CloseHandle(h)
|
||||
assert x != win32file.INVALID_HANDLE_VALUE
|
||||
return x
|
||||
|
||||
class openHandle:
|
||||
def __init__(self, path, bWriteAccess, bWriteShare,
|
||||
logfunc = lambda s: None):
|
||||
self.path = path
|
||||
self.bWriteAccess = bWriteAccess
|
||||
self.bWriteShare = bWriteShare
|
||||
self.logfunc = logfunc
|
||||
self.h = None
|
||||
|
||||
def __enter__(self):
|
||||
self.h = _openHandle(self.path, self.bWriteAccess, self.bWriteShare,
|
||||
self.logfunc)
|
||||
return self
|
||||
|
||||
def __exit__(self, type_, value, traceback_):
|
||||
_closeHandle(self.h)
|
||||
|
||||
def assert_physical_drive(self):
|
||||
if self.path.lower().find('physicaldrive')<0:
|
||||
raise RuntimeError("Handle is not one of a physical drive.")
|
||||
|
||||
def LockPhysicalDrive(self):
|
||||
self.assert_physical_drive()
|
||||
lockPhysicalDrive(self.h, self.logfunc)
|
||||
self.logfunc("Successfully locked '%s'" % self.path)
|
||||
|
||||
|
||||
def ReadFile(self, size):
|
||||
return win32file.ReadFile(self.h, size, None)
|
||||
|
||||
def WriteFile(self, b):
|
||||
return win32file.WriteFile(self.h, b, None)
|
||||
|
||||
geometory_tuple = collections.namedtuple(
|
||||
'DiskGeometory',
|
||||
['number_of_cylinders', 'media_type', 'tracks_per_cylinder',
|
||||
'sectors_per_track', 'bytes_per_sector', 'disk_size'])
|
||||
def DiskGeometory(self):
|
||||
self.assert_physical_drive()
|
||||
o = win32file.DeviceIoControl(
|
||||
self.h, winioctlcon.IOCTL_DISK_GET_DRIVE_GEOMETRY_EX,
|
||||
None, 256, None)
|
||||
return self.geometory_tuple(*struct.unpack('<qiiiiq', o[:32]))
|
||||
|
||||
MAX_SECTORS_TO_CLEAR=128
|
||||
def ZapMBRGPT(self, disk_size, sector_size, add1MB):
|
||||
self.assert_physical_drive()
|
||||
# Implementation borrowed from rufus: https://github.com/pbatard/rufus
|
||||
num_sectors_to_clear \
|
||||
= (add1MB and 2048 or 0) + self.MAX_SECTORS_TO_CLEAR
|
||||
zeroBuf = b'\0' * sector_size
|
||||
for i in range(num_sectors_to_clear):
|
||||
self.WriteFile(zeroBuf)
|
||||
offset = disk_size - self.MAX_SECTORS_TO_CLEAR * sector_size
|
||||
win32file.SetFilePointer(self.h, offset, win32con.FILE_BEGIN)
|
||||
for i in range(num_sectors_to_clear):
|
||||
self.WriteFile(zeroBuf)
|
||||
# We need to append paddings as CREATE_DISK structure contains a union.
|
||||
param = struct.pack('<IIIHH8s',
|
||||
winioctlcon.PARTITION_STYLE_MBR, 0xdeadbeef,
|
||||
0,0,0,b'abcdefgh')
|
||||
win32file.DeviceIoControl(
|
||||
self.h, winioctlcon.IOCTL_DISK_CREATE_DISK, param, 0, None)
|
||||
|
||||
def CopyFrom(self, src_file, progress_cb):
|
||||
with openHandle(src_file, True, False,
|
||||
lambda s:sys.stdout.write(s+'\n')) as src:
|
||||
total_bytes = 0
|
||||
hr, b = src.ReadFile(1024*1024)
|
||||
# win32file.ReadFile() seems to have a bug in the interpretation
|
||||
# of 'hr'. https://sourceforge.net/p/pywin32/bugs/689/
|
||||
# The following loop condition is a workaround, which may not
|
||||
# work properly.
|
||||
while hr == 0 and len(b):
|
||||
win32file.WriteFile(self.h, b, None)
|
||||
total_bytes += len(b)
|
||||
progress_cb(total_bytes)
|
||||
hr, b = src.ReadFile(1024*1024)
|
||||
|
||||
|
||||
def lockPhysicalDrive(handle, logfunc=lambda s: None):
|
||||
try:
|
||||
win32file.DeviceIoControl(
|
||||
handle, winioctlcon.FSCTL_ALLOW_EXTENDED_DASD_IO,
|
||||
None, 0, None)
|
||||
except pywintypes.error as e:
|
||||
logfunc('IO boundary checks diabled.')
|
||||
for retry in range(20):
|
||||
try:
|
||||
win32file.DeviceIoControl(handle, winioctlcon.FSCTL_LOCK_VOLUME,
|
||||
None, 0, None)
|
||||
return
|
||||
except pywintypes.error as e:
|
||||
logfunc( str(e) )
|
||||
time.sleep(1)
|
||||
raise RuntimeError("Couldn't lock the Volume.")
|
||||
|
||||
|
||||
def findVolumeGuids():
|
||||
DiskExtent = collections.namedtuple(
|
||||
'DiskExtent', ['DiskNumber', 'StartingOffset', 'ExtentLength'])
|
||||
Volume = collections.namedtuple(
|
||||
'Volume', ['Guid', 'MediaType', 'DosDevice', 'Extents'])
|
||||
found = []
|
||||
h, guid = FindFirstVolume()
|
||||
while h and guid:
|
||||
#print (guid)
|
||||
#print (guid, win32file.GetDriveType(guid),
|
||||
# win32file.QueryDosDevice(guid[4:-1]))
|
||||
hVolume = win32file.CreateFile(
|
||||
guid[:-1], win32con.GENERIC_READ,
|
||||
win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
|
||||
None, win32con.OPEN_EXISTING, win32con.FILE_ATTRIBUTE_NORMAL, None)
|
||||
extents = []
|
||||
driveType = win32file.GetDriveType(guid)
|
||||
if driveType in [win32con.DRIVE_REMOVABLE, win32con.DRIVE_FIXED]:
|
||||
x = win32file.DeviceIoControl(
|
||||
hVolume, winioctlcon.IOCTL_VOLUME_GET_VOLUME_DISK_EXTENTS,
|
||||
None, 512, None)
|
||||
instream = io.BytesIO(x)
|
||||
numRecords = struct.unpack('<q', instream.read(8))[0]
|
||||
fmt = '<qqq'
|
||||
sz = struct.calcsize(fmt)
|
||||
while 1:
|
||||
b = instream.read(sz)
|
||||
if len(b) < sz:
|
||||
break
|
||||
rec = struct.unpack(fmt, b)
|
||||
extents.append( DiskExtent(*rec) )
|
||||
vinfo = Volume(guid, driveType, win32file.QueryDosDevice(guid[4:-1]),
|
||||
extents)
|
||||
found.append(vinfo)
|
||||
guid = FindNextVolume(h)
|
||||
return found
|
||||
|
||||
def ZapPhysicalDrive(target_drive, get_volume_info_func, log_func):
|
||||
with openHandle('\\\\.\\PhysicalDrive%d' % target_drive, True, False,
|
||||
lambda s:sys.stdout.write(s+'\n')) as hDrive:
|
||||
hDrive.LockPhysicalDrive()
|
||||
geom = hDrive.DiskGeometory()
|
||||
for v in get_volume_info_func(target_drive):
|
||||
volume_path = '\\\\.\\'+v.DeviceID
|
||||
log_func('Dismounting volume ' + volume_path)
|
||||
with openHandle(volume_path, False, False) as h:
|
||||
x = win32file.DeviceIoControl(
|
||||
h.h, winioctlcon.FSCTL_DISMOUNT_VOLUME, None, None)
|
||||
print ('FSCTL_DISMOUNT_VOLUME=>%s' % x)
|
||||
x = win32file.DeleteVolumeMountPoint(volume_path+'\\')
|
||||
log_func('DeleteVolumeMountPoint=>%s' % x)
|
||||
else:
|
||||
log_func('No volumes on %s' % target_drive)
|
||||
|
||||
add1MB = False
|
||||
hDrive.ZapMBRGPT(geom.disk_size, geom.bytes_per_sector, add1MB)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
# used_letters = [d for d in
|
||||
# win32api.GetLogicalDriveStrings().rstrip('\0').split('\0')]
|
||||
# print (used_letters)
|
||||
# print (findNewDriveLetter(used_letters))
|
||||
# TargetDrive = 2
|
||||
# vinfo_list = [x for x in findVolumeGuids()
|
||||
# if x.Extents and x.Extents[0].DiskNumber==TargetDrive]
|
||||
|
||||
TargetDrive = 5
|
||||
with openHandle('\\\\.\\PhysicalDrive%d' % TargetDrive, True, False,
|
||||
lambda s:sys.stdout.write(s+'\n')) as hDrive:
|
||||
hDrive.CopyFrom('c:/Users/shinj/Downloads/salitaz-rolling_iso',
|
||||
lambda b: None)
|
@ -0,0 +1,146 @@
|
||||
import sys
|
||||
import unittest
|
||||
from unittest.mock import MagicMock as MM, patch, mock_open
|
||||
|
||||
sys.path = ['..'] + sys.path
|
||||
from scripts import distro
|
||||
from scripts import gen
|
||||
|
||||
class DistoDetection(unittest.TestCase):
|
||||
|
||||
def distro(self, isobin_exists, filelist_in_iso, input_text):
|
||||
mock_isobin_exists = MM(return_value=isobin_exists)
|
||||
mock_iso_list = MM(return_value=filelist_in_iso)
|
||||
mock_os_walk = MM(return_value=[('/', [], ['grub.cfg'])])
|
||||
@patch('scripts.iso.isolinux_bin_exist', mock_isobin_exists)
|
||||
@patch('os.walk', mock_os_walk)
|
||||
@patch('scripts._7zip.list_iso', mock_iso_list)
|
||||
@patch('builtins.open', mock_open(read_data=input_text))
|
||||
def test_when_isolinux_bin_is_available():
|
||||
return (distro.distro('{iso-cfg-dir}', 'ProDOS2.iso'))
|
||||
return test_when_isolinux_bin_is_available()
|
||||
|
||||
|
||||
def test_filebased_detection(self):
|
||||
test_inputs = [
|
||||
('f4ubcd', '', ['f4ubcd']),
|
||||
('memdisk_iso', '', []),
|
||||
('memdisk_iso', 'debian-installer', ['casper']),
|
||||
('debian-install', 'debian-installer', []),
|
||||
('alpine', '', ['alpine-release']),
|
||||
('memdisk_iso', '', ['']),
|
||||
]
|
||||
for expected_distro, input_texts, file_list in test_inputs:
|
||||
for input_text in input_texts.split('|'):
|
||||
distro = self.distro(True, file_list, input_text)
|
||||
assert distro==expected_distro, (
|
||||
"From \"%s&%s\", '%s' is expected but got '%s'" %
|
||||
(input_text, file_list, expected_distro, distro))
|
||||
|
||||
|
||||
def test_detection_with_isobin(self):
|
||||
test_inputs = [
|
||||
('parted-magic', 'pmagic|partedmagic', True),
|
||||
('memdisk_iso', 'pmagic|partedmagic', False),
|
||||
('debian', 'boot=live', True),
|
||||
('memdisk_iso', 'boot=live', False),
|
||||
('sms', 'sms.jpg|vector |autoexec', True),
|
||||
('memdisk_iso', 'sms.jpg|vector |autoexec', False),
|
||||
]
|
||||
for expected_distro, input_texts, isobin_exists in test_inputs:
|
||||
for input_text in input_texts.split('|'):
|
||||
distro = self.distro(isobin_exists, [], input_text)
|
||||
assert distro==expected_distro, (
|
||||
"From \"%s&isobin=%s\", '%s' is expected but got '%s'" %
|
||||
(input_text, isobin_exists, expected_distro, distro))
|
||||
|
||||
|
||||
def test_detection_isobin_agnostic(self):
|
||||
test_inputs = [
|
||||
('ubcd', 'ubcd'),
|
||||
('sgrubd2', 'Super Grub Disk'),
|
||||
('hbcd', 'hbcd'),
|
||||
('systemrescuecd', 'systemrescuecd'),
|
||||
('mageialive', 'mgalive'),
|
||||
('arch', 'archisolabel|misolabel|parabolaisolabel'),
|
||||
('chakra', 'chakraisolabel'),
|
||||
('kaos', 'kdeosisolabel'),
|
||||
('memdisk_iso', 'grml'),
|
||||
('grml', 'grml live-media-path=/dev/sda1'),
|
||||
('solydx', 'solydx'),
|
||||
('knoppix', 'knoppix'),
|
||||
('fedora', 'root=live:CDLABEL=|redcore'),
|
||||
('redhat', 'redhat'),
|
||||
('slitaz', 'slitaz|dban |ophcrack|tinycore'
|
||||
'|rescue.cpi|xpud|untangle|4mlinux|partition wizard'
|
||||
'|android-x86.png|riplinux|lebel dummy'
|
||||
'|http://pogostick.net/~pnh/ntpasswd/|AVG Rescue CD'
|
||||
'|AntivirusLiveCD|lkrn|Nanolinux|OSForensics'
|
||||
'|minimal Slackware|Slackware-HOWTO'),
|
||||
('opensuse-install', 'class opensuse'),
|
||||
('ubuntu', 'boot=casper'),
|
||||
('wifislax', 'wifislax'),
|
||||
('slax', 'slax'),
|
||||
('antix', 'antix'),
|
||||
('porteus', 'porteus'),
|
||||
('pclinuxos', 'livecd=livecd|PCLinuxOS'),
|
||||
('gentoo', 'looptype=squashfs|http://dee.su/liberte'),
|
||||
('finnix', 'finnix'),
|
||||
('wifiway', 'wifiway'),
|
||||
('puppy', 'puppy|quirky|fatdog|slacko|xenialpup'),
|
||||
('ipcop', 'ipcop'),
|
||||
('ipfire', 'ipfire'),
|
||||
('zenwalk', 'zenwalk|slack|salix'),
|
||||
('salix-live', 'zenwalk live|live slack|live salix'),
|
||||
('zenwalk', 'zenwalk|slack|salix'),
|
||||
('puppy', 'zenwalk slacko|slacko slack'),
|
||||
('ubuntu-server', 'ubuntu server'),
|
||||
('centos', 'root=live:CDLABEL=CentOS'),
|
||||
('centos-install', 'Install CentOS'),
|
||||
('centos', 'CentOS'),
|
||||
('trinity-rescue', 'Trinity Rescue Kit'),
|
||||
('kaspersky', 'http://support.kaspersky.com'),
|
||||
('alt-linux', 'ALT Linux'),
|
||||
('Windows', 'Sergei Strelec'),
|
||||
('ReactOS', 'ReactOS'),
|
||||
('fsecure', 'fsecure'),
|
||||
('pc-unlocker', 'default rwp'),
|
||||
('pc-tool', '/system/stage1'),
|
||||
('grub2only', 'vba32rescue'),
|
||||
('rising-av', 'BOOT_IMAGE=rising'),
|
||||
('Avira-RS', 'Avira Rescue System'),
|
||||
('insert', 'BOOT_IMAGE=insert'),
|
||||
]
|
||||
|
||||
for expected_distro, input_texts in test_inputs:
|
||||
for input_text in input_texts.lower().split('|'):
|
||||
distro = self.distro(False, [], input_text)
|
||||
assert distro==expected_distro, (
|
||||
"From \"%s\", '%s' is expected but got '%s'" %
|
||||
(input_text, expected_distro, distro))
|
||||
|
||||
|
||||
def test_distro_detection(self):
|
||||
def os_path_exists(f):
|
||||
if f.endswith('multibootusb.log'):
|
||||
return False
|
||||
return True
|
||||
os_path_exists_mock = MM()
|
||||
log_mock = MM()
|
||||
@patch('os.path.exists', os_path_exists)
|
||||
@patch('scripts.distro.log', log_mock)
|
||||
def _():
|
||||
fn = distro.detect_iso_from_file_list
|
||||
assert fn(['BOOT.wim', 'Sources']) == 'Windows'
|
||||
assert fn(['BOOT.wim', 'Sause']) is None
|
||||
assert fn(['config.isoclient', 'foo']) == 'opensuse'
|
||||
assert fn(['bar', 'dban', 'foo']) == 'slitaz'
|
||||
assert fn(['memtest.img']) == 'memtest'
|
||||
assert fn(['mt86.png','isolinux']) == 'raw_iso'
|
||||
assert fn(['menu.lst']) == 'grub4dos'
|
||||
assert fn(['bootwiz.cfg', 'bootmenu_logo.png']) == \
|
||||
'grub4dos_iso'
|
||||
_()
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
Loading…
Reference in New Issue