Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix sasl with python-kafka #1783

Merged
merged 3 commits into from
Oct 25, 2024
Merged

Fix sasl with python-kafka #1783

merged 3 commits into from
Oct 25, 2024

Conversation

rukai
Copy link
Member

@rukai rukai commented Oct 23, 2024

This PR implements the legacy SASL implementation still in use by the python driver.
I am not aware of any other drivers that use it.

The protocol docs describe the situation pretty well.
To summarize, when the client sends a SaslHandshake request of version 0, the client and broker will perform all SASL communication over raw packets.

Implementing this within shotover requires a lot of complexity, but it is ultimately manageable, and the implementation does not impact non-legacy SASL.

While the docs do not specify this, I am assuming that the client will always send its raw sasl requests immediately after sending a v0 SaslHandshake. The python driver at least behaves this way, and we would have to resort to heuristics to detect message types if any driver did behave this way.

Overview of implementation

To implement this functionality the following is done:

CodecState

Refer to this comment about CodecState:

/// Database protocols are often designed such that their messages can be parsed without knowledge of any state of prior messages.
/// When protocols remain stateless, Shotover's parser implementations can remain fairly simple.
/// However in the real world there is often some kind of connection level state that we need to track in order to parse messages.
///
/// Shotover solves this issue via this enum which provides any of the connection level state required to decode and then reencode messages.
/// 1. The Decoder includes this value in all messages it produces.
/// 2. If any transforms call `.frame()` this value is used to parse the frame of the message.
/// 3. The Encoder uses this value to reencode the message if it has been modified.

In this PR CodecState::Kafka is expanded to contain KafkaCodecState struct which contains the existing request_header field along with the new raw_sasl: Option<SaslRequestState> field. When this field is None the standard message handling occurs, when it is Some special encoding and decoding paths will be taken to pretend the raw sasl messages are SaslAuthenticate v0 messages. This ensures that the transforms can treat these raw messages just like they do for the regular sasl messages, completely hiding the distinction from them. This makes KafkaSinkCluster simple to implement and also makes any possible future transforms simple to implement too.

Codecs

In order to correctly populate the CodecState value, the KafkaEncoder and KafkaDecoder need to coordinate with each other to keep track of any SaslHandshake v0 that pass through in order to mark the following messages as raw sasl messages in the KafkaCodecState.

Overall Flow

client request -> kafka (SaslHandshake v0)

  1. Source KafkaDecoder detects SaslHandshake v0, sets self.expect_raw_sasl = Some(_)
  2. Sink KafkaEncoder detects SaslHandshake v0, sends expect_raw_sasl = Some() to Sink KafkaDecoder

kafka response -> client (SaslHandshake v0)

  1. Sink KafkaDecoder, receives self.expect_raw_sasl = Some(_) from KafkaDecoder
  2. Source KafkaEncoder does nothing in particular of note

client request -> kafka (raw sasl)

  1. Source KafkaDecoder assumes that the request is a raw sasl request due to self.expect_raw_sasl being Some, as a result it sets the KafkaCodecState::raw_sasl field on the message.
  2. Transforms may successfully parse the request due to the KafkaCodecState::raw_sasl field.
  3. Sink KafkaEncoder knows that the request is a raw sasl request due to the KafkaCodecState::raw_sasl field.

kafka response -> client (raw sasl)

  1. Source KafkaDecoder assumes that the response is a raw sasl response due to self.expect_raw_sasl being Some, as a result it sets the KafkaCodecState::raw_sasl field on the message.
  2. Transforms may successfully parse the response due to the KafkaCodecState::raw_sasl field.
  3. Sink KafkaEncoder knows that the response is a raw sasl response due to the KafkaCodecState::raw_sasl field.

Other fixes

There were a few unrelated misc fixes that were needed to get the new integration tests working.

  • avoid panic due to hitting a connection error before we fetch broker metadata (connections.rs:241)
  • avoid warning on ApiVersions request sent outside of handshake
  • Avoid killing shotover connection when cluster reports controller is -1 (indicates controller is not ready yet)
  • only set the controller id metadata when its not -1, due to the AtomicBrokerId's set/unset abstraction, if we set it to -1 we wont lookup for new values since it appears set but is actually -1.
  • create_topics is added to main.py to ensure that the topic already exists, since auto topic creation was hitting some kind of race condition in the python driver.
  • .kill_on_drop(true) is needed to ensure the python scripts do not linger around after a failed test.

New tests

Added tests for:

  • running python driver over sasl scram over mtls - includes test to ensure incorrect credentials are rejected.
  • running python driver over sasl plain

Alternatives

We could submit a patch to the python driver that upgrades it to use the modern SASL implementation.
This is probably worth doing if we ever have the resources to spare.
But we should probably still still ship this implementation so that we can handle any other drivers out there still using this old SASL implementation. I dont think kafka has ever deprecated old parts of the protocol before.

@rukai rukai force-pushed the fix-kafka-python-sasl branch 3 times, most recently from 79dfe4e to ce5ff5e Compare October 23, 2024 23:44
Copy link

codspeed-hq bot commented Oct 23, 2024

CodSpeed Performance Report

Merging #1783 will not alter performance

Comparing rukai:fix-kafka-python-sasl (2b70be3) with main (9930709)

Summary

✅ 38 untouched benchmarks

@rukai rukai force-pushed the fix-kafka-python-sasl branch 7 times, most recently from 32fcd7f to c6dac84 Compare October 24, 2024 05:53
@rukai rukai marked this pull request as ready for review October 24, 2024 06:20
@rukai rukai force-pushed the fix-kafka-python-sasl branch 4 times, most recently from 699e388 to c50eb75 Compare October 24, 2024 23:20
shotover/src/message/mod.rs Outdated Show resolved Hide resolved
shotover/src/codec/kafka.rs Outdated Show resolved Hide resolved
@rukai rukai force-pushed the fix-kafka-python-sasl branch from c50eb75 to fae217e Compare October 25, 2024 00:36
@rukai rukai force-pushed the fix-kafka-python-sasl branch from 5aaf719 to 2b70be3 Compare October 25, 2024 00:46
@rukai rukai merged commit 421ec9e into shotover:main Oct 25, 2024
41 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants