Skip to content

Commit

Permalink
create INSERT ddb query for step function
Browse files Browse the repository at this point in the history
  • Loading branch information
ebisbe committed Jul 10, 2024
1 parent 98f03fa commit 8e37a21
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 29 deletions.
48 changes: 25 additions & 23 deletions src/resources/streamPipes.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import { ItemStream } from "../stepFunctions/itemStream";
import {
getAttribute,
getJoin,
getRef,
idHelper,
ServerlessExtended,
} from "../types/extendedSlsTypes";
import { WeddingTable } from "./ddb";
Expand Down Expand Up @@ -48,35 +51,34 @@ export const streamPipesResource: ServerlessExtended["resources"] = {
],
},
},
// {
// PolicyName: "TargetPolicy",
// PolicyDocument: {
// Version: "2012-10-17",
// Statement: [
// {
// Effect: "Allow",
// Action: ["events:PutEvents"],
// Resource: getAttribute("EventBus", "Arn"),
// },
// {
// Effect: "Allow",
// Action: [
// "states:StartExecution",
// "states:StartSyncExecution",
// ],
// Resource: [
// ],
// },
// ],
// },
// },
{
PolicyName: "TargetPolicy",
PolicyDocument: {
Version: "2012-10-17",
Statement: [
{
Effect: "Allow",
Action: ["events:PutEvents"],
Resource: getAttribute("EventBus", "Arn"),
},
{
Effect: "Allow",
Action: [
"states:StartExecution",
"states:StartSyncExecution",
],
Resource: [getRef(idHelper(ItemStream))],
},
],
},
},
],
},
},

LogsStreamPipe: {
Type: "AWS::Pipes::Pipe",
//DependsOn: [TBD],
DependsOn: [idHelper(ItemStream)],
Properties: {
Description: "Pipe to connect dynamoDB stream to step function",
RoleArn: getAttribute("PipeRole", "Arn"),
Expand Down
4 changes: 2 additions & 2 deletions src/stepFunctions/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { merge } from "lodash";

import { incStream } from "./itemStream";
import { itemStream } from "./itemStream";

export const stepFunctions = merge(incStream);
export const stepFunctions = merge(itemStream);
115 changes: 111 additions & 4 deletions src/stepFunctions/itemStream.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,115 @@
export const incStream = {
import { WeddingTable } from "../resources/ddb";
import { getRef, idHelper, nameHelper } from "../types/extendedSlsTypes";

export const ItemStream = "ItemStream";

export const itemStream = {
stateMachines: {
incStream: {
name: "incStream",
definition: {},
[ItemStream]: {
id: idHelper(ItemStream),
name: nameHelper(ItemStream),
type: "EXPRESS",
useExactVersion: true,
definition: {
Comment: "Update List totals ( price and total items )",
StartAt: "Check event type",
States: {
"Check event type": {
Type: "Choice",
Choices: [
{
Variable: "$.eventName",
StringEquals: "INSERT",
Next: "Increment totalPrice and totalItems",
},
{
Variable: "$.eventName",
StringEquals: "MODIFY",
Next: "Compute price difference",
},
{
Variable: "$.eventName",
StringEquals: "REMOVE",
Next: "Decrease totalPrice and totalItems",
},
],
OutputPath: "$.dynamodb",
Default: "Fail",
},
"Compute price difference": {
Type: "Pass",
Next: "Update totalPrice",
},
"Increment totalPrice and totalItems": {
Type: "Task",
Resource: "arn:aws:states:::dynamodb:updateItem",
Parameters: {
TableName: getRef(WeddingTable),
Key: {
pk: {
"S.$": "States.Format('LIST#{}',$.NewImage.listId.S)",
},
sk: {
"S.$": "States.Format('LIST#{}',$.NewImage.listId.S)",
},
},
UpdateExpression:
"SET totalPrice = totalPrice + :itemPrice, totalItems = totalItems + :addOne",
ExpressionAttributeValues: {
":itemPrice": {
"N.$": "$.NewImage.price.N",
},
":addOne": {
N: "1",
},
},
},
End: true,
},
Fail: {
Type: "Fail",
Error: "Missing EventType",
},
"Update totalPrice": {
Type: "Task",
Resource: "arn:aws:states:::dynamodb:updateItem",
Parameters: {
TableName: "MyDynamoDBTable",
Key: {
Column: {
S: "MyEntry",
},
},
UpdateExpression: "SET MyKey = :myValueRef",
ExpressionAttributeValues: {
":myValueRef": {
S: "MyValue",
},
},
},
End: true,
},
"Decrease totalPrice and totalItems": {
Type: "Task",
Resource: "arn:aws:states:::dynamodb:updateItem",
Parameters: {
TableName: "MyDynamoDBTable",
Key: {
Column: {
S: "MyEntry",
},
},
UpdateExpression: "SET MyKey = :myValueRef",
ExpressionAttributeValues: {
":myValueRef": {
S: "MyValue",
},
},
},
End: true,
},
},
},
},
},
};
18 changes: 18 additions & 0 deletions src/types/extendedSlsTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,24 @@ type SlsResource =
| Resource<"AWS::IAM::Role", IAMRoleProps>
| Resource<"AWS::Pipes::Pipe", PipesPipeProps>;

/**
* Generate a state machine name with the followin pattern: service-stage-name
* @param name string
* @returns
*/
export function nameHelper(name: string) {
return `\${self:service}-\${self:provider.stage}-${name}`;
}

/**
* Generates a logical id name similar to the original
* @param name string
* @returns
*/
export function idHelper(name: string) {
return `${name}StepFunctionsStateMachine`;
}

/**
* Gets the attribute value.
* @param resource The resource to get the attribute for.
Expand Down

0 comments on commit 8e37a21

Please sign in to comment.