From 8729ad0a64d1eeef99156ebeb9b9752d352de751 Mon Sep 17 00:00:00 2001 From: Christina Date: Tue, 3 Dec 2024 12:04:51 +0100 Subject: [PATCH 1/5] rescue if lavinmq starts with faulty msg_store --- src/lavinmq/amqp/queue/message_store.cr | 6 +++++- src/lavinmq/amqp/queue/queue.cr | 1 + 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/src/lavinmq/amqp/queue/message_store.cr b/src/lavinmq/amqp/queue/message_store.cr index 62c0d28ca..16277204f 100644 --- a/src/lavinmq/amqp/queue/message_store.cr +++ b/src/lavinmq/amqp/queue/message_store.cr @@ -347,6 +347,9 @@ module LavinMQ @segments.delete seg next end + rescue ex + Log.error { "Closing message store: invalid SchemaVersion in #{path}" } + close end end file.pos = 4 @@ -377,7 +380,8 @@ module LavinMQ rescue ex : IO::EOFError break rescue ex : OverflowError | AMQ::Protocol::Error::FrameDecode - raise Error.new(mfile, cause: ex) + Log.error { "Closing message store: Failed to read segment #{seg} at pos #{mfile.pos}, #{ex}" } + close end mfile.pos = 4 mfile.unmap # will be mmap on demand diff --git a/src/lavinmq/amqp/queue/queue.cr b/src/lavinmq/amqp/queue/queue.cr index fc1d22161..20319c240 100644 --- a/src/lavinmq/amqp/queue/queue.cr +++ b/src/lavinmq/amqp/queue/queue.cr @@ -802,6 +802,7 @@ module LavinMQ::AMQP expire_msg(sp, :rejected) end rescue ex : MessageStore::Error + @log.error(ex) { "Queue closed due to error" } close raise ex end From fde53329fd19c3f58769ec42118da2af66bb67cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christina=20Dahl=C3=A9n?= <85930202+kickster97@users.noreply.github.com> Date: Wed, 4 Dec 2024 12:08:40 +0100 Subject: [PATCH 2/5] Update src/lavinmq/amqp/queue/message_store.cr MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Jon Börjesson --- src/lavinmq/amqp/queue/message_store.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lavinmq/amqp/queue/message_store.cr b/src/lavinmq/amqp/queue/message_store.cr index 16277204f..e8ac62496 100644 --- a/src/lavinmq/amqp/queue/message_store.cr +++ b/src/lavinmq/amqp/queue/message_store.cr @@ -348,7 +348,7 @@ module LavinMQ next end rescue ex - Log.error { "Closing message store: invalid SchemaVersion in #{path}" } + @log.error { "Closing message store: invalid SchemaVersion in #{path}" } close end end From 2c208673fe3eec2100b308145473a3dd573ffe19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christina=20Dahl=C3=A9n?= <85930202+kickster97@users.noreply.github.com> Date: Wed, 4 Dec 2024 12:08:46 +0100 Subject: [PATCH 3/5] Update src/lavinmq/amqp/queue/message_store.cr MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Jon Börjesson --- src/lavinmq/amqp/queue/message_store.cr | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lavinmq/amqp/queue/message_store.cr b/src/lavinmq/amqp/queue/message_store.cr index e8ac62496..8e9bfe7d5 100644 --- a/src/lavinmq/amqp/queue/message_store.cr +++ b/src/lavinmq/amqp/queue/message_store.cr @@ -380,7 +380,7 @@ module LavinMQ rescue ex : IO::EOFError break rescue ex : OverflowError | AMQ::Protocol::Error::FrameDecode - Log.error { "Closing message store: Failed to read segment #{seg} at pos #{mfile.pos}, #{ex}" } + @log.error { "Closing message store: Failed to read segment #{seg} at pos #{mfile.pos}, #{ex}" } close end mfile.pos = 4 From a5c06cbaaab79a89677865f43f0b33428c77a93d Mon Sep 17 00:00:00 2001 From: Christina Date: Wed, 4 Dec 2024 15:33:23 +0100 Subject: [PATCH 4/5] more descriptive log messages --- src/lavinmq/amqp/queue/message_store.cr | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lavinmq/amqp/queue/message_store.cr b/src/lavinmq/amqp/queue/message_store.cr index 8e9bfe7d5..b76d58612 100644 --- a/src/lavinmq/amqp/queue/message_store.cr +++ b/src/lavinmq/amqp/queue/message_store.cr @@ -348,7 +348,7 @@ module LavinMQ next end rescue ex - @log.error { "Closing message store: invalid SchemaVersion in #{path}" } + @log.error { "Could not initialize segment, closing message store: #{ex.message}" } close end end @@ -380,7 +380,7 @@ module LavinMQ rescue ex : IO::EOFError break rescue ex : OverflowError | AMQ::Protocol::Error::FrameDecode - @log.error { "Closing message store: Failed to read segment #{seg} at pos #{mfile.pos}, #{ex}" } + @log.error { "Could not initialize segment, closing message store: Failed to read segment #{seg} at pos #{mfile.pos}. #{ex}" } close end mfile.pos = 4 From 43fbc5bcb77dfe1bebd24135863ed6f5a917ca90 Mon Sep 17 00:00:00 2001 From: Christina Date: Fri, 6 Dec 2024 10:16:42 +0100 Subject: [PATCH 5/5] if msg store fails to init, we need to close queue on init aswell --- src/lavinmq/amqp/queue/message_store.cr | 1 + src/lavinmq/amqp/queue/queue.cr | 3 +++ 2 files changed, 4 insertions(+) diff --git a/src/lavinmq/amqp/queue/message_store.cr b/src/lavinmq/amqp/queue/message_store.cr index b76d58612..31ab06492 100644 --- a/src/lavinmq/amqp/queue/message_store.cr +++ b/src/lavinmq/amqp/queue/message_store.cr @@ -22,6 +22,7 @@ module LavinMQ @segment_msg_count = Hash(UInt32, UInt32).new(0u32) @requeued = Deque(SegmentPosition).new @closed = false + getter closed getter bytesize = 0u64 getter size = 0u32 getter empty_change = Channel(Bool).new diff --git a/src/lavinmq/amqp/queue/queue.cr b/src/lavinmq/amqp/queue/queue.cr index 20319c240..06e8bae7c 100644 --- a/src/lavinmq/amqp/queue/queue.cr +++ b/src/lavinmq/amqp/queue/queue.cr @@ -138,6 +138,9 @@ module LavinMQ::AMQP File.open(File.join(@data_dir, ".queue"), "w") { |f| f.sync = true; f.print @name } @state = QueueState::Paused if File.exists?(File.join(@data_dir, ".paused")) @msg_store = init_msg_store(@data_dir) + if @msg_store.closed + close + end @empty_change = @msg_store.empty_change handle_arguments spawn queue_expire_loop, name: "Queue#queue_expire_loop #{@vhost.name}/#{@name}" if @expires