diff --git a/multibootusb b/multibootusb index d50c01d..0d8f2f1 100644 --- a/multibootusb +++ b/multibootusb @@ -222,7 +222,7 @@ def main(): if __name__ == '__main__': - osdriver.init_logging() + osdriver.initialize() try: main() finally: diff --git a/scripts/imager.py b/scripts/imager.py index 7ae4b56..a136a77 100644 --- a/scripts/imager.py +++ b/scripts/imager.py @@ -32,6 +32,7 @@ if platform.system() == "Windows": def dd_iso_image(dd_progress_thread): try: + dd_progress_thread.set_error(None) _dd_iso_image(dd_progress_thread) except: # config.imager_return = False @@ -142,47 +143,34 @@ class Imager(QtWidgets.QMainWindow, Ui_MainWindow): return disk @staticmethod - def imager_usb_detail(usb_disk, partition=1): + def imager_usb_detail(physical_disk): """ Function to detect details of USB disk using lsblk - :param usb_disk: path to usb disk - :param partition: by default partition is set (but yet to code for it) + :param physical_disk: /dev/sd? (linux) or integer disk number (win) :return: details of size, type and model as tuples """ - _ntuple_diskusage = collections.namedtuple('usage', 'total_size usb_type model') + _ntuple_diskusage = collections.namedtuple( + 'usage', 'total_size usb_type model') if platform.system() == "Linux": - output = subprocess.check_output("lsblk -ib " + usb_disk, shell=True) + output = subprocess.check_output("lsblk -ib " + physical_disk, + shell=True) for line in output.splitlines(): line = line.split() - if partition != 1: - if line[2].strip() == b'1' and line[5].strip() == b'disk': - total_size = line[3] - if not total_size: - total_size = "Unknown" - usb_type = "Removable" - model = subprocess.check_output("lsblk -in -f -o MODEL " + usb_disk, shell=True).decode().strip() - if not model: - model = "Unknown" + if line[2].strip() == b'1' and line[5].strip() == b'disk': + total_size = line[3] + if not total_size: + total_size = "Unknown" + usb_type = "Removable" + model = subprocess.check_output( + "lsblk -in -f -o MODEL " + physical_disk, + shell=True).decode().strip() + if not model: + model = "Unknown" else: - if partition == 0: - dinfo = osdriver.wmi_get_physicaldrive_info(usb_disk) - return _ntuple_diskusage(*[dinfo[a] for a in [ - 'Size', 'MediaType', 'Model']]) - try: - selected_usb_part = str(usb_disk) - oFS = win32com.client.Dispatch("Scripting.FileSystemObject") - d = oFS.GetDrive(oFS.GetDriveName(oFS.GetAbsolutePathName(selected_usb_part))) -# selected_usb_device = d.DriveLetter - label = (d.VolumeName).strip() - if not label.strip(): - label = "No label." - total_size = d.TotalSize - usb_type = "Removable" - model = label - except: - log("Error detecting USB details.") - raise + dinfo = osdriver.wmi_get_physicaldrive_info_ex(physical_disk) + return _ntuple_diskusage(*[dinfo[a] for a in [ + 'size_total', 'mediatype', 'model']]) return _ntuple_diskusage(total_size, usb_type, model) diff --git a/scripts/mbusb_gui.py b/scripts/mbusb_gui.py index bbb1b22..af10cd9 100644 --- a/scripts/mbusb_gui.py +++ b/scripts/mbusb_gui.py @@ -159,19 +159,28 @@ class AppGui(qemu.Qemu, Imager, QtWidgets.QMainWindow, Ui_MainWindow): :return: """ self.ui.installed_distros.clear() - config.usb_disk = str(self.ui.combo_drives.currentText()) - if config.usb_disk: + config.usb_disk = osdriver.listbox_entry_to_device( + self.ui.combo_drives.currentText()) + if config.usb_disk == 0 or config.usb_disk: # Get the GPT status of the disk and store it on a variable try: usb.gpt_device(config.usb_disk) + config.imager_usb_disk \ + = self.ui.combo_drives.currentText() + config.usb_details \ + = usb.details(config.usb_disk) except Exception as e: + o = io.StringIO() + traceback.print_exc(None, o) + log(o.getvalue()) QtWidgets.QMessageBox.critical( - self, "The disk/partition is not usable.", str(e)) + self, "The disk/partition is not usable.", + str(e)) self.ui.combo_drives.setCurrentIndex(0) - config.usb_disk = str(self.ui.combo_drives.currentText()) - log("Selected device " + config.usb_disk) - config.imager_usb_disk = str(self.ui.combo_drives.currentText()) - config.usb_details = usb.details(config.usb_disk) + # Above statement triggers call to this method. + return + log("Selected device " + + osdriver.usb_disk_desc(config.usb_disk)) self.update_target_info() self.update_list_box(config.usb_disk) self.ui_update_persistence() @@ -191,11 +200,8 @@ class AppGui(qemu.Qemu, Imager, QtWidgets.QMainWindow, Ui_MainWindow): :return: """ self.ui.combo_drives.clear() - if self.ui.checkbox_all_drives.isChecked(): - detected_devices = usb.list_devices(fixed=True) - else: - detected_devices = usb.list_devices() - + detected_devices = usb.list_devices( + fixed=self.ui.checkbox_all_drives.isChecked()) if not detected_devices: return protected_drives = getattr(config, 'protected_drives', []) @@ -591,7 +597,7 @@ class AppGui(qemu.Qemu, Imager, QtWidgets.QMainWindow, Ui_MainWindow): :return: """ for cond, log_msg, dialog_title, dialog_msg in [ - (lambda: not config.usb_disk, + (lambda: config.usb_disk is None, 'ERROR: No USB device found.', 'No Device...', 'No USB device found.\n\nInsert USB and ' @@ -620,16 +626,15 @@ class AppGui(qemu.Qemu, Imager, QtWidgets.QMainWindow, Ui_MainWindow): "Please mount USB disk and press refresh " "USB button.") return False - if platform.system() == 'Linux' and \ - config.usb_disk[-1].isdigit() is False: - gen.log('Selected USB is a disk. Please select ' - 'a disk partition from the drop down list') + if config.usb_details['devtype'] == 'disk': + gen.log('Selected USB is a physical disk. ' + 'Please select ' + 'a partition or volume from the drop down list') QtWidgets.QMessageBox.information( self, 'No Partition...!', - 'USB disk selected doesn\'t contain a ' - 'partition.\n' - 'Please select the partition (ending ' - 'with a digit eg. /dev/sdb1)\n' + 'Selected USB is a physical disk. ' + 'Please select a partition (e.g. /dev/sdc1) ' + 'or a volume (e.g. G:) ' 'from the drop down list.') return False if 0 < config.persistence and \ @@ -760,7 +765,9 @@ Proceed with installation?'''.lstrip() % \ self.ui.button_browse_image.setEnabled(False) self.ui.combo_drives.setEnabled(False) # FIXME self.ui.pushbtn_imager_refreshusb.setEnabled(False) - status_text = ("Status: Writing " + os.path.basename(config.image_path) + " to " + config.usb_disk) + status_text = ("Status: Writing " + + os.path.basename(config.image_path) + " to " + + osdriver.usb_disk_desc(config.usb_disk)) self.ui.statusbar.showMessage(status_text) def dd_quit(self): @@ -782,19 +789,19 @@ Proceed with installation?'''.lstrip() % \ self.ui_enable_controls() else: imager = Imager() - if platform.system() == 'Linux' and config.usb_details['devtype'] == "partition": + if config.usb_details['devtype'] == "partition": gen.log('Selected device is a partition. Please select a disk from the drop down list') QtWidgets.QMessageBox.information(self, 'Incompatible device', 'Selected device (%s) is a partition!\n' 'ISO must be written to a whole disk.' '\n\nPlease select a disk from the drop down list.' % config.usb_disk) self.ui_enable_controls() else: - usb_disk_size = int(imager.imager_usb_detail(config.usb_disk, partition=0).total_size) + usb_disk_size = int(imager.imager_usb_detail(config.usb_disk).total_size) self.iso_size = os.path.getsize(config.image_path) if self.iso_size >= usb_disk_size: QtWidgets.QMessageBox.information(self, "No enough space on disk.", os.path.basename(config.image_path) + - " size is larger than the size of " + config.usb_disk) + " size is larger than the size of " + osdriver.usb_disk_desc(config.usb_disk)) self.ui_enable_controls() # elif gen.process_exist('explorer.exe') is not False: # # Check if windows explorer is running and inform user to close it. @@ -804,7 +811,7 @@ Proceed with installation?'''.lstrip() % \ else: reply = QtWidgets.QMessageBox.question \ (self, 'Review selection', - 'Selected disk: %s\n' % config.usb_disk + + 'Selected disk: %s\n' % osdriver.usb_disk_desc(config.usb_disk) + 'Selected image: %s\n\n' % os.path.basename(config.image_path) + 'Proceed with writing image to disk?', QtWidgets.QMessageBox.Yes | QtWidgets.QMessageBox.No, QtWidgets.QMessageBox.No) @@ -850,7 +857,7 @@ Proceed with installation?'''.lstrip() % \ usb_free_size= str(usb.bytes2human(config.usb_details.get('size_free', ""))) config.persistence_max_size = persistence.max_disk_persistence(config.usb_disk) config.usb_mount = config.usb_details.get('mount_point', "") - self.ui.usb_dev.setText(config.usb_disk) + self.ui.usb_dev.setText(osdriver.usb_disk_desc(config.usb_disk)) self.ui.usb_vendor.setText(config.usb_details.get('vendor', "")) self.ui.usb_model.setText(config.usb_details.get('model', "")) @@ -983,11 +990,13 @@ class DD_Progress(QtCore.QThread): self.error = error def run(self): + config.imager_percentage = 0 self.thread.start() while self.thread.isRunning(): if config.imager_percentage: self.update.emit(config.imager_percentage) - if not self.thread.isFinished() and config.percentage == 100: + if not self.thread.isFinished() and \ + config.percentage == 100: config.imager_status_text = "" self.status.emit("Please wait...") time.sleep(0.1) diff --git a/scripts/osdriver.py b/scripts/osdriver.py index 962fc98..a5d0075 100644 --- a/scripts/osdriver.py +++ b/scripts/osdriver.py @@ -1,3 +1,4 @@ +import collections import logging import logging.handlers import os @@ -10,7 +11,14 @@ import sys import tempfile import time -from . import udisks +if platform.system() == 'Windows': + import wmi + from scripts import win32 +else: + try: + from . import udisks + except ImportError: + import udisks def log(message, info=True, error=False, debug=False, _print=True): """ @@ -58,14 +66,13 @@ def resource_path(relativePath): return fullpath log("Could not find resource '%s'." % relativePath) + def get_physical_disk_number(usb_disk): """ Get the physical disk number as detected ny Windows. :param usb_disk: USB disk (Like F:) :return: Disk number. """ - import pythoncom - pythoncom.CoInitialize() partition, logical_disk = wmi_get_drive_info(usb_disk) log("Physical Device Number is %d" % partition.DiskIndex) return partition.DiskIndex @@ -73,9 +80,7 @@ def get_physical_disk_number(usb_disk): def wmi_get_drive_info(usb_disk): assert platform.system() == 'Windows' - import wmi - c = wmi.WMI() - for partition in c.Win32_DiskPartition(): + for partition in wmi.WMI().Win32_DiskPartition(): logical_disks = partition.associators("Win32_LogicalDiskToPartition") # Here, 'disk' is a windows logical drive rather than a physical drive for disk in logical_disks: @@ -83,6 +88,46 @@ def wmi_get_drive_info(usb_disk): return (partition, disk) raise RuntimeError('Failed to obtain drive information ' + usb_disk) + +def collect_relevant_info(obj, tuple_name, attributes, named_tuple): + if len(named_tuple)==0: + names = [x[0] for x in attributes] + named_tuple.append(collections.namedtuple(tuple_name, names)) + L = [] + for (attr, convfunc) in attributes: + v = getattr(obj, attr) + L.append(None if v is None else convfunc(v)) + return named_tuple[0](*L) + + +def collect_relevant_physicaldrive_info(d, physicaldrive_info_tuple=[]): + attributes = [ + ('BytesPerSector', int), + ('DeviceID', str), + ('Index', int), + ('Manufacturer', str), + ('MediaType', str), + ('Model', str), + ('Partitions', int), + ('SerialNumber', str), + ('Size', int), + ('TotalSectors', int), + ] + return collect_relevant_info(d, 'PhysicalDrive', attributes, + physicaldrive_info_tuple) + + +def collect_relevant_volume_info(v, volume_info_tuple=[]): + attributes = [ + ('DeviceID', str), + ('DriveType', int), + ('FreeSpace', int), + ('FileSystem', str), + ('Size', int), + ] + return collect_relevant_info(v, 'Volume', attributes, volume_info_tuple) + + def wmi_get_physicaldrive_info(usb_disk): "Return information about the drive that contains 'usb_disk'." partition, disk = wmi_get_drive_info(usb_disk) @@ -91,22 +136,35 @@ def wmi_get_physicaldrive_info(usb_disk): drv_list = [d for d in c.Win32_DiskDrive() if d.Index == partition.DiskIndex] assert len(drv_list)==1 - d = drv_list[0] - rdict = {} - for attrname, convfunc in [ - ('BytesPerSector', int), - ('DeviceID', str), - ('MediaType', str), - ('Model', str), - ('Partitions', int), - ('SerialNumber', str), - ('Size', int), - ('TotalSectors', int), - ]: - rdict[attrname] = getattr(d, attrname) - return rdict + return collect_relevant_physicaldrive_info(drv_list[0]) + -def wmi_get_drive_info_ex(usb_disk): +def wmi_get_physicaldrive_info_all(): + c = wmi.WMI() + L = [collect_relevant_physicaldrive_info(d) for d in c.Win32_DiskDrive()] + L.sort(key=lambda x: x.Index) + return L + + +def wmi_get_volume_info_on(diskIndex): + L = [volumes for (dindex, volumes) in wmi_get_volume_info_all().items() + if dindex==diskIndex] + return [] if len(L)==0 else L[0] + + +def wmi_get_volume_info_all(): + r = {} + for dindex, volumes in [ + (p.DiskIndex, map(lambda d: collect_relevant_volume_info(d), + p.associators("Win32_LogicalDiskToPartition"))) + for p in wmi.WMI().Win32_DiskPartition()]: + r.setdefault(dindex, []).extend(volumes) + for dindex, volumes in r.items(): + volumes.sort(key=lambda x: x.DeviceID) + return r + + +def wmi_get_volume_info_ex(usb_disk): assert platform.system() == 'Windows' partition, disk = wmi_get_drive_info(usb_disk) #print (disk.Caption, partition.StartingOffset, partition.DiskIndex, @@ -139,7 +197,8 @@ def wmi_get_drive_info_ex(usb_disk): 'size_free' : size_free, 'vendor' : 'Not_Found', 'model' : 'Not_Found', - 'devtype' : { + 'devtype' : 'partition', + 'mediatype' : { 0 : 'Unknown', 1 : 'Fixed Disk', 2 : 'Removable Disk', @@ -153,26 +212,34 @@ def wmi_get_drive_info_ex(usb_disk): # print (r) return r +def wmi_get_physicaldrive_info_ex(diskIndex): + drv_list = [d for d in wmi.WMI().Win32_DiskDrive() + if d.Index == diskIndex] + assert len(drv_list)==1 + d = collect_relevant_physicaldrive_info(drv_list[0]) + r = {} + for src, dst in [ + ('Size', 'size_total'), + ('Model', 'model'), + ('Manufacturer', 'vendor'), + ('MediaType', 'mediatype'), + ('SerialNumber', 'uuid'), + ('DeviceID', 'label'), + ]: + r[dst] = getattr(d, src) + r['devtype'] = 'disk' + r['size_free'] = 0 + r['file_system'] = 'N/A' + r['mount_point'] = 'N/A' + return r + -def dd_win_clean_usb(usb_disk_no, status_update): - """ +def win_physicaldrive_to_listbox_entry(pdrive): + return '%d:%s' % (pdrive.Index,pdrive.Model) - """ - host_dir = multibootusb_host_dir() - temp_file = os.path.join(host_dir, "preference", "disk_part.txt") - diskpart_text_feed = 'select disk ' + str(usb_disk_no) + '\nclean\nexit' - with open(temp_file, 'wb') as fp: - fp.write(bytes(diskpart_text_feed, 'utf-8')) - status_update('Cleaning the disk...') - if subprocess.call('diskpart.exe -s ' + temp_file ) == 0: - for i in range(40): - # Wait for the drive to reappear if it has ever gone away. - time.sleep(0.25) - with open('\\\\.\\PhysicalDrive%d' % usb_disk_no, 'rb'): - return - raise RuntimeError('PhysicalDrive%d is now gone!' % usb_disk_no) - raise RuntimeError("Execution of diskpart.exe has failed.") +def win_volume_to_listbox_entry(v): + return v.DeviceID class Base: @@ -187,6 +254,8 @@ class Base: def dd_iso_image(self, input_, output, gui_update, status_update): + ''' Implementation for OS that use dd to write the iso image. + ''' in_file_size = os.path.getsize(input_) cmd = [self.dd_exe, 'if=' + input_, 'of=' + self.physical_disk(output), 'bs=1M'] @@ -215,48 +284,25 @@ class Windows(Base): def __init__(self): self.dd_exe = resource_path('data/tools/dd/dd.exe') - def dd_add_args(self, cmd_vec, input, output, bs, count): - pass - - def dd_iso_image_prepare(self, input, output, status_update): - return dd_win_clean_usb(get_physical_disk_number(output), - status_update) - - def dd_iso_image_add_args(self, cmd_vec, input_, output): - cmd_vec.append('--progress') - - def add_dd_iso_image_popen_args(self, dd_iso_image_popen_args): - dd_iso_image_popen_args['universal_newlines'] = True - - def dd_iso_image_readoutput(self, dd_process, gui_update, in_file_size, - output_q): - for line in iter(dd_process.stderr.readline, ''): - line = line.strip() - if line: - l = line.replace(',', '') - if l[-1:] == 'M': - bytes_copied = float(l.rstrip('M')) * 1024 * 1024 - elif l.isdigit(): - bytes_copied = float(l) - else: - if 15 < output_q.qsize(): - output_q.get() - output_q.put(line) - continue - gui_update(bytes_copied / in_file_size * 100.) - continue - # Now the 'dd' process should have completed or going to soon. - - def dd_iso_image_interpret_result(self, returncode, output_list): - # dd.exe always returns 0 - if any([ 'invalid' in s or 'error' in s for s - in [l.lower() for l in output_list] ]): - return '\n'.join(output_list) - else: - return None + def dd_iso_image(self, input_, output, gui_update, status_update): + assert type(output) is int + status_update('Zapping PhyiscalDisk%d' % output) + win32.ZapPhysicalDrive(output, wmi_get_volume_info_on, log) + # Ouch. Needs sometime for the zapping to take effect... + # Better way than sleeping constant time? + time.sleep(3) + status_update('Writing to PhysicalDisk%d' % output) + in_file_size = os.path.getsize(input_) + with win32.openHandle('\\\\.\\PhysicalDrive%d' % output, + True, False, log) as hDrive: + hDrive.LockPhysicalDrive() + hDrive.CopyFrom(input_, lambda bytes_copied: + gui_update(float(bytes_copied)/in_file_size*100.)) def physical_disk(self, usb_disk): - return r'\\.\physicaldrive%d' % get_physical_disk_number(usb_disk) + if type(usb_disk) is str: + usb_disk = get_physical_disk_number(usb_disk) + return r'\\.\physicaldrive%d' % usb_disk def mbusb_log_file(self): return os.path.join(os.getcwd(), 'multibootusb.log') @@ -268,8 +314,32 @@ class Windows(Base): return os.path.join(tempfile.gettempdir(), "multibootusb") def gpt_device(self, dev_name): - partition, disk = wmi_get_drive_info(dev_name) - return partition.Type.startswith('GPT:') + if type(dev_name) is int: + diskIndex = dev_name + for p in wmi.WMI().Win32_DiskPartition(): + if p.DiskIndex == diskIndex: + return p.Type.startswith('GPT:') + log(usb_disk_desc(dev_name) + ' seems not partitioned. ' + + 'assuming msdos.') + return False + else: + partition, disk = wmi_get_drive_info(dev_name) + return partition.Type.startswith('GPT:') + + def usb_disk_desc(self, dev_name): + if type(dev_name) is int: + return 'PhysicalDrive%d' % dev_name + return dev_name + + def listbox_entry_to_device(self, lb_entry): + left = lb_entry.split(':', 1)[0] + if left.isdigit(): + return int(left) # see win_physicaldrive_to_listbox_entry() + else: + return lb_entry # see win_volume_to_listbox_entry() + + def qemu_more_params(self, disk): + return ['-L', '.', '-boot', 'c', '-hda', self.physical_disk(disk)] class Linux(Base): @@ -326,7 +396,7 @@ class Linux(Base): return os.path.join(os.path.expanduser('~'), ".multibootusb") def gpt_device(self, dev_name): - disk_dev = dev_name.rstrip('0123456789') + disk_dev = self.physical_disk(dev_name) cmd = ['parted', disk_dev, '-s', 'print'] with open(os.devnull) as devnull: p = subprocess.Popen(cmd, stdout=subprocess.PIPE, @@ -345,6 +415,14 @@ class Linux(Base): raise RuntimeError("Disk '%s' is uninitialized and not usable." % disk_dev) + def usb_disk_desc(self, dev_name): + return dev_name + + def listbox_entry_to_device(self, lb_entry): + return lb_entry + + def qemu_more_params(self, disk): + return ['-hda', self.physical_disk(disk), '-vga', 'std'] driverClass = { 'Windows' : Windows, @@ -362,10 +440,13 @@ for func_name in [ 'find_mounted_partitions_on', 'multibootusb_host_dir', 'gpt_device', + 'listbox_entry_to_device', + 'usb_disk_desc', + 'qemu_more_params', ]: globals()[func_name] = getattr(osdriver, func_name) -def init_logging(): +def initialize(): logging.root.setLevel(logging.DEBUG) fmt = '%(asctime)s.%(msecs)03d %(name)s %(levelname)s %(message)s' datefmt = '%H:%M:%S' @@ -373,3 +454,7 @@ def init_logging(): osdriver.mbusb_log_file(), 'a', 1024*1024, 5) the_handler.setFormatter(logging.Formatter(fmt, datefmt)) logging.root.addHandler(the_handler) + + if platform.system() == 'Windows': + import pythoncom + pythoncom.CoInitialize() diff --git a/scripts/qemu.py b/scripts/qemu.py index aa402ad..82c9205 100644 --- a/scripts/qemu.py +++ b/scripts/qemu.py @@ -16,6 +16,7 @@ from PyQt5 import QtWidgets from .gui.ui_multibootusb import Ui_MainWindow from .gen import * from . import config +from . import osdriver from . import usb class Qemu(QtWidgets.QMainWindow, Ui_MainWindow): @@ -100,16 +101,8 @@ class Qemu(QtWidgets.QMainWindow, Ui_MainWindow): self, 'No disk...', 'No USB disk selected.\n\nPlease choose a disk first.') return - if platform.system() == "Windows": - disk_number = get_physical_disk_number(config.usb_disk) - qemu_more_params = ['-L', '.', '-boot', 'c', '-hda', - '//./PhysicalDrive' + str(disk_number)] - elif platform.system() == "Linux": - qemu_more_params = ['-hda', config.usb_disk.rstrip('0123456789'), - '-vga', 'std'] - else: - assert False, "Unknown platform '%s'" % platform.system() - self.run_qemu(self.qemu_usb_ram(), qemu_more_params, + more_params = osdriver.qemu_more_params(config.usb_disk) + self.run_qemu(self.qemu_usb_ram(), more_params, "ERROR: USB Boot: qemu not found!", 'Error...', 'Error booting USB\nUnable to start QEMU.') diff --git a/scripts/usb.py b/scripts/usb.py index 45efc27..904ef10 100644 --- a/scripts/usb.py +++ b/scripts/usb.py @@ -25,11 +25,9 @@ from . import osdriver if platform.system() == 'Linux': from . import udisks UDISKS = udisks.get_udisks(ver=None) + if platform.system() == 'Windows': - import psutil import win32com.client -# import wmi - import pythoncom def is_block(usb_disk): @@ -174,29 +172,14 @@ def list_devices(fixed=False): devices.sort() elif platform.system() == "Windows": - if fixed is True: - for drive in psutil.disk_partitions(): - if 'cdrom' in drive.opts or drive.fstype == '': - # Skip cdrom drives or the disk with no filesystem - continue - devices.append(drive[0][:-1]) - else: - try: - # Try new method using psutil. It should also detect USB 3.0 (but not tested by me) - for drive in psutil.disk_partitions(): - if 'cdrom' in drive.opts or drive.fstype == '': - # Skip cdrom drives or the disk with no filesystem - continue - if 'removable' in drive.opts: - devices.append(drive[0][:-1]) - except: - # Revert back to old method if psutil fails (which is unlikely) - oFS = win32com.client.Dispatch("Scripting.FileSystemObject") - oDrives = oFS.Drives - for drive in oDrives: - if drive.DriveType == 1 and drive.IsReady: - devices.append(drive) - + volumes = osdriver.wmi_get_volume_info_all() + devices = [] + for pdrive in osdriver.wmi_get_physicaldrive_info_all(): + if (not fixed) and pdrive.MediaType != 'Removable Media': + continue + devices.append(osdriver.win_physicaldrive_to_listbox_entry(pdrive)) + devices.extend([osdriver.win_volume_to_listbox_entry(d) + for d in volumes.get(pdrive.Index, [])]) if devices: return devices else: @@ -501,10 +484,10 @@ def repair_vfat_filesystem(usb_disk, result=None): result.append((p.returncode, output, ' '.join(cmd))) return None -def details(usb_disk_part): +def details(disk_or_partition): """ Populate and get details of an USB disk. - :param usb_disk_part: USB disk. Example.. "/dev/sdb1" on Linux and "D:\" on Windows. + :param disk_or_partition: USB disk. Example.. "/dev/sdb1" on Linux and "D:\" on Windows. :return: label == > returns name/label of an inserted USB device. mount_point == > returns mount path of an inserted USB device. uuid == > returns uuid of an inserted USB device. @@ -517,18 +500,20 @@ def details(usb_disk_part): model == > returns the model name of the USB. """ - assert usb_disk_part is not None + assert disk_or_partition is not None details = {} if platform.system() == 'Linux': try: - details = details_udev(usb_disk_part) + details = details_udev(disk_or_partition) except: - details = details_udisks2(usb_disk_part) + details = details_udisks2(disk_or_partition) elif platform.system() == 'Windows': - details = osdriver.wmi_get_drive_info_ex(usb_disk_part) - + if type(disk_or_partition) == int: + details = osdriver.wmi_get_physicaldrive_info_ex(disk_or_partition) + else: + details = osdriver.wmi_get_volume_info_ex(disk_or_partition) return details diff --git a/scripts/win32.py b/scripts/win32.py new file mode 100644 index 0000000..9d0693a --- /dev/null +++ b/scripts/win32.py @@ -0,0 +1,258 @@ +import collections +import ctypes +import io +import pywintypes +import struct +import sys +import time +import win32api +import win32con +import win32file +import winerror +import winioctlcon +import wmi + +from ctypes import wintypes +from functools import reduce + +kernel32 = ctypes.WinDLL('kernel32') # , use_last_error=True) + +kernel32.FindFirstVolumeW.restype = wintypes.HANDLE +kernel32.FindNextVolumeW.argtypes = (wintypes.HANDLE, + wintypes.LPWSTR, + wintypes.DWORD) +kernel32.FindVolumeClose.argtypes = (wintypes.HANDLE,) + +def FindFirstVolume(): + volume_name = ctypes.create_unicode_buffer(" " * 255) + h = kernel32.FindFirstVolumeW(volume_name, 255) + if h == win32file.INVALID_HANDLE_VALUE: + raise RuntimeError("FindFirstVolume() returned an invalid handle.") + return h, volume_name.value + +def FindNextVolume(hSearch): + volume_name = ctypes.create_unicode_buffer(" " * 255) + if kernel32.FindNextVolumeW(hSearch, volume_name, 255) != 0: + return volume_name.value + else: + errno = ctypes.GetLastError() + if errno == winerror.ERROR_NO_MORE_FILES: + FindVolumeClose(hSearch) + return None + raise RuntimeError("FindNextVolume failed (%s)" % errno) + +def FindVolumeClose(hSearch): + """Close a search handle opened by FindFirstVolume, typically + after the last volume has been returned. + """ + if kernel32.FindVolumeClose(hSearch) == 0: + raise RuntimeError("FindVolumeClose() failed.") + +def findAvailableDrives(): + return [(d, win32file.GetDriveType(d)) for d in + win32api.GetLogicalDriveStrings().rstrip('\0').split('\0')] + +def findNewDriveLetter(used_letters): + all_letters = set([chr(i) for i in range(ord('C'), ord('Z')+1)]) + return min(list(all_letters - set([s[0] for s in used_letters]))) + +def _openHandle(path, bWriteAccess, bWriteShare, + logfunc = lambda s: None): + TIMEOUT, NUM_RETRIES = 10, 20 + for retry_count in range(6): + try: + access_flag = win32con.GENERIC_READ | \ + (bWriteAccess and win32con.GENERIC_WRITE or 0) + share_flag = win32con.FILE_SHARE_READ | \ + (bWriteShare and win32con.FILE_SHARE_WRITE or 0) + handle = win32file.CreateFile( + path, access_flag, share_flag, None, + win32con.OPEN_EXISTING, win32con.FILE_ATTRIBUTE_NORMAL, None) + nth = { 0: 'first', 1:'second', 2:'third'} + logfunc("Opening [%s]: success at the %s iteration" % + (path, nth.get(retry_count, '%sth' % (retry_count+1)))) + return handle + except pywintypes.error as e: + logfunc('Exception=>'+str(e)) + if NUM_RETRIES/3 < retry_count: + bWriteShare = True + time.sleep(TIMEOUT / float(NUM_RETRIES)) + else: + raise RuntimeError("Couldn't open handle for %s." % path) + +def _closeHandle(h): + x = win32file.CloseHandle(h) + assert x != win32file.INVALID_HANDLE_VALUE + return x + +class openHandle: + def __init__(self, path, bWriteAccess, bWriteShare, + logfunc = lambda s: None): + self.path = path + self.bWriteAccess = bWriteAccess + self.bWriteShare = bWriteShare + self.logfunc = logfunc + self.h = None + + def __enter__(self): + self.h = _openHandle(self.path, self.bWriteAccess, self.bWriteShare, + self.logfunc) + return self + + def __exit__(self, type_, value, traceback_): + _closeHandle(self.h) + + def assert_physical_drive(self): + if self.path.lower().find('physicaldrive')<0: + raise RuntimeError("Handle is not one of a physical drive.") + + def LockPhysicalDrive(self): + self.assert_physical_drive() + lockPhysicalDrive(self.h, self.logfunc) + self.logfunc("Successfully locked '%s'" % self.path) + + + def ReadFile(self, size): + return win32file.ReadFile(self.h, size, None) + + def WriteFile(self, b): + return win32file.WriteFile(self.h, b, None) + + geometory_tuple = collections.namedtuple( + 'DiskGeometory', + ['number_of_cylinders', 'media_type', 'tracks_per_cylinder', + 'sectors_per_track', 'bytes_per_sector', 'disk_size']) + def DiskGeometory(self): + self.assert_physical_drive() + o = win32file.DeviceIoControl( + self.h, winioctlcon.IOCTL_DISK_GET_DRIVE_GEOMETRY_EX, + None, 256, None) + return self.geometory_tuple(*struct.unpack('%s' % x) + x = win32file.DeleteVolumeMountPoint(volume_path+'\\') + log_func('DeleteVolumeMountPoint=>%s' % x) + else: + log_func('No volumes on %s' % target_drive) + + add1MB = False + hDrive.ZapMBRGPT(geom.disk_size, geom.bytes_per_sector, add1MB) + + +if __name__ == '__main__': + + # used_letters = [d for d in + # win32api.GetLogicalDriveStrings().rstrip('\0').split('\0')] + # print (used_letters) + # print (findNewDriveLetter(used_letters)) + # TargetDrive = 2 + # vinfo_list = [x for x in findVolumeGuids() + # if x.Extents and x.Extents[0].DiskNumber==TargetDrive] + + TargetDrive = 5 + with openHandle('\\\\.\\PhysicalDrive%d' % TargetDrive, True, False, + lambda s:sys.stdout.write(s+'\n')) as hDrive: + hDrive.CopyFrom('c:/Users/shinj/Downloads/salitaz-rolling_iso', + lambda b: None)