diff --git a/.github/workflows/e2e-test.yaml b/.github/workflows/e2e-test.yaml index bff62f89..76912a3b 100644 --- a/.github/workflows/e2e-test.yaml +++ b/.github/workflows/e2e-test.yaml @@ -24,6 +24,9 @@ env: jobs: e2e-test: timeout-minutes: 5 + strategy: + matrix: + workers: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] runs-on: ubuntu-latest steps: - name: Checkout @@ -64,7 +67,7 @@ jobs: SNIFFER_FLAGS="--set-string networkMapper.sniffer.repository=${{ env.REGISTRY }} --set-string networkMapper.sniffer.image=sniffer --set-string networkMapper.sniffer.tag=${{ inputs.sniffer-tag }} --set-string networkMapper.sniffer.pullPolicy=Never" TELEMETRY_FLAG="--set global.telemetry.enabled=false" helm dep up ./helm-charts/otterize-kubernetes - helm install otterize ./helm-charts/otterize-kubernetes -n otterize-system --create-namespace $MAPPER_FLAGS $SNIFFER_FLAGS $TELEMETRY_FLAG + helm install otterize ./helm-charts/otterize-kubernetes -n otterize-system --create-namespace --set networkMapper.debug=true $MAPPER_FLAGS $SNIFFER_FLAGS $TELEMETRY_FLAG - name: Install CLI run: |- @@ -89,13 +92,27 @@ jobs: - name: Test Policy Export run: |- + # wait for 2 intents to be discovered with timeout of 30 seconds. # sleeps 10 because this is the report interval from the watcher to the mapper - for i in 1 2 3 4 5 + for i in {1..5} do - if [ `otterize network-mapper export --telemetry-enabled=false -n otterize-tutorial-mapper --format=json | jq ". | length"` != 2 ]; then echo "wait for discovered intents"; sleep 10 ; fi + if [ `otterize network-mapper export --telemetry-enabled=false -n otterize-tutorial-mapper --format=json | jq ". | length"` != 2 ]; then + echo "wait for discovered intents"; + echo _SNIFFER LOGS_ + kubectl logs --since=15s -n otterize-system -l app=otterize-network-sniffer + echo _MAPPER LOGS_ + kubectl logs --since=15s -n otterize-system -l app=otterize-network-mapper + sleep 10 ; + fi done + echo Outputting all logs + echo _SNIFFER LOGS_ + kubectl logs -n otterize-system -l app=otterize-network-sniffer --tail=-1 + echo _MAPPER LOGS_ + kubectl logs -n otterize-system -l app=otterize-network-mapper --tail=-1 + echo "export intents and compare to expected file" otterize network-mapper export --telemetry-enabled=false -n otterize-tutorial-mapper --format=json | jq 'sort_by(.metadata.namespace + .metadata.name)' > /tmp/intents.json diff .github/workflows/tests-expected-results/simple-tutorial-intents.json /tmp/intents.json diff --git a/src/sniffer/pkg/collectors/dnssniffer.go b/src/sniffer/pkg/collectors/dnssniffer.go index e1058736..2a7663c4 100644 --- a/src/sniffer/pkg/collectors/dnssniffer.go +++ b/src/sniffer/pkg/collectors/dnssniffer.go @@ -66,11 +66,12 @@ func (s *DNSSniffer) HandlePacket(packet gopacket.Packet) { hostname, err := s.resolver.ResolveIP(ip.DstIP.String()) if err != nil { logrus.Debugf("Can't resolve IP addr %s, skipping", ip.DstIP.String()) + } else { + // Resolver cache could be outdated, verify same resolving result after next poll + s.pending = append(s.pending, pendingCapture{ + srcIp: ip.DstIP.String(), srcHostname: hostname, dest: string(answer.Name), time: captureTime, + }) } - // Resolver cache could be outdated, verify same resolving result after next poll - s.pending = append(s.pending, pendingCapture{ - srcIp: ip.DstIP.String(), srcHostname: hostname, dest: string(answer.Name), time: captureTime, - }) } } } diff --git a/src/sniffer/pkg/ipresolver/process_monitor.go b/src/sniffer/pkg/ipresolver/process_monitor.go index 3cb4b4c3..5d701a16 100644 --- a/src/sniffer/pkg/ipresolver/process_monitor.go +++ b/src/sniffer/pkg/ipresolver/process_monitor.go @@ -2,26 +2,34 @@ package ipresolver import ( "github.com/otterize/network-mapper/src/sniffer/pkg/utils" + "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/util/sets" ) +const MaxRetries = 3 + +// ProcessMonitorCallback Should be idempotent on failures because retried on error +type ProcessMonitorCallback func(pid int64, pDir string) error + type ProcessMonitor struct { - processes sets.Set[int64] - onProcNew utils.ProcessScanCallback - onProcExit utils.ProcessScanCallback - forEachProcess utils.ProcessScanner + processes sets.Set[int64] + failingProcesses map[int64]int + onProcNew ProcessMonitorCallback + onProcExit ProcessMonitorCallback + forEachProcess utils.ProcessScanner } func NewProcessMonitor( - onProcNew utils.ProcessScanCallback, - onProcExit utils.ProcessScanCallback, + onProcNew ProcessMonitorCallback, + onProcExit ProcessMonitorCallback, forEachProcess utils.ProcessScanner, ) *ProcessMonitor { return &ProcessMonitor{ - processes: sets.New[int64](), - onProcNew: onProcNew, - onProcExit: onProcExit, - forEachProcess: forEachProcess, + processes: sets.New[int64](), + failingProcesses: make(map[int64]int), + onProcNew: onProcNew, + onProcExit: onProcExit, + forEachProcess: forEachProcess, } } @@ -29,19 +37,34 @@ func (pm *ProcessMonitor) Poll() error { processesSeenLastTime := pm.processes.Clone() pm.processes = sets.New[int64]() - err := pm.forEachProcess(func(pid int64, pDir string) { + if err := pm.forEachProcess(func(pid int64, pDir string) { if !processesSeenLastTime.Has(pid) { - pm.onProcNew(pid, pDir) + if err := pm.onProcNew(pid, pDir); err != nil { + // Failed to handle + failures := 0 + if _, ok := pm.failingProcesses[pid]; ok { + failures = pm.failingProcesses[pid] + } + failures++ + if failures <= MaxRetries { + // Try again next interval + pm.failingProcesses[pid] = failures + return // Don't insert pid to handled set + } else { + logrus.Debugf("Giving up failing process: %d", pid) + delete(pm.failingProcesses, pid) + } + } } + // Shouldn't handle again pm.processes.Insert(pid) - }) - if err != nil { + }); err != nil { return err } exitedProcesses := processesSeenLastTime.Difference(pm.processes) for _, pid := range exitedProcesses.UnsortedList() { - pm.onProcExit(pid, "") + _ = pm.onProcExit(pid, "") } return nil diff --git a/src/sniffer/pkg/ipresolver/process_montior_test.go b/src/sniffer/pkg/ipresolver/process_montior_test.go index aff8b43d..e84dacd8 100644 --- a/src/sniffer/pkg/ipresolver/process_montior_test.go +++ b/src/sniffer/pkg/ipresolver/process_montior_test.go @@ -19,12 +19,14 @@ func (s *ProcessMonitorTestSuite) SetupTest() { s.processMonitor = NewProcessMonitor(s.onNew, s.onExit, s.scanPids) } -func (s *ProcessMonitorTestSuite) onNew(pid int64, pDir string) { +func (s *ProcessMonitorTestSuite) onNew(pid int64, _ string) error { s.pidCalledNew = append(s.pidCalledNew, pid) + return nil } -func (s *ProcessMonitorTestSuite) onExit(pid int64, pDir string) { +func (s *ProcessMonitorTestSuite) onExit(pid int64, _ string) error { s.pidCalledExit = append(s.pidCalledExit, pid) + return nil } func (s *ProcessMonitorTestSuite) scanPids(callback utils.ProcessScanCallback) error { diff --git a/src/sniffer/pkg/ipresolver/procfs_resolver.go b/src/sniffer/pkg/ipresolver/procfs_resolver.go index edc1cca6..4d33f565 100644 --- a/src/sniffer/pkg/ipresolver/procfs_resolver.go +++ b/src/sniffer/pkg/ipresolver/procfs_resolver.go @@ -33,24 +33,25 @@ func (r *ProcFSIPResolver) ResolveIP(ipaddr string) (hostname string, err error) if hostInfo, ok := r.byAddr[ipaddr]; ok { return hostInfo.Hostname, nil } - return "", errors.New("IP not found") + return "", errors.New("ip not found") } func (r *ProcFSIPResolver) Refresh() error { return r.monitor.Poll() } -func (r *ProcFSIPResolver) onProcessNew(pid int64, pDir string) { - hostname, err := utils.ExtractProcessHostname(pDir) +func (r *ProcFSIPResolver) onProcessNew(pid int64, pDir string) (err error) { + var hostname, ipaddr string + hostname, err = utils.ExtractProcessHostname(pDir) if err != nil { logrus.Debugf("Failed to extract hostname for process %d: %v", pid, err) - return + return err } - ipaddr, err := utils.ExtractProcessIPAddr(pDir) + ipaddr, err = utils.ExtractProcessIPAddr(pDir) if err != nil { logrus.Debugf("Failed to extract IP address for process %d: %v", pid, err) - return + return err } if entry, ok := r.byAddr[ipaddr]; ok { @@ -58,7 +59,8 @@ func (r *ProcFSIPResolver) onProcessNew(pid int64, pDir string) { // Already mapped to this hostname, add another process reference r.byPid[pid] = entry entry.ProcessRefCount++ - return + logrus.Debugf("Mapping %s:%s already exists, increased refcount to %d", ipaddr, hostname, entry.ProcessRefCount) + return nil } else { // Shouldn't happen - it could happen if an ip replaces its pod very fast and the current single scan sees the new process and not the older one logrus.Warnf("IP mapping conflict: %s got new hostname %s, but already mapped to %s. Would use the newer hostname", ipaddr, hostname, entry.Hostname) @@ -66,6 +68,7 @@ func (r *ProcFSIPResolver) onProcessNew(pid int64, pDir string) { } } + logrus.Debugf("Found new mapping %s:%s", ipaddr, hostname) newEntry := &ProcFSIPResolverEntry{ IPAddr: ipaddr, Hostname: hostname, @@ -73,13 +76,14 @@ func (r *ProcFSIPResolver) onProcessNew(pid int64, pDir string) { } r.byPid[pid] = newEntry r.byAddr[ipaddr] = newEntry + return nil } -func (r *ProcFSIPResolver) onProcessExit(pid int64, _ string) { +func (r *ProcFSIPResolver) onProcessExit(pid int64, _ string) error { if entry, ok := r.byPid[pid]; !ok { // Shouldn't happen logrus.Debugf("Unknown process %d exited", pid) - return + return nil } else { entry.ProcessRefCount-- if entry.ProcessRefCount == 0 { @@ -93,4 +97,5 @@ func (r *ProcFSIPResolver) onProcessExit(pid int64, _ string) { // Remove process from pid map delete(r.byPid, pid) } + return nil } diff --git a/src/sniffer/pkg/ipresolver/procfs_resolver_test.go b/src/sniffer/pkg/ipresolver/procfs_resolver_test.go index 8035cdbc..7419c6f6 100644 --- a/src/sniffer/pkg/ipresolver/procfs_resolver_test.go +++ b/src/sniffer/pkg/ipresolver/procfs_resolver_test.go @@ -130,14 +130,14 @@ func (s *ProcFSIPResolverTestSuite) TestResolverRefCount() { s.Require().Equal("service-2", hostname) s.mockKillProcess(20) - s.mockKillProcess(21) + s.mockKillProcess(22) _ = s.resolver.Refresh() hostname, err = s.resolver.ResolveIP("172.17.0.2") s.Require().NoError(err) s.Require().Equal("service-2", hostname) - s.mockKillProcess(22) + s.mockKillProcess(21) _ = s.resolver.Refresh() hostname, err = s.resolver.ResolveIP("172.17.0.2")