Merge remote-tracking branch 'origin/main' into jlewi/ghostmarkup
jlewi committed Oct 16, 2024
2 parents 502d872 + 7300e45 commit c7cc13c
Showing 11 changed files with 304 additions and 85 deletions.
34 changes: 30 additions & 4 deletions app/pkg/analyze/analyzer.go
@@ -83,21 +83,29 @@ type Analyzer struct {
}

// NewAnalyzer creates a new Analyzer.
-func NewAnalyzer(logOffsetsFile string, rawLogsDB *dbutil.LockingDB[*logspb.LogEntries], tracesDB *pebble.DB, blocksDB *dbutil.LockingDB[*logspb.BlockLog], sessions *SessionsManager) (*Analyzer, error) {
+func NewAnalyzer(logOffsetsFile string, maxDelay time.Duration, rawLogsDB *dbutil.LockingDB[*logspb.LogEntries], tracesDB *pebble.DB, blocksDB *dbutil.LockingDB[*logspb.BlockLog], sessions *SessionsManager) (*Analyzer, error) {
	logOffsets, err := initOffsets(logOffsetsFile)
	if err != nil {
		return nil, err
	}

-	// Create a rate limiting queue for processing files. We rate limit to each file every 30 seconds. This is because
+	// Create a rate limiting queue for processing files. We rate limit each file to once every N seconds. This is because
	// the logs are constantly being written to and we don't want to process the files too quickly.
	// We are potentially writing to multiple files at the same time, e.g. the Analyzer logs and then a different
	// log file for each instance of RunMe. So we need to track different backoffs for each file, which the rate limiter
	// does. Using exponential backoff would make sense when we update processLogFile to detect the end of a trace.
	// In that case, after we detect the start of a trace we would want to retry on a very short interval with backoff
	// to detect the end of the trace as quickly as possible. Right now we don't do that, and in fact we never call
	// Forget, so we will basically max out the retry limit at the max delay.
-	fileQueue := workqueue.NewRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(5*time.Second, 30*time.Second))
+	// The max delay is configurable because in some cases (e.g. evaluation, https://github.com/jlewi/foyle/issues/301)
+	// we want to minimize the time between when a log entry is written and when we process it so the trace is available.
+
+	if maxDelay <= 0 {
+		return nil, errors.New("max delay must be greater than 0")
+	}
+
+	baseDelay := time.Second
+	fileQueue := workqueue.NewRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(baseDelay, maxDelay))

	sessBuilder, err := NewSessionBuilder(sessions)
	if err != nil {
@@ -128,7 +136,7 @@ func initOffsets(logOffsetsFile string) (*logspb.LogsWaterMark, error) {
	watermark := &logspb.LogsWaterMark{}

	if err := protojson.Unmarshal(raw, watermark); err != nil {
-		log.Error(err, "Failed to unmarshal watermarks file %s; watermarks will be reinitialized", logOffsetsFile)
+		log.Error(err, "Failed to unmarshal watermarks; watermarks will be reinitialized", "file", logOffsetsFile)
	}
	return watermark, nil
}
@@ -808,5 +816,23 @@ func combineGenerateTrace(ctx context.Context, entries []*api.LogEntry) (*logspb.Trace, error) {
	trace.EvalMode = evalMode

	combineSpans(trace)
+	dedupeAssertions(trace)
	return trace, nil
}

+// dedupeAssertions removes duplicate assertions. Since our processing has at-least-once
+// semantics, we could wind up with duplicate copies of an assertion.
+func dedupeAssertions(trace *logspb.Trace) {
+	newAssertions := make([]*v1alpha1.Assertion, 0, len(trace.Assertions))
+	seen := make(map[string]bool)
+
+	for _, a := range trace.Assertions {
+		if seen[a.GetId()] {
+			continue
+		}
+		newAssertions = append(newAssertions, a)
+		seen[a.GetId()] = true
+	}
+
+	trace.Assertions = newAssertions
+}
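
For reference, here is a minimal standalone sketch of the backoff behavior NewAnalyzer now configures, assuming client-go's workqueue package (which the code above uses) with the 1s base delay and the default 30s cap. Because the analyzer never calls Forget, each file's requeue delay doubles until it pins at the configured max:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Same shape as the analyzer's limiter: baseDelay=1s, maxDelay=30s (the default).
	limiter := workqueue.NewItemExponentialFailureRateLimiter(time.Second, 30*time.Second)

	// Each When call for the same item counts as another failure and doubles
	// the delay until it hits the cap: 1s, 2s, 4s, 8s, 16s, 30s, 30s, ...
	for i := 0; i < 7; i++ {
		fmt.Printf("retry %d: requeue after %v\n", i, limiter.When("analyzer.log"))
	}
	// limiter.Forget("analyzer.log") would reset the backoff; the analyzer
	// intentionally never does this today.
}
```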
71 changes: 67 additions & 4 deletions app/pkg/analyze/analyzer_test.go
@@ -263,7 +263,7 @@ func Test_Analyzer(t *testing.T) {
	}

	logOffsetsFile := filepath.Join(rawDir, "log_offsets.json")
-	a, err := NewAnalyzer(logOffsetsFile, lockingRawDB, tracesDB, lockingBlocksDB, sessionsManager)
+	a, err := NewAnalyzer(logOffsetsFile, 3*time.Second, lockingRawDB, tracesDB, lockingBlocksDB, sessionsManager)
	if err != nil {
		t.Fatalf("Failed to create analyzer: %v", err)
	}
@@ -434,7 +434,7 @@ func Test_CombineGenerateEntries(t *testing.T) {
			expectedEvalMode: false,
			logFunc: func(log logr.Logger) {
				assertion := &v1alpha1.Assertion{
-					Name:   "testassertion",
+					Name:   v1alpha1.Assertion_ONE_CODE_CELL,
					Result: v1alpha1.AssertResult_PASSED,
					Detail: "",
					Id:     "1234",
@@ -498,8 +498,9 @@ func Test_CombineGenerateEntries(t *testing.T) {

		c.logFunc(zTestLog)

-		testLog.Sync()
+		if err := testLog.Sync(); err != nil {
+			t.Fatalf("Failed to sync log: %v", err)
+		}
	}

	for _, logFile := range logFiles {
@@ -552,3 +553,65 @@ func Test_CombineGenerateEntries(t *testing.T) {
		})
	}
}

+func Test_DedupeAssertions(t *testing.T) {
+	type testCase struct {
+		name     string
+		input    []*v1alpha1.Assertion
+		expected []*v1alpha1.Assertion
+	}
+
+	cases := []testCase{
+		{
+			name: "basic",
+			input: []*v1alpha1.Assertion{
+				{
+					Name:   v1alpha1.Assertion_ONE_CODE_CELL,
+					Result: v1alpha1.AssertResult_PASSED,
+					Id:     "1",
+				},
+				{
+					Name:   v1alpha1.Assertion_ONE_CODE_CELL,
+					Result: v1alpha1.AssertResult_PASSED,
+					Id:     "1",
+				},
+				{
+					Name:   v1alpha1.Assertion_ENDS_WITH_CODE_CELL,
+					Result: v1alpha1.AssertResult_PASSED,
+					Id:     "2",
+				},
+			},
+			expected: []*v1alpha1.Assertion{
+				{
+					Name:   v1alpha1.Assertion_ONE_CODE_CELL,
+					Result: v1alpha1.AssertResult_PASSED,
+					Id:     "1",
+				},
+				{
+					Name:   v1alpha1.Assertion_ENDS_WITH_CODE_CELL,
+					Result: v1alpha1.AssertResult_PASSED,
+					Id:     "2",
+				},
+			},
+		},
+		{
+			name:     "nil",
+			input:    nil,
+			expected: []*v1alpha1.Assertion{},
+		},
+	}
+
+	for _, c := range cases {
+		t.Run(c.name, func(t *testing.T) {
+			trace := &logspb.Trace{
+				Assertions: c.input,
+			}
+
+			dedupeAssertions(trace)
+
+			if d := cmp.Diff(c.expected, trace.Assertions, cmpopts.IgnoreUnexported(v1alpha1.Assertion{})); d != "" {
+				t.Errorf("Unexpected diff:\n%s", d)
+			}
+		})
+	}
+}
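
The function under test is an order-preserving dedupe-by-key. As a rough sketch of the same pattern in isolation, here is a hypothetical generic helper (not part of the foyle codebase; assumes Go 1.18+):

```go
// dedupeBy drops items whose key has been seen before, keeping the first
// occurrence and preserving order.
func dedupeBy[T any, K comparable](items []T, key func(T) K) []T {
	out := make([]T, 0, len(items))
	seen := make(map[K]bool)
	for _, it := range items {
		k := key(it)
		if seen[k] {
			continue // already kept an item with this key
		}
		seen[k] = true
		out = append(out, it)
	}
	return out
}
```

With such a helper, dedupeAssertions would reduce to roughly trace.Assertions = dedupeBy(trace.Assertions, (*v1alpha1.Assertion).GetId).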
3 changes: 2 additions & 1 deletion app/pkg/application/app.go
@@ -394,7 +394,8 @@ func (a *App) SetupAnalyzer() (*analyze.Analyzer, error) {
	a.sessionsManager = manager
	a.sessionsDB = db

-	analyzer, err := analyze.NewAnalyzer(a.Config.GetLogOffsetsFile(), a.LockingLogEntriesDB, a.TracesDB, a.LockingBlocksDB, manager)
+	maxDelay := time.Duration(a.Config.GetLogsMaxDelaySeconds()) * time.Second
+	analyzer, err := analyze.NewAnalyzer(a.Config.GetLogOffsetsFile(), maxDelay, a.LockingLogEntriesDB, a.TracesDB, a.LockingBlocksDB, manager)
	if err != nil {
		return nil, err
	}
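
One subtlety in the conversion above: time.Duration(n) on its own is n nanoseconds, so the multiplication by time.Second is what scales the configured integer to whole seconds. A tiny sketch:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	seconds := 30 // e.g. the value returned by Config.GetLogsMaxDelaySeconds()
	fmt.Println(time.Duration(seconds))               // 30ns — almost certainly not what you want
	fmt.Println(time.Duration(seconds) * time.Second) // 30s
}
```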
11 changes: 11 additions & 0 deletions app/pkg/config/config.go
@@ -193,6 +193,9 @@ type Logging struct {
	// Use stderr to write to stderr.
	// Use gcplogs:///projects/${PROJECT}/logs/${LOGNAME} to write to Google Cloud Logging.
	Sinks []LogSink `json:"sinks,omitempty" yaml:"sinks,omitempty"`

+	// MaxDelaySeconds is the maximum delay in seconds to wait before processing the logs.
+	MaxDelaySeconds int `json:"maxDelaySeconds,omitempty" yaml:"maxDelaySeconds,omitempty"`
}

type LogSink struct {
@@ -389,6 +392,7 @@ func InitViperInstance(v *viper.Viper, cmd *cobra.Command) error {
	v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
	v.AutomaticEnv() // read in environment variables that match

+	setLoggingDefaults(v)
	setAgentDefaults(v)
	setServerDefaults(v)

@@ -440,6 +444,10 @@ func (c *Config) APIBaseURL() string {
	return fmt.Sprintf("http://%s:%d/%s", c.Server.BindAddress, c.Server.HttpPort, c.APIPrefix())
}

+func (c *Config) GetLogsMaxDelaySeconds() int {
+	return c.Logging.MaxDelaySeconds
+}

// GetConfig returns a configuration created from the viper configuration.
func GetConfig() *Config {
	if globalV == nil {
@@ -506,6 +514,9 @@ func (c *Config) Write(cfgFile string) error {
	return yaml.NewEncoder(f).Encode(c)
}

+func setLoggingDefaults(v *viper.Viper) {
+	v.SetDefault("logging.maxDelaySeconds", 30)
+}
func setServerDefaults(v *viper.Viper) {
	v.SetDefault("server.bindAddress", "0.0.0.0")
	v.SetDefault("server.httpPort", defaultHTTPPort)
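
A minimal sketch of how the new viper default behaves, assuming spf13/viper's usual precedence rules (explicit config or environment values override defaults):

```go
package main

import (
	"fmt"

	"github.com/spf13/viper"
)

func main() {
	v := viper.New()
	// Mirrors setLoggingDefaults: used only when nothing else sets the key.
	v.SetDefault("logging.maxDelaySeconds", 30)
	fmt.Println(v.GetInt("logging.maxDelaySeconds")) // 30

	// A value from config.yaml (simulated here with Set) takes precedence.
	v.Set("logging.maxDelaySeconds", 5)
	fmt.Println(v.GetInt("logging.maxDelaySeconds")) // 5
}
```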
36 changes: 20 additions & 16 deletions app/pkg/config/config_test.go
Expand Up @@ -10,30 +10,34 @@ import (

func Test_ConfigDefaultConfig(t *testing.T) {
	type testCase struct {
-		name             string
-		configFile       string
-		expectedRAG      bool
-		expectedHTTPPort int
+		name                    string
+		configFile              string
+		expectedRAG             bool
+		expectedHTTPPort        int
+		expectedMaxDelaySeconds int
	}

	cases := []testCase{
		{
-			name:             "config-file-does-not-exist",
-			configFile:       "doesnotexist.yaml",
-			expectedRAG:      defaultRagEnabled,
-			expectedHTTPPort: defaultHTTPPort,
+			name:                    "config-file-does-not-exist",
+			configFile:              "doesnotexist.yaml",
+			expectedRAG:             defaultRagEnabled,
+			expectedHTTPPort:        defaultHTTPPort,
+			expectedMaxDelaySeconds: 30,
		},
		{
-			name:             "empty-file",
-			configFile:       "empty.yaml",
-			expectedRAG:      defaultRagEnabled,
-			expectedHTTPPort: defaultHTTPPort,
+			name:                    "empty-file",
+			configFile:              "empty.yaml",
+			expectedRAG:             defaultRagEnabled,
+			expectedHTTPPort:        defaultHTTPPort,
+			expectedMaxDelaySeconds: 30,
		},
		{
-			name:             "partial",
-			configFile:       "partial.yaml",
-			expectedRAG:      defaultRagEnabled,
-			expectedHTTPPort: defaultHTTPPort,
+			name:                    "partial",
+			configFile:              "partial.yaml",
+			expectedRAG:             defaultRagEnabled,
+			expectedHTTPPort:        defaultHTTPPort,
+			expectedMaxDelaySeconds: 30,
		},
	}

5 changes: 3 additions & 2 deletions app/pkg/config/update_test.go
@@ -27,8 +27,9 @@ func Test_UpdateViperConfig(t *testing.T) {
			expression: "agent.model=some-other-model",
			expected: &Config{
				Logging: Logging{
-					Level: "info",
-					Sinks: []LogSink{{JSON: true, Path: "gcplogs:///projects/fred-dev/logs/foyle"}, {Path: "stderr"}},
+					Level:           "info",
+					Sinks:           []LogSink{{JSON: true, Path: "gcplogs:///projects/fred-dev/logs/foyle"}, {Path: "stderr"}},
+					MaxDelaySeconds: 30,
				},
				Agent: &api.AgentConfig{
					Model: "some-other-model",
(Diffs for the remaining 5 changed files are not shown here.)
