diff --git a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java index 1392fa97f4c..9d530f811f3 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/source/workunit/MultiWorkUnit.java @@ -160,4 +160,15 @@ public int hashCode() { public static MultiWorkUnit createEmpty() { return new MultiWorkUnit(); } + + /** + * Create a new {@link MultiWorkUnit} instance based on provided collection of {@link WorkUnit}s. + * + * @return the {@link MultiWorkUnit} instance with the provided collection of {@link WorkUnit}s. + */ + public static MultiWorkUnit createMultiWorkUnit(Collection workUnits) { + MultiWorkUnit multiWorkUnit = new MultiWorkUnit(); + multiWorkUnit.addWorkUnits(workUnits); + return multiWorkUnit; + } } diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java index 1bdc2d8bb52..9c22d47cc36 100644 --- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java +++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java @@ -20,9 +20,11 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.List; import java.util.Map; +import java.util.PriorityQueue; import org.apache.gobblin.metrics.event.GobblinEventBuilder; import org.apache.hadoop.fs.Path; @@ -172,6 +174,9 @@ public KafkaTopicGroupingWorkUnitPacker(AbstractSource source, 
SourceState * - For each topic pack the workunits into a set of topic specific buckets by filling the fullest bucket that can hold * the workunit without exceeding the container capacity. * - The topic specific multi-workunits are squeezed and returned as a workunit. + * + * @param numContainers desired number of containers, which will be the size of the returned List. The actual + * num can be smaller or bigger depending on container capacity and total workUnit/partition number */ @Override public List pack(Map> workUnitsByTopic, int numContainers) { @@ -230,6 +235,11 @@ public List pack(Map> workUnitsByTopic, int num } } + // If size of mwuGroups is smaller than numContainers, try to further split the multi WU to respect the container number requirement + if(mwuGroups.size() < numContainers) { + mwuGroups = splitMultiWorkUnits(mwuGroups, numContainers); + } + List squeezedGroups = squeezeMultiWorkUnits(mwuGroups); log.debug("Squeezed work unit groups: " + squeezedGroups); return squeezedGroups; @@ -383,4 +393,40 @@ static double getContainerCapacityForTopic(List capacities, ContainerCap throw new RuntimeException("Unsupported computation strategy: " + strategy.name()); } } + + /** + * A method that splits a list of {@link MultiWorkUnit} to the size of desiredWUSize if possible. The approach is to try + * to evenly split the {@link WorkUnit} within MWU into two, and always try to split MWU with more partitions first. + * Stop when each {@link MultiWorkUnit} only contains a single {@link WorkUnit} as further split is not possible. 
+ * @param multiWorkUnits the list of {@link MultiWorkUnit} to be split + * @param desiredWUSize desired number of {@link MultiWorkUnit} + * @return the split list of MultiWorkUnits + */ + public static List splitMultiWorkUnits(List multiWorkUnits, int desiredWUSize) { + PriorityQueue pQueue = new PriorityQueue<>( + Comparator.comparing(mwu -> mwu.getWorkUnits().size(), Comparator.reverseOrder())); + pQueue.addAll(multiWorkUnits); + + while(pQueue.size() < desiredWUSize) { + MultiWorkUnit mwu = pQueue.poll(); + int size = mwu.getWorkUnits().size(); + // If the size is smaller than 2, meaning each mwu only contains a single wu and can't be further split. + // Add back the polled element and stop the loop. + if(size < 2) { + pQueue.add(mwu); + break; + } + // Split the mwu into 2 with evenly distributed wu + pQueue.add(MultiWorkUnit.createMultiWorkUnit(mwu.getWorkUnits().subList(0, size/2))); + pQueue.add(MultiWorkUnit.createMultiWorkUnit(mwu.getWorkUnits().subList(size/2, size))); + } + + log.info("Min size of the container is set to {}, successfully split the multi workunit to {}", desiredWUSize, pQueue.size()); + + // If size is the same, meaning no split can be done. 
Return the original list to avoid constructing a new list + if(multiWorkUnits.size() == pQueue.size()) { + return multiWorkUnits; + } + return new ArrayList<>(pQueue); + } } \ No newline at end of file diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPackerTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPackerTest.java index b44df38baab..d45e0bf1cb9 100644 --- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPackerTest.java +++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPackerTest.java @@ -16,6 +16,7 @@ */ package org.apache.gobblin.source.extractor.extract.kafka.workunit.packer; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -28,6 +29,7 @@ import org.apache.gobblin.source.extractor.extract.kafka.KafkaUtils; import org.apache.gobblin.source.extractor.extract.kafka.UniversalKafkaSource; import org.apache.gobblin.source.workunit.Extract; +import org.apache.gobblin.source.workunit.MultiWorkUnit; import org.apache.gobblin.source.workunit.WorkUnit; import org.testng.Assert; import org.testng.annotations.BeforeMethod; @@ -51,7 +53,7 @@ public void setUp() { } /** - * Check that capacity is honored. 
Set numContainers to 0 so the workUnit list size is determined only by the capacity */ @Test public void testSingleTopic() { @@ -64,7 +66,7 @@ public void testSingleTopic() { .newArrayList(getWorkUnitWithTopicPartition("topic1", 1), getWorkUnitWithTopicPartition("topic1", 2), getWorkUnitWithTopicPartition("topic1", 3))); - List workUnits = new KafkaTopicGroupingWorkUnitPacker(source, state, Optional.absent()).pack(workUnitsByTopic, 10); + List workUnits = new KafkaTopicGroupingWorkUnitPacker(source, state, Optional.absent()).pack(workUnitsByTopic, 0); Assert.assertEquals(workUnits.size(), 2); Assert.assertEquals(workUnits.get(0).getProp(KafkaSource.TOPIC_NAME), "topic1"); Assert.assertEquals(workUnits.get(0).getPropAsInt(KafkaUtils.getPartitionPropName(KafkaSource.PARTITION_ID, 0)), 1); @@ -91,7 +93,7 @@ public void testMultiTopic() { .newArrayList(getWorkUnitWithTopicPartition("topic2", 1), getWorkUnitWithTopicPartition("topic2", 2), getWorkUnitWithTopicPartition("topic2", 3))); - List workUnits = new KafkaTopicGroupingWorkUnitPacker(source, state, Optional.absent()).pack(workUnitsByTopic, 10); + List workUnits = new KafkaTopicGroupingWorkUnitPacker(source, state, Optional.absent()).pack(workUnitsByTopic, 0); Assert.assertEquals(workUnits.size(), 4); Assert.assertEquals(workUnits.get(0).getProp(KafkaSource.TOPIC_NAME), "topic1"); @@ -113,8 +115,49 @@ public void testMultiTopic() { Assert.assertEquals(workUnits.get(3).getPropAsDouble(KafkaTopicGroupingWorkUnitPacker.CONTAINER_CAPACITY_KEY), 2, 0.001); } + @Test + public void testMultiTopicWithNumContainers() { + KafkaSource source = new UniversalKafkaSource(); + SourceState state = new SourceState(new State(props)); + state.setProp("gobblin.kafka.streaming.enableIndexing", true); + state.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, Files.createTempDir().getAbsolutePath()); + + Map> workUnitsByTopic = ImmutableMap.of( + "topic1", Lists.newArrayList(getWorkUnitWithTopicPartition("topic1", 1), + 
getWorkUnitWithTopicPartition("topic1", 2)), + "topic2", Lists.newArrayList(getWorkUnitWithTopicPartition("topic2", 1), + getWorkUnitWithTopicPartition("topic2", 2), + getWorkUnitWithTopicPartition("topic2", 3), + getWorkUnitWithTopicPartition("topic2", 4))); + KafkaTopicGroupingWorkUnitPacker packer = new KafkaTopicGroupingWorkUnitPacker(source, state, Optional.absent()); + List workUnits = packer.pack(workUnitsByTopic, 5); + Assert.assertEquals(workUnits.size(), 5); + + int partitionCount = 0; + for(WorkUnit workUnit : workUnits) { + partitionCount += KafkaUtils.getPartitions(workUnit).size(); + } + Assert.assertEquals(partitionCount, 6); + + workUnitsByTopic = ImmutableMap.of( + "topic1", Lists.newArrayList(getWorkUnitWithTopicPartition("topic1", 1), + getWorkUnitWithTopicPartition("topic1", 2)), + "topic2", Lists.newArrayList(getWorkUnitWithTopicPartition("topic2", 1), + getWorkUnitWithTopicPartition("topic2", 2), + getWorkUnitWithTopicPartition("topic2", 3), + getWorkUnitWithTopicPartition("topic2", 4))); + workUnits = packer.pack(workUnitsByTopic, 7); + // Total WU size wouldn't be more than 6 + Assert.assertEquals(workUnits.size(), 6); + partitionCount = 0; + for(WorkUnit workUnit : workUnits) { + partitionCount += KafkaUtils.getPartitions(workUnit).size(); + } + Assert.assertEquals(partitionCount, 6); + } - public WorkUnit getWorkUnitWithTopicPartition(String topic, int partition) { + + public WorkUnit getWorkUnitWithTopicPartition(String topic, int partition) { WorkUnit workUnit = new WorkUnit(new Extract(Extract.TableType.APPEND_ONLY, "kafka", topic)); workUnit.setProp(KafkaSource.TOPIC_NAME, topic); workUnit.setProp(KafkaSource.PARTITION_ID, Integer.toString(partition)); @@ -159,4 +202,33 @@ public void testGetContainerCapacityForTopic() { capacity = KafkaTopicGroupingWorkUnitPacker.getContainerCapacityForTopic(capacities, strategy); Assert.assertEquals(capacity, 1.35, delta); } + + @Test + public void testSplitMultiWorkUnits() { + // Create a list of 2 
MWU, each containing 3 WUs + List multiWorkUnitList = new ArrayList<>(); + for (int i = 0; i < 2; i++) { + MultiWorkUnit multiWorkUnit = MultiWorkUnit.createEmpty(); + for (int j = 0; j < 3; j++) { + multiWorkUnit.addWorkUnit(WorkUnit.createEmpty()); + + } + multiWorkUnitList.add(multiWorkUnit); + } + + // minWUSize is smaller than the MWU list size, so the result remains the size of the original MWU list + List mwuList = KafkaTopicGroupingWorkUnitPacker.splitMultiWorkUnits(multiWorkUnitList, 1); + Assert.assertEquals(mwuList.size(), 2); + + mwuList = KafkaTopicGroupingWorkUnitPacker.splitMultiWorkUnits(multiWorkUnitList, 3); + Assert.assertEquals(mwuList.size(), 3); + + mwuList = KafkaTopicGroupingWorkUnitPacker.splitMultiWorkUnits(multiWorkUnitList, 6); + Assert.assertEquals(mwuList.size(), 6); + + // minWUSize is bigger than the total number of all WUs, so the result will be the sum of all WUs + mwuList = KafkaTopicGroupingWorkUnitPacker.splitMultiWorkUnits(multiWorkUnitList, 7); + Assert.assertEquals(mwuList.size(), 6); + } + } \ No newline at end of file