Skip to content

Commit

Permalink
remove pentting splits
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaochen-zhou committed May 1, 2024
1 parent eecacd6 commit b9c2490
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand All @@ -44,7 +43,9 @@ public class PaimonSourceSplitEnumerator
/** Source split enumerator context */
private final Context<PaimonSourceSplit> context;

private Map<Integer, List<PaimonSourceSplit>> pendingSplit;
private final Set<PaimonSourceSplit> pendingSplits = new HashSet<>();

private Map<Integer, Set<PaimonSourceSplit>> assignedSplits;

private volatile boolean shouldEnumerate;

Expand All @@ -61,29 +62,26 @@ public PaimonSourceSplitEnumerator(
Context<PaimonSourceSplit> context, Table table, PaimonSourceState sourceState) {
this.context = context;
this.table = table;
this.pendingSplit = new HashMap<>();
this.shouldEnumerate = sourceState == null;
this.shouldEnumerate = (sourceState == null || sourceState.isShouldEnumerate());
this.assignedSplits = new HashedMap();
if (sourceState != null) {
this.shouldEnumerate = sourceState.isShouldEnumerate();
this.pendingSplit.putAll(sourceState.getPendingSplits());
this.assignedSplits.putAll(sourceState.getAssignedSplits());
}
}

@Override
public void open() {
this.pendingSplit = new HashedMap();
this.pendingSplits.addAll(getTableSplits());
}

@Override
public void run() throws Exception {
Set<Integer> readers = context.registeredReaders();
if (shouldEnumerate) {
Set<PaimonSourceSplit> newSplits = getTableSplits();
synchronized (stateLock) {
addPendingSplit(newSplits);
addAssignSplit(pendingSplits);
shouldEnumerate = false;
}

assignSplit(readers);
}
log.debug(
Expand All @@ -99,28 +97,28 @@ public void close() throws IOException {
@Override
public void addSplitsBack(List<PaimonSourceSplit> splits, int subtaskId) {
if (!splits.isEmpty()) {
addPendingSplit(splits);
addAssignSplit(splits);
assignSplit(Collections.singletonList(subtaskId));
}
}

@Override
public int currentUnassignedSplitSize() {
return pendingSplit.size();
return pendingSplits.size();
}

@Override
public void registerReader(int subtaskId) {
log.debug("Register reader {} to PaimonSourceSplitEnumerator.", subtaskId);
if (!pendingSplit.isEmpty()) {
if (!pendingSplits.isEmpty()) {
assignSplit(Collections.singletonList(subtaskId));
}
}

@Override
public PaimonSourceState snapshotState(long checkpointId) throws Exception {
synchronized (stateLock) {
return new PaimonSourceState(pendingSplit, shouldEnumerate);
return new PaimonSourceState(assignedSplits, shouldEnumerate);
}
}

Expand All @@ -134,12 +132,15 @@ public void handleSplitRequest(int subtaskId) {
// do nothing
}

private void addPendingSplit(Collection<PaimonSourceSplit> splits) {
private void addAssignSplit(Collection<PaimonSourceSplit> splits) {
int readerCount = context.currentParallelism();
for (PaimonSourceSplit split : splits) {
int ownerReader = getSplitOwner(split.splitId(), readerCount);
log.info("Assigning {} to {} reader.", split.getSplit().toString(), ownerReader);
pendingSplit.computeIfAbsent(ownerReader, r -> new ArrayList<>()).add(split);
// remove the assigned splits from pending splits
pendingSplits.remove(split);
// save the state of assigned splits
assignedSplits.computeIfAbsent(ownerReader, r -> new HashSet<>()).add(split);
}
}

Expand All @@ -149,7 +150,7 @@ private void assignSplit(Collection<Integer> readers) {
log.debug("Assign pendingSplits to readers {}", readers);

for (int reader : readers) {
List<PaimonSourceSplit> assignmentForReader = pendingSplit.remove(reader);
Set<PaimonSourceSplit> assignmentForReader = assignedSplits.remove(reader);
if (assignmentForReader != null && !assignmentForReader.isEmpty()) {
log.info(
"Assign splits {} to reader {}",
Expand All @@ -158,14 +159,14 @@ private void assignSplit(Collection<Integer> readers) {
.collect(Collectors.joining(",")),
reader);
try {
context.assignSplit(reader, assignmentForReader);
context.assignSplit(reader, new ArrayList<>(assignmentForReader));
} catch (Exception e) {
log.error(
"Failed to assign splits {} to reader {}",
assignmentForReader,
reader,
e);
pendingSplit.put(reader, assignmentForReader);
pendingSplits.addAll(assignmentForReader);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@
import lombok.Getter;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.Set;

@AllArgsConstructor
@Getter
/** Paimon connector source state, saves the splits has assigned to readers. */
public class PaimonSourceState implements Serializable {

private static final long serialVersionUID = 1L;
private final Map<Integer, List<PaimonSourceSplit>> pendingSplits;
private final Map<Integer, Set<PaimonSourceSplit>> assignedSplits;
private boolean shouldEnumerate;
}

0 comments on commit b9c2490

Please sign in to comment.