diff --git a/balancer.go b/balancer.go index 9319c67..de5abc5 100644 --- a/balancer.go +++ b/balancer.go @@ -27,3 +27,22 @@ func GetBalancerReferenceHash() Balancer { func GetBalancerRoundRobin() Balancer { return &kafka.RoundRobin{} } + +func GetBalancerString(balancer Balancer) string { + switch balancer.(type) { + case *kafka.CRC32Balancer: + return "CRC32Balancer" + case *kafka.Hash: + return "Hash" + case *kafka.LeastBytes: + return "LeastBytes" + case *kafka.Murmur2Balancer: + return "Murmur2Balancer" + case *kafka.ReferenceHash: + return "ReferenceHash" + case *kafka.RoundRobin: + return "RoundRobin" + default: + return "Unknown" + } +} diff --git a/balancer_test.go b/balancer_test.go index 11da72d..39d3734 100644 --- a/balancer_test.go +++ b/balancer_test.go @@ -64,3 +64,54 @@ func TestGetBalancerRoundRobinh(t *testing.T) { t.Errorf("Expected *kafka.RoundRobin, got %s", reflect.TypeOf(balancer).String()) } } + +func TestGetBalancerString(t *testing.T) { + tests := []struct { + name string + balancer Balancer + want string + }{ + { + name: "Should_Return_CRC32Balancer", + balancer: GetBalancerCRC32(), + want: "CRC32Balancer", + }, + { + name: "Should_Return_Hash", + balancer: GetBalancerHash(), + want: "Hash", + }, + { + name: "Should_Return_LeastBytes", + balancer: GetBalancerLeastBytes(), + want: "LeastBytes", + }, + { + name: "Should_Return_Murmur2Balancer", + balancer: GetBalancerMurmur2Balancer(), + want: "Murmur2Balancer", + }, + { + name: "Should_Return_ReferenceHash", + balancer: GetBalancerReferenceHash(), + want: "ReferenceHash", + }, + { + name: "Should_Return_RoundRobin", + balancer: GetBalancerRoundRobin(), + want: "RoundRobin", + }, + { + name: "Should_Return_Unknown", + balancer: nil, + want: "Unknown", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := GetBalancerString(tt.balancer); got != tt.want { + t.Errorf("GetBalancerString() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/consumer_config.go b/consumer_config.go index e3cf8d1..7a18da1 100644 --- a/consumer_config.go +++ b/consumer_config.go @@ -1,6 +1,11 @@ package kafka import ( + "bytes" + "encoding/json" + "fmt" + "regexp" + "strings" "time" "github.com/segmentio/kafka-go" @@ -61,6 +66,51 @@ type ConsumerConfig struct { MetricPrefix string } +func (cfg RetryConfiguration) JSON() string { + return fmt.Sprintf(`{"Brokers": ["%s"], "Topic": %q, "StartTimeCron": %q, "WorkDuration": %q, `+ + `"MaxRetry": %d, "VerifyTopicOnStartup": %t, "Rack": %q}`, + strings.Join(cfg.Brokers, "\", \""), cfg.Topic, cfg.StartTimeCron, + cfg.WorkDuration, cfg.MaxRetry, cfg.VerifyTopicOnStartup, cfg.Rack) +} + +func (cfg *BatchConfiguration) JSON() string { + if cfg == nil { + return "{}" + } + return fmt.Sprintf(`{"MessageGroupLimit": %d}`, cfg.MessageGroupLimit) +} + +func (cfg ReaderConfig) JSON() string { + return fmt.Sprintf(`{"Brokers": ["%s"], "GroupId": %q, "GroupTopics": ["%s"], `+ + `"MaxWait": %q, "CommitInterval": %q, "StartOffset": %q}`, + strings.Join(cfg.Brokers, "\", \""), cfg.GroupID, strings.Join(cfg.GroupTopics, "\", \""), + cfg.MaxWait, cfg.CommitInterval, kcronsumer.ToStringOffset(cfg.StartOffset)) +} + +func (cfg *ConsumerConfig) JSON() string { + if cfg == nil { + return "{}" + } + return fmt.Sprintf(`{"ClientID": %q, "Reader": %s, "BatchConfiguration": %s, "MessageGroupDuration": %q, `+ + `"TransactionalRetry": %t, "Concurrency": %d, "RetryEnabled": %t, "RetryConfiguration": %s, `+ + `"VerifyTopicOnStartup": %t, "Rack": %q, "SASL": %s, "TLS": %s}`, + cfg.ClientID, cfg.Reader.JSON(), cfg.BatchConfiguration.JSON(), + cfg.MessageGroupDuration, *cfg.TransactionalRetry, cfg.Concurrency, + cfg.RetryEnabled, cfg.RetryConfiguration.JSON(), cfg.VerifyTopicOnStartup, + cfg.Rack, cfg.SASL.JSON(), cfg.TLS.JSON()) +} + +func (cfg *ConsumerConfig) JSONPretty() string { + return jsonPretty(cfg.JSON()) +} + +func (cfg *ConsumerConfig) String() string { + re := regexp.MustCompile(`"(\w+)"\s*:`) + modifiedString := re.ReplaceAllString(cfg.JSON(), `$1:`) + modifiedString = modifiedString[1 : len(modifiedString)-1] + return modifiedString +} + func (cfg *ConsumerConfig) newCronsumerConfig() *kcronsumer.Config { cronsumerCfg := kcronsumer.Config{ MetricPrefix: cfg.RetryConfiguration.MetricPrefix, @@ -266,3 +316,12 @@ func (cfg *ConsumerConfig) setDefaults() { func NewBoolPtr(value bool) *bool { return &value } + +func jsonPretty(jsonString string) string { + var out bytes.Buffer + err := json.Indent(&out, []byte(jsonString), "", "\t") + if err != nil { + return jsonString + } + return out.String() +} diff --git a/consumer_config_test.go b/consumer_config_test.go index 98a8333..ebf9878 100644 --- a/consumer_config_test.go +++ b/consumer_config_test.go @@ -189,3 +189,219 @@ func TestConsumerConfig_getTopics(t *testing.T) { } }) } + +func Test_jsonPretty(t *testing.T) { + tests := []struct { + name string + input string + expected string + }{ + { + name: "Simple JSON", + input: `{"key1":"value1","key2":2}`, + expected: "{\n\t\"key1\": \"value1\",\n\t\"key2\": 2\n}", + }, + { + name: "Nested JSON", + input: `{"key1":"value1","key2":{"nestedKey1":1,"nestedKey2":2},"key3":[1,2,3]}`, + expected: "{\n\t\"key1\": \"value1\",\n\t\"" + + "key2\": {\n\t\t\"nestedKey1\": 1,\n\t\t\"nestedKey2\": 2\n\t},\n\t\"" + + "key3\": [\n\t\t1,\n\t\t2,\n\t\t3\n\t]\n}", + }, + { + name: "Invalid JSON", + input: `{"key1": "value1", "key2": 2`, + expected: `{"key1": "value1", "key2": 2`, + }, + { + name: "Empty JSON", + input: ``, + expected: ``, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := jsonPretty(tt.input) + if got != tt.expected { + t.Errorf("jsonPretty() = %v, want %v", got, tt.expected) + } + }) + } +} + +func TestConsumerConfig_JSON(t *testing.T) { + t.Run("Should_Convert_Nil_Config_To_Json", func(t *testing.T) { + // Given + var config *ConsumerConfig + expected := "{}" + // When + result := config.JSON() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) + t.Run("Should_Convert_To_Json", func(t *testing.T) { + // Given + expected := "{\"ClientID\": \"test-consumer-client-id\", \"Reader\": {\"Brokers\": [\"broker-1.test.com\", \"broker-2.test.com\"], " + + "\"GroupId\": \"test-consumer.0\", \"GroupTopics\": [\"test-updated.0\"], \"MaxWait\": \"2s\", " + + "\"CommitInterval\": \"1s\", \"StartOffset\": \"earliest\"}, \"BatchConfiguration\": {\"MessageGroupLimit\": 100}, " + + "\"MessageGroupDuration\": \"20ns\", \"TransactionalRetry\": false, \"Concurrency\": 10, \"RetryEnabled\": true, " + + "\"RetryConfiguration\": {\"Brokers\": [\"broker-1.test.com\", \"broker-2.test.com\"], \"Topic\": \"test-exception.0\", " + + "\"StartTimeCron\": \"*/2 * * * *\", \"WorkDuration\": \"1m0s\", \"MaxRetry\": 3, \"VerifyTopicOnStartup\": true, \"Rack\": \"\"}, " + + "\"VerifyTopicOnStartup\": true, \"Rack\": \"stage\", " + + "\"SASL\": {\"Mechanism\": \"scram\", \"Username\": \"user\", \"Password\": \"pass\"}, " + + "\"TLS\": {\"RootCAPath\": \"resources/ca\", \"IntermediateCAPath\": \"resources/intCa\"}}" + // When + result := getConsumerConfigExample().JSON() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) + t.Run("Should_Convert_To_Json_Without_Inner_Object", func(t *testing.T) { + // Given + expected := "{\"ClientID\": \"test-consumer-client-id\", \"Reader\": {\"Brokers\": [\"\"], \"GroupId\": \"\", " + + "\"GroupTopics\": [\"\"], \"MaxWait\": \"0s\", \"CommitInterval\": \"0s\", \"StartOffset\": \"earliest\"}, " + + "\"BatchConfiguration\": {}, \"MessageGroupDuration\": \"20ns\", \"TransactionalRetry\": false, \"Concurrency\": 10, " + + "\"RetryEnabled\": true, \"RetryConfiguration\": {\"Brokers\": [\"\"], \"Topic\": \"\", \"StartTimeCron\": \"\", " + + "\"WorkDuration\": \"0s\", \"MaxRetry\": 0, \"VerifyTopicOnStartup\": false, \"Rack\": \"\"}, \"VerifyTopicOnStartup\": true, " + + "\"Rack\": \"stage\", \"SASL\": {}, \"TLS\": {}}" + // When + result := getConsumerConfigWithoutInnerObjectExample().JSON() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) +} + +func TestConsumerConfig_String(t *testing.T) { + t.Run("Should_Convert_To_String", func(t *testing.T) { + // Given + expected := "ClientID: \"test-consumer-client-id\", Reader: {Brokers: [\"broker-1.test.com\", \"broker-2.test.com\"], " + + "GroupId: \"test-consumer.0\", GroupTopics: [\"test-updated.0\"], MaxWait: \"2s\", CommitInterval: \"1s\", " + + "StartOffset: \"earliest\"}, BatchConfiguration: {MessageGroupLimit: 100}, MessageGroupDuration: \"20ns\", " + + "TransactionalRetry: false, Concurrency: 10, RetryEnabled: true, " + + "RetryConfiguration: {Brokers: [\"broker-1.test.com\", \"broker-2.test.com\"], Topic: \"test-exception.0\", " + + "StartTimeCron: \"*/2 * * * *\", WorkDuration: \"1m0s\", MaxRetry: 3, VerifyTopicOnStartup: true, Rack: \"\"}, " + + "VerifyTopicOnStartup: true, Rack: \"stage\", SASL: {Mechanism: \"scram\", Username: \"user\", Password: \"pass\"}, " + + "TLS: {RootCAPath: \"resources/ca\", IntermediateCAPath: \"resources/intCa\"}" + // When + result := getConsumerConfigExample().String() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) + t.Run("Should_Convert_To_String_Without_Inner_Object", func(t *testing.T) { + // Given + expected := "ClientID: \"test-consumer-client-id\", Reader: {Brokers: [\"\"], GroupId: \"\", " + + "GroupTopics: [\"\"], MaxWait: \"0s\", CommitInterval: \"0s\", StartOffset: \"earliest\"}, " + + "BatchConfiguration: {}, MessageGroupDuration: \"20ns\", TransactionalRetry: false, Concurrency: 10, " + + "RetryEnabled: true, RetryConfiguration: {Brokers: [\"\"], Topic: \"\", StartTimeCron: \"\", WorkDuration: \"0s\", " + + "MaxRetry: 0, VerifyTopicOnStartup: false, Rack: \"\"}, VerifyTopicOnStartup: true, Rack: \"stage\", SASL: {}, TLS: {}" + // When + result := getConsumerConfigWithoutInnerObjectExample().String() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) +} + +func TestConsumerConfig_JSONPretty(t *testing.T) { + t.Run("Should_Convert_To_Pretty_Json", func(t *testing.T) { + // Given + expected := "{\n\t\"ClientID\": \"test-consumer-client-id\",\n\t\"Reader\": {\n\t\t\"" + + "Brokers\": [\n\t\t\t\"broker-1.test.com\",\n\t\t\t\"broker-2.test.com\"\n\t\t],\n\t\t\"" + + "GroupId\": \"test-consumer.0\",\n\t\t\"GroupTopics\": [\n\t\t\t\"test-updated.0\"\n\t\t],\n\t\t\"" + + "MaxWait\": \"2s\",\n\t\t\"CommitInterval\": \"1s\",\n\t\t\"StartOffset\": \"earliest\"\n\t},\n\t\"" + + "BatchConfiguration\": {\n\t\t\"MessageGroupLimit\": 100\n\t},\n\t\"MessageGroupDuration\": \"20ns\",\n\t\"" + + "TransactionalRetry\": false,\n\t\"Concurrency\": 10,\n\t\"RetryEnabled\": true,\n\t\"" + + "RetryConfiguration\": {\n\t\t\"Brokers\": [\n\t\t\t\"broker-1.test.com\",\n\t\t\t\"broker-2.test.com\"\n\t\t],\n\t\t\"" + + "Topic\": \"test-exception.0\",\n\t\t\"StartTimeCron\": \"*/2 * * * *\",\n\t\t\"WorkDuration\": \"1m0s\",\n\t\t\"" + + "MaxRetry\": 3,\n\t\t\"VerifyTopicOnStartup\": true,\n\t\t\"Rack\": \"\"\n\t},\n\t\"" + + "VerifyTopicOnStartup\": true,\n\t\"Rack\": \"stage\",\n\t\"" + + "SASL\": {\n\t\t\"Mechanism\": \"scram\",\n\t\t\"Username\": \"user\",\n\t\t\"Password\": \"pass\"\n\t},\n\t\"" + + "TLS\": {\n\t\t\"RootCAPath\": \"resources/ca\",\n\t\t\"IntermediateCAPath\": \"resources/intCa\"\n\t}\n}" + // When + result := getConsumerConfigExample().JSONPretty() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) + t.Run("Should_Convert_To_Pretty_Json_Without_Inner_Object", func(t *testing.T) { + // Given + expected := "{\n\t\"ClientID\": \"test-consumer-client-id\",\n\t\"" + + "Reader\": {\n\t\t\"Brokers\": [\n\t\t\t\"\"\n\t\t],\n\t\t\"GroupId\": \"\",\n\t\t\"" + + "GroupTopics\": [\n\t\t\t\"\"\n\t\t],\n\t\t\"MaxWait\": \"0s\",\n\t\t\"CommitInterval\": \"0s\",\n\t\t\"" + + "StartOffset\": \"earliest\"\n\t},\n\t\"BatchConfiguration\": {},\n\t\"" + + "MessageGroupDuration\": \"20ns\",\n\t\"TransactionalRetry\": false,\n\t\"Concurrency\": 10,\n\t\"" + + "RetryEnabled\": true,\n\t\"RetryConfiguration\": {\n\t\t\"Brokers\": [\n\t\t\t\"\"\n\t\t],\n\t\t\"" + + "Topic\": \"\",\n\t\t\"StartTimeCron\": \"\",\n\t\t\"WorkDuration\": \"0s\",\n\t\t\"MaxRetry\": 0,\n\t\t\"" + + "VerifyTopicOnStartup\": false,\n\t\t\"Rack\": \"\"\n\t},\n\t\"VerifyTopicOnStartup\": true,\n\t\"" + + "Rack\": \"stage\",\n\t\"SASL\": {},\n\t\"TLS\": {}\n}" + // When + result := getConsumerConfigWithoutInnerObjectExample().JSONPretty() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) +} + +func getConsumerConfigExample() *ConsumerConfig { + return &ConsumerConfig{ + Rack: "stage", + ClientID: "test-consumer-client-id", + Reader: ReaderConfig{ + Brokers: []string{"broker-1.test.com", "broker-2.test.com"}, + GroupID: "test-consumer.0", + GroupTopics: []string{"test-updated.0"}, + MaxWait: 2 * time.Second, + CommitInterval: time.Second, + }, + BatchConfiguration: &BatchConfiguration{ + MessageGroupLimit: 100, + }, + MessageGroupDuration: 20, + TransactionalRetry: NewBoolPtr(false), + Concurrency: 10, + RetryEnabled: true, + RetryConfiguration: RetryConfiguration{ + Brokers: []string{"broker-1.test.com", "broker-2.test.com"}, + Topic: "test-exception.0", + StartTimeCron: "*/2 * * * *", + WorkDuration: time.Minute * 1, + MaxRetry: 3, + VerifyTopicOnStartup: true, + }, + VerifyTopicOnStartup: true, + TLS: &TLSConfig{ + RootCAPath: "resources/ca", + IntermediateCAPath: "resources/intCa", + }, + SASL: &SASLConfig{ + Type: "scram", + Username: "user", + Password: "pass", + }, + } +} + +func getConsumerConfigWithoutInnerObjectExample() *ConsumerConfig { + return &ConsumerConfig{ + Rack: "stage", + ClientID: "test-consumer-client-id", + Reader: ReaderConfig{}, + MessageGroupDuration: 20, + TransactionalRetry: NewBoolPtr(false), + Concurrency: 10, + RetryEnabled: true, + RetryConfiguration: RetryConfiguration{}, + VerifyTopicOnStartup: true, + } +} diff --git a/mechanism.go b/mechanism.go index 5e89bf6..65cf284 100644 --- a/mechanism.go +++ b/mechanism.go @@ -1,6 +1,8 @@ package kafka import ( + "fmt" + "github.com/segmentio/kafka-go/sasl" "github.com/segmentio/kafka-go/sasl/plain" "github.com/segmentio/kafka-go/sasl/scram" @@ -37,3 +39,10 @@ func (s *SASLConfig) plain() sasl.Mechanism { func (s *SASLConfig) IsEmpty() bool { return s == nil } + +func (s *SASLConfig) JSON() string { + if s == nil { + return "{}" + } + return fmt.Sprintf(`{"Mechanism": %q, "Username": %q, "Password": %q}`, s.Type, s.Username, s.Password) +} diff --git a/mechanism_test.go b/mechanism_test.go new file mode 100644 index 0000000..1c93ed0 --- /dev/null +++ b/mechanism_test.go @@ -0,0 +1,34 @@ +package kafka + +import "testing" + +func TestSASLConfig_Json(t *testing.T) { + t.Run("Should_Convert_Nil_Config_To_Json", func(t *testing.T) { + // Given + var cfg *SASLConfig + + expected := "{}" + // When + result := cfg.JSON() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) + t.Run("Should_Convert_To_Json", func(t *testing.T) { + // Given + cfg := &SASLConfig{ + Type: "scram", + Username: "user", + Password: "pass", + } + + expected := "{\"Mechanism\": \"scram\", \"Username\": \"user\", \"Password\": \"pass\"}" + // When + result := cfg.JSON() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) +} diff --git a/producer_config.go b/producer_config.go index 1feeecd..7af1195 100644 --- a/producer_config.go +++ b/producer_config.go @@ -1,6 +1,9 @@ package kafka import ( + "fmt" + "regexp" + "strings" "time" "go.opentelemetry.io/otel" @@ -29,6 +32,11 @@ type WriterConfig struct { AllowAutoTopicCreation bool } +func (cfg WriterConfig) JSON() string { + return fmt.Sprintf(`{"Brokers": ["%s"], "Balancer": %q, "Compression": %q}`, + strings.Join(cfg.Brokers, "\", \""), GetBalancerString(cfg.Balancer), cfg.Compression.String()) +} + type TransportConfig struct { MetadataTopics []string DialTimeout time.Duration @@ -46,6 +54,25 @@ type ProducerConfig struct { DistributedTracingEnabled bool } +func (cfg *ProducerConfig) String() string { + re := regexp.MustCompile(`"(\w+)"\s*:`) + modifiedString := re.ReplaceAllString(cfg.JSON(), `$1:`) + modifiedString = modifiedString[1 : len(modifiedString)-1] + return modifiedString +} + +func (cfg *ProducerConfig) JSON() string { + if cfg == nil { + return "{}" + } + return fmt.Sprintf(`{"Writer": %s, "ClientID": %q, "DistributedTracingEnabled": %t, "SASL": %s, "TLS": %s}`, + cfg.Writer.JSON(), cfg.ClientID, cfg.DistributedTracingEnabled, cfg.SASL.JSON(), cfg.TLS.JSON()) +} + +func (cfg *ProducerConfig) JSONPretty() string { + return jsonPretty(cfg.JSON()) +} + func (cfg *ProducerConfig) newKafkaTransport() (*kafka.Transport, error) { transport := &Transport{ Transport: &kafka.Transport{ diff --git a/producer_config_test.go b/producer_config_test.go index 95bf956..0cf6388 100644 --- a/producer_config_test.go +++ b/producer_config_test.go @@ -1,6 +1,10 @@ package kafka -import "testing" +import ( + "testing" + + "github.com/segmentio/kafka-go" +) func TestProducerConfig_setDefaults(t *testing.T) { // Given @@ -17,3 +21,124 @@ func TestProducerConfig_setDefaults(t *testing.T) { t.Fatal("Propagator cannot be null") } } + +func TestProducerConfig_Json(t *testing.T) { + t.Run("Should_Convert_Nil_Config_To_Json", func(t *testing.T) { + // Given + var config *ProducerConfig + expected := "{}" + // When + result := config.JSON() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) + t.Run("Should_Convert_To_Json", func(t *testing.T) { + // Given + expected := "{\"Writer\": {\"Brokers\": [\"broker-1.test.com\", \"broker-2.test.com\"], " + + "\"Balancer\": \"Hash\", \"Compression\": \"gzip\"}, \"ClientID\": \"test-consumer-client-id\", " + + "\"DistributedTracingEnabled\": false, " + + "\"SASL\": {\"Mechanism\": \"scram\", \"Username\": \"user\", \"Password\": \"pass\"}, " + + "\"TLS\": {\"RootCAPath\": \"resources/ca\", \"IntermediateCAPath\": \"resources/intCa\"}}" + // When + result := getProducerConfigExample().JSON() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) + t.Run("Should_Convert_To_Json_Without_Inner_Object", func(t *testing.T) { + // Given + expected := "{\"Writer\": {\"Brokers\": [\"\"], \"Balancer\": \"Unknown\", \"Compression\": \"uncompressed\"}, " + + "\"ClientID\": \"test-consumer-client-id\", \"DistributedTracingEnabled\": false, \"SASL\": {}, \"TLS\": {}}" + // When + result := getProducerConfigWithoutInnerObjectExample().JSON() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) +} + +func TestProducerConfig_JsonPretty(t *testing.T) { + t.Run("Should_Convert_To_Pretty_Json", func(t *testing.T) { + // Given + expected := "{\n\t\"Writer\": {\n\t\t\"Brokers\": [\n\t\t\t\"broker-1.test.com\",\n\t\t\t\"broker-2.test.com\"\n\t\t],\n\t\t\"" + + "Balancer\": \"Hash\",\n\t\t\"Compression\": \"gzip\"\n\t},\n\t\"ClientID\": \"test-consumer-client-id\",\n\t\"" + + "DistributedTracingEnabled\": false,\n\t\"" + + "SASL\": {\n\t\t\"Mechanism\": \"scram\",\n\t\t\"Username\": \"user\",\n\t\t\"Password\": \"pass\"\n\t},\n\t\"" + + "TLS\": {\n\t\t\"RootCAPath\": \"resources/ca\",\n\t\t\"IntermediateCAPath\": \"resources/intCa\"\n\t}\n}" + // When + result := getProducerConfigExample().JSONPretty() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) + t.Run("Should_Convert_To_Pretty_Json_Without_Inner_Object", func(t *testing.T) { + // Given + expected := "{\n\t\"Writer\": {\n\t\t\"Brokers\": [\n\t\t\t\"\"\n\t\t],\n\t\t\"Balancer\": \"Unknown\",\n\t\t\"" + + "Compression\": \"uncompressed\"\n\t},\n\t\"ClientID\": \"test-consumer-client-id\",\n\t\"" + + "DistributedTracingEnabled\": false,\n\t\"SASL\": {},\n\t\"TLS\": {}\n}" + // When + result := getProducerConfigWithoutInnerObjectExample().JSONPretty() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) +} + +func TestProducerConfig_String(t *testing.T) { + t.Run("Should_Convert_To_String", func(t *testing.T) { + // Given + expected := "Writer: {Brokers: [\"broker-1.test.com\", \"broker-2.test.com\"], " + + "Balancer: \"Hash\", Compression: \"gzip\"}, ClientID: \"test-consumer-client-id\", " + + "DistributedTracingEnabled: false, SASL: {Mechanism: \"scram\", Username: \"user\", Password: \"pass\"}, " + + "TLS: {RootCAPath: \"resources/ca\", IntermediateCAPath: \"resources/intCa\"}" + // When + result := getProducerConfigExample().String() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) + t.Run("Should_Convert_To_String_Without_Inner_Object", func(t *testing.T) { + // Given + expected := "Writer: {Brokers: [\"\"], Balancer: \"Unknown\", Compression: \"uncompressed\"}, " + + "ClientID: \"test-consumer-client-id\", DistributedTracingEnabled: false, SASL: {}, TLS: {}" + // When + result := getProducerConfigWithoutInnerObjectExample().String() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) +} + +func getProducerConfigExample() *ProducerConfig { + return &ProducerConfig{ + ClientID: "test-consumer-client-id", + Writer: WriterConfig{ + Balancer: GetBalancerHash(), + Brokers: []string{"broker-1.test.com", "broker-2.test.com"}, + Compression: kafka.Gzip, + }, + TLS: &TLSConfig{ + RootCAPath: "resources/ca", + IntermediateCAPath: "resources/intCa", + }, + SASL: &SASLConfig{ + Type: "scram", + Username: "user", + Password: "pass", + }, + } +} + +func getProducerConfigWithoutInnerObjectExample() *ProducerConfig { + return &ProducerConfig{ + ClientID: "test-consumer-client-id", + } +} diff --git a/tls.go b/tls.go index d6ebf6b..8ffaa8e 100644 --- a/tls.go +++ b/tls.go @@ -33,3 +33,10 @@ func (c *TLSConfig) TLSConfig() (*tls.Config, error) { func (c *TLSConfig) IsEmpty() bool { return c == nil || c.RootCAPath == "" && c.IntermediateCAPath == "" } + +func (c *TLSConfig) JSON() string { + if c == nil { + return "{}" + } + return fmt.Sprintf(`{"RootCAPath": %q, "IntermediateCAPath": %q}`, c.RootCAPath, c.IntermediateCAPath) +} diff --git a/tls_test.go b/tls_test.go index e3351e0..e49db51 100644 --- a/tls_test.go +++ b/tls_test.go @@ -65,3 +65,33 @@ func TestTLSConfig_IsEmpty(t *testing.T) { }) } } + +func TestTLSConfig_Json(t *testing.T) { + t.Run("Should_Convert_Nil_Config_To_Json", func(t *testing.T) { + // Given + var cfg *TLSConfig + + expected := "{}" + // When + result := cfg.JSON() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) + t.Run("Should_Convert_To_Json", func(t *testing.T) { + // Given + cfg := &TLSConfig{ + RootCAPath: "resources/ca", + IntermediateCAPath: "resources/intCa", + } + + expected := "{\"RootCAPath\": \"resources/ca\", \"IntermediateCAPath\": \"resources/intCa\"}" + // When + result := cfg.JSON() + // Then + if result != expected { + t.Fatal("result must be equal to expected") + } + }) +}