diff --git a/libbeat/outputs/kafka/client.go b/libbeat/outputs/kafka/client.go index 1780f1392b3..1e110f8ca3e 100644 --- a/libbeat/outputs/kafka/client.go +++ b/libbeat/outputs/kafka/client.go @@ -113,7 +113,7 @@ func newKafkaClient( return c, nil } -func (c *client) Connect() error { +func (c *client) Connect(_ context.Context) error { c.mux.Lock() defer c.mux.Unlock() diff --git a/libbeat/outputs/kafka/kafka_integration_test.go b/libbeat/outputs/kafka/kafka_integration_test.go index e9abc559774..b187b5d57ff 100644 --- a/libbeat/outputs/kafka/kafka_integration_test.go +++ b/libbeat/outputs/kafka/kafka_integration_test.go @@ -280,7 +280,7 @@ func TestKafkaPublish(t *testing.T) { output, ok := grp.Clients[0].(*client) assert.True(t, ok, "grp.Clients[0] didn't contain a ptr to client") - if err := output.Connect(); err != nil { + if err := output.Connect(context.Background()); err != nil { t.Fatal(err) } assert.Equal(t, output.index, "testbeat") diff --git a/libbeat/outputs/redis/backoff.go b/libbeat/outputs/redis/backoff.go index 2abc1f846f0..42f9db1c285 100644 --- a/libbeat/outputs/redis/backoff.go +++ b/libbeat/outputs/redis/backoff.go @@ -19,6 +19,7 @@ package redis import ( "context" + "errors" "time" "github.com/gomodule/redigo/redis" @@ -61,7 +62,7 @@ func newBackoffClient(client *client, init, max time.Duration) *backoffClient { } func (b *backoffClient) Connect(ctx context.Context) error { - err := b.client.Connect() + err := b.client.Connect(ctx) if err != nil { // give the client a chance to promote an internal error to a network error. b.updateFailReason(err) @@ -102,7 +103,8 @@ func (b *backoffClient) updateFailReason(err error) { return } - if _, ok := err.(redis.Error); ok { + var redisErr *redis.Error + if errors.As(err, &redisErr) { b.reason = failRedis } else { b.reason = failOther diff --git a/libbeat/outputs/redis/client.go b/libbeat/outputs/redis/client.go index 9f5c9812dd1..db3ec5a3b43 100644 --- a/libbeat/outputs/redis/client.go +++ b/libbeat/outputs/redis/client.go @@ -90,7 +90,7 @@ func newClient( } } -func (c *client) Connect() error { +func (c *client) Connect(_ context.Context) error { c.log.Debug("connect") err := c.Client.Connect() if err != nil { diff --git a/libbeat/tests/integration/kafka_test.go b/libbeat/tests/integration/kafka_test.go new file mode 100644 index 00000000000..72e5b37e49d --- /dev/null +++ b/libbeat/tests/integration/kafka_test.go @@ -0,0 +1,89 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//go:build integration + +package integration + +import ( + "fmt" + "testing" + "time" + + "github.com/Shopify/sarama" +) + +var ( + // https://github.com/elastic/sarama/blob/c7eabfcee7e5bcd7d0071f0ece4d6bec8c33928a/config_test.go#L14-L17 + // The version of MockBroker used when this test was written only supports the lowest protocol version by default. + // Version incompatibilities will result in message decoding errors between the mock and the beat. + kafkaVersion = sarama.MinVersion + kafkaTopic = "test_topic" + kafkaCfg = ` +mockbeat: +logging: + level: debug + selectors: + - publisher_pipeline_output + - kafka +queue.mem: + events: 4096 + flush.timeout: 0s +output.kafka: + topic: %s + version: %s + hosts: + - %s + backoff: + init: 0.1s + max: 0.2s +` +) + +// TestKafkaOutputCanConnectAndPublish ensures the beat Kafka output can successfully produce messages to Kafka. +// Regression test for https://github.com/elastic/beats/issues/41823 where the Kafka output would +// panic on the first Publish because it's Connect method was no longer called. +func TestKafkaOutputCanConnectAndPublish(t *testing.T) { + // Create a Mock Kafka broker that will listen on localhost on a random unallocated port. + // The reference configuration was taken from https://github.com/elastic/sarama/blob/c7eabfcee7e5bcd7d0071f0ece4d6bec8c33928a/async_producer_test.go#L141. + leader := sarama.NewMockBroker(t, 1) + defer leader.Close() + + // The mock broker must respond to a single metadata request. + metadataResponse := new(sarama.MetadataResponse) + metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) + metadataResponse.AddTopicPartition(kafkaTopic, 0, leader.BrokerID(), nil, nil, nil, sarama.ErrNoError) + leader.Returns(metadataResponse) + + // The mock broker must return a single produce response. If no produce request is received, the test will fail. + // This guarantees that mockbeat successfully produced a message to Kafka and connectivity is established. + prodSuccess := new(sarama.ProduceResponse) + prodSuccess.AddTopicPartition(kafkaTopic, 0, sarama.ErrNoError) + leader.Returns(prodSuccess) + + // Start mockbeat with the appropriate configuration. + mockbeat := NewBeat(t, "mockbeat", "../../libbeat.test") + mockbeat.WriteConfigFile(fmt.Sprintf(kafkaCfg, kafkaTopic, kafkaVersion, leader.Addr())) + mockbeat.Start() + + // Wait for mockbeat to log that it successfully published a batch to Kafka. + // This ensures that mockbeat received the expected produce response configured above. + mockbeat.WaitForLogs( + `finished kafka batch`, + 10*time.Second, + "did not find finished batch log") +}