Skip to content

Commit

Permalink
Fix: 878 permissions errors (#892)
Browse files Browse the repository at this point in the history
* Add `dynamoDBItemKey()` to grantRecord interface

* Add functionality to check LastUpdatedDate in DynamoDB

* Refactor handler to use DynamoDB

* Grant Lambda permission to get DynamoDB items

* Grant lambda permission to use new S3 key naming convention

* Allow PersistGrantsGovXMLDB to read from new XML object names

* Add ability to limit SplitGrantsGovXMLDB processing in local dev
  • Loading branch information
TylerHendrickson authored Sep 25, 2024
1 parent 8292a19 commit db256e6
Show file tree
Hide file tree
Showing 13 changed files with 314 additions and 52 deletions.
2 changes: 1 addition & 1 deletion cmd/PublishGrantEvents/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func buildGrantModificationEventJSON(record events.DynamoDBEventRecord) ([]byte,
}
if err := prevVersion.Validate(); err != nil {
sendMetric("grant_data.invalid", 1, metricTag)
log.Warn(logger, "grant data from ItemMapper is invalid", err)
log.Warn(logger, "grant data from ItemMapper is invalid", "error", err)
}
}

Expand Down
43 changes: 43 additions & 0 deletions cmd/SplitGrantsGovXMLDB/dynamodb.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package main

import (
"context"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
ddbtypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
grantsgov "github.com/usdigitalresponse/grants-ingest/pkg/grantsSchemas/grants.gov"
)

// DynamoDBGetItemAPI is the interface for retrieving a single item from a DynamoDB table via primary key lookup
type DynamoDBGetItemAPI interface {
GetItem(context.Context, *dynamodb.GetItemInput, ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error)
}

// GetDynamoDBLastModified gets the "Last Modified" timestamp for a grantRecord stored in a DynamoDB table.
// If the item exists, a pointer to the last modification time is returned along with a nil error.
// If the specified item does not exist, the returned *time.Time and error are both nil.
// If an error is encountered when calling the HeadObject S3 API method, this will return a nil
// *time.Time value along with the encountered error.
func GetDynamoDBLastModified(ctx context.Context, c DynamoDBGetItemAPI, table string, key map[string]ddbtypes.AttributeValue) (*time.Time, error) {
resp, err := c.GetItem(ctx, &dynamodb.GetItemInput{
TableName: &table,
Key: key,
ProjectionExpression: aws.String("LastUpdatedDate"),
})
if err != nil {
return nil, err
}
if resp.Item == nil {
return nil, nil
}

item := struct{ LastUpdatedDate grantsgov.MMDDYYYYType }{}
if err := attributevalue.UnmarshalMap(resp.Item, &item); err != nil {
return nil, err
}
lastUpdatedDate, err := item.LastUpdatedDate.Time()
return &lastUpdatedDate, err
}
100 changes: 100 additions & 0 deletions cmd/SplitGrantsGovXMLDB/dynamodb_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package main

import (
"context"
"errors"
"testing"
"time"

"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
"github.com/aws/aws-sdk-go-v2/service/dynamodb"
ddbtypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
grantsgov "github.com/usdigitalresponse/grants-ingest/pkg/grantsSchemas/grants.gov"
)

type mockDynamoDBGetItemClient func(context.Context, *dynamodb.GetItemInput, ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error)

func (m mockDynamoDBGetItemClient) GetItem(ctx context.Context, params *dynamodb.GetItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error) {
return m(ctx, params, optFns...)
}

func makeTestItem(t *testing.T, lastUpdatedDateTestValue any) map[string]ddbtypes.AttributeValue {
t.Helper()
rv, err := attributevalue.MarshalMap(map[string]any{"LastUpdatedDate": lastUpdatedDateTestValue})
require.NoError(t, err, "Unexpected error creating test DynamoDB item fixture during setup")
return rv
}

