diff --git a/cmd/api/src/daemons/datapipe/datapipe.go b/cmd/api/src/daemons/datapipe/datapipe.go index a47ee54a39..3b6dc9d358 100644 --- a/cmd/api/src/daemons/datapipe/datapipe.go +++ b/cmd/api/src/daemons/datapipe/datapipe.go @@ -108,11 +108,11 @@ func (s *Daemon) analyze() { if err := RunAnalysisOperations(s.ctx, s.db, s.graphdb, s.cfg); err != nil { log.Errorf("Graph analysis failed: %v", err) - s.failJobsUnderAnalysis() + FailAnalyzedFileUploadJobs(s.ctx, s.db) s.status.Update(model.DatapipeStatusIdle, false) } else { - s.completeJobsUnderAnalysis() + CompleteAnalyzedFileUploadJobs(s.ctx, s.db) if entityPanelCachingFlag, err := s.db.GetFlagByKey(appcfg.FeatureEntityPanelCaching); err != nil { log.Errorf("Error retrieving entity panel caching flag: %v", err) @@ -140,18 +140,6 @@ func (s *Daemon) ingestAvailableTasks() { } } -func (s *Daemon) getNumJobsWaitingForAnalysis() (int, error) { - numJobsWaitingForAnalysis := 0 - - if fileUploadJobsUnderAnalysis, err := s.db.GetFileUploadJobsWithStatus(model.JobStatusAnalyzing); err != nil { - return 0, err - } else { - numJobsWaitingForAnalysis += len(fileUploadJobsUnderAnalysis) - } - - return numJobsWaitingForAnalysis, nil -} - func (s *Daemon) Start() { var ( datapipeLoopTimer = time.NewTimer(s.tickInterval) @@ -173,15 +161,15 @@ func (s *Daemon) Start() { s.ingestAvailableTasks() // Manage time-out state progression for file upload jobs - fileupload.ProcessStaleFileUploadJobs(s.db) + fileupload.ProcessStaleFileUploadJobs(s.ctx, s.db) // Manage nominal state transitions for file upload jobs - s.processIngestedFileUploadJobs() + ProcessIngestedFileUploadJobs(s.ctx, s.db) // If there are completed file upload jobs or if analysis was user-requested, perform analysis. - if numJobsWaitingForAnalysis, err := s.getNumJobsWaitingForAnalysis(); err != nil { + if hasJobsWaitingForAnalysis, err := HasFileUploadJobsWaitingForAnalysis(s.db); err != nil { log.Errorf("Failed looking up jobs waiting for analysis: %v", err) - } else if numJobsWaitingForAnalysis > 0 || s.getAnalysisRequested() { + } else if hasJobsWaitingForAnalysis || s.getAnalysisRequested() { s.analyze() } @@ -228,11 +216,9 @@ func (s *Daemon) clearOrphanedData() { log.Errorf("Failed removing file: %s", fullPath) } - // Check to see if we need to shutdown after every file deletion - select { - case <-s.ctx.Done(): + // Check to see if we need to exit after every file deletion + if s.ctx.Err() != nil { return - default: } } diff --git a/cmd/api/src/daemons/datapipe/jobs.go b/cmd/api/src/daemons/datapipe/jobs.go index b80a028c98..e0b1c5fb4d 100644 --- a/cmd/api/src/daemons/datapipe/jobs.go +++ b/cmd/api/src/daemons/datapipe/jobs.go @@ -18,6 +18,7 @@ package datapipe import ( "context" + "github.com/specterops/bloodhound/src/database" "os" "github.com/specterops/bloodhound/dawgs/graph" @@ -26,37 +27,67 @@ import ( "github.com/specterops/bloodhound/src/services/fileupload" ) -func (s *Daemon) failJobsUnderAnalysis() { - if fileUploadJobsUnderAnalysis, err := s.db.GetFileUploadJobsWithStatus(model.JobStatusAnalyzing); err != nil { +func HasFileUploadJobsWaitingForAnalysis(db database.Database) (bool, error) { + if fileUploadJobsUnderAnalysis, err := db.GetFileUploadJobsWithStatus(model.JobStatusAnalyzing); err != nil { + return false, err + } else { + return len(fileUploadJobsUnderAnalysis) > 0, nil + } +} + +func FailAnalyzedFileUploadJobs(ctx context.Context, db database.Database) { + // Because our database interfaces do not yet accept contexts this is a best-effort check to ensure that we do not + // commit state transitions when we are shutting down. + if ctx.Err() != nil { + return + } + + if fileUploadJobsUnderAnalysis, err := db.GetFileUploadJobsWithStatus(model.JobStatusAnalyzing); err != nil { log.Errorf("Failed to load file upload jobs under analysis: %v", err) } else { for _, job := range fileUploadJobsUnderAnalysis { - if err := fileupload.FailFileUploadJob(s.db, job.ID, "Analysis failed"); err != nil { + if err := fileupload.UpdateFileUploadJobStatus(db, job, model.JobStatusFailed, "Analysis failed"); err != nil { log.Errorf("Failed updating file upload job %d to failed status: %v", job.ID, err) } } } } -func (s *Daemon) completeJobsUnderAnalysis() { - if fileUploadJobsUnderAnalysis, err := s.db.GetFileUploadJobsWithStatus(model.JobStatusAnalyzing); err != nil { +func CompleteAnalyzedFileUploadJobs(ctx context.Context, db database.Database) { + // Because our database interfaces do not yet accept contexts this is a best-effort check to ensure that we do not + // commit state transitions when we are shutting down. + if ctx.Err() != nil { + return + } + + if fileUploadJobsUnderAnalysis, err := db.GetFileUploadJobsWithStatus(model.JobStatusAnalyzing); err != nil { log.Errorf("Failed to load file upload jobs under analysis: %v", err) } else { for _, job := range fileUploadJobsUnderAnalysis { - if err := fileupload.UpdateFileUploadJobStatus(s.db, job, model.JobStatusComplete, "Complete"); err != nil { + if err := fileupload.UpdateFileUploadJobStatus(db, job, model.JobStatusComplete, "Complete"); err != nil { log.Errorf("Error updating fileupload job %d: %v", job.ID, err) } } } } -func (s *Daemon) processIngestedFileUploadJobs() { - if ingestedFileUploadJobs, err := s.db.GetFileUploadJobsWithStatus(model.JobStatusIngesting); err != nil { +func ProcessIngestedFileUploadJobs(ctx context.Context, db database.Database) { + // Because our database interfaces do not yet accept contexts this is a best-effort check to ensure that we do not + // commit state transitions when shutting down. + if ctx.Err() != nil { + return + } + + if ingestingFileUploadJobs, err := db.GetFileUploadJobsWithStatus(model.JobStatusIngesting); err != nil { log.Errorf("Failed to look up finished file upload jobs: %v", err) } else { - for _, ingestedFileUploadJob := range ingestedFileUploadJobs { - if err := fileupload.UpdateFileUploadJobStatus(s.db, ingestedFileUploadJob, model.JobStatusAnalyzing, "Analyzing"); err != nil { - log.Errorf("Error updating fileupload job %d: %v", ingestedFileUploadJob.ID, err) + for _, ingestingFileUploadJob := range ingestingFileUploadJobs { + if remainingIngestTasks, err := db.GetIngestTasksForJob(ingestingFileUploadJob.ID); err != nil { + log.Errorf("Failed looking up remaining ingest tasks for file upload job %d: %v", ingestingFileUploadJob.ID, err) + } else if len(remainingIngestTasks) == 0 { + if err := fileupload.UpdateFileUploadJobStatus(db, ingestingFileUploadJob, model.JobStatusAnalyzing, "Analyzing"); err != nil { + log.Errorf("Error updating fileupload job %d: %v", ingestingFileUploadJob.ID, err) + } } } } @@ -97,10 +128,8 @@ func (s *Daemon) processIngestTasks(ctx context.Context, ingestTasks model.Inges for _, ingestTask := range ingestTasks { // Check the context to see if we should continue processing ingest tasks. This has to be explicit since error // handling assumes that all failures should be logged and not returned. - select { - case <-ctx.Done(): + if ctx.Err() != nil { return - default: } if err := s.processIngestFile(ctx, ingestTask.FileName); err != nil { diff --git a/cmd/api/src/daemons/datapipe/jobs_test.go b/cmd/api/src/daemons/datapipe/jobs_test.go new file mode 100644 index 0000000000..9aff2c352f --- /dev/null +++ b/cmd/api/src/daemons/datapipe/jobs_test.go @@ -0,0 +1,133 @@ +package datapipe_test + +import ( + "context" + "github.com/specterops/bloodhound/src/daemons/datapipe" + "github.com/specterops/bloodhound/src/database/mocks" + "github.com/specterops/bloodhound/src/model" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + "testing" +) + +func TestHasJobsWaitingForAnalysis(t *testing.T) { + var ( + mockCtrl = gomock.NewController(t) + dbMock = mocks.NewMockDatabase(mockCtrl) + ) + + defer mockCtrl.Finish() + + t.Run("Has Jobs Waiting for Analysis", func(t *testing.T) { + dbMock.EXPECT().GetFileUploadJobsWithStatus(model.JobStatusAnalyzing).Return([]model.FileUploadJob{{}}, nil) + + hasJobs, err := datapipe.HasFileUploadJobsWaitingForAnalysis(dbMock) + + require.True(t, hasJobs) + require.Nil(t, err) + }) + + t.Run("Has No Jobs Waiting for Analysis", func(t *testing.T) { + dbMock.EXPECT().GetFileUploadJobsWithStatus(model.JobStatusAnalyzing).Return([]model.FileUploadJob{}, nil) + + hasJobs, err := datapipe.HasFileUploadJobsWaitingForAnalysis(dbMock) + + require.False(t, hasJobs) + require.Nil(t, err) + }) +} + +func TestFailAnalyzedFileUploadJobs(t *testing.T) { + const jobID int64 = 1 + + var ( + mockCtrl = gomock.NewController(t) + dbMock = mocks.NewMockDatabase(mockCtrl) + ) + + defer mockCtrl.Finish() + + t.Run("Fail Analyzed File Upload Jobs", func(t *testing.T) { + dbMock.EXPECT().GetFileUploadJobsWithStatus(model.JobStatusAnalyzing).Return([]model.FileUploadJob{{ + BigSerial: model.BigSerial{ + ID: jobID, + }, + Status: model.JobStatusAnalyzing, + }}, nil) + + dbMock.EXPECT().UpdateFileUploadJob(gomock.Any()).DoAndReturn(func(fileUploadJob model.FileUploadJob) error { + require.Equal(t, model.JobStatusFailed, fileUploadJob.Status) + return nil + }) + + datapipe.FailAnalyzedFileUploadJobs(context.Background(), dbMock) + }) +} + +func TestCompleteAnalyzedFileUploadJobs(t *testing.T) { + const jobID int64 = 1 + + var ( + mockCtrl = gomock.NewController(t) + dbMock = mocks.NewMockDatabase(mockCtrl) + ) + + defer mockCtrl.Finish() + + t.Run("Complete Analyzed File Upload Jobs", func(t *testing.T) { + dbMock.EXPECT().GetFileUploadJobsWithStatus(model.JobStatusAnalyzing).Return([]model.FileUploadJob{{ + BigSerial: model.BigSerial{ + ID: jobID, + }, + Status: model.JobStatusAnalyzing, + }}, nil) + + dbMock.EXPECT().UpdateFileUploadJob(gomock.Any()).DoAndReturn(func(fileUploadJob model.FileUploadJob) error { + require.Equal(t, model.JobStatusComplete, fileUploadJob.Status) + return nil + }) + + datapipe.CompleteAnalyzedFileUploadJobs(context.Background(), dbMock) + }) +} + +func TestProcessIngestedFileUploadJobs(t *testing.T) { + const jobID int64 = 1 + + var ( + mockCtrl = gomock.NewController(t) + dbMock = mocks.NewMockDatabase(mockCtrl) + ) + + defer mockCtrl.Finish() + + t.Run("Transition Jobs with No Remaining Ingest Tasks", func(t *testing.T) { + dbMock.EXPECT().GetFileUploadJobsWithStatus(model.JobStatusIngesting).Return([]model.FileUploadJob{{ + BigSerial: model.BigSerial{ + ID: jobID, + }, + Status: model.JobStatusIngesting, + }}, nil) + + dbMock.EXPECT().GetIngestTasksForJob(jobID).Return([]model.IngestTask{}, nil) + dbMock.EXPECT().UpdateFileUploadJob(gomock.Any()).DoAndReturn(func(fileUploadJob model.FileUploadJob) error { + require.Equal(t, model.JobStatusAnalyzing, fileUploadJob.Status) + return nil + }) + + datapipe.ProcessIngestedFileUploadJobs(context.Background(), dbMock) + }) + + t.Run("Don't Transition Jobs with Remaining Ingest Tasks", func(t *testing.T) { + dbMock.EXPECT().GetFileUploadJobsWithStatus(model.JobStatusIngesting).Return([]model.FileUploadJob{{ + BigSerial: model.BigSerial{ + ID: jobID, + }, + Status: model.JobStatusIngesting, + }}, nil) + + dbMock.EXPECT().GetIngestTasksForJob(jobID).Return([]model.IngestTask{{}}, nil) + + datapipe.ProcessIngestedFileUploadJobs(context.Background(), dbMock) + }) +} diff --git a/cmd/api/src/services/fileupload/file_upload.go b/cmd/api/src/services/fileupload/file_upload.go index 6517547321..36d244905c 100644 --- a/cmd/api/src/services/fileupload/file_upload.go +++ b/cmd/api/src/services/fileupload/file_upload.go @@ -18,6 +18,7 @@ package fileupload import ( + "context" "errors" "fmt" "io" @@ -42,7 +43,13 @@ type FileUploadData interface { GetFileUploadJobsWithStatus(status model.JobStatus) ([]model.FileUploadJob, error) } -func ProcessStaleFileUploadJobs(db FileUploadData) { +func ProcessStaleFileUploadJobs(ctx context.Context, db FileUploadData) { + // Because our database interfaces do not yet accept contexts this is a best-effort check to ensure that we do not + // commit state transitions when shutting down. + if ctx.Err() != nil { + return + } + var ( now = time.Now().UTC() threshold = now.Add(-jobActivityTimeout) diff --git a/packages/go/analysis/ad/adcs.go b/packages/go/analysis/ad/adcs.go index 4dae7474c0..26c536927e 100644 --- a/packages/go/analysis/ad/adcs.go +++ b/packages/go/analysis/ad/adcs.go @@ -449,29 +449,25 @@ func PostADCS(ctx context.Context, db graph.Database, groupExpansions impact.Pat if !adcsEnabled { return &analysis.AtomicPostProcessingStats{}, nil } - operation := analysis.NewPostRelationshipOperation(ctx, db, "ADCS Post Processing") if enterpriseCertAuthorities, err := FetchNodesByKind(ctx, db, ad.EnterpriseCA); err != nil { - operation.Done() return &analysis.AtomicPostProcessingStats{}, fmt.Errorf("failed fetching enterpriseCA nodes: %w", err) } else if rootCertAuthorities, err := FetchNodesByKind(ctx, db, ad.RootCA); err != nil { - operation.Done() return &analysis.AtomicPostProcessingStats{}, fmt.Errorf("failed fetching rootCA nodes: %w", err) } else if certTemplates, err := FetchNodesByKind(ctx, db, ad.CertTemplate); err != nil { - operation.Done() return &analysis.AtomicPostProcessingStats{}, fmt.Errorf("failed fetching cert template nodes: %w", err) } else if domains, err := FetchNodesByKind(ctx, db, ad.Domain); err != nil { - operation.Done() return &analysis.AtomicPostProcessingStats{}, fmt.Errorf("failed fetching domain nodes: %w", err) } else if step1Stats, err := postADCSPreProcessStep1(ctx, db, enterpriseCertAuthorities, rootCertAuthorities); err != nil { - operation.Done() return &analysis.AtomicPostProcessingStats{}, fmt.Errorf("failed adcs pre-processing step 1: %w", err) } else if step2Stats, err := postADCSPreProcessStep2(ctx, db, certTemplates); err != nil { - operation.Done() return &analysis.AtomicPostProcessingStats{}, fmt.Errorf("failed adcs pre-processing step 2: %w", err) } else { + operation := analysis.NewPostRelationshipOperation(ctx, db, "ADCS Post Processing") + operation.Stats.Merge(step1Stats) operation.Stats.Merge(step2Stats) + var cache = NewADCSCache() cache.BuildCache(ctx, db, enterpriseCertAuthorities, certTemplates)