Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

azservicebus Receiver can improperly hold onto message indefinitely #23893

Open
patrickwhite256 opened this issue Dec 27, 2024 · 3 comments
Open
Assignees
Labels
Client This issue points to a problem in the data-plane of the library. customer-reported Issues that are reported by GitHub users external to the Azure organization. needs-team-attention Workflow: This issue needs attention from Azure service team or SDK team question The issue doesn't require a change to the product in order to be resolved. Most issues start as that Service Attention Workflow: This issue is responsible by Azure service team. Service Bus

Comments

@patrickwhite256
Copy link

patrickwhite256 commented Dec 27, 2024

Bug Report

  • Import Path: github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus
  • SDK version: tested using 63cbd1af5450e56dbdd824fe82bd4af4835d8e40
  • go version: go version go1.23.1 darwin/arm64

What happened?

I was observing that occasionally, when receiving messages, a small percentage of messages would occasionally be in "limbo": the SDK had not returned them to the caller, but they could not be received as servicebus reported them as having been received. After the message delivery lock expired, the server would redeliver them and they would be successfully received. I suspected that the SDK was receiving them but not correctly returning them to the client.

What did you expect or want to happen

I expect that all messages received by the SDK are returned to the client in a timely fashion.

How can we reproduce it?

My reproduction has two components:

  • a program that emits 3000 messages onto a topic (which are filtered onto a specific subscription)
  • a program that receives messages, starting a timer when the first message is received, and waiting until five seconds have passed with no messages, outputs the number of messages received, then completes (acknowledges) all the messages. This program receives messages in a loop as recommended.
Producer Code
package main

import (
        "context"
        "fmt"
        "log"

        "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
        "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
        "golang.org/x/sync/errgroup"
)

const (
        sbNamespace = "<OMITTED>" // requires a namespace
        sbTopicName = "<OMITTED>" // requires a topic

        messageCount = 3000
)

func createSender() *azservicebus.Sender {
        azCred, err := azidentity.NewDefaultAzureCredential(nil)
        if err != nil {
                log.Fatalf("failed to create AZ identity: %v", err)
        }

        busFQDN := fmt.Sprintf("%s.servicebus.windows.net", sbNamespace)
        busClient, err := azservicebus.NewClient(busFQDN, azCred, nil)
        if err != nil {
                log.Fatalf("failed to create bus client: %v", err)
        }

        sender, err := busClient.NewSender(sbTopicName, nil)
        if err != nil {
                log.Fatalf("failed to init sender: %v", err)
        }

        return sender
}

func main() {
        sender := createSender()

        log.Printf("Initialized service bus, publishing messages...")

        eg, ctx := errgroup.WithContext(context.Background())
        for range messageCount {
                eg.Go(func() error {
                        return sender.SendMessage(ctx, &azservicebus.Message{
                                Body:                  []byte(`{"message": "hello"}`),
                                ApplicationProperties: map[string]any{"my_filter": "my_filter_value"},
                        }, nil)
                })
        }
        if err := eg.Wait(); err != nil {
                log.Fatalf("failed to send message: %v", err)
        }

        log.Printf("Done!")
}
Consumer Code
package main

