Skip to content

Commit

Permalink
Improve warning for insufficient file resources for PQ max_bytes
Browse files Browse the repository at this point in the history
This commit refactors the `PersistedQueueConfigValidator` class to provide a
more detailed, accurate and actionable warning when pipeline's PQ configs are at
risk of running out of disk space. See
elastic#14839 for design considerations. The
highlights of the changes include accurately determining the free resources on a
filesystem disk and then providing a breakdown of the usage for each of the
paths configured for a queue.
  • Loading branch information
donoghuc committed Nov 8, 2024
1 parent 046ea1f commit 9b408b4
Show file tree
Hide file tree
Showing 2 changed files with 222 additions and 22 deletions.
75 changes: 63 additions & 12 deletions logstash-core/lib/logstash/persisted_queue_config_validator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ def check(running_pipelines, pipeline_configs)
warn_msg = []
err_msg = []
queue_path_file_system = Hash.new # (String: queue path, String: file system)
required_free_bytes = Hash.new # (String: file system, Integer: size)
required_free_bytes = Hash.new # (String: file system, Integer: size)
current_usage_bytes = Hash.new # (String: file system, Integer: size)

pipeline_configs.select { |config| config.settings.get('queue.type') == 'persisted'}
.select { |config| config.settings.get('queue.max_bytes').to_i != 0 }
Expand All @@ -60,12 +61,13 @@ def check(running_pipelines, pipeline_configs)
check_queue_usage(warn_msg, pipeline_id, max_bytes, used_bytes)

queue_path_file_system[queue_path] = file_system
if used_bytes < max_bytes
required_free_bytes[file_system] = required_free_bytes.fetch(file_system, 0) + max_bytes - used_bytes
end
# Add max_bytes to required total for this filesystem
required_free_bytes[file_system] = required_free_bytes.fetch(file_system, 0) + max_bytes
# Track current usage separately
current_usage_bytes[file_system] = current_usage_bytes.fetch(file_system, 0) + used_bytes
end

check_disk_space(warn_msg, queue_path_file_system, required_free_bytes)
check_disk_space(warn_msg, queue_path_file_system, required_free_bytes, current_usage_bytes)

@last_check_pass = err_msg.empty? && warn_msg.empty?

Expand All @@ -85,15 +87,64 @@ def check_queue_usage(warn_msg, pipeline_id, max_bytes, used_bytes)
end
end

# Check disk has sufficient space for all queues reach their max bytes. Queues may config with different paths/ devices.
# Check disk has sufficient space for all queues reach their max bytes. Queues may config with different paths/devices.
# It uses the filesystem of the path and count the required bytes by filesystem
def check_disk_space(warn_msg, queue_path_file_system, required_free_bytes)
disk_warn_msg =
queue_path_file_system
.select { |queue_path, file_system| !FsUtil.hasFreeSpace(Paths.get(queue_path), required_free_bytes.fetch(file_system, 0)) }
.map { |queue_path, file_system| "The persistent queue on path \"#{queue_path}\" won't fit in file system \"#{file_system}\" when full. Please free or allocate #{required_free_bytes.fetch(file_system, 0)} more bytes." }
def check_disk_space(warn_msg, queue_path_file_system, required_free_bytes, current_usage_bytes)
# Group paths by filesystem
paths_by_filesystem = queue_path_file_system.group_by { |_, fs| fs }

# Only process filesystems that need more space
filesystems_needing_space = paths_by_filesystem.select do |file_system, paths|
additional_needed = required_free_bytes.fetch(file_system, 0) - current_usage_bytes.fetch(file_system, 0)
!FsUtil.hasFreeSpace(Paths.get(paths.first.first), additional_needed)
end

return if filesystems_needing_space.empty?

message_parts = [
"Persistent queues require more disk space than is available on #{filesystems_needing_space.size > 1 ? 'multiple filesystems' : 'a filesystem'}:",
""
]

# Add filesystem-specific information
filesystems_needing_space.each do |file_system, paths|
total_required = required_free_bytes.fetch(file_system, 0)
current_usage = current_usage_bytes.fetch(file_system, 0)
additional_needed = total_required - current_usage
fs_path = Paths.get(paths.first.first)
free_space = Files.getFileStore(fs_path).getUsableSpace

