Skip to content

Commit

Permalink
[core] AddColumn move support before and last.
Browse files Browse the repository at this point in the history
  • Loading branch information
joyCurry30 committed Jul 7, 2024
1 parent db8bcd7 commit 6f97de2
Show file tree
Hide file tree
Showing 3 changed files with 220 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,9 @@ class Move implements Serializable {

/**
 * Kinds of column repositioning a {@code Move} can describe.
 *
 * <p>In the factory methods visible next to this enum, {@code FIRST} and {@code LAST} moves are
 * built from the moved field name only, while {@code AFTER} and {@code BEFORE} moves also carry a
 * reference field name.
 *
 * <p>NOTE(review): the constant list below shows {@code AFTER} twice (once without a trailing
 * comma). This looks like the pre-change and post-change lines of a diff rendered together;
 * confirm that only {@code AFTER,} is present in the real file.
 */
public enum MoveType {
FIRST,
AFTER
AFTER,
BEFORE,
LAST
}

public static Move first(String fieldName) {
Expand All @@ -422,6 +424,14 @@ public static Move after(String fieldName, String referenceFieldName) {
return new Move(fieldName, referenceFieldName, MoveType.AFTER);
}

/**
 * Creates a move that positions {@code fieldName} relative to {@code referenceFieldName} using
 * {@link MoveType#BEFORE}.
 *
 * @param fieldName name of the column to relocate
 * @param referenceFieldName name of the column the relocated column is placed in front of
 * @return a new {@code Move} of type {@code BEFORE}
 */
public static Move before(String fieldName, String referenceFieldName) {
    final MoveType moveType = MoveType.BEFORE;
    return new Move(fieldName, referenceFieldName, moveType);
}

/**
 * Creates a move of type {@link MoveType#LAST} for {@code fieldName}.
 *
 * <p>No reference column is involved, so the reference field name is passed as {@code null}.
 *
 * @param fieldName name of the column to relocate
 * @return a new {@code Move} of type {@code LAST}
 */
public static Move last(String fieldName) {
    final MoveType moveType = MoveType.LAST;
    return new Move(fieldName, null, moveType);
}

private static final long serialVersionUID = 1L;

private final String fieldName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,28 +339,7 @@ public TableSchema commitChanges(List<SchemaChange> changes)
} else if (change instanceof UpdateColumnPosition) {
UpdateColumnPosition update = (UpdateColumnPosition) change;
SchemaChange.Move move = update.move();

// key: name ; value : index
Map<String, Integer> map = new HashMap<>();
for (int i = 0; i < newFields.size(); i++) {
map.put(newFields.get(i).name(), i);
}

int fieldIndex = map.get(move.fieldName());
int refIndex = 0;
if (move.type().equals(SchemaChange.Move.MoveType.FIRST)) {
checkMoveIndexEqual(move, fieldIndex, refIndex);
newFields.add(refIndex, newFields.remove(fieldIndex));
} else if (move.type().equals(SchemaChange.Move.MoveType.AFTER)) {
refIndex = map.get(move.referenceFieldName());
checkMoveIndexEqual(move, fieldIndex, refIndex);
if (fieldIndex > refIndex) {
newFields.add(refIndex + 1, newFields.remove(fieldIndex));
} else {
newFields.add(refIndex, newFields.remove(fieldIndex));
}
}

applyMove(newFields, move);
} else {
throw new UnsupportedOperationException(
"Unsupported change: " + change.getClass());
Expand Down Expand Up @@ -388,6 +367,70 @@ public TableSchema commitChanges(List<SchemaChange> changes)
}
}

/**
 * Reorders {@code newFields} in place as described by {@code move}.
 *
 * <p>{@code FIRST} and {@code LAST} relocate the field to the first or last slot. {@code AFTER}
 * and {@code BEFORE} relocate it next to the reference field; the insertion index is corrected
 * for the shift caused by removing the field from its old slot.
 *
 * @param newFields mutable field list to reorder
 * @param move the requested relocation
 * @throws IllegalArgumentException if the field to move, or the reference field of an
 *     {@code AFTER}/{@code BEFORE} move, is not present in {@code newFields}
 * @throws UnsupportedOperationException if the field already sits at the index it is compared
 *     against (index 0 for {@code FIRST}, the last index for {@code LAST}, the reference field's
 *     index otherwise)
 */
