diff --git a/pkg/actors/actors.go b/pkg/actors/actors.go index 91138e01a34..deb5782dc23 100644 --- a/pkg/actors/actors.go +++ b/pkg/actors/actors.go @@ -53,6 +53,7 @@ import ( "github.com/dapr/dapr/pkg/retry" "github.com/dapr/dapr/pkg/runtime/compstore" "github.com/dapr/dapr/pkg/security" + eventqueue "github.com/dapr/kit/events/queue" "github.com/dapr/kit/logger" "github.com/dapr/kit/ptr" ) @@ -63,6 +64,9 @@ const ( errStateStoreNotFound = "actors: state store does not exist or incorrectly configured" errStateStoreNotConfigured = `actors: state store does not exist or incorrectly configured. Have you set the property '{"name": "actorStateStore", "value": "true"}' in your state store component file?` + + // If an idle actor is getting deactivated, but it's still busy, will be re-enqueued with its idle timeout increased by this duration. + actorBusyReEnqueueInterval = 10 * time.Second ) var ( @@ -110,6 +114,8 @@ type Actors interface { type GRPCConnectionFn func(ctx context.Context, address string, id string, namespace string, customOpts ...grpc.DialOption) (*grpc.ClientConn, func(destroy bool), error) type actorsRuntime struct { + idleActorProcessor *eventqueue.Processor[string, *actor] + appChannel channel.AppChannel placement internal.PlacementService placementEnabled bool @@ -223,6 +229,7 @@ func newActorsWithClock(opts ActorsOpts, clock clock.WithTicker) (ActorRuntime, a.timers.SetExecuteTimerFn(a.executeTimer) + a.idleActorProcessor = eventqueue.NewProcessor[string, *actor](a.idleProcessorExecuteFn).WithClock(clock) return a, nil } @@ -305,14 +312,7 @@ func (a *actorsRuntime) Init(ctx context.Context) (err error) { a.placement.Start(ctx) }() - a.wg.Add(1) - go func() { - defer a.wg.Done() - a.deactivationTicker(a.actorsConfig, a.haltActor) - }() - - log.Infof("Actor runtime started. Actor idle timeout: %v. Actor scan interval: %v", - a.actorsConfig.Config.ActorIdleTimeout, a.actorsConfig.Config.ActorDeactivationScanInterval) + log.Infof("Actor runtime started. Idle timeout: %v", a.actorsConfig.Config.ActorIdleTimeout) return nil } @@ -464,50 +464,11 @@ func (a *actorsRuntime) deactivateActor(act *actor) error { return nil } -func (a *actorsRuntime) removeActorFromTable(actorType, actorID string) { - a.actorsTable.Delete(constructCompositeKey(actorType, actorID)) -} - func (a *actorsRuntime) getActorTypeAndIDFromKey(key string) (string, string) { typ, id, _ := strings.Cut(key, daprSeparator) return typ, id } -func (a *actorsRuntime) deactivationTicker(configuration Config, haltFn internal.HaltActorFn) { - ticker := a.clock.NewTicker(configuration.ActorDeactivationScanInterval) - ch := ticker.C() - defer ticker.Stop() - - for { - select { - case t := <-ch: - a.actorsTable.Range(func(key, value any) bool { - actorInstance := value.(*actor) - - if actorInstance.isBusy() { - return true - } - - if !t.Before(actorInstance.ScheduledTime()) { - a.wg.Add(1) - go func(actorKey string) { - defer a.wg.Done() - actorType, actorID := a.getActorTypeAndIDFromKey(actorKey) - err := haltFn(actorType, actorID) - if err != nil { - log.Errorf("failed to deactivate actor %s: %s", actorKey, err) - } - }(key.(string)) - } - - return true - }) - case <-a.closeCh: - return - } - } -} - // Returns an internal actor instance, allocating it if needed. // If the actor type does not correspond to an internal actor, the returned boolean is false func (a *actorsRuntime) getInternalActor(actorType string, actorID string) (InternalActor, bool) { @@ -648,6 +609,10 @@ func (a *actorsRuntime) callLocalActor(ctx context.Context, req *internalv1pb.In if err != nil { return nil, status.Error(codes.ResourceExhausted, err.Error()) } + err = a.idleActorProcessor.Enqueue(act) + if err != nil { + return nil, fmt.Errorf("failed to enqueue actor in idle processor: %w", err) + } defer act.unlock() // Replace method to actors method. @@ -1302,6 +1267,12 @@ func (a *actorsRuntime) Close() error { errs = append(errs, fmt.Errorf("failed to close placement service: %w", err)) } } + if a.idleActorProcessor != nil { + err := a.idleActorProcessor.Close() + if err != nil { + errs = append(errs, fmt.Errorf("failed to close actor idle processor: %w", err)) + } + } } return errors.Join(errs...) diff --git a/pkg/actors/actors_test.go b/pkg/actors/actors_test.go index 7ee7f4256be..3fce8c5dd6c 100644 --- a/pkg/actors/actors_test.go +++ b/pkg/actors/actors_test.go @@ -28,6 +28,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/exp/slices" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" kclock "k8s.io/utils/clock" @@ -46,6 +47,7 @@ import ( "github.com/dapr/dapr/pkg/resiliency" "github.com/dapr/dapr/pkg/runtime/compstore" daprt "github.com/dapr/dapr/pkg/testing" + eventqueue "github.com/dapr/kit/events/queue" "github.com/dapr/kit/ptr" ) @@ -59,7 +61,6 @@ const ( var DefaultAppConfig = config.ApplicationConfig{ Entities: []string{"1", "reentrantActor"}, ActorIdleTimeout: "1s", - ActorScanInterval: "2s", DrainOngoingCallTimeout: "3s", DrainRebalancedActors: true, Reentrancy: config.ReentrancyConfig{}, @@ -313,56 +314,24 @@ func fakeCallAndActivateActor(actors *actorsRuntime, actorType, actorID string, actorKey := constructCompositeKey(actorType, actorID) act := newActor(actorType, actorID, &reentrancyStackDepth, actors.actorsConfig.GetIdleTimeoutForType(actorType), clock) actors.actorsTable.LoadOrStore(actorKey, act) + actors.idleActorProcessor.Enqueue(act) } -func deactivateActorWithDuration(testActorsRuntime *actorsRuntime, actorType, actorID string) <-chan struct{} { - fakeCallAndActivateActor(testActorsRuntime, actorType, actorID, testActorsRuntime.clock) - - ch := make(chan struct{}, 1) - go testActorsRuntime.deactivationTicker(testActorsRuntime.actorsConfig, func(at, aid string) error { - if actorType == at { - testActorsRuntime.removeActorFromTable(at, aid) - ch <- struct{}{} - } - return nil - }) - return ch -} - -func assertTestSignal(t *testing.T, clock *clocktesting.FakeClock, ch <-chan struct{}) { - t.Helper() +func deactivateActorsCh(testActorsRuntime *actorsRuntime) <-chan string { + ch := make(chan string, 2) - end := clock.Now().Add(700 * time.Millisecond) - - for { - select { - case <-ch: - // all good + // Replace the processor with a mock one that returns deactivated actors in a channel + testActorsRuntime.idleActorProcessor = eventqueue.NewProcessor[string, *actor](func(act *actor) { + if !testActorsRuntime.idleActorBusyCheck(act) { return - default: } - if clock.Now().After(end) { - require.Fail(t, "did not receive signal in 700ms") - } + key := act.Key() + testActorsRuntime.actorsTable.Delete(key) + ch <- constructCompositeKey(key) + }).WithClock(testActorsRuntime.clock) - // The signal is sent in a background goroutine, so we need to use a wall - // clock here - time.Sleep(time.Millisecond * 5) - advanceTickers(t, clock, time.Millisecond*10) - } -} - -func assertNoTestSignal(t *testing.T, ch <-chan struct{}) { - t.Helper() - - // The signal is sent in a background goroutine, so we need to use a wall clock here - select { - case <-ch: - t.Fatalf("received unexpected signal") - case <-time.After(500 * time.Millisecond): - // all good - } + return ch } // Makes tickers advance @@ -377,49 +346,55 @@ func advanceTickers(t *testing.T, clock *clocktesting.FakeClock, step time.Durat clock.Step(step) } -func TestDeactivationTicker(t *testing.T) { - t.Run("actor is deactivated", func(t *testing.T) { +func TestDeactivateIdleActors(t *testing.T) { + actorType, actorID := getTestActorTypeAndID() + actorKey := constructCompositeKey(actorType, actorID) + + t.Run("idle actor is deactivated", func(t *testing.T) { testActorsRuntime := newTestActorsRuntime(t) + testActorsRuntime.actorsConfig.ActorIdleTimeout = time.Second * 2 defer testActorsRuntime.Close() clock := testActorsRuntime.clock.(*clocktesting.FakeClock) - actorType, actorID := getTestActorTypeAndID() - actorKey := constructCompositeKey(actorType, actorID) - - testActorsRuntime.actorsConfig.ActorIdleTimeout = time.Second * 2 - testActorsRuntime.actorsConfig.ActorDeactivationScanInterval = time.Second * 1 + ch := deactivateActorsCh(testActorsRuntime) - ch := deactivateActorWithDuration(testActorsRuntime, actorType, actorID) + fakeCallAndActivateActor(testActorsRuntime, actorType, actorID, testActorsRuntime.clock) _, exists := testActorsRuntime.actorsTable.Load(actorKey) assert.True(t, exists) - advanceTickers(t, clock, time.Second*3) - - assertTestSignal(t, clock, ch) + clock.Step(2 * time.Second) + select { + case deactivatedKey := <-ch: + assert.Equal(t, actorKey, deactivatedKey) + case <-time.After(700 * time.Millisecond): + t.Errorf("Did not receive signal in time") + } _, exists = testActorsRuntime.actorsTable.Load(actorKey) assert.False(t, exists) }) - t.Run("actor is not deactivated", func(t *testing.T) { + t.Run("non-idle actor is not deactivated", func(t *testing.T) { testActorsRuntime := newTestActorsRuntime(t) + testActorsRuntime.actorsConfig.ActorIdleTimeout = time.Second * 5 defer testActorsRuntime.Close() clock := testActorsRuntime.clock.(*clocktesting.FakeClock) - actorType, actorID := getTestActorTypeAndID() - actorKey := constructCompositeKey(actorType, actorID) + ch := deactivateActorsCh(testActorsRuntime) - testActorsRuntime.actorsConfig.ActorIdleTimeout = time.Second * 5 - testActorsRuntime.actorsConfig.ActorDeactivationScanInterval = time.Second * 1 - - ch := deactivateActorWithDuration(testActorsRuntime, actorType, actorID) + fakeCallAndActivateActor(testActorsRuntime, actorType, actorID, testActorsRuntime.clock) _, exists := testActorsRuntime.actorsTable.Load(actorKey) assert.True(t, exists) - advanceTickers(t, clock, time.Second*3) - assertNoTestSignal(t, ch) + clock.Step(3 * time.Second) + select { + case <-ch: + t.Errorf("Received unexpected signal") + case <-time.After(500 * time.Millisecond): + // all good + } _, exists = testActorsRuntime.actorsTable.Load(actorKey) assert.True(t, exists) @@ -430,26 +405,120 @@ func TestDeactivationTicker(t *testing.T) { defer testActorsRuntime.Close() clock := testActorsRuntime.clock.(*clocktesting.FakeClock) - firstType := "a" - secondType := "b" - actorID := "1" + ch := deactivateActorsCh(testActorsRuntime) - testActorsRuntime.actorsConfig.EntityConfigs[firstType] = internal.EntityConfig{Entities: []string{firstType}, ActorIdleTimeout: time.Second * 2} - testActorsRuntime.actorsConfig.EntityConfigs[secondType] = internal.EntityConfig{Entities: []string{secondType}, ActorIdleTimeout: time.Second * 5} - testActorsRuntime.actorsConfig.ActorDeactivationScanInterval = time.Second * 1 + testActorsRuntime.actorsConfig.EntityConfigs["a"] = internal.EntityConfig{ + Entities: []string{"a"}, + ActorIdleTimeout: time.Second * 2, + } + testActorsRuntime.actorsConfig.EntityConfigs["b"] = internal.EntityConfig{ + Entities: []string{"b"}, + ActorIdleTimeout: time.Second * 5, + } - ch1 := deactivateActorWithDuration(testActorsRuntime, firstType, actorID) - ch2 := deactivateActorWithDuration(testActorsRuntime, secondType, actorID) + // Actors a||1 and a||2 should be deactivated in 2s, while b||1 in 5s + fakeCallAndActivateActor(testActorsRuntime, "a", "1", testActorsRuntime.clock) + fakeCallAndActivateActor(testActorsRuntime, "a", "2", testActorsRuntime.clock) + fakeCallAndActivateActor(testActorsRuntime, "b", "1", testActorsRuntime.clock) + + collectSignals := func() []string { + signals := make([]string, 0) + deadline := time.After(700 * time.Millisecond) + for { + select { + case <-deadline: + slices.Sort(signals) + return signals + case key := <-ch: + signals = append(signals, key) + } + } + } - advanceTickers(t, clock, time.Second*2) - assertTestSignal(t, clock, ch1) - assertNoTestSignal(t, ch2) + assertActorExists := func(t *testing.T, actorKey string, expectExists bool) { + _, exists := testActorsRuntime.actorsTable.Load(actorKey) + assert.Equal(t, expectExists, exists) + } - _, exists := testActorsRuntime.actorsTable.Load(constructCompositeKey(firstType, actorID)) - assert.False(t, exists) + // After 2s, we should have a||1 and a||2 deactivated only + advanceTickers(t, clock, 2*time.Second) + assert.Equal(t, []string{constructCompositeKey("a", "1"), constructCompositeKey("a", "2")}, collectSignals()) + assertActorExists(t, constructCompositeKey("a", "1"), false) + assertActorExists(t, constructCompositeKey("a", "2"), false) + assertActorExists(t, constructCompositeKey("b", "1"), true) + + // Activate a||3 which should be deactivated in 2s, before b||1 + fakeCallAndActivateActor(testActorsRuntime, "a", "3", testActorsRuntime.clock) + advanceTickers(t, clock, 2*time.Second) + assert.Equal(t, []string{constructCompositeKey("a", "3")}, collectSignals()) + assertActorExists(t, constructCompositeKey("a", "3"), false) + assertActorExists(t, constructCompositeKey("b", "1"), true) + + // Activate a||4 which should be deactivated in 2s + // But first, in 1s, b||1 should be deactivated too + fakeCallAndActivateActor(testActorsRuntime, "a", "4", testActorsRuntime.clock) + advanceTickers(t, clock, 1*time.Second) + assert.Equal(t, []string{constructCompositeKey("b", "1")}, collectSignals()) + assertActorExists(t, constructCompositeKey("b", "1"), false) + assertActorExists(t, constructCompositeKey("a", "4"), true) + + // Lastly, a||4 should be deactivated in 1s + advanceTickers(t, clock, 1*time.Second) + assert.Equal(t, []string{constructCompositeKey("a", "4")}, collectSignals()) + assertActorExists(t, constructCompositeKey("a", "4"), false) + }) - _, exists = testActorsRuntime.actorsTable.Load(constructCompositeKey(secondType, actorID)) - assert.True(t, exists) + t.Run("actor is still busy", func(t *testing.T) { + testActorsRuntime := newTestActorsRuntime(t) + testActorsRuntime.actorsConfig.ActorIdleTimeout = time.Second * 2 + defer testActorsRuntime.Close() + clock := testActorsRuntime.clock.(*clocktesting.FakeClock) + + ch := deactivateActorsCh(testActorsRuntime) + + fakeCallAndActivateActor(testActorsRuntime, actorType, actorID, testActorsRuntime.clock) + + actAny, ok := testActorsRuntime.actorsTable.Load(actorKey) + require.True(t, ok) + act, ok := actAny.(*actor) + require.True(t, ok) + + // Get the idleAt timeout + idleAt := *act.idleAt.Load() + require.Equal(t, clock.Now().Add(2*time.Second), idleAt) + + // Lock the actor so it's busy + act.lock(nil) + + // Advance by 3s which should trigger the tick + // The actor should not be deactivated + clock.Step(3 * time.Second) + select { + case <-ch: + t.Errorf("Received unexpected signal") + case <-time.After(500 * time.Millisecond): + // all good + } + _, ok = testActorsRuntime.actorsTable.Load(actorKey) + require.True(t, ok) + + // The actor's idleAt time should have increased + newIdleAt := *act.idleAt.Load() + require.Equal(t, clock.Now().Add(actorBusyReEnqueueInterval), newIdleAt) + + // Unlock the actor + act.unlock() + + // Advance by actorBusyReEnqueueInterval and ensure the actor now is deactivated + clock.Step(actorBusyReEnqueueInterval) + select { + case deactivatedKey := <-ch: + assert.Equal(t, actorKey, deactivatedKey) + case <-time.After(700 * time.Millisecond): + t.Errorf("Did not receive signal in time") + } + _, ok = testActorsRuntime.actorsTable.Load(actorKey) + require.False(t, ok) }) } diff --git a/pkg/actors/config.go b/pkg/actors/config.go index 79a9ffa9258..0df7f879b80 100644 --- a/pkg/actors/config.go +++ b/pkg/actors/config.go @@ -31,7 +31,6 @@ type Config struct { const ( defaultActorIdleTimeout = time.Minute * 60 defaultHeartbeatInterval = time.Second * 1 - defaultActorScanInterval = time.Second * 30 defaultOngoingCallTimeout = time.Second * 60 defaultReentrancyStackLimit = 32 ) @@ -54,30 +53,24 @@ type ConfigOpts struct { // NewConfig returns the actor runtime configuration. func NewConfig(opts ConfigOpts) Config { c := internal.Config{ - HostAddress: opts.HostAddress, - AppID: opts.AppID, - ActorsService: opts.ActorsService, - RemindersService: opts.RemindersService, - Port: opts.Port, - Namespace: opts.Namespace, - DrainRebalancedActors: opts.AppConfig.DrainRebalancedActors, - HostedActorTypes: internal.NewHostedActors(opts.AppConfig.Entities), - Reentrancy: opts.AppConfig.Reentrancy, - RemindersStoragePartitions: opts.AppConfig.RemindersStoragePartitions, - HealthHTTPClient: opts.HealthHTTPClient, - HealthEndpoint: opts.HealthEndpoint, - HeartbeatInterval: defaultHeartbeatInterval, - ActorDeactivationScanInterval: defaultActorScanInterval, - ActorIdleTimeout: defaultActorIdleTimeout, - DrainOngoingCallTimeout: defaultOngoingCallTimeout, - EntityConfigs: make(map[string]internal.EntityConfig), - AppChannelAddress: opts.AppChannelAddress, - PodName: opts.PodName, - } - - scanDuration, err := time.ParseDuration(opts.AppConfig.ActorScanInterval) - if err == nil { - c.ActorDeactivationScanInterval = scanDuration + HostAddress: opts.HostAddress, + AppID: opts.AppID, + ActorsService: opts.ActorsService, + RemindersService: opts.RemindersService, + Port: opts.Port, + Namespace: opts.Namespace, + DrainRebalancedActors: opts.AppConfig.DrainRebalancedActors, + HostedActorTypes: internal.NewHostedActors(opts.AppConfig.Entities), + Reentrancy: opts.AppConfig.Reentrancy, + RemindersStoragePartitions: opts.AppConfig.RemindersStoragePartitions, + HealthHTTPClient: opts.HealthHTTPClient, + HealthEndpoint: opts.HealthEndpoint, + HeartbeatInterval: defaultHeartbeatInterval, + ActorIdleTimeout: defaultActorIdleTimeout, + DrainOngoingCallTimeout: defaultOngoingCallTimeout, + EntityConfigs: make(map[string]internal.EntityConfig), + AppChannelAddress: opts.AppChannelAddress, + PodName: opts.PodName, } idleDuration, err := time.ParseDuration(opts.AppConfig.ActorIdleTimeout) diff --git a/pkg/actors/config_test.go b/pkg/actors/config_test.go index 73d225fc548..4de04c293e3 100644 --- a/pkg/actors/config_test.go +++ b/pkg/actors/config_test.go @@ -33,7 +33,6 @@ const ( func TestConfig(t *testing.T) { config := config.ApplicationConfig{ Entities: []string{"1"}, - ActorScanInterval: "1s", ActorIdleTimeout: "2s", DrainOngoingCallTimeout: "3s", DrainRebalancedActors: true, @@ -54,7 +53,6 @@ func TestConfig(t *testing.T) { assert.Equal(t, "placement:placement:5050", c.ActorsService) assert.Equal(t, internal.NewHostedActors([]string{"1"}), c.HostedActorTypes) assert.Equal(t, 3500, c.Port) - assert.Equal(t, "1s", c.ActorDeactivationScanInterval.String()) assert.Equal(t, "2s", c.ActorIdleTimeout.String()) assert.Equal(t, "3s", c.DrainOngoingCallTimeout.String()) assert.True(t, c.DrainRebalancedActors) @@ -153,7 +151,6 @@ func TestDefaultConfigValuesSet(t *testing.T) { assert.Equal(t, Port, config.Port) assert.Equal(t, Namespace, config.Namespace) assert.NotNil(t, config.ActorIdleTimeout) - assert.NotNil(t, config.ActorDeactivationScanInterval) assert.NotNil(t, config.DrainOngoingCallTimeout) assert.NotNil(t, config.DrainRebalancedActors) } @@ -162,7 +159,6 @@ func TestPerActorTypeConfigurationValues(t *testing.T) { appConfig := config.ApplicationConfig{ Entities: []string{"actor1", "actor2", "actor3", "actor4"}, ActorIdleTimeout: "1s", - ActorScanInterval: "2s", DrainOngoingCallTimeout: "5s", DrainRebalancedActors: true, RemindersStoragePartitions: 1, @@ -201,7 +197,6 @@ func TestPerActorTypeConfigurationValues(t *testing.T) { assert.Equal(t, Port, config.Port) assert.Equal(t, Namespace, config.Namespace) assert.Equal(t, time.Second, config.ActorIdleTimeout) - assert.Equal(t, time.Second*2, config.ActorDeactivationScanInterval) assert.Equal(t, time.Second*5, config.DrainOngoingCallTimeout) assert.True(t, config.DrainRebalancedActors) @@ -234,7 +229,6 @@ func TestOnlyHostedActorTypesAreIncluded(t *testing.T) { appConfig := config.ApplicationConfig{ Entities: []string{"actor1", "actor2"}, ActorIdleTimeout: "1s", - ActorScanInterval: "2s", DrainOngoingCallTimeout: "5s", DrainRebalancedActors: true, RemindersStoragePartitions: 1, diff --git a/pkg/actors/idleprocessor.go b/pkg/actors/idleprocessor.go new file mode 100644 index 00000000000..c580f51c9d2 --- /dev/null +++ b/pkg/actors/idleprocessor.go @@ -0,0 +1,42 @@ +/* +Copyright 2024 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package actors + +import ( + "github.com/dapr/kit/ptr" +) + +func (a *actorsRuntime) idleProcessorExecuteFn(act *actor) { + // This function is outlined for testing + if !a.idleActorBusyCheck(act) { + return + } + + // Proceed with deactivating the actor + err := a.deactivateActor(act) + if err != nil { + log.Errorf("Failed to deactivate actor %s: %v", act.Key(), err) + } +} + +func (a *actorsRuntime) idleActorBusyCheck(act *actor) bool { + // If the actor is still busy, we will increase its idle time and re-enqueue it + if !act.isBusy() { + return true + } + + act.idleAt.Store(ptr.Of(a.clock.Now().Add(actorBusyReEnqueueInterval))) + a.idleActorProcessor.Enqueue(act) + return false +} diff --git a/pkg/actors/internal/config.go b/pkg/actors/internal/config.go index 5fd2f76b6c1..2bc8a5ca9f7 100644 --- a/pkg/actors/internal/config.go +++ b/pkg/actors/internal/config.go @@ -27,25 +27,24 @@ import ( // Config is the actor runtime configuration. type Config struct { - HostAddress string - AppID string - ActorsService string - RemindersService string - HostedActorTypes *hostedActors - Port int - HeartbeatInterval time.Duration - ActorDeactivationScanInterval time.Duration - ActorIdleTimeout time.Duration - DrainOngoingCallTimeout time.Duration - DrainRebalancedActors bool - Namespace string - Reentrancy daprAppConfig.ReentrancyConfig - RemindersStoragePartitions int - EntityConfigs map[string]EntityConfig - HealthHTTPClient *http.Client - HealthEndpoint string - AppChannelAddress string - PodName string + HostAddress string + AppID string + ActorsService string + RemindersService string + HostedActorTypes *hostedActors + Port int + HeartbeatInterval time.Duration + ActorIdleTimeout time.Duration + DrainOngoingCallTimeout time.Duration + DrainRebalancedActors bool + Namespace string + Reentrancy daprAppConfig.ReentrancyConfig + RemindersStoragePartitions int + EntityConfigs map[string]EntityConfig + HealthHTTPClient *http.Client + HealthEndpoint string + AppChannelAddress string + PodName string } func (c Config) GetRuntimeHostname() string { diff --git a/pkg/actors/internal/placement_service.go b/pkg/actors/internal/placement_service.go index 00e5cc3857b..52d63da1a6b 100644 --- a/pkg/actors/internal/placement_service.go +++ b/pkg/actors/internal/placement_service.go @@ -51,6 +51,7 @@ type PlacementService interface { type LookupActorRequest struct { ActorType string ActorID string + NoCache bool } // ActorKey returns the key for the actor, which is "type/id". diff --git a/pkg/actors/reminders/reminders_test.go b/pkg/actors/reminders/reminders_test.go index 58f16872d7e..1578cf4303e 100644 --- a/pkg/actors/reminders/reminders_test.go +++ b/pkg/actors/reminders/reminders_test.go @@ -464,10 +464,6 @@ func newTestRemindersWithMockAndActorMetadataPartition() *reminders { RemindersStoragePartitions: appConfig.RemindersStoragePartitions, EntityConfigs: make(map[string]internal.EntityConfig), } - scanDuration, err := time.ParseDuration(appConfig.ActorScanInterval) - if err == nil { - conf.ActorDeactivationScanInterval = scanDuration - } idleDuration, err := time.ParseDuration(appConfig.ActorIdleTimeout) if err == nil { diff --git a/pkg/config/app_configuration.go b/pkg/config/app_configuration.go index c744d3db325..89aa2410eb5 100644 --- a/pkg/config/app_configuration.go +++ b/pkg/config/app_configuration.go @@ -18,8 +18,6 @@ type ApplicationConfig struct { Entities []string `json:"entities"` // Duration. example: "1h". ActorIdleTimeout string `json:"actorIdleTimeout"` - // Duration. example: "30s". This value is global. - ActorScanInterval string `json:"actorScanInterval"` // Duration. example: "30s". DrainOngoingCallTimeout string `json:"drainOngoingCallTimeout"` DrainRebalancedActors bool `json:"drainRebalancedActors"` diff --git a/pkg/runtime/runtime_test.go b/pkg/runtime/runtime_test.go index 4defe277538..afb9fd1e0c8 100644 --- a/pkg/runtime/runtime_test.go +++ b/pkg/runtime/runtime_test.go @@ -944,7 +944,6 @@ func TestActorReentrancyConfig(t *testing.T) { fullConfig := `{ "entities":["actorType1", "actorType2"], "actorIdleTimeout": "1h", - "actorScanInterval": "30s", "drainOngoingCallTimeout": "30s", "drainRebalancedActors": true, "reentrancy": { @@ -957,7 +956,6 @@ func TestActorReentrancyConfig(t *testing.T) { minimumConfig := `{ "entities":["actorType1", "actorType2"], "actorIdleTimeout": "1h", - "actorScanInterval": "30s", "drainOngoingCallTimeout": "30s", "drainRebalancedActors": true, "reentrancy": { @@ -968,7 +966,6 @@ func TestActorReentrancyConfig(t *testing.T) { emptyConfig := `{ "entities":["actorType1", "actorType2"], "actorIdleTimeout": "1h", - "actorScanInterval": "30s", "drainOngoingCallTimeout": "30s", "drainRebalancedActors": true }` diff --git a/tests/apps/actorapp/app.go b/tests/apps/actorapp/app.go index 38bc7368d1a..7be1af64965 100644 --- a/tests/apps/actorapp/app.go +++ b/tests/apps/actorapp/app.go @@ -35,7 +35,6 @@ const ( registeredActorType = "testactor" // Actor type must be unique per test app. actorIdleTimeout = "5s" // Short idle timeout. - actorScanInterval = "1s" // Smaller then actorIdleTimeout and short for speedy test. drainOngoingCallTimeout = "1s" drainRebalancedActors = true ) @@ -57,7 +56,6 @@ type actorLogEntry struct { type daprConfig struct { Entities []string `json:"entities,omitempty"` ActorIdleTimeout string `json:"actorIdleTimeout,omitempty"` - ActorScanInterval string `json:"actorScanInterval,omitempty"` DrainOngoingCallTimeout string `json:"drainOngoingCallTimeout,omitempty"` DrainRebalancedActors bool `json:"drainRebalancedActors,omitempty"` } @@ -65,7 +63,6 @@ type daprConfig struct { var daprConfigResponse = daprConfig{ []string{registeredActorType}, actorIdleTimeout, - actorScanInterval, drainOngoingCallTimeout, drainRebalancedActors, } diff --git a/tests/apps/actorfeatures/app.go b/tests/apps/actorfeatures/app.go index 62e8c9352a8..d73215efe38 100644 --- a/tests/apps/actorfeatures/app.go +++ b/tests/apps/actorfeatures/app.go @@ -41,7 +41,6 @@ const ( actorTypeEnvName = "TEST_APP_ACTOR_TYPE" // To set to change actor type. actorRemindersPartitionsEnvName = "TEST_APP_ACTOR_REMINDERS_PARTITIONS" // To set actor type partition count. actorIdleTimeout = "1h" - actorScanInterval = "30s" drainOngoingCallTimeout = "30s" drainRebalancedActors = true secondsToWaitInMethod = 5 @@ -82,7 +81,6 @@ type actorLogEntry struct { type daprConfig struct { Entities []string `json:"entities,omitempty"` ActorIdleTimeout string `json:"actorIdleTimeout,omitempty"` - ActorScanInterval string `json:"actorScanInterval,omitempty"` DrainOngoingCallTimeout string `json:"drainOngoingCallTimeout,omitempty"` DrainRebalancedActors bool `json:"drainRebalancedActors,omitempty"` RemindersStoragePartitions int `json:"remindersStoragePartitions,omitempty"` @@ -234,7 +232,6 @@ func configHandler(w http.ResponseWriter, r *http.Request) { daprConfigResponse := daprConfig{ []string{getActorType()}, actorIdleTimeout, - actorScanInterval, drainOngoingCallTimeout, drainRebalancedActors, getActorRemindersPartitions(), diff --git a/tests/apps/actorinvocationapp/app.go b/tests/apps/actorinvocationapp/app.go index 309c35f80a3..8b4e9084e53 100644 --- a/tests/apps/actorinvocationapp/app.go +++ b/tests/apps/actorinvocationapp/app.go @@ -36,7 +36,6 @@ const ( actorTypesEnvName = "TEST_APP_ACTOR_TYPES" // To set to change actor types. actorIdleTimeout = "5s" // Short idle timeout. - actorScanInterval = "1s" // Smaller then actorIdleTimeout and short for speedy test. drainOngoingCallTimeout = "1s" drainRebalancedActors = true ) @@ -68,7 +67,6 @@ type callRequest struct { type daprConfig struct { Entities []string `json:"entities,omitempty"` ActorIdleTimeout string `json:"actorIdleTimeout,omitempty"` - ActorScanInterval string `json:"actorScanInterval,omitempty"` DrainOngoingCallTimeout string `json:"drainOngoingCallTimeout,omitempty"` DrainRebalancedActors bool `json:"drainRebalancedActors,omitempty"` } @@ -76,7 +74,6 @@ type daprConfig struct { var daprConfigResponse = daprConfig{ getActorTypes(), actorIdleTimeout, - actorScanInterval, drainOngoingCallTimeout, drainRebalancedActors, } diff --git a/tests/apps/actorjava/src/main/java/io/dapr/apps/actor/Application.java b/tests/apps/actorjava/src/main/java/io/dapr/apps/actor/Application.java index a9009cc0585..317e9203e8f 100644 --- a/tests/apps/actorjava/src/main/java/io/dapr/apps/actor/Application.java +++ b/tests/apps/actorjava/src/main/java/io/dapr/apps/actor/Application.java @@ -26,7 +26,6 @@ public class Application { public static void main(String[] args) { ActorRuntime.getInstance().getConfig().setActorIdleTimeout(Duration.ofSeconds(1)); - ActorRuntime.getInstance().getConfig().setActorScanInterval(Duration.ofSeconds(1)); ActorRuntime.getInstance().registerActor(CarActorImpl.class); SpringApplication.run(Application.class, args); } diff --git a/tests/apps/actorload/cmd/stateactor/main.go b/tests/apps/actorload/cmd/stateactor/main.go index c3883e8e513..406d357accf 100644 --- a/tests/apps/actorload/cmd/stateactor/main.go +++ b/tests/apps/actorload/cmd/stateactor/main.go @@ -104,7 +104,6 @@ func main() { service := serve.NewActorService(*appPort, &rt.DaprConfig{ Entities: actorTypes, ActorIdleTimeout: "5m", - ActorScanInterval: "10s", DrainOngoingCallTimeout: "10s", DrainRebalancedActors: true, }) diff --git a/tests/apps/actorload/cmd/stateactor/service/server.go b/tests/apps/actorload/cmd/stateactor/service/server.go index 0a255f83a00..7d90386d1e2 100644 --- a/tests/apps/actorload/cmd/stateactor/service/server.go +++ b/tests/apps/actorload/cmd/stateactor/service/server.go @@ -56,7 +56,6 @@ func NewActorService(port int, config *rt.DaprConfig) *ActorService { daprConfig = rt.DaprConfig{ Entities: []string{}, ActorIdleTimeout: "60m", - ActorScanInterval: "10s", DrainOngoingCallTimeout: "10s", DrainRebalancedActors: true, } diff --git a/tests/apps/actorload/pkg/actor/runtime/config.go b/tests/apps/actorload/pkg/actor/runtime/config.go index e1d47beb8e4..fbc2e1b9662 100644 --- a/tests/apps/actorload/pkg/actor/runtime/config.go +++ b/tests/apps/actorload/pkg/actor/runtime/config.go @@ -17,7 +17,6 @@ package runtime type DaprConfig struct { Entities []string `json:"entities,omitempty"` ActorIdleTimeout string `json:"actorIdleTimeout,omitempty"` - ActorScanInterval string `json:"actorScanInterval,omitempty"` DrainOngoingCallTimeout string `json:"drainOngoingCallTimeout,omitempty"` DrainRebalancedActors bool `json:"drainRebalancedActors,omitempty"` } diff --git a/tests/apps/actorreentrancy/app.go b/tests/apps/actorreentrancy/app.go index 6741722a7c4..b4c93da6fe1 100644 --- a/tests/apps/actorreentrancy/app.go +++ b/tests/apps/actorreentrancy/app.go @@ -34,7 +34,6 @@ const ( actorMethodURLFormat = "http://localhost:%d/v1.0/actors/%s/%s/method/%s" defaultActorType = "reentrantActor" actorIdleTimeout = "1h" - actorScanInterval = "30s" drainOngoingCallTimeout = "30s" drainRebalancedActors = true ) @@ -69,7 +68,6 @@ func (e actorLogEntry) String() string { type daprConfig struct { Entities []string `json:"entities,omitempty"` ActorIdleTimeout string `json:"actorIdleTimeout,omitempty"` - ActorScanInterval string `json:"actorScanInterval,omitempty"` DrainOngoingCallTimeout string `json:"drainOngoingCallTimeout,omitempty"` DrainRebalancedActors bool `json:"drainRebalancedActors,omitempty"` Reentrancy config.ReentrancyConfig `json:"reentrancy,omitempty"` @@ -97,7 +95,6 @@ var ( var daprConfigResponse = daprConfig{ Entities: []string{defaultActorType}, ActorIdleTimeout: actorIdleTimeout, - ActorScanInterval: actorScanInterval, DrainOngoingCallTimeout: drainOngoingCallTimeout, DrainRebalancedActors: drainRebalancedActors, Reentrancy: config.ReentrancyConfig{Enabled: false}, diff --git a/tests/apps/actorstate/app.go b/tests/apps/actorstate/app.go index 7c809b8bc6c..f6481e5575d 100644 --- a/tests/apps/actorstate/app.go +++ b/tests/apps/actorstate/app.go @@ -189,13 +189,11 @@ func configHandler(w http.ResponseWriter, r *http.Request) { json.NewEncoder(w).Encode(struct { Entities []string `json:"entities,omitempty"` ActorIdleTimeout string `json:"actorIdleTimeout,omitempty"` - ActorScanInterval string `json:"actorScanInterval,omitempty"` DrainOngoingCallTimeout string `json:"drainOngoingCallTimeout,omitempty"` DrainRebalancedActors bool `json:"drainRebalancedActors,omitempty"` }{ Entities: []string{"httpMyActorType", "grpcMyActorType"}, ActorIdleTimeout: "30s", - ActorScanInterval: "10s", DrainOngoingCallTimeout: "20s", DrainRebalancedActors: true, }) diff --git a/tests/apps/perf/actorfeatures/app.go b/tests/apps/perf/actorfeatures/app.go index b587cc6c060..857541a6f5f 100644 --- a/tests/apps/perf/actorfeatures/app.go +++ b/tests/apps/perf/actorfeatures/app.go @@ -33,7 +33,6 @@ const ( defaultActorType = "testactorfeatures" // Actor type must be unique per test app. actorTypeEnvName = "TEST_APP_ACTOR_TYPE" // Env variable tests can set to change actor type. actorIdleTimeout = "1h" - actorScanInterval = "30s" drainOngoingCallTimeout = "30s" drainRebalancedActors = true secondsToWaitInMethod = 5 @@ -42,7 +41,6 @@ const ( type daprConfig struct { Entities []string `json:"entities,omitempty"` ActorIdleTimeout string `json:"actorIdleTimeout,omitempty"` - ActorScanInterval string `json:"actorScanInterval,omitempty"` DrainOngoingCallTimeout string `json:"drainOngoingCallTimeout,omitempty"` DrainRebalancedActors bool `json:"drainRebalancedActors,omitempty"` } @@ -52,7 +50,6 @@ var registeredActorType = map[string]bool{} var daprConfigResponse = daprConfig{ getActorType(), actorIdleTimeout, - actorScanInterval, drainOngoingCallTimeout, drainRebalancedActors, }