Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SDK version and name to history #1245

Merged
merged 3 commits into from
Sep 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions internal/internal_event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ type (
contextPropagators []ContextPropagator
deadlockDetectionTimeout time.Duration
sdkFlags *sdkFlags
sdkVersionUpdated bool
sdkVersion string
sdkNameUpdated bool
sdkName string

protocols *protocol.Registry
}
Expand Down Expand Up @@ -350,6 +354,22 @@ func (wc *workflowEnvironmentImpl) Send(msg *protocolpb.Message, opts ...msgSend
wc.outbox = append(wc.outbox, outboxEntry{msg: msg, eventPredicate: sendCfg.pred})
}

func (wc *workflowEnvironmentImpl) getNewSdkNameAndReset() string {
if wc.sdkNameUpdated {
wc.sdkNameUpdated = false
return wc.sdkName
}
return ""
}

func (wc *workflowEnvironmentImpl) getNewSdkVersionAndReset() string {
if wc.sdkVersionUpdated {
wc.sdkVersionUpdated = false
return wc.sdkVersion
}
return ""
}

func (wc *workflowEnvironmentImpl) getNextLocalActivityID() string {
wc.localActivityCounterID++
return getStringID(wc.localActivityCounterID)
Expand Down
65 changes: 55 additions & 10 deletions internal/internal_task_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ type (
next []*historypb.HistoryEvent
nextFlags []sdkFlag
binaryChecksum string
sdkVersion string
sdkName string
}

workflowTaskHeartbeatError struct {
Expand All @@ -195,6 +197,16 @@ type (
flags []sdkFlag
msgs []*protocolpb.Message
binaryChecksum string
sdkVersion string
sdkName string
}

finishedTask struct {
isFailed bool
binaryChecksum string
flags []sdkFlag
sdkVersion string
sdkName string
}
)

Expand Down Expand Up @@ -237,15 +249,15 @@ func (eh *history) IsReplayEvent(event *historypb.HistoryEvent) bool {
return event.GetEventId() <= eh.workflowTask.task.GetPreviousStartedEventId() || isCommandEvent(event.GetEventType())
}

// IsNextWorkflowTaskFailed checks if the workflow task failed or completed. If it did complete returns some information
// isNextWorkflowTaskFailed checks if the workflow task failed or completed. If it did complete returns some information
// on the completed workflow task.
func (eh *history) IsNextWorkflowTaskFailed() (isFailed bool, binaryChecksum string, flags []sdkFlag, err error) {
func (eh *history) isNextWorkflowTaskFailed() (task finishedTask, err error) {
nextIndex := eh.currentIndex + 1
// Server can return an empty page so if we need the next event we must keep checking until we either get it
// or know we have no more pages to check
for nextIndex >= len(eh.loadedEvents) && eh.hasMoreEvents() { // current page ends and there is more pages
if err := eh.loadMoreEvents(); err != nil {
return false, "", nil, err
return finishedTask{}, err
}
}

Expand All @@ -262,14 +274,20 @@ func (eh *history) IsNextWorkflowTaskFailed() (isFailed bool, binaryChecksum str
f := sdkFlagFromUint(flag)
if !f.isValid() {
// If a flag is not recognized (value is too high or not defined), it must fail the workflow task
return false, "", nil, errors.New("could not recognize SDK flag")
return finishedTask{}, errors.New("could not recognize SDK flag")
}
flags = append(flags, f)
}
}
return isFailed, binaryChecksum, flags, nil
return finishedTask{
isFailed: isFailed,
binaryChecksum: binaryChecksum,
flags: flags,
sdkName: nextEvent.GetWorkflowTaskCompletedEventAttributes().GetSdkMetadata().GetSdkName(),
sdkVersion: nextEvent.GetWorkflowTaskCompletedEventAttributes().GetSdkMetadata().GetSdkVersion(),
}, nil
}
return false, "", nil, nil
return finishedTask{}, nil
}

func (eh *history) loadMoreEvents() error {
Expand Down Expand Up @@ -318,11 +336,15 @@ func (eh *history) nextTask() (*preparedTask, error) {
}
eh.next = firstTask.events
eh.nextFlags = firstTask.flags
eh.sdkName = firstTask.sdkName
eh.sdkVersion = firstTask.sdkVersion
}

result := eh.next
checksum := eh.binaryChecksum
sdkFlags := eh.nextFlags
sdkName := eh.sdkName
sdkVersion := eh.sdkVersion

var markers []*historypb.HistoryEvent
var msgs []*protocolpb.Message
Expand All @@ -333,6 +355,8 @@ func (eh *history) nextTask() (*preparedTask, error) {
}
eh.next = nextTaskEvents.events
eh.nextFlags = nextTaskEvents.flags
eh.sdkName = nextTaskEvents.sdkName
eh.sdkVersion = nextTaskEvents.sdkVersion
markers = nextTaskEvents.markers
msgs = nextTaskEvents.msgs
}
Expand All @@ -342,6 +366,8 @@ func (eh *history) nextTask() (*preparedTask, error) {
flags: sdkFlags,
msgs: msgs,
binaryChecksum: checksum,
sdkName: sdkName,
sdkVersion: sdkVersion,
}, nil
}

Expand Down Expand Up @@ -408,16 +434,22 @@ OrderEvents:

