diff --git a/pkg/kgo/client.go b/pkg/kgo/client.go
index 775a22e6..197c3933 100644
--- a/pkg/kgo/client.go
+++ b/pkg/kgo/client.go
@@ -78,8 +78,7 @@ type Client struct {
 	producer producer
 	consumer consumer
 
-	compressor   *compressor
-	decompressor *decompressor
+	compressor *compressor
 
 	coordinatorsMu sync.Mutex
 	coordinators   map[coordinatorKey]*coordinatorLoad
@@ -482,8 +481,7 @@ func NewClient(opts ...Opt) (*Client, error) {
 		bufPool: newBufPool(),
 		prsPool: newPrsPool(),
 
-		compressor:   compressor,
-		decompressor: newDecompressor(),
+		compressor: compressor,
 
 		coordinators: make(map[coordinatorKey]*coordinatorLoad),
 
diff --git a/pkg/kgo/compression.go b/pkg/kgo/compression.go
index fe8ad645..81d9d8a7 100644
--- a/pkg/kgo/compression.go
+++ b/pkg/kgo/compression.go
@@ -235,6 +235,8 @@ type decompressor struct {
 	unzstdPool sync.Pool
 }
 
+var defaultDecompressor = newDecompressor()
+
 func newDecompressor() *decompressor {
 	d := &decompressor{
 		ungzPool: sync.Pool{
diff --git a/pkg/kgo/source.go b/pkg/kgo/source.go
index 0c475d14..85586a0a 100644
--- a/pkg/kgo/source.go
+++ b/pkg/kgo/source.go
@@ -92,6 +92,26 @@ func (s *source) removeCursor(rm *cursor) {
 	}
 }
 
+type ProcessFetchPartitionOptions struct {
+	// KeepControlRecords sets the parser to keep control messages and return
+	// them with fetches, overriding the default that discards them.
+	//
+	// Generally, control messages are not useful. It is the same as kgo.KeepControlRecords().
+	KeepControlRecords bool
+
+	// Offset is the minimum offset for which we'll parse records. Records with lower offsets will not be parsed or returned.
+	Offset int64
+
+	// IsolationLevel controls whether or not to return uncommitted records. See kgo.IsolationLevel.
+	IsolationLevel IsolationLevel
+
+	// Topic is used to populate the Topic field of each Record.
+	Topic string
+
+	// Partition is used to populate the Partition field of each Record.
+	Partition int32
+}
+
 // cursor is where we are consuming from for an individual partition.
 type cursor struct {
 	topic     string
@@ -1068,7 +1088,7 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
 				continue
 			}
 
-			fp := partOffset.processRespPartition(br, rp, s.cl.decompressor, s.cl.cfg.hooks)
+			fp := partOffset.processRespPartition(br, rp, s.cl.cfg.hooks)
 			if fp.Err != nil {
 				if moving := kmove.maybeAddFetchPartition(resp, rp, partOffset.from); moving {
 					strip(topic, partition, fp.Err)
@@ -1245,7 +1265,41 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe
 
 // processRespPartition processes all records in all potentially compressed
 // batches (or message sets).
-func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchResponseTopicPartition, decompressor *decompressor, hooks hooks) FetchPartition {
+func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchResponseTopicPartition, hooks hooks) (fp FetchPartition) {
+	if rp.ErrorCode == 0 {
+		o.hwm = rp.HighWatermark
+	}
+	opts := ProcessFetchPartitionOptions{
+		KeepControlRecords: br.cl.cfg.keepControl,
+		Offset:             o.offset,
+		IsolationLevel:     IsolationLevel{br.cl.cfg.isolationLevel},
+		Topic:              o.from.topic,
+		Partition:          o.from.partition,
+	}
+	observeMetrics := func(m FetchBatchMetrics) {
+		hooks.each(func(h Hook) {
+			if h, ok := h.(HookFetchBatchRead); ok {
+				h.OnFetchBatchRead(br.meta, o.from.topic, o.from.partition, m)
+			}
+		})
+	}
+	fp, o.offset = ProcessRespPartition(opts, rp, observeMetrics)
+	if len(fp.Records) > 0 {
+		lastRecord := fp.Records[len(fp.Records)-1]
+		// We adjust the offset separately because it may be larger than the offset of the last record for compacted partitions.
+		o.lastConsumedEpoch = lastRecord.LeaderEpoch
+		o.lastConsumedTime = lastRecord.Timestamp
+	}
+
+	return fp
+}
+
+// ProcessRespPartition processes all records in all potentially compressed batches (or message sets).
+// It returns the FetchPartition and the last offset of the records it processed; observeMetrics can be nil.
+// This is useful when issuing manual Fetch requests for records.
+// For a compacted partition, the last offset may be larger than the offset of the last record.
+// If the partition response is truncated and the partition was compacted, the last offset is the offset of the last record.
+func ProcessRespPartition(o ProcessFetchPartitionOptions, rp *kmsg.FetchResponseTopicPartition, observeMetrics func(FetchBatchMetrics)) (FetchPartition, int64) {
 	fp := FetchPartition{
 		Partition:        rp.Partition,
 		Err:              kerr.ErrorForCode(rp.ErrorCode),
@@ -1253,12 +1307,9 @@ func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchRespon
 		LastStableOffset: rp.LastStableOffset,
 		LogStartOffset:   rp.LogStartOffset,
 	}
-	if rp.ErrorCode == 0 {
-		o.hwm = rp.HighWatermark
-	}
 
 	var aborter aborter
-	if br.cl.cfg.isolationLevel == 1 {
+	if o.IsolationLevel.level == 1 {
 		aborter = buildAborter(rp)
 	}
@@ -1349,10 +1400,10 @@ func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchRespon
 
 		default:
 			fp.Err = fmt.Errorf("unknown magic %d; message offset is %d and length is %d, skipping and setting to next offset", magic, offset, length)
-			if next := offset + 1; next > o.offset {
-				o.offset = next
+			if next := offset + 1; next > o.Offset {
+				o.Offset = next
 			}
-			return fp
+			return fp, o.Offset
 		}
 
 		if !check() {
@@ -1367,30 +1418,27 @@ func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchRespon
 		case *kmsg.MessageV0:
 			m.CompressedBytes = int(length) // for message sets, we include the message set overhead in length
 			m.CompressionType = uint8(t.Attributes) & 0b0000_0111
-			m.NumRecords, m.UncompressedBytes = o.processV0OuterMessage(&fp, t, decompressor)
+			m.NumRecords, m.UncompressedBytes = processV0OuterMessage(&o, &fp, t, defaultDecompressor)
 
 		case *kmsg.MessageV1:
 			m.CompressedBytes = int(length)
 			m.CompressionType = uint8(t.Attributes) & 0b0000_0111
-			m.NumRecords, m.UncompressedBytes = o.processV1OuterMessage(&fp, t, decompressor)
+			m.NumRecords, m.UncompressedBytes = processV1OuterMessage(&o, &fp, t, defaultDecompressor)
 
 		case *kmsg.RecordBatch:
 			m.CompressedBytes = len(t.Records) // for record batches, we only track the record batch length
 			m.CompressionType = uint8(t.Attributes) & 0b0000_0111
-			m.NumRecords, m.UncompressedBytes = o.processRecordBatch(&fp, t, aborter, decompressor)
+			m.NumRecords, m.UncompressedBytes = processRecordBatch(&o, &fp, t, aborter, defaultDecompressor)
 		}
 
 		if m.UncompressedBytes == 0 {
 			m.UncompressedBytes = m.CompressedBytes
 		}
-		hooks.each(func(h Hook) {
-			if h, ok := h.(HookFetchBatchRead); ok {
-				h.OnFetchBatchRead(br.meta, o.from.topic, o.from.partition, m)
-			}
-		})
+		if observeMetrics != nil {
+			observeMetrics(m)
+		}
 	}
-
-	return fp
+	return fp, o.Offset
 }
 
 type aborter map[int64][]int64
@@ -1453,7 +1501,8 @@ func readRawRecords(n int, in []byte) []kmsg.Record {
 	return rs
 }
 
-func (o *cursorOffsetNext) processRecordBatch(
+func processRecordBatch(
+	o *ProcessFetchPartitionOptions,
 	fp *FetchPartition,
 	batch *kmsg.RecordBatch,
 	aborter aborter,
@@ -1464,7 +1513,7 @@ func (o *cursorOffsetNext) processRecordBatch(
 		return 0, 0
 	}
 	lastOffset := batch.FirstOffset + int64(batch.LastOffsetDelta)
-	if lastOffset < o.offset {
+	if lastOffset < o.Offset {
 		// If the last offset in this batch is less than what we asked
 		// for, we got a batch that we entirely do not need. We can
 		// avoid all work (although we should not get this batch).
@@ -1496,15 +1545,15 @@ func (o *cursorOffsetNext) processRecordBatch(
 	// either advance offsets or will set to nextAskOffset.
 	nextAskOffset := lastOffset + 1
 	defer func() {
-		if numRecords == len(krecords) && o.offset < nextAskOffset {
-			o.offset = nextAskOffset
+		if numRecords == len(krecords) && o.Offset < nextAskOffset {
+			o.Offset = nextAskOffset
 		}
 	}()
 
 	abortBatch := aborter.shouldAbortBatch(batch)
 	for i := range krecords {
 		record := recordToRecord(
-			o.from.topic,
+			o.Topic,
 			fp.Partition,
 			batch,
 			&krecords[i],
@@ -1528,14 +1577,10 @@ func (o *cursorOffsetNext) processRecordBatch(
 // this easy, but if not, we decompress and process each inner message as
 // either v0 or v1. We only expect the inner message to be v1, but technically
 // a crazy pipeline could have v0 anywhere.
-func (o *cursorOffsetNext) processV1OuterMessage(
-	fp *FetchPartition,
-	message *kmsg.MessageV1,
-	decompressor *decompressor,
-) (int, int) {
+func processV1OuterMessage(o *ProcessFetchPartitionOptions, fp *FetchPartition, message *kmsg.MessageV1, decompressor *decompressor) (int, int) {
 	compression := byte(message.Attributes & 0x0003)
 	if compression == 0 {
-		o.processV1Message(fp, message)
+		processV1Message(o, fp, message)
 		return 1, 0
 	}
@@ -1606,13 +1651,13 @@ out:
 		case *kmsg.MessageV0:
 			innerMessage.Offset = firstOffset + int64(i)
 			innerMessage.Attributes |= int8(compression)
-			if !o.processV0Message(fp, innerMessage) {
+			if !processV0Message(o, fp, innerMessage) {
 				return i, uncompressedBytes
 			}
 		case *kmsg.MessageV1:
 			innerMessage.Offset = firstOffset + int64(i)
 			innerMessage.Attributes |= int8(compression)
-			if !o.processV1Message(fp, innerMessage) {
+			if !processV1Message(o, fp, innerMessage) {
 				return i, uncompressedBytes
 			}
 		}
@@ -1620,7 +1665,8 @@ out:
 	return len(innerMessages), uncompressedBytes
 }
 
-func (o *cursorOffsetNext) processV1Message(
+func processV1Message(
+	o *ProcessFetchPartitionOptions,
 	fp *FetchPartition,
 	message *kmsg.MessageV1,
 ) bool {
@@ -1632,21 +1678,22 @@ func (o *cursorOffsetNext) processV1Message(
 		fp.Err = fmt.Errorf("unknown attributes on message %d", message.Attributes)
 		return false
 	}
-	record := v1MessageToRecord(o.from.topic, fp.Partition, message)
+	record := v1MessageToRecord(o.Topic, fp.Partition, message)
 	o.maybeKeepRecord(fp, record, false)
 	return true
 }
 
 // Processes an outer v0 message. We expect inner messages to be entirely v0 as
 // well, so this only tries v0 always.
-func (o *cursorOffsetNext) processV0OuterMessage(
+func processV0OuterMessage(
+	o *ProcessFetchPartitionOptions,
 	fp *FetchPartition,
 	message *kmsg.MessageV0,
 	decompressor *decompressor,
 ) (int, int) {
 	compression := byte(message.Attributes & 0x0003)
 	if compression == 0 {
-		o.processV0Message(fp, message)
+		processV0Message(o, fp, message)
 		return 1, 0 // uncompressed bytes is 0; set to compressed bytes on return
 	}
@@ -1689,14 +1736,15 @@ func (o *cursorOffsetNext) processV0OuterMessage(
 		innerMessage := &innerMessages[i]
 		innerMessage.Attributes |= int8(compression)
 		innerMessage.Offset = firstOffset + int64(i)
-		if !o.processV0Message(fp, innerMessage) {
+		if !processV0Message(o, fp, innerMessage) {
 			return i, uncompressedBytes
 		}
 	}
 	return len(innerMessages), uncompressedBytes
 }
 
-func (o *cursorOffsetNext) processV0Message(
+func processV0Message(
+	o *ProcessFetchPartitionOptions,
 	fp *FetchPartition,
 	message *kmsg.MessageV0,
 ) bool {
@@ -1708,7 +1756,7 @@ func (o *cursorOffsetNext) processV0Message(
 		fp.Err = fmt.Errorf("unknown attributes on message %d", message.Attributes)
 		return false
 	}
-	record := v0MessageToRecord(o.from.topic, fp.Partition, message)
+	record := v0MessageToRecord(o.Topic, fp.Partition, message)
 	o.maybeKeepRecord(fp, record, false)
 	return true
 }
@@ -1717,8 +1765,8 @@ func (o *cursorOffsetNext) processV0Message(
 //
 // If the record is being aborted or the record is a control record and the
 // client does not want to keep control records, this does not keep the record.
-func (o *cursorOffsetNext) maybeKeepRecord(fp *FetchPartition, record *Record, abort bool) {
-	if record.Offset < o.offset {
+func (o *ProcessFetchPartitionOptions) maybeKeepRecord(fp *FetchPartition, record *Record, abort bool) {
+	if record.Offset < o.Offset {
 		// We asked for offset 5, but that was in the middle of a
 		// batch; we got offsets 0 thru 4 that we need to skip.
return @@ -1726,7 +1774,7 @@ func (o *cursorOffsetNext) maybeKeepRecord(fp *FetchPartition, record *Record, a // We only keep control records if specifically requested. if record.Attrs.IsControl() { - abort = !o.from.keepControl + abort = !o.KeepControlRecords } if !abort { fp.Records = append(fp.Records, record) @@ -1734,9 +1782,7 @@ func (o *cursorOffsetNext) maybeKeepRecord(fp *FetchPartition, record *Record, a // The record offset may be much larger than our expected offset if the // topic is compacted. - o.offset = record.Offset + 1 - o.lastConsumedEpoch = record.LeaderEpoch - o.lastConsumedTime = record.Timestamp + o.Offset = record.Offset + 1 } ///////////////////////////////