Skip to content

Commit

Permalink
Cyberwatch-cli added --verify-ssl flag
Browse files Browse the repository at this point in the history
  • Loading branch information
aaTarek authored and hezanathos committed Jul 27, 2023
1 parent a2a1df7 commit 41e61e7
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 45 deletions.
10 changes: 5 additions & 5 deletions cli/bin/airgap/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,21 @@ def help():
print("{: >30} \t {}".format("download-compliance-scripts", "Download airgap compliance scripts"))
print("\n")

def manager(arguments):
def manager(arguments, verify_ssl=False):
if not arguments or arguments[0] == "help":
help()

elif arguments[0] == "download-scripts":
download_scripts.manager(arguments[1:])
download_scripts.manager(arguments[1:], verify_ssl)

elif arguments[0] == "download-compliance-scripts":
download_compliance_scripts.manager(arguments[1:])
download_compliance_scripts.manager(arguments[1:], verify_ssl)

elif arguments[0] == "upload":
upload_scripts.manager(arguments[1:])
upload_scripts.manager(arguments[1:], verify_ssl)

elif arguments[0] == "upload-compliance":
upload_compliance_scripts.manager(arguments[1:])
upload_compliance_scripts.manager(arguments[1:], verify_ssl)

else:
print("ERROR : '" + str(arguments[0]) + "' is not a valid subcommand\n---", file=sys.stderr)
Expand Down
14 changes: 8 additions & 6 deletions cli/bin/airgap/download_compliance_scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@ def help():
print("{: >20} \t {: >30} \t{}".format("--dest-dir", "cyberwatch-airgap-compliance", "Destination folder where to put the downloaded scripts"))
print("\n")

def retrieve_compliance_scripts(os_key, repositories):
def retrieve_compliance_scripts(os_key, repositories, verify_ssl=False):
apiResponse = Cyberwatch_Pyhelper().request(
method="GET",
endpoint="/api/v2/compliances/scripts",
body_params={
"os" : str(os_key),
"repositories" : repositories
}
},
verify_ssl=verify_ssl
)
return next(apiResponse).json()

def download_compliance_scripts(os_key, repositories, destination_folder):
def download_compliance_scripts(os_key, repositories, destination_folder, verify_ssl=False):
script_dir = join(abspath(destination_folder), "scripts")
upload_dir = join(abspath(destination_folder), "uploads")
if os.path.exists(script_dir):
Expand All @@ -38,7 +39,7 @@ def download_compliance_scripts(os_key, repositories, destination_folder):
os.makedirs(upload_dir, exist_ok=True)

print("Downloading compliance scripts..")
compliance_scripts_metadata = retrieve_compliance_scripts(os_key, repositories)
compliance_scripts_metadata = retrieve_compliance_scripts(os_key, repositories, verify_ssl)

if 'error' in compliance_scripts_metadata:
print(compliance_scripts_metadata["error"]["message"])
Expand All @@ -51,7 +52,8 @@ def download_compliance_scripts(os_key, repositories, destination_folder):
print("\033[A\033[A\nDownload complete ! Scripts are located in '" + str(destination_folder) + "'")


def manager(arguments):
def manager(arguments, verify_ssl=False):

parser = argparse.ArgumentParser()
parser.add_argument("--os")
parser.add_argument("--repositories", "--groups", nargs='*')
Expand All @@ -71,4 +73,4 @@ def manager(arguments):
print("Use the 'help' subcommand to view available actions")

