Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pulsar connector produces message asynchronously #22026

Closed
wants to merge 1 commit into from

Conversation

nlu90
Copy link
Member

@nlu90 nlu90 commented Jun 23, 2022

Please add a meaningful description for your change here


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests

See CI.md for more information about GitHub Actions CI.

@asf-ci
Copy link

asf-ci commented Jun 23, 2022

Can one of the admins verify this patch?

4 similar comments
@asf-ci
Copy link

asf-ci commented Jun 23, 2022

Can one of the admins verify this patch?

@asf-ci
Copy link

asf-ci commented Jun 23, 2022

Can one of the admins verify this patch?

@asf-ci
Copy link

asf-ci commented Jun 23, 2022

Can one of the admins verify this patch?

@asf-ci
Copy link

asf-ci commented Jun 23, 2022

Can one of the admins verify this patch?

@@ -45,12 +54,48 @@ public void setup() throws PulsarClientException {

@ProcessElement
public void processElement(@Element byte[] messageToSend) throws Exception {
producer.send(messageToSend);
producer.sendAsync(messageToSend)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any concern of data loss here? Typically Beam is supposed to handle any threading or async itself. I'm thinking of the case where async is delayed, and Beam thinks a given element is completed, but it hasn't actually been sent to pulsar.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with the concerns here, I've seen very troublesome async writers in Beam that were prone to loosing data. I suppose in this case it depends on the behavior of producer.flush() in finishBundle(). Does it block until all pending messages are send? If so, I'd suggest to add some comments to clarify.

@mosche
Copy link
Member

mosche commented Jun 27, 2022

Wondering, is there a lack of test coverage if you could change the producer from sync to async without having to change tests?

@github-actions
Copy link
Contributor

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.

@github-actions github-actions bot added the stale label Aug 27, 2022
@github-actions
Copy link
Contributor

github-actions bot commented Sep 5, 2022

This pull request has been closed due to lack of activity. If you think that is incorrect, or the pull request requires review, you can revive the PR at any time.

@hpvd
Copy link

hpvd commented Apr 24, 2024

added this to [Parent issue] Support for Apache Pulsar #31078

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants