Skip to content

Commit

Permalink
Carl/bcda 8596 root cause fixes (#1023)
Browse files Browse the repository at this point in the history
## 🎫 Ticket

https://jira.cms.gov/browse/BCDA-8596

## 🛠 Changes

Revert the worker client transport options that were causing large-scale
restarts. Adjust a few odds and ends in the job rework to make things
clearer.

## ℹ️ Context

There was an incident in which jobs were stuck in an infinite loop of
sending successful requests to BFD. It appears that, under the hood,
requests would start normally, but at some point before they finished,
the worker instance would restart. I am 95% sure this is related to the
transport config.

<!-- If any of the following security implications apply, this PR must
not be merged without Stephen Walter's approval. Explain in this section
and add @SJWalter11 as a reviewer.
  - Adds a new software dependency or dependencies.
  - Modifies or invalidates one or more of our security controls.
  - Stores or transmits data that was not stored or transmitted before.
- Requires additional review of security implications for other reasons.
-->

## 🧪 Validation

Tested and linted locally, deployed branch to test env where the same
issue was happening and is now resolved.
  • Loading branch information
carlpartridge authored Dec 23, 2024
1 parent 98407bf commit 3e4aa67
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 28 deletions.
6 changes: 1 addition & 5 deletions bcda/client/bluebutton.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,7 @@ func NewBlueButtonClient(config BlueButtonConfig) (*BlueButtonClient, error) {
// Ensure that we have compression enabled. This allows the transport to request for gzip content
// and handle the decompression transparently.
// See: https://golang.org/src/net/http/transport.go?s=3396:10950#L182 for more information
DisableCompression: false,
MaxIdleConns: 100, // default value
MaxIdleConnsPerHost: 100, // Upped from 2 to 100 to match the default value of MaxIdleConns
IdleConnTimeout: 90 * time.Second, // default value
MaxConnsPerHost: 0, // default value (0 means no limit)
DisableCompression: false,
}
var timeout int
if timeout, err = strconv.Atoi(conf.GetEnv("BB_TIMEOUT_MS")); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion bcda/models/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ const (
)

type JobEnqueueArgs struct {
ID int
ID int // parent Job ID
ACOID string
CMSID string
BeneficiaryIDs []string
Expand Down
4 changes: 2 additions & 2 deletions bcdaworker/queueing/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func validateJob(ctx context.Context, cfg ValidateJobConfig) (*models.Job, error
maxNotFoundRetries, err := safecast.ToInt(utils.GetEnvInt("BCDA_WORKER_MAX_JOB_NOT_FOUND_RETRIES", 3))
if err != nil {
cfg.Logger.Errorf("Failed to convert BCDA_WORKER_MAX_JOB_NOT_FOUND_RETRIES to int32. Defaulting to 3. Error: %s", err)
return nil, err, false
return nil, err, true
}

if cfg.ErrorCount >= maxNotFoundRetries {
Expand All @@ -93,7 +93,7 @@ func validateJob(ctx context.Context, cfg ValidateJobConfig) (*models.Job, error
u, err := safecast.ToUint(cfg.JobID)
if err != nil {
cfg.Logger.Errorf("Failed to convert Job ID to uint. Error: %s", err)
return nil, err, false
return nil, err, true
}

_, err = worker.CheckJobCompleteAndCleanup(ctx, cfg.Repository, u)
Expand Down
3 changes: 1 addition & 2 deletions bcdaworker/queueing/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/stretchr/testify/mock"
)

func TestProcessJobFailedValidation(t *testing.T) {
func TestProcessJobFailedValidation_Integration(t *testing.T) {
tests := []struct {
name string
validateErr error
Expand Down Expand Up @@ -87,7 +87,6 @@ func TestProcessJobFailedValidation(t *testing.T) {
}
})
}

}

func TestCheckIfCancelled(t *testing.T) {
Expand Down
10 changes: 5 additions & 5 deletions bcdaworker/queueing/que.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,15 @@ func (q *queue) processJob(queJob *que.Job) error {
return nil
}

ctx = log.NewStructuredLoggerEntry(log.Worker, ctx)
ctx, _ = log.SetCtxLogger(ctx, "job_id", queJob.ID)
ctx, logger := log.SetCtxLogger(ctx, "transaction_id", jobArgs.TransactionID)

jobID, err := safecast.ToInt64(jobArgs.ID)
if err != nil {
return err
}

ctx = log.NewStructuredLoggerEntry(log.Worker, ctx)
ctx, _ = log.SetCtxLogger(ctx, "job_id", jobID)
ctx, logger := log.SetCtxLogger(ctx, "transaction_id", jobArgs.TransactionID)

exportJob, err, ackJob := validateJob(ctx, ValidateJobConfig{
WorkerInstance: q.worker,
Logger: logger,
Expand All @@ -125,7 +125,7 @@ func (q *queue) processJob(queJob *que.Job) error {
}

// start a goroutine that will periodically check the status of the parent job
go checkIfCancelled(ctx, q.repository, cancel, queJob.ID, 15)
go checkIfCancelled(ctx, q.repository, cancel, jobID, 15)

if err := q.worker.ProcessJob(ctx, queJob.ID, *exportJob, jobArgs); err != nil {
err := errors.Wrap(err, "failed to process job")
Expand Down
26 changes: 13 additions & 13 deletions bcdaworker/queueing/river.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,18 @@ type JobWorker struct {
river.WorkerDefaults[models.JobEnqueueArgs]
}

func (w *JobWorker) Work(ctx context.Context, job *river.Job[models.JobEnqueueArgs]) error {
func (w *JobWorker) Work(ctx context.Context, rjob *river.Job[models.JobEnqueueArgs]) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

jobID, err := safecast.ToInt64(rjob.Args.ID)
if err != nil {
return err
}

ctx = log.NewStructuredLoggerEntry(log.Worker, ctx)
ctx, _ = log.SetCtxLogger(ctx, "job_id", job.Args.ID)
ctx, logger := log.SetCtxLogger(ctx, "transaction_id", job.Args.TransactionID)
ctx, _ = log.SetCtxLogger(ctx, "job_id", jobID)
ctx, logger := log.SetCtxLogger(ctx, "transaction_id", rjob.Args.TransactionID)

// TODO: use pgxv5 when available
mainDB := database.Connection
Expand All @@ -114,19 +119,14 @@ func (w *JobWorker) Work(ctx context.Context, job *river.Job[models.JobEnqueueAr

defer updateJobQueueCountCloudwatchMetric(mainDB, logger)

jobID, err := safecast.ToInt64(job.Args.ID)
if err != nil {
return err
}

exportJob, err, ackJob := validateJob(ctx, ValidateJobConfig{
WorkerInstance: workerInstance,
Logger: logger,
Repository: repo,
JobID: jobID,
QJobID: job.ID,
Args: job.Args,
ErrorCount: len(job.Errors),
QJobID: rjob.ID,
Args: rjob.Args,
ErrorCount: len(rjob.Errors),
})
if ackJob {
// End logic here, basically acknowledge and return which will remove it from the queue.
Expand All @@ -138,9 +138,9 @@ func (w *JobWorker) Work(ctx context.Context, job *river.Job[models.JobEnqueueAr
}

// start a goroutine that will periodically check the status of the parent job
go checkIfCancelled(ctx, repo, cancel, job.ID, 15)
go checkIfCancelled(ctx, repo, cancel, jobID, 15)

if err := workerInstance.ProcessJob(ctx, job.ID, *exportJob, job.Args); err != nil {
if err := workerInstance.ProcessJob(ctx, rjob.ID, *exportJob, rjob.Args); err != nil {
err := errors.Wrap(err, "failed to process job")
logger.Error(err)
return err
Expand Down

0 comments on commit 3e4aa67

Please sign in to comment.