Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KafkaSinkCluster - move destination field from PendingRequestState into PendingRequest #1828

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 57 additions & 69 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,13 +343,9 @@ pub(crate) struct KafkaSinkCluster {
#[derive(Debug)]
enum PendingRequestState {
/// A route has been determined for this request but it has not yet been sent.
Routed {
destination: Destination,
request: Message,
},
Routed { request: Message },
/// The request has been sent to the specified broker and we are now awaiting a response from that broker.
Sent {
destination: Destination,
/// How many responses must be received before this response is received.
/// When this is 0 the next response from the broker will be for this request.
/// This field must be manually decremented when another response for this broker comes through.
Expand All @@ -360,8 +356,6 @@ enum PendingRequestState {
/// The broker has returned a Response to this request.
/// Returning this response may be delayed until a response to an earlier request comes back from another broker.
Received {
// TODO: move this into the parent type
destination: Destination,
response: Message,
/// Some message types store the request here in case they need to resend it.
// TODO: if we ever turn the Message into a CoW type we will be able to
Expand All @@ -371,11 +365,8 @@ enum PendingRequestState {
}

impl PendingRequestState {
fn routed(broker_id: BrokerId, request: Message) -> Self {
Self::Routed {
destination: Destination::Id(broker_id),
request,
}
fn routed(request: Message) -> Self {
Self::Routed { request }
}
}

Expand All @@ -396,6 +387,9 @@ enum PendingRequestTy {

struct PendingRequest {
state: PendingRequestState,

destination: Destination,

/// Type of the request sent
ty: PendingRequestTy,
/// Combine the next N responses into a single response
Expand Down Expand Up @@ -1209,7 +1203,8 @@ The connection to the client has been closed."
let destination = random_broker_id(&self.nodes, &mut self.rng);
tracing::debug!("Routing request to random broker {}", destination.0);
self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
state: PendingRequestState::routed(request),
destination: Destination::Id(destination),
ty: PendingRequestTy::Other,
combine_responses: 1,
});
Expand All @@ -1228,7 +1223,8 @@ The connection to the client has been closed."
let destination = random_broker_id(&self.nodes, &mut self.rng);

self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
state: PendingRequestState::routed(request),
destination: Destination::Id(destination),
ty: PendingRequestTy::Other,
combine_responses: 1,
});
Expand All @@ -1250,7 +1246,8 @@ The connection to the client has been closed."

T::reassemble(request_frame, topic_data);
self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
state: PendingRequestState::routed(request),
destination: Destination::Id(destination),
ty: PendingRequestTy::Other,
combine_responses: 1,
});
Expand All @@ -1275,7 +1272,8 @@ The connection to the client has been closed."
let request_frame = T::get_request_frame(&mut request);
T::reassemble(request_frame, topic_data);
self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
state: PendingRequestState::routed(request),
destination: Destination::Id(destination),
ty: PendingRequestTy::Other,
combine_responses,
});
Expand Down Expand Up @@ -1432,7 +1430,8 @@ The connection to the client has been closed."
let destination = random_broker_id(&self.nodes, &mut self.rng);

self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
state: PendingRequestState::routed(request),
destination: Destination::Id(destination),
// we dont need special handling for fetch, so just use Other
ty: PendingRequestTy::Other,
combine_responses: 1,
Expand All @@ -1455,7 +1454,8 @@ The connection to the client has been closed."

