Skip to content

Commit

Permalink
worker group update
Browse files (browse the repository at this point in the history)
  • Loading branch information
zaynt4606 committed Nov 18, 2024
1 parent 5e93ab6 commit d4b5a66
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,7 @@ class CelebornShuffleReader[K, C](
} else {
val numPartitions = dep.partitioner.numPartitions
val numMappers = handle.numMappers
- val partitionGroupCnt =
- if (conf.groupMapTaskEnabled)
- math.ceil(numMappers.toDouble / conf.groupMapTaskGroupSize).toInt
- else 1
+ val partitionGroupCnt = math.ceil(numMappers.toDouble / conf.groupMapTaskGroupSize).toInt
val groupNumPartitions = numPartitions * partitionGroupCnt
(startPartition until endPartition).foreach { originalPartitionId =>
(0 until partitionGroupCnt).foreach { groupCnt =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,13 @@ static class UsableDiskInfo {
}
int partitionGroupSize = partitionIds.size() / numWorkerGroups;
int workerGroupSize = workers.size() / numWorkerGroups;
+ // The workers in each group are distributed on different racks as much as possible
+ if (shouldRackAware) {
+ workers = generateRackAwareWorkers(workers);
+ }
for (int i = 0; i < numWorkerGroups; i++) {
List<Integer> groupPartitionIds = partitionIds.subList(partitionGroupSize * i, Math.min(partitionIds.size(), partitionGroupSize * (i + 1)));

List<WorkerInfo> groupWorkersList = workers.subList(workerGroupSize * i, Math.min(workers.size(), workerGroupSize * (i + 1)));

Map<WorkerInfo, List<UsableDiskInfo>> slotsRestrictions = new HashMap<>();
Expand Down Expand Up @@ -101,13 +106,21 @@ static class UsableDiskInfo {
}
}
}
- slots.putAll(locateSlots(
+
+ Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>> groupedSlots = locateSlots(
groupPartitionIds,
groupWorkersList,
slotsRestrictions,
+ workers,
shouldReplicate,
shouldRackAware,
- availableStorageTypes));
+ availableStorageTypes);
+
+ groupedSlots.forEach((workerInfo, tuple2) -> {
+ slots.putIfAbsent(workerInfo, new Tuple2<>(new ArrayList<>(), new ArrayList<>()));
+ slots.get(workerInfo)._1.addAll(tuple2._1);
+ slots.get(workerInfo)._2.addAll(tuple2._2);
+ });
}
return slots;
}
Expand Down Expand Up @@ -147,7 +160,10 @@ static class UsableDiskInfo {
Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>> slots = new HashMap<>();
int partitionGroupSize = partitionIds.size() / numWorkerGroups;
int workerGroupSize = workers.size() / numWorkerGroups;
-
+ // The workers in each group are distributed on different racks as much as possible
+ if (shouldRackAware) {
+ workers = generateRackAwareWorkers(workers);
+ }
for (int index = 0; index < numWorkerGroups; index++) {
List<Integer> groupPartitionIds = partitionIds.subList(partitionGroupSize * index, Math.min(partitionIds.size(), partitionGroupSize * (index + 1)));
List<WorkerInfo> groupWorkersList = workers.subList(workerGroupSize * index, Math.min(workers.size(), workerGroupSize * (index + 1)));
Expand Down Expand Up @@ -194,13 +210,21 @@ static class UsableDiskInfo {
placeDisksToGroups(usableDisks, diskGroupCount, flushTimeWeight, fetchTimeWeight),
diskToWorkerMap,
shouldReplicate ? groupPartitionIds.size() * 2 : groupPartitionIds.size());
- slots.putAll(locateSlots(
+
+ Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>> groupedSlots = locateSlots(
groupPartitionIds,
groupWorkersList,
slotsRestrictions,
+ workers,
shouldReplicate,
shouldRackAware,
- availableStorageTypes));
+ availableStorageTypes);
+
+ groupedSlots.forEach((workerInfo, tuple2) -> {
+ slots.putIfAbsent(workerInfo, new Tuple2<>(new ArrayList<>(), new ArrayList<>()));
+ slots.get(workerInfo)._1.addAll(tuple2._1);
+ slots.get(workerInfo)._2.addAll(tuple2._2);
+ });
}
return slots;
}
Expand Down Expand Up @@ -265,16 +289,19 @@ private static StorageInfo getStorageInfo(
private static Map<WorkerInfo, Tuple2<List<PartitionLocation>, List<PartitionLocation>>>
locateSlots(
List<Integer> partitionIds,
- List<WorkerInfo> workersList,
+ List<WorkerInfo> groupWorkersList,
Map<WorkerInfo, List<UsableDiskInfo>> slotRestrictions,
+ List<WorkerInfo> workersList,
boolean shouldReplicate,
boolean shouldRackAware,
int availableStorageTypes) {

List<WorkerInfo> workersFromSlotRestrictions = new ArrayList<>(slotRestrictions.keySet());
+ List<WorkerInfo> groupWorkers = groupWorkersList;
List<WorkerInfo> workers = workersList;
if (shouldReplicate && shouldRackAware) {
workersFromSlotRestrictions = generateRackAwareWorkers(workersFromSlotRestrictions);
+ groupWorkers = generateRackAwareWorkers(groupWorkers);
workers = generateRackAwareWorkers(workers);
}

Expand All @@ -295,12 +322,15 @@ private static StorageInfo getStorageInfo(
roundRobin(
slots,
remain,
- workers,
+ groupWorkers,
null,
shouldReplicate,
shouldRackAware,
availableStorageTypes);
}
+ if (!remain.isEmpty()) {
+ remain = roundRobin(slots, remain, groupWorkers, null, shouldReplicate, false, availableStorageTypes);
+ }
if (!remain.isEmpty()) {
roundRobin(slots, remain, workers, null, shouldReplicate, false, availableStorageTypes);
}
Expand Down

0 comments on commit d4b5a66

Please sign in to comment.