forked from farsonic/unifi-to-hosts-mapping
-
Notifications
You must be signed in to change notification settings - Fork 2
/
hosts.py
454 lines (421 loc) · 18.4 KB
/
hosts.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
# -*- coding: utf-8 -*-
""" This module contains classes:
HostsEntry:
A representation of a hosts file entry, i.e. a line containing an IP address
and name(s), a comment, or a blank line/line separator.
Hosts:
A representation of a hosts file, e.g. /etc/hosts and
c:\\\windows\\\system32\\\drivers\\\etc\\\hosts for a linux or MS windows
based machine respectively. Each entry being represented as an instance
of the HostsEntry class.
"""
import sys
try:
from urllib.request import urlopen
except ImportError: # pragma: no cover
from urllib2 import urlopen
from python_hosts.utils import (is_ipv4, is_ipv6, is_readable, valid_hostnames,
dedupe_list)
from python_hosts.exception import (InvalidIPv6Address, InvalidIPv4Address,
UnableToWriteHosts)
class HostsEntry(object):
""" An entry in a hosts file. """
__slots__ = ['entry_type', 'address', 'comment', 'names']
def __init__(self,
entry_type=None,
address=None,
comment=None,
names=None):
"""
Initialise an instance of a Hosts file entry
:param entry_type: ipv4 | ipv6 | comment | blank
:param address: The ipv4 or ipv6 address belonging to the instance
:param comment: The comment belonging to the instance
:param names: The names that resolve to the specified address
:return: None
"""
if not entry_type or entry_type not in ('ipv4',
'ipv6',
'comment',
'blank'):
raise Exception('entry_type invalid or not specified')
if entry_type == 'comment' and not comment:
raise Exception('entry_type comment supplied without value.')
if entry_type == 'ipv4':
if not all((address, names)):
raise Exception('Address and Name(s) must be specified.')
if not is_ipv4(address):
raise InvalidIPv4Address()
if entry_type == 'ipv6':
if not all((address, names)):
raise Exception('Address and Name(s) must be specified.')
if not is_ipv6(address):
raise InvalidIPv6Address()
self.entry_type = entry_type
self.address = address
self.comment = comment
self.names = names
def is_real_entry(self):
return self.entry_type in ('ipv4', 'ipv6')
def __repr__(self):
return "HostsEntry(entry_type=\'{0}\', address=\'{1}\', " \
"comment={2}, names={3})".format(self.entry_type,
self.address,
self.comment,
self.names)
def __str__(self):
if self.entry_type in ('ipv4', 'ipv6'):
return "TYPE={0}, ADDR={1}, NAMES={2}".format(self.entry_type,
self.address,
" ".join(self.names))
elif self.entry_type == 'comment':
return "TYPE = {0}, COMMENT = {1}".format(self.entry_type, self.comment)
elif self.entry_type == 'blank':
return "TYPE = {0}".format(self.entry_type)
@staticmethod
def get_entry_type(hosts_entry=None):
"""
Return the type of entry for the line of hosts file passed
:param hosts_entry: A line from the hosts file
:return: 'comment' | 'blank' | 'ipv4' | 'ipv6'
"""
if hosts_entry and isinstance(hosts_entry, str):
entry = hosts_entry.strip()
if not entry or not entry[0] or entry[0] == "\n":
return 'blank'
if entry[0] == "#":
return 'comment'
entry_chunks = entry.split()
if is_ipv6(entry_chunks[0]):
return 'ipv6'
if is_ipv4(entry_chunks[0]):
return 'ipv4'
@staticmethod
def str_to_hostentry(entry):
"""
Transform a line from a hosts file into an instance of HostsEntry
:param entry: A line from the hosts file
:return: An instance of HostsEntry
"""
line_parts = entry.strip().split()
if is_ipv4(line_parts[0]) and valid_hostnames(line_parts[1:]):
return HostsEntry(entry_type='ipv4',
address=line_parts[0],
names=line_parts[1:])
elif is_ipv6(line_parts[0]) and valid_hostnames(line_parts[1:]):
return HostsEntry(entry_type='ipv6',
address=line_parts[0],
names=line_parts[1:])
else:
return False
class Hosts(object):
""" A hosts file. """
__slots__ = ['entries', 'hosts_path']
def __init__(self, path=None):
"""
Initialise an instance of a hosts file
:param path: The filesystem path of the hosts file to manage
:return: None
"""
self.entries = []
if path:
self.hosts_path = path
else:
self.hosts_path = self.determine_hosts_path()
self.populate_entries()
def __repr__(self):
return 'Hosts(hosts_path=\'{0}\', entries={1})'.format(self.hosts_path, self.entries)
def __str__(self):
output = ('hosts_path={0}, '.format(self.hosts_path))
for entry in self.entries:
output += str(entry)
return output
def count(self):
""" Get a count of the number of host entries
:return: The number of host entries
"""
return len(self.entries)
@staticmethod
def determine_hosts_path(platform=None):
"""
Return the hosts file path based on the supplied
or detected platform.
:param platform: a string used to identify the platform
:return: detected filesystem path of the hosts file
"""
if not platform:
platform = sys.platform
if platform.startswith('win'):
result = r"c:\windows\system32\drivers\etc\hosts"
return result
else:
return '/etc/hosts'
def write(self, path=None):
"""
Write all of the HostsEntry instances back to the hosts file
:param path: override the write path
:return: Dictionary containing counts
"""
written_count = 0
comments_written = 0
blanks_written = 0
ipv4_entries_written = 0
ipv6_entries_written = 0
if path:
output_file_path = path
else:
output_file_path = self.hosts_path
try:
with open(output_file_path, 'w') as hosts_file:
for written_count, line in enumerate(self.entries):
if line.entry_type == 'comment':
hosts_file.write(line.comment + "\n")
comments_written += 1
if line.entry_type == 'blank':
hosts_file.write("\n")
blanks_written += 1
if line.entry_type == 'ipv4':
hosts_file.write(
"{0} {1}\n".format(
line.address,
' '.join(line.names),
)
)
ipv4_entries_written += 1
if line.entry_type == 'ipv6':
hosts_file.write(
"{0} {1}\n".format(
line.address,
' '.join(line.names), ))
ipv6_entries_written += 1
except:
raise UnableToWriteHosts()
return {'total_written': written_count + 1,
'comments_written': comments_written,
'blanks_written': blanks_written,
'ipv4_entries_written': ipv4_entries_written,
'ipv6_entries_written': ipv6_entries_written}
@staticmethod
def get_hosts_by_url(url=None):
"""
Request the content of a URL and return the response
:param url: The URL of the hosts file to download
:return: The content of the passed URL
"""
response = urlopen(url)
return response.read()
def exists(self, address=None, names=None, comment=None):
"""
Determine if the supplied address and/or names, or comment, exists in a HostsEntry within Hosts
:param address: An ipv4 or ipv6 address to search for
:param names: A list of names to search for
:param comment: A comment to search for
:return: True if a supplied address, name, or comment is found. Otherwise, False.
"""
for entry in self.entries:
if entry.entry_type in ('ipv4', 'ipv6'):
if address and address == entry.address:
return True
if names:
for name in names:
if name in entry.names:
return True
elif entry.entry_type == 'comment' and entry.comment == comment:
return True
return False
def remove_all_matching(self, address=None, name=None):
"""
Remove all HostsEntry instances from the Hosts object
where the supplied ip address or name matches
:param address: An ipv4 or ipv6 address
:param name: A host name
:return: None
"""
if self.entries:
if address and name:
func = lambda entry: not entry.is_real_entry() or (entry.address != address and name not in entry.names)
elif address:
func = lambda entry: not entry.is_real_entry() or entry.address != address
elif name:
func = lambda entry: not entry.is_real_entry() or name not in entry.names
else:
raise ValueError('No address or name was specified for removal.')
self.entries = list(filter(func, self.entries))
def import_url(self, url=None, force=None):
"""
Read a list of host entries from a URL, convert them into instances of HostsEntry and
then append to the list of entries in Hosts
:param url: The URL of where to download a hosts file
:return: Counts reflecting the attempted additions
"""
file_contents = self.get_hosts_by_url(url=url).decode('utf-8')
file_contents = file_contents.rstrip().replace('^M', '\n')
file_contents = file_contents.rstrip().replace('\r\n', '\n')
lines = file_contents.split('\n')
skipped = 0
import_entries = []
for line in lines:
stripped_entry = line.strip()
if (not stripped_entry) or (stripped_entry.startswith('#')):
skipped += 1
else:
line = line.partition('#')[0]
line = line.rstrip()
import_entry = HostsEntry.str_to_hostentry(line)
if import_entry:
import_entries.append(import_entry)
add_result = self.add(entries=import_entries, force=force)
write_result = self.write()
return {'result': 'success',
'skipped': skipped,
'add_result': add_result,
'write_result': write_result}
def import_file(self, import_file_path=None):
"""
Read a list of host entries from a file, convert them into instances
of HostsEntry and then append to the list of entries in Hosts
:param import_file_path: The path to the file containing the host entries
:return: Counts reflecting the attempted additions
"""
skipped = 0
invalid_count = 0
if is_readable(import_file_path):
import_entries = []
with open(import_file_path, 'r') as infile:
for line in infile:
stripped_entry = line.strip()
if (not stripped_entry) or (stripped_entry.startswith('#')):
skipped += 1
else:
line = line.partition('#')[0]
line = line.rstrip()
import_entry = HostsEntry.str_to_hostentry(line)
if import_entry:
import_entries.append(import_entry)
else:
invalid_count += 1
add_result = self.add(entries=import_entries)
write_result = self.write()
return {'result': 'success',
'skipped': skipped,
'invalid_count': invalid_count,
'add_result': add_result,
'write_result': write_result}
else:
return {'result': 'failed',
'message': 'Cannot read: file {0}.'.format(import_file_path)}
def add(self, entries=None, force=False, allow_address_duplication=False, merge_names=False):
"""
Add instances of HostsEntry to the instance of Hosts.
:param entries: A list of instances of HostsEntry
:param force: Remove matching before adding
:param allow_address_duplication: Allow using multiple entries for same address
:param merge_names: Merge names where address already exists
:return: The counts of successes and failures
"""
ipv4_count = 0
ipv6_count = 0
comment_count = 0
invalid_count = 0
duplicate_count = 0
replaced_count = 0
import_entries = []
existing_addresses = [x.address for x in self.entries if x.address]
existing_names = []
for item in self.entries:
if item.names:
existing_names.extend(item.names)
existing_names = dedupe_list(existing_names)
for entry in entries:
if entry.entry_type == 'comment':
entry.comment = entry.comment.strip()
if entry.comment[0] != "#":
entry.comment = "# " + entry.comment
import_entries.append(entry)
elif entry.address in ('0.0.0.0', '127.0.0.1') or allow_address_duplication:
# Allow duplicates entries for addresses used for adblocking
if set(entry.names).intersection(existing_names):
if force:
for name in entry.names:
self.remove_all_matching(name=name)
import_entries.append(entry)
else:
duplicate_count += 1
else:
import_entries.append(entry)
elif entry.address in existing_addresses:
if not any((force, merge_names)):
duplicate_count += 1
elif merge_names:
# get the last entry with matching address
entry_names = list()
for existing_entry in self.entries:
if entry.address == existing_entry.address:
entry_names = existing_entry.names
break
# merge names with that entry
merged_names = list(set(entry.names + entry_names))
# remove all matching
self.remove_all_matching(address=entry.address)
# append merged entry
entry.names = merged_names
import_entries.append(entry)
elif force:
self.remove_all_matching(address=entry.address)
replaced_count += 1
import_entries.append(entry)
elif set(entry.names).intersection(existing_names):
if not force:
duplicate_count += 1
else:
for name in entry.names:
self.remove_all_matching(name=name)
replaced_count += 1
import_entries.append(entry)
else:
import_entries.append(entry)
for item in import_entries:
if item.entry_type == 'comment':
comment_count += 1
self.entries.append(item)
elif item.entry_type == 'ipv4':
ipv4_count += 1
self.entries.append(item)
elif item.entry_type == 'ipv6':
ipv6_count += 1
self.entries.append(item)
return {'comment_count': comment_count,
'ipv4_count': ipv4_count,
'ipv6_count': ipv6_count,
'invalid_count': invalid_count,
'duplicate_count': duplicate_count,
'replaced_count': replaced_count}
def populate_entries(self):
"""
Called by the initialiser of Hosts. This reads the entries from the local hosts file,
converts them into instances of HostsEntry and adds them to the Hosts list of entries.
:return: None
"""
try:
with open(self.hosts_path, 'r') as hosts_file:
hosts_entries = [line for line in hosts_file]
for hosts_entry in hosts_entries:
entry_type = HostsEntry.get_entry_type(hosts_entry)
if entry_type == "comment":
hosts_entry = hosts_entry.replace("\r", "")
hosts_entry = hosts_entry.replace("\n", "")
self.entries.append(HostsEntry(entry_type="comment",
comment=hosts_entry))
elif entry_type == "blank":
self.entries.append(HostsEntry(entry_type="blank"))
elif entry_type in ("ipv4", "ipv6"):
chunked_entry = hosts_entry.split()
stripped_name_list = [name.strip() for name in chunked_entry[1:]]
self.entries.append(
HostsEntry(
entry_type=entry_type,
address=chunked_entry[0].strip(),
names=stripped_name_list))
except IOError:
return {'result': 'failed',
'message': 'Cannot read: {0}.'.format(self.hosts_path)}