Merge branch 'master' into cache-v2
JayajP committed Apr 12, 2024
2 parents f2c3bfb + a764632 commit 360d26c
Showing 22 changed files with 1,448 additions and 399 deletions.
7 changes: 5 additions & 2 deletions .github/workflows/README.md
@@ -21,7 +21,10 @@ Most workflows will get kicked off automatically when you open a PR, push code,

If you would like to manually trigger a job, you have 2 options:

1) Trigger Phrases: Many jobs have trigger phrases associated with them (e.g. `Run XYZ PreCommit`). These will appear in statuses of previous PR runs of that check. You can trigger the job on any PR by commenting that trigger phrase in the PR.
1) Trigger Phrases: Some jobs have trigger phrases associated with them (e.g. `Run XYZ PreCommit`). These will appear in statuses of previous PR runs of that check. You can trigger the job on any PR by commenting that trigger phrase in the PR.

**Note:** this approach was found not to be scalable ([#28909](https://github.com/apache/beam/issues/28909)) and is currently only enabled for PreCommit workflows. For PostCommit jobs, it has been replaced by a temporary approach: test suites are configured to trigger whenever a particular trigger file is modified. Test [workflows](https://github.com/apache/beam/tree/master/.github/workflows) have [pull_request_target paths](https://github.com/apache/beam/blob/e33dec69c7cfd01c0b827538e1dad8567e3ff95e/.github/workflows/beam_PreCommit_Whitespace.yml#L25) that include a trigger file; whenever that file is modified, the test suite triggers on the pull request. Make any change to the trigger file to trigger the job. The trigger file path looks like this: `.github/trigger_files/<workflow_file_name.json>`.

2) **Committers only** - Manual triggering: Any committer can start any job with a [workflow_dispatch](https://docs.github.com/en/actions/using-workflows/events-that-trigger-workflows#workflow_dispatch) trigger defined (all jobs should have these). To do so, navigate to the [Actions tab](https://github.com/apache/beam/actions), click on your desired workflow in the left navigation bar, and then click `Run Workflow`.

# Guidelines for Adding or Modifying Workflows
@@ -139,7 +142,7 @@ In order to make it easier for non-committers to interact with workflows, workfl
2) Each job should have the rerun action immediately after its checkout step. You can add a step that uses the `setup-action` action in your workflow, which encapsulates the checkout and rerun logic in one place. This should be gated on the comment trigger (example: https://github.com/apache/beam/blob/0ee2dc73ec6f555a5bf1a643dffd37f4927be67e/.github/workflows/beam_PreCommit_Go.yml#L65-L70)
3) Each job should have a descriptive name that includes the comment trigger (example: https://github.com/apache/beam/blob/ba8fc935222aeb070668fbafd588bc58e7a21289/.github/workflows/beam_PreCommit_CommunityMetrics.yml#L48)

**Note:** this approach was found not to be scalable ([#28909](https://github.com/apache/beam/issues/28909)) and is currently only enabled for PreCommit workflows. For PostCommit jobs, it has been replaced by a temporary approach: a `pull_request_target` trigger with the specific path `.github/trigger_files/<workflow_file_name_stem.json>`.
**Note:** Comment triggering was found not to be scalable ([#28909](https://github.com/apache/beam/issues/28909)) and is currently limited to a subset of suites. For more information, see the [Running Workflows Manually](#running-workflows-manually) section.

# Testing new workflows or workflow updates

44 changes: 16 additions & 28 deletions contributor-docs/java-dependency-upgrades.md
@@ -93,38 +93,26 @@ or on the dev mailing list, for [example](https://lists.apache.org/thread/jgjdt5

# Google Cloud-related dependency upgrades

To provide consistent dependencies to Beam users, follow these steps when upgrading Google Cloud-related dependencies:

- [ ] Set the Libraries BOM version. Find the latest release in
https://github.com/googleapis/java-cloud-bom/releases and set libraries-bom
value in BeamModulePlugin.groovy
- [ ] Find the core Google Java library versions (such as gRPC, Protobuf, Guava, and the
Google Auth Library) in the release notes of the Libraries BOM and set them in
BeamModulePlugin.groovy
- [ ] Find the appropriate Netty version by checking io.grpc:grpc-netty's
dependency declaration. For example, you can tell gRPC version 1.49.0
was built with Netty "4.1.77.Final" by reading
https://search.maven.org/artifact/io.grpc/grpc-netty/1.49.0/jar:
```
<artifactId>netty-codec-http2</artifactId>
<version>4.1.77.Final</version>
```
- [ ] Update netty_version in BeamModulePlugin.groovy
- [ ] Find netty-tcnative version via netty-parent artifact. For example, you
can tell Netty 4.1.77.Final was built with netty-tcnative "2.0.52.Final".
https://search.maven.org/artifact/io.netty/netty-parent/4.1.77.Final/jar:
```
<tcnative.version>2.0.52.Final</tcnative.version>
```
- [ ] Update netty_tcnative_boringssl_static version in BeamModulePlugin.groovy


The following script may be useful to identify matching/consistent dependency overrides.
Beam uses [GCP-BOM](https://cloud.google.com/java/docs/bom) to manage Google Cloud-related dependencies. A [script](../scripts/tools/bomupgrader.py) is used to upgrade GCP-BOM dependencies. To upgrade, find the latest BOM version in https://mvnrepository.com/artifact/com.google.cloud/libraries-bom and run

```
python scripts/tools/bomupgrader.py <latest_bom_version>
```

The changes will then be made in place.

The script performs the following steps:

1. Preprocesses BeamModulePlugin.groovy to decide which dependencies need to be synced
2. Generates an empty Maven project to fetch the exact target versions to change
3. Writes the versions back to BeamModulePlugin.groovy
4. Updates the libraries-bom version in sdks/java/container/license_scripts/dep_urls_java.yaml

When modifying the script or diagnosing issues, the following commands may be useful for identifying matching/consistent dependency overrides:

export BOM_VERSION=26.22.0 ; \
cd /tmp; \
wget https://repo1.maven.org/maven2/com/google/cloud/libraries-bom/$BOM_VERSION/libraries-bom-$BOM_VERSION.pom -O base.pom && \
mvn help:effective-pom -f base.pom -Doutput=effective.pom && cat effective.pom | \
grep -v 'dependencyManagement' > cleanup.pom && \
mvn dependency:tree -f cleanup.pom

2 changes: 1 addition & 1 deletion release/src/main/groovy/MobileGamingCommands.groovy
@@ -30,7 +30,7 @@ class MobileGamingCommands {
SparkRunner: "spark-runner",
FlinkRunner: "flink-runner"]

public static final EXECUTION_TIMEOUT_IN_MINUTES = 20
public static final EXECUTION_TIMEOUT_IN_MINUTES = 40

// Lists used to verify team names generated in the LeaderBoard example.
// This list should be kept sync with COLORS in org.apache.beam.examples.complete.game.injector.Injector.
15 changes: 4 additions & 11 deletions sdks/java/io/iceberg/build.gradle
@@ -43,27 +43,20 @@ def hive_version = "3.1.3"
dependencies {
implementation library.java.vendored_guava_32_1_2_jre
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(":sdks:java:io:hadoop-common")
implementation library.java.slf4j_api
implementation library.java.joda_time
implementation "org.apache.parquet:parquet-column:$parquet_version"
implementation "org.apache.parquet:parquet-common:$parquet_version"
implementation "org.apache.orc:orc-core:$orc_version"
implementation "org.apache.iceberg:iceberg-core:$iceberg_version"
implementation "org.apache.iceberg:iceberg-api:$iceberg_version"
implementation "org.apache.iceberg:iceberg-parquet:$iceberg_version"
implementation "org.apache.iceberg:iceberg-orc:$iceberg_version"
implementation "org.apache.iceberg:iceberg-arrow:$iceberg_version"
implementation "org.apache.iceberg:iceberg-data:$iceberg_version"
implementation library.java.avro
implementation library.java.hadoop_common



provided library.java.avro
provided library.java.hadoop_client
permitUnusedDeclared library.java.hadoop_client
provided library.java.hadoop_common
testImplementation library.java.hadoop_client

testImplementation "org.apache.iceberg:iceberg-gcp:$iceberg_version"
testImplementation "org.apache.iceberg:iceberg-data:$iceberg_version"
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
testImplementation library.java.junit
testRuntimeOnly library.java.slf4j_jdk14
@@ -17,9 +17,16 @@
*/
package org.apache.beam.io.iceberg;

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.checkerframework.checker.nullness.qual.Nullable;

public class IcebergIO {

@@ -28,6 +35,10 @@ public static WriteRows writeToDynamicDestinations(
return new WriteRows(catalog, dynamicDestinations);
}

public static ReadTable readTable(IcebergCatalogConfig catalogConfig, TableIdentifier tableId) {
return new ReadTable(catalogConfig, tableId);
}

static class WriteRows extends PTransform<PCollection<Row>, IcebergWriteResult> {

private final IcebergCatalogConfig catalog;
@@ -47,4 +58,36 @@ public IcebergWriteResult expand(PCollection<Row> input) {
"Write Rows to Destinations", new WriteToDestinations(catalog, dynamicDestinations));
}
}

public static class ReadTable extends PTransform<PBegin, PCollection<Row>> {

private final IcebergCatalogConfig catalogConfig;
private final transient @Nullable TableIdentifier tableId;

private TableIdentifier getTableId() {
return checkStateNotNull(
tableId, "Transient field tableId null; it should not be accessed after serialization");
}

private ReadTable(IcebergCatalogConfig catalogConfig, TableIdentifier tableId) {
this.catalogConfig = catalogConfig;
this.tableId = tableId;
}

@Override
public PCollection<Row> expand(PBegin input) {

Table table = catalogConfig.catalog().loadTable(getTableId());

return input.apply(
Read.from(
new ScanSource(
IcebergScanConfig.builder()
.setCatalogConfig(catalogConfig)
.setScanType(IcebergScanConfig.ScanType.TABLE)
.setTableIdentifier(getTableId())
.setSchema(SchemaAndRowConversions.icebergSchemaToBeamSchema(table.schema()))
.build())));
}
}
}
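
Since this commit introduces a new public entry point, `IcebergIO.readTable(IcebergCatalogConfig, TableIdentifier)`, here is a minimal, hedged usage sketch. Only the `readTable` call and its `PCollection<Row>` output come from the code above; the example class name, the pipeline setup, the catalog-config helper, and the `db.table` identifier are hypothetical placeholders, not part of this commit.

```java
import org.apache.beam.io.iceberg.IcebergCatalogConfig;
import org.apache.beam.io.iceberg.IcebergIO;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.iceberg.catalog.TableIdentifier;

public class ReadIcebergTableExample {

  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline pipeline = Pipeline.create(options);

    // How IcebergCatalogConfig is constructed is not shown in this commit, so this
    // sketch takes it from a helper you would implement for your catalog.
    IcebergCatalogConfig catalogConfig = buildCatalogConfig();

    // readTable returns a PTransform<PBegin, PCollection<Row>> backed by ScanSource;
    // the Beam schema is derived from the Iceberg table schema when the transform expands.
    PCollection<Row> rows =
        pipeline.apply(
            "ReadIcebergTable",
            IcebergIO.readTable(catalogConfig, TableIdentifier.of("db", "table")));

    pipeline.run().waitUntilFinish();
  }

  private static IcebergCatalogConfig buildCatalogConfig() {
    throw new UnsupportedOperationException(
        "Construct an IcebergCatalogConfig appropriate for your Iceberg catalog");
  }
}
```

Note that `ReadTable.expand` loads the Iceberg table to resolve its schema while the pipeline is being constructed, so the catalog must be reachable from the machine that builds the pipeline.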
@@ -0,0 +1,166 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.io.iceberg;

import com.google.auto.value.AutoValue;
import java.io.Serializable;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.expressions.Expression;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.dataflow.qual.Pure;

@AutoValue
public abstract class IcebergScanConfig implements Serializable {

private transient @MonotonicNonNull Table cachedTable;

public enum ScanType {
TABLE,
BATCH
}

@Pure
public abstract ScanType getScanType();

@Pure
public abstract IcebergCatalogConfig getCatalogConfig();

@Pure
public abstract String getTableIdentifier();

@Pure
public Table getTable() {
if (cachedTable == null) {
cachedTable =
getCatalogConfig().catalog().loadTable(TableIdentifier.parse(getTableIdentifier()));
}
return cachedTable;
}

@Pure
public abstract Schema getSchema();

@Pure
public abstract @Nullable Expression getFilter();

@Pure
public abstract @Nullable Boolean getCaseSensitive();

@Pure
public abstract ImmutableMap<String, String> getOptions();

@Pure
public abstract @Nullable Long getSnapshot();

@Pure
public abstract @Nullable Long getTimestamp();

@Pure
public abstract @Nullable Long getFromSnapshotInclusive();

@Pure
public abstract @Nullable String getFromSnapshotRefInclusive();

@Pure
public abstract @Nullable Long getFromSnapshotExclusive();

@Pure
public abstract @Nullable String getFromSnapshotRefExclusive();

@Pure
public abstract @Nullable Long getToSnapshot();

@Pure
public abstract @Nullable String getToSnapshotRef();

@Pure
public abstract @Nullable String getTag();

@Pure
public abstract @Nullable String getBranch();

@Pure
public static Builder builder() {
return new AutoValue_IcebergScanConfig.Builder()
.setScanType(ScanType.TABLE)
.setFilter(null)
.setCaseSensitive(null)
.setOptions(ImmutableMap.of())
.setSnapshot(null)
.setTimestamp(null)
.setFromSnapshotInclusive(null)
.setFromSnapshotRefInclusive(null)
.setFromSnapshotExclusive(null)
.setFromSnapshotRefExclusive(null)
.setToSnapshot(null)
.setToSnapshotRef(null)
.setTag(null)
.setBranch(null);
}

@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setScanType(ScanType type);

public abstract Builder setCatalogConfig(IcebergCatalogConfig catalog);

public abstract Builder setTableIdentifier(String tableIdentifier);

public Builder setTableIdentifier(TableIdentifier tableIdentifier) {
return this.setTableIdentifier(tableIdentifier.toString());
}

public Builder setTableIdentifier(String... names) {
return setTableIdentifier(TableIdentifier.of(names));
}

public abstract Builder setSchema(Schema schema);

public abstract Builder setFilter(@Nullable Expression filter);

public abstract Builder setCaseSensitive(@Nullable Boolean caseSensitive);

public abstract Builder setOptions(ImmutableMap<String, String> options);

public abstract Builder setSnapshot(@Nullable Long snapshot);

public abstract Builder setTimestamp(@Nullable Long timestamp);

public abstract Builder setFromSnapshotInclusive(@Nullable Long fromInclusive);

public abstract Builder setFromSnapshotRefInclusive(@Nullable String ref);

public abstract Builder setFromSnapshotExclusive(@Nullable Long fromExclusive);

public abstract Builder setFromSnapshotRefExclusive(@Nullable String ref);

public abstract Builder setToSnapshot(@Nullable Long snapshot);

public abstract Builder setToSnapshotRef(@Nullable String ref);

public abstract Builder setTag(@Nullable String tag);

public abstract Builder setBranch(@Nullable String branch);

public abstract IcebergScanConfig build();
}
}
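
For illustration, a hedged sketch of filling in `IcebergScanConfig` directly, mirroring the builder defaults above (in practice this is done inside `ReadTable.expand`). The schema fields, table name, filter, and snapshot id are made-up values; the only call assumed beyond this file is Iceberg's `Expressions.equal` for building the optional filter.

```java
import org.apache.beam.io.iceberg.IcebergCatalogConfig;
import org.apache.beam.io.iceberg.IcebergScanConfig;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.iceberg.expressions.Expressions;

class ScanConfigExample {

  // Catalog construction is not part of this diff, so the config is passed in.
  static IcebergScanConfig tableScan(IcebergCatalogConfig catalogConfig) {
    // Beam schema the scan should produce; in ReadTable this is converted from
    // the Iceberg table schema instead of being written by hand.
    Schema beamSchema =
        Schema.builder().addInt64Field("id").addStringField("name").build();

    return IcebergScanConfig.builder()                // defaults: ScanType.TABLE, empty options, null optionals
        .setCatalogConfig(catalogConfig)
        .setTableIdentifier("db", "events")           // varargs overload delegates to TableIdentifier.of(...)
        .setSchema(beamSchema)
        .setFilter(Expressions.equal("name", "beam")) // optional row filter
        .setSnapshot(1234567890L)                     // optional: pin the scan to a snapshot id
        .build();
  }
}
```

Because `cachedTable` is transient, the loaded `Table` is not serialized with the config; `getTable()` re-loads it lazily from the catalog after deserialization.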
@@ -17,7 +17,7 @@
*/
package org.apache.beam.io.iceberg;

import static org.apache.beam.io.iceberg.RowHelper.rowToRecord;
import static org.apache.beam.io.iceberg.SchemaAndRowConversions.rowToRecord;

import java.io.IOException;
import org.apache.beam.sdk.values.Row;
@@ -30,7 +30,6 @@
import org.apache.iceberg.data.parquet.GenericParquetWriter;
import org.apache.iceberg.io.DataWriter;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.parquet.Parquet;

class RecordWriter {