Skip to content

Commit

Permalink
Remove ActorScanInterval (dapr#7654)
Browse files Browse the repository at this point in the history
* Remove ActorScanInterval

Signed-off-by: ItalyPaleAle <[email protected]>

* Rename file

Signed-off-by: ItalyPaleAle <[email protected]>

---------

Signed-off-by: ItalyPaleAle <[email protected]>
Co-authored-by: Yaron Schneider <[email protected]>
  • Loading branch information
ItalyPaleAle and yaron2 authored Apr 16, 2024
1 parent a524889 commit 2ce5d5d
Show file tree
Hide file tree
Showing 20 changed files with 245 additions and 206 deletions.
65 changes: 18 additions & 47 deletions pkg/actors/actors.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/dapr/dapr/pkg/retry"
"github.com/dapr/dapr/pkg/runtime/compstore"
"github.com/dapr/dapr/pkg/security"
eventqueue "github.com/dapr/kit/events/queue"
"github.com/dapr/kit/logger"
"github.com/dapr/kit/ptr"
)
Expand All @@ -63,6 +64,9 @@ const (

errStateStoreNotFound = "actors: state store does not exist or incorrectly configured"
errStateStoreNotConfigured = `actors: state store does not exist or incorrectly configured. Have you set the property '{"name": "actorStateStore", "value": "true"}' in your state store component file?`

// If an idle actor is getting deactivated, but it's still busy, will be re-enqueued with its idle timeout increased by this duration.
actorBusyReEnqueueInterval = 10 * time.Second
)

var (
Expand Down Expand Up @@ -110,6 +114,8 @@ type Actors interface {
type GRPCConnectionFn func(ctx context.Context, address string, id string, namespace string, customOpts ...grpc.DialOption) (*grpc.ClientConn, func(destroy bool), error)

type actorsRuntime struct {
idleActorProcessor *eventqueue.Processor[string, *actor]

appChannel channel.AppChannel
placement internal.PlacementService
placementEnabled bool
Expand Down Expand Up @@ -223,6 +229,7 @@ func newActorsWithClock(opts ActorsOpts, clock clock.WithTicker) (ActorRuntime,

a.timers.SetExecuteTimerFn(a.executeTimer)

a.idleActorProcessor = eventqueue.NewProcessor[string, *actor](a.idleProcessorExecuteFn).WithClock(clock)
return a, nil
}

Expand Down Expand Up @@ -305,14 +312,7 @@ func (a *actorsRuntime) Init(ctx context.Context) (err error) {
a.placement.Start(ctx)
}()

a.wg.Add(1)
go func() {
defer a.wg.Done()
a.deactivationTicker(a.actorsConfig, a.haltActor)
}()

log.Infof("Actor runtime started. Actor idle timeout: %v. Actor scan interval: %v",
a.actorsConfig.Config.ActorIdleTimeout, a.actorsConfig.Config.ActorDeactivationScanInterval)
log.Infof("Actor runtime started. Idle timeout: %v", a.actorsConfig.Config.ActorIdleTimeout)

return nil
}
Expand Down Expand Up @@ -464,50 +464,11 @@ func (a *actorsRuntime) deactivateActor(act *actor) error {
return nil
}

func (a *actorsRuntime) removeActorFromTable(actorType, actorID string) {
a.actorsTable.Delete(constructCompositeKey(actorType, actorID))
}

func (a *actorsRuntime) getActorTypeAndIDFromKey(key string) (string, string) {
typ, id, _ := strings.Cut(key, daprSeparator)
return typ, id
}

func (a *actorsRuntime) deactivationTicker(configuration Config, haltFn internal.HaltActorFn) {
ticker := a.clock.NewTicker(configuration.ActorDeactivationScanInterval)
ch := ticker.C()
defer ticker.Stop()

for {
select {
case t := <-ch:
a.actorsTable.Range(func(key, value any) bool {
actorInstance := value.(*actor)

if actorInstance.isBusy() {
return true
}

if !t.Before(actorInstance.ScheduledTime()) {
a.wg.Add(1)
go func(actorKey string) {
defer a.wg.Done()
actorType, actorID := a.getActorTypeAndIDFromKey(actorKey)
err := haltFn(actorType, actorID)
if err != nil {
log.Errorf("failed to deactivate actor %s: %s", actorKey, err)
}
}(key.(string))
}

return true
})
case <-a.closeCh:
return
}
}
}

// Returns an internal actor instance, allocating it if needed.
// If the actor type does not correspond to an internal actor, the returned boolean is false
func (a *actorsRuntime) getInternalActor(actorType string, actorID string) (InternalActor, bool) {
Expand Down Expand Up @@ -648,6 +609,10 @@ func (a *actorsRuntime) callLocalActor(ctx context.Context, req *internalv1pb.In
if err != nil {
return nil, status.Error(codes.ResourceExhausted, err.Error())
}
err = a.idleActorProcessor.Enqueue(act)
if err != nil {
return nil, fmt.Errorf("failed to enqueue actor in idle processor: %w", err)
}
defer act.unlock()

// Replace method to actors method.
Expand Down Expand Up @@ -1302,6 +1267,12 @@ func (a *actorsRuntime) Close() error {
errs = append(errs, fmt.Errorf("failed to close placement service: %w", err))
}
}
if a.idleActorProcessor != nil {
err := a.idleActorProcessor.Close()
if err != nil {
errs = append(errs, fmt.Errorf("failed to close actor idle processor: %w", err))
}
}
}

return errors.Join(errs...)
Expand Down
Loading

0 comments on commit 2ce5d5d

Please sign in to comment.