enhance: implement kafka for wal #38598
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@             Coverage Diff              @@
##           master   #38598       +/-   ##
===========================================
+ Coverage   69.33%   81.08%   +11.74%
===========================================
  Files         292     1381     +1089
  Lines       26187   195226   +169039
===========================================
+ Hits        18158   158307   +140149
- Misses       8029    31338    +23309
- Partials        0     5581     +5581
2123f40 to 97b3bd4
@chyezh E2e jenkins job failed, comment |
312b193 to 4fbc9b8
// and there's no commit operations.
consumerConfig := cloneKafkaConfig(w.consumerConfig)
consumerConfig.SetKey("group.id", opt.Name)
switch opt.DeliverPolicy.GetPolicy().(type) {
Remove unnecessary code snippets.
}

// getProducerAndConsumerConfig returns the producer and consumer config.
func (b *builderImpl) getProducerAndConsumerConfig() (producerConfig kafka.ConfigMap, consumerConfig kafka.ConfigMap) {
Maintain single responsibility in methods by separating them into getProducerConfig and getConsumerConfig.
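The split suggested above could look roughly like the following sketch. It is not the PR's code: `configMap` is a hypothetical stand-in for `kafka.ConfigMap` so that the example compiles without the Kafka client, the `builder` type and its fields are assumptions, and the option values are illustrative only.

```go
package main

import "fmt"

// configMap is a hypothetical stand-in for kafka.ConfigMap.
type configMap map[string]any

// builder is a hypothetical stand-in for builderImpl; its fields are assumptions.
type builder struct {
	brokers       string
	producerExtra configMap
	consumerExtra configMap
}

// baseConfig returns a fresh map with the settings shared by both roles.
func (b *builder) baseConfig() configMap {
	return configMap{"bootstrap.servers": b.brokers}
}

// merge copies every entry of src into dst and returns dst.
func merge(dst, src configMap) configMap {
	for k, v := range src {
		dst[k] = v
	}
	return dst
}

// getProducerConfig builds only the producer configuration.
func (b *builder) getProducerConfig() configMap {
	return merge(b.baseConfig(), b.producerExtra)
}

// getConsumerConfig builds only the consumer configuration.
func (b *builder) getConsumerConfig() configMap {
	return merge(b.baseConfig(), b.consumerExtra)
}

func main() {
	b := &builder{
		brokers:       "localhost:9092", // illustrative address
		producerExtra: configMap{"acks": "all"},
		consumerExtra: configMap{"auto.offset.reset": "earliest"},
	}
	fmt.Println(b.getProducerConfig())
	fmt.Println(b.getConsumerConfig())
}
```

Each getter starts from its own copy of the shared settings, so a producer-only option never leaks into the consumer configuration, and each function can be tested on its own.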
}
default:
	// ignore other events
	log.Info("kafka producer incoming non-message, non-error event", zap.String("event", ev.String()))
Use the debug log level.
	return
}
if c, ok := err.(kafka.Error); ok && c.Code() == kafka.ErrTimedOut {
	continue
We should always retry when the context error is not nil and consider adding a log if the error occurs.
The retry operation is performed by the scanner adaptor layer and the consumer layer, the same as in the Pulsar scanner.
The current scanner adaptor layer implementation is flawed and will be fixed in the next PR.
Signed-off-by: chyezh <[email protected]>
4fbc9b8 to 8eb9821
[APPROVALNOTIFIER] This PR is APPROVED
Approval requirements bypassed by manually added approval.
This pull-request has been approved by: chyezh
The full list of commands accepted by this bot can be found here. The pull request process is described here.
/lgtm
issue: milvus-io#38399
Signed-off-by: chyezh <[email protected]>
issue: #38399