diff --git a/activity.go b/activity.go index 93fc3a1..4fb9adf 100644 --- a/activity.go +++ b/activity.go @@ -37,6 +37,8 @@ func (a ActivityWithImpl) validate(q *Queue, v *validationState) error { type Activity[Param, Return any] struct { Name string queue *Queue + // If true, the Param struct fields will be passed as positional arguments + positional bool } // NewActivity declares the existence of an activity on a given queue with a given name. @@ -47,6 +49,37 @@ func NewActivity[Param, Return any](q *Queue, name string) Activity[Param, Retur return Activity[Param, Return]{Name: name, queue: q} } +// NewActivityPositional declares the existence of an activity on a given queue with a given name. +// Instead of passing the Param struct directly to the activity, it passes each field of the struct +// as a separate positional argument in the order they are defined. +func NewActivityPositional[Param, Return any](q *Queue, name string) Activity[Param, Return] { + panicIfNotStruct[Param]("NewActivityPositional") + panicIfNotStruct[Return]("NewActivityPositional") + + // Get the type information for the Param struct + paramType := reflect.TypeOf((*Param)(nil)).Elem() + if paramType.Kind() == reflect.Ptr { + paramType = paramType.Elem() + } + + // Create a slice of function parameter types: (context.Context, field1Type, field2Type, ...) + paramTypes := make([]reflect.Type, paramType.NumField()+1) + paramTypes[0] = reflect.TypeOf((*context.Context)(nil)).Elem() + for i := 0; i < paramType.NumField(); i++ { + paramTypes[i+1] = paramType.Field(i).Type + } + + // Create the function type: func(context.Context, field1Type, field2Type, ...) (Return, error) + returnType := reflect.TypeOf((*Return)(nil)).Elem() + errorType := reflect.TypeOf((*error)(nil)).Elem() + fnType := reflect.FuncOf(paramTypes, []reflect.Type{returnType, errorType}, false) + + // Register a nil function of the correct type + q.registerActivity(name, reflect.Zero(fnType).Interface()) + + return Activity[Param, Return]{Name: name, queue: q, positional: true} +} + func panicIfNotStruct[Param any](funcName string) { paramType := reflect.TypeOf((*Param)(nil)).Elem() if paramType.Kind() == reflect.Ptr { @@ -57,19 +90,93 @@ func panicIfNotStruct[Param any](funcName string) { } } +func extractFieldTypes(structType reflect.Type) []reflect.Type { + if structType.Kind() == reflect.Ptr { + structType = structType.Elem() + } + if structType.Kind() != reflect.Struct { + panic("extractFieldTypes called with non-struct type") + } + + fieldTypes := make([]reflect.Type, structType.NumField()) + for i := 0; i < structType.NumField(); i++ { + fieldTypes[i] = structType.Field(i).Type + } + return fieldTypes +} + // WithImplementation should be called to create the parameters for NewWorker(). It declares which function implements the activity. func (a Activity[Param, Return]) WithImplementation(fn func(context.Context, Param) (Return, error)) *ActivityWithImpl { - return &ActivityWithImpl{activityName: a.Name, queue: a.queue, fn: fn} -} + if !a.positional { + return &ActivityWithImpl{activityName: a.Name, queue: a.queue, fn: fn} + } -// Run executes the activity and synchronously returns the output. -func (a Activity[Param, Return]) Run(ctx workflow.Context, param Param) (Return, error) { - var ret Return - err := a.Execute(ctx, param).Get(ctx, &ret) - return ret, err + // For positional activities, create a wrapper function that converts positional arguments to a struct + paramType := reflect.TypeOf((*Param)(nil)).Elem() + var fieldTypes []reflect.Type + if paramType.Kind() == reflect.Ptr { + fieldTypes = extractFieldTypes(paramType.Elem()) + } else { + fieldTypes = extractFieldTypes(paramType) + } + + wrapper := reflect.MakeFunc( + reflect.FuncOf( + append([]reflect.Type{reflect.TypeOf((*context.Context)(nil)).Elem()}, fieldTypes...), + []reflect.Type{reflect.TypeOf((*Return)(nil)).Elem(), reflect.TypeOf((*error)(nil)).Elem()}, + false, + ), + func(args []reflect.Value) []reflect.Value { + ctx := args[0] + + // Create a new instance of the Param struct + var paramVal reflect.Value + if paramType.Kind() == reflect.Ptr { + paramVal = reflect.New(paramType.Elem()) + // Fill the struct fields with the positional arguments + for i := 0; i < paramType.Elem().NumField(); i++ { + paramVal.Elem().Field(i).Set(args[i+1]) + } + } else { + paramVal = reflect.New(paramType).Elem() + // Fill the struct fields with the positional arguments + for i := 0; i < paramType.NumField(); i++ { + paramVal.Field(i).Set(args[i+1]) + } + } + + // Call the implementation function with the context and constructed struct + results := reflect.ValueOf(fn).Call([]reflect.Value{ctx, paramVal}) + return results + }, + ) + + return &ActivityWithImpl{activityName: a.Name, queue: a.queue, fn: wrapper.Interface()} } -// Execute asynchnronously executes the activity and returns a promise. +// Execute asynchronously executes the activity and returns a promise. func (a Activity[Param, Return]) Execute(ctx workflow.Context, param Param) workflow.Future { - return workflow.ExecuteActivity(workflow.WithTaskQueue(ctx, a.queue.name), a.Name, param) + if !a.positional { + return workflow.ExecuteActivity(workflow.WithTaskQueue(ctx, a.queue.name), a.Name, param) + } + + // For positional activities, extract struct fields into separate arguments + paramVal := reflect.ValueOf(param) + if paramVal.Kind() == reflect.Ptr { + paramVal = paramVal.Elem() + } + + args := make([]interface{}, paramVal.NumField()) + for i := 0; i < paramVal.NumField(); i++ { + args[i] = paramVal.Field(i).Interface() + } + + return workflow.ExecuteActivity(workflow.WithTaskQueue(ctx, a.queue.name), a.Name, args...) +} + +// Run synchronously executes the activity and returns the result. +func (a Activity[Param, Return]) Run(ctx workflow.Context, param Param) (Return, error) { + var result Return + err := a.Execute(ctx, param).Get(ctx, &result) + return result, err }