From b6a91987b3b14a510a658c0392f2facd0730e3b0 Mon Sep 17 00:00:00 2001 From: Daniel Weinshenker Date: Mon, 30 Oct 2023 14:32:07 -0700 Subject: [PATCH 01/18] update interface for godo methods --- do/databases.go | 72 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/do/databases.go b/do/databases.go index 9c5016e75..99857d701 100644 --- a/do/databases.go +++ b/do/databases.go @@ -111,6 +111,14 @@ type RedisConfig struct { *godo.RedisConfig } +// DatabaseTopics is a slice of DatabaseTopic +type DatabaseTopics []DatabaseTopic + +// DatabaseTopic is a wrapper for godo.DatabaseTopic +type DatabaseTopic struct { + *godo.DatabaseTopic +} + // DatabasesService is an interface for interacting with DigitalOcean's Database API type DatabasesService interface { List() (Databases, error) @@ -159,6 +167,12 @@ type DatabasesService interface { GetMySQLConfiguration(databaseID string) (*MySQLConfig, error) GetPostgreSQLConfiguration(databaseID string) (*PostgreSQLConfig, error) GetRedisConfiguration(databaseID string) (*RedisConfig, error) + + ListTopics(string) (DatabaseTopics, error) + GetTopic(string, string) (*DatabaseTopic, error) + CreateTopic(string, *godo.DatabaseCreateTopicRequest) (*DatabaseTopic, error) + UpdateTopic(string, string, *godo.DatabaseUpdateTopicRequest) error + DeleteTopic(string, string) error } type databasesService struct { @@ -618,3 +632,61 @@ func (ds *databasesService) GetRedisConfiguration(databaseID string) (*RedisConf RedisConfig: cfg, }, nil } + +func (ds *databasesService) ListTopics(databaseID string) (DatabaseTopics, error) { + f := func(opt *godo.ListOptions) ([]interface{}, *godo.Response, error) { + list, resp, err := ds.client.Databases.ListTopics(context.TODO(), databaseID, opt) + if err != nil { + return nil, nil, err + } + + si := make([]interface{}, len(list)) + for i := range list { + si[i] = list[i] + } + + return si, resp, err + } + + si, err := PaginateResp(f) + if err != nil { + return nil, err + } + + list := make(DatabaseTopics, len(si)) + for i := range si { + t := si[i].(godo.DatabaseTopic) + list[i] = DatabaseTopic{DatabaseTopic: &t} + } + return list, nil +} + +func (ds *databasesService) CreateTopic(databaseID string, req *godo.DatabaseCreateTopicRequest) (*DatabaseTopic, error) { + t, _, err := ds.client.Databases.CreateTopic(context.TODO(), databaseID, req) + if err != nil { + return nil, err + } + + return &DatabaseTopic{DatabaseTopic: t}, nil +} + +func (ds *databasesService) UpdateTopic(databaseID, topicName string, req *godo.DatabaseUpdateTopicRequest) error { + _, err := ds.client.Databases.UpdateTopic(context.TODO(), databaseID, topicName, req) + + return err +} + +func (ds *databasesService) GetTopic(databaseID, topicName string) (*DatabaseTopic, error) { + t, _, err := ds.client.Databases.GetTopic(context.TODO(), databaseID, topicName) + if err != nil { + return nil, err + } + + return &DatabaseTopic{DatabaseTopic: t}, nil +} + +func (ds *databasesService) DeleteTopic(databaseID, topicName string) error { + _, err := ds.client.Databases.DeleteTopic(context.TODO(), databaseID, topicName) + + return err +} From fb51d26073b42997b29827a32be0460d98f1eb86 Mon Sep 17 00:00:00 2001 From: Daniel Weinshenker Date: Mon, 30 Oct 2023 17:22:32 -0700 Subject: [PATCH 02/18] add topic config displayer --- commands/displayers/database.go | 185 ++++++++++++++++++++++++++++++++ 1 file changed, 185 insertions(+) diff --git a/commands/displayers/database.go b/commands/displayers/database.go index 
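Patch 01 only threads the existing godo topic endpoints through doctl's service layer; the command wiring comes later in the series. A minimal sketch of how a caller consumes the new interface, assuming some concrete DatabasesService value svc and a placeholder cluster UUID (both illustrative, not part of the patch):

    // List the Kafka topics in a cluster and print each name and state.
    topics, err := svc.ListTopics("your-database-uuid")
    if err != nil {
        return err
    }
    for _, t := range topics {
        fmt.Printf("%s (%s)\n", t.Name, t.State)
    }
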
157c8747f..0262ec6f8 100644 --- a/commands/displayers/database.go +++ b/commands/displayers/database.go @@ -691,6 +691,191 @@ func (dr *DatabaseFirewallRules) KV() []map[string]interface{} { return out } +type DatabaseKafkaTopics struct { + DatabaseTopics do.DatabaseTopics +} + +var _ Displayable = &DatabaseKafkaTopics{} + +func (dt *DatabaseKafkaTopics) JSON(out io.Writer) error { + return writeJSON(dt.DatabaseTopics, out) +} + +func (dt *DatabaseKafkaTopics) Cols() []string { + return []string{ + "Name", + "State", + "ReplicationFactor", + } +} + +func (dt *DatabaseKafkaTopics) ColMap() map[string]string { + + return map[string]string{ + "Name": "Name", + "State": "State", + "ReplicationFactor": "ReplicationFactor", + } +} + +func (dt *DatabaseKafkaTopics) KV() []map[string]interface{} { + out := make([]map[string]interface{}, 0, len(dt.DatabaseTopics)) + + for _, t := range dt.DatabaseTopics { + o := map[string]interface{}{ + "Name": t.Name, + "State": t.State, + "ReplicationFactor": *t.ReplicationFactor, + } + out = append(out, o) + } + + return out +} + +type DatabaseKafkaTopic struct { + DatabaseTopic do.DatabaseTopic +} + +var _ Displayable = &DatabaseKafkaTopic{} + +func (dt *DatabaseKafkaTopic) JSON(out io.Writer) error { + return writeJSON(dt.DatabaseTopic, out) +} + +func (dt *DatabaseKafkaTopic) Cols() []string { + return []string{ + "key", + "value", + } +} + +func (dt *DatabaseKafkaTopic) ColMap() map[string]string { + + return map[string]string{ + "key": "key", + "value": "value", + } +} + +func (dt *DatabaseKafkaTopic) KV() []map[string]interface{} { + t := dt.DatabaseTopic + o := []map[string]interface{}{ + { + "key": "Name", + "value": t.Name, + }, + { + "key": "State", + "value": t.State, + }, + { + "key": "ReplicationFactor", + "value": *t.ReplicationFactor, + }, + { + "key": "PartitionCount", + "value": len(t.Partitions), + }, + } + + if t.Config != nil { + cfg := []map[string]interface{}{ + { + "key": "CleanupPolicy", + "value": t.Config.CleanupPolicy, + }, + { + "key": "CompressionType", + "value": t.Config.CompressionType, + }, + { + "key": "DeleteRetentionMS", + "value": *t.Config.DeleteRetentionMS, + }, + { + "key": "FileDeleteDelayMS", + "value": *t.Config.FileDeleteDelayMS, + }, + { + "key": "FlushMessages", + "value": *t.Config.FlushMessages, + }, + { + "key": "FlushMS", + "value": *t.Config.FlushMS, + }, + { + "key": "IndexIntervalBytes", + "value": *t.Config.IndexIntervalBytes, + }, + { + "key": "MaxCompactionLagMS", + "value": *t.Config.MaxCompactionLagMS, + }, + { + "key": "MessageDownConversionEnable", + "value": *t.Config.MessageDownConversionEnable, + }, + { + "key": "MessageFormatVersion", + "value": t.Config.MessageFormatVersion, + }, + { + "key": "MessageTimestampDifferentMaxMS", + "value": *t.Config.MessageTimestampDifferenceMaxMS, + }, + { + "key": "MessageTimestampType", + "value": t.Config.MessageTimestampType, + }, + { + "key": "MinCleanableDirtyRatio", + "value": *t.Config.MinCleanableDirtyRatio, + }, + { + "key": "MinCompactionLagMS", + "value": *t.Config.MinCompactionLagMS, + }, + { + "key": "MinInsyncReplicas", + "value": *t.Config.MinInsyncReplicas, + }, + { + "key": "Preallocate", + "value": *t.Config.Preallocate, + }, + { + "key": "RetentionBytes", + "value": *t.Config.RetentionBytes, + }, + { + "key": "RetentionMS", + "value": *t.Config.RetentionMS, + }, + { + "key": "SegmentBytes", + "value": *t.Config.SegmentBytes, + }, + { + "key": "SegmentIndexBytes", + "value": *t.Config.SegmentIndexBytes, + }, + { + "key": "SegmentJitterMS", + 
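One caution on the displayer above: each *t.Config.<Field> dereference assumes the API populated that optional pointer, so a topic config with a missing field would panic here. Patch 05 later in this series guards every entry; the guarded shape for one field looks like:

    // Only emit the row when the field was actually returned.
    if t.Config.RetentionMS != nil {
        cfg = append(cfg, map[string]interface{}{
            "key":   "RetentionMS",
            "value": *t.Config.RetentionMS,
        })
    }
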
"value": *t.Config.SegmentJitterMS, + }, + { + "key": "SegmentMS", + "value": *t.Config.SegmentMS, + }, + } + o = append(o, cfg...) + } + + return o +} + type MySQLConfiguration struct { MySQLConfiguration do.MySQLConfig } From b27b611783b29dd597f8f72331c4fa1076f51a1b Mon Sep 17 00:00:00 2001 From: Daniel Weinshenker Date: Mon, 30 Oct 2023 17:53:59 -0700 Subject: [PATCH 03/18] add support for listing partitions --- commands/databases.go | 91 +++++++++++++++++++++++++++++++++ commands/displayers/database.go | 45 ++++++++++++++++ do/databases.go | 5 ++ 3 files changed, 141 insertions(+) diff --git a/commands/databases.go b/commands/databases.go index 13c58290e..456000137 100644 --- a/commands/databases.go +++ b/commands/databases.go @@ -141,6 +141,7 @@ Database nodes cannot be resized to smaller sizes due to the risk of data loss.` cmd.AddCommand(databaseFirewalls()) cmd.AddCommand(databaseOptions()) cmd.AddCommand(databaseConfiguration()) + cmd.AddCommand(databaseTopic()) return cmd } @@ -1535,6 +1536,96 @@ func RunDatabaseSetSQLModes(c *CmdConfig) error { return c.Databases().SetSQLMode(databaseID, sqlModes...) } +func RunDatabaseTopicList(c *CmdConfig) error { + if len(c.Args) == 0 { + return doctl.NewMissingArgsErr(c.NS) + } + + databaseID := c.Args[0] + topics, err := c.Databases().ListTopics(databaseID) + if err != nil { + return err + } + item := &displayers.DatabaseKafkaTopics{DatabaseTopics: topics} + return c.Display(item) +} + +func RunDatabaseTopicGet(c *CmdConfig) error { + args := c.Args + if len(args) < 2 { + return doctl.NewMissingArgsErr(c.NS) + } + + databaseID := c.Args[0] + topicName := c.Args[1] + topic, err := c.Databases().GetTopic(databaseID, topicName) + if err != nil { + return err + } + + item := &displayers.DatabaseKafkaTopic{DatabaseTopic: *topic} + return c.Display(item) +} + +func RunDatabaseTopicListPartition(c *CmdConfig) error { + args := c.Args + if len(args) < 2 { + return doctl.NewMissingArgsErr(c.NS) + } + + databaseID := c.Args[0] + topicName := c.Args[1] + topic, err := c.Databases().GetTopic(databaseID, topicName) + if err != nil { + return err + } + + item := &displayers.DatabaseKafkaTopicPartitions{DatabaseTopicPartitions: topic.Partitions} + return c.Display(item) +} + +func databaseTopic() *Command { + cmd := &Command{ + Command: &cobra.Command{ + Use: "topics", + Short: `Display commands to manage topics for kafka database clusters`, + Long: `The subcommands under ` + "`" + `doctl databases topics` + "`" + ` enable the management of topics for kafka database clusters`, + }, + } + + topicListDetails := ` +This command lists the following details for each topic in a kafka database cluster: + + - The Name of the topic. + - The State of the topic - The possible values are: "topicstate_active", "configuring", "deleting". + - The Replication Factor of the topic - number of brokers the topic's partitions are replicated across. + ` + + topicGetDetails := ` +This command lists the following details for a given topic in a kafka database cluster: + + - The Name of the topic. + - The Partitions of the topic - the number of partitions in the topics + - The Replication Factor of the topic - number of brokers the topic's partitions are replicated across. + - Additional advanced configuration for the topic. + +The details of the topic are listed in key/value pairs + ` + topicGetPartitionDetails := ` +This command lists the following details for each partition of a given topic in a kafka database cluster: + + - The Id - identifier of the topic partition. 
+ - The Size - size of the topic partition, in bytes. + - The InSyncReplicas - number of brokers that are in sync with the partition leader. + - The EarliestOffset - earliest offset read amongst all consumers of the partition. + ` + + CmdBuilder(cmd, RunDatabaseTopicList, "list ", "Retrieve a list of topics for a given kafka database", topicListDetails, Writer, aliasOpt("ls")) + CmdBuilder(cmd, RunDatabaseTopicGet, "get ", "Retrieve the configuration for a given kafka topic", topicGetDetails, Writer, aliasOpt("g")) + CmdBuilder(cmd, RunDatabaseTopicListPartition, "partitions ", "Retrieve the partitions for a given kafka topic", topicGetPartitionDetails, Writer, aliasOpt("p")) + return cmd +} + func databaseFirewalls() *Command { cmd := &Command{ Command: &cobra.Command{ diff --git a/commands/displayers/database.go b/commands/displayers/database.go index 0262ec6f8..c6e1dcfe9 100644 --- a/commands/displayers/database.go +++ b/commands/displayers/database.go @@ -733,6 +733,51 @@ func (dt *DatabaseKafkaTopics) KV() []map[string]interface{} { return out } +type DatabaseKafkaTopicPartitions struct { + DatabaseTopicPartitions []*godo.TopicPartition +} + +var _ Displayable = &DatabaseKafkaTopicPartitions{} + +func (dp *DatabaseKafkaTopicPartitions) JSON(out io.Writer) error { + return writeJSON(dp.DatabaseTopicPartitions, out) +} + +func (dp *DatabaseKafkaTopicPartitions) Cols() []string { + return []string{ + "Id", + "InSyncReplicas", + "EarliestOffset", + "Size", + } +} + +func (dt *DatabaseKafkaTopicPartitions) ColMap() map[string]string { + + return map[string]string{ + "Id": "Id", + "InSyncReplicas": "InSyncReplicas", + "EarliestOffset": "EarliestOffset", + "Size": "Size", + } +} + +func (dt *DatabaseKafkaTopicPartitions) KV() []map[string]interface{} { + out := make([]map[string]interface{}, 0, len(dt.DatabaseTopicPartitions)) + + for _, p := range dt.DatabaseTopicPartitions { + o := map[string]interface{}{ + "Id": p.Id, + "InSyncReplicas": p.InSyncReplicas, + "EarliestOffset": p.EarliestOffset, + "Size": p.Size, + } + out = append(out, o) + } + + return out +} + type DatabaseKafkaTopic struct { DatabaseTopic do.DatabaseTopic } diff --git a/do/databases.go b/do/databases.go index 99857d701..9eb1b35bb 100644 --- a/do/databases.go +++ b/do/databases.go @@ -119,6 +119,11 @@ type DatabaseTopic struct { *godo.DatabaseTopic } +// DatabaseTopicPartitions is a slice of *godo.TopicPartition +type DatabaseTopicPartitions struct { + Partitions []*godo.TopicPartition +} + // DatabasesService is an interface for interacting with DigitalOcean's Database API type DatabasesService interface { List() (Databases, error) From ed6c428329c1263f81e262cc5bb76f36c8de21b2 Mon Sep 17 00:00:00 2001 From: Daniel Weinshenker Date: Mon, 30 Oct 2023 17:56:54 -0700 Subject: [PATCH 04/18] rename variables --- commands/displayers/database.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/commands/displayers/database.go b/commands/displayers/database.go index c6e1dcfe9..0593ab5c8 100644 --- a/commands/displayers/database.go +++ b/commands/displayers/database.go @@ -752,7 +752,7 @@ func (dp *DatabaseKafkaTopicPartitions) Cols() []string { } } -func (dt *DatabaseKafkaTopicPartitions) ColMap() map[string]string { +func (dp *DatabaseKafkaTopicPartitions) ColMap() map[string]string { return map[string]string{ "Id": "Id", @@ -762,10 +762,10 @@ func (dt *DatabaseKafkaTopicPartitions) ColMap() map[string]string { } } -func (dt *DatabaseKafkaTopicPartitions) KV() []map[string]interface{} { - out := 
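Context for the partitions subcommand introduced above: it does not call a separate partitions endpoint; it fetches the topic and renders its Partitions slice. A condensed sketch of that flow, reusing the hypothetical svc and placeholder IDs from earlier:

    topic, err := svc.GetTopic("your-database-uuid", "topic1")
    if err != nil {
        return err
    }
    for _, p := range topic.Partitions {
        // Id, Size, and EarliestOffset are fields of godo.TopicPartition.
        fmt.Printf("partition %d: size=%d bytes, earliest offset=%d\n", p.Id, p.Size, p.EarliestOffset)
    }
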
make([]map[string]interface{}, 0, len(dt.DatabaseTopicPartitions)) +func (dp *DatabaseKafkaTopicPartitions) KV() []map[string]interface{} { + out := make([]map[string]interface{}, 0, len(dp.DatabaseTopicPartitions)) - for _, p := range dt.DatabaseTopicPartitions { + for _, p := range dp.DatabaseTopicPartitions { o := map[string]interface{}{ "Id": p.Id, "InSyncReplicas": p.InSyncReplicas, From fd30e7e9a1ab2044d0f2112b82bddf93d558e6b4 Mon Sep 17 00:00:00 2001 From: Daniel Weinshenker Date: Wed, 1 Nov 2023 14:35:42 -0700 Subject: [PATCH 05/18] add ability to specific advanced configuration when creating/updating --- args.go | 51 +++++++ commands/databases.go | 252 +++++++++++++++++++++++++++++++- commands/displayers/database.go | 141 ++++++++++++------ 3 files changed, 394 insertions(+), 50 deletions(-) diff --git a/args.go b/args.go index 6e2499eac..ce110d6f7 100644 --- a/args.go +++ b/args.go @@ -385,6 +385,57 @@ const ( // ArgDatabasePrivateConnectionBool determine if the private connection details should be shown ArgDatabasePrivateConnectionBool = "private" + // ArgDatabaseTopicName is the name of a kafka topic + ArgDatabaseTopicName = "name" + // ArgDatabaseTopicReplicationFactor is the replication factor of a kafka topic + ArgDatabaseTopicReplicationFactor = "replication-factor" + // ArgDatabaseTopicPartitionCount is the number of partitions that are associated with a kafka topic + ArgDatabaseTopicPartitionCount = "partition-count" + // ArgDatabaseTopicCleanupPolicy is the cleanup policy associated with a kafka topic + ArgDatabaseTopicCleanupPolicy = "cleanup-policy" + // ArgDatabaseTopicCompressionType is the compression algorithm used for a kafka topic + ArgDatabaseTopicCompressionType = "compression-type" + // ArgDatabaseTopicDeleteRetentionMS is the amount of time, in ms, to retain delete tombstone markers for a kafka topic + ArgDatabaseTopicDeleteRetentionMS = "delete-retention-ms" + // ArgDatabaseTopicFileDeleteDelayMS is the amount of time, in ms, to wait before deleting a file from the filesystem + ArgDatabaseTopicFileDeleteDelayMS = "file-delete-delay-ms" + // ArgDatabaseTopicFlushMessages is the size, in bytes, of all messages to accumulate on a partition before flushing them to disk + ArgDatabaseTopicFlushMessages = "flush-messages" + // ArgDatabaseTopicFlushMS is the amount of time, in ms, a message is kept in memory before it is flushed to disk + ArgDatabaseTopicFlushMS = "flush-ms" + // ArgDatabaseTopicIntervalIndexBytes is the number of bytes between entries being added into the offset index + ArgDatabaseTopicIntervalIndexBytes = "interval-index-bytes" + // ArgDatabaseTopicMaxCompactionLagMS is the maximum amount of time, in ms, that a message will remain uncompacted (if compaction is enabled) + ArgDatabaseTopicMaxCompactionLagMS = "max-compaction-lag-ms" + // ArgDatabaseTopicMaxMessageBytes is the maximum size, in bytes, of the largest record batch that can be sent to the server + ArgDatabaseTopicMaxMessageBytes = "max-message-bytes" + // ArgDatabaseTopicMesssageDownConversionEnable determines whether brokers should convert messages for consumers expecting older message formats + ArgDatabaseTopicMesssageDownConversionEnable = "message-down-conversion-enable" + // ArgDatabaseTopicMessageFormatVersion is the version used by the broker to append messages to the kafka topic logs + ArgDatabaseTopicMessageFormatVersion = "message-format-version" + // ArgDatabaseTopicMessageTimestampType is the timestamp used for messages + ArgDatabaseTopicMessageTimestampType = 
"message-timestamp-type" + // ArgDatabaseTopicMinCleanableDirtyRatio is ratio, between 0 and 1, specifying the frequenty of log compaction + ArgDatabaseTopicMinCleanableDirtyRatio = "min-cleanable-dirty-ratio" + // ArgDatabaseTopicMinCompactionLagMS is the minimum time, in ms, that a message will remain uncompacted + ArgDatabaseTopicMinCompactionLagMS = "min-compaction-lag-ms" + // ArgDatabaseTopicMinInsyncReplicas is the minimum number of replicas that must ACK a write for the write to be considered successful + ArgDatabaseTopicMinInsyncReplicas = "min-insync-replicas" + // ArgDatabaseTopicPreallocate determines whether a file should be preallocated on disk when creating a new log segment + ArgDatabaseTopicPreallocate = "preallocate" + // ArgDatabaseTopicRetentionBytes is the maximum size, in bytes, of a topic log before messages are deleted + ArgDatabaseTopicRetentionBytes = "retention-bytes" + // ArgDatabaseTopicRetentionMS is the maximum time, in ms, that a message is retained before deleting it + ArgDatabaseTopicRetentionMS = "retention-ms" + // ArgDatabaseTopicSegmentBytes is the maximum size, in bytes, of a single log file + ArgDatabaseTopicSegmentBytes = "segment-bytes" + // ArgDatabaseTopicSegmentJitterMS is the maximum random jitter, in ms, subtracted from the scheduled segment roll time to avoid thundering herds of segment rolling + ArgDatabaseTopicSegmentJitterMS = "segment-jitter-ms" + // ArgDatabaseTopicSegmentMS is the period of time, in ms, after which the log will be forced to roll if the segment file isn't full + ArgDatabaseTopicSegmentMS = "segment-ms" + // ArgDatabaseTopicUncleanLeaderElectionEnable determines whether to allow replicas that are not insync to be elected as leaders as last resort + ArgDatabaseTopicUncleanLeaderElectionEnable = "unclean-leader-election-enable" + // ArgPrivateNetworkUUID is the flag for VPC UUID ArgPrivateNetworkUUID = "private-network-uuid" diff --git a/commands/databases.go b/commands/databases.go index 456000137..462015604 100644 --- a/commands/databases.go +++ b/commands/databases.go @@ -17,6 +17,7 @@ import ( "errors" "fmt" "os" + "strconv" "strings" "time" @@ -1551,8 +1552,7 @@ func RunDatabaseTopicList(c *CmdConfig) error { } func RunDatabaseTopicGet(c *CmdConfig) error { - args := c.Args - if len(args) < 2 { + if len(c.Args) < 2 { return doctl.NewMissingArgsErr(c.NS) } @@ -1568,8 +1568,7 @@ func RunDatabaseTopicGet(c *CmdConfig) error { } func RunDatabaseTopicListPartition(c *CmdConfig) error { - args := c.Args - if len(args) < 2 { + if len(c.Args) < 2 { return doctl.NewMissingArgsErr(c.NS) } @@ -1584,6 +1583,198 @@ func RunDatabaseTopicListPartition(c *CmdConfig) error { return c.Display(item) } +func RunDatabaseTopicDelete(c *CmdConfig) error { + if len(c.Args) < 2 { + return doctl.NewMissingArgsErr(c.NS) + } + + force, err := c.Doit.GetBool(c.NS, doctl.ArgForce) + if err != nil { + return err + } + + if force || AskForConfirmDelete("kafka topic", 1) == nil { + databaseID := c.Args[0] + topicName := c.Args[1] + return c.Databases().DeleteTopic(databaseID, topicName) + } + + return errOperationAborted +} + +func RunDatabaseTopicCreate(c *CmdConfig) error { + if len(c.Args) < 2 { + return doctl.NewMissingArgsErr(c.NS) + } + + databaseID := c.Args[0] + topicName := c.Args[1] + + pc, err := c.Doit.GetInt(c.NS, doctl.ArgDatabaseTopicPartitionCount) + if err != nil { + return err + } + pcUInt32 := uint32(pc) + rf, err := c.Doit.GetInt(c.NS, doctl.ArgDatabaseTopicReplicationFactor) + if err != nil { + return err + } + rfUInt32 := 
uint32(rf) + + _, err = c.Databases().CreateTopic(databaseID, &godo.DatabaseCreateTopicRequest{ + Name: topicName, + ReplicationFactor: &rfUInt32, + PartitionCount: &pcUInt32, + Config: getDatabaseTopicConfigArgs(c), + }) + if err != nil { + return err + } + + return nil +} + +func getDatabaseTopicConfigArgs(c *CmdConfig) *godo.TopicConfig { + res := &godo.TopicConfig{} + val, err := c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicCleanupPolicy) + if err == nil { + res.CleanupPolicy = val + } + val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicCompressionType) + if err == nil { + res.CompressionType = val + } + val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicDeleteRetentionMS) + if err == nil { + i, err := strconv.ParseUint(val, 10, 64) + if err == nil { + res.DeleteRetentionMS = &i + } + } + val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicFileDeleteDelayMS) + if err == nil { + i, err := strconv.ParseUint(val, 10, 64) + if err == nil { + res.FileDeleteDelayMS = &i + } + } + val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicFlushMessages) + if err == nil { + i, err := strconv.ParseUint(val, 10, 64) + if err == nil { + res.FlushMessages = &i + } + } + val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicFlushMS) + if err == nil { + i, err := strconv.ParseUint(val, 10, 64) + if err == nil { + res.FlushMS = &i + } + } + val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicIntervalIndexBytes) + if err == nil { + i, err := strconv.ParseUint(val, 10, 64) + if err == nil { + res.IndexIntervalBytes = &i + } + } + val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicMaxCompactionLagMS) + if err == nil { + i, err := strconv.ParseUint(val, 10, 64) + if err == nil { + res.MaxCompactionLagMS = &i + } + } + val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicMaxMessageBytes) + if err == nil { + i, err := strconv.ParseUint(val, 10, 64) + if err == nil { + res.MaxMessageBytes = &i + } + } + bVal, err := c.Doit.GetBoolPtr(c.NS, doctl.ArgDatabaseTopicMesssageDownConversionEnable) + if err == nil { + res.MessageDownConversionEnable = bVal + } + val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicMessageFormatVersion) + if err == nil { + res.MessageFormatVersion = val + } + val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicMessageTimestampType) + if err == nil { + res.MessageTimestampType = val + } + val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicMinCleanableDirtyRatio) + if err == nil { + i, err := strconv.ParseFloat(val, 32) + if err == nil { + iFloat32 := float32(i) + res.MinCleanableDirtyRatio = &iFloat32 + } + } + val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicMinCompactionLagMS) + if err == nil { + i, err := strconv.ParseUint(val, 10, 64) + if err == nil { + res.MinCompactionLagMS = &i + } + } + val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicMinInsyncReplicas) + if err == nil { + i, err := strconv.ParseUint(val, 10, 32) + if err == nil { + iUint32 := uint32(i) + res.MinInsyncReplicas = &iUint32 + } + } + bVal, err = c.Doit.GetBoolPtr(c.NS, doctl.ArgDatabaseTopicPreallocate) + if err == nil { + res.Preallocate = bVal + } + val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicRetentionBytes) + if err == nil { + i, err := strconv.ParseInt(val, 10, 64) + if err == nil { + res.RetentionBytes = &i + } + } + val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicRetentionMS) + if err == nil { + i, err := strconv.ParseInt(val, 10, 64) + if err == nil { + res.RetentionMS = &i + } + } + val, err = c.Doit.GetString(c.NS, 
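The GetString-then-ParseUint-then-take-address sequence here repeats for every numeric flag. A hypothetical helper (not in this patch) that would collapse the uint64 cases; like the code it shortens, it silently ignores unparsable input:

    // parseUint64Flag returns a pointer to the parsed flag value, or nil when
    // the flag is unset or not a valid uint64. Name and shape are illustrative.
    func parseUint64Flag(c *CmdConfig, flag string) *uint64 {
        val, err := c.Doit.GetString(c.NS, flag)
        if err != nil || val == "" {
            return nil
        }
        i, err := strconv.ParseUint(val, 10, 64)
        if err != nil {
            return nil
        }
        return &i
    }

    // e.g. res.FlushMS = parseUint64Flag(c, doctl.ArgDatabaseTopicFlushMS)
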
doctl.ArgDatabaseTopicSegmentBytes) + if err == nil { + i, err := strconv.ParseUint(val, 10, 64) + if err == nil { + res.SegmentBytes = &i + } + } + val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicSegmentJitterMS) + if err == nil { + i, err := strconv.ParseUint(val, 10, 64) + if err == nil { + res.SegmentJitterMS = &i + } + } + val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicSegmentMS) + if err == nil { + i, err := strconv.ParseUint(val, 10, 64) + if err == nil { + res.SegmentMS = &i + } + } + bVal, err = c.Doit.GetBoolPtr(c.NS, doctl.ArgDatabaseTopicUncleanLeaderElectionEnable) + if err == nil { + res.UncleanLeaderElectionEnable = bVal + } + + return res +} + func databaseTopic() *Command { cmd := &Command{ Command: &cobra.Command{ @@ -1623,6 +1814,59 @@ This command lists the following details for each partition of a given topic in CmdBuilder(cmd, RunDatabaseTopicList, "list ", "Retrieve a list of topics for a given kafka database", topicListDetails, Writer, aliasOpt("ls")) CmdBuilder(cmd, RunDatabaseTopicGet, "get ", "Retrieve the configuration for a given kafka topic", topicGetDetails, Writer, aliasOpt("g")) CmdBuilder(cmd, RunDatabaseTopicListPartition, "partitions ", "Retrieve the partitions for a given kafka topic", topicGetPartitionDetails, Writer, aliasOpt("p")) + cmdDatabaseTopicDelete := CmdBuilder(cmd, RunDatabaseTopicDelete, "delete ", "Deletes a kafka topic by topic name", "", Writer, aliasOpt("rm")) + AddBoolFlag(cmdDatabaseTopicDelete, doctl.ArgForce, doctl.ArgShortForce, false, "Deletes the kafka topic without a confirmation prompt") + cmdDatabaseTopicCreate := CmdBuilder(cmd, RunDatabaseTopicCreate, "create ", "Creates a topic for a given kafka database", + "This command creates a kafka topic for the specified kafka database cluster, giving it the specified name. 
Example: doctl databases topics create ", Writer, aliasOpt("c")) + AddIntFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicReplicationFactor, "", 2, "Specifies the number of nodes to replicate data across the kafka cluster") + AddIntFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicPartitionCount, "", 1, "Specifies the number of partitions available for the topic") + AddStringFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicCleanupPolicy, "", "delete", + "Specifies the retention policy to use on log segments: Possible values are 'delete', 'compact_delete', 'compact'") + AddStringFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicCompressionType, "", "producer", + "Specifies the compression type for a kafka topic: Possible values are 'producer', 'gzip', 'snappy', 'Iz4', 'zstd', 'uncompressed'") + AddStringFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicDeleteRetentionMS, "", "", + "Specifies how long (in ms) to retain delete tombstone markers for topics") + AddStringFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicFileDeleteDelayMS, "", "", + "Specifies the minimum time (in ms) to wait before deleting a file from the filesystem") + AddStringFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicFlushMessages, "", "", + "Specifies the maximum number of messages to accumulate on a log partition before messages are flushed to disk") + AddStringFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicFlushMS, "", "", + "Specifies the maximum time (in ms) that a message is kept in memory before being flushed to disk") + AddStringFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicIntervalIndexBytes, "", "", + "Specifies the number of bytes between entries being added into the offset index") + AddStringFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicMaxCompactionLagMS, "", "", + "Specifies the maximum time (in ms) that a message will remain uncompacted. This is only applicable if the logs have compaction enabled") + AddStringFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicMaxMessageBytes, "", "", + "Specifies the largest record batch (in bytes) that can be sent to the server. This is calculated after compression, if compression is enabled") + AddBoolFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicMesssageDownConversionEnable, "", true, + "Specifies whether down-conversion of message formats is enabled to satisfy consumer requests") + AddStringFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicMessageFormatVersion, "", "", + `Specifies the message format version used by the broker to append messages to the logs. By setting a format version, all existing messages on disk must be + smaller or equal to the specified version`) + AddStringFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicMessageTimestampType, "", "", + "Specifies whether to use the create time or log append time as the timestamp on a message") + AddStringFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicMinCleanableDirtyRatio, "", "", + `Specifies the frequenty of log compaction (if enabled) in relation to duplicates present in the logs. For example, 0.5 would mean at most half of the log + could be duplicates before compaction would begin`) + AddStringFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicMinCompactionLagMS, "", "", + "Specifies the minimum time (in ms) that a message will remain uncompacted. 
This is only applicable if the logs have compaction enabled") + AddStringFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicMinInsyncReplicas, "", "", + "Specifies the minimum number of replicas that must ACK a write for it to be considered successful") + AddBoolFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicPreallocate, "", false, + "Specifies whether a file should be preallocated on disk when creating a new log segment") + AddStringFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicRetentionBytes, "", "", + "Specifies the maximum size (in bytes) before deleting messages. '-1' indicates that there is no limit") + AddStringFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicRetentionMS, "", "", + "Specifies the maximum time (in ms) to store a message before deleting it. '-1' indicates that there is no limit") + AddStringFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicSegmentBytes, "", "", + "Specifies the maximum size (in bytes) of a single log file") + AddStringFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicSegmentJitterMS, "", "", + "Specifies the maximum time (in ms) for random jitter that is subtracted from the scheduled segment roll time to avoid thundering herd problems") + AddStringFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicSegmentMS, "", "", + "Specifies the maximum time (in ms) to wait to force a log roll if the segment file isn't full. After this period, the log will be forced to roll") + AddBoolFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicUncleanLeaderElectionEnable, "", false, + "Specifies whether to allow replicas that are not insync to be elected as leader as a last resort. This may result in data loss since those leaders are not in sync") + return cmd } diff --git a/commands/displayers/database.go b/commands/displayers/database.go index 0593ab5c8..207f56fb6 100644 --- a/commands/displayers/database.go +++ b/commands/displayers/database.go @@ -825,95 +825,144 @@ func (dt *DatabaseKafkaTopic) KV() []map[string]interface{} { } if t.Config != nil { - cfg := []map[string]interface{}{ - { + cfg := make([]map[string]interface{}, 0) + if t.Config.CleanupPolicy != "" { + cfg = append(cfg, map[string]interface{}{ "key": "CleanupPolicy", "value": t.Config.CleanupPolicy, - }, - { + }) + } + if t.Config.CompressionType != "" { + cfg = append(cfg, map[string]interface{}{ "key": "CompressionType", "value": t.Config.CompressionType, - }, - { + }) + } + if t.Config.DeleteRetentionMS != nil { + cfg = append(cfg, map[string]interface{}{ "key": "DeleteRetentionMS", "value": *t.Config.DeleteRetentionMS, - }, - { + }) + } + if t.Config.FileDeleteDelayMS != nil { + cfg = append(cfg, map[string]interface{}{ "key": "FileDeleteDelayMS", "value": *t.Config.FileDeleteDelayMS, - }, - { + }) + } + if t.Config.FlushMessages != nil { + cfg = append(cfg, map[string]interface{}{ "key": "FlushMessages", "value": *t.Config.FlushMessages, - }, - { + }) + } + if t.Config.FlushMS != nil { + cfg = append(cfg, map[string]interface{}{ "key": "FlushMS", "value": *t.Config.FlushMS, - }, - { + }) + } + if t.Config.IndexIntervalBytes != nil { + cfg = append(cfg, map[string]interface{}{ "key": "IndexIntervalBytes", "value": *t.Config.IndexIntervalBytes, - }, - { + }) + } + if t.Config.MaxCompactionLagMS != nil { + cfg = append(cfg, map[string]interface{}{ "key": "MaxCompactionLagMS", "value": *t.Config.MaxCompactionLagMS, - }, - { + }) + } + if t.Config.MessageDownConversionEnable != nil { + cfg = append(cfg, map[string]interface{}{ "key": "MessageDownConversionEnable", "value": 
*t.Config.MessageDownConversionEnable, - }, - { + }) + } + if t.Config.MessageFormatVersion != "" { + cfg = append(cfg, map[string]interface{}{ "key": "MessageFormatVersion", "value": t.Config.MessageFormatVersion, - }, - { + }) + } + if t.Config.MessageTimestampDifferenceMaxMS != nil { + cfg = append(cfg, map[string]interface{}{ "key": "MessageTimestampDifferentMaxMS", "value": *t.Config.MessageTimestampDifferenceMaxMS, - }, - { + }) + } + if t.Config.MessageTimestampType != "" { + cfg = append(cfg, map[string]interface{}{ "key": "MessageTimestampType", "value": t.Config.MessageTimestampType, - }, - { + }) + } + if t.Config.MinCleanableDirtyRatio != nil { + cfg = append(cfg, map[string]interface{}{ "key": "MinCleanableDirtyRatio", "value": *t.Config.MinCleanableDirtyRatio, - }, - { + }) + } + if t.Config.MinCompactionLagMS != nil { + cfg = append(cfg, map[string]interface{}{ "key": "MinCompactionLagMS", - "value": *t.Config.MinCompactionLagMS, - }, - { + "value": t.Config.MinCompactionLagMS, + }) + } + if t.Config.MinInsyncReplicas != nil { + cfg = append(cfg, map[string]interface{}{ "key": "MinInsyncReplicas", "value": *t.Config.MinInsyncReplicas, - }, - { + }) + } + if t.Config.Preallocate != nil { + cfg = append(cfg, map[string]interface{}{ "key": "Preallocate", "value": *t.Config.Preallocate, - }, - { + }) + } + if t.Config.RetentionBytes != nil { + cfg = append(cfg, map[string]interface{}{ "key": "RetentionBytes", "value": *t.Config.RetentionBytes, - }, - { + }) + } + if t.Config.RetentionMS != nil { + cfg = append(cfg, map[string]interface{}{ "key": "RetentionMS", "value": *t.Config.RetentionMS, - }, - { + }) + } + if t.Config.SegmentBytes != nil { + cfg = append(cfg, map[string]interface{}{ "key": "SegmentBytes", "value": *t.Config.SegmentBytes, - }, - { + }) + } + if t.Config.SegmentIndexBytes != nil { + cfg = append(cfg, map[string]interface{}{ "key": "SegmentIndexBytes", "value": *t.Config.SegmentIndexBytes, - }, - { + }) + } + if t.Config.SegmentJitterMS != nil { + cfg = append(cfg, map[string]interface{}{ "key": "SegmentJitterMS", "value": *t.Config.SegmentJitterMS, - }, - { + }) + } + if t.Config.SegmentMS != nil { + cfg = append(cfg, map[string]interface{}{ "key": "SegmentMS", "value": *t.Config.SegmentMS, - }, + }) + } + if t.Config.UncleanLeaderElectionEnable != nil { + cfg = append(cfg, map[string]interface{}{ + "key": "UncleanLeaderElectionEnable", + "value": *t.Config.UncleanLeaderElectionEnable, + }) } o = append(o, cfg...) 
} From 1e3d7e53ff720416a94c9f9019eeb362e8a5be04 Mon Sep 17 00:00:00 2001 From: Daniel Weinshenker Date: Wed, 1 Nov 2023 14:45:22 -0700 Subject: [PATCH 06/18] add update functionality --- commands/databases.go | 127 +++++++++++++++++++++++++----------------- 1 file changed, 77 insertions(+), 50 deletions(-) diff --git a/commands/databases.go b/commands/databases.go index 462015604..787c9ce21 100644 --- a/commands/databases.go +++ b/commands/databases.go @@ -1627,11 +1627,34 @@ func RunDatabaseTopicCreate(c *CmdConfig) error { PartitionCount: &pcUInt32, Config: getDatabaseTopicConfigArgs(c), }) + return err +} + +func RunDatabaseTopicUpdate(c *CmdConfig) error { + if len(c.Args) < 2 { + return doctl.NewMissingArgsErr(c.NS) + } + + databaseID := c.Args[0] + topicName := c.Args[1] + + pc, err := c.Doit.GetInt(c.NS, doctl.ArgDatabaseTopicPartitionCount) if err != nil { return err } + pcUInt32 := uint32(pc) + rf, err := c.Doit.GetInt(c.NS, doctl.ArgDatabaseTopicReplicationFactor) + if err != nil { + return err + } + rfUInt32 := uint32(rf) - return nil + err = c.Databases().UpdateTopic(databaseID, topicName, &godo.DatabaseUpdateTopicRequest{ + ReplicationFactor: &rfUInt32, + PartitionCount: &pcUInt32, + Config: getDatabaseTopicConfigArgs(c), + }) + return err } func getDatabaseTopicConfigArgs(c *CmdConfig) *godo.TopicConfig { @@ -1818,55 +1841,59 @@ This command lists the following details for each partition of a given topic in AddBoolFlag(cmdDatabaseTopicDelete, doctl.ArgForce, doctl.ArgShortForce, false, "Deletes the kafka topic without a confirmation prompt") cmdDatabaseTopicCreate := CmdBuilder(cmd, RunDatabaseTopicCreate, "create ", "Creates a topic for a given kafka database", "This command creates a kafka topic for the specified kafka database cluster, giving it the specified name. 
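The update path added above mirrors create, minus the topic name. A sketch of the request RunDatabaseTopicUpdate assembles, with illustrative values:

    // Raise the partition count and switch the cleanup policy of an existing
    // topic; the request types are the godo structs used in this patch.
    pc := uint32(4)
    err := c.Databases().UpdateTopic(databaseID, topicName, &godo.DatabaseUpdateTopicRequest{
        PartitionCount: &pc,
        Config:         &godo.TopicConfig{CleanupPolicy: "compact"},
    })
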
Example: doctl databases topics create ", Writer, aliasOpt("c")) - AddIntFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicReplicationFactor, "", 2, "Specifies the number of nodes to replicate data across the kafka cluster") - AddIntFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicPartitionCount, "", 1, "Specifies the number of partitions available for the topic") - AddStringFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicCleanupPolicy, "", "delete", - "Specifies the retention policy to use on log segments: Possible values are 'delete', 'compact_delete', 'compact'") - AddStringFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicCompressionType, "", "producer", - "Specifies the compression type for a kafka topic: Possible values are 'producer', 'gzip', 'snappy', 'Iz4', 'zstd', 'uncompressed'") - AddStringFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicDeleteRetentionMS, "", "", - "Specifies how long (in ms) to retain delete tombstone markers for topics") - AddStringFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicFileDeleteDelayMS, "", "", - "Specifies the minimum time (in ms) to wait before deleting a file from the filesystem") - AddStringFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicFlushMessages, "", "", - "Specifies the maximum number of messages to accumulate on a log partition before messages are flushed to disk") - AddStringFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicFlushMS, "", "", - "Specifies the maximum time (in ms) that a message is kept in memory before being flushed to disk") - AddStringFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicIntervalIndexBytes, "", "", - "Specifies the number of bytes between entries being added into the offset index") - AddStringFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicMaxCompactionLagMS, "", "", - "Specifies the maximum time (in ms) that a message will remain uncompacted. This is only applicable if the logs have compaction enabled") - AddStringFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicMaxMessageBytes, "", "", - "Specifies the largest record batch (in bytes) that can be sent to the server. This is calculated after compression, if compression is enabled") - AddBoolFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicMesssageDownConversionEnable, "", true, - "Specifies whether down-conversion of message formats is enabled to satisfy consumer requests") - AddStringFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicMessageFormatVersion, "", "", - `Specifies the message format version used by the broker to append messages to the logs. By setting a format version, all existing messages on disk must be - smaller or equal to the specified version`) - AddStringFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicMessageTimestampType, "", "", - "Specifies whether to use the create time or log append time as the timestamp on a message") - AddStringFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicMinCleanableDirtyRatio, "", "", - `Specifies the frequenty of log compaction (if enabled) in relation to duplicates present in the logs. For example, 0.5 would mean at most half of the log - could be duplicates before compaction would begin`) - AddStringFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicMinCompactionLagMS, "", "", - "Specifies the minimum time (in ms) that a message will remain uncompacted. 
This is only applicable if the logs have compaction enabled") - AddStringFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicMinInsyncReplicas, "", "", - "Specifies the minimum number of replicas that must ACK a write for it to be considered successful") - AddBoolFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicPreallocate, "", false, - "Specifies whether a file should be preallocated on disk when creating a new log segment") - AddStringFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicRetentionBytes, "", "", - "Specifies the maximum size (in bytes) before deleting messages. '-1' indicates that there is no limit") - AddStringFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicRetentionMS, "", "", - "Specifies the maximum time (in ms) to store a message before deleting it. '-1' indicates that there is no limit") - AddStringFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicSegmentBytes, "", "", - "Specifies the maximum size (in bytes) of a single log file") - AddStringFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicSegmentJitterMS, "", "", - "Specifies the maximum time (in ms) for random jitter that is subtracted from the scheduled segment roll time to avoid thundering herd problems") - AddStringFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicSegmentMS, "", "", - "Specifies the maximum time (in ms) to wait to force a log roll if the segment file isn't full. After this period, the log will be forced to roll") - AddBoolFlag(cmdDatabaseTopicCreate, doctl.ArgDatabaseTopicUncleanLeaderElectionEnable, "", false, - "Specifies whether to allow replicas that are not insync to be elected as leader as a last resort. This may result in data loss since those leaders are not in sync") - + cmdDatabaseTopicUpdate := CmdBuilder(cmd, RunDatabaseTopicUpdate, "update ", "Updates a topic for a given kafka database", + "This command updates a kafka topic for the specified kafka database cluster. 
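For instance, an update invocation with hypothetical cluster and topic names might look like `doctl databases topics update your-database-uuid topic1 --partition-count 4 --retention-ms 86400000`; both flags are registered on the update command in the loop below.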
Example: doctl databases topics update ", Writer, aliasOpt("u"))
+ cmdsWithConfig := []*Command{cmdDatabaseTopicCreate, cmdDatabaseTopicUpdate}
+ for _, c := range cmdsWithConfig {
+ AddIntFlag(c, doctl.ArgDatabaseTopicReplicationFactor, "", 2, "Specifies the number of nodes to replicate data across the kafka cluster")
+ AddIntFlag(c, doctl.ArgDatabaseTopicPartitionCount, "", 1, "Specifies the number of partitions available for the topic")
+ AddStringFlag(c, doctl.ArgDatabaseTopicCleanupPolicy, "", "delete",
+ "Specifies the retention policy to use on log segments: Possible values are 'delete', 'compact_delete', 'compact'")
+ AddStringFlag(c, doctl.ArgDatabaseTopicCompressionType, "", "producer",
+ "Specifies the compression type for a kafka topic: Possible values are 'producer', 'gzip', 'snappy', 'lz4', 'zstd', 'uncompressed'")
+ AddStringFlag(c, doctl.ArgDatabaseTopicDeleteRetentionMS, "", "",
+ "Specifies how long (in ms) to retain delete tombstone markers for topics")
+ AddStringFlag(c, doctl.ArgDatabaseTopicFileDeleteDelayMS, "", "",
+ "Specifies the minimum time (in ms) to wait before deleting a file from the filesystem")
+ AddStringFlag(c, doctl.ArgDatabaseTopicFlushMessages, "", "",
+ "Specifies the maximum number of messages to accumulate on a log partition before messages are flushed to disk")
+ AddStringFlag(c, doctl.ArgDatabaseTopicFlushMS, "", "",
+ "Specifies the maximum time (in ms) that a message is kept in memory before being flushed to disk")
+ AddStringFlag(c, doctl.ArgDatabaseTopicIntervalIndexBytes, "", "",
+ "Specifies the number of bytes between entries being added into the offset index")
+ AddStringFlag(c, doctl.ArgDatabaseTopicMaxCompactionLagMS, "", "",
+ "Specifies the maximum time (in ms) that a message will remain uncompacted. This is only applicable if the logs have compaction enabled")
+ AddStringFlag(c, doctl.ArgDatabaseTopicMaxMessageBytes, "", "",
+ "Specifies the largest record batch (in bytes) that can be sent to the server. This is calculated after compression, if compression is enabled")
+ AddBoolFlag(c, doctl.ArgDatabaseTopicMesssageDownConversionEnable, "", true,
+ "Specifies whether down-conversion of message formats is enabled to satisfy consumer requests")
+ AddStringFlag(c, doctl.ArgDatabaseTopicMessageFormatVersion, "", "",
+ `Specifies the message format version used by the broker to append messages to the logs. By setting a format version, all existing messages on disk must be
+ smaller or equal to the specified version`)
+ AddStringFlag(c, doctl.ArgDatabaseTopicMessageTimestampType, "", "",
+ "Specifies whether to use the create time or log append time as the timestamp on a message")
+ AddStringFlag(c, doctl.ArgDatabaseTopicMinCleanableDirtyRatio, "", "",
+ `Specifies the frequency of log compaction (if enabled) in relation to duplicates present in the logs. For example, 0.5 would mean at most half of the log
+ could be duplicates before compaction would begin`)
+ AddStringFlag(c, doctl.ArgDatabaseTopicMinCompactionLagMS, "", "",
+ "Specifies the minimum time (in ms) that a message will remain uncompacted. 
This is only applicable if the logs have compaction enabled") + AddStringFlag(c, doctl.ArgDatabaseTopicMinInsyncReplicas, "", "", + "Specifies the minimum number of replicas that must ACK a write for it to be considered successful") + AddBoolFlag(c, doctl.ArgDatabaseTopicPreallocate, "", false, + "Specifies whether a file should be preallocated on disk when creating a new log segment") + AddStringFlag(c, doctl.ArgDatabaseTopicRetentionBytes, "", "", + "Specifies the maximum size (in bytes) before deleting messages. '-1' indicates that there is no limit") + AddStringFlag(c, doctl.ArgDatabaseTopicRetentionMS, "", "", + "Specifies the maximum time (in ms) to store a message before deleting it. '-1' indicates that there is no limit") + AddStringFlag(c, doctl.ArgDatabaseTopicSegmentBytes, "", "", + "Specifies the maximum size (in bytes) of a single log file") + AddStringFlag(c, doctl.ArgDatabaseTopicSegmentJitterMS, "", "", + "Specifies the maximum time (in ms) for random jitter that is subtracted from the scheduled segment roll time to avoid thundering herd problems") + AddStringFlag(c, doctl.ArgDatabaseTopicSegmentMS, "", "", + "Specifies the maximum time (in ms) to wait to force a log roll if the segment file isn't full. After this period, the log will be forced to roll") + AddBoolFlag(c, doctl.ArgDatabaseTopicUncleanLeaderElectionEnable, "", false, + "Specifies whether to allow replicas that are not insync to be elected as leader as a last resort. This may result in data loss since those leaders are not in sync") + } return cmd } From 87f63cf27fc6f38a6a6e10c956a8c3264c7d6ca7 Mon Sep 17 00:00:00 2001 From: Daniel Weinshenker Date: Wed, 1 Nov 2023 16:13:54 -0700 Subject: [PATCH 07/18] add tests and mocks --- commands/databases.go | 97 +++++++-------- commands/databases_test.go | 208 ++++++++++++++++++++++++++++++++ commands/displayers/database.go | 2 +- do/mocks/DatabasesService.go | 73 +++++++++++ 4 files changed, 328 insertions(+), 52 deletions(-) diff --git a/commands/databases.go b/commands/databases.go index 787c9ce21..a444d84c3 100644 --- a/commands/databases.go +++ b/commands/databases.go @@ -1610,23 +1610,21 @@ func RunDatabaseTopicCreate(c *CmdConfig) error { databaseID := c.Args[0] topicName := c.Args[1] + createReq := &godo.DatabaseCreateTopicRequest{Name: topicName} + pc, err := c.Doit.GetInt(c.NS, doctl.ArgDatabaseTopicPartitionCount) - if err != nil { - return err + if err == nil && pc != 0 { + pcUInt32 := uint32(pc) + createReq.PartitionCount = &pcUInt32 } - pcUInt32 := uint32(pc) rf, err := c.Doit.GetInt(c.NS, doctl.ArgDatabaseTopicReplicationFactor) - if err != nil { - return err + if err == nil && rf != 0 { + rfUInt32 := uint32(rf) + createReq.ReplicationFactor = &rfUInt32 } - rfUInt32 := uint32(rf) + createReq.Config = getDatabaseTopicConfigArgs(c) - _, err = c.Databases().CreateTopic(databaseID, &godo.DatabaseCreateTopicRequest{ - Name: topicName, - ReplicationFactor: &rfUInt32, - PartitionCount: &pcUInt32, - Config: getDatabaseTopicConfigArgs(c), - }) + _, err = c.Databases().CreateTopic(databaseID, createReq) return err } @@ -1638,22 +1636,21 @@ func RunDatabaseTopicUpdate(c *CmdConfig) error { databaseID := c.Args[0] topicName := c.Args[1] + updateReq := &godo.DatabaseUpdateTopicRequest{} + pc, err := c.Doit.GetInt(c.NS, doctl.ArgDatabaseTopicPartitionCount) - if err != nil { - return err + if err == nil && pc != 0 { + pcUInt32 := uint32(pc) + updateReq.PartitionCount = &pcUInt32 } - pcUInt32 := uint32(pc) rf, err := c.Doit.GetInt(c.NS, 
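One behavioral note on this refactor: the zero value doubles as the "flag not provided" sentinel, so an explicit --partition-count 0 is silently ignored rather than rejected. The conversion boilerplate could also be shortened with a small hypothetical helper:

    // uint32Ptr is illustrative only; it shortens the convert-and-take-address dance.
    func uint32Ptr(i int) *uint32 {
        u := uint32(i)
        return &u
    }

    // e.g. createReq.PartitionCount = uint32Ptr(pc)
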
doctl.ArgDatabaseTopicReplicationFactor) - if err != nil { - return err + if err == nil && rf != 0 { + rfUInt32 := uint32(rf) + updateReq.ReplicationFactor = &rfUInt32 } - rfUInt32 := uint32(rf) + updateReq.Config = getDatabaseTopicConfigArgs(c) - err = c.Databases().UpdateTopic(databaseID, topicName, &godo.DatabaseUpdateTopicRequest{ - ReplicationFactor: &rfUInt32, - PartitionCount: &pcUInt32, - Config: getDatabaseTopicConfigArgs(c), - }) + err = c.Databases().UpdateTopic(databaseID, topicName, updateReq) return err } @@ -1664,72 +1661,72 @@ func getDatabaseTopicConfigArgs(c *CmdConfig) *godo.TopicConfig { res.CleanupPolicy = val } val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicCompressionType) - if err == nil { + if err == nil && val != "" { res.CompressionType = val } val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicDeleteRetentionMS) - if err == nil { + if err == nil && val != "" { i, err := strconv.ParseUint(val, 10, 64) if err == nil { res.DeleteRetentionMS = &i } } val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicFileDeleteDelayMS) - if err == nil { + if err == nil && val != "" { i, err := strconv.ParseUint(val, 10, 64) if err == nil { res.FileDeleteDelayMS = &i } } val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicFlushMessages) - if err == nil { + if err == nil && val != "" { i, err := strconv.ParseUint(val, 10, 64) if err == nil { res.FlushMessages = &i } } val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicFlushMS) - if err == nil { + if err == nil && val != "" { i, err := strconv.ParseUint(val, 10, 64) if err == nil { res.FlushMS = &i } } val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicIntervalIndexBytes) - if err == nil { + if err == nil && val != "" { i, err := strconv.ParseUint(val, 10, 64) if err == nil { res.IndexIntervalBytes = &i } } val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicMaxCompactionLagMS) - if err == nil { + if err == nil && val != "" { i, err := strconv.ParseUint(val, 10, 64) if err == nil { res.MaxCompactionLagMS = &i } } val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicMaxMessageBytes) - if err == nil { + if err == nil && val != "" { i, err := strconv.ParseUint(val, 10, 64) if err == nil { res.MaxMessageBytes = &i } } bVal, err := c.Doit.GetBoolPtr(c.NS, doctl.ArgDatabaseTopicMesssageDownConversionEnable) - if err == nil { + if err == nil && bVal != nil { res.MessageDownConversionEnable = bVal } val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicMessageFormatVersion) - if err == nil { + if err == nil && val != "" { res.MessageFormatVersion = val } val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicMessageTimestampType) - if err == nil { + if err == nil && val != "" { res.MessageTimestampType = val } val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicMinCleanableDirtyRatio) - if err == nil { + if err == nil && val != "" { i, err := strconv.ParseFloat(val, 32) if err == nil { iFloat32 := float32(i) @@ -1737,14 +1734,14 @@ func getDatabaseTopicConfigArgs(c *CmdConfig) *godo.TopicConfig { } } val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicMinCompactionLagMS) - if err == nil { + if err == nil && val != "" { i, err := strconv.ParseUint(val, 10, 64) if err == nil { res.MinCompactionLagMS = &i } } val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicMinInsyncReplicas) - if err == nil { + if err == nil && val != "" { i, err := strconv.ParseUint(val, 10, 32) if err == nil { iUint32 := uint32(i) @@ -1752,46 +1749,46 @@ func getDatabaseTopicConfigArgs(c *CmdConfig) *godo.TopicConfig { 
} } bVal, err = c.Doit.GetBoolPtr(c.NS, doctl.ArgDatabaseTopicPreallocate) - if err == nil { + if err == nil && bVal != nil { res.Preallocate = bVal } val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicRetentionBytes) - if err == nil { + if err == nil && val != "" { i, err := strconv.ParseInt(val, 10, 64) if err == nil { res.RetentionBytes = &i } } val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicRetentionMS) - if err == nil { + if err == nil && val != "" { i, err := strconv.ParseInt(val, 10, 64) if err == nil { res.RetentionMS = &i } } val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicSegmentBytes) - if err == nil { + if err == nil && val != "" { i, err := strconv.ParseUint(val, 10, 64) if err == nil { res.SegmentBytes = &i } } val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicSegmentJitterMS) - if err == nil { + if err == nil && val != "" { i, err := strconv.ParseUint(val, 10, 64) if err == nil { res.SegmentJitterMS = &i } } val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicSegmentMS) - if err == nil { + if err == nil && val != "" { i, err := strconv.ParseUint(val, 10, 64) if err == nil { res.SegmentMS = &i } } bVal, err = c.Doit.GetBoolPtr(c.NS, doctl.ArgDatabaseTopicUncleanLeaderElectionEnable) - if err == nil { + if err == nil && bVal != nil { res.UncleanLeaderElectionEnable = bVal } @@ -1811,7 +1808,7 @@ func databaseTopic() *Command { This command lists the following details for each topic in a kafka database cluster: - The Name of the topic. - - The State of the topic - The possible values are: "topicstate_active", "configuring", "deleting". + - The State of the topic. - The Replication Factor of the topic - number of brokers the topic's partitions are replicated across. ` @@ -1834,8 +1831,8 @@ This command lists the following details for each partition of a given topic in - The EarliestOffset - earliest offset read amongst all consumers of the partition. ` - CmdBuilder(cmd, RunDatabaseTopicList, "list ", "Retrieve a list of topics for a given kafka database", topicListDetails, Writer, aliasOpt("ls")) - CmdBuilder(cmd, RunDatabaseTopicGet, "get ", "Retrieve the configuration for a given kafka topic", topicGetDetails, Writer, aliasOpt("g")) + CmdBuilder(cmd, RunDatabaseTopicList, "list ", "Retrieve a list of topics for a given kafka database", topicListDetails, Writer, displayerType(&displayers.DatabaseKafkaTopics{}), aliasOpt("ls")) + CmdBuilder(cmd, RunDatabaseTopicGet, "get ", "Retrieve the configuration for a given kafka topic", topicGetDetails, Writer, displayerType(&displayers.DatabaseKafkaTopic{}), aliasOpt("g")) CmdBuilder(cmd, RunDatabaseTopicListPartition, "partitions ", "Retrieve the partitions for a given kafka topic", topicGetPartitionDetails, Writer, aliasOpt("p")) cmdDatabaseTopicDelete := CmdBuilder(cmd, RunDatabaseTopicDelete, "delete ", "Deletes a kafka topic by topic name", "", Writer, aliasOpt("rm")) AddBoolFlag(cmdDatabaseTopicDelete, doctl.ArgForce, doctl.ArgShortForce, false, "Deletes the kafka topic without a confirmation prompt") @@ -1868,13 +1865,11 @@ This command lists the following details for each partition of a given topic in AddBoolFlag(c, doctl.ArgDatabaseTopicMesssageDownConversionEnable, "", true, "Specifies whether down-conversion of message formats is enabled to satisfy consumer requests") AddStringFlag(c, doctl.ArgDatabaseTopicMessageFormatVersion, "", "", - `Specifies the message format version used by the broker to append messages to the logs. 
By setting a format version, all existing messages on disk must be
- smaller or equal to the specified version`)
+ "Specifies the message format version used by the broker to append messages to the logs. By setting a format version, all existing messages on disk must be smaller or equal to the specified version")
 AddStringFlag(c, doctl.ArgDatabaseTopicMessageTimestampType, "", "",
 "Specifies whether to use the create time or log append time as the timestamp on a message")
 AddStringFlag(c, doctl.ArgDatabaseTopicMinCleanableDirtyRatio, "", "",
- `Specifies the frequency of log compaction (if enabled) in relation to duplicates present in the logs. For example, 0.5 would mean at most half of the log
- could be duplicates before compaction would begin`)
+ "Specifies the frequency of log compaction (if enabled) in relation to duplicates present in the logs. For example, 0.5 would mean at most half of the log could be duplicates before compaction would begin")
 AddStringFlag(c, doctl.ArgDatabaseTopicMinCompactionLagMS, "", "",
 "Specifies the minimum time (in ms) that a message will remain uncompacted. This is only applicable if the logs have compaction enabled")
 AddStringFlag(c, doctl.ArgDatabaseTopicMinInsyncReplicas, "", "",
diff --git a/commands/databases_test.go b/commands/databases_test.go
index 4b9e52ba6..5f323288b 100644
--- a/commands/databases_test.go
+++ b/commands/databases_test.go
@@ -65,6 +65,21 @@ var (
 },
 }
+ testKafkaDBCluster = do.Database{
+ Database: &godo.Database{
+ ID: "ea93928g-8se0-929e-m1ns-029daj2k3j12",
+ Name: "kafka-db-cluster",
+ RegionSlug: "nyc1",
+ EngineSlug: "kafka",
+ VersionSlug: "3.5",
+ NumNodes: 3,
+ SizeSlug: "db-s-2vcpu-4gb",
+ CreatedAt: time.Now(),
+ Status: "online",
+ Tags: []string{"testing"},
+ },
+ }
+
 testDBBackUpCluster = do.Database{
 Database: &godo.Database{
 ID: "ea4652de-4fe0-11e9-b7ab-df1ef30eab9e",
@@ -189,6 +204,36 @@ var (
 RedisConfig: &godo.RedisConfig{},
 }
+ topicReplicationFactor = uint32(3)
+ testKafkaTopic = do.DatabaseTopic{
+ DatabaseTopic: &godo.DatabaseTopic{
+ Name: "topic1",
+ State: "active",
+ Config: &godo.TopicConfig{
+ CleanupPolicy: "delete",
+ },
+ Partitions: []*godo.TopicPartition{
+ {
+ Id: 0,
+ Size: 4096,
+ EarliestOffset: 0,
+ InSyncReplicas: 2,
+ },
+ {
+ Id: 1,
+ Size: 4096,
+ EarliestOffset: 4,
+ InSyncReplicas: 2,
+ },
+ },
+ ReplicationFactor: &topicReplicationFactor,
+ },
+ }
+
+ testKafkaTopics = do.DatabaseTopics{
+ testKafkaTopic,
+ }
+
 errTest = errors.New("error")
 )
@@ -214,6 +259,7 @@ func TestDatabasesCommand(t *testing.T) {
 "db",
 "sql-mode",
 "configuration",
+ "topics",
 )
 }
@@ -290,6 +336,19 @@ func TestDatabaseConfigurationCommand(t *testing.T) {
 assertCommandNames(t, cmd, "get")
 }
+func TestDatabaseKafkaTopicCommand(t *testing.T) {
+ cmd := databaseTopic()
+ assert.NotNil(t, cmd)
+ assertCommandNames(t, cmd,
+ "get",
+ "list",
+ "delete",
+ "create",
+ "update",
+ "partitions",
+ )
+}
+
 func TestDatabasesGet(t *testing.T) {
 // Successful call
 withTestClient(t, func(config *CmdConfig, tm *tcMocks) {
@@ -554,6 +613,155 @@ func TestDatabaseListBackups(t *testing.T) {
 })
 }
+func TestDatabaseListKafkaTopics(t *testing.T) {
+ // Success
+ withTestClient(t, func(config *CmdConfig, tm *tcMocks) {
+ tm.databases.EXPECT().ListTopics(testKafkaDBCluster.ID).Return(testKafkaTopics, nil)
+ config.Args = append(config.Args, testKafkaDBCluster.ID)
+
+ err := RunDatabaseTopicList(config)
+ assert.NoError(t, err)
+ })
+
+ // Error
+ withTestClient(t, func(config *CmdConfig, tm *tcMocks) {
+ 
tm.databases.EXPECT().ListTopics(testKafkaDBCluster.ID).Return(nil, errTest) + config.Args = append(config.Args, testKafkaDBCluster.ID) + + err := RunDatabaseTopicList(config) + assert.EqualError(t, err, errTest.Error()) + }) +} + +func TestDatabaseGetKafkaTopic(t *testing.T) { + // Success + withTestClient(t, func(config *CmdConfig, tm *tcMocks) { + tm.databases.EXPECT().GetTopic(testKafkaDBCluster.ID, testKafkaTopic.Name).Return(&testKafkaTopic, nil) + config.Args = append(config.Args, testKafkaDBCluster.ID, testKafkaTopic.Name) + + err := RunDatabaseTopicGet(config) + assert.NoError(t, err) + }) + + // Error + withTestClient(t, func(config *CmdConfig, tm *tcMocks) { + tm.databases.EXPECT().GetTopic(testKafkaDBCluster.ID, testKafkaTopic.Name).Return(nil, errTest) + config.Args = append(config.Args, testKafkaDBCluster.ID, testKafkaTopic.Name) + + err := RunDatabaseTopicGet(config) + assert.EqualError(t, err, errTest.Error()) + }) +} + +func TestDatabaseCreateKafkaTopic(t *testing.T) { + // Success - only topic name + withTestClient(t, func(config *CmdConfig, tm *tcMocks) { + createReq := &godo.DatabaseCreateTopicRequest{ + Name: testKafkaTopic.Name, + Config: &godo.TopicConfig{}, + } + tm.databases.EXPECT().CreateTopic(testKafkaDBCluster.ID, createReq).Return(&testKafkaTopic, nil) + config.Args = append(config.Args, testKafkaDBCluster.ID, testKafkaTopic.Name) + + err := RunDatabaseTopicCreate(config) + assert.NoError(t, err) + }) + // Success - with additional config + withTestClient(t, func(config *CmdConfig, tm *tcMocks) { + pc := uint32(len(testKafkaTopic.Partitions)) + createReq := &godo.DatabaseCreateTopicRequest{ + Name: testKafkaTopic.Name, + ReplicationFactor: testKafkaTopic.ReplicationFactor, + PartitionCount: &pc, + Config: &godo.TopicConfig{ + CleanupPolicy: testKafkaTopic.Config.CleanupPolicy, + }, + } + tm.databases.EXPECT().CreateTopic(testKafkaDBCluster.ID, createReq).Return(&testKafkaTopic, nil) + config.Args = append(config.Args, testKafkaDBCluster.ID, testKafkaTopic.Name) + config.Doit.Set(config.NS, doctl.ArgDatabaseTopicPartitionCount, pc) + config.Doit.Set(config.NS, doctl.ArgDatabaseTopicReplicationFactor, testKafkaTopic.ReplicationFactor) + config.Doit.Set(config.NS, doctl.ArgDatabaseTopicCleanupPolicy, testKafkaTopic.Config.CleanupPolicy) + + err := RunDatabaseTopicCreate(config) + assert.NoError(t, err) + }) + // Error + withTestClient(t, func(config *CmdConfig, tm *tcMocks) { + tm.databases.EXPECT().CreateTopic(testKafkaDBCluster.ID, gomock.AssignableToTypeOf(&godo.DatabaseCreateTopicRequest{})).Return(nil, errTest) + config.Args = append(config.Args, testKafkaDBCluster.ID, testKafkaTopic.Name) + + err := RunDatabaseTopicCreate(config) + assert.EqualError(t, err, errTest.Error()) + }) +} + +func TestDatabaseUpdateKafkaTopic(t *testing.T) { + // Success - only partition count + withTestClient(t, func(config *CmdConfig, tm *tcMocks) { + currPC := uint32(len(testKafkaTopic.Partitions)) + newPC := currPC + 1 + updateReq := &godo.DatabaseUpdateTopicRequest{ + PartitionCount: &newPC, + Config: &godo.TopicConfig{}, + } + tm.databases.EXPECT().UpdateTopic(testKafkaDBCluster.ID, testKafkaTopic.Name, updateReq).Return(nil) + config.Args = append(config.Args, testKafkaDBCluster.ID, testKafkaTopic.Name) + config.Doit.Set(config.NS, doctl.ArgDatabaseTopicPartitionCount, newPC) + + err := RunDatabaseTopicUpdate(config) + assert.NoError(t, err) + }) + // Success - with additional config + withTestClient(t, func(config *CmdConfig, tm *tcMocks) { + currPC := 
uint32(len(testKafkaTopic.Partitions)) + newPC := currPC + 1 + updateReq := &godo.DatabaseUpdateTopicRequest{ + PartitionCount: &newPC, + Config: &godo.TopicConfig{ + CleanupPolicy: testKafkaTopic.Config.CleanupPolicy, + }, + } + tm.databases.EXPECT().UpdateTopic(testKafkaDBCluster.ID, testKafkaTopic.Name, updateReq).Return(nil) + config.Args = append(config.Args, testKafkaDBCluster.ID, testKafkaTopic.Name) + config.Doit.Set(config.NS, doctl.ArgDatabaseTopicPartitionCount, newPC) + config.Doit.Set(config.NS, doctl.ArgDatabaseTopicCleanupPolicy, testKafkaTopic.Config.CleanupPolicy) + + err := RunDatabaseTopicUpdate(config) + assert.NoError(t, err) + }) + // Error + withTestClient(t, func(config *CmdConfig, tm *tcMocks) { + tm.databases.EXPECT().UpdateTopic(testKafkaDBCluster.ID, testKafkaTopic.Name, gomock.AssignableToTypeOf(&godo.DatabaseUpdateTopicRequest{})).Return(errTest) + config.Args = append(config.Args, testKafkaDBCluster.ID, testKafkaTopic.Name) + + err := RunDatabaseTopicUpdate(config) + assert.EqualError(t, err, errTest.Error()) + }) +} + +func TestDatabaseDeleteKafkaTopic(t *testing.T) { + // Successful + withTestClient(t, func(config *CmdConfig, tm *tcMocks) { + tm.databases.EXPECT().DeleteTopic(testKafkaDBCluster.ID, testKafkaTopic.Name).Return(nil) + config.Args = append(config.Args, testKafkaDBCluster.ID, testKafkaTopic.Name) + config.Doit.Set(config.NS, doctl.ArgForce, "true") + + err := RunDatabaseTopicDelete(config) + assert.NoError(t, err) + }) + // Error + withTestClient(t, func(config *CmdConfig, tm *tcMocks) { + tm.databases.EXPECT().DeleteTopic(testKafkaDBCluster.ID, testKafkaTopic.Name).Return(errTest) + + config.Args = append(config.Args, testKafkaDBCluster.ID, testKafkaTopic.Name) + config.Doit.Set(config.NS, doctl.ArgForce, "true") + + err := RunDatabaseTopicDelete(config) + assert.EqualError(t, err, errTest.Error()) + }) +} + func TestDatabaseConnectionGet(t *testing.T) { // Success withTestClient(t, func(config *CmdConfig, tm *tcMocks) { diff --git a/commands/displayers/database.go b/commands/displayers/database.go index 207f56fb6..257de05ba 100644 --- a/commands/displayers/database.go +++ b/commands/displayers/database.go @@ -907,7 +907,7 @@ func (dt *DatabaseKafkaTopic) KV() []map[string]interface{} { if t.Config.MinCompactionLagMS != nil { cfg = append(cfg, map[string]interface{}{ "key": "MinCompactionLagMS", - "value": t.Config.MinCompactionLagMS, + "value": *t.Config.MinCompactionLagMS, }) } if t.Config.MinInsyncReplicas != nil { diff --git a/do/mocks/DatabasesService.go b/do/mocks/DatabasesService.go index 518c339aa..37e926bfb 100644 --- a/do/mocks/DatabasesService.go +++ b/do/mocks/DatabasesService.go @@ -99,6 +99,21 @@ func (mr *MockDatabasesServiceMockRecorder) CreateReplica(arg0, arg1 any) *gomoc return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateReplica", reflect.TypeOf((*MockDatabasesService)(nil).CreateReplica), arg0, arg1) } +// CreateTopic mocks base method. +func (m *MockDatabasesService) CreateTopic(arg0 string, arg1 *godo.DatabaseCreateTopicRequest) (*do.DatabaseTopic, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateTopic", arg0, arg1) + ret0, _ := ret[0].(*do.DatabaseTopic) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateTopic indicates an expected call of CreateTopic. 
+func (mr *MockDatabasesServiceMockRecorder) CreateTopic(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateTopic", reflect.TypeOf((*MockDatabasesService)(nil).CreateTopic), arg0, arg1) +} + // CreateUser mocks base method. func (m *MockDatabasesService) CreateUser(arg0 string, arg1 *godo.DatabaseCreateUserRequest) (*do.DatabaseUser, error) { m.ctrl.T.Helper() @@ -170,6 +185,20 @@ func (mr *MockDatabasesServiceMockRecorder) DeleteReplica(arg0, arg1 any) *gomoc return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteReplica", reflect.TypeOf((*MockDatabasesService)(nil).DeleteReplica), arg0, arg1) } +// DeleteTopic mocks base method. +func (m *MockDatabasesService) DeleteTopic(arg0, arg1 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteTopic", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteTopic indicates an expected call of DeleteTopic. +func (mr *MockDatabasesServiceMockRecorder) DeleteTopic(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteTopic", reflect.TypeOf((*MockDatabasesService)(nil).DeleteTopic), arg0, arg1) +} + // DeleteUser mocks base method. func (m *MockDatabasesService) DeleteUser(arg0, arg1 string) error { m.ctrl.T.Helper() @@ -364,6 +393,21 @@ func (mr *MockDatabasesServiceMockRecorder) GetSQLMode(arg0 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSQLMode", reflect.TypeOf((*MockDatabasesService)(nil).GetSQLMode), arg0) } +// GetTopic mocks base method. +func (m *MockDatabasesService) GetTopic(arg0, arg1 string) (*do.DatabaseTopic, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetTopic", arg0, arg1) + ret0, _ := ret[0].(*do.DatabaseTopic) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetTopic indicates an expected call of GetTopic. +func (mr *MockDatabasesServiceMockRecorder) GetTopic(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTopic", reflect.TypeOf((*MockDatabasesService)(nil).GetTopic), arg0, arg1) +} + // GetUser mocks base method. func (m *MockDatabasesService) GetUser(arg0, arg1 string) (*do.DatabaseUser, error) { m.ctrl.T.Helper() @@ -469,6 +513,21 @@ func (mr *MockDatabasesServiceMockRecorder) ListReplicas(arg0 any) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListReplicas", reflect.TypeOf((*MockDatabasesService)(nil).ListReplicas), arg0) } +// ListTopics mocks base method. +func (m *MockDatabasesService) ListTopics(arg0 string) (do.DatabaseTopics, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListTopics", arg0) + ret0, _ := ret[0].(do.DatabaseTopics) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListTopics indicates an expected call of ListTopics. +func (mr *MockDatabasesServiceMockRecorder) ListTopics(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListTopics", reflect.TypeOf((*MockDatabasesService)(nil).ListTopics), arg0) +} + // ListUsers mocks base method. 
func (m *MockDatabasesService) ListUsers(arg0 string) (do.DatabaseUsers, error) { m.ctrl.T.Helper() @@ -587,3 +646,17 @@ func (mr *MockDatabasesServiceMockRecorder) UpdateMaintenance(arg0, arg1 any) *g mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateMaintenance", reflect.TypeOf((*MockDatabasesService)(nil).UpdateMaintenance), arg0, arg1) } + +// UpdateTopic mocks base method. +func (m *MockDatabasesService) UpdateTopic(arg0, arg1 string, arg2 *godo.DatabaseUpdateTopicRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateTopic", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// UpdateTopic indicates an expected call of UpdateTopic. +func (mr *MockDatabasesServiceMockRecorder) UpdateTopic(arg0, arg1, arg2 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateTopic", reflect.TypeOf((*MockDatabasesService)(nil).UpdateTopic), arg0, arg1, arg2) +} From c3539a0663ad0ecbf17890b12668f80605f082fa Mon Sep 17 00:00:00 2001 From: Daniel-Weinshenker <9273337+dweinshenker@users.noreply.github.com> Date: Mon, 30 Oct 2023 11:05:28 -0700 Subject: [PATCH 08/18] Support scalable storage for PostgreSQL/MySQL databases (#1454) * Support scalable storage for PostgreSQL/MySQL databases * update displayer to be consistent between short/long versions --- args.go | 2 ++ commands/databases.go | 21 +++++++++++- commands/databases_test.go | 10 ++++-- commands/displayers/database.go | 61 ++++++++++++++++++--------------- 4 files changed, 63 insertions(+), 31 deletions(-) diff --git a/args.go b/args.go index ce110d6f7..e4cd0ca70 100644 --- a/args.go +++ b/args.go @@ -368,6 +368,8 @@ const ( ArgDatabaseEngine = "engine" // ArgDatabaseNumNodes is the number of nodes in the database cluster ArgDatabaseNumNodes = "num-nodes" + // ArgDatabaseStorageSizeMib is the amount of disk space, in MiB, that should be allocated to the database cluster + ArgDatabaseStorageSizeMib = "storage-size-mib" // ArgDatabaseMaintenanceDay is the new day for the maintenance window ArgDatabaseMaintenanceDay = "day" // ArgDatabaseMaintenanceHour is the new hour for the maintenance window diff --git a/commands/databases.go b/commands/databases.go index a444d84c3..06b1ebc1a 100644 --- a/commands/databases.go +++ b/commands/databases.go @@ -70,6 +70,7 @@ func Databases() *Command { nodeSizeDetails := "The size of the nodes in the database cluster, e.g. `db-s-1vcpu-1gb`` for a 1 CPU, 1GB node. For a list of available size slugs, visit: https://docs.digitalocean.com/reference/api/api-reference/#tag/Databases" nodeNumberDetails := "The number of nodes in the database cluster. Valid values are 1-3. In addition to the primary node, up to two standby nodes may be added for high availability." + storageSizeMiBDetails := "The amount of disk space allocated to the cluster. Applicable for PostgreSQL and MySQL clusters. This will be set to a default level based on the plan size selected, but can be increased in increments up to a maximum amount. For ranges, visit: https://www.digitalocean.com/pricing/managed-databases" cmdDatabaseCreate := CmdBuilder(cmd, RunDatabaseCreate, "create ", "Create a database cluster", `This command creates a database cluster with the specified name. There are a number of flags that customize the configuration, all of which are optional. 
Without any flags set, a single-node, single-CPU PostgreSQL database cluster will be created.`, Writer, @@ -77,6 +78,7 @@ There are a number of flags that customize the configuration, all of which are o AddIntFlag(cmdDatabaseCreate, doctl.ArgDatabaseNumNodes, "", defaultDatabaseNodeCount, nodeNumberDetails) AddStringFlag(cmdDatabaseCreate, doctl.ArgRegionSlug, "", defaultDatabaseRegion, "The region where the database cluster will be created, e.g. `nyc1` or `sfo2`") AddStringFlag(cmdDatabaseCreate, doctl.ArgSizeSlug, "", defaultDatabaseNodeSize, nodeSizeDetails) + AddIntFlag(cmdDatabaseCreate, doctl.ArgDatabaseStorageSizeMib, "", 0, storageSizeMiBDetails) AddStringFlag(cmdDatabaseCreate, doctl.ArgDatabaseEngine, "", defaultDatabaseEngine, "The database engine to be used for the cluster. Possible values are: `pg` for PostgreSQL, `mysql`, `redis`, `mongodb`, and `kafka`.") AddStringFlag(cmdDatabaseCreate, doctl.ArgVersion, "", "", "The database engine version, e.g. 14 for PostgreSQL version 14") AddStringFlag(cmdDatabaseCreate, doctl.ArgPrivateNetworkUUID, "", "", "The UUID of a VPC to create the database cluster in; the default VPC for the region will be used if excluded") @@ -116,10 +118,15 @@ You must specify the desired number of nodes and size of the nodes. For example: doctl databases resize ca9f591d-9999-5555-a0ef-1c02d1d1e352 --num-nodes 2 --size db-s-16vcpu-64gb -Database nodes cannot be resized to smaller sizes due to the risk of data loss.`, Writer, +Database nodes cannot be resized to smaller sizes due to the risk of data loss. + +In addition, for PostgreSQL and MySQL clusters, you can provide a disk size in MiB, which will set the total storage (up to a certain range) to the cluster independently. Storage cannot be reduced from its current levels. 
For example: + + doctl databases resize ca9f591d-9999-5555-a0ef-1c02d1d1e352 --num-nodes 2 --size db-s-16vcpu-64gb --storage-size-mib 2048000`, Writer, aliasOpt("rs")) AddIntFlag(cmdDatabaseResize, doctl.ArgDatabaseNumNodes, "", 0, nodeNumberDetails, requiredOpt()) AddStringFlag(cmdDatabaseResize, doctl.ArgSizeSlug, "", "", nodeSizeDetails, requiredOpt()) + AddIntFlag(cmdDatabaseResize, doctl.ArgDatabaseStorageSizeMib, "", 0, storageSizeMiBDetails) cmdDatabaseMigrate := CmdBuilder(cmd, RunDatabaseMigrate, "migrate ", "Migrate a database cluster to a new region", `This command migrates the specified database cluster to a new region`, Writer, aliasOpt("m")) @@ -294,6 +301,12 @@ func buildDatabaseCreateRequestFromArgs(c *CmdConfig) (*godo.DatabaseCreateReque r.PrivateNetworkUUID = privateNetworkUUID + storageSizeMibInt, err := c.Doit.GetInt(c.NS, doctl.ArgDatabaseStorageSizeMib) + if err != nil { + return nil, err + } + r.StorageSizeMib = uint64(storageSizeMibInt) + return r, nil } @@ -496,6 +509,12 @@ func buildDatabaseResizeRequestFromArgs(c *CmdConfig) (*godo.DatabaseResizeReque } r.SizeSlug = size + storageSizeMibInt, err := c.Doit.GetInt(c.NS, doctl.ArgDatabaseStorageSizeMib) + if err != nil { + return nil, err + } + r.StorageSizeMib = uint64(storageSizeMibInt) + return r, nil } diff --git a/commands/databases_test.go b/commands/databases_test.go index 5f323288b..3952f6991 100644 --- a/commands/databases_test.go +++ b/commands/databases_test.go @@ -62,6 +62,7 @@ var ( }, PrivateNetworkUUID: "1fe49b6c-ac8e-11e9-98cb-3bec94f411bc", Tags: []string{"testing"}, + StorageSizeMib: 20480, }, } @@ -400,6 +401,7 @@ func TestDatabasesCreate(t *testing.T) { SizeSlug: testDBCluster.SizeSlug, PrivateNetworkUUID: testDBCluster.PrivateNetworkUUID, Tags: testDBCluster.Tags, + StorageSizeMib: testDBCluster.StorageSizeMib, } // Successful call @@ -414,6 +416,7 @@ func TestDatabasesCreate(t *testing.T) { config.Doit.Set(config.NS, doctl.ArgDatabaseNumNodes, testDBCluster.NumNodes) config.Doit.Set(config.NS, doctl.ArgPrivateNetworkUUID, testDBCluster.PrivateNetworkUUID) config.Doit.Set(config.NS, doctl.ArgTag, testDBCluster.Tags) + config.Doit.Set(config.NS, doctl.ArgDatabaseStorageSizeMib, testDBCluster.StorageSizeMib) err := RunDatabaseCreate(config) assert.NoError(t, err) @@ -566,8 +569,9 @@ func TestDatabaseMigrate(t *testing.T) { func TestDatabaseResize(t *testing.T) { r := &godo.DatabaseResizeRequest{ - SizeSlug: testDBCluster.SizeSlug, - NumNodes: testDBCluster.NumNodes, + SizeSlug: testDBCluster.SizeSlug, + NumNodes: testDBCluster.NumNodes, + StorageSizeMib: testDBCluster.StorageSizeMib, } // Success @@ -576,6 +580,7 @@ func TestDatabaseResize(t *testing.T) { config.Args = append(config.Args, testDBCluster.ID) config.Doit.Set(config.NS, doctl.ArgSizeSlug, testDBCluster.SizeSlug) config.Doit.Set(config.NS, doctl.ArgDatabaseNumNodes, testDBCluster.NumNodes) + config.Doit.Set(config.NS, doctl.ArgDatabaseStorageSizeMib, testDBCluster.StorageSizeMib) err := RunDatabaseResize(config) assert.NoError(t, err) @@ -587,6 +592,7 @@ func TestDatabaseResize(t *testing.T) { config.Args = append(config.Args, testDBCluster.ID) config.Doit.Set(config.NS, doctl.ArgSizeSlug, testDBCluster.SizeSlug) config.Doit.Set(config.NS, doctl.ArgDatabaseNumNodes, testDBCluster.NumNodes) + config.Doit.Set(config.NS, doctl.ArgDatabaseStorageSizeMib, testDBCluster.StorageSizeMib) err := RunDatabaseResize(config) assert.EqualError(t, err, errTest.Error()) diff --git a/commands/displayers/database.go b/commands/displayers/database.go 
index 257de05ba..682982ac1 100644 --- a/commands/displayers/database.go +++ b/commands/displayers/database.go @@ -45,6 +45,7 @@ func (d *Databases) Cols() []string { "Region", "Status", "Size", + "StorageMib", } } @@ -59,34 +60,37 @@ func (d *Databases) Cols() []string { "Size", "URI", "Created", + "StorageMib", } } func (d *Databases) ColMap() map[string]string { if d.Short { return map[string]string{ - "ID": "ID", - "Name": "Name", - "Engine": "Engine", - "Version": "Version", - "NumNodes": "Number of Nodes", - "Region": "Region", - "Status": "Status", - "Size": "Size", + "ID": "ID", + "Name": "Name", + "Engine": "Engine", + "Version": "Version", + "NumNodes": "Number of Nodes", + "Region": "Region", + "Status": "Status", + "Size": "Size", + "StorageMib": "Storage (MiB)", } } return map[string]string{ - "ID": "ID", - "Name": "Name", - "Engine": "Engine", - "Version": "Version", - "NumNodes": "Number of Nodes", - "Region": "Region", - "Status": "Status", - "Size": "Size", - "URI": "URI", - "Created": "Created At", + "ID": "ID", + "Name": "Name", + "Engine": "Engine", + "Version": "Version", + "NumNodes": "Number of Nodes", + "Region": "Region", + "Status": "Status", + "Size": "Size", + "StorageMib": "Storage (MiB)", + "URI": "URI", + "Created": "Created At", } } @@ -95,16 +99,17 @@ func (d *Databases) KV() []map[string]interface{} { for _, db := range d.Databases { o := map[string]interface{}{ - "ID": db.ID, - "Name": db.Name, - "Engine": db.EngineSlug, - "Version": db.VersionSlug, - "NumNodes": db.NumNodes, - "Region": db.RegionSlug, - "Status": db.Status, - "Size": db.SizeSlug, - "URI": db.Connection.URI, - "Created": db.CreatedAt, + "ID": db.ID, + "Name": db.Name, + "Engine": db.EngineSlug, + "Version": db.VersionSlug, + "NumNodes": db.NumNodes, + "Region": db.RegionSlug, + "Status": db.Status, + "Size": db.SizeSlug, + "StorageMib": db.StorageSizeMib, + "URI": db.Connection.URI, + "Created": db.CreatedAt, } out = append(out, o) } From ead5d27334d9e47608cf6e1848955ed73d2748d8 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 30 Oct 2023 14:26:36 -0400 Subject: [PATCH 09/18] build(deps): bump github.com/docker/docker (#1456) Bumps [github.com/docker/docker](https://github.com/docker/docker) from 24.0.5+incompatible to 24.0.7+incompatible. - [Release notes](https://github.com/docker/docker/releases) - [Commits](https://github.com/docker/docker/compare/v24.0.5...v24.0.7) --- updated-dependencies: - dependency-name: github.com/docker/docker dependency-type: direct:production ... 
Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- go.mod | 2 +- go.sum | 4 ++-- .../github.com/docker/docker/api/swagger.yaml | 4 ++-- .../docker/docker/api/types/filters/parse.go | 10 +++++----- .../docker/api/types/versions/compare.go | 8 ++++---- .../docker/docker/pkg/archive/diff.go | 19 +++++++++++++++++++ vendor/modules.txt | 2 +- 7 files changed, 34 insertions(+), 15 deletions(-) diff --git a/go.mod b/go.mod index 406ebf438..d57b5b1e2 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/creack/pty v1.1.11 github.com/digitalocean/godo v1.105.0 github.com/docker/cli v24.0.5+incompatible - github.com/docker/docker v24.0.5+incompatible + github.com/docker/docker v24.0.7+incompatible github.com/docker/docker-credential-helpers v0.7.0 // indirect github.com/dustin/go-humanize v1.0.0 github.com/fatih/color v1.13.0 diff --git a/go.sum b/go.sum index 5676ceedb..29bad1197 100644 --- a/go.sum +++ b/go.sum @@ -109,8 +109,8 @@ github.com/docker/cli v24.0.5+incompatible h1:WeBimjvS0eKdH4Ygx+ihVq1Q++xg36M/rM github.com/docker/cli v24.0.5+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8= github.com/docker/distribution v2.8.2+incompatible h1:T3de5rq0dB1j30rp0sA2rER+m322EBzniBPB6ZIzuh8= github.com/docker/distribution v2.8.2+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= -github.com/docker/docker v24.0.5+incompatible h1:WmgcE4fxyI6EEXxBRxsHnZXrO1pQ3smi0k/jho4HLeY= -github.com/docker/docker v24.0.5+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= +github.com/docker/docker v24.0.7+incompatible h1:Wo6l37AuwP3JaMnZa226lzVXGA3F9Ig1seQen0cKYlM= +github.com/docker/docker v24.0.7+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= github.com/docker/docker-credential-helpers v0.7.0 h1:xtCHsjxogADNZcdv1pKUHXryefjlVRqWqIhk/uXJp0A= github.com/docker/docker-credential-helpers v0.7.0/go.mod h1:rETQfLdHNT3foU5kuNkFR1R1V12OJRRO5lzt2D1b5X0= github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKohAFqRJQ= diff --git a/vendor/github.com/docker/docker/api/swagger.yaml b/vendor/github.com/docker/docker/api/swagger.yaml index a820f996f..7635b9f66 100644 --- a/vendor/github.com/docker/docker/api/swagger.yaml +++ b/vendor/github.com/docker/docker/api/swagger.yaml @@ -5068,7 +5068,7 @@ definitions: Go runtime (`GOOS`). Currently returned values are "linux" and "windows". A full list of - possible values can be found in the [Go documentation](https://golang.org/doc/install/source#environment). + possible values can be found in the [Go documentation](https://go.dev/doc/install/source#environment). type: "string" example: "linux" Architecture: @@ -5076,7 +5076,7 @@ definitions: Hardware architecture of the host, as returned by the Go runtime (`GOARCH`). - A full list of possible values can be found in the [Go documentation](https://golang.org/doc/install/source#environment). + A full list of possible values can be found in the [Go documentation](https://go.dev/doc/install/source#environment). 
type: "string" example: "x86_64" NCPU: diff --git a/vendor/github.com/docker/docker/api/types/filters/parse.go b/vendor/github.com/docker/docker/api/types/filters/parse.go index 887648cf3..0c39ab5f1 100644 --- a/vendor/github.com/docker/docker/api/types/filters/parse.go +++ b/vendor/github.com/docker/docker/api/types/filters/parse.go @@ -98,7 +98,7 @@ func FromJSON(p string) (Args, error) { // Fallback to parsing arguments in the legacy slice format deprecated := map[string][]string{} if legacyErr := json.Unmarshal(raw, &deprecated); legacyErr != nil { - return args, invalidFilter{} + return args, &invalidFilter{} } args.fields = deprecatedArgs(deprecated) @@ -206,7 +206,7 @@ func (args Args) GetBoolOrDefault(key string, defaultValue bool) (bool, error) { } if len(fieldValues) == 0 { - return defaultValue, invalidFilter{key, nil} + return defaultValue, &invalidFilter{key, nil} } isFalse := fieldValues["0"] || fieldValues["false"] @@ -216,7 +216,7 @@ func (args Args) GetBoolOrDefault(key string, defaultValue bool) (bool, error) { invalid := !isFalse && !isTrue if conflicting || invalid { - return defaultValue, invalidFilter{key, args.Get(key)} + return defaultValue, &invalidFilter{key, args.Get(key)} } else if isFalse { return false, nil } else if isTrue { @@ -224,7 +224,7 @@ func (args Args) GetBoolOrDefault(key string, defaultValue bool) (bool, error) { } // This code shouldn't be reached. - return defaultValue, unreachableCode{Filter: key, Value: args.Get(key)} + return defaultValue, &unreachableCode{Filter: key, Value: args.Get(key)} } // ExactMatch returns true if the source matches exactly one of the values. @@ -282,7 +282,7 @@ func (args Args) Contains(field string) bool { func (args Args) Validate(accepted map[string]bool) error { for name := range args.fields { if !accepted[name] { - return invalidFilter{name, nil} + return &invalidFilter{name, nil} } } return nil diff --git a/vendor/github.com/docker/docker/api/types/versions/compare.go b/vendor/github.com/docker/docker/api/types/versions/compare.go index 489e917ee..621725a36 100644 --- a/vendor/github.com/docker/docker/api/types/versions/compare.go +++ b/vendor/github.com/docker/docker/api/types/versions/compare.go @@ -16,11 +16,11 @@ func compare(v1, v2 string) int { otherTab = strings.Split(v2, ".") ) - max := len(currTab) - if len(otherTab) > max { - max = len(otherTab) + maxVer := len(currTab) + if len(otherTab) > maxVer { + maxVer = len(otherTab) } - for i := 0; i < max; i++ { + for i := 0; i < maxVer; i++ { var currInt, otherInt int if len(currTab) > i { diff --git a/vendor/github.com/docker/docker/pkg/archive/diff.go b/vendor/github.com/docker/docker/pkg/archive/diff.go index c8c7be747..1a2fb971f 100644 --- a/vendor/github.com/docker/docker/pkg/archive/diff.go +++ b/vendor/github.com/docker/docker/pkg/archive/diff.go @@ -223,6 +223,25 @@ func ApplyUncompressedLayer(dest string, layer io.Reader, options *TarOptions) ( return applyLayerHandler(dest, layer, options, false) } +// IsEmpty checks if the tar archive is empty (doesn't contain any entries). 
+func IsEmpty(rd io.Reader) (bool, error) { + decompRd, err := DecompressStream(rd) + if err != nil { + return true, fmt.Errorf("failed to decompress archive: %v", err) + } + defer decompRd.Close() + + tarReader := tar.NewReader(decompRd) + if _, err := tarReader.Next(); err != nil { + if err == io.EOF { + return true, nil + } + return false, fmt.Errorf("failed to read next archive header: %v", err) + } + + return false, nil +} + // do the bulk load of ApplyLayer, but allow for not calling DecompressStream func applyLayerHandler(dest string, layer io.Reader, options *TarOptions, decompress bool) (int64, error) { dest = filepath.Clean(dest) diff --git a/vendor/modules.txt b/vendor/modules.txt index 6c960ca1d..bd6e847cb 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -71,7 +71,7 @@ github.com/docker/cli/cli/config/types ## explicit github.com/docker/distribution/digestset github.com/docker/distribution/reference -# github.com/docker/docker v24.0.5+incompatible +# github.com/docker/docker v24.0.7+incompatible ## explicit github.com/docker/docker/api github.com/docker/docker/api/types From c23db21c11a767d320babd9f925f4c8f8e22a595 Mon Sep 17 00:00:00 2001 From: Daniel Weinshenker Date: Tue, 14 Nov 2023 10:25:51 -0800 Subject: [PATCH 10/18] remove unclean_leader_election_enable --- args.go | 2 -- commands/databases.go | 27 +-------------------------- commands/displayers/database.go | 6 ------ 3 files changed, 1 insertion(+), 34 deletions(-) diff --git a/args.go b/args.go index e4cd0ca70..b1beb074f 100644 --- a/args.go +++ b/args.go @@ -435,8 +435,6 @@ const ( ArgDatabaseTopicSegmentJitterMS = "segment-jitter-ms" // ArgDatabaseTopicSegmentMS is the period of time, in ms, after which the log will be forced to roll if the segment file isn't full ArgDatabaseTopicSegmentMS = "segment-ms" - // ArgDatabaseTopicUncleanLeaderElectionEnable determines whether to allow replicas that are not insync to be elected as leaders as last resort - ArgDatabaseTopicUncleanLeaderElectionEnable = "unclean-leader-election-enable" // ArgPrivateNetworkUUID is the flag for VPC UUID ArgPrivateNetworkUUID = "private-network-uuid" diff --git a/commands/databases.go b/commands/databases.go index 06b1ebc1a..429984b99 100644 --- a/commands/databases.go +++ b/commands/databases.go @@ -70,7 +70,6 @@ func Databases() *Command { nodeSizeDetails := "The size of the nodes in the database cluster, e.g. `db-s-1vcpu-1gb`` for a 1 CPU, 1GB node. For a list of available size slugs, visit: https://docs.digitalocean.com/reference/api/api-reference/#tag/Databases" nodeNumberDetails := "The number of nodes in the database cluster. Valid values are 1-3. In addition to the primary node, up to two standby nodes may be added for high availability." - storageSizeMiBDetails := "The amount of disk space allocated to the cluster. Applicable for PostgreSQL and MySQL clusters. This will be set to a default level based on the plan size selected, but can be increased in increments up to a maximum amount. For ranges, visit: https://www.digitalocean.com/pricing/managed-databases" cmdDatabaseCreate := CmdBuilder(cmd, RunDatabaseCreate, "create ", "Create a database cluster", `This command creates a database cluster with the specified name. There are a number of flags that customize the configuration, all of which are optional. 
Without any flags set, a single-node, single-CPU PostgreSQL database cluster will be created.`, Writer, @@ -78,7 +77,6 @@ There are a number of flags that customize the configuration, all of which are o AddIntFlag(cmdDatabaseCreate, doctl.ArgDatabaseNumNodes, "", defaultDatabaseNodeCount, nodeNumberDetails) AddStringFlag(cmdDatabaseCreate, doctl.ArgRegionSlug, "", defaultDatabaseRegion, "The region where the database cluster will be created, e.g. `nyc1` or `sfo2`") AddStringFlag(cmdDatabaseCreate, doctl.ArgSizeSlug, "", defaultDatabaseNodeSize, nodeSizeDetails) - AddIntFlag(cmdDatabaseCreate, doctl.ArgDatabaseStorageSizeMib, "", 0, storageSizeMiBDetails) AddStringFlag(cmdDatabaseCreate, doctl.ArgDatabaseEngine, "", defaultDatabaseEngine, "The database engine to be used for the cluster. Possible values are: `pg` for PostgreSQL, `mysql`, `redis`, `mongodb`, and `kafka`.") AddStringFlag(cmdDatabaseCreate, doctl.ArgVersion, "", "", "The database engine version, e.g. 14 for PostgreSQL version 14") AddStringFlag(cmdDatabaseCreate, doctl.ArgPrivateNetworkUUID, "", "", "The UUID of a VPC to create the database cluster in; the default VPC for the region will be used if excluded") @@ -118,15 +116,10 @@ You must specify the desired number of nodes and size of the nodes. For example: doctl databases resize ca9f591d-9999-5555-a0ef-1c02d1d1e352 --num-nodes 2 --size db-s-16vcpu-64gb -Database nodes cannot be resized to smaller sizes due to the risk of data loss. - -In addition, for PostgreSQL and MySQL clusters, you can provide a disk size in MiB, which will set the total storage (up to a certain range) to the cluster independently. Storage cannot be reduced from its current levels. For example: - - doctl databases resize ca9f591d-9999-5555-a0ef-1c02d1d1e352 --num-nodes 2 --size db-s-16vcpu-64gb --storage-size-mib 2048000`, Writer, +Database nodes cannot be resized to smaller sizes due to the risk of data loss.`, Writer, aliasOpt("rs")) AddIntFlag(cmdDatabaseResize, doctl.ArgDatabaseNumNodes, "", 0, nodeNumberDetails, requiredOpt()) AddStringFlag(cmdDatabaseResize, doctl.ArgSizeSlug, "", "", nodeSizeDetails, requiredOpt()) - AddIntFlag(cmdDatabaseResize, doctl.ArgDatabaseStorageSizeMib, "", 0, storageSizeMiBDetails) cmdDatabaseMigrate := CmdBuilder(cmd, RunDatabaseMigrate, "migrate ", "Migrate a database cluster to a new region", `This command migrates the specified database cluster to a new region`, Writer, aliasOpt("m")) @@ -301,12 +294,6 @@ func buildDatabaseCreateRequestFromArgs(c *CmdConfig) (*godo.DatabaseCreateReque r.PrivateNetworkUUID = privateNetworkUUID - storageSizeMibInt, err := c.Doit.GetInt(c.NS, doctl.ArgDatabaseStorageSizeMib) - if err != nil { - return nil, err - } - r.StorageSizeMib = uint64(storageSizeMibInt) - return r, nil } @@ -509,12 +496,6 @@ func buildDatabaseResizeRequestFromArgs(c *CmdConfig) (*godo.DatabaseResizeReque } r.SizeSlug = size - storageSizeMibInt, err := c.Doit.GetInt(c.NS, doctl.ArgDatabaseStorageSizeMib) - if err != nil { - return nil, err - } - r.StorageSizeMib = uint64(storageSizeMibInt) - return r, nil } @@ -1806,10 +1787,6 @@ func getDatabaseTopicConfigArgs(c *CmdConfig) *godo.TopicConfig { res.SegmentMS = &i } } - bVal, err = c.Doit.GetBoolPtr(c.NS, doctl.ArgDatabaseTopicUncleanLeaderElectionEnable) - if err == nil && bVal != nil { - res.UncleanLeaderElectionEnable = bVal - } return res } @@ -1905,8 +1882,6 @@ This command lists the following details for each partition of a given topic in "Specifies the maximum time (in ms) for random jitter that is 
subtracted from the scheduled segment roll time to avoid thundering herd problems") AddStringFlag(c, doctl.ArgDatabaseTopicSegmentMS, "", "", "Specifies the maximum time (in ms) to wait to force a log roll if the segment file isn't full. After this period, the log will be forced to roll") - AddBoolFlag(c, doctl.ArgDatabaseTopicUncleanLeaderElectionEnable, "", false, - "Specifies whether to allow replicas that are not insync to be elected as leader as a last resort. This may result in data loss since those leaders are not in sync") } return cmd } diff --git a/commands/displayers/database.go b/commands/displayers/database.go index 682982ac1..ea9c6207a 100644 --- a/commands/displayers/database.go +++ b/commands/displayers/database.go @@ -963,12 +963,6 @@ func (dt *DatabaseKafkaTopic) KV() []map[string]interface{} { "value": *t.Config.SegmentMS, }) } - if t.Config.UncleanLeaderElectionEnable != nil { - cfg = append(cfg, map[string]interface{}{ - "key": "UncleanLeaderElectionEnable", - "value": *t.Config.UncleanLeaderElectionEnable, - }) - } o = append(o, cfg...) } From 890c08c3c9fe5d65080eb68e902981decf6f9510 Mon Sep 17 00:00:00 2001 From: Daniel-Weinshenker <9273337+dweinshenker@users.noreply.github.com> Date: Fri, 17 Nov 2023 08:43:55 -0800 Subject: [PATCH 11/18] Update args.go Co-authored-by: danaelhe <42972711+danaelhe@users.noreply.github.com> --- args.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/args.go b/args.go index b1beb074f..b56b22f12 100644 --- a/args.go +++ b/args.go @@ -387,8 +387,6 @@ const ( // ArgDatabasePrivateConnectionBool determine if the private connection details should be shown ArgDatabasePrivateConnectionBool = "private" - // ArgDatabaseTopicName is the name of a kafka topic - ArgDatabaseTopicName = "name" // ArgDatabaseTopicReplicationFactor is the replication factor of a kafka topic ArgDatabaseTopicReplicationFactor = "replication-factor" // ArgDatabaseTopicPartitionCount is the number of partitions that are associated with a kafka topic From 97a66e76308abdc99399a1a0b212d1d4352f5a1b Mon Sep 17 00:00:00 2001 From: Daniel-Weinshenker <9273337+dweinshenker@users.noreply.github.com> Date: Fri, 17 Nov 2023 08:44:07 -0800 Subject: [PATCH 12/18] Update commands/databases.go Co-authored-by: danaelhe <42972711+danaelhe@users.noreply.github.com> --- commands/databases.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/commands/databases.go b/commands/databases.go index 498b21c80..abe605eb1 100644 --- a/commands/databases.go +++ b/commands/databases.go @@ -1846,7 +1846,7 @@ This command lists the following details for each partition of a given topic in - The EarliestOffset - earliest offset read amongst all consumers of the partition. 
` - CmdBuilder(cmd, RunDatabaseTopicList, "list ", "Retrieve a list of topics for a given kafka database", topicListDetails, Writer, displayerType(&displayers.DatabaseKafkaTopics{}), aliasOpt("ls")) + CmdBuilder(cmd, RunDatabaseTopicList, "list ", "Retrieve a list of topics for a given kafka database", topicListDetails, Writer, displayerType(&displayers.DatabaseKafkaTopics{}), aliasOpt("ls")) CmdBuilder(cmd, RunDatabaseTopicGet, "get ", "Retrieve the configuration for a given kafka topic", topicGetDetails, Writer, displayerType(&displayers.DatabaseKafkaTopic{}), aliasOpt("g")) CmdBuilder(cmd, RunDatabaseTopicListPartition, "partitions ", "Retrieve the partitions for a given kafka topic", topicGetPartitionDetails, Writer, aliasOpt("p")) cmdDatabaseTopicDelete := CmdBuilder(cmd, RunDatabaseTopicDelete, "delete ", "Deletes a kafka topic by topic name", "", Writer, aliasOpt("rm")) From 3ff72a675517842ea0ecfd6908e93cf8ee41a305 Mon Sep 17 00:00:00 2001 From: Daniel-Weinshenker <9273337+dweinshenker@users.noreply.github.com> Date: Fri, 17 Nov 2023 08:44:15 -0800 Subject: [PATCH 13/18] Update commands/databases.go Co-authored-by: danaelhe <42972711+danaelhe@users.noreply.github.com> --- commands/databases.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/commands/databases.go b/commands/databases.go index abe605eb1..a2fa2c6e0 100644 --- a/commands/databases.go +++ b/commands/databases.go @@ -1847,7 +1847,7 @@ This command lists the following details for each partition of a given topic in ` CmdBuilder(cmd, RunDatabaseTopicList, "list ", "Retrieve a list of topics for a given kafka database", topicListDetails, Writer, displayerType(&displayers.DatabaseKafkaTopics{}), aliasOpt("ls")) - CmdBuilder(cmd, RunDatabaseTopicGet, "get ", "Retrieve the configuration for a given kafka topic", topicGetDetails, Writer, displayerType(&displayers.DatabaseKafkaTopic{}), aliasOpt("g")) + CmdBuilder(cmd, RunDatabaseTopicGet, "get ", "Retrieve the configuration for a given kafka topic", topicGetDetails, Writer, displayerType(&displayers.DatabaseKafkaTopic{}), aliasOpt("g")) CmdBuilder(cmd, RunDatabaseTopicListPartition, "partitions ", "Retrieve the partitions for a given kafka topic", topicGetPartitionDetails, Writer, aliasOpt("p")) cmdDatabaseTopicDelete := CmdBuilder(cmd, RunDatabaseTopicDelete, "delete ", "Deletes a kafka topic by topic name", "", Writer, aliasOpt("rm")) AddBoolFlag(cmdDatabaseTopicDelete, doctl.ArgForce, doctl.ArgShortForce, false, "Deletes the kafka topic without a confirmation prompt") From 5c036da6a3bdb95355f5fa07f63ea3d2f7c4a7a1 Mon Sep 17 00:00:00 2001 From: Daniel-Weinshenker <9273337+dweinshenker@users.noreply.github.com> Date: Fri, 17 Nov 2023 08:44:28 -0800 Subject: [PATCH 14/18] Update commands/databases.go Co-authored-by: danaelhe <42972711+danaelhe@users.noreply.github.com> --- commands/databases.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/commands/databases.go b/commands/databases.go index a2fa2c6e0..8a7e2f9b3 100644 --- a/commands/databases.go +++ b/commands/databases.go @@ -1849,7 +1849,7 @@ This command lists the following details for each partition of a given topic in CmdBuilder(cmd, RunDatabaseTopicList, "list ", "Retrieve a list of topics for a given kafka database", topicListDetails, Writer, displayerType(&displayers.DatabaseKafkaTopics{}), aliasOpt("ls")) CmdBuilder(cmd, RunDatabaseTopicGet, "get ", "Retrieve the configuration for a given kafka topic", topicGetDetails, Writer, displayerType(&displayers.DatabaseKafkaTopic{}), 
aliasOpt("g")) CmdBuilder(cmd, RunDatabaseTopicListPartition, "partitions ", "Retrieve the partitions for a given kafka topic", topicGetPartitionDetails, Writer, aliasOpt("p")) - cmdDatabaseTopicDelete := CmdBuilder(cmd, RunDatabaseTopicDelete, "delete ", "Deletes a kafka topic by topic name", "", Writer, aliasOpt("rm")) + cmdDatabaseTopicDelete := CmdBuilder(cmd, RunDatabaseTopicDelete, "delete ", "Deletes a kafka topic by topic name", "", Writer, aliasOpt("rm")) AddBoolFlag(cmdDatabaseTopicDelete, doctl.ArgForce, doctl.ArgShortForce, false, "Deletes the kafka topic without a confirmation prompt") cmdDatabaseTopicCreate := CmdBuilder(cmd, RunDatabaseTopicCreate, "create ", "Creates a topic for a given kafka database", "This command creates a kafka topic for the specified kafka database cluster, giving it the specified name. Example: doctl databases topics create ", Writer, aliasOpt("c")) From ee7f420bb5008814924b60ddb6a65e2642e64119 Mon Sep 17 00:00:00 2001 From: Daniel-Weinshenker <9273337+dweinshenker@users.noreply.github.com> Date: Fri, 17 Nov 2023 08:44:43 -0800 Subject: [PATCH 15/18] Update commands/databases.go Co-authored-by: danaelhe <42972711+danaelhe@users.noreply.github.com> --- commands/databases.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/commands/databases.go b/commands/databases.go index 8a7e2f9b3..ea643ef2f 100644 --- a/commands/databases.go +++ b/commands/databases.go @@ -1851,7 +1851,7 @@ This command lists the following details for each partition of a given topic in CmdBuilder(cmd, RunDatabaseTopicListPartition, "partitions ", "Retrieve the partitions for a given kafka topic", topicGetPartitionDetails, Writer, aliasOpt("p")) cmdDatabaseTopicDelete := CmdBuilder(cmd, RunDatabaseTopicDelete, "delete ", "Deletes a kafka topic by topic name", "", Writer, aliasOpt("rm")) AddBoolFlag(cmdDatabaseTopicDelete, doctl.ArgForce, doctl.ArgShortForce, false, "Deletes the kafka topic without a confirmation prompt") - cmdDatabaseTopicCreate := CmdBuilder(cmd, RunDatabaseTopicCreate, "create ", "Creates a topic for a given kafka database", + cmdDatabaseTopicCreate := CmdBuilder(cmd, RunDatabaseTopicCreate, "create ", "Creates a topic for a given kafka database", "This command creates a kafka topic for the specified kafka database cluster, giving it the specified name. Example: doctl databases topics create ", Writer, aliasOpt("c")) cmdDatabaseTopicUpdate := CmdBuilder(cmd, RunDatabaseTopicUpdate, "update ", "Updates a topic for a given kafka database", "This command updates a kafka topic for the specified kafka database cluster. 
Example: doctl databases topics update ", Writer, aliasOpt("u")) From 241ca861678dc4323b3387ef1dad72af97339ed9 Mon Sep 17 00:00:00 2001 From: Daniel-Weinshenker <9273337+dweinshenker@users.noreply.github.com> Date: Fri, 17 Nov 2023 08:45:10 -0800 Subject: [PATCH 16/18] Update commands/databases.go Co-authored-by: danaelhe <42972711+danaelhe@users.noreply.github.com> --- commands/databases.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/commands/databases.go b/commands/databases.go index ea643ef2f..eb949d3ef 100644 --- a/commands/databases.go +++ b/commands/databases.go @@ -1852,7 +1852,7 @@ This command lists the following details for each partition of a given topic in cmdDatabaseTopicDelete := CmdBuilder(cmd, RunDatabaseTopicDelete, "delete ", "Deletes a kafka topic by topic name", "", Writer, aliasOpt("rm")) AddBoolFlag(cmdDatabaseTopicDelete, doctl.ArgForce, doctl.ArgShortForce, false, "Deletes the kafka topic without a confirmation prompt") cmdDatabaseTopicCreate := CmdBuilder(cmd, RunDatabaseTopicCreate, "create ", "Creates a topic for a given kafka database", - "This command creates a kafka topic for the specified kafka database cluster, giving it the specified name. Example: doctl databases topics create ", Writer, aliasOpt("c")) + "This command creates a kafka topic for the specified kafka database cluster, giving it the specified name. Example: doctl databases topics create --replication-factor 2 --partition-count 4", Writer, aliasOpt("c")) cmdDatabaseTopicUpdate := CmdBuilder(cmd, RunDatabaseTopicUpdate, "update ", "Updates a topic for a given kafka database", "This command updates a kafka topic for the specified kafka database cluster. Example: doctl databases topics update ", Writer, aliasOpt("u")) From 1e887da1c2a095b7dfa325ffc7c5f1abd64683dc Mon Sep 17 00:00:00 2001 From: Daniel-Weinshenker <9273337+dweinshenker@users.noreply.github.com> Date: Fri, 17 Nov 2023 08:45:21 -0800 Subject: [PATCH 17/18] Update commands/databases.go Co-authored-by: danaelhe <42972711+danaelhe@users.noreply.github.com> --- commands/databases.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/commands/databases.go b/commands/databases.go index eb949d3ef..13adfacc2 100644 --- a/commands/databases.go +++ b/commands/databases.go @@ -1853,7 +1853,7 @@ This command lists the following details for each partition of a given topic in AddBoolFlag(cmdDatabaseTopicDelete, doctl.ArgForce, doctl.ArgShortForce, false, "Deletes the kafka topic without a confirmation prompt") cmdDatabaseTopicCreate := CmdBuilder(cmd, RunDatabaseTopicCreate, "create ", "Creates a topic for a given kafka database", - "This command creates a kafka topic for the specified kafka database cluster, giving it the specified name. Example: doctl databases topics create --replication-factor 2 --partition-count 4", Writer, aliasOpt("c")) + "This command creates a kafka topic for the specified kafka database cluster, giving it the specified name. Example: doctl databases topics create --replication-factor 2 --partition-count 4", Writer, aliasOpt("c")) cmdDatabaseTopicUpdate := CmdBuilder(cmd, RunDatabaseTopicUpdate, "update ", "Updates a topic for a given kafka database", - "This command updates a kafka topic for the specified kafka database cluster. 
Example: doctl databases topics update ", Writer, aliasOpt("u")) + "This command updates a kafka topic for the specified kafka database cluster. Example: doctl databases topics update ", Writer, aliasOpt("u")) From 10a6ab7b2955e592c35e8937b05e74420b7a8a39 Mon Sep 17 00:00:00 2001 From: Daniel-Weinshenker <9273337+dweinshenker@users.noreply.github.com> Date: Fri, 17 Nov 2023 08:45:31 -0800 Subject: [PATCH 18/18] Update commands/databases.go Co-authored-by: danaelhe <42972711+danaelhe@users.noreply.github.com> --- commands/databases.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/commands/databases.go b/commands/databases.go index 13adfacc2..7a4c5fe68 100644 --- a/commands/databases.go +++ b/commands/databases.go @@ -1854,7 +1854,7 @@ This command lists the following details for each partition of a given topic in cmdDatabaseTopicCreate := CmdBuilder(cmd, RunDatabaseTopicCreate, "create ", "Creates a topic for a given kafka database", "This command creates a kafka topic for the specified kafka database cluster, giving it the specified name. Example: doctl databases topics create --replication-factor 2 --partition-count 4", Writer, aliasOpt("c")) cmdDatabaseTopicUpdate := CmdBuilder(cmd, RunDatabaseTopicUpdate, "update ", "Updates a topic for a given kafka database", - "This command updates a kafka topic for the specified kafka database cluster. Example: doctl databases topics update ", Writer, aliasOpt("u")) + "This command updates a kafka topic for the specified kafka database cluster. Example: doctl databases topics update ", Writer, aliasOpt("u")) cmdsWithConfig := []*Command{cmdDatabaseTopicCreate, cmdDatabaseTopicUpdate} for _, c := range cmdsWithConfig { AddIntFlag(c, doctl.ArgDatabaseTopicReplicationFactor, "", 2, "Specifies the number of nodes to replicate data across the kafka cluster")
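
For reference, the topic commands added in this series reduce to a handful of godo calls. Below is a minimal standalone sketch of the create and list requests that RunDatabaseTopicCreate and RunDatabaseTopicList ultimately issue, using only the godo methods and request shapes exercised by the patches above; it assumes a valid API token and an existing Kafka cluster, and the token, cluster ID, and topic values are placeholders rather than anything taken from these patches:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/digitalocean/godo"
)

func main() {
	// Placeholder token and cluster ID; substitute real values.
	client := godo.NewFromToken("dop_v1_example_token")
	ctx := context.Background()
	clusterID := "your-kafka-cluster-uuid"

	// Mirror the request RunDatabaseTopicCreate assembles from the
	// --replication-factor and --partition-count flags plus the topic config flags.
	rf, pc := uint32(2), uint32(4)
	topic, _, err := client.Databases.CreateTopic(ctx, clusterID, &godo.DatabaseCreateTopicRequest{
		Name:              "topic1",
		ReplicationFactor: &rf,
		PartitionCount:    &pc,
		Config:            &godo.TopicConfig{CleanupPolicy: "delete"},
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("created %s (state: %s)\n", topic.Name, topic.State)

	// List topics for the cluster; doctl's ListTopics wrapper drives this
	// same call through PaginateResp.
	topics, _, err := client.Databases.ListTopics(ctx, clusterID, &godo.ListOptions{})
	if err != nil {
		log.Fatal(err)
	}
	for _, t := range topics {
		fmt.Println(t.Name, t.State)
	}
}

The CLI equivalent after this series would be along the lines of: doctl databases topics create your-kafka-cluster-uuid topic1 --replication-factor 2 --partition-count 4.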