-
Notifications
You must be signed in to change notification settings - Fork 0
/
malwarepatrol_ioc_feed.py
124 lines (105 loc) · 4.89 KB
/
malwarepatrol_ioc_feed.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
#!/var/www/MISP/venv/bin/python
# MalwarePatrol Feed Fetcher and Filter
# Version: 2.0
# Date: 15 October 2024
# Author: A.R.
# Licence: MIT
"""
This script automates the process of fetching IP addresses from the MalwarePatrol feed. It downloads the latest feed,
extracts unique IP addresses, filters out non-public and special-use IP addresses as defined by RFCs, and excludes IPs
present in the `consolidated_ips.json` file.
By utilising functions from `zero_noise_ips.py`, specifically `is_non_public_ip` and `update_consolidated_ips`, the
script ensures that only public, routable IP addresses that are not known benign or irrelevant are retained.
The cleaned list of IPs is then saved to a specified CSV file.
This process reduces false positives and enhances the relevance of the data when integrating with security platforms
like MISP or OpenCTI. The script is designed to handle large datasets efficiently, including mechanisms to check for
data changes and avoid unnecessary processing.
"""
import requests
import os
import gzip
import hashlib
import logging
import json
import csv
from zero_noise_ips import is_non_public_ip, update_consolidated_ips # Import necessary functions
def get_hash_of_file(file_content):
    """Return the hexadecimal MD5 digest of ``file_content`` (a bytes-like object)."""
    digest = hashlib.md5()
    digest.update(file_content)
    return digest.hexdigest()
def process_json_to_csv(gz_path, csv_path, consolidated_ips):
    """
    Convert a gzipped JSON feed into a CSV of new, public IP addresses.

    The feed is loaded, re-serialised with ``json.dumps`` and hashed. That hash
    is stored as the first line of the output file, ahead of the ``ip`` header
    row, so a later run can detect an unchanged feed and skip the rewrite.

    Args:
        gz_path: Path to the gzipped JSON feed. Each entry is read with
            ``entry.get('indicator')``, so entries are expected to be dicts.
        csv_path: Path of the CSV file to create or replace.
        consolidated_ips: Container of IPs to exclude (membership is tested
            with ``in``).

    Returns:
        True if the CSV was (re)written; False if the feed hash matches the one
        already stored in the CSV, or if no IPs survive filtering.
    """
    with gzip.open(gz_path, 'rt', encoding='utf-8') as f_in:
        data = json.load(f_in)

    current_hash = get_hash_of_file(json.dumps(data).encode('utf-8'))

    # The hash written by the previous run lives on the first line of the CSV.
    if os.path.exists(csv_path):
        with open(csv_path, 'r') as f:
            existing_hash = f.readline().strip()
        if existing_hash == current_hash:
            logging.info("No changes in data. Existing data is up-to-date.")
            return False

    unique_ips = set()
    for entry in data:
        ip = entry.get('indicator')
        if ip and not is_non_public_ip(ip) and ip not in consolidated_ips:
            unique_ips.add(ip)

    if not unique_ips:
        logging.info("No new IPs to write after filtering.")
        return False

    # Write the hash line and the CSV body in a single pass. Prepending the
    # hash afterwards via an 'r+' rewrite without truncation leaves stale
    # bytes at the end of the file whenever the re-read content (with "\r\n"
    # collapsed to "\n") is shorter than what was first written.
    with open(csv_path, 'w', newline='') as csvfile:
        csvfile.write(current_hash + '\n')
        writer = csv.writer(csvfile, lineterminator='\n')
        writer.writerow(['ip'])
        # Sorted so that identical inputs always produce an identical file.
        writer.writerows([ip] for ip in sorted(unique_ips))

    logging.info(f"Processed JSON data and written to CSV at {csv_path}")
    return True
def download_latest_file(url, dl_dir, ext_dir, consolidated_ips, timeout=60):
    """
    Download the feed archive, convert it to CSV, and remove the archive.

    Args:
        url: Feed URL to fetch with an HTTP GET.
        dl_dir: Directory where the downloaded archive is stored temporarily
            (created if missing).
        ext_dir: Directory that receives ``malwarepatrol_feed.csv`` (created if
            missing).
        consolidated_ips: Container of IPs to exclude; passed through to
            ``process_json_to_csv``.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests.get`` can block indefinitely on a stalled
            connection.

    Raises:
        Exception: Any error raised while downloading or processing is logged
            and re-raised.
    """
    os.makedirs(dl_dir, exist_ok=True)
    os.makedirs(ext_dir, exist_ok=True)
    gz_path = os.path.join(dl_dir, 'downloaded_file.gz')
    csv_path = os.path.join(ext_dir, 'malwarepatrol_feed.csv')

    logging.info(f"Attempting to download file from {url}")
    try:
        response = requests.get(url, timeout=timeout)
        if response.status_code != 200:
            logging.error(f"Failed to download: {url}, Status Code: {response.status_code}")
            return

        with open(gz_path, 'wb') as f:
            f.write(response.content)
        logging.info(f"Downloaded file to {gz_path}")

        if not process_json_to_csv(gz_path, csv_path, consolidated_ips):
            logging.info("No need to update the CSV.")
    except Exception as e:
        logging.error(f"Error downloading or processing file: {e}")
        raise
    finally:
        # Always discard the temporary archive, including when processing
        # fails, so a broken download is never left behind on disk.
        try:
            os.remove(gz_path)
        except FileNotFoundError:
            pass
# Root logger setup: INFO-and-above records are appended to
# /var/log/local_feeds.log, each prefixed with timestamp, level and a script label.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - malwarepatrol_feed_fetcher.py: %(message)s',
    level=logging.INFO,
    filename='/var/log/local_feeds.log',
)
# Script entry point: refresh the exclusion list, then fetch and filter the feed.
if __name__ == "__main__":
    feed_url = "https://lists.malwarepatrol.net/cgi/getfile?receipt=f1719116834&product=48&list=risk_ips_v2"
    staging_dir = "/var/www/MISP/app/files/feeds/Malwarepatrol/download"
    output_dir = "/var/www/MISP/app/files/feeds/Malwarepatrol"

    logging.info("Updating consolidated IPs from zero_noise_ips.py...")
    known_ips = update_consolidated_ips()
    if not known_ips:
        # The update returned nothing usable; fall back to the JSON file on disk.
        with open('consolidated_ips.json', 'r') as ips_file:
            known_ips = set(json.load(ips_file))
    logging.info("Consolidated IPs have been updated.")

    download_latest_file(feed_url, staging_dir, output_dir, known_ips)