From dcad79970e5d1bdb32b290a005c3e7ddc058330b Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Fri, 22 Nov 2024 17:37:10 +0000 Subject: [PATCH] [8.16](backport #41495) x-pack/filebeat/input/awss3: support for Access Point ARN (#41688) * x-pack/filebeat/input/awss3: support for Access Point ARN (#41495) Backport: Added a new option access_point_arn to the AWS S3 input as an alternative to the bucket ARN to access S3 buckets. --- CHANGELOG.next.asciidoc | 1 + .../filebeat.inputs.reference.xpack.yml.tmpl | 3 + .../docs/inputs/input-aws-s3.asciidoc | 11 +- x-pack/filebeat/filebeat.reference.yml | 3 + x-pack/filebeat/input/awss3/config.go | 37 +++- x-pack/filebeat/input/awss3/config_test.go | 165 +++++++++++++++--- x-pack/filebeat/input/awss3/input.go | 6 +- .../input/awss3/input_integration_test.go | 72 +++++++- x-pack/filebeat/input/awss3/s3.go | 18 ++ 9 files changed, 273 insertions(+), 43 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 1a968f386a0..4dbf0d40ab8 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -110,6 +110,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix request trace filename handling in http_endpoint input. {pull}39410[39410] - Upgrade github.com/hashicorp/go-retryablehttp to mitigate CVE-2024-6104 {pull}40036[40036] - Fix the "No such input type exist: 'salesforce'" error on the Windows/AIX platform. {pull}41664[41664] +- Add support for Access Points in the `aws-s3` input. {pull}41495[41495] *Heartbeat* diff --git a/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl b/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl index 4188035f832..4e966d594c5 100644 --- a/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl +++ b/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl @@ -102,6 +102,9 @@ # Bucket ARN used for polling AWS S3 buckets #bucket_arn: arn:aws:s3:::test-s3-bucket + # Access Point ARN used for polling AWS S3 buckets + #access_point_arn: arn:aws:s3:us-east-1:123456789:accesspoint/my-accesspoint + # Bucket Name used for polling non-AWS S3 buckets #non_aws_bucket_name: test-s3-bucket diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index b4cb069ee7c..41f7847f005 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc @@ -348,7 +348,7 @@ configuring multiline options. [float] ==== `queue_url` -URL of the AWS SQS queue that messages will be received from. (Required when `bucket_arn` and `non_aws_bucket_name` are not set). +URL of the AWS SQS queue that messages will be received from. (Required when `bucket_arn`, `access_point_arn`, and `non_aws_bucket_name` are not set). [float] ==== `region` @@ -472,7 +472,12 @@ value is `20s`. [float] ==== `bucket_arn` -ARN of the AWS S3 bucket that will be polled for list operation. (Required when `queue_url` and `non_aws_bucket_name` are not set). +ARN of the AWS S3 bucket that will be polled for list operation. (Required when `queue_url`, `access_point_arn, and `non_aws_bucket_name` are not set). + +[float] +==== `access_point_arn` + +ARN of the AWS S3 Access Point that will be polled for list operation. (Required when `queue_url`, `bucket_arn`, and `non_aws_bucket_name` are not set). [float] ==== `non_aws_bucket_name` @@ -492,7 +497,7 @@ Prefix to apply for the list request to the S3 bucket. Default empty. [float] ==== `number_of_workers` -Number of workers that will process the S3 or SQS objects listed. Required when `bucket_arn` is set, otherwise (in the SQS case) defaults to 5. +Number of workers that will process the S3 or SQS objects listed. Required when `bucket_arn` or `access_point_arn` is set, otherwise (in the SQS case) defaults to 5. [float] diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index 5e636901565..9b514968e79 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -2990,6 +2990,9 @@ filebeat.inputs: # Bucket ARN used for polling AWS S3 buckets #bucket_arn: arn:aws:s3:::test-s3-bucket + # Access Point ARN used for polling AWS S3 buckets + #access_point_arn: arn:aws:s3:us-east-1:123456789:accesspoint/my-accesspoint + # Bucket Name used for polling non-AWS S3 buckets #non_aws_bucket_name: test-s3-bucket diff --git a/x-pack/filebeat/input/awss3/config.go b/x-pack/filebeat/input/awss3/config.go index 6f485431ddf..843061ae3c3 100644 --- a/x-pack/filebeat/input/awss3/config.go +++ b/x-pack/filebeat/input/awss3/config.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "net/url" + "strings" "time" awssdk "github.com/aws/aws-sdk-go-v2/aws" @@ -33,6 +34,7 @@ type config struct { QueueURL string `config:"queue_url"` RegionName string `config:"region"` BucketARN string `config:"bucket_arn"` + AccessPointARN string `config:"access_point_arn"` NonAWSBucketName string `config:"non_aws_bucket_name"` BucketListInterval time.Duration `config:"bucket_list_interval"` BucketListPrefix string `config:"bucket_list_prefix"` @@ -61,7 +63,7 @@ func defaultConfig() config { } func (c *config) Validate() error { - configs := []bool{c.QueueURL != "", c.BucketARN != "", c.NonAWSBucketName != ""} + configs := []bool{c.QueueURL != "", c.BucketARN != "", c.AccessPointARN != "", c.NonAWSBucketName != ""} enabled := []bool{} for i := range configs { if configs[i] { @@ -69,20 +71,24 @@ func (c *config) Validate() error { } } if len(enabled) == 0 { - return errors.New("neither queue_url, bucket_arn nor non_aws_bucket_name were provided") + return errors.New("neither queue_url, bucket_arn, access_point_arn, nor non_aws_bucket_name were provided") } else if len(enabled) > 1 { - return fmt.Errorf("queue_url <%v>, bucket_arn <%v>, non_aws_bucket_name <%v> "+ - "cannot be set at the same time", c.QueueURL, c.BucketARN, c.NonAWSBucketName) + return fmt.Errorf("queue_url <%v>, bucket_arn <%v>, access_point_arn <%v>, non_aws_bucket_name <%v> "+ + "cannot be set at the same time", c.QueueURL, c.BucketARN, c.AccessPointARN, c.NonAWSBucketName) } - if (c.BucketARN != "" || c.NonAWSBucketName != "") && c.BucketListInterval <= 0 { + if (c.BucketARN != "" || c.AccessPointARN != "" || c.NonAWSBucketName != "") && c.BucketListInterval <= 0 { return fmt.Errorf("bucket_list_interval <%v> must be greater than 0", c.BucketListInterval) } - if (c.BucketARN != "" || c.NonAWSBucketName != "") && c.NumberOfWorkers <= 0 { + if (c.BucketARN != "" || c.AccessPointARN != "" || c.NonAWSBucketName != "") && c.NumberOfWorkers <= 0 { return fmt.Errorf("number_of_workers <%v> must be greater than 0", c.NumberOfWorkers) } + if c.AccessPointARN != "" && !isValidAccessPointARN(c.AccessPointARN) { + return fmt.Errorf("invalid format for access_point_arn <%v>", c.AccessPointARN) + } + if c.QueueURL != "" && (c.VisibilityTimeout <= 0 || c.VisibilityTimeout.Hours() > 12) { return fmt.Errorf("visibility_timeout <%v> must be greater than 0 and "+ "less than or equal to 12h", c.VisibilityTimeout) @@ -117,14 +123,15 @@ func (c *config) Validate() error { if c.BackupConfig.NonAWSBackupToBucketName != "" && c.NonAWSBucketName == "" { return errors.New("backup to non-AWS bucket can only be used for non-AWS sources") } - if c.BackupConfig.BackupToBucketArn != "" && c.BucketARN == "" { + if c.BackupConfig.BackupToBucketArn != "" && c.BucketARN == "" && c.AccessPointARN == "" { return errors.New("backup to AWS bucket can only be used for AWS sources") } if c.BackupConfig.BackupToBucketArn != "" && c.BackupConfig.NonAWSBackupToBucketName != "" { return errors.New("backup_to_bucket_arn and non_aws_backup_to_bucket_name cannot be used together") } if c.BackupConfig.GetBucketName() != "" && c.QueueURL == "" { - if (c.BackupConfig.BackupToBucketArn != "" && c.BackupConfig.BackupToBucketArn == c.BucketARN) || + if (c.BackupConfig.BackupToBucketArn != "" && + (c.BackupConfig.BackupToBucketArn == c.BucketARN || c.BackupConfig.BackupToBucketArn == c.AccessPointARN)) || (c.BackupConfig.NonAWSBackupToBucketName != "" && c.BackupConfig.NonAWSBackupToBucketName == c.NonAWSBucketName) { if c.BackupConfig.BackupToBucketPrefix == "" { return errors.New("backup_to_bucket_prefix is a required property when source and backup bucket are the same") @@ -233,6 +240,9 @@ func (c config) getBucketName() string { if c.NonAWSBucketName != "" { return c.NonAWSBucketName } + if c.AccessPointARN != "" { + return c.AccessPointARN + } if c.BucketARN != "" { return getBucketNameFromARN(c.BucketARN) } @@ -246,6 +256,9 @@ func (c config) getBucketARN() string { if c.BucketARN != "" { return c.BucketARN } + if c.AccessPointARN != "" { + return c.AccessPointARN + } return "" } @@ -292,3 +305,11 @@ func (c config) getFileSelectors() []fileSelectorConfig { } return []fileSelectorConfig{{ReaderConfig: c.ReaderConfig}} } + +// Helper function to detect if an ARN is an Access Point +func isValidAccessPointARN(arn string) bool { + parts := strings.Split(arn, ":") + return len(parts) >= 6 && + strings.HasPrefix(parts[5], "accesspoint/") && + len(strings.TrimPrefix(parts[5], "accesspoint/")) > 0 +} diff --git a/x-pack/filebeat/input/awss3/config_test.go b/x-pack/filebeat/input/awss3/config_test.go index 907a5854b28..d791271ba6e 100644 --- a/x-pack/filebeat/input/awss3/config_test.go +++ b/x-pack/filebeat/input/awss3/config_test.go @@ -23,8 +23,9 @@ import ( func TestConfig(t *testing.T) { const queueURL = "https://example.com" const s3Bucket = "arn:aws:s3:::aBucket" + const s3AccessPoint = "arn:aws:s3:us-east-2:123456789:accesspoint/test-accesspoint" const nonAWSS3Bucket = "minio-bucket" - makeConfig := func(quequeURL, s3Bucket string, nonAWSS3Bucket string) config { + makeConfig := func(quequeURL, s3Bucket string, s3AccessPoint string, nonAWSS3Bucket string) config { // Have a separate copy of defaults in the test to make it clear when // anyone changes the defaults. parserConf := parser.Config{} @@ -32,6 +33,7 @@ func TestConfig(t *testing.T) { return config{ QueueURL: quequeURL, BucketARN: s3Bucket, + AccessPointARN: s3AccessPoint, NonAWSBucketName: nonAWSS3Bucket, APITimeout: 120 * time.Second, VisibilityTimeout: 300 * time.Second, @@ -54,15 +56,17 @@ func TestConfig(t *testing.T) { name string queueURL string s3Bucket string + s3AccessPoint string nonAWSS3Bucket string config mapstr.M expectedErr string - expectedCfg func(queueURL, s3Bucket, nonAWSS3Bucket string) config + expectedCfg func(queueURL, s3Bucket, s3AccessPoint, nonAWSS3Bucket string) config }{ { name: "input with defaults for queueURL", queueURL: queueURL, s3Bucket: "", + s3AccessPoint: "", nonAWSS3Bucket: "", config: mapstr.M{ "queue_url": queueURL, @@ -74,14 +78,15 @@ func TestConfig(t *testing.T) { name: "input with defaults for s3Bucket", queueURL: "", s3Bucket: s3Bucket, + s3AccessPoint: "", nonAWSS3Bucket: "", config: mapstr.M{ "bucket_arn": s3Bucket, "number_of_workers": 5, }, expectedErr: "", - expectedCfg: func(queueURL, s3Bucket, nonAWSS3Bucket string) config { - c := makeConfig("", s3Bucket, "") + expectedCfg: func(queueURL, s3Bucket, s3AccessPoint, nonAWSS3Bucket string) config { + c := makeConfig("", s3Bucket, "", "") c.NumberOfWorkers = 5 return c }, @@ -90,6 +95,7 @@ func TestConfig(t *testing.T) { name: "input with file_selectors", queueURL: queueURL, s3Bucket: "", + s3AccessPoint: "", nonAWSS3Bucket: "", config: mapstr.M{ "queue_url": queueURL, @@ -100,8 +106,8 @@ func TestConfig(t *testing.T) { }, }, expectedErr: "", - expectedCfg: func(queueURL, s3Bucket, nonAWSS3Bucket string) config { - c := makeConfig(queueURL, "", "") + expectedCfg: func(queueURL, s3Bucket, s3AccessPoint, nonAWSS3Bucket string) config { + c := makeConfig(queueURL, "", "", "") regex := match.MustCompile("/CloudTrail/") c.FileSelectors = []fileSelectorConfig{ { @@ -116,6 +122,7 @@ func TestConfig(t *testing.T) { name: "non-AWS_endpoint_with_explicit_region", queueURL: queueURL, s3Bucket: "", + s3AccessPoint: "", nonAWSS3Bucket: "", config: mapstr.M{ "queue_url": queueURL, @@ -123,8 +130,8 @@ func TestConfig(t *testing.T) { "endpoint": "ep", }, expectedErr: "", - expectedCfg: func(queueURL, s3Bucket, nonAWSS3Bucket string) config { - c := makeConfig(queueURL, "", "") + expectedCfg: func(queueURL, s3Bucket, s3AccessPoint, nonAWSS3Bucket string) config { + c := makeConfig(queueURL, "", "", "") c.RegionName = "region" c.AWSConfig.Endpoint = "ep" return c @@ -134,6 +141,7 @@ func TestConfig(t *testing.T) { name: "explicit_AWS_endpoint_with_explicit_region", queueURL: queueURL, s3Bucket: "", + s3AccessPoint: "", nonAWSS3Bucket: "", config: mapstr.M{ "queue_url": "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", @@ -141,8 +149,8 @@ func TestConfig(t *testing.T) { "endpoint": "amazonaws.com", }, expectedErr: "", - expectedCfg: func(queueURL, s3Bucket, nonAWSS3Bucket string) config { - c := makeConfig(queueURL, "", "") + expectedCfg: func(queueURL, s3Bucket, s3AccessPoint, nonAWSS3Bucket string) config { + c := makeConfig(queueURL, "", "", "") c.QueueURL = "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs" c.AWSConfig.Endpoint = "amazonaws.com" c.RegionName = "region" @@ -153,14 +161,15 @@ func TestConfig(t *testing.T) { name: "inferred_AWS_endpoint_with_explicit_region", queueURL: queueURL, s3Bucket: "", + s3AccessPoint: "", nonAWSS3Bucket: "", config: mapstr.M{ "queue_url": "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs", "region": "region", }, expectedErr: "", - expectedCfg: func(queueURL, s3Bucket, nonAWSS3Bucket string) config { - c := makeConfig(queueURL, "", "") + expectedCfg: func(queueURL, s3Bucket, s3AccessPoint, nonAWSS3Bucket string) config { + c := makeConfig(queueURL, "", "", "") c.QueueURL = "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs" c.RegionName = "region" return c @@ -170,84 +179,105 @@ func TestConfig(t *testing.T) { name: "localstack_with_region_name", queueURL: "http://localhost:4566/000000000000/sample-queue", s3Bucket: "", + s3AccessPoint: "", nonAWSS3Bucket: "", config: mapstr.M{ "queue_url": "http://localhost:4566/000000000000/sample-queue", "region": "myregion", }, expectedErr: "", - expectedCfg: func(queueURL, s3Bucket, nonAWSS3Bucket string) config { - c := makeConfig(queueURL, "", "") + expectedCfg: func(queueURL, s3Bucket, s3AccessPoint, nonAWSS3Bucket string) config { + c := makeConfig(queueURL, "", "", "") c.RegionName = "myregion" return c }, }, { - name: "error on no queueURL and s3Bucket and nonAWSS3Bucket", + name: "error on no queueURL, s3Bucket, s3AccessPoint, and nonAWSS3Bucket", queueURL: "", s3Bucket: "", + s3AccessPoint: "", nonAWSS3Bucket: "", config: mapstr.M{ "queue_url": "", "bucket_arn": "", + "access_point_arn": "", "non_aws_bucket_name": "", }, - expectedErr: "neither queue_url, bucket_arn nor non_aws_bucket_name were provided", + expectedErr: "neither queue_url, bucket_arn, access_point_arn, nor non_aws_bucket_name were provided", expectedCfg: nil, }, { name: "error on both queueURL and s3Bucket", queueURL: queueURL, s3Bucket: s3Bucket, + s3AccessPoint: "", nonAWSS3Bucket: "", config: mapstr.M{ "queue_url": queueURL, "bucket_arn": s3Bucket, }, - expectedErr: "queue_url , bucket_arn , non_aws_bucket_name <> cannot be set at the same time", + expectedErr: "queue_url , bucket_arn , access_point_arn <>, non_aws_bucket_name <> cannot be set at the same time", expectedCfg: nil, }, { name: "error on both queueURL and NonAWSS3Bucket", queueURL: queueURL, s3Bucket: "", + s3AccessPoint: "", nonAWSS3Bucket: nonAWSS3Bucket, config: mapstr.M{ "queue_url": queueURL, "non_aws_bucket_name": nonAWSS3Bucket, }, - expectedErr: "queue_url , bucket_arn <>, non_aws_bucket_name cannot be set at the same time", + expectedErr: "queue_url , bucket_arn <>, access_point_arn <>, non_aws_bucket_name cannot be set at the same time", expectedCfg: nil, }, { name: "error on both s3Bucket and NonAWSS3Bucket", queueURL: "", s3Bucket: s3Bucket, + s3AccessPoint: "", nonAWSS3Bucket: nonAWSS3Bucket, config: mapstr.M{ "bucket_arn": s3Bucket, "non_aws_bucket_name": nonAWSS3Bucket, }, - expectedErr: "queue_url <>, bucket_arn , non_aws_bucket_name cannot be set at the same time", + expectedErr: "queue_url <>, bucket_arn , access_point_arn <>, non_aws_bucket_name cannot be set at the same time", expectedCfg: nil, }, { name: "error on queueURL, s3Bucket, and NonAWSS3Bucket", queueURL: queueURL, s3Bucket: s3Bucket, + s3AccessPoint: "", nonAWSS3Bucket: nonAWSS3Bucket, config: mapstr.M{ "queue_url": queueURL, "bucket_arn": s3Bucket, "non_aws_bucket_name": nonAWSS3Bucket, }, - expectedErr: "queue_url , bucket_arn , non_aws_bucket_name cannot be set at the same time", + expectedErr: "queue_url , bucket_arn , access_point_arn <>, non_aws_bucket_name cannot be set at the same time", + expectedCfg: nil, + }, + { + name: "error on both s3Bucket and s3AccessPoint", + queueURL: "", + s3Bucket: s3Bucket, + s3AccessPoint: s3AccessPoint, + nonAWSS3Bucket: nonAWSS3Bucket, + config: mapstr.M{ + "bucket_arn": s3Bucket, + "access_point_arn": s3AccessPoint, + }, + expectedErr: "queue_url <>, bucket_arn , access_point_arn , non_aws_bucket_name <> cannot be set at the same time", expectedCfg: nil, }, { name: "error on api_timeout == 0", queueURL: queueURL, s3Bucket: "", + s3AccessPoint: "", nonAWSS3Bucket: "", config: mapstr.M{ "queue_url": queueURL, @@ -260,6 +290,7 @@ func TestConfig(t *testing.T) { name: "error on visibility_timeout == 0", queueURL: queueURL, s3Bucket: "", + s3AccessPoint: "", nonAWSS3Bucket: "", config: mapstr.M{ "queue_url": queueURL, @@ -272,6 +303,7 @@ func TestConfig(t *testing.T) { name: "error on visibility_timeout > 12h", queueURL: queueURL, s3Bucket: "", + s3AccessPoint: "", nonAWSS3Bucket: "", config: mapstr.M{ "queue_url": queueURL, @@ -284,6 +316,7 @@ func TestConfig(t *testing.T) { name: "error on bucket_list_interval == 0", queueURL: "", s3Bucket: s3Bucket, + s3AccessPoint: "", nonAWSS3Bucket: "", config: mapstr.M{ "bucket_arn": s3Bucket, @@ -296,6 +329,7 @@ func TestConfig(t *testing.T) { name: "error on number_of_workers == 0", queueURL: "", s3Bucket: s3Bucket, + s3AccessPoint: "", nonAWSS3Bucket: "", config: mapstr.M{ "bucket_arn": s3Bucket, @@ -308,6 +342,7 @@ func TestConfig(t *testing.T) { name: "error on buffer_size == 0 ", queueURL: queueURL, s3Bucket: "", + s3AccessPoint: "", nonAWSS3Bucket: "", config: mapstr.M{ "queue_url": queueURL, @@ -320,6 +355,7 @@ func TestConfig(t *testing.T) { name: "error on max_bytes == 0 ", queueURL: queueURL, s3Bucket: "", + s3AccessPoint: "", nonAWSS3Bucket: "", config: mapstr.M{ "queue_url": queueURL, @@ -332,6 +368,7 @@ func TestConfig(t *testing.T) { name: "error on expand_event_list_from_field and content_type != application/json ", queueURL: queueURL, s3Bucket: "", + s3AccessPoint: "", nonAWSS3Bucket: "", config: mapstr.M{ "queue_url": queueURL, @@ -345,6 +382,7 @@ func TestConfig(t *testing.T) { name: "error on expand_event_list_from_field and content_type != application/json ", queueURL: "", s3Bucket: s3Bucket, + s3AccessPoint: "", nonAWSS3Bucket: "", config: mapstr.M{ "bucket_arn": s3Bucket, @@ -358,14 +396,15 @@ func TestConfig(t *testing.T) { name: "input with defaults for non-AWS S3 Bucket", queueURL: "", s3Bucket: "", + s3AccessPoint: "", nonAWSS3Bucket: nonAWSS3Bucket, config: mapstr.M{ "non_aws_bucket_name": nonAWSS3Bucket, "number_of_workers": 5, }, expectedErr: "", - expectedCfg: func(queueURL, s3Bucket, nonAWSS3Bucket string) config { - c := makeConfig("", "", nonAWSS3Bucket) + expectedCfg: func(queueURL, s3Bucket, s3AccessPoint, nonAWSS3Bucket string) config { + c := makeConfig("", "", "", nonAWSS3Bucket) c.NumberOfWorkers = 5 return c }, @@ -374,6 +413,7 @@ func TestConfig(t *testing.T) { name: "error on FIPS with non-AWS S3 Bucket", queueURL: "", s3Bucket: "", + s3AccessPoint: "", nonAWSS3Bucket: nonAWSS3Bucket, config: mapstr.M{ "non_aws_bucket_name": nonAWSS3Bucket, @@ -387,6 +427,7 @@ func TestConfig(t *testing.T) { name: "error on path_style with AWS native S3 Bucket", queueURL: "", s3Bucket: s3Bucket, + s3AccessPoint: "", nonAWSS3Bucket: "", config: mapstr.M{ "bucket_arn": s3Bucket, @@ -400,6 +441,7 @@ func TestConfig(t *testing.T) { name: "error on provider with AWS native S3 Bucket", queueURL: "", s3Bucket: s3Bucket, + s3AccessPoint: "", nonAWSS3Bucket: "", config: mapstr.M{ "bucket_arn": s3Bucket, @@ -413,6 +455,7 @@ func TestConfig(t *testing.T) { name: "error on provider with AWS SQS Queue", queueURL: queueURL, s3Bucket: "", + s3AccessPoint: "", nonAWSS3Bucket: "", config: mapstr.M{ "queue_url": queueURL, @@ -426,6 +469,7 @@ func TestConfig(t *testing.T) { name: "backup_to_bucket with AWS", queueURL: "", s3Bucket: s3Bucket, + s3AccessPoint: "", nonAWSS3Bucket: "", config: mapstr.M{ "bucket_arn": s3Bucket, @@ -434,8 +478,8 @@ func TestConfig(t *testing.T) { "number_of_workers": 5, }, expectedErr: "", - expectedCfg: func(queueURL, s3Bucket, nonAWSS3Bucket string) config { - c := makeConfig("", s3Bucket, "") + expectedCfg: func(queueURL, s3Bucket, s3AccessPoint, nonAWSS3Bucket string) config { + c := makeConfig("", s3Bucket, "", "") c.BackupConfig.BackupToBucketArn = "arn:aws:s3:::bBucket" c.BackupConfig.BackupToBucketPrefix = "backup" c.NumberOfWorkers = 5 @@ -446,6 +490,7 @@ func TestConfig(t *testing.T) { name: "backup_to_bucket with non-AWS", queueURL: "", s3Bucket: "", + s3AccessPoint: "", nonAWSS3Bucket: nonAWSS3Bucket, config: mapstr.M{ "non_aws_bucket_name": nonAWSS3Bucket, @@ -454,8 +499,8 @@ func TestConfig(t *testing.T) { "number_of_workers": 5, }, expectedErr: "", - expectedCfg: func(queueURL, s3Bucket, nonAWSS3Bucket string) config { - c := makeConfig("", "", nonAWSS3Bucket) + expectedCfg: func(queueURL, s3Bucket, s3AccessPoint, nonAWSS3Bucket string) config { + c := makeConfig("", "", "", nonAWSS3Bucket) c.NonAWSBucketName = nonAWSS3Bucket c.BackupConfig.NonAWSBackupToBucketName = "bBucket" c.BackupConfig.BackupToBucketPrefix = "backup" @@ -467,6 +512,7 @@ func TestConfig(t *testing.T) { name: "error with non-AWS backup and AWS source", queueURL: "", s3Bucket: s3Bucket, + s3AccessPoint: "", nonAWSS3Bucket: "", config: mapstr.M{ "bucket_arn": s3Bucket, @@ -480,6 +526,7 @@ func TestConfig(t *testing.T) { name: "error with AWS backup and non-AWS source", queueURL: "", s3Bucket: "", + s3AccessPoint: "", nonAWSS3Bucket: nonAWSS3Bucket, config: mapstr.M{ "non_aws_bucket_name": nonAWSS3Bucket, @@ -493,6 +540,7 @@ func TestConfig(t *testing.T) { name: "error with same bucket backup and empty backup prefix", queueURL: "", s3Bucket: s3Bucket, + s3AccessPoint: "", nonAWSS3Bucket: "", config: mapstr.M{ "bucket_arn": s3Bucket, @@ -506,6 +554,7 @@ func TestConfig(t *testing.T) { name: "error with same bucket backup (non-AWS) and empty backup prefix", queueURL: "", s3Bucket: "", + s3AccessPoint: "", nonAWSS3Bucket: nonAWSS3Bucket, config: mapstr.M{ "non_aws_bucket_name": nonAWSS3Bucket, @@ -519,6 +568,7 @@ func TestConfig(t *testing.T) { name: "error with same bucket backup and backup prefix equal to list prefix", queueURL: "", s3Bucket: s3Bucket, + s3AccessPoint: "", nonAWSS3Bucket: "", config: mapstr.M{ "bucket_arn": s3Bucket, @@ -534,6 +584,7 @@ func TestConfig(t *testing.T) { name: "error with same bucket backup (non-AWS) and backup prefix equal to list prefix", queueURL: "", s3Bucket: "", + s3AccessPoint: "", nonAWSS3Bucket: nonAWSS3Bucket, config: mapstr.M{ "non_aws_bucket_name": nonAWSS3Bucket, @@ -563,7 +614,67 @@ func TestConfig(t *testing.T) { if tc.expectedCfg == nil { t.Fatal("missing expected config in test case") } - assert.EqualValues(t, tc.expectedCfg(tc.queueURL, tc.s3Bucket, tc.nonAWSS3Bucket), c) + assert.EqualValues(t, tc.expectedCfg(tc.queueURL, tc.s3Bucket, tc.s3AccessPoint, tc.nonAWSS3Bucket), c) + }) + } +} + +// TestIsValidAccessPointARN tests the isValidAccessPointARN function +func TestIsValidAccessPointARN(t *testing.T) { + testCases := []struct { + name string + arn string + expected bool + }{ + { + name: "Valid Access Point ARN", + arn: "arn:aws:s3:us-east-1:123456789:accesspoint/my-access-point", + expected: true, + }, + { + name: "Valid Access Point ARN with another region", + arn: "arn:aws:s3:us-west-2:123456789:accesspoint/my-access-point", + expected: true, + }, + { + name: "Invalid ARN with missing parts", + arn: "arn:aws:s3:123456789:accesspoint", + expected: false, + }, + { + name: "Invalid ARN without accesspoint keyword", + arn: "arn:aws:s3:us-east-1:123456789:bucket/my-bucket", + expected: false, + }, + { + name: "Invalid ARN with wrong format", + arn: "arn:aws:s3:us-east-1:123456789:my-access-point", + expected: false, + }, + { + name: "Empty ARN", + arn: "", + expected: false, + }, + { + name: "ARN with extra parts but valid access point format", + arn: "arn:aws:s3:us-east-1:123456789:accesspoint/my-access-point/extra", + expected: true, + }, + { + name: "ARN with empty name", + arn: "arn:aws:s3:us-east-1:123456789:accesspoint/", + expected: false, + }, + } + + // Run test cases + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + result := isValidAccessPointARN(tc.arn) + if result != tc.expected { + t.Errorf("expected %v, got %v for ARN: %s", tc.expected, result, tc.arn) + } }) } } diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 6d62f454c42..32f9f24be46 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -46,9 +46,9 @@ func (im *s3InputManager) Create(cfg *conf.C) (v2.Input, error) { return nil, fmt.Errorf("initializing AWS config: %w", err) } - // The awsConfig now contains the region from the credential profile or default region - // if the region is explicitly set in the config, then it wins if config.RegionName != "" { + // The awsConfig now contains the region from the credential profile or default region + // if the region is explicitly set in the config, then it wins awsConfig.Region = config.RegionName } @@ -56,7 +56,7 @@ func (im *s3InputManager) Create(cfg *conf.C) (v2.Input, error) { return newSQSReaderInput(config, awsConfig), nil } - if config.BucketARN != "" || config.NonAWSBucketName != "" { + if config.BucketARN != "" || config.AccessPointARN != "" || config.NonAWSBucketName != "" { return newS3PollerInput(config, awsConfig, im.store) } diff --git a/x-pack/filebeat/input/awss3/input_integration_test.go b/x-pack/filebeat/input/awss3/input_integration_test.go index 87c199dc246..43973b044e9 100644 --- a/x-pack/filebeat/input/awss3/input_integration_test.go +++ b/x-pack/filebeat/input/awss3/input_integration_test.go @@ -606,9 +606,77 @@ func drainSQS(t *testing.T, region string, queueURL string, cfg aws.Config) { t.Logf("Drained %d SQS messages.", deletedCount) } +func TestGetRegionFromAccessPointARN(t *testing.T) { + // Define test cases + testCases := []struct { + name string + arn string + expected string + }{ + { + name: "Valid Access Point ARN", + arn: "arn:aws:s3:us-east-1:123456789:accesspoint/my-access-point", + expected: "us-east-1", + }, + { + name: "Invalid ARN with missing region", + arn: "arn:aws:s3::123456789:accesspoint/my-access-point", + expected: "", + }, + { + name: "Invalid ARN with too few parts", + arn: "arn:aws:s3", + expected: "", + }, + { + name: "Standard bucket ARN (not an Access Point)", + arn: "arn:aws:s3:::my_corporate_bucket", + expected: "", + }, + { + name: "Malformed ARN with extra colons", + arn: "arn:aws:s3:::us-west-2:123456789:accesspoint/my-access-point", + expected: "", + }, + { + name: "Access Point ARN with additional elements", + arn: "arn:aws:s3:us-east-1:123456789:accesspoint/my-access-point/extra", + expected: "us-east-1", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + region := getRegionFromAccessPointARN(tc.arn) + assert.Equal(t, tc.expected, region) + }) + } +} + func TestGetBucketNameFromARN(t *testing.T) { - bucketName := getBucketNameFromARN("arn:aws:s3:::my_corporate_bucket") - assert.Equal(t, "my_corporate_bucket", bucketName) + testCases := []struct { + name string + bucketARN string + expected string + }{ + { + name: "Standard bucket ARN", + bucketARN: "arn:aws:s3:::my_corporate_bucket", + expected: "my_corporate_bucket", + }, + { + name: "Access Point ARN", + bucketARN: "arn:aws:s3:us-east-1:123456789:accesspoint/my-access-point", + expected: "arn:aws:s3:us-east-1:123456789:accesspoint/my-access-point", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + bucketName := getBucketNameFromARN(tc.bucketARN) + assert.Equal(t, tc.expected, bucketName) + }) + } } func TestGetRegionForBucketARN(t *testing.T) { diff --git a/x-pack/filebeat/input/awss3/s3.go b/x-pack/filebeat/input/awss3/s3.go index fabc1b2f1dd..1fe4584b7db 100644 --- a/x-pack/filebeat/input/awss3/s3.go +++ b/x-pack/filebeat/input/awss3/s3.go @@ -43,6 +43,12 @@ func createPipelineClient(pipeline beat.Pipeline, acks *awsACKHandler) (beat.Cli } func getRegionForBucket(ctx context.Context, s3Client *s3.Client, bucketName string) (string, error) { + // Skip region fetching if it's an Access Point ARN + if isValidAccessPointARN(bucketName) { + // Extract the region from the ARN (e.g., arn:aws:s3:us-west-2:123456789012:accesspoint/my-access-point) + return getRegionFromAccessPointARN(bucketName), nil + } + getBucketLocationOutput, err := s3Client.GetBucketLocation(ctx, &s3.GetBucketLocationInput{ Bucket: awssdk.String(bucketName), }) @@ -59,7 +65,19 @@ func getRegionForBucket(ctx context.Context, s3Client *s3.Client, bucketName str return string(getBucketLocationOutput.LocationConstraint), nil } +// Helper function to extract region from Access Point ARN +func getRegionFromAccessPointARN(arn string) string { + arnParts := strings.Split(arn, ":") + if len(arnParts) > 3 { + return arnParts[3] // The fourth part of ARN is region + } + return "" +} + func getBucketNameFromARN(bucketARN string) string { + if isValidAccessPointARN(bucketARN) { + return bucketARN // Return full ARN for Access Points + } bucketMetadata := strings.Split(bucketARN, ":") bucketName := bucketMetadata[len(bucketMetadata)-1] return bucketName