
Commit

Update web_scraper.py
slyfox1186 committed Sep 26, 2024
1 parent 7acdc1a commit 58b8f7d
Showing 1 changed file with 167 additions and 58 deletions.
225 changes: 167 additions & 58 deletions Python3/Networking/web_scraper.py
@@ -1,73 +1,182 @@
#!/usr/bin/env python3

import argparse
import csv
import json
import logging
import multiprocessing
import re
import requests
import time
import warnings
from bs4 import BeautifulSoup, XMLParsedAsHTMLWarning
from concurrent.futures import ThreadPoolExecutor, as_completed
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from urllib.parse import urljoin

# Suppress XMLParsedAsHTMLWarning
warnings.filterwarnings("ignore", category=XMLParsedAsHTMLWarning)

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(name)s - %(message)s')
logger = logging.getLogger(__name__)

DEFAULT_USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:130.0) Gecko/20100101 Firefox/130.0'
DEFAULT_POOL_SIZE = 100

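# Build a shared requests.Session with retry/backoff on transient 5xx errors
# and a connection pool sized for the worker threads.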
def create_session(user_agent, pool_size):
    session = requests.Session()
    retries = Retry(total=3, backoff_factor=0.5, status_forcelist=[500, 502, 503, 504])
    adapter = HTTPAdapter(max_retries=retries, pool_connections=pool_size, pool_maxsize=pool_size)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    session.headers.update({'User-Agent': user_agent})
    return session

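# Download a single URL through the shared session; returns (content, url),
# or (None, url) if the request fails.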
def fetch_url(session, url):
    try:
        response = session.get(url, timeout=10)
        response.raise_for_status()
        return response.content, url
    except requests.RequestException as e:
        logger.error(f"Error fetching {url}: {e}")
        return None, url

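# Parse the fetched HTML, strip scripts, styles, and navigation chrome, and
# return a dict with the page title, up to 1000 characters of text, and up to 10 links.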
def extract_main_content(content, url):
    soup = BeautifulSoup(content, 'lxml')

    # Remove unwanted elements
    for element in soup(['script', 'style', 'nav', 'header', 'footer']):
        element.decompose()

    # Try to find the main content
    main_content = soup.find('main') or soup.find('article') or soup.find('div', class_='content')

    if not main_content:
        # If no specific content container is found, use the body
        main_content = soup.body

    if main_content:
        # Extract text and links
        text = main_content.get_text(strip=True)
        # Remove '\r' characters and normalize line endings
        text = re.sub(r'\r\n?', '\n', text)
        links = [{'text': a.get_text(strip=True), 'href': urljoin(url, a.get('href'))}
                 for a in main_content.find_all('a', href=True)]

        # Limit text to 1000 characters
        text = text[:1000] + '...' if len(text) > 1000 else text

        return {
            'url': url,
            'title': soup.title.string if soup.title else 'No title',
            'content': text,
            'links': links[:10]  # Limit to 10 links
        }
    else:
        return {
            'url': url,
            'title': 'Failed to extract content',
            'content': '',
            'links': []
        }

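# Fetch and parse all URLs concurrently with a thread pool, collecting one
# result dict per successfully fetched page.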
def scrape_websites(urls, max_workers, user_agent, pool_size):
    session = create_session(user_agent, pool_size)
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_url = {executor.submit(fetch_url, session, url): url for url in urls}
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                content, _ = future.result()
                if content:
                    result = extract_main_content(content, url)
                    results.append(result)
            except Exception as e:
                logger.error(f"Error processing {url}: {e}")
    return results

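# Write results to a JSON file, compact or pretty-printed depending on --minimize.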
def save_results_json(results, filename, minimize=False):
    with open(filename, 'w', encoding='utf-8') as f:
        if minimize:
            json.dump(results, f, ensure_ascii=False, separators=(',', ':'))
        else:
            json.dump(results, f, ensure_ascii=False, indent=2)

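# Write results to a CSV file with one row per page; links are serialized as JSON.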
def save_results_csv(results, filename, minimize=False):
    with open(filename, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(['URL', 'Title', 'Content', 'Links'])
        for result in results:
            links = json.dumps(result['links'], ensure_ascii=False, separators=(',', ':')) if minimize else json.dumps(result['links'])
            writer.writerow([
                result['url'],
                result['title'],
                result['content'],
                links
            ])

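# Read URLs from a text file, one per line, skipping blank lines.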
def read_urls_from_file(file_path):
    with open(file_path, 'r') as f:
        return [line.strip() for line in f if line.strip()]

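# Parse command-line arguments, run the scraper, and emit results to stdout or a file.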
def main():
    default_max_workers = max(multiprocessing.cpu_count() * 2, 5)

    parser = argparse.ArgumentParser(description="Scrape the main content from multiple websites in parallel.",
                                     formatter_class=argparse.RawDescriptionHelpFormatter,
                                     epilog='''
Examples:
  python script.py https://www.example1.com https://www.example2.com
  python script.py --output results.json --format json https://www.example.com
  python script.py --max-workers 5 --verbose https://www.example1.com https://www.example2.com
  python script.py --input-file urls.txt
  python script.py --output results.json --format json --minimize https://www.example.com
''')

    parser.add_argument('urls', nargs='*', help='URLs of the websites to scrape')
    parser.add_argument('-i', '--input-file', help='Path to a file containing URLs to scrape (one per line)')
    parser.add_argument('-o', '--output', help='Output file to save results')
    parser.add_argument('-f', '--format', choices=['json', 'csv'], default='json', help='Output format (default: json)')
    parser.add_argument('-w', '--max-workers', type=int, default=default_max_workers,
                        help=f'Maximum number of worker threads (default: {default_max_workers})')
    parser.add_argument('-v', '--verbose', action='store_true', help='Increase output verbosity')
    parser.add_argument('--user-agent', default=DEFAULT_USER_AGENT, help='Custom User-Agent string')
    parser.add_argument('--pool-size', type=int, default=DEFAULT_POOL_SIZE, help='Connection pool size')
    parser.add_argument('-m', '--minimize', action='store_true', help='Minimize output file size')

    args = parser.parse_args()

    if args.verbose:
        logger.setLevel(logging.DEBUG)

    if args.input_file:
        urls = read_urls_from_file(args.input_file)
    elif args.urls:
        urls = args.urls
    else:
        parser.error("Either provide URLs as arguments or use the --input-file option.")

    start_time = time.time()
    results = scrape_websites(urls, args.max_workers, args.user_agent, args.pool_size)

    if args.output:
        if args.format == 'json':
            save_results_json(results, args.output, args.minimize)
        elif args.format == 'csv':
            save_results_csv(results, args.output, args.minimize)
        logger.info(f"Results saved to {args.output}")
    else:
        if args.minimize:
            print(json.dumps(results, ensure_ascii=False, separators=(',', ':')))
        else:
            print(json.dumps(results, ensure_ascii=False, indent=2))

    end_time = time.time()
    logger.info(f"Scraping completed in {end_time - start_time:.2f} seconds")

if __name__ == "__main__":
    main()
