Show phyiscal disks on Windows too.

Unmount partitions and lock physical drive when dd-ing iso on Windows
Reset progres before starting dd-iso thread.
pull/388/head
shinji-s 6 years ago
parent 9d349c7c64
commit 30128a7933

@ -222,7 +222,7 @@ def main():
if __name__ == '__main__':
osdriver.init_logging()
osdriver.initialize()
try:
main()
finally:

@ -32,6 +32,7 @@ if platform.system() == "Windows":
def dd_iso_image(dd_progress_thread):
try:
dd_progress_thread.set_error(None)
_dd_iso_image(dd_progress_thread)
except:
# config.imager_return = False
@ -142,47 +143,34 @@ class Imager(QtWidgets.QMainWindow, Ui_MainWindow):
return disk
@staticmethod
def imager_usb_detail(usb_disk, partition=1):
def imager_usb_detail(physical_disk):
"""
Function to detect details of USB disk using lsblk
:param usb_disk: path to usb disk
:param partition: by default partition is set (but yet to code for it)
:param physical_disk: /dev/sd? (linux) or integer disk number (win)
:return: details of size, type and model as tuples
"""
_ntuple_diskusage = collections.namedtuple('usage', 'total_size usb_type model')
_ntuple_diskusage = collections.namedtuple(
'usage', 'total_size usb_type model')
if platform.system() == "Linux":
output = subprocess.check_output("lsblk -ib " + usb_disk, shell=True)
output = subprocess.check_output("lsblk -ib " + physical_disk,
shell=True)
for line in output.splitlines():
line = line.split()
if partition != 1:
if line[2].strip() == b'1' and line[5].strip() == b'disk':
total_size = line[3]
if not total_size:
total_size = "Unknown"
usb_type = "Removable"
model = subprocess.check_output("lsblk -in -f -o MODEL " + usb_disk, shell=True).decode().strip()
if not model:
model = "Unknown"
if line[2].strip() == b'1' and line[5].strip() == b'disk':
total_size = line[3]
if not total_size:
total_size = "Unknown"
usb_type = "Removable"
model = subprocess.check_output(
"lsblk -in -f -o MODEL " + physical_disk,
shell=True).decode().strip()
if not model:
model = "Unknown"
else:
if partition == 0:
dinfo = osdriver.wmi_get_physicaldrive_info(usb_disk)
return _ntuple_diskusage(*[dinfo[a] for a in [
'Size', 'MediaType', 'Model']])
try:
selected_usb_part = str(usb_disk)
oFS = win32com.client.Dispatch("Scripting.FileSystemObject")
d = oFS.GetDrive(oFS.GetDriveName(oFS.GetAbsolutePathName(selected_usb_part)))
# selected_usb_device = d.DriveLetter
label = (d.VolumeName).strip()
if not label.strip():
label = "No label."
total_size = d.TotalSize
usb_type = "Removable"
model = label
except:
log("Error detecting USB details.")
raise
dinfo = osdriver.wmi_get_physicaldrive_info_ex(physical_disk)
return _ntuple_diskusage(*[dinfo[a] for a in [
'size_total', 'mediatype', 'model']])
return _ntuple_diskusage(total_size, usb_type, model)