switch event.GetEventType() {
case enumspb.EVENT_TYPE_WORKFLOW_TASK_STARTED:
isFailed, binaryChecksum, newFlags, err1 := eh.IsNextWorkflowTaskFailed()
finishedTask, err1 := eh.isNextWorkflowTaskFailed()
if err1 != nil {
err := err1
return nil, err
}
if !isFailed {
eh.binaryChecksum = binaryChecksum
if !finishedTask.isFailed {
eh.binaryChecksum = finishedTask.binaryChecksum
eh.currentIndex++
taskEvents.events = append(taskEvents.events, event)
taskEvents.flags = append(taskEvents.flags, newFlags...)
taskEvents.flags = append(taskEvents.flags, finishedTask.flags...)
if finishedTask.sdkName != "" {
taskEvents.sdkName = finishedTask.sdkName
}
if finishedTask.sdkVersion != "" {
taskEvents.sdkVersion = finishedTask.sdkVersion
}
break OrderEvents
}
case enumspb.EVENT_TYPE_WORKFLOW_TASK_SCHEDULED,
Expand Down Expand Up @@ -959,9 +991,20 @@ ProcessEvents:
var msgs *eventMsgIndex
if isReplay {
msgs = indexMessagesByEventID(historyMessages)

eventHandler.sdkVersion = nextTask.sdkVersion
eventHandler.sdkName = nextTask.sdkName
} else {
msgs = indexMessagesByEventID(taskMessages)
taskMessages = []*protocolpb.Message{}
if eventHandler.sdkVersion != SDKVersion {
eventHandler.sdkVersionUpdated = true
eventHandler.sdkVersion = SDKVersion
}
if eventHandler.sdkName != SDKName {
eventHandler.sdkNameUpdated = true
eventHandler.sdkName = SDKName
}
}

eventHandler.sdkFlags.set(flags...)
Expand Down Expand Up @@ -1729,6 +1772,8 @@ func (wth *workflowTaskHandlerImpl) completeWorkflow(
MeteringMetadata: &commonpb.MeteringMetadata{NonfirstLocalActivityExecutionAttempts: nonfirstLAAttempts},
SdkMetadata: &sdk.WorkflowTaskCompletedMetadata{
LangUsedFlags: langUsedFlags,
SdkName: eventHandler.getNewSdkNameAndReset(),
SdkVersion: eventHandler.getNewSdkVersionAndReset(),
},
WorkerVersionStamp: &commonpb.WorkerVersionStamp{
BuildId: wth.workerBuildID,
Expand Down
4 changes: 4 additions & 0 deletions internal/internal_task_handlers_interfaces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,8 @@ func (s *PollLayerInterfacesTestSuite) TestGetNextCommandsSdkFlags() {
StartedEventId: 3,
SdkMetadata: &sdk.WorkflowTaskCompletedMetadata{
LangUsedFlags: []uint32{SDKFlagLimitChangeVersionSASize},
SdkName: SDKName,
SdkVersion: "1.0",
},
}),
createTestEventVersionMarker(5, 4, "test-id", 1),
Expand Down Expand Up @@ -241,6 +243,8 @@ func (s *PollLayerInterfacesTestSuite) TestGetNextCommandsSdkFlags() {
// function is run.
s.Equal(1, len(nextTask.flags))
s.EqualValues(SDKFlagLimitChangeVersionSASize, nextTask.flags[0])
s.EqualValues(SDKName, nextTask.sdkName)
s.EqualValues("1.0", nextTask.sdkVersion)

nextTask, err = eh.nextTask()

Expand Down
3 changes: 3 additions & 0 deletions internal/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ const (
// Server validates if SDKVersion fits its supported range and rejects request if it doesn't.
SDKVersion = "1.24.0"

// SDKName represents the name of the SDK.
SDKName = clientNameHeaderValue

// SupportedServerVersions is a semver rages (https://github.com/blang/semver#ranges) of server versions that
// are supported by this Temporal SDK.
// Server validates if its version fits into SupportedServerVersions range and rejects request if it doesn't.
Expand Down
34 changes: 34 additions & 0 deletions test/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,40 @@ func (ts *IntegrationTestSuite) TestPanicActivityWorkflow() {
}, res)
}

func (ts *IntegrationTestSuite) TestSDKNameAndVersionWritten() {
const wfID = "test-sdk-name-and-version"
wfOpts := ts.startWorkflowOptions(wfID)
ctx, cancel := context.WithTimeout(context.Background(), ctxTimeout)
defer cancel()

run, err := ts.client.ExecuteWorkflow(ctx, wfOpts, ts.workflows.sleep, time.Second)
ts.NoError(err)

var result int
err = run.Get(ctx, &result)
ts.NoError(err)

iter := ts.client.GetWorkflowHistory(ctx, run.GetID(), run.GetRunID(), false, enumspb.HISTORY_EVENT_FILTER_TYPE_ALL_EVENT)
var firstTaskFound bool
for iter.HasNext() {
event, err := iter.Next()
ts.NoError(err)
if event.EventType == enumspb.EVENT_TYPE_WORKFLOW_TASK_COMPLETED {
sdkName := event.GetWorkflowTaskCompletedEventAttributes().GetSdkMetadata().GetSdkName()
sdkVersion := event.GetWorkflowTaskCompletedEventAttributes().GetSdkMetadata().GetSdkVersion()
if !firstTaskFound {
firstTaskFound = true
// The name and version should only be written once if they don't change
ts.Equal(internal.SDKName, sdkName)
ts.Equal(internal.SDKVersion, sdkVersion)
} else {
ts.Equal("", sdkName)
ts.Equal("", sdkVersion)
}
}
}
}

func (ts *IntegrationTestSuite) TestDeadlockDetection() {
var expected []string
wfOpts := ts.startWorkflowOptions("test-deadlock")
Expand Down
Loading