-
Notifications
You must be signed in to change notification settings - Fork 0
/
p4fs.py
executable file
·139 lines (109 loc) · 3.61 KB
/
p4fs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#!/usr/bin/env python
import argparse
from collections import defaultdict
from errno import ENOENT
import sys
import time
from fuse import FUSE, FuseOSError, Operations, LoggingMixIn
from fsutils import Directory
import p4utils
to_p4_path = lambda x: '/' + x
class P4fs(LoggingMixIn, Operations):
"""
A simple read-only perforce filesystem
"""
def __init__(self, port, user):
"""
Init method
"""
self.files = {}
self.fd = 0
self.files['/'] = Directory('/')
self.p4 = p4utils.P4utils(port, user)
def chmod(self, path, mode):
# Not implemented
return 0
def chown(self, path, uid, gid):
# Not implemented
return
def create(self, path, mode):
# Not implemented
return 0
def getattr(self, path, fh=None):
if path == '/':
return self.files[path].attrs
result = self.p4.get_attrs(to_p4_path(path))
if not result:
raise FuseOSError(ENOENT)
return result
def getxattr(self, path, name, position=0):
# Extended attributes not supported
return '' # should return ENOATTR, but it's not in errno
def listxattr(self, path):
return []
def mkdir(self, path, mode):
# Not implemented
return
def open(self, path, flags):
self.fd += 1
return self.fd
def read(self, path, size, offset, fh):
# Naive implementation; no caching
data = self.p4.get_file(to_p4_path(path))
return data[offset:offset + size]
def readdir(self, path, fh):
#return ['.', '..'] + [x[1:] for x in self.files if x != '/']
dirents = ['..', '..']
dirents.extend(self.p4.listdir(to_p4_path(path)))
return dirents
def readlink(self, path):
return
def removexattr(self, path, name):
attrs = self.files[path].get('attrs', {})
try:
del attrs[name]
except KeyError:
pass # Should return ENOATTR
def rename(self, old, new):
# Not implemented
self.files[new] = self.files.pop(old)
def rmdir(self, path):
# XXX: Not implemented
self.files.pop(path)
self.files['/']['st_nlink'] -= 1
def setxattr(self, path, name, value, options, position=0):
# Ignore options
attrs = self.files[path].setdefault('attrs', {})
attrs[name] = value
def statfs(self, path):
return dict(f_bsize=512, f_blocks=4096, f_bavail=2048)
def symlink(self, target, source):
# Not implemented
return 0
def truncate(self, path, length, fh=None):
# Not implemented
return 0
def unlink(self, path):
# Not implemented
return 0
def utimens(self, path, times=None):
# Not implemented
return
def write(self, path, data, offset, fh):
# Not implemented
return 0
def parse_opts(opts):
"""
Parse command line arguments
"""
parser = argparse.ArgumentParser()
parser.add_argument('--port', '-p', help='port to connect to')
parser.add_argument('--user', '-u', help='user to connect as')
parser.add_argument('--debug', '-d', help='show debug info (runs in'
' foreground)', action='store_true', default=False)
parser.add_argument('mountpoint')
return parser.parse_args(opts)
if __name__ == "__main__":
opts = parse_opts(sys.argv[1:])
p4fs = P4fs(opts.port, opts.user)
fuse = FUSE(p4fs, opts.mountpoint, foreground=opts.debug)