From 143aeabbd9d7de951da0b1a7566082065dc3a713 Mon Sep 17 00:00:00 2001 From: Mihai Todor Date: Wed, 30 Oct 2024 16:42:05 +0000 Subject: [PATCH] Add `CreateSchemaWithIDAndVersion()` to the sr package Signed-off-by: Mihai Todor --- pkg/sr/api.go | 36 ++++++++++++++++++++++++++++-------- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/pkg/sr/api.go b/pkg/sr/api.go index c38ce26b..8bb04339 100644 --- a/pkg/sr/api.go +++ b/pkg/sr/api.go @@ -93,13 +93,13 @@ type ( // a Kafka topic, and whether this is for a key or value. For example, // "foo-key" would be the subject for the foo topic for serializing the // key field of a record. - Subject string `json:"subject"` + Subject string `json:"subject,omitempty"` // Version is the version of this subject. - Version int `json:"version"` + Version int `json:"version,omitempty"` // ID is the globally unique ID of the schema. - ID int `json:"id"` + ID int `json:"id,omitempty"` Schema } @@ -352,18 +352,38 @@ func (cl *Client) Schemas(ctx context.Context, subject string) ([]SubjectSchema, // // This supports param [Normalize]. func (cl *Client) CreateSchema(ctx context.Context, subject string, s Schema) (SubjectSchema, error) { + return cl.CreateSchemaWithIDAndVersion(ctx, subject, s, -1, -1) +} + +// CreateSchemaWithIDAndVersion attempts to create a schema with a fixed ID and +// version ID in the given subject. If the id is set to -1 or 0, this method is +// equivalent to CreateSchema(). If the versionID is set to -1 or 0, it will be +// omitted when creating the schema. +// +// This supports param [Normalize]. +func (cl *Client) CreateSchemaWithIDAndVersion(ctx context.Context, subject string, s Schema, id, versionID int) (SubjectSchema, error) { // POST /subjects/{subject}/versions => returns ID // Newer SR returns the full SubjectSchema, but old does not, so we // re-request to find the full information. path := pathSubjectWithVersion(subject) - var id struct { + var into struct { ID int `json:"id"` } - if err := cl.post(ctx, path, s, &id); err != nil { - return SubjectSchema{}, err + if id == -1 { + if err := cl.post(ctx, path, s, &into); err != nil { + return SubjectSchema{}, err + } + } else { + ss := SubjectSchema{Schema: s, ID: id} + if versionID != -1 { + ss.Version = versionID + } + if err := cl.post(ctx, path, ss, &into); err != nil { + return SubjectSchema{}, err + } } - usages, err := cl.SchemaUsagesByID(ctx, id.ID) + usages, err := cl.SchemaUsagesByID(ctx, into.ID) if err != nil { return SubjectSchema{}, err } @@ -372,7 +392,7 @@ func (cl *Client) CreateSchema(ctx context.Context, subject string, s Schema) (S return usage, nil } } - return SubjectSchema{}, fmt.Errorf("created schema under id %d, but unable to find SubjectSchema", id.ID) + return SubjectSchema{}, fmt.Errorf("created schema under id %d, but unable to find SubjectSchema", into.ID) } // LookupSchema checks to see if a schema is already registered and if so,