diff --git a/logstash-core/lib/logstash/persisted_queue_config_validator.rb b/logstash-core/lib/logstash/persisted_queue_config_validator.rb index 6918d88fbab..45a8edad341 100644 --- a/logstash-core/lib/logstash/persisted_queue_config_validator.rb +++ b/logstash-core/lib/logstash/persisted_queue_config_validator.rb @@ -42,7 +42,8 @@ def check(running_pipelines, pipeline_configs) warn_msg = [] err_msg = [] queue_path_file_system = Hash.new # (String: queue path, String: file system) - required_free_bytes = Hash.new # (String: file system, Integer: size) + required_free_bytes = Hash.new # (String: file system, Integer: size) + current_usage_bytes = Hash.new # (String: file system, Integer: size) pipeline_configs.select { |config| config.settings.get('queue.type') == 'persisted'} .select { |config| config.settings.get('queue.max_bytes').to_i != 0 } @@ -60,12 +61,13 @@ def check(running_pipelines, pipeline_configs) check_queue_usage(warn_msg, pipeline_id, max_bytes, used_bytes) queue_path_file_system[queue_path] = file_system - if used_bytes < max_bytes - required_free_bytes[file_system] = required_free_bytes.fetch(file_system, 0) + max_bytes - used_bytes - end + # Add max_bytes to required total for this filesystem + required_free_bytes[file_system] = required_free_bytes.fetch(file_system, 0) + max_bytes + # Track current usage separately + current_usage_bytes[file_system] = current_usage_bytes.fetch(file_system, 0) + used_bytes end - check_disk_space(warn_msg, queue_path_file_system, required_free_bytes) + check_disk_space(warn_msg, queue_path_file_system, required_free_bytes, current_usage_bytes) @last_check_pass = err_msg.empty? && warn_msg.empty? @@ -85,15 +87,64 @@ def check_queue_usage(warn_msg, pipeline_id, max_bytes, used_bytes) end end - # Check disk has sufficient space for all queues reach their max bytes. Queues may config with different paths/ devices. + # Check disk has sufficient space for all queues reach their max bytes. Queues may config with different paths/devices. # It uses the filesystem of the path and count the required bytes by filesystem - def check_disk_space(warn_msg, queue_path_file_system, required_free_bytes) - disk_warn_msg = - queue_path_file_system - .select { |queue_path, file_system| !FsUtil.hasFreeSpace(Paths.get(queue_path), required_free_bytes.fetch(file_system, 0)) } - .map { |queue_path, file_system| "The persistent queue on path \"#{queue_path}\" won't fit in file system \"#{file_system}\" when full. Please free or allocate #{required_free_bytes.fetch(file_system, 0)} more bytes." } + def check_disk_space(warn_msg, queue_path_file_system, required_free_bytes, current_usage_bytes) + # Group paths by filesystem + paths_by_filesystem = queue_path_file_system.group_by { |_, fs| fs } + + # Only process filesystems that need more space + filesystems_needing_space = paths_by_filesystem.select do |file_system, paths| + additional_needed = required_free_bytes.fetch(file_system, 0) - current_usage_bytes.fetch(file_system, 0) + !FsUtil.hasFreeSpace(Paths.get(paths.first.first), additional_needed) + end + + return if filesystems_needing_space.empty? + + message_parts = [ + "Persistent queues require more disk space than is available on #{filesystems_needing_space.size > 1 ? 'multiple filesystems' : 'a filesystem'}:", + "" + ] + + # Add filesystem-specific information + filesystems_needing_space.each do |file_system, paths| + total_required = required_free_bytes.fetch(file_system, 0) + current_usage = current_usage_bytes.fetch(file_system, 0) + additional_needed = total_required - current_usage + fs_path = Paths.get(paths.first.first) + free_space = Files.getFileStore(fs_path).getUsableSpace + + message_parts.concat([ + "Filesystem '#{file_system}':", + "- Total space required: #{LogStash::Util::ByteValue.human_readable(total_required)}", + "- Currently free space: #{LogStash::Util::ByteValue.human_readable(free_space)}", + "- Current PQ usage: #{LogStash::Util::ByteValue.human_readable(current_usage)}", + "- Additional space needed: #{LogStash::Util::ByteValue.human_readable(additional_needed)}", + "", + "Individual queue requirements:", + *paths.map { |path, _| + used = get_page_size(::File.join(path, "page.*")) + [ + " #{path}:", + " Current size: #{LogStash::Util::ByteValue.human_readable(used)}", + " Maximum size: #{LogStash::Util::ByteValue.human_readable(total_required / paths.size)}" + ] + }.flatten, + "" # Empty line between filesystems + ]) + end - warn_msg << disk_warn_msg unless disk_warn_msg.empty? + # Add common footer + message_parts.concat([ + "Please either:", + "1. Free up disk space", + "2. Reduce queue.max_bytes in your pipeline configurations", + "3. Move PQ storage to a filesystem with more available space", + "Note: Logstash may fail to start if this is not resolved.", + "" + ]) + + warn_msg << message_parts.join("\n") end def get_file_system(queue_path) diff --git a/logstash-core/spec/logstash/persisted_queue_config_validator_spec.rb b/logstash-core/spec/logstash/persisted_queue_config_validator_spec.rb index 9baf9dfebea..98a3110153d 100644 --- a/logstash-core/spec/logstash/persisted_queue_config_validator_spec.rb +++ b/logstash-core/spec/logstash/persisted_queue_config_validator_spec.rb @@ -80,24 +80,173 @@ end context("disk does not have sufficient space") do - # two pq with different paths - let(:settings1) { settings.dup.merge("queue.max_bytes" => "1000pb") } - let(:settings2) { settings1.dup.merge("path.queue" => Stud::Temporary.directory) } + let(:pipeline_id) { "main" } + # Create a pipeline config double that matches what the class expects + let(:pipeline_config) do + double("PipelineConfig").tap do |config| + allow(config).to receive(:pipeline_id).and_return(pipeline_id) + allow(config).to receive(:settings).and_return( + double("Settings").tap do |s| + allow(s).to receive(:get).with("queue.type").and_return("persisted") + allow(s).to receive(:get).with("queue.max_bytes").and_return(300 * 1024 * 1024 * 1024) # 300GB + allow(s).to receive(:get).with("queue.page_capacity").and_return(64 * 1024 * 1024) # 64MB + allow(s).to receive(:get).with("pipeline.id").and_return(pipeline_id) + allow(s).to receive(:get).with("path.queue").and_return(queue_path) + end + ) + end + end + + before do + allow(Dir).to receive(:glob).and_return(["page.1"]) + allow(File).to receive(:size).and_return(25 * 1024 * 1024 * 1024) + allow(FsUtil).to receive(:hasFreeSpace).and_return(false) + allow(Files).to receive(:exists).and_return(true) - let(:pipeline_configs) do - LogStash::Config::Source::Local.new(settings1).pipeline_configs + - LogStash::Config::Source::Local.new(settings2).pipeline_configs + # Mock filesystem + mock_file_store = double("FileStore", + name: "disk1", + getUsableSpace: 100 * 1024 * 1024 * 1024 # 100GB free + ) + allow(Files).to receive(:getFileStore).and_return(mock_file_store) end - it "should throw" do + it "reports detailed space information" do expect(pq_config_validator).to receive(:check_disk_space) do |_, _, required_free_bytes| expect(required_free_bytes.size).to eq(1) - expect(required_free_bytes.values[0]).to eq(1024**5 * 1000 * 2) # require 2000pb + expect(required_free_bytes.values[0]).to eq(300 * 1024 * 1024 * 1024) end.and_call_original - expect(pq_config_validator.logger).to receive(:warn).once.with(/won't fit in file system/) + expect(pq_config_validator.logger).to receive(:warn).once do |msg| + expect(msg).to include("Total space required: 300gb") + expect(msg).to include("Current PQ usage: 25gb") + end - pq_config_validator.check({}, pipeline_configs) + pq_config_validator.check({}, [pipeline_config]) + end + + context "with multiple pipelines" do + let(:pipeline_id1) { "main" } + let(:pipeline_id2) { "secondary" } + let(:pipeline_id3) { "third" } + + let(:base_queue_path) { queue_path } + let(:queue_path1) { ::File.join(base_queue_path, pipeline_id1) } + let(:queue_path2) { ::File.join(base_queue_path, pipeline_id2) } + let(:queue_path3) { ::File.join(Stud::Temporary.directory, pipeline_id3) } + + let(:pipeline_config1) do + double("PipelineConfig").tap do |config| + allow(config).to receive(:pipeline_id).and_return(pipeline_id1) + allow(config).to receive(:settings).and_return( + double("Settings").tap do |s| + allow(s).to receive(:get).with("queue.type").and_return("persisted") + allow(s).to receive(:get).with("queue.max_bytes").and_return(300 * 1024 * 1024 * 1024) + allow(s).to receive(:get).with("queue.page_capacity").and_return(64 * 1024 * 1024) + allow(s).to receive(:get).with("pipeline.id").and_return(pipeline_id1) + allow(s).to receive(:get).with("path.queue").and_return(base_queue_path) + end + ) + end + end + + let(:pipeline_config2) do + double("PipelineConfig").tap do |config| + allow(config).to receive(:pipeline_id).and_return(pipeline_id2) + allow(config).to receive(:settings).and_return( + double("Settings").tap do |s| + allow(s).to receive(:get).with("queue.type").and_return("persisted") + allow(s).to receive(:get).with("queue.max_bytes").and_return(300 * 1024 * 1024 * 1024) + allow(s).to receive(:get).with("queue.page_capacity").and_return(64 * 1024 * 1024) + allow(s).to receive(:get).with("pipeline.id").and_return(pipeline_id2) + allow(s).to receive(:get).with("path.queue").and_return(base_queue_path) + end + ) + end + end + + let(:pipeline_config3) do + double("PipelineConfig").tap do |config| + allow(config).to receive(:pipeline_id).and_return(pipeline_id3) + allow(config).to receive(:settings).and_return( + double("Settings").tap do |s| + allow(s).to receive(:get).with("queue.type").and_return("persisted") + allow(s).to receive(:get).with("queue.max_bytes").and_return(300 * 1024 * 1024 * 1024) + allow(s).to receive(:get).with("queue.page_capacity").and_return(64 * 1024 * 1024) + allow(s).to receive(:get).with("pipeline.id").and_return(pipeline_id3) + allow(s).to receive(:get).with("path.queue").and_return(::File.dirname(queue_path3)) + end + ) + end + end + + let(:mock_file_store1) { double("FileStore", name: "disk1", getUsableSpace: 100 * 1024 * 1024 * 1024) } + let(:mock_file_store2) { double("FileStore", name: "disk2", getUsableSpace: 50 * 1024 * 1024 * 1024) } + + before do + # Precise path matching for Dir.glob + allow(Dir).to receive(:glob) do |pattern| + case pattern + when /#{pipeline_id1}.*page\.*/ then ["#{::File.dirname(pattern)}/page.1"] + when /#{pipeline_id2}.*page\.*/ then ["#{::File.dirname(pattern)}/page.1", "#{::File.dirname(pattern)}/page.2"] + when /#{pipeline_id3}.*page\.*/ then ["#{::File.dirname(pattern)}/page.1"] + else [] + end + end + + # Set up file size matching with full paths + allow(File).to receive(:size) do |path| + case + when path.include?(pipeline_id1) then 30 * 1024 * 1024 * 1024 # 30GB for main + when path.include?(pipeline_id2) then 25 * 1024 * 1024 * 1024 # 25GB for secondary + when path.include?(pipeline_id3) then 25 * 1024 * 1024 * 1024 # 25GB for third + else 0 + end + end + + allow(Files).to receive(:getFileStore) do |path| + case path.toString + when /#{pipeline_id3}/ then mock_file_store2 + else mock_file_store1 + end + end + + allow(FsUtil).to receive(:hasFreeSpace).and_return(false) + allow(Files).to receive(:exists).and_return(true) + end + + context "with multiple queues on same filesystem" do + it "reports consolidated information for same filesystem" do + expect(pq_config_validator.logger).to receive(:warn).once do |msg| + expect(msg).to match(/Persistent queues require more disk space than is available on a filesystem:/) + expect(msg).to match(/Filesystem 'disk1':/) + expect(msg).to match(/Total space required: 600gb/) # 300GB * 2 + expect(msg).to match(/Current PQ usage: 80gb/) # 30GB + (2 * 25GB) + expect(msg).to match(/Current size: 30gb/) # First queue + expect(msg).to match(/Current size: 50gb/) # Second queue (2 files * 25GB) + end + + pq_config_validator.check({}, [pipeline_config1, pipeline_config2]) + end + end + + context "with queues across multiple filesystems" do + it "reports separate information for each filesystem" do + expect(pq_config_validator.logger).to receive(:warn).once do |msg| + # First filesystem + expect(msg).to match(/Filesystem 'disk1':/) + expect(msg).to match(/Total space required: 600gb/) # 300GB * 2 + expect(msg).to match(/Current PQ usage: 80gb/) # 30GB + (2 * 25GB) + + # Second filesystem + expect(msg).to match(/Filesystem 'disk2':/) + expect(msg).to match(/Total space required: 300gb/) # 300GB + expect(msg).to match(/Current PQ usage: 25gb/) # 25GB + end + + pq_config_validator.check({}, [pipeline_config1, pipeline_config2, pipeline_config3]) + end + end end end