diff --git a/args.go b/args.go index d2ee82424..b56b22f12 100644 --- a/args.go +++ b/args.go @@ -387,6 +387,53 @@ const ( // ArgDatabasePrivateConnectionBool determine if the private connection details should be shown ArgDatabasePrivateConnectionBool = "private" + // ArgDatabaseTopicReplicationFactor is the replication factor of a kafka topic + ArgDatabaseTopicReplicationFactor = "replication-factor" + // ArgDatabaseTopicPartitionCount is the number of partitions that are associated with a kafka topic + ArgDatabaseTopicPartitionCount = "partition-count" + // ArgDatabaseTopicCleanupPolicy is the cleanup policy associated with a kafka topic + ArgDatabaseTopicCleanupPolicy = "cleanup-policy" + // ArgDatabaseTopicCompressionType is the compression algorithm used for a kafka topic + ArgDatabaseTopicCompressionType = "compression-type" + // ArgDatabaseTopicDeleteRetentionMS is the amount of time, in ms, to retain delete tombstone markers for a kafka topic + ArgDatabaseTopicDeleteRetentionMS = "delete-retention-ms" + // ArgDatabaseTopicFileDeleteDelayMS is the amount of time, in ms, to wait before deleting a file from the filesystem + ArgDatabaseTopicFileDeleteDelayMS = "file-delete-delay-ms" + // ArgDatabaseTopicFlushMessages is the number of messages to accumulate on a partition before flushing them to disk + ArgDatabaseTopicFlushMessages = "flush-messages" + // ArgDatabaseTopicFlushMS is the amount of time, in ms, a message is kept in memory before it is flushed to disk + ArgDatabaseTopicFlushMS = "flush-ms" + // ArgDatabaseTopicIntervalIndexBytes is the number of bytes between entries being added into the offset index + ArgDatabaseTopicIntervalIndexBytes = "interval-index-bytes" + // ArgDatabaseTopicMaxCompactionLagMS is the maximum amount of time, in ms, that a message will remain uncompacted (if compaction is enabled) + ArgDatabaseTopicMaxCompactionLagMS = "max-compaction-lag-ms" + // ArgDatabaseTopicMaxMessageBytes is the maximum size, in bytes, of the largest record batch that can be sent to the server + ArgDatabaseTopicMaxMessageBytes = "max-message-bytes" + // ArgDatabaseTopicMessageDownConversionEnable determines whether brokers should convert messages for consumers expecting older message formats + ArgDatabaseTopicMessageDownConversionEnable = "message-down-conversion-enable" + // ArgDatabaseTopicMessageFormatVersion is the version used by the broker to append messages to the kafka topic logs + ArgDatabaseTopicMessageFormatVersion = "message-format-version" + // ArgDatabaseTopicMessageTimestampType is the timestamp type used for messages + ArgDatabaseTopicMessageTimestampType = "message-timestamp-type" + // ArgDatabaseTopicMinCleanableDirtyRatio is the ratio, between 0 and 1, specifying the frequency of log compaction + ArgDatabaseTopicMinCleanableDirtyRatio = "min-cleanable-dirty-ratio" + // ArgDatabaseTopicMinCompactionLagMS is the minimum time, in ms, that a message will remain uncompacted + ArgDatabaseTopicMinCompactionLagMS = "min-compaction-lag-ms" + // ArgDatabaseTopicMinInsyncReplicas is the minimum number of replicas that must ACK a write for the write to be considered successful + ArgDatabaseTopicMinInsyncReplicas = "min-insync-replicas" + // ArgDatabaseTopicPreallocate determines whether a file should be preallocated on disk when creating a new log segment + ArgDatabaseTopicPreallocate = "preallocate" + // ArgDatabaseTopicRetentionBytes is the maximum size, in bytes, of a topic log before messages are deleted + ArgDatabaseTopicRetentionBytes = "retention-bytes" + //
ArgDatabaseTopicRetentionMS is the maximum time, in ms, that a message is retained before deleting it + ArgDatabaseTopicRetentionMS = "retention-ms" + // ArgDatabaseTopicSegmentBytes is the maximum size, in bytes, of a single log file + ArgDatabaseTopicSegmentBytes = "segment-bytes" + // ArgDatabaseTopicSegmentJitterMS is the maximum random jitter, in ms, subtracted from the scheduled segment roll time to avoid thundering herds of segment rolling + ArgDatabaseTopicSegmentJitterMS = "segment-jitter-ms" + // ArgDatabaseTopicSegmentMS is the period of time, in ms, after which the log will be forced to roll if the segment file isn't full + ArgDatabaseTopicSegmentMS = "segment-ms" + // ArgPrivateNetworkUUID is the flag for VPC UUID ArgPrivateNetworkUUID = "private-network-uuid" diff --git a/commands/databases.go b/commands/databases.go index 8f0a76566..7a4c5fe68 100644 --- a/commands/databases.go +++ b/commands/databases.go @@ -17,6 +17,7 @@ import ( "errors" "fmt" "os" + "strconv" "strings" "time" @@ -148,6 +149,7 @@ In addition, for PostgreSQL and MySQL clusters, you can provide a disk size in M cmd.AddCommand(databaseFirewalls()) cmd.AddCommand(databaseOptions()) cmd.AddCommand(databaseConfiguration()) + cmd.AddCommand(databaseTopic()) return cmd } @@ -1554,6 +1556,355 @@ func RunDatabaseSetSQLModes(c *CmdConfig) error { return c.Databases().SetSQLMode(databaseID, sqlModes...) } +func RunDatabaseTopicList(c *CmdConfig) error { + if len(c.Args) == 0 { + return doctl.NewMissingArgsErr(c.NS) + } + + databaseID := c.Args[0] + topics, err := c.Databases().ListTopics(databaseID) + if err != nil { + return err + } + item := &displayers.DatabaseKafkaTopics{DatabaseTopics: topics} + return c.Display(item) +} + +func RunDatabaseTopicGet(c *CmdConfig) error { + if len(c.Args) < 2 { + return doctl.NewMissingArgsErr(c.NS) + } + + databaseID := c.Args[0] + topicName := c.Args[1] + topic, err := c.Databases().GetTopic(databaseID, topicName) + if err != nil { + return err + } + + item := &displayers.DatabaseKafkaTopic{DatabaseTopic: *topic} + return c.Display(item) +} + +func RunDatabaseTopicListPartition(c *CmdConfig) error { + if len(c.Args) < 2 { + return doctl.NewMissingArgsErr(c.NS) + } + + databaseID := c.Args[0] + topicName := c.Args[1] + topic, err := c.Databases().GetTopic(databaseID, topicName) + if err != nil { + return err + } + + item := &displayers.DatabaseKafkaTopicPartitions{DatabaseTopicPartitions: topic.Partitions} + return c.Display(item) +} + +func RunDatabaseTopicDelete(c *CmdConfig) error { + if len(c.Args) < 2 { + return doctl.NewMissingArgsErr(c.NS) + } + + force, err := c.Doit.GetBool(c.NS, doctl.ArgForce) + if err != nil { + return err + } + + if force || AskForConfirmDelete("kafka topic", 1) == nil { + databaseID := c.Args[0] + topicName := c.Args[1] + return c.Databases().DeleteTopic(databaseID, topicName) + } + + return errOperationAborted +} + +func RunDatabaseTopicCreate(c *CmdConfig) error { + if len(c.Args) < 2 { + return doctl.NewMissingArgsErr(c.NS) + } + + databaseID := c.Args[0] + topicName := c.Args[1] + + createReq := &godo.DatabaseCreateTopicRequest{Name: topicName} + + pc, err := c.Doit.GetInt(c.NS, doctl.ArgDatabaseTopicPartitionCount) + if err == nil && pc != 0 { + pcUInt32 := uint32(pc) + createReq.PartitionCount = &pcUInt32 + } + rf, err := c.Doit.GetInt(c.NS, doctl.ArgDatabaseTopicReplicationFactor) + if err == nil && rf != 0 { + rfUInt32 := uint32(rf) + createReq.ReplicationFactor = &rfUInt32 + } + createReq.Config = getDatabaseTopicConfigArgs(c) + + _, err = 
c.Databases().CreateTopic(databaseID, createReq) + return err +} + +func RunDatabaseTopicUpdate(c *CmdConfig) error { + if len(c.Args) < 2 { + return doctl.NewMissingArgsErr(c.NS) + } + + databaseID := c.Args[0] + topicName := c.Args[1] + + updateReq := &godo.DatabaseUpdateTopicRequest{} + + pc, err := c.Doit.GetInt(c.NS, doctl.ArgDatabaseTopicPartitionCount) + if err == nil && pc != 0 { + pcUInt32 := uint32(pc) + updateReq.PartitionCount = &pcUInt32 + } + rf, err := c.Doit.GetInt(c.NS, doctl.ArgDatabaseTopicReplicationFactor) + if err == nil && rf != 0 { + rfUInt32 := uint32(rf) + updateReq.ReplicationFactor = &rfUInt32 + } + updateReq.Config = getDatabaseTopicConfigArgs(c) + + err = c.Databases().UpdateTopic(databaseID, topicName, updateReq) + return err +} + +func getDatabaseTopicConfigArgs(c *CmdConfig) *godo.TopicConfig { + res := &godo.TopicConfig{} + val, err := c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicCleanupPolicy) + if err == nil && val != "" { + res.CleanupPolicy = val + } + val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicCompressionType) + if err == nil && val != "" { + res.CompressionType = val + } + val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicDeleteRetentionMS) + if err == nil && val != "" { + i, err := strconv.ParseUint(val, 10, 64) + if err == nil { + res.DeleteRetentionMS = &i + } + } + val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicFileDeleteDelayMS) + if err == nil && val != "" { + i, err := strconv.ParseUint(val, 10, 64) + if err == nil { + res.FileDeleteDelayMS = &i + } + } + val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicFlushMessages) + if err == nil && val != "" { + i, err := strconv.ParseUint(val, 10, 64) + if err == nil { + res.FlushMessages = &i + } + } + val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicFlushMS) + if err == nil && val != "" { + i, err := strconv.ParseUint(val, 10, 64) + if err == nil { + res.FlushMS = &i + } + } + val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicIntervalIndexBytes) + if err == nil && val != "" { + i, err := strconv.ParseUint(val, 10, 64) + if err == nil { + res.IndexIntervalBytes = &i + } + } + val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicMaxCompactionLagMS) + if err == nil && val != "" { + i, err := strconv.ParseUint(val, 10, 64) + if err == nil { + res.MaxCompactionLagMS = &i + } + } + val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicMaxMessageBytes) + if err == nil && val != "" { + i, err := strconv.ParseUint(val, 10, 64) + if err == nil { + res.MaxMessageBytes = &i + } + } + bVal, err := c.Doit.GetBoolPtr(c.NS, doctl.ArgDatabaseTopicMessageDownConversionEnable) + if err == nil && bVal != nil { + res.MessageDownConversionEnable = bVal + } + val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicMessageFormatVersion) + if err == nil && val != "" { + res.MessageFormatVersion = val + } + val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicMessageTimestampType) + if err == nil && val != "" { + res.MessageTimestampType = val + } + val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicMinCleanableDirtyRatio) + if err == nil && val != "" { + i, err := strconv.ParseFloat(val, 32) + if err == nil { + iFloat32 := float32(i) + res.MinCleanableDirtyRatio = &iFloat32 + } + } + val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicMinCompactionLagMS) + if err == nil && val != "" { + i, err := strconv.ParseUint(val, 10, 64) + if err == nil { + res.MinCompactionLagMS = &i + } + } + val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicMinInsyncReplicas) + if err == nil 
&& val != "" { + i, err := strconv.ParseUint(val, 10, 32) + if err == nil { + iUint32 := uint32(i) + res.MinInsyncReplicas = &iUint32 + } + } + bVal, err = c.Doit.GetBoolPtr(c.NS, doctl.ArgDatabaseTopicPreallocate) + if err == nil && bVal != nil { + res.Preallocate = bVal + } + val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicRetentionBytes) + if err == nil && val != "" { + i, err := strconv.ParseInt(val, 10, 64) + if err == nil { + res.RetentionBytes = &i + } + } + val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicRetentionMS) + if err == nil && val != "" { + i, err := strconv.ParseInt(val, 10, 64) + if err == nil { + res.RetentionMS = &i + } + } + val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicSegmentBytes) + if err == nil && val != "" { + i, err := strconv.ParseUint(val, 10, 64) + if err == nil { + res.SegmentBytes = &i + } + } + val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicSegmentJitterMS) + if err == nil && val != "" { + i, err := strconv.ParseUint(val, 10, 64) + if err == nil { + res.SegmentJitterMS = &i + } + } + val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicSegmentMS) + if err == nil && val != "" { + i, err := strconv.ParseUint(val, 10, 64) + if err == nil { + res.SegmentMS = &i + } + } + + return res +} + +func databaseTopic() *Command { + cmd := &Command{ + Command: &cobra.Command{ + Use: "topics", + Short: `Display commands to manage topics for kafka database clusters`, + Long: `The subcommands under ` + "`" + `doctl databases topics` + "`" + ` enable the management of topics for kafka database clusters`, + }, + } + + topicListDetails := ` +This command lists the following details for each topic in a kafka database cluster: + + - The Name of the topic. + - The State of the topic. + - The Replication Factor of the topic - number of brokers the topic's partitions are replicated across. + ` + + topicGetDetails := ` +This command lists the following details for a given topic in a kafka database cluster: + + - The Name of the topic. + - The Partitions of the topic - the number of partitions in the topic. + - The Replication Factor of the topic - number of brokers the topic's partitions are replicated across. + - Additional advanced configuration for the topic. + +The details of the topic are listed in key/value pairs. + ` + topicGetPartitionDetails := ` +This command lists the following details for each partition of a given topic in a kafka database cluster: + + - The Id - identifier of the topic partition. + - The Size - size of the topic partition, in bytes. + - The InSyncReplicas - number of brokers that are in sync with the partition leader. + - The EarliestOffset - earliest offset read amongst all consumers of the partition. 
+ ` + CmdBuilder(cmd, RunDatabaseTopicList, "list <database-uuid>", "Retrieve a list of topics for a given kafka database", topicListDetails, Writer, displayerType(&displayers.DatabaseKafkaTopics{}), aliasOpt("ls")) + CmdBuilder(cmd, RunDatabaseTopicGet, "get <database-uuid> <topic-name>", "Retrieve the configuration for a given kafka topic", topicGetDetails, Writer, displayerType(&displayers.DatabaseKafkaTopic{}), aliasOpt("g")) + CmdBuilder(cmd, RunDatabaseTopicListPartition, "partitions <database-uuid> <topic-name>", "Retrieve the partitions for a given kafka topic", topicGetPartitionDetails, Writer, aliasOpt("p")) + cmdDatabaseTopicDelete := CmdBuilder(cmd, RunDatabaseTopicDelete, "delete <database-uuid> <topic-name>", "Deletes a kafka topic by topic name", "", Writer, aliasOpt("rm")) + AddBoolFlag(cmdDatabaseTopicDelete, doctl.ArgForce, doctl.ArgShortForce, false, "Deletes the kafka topic without a confirmation prompt") + cmdDatabaseTopicCreate := CmdBuilder(cmd, RunDatabaseTopicCreate, "create <database-uuid> <topic-name>", "Creates a topic for a given kafka database", + "This command creates a kafka topic for the specified kafka database cluster, giving it the specified name. Example: doctl databases topics create <database-uuid> <topic-name> --replication-factor 2 --partition-count 4", Writer, aliasOpt("c")) + cmdDatabaseTopicUpdate := CmdBuilder(cmd, RunDatabaseTopicUpdate, "update <database-uuid> <topic-name>", "Updates a topic for a given kafka database", + "This command updates a kafka topic for the specified kafka database cluster. Example: doctl databases topics update <database-uuid> <topic-name>", Writer, aliasOpt("u")) + cmdsWithConfig := []*Command{cmdDatabaseTopicCreate, cmdDatabaseTopicUpdate} + for _, c := range cmdsWithConfig { + AddIntFlag(c, doctl.ArgDatabaseTopicReplicationFactor, "", 2, "Specifies the number of nodes to replicate data across the kafka cluster") + AddIntFlag(c, doctl.ArgDatabaseTopicPartitionCount, "", 1, "Specifies the number of partitions available for the topic") + AddStringFlag(c, doctl.ArgDatabaseTopicCleanupPolicy, "", "delete", + "Specifies the retention policy to use on log segments. Possible values are 'delete', 'compact_delete', 'compact'") + AddStringFlag(c, doctl.ArgDatabaseTopicCompressionType, "", "producer", + "Specifies the compression type for a kafka topic. Possible values are 'producer', 'gzip', 'snappy', 'lz4', 'zstd', 'uncompressed'") + AddStringFlag(c, doctl.ArgDatabaseTopicDeleteRetentionMS, "", "", + "Specifies how long (in ms) to retain delete tombstone markers for topics") + AddStringFlag(c, doctl.ArgDatabaseTopicFileDeleteDelayMS, "", "", + "Specifies the minimum time (in ms) to wait before deleting a file from the filesystem") + AddStringFlag(c, doctl.ArgDatabaseTopicFlushMessages, "", "", + "Specifies the maximum number of messages to accumulate on a log partition before messages are flushed to disk") + AddStringFlag(c, doctl.ArgDatabaseTopicFlushMS, "", "", + "Specifies the maximum time (in ms) that a message is kept in memory before being flushed to disk") + AddStringFlag(c, doctl.ArgDatabaseTopicIntervalIndexBytes, "", "", + "Specifies the number of bytes between entries being added into the offset index") + AddStringFlag(c, doctl.ArgDatabaseTopicMaxCompactionLagMS, "", "", + "Specifies the maximum time (in ms) that a message will remain uncompacted. This is only applicable if the logs have compaction enabled") + AddStringFlag(c, doctl.ArgDatabaseTopicMaxMessageBytes, "", "", + "Specifies the largest record batch (in bytes) that can be sent to the server. 
This is calculated after compression, if compression is enabled") + AddBoolFlag(c, doctl.ArgDatabaseTopicMessageDownConversionEnable, "", true, + "Specifies whether down-conversion of message formats is enabled to satisfy consumer requests") + AddStringFlag(c, doctl.ArgDatabaseTopicMessageFormatVersion, "", "", + "Specifies the message format version used by the broker to append messages to the logs. By setting a format version, all existing messages on disk must be smaller or equal to the specified version") + AddStringFlag(c, doctl.ArgDatabaseTopicMessageTimestampType, "", "", + "Specifies whether to use the create time or log append time as the timestamp on a message") + AddStringFlag(c, doctl.ArgDatabaseTopicMinCleanableDirtyRatio, "", "", + "Specifies the frequency of log compaction (if enabled) in relation to duplicates present in the logs. For example, 0.5 would mean at most half of the log could be duplicates before compaction would begin") + AddStringFlag(c, doctl.ArgDatabaseTopicMinCompactionLagMS, "", "", + "Specifies the minimum time (in ms) that a message will remain uncompacted. This is only applicable if the logs have compaction enabled") + AddStringFlag(c, doctl.ArgDatabaseTopicMinInsyncReplicas, "", "", + "Specifies the minimum number of replicas that must ACK a write for it to be considered successful") + AddBoolFlag(c, doctl.ArgDatabaseTopicPreallocate, "", false, + "Specifies whether a file should be preallocated on disk when creating a new log segment") + AddStringFlag(c, doctl.ArgDatabaseTopicRetentionBytes, "", "", + "Specifies the maximum size (in bytes) before deleting messages. '-1' indicates that there is no limit") + AddStringFlag(c, doctl.ArgDatabaseTopicRetentionMS, "", "", + "Specifies the maximum time (in ms) to store a message before deleting it. '-1' indicates that there is no limit") + AddStringFlag(c, doctl.ArgDatabaseTopicSegmentBytes, "", "", + "Specifies the maximum size (in bytes) of a single log file") + AddStringFlag(c, doctl.ArgDatabaseTopicSegmentJitterMS, "", "", + "Specifies the maximum time (in ms) for random jitter that is subtracted from the scheduled segment roll time to avoid thundering herd problems") + AddStringFlag(c, doctl.ArgDatabaseTopicSegmentMS, "", "", + "Specifies the maximum time (in ms) to wait to force a log roll if the segment file isn't full. 
After this period, the log will be forced to roll") + } + return cmd +} + func databaseFirewalls() *Command { cmd := &Command{ Command: &cobra.Command{ diff --git a/commands/databases_test.go b/commands/databases_test.go index 2d9de8748..3952f6991 100644 --- a/commands/databases_test.go +++ b/commands/databases_test.go @@ -66,6 +66,21 @@ var ( }, } + testKafkaDBCluster = do.Database{ + Database: &godo.Database{ + ID: "ea93928g-8se0-929e-m1ns-029daj2k3j12", + Name: "kafka-db-cluster", + RegionSlug: "nyc1", + EngineSlug: "kafka", + VersionSlug: "3.5", + NumNodes: 3, + SizeSlug: "db-s-2vcpu-4gb", + CreatedAt: time.Now(), + Status: "online", + Tags: []string{"testing"}, + }, + } + testDBBackUpCluster = do.Database{ Database: &godo.Database{ ID: "ea4652de-4fe0-11e9-b7ab-df1ef30eab9e", @@ -190,6 +205,36 @@ var ( RedisConfig: &godo.RedisConfig{}, } + topicReplicationFactor = uint32(3) + testKafkaTopic = do.DatabaseTopic{ + DatabaseTopic: &godo.DatabaseTopic{ + Name: "topic1", + State: "active", + Config: &godo.TopicConfig{ + CleanupPolicy: "delete", + }, + Partitions: []*godo.TopicPartition{ + { + Id: 0, + Size: 4096, + EarliestOffset: 0, + InSyncReplicas: 2, + }, + { + Id: 1, + Size: 4096, + EarliestOffset: 4, + InSyncReplicas: 2, + }, + }, + ReplicationFactor: &topicReplicationFactor, + }, + } + + testKafkaTopics = do.DatabaseTopics{ + testKafkaTopic, + } + errTest = errors.New("error") ) @@ -215,6 +260,7 @@ func TestDatabasesCommand(t *testing.T) { "db", "sql-mode", "configuration", + "topics", ) } @@ -291,6 +337,19 @@ func TestDatabaseConfigurationCommand(t *testing.T) { assertCommandNames(t, cmd, "get") } +func TestDatabaseKafkaTopicCommand(t *testing.T) { + cmd := databaseTopic() + assert.NotNil(t, cmd) + assertCommandNames(t, cmd, + "get", + "list", + "delete", + "create", + "update", + "partitions", + ) +} + func TestDatabasesGet(t *testing.T) { // Successful call withTestClient(t, func(config *CmdConfig, tm *tcMocks) { @@ -560,6 +619,155 @@ func TestDatabaseListBackups(t *testing.T) { }) } +func TestDatabaseListKafkaTopics(t *testing.T) { + // Success + withTestClient(t, func(config *CmdConfig, tm *tcMocks) { + tm.databases.EXPECT().ListTopics(testKafkaDBCluster.ID).Return(testKafkaTopics, nil) + config.Args = append(config.Args, testKafkaDBCluster.ID) + + err := RunDatabaseTopicList(config) + assert.NoError(t, err) + }) + + // Error + withTestClient(t, func(config *CmdConfig, tm *tcMocks) { + tm.databases.EXPECT().ListTopics(testKafkaDBCluster.ID).Return(nil, errTest) + config.Args = append(config.Args, testKafkaDBCluster.ID) + + err := RunDatabaseTopicList(config) + assert.EqualError(t, err, errTest.Error()) + }) +} + +func TestDatabaseGetKafkaTopic(t *testing.T) { + // Success + withTestClient(t, func(config *CmdConfig, tm *tcMocks) { + tm.databases.EXPECT().GetTopic(testKafkaDBCluster.ID, testKafkaTopic.Name).Return(&testKafkaTopic, nil) + config.Args = append(config.Args, testKafkaDBCluster.ID, testKafkaTopic.Name) + + err := RunDatabaseTopicGet(config) + assert.NoError(t, err) + }) + + // Error + withTestClient(t, func(config *CmdConfig, tm *tcMocks) { + tm.databases.EXPECT().GetTopic(testKafkaDBCluster.ID, testKafkaTopic.Name).Return(nil, errTest) + config.Args = append(config.Args, testKafkaDBCluster.ID, testKafkaTopic.Name) + + err := RunDatabaseTopicGet(config) + assert.EqualError(t, err, errTest.Error()) + }) +} + +func TestDatabaseCreateKafkaTopic(t *testing.T) { + // Success - only topic name + withTestClient(t, func(config *CmdConfig, tm *tcMocks) { + createReq := 
&godo.DatabaseCreateTopicRequest{ + Name: testKafkaTopic.Name, + Config: &godo.TopicConfig{}, + } + tm.databases.EXPECT().CreateTopic(testKafkaDBCluster.ID, createReq).Return(&testKafkaTopic, nil) + config.Args = append(config.Args, testKafkaDBCluster.ID, testKafkaTopic.Name) + + err := RunDatabaseTopicCreate(config) + assert.NoError(t, err) + }) + // Success - with additional config + withTestClient(t, func(config *CmdConfig, tm *tcMocks) { + pc := uint32(len(testKafkaTopic.Partitions)) + createReq := &godo.DatabaseCreateTopicRequest{ + Name: testKafkaTopic.Name, + ReplicationFactor: testKafkaTopic.ReplicationFactor, + PartitionCount: &pc, + Config: &godo.TopicConfig{ + CleanupPolicy: testKafkaTopic.Config.CleanupPolicy, + }, + } + tm.databases.EXPECT().CreateTopic(testKafkaDBCluster.ID, createReq).Return(&testKafkaTopic, nil) + config.Args = append(config.Args, testKafkaDBCluster.ID, testKafkaTopic.Name) + config.Doit.Set(config.NS, doctl.ArgDatabaseTopicPartitionCount, pc) + config.Doit.Set(config.NS, doctl.ArgDatabaseTopicReplicationFactor, testKafkaTopic.ReplicationFactor) + config.Doit.Set(config.NS, doctl.ArgDatabaseTopicCleanupPolicy, testKafkaTopic.Config.CleanupPolicy) + + err := RunDatabaseTopicCreate(config) + assert.NoError(t, err) + }) + // Error + withTestClient(t, func(config *CmdConfig, tm *tcMocks) { + tm.databases.EXPECT().CreateTopic(testKafkaDBCluster.ID, gomock.AssignableToTypeOf(&godo.DatabaseCreateTopicRequest{})).Return(nil, errTest) + config.Args = append(config.Args, testKafkaDBCluster.ID, testKafkaTopic.Name) + + err := RunDatabaseTopicCreate(config) + assert.EqualError(t, err, errTest.Error()) + }) +} + +func TestDatabaseUpdateKafkaTopic(t *testing.T) { + // Success - only partition count + withTestClient(t, func(config *CmdConfig, tm *tcMocks) { + currPC := uint32(len(testKafkaTopic.Partitions)) + newPC := currPC + 1 + updateReq := &godo.DatabaseUpdateTopicRequest{ + PartitionCount: &newPC, + Config: &godo.TopicConfig{}, + } + tm.databases.EXPECT().UpdateTopic(testKafkaDBCluster.ID, testKafkaTopic.Name, updateReq).Return(nil) + config.Args = append(config.Args, testKafkaDBCluster.ID, testKafkaTopic.Name) + config.Doit.Set(config.NS, doctl.ArgDatabaseTopicPartitionCount, newPC) + + err := RunDatabaseTopicUpdate(config) + assert.NoError(t, err) + }) + // Success - with additional config + withTestClient(t, func(config *CmdConfig, tm *tcMocks) { + currPC := uint32(len(testKafkaTopic.Partitions)) + newPC := currPC + 1 + updateReq := &godo.DatabaseUpdateTopicRequest{ + PartitionCount: &newPC, + Config: &godo.TopicConfig{ + CleanupPolicy: testKafkaTopic.Config.CleanupPolicy, + }, + } + tm.databases.EXPECT().UpdateTopic(testKafkaDBCluster.ID, testKafkaTopic.Name, updateReq).Return(nil) + config.Args = append(config.Args, testKafkaDBCluster.ID, testKafkaTopic.Name) + config.Doit.Set(config.NS, doctl.ArgDatabaseTopicPartitionCount, newPC) + config.Doit.Set(config.NS, doctl.ArgDatabaseTopicCleanupPolicy, testKafkaTopic.Config.CleanupPolicy) + + err := RunDatabaseTopicUpdate(config) + assert.NoError(t, err) + }) + // Error + withTestClient(t, func(config *CmdConfig, tm *tcMocks) { + tm.databases.EXPECT().UpdateTopic(testKafkaDBCluster.ID, testKafkaTopic.Name, gomock.AssignableToTypeOf(&godo.DatabaseUpdateTopicRequest{})).Return(errTest) + config.Args = append(config.Args, testKafkaDBCluster.ID, testKafkaTopic.Name) + + err := RunDatabaseTopicUpdate(config) + assert.EqualError(t, err, errTest.Error()) + }) +} + +func TestDatabaseDeleteKafkaTopic(t *testing.T) { + // 
Successful + withTestClient(t, func(config *CmdConfig, tm *tcMocks) { + tm.databases.EXPECT().DeleteTopic(testKafkaDBCluster.ID, testKafkaTopic.Name).Return(nil) + config.Args = append(config.Args, testKafkaDBCluster.ID, testKafkaTopic.Name) + config.Doit.Set(config.NS, doctl.ArgForce, "true") + + err := RunDatabaseTopicDelete(config) + assert.NoError(t, err) + }) + // Error + withTestClient(t, func(config *CmdConfig, tm *tcMocks) { + tm.databases.EXPECT().DeleteTopic(testKafkaDBCluster.ID, testKafkaTopic.Name).Return(errTest) + + config.Args = append(config.Args, testKafkaDBCluster.ID, testKafkaTopic.Name) + config.Doit.Set(config.NS, doctl.ArgForce, "true") + + err := RunDatabaseTopicDelete(config) + assert.EqualError(t, err, errTest.Error()) + }) +} + func TestDatabaseConnectionGet(t *testing.T) { // Success withTestClient(t, func(config *CmdConfig, tm *tcMocks) { diff --git a/commands/displayers/database.go b/commands/displayers/database.go index 0653d2c4a..ea9c6207a 100644 --- a/commands/displayers/database.go +++ b/commands/displayers/database.go @@ -696,6 +696,279 @@ func (dr *DatabaseFirewallRules) KV() []map[string]interface{} { return out } +type DatabaseKafkaTopics struct { + DatabaseTopics do.DatabaseTopics +} + +var _ Displayable = &DatabaseKafkaTopics{} + +func (dt *DatabaseKafkaTopics) JSON(out io.Writer) error { + return writeJSON(dt.DatabaseTopics, out) +} + +func (dt *DatabaseKafkaTopics) Cols() []string { + return []string{ + "Name", + "State", + "ReplicationFactor", + } +} + +func (dt *DatabaseKafkaTopics) ColMap() map[string]string { + + return map[string]string{ + "Name": "Name", + "State": "State", + "ReplicationFactor": "ReplicationFactor", + } +} + +func (dt *DatabaseKafkaTopics) KV() []map[string]interface{} { + out := make([]map[string]interface{}, 0, len(dt.DatabaseTopics)) + + for _, t := range dt.DatabaseTopics { + o := map[string]interface{}{ + "Name": t.Name, + "State": t.State, + "ReplicationFactor": *t.ReplicationFactor, + } + out = append(out, o) + } + + return out +} + +type DatabaseKafkaTopicPartitions struct { + DatabaseTopicPartitions []*godo.TopicPartition +} + +var _ Displayable = &DatabaseKafkaTopicPartitions{} + +func (dp *DatabaseKafkaTopicPartitions) JSON(out io.Writer) error { + return writeJSON(dp.DatabaseTopicPartitions, out) +} + +func (dp *DatabaseKafkaTopicPartitions) Cols() []string { + return []string{ + "Id", + "InSyncReplicas", + "EarliestOffset", + "Size", + } +} + +func (dp *DatabaseKafkaTopicPartitions) ColMap() map[string]string { + + return map[string]string{ + "Id": "Id", + "InSyncReplicas": "InSyncReplicas", + "EarliestOffset": "EarliestOffset", + "Size": "Size", + } +} + +func (dp *DatabaseKafkaTopicPartitions) KV() []map[string]interface{} { + out := make([]map[string]interface{}, 0, len(dp.DatabaseTopicPartitions)) + + for _, p := range dp.DatabaseTopicPartitions { + o := map[string]interface{}{ + "Id": p.Id, + "InSyncReplicas": p.InSyncReplicas, + "EarliestOffset": p.EarliestOffset, + "Size": p.Size, + } + out = append(out, o) + } + + return out +} + +type DatabaseKafkaTopic struct { + DatabaseTopic do.DatabaseTopic +} + +var _ Displayable = &DatabaseKafkaTopic{} + +func (dt *DatabaseKafkaTopic) JSON(out io.Writer) error { + return writeJSON(dt.DatabaseTopic, out) +} + +func (dt *DatabaseKafkaTopic) Cols() []string { + return []string{ + "key", + "value", + } +} + +func (dt *DatabaseKafkaTopic) ColMap() map[string]string { + + return map[string]string{ + "key": "key", + "value": "value", + } +} + +func (dt 
*DatabaseKafkaTopic) KV() []map[string]interface{} { + t := dt.DatabaseTopic + o := []map[string]interface{}{ + { + "key": "Name", + "value": t.Name, + }, + { + "key": "State", + "value": t.State, + }, + { + "key": "ReplicationFactor", + "value": *t.ReplicationFactor, + }, + { + "key": "PartitionCount", + "value": len(t.Partitions), + }, + } + + if t.Config != nil { + cfg := make([]map[string]interface{}, 0) + if t.Config.CleanupPolicy != "" { + cfg = append(cfg, map[string]interface{}{ + "key": "CleanupPolicy", + "value": t.Config.CleanupPolicy, + }) + } + if t.Config.CompressionType != "" { + cfg = append(cfg, map[string]interface{}{ + "key": "CompressionType", + "value": t.Config.CompressionType, + }) + } + if t.Config.DeleteRetentionMS != nil { + cfg = append(cfg, map[string]interface{}{ + "key": "DeleteRetentionMS", + "value": *t.Config.DeleteRetentionMS, + }) + } + if t.Config.FileDeleteDelayMS != nil { + cfg = append(cfg, map[string]interface{}{ + "key": "FileDeleteDelayMS", + "value": *t.Config.FileDeleteDelayMS, + }) + } + if t.Config.FlushMessages != nil { + cfg = append(cfg, map[string]interface{}{ + "key": "FlushMessages", + "value": *t.Config.FlushMessages, + }) + } + if t.Config.FlushMS != nil { + cfg = append(cfg, map[string]interface{}{ + "key": "FlushMS", + "value": *t.Config.FlushMS, + }) + } + if t.Config.IndexIntervalBytes != nil { + cfg = append(cfg, map[string]interface{}{ + "key": "IndexIntervalBytes", + "value": *t.Config.IndexIntervalBytes, + }) + } + if t.Config.MaxCompactionLagMS != nil { + cfg = append(cfg, map[string]interface{}{ + "key": "MaxCompactionLagMS", + "value": *t.Config.MaxCompactionLagMS, + }) + } + if t.Config.MessageDownConversionEnable != nil { + cfg = append(cfg, map[string]interface{}{ + "key": "MessageDownConversionEnable", + "value": *t.Config.MessageDownConversionEnable, + }) + } + if t.Config.MessageFormatVersion != "" { + cfg = append(cfg, map[string]interface{}{ + "key": "MessageFormatVersion", + "value": t.Config.MessageFormatVersion, + }) + } + if t.Config.MessageTimestampDifferenceMaxMS != nil { + cfg = append(cfg, map[string]interface{}{ + "key": "MessageTimestampDifferenceMaxMS", + "value": *t.Config.MessageTimestampDifferenceMaxMS, + }) + } + if t.Config.MessageTimestampType != "" { + cfg = append(cfg, map[string]interface{}{ + "key": "MessageTimestampType", + "value": t.Config.MessageTimestampType, + }) + } + if t.Config.MinCleanableDirtyRatio != nil { + cfg = append(cfg, map[string]interface{}{ + "key": "MinCleanableDirtyRatio", + "value": *t.Config.MinCleanableDirtyRatio, + }) + } + if t.Config.MinCompactionLagMS != nil { + cfg = append(cfg, map[string]interface{}{ + "key": "MinCompactionLagMS", + "value": *t.Config.MinCompactionLagMS, + }) + } + if t.Config.MinInsyncReplicas != nil { + cfg = append(cfg, map[string]interface{}{ + "key": "MinInsyncReplicas", + "value": *t.Config.MinInsyncReplicas, + }) + } + if t.Config.Preallocate != nil { + cfg = append(cfg, map[string]interface{}{ + "key": "Preallocate", + "value": *t.Config.Preallocate, + }) + } + if t.Config.RetentionBytes != nil { + cfg = append(cfg, map[string]interface{}{ + "key": "RetentionBytes", + "value": *t.Config.RetentionBytes, + }) + } + if t.Config.RetentionMS != nil { + cfg = append(cfg, map[string]interface{}{ + "key": "RetentionMS", + "value": *t.Config.RetentionMS, + }) + } + if t.Config.SegmentBytes != nil { + cfg = append(cfg, map[string]interface{}{ + "key": "SegmentBytes", + "value": *t.Config.SegmentBytes, + }) + } + if t.Config.SegmentIndexBytes != nil { 
+ cfg = append(cfg, map[string]interface{}{ + "key": "SegmentIndexBytes", + "value": *t.Config.SegmentIndexBytes, + }) + } + if t.Config.SegmentJitterMS != nil { + cfg = append(cfg, map[string]interface{}{ + "key": "SegmentJitterMS", + "value": *t.Config.SegmentJitterMS, + }) + } + if t.Config.SegmentMS != nil { + cfg = append(cfg, map[string]interface{}{ + "key": "SegmentMS", + "value": *t.Config.SegmentMS, + }) + } + o = append(o, cfg...) + } + + return o +} + type MySQLConfiguration struct { MySQLConfiguration do.MySQLConfig } diff --git a/do/databases.go b/do/databases.go index 9c5016e75..9eb1b35bb 100644 --- a/do/databases.go +++ b/do/databases.go @@ -111,6 +111,19 @@ type RedisConfig struct { *godo.RedisConfig } +// DatabaseTopics is a slice of DatabaseTopic +type DatabaseTopics []DatabaseTopic + +// DatabaseTopic is a wrapper for godo.DatabaseTopic +type DatabaseTopic struct { + *godo.DatabaseTopic +} + +// DatabaseTopicPartitions wraps a slice of *godo.TopicPartition +type DatabaseTopicPartitions struct { + Partitions []*godo.TopicPartition +} + // DatabasesService is an interface for interacting with DigitalOcean's Database API type DatabasesService interface { List() (Databases, error) @@ -159,6 +172,12 @@ type DatabasesService interface { GetMySQLConfiguration(databaseID string) (*MySQLConfig, error) GetPostgreSQLConfiguration(databaseID string) (*PostgreSQLConfig, error) GetRedisConfiguration(databaseID string) (*RedisConfig, error) + + ListTopics(string) (DatabaseTopics, error) + GetTopic(string, string) (*DatabaseTopic, error) + CreateTopic(string, *godo.DatabaseCreateTopicRequest) (*DatabaseTopic, error) + UpdateTopic(string, string, *godo.DatabaseUpdateTopicRequest) error + DeleteTopic(string, string) error } type databasesService struct { @@ -618,3 +637,61 @@ func (ds *databasesService) GetRedisConfiguration(databaseID string) (*RedisConf RedisConfig: cfg, }, nil } + +func (ds *databasesService) ListTopics(databaseID string) (DatabaseTopics, error) { + f := func(opt *godo.ListOptions) ([]interface{}, *godo.Response, error) { + list, resp, err := ds.client.Databases.ListTopics(context.TODO(), databaseID, opt) + if err != nil { + return nil, nil, err + } + + si := make([]interface{}, len(list)) + for i := range list { + si[i] = list[i] + } + + return si, resp, err + } + + si, err := PaginateResp(f) + if err != nil { + return nil, err + } + + list := make(DatabaseTopics, len(si)) + for i := range si { + t := si[i].(godo.DatabaseTopic) + list[i] = DatabaseTopic{DatabaseTopic: &t} + } + return list, nil +} + +func (ds *databasesService) CreateTopic(databaseID string, req *godo.DatabaseCreateTopicRequest) (*DatabaseTopic, error) { + t, _, err := ds.client.Databases.CreateTopic(context.TODO(), databaseID, req) + if err != nil { + return nil, err + } + + return &DatabaseTopic{DatabaseTopic: t}, nil +} + +func (ds *databasesService) UpdateTopic(databaseID, topicName string, req *godo.DatabaseUpdateTopicRequest) error { + _, err := ds.client.Databases.UpdateTopic(context.TODO(), databaseID, topicName, req) + + return err +} + +func (ds *databasesService) GetTopic(databaseID, topicName string) (*DatabaseTopic, error) { + t, _, err := ds.client.Databases.GetTopic(context.TODO(), databaseID, topicName) + if err != nil { + return nil, err + } + + return &DatabaseTopic{DatabaseTopic: t}, nil +} + +func (ds *databasesService) DeleteTopic(databaseID, topicName string) error { + _, err := ds.client.Databases.DeleteTopic(context.TODO(), databaseID, topicName) + + return err +} diff --git 
a/do/mocks/DatabasesService.go b/do/mocks/DatabasesService.go index 518c339aa..37e926bfb 100644 --- a/do/mocks/DatabasesService.go +++ b/do/mocks/DatabasesService.go @@ -99,6 +99,21 @@ func (mr *MockDatabasesServiceMockRecorder) CreateReplica(arg0, arg1 any) *gomoc return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateReplica", reflect.TypeOf((*MockDatabasesService)(nil).CreateReplica), arg0, arg1) } +// CreateTopic mocks base method. +func (m *MockDatabasesService) CreateTopic(arg0 string, arg1 *godo.DatabaseCreateTopicRequest) (*do.DatabaseTopic, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateTopic", arg0, arg1) + ret0, _ := ret[0].(*do.DatabaseTopic) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CreateTopic indicates an expected call of CreateTopic. +func (mr *MockDatabasesServiceMockRecorder) CreateTopic(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateTopic", reflect.TypeOf((*MockDatabasesService)(nil).CreateTopic), arg0, arg1) +} + // CreateUser mocks base method. func (m *MockDatabasesService) CreateUser(arg0 string, arg1 *godo.DatabaseCreateUserRequest) (*do.DatabaseUser, error) { m.ctrl.T.Helper() @@ -170,6 +185,20 @@ func (mr *MockDatabasesServiceMockRecorder) DeleteReplica(arg0, arg1 any) *gomoc return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteReplica", reflect.TypeOf((*MockDatabasesService)(nil).DeleteReplica), arg0, arg1) } +// DeleteTopic mocks base method. +func (m *MockDatabasesService) DeleteTopic(arg0, arg1 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteTopic", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteTopic indicates an expected call of DeleteTopic. +func (mr *MockDatabasesServiceMockRecorder) DeleteTopic(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteTopic", reflect.TypeOf((*MockDatabasesService)(nil).DeleteTopic), arg0, arg1) +} + // DeleteUser mocks base method. func (m *MockDatabasesService) DeleteUser(arg0, arg1 string) error { m.ctrl.T.Helper() @@ -364,6 +393,21 @@ func (mr *MockDatabasesServiceMockRecorder) GetSQLMode(arg0 any) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSQLMode", reflect.TypeOf((*MockDatabasesService)(nil).GetSQLMode), arg0) } +// GetTopic mocks base method. +func (m *MockDatabasesService) GetTopic(arg0, arg1 string) (*do.DatabaseTopic, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetTopic", arg0, arg1) + ret0, _ := ret[0].(*do.DatabaseTopic) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetTopic indicates an expected call of GetTopic. +func (mr *MockDatabasesServiceMockRecorder) GetTopic(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTopic", reflect.TypeOf((*MockDatabasesService)(nil).GetTopic), arg0, arg1) +} + // GetUser mocks base method. func (m *MockDatabasesService) GetUser(arg0, arg1 string) (*do.DatabaseUser, error) { m.ctrl.T.Helper() @@ -469,6 +513,21 @@ func (mr *MockDatabasesServiceMockRecorder) ListReplicas(arg0 any) *gomock.Call return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListReplicas", reflect.TypeOf((*MockDatabasesService)(nil).ListReplicas), arg0) } +// ListTopics mocks base method. 
+func (m *MockDatabasesService) ListTopics(arg0 string) (do.DatabaseTopics, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ListTopics", arg0) + ret0, _ := ret[0].(do.DatabaseTopics) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ListTopics indicates an expected call of ListTopics. +func (mr *MockDatabasesServiceMockRecorder) ListTopics(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListTopics", reflect.TypeOf((*MockDatabasesService)(nil).ListTopics), arg0) +} + // ListUsers mocks base method. func (m *MockDatabasesService) ListUsers(arg0 string) (do.DatabaseUsers, error) { m.ctrl.T.Helper() @@ -587,3 +646,17 @@ func (mr *MockDatabasesServiceMockRecorder) UpdateMaintenance(arg0, arg1 any) *g mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateMaintenance", reflect.TypeOf((*MockDatabasesService)(nil).UpdateMaintenance), arg0, arg1) } + +// UpdateTopic mocks base method. +func (m *MockDatabasesService) UpdateTopic(arg0, arg1 string, arg2 *godo.DatabaseUpdateTopicRequest) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateTopic", arg0, arg1, arg2) + ret0, _ := ret[0].(error) + return ret0 +} + +// UpdateTopic indicates an expected call of UpdateTopic. +func (mr *MockDatabasesServiceMockRecorder) UpdateTopic(arg0, arg1, arg2 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateTopic", reflect.TypeOf((*MockDatabasesService)(nil).UpdateTopic), arg0, arg1, arg2) +}
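
A note on the flag-parsing pattern in getDatabaseTopicConfigArgs above: each optional numeric topic setting is declared as a string flag and parsed into a pointer field, so a flag the user never set stays nil and is omitted from the request body, leaving the server-side default in place (the two boolean settings get the same treatment via GetBoolPtr). Below is a minimal, self-contained sketch of that pattern; the names topicConfig and parseOptionalUint are illustrative only and are not part of doctl or godo.

package main

import (
	"fmt"
	"strconv"
)

// topicConfig mirrors the pointer-field style of godo.TopicConfig:
// nil means "not set; let the server default apply".
type topicConfig struct {
	RetentionMS *int64
	SegmentMS   *uint64
}

// parseOptionalUint returns nil for an empty or malformed value,
// mirroring the `err == nil && val != ""` guards in the diff, where a
// bad value silently leaves the field unset.
func parseOptionalUint(val string) *uint64 {
	if val == "" {
		return nil
	}
	i, err := strconv.ParseUint(val, 10, 64)
	if err != nil {
		return nil
	}
	return &i
}

func main() {
	// Only segment-ms was provided; retention-ms stays nil.
	cfg := topicConfig{SegmentMS: parseOptionalUint("604800000")}
	if cfg.RetentionMS == nil {
		fmt.Println("retention-ms not set; server default applies")
	}
	if cfg.SegmentMS != nil {
		fmt.Println("segment-ms:", *cfg.SegmentMS)
	}
}

Using string flags rather than typed integer flags is what makes "unset" distinguishable from a zero value; that is why only the int-typed partition-count and replication-factor flags need the separate `!= 0` guard in the create and update handlers.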