import (
        "context"
        "fmt"
        "log"
        "sync"
        "time"

        azlog "github.com/Azure/azure-sdk-for-go/sdk/azcore/log"
        "github.com/Azure/azure-sdk-for-go/sdk/azidentity"
        "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

const (
        sbNamespace        = "<OMITTED>" // requires a namespace
        sbTopicName        = "<OMITTED>" // requires a topic
        sbSubscriptionName = "<OMITTED>" // requires a subscription

        receiveBatchSize = 100
)

func enableAzureVerboseLogging() {
        azlog.SetListener(func(event azlog.Event, s string) {
                fmt.Printf("[%s] %s\n", event, s)
        })

        azlog.SetEvents(
                azservicebus.EventConn,
                azservicebus.EventAuth,
                azservicebus.EventReceiver,
                azservicebus.EventSender,
                azservicebus.EventAdmin,
        )
}

// receives messages and collects them until five seconds have passed with no messages, then ships that batch off to batchChan
func batchMessages(msgChan chan *azservicebus.ReceivedMessage, batchChan chan []*azservicebus.ReceivedMessage) {
        batch := []*azservicebus.ReceivedMessage{}
        timer := time.NewTimer(5 * time.Second)
        timer.Stop()
        for {
                select {
                case <-timer.C:
                        log.Printf("No messages received for 5 seconds; received %d messages", len(batch))
                        batchChan <- batch
                        batch = []*azservicebus.ReceivedMessage{}
                case msg, ok := <-msgChan:
                        if !ok {
                                return
                        }
                        if len(batch) == 0 {
                                log.Printf("Started receiving message batch...")
                        }
                        batch = append(batch, msg)

                        timer.Stop()
                        timer.Reset(5 * time.Second)
                }
        }
}

// receive messages from servicebus and push them onto msgChan.
func receiveMessages(msgChan chan *azservicebus.ReceivedMessage, receiver *azservicebus.Receiver) {
        for {
                messages, err := receiver.ReceiveMessages(context.Background(), receiveBatchSize, nil)
                if err != nil {
                        log.Printf("error receiving SB messages: %v", err)
                        return
                }

                for _, message := range messages {
                        msgChan <- message
                }
        }
}

// acks messages in batches.
func ackMessages(batchChan chan []*azservicebus.ReceivedMessage, receiver *azservicebus.Receiver) {
        for batch := range batchChan {
                messageCount := len(batch)
                log.Printf("starting to ack %d messages", messageCount)
                wg := sync.WaitGroup{}
                wg.Add(messageCount)
                for _, msg := range batch {
                        go func() {
                                if err := receiver.CompleteMessage(context.Background(), msg, nil); err != nil {
                                        log.Printf("failed to ack msg: %v", err)
                                }
                                wg.Done()
                        }()
                }
                wg.Wait()
                log.Printf("finished acking messages")
        }
}

func createReceiver() *azservicebus.Receiver {
        azCred, err := azidentity.NewDefaultAzureCredential(nil)
        if err != nil {
                log.Fatalf("failed to create AZ identity: %v", err)
        }

        busFQDN := fmt.Sprintf("%s.servicebus.windows.net", sbNamespace)
        busClient, err := azservicebus.NewClient(busFQDN, azCred, nil)
        if err != nil {
                log.Fatalf("failed to create bus client: %v", err)
        }

        receiver, err := busClient.NewReceiverForSubscription(sbTopicName, sbSubscriptionName, nil)
        if err != nil {
                log.Fatalf("failed to init receiver: %v", err)
        }

        return receiver
}

func main() {
        receiver := createReceiver()

        log.Printf("Initialized service bus, receiving messages...")

        // enableAzureVerboseLogging()

        msgChan := make(chan *azservicebus.ReceivedMessage)
        batchChan := make(chan []*azservicebus.ReceivedMessage)

        go receiveMessages(msgChan, receiver)
        go batchMessages(msgChan, batchChan)
        ackMessages(batchChan, receiver)
}

It's worth noting that in my testing, the behaviour appears inconsistently, maybe 20-30% of the time.

Running the consumer and running the producer separately (once the consumer had finished processing the previous batch) produced the following output in one test:

2024/12/27 11:50:09 Initialized service bus, receiving messages...
2024/12/27 11:50:17 Started receiving message batch...
2024/12/27 11:50:27 No messages received for 5 seconds; received 3000 messages
2024/12/27 11:50:27 starting to ack 3000 messages
2024/12/27 11:50:28 finished acking messages
2024/12/27 11:51:30 Started receiving message batch...
2024/12/27 11:51:40 No messages received for 5 seconds; received 3000 messages
2024/12/27 11:51:40 starting to ack 3000 messages
2024/12/27 11:51:41 finished acking messages
2024/12/27 11:51:45 Started receiving message batch...
2024/12/27 11:51:56 No messages received for 5 seconds; received 2998 messages
2024/12/27 11:51:56 starting to ack 2998 messages
2024/12/27 11:51:56 finished acking messages
2024/12/27 11:52:50 Started receiving message batch...
2024/12/27 11:52:56 No messages received for 5 seconds; received 2 messages
2024/12/27 11:52:56 starting to ack 2 messages
2024/12/27 11:52:56 finished acking messages

You can see that this usually works as expected: the full batch of 3000 is received and acknowledged. However, on the third batch, only 2998 messages were received, and they weren't received until a minute later, when the message lock expires and the server redelivers the messages.

Why does this happen?

I created a local copy of the SDK and started investigating. I believe this error is due to an error in newReleaserFunc:

ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
released := 0
// this func gets called when a new ReceiveMessages() starts
r.cancelReleaser.Store(func() string {
cancel()
<-done
return receiver.LinkName()
})
return func() {
defer close(done)
for {
// we might not have all the messages we need here.
msg, err := receiver.Receive(ctx, nil)
if err == nil {
err = receiver.ReleaseMessage(ctx, msg)
}

The message is pulled from the internal Receiver's queue successfully, but the context is cancelled by the next call to ReceiveMessages() after the message is received, so the ReleaseMessage() call fails with a context canceled error, and the message is never released.

I confirmed this by adding the following after that call:

if err != nil {
    log.Printf("tried to release message, got err: %v", err)
}

I then observed the following output:

2024/12/27 12:15:13 Initialized service bus, receiving messages...
2024/12/27 12:15:21 Started receiving message batch...
2024/12/27 12:15:29 No messages received for 5 seconds; received 3000 messages
2024/12/27 12:15:29 starting to ack 3000 messages
2024/12/27 12:15:30 finished acking messages
2024/12/27 12:15:35 Started receiving message batch...
2024/12/27 12:15:46 No messages received for 5 seconds; received 3000 messages
2024/12/27 12:15:46 starting to ack 3000 messages
2024/12/27 12:15:46 finished acking messages
2024/12/27 12:15:57 Started receiving message batch...
2024/12/27 12:16:01 tried to release message, got err: context canceled
2024/12/27 12:16:07 No messages received for 5 seconds; received 2999 messages
2024/12/27 12:16:07 starting to ack 2999 messages
2024/12/27 12:16:08 finished acking messages

I'm not sure what the fix for this is - perhaps the cancelReleaser function should return an additional (optional) value of a message that was received but unsuccessfully deleted? I tried the simple fix of replacing the ReleaseMessages() context with context.Background() (using a different error so the error doesn't bleed into the err that controls exiting the function) but unsurprisingly that made the receive loop quite slow as the release couldn't be cancelled.

@github-actions github-actions bot added Client This issue points to a problem in the data-plane of the library. customer-reported Issues that are reported by GitHub users external to the Azure organization. needs-team-attention Workflow: This issue needs attention from Azure service team or SDK team question The issue doesn't require a change to the product in order to be resolved. Most issues start as that Service Attention Workflow: This issue is responsible by Azure service team. Service Bus labels Dec 27, 2024
Copy link

Thanks for the feedback! We are routing this to the appropriate team for follow-up. cc @EldertGrootenboer.

@richardpark-msft
Copy link
Member

@patrickwhite256, thank you for the detailed bug report! I'll take a look - your reasoning looks sound to me. I'll figure out a way to wrangle this - it shouldn't be too difficult.

@richardpark-msft
Copy link
Member

@karenychen, this might be related to the bug you were seeing as well. The message won't be held indefinitely, but it will be forced to go through the entire lock expiration interval before it'd be available again.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Client This issue points to a problem in the data-plane of the library. customer-reported Issues that are reported by GitHub users external to the Azure organization. needs-team-attention Workflow: This issue needs attention from Azure service team or SDK team question The issue doesn't require a change to the product in order to be resolved. Most issues start as that Service Attention Workflow: This issue is responsible by Azure service team. Service Bus
Projects
None yet
Development

No branches or pull requests

2 participants