public void applyMove(List<DataField> newFields, SchemaChange.Move move) {
    // Field name -> current index in the list.
    Map<String, Integer> positions = new HashMap<>();
    int position = 0;
    for (DataField field : newFields) {
        positions.put(field.name(), position++);
    }

    final String fieldName = move.fieldName();
    if (!positions.containsKey(fieldName)) {
        throw new IllegalArgumentException("Field name not found: " + fieldName);
    }
    final int fieldIndex = positions.get(fieldName);

    // FIRST and LAST need no reference field, so they are resolved before the reference lookup.
    final SchemaChange.Move.MoveType type = move.type();
    final int lastIndex = newFields.size() - 1;
    switch (type) {
        case FIRST:
            checkMoveIndexEqual(move, fieldIndex, 0);
            moveField(newFields, fieldIndex, 0);
            return;
        case LAST:
            checkMoveIndexEqual(move, fieldIndex, lastIndex);
            moveField(newFields, fieldIndex, lastIndex);
            return;
        default:
            break;
    }

    final String referenceName = move.referenceFieldName();
    if (!positions.containsKey(referenceName)) {
        throw new IllegalArgumentException(
                "Reference field name not found: " + referenceName);
    }
    final int refIndex = positions.get(referenceName);

    checkMoveIndexEqual(move, fieldIndex, refIndex);

    // Removing the field first shifts every later element one slot to the left, so the
    // insertion index depends on which side of the reference the field starts on.
    int targetIndex;
    if (type == SchemaChange.Move.MoveType.AFTER) {
        targetIndex = fieldIndex > refIndex ? refIndex + 1 : refIndex;
    } else if (type == SchemaChange.Move.MoveType.BEFORE) {
        targetIndex = fieldIndex < refIndex ? refIndex - 1 : refIndex;
    } else {
        targetIndex = refIndex;
    }

    // Never insert past the last slot.
    targetIndex = Math.min(targetIndex, newFields.size() - 1);

    moveField(newFields, fieldIndex, targetIndex);
}

/**
 * Moves the element at {@code fromIndex} so that it ends up at {@code toIndex}.
 *
 * <p>An out-of-range {@code fromIndex} or a negative {@code toIndex} leaves the list untouched.
 * A {@code toIndex} beyond the last slot is rejected <em>before</em> the list is modified;
 * previously the element was removed first and the subsequent insert then failed, which dropped
 * the field from the list.
 *
 * @param newFields mutable list to reorder in place
 * @param fromIndex current index of the element to move
 * @param toIndex index the element should occupy afterwards
 * @throws IndexOutOfBoundsException if {@code toIndex >= newFields.size()}
 */
private void moveField(List<DataField> newFields, int fromIndex, int toIndex) {
    final int size = newFields.size();
    if (fromIndex < 0 || fromIndex >= size || toIndex < 0) {
        return;
    }
    // After the removal the list holds size - 1 elements, so size - 1 is the largest valid
    // insertion index. Validate up front so a bad target can never lose the element.
    if (toIndex >= size) {
        throw new IndexOutOfBoundsException("Index: " + toIndex + ", Size: " + size);
    }
    DataField fieldToMove = newFields.remove(fromIndex);
    newFields.add(toIndex, fieldToMove);
}

/**
 * Rejects a move whose source index equals the index it is being moved relative to.
 *
 * @param move the requested move; its field name is used in the error message
 * @param fieldIndex current index of the field being moved
 * @param refIndex index the field is compared against
 * @throws UnsupportedOperationException if both indexes are equal
 */
private static void checkMoveIndexEqual(SchemaChange.Move move, int fieldIndex, int refIndex) {
    if (fieldIndex != refIndex) {
        return;
    }
    String message = String.format("Cannot move itself for column %s", move.fieldName());
    throw new UnsupportedOperationException(message);
}

public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) {
TableSchema current =
latest().orElseThrow(
Expand All @@ -406,13 +449,6 @@ public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) {
}
}

/**
 * Throws when a field would be moved onto the index it already occupies.
 *
 * <p>NOTE(review): a definition with the same signature and body appears earlier in this view;
 * this copy looks like the pre-move location shown by the diff. Confirm only one remains in the
 * real file.
 *
 * @param move the requested move; its field name is used in the error message
 * @param fieldIndex current index of the field being moved
 * @param refIndex index the field is compared against
 * @throws UnsupportedOperationException if {@code fieldIndex == refIndex}
 */
private static void checkMoveIndexEqual(SchemaChange.Move move, int fieldIndex, int refIndex) {
    final boolean samePosition = fieldIndex == refIndex;
    if (samePosition) {
        final String column = move.fieldName();
        throw new UnsupportedOperationException(
                String.format("Cannot move itself for column %s", column));
    }
}