message_parts.concat([
"Filesystem '#{file_system}':",
"- Total space required: #{LogStash::Util::ByteValue.human_readable(total_required)}",
"- Currently free space: #{LogStash::Util::ByteValue.human_readable(free_space)}",
"- Current PQ usage: #{LogStash::Util::ByteValue.human_readable(current_usage)}",
"- Additional space needed: #{LogStash::Util::ByteValue.human_readable(additional_needed)}",
"",
"Individual queue requirements:",
*paths.map { |path, _|
used = get_page_size(::File.join(path, "page.*"))
[
" #{path}:",
" Current size: #{LogStash::Util::ByteValue.human_readable(used)}",
" Maximum size: #{LogStash::Util::ByteValue.human_readable(total_required / paths.size)}"
]
}.flatten,
"" # Empty line between filesystems
])
end

warn_msg << disk_warn_msg unless disk_warn_msg.empty?
# Add common footer
message_parts.concat([
"Please either:",
"1. Free up disk space",
"2. Reduce queue.max_bytes in your pipeline configurations",
"3. Move PQ storage to a filesystem with more available space",
"Note: Logstash may fail to start if this is not resolved.",
""
])

warn_msg << message_parts.join("\n")
end

def get_file_system(queue_path)
Expand Down
169 changes: 159 additions & 10 deletions logstash-core/spec/logstash/persisted_queue_config_validator_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -80,24 +80,173 @@
end

context("disk does not have sufficient space") do
# two pq with different paths
let(:settings1) { settings.dup.merge("queue.max_bytes" => "1000pb") }
let(:settings2) { settings1.dup.merge("path.queue" => Stud::Temporary.directory) }
let(:pipeline_id) { "main" }
# Create a pipeline config double that matches what the class expects
let(:pipeline_config) do
double("PipelineConfig").tap do |config|
allow(config).to receive(:pipeline_id).and_return(pipeline_id)
allow(config).to receive(:settings).and_return(
double("Settings").tap do |s|
allow(s).to receive(:get).with("queue.type").and_return("persisted")
allow(s).to receive(:get).with("queue.max_bytes").and_return(300 * 1024 * 1024 * 1024) # 300GB
allow(s).to receive(:get).with("queue.page_capacity").and_return(64 * 1024 * 1024) # 64MB
allow(s).to receive(:get).with("pipeline.id").and_return(pipeline_id)
allow(s).to receive(:get).with("path.queue").and_return(queue_path)
end
)
end
end

before do
allow(Dir).to receive(:glob).and_return(["page.1"])
allow(File).to receive(:size).and_return(25 * 1024 * 1024 * 1024)
allow(FsUtil).to receive(:hasFreeSpace).and_return(false)
allow(Files).to receive(:exists).and_return(true)

let(:pipeline_configs) do
LogStash::Config::Source::Local.new(settings1).pipeline_configs +
LogStash::Config::Source::Local.new(settings2).pipeline_configs
# Mock filesystem
mock_file_store = double("FileStore",
name: "disk1",
getUsableSpace: 100 * 1024 * 1024 * 1024 # 100GB free
)
allow(Files).to receive(:getFileStore).and_return(mock_file_store)
end

it "should throw" do
it "reports detailed space information" do
expect(pq_config_validator).to receive(:check_disk_space) do |_, _, required_free_bytes|
expect(required_free_bytes.size).to eq(1)
expect(required_free_bytes.values[0]).to eq(1024**5 * 1000 * 2) # require 2000pb
expect(required_free_bytes.values[0]).to eq(300 * 1024 * 1024 * 1024)
end.and_call_original

