Skip to content

Commit

Permalink
Revised ASM Sync/SQS ASM Sync code
Browse files Browse the repository at this point in the history
  • Loading branch information
aloftus23 committed Dec 17, 2024
1 parent 38976e5 commit 832311e
Show file tree
Hide file tree
Showing 5 changed files with 430 additions and 40 deletions.
113 changes: 97 additions & 16 deletions src/pe_asm/asm_sync.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""A tool for gathering pe asm data.
Usage:
pe-asm-sync METHOD [--log-level=LEVEL] [--staging]
pe-asm-sync METHOD [--log-level=LEVEL] [--staging] [--org=ORG]
Options:
-h --help Show this message.
Expand All @@ -10,6 +10,10 @@
-l --log-level=LEVEL If specified, then the log level will be set to
the specified value. Valid values are "debug", "info",
"warning", "error", and "critical". [default: info]
-o --org=ORG The cyhy_db_name of the single organization to collect data for.
This option is only used for the SQS version of the ASM Sync.
Org name must match the ID in the cyhy-db. E.g. DHS,DHS_ICE,DOC.
[default: all]
-s --staging Run on the staging database. Otherwise will run on a local copy.
"""

Expand Down Expand Up @@ -38,6 +42,13 @@
identify_sub_changes,
pe_db_connect,
pe_db_staging_connect,
# SQS version imports
sqs_query_org,
sqs_identify_cidr_changes,
sqs_identify_ip_changes,
sqs_identify_sub_changes,
sqs_identify_ip_sub_changes,
sqs_identified_sub_domains,
)
from .helpers.enumerate_subs_from_root import get_subdomains
from .helpers.fill_cidrs_from_cyhy_assets import fill_cidrs
Expand All @@ -60,14 +71,18 @@
LOGGER = logging.getLogger(__name__)


def run_asm_sync(staging, method):
def run_asm_sync(staging, method, org):
"""Collect and sync ASM data."""
if method == "asm":
# Non-SQS version of ASM Sync
LOGGER.info("--- ASM Sync Process Starting ---")
asm_start = time.time()

# --- Local Portion of ASM Sync ---
# *** This portion of the ASM Sync process needs to be run locally
# on a Macbook because the Accessor is not allowed to directly
# connect to the CyHy environment. A dedicated python script is
# available for this "local step" of the ASM Sync
# *** This portion of the ASM Sync process needs to be run locally
# on a Macbook because the Accessor is not allowed to directly
# connect to the CyHy environment. A dedicated python script is
# available for this "local step" of the ASM Sync

# Fetch assets from the CyHy database and store them in the PE database
# LOGGER.info("Retrieving assets from the CyHy database...")
Expand All @@ -76,13 +91,13 @@ def run_asm_sync(staging, method):


# --- Non-Local Portion of ASM Sync ---
# *** This portion of the ASM Sync process can run remotely on the
# Accessor because it does not require connecting to the CyHy environment

# *** This portion of the ASM Sync process can run remotely on the
# Accessor because it does not require connecting to the CyHy environment
print("*** Running ATC-Framework version of ASM Sync ***")

# Fill the PE CIDRs table using the CyHy assets
LOGGER.info("Filling the CIDRs table using the retrieved CyHy assets...")
fill_cidrs("all_orgs", staging)
fill_cidrs(staging, "all_orgs")
LOGGER.info("Finished filling the CIDRs table using the retrieved CyHy assets")
# Identify which CIDRs are current
LOGGER.info("Identifying CIDR changes...")
Expand Down Expand Up @@ -128,6 +143,77 @@ def run_asm_sync(staging, method):
dedupe(staging) # Takes about ~12hrs
LOGGER.info("Finished running Shodan dedupe")

asm_end = time.time()
LOGGER.info(f"Execution time for ASM Sync: {str(timedelta(seconds=(asm_end - asm_start)))} (H:M:S)")
LOGGER.info("--- ASM Sync Process Complete ---")

if method == "asm-sqs":
# SQS version of the ASM Sync (single org)
LOGGER.info(f"--- SQS ASM Sync Process Starting for {org} ---")
sqs_asm_start = time.time()

# --- Local Portion of ASM Sync ---
# *** Warning: The local portion of the ASM Sync process needs
# to be run locally on a Macbook before the following code can
# run. A dedicated python script is available for this
# "local step" of the ASM Sync

# --- Non-Local Portion of ASM Sync ---
# *** This portion of the ASM Sync process can run remotely on the
# Accessor because it does not require connecting to the CyHy environment

# Retrieve additional info for the specified org
org_df = sqs_query_org(staging, org)
org_uid = org_df["organizations_uid"][0]

# Fill the cidrs table with new data from the cyhy_db_assets
LOGGER.info("Filling the CIDRs table using the retrieved CyHy assets...")
fill_cidrs(staging, org_df)
LOGGER.info("Finished filling the CIDRs table using the retrieved CyHy assets")

# Identify which CIDRs are current
LOGGER.info("Identifying CIDR changes...")
sqs_identify_cidr_changes(staging, org_uid)
LOGGER.info("Finished identifying CIDR changes")

# Enumerate subdomains from roots
LOGGER.info("Enumerating sub-domains from root domains...")
get_subdomains(staging, org_df)
LOGGER.info("Finished enumerating sub-domains from root domains")

# Enumerate subdomains from IPs
LOGGER.info("Linking sub-domains and ips using ips...")
connect_subs_from_ips(staging, org_df)
LOGGER.info("Finished linking sub-domains and ips using ips")

# Enumerate IPs from subdomains
LOGGER.info("Linking sub-domains and ips using sub-domains...")
connect_ips_from_subs(staging, org_df)
LOGGER.info("Finished linking sub-domains and ips using sub-domains")

# Identify which IPs, sub-domains, and connections are current
LOGGER.info("Identify IP changes...")
sqs_identify_ip_changes(staging, org_uid)
LOGGER.info("Finished identifying IP changes")
LOGGER.info("Identifying sub-domain changes...")
sqs_identify_sub_changes(staging, org_uid)
LOGGER.info("Finished identifying sub-domain changes")
LOGGER.info("Identifying IP sub-domain link changes...")
sqs_identify_ip_sub_changes(staging, org_uid)
LOGGER.info("Finished identifying IP sub-domain link changes")
LOGGER.info("Updating identified sub-domains...")
sqs_identified_sub_domains(staging, org_uid)
LOGGER.info("Finished updating identified sub-domains")

# Run shodan dedupe
LOGGER.info("Running Shodan dedupe...")
dedupe(staging, org_df)
LOGGER.info("Finished running Shodan dedupe")

sqs_asm_end = time.time()
LOGGER.info(f"SQS ASM Sync execution time for {org}: {str(timedelta(seconds=(sqs_asm_end - sqs_asm_start)))} (H:M:S)")
LOGGER.info(f"--- SQS ASM Sync Process Complete for {org} ---")

elif method == "scorecard":
LOGGER.info("STARTING")
get_cyhy_port_scans(staging)
Expand Down Expand Up @@ -190,12 +276,7 @@ def main():
staging = False

# Run ASM Sync
LOGGER.info("--- ASM Sync Process Starting ---")
start_time = time.time()
run_asm_sync(staging, validated_args["METHOD"])
end_time = time.time()
LOGGER.info(f"Execution time for ASM Sync: {str(timedelta(seconds=(end_time - start_time)))} (H:M:S)")
LOGGER.info("--- ASM Sync Process Complete ---")
run_asm_sync(staging, validated_args["METHOD"], validated_args["--org"])

# Stop logging and clean up
logging.shutdown()
Loading

0 comments on commit 832311e

Please sign in to comment.