-
Notifications
You must be signed in to change notification settings - Fork 0
/
p4utils.py
145 lines (121 loc) · 3.63 KB
/
p4utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
"""
A module to handle p4 operations
"""
import datetime
import P4
from fsutils import Directory, File
class P4utils(object):
"""
Encapsulates interaction with perforce
"""
def __init__(self, port, user):
self.port = port
self.user = user
p4 = P4.P4()
if port:
p4.port = port
if user:
p4.user = user
p4.connect()
self.p4 = p4
# Raise exceptions on errors only; not warnings
self.p4.exception_level = 1
def listdir(self, path):
"""
Lists the contents of path
"""
if not path.endswith('/'):
path += '/'
dirs = self._get_dirnames(path + '*')
files = []
if path != '//':
# For some reason, p4 files throws an error if called with //*
files = self._get_filenames(path + '*')
return self._normalize_paths(path, dirs + files)
def _normalize_paths(self, root, paths):
return [path.split(root)[1] for path in paths]
def _get_dirnames(self, path):
"""
Get all directories in path
"""
results = self.p4.run('dirs', path)
return [item['dir'] for item in results]
def _get_filenames(self, path):
"""
Get all filenames in path
"""
results = self.p4.run('files', path)
return [item['depotFile'] for item in results
if item['action'].find('delete') == -1]
def _get_filelog(self, path):
"""
Return a list of DepotFile objects at `path`
"""
return self.p4.run_filelog(path)
def get_attrs(self, path):
"""
Get the attributes of path, if it exists. Else, return None
"""
# Try directory first, then file
result = self._get_dirnames(path)
if result:
return P4Directory(path).attrs
result = self._get_filelog(path)
if result:
return P4File(path, result[0]).attrs
return None
def get_file(self, path):
"""
Return the contents of path
"""
handler = P4OutputHandler()
result = self.p4.run('print', path, handler=handler)
# Output is [{'type': 'text', 'action': 'edit'}]
if result[0]['type'].find('text') == -1:
return ''
return handler.output
def __del__(self):
self.p4.disconnect()
class P4Directory(Directory):
"""
A p4 directory, same as a normal directory
"""
pass
class P4File(File):
"""
Represents a p4 file
"""
def __init__(self, path, depotfile):
self.depotfile = depotfile
super(P4File, self).__init__(path)
def get_attrs(self):
attrs = super(P4File, self).get_attrs()
latest_rev = self.depotfile.revisions[0]
time = to_epoch(latest_rev.time)
size = int(latest_rev.fileSize)
override = {
'st_size': size,
'st_atime': time,
'st_ctime': time,
'st_mtime': time,
}
attrs.update(override)
return attrs
class P4OutputHandler(P4.OutputHandler):
"""
Custom output handler, mainly for `p4 print` because it doesn't handle
streaming output correctly
"""
def __init__(self):
P4.OutputHandler.__init__(self)
self.output = ''
def outputText(self, s):
self.output = self.output + s
# Signal that we've already handled the output
return self.HANDLED
def to_epoch(dt):
"""
Converts a datetime object to epoch time
"""
delta = dt - datetime.datetime(1970, 1, 1)
return delta.seconds + delta.days * 24 * 3600