-
Notifications
You must be signed in to change notification settings - Fork 15
/
find-duplicates.py
188 lines (158 loc) · 7.44 KB
/
find-duplicates.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
import argparse
import hashlib
import logging
import os
from collections import defaultdict
def chunk_reader(fd, chunk_size):
"""Generator that reads a file in chunks of bytes"""
while True:
chunk = fd.read(chunk_size)
if not chunk:
return
yield chunk
def get_hash(filename, chunk_size, first_chunk_only=False, hash=hashlib.sha1):
hashobj = hash()
file_object = open(filename, 'rb')
if first_chunk_only:
hashobj.update(file_object.read(chunk_size))
else:
for chunk in chunk_reader(file_object, chunk_size):
hashobj.update(chunk)
file_hash = hashobj.digest()
file_object.close()
return file_hash
def check_for_duplicates(path, chunk_size, recursive, hash=hashlib.sha1):
# dict of size_in_bytes: [full_path_to_file1, full_path_to_file2, ]
hashes_by_size = defaultdict(list)
# dict of (hash1k, size_in_bytes): [full_path_to_file1, full_path_to_file2, ]
hashes_on_1k = defaultdict(list)
hashes_full = {} # dict of full_file_hash: full_path_to_file_string
duplicates = defaultdict(set)
logger.info('Comparing file sizes')
if not recursive:
for filename in os.listdir(path):
try:
full_path = os.path.realpath(os.path.join(path, filename))
if os.path.isfile(full_path):
file_size = os.path.getsize(full_path)
hashes_by_size[file_size].append(full_path)
except (OSError,):
# not accessible (permissions, etc) - pass on
logger.warning(
f'Error reading file: {os.path.join(path, filename)}')
continue
else:
for dirpath, dirnames, filenames in os.walk(path):
# get all files that have the same size - they are the collision candidates
for filename in filenames:
full_path = os.path.join(dirpath, filename)
try:
# if the target is a symlink (soft one), this will
# dereference it - change the value to the actual target file
full_path = os.path.realpath(full_path)
file_size = os.path.getsize(full_path)
hashes_by_size[file_size].append(full_path)
except (OSError,):
# not accessible (permissions, etc) - pass on
logger.warning(
f'Error reading file: {os.path.join(dirpath, filename)}')
continue
logger.info('Comparing short hashes')
# For all files with the same file size, get their hash on the 1st chunk only
for size_in_bytes, files in hashes_by_size.items():
if len(files) < 2:
continue # this file size is unique, no need to spend CPU cycles on it
for filename in files:
try:
small_hash = get_hash(
filename, chunk_size, first_chunk_only=True)
# the key is the hash on the first chunk plus the size - to
# avoid collisions on equal hashes in the first part of the file
# credits to @Futal for the optimization
hashes_on_1k[(small_hash, size_in_bytes)].append(filename)
except (OSError,):
# the file access might've changed till the exec point got here
logger.warning(
f'Error reading file: {os.path.join(path, filename)}')
continue
logger.info('Comparing full hashes')
# For all files with the same hash on the 1st chunk, get their hash on the full file - collisions will be duplicates
for __, files_list in hashes_on_1k.items():
if len(files_list) < 2:
continue # this hash of fist 1k file bytes is unique, no need to spend CPU cycles on it
for filename in files_list:
try:
full_hash = get_hash(filename, chunk_size,
first_chunk_only=False)
duplicate = hashes_full.get(full_hash)
if duplicate:
logger.info(
f'Duplicate: {os.path.basename(duplicate)} | {os.path.basename(filename)}')
duplicates[duplicate].add(filename)
else:
hashes_full[full_hash] = filename
except (OSError,):
logger.warning(
f'Error reading file: {os.path.join(dirpath, filename)}')
continue
return duplicates
def single_yes_or_no_question(question, default_no=True):
choices = ' [y/N]: ' if default_no else ' [Y/n]: '
default_answer = 'n' if default_no else 'y'
reply = str(input(question + choices)).lower().strip() or default_answer
if reply[0] == 'y':
return True
if reply[0] == 'n':
return False
else:
return False if default_no else True
def find_keep_file(files):
filename_files_dict = {os.path.splitext(os.path.basename(f))[
0]: i for i, f in enumerate(files)}
filenames = list(filename_files_dict.keys())
filenames.sort()
filenames.sort(key=len)
keep_file_idx = filename_files_dict[filenames[0]]
return files[keep_file_idx]
def main(path, chunk_size, recursive, force, dry_run):
duplicates = check_for_duplicates(path, chunk_size, recursive)
logger.info(
f'Number of duplicates found: {sum([len(files_set) for __, files_set in duplicates.items()])}')
if len(duplicates) == 0:
logger.info('No duplicates found. Exiting.')
return
if dry_run:
logger.warning('Dry run is enabled. No deletions will be performed.')
if not force:
delete = single_yes_or_no_question('Delete duplicates?')
if delete or force:
for key_file, files_set in duplicates.items():
files_set.add(key_file)
keep_file = find_keep_file(list(files_set))
logger.info(f'Keep: {os.path.basename(keep_file)}')
files_set.remove(keep_file)
for f in files_set:
logger.info(f'Delete: {os.path.basename(f)}')
if not dry_run:
os.remove(f)
if __name__ == "__main__":
parser = argparse.ArgumentParser(
description=('Remove duplicated media, preserving the file with the shortest filename'
'or earliest date encoded in the filename.'))
parser.add_argument('path', type=str, help='Path to WhatsApp media folder')
parser.add_argument('-c', '--chunk-size', default=1024, type=int,
help=('Chunk size for heuristic, smaller values are generally faster'
'but if many files have identical starting chunks, performance'
'degrades as more full hashes are computed'))
parser.add_argument('-f', '--force', action='store_true',
help='Delete duplicates without prompting')
parser.add_argument('-r', '--recursive', default=False,
action='store_true', help='Recursively process media')
parser.add_argument('--dry-run', default=False, action='store_true',
help='Dry run deletion (no files deleted)')
args = parser.parse_args()
logging.basicConfig(
level=logging.INFO, format='%(asctime)s %(name)s %(levelname)s: %(message)s')
logger = logging.getLogger('find-duplicates')
main(args.path, args.chunk_size, args.recursive,
force=args.force, dry_run=args.dry_run)