From 82d4530d2cff86c23568b86173cbad747739e758 Mon Sep 17 00:00:00 2001 From: Kurtis Seebaldt Date: Mon, 19 Sep 2022 14:32:26 -0500 Subject: [PATCH] Add path secrets and fix permissions --- sample_pipelines/driver.py | 11 +++-- sample_pipelines/traffic/raw.py | 8 ++-- sample_pipelines/traffic/stage.py | 6 +-- sample_pipelines/utils/s3.py | 7 ---- tests/sample_pipelines/driver_test.py | 36 +++++++++++++++++ tests/sample_pipelines/traffic/raw_test.py | 33 ++++++++------- tests/sample_pipelines/traffic/stage_test.py | 19 +++++---- tf/modules/glue/main.tf | 40 ++++++++++++++++++- tf/modules/glue/outputs.tf | 8 ++++ .../glue/policies/crawler_policy.json.tpl | 4 +- tf/modules/glue/policies/policy.json.tpl | 12 +++++- tf/modules/pipelines/austin_traffic/main.tf | 8 ++-- tf/modules/pipelines/austin_traffic/vars.tf | 8 ++++ tf/modules/pipelines/main.tf | 2 + 14 files changed, 157 insertions(+), 45 deletions(-) delete mode 100644 sample_pipelines/utils/s3.py create mode 100644 tests/sample_pipelines/driver_test.py diff --git a/sample_pipelines/driver.py b/sample_pipelines/driver.py index d968cdf..086c576 100644 --- a/sample_pipelines/driver.py +++ b/sample_pipelines/driver.py @@ -29,13 +29,18 @@ def shell_job(): def _get_job_function(): - args = getResolvedOptions(sys.argv, ["job_module", "job_function", "secret_id"]) + args = getResolvedOptions( + sys.argv, ["job_module", "job_function", "path_secret_id", "secrets_id"] + ) job_module = import_module(args["job_module"]) f = getattr(job_module, args["job_function"]) - return partial(f, secrets(args["secret_id"])) + return partial(f, secrets(args["path_secret_id"]), secrets(args["secrets_id"])) def secrets(key: str): client = boto3.client("secretsmanager") - return json.loads(client.get_secret_value(SecretId=key)["SecretString"]) + try: + return json.loads(client.get_secret_value(SecretId=key)["SecretString"]) + except client.exceptions.ResourceNotFoundException: + return {} diff --git a/sample_pipelines/traffic/raw.py b/sample_pipelines/traffic/raw.py index 55761a1..0fa8264 100644 --- a/sample_pipelines/traffic/raw.py +++ b/sample_pipelines/traffic/raw.py @@ -9,12 +9,12 @@ LIMIT = 10000000 -def import_csv(secrets): +def import_csv(paths, secrets): now_timestamp = _now() s3 = boto3.client("s3") where = f"traffic_report_status_date_time <= '{now_timestamp}'" - last_timestamp = _get_last_timestamp(s3, secrets["raw_path"]) + last_timestamp = _get_last_timestamp(s3, paths["raw_path"]) if last_timestamp: where = f"{where} and traffic_report_status_date_time > '{last_timestamp}'" @@ -22,7 +22,7 @@ def import_csv(secrets): DATASET_URL, params={"$where": where, "$limit": LIMIT}, stream=True ) - url = f"{secrets['raw_path']}sample/austin_traffic/load_time={now_timestamp}/data.csv.gz" + url = f"{paths['raw_path']}sample/austin_traffic/load_time={now_timestamp}/data.csv.gz" results = response.iter_lines() @@ -36,7 +36,7 @@ def import_csv(secrets): except StopIteration: return None - _write_last_timestamp(s3, secrets["raw_path"], now_timestamp) + _write_last_timestamp(s3, paths["raw_path"], now_timestamp) def _get_last_timestamp(s3, base_url): diff --git a/sample_pipelines/traffic/stage.py b/sample_pipelines/traffic/stage.py index 8e84198..513357e 100644 --- a/sample_pipelines/traffic/stage.py +++ b/sample_pipelines/traffic/stage.py @@ -1,11 +1,11 @@ -def transform_raw(secrets, spark, glueContext): +def transform_raw(paths, secrets, spark, glueContext): df = ( spark.read.format("csv") .option("header", True) .option("inferSchema", True) - .load(f"{secrets['raw_path']}/sample/austin_traffic/") + .load(f"{paths['raw_path']}/sample/austin_traffic/") ) df.write.format("parquet").mode("overwrite").save( - f"{secrets['stage_path']}/sample/austin_traffic/" + f"{paths['stage_path']}/sample/austin_traffic/" ) diff --git a/sample_pipelines/utils/s3.py b/sample_pipelines/utils/s3.py deleted file mode 100644 index 3fa53bd..0000000 --- a/sample_pipelines/utils/s3.py +++ /dev/null @@ -1,7 +0,0 @@ -from typing import Tuple -import re - - -def parse_path(path: str) -> Tuple[str, str]: - parts = re.sub(r"^/", "", path.replace("s3://", "")).split("/", 1) - return parts[0], re.sub(r"/$", "", parts[1]) diff --git a/tests/sample_pipelines/driver_test.py b/tests/sample_pipelines/driver_test.py new file mode 100644 index 0000000..f429c3d --- /dev/null +++ b/tests/sample_pipelines/driver_test.py @@ -0,0 +1,36 @@ +import os + +import boto3 +from moto import mock_secretsmanager +from pytest import fixture + +from sample_pipelines.driver import secrets + + +@fixture(autouse=True) +def mock_secrets(): + os.environ["AWS_DEFAULT_REGION"] = "us-east-1" + + with mock_secretsmanager(): + client = boto3.client("secretsmanager", region_name="us-east-1") + yield client + + +@fixture() +def paths_key(mock_secrets): + key = "my-paths" + mock_secrets.create_secret( + Name=key, + SecretString='{"raw_path": "s3://abc/123"}', + ) + return key + + +def test_returns_parsed_secrets(paths_key): + result = secrets(paths_key) + assert result == {"raw_path": "s3://abc/123"} + + +def test_returns_empty_dict_when_key_is_not_found(): + result = secrets("other") + assert result == {} diff --git a/tests/sample_pipelines/traffic/raw_test.py b/tests/sample_pipelines/traffic/raw_test.py index e5998cc..84a4c74 100644 --- a/tests/sample_pipelines/traffic/raw_test.py +++ b/tests/sample_pipelines/traffic/raw_test.py @@ -27,52 +27,57 @@ def s3_bucket(bucket_name): @fixture -def secrets(bucket_name): +def paths(bucket_name): return {"raw_path": f"s3://{bucket_name}/raw/"} @fixture -def previous_timestamp(bucket_name, secrets): +def secrets(): + return {"some_secret": "abc123"} + + +@fixture +def previous_timestamp(bucket_name, paths): previous_timestamp = "2022-09-06T21:30:46.717814" s3 = boto3.client("s3") - _write_last_timestamp(s3, secrets["raw_path"], previous_timestamp) + _write_last_timestamp(s3, paths["raw_path"], previous_timestamp) return previous_timestamp -def test_queries_all_records(secrets, now, requests_mock): +def test_queries_all_records(paths, secrets, now, requests_mock): requests_mock.get( DATASET_URL, text="a,b,c", ) - import_csv(secrets) + import_csv(paths, secrets) where = f"traffic_report_status_date_time <= '{now.lower()}'" assert where in requests_mock.last_request.qs["$where"] def test_queries_records_since_last_timestamp( - secrets, requests_mock, now, previous_timestamp + paths, secrets, requests_mock, now, previous_timestamp ): requests_mock.get( DATASET_URL, text="a,b,c", ) - import_csv(secrets) + import_csv(paths, secrets) where = f"traffic_report_status_date_time <= '{now.lower()}' and traffic_report_status_date_time > '{previous_timestamp.lower()}'" assert where in requests_mock.last_request.qs["$where"] def test_writes_nothing_if_only_header_returned( - secrets, requests_mock, now, bucket_name + paths, secrets, requests_mock, now, bucket_name ): requests_mock.get( DATASET_URL, text="a,b,c", ) - import_csv(secrets) + import_csv(paths, secrets) s3 = boto3.resource("s3") @@ -81,29 +86,29 @@ def test_writes_nothing_if_only_header_returned( assert keys == [] -def test_writes_csv_file(secrets, requests_mock, now, bucket_name): +def test_writes_csv_file(paths, secrets, requests_mock, now, bucket_name): rows = ["col1,col2\n", "foo,bar\n", "abc,def\n"] requests_mock.get( DATASET_URL, text="".join(rows), ) - import_csv(secrets) + import_csv(paths, secrets) - url = f"{secrets['raw_path']}sample/austin_traffic/load_time={now}/data.csv.gz" + url = f"{paths['raw_path']}sample/austin_traffic/load_time={now}/data.csv.gz" lines = list(smart_open.open(url, transport_params={"client": boto3.client("s3")})) assert lines == rows -def test_writes_last_timestamp(secrets, requests_mock, now, bucket_name): +def test_writes_last_timestamp(paths, secrets, requests_mock, now, bucket_name): requests_mock.get( DATASET_URL, text="col1,col2\nfoo,bar\nabc,def\n", ) - import_csv(secrets) + import_csv(paths, secrets) s3 = boto3.resource("s3") diff --git a/tests/sample_pipelines/traffic/stage_test.py b/tests/sample_pipelines/traffic/stage_test.py index c720362..b081161 100644 --- a/tests/sample_pipelines/traffic/stage_test.py +++ b/tests/sample_pipelines/traffic/stage_test.py @@ -6,17 +6,22 @@ @fixture -def secrets(tmp_path, data_path): +def paths(tmp_path, data_path): return { "raw_path": str(data_path / "raw"), "stage_path": str(tmp_path / "stage"), } -def test_converts_dates(secrets, spark, glueContext): - transform_raw(secrets, spark, glueContext) +@fixture +def secrets(): + return {"some_secret": "abc123"} + + +def test_converts_dates(paths, secrets, spark, glueContext): + transform_raw(paths, secrets, spark, glueContext) df = spark.read.format("parquet").load( - f"{secrets['stage_path']}/sample/austin_traffic/" + f"{paths['stage_path']}/sample/austin_traffic/" ) expected = spark.createDataFrame( @@ -48,10 +53,10 @@ def test_converts_dates(secrets, spark, glueContext): ) -def test_converts_latitude_longitude(secrets, spark, glueContext): - transform_raw(secrets, spark, glueContext) +def test_converts_latitude_longitude(paths, secrets, spark, glueContext): + transform_raw(paths, secrets, spark, glueContext) df = spark.read.format("parquet").load( - f"{secrets['stage_path']}/sample/austin_traffic/" + f"{paths['stage_path']}/sample/austin_traffic/" ) expected = spark.createDataFrame( diff --git a/tf/modules/glue/main.tf b/tf/modules/glue/main.tf index b3e2f6e..b844d61 100644 --- a/tf/modules/glue/main.tf +++ b/tf/modules/glue/main.tf @@ -1,3 +1,24 @@ +locals { + path_secret_value = { + raw_path = "s3://${var.data_bucket}/raw/" + stage_path = "s3://${var.data_bucket}/stage/" + analytics_path = "s3://${var.data_bucket}/analytics/" + } +} + +resource "aws_secretsmanager_secret" "path_secret" { + name = "${var.app_prefix}-glue-paths" +} + +resource "aws_secretsmanager_secret" "secrets" { + name = "${var.app_prefix}-glue-secrets" +} + +resource "aws_secretsmanager_secret_version" "path_secret_version" { + secret_id = aws_secretsmanager_secret.path_secret.id + secret_string = jsonencode(local.path_secret_value) +} + resource "aws_iam_role" "glue_role" { name = "${var.app_prefix}-glue" @@ -11,6 +32,13 @@ resource "aws_iam_role" "glue_role" { "Service": "ec2.amazonaws.com" }, "Effect": "Allow" + }, + { + "Action": "sts:AssumeRole", + "Principal": { + "Service": "glue.amazonaws.com" + }, + "Effect": "Allow" } ] } @@ -44,7 +72,12 @@ EOF resource "aws_iam_policy" "glue_policy" { name = "${var.app_prefix}-glue" description = "Glue Access Policy" - policy = templatefile("${path.module}/policies/policy.json.tpl", { app_prefix = var.app_prefix, role_arn = aws_iam_role.glue_role.arn }) + policy = templatefile("${path.module}/policies/policy.json.tpl", { + app_prefix = var.app_prefix, + role_arn = aws_iam_role.glue_role.arn, + path_secret_arn = aws_secretsmanager_secret.path_secret.arn, + secrets_arn = aws_secretsmanager_secret.secrets.arn + }) } resource "aws_iam_role_policy_attachment" "glue_attach" { @@ -52,6 +85,11 @@ resource "aws_iam_role_policy_attachment" "glue_attach" { policy_arn = aws_iam_policy.glue_policy.arn } +resource "aws_iam_role_policy_attachment" "glue_crawler_attach" { + role = aws_iam_role.glue_crawler_role.name + policy_arn = "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole" +} + resource "aws_glue_catalog_database" "stage_database" { name = "${var.app_prefix}-stage" diff --git a/tf/modules/glue/outputs.tf b/tf/modules/glue/outputs.tf index 3c5d9fe..1da768a 100644 --- a/tf/modules/glue/outputs.tf +++ b/tf/modules/glue/outputs.tf @@ -17,3 +17,11 @@ output "etl_script_url" { output "shell_script_url" { value = "s3://${var.data_bucket}/${aws_s3_object.shell_script.id}" } + +output "path_secret_id" { + value = aws_secretsmanager_secret.path_secret.id +} + +output "secrets_id" { + value = aws_secretsmanager_secret.secrets.id +} diff --git a/tf/modules/glue/policies/crawler_policy.json.tpl b/tf/modules/glue/policies/crawler_policy.json.tpl index 77e0be7..31ced9b 100644 --- a/tf/modules/glue/policies/crawler_policy.json.tpl +++ b/tf/modules/glue/policies/crawler_policy.json.tpl @@ -8,8 +8,8 @@ "s3:PutObject" ], "Resource": [ - "arn:aws:s3:::${data_bucket}*" + "arn:aws:s3:::${data_bucket}/*" ] } ] -} \ No newline at end of file +} diff --git a/tf/modules/glue/policies/policy.json.tpl b/tf/modules/glue/policies/policy.json.tpl index 4efdbe8..f7554b6 100644 --- a/tf/modules/glue/policies/policy.json.tpl +++ b/tf/modules/glue/policies/policy.json.tpl @@ -20,7 +20,7 @@ "iam:ListRolePolicies", "iam:GetRole", "iam:GetRolePolicy", - "cloudwatch:PutMetricData" + "cloudwatch:PutMetricData" ], "Resource": [ "*" @@ -50,6 +50,16 @@ "arn:aws:s3:::*/*aws-glue-*/*" ] }, + { + "Effect": "Allow", + "Action": [ + "secretsmanager:GetSecretValue" + ], + "Resource": [ + "${path_secret_arn}", + "${secrets_arn}" + ] + }, { "Effect": "Allow", "Action": [ diff --git a/tf/modules/pipelines/austin_traffic/main.tf b/tf/modules/pipelines/austin_traffic/main.tf index e5a6eaf..9d47b1a 100644 --- a/tf/modules/pipelines/austin_traffic/main.tf +++ b/tf/modules/pipelines/austin_traffic/main.tf @@ -10,7 +10,7 @@ resource "aws_glue_job" "import_raw_traffic_json" { default_arguments = { "--TempDir" = "s3://${var.data_bucket}/tmp/", - "--additional-python-modules" = "s3://${var.data_bucket}/dist/sample_pipelines-0.1.0-py3-none-any.whl", + "--extra-py-files" = "s3://${var.data_bucket}/dist/sample_pipelines-0.1.0-py3-none-any.whl", "--class" = "GlueApp", "--enable-continuous-cloudwatch-log" = "true", "--enable-glue-datacatalog" = "true", @@ -20,7 +20,8 @@ resource "aws_glue_job" "import_raw_traffic_json" { "--job-language" = "python", "--job_function" = "import_csv", "--job_module" = "sample_pipelines.traffic.raw", - "--secret_id" = "${var.app_prefix}-data-secrets", + "--path_secret_id" = var.path_secret_id, + "--secrets_id" = var.secrets_id, "--spark-event-logs-path" = "s3://${var.data_bucket}/sparkHistoryLogs/" } } @@ -48,7 +49,8 @@ resource "aws_glue_job" "transform_traffic_raw_to_stage" { "--job-language" = "python", "--job_function" = "transform_raw", "--job_module" = "sample_pipelines.traffic.stage", - "--secret_id" = "${var.app_prefix}-data-secrets", + "--path_secret_id" = var.path_secret_id, + "--secrets_id" = var.secrets_id, "--spark-event-logs-path" = "s3://${var.data_bucket}/sparkHistoryLogs/" } } diff --git a/tf/modules/pipelines/austin_traffic/vars.tf b/tf/modules/pipelines/austin_traffic/vars.tf index d9d235b..a329a57 100644 --- a/tf/modules/pipelines/austin_traffic/vars.tf +++ b/tf/modules/pipelines/austin_traffic/vars.tf @@ -14,6 +14,14 @@ variable "shell_script_url" { type = string } +variable "path_secret_id" { + type = string +} + +variable "secrets_id" { + type = string +} + variable "glue_role" { type = object({ arn = string diff --git a/tf/modules/pipelines/main.tf b/tf/modules/pipelines/main.tf index b33dab8..87e53bc 100644 --- a/tf/modules/pipelines/main.tf +++ b/tf/modules/pipelines/main.tf @@ -18,6 +18,8 @@ module "austin_traffic_pipeline" { data_bucket = module.buckets.data_bucket etl_script_url = module.glue.etl_script_url shell_script_url = module.glue.shell_script_url + path_secret_id = module.glue.path_secret_id + secrets_id = module.glue.secrets_id glue_role = module.glue.glue_role glue_crawler_role = module.glue.glue_crawler_role stage_database = module.glue.stage_database