From 0eac5f0f29db07ce77c620e95e530cf3ce5821da Mon Sep 17 00:00:00 2001
From: Yingrong Zhao <22300958+VinozzZ@users.noreply.github.com>
Date: Wed, 2 Oct 2024 16:14:29 -0400
Subject: [PATCH] feat: forward decision span through peer endpoint (#1342)

## Which problem is this PR solving?

implements: #1318 #1326

## Short description of the changes

- Add a new config option `EnableTraceLocality` to allow turning off the trace distribution feature
- Forward decision spans when peer membership changes
---
 app/app_test.go                 | 165 ++++++++++++++++++++++++--------
 collect/collect.go              |  78 ++++++++++++++-
 collect/collect_test.go         | 116 +++++++++++++++++++++-
 config/file_config.go           |   1 +
 config/metadata/configMeta.yaml |  10 ++
 route/route.go                  |  36 +++----
 types/event.go                  |   8 +-
 7 files changed, 351 insertions(+), 63 deletions(-)

diff --git a/app/app_test.go b/app/app_test.go
index 3b3904cc4c..b36f6c15f1 100644
--- a/app/app_test.go
+++ b/app/app_test.go
@@ -36,6 +36,7 @@ import (
 "github.com/honeycombio/refinery/sample"
 "github.com/honeycombio/refinery/sharder"
 "github.com/honeycombio/refinery/transmit"
+ "github.com/honeycombio/refinery/types"
 )

 const legacyAPIKey = "c9945edf5d245834089a1bd6cc9ad01e"

@@ -88,14 +89,8 @@ func (w *countingWriterSender) waitForCount(t testing.TB, target int) {
 }
 }

-func newStartedApp(
- t testing.TB,
- libhoneyT transmission.Sender,
- basePort int,
- peers peer.Peers,
- enableHostMetadata bool,
-) (*App, inject.Graph) {
- c := &config.MockConfig{
+func defaultConfig(basePort int) *config.MockConfig {
+ return &config.MockConfig{
 GetTracesConfigVal: config.TracesConfig{
 SendTicker: config.Duration(2 * time.Millisecond),
 SendDelay: config.Duration(1 * time.Millisecond),
@@ -109,8 +104,7 @@ func newStartedApp(
 GetListenAddrVal: "127.0.0.1:" + strconv.Itoa(basePort),
 GetPeerListenAddrVal: "127.0.0.1:" + strconv.Itoa(basePort+1),
 GetHoneycombAPIVal: "http://api.honeycomb.io",
- GetCollectionConfigVal: config.CollectionConfig{CacheCapacity: 10000, ShutdownDelay: config.Duration(1 * time.Second)},
- AddHostMetadataToTrace: enableHostMetadata,
+ GetCollectionConfigVal: config.CollectionConfig{CacheCapacity: 10000, ShutdownDelay: config.Duration(1 * time.Second), EnableTraceLocality: true},
 TraceIdFieldNames: []string{"trace.trace_id"},
 ParentIdFieldNames: []string{"trace.parent_id"},
 SampleCache: config.SampleCacheConfig{KeptSize: 10000, DroppedSize: 100000, SizeCheckInterval: config.Duration(10 * time.Second)},
@@ -119,7 +113,16 @@ func newStartedApp(
 AcceptOnlyListedKeys: true,
 },
 }
+}

+func newStartedApp(
+ t testing.TB,
+ libhoneyT transmission.Sender,
+ peerTransmission transmission.Sender,
+ peers peer.Peers,
+ cfg *config.MockConfig,
+) (*App, inject.Graph) {
+ c := cfg
 var err error
 if peers == nil {
 peers = &peer.FilePeers{Cfg: c, Metrics: &metrics.NullMetrics{}}
@@ -158,13 +161,13 @@ func newStartedApp(
 })
 assert.NoError(t, err)

- sdPeer, _ := statsd.New(statsd.Prefix("refinery.peer"))
- peerClient, err := libhoney.NewClient(libhoney.ClientConfig{
- Transmission: &transmission.Honeycomb{
- MaxBatchSize: c.GetTracesConfigVal.MaxBatchSize,
+ if peerTransmission == nil {
+ sdPeer, _ := statsd.New(statsd.Prefix("refinery.peer"))
+ peerTransmission = &transmission.Honeycomb{
+ MaxBatchSize: cfg.GetTracesConfigVal.MaxBatchSize,
 BatchTimeout: libhoney.DefaultBatchTimeout,
 MaxConcurrentBatches: libhoney.DefaultMaxConcurrentBatches,
- PendingWorkCapacity: uint(c.GetPeerBufferSize()),
+ PendingWorkCapacity: uint(cfg.GetPeerBufferSize()),
 Transport: &http.Transport{
 Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{ @@ -175,7 +178,10 @@ func newStartedApp( DisableGzipCompression: true, EnableMsgpackEncoding: true, Metrics: sdPeer, - }, + } + } + peerClient, err := libhoney.NewClient(libhoney.ClientConfig{ + Transmission: peerTransmission, }) assert.NoError(t, err) @@ -210,7 +216,7 @@ func newStartedApp( assert.NoError(t, err) // Racy: wait just a moment for ListenAndServe to start up. - time.Sleep(10 * time.Millisecond) + time.Sleep(15 * time.Millisecond) return &a, g } @@ -227,7 +233,8 @@ func TestAppIntegration(t *testing.T) { port := 10500 sender := &transmission.MockSender{} - app, graph := newStartedApp(t, sender, port, nil, false) + cfg := defaultConfig(port) + app, graph := newStartedApp(t, sender, nil, nil, cfg) // Send a root span, it should be sent in short order. req := httptest.NewRequest( @@ -264,7 +271,8 @@ func TestAppIntegrationWithNonLegacyKey(t *testing.T) { port := 10600 sender := &transmission.MockSender{} - a, graph := newStartedApp(t, sender, port, nil, false) + cfg := defaultConfig(port) + a, graph := newStartedApp(t, sender, nil, nil, cfg) a.IncomingRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) { return "test", nil }) a.PeerRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) { return "test", nil }) @@ -304,7 +312,8 @@ func TestAppIntegrationWithUnauthorizedKey(t *testing.T) { port := 10700 sender := &transmission.MockSender{} - a, graph := newStartedApp(t, sender, port, nil, false) + cfg := defaultConfig(port) + a, graph := newStartedApp(t, sender, nil, nil, cfg) a.IncomingRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) { return "test", nil }) a.PeerRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) { return "test", nil }) @@ -345,7 +354,9 @@ func TestPeerRouting(t *testing.T) { Peers: peerList, ID: peerList[i], } - apps[i], graph = newStartedApp(t, senders[i], basePort, peers, false) + cfg := defaultConfig(basePort) + + apps[i], graph = newStartedApp(t, senders[i], nil, peers, cfg) defer startstop.Stop(graph.Objects(), nil) } @@ -402,7 +413,6 @@ func TestPeerRouting(t *testing.T) { }, } assert.Equal(t, expectedEvent, senders[0].Events()[0]) - // Repeat, but deliver to host 1 on the peer channel, it should be // passed to host 0 since that's who the trace belongs to. req, err = http.NewRequest( @@ -416,10 +426,16 @@ func TestPeerRouting(t *testing.T) { req.Body = io.NopCloser(strings.NewReader(blob)) post(t, req) - assert.Eventually(t, func() bool { - return len(senders[0].Events()) == 1 + require.Eventually(t, func() bool { + return len(senders[0].Events()) == 2 }, 2*time.Second, 2*time.Millisecond) - assert.Equal(t, expectedEvent, senders[0].Events()[0]) + expectedEvent.Metadata = map[string]any{ + "api_host": "http://api.honeycomb.io", + "dataset": "dataset", + "environment": "", + "enqueued_at": senders[0].Events()[1].Metadata.(map[string]any)["enqueued_at"], + } + assert.Equal(t, expectedEvent, senders[0].Events()[1]) } func TestHostMetadataSpanAdditions(t *testing.T) { @@ -427,7 +443,9 @@ func TestHostMetadataSpanAdditions(t *testing.T) { port := 14000 sender := &transmission.MockSender{} - app, graph := newStartedApp(t, sender, port, nil, true) + cfg := defaultConfig(port) + cfg.AddHostMetadataToTrace = true + app, graph := newStartedApp(t, sender, nil, nil, cfg) // Send a root span, it should be sent in short order. 
req := httptest.NewRequest( @@ -481,11 +499,12 @@ func TestEventsEndpoint(t *testing.T) { ID: peerList[i], } - apps[i], graph = newStartedApp(t, senders[i], basePort, peers, false) + cfg := defaultConfig(basePort) + apps[i], graph = newStartedApp(t, senders[i], nil, peers, cfg) defer startstop.Stop(graph.Objects(), nil) } - // Deliver to host 1, it should be passed to host 0 and emitted there. + // Deliver to host 1, it should be passed to host 0 zEnc, _ := zstd.NewWriter(nil) blob := zEnc.EncodeAll([]byte(`{"foo":"bar","trace.trace_id":"1"}`), nil) req, err := http.NewRequest( @@ -504,7 +523,6 @@ func TestEventsEndpoint(t *testing.T) { assert.Eventually(t, func() bool { return len(senders[0].Events()) == 1 }, 2*time.Second, 2*time.Millisecond) - assert.Equal( t, &transmission.Event{ @@ -527,7 +545,6 @@ func TestEventsEndpoint(t *testing.T) { }, senders[0].Events()[0], ) - // Repeat, but deliver to host 1 on the peer channel, it should be // passed to host 0 since that's the host this trace belongs to. @@ -553,7 +570,6 @@ func TestEventsEndpoint(t *testing.T) { assert.Eventually(t, func() bool { return len(senders[0].Events()) == 1 }, 2*time.Second, 2*time.Millisecond) - assert.Equal( t, &transmission.Event{ @@ -577,7 +593,6 @@ func TestEventsEndpoint(t *testing.T) { senders[0].Events()[0], ) } - func TestEventsEndpointWithNonLegacyKey(t *testing.T) { t.Parallel() @@ -585,6 +600,9 @@ func TestEventsEndpointWithNonLegacyKey(t *testing.T) { "http://localhost:15001", "http://localhost:15003", } + // this traceID was chosen because it hashes to the appropriate shard for this + // test. You can't change it or the number of peers and still expect the test to pass. + traceID := "4" var apps [2]*App var senders [2]*transmission.MockSender @@ -596,16 +614,15 @@ func TestEventsEndpointWithNonLegacyKey(t *testing.T) { ID: peerList[i], } - app, graph := newStartedApp(t, senders[i], basePort, peers, false) + cfg := defaultConfig(basePort) + + app, graph := newStartedApp(t, senders[i], nil, peers, cfg) app.IncomingRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) { return "test", nil }) app.PeerRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) { return "test", nil }) apps[i] = app defer startstop.Stop(graph.Objects(), nil) } - // this traceID was chosen because it hashes to the appropriate shard for this - // test. You can't change it or the number of peers and still expect the test to pass. - traceID := "4" traceData := []byte(fmt.Sprintf(`{"foo":"bar","trace.trace_id":"%s"}`, traceID)) // Deliver to host 1, it should be passed to host 0 and emitted there. zEnc, _ := zstd.NewWriter(nil) @@ -649,7 +666,6 @@ func TestEventsEndpointWithNonLegacyKey(t *testing.T) { }, senders[0].Events()[0], ) - // Repeat, but deliver to host 1 on the peer channel, it should be // passed to host 0. @@ -700,6 +716,75 @@ func TestEventsEndpointWithNonLegacyKey(t *testing.T) { ) } +func TestPeerRouting_TraceLocalityDisabled(t *testing.T) { + // Parallel integration tests need different ports! 
+ t.Parallel() + + peerList := []string{"http://localhost:17001", "http://localhost:17003"} + + var apps [2]*App + var senders [2]*transmission.MockSender + var peerSenders [2]*transmission.MockSender + for i := range apps { + var graph inject.Graph + basePort := 17000 + (i * 2) + senders[i] = &transmission.MockSender{} + peerSenders[i] = &transmission.MockSender{} + peers := &peer.MockPeers{ + Peers: peerList, + ID: peerList[i], + } + cfg := defaultConfig(basePort) + collectionCfg := cfg.GetCollectionConfigVal + collectionCfg.EnableTraceLocality = false + cfg.GetCollectionConfigVal = collectionCfg + + apps[i], graph = newStartedApp(t, senders[i], peerSenders[i], peers, cfg) + defer startstop.Stop(graph.Objects(), nil) + } + + // Deliver to host 1, it should be passed to host 0 and emitted there. + req, err := http.NewRequest( + "POST", + "http://localhost:17002/1/batch/dataset", + nil, + ) + assert.NoError(t, err) + req.Header.Set("X-Honeycomb-Team", legacyAPIKey) + req.Header.Set("Content-Type", "application/json") + + // this span index was chosen because it hashes to the appropriate shard for this + // test. You can't change it and expect the test to pass. + blob := `[` + string(spans[10]) + `]` + req.Body = io.NopCloser(strings.NewReader(blob)) + post(t, req) + require.Eventually(t, func() bool { + return len(peerSenders[1].Events()) == 1 + }, 2*time.Second, 2*time.Millisecond) + + expectedEvent := &transmission.Event{ + APIKey: legacyAPIKey, + Dataset: "dataset", + SampleRate: 2, + APIHost: "http://localhost:17001", + Timestamp: now, + Data: map[string]interface{}{ + "trace_id": "2", + "meta.refinery.min_span": true, + "meta.annotation_type": types.SpanAnnotationTypeUnknown, + "meta.refinery.root": false, + "meta.refinery.span_data_size": 157, + }, + Metadata: map[string]any{ + "api_host": "http://localhost:17001", + "dataset": "dataset", + "environment": "", + "enqueued_at": peerSenders[1].Events()[0].Metadata.(map[string]any)["enqueued_at"], + }, + } + assert.Equal(t, expectedEvent, peerSenders[1].Events()[0]) +} + var ( now = time.Now().UTC() nowString = now.Format(time.RFC3339Nano) @@ -760,7 +845,8 @@ func BenchmarkTraces(b *testing.B) { W: io.Discard, }, } - _, graph := newStartedApp(b, sender, 11000, nil, false) + cfg := defaultConfig(11000) + _, graph := newStartedApp(b, sender, nil, nil, cfg) req, err := http.NewRequest( "POST", @@ -862,7 +948,8 @@ func BenchmarkDistributedTraces(b *testing.B) { ID: peerList[i], } - apps[i], graph = newStartedApp(b, sender, basePort, peers, false) + cfg := defaultConfig(basePort) + apps[i], graph = newStartedApp(b, sender, nil, peers, cfg) defer startstop.Stop(graph.Objects(), nil) addrs[i] = "localhost:" + strconv.Itoa(basePort) diff --git a/collect/collect.go b/collect/collect.go index ec8373b5a8..34301ba500 100644 --- a/collect/collect.go +++ b/collect/collect.go @@ -424,10 +424,24 @@ func (i *InMemCollector) redistributeTraces() { newTarget := i.Sharder.WhichShard(trace.TraceID) if newTarget.Equals(i.Sharder.MyShard()) { + if !i.Config.GetCollectionConfig().EnableTraceLocality { + // Drop all proxy spans since peers will resend them + trace.RemoveDecisionSpans() + } continue } for _, sp := range trace.GetSpans() { + if sp.IsDecisionSpan() { + continue + } + + if !i.Config.GetCollectionConfig().EnableTraceLocality { + dc := i.createDecisionSpan(sp, trace, newTarget) + i.PeerTransmission.EnqueueEvent(dc) + continue + } + sp.APIHost = newTarget.GetAddress() if sp.Data == nil { @@ -537,6 +551,24 @@ func (i *InMemCollector) processSpan(sp 
*types.Span) { // great! trace is live. add the span. trace.AddSpan(sp) + // Figure out if we should handle this span locally or pass on to a peer + var spanForwarded bool + if !i.Config.GetCollectionConfig().EnableTraceLocality { + // if this trace doesn't belong to us, we should forward a decision span to its decider + targetShard := i.Sharder.WhichShard(trace.ID()) + if !targetShard.Equals(i.Sharder.MyShard()) && !sp.IsDecisionSpan() { + i.Metrics.Increment("incoming_router_peer") + i.Logger.Debug(). + WithString("peer", targetShard.GetAddress()). + Logf("Sending span to peer") + + dc := i.createDecisionSpan(sp, trace, targetShard) + + i.PeerTransmission.EnqueueEvent(dc) + spanForwarded = true + } + } + // we may override these values in conditions below var markTraceForSending bool timeout := tcfg.GetSendDelay() @@ -544,8 +576,8 @@ func (i *InMemCollector) processSpan(sp *types.Span) { timeout = 2 * time.Second // a sensible default } - // if this is a root span, say so and send the trace - if sp.IsRoot { + // if this is a root span and its destination shard is the current refinery, say so and send the trace + if sp.IsRoot && !spanForwarded { markTraceForSending = true trace.RootSpan = sp } @@ -556,7 +588,8 @@ func (i *InMemCollector) processSpan(sp *types.Span) { timeout = 0 // don't use a timeout in this case; this is an "act fast" situation } - if markTraceForSending { + // we should only mark a trace for sending if we are the destination shard + if markTraceForSending && !spanForwarded { trace.SendBy = i.Clock.Now().Add(timeout) } } @@ -635,6 +668,13 @@ func (i *InMemCollector) dealWithSentTrace(ctx context.Context, tr cache.TraceSe }) defer span.End() + // if we receive a proxy span after a trace decision has been made, + // we should just broadcast the decision again + if sp.IsDecisionSpan() { + // TODO: broadcast the decision again + return + } + if i.Config.GetAddRuleReasonToTrace() { var metaReason string if len(keptReason) > 0 { @@ -801,7 +841,11 @@ func (i *InMemCollector) send(trace *types.Trace, sendReason string) { } else { i.Logger.Info().WithFields(logFields).Logf("Sending trace") } + for _, sp := range trace.GetSpans() { + if sp.IsDecisionSpan() { + continue + } if i.Config.GetAddRuleReasonToTrace() { sp.Data["meta.refinery.reason"] = reason sp.Data["meta.refinery.send_reason"] = sendReason @@ -954,7 +998,8 @@ func (i *InMemCollector) sendTracesOnShutdown() { // distributeSpansInCache takes a list of spans and sends them to the appropriate channel based on the state of the trace. 
func (i *InMemCollector) distributeSpansOnShutdown(sentSpanChan chan sentRecord, forwardSpanChan chan *types.Span, spans ...*types.Span) { for _, sp := range spans { - if sp != nil { + // if the span is a decision span, we don't need to do anything with it + if sp != nil && !sp.IsDecisionSpan() { // first check if there's a trace decision record, reason, found := i.sampleTraceCache.CheckSpan(sp) @@ -1052,6 +1097,31 @@ func (i *InMemCollector) addAdditionalAttributes(sp *types.Span) { } } +func (i *InMemCollector) createDecisionSpan(sp *types.Span, trace *types.Trace, targetShard sharder.Shard) *types.Event { + selector, isLegacyKey := trace.GetSamplerKey() + if selector == "" { + i.Logger.Error().WithField("trace_id", trace.ID()).Logf("error getting sampler selection key for trace") + } + + sampler, found := i.datasetSamplers[selector] + if !found { + sampler = i.SamplerFactory.GetSamplerImplementationForKey(selector, isLegacyKey) + i.datasetSamplers[selector] = sampler + } + + dc := sp.ExtractDecisionContext() + // extract all key fields from the span + keyFields := sampler.GetKeyFields() + for _, keyField := range keyFields { + if val, ok := sp.Data[keyField]; ok { + dc.Data[keyField] = val + } + } + + dc.APIHost = targetShard.GetAddress() + return dc +} + func newRedistributeNotifier(logger logger.Logger, met metrics.Metrics, clock clockwork.Clock) *redistributeNotifier { r := &redistributeNotifier{ initialDelay: 3 * time.Second, diff --git a/collect/collect_test.go b/collect/collect_test.go index fe78fb602d..61d93f1abd 100644 --- a/collect/collect_test.go +++ b/collect/collect_test.go @@ -155,6 +155,30 @@ func TestAddRootSpan(t *testing.T) { assert.Equal(t, 2, len(transmission.Events), "adding another root span should send the span") assert.Equal(t, "aoeu", transmission.Events[1].Dataset, "sending a root span should immediately send that span via transmission") transmission.Mux.RUnlock() + + decisionSpanTraceID := "decision_root_span" + span = &types.Span{ + TraceID: decisionSpanTraceID, + Event: types.Event{ + Dataset: "aoeu", + APIKey: legacyAPIKey, + Data: map[string]interface{}{ + "meta.refinery.min_span": true, + }, + }, + IsRoot: true, + } + + coll.AddSpanFromPeer(span) + time.Sleep(conf.GetTracesConfig().GetSendTickerValue() * 2) + // adding one root decision span with no parent ID should: + // * create the trace in the cache + // * send the trace + // * remove the trace from the cache + assert.Nil(t, coll.getFromCache(decisionSpanTraceID), "after sending the span, it should be removed from the cache") + transmission.Mux.RLock() + assert.Equal(t, 2, len(transmission.Events), "adding a root decision span should send the trace but not the decision span itself") + transmission.Mux.RUnlock() } // #490, SampleRate getting stomped could cause confusion if sampling was @@ -1100,7 +1124,19 @@ func TestAddSpanCount(t *testing.T) { APIKey: legacyAPIKey, }, } + decisionSpan := &types.Span{ + TraceID: traceID, + Event: types.Event{ + Dataset: "aoeu", + Data: map[string]interface{}{ + "trace.parent_id": "unused", + "meta.refinery.min_span": true, + }, + APIKey: legacyAPIKey, + }, + } coll.AddSpanFromPeer(span) + coll.AddSpanFromPeer(decisionSpan) time.Sleep(conf.GetTracesConfig().GetSendTickerValue() * 2) assert.Equal(t, traceID, coll.getFromCache(traceID).TraceID, "after adding the span, we should have a trace in the cache with the right trace ID") @@ -1122,7 +1158,7 @@ func TestAddSpanCount(t *testing.T) { transmission.Mux.RLock() assert.Equal(t, 2, len(transmission.Events), "adding a root 
span should send all spans in the trace") assert.Equal(t, nil, transmission.Events[0].Data["meta.span_count"], "child span metadata should NOT be populated with span count") - assert.Equal(t, int64(2), transmission.Events[1].Data["meta.span_count"], "root span metadata should be populated with span count") + assert.Equal(t, int64(3), transmission.Events[1].Data["meta.span_count"], "root span metadata should be populated with span count") transmission.Mux.RUnlock() } @@ -1916,3 +1952,81 @@ func TestBigTracesGoEarly(t *testing.T) { assert.Equal(t, "trace_send_late_span", transmission.Events[spanlimit].Data["meta.refinery.send_reason"], "send reason should indicate span count exceeded") transmission.Mux.RUnlock() } + +func TestCreateDecisionSpan(t *testing.T) { + conf := &config.MockConfig{ + GetTracesConfigVal: config.TracesConfig{ + SendTicker: config.Duration(2 * time.Millisecond), + SendDelay: config.Duration(1 * time.Millisecond), + TraceTimeout: config.Duration(5 * time.Millisecond), + MaxBatchSize: 500, + }, + } + + transmission := &transmit.MockTransmission{} + transmission.Start() + peerTransmission := &transmit.MockTransmission{} + peerTransmission.Start() + coll := newTestCollector(conf, transmission, peerTransmission) + + mockSampler := &sample.DynamicSampler{ + Config: &config.DynamicSamplerConfig{ + SampleRate: 1, + FieldList: []string{"http.status_code", "test"}, + }, Logger: coll.Logger, Metrics: coll.Metrics, + } + mockSampler.Start() + + coll.datasetSamplers = map[string]sample.Sampler{ + "aoeu": mockSampler, + } + + traceID1 := "trace1" + peerShard := &sharder.TestShard{Addr: "peer-address"} + + nonrootSpan := &types.Span{ + TraceID: traceID1, + Event: types.Event{ + Dataset: "aoeu", + Data: map[string]interface{}{ + "trace.parent_id": "unused", + "http.status_code": 200, + "test": 1, + "should-not-be-included": 123, + }, + APIKey: legacyAPIKey, + }, + } + + trace := &types.Trace{ + TraceID: traceID1, + Dataset: "aoeu", + APIKey: legacyAPIKey, + } + ds := coll.createDecisionSpan(nonrootSpan, trace, peerShard) + + expected := &types.Event{ + Dataset: "aoeu", + APIHost: peerShard.Addr, + APIKey: legacyAPIKey, + Data: map[string]interface{}{ + "meta.annotation_type": types.SpanAnnotationTypeUnknown, + "meta.refinery.min_span": true, + "meta.refinery.root": false, + "meta.refinery.span_data_size": 30, + "trace_id": traceID1, + + "http.status_code": 200, + "test": 1, + }, + } + + assert.EqualValues(t, expected, ds) + + rootSpan := nonrootSpan + rootSpan.IsRoot = true + + ds = coll.createDecisionSpan(rootSpan, trace, peerShard) + expected.Data["meta.refinery.root"] = true + assert.EqualValues(t, expected, ds) +} diff --git a/config/file_config.go b/config/file_config.go index 9f5dd7ea8b..0b5e09bb3b 100644 --- a/config/file_config.go +++ b/config/file_config.go @@ -306,6 +306,7 @@ type CollectionConfig struct { MaxAlloc MemorySize `yaml:"MaxAlloc"` DisableRedistribution bool `yaml:"DisableRedistribution"` ShutdownDelay Duration `yaml:"ShutdownDelay" default:"15s"` + EnableTraceLocality bool `yaml:"EnableTraceLocality"` } // GetMaxAlloc returns the maximum amount of memory to use for the cache. diff --git a/config/metadata/configMeta.yaml b/config/metadata/configMeta.yaml index bf9220720c..69f3767182 100644 --- a/config/metadata/configMeta.yaml +++ b/config/metadata/configMeta.yaml @@ -1286,6 +1286,16 @@ groups: This value should be set to a bit less than the normal timeout period for shutting down without forcibly terminating the process. 
+  - name: EnableTraceLocality
+    type: bool
+    valuetype: nondefault
+    firstversion: v2.9
+    default: false
+    reload: true
+    summary: controls whether all spans that belong to the same trace are sent to a single Refinery for processing.
+    description: >
+      If `true`, Refinery will route all spans that belong to the same trace
+      to a single peer.
+
 - name: BufferSizes
   title: "Buffer Sizes"
   description: >
diff --git a/route/route.go b/route/route.go
index ccafc5f7bf..854c1b77bf 100644
--- a/route/route.go
+++ b/route/route.go
@@ -600,23 +600,24 @@ func (r *Router) processEvent(ev *types.Event, reqID interface{}) error {
 }
 }

- // TODO: only do this if the span proxy feature is disabled
- // Figure out if we should handle this span locally or pass on to a peer
- targetShard := r.Sharder.WhichShard(traceID)
- if !targetShard.Equals(r.Sharder.MyShard()) {
- r.Metrics.Increment(r.incomingOrPeer + "_router_peer")
- debugLog.
- WithString("peer", targetShard.GetAddress()).
- WithField("isprobe", isProbe).
- Logf("Sending span from batch to peer")
-
- ev.APIHost = targetShard.GetAddress()
-
- // Unfortunately this doesn't tell us if the event was actually
- // enqueued; we need to watch the response channel to find out, at
- // which point it's too late to tell the client.
- r.PeerTransmission.EnqueueEvent(ev)
- return nil
+ if r.Config.GetCollectionConfig().EnableTraceLocality {
+ // Figure out if we should handle this span locally or pass on to a peer
+ targetShard := r.Sharder.WhichShard(traceID)
+ if !targetShard.Equals(r.Sharder.MyShard()) {
+ r.Metrics.Increment(r.incomingOrPeer + "_router_peer")
+ debugLog.
+ WithString("peer", targetShard.GetAddress()).
+ WithField("isprobe", isProbe).
+ Logf("Sending span from batch to peer")
+
+ ev.APIHost = targetShard.GetAddress()
+
+ // Unfortunately this doesn't tell us if the event was actually
+ // enqueued; we need to watch the response channel to find out, at
+ // which point it's too late to tell the client.
+ r.PeerTransmission.EnqueueEvent(ev)
+ return nil
+ }
 }

 if isProbe {
@@ -630,7 +631,6 @@ func (r *Router) processEvent(ev *types.Event, reqID interface{}) error {
 if r.incomingOrPeer == "incoming" {
 err = r.Collector.AddSpan(span)
 } else {
- // TODO: again, only do this if span proxy is disabled
 err = r.Collector.AddSpanFromPeer(span)
 }
 if err != nil {
diff --git a/types/event.go b/types/event.go
index f80849e69d..dd891e4b14 100644
--- a/types/event.go
+++ b/types/event.go
@@ -241,8 +241,14 @@ func (sp *Span) GetDataSize() int {
 if sp.IsDecisionSpan() {
 if v, ok := sp.Data["meta.refinery.span_data_size"]; ok {
- return v.(int)
+ switch value := v.(type) {
+ case int64:
+ return int(value)
+ case uint64:
+ return int(value)
+ }
 }
+ return 0
 }
 // the data types we should be getting from JSON are:
 // float64, int64, bool, string, []byte
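
For reference, the shape of the decision span this patch forwards can be sketched outside the codebase. The stand-in `Span`/`Event` structs and the `buildDecisionSpan` helper below are illustrative assumptions modeled on `createDecisionSpan` and the `TestCreateDecisionSpan` fixtures above, not the shipped `types` package: only the sampler's key fields plus a small `meta.refinery.*` envelope survive, and the resulting event is pointed at the shard that owns the trace decision.

```go
package main

import "fmt"

// Minimal stand-ins for the refinery types; the real types.Span and
// types.Event carry much more state (timestamps, API keys, etc.).
type Event struct {
	APIHost string
	Dataset string
	Data    map[string]interface{}
}

type Span struct {
	TraceID string
	Dataset string
	IsRoot  bool
	Data    map[string]interface{}
}

// buildDecisionSpan sketches the forwarding step: keep only the
// sampler's key fields plus the small meta.refinery.* envelope, and
// address the event to the peer that owns the trace. Field names
// follow the test fixtures above; the helper itself is hypothetical.
func buildDecisionSpan(sp *Span, keyFields []string, targetAddr string) *Event {
	data := map[string]interface{}{
		"trace_id":               sp.TraceID,
		"meta.refinery.min_span": true,
		"meta.refinery.root":     sp.IsRoot,
	}
	// Copy only the fields the sampler needs to make a decision.
	for _, f := range keyFields {
		if v, ok := sp.Data[f]; ok {
			data[f] = v
		}
	}
	return &Event{APIHost: targetAddr, Dataset: sp.Dataset, Data: data}
}

func main() {
	sp := &Span{
		TraceID: "trace1",
		Dataset: "aoeu",
		Data: map[string]interface{}{
			"http.status_code":       200,
			"should-not-be-included": 123,
		},
	}
	ev := buildDecisionSpan(sp, []string{"http.status_code"}, "http://peer:8081")
	fmt.Printf("%+v\n", ev) // only trace_id, the meta fields, and http.status_code survive
}
```

Trimming the forwarded payload to the key fields is what lets the owning peer reach the same sampling decision without shipping the full span data across the peer network.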