diff --git a/pkg/sharding/leases/doc.go b/pkg/sharding/leases/doc.go new file mode 100644 index 00000000..0bbe05d2 --- /dev/null +++ b/pkg/sharding/leases/doc.go @@ -0,0 +1,19 @@ +/* +Copyright 2023 Tim Ebert. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package leases implements logic for determining the state of shards based on their membership Lease object. +// It is used by the shardlease controller to maintain the state label. +package leases diff --git a/pkg/sharding/leases/shards.go b/pkg/sharding/leases/shards.go new file mode 100644 index 00000000..b918af5f --- /dev/null +++ b/pkg/sharding/leases/shards.go @@ -0,0 +1,79 @@ +/* +Copyright 2023 Tim Ebert. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package leases + +import ( + "k8s.io/utils/clock" + + coordinationv1 "k8s.io/api/coordination/v1" +) + +type Shard struct { + ID string + State ShardState + Times Times +} + +type Shards []Shard + +func (s Shards) ByID(id string) Shard { + for _, shard := range s { + if shard.ID == id { + return shard + } + } + + return Shard{} +} + +func (s Shards) AvailableShards() Shards { + var shards Shards + for _, shard := range s { + if shard.State.IsAvailable() { + shards = append(shards, shard) + } + } + + return shards +} + +func (s Shards) IDs() []string { + ids := make([]string, len(s)) + for i, shard := range s { + ids[i] = shard.ID + } + + return ids +} + +func ToShards(leases []coordinationv1.Lease, cl clock.PassiveClock) Shards { + shards := make(Shards, 0, len(leases)) + for _, lease := range leases { + l := lease + shards = append(shards, ToShard(&l, cl)) + } + return shards +} + +func ToShard(lease *coordinationv1.Lease, cl clock.PassiveClock) Shard { + times := ToTimes(lease, cl) + return Shard{ + ID: lease.GetName(), + Times: times, + State: toState(lease, times), + } +} diff --git a/pkg/sharding/leases/state.go b/pkg/sharding/leases/state.go new file mode 100644 index 00000000..7958ceec --- /dev/null +++ b/pkg/sharding/leases/state.go @@ -0,0 +1,102 @@ +/* +Copyright 2023 Tim Ebert. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package leases + +import ( + "k8s.io/utils/clock" + + coordinationv1 "k8s.io/api/coordination/v1" +) + +type ShardState int + +const ( + // Unknown is the ShardState if the Lease is not present or misses required fields. + Unknown ShardState = iota + // Orphaned is the ShardState if the Lease has been in state Dead for 1 minute. + Orphaned + // Dead is the ShardState if the Lease is Uncertain and was successfully acquired by the sharder. + Dead + // Uncertain is the ShardState if the Lease has expired more than leaseDuration ago. + Uncertain + // Expired is the ShardState if the Lease has expired less than leaseDuration ago. + Expired + // Ready is the ShardState if the Lease is held by the shard and has not expired. + Ready +) + +func (s ShardState) String() string { + switch s { + case Orphaned: + return "orphaned" + case Dead: + return "dead" + case Uncertain: + return "uncertain" + case Expired: + return "expired" + case Ready: + return "ready" + default: + return "unknown" + } +} + +func StateFromString(state string) ShardState { + switch state { + case "orphaned": + return Orphaned + case "dead": + return Dead + case "uncertain": + return Uncertain + case "expired": + return Expired + case "ready": + return Ready + default: + return Unknown + } +} + +// IsAvailable returns true for shard states that should be considered for object assignment. +func (s ShardState) IsAvailable() bool { + return s >= Uncertain +} + +func ToState(lease *coordinationv1.Lease, cl clock.PassiveClock) ShardState { + return toState(lease, ToTimes(lease, cl)) +} + +func toState(lease *coordinationv1.Lease, t Times) ShardState { + // check if lease was released or acquired by sharder + if holder := lease.Spec.HolderIdentity; holder == nil || *holder == "" || *holder != lease.Name { + if t.ToOrphaned <= 0 { + return Orphaned + } + return Dead + } + + switch { + case t.ToUncertain <= 0: + return Uncertain + case t.ToExpired <= 0: + return Expired + } + + return Ready +} diff --git a/pkg/sharding/leases/times.go b/pkg/sharding/leases/times.go new file mode 100644 index 00000000..a41c7a92 --- /dev/null +++ b/pkg/sharding/leases/times.go @@ -0,0 +1,64 @@ +/* +Copyright 2023 Tim Ebert. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package leases + +import ( + "time" + + "k8s.io/utils/clock" + + coordinationv1 "k8s.io/api/coordination/v1" +) + +const ( + defaultLeaseDuration = 15 * time.Second + leaseTTL = time.Minute +) + +type Times struct { + Expiration time.Time + LeaseDuration time.Duration + + ToExpired time.Duration + ToUncertain time.Duration + ToOrphaned time.Duration +} + +func ToTimes(lease *coordinationv1.Lease, cl clock.PassiveClock) Times { + var ( + t = Times{} + now = cl.Now() + acquireTime = lease.Spec.AcquireTime + renewTime = lease.Spec.RenewTime + durationSeconds = lease.Spec.LeaseDurationSeconds + ) + + if acquireTime == nil || renewTime == nil || durationSeconds == nil { + t.Expiration = now + t.LeaseDuration = defaultLeaseDuration + } else { + t.LeaseDuration = time.Duration(*durationSeconds) * time.Second + t.Expiration = renewTime.Add(t.LeaseDuration) + } + + t.ToExpired = t.Expiration.Sub(now) + t.ToUncertain = t.ToExpired + t.LeaseDuration + // ToOrphaned only applies, if lease is released or acquired by sharded + t.ToOrphaned = t.ToExpired + leaseTTL + + return t +}