Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Java variants of WriteTo{Csv,Json}. #28380

Merged
merged 6 commits into from
Oct 9, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions sdks/java/extensions/schemaio-expansion-service/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,14 @@ applyJavaNature(
dependencies {
implementation project(path: ":sdks:java:expansion-service")
permitUnusedDeclared project(path: ":sdks:java:expansion-service") // BEAM-11761
implementation project(":sdks:java:extensions:google-cloud-platform-core")
permitUnusedDeclared project(path: ":sdks:java:extensions:google-cloud-platform-core") // BEAM-11761
implementation project(":sdks:java:io:csv")
permitUnusedDeclared project(path: ":sdks:java:io:csv") // BEAM-11761
implementation project(":sdks:java:io:jdbc")
permitUnusedDeclared project(":sdks:java:io:jdbc") // BEAM-11761
implementation project(":sdks:java:io:json")
permitUnusedDeclared project(path: ":sdks:java:io:json") // BEAM-11761
implementation library.java.postgres
permitUnusedDeclared library.java.postgres // BEAM-11761
implementation project(path: ":model:pipeline", configuration: "shadow")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.csv.providers;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.auto.service.AutoService;
import com.google.auto.value.AutoValue;
import java.util.Collections;
import java.util.List;
import org.apache.beam.sdk.io.WriteFilesResult;
import org.apache.beam.sdk.io.csv.CsvIO;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.commons.csv.CSVFormat;

/**
* An implementation of {@link TypedSchemaTransformProvider} for {@link CsvIO#write}.
*
* <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
* provide no backwards compatibility guarantees, and it should not be implemented outside the Beam
* repository.
*/
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
@AutoService(SchemaTransformProvider.class)
public class CsvWriteTransformProvider
extends TypedSchemaTransformProvider<CsvWriteTransformProvider.CsvWriteConfiguration> {
private static final String INPUT_ROWS_TAG = "input";
private static final String WRITE_RESULTS = "output";

@Override
protected Class<CsvWriteConfiguration> configurationClass() {
return CsvWriteConfiguration.class;
}

@Override
protected SchemaTransform from(CsvWriteConfiguration configuration) {
return new CsvWriteTransform(configuration);
}

@Override
public String identifier() {
return String.format("beam:schematransform:org.apache.beam:csv_write:v1");
}

@Override
public List<String> inputCollectionNames() {
return Collections.singletonList(INPUT_ROWS_TAG);
}

@Override
public List<String> outputCollectionNames() {
return Collections.singletonList(WRITE_RESULTS);
}

/** Configuration for writing to BigQuery with Storage Write API. */
@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract static class CsvWriteConfiguration {

public void validate() {
checkArgument(
!Strings.isNullOrEmpty(this.getPath()), "Path for a CSV Write must be specified.");
}

public static Builder builder() {
return new AutoValue_CsvWriteTransformProvider_CsvWriteConfiguration.Builder();
}

@SchemaFieldDescription("The file path to write to.")
public abstract String getPath();

/** Builder for {@link CsvWriteConfiguration}. */
@AutoValue.Builder
public abstract static class Builder {

public abstract Builder setPath(String path);

/** Builds a {@link CsvWriteConfiguration} instance. */
public abstract CsvWriteConfiguration build();
}
}

/** A {@link SchemaTransform} for {@link CsvIO#write}. */
protected static class CsvWriteTransform extends SchemaTransform {

private final CsvWriteConfiguration configuration;

CsvWriteTransform(CsvWriteConfiguration configuration) {
configuration.validate();
this.configuration = configuration;
}

@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
WriteFilesResult<?> result =
input
.get(INPUT_ROWS_TAG)
.apply(CsvIO.writeRows(configuration.getPath(), CSVFormat.DEFAULT).withSuffix(""));
Schema outputSchema = Schema.of(Field.of("filename", FieldType.STRING));
return PCollectionRowTuple.of(
WRITE_RESULTS,
result
.getPerDestinationOutputFilenames()
.apply(
"Collect filenames",
MapElements.into(TypeDescriptors.rows())
.via(
(destinationAndRow) ->
Row.withSchema(outputSchema)
.withFieldValue("filename", destinationAndRow.getValue())
.build()))
.setRowSchema(outputSchema));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/** Transforms for reading and writing CSV files. */
package org.apache.beam.sdk.io.csv.providers;
35 changes: 35 additions & 0 deletions sdks/java/io/json/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an AS IS BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

plugins { id 'org.apache.beam.module' }
applyJavaNature(
automaticModuleName: 'org.apache.beam.sdk.io.json'
)

description = "Apache Beam :: SDKs :: Java :: IO :: JSON"
ext.summary = "IO to read and write JSON files."

dependencies {
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation library.java.vendored_guava_32_1_2_jre
implementation library.java.everit_json_schema
testImplementation project(path: ":sdks:java:core", configuration: "shadowTest")
testImplementation library.java.junit
testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow")
testImplementation project(path: ":sdks:java:io:common", configuration: "testRuntimeMigration")
}
Loading
Loading