Skip to content

Commit

Permalink
[Bugfix] ProcFSIPResolver: Retry handling new processes if mapping ex…
Browse files Browse the repository at this point in the history
…traction fails (#122)

Co-authored-by: Ori Shoshan <[email protected]>
  • Loading branch information
roekatz and orishoshan authored Jul 20, 2023
1 parent a475e52 commit aa8ef3e
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 35 deletions.
23 changes: 20 additions & 3 deletions .github/workflows/e2e-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ env:
jobs:
e2e-test:
timeout-minutes: 5
strategy:
matrix:
workers: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
runs-on: ubuntu-latest
steps:
- name: Checkout
Expand Down Expand Up @@ -64,7 +67,7 @@ jobs:
SNIFFER_FLAGS="--set-string networkMapper.sniffer.repository=${{ env.REGISTRY }} --set-string networkMapper.sniffer.image=sniffer --set-string networkMapper.sniffer.tag=${{ inputs.sniffer-tag }} --set-string networkMapper.sniffer.pullPolicy=Never"
TELEMETRY_FLAG="--set global.telemetry.enabled=false"
helm dep up ./helm-charts/otterize-kubernetes
helm install otterize ./helm-charts/otterize-kubernetes -n otterize-system --create-namespace $MAPPER_FLAGS $SNIFFER_FLAGS $TELEMETRY_FLAG
helm install otterize ./helm-charts/otterize-kubernetes -n otterize-system --create-namespace --set networkMapper.debug=true $MAPPER_FLAGS $SNIFFER_FLAGS $TELEMETRY_FLAG
- name: Install CLI
run: |-
Expand All @@ -89,13 +92,27 @@ jobs:
- name: Test Policy Export
run: |-
# wait for 2 intents to be discovered, checking up to 5 times (roughly 50 seconds in total).
# sleeps 10 between checks because this is the report interval from the watcher to the mapper
for i in 1 2 3 4 5
for i in {1..5}
do
if [ `otterize network-mapper export --telemetry-enabled=false -n otterize-tutorial-mapper --format=json | jq ". | length"` != 2 ]; then echo "wait for discovered intents"; sleep 10 ; fi
if [ `otterize network-mapper export --telemetry-enabled=false -n otterize-tutorial-mapper --format=json | jq ". | length"` != 2 ]; then
echo "wait for discovered intents";
echo _SNIFFER LOGS_
kubectl logs --since=15s -n otterize-system -l app=otterize-network-sniffer
echo _MAPPER LOGS_
kubectl logs --since=15s -n otterize-system -l app=otterize-network-mapper
sleep 10 ;
fi
done
echo Outputting all logs
echo _SNIFFER LOGS_
kubectl logs -n otterize-system -l app=otterize-network-sniffer --tail=-1
echo _MAPPER LOGS_
kubectl logs -n otterize-system -l app=otterize-network-mapper --tail=-1
echo "export intents and compare to expected file"
otterize network-mapper export --telemetry-enabled=false -n otterize-tutorial-mapper --format=json | jq 'sort_by(.metadata.namespace + .metadata.name)' > /tmp/intents.json
diff .github/workflows/tests-expected-results/simple-tutorial-intents.json /tmp/intents.json
Expand Down
9 changes: 5 additions & 4 deletions src/sniffer/pkg/collectors/dnssniffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,12 @@ func (s *DNSSniffer) HandlePacket(packet gopacket.Packet) {
hostname, err := s.resolver.ResolveIP(ip.DstIP.String())
if err != nil {
logrus.Debugf("Can't resolve IP addr %s, skipping", ip.DstIP.String())
} else {
// Resolver cache could be outdated, verify same resolving result after next poll
s.pending = append(s.pending, pendingCapture{
srcIp: ip.DstIP.String(), srcHostname: hostname, dest: string(answer.Name), time: captureTime,
})
}
// Resolver cache could be outdated, verify same resolving result after next poll
s.pending = append(s.pending, pendingCapture{
srcIp: ip.DstIP.String(), srcHostname: hostname, dest: string(answer.Name), time: captureTime,
})
}
}
}
Expand Down
53 changes: 38 additions & 15 deletions src/sniffer/pkg/ipresolver/process_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,46 +2,69 @@ package ipresolver

import (
"github.com/otterize/network-mapper/src/sniffer/pkg/utils"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/util/sets"
)

// MaxRetries is how many times a failing new-process callback is retried on
// later polls before the monitor gives up on that process and treats it as
// handled.
const MaxRetries = 3

// ProcessMonitorCallback is the handler type ProcessMonitor invokes for
// process events. It receives the process ID and pDir, the process's
// directory (passed as an empty string for exit events).
//
// Implementations should be idempotent on failure: when a new-process
// callback returns an error, it is invoked again for the same process on a
// later poll.
type ProcessMonitorCallback func(pid int64, pDir string) error

type ProcessMonitor struct {
processes sets.Set[int64]
onProcNew utils.ProcessScanCallback
onProcExit utils.ProcessScanCallback
forEachProcess utils.ProcessScanner
processes sets.Set[int64]
failingProcesses map[int64]int
onProcNew ProcessMonitorCallback
onProcExit ProcessMonitorCallback
forEachProcess utils.ProcessScanner
}

func NewProcessMonitor(
onProcNew utils.ProcessScanCallback,
onProcExit utils.ProcessScanCallback,
onProcNew ProcessMonitorCallback,
onProcExit ProcessMonitorCallback,
forEachProcess utils.ProcessScanner,
) *ProcessMonitor {
return &ProcessMonitor{
processes: sets.New[int64](),
onProcNew: onProcNew,
onProcExit: onProcExit,
forEachProcess: forEachProcess,
processes: sets.New[int64](),
failingProcesses: make(map[int64]int),
onProcNew: onProcNew,
onProcExit: onProcExit,
forEachProcess: forEachProcess,
}
}

func (pm *ProcessMonitor) Poll() error {
processesSeenLastTime := pm.processes.Clone()
pm.processes = sets.New[int64]()

err := pm.forEachProcess(func(pid int64, pDir string) {
if err := pm.forEachProcess(func(pid int64, pDir string) {
if !processesSeenLastTime.Has(pid) {
pm.onProcNew(pid, pDir)
if err := pm.onProcNew(pid, pDir); err != nil {
// Failed to handle
failures := 0
if _, ok := pm.failingProcesses[pid]; ok {
failures = pm.failingProcesses[pid]
}
failures++
if failures <= MaxRetries {
// Try again next interval
pm.failingProcesses[pid] = failures
return // Don't insert pid to handled set
} else {
logrus.Debugf("Giving up failing process: %d", pid)
delete(pm.failingProcesses, pid)
}
}
}
// Shouldn't handle again
pm.processes.Insert(pid)
})
if err != nil {
}); err != nil {
return err
}

exitedProcesses := processesSeenLastTime.Difference(pm.processes)
for _, pid := range exitedProcesses.UnsortedList() {
pm.onProcExit(pid, "")
_ = pm.onProcExit(pid, "")
}

return nil
Expand Down
6 changes: 4 additions & 2 deletions src/sniffer/pkg/ipresolver/process_montior_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ func (s *ProcessMonitorTestSuite) SetupTest() {
s.processMonitor = NewProcessMonitor(s.onNew, s.onExit, s.scanPids)
}

func (s *ProcessMonitorTestSuite) onNew(pid int64, pDir string) {
func (s *ProcessMonitorTestSuite) onNew(pid int64, _ string) error {
s.pidCalledNew = append(s.pidCalledNew, pid)
return nil
}

func (s *ProcessMonitorTestSuite) onExit(pid int64, pDir string) {
func (s *ProcessMonitorTestSuite) onExit(pid int64, _ string) error {
s.pidCalledExit = append(s.pidCalledExit, pid)
return nil
}

func (s *ProcessMonitorTestSuite) scanPids(callback utils.ProcessScanCallback) error {
Expand Down
23 changes: 14 additions & 9 deletions src/sniffer/pkg/ipresolver/procfs_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,53 +33,57 @@ func (r *ProcFSIPResolver) ResolveIP(ipaddr string) (hostname string, err error)
if hostInfo, ok := r.byAddr[ipaddr]; ok {
return hostInfo.Hostname, nil
}
return "", errors.New("IP not found")
return "", errors.New("ip not found")
}

// Refresh runs a single Poll on the resolver's monitor and returns the error
// from that call unchanged.
func (r *ProcFSIPResolver) Refresh() error {
	pollErr := r.monitor.Poll()
	return pollErr
}

func (r *ProcFSIPResolver) onProcessNew(pid int64, pDir string) {
hostname, err := utils.ExtractProcessHostname(pDir)
func (r *ProcFSIPResolver) onProcessNew(pid int64, pDir string) (err error) {
var hostname, ipaddr string
hostname, err = utils.ExtractProcessHostname(pDir)
if err != nil {
logrus.Debugf("Failed to extract hostname for process %d: %v", pid, err)
return
return err
}

ipaddr, err := utils.ExtractProcessIPAddr(pDir)
ipaddr, err = utils.ExtractProcessIPAddr(pDir)
if err != nil {
logrus.Debugf("Failed to extract IP address for process %d: %v", pid, err)
return
return err
}

if entry, ok := r.byAddr[ipaddr]; ok {
if entry.Hostname == hostname {
// Already mapped to this hostname, add another process reference
r.byPid[pid] = entry
entry.ProcessRefCount++
return
logrus.Debugf("Mapping %s:%s already exists, increased refcount to %d", ipaddr, hostname, entry.ProcessRefCount)
return nil
} else {
// Shouldn't normally happen - it can occur if an IP is reassigned to a new pod very quickly, so that a single scan sees the new process but not the older one
logrus.Warnf("IP mapping conflict: %s got new hostname %s, but already mapped to %s. Would use the newer hostname", ipaddr, hostname, entry.Hostname)
// For now, treat it as a new IP mapping (make sure at exit to decrement ref count only if hostname matches)
}
}

logrus.Debugf("Found new mapping %s:%s", ipaddr, hostname)
newEntry := &ProcFSIPResolverEntry{
IPAddr: ipaddr,
Hostname: hostname,
ProcessRefCount: 1,
}
r.byPid[pid] = newEntry
r.byAddr[ipaddr] = newEntry
return nil
}

func (r *ProcFSIPResolver) onProcessExit(pid int64, _ string) {
func (r *ProcFSIPResolver) onProcessExit(pid int64, _ string) error {
if entry, ok := r.byPid[pid]; !ok {
// Shouldn't happen
logrus.Debugf("Unknown process %d exited", pid)
return
return nil
} else {
entry.ProcessRefCount--
if entry.ProcessRefCount == 0 {
Expand All @@ -93,4 +97,5 @@ func (r *ProcFSIPResolver) onProcessExit(pid int64, _ string) {
// Remove process from pid map
delete(r.byPid, pid)
}
return nil
}
4 changes: 2 additions & 2 deletions src/sniffer/pkg/ipresolver/procfs_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,14 +130,14 @@ func (s *ProcFSIPResolverTestSuite) TestResolverRefCount() {
s.Require().Equal("service-2", hostname)

s.mockKillProcess(20)
s.mockKillProcess(21)
s.mockKillProcess(22)
_ = s.resolver.Refresh()

hostname, err = s.resolver.ResolveIP("172.17.0.2")
s.Require().NoError(err)
s.Require().Equal("service-2", hostname)

s.mockKillProcess(22)
s.mockKillProcess(21)
_ = s.resolver.Refresh()

hostname, err = s.resolver.ResolveIP("172.17.0.2")
Expand Down

0 comments on commit aa8ef3e

Please sign in to comment.