Merge branch 'main' into allocate_producer_ids
conorbros authored Nov 11, 2024
2 parents 4e3526d + 157f199 commit 3204a62
Showing 11 changed files with 873 additions and 242 deletions.
672 changes: 466 additions & 206 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions deny.toml
@@ -72,6 +72,7 @@ unlicensed = "deny"
allow = [
"MIT",
"ISC",
"Unicode-3.0",
"Apache-2.0",
"BSD-3-Clause",
"BSD-2-Clause",
2 changes: 1 addition & 1 deletion ec2-cargo/Cargo.toml
@@ -13,6 +13,6 @@ clap.workspace = true
tracing-subscriber.workspace = true
aws-throwaway.workspace = true
tracing-appender.workspace = true
shellfish = { version = "0.9.0", features = ["async"] }
shellfish = { version = "0.10.0", features = ["async"] }
cargo_metadata = "0.18.0"
shell-quote.workspace = true
117 changes: 112 additions & 5 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -133,13 +133,59 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
async fn admin_cleanup(connection_builder: &KafkaConnectionBuilder) {
let admin = connection_builder.connect_admin().await;

-admin
-.delete_groups(&["some_group", "some_group1", "consumer_group_with_offsets"])
-.await;
-delete_records(&admin, connection_builder).await;
+admin.delete_groups(&["some_group", "some_group1"]).await;
+delete_records_partitions1(&admin, connection_builder).await;
+delete_records_partitions3(&admin, connection_builder).await;
}

async fn delete_offsets(connection_builder: &KafkaConnectionBuilder) {
let admin = connection_builder.connect_admin().await;

// Only supported by java driver
#[allow(irrefutable_let_patterns)]
if let KafkaConnectionBuilder::Java(_) = connection_builder {
// assert offset exists
let result = admin
.list_consumer_group_offsets("consumer_group_with_offsets".to_owned())
.await;
let expected_result: HashMap<_, HashMap<TopicPartition, OffsetAndMetadata>> =
HashMap::from([(
"consumer_group_with_offsets".to_owned(),
HashMap::from([(
TopicPartition {
topic_name: "partitions1_with_offset".to_owned(),
partition: 0,
},
OffsetAndMetadata { offset: 2 },
)]),
)]);
assert_eq!(result, expected_result);

// delete offset
admin
.delete_consumer_group_offsets(
"consumer_group_with_offsets".to_owned(),
&[TopicPartition {
topic_name: "partitions1_with_offset".to_owned(),
partition: 0,
}],
)
.await;

// assert offset is deleted
let result = admin
.list_consumer_group_offsets("consumer_group_with_offsets".to_owned())
.await;
let expected_result: HashMap<_, HashMap<TopicPartition, OffsetAndMetadata>> =
HashMap::from([("consumer_group_with_offsets".to_owned(), HashMap::new())]);
assert_eq!(result, expected_result);
}
}
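The #[allow(irrefutable_let_patterns)] attribute used in these java-driver-only helpers silences the lint that fires when an if let pattern can never fail, which happens when the matched enum has a single variant — presumably the case for KafkaConnectionBuilder in some build configurations. A minimal standalone sketch of that situation, using a stand-in enum rather than the real connection builder:

    // Stand-in for a connection-builder enum that, in some build configurations,
    // ends up with a single variant. With one variant the `if let` below always
    // matches, so rustc warns via irrefutable_let_patterns unless it is allowed.
    enum ConnectionBuilder {
        Java(String),
    }

    fn main() {
        let builder = ConnectionBuilder::Java("localhost:9092".to_owned());

        // Without this attribute the compiler emits: "irrefutable `if let` pattern".
        #[allow(irrefutable_let_patterns)]
        if let ConnectionBuilder::Java(address) = &builder {
            println!("running a java-driver-only test against {address}");
        }
    }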

-async fn delete_records(admin: &KafkaAdmin, connection_builder: &KafkaConnectionBuilder) {
+async fn delete_records_partitions1(
+admin: &KafkaAdmin,
+connection_builder: &KafkaConnectionBuilder,
+) {
// Only supported by java driver
#[allow(irrefutable_let_patterns)]
if let KafkaConnectionBuilder::Java(_) = connection_builder {
@@ -183,6 +229,64 @@ async fn delete_records(admin: &KafkaAdmin, connection_builder: &KafkaConnection
}
}

async fn delete_records_partitions3(
admin: &KafkaAdmin,
connection_builder: &KafkaConnectionBuilder,
) {
// Only supported by java driver
#[allow(irrefutable_let_patterns)]
if let KafkaConnectionBuilder::Java(_) = connection_builder {
// assert partition contains a record
let mut consumer = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topics(vec!["partitions3_case1".to_owned()])
.with_group("test_delete_records"),
)
.await;

// assert that a record exists; due to cross-partition ordering we don't know which record it is, just that one exists.
consumer.consume(Duration::from_secs(30)).await;

// delete all records in the partition
admin
.delete_records(&[
RecordsToDelete {
topic_partition: TopicPartition {
topic_name: "partitions3_case1".to_owned(),
partition: 0,
},
delete_before_offset: -1,
},
RecordsToDelete {
topic_partition: TopicPartition {
topic_name: "partitions3_case1".to_owned(),
partition: 1,
},
delete_before_offset: -1,
},
RecordsToDelete {
topic_partition: TopicPartition {
topic_name: "partitions3_case1".to_owned(),
partition: 2,
},
delete_before_offset: -1,
},
])
.await;

// assert partition no longer contains a record
let mut consumer = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topics(vec!["partitions3_case1".to_owned()])
.with_group("test_delete_records2"),
)
.await;
consumer
.assert_no_consume_within_timeout(Duration::from_secs(2))
.await;
}
}

/// Attempt to make the driver batch produce requests for different topics into the same request
/// This is important to test since shotover has complex logic for splitting these batch requests into individual requests.
pub async fn produce_consume_multi_topic_batch(connection_builder: &KafkaConnectionBuilder) {
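(The body of this test is collapsed in the diff above.) The splitting concern the doc comment describes can be illustrated with a self-contained sketch, using stand-in types rather than shotover's real request structs: a single client produce request may carry batches for topics and partitions led by different brokers, so the proxy has to regroup those batches into one request per destination broker.

    use std::collections::HashMap;

    // Stand-in for one per-topic/partition batch inside a single produce request.
    #[derive(Debug)]
    struct TopicBatch {
        topic: String,
        partition: i32,
    }

    // Regroup the batches of one multi-topic produce request by the broker that
    // leads each partition, mirroring the idea (not shotover's real code) of
    // splitting the request into one request per destination broker.
    fn split_by_leader(
        batches: Vec<TopicBatch>,
        leader_of: impl Fn(&str, i32) -> i32,
    ) -> HashMap<i32, Vec<TopicBatch>> {
        let mut by_broker: HashMap<i32, Vec<TopicBatch>> = HashMap::new();
        for batch in batches {
            let broker = leader_of(&batch.topic, batch.partition);
            by_broker.entry(broker).or_default().push(batch);
        }
        by_broker
    }

    fn main() {
        // Hypothetical topics; in the test they would come from one batched produce call.
        let batches = vec![
            TopicBatch { topic: "topic_a".into(), partition: 0 },
            TopicBatch { topic: "topic_b".into(), partition: 2 },
        ];
        // Pretend topic_a/0 is led by broker 0 and topic_b/2 by broker 1.
        let split = split_by_leader(batches, |topic, _partition| if topic == "topic_a" { 0 } else { 1 });
        println!("{split:?}");
    }

The real transform also has to merge the per-broker responses back into a single response for the client, which is what the combine_* helpers later in this diff do for DeleteRecords.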
@@ -894,6 +998,9 @@ pub async fn produce_consume_commit_offsets_partitions1(
})
.await;
}

// test the admin API's offset list and delete operations
delete_offsets(connection_builder).await;
}

pub async fn produce_consume_partitions3(
8 changes: 4 additions & 4 deletions shotover/Cargo.toml
@@ -53,7 +53,7 @@ pretty-hex = "0.4.0"
tokio-stream = "0.1.2"
derivative = "2.1.1"
cached = { version = "0.53", features = ["async"], optional = true }
governor = { version = "0.6", default-features = false, features = ["std", "jitter", "quanta"] }
governor = { version = "0.7", default-features = false, features = ["std", "jitter", "quanta"] }
nonzero_ext = "0.3.0"
version-compare = { version = "0.2", optional = true }
rand = { features = ["small_rng"], workspace = true }
@@ -76,7 +76,7 @@ csv = { workspace = true, optional = true }
hex = { workspace = true, optional = true }
async-trait.workspace = true
typetag.workspace = true
tokio-tungstenite = "0.23.0"
tokio-tungstenite = "0.24.0"

# Error handling
thiserror = "1.0"
@@ -98,8 +98,8 @@ httparse = { version = "1.8.0", optional = true }
http = { version = "1.0.0", optional = true }

#Observability
metrics = "0.23.0"
metrics-exporter-prometheus = { version = "0.15.0", default-features = false }
metrics = "0.24.0"
metrics-exporter-prometheus = { version = "0.16.0", default-features = false }
tracing.workspace = true
tracing-subscriber.workspace = true
tracing-appender.workspace = true
136 changes: 125 additions & 11 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -14,6 +14,8 @@ use connections::{Connections, Destination};
use dashmap::DashMap;
use kafka_node::{ConnectionFactory, KafkaAddress, KafkaNode, KafkaNodeState};
use kafka_protocol::messages::add_partitions_to_txn_request::AddPartitionsToTxnTransaction;
use kafka_protocol::messages::delete_records_request::DeleteRecordsTopic;
use kafka_protocol::messages::delete_records_response::DeleteRecordsTopicResult;
use kafka_protocol::messages::fetch_request::FetchTopic;
use kafka_protocol::messages::fetch_response::LeaderIdAndEpoch as FetchResponseLeaderIdAndEpoch;
use kafka_protocol::messages::list_offsets_request::ListOffsetsTopic;
@@ -27,14 +29,14 @@ use kafka_protocol::messages::produce_response::{
};
use kafka_protocol::messages::{
AddOffsetsToTxnRequest, AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey,
-BrokerId, DeleteGroupsRequest, DeleteGroupsResponse, EndTxnRequest, FetchRequest,
-FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest,
-InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListGroupsResponse,
-ListOffsetsRequest, ListOffsetsResponse, ListTransactionsResponse, MetadataRequest,
-MetadataResponse, OffsetFetchRequest, OffsetFetchResponse, OffsetForLeaderEpochRequest,
-OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse, RequestHeader,
-SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest, SyncGroupRequest,
-TopicName, TransactionalId, TxnOffsetCommitRequest,
+BrokerId, DeleteGroupsRequest, DeleteGroupsResponse, DeleteRecordsRequest,
+DeleteRecordsResponse, EndTxnRequest, FetchRequest, FetchResponse, FindCoordinatorRequest,
+FindCoordinatorResponse, GroupId, HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest,
+LeaveGroupRequest, ListGroupsResponse, ListOffsetsRequest, ListOffsetsResponse,
+ListTransactionsResponse, MetadataRequest, MetadataResponse, OffsetFetchRequest,
+OffsetFetchResponse, OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest,
+ProduceResponse, RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse,
+SaslHandshakeRequest, SyncGroupRequest, TopicName, TransactionalId, TxnOffsetCommitRequest,
};
use kafka_protocol::protocol::StrBytes;
use kafka_protocol::ResponseError;
@@ -49,8 +51,9 @@ use scram_over_mtls::{
use serde::{Deserialize, Serialize};
use shotover_node::{ShotoverNode, ShotoverNodeConfig};
use split::{
-AddPartitionsToTxnRequestSplitAndRouter, DeleteGroupsSplitAndRouter, ListGroupsSplitAndRouter,
-ListOffsetsRequestSplitAndRouter, ListTransactionsSplitAndRouter, OffsetFetchSplitAndRouter,
+AddPartitionsToTxnRequestSplitAndRouter, DeleteGroupsSplitAndRouter,
+DeleteRecordsRequestSplitAndRouter, ListGroupsSplitAndRouter, ListOffsetsRequestSplitAndRouter,
+ListTransactionsSplitAndRouter, OffsetFetchSplitAndRouter,
OffsetForLeaderEpochRequestSplitAndRouter, ProduceRequestSplitAndRouter, RequestSplitAndRouter,
};
use std::collections::{HashMap, HashSet, VecDeque};
@@ -885,6 +888,12 @@ impl KafkaSinkCluster {
})) => self.split_and_route_request::<OffsetForLeaderEpochRequestSplitAndRouter>(
request,
)?,
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::DeleteRecords(_),
..
})) => {
self.split_and_route_request::<DeleteRecordsRequestSplitAndRouter>(request)?
}

// route to group coordinator
Some(Frame::Kafka(KafkaFrame::Request {
@@ -939,6 +948,13 @@ impl KafkaSinkCluster {
})) => {
self.split_and_route_request::<DeleteGroupsSplitAndRouter>(request)?;
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::OffsetDelete(offset_delete),
..
})) => {
let group_id = offset_delete.group_id.clone();
self.route_to_group_coordinator(request, group_id);
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::TxnOffsetCommit(txn_offset_commit),
..
@@ -1051,7 +1067,6 @@ The connection to the client has been closed."
| RequestBody::AlterConfigs(_)
| RequestBody::CreatePartitions(_)
| RequestBody::DeleteTopics(_)
| RequestBody::DeleteRecords(_)
| RequestBody::CreateAcls(_)
| RequestBody::ApiVersions(_),
..
@@ -1583,6 +1598,58 @@ The connection to the client has been closed."
result
}

/// This method removes all topics from the DeleteRecords request and returns them split up by their destination.
/// If any topics are unroutable they will have their BrokerId set to -1
fn split_delete_records_request_by_destination(
&mut self,
body: &mut DeleteRecordsRequest,
) -> HashMap<BrokerId, Vec<DeleteRecordsTopic>> {
let mut result: HashMap<BrokerId, Vec<DeleteRecordsTopic>> = Default::default();

for mut topic in body.topics.drain(..) {
let topic_name = &topic.name;
if let Some(topic_meta) = self.topic_by_name.get(topic_name) {
for partition in std::mem::take(&mut topic.partitions) {
let partition_index = partition.partition_index as usize;
let destination = if let Some(partition) =
topic_meta.partitions.get(partition_index)
{
if partition.leader_id == -1 {
tracing::warn!(
"leader_id is unknown for {topic_name:?} at partition index {partition_index}",
);
}
partition.leader_id
} else {
let partition_len = topic_meta.partitions.len();
tracing::warn!("no known partition for {topic_name:?} at partition index {partition_index} out of {partition_len} partitions, routing request to a random broker so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
BrokerId(-1)
};
tracing::debug!(
"Routing DeleteRecords request portion of partition {partition_index} in {topic_name:?} to broker {}",
destination.0
);
let dest_topics = result.entry(destination).or_default();
if let Some(dest_topic) = dest_topics.iter_mut().find(|x| x.name == topic.name)
{
dest_topic.partitions.push(partition);
} else {
let mut topic = topic.clone();
topic.partitions.push(partition);
dest_topics.push(topic);
}
}
} else {
tracing::warn!("no known partition replica for {topic_name:?}, routing request to a random broker so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
let destination = BrokerId(-1);
let dest_topics = result.entry(destination).or_default();
dest_topics.push(topic);
}
}

result
}
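What the returned map is used for is not shown in this diff (it goes through the DeleteRecordsRequestSplitAndRouter machinery referenced in the imports and routing arm above), but a rough, assumed sketch of the consumption side is: each BrokerId key becomes one outgoing DeleteRecords request, and the -1 bucket is sent to an arbitrary broker so the client receives a NOT_LEADER_OR_FOLLOWER style error rather than hanging. Stand-in types only:

    use std::collections::HashMap;

    type BrokerId = i32;

    // Stand-ins for kafka_protocol's DeleteRecordsTopic / DeleteRecordsRequest.
    struct DeleteRecordsTopic {
        name: String,
        partitions: Vec<i32>,
    }

    struct DeleteRecordsRequest {
        topics: Vec<DeleteRecordsTopic>,
    }

    // Rebuild one request body per destination broker from the split map.
    // A key of -1 marks topics/partitions whose leader is unknown.
    fn requests_per_broker(
        split: HashMap<BrokerId, Vec<DeleteRecordsTopic>>,
    ) -> Vec<(BrokerId, DeleteRecordsRequest)> {
        split
            .into_iter()
            .map(|(broker, topics)| (broker, DeleteRecordsRequest { topics }))
            .collect()
    }

    fn main() {
        let mut split: HashMap<BrokerId, Vec<DeleteRecordsTopic>> = HashMap::new();
        split.insert(
            0,
            vec![DeleteRecordsTopic { name: "partitions3_case1".into(), partitions: vec![0, 2] }],
        );
        split.insert(
            -1,
            vec![DeleteRecordsTopic { name: "partitions3_case1".into(), partitions: vec![1] }],
        );

        for (broker, request) in requests_per_broker(split) {
            for topic in &request.topics {
                println!(
                    "send DeleteRecords to broker {broker}: topic {} partitions {:?}",
                    topic.name, topic.partitions
                );
            }
        }
    }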

/// This method removes all transactions from the AddPartitionsToTxn request and returns them split up by their destination
/// If any topics are unroutable they will have their BrokerId set to -1
fn split_add_partition_to_txn_request_by_destination(
@@ -2073,6 +2140,10 @@ The connection to the client has been closed."
body: ResponseBody::DeleteGroups(base),
..
})) => Self::combine_delete_groups_responses(base, drain)?,
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::DeleteRecords(base),
..
})) => Self::combine_delete_records(base, drain)?,
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::OffsetFetch(base),
..
@@ -2267,6 +2338,49 @@ The connection to the client has been closed."
Ok(())
}

fn combine_delete_records(
base_body: &mut DeleteRecordsResponse,
drain: impl Iterator<Item = Message>,
) -> Result<()> {
let mut base_topics: HashMap<TopicName, DeleteRecordsTopicResult> =
std::mem::take(&mut base_body.topics)
.into_iter()
.map(|response| (response.name.clone(), response))
.collect();
for mut next in drain {
if let Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::DeleteRecords(next_body),
..
})) = next.frame()
{
for next_response in std::mem::take(&mut next_body.topics) {
if let Some(base_response) = base_topics.get_mut(&next_response.name) {
for next_partition in &next_response.partitions {
for base_partition in &base_response.partitions {
if next_partition.partition_index == base_partition.partition_index
{
tracing::warn!("Duplicate partition indexes in combined DeleteRecords response, if this ever occurs we should investigate the repercussions")
}
}
}
// A partition can only be contained in one response so there is no risk of duplicating partitions
base_response.partitions.extend(next_response.partitions)
} else {
base_topics.insert(next_response.name.clone(), next_response);
}
}
} else {
return Err(anyhow!(
"Combining DeleteRecords responses but received another message type"
));
}
}

base_body.topics.extend(base_topics.into_values());

Ok(())
}

fn combine_delete_groups_responses(
base_delete_groups: &mut DeleteGroupsResponse,
drain: impl Iterator<Item = Message>,