From 4420e249539f632df691edb44f278e3697ab8f8f Mon Sep 17 00:00:00 2001 From: Viktor Stanchev Date: Sun, 11 Feb 2024 20:15:40 +0000 Subject: [PATCH] organize --- workflow.go | 130 ++++++++++++++++++++++++++-------------------------- 1 file changed, 65 insertions(+), 65 deletions(-) diff --git a/workflow.go b/workflow.go index 7057583..9ab6486 100644 --- a/workflow.go +++ b/workflow.go @@ -104,71 +104,6 @@ func setSchedule(ctx context.Context, temporalClient *Client, opts client.Schedu return nil } -type Workflow1R[Param any, Return any] struct { - Name string - queue *Queue -} - -func NewWorkflow1R[ - Param any, - Return any, -](queue *Queue, name string, -) Workflow1R[Param, Return] { - queue.registerWorkflow(name, (func(context.Context, Param) (Return, error))(nil)) - return Workflow1R[Param, Return]{ - Name: name, - queue: queue, - } -} - -func (w Workflow1R[Param, Return]) WithImplementation(fn func(workflow.Context, Param) (Return, error)) *WorkflowWithImpl { - return &WorkflowWithImpl{workflowName: w.Name, queue: *w.queue, fn: fn} -} - -func (w Workflow1R[Param, Return]) Register(wr worker.WorkflowRegistry, fn func(workflow.Context, Param) (Return, error)) { - wr.RegisterWorkflowWithOptions(fn, workflow.RegisterOptions{ - Name: w.Name, - }) -} - -func (w Workflow1R[Param, Return]) Run(ctx context.Context, temporalClient *Client, opts client.StartWorkflowOptions, param Param) (Return, error) { - var ret Return - r, err := w.Execute(ctx, temporalClient, opts, param) - if err != nil { - return ret, err - } - err = r.Get(ctx, &ret) - return ret, err -} - -func (w Workflow1R[Param, Return]) Execute(ctx context.Context, temporalClient *Client, opts client.StartWorkflowOptions, param Param) (client.WorkflowRun, error) { - opts.TaskQueue = w.queue.name - if w.queue.namespace.name != temporalClient.namespace { - // The user must provide a client that's connected to the right namespace to be able to start this workflow. - return nil, fmt.Errorf("wrong namespace for client %s vs workflow %s", temporalClient.namespace, w.queue.namespace.name) - } - return temporalClient.Client.ExecuteWorkflow(ctx, opts, w.Name, param) -} - -func (w Workflow1R[Param, Return]) RunChild(ctx workflow.Context, opts workflow.ChildWorkflowOptions, param Param) (Return, error) { - var o Return - err := w.ExecuteChild(ctx, opts, param).Get(ctx, &o) - if err != nil { - return o, err - } - return o, nil -} - -func (w Workflow1R[Param, Return]) ExecuteChild(ctx workflow.Context, opts workflow.ChildWorkflowOptions, param Param) workflow.ChildWorkflowFuture { - opts.TaskQueue = w.queue.name - opts.Namespace = w.queue.namespace.name - return workflow.ExecuteChildWorkflow(workflow.WithChildOptions(ctx, opts), w.Name, param) -} - -func (w Workflow1R[Param, Return]) SetSchedule(ctx context.Context, temporalClient *Client, opts client.ScheduleOptions, param Param) error { - return setSchedule(ctx, temporalClient, opts, w.Name, w.queue, []any{param}) -} - type Workflow0R[Return any] struct { Name string queue *Queue @@ -314,3 +249,68 @@ func (w Workflow1[Param]) ExecuteChild(ctx workflow.Context, opts workflow.Child func (w Workflow1[Param]) SetSchedule(ctx context.Context, temporalClient *Client, opts client.ScheduleOptions, param Param) error { return setSchedule(ctx, temporalClient, opts, w.Name, w.queue, []any{param}) } + +type Workflow1R[Param any, Return any] struct { + Name string + queue *Queue +} + +func NewWorkflow1R[ + Param any, + Return any, +](queue *Queue, name string, +) Workflow1R[Param, Return] { + queue.registerWorkflow(name, (func(context.Context, Param) (Return, error))(nil)) + return Workflow1R[Param, Return]{ + Name: name, + queue: queue, + } +} + +func (w Workflow1R[Param, Return]) WithImplementation(fn func(workflow.Context, Param) (Return, error)) *WorkflowWithImpl { + return &WorkflowWithImpl{workflowName: w.Name, queue: *w.queue, fn: fn} +} + +func (w Workflow1R[Param, Return]) Register(wr worker.WorkflowRegistry, fn func(workflow.Context, Param) (Return, error)) { + wr.RegisterWorkflowWithOptions(fn, workflow.RegisterOptions{ + Name: w.Name, + }) +} + +func (w Workflow1R[Param, Return]) Run(ctx context.Context, temporalClient *Client, opts client.StartWorkflowOptions, param Param) (Return, error) { + var ret Return + r, err := w.Execute(ctx, temporalClient, opts, param) + if err != nil { + return ret, err + } + err = r.Get(ctx, &ret) + return ret, err +} + +func (w Workflow1R[Param, Return]) Execute(ctx context.Context, temporalClient *Client, opts client.StartWorkflowOptions, param Param) (client.WorkflowRun, error) { + opts.TaskQueue = w.queue.name + if w.queue.namespace.name != temporalClient.namespace { + // The user must provide a client that's connected to the right namespace to be able to start this workflow. + return nil, fmt.Errorf("wrong namespace for client %s vs workflow %s", temporalClient.namespace, w.queue.namespace.name) + } + return temporalClient.Client.ExecuteWorkflow(ctx, opts, w.Name, param) +} + +func (w Workflow1R[Param, Return]) RunChild(ctx workflow.Context, opts workflow.ChildWorkflowOptions, param Param) (Return, error) { + var o Return + err := w.ExecuteChild(ctx, opts, param).Get(ctx, &o) + if err != nil { + return o, err + } + return o, nil +} + +func (w Workflow1R[Param, Return]) ExecuteChild(ctx workflow.Context, opts workflow.ChildWorkflowOptions, param Param) workflow.ChildWorkflowFuture { + opts.TaskQueue = w.queue.name + opts.Namespace = w.queue.namespace.name + return workflow.ExecuteChildWorkflow(workflow.WithChildOptions(ctx, opts), w.Name, param) +} + +func (w Workflow1R[Param, Return]) SetSchedule(ctx context.Context, temporalClient *Client, opts client.ScheduleOptions, param Param) error { + return setSchedule(ctx, temporalClient, opts, w.Name, w.queue, []any{param}) +}