diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java index 9851b2fcbf21..8d5b4d4fa08d 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/planner/BeamRuleSets.java @@ -21,7 +21,6 @@ import java.util.List; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode; -import org.apache.beam.sdk.extensions.sql.impl.rule.BeamAggregateProjectMergeRule; import org.apache.beam.sdk.extensions.sql.impl.rule.BeamAggregationRule; import org.apache.beam.sdk.extensions.sql.impl.rule.BeamBasicAggregationRule; import org.apache.beam.sdk.extensions.sql.impl.rule.BeamCalcMergeRule; @@ -83,7 +82,7 @@ public class BeamRuleSets { CoreRules.PROJECT_SET_OP_TRANSPOSE, // aggregation and projection rules - BeamAggregateProjectMergeRule.INSTANCE, + // BeamAggregateProjectMergeRule.INSTANCE, // push a projection past a filter or vice versa CoreRules.PROJECT_FILTER_TRANSPOSE, CoreRules.FILTER_PROJECT_TRANSPOSE, diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlAliasTest b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlAliasTest new file mode 100644 index 000000000000..790312b7e756 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlAliasTest @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.Row; +import org.junit.Rule; +import org.junit.Test; +import org.testcontainers.shaded.com.fasterxml.jackson.databind.MapperFeature; +import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper; + +public class BeamSqlAliasTest implements Serializable { + + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + + @Test + public void testSqlWithAliasIsNotIgnoredWithOptimizers() { + String ID = "id"; + String EVENT = "event"; + + Schema inputType = Schema.builder().addStringField(ID).addStringField(EVENT).build(); + + String sql = + "select event as event_name, count(*) as c\n" + "from PCOLLECTION\n" + "group by event"; + + List inputRows = + TestUtils.RowsBuilder.of(inputType).addRows("123", "some_event").getRows(); + + PCollection rowPCollection = + pipeline + .apply("boundedInput", Create.of(inputRows).withRowSchema(inputType)) + .apply(SqlTransform.query(sql)) + .apply( + ParDo.of( + new DoFn() { + @DoFn.ProcessElement + public void processElement(DoFn.ProcessContext c) + throws Exception { + ObjectMapper objectMapper = new ObjectMapper(); + Map map = new HashMap<>(); + + for (int i = + Objects.requireNonNull(c.element()).getSchema().getFields().size() + - 1; + i >= 0; + i--) { + Object value = Objects.requireNonNull(c.element()).getValue(i); + Schema.Field field = + Objects.requireNonNull(c.element()).getSchema().getField(i); + map.put(field.getName(), value); + } + + String json = + objectMapper + .configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true) + .writeValueAsString(map); + c.output(json); + } + })) + .setCoder(StringUtf8Coder.of()); + + // assert alias is kept + PAssert.that(rowPCollection).containsInAnyOrder("{\"c\":1,\"event_name\":\"some_event\"}"); + + pipeline.run().waitUntilFinish(); + } +} \ No newline at end of file diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamAggregateProjectMergeRuleTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamAggregateProjectMergeRuleTest.java index 593febb9f190..4eff5c753c4e 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamAggregateProjectMergeRuleTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/impl/rule/BeamAggregateProjectMergeRuleTest.java @@ -36,6 +36,7 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.values.Row; import org.junit.Before; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; @@ -107,6 +108,7 @@ public void testBeamAggregateProjectMergeRule_withProjectTable_withPredicate() { } @Test + @Ignore("BeamAggregateProjectMergeRule disabled due to CALCITE-6357") public void testBeamAggregateProjectMergeRule_withFilterTable() { // When an IO does not supports project push-down, Projects should be merged with an aggregate. String sqlQuery = "select SUM(id) as id_sum from TEST_FILTER group by name"; @@ -126,6 +128,7 @@ public void testBeamAggregateProjectMergeRule_withFilterTable() { } @Test + @Ignore("BeamAggregateProjectMergeRule disabled due to CALCITE-6357") public void testBeamAggregateProjectMergeRule_withNoneTable() { // When an IO does not supports project push-down, Projects should be merged with an aggregate. String sqlQuery = "select SUM(id) as id_sum from TEST_NONE group by name";