From 0673abf82fd02e09587665b239eb3216b4dbb3ac Mon Sep 17 00:00:00 2001 From: rahulii Date: Wed, 31 Jul 2024 19:19:38 +0530 Subject: [PATCH] fix lint errors Signed-off-by: rahulii --- control-plane/pkg/contract/contract.pb.go | 5 +++-- control-plane/pkg/reconciler/broker/broker_test.go | 2 +- test/e2e_sink/kafka_sink.go | 9 +++++---- test/e2e_sink/kafka_sink_test.go | 4 ++-- test/e2e_source/helpers/kafka_helper.go | 1 - test/lib/resources/kafkachannel.go | 4 ++-- test/rekt/features/leases.go | 2 +- test/rekt/resources/kafkasource/kafkasource.go | 2 +- test/rekt/resources/kafkatopic/topic.go | 4 ++-- .../consumer-group-lag-provider-test/admin.go | 10 ++++++---- test/upgrade/postupgrade.go | 4 ++-- 11 files changed, 25 insertions(+), 22 deletions(-) diff --git a/control-plane/pkg/contract/contract.pb.go b/control-plane/pkg/contract/contract.pb.go index c319671526..69cfa27d7b 100644 --- a/control-plane/pkg/contract/contract.pb.go +++ b/control-plane/pkg/contract/contract.pb.go @@ -7,10 +7,11 @@ package contract import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" reflect "reflect" sync "sync" + + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" ) const ( diff --git a/control-plane/pkg/reconciler/broker/broker_test.go b/control-plane/pkg/reconciler/broker/broker_test.go index eec98331e9..803c0251b0 100644 --- a/control-plane/pkg/reconciler/broker/broker_test.go +++ b/control-plane/pkg/reconciler/broker/broker_test.go @@ -3071,7 +3071,7 @@ func makeTLSSecret() *corev1.Secret { Name: brokerIngressTLSSecretName, }, Data: map[string][]byte{ - "ca.crt": []byte(eventingtlstesting.CA), + "ca.crt": eventingtlstesting.CA, }, Type: corev1.SecretTypeTLS, } diff --git a/test/e2e_sink/kafka_sink.go b/test/e2e_sink/kafka_sink.go index a22b8e6005..855f8d113a 100644 --- a/test/e2e_sink/kafka_sink.go +++ b/test/e2e_sink/kafka_sink.go @@ -24,7 +24,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/utils/pointer" + "k8s.io/utils/ptr" testlib "knative.dev/eventing/test/lib" eventingv1alpha1 "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing/v1alpha1" @@ -37,7 +37,8 @@ import ( ) const ( - sinkSecretName = "secret-test" + sinkSecretName = "secret-test" + numPartitions int32 = 10 ) func RunTestKafkaSink(t *testing.T, mode string, sp SecretProvider, opts ...func(kss *eventingv1alpha1.KafkaSinkSpec) error) { @@ -59,10 +60,10 @@ func RunTestKafkaSink(t *testing.T, mode string, sp SecretProvider, opts ...func kss := eventingv1alpha1.KafkaSinkSpec{ Topic: "kafka-sink-" + client.Namespace, - NumPartitions: pointer.Int32(10), + NumPartitions: ptr.To(numPartitions), ReplicationFactor: func(rf int16) *int16 { return &rf }(1), BootstrapServers: BootstrapServersPlaintextArr, - ContentMode: pointer.String(mode), + ContentMode: ptr.To(mode), } for _, opt := range opts { require.Nil(t, opt(&kss)) diff --git a/test/e2e_sink/kafka_sink_test.go b/test/e2e_sink/kafka_sink_test.go index 961b340f24..511261c83a 100644 --- a/test/e2e_sink/kafka_sink_test.go +++ b/test/e2e_sink/kafka_sink_test.go @@ -22,7 +22,7 @@ package e2e_sink import ( "testing" - "k8s.io/utils/pointer" + "k8s.io/utils/ptr" eventingv1alpha1 "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/eventing/v1alpha1" . "knative.dev/eventing-kafka-broker/test/pkg" @@ -31,7 +31,7 @@ import ( func TestKafkaSinkV1Alpha1DefaultContentMode(t *testing.T) { RunTestKafkaSink(t, eventingv1alpha1.ModeStructured, nil, func(kss *eventingv1alpha1.KafkaSinkSpec) error { - kss.ContentMode = pointer.String("") + kss.ContentMode = ptr.To("") return nil }) } diff --git a/test/e2e_source/helpers/kafka_helper.go b/test/e2e_source/helpers/kafka_helper.go index a565b87c6d..1123c244a5 100644 --- a/test/e2e_source/helpers/kafka_helper.go +++ b/test/e2e_source/helpers/kafka_helper.go @@ -50,7 +50,6 @@ const ( var ( topicGVR = schema.GroupVersionResource{Group: strimziApiGroup, Version: strimziApiVersion, Resource: strimziTopicResource} - userGVR = schema.GroupVersionResource{Group: strimziApiGroup, Version: strimziApiVersion, Resource: strimziUserResource} ImcGVR = schema.GroupVersionResource{Group: "messaging.knative.dev", Version: "v1", Resource: "inmemorychannels"} ) diff --git a/test/lib/resources/kafkachannel.go b/test/lib/resources/kafkachannel.go index 5127853bc6..7471fb4036 100644 --- a/test/lib/resources/kafkachannel.go +++ b/test/lib/resources/kafkachannel.go @@ -21,7 +21,7 @@ import ( "time" "k8s.io/apimachinery/pkg/types" - "k8s.io/utils/pointer" + "k8s.io/utils/ptr" v1 "knative.dev/eventing/pkg/apis/duck/v1" @@ -130,7 +130,7 @@ func WithKafkaChannelEndpointsReady() KafkaChannelOption { func WithKafkaChannelAddress(a string) KafkaChannelOption { return func(nc *v1beta1.KafkaChannel) { nc.Status.SetAddress(&duckv1.Addressable{ - Name: pointer.String("http"), + Name: ptr.To("http"), URL: apis.HTTP(a), }) } diff --git a/test/rekt/features/leases.go b/test/rekt/features/leases.go index fc2d9b85ac..2188c605aa 100644 --- a/test/rekt/features/leases.go +++ b/test/rekt/features/leases.go @@ -50,7 +50,7 @@ func KafkaSourceLease() *feature.Feature { func verifyLeaseAcquired(name string) feature.StepFn { return func(ctx context.Context, t feature.T) { - err := wait.Poll(time.Second, time.Minute, func() (done bool, err error) { + err := wait.PollUntilContextTimeout(ctx, time.Second, time.Minute, true, func(ctx context.Context) (bool, error) { lease, err := kubeclient.Get(ctx). CoordinationV1(). Leases(system.Namespace()). diff --git a/test/rekt/resources/kafkasource/kafkasource.go b/test/rekt/resources/kafkasource/kafkasource.go index 32336ea981..54bfff8c70 100644 --- a/test/rekt/resources/kafkasource/kafkasource.go +++ b/test/rekt/resources/kafkasource/kafkasource.go @@ -64,7 +64,7 @@ func VerifyScale(name string, replicas int32) feature.StepFn { return func(ctx context.Context, t feature.T) { interval, timeout := environment.PollTimingsFromContext(ctx) last := &sources.KafkaSource{} - err := wait.PollImmediate(interval, timeout, func() (done bool, err error) { + err := wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (bool, error) { ks, err := kafkaclientset.Get(ctx). SourcesV1beta1(). KafkaSources(environment.FromContext(ctx).Namespace()). diff --git a/test/rekt/resources/kafkatopic/topic.go b/test/rekt/resources/kafkatopic/topic.go index 544e547a9d..1ca83b93be 100644 --- a/test/rekt/resources/kafkatopic/topic.go +++ b/test/rekt/resources/kafkatopic/topic.go @@ -80,7 +80,7 @@ func HasReplicationFactor(name string, replicationFactor int, timings ...time.Du return func(ctx context.Context, t feature.T) { interval, timeout := k8s.PollTimings(ctx, timings) - err := wait.PollImmediate(interval, timeout, func() (bool, error) { + err := wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (bool, error) { ut, err := dynamicclient.Get(ctx). Resource(GVR()). Namespace(kafkaNamespace). @@ -119,7 +119,7 @@ func HasNumPartitions(name string, numPartitions int, timings ...time.Duration) return func(ctx context.Context, t feature.T) { interval, timeout := k8s.PollTimings(ctx, timings) - err := wait.PollImmediate(interval, timeout, func() (bool, error) { + err := wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (bool, error) { ut, err := dynamicclient.Get(ctx). Resource(GVR()). Namespace(kafkaNamespace). diff --git a/test/test_images/consumer-group-lag-provider-test/admin.go b/test/test_images/consumer-group-lag-provider-test/admin.go index b2500b8ee4..6b97f76fe2 100644 --- a/test/test_images/consumer-group-lag-provider-test/admin.go +++ b/test/test_images/consumer-group-lag-provider-test/admin.go @@ -23,6 +23,7 @@ import ( "github.com/IBM/sarama" "k8s.io/apimachinery/pkg/util/wait" + "knative.dev/reconciler-test/pkg/k8s" "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" testingpkg "knative.dev/eventing-kafka-broker/test/pkg" @@ -76,6 +77,7 @@ func main() { log.Println("Sending events to topic", topic) + interval, timeout := k8s.PollTimings(ctx, []time.Duration{}) for i := 0; i < n; i++ { msg := &sarama.ProducerMessage{ Topic: topic, @@ -84,7 +86,7 @@ func main() { } // Send message might fail with: // "kafka server: Request was for a topic or partition that does not exist on this broker." - err := wait.PollImmediateUntil(time.Minute, func() (done bool, err error) { + err := wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (bool, error) { partition, offset, err := producer.SendMessage(msg) if err != nil { return false, nil @@ -95,7 +97,7 @@ func main() { } lastOffset = offset return true, nil - }, ctx.Done()) + }) mustBeNil(err) } if int64(n) != lastOffset+1 { // Consistency check @@ -137,7 +139,7 @@ func main() { mustBeNil(err) // Wait for propagation of the committed offset - err = wait.PollImmediateUntil(time.Minute, func() (done bool, err error) { + err = wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (bool, error) { log.Println("Starting consumer group lag provider") consumerGroupLagProvider := kafka.NewConsumerGroupLagProvider(client, sarama.NewClusterAdminFromClient, sarama.OffsetOldest) @@ -176,7 +178,7 @@ func main() { return false, nil } return true, nil - }, ctx.Done()) + }) mustBeNil(err) } diff --git a/test/upgrade/postupgrade.go b/test/upgrade/postupgrade.go index 1fefbd07be..099a60c501 100644 --- a/test/upgrade/postupgrade.go +++ b/test/upgrade/postupgrade.go @@ -104,8 +104,8 @@ func verifyPostInstall(t *testing.T) { defer testlib.TearDown(client) var lastJob *batchv1.Job - err := wait.Poll(5*time.Second, 10*time.Minute, func() (done bool, err error) { - lastJob, err = client.Kube. + err := wait.PollUntilContextTimeout(context.Background(), 5*time.Second, 10*time.Minute, true, func(ctx context.Context) (bool, error) { + lastJob, err := client.Kube. BatchV1(). Jobs(system.Namespace()). Get(context.Background(), name, metav1.GetOptions{})