Skip to content

Commit

Permalink
refactor(kafkatopic): add KafkaTopic repository [skip ci]
Browse files Browse the repository at this point in the history
  • Loading branch information
byashimov committed Sep 20, 2023
1 parent dadb7c2 commit b6e684b
Show file tree
Hide file tree
Showing 16 changed files with 769 additions and 708 deletions.
2 changes: 2 additions & 0 deletions examples_tests/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ func (s *BaseTestSuite) TearDownSuite() {
}

// withDefaults adds default options for terraform test
//
//lint:ignore U1000 Ignore unused function. Used in child structs
func (s *BaseTestSuite) withDefaults(opts *terraform.Options) *terraform.Options {
// No need to use lock file for dev build
opts.Lock = false
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.19

require (
github.com/aiven/aiven-go-client v1.36.0
github.com/avast/retry-go v3.0.0+incompatible
github.com/dave/jennifer v1.7.0
github.com/docker/go-units v0.5.0
github.com/ettle/strcase v0.1.1
Expand All @@ -15,9 +16,9 @@ require (
github.com/hashicorp/terraform-plugin-mux v0.11.2
github.com/hashicorp/terraform-plugin-sdk/v2 v2.28.0
github.com/kelseyhightower/envconfig v1.4.0
github.com/samber/lo v1.38.1
github.com/stretchr/testify v1.8.4
golang.org/x/exp v0.0.0-20230809150735-7b3493d9a819
golang.org/x/sync v0.3.0
gopkg.in/yaml.v3 v3.0.1
)

Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ github.com/apparentlymart/go-textseg v1.0.0/go.mod h1:z96Txxhf3xSFMPmb5X/1W05FF/
github.com/apparentlymart/go-textseg/v12 v12.0.0/go.mod h1:S/4uRK2UtaQttw1GenVJEynmyUenKwP++x/+DdGV/Ec=
github.com/apparentlymart/go-textseg/v13 v13.0.0 h1:Y+KvPE1NYz0xl601PVImeQfFyEy6iT90AvPUL1NNfNw=
github.com/apparentlymart/go-textseg/v13 v13.0.0/go.mod h1:ZK2fH7c4NqDTLtiYLvIkEghdlcqw7yxLeM89kiTRPUo=
github.com/avast/retry-go v3.0.0+incompatible h1:4SOWQ7Qs+oroOTQOYnAHqelpCO0biHSxpiH9JdtuBj0=
github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY=
github.com/aws/aws-sdk-go v1.44.122 h1:p6mw01WBaNpbdP2xrisz5tIkcNwzj/HysobNoaAHjgo=
github.com/aws/aws-sdk-go v1.44.122/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
github.com/bgentry/go-netrc v0.0.0-20140422174119-9fd32a8b3d3d h1:xDfNPAt8lFiC1UJrqV3uuy861HCTo708pDMbjHHdCas=
Expand Down Expand Up @@ -496,6 +498,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM=
github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ=
github.com/skeema/knownhosts v1.1.0 h1:Wvr9V0MxhjRbl3f9nMnKnFfiWTJmtECJ9Njkea3ysW0=
Expand Down Expand Up @@ -691,8 +695,6 @@ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
44 changes: 44 additions & 0 deletions internal/sdkprovider/kafkatopicrepository/create.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package kafkatopicrepository

import (
"context"
"time"

"github.com/aiven/aiven-go-client"
"github.com/avast/retry-go"
)

const createRetryDelay = 10 * time.Second

// Create creates topic.
// First checks if topic does not exist for the safety
// Then calls creates topic.
func (rep *repository) Create(ctx context.Context, project, service string, req aiven.CreateKafkaTopicRequest) error {
// aiven.KafkaTopics.Create() function may return 501 on create
// Second call might say that topic already exists, and we have retries in aiven client
// So to be sure, better check it before create
err := rep.exists(ctx, project, service, req.TopicName, true)
if err == nil {
return errAlreadyExists
}

// If this is not errNotFound, then something happened
if err != errNotFound {
return err
}

return retry.Do(func() error {
err := rep.client.Create(project, service, req)
if err == nil || aiven.IsAlreadyExists(err) {
return nil
}

// If some brokers are offline while the request is being executed
// the operation may fail.
_, ok := err.(aiven.Error)
if ok {
return err
}
return retry.Unrecoverable(err)
}, retry.Context(ctx), retry.Delay(createRetryDelay))
}
68 changes: 68 additions & 0 deletions internal/sdkprovider/kafkatopicrepository/create_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package kafkatopicrepository

import (
"context"
"sync"
"sync/atomic"
"testing"

"github.com/aiven/aiven-go-client"
"github.com/stretchr/testify/assert"
)

// TestCreateConflict tests that one goroutine out of 100 creates topic, while others get errAlreadyExists
func TestCreateConflict(t *testing.T) {
client := &fakeTopicClient{}
rep := newRepository(client)
ctx := context.Background()

var conflictErr int32
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
err := rep.Create(ctx, "foo", "bar", aiven.CreateKafkaTopicRequest{TopicName: "baz"})
if err == errAlreadyExists {
atomic.AddInt32(&conflictErr, 1)
}
wg.Done()
}()
}
wg.Wait()
assert.EqualValues(t, 99, conflictErr)
assert.EqualValues(t, 1, client.createCalled)
assert.EqualValues(t, 1, client.v1ListCalled)
assert.EqualValues(t, 0, client.v2ListCalled)
assert.True(t, rep.seenServices["foo/bar"])
assert.True(t, rep.seenTopics["foo/bar/baz"])
}

