This project provides a framework for consuming messages from Kafka.
It aims to simplify the logic of consuming and forwarding data, and to offer a configurable, efficient way to do so.
With core and core-processor, a consumer can be written like this:
package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/DoNewsCode/core"
	processor "github.com/DoNewsCode/core-processor"
	"github.com/DoNewsCode/core/di"
	"github.com/DoNewsCode/core/otkafka"
	"github.com/segmentio/kafka-go"
)

type Handler struct {
}

func NewHandlerOut() processor.Out {
	return processor.NewOut(
		&Handler{},
	)
}

type Data struct {
	ID   int    `json:"id"`
	Name string `json:"name"`
}

func (h *Handler) Info() *processor.Info {
	return &processor.Info{
		Name:      "default", // the reader name is default
		BatchSize: 3,
	}
}
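
// Handle decodes a single Kafka message into a *Data value.
func (h *Handler) Handle(ctx context.Context, msg *kafka.Message) (interface{}, error) {
	e := &Data{}
	// e is already a pointer, so it is passed to Unmarshal directly.
	if err := json.Unmarshal(msg.Value, e); err != nil {
		return nil, err
	}
	return e, nil
}

// Batch receives the values returned by Handle, grouped into batches.
func (h *Handler) Batch(ctx context.Context, data []interface{}) error {
	for _, e := range data {
		fmt.Println(e.(*Data))
	}
	return nil
}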

func main() {
	// prepare config and dependencies
	c := core.New(
		core.WithInline("kafka.reader.default.brokers", []string{"127.0.0.1:9092"}),
		core.WithInline("kafka.reader.default.topic", "test"),
		core.WithInline("kafka.reader.default.groupID", "test"),
		core.WithInline("kafka.reader.default.startOffset", kafka.FirstOffset),
	)
	defer c.Shutdown()

	c.ProvideEssentials()
	c.Provide(otkafka.Providers())
	c.AddModuleFunc(processor.New)

	// provide your handlers
	c.Provide(di.Deps{
		NewHandlerOut,
	})

	// start server
	err := c.Serve(context.Background())
	if err != nil {
		panic(err)
	}
}
After the above, adding another consumer only takes two steps: write a new handler and provide its constructor to core.
Each handler can use processor.Info to adjust how the processor runs it, for example the reader name and the batch size.
Have fun!