Skip to content

Commit

Permalink
Allow outbox payload transformation (dapr#7718)
Browse files Browse the repository at this point in the history
* first commit

Signed-off-by: yaron2 <[email protected]>

* add tests

Signed-off-by: yaron2 <[email protected]>

* linter

Signed-off-by: yaron2 <[email protected]>

---------

Signed-off-by: yaron2 <[email protected]>
  • Loading branch information
yaron2 authored May 7, 2024
1 parent b13d86e commit a89c41b
Show file tree
Hide file tree
Showing 6 changed files with 395 additions and 14 deletions.
4 changes: 2 additions & 2 deletions pkg/api/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -961,14 +961,14 @@ func (a *api) ExecuteStateTransaction(ctx context.Context, in *runtimev1pb.Execu
if outboxEnabled {
span := diagUtils.SpanFromContext(ctx)
corID, traceState := diag.TraceIDAndStateFromSpan(span)
trs, err := a.pubsubAdapter.Outbox().PublishInternal(ctx, in.GetStoreName(), operations, a.Universal.AppID(), corID, traceState)
ops, err := a.pubsubAdapter.Outbox().PublishInternal(ctx, in.GetStoreName(), operations, a.Universal.AppID(), corID, traceState)
if err != nil {
nerr := apierrors.PubSubOutbox(a.AppID(), err)
apiServerLogger.Debug(nerr)
return &emptypb.Empty{}, nerr
}

operations = append(operations, trs...)
operations = ops
}

start := time.Now()
Expand Down
4 changes: 2 additions & 2 deletions pkg/api/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -1578,15 +1578,15 @@ func (a *api) onPostStateTransaction(reqCtx *fasthttp.RequestCtx) {
if outboxEnabled {
span := diagUtils.SpanFromContext(reqCtx)
corID, traceState := diag.TraceIDAndStateFromSpan(span)
trs, err := a.pubsubAdapter.Outbox().PublishInternal(reqCtx, storeName, operations, a.universal.AppID(), corID, traceState)
ops, err := a.pubsubAdapter.Outbox().PublishInternal(reqCtx, storeName, operations, a.universal.AppID(), corID, traceState)
if err != nil {
nerr := apierrors.PubSubOutbox(a.universal.AppID(), err)
universalFastHTTPErrorResponder(reqCtx, nerr)
log.Debug(nerr)
return
}

operations = append(operations, trs...)
operations = ops
}

start := time.Now()
Expand Down
51 changes: 41 additions & 10 deletions pkg/runtime/pubsub/outbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func transaction() (state.TransactionalStateOperation, error) {
}, nil
}

// PublishInternal publishes the state to an internal topic for outbox processing
// PublishInternal publishes the state to an internal topic for outbox processing and returns the updated list of transactions
func (o *outboxImpl) PublishInternal(ctx context.Context, stateStore string, operations []state.TransactionalStateOperation, source, traceID, traceState string) ([]state.TransactionalStateOperation, error) {
o.lock.RLock()
c, ok := o.outboxStores[stateStore]
Expand All @@ -139,7 +139,21 @@ func (o *outboxImpl) PublishInternal(ctx context.Context, stateStore string, ope
return nil, fmt.Errorf("error publishing internal outbox message: could not find outbox configuration on state store %s", stateStore)
}

trs := make([]state.TransactionalStateOperation, 0, len(operations))
projections := map[string]state.SetRequest{}

for i, op := range operations {
sr, ok := op.(state.SetRequest)

if ok {
for k, v := range sr.Metadata {
if k == "outbox.projection" && utils.IsTruthy(v) {
projections[sr.Key] = sr
operations = append(operations[:i], operations[i+1:]...)
}
}
}
}

for _, op := range operations {
sr, ok := op.(state.SetRequest)
if ok {
Expand All @@ -148,24 +162,41 @@ func (o *outboxImpl) PublishInternal(ctx context.Context, stateStore string, ope
return nil, err
}

var payload any
var contentType string

if proj, ok := projections[sr.Key]; ok {
payload = proj.Value

if proj.ContentType != nil {
contentType = *proj.ContentType
}
} else {
payload = sr.Value

if sr.ContentType != nil {
contentType = *sr.ContentType
}
}

var ceData []byte
bt, ok := sr.Value.([]byte)
bt, ok := payload.([]byte)
if ok {
ceData = bt
} else if sr.ContentType != nil && strings.EqualFold(*sr.ContentType, "application/json") {
b, sErr := json.Marshal(sr.Value)
} else if contentType != "" && strings.EqualFold(contentType, "application/json") {
b, sErr := json.Marshal(payload)
if sErr != nil {
return nil, sErr
}

ceData = b
} else {
ceData = []byte(fmt.Sprintf("%v", sr.Value))
ceData = []byte(fmt.Sprintf("%v", payload))
}

var dataContentType string
if sr.ContentType != nil {
dataContentType = *sr.ContentType
if contentType != "" {
dataContentType = contentType
}

ce := contribPubsub.NewCloudEventsEnvelope(tr.GetKey(), source, "", "", "", c.outboxPubsub, dataContentType, ceData, "", traceState)
Expand Down Expand Up @@ -193,11 +224,11 @@ func (o *outboxImpl) PublishInternal(ctx context.Context, stateStore string, ope
return nil, err
}

trs = append(trs, tr)
operations = append(operations, tr)
}
}

return trs, nil
return operations, nil
}

func outboxTopic(appID, topic, namespace string) string {
Expand Down
6 changes: 6 additions & 0 deletions pkg/runtime/pubsub/outbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,8 @@ func TestSubscribeToInternalTopics(t *testing.T) {
},
}, appID, "00-ecdf5aaa79bff09b62b201442c0f3061-d2597ed7bfd029e4-01", "00-ecdf5aaa79bff09b62b201442c0f3061-d2597ed7bfd029e4-01")

trs = append(trs[:0], trs[0+1:]...)

if pErr != nil {
errCh <- pErr
return
Expand Down Expand Up @@ -781,6 +783,8 @@ func TestSubscribeToInternalTopics(t *testing.T) {
},
}, appID, "", "")

trs = append(trs[:0], trs[0+1:]...)

if pErr != nil {
errCh <- pErr
return
Expand Down Expand Up @@ -906,6 +910,8 @@ func TestSubscribeToInternalTopics(t *testing.T) {
},
}, appID, "", "")

