processor.go (forked from lovoo/goka)
package goka
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"time"
"github.com/Shopify/sarama"
"github.com/lovoo/goka/logger"
"github.com/lovoo/goka/multierr"
"github.com/lovoo/goka/storage"
)
const (
// ProcStateIdle indicates an idling partition processor (not started yet)
ProcStateIdle State = iota
// ProcStateStarting indicates a starting partition processor, i.e. before rebalance
ProcStateStarting
// ProcStateSetup indicates a partition processor during setup of a rebalance round
ProcStateSetup
// ProcStateRunning indicates a running partition processor
ProcStateRunning
// ProcStateStopping indicates a stopping partition processor
ProcStateStopping
)
// ProcessCallback function is called for every message received by the
// processor.
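//
// A minimal callback sketch, assuming the group table is persisted with an
// int64 codec (the counter logic below is illustrative, not part of this file):
//
//	func count(ctx goka.Context, msg interface{}) {
//		var c int64
//		if v := ctx.Value(); v != nil {
//			c = v.(int64)
//		}
//		c++
//		ctx.SetValue(c) // stores the counter under ctx.Key() in the group table
//	}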
type ProcessCallback func(ctx Context, msg interface{})
// Processor is a set of stateful callback functions that, on the arrival of
// messages, modify the content of a table (the group table) and emit messages into other
// topics. Messages as well as rows in the group table are key-value pairs.
// A group is composed of multiple processor instances.
type Processor struct {
opts *poptions
log logger.Logger
brokers []string
rebalanceCallback RebalanceCallback
// Partition processors
partitions map[int32]*PartitionProcessor
// lookup tables
lookupTables map[string]*View
partitionCount int
graph *GroupGraph
saramaConsumer sarama.Consumer
producer Producer
tmgr TopicManager
state *Signal
ctx context.Context
cancel context.CancelFunc
}
// NewProcessor creates a processor instance in a group given the address of
// Kafka brokers, the consumer group name, a list of subscriptions (topics,
// codecs, and callbacks), and a series of options.
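//
// A minimal construction sketch, assuming the goka/codec package and a
// ProcessCallback named count; broker addresses and topic names are
// placeholders:
//
//	g := goka.DefineGroup("example-group",
//		goka.Input("example-stream", new(codec.String), count),
//		goka.Persist(new(codec.Int64)),
//	)
//	p, err := goka.NewProcessor([]string{"localhost:9092"}, g)
//	if err != nil {
//		log.Fatalf("error creating processor: %v", err)
//	}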
func NewProcessor(brokers []string, gg *GroupGraph, options ...ProcessorOption) (*Processor, error) {
options = append(
		// default options come first
[]ProcessorOption{
WithClientID(fmt.Sprintf("goka-processor-%s", gg.Group())),
WithLogger(logger.Default()),
WithUpdateCallback(DefaultUpdate),
WithPartitionChannelSize(defaultPartitionChannelSize),
WithStorageBuilder(storage.DefaultBuilder(DefaultProcessorStoragePath(gg.Group()))),
WithRebalanceCallback(DefaultRebalance),
},
// user-defined options (may overwrite default ones)
options...,
)
if err := gg.Validate(); err != nil {
return nil, err
}
opts := new(poptions)
err := opts.applyOptions(gg, options...)
if err != nil {
return nil, fmt.Errorf(errApplyOptions, err)
}
npar, err := prepareTopics(brokers, gg, opts)
if err != nil {
return nil, err
}
// create views
lookupTables := make(map[string]*View)
for _, t := range gg.LookupTables() {
view, err := NewView(brokers, Table(t.Topic()), t.Codec(),
WithViewLogger(opts.log),
WithViewHasher(opts.hasher),
WithViewClientID(opts.clientID),
WithViewTopicManagerBuilder(opts.builders.topicmgr),
WithViewStorageBuilder(opts.builders.storage),
WithViewConsumerSaramaBuilder(opts.builders.consumerSarama),
)
if err != nil {
return nil, fmt.Errorf("error creating view: %v", err)
}
lookupTables[t.Topic()] = view
}
// combine things together
processor := &Processor{
opts: opts,
log: opts.log.Prefix(fmt.Sprintf("Processor %s", gg.Group())),
brokers: brokers,
rebalanceCallback: opts.rebalanceCallback,
partitions: make(map[int32]*PartitionProcessor),
partitionCount: npar,
lookupTables: lookupTables,
graph: gg,
state: NewSignal(ProcStateIdle, ProcStateStarting, ProcStateSetup, ProcStateRunning, ProcStateStopping).SetState(ProcStateIdle),
}
return processor, nil
}
// Graph returns the group graph of the processor.
func (g *Processor) Graph() *GroupGraph {
return g.graph
}
// isStateless returns whether the processor is a stateless one.
func (g *Processor) isStateless() bool {
return g.graph.GroupTable() == nil
}
///////////////////////////////////////////////////////////////////////////////
// value getter
///////////////////////////////////////////////////////////////////////////////
// Get returns a read-only copy of a value from the group table if the
// respective partition is owned by the processor instance.
// Get can be called by multiple goroutines concurrently.
// Get can only be used with stateful processors (i.e., when the group table is
// enabled) and after Recovered returns true.
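//
// A usage sketch, assuming p is a running stateful processor (the key is
// illustrative):
//
//	p.WaitForReady()
//	val, err := p.Get("some-key")
//	if err != nil {
//		// partition not owned by this instance, or a storage/decoding error
//	} else if val == nil {
//		// the key does not exist in the group table
//	}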
func (g *Processor) Get(key string) (interface{}, error) {
if g.isStateless() {
return nil, fmt.Errorf("can't get a value from stateless processor")
}
// find partition where key is located
s, err := g.find(key)
if err != nil {
return nil, err
}
// get key and return
val, err := s.Get(key)
if err != nil {
return nil, fmt.Errorf("error getting %s: %v", key, err)
} else if val == nil {
// if the key does not exist the return value is nil
return nil, nil
}
// since we don't know what the codec does, make copy of the object
data := make([]byte, len(val))
copy(data, val)
value, err := g.graph.GroupTable().Codec().Decode(data)
if err != nil {
return nil, fmt.Errorf("error decoding %s: %v", key, err)
}
return value, nil
}
func (g *Processor) find(key string) (storage.Storage, error) {
p, err := g.hash(key)
if err != nil {
return nil, err
}
if _, ok := g.partitions[p]; !ok {
return nil, fmt.Errorf("this processor does not contain partition %v", p)
}
return g.partitions[p].table.st, nil
}
func (g *Processor) hash(key string) (int32, error) {
	// create a new hasher every time. An alternative would be to store the hasher
	// in the processor and reset it on every call (i.e., hasher.Reset()), but that
	// would also require protecting access to the hasher with a mutex.
hasher := g.opts.hasher()
_, err := hasher.Write([]byte(key))
if err != nil {
return -1, err
}
hash := int32(hasher.Sum32())
if hash < 0 {
hash = -hash
}
if g.partitionCount == 0 {
return 0, errors.New("can't hash with 0 partitions")
}
return hash % int32(g.partitionCount), nil
}
// Run starts the processor using the passed context.
// The processor stops in case of errors or if the context is cancelled.
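//
// A typical run loop, sketched; wiring the cancel to a shutdown signal is one
// common choice, not mandated here:
//
//	ctx, cancel := context.WithCancel(context.Background())
//	done := make(chan struct{})
//	go func() {
//		defer close(done)
//		if err := p.Run(ctx); err != nil {
//			log.Printf("processor stopped with error: %v", err)
//		}
//	}()
//	// ... later, e.g. on SIGINT:
//	cancel()
//	<-done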
func (g *Processor) Run(ctx context.Context) (rerr error) {
g.log.Debugf("starting")
defer g.log.Debugf("stopped")
// create errorgroup
ctx, g.cancel = context.WithCancel(ctx)
errg, ctx := multierr.NewErrGroup(ctx)
g.ctx = ctx
defer g.cancel()
// set a starting state. From this point on we know that there's a cancel and a valid context set
// in the processor which we can use for waiting
g.state.SetState(ProcStateStarting)
// collect all errors before leaving
merrors := new(multierr.Errors)
defer func() {
_ = merrors.Collect(rerr)
rerr = merrors.NilOrError()
}()
// create kafka consumer
consumerGroup, err := g.opts.builders.consumerGroup(g.brokers, string(g.graph.Group()), g.opts.clientID)
if err != nil {
return fmt.Errorf(errBuildConsumer, err)
}
errg.Go(func() error {
consumerErrors := consumerGroup.Errors()
for {
select {
case err, ok := <-consumerErrors:
if !ok {
return nil
}
var cbErr *errProcessing
if errors.As(err, &cbErr) {
g.log.Printf("error processing message (non-transient), stopping execution: %v", err)
return cbErr
}
g.log.Printf("Error executing group consumer (continuing execution): %v", err)
case <-ctx.Done():
// drain the channel and log in case we still have errors in the channel, otherwise they would be muted.
for err := range consumerErrors {
g.log.Printf("Error executing consumer group: %v", err)
}
}
}
})
g.saramaConsumer, err = g.opts.builders.consumerSarama(g.brokers, g.opts.clientID)
if err != nil {
return fmt.Errorf("Error creating consumer for brokers [%s]: %v", strings.Join(g.brokers, ","), err)
}
g.tmgr, err = g.opts.builders.topicmgr(g.brokers)
if err != nil {
return fmt.Errorf("Error creating topic manager for brokers [%s]: %v", strings.Join(g.brokers, ","), err)
}
// create kafka producer
g.log.Debugf("creating producer")
producer, err := g.opts.builders.producer(g.brokers, g.opts.clientID, g.opts.hasher)
if err != nil {
return fmt.Errorf(errBuildProducer, err)
}
g.producer = producer
defer func() {
g.log.Debugf("closing producer")
defer g.log.Debugf("producer ... closed")
if err := g.producer.Close(); err != nil {
merrors.Collect(fmt.Errorf("error closing producer: %v", err))
}
}()
// start all lookup tables
for topic, view := range g.lookupTables {
g.log.Debugf("Starting lookup table for %s", topic)
// make local copies
topic, view := topic, view
errg.Go(func() error {
if err := view.Run(ctx); err != nil {
return fmt.Errorf("error running lookup table %s: %v", topic, err)
}
return nil
})
}
g.waitForLookupTables()
// run the main rebalance-consume-loop
errg.Go(func() error {
return g.rebalanceLoop(ctx, consumerGroup)
})
return errg.Wait().NilOrError()
}
func (g *Processor) rebalanceLoop(ctx context.Context, consumerGroup sarama.ConsumerGroup) (rerr error) {
var topics []string
for _, e := range g.graph.InputStreams() {
topics = append(topics, e.Topic())
}
if g.graph.LoopStream() != nil {
topics = append(topics, g.graph.LoopStream().Topic())
}
var errs = new(multierr.Errors)
defer func() {
errs.Collect(rerr)
rerr = errs.NilOrError()
}()
defer func() {
g.log.Debugf("closing consumer group")
defer g.log.Debugf("closing consumer group ... done")
errs.Collect(consumerGroup.Close())
}()
for {
var (
consumeErr = make(chan error)
)
go func() {
g.log.Debugf("consuming from consumer loop")
defer g.log.Debugf("consuming from consumer loop done")
defer close(consumeErr)
err := consumerGroup.Consume(ctx, topics, g)
if err != nil {
consumeErr <- err
}
}()
select {
case err := <-consumeErr:
g.log.Debugf("Consumer group loop done, will stop here")
if err != nil {
errs.Collect(err)
return
}
case <-ctx.Done():
g.log.Debugf("context closed, waiting for processor to finish up")
err := <-consumeErr
errs.Collect(err)
g.log.Debugf("context closed, waiting for processor to finish up")
return
}
// let's wait some time before we retry to consume
<-time.After(5 * time.Second)
}
}
func (g *Processor) waitForLookupTables() {
// no tables to wait for
if len(g.lookupTables) == 0 {
return
}
multiWait := multierr.NewMultiWait(g.ctx, len(g.lookupTables))
// init the multiwait with all
for _, view := range g.lookupTables {
multiWait.Add(view.WaitRunning())
}
var (
start = time.Now()
logTicker = time.NewTicker(1 * time.Minute)
)
defer logTicker.Stop()
for {
select {
case <-g.ctx.Done():
g.log.Debugf("Stopping to wait for views to get up, context closed")
return
case <-multiWait.Done():
g.log.Debugf("View catchup finished")
return
case <-logTicker.C:
var tablesWaiting []string
for topic, view := range g.lookupTables {
if !view.Recovered() {
tablesWaiting = append(tablesWaiting, topic)
}
}
g.log.Printf("Waiting for views [%s] to catchup since %.2f minutes",
strings.Join(tablesWaiting, ", "),
time.Since(start).Minutes())
}
}
}
// Recovered returns whether the processor is running, i.e. if the processor
// has recovered all lookups/joins/tables and is running
func (g *Processor) Recovered() bool {
return g.state.IsState(ProcStateRunning)
}
func (g *Processor) assignmentFromSession(session sarama.ConsumerGroupSession) (Assignment, error) {
var (
assignment Assignment
)
// get the partitions this processor is assigned to.
// We use that loop to verify copartitioning and fail otherwise
for _, claim := range session.Claims() {
// for first claim, generate the assignment
if assignment == nil {
assignment = Assignment{}
for _, part := range claim {
assignment[part] = sarama.OffsetNewest
}
} else {
// for all others, verify the assignment is the same
// first check length
if len(claim) != len(assignment) {
return nil, fmt.Errorf("session claims are not copartitioned: %#v", session.Claims())
}
// then check the partitions are exactly the same
for _, part := range claim {
if _, exists := assignment[part]; !exists {
return nil, fmt.Errorf("session claims are not copartitioned: %#v", session.Claims())
}
}
}
}
return assignment, nil
}
// Setup is run at the beginning of a new session, before ConsumeClaim.
func (g *Processor) Setup(session sarama.ConsumerGroupSession) error {
g.state.SetState(ProcStateSetup)
defer g.state.SetState(ProcStateRunning)
g.log.Debugf("setup generation %d, claims=%#v", session.GenerationID(), session.Claims())
defer g.log.Debugf("setup generation %d ... done", session.GenerationID())
assignment, err := g.assignmentFromSession(session)
if err != nil {
return fmt.Errorf("Error verifying assignment from session: %v", err)
}
if g.rebalanceCallback != nil {
g.rebalanceCallback(assignment)
}
	// no partitions configured, so we cannot set up anything
if len(assignment) == 0 {
g.log.Printf("No partitions assigned. Claims were: %#v. Will probably sleep this generation", session.Claims())
return nil
}
// create partition views for all partitions
for partition := range assignment {
// create partition processor for our partition
err := g.createPartitionProcessor(session.Context(), partition, session)
if err != nil {
return fmt.Errorf("Error creating partition processor for %s/%d: %v", g.Graph().Group(), partition, err)
}
}
// setup all processors
errg, _ := multierr.NewErrGroup(session.Context())
for _, partition := range g.partitions {
pproc := partition
errg.Go(func() error {
return pproc.Setup(session.Context())
})
}
return errg.Wait().NilOrError()
}
// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
// but before the offsets are committed for the very last time.
func (g *Processor) Cleanup(session sarama.ConsumerGroupSession) error {
g.log.Debugf("Cleaning up for %d", session.GenerationID())
defer g.log.Debugf("Cleaning up for %d ... done", session.GenerationID())
g.state.SetState(ProcStateStopping)
defer g.state.SetState(ProcStateIdle)
errg, _ := multierr.NewErrGroup(session.Context())
for part, partition := range g.partitions {
partID, pproc := part, partition
errg.Go(func() error {
err := pproc.Stop()
if err != nil {
return fmt.Errorf("error stopping partition processor %d: %v", partID, err)
}
return nil
})
}
err := errg.Wait().NilOrError()
g.partitions = make(map[int32]*PartitionProcessor)
return err
}
// WaitForReady waits until the processor is ready to consume messages (or is actually consuming messages),
// i.e., it is done catching up on all partition tables, joins and lookup tables.
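//
// A short sketch of gating readiness on the processor (the surrounding
// service code is illustrative):
//
//	go p.Run(ctx)
//	p.WaitForReady() // blocks until tables, joins and lookups are caught up
//	// safe to serve Get() queries or report "ready" to a health check now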
func (g *Processor) WaitForReady() {
// wait for the processor to be started
<-g.state.WaitForStateMin(ProcStateStarting)
// wait that the processor is actually running
select {
case <-g.state.WaitForState(ProcStateRunning):
case <-g.ctx.Done():
}
// wait for all partitionprocessors to be running
for _, part := range g.partitions {
select {
case <-part.state.WaitForState(PPStateRunning):
case <-g.ctx.Done():
}
}
}
// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
// Once the Messages() channel is closed, the Handler must finish its processing
// loop and exit.
func (g *Processor) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
g.log.Debugf("ConsumeClaim for topic/partition %s/%d, initialOffset=%d", claim.Topic(), claim.Partition(), claim.InitialOffset())
defer g.log.Debugf("ConsumeClaim done for topic/partition %s/%d", claim.Topic(), claim.Partition())
part, has := g.partitions[claim.Partition()]
if !has {
return fmt.Errorf("No partition (%d) to handle input in topic %s", claim.Partition(), claim.Topic())
}
messages := claim.Messages()
errors := part.Errors()
for {
select {
case msg, ok := <-messages:
if !ok {
return nil
}
select {
case part.input <- msg:
case err := <-errors:
if err != nil {
return newErrProcessing(err)
}
return nil
}
case err := <-errors:
if err != nil {
return newErrProcessing(err)
}
return nil
}
}
}
// Stats returns the aggregated stats for the processor including all partitions, tables, lookups and joins
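//
// A sketch of periodic stats reporting (interval and log format are
// illustrative):
//
//	go func() {
//		for range time.Tick(30 * time.Second) {
//			s := p.Stats()
//			log.Printf("partitions: %d, lookup tables: %d", len(s.Group), len(s.Lookup))
//		}
//	}()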
func (g *Processor) Stats() *ProcessorStats {
return g.StatsWithContext(context.Background())
}
// StatsWithContext returns stats for the processor, see #Processor.Stats()
func (g *Processor) StatsWithContext(ctx context.Context) *ProcessorStats {
var (
m sync.Mutex
stats = newProcessorStats(len(g.partitions))
)
errg, ctx := multierr.NewErrGroup(ctx)
// get partition-processor stats
for partID, proc := range g.partitions {
partID, proc := partID, proc
errg.Go(func() error {
partStats := proc.fetchStats(ctx)
m.Lock()
defer m.Unlock()
stats.Group[partID] = partStats
return nil
})
}
for topic, view := range g.lookupTables {
topic, view := topic, view
		errg.Go(func() error {
			viewStats := view.Stats(ctx)
			m.Lock()
			defer m.Unlock()
			stats.Lookup[topic] = viewStats
			return nil
})
}
err := errg.Wait().NilOrError()
if err != nil {
g.log.Printf("Error retrieving stats: %v", err)
}
return stats
}
// createPartitionProcessor creates the partition processor responsible for the given
// partition of the group table and the input topics.
func (g *Processor) createPartitionProcessor(ctx context.Context, partition int32, session sarama.ConsumerGroupSession) error {
g.log.Debugf("Creating partition processor for partition %d", partition)
if _, has := g.partitions[partition]; has {
return fmt.Errorf("processor [%s]: partition %d already exists", g.graph.Group(), partition)
}
backoff, err := g.opts.builders.backoff()
if err != nil {
return fmt.Errorf("processor [%s]: could not build backoff handler: %v", g.graph.Group(), err)
}
pproc := newPartitionProcessor(partition, g.graph, session, g.log, g.opts, g.lookupTables, g.saramaConsumer, g.producer, g.tmgr, backoff, g.opts.backoffResetTime)
g.partitions[partition] = pproc
return nil
}
// Stop stops the processor.
// This is semantically equivalent to cancelling the Context
// that was passed to Processor.Run(..).
// This method returns immediately; errors that occur while running
// will be returned from Processor.Run(..).
func (g *Processor) Stop() {
g.cancel()
}
func prepareTopics(brokers []string, gg *GroupGraph, opts *poptions) (npar int, err error) {
// create topic manager
tm, err := opts.builders.topicmgr(brokers)
if err != nil {
return 0, fmt.Errorf("Error creating topic manager: %v", err)
}
defer func() {
e := tm.Close()
if e != nil && err == nil {
err = fmt.Errorf("Error closing topic manager: %v", e)
}
}()
// check co-partitioned (external) topics have the same number of partitions
npar, err = ensureCopartitioned(tm, gg.copartitioned().Topics())
if err != nil {
return 0, err
}
// TODO(diogo): add output topics
if ls := gg.LoopStream(); ls != nil {
ensureStreams := []string{ls.Topic()}
for _, t := range ensureStreams {
if err = tm.EnsureStreamExists(t, npar); err != nil {
return 0, err
}
}
}
if gt := gg.GroupTable(); gt != nil {
if err = tm.EnsureTableExists(gt.Topic(), npar); err != nil {
return 0, err
}
}
return
}
// returns the number of partitions the topics have, and an error if topics are
// not copartitioned.
func ensureCopartitioned(tm TopicManager, topics []string) (int, error) {
var npar int
for _, topic := range topics {
partitions, err := tm.Partitions(topic)
if err != nil {
return 0, fmt.Errorf("Error fetching partitions for topic %s: %v", topic, err)
}
// check assumption that partitions are gap-less
for i, p := range partitions {
if i != int(p) {
return 0, fmt.Errorf("Topic %s has partition gap: %v", topic, partitions)
}
}
if npar == 0 {
npar = len(partitions)
}
if len(partitions) != npar {
return 0, fmt.Errorf("Topic %s does not have %d partitions", topic, npar)
}
}
return npar, nil
}