From a0881f92c11cc404b9e13f2c1898ab538abee0a9 Mon Sep 17 00:00:00 2001 From: tyler Date: Sat, 24 Aug 2024 00:16:43 +0000 Subject: [PATCH 01/21] Add dynamoDBItemKey() to grantRecord interface --- cmd/SplitGrantsGovXMLDB/types.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/cmd/SplitGrantsGovXMLDB/types.go b/cmd/SplitGrantsGovXMLDB/types.go index fd99e16a..2814cbc7 100644 --- a/cmd/SplitGrantsGovXMLDB/types.go +++ b/cmd/SplitGrantsGovXMLDB/types.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + ddbtypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" "github.com/usdigitalresponse/grants-ingest/internal/log" grantsgov "github.com/usdigitalresponse/grants-ingest/pkg/grantsSchemas/grants.gov" ) @@ -13,6 +14,7 @@ type grantRecord interface { logWith(log.Logger) log.Logger // s3ObjectKey returns a string to use as the object key when saving the opportunity to an S3 bucket s3ObjectKey() string + dynamoDBItemKey() map[string]ddbtypes.AttributeValue lastModified() (time.Time, error) toXML() ([]byte, error) } @@ -33,6 +35,12 @@ func (o opportunity) s3ObjectKey() string { ) } +func (o opportunity) dynamoDBItemKey() map[string]ddbtypes.AttributeValue { + return map[string]ddbtypes.AttributeValue{ + "grant_id": &ddbtypes.AttributeValueMemberS{Value: string(o.OpportunityID)}, + } +} + func (o opportunity) lastModified() (time.Time, error) { return o.LastUpdatedDate.Time() } @@ -64,3 +72,9 @@ func (f forecast) lastModified() (time.Time, error) { func (f forecast) toXML() ([]byte, error) { return xml.Marshal(grantsgov.OpportunityForecastDetail_1_0(f)) } + +func (o forecast) dynamoDBItemKey() map[string]ddbtypes.AttributeValue { + return map[string]ddbtypes.AttributeValue{ + "grant_id": &ddbtypes.AttributeValueMemberS{Value: string(o.OpportunityID)}, + } +} From 504efe22c25cd386c1e7acd43dc7f44dbd10b8b2 Mon Sep 17 00:00:00 2001 From: tyler Date: Sat, 24 Aug 2024 00:17:26 +0000 Subject: [PATCH 02/21] Add functionality to check LastUpdatedDate in DynamoDB --- cmd/SplitGrantsGovXMLDB/dynamodb.go | 39 +++++++++ cmd/SplitGrantsGovXMLDB/dynamodb_test.go | 100 +++++++++++++++++++++++ 2 files changed, 139 insertions(+) create mode 100644 cmd/SplitGrantsGovXMLDB/dynamodb.go create mode 100644 cmd/SplitGrantsGovXMLDB/dynamodb_test.go diff --git a/cmd/SplitGrantsGovXMLDB/dynamodb.go b/cmd/SplitGrantsGovXMLDB/dynamodb.go new file mode 100644 index 00000000..8d91be14 --- /dev/null +++ b/cmd/SplitGrantsGovXMLDB/dynamodb.go @@ -0,0 +1,39 @@ +package main + +import ( + "context" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + ddbtypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + grantsgov "github.com/usdigitalresponse/grants-ingest/pkg/grantsSchemas/grants.gov" +) + +type DynamoDBItemKey map[string]ddbtypes.AttributeValue + +type DynamoDBGetItemAPI interface { + GetItem(context.Context, *dynamodb.GetItemInput, ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error) +} + +func GetDynamoDBLastModified(ctx context.Context, c DynamoDBGetItemAPI, table string, key DynamoDBItemKey) (*time.Time, error) { + resp, err := c.GetItem(ctx, &dynamodb.GetItemInput{ + TableName: &table, + Key: key, + ProjectionExpression: aws.String("LastUpdatedDate"), + }) + if err != nil { + return nil, err + } + if resp.Item == nil { + return nil, nil + } + + item := struct{ LastUpdatedDate grantsgov.MMDDYYYYType }{} + if err := attributevalue.UnmarshalMap(resp.Item, &item); err != nil { + return nil, err + } + lastUpdatedDate, err := item.LastUpdatedDate.Time() + return &lastUpdatedDate, err +} diff --git a/cmd/SplitGrantsGovXMLDB/dynamodb_test.go b/cmd/SplitGrantsGovXMLDB/dynamodb_test.go new file mode 100644 index 00000000..e138a644 --- /dev/null +++ b/cmd/SplitGrantsGovXMLDB/dynamodb_test.go @@ -0,0 +1,100 @@ +package main + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + ddbtypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + grantsgov "github.com/usdigitalresponse/grants-ingest/pkg/grantsSchemas/grants.gov" +) + +type mockDynamoDBGetItemClient func(context.Context, *dynamodb.GetItemInput, ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error) + +func (m mockDynamoDBGetItemClient) GetItem(ctx context.Context, params *dynamodb.GetItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error) { + return m(ctx, params, optFns...) +} + +func makeTestItem(t *testing.T, lastUpdatedDateTestValue any) map[string]ddbtypes.AttributeValue { + t.Helper() + rv, err := attributevalue.MarshalMap(map[string]any{"LastUpdatedDate": lastUpdatedDateTestValue}) + require.NoError(t, err, "Unexpected error creating test DynamoDB item fixture during setup") + return rv +} + +func TestGetDynamoDBLastModified(t *testing.T) { + testTableName := "test-table" + testItemKey := map[string]ddbtypes.AttributeValue{ + "grant_id": &ddbtypes.AttributeValueMemberS{Value: "test-key"}, + } + testLastUpdateDateString := time.Now().Format(grantsgov.TimeLayoutMMDDYYYYType) + testLastUpdateDate, err := time.Parse(grantsgov.TimeLayoutMMDDYYYYType, testLastUpdateDateString) + require.NoError(t, err, "Unexpected error parsing time fixture during test setup") + testInvalidDateString := "not a valid date string" + + _, testInvalidDateStringParseError := time.Parse(grantsgov.TimeLayoutMMDDYYYYType, testInvalidDateString) + require.Error(t, testInvalidDateStringParseError, "Error fixture unexpectedly nil during test setup") + + for _, tt := range []struct { + name string + ddbItem map[string]ddbtypes.AttributeValue + ddbErr error + expLastModified *time.Time + expErr error + }{ + { + "GetItem produces item with valid LastUpdatedDate", + makeTestItem(t, testLastUpdateDateString), + nil, + &testLastUpdateDate, + nil, + }, + { + "GetItem returns error", + nil, + errors.New("GetItem action failed"), + nil, + errors.New("GetItem action failed"), + }, + {"GetItem key not found", nil, nil, nil, nil}, + { + "GetItem produces item with invalid LastUpdateDate", + makeTestItem(t, testInvalidDateString), + nil, + nil, + testInvalidDateStringParseError, + }, + { + "GetItem produces item that cannot be unmarshalled", + makeTestItem(t, true), + nil, + nil, + errors.New("unmarshal failed, cannot unmarshal bool into Go value type grantsgov.MMDDYYYYType"), + }, + } { + + t.Run(tt.name, func(t *testing.T) { + mockClient := mockDynamoDBGetItemClient(func(ctx context.Context, params *dynamodb.GetItemInput, f ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error) { + require.Equal(t, &testTableName, params.TableName, "Unexpected table name in GetItem params") + require.Equal(t, testItemKey, params.Key, "Unexpected item key in GetItem params") + + return &dynamodb.GetItemOutput{Item: tt.ddbItem}, tt.ddbErr + }) + + lastModified, err := GetDynamoDBLastModified(context.TODO(), + mockClient, testTableName, testItemKey) + + if tt.expErr != nil { + assert.EqualError(t, err, tt.expErr.Error()) + } else { + assert.NoError(t, err) + assert.Equal(t, tt.expLastModified, lastModified) + } + }) + } +} From c9e437d7b82b94c410ff38b95a14a7d86dac3e57 Mon Sep 17 00:00:00 2001 From: tyler <1851017+TylerHendrickson@users.noreply.github.com> Date: Mon, 26 Aug 2024 15:50:07 +0000 Subject: [PATCH 03/21] Refactor handler to use DynamoDB --- cmd/SplitGrantsGovXMLDB/handler.go | 20 +++---- cmd/SplitGrantsGovXMLDB/handler_test.go | 72 ++++++++++++++++--------- cmd/SplitGrantsGovXMLDB/main.go | 10 +++- 3 files changed, 64 insertions(+), 38 deletions(-) diff --git a/cmd/SplitGrantsGovXMLDB/handler.go b/cmd/SplitGrantsGovXMLDB/handler.go index 24b152ab..2eb7464e 100644 --- a/cmd/SplitGrantsGovXMLDB/handler.go +++ b/cmd/SplitGrantsGovXMLDB/handler.go @@ -33,12 +33,7 @@ const ( // a partial or complete invocation failure. // Returns nil when all grant records are successfully processed from all source records, // indicating complete success. -func handleS3EventWithConfig(cfg aws.Config, ctx context.Context, s3Event events.S3Event) error { - // Configure service clients - s3svc := s3.NewFromConfig(cfg, func(o *s3.Options) { - o.UsePathStyle = env.UsePathStyleS3Opt - }) - +func handleS3Event(ctx context.Context, s3svc *s3.Client, ddbsvc DynamoDBGetItemAPI, s3Event events.S3Event) error { // Create a records channel to direct opportunity/forecast values parsed from the source // record to individual S3 object uploads records := make(chan grantRecord) @@ -48,7 +43,7 @@ func handleS3EventWithConfig(cfg aws.Config, ctx context.Context, s3Event events wg := multierror.Group{} for i := 0; i < env.MaxConcurrentUploads; i++ { wg.Go(func() error { - return processRecords(processingCtx, s3svc, records) + return processRecords(processingCtx, s3svc, ddbsvc, records) }) } @@ -181,7 +176,7 @@ func readRecords(ctx context.Context, r io.Reader, ch chan<- grantRecord) error // It returns a multi-error containing any errors encountered while processing a received // grantRecord as well as the reason for the context cancelation, if any. // Returns nil if all records were processed successfully until the channel was closed. -func processRecords(ctx context.Context, svc *s3.Client, ch <-chan grantRecord) (errs error) { +func processRecords(ctx context.Context, s3svc *s3.Client, ddbsvc DynamoDBGetItemAPI, ch <-chan grantRecord) (errs error) { span, ctx := tracer.StartSpanFromContext(ctx, "processing.worker") whenCanceled := func() error { @@ -209,7 +204,7 @@ func processRecords(ctx context.Context, svc *s3.Client, ch <-chan grantRecord) } workSpan, ctx := tracer.StartSpanFromContext(ctx, "processing.worker.work") - err := processRecord(ctx, svc, record) + err := processRecord(ctx, s3svc, ddbsvc, record) if err != nil { sendMetric("record.failed", 1) errs = multierror.Append(errs, err) @@ -228,7 +223,7 @@ func processRecords(ctx context.Context, svc *s3.Client, ch <-chan grantRecord) // any extant S3 object with a matching key in the bucket named by env.DestinationBucket // is compared with the record. An upload is initiated when the record was updated // more recently than the extant object was last modified, or when no extant object exists. -func processRecord(ctx context.Context, svc S3ReadWriteObjectAPI, record grantRecord) error { +func processRecord(ctx context.Context, s3svc S3ReadWriteObjectAPI, ddbsvc DynamoDBGetItemAPI, record grantRecord) error { logger := record.logWith(logger) lastModified, err := record.lastModified() @@ -240,7 +235,8 @@ func processRecord(ctx context.Context, svc S3ReadWriteObjectAPI, record grantRe key := record.s3ObjectKey() logger = log.With(logger, "bucket", env.DestinationBucket, "key", key) - remoteLastModified, err := GetS3LastModified(ctx, svc, env.DestinationBucket, key) + // remoteLastModified, err := GetS3LastModified(ctx, svc, env.DestinationBucket, key) + remoteLastModified, err := GetDynamoDBLastModified(ctx, ddbsvc, env.DynamoDBTableName, record.dynamoDBItemKey()) if err != nil { return log.Errorf(logger, "Error determining last modified time for remote record", err) } @@ -264,7 +260,7 @@ func processRecord(ctx context.Context, svc S3ReadWriteObjectAPI, record grantRe return log.Errorf(logger, "Error marshaling XML for record", err) } - if err := UploadS3Object(ctx, svc, env.DestinationBucket, key, bytes.NewReader(b)); err != nil { + if err := UploadS3Object(ctx, s3svc, env.DestinationBucket, key, bytes.NewReader(b)); err != nil { return log.Errorf(logger, "Error uploading prepared grant record to S3", err) } diff --git a/cmd/SplitGrantsGovXMLDB/handler_test.go b/cmd/SplitGrantsGovXMLDB/handler_test.go index 6e079842..edee303f 100644 --- a/cmd/SplitGrantsGovXMLDB/handler_test.go +++ b/cmd/SplitGrantsGovXMLDB/handler_test.go @@ -19,6 +19,10 @@ import ( awsTransport "github.com/aws/aws-sdk-go-v2/aws/transport/http" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + ddbtypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" "github.com/aws/aws-sdk-go-v2/service/s3" smithyhttp "github.com/aws/smithy-go/transport/http" "github.com/go-kit/log" @@ -30,6 +34,34 @@ import ( grantsgov "github.com/usdigitalresponse/grants-ingest/pkg/grantsSchemas/grants.gov" ) +type mockDDBGetItemRVLookup map[string]mockDDBGetItemRV + +func (m mockDDBGetItemRVLookup) AddEntry(id, lastModified string, err error) { + m[id] = mockDDBGetItemRV{lastModified, err} +} + +type mockDDBGetItemRV struct { + lastModified string + err error +} + +func newMockDDBClient(t *testing.T, idToRV mockDDBGetItemRVLookup) mockDynamoDBGetItemClient { + t.Helper() + return mockDynamoDBGetItemClient(func(ctx context.Context, params *dynamodb.GetItemInput, f ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error) { + key := struct{ grant_id string }{} + err := attributevalue.UnmarshalMap(params.Key, &key) + require.NoError(t, err, "Failed to extract grant_id value from DynamoDB GetItem key") + output := dynamodb.GetItemOutput{Item: nil} + if rv, exists := idToRV[key.grant_id]; exists { + output.Item = map[string]types.AttributeValue{ + "LastUpdatedDate": &ddbtypes.AttributeValueMemberS{Value: rv.lastModified}, + } + return &output, rv.err + } + return &output, nil + }) +} + func setupLambdaEnvForTesting(t *testing.T) { t.Helper() @@ -39,6 +71,7 @@ func setupLambdaEnvForTesting(t *testing.T) { // Configure environment variables goenv.Unmarshal(goenv.EnvSet{ "GRANTS_PREPARED_DATA_BUCKET_NAME": "test-destination-bucket", + "GRANTS_PREPARED_DATA_TABLE_NAME": "test-dynamodb-table", "S3_USE_PATH_STYLE": "true", "DOWNLOAD_CHUNK_LIMIT": "10", }, &env) @@ -172,7 +205,7 @@ func TestLambdaInvocationScenarios(t *testing.T) { setupLambdaEnvForTesting(t) sourceBucketName := "test-source-bucket" now := time.Now() - s3client, cfg, err := setupS3ForTesting(t, sourceBucketName) + s3client, _, err := setupS3ForTesting(t, sourceBucketName) assert.NoError(t, err, "Error configuring test environment") seenOpportunityIDs := make(map[string]struct{}) @@ -348,6 +381,7 @@ func TestLambdaInvocationScenarios(t *testing.T) { // (will also place extant records in S3 if specified in the test case) var sourceGrantsData bytes.Buffer sourceOpportunitiesData := make(map[string][]byte) + dynamodbLookups := make(mockDDBGetItemRVLookup) _, err := sourceGrantsData.WriteString("") require.NoError(t, err) for _, values := range tt.grantValues { @@ -368,6 +402,7 @@ func TestLambdaInvocationScenarios(t *testing.T) { Body: bytes.NewReader(sourceOpportunityData.Bytes()), }) require.NoError(t, err) + dynamodbLookups[values.OpportunityID] = mockDDBGetItemRV{lastModified: values.LastUpdatedDate} } _, err = sourceGrantsData.Write(sourceOpportunityData.Bytes()) require.NoError(t, err) @@ -388,7 +423,7 @@ func TestLambdaInvocationScenarios(t *testing.T) { require.NoErrorf(t, err, "Error creating test source object %s", objectKey) // Invoke the handler under test with a constructed S3 event - invocationErr := handleS3EventWithConfig(cfg, context.TODO(), events.S3Event{ + invocationErr := handleS3Event(context.TODO(), s3client, newMockDDBClient(t, dynamodbLookups), events.S3Event{ Records: []events.S3EventRecord{{ S3: events.S3Entity{ Bucket: events.S3Bucket{Name: sourceBucketName}, @@ -425,8 +460,9 @@ func TestLambdaInvocationScenarios(t *testing.T) { }) if v.isSkipped || (!v.isValid && !v.isExtant) { - // If there was no extant file and the new grant is invalid, or if we were meant to skip - // this grant, there should be no S3 file + // If there was no extant file and the new grant is invalid, + // or if we were meant to skip this grant, + // then there should be no S3 file assert.Error(t, err) } else { // Otherwise, we verify the S3 file matches the source from the test case @@ -448,7 +484,7 @@ func TestLambdaInvocationScenarios(t *testing.T) { setupLambdaEnvForTesting(t) sourceBucketName := "test-source-bucket" - s3client, cfg, err := setupS3ForTesting(t, sourceBucketName) + s3client, _, err := setupS3ForTesting(t, sourceBucketName) require.NoError(t, err) sourceTemplate := template.Must( template.New("xml").Delims("{{", "}}").Parse(SOURCE_OPPORTUNITY_TEMPLATE), @@ -468,7 +504,7 @@ func TestLambdaInvocationScenarios(t *testing.T) { Body: bytes.NewReader(sourceData.Bytes()), }) require.NoError(t, err) - err = handleS3EventWithConfig(cfg, context.TODO(), events.S3Event{ + err = handleS3Event(context.TODO(), s3client, newMockDDBClient(t, mockDDBGetItemRVLookup{}), events.S3Event{ Records: []events.S3EventRecord{ {S3: events.S3Entity{ Bucket: events.S3Bucket{Name: sourceBucketName}, @@ -497,12 +533,12 @@ func TestLambdaInvocationScenarios(t *testing.T) { t.Run("Context canceled during invocation", func(t *testing.T) { setupLambdaEnvForTesting(t) - _, cfg, err := setupS3ForTesting(t, "source-bucket") + _, _, err := setupS3ForTesting(t, "source-bucket") require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) cancel() - err = handleS3EventWithConfig(cfg, ctx, events.S3Event{ + err = handleS3Event(ctx, s3client, newMockDDBClient(t, mockDDBGetItemRVLookup{}), events.S3Event{ Records: []events.S3EventRecord{ {S3: events.S3Entity{ Bucket: events.S3Bucket{Name: "source-bucket"}, @@ -548,22 +584,6 @@ func TestProcessRecord(t *testing.T) { LastUpdatedDate: grantsgov.MMDDYYYYType(now.Format(grantsgov.TimeLayoutMMDDYYYYType)), } - t.Run("Destination bucket is incorrectly configured", func(t *testing.T) { - setupLambdaEnvForTesting(t) - c := mockS3ReadwriteObjectAPI{ - mockHeadObjectAPI( - func(ctx context.Context, params *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) { - t.Helper() - return &s3.HeadObjectOutput{}, fmt.Errorf("server error") - }, - ), - mockGetObjectAPI(nil), - mockPutObjectAPI(nil), - } - err := processRecord(context.TODO(), c, testOpportunity) - assert.ErrorContains(t, err, "Error determining last modified time for remote record") - }) - t.Run("Error uploading to S3", func(t *testing.T) { setupLambdaEnvForTesting(t) s3Client := mockS3ReadwriteObjectAPI{ @@ -588,7 +608,9 @@ func TestProcessRecord(t *testing.T) { }), } fmt.Printf("%T", s3Client) - err := processRecord(context.TODO(), s3Client, testOpportunity) + err := processRecord(context.TODO(), s3Client, newMockDDBClient(t, map[string]mockDDBGetItemRV{ + string(testOpportunity.OpportunityID): {lastModified: string(testOpportunity.LastUpdatedDate), err: nil}, + }), testOpportunity) assert.ErrorContains(t, err, "Error uploading prepared grant record to S3") }) } diff --git a/cmd/SplitGrantsGovXMLDB/main.go b/cmd/SplitGrantsGovXMLDB/main.go index 68806b3a..8eec6291 100644 --- a/cmd/SplitGrantsGovXMLDB/main.go +++ b/cmd/SplitGrantsGovXMLDB/main.go @@ -20,6 +20,8 @@ import ( goenv "github.com/Netflix/go-env" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/usdigitalresponse/grants-ingest/internal/awsHelpers" "github.com/usdigitalresponse/grants-ingest/internal/ddHelpers" "github.com/usdigitalresponse/grants-ingest/internal/log" @@ -30,6 +32,7 @@ type Environment struct { LogLevel string `env:"LOG_LEVEL,default=INFO"` DownloadChunkLimit int64 `env:"DOWNLOAD_CHUNK_LIMIT,default=10"` DestinationBucket string `env:"GRANTS_PREPARED_DATA_BUCKET_NAME,required=true"` + DynamoDBTableName string `env:"GRANTS_PREPARED_DATA_TABLE_NAME,required=true"` MaxConcurrentUploads int `env:"MAX_CONCURRENT_UPLOADS,default=1"` UsePathStyleS3Opt bool `env:"S3_USE_PATH_STYLE,default=false"` IsForecastedGrantsEnabled bool `env:"IS_FORECASTED_GRANTS_ENABLED,default=false"` @@ -57,7 +60,12 @@ func main() { return fmt.Errorf("could not create AWS SDK config: %w", err) } awstrace.AppendMiddleware(&cfg) + s3svc := s3.NewFromConfig(cfg, func(o *s3.Options) { + o.UsePathStyle = env.UsePathStyleS3Opt + }) + dynamodbSvc := dynamodb.NewFromConfig(cfg, func(o *dynamodb.Options) {}) + log.Debug(logger, "Starting Lambda") - return handleS3EventWithConfig(cfg, ctx, s3Event) + return handleS3Event(ctx, s3svc, dynamodbSvc, s3Event) }, nil)) } From 75dafc99208220b99714b3b7624f9c0769405f41 Mon Sep 17 00:00:00 2001 From: tyler <1851017+TylerHendrickson@users.noreply.github.com> Date: Mon, 26 Aug 2024 16:46:54 +0000 Subject: [PATCH 04/21] Fix ID comparisons --- cmd/SplitGrantsGovXMLDB/handler_test.go | 27 +++++++++++++++---------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/cmd/SplitGrantsGovXMLDB/handler_test.go b/cmd/SplitGrantsGovXMLDB/handler_test.go index edee303f..eb758403 100644 --- a/cmd/SplitGrantsGovXMLDB/handler_test.go +++ b/cmd/SplitGrantsGovXMLDB/handler_test.go @@ -48,11 +48,11 @@ type mockDDBGetItemRV struct { func newMockDDBClient(t *testing.T, idToRV mockDDBGetItemRVLookup) mockDynamoDBGetItemClient { t.Helper() return mockDynamoDBGetItemClient(func(ctx context.Context, params *dynamodb.GetItemInput, f ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error) { - key := struct{ grant_id string }{} + key := map[string]string{} err := attributevalue.UnmarshalMap(params.Key, &key) require.NoError(t, err, "Failed to extract grant_id value from DynamoDB GetItem key") output := dynamodb.GetItemOutput{Item: nil} - if rv, exists := idToRV[key.grant_id]; exists { + if rv, exists := idToRV[key["grant_id"]]; exists { output.Item = map[string]types.AttributeValue{ "LastUpdatedDate": &ddbtypes.AttributeValueMemberS{Value: rv.lastModified}, } @@ -402,7 +402,8 @@ func TestLambdaInvocationScenarios(t *testing.T) { Body: bytes.NewReader(sourceOpportunityData.Bytes()), }) require.NoError(t, err) - dynamodbLookups[values.OpportunityID] = mockDDBGetItemRV{lastModified: values.LastUpdatedDate} + extantLastModified := time.Now().Format("01022006") + dynamodbLookups.AddEntry(values.OpportunityID, extantLastModified, nil) } _, err = sourceGrantsData.Write(sourceOpportunityData.Bytes()) require.NoError(t, err) @@ -423,14 +424,18 @@ func TestLambdaInvocationScenarios(t *testing.T) { require.NoErrorf(t, err, "Error creating test source object %s", objectKey) // Invoke the handler under test with a constructed S3 event - invocationErr := handleS3Event(context.TODO(), s3client, newMockDDBClient(t, dynamodbLookups), events.S3Event{ - Records: []events.S3EventRecord{{ - S3: events.S3Entity{ - Bucket: events.S3Bucket{Name: sourceBucketName}, - Object: events.S3Object{Key: objectKey}, - }, - }}, - }) + invocationErr := handleS3Event(context.TODO(), + s3client, + newMockDDBClient(t, dynamodbLookups), + events.S3Event{ + Records: []events.S3EventRecord{{ + S3: events.S3Entity{ + Bucket: events.S3Bucket{Name: sourceBucketName}, + Object: events.S3Object{Key: objectKey}, + }, + }}, + }, + ) // Determine the list of expected grant objects to have been saved by the handler sourceContainsInvalidOpportunities := false From d9513ff178950848a9fdb6be513ac94b4da891e2 Mon Sep 17 00:00:00 2001 From: tyler <1851017+TylerHendrickson@users.noreply.github.com> Date: Mon, 26 Aug 2024 19:34:19 +0000 Subject: [PATCH 05/21] Refactor for clearer test-mocking --- cmd/SplitGrantsGovXMLDB/handler_test.go | 85 +++++++++++++++++-------- 1 file changed, 60 insertions(+), 25 deletions(-) diff --git a/cmd/SplitGrantsGovXMLDB/handler_test.go b/cmd/SplitGrantsGovXMLDB/handler_test.go index eb758403..9c07859f 100644 --- a/cmd/SplitGrantsGovXMLDB/handler_test.go +++ b/cmd/SplitGrantsGovXMLDB/handler_test.go @@ -5,6 +5,7 @@ import ( "context" "crypto/tls" "encoding/xml" + "errors" "fmt" "io" "net/http" @@ -34,31 +35,39 @@ import ( grantsgov "github.com/usdigitalresponse/grants-ingest/pkg/grantsSchemas/grants.gov" ) -type mockDDBGetItemRVLookup map[string]mockDDBGetItemRV - -func (m mockDDBGetItemRVLookup) AddEntry(id, lastModified string, err error) { - m[id] = mockDDBGetItemRV{lastModified, err} +type mockDDBClientGetItemReturnValue struct { + GrantId string + ItemLastModified string + GetItemErr error } -type mockDDBGetItemRV struct { - lastModified string - err error -} +// mockDDBClientGetItemCollection is a slice of values that are used to look up return values +// when a mock GetItem call is made. +type mockDDBClientGetItemCollection []mockDDBClientGetItemReturnValue -func newMockDDBClient(t *testing.T, idToRV mockDDBGetItemRVLookup) mockDynamoDBGetItemClient { +// NewGetItemClient returns an implementation of the DynamoDBGetItemAPI that looks up return values from itself at call-time +func (m mockDDBClientGetItemCollection) NewGetItemClient(t *testing.T) mockDynamoDBGetItemClient { t.Helper() - return mockDynamoDBGetItemClient(func(ctx context.Context, params *dynamodb.GetItemInput, f ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error) { - key := map[string]string{} - err := attributevalue.UnmarshalMap(params.Key, &key) + + return mockDynamoDBGetItemClient(func(ctx context.Context, params *dynamodb.GetItemInput, _ ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error) { + getItemKey := map[string]string{} + err := attributevalue.UnmarshalMap(params.Key, &getItemKey) require.NoError(t, err, "Failed to extract grant_id value from DynamoDB GetItem key") output := dynamodb.GetItemOutput{Item: nil} - if rv, exists := idToRV[key["grant_id"]]; exists { - output.Item = map[string]types.AttributeValue{ - "LastUpdatedDate": &ddbtypes.AttributeValueMemberS{Value: rv.lastModified}, + targetGrantId, exists := getItemKey["grant_id"] + var rvErr error + if exists { + for _, rv := range m { + if rv.GrantId == targetGrantId { + output.Item = map[string]types.AttributeValue{ + "LastUpdatedDate": &ddbtypes.AttributeValueMemberS{Value: rv.ItemLastModified}, + } + rvErr = rv.GetItemErr + break + } } - return &output, rv.err } - return &output, nil + return &output, rvErr }) } @@ -381,7 +390,7 @@ func TestLambdaInvocationScenarios(t *testing.T) { // (will also place extant records in S3 if specified in the test case) var sourceGrantsData bytes.Buffer sourceOpportunitiesData := make(map[string][]byte) - dynamodbLookups := make(mockDDBGetItemRVLookup) + ddbGetItemReturnValues := make(mockDDBClientGetItemCollection, 0) _, err := sourceGrantsData.WriteString("") require.NoError(t, err) for _, values := range tt.grantValues { @@ -403,7 +412,9 @@ func TestLambdaInvocationScenarios(t *testing.T) { }) require.NoError(t, err) extantLastModified := time.Now().Format("01022006") - dynamodbLookups.AddEntry(values.OpportunityID, extantLastModified, nil) + ddbGetItemReturnValues = append(ddbGetItemReturnValues, mockDDBClientGetItemReturnValue{ + values.OpportunityID, extantLastModified, nil, + }) } _, err = sourceGrantsData.Write(sourceOpportunityData.Bytes()) require.NoError(t, err) @@ -426,7 +437,7 @@ func TestLambdaInvocationScenarios(t *testing.T) { // Invoke the handler under test with a constructed S3 event invocationErr := handleS3Event(context.TODO(), s3client, - newMockDDBClient(t, dynamodbLookups), + ddbGetItemReturnValues.NewGetItemClient(t), events.S3Event{ Records: []events.S3EventRecord{{ S3: events.S3Entity{ @@ -509,7 +520,8 @@ func TestLambdaInvocationScenarios(t *testing.T) { Body: bytes.NewReader(sourceData.Bytes()), }) require.NoError(t, err) - err = handleS3Event(context.TODO(), s3client, newMockDDBClient(t, mockDDBGetItemRVLookup{}), events.S3Event{ + // err = handleS3Event(context.TODO(), s3client, newMockDDBClient(t, mockDDBGetItemRVLookup{}), events.S3Event{ + err = handleS3Event(context.TODO(), s3client, make(mockDDBClientGetItemCollection, 0).NewGetItemClient(t), events.S3Event{ Records: []events.S3EventRecord{ {S3: events.S3Entity{ Bucket: events.S3Bucket{Name: sourceBucketName}, @@ -543,7 +555,7 @@ func TestLambdaInvocationScenarios(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) cancel() - err = handleS3Event(ctx, s3client, newMockDDBClient(t, mockDDBGetItemRVLookup{}), events.S3Event{ + err = handleS3Event(ctx, s3client, make(mockDDBClientGetItemCollection, 0).NewGetItemClient(t), events.S3Event{ Records: []events.S3EventRecord{ {S3: events.S3Entity{ Bucket: events.S3Bucket{Name: "source-bucket"}, @@ -589,6 +601,28 @@ func TestProcessRecord(t *testing.T) { LastUpdatedDate: grantsgov.MMDDYYYYType(now.Format(grantsgov.TimeLayoutMMDDYYYYType)), } + t.Run("Error getting item from DynamoDB", func(t *testing.T) { + setupLambdaEnvForTesting(t) + c := mockS3ReadwriteObjectAPI{ + mockHeadObjectAPI( + func(ctx context.Context, params *s3.HeadObjectInput, optFns ...func(*s3.Options)) (*s3.HeadObjectOutput, error) { + t.Helper() + return &s3.HeadObjectOutput{}, fmt.Errorf("server error") + }, + ), + mockGetObjectAPI(nil), + mockPutObjectAPI(nil), + } + ddbLookups := make(mockDDBClientGetItemCollection, 0) + ddbLookups = append(ddbLookups, mockDDBClientGetItemReturnValue{ + GrantId: string(testOpportunity.OpportunityID), + ItemLastModified: string(testOpportunity.LastUpdatedDate), + GetItemErr: errors.New("Some issue with DynamoDB"), + }) + err := processRecord(context.TODO(), c, ddbLookups.NewGetItemClient(t), testOpportunity) + assert.ErrorContains(t, err, "Error determining last modified time for remote record") + }) + t.Run("Error uploading to S3", func(t *testing.T) { setupLambdaEnvForTesting(t) s3Client := mockS3ReadwriteObjectAPI{ @@ -613,9 +647,10 @@ func TestProcessRecord(t *testing.T) { }), } fmt.Printf("%T", s3Client) - err := processRecord(context.TODO(), s3Client, newMockDDBClient(t, map[string]mockDDBGetItemRV{ - string(testOpportunity.OpportunityID): {lastModified: string(testOpportunity.LastUpdatedDate), err: nil}, - }), testOpportunity) + ddb := mockDDBClientGetItemCollection([]mockDDBClientGetItemReturnValue{ + {GrantId: string(testOpportunity.OpportunityID), ItemLastModified: string(testOpportunity.LastUpdatedDate)}, + }) + err := processRecord(context.TODO(), s3Client, ddb.NewGetItemClient(t), testOpportunity) assert.ErrorContains(t, err, "Error uploading prepared grant record to S3") }) } From 6784e46cf2cafb5f3817c09c3ca2bfcc3e0b7d30 Mon Sep 17 00:00:00 2001 From: tyler <1851017+TylerHendrickson@users.noreply.github.com> Date: Mon, 26 Aug 2024 20:46:19 +0000 Subject: [PATCH 06/21] Update comments and logs --- cmd/SplitGrantsGovXMLDB/handler.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/cmd/SplitGrantsGovXMLDB/handler.go b/cmd/SplitGrantsGovXMLDB/handler.go index 2eb7464e..17aa83f7 100644 --- a/cmd/SplitGrantsGovXMLDB/handler.go +++ b/cmd/SplitGrantsGovXMLDB/handler.go @@ -218,11 +218,12 @@ func processRecords(ctx context.Context, s3svc *s3.Client, ddbsvc DynamoDBGetIte } } -// processRecord takes a single record and conditionally uploads an XML -// representation of the grant forecast/opportunity to its configured S3 destination. Before uploading, -// any extant S3 object with a matching key in the bucket named by env.DestinationBucket -// is compared with the record. An upload is initiated when the record was updated -// more recently than the extant object was last modified, or when no extant object exists. +// processRecord takes a single record and conditionally uploads an XML representation +// of the grant forecast/opportunity to its configured S3 destination. +// Before uploading, the last-modified date of a matching extant DynamoDB item (if any) +// is compared with the last-modified date the record on-hand. +// An upload is initiated when the record on-hand has a last-modified date that is more recent +// than that of the extant item, or when no extant item exists. func processRecord(ctx context.Context, s3svc S3ReadWriteObjectAPI, ddbsvc DynamoDBGetItemAPI, record grantRecord) error { logger := record.logWith(logger) @@ -234,8 +235,7 @@ func processRecord(ctx context.Context, s3svc S3ReadWriteObjectAPI, ddbsvc Dynam log.Debug(logger, "Parsed last modified time from record last update date") key := record.s3ObjectKey() - logger = log.With(logger, "bucket", env.DestinationBucket, "key", key) - // remoteLastModified, err := GetS3LastModified(ctx, svc, env.DestinationBucket, key) + logger = log.With(logger, "table", env.DynamoDBTableName, "bucket", env.DestinationBucket, "key", key) remoteLastModified, err := GetDynamoDBLastModified(ctx, ddbsvc, env.DynamoDBTableName, record.dynamoDBItemKey()) if err != nil { return log.Errorf(logger, "Error determining last modified time for remote record", err) From e171e117fb9acd0ec84f16d97660b620cbe1973b Mon Sep 17 00:00:00 2001 From: tyler <1851017+TylerHendrickson@users.noreply.github.com> Date: Mon, 26 Aug 2024 20:57:44 +0000 Subject: [PATCH 07/21] Grant Lambda permission to get DynamoDB items --- terraform/main.tf | 5 +++-- terraform/modules/SplitGrantsGovXMLDB/main.tf | 19 +++++++++---------- .../modules/SplitGrantsGovXMLDB/variables.tf | 5 +++++ 3 files changed, 17 insertions(+), 12 deletions(-) diff --git a/terraform/main.tf b/terraform/main.tf index f1f2f001..1b25f21a 100644 --- a/terraform/main.tf +++ b/terraform/main.tf @@ -516,8 +516,9 @@ module "SplitGrantsGovXMLDB" { additional_lambda_execution_policy_documents = local.lambda_execution_policies lambda_layer_arns = local.lambda_layer_arns - grants_source_data_bucket_name = module.grants_source_data_bucket.bucket_id - grants_prepared_data_bucket_name = module.grants_prepared_data_bucket.bucket_id + grants_source_data_bucket_name = module.grants_source_data_bucket.bucket_id + grants_prepared_data_bucket_name = module.grants_prepared_data_bucket.bucket_id + grants_prepared_dynamodb_table_name = module.grants_prepared_dynamodb_table.table_name } module "ReceiveFFISEmail" { diff --git a/terraform/modules/SplitGrantsGovXMLDB/main.tf b/terraform/modules/SplitGrantsGovXMLDB/main.tf index 371acc3a..209070d5 100644 --- a/terraform/modules/SplitGrantsGovXMLDB/main.tf +++ b/terraform/modules/SplitGrantsGovXMLDB/main.tf @@ -27,6 +27,10 @@ data "aws_s3_bucket" "prepared_data" { bucket = var.grants_prepared_data_bucket_name } +data "aws_dynamodb_table" "prepared_data" { + name = var.grants_prepared_data_table_name +} + module "lambda_execution_policy" { source = "cloudposse/iam-policy/aws" version = "1.0.1" @@ -41,16 +45,10 @@ module "lambda_execution_policy" { "${data.aws_s3_bucket.source_data.arn}/sources/*/*/*/grants.gov/extract.xml" ] } - AllowInspectS3PreparedData = { - effect = "Allow" - actions = [ - "s3:GetObject", - "s3:ListBucket" - ] - resources = [ - data.aws_s3_bucket.prepared_data.arn, - "${data.aws_s3_bucket.prepared_data.arn}/*/*/grants.gov/v2.xml" - ] + AllowReadDynamoDBPreparedData = { + effect = "Allow" + actions = ["dynamodb:GetItem"] + resources = [data.aws_dynamodb_table.prepared_data.arn] } AllowS3UploadPreparedData = { effect = "Allow" @@ -104,6 +102,7 @@ module "lambda_function" { DD_TAGS = join(",", sort([for k, v in local.dd_tags : "${k}:${v}"])) DOWNLOAD_CHUNK_LIMIT = "20" GRANTS_PREPARED_DATA_BUCKET_NAME = data.aws_s3_bucket.prepared_data.id + GRANTS_PREPARED_DATA_TABLE_NAME = data.aws_dynamodb_table.prepared_data.id LOG_LEVEL = var.log_level MAX_CONCURRENT_UPLOADS = "10" IS_FORECASTED_GRANTS_ENABLED = var.is_forecasted_grants_enabled diff --git a/terraform/modules/SplitGrantsGovXMLDB/variables.tf b/terraform/modules/SplitGrantsGovXMLDB/variables.tf index eba28d47..ecc95105 100644 --- a/terraform/modules/SplitGrantsGovXMLDB/variables.tf +++ b/terraform/modules/SplitGrantsGovXMLDB/variables.tf @@ -87,6 +87,11 @@ variable "grants_prepared_data_bucket_name" { type = string } +variable "grants_prepared_data_table_name" { + description = "Name of the DynamoDB table used to provide grants prepared data modification timestamps." + type = string +} + variable "is_forecasted_grants_enabled" { description = "Flag to control whether forecasted grants should be processed and stored in S3." type = bool From 0785c03974a58c9e29a70aab518e0ddf04877e53 Mon Sep 17 00:00:00 2001 From: tyler <1851017+TylerHendrickson@users.noreply.github.com> Date: Mon, 26 Aug 2024 20:58:16 +0000 Subject: [PATCH 08/21] Grant lambda permission to use new S3 key naming convention --- terraform/modules/SplitGrantsGovXMLDB/main.tf | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/terraform/modules/SplitGrantsGovXMLDB/main.tf b/terraform/modules/SplitGrantsGovXMLDB/main.tf index 209070d5..24117925 100644 --- a/terraform/modules/SplitGrantsGovXMLDB/main.tf +++ b/terraform/modules/SplitGrantsGovXMLDB/main.tf @@ -54,8 +54,12 @@ module "lambda_execution_policy" { effect = "Allow" actions = ["s3:PutObject"] resources = [ - # Path: //grants.gov/v2.xml + # Path: //grants.gov/v2.xml (deprecated) "${data.aws_s3_bucket.prepared_data.arn}/*/*/grants.gov/v2.xml" + # Path: /grants.gov/v2.OpportunitySynopsisDetail_1_0.xml + "${data.aws_s3_bucket.prepared_data.arn}/*/*/grants.gov/v2.OpportunitySynopsisDetail_1_0.xml" + # Path: /grants.gov/v2.OpportunityForecastDetail_1_0.xml + "${data.aws_s3_bucket.prepared_data.arn}/*/*/grants.gov/v2.OpportunityForecastDetail_1_0.xml" ] } } From 95cc6e0bdb16fdda95f27189874860c74ab0dd14 Mon Sep 17 00:00:00 2001 From: tyler <1851017+TylerHendrickson@users.noreply.github.com> Date: Mon, 26 Aug 2024 21:02:57 +0000 Subject: [PATCH 09/21] Fix syntax error --- terraform/modules/SplitGrantsGovXMLDB/main.tf | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/terraform/modules/SplitGrantsGovXMLDB/main.tf b/terraform/modules/SplitGrantsGovXMLDB/main.tf index 24117925..9af6634c 100644 --- a/terraform/modules/SplitGrantsGovXMLDB/main.tf +++ b/terraform/modules/SplitGrantsGovXMLDB/main.tf @@ -55,11 +55,11 @@ module "lambda_execution_policy" { actions = ["s3:PutObject"] resources = [ # Path: //grants.gov/v2.xml (deprecated) - "${data.aws_s3_bucket.prepared_data.arn}/*/*/grants.gov/v2.xml" + "${data.aws_s3_bucket.prepared_data.arn}/*/*/grants.gov/v2.xml", # Path: /grants.gov/v2.OpportunitySynopsisDetail_1_0.xml - "${data.aws_s3_bucket.prepared_data.arn}/*/*/grants.gov/v2.OpportunitySynopsisDetail_1_0.xml" + "${data.aws_s3_bucket.prepared_data.arn}/*/*/grants.gov/v2.OpportunitySynopsisDetail_1_0.xml", # Path: /grants.gov/v2.OpportunityForecastDetail_1_0.xml - "${data.aws_s3_bucket.prepared_data.arn}/*/*/grants.gov/v2.OpportunityForecastDetail_1_0.xml" + "${data.aws_s3_bucket.prepared_data.arn}/*/*/grants.gov/v2.OpportunityForecastDetail_1_0.xml", ] } } From f0a9082d00e75e4c397172727253b6d0d3a04328 Mon Sep 17 00:00:00 2001 From: tyler <1851017+TylerHendrickson@users.noreply.github.com> Date: Mon, 26 Aug 2024 22:15:38 +0000 Subject: [PATCH 10/21] Resolve duplicate import --- cmd/SplitGrantsGovXMLDB/handler_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cmd/SplitGrantsGovXMLDB/handler_test.go b/cmd/SplitGrantsGovXMLDB/handler_test.go index 9c07859f..5b8b897a 100644 --- a/cmd/SplitGrantsGovXMLDB/handler_test.go +++ b/cmd/SplitGrantsGovXMLDB/handler_test.go @@ -22,7 +22,6 @@ import ( "github.com/aws/aws-sdk-go-v2/credentials" "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" "github.com/aws/aws-sdk-go-v2/service/dynamodb" - "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" ddbtypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" "github.com/aws/aws-sdk-go-v2/service/s3" smithyhttp "github.com/aws/smithy-go/transport/http" @@ -59,7 +58,7 @@ func (m mockDDBClientGetItemCollection) NewGetItemClient(t *testing.T) mockDynam if exists { for _, rv := range m { if rv.GrantId == targetGrantId { - output.Item = map[string]types.AttributeValue{ + output.Item = map[string]ddbtypes.AttributeValue{ "LastUpdatedDate": &ddbtypes.AttributeValueMemberS{Value: rv.ItemLastModified}, } rvErr = rv.GetItemErr From 90fc5bb52e801e8518bc199966654b93107fb4fc Mon Sep 17 00:00:00 2001 From: tyler <1851017+TylerHendrickson@users.noreply.github.com> Date: Mon, 26 Aug 2024 22:24:32 +0000 Subject: [PATCH 11/21] Fix typo in var name --- terraform/modules/SplitGrantsGovXMLDB/main.tf | 2 +- terraform/modules/SplitGrantsGovXMLDB/variables.tf | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/terraform/modules/SplitGrantsGovXMLDB/main.tf b/terraform/modules/SplitGrantsGovXMLDB/main.tf index 9af6634c..b8888626 100644 --- a/terraform/modules/SplitGrantsGovXMLDB/main.tf +++ b/terraform/modules/SplitGrantsGovXMLDB/main.tf @@ -28,7 +28,7 @@ data "aws_s3_bucket" "prepared_data" { } data "aws_dynamodb_table" "prepared_data" { - name = var.grants_prepared_data_table_name + name = var.grants_prepared_dynamodb_table_name } module "lambda_execution_policy" { diff --git a/terraform/modules/SplitGrantsGovXMLDB/variables.tf b/terraform/modules/SplitGrantsGovXMLDB/variables.tf index ecc95105..96daecb0 100644 --- a/terraform/modules/SplitGrantsGovXMLDB/variables.tf +++ b/terraform/modules/SplitGrantsGovXMLDB/variables.tf @@ -87,7 +87,7 @@ variable "grants_prepared_data_bucket_name" { type = string } -variable "grants_prepared_data_table_name" { +variable "grants_prepared_dynamodb_table_name" { description = "Name of the DynamoDB table used to provide grants prepared data modification timestamps." type = string } From b19d5a87e952dc9a8ee0d5ffc4a1080e6566ecca Mon Sep 17 00:00:00 2001 From: tyler <1851017+TylerHendrickson@users.noreply.github.com> Date: Mon, 26 Aug 2024 23:06:10 +0000 Subject: [PATCH 12/21] Pass DynamoDB ARN to module instead of using data source --- terraform/main.tf | 1 + terraform/modules/SplitGrantsGovXMLDB/main.tf | 8 ++------ terraform/modules/SplitGrantsGovXMLDB/variables.tf | 5 +++++ 3 files changed, 8 insertions(+), 6 deletions(-) diff --git a/terraform/main.tf b/terraform/main.tf index 1b25f21a..ac90aeda 100644 --- a/terraform/main.tf +++ b/terraform/main.tf @@ -519,6 +519,7 @@ module "SplitGrantsGovXMLDB" { grants_source_data_bucket_name = module.grants_source_data_bucket.bucket_id grants_prepared_data_bucket_name = module.grants_prepared_data_bucket.bucket_id grants_prepared_dynamodb_table_name = module.grants_prepared_dynamodb_table.table_name + grants_prepared_dynamodb_table_arn = module.grants_prepared_dynamodb_table.table_arn } module "ReceiveFFISEmail" { diff --git a/terraform/modules/SplitGrantsGovXMLDB/main.tf b/terraform/modules/SplitGrantsGovXMLDB/main.tf index b8888626..c3349b5d 100644 --- a/terraform/modules/SplitGrantsGovXMLDB/main.tf +++ b/terraform/modules/SplitGrantsGovXMLDB/main.tf @@ -27,10 +27,6 @@ data "aws_s3_bucket" "prepared_data" { bucket = var.grants_prepared_data_bucket_name } -data "aws_dynamodb_table" "prepared_data" { - name = var.grants_prepared_dynamodb_table_name -} - module "lambda_execution_policy" { source = "cloudposse/iam-policy/aws" version = "1.0.1" @@ -48,7 +44,7 @@ module "lambda_execution_policy" { AllowReadDynamoDBPreparedData = { effect = "Allow" actions = ["dynamodb:GetItem"] - resources = [data.aws_dynamodb_table.prepared_data.arn] + resources = [var.grants_prepared_dynamodb_table_arn] } AllowS3UploadPreparedData = { effect = "Allow" @@ -106,7 +102,7 @@ module "lambda_function" { DD_TAGS = join(",", sort([for k, v in local.dd_tags : "${k}:${v}"])) DOWNLOAD_CHUNK_LIMIT = "20" GRANTS_PREPARED_DATA_BUCKET_NAME = data.aws_s3_bucket.prepared_data.id - GRANTS_PREPARED_DATA_TABLE_NAME = data.aws_dynamodb_table.prepared_data.id + GRANTS_PREPARED_DATA_TABLE_NAME = var.grants_prepared_dynamodb_table_name LOG_LEVEL = var.log_level MAX_CONCURRENT_UPLOADS = "10" IS_FORECASTED_GRANTS_ENABLED = var.is_forecasted_grants_enabled diff --git a/terraform/modules/SplitGrantsGovXMLDB/variables.tf b/terraform/modules/SplitGrantsGovXMLDB/variables.tf index 96daecb0..fabad68d 100644 --- a/terraform/modules/SplitGrantsGovXMLDB/variables.tf +++ b/terraform/modules/SplitGrantsGovXMLDB/variables.tf @@ -92,6 +92,11 @@ variable "grants_prepared_dynamodb_table_name" { type = string } +variable "grants_prepared_dynamodb_table_arn" { + description = "ARN of the DynamoDB table used to provide grants prepared data modification timestamps." + type = string +} + variable "is_forecasted_grants_enabled" { description = "Flag to control whether forecasted grants should be processed and stored in S3." type = bool From 4278a24cc83c889ceada396baa4c2fe260ca1af6 Mon Sep 17 00:00:00 2001 From: tyler <1851017+TylerHendrickson@users.noreply.github.com> Date: Tue, 27 Aug 2024 21:36:36 +0000 Subject: [PATCH 13/21] Allow PersistGrantsGovXMLDB to read from new XML object names --- terraform/modules/PersistGrantsGovXMLDB/main.tf | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/terraform/modules/PersistGrantsGovXMLDB/main.tf b/terraform/modules/PersistGrantsGovXMLDB/main.tf index 1c3647bb..cb5aaf36 100644 --- a/terraform/modules/PersistGrantsGovXMLDB/main.tf +++ b/terraform/modules/PersistGrantsGovXMLDB/main.tf @@ -37,15 +37,17 @@ module "lambda_execution_policy" { ] resources = [ data.aws_s3_bucket.prepared_data.arn, - # Path: //grants.gov/v2.xml - "${data.aws_s3_bucket.prepared_data.arn}/*/*/grants.gov/v2.xml" + # Path: //grants.gov/v2.xml (deprecated) + "${data.aws_s3_bucket.prepared_data.arn}/*/*/grants.gov/v2.xml", + # Path: /grants.gov/v2.OpportunitySynopsisDetail_1_0.xml + "${data.aws_s3_bucket.prepared_data.arn}/*/*/grants.gov/v2.OpportunitySynopsisDetail_1_0.xml", ] } AllowDynamoDBPreparedData = { effect = "Allow" actions = [ "dynamodb:ListTables", - "dynamodb:UpdateItem" + "dynamodb:UpdateItem", ] resources = [var.grants_prepared_dynamodb_table_arn] } From 63b59c13bd8a8bbf44989ef4a5622b2acae7ddbf Mon Sep 17 00:00:00 2001 From: tyler <1851017+TylerHendrickson@users.noreply.github.com> Date: Tue, 27 Aug 2024 21:36:55 +0000 Subject: [PATCH 14/21] Add missing dynamodb permission to SplitGrantsGovXMLDB --- terraform/modules/SplitGrantsGovXMLDB/main.tf | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/terraform/modules/SplitGrantsGovXMLDB/main.tf b/terraform/modules/SplitGrantsGovXMLDB/main.tf index c3349b5d..dc62cf2d 100644 --- a/terraform/modules/SplitGrantsGovXMLDB/main.tf +++ b/terraform/modules/SplitGrantsGovXMLDB/main.tf @@ -42,8 +42,11 @@ module "lambda_execution_policy" { ] } AllowReadDynamoDBPreparedData = { - effect = "Allow" - actions = ["dynamodb:GetItem"] + effect = "Allow" + actions = [ + "dynamodb:GetItem", + "dynamodb:ListTables", + ] resources = [var.grants_prepared_dynamodb_table_arn] } AllowS3UploadPreparedData = { From 439f2d3b405d6d3a77d001d5ba3712a0d267c553 Mon Sep 17 00:00:00 2001 From: tyler <1851017+TylerHendrickson@users.noreply.github.com> Date: Tue, 27 Aug 2024 22:15:50 +0000 Subject: [PATCH 15/21] Add ability to limit SplitGrantsGovXMLDB processing in local dev --- cmd/SplitGrantsGovXMLDB/handler.go | 10 ++++++++++ cmd/SplitGrantsGovXMLDB/main.go | 1 + terraform/main.tf | 1 + terraform/modules/SplitGrantsGovXMLDB/main.tf | 1 + terraform/modules/SplitGrantsGovXMLDB/variables.tf | 6 ++++++ terraform/variables.tf | 6 ++++++ 6 files changed, 25 insertions(+) diff --git a/cmd/SplitGrantsGovXMLDB/handler.go b/cmd/SplitGrantsGovXMLDB/handler.go index 17aa83f7..d5042fbd 100644 --- a/cmd/SplitGrantsGovXMLDB/handler.go +++ b/cmd/SplitGrantsGovXMLDB/handler.go @@ -124,6 +124,9 @@ func handleS3Event(ctx context.Context, s3svc *s3.Client, ddbsvc DynamoDBGetItem func readRecords(ctx context.Context, r io.Reader, ch chan<- grantRecord) error { span, ctx := tracer.StartSpanFromContext(ctx, "read.xml") + // Count records sent to ch + countSentRecords := 0 + d := xml.NewDecoder(r) for { // Check for context cancelation before/between reads @@ -133,6 +136,11 @@ func readRecords(ctx context.Context, r io.Reader, ch chan<- grantRecord) error return err } + // End early if we have reached any configured limit on the number of records sent to ch + if env.MaxSplitRecords > -1 && countSentRecords >= env.MaxSplitRecords { + break + } + token, err := d.Token() if err != nil { if err == io.EOF { @@ -150,11 +158,13 @@ func readRecords(ctx context.Context, r io.Reader, ch chan<- grantRecord) error if se.Name.Local == GRANT_OPPORTUNITY_XML_NAME { var o opportunity if err = d.DecodeElement(&o, &se); err == nil { + countSentRecords++ ch <- &o } } else if se.Name.Local == GRANT_FORECAST_XML_NAME && env.IsForecastedGrantsEnabled { var f forecast if err = d.DecodeElement(&f, &se); err == nil { + countSentRecords++ ch <- &f } } diff --git a/cmd/SplitGrantsGovXMLDB/main.go b/cmd/SplitGrantsGovXMLDB/main.go index 8eec6291..50979f6d 100644 --- a/cmd/SplitGrantsGovXMLDB/main.go +++ b/cmd/SplitGrantsGovXMLDB/main.go @@ -34,6 +34,7 @@ type Environment struct { DestinationBucket string `env:"GRANTS_PREPARED_DATA_BUCKET_NAME,required=true"` DynamoDBTableName string `env:"GRANTS_PREPARED_DATA_TABLE_NAME,required=true"` MaxConcurrentUploads int `env:"MAX_CONCURRENT_UPLOADS,default=1"` + MaxSplitRecords int `env:"MAX_SPLIT_RECORDS,default=-1"` UsePathStyleS3Opt bool `env:"S3_USE_PATH_STYLE,default=false"` IsForecastedGrantsEnabled bool `env:"IS_FORECASTED_GRANTS_ENABLED,default=false"` Extras goenv.EnvSet diff --git a/terraform/main.tf b/terraform/main.tf index ac90aeda..0a41c9b3 100644 --- a/terraform/main.tf +++ b/terraform/main.tf @@ -520,6 +520,7 @@ module "SplitGrantsGovXMLDB" { grants_prepared_data_bucket_name = module.grants_prepared_data_bucket.bucket_id grants_prepared_dynamodb_table_name = module.grants_prepared_dynamodb_table.table_name grants_prepared_dynamodb_table_arn = module.grants_prepared_dynamodb_table.table_arn + max_split_records = var.max_split_grantsgov_records } module "ReceiveFFISEmail" { diff --git a/terraform/modules/SplitGrantsGovXMLDB/main.tf b/terraform/modules/SplitGrantsGovXMLDB/main.tf index dc62cf2d..12c3a8a5 100644 --- a/terraform/modules/SplitGrantsGovXMLDB/main.tf +++ b/terraform/modules/SplitGrantsGovXMLDB/main.tf @@ -108,6 +108,7 @@ module "lambda_function" { GRANTS_PREPARED_DATA_TABLE_NAME = var.grants_prepared_dynamodb_table_name LOG_LEVEL = var.log_level MAX_CONCURRENT_UPLOADS = "10" + MAX_SPLIT_RECORDS = tostring(var.max_split_records) IS_FORECASTED_GRANTS_ENABLED = var.is_forecasted_grants_enabled }) diff --git a/terraform/modules/SplitGrantsGovXMLDB/variables.tf b/terraform/modules/SplitGrantsGovXMLDB/variables.tf index fabad68d..de28c068 100644 --- a/terraform/modules/SplitGrantsGovXMLDB/variables.tf +++ b/terraform/modules/SplitGrantsGovXMLDB/variables.tf @@ -102,3 +102,9 @@ variable "is_forecasted_grants_enabled" { type = bool default = false } + +variable "max_split_records" { + description = "Optional limit (i.e. for testing) on the number of records that the handler will process during a single invocation." + type = number + default = -1 +} diff --git a/terraform/variables.tf b/terraform/variables.tf index 468a30b7..25700fd6 100644 --- a/terraform/variables.tf +++ b/terraform/variables.tf @@ -225,3 +225,9 @@ variable "ses_active_receipt_rule_set_enabled" { type = bool default = true } + +variable "max_split_grantsgov_records" { + description = "Optional limit (i.e. for testing) on the number of records that SplitGrantsGovXMLDB handler will process during a single invocation." + type = number + default = -1 +} From f303a4ed4bd6ba73ae2b71c3052b9ce6cb07ada3 Mon Sep 17 00:00:00 2001 From: tyler <1851017+TylerHendrickson@users.noreply.github.com> Date: Tue, 27 Aug 2024 22:27:35 +0000 Subject: [PATCH 16/21] Set var.max_split_grantsgov_records to 10 in local deployments --- terraform/local.tfvars | 1 + 1 file changed, 1 insertion(+) diff --git a/terraform/local.tfvars b/terraform/local.tfvars index 9dd76ddd..b43cbc3c 100644 --- a/terraform/local.tfvars +++ b/terraform/local.tfvars @@ -12,6 +12,7 @@ eventbridge_scheduler_enabled = false ssm_deployment_parameters_path_prefix = "/grants-ingest/local" dynamodb_contributor_insights_enabled = false ffis_ingest_email_address = "ffis-ingest@localhost.grants.usdr.dev" +max_split_grantsgov_records = 10 ses_active_receipt_rule_set_enabled = false additional_lambda_environment_variables = { From 5ce86c024d7a23e104ccbc7c7875a341e60fe197 Mon Sep 17 00:00:00 2001 From: tyler <1851017+TylerHendrickson@users.noreply.github.com> Date: Tue, 27 Aug 2024 22:59:13 +0000 Subject: [PATCH 17/21] Test fix for staticcheck --- .github/workflows/ci.yml | 3 ++- .github/workflows/qa.yml | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index dfdc9fca..7f99181f 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,7 +1,8 @@ name: Continuous Integration on: - pull_request_target: {} + # pull_request_target: {} + pull_request: {} # Testing permissions: contents: read diff --git a/.github/workflows/qa.yml b/.github/workflows/qa.yml index 738d6758..eef4b446 100644 --- a/.github/workflows/qa.yml +++ b/.github/workflows/qa.yml @@ -149,6 +149,7 @@ jobs: uses: dominikh/staticcheck-action@fe1dd0c3658873b46f8c9bb3291096a617310ca6 # v1.3.1 with: install-go: false + version: 2023.1.6 go-build: name: Ensure Go Builds From b6395ca011f2be465dff3b9ca9488dfe20389b0c Mon Sep 17 00:00:00 2001 From: tyler <1851017+TylerHendrickson@users.noreply.github.com> Date: Tue, 27 Aug 2024 23:04:17 +0000 Subject: [PATCH 18/21] Revert 5ce86c0 --- .github/workflows/ci.yml | 3 +-- .github/workflows/qa.yml | 1 - 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7f99181f..dfdc9fca 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,8 +1,7 @@ name: Continuous Integration on: - # pull_request_target: {} - pull_request: {} # Testing + pull_request_target: {} permissions: contents: read diff --git a/.github/workflows/qa.yml b/.github/workflows/qa.yml index eef4b446..738d6758 100644 --- a/.github/workflows/qa.yml +++ b/.github/workflows/qa.yml @@ -149,7 +149,6 @@ jobs: uses: dominikh/staticcheck-action@fe1dd0c3658873b46f8c9bb3291096a617310ca6 # v1.3.1 with: install-go: false - version: 2023.1.6 go-build: name: Ensure Go Builds From a416816640d044512814473d5324e2316308a51a Mon Sep 17 00:00:00 2001 From: tyler Date: Wed, 4 Sep 2024 15:13:10 +0000 Subject: [PATCH 19/21] Add docstrings and remove type alias --- cmd/SplitGrantsGovXMLDB/dynamodb.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/cmd/SplitGrantsGovXMLDB/dynamodb.go b/cmd/SplitGrantsGovXMLDB/dynamodb.go index 8d91be14..9bf7c80a 100644 --- a/cmd/SplitGrantsGovXMLDB/dynamodb.go +++ b/cmd/SplitGrantsGovXMLDB/dynamodb.go @@ -11,13 +11,17 @@ import ( grantsgov "github.com/usdigitalresponse/grants-ingest/pkg/grantsSchemas/grants.gov" ) -type DynamoDBItemKey map[string]ddbtypes.AttributeValue - +// DynamoDBGetItemAPI is the interface for retrieving a single item from a DynamoDB table via primary key lookup type DynamoDBGetItemAPI interface { GetItem(context.Context, *dynamodb.GetItemInput, ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error) } -func GetDynamoDBLastModified(ctx context.Context, c DynamoDBGetItemAPI, table string, key DynamoDBItemKey) (*time.Time, error) { +// GetDynamoDBLastModified gets the "Last Modified" timestamp for a grantRecord stored in a DynamoDB table. +// If the item exists, a pointer to the last modification time is returned along with a nil error. +// If the specified item does not exist, the returned *time.Time and error are both nil. +// If an error is encountered when calling the HeadObject S3 API method, this will return a nil +// *time.Time value along with the encountered error. +func GetDynamoDBLastModified(ctx context.Context, c DynamoDBGetItemAPI, table string, key map[string]ddbtypes.AttributeValue) (*time.Time, error) { resp, err := c.GetItem(ctx, &dynamodb.GetItemInput{ TableName: &table, Key: key, From b76e9f7ba128840a060e26cd6036ccc6f0997c4d Mon Sep 17 00:00:00 2001 From: tyler <1851017+TylerHendrickson@users.noreply.github.com> Date: Wed, 25 Sep 2024 18:32:54 +0000 Subject: [PATCH 20/21] Fix missing key in WARN log --- cmd/PublishGrantEvents/handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/PublishGrantEvents/handler.go b/cmd/PublishGrantEvents/handler.go index 1513575d..bae30aa3 100644 --- a/cmd/PublishGrantEvents/handler.go +++ b/cmd/PublishGrantEvents/handler.go @@ -112,7 +112,7 @@ func buildGrantModificationEventJSON(record events.DynamoDBEventRecord) ([]byte, } if err := prevVersion.Validate(); err != nil { sendMetric("grant_data.invalid", 1, metricTag) - log.Warn(logger, "grant data from ItemMapper is invalid", err) + log.Warn(logger, "grant data from ItemMapper is invalid", "error", err) } } From ceb6f05abbcf9e5e1983fa1f6297540e354a3239 Mon Sep 17 00:00:00 2001 From: tyler <1851017+TylerHendrickson@users.noreply.github.com> Date: Wed, 25 Sep 2024 20:43:23 +0000 Subject: [PATCH 21/21] Remove permissions for deprecated S3 keys --- terraform/modules/SplitGrantsGovXMLDB/main.tf | 2 -- 1 file changed, 2 deletions(-) diff --git a/terraform/modules/SplitGrantsGovXMLDB/main.tf b/terraform/modules/SplitGrantsGovXMLDB/main.tf index 12c3a8a5..0765c2b6 100644 --- a/terraform/modules/SplitGrantsGovXMLDB/main.tf +++ b/terraform/modules/SplitGrantsGovXMLDB/main.tf @@ -53,8 +53,6 @@ module "lambda_execution_policy" { effect = "Allow" actions = ["s3:PutObject"] resources = [ - # Path: //grants.gov/v2.xml (deprecated) - "${data.aws_s3_bucket.prepared_data.arn}/*/*/grants.gov/v2.xml", # Path: /grants.gov/v2.OpportunitySynopsisDetail_1_0.xml "${data.aws_s3_bucket.prepared_data.arn}/*/*/grants.gov/v2.OpportunitySynopsisDetail_1_0.xml", # Path: /grants.gov/v2.OpportunityForecastDetail_1_0.xml