-
Notifications
You must be signed in to change notification settings - Fork 2
/
produce.go
119 lines (97 loc) · 3.44 KB
/
produce.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package client
// ProduceRequest describes the message sets a client sends to the server,
// together with the acknowledgement settings that accompany them.
type ProduceRequest struct {
	// RequiredAcks is the first field Write emits, as an int16.
	// NOTE(review): presumably the number of acknowledgements the broker must
	// collect before replying — confirm against the Kafka protocol guide.
	RequiredAcks int16

	// AckTimeoutMs is emitted by Write as an int32 right after RequiredAcks.
	// NOTE(review): the name suggests a timeout in milliseconds — confirm.
	AckTimeoutMs int32

	// Data holds the messages to send, keyed first by topic name and then by
	// partition id.
	Data map[string]map[int32][]*MessageAndOffset
}
// Key reports the Kafka API key that identifies a ProduceRequest; it is
// always 0.
func (pr *ProduceRequest) Key() int16 {
	const produceAPIKey int16 = 0
	return produceAPIKey
}
// Version reports the Kafka request version of ProduceRequest, exposed for
// backwards compatibility; it is always 0.
func (pr *ProduceRequest) Version() int16 {
	const produceAPIVersion int16 = 0
	return produceAPIVersion
}
// Write serializes the request through encoder in this order: RequiredAcks
// (int16), AckTimeoutMs (int32), the number of topics (int32), and then for
// every topic its name, its partition count (int32) and, per partition, the
// partition id (int32) followed by each of that partition's messages.
//
// Topics and partitions are emitted in Go map iteration order, so their
// relative order is not stable between calls.
func (pr *ProduceRequest) Write(encoder Encoder) {
	encoder.WriteInt16(pr.RequiredAcks)
	encoder.WriteInt32(pr.AckTimeoutMs)

	topicCount := int32(len(pr.Data))
	encoder.WriteInt32(topicCount)

	for topicName, partitions := range pr.Data {
		encoder.WriteString(topicName)

		partitionCount := int32(len(partitions))
		encoder.WriteInt32(partitionCount)

		for partitionID, messages := range partitions {
			encoder.WriteInt32(partitionID)

			// The messages are bracketed by Reserve/UpdateReserved.
			// NOTE(review): presumably this back-fills a length prefix for
			// the message set — confirm against the Encoder implementation.
			encoder.Reserve(&LengthSlice{})
			for idx := range messages {
				messages[idx].Write(encoder)
			}
			encoder.UpdateReserved()
		}
	}
}
// AddMessage is a convenience method that appends a single message to the
// batch queued for the given topic and partition. The Data map and the
// per-topic map are created on first use, so it can be called on a
// zero-value ProduceRequest.
func (pr *ProduceRequest) AddMessage(topic string, partition int32, message *Message) {
	if pr.Data == nil {
		pr.Data = map[string]map[int32][]*MessageAndOffset{}
	}

	partitions := pr.Data[topic]
	if partitions == nil {
		partitions = map[int32][]*MessageAndOffset{}
		pr.Data[topic] = partitions
	}

	wrapped := &MessageAndOffset{Message: message}
	partitions[partition] = append(partitions[partition], wrapped)
}
// ProduceResponse reports, per topic partition, the highest assigned offset
// and the error if one occurred.
type ProduceResponse struct {
	// Status is keyed first by topic name and then by partition id.
	Status map[string]map[int32]*ProduceResponseStatus
}
// Read fills pr.Status from decoder. It expects a topic count (int32) and,
// for each topic, the topic name, a partition count (int32) and, for each
// partition, the partition id (int32), an error code (int16) and an offset
// (int64).
//
// pr.Status is replaced with a fresh map on every call. The first decoding
// failure stops the read and is returned as a *DecodingError carrying the
// reason for the field that failed; whatever was decoded before the failure
// stays in pr.Status. Read returns nil once the whole response is decoded.
func (pr *ProduceResponse) Read(decoder Decoder) *DecodingError {
	pr.Status = map[string]map[int32]*ProduceResponseStatus{}

	topicCount, err := decoder.GetInt32()
	if err != nil {
		return NewDecodingError(err, reasonInvalidProduceTopicsLength)
	}

	for topicsLeft := topicCount; topicsLeft > 0; topicsLeft-- {
		name, err := decoder.GetString()
		if err != nil {
			return NewDecodingError(err, reasonInvalidProduceTopic)
		}

		// The topic is registered before its partitions are read, so a topic
		// whose partition list fails to decode is still present in Status.
		statuses := map[int32]*ProduceResponseStatus{}
		pr.Status[name] = statuses

		partitionCount, err := decoder.GetInt32()
		if err != nil {
			return NewDecodingError(err, reasonInvalidProducePartitionsLength)
		}

		for partitionsLeft := partitionCount; partitionsLeft > 0; partitionsLeft-- {
			id, err := decoder.GetInt32()
			if err != nil {
				return NewDecodingError(err, reasonInvalidProducePartition)
			}

			code, err := decoder.GetInt16()
			if err != nil {
				return NewDecodingError(err, reasonInvalidProduceErrorCode)
			}
			// NOTE(review): a code with no entry in BrokerErrors presumably
			// yields a nil Error — confirm that is the intended outcome.
			brokerErr := BrokerErrors[code]

			highest, err := decoder.GetInt64()
			if err != nil {
				return NewDecodingError(err, reasonInvalidProduceOffset)
			}

			// A partition is only recorded once all of its fields decoded.
			statuses[id] = &ProduceResponseStatus{Error: brokerErr, Offset: highest}
		}
	}

	return nil
}
// ProduceResponseStatus is the outcome of a ProduceRequest for one topic
// partition: the highest assigned offset and the error if one occurred.
type ProduceResponseStatus struct {
	// Error is the error reported for the partition, if any.
	Error error

	// Offset is the highest offset assigned by the ProduceRequest.
	Offset int64
}
// Reasons passed to NewDecodingError by ProduceResponse.Read, one for each
// field of the response that can fail to decode.
const (
	reasonInvalidProduceTopicsLength     = "Invalid topics length in ProduceResponse"
	reasonInvalidProduceTopic            = "Invalid topic in ProduceResponse"
	reasonInvalidProducePartitionsLength = "Invalid partitions length in ProduceResponse"
	reasonInvalidProducePartition        = "Invalid partition in ProduceResponse"
	reasonInvalidProduceErrorCode        = "Invalid error code in ProduceResponse"
	reasonInvalidProduceOffset           = "Invalid offset in ProduceResponse"
)