func TestCreateRecreateMissing(t *testing.T) {
client := &fakeTopicClient{}
rep := newRepository(client)
ctx := context.Background()

// Creates topic
err := rep.Create(ctx, "foo", "bar", aiven.CreateKafkaTopicRequest{TopicName: "baz"})
assert.NoError(t, err)
assert.EqualValues(t, 1, client.createCalled)
assert.EqualValues(t, 1, client.v1ListCalled)
assert.EqualValues(t, 0, client.v2ListCalled)
assert.True(t, rep.seenServices["foo/bar"])
assert.True(t, rep.seenTopics["foo/bar/baz"])

// Forgets the topic, like if it's missing
err = rep.forgetTopic("foo", "bar", "baz")
assert.NoError(t, err)
assert.True(t, rep.seenServices["foo/bar"])
assert.False(t, rep.seenTopics["foo/bar/baz"]) // not cached, missing

// Recreates topic
err = rep.Create(ctx, "foo", "bar", aiven.CreateKafkaTopicRequest{TopicName: "baz"})
assert.NoError(t, err)
assert.EqualValues(t, 2, client.createCalled) // Updated
assert.EqualValues(t, 1, client.v1ListCalled)
assert.EqualValues(t, 0, client.v2ListCalled)
assert.True(t, rep.seenServices["foo/bar"])
assert.True(t, rep.seenTopics["foo/bar/baz"]) // cached again
}
18 changes: 18 additions & 0 deletions internal/sdkprovider/kafkatopicrepository/delete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package kafkatopicrepository

import (
"context"

"github.com/aiven/aiven-go-client"
)

func (rep *repository) Delete(_ context.Context, project, service, topic string) error {
rep.Lock()
rep.seenTopics[newKey(project, service, topic)] = false
rep.Unlock()
err := rep.client.Delete(project, service, topic)
if !(err == nil || aiven.IsNotFound(err)) {
return err
}
return nil
}
163 changes: 163 additions & 0 deletions internal/sdkprovider/kafkatopicrepository/read.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package kafkatopicrepository

import (
"context"
"fmt"

"github.com/aiven/aiven-go-client"
"github.com/avast/retry-go"
"github.com/samber/lo"
)

