Skip to content

Commit

Permalink
[indexer_autoscaler_lambda] pause indexing if the available storage d…
Browse files Browse the repository at this point in the history
…rops below a threshold (#14)

incorporate free storage metrics into autoscaler tuning
  • Loading branch information
akumar1214 authored Nov 12, 2024
1 parent 81c3701 commit 342335d
Show file tree
Hide file tree
Showing 13 changed files with 174 additions and 25 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ These libraries wrap the core ElasticGraph libraries so that they can be dep
graph LR;
elasticgraph-admin_lambda --> rake & elasticgraph-admin & elasticgraph-lambda_support
elasticgraph-graphql_lambda --> elasticgraph-graphql & elasticgraph-lambda_support
elasticgraph-indexer_autoscaler_lambda --> elasticgraph-datastore_core & elasticgraph-lambda_support & aws-sdk-lambda & aws-sdk-sqs & ox
elasticgraph-indexer_autoscaler_lambda --> elasticgraph-datastore_core & elasticgraph-lambda_support & aws-sdk-lambda & aws-sdk-sqs & aws-sdk-cloudwatch & ox
elasticgraph-indexer_lambda --> elasticgraph-indexer & elasticgraph-lambda_support & aws-sdk-s3 & ox
elasticgraph-lambda_support --> elasticgraph-opensearch & faraday_middleware-aws-sigv4
style elasticgraph-admin_lambda color: DodgerBlue;
Expand All @@ -112,11 +112,13 @@ graph LR;
style elasticgraph-datastore_core color: Green;
style aws-sdk-lambda color: Red;
style aws-sdk-sqs color: Red;
style aws-sdk-cloudwatch color: Red;
style ox color: Red;
style elasticgraph-indexer color: Green;
style aws-sdk-s3 color: Red;
style elasticgraph-opensearch color: Green;
style faraday_middleware-aws-sigv4 color: Red;
click aws-sdk-cloudwatch href "https://rubygems.org/gems/aws-sdk-cloudwatch"
click aws-sdk-lambda href "https://rubygems.org/gems/aws-sdk-lambda"
click aws-sdk-s3 href "https://rubygems.org/gems/aws-sdk-s3"
click aws-sdk-sqs href "https://rubygems.org/gems/aws-sdk-sqs"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ ElasticGraphGemspecHelper.define_elasticgraph_gem(gemspec_file: __FILE__, catego

spec.add_dependency "aws-sdk-lambda", "~> 1.125"
spec.add_dependency "aws-sdk-sqs", "~> 1.80"
spec.add_dependency "aws-sdk-cloudwatch", "~> 1.104"

# aws-sdk-sqs requires an XML library be available. On Ruby < 3 it'll use rexml from the standard library but on Ruby 3.0+
# we have to add an explicit dependency. It supports ox, oga, libxml, nokogiri or rexml, and of those, ox seems to be the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ def self.from_parsed_yaml(parsed_yaml, &datastore_client_customization_block)
# Stores the datastore core together with any AWS clients the caller supplies.
# Every client defaults to nil; a nil `cloudwatch_client` is replaced by a real
# `Aws::CloudWatch::Client` the first time the `cloudwatch_client` reader is called.
#
# NOTE(review): the two consecutive `lambda_client: nil` lines below look like the
# removed/added pair of a diff rendering; only the second one (with the trailing comma)
# can belong to the final parameter list -- confirm against the real file.
def initialize(
datastore_core:,
sqs_client: nil,
lambda_client: nil
lambda_client: nil,
cloudwatch_client: nil
)
@datastore_core = datastore_core
@sqs_client = sqs_client
@lambda_client = lambda_client
@cloudwatch_client = cloudwatch_client
end

def sqs_client
Expand All @@ -53,13 +55,21 @@ def lambda_client
end
end

# Returns the CloudWatch client, building and caching a default one on first use.
# The AWS SDK gem is only required when no client has been injected, which keeps
# the require cost off code paths that never touch CloudWatch.
def cloudwatch_client
  return @cloudwatch_client if @cloudwatch_client

  require "aws-sdk-cloudwatch"
  @cloudwatch_client = Aws::CloudWatch::Client.new
end

# Builds the ConcurrencyScaler on first call and memoizes it in @concurrency_scaler.
# The scaler's file is required lazily here, and the scaler is wired to the datastore
# core plus the SQS, Lambda and CloudWatch clients exposed by this object's readers.
#
# NOTE(review): the two consecutive `lambda_client: lambda_client` lines below look like
# the removed/added pair of a diff rendering; only the second one (with the trailing
# comma) can belong to the final argument list -- confirm against the real file.
def concurrency_scaler
@concurrency_scaler ||= begin
require "elastic_graph/indexer_autoscaler_lambda/concurrency_scaler"
ConcurrencyScaler.new(
datastore_core: @datastore_core,
sqs_client: sqs_client,
lambda_client: lambda_client
lambda_client: lambda_client,
cloudwatch_client: cloudwatch_client
)
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,25 @@ module ElasticGraph
class IndexerAutoscalerLambda
# @private
class ConcurrencyScaler
def initialize(datastore_core:, sqs_client:, lambda_client:)
# Captures the collaborators the scaler works with: the datastore core (whose logger is
# also kept in @logger) and the SQS, Lambda and CloudWatch clients.
def initialize(datastore_core:, sqs_client:, lambda_client:, cloudwatch_client:)
  @datastore_core = datastore_core
  @logger = datastore_core.logger
  @sqs_client, @lambda_client, @cloudwatch_client = sqs_client, lambda_client, cloudwatch_client
end

MINIMUM_CONCURRENCY = 2

def tune_indexer_concurrency(queue_urls:, min_cpu_target:, max_cpu_target:, maximum_concurrency:, indexer_function_name:)
def tune_indexer_concurrency(
queue_urls:,
min_cpu_target:,
max_cpu_target:,
maximum_concurrency:,
required_free_storage_in_mb:,
indexer_function_name:,
cluster_name:
)
queue_attributes = get_queue_attributes(queue_urls)
queue_arns = queue_attributes.fetch(:queue_arns)
num_messages = queue_attributes.fetch(:total_messages)
Expand All @@ -37,6 +46,8 @@ def tune_indexer_concurrency(queue_urls:, min_cpu_target:, max_cpu_target:, maxi

new_target_concurrency =
if num_messages.positive?
lowest_node_free_storage_in_mb = get_lowest_node_free_storage_in_mb(cluster_name)

cpu_utilization = get_max_cpu_utilization
cpu_midpoint = (max_cpu_target + min_cpu_target) / 2.0

Expand All @@ -45,11 +56,19 @@ def tune_indexer_concurrency(queue_urls:, min_cpu_target:, max_cpu_target:, maxi
if current_concurrency.nil?
details_logger.log_unset
nil
elsif lowest_node_free_storage_in_mb < required_free_storage_in_mb
details_logger.log_pause(
lowest_node_free_storage_in_mb: lowest_node_free_storage_in_mb,
required_free_storage_in_mb: required_free_storage_in_mb
)
MINIMUM_CONCURRENCY
elsif cpu_utilization < min_cpu_target
increase_factor = (cpu_midpoint / cpu_utilization).clamp(0.0, 1.5)
(current_concurrency * increase_factor).round.tap do |new_concurrency|
details_logger.log_increase(
cpu_utilization: cpu_utilization,
lowest_node_free_storage_in_mb: lowest_node_free_storage_in_mb,
required_free_storage_in_mb: required_free_storage_in_mb,
current_concurrency: current_concurrency,
new_concurrency: new_concurrency
)
Expand All @@ -59,20 +78,24 @@ def tune_indexer_concurrency(queue_urls:, min_cpu_target:, max_cpu_target:, maxi
(current_concurrency - (current_concurrency * decrease_factor)).round.tap do |new_concurrency|
details_logger.log_decrease(
cpu_utilization: cpu_utilization,
lowest_node_free_storage_in_mb: lowest_node_free_storage_in_mb,
required_free_storage_in_mb: required_free_storage_in_mb,
current_concurrency: current_concurrency,
new_concurrency: new_concurrency
)
end
else
details_logger.log_no_change(
cpu_utilization: cpu_utilization,
lowest_node_free_storage_in_mb: lowest_node_free_storage_in_mb,
required_free_storage_in_mb: required_free_storage_in_mb,
current_concurrency: current_concurrency
)
current_concurrency
end
else
details_logger.log_reset
0
MINIMUM_CONCURRENCY
end

if new_target_concurrency && new_target_concurrency != current_concurrency
Expand All @@ -94,6 +117,22 @@ def get_max_cpu_utilization
end.max.to_f
end

# Looks up the lowest recent `FreeStorageSpace` reading that CloudWatch holds for the
# given domain, using the per-minute `Minimum` statistic over the past 20 minutes.
#
# The SEARCH expression may match zero, one, or several time series. The most recent
# datapoint of each matched series is considered (GetMetricData returns datapoints
# newest-first by default) and the smallest of those is returned, so the result stays
# the "lowest" value even if more than one series matches.
#
# NOTE(review): the unit is assumed to be megabytes, per the method name -- confirm
# against the AWS/ES metric documentation.
#
# @param cluster_name [String] value matched against the `DomainName` dimension
# @return [Float, nil] the lowest latest reading, or nil when CloudWatch returned no
#   series or no datapoints for the window (callers must handle the nil case)
def get_lowest_node_free_storage_in_mb(cluster_name)
  # Sample the clock once so the query window is exactly 20 minutes wide.
  now = ::Time.now

  metric_response = @cloudwatch_client.get_metric_data({
    start_time: now - 1200, # past 20 minutes
    end_time: now,
    metric_data_queries: [
      {
        id: "minFreeStorageAcrossNodes",
        expression: "SEARCH('{AWS/ES,ClientId,DomainName} MetricName=\"FreeStorageSpace\" AND DomainName=\"#{cluster_name}\"', 'Minimum', 60)",
        return_data: true
      }
    ]
  })

  # `compact` drops series that have no datapoints; `min` of an empty list is nil,
  # which avoids calling `values` on nil when the search matched nothing.
  latest_readings = metric_response.metric_data_results.map { |series| series.values.first }.compact
  latest_readings.min
end

def get_queue_attributes(queue_urls)
attributes_per_queue = queue_urls.map do |queue_url|
@sqs_client.get_queue_attributes(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,32 +30,46 @@ def initialize(
}
end

def log_increase(cpu_utilization:, current_concurrency:, new_concurrency:)
# Reports an "increase" decision through `log_result`, recording the CPU utilization,
# the free-storage reading and its required threshold, and the old and new concurrency.
def log_increase(cpu_utilization:, lowest_node_free_storage_in_mb:, required_free_storage_in_mb:, current_concurrency:, new_concurrency:)
  details = {"action" => "increase"}
  details["cpu_utilization"] = cpu_utilization
  details["lowest_node_free_storage_in_mb"] = lowest_node_free_storage_in_mb
  details["required_free_storage_in_mb"] = required_free_storage_in_mb
  details["current_concurrency"] = current_concurrency
  details["new_concurrency"] = new_concurrency

  log_result(details)
end

def log_decrease(cpu_utilization:, current_concurrency:, new_concurrency:)
# Reports a "decrease" decision through `log_result`, recording the CPU utilization,
# the free-storage reading and its required threshold, and the old and new concurrency.
def log_decrease(cpu_utilization:, lowest_node_free_storage_in_mb:, required_free_storage_in_mb:, current_concurrency:, new_concurrency:)
  storage_details = {
    "lowest_node_free_storage_in_mb" => lowest_node_free_storage_in_mb,
    "required_free_storage_in_mb" => required_free_storage_in_mb
  }
  concurrency_details = {
    "current_concurrency" => current_concurrency,
    "new_concurrency" => new_concurrency
  }

  payload = {"action" => "decrease", "cpu_utilization" => cpu_utilization}
    .merge(storage_details)
    .merge(concurrency_details)

  log_result(payload)
end

def log_no_change(cpu_utilization:, current_concurrency:)
# Reports a "no_change" decision through `log_result`, recording the CPU utilization,
# the free-storage reading and its required threshold, and the concurrency being kept.
def log_no_change(cpu_utilization:, lowest_node_free_storage_in_mb:, required_free_storage_in_mb:, current_concurrency:)
  unchanged_details = {
    "action" => "no_change",
    "cpu_utilization" => cpu_utilization,
    "lowest_node_free_storage_in_mb" => lowest_node_free_storage_in_mb,
    "required_free_storage_in_mb" => required_free_storage_in_mb,
    "current_concurrency" => current_concurrency
  }

  log_result(unchanged_details)
end

# Reports a "pause" decision through `log_result`, recording the free-storage reading
# that triggered it alongside the required threshold.
def log_pause(lowest_node_free_storage_in_mb:, required_free_storage_in_mb:)
  pause_details = {"action" => "pause"}
  pause_details["lowest_node_free_storage_in_mb"] = lowest_node_free_storage_in_mb
  pause_details["required_free_storage_in_mb"] = required_free_storage_in_mb

  log_result(pause_details)
end

# Reports a "reset" decision through `log_result`; no further details are attached.
def log_reset
  reset_details = {"action" => "reset"}
  log_result(reset_details)
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ def handle_request(event:, context:)
min_cpu_target: event.fetch("min_cpu_target"),
max_cpu_target: event.fetch("max_cpu_target"),
maximum_concurrency: event.fetch("maximum_concurrency"),
indexer_function_name: event.fetch("indexer_function_name")
required_free_storage_in_mb: event.fetch("required_free_storage_in_mb"),
indexer_function_name: event.fetch("indexer_function_name"),
cluster_name: event.fetch("cluster_name")
)
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ module ElasticGraph
def initialize: (
datastore_core: DatastoreCore,
sqs_client: Aws::SQS::Client,
lambda_client: Aws::Lambda::Client
lambda_client: Aws::Lambda::Client,
cloudwatch_client: Aws::CloudWatch::Client
) -> void

MINIMUM_CONCURRENCY: ::Integer
Expand All @@ -14,7 +15,9 @@ module ElasticGraph
min_cpu_target: ::Integer,
max_cpu_target: ::Integer,
maximum_concurrency: ::Integer,
indexer_function_name: ::String
required_free_storage_in_mb: ::Integer,
indexer_function_name: ::String,
cluster_name: ::String
) -> void

private
Expand All @@ -23,8 +26,10 @@ module ElasticGraph
@datastore_core: DatastoreCore
@sqs_client: Aws::SQS::Client
@lambda_client: Aws::Lambda::Client
@cloudwatch_client: Aws::CloudWatch::Client

def get_max_cpu_utilization: () -> ::Float
def get_lowest_node_free_storage_in_mb: (::String) -> ::Float
def get_queue_attributes: (::Array[::String]) -> { total_messages: ::Integer, queue_arns: ::Array[::String] }
def get_concurrency: (::String) -> ::Integer?

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,37 @@ module ElasticGraph
queue_urls: ::Array[::String],
min_cpu_target: ::Integer,
max_cpu_target: ::Integer,
num_messages: ::Integer,
num_messages: ::Integer
) -> void

def log_increase: (
cpu_utilization: ::Float,
lowest_node_free_storage_in_mb: ::Float,
required_free_storage_in_mb: ::Integer,
current_concurrency: ::Integer,
new_concurrency: ::Integer
) -> void

def log_decrease: (
cpu_utilization: ::Float,
lowest_node_free_storage_in_mb: ::Float,
required_free_storage_in_mb: ::Integer,
current_concurrency: ::Integer,
new_concurrency: ::Integer
) -> void

def log_no_change: (
cpu_utilization: ::Float,
lowest_node_free_storage_in_mb: ::Float,
required_free_storage_in_mb: ::Integer,
current_concurrency: ::Integer
) -> void

def log_pause: (
lowest_node_free_storage_in_mb: ::Float,
required_free_storage_in_mb: ::Integer
) -> void

def log_reset: () -> void

def log_unset: () -> void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ module ElasticGraph
datastore_core: DatastoreCore,
?sqs_client: Aws::SQS::Client?,
?lambda_client: Aws::Lambda::Client?,
?cloudwatch_client: Aws::CloudWatch::Client?,
) -> void

@sqs_client: Aws::SQS::Client?
Expand All @@ -19,6 +20,9 @@ module ElasticGraph
@lambda_client: Aws::Lambda::Client?
def lambda_client: () -> Aws::Lambda::Client

@cloudwatch_client: Aws::CloudWatch::Client?
def cloudwatch_client: () -> Aws::CloudWatch::Client

@concurrency_scaler: ConcurrencyScaler?
def concurrency_scaler: () -> ConcurrencyScaler
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ module BuildsIndexerAutoscalerLambda
def build_indexer_autoscaler(
sqs_client: nil,
lambda_client: nil,
cloudwatch_client: nil,
**datastore_core_options,
&customize_datastore_config
)
Expand All @@ -28,6 +29,7 @@ def build_indexer_autoscaler(
IndexerAutoscalerLambda.new(
sqs_client: sqs_client,
lambda_client: lambda_client,
cloudwatch_client: cloudwatch_client,
datastore_core: datastore_core
)
end
Expand Down
Loading

0 comments on commit 342335d

Please sign in to comment.