Skip to content

Commit

Permalink
Add path secrets and fix permissions
Browse files Browse the repository at this point in the history
  • Loading branch information
kseebaldt committed Sep 19, 2022
1 parent 9df8687 commit 82d4530
Show file tree
Hide file tree
Showing 14 changed files with 157 additions and 45 deletions.
11 changes: 8 additions & 3 deletions sample_pipelines/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,18 @@ def shell_job():


def _get_job_function():
    """Build the job callable selected by the job arguments.

    Resolves ``job_module``, ``job_function``, ``path_secret_id`` and
    ``secrets_id`` from ``sys.argv``, imports the module and looks up the
    function on it.

    Returns:
        The job function with two positional arguments already bound: the
        decoded path secret first, then the decoded general secret.
    """
    # Only the two-secret argument set is requested; asking for the former
    # single ``secret_id`` as well would make it a required job argument.
    args = getResolvedOptions(
        sys.argv, ["job_module", "job_function", "path_secret_id", "secrets_id"]
    )

    job_module = import_module(args["job_module"])
    f = getattr(job_module, args["job_function"])
    # Job functions take ``(paths, secrets, ...)``, so both are bound here.
    return partial(f, secrets(args["path_secret_id"]), secrets(args["secrets_id"]))


def secrets(key: str):
    """Fetch a Secrets Manager secret and decode its JSON payload.

    Args:
        key: Identifier passed to Secrets Manager as ``SecretId``.

    Returns:
        The value parsed from the secret's ``SecretString``, or an empty
        dict when Secrets Manager raises ``ResourceNotFoundException``.
    """
    client = boto3.client("secretsmanager")
    # Keep the try body to the one call that can raise the handled error, so
    # a malformed payload still surfaces as a JSON error instead of ``{}``.
    try:
        response = client.get_secret_value(SecretId=key)
    except client.exceptions.ResourceNotFoundException:
        return {}
    return json.loads(response["SecretString"])
8 changes: 4 additions & 4 deletions sample_pipelines/traffic/raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,20 @@
LIMIT = 10000000


def import_csv(secrets):
def import_csv(paths, secrets):
now_timestamp = _now()
s3 = boto3.client("s3")

where = f"traffic_report_status_date_time <= '{now_timestamp}'"
last_timestamp = _get_last_timestamp(s3, secrets["raw_path"])
last_timestamp = _get_last_timestamp(s3, paths["raw_path"])
if last_timestamp:
where = f"{where} and traffic_report_status_date_time > '{last_timestamp}'"

response = requests.get(
DATASET_URL, params={"$where": where, "$limit": LIMIT}, stream=True
)

url = f"{secrets['raw_path']}sample/austin_traffic/load_time={now_timestamp}/data.csv.gz"
url = f"{paths['raw_path']}sample/austin_traffic/load_time={now_timestamp}/data.csv.gz"

results = response.iter_lines()

Expand All @@ -36,7 +36,7 @@ def import_csv(secrets):
except StopIteration:
return None

_write_last_timestamp(s3, secrets["raw_path"], now_timestamp)
_write_last_timestamp(s3, paths["raw_path"], now_timestamp)


def _get_last_timestamp(s3, base_url):
Expand Down
6 changes: 3 additions & 3 deletions sample_pipelines/traffic/stage.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
def transform_raw(paths, secrets, spark, glueContext):
    """Rewrite the raw Austin traffic CSV data as Parquet in the stage area.

    Args:
        paths: Mapping with the ``raw_path`` and ``stage_path`` base
            locations (strings).
        secrets: Not used by this job; kept so the signature starts with the
            same ``(paths, secrets)`` pair as the other job functions.
        spark: Session-like object exposing ``read``; the CSV is read with a
            header row and schema inference enabled.
        glueContext: Not used by this job.
    """
    # Base locations may or may not end with "/"; strip it so the joined
    # path never contains "//" before "sample".
    raw_dir = f"{paths['raw_path'].rstrip('/')}/sample/austin_traffic/"
    stage_dir = f"{paths['stage_path'].rstrip('/')}/sample/austin_traffic/"

    df = (
        spark.read.format("csv")
        .option("header", True)
        .option("inferSchema", True)
        .load(raw_dir)
    )

    # "overwrite" replaces whatever a previous run left at the target.
    df.write.format("parquet").mode("overwrite").save(stage_dir)
