-
Notifications
You must be signed in to change notification settings - Fork 0
/
mp3md.py
executable file
·474 lines (403 loc) · 17.7 KB
/
mp3md.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
#!/usr/bin/python
# TODO:
# - group reports by error-per-dir, or error-global
# - supply checks by file, command line, etc
# - collection-wide thresholds for incremental improvements (error if <50% have tag xxxx)
# - apply fixes on a directory level
# - dependent tests - run only if the other passed. Overwrite operators?
# - documentation sweep
# - test cases (sigh)
# - document recommended order - version checks to make sure everything's OK, check presence + apply sets before bulk dir
# - percentage of nice-to-haves: e.g. TDRL
# - Strip id3v1, or check it's consistent?
# - Option to find incorrect directory structure (Various Artists TPE2 not in Various Artists, non-two-levels-deep structures)
# - Sort alphabetically when scanning
# - Coloured output for errors, or dump to stderr
# - document which ones are 'subtag' (e.g. 'COMM:foo' safe)
# - Generalised reporting mode? Summary of genres etc. by track count.
from mutagen.id3 import ID3
from mutagen.id3 import Frame
from mutagen.id3 import Frames
from optparse import OptionParser
import sys, os, fnmatch, re
class Doctor(object):
def __init__(self, tests):
self.tests = tests
def checkup(self, directory, recursive=False, fix=False):
if recursive:
for dirpath, _, _ in sorted(os.walk(directory)):
self.test_dir(dirpath, fix)
else:
self.test_dir(directory, fix)
def test_dir(self, directory, apply_fixes):
errors = Errors()
valid_tags = self.files_with_valid_tags(directory, errors=errors)
for test in self.tests:
local_errors = Errors()
test.run_check(directory, valid_tags, "WARNING" if test.fix and apply_fixes else "ERROR", local_errors)
if apply_fixes and test.fix and local_errors.has_errors():
tofix = [(path, id3) for (path, id3) in valid_tags if path in local_errors.error_files()]
test.fix.try_fix(directory, valid_tags, tofix, local_errors)
# Update tags in case we're in an unknown state - fixes failed, etc. Some tests may no longer apply.
# Don't record ID3 errors though, we've already gotten those above (unless something's gone horribly
# horribly wrong).
valid_tags = self.files_with_valid_tags(directory, errors=None)
test.run_check(directory, valid_tags, "ERROR", local_errors)
errors.merge(local_errors)
if errors.has_errors():
for file, errormessages in errors.items():
print "%s:" % (file)
for message in errormessages:
print " %s" % (message)
sys.stdout.flush()
def files_with_valid_tags(self, directory, errors=None):
valid_tags = []
files = fnmatch.filter(os.listdir(directory), '*.mp3')
for file in files:
path = os.path.join(directory, file)
try:
id3 = ID3(path)
valid_tags.append((path, id3))
except:
if errors:
errors.record(path, "ERROR", "Unable to find ID3v2 tag")
return valid_tags
class Message(object):
def __init__(self, severity, text):
self.severity = severity
self.text = text
def __str__(self):
return "%s: %s" % (self.severity, self.text)
class Errors(object):
def __init__(self):
self.errors = dict()
def record(self, path, severity, error):
self.errors.setdefault(path, []).append(Message(severity, error))
def has_errors(self):
return len(self.errors) > 0
def error_files(self):
return self.errors.keys()
def items(self):
return self.errors.items()
def merge(self, other):
for (path, errors) in other.items():
for error in errors:
self.errors.setdefault(path, []).append(error)
def demote_all(self, severity):
for (_, errors) in self.errors.items():
for error in errors:
error.severity = severity
class Check(object):
def __init__(self, fix=None):
self.fix = fix
def run_check(self, directory, files, severity, errors):
pass
def get_frame(id3, frametype):
try:
return id3.getall(frametype)[0]
except IndexError:
return None
get_frame = staticmethod(get_frame)
def get_value(id3, frametype, default=None):
frame = Check.get_frame(id3, frametype)
if frame:
# TODO: deal with multi-valued fields
try:
return unicode(frame.text[0])
except UnicodeEncodeError, e:
print "ERROR reading id3: %s", id3
raise e
return default
get_value = staticmethod(get_value)
class FileCheck(Check):
def run_check(self, directory, files, severity, errors):
for file, frames in files:
self.check_file(file, frames, severity, errors)
def check_file(self, file, id3, errors):
pass
class FramePresentCheck(FileCheck):
def __init__(self, frametypes, fix=None):
FileCheck.__init__(self, fix)
self.frametypes = frametypes
def check_file(self, file, id3, severity, errors):
for frametype in self.frametypes:
frame = Check.get_frame(id3, frametype)
if not frame:
errors.record(file, severity, "Required frame %s missing" % frametype)
class FrameAbsentCheck(FileCheck):
def __init__(self, frametypes, fix=None):
FileCheck.__init__(self, fix)
self.frametypes = frametypes
def check_file(self, file, id3, severity, errors):
for frametype in self.frametypes:
frame = Check.get_frame(id3, frametype)
if frame:
errors.record(file, severity, "Banned frame %s present" % frametype)
class FrameWhitelistCheck(FileCheck):
def __init__(self, frametype, whitelist, regex=False, fix=None):
FileCheck.__init__(self, fix)
self.frametype = frametype
self.whitelist = set(whitelist)
self.regex = regex
def check_file(self, file, id3, severity, errors):
frame = Check.get_frame(id3, self.frametype)
if not frame:
return
if self.regex:
valid = [[string for regex in self.whitelist if re.search(regex, string)] for string in frame.text]
if valid:
valid = valid[0]
invalid = [string for string in frame.text if string not in valid]
else:
invalid = [string for string in frame.text if string not in self.whitelist]
if len(invalid) > 0:
errors.record(file, severity, "Frame %s has values not in whitelist %s" % (self.frametype, invalid))
class FrameBlacklistCheck(FileCheck):
def __init__(self, frametype, blacklist, regex=False, fix=None):
FileCheck.__init__(self, fix)
self.frametype = frametype
self.blacklist = set(blacklist)
self.regex = regex
def check_file(self, file, id3, severity, errors):
frame = Check.get_frame(id3, self.frametype)
if not frame:
return
if (self.regex):
invalid = [[string for regex in self.blacklist if re.search(regex, string)] for string in frame.text]
if invalid:
invalid = invalid[0]
else:
invalid = [string for string in frame.text if string in self.blacklist]
if len(invalid) > 0:
errors.record(file, severity, "Frame %s has values %s matching blacklist %s" % (self.frametype, invalid, self.blacklist))
class FrameConsistencyCheck(Check):
def __init__(self, frametypes, fix=None):
Check.__init__(self, fix)
self.frametypes = frametypes
def run_check(self, directory, files, severity, errors):
for frametype in self.frametypes:
values = set()
for file, frame in files:
value = Check.get_value(frame, frametype)
values.add(value)
if len(values) > 1:
errors.record(directory, severity, "Inconsistent values for frame %s: %s" % (frametype, values))
class MutualPresenceCheck(FileCheck):
def __init__(self, frametypes, fix=None):
FileCheck.__init__(self, fix)
self.frametypes = frametypes
def check_file(self, file, id3, severity, errors):
present = [frametype for frametype in self.frametypes if Check.get_frame(id3, frametype)]
absent = [frametype for frametype in self.frametypes if frametype not in present]
if len(present) == 0:
return
if len(absent) == 0:
return
errors.record(file, severity, "Mutally required frames missing: has %s but not %s" % (present, absent))
class TagVersionCheck(FileCheck):
def check_file(self, file, id3, severity, errors):
if (id3.version < (2, 4, 0)):
errors.record(file, severity, "Frame version too old: %s" % (id3.version,))
class TrackNumberContinuityCheck(Check):
def run_check(self, directory, files, severity, errors):
# TODO: make aware of disc numbers. Provide option not to care about single-track purchases.
if len(files) == 0:
return
track_values = set()
maxtrack_values = set()
for file, frame in files:
values = Check.get_value(frame, "TRCK").split("/")
track = int(values[0])
maxtrack = int(values[1]) if len(values) > 1 else None
track_values.add(track)
maxtrack_values.add(maxtrack)
if len(maxtrack_values) > 1:
errors.record(directory, severity, "Tracks don't agree about album track length: %s" % (maxtrack_values,))
return
maxtrack = list(maxtrack_values)[0]
if not maxtrack:
maxtrack = len(files)
maxtrack = max(maxtrack, max(track_values))
sorted_tracks = sorted(track_values)
missing = [num for num in range(1, maxtrack) if not num in track_values]
if len(missing) > 0:
errors.record(directory, severity, "Missing track numbers %s out of %s tracks" % (missing, maxtrack))
class DependentValueCheck(FileCheck):
def __init__(self, required_frame, required_value, dependent_frame, dependent_value, fix=None):
FileCheck.__init__(self, fix)
self.required_frame = required_frame
self.required_value = required_value
self.dependent_frame = dependent_frame
self.dependent_value = dependent_value
def check_file(self, file, id3, severity, errors):
dependent = Check.get_value(id3, self.dependent_frame)
required = Check.get_value(id3, self.required_frame)
if dependent == self.dependent_value and required != self.required_value:
errors.record(file, severity, "Frame %s = '%s' but %s not '%s' (was '%s')" % (self.dependent_frame,
self.dependent_value, self.required_frame, self.required_value, required))
class Compressed24Tag(FileCheck):
def __init__(self, fix=None):
FileCheck.__init__(self, fix)
def check_file(self, file, id3, severity, errors):
if (2,4,0) > id3.version:
return
compressed_frames = []
for frame_type in id3:
for frame in id3.getall(frame_type):
if frame._flags & Frame.FLAG24_COMPRESS:
compressed_frames.append(frame_type)
if compressed_frames:
errors.record(file, severity, "Found compressed v2.4 frames: %s" % (compressed_frames,))
class Fix(object):
def try_fix(self, directory, valid_files, to_fix, errors):
pass
class ApplyValue(Fix):
def __init__(self, frametype, value):
self.frametype = frametype
self.value = value
def try_fix(self, directory, valid_files, to_fix, errors):
for file, tag in to_fix:
try:
tag.delall(self.frametype)
frame = Frames.get(self.frametype)(encoding=3, text=self.value)
tag.add(frame)
tag.save()
errors.record(file, "FIXED", "Frame %s set to '%s'" % (self.frametype, self.value))
except object, e:
errors.record(file, "FIXERROR", "Could not set frame %s to '%s': %s" % (self.frametype, self.value, e))
class StripFrame(Fix):
def __init__(self, frametypes):
self.frametypes = frametypes
def try_fix(self, directory, valid_files, to_fix, errors):
for frametype in self.frametypes:
for file, frame in to_fix:
try:
frame.delall(frametype)
frame.save()
errors.record(file, "FIXED", "Frame %s deleted" % (frametype,))
except:
errors.record(file, "FIXERROR", "Could not delete frame %s" % (frametype,))
class ApplyCommonValue(Fix):
# outliers may be an integer (in which case it represents the number of files which are permitted
# to not have the common value), or a decimal < 1.0, in which case it is a fraction. So if there
# are 25 files in a directory, a value of 5 or a value of 0.2 will have the same effect (apply
# a common value to target if 20/25 share the same value in source. If a decimal is supplied, a
# minimum outlier of 1 is assumed. Ties in cardinality (so with 10 items, outliers = 5, and two
# source values each used by 5 files) are broken arbitrarily.
def __init__(self, source, target, outliers):
self.source = source
self.target = target
self.outliers = outliers
def try_fix(self, directory, valid_files, to_fix, errors):
values = []
for (file, tag) in valid_files:
values.append(Check.get_value(tag, self.source))
if len(values) == 0:
for (file, tag) in to_fix:
errors.record(file, "FIXERROR", "No valid source values for tag %s" % self.source)
counter = {}
for value in values: counter[value] = counter.get(value, 0) + 1
top = sorted([ (freq,word) for word, freq in counter.items() ], reverse=True)[0]
top_value = top[1]
top_freq = top[0]
outliers = len(valid_files) - top_freq
permitted_outliers = 0 if self.outliers == 0 else self.outliers if self.outliers >= 1 else max(int(self.outliers * len(valid_files)), 1)
for (file, tag) in to_fix:
if outliers > permitted_outliers:
errors.record(file, "FIXERROR", "Too many outliers from %s: %s (max %s)" % (top_value, outliers, permitted_outliers))
else:
try:
frame = Frames.get(self.target)(encoding=3, text=top_value)
tag.add(frame)
tag.save()
errors.record(file, "FIX", "Fixed: set field %s to \"%s\"" % (self.target, top_value))
except object, e:
errors.record(file, "FIXERROR", "Could not save %s" % e)
class UpdateTag(Fix):
def try_fix(self, directory, valid_files, to_fix, errors):
for (file, tag) in to_fix:
tag.save()
errors.record(file, "FIX", "Updated tag to v2.4")
class TrailingArtistCheck(FileCheck, Fix):
def __init__(self):
FileCheck.__init__(self, self)
def check_file(self, file, id3, severity, errors):
artist = Check.get_value(id3, 'TPE1')
title = Check.get_value(id3, 'TIT2')
if not artist:
return
if not title:
return
search_string = " - " + artist
if title.endswith(search_string):
errors.record(file, severity, "Title '%s' appears to include artist '%s'" % (title, artist))
def try_fix(self, directory, valid_files, to_fix, errors):
for file, tag in to_fix:
try:
artist = Check.get_value(tag, 'TPE1')
title = Check.get_value(tag, 'TIT2')
search_string = " - " + artist
newtitle = title.replace(search_string, "")
tag.delall('TIT2')
frame = Frames.get('TIT2')(encoding=3, text=newtitle)
tag.add(frame)
tag.save()
errors.record(file, "FIXED", "Title set to '%s'" % (newtitle))
except object, e:
errors.record(file, "FIXERROR", "Could not set title to '%s': %s" % (newtitle, e))
class MigrateRegex(Fix):
def __init__(self, from_frame, to_frame, regex, overwrite, match_group=0):
self.from_frame = from_frame
self.to_frame = to_frame
self.regex = regex
self.overwrite = overwrite
self.match_group = match_group
def try_fix(self, directory, valid_files, to_fix, errors):
for file, tag in to_fix:
try:
source = Check.get_value(tag, self.from_frame)
match = re.search(self.regex, source)
if not match:
errors.record(file, "FIXERROR", "Cannot move value from frame %s as does not match regex %s" % (self.from_frame, self.regex))
break
replace_value = match.group(self.match_group)
existing_value = Check.get_value(tag, self.to_frame)
if existing_value:
if not self.overwrite:
errors.record(file, "FIXERROR", "Could not copy value '%s' from frame %s; destination frame %s already has value '%s' and overwrite=False" % (replace_value, self.from_frame, self.to_frame, existing_value))
break
tag.delall(self.to_frame)
tag.delall(self.from_frame)
replace_from = re.sub(self.regex, "", source)
frame = Frames.get(self.to_frame)(encoding=3, text=replace_value)
tag.add(frame)
frame = Frames.get(self.from_frame)(encoding=3, text=replace_from)
tag.add(frame)
tag.save()
errors.record(file, "FIXED", "Value '%s' moved from frame %s to %s" % (replace_value, self.from_frame, self.to_frame))
except object, e:
errors.record(file, "FIXERROR", "Could not move regex %s from frame %s to frame %s: %s" % (self.refex, self.from_frame, self.to_frame, e))
def runchecks(tests):
parser = OptionParser()
parser.add_option("-r", "--recursive", action="store_true", default=False, help="Recurse into directories")
parser.add_option("-f", "--apply-fixes", action="store_true", default=False, help="Apply any configured fixes")
(options, args) = parser.parse_args()
if not args:
raise SystemExit(parser.print_help() or 1)
doctor = Doctor(tests)
doctor.checkup(args[0], recursive=options.recursive, fix=options.apply_fixes)
if __name__ == "__main__":
tests = [
#TagVersionCheck(fix=UpdateTag()),
#FramePresentCheck(['APIC', 'TALB', 'TRCK']),
#MutualPresenceCheck(['TOAL', 'TOPE', 'TDOR']),
#FrameConsistencyCheck(['TALB', 'TPE2']),
#FramePresentCheck(['TPE2'], fix=ApplyCommonValue(source='TPE1', target='TPE2', outliers=2)),
#TrackNumberContinuityCheck(),
#FrameAbsentCheck(['COMM'], fix=StripFrame(['COMM'])),
#FrameWhitelistCheck('TCON', ['Rock', 'Pop', 'Alternative']),
#FrameBlacklistCheck('TIT2', [r'[\(\[].*with', r'[\(\[].*live', r'[\(\[].*remix', r'[\(\[].*cover'], regex=True),
FramePresentCheck(['TXXX'], fix=ApplyValue('TXXX', 'test')),
# DependentValueCheck('TCON', 'Rock', 'TPE1', 'Florence + The Machine', fix=ApplyValue('TCON', 'Rock'))
]
runchecks(tests)