From 8e37a21717657c68c3dc9d771cd779295b25c40f Mon Sep 17 00:00:00 2001 From: Enric Bisbe Gil Date: Wed, 10 Jul 2024 16:57:32 +0200 Subject: [PATCH] create INSERT ddb query for step function --- src/resources/streamPipes.ts | 48 ++++++------- src/stepFunctions/index.ts | 4 +- src/stepFunctions/itemStream.ts | 115 ++++++++++++++++++++++++++++++-- src/types/extendedSlsTypes.ts | 18 +++++ 4 files changed, 156 insertions(+), 29 deletions(-) diff --git a/src/resources/streamPipes.ts b/src/resources/streamPipes.ts index 14ad291..872ce8e 100644 --- a/src/resources/streamPipes.ts +++ b/src/resources/streamPipes.ts @@ -1,6 +1,9 @@ +import { ItemStream } from "../stepFunctions/itemStream"; import { getAttribute, getJoin, + getRef, + idHelper, ServerlessExtended, } from "../types/extendedSlsTypes"; import { WeddingTable } from "./ddb"; @@ -48,35 +51,34 @@ export const streamPipesResource: ServerlessExtended["resources"] = { ], }, }, - // { - // PolicyName: "TargetPolicy", - // PolicyDocument: { - // Version: "2012-10-17", - // Statement: [ - // { - // Effect: "Allow", - // Action: ["events:PutEvents"], - // Resource: getAttribute("EventBus", "Arn"), - // }, - // { - // Effect: "Allow", - // Action: [ - // "states:StartExecution", - // "states:StartSyncExecution", - // ], - // Resource: [ - // ], - // }, - // ], - // }, - // }, + { + PolicyName: "TargetPolicy", + PolicyDocument: { + Version: "2012-10-17", + Statement: [ + { + Effect: "Allow", + Action: ["events:PutEvents"], + Resource: getAttribute("EventBus", "Arn"), + }, + { + Effect: "Allow", + Action: [ + "states:StartExecution", + "states:StartSyncExecution", + ], + Resource: [getRef(idHelper(ItemStream))], + }, + ], + }, + }, ], }, }, LogsStreamPipe: { Type: "AWS::Pipes::Pipe", - //DependsOn: [TBD], + DependsOn: [idHelper(ItemStream)], Properties: { Description: "Pipe to connect dynamoDB stream to step function", RoleArn: getAttribute("PipeRole", "Arn"), diff --git a/src/stepFunctions/index.ts b/src/stepFunctions/index.ts index 99a9c32..0dbe8b0 100644 --- a/src/stepFunctions/index.ts +++ b/src/stepFunctions/index.ts @@ -1,5 +1,5 @@ import { merge } from "lodash"; -import { incStream } from "./itemStream"; +import { itemStream } from "./itemStream"; -export const stepFunctions = merge(incStream); +export const stepFunctions = merge(itemStream); diff --git a/src/stepFunctions/itemStream.ts b/src/stepFunctions/itemStream.ts index 2fb65c6..0010b3a 100644 --- a/src/stepFunctions/itemStream.ts +++ b/src/stepFunctions/itemStream.ts @@ -1,8 +1,115 @@ -export const incStream = { +import { WeddingTable } from "../resources/ddb"; +import { getRef, idHelper, nameHelper } from "../types/extendedSlsTypes"; + +export const ItemStream = "ItemStream"; + +export const itemStream = { stateMachines: { - incStream: { - name: "incStream", - definition: {}, + [ItemStream]: { + id: idHelper(ItemStream), + name: nameHelper(ItemStream), + type: "EXPRESS", + useExactVersion: true, + definition: { + Comment: "Update List totals ( price and total items )", + StartAt: "Check event type", + States: { + "Check event type": { + Type: "Choice", + Choices: [ + { + Variable: "$.eventName", + StringEquals: "INSERT", + Next: "Increment totalPrice and totalItems", + }, + { + Variable: "$.eventName", + StringEquals: "MODIFY", + Next: "Compute price difference", + }, + { + Variable: "$.eventName", + StringEquals: "REMOVE", + Next: "Decrease totalPrice and totalItems", + }, + ], + OutputPath: "$.dynamodb", + Default: "Fail", + }, + "Compute price difference": { + Type: "Pass", + Next: "Update totalPrice", + }, + "Increment totalPrice and totalItems": { + Type: "Task", + Resource: "arn:aws:states:::dynamodb:updateItem", + Parameters: { + TableName: getRef(WeddingTable), + Key: { + pk: { + "S.$": "States.Format('LIST#{}',$.NewImage.listId.S)", + }, + sk: { + "S.$": "States.Format('LIST#{}',$.NewImage.listId.S)", + }, + }, + UpdateExpression: + "SET totalPrice = totalPrice + :itemPrice, totalItems = totalItems + :addOne", + ExpressionAttributeValues: { + ":itemPrice": { + "N.$": "$.NewImage.price.N", + }, + ":addOne": { + N: "1", + }, + }, + }, + End: true, + }, + Fail: { + Type: "Fail", + Error: "Missing EventType", + }, + "Update totalPrice": { + Type: "Task", + Resource: "arn:aws:states:::dynamodb:updateItem", + Parameters: { + TableName: "MyDynamoDBTable", + Key: { + Column: { + S: "MyEntry", + }, + }, + UpdateExpression: "SET MyKey = :myValueRef", + ExpressionAttributeValues: { + ":myValueRef": { + S: "MyValue", + }, + }, + }, + End: true, + }, + "Decrease totalPrice and totalItems": { + Type: "Task", + Resource: "arn:aws:states:::dynamodb:updateItem", + Parameters: { + TableName: "MyDynamoDBTable", + Key: { + Column: { + S: "MyEntry", + }, + }, + UpdateExpression: "SET MyKey = :myValueRef", + ExpressionAttributeValues: { + ":myValueRef": { + S: "MyValue", + }, + }, + }, + End: true, + }, + }, + }, }, }, }; diff --git a/src/types/extendedSlsTypes.ts b/src/types/extendedSlsTypes.ts index 7e300ac..91a4eb7 100644 --- a/src/types/extendedSlsTypes.ts +++ b/src/types/extendedSlsTypes.ts @@ -21,6 +21,24 @@ type SlsResource = | Resource<"AWS::IAM::Role", IAMRoleProps> | Resource<"AWS::Pipes::Pipe", PipesPipeProps>; +/** + * Generate a state machine name with the followin pattern: service-stage-name + * @param name string + * @returns + */ +export function nameHelper(name: string) { + return `\${self:service}-\${self:provider.stage}-${name}`; +} + +/** + * Generates a logical id name similar to the original + * @param name string + * @returns + */ +export function idHelper(name: string) { + return `${name}StepFunctionsStateMachine`; +} + /** * Gets the attribute value. * @param resource The resource to get the attribute for.