From c75c08f6f364620238b67cb2bfd231b3bde57c79 Mon Sep 17 00:00:00 2001 From: Yaron Schneider Date: Wed, 24 Apr 2024 16:10:46 -0700 Subject: [PATCH] Add data extension fields to outbox (#7705) * add data extension fields to outbox Signed-off-by: yaron2 * fix typo Signed-off-by: yaron2 * add pubsub field to finalized cloudevent Signed-off-by: yaron2 * add non-overridable fields and tests Signed-off-by: yaron2 * add validation of correct cloud events to intg test Signed-off-by: yaron2 --------- Signed-off-by: yaron2 --- pkg/runtime/pubsub/outbox.go | 48 +++++------- pkg/runtime/pubsub/outbox_test.go | 74 ++++++++++++++++++- .../suite/daprd/outbox/http/basic.go | 7 +- 3 files changed, 92 insertions(+), 37 deletions(-) diff --git a/pkg/runtime/pubsub/outbox.go b/pkg/runtime/pubsub/outbox.go index 1229d159808..3ae5e4b2ca6 100644 --- a/pkg/runtime/pubsub/outbox.go +++ b/pkg/runtime/pubsub/outbox.go @@ -163,25 +163,23 @@ func (o *outboxImpl) PublishInternal(ctx context.Context, stateStore string, ope ceData = []byte(fmt.Sprintf("%v", sr.Value)) } - ce := &CloudEvent{ - ID: tr.GetKey(), - Source: source, - Pubsub: c.outboxPubsub, - Data: ceData, - TraceID: traceID, - TraceState: traceState, - } - + var dataContentType string if sr.ContentType != nil { - ce.DataContentType = *sr.ContentType + dataContentType = *sr.ContentType } - msg, err := NewCloudEvent(ce, nil) - if err != nil { - return nil, err + ce := contribPubsub.NewCloudEventsEnvelope(tr.GetKey(), source, "", "", "", c.outboxPubsub, dataContentType, ceData, "", traceState) + ce[contribPubsub.TraceIDField] = traceID + + for k, v := range op.GetMetadata() { + if k == contribPubsub.DataField || k == contribPubsub.IDField { + continue + } + + ce[k] = v } - data, err := json.Marshal(msg) + data, err := json.Marshal(ce) if err != nil { return nil, err } @@ -228,10 +226,6 @@ func (o *outboxImpl) SubscribeToInternalTopics(ctx context.Context, appID string } stateKey := o.cloudEventExtractorFn(cloudEvent, contribPubsub.IDField) - data := []byte(o.cloudEventExtractorFn(cloudEvent, contribPubsub.DataField)) - contentType := o.cloudEventExtractorFn(cloudEvent, contribPubsub.DataContentTypeField) - traceID := o.cloudEventExtractorFn(cloudEvent, contribPubsub.TraceIDField) - traceState := o.cloudEventExtractorFn(cloudEvent, contribPubsub.TraceStateField) store, ok := o.getStateFn(stateStore) if !ok { @@ -274,24 +268,16 @@ func (o *outboxImpl) SubscribeToInternalTopics(ctx context.Context, appID string return err } - ce, err := NewCloudEvent(&CloudEvent{ - Data: data, - DataContentType: contentType, - Pubsub: c.publishPubSub, - Source: appID, - Topic: c.publishTopic, - TraceID: traceID, - TraceState: traceState, - }, nil) - if err != nil { - return err - } + cloudEvent[contribPubsub.TopicField] = c.publishTopic + cloudEvent[contribPubsub.PubsubField] = c.publishPubSub - b, err := json.Marshal(ce) + b, err := json.Marshal(cloudEvent) if err != nil { return err } + contentType := cloudEvent[contribPubsub.DataContentTypeField].(string) + err = o.publishFn(ctx, &contribPubsub.PublishRequest{ PubsubName: c.publishPubSub, Data: b, diff --git a/pkg/runtime/pubsub/outbox_test.go b/pkg/runtime/pubsub/outbox_test.go index 4dc5b587791..4d4da01a8b6 100644 --- a/pkg/runtime/pubsub/outbox_test.go +++ b/pkg/runtime/pubsub/outbox_test.go @@ -259,6 +259,60 @@ func TestPublishInternal(t *testing.T) { require.NoError(t, err) }) + t.Run("valid operation, correct overridden parameters", func(t *testing.T) { + o := newTestOutbox().(*outboxImpl) + o.publishFn = func(ctx context.Context, pr *contribPubsub.PublishRequest) error { + var cloudEvent map[string]interface{} + err := json.Unmarshal(pr.Data, &cloudEvent) + require.NoError(t, err) + + assert.Equal(t, "test", cloudEvent["data"]) + assert.Equal(t, "a", pr.PubsubName) + assert.Equal(t, "testapp1outbox", pr.Topic) + assert.Equal(t, "testsource", cloudEvent["source"]) + assert.Equal(t, "text/plain", cloudEvent["datacontenttype"]) + assert.Equal(t, "a", cloudEvent["pubsubname"]) + + return nil + } + + o.AddOrUpdateOutbox(v1alpha1.Component{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Spec: v1alpha1.ComponentSpec{ + Metadata: []common.NameValuePair{ + { + Name: outboxPublishPubsubKey, + Value: common.DynamicValue{ + JSON: v1.JSON{ + Raw: []byte("a"), + }, + }, + }, + { + Name: outboxPublishTopicKey, + Value: common.DynamicValue{ + JSON: v1.JSON{ + Raw: []byte("1"), + }, + }, + }, + }, + }, + }) + + _, err := o.PublishInternal(context.Background(), "test", []state.TransactionalStateOperation{ + state.SetRequest{ + Key: "key", + Value: "test", + Metadata: map[string]string{"source": "testsource"}, + }, + }, "testapp", "", "") + + require.NoError(t, err) + }) + t.Run("valid operation, no datacontenttype", func(t *testing.T) { o := newTestOutbox().(*outboxImpl) o.publishFn = func(ctx context.Context, pr *contribPubsub.PublishRequest) error { @@ -479,7 +533,7 @@ func TestPublishInternal(t *testing.T) { } func TestSubscribeToInternalTopics(t *testing.T) { - t.Run("correct configuration with trace", func(t *testing.T) { + t.Run("correct configuration with trace, custom field and nonoverridable fields", func(t *testing.T) { o := newTestOutbox().(*outboxImpl) o.cloudEventExtractorFn = extractCloudEventProperty @@ -496,11 +550,16 @@ func TestSubscribeToInternalTopics(t *testing.T) { internalCalledCh := make(chan struct{}) externalCalledCh := make(chan struct{}) + var closed bool + o.publishFn = func(ctx context.Context, pr *contribPubsub.PublishRequest) error { if pr.Topic == outboxTopic { close(internalCalledCh) } else if pr.Topic == "1" { - close(externalCalledCh) + if !closed { + close(externalCalledCh) + closed = true + } } ce := map[string]string{} @@ -508,8 +567,14 @@ func TestSubscribeToInternalTopics(t *testing.T) { traceID := ce[contribPubsub.TraceIDField] traceState := ce[contribPubsub.TraceStateField] + customField := ce["outbox.cloudevent.customfield"] + data := ce[contribPubsub.DataField] + id := ce[contribPubsub.IDField] assert.Equal(t, "00-ecdf5aaa79bff09b62b201442c0f3061-d2597ed7bfd029e4-01", traceID) assert.Equal(t, "00-ecdf5aaa79bff09b62b201442c0f3061-d2597ed7bfd029e4-01", traceState) + assert.Equal(t, "a", customField) + assert.Equal(t, "hello", data) + assert.Contains(t, id, "outbox-") return psMock.Publish(ctx, pr) } @@ -556,8 +621,9 @@ func TestSubscribeToInternalTopics(t *testing.T) { go func() { trs, pErr := o.PublishInternal(context.Background(), "test", []state.TransactionalStateOperation{ state.SetRequest{ - Key: "1", - Value: "hello", + Key: "1", + Value: "hello", + Metadata: map[string]string{"outbox.cloudevent.customfield": "a", "data": "a", "id": "b"}, }, }, appID, "00-ecdf5aaa79bff09b62b201442c0f3061-d2597ed7bfd029e4-01", "00-ecdf5aaa79bff09b62b201442c0f3061-d2597ed7bfd029e4-01") diff --git a/tests/integration/suite/daprd/outbox/http/basic.go b/tests/integration/suite/daprd/outbox/http/basic.go index c618c900671..604e1d2eb52 100644 --- a/tests/integration/suite/daprd/outbox/http/basic.go +++ b/tests/integration/suite/daprd/outbox/http/basic.go @@ -130,8 +130,9 @@ func (o *basic) Run(t *testing.T, ctx context.Context) { postURL := fmt.Sprintf("http://localhost:%d/v1.0/state/mystore/transaction", o.daprd.HTTPPort()) stateReq := state.SetRequest{ - Key: "1", - Value: "2", + Key: "1", + Value: "2", + Metadata: map[string]string{"outbox.cloudevent.myapp": "myapp1", "data": "a", "id": "b"}, } tr := stateTransactionRequestBody{ @@ -174,5 +175,7 @@ func (o *basic) Run(t *testing.T, ctx context.Context) { //nolint:testifylint assert.NoError(c, err) assert.Equal(c, "2", ce["data"]) + assert.Equal(c, "myapp1", ce["outbox.cloudevent.myapp"]) + assert.Contains(c, ce["id"], "outbox-") }, time.Second*10, time.Millisecond*10) }