Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Carl/bcda 8596 root cause fixes #1023

Merged
merged 4 commits into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions bcda/client/bluebutton.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,7 @@ func NewBlueButtonClient(config BlueButtonConfig) (*BlueButtonClient, error) {
// Ensure that we have compression enabled. This allows the transport to request gzip content
// and handle the decompression transparently.
// See: https://golang.org/src/net/http/transport.go?s=3396:10950#L182 for more information
DisableCompression: false,
MaxIdleConns: 100, // default value
MaxIdleConnsPerHost: 100, // Upped from 2 to 100 to match the default value of MaxIdleConns
IdleConnTimeout: 90 * time.Second, // default value
MaxConnsPerHost: 0, // default value (0 means no limit)
DisableCompression: false,
bhagatparwinder marked this conversation as resolved.
Show resolved Hide resolved
}
var timeout int
if timeout, err = strconv.Atoi(conf.GetEnv("BB_TIMEOUT_MS")); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion bcda/models/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ const (
)

type JobEnqueueArgs struct {
ID int
ID int // parent Job ID
ACOID string
CMSID string
BeneficiaryIDs []string
Expand Down
4 changes: 2 additions & 2 deletions bcdaworker/queueing/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func validateJob(ctx context.Context, cfg ValidateJobConfig) (*models.Job, error
maxNotFoundRetries, err := safecast.ToInt(utils.GetEnvInt("BCDA_WORKER_MAX_JOB_NOT_FOUND_RETRIES", 3))
if err != nil {
cfg.Logger.Errorf("Failed to convert BCDA_WORKER_MAX_JOB_NOT_FOUND_RETRIES to int32. Defaulting to 3. Error: %s", err)
return nil, err, false
return nil, err, true
}

if cfg.ErrorCount >= maxNotFoundRetries {
Expand All @@ -93,7 +93,7 @@ func validateJob(ctx context.Context, cfg ValidateJobConfig) (*models.Job, error
u, err := safecast.ToUint(cfg.JobID)
if err != nil {
cfg.Logger.Errorf("Failed to convert Job ID to uint. Error: %s", err)
return nil, err, false
return nil, err, true
}

_, err = worker.CheckJobCompleteAndCleanup(ctx, cfg.Repository, u)
Expand Down
3 changes: 1 addition & 2 deletions bcdaworker/queueing/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/stretchr/testify/mock"
)

func TestProcessJobFailedValidation(t *testing.T) {
func TestProcessJobFailedValidation_Integration(t *testing.T) {
tests := []struct {
name string
validateErr error
Expand Down Expand Up @@ -87,7 +87,6 @@ func TestProcessJobFailedValidation(t *testing.T) {
}
})
}

}

func TestCheckIfCancelled(t *testing.T) {
Expand Down
10 changes: 5 additions & 5 deletions bcdaworker/queueing/que.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,15 +97,15 @@ func (q *queue) processJob(queJob *que.Job) error {
return nil
}

ctx = log.NewStructuredLoggerEntry(log.Worker, ctx)
ctx, _ = log.SetCtxLogger(ctx, "job_id", queJob.ID)
ctx, logger := log.SetCtxLogger(ctx, "transaction_id", jobArgs.TransactionID)

jobID, err := safecast.ToInt64(jobArgs.ID)
if err != nil {
return err
}

ctx = log.NewStructuredLoggerEntry(log.Worker, ctx)
ctx, _ = log.SetCtxLogger(ctx, "job_id", jobID)
ctx, logger := log.SetCtxLogger(ctx, "transaction_id", jobArgs.TransactionID)

exportJob, err, ackJob := validateJob(ctx, ValidateJobConfig{
WorkerInstance: q.worker,
Logger: logger,
Expand All @@ -125,7 +125,7 @@ func (q *queue) processJob(queJob *que.Job) error {
}

// start a goroutine that will periodically check the status of the parent job
go checkIfCancelled(ctx, q.repository, cancel, queJob.ID, 15)
go checkIfCancelled(ctx, q.repository, cancel, jobID, 15)

if err := q.worker.ProcessJob(ctx, queJob.ID, *exportJob, jobArgs); err != nil {
err := errors.Wrap(err, "failed to process job")
Expand Down
26 changes: 13 additions & 13 deletions bcdaworker/queueing/river.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,18 @@ type JobWorker struct {
river.WorkerDefaults[models.JobEnqueueArgs]
}

func (w *JobWorker) Work(ctx context.Context, job *river.Job[models.JobEnqueueArgs]) error {
func (w *JobWorker) Work(ctx context.Context, rjob *river.Job[models.JobEnqueueArgs]) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()

jobID, err := safecast.ToInt64(rjob.Args.ID)
if err != nil {
return err
}

ctx = log.NewStructuredLoggerEntry(log.Worker, ctx)
ctx, _ = log.SetCtxLogger(ctx, "job_id", job.Args.ID)
ctx, logger := log.SetCtxLogger(ctx, "transaction_id", job.Args.TransactionID)
ctx, _ = log.SetCtxLogger(ctx, "job_id", jobID)
ctx, logger := log.SetCtxLogger(ctx, "transaction_id", rjob.Args.TransactionID)

// TODO: use pgxv5 when available
mainDB := database.Connection
Expand All @@ -114,19 +119,14 @@ func (w *JobWorker) Work(ctx context.Context, job *river.Job[models.JobEnqueueAr

defer updateJobQueueCountCloudwatchMetric(mainDB, logger)

jobID, err := safecast.ToInt64(job.Args.ID)
if err != nil {
return err
}

exportJob, err, ackJob := validateJob(ctx, ValidateJobConfig{
WorkerInstance: workerInstance,
Logger: logger,
Repository: repo,
JobID: jobID,
QJobID: job.ID,
Args: job.Args,
ErrorCount: len(job.Errors),
QJobID: rjob.ID,
Args: rjob.Args,
ErrorCount: len(rjob.Errors),
})
if ackJob {
// End processing here: acknowledge the job and return, which removes it from the queue.
Expand All @@ -138,9 +138,9 @@ func (w *JobWorker) Work(ctx context.Context, job *river.Job[models.JobEnqueueAr
}

// start a goroutine that will periodically check the status of the parent job
go checkIfCancelled(ctx, repo, cancel, job.ID, 15)
go checkIfCancelled(ctx, repo, cancel, jobID, 15)

if err := workerInstance.ProcessJob(ctx, job.ID, *exportJob, job.Args); err != nil {
if err := workerInstance.ProcessJob(ctx, rjob.ID, *exportJob, rjob.Args); err != nil {
err := errors.Wrap(err, "failed to process job")
logger.Error(err)
return err
Expand Down
Loading