From 485b83a5735aa869f31d68d8c3d51b911faa647f Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Tue, 8 Oct 2024 14:56:54 +1030 Subject: [PATCH 1/7] x-pack/filebeat/input/azureblobstorage: add support for CSV decoding (#40978) The test file txn.csv.gz was obtained from https://netskopepartnerlogfilebucket.s3.amazonaws.com/txn-1722875066329034-fe10b6a23cc643c4b282e6190de2352d.csv.gz --- CHANGELOG.next.asciidoc | 1 + .../inputs/input-azure-blob-storage.asciidoc | 55 ++ .../filebeat/input/azureblobstorage/config.go | 9 + .../input/azureblobstorage/decoding.go | 47 ++ .../input/azureblobstorage/decoding_config.go | 54 ++ .../input/azureblobstorage/decoding_csv.go | 139 ++++ .../input/azureblobstorage/decoding_test.go | 237 +++++++ .../filebeat/input/azureblobstorage/input.go | 2 + .../input/azureblobstorage/input_stateless.go | 1 + x-pack/filebeat/input/azureblobstorage/job.go | 70 ++- .../input/azureblobstorage/testdata/txn.csv | 5 + .../azureblobstorage/testdata/txn.csv.gz | Bin 0 -> 2527 bytes .../input/azureblobstorage/testdata/txn.json | 594 ++++++++++++++++++ .../filebeat/input/azureblobstorage/types.go | 1 + 14 files changed, 1195 insertions(+), 20 deletions(-) create mode 100644 x-pack/filebeat/input/azureblobstorage/decoding.go create mode 100644 x-pack/filebeat/input/azureblobstorage/decoding_config.go create mode 100644 x-pack/filebeat/input/azureblobstorage/decoding_csv.go create mode 100644 x-pack/filebeat/input/azureblobstorage/decoding_test.go create mode 100644 x-pack/filebeat/input/azureblobstorage/testdata/txn.csv create mode 100644 x-pack/filebeat/input/azureblobstorage/testdata/txn.csv.gz create mode 100644 x-pack/filebeat/input/azureblobstorage/testdata/txn.json diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 3ad14f0f60e..b36bf002ffe 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -317,6 +317,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Add support to CEL for reading host environment variables. {issue}40762[40762] {pull}40779[40779] - Add CSV decoder to awss3 input. {pull}40896[40896] - Change request trace logging to include headers instead of complete request. {pull}41072[41072] +- Add CSV decoding capacity to azureblobstorage input {pull}40978[40978] *Auditbeat* diff --git a/x-pack/filebeat/docs/inputs/input-azure-blob-storage.asciidoc b/x-pack/filebeat/docs/inputs/input-azure-blob-storage.asciidoc index d6c1b4c9050..0ee02cf91d7 100644 --- a/x-pack/filebeat/docs/inputs/input-azure-blob-storage.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-azure-blob-storage.asciidoc @@ -247,6 +247,61 @@ Example : `10s` would mean we would like the polling to occur every 10 seconds. This attribute can be specified both at the root level of the configuration as well at the container level. The container level values will always take priority and override the root level values if both are specified. +[id="input-{type}-encoding"] +[float] +==== `encoding` + +The file encoding to use for reading data that contains international +characters. This only applies to non-JSON logs. See <<_encoding_3>>. + +[id="input-{type}-decoding"] +[float] +==== `decoding` + +The file decoding option is used to specify a codec that will be used to +decode the file contents. This can apply to any file stream data. +An example config is shown below: + +Currently supported codecs are given below:- + + 1. <>: This codec decodes RFC 4180 CSV data streams. 
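+
+For example, to enable CSV decoding with the default codec options on an `azure-blob-storage` input (an illustrative snippet only; the account, credential and container values are placeholders):
+
+[source,yaml]
+----
+filebeat.inputs:
+- type: azure-blob-storage
+  account_name: some_account
+  auth.shared_credentials.account_key: some_key
+  decoding.codec.csv.enabled: true
+  containers:
+  - name: container_1
+----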
+ +[id="attrib-decoding-csv-azureblobstorage"] +[float] +==== `the CSV codec` +The `CSV` codec is used to decode RFC 4180 CSV data streams. +Enabling the codec without other options will use the default codec options. + +[source,yaml] +---- + decoding.codec.csv.enabled: true +---- + +The CSV codec supports five sub attributes to control aspects of CSV decoding. +The `comma` attribute specifies the field separator character used by the CSV +format. If it is not specified, the comma character '`,`' is used. The `comment` +attribute specifies the character that should be interpreted as a comment mark. +If it is specified, lines starting with the character will be ignored. Both +`comma` and `comment` must be single characters. The `lazy_quotes` attribute +controls how quoting in fields is handled. If `lazy_quotes` is true, a quote may +appear in an unquoted field and a non-doubled quote may appear in a quoted field. +The `trim_leading_space` attribute specifies that leading white space should be +ignored, even if the `comma` character is white space. For complete details +of the preceding configuration attribute behaviors, see the CSV decoder +https://pkg.go.dev/encoding/csv#Reader[documentation] The `fields_names` +attribute can be used to specify the column names for the data. If it is +absent, the field names are obtained from the first non-comment line of +data. The number of fields must match the number of field names. + +An example config is shown below: + +[source,yaml] +---- + decoding.codec.csv.enabled: true + decoding.codec.csv.comma: "\t" + decoding.codec.csv.comment: "#" +---- + [id="attrib-file_selectors"] [float] ==== `file_selectors` diff --git a/x-pack/filebeat/input/azureblobstorage/config.go b/x-pack/filebeat/input/azureblobstorage/config.go index 5367596935a..5923e8ce7bb 100644 --- a/x-pack/filebeat/input/azureblobstorage/config.go +++ b/x-pack/filebeat/input/azureblobstorage/config.go @@ -11,6 +11,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/azcore" "github.com/elastic/beats/v7/libbeat/common/match" + "github.com/elastic/beats/v7/libbeat/reader/parser" ) // MaxWorkers, Poll, PollInterval, FileSelectors, TimeStampEpoch & ExpandEventListFromField can @@ -25,6 +26,7 @@ type config struct { PollInterval *time.Duration `config:"poll_interval"` Containers []container `config:"containers" validate:"required"` FileSelectors []fileSelectorConfig `config:"file_selectors"` + ReaderConfig readerConfig `config:",inline"` TimeStampEpoch *int64 `config:"timestamp_epoch"` ExpandEventListFromField string `config:"expand_event_list_from_field"` } @@ -36,6 +38,7 @@ type container struct { Poll *bool `config:"poll"` PollInterval *time.Duration `config:"poll_interval"` FileSelectors []fileSelectorConfig `config:"file_selectors"` + ReaderConfig readerConfig `config:",inline"` TimeStampEpoch *int64 `config:"timestamp_epoch"` ExpandEventListFromField string `config:"expand_event_list_from_field"` } @@ -46,6 +49,12 @@ type fileSelectorConfig struct { // TODO: Add support for reader config in future } +// readerConfig defines the options for reading the content of an azure container. 
+type readerConfig struct { + Parsers parser.Config `config:",inline"` + Decoding decoderConfig `config:"decoding"` +} + type authConfig struct { SharedCredentials *sharedKeyConfig `config:"shared_credentials"` ConnectionString *connectionStringConfig `config:"connection_string"` diff --git a/x-pack/filebeat/input/azureblobstorage/decoding.go b/x-pack/filebeat/input/azureblobstorage/decoding.go new file mode 100644 index 00000000000..77f0a1e8c2c --- /dev/null +++ b/x-pack/filebeat/input/azureblobstorage/decoding.go @@ -0,0 +1,47 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package azureblobstorage + +import ( + "fmt" + "io" +) + +// decoder is an interface for decoding data from an io.Reader. +type decoder interface { + // decode reads and decodes data from an io reader based on the codec type. + // It returns the decoded data and an error if the data cannot be decoded. + decode() ([]byte, error) + // next advances the decoder to the next data item and returns true if there is more data to be decoded. + next() bool + // close closes the decoder and releases any resources associated with it. + // It returns an error if the decoder cannot be closed. + + // more returns whether there are more records to read. + more() bool + + close() error +} + +// valueDecoder is a decoder that can decode directly to a JSON serialisable value. +type valueDecoder interface { + decoder + + decodeValue() ([]byte, map[string]any, error) +} + +// newDecoder creates a new decoder based on the codec type. +// It returns a decoder type and an error if the codec type is not supported. +// If the reader config codec option is not set, it returns a nil decoder and nil error. +func newDecoder(cfg decoderConfig, r io.Reader) (decoder, error) { + switch { + case cfg.Codec == nil: + return nil, nil + case cfg.Codec.CSV != nil: + return newCSVDecoder(cfg, r) + default: + return nil, fmt.Errorf("unsupported config value: %v", cfg) + } +} diff --git a/x-pack/filebeat/input/azureblobstorage/decoding_config.go b/x-pack/filebeat/input/azureblobstorage/decoding_config.go new file mode 100644 index 00000000000..3f680873bf7 --- /dev/null +++ b/x-pack/filebeat/input/azureblobstorage/decoding_config.go @@ -0,0 +1,54 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package azureblobstorage + +import ( + "fmt" + "unicode/utf8" +) + +// decoderConfig contains the configuration options for instantiating a decoder. +type decoderConfig struct { + Codec *codecConfig `config:"codec"` +} + +// codecConfig contains the configuration options for different codecs used by a decoder. +type codecConfig struct { + CSV *csvCodecConfig `config:"csv"` +} + +// csvCodecConfig contains the configuration options for the CSV codec. +type csvCodecConfig struct { + Enabled bool `config:"enabled"` + + // Fields is the set of field names. If it is present + // it is used to specify the object names of returned + // values and the FieldsPerRecord field in the csv.Reader. + // Otherwise, names are obtained from the first + // line of the CSV data. 
+ Fields []string `config:"fields_names"` + + // The fields below have the same meaning as the + // fields of the same name in csv.Reader. + Comma *configRune `config:"comma"` + Comment configRune `config:"comment"` + LazyQuotes bool `config:"lazy_quotes"` + TrimLeadingSpace bool `config:"trim_leading_space"` +} + +type configRune rune + +func (r *configRune) Unpack(s string) error { + if s == "" { + return nil + } + n := utf8.RuneCountInString(s) + if n != 1 { + return fmt.Errorf("single character option given more than one character: %q", s) + } + _r, _ := utf8.DecodeRuneInString(s) + *r = configRune(_r) + return nil +} diff --git a/x-pack/filebeat/input/azureblobstorage/decoding_csv.go b/x-pack/filebeat/input/azureblobstorage/decoding_csv.go new file mode 100644 index 00000000000..b28be94c523 --- /dev/null +++ b/x-pack/filebeat/input/azureblobstorage/decoding_csv.go @@ -0,0 +1,139 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package azureblobstorage + +import ( + "bytes" + "encoding/csv" + "fmt" + "io" + "slices" +) + +// csvDecoder is a decoder for CSV data. +type csvDecoder struct { + r *csv.Reader + + header []string + current []string + coming []string + + err error +} + +// newCSVDecoder creates a new CSV decoder. +func newCSVDecoder(config decoderConfig, r io.Reader) (decoder, error) { + d := csvDecoder{r: csv.NewReader(r)} + d.r.ReuseRecord = true + if config.Codec.CSV.Comma != nil { + d.r.Comma = rune(*config.Codec.CSV.Comma) + } + d.r.Comment = rune(config.Codec.CSV.Comment) + d.r.LazyQuotes = config.Codec.CSV.LazyQuotes + d.r.TrimLeadingSpace = config.Codec.CSV.TrimLeadingSpace + if len(config.Codec.CSV.Fields) != 0 { + d.r.FieldsPerRecord = len(config.Codec.CSV.Fields) + d.header = config.Codec.CSV.Fields + } else { + h, err := d.r.Read() + if err != nil { + return nil, err + } + d.header = slices.Clone(h) + } + var err error + d.coming, err = d.r.Read() + if err != nil { + return nil, err + } + d.current = make([]string, 0, len(d.header)) + return &d, nil +} + +func (d *csvDecoder) more() bool { return len(d.coming) == len(d.header) } + +// next advances the decoder to the next data item and returns true if +// there is more data to be decoded. +func (d *csvDecoder) next() bool { + if !d.more() && d.err != nil { + return false + } + d.current = d.current[:len(d.header)] + copy(d.current, d.coming) + d.coming, d.err = d.r.Read() + if d.err == io.EOF { + d.coming = nil + } + return true +} + +// decode returns the JSON encoded value of the current CSV line. next must +// have been called before any calls to decode. +func (d *csvDecoder) decode() ([]byte, error) { + err := d.check() + if err != nil { + return nil, err + } + var buf bytes.Buffer + buf.WriteByte('{') + for i, n := range d.header { + if i != 0 { + buf.WriteByte(',') + } + buf.WriteByte('"') + buf.WriteString(n) + buf.WriteString(`":"`) + buf.WriteString(d.current[i]) + buf.WriteByte('"') + } + buf.WriteByte('}') + d.current = d.current[:0] + return buf.Bytes(), nil +} + +// decodeValue returns the value of the current CSV line interpreted as +// an object with fields based on the header held by the receiver. next must +// have been called before any calls to decode. 
+func (d *csvDecoder) decodeValue() ([]byte, map[string]any, error) { + err := d.check() + if err != nil { + return nil, nil, err + } + m := make(map[string]any, len(d.header)) + for i, n := range d.header { + m[n] = d.current[i] + } + d.current = d.current[:0] + b, err := d.decode() + if err != nil { + return nil, nil, err + } + return b, m, nil +} + +func (d *csvDecoder) check() error { + if d.err != nil { + if d.err == io.EOF && d.coming == nil { + return nil + } + return d.err + } + if len(d.current) == 0 { + return fmt.Errorf("decode called before next") + } + // By the time we are here, current must be the same + // length as header; if it was not read, it would be + // zero, but if it was, it must match by the contract + // of the csv.Reader. + return nil +} + +// close closes the csv decoder and releases the resources. +func (d *csvDecoder) close() error { + if d.err == io.EOF { + return nil + } + return d.err +} diff --git a/x-pack/filebeat/input/azureblobstorage/decoding_test.go b/x-pack/filebeat/input/azureblobstorage/decoding_test.go new file mode 100644 index 00000000000..e541fc74279 --- /dev/null +++ b/x-pack/filebeat/input/azureblobstorage/decoding_test.go @@ -0,0 +1,237 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package azureblobstorage + +import ( + "context" + "encoding/json" + "errors" + "os" + "path/filepath" + "reflect" + "testing" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" + azcontainer "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container" + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" + + "github.com/elastic/beats/v7/libbeat/beat" + conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" +) + +// all test files are read from the "testdata" directory +const testDataPath = "testdata" + +func TestDecoding(t *testing.T) { + logp.TestingSetup() + log := logp.L() + + testCases := []struct { + name string + file string + content string + contentType string + numEvents int + assertAgainst string + config decoderConfig + }{ + { + name: "gzip_csv", + file: "txn.csv.gz", + content: "text/csv", + numEvents: 4, + assertAgainst: "txn.json", + config: decoderConfig{ + Codec: &codecConfig{ + CSV: &csvCodecConfig{ + Enabled: true, + Comma: ptr[configRune](' '), + }, + }, + }, + }, + { + name: "csv", + file: "txn.csv", + content: "text/csv", + numEvents: 4, + assertAgainst: "txn.json", + config: decoderConfig{ + Codec: &codecConfig{ + CSV: &csvCodecConfig{ + Enabled: true, + Comma: ptr[configRune](' '), + }, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + file := filepath.Join(testDataPath, tc.file) + if tc.contentType == "" { + tc.contentType = "application/octet-stream" + } + f, err := os.Open(file) + if err != nil { + t.Fatalf("failed to open test data: %v", err) + } + defer f.Close() + p := &pub{t: t} + item := &azcontainer.BlobItem{ + Name: ptr("test_blob"), + Properties: &azcontainer.BlobProperties{ + ContentType: ptr(tc.content), + LastModified: &time.Time{}, + }, + } + j := newJob(&blob.Client{}, item, "https://foo.blob.core.windows.net/", newState(), &Source{}, p, log) + j.src.ReaderConfig.Decoding = tc.config + err = j.decode(context.Background(), f, "test") + if err != nil { + t.Errorf("unexpected error calling decode: %v", 
err) + } + + events := p.events + if tc.assertAgainst != "" { + targetData := readJSONFromFile(t, filepath.Join(testDataPath, tc.assertAgainst)) + assert.Equal(t, len(targetData), len(events)) + + for i, event := range events { + msg, err := event.Fields.GetValue("message") + assert.NoError(t, err) + assert.JSONEq(t, targetData[i], msg.(string)) + } + } + }) + } +} + +type pub struct { + t *testing.T + events []beat.Event +} + +func (p *pub) Publish(e beat.Event, _cursor interface{}) error { + p.t.Logf("%v\n", e.Fields) + p.events = append(p.events, e) + return nil +} + +// readJSONFromFile reads the json file and returns the data as a slice of strings +func readJSONFromFile(t *testing.T, filepath string) []string { + fileBytes, err := os.ReadFile(filepath) + assert.NoError(t, err) + var rawMessages []json.RawMessage + err = json.Unmarshal(fileBytes, &rawMessages) + assert.NoError(t, err) + var data []string + + for _, rawMsg := range rawMessages { + data = append(data, string(rawMsg)) + } + return data +} + +var codecConfigTests = []struct { + name string + yaml string + want decoderConfig + wantErr error +}{ + { + name: "handle_rune", + yaml: ` +codec: + csv: + enabled: true + comma: ' ' + comment: '#' +`, + want: decoderConfig{&codecConfig{ + CSV: &csvCodecConfig{ + Enabled: true, + Comma: ptr[configRune](' '), + Comment: '#', + }, + }}, + }, + { + name: "no_comma", + yaml: ` +codec: + csv: + enabled: true +`, + want: decoderConfig{&codecConfig{ + CSV: &csvCodecConfig{ + Enabled: true, + }, + }}, + }, + { + name: "null_comma", + yaml: ` +codec: + csv: + enabled: true + comma: "\u0000" +`, + want: decoderConfig{&codecConfig{ + CSV: &csvCodecConfig{ + Enabled: true, + Comma: ptr[configRune]('\x00'), + }, + }}, + }, + { + name: "bad_rune", + yaml: ` +codec: + csv: + enabled: true + comma: 'this is too long' +`, + wantErr: errors.New(`single character option given more than one character: "this is too long" accessing 'codec.csv.comma'`), + }, +} + +func TestCodecConfig(t *testing.T) { + for _, test := range codecConfigTests { + t.Run(test.name, func(t *testing.T) { + c, err := conf.NewConfigWithYAML([]byte(test.yaml), "") + if err != nil { + t.Fatalf("unexpected error unmarshaling config: %v", err) + } + + var got decoderConfig + err = c.Unpack(&got) + if !sameError(err, test.wantErr) { + t.Errorf("unexpected error unpacking config: got:%v want:%v", err, test.wantErr) + } + + if !reflect.DeepEqual(got, test.want) { + t.Errorf("unexpected result\n--- want\n+++ got\n%s", cmp.Diff(test.want, got)) + } + }) + } +} + +func sameError(a, b error) bool { + switch { + case a == nil && b == nil: + return true + case a == nil, b == nil: + return false + default: + return a.Error() == b.Error() + } +} + +func ptr[T any](v T) *T { return &v } diff --git a/x-pack/filebeat/input/azureblobstorage/input.go b/x-pack/filebeat/input/azureblobstorage/input.go index 245daaef6e5..7c7989ccbce 100644 --- a/x-pack/filebeat/input/azureblobstorage/input.go +++ b/x-pack/filebeat/input/azureblobstorage/input.go @@ -71,6 +71,7 @@ func configure(cfg *conf.C) ([]cursor.Source, cursor.Input, error) { TimeStampEpoch: container.TimeStampEpoch, ExpandEventListFromField: container.ExpandEventListFromField, FileSelectors: container.FileSelectors, + ReaderConfig: container.ReaderConfig, }) } @@ -120,6 +121,7 @@ func tryOverrideOrDefault(cfg config, c container) container { if len(c.FileSelectors) == 0 && len(cfg.FileSelectors) != 0 { c.FileSelectors = cfg.FileSelectors } + c.ReaderConfig = cfg.ReaderConfig return c } diff --git 
a/x-pack/filebeat/input/azureblobstorage/input_stateless.go b/x-pack/filebeat/input/azureblobstorage/input_stateless.go index 73ae14a62e5..8bff050c112 100644 --- a/x-pack/filebeat/input/azureblobstorage/input_stateless.go +++ b/x-pack/filebeat/input/azureblobstorage/input_stateless.go @@ -57,6 +57,7 @@ func (in *statelessInput) Run(inputCtx v2.Context, publisher stateless.Publisher TimeStampEpoch: container.TimeStampEpoch, ExpandEventListFromField: container.ExpandEventListFromField, FileSelectors: container.FileSelectors, + ReaderConfig: container.ReaderConfig, } st := newState() diff --git a/x-pack/filebeat/input/azureblobstorage/job.go b/x-pack/filebeat/input/azureblobstorage/job.go index f886f330af6..ffc8fd04dcd 100644 --- a/x-pack/filebeat/input/azureblobstorage/job.go +++ b/x-pack/filebeat/input/azureblobstorage/job.go @@ -134,20 +134,47 @@ func (j *job) processAndPublishData(ctx context.Context, id string) error { } }() - err = j.readJsonAndPublish(ctx, reader, id) - if err != nil { - return fmt.Errorf("failed to read data from blob with error: %w", err) - } - - return err + return j.decode(ctx, reader, id) } -func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) error { +func (j *job) decode(ctx context.Context, r io.Reader, id string) error { r, err := j.addGzipDecoderIfNeeded(bufio.NewReader(r)) if err != nil { return fmt.Errorf("failed to add gzip decoder to blob: %s, with error: %w", *j.blob.Name, err) } + dec, err := newDecoder(j.src.ReaderConfig.Decoding, r) + if err != nil { + return err + } + var evtOffset int64 + switch dec := dec.(type) { + case decoder: + defer dec.close() + + for dec.next() { + msg, err := dec.decode() + if err != nil { + if err == io.EOF { + return nil + } + break + } + evt := j.createEvent(string(msg), evtOffset) + j.publish(evt, !dec.more(), id) + } + default: + err = j.readJsonAndPublish(ctx, r, id) + if err != nil { + return fmt.Errorf("failed to read data from blob with error: %w", err) + } + } + + return err +} + +func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) error { + var err error // checks if the root element is an array or not r, j.isRootArray, err = evaluateJSON(bufio.NewReader(r)) if err != nil { @@ -183,22 +210,25 @@ func (j *job) readJsonAndPublish(ctx context.Context, r io.Reader, id string) er return err } evt := j.createEvent(string(data), offset) + j.publish(evt, !dec.More(), id) + } + return nil +} - if !dec.More() { - // if this is the last object, then perform a complete state save - cp, done := j.state.saveForTx(*j.blob.Name, *j.blob.Properties.LastModified) - if err := j.publisher.Publish(evt, cp); err != nil { - j.log.Errorf(jobErrString, id, err) - } - done() - } else { - // since we don't update the cursor checkpoint, lack of a lock here should be fine - if err := j.publisher.Publish(evt, nil); err != nil { - j.log.Errorf(jobErrString, id, err) - } +func (j *job) publish(evt beat.Event, last bool, id string) { + if last { + // if this is the last object, then perform a complete state save + cp, done := j.state.saveForTx(*j.blob.Name, *j.blob.Properties.LastModified) + if err := j.publisher.Publish(evt, cp); err != nil { + j.log.Errorf(jobErrString, id, err) } + done() + return + } + // since we don't update the cursor checkpoint, lack of a lock here should be fine + if err := j.publisher.Publish(evt, nil); err != nil { + j.log.Errorf(jobErrString, id, err) } - return nil } // splitEventList splits the event list into individual events and publishes them diff --git 
a/x-pack/filebeat/input/azureblobstorage/testdata/txn.csv b/x-pack/filebeat/input/azureblobstorage/testdata/txn.csv new file mode 100644 index 00000000000..80ca65df21e --- /dev/null +++ b/x-pack/filebeat/input/azureblobstorage/testdata/txn.csv @@ -0,0 +1,5 @@ +date time time-taken cs-bytes sc-bytes bytes c-ip s-ip cs-username cs-method cs-uri-scheme cs-uri-query cs-user-agent cs-content-type sc-status sc-content-type cs-dns cs-host cs-uri cs-uri-port cs-referer x-cs-session-id x-cs-access-method x-cs-app x-s-country x-s-latitude x-s-longitude x-s-location x-s-region x-s-zipcode x-c-country x-c-latitude x-c-longitude x-c-location x-c-region x-c-zipcode x-c-os x-c-browser x-c-browser-version x-c-device x-cs-site x-cs-timestamp x-cs-page-id x-cs-userip x-cs-traffic-type x-cs-tunnel-id x-category x-other-category x-type x-server-ssl-err x-client-ssl-err x-transaction-id x-request-id x-cs-sni x-cs-domain-fronted-sni x-category-id x-other-category-id x-sr-headers-name x-sr-headers-value x-cs-ssl-ja3 x-sr-ssl-ja3s x-ssl-bypass x-ssl-bypass-reason x-r-cert-subject-cn x-r-cert-issuer-cn x-r-cert-startdate x-r-cert-enddate x-r-cert-valid x-r-cert-expired x-r-cert-untrusted-root x-r-cert-incomplete-chain x-r-cert-self-signed x-r-cert-revoked x-r-cert-revocation-check x-r-cert-mismatch x-cs-ssl-fronting-error x-cs-ssl-handshake-error x-sr-ssl-handshake-error x-sr-ssl-client-certificate-error x-sr-ssl-malformed-ssl x-s-custom-signing-ca-error x-cs-ssl-engine-action x-cs-ssl-engine-action-reason x-sr-ssl-engine-action x-sr-ssl-engine-action-reason x-ssl-policy-src-ip x-ssl-policy-dst-ip x-ssl-policy-dst-host x-ssl-policy-dst-host-source x-ssl-policy-categories x-ssl-policy-action x-ssl-policy-name x-cs-ssl-version x-cs-ssl-cipher x-sr-ssl-version x-sr-ssl-cipher x-cs-src-ip-egress x-s-dp-name x-cs-src-ip x-cs-src-port x-cs-dst-ip x-cs-dst-port x-sr-src-ip x-sr-src-port x-sr-dst-ip x-sr-dst-port x-cs-ip-connect-xff x-cs-ip-xff x-cs-connect-host x-cs-connect-port x-cs-connect-user-agent x-cs-url x-cs-uri-path x-cs-http-version rs-status x-cs-app-category x-cs-app-cci x-cs-app-ccl x-cs-app-tags x-cs-app-suite x-cs-app-instance-id x-cs-app-instance-name x-cs-app-instance-tag x-cs-app-activity x-cs-app-from-user x-cs-app-to-user x-cs-app-object-type x-cs-app-object-name x-cs-app-object-id x-rs-file-type x-rs-file-category x-rs-file-language x-rs-file-size x-rs-file-md5 x-rs-file-sha256 x-error x-c-local-time x-policy-action x-policy-name x-policy-src-ip x-policy-dst-ip x-policy-dst-host x-policy-dst-host-source x-policy-justification-type x-policy-justification-reason x-sc-notification-name +2024-08-05 16:24:20 64 2971 2050 5021 10.5.78.159 204.79.197.237 "vikash.ranjan@riverbed.com" GET https cc=US&setlang=en-US "Mozilla/5.0 (Windows NT 10.0; Win64; x64; Cortana 1.14.7.19041; 10.0.0.0.19045.2006) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.102 Safari/537.36 Edge/18.19045" - 200 "application/json; charset=utf-8" www.bing.com www.bing.com /client/config?cc=US&setlang=en-US 443 - 3683772769278232507 "Client" "Microsoft Bing" "US" 47.682899 -122.120903 "Redmond" "Washington" "N/A" "US" 29.775400 -95.598000 "Houston" "Texas" "77079" "Windows 10" "Edge" "18.19045" "Windows Device" "bing" 1722875060 5762388460300455936 10.5.78.159 CloudApp - "Search Engines" - http_transaction - - 2696581500064586450 2901306739654139904 www.bing.com - 551 - - - 28a2c9bd18a11de089ef85a160da29e4 NotAvailable No - "NotChecked" "NotChecked" "NotChecked" "NotChecked" NotChecked NotChecked NotChecked NotChecked NotChecked 
NotChecked "NotChecked" NotChecked No No NotChecked NotChecked NotChecked No Allow "Established" None "NotEstablished" 10.5.78.159 69.192.139.97 www.bing.com Sni "Search Engines" Decrypt - TLSv1.2 ECDHE-RSA-AES256-GCM-SHA384 NotChecked NotChecked 208.185.23.18 "US-ATL2" 10.5.78.159 25941 69.192.139.97 443 - - 10.144.54.201 842 - - - - - https://www.bing.com/client/config?cc=US&setlang=en-US /client/config HTTP1.1 200 "Search Engines" 58 low "Consumer,Unsanctioned" - - - - "Browse" - - - - - - - - - - - - "2024-08-05 11:24:00" "allow" "NetskopeAllow" 10.5.78.159 204.79.197.237 www.bing.com HttpHostHeader - - - +2024-08-05 16:24:19 - 18 0 18 10.70.0.19 - "nadav@skyformation.onmicrosoft.com" PRI - - - - - - - us-west1-b-osconfig.googleapis.com * 443 - 0 "Client" - - - - - - - "US" 45.605600 -121.180700 "The Dalles" "Oregon" "97058" - - - - - 1722875059 0 10.70.0.19 - - "Technology" "Cloud Storage" http_transaction - - 2035489204758272484 0 us-west1-b-osconfig.googleapis.com - 564 "7" - - 7a15285d4efc355608b304698cd7f9ab NotAvailable No - "NotChecked" "NotChecked" "NotChecked" "NotChecked" NotChecked NotChecked NotChecked NotChecked NotChecked NotChecked "NotChecked" NotChecked No No NotChecked NotChecked NotChecked No Allow "Established" None "NotEstablished" 10.70.0.19 142.250.99.95 us-west1-b-osconfig.googleapis.com Sni "Technology, Cloud Storage" Decrypt - TLSv1.3 TLS_AES_256_GCM_SHA384 NotChecked NotChecked 34.82.190.203 "US-SEA2" 10.70.0.19 32951 142.250.99.95 443 - - - - - - - - - - - HTTP1.1 - - - - - - - - - - - - - - - - - - - - - http-malformed "NotChecked" NotChecked - - - - - - - - +2024-08-05 16:24:20 - 0 0 0 10.0.20.111 - "levente.fangli@cososys.com" - - - - - - - - achecker-alliances.eu.goskope.com - 443 - 0 "Client" - - - - - - - "RO" 46.765700 23.594300 "Cluj-Napoca" "Cluj County" "400027" - - - - - 1722875060 0 10.0.20.111 - - - - http_transaction - "HsFailure (error:14094418:SSL routines:ssl3_read_bytes:tlsv1 alert unknown ca)" 1350739992944030464 0 achecker-alliances.eu.goskope.com - - - - - bc29aa426fc99c0be1b9be941869f88a NotAvailable No - "NotChecked" "NotChecked" "NotChecked" "NotChecked" NotChecked NotChecked NotChecked NotChecked NotChecked NotChecked "NotChecked" NotChecked No Yes NotChecked NotChecked NotChecked No Block "SSL Error - SSL Handshake Error" None "NotEstablished" - - - Unknown - Decrypt - - - NotChecked NotChecked 81.196.156.53 "AT-VIE1" 10.0.20.111 57897 31.186.239.94 443 - - - - - - - - - - - UNKNOWN - - - - - - - - - - - - - - - - - - - - - client-ssl "NotChecked" NotChecked - - - - - - - - +2024-08-05 16:24:23 - 0 0 0 10.0.20.111 - "levente.fangli@cososys.com" - - - - - - - - achecker-alliances.eu.goskope.com - 443 - 0 "Client" - - - - - - - "RO" 46.765700 23.594300 "Cluj-Napoca" "Cluj County" "400027" - - - - - 1722875063 0 10.0.20.111 - - - - http_transaction - "HsFailure (error:14094418:SSL routines:ssl3_read_bytes:tlsv1 alert unknown ca)" 1615432978285898071 0 achecker-alliances.eu.goskope.com - - - - - bc29aa426fc99c0be1b9be941869f88a NotAvailable No - "NotChecked" "NotChecked" "NotChecked" "NotChecked" NotChecked NotChecked NotChecked NotChecked NotChecked NotChecked "NotChecked" NotChecked No Yes NotChecked NotChecked NotChecked No Block "SSL Error - SSL Handshake Error" None "NotEstablished" - - - Unknown - Decrypt - - - NotChecked NotChecked 81.196.156.53 "AT-VIE1" 10.0.20.111 57897 31.186.239.94 443 - - - - - - - - - - - UNKNOWN - - - - - - - - - - - - - - - - - - - - - client-ssl "NotChecked" NotChecked - - - - - - - - diff --git 
a/x-pack/filebeat/input/azureblobstorage/testdata/txn.csv.gz b/x-pack/filebeat/input/azureblobstorage/testdata/txn.csv.gz new file mode 100644 index 0000000000000000000000000000000000000000..52e8fb20539a40e3127997a8877e3346ea1c6dfc GIT binary patch literal 2527 zcmV<52_W_#iwFP!00000|8!Mbn(H0bbhgbUzHnwq|6>2XLC4)%VlQk~5hX z8-672pb3p{=C<+y!!HguQuufp>rJT=J&Y^wJt84{jZSZ)s>1!Ywi1GuU1 zNW^-&SfG8sJs|mT1e0f`J({y);=;tRj2xl47&0TgH1dzzDdsaYsG?C7T~yJiin=Pg zsEV3@9}@r55>%t%&Z6zg#CJzrV|G@lR7Ex1_QLMC@R^BJ09OSjIM0qsk_&Q#z`h}5Vg2ZTpxj=E-JsSh9=tisTXHd6SuP)h~ZfLajROK?bu%t3Z>G+Ca(iOk>6x4M7D5x!w?6}#b;=P{O(9cn} zRCM)aCJ(i^<%p6Ggs*OJSX^Kzjr9ZEOIYo^UU~-)m(;@^y`v|na2amuach}27@z-vD9l_V}`#yRRBdnDHW2YPaiJ6tYDfd9TlNM^3p&sZOZ`R-AEh#wip6xKRu^l@ui93>YPn zC{36&xGRVhPGr)pQQ)aoh}0*B5uyi@}YZ z^{Wf#)4K|ioA7trbvBcP!pr;CH(@ux?L3B~=YSi9IEPR1H3|J_y#+=XM#tb(u=Bm% z{)w_AoYLYIl$%ZG?%m>TJ7zLhBro9Q+u8i)`ZaXzgM%wqKZI8>S@+@5Wh$EIGRN8| zhsrK(-+nfmHY=Ageej+@iltLXEc$P2JGQ}RABW)2psJwi zQz5In*7-53gI~6*KmH37A#x0q7dlsptHN-ldCnz8peK9d1Yv9IJ`77E9YaWAdQ0X37LiI6t6mGNf*^V2cloVK5Y9x_H(^%vzJPilU zx%*@XT$Q5AL^sbgF^WP&2$wGy{d;`rwrw5H7W)*L;#@093v4J=@LX$A(43-;G>G(Z ze@(jZ)F6onLsB{0*K3&iRqNdlDTv+lVZ>$|Y#k}UE###zh15)9>V=RR{}k(@ET@Ia z@kp@TV1B1$q#z~Ro1+75d2JRA)0VNuQEl8(OUnw{SZpjXcKwa^vt2Hm*F# z|8rBHur@;gW?>j-&Ni+;g!=ure*h9uW=ReP001A02mk;800003ol{MZ+aM6VSL%Nl zajXgg3^wekRyOKZ>c^^f&la)qht@HXVYAA=FHR24M%gy1#P7RhgVRI`r8yOuR6OHCi3odxR17i|fU*;{%rME7Y=mv=L`H3DlTeXr zRX16y%?7@^knhbfn$2kAV&NJ~tKQa6wn4Wp{^b9gz72PJ{)q(nIJy@=YagPP!S^Si z$sVC!RnLaQ1SeclqBYU-mg_C1;e47O;bNHgww@2W5bv@4TI?A~mT#4>i;By%`n>v9 z6%Tn5hk0E`B8cLV7$wmFiz{)l&iO0K3D=Tc5I?#5T0E5HW3=)cyVqi|Bi@>E`3L_p zn2Wae?|uVN)}`m^0{{RYiwFP!000001MO2yZxb;Py(94-ET7=8mcQ0(=R^rXMW9ro z6vQd=I!?2!v7@y&DF43RO%SwAqzDd3sG7^nc>LbXXdcCs7g!d!1Vl_IpHK`6fJY-3 zVMH+^gybA_c58jIXr=wFYhIZUL)=AV!grXT@0&JjYg*eC+POx1V?Ifhti}7z4^ssW`)9egCbP>rF5k?jtY_y-&Lwaf~soe#ase zrLroqcs$bH)+ucksnb4clbYZM?#uXAH?Hq2cwxIPbQ8j{;+)9I_4NnPg+4Xj#z~Bh zEjp{0i+Y!GoLt-zpq=d!=zZ(M4-brf2?1dxu9#AaW)#dY7d&@!&X#^!ZK%?kQ?W8i z8C+XZtJciWLUy;gJ1rix~Jj5R~yfP0#Er7$S zXT$dMEQ5ck{W7j~Yxmg!-P@6G7E>O(iR*!LzptCa`U~*Jnr^pAhx~C;Nf1#2ky4 Date: Tue, 8 Oct 2024 10:56:24 +0530 Subject: [PATCH 2/7] [filebeat][GCS] - Fixed failed job handling and removed false-positive error logs (#41142) [filebeat][GCS] fix failed job handling and remove false-positive error logs In cases where GCS report an error, we were logging the error but not otherwise making use of the information. When GCS was reporting that a requested object does note exist this was causing unnecessary log-spam and not updating the set of failed jobs to remove the object, resulting in future futile re-attempts to collect the object. When any other error was reported, the set of failed jobs was not being updated, resulting in continued re-attempts, even past the maximum retry count. This fixes both cases by differentiating the two situations, logging only at debug level when the object is reported to be missing and removing the object from the failed rework set, and logging at error and increasing the failure count for all other error cases. 
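
For reference, the distinction relies on the sentinel error exposed by the GCS
client library. A minimal standalone sketch of the two cases (bucket and object
names are placeholders, not part of this change):

    package main

    import (
        "context"
        "errors"
        "fmt"
        "log"

        "cloud.google.com/go/storage"
    )

    func main() {
        ctx := context.Background()
        client, err := storage.NewClient(ctx)
        if err != nil {
            log.Fatal(err)
        }
        defer client.Close()

        // Check the attributes of an object that may no longer exist.
        _, err = client.Bucket("my-bucket").Object("missing.json").Attrs(ctx)
        switch {
        case errors.Is(err, storage.ErrObjectNotExist):
            // The object is gone: drop it from the failed-job set instead of
            // retrying, and log at debug level only.
            fmt.Println("object no longer exists; remove the failed job")
        case err != nil:
            // Any other error: keep the job, count the retry so it ages out
            // after the maximum retry count, and log at error level.
            fmt.Println("error while validating the object:", err)
        }
    }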
--- CHANGELOG.next.asciidoc | 2 +- x-pack/filebeat/input/gcs/scheduler.go | 12 +++++++++++- x-pack/filebeat/input/gcs/state.go | 8 ++++++++ 3 files changed, 20 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index b36bf002ffe..395dd313dec 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -172,7 +172,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix long filepaths in diagnostics exceeding max path limits on Windows. {pull}40909[40909] - Add backup and delete for AWS S3 polling mode feature back. {pull}41071[41071] - Fix a bug in Salesforce input to only handle responses with 200 status code {pull}41015[41015] - +- Fixed failed job handling and removed false-positive error logs in the GCS input. {pull}41142[41142] *Heartbeat* diff --git a/x-pack/filebeat/input/gcs/scheduler.go b/x-pack/filebeat/input/gcs/scheduler.go index 45e7585c14e..ef1bebd083d 100644 --- a/x-pack/filebeat/input/gcs/scheduler.go +++ b/x-pack/filebeat/input/gcs/scheduler.go @@ -6,6 +6,7 @@ package gcs import ( "context" + "errors" "fmt" "slices" "sort" @@ -212,7 +213,16 @@ func (s *scheduler) addFailedJobs(ctx context.Context, jobs []*job) []*job { if !jobMap[name] { obj, err := s.bucket.Object(name).Attrs(ctx) if err != nil { - s.log.Errorf("adding failed job %s to job list caused an error: %v", name, err) + if errors.Is(err, storage.ErrObjectNotExist) { + // if the object is not found in the bucket, then remove it from the failed job list + s.state.deleteFailedJob(name) + s.log.Debugf("scheduler: failed job %s not found in bucket %s", name, s.src.BucketName) + } else { + // if there is an error while validating the object, + // then update the failed job retry count and work towards natural removal + s.state.updateFailedJobs(name) + s.log.Errorf("scheduler: adding failed job %s to job list caused an error: %v", name, err) + } continue } diff --git a/x-pack/filebeat/input/gcs/state.go b/x-pack/filebeat/input/gcs/state.go index 59f79fce471..ea04edcae90 100644 --- a/x-pack/filebeat/input/gcs/state.go +++ b/x-pack/filebeat/input/gcs/state.go @@ -79,6 +79,14 @@ func (s *state) updateFailedJobs(jobName string) { s.mu.Unlock() } +// deleteFailedJob, deletes a failed job from the failedJobs map +// this is used when a job no longer exists in the bucket or gets expired +func (s *state) deleteFailedJob(jobName string) { + s.mu.Lock() + delete(s.cp.FailedJobs, jobName) + s.mu.Unlock() +} + // setCheckpoint, sets checkpoint from source to current state instance // If for some reason the current state is empty, assigns new states as // a fail safe mechanism From 4c1489d67c3f6e0903bcfe6fb05cbdc5ab54d979 Mon Sep 17 00:00:00 2001 From: Vinit Chauhan Date: Tue, 8 Oct 2024 02:58:38 -0400 Subject: [PATCH 3/7] Update Ubuntu 20.04 with 24.04 for Docker base images (#40942) * Replace Ubuntu 20.04 with 22.04 for Docker base images * Added link to pull request in CHANGELOG.next.asciidoc * update to ubuntu:24.04 Co-authored-by: Julien Lind * remove default ubuntu user in 24.04 while generating container. 
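
The user removal corresponds to the Dockerfile template change below; the
ubuntu:24.04 base image ships with a default `ubuntu` user, presumably removed
here to avoid it conflicting with the Beat user added later in the template
(the FROM line in this sketch is illustrative, the RUN line mirrors the change):

    FROM ubuntu:24.04
    # Ensure the mail spool exists so `userdel -r` can remove it cleanly,
    # then delete the default "ubuntu" user that 24.04 ships with.
    RUN touch /var/mail/ubuntu && chown ubuntu /var/mail/ubuntu && userdel -r ubuntu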
--------- Co-authored-by: Mauri de Souza Meneguzzo Co-authored-by: Julien Lind Co-authored-by: Andrzej Stencel --- CHANGELOG.next.asciidoc | 2 ++ dev-tools/packaging/packages.yml | 4 ++-- dev-tools/packaging/templates/docker/Dockerfile.tmpl | 2 ++ 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 395dd313dec..22f9c0701bd 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -233,6 +233,8 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - When running under Elastic-Agent Kafka output allows dynamic topic in `topic` field {pull}40415[40415] - The script processor has a new configuration option that only uses the cached javascript sessions and prevents the creation of new javascript sessions. - Update to Go 1.22.7. {pull}41018[41018] +- Replace Ubuntu 20.04 with 24.04 for Docker base images {issue}40743[40743] {pull}40942[40942] + *Auditbeat* diff --git a/dev-tools/packaging/packages.yml b/dev-tools/packaging/packages.yml index 8c22acc9dba..1391368cf0b 100644 --- a/dev-tools/packaging/packages.yml +++ b/dev-tools/packaging/packages.yml @@ -159,7 +159,7 @@ shared: - &docker_spec <<: *binary_spec extra_vars: - from: '--platform=linux/amd64 ubuntu:20.04' + from: '--platform=linux/amd64 ubuntu:24.04' buildFrom: '--platform=linux/amd64 cgr.dev/chainguard/wolfi-base' user: '{{ .BeatName }}' linux_capabilities: '' @@ -172,7 +172,7 @@ shared: - &docker_arm_spec <<: *docker_spec extra_vars: - from: '--platform=linux/arm64 ubuntu:20.04' + from: '--platform=linux/arm64 ubuntu:24.04' buildFrom: '--platform=linux/arm64 cgr.dev/chainguard/wolfi-base' - &docker_ubi_spec diff --git a/dev-tools/packaging/templates/docker/Dockerfile.tmpl b/dev-tools/packaging/templates/docker/Dockerfile.tmpl index f8848640079..85904ffe5dd 100644 --- a/dev-tools/packaging/templates/docker/Dockerfile.tmpl +++ b/dev-tools/packaging/templates/docker/Dockerfile.tmpl @@ -57,6 +57,8 @@ RUN for iter in {1..10}; do \ {{- end }} {{- if contains .from "ubuntu" }} +RUN touch /var/mail/ubuntu && chown ubuntu /var/mail/ubuntu && userdel -r ubuntu + RUN for iter in {1..10}; do \ apt-get update -y && \ DEBIAN_FRONTEND=noninteractive apt-get install --no-install-recommends --yes ca-certificates curl gawk libcap2-bin xz-utils && \ From dc94114a914017843fb8d4eaa76de3b61a6787cd Mon Sep 17 00:00:00 2001 From: VihasMakwana <121151420+VihasMakwana@users.noreply.github.com> Date: Tue, 8 Oct 2024 13:00:10 +0530 Subject: [PATCH 4/7] [system/process]: mark module as healthy if metrics are partially filled for single process as well. 
(#40924) * fix: mark module as healthy if metrics are partially filled for single processes * chore: fix lint * chore: wrap the error and return * chore: fix lint --------- Co-authored-by: subham sarkar --- metricbeat/module/system/process/process.go | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/metricbeat/module/system/process/process.go b/metricbeat/module/system/process/process.go index 01c8480656d..f84c0b6027a 100644 --- a/metricbeat/module/system/process/process.go +++ b/metricbeat/module/system/process/process.go @@ -57,7 +57,10 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { return nil, err } - sys := base.Module().(resolve.Resolver) + sys, ok := base.Module().(resolve.Resolver) + if !ok { + return nil, fmt.Errorf("resolver cannot be cast from the module") + } enableCgroups := false if runtime.GOOS == "linux" { if config.Cgroups == nil || *config.Cgroups { @@ -131,14 +134,17 @@ func (m *MetricSet) Fetch(r mb.ReporterV2) error { return err } else { proc, root, err := m.stats.GetOneRootEvent(m.setpid) - if err != nil { + if err != nil && !errors.Is(err, process.NonFatalErr{}) { + // return only if the error is fatal in nature return fmt.Errorf("error fetching pid %d: %w", m.setpid, err) + } else if (err != nil && errors.Is(err, process.NonFatalErr{})) { + err = mb.PartialMetricsError{Err: err} } + // if error is non-fatal, emit partial metrics. r.Event(mb.Event{ MetricSetFields: proc, RootFields: root, }) + return err } - - return nil } From bc242aeb6c023d0e8c259c97a88b68d7f61daac3 Mon Sep 17 00:00:00 2001 From: ShourieG <105607378+ShourieG@users.noreply.github.com> Date: Tue, 8 Oct 2024 18:55:43 +0530 Subject: [PATCH 5/7] [filebeat][GCS] - Improved documentation (#41143) --- CHANGELOG.next.asciidoc | 1 + .../filebeat/docs/inputs/input-gcs.asciidoc | 19 ++++++++++++------- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 22f9c0701bd..140b5c061cd 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -319,6 +319,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Add support to CEL for reading host environment variables. {issue}40762[40762] {pull}40779[40779] - Add CSV decoder to awss3 input. {pull}40896[40896] - Change request trace logging to include headers instead of complete request. {pull}41072[41072] +- Improved GCS input documentation. {pull}41143[41143] - Add CSV decoding capacity to azureblobstorage input {pull}40978[40978] *Auditbeat* diff --git a/x-pack/filebeat/docs/inputs/input-gcs.asciidoc b/x-pack/filebeat/docs/inputs/input-gcs.asciidoc index b73e6ca5232..ef2db8c1f05 100644 --- a/x-pack/filebeat/docs/inputs/input-gcs.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-gcs.asciidoc @@ -213,17 +213,16 @@ This is a specific subfield of a bucket. It specifies the bucket name. This attribute defines the maximum amount of time after which a bucket operation will give and stop if no response is recieved (example: reading a file / listing a file). It can be defined in the following formats : `{{x}}s`, `{{x}}m`, `{{x}}h`, here `s = seconds`, `m = minutes` and `h = hours`. The value `{{x}}` can be anything we wish. -If no value is specified for this, by default its initialized to `50 seconds`. This attribute can be specified both at the root level of the configuration as well at the bucket level. -The bucket level values will always take priority and override the root level values if both are specified. 
+If no value is specified for this, by default it is initialized to `50 seconds`. This attribute can be specified both at the root level of the configuration as well as at the bucket level. The bucket level values will always take priority and override the root level values if both are specified. The value of `bucket_timeout` that should be used depends on the size of the files and the network speed. If the timeout is too low, the input will not be able to read the file completely and `context_deadline_exceeded` errors will be seen in the logs. If the timeout is too high, the input will wait for a long time for the file to be read, which can cause the input to be slow. The ratio between the `bucket_timeout` and `poll_interval` should be considered while setting both the values. A low `poll_interval` and a very high `bucket_timeout` can cause resource utilization issues as schedule ops will be spawned every poll iteration. If previous poll ops are still running, this could result in concurrently running ops and so could cause a bottleneck over time.
 
 [id="attrib-max_workers-gcs"]
 [float]
 ==== `max_workers`
 
-This attribute defines the maximum number of workers (go routines / lightweight threads) are allocated in the worker pool (thread pool) for processing jobs
-which read contents of file. More number of workers equals a greater amount of concurrency achieved. There is an upper cap of `5000` workers per bucket that
-can be defined due to internal sdk constraints. This attribute can be specified both at the root level of the configuration as well at the bucket level.
-The bucket level values will always take priority and override the root level values if both are specified.
+This attribute defines the maximum number of workers (goroutines / lightweight threads) that are allocated in the worker pool (thread pool) for processing jobs which read the contents of files. This attribute can be specified both at the root level of the configuration and at the bucket level. Bucket level values override the root level values if both are specified. A larger number of workers does not necessarily improve throughput, and this should be carefully tuned based on the number of files, the size of the files being processed and the resources available. Increasing `max_workers` to very high values may cause resource utilization problems and can lead to a bottleneck in processing. Usually a maximum cap of `2000` workers is recommended. A very low `max_workers` count will drastically increase the number of network calls required to fetch the objects, which can cause a bottleneck in processing.
+
+NOTE: The value of `max_workers` is currently tied to the `batch_size` to ensure an even distribution of workloads across all goroutines. This ensures that the input is able to process the files in an efficient manner. The `batch_size` determines how many objects will be fetched in one single call. The `max_workers` value should be set based on the number of files to be read, the resources available and the network speed. For example, `max_workers=3` means that with every pagination request a total of `3` GCS objects are fetched and distributed among `3` goroutines, while `max_workers=100` means that `100` GCS objects are fetched in every pagination request and distributed among `100` goroutines.
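+
+For example, a root level `max_workers` can be combined with a bucket level override as shown below (an illustrative snippet only; the project, credential and bucket values are placeholders):
+
+[source,yaml]
+----
+filebeat.inputs:
+- type: gcs
+  project_id: my_project_id
+  auth.credentials_file.path: "/path/to/credentials.json"
+  max_workers: 10
+  buckets:
+  - name: obs-bucket
+    max_workers: 5
+----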
+ [id="attrib-poll-gcs"] [float] @@ -241,7 +240,9 @@ This attribute defines the maximum amount of time after which the internal sched defined in the following formats : `{{x}}s`, `{{x}}m`, `{{x}}h`, here `s = seconds`, `m = minutes` and `h = hours`. The value `{{x}}` can be anything we wish. Example : `10s` would mean we would like the polling to occur every 10 seconds. If no value is specified for this, by default its initialized to `300 seconds`. This attribute can be specified both at the root level of the configuration as well at the bucket level. The bucket level values will always take priority -and override the root level values if both are specified. +and override the root level values if both are specified. The `poll_interval` should be set to a value that is equal to the `bucket_timeout` value. This would ensure that another schedule operation is not started before the current buckets have all been processed. If the `poll_interval` is set to a value that is less than the `bucket_timeout`, then the input will start another schedule operation before the current one has finished, which can cause a bottleneck over time. Having a lower `poll_interval` can make the input faster at the cost of more resource utilization. + +NOTE: Some edge case scenarios could require different values for `poll_interval` and `bucket_timeout`. For example, if the files are very large and the network speed is slow, then the `bucket_timeout` value should be set to a higher value than the `poll_interval`. This would ensure that polling operation does not wait too long for the files to be read and moves to the next iteration while the current one is still being processed. This would ensure a higher throughput and better resource utilization. [id="attrib-parse_json"] [float] @@ -276,6 +277,8 @@ filebeat.inputs: - regex: '/Security-Logs/' ---- +The `file_selectors` operation is performed within the agent locally, hence using this option will cause the agent to download all the files and then filter them. This can cause a bottleneck in processing if the number of files is very high. It is recommended to use this attribute only when the number of files is limited or ample resources are available. + [id="attrib-expand_event_list_from_field-gcs"] [float] ==== `expand_event_list_from_field` @@ -341,6 +344,8 @@ filebeat.inputs: timestamp_epoch: 1630444800 ---- +The GCS APIs don't provide a direct way to filter files based on the timestamp, so the input will download all the files and then filter them based on the timestamp. This can cause a bottleneck in processing if the number of files are very high. It is recommended to use this attribute only when the number of files are limited or ample resources are available. This option scales vertically and not horizontally. 
+ [id="bucket-overrides"] *The sample configs below will explain the bucket level overriding of attributes a bit further :-* From 36327a4b2841f20e4685c308469df356f04415a6 Mon Sep 17 00:00:00 2001 From: kaiyan-sheng Date: Tue, 8 Oct 2024 08:43:23 -0600 Subject: [PATCH 6/7] [AWS] Use namespace for GetListMetrics when exists (#41022) * Use namespace for GetListMetrics when exists --- CHANGELOG.next.asciidoc | 1 + metricbeat/docs/modules/aws.asciidoc | 3 ++- .../metricbeat/module/aws/_meta/docs.asciidoc | 3 ++- .../module/aws/cloudwatch/cloudwatch.go | 20 +++++++++++++++---- 4 files changed, 21 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 140b5c061cd..7b0ced75159 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -202,6 +202,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Remove excessive info-level logs in cgroups setup {pull}40491[40491] - Add missing ECS Cloud fields in GCP `metrics` metricset when using `exclude_labels: true` {issue}40437[40437] {pull}40467[40467] - Add AWS OwningAccount support for cross account monitoring {issue}40570[40570] {pull}40691[40691] +- Use namespace for GetListMetrics when exists in AWS {pull}41022[41022] - Fix http server helper SSL config. {pull}39405[39405] *Osquerybeat* diff --git a/metricbeat/docs/modules/aws.asciidoc b/metricbeat/docs/modules/aws.asciidoc index 20b6854a834..0ee7f601052 100644 --- a/metricbeat/docs/modules/aws.asciidoc +++ b/metricbeat/docs/modules/aws.asciidoc @@ -329,7 +329,8 @@ GetMetricData max page size: 100, based on https://docs.aws.amazon.com/AmazonClo | IAM ListAccountAliases | 1 | Once on startup | STS GetCallerIdentity | 1 | Once on startup | EC2 DescribeRegions| 1 | Once on startup -| CloudWatch ListMetrics | Total number of results / ListMetrics max page size | Per region per collection period +| CloudWatch ListMetrics without specifying namespace in configuration | Total number of results / ListMetrics max page size | Per region per collection period +| CloudWatch ListMetrics with specific namespaces in configuration | Total number of results / ListMetrics max page size * number of unique namespaces | Per region per collection period | CloudWatch GetMetricData | Total number of results / GetMetricData max page size | Per region per namespace per collection period |=== `billing`, `ebs`, `elb`, `sns`, `usage` and `lambda` are the same as `cloudwatch` metricset. 
diff --git a/x-pack/metricbeat/module/aws/_meta/docs.asciidoc b/x-pack/metricbeat/module/aws/_meta/docs.asciidoc index 223f90e0e24..0b224bf5630 100644 --- a/x-pack/metricbeat/module/aws/_meta/docs.asciidoc +++ b/x-pack/metricbeat/module/aws/_meta/docs.asciidoc @@ -317,7 +317,8 @@ GetMetricData max page size: 100, based on https://docs.aws.amazon.com/AmazonClo | IAM ListAccountAliases | 1 | Once on startup | STS GetCallerIdentity | 1 | Once on startup | EC2 DescribeRegions| 1 | Once on startup -| CloudWatch ListMetrics | Total number of results / ListMetrics max page size | Per region per collection period +| CloudWatch ListMetrics without specifying namespace in configuration | Total number of results / ListMetrics max page size | Per region per collection period +| CloudWatch ListMetrics with specific namespaces in configuration | Total number of results / ListMetrics max page size * number of unique namespaces | Per region per collection period | CloudWatch GetMetricData | Total number of results / GetMetricData max page size | Per region per namespace per collection period |=== `billing`, `ebs`, `elb`, `sns`, `usage` and `lambda` are the same as `cloudwatch` metricset. diff --git a/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go b/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go index dde2463ea85..ed043e8c38f 100644 --- a/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go +++ b/x-pack/metricbeat/module/aws/cloudwatch/cloudwatch.go @@ -179,10 +179,22 @@ func (m *MetricSet) Fetch(report mb.ReporterV2) error { continue } - // retrieve all the details for all the metrics available in the current region - listMetricsOutput, err := aws.GetListMetricsOutput("*", regionName, m.Period, m.IncludeLinkedAccounts, m.OwningAccount, m.MonitoringAccountID, svcCloudwatch) - if err != nil { - m.Logger().Errorf("Error while retrieving the list of metrics for region %s: %w", regionName, err) + // retrieve all the details for all the metrics available in the current region when no namespace is specified + // otherwise only retrieve metrics from the specific namespaces from the config + var listMetricsOutput []aws.MetricWithID + if len(namespaceDetailTotal) == 0 { + listMetricsOutput, err = aws.GetListMetricsOutput("*", regionName, m.Period, m.IncludeLinkedAccounts, m.OwningAccount, m.MonitoringAccountID, svcCloudwatch) + if err != nil { + m.Logger().Errorf("Error while retrieving the list of metrics for region %s and namespace %s: %w", regionName, "*", err) + } + } else { + for namespace := range namespaceDetailTotal { + listMetricsOutputPerNamespace, err := aws.GetListMetricsOutput(namespace, regionName, m.Period, m.IncludeLinkedAccounts, m.OwningAccount, m.MonitoringAccountID, svcCloudwatch) + if err != nil { + m.Logger().Errorf("Error while retrieving the list of metrics for region %s and namespace %s: %w", regionName, namespace, err) + } + listMetricsOutput = append(listMetricsOutput, listMetricsOutputPerNamespace...) + } } if len(listMetricsOutput) == 0 { From 6339a4292da1bfcf5a33c8593b41556efbbdad86 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Tue, 8 Oct 2024 13:54:53 -0300 Subject: [PATCH 7/7] chore: update ksm to v2.12.0 (#41051) We have test files for ksm v2.12.0 but the module manifest still requires v2.7.0. 
--- metricbeat/module/kubernetes/kubernetes.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metricbeat/module/kubernetes/kubernetes.yml b/metricbeat/module/kubernetes/kubernetes.yml index 1ce2c581bc9..7ce36d136df 100644 --- a/metricbeat/module/kubernetes/kubernetes.yml +++ b/metricbeat/module/kubernetes/kubernetes.yml @@ -43,7 +43,7 @@ spec: spec: containers: - name: kube-state-metrics - image: registry.k8s.io/kube-state-metrics/kube-state-metrics:v2.7.0 + image: registry.k8s.io/kube-state-metrics/kube-state-metrics:v2.12.0 livenessProbe: httpGet: path: /healthz