Skip to content

Commit

Permalink
fix: only ignore messages that are coming from the node itself (#1438)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?

Previously, the logic to ignore messages were wrong. We should only
ignore messages that produced by the same sender as the current node

## Short description of the changes

- fix the sender id checking code
- add more tests
  • Loading branch information
VinozzZ authored Nov 18, 2024
1 parent a0e461e commit fa1520c
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 8 deletions.
7 changes: 3 additions & 4 deletions collect/collect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2256,9 +2256,8 @@ func TestSendDropDecisions(t *testing.T) {
close(coll.dropDecisionBuffer)
droppedMessage := <-messages

peerID, err := coll.Peers.GetInstanceID()
require.NoError(t, err)
decompressedData, err := newDroppedTraceDecision(droppedMessage, peerID)
// pretend we are a peer that received the message
decompressedData, err := newDroppedTraceDecision(droppedMessage, "another-peer")
assert.NoError(t, err)
droppedTraceID := make([]string, 0)
for _, td := range decompressedData {
Expand Down Expand Up @@ -2289,7 +2288,7 @@ func TestSendDropDecisions(t *testing.T) {
close(coll.dropDecisionBuffer)
droppedMessage = <-messages

decompressedData, err = newDroppedTraceDecision(droppedMessage, peerID)
decompressedData, err = newDroppedTraceDecision(droppedMessage, "another-peer")
assert.NoError(t, err)
droppedTraceID = make([]string, 0)
for _, td := range decompressedData {
Expand Down
6 changes: 4 additions & 2 deletions collect/trace_decision.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ func newDroppedTraceDecision(msg string, senderID string) ([]TraceDecision, erro
return nil, fmt.Errorf("invalid dropped decision message")
}

if msg[:separatorIdx] != senderID {
// If the sender ID is the same as the current service, ignore the message
if msg[:separatorIdx] == senderID {
return nil, nil
}

Expand Down Expand Up @@ -108,7 +109,8 @@ func newKeptTraceDecision(msg string, senderID string) ([]TraceDecision, error)
return nil, fmt.Errorf("invalid dropped decision message")
}

if msg[:separatorIdx] != senderID {
// If the sender ID is the same as the current service, ignore the message
if msg[:separatorIdx] == senderID {
return nil, nil
}

Expand Down
24 changes: 22 additions & 2 deletions collect/trace_decision_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,23 @@ func TestDropDecisionRoundTrip(t *testing.T) {
assert.NotEmpty(t, msg, "expected non-empty message")

// Step 2: Decompress the message back to TraceDecision using newDroppedTraceDecision
decompressedTds, err := newDroppedTraceDecision(msg, "sender1")
decompressedTds, err := newDroppedTraceDecision(msg, "sender2")
assert.NoError(t, err, "expected no error during decompression of the dropped decision message")
assert.Len(t, decompressedTds, len(tds), "expected decompressed TraceDecision length to match original")

// Step 3: Verify that the decompressed data matches the original TraceDecision data
for i, td := range decompressedTds {
assert.Equal(t, td.TraceID, tds[i].TraceID, "expected TraceID to match")
}

// Make sure we only ignore messages that are produced from the same node
msg, err = newDroppedDecisionMessage(tds, "sender1")
assert.NoError(t, err, "expected no error for valid dropped decision message")
assert.NotEmpty(t, msg, "expected non-empty message")

decompressedTds, err = newDroppedTraceDecision(msg, "sender1")
assert.NoError(t, err, "expected no error during decompression of the dropped decision message")
assert.Empty(t, decompressedTds)
}

func TestKeptDecisionRoundTrip(t *testing.T) {
Expand Down Expand Up @@ -73,7 +82,7 @@ func TestKeptDecisionRoundTrip(t *testing.T) {
assert.NotEmpty(t, msg, "expected non-empty message")

// Step 2: Decompress the message back to TraceDecision using newKeptTraceDecision
decompressedTds, err := newKeptTraceDecision(msg, "sender1")
decompressedTds, err := newKeptTraceDecision(msg, "sender2")
assert.NoError(t, err, "expected no error during decompression of the kept decision message")
assert.Len(t, decompressedTds, len(tds), "expected decompressed TraceDecision length to match original")

Expand All @@ -91,6 +100,17 @@ func TestKeptDecisionRoundTrip(t *testing.T) {
assert.Equal(t, td.EventCount, tds[i].EventCount, "expected EventCount to match")
assert.Equal(t, td.LinkCount, tds[i].LinkCount, "expected LinkCount to match")
}

// Make sure we only ignore messages that are produced from the same node
msg, err = newKeptDecisionMessage(tds, "sender1")
assert.NoError(t, err, "expected no error for valid kept decision message")
assert.NotEmpty(t, msg, "expected non-empty message")

// Step 2: Decompress the message back to TraceDecision using newKeptTraceDecision
decompressedTds, err = newKeptTraceDecision(msg, "sender1")
assert.NoError(t, err, "expected no error during decompression of the kept decision message")
assert.Empty(t, decompressedTds)

}

// used in test only
Expand Down

0 comments on commit fa1520c

Please sign in to comment.