func TestGetDynamoDBLastModified(t *testing.T) {
testTableName := "test-table"
testItemKey := map[string]ddbtypes.AttributeValue{
"grant_id": &ddbtypes.AttributeValueMemberS{Value: "test-key"},
}
testLastUpdateDateString := time.Now().Format(grantsgov.TimeLayoutMMDDYYYYType)
testLastUpdateDate, err := time.Parse(grantsgov.TimeLayoutMMDDYYYYType, testLastUpdateDateString)
require.NoError(t, err, "Unexpected error parsing time fixture during test setup")
testInvalidDateString := "not a valid date string"

_, testInvalidDateStringParseError := time.Parse(grantsgov.TimeLayoutMMDDYYYYType, testInvalidDateString)
require.Error(t, testInvalidDateStringParseError, "Error fixture unexpectedly nil during test setup")

for _, tt := range []struct {
name string
ddbItem map[string]ddbtypes.AttributeValue
ddbErr error
expLastModified *time.Time
expErr error
}{
{
"GetItem produces item with valid LastUpdatedDate",
makeTestItem(t, testLastUpdateDateString),
nil,
&testLastUpdateDate,
nil,
},
{
"GetItem returns error",
nil,
errors.New("GetItem action failed"),
nil,
errors.New("GetItem action failed"),
},
{"GetItem key not found", nil, nil, nil, nil},
{
"GetItem produces item with invalid LastUpdateDate",
makeTestItem(t, testInvalidDateString),
nil,
nil,
testInvalidDateStringParseError,
},
{
"GetItem produces item that cannot be unmarshalled",
makeTestItem(t, true),
nil,
nil,
errors.New("unmarshal failed, cannot unmarshal bool into Go value type grantsgov.MMDDYYYYType"),
},
} {

t.Run(tt.name, func(t *testing.T) {
mockClient := mockDynamoDBGetItemClient(func(ctx context.Context, params *dynamodb.GetItemInput, f ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error) {
require.Equal(t, &testTableName, params.TableName, "Unexpected table name in GetItem params")
require.Equal(t, testItemKey, params.Key, "Unexpected item key in GetItem params")

return &dynamodb.GetItemOutput{Item: tt.ddbItem}, tt.ddbErr
})

lastModified, err := GetDynamoDBLastModified(context.TODO(),
mockClient, testTableName, testItemKey)

if tt.expErr != nil {
assert.EqualError(t, err, tt.expErr.Error())
} else {
assert.NoError(t, err)
assert.Equal(t, tt.expLastModified, lastModified)
}
})
}
}
42 changes: 24 additions & 18 deletions cmd/SplitGrantsGovXMLDB/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,7 @@ const (
// a partial or complete invocation failure.
// Returns nil when all grant records are successfully processed from all source records,
// indicating complete success.
func handleS3EventWithConfig(cfg aws.Config, ctx context.Context, s3Event events.S3Event) error {
// Configure service clients
s3svc := s3.NewFromConfig(cfg, func(o *s3.Options) {
o.UsePathStyle = env.UsePathStyleS3Opt
})

func handleS3Event(ctx context.Context, s3svc *s3.Client, ddbsvc DynamoDBGetItemAPI, s3Event events.S3Event) error {
// Create a records channel to direct opportunity/forecast values parsed from the source
// record to individual S3 object uploads
records := make(chan grantRecord)
Expand All @@ -48,7 +43,7 @@ func handleS3EventWithConfig(cfg aws.Config, ctx context.Context, s3Event events
wg := multierror.Group{}
for i := 0; i < env.MaxConcurrentUploads; i++ {
wg.Go(func() error {
return processRecords(processingCtx, s3svc, records)
return processRecords(processingCtx, s3svc, ddbsvc, records)
})
}

Expand Down Expand Up @@ -129,6 +124,9 @@ func handleS3EventWithConfig(cfg aws.Config, ctx context.Context, s3Event events
func readRecords(ctx context.Context, r io.Reader, ch chan<- grantRecord) error {
span, ctx := tracer.StartSpanFromContext(ctx, "read.xml")

// Count records sent to ch
countSentRecords := 0

d := xml.NewDecoder(r)
for {
// Check for context cancelation before/between reads
Expand All @@ -138,6 +136,11 @@ func readRecords(ctx context.Context, r io.Reader, ch chan<- grantRecord) error
return err
}

// End early if we have reached any configured limit on the number of records sent to ch
if env.MaxSplitRecords > -1 && countSentRecords >= env.MaxSplitRecords {
break
}

token, err := d.Token()
if err != nil {
if err == io.EOF {
Expand All @@ -155,11 +158,13 @@ func readRecords(ctx context.Context, r io.Reader, ch chan<- grantRecord) error
if se.Name.Local == GRANT_OPPORTUNITY_XML_NAME {
var o opportunity
if err = d.DecodeElement(&o, &se); err == nil {
countSentRecords++
ch <- &o
}
} else if se.Name.Local == GRANT_FORECAST_XML_NAME && env.IsForecastedGrantsEnabled {
var f forecast
if err = d.DecodeElement(&f, &se); err == nil {
countSentRecords++
ch <- &f
}
}
Expand All @@ -181,7 +186,7 @@ func readRecords(ctx context.Context, r io.Reader, ch chan<- grantRecord) error
// It returns a multi-error containing any errors encountered while processing a received
// grantRecord as well as the reason for the context cancelation, if any.
// Returns nil if all records were processed successfully until the channel was closed.
func processRecords(ctx context.Context, svc *s3.Client, ch <-chan grantRecord) (errs error) {
func processRecords(ctx context.Context, s3svc *s3.Client, ddbsvc DynamoDBGetItemAPI, ch <-chan grantRecord) (errs error) {
span, ctx := tracer.StartSpanFromContext(ctx, "processing.worker")

whenCanceled := func() error {
Expand Down Expand Up @@ -209,7 +214,7 @@ func processRecords(ctx context.Context, svc *s3.Client, ch <-chan grantRecord)
}

workSpan, ctx := tracer.StartSpanFromContext(ctx, "processing.worker.work")
err := processRecord(ctx, svc, record)
err := processRecord(ctx, s3svc, ddbsvc, record)
if err != nil {
sendMetric("record.failed", 1)
errs = multierror.Append(errs, err)
Expand All @@ -223,12 +228,13 @@ func processRecords(ctx context.Context, svc *s3.Client, ch <-chan grantRecord)
}
}

// processRecord takes a single record and conditionally uploads an XML
// representation of the grant forecast/opportunity to its configured S3 destination. Before uploading,
// any extant S3 object with a matching key in the bucket named by env.DestinationBucket
// is compared with the record. An upload is initiated when the record was updated
// more recently than the extant object was last modified, or when no extant object exists.
func processRecord(ctx context.Context, svc S3ReadWriteObjectAPI, record grantRecord) error {
// processRecord takes a single record and conditionally uploads an XML representation
// of the grant forecast/opportunity to its configured S3 destination.
// Before uploading, the last-modified date of a matching extant DynamoDB item (if any)
// is compared with the last-modified date the record on-hand.
// An upload is initiated when the record on-hand has a last-modified date that is more recent
// than that of the extant item, or when no extant item exists.
func processRecord(ctx context.Context, s3svc S3ReadWriteObjectAPI, ddbsvc DynamoDBGetItemAPI, record grantRecord) error {
logger := record.logWith(logger)

lastModified, err := record.lastModified()
Expand All @@ -239,8 +245,8 @@ func processRecord(ctx context.Context, svc S3ReadWriteObjectAPI, record grantRe
log.Debug(logger, "Parsed last modified time from record last update date")

key := record.s3ObjectKey()
logger = log.With(logger, "bucket", env.DestinationBucket, "key", key)
remoteLastModified, err := GetS3LastModified(ctx, svc, env.DestinationBucket, key)
logger = log.With(logger, "table", env.DynamoDBTableName, "bucket", env.DestinationBucket, "key", key)
remoteLastModified, err := GetDynamoDBLastModified(ctx, ddbsvc, env.DynamoDBTableName, record.dynamoDBItemKey())
if err != nil {
return log.Errorf(logger, "Error determining last modified time for remote record", err)
}
Expand All @@ -264,7 +270,7 @@ func processRecord(ctx context.Context, svc S3ReadWriteObjectAPI, record grantRe
return log.Errorf(logger, "Error marshaling XML for record", err)
}

if err := UploadS3Object(ctx, svc, env.DestinationBucket, key, bytes.NewReader(b)); err != nil {
if err := UploadS3Object(ctx, s3svc, env.DestinationBucket, key, bytes.NewReader(b)); err != nil {
return log.Errorf(logger, "Error uploading prepared grant record to S3", err)
}

Expand Down
Loading

0 comments on commit db256e6

Please sign in to comment.