else:
download_compliance_scripts(options.os, options.repositories, options.dest_dir)
download_compliance_scripts(options.os, options.repositories, options.dest_dir, verify_ssl)
23 changes: 13 additions & 10 deletions cli/bin/airgap/download_scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,25 @@ def help():
done
"""

def retrieve_scripts(scriptID=""):
def retrieve_scripts(scriptID, verify_ssl=False):
# If the script ID is an empty string, this will fetch every scripts available
apiResponse = Cyberwatch_Pyhelper().request(
method="GET",
endpoint="/api/v2/cbw_scans/scripts/" + str(scriptID)
endpoint="/api/v2/cbw_scans/scripts/" + str(scriptID),
verify_ssl=verify_ssl
)
return next(apiResponse).json()

def download_scripts(destination_folder, with_attachment=False):
def download_scripts(destination_folder, with_attachment=False, verify_ssl=False):
script_dir = join(abspath(destination_folder), "scripts")
upload_dir = join(abspath(destination_folder), "uploads")
os.makedirs(script_dir, exist_ok=True)
os.makedirs(upload_dir, exist_ok=True)

scripts_metadata = retrieve_scripts()
scripts_metadata = retrieve_scripts("", verify_ssl)
print("Downloading scripts..")
# Downloading every single script
saved_scripts = [download_individual_script(script["id"], script_dir, with_attachment) for script in scripts_metadata]
saved_scripts = [download_individual_script(script["id"], script_dir, with_attachment, verify_ssl) for script in scripts_metadata]
# Grouping script by OS
grouped_scripts = groupby(sorted(saved_scripts), lambda x: x[0])

Expand All @@ -70,8 +72,8 @@ def append_extension(target_os):
if target_os == "Windows": return ".ps1"
return ""

def download_individual_script(scriptID, base_dir, with_attachment=False):
script = retrieve_scripts(str(scriptID))
def download_individual_script(scriptID, base_dir, with_attachment=False, verify_ssl=False):
script = retrieve_scripts(str(scriptID), verify_ssl)
if script is None or script["type"] is None: return None, None

target_os, script_name = script["type"].split("::")[1:]
Expand All @@ -85,14 +87,15 @@ def download_individual_script(scriptID, base_dir, with_attachment=False):

# Download the attachment if it exists
if script["attachment"] and with_attachment:
attachment = requests.get(script["attachment"], allow_redirects=True, verify=False)
attachment = requests.get(script["attachment"], allow_redirects=True, verify=verify_ssl)
with open(join(script_dir, basename(script["attachment"])), "wb") as attachmentFile:
attachmentFile.write(attachment.content)

print("\033[A\033[A\nDownloaded script : " + str(script_name) + " " * 40)
return target_os, script_filename

def manager(arguments):
def manager(arguments, verify_ssl=False):

parser = argparse.ArgumentParser()
parser.add_argument("--no-attachment", action="store_false")
parser.add_argument("--dest-dir", default="cyberwatch-airgap")
Expand All @@ -102,4 +105,4 @@ def manager(arguments):
if arguments and arguments[0] == "help":
help()
else:
download_scripts(options.dest_dir, options.no_attachment)
download_scripts(options.dest_dir, options.no_attachment, verify_ssl)
12 changes: 7 additions & 5 deletions cli/bin/airgap/upload_compliance_scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ def help():
print("---")
print("If there is no file specified, this script will look into the `cyberwatch-airgap-compliance/uploads/` directory if it exists to find results scripts to upload.")

def upload_result_file(result_file):
def upload_result_file(result_file, verify_ssl=False):
print("\n[*] Uploading : " + str(result_file))

try: # Catch error like 'file doesn't exist'
Expand All @@ -31,7 +31,8 @@ def upload_result_file(result_file):
endpoint="/api/v2/cbw_scans/scripts",
body_params={
"output" : file_content
}
},
verify_ssl=verify_ssl
)
result = next(apiResponse).json()

Expand All @@ -45,7 +46,8 @@ def upload_result_file(result_file):

return next(apiResponse).json()

def manager(arguments):
def manager(arguments, verify_ssl=False):

parser = argparse.ArgumentParser()
parser.add_argument("files", nargs="*")
options = parser.parse_args(arguments)
Expand All @@ -59,8 +61,8 @@ def manager(arguments):
else:
print("No file has been specified, searching through the `cyberwatch-airgap-compliance/uploads/` directory.\n--")
for file in os.listdir("cyberwatch-airgap-compliance/uploads"):
upload_result_file(os.path.join("cyberwatch-airgap-compliance/uploads", file))
upload_result_file(os.path.join("cyberwatch-airgap-compliance/uploads", file), verify_ssl)
else:
for file in options.files:
upload_result_file(file)
upload_result_file(file, verify_ssl)

13 changes: 7 additions & 6 deletions cli/bin/airgap/upload_scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from os.path import abspath, basename, dirname, join
from itertools import groupby
import argparse

import chardet
import sys
import os
Expand All @@ -14,7 +13,7 @@ def help():
print("---")
print("If there is no file specified, this script will look into the `cyberwatch-airgap/uploads/` directory if it exists to find results scripts to upload.")

def upload_result_file(result_file):
def upload_result_file(result_file, verify_ssl=False):
print("\n[*] Uploading : " + str(result_file))

try: # Catch error like 'file doesn't exist'
Expand All @@ -32,7 +31,8 @@ def upload_result_file(result_file):
endpoint="/api/v2/cbw_scans/scripts",
body_params={
"output" : file_content
}
},
verify_ssl=verify_ssl
)
result = next(apiResponse).json()

Expand All @@ -46,7 +46,8 @@ def upload_result_file(result_file):

return next(apiResponse).json()

def manager(arguments):
def manager(arguments, verify_ssl=False):

parser = argparse.ArgumentParser()
parser.add_argument("files", nargs="*")
options = parser.parse_args(arguments)
Expand All @@ -60,8 +61,8 @@ def manager(arguments):
else:
print("No file has been specified, searching through the `cyberwatch-airgap/uploads/` directory.\n--")
for file in os.listdir("cyberwatch-airgap/uploads"):
upload_result_file(os.path.join("cyberwatch-airgap/uploads", file))
upload_result_file(os.path.join("cyberwatch-airgap/uploads", file), verify_ssl)
else:
for file in options.files:
upload_result_file(file)
upload_result_file(file, verify_ssl)

10 changes: 6 additions & 4 deletions cli/bin/os.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,26 @@ def help():
print("{: >10} \t {}".format("list", "List all operating systems"))
print("\n")

def retrieve_os():
def retrieve_os(verify_ssl=False):
apiResponse = Cyberwatch_Pyhelper().request(
method="GET",
endpoint="/api/v3/os"
endpoint="/api/v3/os",
verify_ssl=verify_ssl
)
os = []
for page in apiResponse:
os = os + page.json()
return os

def manager(arguments):
def manager(arguments, verify_ssl=False):

if not arguments or arguments[0] == "help":
help()

elif arguments[0] == "list":
print("{:<32} {:<27} {:<13}".format("KEY", "SHORT NAME", "ARCH"))
print("{:<32} {:<27} {:<13}".format("---", "---", "---"))
for single_os in retrieve_os():
for single_os in retrieve_os(verify_ssl):
print("{:<32} {:<27} {:<13}".format(single_os["key"], single_os["short_name"], single_os["arch"] if single_os["arch"] else "-"))

else:
Expand Down
27 changes: 20 additions & 7 deletions cli/cyberwatch-cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,39 @@
from cyberwatch_api import Cyberwatch_Pyhelper
import sys
from bin import os, airgap
import requests
from urllib3.exceptions import InsecureRequestWarning

# Disable certificate check warnings
requests.packages.urllib3.disable_warnings(category=InsecureRequestWarning)

def help():
print("Usage : " + str(sys.argv[0]) + " [COMMAND] [ARGS]")
print("Usage : " + str(sys.argv[0]) + " [--verify_ssl] [COMMAND] [ARGS]")
print("---")
print("Cli to interact with Cyberwatch API.\n")
print("{: >10} \t {}".format("COMMAND", "DESCRIPTION"))
print("{: >10} \t {}".format("---", "---"))
print("{: >10} \t {}".format("os", "Manage cyberwatch operating systems"))
print("{: >10} \t {}".format("airgap", "Interact with the airgap interface"))
print("{: >15} \t {}".format("--verify-ssl", "Using this options, SSL verification will be done"))
print("\n")
print("{: >15} \t {}".format("COMMAND", "DESCRIPTION"))
print("{: >15} \t {}".format("---", "---"))
print("{: >15} \t {}".format("os", "Manage cyberwatch operating systems"))
print("{: >15} \t {}".format("airgap", "Interact with the airgap interface"))
print("\n")

arguments = sys.argv[1:]

try:
# Check if we should verify the SSL Certificate
VERIFY_SSL = False
if "--verify-ssl" in arguments:
VERIFY_SSL = True
arguments.remove("--verify-ssl")

if not arguments or arguments[0] == "help":
help()
elif arguments[0] == "os":
os.manager(arguments[1:])
os.manager(arguments[1:], VERIFY_SSL)
elif arguments[0] == "airgap":
airgap.manager(arguments[1:])
airgap.manager(arguments[1:], VERIFY_SSL)
else:
print("ERROR : '" + str(arguments[0]) + "' is not a valid command\n---", file=sys.stderr)
help()
Expand Down
5 changes: 3 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
from setuptools import setup, find_packages

setup(name='cyberwatch_api',
version='0.3.1',
version='0.3.2',
description='Python Api client for the Cyberwatch software',
long_description=open('README.md').read().strip(),
long_description_content_type="text/markdown",
author='CyberWatch SAS',
packages=find_packages(),
py_modules=['cyberwatch_api'],
license='MIT',
install_requires=['requests>=2.20.1']
install_requires=['requests>=2.20.1'],
scripts=['cli/cyberwatch-cli']
)

0 comments on commit 41e61e7

Please sign in to comment.