Add Java documentation to IcebergIO #32621

Merged 3 commits on Oct 2, 2024
Changes from 2 commits
2 changes: 1 addition & 1 deletion .github/trigger_files/IO_Iceberg_Integration_Tests.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
- "modification": 1
"modification": 2
}
@@ -25,6 +25,8 @@
import java.util.List;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.AfterFirst;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
@@ -37,14 +39,243 @@
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Predicates;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Type;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;

/**
- * The underlying Iceberg connector used by {@link org.apache.beam.sdk.managed.Managed#ICEBERG}. Not
- * intended to be used directly.
* A connector that reads and writes to <a href="https://iceberg.apache.org/">Apache Iceberg</a>
* tables.
*
* <p>{@link IcebergIO} is offered as a {@link Managed} transform. This class is subject to change
* and should not be used directly. Instead, use it via {@link Managed#ICEBERG} like so:
*
* <pre>{@code
* Map<String, Object> config = Map.of(
* "table", table,
* "triggering_frequency_seconds", 5,
Contributor: I think these details regarding using the "Managed" transform for various sources/sinks such as Iceberg should also be on the website. Maybe we can also remove them from here (to avoid duplication) once we have these details in a central place. It feels a bit odd to have documentation in a class that we do not want users to use directly.

Contributor Author: Agreed that Managed transforms should have a central place -- there are docs that serve this purpose for Dataflow. There isn't any Beam documentation for using IcebergIO, though.

"It feels a bit odd to have documentation in a class that we do not want users to use directly"

The class is publicly available for people to use (I think this was a prior mistake on our part; we should have kept it package-private). I was thinking that if users discover this class, at least they have some clear documentation that drives them toward using the Managed interface.

* "catalog_name", name,
* "catalog_properties", Map.of(
* "warehouse", warehouse_path,
* "catalog-impl", "org.apache.iceberg.hive.HiveCatalog"),
* "config_properties", Map.of(
* "hive.metastore.uris", metastore_uri));
*
* pipeline
* .apply(Create.of(BEAM_ROWS))
* .apply(Managed.write(ICEBERG).withConfig(config));
*
*
* // ====== READ ======
* pipeline
* .apply(Managed.read(ICEBERG).withConfig(config))
* .getSinglePCollection()
* .apply(ParDo.of(...));
* }</pre>
*
* <h3>Configuration Options</h3>
*
* <table border="1" cellspacing="1">
* <tr>
* <td> <b>Parameter</b> </td> <td> <b>Type</b> </td> <td> <b>Description</b> </td>
* </tr>
* <tr>
* <td> {@code table} </td> <td> {@code str} </td> <td> Required. A fully-qualified table identifier. You may also provide a
* template to use dynamic destinations (see the `Dynamic Destinations` section below for details). </td>
* </tr>
* <tr>
* <td> {@code triggering_frequency_seconds} </td> <td> int </td> <td> Required for streaming writes. Roughly every
* {@code triggering_frequency_seconds} duration, the sink will write records to data files and produce a table snapshot.
* Generally, a higher value will produce fewer, larger data files.
* </td>
* </tr>
* <tr>
* <td> {@code catalog_name} </td> <td> {@code str} </td> <td> The name of the catalog. Defaults to {@code apache-beam-<VERSION>}. </td>
* </tr>
* <tr>
* <td> {@code catalog_properties} </td> <td> {@code map<str, str>} </td> <td> A map of properties to be used when
* constructing the Iceberg catalog. Required properties will depend on what catalog you are using, but
* <a href="https://iceberg.apache.org/docs/latest/configuration/#catalog-properties">this list</a>
* is a good starting point. </td>
* </tr>
* <tr>
* <td> {@code config_properties} </td> <td> {@code map<str, str>} </td> <td> A map of properties
* to instantiate the catalog's Hadoop {@link Configuration}. Required properties will depend on your catalog
* implementation, but <a href="https://iceberg.apache.org/docs/latest/configuration/#hadoop-configuration">this list</a>
* is a good starting point. </td>
* </tr>
* </table>
*
* <p><b>Additional configuration options for Iceberg writes are provided in the
* `Pre-filtering Options` section below.</b>
*
* <h3>Beam Rows</h3>
*
* <p>Being a Managed transform, this IO exclusively writes and reads using Beam {@link Row}s.
* Conversion takes place between Beam {@link Row}s and Iceberg {@link Record}s using helper methods
* in {@link IcebergUtils}. Below is a type conversion table mapping Beam and Iceberg types:
*
* <table border="1" cellspacing="1">
* <tr>
* <td> <b>Beam {@link Schema.FieldType}</b> </td> <td> <b>Iceberg {@link Type}</b> </td>
* </tr>
* <tr>
* <td> BYTES </td> <td> BINARY </td>
* </tr>
* <tr>
* <td> BOOLEAN </td> <td> BOOLEAN </td>
* </tr>
* <tr>
* <td> STRING </td> <td> STRING </td>
* </tr>
* <tr>
* <td> INT32 </td> <td> INTEGER </td>
* </tr>
* <tr>
* <td> INT64 </td> <td> LONG </td>
* </tr>
* <tr>
* <td> DECIMAL </td> <td> STRING </td>
* </tr>
* <tr>
* <td> FLOAT </td> <td> FLOAT </td>
* </tr>
* <tr>
* <td> DOUBLE </td> <td> DOUBLE </td>
* </tr>
* <tr>
* <td> DATETIME </td> <td> STRING </td>
* </tr>
* <tr>
* <td> ITERABLE </td> <td> LIST </td>
* </tr>
* <tr>
* <td> ARRAY </td> <td> LIST </td>
* </tr>
* <tr>
* <td> MAP </td> <td> MAP </td>
* </tr>
* <tr>
* <td> ROW </td> <td> STRUCT </td>
* </tr>
* </table>
*
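 * <p>For example, here is a minimal sketch (the Iceberg schema, field names, and values below are
 * illustrative) of deriving a Beam {@link Schema} with {@link IcebergUtils#icebergSchemaToBeamSchema}
 * and building a {@link Row} to write:
 *
 * <pre>{@code
 * // Types = org.apache.iceberg.types.Types
 * org.apache.iceberg.Schema icebergSchema =
 *     new org.apache.iceberg.Schema(
 *         Types.NestedField.required(1, "id", Types.LongType.get()),      // maps to INT64
 *         Types.NestedField.optional(2, "name", Types.StringType.get())); // maps to STRING
 *
 * Schema beamSchema = IcebergUtils.icebergSchemaToBeamSchema(icebergSchema);
 *
 * Row row =
 *     Row.withSchema(beamSchema)
 *         .withFieldValue("id", 1L)
 *         .withFieldValue("name", "example")
 *         .build();
 * }</pre>
 *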
* <h3>Dynamic Destinations</h3>
*
* <p>Managed Iceberg supports writing to dynamic destinations. To do so, please provide an
* identifier template for the <b>{@code table}</b> parameter. A template should have placeholders
* represented as curly braces containing a record field name, e.g.: {@code
* "my_namespace.my_{foo}_table"}.
*
* <p>The sink uses simple String interpolation to determine a record's table destination. The
* placeholder is replaced with the record's field value. Nested fields can be specified using
* dot-notation (e.g. {@code "{top.middle.nested}"}).
*
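 * <p>For instance, a sketch of a destination template that uses a nested field (the schema and
 * field names here are hypothetical):
 *
 * <pre>{@code
 * // A record with schema {user: {region: str, id: str}, event: str} and
 * // user.region == "emea" is routed to the table "logs.emea.events".
 * Map<String, Object> config = Map.of(
 *     "table", "logs.{user.region}.events",
 *     "catalog_properties", Map.of(...));
 * }</pre>
 *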
* <h4>Pre-filtering Options</h4>
*
* <p>Some use cases may benefit from filtering record fields right before the write operation. For
* example, you may want to provide metadata to guide records to the right destination, but not
* necessarily write that metadata to your table. Some lightweight filtering options are provided
* to accommodate such cases, allowing you to control what actually gets written:
*
* <table border="1" cellspacing="1">
* <tr>
* <td> <b>Parameter</b> </td> <td> <b>Type</b> </td> <td> <b>Description</b> </td>
* </tr>
* <tr>
* <td>{@code drop}</td> <td>{@code list<str>}</td> <td>Drops the specified fields.</td>
* </tr>
* <tr>
* <td>{@code keep}</td> <td>{@code list<str>}</td> <td>Keeps the specified fields and drops the rest.</td>
* </tr>
* <tr>
* <td>{@code only}</td> <td>{@code str}</td> <td>Use this to specify a nested record you intend to write.
* That record will be written and the rest will be dropped.</td>
* </tr>
* </table>
*
* <p>Example write to dynamic destinations (pseudocode):
*
* <pre>{@code
* Map<String, Object> config = Map.of(
* "table", "flights.{country}.{airport}",
* "catalog_properties", Map.of(...),
* "drop", ["country", "airport"]);
*
* JSON_ROWS = [
* // first record is written to table "flights.usa.RDU"
* "{\"country\": \"usa\"," +
* "\"airport\": \"RDU\"," +
* "\"flight_id\": \"AA356\"," +
* "\"destination\": \"SFO\"," +
* "\"meal\": \"chicken alfredo\"}",
* // second record is written to table "flights.qatar.HIA"
* "{\"country\": \"qatar\"," +
* "\"airport\": \"HIA\"," +
* "\"flight_id\": \"QR 875\"," +
* "\"destination\": \"DEL\"," +
* "\"meal\": \"shawarma\"}",
* ...
* ];
*
* // fields "country" and "airport" are dropped before
* // records are written to tables
* pipeline
* .apply(Create.of(JSON_ROWS))
* .apply(JsonToRow.withSchema(...))
* .apply(Managed.write(ICEBERG).withConfig(config));
*
* }</pre>
*
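 * <p>The {@code keep} and {@code only} options follow the same pattern. For example (pseudocode,
 * with hypothetical field names):
 *
 * <pre>{@code
 * // Keeps only the listed fields; everything else is dropped before the write:
 * Map<String, Object> keepConfig = Map.of(
 *     "table", "flights.{country}.{airport}",
 *     "catalog_properties", Map.of(...),
 *     "keep", ["flight_id", "destination", "meal"]);
 *
 * // Writes only the contents of the nested "payload" record:
 * Map<String, Object> onlyConfig = Map.of(
 *     "table", "flights.{country}.{airport}",
 *     "catalog_properties", Map.of(...),
 *     "only", "payload");
 * }</pre>
 *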
* <h3>Output Snapshots</h3>
*
* <p>When records are written and committed to a table, a snapshot is produced. A batch pipeline
* will perform a single commit and create a single snapshot per table. A streaming pipeline will
* produce a snapshot roughly according to the configured {@code triggering_frequency_seconds}.
*
* <p>You can access these snapshots and perform downstream processing by fetching the {@code
* "snapshots"} output PCollection:
*
* <pre>{@code
* pipeline
* .apply(Create.of(BEAM_ROWS))
* .apply(Managed.write(ICEBERG).withConfig(config))
* .get("snapshots")
Contributor: We should use the SNAPSHOTS_TAG constant here?

Contributor Author (ahmedabu98, Oct 2, 2024): Users would have to access it using IcebergWriteSchemaTransformProvider.SNAPSHOTS_TAG, which might be a little too low-level. I think it's better to insulate users from going that deep.

* .apply(ParDo.of(new DoFn<Row, T>() {...}));
* }</pre>
*
* Each Snapshot is represented as a Beam Row, with the following Schema:
*
* <table border="1" cellspacing="1">
* <tr>
* <td> <b>Field</b> </td> <td> <b>Type</b> </td> <td> <b>Description</b> </td>
* </tr>
* <tr>
* <td> {@code table} </td> <td> {@code str} </td> <td> Table identifier. </td>
* </tr>
* <tr>
* <td> {@code manifest_list_location} </td> <td> {@code str} </td> <td> Location of the snapshot's manifest list. </td>
* </tr>
* <tr>
* <td> {@code operation} </td> <td> {@code str} </td> <td> Name of the operation that produced the snapshot. </td>
* </tr>
* <tr>
* <td> {@code parent_id} </td> <td> {@code long} </td> <td> The snapshot's parent ID. </td>
* </tr>
* <tr>
* <td> {@code schema_id} </td> <td> {@code int} </td> <td> The id of the schema used when the snapshot was created. </td>
* </tr>
* <tr>
* <td> {@code summary} </td> <td> {@code map<str, str>} </td> <td> A string map of summary data. </td>
* </tr>
* <tr>
* <td> {@code timestamp_millis} </td> <td> {@code long} </td> <td> The snapshot's timestamp in milliseconds. </td>
* </tr>
* </table>
*
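 * <p>For example, a sketch of downstream processing that reads a few of these fields (the
 * formatting of the output string is purely illustrative):
 *
 * <pre>{@code
 * pipeline
 *     .apply(Create.of(BEAM_ROWS))
 *     .apply(Managed.write(ICEBERG).withConfig(config))
 *     .get("snapshots")
 *     .apply(ParDo.of(
 *         new DoFn<Row, String>() {
 *           @ProcessElement
 *           public void process(@Element Row snapshot, OutputReceiver<String> out) {
 *             out.output(
 *                 snapshot.getString("table")
 *                     + " @ " + snapshot.getInt64("timestamp_millis")
 *                     + " (" + snapshot.getString("operation") + ")");
 *           }
 *         }));
 * }</pre>
 *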
* <p>For internal use only; no backwards compatibility guarantees.
*/
@@ -54,7 +54,7 @@ public class IcebergWriteSchemaTransformProvider
extends TypedSchemaTransformProvider<Configuration> {

static final String INPUT_TAG = "input";
- static final String OUTPUT_TAG = "output";
static final String SNAPSHOTS_TAG = "snapshots";
Contributor Author (ahmedabu98, Oct 2, 2024): This is a breaking change that I'm comfortable with because I doubt anyone is fetching snapshots right now (and if they are, they're using .getSinglePCollection() because this tag name is not discoverable yet). I'd like to take the opportunity to give snapshots a more accurate tag and make room for a potential DLQ in the future.


static final Schema OUTPUT_SCHEMA =
Schema.builder().addStringField("table").addFields(SnapshotInfo.SCHEMA.getFields()).build();
@@ -146,7 +146,7 @@ public List<String> inputCollectionNames() {

@Override
public List<String> outputCollectionNames() {
- return Collections.singletonList(OUTPUT_TAG);
return Collections.singletonList(SNAPSHOTS_TAG);
}

@Override
@@ -204,7 +204,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
.apply(MapElements.via(new SnapshotToRow()))
.setRowSchema(OUTPUT_SCHEMA);

- return PCollectionRowTuple.of(OUTPUT_TAG, snapshots);
return PCollectionRowTuple.of(SNAPSHOTS_TAG, snapshots);
}

@VisibleForTesting
@@ -56,7 +56,8 @@ public Row toRow() {
return SchemaRegistry.createDefault()
.getToRowFunction(SnapshotInfo.class)
.apply(this)
- .sorted();
.sorted()
.toSnakeCase();
} catch (NoSuchSchemaException e) {
throw new RuntimeException(e);
}
@@ -69,7 +70,7 @@ public Row toRow() {
try {
SchemaRegistry registry = SchemaRegistry.createDefault();
CODER = registry.getSchemaCoder(SnapshotInfo.class);
- SCHEMA = registry.getSchema(SnapshotInfo.class).sorted();
SCHEMA = registry.getSchema(SnapshotInfo.class).sorted().toSnakeCase();
Contributor Author: Another breaking change that I'm comfortable with. Better to unify everything to snake_case format.

} catch (NoSuchSchemaException e) {
throw new RuntimeException(e);
}
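
For context, a minimal sketch (with hypothetical field names) of what sorted().toSnakeCase() does to a Beam schema:

// Sketch only: illustrates the snake_case normalization applied above.
import org.apache.beam.sdk.schemas.Schema;

Schema schema =
    Schema.builder()
        .addStringField("manifestListLocation")
        .addInt64Field("timestampMillis")
        .build();

// Fields are sorted by name and renamed to snake_case:
// manifest_list_location (STRING), timestamp_millis (INT64)
Schema normalized = schema.sorted().toSnakeCase();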
@@ -19,7 +19,7 @@

import static org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.Configuration;
import static org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.INPUT_TAG;
- import static org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.OUTPUT_TAG;
import static org.apache.beam.sdk.io.iceberg.IcebergWriteSchemaTransformProvider.SNAPSHOTS_TAG;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;

@@ -122,7 +122,7 @@ public void testSimpleAppend() {
PCollection<Row> result =
input
.apply("Append To Table", new IcebergWriteSchemaTransformProvider().from(config))
- .get(OUTPUT_TAG);
.get(SNAPSHOTS_TAG);

PAssert.that(result)
.satisfies(new VerifyOutputs(Collections.singletonList(identifier), "append"));
@@ -154,7 +154,7 @@ public void testWriteUsingManagedTransform() {
.apply("Records To Add", Create.of(TestFixtures.asRows(TestFixtures.FILE1SNAPSHOT1)))
.setRowSchema(IcebergUtils.icebergSchemaToBeamSchema(TestFixtures.SCHEMA));
PCollection<Row> result =
- inputRows.apply(Managed.write(Managed.ICEBERG).withConfig(configMap)).get(OUTPUT_TAG);
inputRows.apply(Managed.write(Managed.ICEBERG).withConfig(configMap)).get(SNAPSHOTS_TAG);

PAssert.that(result)
.satisfies(new VerifyOutputs(Collections.singletonList(identifier), "append"));