trs = append(trs[:0], trs[0+1:]...)

if pErr != nil {
errCh <- pErr
return
Expand Down
146 changes: 146 additions & 0 deletions tests/integration/suite/daprd/outbox/grpc/projection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implieh.
See the License for the specific language governing permissions and
limitations under the License.
*/

package grpc

import (
"context"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"

"github.com/dapr/dapr/pkg/proto/common/v1"
runtimev1pb "github.com/dapr/dapr/pkg/proto/runtime/v1"
"github.com/dapr/dapr/tests/integration/framework"
"github.com/dapr/dapr/tests/integration/framework/process/daprd"
"github.com/dapr/dapr/tests/integration/framework/process/grpc/app"
"github.com/dapr/dapr/tests/integration/suite"
)

func init() {
suite.Register(new(projection))
}

type projection struct {
daprd *daprd.Daprd
lock sync.Mutex
msg []byte
}

func (o *projection) Setup(t *testing.T) []framework.Option {
onTopicEvent := func(ctx context.Context, in *runtimev1pb.TopicEventRequest) (*runtimev1pb.TopicEventResponse, error) {
o.lock.Lock()
defer o.lock.Unlock()
o.msg = in.GetData()
return &runtimev1pb.TopicEventResponse{
Status: runtimev1pb.TopicEventResponse_SUCCESS,
}, nil
}

srv1 := app.New(t, app.WithOnTopicEventFn(onTopicEvent))
o.daprd = daprd.New(t, daprd.WithAppID("outboxtest"), daprd.WithAppPort(srv1.Port(t)), daprd.WithAppProtocol("grpc"), daprd.WithResourceFiles(`
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: mystore
spec:
type: state.in-memory
version: v1
metadata:
- name: outboxPublishPubsub
value: "mypubsub"
- name: outboxPublishTopic
value: "test"
`,
`
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: 'mypubsub'
spec:
type: pubsub.in-memory
version: v1
`,
`
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
name: 'order'
spec:
topic: 'test'
routes:
default: '/test'
pubsubname: 'mypubsub'
scopes:
- outboxtest
`))

return []framework.Option{
framework.WithProcesses(srv1, o.daprd),
}
}

func (o *projection) Run(t *testing.T, ctx context.Context) {
o.daprd.WaitUntilRunning(t, ctx)

conn, err := grpc.DialContext(ctx, o.daprd.GRPCAddress(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock())
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, conn.Close()) })

_, err = runtimev1pb.NewDaprClient(conn).ExecuteStateTransaction(ctx, &runtimev1pb.ExecuteStateTransactionRequest{
StoreName: "mystore",
Operations: []*runtimev1pb.TransactionalStateOperation{
{
OperationType: "upsert",
Request: &common.StateItem{
Key: "1",
Value: []byte("2"),
},
},
{
OperationType: "upsert",
Request: &common.StateItem{
Key: "1",
Value: []byte("3"),
Metadata: map[string]string{
"outbox.projection": "true",
},
},
},
},
})
require.NoError(t, err)

assert.Eventually(t, func() bool {
o.lock.Lock()
defer o.lock.Unlock()
return string(o.msg) == "3"
}, time.Second*5, time.Millisecond*10, "failed to receive message in time")

assert.Eventually(t, func() bool {
o.lock.Lock()
defer o.lock.Unlock()

resp, err := runtimev1pb.NewDaprClient(conn).GetState(ctx, &runtimev1pb.GetStateRequest{
Key: "1",
StoreName: "mystore",
})
require.NoError(t, err)
return string(resp.GetData()) == "2"
}, time.Second*5, time.Millisecond*10, "failed to receive message in time")
}
Loading

0 comments on commit a89c41b

Please sign in to comment.