Skip to content

Commit

Permalink
organize
Browse files Browse the repository at this point in the history
  • Loading branch information
vikstrous committed Feb 11, 2024
1 parent 03248bd commit 4420e24
Showing 1 changed file with 65 additions and 65 deletions.
130 changes: 65 additions & 65 deletions workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,71 +104,6 @@ func setSchedule(ctx context.Context, temporalClient *Client, opts client.Schedu
return nil
}

type Workflow1R[Param any, Return any] struct {
Name string
queue *Queue
}

func NewWorkflow1R[
Param any,
Return any,
](queue *Queue, name string,
) Workflow1R[Param, Return] {
queue.registerWorkflow(name, (func(context.Context, Param) (Return, error))(nil))
return Workflow1R[Param, Return]{
Name: name,
queue: queue,
}
}

func (w Workflow1R[Param, Return]) WithImplementation(fn func(workflow.Context, Param) (Return, error)) *WorkflowWithImpl {
return &WorkflowWithImpl{workflowName: w.Name, queue: *w.queue, fn: fn}
}

func (w Workflow1R[Param, Return]) Register(wr worker.WorkflowRegistry, fn func(workflow.Context, Param) (Return, error)) {
wr.RegisterWorkflowWithOptions(fn, workflow.RegisterOptions{
Name: w.Name,
})
}

func (w Workflow1R[Param, Return]) Run(ctx context.Context, temporalClient *Client, opts client.StartWorkflowOptions, param Param) (Return, error) {
var ret Return
r, err := w.Execute(ctx, temporalClient, opts, param)
if err != nil {
return ret, err
}
err = r.Get(ctx, &ret)
return ret, err
}

func (w Workflow1R[Param, Return]) Execute(ctx context.Context, temporalClient *Client, opts client.StartWorkflowOptions, param Param) (client.WorkflowRun, error) {
opts.TaskQueue = w.queue.name
if w.queue.namespace.name != temporalClient.namespace {
// The user must provide a client that's connected to the right namespace to be able to start this workflow.
return nil, fmt.Errorf("wrong namespace for client %s vs workflow %s", temporalClient.namespace, w.queue.namespace.name)
}
return temporalClient.Client.ExecuteWorkflow(ctx, opts, w.Name, param)
}

func (w Workflow1R[Param, Return]) RunChild(ctx workflow.Context, opts workflow.ChildWorkflowOptions, param Param) (Return, error) {
var o Return
err := w.ExecuteChild(ctx, opts, param).Get(ctx, &o)
if err != nil {
return o, err
}
return o, nil
}

func (w Workflow1R[Param, Return]) ExecuteChild(ctx workflow.Context, opts workflow.ChildWorkflowOptions, param Param) workflow.ChildWorkflowFuture {
opts.TaskQueue = w.queue.name
opts.Namespace = w.queue.namespace.name
return workflow.ExecuteChildWorkflow(workflow.WithChildOptions(ctx, opts), w.Name, param)
}

func (w Workflow1R[Param, Return]) SetSchedule(ctx context.Context, temporalClient *Client, opts client.ScheduleOptions, param Param) error {
return setSchedule(ctx, temporalClient, opts, w.Name, w.queue, []any{param})
}

type Workflow0R[Return any] struct {
Name string
queue *Queue
Expand Down Expand Up @@ -314,3 +249,68 @@ func (w Workflow1[Param]) ExecuteChild(ctx workflow.Context, opts workflow.Child
func (w Workflow1[Param]) SetSchedule(ctx context.Context, temporalClient *Client, opts client.ScheduleOptions, param Param) error {
return setSchedule(ctx, temporalClient, opts, w.Name, w.queue, []any{param})
}

type Workflow1R[Param any, Return any] struct {
Name string
queue *Queue
}

func NewWorkflow1R[
Param any,
Return any,
](queue *Queue, name string,
) Workflow1R[Param, Return] {
queue.registerWorkflow(name, (func(context.Context, Param) (Return, error))(nil))
return Workflow1R[Param, Return]{
Name: name,
queue: queue,
}
}

func (w Workflow1R[Param, Return]) WithImplementation(fn func(workflow.Context, Param) (Return, error)) *WorkflowWithImpl {
return &WorkflowWithImpl{workflowName: w.Name, queue: *w.queue, fn: fn}
}

func (w Workflow1R[Param, Return]) Register(wr worker.WorkflowRegistry, fn func(workflow.Context, Param) (Return, error)) {
wr.RegisterWorkflowWithOptions(fn, workflow.RegisterOptions{
Name: w.Name,
})
}

func (w Workflow1R[Param, Return]) Run(ctx context.Context, temporalClient *Client, opts client.StartWorkflowOptions, param Param) (Return, error) {
var ret Return
r, err := w.Execute(ctx, temporalClient, opts, param)
if err != nil {
return ret, err
}
err = r.Get(ctx, &ret)
return ret, err
}

func (w Workflow1R[Param, Return]) Execute(ctx context.Context, temporalClient *Client, opts client.StartWorkflowOptions, param Param) (client.WorkflowRun, error) {
opts.TaskQueue = w.queue.name
if w.queue.namespace.name != temporalClient.namespace {
// The user must provide a client that's connected to the right namespace to be able to start this workflow.
return nil, fmt.Errorf("wrong namespace for client %s vs workflow %s", temporalClient.namespace, w.queue.namespace.name)
}
return temporalClient.Client.ExecuteWorkflow(ctx, opts, w.Name, param)
}

func (w Workflow1R[Param, Return]) RunChild(ctx workflow.Context, opts workflow.ChildWorkflowOptions, param Param) (Return, error) {
var o Return
err := w.ExecuteChild(ctx, opts, param).Get(ctx, &o)
if err != nil {
return o, err
}
return o, nil
}

func (w Workflow1R[Param, Return]) ExecuteChild(ctx workflow.Context, opts workflow.ChildWorkflowOptions, param Param) workflow.ChildWorkflowFuture {
opts.TaskQueue = w.queue.name
opts.Namespace = w.queue.namespace.name
return workflow.ExecuteChildWorkflow(workflow.WithChildOptions(ctx, opts), w.Name, param)
}

func (w Workflow1R[Param, Return]) SetSchedule(ctx context.Context, temporalClient *Client, opts client.ScheduleOptions, param Param) error {
return setSchedule(ctx, temporalClient, opts, w.Name, w.queue, []any{param})
}

0 comments on commit 4420e24

Please sign in to comment.