private void validateNotPrimaryAndPartitionKey(TableSchema schema, String fieldName) {
/// TODO support partition and primary keys schema evolution
if (schema.partitionKeys().contains(fieldName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.DoubleType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.MapType;
Expand All @@ -32,6 +33,7 @@
import org.apache.paimon.utils.FailingFileIO;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
Expand All @@ -42,6 +44,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -297,4 +300,145 @@ public void testDeleteSchemaWithSchemaId() throws Exception {
manager.deleteSchema(manager.latest().get().id());
assertThat(manager.latest().get().toString()).isEqualTo(schemaContent);
}

/** Verifies that FIRST and LAST moves relocate a field to the first and last slot. */
@Test
public void testApplyMoveFirstAndLast() {
    // Create the initial list of fields
    List<DataField> fields = new LinkedList<>();
    fields.add(new DataField(0, "f0", DataTypes.INT()));
    fields.add(new DataField(1, "f1", DataTypes.BIGINT()));
    fields.add(new DataField(2, "f2", DataTypes.STRING()));
    fields.add(new DataField(3, "f3", DataTypes.SMALLINT()));

    // Use factory methods to create Move objects
    SchemaChange.Move moveFirst = SchemaChange.Move.first("f2");
    SchemaChange.Move moveLast = SchemaChange.Move.last("f0");

    // Test FIRST operation
    manager.applyMove(fields, moveFirst);
    Assertions.assertEquals(
            2,
            fields.get(0).id(),
            "The field id should remain as 2 after moving f2 to the first position");
    // Expected value comes first so a failure reports expected/actual the right way round.
    Assertions.assertEquals(
            "f2", fields.get(0).name(), "f2 should be moved to the first position");

    // Reset fields to initial state
    fields = new LinkedList<>();
    fields.add(new DataField(0, "f0", DataTypes.INT()));
    fields.add(new DataField(1, "f1", DataTypes.BIGINT()));
    fields.add(new DataField(2, "f2", DataTypes.STRING()));
    fields.add(new DataField(3, "f3", DataTypes.SMALLINT()));

    // Test LAST operation
    manager.applyMove(fields, moveLast);
    Assertions.assertEquals(
            0,
            fields.get(fields.size() - 1).id(),
            "The field id should remain as 0 after moving f0 to the last position");
    Assertions.assertEquals(
            "f0",
            fields.get(fields.size() - 1).name(),
            "f0 should be moved to the last position");
}

/**
 * Verifies AFTER moves in both directions, including a move behind the last column. Each case
 * checks the moved field's slot and that the reference field directly precedes it.
 */
@Test
public void testMoveAfter() {
    // Create the initial list of fields
    List<DataField> fields = new LinkedList<>();
    fields.add(new DataField(0, "f0", DataTypes.INT()));
    fields.add(new DataField(1, "f1", DataTypes.BIGINT()));
    fields.add(new DataField(2, "f2", DataTypes.STRING()));
    fields.add(new DataField(3, "f3", DataTypes.SMALLINT()));

    // Move a column towards the end: f1 after f2 -> [f0, f2, f1, f3]
    SchemaChange.Move moveAfter = SchemaChange.Move.after("f1", "f2");
    manager.applyMove(fields, moveAfter);
    Assertions.assertEquals(
            1, fields.get(2).id(), "The field id should remain as 1 after moving f1 after f2");
    Assertions.assertEquals("f1", fields.get(2).name(), "f1 should be after f2");
    Assertions.assertEquals("f2", fields.get(1).name(), "f2 should directly precede f1");

    // Reset fields to initial state
    fields = new LinkedList<>();
    fields.add(new DataField(0, "f0", DataTypes.INT()));
    fields.add(new DataField(1, "f1", DataTypes.BIGINT()));
    fields.add(new DataField(2, "f2", DataTypes.STRING()));
    fields.add(new DataField(3, "f3", DataTypes.SMALLINT()));

    // Move a column towards the front: f3 after f1 -> [f0, f1, f3, f2]
    moveAfter = SchemaChange.Move.after("f3", "f1");
    manager.applyMove(fields, moveAfter);
    Assertions.assertEquals(
            3, fields.get(2).id(), "The field id should remain as 3 after moving f3 after f1");
    Assertions.assertEquals("f3", fields.get(2).name(), "f3 should be after f1");
    Assertions.assertEquals("f1", fields.get(1).name(), "f1 should directly precede f3");

    // Reset fields to initial state
    fields = new LinkedList<>();
    fields.add(new DataField(0, "f0", DataTypes.INT()));
    fields.add(new DataField(1, "f1", DataTypes.BIGINT()));
    fields.add(new DataField(2, "f2", DataTypes.STRING()));
    fields.add(new DataField(3, "f3", DataTypes.SMALLINT()));

    // Move the first column after a middle column: f0 after f2 -> [f1, f2, f0, f3]
    moveAfter = SchemaChange.Move.after("f0", "f2");
    manager.applyMove(fields, moveAfter);
    Assertions.assertEquals(
            0, fields.get(2).id(), "The field id should remain as 0 after moving f0 after f2");
    Assertions.assertEquals("f0", fields.get(2).name(), "f0 should be after f2");
    Assertions.assertEquals("f2", fields.get(1).name(), "f2 should directly precede f0");

    // Reset fields to initial state
    fields = new LinkedList<>();
    fields.add(new DataField(0, "f0", DataTypes.INT()));
    fields.add(new DataField(1, "f1", DataTypes.BIGINT()));
    fields.add(new DataField(2, "f2", DataTypes.STRING()));
    fields.add(new DataField(3, "f3", DataTypes.SMALLINT()));

    // Test move column after last column: f0 after f3 -> [f1, f2, f3, f0]
    moveAfter = SchemaChange.Move.after("f0", "f3");
    manager.applyMove(fields, moveAfter);
    Assertions.assertEquals(
            0, fields.get(3).id(), "The field id should remain as 0 after moving f0 after f3");
    Assertions.assertEquals("f0", fields.get(3).name(), "f0 should be after f3");
    Assertions.assertEquals("f3", fields.get(2).name(), "f3 should directly precede f0");
}

/**
 * Verifies BEFORE moves in both directions, including a move in front of the first column. Each
 * case checks the moved field's slot and that the reference field directly follows it.
 */
@Test
public void testMoveBefore() {
    // Create the initial list of fields
    List<DataField> fields = new LinkedList<>();
    fields.add(new DataField(0, "f0", DataTypes.INT()));
    fields.add(new DataField(1, "f1", DataTypes.BIGINT()));
    fields.add(new DataField(2, "f2", DataTypes.STRING()));
    fields.add(new DataField(3, "f3", DataTypes.SMALLINT()));

    // Move a column towards the front: f2 before f1 -> [f0, f2, f1, f3]
    SchemaChange.Move moveBefore = SchemaChange.Move.before("f2", "f1");
    manager.applyMove(fields, moveBefore);
    Assertions.assertEquals(
            2, fields.get(1).id(), "The field id should remain as 2 after moving f2 before f1");
    Assertions.assertEquals("f2", fields.get(1).name(), "f2 should be before f1");
    Assertions.assertEquals("f1", fields.get(2).name(), "f1 should directly follow f2");

    // Reset fields to initial state
    fields = new LinkedList<>();
    fields.add(new DataField(0, "f0", DataTypes.INT()));
    fields.add(new DataField(1, "f1", DataTypes.BIGINT()));
    fields.add(new DataField(2, "f2", DataTypes.STRING()));
    fields.add(new DataField(3, "f3", DataTypes.SMALLINT()));

    // Move a column towards the end: f1 before f3 -> [f0, f2, f1, f3]
    moveBefore = SchemaChange.Move.before("f1", "f3");
    manager.applyMove(fields, moveBefore);
    Assertions.assertEquals(
            1, fields.get(2).id(), "The field id should remain as 1 after moving f1 before f3");
    Assertions.assertEquals("f1", fields.get(2).name(), "f1 should be before f3");
    Assertions.assertEquals("f3", fields.get(3).name(), "f3 should directly follow f1");

    // Reset fields to initial state
    fields = new LinkedList<>();
    fields.add(new DataField(0, "f0", DataTypes.INT()));
    fields.add(new DataField(1, "f1", DataTypes.BIGINT()));
    fields.add(new DataField(2, "f2", DataTypes.STRING()));
    fields.add(new DataField(3, "f3", DataTypes.SMALLINT()));

    // Move a column in front of the first column: f2 before f0 -> [f2, f0, f1, f3]
    moveBefore = SchemaChange.Move.before("f2", "f0");
    manager.applyMove(fields, moveBefore);
    Assertions.assertEquals(
            2, fields.get(0).id(), "The field id should remain as 2 after moving f2 before f0");
    Assertions.assertEquals("f2", fields.get(0).name(), "f2 should be before f0");
    Assertions.assertEquals("f0", fields.get(1).name(), "f0 should directly follow f2");
}
}

0 comments on commit 6f97de2

Please sign in to comment.