Skip to content

Commit

Permalink
feat(kafka): allow protobuf schema
Browse files Browse the repository at this point in the history
  • Loading branch information
Serpentiel committed Oct 2, 2023
1 parent 913aed9 commit 450a5e7
Show file tree
Hide file tree
Showing 6 changed files with 121 additions and 24 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ nav_order: 1
## [MAJOR.MINOR.PATCH] - YYYY-MM-DD

- Allow to modify `pg_user` replication settings
- Allow usage of `protobuf` schema in Kafka

## [4.9.0] - 2023-09-18

Expand Down
4 changes: 2 additions & 2 deletions docs/data-sources/kafka_schema.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,6 @@ data "aiven_kafka_schema_configuration" "config" {

- `compatibility_level` (String) Kafka Schemas compatibility level. The possible values are `BACKWARD`, `BACKWARD_TRANSITIVE`, `FORWARD`, `FORWARD_TRANSITIVE`, `FULL`, `FULL_TRANSITIVE` and `NONE`.
- `id` (String) The ID of this resource.
- `schema` (String) Kafka Schema configuration should be a valid Avro Schema JSON format.
- `schema_type` (String) Kafka Schema type JSON or AVRO
- `schema` (String) Kafka Schema configuration. Should be a valid Avro, JSON, or Protobuf schema, depending on the schema type.
- `schema_type` (String) Kafka Schema configuration type. Defaults to AVRO. Possible values are AVRO, JSON, and PROTOBUF.
- `version` (Number) Kafka Schema configuration version.
4 changes: 2 additions & 2 deletions docs/data-sources/kafka_schema_configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ resource "aiven_kafka_schema_configuration" "config" {

- `compatibility_level` (String) Kafka Schemas compatibility level. The possible values are `BACKWARD`, `BACKWARD_TRANSITIVE`, `FORWARD`, `FORWARD_TRANSITIVE`, `FULL`, `FULL_TRANSITIVE` and `NONE`.
- `id` (String) The ID of this resource.
- `schema` (String) Kafka Schema configuration should be a valid Avro Schema JSON format.
- `schema_type` (String) Kafka Schema type JSON or AVRO
- `schema` (String) Kafka Schema configuration. Should be a valid Avro, JSON, or Protobuf schema, depending on the schema type.
- `schema_type` (String) Kafka Schema configuration type. Defaults to AVRO. Possible values are AVRO, JSON, and PROTOBUF.
- `subject_name` (String) The Kafka Schema Subject name. This property cannot be changed, doing so forces recreation of the resource.
- `version` (Number) Kafka Schema configuration version.
4 changes: 2 additions & 2 deletions docs/resources/kafka_schema.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,14 @@ resource "aiven_kafka_schema" "kafka-schema1" {
### Required

- `project` (String) Identifies the project this resource belongs to. To set up proper dependencies please refer to this variable as a reference. This property cannot be changed, doing so forces recreation of the resource.
- `schema` (String) Kafka Schema configuration should be a valid Avro Schema JSON format.
- `schema` (String) Kafka Schema configuration. Should be a valid Avro, JSON, or Protobuf schema, depending on the schema type.
- `service_name` (String) Specifies the name of the service that this resource belongs to. To set up proper dependencies please refer to this variable as a reference. This property cannot be changed, doing so forces recreation of the resource.
- `subject_name` (String) The Kafka Schema Subject name. This property cannot be changed, doing so forces recreation of the resource.

### Optional

- `compatibility_level` (String) Kafka Schemas compatibility level. The possible values are `BACKWARD`, `BACKWARD_TRANSITIVE`, `FORWARD`, `FORWARD_TRANSITIVE`, `FULL`, `FULL_TRANSITIVE` and `NONE`.
- `schema_type` (String) Kafka Schema type JSON or AVRO
- `schema_type` (String) Kafka Schema configuration type. Defaults to AVRO. Possible values are AVRO, JSON, and PROTOBUF.
- `timeouts` (Block, Optional) (see [below for nested schema](#nestedblock--timeouts))

### Read-Only
Expand Down
71 changes: 62 additions & 9 deletions internal/sdkprovider/service/kafkaschema/kafka_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"reflect"
"regexp"

"github.com/aiven/aiven-go-client/v2"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
Expand All @@ -16,6 +17,29 @@ import (
"github.com/aiven/terraform-provider-aiven/internal/schemautil/userconfig"
)

// protobufHeaderRegExp is a regular expression that matches a valid Protobuf header.
var protobufHeaderRegExp = regexp.MustCompile("syntax(.*|)=(.*|)proto3(.*|);")

// newlineRegExp is a regular expression that matches a newline.
var newlineRegExp = regexp.MustCompile(`\r?\n`)

// validateStringHasProtobufHeader is a schema.SchemaValidateFunc that ensures a string contains a valid Protobuf
// header.
func validateStringHasProtobufHeader(i any, k string) (warnings []string, errors []error) {
v, ok := i.(string)
if !ok {
errors = append(errors, fmt.Errorf("expected type of %s to be string", k))

return warnings, errors
}

if ok := protobufHeaderRegExp.Match([]byte(v)); !ok {
errors = append(errors, fmt.Errorf("%q contains an invalid Protobuf", k))
}

return warnings, errors
}

var aivenKafkaSchemaSchema = map[string]*schema.Schema{
"project": schemautil.CommonSchemaProjectReference,
"service_name": schemautil.CommonSchemaServiceNameReference,
Expand All @@ -28,18 +52,20 @@ var aivenKafkaSchemaSchema = map[string]*schema.Schema{
"schema": {
Type: schema.TypeString,
Required: true,
ValidateFunc: validation.StringIsJSON,
StateFunc: normalizeJSONString,
DiffSuppressFunc: diffSuppressJSONObject,
Description: "Kafka Schema configuration should be a valid Avro Schema JSON format.",
ValidateFunc: validation.Any(validation.StringIsJSON, validateStringHasProtobufHeader),
StateFunc: normalizeJSONOrProtobufString,
DiffSuppressFunc: diffSuppressJSONObjectOrProtobufString,
Description: "Kafka Schema configuration. Should be a valid Avro, JSON, or Protobuf schema," +
" depending on the schema type.",
},
"schema_type": {
Type: schema.TypeString,
Optional: true,
ForceNew: true,
Description: "Kafka Schema type JSON or AVRO",
Type: schema.TypeString,
Optional: true,
ForceNew: true,
Description: "Kafka Schema configuration type. Defaults to AVRO. Possible values are AVRO, JSON, " +
"and PROTOBUF.",
Default: "AVRO",
ValidateFunc: validation.StringInSlice([]string{"AVRO", "JSON"}, false),
ValidateFunc: validation.StringInSlice([]string{"AVRO", "JSON", "PROTOBUF"}, false),
DiffSuppressFunc: func(k, oldValue, newValue string, d *schema.ResourceData) bool {
// This field can't be retrieved once resource is created.
// That produces a diff on plan on resource import.
Expand Down Expand Up @@ -79,13 +105,40 @@ func diffSuppressJSONObject(_, old, new string, _ *schema.ResourceData) bool {
return reflect.DeepEqual(objNew, objOld)
}

// diffSuppressJSONObjectOrProtobufString checks logical equivalences in JSON or Protobuf Kafka Schema values.
func diffSuppressJSONObjectOrProtobufString(k, old, new string, d *schema.ResourceData) bool {
if ok := protobufHeaderRegExp.Match([]byte(old)); ok {
return normalizeProtobufString(old) == normalizeProtobufString(new)
}

return diffSuppressJSONObject(k, old, new, d)
}

// normalizeJSONString returns normalized JSON string
func normalizeJSONString(v interface{}) string {
jsonString, _ := structure.NormalizeJsonString(v)

return jsonString
}

// normalizeProtobufString returns normalized Protobuf string.
func normalizeProtobufString(i any) string {
v := i.(string)

return newlineRegExp.ReplaceAllString(v, "")
}

// normalizeJSONOrProtobufString returns normalized JSON or Protobuf string.
func normalizeJSONOrProtobufString(i any) string {
v := i.(string)

if ok := protobufHeaderRegExp.Match([]byte(v)); ok {
return normalizeProtobufString(v)
}

return normalizeJSONString(v)
}

func ResourceKafkaSchema() *schema.Resource {
return &schema.Resource{
Description: "The Kafka Schema resource allows the creation and management of Aiven Kafka Schemas.",
Expand Down
61 changes: 52 additions & 9 deletions internal/sdkprovider/service/kafkaschema/kafka_schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,11 @@ resource "aiven_kafka_schema" "schema" {
`, project, serviceName, subjectName)
}

func TestAccAivenKafkaSchema_json_basic(t *testing.T) {
// TestAccAivenKafkaSchema_json_protobuf_basic is a test for JSON and Protobuf schema Kafka Schema resource.
func TestAccAivenKafkaSchema_json_protobuf_basic(t *testing.T) {
resourceName := "aiven_kafka_schema.foo"
resourceName2 := "aiven_kafka_schema.bar"

rName := acctest.RandStringFromCharSet(10, acctest.CharSetAlphaNum)

resource.ParallelTest(t, resource.TestCase{
Expand All @@ -102,19 +105,34 @@ func TestAccAivenKafkaSchema_json_basic(t *testing.T) {
CheckDestroy: testAccCheckAivenKafkaSchemaResourceDestroy,
Steps: []resource.TestStep{
{
Config: testAccKafkaSchemaJSONResource(rName),
Config: testAccKafkaSchemaJSONProtobufResource(rName),
Check: resource.ComposeTestCheckFunc(
testAccCheckAivenKafkaSchemaAttributes("data.aiven_kafka_schema.schema"),
testAccCheckAivenKafkaSchemaAttributes("data.aiven_kafka_schema.schema2"),
resource.TestCheckResourceAttr(resourceName, "project", os.Getenv("AIVEN_PROJECT_NAME")),
resource.TestCheckResourceAttr(resourceName, "service_name", fmt.Sprintf("test-acc-sr-%s", rName)),
resource.TestCheckResourceAttr(resourceName, "subject_name", fmt.Sprintf("kafka-schema-%s", rName)),
resource.TestCheckResourceAttr(
resourceName, "service_name", fmt.Sprintf("test-acc-sr-%s", rName),
),
resource.TestCheckResourceAttr(
resourceName, "subject_name", fmt.Sprintf("kafka-schema-%s-foo", rName),
),
resource.TestCheckResourceAttr(resourceName, "version", "1"),
resource.TestCheckResourceAttr(resourceName, "schema_type", "JSON"),
resource.TestCheckResourceAttr(resourceName2, "project", os.Getenv("AIVEN_PROJECT_NAME")),
resource.TestCheckResourceAttr(
resourceName2, "service_name", fmt.Sprintf("test-acc-sr-%s", rName),
),
resource.TestCheckResourceAttr(
resourceName2, "subject_name", fmt.Sprintf("kafka-schema-%s-bar", rName),
),
resource.TestCheckResourceAttr(resourceName2, "version", "1"),
resource.TestCheckResourceAttr(resourceName2, "schema_type", "PROTOBUF"),
),
},
},
})
}

func TestAccAivenKafkaSchema_basic(t *testing.T) {
resourceName := "aiven_kafka_schema.foo"
rName := acctest.RandStringFromCharSet(10, acctest.CharSetAlphaNum)
Expand Down Expand Up @@ -208,17 +226,18 @@ func testAccCheckAivenKafkaSchemaResourceDestroy(s *terraform.State) error {
return nil
}

func testAccKafkaSchemaJSONResource(name string) string {
// testAccKafkaSchemaJSONProtobufResource is a test resource for JSON and Protobuf schema Kafka Schema resource.
func testAccKafkaSchemaJSONProtobufResource(name string) string {
return fmt.Sprintf(`
data "aiven_project" "foo" {
project = "%s"
project = "%[1]s"
}
resource "aiven_kafka" "bar" {
project = data.aiven_project.foo.project
cloud_name = "google-europe-west1"
plan = "startup-2"
service_name = "test-acc-sr-%s"
service_name = "test-acc-sr-%[2]s"
maintenance_window_dow = "monday"
maintenance_window_time = "10:00:00"
Expand All @@ -241,7 +260,7 @@ resource "aiven_kafka_schema_configuration" "foo" {
resource "aiven_kafka_schema" "foo" {
project = aiven_kafka_schema_configuration.foo.project
service_name = aiven_kafka_schema_configuration.foo.service_name
subject_name = "kafka-schema-%s"
subject_name = "kafka-schema-%[2]s-foo"
schema_type = "JSON"
schema = <<EOT
Expand All @@ -266,7 +285,31 @@ data "aiven_kafka_schema" "schema" {
subject_name = aiven_kafka_schema.foo.subject_name
depends_on = [aiven_kafka_schema.foo]
}`, os.Getenv("AIVEN_PROJECT_NAME"), name, name)
}
resource "aiven_kafka_schema" "bar" {
project = aiven_kafka_schema_configuration.foo.project
service_name = aiven_kafka_schema_configuration.foo.service_name
subject_name = "kafka-schema-%[2]s-bar"
schema_type = "PROTOBUF"
schema = <<EOT
syntax = "proto3";
message Example {
int32 test = 5;
}
EOT
}
data "aiven_kafka_schema" "schema2" {
project = aiven_kafka_schema.bar.project
service_name = aiven_kafka_schema.bar.service_name
subject_name = aiven_kafka_schema.bar.subject_name
depends_on = [aiven_kafka_schema.bar]
}`, os.Getenv("AIVEN_PROJECT_NAME"), name)
}

func testAccKafkaSchemaResource(name string) string {
Expand Down

0 comments on commit 450a5e7

Please sign in to comment.