-
Notifications
You must be signed in to change notification settings - Fork 13
/
rmtdd
executable file
·118 lines (105 loc) · 4.06 KB
/
rmtdd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
#!/usr/bin/env python3
# rmtdd -- remote dd using 'rmt'
#
# The 'rmt' tool comes installed as part of 'tar', which uses it to access
# remote tape devices; it translates stdio commands to file operations.
import argparse
import os
import subprocess
import sys
def bcopy(src, dst, count, bufsz):
    """Copy up to *count* bytes from *src* to *dst* in chunks.

    src: object with a ``read(n)`` method returning a bytes-like object.
    dst: object with a ``write(buf)`` method.
    count: maximum number of bytes to copy; zero or negative copies nothing.
    bufsz: largest chunk requested from *src* in a single read.

    Copying stops early when *src* returns an empty buffer.
    """
    # Compare against zero explicitly: a negative count is truthy, and would
    # otherwise be handed to src.read() as a negative size.
    while count > 0:
        buf = src.read(min(count, bufsz))
        if not (blen := len(buf)):
            # Empty read: the source has nothing more to give.
            break
        dst.write(buf)
        count -= blen
class RmtFile:
    """Client for an 'rmt' process started on a remote host through ssh.

    Every public method sends one rmt command on the child's stdin and
    parses the reply read back from the child's stdout.
    """

    def __init__(self, host, *, exe=None):
        """Spawn ``ssh host exe`` with piped stdin and stdout.

        host: SSH host specification handed to the ssh command.
        exe: remote command line to run; when empty, 'rmt' is looked up
            with /etc and /usr/lib/tar appended to the remote PATH.
        """
        if not exe:
            # Autodetect /etc/rmt (traditional) and /usr/lib/tar/rmt (Arch)
            exe = "PATH=$PATH:/etc:/usr/lib/tar; rmt"
        self.proc = subprocess.Popen(["ssh", host, exe],
                                     stdin=subprocess.PIPE,
                                     stdout=subprocess.PIPE)

    def __enter__(self):
        """Allow use as a context manager; returns this object."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Tear down the ssh child when the ``with`` block ends."""
        self.disconnect()

    def disconnect(self):
        """Close both pipes to the ssh child and wait for it to exit.

        Returns the value of the child's ``wait()`` call.
        """
        self.proc.stdin.close()
        self.proc.stdout.close()
        return self.proc.wait()

    def _transact(self, cmd, args=(), odata=None):
        """Send one command and return the number from its 'A' reply.

        cmd: command text, encoded and sent as-is (callers include any
            newlines that belong to it).
        args: extra arguments, each sent followed by a newline; values
            that are not bytes-like are converted with ``str()`` first.
        odata: optional payload written right after the command.

        Raises IOError when the reply starts with 'E' (remote error) or
        with anything other than 'A' (protocol error).
        """
        obuf = cmd.encode()
        for arg in args:
            if not isinstance(arg, (bytes, bytearray)):
                arg = str(arg).encode()
            obuf += arg + b"\n"
        self.proc.stdin.write(obuf)
        if odata:
            self.proc.stdin.write(odata)
        self.proc.stdin.flush()

        # The first byte of the reply selects how the rest is parsed.
        ibuf = self.proc.stdout.read(1)
        if ibuf == b"A":
            return int(self.proc.stdout.readline().decode())
        if ibuf == b"E":
            # An error reply is a numeric line followed by a message line.
            code = int(self.proc.stdout.readline().decode())
            msg = self.proc.stdout.readline().decode(errors="replace").strip()
            raise IOError(f"Remote error {code}: {msg}")
        ibuf += self.proc.stdout.readline()
        raise IOError(f"Protocol error: {ibuf}")

    def _sockread(self, count):
        """Read *count* bytes of payload from the child's stdout."""
        return self.proc.stdout.read(count)

    def open(self, dev, mode=os.O_RDONLY):
        """Send an 'O' command for *dev* with integer flags *mode*.

        Returns the number carried by the reply.
        """
        return self._transact(f"O{dev}\n{int(mode)}\n")

    def close(self, dev=""):
        """Send a 'C' command; returns the number carried by the reply."""
        return self._transact(f"C{dev}\n")

    def seek(self, offset, whence=os.SEEK_SET):
        """Send an 'L' command; returns the number carried by the reply."""
        # Documentation says Lwhence\noffset\n, but that's wrong.
        return self._transact("L", [offset, int(whence)])

    def read(self, count):
        """Request *count* bytes and return the bytes that come back.

        The reply's number says how many payload bytes follow it; that
        many bytes are then read from the child's stdout.
        """
        rcount = self._transact(f"R{count}\n")
        return self._sockread(rcount)

    def write(self, obuf):
        """Send *obuf* with a 'W' command and return the count written.

        Raises IOError when the remote reports a different count than
        ``len(obuf)``.
        """
        count = len(obuf)
        wcount = self._transact("W", [count], obuf)
        if wcount != count:
            raise IOError(f"Truncated write (asked for {count}, wrote {wcount})")
        return wcount
def main():
    """Parse the command line and copy data to or from the remote file.

    Without --write, --length bytes are read from the remote file and
    written to stdout.  With --write, stdin is written to the remote file:
    at most --length bytes when a length is given, otherwise until stdin
    is exhausted.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("-w", "--write", action="store_true",
                        help="write stdin to remote file (default is to read)")
    parser.add_argument("-s", "--offset", type=int, default=0,
                        help="byte offset to seek remote file to")
    parser.add_argument("-n", "--length", type=int, default=0,
                        help="byte count to read (required) or write (optional)")
    parser.add_argument("-b", "--bsize", type=int, default=4096,
                        help="block size for read/write operations")
    parser.add_argument("-x", "--rmt-path", metavar="PATH",
                        help="path to remote 'rmt' executable")
    parser.add_argument("host",
                        help="SSH host specification to connect to")
    parser.add_argument("dev",
                        help="path to remote file")
    args = parser.parse_args()

    # Reject sizes that cannot describe a transfer before connecting.
    if args.length < 0:
        parser.error("length must not be negative")
    if args.bsize <= 0:
        parser.error("block size must be positive")
    if not args.write and not args.length:
        sys.exit("error: must specify length when reading")

    # The payload is raw bytes, so use the binary layer of the std streams.
    stdin = sys.stdin.buffer
    stdout = sys.stdout.buffer

    rmt = RmtFile(args.host, exe=args.rmt_path)
    rmt.open(args.dev, os.O_WRONLY if args.write else os.O_RDONLY)
    rmt.seek(args.offset, os.SEEK_SET)
    if not args.write:
        bcopy(rmt, stdout, args.length, args.bsize)
        stdout.flush()
    elif args.length:
        bcopy(stdin, rmt, args.length, args.bsize)
    else:
        # No length given: stream stdin to the remote file until EOF.
        while buf := stdin.read(args.bsize):
            rmt.write(buf)
    rmt.close()


if __name__ == "__main__":
    main()