7 changes: 0 additions & 7 deletions sample_pipelines/utils/s3.py

This file was deleted.

36 changes: 36 additions & 0 deletions tests/sample_pipelines/driver_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import os

import boto3
from moto import mock_secretsmanager
from pytest import fixture

from sample_pipelines.driver import secrets


@fixture(autouse=True)
def mock_secrets():
    """Yield a Secrets Manager client that runs inside the moto mock.

    Applied automatically to every test in this module.

    NOTE(review): AWS_DEFAULT_REGION is written to the process environment
    and is not restored when the fixture finishes.
    """
    os.environ["AWS_DEFAULT_REGION"] = "us-east-1"

    with mock_secretsmanager():
        yield boto3.client("secretsmanager", region_name="us-east-1")


@fixture()
def paths_key(mock_secrets):
    """Create a secret holding a sample paths document; return its name."""
    secret_name = "my-paths"
    mock_secrets.create_secret(
        Name=secret_name, SecretString='{"raw_path": "s3://abc/123"}'
    )
    return secret_name


def test_returns_parsed_secrets(paths_key):
    """An existing secret comes back as its parsed JSON document."""
    expected = {"raw_path": "s3://abc/123"}
    assert secrets(paths_key) == expected


def test_returns_empty_dict_when_key_is_not_found():
    """A secret that was never created yields an empty dict."""
    assert secrets("other") == {}
33 changes: 19 additions & 14 deletions tests/sample_pipelines/traffic/raw_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,52 +27,57 @@ def s3_bucket(bucket_name):


@fixture
def paths(bucket_name):
    """Return the paths mapping with ``raw_path`` inside the test bucket."""
    return {"raw_path": f"s3://{bucket_name}/raw/"}


@fixture
def secrets():
    """Return a placeholder secrets mapping to pass as the second argument."""
    return {"some_secret": "abc123"}


@fixture
def previous_timestamp(bucket_name, paths):
    """Store a fixed last-run timestamp under ``raw_path`` and return it."""
    previous_timestamp = "2022-09-06T21:30:46.717814"
    s3 = boto3.client("s3")
    # The base location comes from ``paths``, matching what import_csv reads.
    _write_last_timestamp(s3, paths["raw_path"], previous_timestamp)
    return previous_timestamp


def test_queries_all_records(paths, secrets, now, requests_mock):
    """Without a stored timestamp the filter is only bounded above by ``now``."""
    requests_mock.get(
        DATASET_URL,
        text="a,b,c",
    )

    import_csv(paths, secrets)

    # Compared lower-cased; presumably requests_mock lower-cases ``qs`` values.
    where = f"traffic_report_status_date_time <= '{now.lower()}'"
    assert where in requests_mock.last_request.qs["$where"]


def test_queries_records_since_last_timestamp(
    paths, secrets, requests_mock, now, previous_timestamp
):
    """With a stored timestamp the filter also has a lower bound."""
    requests_mock.get(
        DATASET_URL,
        text="a,b,c",
    )

    import_csv(paths, secrets)
    # Compared lower-cased; presumably requests_mock lower-cases ``qs`` values.
    where = f"traffic_report_status_date_time <= '{now.lower()}' and traffic_report_status_date_time > '{previous_timestamp.lower()}'"
    assert where in requests_mock.last_request.qs["$where"]


def test_writes_nothing_if_only_header_returned(
secrets, requests_mock, now, bucket_name
paths, secrets, requests_mock, now, bucket_name
):
requests_mock.get(
DATASET_URL,
text="a,b,c",
)

