diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index f96d59f2093..7814c36b478 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -347,7 +347,9 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Add support for Journald in the System module. {pull}41555[41555] - Add ability to remove request trace logs from http_endpoint input. {pull}40005[40005] - Add ability to remove request trace logs from entityanalytics input. {pull}40004[40004] +- Refactor & cleanup with updates to default values and documentation. {pull}41834[41834] - Update CEL mito extensions to v1.16.0. {pull}41727[41727] +- Add evaluation state dump debugging option to CEL input. {pull}41335[41335] *Auditbeat* diff --git a/metricbeat/module/system/raid/blockinfo/getdev.go b/metricbeat/module/system/raid/blockinfo/getdev.go index 02527c80636..7c92b0fc881 100644 --- a/metricbeat/module/system/raid/blockinfo/getdev.go +++ b/metricbeat/module/system/raid/blockinfo/getdev.go @@ -19,14 +19,15 @@ package blockinfo import ( "fmt" - "io/ioutil" "os" "path/filepath" + + "github.com/elastic/beats/v7/metricbeat/mb" ) // ListAll lists all the multi-disk devices in a RAID array func ListAll(path string) ([]MDDevice, error) { - dir, err := ioutil.ReadDir(path) + dir, err := os.ReadDir(path) if err != nil { return nil, fmt.Errorf("could not read directory: %w", err) } @@ -44,7 +45,7 @@ func ListAll(path string) ([]MDDevice, error) { } if len(mds) == 0 { - return nil, fmt.Errorf("no matches from path %s", path) + return nil, mb.PartialMetricsError{Err: fmt.Errorf("no RAID devices found. You have probably enabled the RAID metrics on a non-RAID system.")} } return mds, nil @@ -69,8 +70,5 @@ func getMDDevice(path string) (MDDevice, error) { // Right now, we're doing this by looking for an `md` directory in the device dir. func isMD(path string) bool { _, err := os.Stat(filepath.Join(path, "md")) - if err != nil { - return false - } - return true + return err == nil } diff --git a/metricbeat/module/system/raid/raid.go b/metricbeat/module/system/raid/raid.go index 191027657d7..7b07e36c4d2 100644 --- a/metricbeat/module/system/raid/raid.go +++ b/metricbeat/module/system/raid/raid.go @@ -41,8 +41,11 @@ type MetricSet struct { // New creates a new instance of the raid metricset. func New(base mb.BaseMetricSet) (mb.MetricSet, error) { + sys, ok := base.Module().(resolve.Resolver) + if !ok { + return nil, fmt.Errorf("unexpected module type: %T", base.Module()) + } - sys := base.Module().(resolve.Resolver) return &MetricSet{ BaseMetricSet: base, @@ -62,7 +65,7 @@ func blockto1024(b int64) int64 { func (m *MetricSet) Fetch(r mb.ReporterV2) error { devices, err := blockinfo.ListAll(m.mod.ResolveHostFS("/sys/block")) if err != nil { - return fmt.Errorf("failed to parse sysfs: %w", err) + return fmt.Errorf("failed to list RAID devices: %w", err) } for _, blockDev := range devices { diff --git a/metricbeat/module/system/raid/raid_test.go b/metricbeat/module/system/raid/raid_test.go index 4c35394413a..28b3358f685 100644 --- a/metricbeat/module/system/raid/raid_test.go +++ b/metricbeat/module/system/raid/raid_test.go @@ -18,10 +18,14 @@ package raid import ( + "errors" + "os" + "path/filepath" "testing" "github.com/stretchr/testify/assert" + "github.com/elastic/beats/v7/metricbeat/mb" mbtest "github.com/elastic/beats/v7/metricbeat/mb/testing" _ "github.com/elastic/beats/v7/metricbeat/module/system" ) @@ -46,6 +50,22 @@ func TestFetch(t *testing.T) { events[0].BeatEvent("system", "raid").Fields.StringToPrint()) } +func TestFetchNoRAID(t *testing.T) { + // Ensure that we return partial metrics when no RAID devices are present. + tmpDir := t.TempDir() + assert.NoError(t, os.MkdirAll(filepath.Join(tmpDir, "sys/block"), 0755)) + c := getConfig() + c["hostfs"] = tmpDir + + f := mbtest.NewReportingMetricSetV2Error(t, c) + events, errs := mbtest.ReportingFetchV2Error(f) + + assert.Len(t, errs, 1) + assert.ErrorAs(t, errors.Join(errs...), &mb.PartialMetricsError{}) + assert.Contains(t, errors.Join(errs...).Error(), "failed to list RAID devices: no RAID devices found. You have probably enabled the RAID metrics on a non-RAID system.") + assert.Empty(t, events) +} + func getConfig() map[string]interface{} { return map[string]interface{}{ "module": "system", diff --git a/x-pack/filebeat/docs/inputs/input-cel.asciidoc b/x-pack/filebeat/docs/inputs/input-cel.asciidoc index a96e8df5f3d..8e062025b24 100644 --- a/x-pack/filebeat/docs/inputs/input-cel.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-cel.asciidoc @@ -795,6 +795,26 @@ This specifies fields in the `state` to be redacted prior to debug logging. Fiel This specifies whether fields should be replaced with a `*` or deleted entirely from messages sent to debug logs. If delete is `true`, fields will be deleted rather than replaced. +[float] +==== `failure_dump.enabled` + +It is possible to log CEL program evaluation failures to a local file-system for debugging configurations. +This option is enabled by setting `failure_dump.enabled` to true and setting the `failure_dump.filename` value. +To delete existing failure dumps, set `failure_dump.enabled` to false without unsetting the filename option. + +Enabling this option compromises security and should only be used for debugging. + +[float] +==== `failure_dump.filename` + +This specifies a directory path to write failure dumps to. If it is not empty and a CEL program evaluation fails, +the complete set of states for the CEL program's evaluation will be written as a JSON file, along with the error +that was reported. This option should only be used when debugging a failure as it imposes a significant performance +impact on the input and may potentially use large quantities of memory to hold the full set of states. If a failure +dump is configured, it is recommended that data input sizes be reduced to avoid excessive memory consumption, and +making dumps that are intractable to analysis. To delete existing failure dumps, set `failure_dump.enabled` to +false without unsetting the filename option. + [float] === Metrics diff --git a/x-pack/filebeat/docs/inputs/input-gcs.asciidoc b/x-pack/filebeat/docs/inputs/input-gcs.asciidoc index 23ac0e021c6..2a762ddec18 100644 --- a/x-pack/filebeat/docs/inputs/input-gcs.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-gcs.asciidoc @@ -10,9 +10,7 @@ ++++ Use the `google cloud storage input` to read content from files stored in buckets which reside on your Google Cloud. -The input can be configured to work with and without polling, though currently, if polling is disabled it will only -perform a one time passthrough, list the file contents and end the process. Polling is generally recommented for most cases -even though it can get expensive with dealing with a very large number of files. +The input can be configured to work with and without polling, though if polling is disabled it will only perform a single collection of data, list the file contents and end the process. *To mitigate errors and ensure a stable processing environment, this input employs the following features :* @@ -66,12 +64,11 @@ many buckets as we deem fit. We are also able to configure the attributes `max_w then be applied to all buckets which do not specify any of these attributes explicitly. NOTE: If the attributes `max_workers`, `poll`, `poll_interval` and `bucket_timeout` are specified at the root level, these can still be overridden at the bucket level with -different values, thus offering extensive flexibility and customization. Examples <> show this behaviour. +different values, thus offering extensive flexibility and customization. Examples <> show this behavior. On receiving this config the google cloud storage input will connect to the service and retrieve a `Storage Client` using the given `bucket_name` and `auth.credentials_file`, then it will spawn two main go-routines, one for each bucket. After this each of these routines (threads) will initialize a scheduler -which will in turn use the `max_workers` value to initialize an in-memory worker pool (thread pool) with `3` `workers` available. Basically that equates to two instances of a worker pool, -one per bucket, each having 3 workers. These `workers` will be responsible for performing `jobs` that process a file (in this case read and output the contents of a file). +which will in turn use the `max_workers` value to initialize an in-memory worker pool (thread pool) with `3` `workers` available. Basically that equates to two instances of a worker pool, one per bucket, each having 3 workers. These `workers` will be responsible for performing `jobs` that process a file (in this case read and output the contents of a file). NOTE: The scheduler is responsible for scheduling jobs, and uses the `maximum available workers` in the pool, at each iteration, to decide the number of files to retrieve and process. This keeps work distribution efficient. The scheduler uses `poll_interval` attribute value to decide how long to wait after each iteration. The `bucket_timeout` value is used to timeout calls to the bucket list api if it exceeds the given value. Each iteration consists of processing a certain number of files, decided by the `maximum available workers` value. @@ -213,7 +210,7 @@ This is a specific subfield of a bucket. It specifies the bucket name. This attribute defines the maximum amount of time after which a bucket operation will give and stop if no response is recieved (example: reading a file / listing a file). It can be defined in the following formats : `{{x}}s`, `{{x}}m`, `{{x}}h`, here `s = seconds`, `m = minutes` and `h = hours`. The value `{{x}}` can be anything we wish. -If no value is specified for this, by default its initialized to `50 seconds`. This attribute can be specified both at the root level of the configuration as well at the bucket level. The bucket level values will always take priority and override the root level values if both are specified. The value of `bucket_timeout` that should be used depends on the size of the files and the network speed. If the timeout is too low, the input will not be able to read the file completely and `context_deadline_exceeded` errors will be seen in the logs. If the timeout is too high, the input will wait for a long time for the file to be read, which can cause the input to be slow. The ratio between the `bucket_timeout` and `poll_interval` should be considered while setting both the values. A low `poll_interval` and a very high `bucket_timeout` can cause resource utilization issues as schedule ops will be spawned every poll iteration. If previous poll ops are still running, this could result in concurrently running ops and so could cause a bottleneck over time. +If no value is specified for this, by default its initialized to `120 seconds`. This attribute can be specified both at the root level of the configuration as well at the bucket level. The bucket level values will always take priority and override the root level values if both are specified. The value of `bucket_timeout` that should be used depends on the size of the files and the network speed. If the timeout is too low, the input will not be able to read the file completely and `context_deadline_exceeded` errors will be seen in the logs. If the timeout is too high, the input will wait for a long time for the file to be read, which can cause the input to be slow. The ratio between the `bucket_timeout` and `poll_interval` should be considered while setting both the values. A low `poll_interval` and a very high `bucket_timeout` can cause resource utilization issues as schedule ops will be spawned every poll iteration. If previous poll ops are still running, this could result in concurrently running ops and so could cause a bottleneck over time. [id="attrib-max_workers-gcs"] [float] @@ -228,9 +225,8 @@ NOTE: The value of `max_workers` is tied to the `batch_size` currently to ensure [float] ==== `poll` -This attribute informs the scheduler whether to keep polling for new files or not. Default value of this is `false`, so it will not keep polling if not explicitly -specified. This attribute can be specified both at the root level of the configuration as well at the bucket level. The bucket level values will always -take priority and override the root level values if both are specified. +This attribute informs the scheduler whether to keep polling for new files or not. Default value of this is set to `true`. This attribute can be specified both at the +root level of the configuration as well at the bucket level. The bucket level values will always take priority and override the root level values if both are specified. [id="attrib-poll_interval-gcs"] [float] @@ -238,7 +234,7 @@ take priority and override the root level values if both are specified. This attribute defines the maximum amount of time after which the internal scheduler will make the polling call for the next set of objects/files. It can be defined in the following formats : `{{x}}s`, `{{x}}m`, `{{x}}h`, here `s = seconds`, `m = minutes` and `h = hours`. The value `{{x}}` can be anything we wish. -Example : `10s` would mean we would like the polling to occur every 10 seconds. If no value is specified for this, by default its initialized to `300 seconds`. +Example : `10s` would mean we would like the polling to occur every 10 seconds. If no value is specified for this, by default its initialized to `5 minutes`. This attribute can be specified both at the root level of the configuration as well at the bucket level. The bucket level values will always take priority and override the root level values if both are specified. The `poll_interval` should be set to a value that is equal to the `bucket_timeout` value. This would ensure that another schedule operation is not started before the current buckets have all been processed. If the `poll_interval` is set to a value that is less than the `bucket_timeout`, then the input will start another schedule operation before the current one has finished, which can cause a bottleneck over time. Having a lower `poll_interval` can make the input faster at the cost of more resource utilization. diff --git a/x-pack/filebeat/input/cel/config.go b/x-pack/filebeat/input/cel/config.go index b04b7845719..aee095b199b 100644 --- a/x-pack/filebeat/input/cel/config.go +++ b/x-pack/filebeat/input/cel/config.go @@ -58,6 +58,9 @@ type config struct { // Resource is the configuration for establishing an // HTTP request or for locating a local resource. Resource *ResourceConfig `config:"resource" validate:"required"` + + // FailureDump configures failure dump behaviour. + FailureDump *dumpConfig `config:"failure_dump"` } type redact struct { @@ -69,6 +72,19 @@ type redact struct { Delete bool `config:"delete"` } +// dumpConfig configures the CEL program to retain +// the full evaluation state using the cel.OptTrackState +// option. The state is written to a file in the path if +// the evaluation fails. +type dumpConfig struct { + Enabled *bool `config:"enabled"` + Filename string `config:"filename"` +} + +func (t *dumpConfig) enabled() bool { + return t != nil && (t.Enabled == nil || *t.Enabled) +} + func (c config) Validate() error { if c.Redact == nil { logp.L().Named("input.cel").Warn("missing recommended 'redact' configuration: " + @@ -89,7 +105,8 @@ func (c config) Validate() error { if len(c.Regexps) != 0 { patterns = map[string]*regexp.Regexp{".": nil} } - _, _, err = newProgram(context.Background(), c.Program, root, nil, &http.Client{}, nil, nil, patterns, c.XSDs, logp.L().Named("input.cel"), nil) + wantDump := c.FailureDump.enabled() && c.FailureDump.Filename != "" + _, _, err = newProgram(context.Background(), c.Program, root, nil, &http.Client{}, nil, nil, patterns, c.XSDs, logp.L().Named("input.cel"), nil, wantDump) if err != nil { return fmt.Errorf("failed to check program: %w", err) } diff --git a/x-pack/filebeat/input/cel/input.go b/x-pack/filebeat/input/cel/input.go index ff4f1dccf51..97ab8c9bee0 100644 --- a/x-pack/filebeat/input/cel/input.go +++ b/x-pack/filebeat/input/cel/input.go @@ -10,6 +10,7 @@ package cel import ( "compress/gzip" "context" + "encoding/json" "errors" "fmt" "io" @@ -166,7 +167,8 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p Password: cfg.Auth.Basic.Password, } } - prg, ast, err := newProgram(ctx, cfg.Program, root, getEnv(cfg.AllowedEnvironment), client, limiter, auth, patterns, cfg.XSDs, log, trace) + wantDump := cfg.FailureDump.enabled() && cfg.FailureDump.Filename != "" + prg, ast, err := newProgram(ctx, cfg.Program, root, getEnv(cfg.AllowedEnvironment), client, limiter, auth, patterns, cfg.XSDs, log, trace, wantDump) if err != nil { return err } @@ -251,12 +253,25 @@ func (i input) run(env v2.Context, src *source, cursor map[string]interface{}, p log.Debugw("request state", logp.Namespace("cel"), "state", redactor{state: state, cfg: cfg.Redact}) metrics.executions.Add(1) start := i.now().In(time.UTC) - state, err = evalWith(ctx, prg, ast, state, start) + state, err = evalWith(ctx, prg, ast, state, start, wantDump) log.Debugw("response state", logp.Namespace("cel"), "state", redactor{state: state, cfg: cfg.Redact}) if err != nil { + var dump dumpError switch { case errors.Is(err, context.Canceled), errors.Is(err, context.DeadlineExceeded): return err + case errors.As(err, &dump): + path := strings.ReplaceAll(cfg.FailureDump.Filename, "*", sanitizeFileName(env.IDWithoutName)) + dir := filepath.Dir(path) + base := filepath.Base(path) + ext := filepath.Ext(base) + prefix := strings.TrimSuffix(base, ext) + path = filepath.Join(dir, prefix+"-"+i.now().In(time.UTC).Format("2006-01-02T15-04-05.000")+ext) + log.Debugw("writing failure dump file", "path", path) + err := dump.writeToFile(path) + if err != nil { + log.Errorw("failed to write failure dump", "path", path, "error", err) + } } log.Errorw("failed evaluation", "error", err) env.UpdateStatus(status.Degraded, "failed evaluation: "+err.Error()) @@ -785,6 +800,26 @@ func newClient(ctx context.Context, cfg config, log *logp.Logger, reg *monitorin } } } + if !cfg.FailureDump.enabled() && cfg.FailureDump != nil && cfg.FailureDump.Filename != "" { + // We have a fail-dump name, but we are not enabled, + // so remove all dumps we own. + err = os.Remove(cfg.FailureDump.Filename) + if err != nil && !errors.Is(err, fs.ErrNotExist) { + log.Errorw("failed to remove request trace log", "path", cfg.FailureDump.Filename, "error", err) + } + ext := filepath.Ext(cfg.FailureDump.Filename) + base := strings.TrimSuffix(cfg.FailureDump.Filename, ext) + paths, err := filepath.Glob(base + "-" + lumberjackTimestamp + ext) + if err != nil { + log.Errorw("failed to collect request trace log path names", "error", err) + } + for _, p := range paths { + err = os.Remove(p) + if err != nil && !errors.Is(err, fs.ErrNotExist) { + log.Errorw("failed to remove request trace log", "path", p, "error", err) + } + } + } if reg != nil { c.Transport = httpmon.NewMetricsRoundTripper(c.Transport, reg) @@ -1004,7 +1039,7 @@ func getEnv(allowed []string) map[string]string { return env } -func newProgram(ctx context.Context, src, root string, vars map[string]string, client *http.Client, limiter *rate.Limiter, auth *lib.BasicAuth, patterns map[string]*regexp.Regexp, xsd map[string]string, log *logp.Logger, trace *httplog.LoggingRoundTripper) (cel.Program, *cel.Ast, error) { +func newProgram(ctx context.Context, src, root string, vars map[string]string, client *http.Client, limiter *rate.Limiter, auth *lib.BasicAuth, patterns map[string]*regexp.Regexp, xsd map[string]string, log *logp.Logger, trace *httplog.LoggingRoundTripper, details bool) (cel.Program, *cel.Ast, error) { xml, err := lib.XML(nil, xsd) if err != nil { return nil, nil, fmt.Errorf("failed to build xml type hints: %w", err) @@ -1043,7 +1078,11 @@ func newProgram(ctx context.Context, src, root string, vars map[string]string, c return nil, nil, fmt.Errorf("failed compilation: %w", iss.Err()) } - prg, err := env.Program(ast) + var progOpts []cel.ProgramOption + if details { + progOpts = []cel.ProgramOption{cel.EvalOptions(cel.OptTrackState)} + } + prg, err := env.Program(ast, progOpts...) if err != nil { return nil, nil, fmt.Errorf("failed program instantiation: %w", err) } @@ -1065,8 +1104,8 @@ func debug(log *logp.Logger, trace *httplog.LoggingRoundTripper) func(string, an } } -func evalWith(ctx context.Context, prg cel.Program, ast *cel.Ast, state map[string]interface{}, now time.Time) (map[string]interface{}, error) { - out, _, err := prg.ContextEval(ctx, map[string]interface{}{ +func evalWith(ctx context.Context, prg cel.Program, ast *cel.Ast, state map[string]interface{}, now time.Time, details bool) (map[string]interface{}, error) { + out, det, err := prg.ContextEval(ctx, map[string]interface{}{ // Replace global program "now" with current time. This is necessary // as the lib.Time now global is static at program instantiation time // which will persist over multiple evaluations. The lib.Time behaviour @@ -1081,6 +1120,9 @@ func evalWith(ctx context.Context, prg cel.Program, ast *cel.Ast, state map[stri }) if err != nil { err = lib.DecoratedError{AST: ast, Err: err} + if details { + err = dumpError{error: err, dump: lib.NewDump(ast, det)} + } } if e := ctx.Err(); e != nil { err = e @@ -1109,6 +1151,36 @@ func evalWith(ctx context.Context, prg cel.Program, ast *cel.Ast, state map[stri } } +// dumpError is an evaluation state dump associated with an error. +type dumpError struct { + error + dump *lib.Dump +} + +func (e dumpError) writeToFile(path string) (err error) { + err = os.MkdirAll(filepath.Dir(path), 0o700) + if err != nil { + return err + } + f, err := os.Create(path) + if err != nil { + return err + } + defer func() { + err = errors.Join(err, f.Sync(), f.Close()) + }() + enc := json.NewEncoder(f) + enc.SetEscapeHTML(false) + type dump struct { + Error string `json:"error"` + State []lib.NodeValue `json:"state"` + } + return enc.Encode(dump{ + Error: e.Error(), + State: e.dump.NodeValues(), + }) +} + // clearWantMore sets the state to not request additional work in a periodic evaluation. // It leaves state intact if there is no "want_more" element, and sets the element to false // if there is. This is necessary instead of just doing delete(state, "want_more") as diff --git a/x-pack/filebeat/input/cel/input_test.go b/x-pack/filebeat/input/cel/input_test.go index 0d91710ca09..143402d9834 100644 --- a/x-pack/filebeat/input/cel/input_test.go +++ b/x-pack/filebeat/input/cel/input_test.go @@ -45,6 +45,7 @@ var inputTests = []struct { want []map[string]interface{} wantCursor []map[string]interface{} wantErr error + prepare func() error wantFile string wantNoFile string }{ @@ -1685,6 +1686,88 @@ var inputTests = []struct { }, }}, }, + { + name: "dump_no_error", + config: map[string]interface{}{ + "interval": 1, + "program": `{"events":[{"message":{"value": try(debug("divide by zero", 0/0))}}]}`, + "state": nil, + "resource": map[string]interface{}{ + "url": "", + }, + "failure_dump": map[string]interface{}{ + "enabled": true, + "filename": "failure_dumps/dump.json", + }, + }, + time: func() time.Time { return time.Date(2010, 2, 8, 0, 0, 0, 0, time.UTC) }, + wantNoFile: filepath.Join("failure_dumps", "dump-2010-02-08T00-00-00.000.json"), + want: []map[string]interface{}{{ + "message": map[string]interface{}{ + "value": "division by zero", + }, + }}, + }, + { + name: "dump_error", + config: map[string]interface{}{ + "interval": 1, + "program": `{"events":[{"message":{"value": debug("divide by zero", 0/0)}}]}`, + "state": nil, + "resource": map[string]interface{}{ + "url": "", + }, + "failure_dump": map[string]interface{}{ + "enabled": true, + "filename": "failure_dumps/dump.json", + }, + }, + time: func() time.Time { return time.Date(2010, 2, 9, 0, 0, 0, 0, time.UTC) }, + wantFile: filepath.Join("failure_dumps", "dump-2010-02-09T00-00-00.000.json"), // One day after the no dump case. + want: []map[string]interface{}{ + { + "error": map[string]interface{}{ + "message": `failed eval: ERROR: :1:58: division by zero + | {"events":[{"message":{"value": debug("divide by zero", 0/0)}}]} + | .........................................................^`, + }, + }, + }, + }, + { + name: "dump_error_delete", + config: map[string]interface{}{ + "interval": 1, + "program": `{"events":[{"message":{"value": debug("divide by zero", 0/0)}}]}`, + "state": nil, + "resource": map[string]interface{}{ + "url": "", + }, + "failure_dump": map[string]interface{}{ + "enabled": false, // We have a name but are disabled, so delete. + "filename": "failure_dumps/dump.json", + }, + }, + time: func() time.Time { return time.Date(2010, 2, 9, 0, 0, 0, 0, time.UTC) }, + prepare: func() error { + // Make a file that the configuration should delete. + err := os.MkdirAll("failure_dumps", 0o700) + if err != nil { + return err + } + return os.WriteFile(filepath.Join("failure_dumps", "dump-2010-02-09T00-00-00.000.json"), nil, 0o600) + }, + wantNoFile: filepath.Join("failure_dumps", "dump-2010-02-09T00-00-00.000.json"), // One day after the no dump case. + want: []map[string]interface{}{ + { + "error": map[string]interface{}{ + "message": `failed eval: ERROR: :1:58: division by zero + | {"events":[{"message":{"value": debug("divide by zero", 0/0)}}]} + | .........................................................^`, + }, + }, + }, + }, // not yet done from httpjson (some are redundant since they are compositional products). // @@ -1708,6 +1791,11 @@ func TestInput(t *testing.T) { os.Setenv("CELTESTENVVAR", "TESTVALUE") os.Setenv("DISALLOWEDCELTESTENVVAR", "DISALLOWEDTESTVALUE") + err := os.RemoveAll("failure_dumps") + if err != nil { + t.Fatalf("failed to remove failure_dumps directory: %v", err) + } + logp.TestingSetup() for _, test := range inputTests { t.Run(test.name, func(t *testing.T) { @@ -1718,6 +1806,13 @@ func TestInput(t *testing.T) { t.Skip("skipping remote endpoint test") } + if test.prepare != nil { + err := test.prepare() + if err != nil { + t.Fatalf("unexpected from prepare(): %v", err) + } + } + if test.server != nil { test.server(t, test.handler, test.config) } @@ -1770,6 +1865,20 @@ func TestInput(t *testing.T) { if fmt.Sprint(err) != fmt.Sprint(test.wantErr) { t.Errorf("unexpected error from running input: got:%v want:%v", err, test.wantErr) } + if test.wantFile != "" { + if _, err := os.Stat(filepath.Join(tempDir, test.wantFile)); err != nil { + t.Errorf("expected log file not found: %v", err) + } + } + if test.wantNoFile != "" { + paths, err := filepath.Glob(filepath.Join(tempDir, test.wantNoFile)) + if err != nil { + t.Fatalf("unexpected error calling filepath.Glob(%q): %v", test.wantNoFile, err) + } + if len(paths) != 0 { + t.Errorf("unexpected files found: %v", paths) + } + } if test.wantErr != nil { return } @@ -1802,20 +1911,6 @@ func TestInput(t *testing.T) { t.Errorf("unexpected cursor for event %d: got:- want:+\n%s", i, cmp.Diff(got, test.wantCursor[i])) } } - if test.wantFile != "" { - if _, err := os.Stat(filepath.Join(tempDir, test.wantFile)); err != nil { - t.Errorf("expected log file not found: %v", err) - } - } - if test.wantNoFile != "" { - paths, err := filepath.Glob(filepath.Join(tempDir, test.wantNoFile)) - if err != nil { - t.Fatalf("unexpected error calling filepath.Glob(%q): %v", test.wantNoFile, err) - } - if len(paths) != 0 { - t.Errorf("unexpected files found: %v", paths) - } - } }) } } diff --git a/x-pack/filebeat/input/gcs/client.go b/x-pack/filebeat/input/gcs/client.go index 7fd45d2d0a9..1846e08c5ab 100644 --- a/x-pack/filebeat/input/gcs/client.go +++ b/x-pack/filebeat/input/gcs/client.go @@ -12,11 +12,9 @@ import ( "cloud.google.com/go/storage" "golang.org/x/oauth2/google" "google.golang.org/api/option" - - "github.com/elastic/elastic-agent-libs/logp" ) -func fetchStorageClient(ctx context.Context, cfg config, log *logp.Logger) (*storage.Client, error) { +func fetchStorageClient(ctx context.Context, cfg config) (*storage.Client, error) { if cfg.AlternativeHost != "" { var h *url.URL h, err := url.Parse(cfg.AlternativeHost) diff --git a/x-pack/filebeat/input/gcs/config.go b/x-pack/filebeat/input/gcs/config.go index 6a7b93d5e47..64f64c69bc5 100644 --- a/x-pack/filebeat/input/gcs/config.go +++ b/x-pack/filebeat/input/gcs/config.go @@ -28,16 +28,16 @@ type config struct { // Auth - Defines the authentication mechanism to be used for accessing the gcs bucket. Auth authConfig `config:"auth"` // MaxWorkers - Defines the maximum number of go routines that will be spawned. - MaxWorkers *int `config:"max_workers,omitempty" validate:"max=5000"` + MaxWorkers int `config:"max_workers" validate:"max=5000"` // Poll - Defines if polling should be performed on the input bucket source. - Poll *bool `config:"poll,omitempty"` + Poll bool `config:"poll"` // PollInterval - Defines the maximum amount of time to wait before polling for the next batch of objects from the bucket. - PollInterval *time.Duration `config:"poll_interval,omitempty"` + PollInterval time.Duration `config:"poll_interval"` // ParseJSON - Informs the publisher whether to parse & objectify json data or not. By default this is set to // false, since it can get expensive dealing with highly nested json data. - ParseJSON *bool `config:"parse_json,omitempty"` + ParseJSON bool `config:"parse_json"` // BucketTimeOut - Defines the maximum time that the sdk will wait for a bucket api response before timing out. - BucketTimeOut *time.Duration `config:"bucket_timeout,omitempty"` + BucketTimeOut time.Duration `config:"bucket_timeout"` // Buckets - Defines a list of buckets that will be polled for objects. Buckets []bucket `config:"buckets" validate:"required"` // FileSelectors - Defines a list of regex patterns that can be used to filter out objects from the bucket. @@ -49,17 +49,17 @@ type config struct { // ExpandEventListFromField - Defines the field name that will be used to expand the event into separate events. ExpandEventListFromField string `config:"expand_event_list_from_field"` // This field is only used for system test purposes, to override the HTTP endpoint. - AlternativeHost string `config:"alternative_host,omitempty"` + AlternativeHost string `config:"alternative_host"` } // bucket contains the config for each specific object storage bucket in the root account type bucket struct { Name string `config:"name" validate:"required"` - MaxWorkers *int `config:"max_workers,omitempty" validate:"max=5000"` - BucketTimeOut *time.Duration `config:"bucket_timeout,omitempty"` - Poll *bool `config:"poll,omitempty"` - PollInterval *time.Duration `config:"poll_interval,omitempty"` - ParseJSON *bool `config:"parse_json,omitempty"` + MaxWorkers *int `config:"max_workers" validate:"max=5000"` + BucketTimeOut *time.Duration `config:"bucket_timeout"` + Poll *bool `config:"poll"` + PollInterval *time.Duration `config:"poll_interval"` + ParseJSON *bool `config:"parse_json"` FileSelectors []fileSelectorConfig `config:"file_selectors"` ReaderConfig readerConfig `config:",inline"` TimeStampEpoch *int64 `config:"timestamp_epoch"` @@ -78,13 +78,15 @@ type readerConfig struct { Decoding decoderConfig `config:"decoding"` } +// authConfig defines the authentication mechanism to be used for accessing the gcs bucket. +// If either is configured the 'omitempty' tag will prevent the other option from being serialized in the config. type authConfig struct { CredentialsJSON *jsonCredentialsConfig `config:"credentials_json,omitempty"` CredentialsFile *fileCredentialsConfig `config:"credentials_file,omitempty"` } type fileCredentialsConfig struct { - Path string `config:"path,omitempty"` + Path string `config:"path"` } type jsonCredentialsConfig struct { AccountKey string `config:"account_key"` @@ -115,3 +117,14 @@ func (c authConfig) Validate() error { return fmt.Errorf("no authentication credentials were configured or detected " + "(credentials_file, credentials_json, and application default credentials (ADC))") } + +// defaultConfig returns the default configuration for the input +func defaultConfig() config { + return config{ + MaxWorkers: 1, + Poll: true, + PollInterval: 5 * time.Minute, + BucketTimeOut: 120 * time.Second, + ParseJSON: false, + } +} diff --git a/x-pack/filebeat/input/gcs/input.go b/x-pack/filebeat/input/gcs/input.go index cc0e9ad74bb..33e46d034d7 100644 --- a/x-pack/filebeat/input/gcs/input.go +++ b/x-pack/filebeat/input/gcs/input.go @@ -50,7 +50,7 @@ func Plugin(log *logp.Logger, store cursor.StateStore) v2.Plugin { } func configure(cfg *conf.C) ([]cursor.Source, cursor.Input, error) { - config := config{} + config := defaultConfig() if err := cfg.Unpack(&config); err != nil { return nil, nil, err } @@ -78,44 +78,22 @@ func configure(cfg *conf.C) ([]cursor.Source, cursor.Input, error) { return sources, &gcsInput{config: config}, nil } -// tryOverrideOrDefault, overrides global values with local -// bucket level values if present. If both global & local values -// are absent, assigns default values +// tryOverrideOrDefault, overrides the bucket level values with global values if the bucket fields are not set func tryOverrideOrDefault(cfg config, b bucket) bucket { if b.MaxWorkers == nil { - maxWorkers := 1 - if cfg.MaxWorkers != nil { - maxWorkers = *cfg.MaxWorkers - } - b.MaxWorkers = &maxWorkers + b.MaxWorkers = &cfg.MaxWorkers } if b.Poll == nil { - var poll bool - if cfg.Poll != nil { - poll = *cfg.Poll - } - b.Poll = &poll + b.Poll = &cfg.Poll } if b.PollInterval == nil { - interval := time.Second * 300 - if cfg.PollInterval != nil { - interval = *cfg.PollInterval - } - b.PollInterval = &interval + b.PollInterval = &cfg.PollInterval } if b.ParseJSON == nil { - parse := false - if cfg.ParseJSON != nil { - parse = *cfg.ParseJSON - } - b.ParseJSON = &parse + b.ParseJSON = &cfg.ParseJSON } if b.BucketTimeOut == nil { - timeOut := time.Second * 50 - if cfg.BucketTimeOut != nil { - timeOut = *cfg.BucketTimeOut - } - b.BucketTimeOut = &timeOut + b.BucketTimeOut = &cfg.BucketTimeOut } if b.TimeStampEpoch == nil { b.TimeStampEpoch = cfg.TimeStampEpoch @@ -173,11 +151,12 @@ func (input *gcsInput) Run(inputCtx v2.Context, src cursor.Source, cancel() }() - client, err := fetchStorageClient(ctx, input.config, log) + client, err := fetchStorageClient(ctx, input.config) if err != nil { metrics.errorsTotal.Inc() return err } + bucket := client.Bucket(currentSource.BucketName).Retryer( // Use WithBackoff to change the timing of the exponential backoff. storage.WithBackoff(gax.Backoff{ diff --git a/x-pack/filebeat/input/gcs/input_stateless.go b/x-pack/filebeat/input/gcs/input_stateless.go index f56f7f35bc5..c0038bf31dc 100644 --- a/x-pack/filebeat/input/gcs/input_stateless.go +++ b/x-pack/filebeat/input/gcs/input_stateless.go @@ -88,7 +88,6 @@ func (in *statelessInput) Run(inputCtx v2.Context, publisher stateless.Publisher // Since we are only reading, the operation is always idempotent storage.WithPolicy(storage.RetryAlways), ) - scheduler := newScheduler(pub, bkt, currentSource, &in.config, st, metrics, log) // allows multiple containers to be scheduled concurrently while testing // the stateless input is triggered only while testing and till now it did not mimic diff --git a/x-pack/filebeat/input/gcs/input_test.go b/x-pack/filebeat/input/gcs/input_test.go index 8accb774f38..5595622c93e 100644 --- a/x-pack/filebeat/input/gcs/input_test.go +++ b/x-pack/filebeat/input/gcs/input_test.go @@ -535,7 +535,7 @@ func Test_StorageClient(t *testing.T) { client, _ := storage.NewClient(context.Background(), option.WithEndpoint(serv.URL), option.WithoutAuthentication(), option.WithHTTPClient(&httpclient)) cfg := conf.MustNewConfigFrom(tt.baseConfig) - conf := config{} + conf := defaultConfig() err := cfg.Unpack(&conf) if err != nil { assert.EqualError(t, err, fmt.Sprint(tt.isError)) @@ -558,8 +558,8 @@ func Test_StorageClient(t *testing.T) { }) var timeout *time.Timer - if conf.PollInterval != nil { - timeout = time.NewTimer(1*time.Second + *conf.PollInterval) + if conf.PollInterval != 0 { + timeout = time.NewTimer(1*time.Second + conf.PollInterval) } else { timeout = time.NewTimer(5 * time.Second) }