Skip to content

Commit

Permalink
Add tracking for active node and task execution counts in propeller (#4986)
Browse files Browse the repository at this point in the history

* Add tracking for active node and task execution counts in propeller

Signed-off-by: Shardool <[email protected]>

* Update unit tests for task and node execution counts

Signed-off-by: Shardool <[email protected]>

* Fix linter errors

Signed-off-by: Shardool <[email protected]>

* fix linter errors

Signed-off-by: Paul Dittamo <[email protected]>

---------

Signed-off-by: Shardool <[email protected]>
Signed-off-by: Paul Dittamo <[email protected]>
Co-authored-by: Paul Dittamo <[email protected]>
  • Loading branch information
sshardool and pvditt authored Apr 4, 2024
1 parent bcdbf5f commit f1c2231
Show file tree
Hide file tree
Showing 9 changed files with 548 additions and 45 deletions.
19 changes: 13 additions & 6 deletions flytepropeller/pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,10 +82,11 @@ type Controller struct {
workflowStore workflowstore.FlyteWorkflow
// recorder is an event recorder for recording Event resources to the
// Kubernetes API.
recorder record.EventRecorder
metrics *metrics
leaderElector *leaderelection.LeaderElector
levelMonitor *ResourceLevelMonitor
recorder record.EventRecorder
metrics *metrics
leaderElector *leaderelection.LeaderElector
levelMonitor *ResourceLevelMonitor
executionStats *workflowstore.ExecutionStatsMonitor
}

// Run either as a leader -if configured- or as a standalone process.
Expand Down Expand Up @@ -117,6 +118,7 @@ func (c *Controller) run(ctx context.Context) error {

// Start the collector process
c.levelMonitor.RunCollector(ctx)
c.executionStats.RunStatsMonitor(ctx)

// Start the informer factories to begin populating the informer caches
logger.Info(ctx, "Starting FlyteWorkflow controller")
Expand Down Expand Up @@ -329,7 +331,6 @@ func New(ctx context.Context, cfg *config.Config, kubeClientset kubernetes.Inter
if sCfg == nil {
logger.Errorf(ctx, "Storage configuration missing.")
}

store, err := storage.NewDataStore(sCfg, scope.NewSubScope("metastore"))
if err != nil {
return nil, errors.Wrapf(err, "Failed to create Metadata storage")
Expand Down Expand Up @@ -445,7 +446,13 @@ func New(ctx context.Context, cfg *config.Config, kubeClientset kubernetes.Inter
return nil, errors.Wrapf(err, "Failed to create Controller.")
}

workflowExecutor, err := workflow.NewExecutor(ctx, store, controller.enqueueWorkflowForNodeUpdates, eventSink, controller.recorder, cfg.MetadataPrefix, nodeExecutor, &cfg.EventConfig, cfg.ClusterID, scope)
activeExecutions, err := workflowstore.NewExecutionStatsHolder()
if err != nil {
return nil, err
}
controller.executionStats = workflowstore.NewExecutionStatsMonitor(scope.NewSubScope("execstats"), flyteworkflowInformer.Lister(), activeExecutions)

workflowExecutor, err := workflow.NewExecutor(ctx, store, controller.enqueueWorkflowForNodeUpdates, eventSink, controller.recorder, cfg.MetadataPrefix, nodeExecutor, &cfg.EventConfig, cfg.ClusterID, scope, activeExecutions)
if err != nil {
return nil, err
}
Expand Down
36 changes: 31 additions & 5 deletions flytepropeller/pkg/controller/executors/execution_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ type ImmutableParentInfo interface {
// ControlFlow exposes counters accumulated while evaluating a workflow.
// Implementations are not required to be safe for concurrent use: per the
// implementation note below, every evaluation round runs in a single thread.
type ControlFlow interface {
	// CurrentParallelism returns the current parallelism counter.
	CurrentParallelism() uint32
	// IncrementParallelism adds one to the parallelism counter and returns the new value.
	IncrementParallelism() uint32
	// CurrentNodeExecutionCount returns the number of node executions counted so far.
	CurrentNodeExecutionCount() uint32
	// IncrementNodeExecutionCount adds one to the node-execution counter and returns the new value.
	IncrementNodeExecutionCount() uint32
	// CurrentTaskExecutionCount returns the number of task executions counted so far.
	CurrentTaskExecutionCount() uint32
	// IncrementTaskExecutionCount adds one to the task-execution counter and returns the new value.
	IncrementTaskExecutionCount() uint32
}

type ExecutionContext interface {
Expand Down Expand Up @@ -71,16 +75,36 @@ func (p *parentExecutionInfo) CurrentAttempt() uint32 {
// controlFlow is the ControlFlow implementation used during a workflow
// evaluation round.
//
// We could use atomic.Uint32, but this is not required for current Propeller:
// every round is run in a single thread, and using atomics would introduce
// unnecessary memory barriers.
//
// NOTE(review): the scraped diff had lost its +/- markers, leaving the removed
// `v` field and its accessors interleaved with the renamed `parallelism`
// versions; this block is the reconstructed post-commit code.
type controlFlow struct {
	parallelism        uint32 // number of parallel executions counted so far
	nodeExecutionCount uint32 // node executions observed in the Running phase this round
	taskExecutionCount uint32 // subset of running node executions that are task nodes
}

// CurrentParallelism returns the current parallelism counter.
func (c *controlFlow) CurrentParallelism() uint32 {
	return c.parallelism
}

// IncrementParallelism adds one to the parallelism counter and returns the new value.
func (c *controlFlow) IncrementParallelism() uint32 {
	c.parallelism++
	return c.parallelism
}

// CurrentNodeExecutionCount returns the number of node executions counted so far.
func (c *controlFlow) CurrentNodeExecutionCount() uint32 {
	return c.nodeExecutionCount
}

// IncrementNodeExecutionCount adds one to the node-execution counter and returns the new value.
func (c *controlFlow) IncrementNodeExecutionCount() uint32 {
	c.nodeExecutionCount++
	return c.nodeExecutionCount
}

// CurrentTaskExecutionCount returns the number of task executions counted so far.
func (c *controlFlow) CurrentTaskExecutionCount() uint32 {
	return c.taskExecutionCount
}

// IncrementTaskExecutionCount adds one to the task-execution counter and returns the new value.
func (c *controlFlow) IncrementTaskExecutionCount() uint32 {
	c.taskExecutionCount++
	return c.taskExecutionCount
}

func NewExecutionContextWithTasksGetter(prevExecContext ExecutionContext, taskGetter TaskDetailsGetter) ExecutionContext {
Expand Down Expand Up @@ -114,6 +138,8 @@ func NewParentInfo(uniqueID string, currentAttempts uint32) ImmutableParentInfo

// InitializeControlFlow returns a fresh ControlFlow with every counter
// explicitly zeroed, ready for a new evaluation round.
//
// NOTE(review): the scraped diff retained the removed-side line `v: 0,`
// alongside the renamed fields; the stale field no longer exists on
// controlFlow and would not compile, so it is dropped here.
func InitializeControlFlow() ControlFlow {
	return &controlFlow{
		parallelism:        0,
		nodeExecutionCount: 0,
		taskExecutionCount: 0,
	}
}
128 changes: 128 additions & 0 deletions flytepropeller/pkg/controller/executors/mocks/execution_context.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions flytepropeller/pkg/controller/nodes/array/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ func createNodeExecutionContext(dataStore *storage.DataStore, eventRecorder inte
},
nil,
)
executionContext.OnIncrementNodeExecutionCount().Return(1)
executionContext.OnIncrementTaskExecutionCount().Return(1)
executionContext.OnCurrentNodeExecutionCount().Return(1)
executionContext.OnCurrentTaskExecutionCount().Return(1)
nCtx.OnExecutionContext().Return(executionContext)

// EventsRecorder
Expand Down
10 changes: 10 additions & 0 deletions flytepropeller/pkg/controller/nodes/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,15 @@ func (c *recursiveNodeExecutor) RecursiveNodeHandler(ctx context.Context, execCo
nodeStatus := nl.GetNodeExecutionStatus(ctx, currentNode.GetID())
nodePhase := nodeStatus.GetPhase()

if nodePhase == v1alpha1.NodePhaseRunning && execContext != nil {
execContext.IncrementNodeExecutionCount()
if currentNode.GetKind() == v1alpha1.NodeKindTask {
execContext.IncrementTaskExecutionCount()
}
logger.Debugf(currentNodeCtx, "recursive handler - node execution count [%v], task execution count [%v], phase [%v], ",
execContext.CurrentNodeExecutionCount(), execContext.CurrentTaskExecutionCount(), nodePhase.String())
}

if canHandleNode(nodePhase) {
// TODO Follow up Pull Request,
// 1. Rename this method to DAGTraversalHandleNode (accepts a DAGStructure along-with) the remaining arguments
Expand Down Expand Up @@ -287,6 +296,7 @@ func (c *recursiveNodeExecutor) handleDownstream(ctx context.Context, execContex
}), nil
}

logger.Debugf(ctx, "downstream handler starting node id %v, ", downstreamNode.GetID())
state, err := c.RecursiveNodeHandler(ctx, execContext, dag, nl, downstreamNode)
if err != nil {
return interfaces.NodeStatusUndefined, err
Expand Down
Loading

0 comments on commit f1c2231

Please sign in to comment.