# gtm-scraper.py
import requests, sys, csv, bs4, re, time, random, urllib3, ssl, logging
from urllib.parse import urlparse


def get_page(url):
    # Get the URL and return it as a Beautiful Soup object
    try:
        res = requests.get(url)
        res.raise_for_status()
    except (requests.exceptions.RequestException, urllib3.exceptions.MaxRetryError, requests.exceptions.SSLError, ssl.SSLCertVerificationError) as ex:
        # This handles all the errors I've seen so far in testing that come out of the requests portion of the process
        errored_urls.append(url)
        error_details[url] = ex
        return None
    # Sometimes links aren't really HTML pages, so make sure they are before trying to Soupify them
    if "text/html" in res.headers.get('content-type', ''):
        pageSoup = bs4.BeautifulSoup(res.text, 'html.parser')
        return pageSoup
    else:
        errored_urls.append(url)
        error_details[url] = 'Unsupported or Missing Content Type'
        return None
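
# Note on the content-type guard above (illustrative, not exhaustive): a link that
# is served as something like application/pdf, or with no content-type header at
# all, ends up in the error report as 'Unsupported or Missing Content Type'
# instead of being parsed for further links.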


def find_urls_on_page(current_url, bs4_obj):
    # Take a Beautiful Soup object, find everything that looks like a link,
    # then return all valid, unchecked, unqueued links
    anchors = bs4_obj.select('a[href]')
    # Get rid of any links to call or email and any links that just run a script
    # Also, try to avoid any obvious file extensions
    real_links = []
    invalid_link_regex = re.compile(r'^(mailto|tel|javascript):', re.IGNORECASE)
    invalid_path_regex = re.compile(r'\.(png|jpe?g|gif|pdf|xlsx?|docx?|pptx?|zip|txt|mpeg|mp4|mp3)$', re.IGNORECASE)
    for anchor in anchors:
        href = anchor.get('href')
        if invalid_link_regex.search(href) is None:
            if invalid_path_regex.search(urlparse(href).path) is None:
                real_links.append(href)
    # Narrow the list down to just scheme, hostname, and path - no parameters or fragments
    base_links = []
    current_scheme = urlparse(current_url).scheme
    current_hostname = urlparse(current_url).hostname
    for link in real_links:
        base_url = ''
        parsed_link = urlparse(link)
        if parsed_link.hostname:
            base_url = base_url + parsed_link.scheme + '://' + parsed_link.hostname + parsed_link.path
        else:
            # Relative HREFs on same-domain links won't include a scheme or hostname, so use the current page's
            base_url = base_url + current_scheme + '://' + current_hostname + parsed_link.path
        base_links.append(base_url)
    # Only return the valid links that haven't already been queued, checked, or resulted in an error
    return_links = []
    for link in base_links:
        if urlparse(link).hostname in valid_hostnames:
            if link not in queued_urls and link not in checked_urls and link not in errored_urls and link not in return_links:
                return_links.append(link)
    return return_links
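
# A hypothetical example of the normalization above: while crawling
# https://example.com/, an href of "/about?utm_source=x#team" would come back as
# "https://example.com/about" (query string and fragment dropped), while
# "https://other-site.com/about" would be discarded unless other-site.com was one
# of the hostnames passed on the command line. Both hostnames here are placeholders.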


def find_gtm_containers(bs4_obj):
    # Take a Beautiful Soup object, find the GTM scripts in the head and the GTM noscript iframes in the body,
    # then return a list of dictionaries with each container id, as well as placement details
    containers = []
    head_scripts = bs4_obj.select('head script')
    gtm_scripts = []
    for script in head_scripts:
        if re.search('googletagmanager', str(script)):
            gtm_scripts.append(script)
    gtm_noscripts = bs4_obj.select('body noscript iframe[src*="googletagmanager"]')
    container_id_regex = re.compile(r'GTM-[A-Z0-9]+')
    # Extract the container IDs from all found scripts
    head_container_ids = []
    noscript_container_ids = []
    for script in gtm_scripts:
        match = container_id_regex.search(str(script))
        if match:
            match_result = match.group(0)
            if match_result not in head_container_ids:
                head_container_ids.append(match_result)
    for noscript in gtm_noscripts:
        match = container_id_regex.search(str(noscript))
        if match:
            match_result = match.group(0)
            if match_result not in noscript_container_ids:
                noscript_container_ids.append(match_result)
    # Get a comprehensive list of unique IDs
    all_container_ids = list(set(head_container_ids + noscript_container_ids))
    for container_id in all_container_ids:
        container = {
            "id": container_id,
            "in_head": container_id in head_container_ids,
            "in_body": container_id in noscript_container_ids
        }
        containers.append(container)
    return containers
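
# Sketch of the return value for a page with one correctly placed container
# (the container ID is a made-up placeholder):
#   [{"id": "GTM-ABC123", "in_head": True, "in_body": True}]
# A container found only in the head snippet would report "in_body": False.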


def write_results_to_file(dictionary):
    # For lack of a better option at the moment, start the name with the first hostname in the list from the command line
    name_root = valid_hostnames[0].replace('.', '_')
    with open(f'{name_root}-gtm-scraper-results.csv', 'w', newline='') as csvfile:
        writer = csv.writer(csvfile, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
        writer.writerow(['url', 'container_id', 'in_head', 'in_body'])
        for key, value in dictionary.items():
            if len(value) >= 1:
                # Note that a page could have more than one container
                for container in value:
                    container_id = container["id"]
                    head_bool = container["in_head"]
                    body_bool = container["in_body"]
                    writer.writerow([key, container_id, head_bool, body_bool])
            else:
                # Represents pages that don't have GTM on them at all
                writer.writerow([key, 'none', 'na', 'na'])


def write_errors_to_file(dictionary):
    # For lack of a better option at the moment, start the name with the first hostname in the list from the command line
    name_root = valid_hostnames[0].replace('.', '_')
    with open(f'{name_root}-gtm-scraper-errors.csv', 'w', newline='') as csvfile:
        writer = csv.writer(csvfile, delimiter=',', quotechar='"', quoting=csv.QUOTE_MINIMAL)
        writer.writerow(['url', 'error'])
        for key, value in dictionary.items():
            writer.writerow([key, value])


if __name__ == '__main__':
    # Set logging level
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    # Get the hostnames to check from the command line arguments
    valid_hostnames = sys.argv[1:]
    queued_urls = []    # working list of urls to check
    checked_urls = []   # everything that was reviewed
    errored_urls = []   # everything that failed its request status
    page_details = {}   # dictionary of pages as keys and lists of containers as values
    error_details = {}  # dictionary of pages as keys and errors as values
    # Initialize the queue with the homepages for the hostnames from the command line
    for hostname in valid_hostnames:
        queued_urls.append(f'https://{hostname}/')
    # Check pages until the queue is empty
    while queued_urls:
        current_url = queued_urls.pop(0)
        logging.info(f'checking: {current_url}')
        current_page = get_page(current_url)
        if current_page:
            checked_urls.append(current_url)  # make sure it's in one of the lists so that it doesn't get enqueued
            queued_urls.extend(find_urls_on_page(current_url, current_page))
            page_details[current_url] = find_gtm_containers(current_page)
        # Pause for a moment before the next URL to try to avoid triggering bot detection
        time.sleep(random.randrange(3, 7) / 10)
    # Use the dictionary to create a csv file
    write_results_to_file(page_details)
    # If applicable, produce an error report as well
    if len(error_details) >= 1:
        write_errors_to_file(error_details)
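
# Example invocation (the hostnames are placeholders, not part of the original script):
#   python gtm-scraper.py example.com www.example.com
# This seeds the queue with https://example.com/ and https://www.example.com/, crawls
# internal links on those hostnames, and writes example_com-gtm-scraper-results.csv,
# plus example_com-gtm-scraper-errors.csv if any requests failed.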