From 14fac1ed3057f69d49ae2fb254e80507a9da98cb Mon Sep 17 00:00:00 2001 From: Matthew Heroux Date: Mon, 6 Nov 2023 07:19:32 -0600 Subject: [PATCH] feat: data lake (#504) * feat: data lake Signed-off-by: hxtree * fix: readme syntax Signed-off-by: hxtree --------- Signed-off-by: hxtree --- README.md | 11 +- platform/constructs/src/queue/README.md | 9 + platform/message-bus/README.md | 20 +- .../message-bus.stack.test.ts.snap | 240 ++++++++++++++++++ .../message-bus/stacks/message-bus.stack.ts | 103 +++++--- 5 files changed, 343 insertions(+), 40 deletions(-) create mode 100644 platform/constructs/src/queue/README.md diff --git a/README.md b/README.md index edb30827b..59ab28cd6 100644 --- a/README.md +++ b/README.md @@ -37,13 +37,12 @@ Alternatively, build, test, and deploy apps locally using the git checkout -b feature/improve-readme ``` -2. Work on changes (e.g. fix a bug or add a new feature). Build, lint, and unit - test projects. +2. Work on changes (e.g. fix a bug or add a new feature). Test-driven + development encouraged. - ```bashProd Environment - rush build - rush linthttps://github.com/hxtree/cats-cradle - rush test + ```bash + rushx dev + rushx test ``` 3. Stage and commit changes using diff --git a/platform/constructs/src/queue/README.md b/platform/constructs/src/queue/README.md new file mode 100644 index 000000000..82902edde --- /dev/null +++ b/platform/constructs/src/queue/README.md @@ -0,0 +1,9 @@ +# TODO make a construct for turn key pub sub, e.g + +``` +import { Queue } from 'aws-cdk-lib/aws-sqs'; +new Queue(this, 'EmailSendCommand', { queueName: +EmailSendCommand.topicName }); +``` + +Queues should be deployed with the service they run on Make turn-key DLE diff --git a/platform/message-bus/README.md b/platform/message-bus/README.md index 3bf8eae55..cb5531cf9 100644 --- a/platform/message-bus/README.md +++ b/platform/message-bus/README.md @@ -3,9 +3,23 @@ ![TypeScript](https://shields.io/badge/TypeScript-3178C6?logo=TypeScript&logoColor=FFF&style=flat-square) ![Lifecycle](https://img.shields.io/badge/lifecycle-stable-brightgreen) -This service deploys SNS topics from all defined message schemas. It ensures -that message sent to a topic gets stored in a S3 data lake. +This service deploys a SNS topic for every registered +`@cats-cradle/message-schema`. Other services can then subscribe to these +topics. + +## Data Lake + +Message sent to any topic also get stored in a S3 data lake. This works using a +Kinesis Firehose. + +Messages are saved in the `123456789-default-data-lake-bucket` in a structured +manner for analysis. For example: + +> messages/2023/11/06/04/test-firehose-delivery-stream-2-2023-11-06-04-28-20-be72f66c-f0e7-4812-9b1d-064cee498d23 ## References -- [structure data lake for analysis](https://www.youtube.com/watch?v=4xjckHvapFk) +- [Event-Driven Architecture.](https://aws.amazon.com/event-driven-architecture/) +- [Pub/sub](https://cloud.google.com/pubsub/docs/overview) +- [Structure Data Lake for Analysis](https://www.youtube.com/watch?v=4xjckHvapFk) +- [SNS to Kinesis Data Firehouse to S3](https://docs.aws.amazon.com/sns/latest/dg/sns-firehose-as-subscriber.html) diff --git a/platform/message-bus/stacks/__snapshots__/message-bus.stack.test.ts.snap b/platform/message-bus/stacks/__snapshots__/message-bus.stack.test.ts.snap index 0a3ad08c3..7ef6f3aa5 100644 --- a/platform/message-bus/stacks/__snapshots__/message-bus.stack.test.ts.snap +++ b/platform/message-bus/stacks/__snapshots__/message-bus.stack.test.ts.snap @@ -18,6 +18,103 @@ exports[`MessageBusStack should match snapshot test 1`] = ` }, Type: AWS::SNS::Topic, }, + DataLakeFirehoseDeliveryStream: { + Properties: { + DeliveryStreamName: test-firehose-delivery-stream, + DeliveryStreamType: DirectPut, + S3DestinationConfiguration: { + BucketARN: { + Fn::GetAtt: [ + defaultdatalake9656A812, + Arn, + ], + }, + BufferingHints: { + IntervalInSeconds: 60, + SizeInMBs: 1, + }, + CompressionFormat: UNCOMPRESSED, + EncryptionConfiguration: { + NoEncryptionConfig: NoEncryption, + }, + ErrorOutputPrefix: messages-logs/, + Prefix: messages/, + RoleARN: { + Fn::GetAtt: [ + KinesisFirehoseRole0891766E, + Arn, + ], + }, + }, + }, + Type: AWS::KinesisFirehose::DeliveryStream, + }, + DataLakeFirehoseSubscriptionForCharacterLevelUpEvent36356735: { + Properties: { + Endpoint: { + Fn::GetAtt: [ + DataLakeFirehoseDeliveryStream, + Arn, + ], + }, + Protocol: firehose, + RawMessageDelivery: true, + SubscriptionRoleArn: { + Fn::GetAtt: [ + SnsRoleToPutMessagesInFirehose73474B02, + Arn, + ], + }, + TopicArn: { + Ref: CharacterLevelUpEventEE3941E0, + }, + }, + Type: AWS::SNS::Subscription, + }, + DataLakeFirehoseSubscriptionForEmailSendCommand6B92740C: { + Properties: { + Endpoint: { + Fn::GetAtt: [ + DataLakeFirehoseDeliveryStream, + Arn, + ], + }, + Protocol: firehose, + RawMessageDelivery: true, + SubscriptionRoleArn: { + Fn::GetAtt: [ + SnsRoleToPutMessagesInFirehose73474B02, + Arn, + ], + }, + TopicArn: { + Ref: EmailSendCommand52EDF753, + }, + }, + Type: AWS::SNS::Subscription, + }, + DataLakeFirehoseSubscriptionForItemDestroyCommand1F71CBDC: { + Properties: { + Endpoint: { + Fn::GetAtt: [ + DataLakeFirehoseDeliveryStream, + Arn, + ], + }, + Protocol: firehose, + RawMessageDelivery: true, + SubscriptionRoleArn: { + Fn::GetAtt: [ + SnsRoleToPutMessagesInFirehose73474B02, + Arn, + ], + }, + TopicArn: { + Ref: ItemDestroyCommand739F092B, + }, + }, + Type: AWS::SNS::Subscription, + }, EmailSendCommand52EDF753: { Properties: { DisplayName: DefaultEmailSendCommandTopic, @@ -34,6 +131,149 @@ exports[`MessageBusStack should match snapshot test 1`] = ` }, Type: AWS::SNS::Topic, }, + KinesisFirehoseRole0891766E: { + Properties: { + AssumeRolePolicyDocument: { + Statement: [ + { + Action: sts:AssumeRole, + Effect: Allow, + Principal: { + Service: firehose.amazonaws.com, + }, + }, + ], + Version: 2012-10-17, + }, + }, + Type: AWS::IAM::Role, + }, + KinesisFirehoseRoleDefaultPolicy9DF4ED0B: { + Properties: { + PolicyDocument: { + Statement: [ + { + Action: [ + s3:GetObject*, + s3:GetBucket*, + s3:List*, + s3:DeleteObject*, + s3:PutObject, + s3:PutObjectLegalHold, + s3:PutObjectRetention, + s3:PutObjectTagging, + s3:PutObjectVersionTagging, + s3:Abort*, + ], + Effect: Allow, + Resource: [ + { + Fn::GetAtt: [ + defaultdatalake9656A812, + Arn, + ], + }, + { + Fn::Join: [ + , + [ + { + Fn::GetAtt: [ + defaultdatalake9656A812, + Arn, + ], + }, + /*, + ], + ], + }, + ], + }, + ], + Version: 2012-10-17, + }, + PolicyName: KinesisFirehoseRoleDefaultPolicy9DF4ED0B, + Roles: [ + { + Ref: KinesisFirehoseRole0891766E, + }, + ], + }, + Type: AWS::IAM::Policy, + }, + SnsRoleToPutMessagesInFirehose73474B02: { + Properties: { + AssumeRolePolicyDocument: { + Statement: [ + { + Action: sts:AssumeRole, + Effect: Allow, + Principal: { + Service: sns.amazonaws.com, + }, + }, + ], + Version: 2012-10-17, + }, + ManagedPolicyArns: [ + { + Fn::Join: [ + , + [ + arn:, + { + Ref: AWS::Partition, + }, + :iam::aws:policy/service-role/AmazonSNSRole, + ], + ], + }, + ], + }, + Type: AWS::IAM::Role, + }, + SnsRoleToPutMessagesInFirehoseDefaultPolicyECC99DF6: { + Properties: { + PolicyDocument: { + Statement: [ + { + Action: [ + firehose:DescribeDeliveryStream, + firehose:ListDeliveryStreams, + firehose:ListTagsForDeliveryStream, + firehose:PutRecord, + firehose:PutRecordBatch, + ], + Effect: Allow, + Resource: { + Fn::Join: [ + , + [ + arn:aws:firehose:, + { + Ref: AWS::Region, + }, + :, + { + Ref: AWS::AccountId, + }, + :deliverystream/*, + ], + ], + }, + }, + ], + Version: 2012-10-17, + }, + PolicyName: SnsRoleToPutMessagesInFirehoseDefaultPolicyECC99DF6, + Roles: [ + { + Ref: SnsRoleToPutMessagesInFirehose73474B02, + }, + ], + }, + Type: AWS::IAM::Policy, + }, defaultdatalake9656A812: { DeletionPolicy: Retain, Properties: { diff --git a/platform/message-bus/stacks/message-bus.stack.ts b/platform/message-bus/stacks/message-bus.stack.ts index 925d44482..4ab9fe39b 100644 --- a/platform/message-bus/stacks/message-bus.stack.ts +++ b/platform/message-bus/stacks/message-bus.stack.ts @@ -1,17 +1,18 @@ +import { messageRegistry } from '@cats-cradle/messaging-schemas'; import { Construct } from 'constructs'; import { StackProps, RemovalPolicy } from 'aws-cdk-lib'; import * as s3 from 'aws-cdk-lib/aws-s3'; import * as cdk from 'aws-cdk-lib'; import * as sns from 'aws-cdk-lib/aws-sns'; import * as firehose from 'aws-cdk-lib/aws-kinesisfirehose'; -import * as subs from 'aws-cdk-lib/aws-sns-subscriptions'; -import { messageRegistry } from '@cats-cradle/messaging-schemas'; +import * as iam from 'aws-cdk-lib/aws-iam'; export class MessageBusStack extends cdk.Stack { constructor(scope: Construct, id: string, props?: StackProps) { super(scope, id, props); const awsAccountId = cdk.Stack.of(this).account; + const awsAccountRegion = cdk.Stack.of(this).region; const stageName = process.env.STAGE_NAME ?? 'default'; // provision a S3 bucket to contain a record of every message @@ -26,17 +27,64 @@ export class MessageBusStack extends cdk.Stack { }); // Create a Kinesis Data Firehose delivery stream - // const firehoseStream = new firehose.CfnDeliveryStream( - // this, - // 'DataLakeFirehoseDeliveryStream', - // { - // deliveryStreamType: 'DirectPut', - // s3DestinationConfiguration: { - // bucketArn: dataLakeBucket.bucketArn, - // roleArn: 'ARN_OF_FIREHOSE_ROLE', // Replace with the ARN of the role for Firehose - // }, - // }, - // ); + // https://github.com/aws/aws-cdk/issues/14391 + const snsRoleToPutMessagesInFirehose = new iam.Role( + this, + 'SnsRoleToPutMessagesInFirehose', + { + assumedBy: new iam.ServicePrincipal('sns.amazonaws.com'), + managedPolicies: [ + iam.ManagedPolicy.fromAwsManagedPolicyName( + 'service-role/AmazonSNSRole', + ), + ], + }, + ); + snsRoleToPutMessagesInFirehose.addToPolicy( + new iam.PolicyStatement({ + actions: [ + 'firehose:DescribeDeliveryStream', + 'firehose:ListDeliveryStreams', + 'firehose:ListTagsForDeliveryStream', + 'firehose:PutRecord', + 'firehose:PutRecordBatch', + ], + resources: [ + `arn:aws:firehose:${awsAccountRegion}:${awsAccountId}:deliverystream/*`, + ], + }), + ); + + const kinesisFirehoseRole = new iam.Role(this, 'KinesisFirehoseRole', { + assumedBy: new iam.ServicePrincipal('firehose.amazonaws.com'), + }); + + // Attach a policy that allows writing to the S3 bucket + dataLakeBucket.grantReadWrite(kinesisFirehoseRole); + + // outputs file as`${projectFolder}${year}/${month}/${day}/${folder}/${file}` + const firehoseStream = new firehose.CfnDeliveryStream( + this, + 'DataLakeFirehoseDeliveryStream', + { + deliveryStreamName: 'test-firehose-delivery-stream', + deliveryStreamType: 'DirectPut', + s3DestinationConfiguration: { + bucketArn: dataLakeBucket.bucketArn, + roleArn: kinesisFirehoseRole.roleArn, + bufferingHints: { + sizeInMBs: cdk.Size.mebibytes(1).toMebibytes(), + intervalInSeconds: cdk.Duration.seconds(60).toSeconds(), + }, + compressionFormat: 'UNCOMPRESSED', + encryptionConfiguration: { + noEncryptionConfig: 'NoEncryption', + }, + prefix: 'messages/', + errorOutputPrefix: 'messages-logs/', + }, + }, + ); // create a topic for every type of message for (const [className, classReference] of messageRegistry @@ -50,24 +98,17 @@ export class MessageBusStack extends cdk.Stack { fifo: false, }); - // snsTopic.addSubscription({ - // protocol: sns.SubscriptionProtocol.FIREHOSE, - // topic: snsTopic, - // subscriberId: 'FirehoseSubscription', - // endpoint: firehoseStream.attrArn, // Use the ARN of the Firehose stream - // }); + new sns.Subscription( + this, + `DataLakeFirehoseSubscriptionFor${className}`, + { + protocol: sns.SubscriptionProtocol.FIREHOSE, + topic: snsTopic, + rawMessageDelivery: true, + endpoint: firehoseStream.attrArn, + subscriptionRoleArn: snsRoleToPutMessagesInFirehose.roleArn, + }, + ); } - - // const file = `${projectFolder}${year}/${month}/${day}/${folder}/${file}` - - // TODO add firehose to save message to data lake - - // Subscribe SNS topic to the Firehose stream - - // TODO make a construct for turn key pub sub, e.g. - // import { Queue } from 'aws-cdk-lib/aws-sqs'; - // new Queue(this, 'EmailSendCommand', { queueName: EmailSendCommand.topicName }); - // Queues should be deployed with the service they run on - // Make turn-key DLE } }