SolaceIO.Read: handle occasional cases when finalizeCheckpoint is not executed #32962
cc @ppawel: since I wasn't able to reproduce the issue you reported, would you mind testing it in your environment?
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment
Sorry for the late reply. I haven't had much time for this topic recently, but last week I set up a streaming job in our test environment with your branch and left it running, consuming the same messages as the prod job. This job had the Google Streaming Engine enabled. Today I checked the job and unfortunately there were over 2k messages spooled in the Solace queue. I haven't yet investigated what exactly the checkpointing behavior is with the new branch. Hopefully I will get more time for this again, as we really want to fully switch from JmsIO to SolaceIO, preferably with Streaming Engine enabled.
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/UnboundedSolaceReader.java
```java
// It's possible for a checkpoint to be taken but never finalized.
// So we simply copy whatever safeToAckIds we currently have.
Map<Long, BytesXMLMessage> snapshotSafeToAckMessages = Maps.newHashMap(safeToAckMessages);
return new SolaceCheckpointMark(this::markAsAcked, active, snapshotSafeToAckMessages);
```
The checkpoint mark does not need to be aware of whether the reader is active. The finalizing thread may observe the call to close as first closing the underlying reader and only then atomically setting the active flag to false. Put a try/catch statement around the acknowledgement loop in the checkpoint mark's finalizer: if it trips on an IllegalStateException, then the reader has been closed and none of the queued elements can be acknowledged.
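A minimal sketch of the reviewer's suggestion, for illustration only. It assumes a hypothetical `AckHandle` interface whose `ack()` throws `IllegalStateException` once the underlying session is closed; it stands in for JCSMP's `ackMessage()`, and whether that call throws exactly this exception is an assumption here. `LoopLevelFinalizer` is likewise a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a broker message handle (not a SolaceIO type).
interface AckHandle {
  // Assumed to throw IllegalStateException once the session is closed.
  void ack();
}

class LoopLevelFinalizer {
  // Acknowledges messages in order; returns how many were acknowledged
  // before the loop stopped.
  static int finalizeCheckpoint(List<AckHandle> safeToAck) {
    int acked = 0;
    try {
      for (AckHandle handle : safeToAck) {
        handle.ack();
        acked++;
      }
    } catch (IllegalStateException e) {
      // The reader was closed concurrently, so none of the remaining
      // messages can be acknowledged: stop trying.
    }
    return acked;
  }
}
```

Because the catch wraps the whole loop, the first failure abandons every remaining message, which matches the assumption that a closed reader makes all of them unacknowledgeable.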
Good point, but I added the try/catch only around the ackMessage method call. I think it could fail for reasons other than a closed reader.
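One possible reading of this variant, sketched with hypothetical names (`PerMessageFinalizer`, `ackEach`) and a generic `ackFn` callback in place of `ackMessage()`: the try/catch wraps each individual acknowledgement, so a failure on one message is recorded and the loop moves on to the rest instead of abandoning them all.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class PerMessageFinalizer {
  // Acknowledges each message independently. An IllegalStateException on one
  // message is recorded and does not prevent acknowledging the others.
  static <M> List<M> ackEach(List<M> safeToAck, Consumer<M> ackFn) {
    List<M> failed = new ArrayList<>();
    for (M message : safeToAck) {
      try {
        ackFn.accept(message);
      } catch (IllegalStateException e) {
        failed.add(message);
      }
    }
    return failed;
  }
}
```

The trade-off versus wrapping the whole loop: if the failure really means the session is gone, every remaining call fails too, but if it is specific to one message, the others still get acknowledged.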
sdks/java/io/solace/src/main/java/org/apache/beam/sdk/io/solace/read/SolaceCheckpointMark.java
42a0cbd to 740646d
…tion strategy and close services more eagerly.
740646d to da95027
…cBool from the reader.
fa98b30 to 48520ea
I believe that messages are left unread in the queue because some sessions aren't closed correctly. Solace documentation says that dropped sessions have their unacknowledged messages redelivered.
In the meantime, while we wait for a review, @ppawel would you be able to test this change?
@bzablocki I think this can be simplified by storing the message handles for the last emitted checkpoint mark in the reader and having both the reader and the checkpoint mark attempt to acknowledge those messages. The reader would attempt to do this at some point in
Done. We can consider delegating the ack to an async thread, but that can be added in a future PR.
036dd62 to 2d48c22
.../solace/src/main/java/org/apache/beam/sdk/io/solace/broker/BasicAuthJcsmpSessionService.java
```java
static {
  Duration cacheExpirationTimeout = Duration.ofMinutes(1);
  sessionServiceCache =
      CacheBuilder.newBuilder()
          .expireAfterAccess(cacheExpirationTimeout)
          .removalListener(
              (RemovalNotification<UUID, SessionService> notification) -> {
                LOG.info(
                    "SolaceIO.Read: Closing session for the reader with uuid {} as it has been idle for over {}.",
                    notification.getKey(),
                    cacheExpirationTimeout);
                SessionService sessionService = notification.getValue();
                if (sessionService != null) {
                  sessionService.close();
                }
              })
          .build();

  startCleanUpThread();
}
```
Is this cache still necessary now that message acks are attempted whenever possible? Binding the lifetime of the session to a reader should be fine, since a reader's lifetime is generally quite long.
The problem I have is that I can't reproduce the issue @ppawel was reporting, so I thought I would try this approach in conjunction with acking messages whenever possible.
Also, without this the session was only closed in the UnboundedSolaceReader#close method, which was never called in my experiments: it was not called when I drained or updated a pipeline, or when the advance() method returned false. That's why I implemented this caching with a removal listener, to close any open sessions more eagerly.
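Guava's `CacheBuilder` evicts expired entries during cache modifications, occasional reads, or explicit `Cache.cleanUp()` calls rather than on a timer, which is presumably why the code above also starts a cleanup thread. The sketch below reproduces the same idle-expiry idea using only the JDK, as an illustration rather than the PR's code. `IdleSessionRegistry` and `IdleSession` are hypothetical names; time is passed in explicitly to keep the behavior testable, and `get()` is not coordinated with a concurrent `cleanUp()`, so a caller could receive a session that gets closed right afterwards.

```java
import java.time.Duration;
import java.util.Iterator;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical stand-in for SolaceIO's SessionService.
interface IdleSession {
  void close();
}

class IdleSessionRegistry<S extends IdleSession> {
  // A session plus the time it was last handed out.
  private static final class Slot<T> {
    final T session;
    volatile long lastAccessNanos;

    Slot(T session, long nowNanos) {
      this.session = session;
      this.lastAccessNanos = nowNanos;
    }
  }

  private final Map<UUID, Slot<S>> slots = new ConcurrentHashMap<>();
  private final long maxIdleNanos;

  IdleSessionRegistry(Duration maxIdle) {
    this.maxIdleNanos = maxIdle.toNanos();
  }

  // Returns the reader's session, creating it lazily on first use or after it
  // was closed for being idle; every call refreshes the access time.
  S get(UUID readerId, Supplier<S> factory, long nowNanos) {
    Slot<S> slot = slots.computeIfAbsent(readerId, id -> new Slot<>(factory.get(), nowNanos));
    slot.lastAccessNanos = nowNanos;
    return slot.session;
  }

  // Closes and removes sessions idle for longer than maxIdle and returns how
  // many were closed. Intended to be called periodically.
  int cleanUp(long nowNanos) {
    int closed = 0;
    Iterator<Map.Entry<UUID, Slot<S>>> it = slots.entrySet().iterator();
    while (it.hasNext()) {
      Slot<S> slot = it.next().getValue();
      if (nowNanos - slot.lastAccessNanos > maxIdleNanos) {
        it.remove();
        slot.session.close();
        closed++;
      }
    }
    return closed;
  }
}
```

In a long-running process, `cleanUp(System.nanoTime())` would typically be scheduled on a `ScheduledExecutorService`, with `get(...)` also receiving `System.nanoTime()`. A session that sits unused longer than the idle limit after being created is closed and later recreated on demand.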
I will test it sometime this week in our environment and come back with feedback. Thanks for working on this.
@bzablocki We have this PR running in our test environment and so far it looks suspiciously good ;) What we tested so far is:
So overall it looks good. We will definitely keep testing this version, especially points (1) and (2); we need to make sure those are stable before we replace JmsIO. The only thing I have noticed so far is these logs right at startup yesterday: I'm not sure if it's by design, but it seems strange that there are messages in the queue (see point 1 above) when this new pipeline starts, yet for some reason the connections to Solace need to be reestablished after 1 minute of idle time before it starts consuming.
Thank you @ppawel for testing, great to hear it's looking good! Would you say it's ready to merge?
If it's only at startup, then it is probably because it takes some time from creating an UnboundedSolaceReader (when a client is created for the first time) to the client actually being used. If that takes more than 1 minute (the hard-coded max idle time), the client will be closed, removed, and recreated when needed. I think this is acceptable.
I haven't had time to look at the code, and I'm not a Beam internals expert, so it's hard to say, but as a user it looks good so far. We will be running it through the weekend at least, as the biggest peak in messages is on Saturdays. It's up to you whether you want to wait until next week for more test reports; alternatively, you can merge it and I can open a separate issue if anything comes up in our tests. Just a question regarding releases: is it fair to assume that it's going to be included in 2.62.0?
It will be available in 2.62.0, as long as we merge it before ~Dec 18. Let's see how it behaves over the weekend, and if everything is fine we can merge it next week.
assign set of reviewers
Assigning reviewers. If you would like to opt out of this review, comment R: @Abacn for label java. Available commands:
The PR bot will only process comments in the main thread (not review comments).
Our tests look quite good after the weekend. We are now considering rolling out SolaceIO to prod ASAP due to additional new issues with JmsIO. Do you think it is safe to apply your PR patch on top of Beam 2.61.0 (as we don't want to roll out a 2.62 snapshot version to prod)?
LGTM regarding io. Thanks!
I'm afraid this introduced flakiness in the integration tests and potentially errors in the reader. I will revert this commit and work on a fix ASAP. FYI, the integration tests sometimes crash on this line, as the client can be closed while it reads from the broker.
…t is not executed (apache#32962)" This reverts commit e279e55.
Sorry for that, false alarm, but better safe than sorry. The flakiness was introduced in another PR that I was working on. This looks good. I'll create a new PR to reintroduce these changes.
…nt is not executed (apache#32962)" (apache#33259) This reverts commit 4356253.
Addresses #32596
This PR fixes a bug where messages get stuck on the Solace queue because the checkpoint's finalize method, responsible for acknowledging message processing, isn't always called.
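The fix hinges on acknowledging checkpointed messages from more than one place, so that a skipped `finalizeCheckpoint()` call no longer leaves them on the queue. The following is a minimal, framework-free sketch of that idea, mirroring the `receivedMessages` and `safeToAckMessages` lists this PR describes. It assumes a generic message type `M` and an `ackFn` callback in place of JCSMP's `ackMessage()`; `SharedAckState` and `CheckpointMarkSketch` are hypothetical names, not SolaceIO classes.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical reader-side state shared with every checkpoint mark.
class SharedAckState<M> {
  private final List<M> receivedMessages = new ArrayList<>();
  private final List<M> safeToAckMessages = new ArrayList<>();
  private final Consumer<M> ackFn;

  SharedAckState(Consumer<M> ackFn) {
    this.ackFn = ackFn;
  }

  // Called for each message the reader emits.
  synchronized void onReceived(M message) {
    receivedMessages.add(message);
  }

  // Models getCheckpointMark(): received messages become safe to ack, and the
  // mark keeps a reference to this shared state rather than a copy.
  synchronized CheckpointMarkSketch checkpoint() {
    safeToAckMessages.addAll(receivedMessages);
    receivedMessages.clear();
    return new CheckpointMarkSketch(this);
  }

  // Called from finalizeCheckpoint(), advance() and close() alike: whichever
  // runs first acknowledges the pending messages; later callers find none.
  synchronized int ackPending() {
    int acked = 0;
    Iterator<M> it = safeToAckMessages.iterator();
    while (it.hasNext()) {
      ackFn.accept(it.next());
      it.remove(); // drop only after a successful ack
      acked++;
    }
    return acked;
  }
}

class CheckpointMarkSketch {
  private final SharedAckState<?> state;

  CheckpointMarkSketch(SharedAckState<?> state) {
    this.state = state;
  }

  void finalizeCheckpoint() {
    state.ackPending();
  }
}
```

Because every mark points at the same list, it no longer matters whether the runner calls `finalizeCheckpoint()`: the reader's next `advance()` or its `close()` drains the same pending messages.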
Revisited approach (deprecated approach below):
- Received messages are stored in the `receivedMessages` list.
- In the `getCheckpointMark()` method, all the `receivedMessages` are moved to the `safeToAckMessages` list. The `safeToAckMessages` list is passed by reference to the `CheckpointMark` object.
- Every `CheckpointMark` object created from a given Reader has a reference to the same `safeToAckMessages` list. When the `finalizeCheckpoint()` method is called, it acknowledges all messages available for acknowledgement.
- Acknowledgement of `safeToAckMessages` also happens in the `advance()` and `close()` methods. This means that once the `getCheckpointMark()` method marks messages as ready for checkpoint, they can be acknowledged in any of 3 places: `finalizeCheckpoint()`, `advance()` and `close()`.

Deprecated approach:
The approach is similar to the one in PubSubIO. The flow there is the following: