Skip to content

Commit

Permalink
Fix kafka reader, change log format
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremy5189 committed Oct 12, 2023
1 parent e8f7bef commit 93e47b7
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 10 deletions.
3 changes: 2 additions & 1 deletion internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,8 @@ func updateExpiringDecisionLists(
log.Println("From baskerville", fromBaskerville)
}

purgeNginxAuthCacheForIp(ip)
// XXX We are not using nginx to banjax cache feature yet
// purgeNginxAuthCacheForIp(ip)
expires := now.Add(time.Duration(config.ExpiringDecisionTtlSeconds) * time.Second)
(*decisionLists).ExpiringDecisionLists[ip] = ExpiringDecision{newDecision, expires, fromBaskerville}
}
Expand Down
24 changes: 15 additions & 9 deletions internal/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ sample baskerville message:
}
*/
type commandMessage struct {
Name string
Value string
Host string `json:"host"`
Name string
Value string
Host string `json:"host"`
SessionId string `json:"session_id"`
Source string `json:"source"`
}

func getDialer(config *Config) *kafka.Dialer {
Expand Down Expand Up @@ -87,10 +89,11 @@ func RunKafkaReader(
// XXX this infinite loop is so we reconnect if we get dropped.
for {
r := kafka.NewReader(kafka.ReaderConfig{
Brokers: config.KafkaBrokers,
GroupID: uuid.New().String(),
Topic: config.KafkaCommandTopic,
Dialer: getDialer(config),
Brokers: config.KafkaBrokers,
GroupID: uuid.New().String(),
StartOffset: kafka.LastOffset,
Topic: config.KafkaCommandTopic,
Dialer: getDialer(config),
})
r.SetOffset(kafka.LastOffset)
defer r.Close()
Expand All @@ -109,15 +112,18 @@ func RunKafkaReader(
continue // XXX what to do here?
}

log.Printf("KAFKA: message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
// log.Printf("KAFKA: message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))

command := commandMessage{}
err = json.Unmarshal(m.Value, &command)
if err != nil {
log.Println("KAFKA: json.Unmarshal() failed")
log.Printf("KAFKA: Unmarshal failed %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
continue
}

log.Printf("KAFKA: message %s (%d/%d) = N: %s, V: %s, S: %s: Src: %s\n",
string(m.Key), m.Offset, m.Partition, command.Name, command.Value, command.SessionId, command.Source)

handleCommand(
config,
command,
Expand Down

0 comments on commit 93e47b7

Please sign in to comment.