diff --git a/infrastructure-cdk/flink-artifact-created-hook/app.py b/infrastructure-cdk/flink-artifact-created-hook/app.py new file mode 100644 index 0000000..231709b --- /dev/null +++ b/infrastructure-cdk/flink-artifact-created-hook/app.py @@ -0,0 +1,66 @@ +import logging +import os +import time + +import boto3 +from botocore.exceptions import ClientError + +log = logging.getLogger(__name__) +log.setLevel(logging.INFO) + +BUCKET_NAME = os.environ['BUCKET_NAME'] +FILE_NAME = os.environ['FILE_NAME'] + +s3 = boto3.client('s3') + + +def on_create(event): + log.info("Got Create for %s", event) + + while True: + try: + log.info(f"Try to fetch object '{FILE_NAME}' from bucket '{BUCKET_NAME}'") + response = s3.head_object(Bucket=BUCKET_NAME, Key=FILE_NAME) + + if response['ResponseMetadata']['HTTPStatusCode'] == 200: + log.info(f"Found the object with response: {response}") + return {'PhysicalResourceId': f"Flink-Application-Created-In-{BUCKET_NAME}/{FILE_NAME}"} + + else: + log.error(f"Invalid response from S3: {response}") + raise ValueError("Invalid response from S3: ", response) + + except ClientError as e: + if e.response['ResponseMetadata']['HTTPStatusCode'] == 404: + log.info(f"Not found the object yet with response: {e}") + time.sleep(30) + + else: + log.error(f"Invalid state from S3: {e}") + raise ValueError("Invalid state from S3: ", e) + + +def on_update(event): + log.info("Got Update for %s", event["PhysicalResourceId"]) + # If the update resulted in a new resource being created, return an id for the new resource. + # CloudFormation will send a delete event with the old id when stack update completes + + +def on_delete(event): + log.info("Got Delete for %s", event["PhysicalResourceId"]) + # Delete never returns anything. Should not fail if the underlying resources are already deleted. + # Desired state. + + +def on_event(event, context): + log.info("Received event: %s", event) + + request_type = event['RequestType'] + if request_type == 'Create': + return on_create(event) + if request_type == 'Update': + return on_update(event) + if request_type == 'Delete': + return on_delete(event) + + raise Exception("Invalid request type: %s" % request_type) diff --git a/infrastructure-cdk/lib/application-stack.ts b/infrastructure-cdk/lib/application-stack.ts index 5655c3d..e59d6a3 100644 --- a/infrastructure-cdk/lib/application-stack.ts +++ b/infrastructure-cdk/lib/application-stack.ts @@ -69,5 +69,27 @@ export class ApplicationStack extends Stack { stream.grantRead(application); + // Wait for the build artifact to be initialized + const flinkArtifactCreatedHook = new Function(this, 'FlinkArtifactCreatedHook', { + runtime: Runtime.PYTHON_3_9, + code: Code.fromAsset('./infrastructure/flink-artifact-created-hook'), + handler: 'app.on_event', + architecture: Architecture.ARM_64, + timeout: Duration.minutes(15), + environment: { + BUCKET_NAME: assetBucket.bucketName, + FILE_NAME: binaryPath, + } + }); + assetBucket.grantRead(flinkArtifactCreatedHook); + + const flinkArtifactCreatedHookProvider = new Provider(this, 'FlinkArtifactCreatedHookProvider', { + onEventHandler: flinkArtifactCreatedHook + }); + const flinkArtifactCreatedHookCustomResource = new CustomResource(this, 'FlinkArtifactCreatedHookCustomResource', { + serviceToken: flinkArtifactCreatedHookProvider.serviceToken + }); + + this.application.node.addDependency(flinkArtifactCreatedHookCustomResource); } } diff --git a/infrastructure-cdk/lib/constructs/java-build-pipeline.ts b/infrastructure-cdk/lib/constructs/java-build-pipeline.ts index a7d9b59..450c739 100644 --- a/infrastructure-cdk/lib/constructs/java-build-pipeline.ts +++ b/infrastructure-cdk/lib/constructs/java-build-pipeline.ts @@ -12,7 +12,7 @@ import { import {Code, Function, Runtime} from "aws-cdk-lib/aws-lambda"; import {Aws, Duration} from "aws-cdk-lib"; import {PolicyStatement} from "aws-cdk-lib/aws-iam"; -import {SOURCE_CODE_ZIP} from "../shared-vars"; +import {SOURCE_CODE_ZIP, MANUAL_APPROVAL_REQUIRED} from "../shared-vars"; interface JavaBuildPipelineProps { @@ -106,14 +106,18 @@ export class JavaBuildPipeline extends Construct { input: buildOutput, extract: true })] - }, { - stageName: "approval", - actions: [new ManualApprovalAction({ - actionName: "Manual" - })] }] }); + if (MANUAL_APPROVAL_REQUIRED) { + this.pipeline.addStage({ + stageName: "approval", + actions: [new ManualApprovalAction({ + actionName: "Manual" + })] + }) + } + const versionUpdateFn = new Function(this, 'version-update-fn', { code: Code.fromAsset('flink-app-redeploy-hook'), handler: "app.lambda_handler", diff --git a/infrastructure-cdk/lib/constructs/python-build-pipeline.ts b/infrastructure-cdk/lib/constructs/python-build-pipeline.ts index adfb39f..7f70e43 100644 --- a/infrastructure-cdk/lib/constructs/python-build-pipeline.ts +++ b/infrastructure-cdk/lib/constructs/python-build-pipeline.ts @@ -12,7 +12,7 @@ import { import {Code, Function, Runtime} from "aws-cdk-lib/aws-lambda"; import {Aws, Duration} from "aws-cdk-lib"; import {PolicyStatement} from "aws-cdk-lib/aws-iam"; -import {SOURCE_CODE_ZIP} from "../shared-vars"; +import {SOURCE_CODE_ZIP, MANUAL_APPROVAL_REQUIRED} from "../shared-vars"; interface PythonBuildPipelineProps { @@ -107,14 +107,18 @@ export class PythonBuildPipeline extends Construct { input: buildOutput, extract: true })] - }, { - stageName: "approval", - actions: [new ManualApprovalAction({ - actionName: "Manual" - })] }] }); + if (MANUAL_APPROVAL_REQUIRED) { + this.pipeline.addStage({ + stageName: "approval", + actions: [new ManualApprovalAction({ + actionName: "Manual" + })] + }) + } + const versionUpdateFn = new Function(this, 'version-update-fn', { code: Code.fromAsset('flink-app-redeploy-hook'), handler: "app.lambda_handler", diff --git a/infrastructure-cdk/lib/shared-vars.ts b/infrastructure-cdk/lib/shared-vars.ts index 2942d06..a50371d 100644 --- a/infrastructure-cdk/lib/shared-vars.ts +++ b/infrastructure-cdk/lib/shared-vars.ts @@ -4,3 +4,4 @@ export const APPLICATION_NAME = "kinesis-analytics-application" export const BUILD_FOR_RUNTIME = ApplicationRuntime.JAVA export const SOURCE_CODE_ZIP = "automate-deployment-and-version-update-of-kda-application-main.zip" export const ASSET_BUCKET_EXPORT_NAME = "Blog::Artifact::BucketName" +export const MANUAL_APPROVAL_REQUIRED = true