Skip to content

Commit

Permalink
Make KafkaTopicGroupingWorkUnitPacker pack with desired num of contai…
Browse files Browse the repository at this point in the history
…ner (#3814)

* Make KafkaTopicGroupingWorkUnitPacker pack with desired num of container

* update comment
  • Loading branch information
hanghangliu authored Nov 8, 2023
1 parent 5619a0a commit 9e30c6c
Show file tree
Hide file tree
Showing 3 changed files with 133 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,4 +160,15 @@ public int hashCode() {
public static MultiWorkUnit createEmpty() {
return new MultiWorkUnit();
}

/**
 * Creates a new {@link MultiWorkUnit} instance containing the provided collection of {@link WorkUnit}s.
 *
 * @param workUnits the {@link WorkUnit}s to place into the new instance
 * @return the {@link MultiWorkUnit} instance with the provided collection of {@link WorkUnit}s.
 */
public static MultiWorkUnit createMultiWorkUnit(Collection<WorkUnit> workUnits) {
MultiWorkUnit multiWorkUnit = new MultiWorkUnit();
multiWorkUnit.addWorkUnits(workUnits);
return multiWorkUnit;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

import java.util.PriorityQueue;
import org.apache.gobblin.metrics.event.GobblinEventBuilder;
import org.apache.hadoop.fs.Path;

Expand Down Expand Up @@ -172,6 +174,9 @@ public KafkaTopicGroupingWorkUnitPacker(AbstractSource<?, ?> source, SourceState
* - For each topic pack the workunits into a set of topic specific buckets by filling the fullest bucket that can hold
* the workunit without exceeding the container capacity.
* - The topic specific multi-workunits are squeezed and returned as a workunit.
*
* @param numContainers the desired number of containers, i.e. the target size of the returned List<WorkUnit>. The
* actual number can be smaller or larger depending on container capacity and the total workUnit/partition count
*/
@Override
public List<WorkUnit> pack(Map<String, List<WorkUnit>> workUnitsByTopic, int numContainers) {
Expand Down Expand Up @@ -230,6 +235,11 @@ public List<WorkUnit> pack(Map<String, List<WorkUnit>> workUnitsByTopic, int num
}
}

// If size of mwuGroups is smaller than numContainers, try to further split the multi WU to respect the container number requirement
if(mwuGroups.size() < numContainers) {
mwuGroups = splitMultiWorkUnits(mwuGroups, numContainers);
}

List<WorkUnit> squeezedGroups = squeezeMultiWorkUnits(mwuGroups);
log.debug("Squeezed work unit groups: " + squeezedGroups);
return squeezedGroups;
Expand Down Expand Up @@ -383,4 +393,40 @@ static double getContainerCapacityForTopic(List<Double> capacities, ContainerCap
throw new RuntimeException("Unsupported computation strategy: " + strategy.name());
}
}

/**
 * Splits a list of {@link MultiWorkUnit}s until the list reaches desiredWUSize, when possible. The approach is to
 * repeatedly take the {@link MultiWorkUnit} containing the most {@link WorkUnit}s and split its work units evenly
 * into two new {@link MultiWorkUnit}s. Splitting stops once the desired size is reached, or when every
 * {@link MultiWorkUnit} only contains a single {@link WorkUnit}, since further splitting is not possible.
 * @param multiWorkUnits the list of {@link MultiWorkUnit}s to be split
 * @param desiredWUSize desired number of {@link MultiWorkUnit}s
 * @return the split {@link MultiWorkUnit}s; the original list is returned when no split was performed
 */
public static List<MultiWorkUnit> splitMultiWorkUnits(List<MultiWorkUnit> multiWorkUnits, int desiredWUSize) {
  // Max-heap ordered by work unit count, so the multi workunit with the most partitions is split first.
  PriorityQueue<MultiWorkUnit> pQueue = new PriorityQueue<>(
      Comparator.comparing(mwu -> mwu.getWorkUnits().size(), Comparator.reverseOrder()));
  pQueue.addAll(multiWorkUnits);

  // Guard against an empty input: polling an empty queue returns null, which would NPE below.
  while (!pQueue.isEmpty() && pQueue.size() < desiredWUSize) {
    MultiWorkUnit mwu = pQueue.poll();
    int size = mwu.getWorkUnits().size();
    // If the size is smaller than 2, the largest remaining mwu holds a single wu, so every mwu does and no
    // further split is possible. Add back the polled element and stop the loop.
    if (size < 2) {
      pQueue.add(mwu);
      break;
    }
    // Split the mwu into 2 with evenly distributed wu.
    pQueue.add(MultiWorkUnit.createMultiWorkUnit(mwu.getWorkUnits().subList(0, size / 2)));
    pQueue.add(MultiWorkUnit.createMultiWorkUnit(mwu.getWorkUnits().subList(size / 2, size)));
  }

  log.info("Desired number of multi workunits is {}, number after splitting is {}", desiredWUSize, pQueue.size());

  // If the size is unchanged, no split was performed. Return the original list to avoid constructing a new one.
  if (multiWorkUnits.size() == pQueue.size()) {
    return multiWorkUnits;
  }
  return new ArrayList<>(pQueue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.gobblin.source.extractor.extract.kafka.workunit.packer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
Expand All @@ -28,6 +29,7 @@
import org.apache.gobblin.source.extractor.extract.kafka.KafkaUtils;
import org.apache.gobblin.source.extractor.extract.kafka.UniversalKafkaSource;
import org.apache.gobblin.source.workunit.Extract;
import org.apache.gobblin.source.workunit.MultiWorkUnit;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
Expand All @@ -51,7 +53,7 @@ public void setUp() {
}

/**
* Check that capacity is honored.
* Check that capacity is honored. Set numContainers to 0 so the workUnit list size is determined only by the capacity
*/
@Test
public void testSingleTopic() {
Expand All @@ -64,7 +66,7 @@ public void testSingleTopic() {
.newArrayList(getWorkUnitWithTopicPartition("topic1", 1), getWorkUnitWithTopicPartition("topic1", 2),
getWorkUnitWithTopicPartition("topic1", 3)));

List<WorkUnit> workUnits = new KafkaTopicGroupingWorkUnitPacker(source, state, Optional.absent()).pack(workUnitsByTopic, 10);
List<WorkUnit> workUnits = new KafkaTopicGroupingWorkUnitPacker(source, state, Optional.absent()).pack(workUnitsByTopic, 0);
Assert.assertEquals(workUnits.size(), 2);
Assert.assertEquals(workUnits.get(0).getProp(KafkaSource.TOPIC_NAME), "topic1");
Assert.assertEquals(workUnits.get(0).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID, 0)), 1);
Expand All @@ -91,7 +93,7 @@ public void testMultiTopic() {
.newArrayList(getWorkUnitWithTopicPartition("topic2", 1), getWorkUnitWithTopicPartition("topic2", 2),
getWorkUnitWithTopicPartition("topic2", 3)));

List<WorkUnit> workUnits = new KafkaTopicGroupingWorkUnitPacker(source, state, Optional.absent()).pack(workUnitsByTopic, 10);
List<WorkUnit> workUnits = new KafkaTopicGroupingWorkUnitPacker(source, state, Optional.absent()).pack(workUnitsByTopic, 0);
Assert.assertEquals(workUnits.size(), 4);

Assert.assertEquals(workUnits.get(0).getProp(KafkaSource.TOPIC_NAME), "topic1");
Expand All @@ -113,8 +115,49 @@ public void testMultiTopic() {
Assert.assertEquals(workUnits.get(3).getPropAsDouble(KafkaTopicGroupingWorkUnitPacker.CONTAINER_CAPACITY_KEY), 2, 0.001);
}

@Test
public void testMultiTopicWithNumContainers() {
  // Indexing-enabled streaming source, as required for capacity-based packing in this packer.
  KafkaSource source = new UniversalKafkaSource();
  SourceState state = new SourceState(new State(props));
  state.setProp("gobblin.kafka.streaming.enableIndexing", true);
  state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, Files.createTempDir().getAbsolutePath());
  KafkaTopicGroupingWorkUnitPacker packer = new KafkaTopicGroupingWorkUnitPacker(source, state, Optional.absent());

  // 2 partitions of topic1 plus 4 partitions of topic2: 6 work units in total.
  Map<String, List<WorkUnit>> topicToWorkUnits = ImmutableMap.of(
      "topic1", Lists.newArrayList(getWorkUnitWithTopicPartition("topic1", 1),
          getWorkUnitWithTopicPartition("topic1", 2)),
      "topic2", Lists.newArrayList(getWorkUnitWithTopicPartition("topic2", 1),
          getWorkUnitWithTopicPartition("topic2", 2),
          getWorkUnitWithTopicPartition("topic2", 3),
          getWorkUnitWithTopicPartition("topic2", 4)));

  // Requesting 5 containers should yield exactly 5 work units while still covering all 6 partitions.
  List<WorkUnit> packed = packer.pack(topicToWorkUnits, 5);
  Assert.assertEquals(packed.size(), 5);
  Assert.assertEquals(packed.stream().mapToInt(wu -> KafkaUtils.getPartitions(wu).size()).sum(), 6);

  // Build a fresh, identical input and request 7 containers: only 6 partitions exist, so the
  // total work unit count cannot exceed 6.
  topicToWorkUnits = ImmutableMap.of(
      "topic1", Lists.newArrayList(getWorkUnitWithTopicPartition("topic1", 1),
          getWorkUnitWithTopicPartition("topic1", 2)),
      "topic2", Lists.newArrayList(getWorkUnitWithTopicPartition("topic2", 1),
          getWorkUnitWithTopicPartition("topic2", 2),
          getWorkUnitWithTopicPartition("topic2", 3),
          getWorkUnitWithTopicPartition("topic2", 4)));
  packed = packer.pack(topicToWorkUnits, 7);
  Assert.assertEquals(packed.size(), 6);
  Assert.assertEquals(packed.stream().mapToInt(wu -> KafkaUtils.getPartitions(wu).size()).sum(), 6);
}

public WorkUnit getWorkUnitWithTopicPartition(String topic, int partition) {

public WorkUnit getWorkUnitWithTopicPartition(String topic, int partition) {
WorkUnit workUnit = new WorkUnit(new Extract(Extract.TableType.APPEND_ONLY, "kafka", topic));
workUnit.setProp(KafkaSource.TOPIC_NAME, topic);
workUnit.setProp(KafkaSource.PARTITION_ID, Integer.toString(partition));
Expand Down Expand Up @@ -159,4 +202,33 @@ public void testGetContainerCapacityForTopic() {
capacity = KafkaTopicGroupingWorkUnitPacker.getContainerCapacityForTopic(capacities, strategy);
Assert.assertEquals(capacity, 1.35, delta);
}

@Test
public void testSplitMultiWorkUnits() {
  // Build 2 multi workunits, each holding 3 empty workunits (6 workunits total).
  List<MultiWorkUnit> multiWorkUnits = new ArrayList<>();
  for (int group = 0; group < 2; group++) {
    MultiWorkUnit mwu = MultiWorkUnit.createEmpty();
    for (int unit = 0; unit < 3; unit++) {
      mwu.addWorkUnit(WorkUnit.createEmpty());
    }
    multiWorkUnits.add(mwu);
  }

  // Desired size is already satisfied, so the list keeps its original size of 2.
  Assert.assertEquals(KafkaTopicGroupingWorkUnitPacker.splitMultiWorkUnits(multiWorkUnits, 1).size(), 2);

  // Splitting can reach any size up to the total workunit count.
  Assert.assertEquals(KafkaTopicGroupingWorkUnitPacker.splitMultiWorkUnits(multiWorkUnits, 3).size(), 3);
  Assert.assertEquals(KafkaTopicGroupingWorkUnitPacker.splitMultiWorkUnits(multiWorkUnits, 6).size(), 6);

  // Desired size exceeds the total of 6 workunits, so the result is capped at 6.
  Assert.assertEquals(KafkaTopicGroupingWorkUnitPacker.splitMultiWorkUnits(multiWorkUnits, 7).size(), 6);
}

}

0 comments on commit 9e30c6c

Please sign in to comment.