diff --git a/cmd/PublishGrantEvents/handler.go b/cmd/PublishGrantEvents/handler.go index 1513575d..bae30aa3 100644 --- a/cmd/PublishGrantEvents/handler.go +++ b/cmd/PublishGrantEvents/handler.go @@ -112,7 +112,7 @@ func buildGrantModificationEventJSON(record events.DynamoDBEventRecord) ([]byte, } if err := prevVersion.Validate(); err != nil { sendMetric("grant_data.invalid", 1, metricTag) - log.Warn(logger, "grant data from ItemMapper is invalid", err) + log.Warn(logger, "grant data from ItemMapper is invalid", "error", err) } } diff --git a/cmd/SplitGrantsGovXMLDB/dynamodb.go b/cmd/SplitGrantsGovXMLDB/dynamodb.go new file mode 100644 index 00000000..9bf7c80a --- /dev/null +++ b/cmd/SplitGrantsGovXMLDB/dynamodb.go @@ -0,0 +1,43 @@ +package main + +import ( + "context" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + ddbtypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + grantsgov "github.com/usdigitalresponse/grants-ingest/pkg/grantsSchemas/grants.gov" +) + +// DynamoDBGetItemAPI is the interface for retrieving a single item from a DynamoDB table via primary key lookup +type DynamoDBGetItemAPI interface { + GetItem(context.Context, *dynamodb.GetItemInput, ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error) +} + +// GetDynamoDBLastModified gets the "Last Modified" timestamp for a grantRecord stored in a DynamoDB table. +// If the item exists, a pointer to the last modification time is returned along with a nil error. +// If the specified item does not exist, the returned *time.Time and error are both nil. +// If an error is encountered when calling the GetItem DynamoDB API method, this will return a nil +// *time.Time value along with the encountered error. 
+func GetDynamoDBLastModified(ctx context.Context, c DynamoDBGetItemAPI, table string, key map[string]ddbtypes.AttributeValue) (*time.Time, error) { + resp, err := c.GetItem(ctx, &dynamodb.GetItemInput{ + TableName: &table, + Key: key, + ProjectionExpression: aws.String("LastUpdatedDate"), + }) + if err != nil { + return nil, err + } + if resp.Item == nil { + return nil, nil + } + + item := struct{ LastUpdatedDate grantsgov.MMDDYYYYType }{} + if err := attributevalue.UnmarshalMap(resp.Item, &item); err != nil { + return nil, err + } + lastUpdatedDate, err := item.LastUpdatedDate.Time() + return &lastUpdatedDate, err +} diff --git a/cmd/SplitGrantsGovXMLDB/dynamodb_test.go b/cmd/SplitGrantsGovXMLDB/dynamodb_test.go new file mode 100644 index 00000000..e138a644 --- /dev/null +++ b/cmd/SplitGrantsGovXMLDB/dynamodb_test.go @@ -0,0 +1,100 @@ +package main + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + ddbtypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + grantsgov "github.com/usdigitalresponse/grants-ingest/pkg/grantsSchemas/grants.gov" +) + +type mockDynamoDBGetItemClient func(context.Context, *dynamodb.GetItemInput, ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error) + +func (m mockDynamoDBGetItemClient) GetItem(ctx context.Context, params *dynamodb.GetItemInput, optFns ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error) { + return m(ctx, params, optFns...) 
+} + +func makeTestItem(t *testing.T, lastUpdatedDateTestValue any) map[string]ddbtypes.AttributeValue { + t.Helper() + rv, err := attributevalue.MarshalMap(map[string]any{"LastUpdatedDate": lastUpdatedDateTestValue}) + require.NoError(t, err, "Unexpected error creating test DynamoDB item fixture during setup") + return rv +} + +func TestGetDynamoDBLastModified(t *testing.T) { + testTableName := "test-table" + testItemKey := map[string]ddbtypes.AttributeValue{ + "grant_id": &ddbtypes.AttributeValueMemberS{Value: "test-key"}, + } + testLastUpdateDateString := time.Now().Format(grantsgov.TimeLayoutMMDDYYYYType) + testLastUpdateDate, err := time.Parse(grantsgov.TimeLayoutMMDDYYYYType, testLastUpdateDateString) + require.NoError(t, err, "Unexpected error parsing time fixture during test setup") + testInvalidDateString := "not a valid date string" + + _, testInvalidDateStringParseError := time.Parse(grantsgov.TimeLayoutMMDDYYYYType, testInvalidDateString) + require.Error(t, testInvalidDateStringParseError, "Error fixture unexpectedly nil during test setup") + + for _, tt := range []struct { + name string + ddbItem map[string]ddbtypes.AttributeValue + ddbErr error + expLastModified *time.Time + expErr error + }{ + { + "GetItem produces item with valid LastUpdatedDate", + makeTestItem(t, testLastUpdateDateString), + nil, + &testLastUpdateDate, + nil, + }, + { + "GetItem returns error", + nil, + errors.New("GetItem action failed"), + nil, + errors.New("GetItem action failed"), + }, + {"GetItem key not found", nil, nil, nil, nil}, + { + "GetItem produces item with invalid LastUpdateDate", + makeTestItem(t, testInvalidDateString), + nil, + nil, + testInvalidDateStringParseError, + }, + { + "GetItem produces item that cannot be unmarshalled", + makeTestItem(t, true), + nil, + nil, + errors.New("unmarshal failed, cannot unmarshal bool into Go value type grantsgov.MMDDYYYYType"), + }, + } { + + t.Run(tt.name, func(t *testing.T) { + mockClient := 
mockDynamoDBGetItemClient(func(ctx context.Context, params *dynamodb.GetItemInput, f ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error) { + require.Equal(t, &testTableName, params.TableName, "Unexpected table name in GetItem params") + require.Equal(t, testItemKey, params.Key, "Unexpected item key in GetItem params") + + return &dynamodb.GetItemOutput{Item: tt.ddbItem}, tt.ddbErr + }) + + lastModified, err := GetDynamoDBLastModified(context.TODO(), + mockClient, testTableName, testItemKey) + + if tt.expErr != nil { + assert.EqualError(t, err, tt.expErr.Error()) + } else { + assert.NoError(t, err) + assert.Equal(t, tt.expLastModified, lastModified) + } + }) + } +} diff --git a/cmd/SplitGrantsGovXMLDB/handler.go b/cmd/SplitGrantsGovXMLDB/handler.go index 24b152ab..d5042fbd 100644 --- a/cmd/SplitGrantsGovXMLDB/handler.go +++ b/cmd/SplitGrantsGovXMLDB/handler.go @@ -33,12 +33,7 @@ const ( // a partial or complete invocation failure. // Returns nil when all grant records are successfully processed from all source records, // indicating complete success. 
-func handleS3EventWithConfig(cfg aws.Config, ctx context.Context, s3Event events.S3Event) error { - // Configure service clients - s3svc := s3.NewFromConfig(cfg, func(o *s3.Options) { - o.UsePathStyle = env.UsePathStyleS3Opt - }) - +func handleS3Event(ctx context.Context, s3svc *s3.Client, ddbsvc DynamoDBGetItemAPI, s3Event events.S3Event) error { // Create a records channel to direct opportunity/forecast values parsed from the source // record to individual S3 object uploads records := make(chan grantRecord) @@ -48,7 +43,7 @@ func handleS3EventWithConfig(cfg aws.Config, ctx context.Context, s3Event events wg := multierror.Group{} for i := 0; i < env.MaxConcurrentUploads; i++ { wg.Go(func() error { - return processRecords(processingCtx, s3svc, records) + return processRecords(processingCtx, s3svc, ddbsvc, records) }) } @@ -129,6 +124,9 @@ func handleS3EventWithConfig(cfg aws.Config, ctx context.Context, s3Event events func readRecords(ctx context.Context, r io.Reader, ch chan<- grantRecord) error { span, ctx := tracer.StartSpanFromContext(ctx, "read.xml") + // Count records sent to ch + countSentRecords := 0 + d := xml.NewDecoder(r) for { // Check for context cancelation before/between reads @@ -138,6 +136,11 @@ func readRecords(ctx context.Context, r io.Reader, ch chan<- grantRecord) error return err } + // End early if we have reached any configured limit on the number of records sent to ch + if env.MaxSplitRecords > -1 && countSentRecords >= env.MaxSplitRecords { + break + } + token, err := d.Token() if err != nil { if err == io.EOF { @@ -155,11 +158,13 @@ func readRecords(ctx context.Context, r io.Reader, ch chan<- grantRecord) error if se.Name.Local == GRANT_OPPORTUNITY_XML_NAME { var o opportunity if err = d.DecodeElement(&o, &se); err == nil { + countSentRecords++ ch <- &o } } else if se.Name.Local == GRANT_FORECAST_XML_NAME && env.IsForecastedGrantsEnabled { var f forecast if err = d.DecodeElement(&f, &se); err == nil { + countSentRecords++ ch <- &f } } @@ 
-181,7 +186,7 @@ func readRecords(ctx context.Context, r io.Reader, ch chan<- grantRecord) error // It returns a multi-error containing any errors encountered while processing a received // grantRecord as well as the reason for the context cancelation, if any. // Returns nil if all records were processed successfully until the channel was closed. -func processRecords(ctx context.Context, svc *s3.Client, ch <-chan grantRecord) (errs error) { +func processRecords(ctx context.Context, s3svc *s3.Client, ddbsvc DynamoDBGetItemAPI, ch <-chan grantRecord) (errs error) { span, ctx := tracer.StartSpanFromContext(ctx, "processing.worker") whenCanceled := func() error { @@ -209,7 +214,7 @@ func processRecords(ctx context.Context, svc *s3.Client, ch <-chan grantRecord) } workSpan, ctx := tracer.StartSpanFromContext(ctx, "processing.worker.work") - err := processRecord(ctx, svc, record) + err := processRecord(ctx, s3svc, ddbsvc, record) if err != nil { sendMetric("record.failed", 1) errs = multierror.Append(errs, err) @@ -223,12 +228,13 @@ func processRecords(ctx context.Context, svc *s3.Client, ch <-chan grantRecord) } } -// processRecord takes a single record and conditionally uploads an XML -// representation of the grant forecast/opportunity to its configured S3 destination. Before uploading, -// any extant S3 object with a matching key in the bucket named by env.DestinationBucket -// is compared with the record. An upload is initiated when the record was updated -// more recently than the extant object was last modified, or when no extant object exists. -func processRecord(ctx context.Context, svc S3ReadWriteObjectAPI, record grantRecord) error { +// processRecord takes a single record and conditionally uploads an XML representation +// of the grant forecast/opportunity to its configured S3 destination. +// Before uploading, the last-modified date of a matching extant DynamoDB item (if any) +// is compared with the last-modified date of the record on-hand. 
+// An upload is initiated when the record on-hand has a last-modified date that is more recent +// than that of the extant item, or when no extant item exists. +func processRecord(ctx context.Context, s3svc S3ReadWriteObjectAPI, ddbsvc DynamoDBGetItemAPI, record grantRecord) error { logger := record.logWith(logger) lastModified, err := record.lastModified() @@ -239,8 +245,8 @@ func processRecord(ctx context.Context, svc S3ReadWriteObjectAPI, record grantRe log.Debug(logger, "Parsed last modified time from record last update date") key := record.s3ObjectKey() - logger = log.With(logger, "bucket", env.DestinationBucket, "key", key) - remoteLastModified, err := GetS3LastModified(ctx, svc, env.DestinationBucket, key) + logger = log.With(logger, "table", env.DynamoDBTableName, "bucket", env.DestinationBucket, "key", key) + remoteLastModified, err := GetDynamoDBLastModified(ctx, ddbsvc, env.DynamoDBTableName, record.dynamoDBItemKey()) if err != nil { return log.Errorf(logger, "Error determining last modified time for remote record", err) } @@ -264,7 +270,7 @@ func processRecord(ctx context.Context, svc S3ReadWriteObjectAPI, record grantRe return log.Errorf(logger, "Error marshaling XML for record", err) } - if err := UploadS3Object(ctx, svc, env.DestinationBucket, key, bytes.NewReader(b)); err != nil { + if err := UploadS3Object(ctx, s3svc, env.DestinationBucket, key, bytes.NewReader(b)); err != nil { return log.Errorf(logger, "Error uploading prepared grant record to S3", err) } diff --git a/cmd/SplitGrantsGovXMLDB/handler_test.go b/cmd/SplitGrantsGovXMLDB/handler_test.go index 6e079842..5b8b897a 100644 --- a/cmd/SplitGrantsGovXMLDB/handler_test.go +++ b/cmd/SplitGrantsGovXMLDB/handler_test.go @@ -5,6 +5,7 @@ import ( "context" "crypto/tls" "encoding/xml" + "errors" "fmt" "io" "net/http" @@ -19,6 +20,9 @@ import ( awsTransport "github.com/aws/aws-sdk-go-v2/aws/transport/http" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials" + 
"github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + ddbtypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" "github.com/aws/aws-sdk-go-v2/service/s3" smithyhttp "github.com/aws/smithy-go/transport/http" "github.com/go-kit/log" @@ -30,6 +34,42 @@ import ( grantsgov "github.com/usdigitalresponse/grants-ingest/pkg/grantsSchemas/grants.gov" ) +type mockDDBClientGetItemReturnValue struct { + GrantId string + ItemLastModified string + GetItemErr error +} + +// mockDDBClientGetItemCollection is a slice of values that are used to look up return values +// when a mock GetItem call is made. +type mockDDBClientGetItemCollection []mockDDBClientGetItemReturnValue + +// NewGetItemClient returns an implementation of the DynamoDBGetItemAPI that looks up return values from itself at call-time +func (m mockDDBClientGetItemCollection) NewGetItemClient(t *testing.T) mockDynamoDBGetItemClient { + t.Helper() + + return mockDynamoDBGetItemClient(func(ctx context.Context, params *dynamodb.GetItemInput, _ ...func(*dynamodb.Options)) (*dynamodb.GetItemOutput, error) { + getItemKey := map[string]string{} + err := attributevalue.UnmarshalMap(params.Key, &getItemKey) + require.NoError(t, err, "Failed to extract grant_id value from DynamoDB GetItem key") + output := dynamodb.GetItemOutput{Item: nil} + targetGrantId, exists := getItemKey["grant_id"] + var rvErr error + if exists { + for _, rv := range m { + if rv.GrantId == targetGrantId { + output.Item = map[string]ddbtypes.AttributeValue{ + "LastUpdatedDate": &ddbtypes.AttributeValueMemberS{Value: rv.ItemLastModified}, + } + rvErr = rv.GetItemErr + break + } + } + } + return &output, rvErr + }) +} + func setupLambdaEnvForTesting(t *testing.T) { t.Helper() @@ -39,6 +79,7 @@ func setupLambdaEnvForTesting(t *testing.T) { // Configure environment variables goenv.Unmarshal(goenv.EnvSet{ "GRANTS_PREPARED_DATA_BUCKET_NAME": "test-destination-bucket", + 
"GRANTS_PREPARED_DATA_TABLE_NAME": "test-dynamodb-table", "S3_USE_PATH_STYLE": "true", "DOWNLOAD_CHUNK_LIMIT": "10", }, &env) @@ -172,7 +213,7 @@ func TestLambdaInvocationScenarios(t *testing.T) { setupLambdaEnvForTesting(t) sourceBucketName := "test-source-bucket" now := time.Now() - s3client, cfg, err := setupS3ForTesting(t, sourceBucketName) + s3client, _, err := setupS3ForTesting(t, sourceBucketName) assert.NoError(t, err, "Error configuring test environment") seenOpportunityIDs := make(map[string]struct{}) @@ -348,6 +389,7 @@ func TestLambdaInvocationScenarios(t *testing.T) { // (will also place extant records in S3 if specified in the test case) var sourceGrantsData bytes.Buffer sourceOpportunitiesData := make(map[string][]byte) + ddbGetItemReturnValues := make(mockDDBClientGetItemCollection, 0) _, err := sourceGrantsData.WriteString("") require.NoError(t, err) for _, values := range tt.grantValues { @@ -368,6 +410,10 @@ func TestLambdaInvocationScenarios(t *testing.T) { Body: bytes.NewReader(sourceOpportunityData.Bytes()), }) require.NoError(t, err) + extantLastModified := time.Now().Format("01022006") + ddbGetItemReturnValues = append(ddbGetItemReturnValues, mockDDBClientGetItemReturnValue{ + values.OpportunityID, extantLastModified, nil, + }) } _, err = sourceGrantsData.Write(sourceOpportunityData.Bytes()) require.NoError(t, err) @@ -388,14 +434,18 @@ func TestLambdaInvocationScenarios(t *testing.T) { require.NoErrorf(t, err, "Error creating test source object %s", objectKey) // Invoke the handler under test with a constructed S3 event - invocationErr := handleS3EventWithConfig(cfg, context.TODO(), events.S3Event{ - Records: []events.S3EventRecord{{ - S3: events.S3Entity{ - Bucket: events.S3Bucket{Name: sourceBucketName}, - Object: events.S3Object{Key: objectKey}, - }, - }}, - }) + invocationErr := handleS3Event(context.TODO(), + s3client, + ddbGetItemReturnValues.NewGetItemClient(t), + events.S3Event{ + Records: []events.S3EventRecord{{ + S3: 
events.S3Entity{ + Bucket: events.S3Bucket{Name: sourceBucketName}, + Object: events.S3Object{Key: objectKey}, + }, + }}, + }, + ) // Determine the list of expected grant objects to have been saved by the handler sourceContainsInvalidOpportunities := false @@ -425,8 +475,9 @@ func TestLambdaInvocationScenarios(t *testing.T) { }) if v.isSkipped || (!v.isValid && !v.isExtant) { - // If there was no extant file and the new grant is invalid, or if we were meant to skip - // this grant, there should be no S3 file + // If there was no extant file and the new grant is invalid, + // or if we were meant to skip this grant, + // then there should be no S3 file assert.Error(t, err) } else { // Otherwise, we verify the S3 file matches the source from the test case @@ -448,7 +499,7 @@ func TestLambdaInvocationScenarios(t *testing.T) { setupLambdaEnvForTesting(t) sourceBucketName := "test-source-bucket" - s3client, cfg, err := setupS3ForTesting(t, sourceBucketName) + s3client, _, err := setupS3ForTesting(t, sourceBucketName) require.NoError(t, err) sourceTemplate := template.Must( template.New("xml").Delims("{{", "}}").Parse(SOURCE_OPPORTUNITY_TEMPLATE), @@ -468,7 +519,8 @@ func TestLambdaInvocationScenarios(t *testing.T) { Body: bytes.NewReader(sourceData.Bytes()), }) require.NoError(t, err) - err = handleS3EventWithConfig(cfg, context.TODO(), events.S3Event{ + // err = handleS3Event(context.TODO(), s3client, newMockDDBClient(t, mockDDBGetItemRVLookup{}), events.S3Event{ + err = handleS3Event(context.TODO(), s3client, make(mockDDBClientGetItemCollection, 0).NewGetItemClient(t), events.S3Event{ Records: []events.S3EventRecord{ {S3: events.S3Entity{ Bucket: events.S3Bucket{Name: sourceBucketName}, @@ -497,12 +549,12 @@ func TestLambdaInvocationScenarios(t *testing.T) { t.Run("Context canceled during invocation", func(t *testing.T) { setupLambdaEnvForTesting(t) - _, cfg, err := setupS3ForTesting(t, "source-bucket") + _, _, err := setupS3ForTesting(t, "source-bucket") 
require.NoError(t, err) ctx, cancel := context.WithCancel(context.Background()) cancel() - err = handleS3EventWithConfig(cfg, ctx, events.S3Event{ + err = handleS3Event(ctx, s3client, make(mockDDBClientGetItemCollection, 0).NewGetItemClient(t), events.S3Event{ Records: []events.S3EventRecord{ {S3: events.S3Entity{ Bucket: events.S3Bucket{Name: "source-bucket"}, @@ -548,7 +600,7 @@ func TestProcessRecord(t *testing.T) { LastUpdatedDate: grantsgov.MMDDYYYYType(now.Format(grantsgov.TimeLayoutMMDDYYYYType)), } - t.Run("Destination bucket is incorrectly configured", func(t *testing.T) { + t.Run("Error getting item from DynamoDB", func(t *testing.T) { setupLambdaEnvForTesting(t) c := mockS3ReadwriteObjectAPI{ mockHeadObjectAPI( @@ -560,7 +612,13 @@ func TestProcessRecord(t *testing.T) { mockGetObjectAPI(nil), mockPutObjectAPI(nil), } - err := processRecord(context.TODO(), c, testOpportunity) + ddbLookups := make(mockDDBClientGetItemCollection, 0) + ddbLookups = append(ddbLookups, mockDDBClientGetItemReturnValue{ + GrantId: string(testOpportunity.OpportunityID), + ItemLastModified: string(testOpportunity.LastUpdatedDate), + GetItemErr: errors.New("Some issue with DynamoDB"), + }) + err := processRecord(context.TODO(), c, ddbLookups.NewGetItemClient(t), testOpportunity) assert.ErrorContains(t, err, "Error determining last modified time for remote record") }) @@ -588,7 +646,10 @@ func TestProcessRecord(t *testing.T) { }), } fmt.Printf("%T", s3Client) - err := processRecord(context.TODO(), s3Client, testOpportunity) + ddb := mockDDBClientGetItemCollection([]mockDDBClientGetItemReturnValue{ + {GrantId: string(testOpportunity.OpportunityID), ItemLastModified: string(testOpportunity.LastUpdatedDate)}, + }) + err := processRecord(context.TODO(), s3Client, ddb.NewGetItemClient(t), testOpportunity) assert.ErrorContains(t, err, "Error uploading prepared grant record to S3") }) } diff --git a/cmd/SplitGrantsGovXMLDB/main.go b/cmd/SplitGrantsGovXMLDB/main.go index 68806b3a..50979f6d 
100644 --- a/cmd/SplitGrantsGovXMLDB/main.go +++ b/cmd/SplitGrantsGovXMLDB/main.go @@ -20,6 +20,8 @@ import ( goenv "github.com/Netflix/go-env" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/usdigitalresponse/grants-ingest/internal/awsHelpers" "github.com/usdigitalresponse/grants-ingest/internal/ddHelpers" "github.com/usdigitalresponse/grants-ingest/internal/log" @@ -30,7 +32,9 @@ type Environment struct { LogLevel string `env:"LOG_LEVEL,default=INFO"` DownloadChunkLimit int64 `env:"DOWNLOAD_CHUNK_LIMIT,default=10"` DestinationBucket string `env:"GRANTS_PREPARED_DATA_BUCKET_NAME,required=true"` + DynamoDBTableName string `env:"GRANTS_PREPARED_DATA_TABLE_NAME,required=true"` MaxConcurrentUploads int `env:"MAX_CONCURRENT_UPLOADS,default=1"` + MaxSplitRecords int `env:"MAX_SPLIT_RECORDS,default=-1"` UsePathStyleS3Opt bool `env:"S3_USE_PATH_STYLE,default=false"` IsForecastedGrantsEnabled bool `env:"IS_FORECASTED_GRANTS_ENABLED,default=false"` Extras goenv.EnvSet @@ -57,7 +61,12 @@ func main() { return fmt.Errorf("could not create AWS SDK config: %w", err) } awstrace.AppendMiddleware(&cfg) + s3svc := s3.NewFromConfig(cfg, func(o *s3.Options) { + o.UsePathStyle = env.UsePathStyleS3Opt + }) + dynamodbSvc := dynamodb.NewFromConfig(cfg, func(o *dynamodb.Options) {}) + log.Debug(logger, "Starting Lambda") - return handleS3EventWithConfig(cfg, ctx, s3Event) + return handleS3Event(ctx, s3svc, dynamodbSvc, s3Event) }, nil)) } diff --git a/cmd/SplitGrantsGovXMLDB/types.go b/cmd/SplitGrantsGovXMLDB/types.go index fd99e16a..2814cbc7 100644 --- a/cmd/SplitGrantsGovXMLDB/types.go +++ b/cmd/SplitGrantsGovXMLDB/types.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + ddbtypes "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" "github.com/usdigitalresponse/grants-ingest/internal/log" grantsgov 
"github.com/usdigitalresponse/grants-ingest/pkg/grantsSchemas/grants.gov" ) @@ -13,6 +14,7 @@ type grantRecord interface { logWith(log.Logger) log.Logger // s3ObjectKey returns a string to use as the object key when saving the opportunity to an S3 bucket s3ObjectKey() string + dynamoDBItemKey() map[string]ddbtypes.AttributeValue lastModified() (time.Time, error) toXML() ([]byte, error) } @@ -33,6 +35,12 @@ func (o opportunity) s3ObjectKey() string { ) } +func (o opportunity) dynamoDBItemKey() map[string]ddbtypes.AttributeValue { + return map[string]ddbtypes.AttributeValue{ + "grant_id": &ddbtypes.AttributeValueMemberS{Value: string(o.OpportunityID)}, + } +} + func (o opportunity) lastModified() (time.Time, error) { return o.LastUpdatedDate.Time() } @@ -64,3 +72,9 @@ func (f forecast) lastModified() (time.Time, error) { func (f forecast) toXML() ([]byte, error) { return xml.Marshal(grantsgov.OpportunityForecastDetail_1_0(f)) } + +func (o forecast) dynamoDBItemKey() map[string]ddbtypes.AttributeValue { + return map[string]ddbtypes.AttributeValue{ + "grant_id": &ddbtypes.AttributeValueMemberS{Value: string(o.OpportunityID)}, + } +} diff --git a/terraform/local.tfvars b/terraform/local.tfvars index 9dd76ddd..b43cbc3c 100644 --- a/terraform/local.tfvars +++ b/terraform/local.tfvars @@ -12,6 +12,7 @@ eventbridge_scheduler_enabled = false ssm_deployment_parameters_path_prefix = "/grants-ingest/local" dynamodb_contributor_insights_enabled = false ffis_ingest_email_address = "ffis-ingest@localhost.grants.usdr.dev" +max_split_grantsgov_records = 10 ses_active_receipt_rule_set_enabled = false additional_lambda_environment_variables = { diff --git a/terraform/main.tf b/terraform/main.tf index f1f2f001..0a41c9b3 100644 --- a/terraform/main.tf +++ b/terraform/main.tf @@ -516,8 +516,11 @@ module "SplitGrantsGovXMLDB" { additional_lambda_execution_policy_documents = local.lambda_execution_policies lambda_layer_arns = local.lambda_layer_arns - grants_source_data_bucket_name = 
module.grants_source_data_bucket.bucket_id - grants_prepared_data_bucket_name = module.grants_prepared_data_bucket.bucket_id + grants_source_data_bucket_name = module.grants_source_data_bucket.bucket_id + grants_prepared_data_bucket_name = module.grants_prepared_data_bucket.bucket_id + grants_prepared_dynamodb_table_name = module.grants_prepared_dynamodb_table.table_name + grants_prepared_dynamodb_table_arn = module.grants_prepared_dynamodb_table.table_arn + max_split_records = var.max_split_grantsgov_records } module "ReceiveFFISEmail" { diff --git a/terraform/modules/PersistGrantsGovXMLDB/main.tf b/terraform/modules/PersistGrantsGovXMLDB/main.tf index 1c3647bb..cb5aaf36 100644 --- a/terraform/modules/PersistGrantsGovXMLDB/main.tf +++ b/terraform/modules/PersistGrantsGovXMLDB/main.tf @@ -37,15 +37,17 @@ module "lambda_execution_policy" { ] resources = [ data.aws_s3_bucket.prepared_data.arn, - # Path: //grants.gov/v2.xml - "${data.aws_s3_bucket.prepared_data.arn}/*/*/grants.gov/v2.xml" + # Path: //grants.gov/v2.xml (deprecated) + "${data.aws_s3_bucket.prepared_data.arn}/*/*/grants.gov/v2.xml", + # Path: /grants.gov/v2.OpportunitySynopsisDetail_1_0.xml + "${data.aws_s3_bucket.prepared_data.arn}/*/*/grants.gov/v2.OpportunitySynopsisDetail_1_0.xml", ] } AllowDynamoDBPreparedData = { effect = "Allow" actions = [ "dynamodb:ListTables", - "dynamodb:UpdateItem" + "dynamodb:UpdateItem", ] resources = [var.grants_prepared_dynamodb_table_arn] } diff --git a/terraform/modules/SplitGrantsGovXMLDB/main.tf b/terraform/modules/SplitGrantsGovXMLDB/main.tf index 371acc3a..0765c2b6 100644 --- a/terraform/modules/SplitGrantsGovXMLDB/main.tf +++ b/terraform/modules/SplitGrantsGovXMLDB/main.tf @@ -41,23 +41,22 @@ module "lambda_execution_policy" { "${data.aws_s3_bucket.source_data.arn}/sources/*/*/*/grants.gov/extract.xml" ] } - AllowInspectS3PreparedData = { + AllowReadDynamoDBPreparedData = { effect = "Allow" actions = [ - "s3:GetObject", - "s3:ListBucket" - ] - resources = [ - 
data.aws_s3_bucket.prepared_data.arn, - "${data.aws_s3_bucket.prepared_data.arn}/*/*/grants.gov/v2.xml" + "dynamodb:GetItem", + "dynamodb:ListTables", ] + resources = [var.grants_prepared_dynamodb_table_arn] } AllowS3UploadPreparedData = { effect = "Allow" actions = ["s3:PutObject"] resources = [ - # Path: //grants.gov/v2.xml - "${data.aws_s3_bucket.prepared_data.arn}/*/*/grants.gov/v2.xml" + # Path: /grants.gov/v2.OpportunitySynopsisDetail_1_0.xml + "${data.aws_s3_bucket.prepared_data.arn}/*/*/grants.gov/v2.OpportunitySynopsisDetail_1_0.xml", + # Path: /grants.gov/v2.OpportunityForecastDetail_1_0.xml + "${data.aws_s3_bucket.prepared_data.arn}/*/*/grants.gov/v2.OpportunityForecastDetail_1_0.xml", ] } } @@ -104,8 +103,10 @@ module "lambda_function" { DD_TAGS = join(",", sort([for k, v in local.dd_tags : "${k}:${v}"])) DOWNLOAD_CHUNK_LIMIT = "20" GRANTS_PREPARED_DATA_BUCKET_NAME = data.aws_s3_bucket.prepared_data.id + GRANTS_PREPARED_DATA_TABLE_NAME = var.grants_prepared_dynamodb_table_name LOG_LEVEL = var.log_level MAX_CONCURRENT_UPLOADS = "10" + MAX_SPLIT_RECORDS = tostring(var.max_split_records) IS_FORECASTED_GRANTS_ENABLED = var.is_forecasted_grants_enabled }) diff --git a/terraform/modules/SplitGrantsGovXMLDB/variables.tf b/terraform/modules/SplitGrantsGovXMLDB/variables.tf index eba28d47..de28c068 100644 --- a/terraform/modules/SplitGrantsGovXMLDB/variables.tf +++ b/terraform/modules/SplitGrantsGovXMLDB/variables.tf @@ -87,8 +87,24 @@ variable "grants_prepared_data_bucket_name" { type = string } +variable "grants_prepared_dynamodb_table_name" { + description = "Name of the DynamoDB table used to provide grants prepared data modification timestamps." + type = string +} + +variable "grants_prepared_dynamodb_table_arn" { + description = "ARN of the DynamoDB table used to provide grants prepared data modification timestamps." 
+ type = string +} + variable "is_forecasted_grants_enabled" { description = "Flag to control whether forecasted grants should be processed and stored in S3." type = bool default = false } + +variable "max_split_records" { + description = "Optional limit (i.e. for testing) on the number of records that the handler will process during a single invocation." + type = number + default = -1 +} diff --git a/terraform/variables.tf b/terraform/variables.tf index 468a30b7..25700fd6 100644 --- a/terraform/variables.tf +++ b/terraform/variables.tf @@ -225,3 +225,9 @@ variable "ses_active_receipt_rule_set_enabled" { type = bool default = true } + +variable "max_split_grantsgov_records" { + description = "Optional limit (i.e. for testing) on the number of records that SplitGrantsGovXMLDB handler will process during a single invocation." + type = number + default = -1 +}