Skip to content

Commit

Permalink
Speedup Eval - Instrument With Otel (#302)
Browse files Browse the repository at this point in the history
# Speedup Evaluation

Per #301, our experiments were really slow. We were observing times of
approximately 30s to 60s to process individual examples. With the new
dataset of 424 examples it would take ~7 hours to run an experiment,
significantly impeding iteration.

This PR instruments evaluation with OTEL so that we can see the
bottlenecks in the code. Below are some graphs.

The graph below shows the P95 latency of processing evaluation examples.
We can see it's about 30s.
<img width="1148" alt="experiment_traces_p95_latency"
src="https://github.com/user-attachments/assets/7f3d8aba-f3ed-4358-9dbc-4cb7fd292172">

Below is a heatmap showing the duration of waitForBlockLog
<img width="1207" alt="experiment_traces_heatmap_wait_for_block_log"
src="https://github.com/user-attachments/assets/7213fb6e-669d-4b46-b7e1-450154652962">

We can see that it's about 20s-30s, so it accounts for a large portion of
the duration.

A big source of the latency is that the Analyzer uses a rate limiting queue
for reprocessing the logs. The max delay is 30s, so we are probably
reprocessing the logs at 30s intervals. To speed it up we make the
delay configurable so we can use a shorter delay during experiments.

Here is an updated heatmap of the duration of wait for block log.
<img width="1190" alt="Screen Shot 2024-10-15 at 4 34 40 PM"
src="https://github.com/user-attachments/assets/76d75baf-ae45-4c32-8c80-7bf6f1b3387e">

16:25 is when it started running with a maxDelaySeconds of 1s. We can
see this drops the latency down significantly, from about 30s to ~6s.
  • Loading branch information
jlewi authored Oct 16, 2024
1 parent 43f746c commit 6f3a967
Show file tree
Hide file tree
Showing 9 changed files with 151 additions and 74 deletions.
14 changes: 11 additions & 3 deletions app/pkg/analyze/analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,21 +83,29 @@ type Analyzer struct {
}

// NewAnalyzer creates a new Analyzer.
func NewAnalyzer(logOffsetsFile string, rawLogsDB *dbutil.LockingDB[*logspb.LogEntries], tracesDB *pebble.DB, blocksDB *dbutil.LockingDB[*logspb.BlockLog], sessions *SessionsManager) (*Analyzer, error) {
func NewAnalyzer(logOffsetsFile string, maxDelay time.Duration, rawLogsDB *dbutil.LockingDB[*logspb.LogEntries], tracesDB *pebble.DB, blocksDB *dbutil.LockingDB[*logspb.BlockLog], sessions *SessionsManager) (*Analyzer, error) {
logOffsets, err := initOffsets(logOffsetsFile)
if err != nil {
return nil, err
}

// Create a rate limiting queue for processing files. We rate limit to each file every 30 seconds. This is because
// Create a rate limiting queue for processing files. We rate limit to each file every N seconds. This is because
// The logs are constantly being written to and we don't want to process the files too quickly.
// We are potentially writing to multiple files at the same time e.g. the Analyzer logs and then a different
// log file for each instance of RunMe. So we need to track different backoffs for each file which the rate limiter
// does. Using exponential backoff would make sense when we update processLogFile to detect the end of a trace.
// In that case, after we detect the start of a trace we would want to retry on a very short interval with backoff
// to detect the end of the trace as quickly as possible. Right now we don't do that and in fact we never call
// forget so we will basically max out the retry limit at the max delay.
fileQueue := workqueue.NewRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(5*time.Second, 30*time.Second))
// The Max delay is configurable because in some cases (e.g. evaluation, https://github.com/jlewi/foyle/issues/301)
// We want to minimize the time between when a log entry is written and when we process it so the trace is available.

if maxDelay <= 0 {
return nil, errors.New("Max delay must be greater than 0")
}

baseDelay := time.Second
fileQueue := workqueue.NewRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(baseDelay, maxDelay))

sessBuilder, err := NewSessionBuilder(sessions)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion app/pkg/analyze/analyzer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func Test_Analyzer(t *testing.T) {
}

logOffsetsFile := filepath.Join(rawDir, "log_offsets.json")
a, err := NewAnalyzer(logOffsetsFile, lockingRawDB, tracesDB, lockingBlocksDB, sessionsManager)
a, err := NewAnalyzer(logOffsetsFile, 3, lockingRawDB, tracesDB, lockingBlocksDB, sessionsManager)
if err != nil {
t.Fatalf("Failed to create analyzer: %v", err)
}
Expand Down
3 changes: 2 additions & 1 deletion app/pkg/application/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,8 @@ func (a *App) SetupAnalyzer() (*analyze.Analyzer, error) {
a.sessionsManager = manager
a.sessionsDB = db

analyzer, err := analyze.NewAnalyzer(a.Config.GetLogOffsetsFile(), a.LockingLogEntriesDB, a.TracesDB, a.LockingBlocksDB, manager)
maxDelay := time.Duration(a.Config.GetLogsMaxDelaySeconds()) * time.Second
analyzer, err := analyze.NewAnalyzer(a.Config.GetLogOffsetsFile(), maxDelay, a.LockingLogEntriesDB, a.TracesDB, a.LockingBlocksDB, manager)
if err != nil {
return nil, err
}
Expand Down
11 changes: 11 additions & 0 deletions app/pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ type Logging struct {
// Use stderr to write to stderr.
// Use gcplogs:///projects/${PROJECT}/logs/${LOGNAME} to write to Google Cloud Logging
Sinks []LogSink `json:"sinks,omitempty" yaml:"sinks,omitempty"`

// MaxDelaySeconds is the maximum delay in seconds to wait before processing the logs.
MaxDelaySeconds int `json:"maxDelaySeconds,omitempty" yaml:"maxDelaySeconds,omitempty"`
}

type LogSink struct {
Expand Down Expand Up @@ -389,6 +392,7 @@ func InitViperInstance(v *viper.Viper, cmd *cobra.Command) error {
v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
v.AutomaticEnv() // read in environment variables that match

setLoggingDefaults(v)
setAgentDefaults(v)
setServerDefaults(v)

Expand Down Expand Up @@ -440,6 +444,10 @@ func (c *Config) APIBaseURL() string {
return fmt.Sprintf("http://%s:%d/%s", c.Server.BindAddress, c.Server.HttpPort, c.APIPrefix())
}

// GetLogsMaxDelaySeconds returns the configured maximum delay, in seconds,
// used for rate limiting log processing (Logging.MaxDelaySeconds).
func (c *Config) GetLogsMaxDelaySeconds() int {
	return c.Logging.MaxDelaySeconds
}

// GetConfig returns a configuration created from the viper configuration.
func GetConfig() *Config {
if globalV == nil {
Expand Down Expand Up @@ -506,6 +514,9 @@ func (c *Config) Write(cfgFile string) error {
return yaml.NewEncoder(f).Encode(c)
}

// setLoggingDefaults registers viper defaults for the logging section.
// logging.maxDelaySeconds defaults to 30; it bounds the delay of the
// Analyzer's rate limiting queue (see https://github.com/jlewi/foyle/issues/301).
func setLoggingDefaults(v *viper.Viper) {
	v.SetDefault("logging.maxDelaySeconds", 30)
}
func setServerDefaults(v *viper.Viper) {
v.SetDefault("server.bindAddress", "0.0.0.0")
v.SetDefault("server.httpPort", defaultHTTPPort)
Expand Down
36 changes: 20 additions & 16 deletions app/pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,34 @@ import (

func Test_ConfigDefaultConfig(t *testing.T) {
type testCase struct {
name string
configFile string
expectedRAG bool
expectedHTTPPort int
name string
configFile string
expectedRAG bool
expectedHTTPPort int
expectedMaxDelaySeconds int
}

cases := []testCase{
{
name: "config-file-does-not-exist",
configFile: "doesnotexist.yaml",
expectedRAG: defaultRagEnabled,
expectedHTTPPort: defaultHTTPPort,
name: "config-file-does-not-exist",
configFile: "doesnotexist.yaml",
expectedRAG: defaultRagEnabled,
expectedHTTPPort: defaultHTTPPort,
expectedMaxDelaySeconds: 30,
},
{
name: "empty-file",
configFile: "empty.yaml",
expectedRAG: defaultRagEnabled,
expectedHTTPPort: defaultHTTPPort,
name: "empty-file",
configFile: "empty.yaml",
expectedRAG: defaultRagEnabled,
expectedHTTPPort: defaultHTTPPort,
expectedMaxDelaySeconds: 30,
},
{
name: "partial",
configFile: "partial.yaml",
expectedRAG: defaultRagEnabled,
expectedHTTPPort: defaultHTTPPort,
name: "partial",
configFile: "partial.yaml",
expectedRAG: defaultRagEnabled,
expectedHTTPPort: defaultHTTPPort,
expectedMaxDelaySeconds: 30,
},
}

Expand Down
5 changes: 3 additions & 2 deletions app/pkg/config/update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ func Test_UpdateViperConfig(t *testing.T) {
expression: "agent.model=some-other-model",
expected: &Config{
Logging: Logging{
Level: "info",
Sinks: []LogSink{{JSON: true, Path: "gcplogs:///projects/fred-dev/logs/foyle"}, {Path: "stderr"}},
Level: "info",
Sinks: []LogSink{{JSON: true, Path: "gcplogs:///projects/fred-dev/logs/foyle"}, {Path: "stderr"}},
MaxDelaySeconds: 30,
},
Agent: &api.AgentConfig{
Model: "some-other-model",
Expand Down
141 changes: 90 additions & 51 deletions app/pkg/eval/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ import (
"sort"
"time"

"connectrpc.com/otelconnect"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"google.golang.org/protobuf/encoding/protojson"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -85,11 +89,17 @@ func (e *Evaluator) Reconcile(ctx context.Context, experiment api.Experiment) er
return errors.New("EvalDir is required")
}

aiClient := newAIServiceClient(experiment.Spec.AgentAddress)
otelInterceptor, err := otelconnect.NewInterceptor()
if err != nil {
return errors.Wrapf(err, "Failed to create OpenTelemetry interceptor")
}

aiClient := newAIServiceClient(experiment.Spec.AgentAddress, connect.WithInterceptors(otelInterceptor))

logsClient := logspbconnect.NewLogsServiceClient(
newHTTPClient(),
experiment.Spec.AgentAddress,
connect.WithInterceptors(otelInterceptor),
)

manager, err := openResultsManager(experiment.Spec.OutputDB)
Expand Down Expand Up @@ -141,7 +151,7 @@ func (e *Evaluator) Reconcile(ctx context.Context, experiment api.Experiment) er
sortEvalExamplesInTime(examples)

// Now generate predictions for any results that are missing them.
if err := e.processExamples(ctx, examples, lastProcessedTime, aiClient, logsClient, manager); err != nil {
if err := e.processExamples(ctx, experiment, examples, lastProcessedTime, aiClient, logsClient, manager); err != nil {
return err
}

Expand Down Expand Up @@ -177,7 +187,7 @@ func (e *Evaluator) Reconcile(ctx context.Context, experiment api.Experiment) er
return nil
}

func (e *Evaluator) processExamples(ctx context.Context, examples []*v1alpha1.EvalExample, lastProcessedTime time.Time, client v1alpha1connect.AIServiceClient, logsClient logspbconnect.LogsServiceClient, manager *ResultsManager) error {
func (e *Evaluator) processExamples(ctx context.Context, experiment api.Experiment, examples []*v1alpha1.EvalExample, lastProcessedTime time.Time, client v1alpha1connect.AIServiceClient, logsClient logspbconnect.LogsServiceClient, manager *ResultsManager) error {
oLog := logs.FromContext(ctx)

oaiClient, err := oai.NewClient(e.config)
Expand Down Expand Up @@ -205,62 +215,78 @@ func (e *Evaluator) processExamples(ctx context.Context, examples []*v1alpha1.Ev
}
log.Info("Processing example", "index", eIndex, "numExamples", len(examples))

var processErr error

uErr := manager.Update(ctx, example.GetId(), func(result *v1alpha1.EvalResult) error {
processErr = e.processResult(ctx, result, example, client, logsClient, judge)
// We need to return for the transaction to be committed.
return nil
})

if processErr != nil {
log.Error(processErr, "Failed to process example")
// For now we abort on error to see what's going on.
return processErr
exampleCtx := logr.NewContext(ctx, log)
if err := e.processExample(exampleCtx, experiment.Metadata.Name, example, client, logsClient, manager, judge); err != nil {
return err
}
}
return nil
}

if uErr != nil {
log.Error(uErr, "Failed to update result")
// For now we abort on error to see what's going on.
return uErr
}
func (e *Evaluator) processExample(originalCtx context.Context, name string, example *v1alpha1.EvalExample, client v1alpha1connect.AIServiceClient, logsClient logspbconnect.LogsServiceClient, manager *ResultsManager, judge *Judge) error {
log := logs.FromContext(originalCtx).WithValues("exampleId", example.GetId())
// We need to start a new trace for this example
tp := tracer()
traceCtx, traceSpan := tp.Start(originalCtx, "(*Evaluator).processExample", trace.WithNewRoot(), trace.WithAttributes(attribute.String("experiment", name), attribute.String("exampleId", example.GetId())))
traceId := traceSpan.SpanContext().TraceID()
log = log.WithValues("traceId", traceId.String())
ctx := logr.NewContext(traceCtx, log)
defer traceSpan.End()
log.Info("Start example")
var processErr error

uErr := manager.Update(ctx, example.GetId(), func(result *v1alpha1.EvalResult) error {
processErr = e.processResult(ctx, result, example, client, logsClient, judge)
// We need to return for the transaction to be committed.
return nil
})

result, err := manager.Get(ctx, example.GetId())
if err != nil {
return errors.Wrapf(err, "Failed to get latest result for example %s", example.GetId())
}
if processErr != nil {
log.Error(processErr, "Failed to process example")
// For now we abort on error to see what's going on.
return processErr
}

if result.Error != "" {
// Generating a completion failed for this example so we should keep going.
// There won't be a blocklog to wait for.
continue
}
if uErr != nil {
log.Error(uErr, "Failed to update result")
// For now we abort on error to see what's going on.
return uErr
}

if err := e.waitForBlockLog(ctx, result, logsClient); err != nil {
log.Error(err, "Failed to wait for block log")
// For now we abort on error to see what's going on.
return errors.Wrapf(err, "Failed to get block log for example %s", example.GetId())
}
result, err := manager.Get(ctx, example.GetId())
if err != nil {
return errors.Wrapf(err, "Failed to get latest result for example %s", example.GetId())
}

var ragErr error
// Getting the bestRAG result depends on the trace having been processed so we run after waiting for the BlockLog
uErr = manager.Update(ctx, example.GetId(), func(result *v1alpha1.EvalResult) error {
ragErr = e.reconcileBestRAGResult(ctx, result, logsClient)
return nil
})
if result.Error != "" {
// Generating a completion failed for this example so we should keep going.
// There won't be a blocklog to wait for.
return nil
}

if ragErr != nil {
log.Error(ragErr, "Failed to reconcile best RAG result")
// For now we abort on error to see what's going on.
return ragErr
}
if err := e.waitForBlockLog(ctx, result, logsClient); err != nil {
log.Error(err, "Failed to wait for block log")
// For now we abort on error to see what's going on.
return errors.Wrapf(err, "Failed to get block log for example %s", example.GetId())
}

if uErr != nil {
log.Error(uErr, "Failed to update result")
// For now we abort on error to see what's going on.
return uErr
}
var ragErr error
// Getting the bestRAG result depends on the trace having been processed so we run after waiting for the BlockLog
uErr = manager.Update(ctx, example.GetId(), func(result *v1alpha1.EvalResult) error {
ragErr = e.reconcileBestRAGResult(ctx, result, logsClient)
return nil
})

if ragErr != nil {
log.Error(ragErr, "Failed to reconcile best RAG result")
// For now we abort on error to see what's going on.
return ragErr
}

if uErr != nil {
log.Error(uErr, "Failed to update result")
// For now we abort on error to see what's going on.
return uErr
}
return nil
}
Expand All @@ -270,6 +296,8 @@ func (e *Evaluator) processResult(ctx context.Context, result *v1alpha1.EvalResu
result.Example = example
log := logs.FromContext(ctx).WithValues("exampleId", example.GetId())
ctx = logr.NewContext(ctx, log)
ctx, span := tracer().Start(ctx, "(*Evaluator).processResult")
defer span.End()

if err := runGenerate(ctx, result, client); err != nil {
return err
Expand Down Expand Up @@ -302,6 +330,9 @@ func (e *Evaluator) processResult(ctx context.Context, result *v1alpha1.EvalResu
// example then the result will be nil but result.Error will be set
func runGenerate(ctx context.Context, result *v1alpha1.EvalResult, client v1alpha1connect.AIServiceClient) error {
log := logs.FromContext(ctx)
ctx, span := tracer().Start(ctx, "runGenerate")
defer span.End()

// ID for the generate session
genSessionID := ulid.GenerateID()

Expand Down Expand Up @@ -381,6 +412,8 @@ func runGenerate(ctx context.Context, result *v1alpha1.EvalResult, client v1alph
}

func runExecute(ctx context.Context, result *v1alpha1.EvalResult, client v1alpha1connect.AIServiceClient) error {
ctx, span := tracer().Start(ctx, "runExecute")
defer span.End()
log := logs.FromContext(ctx)
// We need to send a LOG event to the agent to simulate the cells being executed.
executeEventReq := &v1alpha1.LogEventsRequest{}
Expand Down Expand Up @@ -442,6 +475,8 @@ func runExecute(ctx context.Context, result *v1alpha1.EvalResult, client v1alpha
}

func (e *Evaluator) waitForBlockLog(ctx context.Context, result *v1alpha1.EvalResult, client logspbconnect.LogsServiceClient) error {
ctx, span := tracer().Start(ctx, "(*Evaluator).waitForBlockLog")
defer span.End()
// We need to wait for the block log to be processed.
// This is done to
// 1. Increase the likelihood we have learned from the block
Expand Down Expand Up @@ -507,6 +542,9 @@ func (e *Evaluator) waitForBlockLog(ctx context.Context, result *v1alpha1.EvalRe
}

func (e *Evaluator) reconcileBestRAGResult(ctx context.Context, evalResult *v1alpha1.EvalResult, client logspbconnect.LogsServiceClient) error {
ctx, span := tracer().Start(ctx, "(*Evaluator).reconcileBestRAGResult")
defer span.End()

if evalResult.GenTraceId == "" {
return errors.WithStack(errors.New("GenTraceId is empty"))
}
Expand Down Expand Up @@ -760,11 +798,12 @@ func sortEvalExamplesInTime(examples []*v1alpha1.EvalExample) {
})
}

func newAIServiceClient(baseURL string) v1alpha1connect.AIServiceClient {
// newAIServiceClient constructs an AIService client for the given base URL,
// applying any additional connect client options (e.g. interceptors).
func newAIServiceClient(baseURL string, opts ...connect.ClientOption) v1alpha1connect.AIServiceClient {
	// Return the client directly; no need to hold it in a local first.
	return v1alpha1connect.NewAIServiceClient(
		newHTTPClient(),
		baseURL,
		opts...,
	)
}
3 changes: 3 additions & 0 deletions app/pkg/eval/judge.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ type Judge struct {
}

func (j *Judge) Score(ctx context.Context, result *v1alpha1.EvalResult) error {
ctx, span := tracer().Start(ctx, "(*Judge).Score")
defer span.End()

if len(result.GetExample().ExpectedCells) != 1 {
return errors.New("expected a single expected cell")
}
Expand Down
Loading

0 comments on commit 6f3a967

Please sign in to comment.