From 0f23eff404c4ae5288e6ea933fa6f3f4ea9dc7de Mon Sep 17 00:00:00 2001 From: Vignesh Rao Date: Thu, 28 Nov 2024 11:12:28 -0600 Subject: [PATCH] Add draft scripts for macOS drive split --- pyninja/macOS/diskutil.py | 54 ++++++++++++++++++++++++++++++++ pyninja/macOS/draft.py | 59 +++++++++++++++++++++++++++++++++++ pyninja/macOS/sys_profiler.py | 25 +++++++++++++++ 3 files changed, 138 insertions(+) create mode 100644 pyninja/macOS/diskutil.py create mode 100644 pyninja/macOS/draft.py create mode 100644 pyninja/macOS/sys_profiler.py diff --git a/pyninja/macOS/diskutil.py b/pyninja/macOS/diskutil.py new file mode 100644 index 0000000..2a8417f --- /dev/null +++ b/pyninja/macOS/diskutil.py @@ -0,0 +1,54 @@ +import re +import shutil +import subprocess + + +def size_it(input_string): + match = re.search(r'\((\d+) Bytes\)', input_string) + if match: + return int(match.group(1)) + + +def get_mountpoints(disk_node): + result = subprocess.run( + [f"diskutil info -plist {disk_node}"], shell=True, capture_output=True, text=True + ) + import plistlib + if mountpoint := plistlib.loads(result.stdout.encode())["MountPoint"]: + return mountpoint + else: + print("No mountpoint found!!") + return "/" + + +def diskutil_all(): + result = subprocess.run("diskutil info -all", shell=True, capture_output=True, text=True) + disks = [] + data_dict = {} + for line in result.stdout.splitlines(): + line = line.strip() + if not line: + continue + if line == "**********": + disks.append(data_dict) + data_dict = {} + else: + key, value = line.split(":", 1) + data_dict[key.strip()] = value.strip() + data = [] + for disk in disks: + if disk.get("Virtual") == "No": + new_dict = { + **{ + "Name": disk["Device / Media Name"], + "Size": size_it(disk["Disk Size"]), + "DeviceID": disk["Device Identifier"], + "Node": disk["Device Node"] + }, + **shutil.disk_usage( + get_mountpoints(disk["Device Node"]) + )._asdict() + } + new_dict["total"] = new_dict["Size"] + data.append(new_dict) + return data diff --git a/pyninja/macOS/draft.py b/pyninja/macOS/draft.py new file mode 100644 index 0000000..2983fcd --- /dev/null +++ b/pyninja/macOS/draft.py @@ -0,0 +1,59 @@ +import json +import re +import subprocess + + +def size_it(input_string): + match = re.search(r'\((\d+) Bytes\)', input_string) + if match: + return int(match.group(1)) + + +def update_mountpoints(disks, device_ids): + for disk in disks: + if disk.get("Mount Point"): + if disk.get("Part of Whole") in device_ids.keys(): + device_ids[disk["Part of Whole"]].append(disk.get("Mount Point")) + for device_id in device_ids: + if disk.get("APFS Physical Store", "").startswith(device_id): + device_ids[device_id].append(disk.get("Mount Point")) + return device_ids + + +def diskutil_all(): + result = subprocess.run("diskutil info -all", shell=True, capture_output=True, text=True) + disks = [] + data_dict = {} + for line in result.stdout.splitlines(): + line = line.strip() + if not line: + continue + if line == "**********": + disks.append(data_dict) + data_dict = {} + else: + key, value = line.split(":", 1) + data_dict[key.strip()] = value.strip() + data = [] + device_ids = {} + for disk in disks: + if disk.get("Virtual") == "No": + new_dict = { + **{ + "Name": disk["Device / Media Name"], + "Size": size_it(disk["Disk Size"]), + "DeviceID": disk["Device Identifier"], + "Node": disk["Device Node"], + }, + } + data.append(new_dict) + device_ids[disk["Device Identifier"]] = [] + mountpoints = update_mountpoints(disks, device_ids) + for device in data: + device["Mountpoints"] = mountpoints[device["DeviceID"]] + return data + + +if __name__ == '__main__': + dump = diskutil_all() + print(json.dumps(dump, indent=2)) diff --git a/pyninja/macOS/sys_profiler.py b/pyninja/macOS/sys_profiler.py new file mode 100644 index 0000000..25d6e76 --- /dev/null +++ b/pyninja/macOS/sys_profiler.py @@ -0,0 +1,25 @@ +import json +import subprocess + + +def sys_profiler(): + result = subprocess.run( + ["/usr/sbin/system_profiler", "SPStorageDataType", "-json"], capture_output=True, text=True + ) + data = json.loads(result.stdout) + usable_volumes = [] + for volume in data["SPStorageDataType"]: + if volume.get("writable") == "yes": + usable_volumes.append(volume) + for mlist in usable_volumes: + new_dict = { + "DeviceID": mlist["_name"], + "Size": mlist["size_in_bytes"], + "Name": mlist["physical_drive"]["device_name"], + "Mountpoints": mlist["mount_point"] + } + print(new_dict) + + +if __name__ == '__main__': + sys_profiler()