From 1fd56694ca64bfa9d3d463853d13aed57832e92b Mon Sep 17 00:00:00 2001 From: Sri Harsha Singudasu <105226401+ssingudasu@users.noreply.github.com> Date: Thu, 13 Jun 2024 10:26:40 -0700 Subject: [PATCH] Conn timeout as flag in topicctl (#200) * Conn timeout as flag in topicctl * Fixing test cases topicctl --- cmd/topicctl/subcmd/shared.go | 18 ++++++++++++++---- pkg/admin/connector.go | 7 ++++--- pkg/admin/zkclient.go | 4 +++- pkg/groups/groups_test.go | 6 ++++++ pkg/messages/bounds.go | 5 +---- pkg/messages/bounds_test.go | 1 + pkg/messages/tail_test.go | 1 + pkg/version/version.go | 2 +- 8 files changed, 31 insertions(+), 13 deletions(-) diff --git a/cmd/topicctl/subcmd/shared.go b/cmd/topicctl/subcmd/shared.go index bc149415..3fdff5b4 100644 --- a/cmd/topicctl/subcmd/shared.go +++ b/cmd/topicctl/subcmd/shared.go @@ -4,6 +4,7 @@ import ( "context" "errors" "os" + "time" "github.com/aws/aws-sdk-go/aws/session" "github.com/hashicorp/go-multierror" @@ -29,6 +30,7 @@ type sharedOptions struct { tlsServerName string zkAddr string zkPrefix string + connTimeout time.Duration } func (s sharedOptions) validate() error { @@ -164,6 +166,7 @@ func (s sharedOptions) getAdminClient( Username: s.saslUsername, SecretsManagerArn: s.saslSecretsManagerArn, }, + ConnTimeout: s.connTimeout, }, ReadOnly: readOnly, }, @@ -172,10 +175,11 @@ func (s sharedOptions) getAdminClient( return admin.NewZKAdminClient( ctx, admin.ZKAdminClientConfig{ - ZKAddrs: []string{s.zkAddr}, - ZKPrefix: s.zkPrefix, - Sess: sess, - ReadOnly: readOnly, + ZKAddrs: []string{s.zkAddr}, + ZKPrefix: s.zkPrefix, + Sess: sess, + ReadOnly: readOnly, + KafkaConnTimeout: s.connTimeout, }, ) } @@ -275,6 +279,12 @@ func addSharedFlags(cmd *cobra.Command, options *sharedOptions) { "", "Prefix for cluster-related nodes in zk", ) + cmd.PersistentFlags().DurationVar( + &options.connTimeout, + "conn-timeout", + 10*time.Second, + "Kafka connection timeout", + ) } func addSharedConfigOnlyFlags(cmd *cobra.Command, options *sharedOptions) { diff --git a/pkg/admin/connector.go b/pkg/admin/connector.go index cbda87cc..c088143b 100644 --- a/pkg/admin/connector.go +++ b/pkg/admin/connector.go @@ -35,9 +35,10 @@ const ( // ConnectorConfig contains the configuration used to contruct a connector. type ConnectorConfig struct { - BrokerAddr string - TLS TLSConfig - SASL SASLConfig + BrokerAddr string + TLS TLSConfig + SASL SASLConfig + ConnTimeout time.Duration } // TLSConfig stores the TLS-related configuration for a connection. diff --git a/pkg/admin/zkclient.go b/pkg/admin/zkclient.go index 11829027..d46697a3 100644 --- a/pkg/admin/zkclient.go +++ b/pkg/admin/zkclient.go @@ -63,6 +63,7 @@ type ZKAdminClientConfig struct { ExpectedClusterID string Sess *session.Session ReadOnly bool + KafkaConnTimeout time.Duration } // NewZKAdminClient creates and returns a new Client instance. @@ -136,7 +137,8 @@ func NewZKAdminClient( client.bootstrapAddrs = bootstrapAddrs client.Connector, err = NewConnector( ConnectorConfig{ - BrokerAddr: bootstrapAddrs[0], + BrokerAddr: bootstrapAddrs[0], + ConnTimeout: config.KafkaConnTimeout, }, ) diff --git a/pkg/groups/groups_test.go b/pkg/groups/groups_test.go index 73780643..71288f7b 100644 --- a/pkg/groups/groups_test.go +++ b/pkg/groups/groups_test.go @@ -18,6 +18,7 @@ func TestGetGroups(t *testing.T) { ctx := context.Background() connector, err := admin.NewConnector(admin.ConnectorConfig{ BrokerAddr: util.TestKafkaAddr(), + ConnTimeout: 10 * time.Second, }) require.NoError(t, err) @@ -83,6 +84,7 @@ func TestGetGroupsMultiMember(t *testing.T) { ctx := context.Background() connector, err := admin.NewConnector(admin.ConnectorConfig{ BrokerAddr: util.TestKafkaAddr(), + ConnTimeout: 10 * time.Second, }) require.NoError(t, err) @@ -164,6 +166,7 @@ func TestGetGroupsMultiMemberMultiTopic(t *testing.T) { ctx := context.Background() connector, err := admin.NewConnector(admin.ConnectorConfig{ BrokerAddr: util.TestKafkaAddr(), + ConnTimeout: 10 * time.Second, }) require.NoError(t, err) @@ -260,6 +263,7 @@ func TestGetLags(t *testing.T) { ctx := context.Background() connector, err := admin.NewConnector(admin.ConnectorConfig{ BrokerAddr: util.TestKafkaAddr(), + ConnTimeout: 10 * time.Second, }) require.NoError(t, err) @@ -303,6 +307,7 @@ func TestGetEarliestOrLatestOffset(t *testing.T) { ctx := context.Background() connector, err := admin.NewConnector(admin.ConnectorConfig{ BrokerAddr: util.TestKafkaAddr(), + ConnTimeout: 10 * time.Second, }) require.NoError(t, err) @@ -350,6 +355,7 @@ func TestResetOffsets(t *testing.T) { ctx := context.Background() connector, err := admin.NewConnector(admin.ConnectorConfig{ BrokerAddr: util.TestKafkaAddr(), + ConnTimeout: 10 * time.Second, }) require.NoError(t, err) diff --git a/pkg/messages/bounds.go b/pkg/messages/bounds.go index 45c1a9e4..04621b34 100644 --- a/pkg/messages/bounds.go +++ b/pkg/messages/bounds.go @@ -26,9 +26,6 @@ const ( // Parameters for backoff when there are connection errors maxRetries = 4 backoffInitSleepDuration = 200 * time.Millisecond - - // Connection timeout - connTimeout = 10 * time.Second ) // Bounds represents the start and end "bounds" of the messages in @@ -284,6 +281,6 @@ func dialLeaderRetries( return nil, fmt.Errorf("Error dialing partition %d: %+v", partition, err) } - conn.SetDeadline(time.Now().Add(connTimeout)) + conn.SetDeadline(time.Now().Add(connector.Config.ConnTimeout)) return conn, nil } diff --git a/pkg/messages/bounds_test.go b/pkg/messages/bounds_test.go index a1dfe5c1..991a3a81 100644 --- a/pkg/messages/bounds_test.go +++ b/pkg/messages/bounds_test.go @@ -17,6 +17,7 @@ func TestGetAllPartitionBounds(t *testing.T) { ctx := context.Background() connector, err := admin.NewConnector(admin.ConnectorConfig{ BrokerAddr: util.TestKafkaAddr(), + ConnTimeout: 10 * time.Second, }) require.NoError(t, err) diff --git a/pkg/messages/tail_test.go b/pkg/messages/tail_test.go index 90f68d4b..92867c88 100644 --- a/pkg/messages/tail_test.go +++ b/pkg/messages/tail_test.go @@ -20,6 +20,7 @@ func TestTailerGetMessages(t *testing.T) { connector, err := admin.NewConnector(admin.ConnectorConfig{ BrokerAddr: util.TestKafkaAddr(), + ConnTimeout: 10 * time.Second, }) require.NoError(t, err) diff --git a/pkg/version/version.go b/pkg/version/version.go index 390ac488..84704277 100644 --- a/pkg/version/version.go +++ b/pkg/version/version.go @@ -1,4 +1,4 @@ package version // Version is the current topicctl version. -const Version = "1.17.0" +const Version = "1.18.0"