Skip to content

Commit

Permalink
[java-extension-sql] Beam Sql doesn't honor aliases because of `BeamA…
Browse files Browse the repository at this point in the history
…ggregateProjectMergeRule` optimization (apache#30902)
  • Loading branch information
brachi-wernick authored Apr 10, 2024
1 parent e6b7a66 commit c218864
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import java.util.List;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rule.BeamAggregateProjectMergeRule;
import org.apache.beam.sdk.extensions.sql.impl.rule.BeamAggregationRule;
import org.apache.beam.sdk.extensions.sql.impl.rule.BeamBasicAggregationRule;
import org.apache.beam.sdk.extensions.sql.impl.rule.BeamCalcMergeRule;
Expand Down Expand Up @@ -83,7 +82,7 @@ public class BeamRuleSets {
CoreRules.PROJECT_SET_OP_TRANSPOSE,

// aggregation and projection rules
BeamAggregateProjectMergeRule.INSTANCE,
// BeamAggregateProjectMergeRule.INSTANCE,
// push a projection past a filter or vice versa
CoreRules.PROJECT_FILTER_TRANSPOSE,
CoreRules.FILTER_PROJECT_TRANSPOSE,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.extensions.sql;

import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.junit.Rule;
import org.junit.Test;
import org.testcontainers.shaded.com.fasterxml.jackson.databind.MapperFeature;
import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper;

public class BeamSqlAliasTest implements Serializable {

@Rule public final transient TestPipeline pipeline = TestPipeline.create();

@Test
public void testSqlWithAliasIsNotIgnoredWithOptimizers() {
String ID = "id";
String EVENT = "event";

Schema inputType = Schema.builder().addStringField(ID).addStringField(EVENT).build();

String sql =
"select event as event_name, count(*) as c\n" + "from PCOLLECTION\n" + "group by event";

List<Row> inputRows =
TestUtils.RowsBuilder.of(inputType).addRows("123", "some_event").getRows();

PCollection<String> rowPCollection =
pipeline
.apply("boundedInput", Create.of(inputRows).withRowSchema(inputType))
.apply(SqlTransform.query(sql))
.apply(
ParDo.of(
new DoFn<Row, String>() {
@DoFn.ProcessElement
public void processElement(DoFn<Row, String>.ProcessContext c)
throws Exception {
ObjectMapper objectMapper = new ObjectMapper();
Map<String, Object> map = new HashMap<>();

for (int i =
Objects.requireNonNull(c.element()).getSchema().getFields().size()
- 1;
i >= 0;
i--) {
Object value = Objects.requireNonNull(c.element()).getValue(i);
Schema.Field field =
Objects.requireNonNull(c.element()).getSchema().getField(i);
map.put(field.getName(), value);
}

String json =
objectMapper
.configure(MapperFeature.SORT_PROPERTIES_ALPHABETICALLY, true)
.writeValueAsString(map);
c.output(json);
}
}))
.setCoder(StringUtf8Coder.of());

// assert alias is kept
PAssert.that(rowPCollection).containsInAnyOrder("{\"c\":1,\"event_name\":\"some_event\"}");

pipeline.run().waitUntilFinish();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.Row;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -107,6 +108,7 @@ public void testBeamAggregateProjectMergeRule_withProjectTable_withPredicate() {
}

@Test
@Ignore("BeamAggregateProjectMergeRule disabled due to CALCITE-6357")
public void testBeamAggregateProjectMergeRule_withFilterTable() {
// When an IO does not supports project push-down, Projects should be merged with an aggregate.
String sqlQuery = "select SUM(id) as id_sum from TEST_FILTER group by name";
Expand All @@ -126,6 +128,7 @@ public void testBeamAggregateProjectMergeRule_withFilterTable() {
}

@Test
@Ignore("BeamAggregateProjectMergeRule disabled due to CALCITE-6357")
public void testBeamAggregateProjectMergeRule_withNoneTable() {
// When an IO does not supports project push-down, Projects should be merged with an aggregate.
String sqlQuery = "select SUM(id) as id_sum from TEST_NONE group by name";
Expand Down

0 comments on commit c218864

Please sign in to comment.