import_csv(secrets)
import_csv(paths, secrets)

s3 = boto3.resource("s3")

Expand All @@ -81,29 +86,29 @@ def test_writes_nothing_if_only_header_returned(
assert keys == []


def test_writes_csv_file(paths, secrets, requests_mock, now, bucket_name):
    """The downloaded rows are written unchanged to the load_time object."""
    rows = ["col1,col2\n", "foo,bar\n", "abc,def\n"]
    requests_mock.get(
        DATASET_URL,
        text="".join(rows),
    )

    import_csv(paths, secrets)

    url = f"{paths['raw_path']}sample/austin_traffic/load_time={now}/data.csv.gz"

    # Read back through smart_open and close the handle when done.
    with smart_open.open(url, transport_params={"client": boto3.client("s3")}) as f:
        lines = list(f)

    assert lines == rows


def test_writes_last_timestamp(secrets, requests_mock, now, bucket_name):
def test_writes_last_timestamp(paths, secrets, requests_mock, now, bucket_name):
requests_mock.get(
DATASET_URL,
text="col1,col2\nfoo,bar\nabc,def\n",
)

import_csv(secrets)
import_csv(paths, secrets)

s3 = boto3.resource("s3")

Expand Down
19 changes: 12 additions & 7 deletions tests/sample_pipelines/traffic/stage_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,22 @@


@fixture
def paths(tmp_path, data_path):
    """Return local raw and stage locations for the transform under test.

    ``raw_path`` points below ``data_path`` and ``stage_path`` below
    pytest's ``tmp_path``.
    """
    return {
        "raw_path": str(data_path / "raw"),
        "stage_path": str(tmp_path / "stage"),
    }


def test_converts_dates(secrets, spark, glueContext):
transform_raw(secrets, spark, glueContext)
@fixture
def secrets():
    """Return a placeholder secrets mapping for the transform's second argument."""
    placeholder = {"some_secret": "abc123"}
    return placeholder


def test_converts_dates(paths, secrets, spark, glueContext):
transform_raw(paths, secrets, spark, glueContext)
df = spark.read.format("parquet").load(
f"{secrets['stage_path']}/sample/austin_traffic/"
f"{paths['stage_path']}/sample/austin_traffic/"
)

expected = spark.createDataFrame(
Expand Down Expand Up @@ -48,10 +53,10 @@ def test_converts_dates(secrets, spark, glueContext):
)


def test_converts_latitude_longitude(secrets, spark, glueContext):
transform_raw(secrets, spark, glueContext)
def test_converts_latitude_longitude(paths, secrets, spark, glueContext):
transform_raw(paths, secrets, spark, glueContext)
df = spark.read.format("parquet").load(
f"{secrets['stage_path']}/sample/austin_traffic/"
f"{paths['stage_path']}/sample/austin_traffic/"
)

expected = spark.createDataFrame(
Expand Down
40 changes: 39 additions & 1 deletion tf/modules/glue/main.tf
Original file line number Diff line number Diff line change
@@ -1,3 +1,24 @@
# Base S3 locations inside the data bucket, one per data layer. The map is
# JSON-encoded into the path secret's value (see path_secret_version).
# Each value ends with "/".
locals {
path_secret_value = {
raw_path = "s3://${var.data_bucket}/raw/"
stage_path = "s3://${var.data_bucket}/stage/"
analytics_path = "s3://${var.data_bucket}/analytics/"
}
}

# Secrets Manager secret named "<app_prefix>-glue-paths"; its value is
# supplied by aws_secretsmanager_secret_version.path_secret_version.
resource "aws_secretsmanager_secret" "path_secret" {
name = "${var.app_prefix}-glue-paths"
}

# Secrets Manager secret named "<app_prefix>-glue-secrets".
# NOTE(review): no secret version is defined for it in the visible
# configuration, so its value is presumably set outside Terraform — confirm.
resource "aws_secretsmanager_secret" "secrets" {
name = "${var.app_prefix}-glue-secrets"
}

# Stores local.path_secret_value, JSON-encoded, as the value of the path secret.
resource "aws_secretsmanager_secret_version" "path_secret_version" {
secret_id = aws_secretsmanager_secret.path_secret.id
secret_string = jsonencode(local.path_secret_value)
}

resource "aws_iam_role" "glue_role" {
name = "${var.app_prefix}-glue"

Expand All @@ -11,6 +32,13 @@ resource "aws_iam_role" "glue_role" {
"Service": "ec2.amazonaws.com"
},
"Effect": "Allow"
},
{
"Action": "sts:AssumeRole",
"Principal": {
"Service": "glue.amazonaws.com"
},
"Effect": "Allow"
}
]
}
Expand Down Expand Up @@ -44,14 +72,24 @@ EOF
# IAM policy for the Glue role, rendered from policies/policy.json.tpl.
# The template is given the ARNs of both Secrets Manager secrets because its
# secretsmanager:GetSecretValue statement references path_secret_arn and
# secrets_arn.
resource "aws_iam_policy" "glue_policy" {
  name        = "${var.app_prefix}-glue"
  description = "Glue Access Policy"
  policy = templatefile("${path.module}/policies/policy.json.tpl", {
    app_prefix      = var.app_prefix,
    role_arn        = aws_iam_role.glue_role.arn,
    path_secret_arn = aws_secretsmanager_secret.path_secret.arn,
    secrets_arn     = aws_secretsmanager_secret.secrets.arn
  })
}

