Skip to content

Commit

Permalink
add activity0 and activity0r
Browse files Browse the repository at this point in the history
  • Loading branch information
vikstrous committed Feb 11, 2024
1 parent be7cb3c commit 03248bd
Showing 1 changed file with 54 additions and 0 deletions.
54 changes: 54 additions & 0 deletions activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,60 @@ func (a ActivityWithImpl) validate(q *Queue, v *ValidationState) error {
return nil
}

type Activity0 struct {
Name string
queue *Queue
}

func NewActivity0(q *Queue, name string) Activity0 {
q.registerActivity(name, (func(context.Context) error)(nil))
return Activity0{Name: name, queue: q}
}

func (a Activity0) WithImplementation(fn func(context.Context) error) *ActivityWithImpl {
return &ActivityWithImpl{activityName: a.Name, queue: a.queue, fn: fn}
}

func (a Activity0) Run(ctx workflow.Context) error {
return a.Execute(ctx).Get(ctx, nil)
}

func (a Activity0) Execute(ctx workflow.Context) workflow.Future {
return workflow.ExecuteActivity(workflow.WithWorkflowNamespace(workflow.WithTaskQueue(ctx, a.queue.name), a.queue.namespace.name), a.Name)
}

func (a Activity0) Register(w worker.ActivityRegistry, fn func(context.Context) error) {
w.RegisterActivityWithOptions(fn, activity.RegisterOptions{Name: a.Name})
}

type Activity0R[Return any] struct {
Name string
queue *Queue
}

func NewActivity0R[Return any](q *Queue, name string) Activity0R[Return] {
q.registerActivity(name, (func(context.Context) (Return, error))(nil))
return Activity0R[Return]{Name: name, queue: q}
}

func (a Activity0R[Return]) WithImplementation(fn func(context.Context) (Return, error)) *ActivityWithImpl {
return &ActivityWithImpl{activityName: a.Name, queue: a.queue, fn: fn}
}

func (a Activity0R[Return]) Run(ctx workflow.Context) (Return, error) {
var ret Return
err := a.Execute(ctx).Get(ctx, &ret)
return ret, err
}

func (a Activity0R[Return]) Execute(ctx workflow.Context) workflow.Future {
return workflow.ExecuteActivity(workflow.WithWorkflowNamespace(workflow.WithTaskQueue(ctx, a.queue.name), a.queue.namespace.name), a.Name)
}

func (a Activity0R[Return]) Register(w worker.ActivityRegistry, fn func(context.Context) (Return, error)) {
w.RegisterActivityWithOptions(fn, activity.RegisterOptions{Name: a.Name})
}

type Activity1[Param any] struct {
Name string
queue *Queue
Expand Down

0 comments on commit 03248bd

Please sign in to comment.