Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kafka): allow protobuf schema #1369

Merged
merged 1 commit into from
Oct 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ nav_order: 1

- Allow to modify `pg_user` replication settings
- Fix `aiven_project_user` 409 error handling
- Allow usage of `protobuf` schema in Kafka

## [4.9.0] - 2023-09-18

Expand Down
4 changes: 2 additions & 2 deletions docs/data-sources/kafka_schema.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,6 @@ data "aiven_kafka_schema_configuration" "config" {

- `compatibility_level` (String) Kafka Schemas compatibility level. The possible values are `BACKWARD`, `BACKWARD_TRANSITIVE`, `FORWARD`, `FORWARD_TRANSITIVE`, `FULL`, `FULL_TRANSITIVE` and `NONE`.
- `id` (String) The ID of this resource.
- `schema` (String) Kafka Schema configuration should be a valid Avro Schema JSON format.
- `schema_type` (String) Kafka Schema type JSON or AVRO
- `schema` (String) Kafka Schema configuration. Should be a valid Avro, JSON, or Protobuf schema, depending on the schema type.
- `schema_type` (String) Kafka Schema configuration type. Defaults to AVRO. Possible values are AVRO, JSON, and PROTOBUF.
- `version` (Number) Kafka Schema configuration version.
4 changes: 2 additions & 2 deletions docs/data-sources/kafka_schema_configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ resource "aiven_kafka_schema_configuration" "config" {

- `compatibility_level` (String) Kafka Schemas compatibility level. The possible values are `BACKWARD`, `BACKWARD_TRANSITIVE`, `FORWARD`, `FORWARD_TRANSITIVE`, `FULL`, `FULL_TRANSITIVE` and `NONE`.
- `id` (String) The ID of this resource.
- `schema` (String) Kafka Schema configuration should be a valid Avro Schema JSON format.
- `schema_type` (String) Kafka Schema type JSON or AVRO
- `schema` (String) Kafka Schema configuration. Should be a valid Avro, JSON, or Protobuf schema, depending on the schema type.
- `schema_type` (String) Kafka Schema configuration type. Defaults to AVRO. Possible values are AVRO, JSON, and PROTOBUF.
- `subject_name` (String) The Kafka Schema Subject name. This property cannot be changed, doing so forces recreation of the resource.
- `version` (Number) Kafka Schema configuration version.
4 changes: 2 additions & 2 deletions docs/resources/kafka_schema.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ resource "aiven_kafka_schema" "kafka-schema1" {
### Required

- `project` (String) Identifies the project this resource belongs to. To set up proper dependencies please refer to this variable as a reference. This property cannot be changed, doing so forces recreation of the resource.
- `schema` (String) Kafka Schema configuration should be a valid Avro Schema JSON format.
- `schema` (String) Kafka Schema configuration. Should be a valid Avro, JSON, or Protobuf schema, depending on the schema type.
- `service_name` (String) Specifies the name of the service that this resource belongs to. To set up proper dependencies please refer to this variable as a reference. This property cannot be changed, doing so forces recreation of the resource.
- `subject_name` (String) The Kafka Schema Subject name. This property cannot be changed, doing so forces recreation of the resource.

### Optional

- `compatibility_level` (String) Kafka Schemas compatibility level. The possible values are `BACKWARD`, `BACKWARD_TRANSITIVE`, `FORWARD`, `FORWARD_TRANSITIVE`, `FULL`, `FULL_TRANSITIVE` and `NONE`.
- `schema_type` (String) Kafka Schema type JSON or AVRO
- `schema_type` (String) Kafka Schema configuration type. Defaults to AVRO. Possible values are AVRO, JSON, and PROTOBUF.
- `timeouts` (Block, Optional) (see [below for nested schema](#nestedblock--timeouts))

### Read-Only
Expand Down
51 changes: 38 additions & 13 deletions internal/sdkprovider/service/kafkaschema/kafka_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"reflect"
"regexp"

"github.com/aiven/aiven-go-client/v2"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
Expand All @@ -16,6 +17,9 @@ import (
"github.com/aiven/terraform-provider-aiven/internal/schemautil/userconfig"
)

// newlineRegExp is a regular expression that matches a newline.
var newlineRegExp = regexp.MustCompile(`\r?\n`)

var aivenKafkaSchemaSchema = map[string]*schema.Schema{
"project": schemautil.CommonSchemaProjectReference,
"service_name": schemautil.CommonSchemaServiceNameReference,
Expand All @@ -28,18 +32,19 @@ var aivenKafkaSchemaSchema = map[string]*schema.Schema{
"schema": {
Type: schema.TypeString,
Required: true,
ValidateFunc: validation.StringIsJSON,
StateFunc: normalizeJSONString,
DiffSuppressFunc: diffSuppressJSONObject,
Description: "Kafka Schema configuration should be a valid Avro Schema JSON format.",
StateFunc: normalizeJSONOrProtobufString,
DiffSuppressFunc: diffSuppressJSONObjectOrProtobufString,
Description: "Kafka Schema configuration. Should be a valid Avro, JSON, or Protobuf schema," +
" depending on the schema type.",
},
"schema_type": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
Description: "Kafka Schema type JSON or AVRO",
Type: schema.TypeString,
Optional: true,
ForceNew: true,
Description: "Kafka Schema configuration type. Defaults to AVRO. Possible values are AVRO, JSON, " +
"and PROTOBUF.",
Default: "AVRO",
ValidateFunc: validation.StringInSlice([]string{"AVRO", "JSON"}, false),
ValidateFunc: validation.StringInSlice([]string{"AVRO", "JSON", "PROTOBUF"}, false),
DiffSuppressFunc: func(k, oldValue, newValue string, d *schema.ResourceData) bool {
// This field can't be retrieved once resource is created.
// That produces a diff on plan on resource import.
Expand Down Expand Up @@ -79,11 +84,31 @@ func diffSuppressJSONObject(_, old, new string, _ *schema.ResourceData) bool {
return reflect.DeepEqual(objNew, objOld)
}

// normalizeJSONString returns normalized JSON string
func normalizeJSONString(v interface{}) string {
jsonString, _ := structure.NormalizeJsonString(v)
// diffSuppressJSONObjectOrProtobufString checks logical equivalences in JSON or Protobuf Kafka Schema values.
func diffSuppressJSONObjectOrProtobufString(k, old, new string, d *schema.ResourceData) bool {
if !diffSuppressJSONObject(k, old, new, d) {
return normalizeProtobufString(old) == normalizeProtobufString(new)
}

return false
}

// normalizeProtobufString returns normalized Protobuf string.
func normalizeProtobufString(i any) string {
Serpentiel marked this conversation as resolved.
Show resolved Hide resolved
v := i.(string)

return newlineRegExp.ReplaceAllString(v, "")
}

// normalizeJSONOrProtobufString returns normalized JSON or Protobuf string.
func normalizeJSONOrProtobufString(i any) string {
v := i.(string)

if n, err := structure.NormalizeJsonString(v); err == nil {
return n
}

return jsonString
return normalizeProtobufString(v)
}

func ResourceKafkaSchema() *schema.Resource {
Expand Down
61 changes: 52 additions & 9 deletions internal/sdkprovider/service/kafkaschema/kafka_schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,11 @@ resource "aiven_kafka_schema" "schema" {
`, project, serviceName, subjectName)
}

func TestAccAivenKafkaSchema_json_basic(t *testing.T) {
// TestAccAivenKafkaSchema_json_protobuf_basic is a test for JSON and Protobuf schema Kafka Schema resource.
func TestAccAivenKafkaSchema_json_protobuf_basic(t *testing.T) {
resourceName := "aiven_kafka_schema.foo"
resourceName2 := "aiven_kafka_schema.bar"

rName := acctest.RandStringFromCharSet(10, acctest.CharSetAlphaNum)

resource.ParallelTest(t, resource.TestCase{
Expand All @@ -102,19 +105,34 @@ func TestAccAivenKafkaSchema_json_basic(t *testing.T) {
CheckDestroy: testAccCheckAivenKafkaSchemaResourceDestroy,
Steps: []resource.TestStep{
{
Config: testAccKafkaSchemaJSONResource(rName),
Config: testAccKafkaSchemaJSONProtobufResource(rName),
Check: resource.ComposeTestCheckFunc(
testAccCheckAivenKafkaSchemaAttributes("data.aiven_kafka_schema.schema"),
testAccCheckAivenKafkaSchemaAttributes("data.aiven_kafka_schema.schema2"),
resource.TestCheckResourceAttr(resourceName, "project", os.Getenv("AIVEN_PROJECT_NAME")),
resource.TestCheckResourceAttr(resourceName, "service_name", fmt.Sprintf("test-acc-sr-%s", rName)),
resource.TestCheckResourceAttr(resourceName, "subject_name", fmt.Sprintf("kafka-schema-%s", rName)),
resource.TestCheckResourceAttr(
resourceName, "service_name", fmt.Sprintf("test-acc-sr-%s", rName),
),
resource.TestCheckResourceAttr(
resourceName, "subject_name", fmt.Sprintf("kafka-schema-%s-foo", rName),
),
resource.TestCheckResourceAttr(resourceName, "version", "1"),
resource.TestCheckResourceAttr(resourceName, "schema_type", "JSON"),
resource.TestCheckResourceAttr(resourceName2, "project", os.Getenv("AIVEN_PROJECT_NAME")),
resource.TestCheckResourceAttr(
resourceName2, "service_name", fmt.Sprintf("test-acc-sr-%s", rName),
),
resource.TestCheckResourceAttr(
resourceName2, "subject_name", fmt.Sprintf("kafka-schema-%s-bar", rName),
),
resource.TestCheckResourceAttr(resourceName2, "version", "1"),
resource.TestCheckResourceAttr(resourceName2, "schema_type", "PROTOBUF"),
),
},
},
})
}

func TestAccAivenKafkaSchema_basic(t *testing.T) {
resourceName := "aiven_kafka_schema.foo"
rName := acctest.RandStringFromCharSet(10, acctest.CharSetAlphaNum)
Expand Down Expand Up @@ -208,17 +226,18 @@ func testAccCheckAivenKafkaSchemaResourceDestroy(s *terraform.State) error {
return nil
}

func testAccKafkaSchemaJSONResource(name string) string {
// testAccKafkaSchemaJSONProtobufResource is a test resource for JSON and Protobuf schema Kafka Schema resource.
func testAccKafkaSchemaJSONProtobufResource(name string) string {
return fmt.Sprintf(`
data "aiven_project" "foo" {
project = "%s"
project = "%[1]s"
}

resource "aiven_kafka" "bar" {
project = data.aiven_project.foo.project
cloud_name = "google-europe-west1"
plan = "startup-2"
service_name = "test-acc-sr-%s"
service_name = "test-acc-sr-%[2]s"
maintenance_window_dow = "monday"
maintenance_window_time = "10:00:00"

Expand All @@ -241,7 +260,7 @@ resource "aiven_kafka_schema_configuration" "foo" {
resource "aiven_kafka_schema" "foo" {
project = aiven_kafka_schema_configuration.foo.project
service_name = aiven_kafka_schema_configuration.foo.service_name
subject_name = "kafka-schema-%s"
subject_name = "kafka-schema-%[2]s-foo"
schema_type = "JSON"

schema = <<EOT
Expand All @@ -266,7 +285,31 @@ data "aiven_kafka_schema" "schema" {
subject_name = aiven_kafka_schema.foo.subject_name

depends_on = [aiven_kafka_schema.foo]
}`, os.Getenv("AIVEN_PROJECT_NAME"), name, name)
}

resource "aiven_kafka_schema" "bar" {
project = aiven_kafka_schema_configuration.foo.project
service_name = aiven_kafka_schema_configuration.foo.service_name
subject_name = "kafka-schema-%[2]s-bar"
schema_type = "PROTOBUF"

schema = <<EOT
syntax = "proto3";

message Example {
int32 test = 5;
}

EOT
}

data "aiven_kafka_schema" "schema2" {
project = aiven_kafka_schema.bar.project
service_name = aiven_kafka_schema.bar.service_name
subject_name = aiven_kafka_schema.bar.subject_name

depends_on = [aiven_kafka_schema.bar]
}`, os.Getenv("AIVEN_PROJECT_NAME"), name)
}

func testAccKafkaSchemaResource(name string) string {
Expand Down