-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix: 878 permissions errors #892
Changes from all commits
a0881f9
504efe2
c9e437d
75dafc9
d9513ff
6784e46
e171e11
0785c03
95cc6e0
f0a9082
90fc5bb
b19d5a8
4278a24
63b59c1
439f2d3
f303a4e
5ce86c0
b6395ca
c9cec77
a416816
a376b70
04229dd
06fedae
b7712cd
b76e9f7
ceb6f05
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
package main | ||
|
||
import ( | ||
"context" | ||
"time" | ||
|
||
"github.com/aws/aws-sdk-go-v2/aws" | ||
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" | ||
"github.com/aws/aws-sdk-go-v2/service/dynamodb" | ||
ddbtypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" | ||
grantsgov "github.com/usdigitalresponse/grants-ingest/pkg/grantsSchemas/grants.gov" | ||
) | ||
|
||
// DynamoDBGetItemAPI describes the one DynamoDB operation needed to fetch a single item | ||
// from a table by its primary key. | ||
type DynamoDBGetItemAPI interface { | ||
GetItem(ctx context.Context, params *dynamodb.GetItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error) | ||
} | ||
|
||
// GetDynamoDBLastModified gets the "Last Modified" timestamp for a grantRecord stored in a DynamoDB table. | ||
// Only the LastUpdatedDate attribute of the item identified by key in table is requested. | ||
// If the item exists, a pointer to the last modification time is returned along with a nil error. | ||
// If the specified item does not exist, the returned *time.Time and error are both nil. | ||
// If an error is encountered when calling the GetItem DynamoDB API method, when unmarshalling | ||
// the returned item, or when parsing its LastUpdatedDate value, this will return a nil | ||
// *time.Time value along with the encountered error. | ||
func GetDynamoDBLastModified(ctx context.Context, c DynamoDBGetItemAPI, table string, key map[string]ddbtypes.AttributeValue) (*time.Time, error) { | ||
resp, err := c.GetItem(ctx, &dynamodb.GetItemInput{ | ||
TableName: &table, | ||
Key: key, | ||
ProjectionExpression: aws.String("LastUpdatedDate"), | ||
}) | ||
if err != nil { | ||
return nil, err | ||
} | ||
if resp.Item == nil { | ||
// No item in the response means nothing is stored under this key. | ||
return nil, nil | ||
} | ||
|
||
item := struct{ LastUpdatedDate grantsgov.MMDDYYYYType }{} | ||
if err := attributevalue.UnmarshalMap(resp.Item, &item); err != nil { | ||
return nil, err | ||
} | ||
lastUpdatedDate, err := item.LastUpdatedDate.Time() | ||
if err != nil { | ||
// Do not hand back a pointer to a meaningless time value alongside an error. | ||
return nil, err | ||
} | ||
return &lastUpdatedDate, nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,100 @@ | ||
package main | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"testing" | ||
"time" | ||
|
||
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" | ||
"github.com/aws/aws-sdk-go-v2/service/dynamodb" | ||
ddbtypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" | ||
"github.com/stretchr/testify/assert" | ||
"github.com/stretchr/testify/require" | ||
grantsgov "github.com/usdigitalresponse/grants-ingest/pkg/grantsSchemas/grants.gov" | ||
) | ||
|
||
// mockDynamoDBGetItemClient is a function adapter that satisfies DynamoDBGetItemAPI, | ||
// letting each test supply its own GetItem behavior as a closure. | ||
type mockDynamoDBGetItemClient func(context.Context, *dynamodb.GetItemInput, ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error) | ||
|
||
// GetItem forwards its arguments, unchanged, to the underlying function. | ||
func (fn mockDynamoDBGetItemClient) GetItem(ctx context.Context, in *dynamodb.GetItemInput, opts ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error) { | ||
return fn(ctx, in, opts...) | ||
} | ||
|
||
// makeTestItem builds a DynamoDB item fixture holding a single LastUpdatedDate attribute | ||
// set to lastUpdatedDateTestValue. The calling test fails immediately if marshalling fails. | ||
func makeTestItem(t *testing.T, lastUpdatedDateTestValue any) map[string]ddbtypes.AttributeValue { | ||
t.Helper() | ||
fixture := map[string]any{"LastUpdatedDate": lastUpdatedDateTestValue} | ||
item, err := attributevalue.MarshalMap(fixture) | ||
require.NoError(t, err, "Unexpected error creating test DynamoDB item fixture during setup") | ||
return item | ||
} | ||
|
||
// TestGetDynamoDBLastModified exercises GetDynamoDBLastModified against a mocked GetItem call, | ||
// covering a valid item, a GetItem error, a missing item, an unparseable date string, | ||
// and an attribute value that cannot be unmarshalled. | ||
func TestGetDynamoDBLastModified(t *testing.T) { | ||
testTableName := "test-table" | ||
testItemKey := map[string]ddbtypes.AttributeValue{ | ||
"grant_id": &ddbtypes.AttributeValueMemberS{Value: "test-key"}, | ||
} | ||
|
||
// Round-trip the current time through the date layout so that the expected value | ||
// is exactly what parsing the formatted string produces. | ||
testLastUpdateDateString := time.Now().Format(grantsgov.TimeLayoutMMDDYYYYType) | ||
testLastUpdateDate, err := time.Parse(grantsgov.TimeLayoutMMDDYYYYType, testLastUpdateDateString) | ||
require.NoError(t, err, "Unexpected error parsing time fixture during test setup") | ||
|
||
// Capture the parse error produced by an invalid date string for use as an expected error. | ||
testInvalidDateString := "not a valid date string" | ||
_, testInvalidDateStringParseError := time.Parse(grantsgov.TimeLayoutMMDDYYYYType, testInvalidDateString) | ||
require.Error(t, testInvalidDateStringParseError, "Error fixture unexpectedly nil during test setup") | ||
|
||
type testCase struct { | ||
name string | ||
ddbItem map[string]ddbtypes.AttributeValue | ||
ddbErr error | ||
expLastModified *time.Time | ||
expErr error | ||
} | ||
cases := []testCase{ | ||
{ | ||
name: "GetItem produces item with valid LastUpdatedDate", | ||
ddbItem: makeTestItem(t, testLastUpdateDateString), | ||
expLastModified: &testLastUpdateDate, | ||
}, | ||
{ | ||
name: "GetItem returns error", | ||
ddbErr: errors.New("GetItem action failed"), | ||
expErr: errors.New("GetItem action failed"), | ||
}, | ||
{ | ||
name: "GetItem key not found", | ||
}, | ||
{ | ||
name: "GetItem produces item with invalid LastUpdateDate", | ||
ddbItem: makeTestItem(t, testInvalidDateString), | ||
expErr: testInvalidDateStringParseError, | ||
}, | ||
{ | ||
name: "GetItem produces item that cannot be unmarshalled", | ||
ddbItem: makeTestItem(t, true), | ||
expErr: errors.New("unmarshal failed, cannot unmarshal bool into Go value type grantsgov.MMDDYYYYType"), | ||
}, | ||
} | ||
|
||
for _, tt := range cases { | ||
t.Run(tt.name, func(t *testing.T) { | ||
// The stub verifies the request parameters before replying with the case's fixture. | ||
getItem := func(ctx context.Context, params *dynamodb.GetItemInput, f ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error) { | ||
require.Equal(t, &testTableName, params.TableName, "Unexpected table name in GetItem params") | ||
require.Equal(t, testItemKey, params.Key, "Unexpected item key in GetItem params") | ||
return &dynamodb.GetItemOutput{Item: tt.ddbItem}, tt.ddbErr | ||
} | ||
|
||
lastModified, err := GetDynamoDBLastModified(context.TODO(), | ||
mockDynamoDBGetItemClient(getItem), testTableName, testItemKey) | ||
|
||
if tt.expErr == nil { | ||
assert.NoError(t, err) | ||
assert.Equal(t, tt.expLastModified, lastModified) | ||
return | ||
} | ||
assert.EqualError(t, err, tt.expErr.Error()) | ||
}) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,12 +33,7 @@ const ( | |
// a partial or complete invocation failure. | ||
// Returns nil when all grant records are successfully processed from all source records, | ||
// indicating complete success. | ||
func handleS3EventWithConfig(cfg aws.Config, ctx context.Context, s3Event events.S3Event) error { | ||
// Configure service clients | ||
s3svc := s3.NewFromConfig(cfg, func(o *s3.Options) { | ||
o.UsePathStyle = env.UsePathStyleS3Opt | ||
}) | ||
|
||
func handleS3Event(ctx context.Context, s3svc *s3.Client, ddbsvc DynamoDBGetItemAPI, s3Event events.S3Event) error { | ||
// Create a records channel to direct opportunity/forecast values parsed from the source | ||
// record to individual S3 object uploads | ||
records := make(chan grantRecord) | ||
|
@@ -48,7 +43,7 @@ func handleS3EventWithConfig(cfg aws.Config, ctx context.Context, s3Event events | |
wg := multierror.Group{} | ||
for i := 0; i < env.MaxConcurrentUploads; i++ { | ||
wg.Go(func() error { | ||
return processRecords(processingCtx, s3svc, records) | ||
return processRecords(processingCtx, s3svc, ddbsvc, records) | ||
}) | ||
} | ||
|
||
|
@@ -129,6 +124,9 @@ func handleS3EventWithConfig(cfg aws.Config, ctx context.Context, s3Event events | |
func readRecords(ctx context.Context, r io.Reader, ch chan<- grantRecord) error { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Changes in this function enable developers to set a Very simply: we're incrementing a counter for each extracted record and breaking out of the loop if it reaches |
||
span, ctx := tracer.StartSpanFromContext(ctx, "read.xml") | ||
|
||
// Count records sent to ch | ||
countSentRecords := 0 | ||
|
||
d := xml.NewDecoder(r) | ||
for { | ||
// Check for context cancelation before/between reads | ||
|
@@ -138,6 +136,11 @@ func readRecords(ctx context.Context, r io.Reader, ch chan<- grantRecord) error | |
return err | ||
} | ||
|
||
// End early if we have reached any configured limit on the number of records sent to ch | ||
if env.MaxSplitRecords > -1 && countSentRecords >= env.MaxSplitRecords { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If |
||
break | ||
} | ||
|
||
token, err := d.Token() | ||
if err != nil { | ||
if err == io.EOF { | ||
|
@@ -155,11 +158,13 @@ func readRecords(ctx context.Context, r io.Reader, ch chan<- grantRecord) error | |
if se.Name.Local == GRANT_OPPORTUNITY_XML_NAME { | ||
var o opportunity | ||
if err = d.DecodeElement(&o, &se); err == nil { | ||
countSentRecords++ | ||
ch <- &o | ||
} | ||
} else if se.Name.Local == GRANT_FORECAST_XML_NAME && env.IsForecastedGrantsEnabled { | ||
var f forecast | ||
if err = d.DecodeElement(&f, &se); err == nil { | ||
countSentRecords++ | ||
ch <- &f | ||
} | ||
} | ||
|
@@ -181,7 +186,7 @@ func readRecords(ctx context.Context, r io.Reader, ch chan<- grantRecord) error | |
// It returns a multi-error containing any errors encountered while processing a received | ||
// grantRecord as well as the reason for the context cancelation, if any. | ||
// Returns nil if all records were processed successfully until the channel was closed. | ||
func processRecords(ctx context.Context, svc *s3.Client, ch <-chan grantRecord) (errs error) { | ||
func processRecords(ctx context.Context, s3svc *s3.Client, ddbsvc DynamoDBGetItemAPI, ch <-chan grantRecord) (errs error) { | ||
span, ctx := tracer.StartSpanFromContext(ctx, "processing.worker") | ||
|
||
whenCanceled := func() error { | ||
|
@@ -209,7 +214,7 @@ func processRecords(ctx context.Context, svc *s3.Client, ch <-chan grantRecord) | |
} | ||
|
||
workSpan, ctx := tracer.StartSpanFromContext(ctx, "processing.worker.work") | ||
err := processRecord(ctx, svc, record) | ||
err := processRecord(ctx, s3svc, ddbsvc, record) | ||
if err != nil { | ||
sendMetric("record.failed", 1) | ||
errs = multierror.Append(errs, err) | ||
|
@@ -223,12 +228,13 @@ func processRecords(ctx context.Context, svc *s3.Client, ch <-chan grantRecord) | |
} | ||
} | ||
|
||
// processRecord takes a single record and conditionally uploads an XML | ||
// representation of the grant forecast/opportunity to its configured S3 destination. Before uploading, | ||
// any extant S3 object with a matching key in the bucket named by env.DestinationBucket | ||
// is compared with the record. An upload is initiated when the record was updated | ||
// more recently than the extant object was last modified, or when no extant object exists. | ||
func processRecord(ctx context.Context, svc S3ReadWriteObjectAPI, record grantRecord) error { | ||
// processRecord takes a single record and conditionally uploads an XML representation | ||
// of the grant forecast/opportunity to its configured S3 destination. | ||
// Before uploading, the last-modified date of a matching extant DynamoDB item (if any) | ||
// is compared with the last-modified date of the record on-hand. | ||
// An upload is initiated when the record on-hand has a last-modified date that is more recent | ||
// than that of the extant item, or when no extant item exists. | ||
func processRecord(ctx context.Context, s3svc S3ReadWriteObjectAPI, ddbsvc DynamoDBGetItemAPI, record grantRecord) error { | ||
logger := record.logWith(logger) | ||
|
||
lastModified, err := record.lastModified() | ||
|
@@ -239,8 +245,8 @@ func processRecord(ctx context.Context, svc S3ReadWriteObjectAPI, record grantRe | |
log.Debug(logger, "Parsed last modified time from record last update date") | ||
|
||
key := record.s3ObjectKey() | ||
logger = log.With(logger, "bucket", env.DestinationBucket, "key", key) | ||
remoteLastModified, err := GetS3LastModified(ctx, svc, env.DestinationBucket, key) | ||
logger = log.With(logger, "table", env.DynamoDBTableName, "bucket", env.DestinationBucket, "key", key) | ||
remoteLastModified, err := GetDynamoDBLastModified(ctx, ddbsvc, env.DynamoDBTableName, record.dynamoDBItemKey()) | ||
if err != nil { | ||
return log.Errorf(logger, "Error determining last modified time for remote record", err) | ||
} | ||
|
@@ -264,7 +270,7 @@ func processRecord(ctx context.Context, svc S3ReadWriteObjectAPI, record grantRe | |
return log.Errorf(logger, "Error marshaling XML for record", err) | ||
} | ||
|
||
if err := UploadS3Object(ctx, svc, env.DestinationBucket, key, bytes.NewReader(b)); err != nil { | ||
if err := UploadS3Object(ctx, s3svc, env.DestinationBucket, key, bytes.NewReader(b)); err != nil { | ||
return log.Errorf(logger, "Error uploading prepared grant record to S3", err) | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
FYI: This change is wholly unrelated to the rest of this PR; I just happened to notice that we were missing the
error
key for this log in Datadog (odd numbers of k/v arguments make things weird). We can probably lint for this in the future.