func (rep *repository) Read(ctx context.Context, project, service, topic string) (*aiven.KafkaTopic, error) {
// We have quick methods to determine that topic does not exist
err := rep.exists(ctx, project, service, topic, false)
if err != nil {
return nil, err
}

// Adds request to the queue
c := make(chan *response, 1)
r := &request{
project: project,
service: service,
topic: topic,
rsp: c,
}
rep.Lock()
rep.queue = append(rep.queue, r)
rep.Unlock()

// Waits response from the channel
// Or exits on context done
select {
case <-ctx.Done():
return nil, ctx.Err()
case rsp := <-c:
close(c)
return rsp.topic, rsp.err
}
}

// exists returns nil if topic exists, or errNotFound if doesn't:
// 1. checks repository.seenTopics for known topics
// 2. calls v1List for the remote state for the given service and marks it in repository.seenServices
// 3. saves topic names to repository.seenTopics, so its result can be reused
// 4. when acquire true, then saves topic to repository.seenTopics (for creating)
// todo: use context with the new client
func (rep *repository) exists(_ context.Context, project, service, topic string, acquire bool) error {
rep.Lock()
defer rep.Unlock()
// Checks repository.seenTopics.
// If it has been just created, it is not available in v1List.
// So calling it first doesn't make any sense
serviceKey := newKey(project, service)
topicKey := newKey(serviceKey, topic)
if rep.seenTopics[topicKey] {
return nil
}

// Goes for v1List
if !rep.seenServices[serviceKey] {
list, err := rep.client.List(project, service)
if err != nil {
return err
}

// Marks seen all the topics
for _, t := range list {
rep.seenTopics[newKey(serviceKey, t.TopicName)] = true
}

// Service is seen too. It never goes here again
rep.seenServices[serviceKey] = true
}

// Checks updated list
if rep.seenTopics[topicKey] {
return nil
}

// Create functions run in parallel need to lock the name before create
// Otherwise they may run into conflict
if acquire {
rep.seenTopics[topicKey] = true
}

// v1List doesn't contain the topic
return errNotFound
}

// fetch fetches requested topics configuration
// 1. groups topics by service
// 2. requests topics (in chunks)
// Warning: if we call V2List with at least one "not found" topic, it will return 404 for all topics
// Should be certain that all topics in queue do exist. Call repository.exists first to do so
func (rep *repository) fetch(queue map[string]*request) {
// Groups topics by service
byService := make(map[string][]*request, 0)
for i := range queue {
r := queue[i]
key := newKey(r.project, r.service)
byService[key] = append(byService[key], r)
}

// Fetches topics configuration
for _, reqs := range byService {
topicNames := make([]string, 0, len(reqs))
for _, r := range reqs {
topicNames = append(topicNames, r.topic)
}

// Topics are grouped by service
// We can share this values
project := reqs[0].project
service := reqs[0].service

// Slices topic names by repository.v2ListBatchSize
// because V2List has a limit
for _, chunk := range lo.Chunk(topicNames, rep.v2ListBatchSize) {
// V2List() and Get() do not get info immediately
// Some retries should be applied if result is not equal to requested values
var list []*aiven.KafkaTopic
err := retry.Do(func() error {
rspList, err := rep.client.V2List(project, service, chunk)

// 404 means that there is "not found" on the list
// But repository.exists should have checked these, so now this is a fail
if aiven.IsNotFound(err) {
return retry.Unrecoverable(fmt.Errorf("topic list has changed"))
}

// Something else happened
// We have retries in the client, so this is bad
if err != nil {
return retry.Unrecoverable(err)
}

// This is an old cache, we need to retry it until succeed
if len(rspList) != len(chunk) {
return fmt.Errorf("got %d topics, expected %d. Retrying", len(rspList), len(chunk))
}

list = rspList
return nil
}, retry.Delay(rep.v2ListRetryDelay))

if err != nil {
// Send errors
// Flattens error to a string, because it might go really completed for testing
err = fmt.Errorf("topic read error: %s", err)
for _, r := range reqs {
r.send(nil, err)
}
continue
}

// Sends topics
for _, t := range list {
queue[newKey(project, service, t.TopicName)].send(t, nil)
}
}
}
}
Loading

0 comments on commit b6e684b

Please sign in to comment.