diff --git a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java index ffeefaa0866..eceece3b0e9 100644 --- a/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java +++ b/gobblin-iceberg/src/main/java/org/apache/gobblin/iceberg/publisher/GobblinMCEPublisher.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.PriorityQueue; import java.util.stream.Collectors; @@ -83,7 +84,6 @@ public class GobblinMCEPublisher extends DataPublisher { public static final String SERIALIZED_AUDIT_COUNT_MAP_KEY = "serializedAuditCountMap"; public GobblinMCEPublisher(State state) throws IOException { - this(state, GobblinMCEProducer.getGobblinMCEProducer(state)); } @@ -96,7 +96,7 @@ public GobblinMCEPublisher(State state, GobblinMCEProducer producer) { public void publishData(Collection states) throws IOException { // First aggregate the new files by partition for (State state : states) { - Map newFiles = computeFileMetrics(state); + Map newFiles = computeFileMetrics(state, state.getPropAsList(NEW_FILES_LIST, "")); Map offsetRange = getPartitionOffsetRange(OFFSET_RANGE_KEY); if (newFiles.isEmpty()) { // There'll be only one dummy file here. This file is parsed for DB and table name calculation. @@ -114,7 +114,7 @@ public void publishData(Collection states) throws IOExc } } - private Map getPartitionOffsetRange(String offsetKey) { + protected Map getPartitionOffsetRange(String offsetKey) { return state.getPropAsList(offsetKey) .stream() .collect(Collectors.toMap(s -> s.split(MAP_DELIMITER_KEY)[0], s -> s.split(MAP_DELIMITER_KEY)[1])); @@ -125,11 +125,11 @@ private Map getPartitionOffsetRange(String offsetKey) { * and calculate the hive spec for each datafile and submit the task to register that datafile * @throws IOException */ - private Map computeFileMetrics(State state) throws IOException { + protected Map computeFileMetrics(State state, List fileList) throws IOException { Map newFiles = new HashMap<>(); NameMapping mapping = getNameMapping(); FileSystem fs = FileSystem.get(conf); - for (final String pathString : state.getPropAsList(NEW_FILES_LIST, "")) { + for (final String pathString : fileList) { Path path = new Path(pathString); LinkedList fileStatuses = new LinkedList<>(); fileStatuses.add(fs.getFileStatus(path)); @@ -153,7 +153,7 @@ private Map computeFileMetrics(State state) throws IOException { * It's used in GMCE writer {@link GobblinMCEWriter} merely for getting the DB and table name. * @throws IOException */ - private Map computeDummyFile(State state) throws IOException { + protected Map computeDummyFile(State state) throws IOException { Map newFiles = new HashMap<>(); FileSystem fs = FileSystem.get(conf); if (!state.contains(ConfigurationKeys.DATA_PUBLISHER_DATASET_DIR)) {