Skip to content

Commit

Permalink
Merge pull request #95 from ossf/scovetta/assertion-fixes1
Browse files Browse the repository at this point in the history
A few fixes for the assertion generator.
  • Loading branch information
Cyber-JiuJiteria authored Jan 2, 2023
2 parents 7b58899 + 2c7f0dd commit f9aced5
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 29 deletions.
54 changes: 31 additions & 23 deletions omega/oaf/omega/analyze.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import shlex
import subprocess # nosec: B404
import tempfile
import uuid
from datetime import datetime, timedelta

from dotenv import dotenv_values
Expand All @@ -23,7 +22,7 @@ class AnalysisRunner:
Executes analysis and creates assertions.
"""

def __init__(self, package_url: str, docker_container: str, repository: str, signer: str):
def __init__(self, package_url: str, docker_container: str, repository: str, signer: str, work_directory: str | None):
"""Initialize a new Analysis Runner."""
required_commands = [
["python", "-V"],
Expand All @@ -49,20 +48,27 @@ def __init__(self, package_url: str, docker_container: str, repository: str, sig
self.repository = repository
self.signer = signer

_uuid = str(uuid.uuid4())
self.work_directory = os.path.join(tempfile.gettempdir(), f"omega-{_uuid}") # ADD UUID

self.work_directory = tempfile.TemporaryDirectory( # pylint: disable=consider-using-with
prefix="omega-", ignore_cleanup_errors=True
)
logging.debug("Output directory: %s", self.work_directory.name)
# Set up the work directory (default: temporary, or provided by the user)
if work_directory:
self.work_directory = work_directory
self.work_directory_name = self.work_directory
else:
self.work_directory = tempfile.TemporaryDirectory( # pylint: disable=consider-using-with
prefix="omega-", ignore_cleanup_errors=True
)
self.work_directory_name = self.work_directory.name
logging.debug("Output (work) directory: %s", self.work_directory_name)

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, traceback):
"""Cleans up after ourselves."""
logging.warning("We did not clean up the directory: %s", self.work_directory)
"""Clean up after ourselves."""
if isinstance(self.work_directory, tempfile.TemporaryDirectory):
try:
self.work_directory.cleanup()
except Exception: # pylint: disable=broad-except
logging.warning("We were unable to clean up the directory: %s", self.work_directory_name)

def execute_docker_container(self):
"""Runs the Omega docker container with specific arguments."""
Expand All @@ -73,7 +79,7 @@ def execute_docker_container(self):
"--rm",
"-t",
"-v",
f"{self.work_directory.name}:/opt/export",
f"{self.work_directory_name}:/opt/export",
"--env-file",
".env",
self.docker_container,
Expand All @@ -82,24 +88,24 @@ def execute_docker_container(self):

# Write the command to a file so we can capture it later
self.docker_cmdline = shlex.join(cmd)
with open(f"{self.work_directory.name}/top-execute-cmd.txt", "w", encoding="utf-8") as f:
with open(f"{self.work_directory_name}/top-execute-cmd.txt", "w", encoding="utf-8") as f:
f.write(self.docker_cmdline)

logging.debug("Running command: %s", cmd)
res = subprocess.Popen( # nosec B603
with subprocess.Popen( # nosec B603
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
encoding="utf-8",
universal_newlines=True
)
for line in iter(res.stdout.readline, ""):
logging.debug(line.rstrip())
) as res:
for line in iter(res.stdout.readline, ""):
logging.debug(line.rstrip())

res.stdout.close()
res.stdout.close()

if res.wait() != 0:
raise RuntimeError(f"Error running docker container: {res.stderr}")
if res.wait() != 0:
raise RuntimeError(f"Error running docker container: {res.stderr}")

def _execute_assertion_noexcept(self, **kwargs):
try:
Expand All @@ -119,7 +125,8 @@ def _execute_assertion(self, **kwargs):
kwargs["expiration"] = datetime.strftime(
datetime.now() + timedelta(days=2 * 365), "%Y-%m-%dT%H:%M:%S.%fZ"
)
cmd.append(f"--expiration={kwargs['expiration']}")
else:
cmd.append(f"--expiration={kwargs['expiration']}")

if kwargs.get('signer'):
cmd.append(f"--signer={kwargs['signer']}")
Expand All @@ -146,7 +153,7 @@ def _execute_assertion(self, **kwargs):

def find_output_file(self, filename: str) -> str:
"""Finds a file in the output directory."""
for root, _, files in os.walk(self.work_directory.name):
for root, _, files in os.walk(self.work_directory_name):
if filename in files:
return os.path.join(root, filename)
return None
Expand Down Expand Up @@ -235,6 +242,7 @@ def execute_assertions(self):
parser.add_argument(
"--toolchain-container", required=False, default="openssf/omega-toolshed:latest"
)
parser.add_argument('--work-directory', required=False, help='Use a specific working directory instead of a temporary one.')
parser.add_argument(
"--repository", required=True
)
Expand All @@ -244,6 +252,6 @@ def execute_assertions(self):
args = parser.parse_args()

logging.info("Starting analysis runner")
runner = AnalysisRunner(args.package_url, args.toolchain_container, args.repository, args.signer)
runner = AnalysisRunner(args.package_url, args.toolchain_container, args.repository, args.signer, args.work_directory)
runner.execute_docker_container()
runner.execute_assertions()
18 changes: 12 additions & 6 deletions omega/oaf/omega/assertion/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,22 @@ def find_repository(package_url: PackageURL | str) -> str | None:
raise EnvironmentError("Invalid PackageURL provided.")

if package_url.type == "github":
return purl2url(package_url)
try:
return purl2url(str(package_url))
except Exception:
logging.warning("Unable to parse PackageURL to GitHub repository: %s", str(package_url))

if not is_command_available(["oss-find-source"]):
raise EnvironmentError("oss-find-source is not available.")

cmd = ["oss-find-source", "-S", str(package_url)]
res = subprocess.run(cmd, check=False, capture_output=True, encoding="utf-8") # nosec B603
if res.returncode == 0:
repository = res.stdout.strip()
return repository or None
try:
cmd = ["oss-find-source", "-S", str(package_url)]
res = subprocess.run(cmd, check=False, capture_output=True, encoding="utf-8") # nosec B603
if res.returncode == 0:
repository = res.stdout.strip()
return repository or None
except Exception:
logging.warning("Failed to find repository for %s", str(package_url))

return None

Expand Down

0 comments on commit f9aced5

Please sign in to comment.