Skip to content

Commit

Permalink
[8.16](backport #41495) x-pack/filebeat/input/awss3: support for Acce…
Browse files Browse the repository at this point in the history
…ss Point ARN (#41688)

* x-pack/filebeat/input/awss3: support for Access Point ARN (#41495)

Backport: Added a new option access_point_arn to the AWS S3 input as an alternative to the bucket ARN to access S3 buckets.
  • Loading branch information
mergify[bot] authored Nov 22, 2024
1 parent 3f8c4bd commit dcad799
Show file tree
Hide file tree
Showing 9 changed files with 273 additions and 43 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix request trace filename handling in http_endpoint input. {pull}39410[39410]
- Upgrade github.com/hashicorp/go-retryablehttp to mitigate CVE-2024-6104 {pull}40036[40036]
- Fix the "No such input type exist: 'salesforce'" error on the Windows/AIX platform. {pull}41664[41664]
- Add support for Access Points in the `aws-s3` input. {pull}41495[41495]

*Heartbeat*

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@
# Bucket ARN used for polling AWS S3 buckets
#bucket_arn: arn:aws:s3:::test-s3-bucket

# Access Point ARN used for polling AWS S3 buckets
#access_point_arn: arn:aws:s3:us-east-1:123456789:accesspoint/my-accesspoint

# Bucket Name used for polling non-AWS S3 buckets
#non_aws_bucket_name: test-s3-bucket

Expand Down
11 changes: 8 additions & 3 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ configuring multiline options.
[float]
==== `queue_url`

URL of the AWS SQS queue that messages will be received from. (Required when `bucket_arn` and `non_aws_bucket_name` are not set).
URL of the AWS SQS queue that messages will be received from. (Required when `bucket_arn`, `access_point_arn`, and `non_aws_bucket_name` are not set).

[float]
==== `region`
Expand Down Expand Up @@ -472,7 +472,12 @@ value is `20s`.
[float]
==== `bucket_arn`

ARN of the AWS S3 bucket that will be polled for list operation. (Required when `queue_url` and `non_aws_bucket_name` are not set).
ARN of the AWS S3 bucket that will be polled for list operation. (Required when `queue_url`, `access_point_arn, and `non_aws_bucket_name` are not set).

[float]
==== `access_point_arn`

ARN of the AWS S3 Access Point that will be polled for list operation. (Required when `queue_url`, `bucket_arn`, and `non_aws_bucket_name` are not set).

[float]
==== `non_aws_bucket_name`
Expand All @@ -492,7 +497,7 @@ Prefix to apply for the list request to the S3 bucket. Default empty.
[float]
==== `number_of_workers`

Number of workers that will process the S3 or SQS objects listed. Required when `bucket_arn` is set, otherwise (in the SQS case) defaults to 5.
Number of workers that will process the S3 or SQS objects listed. Required when `bucket_arn` or `access_point_arn` is set, otherwise (in the SQS case) defaults to 5.


[float]
Expand Down
3 changes: 3 additions & 0 deletions x-pack/filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2990,6 +2990,9 @@ filebeat.inputs:
# Bucket ARN used for polling AWS S3 buckets
#bucket_arn: arn:aws:s3:::test-s3-bucket

# Access Point ARN used for polling AWS S3 buckets
#access_point_arn: arn:aws:s3:us-east-1:123456789:accesspoint/my-accesspoint

# Bucket Name used for polling non-AWS S3 buckets
#non_aws_bucket_name: test-s3-bucket

Expand Down
37 changes: 29 additions & 8 deletions x-pack/filebeat/input/awss3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"errors"
"fmt"
"net/url"
"strings"
"time"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
Expand All @@ -33,6 +34,7 @@ type config struct {
QueueURL string `config:"queue_url"`
RegionName string `config:"region"`
BucketARN string `config:"bucket_arn"`
AccessPointARN string `config:"access_point_arn"`
NonAWSBucketName string `config:"non_aws_bucket_name"`
BucketListInterval time.Duration `config:"bucket_list_interval"`
BucketListPrefix string `config:"bucket_list_prefix"`
Expand Down Expand Up @@ -61,28 +63,32 @@ func defaultConfig() config {
}

func (c *config) Validate() error {
configs := []bool{c.QueueURL != "", c.BucketARN != "", c.NonAWSBucketName != ""}
configs := []bool{c.QueueURL != "", c.BucketARN != "", c.AccessPointARN != "", c.NonAWSBucketName != ""}
enabled := []bool{}
for i := range configs {
if configs[i] {
enabled = append(enabled, configs[i])
}
}
if len(enabled) == 0 {
return errors.New("neither queue_url, bucket_arn nor non_aws_bucket_name were provided")
return errors.New("neither queue_url, bucket_arn, access_point_arn, nor non_aws_bucket_name were provided")
} else if len(enabled) > 1 {
return fmt.Errorf("queue_url <%v>, bucket_arn <%v>, non_aws_bucket_name <%v> "+
"cannot be set at the same time", c.QueueURL, c.BucketARN, c.NonAWSBucketName)
return fmt.Errorf("queue_url <%v>, bucket_arn <%v>, access_point_arn <%v>, non_aws_bucket_name <%v> "+
"cannot be set at the same time", c.QueueURL, c.BucketARN, c.AccessPointARN, c.NonAWSBucketName)
}

if (c.BucketARN != "" || c.NonAWSBucketName != "") && c.BucketListInterval <= 0 {
if (c.BucketARN != "" || c.AccessPointARN != "" || c.NonAWSBucketName != "") && c.BucketListInterval <= 0 {
return fmt.Errorf("bucket_list_interval <%v> must be greater than 0", c.BucketListInterval)
}

if (c.BucketARN != "" || c.NonAWSBucketName != "") && c.NumberOfWorkers <= 0 {
if (c.BucketARN != "" || c.AccessPointARN != "" || c.NonAWSBucketName != "") && c.NumberOfWorkers <= 0 {
return fmt.Errorf("number_of_workers <%v> must be greater than 0", c.NumberOfWorkers)
}

if c.AccessPointARN != "" && !isValidAccessPointARN(c.AccessPointARN) {
return fmt.Errorf("invalid format for access_point_arn <%v>", c.AccessPointARN)
}

if c.QueueURL != "" && (c.VisibilityTimeout <= 0 || c.VisibilityTimeout.Hours() > 12) {
return fmt.Errorf("visibility_timeout <%v> must be greater than 0 and "+
"less than or equal to 12h", c.VisibilityTimeout)
Expand Down Expand Up @@ -117,14 +123,15 @@ func (c *config) Validate() error {
if c.BackupConfig.NonAWSBackupToBucketName != "" && c.NonAWSBucketName == "" {
return errors.New("backup to non-AWS bucket can only be used for non-AWS sources")
}
if c.BackupConfig.BackupToBucketArn != "" && c.BucketARN == "" {
if c.BackupConfig.BackupToBucketArn != "" && c.BucketARN == "" && c.AccessPointARN == "" {
return errors.New("backup to AWS bucket can only be used for AWS sources")
}
if c.BackupConfig.BackupToBucketArn != "" && c.BackupConfig.NonAWSBackupToBucketName != "" {
return errors.New("backup_to_bucket_arn and non_aws_backup_to_bucket_name cannot be used together")
}
if c.BackupConfig.GetBucketName() != "" && c.QueueURL == "" {
if (c.BackupConfig.BackupToBucketArn != "" && c.BackupConfig.BackupToBucketArn == c.BucketARN) ||
if (c.BackupConfig.BackupToBucketArn != "" &&
(c.BackupConfig.BackupToBucketArn == c.BucketARN || c.BackupConfig.BackupToBucketArn == c.AccessPointARN)) ||
(c.BackupConfig.NonAWSBackupToBucketName != "" && c.BackupConfig.NonAWSBackupToBucketName == c.NonAWSBucketName) {
if c.BackupConfig.BackupToBucketPrefix == "" {
return errors.New("backup_to_bucket_prefix is a required property when source and backup bucket are the same")
Expand Down Expand Up @@ -233,6 +240,9 @@ func (c config) getBucketName() string {
if c.NonAWSBucketName != "" {
return c.NonAWSBucketName
}
if c.AccessPointARN != "" {
return c.AccessPointARN
}
if c.BucketARN != "" {
return getBucketNameFromARN(c.BucketARN)
}
Expand All @@ -246,6 +256,9 @@ func (c config) getBucketARN() string {
if c.BucketARN != "" {
return c.BucketARN
}
if c.AccessPointARN != "" {
return c.AccessPointARN
}
return ""
}

Expand Down Expand Up @@ -292,3 +305,11 @@ func (c config) getFileSelectors() []fileSelectorConfig {
}
return []fileSelectorConfig{{ReaderConfig: c.ReaderConfig}}
}

// Helper function to detect if an ARN is an Access Point
func isValidAccessPointARN(arn string) bool {
parts := strings.Split(arn, ":")
return len(parts) >= 6 &&
strings.HasPrefix(parts[5], "accesspoint/") &&
len(strings.TrimPrefix(parts[5], "accesspoint/")) > 0
}
Loading

0 comments on commit dcad799

Please sign in to comment.