Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/artifact-created-hook #5

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions infrastructure-cdk/flink-artifact-created-hook/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import logging
import os
import time

import boto3
from botocore.exceptions import ClientError

log = logging.getLogger(__name__)
log.setLevel(logging.INFO)

BUCKET_NAME = os.environ['BUCKET_NAME']
FILE_NAME = os.environ['FILE_NAME']

s3 = boto3.client('s3')


def on_create(event):
log.info("Got Create for %s", event)

while True:
try:
log.info(f"Try to fetch object '{FILE_NAME}' from bucket '{BUCKET_NAME}'")
response = s3.head_object(Bucket=BUCKET_NAME, Key=FILE_NAME)

if response['ResponseMetadata']['HTTPStatusCode'] == 200:
log.info(f"Found the object with response: {response}")
return {'PhysicalResourceId': f"Flink-Application-Created-In-{BUCKET_NAME}/{FILE_NAME}"}

else:
log.error(f"Invalid response from S3: {response}")
raise ValueError("Invalid response from S3: ", response)

except ClientError as e:
if e.response['ResponseMetadata']['HTTPStatusCode'] == 404:
log.info(f"Not found the object yet with response: {e}")
time.sleep(30)

else:
log.error(f"Invalid state from S3: {e}")
raise ValueError("Invalid state from S3: ", e)


def on_update(event):
log.info("Got Update for %s", event["PhysicalResourceId"])
# If the update resulted in a new resource being created, return an id for the new resource.
# CloudFormation will send a delete event with the old id when stack update completes


def on_delete(event):
log.info("Got Delete for %s", event["PhysicalResourceId"])
# Delete never returns anything. Should not fail if the underlying resources are already deleted.
# Desired state.


def on_event(event, context):
log.info("Received event: %s", event)

request_type = event['RequestType']
if request_type == 'Create':
return on_create(event)
if request_type == 'Update':
return on_update(event)
if request_type == 'Delete':
return on_delete(event)

raise Exception("Invalid request type: %s" % request_type)
22 changes: 22 additions & 0 deletions infrastructure-cdk/lib/application-stack.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,27 @@ export class ApplicationStack extends Stack {

stream.grantRead(application);

// Wait for the build artifact to be initialized
const flinkArtifactCreatedHook = new Function(this, 'FlinkArtifactCreatedHook', {
runtime: Runtime.PYTHON_3_9,
code: Code.fromAsset('./infrastructure/flink-artifact-created-hook'),
handler: 'app.on_event',
architecture: Architecture.ARM_64,
timeout: Duration.minutes(15),
environment: {
BUCKET_NAME: assetBucket.bucketName,
FILE_NAME: binaryPath,
}
});
assetBucket.grantRead(flinkArtifactCreatedHook);

const flinkArtifactCreatedHookProvider = new Provider(this, 'FlinkArtifactCreatedHookProvider', {
onEventHandler: flinkArtifactCreatedHook
});
const flinkArtifactCreatedHookCustomResource = new CustomResource(this, 'FlinkArtifactCreatedHookCustomResource', {
serviceToken: flinkArtifactCreatedHookProvider.serviceToken
});

this.application.node.addDependency(flinkArtifactCreatedHookCustomResource);
}
}
16 changes: 10 additions & 6 deletions infrastructure-cdk/lib/constructs/java-build-pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
import {Code, Function, Runtime} from "aws-cdk-lib/aws-lambda";
import {Aws, Duration} from "aws-cdk-lib";
import {PolicyStatement} from "aws-cdk-lib/aws-iam";
import {SOURCE_CODE_ZIP} from "../shared-vars";
import {SOURCE_CODE_ZIP, MANUAL_APPROVAL_REQUIRED} from "../shared-vars";


interface JavaBuildPipelineProps {
Expand Down Expand Up @@ -106,14 +106,18 @@ export class JavaBuildPipeline extends Construct {
input: buildOutput,
extract: true
})]
}, {
stageName: "approval",
actions: [new ManualApprovalAction({
actionName: "Manual"
})]
}]
});

if (MANUAL_APPROVAL_REQUIRED) {
this.pipeline.addStage({
stageName: "approval",
actions: [new ManualApprovalAction({
actionName: "Manual"
})]
})
}

const versionUpdateFn = new Function(this, 'version-update-fn', {
code: Code.fromAsset('flink-app-redeploy-hook'),
handler: "app.lambda_handler",
Expand Down
16 changes: 10 additions & 6 deletions infrastructure-cdk/lib/constructs/python-build-pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
import {Code, Function, Runtime} from "aws-cdk-lib/aws-lambda";
import {Aws, Duration} from "aws-cdk-lib";
import {PolicyStatement} from "aws-cdk-lib/aws-iam";
import {SOURCE_CODE_ZIP} from "../shared-vars";
import {SOURCE_CODE_ZIP, MANUAL_APPROVAL_REQUIRED} from "../shared-vars";


interface PythonBuildPipelineProps {
Expand Down Expand Up @@ -107,14 +107,18 @@ export class PythonBuildPipeline extends Construct {
input: buildOutput,
extract: true
})]
}, {
stageName: "approval",
actions: [new ManualApprovalAction({
actionName: "Manual"
})]
}]
});

if (MANUAL_APPROVAL_REQUIRED) {
this.pipeline.addStage({
stageName: "approval",
actions: [new ManualApprovalAction({
actionName: "Manual"
})]
})
}

const versionUpdateFn = new Function(this, 'version-update-fn', {
code: Code.fromAsset('flink-app-redeploy-hook'),
handler: "app.lambda_handler",
Expand Down
1 change: 1 addition & 0 deletions infrastructure-cdk/lib/shared-vars.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ export const APPLICATION_NAME = "kinesis-analytics-application"
export const BUILD_FOR_RUNTIME = ApplicationRuntime.JAVA
export const SOURCE_CODE_ZIP = "automate-deployment-and-version-update-of-kda-application-main.zip"
export const ASSET_BUCKET_EXPORT_NAME = "Blog::Artifact::BucketName"
export const MANUAL_APPROVAL_REQUIRED = true