Skip to content

Commit

Permalink
[CLI-3315] Panic: confluent kafka topic consume v4.8.0 (#2915)
Browse files Browse the repository at this point in the history
  • Loading branch information
channingdong authored Oct 28, 2024
1 parent 69941b8 commit 00a35f0
Show file tree
Hide file tree
Showing 4 changed files with 4 additions and 13 deletions.
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,6 @@ github.com/confluentinc/ccloud-sdk-go-v2/flink v0.9.0 h1:QqtIFEB5E3CIyGMJd7NQBEt
github.com/confluentinc/ccloud-sdk-go-v2/flink v0.9.0/go.mod h1:GPj4sfR85OyiFQUMNEq1DtPOjYVAuE222Z6Mcapwa48=
github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact v0.1.0 h1:2QuFhvrfU4AdxyfWWPFY0fqEg8p8wmKFfC6N+35pxHg=
github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact v0.1.0/go.mod h1:cl7LEL6bFgiXQ+8sEZvo3BrYZxDOvGkx4jV7eX1ssN4=
github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.12.0 h1:fra7uBCCtYkUFtHb6ununxMWqYuVonG52t1mV9o7qRE=
github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.12.0/go.mod h1:cJ6erfVlWSyz6L+2dR46cF2+s5I2r+pTNrPm2fNbcqU=
github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.17.0 h1:8Y1uXjolI2d5mawcfLn4OfJ81WRMQpjMFWdBm3dLdrk=
github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.17.0/go.mod h1:cJ6erfVlWSyz6L+2dR46cF2+s5I2r+pTNrPm2fNbcqU=
github.com/confluentinc/ccloud-sdk-go-v2/iam v0.11.0 h1:ZUAow4L6De1FwYoiwvEodm4lvxc+46wNW+IEAb7K9VU=
Expand Down
4 changes: 0 additions & 4 deletions internal/asyncapi/command_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ type flags struct {
topics []string
}

// messageOffset is 5, as the schema ID is stored at the [1:5] bytes of a message as meta info (when valid)
const messageOffset = 5
const protobufErrorMessage = "protobuf is not supported"

func (c *command) newExportCommand() *cobra.Command {
Expand Down Expand Up @@ -292,8 +290,6 @@ func (c *command) getMessageExamples(consumer *ckgo.Consumer, topicName, content
if err != nil {
return nil, err
}
// Message body is encoded after 5 bytes of meta information.
value = value[messageOffset:]
if err := deserializationProvider.LoadSchema(schemaPath, referencePathMap); err != nil {
return nil, err
}
Expand Down
8 changes: 4 additions & 4 deletions internal/kafka/command_topic_produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,8 +323,8 @@ func (c *command) produceOnPrem(cmd *cobra.Command, args []string) error {
// Initialize the value serializer with the same SR endpoint during registration
// The associated schema ID is also required to initialize the serializer
var valueSchemaId = -1
if len(valueMetaInfo) >= 5 {
valueSchemaId = int(binary.BigEndian.Uint32(valueMetaInfo[1:5]))
if len(valueMetaInfo) >= messageOffset {
valueSchemaId = int(binary.BigEndian.Uint32(valueMetaInfo[1:messageOffset]))
}
if err := valueSerializer.InitSerializer(srEndpoint, srClusterId, "value", srApiKey, srApiSecret, token, valueSchemaId); err != nil {
return err
Expand Down Expand Up @@ -485,7 +485,7 @@ func getProduceMessage(cmd *cobra.Command, keyMetaInfo, valueMetaInfo []byte, to
return message, nil
}

func serializeMessage(keyMetaInfo, valueMetaInfo []byte, topic, data, delimiter string, parseKey bool, keySerializer, valueSerializer serdes.SerializationProvider) ([]byte, []byte, error) {
func serializeMessage(_, _ []byte, topic, data, delimiter string, parseKey bool, keySerializer, valueSerializer serdes.SerializationProvider) ([]byte, []byte, error) {
var serializedKey []byte
val := data
if parseKey {
Expand All @@ -508,7 +508,7 @@ func serializeMessage(keyMetaInfo, valueMetaInfo []byte, topic, data, delimiter
return nil, nil, err
}

return append(keyMetaInfo, serializedKey...), append(valueMetaInfo, serializedValue...), nil
return serializedKey, serializedValue, nil
}

func getKeyAndValue(schemaBased bool, data, delimiter string) (string, string, error) {
Expand Down
3 changes: 0 additions & 3 deletions internal/kafka/confluent_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ func consumeMessage(message *ckgo.Message, h *GroupHandler) error {
if err != nil {
return err
}
message.Key = message.Key[messageOffset:]
if err := keyDeserializer.LoadSchema(schemaPath, referencePathMap); err != nil {
return err
}
Expand Down Expand Up @@ -243,8 +242,6 @@ func consumeMessage(message *ckgo.Message, h *GroupHandler) error {
if err != nil {
return err
}
// Message body is encoded after 5 bytes of meta information.
message.Value = message.Value[messageOffset:]
if err := valueDeserializer.LoadSchema(schemaPath, referencePathMap); err != nil {
return err
}
Expand Down

0 comments on commit 00a35f0

Please sign in to comment.