Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Will NOT Merge] Chasm interface draft #6987

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 71 additions & 0 deletions service/history/chasm/component.go
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would put this in a top level chasm directory. There's likely going to be some chasm related code in other services.

Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package chasm

import "context"

type Component interface {
LifecycleState() LifecycleState

// TBD: the framework can just put the component in terminated state
// component lifecycle state can still be running when getting terminated
// but framework will use some rule to block incoming operations
// Terminate()

// we may not need this in the beginning
mustEmbedUnimplementedComponent()
}

// Embed UnimplementedComponent to get forward compatibility
type UnimplementedComponent struct{}

func (UnimplementedComponent) LifecycleState() LifecycleState {
return LifecycleStateUnspecified
}

// func (UnimplementedComponent) Terminate() {}

func (UnimplementedComponent) mustEmbedUnimplementedComponent() {}

// Shall it be named ComponentLifecycleState?
type LifecycleState int

const (
LifecycleStateUnspecified LifecycleState = 0
)
const (
LifecycleStateCreated LifecycleState = 1 << iota
LifecycleStateRunning
// LifecycleStatePaused // <- this can also be a method of the engine: PauseComponent
LifecycleStateCompleted
LifecycleStateFailed
// LifecycleStateTerminated
// LifecycleStateReset
)

type OperationIntent int

const (
OperationIntentProgress OperationIntent = 1 << iota
OperationIntentObserve
)

// The operation intent must come from the context
// as the handler may not pass the endpoint request as Input to,
// say, the chasm.UpdateComponent method.
// So similar to the chasm engine, handler needs to add the intent
// to the context.
type operationIntentCtxKeyType string

const operationIntentCtxKey engineCtxKeyType = "chasmOperationIntent"

func newContextWithOperationIntent(
ctx context.Context,
intent OperationIntent,
) context.Context {
return context.WithValue(ctx, operationIntentCtxKey, intent)
}

func operationIntentFromContext(
ctx context.Context,
) OperationIntent {
return ctx.Value(engineCtxKey).(OperationIntent)

Check failure on line 70 in service/history/chasm/component.go

View workflow job for this annotation

GitHub Actions / golangci

unchecked-type-assertion: type cast result is unchecked in ctx.Value(engineCtxKey).(OperationIntent) - type assertion will panic if not matched (revive)
}
168 changes: 168 additions & 0 deletions service/history/chasm/components/activity/activity.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package activity

import (
"time"

"go.temporal.io/api/common/v1"

Check failure on line 6 in service/history/chasm/components/activity/activity.go

View workflow job for this annotation

GitHub Actions / golangci

import "go.temporal.io/api/common/v1" imported without alias but must be with alias "commonpb" according to config (importas)
"go.temporal.io/server/api/matchingservice/v1"
persistencepb "go.temporal.io/server/api/persistence/v1"

Check failure on line 8 in service/history/chasm/components/activity/activity.go

View workflow job for this annotation

GitHub Actions / golangci

import "go.temporal.io/server/api/persistence/v1" imported as "persistencepb" but must be "persistencespb" according to config (importas)
"go.temporal.io/server/service/history/chasm"
"google.golang.org/protobuf/types/known/timestamppb"
)

type (
Activity struct {
// ALL FIELDS MUST BE EXPORTED for reflection to work

// In V1, we will only support only one non-chasm.XXX field in the struct.
// and that field must be a proto.Message.
// TODO: define a serializer/deserializer interface?
//
// Framework will try to recognize the type and do serialization/deserialization
// proto.Message is recommended so the component get compatibility if state definition changes
State persistencepb.ActivityInfo // proto.Message
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's call this standard field Data, since State is usually an enum.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, we can let you embed a proto and treat that as the data field. That could be slightly nicer to use.


// At the end of a transition, framework will use reflection to understant the component
// tree structure.

// TODO: also support field name tag, so the fields can be renamed
Input *chasm.Field[*common.Payload] `chasm:"lazy"`
Output *chasm.Field[*common.Payload] `chasm:"lazy"`

EventNotifier *chasm.Field[EventNotifier]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this is just to show the pointer capabilities, but the real activity implementation will need to do more than just notify its parent, it'll need to be injected with a way to read the inputs and outputs. They won't be embedded payloads in the workflow case.


// forward compatibility in case new method got added to the chasm.Component interface
chasm.UnimplementedComponent
}
)

func NewScheduledActivity(
chasmContext chasm.MutableContext,
params *NewActivityRequest,
) (*Activity, *NewActivityResponse, error) {
// after return framework will use reflection to analyze
// and understand the structure of the component tree
activity := &Activity{
// State: persistencepb.ActivityInfo{},
}
if params.notifier != nil {
// we need to give some guidance here, likely the implementation of the
// notifier will just be the parent component itself (say Workflow),
// as the handling logic will need to change the state of the parent component
activity.EventNotifier = chasm.NewComponentPointer(chasmContext, params.notifier)
}

_, err := activity.Schedule(chasmContext, &ScheduleRequest{
Input: params.Input,
})
if err != nil {
return nil, &NewActivityResponse{}, err
}

return activity, &NewActivityResponse{}, nil
}