fetch.topics = topics;
self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
state: PendingRequestState::routed(request),
destination: Destination::Id(destination),
// we dont need special handling for fetch, so just use Other
ty: PendingRequestTy::Other,
combine_responses: 1,
Expand Down Expand Up @@ -1494,7 +1494,8 @@ The connection to the client has been closed."
fetch.topics = topics;
}
self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
state: PendingRequestState::routed(request),
destination: Destination::Id(destination),
ty: PendingRequestTy::Fetch {
originally_sent_at: Instant::now(),
max_wait_ms,
Expand Down Expand Up @@ -2017,28 +2018,26 @@ The connection to the client has been closed."

let mut broker_to_routed_requests: HashMap<Destination, RoutedRequests> = HashMap::new();
for i in 0..self.pending_requests.len() {
if let PendingRequestState::Routed { destination, .. } = &self.pending_requests[i].state
{
let routed_requests = broker_to_routed_requests
.entry(*destination)
.or_insert_with(|| RoutedRequests {
requests: vec![],
already_pending: self
.pending_requests
.iter()
.filter(|pending_request| {
if let PendingRequestState::Sent {
destination: check_destination,
..
} = &pending_request.state
{
check_destination == destination
} else {
false
}
})
.count(),
});
if let PendingRequestState::Routed { .. } = &self.pending_requests[i].state {
let destination = self.pending_requests[i].destination;
let routed_requests =
broker_to_routed_requests
.entry(destination)
.or_insert_with(|| RoutedRequests {
requests: vec![],
already_pending: self
.pending_requests
.iter()
.filter(|pending_request| {
if let PendingRequestState::Sent { .. } = &pending_request.state
{
pending_request.destination == destination
} else {
false
}
})
.count(),
});

let request = match self.pending_requests[i].ty {
PendingRequestTy::Fetch { .. } => {
Expand All @@ -2056,7 +2055,6 @@ The connection to the client has been closed."
PendingRequestTy::Other => None,
};
let mut value = PendingRequestState::Sent {
destination: *destination,
index: routed_requests.requests.len() + routed_requests.already_pending,
request,
};
Expand Down Expand Up @@ -2163,19 +2161,15 @@ The connection to the client has been closed."
for response in self.temp_responses_buffer.drain(..) {
let mut response = Some(response);
for pending_request in &mut self.pending_requests {
if let PendingRequestState::Sent {
destination,
index,
request,
} = &mut pending_request.state
if let PendingRequestState::Sent { index, request } =
&mut pending_request.state
{
if destination == connection_destination {
if &pending_request.destination == connection_destination {
// Store the PendingRequestState::Received at the location of the next PendingRequestState::Sent
// All other PendingRequestState::Sent need to be decremented, in order to determine the PendingRequestState::Sent
// to be used next time, and the time after that, and ...
if *index == 0 {
pending_request.state = PendingRequestState::Received {
destination: *destination,
response: response.take().unwrap(),
request: request.take(),
};
Expand Down Expand Up @@ -2242,14 +2236,10 @@ The connection to the client has been closed."

for i in 0..combine_responses {
let pending_request = &mut self.pending_requests[i];
if let PendingRequestState::Received {
destination,
request,
..
} = &mut pending_request.state
if let PendingRequestState::Received { request, .. } =
&mut pending_request.state
{
pending_request.state = PendingRequestState::Routed {
destination: *destination,
request: request.take().unwrap(),
}
} else {
Expand Down Expand Up @@ -2278,12 +2268,8 @@ The connection to the client has been closed."
} else {
let drain = self.pending_requests.drain(..combine_responses).map(|x| {
if let PendingRequest {
state:
PendingRequestState::Received {
response,
destination,
..
},
state: PendingRequestState::Received { response, .. },
destination,
..
} = x
{
Expand Down Expand Up @@ -3353,10 +3339,8 @@ The connection to the client has been closed."
"route_to_control_connection cannot be called after auth is complete. Otherwise it would collide with control_send_receive"
);
self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::Routed {
destination: Destination::ControlConnection,
request,
},
state: PendingRequestState::Routed { request },
destination: Destination::ControlConnection,
ty: PendingRequestTy::Other,
combine_responses: 1,
});
Expand All @@ -3378,7 +3362,8 @@ The connection to the client has been closed."
};

self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
state: PendingRequestState::routed(request),
destination: Destination::Id(destination),
ty: PendingRequestTy::Other,
combine_responses: 1,
});
Expand All @@ -3405,7 +3390,8 @@ The connection to the client has been closed."
);

self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
state: PendingRequestState::routed(request),
destination: Destination::Id(destination),
ty: PendingRequestTy::RoutedToGroup(group_id),
combine_responses: 1,
});
Expand All @@ -3432,7 +3418,8 @@ The connection to the client has been closed."
);

self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
state: PendingRequestState::routed(request),
destination: Destination::Id(destination),
ty: PendingRequestTy::RoutedToTransaction(transaction_id),
combine_responses: 1,
});
Expand All @@ -3452,7 +3439,8 @@ The connection to the client has been closed."
tracing::debug!("Routing FindCoordinator to random broker {}", destination.0);

self.pending_requests.push_back(PendingRequest {
state: PendingRequestState::routed(destination, request),
state: PendingRequestState::routed(request),
destination: Destination::Id(destination),
ty,
combine_responses: 1,
});
Expand Down
Loading