expect(pq_config_validator.logger).to receive(:warn).once.with(/won't fit in file system/)
expect(pq_config_validator.logger).to receive(:warn).once do |msg|
expect(msg).to include("Total space required: 300gb")
expect(msg).to include("Current PQ usage: 25gb")
end

pq_config_validator.check({}, pipeline_configs)
pq_config_validator.check({}, [pipeline_config])
end

context "with multiple pipelines" do
let(:pipeline_id1) { "main" }
let(:pipeline_id2) { "secondary" }
let(:pipeline_id3) { "third" }

let(:base_queue_path) { queue_path }
let(:queue_path1) { ::File.join(base_queue_path, pipeline_id1) }
let(:queue_path2) { ::File.join(base_queue_path, pipeline_id2) }
let(:queue_path3) { ::File.join(Stud::Temporary.directory, pipeline_id3) }

let(:pipeline_config1) do
double("PipelineConfig").tap do |config|
allow(config).to receive(:pipeline_id).and_return(pipeline_id1)
allow(config).to receive(:settings).and_return(
double("Settings").tap do |s|
allow(s).to receive(:get).with("queue.type").and_return("persisted")
allow(s).to receive(:get).with("queue.max_bytes").and_return(300 * 1024 * 1024 * 1024)
allow(s).to receive(:get).with("queue.page_capacity").and_return(64 * 1024 * 1024)
allow(s).to receive(:get).with("pipeline.id").and_return(pipeline_id1)
allow(s).to receive(:get).with("path.queue").and_return(base_queue_path)
end
)
end
end

let(:pipeline_config2) do
double("PipelineConfig").tap do |config|
allow(config).to receive(:pipeline_id).and_return(pipeline_id2)
allow(config).to receive(:settings).and_return(
double("Settings").tap do |s|
allow(s).to receive(:get).with("queue.type").and_return("persisted")
allow(s).to receive(:get).with("queue.max_bytes").and_return(300 * 1024 * 1024 * 1024)
allow(s).to receive(:get).with("queue.page_capacity").and_return(64 * 1024 * 1024)
allow(s).to receive(:get).with("pipeline.id").and_return(pipeline_id2)
allow(s).to receive(:get).with("path.queue").and_return(base_queue_path)
end
)
end
end

let(:pipeline_config3) do
double("PipelineConfig").tap do |config|
allow(config).to receive(:pipeline_id).and_return(pipeline_id3)
allow(config).to receive(:settings).and_return(
double("Settings").tap do |s|
allow(s).to receive(:get).with("queue.type").and_return("persisted")
allow(s).to receive(:get).with("queue.max_bytes").and_return(300 * 1024 * 1024 * 1024)
allow(s).to receive(:get).with("queue.page_capacity").and_return(64 * 1024 * 1024)
allow(s).to receive(:get).with("pipeline.id").and_return(pipeline_id3)
allow(s).to receive(:get).with("path.queue").and_return(::File.dirname(queue_path3))
end
)
end
end

let(:mock_file_store1) { double("FileStore", name: "disk1", getUsableSpace: 100 * 1024 * 1024 * 1024) }
let(:mock_file_store2) { double("FileStore", name: "disk2", getUsableSpace: 50 * 1024 * 1024 * 1024) }

before do
# Precise path matching for Dir.glob
allow(Dir).to receive(:glob) do |pattern|
case pattern
when /#{pipeline_id1}.*page\.*/ then ["#{::File.dirname(pattern)}/page.1"]
when /#{pipeline_id2}.*page\.*/ then ["#{::File.dirname(pattern)}/page.1", "#{::File.dirname(pattern)}/page.2"]
when /#{pipeline_id3}.*page\.*/ then ["#{::File.dirname(pattern)}/page.1"]
else []
end
end

# Set up file size matching with full paths
allow(File).to receive(:size) do |path|
case
when path.include?(pipeline_id1) then 30 * 1024 * 1024 * 1024 # 30GB for main
when path.include?(pipeline_id2) then 25 * 1024 * 1024 * 1024 # 25GB for secondary
when path.include?(pipeline_id3) then 25 * 1024 * 1024 * 1024 # 25GB for third
else 0
end
end

allow(Files).to receive(:getFileStore) do |path|
case path.toString
when /#{pipeline_id3}/ then mock_file_store2
else mock_file_store1
end
end

allow(FsUtil).to receive(:hasFreeSpace).and_return(false)
allow(Files).to receive(:exists).and_return(true)
end

context "with multiple queues on same filesystem" do
it "reports consolidated information for same filesystem" do
expect(pq_config_validator.logger).to receive(:warn).once do |msg|
expect(msg).to match(/Persistent queues require more disk space than is available on a filesystem:/)
expect(msg).to match(/Filesystem 'disk1':/)
expect(msg).to match(/Total space required: 600gb/) # 300GB * 2
expect(msg).to match(/Current PQ usage: 80gb/) # 30GB + (2 * 25GB)
expect(msg).to match(/Current size: 30gb/) # First queue
expect(msg).to match(/Current size: 50gb/) # Second queue (2 files * 25GB)
end

pq_config_validator.check({}, [pipeline_config1, pipeline_config2])
end
end

context "with queues across multiple filesystems" do
it "reports separate information for each filesystem" do
expect(pq_config_validator.logger).to receive(:warn).once do |msg|
# First filesystem
expect(msg).to match(/Filesystem 'disk1':/)
expect(msg).to match(/Total space required: 600gb/) # 300GB * 2
expect(msg).to match(/Current PQ usage: 80gb/) # 30GB + (2 * 25GB)

# Second filesystem
expect(msg).to match(/Filesystem 'disk2':/)
expect(msg).to match(/Total space required: 300gb/) # 300GB
expect(msg).to match(/Current PQ usage: 25gb/) # 25GB
end

pq_config_validator.check({}, [pipeline_config1, pipeline_config2, pipeline_config3])
end
end
end
end

Expand Down

0 comments on commit 9b408b4

Please sign in to comment.