Skip to content

Commit

Permalink
Additional max-split configurations for SplitGrantsGovXMLDB (#934)
Browse files Browse the repository at this point in the history
* Support additional record-type-specific split limits

* Allow setting new env vars in Terraform variables

* Expose TF `var.is_forecasted_grants_enabled` at project level
  • Loading branch information
TylerHendrickson authored Oct 31, 2024
1 parent c31998d commit 36684e8
Show file tree
Hide file tree
Showing 9 changed files with 160 additions and 38 deletions.
2 changes: 1 addition & 1 deletion cli/grants-ingest/ffisImport/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (cmd *Cmd) Run(app *kong.Kong, logger *log.Logger) error {
if !cmd.DryRun {
return err
}
app.Errorf(err.Error())
app.Errorf("%s", err.Error())
}

log.Debug(*logger, "Mapping files in directory to S3 keys...", "directory", cmd.SourceDirectory)
Expand Down
27 changes: 18 additions & 9 deletions cmd/SplitGrantsGovXMLDB/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/go-kit/log/level"
"github.com/hashicorp/go-multierror"
"github.com/usdigitalresponse/grants-ingest/internal/log"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
Expand Down Expand Up @@ -125,7 +124,8 @@ func readRecords(ctx context.Context, r io.Reader, ch chan<- grantRecord) error
span, ctx := tracer.StartSpanFromContext(ctx, "read.xml")

// Count records sent to ch
countSentRecords := 0
countSentOpportunityRecords := 0
countSentForecastRecords := 0

d := xml.NewDecoder(r)
for {
Expand All @@ -136,8 +136,13 @@ func readRecords(ctx context.Context, r io.Reader, ch chan<- grantRecord) error
return err
}

// End early if we have reached any configured limit on the number of records sent to ch
if env.MaxSplitRecords > -1 && countSentRecords >= env.MaxSplitRecords {
// End early if a configured limit on the number of records sent to ch is reached
// OR if both record types have configured limits and both have been reached
if (env.MaxSplitRecords > -1 &&
countSentOpportunityRecords+countSentForecastRecords >= env.MaxSplitRecords) ||
(env.MaxSplitForecastRecords > -1 && env.MaxSplitOpportunityRecords > -1 &&
countSentForecastRecords >= env.MaxSplitForecastRecords &&
countSentOpportunityRecords >= env.MaxSplitOpportunityRecords) {
break
}

Expand All @@ -147,7 +152,7 @@ func readRecords(ctx context.Context, r io.Reader, ch chan<- grantRecord) error
// EOF means that we're done reading
break
}
level.Error(logger).Log("msg", "Error reading XML token", "error", err)
log.Error(logger, "Error reading XML token", err)
span.Finish(tracer.WithError(err))
return err
}
Expand All @@ -158,14 +163,18 @@ func readRecords(ctx context.Context, r io.Reader, ch chan<- grantRecord) error
if se.Name.Local == GRANT_OPPORTUNITY_XML_NAME {
var o opportunity
if err = d.DecodeElement(&o, &se); err == nil {
countSentRecords++
ch <- &o
if env.MaxSplitOpportunityRecords < 0 || countSentOpportunityRecords < env.MaxSplitOpportunityRecords {
ch <- &o
countSentOpportunityRecords++
}
}
} else if se.Name.Local == GRANT_FORECAST_XML_NAME && env.IsForecastedGrantsEnabled {
var f forecast
if err = d.DecodeElement(&f, &se); err == nil {
countSentRecords++
ch <- &f
if env.MaxSplitForecastRecords < 0 || countSentForecastRecords < env.MaxSplitForecastRecords {
ch <- &f
countSentForecastRecords++
}
}
}