# Attaches the glue_policy defined in this module to the Glue role.
resource "aws_iam_role_policy_attachment" "glue_attach" {
role = aws_iam_role.glue_role.name
policy_arn = aws_iam_policy.glue_policy.arn
}

# Attaches the AWS-managed AWSGlueServiceRole policy to the crawler role.
# aws_iam_role.glue_crawler_role is defined outside the section shown here.
resource "aws_iam_role_policy_attachment" "glue_crawler_attach" {
role = aws_iam_role.glue_crawler_role.name
policy_arn = "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole"
}

resource "aws_glue_catalog_database" "stage_database" {
name = "${var.app_prefix}-stage"

Expand Down
8 changes: 8 additions & 0 deletions tf/modules/glue/outputs.tf
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,11 @@ output "etl_script_url" {
# S3 URL built from the data bucket name and the shell_script object's id.
output "shell_script_url" {
value = "s3://${var.data_bucket}/${aws_s3_object.shell_script.id}"
}

# Id of the path secret; the Python driver resolves a job argument of the
# same name (path_secret_id).
output "path_secret_id" {
value = aws_secretsmanager_secret.path_secret.id
}

# Id of the general secrets secret; the Python driver resolves a job
# argument of the same name (secrets_id).
output "secrets_id" {
value = aws_secretsmanager_secret.secrets.id
}
4 changes: 2 additions & 2 deletions tf/modules/glue/policies/crawler_policy.json.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
"s3:PutObject"
],
"Resource": [
"arn:aws:s3:::${data_bucket}*"
"arn:aws:s3:::${data_bucket}/*"
]
}
]
}
}
12 changes: 11 additions & 1 deletion tf/modules/glue/policies/policy.json.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
"iam:ListRolePolicies",
"iam:GetRole",
"iam:GetRolePolicy",
"cloudwatch:PutMetricData"
"cloudwatch:PutMetricData"
],
"Resource": [
"*"
Expand Down Expand Up @@ -50,6 +50,16 @@
"arn:aws:s3:::*/*aws-glue-*/*"
]
},
{
"Effect": "Allow",
"Action": [
"secretsmanager:GetSecretValue"
],
"Resource": [
"${path_secret_arn}",
"${secrets_arn}"
]
},
{
"Effect": "Allow",
"Action": [
Expand Down
Loading

0 comments on commit 82d4530

Please sign in to comment.