From 6a15e93619b79102c9ce35a9b3a19ebf1ce116a9 Mon Sep 17 00:00:00 2001 From: karthitect Date: Sun, 24 May 2020 17:14:45 -0500 Subject: [PATCH] Updates for target tracking autoscaling --- .gitignore | 5 + README.md | 3 +- step-scaling/README.md | 2 +- targettracking-scaling/README.md | 23 ++- targettracking-scaling/index.py | 149 ------------------ .../targettracking-scaling.yaml | 22 +-- targettracking-scaling/zipanddeploy.sh | 12 ++ 7 files changed, 52 insertions(+), 164 deletions(-) create mode 100644 targettracking-scaling/zipanddeploy.sh diff --git a/.gitignore b/.gitignore index a3502c7..a69bebe 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,11 @@ # Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio and WebStorm # Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 +step-scaling/**/index.zip +step-scaling/**/metrics.txt +targettracking-scaling/**/index.zip +targettracking-scaling/**/metrics.txt + # User-specific stuff .idea/**/workspace.xml .idea/**/tasks.xml diff --git a/README.md b/README.md index 633617c..b91d275 100644 --- a/README.md +++ b/README.md @@ -9,8 +9,7 @@ We've included guidance for both [step scaling](https://github.com/karthitect/kd - [Target tracking scaling](https://docs.aws.amazon.com/autoscaling/application/userguide/application-auto-scaling-target-tracking.html) ## Why use Application Autoscaling? -You may be wondering: "Why use Application Autoscaling; why not just trigger a Lambda function via a CloudWatch alarm and SNS?". The main reason is that Application Autoscaling has a well defined API for specifying scaling policies and associated attributes such as cooldown periods. In addition, we can take advantage of the various types of scaling types included with Application Autoscaling: step scaling, target tracking scaling, and schedule-based scaling (not covered in this doc). 
- +You may be wondering: "Why use Application Autoscaling; why not just trigger a Lambda function via a CloudWatch alarm and SNS?". The main reason is that Application Autoscaling has a well-defined API for specifying scaling policies and associated attributes such as cooldown periods. In addition, we can take advantage of all three scaling types included with Application Autoscaling: step scaling, target tracking scaling, and schedule-based scaling (not covered in this doc). ## Step scaling The step scaling sample uses the incomingRecords metric for the source Kinesis stream to proportionately configure the parallelism of the associated KDA application. The following subsections describe the key components behind the scaling approach diff --git a/step-scaling/README.md b/step-scaling/README.md index e51d6ca..fc038d8 100644 --- a/step-scaling/README.md +++ b/step-scaling/README.md @@ -1,5 +1,5 @@ ## Step scaling -The step scaling sample uses the incomingRecords metric for the source Kinesis stream to proportionately configure the parallelism of the associated KDA application. The following subsections describe the key components behind the scaling approach +The step scaling sample uses the incomingRecords metric for the Kinesis data stream source to proportionately configure the parallelism of the associated KDA application. The following subsections describe the key components behind the scaling approach ### Application autoscaling of custom resource diff --git a/targettracking-scaling/README.md b/targettracking-scaling/README.md index 0faeb08..e5fe651 100644 --- a/targettracking-scaling/README.md +++ b/targettracking-scaling/README.md @@ -1 +1,22 @@ -# [In Progress] \ No newline at end of file +## Target-tracking scaling +The target tracking scaling sample uses the millisBehindLatest metric for the Kinesis data stream source as the signal to adjust the parallelism of the KDA Flink app. 
The algorithm is fairly simple: when millisBehindLatest exceeds 1 ms, trigger scale-out. Target tracking autoscaling then scales back in once millisBehindLatest for the KDA Flink consumer drops below 1 ms. + +### Application autoscaling of custom resource + +Application autoscaling allows users to scale in/out custom resources by specifying a custom endpoint that can be invoked by Application Autoscaling. In this example, this custom endpoint is implemented using API Gateway and an AWS Lambda function. Here's a high-level flow depicting this approach: + +CW Alarm => Application Autoscaling => Custom Endpoint (API GW + Lambda) => Scale KDA App + +### CloudFormation template +The accompanying [CloudFormation template](https://github.com/karthitect/kda-flink-autoscaling/blob/master/step-scaling/step-scaling.yaml) takes care of provisioning all of the above components. + +### Scaling logic +When invoked by Application Autoscaling, the Lambda function (written in Python) will call [UpdateApplication](https://docs.aws.amazon.com/kinesisanalytics/latest/apiv2/API_UpdateApplication.html) with the desired capacity specified. Unlike the step scaling sample, the Lambda function does not re-configure CloudWatch alarm thresholds itself; with target tracking, Application Autoscaling manages the alarm on our behalf. + +You can review the Python code for the Lambda function [here](https://github.com/karthitect/kda-flink-autoscaling/blob/master/step-scaling/index.py). + +### Some caveats + +1. When scaling out/in, in this sample we only update the overall parallelism; we don't adjust parallelism/KPU. +2. When scaling occurs, the KDA app experiences downtime. Please take this into consideration when implementing target tracking autoscaling against KDA Flink. +3. Please keep in mind that the throughput of a Flink application is dependent on many factors (complexity of processing, destination throughput, etc.). This example assumes a simple relationship between the millisBehindLatest value and scaling for demonstration purposes. 
\ No newline at end of file diff --git a/targettracking-scaling/index.py b/targettracking-scaling/index.py index 89bb027..1a25a97 100644 --- a/targettracking-scaling/index.py +++ b/targettracking-scaling/index.py @@ -54,8 +54,6 @@ def update_parallelism(context, desiredCapacity, resourceName, appVersionId): print("In update_parallelism; response: ") print(response) scalingStatus = "InProgress" - - put_alarms(context, desiredCapacity) # In case of error of updating the sharding, raise an exception. except Exception as e: @@ -67,153 +65,6 @@ def update_parallelism(context, desiredCapacity, resourceName, appVersionId): return scalingStatus -def put_alarms(context, curr_capacity): - try: - - scaling_policies = get_scaling_policies(context) - if not scaling_policies: - print("ERROR: Unable to get scaling_policies") - return - - if curr_capacity > 1: - # scale in alarm (for curr_capacity > 1) - scalein_policy_arn = scaling_policies['KDAScaleIn'] - print("scalein_policy_arn: " + scalein_policy_arn) - if not scalein_policy_arn: - print("ERROR - scalein_policy_arn is None in put_alarms") - return - - client_cloudwatch.put_metric_alarm( - AlarmName=get_cloudwatch_alarm_in(), - AlarmDescription='KDA scale in alarm', - Metrics=[ - { - "Expression": "ir/60", - "Id": "ad1", - "ReturnData": False - }, - { - "Id": "ir", - "MetricStat": { - "Metric": { - "Dimensions": [ - { - "Value": get_kinesis_stream(), - "Name": "StreamName" - } - ], - "MetricName": "IncomingRecords", - "Namespace": "AWS/Kinesis" - }, - "Period": 60, - "Stat": "Sum" - }, - "ReturnData": True - } - ], - Threshold=(curr_capacity-1)*get_max_throughput_per_kpu(), - ComparisonOperator='LessThanOrEqualToThreshold', - AlarmActions=[ - scalein_policy_arn - ], - EvaluationPeriods=2) - elif curr_capacity <= 1: - # scale in alarm (for capacity <= 1) - client_cloudwatch.put_metric_alarm( - AlarmName=get_cloudwatch_alarm_in(), - AlarmDescription='KDA scale in alarm', - Metrics=[ - { - "Expression": "ir/60", - "Id": "ad1", - 
"ReturnData": False - }, - { - "Id": "ir", - "MetricStat": { - "Metric": { - "Dimensions": [ - { - "Value": get_kinesis_stream(), - "Name": "StreamName" - } - ], - "MetricName": "IncomingRecords", - "Namespace": "AWS/Kinesis" - }, - "Period": 60, - "Stat": "Sum" - }, - "ReturnData": True - } - ], - Threshold=0, - ComparisonOperator='LessThanOrEqualToThreshold', - EvaluationPeriods=2) - - if curr_capacity < get_max_kpus(): - # scale out alarm - scaleout_policy_arn = scaling_policies['KDAScaleOut'] - print("scaleout_policy_arn: " + scaleout_policy_arn) - if not scaleout_policy_arn: - print("ERROR - scaleout_policy_arn is None in put_alarms") - return - - client_cloudwatch.put_metric_alarm( - AlarmName=get_cloudwatch_alarm_out(), - AlarmDescription='KDA scale out alarm', - Metrics=[ - { - "Expression": "ir/60", - "Id": "ad1", - "ReturnData": False - }, - { - "Id": "ir", - "MetricStat": { - "Metric": { - "Dimensions": [ - { - "Value": get_kinesis_stream(), - "Name": "StreamName" - } - ], - "MetricName": "IncomingRecords", - "Namespace": "AWS/Kinesis" - }, - "Period": 60, - "Stat": "Sum" - }, - "ReturnData": True - } - ], - Threshold=curr_capacity*get_max_throughput_per_kpu(), - ComparisonOperator='GreaterThanOrEqualToThreshold', - AlarmActions=[ - scaleout_policy_arn - ], - EvaluationPeriods=1) - except Exception as e: - print("ERROR - Exception in put_alarms") - print(e) - - -def get_cloudwatch_alarm_in(): - try: - return os.environ['CloudWatchAlarmNameIn'] - except Exception as e: - print ("ERROR - Exception in get_cloudwatch_alarm_in") - print(e) - - -def get_cloudwatch_alarm_out(): - try: - return os.environ['CloudWatchAlarmNameOut'] - except Exception as e: - print ("ERROR - Exception in get_cloudwatch_alarm_out") - print(e) - - def get_kinesis_stream(): try: return os.environ['KinesisStreamName'] diff --git a/targettracking-scaling/targettracking-scaling.yaml b/targettracking-scaling/targettracking-scaling.yaml index 3ac804b..82ae487 100644 --- 
a/targettracking-scaling/targettracking-scaling.yaml +++ b/targettracking-scaling/targettracking-scaling.yaml @@ -66,7 +66,7 @@ Resources: Timeout: 10 CodeUri: Bucket: !Ref S3BucketLambda - Key: kdascalinglambda/index.zip + Key: kdattscalinglambda/index.zip Policies: - AWSCloudFormationReadOnlyAccess - Statement: @@ -101,9 +101,6 @@ Resources: ParameterStore: !Ref KDADesiredCapacityParameter KDAAppName: !Ref KDAAppName KinesisStreamName: !Ref KinesisStreamName - CloudWatchAlarmNameIn: !Ref CloudWatchAlarmNameIn - CloudWatchAlarmNameOut: !Ref CloudWatchAlarmNameOut - MaxThroughputPerKPU: !Ref ThroughputPerKPU MaxKPUs: !Ref MaxKPUs Events: KinesisAPI: @@ -175,7 +172,7 @@ Resources: Type: "String" Value: !Ref KDAParallelism Description: "Store DesiredCapacity in Parameter Store" - AllowedPattern: "[0-9]" + AllowedPattern: "[0-9]+" KDAScalableTarget: Type: AWS::ApplicationAutoScaling::ScalableTarget @@ -206,17 +203,20 @@ Resources: ScalableDimension: "custom-resource:ResourceType:Property" ServiceNamespace: custom-resource TargetTrackingScalingPolicyConfiguration: - TargetValue: 0.0 + TargetValue: 1 ScaleInCooldown: 60 ScaleOutCooldown: 60 CustomizedMetricSpecification: - MetricName: MyAverageUtilizationMetric - Namespace: MetricNamespace + MetricName: millisBehindLatest + Namespace: AWS/KinesisAnalytics Dimensions: - Name: MyMetricDimensionName - Value: MyMetricDimensionValue + - Name: Id + Value: KDAScalingTestStream + - Name: Application + Value: kdascaling + - Name: Flow + Value: Input Statistic: Average - Unit: Percent Outputs: URL: diff --git a/targettracking-scaling/zipanddeploy.sh b/targettracking-scaling/zipanddeploy.sh new file mode 100644 index 0000000..036c335 --- /dev/null +++ b/targettracking-scaling/zipanddeploy.sh @@ -0,0 +1,12 @@ +#!/bin/bash + +S3_LOCATION=s3://ktohio +FILE_NAME=kdattscalinglambda/index.zip + +echo Zipping... +zip index.zip index.py + +echo Copying to $S3_LOCATION/$FILE_NAME +aws s3 cp index.zip $S3_LOCATION/$FILE_NAME + +echo Done
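The scaling call this patch relies on (the Lambda handler invoking UpdateApplication with the desired capacity, bounded by the template's MaxKPUs parameter) can be sketched as below. This is an illustrative sketch, not the repo's index.py: the names `clamp_capacity` and `scale_kda_app` and the min/max defaults are assumptions; only DescribeApplication and UpdateApplication are real Kinesis Data Analytics v2 APIs.

```python
def clamp_capacity(desired, min_kpus=1, max_kpus=8):
    """Keep the requested parallelism inside the configured KPU bounds
    (the MaxKPUs CloudFormation parameter plays the max_kpus role here)."""
    return max(min_kpus, min(int(desired), max_kpus))


def scale_kda_app(app_name, desired_capacity):
    """Apply the clamped parallelism to the KDA Flink application."""
    import boto3  # imported lazily so clamp_capacity stays testable offline

    kda = boto3.client("kinesisanalyticsv2")
    # UpdateApplication requires the current application version id.
    version_id = kda.describe_application(ApplicationName=app_name)[
        "ApplicationDetail"]["ApplicationVersionId"]
    kda.update_application(
        ApplicationName=app_name,
        CurrentApplicationVersionId=version_id,
        ApplicationConfigurationUpdate={
            "FlinkApplicationConfigurationUpdate": {
                "ParallelismConfigurationUpdate": {
                    "ConfigurationTypeUpdate": "CUSTOM",
                    "ParallelismUpdate": clamp_capacity(desired_capacity),
                }
            }
        },
    )
```

Clamping before calling UpdateApplication mirrors the ScalableTarget's MinCapacity/MaxCapacity bounds, so the Lambda never requests a parallelism that Application Autoscaling would reject.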
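The API GW + Lambda pair acts as the custom endpoint that Application Autoscaling polls (GET) and updates (PATCH). A minimal sketch of the status response that endpoint is expected to return is shown below; the field names follow AWS's custom-resource scaling contract, but `build_status_response` is a hypothetical helper, not code from this patch.

```python
import json


def build_status_response(resource_name, actual, desired, scaling_status):
    """Shape the JSON body Application Autoscaling expects from a GET on
    the custom endpoint. scaling_status is one of:
    Pending | InProgress | Successful | Failed."""
    body = {
        "actualCapacity": float(actual),
        "desiredCapacity": float(desired),
        "scalingStatus": scaling_status,
        "resourceName": resource_name,
        "dimensionName": "custom-resource:ResourceType:Property",
        "version": "1.0",  # illustrative version string
    }
    # API Gateway Lambda proxy integration response format.
    return {"statusCode": 200, "body": json.dumps(body)}
```

Returning "InProgress" (as the repo's Lambda does after calling UpdateApplication) tells Application Autoscaling to keep polling until the KDA app reports the new parallelism.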