@ -159,19 +159,28 @@ class AppGui(qemu.Qemu, Imager, QtWidgets.QMainWindow, Ui_MainWindow):
:return:
"""
self.ui.installed_distros.clear()
config.usb_disk = str(self.ui.combo_drives.currentText())
if config.usb_disk:
config.usb_disk = osdriver.listbox_entry_to_device(
self.ui.combo_drives.currentText())
if config.usb_disk == 0 or config.usb_disk:
# Get the GPT status of the disk and store it on a variable
try:
usb.gpt_device(config.usb_disk)
config.imager_usb_disk \
= self.ui.combo_drives.currentText()
config.usb_details \
= usb.details(config.usb_disk)
except Exception as e:
o = io.StringIO()
traceback.print_exc(None, o)
log(o.getvalue())
QtWidgets.QMessageBox.critical(
self, "The disk/partition is not usable.", str(e))
self, "The disk/partition is not usable.",
str(e))
self.ui.combo_drives.setCurrentIndex(0)
config.usb_disk = str(self.ui.combo_drives.currentText())
log("Selected device " + config.usb_disk)
config.imager_usb_disk = str(self.ui.combo_drives.currentText())
config.usb_details = usb.details(config.usb_disk)
# Above statement triggers call to this method.
return
log("Selected device " +
osdriver.usb_disk_desc(config.usb_disk))
self.update_target_info()
self.update_list_box(config.usb_disk)
self.ui_update_persistence()
@ -191,11 +200,8 @@ class AppGui(qemu.Qemu, Imager, QtWidgets.QMainWindow, Ui_MainWindow):
:return:
"""
self.ui.combo_drives.clear()
if self.ui.checkbox_all_drives.isChecked():
detected_devices = usb.list_devices(fixed=True)
else:
detected_devices = usb.list_devices()
detected_devices = usb.list_devices(
fixed=self.ui.checkbox_all_drives.isChecked())
if not detected_devices:
return
protected_drives = getattr(config, 'protected_drives', [])
@ -591,7 +597,7 @@ class AppGui(qemu.Qemu, Imager, QtWidgets.QMainWindow, Ui_MainWindow):
:return:
"""
for cond, log_msg, dialog_title, dialog_msg in [
(lambda: not config.usb_disk,
(lambda: config.usb_disk is None,
'ERROR: No USB device found.',
'No Device...',
'No USB device found.\n\nInsert USB and '
@ -620,16 +626,15 @@ class AppGui(qemu.Qemu, Imager, QtWidgets.QMainWindow, Ui_MainWindow):
"Please mount USB disk and press refresh "
"USB button.")
return False
if platform.system() == 'Linux' and \
config.usb_disk[-1].isdigit() is False:
gen.log('Selected USB is a disk. Please select '
'a disk partition from the drop down list')
if config.usb_details['devtype'] == 'disk':
gen.log('Selected USB is a physical disk. '
'Please select '
'a partition or volume from the drop down list')
QtWidgets.QMessageBox.information(
self, 'No Partition...!',
'USB disk selected doesn\'t contain a '
'partition.\n'
'Please select the partition (ending '
'with a digit eg. /dev/sdb1)\n'
'Selected USB is a physical disk. '
'Please select a partition (e.g. /dev/sdc1) '
'or a volume (e.g. G:) '
'from the drop down list.')
return False
if 0 < config.persistence and \
@ -760,7 +765,9 @@ Proceed with installation?'''.lstrip() % \
self.ui.button_browse_image.setEnabled(False)
self.ui.combo_drives.setEnabled(False)
# FIXME self.ui.pushbtn_imager_refreshusb.setEnabled(False)
status_text = ("Status: Writing " + os.path.basename(config.image_path) + " to " + config.usb_disk)
status_text = ("Status: Writing " +
os.path.basename(config.image_path) + " to " +
osdriver.usb_disk_desc(config.usb_disk))
self.ui.statusbar.showMessage(status_text)
def dd_quit(self):
@ -782,19 +789,19 @@ Proceed with installation?'''.lstrip() % \
self.ui_enable_controls()
else:
imager = Imager()
if platform.system() == 'Linux' and config.usb_details['devtype'] == "partition":
if config.usb_details['devtype'] == "partition":
gen.log('Selected device is a partition. Please select a disk from the drop down list')
QtWidgets.QMessageBox.information(self, 'Incompatible device', 'Selected device (%s) is a partition!\n'
'ISO must be written to a whole disk.'
'\n\nPlease select a disk from the drop down list.' % config.usb_disk)
self.ui_enable_controls()
else:
usb_disk_size = int(imager.imager_usb_detail(config.usb_disk, partition=0).total_size)
usb_disk_size = int(imager.imager_usb_detail(config.usb_disk).total_size)
self.iso_size = os.path.getsize(config.image_path)
if self.iso_size >= usb_disk_size:
QtWidgets.QMessageBox.information(self, "No enough space on disk.",
os.path.basename(config.image_path) +
" size is larger than the size of " + config.usb_disk)
" size is larger than the size of " + osdriver.usb_disk_desc(config.usb_disk))
self.ui_enable_controls()
# elif gen.process_exist('explorer.exe') is not False:
# # Check if windows explorer is running and inform user to close it.
@ -804,7 +811,7 @@ Proceed with installation?'''.lstrip() % \
else:
reply = QtWidgets.QMessageBox.question \
(self, 'Review selection',
'Selected disk: %s\n' % config.usb_disk +
'Selected disk: %s\n' % osdriver.usb_disk_desc(config.usb_disk) +
'Selected image: %s\n\n' % os.path.basename(config.image_path) +
'Proceed with writing image to disk?',
QtWidgets.QMessageBox.Yes | QtWidgets.QMessageBox.No, QtWidgets.QMessageBox.No)
@ -850,7 +857,7 @@ Proceed with installation?'''.lstrip() % \
usb_free_size= str(usb.bytes2human(config.usb_details.get('size_free', "")))
config.persistence_max_size = persistence.max_disk_persistence(config.usb_disk)
config.usb_mount = config.usb_details.get('mount_point', "")
self.ui.usb_dev.setText(config.usb_disk)
self.ui.usb_dev.setText(osdriver.usb_disk_desc(config.usb_disk))
self.ui.usb_vendor.setText(config.usb_details.get('vendor', ""))
self.ui.usb_model.setText(config.usb_details.get('model', ""))
@ -983,11 +990,13 @@ class DD_Progress(QtCore.QThread):
self.error = error
def run(self):
config.imager_percentage = 0
self.thread.start()
while self.thread.isRunning():
if config.imager_percentage:
self.update.emit(config.imager_percentage)
if not self.thread.isFinished() and config.percentage == 100:
if not self.thread.isFinished() and \
config.percentage == 100:
config.imager_status_text = ""
self.status.emit("Please wait...")
time.sleep(0.1)

@ -1,3 +1,4 @@
import collections
import logging
import logging.handlers
import os
@ -10,7 +11,14 @@ import sys
import tempfile
import time
from . import udisks
if platform.system() == 'Windows':
import wmi
from scripts import win32
else:
try:
from . import udisks
except ImportError:
import udisks
def log(message, info=True, error=False, debug=False, _print=True):
"""
@ -58,14 +66,13 @@ def resource_path(relativePath):
return fullpath
log("Could not find resource '%s'." % relativePath)
def get_physical_disk_number(usb_disk):
"""
Get the physical disk number as detected ny Windows.
:param usb_disk: USB disk (Like F:)
:return: Disk number.
"""
import pythoncom
pythoncom.CoInitialize()
partition, logical_disk = wmi_get_drive_info(usb_disk)
log("Physical Device Number is %d" % partition.DiskIndex)
return partition.DiskIndex
@ -73,9 +80,7 @@ def get_physical_disk_number(usb_disk):
def wmi_get_drive_info(usb_disk):
assert platform.system() == 'Windows'
import wmi
c = wmi.WMI()
for partition in c.Win32_DiskPartition():
for partition in wmi.WMI().Win32_DiskPartition():
logical_disks = partition.associators("Win32_LogicalDiskToPartition")
# Here, 'disk' is a windows logical drive rather than a physical drive
for disk in logical_disks:
@ -83,6 +88,46 @@ def wmi_get_drive_info(usb_disk):
return (partition, disk)
raise RuntimeError('Failed to obtain drive information ' + usb_disk)
def collect_relevant_info(obj, tuple_name, attributes, named_tuple):
if len(named_tuple)==0:
names = [x[0] for x in attributes]
named_tuple.append(collections.namedtuple(tuple_name, names))
L = []
for (attr, convfunc) in attributes:
v = getattr(obj, attr)
L.append(None if v is None else convfunc(v))
return named_tuple[0](*L)
def collect_relevant_physicaldrive_info(d, physicaldrive_info_tuple=[]):
attributes = [
('BytesPerSector', int),
('DeviceID', str),
('Index', int),
('Manufacturer', str),
('MediaType', str),
('Model', str),
('Partitions', int),
('SerialNumber', str),
('Size', int),
('TotalSectors', int),
]
return collect_relevant_info(d, 'PhysicalDrive', attributes,
physicaldrive_info_tuple)
def collect_relevant_volume_info(v, volume_info_tuple=[]):
attributes = [
('DeviceID', str),
('DriveType', int),
('FreeSpace', int),
('FileSystem', str),
('Size', int),
]
return collect_relevant_info(v, 'Volume', attributes, volume_info_tuple)
def wmi_get_physicaldrive_info(usb_disk):
"Return information about the drive that contains 'usb_disk'."
partition, disk = wmi_get_drive_info(usb_disk)
@ -91,22 +136,35 @@ def wmi_get_physicaldrive_info(usb_disk):
drv_list = [d for d in c.Win32_DiskDrive()
if d.Index == partition.DiskIndex]
assert len(drv_list)==1
d = drv_list[0]
rdict = {}
for attrname, convfunc in [
('BytesPerSector', int),
('DeviceID', str),
('MediaType', str),
('Model', str),
('Partitions', int),
('SerialNumber', str),
('Size', int),
('TotalSectors', int),
]:
rdict[attrname] = getattr(d, attrname)
return rdict
return collect_relevant_physicaldrive_info(drv_list[0])
def wmi_get_drive_info_ex(usb_disk):
def wmi_get_physicaldrive_info_all():
c = wmi.WMI()
L = [collect_relevant_physicaldrive_info(d) for d in c.Win32_DiskDrive()]
L.sort(key=lambda x: x.Index)
return L
def wmi_get_volume_info_on(diskIndex):
L = [volumes for (dindex, volumes) in wmi_get_volume_info_all().items()
if dindex==diskIndex]
return [] if len(L)==0 else L[0]
def wmi_get_volume_info_all():
r = {}
for dindex, volumes in [
(p.DiskIndex, map(lambda d: collect_relevant_volume_info(d),
p.associators("Win32_LogicalDiskToPartition")))
for p in wmi.WMI().Win32_DiskPartition()]:
r.setdefault(dindex, []).extend(volumes)
for dindex, volumes in r.items():
volumes.sort(key=lambda x: x.DeviceID)
return r
def wmi_get_volume_info_ex(usb_disk):
assert platform.system() == 'Windows'
partition, disk = wmi_get_drive_info(usb_disk)
#print (disk.Caption, partition.StartingOffset, partition.DiskIndex,
@ -139,7 +197,8 @@ def wmi_get_drive_info_ex(usb_disk):
'size_free' : size_free,
'vendor' : 'Not_Found',
'model' : 'Not_Found',
'devtype' : {
'devtype' : 'partition',
'mediatype' : {
0 : 'Unknown',
1 : 'Fixed Disk',
2 : 'Removable Disk',
@ -153,26 +212,34 @@ def wmi_get_drive_info_ex(usb_disk):
# print (r)
return r
def wmi_get_physicaldrive_info_ex(diskIndex):
drv_list = [d for d in wmi.WMI().Win32_DiskDrive()
if d.Index == diskIndex]
assert len(drv_list)==1
d = collect_relevant_physicaldrive_info(drv_list[0])
r = {}
for src, dst in [
('Size', 'size_total'),
('Model', 'model'),
('Manufacturer', 'vendor'),
('MediaType', 'mediatype'),
('SerialNumber', 'uuid'),
('DeviceID', 'label'),
]:
r[dst] = getattr(d, src)
r['devtype'] = 'disk'
r['size_free'] = 0
r['file_system'] = 'N/A'
r['mount_point'] = 'N/A'
return r
def dd_win_clean_usb(usb_disk_no, status_update):
"""
def win_physicaldrive_to_listbox_entry(pdrive):
return '%d:%s' % (pdrive.Index,pdrive.Model)
"""
host_dir = multibootusb_host_dir()
temp_file = os.path.join(host_dir, "preference", "disk_part.txt")
diskpart_text_feed = 'select disk ' + str(usb_disk_no) + '\nclean\nexit'
with open(temp_file, 'wb') as fp:
fp.write(bytes(diskpart_text_feed, 'utf-8'))
status_update('Cleaning the disk...')
if subprocess.call('diskpart.exe -s ' + temp_file ) == 0:
for i in range(40):
# Wait for the drive to reappear if it has ever gone away.
time.sleep(0.25)
with open('\\\\.\\PhysicalDrive%d' % usb_disk_no, 'rb'):
return
raise RuntimeError('PhysicalDrive%d is now gone!' % usb_disk_no)
raise RuntimeError("Execution of diskpart.exe has failed.")
def win_volume_to_listbox_entry(v):
return v.DeviceID
class Base:
@ -187,6 +254,8 @@ class Base:
def dd_iso_image(self, input_, output, gui_update, status_update):
''' Implementation for OS that use dd to write the iso image.
'''
in_file_size = os.path.getsize(input_)
cmd = [self.dd_exe, 'if=' + input_,
'of=' + self.physical_disk(output), 'bs=1M']
@ -215,48 +284,25 @@ class Windows(Base):
def __init__(self):
self.dd_exe = resource_path('data/tools/dd/dd.exe')
def dd_add_args(self, cmd_vec, input, output, bs, count):
pass
def dd_iso_image_prepare(self, input, output, status_update):
return dd_win_clean_usb(get_physical_disk_number(output),
status_update)
def dd_iso_image_add_args(self, cmd_vec, input_, output):
cmd_vec.append('--progress')
def add_dd_iso_image_popen_args(self, dd_iso_image_popen_args):
dd_iso_image_popen_args['universal_newlines'] = True
def dd_iso_image_readoutput(self, dd_process, gui_update, in_file_size,
output_q):
for line in iter(dd_process.stderr.readline, ''):
line = line.strip()
if line:
l = line.replace(',', '')
if l[-1:] == 'M':
bytes_copied = float(l.rstrip('M')) * 1024 * 1024
elif l.isdigit():
bytes_copied = float(l)
else:
if 15 < output_q.qsize():
output_q.get()
output_q.put(line)
continue
gui_update(bytes_copied / in_file_size * 100.)
continue
# Now the 'dd' process should have completed or going to soon.
def dd_iso_image_interpret_result(self, returncode, output_list):
# dd.exe always returns 0
if any([ 'invalid' in s or 'error' in s for s
in [l.lower() for l in output_list] ]):
return '\n'.join(output_list)
else:
return None
def dd_iso_image(self, input_, output, gui_update, status_update):
assert type(output) is int
status_update('Zapping PhyiscalDisk%d' % output)
win32.ZapPhysicalDrive(output, wmi_get_volume_info_on, log)
# Ouch. Needs sometime for the zapping to take effect...
# Better way than sleeping constant time?
time.sleep(3)
status_update('Writing to PhysicalDisk%d' % output)
in_file_size = os.path.getsize(input_)
with win32.openHandle('\\\\.\\PhysicalDrive%d' % output,
True, False, log) as hDrive:
hDrive.LockPhysicalDrive()
hDrive.CopyFrom(input_, lambda bytes_copied:
gui_update(float(bytes_copied)/in_file_size*100.))
def physical_disk(self, usb_disk):
return r'\\.\physicaldrive%d' % get_physical_disk_number(usb_disk)
if type(usb_disk) is str:
usb_disk = get_physical_disk_number(usb_disk)
return r'\\.\physicaldrive%d' % usb_disk
def mbusb_log_file(self):
return os.path.join(os.getcwd(), 'multibootusb.log')
@ -268,8 +314,32 @@ class Windows(Base):
return os.path.join(tempfile.gettempdir(), "multibootusb")
def gpt_device(self, dev_name):
partition, disk = wmi_get_drive_info(dev_name)
return partition.Type.startswith('GPT:')
if type(dev_name) is int:
diskIndex = dev_name
for p in wmi.WMI().Win32_DiskPartition():
if p.DiskIndex == diskIndex:
return p.Type.startswith('GPT:')
log(usb_disk_desc(dev_name) + ' seems not partitioned. ' +
'assuming msdos.')
return False
else:
partition, disk = wmi_get_drive_info(dev_name)
return partition.Type.startswith('GPT:')
def usb_disk_desc(self, dev_name):
if type(dev_name) is int:
return 'PhysicalDrive%d' % dev_name
return dev_name
def listbox_entry_to_device(self, lb_entry):
left = lb_entry.split(':', 1)[0]
if left.isdigit():
return int(left) # see win_physicaldrive_to_listbox_entry()
else:
return lb_entry # see win_volume_to_listbox_entry()
def qemu_more_params(self, disk):
return ['-L', '.', '-boot', 'c', '-hda', self.physical_disk(disk)]
class Linux(Base):
@ -326,7 +396,7 @@ class Linux(Base):
return os.path.join(os.path.expanduser('~'), ".multibootusb")
def gpt_device(self, dev_name):
disk_dev = dev_name.rstrip('0123456789')
disk_dev = self.physical_disk(dev_name)
cmd = ['parted', disk_dev, '-s', 'print']
with open(os.devnull) as devnull:
p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
@ -345,6 +415,14 @@ class Linux(Base):
raise RuntimeError("Disk '%s' is uninitialized and not usable." %
disk_dev)
def usb_disk_desc(self, dev_name):
return dev_name
def listbox_entry_to_device(self, lb_entry):
return lb_entry
def qemu_more_params(self, disk):
return ['-hda', self.physical_disk(disk), '-vga', 'std']
driverClass = {
'Windows' : Windows,
@ -362,10 +440,13 @@ for func_name in [
'find_mounted_partitions_on',
'multibootusb_host_dir',
'gpt_device',
'listbox_entry_to_device',
'usb_disk_desc',
'qemu_more_params',
]:
globals()[func_name] = getattr(osdriver, func_name)
def init_logging():
def initialize():
logging.root.setLevel(logging.DEBUG)
fmt = '%(asctime)s.%(msecs)03d %(name)s %(levelname)s %(message)s'
datefmt = '%H:%M:%S'
@ -373,3 +454,7 @@ def init_logging():
osdriver.mbusb_log_file(), 'a', 1024*1024, 5)
the_handler.setFormatter(logging.Formatter(fmt, datefmt))
logging.root.addHandler(the_handler)
if platform.system() == 'Windows':
import pythoncom
pythoncom.CoInitialize()

@ -16,6 +16,7 @@ from PyQt5 import QtWidgets
from .gui.ui_multibootusb import Ui_MainWindow
from .gen import *
from . import config
from . import osdriver
from . import usb
class Qemu(QtWidgets.QMainWindow, Ui_MainWindow):
@ -100,16 +101,8 @@ class Qemu(QtWidgets.QMainWindow, Ui_MainWindow):
self, 'No disk...',
'No USB disk selected.\n\nPlease choose a disk first.')
return
if platform.system() == "Windows":
disk_number = get_physical_disk_number(config.usb_disk)
qemu_more_params = ['-L', '.', '-boot', 'c', '-hda',
'//./PhysicalDrive' + str(disk_number)]
elif platform.system() == "Linux":
qemu_more_params = ['-hda', config.usb_disk.rstrip('0123456789'),
'-vga', 'std']
else:
assert False, "Unknown platform '%s'" % platform.system()
self.run_qemu(self.qemu_usb_ram(), qemu_more_params,
more_params = osdriver.qemu_more_params(config.usb_disk)
self.run_qemu(self.qemu_usb_ram(), more_params,
"ERROR: USB Boot: qemu not found!",
'Error...', 'Error booting USB\nUnable to start QEMU.')

@ -25,11 +25,9 @@ from . import osdriver
if platform.system() == 'Linux':
from . import udisks
UDISKS = udisks.get_udisks(ver=None)
if platform.system() == 'Windows':
import psutil
import win32com.client
# import wmi
import pythoncom
def is_block(usb_disk):
@ -174,29 +172,14 @@ def list_devices(fixed=False):
devices.sort()
elif platform.system() == "Windows":
if fixed is True:
for drive in psutil.disk_partitions():
if 'cdrom' in drive.opts or drive.fstype == '':
# Skip cdrom drives or the disk with no filesystem
continue
devices.append(drive[0][:-1])
else:
try:
# Try new method using psutil. It should also detect USB 3.0 (but not tested by me)
for drive in psutil.disk_partitions():
if 'cdrom' in drive.opts or drive.fstype == '':
# Skip cdrom drives or the disk with no filesystem
continue
if 'removable' in drive.opts:
devices.append(drive[0][:-1])
except:
# Revert back to old method if psutil fails (which is unlikely)
oFS = win32com.client.Dispatch("Scripting.FileSystemObject")
oDrives = oFS.Drives
for drive in oDrives:
if drive.DriveType == 1 and drive.IsReady:
devices.append(drive)
volumes = osdriver.wmi_get_volume_info_all()
devices = []
for pdrive in osdriver.wmi_get_physicaldrive_info_all():
if (not fixed) and pdrive.MediaType != 'Removable Media':
continue
devices.append(osdriver.win_physicaldrive_to_listbox_entry(pdrive))
devices.extend([osdriver.win_volume_to_listbox_entry(d)
for d in volumes.get(pdrive.Index, [])])
if devices:
return devices
else:
@ -501,10 +484,10 @@ def repair_vfat_filesystem(usb_disk, result=None):
result.append((p.returncode, output, ' '.join(cmd)))
return None
def details(usb_disk_part):
def details(disk_or_partition):
"""
Populate and get details of an USB disk.
:param usb_disk_part: USB disk. Example.. "/dev/sdb1" on Linux and "D:\" on Windows.
:param disk_or_partition: USB disk. Example.. "/dev/sdb1" on Linux and "D:\" on Windows.
:return: label == > returns name/label of an inserted USB device.
mount_point == > returns mount path of an inserted USB device.
uuid == > returns uuid of an inserted USB device.
@ -517,18 +500,20 @@ def details(usb_disk_part):
model == > returns the model name of the USB.
"""
assert usb_disk_part is not None
assert disk_or_partition is not None
details = {}
if platform.system() == 'Linux':
try:
details = details_udev(usb_disk_part)
details = details_udev(disk_or_partition)
except:
details = details_udisks2(usb_disk_part)
details = details_udisks2(disk_or_partition)
elif platform.system() == 'Windows':
details = osdriver.wmi_get_drive_info_ex(usb_disk_part)
if type(disk_or_partition) == int:
details = osdriver.wmi_get_physicaldrive_info_ex(disk_or_partition)
else:
details = osdriver.wmi_get_volume_info_ex(disk_or_partition)
return details

@ -0,0 +1,258 @@
import collections
import ctypes
import io
import pywintypes
import struct
import sys
import time
import win32api
import win32con
import win32file
import winerror
import winioctlcon
import wmi
from ctypes import wintypes
from functools import reduce
kernel32 = ctypes.WinDLL('kernel32') # , use_last_error=True)
kernel32.FindFirstVolumeW.restype = wintypes.HANDLE
kernel32.FindNextVolumeW.argtypes = (wintypes.HANDLE,
wintypes.LPWSTR,
wintypes.DWORD)
kernel32.FindVolumeClose.argtypes = (wintypes.HANDLE,)
def FindFirstVolume():
volume_name = ctypes.create_unicode_buffer(" " * 255)
h = kernel32.FindFirstVolumeW(volume_name, 255)
if h == win32file.INVALID_HANDLE_VALUE:
raise RuntimeError("FindFirstVolume() returned an invalid handle.")
return h, volume_name.value
def FindNextVolume(hSearch):
volume_name = ctypes.create_unicode_buffer(" " * 255)
if kernel32.FindNextVolumeW(hSearch, volume_name, 255) != 0:
return volume_name.value
else:
errno = ctypes.GetLastError()
if errno == winerror.ERROR_NO_MORE_FILES:
FindVolumeClose(hSearch)
return None
raise RuntimeError("FindNextVolume failed (%s)" % errno)
def FindVolumeClose(hSearch):
"""Close a search handle opened by FindFirstVolume, typically
after the last volume has been returned.
"""
if kernel32.FindVolumeClose(hSearch) == 0:
raise RuntimeError("FindVolumeClose() failed.")
def findAvailableDrives():
return [(d, win32file.GetDriveType(d)) for d in
win32api.GetLogicalDriveStrings().rstrip('\0').split('\0')]
def findNewDriveLetter(used_letters):
all_letters = set([chr(i) for i in range(ord('C'), ord('Z')+1)])
return min(list(all_letters - set([s[0] for s in used_letters])))
def _openHandle(path, bWriteAccess, bWriteShare,
logfunc = lambda s: None):
TIMEOUT, NUM_RETRIES = 10, 20
for retry_count in range(6):
try:
access_flag = win32con.GENERIC_READ | \
(bWriteAccess and win32con.GENERIC_WRITE or 0)
share_flag = win32con.FILE_SHARE_READ | \
(bWriteShare and win32con.FILE_SHARE_WRITE or 0)
handle = win32file.CreateFile(
path, access_flag, share_flag, None,
win32con.OPEN_EXISTING, win32con.FILE_ATTRIBUTE_NORMAL, None)
nth = { 0: 'first', 1:'second', 2:'third'}
logfunc("Opening [%s]: success at the %s iteration" %
(path, nth.get(retry_count, '%sth' % (retry_count+1))))
return handle
except pywintypes.error as e:
logfunc('Exception=>'+str(e))
if NUM_RETRIES/3 < retry_count:
bWriteShare = True
time.sleep(TIMEOUT / float(NUM_RETRIES))
else:
raise RuntimeError("Couldn't open handle for %s." % path)
def _closeHandle(h):
x = win32file.CloseHandle(h)
assert x != win32file.INVALID_HANDLE_VALUE
return x
class openHandle:
def __init__(self, path, bWriteAccess, bWriteShare,
logfunc = lambda s: None):
self.path = path
self.bWriteAccess = bWriteAccess
self.bWriteShare = bWriteShare
self.logfunc = logfunc
self.h = None
def __enter__(self):
self.h = _openHandle(self.path, self.bWriteAccess, self.bWriteShare,
self.logfunc)
return self
def __exit__(self, type_, value, traceback_):
_closeHandle(self.h)
def assert_physical_drive(self):
if self.path.lower().find('physicaldrive')<0:
raise RuntimeError("Handle is not one of a physical drive.")
def LockPhysicalDrive(self):
self.assert_physical_drive()
lockPhysicalDrive(self.h, self.logfunc)
self.logfunc("Successfully locked '%s'" % self.path)
def ReadFile(self, size):
return win32file.ReadFile(self.h, size, None)
def WriteFile(self, b):
return win32file.WriteFile(self.h, b, None)
geometory_tuple = collections.namedtuple(
'DiskGeometory',
['number_of_cylinders', 'media_type', 'tracks_per_cylinder',
'sectors_per_track', 'bytes_per_sector', 'disk_size'])
def DiskGeometory(self):
self.assert_physical_drive()
o = win32file.DeviceIoControl(
self.h, winioctlcon.IOCTL_DISK_GET_DRIVE_GEOMETRY_EX,
None, 256, None)
return self.geometory_tuple(*struct.unpack('<qiiiiq', o[:32]))
MAX_SECTORS_TO_CLEAR=128
def ZapMBRGPT(self, disk_size, sector_size, add1MB):
self.assert_physical_drive()
# Implementation borrowed from rufus: https://github.com/pbatard/rufus
num_sectors_to_clear \
= (add1MB and 2048 or 0) + self.MAX_SECTORS_TO_CLEAR
zeroBuf = b'\0' * sector_size
for i in range(num_sectors_to_clear):
self.WriteFile(zeroBuf)
offset = disk_size - self.MAX_SECTORS_TO_CLEAR * sector_size
win32file.SetFilePointer(self.h, offset, win32con.FILE_BEGIN)
for i in range(num_sectors_to_clear):
self.WriteFile(zeroBuf)
# We need to append paddings as CREATE_DISK structure contains a union.
param = struct.pack('<IIIHH8s',
winioctlcon.PARTITION_STYLE_MBR, 0xdeadbeef,
0,0,0,b'abcdefgh')
win32file.DeviceIoControl(
self.h, winioctlcon.IOCTL_DISK_CREATE_DISK, param, 0, None)
def CopyFrom(self, src_file, progress_cb):
with openHandle(src_file, True, False,
lambda s:sys.stdout.write(s+'\n')) as src:
total_bytes = 0
hr, b = src.ReadFile(1024*1024)
# win32file.ReadFile() seems to have a bug in the interpretation
# of 'hr'. https://sourceforge.net/p/pywin32/bugs/689/
# The following loop condition is a workaround, which may not
# work properly.
while hr == 0 and len(b):
win32file.WriteFile(self.h, b, None)
total_bytes += len(b)
progress_cb(total_bytes)
hr, b = src.ReadFile(1024*1024)
def lockPhysicalDrive(handle, logfunc=lambda s: None):
try:
win32file.DeviceIoControl(
handle, winioctlcon.FSCTL_ALLOW_EXTENDED_DASD_IO,
None, 0, None)
except pywintypes.error as e:
logfunc('IO boundary checks diabled.')
for retry in range(20):
try:
win32file.DeviceIoControl(handle, winioctlcon.FSCTL_LOCK_VOLUME,
None, 0, None)
return
except pywintypes.error as e:
logfunc( str(e) )
time.sleep(1)
raise RuntimeError("Couldn't lock the Volume.")
def findVolumeGuids():
DiskExtent = collections.namedtuple(
'DiskExtent', ['DiskNumber', 'StartingOffset', 'ExtentLength'])
Volume = collections.namedtuple(
'Volume', ['Guid', 'MediaType', 'DosDevice', 'Extents'])
found = []
h, guid = FindFirstVolume()
while h and guid:
#print (guid)
#print (guid, win32file.GetDriveType(guid),
# win32file.QueryDosDevice(guid[4:-1]))
hVolume = win32file.CreateFile(
guid[:-1], win32con.GENERIC_READ,
win32con.FILE_SHARE_READ | win32con.FILE_SHARE_WRITE,
None, win32con.OPEN_EXISTING, win32con.FILE_ATTRIBUTE_NORMAL, None)
extents = []
driveType = win32file.GetDriveType(guid)
if driveType in [win32con.DRIVE_REMOVABLE, win32con.DRIVE_FIXED]:
x = win32file.DeviceIoControl(
hVolume, winioctlcon.IOCTL_VOLUME_GET_VOLUME_DISK_EXTENTS,
None, 512, None)
instream = io.BytesIO(x)
numRecords = struct.unpack('<q', instream.read(8))[0]
fmt = '<qqq'
sz = struct.calcsize(fmt)
while 1:
b = instream.read(sz)
if len(b) < sz:
break
rec = struct.unpack(fmt, b)
extents.append( DiskExtent(*rec) )
vinfo = Volume(guid, driveType, win32file.QueryDosDevice(guid[4:-1]),
extents)
found.append(vinfo)
guid = FindNextVolume(h)
return found
def ZapPhysicalDrive(target_drive, get_volume_info_func, log_func):
with openHandle('\\\\.\\PhysicalDrive%d' % target_drive, True, False,
lambda s:sys.stdout.write(s+'\n')) as hDrive:
hDrive.LockPhysicalDrive()
geom = hDrive.DiskGeometory()
for v in get_volume_info_func(target_drive):
volume_path = '\\\\.\\'+v.DeviceID
log_func('Dismounting volume ' + volume_path)
with openHandle(volume_path, False, False) as h:
x = win32file.DeviceIoControl(
h.h, winioctlcon.FSCTL_DISMOUNT_VOLUME, None, None)
print ('FSCTL_DISMOUNT_VOLUME=>%s' % x)
x = win32file.DeleteVolumeMountPoint(volume_path+'\\')
log_func('DeleteVolumeMountPoint=>%s' % x)
else:
log_func('No volumes on %s' % target_drive)
add1MB = False
hDrive.ZapMBRGPT(geom.disk_size, geom.bytes_per_sector, add1MB)
if __name__ == '__main__':
# used_letters = [d for d in
# win32api.GetLogicalDriveStrings().rstrip('\0').split('\0')]
# print (used_letters)
# print (findNewDriveLetter(used_letters))
# TargetDrive = 2
# vinfo_list = [x for x in findVolumeGuids()
# if x.Extents and x.Extents[0].DiskNumber==TargetDrive]
TargetDrive = 5
with openHandle('\\\\.\\PhysicalDrive%d' % TargetDrive, True, False,
lambda s:sys.stdout.write(s+'\n')) as hDrive:
hDrive.CopyFrom('c:/Users/shinj/Downloads/salitaz-rolling_iso',
lambda b: None)
Loading…
Cancel
Save