fix kafka cluster bug discovered by java driver
rukai committed Mar 15, 2024
1 parent 97f027a commit 9097bdf
Showing 2 changed files with 135 additions and 60 deletions.
22 changes: 9 additions & 13 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
@@ -52,10 +52,9 @@ async fn passthrough_tls(#[case] driver: KafkaDriver) {
.expect("Shotover did not shutdown within 10s");
}

#[cfg(feature = "rdkafka-driver-tests")] // temporarily needed to avoid a warning
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
// #[case::java(KafkaDriver::Java)]
#[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn cluster_tls(#[case] driver: KafkaDriver) {
test_helpers::cert::generate_kafka_test_certs();
@@ -133,10 +132,9 @@ async fn passthrough_sasl_encode(#[case] driver: KafkaDriver) {
shotover.shutdown_and_then_consume_events(&[]).await;
}

#[cfg(feature = "rdkafka-driver-tests")] // temporarily needed to avoid a warning
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
// #[case::java(KafkaDriver::Java)]
#[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn cluster_1_rack_single_shotover(#[case] driver: KafkaDriver) {
let _docker_compose =
@@ -156,11 +154,10 @@ async fn cluster_1_rack_single_shotover(#[case] driver: KafkaDriver) {
.expect("Shotover did not shutdown within 10s");
}

#[cfg(feature = "rdkafka-driver-tests")] // temporarily needed to avoid a warning
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
// #[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")]
#[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn cluster_1_rack_multi_shotover(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-1-rack/docker-compose.yaml");
@@ -192,10 +189,9 @@ async fn cluster_1_rack_multi_shotover(#[case] driver: KafkaDriver) {
}
}

#[cfg(feature = "rdkafka-driver-tests")] // temporarily needed to avoid a warning
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
//#[case::java(KafkaDriver::Java)]
#[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) {
let _docker_compose =
@@ -233,8 +229,8 @@ async fn cluster_2_racks_multi_shotover(#[case] driver: KafkaDriver) {
#[cfg(feature = "rdkafka-driver-tests")]
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
// #[case::java(KafkaDriver::Java)]
#[tokio::test]
#[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn cluster_sasl_single_shotover(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-sasl/docker-compose.yaml");
@@ -258,8 +254,8 @@ async fn cluster_sasl_single_shotover(#[case] driver: KafkaDriver) {
#[cfg(feature = "rdkafka-driver-tests")]
#[rstest]
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
// #[case::java(KafkaDriver::Java)]
#[tokio::test]
#[case::java(KafkaDriver::Java)]
#[tokio::test(flavor = "multi_thread")] // multi_thread is needed since java driver will block when consuming, causing shotover logs to not appear
async fn cluster_sasl_multi_shotover(#[case] driver: KafkaDriver) {
let _docker_compose =
docker_compose("tests/test-configs/kafka/cluster-sasl/docker-compose.yaml");
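The test-side change above is mechanical: in each clustered test the `java` case is un-commented so it always runs, the per-test `#[cfg(feature = "rdkafka-driver-tests")]` gate (only needed to silence a warning while the java case was disabled) is dropped, the `cpp` case stays behind the `rdkafka-driver-tests` feature via `cfg_attr`, and tests that used the default single-threaded runtime switch to `multi_thread` so the blocking Java consumer cannot stop Shotover's logs from appearing. A minimal sketch of that attribute pattern follows; it is a standalone demo test assuming the `rstest` and `tokio` crates, not code from this repository.

use rstest::rstest;

#[derive(Debug)]
enum KafkaDriver {
    // only constructed when the feature-gated case below is compiled in
    #[allow(dead_code)]
    Cpp,
    Java,
}

#[rstest]
// this case only exists when built with --features rdkafka-driver-tests
#[cfg_attr(feature = "rdkafka-driver-tests", case::cpp(KafkaDriver::Cpp))]
// this case always exists
#[case::java(KafkaDriver::Java)]
// multi_thread runtime so a case that blocks while consuming does not stall other tasks
#[tokio::test(flavor = "multi_thread")]
async fn driver_matrix_demo(#[case] driver: KafkaDriver) {
    println!("running with {driver:?}");
}
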
173 changes: 126 additions & 47 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -1,7 +1,7 @@
use super::common::produce_channel;
use crate::frame::kafka::{KafkaFrame, RequestBody, ResponseBody};
use crate::frame::Frame;
use crate::message::{Message, Messages};
use crate::message::{Message, MessageIdMap, Messages};
use crate::tls::{TlsConnector, TlsConnectorConfig};
use crate::transforms::util::{Request, Response};
use crate::transforms::{Transform, TransformBuilder, TransformContextBuilder, Wrapper};
@@ -23,6 +23,7 @@ use rand::rngs::SmallRng;
use rand::seq::{IteratorRandom, SliceRandom};
use rand::SeedableRng;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::hash::Hasher;
use std::net::SocketAddr;
use std::sync::atomic::AtomicI64;
@@ -32,6 +33,14 @@ use tokio::sync::{mpsc, oneshot, RwLock};
use tokio::time::timeout;
use uuid::Uuid;

#[derive(thiserror::Error, Debug)]
enum FindCoordinatorError {
#[error("Coordinator not available")]
CoordinatorNotAvailable,
#[error("{0:?}")]
Unrecoverable(#[from] anyhow::Error),
}

mod node;

#[derive(Serialize, Deserialize, Debug)]
@@ -157,6 +166,8 @@ impl TransformBuilder for KafkaSinkClusterBuilder {
rng: SmallRng::from_rng(rand::thread_rng()).unwrap(),
sasl_status: SaslStatus::new(self.sasl_enabled),
connection_factory: ConnectionFactory::new(self.tls.clone(), self.connect_timeout),
fetch_session_id_to_broker: HashMap::new(),
fetch_request_destinations: Default::default(),
})
}

@@ -231,6 +242,11 @@ pub struct KafkaSinkCluster {
rng: SmallRng,
sasl_status: SaslStatus,
connection_factory: ConnectionFactory,
// it's not clear from the docs if this cache needs to be accessed across connections:
// https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability
fetch_session_id_to_broker: HashMap<i32, BrokerId>,
// for use with fetch_session_id_to_broker
fetch_request_destinations: MessageIdMap<BrokerId>,
}

#[async_trait]
@@ -355,10 +371,20 @@ impl KafkaSinkCluster {
}

for group in groups {
let node = self.find_coordinator_of_group(group.clone()).await?;
self.group_to_coordinator_broker
.insert(group, node.broker_id);
self.add_node_if_new(node).await;
match self.find_coordinator_of_group(group.clone()).await {
Ok(node) => {
self.group_to_coordinator_broker
.insert(group, node.broker_id);
self.add_node_if_new(node).await;
}
Err(FindCoordinatorError::CoordinatorNotAvailable) => {
// We can't find the coordinator, so do nothing and let the request be routed to a random node:
// * If it happens to be the coordinator all is well
// * If it's not the coordinator then it will return a COORDINATOR_NOT_AVAILABLE message to
// the client prompting it to retry the whole process again.
}
Err(FindCoordinatorError::Unrecoverable(err)) => Err(err)?,
}
}

// request and process metadata if we are missing topics or the controller broker id
@@ -439,37 +465,55 @@ impl KafkaSinkCluster {
body: RequestBody::Fetch(fetch),
..
})) => {
// assume that all topics in this message have the same routing requirements
let topic = fetch
.topics
.first()
.ok_or_else(|| anyhow!("No topics in produce message"))?;
let connection = if let Some(topic_meta) = self.topics.get(&topic.topic.0) {
// assume that all partitions in this topic have the same routing requirements
let partition = &topic_meta.partitions[topic
.partitions
let node = if fetch.session_id == 0 {
// assume that all topics in this message have the same routing requirements
let topic = fetch
.topics
.first()
.ok_or_else(|| anyhow!("No partitions in topic"))?
.partition
as usize];
self.nodes
.iter_mut()
.filter(|node| partition.replica_nodes.contains(&node.broker_id))
.choose(&mut self.rng)
.unwrap()
.get_connection(&self.connection_factory)
.await?
.clone()
.ok_or_else(|| anyhow!("No topics in fetch message"))?;
let topic_name = &topic.topic;
let node = if let Some(topic_meta) = self.topics.get(topic_name) {
let partition_index = topic
.partitions
.first()
.ok_or_else(|| anyhow!("No partitions in topic"))?
.partition
as usize;
// assume that all partitions in this topic have the same routing requirements
if let Some(partition) = topic_meta.partitions.get(partition_index) {
self.nodes
.iter_mut()
.filter(|node| {
partition.replica_nodes.contains(&node.broker_id)
})
.choose(&mut self.rng)
.unwrap()
} else {
let partition_len = topic_meta.partitions.len();
tracing::warn!("no known partition replica for {topic_name:?} at partition index {partition_index} out of {partition_len} partitions, routing message to a random node so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
self.nodes.choose_mut(&mut self.rng).unwrap()
}
} else {
tracing::warn!("no known partition replica for {topic_name:?}, routing message to a random node so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
self.nodes.choose_mut(&mut self.rng).unwrap()
};
self.fetch_request_destinations
.insert(message.id(), node.broker_id);
node
} else {
let topic = &topic.topic;
tracing::warn!("no known partition replica for {topic:?}, routing message to a random node so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
self.nodes
.choose_mut(&mut self.rng)
.unwrap()
.get_connection(&self.connection_factory)
.await?
.clone()
// route via session id
if let Some(destination) =
self.fetch_session_id_to_broker.get(&fetch.session_id)
{
self.nodes
.iter_mut()
.find(|x| &x.broker_id == destination)
.unwrap()
} else {
todo!()
}
};
let connection = node.get_connection(&self.connection_factory).await?.clone();

let (tx, rx) = oneshot::channel();
connection
@@ -498,9 +542,19 @@ impl KafkaSinkCluster {
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::OffsetFetch(offset_fetch),
..
header,
})) => {
let group_id = offset_fetch.group_id.clone();
let group_id = if header.request_api_version <= 7 {
offset_fetch.group_id.clone()
} else {
// This is possibly dangerous.
// The client could construct a message which is valid for a specific shotover node, but not for any single kafka broker.
// We may need to add some logic to split the request into multiple messages going to different destinations,
// and then reconstruct the response back into a single response
//
// For now just pick the first group as that is sufficient for the simple cases.
offset_fetch.groups.first().unwrap().group_id.clone()
};
results.push(self.route_to_coordinator(message, group_id).await?);
}
Some(Frame::Kafka(KafkaFrame::Request {
@@ -602,7 +656,10 @@ impl KafkaSinkCluster {
Ok(())
}

async fn find_coordinator_of_group(&mut self, group: GroupId) -> Result<KafkaNode> {
async fn find_coordinator_of_group(
&mut self,
group: GroupId,
) -> Result<KafkaNode, FindCoordinatorError> {
let request = Message::from_frame(Frame::Kafka(KafkaFrame::Request {
header: RequestHeader::builder()
.request_api_key(ApiKey::FindCoordinatorKey as i16)
@@ -637,14 +694,20 @@ impl KafkaSinkCluster {
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::FindCoordinator(coordinator),
..
})) => Ok(KafkaNode::new(
coordinator.node_id,
KafkaAddress::new(coordinator.host.clone(), coordinator.port),
None,
)),
})) => {
if coordinator.error_code == 0 {
Ok(KafkaNode::new(
coordinator.node_id,
KafkaAddress::new(coordinator.host.clone(), coordinator.port),
None,
))
} else {
Err(FindCoordinatorError::CoordinatorNotAvailable)
}
}
other => Err(anyhow!(
"Unexpected message returned to findcoordinator request {other:?}"
)),
))?,
}
}

@@ -704,8 +767,8 @@ impl KafkaSinkCluster {

// TODO: Handle errors like NOT_COORDINATOR by removing element from self.topics and self.coordinator_broker_id

// Rewrite responses to ensure clients only see the shotover cluster and hide the existence of the real kafka cluster
for (i, response) in responses.iter_mut().enumerate() {
let request_id = response.request_id();
match response.frame() {
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::FindCoordinator(find_coordinator),
Expand All @@ -729,6 +792,17 @@ impl KafkaSinkCluster {
self.rewrite_metadata_response(metadata)?;
response.invalidate_cache();
}
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::Fetch(fetch),
..
})) => {
let destination = self
.fetch_request_destinations
.remove(&request_id.unwrap())
.unwrap();
self.fetch_session_id_to_broker
.insert(fetch.session_id, destination);
}
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::DescribeCluster(_),
..
@@ -786,6 +860,7 @@ impl KafkaSinkCluster {
if let Some(broker_id) = self.group_to_coordinator_broker.get(&group_id) {
if node.broker_id == *broker_id {
connection = Some(node.get_connection(&self.connection_factory).await?.clone());
break;
}
}
}
@@ -847,12 +922,16 @@ impl KafkaSinkCluster {
) {
if request.key_type == 0 {
if version <= 3 {
self.group_to_coordinator_broker
.insert(GroupId(request.key.clone()), find_coordinator.node_id);
if find_coordinator.error_code == 0 {
self.group_to_coordinator_broker
.insert(GroupId(request.key.clone()), find_coordinator.node_id);
}
} else {
for coordinator in &find_coordinator.coordinators {
self.group_to_coordinator_broker
.insert(GroupId(coordinator.key.clone()), find_coordinator.node_id);
if coordinator.error_code == 0 {
self.group_to_coordinator_broker
.insert(GroupId(coordinator.key.clone()), coordinator.node_id);
}
}
}
}
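The core of the fix is the new fetch routing: a full fetch (`session_id == 0`) is routed to a replica of the requested partition and its destination is recorded against the request id; when the response arrives with the broker-allocated session id, that session is pinned to the broker, so later incremental fetches (`session_id != 0`, per KIP-227) are sent back to the same broker. The following is a minimal standalone sketch of that bookkeeping with a hypothetical `FetchRouter` type, not Shotover's actual API.

use std::collections::HashMap;

type BrokerId = i32;
type MessageId = u128;

#[derive(Default)]
struct FetchRouter {
    // session id -> broker that owns the fetch session
    fetch_session_id_to_broker: HashMap<i32, BrokerId>,
    // request id -> broker the request was routed to; consumed when the
    // matching response reveals the session id
    fetch_request_destinations: HashMap<MessageId, BrokerId>,
}

impl FetchRouter {
    /// Pick a destination for an outgoing fetch request.
    fn route_request(
        &mut self,
        request_id: MessageId,
        session_id: i32,
        replica_broker: BrokerId,
    ) -> Option<BrokerId> {
        if session_id == 0 {
            // full fetch: route by partition replica and remember where it went
            self.fetch_request_destinations.insert(request_id, replica_broker);
            Some(replica_broker)
        } else {
            // incremental fetch: must go to the broker that created the session
            self.fetch_session_id_to_broker.get(&session_id).copied()
        }
    }

    /// Record the session id allocated by the broker in its fetch response.
    fn handle_response(&mut self, request_id: MessageId, session_id: i32) {
        if let Some(broker) = self.fetch_request_destinations.remove(&request_id) {
            self.fetch_session_id_to_broker.insert(session_id, broker);
        }
    }
}

fn main() {
    let mut router = FetchRouter::default();

    // first fetch opens a session: routed by replica, destination recorded
    assert_eq!(router.route_request(1, 0, 42), Some(42));
    // broker 42 answers and allocates session id 7
    router.handle_response(1, 7);
    // follow-up incremental fetch is pinned to broker 42
    assert_eq!(router.route_request(2, 7, 99), Some(42));

    println!("incremental fetches follow the session's broker");
}
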
