forked from storaged-project/api-examples
-
Notifications
You must be signed in to change notification settings - Fork 0
/
demo-1-libblockdev-python
executable file
·135 lines (104 loc) · 4.16 KB
/
demo-1-libblockdev-python
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#!/usr/bin/python3
import sys
import os
import gi
gi.require_version("GLib", "2.0")
gi.require_version("BlockDev", "2.0")
from gi.repository import GLib
from gi.repository import BlockDev as bd
REQUESTED_PLUGIN_NAMES = {"lvm", "swap", "crypto", "fs"}
requested_plugins = bd.plugin_specs_from_names(REQUESTED_PLUGIN_NAMES)
try:
succ_ = bd.init(requested_plugins)
except GLib.GError as err:
raise RuntimeError("Failed to initialize libbd and its plugins (%s)" % REQUESTED_PLUGIN_NAMES)
MAX_SWAP_SIZE = 1024 ** 3 # 1 GiB
MAX_SWAP_PORTION = 0.1 # 10 %
SWAP_LV_NAME = "swap"
SWAP_LABEL = "demoswap"
DATA_LV_NAME = "data"
DATA_LABEL = "demodata"
LUKS_PREFIX = "luks-"
PASSPHRASE = "myshinylittlepassphrase"
def create_setup(disk1, disk2, demo_name=sys.argv[0]):
for disk in (disk1, disk2):
try:
bd.fs.wipe(disk, True)
except bd.FSError as e:
# wipe() fails when the device is already empty, but that's okay for us
if str(e).startswith("No signature"):
pass
else:
# some other error is a real one, though
raise
# format the disks as PVs
bd.lvm.pvcreate(disk1)
bd.lvm.pvcreate(disk2)
# get the name of the VG
demo_name = os.path.basename(demo_name).rsplit(".")[0]
vg_name = demo_name.replace("-", "_")
# create the VG with extent size of 8 MiB
bd.lvm.vgcreate(vg_name, [disk1, disk2], pe_size=8 * 1024**2)
# get the information about the VG
vgi = bd.lvm.vginfo(vg_name)
# determine the size of the swap LV and create it
size = min(MAX_SWAP_SIZE, vgi.free * MAX_SWAP_PORTION)
# create the swap LV
bd.lvm.lvcreate(vg_name, SWAP_LV_NAME, size)
lv_path = "/dev/%s/%s" % (vg_name, SWAP_LV_NAME)
bd.swap.mkswap(lv_path, SWAP_LABEL)
# get the up to date information about the VG (the amount of free space has changed)
vgi = bd.lvm.vginfo(vg_name)
# create the data LV in the rest of VG's space
bd.lvm.lvcreate(vg_name, DATA_LV_NAME, vgi.free)
# create the LUKS format on the data LV and open (unlock) it
lv_path = "/dev/%s/%s" % (vg_name, DATA_LV_NAME)
bd.crypto.luks_format(lv_path, passphrase=PASSPHRASE)
bd.crypto.luks_open(lv_path, LUKS_PREFIX + DATA_LV_NAME, PASSPHRASE)
# create an XFS file system on the LUKS device
luks_path = "/dev/mapper/" + LUKS_PREFIX + DATA_LV_NAME
# bd.fs.mkfs_xfs() doesn't take any 'label' argument, but it supports extra
# arguments which allow label to be specified
label_extra = {"-L": DATA_LABEL}
bd.fs.xfs_mkfs(luks_path, extra=label_extra)
def clean_setup(disk1, disk2, demo_name=sys.argv[0]):
# we need to go from leaves when tearing devices down
# first close (lock) the LUKS device
luks_path = "/dev/mapper/" + LUKS_PREFIX + DATA_LV_NAME
bd.crypto.luks_close(luks_path)
# get the name of the VG
demo_name = os.path.basename(demo_name).rsplit(".")[0]
vg_name = demo_name.replace("-", "_")
# remove the LVs
bd.lvm.lvremove(vg_name, DATA_LV_NAME)
bd.lvm.lvremove(vg_name, SWAP_LV_NAME)
# remove the VG
bd.lvm.vgremove(vg_name)
# clear the PV signatures (LVM metadata)
for disk in (disk1, disk2):
bd.lvm.pvremove(disk)
if __name__ == "__main__":
if os.geteuid() != 0:
print("Requires to be run as root!", file=sys.stderr)
exit(1)
if len(sys.argv) < 3:
print("Requires at least two arguments!", file=sys.stderr)
print("Usage: %s [--cleanup] DISK1 DISK2", sys.argv[0], file=sys.stderr)
exit(1)
cleanup = False
if "--cleanup" in sys.argv:
if len(sys.argv) < 4:
print("--cleanup requires disks to clean!", file=sys.stderr)
print("Usage: %s [--cleanup] DISK1 DISK2", sys.argv[0], file=sys.stderr)
exit(1)
cleanup = True
sys.argv.remove("--cleanup")
disk1 = sys.argv[1]
disk2 = sys.argv[2]
if cleanup:
clean_setup(disk1, disk2)
else:
ans = input("About to remove all existing metadata from %s and %s. Is this okay? [y/N] " % (disk1, disk2))
if ans not in ("y", "Y", "YES", "yes", "Yes"):
exit(0)
create_setup(disk1, disk2)