Skip to content

Commit

Permalink
Ver2.0.0 alpha.11 (#20)
Browse files Browse the repository at this point in the history
* add MaxRetry for output

* add: InputContext,OuputContext

* fix tcp test

* change log level from warn to debug when input message parse error
  • Loading branch information
mimuret authored Jan 21, 2023
1 parent 2ad8f0f commit 5f1c184
Show file tree
Hide file tree
Showing 30 changed files with 226 additions and 84 deletions.
32 changes: 21 additions & 11 deletions pkg/core/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"syscall"

"github.com/mimuret/dtap/v2/pkg/config"
"github.com/mimuret/dtap/v2/pkg/logger"
"github.com/mimuret/dtap/v2/pkg/plugin"
"github.com/mimuret/dtap/v2/pkg/types"
"github.com/pkg/errors"
Expand Down Expand Up @@ -147,15 +146,20 @@ func (c *Controller) Run(ctx context.Context) error {
iCtx, iCancel := context.WithCancel(ctx)
for i, inputPlugin := range c.inputPlugins {
iwg.Add(1)
go func(i int, ip types.InputPlugin) {
logger.GetLogger().Info("start input plugin", zap.String("name", ip.GetName()), zap.Int("no", i))
err := ip.Start(iCtx, c.inputBuffer)
logger.GetLogger().Info("finish input plugin", zap.String("name", ip.GetName()), zap.Int("no", i), zap.Error(err))
ic := &types.InputContext{
No: i,
Logger: c.logger.With(zap.String("name", inputPlugin.GetName()), zap.Int("no", i)),
Writer: c.inputBuffer,
}
go func(ip types.InputPlugin, ic *types.InputContext) {
ic.Logger.Info("start input plugin")
err := ip.Start(iCtx, ic)
ic.Logger.Info("finish input plugin")
if err != nil {
errCh <- err
}
iwg.Done()
}(i, inputPlugin)
}(inputPlugin, ic)
}

// start outputPlugin
Expand All @@ -164,15 +168,21 @@ func (c *Controller) Run(ctx context.Context) error {
for _, og := range c.outputGroups {
for i, outputPlugin := range og.outputs {
owg.Add(1)
go func(i int, og OutputGroup, op types.OutputPlugin) {
logger.GetLogger().Info("start output plugin", zap.String("og", og.name), zap.String("name", op.GetName()), zap.Int("no", i))
err := op.Start(oCtx, og.buffer)
logger.GetLogger().Info("finish output plugin", zap.String("og", og.name), zap.String("name", op.GetName()), zap.Int("no", i), zap.Error(err))
oc := &types.OutputContext{
OutputGroup: og.name,
No: i,
Logger: c.logger.With(zap.String("og", og.name), zap.String("name", outputPlugin.GetName()), zap.Int("no", i)),
Reader: og.buffer,
}
go func(op types.OutputPlugin, oc *types.OutputContext) {
oc.Logger.Info("start output plugin")
err := op.Start(oCtx, oc)
oc.Logger.Info("finish output plugin")
if err != nil {
errCh <- err
}
owg.Done()
}(i, og, outputPlugin)
}(outputPlugin, oc)
}
}
defer func() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/core/plugins/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import (

// input
_ "github.com/mimuret/dtap/v2/pkg/plugin/input/file"
_ "github.com/mimuret/dtap/v2/pkg/plugin/input/nats"
_ "github.com/mimuret/dtap/v2/pkg/plugin/input/tcp"
_ "github.com/mimuret/dtap/v2/pkg/plugin/input/unix"
_ "github.com/mimuret/dtap/v2/pkg/plugin/input/nats"

// output
_ "github.com/mimuret/dtap/v2/pkg/plugin/output/file"
Expand Down
8 changes: 4 additions & 4 deletions pkg/plugin/input/file/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func SetupFile(bs json.RawMessage) (types.InputPlugin, error) {
}
if input.NewInputServer(p.Format, &framestream.DecoderOptions{
Bidirectional: false,
}) == nil {
}, nil) == nil {
return nil, errors.Errorf("invalid format")
}
p.fs = afero.NewOsFs()
Expand All @@ -65,16 +65,16 @@ type File struct {
Format input.Format
}

func (p *File) Start(_ context.Context, w types.Writer) error {
func (p *File) Start(_ context.Context, ic *types.InputContext) error {
is := input.NewInputServer(p.Format, &framestream.DecoderOptions{
ContentType: dnstap.FSContentType,
Bidirectional: false,
})
}, ic)
r, err := p.fs.Open(p.Path)
if err != nil {
return fmt.Errorf("failed to open file: %w", err)
}
if err := is.Read(r, w); err != nil {
if err := is.Read(r, ic.Writer); err != nil {
return fmt.Errorf("failed to push message: %w", err)
}
return nil
Expand Down
9 changes: 6 additions & 3 deletions pkg/plugin/input/file/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/mimuret/dtap/v2/pkg/buffer"
"github.com/mimuret/dtap/v2/pkg/plugin/input/file"
"github.com/mimuret/dtap/v2/pkg/testtool"
"github.com/mimuret/dtap/v2/pkg/types"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
Expand Down Expand Up @@ -80,6 +81,7 @@ var _ = Describe("input/file", func() {
Context("Start", func() {
var (
p types.InputPlugin
ic *types.InputContext
fp *file.File
err error
fs afero.Fs
Expand All @@ -88,6 +90,7 @@ var _ = Describe("input/file", func() {
)
BeforeEach(func() {
buf = buffer.NewRingBuffer(10, nil, nil)
ic = testtool.NewTestInputContext(buf)
fs = afero.NewMemMapFs()
p, err = file.SetupFile(validConfig)
Expect(err).To(Succeed())
Expand All @@ -96,7 +99,7 @@ var _ = Describe("input/file", func() {
})
When("file not exist", func() {
BeforeEach(func() {
err = fp.Start(context.TODO(), buf)
err = fp.Start(context.TODO(), ic)
})
It("returns error", func() {
Expect(err).To(HaveOccurred())
Expand All @@ -110,7 +113,7 @@ var _ = Describe("input/file", func() {
_, err = f.Write(dummyData)
Expect(err).To(Succeed())
f.Close()
err = fp.Start(context.TODO(), buf)
err = fp.Start(context.TODO(), ic)
})
It("returns error", func() {
Expect(err).To(HaveOccurred())
Expand All @@ -124,7 +127,7 @@ var _ = Describe("input/file", func() {
_, err = f.Write(validData)
Expect(err).To(Succeed())
f.Close()
err = fp.Start(context.TODO(), buf)
err = fp.Start(context.TODO(), ic)
})
It("returns error", func() {
Expect(err).To(Succeed())
Expand Down
7 changes: 4 additions & 3 deletions pkg/plugin/input/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"strings"

framestream "github.com/farsightsec/golang-framestream"
"github.com/mimuret/dtap/v2/pkg/types"
)

const DefaultFormat = "DNSTAP"
Expand All @@ -14,18 +15,18 @@ var (
registry = map[Format]NewFormatFunc{}
)

type NewFormatFunc func(options *framestream.DecoderOptions) *InputServer
type NewFormatFunc func(options *framestream.DecoderOptions, ic *types.InputContext) *InputServer

func RegisterFormat(f Format, newFunc NewFormatFunc) {
f = Format(strings.ToUpper(string(f)))
registry[f] = newFunc
}

func NewInputServer(f Format, options *framestream.DecoderOptions) *InputServer {
func NewInputServer(f Format, options *framestream.DecoderOptions, ic *types.InputContext) *InputServer {
f = Format(strings.ToUpper(string(f)))
newFunc := registry[f]
if newFunc == nil {
return nil
}
return newFunc(options)
return newFunc(options, ic)
}
13 changes: 6 additions & 7 deletions pkg/plugin/input/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

dnstap "github.com/dnstap/golang-dnstap"
framestream "github.com/farsightsec/golang-framestream"
"github.com/mimuret/dtap/v2/pkg/logger"
"github.com/mimuret/dtap/v2/pkg/plugin/pub"
"github.com/mimuret/dtap/v2/pkg/types"
"github.com/pkg/errors"
Expand Down Expand Up @@ -56,12 +55,12 @@ type FstrmUnmarshaler func([]byte) (*types.DnstapMessage, error)

type InputServer struct {
DecoderOptions *framestream.DecoderOptions
logger *zap.Logger
connectionManager *connectionManager
unmarshaler FstrmUnmarshaler
ic *types.InputContext
}

func NewDnstapInputServer(options *framestream.DecoderOptions) *InputServer {
func NewDnstapInputServer(options *framestream.DecoderOptions, ic *types.InputContext) *InputServer {
if options == nil {
options = &framestream.DecoderOptions{
Bidirectional: true,
Expand All @@ -72,13 +71,13 @@ func NewDnstapInputServer(options *framestream.DecoderOptions) *InputServer {
}
return &InputServer{
DecoderOptions: options,
logger: logger.GetLogger(),
ic: ic,
connectionManager: newConnectionManager(),
unmarshaler: types.NewDnstapMessage,
}
}

func NewDtapFrameInputServer(options *framestream.DecoderOptions) *InputServer {
func NewDtapFrameInputServer(options *framestream.DecoderOptions, ic *types.InputContext) *InputServer {
if options == nil {
options = &framestream.DecoderOptions{
Bidirectional: true,
Expand All @@ -89,7 +88,7 @@ func NewDtapFrameInputServer(options *framestream.DecoderOptions) *InputServer {
}
return &InputServer{
DecoderOptions: options,
logger: logger.GetLogger(),
ic: ic,
connectionManager: newConnectionManager(),
unmarshaler: types.NewDnstapMessageFromDtapFrameRaw,
}
Expand All @@ -114,7 +113,7 @@ func (i *InputServer) Serve(ln net.Listener, buf types.Writer) error {
go func(conn net.Conn) {
if err := i.Read(conn, buf); err != nil {
TotalDecordError.Inc()
i.logger.Warn("input error", zap.Error(err))
i.ic.Logger.Debug("input error", zap.Error(err))
}
i.connectionManager.remove(conn)
wg.Done()
Expand Down
4 changes: 2 additions & 2 deletions pkg/plugin/input/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ var _ = Describe("InputServer", func() {
buf types.Writer
)
BeforeEach(func() {
srv = input.NewInputServer(input.FormatDNSTAP, nil)
srv = input.NewInputServer(input.FormatDNSTAP, nil, testtool.NewTestInputContext(nil))
srvErr = nil
buf = buffer.NewRingBuffer(100, &counter{}, &counter{})
ln, srvErr = nettest.NewLocalListener("unix")
Expand Down Expand Up @@ -80,7 +80,7 @@ var _ = Describe("InputServer", func() {
buf types.Buffer
)
BeforeEach(func() {
srv = input.NewInputServer(input.FormatDNSTAP, nil)
srv = input.NewInputServer(input.FormatDNSTAP, nil, testtool.NewTestInputContext(nil))
buf = buffer.NewRingBuffer(100, &counter{}, &counter{})
connOut, connIn = net.Pipe()
srvErr = nil
Expand Down
18 changes: 12 additions & 6 deletions pkg/plugin/input/nats/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/pkg/errors"
"go.uber.org/zap"

"github.com/mimuret/dtap/v2/pkg/logger"
"github.com/mimuret/dtap/v2/pkg/plugin"
"github.com/mimuret/dtap/v2/pkg/types"

Expand Down Expand Up @@ -59,9 +58,13 @@ func Setup(bs json.RawMessage) (types.InputPlugin, error) {
}
if input.NewInputServer(s.Format, &framestream.DecoderOptions{
Bidirectional: false,
}) == nil {
}, nil) == nil {
return nil, errors.Errorf("invalid format")
}
// for test
s.ic = &types.InputContext{
Logger: zap.NewExample(),
}
return s, nil
}

Expand All @@ -73,6 +76,8 @@ type Nats struct {

*output.DnstapOutput

ic *types.InputContext

// config
Hosts []string
Subject string
Expand All @@ -86,14 +91,15 @@ type Nats struct {
Format input.Format
}

func (f *Nats) Start(ctx context.Context, w types.Writer) error {
func (f *Nats) Start(ctx context.Context, ic *types.InputContext) error {
f.ic = ic
LOOP:
for {
select {
case <-ctx.Done():
break LOOP
default:
if err := f.Subscribe(ctx, w); err != nil {
if err := f.Subscribe(ctx, ic.Writer); err != nil {
return err
}
}
Expand Down Expand Up @@ -121,7 +127,7 @@ func (f *Nats) Open() (*nats.Conn, error) {
func (f *Nats) Subscribe(ctx context.Context, w types.Writer) error {
is := input.NewInputServer(f.Format, &framestream.DecoderOptions{
Bidirectional: false,
})
}, f.ic)
nc, err := f.Open()
if err != nil {
return errors.Wrapf(err, "failed to connect nats server")
Expand All @@ -138,7 +144,7 @@ func (f *Nats) Subscribe(ctx context.Context, w types.Writer) error {
_ = sub.Unsubscribe()
}()

logger.GetLogger().Info("start subscribe", zap.String("subject", f.Subject), zap.String("queue name", f.QueueName), zap.Int("queue len", f.QueueLen))
f.ic.Logger.Info("start subscribe", zap.String("subject", f.Subject), zap.String("queue name", f.QueueName), zap.Int("queue len", f.QueueLen))
LOOP:
for {
select {
Expand Down
6 changes: 3 additions & 3 deletions pkg/plugin/input/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func SetupTCPSocket(bs json.RawMessage) (types.InputPlugin, error) {
if p.Port == 0 {
return nil, errors.Errorf("missing parameter Port")
}
if input.NewInputServer(p.Format, nil) == nil {
if input.NewInputServer(p.Format, nil, nil) == nil {
return nil, errors.Errorf("invalid format")
}
return p, nil
Expand Down Expand Up @@ -76,13 +76,13 @@ func (p *TCPSocket) Close() error {
return p.ln.Close()
}

func (p *TCPSocket) Start(ctx context.Context, w types.Writer) error {
func (p *TCPSocket) Start(ctx context.Context, ic *types.InputContext) error {
if err := p.Listen(); err != nil {
return err
}
go func() {
<-ctx.Done()
p.Close()
}()
return input.NewInputServer(p.Format, nil).Serve(p.ln, w)
return input.NewInputServer(p.Format, nil, ic).Serve(p.ln, ic.Writer)
}
10 changes: 10 additions & 0 deletions pkg/plugin/input/tcp/tcp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
package tcp_test

import (
"net"

"github.com/goccy/go-json"
"golang.org/x/net/nettest"

"github.com/mimuret/dtap/v2/pkg/plugin/input/tcp"
"github.com/mimuret/dtap/v2/pkg/types"
Expand Down Expand Up @@ -68,9 +71,16 @@ var _ = Describe("input/tcp", func() {
p *tcp.TCPSocket
)
BeforeEach(func() {
ln, lerr := nettest.NewLocalListener("tcp")
Expect(lerr).To(Succeed())
addr, ok := ln.Addr().(*net.TCPAddr)
Expect(ok).To(BeTrue())
ln.Close()

ip, err = tcp.SetupTCPSocket(json.RawMessage(`{"Name": "tcp", "Port": 10053}`))
Expect(err).To(Succeed())
p = ip.(*tcp.TCPSocket)
p.Port = uint16(addr.Port)
})
When("failed to listen", func() {
BeforeEach(func() {
Expand Down
Loading

0 comments on commit 5f1c184

Please sign in to comment.