forked from segmentio/kafka-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
createpartitions.go
103 lines (84 loc) · 2.75 KB
/
createpartitions.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
package kafka
import (
"context"
"fmt"
"net"
"time"
"github.com/rbisecke/kafka-go/protocol/createpartitions"
)
// CreatePartitionsRequest represents a request sent to a kafka broker to create
// and update topic partitions.
type CreatePartitionsRequest struct {
	// Address of the kafka broker to send the request to.
	Addr net.Addr

	// List of topics whose partitions are being created, and the partition
	// configuration requested for each of them.
	Topics []TopicPartitionsConfig

	// When set to true, partitions are not created but the configuration is
	// validated as if they were.
	ValidateOnly bool
}
// CreatePartitionsResponse represents a response from a kafka broker to a partition
// creation request.
type CreatePartitionsResponse struct {
	// The amount of time that the broker throttled the request.
	Throttle time.Duration

	// Mapping of topic names to errors that occurred while attempting to create
	// the partitions.
	//
	// CreatePartitions stores one entry per result returned by the broker,
	// keyed by the topic name of that result.
	// NOTE(review): an entry is presumably nil when the broker reported no
	// error for the topic — confirm against makeError.
	//
	// The errors contain the kafka error code. Programs may use the standard
	// errors.Is function to test the error against kafka error codes.
	Errors map[string]error
}
// CreatePartitions builds a partition creation request from req, sends it to
// the kafka broker at req.Addr and converts the broker's reply into a
// CreatePartitionsResponse.
//
// The request timeout is computed by c.timeoutMs from ctx and
// defaultCreatePartitionsTimeout. If the round trip fails, a nil response is
// returned together with the underlying error wrapped with the method name.
// Otherwise the returned error is nil and the outcome for each topic is
// reported through the Errors map of the response.
func (c *Client) CreatePartitions(ctx context.Context, req *CreatePartitionsRequest) (*CreatePartitionsResponse, error) {
	// Translate the public topic configuration into its protocol-level form.
	reqTopics := make([]createpartitions.RequestTopic, 0, len(req.Topics))
	for i := range req.Topics {
		cfg := &req.Topics[i]
		reqTopics = append(reqTopics, createpartitions.RequestTopic{
			Name:        cfg.Name,
			Count:       cfg.Count,
			Assignments: cfg.assignments(),
		})
	}

	request := &createpartitions.Request{
		Topics:       reqTopics,
		TimeoutMs:    c.timeoutMs(ctx, defaultCreatePartitionsTimeout),
		ValidateOnly: req.ValidateOnly,
	}

	msg, err := c.roundTrip(ctx, req.Addr, request)
	if err != nil {
		return nil, fmt.Errorf("kafka.(*Client).CreatePartitions: %w", err)
	}

	response := msg.(*createpartitions.Response)

	out := new(CreatePartitionsResponse)
	out.Throttle = makeDuration(response.ThrottleTimeMs)
	out.Errors = make(map[string]error, len(response.Results))

	// Record one entry per topic result returned by the broker.
	for _, result := range response.Results {
		out.Errors[result.Name] = makeError(result.ErrorCode, result.ErrorMessage)
	}

	return out, nil
}
// TopicPartitionsConfig describes the partition settings requested for a single
// topic in a CreatePartitionsRequest.
type TopicPartitionsConfig struct {
	// Topic name.
	Name string

	// Partition count sent to the broker for this topic.
	// NOTE(review): presumably the total number of partitions the topic should
	// have once the request completes, not the number to add — confirm against
	// the Kafka CreatePartitions API.
	Count int32

	// TopicPartitionAssignments among kafka brokers for this topic partitions.
	// When the list is empty, no assignments are included in the request for
	// this topic.
	TopicPartitionAssignments []TopicPartitionAssignment
}
// assignments converts t.TopicPartitionAssignments into the protocol-level
// representation used by createpartitions requests.
//
// It returns nil when the topic has no assignments configured. The BrokerIDs
// slices are passed through as-is; they are not copied.
func (t *TopicPartitionsConfig) assignments() []createpartitions.RequestAssignment {
	n := len(t.TopicPartitionAssignments)
	if n == 0 {
		return nil
	}

	out := make([]createpartitions.RequestAssignment, 0, n)
	for _, src := range t.TopicPartitionAssignments {
		out = append(out, createpartitions.RequestAssignment{BrokerIDs: src.BrokerIDs})
	}
	return out
}
// TopicPartitionAssignment is one entry of
// TopicPartitionsConfig.TopicPartitionAssignments and carries a list of kafka
// broker IDs.
type TopicPartitionAssignment struct {
	// Broker IDs.
	// NOTE(review): presumably the brokers that should host the replicas of
	// one newly created partition — confirm against the Kafka CreatePartitions
	// API.
	BrokerIDs []int32
}