diff --git a/app/pkg/analyze/analyzer.go b/app/pkg/analyze/analyzer.go
index 6f44569..88fba7c 100644
--- a/app/pkg/analyze/analyzer.go
+++ b/app/pkg/analyze/analyzer.go
@@ -83,13 +83,13 @@ type Analyzer struct {
 }
 
 // NewAnalyzer creates a new Analyzer.
-func NewAnalyzer(logOffsetsFile string, rawLogsDB *dbutil.LockingDB[*logspb.LogEntries], tracesDB *pebble.DB, blocksDB *dbutil.LockingDB[*logspb.BlockLog], sessions *SessionsManager) (*Analyzer, error) {
+func NewAnalyzer(logOffsetsFile string, maxDelay time.Duration, rawLogsDB *dbutil.LockingDB[*logspb.LogEntries], tracesDB *pebble.DB, blocksDB *dbutil.LockingDB[*logspb.BlockLog], sessions *SessionsManager) (*Analyzer, error) {
 	logOffsets, err := initOffsets(logOffsetsFile)
 	if err != nil {
 		return nil, err
 	}
 
-	// Create a rate limiting queue for processing files. We rate limit to each file every 30 seconds. This is because
+	// Create a rate limiting queue for processing files. We rate limit each file to once every N seconds. This is because
 	// The logs are constantly being written to and we don't want to process the files too quickly.
 	// We are potentially writing to multiple files at the same time e.g. the Analyzer logs and then a different
 	// log file for each instance of RunMe. So we need to track different backoffs for each file which the rate limiter
@@ -97,7 +97,15 @@ func NewAnalyzer(logOffsetsFile string, rawLogsDB *dbutil.LockingDB[*logspb.LogE
 	// In that case, after we detect the start of a trace we would want to retry on a very short interval with backoff
 	// to detect the end of the trace as quickly as possible. Right now we don't do that and in fact we never call
 	// forget so we will basically max out the retry limit at the max delay.
-	fileQueue := workqueue.NewRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(5*time.Second, 30*time.Second))
+	// The max delay is configurable because in some cases (e.g. evaluation, https://github.com/jlewi/foyle/issues/301)
+	// we want to minimize the time between when a log entry is written and when we process it so the trace is available.
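+	// For example, with baseDelay=1s and maxDelay=30s the per-file retry delay grows as 1s, 2s, 4s, ...
+	// because ItemExponentialFailureRateLimiter doubles an item's delay on each retry and caps it at maxDelay.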
+
+	if maxDelay <= 0 {
+		return nil, errors.New("maxDelay must be greater than 0")
+	}
+
+	baseDelay := time.Second
+	fileQueue := workqueue.NewRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(baseDelay, maxDelay))
 
 	sessBuilder, err := NewSessionBuilder(sessions)
 	if err != nil {
@@ -128,7 +136,7 @@ func initOffsets(logOffsetsFile string) (*logspb.LogsWaterMark, error) {
 	watermark := &logspb.LogsWaterMark{}
 	if err := protojson.Unmarshal(raw, watermark); err != nil {
-		log.Error(err, "Failed to unmarshal watermarks file %s; watermarks will be reinitialized", logOffsetsFile)
+		log.Error(err, "Failed to unmarshal watermarks; watermarks will be reinitialized", "file", logOffsetsFile)
 	}
 	return watermark, nil
 }
@@ -808,5 +816,23 @@ func combineGenerateTrace(ctx context.Context, entries []*api.LogEntry) (*logspb
 	trace.EvalMode = evalMode
 
 	combineSpans(trace)
+	dedupeAssertions(trace)
 	return trace, nil
 }
+
+// dedupeAssertions removes duplicate assertions. Since our processing has at-least-once semantics,
+// we could wind up with duplicate copies of an assertion.
+func dedupeAssertions(trace *logspb.Trace) {
+	newAssertions := make([]*v1alpha1.Assertion, 0, len(trace.Assertions))
+	seen := make(map[string]bool)
+
+	for _, a := range trace.Assertions {
+		if seen[a.GetId()] {
+			continue
+		}
+		newAssertions = append(newAssertions, a)
+		seen[a.GetId()] = true
+	}
+
+	trace.Assertions = newAssertions
+}
diff --git a/app/pkg/analyze/analyzer_test.go b/app/pkg/analyze/analyzer_test.go
index 1609fb6..9439815 100644
--- a/app/pkg/analyze/analyzer_test.go
+++ b/app/pkg/analyze/analyzer_test.go
@@ -263,7 +263,7 @@ func Test_Analyzer(t *testing.T) {
 	}
 
 	logOffsetsFile := filepath.Join(rawDir, "log_offsets.json")
-	a, err := NewAnalyzer(logOffsetsFile, lockingRawDB, tracesDB, lockingBlocksDB, sessionsManager)
+	a, err := NewAnalyzer(logOffsetsFile, 3*time.Second, lockingRawDB, tracesDB, lockingBlocksDB, sessionsManager)
 	if err != nil {
 		t.Fatalf("Failed to create analyzer: %v", err)
 	}
@@ -434,7 +434,7 @@ func Test_CombineGenerateEntries(t *testing.T) {
 			expectedEvalMode: false,
 			logFunc: func(log logr.Logger) {
 				assertion := &v1alpha1.Assertion{
-					Name:   "testassertion",
+					Name:   v1alpha1.Assertion_ONE_CODE_CELL,
 					Result: v1alpha1.AssertResult_PASSED,
 					Detail: "",
 					Id:     "1234",
@@ -498,8 +498,9 @@ func Test_CombineGenerateEntries(t *testing.T) {
 
 		c.logFunc(zTestLog)
 
-		testLog.Sync()
-
+		if err := testLog.Sync(); err != nil {
+			t.Fatalf("Failed to sync log: %v", err)
+		}
 	}
 
 	for _, logFile := range logFiles {
@@ -552,3 +553,65 @@ func Test_CombineGenerateEntries(t *testing.T) {
 		})
 	}
 }
+
+func Test_DedupeAssertions(t *testing.T) {
+	type testCase struct {
+		name     string
+		input    []*v1alpha1.Assertion
+		expected []*v1alpha1.Assertion
+	}
+
+	cases := []testCase{
+		{
+			name: "basic",
+			input: []*v1alpha1.Assertion{
+				{
+					Name:   v1alpha1.Assertion_ONE_CODE_CELL,
+					Result: v1alpha1.AssertResult_PASSED,
+					Id:     "1",
+				},
+				{
+					Name:   v1alpha1.Assertion_ONE_CODE_CELL,
+					Result: v1alpha1.AssertResult_PASSED,
+					Id:     "1",
+				},
+				{
+					Name:   v1alpha1.Assertion_ENDS_WITH_CODE_CELL,
+					Result: v1alpha1.AssertResult_PASSED,
+					Id:     "2",
+				},
+			},
+			expected: []*v1alpha1.Assertion{
+				{
+					Name:   v1alpha1.Assertion_ONE_CODE_CELL,
+					Result: v1alpha1.AssertResult_PASSED,
+					Id:     "1",
+				},
+				{
+					Name:   v1alpha1.Assertion_ENDS_WITH_CODE_CELL,
+					Result: v1alpha1.AssertResult_PASSED,
+					Id:     "2",
+				},
+			},
+		},
+		{
+			name:     "nil",
+			input:    nil,
+			expected: []*v1alpha1.Assertion{},
+		},
+	}
+
+	for _, c := range cases {
+		t.Run(c.name, func(t *testing.T) {
+			trace := &logspb.Trace{
+				Assertions: c.input,
+			}
+
+			dedupeAssertions(trace)
+
+			if d := cmp.Diff(c.expected, trace.Assertions, cmpopts.IgnoreUnexported(v1alpha1.Assertion{})); d != "" {
+				t.Errorf("Unexpected diff:\n%s", d)
+			}
+		})
+	}
+}
diff --git a/app/pkg/application/app.go b/app/pkg/application/app.go
index c78167c..53b7266 100644
--- a/app/pkg/application/app.go
+++ b/app/pkg/application/app.go
@@ -394,7 +394,8 @@ func (a *App) SetupAnalyzer() (*analyze.Analyzer, error) {
 	a.sessionsManager = manager
 	a.sessionsDB = db
 
-	analyzer, err := analyze.NewAnalyzer(a.Config.GetLogOffsetsFile(), a.LockingLogEntriesDB, a.TracesDB, a.LockingBlocksDB, manager)
+	maxDelay := time.Duration(a.Config.GetLogsMaxDelaySeconds()) * time.Second
+	analyzer, err := analyze.NewAnalyzer(a.Config.GetLogOffsetsFile(), maxDelay, a.LockingLogEntriesDB, a.TracesDB, a.LockingBlocksDB, manager)
 	if err != nil {
 		return nil, err
 	}
diff --git a/app/pkg/config/config.go b/app/pkg/config/config.go
index 3713b07..85abe9b 100644
--- a/app/pkg/config/config.go
+++ b/app/pkg/config/config.go
@@ -193,6 +193,9 @@ type Logging struct {
 	// Use stderr to write to stderr.
 	// Use gcplogs:///projects/${PROJECT}/logs/${LOGNAME} to write to Google Cloud Logging
 	Sinks []LogSink `json:"sinks,omitempty" yaml:"sinks,omitempty"`
+
+	// MaxDelaySeconds is the maximum delay in seconds to wait before processing the logs.
+	MaxDelaySeconds int `json:"maxDelaySeconds,omitempty" yaml:"maxDelaySeconds,omitempty"`
 }
 
 type LogSink struct {
@@ -389,6 +392,7 @@ func InitViperInstance(v *viper.Viper, cmd *cobra.Command) error {
 	v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
 	v.AutomaticEnv() // read in environment variables that match
 
+	setLoggingDefaults(v)
 	setAgentDefaults(v)
 	setServerDefaults(v)
 
@@ -440,6 +444,10 @@ func (c *Config) APIBaseURL() string {
 	return fmt.Sprintf("http://%s:%d/%s", c.Server.BindAddress, c.Server.HttpPort, c.APIPrefix())
 }
 
+func (c *Config) GetLogsMaxDelaySeconds() int {
+	return c.Logging.MaxDelaySeconds
+}
+
 // GetConfig returns a configuration created from the viper configuration.
 func GetConfig() *Config {
 	if globalV == nil {
@@ -506,6 +514,9 @@ func (c *Config) Write(cfgFile string) error {
 	return yaml.NewEncoder(f).Encode(c)
 }
 
+func setLoggingDefaults(v *viper.Viper) {
+	v.SetDefault("logging.maxDelaySeconds", 30)
+}
 func setServerDefaults(v *viper.Viper) {
 	v.SetDefault("server.bindAddress", "0.0.0.0")
 	v.SetDefault("server.httpPort", defaultHTTPPort)
diff --git a/app/pkg/config/config_test.go b/app/pkg/config/config_test.go
index 6578787..c8eca32 100644
--- a/app/pkg/config/config_test.go
+++ b/app/pkg/config/config_test.go
@@ -10,30 +10,34 @@ import (
 
 func Test_ConfigDefaultConfig(t *testing.T) {
 	type testCase struct {
-		name             string
-		configFile       string
-		expectedRAG      bool
-		expectedHTTPPort int
+		name                    string
+		configFile              string
+		expectedRAG             bool
+		expectedHTTPPort        int
+		expectedMaxDelaySeconds int
 	}
 
 	cases := []testCase{
 		{
-			name:             "config-file-does-not-exist",
-			configFile:       "doesnotexist.yaml",
-			expectedRAG:      defaultRagEnabled,
-			expectedHTTPPort: defaultHTTPPort,
+			name:                    "config-file-does-not-exist",
+			configFile:              "doesnotexist.yaml",
+			expectedRAG:             defaultRagEnabled,
+			expectedHTTPPort:        defaultHTTPPort,
+			expectedMaxDelaySeconds: 30,
 		},
 		{
-			name:             "empty-file",
-			configFile:       "empty.yaml",
-			expectedRAG:      defaultRagEnabled,
-			expectedHTTPPort: defaultHTTPPort,
+			name:                    "empty-file",
+			configFile:              "empty.yaml",
+			expectedRAG:             defaultRagEnabled,
+			expectedHTTPPort:        defaultHTTPPort,
+			expectedMaxDelaySeconds: 30,
 		},
 		{
-			name:             "partial",
-			configFile:       "partial.yaml",
-			expectedRAG:      defaultRagEnabled,
-			expectedHTTPPort: defaultHTTPPort,
+			name:                    "partial",
+			configFile:              "partial.yaml",
+			expectedRAG:             defaultRagEnabled,
+			expectedHTTPPort:        defaultHTTPPort,
+			expectedMaxDelaySeconds: 30,
 		},
 	}
diff --git a/app/pkg/config/update_test.go b/app/pkg/config/update_test.go
index d00e833..77997de 100644
--- a/app/pkg/config/update_test.go
+++ b/app/pkg/config/update_test.go
@@ -27,8 +27,9 @@ func Test_UpdateViperConfig(t *testing.T) {
 			expression: "agent.model=some-other-model",
 			expected: &Config{
 				Logging: Logging{
-					Level: "info",
-					Sinks: []LogSink{{JSON: true, Path: "gcplogs:///projects/fred-dev/logs/foyle"}, {Path: "stderr"}},
+					Level:           "info",
+					Sinks:           []LogSink{{JSON: true, Path: "gcplogs:///projects/fred-dev/logs/foyle"}, {Path: "stderr"}},
+					MaxDelaySeconds: 30,
 				},
 				Agent: &api.AgentConfig{
 					Model: "some-other-model",
diff --git a/app/pkg/eval/evaluator.go b/app/pkg/eval/evaluator.go
index 2f2458b..19c1103 100644
--- a/app/pkg/eval/evaluator.go
+++ b/app/pkg/eval/evaluator.go
@@ -7,6 +7,10 @@ import (
 	"sort"
 	"time"
 
+	"connectrpc.com/otelconnect"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"
+
 	"google.golang.org/protobuf/encoding/protojson"
 
 	"github.com/go-logr/logr"
@@ -85,11 +89,17 @@ func (e *Evaluator) Reconcile(ctx context.Context, experiment api.Experiment) er
 		return errors.New("EvalDir is required")
 	}
 
-	aiClient := newAIServiceClient(experiment.Spec.AgentAddress)
+	otelInterceptor, err := otelconnect.NewInterceptor()
+	if err != nil {
+		return errors.Wrapf(err, "Failed to create OpenTelemetry interceptor")
+	}
+
+	aiClient := newAIServiceClient(experiment.Spec.AgentAddress, connect.WithInterceptors(otelInterceptor))
 
 	logsClient := logspbconnect.NewLogsServiceClient(
 		newHTTPClient(),
 		experiment.Spec.AgentAddress,
+		connect.WithInterceptors(otelInterceptor),
 	)
 
 	manager, err := openResultsManager(experiment.Spec.OutputDB)
@@ -141,7 +151,7 @@ func (e *Evaluator) Reconcile(ctx context.Context, experiment api.Experiment) er
 	sortEvalExamplesInTime(examples)
 
 	// Now generate predictions for any results that are missing them.
-	if err := e.processExamples(ctx, examples, lastProcessedTime, aiClient, logsClient, manager); err != nil {
+	if err := e.processExamples(ctx, experiment, examples, lastProcessedTime, aiClient, logsClient, manager); err != nil {
 		return err
 	}
 
@@ -177,7 +187,7 @@ func (e *Evaluator) Reconcile(ctx context.Context, experiment api.Experiment) er
 	return nil
 }
 
-func (e *Evaluator) processExamples(ctx context.Context, examples []*v1alpha1.EvalExample, lastProcessedTime time.Time, client v1alpha1connect.AIServiceClient, logsClient logspbconnect.LogsServiceClient, manager *ResultsManager) error {
+func (e *Evaluator) processExamples(ctx context.Context, experiment api.Experiment, examples []*v1alpha1.EvalExample, lastProcessedTime time.Time, client v1alpha1connect.AIServiceClient, logsClient logspbconnect.LogsServiceClient, manager *ResultsManager) error {
 	oLog := logs.FromContext(ctx)
 
 	oaiClient, err := oai.NewClient(e.config)
@@ -205,62 +215,78 @@ func (e *Evaluator) processExamples(ctx context.Context, examples []*v1alpha1.Ev
 		}
 		log.Info("Processing example", "index", eIndex, "numExamples", len(examples))
 
-		var processErr error
-
-		uErr := manager.Update(ctx, example.GetId(), func(result *v1alpha1.EvalResult) error {
-			processErr = e.processResult(ctx, result, example, client, logsClient, judge)
-			// We need to return for the transaction to be committed.
-			return nil
-		})
-
-		if processErr != nil {
-			log.Error(processErr, "Failed to process example")
-			// For now we abort on error to see what's going on.
-			return processErr
+		exampleCtx := logr.NewContext(ctx, log)
+		if err := e.processExample(exampleCtx, experiment.Metadata.Name, example, client, logsClient, manager, judge); err != nil {
+			return err
 		}
+	}
+	return nil
+}
 
-		if uErr != nil {
-			log.Error(uErr, "Failed to update result")
-			// For now we abort on error to see what's going on.
-			return uErr
-		}
+func (e *Evaluator) processExample(originalCtx context.Context, name string, example *v1alpha1.EvalExample, client v1alpha1connect.AIServiceClient, logsClient logspbconnect.LogsServiceClient, manager *ResultsManager, judge *Judge) error {
+	log := logs.FromContext(originalCtx).WithValues("exampleId", example.GetId())
+	// We need to start a new trace for this example.
+	tp := tracer()
+	traceCtx, traceSpan := tp.Start(originalCtx, "(*Evaluator).processExample", trace.WithNewRoot(), trace.WithAttributes(attribute.String("experiment", name), attribute.String("exampleId", example.GetId())))
+	traceId := traceSpan.SpanContext().TraceID()
+	log = log.WithValues("traceId", traceId.String())
+	ctx := logr.NewContext(traceCtx, log)
+	defer traceSpan.End()
+	log.Info("Start example")
+	var processErr error
+
+	uErr := manager.Update(ctx, example.GetId(), func(result *v1alpha1.EvalResult) error {
+		processErr = e.processResult(ctx, result, example, client, logsClient, judge)
+		// We need to return nil for the transaction to be committed.
+		return nil
+	})
 
-		result, err := manager.Get(ctx, example.GetId())
-		if err != nil {
-			return errors.Wrapf(err, "Failed to get latest result for example %s", example.GetId())
-		}
+	if processErr != nil {
+		log.Error(processErr, "Failed to process example")
+		// For now we abort on error to see what's going on.
+		return processErr
+	}
 
-		if result.Error != "" {
-			// Generating a completion failed for this example so we should keep going.
-			// There won't be a blocklog to wait for.
-			continue
-		}
+	if uErr != nil {
+		log.Error(uErr, "Failed to update result")
+		// For now we abort on error to see what's going on.
+		return uErr
+	}
 
-		if err := e.waitForBlockLog(ctx, result, logsClient); err != nil {
-			log.Error(err, "Failed to wait for block log")
-			// For now we abort on error to see what's going on.
-			return errors.Wrapf(err, "Failed to get block log for example %s", example.GetId())
-		}
+	result, err := manager.Get(ctx, example.GetId())
+	if err != nil {
+		return errors.Wrapf(err, "Failed to get latest result for example %s", example.GetId())
+	}
 
-		var ragErr error
-		// Getting the bestRAG result depends on the trace having been processed so we run after waiting for the BlockLog
-		uErr = manager.Update(ctx, example.GetId(), func(result *v1alpha1.EvalResult) error {
-			ragErr = e.reconcileBestRAGResult(ctx, result, logsClient)
-			return nil
-		})
+	if result.Error != "" {
+		// Generating a completion failed for this example so we should keep going.
+		// There won't be a blocklog to wait for.
+		return nil
+	}
 
-		if ragErr != nil {
-			log.Error(ragErr, "Failed to reconcile best RAG result")
-			// For now we abort on error to see what's going on.
-			return ragErr
-		}
+	if err := e.waitForBlockLog(ctx, result, logsClient); err != nil {
+		log.Error(err, "Failed to wait for block log")
+		// For now we abort on error to see what's going on.
+		return errors.Wrapf(err, "Failed to get block log for example %s", example.GetId())
+	}
 
-		if uErr != nil {
-			log.Error(uErr, "Failed to update result")
-			// For now we abort on error to see what's going on.
-			return uErr
-		}
+	var ragErr error
+	// Getting the bestRAG result depends on the trace having been processed, so we run it after waiting for the BlockLog.
+	uErr = manager.Update(ctx, example.GetId(), func(result *v1alpha1.EvalResult) error {
+		ragErr = e.reconcileBestRAGResult(ctx, result, logsClient)
+		return nil
+	})
+	if ragErr != nil {
+		log.Error(ragErr, "Failed to reconcile best RAG result")
+		// For now we abort on error to see what's going on.
+		return ragErr
+	}
+
+	if uErr != nil {
+		log.Error(uErr, "Failed to update result")
+		// For now we abort on error to see what's going on.
+		return uErr
 	}
 	return nil
 }
@@ -270,6 +296,8 @@ func (e *Evaluator) processResult(ctx context.Context, result *v1alpha1.EvalResu
 	result.Example = example
 	log := logs.FromContext(ctx).WithValues("exampleId", example.GetId())
 	ctx = logr.NewContext(ctx, log)
+	ctx, span := tracer().Start(ctx, "(*Evaluator).processResult")
+	defer span.End()
 
 	if err := runGenerate(ctx, result, client); err != nil {
 		return err
@@ -302,6 +330,9 @@
 // example then the result will be nil but result.Error will be set
 func runGenerate(ctx context.Context, result *v1alpha1.EvalResult, client v1alpha1connect.AIServiceClient) error {
 	log := logs.FromContext(ctx)
+	ctx, span := tracer().Start(ctx, "runGenerate")
+	defer span.End()
+
 	// ID for the generate session
 	genSessionID := ulid.GenerateID()
 
@@ -381,6 +412,8 @@ func runGenerate(ctx context.Context, result *v1alpha1.EvalResult, client v1alph
 }
 
 func runExecute(ctx context.Context, result *v1alpha1.EvalResult, client v1alpha1connect.AIServiceClient) error {
+	ctx, span := tracer().Start(ctx, "runExecute")
+	defer span.End()
 	log := logs.FromContext(ctx)
 
 	// We need to send a LOG event to the agent to simulate the cells being executed.
 	executeEventReq := &v1alpha1.LogEventsRequest{}
@@ -442,6 +475,8 @@
 }
 
 func (e *Evaluator) waitForBlockLog(ctx context.Context, result *v1alpha1.EvalResult, client logspbconnect.LogsServiceClient) error {
+	ctx, span := tracer().Start(ctx, "(*Evaluator).waitForBlockLog")
+	defer span.End()
 	// We need to wait for the block log to be processed.
 	// This is done to
 	// 1. Increase the likelihood we have learned from the block
@@ -507,6 +542,9 @@
 }
 
 func (e *Evaluator) reconcileBestRAGResult(ctx context.Context, evalResult *v1alpha1.EvalResult, client logspbconnect.LogsServiceClient) error {
+	ctx, span := tracer().Start(ctx, "(*Evaluator).reconcileBestRAGResult")
+	defer span.End()
+
 	if evalResult.GenTraceId == "" {
 		return errors.WithStack(errors.New("GenTraceId is empty"))
 	}
@@ -602,9 +640,19 @@ func (e *Evaluator) buildExperimentReport(ctx context.Context, name string, mana
 	r.CellsMatchCounts = make(map[string]int32)
 
 	for _, c := range counts {
+		if c.MatchResult == nil {
+			// N.B. I think for unknown it ends up being a nil value. I suspect this is because default values are
+			// elided when marshalling to JSON. We should fix that by changing the JSON serialization.
+			key := v1alpha1.CellsMatchResult_UNKNOWN_CellsMatchResult.String()
+			if _, ok := r.CellsMatchCounts[key]; !ok {
+				r.CellsMatchCounts[key] = 0
+			}
+			r.CellsMatchCounts[key] = r.CellsMatchCounts[key] + int32(c.Count)
+			continue
+		}
 		s, ok := c.MatchResult.(string)
 		if !ok {
-			return r, errors.Wrapf(err, "Failed to convert cellsMatchResult to string")
+			return r, errors.New("Failed to convert cellsMatchResult to string")
 		}
 		r.CellsMatchCounts[s] = int32(c.Count)
 	}
@@ -641,7 +689,7 @@
 		}
 	}
 
-	percentiles, err := computePercentilesOfInts(generateTimes, []float64{.9, .95})
+	percentiles, err := computePercentilesOfInts(generateTimes, []float64{.5, .75, .9, .95})
 	if err != nil {
 		return r, errors.Wrapf(err, "Failed to compute percentiles")
 	}
@@ -662,10 +710,13 @@
 	return r, nil
 }
 
+// accumulateAssertionCounts accumulates assertions into the stats map
 func accumulateAssertionCounts(stats map[v1alpha1.Assertion_Name]*v1alpha1.AssertionCounts, assertions []*v1alpha1.Assertion) {
 	for _, assertion := range assertions {
 		if _, ok := stats[assertion.GetName()]; !ok {
-			stats[assertion.GetName()] = &v1alpha1.AssertionCounts{}
+			stats[assertion.GetName()] = &v1alpha1.AssertionCounts{
+				Name: assertion.GetName(),
+			}
 		}
 
 		switch assertion.GetResult() {
@@ -760,11 +811,12 @@ func sortEvalExamplesInTime(examples []*v1alpha1.EvalExample) {
 	})
 }
 
-func newAIServiceClient(baseURL string) v1alpha1connect.AIServiceClient {
+func newAIServiceClient(baseURL string, opts ...connect.ClientOption) v1alpha1connect.AIServiceClient {
 	// Create a new client
 	client := v1alpha1connect.NewAIServiceClient(
 		newHTTPClient(),
 		baseURL,
+		opts...,
 	)
 	return client
 }
diff --git a/app/pkg/eval/evaluator_test.go b/app/pkg/eval/evaluator_test.go
index 66355bc..15c8c13 100644
--- a/app/pkg/eval/evaluator_test.go
+++ b/app/pkg/eval/evaluator_test.go
@@ -6,9 +6,8 @@ import (
 	"path/filepath"
 	"testing"
 
-	"github.com/jlewi/foyle/protos/go/foyle/logs/logspbconnect"
-	"google.golang.org/protobuf/encoding/protojson"
-	"gopkg.in/yaml.v3"
+	"github.com/google/go-cmp/cmp"
+
"github.com/google/go-cmp/cmp/cmpopts" "github.com/jlewi/foyle/protos/go/foyle/logs/logspbconnect" "google.golang.org/protobuf/encoding/protojson" @@ -276,3 +275,48 @@ func Test_buildExperimentReport(t *testing.T) { } t.Logf("Report: %v", string(reportJson)) } + +func Test_AccumulateAssertionCounts(t *testing.T) { + type testCase struct { + name string + stats map[v1alpha1.Assertion_Name]*v1alpha1.AssertionCounts + assertions []*v1alpha1.Assertion + expected map[v1alpha1.Assertion_Name]*v1alpha1.AssertionCounts + } + + cases := []testCase{ + { + name: "basic", + stats: map[v1alpha1.Assertion_Name]*v1alpha1.AssertionCounts{}, + assertions: []*v1alpha1.Assertion{ + { + Name: v1alpha1.Assertion_ONE_CODE_CELL, + Result: v1alpha1.AssertResult_PASSED, + }, + { + Name: v1alpha1.Assertion_CODE_AFTER_MARKDOWN, + Result: v1alpha1.AssertResult_FAILED, + }, + }, + expected: map[v1alpha1.Assertion_Name]*v1alpha1.AssertionCounts{ + v1alpha1.Assertion_ONE_CODE_CELL: { + Name: v1alpha1.Assertion_ONE_CODE_CELL, + Passed: 1, + }, + v1alpha1.Assertion_CODE_AFTER_MARKDOWN: { + Name: v1alpha1.Assertion_CODE_AFTER_MARKDOWN, + Failed: 1, + }, + }, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + accumulateAssertionCounts(c.stats, c.assertions) + if d := cmp.Diff(c.expected, c.stats, cmpopts.IgnoreUnexported(v1alpha1.AssertionCounts{})); d != "" { + t.Fatalf("Unexpected diff:\n%+v", d) + } + }) + } +} diff --git a/app/pkg/eval/judge.go b/app/pkg/eval/judge.go index e6508dd..34a82e0 100644 --- a/app/pkg/eval/judge.go +++ b/app/pkg/eval/judge.go @@ -44,6 +44,9 @@ type Judge struct { } func (j *Judge) Score(ctx context.Context, result *v1alpha1.EvalResult) error { + ctx, span := tracer().Start(ctx, "(*Judge).Score") + defer span.End() + if len(result.GetExample().ExpectedCells) != 1 { return errors.New("expected a single expected cell") } diff --git a/app/pkg/eval/results_manager.go b/app/pkg/eval/results_manager.go index f35541b..a76a0a8 100644 --- a/app/pkg/eval/results_manager.go +++ b/app/pkg/eval/results_manager.go @@ -214,7 +214,11 @@ func (m *ResultsManager) ListResults(ctx context.Context, cursor *time.Time, pag } func protoToRowUpdate(result *v1alpha1.EvalResult) (*fsql.UpdateResultParams, error) { - protoJson, err := protojson.Marshal(result) + // Emit default values. Otherwise SQL queries become more complex. + opts := protojson.MarshalOptions{ + EmitDefaultValues: true, + } + protoJson, err := opts.Marshal(result) if err != nil { return nil, errors.Wrapf(err, "Failed to serialize EvalResult to JSON") } diff --git a/app/pkg/eval/tracer.go b/app/pkg/eval/tracer.go new file mode 100644 index 0000000..c440b11 --- /dev/null +++ b/app/pkg/eval/tracer.go @@ -0,0 +1,10 @@ +package eval + +import ( + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/trace" +) + +func tracer() trace.Tracer { + return otel.Tracer("github.com/jlewi/foyle/app/pkg/eval") +}