Skip to content

Commit

Permalink
Add data extension fields to outbox (dapr#7705)
Browse files Browse the repository at this point in the history
* add data extension fields to outbox

Signed-off-by: yaron2 <[email protected]>

* fix typo

Signed-off-by: yaron2 <[email protected]>

* add pubsub field to finalized cloudevent

Signed-off-by: yaron2 <[email protected]>

* add non-overridable fields and tests

Signed-off-by: yaron2 <[email protected]>

* add validation of correct cloud events to intg test

Signed-off-by: yaron2 <[email protected]>

---------

Signed-off-by: yaron2 <[email protected]>
  • Loading branch information
yaron2 authored Apr 24, 2024
1 parent d26894e commit c75c08f
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 37 deletions.
48 changes: 17 additions & 31 deletions pkg/runtime/pubsub/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,25 +163,23 @@ func (o *outboxImpl) PublishInternal(ctx context.Context, stateStore string, ope
ceData = []byte(fmt.Sprintf("%v", sr.Value))
}

ce := &CloudEvent{
ID: tr.GetKey(),
Source: source,
Pubsub: c.outboxPubsub,
Data: ceData,
TraceID: traceID,
TraceState: traceState,
}

var dataContentType string
if sr.ContentType != nil {
ce.DataContentType = *sr.ContentType
dataContentType = *sr.ContentType
}

msg, err := NewCloudEvent(ce, nil)
if err != nil {
return nil, err
ce := contribPubsub.NewCloudEventsEnvelope(tr.GetKey(), source, "", "", "", c.outboxPubsub, dataContentType, ceData, "", traceState)
ce[contribPubsub.TraceIDField] = traceID

for k, v := range op.GetMetadata() {
if k == contribPubsub.DataField || k == contribPubsub.IDField {
continue
}

ce[k] = v
}

data, err := json.Marshal(msg)
data, err := json.Marshal(ce)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -228,10 +226,6 @@ func (o *outboxImpl) SubscribeToInternalTopics(ctx context.Context, appID string
}

stateKey := o.cloudEventExtractorFn(cloudEvent, contribPubsub.IDField)
data := []byte(o.cloudEventExtractorFn(cloudEvent, contribPubsub.DataField))
contentType := o.cloudEventExtractorFn(cloudEvent, contribPubsub.DataContentTypeField)
traceID := o.cloudEventExtractorFn(cloudEvent, contribPubsub.TraceIDField)
traceState := o.cloudEventExtractorFn(cloudEvent, contribPubsub.TraceStateField)

store, ok := o.getStateFn(stateStore)
if !ok {
Expand Down Expand Up @@ -274,24 +268,16 @@ func (o *outboxImpl) SubscribeToInternalTopics(ctx context.Context, appID string
return err
}

ce, err := NewCloudEvent(&CloudEvent{
Data: data,
DataContentType: contentType,
Pubsub: c.publishPubSub,
Source: appID,
Topic: c.publishTopic,
TraceID: traceID,
TraceState: traceState,
}, nil)
if err != nil {
return err
}
cloudEvent[contribPubsub.TopicField] = c.publishTopic
cloudEvent[contribPubsub.PubsubField] = c.publishPubSub

b, err := json.Marshal(ce)
b, err := json.Marshal(cloudEvent)
if err != nil {
return err
}

contentType := cloudEvent[contribPubsub.DataContentTypeField].(string)

err = o.publishFn(ctx, &contribPubsub.PublishRequest{
PubsubName: c.publishPubSub,
Data: b,
Expand Down
74 changes: 70 additions & 4 deletions pkg/runtime/pubsub/outbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,60 @@ func TestPublishInternal(t *testing.T) {
require.NoError(t, err)
})

t.Run("valid operation, correct overridden parameters", func(t *testing.T) {
o := newTestOutbox().(*outboxImpl)
o.publishFn = func(ctx context.Context, pr *contribPubsub.PublishRequest) error {
var cloudEvent map[string]interface{}
err := json.Unmarshal(pr.Data, &cloudEvent)
require.NoError(t, err)

assert.Equal(t, "test", cloudEvent["data"])
assert.Equal(t, "a", pr.PubsubName)
assert.Equal(t, "testapp1outbox", pr.Topic)
assert.Equal(t, "testsource", cloudEvent["source"])
assert.Equal(t, "text/plain", cloudEvent["datacontenttype"])
assert.Equal(t, "a", cloudEvent["pubsubname"])

return nil
}

o.AddOrUpdateOutbox(v1alpha1.Component{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: v1alpha1.ComponentSpec{
Metadata: []common.NameValuePair{
{
Name: outboxPublishPubsubKey,
Value: common.DynamicValue{
JSON: v1.JSON{
Raw: []byte("a"),
},
},
},
{
Name: outboxPublishTopicKey,
Value: common.DynamicValue{
JSON: v1.JSON{
Raw: []byte("1"),
},
},
},
},
},
})

_, err := o.PublishInternal(context.Background(), "test", []state.TransactionalStateOperation{
state.SetRequest{
Key: "key",
Value: "test",
Metadata: map[string]string{"source": "testsource"},
},
}, "testapp", "", "")

require.NoError(t, err)
})

t.Run("valid operation, no datacontenttype", func(t *testing.T) {
o := newTestOutbox().(*outboxImpl)
o.publishFn = func(ctx context.Context, pr *contribPubsub.PublishRequest) error {
Expand Down Expand Up @@ -479,7 +533,7 @@ func TestPublishInternal(t *testing.T) {
}

func TestSubscribeToInternalTopics(t *testing.T) {
t.Run("correct configuration with trace", func(t *testing.T) {
t.Run("correct configuration with trace, custom field and nonoverridable fields", func(t *testing.T) {
o := newTestOutbox().(*outboxImpl)
o.cloudEventExtractorFn = extractCloudEventProperty

Expand All @@ -496,20 +550,31 @@ func TestSubscribeToInternalTopics(t *testing.T) {
internalCalledCh := make(chan struct{})
externalCalledCh := make(chan struct{})

var closed bool

o.publishFn = func(ctx context.Context, pr *contribPubsub.PublishRequest) error {
if pr.Topic == outboxTopic {
close(internalCalledCh)
} else if pr.Topic == "1" {
close(externalCalledCh)
if !closed {
close(externalCalledCh)
closed = true
}
}

ce := map[string]string{}
json.Unmarshal(pr.Data, &ce)

traceID := ce[contribPubsub.TraceIDField]
traceState := ce[contribPubsub.TraceStateField]
customField := ce["outbox.cloudevent.customfield"]
data := ce[contribPubsub.DataField]
id := ce[contribPubsub.IDField]
assert.Equal(t, "00-ecdf5aaa79bff09b62b201442c0f3061-d2597ed7bfd029e4-01", traceID)
assert.Equal(t, "00-ecdf5aaa79bff09b62b201442c0f3061-d2597ed7bfd029e4-01", traceState)
assert.Equal(t, "a", customField)
assert.Equal(t, "hello", data)
assert.Contains(t, id, "outbox-")

return psMock.Publish(ctx, pr)
}
Expand Down Expand Up @@ -556,8 +621,9 @@ func TestSubscribeToInternalTopics(t *testing.T) {
go func() {
trs, pErr := o.PublishInternal(context.Background(), "test", []state.TransactionalStateOperation{
state.SetRequest{
Key: "1",
Value: "hello",
Key: "1",
Value: "hello",
Metadata: map[string]string{"outbox.cloudevent.customfield": "a", "data": "a", "id": "b"},
},
}, appID, "00-ecdf5aaa79bff09b62b201442c0f3061-d2597ed7bfd029e4-01", "00-ecdf5aaa79bff09b62b201442c0f3061-d2597ed7bfd029e4-01")

Expand Down
7 changes: 5 additions & 2 deletions tests/integration/suite/daprd/outbox/http/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,9 @@ func (o *basic) Run(t *testing.T, ctx context.Context) {

postURL := fmt.Sprintf("http://localhost:%d/v1.0/state/mystore/transaction", o.daprd.HTTPPort())
stateReq := state.SetRequest{
Key: "1",
Value: "2",
Key: "1",
Value: "2",
Metadata: map[string]string{"outbox.cloudevent.myapp": "myapp1", "data": "a", "id": "b"},
}

tr := stateTransactionRequestBody{
Expand Down Expand Up @@ -174,5 +175,7 @@ func (o *basic) Run(t *testing.T, ctx context.Context) {
//nolint:testifylint
assert.NoError(c, err)
assert.Equal(c, "2", ce["data"])
assert.Equal(c, "myapp1", ce["outbox.cloudevent.myapp"])
assert.Contains(c, ce["id"], "outbox-")
}, time.Second*10, time.Millisecond*10)
}

0 comments on commit c75c08f

Please sign in to comment.