-
Notifications
You must be signed in to change notification settings - Fork 979
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Clarification on workflow.Interceptor #374
Comments
it will only intercept request with context of workflow |
I'm trying to use custom metadata inside a worflow context. However, it seems that dtm is not sending headers when using the interceptor. Even if I try to put myself metadata on the context object, the interceptor is replacing any existing metadata with the call to // TransInfo2Ctx add trans info to grpc context
func TransInfo2Ctx(ctx context.Context, gid, transType, branchID, op, dtm string) context.Context {
md := metadata.Pairs(
dtmpre+"gid", gid,
dtmpre+"trans_type", transType,
dtmpre+"branch_id", branchID,
dtmpre+"op", op,
dtmpre+"dtm", dtm,
)
nctx := ctx
if ctx == nil {
nctx = context.Background()
}
return metadata.NewOutgoingContext(nctx, md)
} I think that we should replace the call to The following piece of code is not working also: workflow.Register(s.Name, func(wf *workflow.Workflow, data []byte) error {
wf.BranchHeaders = myHeaders
} |
client v1.16.4 has been released. In this new version, |
Thank you, but now if I try to extend the workflow context with my headers wf.Context = metadata.NewOutgoingContext(wf.Context, metadata.Pairs(pairs...)) , then I get: dtmimp.PanicIf(wf.currentActionAdded, fmt.Errorf("one branch can have only on action")) |
The interceptor has currently another problem: func Interceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
logger.Debugf("grpc client calling: %s%s %v", cc.Target(), method, dtmimp.MustMarshalString(req))
wf := ctx.Value(wfMeta{}).(*Workflow)
origin := func() error {
ctx1 := dtmgimp.TransInfo2Ctx(ctx, wf.Gid, wf.TransType, wf.currentBranch, wf.currentOp, wf.Dtm)
err := invoker(ctx1, method, req, reply, cc, opts...)
res := fmt.Sprintf("grpc client called: %s%s %s result: %s err: %v",
cc.Target(), method, dtmimp.MustMarshalString(req), dtmimp.MustMarshalString(reply), err)
if err != nil {
logger.Errorf("%s", res)
} else {
logger.Debugf("%s", res)
}
return err
}
if wf.currentOp != dtmimp.OpAction {
return origin()
}
sr := wf.recordedDo(func(bb *dtmcli.BranchBarrier) *stepResult {
err := origin()
return wf.stepResultFromGrpc(reply, err)
})
return wf.stepResultToGrpc(sr, reply)
} Basically, if a context without the wfMeta{} key is intercepted, the code will panic, 'cause there is no null check for wf. wf := ctx.Value(wfMeta{}).(*Workflow)
if wf == nil {
return nil
} |
can you make a pr to dtm? |
you can pass a ctx to workflow's constructor |
Of course.
What do you mean? |
This error hint is saying that, you should call NewBranch if you are adding another action |
I am sorry, please ignore it |
Oh, I see, so basically every branch should be followed by at most one grpc call, shouldn't it? wf.NewBranch()...
myOp1
wf.NewBranch()...
myOp2
... While the following is wrong: wf.NewBranch()...
myOp1
myOp2
... |
yes |
Great, I confirm you that this is working. Submitted PR #375. Can you create a release with the fix when it will be merged? |
I will create a release with this fix tomorrow |
released |
When you deploy dtm in your production, please help to fill the user list |
Is it safe to use a grpc client for which I have installed a
workflow.Interceptor
also outside a workflow definition?Or will all calls to the client be intercepted and cached by the interceptor?
The text was updated successfully, but these errors were encountered: