Skip to content

Commit

Permalink
KafkaSinkCluster improve send error messages (#1751)
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Sep 17, 2024
1 parent 4cfa26c commit a39c43c
Showing 1 changed file with 11 additions and 4 deletions.
15 changes: 11 additions & 4 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,9 @@ impl Transform for KafkaSinkCluster {
self.route_requests(std::mem::take(&mut chain_state.requests))
.await
.context("Failed to route requests")?;
self.send_requests().await?;
self.send_requests()
.await
.context("Failed to send requests")?;
self.recv_responses()
.await
.context("Failed to receive responses")?
Expand Down Expand Up @@ -1108,11 +1110,14 @@ impl KafkaSinkCluster {
body: RequestBody::FindCoordinator(
FindCoordinatorRequest::default()
.with_key_type(0)
.with_key(group.0),
.with_key(group.0.clone()),
),
}));

let mut response = self.control_send_receive(request).await?;
let mut response = self
.control_send_receive(request)
.await
.with_context(|| format!("Failed to query for coordinator of group {:?}", group.0))?;
match response.frame() {
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::FindCoordinator(coordinator),
Expand Down Expand Up @@ -1162,7 +1167,9 @@ impl KafkaSinkCluster {
),
}));

self.control_send_receive(request).await
self.control_send_receive(request)
.await
.context("Failed to query metadata of topics")
}

/// Convert all PendingRequestTy::Routed into PendingRequestTy::Sent
Expand Down

0 comments on commit a39c43c

Please sign in to comment.