func (i *Activity) Schedule(
chasmContext chasm.MutableContext,
req *ScheduleRequest,
) (*ScheduleResponse, error) {
// also validate current state etc.

i.State.ScheduledTime = timestamppb.New(chasmContext.Now(i))
i.Input = chasm.NewData(chasmContext, &common.Payload{
Data: req.Input,
})

if err := chasmContext.AddTask(
i,
chasm.TaskAttributes{}, // immediate task
DispatchTask{},
); err != nil {
return nil, err
}
if err := chasmContext.AddTask(
i,
chasm.TaskAttributes{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like that you've separated this out and not put it in the Task like I had it in my POC.

ScheduledTime: chasmContext.Now(i).Add(10 * time.Second),
},
TimeoutTask{
TimeoutType: TimeoutTypeScheduleToStart,
},
); err != nil {
return nil, nil
}

return &ScheduleResponse{}, nil
}

func (i *Activity) GetDispatchInfo(
chasmContext chasm.MutableContext,
t *DispatchTask,
) (*matchingservice.AddActivityTaskRequest, error) {
panic("not implemented")
}

func (i *Activity) RecordStarted(
chasmContext chasm.MutableContext,
req *RecordStartedRequest,
) (*RecordStartedResponse, error) {

// only this field will be updated
i.State.StartedTime = timestamppb.New(chasmContext.Now(i))
// update other states

payload, err := i.Input.Get(chasmContext)
if err != nil {
return nil, err
}

if err := chasmContext.AddTask(
i,
chasm.TaskAttributes{
ScheduledTime: chasmContext.Now(i).Add(10 * time.Second),
},
TimeoutTask{
TimeoutType: TimeoutTypeStartToClose,
},
); err != nil {
return nil, nil
}

return &RecordStartedResponse{
Input: payload.Data,
}, nil
}

func (i *Activity) RecordCompleted(
chasmContext chasm.MutableContext,
req *RecordCompletedRequest,
) (*RecordCompletedResponse, error) {
// say we have a completedTime field in ActivityInfo
// i.State.CompletedTime = timestamppb.New(chasmContext.Now())
output := &common.Payload{
Data: req.Output,
}
i.Output = chasm.NewData(chasmContext, output)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd probably call this method NewDataField since it constructs a Field.


completedEvent := ActivityCompletedEvent{
Output: output,
}
if notifier, err := i.EventNotifier.Get(chasmContext); err != nil && notifier != nil {
if err := notifier.OnCompletion(completedEvent); err != nil {
return nil, err
}
}

return &RecordCompletedResponse{}, nil
}

func (i *Activity) Describe(
_ chasm.Context,
_ *DescribeActivityRequest,
) (*DescribeActivityResponse, error) {
panic("not implemented")
}

func (i *Activity) LifecycleState() chasm.LifecycleState {
panic("not implemented")
}
23 changes: 23 additions & 0 deletions service/history/chasm/components/activity/event_notifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package activity

import (
"time"

"go.temporal.io/api/common/v1"

Check failure on line 6 in service/history/chasm/components/activity/event_notifier.go

View workflow job for this annotation

GitHub Actions / golangci

import "go.temporal.io/api/common/v1" imported without alias but must be with alias "commonpb" according to config (importas)
"go.temporal.io/server/service/history/chasm"
)

type EventNotifier interface {
chasm.Component

OnStart(ActivityStartedEvent) error
OnCompletion(ActivityCompletedEvent) error
}

type ActivityStartedEvent struct {
StartTime time.Time
}

type ActivityCompletedEvent struct {
Output *common.Payload
}
51 changes: 51 additions & 0 deletions service/history/chasm/components/activity/fx.go
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a library, and should be put in chasm/lib/activity/ IMHO.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Library specific RPCs, configs, and protos should also be colocated in the lib directory.

Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package activity

import (
"go.temporal.io/server/api/matchingservice/v1"
"go.temporal.io/server/service/history/chasm"
"go.uber.org/fx"
)

type Library struct {
matchingClient matchingservice.MatchingServiceClient
}

func (l Library) Name() string {
return "activity"
}

func (l Library) Components() []chasm.RegistrableComponent {
return []chasm.RegistrableComponent{
chasm.NewRegistrableComponent[*Activity](
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm kinda going back and forth between a RegisterComponent function and this approach where the library returns the list of components.

The main downside to this approach is that you have to wrap with NewRegisterableComponent.

If we do want this approach, we may want a LibraryBase struct that has empty implementations for all of the methods so library implementors only provide the items they need registered.

I like that with the concept of a library, you essentially get a namespace for all components and tasks.

"",
chasm.EntityShardingFn(
func(key chasm.EntityKey) string {
return key.NamespaceID + key.BusinessID
},
),
),
}
}

func (l Library) Tasks() []chasm.RegistrableTask {
return []chasm.RegistrableTask{
chasm.NewRegistrableTask(
"dispatchTask",
&DispatchTaskHandler{
l.matchingClient,
},
),
chasm.NewRegistrableTask(
"timeoutTask",
&TimeoutTaskHandler{},
),
}
}

var Module = fx.Options(
fx.Invoke(
func(registry chasm.Registry, matchingClient matchingservice.MatchingServiceClient) {
registry.RegisterLibrary(Library{matchingClient})
},
),
)
Loading
Loading