Expand Down
76 changes: 75 additions & 1 deletion cmd/SplitGrantsGovXMLDB/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"io"
"net/http"
"net/http/httptest"
"reflect"
"strings"
"testing"
"text/template"
"time"
Expand Down Expand Up @@ -516,7 +518,6 @@ func TestLambdaInvocationScenarios(t *testing.T) {
Body: bytes.NewReader(sourceData.Bytes()),
})
require.NoError(t, err)
// err = handleS3Event(context.TODO(), s3client, newMockDDBClient(t, mockDDBGetItemRVLookup{}), events.S3Event{
err = handleS3Event(context.TODO(), s3client, make(mockDDBClientGetItemCollection, 0).NewGetItemClient(t), events.S3Event{
Records: []events.S3EventRecord{
{S3: events.S3Entity{
Expand Down Expand Up @@ -580,6 +581,7 @@ func (r *MockReader) Read(p []byte) (int, error) {
}

func TestReadRecords(t *testing.T) {
setupLambdaEnvForTesting(t)
t.Run("Context cancelled between reads", func(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
err := readRecords(ctx, &MockReader{func(p []byte) (int, error) {
Expand All @@ -588,6 +590,78 @@ func TestReadRecords(t *testing.T) {
}}, make(chan<- grantRecord, 10))
assert.ErrorIs(t, err, context.Canceled)
})

t.Run("max record limits", func(t *testing.T) {
for _, tt := range []struct {
name string
maxSplitRecords, maxSplitOpportunityRecords, maxSplitForecastRecords int
expOpportunityRecords, expForecastRecords int
}{
{
"no limits processes all records",
-1, -1, -1,
10, 10,
},
{
"opportunity limit does not limit forecasts",
-1, 2, -1,
2, 10,
},
{
"forecast limit does not limit opportunities",
-1, -1, 2,
10, 2,
},
{
"hard limit takes precedent over no type limits",
2, -1, -1,
2, 0,
},
{
"hard limit takes precedent over type limits",
2, 3, 5,
2, 0,
},
{
"mix of limits",
5, 3, -1,
3, 2,
},
} {
t.Run(tt.name, func(t *testing.T) {
env.MaxSplitRecords = tt.maxSplitRecords
env.MaxSplitOpportunityRecords = tt.maxSplitOpportunityRecords
env.MaxSplitForecastRecords = tt.maxSplitForecastRecords
env.IsForecastedGrantsEnabled = true

xmlData := "<Grants>\n" +
// Content of records doesn't matter since we're just looking at the tag
strings.Repeat("<OpportunitySynopsisDetail_1_0></OpportunitySynopsisDetail_1_0>\n", 10) +
strings.Repeat("<OpportunityForecastDetail_1_0></OpportunityForecastDetail_1_0>\n", 10) +
"</Grants>"
ch := make(chan grantRecord, 20)
require.NoError(t, readRecords(context.TODO(), strings.NewReader(xmlData), ch))
close(ch)
var countSentOpportunityRecords, countSentForecastRecords int
for rec := range ch {
switch reflect.Indirect(reflect.ValueOf(rec)).Type().Name() {
case "opportunity":
countSentOpportunityRecords++
case "forecast":
countSentForecastRecords++
default:
require.Fail(t,
"Unknown grantRecord type sent to channel during test setup",
"type %T unrecognized", rec)
}
}
assert.Equalf(t, tt.expOpportunityRecords, countSentOpportunityRecords,
"Unexpected number of opportunity records sent to channel")
assert.Equalf(t, tt.expForecastRecords, countSentForecastRecords,
"Unexpected number of forecast records sent to channel")
})
}
})
}

func TestProcessRecord(t *testing.T) {
Expand Down
20 changes: 11 additions & 9 deletions cmd/SplitGrantsGovXMLDB/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,17 @@ import (
)

type Environment struct {
LogLevel string `env:"LOG_LEVEL,default=INFO"`
DownloadChunkLimit int64 `env:"DOWNLOAD_CHUNK_LIMIT,default=10"`
DestinationBucket string `env:"GRANTS_PREPARED_DATA_BUCKET_NAME,required=true"`
DynamoDBTableName string `env:"GRANTS_PREPARED_DATA_TABLE_NAME,required=true"`
MaxConcurrentUploads int `env:"MAX_CONCURRENT_UPLOADS,default=1"`
MaxSplitRecords int `env:"MAX_SPLIT_RECORDS,default=-1"`
UsePathStyleS3Opt bool `env:"S3_USE_PATH_STYLE,default=false"`
IsForecastedGrantsEnabled bool `env:"IS_FORECASTED_GRANTS_ENABLED,default=false"`
Extras goenv.EnvSet
LogLevel string `env:"LOG_LEVEL,default=INFO"`
DownloadChunkLimit int64 `env:"DOWNLOAD_CHUNK_LIMIT,default=10"`
DestinationBucket string `env:"GRANTS_PREPARED_DATA_BUCKET_NAME,required=true"`
DynamoDBTableName string `env:"GRANTS_PREPARED_DATA_TABLE_NAME,required=true"`
MaxConcurrentUploads int `env:"MAX_CONCURRENT_UPLOADS,default=1"`
UsePathStyleS3Opt bool `env:"S3_USE_PATH_STYLE,default=false"`
IsForecastedGrantsEnabled bool `env:"IS_FORECASTED_GRANTS_ENABLED,default=false"`
MaxSplitRecords int `env:"MAX_SPLIT_RECORDS,default=-1"` // Hard limit of records to process, regardless of type. -1 for no limit.
MaxSplitOpportunityRecords int `env:"MAX_SPLIT_OPPORTUNITY_RECORDS,default=-1"` // Limit opportunity-type records to process. -1 for no limit.
MaxSplitForecastRecords int `env:"MAX_SPLIT_FORECAST_RECORDS,default=-1"` // Limit forecast-type records to process. -1 for no limit.
Extras goenv.EnvSet
}

var (
Expand Down
34 changes: 18 additions & 16 deletions terraform/local.tfvars
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
namespace = "grants-ingest"
environment = "sandbox"
version_identifier = "dev"
permissions_boundary_policy_name = ""
datadog_enabled = false
datadog_dashboards_enabled = false
datadog_lambda_extension_version = "38"
lambda_binaries_autobuild = true
lambda_default_log_retention_in_days = 7
lambda_default_log_level = "DEBUG"
eventbridge_scheduler_enabled = false
ssm_deployment_parameters_path_prefix = "/grants-ingest/local"
dynamodb_contributor_insights_enabled = false
ffis_ingest_email_address = "[email protected]"
max_split_grantsgov_records = 10
ses_active_receipt_rule_set_enabled = false
namespace = "grants-ingest"
environment = "sandbox"
version_identifier = "dev"
permissions_boundary_policy_name = ""
datadog_enabled = false
datadog_dashboards_enabled = false
datadog_lambda_extension_version = "38"
lambda_binaries_autobuild = true
lambda_default_log_retention_in_days = 7
lambda_default_log_level = "DEBUG"
eventbridge_scheduler_enabled = false
ssm_deployment_parameters_path_prefix = "/grants-ingest/local"
dynamodb_contributor_insights_enabled = false
ffis_ingest_email_address = "[email protected]"
is_forecasted_grants_enabled = true
max_split_grantsgov_opportunity_records = 10
max_split_grantsgov_forecast_records = 10
ses_active_receipt_rule_set_enabled = false

additional_lambda_environment_variables = {
S3_USE_PATH_STYLE = "true"
Expand Down
3 changes: 3 additions & 0 deletions terraform/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -520,7 +520,10 @@ module "SplitGrantsGovXMLDB" {
grants_prepared_data_bucket_name = module.grants_prepared_data_bucket.bucket_id
grants_prepared_dynamodb_table_name = module.grants_prepared_dynamodb_table.table_name
grants_prepared_dynamodb_table_arn = module.grants_prepared_dynamodb_table.table_arn
is_forecasted_grants_enabled = var.is_forecasted_grants_enabled
max_split_records = var.max_split_grantsgov_records
max_split_opportunity_records = var.max_split_grantsgov_opportunity_records
max_split_forecast_records = var.max_split_grantsgov_forecast_records
}

module "ReceiveFFISEmail" {
Expand Down
2 changes: 2 additions & 0 deletions terraform/modules/SplitGrantsGovXMLDB/main.tf
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ module "lambda_function" {
LOG_LEVEL = var.log_level
MAX_CONCURRENT_UPLOADS = "10"
MAX_SPLIT_RECORDS = tostring(var.max_split_records)
MAX_SPLIT_OPPORTUNITY_RECORDS = tostring(var.max_split_opportunity_records)
MAX_SPLIT_FORECAST_RECORDS = tostring(var.max_split_forecast_records)
IS_FORECASTED_GRANTS_ENABLED = var.is_forecasted_grants_enabled
})

Expand Down
14 changes: 13 additions & 1 deletion terraform/modules/SplitGrantsGovXMLDB/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,19 @@ variable "is_forecasted_grants_enabled" {
}

variable "max_split_records" {
description = "Optional limit (i.e. for testing) on the number of records that the handler will process during a single invocation."
description = "Optional limit (i.e. for testing) on the number of records that the handler will process during a single invocation. This setting is a hard cap on top of opportunity- and forecast-specific limits."
type = number
default = -1
}

variable "max_split_opportunity_records" {
description = "Optional limit (i.e. for testing) on the number of opportunity records that the handler will process during a single invocation."
type = number
default = -1
}

variable "max_split_forecast_records" {
description = "Optional limit (i.e. for testing) on the number of opportunity records that the handler will process during a single invocation."
type = number
default = -1
}
20 changes: 19 additions & 1 deletion terraform/variables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,26 @@ variable "ses_active_receipt_rule_set_enabled" {
default = true
}

variable "is_forecasted_grants_enabled" {
description = "When true, enables processing of forecasted grant records from Grants.gov."
type = bool
default = false
}

variable "max_split_grantsgov_records" {
description = "Optional limit (i.e. for testing) on the number of records that SplitGrantsGovXMLDB handler will process during a single invocation."
description = "Optional hard limit (i.e. for testing) on the number of records (of any type) that SplitGrantsGovXMLDB handler will process during a single invocation."
type = number
default = -1
}

variable "max_split_grantsgov_opportunity_records" {
description = "Optional limit (i.e. for testing) on the number of opportunity records that SplitGrantsGovXMLDB handler will process during a single invocation."
type = number
default = -1
}

variable "max_split_grantsgov_forecast_records" {
description = "Optional limit (i.e. for testing) on the number of forecast records that SplitGrantsGovXMLDB handler will process during a single invocation."
type = number
default = -1
}

0 comments on commit 36684e8

Please sign in to comment.