Skip to content

Commit

Permalink
add tests and mocks
Browse files Browse the repository at this point in the history
  • Loading branch information
dweinshenker committed Nov 1, 2023
1 parent 1e3d7e5 commit 87f63cf
Show file tree
Hide file tree
Showing 4 changed files with 328 additions and 52 deletions.
97 changes: 46 additions & 51 deletions commands/databases.go
Original file line number Diff line number Diff line change
Expand Up @@ -1610,23 +1610,21 @@ func RunDatabaseTopicCreate(c *CmdConfig) error {
databaseID := c.Args[0]
topicName := c.Args[1]

createReq := &godo.DatabaseCreateTopicRequest{Name: topicName}

pc, err := c.Doit.GetInt(c.NS, doctl.ArgDatabaseTopicPartitionCount)
if err != nil {
return err
if err == nil && pc != 0 {
pcUInt32 := uint32(pc)
createReq.PartitionCount = &pcUInt32
}
pcUInt32 := uint32(pc)
rf, err := c.Doit.GetInt(c.NS, doctl.ArgDatabaseTopicReplicationFactor)
if err != nil {
return err
if err == nil && rf != 0 {
rfUInt32 := uint32(rf)
createReq.ReplicationFactor = &rfUInt32
}
rfUInt32 := uint32(rf)
createReq.Config = getDatabaseTopicConfigArgs(c)

_, err = c.Databases().CreateTopic(databaseID, &godo.DatabaseCreateTopicRequest{
Name: topicName,
ReplicationFactor: &rfUInt32,
PartitionCount: &pcUInt32,
Config: getDatabaseTopicConfigArgs(c),
})
_, err = c.Databases().CreateTopic(databaseID, createReq)
return err
}

Expand All @@ -1638,22 +1636,21 @@ func RunDatabaseTopicUpdate(c *CmdConfig) error {
databaseID := c.Args[0]
topicName := c.Args[1]

updateReq := &godo.DatabaseUpdateTopicRequest{}

pc, err := c.Doit.GetInt(c.NS, doctl.ArgDatabaseTopicPartitionCount)
if err != nil {
return err
if err == nil && pc != 0 {
pcUInt32 := uint32(pc)
updateReq.PartitionCount = &pcUInt32
}
pcUInt32 := uint32(pc)
rf, err := c.Doit.GetInt(c.NS, doctl.ArgDatabaseTopicReplicationFactor)
if err != nil {
return err
if err == nil && rf != 0 {
rfUInt32 := uint32(rf)
updateReq.ReplicationFactor = &rfUInt32
}
rfUInt32 := uint32(rf)
updateReq.Config = getDatabaseTopicConfigArgs(c)

err = c.Databases().UpdateTopic(databaseID, topicName, &godo.DatabaseUpdateTopicRequest{
ReplicationFactor: &rfUInt32,
PartitionCount: &pcUInt32,
Config: getDatabaseTopicConfigArgs(c),
})
err = c.Databases().UpdateTopic(databaseID, topicName, updateReq)
return err
}

Expand All @@ -1664,134 +1661,134 @@ func getDatabaseTopicConfigArgs(c *CmdConfig) *godo.TopicConfig {
res.CleanupPolicy = val
}
val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicCompressionType)
if err == nil {
if err == nil && val != "" {
res.CompressionType = val
}
val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicDeleteRetentionMS)
if err == nil {
if err == nil && val != "" {
i, err := strconv.ParseUint(val, 10, 64)
if err == nil {
res.DeleteRetentionMS = &i
}
}
val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicFileDeleteDelayMS)
if err == nil {
if err == nil && val != "" {
i, err := strconv.ParseUint(val, 10, 64)
if err == nil {
res.FileDeleteDelayMS = &i
}
}
val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicFlushMessages)
if err == nil {
if err == nil && val != "" {
i, err := strconv.ParseUint(val, 10, 64)
if err == nil {
res.FlushMessages = &i
}
}
val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicFlushMS)
if err == nil {
if err == nil && val != "" {
i, err := strconv.ParseUint(val, 10, 64)
if err == nil {
res.FlushMS = &i
}
}
val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicIntervalIndexBytes)
if err == nil {
if err == nil && val != "" {
i, err := strconv.ParseUint(val, 10, 64)
if err == nil {
res.IndexIntervalBytes = &i
}
}
val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicMaxCompactionLagMS)
if err == nil {
if err == nil && val != "" {
i, err := strconv.ParseUint(val, 10, 64)
if err == nil {
res.MaxCompactionLagMS = &i
}
}
val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicMaxMessageBytes)
if err == nil {
if err == nil && val != "" {
i, err := strconv.ParseUint(val, 10, 64)
if err == nil {
res.MaxMessageBytes = &i
}
}
bVal, err := c.Doit.GetBoolPtr(c.NS, doctl.ArgDatabaseTopicMesssageDownConversionEnable)
if err == nil {
if err == nil && bVal != nil {
res.MessageDownConversionEnable = bVal
}
val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicMessageFormatVersion)
if err == nil {
if err == nil && val != "" {
res.MessageFormatVersion = val
}
val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicMessageTimestampType)
if err == nil {
if err == nil && val != "" {
res.MessageTimestampType = val
}
val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicMinCleanableDirtyRatio)
if err == nil {
if err == nil && val != "" {
i, err := strconv.ParseFloat(val, 32)
if err == nil {
iFloat32 := float32(i)
res.MinCleanableDirtyRatio = &iFloat32
}
}
val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicMinCompactionLagMS)
if err == nil {
if err == nil && val != "" {
i, err := strconv.ParseUint(val, 10, 64)
if err == nil {
res.MinCompactionLagMS = &i
}
}
val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicMinInsyncReplicas)
if err == nil {
if err == nil && val != "" {
i, err := strconv.ParseUint(val, 10, 32)
if err == nil {
iUint32 := uint32(i)
res.MinInsyncReplicas = &iUint32
}
}
bVal, err = c.Doit.GetBoolPtr(c.NS, doctl.ArgDatabaseTopicPreallocate)
if err == nil {
if err == nil && bVal != nil {
res.Preallocate = bVal
}
val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicRetentionBytes)
if err == nil {
if err == nil && val != "" {
i, err := strconv.ParseInt(val, 10, 64)
if err == nil {
res.RetentionBytes = &i
}
}
val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicRetentionMS)
if err == nil {
if err == nil && val != "" {
i, err := strconv.ParseInt(val, 10, 64)
if err == nil {
res.RetentionMS = &i
}
}
val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicSegmentBytes)
if err == nil {
if err == nil && val != "" {
i, err := strconv.ParseUint(val, 10, 64)
if err == nil {
res.SegmentBytes = &i
}
}
val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicSegmentJitterMS)
if err == nil {
if err == nil && val != "" {
i, err := strconv.ParseUint(val, 10, 64)
if err == nil {
res.SegmentJitterMS = &i
}
}
val, err = c.Doit.GetString(c.NS, doctl.ArgDatabaseTopicSegmentMS)
if err == nil {
if err == nil && val != "" {
i, err := strconv.ParseUint(val, 10, 64)
if err == nil {
res.SegmentMS = &i
}
}
bVal, err = c.Doit.GetBoolPtr(c.NS, doctl.ArgDatabaseTopicUncleanLeaderElectionEnable)
if err == nil {
if err == nil && bVal != nil {
res.UncleanLeaderElectionEnable = bVal
}

Expand All @@ -1811,7 +1808,7 @@ func databaseTopic() *Command {
This command lists the following details for each topic in a kafka database cluster:
- The Name of the topic.
- The State of the topic - The possible values are: "topicstate_active", "configuring", "deleting".
- The State of the topic.
- The Replication Factor of the topic - number of brokers the topic's partitions are replicated across.
`

Expand All @@ -1834,8 +1831,8 @@ This command lists the following details for each partition of a given topic in
- The EarliestOffset - earliest offset read amongst all consumers of the partition.
`

CmdBuilder(cmd, RunDatabaseTopicList, "list <database-id>", "Retrieve a list of topics for a given kafka database", topicListDetails, Writer, aliasOpt("ls"))
CmdBuilder(cmd, RunDatabaseTopicGet, "get <database-id> <topic-name>", "Retrieve the configuration for a given kafka topic", topicGetDetails, Writer, aliasOpt("g"))
CmdBuilder(cmd, RunDatabaseTopicList, "list <database-id>", "Retrieve a list of topics for a given kafka database", topicListDetails, Writer, displayerType(&displayers.DatabaseKafkaTopics{}), aliasOpt("ls"))
CmdBuilder(cmd, RunDatabaseTopicGet, "get <database-id> <topic-name>", "Retrieve the configuration for a given kafka topic", topicGetDetails, Writer, displayerType(&displayers.DatabaseKafkaTopic{}), aliasOpt("g"))
CmdBuilder(cmd, RunDatabaseTopicListPartition, "partitions <database-id> <topic-name>", "Retrieve the partitions for a given kafka topic", topicGetPartitionDetails, Writer, aliasOpt("p"))
cmdDatabaseTopicDelete := CmdBuilder(cmd, RunDatabaseTopicDelete, "delete <database-id> <topic-name>", "Deletes a kafka topic by topic name", "", Writer, aliasOpt("rm"))
AddBoolFlag(cmdDatabaseTopicDelete, doctl.ArgForce, doctl.ArgShortForce, false, "Deletes the kafka topic without a confirmation prompt")
Expand Down Expand Up @@ -1868,13 +1865,11 @@ This command lists the following details for each partition of a given topic in
AddBoolFlag(c, doctl.ArgDatabaseTopicMesssageDownConversionEnable, "", true,
"Specifies whether down-conversion of message formats is enabled to satisfy consumer requests")
AddStringFlag(c, doctl.ArgDatabaseTopicMessageFormatVersion, "", "",
`Specifies the message format version used by the broker to append messages to the logs. By setting a format version, all existing messages on disk must be
smaller or equal to the specified version`)
"Specifies the message format version used by the broker to append messages to the logs. By setting a format version, all existing messages on disk must be smaller or equal to the specified version")
AddStringFlag(c, doctl.ArgDatabaseTopicMessageTimestampType, "", "",
"Specifies whether to use the create time or log append time as the timestamp on a message")
AddStringFlag(c, doctl.ArgDatabaseTopicMinCleanableDirtyRatio, "", "",
`Specifies the frequenty of log compaction (if enabled) in relation to duplicates present in the logs. For example, 0.5 would mean at most half of the log
could be duplicates before compaction would begin`)
"Specifies the frequenty of log compaction (if enabled) in relation to duplicates present in the logs. For example, 0.5 would mean at most half of the log could be duplicates before compaction would begin")
AddStringFlag(c, doctl.ArgDatabaseTopicMinCompactionLagMS, "", "",
"Specifies the minimum time (in ms) that a message will remain uncompacted. This is only applicable if the logs have compaction enabled")
AddStringFlag(c, doctl.ArgDatabaseTopicMinInsyncReplicas, "", "",
Expand Down
Loading

0 comments on commit 87f63cf

Please sign in to comment.