Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Commit after all connected multicasted observables are finished processing a record #42

Open
yeenow123 opened this issue Oct 12, 2018 · 4 comments

Comments

@yeenow123
Copy link

yeenow123 commented Oct 12, 2018

I am creating multiple observables (per Kafka topic)

val kafkaConsumerConfig = KafkaConsumerConfig.default.copy(
      bootstrapServers = kConfig.bootstrapServers,
      groupId = kConfig.groupId,
      enableAutoCommit = false,
      observableCommitOrder = ObservableCommitOrder.AfterAck,
      observableCommitType = ObservableCommitType.Sync,
      autoOffsetReset = AutoOffsetReset.Latest
    )

val observable = Observable.merge(topics.map {
    provider => KafkaStream
      .creatConsumer(provider)
  } : _*)

val multiCast = observable.multicast(Pipe.publish[ConsumerRecord[A, B]])

... // multiple subscribers subscribe separately to the multiCast

multicast.connect()

It doesn't seem like the commit back to Kafka is issued after all subscribers are finished processing (I am using a combination of .subscribe() and .foreach to subscribe. It seems to commit after any of them complete. Is this possible or am I doing anything incorrectly?

@yeenow123
Copy link
Author

yeenow123 commented Oct 12, 2018

The issue seems to stem from Observable.merge. I tried Observable.concat and it seems to give me the functionality I need. I guess the ordering semantics between the two operators are different, one being unorder vs ordered.

EDIT: Spoke too soon.. when I use .concat only one one of the observables seem to actually start up

@Avasil
Copy link
Collaborator

Avasil commented Oct 12, 2018

@yeenow123

The issue seems to stem from Observable.merge. I tried Observable.concat and it seems to give me the functionality I need. I guess the ordering semantics between the two operators are different, one being unorder vs ordered.

That's correct, concat will wait for the first one to complete before starting the second one and merge will send elements as they come with time.

How do you commit / use kafkaConsumerConfig ? I dont recognize KafkaStream.creatConsumer(provider)

@yeenow123
Copy link
Author

@Avasil

I copied partial code and made some typos changing names. The kafkaConsumerConfig is passed into the KafkaConsumerObservable constructor.

As with my EDIT on my previous comment, concat only seems to start the first observable from the list of observables passed into concat

@Avasil
Copy link
Collaborator

Avasil commented Oct 12, 2018

Probably the first Obsevable never ends, I'll see if I can reproduce your issue

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants