diff --git a/deployment/cdk/opensearch-service-migration/lib/migration-assistance-stack.ts b/deployment/cdk/opensearch-service-migration/lib/migration-assistance-stack.ts index eccb2392c..b9503bd52 100644 --- a/deployment/cdk/opensearch-service-migration/lib/migration-assistance-stack.ts +++ b/deployment/cdk/opensearch-service-migration/lib/migration-assistance-stack.ts @@ -7,6 +7,7 @@ import {Cluster} from "aws-cdk-lib/aws-ecs"; import {StackPropsExt} from "./stack-composer"; import {LogGroup, RetentionDays} from "aws-cdk-lib/aws-logs"; import {NamespaceType} from "aws-cdk-lib/aws-servicediscovery"; +import {StringParameter} from "aws-cdk-lib/aws-ssm"; export interface migrationStackProps extends StackPropsExt { readonly vpc: IVpc, @@ -99,6 +100,11 @@ export class MigrationAssistanceStack extends Stack { } }); this.mskARN = mskCluster.attrArn + new StringParameter(this, 'SSMParameterMSKARN', { + description: 'OpenSearch Migration Parameter for MSK ARN', + parameterName: `/migration/${props.stage}/msk/cluster/arn`, + stringValue: mskCluster.attrArn + }); const comparatorSQLiteSG = new SecurityGroup(this, 'comparatorSQLiteSG', { vpc: props.vpc, @@ -132,6 +138,7 @@ export class MigrationAssistanceStack extends Stack { allowAllOutbound: true, }) this.serviceConnectSecurityGroup.addIngressRule(replayerOutputSG, Port.allTraffic()); + this.ecsCluster = new Cluster(this, 'migrationECSCluster', { vpc: props.vpc, diff --git a/deployment/cdk/opensearch-service-migration/lib/msk-utility-stack.ts b/deployment/cdk/opensearch-service-migration/lib/msk-utility-stack.ts index 0bfef970f..d7c7a53b4 100644 --- a/deployment/cdk/opensearch-service-migration/lib/msk-utility-stack.ts +++ b/deployment/cdk/opensearch-service-migration/lib/msk-utility-stack.ts @@ -7,6 +7,7 @@ import {NodejsFunction} from "aws-cdk-lib/aws-lambda-nodejs"; import {Runtime} from "aws-cdk-lib/aws-lambda"; import {AwsCustomResource, AwsCustomResourcePolicy, PhysicalResourceId, Provider} from "aws-cdk-lib/custom-resources"; import * as path from "path"; +import {StringParameter} from "aws-cdk-lib/aws-ssm"; export interface mskUtilityStackProps extends StackPropsExt { readonly vpc: IVpc, @@ -94,15 +95,20 @@ export class MSKUtilityStack extends Stack { handle: wcHandle.ref }) waitCondition.node.addDependency(customResource); - // TODO handle public case + // TODO Need to add setting SSM parameter for public endpoint case brokerEndpointsOutput = waitCondition.attrData.toString() } else { const mskGetBrokersCustomResource = getBrokersCustomResource(this, props.vpc, props.mskARN) const brokerEndpoints = mskGetBrokersCustomResource.getResponseField("BootstrapBrokerStringSaslIam") + //const brokerEndpoints = mskGetBrokersCustomResource.getResponseField("BootstrapBrokerStringPublicSaslIam") brokerEndpointsOutput = `export MIGRATION_KAFKA_BROKER_ENDPOINTS=${brokerEndpoints}` - //brokerEndpointsOutput = mskGetBrokersCustomResource.getResponseField("BootstrapBrokerStringPublicSaslIam") this.mskBrokerEndpoints = brokerEndpoints + new StringParameter(this, 'SSMParameterMSKBrokers', { + description: 'OpenSearch Migration Parameter for MSK Brokers', + parameterName: `/migration/${props.stage}/msk/cluster/brokers`, + stringValue: brokerEndpoints + }); } const cfnOutput = new CfnOutput(this, 'CopilotBrokerEndpointsExport', { diff --git a/deployment/cdk/opensearch-service-migration/lib/service-stacks/capture-proxy-es-stack.ts b/deployment/cdk/opensearch-service-migration/lib/service-stacks/capture-proxy-es-stack.ts index 3ac7032b9..d90288cdf 100644 --- a/deployment/cdk/opensearch-service-migration/lib/service-stacks/capture-proxy-es-stack.ts +++ b/deployment/cdk/opensearch-service-migration/lib/service-stacks/capture-proxy-es-stack.ts @@ -6,13 +6,12 @@ import {join} from "path"; import {MigrationServiceCore} from "./migration-service-core"; import {Effect, PolicyStatement} from "aws-cdk-lib/aws-iam"; import {ServiceConnectService} from "aws-cdk-lib/aws-ecs/lib/base/base-service"; +import {StringParameter} from "aws-cdk-lib/aws-ssm"; export interface CaptureProxyESProps extends StackPropsExt { readonly vpc: IVpc, readonly ecsCluster: Cluster, - readonly mskBrokerEndpoints: string, - readonly mskClusterArn: string, readonly serviceConnectSecurityGroup: ISecurityGroup readonly additionalServiceSecurityGroups?: ISecurityGroup[] } @@ -20,34 +19,37 @@ export interface CaptureProxyESProps extends StackPropsExt { export class CaptureProxyESStack extends MigrationServiceCore { constructor(scope: Construct, id: string, props: CaptureProxyESProps) { + super(scope, id, props) let securityGroups = [props.serviceConnectSecurityGroup] if (props.additionalServiceSecurityGroups) { securityGroups = securityGroups.concat(props.additionalServiceSecurityGroups) } const servicePort: PortMapping = { - name: "service-port-mapping", + name: "capture-proxy-es-connect", hostPort: 9200, containerPort: 9200, protocol: Protocol.TCP } const serviceConnectService: ServiceConnectService = { - portMappingName: "service-port-mapping", - //dnsName: "capture-proxy-es", + portMappingName: "capture-proxy-es-connect", + dnsName: "capture-proxy-es", port: 9200 } + const mskClusterARN = StringParameter.valueForStringParameter(this, `/migration/${props.stage}/msk/cluster/arn`); const mskClusterConnectPolicy = new PolicyStatement({ effect: Effect.ALLOW, - resources: [props.mskClusterArn], + resources: [mskClusterARN], actions: [ "kafka-cluster:Connect" ] }) - let mskClusterAllTopicArn = props.mskClusterArn.replace(":cluster", ":topic").concat("/*") + // Ideally we should have something like this, but this is actually working on a token value: + // let mskClusterAllTopicArn = mskClusterARN.replace(":cluster", ":topic").concat("/*") const mskTopicProducerPolicy = new PolicyStatement({ effect: Effect.ALLOW, - resources: [mskClusterAllTopicArn], + resources: ["*"], actions: [ "kafka-cluster:CreateTopic", "kafka-cluster:DescribeTopic", @@ -55,10 +57,11 @@ export class CaptureProxyESStack extends MigrationServiceCore { ] }) - super(scope, id, { + const brokerEndpoints = StringParameter.valueForStringParameter(this, `/migration/${props.stage}/msk/cluster/brokers`); + this.createService({ serviceName: "capture-proxy-es", dockerFilePath: join(__dirname, "../../../../../", "TrafficCapture/dockerSolution/build/docker/trafficCaptureProxyServer"), - dockerImageCommand: [`/bin/sh -c '/usr/local/bin/docker-entrypoint.sh eswrapper & /runJavaWithClasspath.sh org.opensearch.migrations.trafficcapture.proxyserver.Main --kafkaConnection ${props.mskBrokerEndpoints} --enableMSKAuth --destinationUri https://localhost:19200 --insecureDestination --listenPort 9200 --sslConfigFile /usr/share/elasticsearch/config/proxy_tls.yml & wait -n 1'`], + dockerImageCommand: ['/bin/sh', '-c', `/usr/local/bin/docker-entrypoint.sh eswrapper & /runJavaWithClasspath.sh org.opensearch.migrations.trafficcapture.proxyserver.Main --kafkaConnection ${brokerEndpoints} --enableMSKAuth --destinationUri https://localhost:19200 --insecureDestination --listenPort 9200 --sslConfigFile /usr/share/elasticsearch/config/proxy_tls.yml & wait -n 1`], securityGroups: securityGroups, taskRolePolicies: [mskClusterConnectPolicy, mskTopicProducerPolicy], portMappings: [servicePort], diff --git a/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-console-stack.ts b/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-console-stack.ts index 8e868416f..eb6af12c2 100644 --- a/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-console-stack.ts +++ b/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-console-stack.ts @@ -22,6 +22,7 @@ export interface MigrationConsoleProps extends StackPropsExt { export class MigrationConsoleStack extends MigrationServiceCore { constructor(scope: Construct, id: string, props: MigrationConsoleProps) { + super(scope, id, props) let securityGroups = [props.serviceConnectSecurityGroup] if (props.additionalServiceSecurityGroups) { securityGroups = securityGroups.concat(props.additionalServiceSecurityGroups) @@ -31,8 +32,9 @@ export class MigrationConsoleStack extends MigrationServiceCore { const replayerOutputEFSVolume: Volume = { name: volumeName, efsVolumeConfiguration: { - fileSystemId: props.replayerOutputFileSystemId - }, + fileSystemId: props.replayerOutputFileSystemId, + transitEncryption: "ENABLED" + } }; const replayerOutputMountPoint: MountPoint = { @@ -41,7 +43,7 @@ export class MigrationConsoleStack extends MigrationServiceCore { sourceVolume: volumeName } - super(scope, id, { + this.createService({ serviceName: "migration-console", dockerFilePath: join(__dirname, "../../../../../", "TrafficCapture/dockerSolution/src/main/docker/migrationConsole"), securityGroups: securityGroups, diff --git a/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-service-core.ts b/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-service-core.ts index b7f5e81d0..391ff22ad 100644 --- a/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-service-core.ts +++ b/deployment/cdk/opensearch-service-migration/lib/service-stacks/migration-service-core.ts @@ -40,9 +40,11 @@ export interface MigrationServiceCoreProps extends StackPropsExt { export class MigrationServiceCore extends Stack { - constructor(scope: Construct, id: string, props: MigrationServiceCoreProps) { + constructor(scope: Construct, id: string, props: StackPropsExt) { super(scope, id, props); + } + createService(props: MigrationServiceCoreProps) { const serviceTaskRole = new Role(this, 'ServiceTaskRole', { assumedBy: new ServicePrincipal('ecs-tasks.amazonaws.com'), description: 'ECS Service Task Role' @@ -65,6 +67,7 @@ export class MigrationServiceCore extends Stack { props.taskRolePolicies?.forEach(policy => serviceTaskRole.addToPolicy(policy)) const serviceTaskDef = new FargateTaskDefinition(this, "ServiceTaskDef", { + family: `migration-${props.stage}-${props.serviceName}`, memoryLimitMiB: props.taskMemoryLimitMiB ? props.taskMemoryLimitMiB : 1024, cpu: props.taskCpuUnits ? props.taskCpuUnits : 256, taskRole: serviceTaskRole @@ -80,7 +83,7 @@ export class MigrationServiceCore extends Stack { const serviceLogGroup = new LogGroup(this, 'ServiceLogGroup', { retention: RetentionDays.ONE_MONTH, removalPolicy: RemovalPolicy.DESTROY, - logGroupName: `/migration/${props.stage}-${props.serviceName}` + logGroupName: `/migration/${props.stage}/${props.serviceName}` }); const serviceContainer = serviceTaskDef.addContainer("ServiceContainer", { diff --git a/deployment/cdk/opensearch-service-migration/lib/stack-composer.ts b/deployment/cdk/opensearch-service-migration/lib/stack-composer.ts index cb7590e41..43573cb4b 100644 --- a/deployment/cdk/opensearch-service-migration/lib/stack-composer.ts +++ b/deployment/cdk/opensearch-service-migration/lib/stack-composer.ts @@ -62,6 +62,8 @@ export class StackComposer { const mskARN = getContextForType('mskARN', 'string') const mskEnablePublicEndpoints = getContextForType('mskEnablePublicEndpoints', 'boolean') const mskBrokerNodeCount = getContextForType('mskBrokerNodeCount', 'number') + const captureProxyESEnabled = getContextForType('captureProxyESEnabled', 'boolean') + const migrationConsoleEnabled = getContextForType('migrationConsoleEnabled', 'boolean') const sourceClusterEndpoint = getContextForType('sourceClusterEndpoint', 'string') const historicalCaptureEnabled = getContextForType('historicalCaptureEnabled', 'boolean') const logstashConfigFilePath = getContextForType('logstashConfigFilePath', 'string') @@ -167,8 +169,10 @@ export class StackComposer { this.stacks.push(opensearchStack) // Currently, placing a requirement on a VPC for a migration stack but this can be revisited + let migrationStack + let mskUtilityStack if (migrationAssistanceEnabled && networkStack) { - const migrationStack = new MigrationAssistanceStack(scope, "migrationAssistanceStack", { + migrationStack = new MigrationAssistanceStack(scope, "migrationAssistanceStack", { vpc: networkStack.vpc, mskImportARN: mskARN, mskEnablePublicEndpoints: mskEnablePublicEndpoints, @@ -179,7 +183,7 @@ export class StackComposer { }) this.stacks.push(migrationStack) - const mskUtilityStack = new MSKUtilityStack(scope, 'mskUtilityStack', { + mskUtilityStack = new MSKUtilityStack(scope, 'mskUtilityStack', { vpc: networkStack.vpc, mskARN: migrationStack.mskARN, mskEnablePublicEndpoints: mskEnablePublicEndpoints, @@ -189,33 +193,47 @@ export class StackComposer { }) mskUtilityStack.addDependency(migrationStack) this.stacks.push(mskUtilityStack) + } - const migrationConsoleStack = new MigrationConsoleStack(scope, "migrationConsole", { + let captureProxyESStack + if (captureProxyESEnabled && networkStack && migrationStack && mskUtilityStack) { + captureProxyESStack = new CaptureProxyESStack(scope, "capture-proxy-es", { vpc: networkStack.vpc, ecsCluster: migrationStack.ecsCluster, - replayerOutputFileSystemId: migrationStack.replayerOutputFileSystemId, - migrationDomainEndpoint: opensearchStack.domainEndpoint, serviceConnectSecurityGroup: migrationStack.serviceConnectSecurityGroup, - additionalServiceSecurityGroups: [networkStack.defaultDomainAccessSecurityGroup, migrationStack.replayerOutputAccessSecurityGroup], - stackName: `OSMigrations-${stage}-${region}-MigrationConsole`, - description: "This stack contains resources for the Migration Console ECS service", + additionalServiceSecurityGroups: [migrationStack.mskAccessSecurityGroup], + stackName: `OSMigrations-${stage}-${region}-CaptureProxyES`, + description: "This stack contains resources for the Capture Proxy/Elasticsearch ECS service", ...props, }) - this.stacks.push(migrationConsoleStack) + captureProxyESStack.addDependency(mskUtilityStack) + captureProxyESStack.addDependency(migrationStack) + captureProxyESStack.addDependency(networkStack) + this.stacks.push(captureProxyESStack) + } - const captureProxyESStack = new CaptureProxyESStack(scope, "captureProxyES", { + let migrationConsoleStack + if (migrationConsoleEnabled && networkStack && opensearchStack && migrationStack) { + migrationConsoleStack = new MigrationConsoleStack(scope, "migration-console", { vpc: networkStack.vpc, ecsCluster: migrationStack.ecsCluster, - mskBrokerEndpoints: mskUtilityStack.mskBrokerEndpoints, - mskClusterArn: migrationStack.mskARN, + replayerOutputFileSystemId: migrationStack.replayerOutputFileSystemId, + migrationDomainEndpoint: opensearchStack.domainEndpoint, serviceConnectSecurityGroup: migrationStack.serviceConnectSecurityGroup, - additionalServiceSecurityGroups: [migrationStack.mskAccessSecurityGroup], - stackName: `OSMigrations-${stage}-${region}-CaptureProxyES`, - description: "This stack contains resources for the Capture Proxy/Elasticsearch ECS service", + additionalServiceSecurityGroups: [networkStack.defaultDomainAccessSecurityGroup, migrationStack.replayerOutputAccessSecurityGroup], + stackName: `OSMigrations-${stage}-${region}-MigrationConsole`, + description: "This stack contains resources for the Migration Console ECS service", ...props, }) - this.stacks.push(captureProxyESStack) - + // To enable the Migration Console to make requests to the Capture Proxy with Service Connect, + // it should be deployed after the Capture Proxy + if (captureProxyESStack) { + migrationConsoleStack.addDependency(captureProxyESStack) + } + migrationConsoleStack.addDependency(migrationStack) + migrationConsoleStack.addDependency(opensearchStack) + migrationConsoleStack.addDependency(networkStack) + this.stacks.push(migrationConsoleStack) } // Currently, placing a requirement on a VPC for a historical capture stack but this can be revisited diff --git a/deployment/copilot/devDeploy.sh b/deployment/copilot/devDeploy.sh index 078605753..a75454afd 100755 --- a/deployment/copilot/devDeploy.sh +++ b/deployment/copilot/devDeploy.sh @@ -136,7 +136,7 @@ fi # This command deploys the required infrastructure for the migration solution with CDK that Copilot requires. # The options provided to `cdk deploy` here will cause a VPC, Opensearch Domain, and MSK(Kafka) resources to get created in AWS (among other resources) # More details on the CDK used here can be found at opensearch-migrations/deployment/cdk/opensearch-service-migration/README.md -cdk deploy "*" --tags $TAGS --c domainName="aos-domain" --c engineVersion="OS_2.7" --c dataNodeCount=2 --c vpcEnabled=true --c availabilityZoneCount=2 --c openAccessPolicyEnabled=true --c domainRemovalPolicy="DESTROY" --c migrationAssistanceEnabled=true --c enableDemoAdmin=true -O cdk.out/cdkOutput.json --require-approval never --concurrency 3 +cdk deploy "capture-proxy-es" --tags $TAGS --c migrationConsoleEnabled=true --c captureProxyESEnabled=true --c domainName="aos-domain" --c engineVersion="OS_2.7" --c dataNodeCount=2 --c vpcEnabled=true --c availabilityZoneCount=2 --c openAccessPolicyEnabled=true --c domainRemovalPolicy="DESTROY" --c migrationAssistanceEnabled=true --c enableDemoAdmin=true -O cdk.out/cdkOutput.json --require-approval never --concurrency 3 exit 1 diff --git a/deployment/copilot/ecsExec.sh b/deployment/copilot/ecsExec.sh new file mode 100755 index 000000000..218c7760f --- /dev/null +++ b/deployment/copilot/ecsExec.sh @@ -0,0 +1,13 @@ +#!/bin/bash + +# Example usage: ./ecsExec.sh migration-console dev us-east-1 + +service_name=$1 +stage=$2 +region=$3 + +export AWS_DEFAULT_REGION=$region + +task_arn=$(aws ecs list-tasks --cluster migration-${stage}-ecs-cluster --family "migration-${stage}-${service_name}" | jq --raw-output '.taskArns[0]') + +aws ecs execute-command --cluster "migration-${stage}-ecs-cluster" --task "${task_arn}" --container "${service_name}" --interactive --command "/bin/bash" \ No newline at end of file