For example, if we want to drop the list of fields {@code ["foo", "baz"]}, for this input
+ * {@link Row}:
+ *
+ *
fields) {
+ checkUnconfigured();
+ verifyNoNestedFields(fields, "drop");
+ validateSchemaContainsFields(rowSchema, fields, "drop");
+ transformedSchema = dropFields(rowSchema, fields);
+ return this;
+ }
+
+  /**
+   * Configures this {@link RowFilter} to only output the contents of a single row field.
+   *
+   * <p>For example, if we want to only extract the contents of field "foo" for this input {@link
+   * Row}:
+   *
+   * <pre>{@code
+   * abc: 123
+   * bar: my_str
+   * foo:
+   *   xyz:
+   *     baz: 456
+   *     qwe: 789
+   * }</pre>
+   *
+   * <p>we will get the following output {@link Row}:
+   *
+   * <pre>{@code
+   * xyz:
+   *   baz: 456
+   *   qwe: 789
+   * }</pre>
+   *
+   * <p>Note that this will fail if the field is not of type {@link Row}, e.g. if {@code "abc"} is
+   * specified for the example above.
+   *
+   * @param field name of the top-level field whose contents become the output
+   * @return this {@link RowFilter}
+   * @throws IllegalStateException if this {@link RowFilter} has already been configured
+   * @throws IllegalArgumentException if {@code field} is a nested (dotted) name, is not present
+   *     in the schema, or is not of type ROW
+   */
+  public RowFilter only(String field) {
+    checkUnconfigured();
+    List<String> requested = Collections.singletonList(field);
+    // Reject dotted names up front, as the 'drop' operation does; the lookup below only
+    // resolves top-level field names.
+    verifyNoNestedFields(requested, "only");
+    validateSchemaContainsFields(rowSchema, requested, "only");
+    Schema.Field rowField = rowSchema.getField(field);
+    Preconditions.checkArgument(
+        rowField.getType().getTypeName().equals(Schema.TypeName.ROW),
+        "Expected type '%s' for field '%s', but instead got type '%s'.",
+        Schema.TypeName.ROW,
+        rowField.getName(),
+        rowField.getType().getTypeName());
+
+    // The output schema is the schema of the selected row field itself.
+    transformedSchema = rowField.getType().getRowSchema();
+    onlyField = field;
+    return this;
+  }
+
+ /**
+ * Performs a filter operation (keep or drop) on the input {@link Row}. Must have already
+ * configured a filter operation with {@link #dropping(List)} or {@link #keeping(List)} for this
+ * {@link RowFilter}.
+ *
+ *
If not yet configured, will simply return the same {@link Row}.
+ */
+ public Row filter(Row row) {
+ if (transformedSchema == null) {
+ return row;
+ }
+
+ Preconditions.checkState(
+ row.getSchema().assignableTo(rowSchema),
+ "Encountered Row with schema that is incompatible with this RowFilter's schema."
+ + "\nRow schema: %s"
+ + "\nSchema used to initialize this RowFilter: %s",
+ row.getSchema(),
+ rowSchema);
+
+ // 'only' case
+ if (onlyField != null) {
+ return checkStateNotNull(row.getRow(onlyField));
+ }
+
+ // 'keep' and 'drop'
+ return Preconditions.checkNotNull(copyWithNewSchema(row, outputSchema()));
+ }
+
+  /**
+   * Returns the {@link Schema} of the output {@link Row}: the transformed schema once an operation
+   * has been configured, otherwise the schema this filter was created with.
+   */
+  public Schema outputSchema() {
+    if (transformedSchema == null) {
+      return rowSchema;
+    }
+    return transformedSchema;
+  }
+
+  /**
+   * Ensures that no operation has been configured on this {@link RowFilter} yet.
+   *
+   * @throws IllegalStateException if a transformed schema has already been set
+   */
+  private void checkUnconfigured() {
+    boolean notYetConfigured = transformedSchema == null;
+    Preconditions.checkState(
+        notYetConfigured,
+        "This RowFilter has already been configured to filter to the following Schema: %s",
+        transformedSchema);
+  }
+
+  /**
+   * Verifies that this selection contains no nested fields, i.e. no name containing a {@code "."}.
+   *
+   * @param fields field names to check
+   * @param operation name of the operation being configured; only used in the error message
+   * @throws IllegalArgumentException listing every nested field found, if there is at least one
+   */
+  private void verifyNoNestedFields(List<String> fields, String operation) {
+    List<String> nestedFields = new ArrayList<>();
+    for (String field : fields) {
+      if (field.contains(".")) {
+        nestedFields.add(field);
+      }
+    }
+    // Report all offenders at once rather than failing on the first one.
+    if (!nestedFields.isEmpty()) {
+      throw new IllegalArgumentException(
+          String.format(
+              "RowFilter does not support specifying nested fields to %s: %s",
+              operation, nestedFields));
+    }
+  }
+
+  /**
+   * Checks whether a {@link Schema} contains a list of field names. Nested fields can be expressed
+   * with dot-notation. Throws a helpful error in the case where a field doesn't exist, or if a
+   * nested field could not be reached.
+   *
+   * @param schema the schema to look the fields up in
+   * @param specifiedFields field names to check, optionally dot-separated for nested fields
+   * @param operation name of the operation being configured; only used in the error message
+   * @throws IllegalArgumentException if any field is missing, or if a path passes through a field
+   *     that is not of type ROW; the message lists all such fields
+   */
+  @VisibleForTesting
+  static void validateSchemaContainsFields(
+      Schema schema, List<String> specifiedFields, String operation) {
+    Set<String> notFound = new HashSet<>();
+    Set<String> notRowField = new HashSet<>();
+
+    for (String field : specifiedFields) {
+      List<String> levels = Splitter.on(".").splitToList(field);
+
+      // Walk the path one component at a time, starting at the top-level schema.
+      Schema currentSchema = schema;
+
+      for (int i = 0; i < levels.size(); i++) {
+        // Dotted path up to and including the current component, for error reporting.
+        String currentFieldName = String.join(".", levels.subList(0, i + 1));
+
+        if (!currentSchema.hasField(levels.get(i))) {
+          notFound.add(currentFieldName);
+          break;
+        }
+
+        // More components follow, so the current field must be a row we can descend into.
+        if (i + 1 < levels.size()) {
+          Schema.Field nextField = currentSchema.getField(levels.get(i));
+          if (!nextField.getType().getTypeName().equals(Schema.TypeName.ROW)) {
+            notRowField.add(currentFieldName);
+            break;
+          }
+          currentSchema = Preconditions.checkNotNull(nextField.getType().getRowSchema());
+        }
+      }
+    }
+
+    if (!notFound.isEmpty() || !notRowField.isEmpty()) {
+      String message = "Validation failed for '" + operation + "'.";
+      if (!notFound.isEmpty()) {
+        message += "\nRow Schema does not contain the following specified fields: " + notFound;
+      }
+      if (!notRowField.isEmpty()) {
+        message +=
+            "\nThe following specified fields are not of type Row. Their nested fields could not be reached: "
+                + notRowField;
+      }
+      throw new IllegalArgumentException(message);
+    }
+  }
+
+  /**
+   * Creates a field tree, separating each top-level field from its (potential) nested fields. E.g.
+   * ["foo.bar.baz", "foo.abc", "xyz"] --> {"foo": ["bar.baz", "abc"], "xyz": []}
+   *
+   * @param fields field names, optionally dot-separated for nested fields
+   * @return a map from each top-level field name to the remaining dotted paths specified beneath
+   *     it; the list is empty when only the top-level field itself was specified
+   */
+  @VisibleForTesting
+  static Map<String, List<String>> getFieldTree(List<String> fields) {
+    // Insertion-ordered, so roots are iterated in the order they first appear in 'fields'.
+    Map<String, List<String>> fieldTree = Maps.newLinkedHashMap();
+
+    for (String field : fields) {
+      List<String> components = Splitter.on(".").splitToList(field);
+      String root = components.get(0);
+      // Register the root even when nothing nested is specified beneath it.
+      fieldTree.computeIfAbsent(root, r -> new ArrayList<>());
+
+      if (components.size() > 1) {
+        // Everything after the root is re-joined and kept as one dotted path.
+        String nestedFields = String.join(".", components.subList(1, components.size()));
+        Preconditions.checkNotNull(fieldTree.get(root)).add(nestedFields);
+      }
+    }
+    return fieldTree;
+  }
+
+  /**
+   * Returns a new {@link Row} containing only the fields that intersect with the new {@link
+   * Schema}. Relies on a previous step to have validated the compatibility of the new {@link
+   * Schema}.
+   *
+   * @param row the row to copy values from; may be {@code null}
+   * @param newSchema the schema of the row to build
+   * @return a row built with {@code newSchema}, or {@code null} if {@code row} is {@code null}
+   */
+  @VisibleForTesting
+  @Nullable
+  static Row copyWithNewSchema(@Nullable Row row, Schema newSchema) {
+    if (row == null) {
+      return null;
+    }
+    Map<String, Object> values = new HashMap<>(newSchema.getFieldCount());
+
+    for (Schema.Field field : newSchema.getFields()) {
+      String name = field.getName();
+      Object value;
+      if (field.getType().getTypeName().equals(Schema.TypeName.ROW)) {
+        // Nested rows are copied recursively so they pick up the (possibly reduced) nested schema.
+        Schema nestedRowSchema = Preconditions.checkNotNull(field.getType().getRowSchema());
+        value = copyWithNewSchema(row.getRow(name), nestedRowSchema);
+      } else {
+        value = row.getValue(name);
+      }
+      // Null values are not added to the map handed to the builder.
+      if (value != null) {
+        values.put(name, value);
+      }
+    }
+    return Row.withSchema(newSchema).withFieldValues(values).build();
+  }
+
+  /**
+   * Returns a new {@link Schema} with the specified fields removed.
+   *
+   * <p>No guarantee that field ordering will remain the same.
+   *
+   * @param schema the schema to remove fields from
+   * @param fieldsToDrop field names to remove, optionally dot-separated for nested fields; when
+   *     empty, {@code schema} is returned as-is
+   * @return the schema without the specified fields
+   * @throws IllegalArgumentException if a nested field is specified under a field that is not of
+   *     type ROW
+   */
+  @VisibleForTesting
+  static Schema dropFields(Schema schema, List<String> fieldsToDrop) {
+    if (fieldsToDrop.isEmpty()) {
+      return schema;
+    }
+    List<Schema.Field> newFieldsList = new ArrayList<>(schema.getFields());
+    Map<String, List<String>> fieldTree = getFieldTree(fieldsToDrop);
+
+    for (Map.Entry<String, List<String>> fieldAndDescendents : fieldTree.entrySet()) {
+      String root = fieldAndDescendents.getKey();
+      List<String> nestedFields = fieldAndDescendents.getValue();
+      Schema.Field fieldToRemove = schema.getField(root);
+      Schema.FieldType typeToRemove = fieldToRemove.getType();
+
+      // Base case: we're at the specified field to remove.
+      if (nestedFields.isEmpty()) {
+        newFieldsList.remove(fieldToRemove);
+      } else {
+        // Otherwise, we're asked to remove a nested field. Verify current field is ROW type
+        Preconditions.checkArgument(
+            typeToRemove.getTypeName().equals(Schema.TypeName.ROW),
+            "Expected type %s for specified nested field '%s', but instead got type %s.",
+            Schema.TypeName.ROW,
+            root,
+            typeToRemove.getTypeName());
+
+        // Recurse into the nested schema and rebuild this field around the reduced schema,
+        // carrying over the original nullability.
+        Schema nestedSchema = Preconditions.checkNotNull(typeToRemove.getRowSchema());
+        Schema newNestedSchema = dropFields(nestedSchema, nestedFields);
+        Schema.Field modifiedField =
+            Schema.Field.of(root, Schema.FieldType.row(newNestedSchema))
+                .withNullable(typeToRemove.getNullable());
+
+        // Replace with modified field
+        newFieldsList.set(newFieldsList.indexOf(fieldToRemove), modifiedField);
+      }
+    }
+    return new Schema(newFieldsList);
+  }
+
+  /**
+   * Returns a new {@link Schema} with only the specified fields kept.
+   *
+   * <p>No guarantee that field ordering will remain the same.
+   *
+   * @param schema the schema to select fields from
+   * @param fieldsToKeep field names to keep, optionally dot-separated for nested fields; when
+   *     empty, {@code schema} is returned as-is
+   * @return a schema containing only the specified fields
+   * @throws IllegalArgumentException if a nested field is specified under a field that is not of
+   *     type ROW
+   */
+  @VisibleForTesting
+  static Schema keepFields(Schema schema, List<String> fieldsToKeep) {
+    if (fieldsToKeep.isEmpty()) {
+      return schema;
+    }
+    List<Schema.Field> newFieldsList = new ArrayList<>(fieldsToKeep.size());
+    Map<String, List<String>> fieldTree = getFieldTree(fieldsToKeep);
+
+    for (Map.Entry<String, List<String>> fieldAndDescendents : fieldTree.entrySet()) {
+      String root = fieldAndDescendents.getKey();
+      List<String> nestedFields = fieldAndDescendents.getValue();
+      Schema.Field fieldToKeep = schema.getField(root);
+      Schema.FieldType typeToKeep = fieldToKeep.getType();
+
+      // Base case: we're at the specified field to keep, and we can skip this conditional.
+      // Otherwise: we're asked to keep a nested field, so we dig deeper to determine which nested
+      // fields to keep
+      if (!nestedFields.isEmpty()) {
+        Preconditions.checkArgument(
+            typeToKeep.getTypeName().equals(Schema.TypeName.ROW),
+            "Expected type %s for specified nested field '%s', but instead got type %s.",
+            Schema.TypeName.ROW,
+            root,
+            typeToKeep.getTypeName());
+
+        // Rebuild this field around the reduced nested schema, carrying over the original
+        // nullability.
+        Schema nestedSchema = Preconditions.checkNotNull(typeToKeep.getRowSchema());
+        Schema newNestedSchema = keepFields(nestedSchema, nestedFields);
+        fieldToKeep =
+            Schema.Field.of(root, Schema.FieldType.row(newNestedSchema))
+                .withNullable(typeToKeep.getNullable());
+      }
+      newFieldsList.add(fieldToKeep);
+    }
+
+    return new Schema(newFieldsList);
+  }
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowFilterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowFilterTest.java
new file mode 100644
index 000000000000..22c17f6d07c9
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowFilterTest.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/** Tests for {@link RowFilter}. */
+public class RowFilterTest {
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ private static final Schema DOUBLY_NESTED_ROW_SCHEMA =
+ Schema.builder()
+ .addStringField("doubly_nested_str")
+ .addInt32Field("doubly_nested_int")
+ .build();
+
+ private static final Schema NESTED_ROW_SCHEMA =
+ Schema.builder()
+ .addStringField("nested_str")
+ .addInt32Field("nested_int")
+ .addFloatField("nested_float")
+ .addRowField("nested_row", DOUBLY_NESTED_ROW_SCHEMA)
+ .build();
+ private static final Schema ROW_SCHEMA =
+ Schema.builder()
+ .addStringField("str")
+ .addBooleanField("bool")
+ .addNullableInt32Field("nullable_int")
+ .addArrayField("arr_int", Schema.FieldType.INT32)
+ .addRowField("row", NESTED_ROW_SCHEMA)
+ .addNullableRowField("nullable_row", NESTED_ROW_SCHEMA)
+ .build();
+
+ @Test
+ public void testSchemaValidation() {
+ List> goodFields =
+ Arrays.asList(
+ Arrays.asList("str", "bool", "nullable_row"),
+ Arrays.asList("nullable_int", "arr_int"),
+ Arrays.asList("row.nested_str", "row.nested_row.doubly_nested_str"),
+ Arrays.asList("nullable_row.nested_row.doubly_nested_int"));
+
+ for (List fields : goodFields) {
+ RowFilter.validateSchemaContainsFields(ROW_SCHEMA, fields, "test-operation");
+ }
+ }
+
+ @Test
+ public void testSchemaValidationFailsWithHelpfulErrorForMissingFields() {
+ List, List>> nonExistentFields =
+ Arrays.asList(
+ KV.of(
+ Arrays.asList("nonexistent_1", "nonexistent_2", "nonexistent_3"),
+ Arrays.asList("nonexistent_1", "nonexistent_2", "nonexistent_3")),
+ KV.of(
+ Arrays.asList("nullable_int", "arr_int", "nonexistent"),
+ Collections.singletonList("nonexistent")),
+ KV.of(
+ Arrays.asList(
+ "nullable_row.nested_row.nonexistent", "row.nonexistent", "row.nested_float"),
+ Arrays.asList("nullable_row.nested_row.nonexistent", "row.nonexistent")));
+
+ for (KV, List> fields : nonExistentFields) {
+ List allFields = fields.getKey();
+ List badFields = fields.getValue();
+
+ IllegalArgumentException e =
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ RowFilter.validateSchemaContainsFields(ROW_SCHEMA, allFields, "test-operation"));
+
+ assertThat(e.getMessage(), containsString("Validation failed for 'test-operation'"));
+ assertThat(
+ e.getMessage(),
+ containsString("Row Schema does not contain the following specified fields"));
+ for (String badField : badFields) {
+ assertThat(e.getMessage(), containsString(badField));
+ }
+ }
+ }
+
+ @Test
+ public void testSchemaValidationFailsWithHelpfulErrorForInvalidNestedFields() {
+ List, List>> nonNestedFields =
+ Arrays.asList(
+ KV.of(
+ Arrays.asList(
+ "row.nested_row", "row.nested_int", "row.nested_str.unexpected_nested"),
+ Collections.singletonList("row.nested_str")),
+ KV.of(
+ Arrays.asList(
+ "nullable_row.nested_str",
+ "nullable_row.nested_str.unexpected",
+ "row.nested_int.unexpected_2"),
+ Arrays.asList("nullable_row.nested_str", "row.nested_int")));
+
+ for (KV, List> fields : nonNestedFields) {
+ List allFields = fields.getKey();
+ List badFields = fields.getValue();
+
+ IllegalArgumentException e =
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ RowFilter.validateSchemaContainsFields(ROW_SCHEMA, allFields, "test-operation"));
+
+ assertThat(e.getMessage(), containsString("Validation failed for 'test-operation'"));
+ assertThat(
+ e.getMessage(),
+ containsString(
+ "The following specified fields are not of type Row. Their nested fields could not be reached"));
+ for (String badField : badFields) {
+ assertThat(e.getMessage(), containsString(badField));
+ }
+ }
+ }
+
+ @Test
+ public void testGetFieldTree() {
+ List fields =
+ Arrays.asList(
+ "top-level",
+ "top-level-2",
+ "top-level.nested-level",
+ "top-level.nested-level-2",
+ "top-level.nested-level.doubly-nested-level",
+ "top-level.nested-level.doubly-nested-level-2");
+ List nestedLayer =
+ Arrays.asList(
+ "nested-level",
+ "nested-level-2",
+ "nested-level.doubly-nested-level",
+ "nested-level.doubly-nested-level-2");
+
+ Map> expectedTree =
+ ImmutableMap.>builder()
+ .put("top-level-2", Collections.emptyList())
+ .put("top-level", nestedLayer)
+ .build();
+
+ assertEquals(expectedTree, RowFilter.getFieldTree(fields));
+
+ List doublyNestedLayer = Arrays.asList("doubly-nested-level", "doubly-nested-level-2");
+
+ Map> expectedNestedTree =
+ ImmutableMap.>builder()
+ .put("nested-level-2", Collections.emptyList())
+ .put("nested-level", doublyNestedLayer)
+ .build();
+
+ assertEquals(expectedNestedTree, RowFilter.getFieldTree(nestedLayer));
+ }
+
+ @Test
+ public void testDropSchemaFields() {
+ List fieldsToDrop =
+ Arrays.asList(
+ "str",
+ "arr_int",
+ "nullable_int",
+ "row.nested_int",
+ "row.nested_float",
+ "row.nested_row.doubly_nested_int",
+ "nullable_row.nested_str",
+ "nullable_row.nested_row");
+
+ Schema expectedDroppedSchema =
+ Schema.builder()
+ .addBooleanField("bool")
+ .addRowField(
+ "row",
+ Schema.builder()
+ .addStringField("nested_str")
+ .addRowField(
+ "nested_row", Schema.builder().addStringField("doubly_nested_str").build())
+ .build())
+ .addNullableRowField(
+ "nullable_row",
+ Schema.builder().addInt32Field("nested_int").addFloatField("nested_float").build())
+ .build();
+
+ assertTrue(expectedDroppedSchema.equivalent(RowFilter.dropFields(ROW_SCHEMA, fieldsToDrop)));
+ }
+
+ @Test
+ public void testKeepSchemaFields() {
+ List fieldsToKeep =
+ Arrays.asList(
+ "str",
+ "arr_int",
+ "nullable_int",
+ "row.nested_int",
+ "row.nested_float",
+ "row.nested_row.doubly_nested_int",
+ "nullable_row.nested_str",
+ "nullable_row.nested_row");
+
+ Schema expectedKeptSchema =
+ Schema.builder()
+ .addStringField("str")
+ .addArrayField("arr_int", Schema.FieldType.INT32)
+ .addNullableInt32Field("nullable_int")
+ .addRowField(
+ "row",
+ Schema.builder()
+ .addInt32Field("nested_int")
+ .addFloatField("nested_float")
+ .addRowField(
+ "nested_row", Schema.builder().addInt32Field("doubly_nested_int").build())
+ .build())
+ .addNullableRowField(
+ "nullable_row",
+ Schema.builder()
+ .addStringField("nested_str")
+ .addRowField("nested_row", DOUBLY_NESTED_ROW_SCHEMA)
+ .build())
+ .build();
+
+ assertTrue(expectedKeptSchema.equivalent(RowFilter.keepFields(ROW_SCHEMA, fieldsToKeep)));
+ }
+
+ @Test
+ public void testDropNestedFieldsFails() {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("RowFilter does not support specifying nested fields to drop");
+
+ new RowFilter(ROW_SCHEMA)
+ .dropping(
+ Arrays.asList(
+ "bool",
+ "nullable_int",
+ "row.nested_int",
+ "row.nested_float",
+ "row.nested_row.doubly_nested_int",
+ "nullable_row"));
+ }
+
+ @Test
+ public void testKeepNestedFieldsFails() {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("RowFilter does not support specifying nested fields to keep");
+
+ new RowFilter(ROW_SCHEMA)
+ .keeping(
+ Arrays.asList("str", "arr_int", "row.nested_str", "row.nested_row.doubly_nested_str"));
+ }
+
+ @Test
+ public void testOnlyFailsWhenSpecifyingNonRowField() {
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(
+ "Expected type 'ROW' for field 'nullable_int', but instead got type 'INT32'");
+
+ new RowFilter(ROW_SCHEMA).only("nullable_int");
+ }
+
+ private static final Row ORIGINAL_ROW =
+ Row.withSchema(ROW_SCHEMA)
+ .addValue("str_value")
+ .addValue(true)
+ .addValue(123)
+ .addValue(Arrays.asList(1, 2, 3, 4, 5))
+ .addValue(
+ Row.withSchema(NESTED_ROW_SCHEMA)
+ .addValue("nested_str_value")
+ .addValue(456)
+ .addValue(1.234f)
+ .addValue(
+ Row.withSchema(DOUBLY_NESTED_ROW_SCHEMA)
+ .addValue("doubly_nested_str_value")
+ .addValue(789)
+ .build())
+ .build())
+ .addValue(null)
+ .build();
+
+ private static final Schema FILTERED_DOUBLY_NESTED_SCHEMA =
+ Schema.builder().addStringField("doubly_nested_str").build();
+ private static final Schema FILTERED_NESTED_SCHEMA =
+ Schema.builder()
+ .addStringField("nested_str")
+ .addRowField("nested_row", FILTERED_DOUBLY_NESTED_SCHEMA)
+ .build();
+ private static final Schema FILTERED_SCHEMA =
+ Schema.builder()
+ .addStringField("str")
+ .addArrayField("arr_int", Schema.FieldType.INT32)
+ .addRowField("row", FILTERED_NESTED_SCHEMA)
+ .build();
+
+ private static final Row FILTERED_ROW =
+ Row.withSchema(FILTERED_SCHEMA)
+ .addValue("str_value")
+ .addValue(Arrays.asList(1, 2, 3, 4, 5))
+ .addValue(
+ Row.withSchema(FILTERED_NESTED_SCHEMA)
+ .addValue("nested_str_value")
+ .addValue(
+ Row.withSchema(FILTERED_DOUBLY_NESTED_SCHEMA)
+ .addValue("doubly_nested_str_value")
+ .build())
+ .build())
+ .build();
+
+ @Test
+ public void testCopyRowWithNewSchema() {
+ assertEquals(FILTERED_ROW, RowFilter.copyWithNewSchema(ORIGINAL_ROW, FILTERED_SCHEMA));
+ }
+
+ @Test
+ public void testOnlyRowField() {
+ RowFilter rowFilter = new RowFilter(ROW_SCHEMA).only("row");
+
+ Row expecedRow =
+ Row.withSchema(rowFilter.outputSchema())
+ .addValues(ORIGINAL_ROW.getRow("row").getValues())
+ .build();
+
+ assertEquals(expecedRow, rowFilter.filter(ORIGINAL_ROW));
+ }
+}