-
Notifications
You must be signed in to change notification settings - Fork 2
/
viper.py
87 lines (72 loc) · 2.41 KB
/
viper.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import requests
import json
import jsonlines
import pandas
import os
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
# Host name used to build every API URL; override it with the API_Host
# environment variable.
API_Host = os.getenv("API_Host", "api.kennasecurity.com")

# API key taken from the VI_Plus_API_Key environment variable (None when the
# variable is unset) and passed to the API in the X-Risk-Token header.
vi_plus_api_key = os.getenv("VI_Plus_API_Key")
headers = {"X-Risk-Token": vi_plus_api_key}

# Output files: the plain JSON list of CVE identifiers, and the file that
# import_cves() writes through jsonlines (one record per line).
cve_list_output_json_file = "cve_list.json"
output_jsonl_file = "data/vidata.json"
def requests_retry_session(
    retries=3,
    backoff_factor=0.3,
    status_forcelist=(429, 500, 502, 504),
    session=None,
):
    """Return a requests session whose HTTP(S) adapters apply a retry policy.

    Args:
        retries: Limit used for the total, read and connect retry counts.
        backoff_factor: Back-off factor handed to ``Retry``.
        status_forcelist: HTTP status codes handed to ``Retry`` as the set
            of responses that force a retry.
        session: Session to configure. A new ``requests.Session`` is created
            when this is falsy.

    Returns:
        The configured session (the one passed in, when one was given).
    """
    if not session:
        session = requests.Session()

    retry_policy = Retry(
        total=retries,
        read=retries,
        connect=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
    )
    retrying_adapter = HTTPAdapter(max_retries=retry_policy)

    # The same adapter instance serves both URL schemes.
    for scheme in ("http://", "https://"):
        session.mount(scheme, retrying_adapter)
    return session
def chunks(lst, n):
    """Lazily yield consecutive slices of ``lst`` holding at most ``n`` items.

    The final slice is shorter when ``len(lst)`` is not a multiple of ``n``.
    """
    offsets = range(0, len(lst), n)
    yield from (lst[start : start + n] for start in offsets)
def import_cves():
    """Download CVE identifiers and their definitions and store them on disk.

    Steps:
      1. GET ``/vulnerability_definitions/cve_identifiers`` on ``API_Host``.
         When the ``Updated_Since`` environment variable is set (non-empty),
         it is sent as the ``updated_since`` query parameter.
      2. Write the ``cve_identifiers`` list from that response to
         ``cve_list_output_json_file`` as indented JSON.
      3. Request ``/vulnerability_definitions/`` in pages of 100 identifiers
         (comma-joined in the ``cves`` query parameter) and write every value
         of each response mapping as one line of ``output_jsonl_file``.

    Returns:
        None. The results are the two files written to disk.

    Raises:
        requests.HTTPError: If either endpoint answers with a 4xx/5xx status.
            Without this check an error payload would be parsed like data
            and its values written into the output file.
        KeyError: If the first response has no ``cve_identifiers`` key.
    """
    params = {}
    updated_since = os.environ.get("Updated_Since")
    if updated_since:
        params["updated_since"] = updated_since

    import_cves_url = (
        "https://" + API_Host + "/vulnerability_definitions/cve_identifiers"
    )
    request_cves_url = "https://" + API_Host + "/vulnerability_definitions/"
    page_size = 100

    # One session for the whole run: connections are reused across pages and
    # the session is closed when the block exits.
    with requests_retry_session() as session:
        r = session.get(import_cves_url, params=params, headers=headers)
        r.raise_for_status()
        cve_ids = r.json()["cve_identifiers"]
        print(f"Pulling {len(cve_ids)} CVEs")

        with open(cve_list_output_json_file, "w") as write_file:
            json.dump(cve_ids, write_file, indent=4, sort_keys=True)

        # Make sure the target directory exists before opening the output
        # file, so a missing directory does not abort the run after the
        # first API call has already been made.
        output_dir = os.path.dirname(output_jsonl_file)
        if output_dir:
            os.makedirs(output_dir, exist_ok=True)

        with jsonlines.open(output_jsonl_file, mode="w") as writer:
            for cve_ids_page in chunks(cve_ids, page_size):
                r = session.get(
                    request_cves_url,
                    params={"cves": ",".join(cve_ids_page)},
                    headers=headers,
                )
                r.raise_for_status()
                # Only the values are stored; the mapping is presumably keyed
                # by CVE id (confirm against the API response schema).
                for cve_dict in r.json().values():
                    writer.write(cve_dict)
if __name__ == "__main__":
    # Fetch first so the JSON Lines file exists before it is read.
    import_cves()
    # Read from the path import_cves() writes to, rather than repeating the
    # literal, so the reader and the writer cannot drift apart.
    df = pandas.read_json(output_jsonl_file, lines=True)