Add PartitionInstanceRing.GetReplicationSetForPartitionAndOperation() #623

Merged

Changes from 1 commit
84 changes: 51 additions & 33 deletions ring/partition_instance_ring.go
@@ -63,46 +63,64 @@ func (r *PartitionInstanceRing) GetReplicationSetsForOperation(op Operation) ([]
 	zonesBuffer := make([]string, 0, 3) // Pre-allocate buffer assuming 3 zones.

 	for partitionID := range partitionsRingDesc.Partitions {
-		ownerIDs := partitionsRing.PartitionOwnerIDs(partitionID)
-		instances := make([]InstanceDesc, 0, len(ownerIDs))
-
-		for _, instanceID := range ownerIDs {
-			instance, err := r.instancesRing.GetInstance(instanceID)
-			if err != nil {
-				// If an instance doesn't exist in the instances ring we don't return an error,
-				// but look up other instances of the partition.
-				continue
-			}
-
-			if !instance.IsHealthy(op, r.heartbeatTimeout, now) {
-				continue
-			}
-
-			instances = append(instances, instance)
-		}
-
-		if len(instances) == 0 {
-			return nil, fmt.Errorf("partition %d: %w", partitionID, ErrTooManyUnhealthyInstances)
-		}
-
-		// Count the number of unique zones among instances.
-		zonesBuffer = uniqueZonesFromInstances(instances, zonesBuffer[:0])
-		uniqueZones := len(zonesBuffer)
-
-		result = append(result, ReplicationSet{
-			Instances: instances,
-
-			// Partitions have no concept of zones, but we enable it in order to support
-			// the ring's request minimization feature.
-			ZoneAwarenessEnabled: true,
-
-			// We need a response from at least 1 owner. The assumption is that we have 1 owner per zone,
-			// but it's not guaranteed (it depends on how the application was deployed). The safest thing
-			// we can do here is to just require a successful response from at least 1 zone.
-			MaxUnavailableZones: uniqueZones - 1,
-		})
+		replicationSet, err := r.getReplicationSetForPartitionAndOperation(partitionID, op, now, zonesBuffer)
+		if err != nil {
+			return nil, err
+		}
+
+		result = append(result, replicationSet)
 	}
 	return result, nil
 }
+
+// GetReplicationSetForPartitionAndOperation returns a ReplicationSet for the input partition. If the partition doesn't
+// exist or there are no healthy owners for the partition, an error is returned.
+func (r *PartitionInstanceRing) GetReplicationSetForPartitionAndOperation(partitionID int32, op Operation) (ReplicationSet, error) {
+	zonesBuffer := make([]string, 0, 3) // Pre-allocate buffer assuming 3 zones.
Contributor

I think we can avoid the allocation here completely.

Suggested change:
-	zonesBuffer := make([]string, 0, 3) // Pre-allocate buffer assuming 3 zones.
+	var stackZonesBuffer [3]string // Pre-allocate buffer assuming 3 zones.
+	zonesBuffer := stackZonesBuffer[:]

(I'd also do 5, not 3, let's not be greedy)

Contributor Author

Picked the on-stack buffer: bf3dc4a. As for 3 vs 5, I'll leave that for a separate discussion, given that in the ring/ pkg we assume 3 zones in different places.
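For context, the suggested change leans on Go's escape analysis: a slice backed by a fixed-size array that never escapes the function keeps its storage on the stack, so the heap allocation from make disappears entirely. A standalone sketch of the pattern follows (the function and names are illustrative, not code from this PR; the diff resumes after it):

// countUniqueZones illustrates the on-stack scratch buffer pattern from the review.
// The [3]string array stays on the stack as long as the slice derived from it
// doesn't escape; append falls back to a heap allocation only if more than
// 3 unique zones show up.
func countUniqueZones(zones []string) int {
	var stackZonesBuffer [3]string
	buf := stackZonesBuffer[:0]

	for _, zone := range zones {
		found := false
		for _, seen := range buf {
			if seen == zone {
				found = true
				break
			}
		}
		if !found {
			buf = append(buf, zone)
		}
	}
	return len(buf)
}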


+
+	return r.getReplicationSetForPartitionAndOperation(partitionID, op, time.Now(), zonesBuffer)
+}
+
+func (r *PartitionInstanceRing) getReplicationSetForPartitionAndOperation(partitionID int32, op Operation, now time.Time, zonesBuffer []string) (ReplicationSet, error) {
+	partitionsRing := r.PartitionRing()
+	ownerIDs := partitionsRing.PartitionOwnerIDs(partitionID)
+	instances := make([]InstanceDesc, 0, len(ownerIDs))
+
+	for _, instanceID := range ownerIDs {
+		instance, err := r.instancesRing.GetInstance(instanceID)
+		if err != nil {
+			// If an instance doesn't exist in the instances ring we don't return an error,
+			// but look up other instances of the partition.
+			continue
+		}
+
+		if !instance.IsHealthy(op, r.heartbeatTimeout, now) {
+			continue
+		}
+
+		instances = append(instances, instance)
+	}
+
+	if len(instances) == 0 {
+		return ReplicationSet{}, fmt.Errorf("partition %d: %w", partitionID, ErrTooManyUnhealthyInstances)
+	}
+
+	// Count the number of unique zones among instances.
+	zonesBuffer = uniqueZonesFromInstances(instances, zonesBuffer[:0])
+	uniqueZones := len(zonesBuffer)
+
+	return ReplicationSet{
+		Instances: instances,
+
+		// Partitions have no concept of zones, but we enable it in order to support
+		// the ring's request minimization feature.
+		ZoneAwarenessEnabled: true,
+
+		// We need a response from at least 1 owner. The assumption is that we have 1 owner per zone,
+		// but it's not guaranteed (it depends on how the application was deployed). The safest thing
+		// we can do here is to just require a successful response from at least 1 zone.
+		MaxUnavailableZones: uniqueZones - 1,
+	}, nil
+}

 // ShuffleShard wraps PartitionRing.ShuffleShard().
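To make the MaxUnavailableZones arithmetic concrete (a hypothetical scenario, not from the PR): if a partition's healthy owners span 3 unique zones, uniqueZones = 3 and MaxUnavailableZones = 2, so a zone-aware caller can succeed once the owners in any single zone have responded; if all owners live in 1 zone, MaxUnavailableZones = 0 and that zone must answer.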
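As a usage reference, a caller of the new public method might look like the following minimal sketch. It assumes the dskit ring package as of this PR; queryPartitionOwners, the op parameter, and the surrounding setup are invented for illustration:

package example

import (
	"fmt"

	"github.com/grafana/dskit/ring"
)

// queryPartitionOwners resolves the healthy owners of a single partition and
// contacts them. It assumes r was built elsewhere (from a partition ring watcher
// plus the instances ring) and that op is an Operation defined by the application.
func queryPartitionOwners(r *ring.PartitionInstanceRing, partitionID int32, op ring.Operation) error {
	rs, err := r.GetReplicationSetForPartitionAndOperation(partitionID, op)
	if err != nil {
		// Returned when the partition doesn't exist or none of its owners are
		// healthy (the error wraps ErrTooManyUnhealthyInstances).
		return fmt.Errorf("partition %d has no usable owners: %w", partitionID, err)
	}

	for _, owner := range rs.Instances {
		fmt.Println("healthy owner:", owner.Addr)
	}
	return nil
}

Note that only healthy owners end up in the replication set, so an empty Instances slice is impossible: the method fails with the wrapped error instead.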