KafkaSinkCluster split DeleteGroups request #1785

Merged
merged 1 commit into from
Oct 28, 2024


@rukai rukai commented Oct 27, 2024

Testing

The integration test demonstrates an issue without the code changes, but only with the java driver. The test still passes, so you have to inspect the messages to see that there is an issue.

The java driver retries the failed portion of the request (here some_group1, which comes back with error_code 16, Kafka's NOT_COORDINATOR), leading to the following exchange:

shotover   23:27:06.525234Z  INFO connection{id=112 source="kafka"}: shotover::transforms::debug::printer: Request: Kafka version:2 correlation_id:4 client_id:"adminclient-3" DeleteGroups(DeleteGroupsRequest { groups_names: ["consumer_group_with_offsets", "some_group", "some_group1"], unknown_tagged_fields: {} })
shotover   23:27:06.529449Z  INFO connection{id=112 source="kafka"}: shotover::transforms::debug::printer: Response: Kafka version:2 correlation_id:4 DeleteGroups(DeleteGroupsResponse { throttle_time_ms: 0, results: [DeletableGroupResult { group_id: "some_group", error_code: 0, unknown_tagged_fields: {} }, DeletableGroupResult { group_id: "consumer_group_with_offsets", error_code: 0, unknown_tagged_fields: {} }, DeletableGroupResult { group_id: "some_group1", error_code: 16, unknown_tagged_fields: {} }], unknown_tagged_fields: {} })
shotover   23:27:06.530011Z  INFO connection{id=112 source="kafka"}: shotover::transforms::debug::printer: Request: Kafka version:4 correlation_id:5 client_id:"adminclient-3" FindCoordinator(FindCoordinatorRequest { key: "", key_type: 0, coordinator_keys: ["some_group1"], unknown_tagged_fields: {} })
shotover   23:27:06.530946Z  INFO connection{id=112 source="kafka"}: shotover::transforms::debug::printer: Response: Kafka version:4 correlation_id:5 FindCoordinator(FindCoordinatorResponse { throttle_time_ms: 0, error_code: 0, error_message: Some(""), node_id: 0, host: "", port: 0, coordinators: [Coordinator { key: "some_group1", node_id: 0, host: "127.0.0.1", port: 9192, error_code: 0, error_message: Some(""), unknown_tagged_fields: {} }], unknown_tagged_fields: {} })
shotover   23:27:06.617472Z  INFO connection{id=112 source="kafka"}: shotover::transforms::debug::printer: Request: Kafka version:2 correlation_id:6 client_id:"adminclient-3" DeleteGroups(DeleteGroupsRequest { groups_names: ["some_group1"], unknown_tagged_fields: {} })
shotover   23:27:06.621017Z  INFO connection{id=112 source="kafka"}: shotover::transforms::debug::printer: Response: Kafka version:2 correlation_id:6 DeleteGroups(DeleteGroupsResponse { throttle_time_ms: 0, results: [DeletableGroupResult { group_id: "some_group1", error_code: 0, unknown_tagged_fields: {} }], unknown_tagged_fields: {} })

While the operation eventually succeeds and the integration test still passes, this is still an issue because the retries:

  • increase load on shotover and kafka
  • increase the overall latency of the operation
  • might not be handled by other drivers as well as they are by the java driver

The cpp driver, on the other hand, does not reproduce the issue at all.
Instead it sends each group id to delete in its own request, so shotover never needs to split anything.

shotover   23:37:41.972850Z  INFO connection{id=64 source="kafka"}: shotover::transforms::debug::printer: Request: Kafka version:1 correlation_id:7 client_id:"rdkafka" DeleteGroups(DeleteGroupsRequest { groups_names: ["some_group1"], unknown_tagged_fields: {} })
shotover   23:37:41.977434Z  INFO connection{id=64 source="kafka"}: shotover::transforms::debug::printer: Response: Kafka version:1 correlation_id:6 DeleteGroups(DeleteGroupsResponse { throttle_time_ms: 0, results: [DeletableGroupResult { group_id: "some_group", error_code: 0, unknown_tagged_fields: {} }], unknown_tagged_fields: {} })
shotover   23:37:41.977678Z  INFO connection{id=64 source="kafka"}: shotover::transforms::debug::printer: Request: Kafka version:1 correlation_id:8 client_id:"rdkafka" DeleteGroups(DeleteGroupsRequest { groups_names: ["consumer_group_with_offsets"], unknown_tagged_fields: {} })
shotover   23:37:41.978788Z  INFO connection{id=64 source="kafka"}: shotover::transforms::debug::printer: Response: Kafka version:1 correlation_id:7 DeleteGroups(DeleteGroupsResponse { throttle_time_ms: 0, results: [DeletableGroupResult { group_id: "some_group1", error_code: 0, unknown_tagged_fields: {} }], unknown_tagged_fields: {} })
shotover   23:37:41.978964Z  INFO connection{id=64 source="kafka"}: shotover::transforms::debug::printer: Response: Kafka version:1 correlation_id:8 DeleteGroups(DeleteGroupsResponse { throttle_time_ms: 0, results: [DeletableGroupResult { group_id: "consumer_group_with_offsets", error_code: 0, unknown_tagged_fields: {} }], unknown_tagged_fields: {} })

Implementation

Since the DeleteGroups message is very simple, the split/combine implementation is much simpler than for message types like fetch and produce.

We just split the list of group ids to delete across different requests by their destination broker, and then recombine all the responses into a single response.
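
The split and combine steps described above can be sketched roughly as follows. This is a minimal illustration, not the code from this PR: `BrokerId`, `GroupResult`, `split_delete_groups`, `combine_delete_groups` and the `coordinator_of` lookup are all hypothetical names, and the real implementation works on shotover's own message types.

```rust
use std::collections::HashMap;

// Hypothetical stand-in for a Kafka broker id; not shotover's actual type.
type BrokerId = i32;

/// Hypothetical per-group result, mirroring the `group_id` and `error_code`
/// fields visible in the logged DeleteGroups responses.
#[derive(Debug, Clone, PartialEq)]
struct GroupResult {
    group_id: String,
    error_code: i16,
}

/// Split the requested group ids into one list per destination broker.
/// `coordinator_of` is an assumed lookup from group id to coordinator broker.
fn split_delete_groups(
    group_ids: &[String],
    coordinator_of: impl Fn(&str) -> BrokerId,
) -> HashMap<BrokerId, Vec<String>> {
    let mut per_broker: HashMap<BrokerId, Vec<String>> = HashMap::new();
    for group_id in group_ids {
        per_broker
            .entry(coordinator_of(group_id.as_str()))
            .or_default()
            .push(group_id.clone());
    }
    per_broker
}

/// Recombine the per-broker results into a single result list,
/// as if one broker had answered the whole request.
fn combine_delete_groups(responses: Vec<Vec<GroupResult>>) -> Vec<GroupResult> {
    responses.into_iter().flatten().collect()
}
```

With the three groups from the java driver log, and assuming some_group1 is coordinated by a different broker than the other two, the split would produce two requests and the combine step would return a single list with three results.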

Misc fix in connection/kafka/cpp.rs

A bug slipped through in #1784 where the cpp delete_groups method had a hardcoded list of groups to delete. The fix was to simply use the to_delete list argument instead.

@rukai rukai force-pushed the delete_groups_split branch from 54e7324 to ad7b120 Compare October 27, 2024 23:40

codspeed-hq bot commented Oct 27, 2024

CodSpeed Performance Report

Merging #1785 will not alter performance

Comparing rukai:delete_groups_split (cf7bf08) with main (2e91cfb)

Summary

✅ 38 untouched benchmarks

@rukai rukai force-pushed the delete_groups_split branch from ad7b120 to cf7bf08 Compare October 28, 2024 00:10
@rukai rukai marked this pull request as ready for review October 28, 2024 00:11
@rukai rukai merged commit 1b52506 into shotover:main Oct 28, 2024
41 checks passed