diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
index d9dde11a3081..a6cf7ebb12a5 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -291,8 +291,8 @@
  * grouped into batches. The default maximum size of the batch is set to 1MB or 5000 mutated cells,
  * or 500 rows (whichever is reached first). To override this use {@link
  * Write#withBatchSizeBytes(long) withBatchSizeBytes()}, {@link Write#withMaxNumMutations(long)
- * withMaxNumMutations()} or {@link Write#withMaxNumMutations(long) withMaxNumRows()}. Setting
- * either to a small value or zero disables batching.
+ * withMaxNumMutations()} or {@link Write#withMaxNumRows(long) withMaxNumRows()}. Setting either to
+ * a small value or zero disables batching.
  *
  * <p>Note that the maximum
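For context, the three batching knobs referenced by the corrected {@link} combine on SpannerIO.write() as follows. A minimal sketch, not from this patch: the instance/database IDs and limit values are placeholders.

```java
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;

public class SpannerWriteBatchingExample {
  // A batch is emitted as soon as the first limit is reached; setting any
  // of them to zero (or a very small value) effectively disables batching.
  static SpannerIO.Write batchedWrite() {
    return SpannerIO.write()
        .withInstanceId("my-instance")     // placeholder
        .withDatabaseId("my-database")     // placeholder
        .withBatchSizeBytes(512 * 1024)    // default: 1MB
        .withMaxNumMutations(2000)         // default: 5000 mutated cells
        .withMaxNumRows(100);              // default: 500 rows
  }
}
```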
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java
index 5cd9cb47b696..76440b1ebf1a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerReadSchemaTransformProvider.java
@@ -41,6 +41,7 @@
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 
+/** A provider for reading from Cloud Spanner using a Schema Transform Provider. */
 @SuppressWarnings({
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
 })
@@ -54,43 +55,81 @@
  *
  * <p>The transformation leverages the {@link SpannerIO} to perform the read operation and maps the
  * results to Beam rows, preserving the schema.
- *
- * <p>Example usage in a YAML pipeline using query:
- *
- * <pre>{@code
- * pipeline:
- *   transforms:
- *     - type: ReadFromSpanner
- *       name: ReadShipments
- *       # Columns: shipment_id, customer_id, shipment_date, shipment_cost, customer_name, customer_email
- *       config:
- *         project_id: 'apache-beam-testing'
- *         instance_id: 'shipment-test'
- *         database_id: 'shipment'
- *         query: 'SELECT * FROM shipments'
- * }</pre>
- *
- * <p>Example usage in a YAML pipeline using a table and columns:
- *
- * <pre>{@code
- * pipeline:
- *   transforms:
- *     - type: ReadFromSpanner
- *       name: ReadShipments
- *       # Columns: shipment_id, customer_id, shipment_date, shipment_cost, customer_name, customer_email
- *       config:
- *         project_id: 'apache-beam-testing'
- *         instance_id: 'shipment-test'
- *         database_id: 'shipment'
- *         table: 'shipments'
- *         columns: ['customer_id', 'customer_name']
- * }</pre>
  */
 @AutoService(SchemaTransformProvider.class)
 public class SpannerReadSchemaTransformProvider
     extends TypedSchemaTransformProvider<
         SpannerReadSchemaTransformProvider.SpannerReadSchemaTransformConfiguration> {
+  @Override
+  public String identifier() {
+    return "beam:schematransform:org.apache.beam:spanner_read:v1";
+  }
+
+  @Override
+  public String description() {
+    return "Performs a bulk read from Google Cloud Spanner using a specified SQL query or "
+        + "by directly accessing a single table and its columns.\n"
+        + "\n"
+        + "Both Query and Read APIs are supported. See more information about "
+        + "reading from Cloud Spanner.\n"
+        + "\n"
+        + "Example configuration for performing a read using a SQL query: ::\n"
+        + "\n"
+        + "    pipeline:\n"
+        + "      transforms:\n"
+        + "        - type: ReadFromSpanner\n"
+        + "          config:\n"
+        + "            instance_id: 'my-instance-id'\n"
+        + "            database_id: 'my-database'\n"
+        + "            query: 'SELECT * FROM table'\n"
+        + "\n"
+        + "It is also possible to read a table by specifying a table name and a list of columns. For "
+        + "example, the following configuration will perform a read on an entire table: ::\n"
+        + "\n"
+        + "    pipeline:\n"
+        + "      transforms:\n"
+        + "        - type: ReadFromSpanner\n"
+        + "          config:\n"
+        + "            instance_id: 'my-instance-id'\n"
+        + "            database_id: 'my-database'\n"
+        + "            table: 'my-table'\n"
+        + "            columns: ['col1', 'col2']\n"
+        + "\n"
+        + "Additionally, to read using a "
+        + "Secondary Index, specify the index name: ::"
+        + "\n"
+        + "    pipeline:\n"
+        + "      transforms:\n"
+        + "        - type: ReadFromSpanner\n"
+        + "          config:\n"
+        + "            instance_id: 'my-instance-id'\n"
+        + "            database_id: 'my-database'\n"
+        + "            table: 'my-table'\n"
+        + "            index: 'my-index'\n"
+        + "            columns: ['col1', 'col2']\n"
+        + "\n"
+        + "### Advanced Usage\n"
+        + "\n"
+        + "Reads by default use the "
+        + "PartitionQuery API, which enforces some limitations on the type of queries that can be used so that "
+        + "the data can be read in parallel. If the query is not supported by the PartitionQuery API, then you "
+        + "can specify a non-partitioned read by setting batching to false.\n"
+        + "\n"
+        + "For example: ::"
+        + "\n"
+        + "    pipeline:\n"
+        + "      transforms:\n"
+        + "        - type: ReadFromSpanner\n"
+        + "          config:\n"
+        + "            batching: false\n"
+        + "            ...\n"
+        + "\n"
+        + "Note: See "
+        + "SpannerIO for more advanced information.";
+  }
+
   static class SpannerSchemaTransformRead extends SchemaTransform implements Serializable {
     private final SpannerReadSchemaTransformConfiguration configuration;
@@ -113,6 +152,12 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
       } else {
         read = read.withTable(configuration.getTableId()).withColumns(configuration.getColumns());
       }
+      if (!Strings.isNullOrEmpty(configuration.getIndex())) {
+        read = read.withIndex(configuration.getIndex());
+      }
+      if (Boolean.FALSE.equals(configuration.getBatching())) {
+        read = read.withBatching(false);
+      }
       PCollection<Struct> spannerRows = input.getPipeline().apply(read);
       Schema schema = spannerRows.getSchema();
       PCollection<Row> rows =
@@ -124,11 +169,6 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
     }
   }
 
-  @Override
-  public String identifier() {
-    return "beam:schematransform:org.apache.beam:spanner_read:v1";
-  }
-
   @Override
   public List<String> inputCollectionNames() {
     return Collections.emptyList();
@@ -157,6 +197,10 @@ public abstract static class Builder {
 
       public abstract Builder setColumns(List<String> columns);
 
+      public abstract Builder setIndex(String index);
+
+      public abstract Builder setBatching(Boolean batching);
+
       public abstract SpannerReadSchemaTransformConfiguration build();
     }
 
@@ -193,16 +237,16 @@ public static Builder builder() {
           .Builder();
     }
 
-    @SchemaFieldDescription("Specifies the GCP project ID.")
-    @Nullable
-    public abstract String getProjectId();
-
     @SchemaFieldDescription("Specifies the Cloud Spanner instance.")
     public abstract String getInstanceId();
 
     @SchemaFieldDescription("Specifies the Cloud Spanner database.")
     public abstract String getDatabaseId();
 
+    @SchemaFieldDescription("Specifies the GCP project ID.")
+    @Nullable
+    public abstract String getProjectId();
+
     @SchemaFieldDescription("Specifies the Cloud Spanner table.")
     @Nullable
     public abstract String getTableId();
@@ -211,9 +255,20 @@ public static Builder builder() {
     @Nullable
     public abstract String getQuery();
 
-    @SchemaFieldDescription("Specifies the columns to read from the table.")
+    @SchemaFieldDescription(
+        "Specifies the columns to read from the table. This parameter is required when table is specified.")
     @Nullable
     public abstract List<String> getColumns();
+
+    @SchemaFieldDescription(
+        "Specifies the Index to read from. This parameter can only be specified when using table.")
+    @Nullable
+    public abstract String getIndex();
+
+    @SchemaFieldDescription(
+        "Set to false to disable batching. Useful when using a query that is not compatible with the PartitionQuery API. Defaults to true.")
+    @Nullable
+    public abstract Boolean getBatching();
   }
 
   @Override
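The two new conditionals in expand() simply forward the configuration onto SpannerIO. A minimal sketch of the equivalent hand-built read, using the same withIndex/withBatching calls the transform invokes above; all IDs and names here are placeholders:

```java
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;

public class SpannerIndexedReadExample {
  // Equivalent of a ReadFromSpanner config that sets table, columns,
  // index, and batching: false.
  static SpannerIO.Read indexedRead() {
    return SpannerIO.read()
        .withInstanceId("my-instance-id")
        .withDatabaseId("my-database")
        .withTable("my-table")
        .withColumns("col1", "col2")
        .withIndex("my-index")    // read through the secondary index
        .withBatching(false);     // skip the PartitionQuery-based parallel read
  }
}
```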

diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java
index 9f079c78f886..8601da09ea09 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerWriteSchemaTransformProvider.java
@@ -67,49 +67,37 @@
  *
  * <p>The transformation uses the {@link SpannerIO} to perform the write operation and provides
  * options to handle failed mutations, either by throwing an error, or passing the failed mutation
  * further in the pipeline for dealing with accordingly.
- *
- * <p>Example usage in a YAML pipeline without error handling:
- *
- * <pre>{@code
- * pipeline:
- *   transforms:
- *     - type: WriteToSpanner
- *       name: WriteShipments
- *       config:
- *         project_id: 'apache-beam-testing'
- *         instance_id: 'shipment-test'
- *         database_id: 'shipment'
- *         table_id: 'shipments'
- *
- * }</pre>
- *
- * <p>Example usage in a YAML pipeline using error handling:
- *
- * <pre>{@code
- * pipeline:
- *   transforms:
- *     - type: WriteToSpanner
- *       name: WriteShipments
- *       config:
- *         project_id: 'apache-beam-testing'
- *         instance_id: 'shipment-test'
- *         database_id: 'shipment'
- *         table_id: 'shipments'
- *         error_handling:
- *           output: 'errors'
- *
- *     - type: WriteToJson
- *       input: WriteSpanner.my_error_output
- *       config:
- *          path: errors.json
- *
- * }</pre>
  */
 @AutoService(SchemaTransformProvider.class)
 public class SpannerWriteSchemaTransformProvider
     extends TypedSchemaTransformProvider<
         SpannerWriteSchemaTransformProvider.SpannerWriteSchemaTransformConfiguration> {
+  @Override
+  public String identifier() {
+    return "beam:schematransform:org.apache.beam:spanner_write:v1";
+  }
+
+  @Override
+  public String description() {
+    return "Performs a bulk write to a Google Cloud Spanner table.\n"
+        + "\n"
+        + "Example configuration for performing a write to a single table: ::\n"
+        + "\n"
+        + "    pipeline:\n"
+        + "      transforms:\n"
+        + "        - type: WriteToSpanner\n"
+        + "          config:\n"
+        + "            project_id: 'my-project-id'\n"
+        + "            instance_id: 'my-instance-id'\n"
+        + "            database_id: 'my-database'\n"
+        + "            table: 'my-table'\n"
+        + "\n"
+        + "Note: See "
+        + "SpannerIO for more advanced information.";
+  }
+
   @Override
   protected Class<SpannerWriteSchemaTransformConfiguration> configurationClass() {
     return SpannerWriteSchemaTransformConfiguration.class;
@@ -225,11 +213,6 @@ public PCollectionRowTuple expand(@NonNull PCollectionRowTuple input) {
     }
   }
 
-  @Override
-  public String identifier() {
-    return "beam:schematransform:org.apache.beam:spanner_write:v1";
-  }
-
   @Override
   public List<String> inputCollectionNames() {
     return Collections.singletonList("input");
@@ -244,10 +227,6 @@ public List<String> outputCollectionNames() {
 
   @DefaultSchema(AutoValueSchema.class)
   public abstract static class SpannerWriteSchemaTransformConfiguration implements Serializable {
 
-    @SchemaFieldDescription("Specifies the GCP project.")
-    @Nullable
-    public abstract String getProjectId();
-
     @SchemaFieldDescription("Specifies the Cloud Spanner instance.")
     public abstract String getInstanceId();
@@ -257,7 +236,11 @@ public abstract static class SpannerWriteSchemaTransformConfiguration implements
     @SchemaFieldDescription("Specifies the Cloud Spanner table.")
     public abstract String getTableId();
 
-    @SchemaFieldDescription("Specifies how to handle errors.")
+    @SchemaFieldDescription("Specifies the GCP project.")
+    @Nullable
+    public abstract String getProjectId();
+
+    @SchemaFieldDescription("Whether and how to handle write errors.")
     @Nullable
     public abstract ErrorHandling getErrorHandling();
 
diff --git a/sdks/python/apache_beam/yaml/standard_io.yaml b/sdks/python/apache_beam/yaml/standard_io.yaml
index 4de36b3dc9e0..400ab07a41fa 100644
--- a/sdks/python/apache_beam/yaml/standard_io.yaml
+++ b/sdks/python/apache_beam/yaml/standard_io.yaml
@@ -271,6 +271,8 @@
         table: 'table_id'
         query: 'query'
         columns: 'columns'
+        index: 'index'
+        batching: 'batching'
     'WriteToSpanner':
       project: 'project_id'
      instance: 'instance_id'
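The new YAML keys line up one-to-one with the configuration fields added above. A sketch of building the same read configuration programmatically: setIndex and setBatching are the builder methods added in this patch, while the remaining setters are assumptions inferred by analogy with the corresponding getters, and all values are placeholders.

```java
import java.util.Arrays;
import org.apache.beam.sdk.io.gcp.spanner.SpannerReadSchemaTransformProvider.SpannerReadSchemaTransformConfiguration;

public class SpannerReadConfigExample {
  // Mirrors a YAML config using the table/columns/index/batching keys.
  static SpannerReadSchemaTransformConfiguration config() {
    return SpannerReadSchemaTransformConfiguration.builder()
        .setInstanceId("my-instance-id")  // assumed setter, by analogy with getInstanceId()
        .setDatabaseId("my-database")     // assumed setter
        .setTableId("my-table")           // assumed setter
        .setColumns(Arrays.asList("col1", "col2"))
        .setIndex("my-index")             // added in this patch
        .setBatching(true)                // added in this patch; false disables partitioned reads
        .build();
  }
}
```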